From 0337b2bee1f312996a9cb867ec6be245ac0c0c9f Mon Sep 17 00:00:00 2001 From: Anqi Date: Tue, 30 Nov 2021 17:04:54 +0800 Subject: [PATCH] add Clustering Coefficient algorithm (#15) * add Clustering Coefficient algorithm * format result * remove CC license --- example/src/main/resources/data.csv | 14 --- example/src/main/resources/edge | 10 -- example/src/main/resources/test.csv | 0 example/src/main/resources/vertex | 10 -- .../ClusteringCoefficientExample.scala | 9 ++ .../algorithm/DegreeStaticExample.scala | 8 ++ .../algorithm/GraphTriangleCountExample.scala | 8 ++ .../vesoft/nebula/algorithm/LpaExample.scala | 8 ++ .../nebula/algorithm/PageRankExample.scala | 8 ++ .../vesoft/nebula/algorithm/ReadData.scala | 7 ++ .../vesoft/nebula/algorithm/SCCExample.scala | 7 ++ .../algorithm/ShortestPathExample.scala | 7 ++ .../algorithm/TriangleCountExample.scala | 7 ++ .../vesoft/nebula/algorithm/WCCExample.scala | 7 ++ .../src/main/resources/application.conf | 9 +- .../com/vesoft/nebula/algorithm/Main.scala | 6 ++ .../nebula/algorithm/config/AlgoConfig.scala | 18 ++++ .../nebula/algorithm/config/Configs.scala | 33 ++++--- .../lib/ClusteringCoefficientAlgo.scala | 99 +++++++++++++++++++ 19 files changed, 224 insertions(+), 51 deletions(-) delete mode 100644 example/src/main/resources/edge create mode 100644 example/src/main/resources/test.csv delete mode 100644 example/src/main/resources/vertex create mode 100644 example/src/main/scala/com/vesoft/nebula/algorithm/ClusteringCoefficientExample.scala create mode 100644 example/src/main/scala/com/vesoft/nebula/algorithm/DegreeStaticExample.scala create mode 100644 example/src/main/scala/com/vesoft/nebula/algorithm/GraphTriangleCountExample.scala create mode 100644 example/src/main/scala/com/vesoft/nebula/algorithm/LpaExample.scala create mode 100644 example/src/main/scala/com/vesoft/nebula/algorithm/PageRankExample.scala create mode 100644 example/src/main/scala/com/vesoft/nebula/algorithm/ReadData.scala create mode 100644 example/src/main/scala/com/vesoft/nebula/algorithm/SCCExample.scala create mode 100644 example/src/main/scala/com/vesoft/nebula/algorithm/ShortestPathExample.scala create mode 100644 example/src/main/scala/com/vesoft/nebula/algorithm/TriangleCountExample.scala create mode 100644 example/src/main/scala/com/vesoft/nebula/algorithm/WCCExample.scala create mode 100644 nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/ClusteringCoefficientAlgo.scala diff --git a/example/src/main/resources/data.csv b/example/src/main/resources/data.csv index d4966c1..e69de29 100644 --- a/example/src/main/resources/data.csv +++ b/example/src/main/resources/data.csv @@ -1,14 +0,0 @@ -id,col1,col2,col3,col4,col5,col6,col7,col8,col9,col10,col11,col12,col13 -1,Tom,tom,10,20,30,40,2021-01-27,2021-01-01T12:10:10,43535232,true,1.0,2.0,10:10:10 -2,Jina,Jina,11,21,31,41,2021-01-28,2021-01-02T12:10:10,43535232,false,1.1,2.1,11:10:10 -3,Tim,Tim,12,22,32,42,2021-01-29,2021-01-03T12:10:10,43535232,false,1.2,2.2,12:10:10 -4,张三,张三,13,23,33,43,2021-01-30,2021-01-04T12:10:10,43535232,true,1.3,2.3,13:10:10 -5,李四,李四,14,24,34,44,2021-02-01,2021-01-05T12:10:10,43535232,false,1.4,2.4,14:10:10 -6,王五,王五,15,25,35,45,2021-02-02,2021-01-06T12:10:10,0,false,1.5,2.5,15:10:10 -7,Jina,Jina,16,26,36,46,2021-02-03,2021-01-07T12:10:10,43535232,true,1.6,2.6,16:10:10 -8,Jina,Jina,17,27,37,47,2021-02-04,2021-01-08T12:10:10,43535232,false,1.7,2.7,17:10:10 -9,Jina,Jina,18,28,38,48,2021-02-05,2021-01-09T12:10:10,43535232,true,1.8,2.8,18:10:10 -10,Jina,Jina,19,29,39,49,2021-02-06,2021-01-10T12:10:10,43535232,false,1.9,2.9,19:10:10 --1,Jina,Jina,20,30,40,50,2021-02-07,2021-02-11T12:10:10,43535232,false,2.0,3.0,20:10:10 --2,Jina,Jina,21,31,41,51,2021-02-08,2021-03-12T12:10:10,43535232,false,2.1,3.1,21:10:10 --3,Jina,Jina,22,32,42,52,2021-02-09,2021-04-13T12:10:10,43535232,false,2.2,3.2,22:10:10 diff --git a/example/src/main/resources/edge b/example/src/main/resources/edge deleted file mode 100644 index 0681588..0000000 --- a/example/src/main/resources/edge +++ /dev/null @@ -1,10 +0,0 @@ -{"src":12345,"dst":23456,"degree":34, "descr": "aaa","timep": "2020-01-01"} -{"src":11111,"dst":22222,"degree":33, "descr": "aaa","timep": "2020-01-01"} -{"src":11111,"dst":33333,"degree":32, "descr": "a\baa","timep": "2020-01-01"} -{"src":11111,"dst":44444,"degree":31, "descr": "aaa","timep": "2020-01-01"} -{"src":22222,"dst":55555,"degree":30, "descr": "a\naa","timep": "2020-01-01"} -{"src":33333,"dst":44444,"degree":29, "descr": "aaa","timep": "2020-01-01"} -{"src":33333,"dst":55555,"degree":28, "descr": "aa\ta","timep": "2020-01-01"} -{"src":44444,"dst":22222,"degree":27, "descr": "aaa","timep": "2020-01-01"} -{"src":44444,"dst":55555,"degree":26, "descr": "aaa","timep": "2020-01-01"} -{"src":22222,"dst":66666,"degree":25, "descr": "aaa","timep": "2020-01-01"} \ No newline at end of file diff --git a/example/src/main/resources/test.csv b/example/src/main/resources/test.csv new file mode 100644 index 0000000..e69de29 diff --git a/example/src/main/resources/vertex b/example/src/main/resources/vertex deleted file mode 100644 index f66c62f..0000000 --- a/example/src/main/resources/vertex +++ /dev/null @@ -1,10 +0,0 @@ -{"id":12,"name":"Tom","age":20,"born": "2000-01-01"} -{"id":13,"name":"Bob","age":21,"born": "1999-01-02"} -{"id":14,"name":"Jane","age":22,"born": "1998-01-03"} -{"id":15,"name":"Jena","age":23,"born": "1997-01-04"} -{"id":16,"name":"Nic","age":24,"born": "1996-01-05"} -{"id":17,"name":"Mei","age":25,"born": "1995-01-06"} -{"id":18,"name":"HH","age":26,"born": "1994-01-07"} -{"id":19,"name":"Tyler","age":27,"born": "1993-01-08"} -{"id":20,"name":"Ber","age":28,"born": "1992-01-09"} -{"id":21,"name":"Mercy","age":29,"born": "1991-01-10"} \ No newline at end of file diff --git a/example/src/main/scala/com/vesoft/nebula/algorithm/ClusteringCoefficientExample.scala b/example/src/main/scala/com/vesoft/nebula/algorithm/ClusteringCoefficientExample.scala new file mode 100644 index 0000000..e20207b --- /dev/null +++ b/example/src/main/scala/com/vesoft/nebula/algorithm/ClusteringCoefficientExample.scala @@ -0,0 +1,9 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License, + * attached with Common Clause Condition 1.0, found in the LICENSES directory. + */ + +package com.vesoft.nebula.algorithm + +object ClusteringCoefficientExample {} diff --git a/example/src/main/scala/com/vesoft/nebula/algorithm/DegreeStaticExample.scala b/example/src/main/scala/com/vesoft/nebula/algorithm/DegreeStaticExample.scala new file mode 100644 index 0000000..f1f5fda --- /dev/null +++ b/example/src/main/scala/com/vesoft/nebula/algorithm/DegreeStaticExample.scala @@ -0,0 +1,8 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +package com.vesoft.nebula.algorithm + +object DegreeStaticExample {} diff --git a/example/src/main/scala/com/vesoft/nebula/algorithm/GraphTriangleCountExample.scala b/example/src/main/scala/com/vesoft/nebula/algorithm/GraphTriangleCountExample.scala new file mode 100644 index 0000000..b42008f --- /dev/null +++ b/example/src/main/scala/com/vesoft/nebula/algorithm/GraphTriangleCountExample.scala @@ -0,0 +1,8 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +package com.vesoft.nebula.algorithm + +object GraphTriangleCountExample {} diff --git a/example/src/main/scala/com/vesoft/nebula/algorithm/LpaExample.scala b/example/src/main/scala/com/vesoft/nebula/algorithm/LpaExample.scala new file mode 100644 index 0000000..7504501 --- /dev/null +++ b/example/src/main/scala/com/vesoft/nebula/algorithm/LpaExample.scala @@ -0,0 +1,8 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +package com.vesoft.nebula.algorithm + +object LpaExample {} diff --git a/example/src/main/scala/com/vesoft/nebula/algorithm/PageRankExample.scala b/example/src/main/scala/com/vesoft/nebula/algorithm/PageRankExample.scala new file mode 100644 index 0000000..523f722 --- /dev/null +++ b/example/src/main/scala/com/vesoft/nebula/algorithm/PageRankExample.scala @@ -0,0 +1,8 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +package com.vesoft.nebula.algorithm + +object PageRankExample {} diff --git a/example/src/main/scala/com/vesoft/nebula/algorithm/ReadData.scala b/example/src/main/scala/com/vesoft/nebula/algorithm/ReadData.scala new file mode 100644 index 0000000..e99e3db --- /dev/null +++ b/example/src/main/scala/com/vesoft/nebula/algorithm/ReadData.scala @@ -0,0 +1,7 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +package com.vesoft.nebula.algorithm +object ReadData {} diff --git a/example/src/main/scala/com/vesoft/nebula/algorithm/SCCExample.scala b/example/src/main/scala/com/vesoft/nebula/algorithm/SCCExample.scala new file mode 100644 index 0000000..1da84a1 --- /dev/null +++ b/example/src/main/scala/com/vesoft/nebula/algorithm/SCCExample.scala @@ -0,0 +1,7 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +package com.vesoft.nebula.algorithm +object SCCExample {} diff --git a/example/src/main/scala/com/vesoft/nebula/algorithm/ShortestPathExample.scala b/example/src/main/scala/com/vesoft/nebula/algorithm/ShortestPathExample.scala new file mode 100644 index 0000000..f936845 --- /dev/null +++ b/example/src/main/scala/com/vesoft/nebula/algorithm/ShortestPathExample.scala @@ -0,0 +1,7 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +package com.vesoft.nebula.algorithm +object ShortestPathExample {} diff --git a/example/src/main/scala/com/vesoft/nebula/algorithm/TriangleCountExample.scala b/example/src/main/scala/com/vesoft/nebula/algorithm/TriangleCountExample.scala new file mode 100644 index 0000000..b17f64d --- /dev/null +++ b/example/src/main/scala/com/vesoft/nebula/algorithm/TriangleCountExample.scala @@ -0,0 +1,7 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +package com.vesoft.nebula.algorithm +object TriangleCountExample {} diff --git a/example/src/main/scala/com/vesoft/nebula/algorithm/WCCExample.scala b/example/src/main/scala/com/vesoft/nebula/algorithm/WCCExample.scala new file mode 100644 index 0000000..289b4d2 --- /dev/null +++ b/example/src/main/scala/com/vesoft/nebula/algorithm/WCCExample.scala @@ -0,0 +1,7 @@ +/* Copyright (c) 2020 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +package com.vesoft.nebula.algorithm +object WCCExample {} diff --git a/nebula-algorithm/src/main/resources/application.conf b/nebula-algorithm/src/main/resources/application.conf index 8536f9a..64ae4e7 100644 --- a/nebula-algorithm/src/main/resources/application.conf +++ b/nebula-algorithm/src/main/resources/application.conf @@ -121,11 +121,18 @@ # graphTriangleCount parameter graphtrianglecount:{} - # Betweenness centrality parameter + # Betweenness centrality parameter. maxIter parameter means the max times of iterations. betweenness:{ maxIter:5 } + # Clustering Coefficient parameter. The type parameter has two choice, local or global + # local type will compute the clustering coefficient for each vertex, and print the average coefficient for graph. + # global type just compute the graph's clustering coefficient. + clusteringcoefficient:{ + type: local + } + # SingleSourceShortestPathAlgo parameter singlesourceshortestpath:{ sourceid:"1" diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/Main.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/Main.scala index 5c19e33..481e68c 100644 --- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/Main.scala +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/Main.scala @@ -10,6 +10,7 @@ import com.vesoft.nebula.algorithm.config.{ AlgoConfig, BetweennessConfig, CcConfig, + CoefficientConfig, Configs, HanpConfig, KCoreConfig, @@ -22,6 +23,7 @@ import com.vesoft.nebula.algorithm.config.{ } import com.vesoft.nebula.algorithm.lib.{ BetweennessCentralityAlgo, + ClusteringCoefficientAlgo, ClosenessAlgo, ConnectedComponentsAlgo, DegreeStaticAlgo, @@ -168,6 +170,10 @@ object Main { case "graphtrianglecount" => { GraphTriangleCountAlgo(spark, dataSet) } + case "clusteringcoefficient" => { + val coefficientConfig = CoefficientConfig.getCoefficientConfig(configs) + ClusteringCoefficientAlgo(spark, dataSet, coefficientConfig) + } case "closeness" => { ClosenessAlgo(spark, dataSet, hasWeight) } diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/AlgoConfig.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/AlgoConfig.scala index 2dc54a3..4b1be6b 100644 --- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/AlgoConfig.scala +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/AlgoConfig.scala @@ -149,6 +149,24 @@ object BetweennessConfig { } } +/** + * ClusterCoefficient + * algoType has two options: local or global + */ +case class CoefficientConfig(algoType: String) + +object CoefficientConfig { + var algoType: String = _ + + def getCoefficientConfig(configs: Configs): CoefficientConfig = { + val coefficientConfig = configs.algorithmConfig.map + algoType = coefficientConfig("algorithm.clustercoefficient.type") + assert(algoType.equalsIgnoreCase("local") || algoType.equalsIgnoreCase("global"), + "ClusterCoefficient only support local or global type.") + CoefficientConfig(algoType) + } +} + /** * Hanp */ diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/Configs.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/Configs.scala index 7ea2427..aa9aad2 100644 --- a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/Configs.scala +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/config/Configs.scala @@ -348,20 +348,21 @@ object Configs { } object AlgoConstants { - val ALGO_ID_COL: String = "_id" - val PAGERANK_RESULT_COL: String = "pagerank" - val LOUVAIN_RESULT_COL: String = "louvain" - val KCORE_RESULT_COL: String = "kcore" - val LPA_RESULT_COL: String = "lpa" - val CC_RESULT_COL: String = "cc" - val SCC_RESULT_COL: String = "scc" - val BETWEENNESS_RESULT_COL: String = "betweennedss" - val SHORTPATH_RESULT_COL: String = "shortestpath" - val DEGREE_RESULT_COL: String = "degree" - val INDEGREE_RESULT_COL: String = "inDegree" - val OUTDEGREE_RESULT_COL: String = "outDegree" - val TRIANGLECOUNT_RESULT_COL: String = "tranglecount" - val CLOSENESS_RESULT_COL: String = "closeness" - val HANP_RESULT_COL: String = "hanp" - val NODE2VEC_RESULT_COL: String = "node2vec" + val ALGO_ID_COL: String = "_id" + val PAGERANK_RESULT_COL: String = "pagerank" + val LOUVAIN_RESULT_COL: String = "louvain" + val KCORE_RESULT_COL: String = "kcore" + val LPA_RESULT_COL: String = "lpa" + val CC_RESULT_COL: String = "cc" + val SCC_RESULT_COL: String = "scc" + val BETWEENNESS_RESULT_COL: String = "betweennedss" + val SHORTPATH_RESULT_COL: String = "shortestpath" + val DEGREE_RESULT_COL: String = "degree" + val INDEGREE_RESULT_COL: String = "inDegree" + val OUTDEGREE_RESULT_COL: String = "outDegree" + val TRIANGLECOUNT_RESULT_COL: String = "tranglecount" + val CLUSTERCOEFFICIENT_RESULT_COL: String = "clustercoefficient" + val CLOSENESS_RESULT_COL: String = "closeness" + val HANP_RESULT_COL: String = "hanp" + val NODE2VEC_RESULT_COL: String = "node2vec" } diff --git a/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/ClusteringCoefficientAlgo.scala b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/ClusteringCoefficientAlgo.scala new file mode 100644 index 0000000..a7403d7 --- /dev/null +++ b/nebula-algorithm/src/main/scala/com/vesoft/nebula/algorithm/lib/ClusteringCoefficientAlgo.scala @@ -0,0 +1,99 @@ +/* Copyright (c) 2021 vesoft inc. All rights reserved. + * + * This source code is licensed under Apache 2.0 License. + */ + +package com.vesoft.nebula.algorithm.lib + +import com.vesoft.nebula.algorithm.config.{AlgoConstants, CoefficientConfig, KCoreConfig} +import com.vesoft.nebula.algorithm.utils.NebulaUtil +import org.apache.log4j.Logger +import org.apache.spark.graphx.Graph +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.types.{DoubleType, IntegerType, LongType, StructField, StructType} +import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession} + +object ClusteringCoefficientAlgo { + private val LOGGER = Logger.getLogger(this.getClass) + + val ALGORITHM: String = "ClusterCoefficientAlgo" + + /** + * run the clusterCoefficient algorithm for nebula graph + */ + def apply(spark: SparkSession, + dataset: Dataset[Row], + coefficientConfig: CoefficientConfig): DataFrame = { + + val graph: Graph[None.type, Double] = NebulaUtil.loadInitGraph(dataset, false) + var algoResult: DataFrame = null + + if (coefficientConfig.algoType.equalsIgnoreCase("local")) { + // compute local clustering coefficient + val localClusterCoefficient = executeLocalCC(graph) + val schema = StructType( + List( + StructField(AlgoConstants.ALGO_ID_COL, LongType, nullable = false), + StructField(AlgoConstants.CLUSTERCOEFFICIENT_RESULT_COL, DoubleType, nullable = true) + )) + algoResult = spark.sqlContext.createDataFrame(localClusterCoefficient, schema) + // print the graph's average clustering coefficient + + import spark.implicits._ + val vertexNum = algoResult.count() + + val averageCoeff: Double = + if (vertexNum == 0) 0 + else + algoResult.map(row => row.get(1).toString.toDouble).reduce(_ + _) / algoResult.count() + LOGGER.info(s"graph's average clustering coefficient is ${averageCoeff}") + + } else { + // compute global clustering coefficient + val GlobalClusterCoefficient: Double = executeGlobalCC(graph) + val list = List(GlobalClusterCoefficient) + val rdd = spark.sparkContext.parallelize(list).map(row => Row(row)) + + val schema = StructType( + List( + StructField("globalClusterCoefficient", DoubleType, nullable = false) + )) + algoResult = spark.sqlContext.createDataFrame(rdd, schema) + } + algoResult + } + + /** + * execute local cluster coefficient + */ + def executeLocalCC(graph: Graph[None.type, Double]): RDD[Row] = { + // compute the actual triangle count for each vertex + val triangleNum = graph.triangleCount().vertices + // compute the open triangle count for each vertex + val idealTriangleNum = graph.degrees.mapValues(degree => degree * (degree - 1) / 2) + val result = triangleNum + .innerJoin(idealTriangleNum) { (vid, actualCount, idealCount) => + { + if (idealCount == 0) 0.0 + else (actualCount / idealCount * 1.0).formatted("%.6f").toDouble + } + } + .map(vertex => Row(vertex._1, vertex._2)) + + result + } + + /** + * execute global cluster coefficient + */ + def executeGlobalCC(graph: Graph[None.type, Double]): Double = { + // compute the number of closed triangle + val closedTriangleNum = graph.triangleCount().vertices.map(_._2).reduce(_ + _) + // compute the number of open triangle and closed triangle (According to C(n,2)=n*(n-1)/2) + val triangleNum = graph.degrees.map(vertex => (vertex._2 * (vertex._2 - 1)) / 2.0).reduce(_ + _) + if (triangleNum == 0) + 0.0 + else + (closedTriangleNum / triangleNum * 1.0).formatted("%.6f").toDouble + } +}