This repository has been archived by the owner on Oct 18, 2021. It is now read-only.

Merge pull request #100 from Nicole00/datasource_maxCompute
Support MaxCompute datasource
Nicole00 committed Jul 7, 2021
2 parents ffb8b9f + 4e2b65c commit fe314b7
Showing 7 changed files with 201 additions and 65 deletions.
10 changes: 10 additions & 0 deletions nebula-exchange/pom.xml
@@ -700,6 +700,16 @@
<artifactId>scala-xml</artifactId>
<version>${scala-xml.version}</version>
</dependency>
<dependency>
<groupId>com.aliyun.odps</groupId>
<artifactId>odps-spark-datasource_2.11</artifactId>
<version>3.3.8-public</version>
</dependency>
<dependency>
<groupId>com.aliyun.emr</groupId>
<artifactId>emr-maxcompute_2.11</artifactId>
<version>2.2.0</version>
</dependency>
</dependencies>
<repositories>
<repository>
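For anyone pulling these coordinates outside Maven, a roughly equivalent sbt declaration would be the sketch below (not part of this change; the artifacts and versions are copied from the pom.xml hunk above):

// build.sbt (sketch only)
libraryDependencies ++= Seq(
  "com.aliyun.odps" % "odps-spark-datasource_2.11" % "3.3.8-public",
  "com.aliyun.emr"  % "emr-maxcompute_2.11"        % "2.2.0"
)
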
53 changes: 53 additions & 0 deletions nebula-exchange/src/main/resources/application.conf
@@ -237,6 +237,31 @@
batch: 10
interval.seconds: 10
}

# MaxCompute
{
name: tag-name-8
type:{
source:maxcompute
sink:client
}
table:table
project:project
odpsUrl:"http://service.cn-hangzhou.maxcompute.aliyun.com/api"
tunnelUrl:"http://dt.cn-hangzhou.maxcompute.aliyun.com"
accessKeyId:xxx
accessKeySecret:xxx
partitionSpec:"dt='partition1'"
# The MaxCompute SQL sentence must reference only the table name; make sure it matches the value of {table} above.
sentence:"select id, maxcompute-field-0, maxcompute-field-1, maxcompute-field-2 from table where id < 10"
fields:[maxcompute-field-0, maxcompute-field-1]
nebula.fields:[nebula-field-0, nebula-field-1]
vertex:{
field: maxcompute-field-2
}
partition: 10
batch:10
}
]

# Processing edges
@@ -420,5 +445,33 @@
batch: 1000
interval.seconds: 10
}

# MaxCompute
{
name: edge-name-7
type:{
source:maxcompute
sink:client
}
table:table
project:project
odpsUrl:"http://service.cn-hangzhou.maxcompute.aliyun.com/api"
tunnelUrl:"http://dt.cn-hangzhou.maxcompute.aliyun.com"
accessKeyId:xxx
accessKeySecret:xxx
partitionSpec:"dt='partition1'"
# The MaxCompute SQL sentence must reference only the table name.
sentence:"select * from table"
fields:[maxcompute-field-0, maxcompute-field-1]
nebula.fields:[nebula-field-0, nebula-field-1]
source:{
field: maxcompute-field-2
}
target:{
field: maxcompute-field-3
}
partition: 10
batch:10
}
]
}
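
The new tag and edge blocks are plain HOCON, so their keys can be checked independently of Exchange with the Typesafe Config API (the same API the updated Configs.scala uses below via hasPath/getString). A minimal sketch, not part of the commit, over a trimmed copy of the vertex example above:

import com.typesafe.config.ConfigFactory

// Trimmed copy of the new MaxCompute vertex block; values are the placeholders from the example.
val hocon =
  """
    |name: tag-name-8
    |type: { source: maxcompute, sink: client }
    |table: table
    |project: project
    |odpsUrl: "http://service.cn-hangzhou.maxcompute.aliyun.com/api"
    |tunnelUrl: "http://dt.cn-hangzhou.maxcompute.aliyun.com"
    |partitionSpec: "dt='partition1'"
    |partition: 10
    |batch: 10
  """.stripMargin

val conf = ConfigFactory.parseString(hocon)
assert(conf.getString("type.source") == "maxcompute") // maps to SourceCategory.MAXCOMPUTE in Configs.scala below
assert(conf.getInt("partition") == 10)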
@@ -17,6 +17,7 @@ import com.vesoft.nebula.exchange.config.{
HiveSourceConfigEntry,
JanusGraphSourceConfigEntry,
KafkaSourceConfigEntry,
MaxComputeConfigEntry,
MySQLSourceConfigEntry,
Neo4JSourceConfigEntry,
PulsarSourceConfigEntry,
@@ -31,13 +32,14 @@ import com.vesoft.nebula.exchange.reader.{
JSONReader,
JanusGraphReader,
KafkaReader,
MaxcomputeReader,
MySQLReader,
Neo4JReader,
ORCReader,
ParquetReader
ParquetReader,
PulsarReader
}
import com.vesoft.nebula.exchange.processor.ReloadProcessor
import com.vesoft.nebula.exchange.reader.PulsarReader
import org.apache.log4j.Logger
import org.apache.spark.SparkConf

@@ -272,6 +274,10 @@ object Exchange {
val hbaseSourceConfigEntry = config.asInstanceOf[HBaseSourceConfigEntry]
val reader = new HBaseReader(session, hbaseSourceConfigEntry)
Some(reader.read())
case SourceCategory.MAXCOMPUTE =>
val maxComputeConfigEntry = config.asInstanceOf[MaxComputeConfigEntry]
val reader = new MaxcomputeReader(session, maxComputeConfigEntry)
Some(reader.read())
case _ => {
LOG.error(s"Data source ${config.category} not supported")
None
@@ -472,17 +472,18 @@ object Configs {
*/
private[this] def toSourceCategory(category: String): SourceCategory.Value = {
category.trim.toUpperCase match {
case "PARQUET" => SourceCategory.PARQUET
case "ORC" => SourceCategory.ORC
case "JSON" => SourceCategory.JSON
case "CSV" => SourceCategory.CSV
case "HIVE" => SourceCategory.HIVE
case "NEO4J" => SourceCategory.NEO4J
case "KAFKA" => SourceCategory.KAFKA
case "MYSQL" => SourceCategory.MYSQL
case "PULSAR" => SourceCategory.PULSAR
case "HBASE" => SourceCategory.HBASE
case _ => throw new IllegalArgumentException(s"${category} not support")
case "PARQUET" => SourceCategory.PARQUET
case "ORC" => SourceCategory.ORC
case "JSON" => SourceCategory.JSON
case "CSV" => SourceCategory.CSV
case "HIVE" => SourceCategory.HIVE
case "NEO4J" => SourceCategory.NEO4J
case "KAFKA" => SourceCategory.KAFKA
case "MYSQL" => SourceCategory.MYSQL
case "PULSAR" => SourceCategory.PULSAR
case "HBASE" => SourceCategory.HBASE
case "MAXCOMPUTE" => SourceCategory.MAXCOMPUTE
case _ => throw new IllegalArgumentException(s"${category} not support")
}
}

@@ -609,6 +610,31 @@
config.getString("table"),
config.getString("columnFamily"),
fields.toSet.toList)
case SourceCategory.MAXCOMPUTE => {
val partitionSpec = if (config.hasPath("partitionSpec")) {
config.getString("partitionSpec")
} else {
null
}

val sentence = if (config.hasPath("sentence")) {
config.getString("sentence")
} else {
null
}

MaxComputeConfigEntry(
SourceCategory.MAXCOMPUTE,
config.getString("odpsUrl"),
config.getString("tunnelUrl"),
config.getString("table"),
config.getString("project"),
config.getString("accessKeyId"),
config.getString("accessKeySecret"),
partitionSpec,
sentence
)
}
case _ =>
throw new IllegalArgumentException("Unsupported data source")
}
@@ -25,6 +25,7 @@ object SourceCategory extends Enumeration {
val JANUS_GRAPH = Value("JANUS GRAPH")
val MYSQL = Value("MYSQL")
val HBASE = Value("HBASE")
val MAXCOMPUTE = Value("MAXCOMPUTE")

val SOCKET = Value("SOCKET")
val KAFKA = Value("KAFKA")
@@ -217,3 +218,28 @@ case class HBaseSourceConfigEntry(override val category: SourceCategory.Value,
s"HBase source host: $host, port: $port, table: $table"
}
}

/**
* MaxComputeConfigEntry
*/
case class MaxComputeConfigEntry(override val category: SourceCategory.Value,
odpsUrl: String,
tunnelUrl: String,
table: String,
project: String,
accessKeyId: String,
accessKeySecret: String,
partitionSpec: String,
override val sentence: String)
extends ServerDataSourceConfigEntry {
require(
!odpsUrl.trim.isEmpty && !tunnelUrl.trim.isEmpty && !table.trim.isEmpty && !project.trim.isEmpty
&& !accessKeyId.trim.isEmpty && !accessKeySecret.trim.isEmpty)

override def toString: String = {
s"MaxCompute source {odpsUrl: $odpsUrl, tunnelUrl: $tunnelUrl, table: $table, project: $project, " +
s"keyId: $accessKeyId, keySecret: $accessKeySecret, partitionSpec:$partitionSpec, " +
s"sentence:$sentence}"
}

}
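
A quick illustration (not part of the commit; every value below is a placeholder): constructing the entry directly shows the require guard and the toString output, which includes the access key and secret.

import com.vesoft.nebula.exchange.config.{MaxComputeConfigEntry, SourceCategory}

val entry = MaxComputeConfigEntry(
  SourceCategory.MAXCOMPUTE,
  "http://service.cn-hangzhou.maxcompute.aliyun.com/api",
  "http://dt.cn-hangzhou.maxcompute.aliyun.com",
  "table",
  "project",
  "ak-id",              // placeholder accessKeyId
  "ak-secret",          // placeholder accessKeySecret
  null,                 // no partitionSpec configured
  "select * from table"
)
println(entry)          // note: the credentials appear in the output, so be careful with logs

// A blank odpsUrl (or any other blank required field) makes the require(...)
// throw an IllegalArgumentException at construction time.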
@@ -11,6 +11,7 @@ import com.vesoft.nebula.exchange.config.{
HBaseSourceConfigEntry,
HiveSourceConfigEntry,
JanusGraphSourceConfigEntry,
MaxComputeConfigEntry,
MySQLSourceConfigEntry,
Neo4JSourceConfigEntry,
ServerDataSourceConfigEntry
@@ -259,3 +260,35 @@ class HBaseReader(override val session: SparkSession, hbaseConfig: HBaseSourceCo
dataFrame
}
}

/**
* MaxCompute Reader
*/
class MaxcomputeReader(override val session: SparkSession, maxComputeConfig: MaxComputeConfigEntry)
extends ServerBaseReader(session, maxComputeConfig.sentence) {

override def read(): DataFrame = {
var dfReader = session.read
.format("org.apache.spark.aliyun.odps.datasource")
.option("odpsUrl", maxComputeConfig.odpsUrl)
.option("tunnelUrl", maxComputeConfig.tunnelUrl)
.option("table", maxComputeConfig.table)
.option("project", maxComputeConfig.project)
.option("accessKeyId", maxComputeConfig.accessKeyId)
.option("accessKeySecret", maxComputeConfig.accessKeySecret)

// apply the partition spec only when one is configured
if (maxComputeConfig.partitionSpec != null) {
dfReader = dfReader.option("partitionSpec", maxComputeConfig.partitionSpec)
}

val df = dfReader.load()
import session._
if (maxComputeConfig.sentence == null) {
df
} else {
df.createOrReplaceTempView(s"${maxComputeConfig.table}")
session.sql(maxComputeConfig.sentence)
}
}
}
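
A minimal end-to-end sketch of driving the reader on its own (not part of the commit; endpoints and credentials are the placeholders from the example config):

import org.apache.spark.sql.SparkSession
import com.vesoft.nebula.exchange.config.{MaxComputeConfigEntry, SourceCategory}
import com.vesoft.nebula.exchange.reader.MaxcomputeReader

val spark = SparkSession.builder()
  .appName("maxcompute-read-sketch")
  .master("local[*]")
  .getOrCreate()

val maxComputeConfig = MaxComputeConfigEntry(
  SourceCategory.MAXCOMPUTE,
  "http://service.cn-hangzhou.maxcompute.aliyun.com/api",
  "http://dt.cn-hangzhou.maxcompute.aliyun.com",
  "table",
  "project",
  "ak-id",                                  // placeholder credentials
  "ak-secret",
  "dt='partition1'",
  "select id from table where id < 10"      // applied as Spark SQL over a temp view named after the table
)

val df = new MaxcomputeReader(spark, maxComputeConfig).read()
df.show()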
@@ -105,59 +105,41 @@ class NebulaUtilsSuite {

@Test
def getPartitionId(): Unit = {
val storageClient = new StorageClient("127.0.0.1", 9559)
storageClient.connect()
for (i <- 1 to 17) {
val vid =
if (i <= 12) Integer.toString(i)
else if (i == 13) "-1"
else if (i == 14) "-2"
else if (i == 15) "-3"
else if (i == 16) "19"
else "22"
val partitionId = NebulaUtils.getPartitionId(vid, 10, VidType.STRING)
val scanResultIter = storageClient.scanVertex("test_string", partitionId, "person")
var containVertex = false
while (scanResultIter.hasNext) {
val scanResult = scanResultIter.next()
val map = scanResult.getVidVertices
for (value <- map.keySet().asScala if !containVertex) {
if (value.asString().equals(vid)) {
containVertex = true
}
}
}
if (!containVertex) {
LOG.error("vid={},partId={}", vid, partitionId)
}
assert(containVertex)
}
// for String type vid
assert(NebulaUtils.getPartitionId("1", 10, VidType.STRING) == 6)
assert(NebulaUtils.getPartitionId("2", 10, VidType.STRING) == 1)
assert(NebulaUtils.getPartitionId("3", 10, VidType.STRING) == 4)
assert(NebulaUtils.getPartitionId("4", 10, VidType.STRING) == 7)
assert(NebulaUtils.getPartitionId("5", 10, VidType.STRING) == 10)
assert(NebulaUtils.getPartitionId("6", 10, VidType.STRING) == 2)
assert(NebulaUtils.getPartitionId("7", 10, VidType.STRING) == 3)
assert(NebulaUtils.getPartitionId("8", 10, VidType.STRING) == 7)
assert(NebulaUtils.getPartitionId("9", 10, VidType.STRING) == 5)
assert(NebulaUtils.getPartitionId("10", 10, VidType.STRING) == 4)
assert(NebulaUtils.getPartitionId("11", 10, VidType.STRING) == 9)
assert(NebulaUtils.getPartitionId("12", 10, VidType.STRING) == 4)
assert(NebulaUtils.getPartitionId("-1", 10, VidType.STRING) == 1)
assert(NebulaUtils.getPartitionId("-2", 10, VidType.STRING) == 6)
assert(NebulaUtils.getPartitionId("-3", 10, VidType.STRING) == 1)
assert(NebulaUtils.getPartitionId("19", 10, VidType.STRING) == 9)
assert(NebulaUtils.getPartitionId("22", 10, VidType.STRING) == 8)

for (i <- 1 to 17) {
val vid =
if (i <= 12) Integer.toString(i)
else if (i == 13) "-1"
else if (i == 14) "-2"
else if (i == 15) "-3"
else if (i == 16) "19"
else "22"
val partitionId = NebulaUtils.getPartitionId(vid, 10, VidType.INT)
val scanResultIter = storageClient.scanVertex("test_int", partitionId, "person")
var containVertex = false
while (scanResultIter.hasNext) {
val scanResult = scanResultIter.next()
val map = scanResult.getVidVertices
for (value <- map.keySet().asScala if !containVertex) {
if (value.asLong() == vid.toLong) {
containVertex = true
}
}
}
if (!containVertex) {
LOG.error("vid={},partId={}", vid, partitionId)
}
assert(containVertex)
}
// for int type vid
assert(NebulaUtils.getPartitionId("1", 10, VidType.INT) == 2)
assert(NebulaUtils.getPartitionId("2", 10, VidType.INT) == 3)
assert(NebulaUtils.getPartitionId("3", 10, VidType.INT) == 4)
assert(NebulaUtils.getPartitionId("4", 10, VidType.INT) == 5)
assert(NebulaUtils.getPartitionId("5", 10, VidType.INT) == 6)
assert(NebulaUtils.getPartitionId("6", 10, VidType.INT) == 7)
assert(NebulaUtils.getPartitionId("7", 10, VidType.INT) == 8)
assert(NebulaUtils.getPartitionId("8", 10, VidType.INT) == 9)
assert(NebulaUtils.getPartitionId("9", 10, VidType.INT) == 10)
assert(NebulaUtils.getPartitionId("10", 10, VidType.INT) == 1)
assert(NebulaUtils.getPartitionId("11", 10, VidType.INT) == 2)
assert(NebulaUtils.getPartitionId("12", 10, VidType.INT) == 3)
assert(NebulaUtils.getPartitionId("-1", 10, VidType.INT) == 6)
assert(NebulaUtils.getPartitionId("-2", 10, VidType.INT) == 5)
assert(NebulaUtils.getPartitionId("-3", 10, VidType.INT) == 4)
}

@Test