Skip to content

Commit

Permalink
add Clustering Coefficient algorithm (#15)
Browse files Browse the repository at this point in the history
* add Clustering Coefficient algorithm

* format result

* remove CC license
  • Loading branch information
Nicole00 committed Nov 30, 2021
1 parent 1001def commit 0337b2b
Show file tree
Hide file tree
Showing 19 changed files with 224 additions and 51 deletions.
14 changes: 0 additions & 14 deletions example/src/main/resources/data.csv
Original file line number Diff line number Diff line change
@@ -1,14 +0,0 @@
id,col1,col2,col3,col4,col5,col6,col7,col8,col9,col10,col11,col12,col13
1,Tom,tom,10,20,30,40,2021-01-27,2021-01-01T12:10:10,43535232,true,1.0,2.0,10:10:10
2,Jina,Jina,11,21,31,41,2021-01-28,2021-01-02T12:10:10,43535232,false,1.1,2.1,11:10:10
3,Tim,Tim,12,22,32,42,2021-01-29,2021-01-03T12:10:10,43535232,false,1.2,2.2,12:10:10
4,张三,张三,13,23,33,43,2021-01-30,2021-01-04T12:10:10,43535232,true,1.3,2.3,13:10:10
5,李四,李四,14,24,34,44,2021-02-01,2021-01-05T12:10:10,43535232,false,1.4,2.4,14:10:10
6,王五,王五,15,25,35,45,2021-02-02,2021-01-06T12:10:10,0,false,1.5,2.5,15:10:10
7,Jina,Jina,16,26,36,46,2021-02-03,2021-01-07T12:10:10,43535232,true,1.6,2.6,16:10:10
8,Jina,Jina,17,27,37,47,2021-02-04,2021-01-08T12:10:10,43535232,false,1.7,2.7,17:10:10
9,Jina,Jina,18,28,38,48,2021-02-05,2021-01-09T12:10:10,43535232,true,1.8,2.8,18:10:10
10,Jina,Jina,19,29,39,49,2021-02-06,2021-01-10T12:10:10,43535232,false,1.9,2.9,19:10:10
-1,Jina,Jina,20,30,40,50,2021-02-07,2021-02-11T12:10:10,43535232,false,2.0,3.0,20:10:10
-2,Jina,Jina,21,31,41,51,2021-02-08,2021-03-12T12:10:10,43535232,false,2.1,3.1,21:10:10
-3,Jina,Jina,22,32,42,52,2021-02-09,2021-04-13T12:10:10,43535232,false,2.2,3.2,22:10:10
10 changes: 0 additions & 10 deletions example/src/main/resources/edge

This file was deleted.

Empty file.
10 changes: 0 additions & 10 deletions example/src/main/resources/vertex

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License,
* attached with Common Clause Condition 1.0, found in the LICENSES directory.
*/

package com.vesoft.nebula.algorithm

/**
  * Empty object: no members are declared here.
  * NOTE(review): presumably a stub for a clustering coefficient usage example — confirm.
  */
object ClusteringCoefficientExample {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

package com.vesoft.nebula.algorithm

/**
  * Empty object: no members are declared here.
  * NOTE(review): presumably a stub for a degree statistics usage example — confirm.
  */
object DegreeStaticExample {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

package com.vesoft.nebula.algorithm

/**
  * Empty object: no members are declared here.
  * NOTE(review): presumably a stub for a graph-level triangle count usage example — confirm.
  */
object GraphTriangleCountExample {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

package com.vesoft.nebula.algorithm

/**
  * Empty object: no members are declared here.
  * NOTE(review): presumably a stub for a label propagation (LPA) usage example — confirm.
  */
object LpaExample {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

package com.vesoft.nebula.algorithm

/**
  * Empty object: no members are declared here.
  * NOTE(review): presumably a stub for a PageRank usage example — confirm.
  */
object PageRankExample {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

package com.vesoft.nebula.algorithm
/**
  * Empty object: no members are declared here.
  * NOTE(review): presumably a stub for shared data-loading helpers used by the examples — confirm.
  */
object ReadData {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

package com.vesoft.nebula.algorithm
/**
  * Empty object: no members are declared here.
  * NOTE(review): presumably a stub for a strongly connected components usage example — confirm.
  */
object SCCExample {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

package com.vesoft.nebula.algorithm
/**
  * Empty object: no members are declared here.
  * NOTE(review): presumably a stub for a shortest path usage example — confirm.
  */
object ShortestPathExample {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

package com.vesoft.nebula.algorithm
/**
  * Empty object: no members are declared here.
  * NOTE(review): presumably a stub for a per-vertex triangle count usage example — confirm.
  */
object TriangleCountExample {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
/* Copyright (c) 2020 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

package com.vesoft.nebula.algorithm
/**
  * Empty object: no members are declared here.
  * NOTE(review): presumably a stub for a weakly connected components usage example — confirm.
  */
object WCCExample {}
9 changes: 8 additions & 1 deletion nebula-algorithm/src/main/resources/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,18 @@
# graphTriangleCount parameter
graphtrianglecount:{}

# Betweenness centrality parameter
# Betweenness centrality parameter. maxIter is the maximum number of iterations.
betweenness:{
maxIter:5
}

# Clustering Coefficient parameter. The type parameter has two choices: local or global.
# The local type computes the clustering coefficient for each vertex and logs the graph's average coefficient.
# The global type computes only the graph's clustering coefficient.
clusteringcoefficient:{
type: local
}

# SingleSourceShortestPathAlgo parameter
singlesourceshortestpath:{
sourceid:"1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import com.vesoft.nebula.algorithm.config.{
AlgoConfig,
BetweennessConfig,
CcConfig,
CoefficientConfig,
Configs,
HanpConfig,
KCoreConfig,
Expand All @@ -22,6 +23,7 @@ import com.vesoft.nebula.algorithm.config.{
}
import com.vesoft.nebula.algorithm.lib.{
BetweennessCentralityAlgo,
ClusteringCoefficientAlgo,
ClosenessAlgo,
ConnectedComponentsAlgo,
DegreeStaticAlgo,
Expand Down Expand Up @@ -168,6 +170,10 @@ object Main {
case "graphtrianglecount" => {
GraphTriangleCountAlgo(spark, dataSet)
}
case "clusteringcoefficient" => {
val coefficientConfig = CoefficientConfig.getCoefficientConfig(configs)
ClusteringCoefficientAlgo(spark, dataSet, coefficientConfig)
}
case "closeness" => {
ClosenessAlgo(spark, dataSet, hasWeight)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,24 @@ object BetweennessConfig {
}
}

/**
  * Clustering coefficient configuration.
  *
  * @param algoType which coefficient to compute: "local" or "global" (validated
  *                 case-insensitively by [[CoefficientConfig.getCoefficientConfig]])
  */
case class CoefficientConfig(algoType: String)

object CoefficientConfig {
  // Most recently parsed type. Kept as a public var only so existing readers of
  // `CoefficientConfig.algoType` keep working; prefer the returned instance.
  var algoType: String = _

  /**
    * Builds the clustering coefficient configuration from the parsed algorithm settings.
    *
    * @param configs parsed application configuration
    * @return the configuration carrying the requested coefficient type
    * @throws AssertionError if the configured type is neither "local" nor "global"
    */
  def getCoefficientConfig(configs: Configs): CoefficientConfig = {
    val algorithmEntries = configs.algorithmConfig.map
    // The section is named `clusteringcoefficient` in application.conf, which is also the
    // algorithm name dispatched in Main; the previous key `clustercoefficient` did not
    // match that section name.
    algoType = algorithmEntries("algorithm.clusteringcoefficient.type")
    assert(algoType.equalsIgnoreCase("local") || algoType.equalsIgnoreCase("global"),
           "ClusterCoefficient only support local or global type.")
    CoefficientConfig(algoType)
  }
}

/**
* Hanp
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,20 +348,21 @@ object Configs {
}

/**
  * Column names used in the result DataFrames produced by the algorithms.
  *
  * Each constant is declared exactly once; the earlier duplicated list of the same
  * definitions has been dropped because duplicate `val`s do not compile.
  */
object AlgoConstants {
  // Name of the vertex id column.
  val ALGO_ID_COL: String                   = "_id"
  val PAGERANK_RESULT_COL: String           = "pagerank"
  val LOUVAIN_RESULT_COL: String            = "louvain"
  val KCORE_RESULT_COL: String              = "kcore"
  val LPA_RESULT_COL: String                = "lpa"
  val CC_RESULT_COL: String                 = "cc"
  val SCC_RESULT_COL: String                = "scc"
  // NOTE(review): value is misspelled ("betweennedss"); left unchanged because it is an
  // emitted column name that downstream consumers may depend on.
  val BETWEENNESS_RESULT_COL: String        = "betweennedss"
  val SHORTPATH_RESULT_COL: String          = "shortestpath"
  val DEGREE_RESULT_COL: String             = "degree"
  val INDEGREE_RESULT_COL: String           = "inDegree"
  val OUTDEGREE_RESULT_COL: String          = "outDegree"
  // NOTE(review): value is misspelled ("tranglecount"); left unchanged for the same reason.
  val TRIANGLECOUNT_RESULT_COL: String      = "tranglecount"
  val CLUSTERCOEFFICIENT_RESULT_COL: String = "clustercoefficient"
  val CLOSENESS_RESULT_COL: String          = "closeness"
  val HANP_RESULT_COL: String               = "hanp"
  val NODE2VEC_RESULT_COL: String           = "node2vec"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/* Copyright (c) 2021 vesoft inc. All rights reserved.
*
* This source code is licensed under Apache 2.0 License.
*/

package com.vesoft.nebula.algorithm.lib

import com.vesoft.nebula.algorithm.config.{AlgoConstants, CoefficientConfig, KCoreConfig}
import com.vesoft.nebula.algorithm.utils.NebulaUtil
import org.apache.log4j.Logger
import org.apache.spark.graphx.Graph
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

/**
  * Clustering coefficient on a GraphX graph, in two flavours:
  *  - local: one coefficient per vertex (closed triangles / possible neighbour pairs);
  *  - global: one coefficient for the whole graph.
  */
object ClusteringCoefficientAlgo {
  private val LOGGER = Logger.getLogger(this.getClass)

  val ALGORITHM: String = "ClusterCoefficientAlgo"

  /**
    * Run the clustering coefficient algorithm for nebula graph.
    *
    * @param spark             active Spark session
    * @param dataset           edge data handed to `NebulaUtil.loadInitGraph`
    * @param coefficientConfig selects the variant; "local" (case-insensitive) computes the
    *                          per-vertex coefficient, any other value computes the global one
    * @return for "local": a DataFrame with the vertex id column and the coefficient column;
    *         otherwise a single-row DataFrame with column `globalClusterCoefficient`
    */
  def apply(spark: SparkSession,
            dataset: Dataset[Row],
            coefficientConfig: CoefficientConfig): DataFrame = {

    // NOTE(review): the `false` flag presumably disables edge weights — confirm against NebulaUtil.
    val graph: Graph[None.type, Double] = NebulaUtil.loadInitGraph(dataset, false)

    if (coefficientConfig.algoType.equalsIgnoreCase("local")) {
      // compute local clustering coefficient
      val localCoefficients = executeLocalCC(graph)
      val schema = StructType(
        List(
          StructField(AlgoConstants.ALGO_ID_COL, LongType, nullable = false),
          StructField(AlgoConstants.CLUSTERCOEFFICIENT_RESULT_COL, DoubleType, nullable = true)
        ))
      val algoResult = spark.sqlContext.createDataFrame(localCoefficients, schema)

      // log the graph's average clustering coefficient
      import spark.implicits._
      // Count once and reuse it: every count() is a separate Spark job over the result.
      val vertexNum = algoResult.count()
      val averageCoeff: Double =
        if (vertexNum == 0) 0.0
        else algoResult.map(row => row.getDouble(1)).reduce(_ + _) / vertexNum
      LOGGER.info(s"graph's average clustering coefficient is ${averageCoeff}")

      algoResult
    } else {
      // compute global clustering coefficient
      val globalCoefficient: Double = executeGlobalCC(graph)
      val rows                      = spark.sparkContext.parallelize(List(globalCoefficient)).map(Row(_))
      val schema = StructType(
        List(
          StructField("globalClusterCoefficient", DoubleType, nullable = false)
        ))
      spark.sqlContext.createDataFrame(rows, schema)
    }
  }

  /**
    * Execute local cluster coefficient.
    *
    * For each vertex the coefficient is `triangles / C(degree, 2)`, rounded to six decimal
    * places; vertices with fewer than two incident edges get 0.0.
    *
    * NOTE(review): `graph.degrees` counts parallel edges and self-loops, while GraphX
    * `triangleCount()` is documented to canonicalize the graph first — confirm that the
    * input is a simple graph, otherwise the denominator is inflated.
    *
    * @param graph input graph
    * @return rows of (vertex id, local clustering coefficient)
    */
  def executeLocalCC(graph: Graph[None.type, Double]): RDD[Row] = {
    // actual (closed) triangle count through each vertex
    val triangleNum = graph.triangleCount().vertices
    // number of neighbour pairs per vertex, C(n,2) = n*(n-1)/2; widened to Long so that
    // high-degree vertices cannot overflow Int
    val idealTriangleNum = graph.degrees.mapValues(degree => degree.toLong * (degree - 1) / 2)

    triangleNum
      .innerJoin(idealTriangleNum) { (_, actualCount, idealCount) =>
        if (idealCount == 0) 0.0
        // convert before dividing: Int / Int would truncate the ratio to 0 or 1
        else roundToSixDecimals(actualCount.toDouble / idealCount)
      }
      .map { case (vertexId, coefficient) => Row(vertexId, coefficient) }
  }

  /**
    * Execute global cluster coefficient.
    *
    * Computed as (sum of per-vertex triangle counts) / (sum of per-vertex C(degree, 2)),
    * rounded to six decimal places. Returns 0.0 when the denominator is zero, which
    * includes the empty graph.
    *
    * @param graph input graph
    * @return the graph's global clustering coefficient
    */
  def executeGlobalCC(graph: Graph[None.type, Double]): Double = {
    // fold (unlike reduce) tolerates an empty RDD; summing as Double avoids Int overflow
    val closedTriangleNum =
      graph.triangleCount().vertices.map(_._2.toDouble).fold(0.0)(_ + _)
    // number of open and closed triangles (according to C(n,2)=n*(n-1)/2)
    val triangleNum = graph.degrees
      .map { case (_, degree) => degree.toLong * (degree - 1) / 2.0 }
      .fold(0.0)(_ + _)

    if (triangleNum == 0) 0.0
    else roundToSixDecimals(closedTriangleNum / triangleNum)
  }

  /**
    * Rounds to six decimal places (half-up) without going through a locale-dependent
    * formatted string, which cannot be parsed back when the default locale uses a
    * decimal comma.
    */
  private def roundToSixDecimals(value: Double): Double =
    BigDecimal(value).setScale(6, BigDecimal.RoundingMode.HALF_UP).toDouble
}

0 comments on commit 0337b2b

Please sign in to comment.