update default sentence to null #64

Merged 3 commits on Jan 27, 2022
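Summary: this PR changes the default value of the optional sentence config key from the empty string to null for the MySQL and PostgreSQL sources, so the JDBC readers return the loaded DataFrame directly when no query is configured instead of routing a statement through Spark SQL. It also drops the postgresql.database assertion in ConfigsSuite, adds PostgreSQL tag and edge examples plus missing ranking fields to the Spark 2.4 application.conf, and reflows several long import and constructor lines.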
@@ -658,7 +658,7 @@ object Configs {
config.getString("table"),
config.getString("user"),
config.getString("password"),
- getOrElse(config, "sentence", "")
+ getOrElse(config, "sentence", null)
)
case SourceCategory.POSTGRESQL =>
PostgreSQLSourceConfigEntry(
@@ -669,7 +669,7 @@ object Configs {
config.getString("table"),
config.getString("user"),
config.getString("password"),
- getOrElse(config, "sentence", "")
+ getOrElse(config, "sentence", null)
)
case SourceCategory.KAFKA =>
val intervalSeconds =
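The getOrElse helper itself is not part of this diff. As context, here is a minimal sketch of what the call sites above imply, assuming Typesafe Config and a helper that falls back to the supplied default when the key is absent; the helper name and signature are inferred, not confirmed by the diff:

    import com.typesafe.config.{Config, ConfigFactory}

    object GetOrElseSketch {
      // Assumed shape of Configs.getOrElse: return the configured string when
      // the path exists, otherwise the supplied default, which this PR changes
      // from "" to null for the optional `sentence` key.
      def getOrElse(config: Config, key: String, defaultValue: String): String =
        if (config.hasPath(key)) config.getString(key) else defaultValue

      def main(args: Array[String]): Unit = {
        val conf = ConfigFactory.parseString("table = person")
        // `sentence` is absent, so downstream code now sees null instead of ""
        assert(getOrElse(conf, "sentence", null) == null)
      }
    }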
@@ -165,7 +165,6 @@ class ConfigsSuite {
assert(postgresql.port == 5432)
assert(postgresql.user.equals("root"))
assert(postgresql.password.equals("nebula"))
- assert(postgresql.database.equals("database"))
assert(postgresql.table.equals("table"))
}
case _ => {}
@@ -6,7 +6,17 @@
package com.vesoft.nebula.exchange.reader

import com.google.common.collect.Maps
- import com.vesoft.exchange.common.config.{ClickHouseConfigEntry, HBaseSourceConfigEntry, HiveSourceConfigEntry, JanusGraphSourceConfigEntry, MaxComputeConfigEntry, MySQLSourceConfigEntry, Neo4JSourceConfigEntry, PostgreSQLSourceConfigEntry, ServerDataSourceConfigEntry}
+ import com.vesoft.exchange.common.config.{
+   ClickHouseConfigEntry,
+   HBaseSourceConfigEntry,
+   HiveSourceConfigEntry,
+   JanusGraphSourceConfigEntry,
+   MaxComputeConfigEntry,
+   MySQLSourceConfigEntry,
+   Neo4JSourceConfigEntry,
+   PostgreSQLSourceConfigEntry,
+   ServerDataSourceConfigEntry
+ }
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
@@ -18,7 +28,10 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types.{DataTypes, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
- import org.apache.tinkerpop.gremlin.process.computer.clustering.peerpressure.{ClusterCountMapReduce, PeerPressureVertexProgram}
+ import org.apache.tinkerpop.gremlin.process.computer.clustering.peerpressure.{
+   ClusterCountMapReduce,
+   PeerPressureVertexProgram
+ }
import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer
import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD
import org.apache.tinkerpop.gremlin.structure.util.GraphFactory
@@ -77,18 +90,23 @@ class MySQLReader(override val session: SparkSession, mysqlConfig: MySQLSourceCo
.option("user", mysqlConfig.user)
.option("password", mysqlConfig.password)
.load()
- df.createOrReplaceTempView(mysqlConfig.table)
- session.sql(sentence)
+ if (sentence != null) {
+   df.createOrReplaceTempView(mysqlConfig.table)
+   session.sql(sentence)
+ } else {
+   df
+ }
}
}

/**
-  * The PostgreSQLReader extends the ServerBaseReader
-  *
-  * @param session
-  * @param postgreConfig
-  */
- class PostgreSQLReader(override val session: SparkSession, postgreConfig: PostgreSQLSourceConfigEntry)
+  * The PostgreSQLReader extends the ServerBaseReader
+  *
+  * @param session
+  * @param postgreConfig
+  */
+ class PostgreSQLReader(override val session: SparkSession,
+                        postgreConfig: PostgreSQLSourceConfigEntry)
extends ServerBaseReader(session, postgreConfig.sentence) {
override def read(): DataFrame = {
val url =
@@ -101,9 +119,12 @@ class PostgreSQLReader(override val session: SparkSession, postgreConfig: Postgr
.option("user", postgreConfig.user)
.option("password", postgreConfig.password)
.load()
- df.createOrReplaceTempView(postgreConfig.table)
- if(!"".equals(sentence.trim)) session.sql(sentence)
- else session.sql(s"select * from ${postgreConfig.table}")
+ if (sentence != null) {
+   df.createOrReplaceTempView(postgreConfig.table)
+   session.sql(sentence)
+ } else {
+   df
+ }
}
}

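With this guard in place, an omitted sentence short-circuits to the raw JDBC DataFrame instead of registering a temp view and running a query; for PostgreSQL this replaces the old empty-string fallback (select * from ${postgreConfig.table}) with an equivalent direct return of the loaded DataFrame. Below is a runnable sketch of the new control flow, using an in-memory DataFrame to stand in for the JDBC load; applySentence is an illustrative name, not part of the codebase:

    import org.apache.spark.sql.{DataFrame, SparkSession}

    object SentenceGuardSketch {
      // Mirrors the guard added to MySQLReader.read() and PostgreSQLReader.read().
      def applySentence(session: SparkSession,
                        df: DataFrame,
                        table: String,
                        sentence: String): DataFrame =
        if (sentence != null) {
          df.createOrReplaceTempView(table) // expose the table to Spark SQL
          session.sql(sentence)             // run the configured query
        } else {
          df // no sentence configured: hand back the full table
        }

      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[1]").appName("sketch").getOrCreate()
        import spark.implicits._
        val df = Seq((1, "a"), (2, "b")).toDF("id", "name")
        // sentence omitted in config -> getOrElse(..., null) -> full table back
        assert(applySentence(spark, df, "person", null).count() == 2)
        // sentence configured -> temp view plus Spark SQL
        assert(applySentence(spark, df, "person", "select id from person where id > 1").count() == 1)
        spark.stop()
      }
    }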
nebula-exchange_spark_2.4/src/main/resources/application.conf (61 additions, 0 deletions)
@@ -316,6 +316,30 @@
batch: 256
partition: 32
}

+ # PostgreSQL
+ {
+   name: tag9
+   type: {
+     source: postgresql
+     sink: client
+   }
+   host: "127.0.0.1"
+   port: "5432"
+   database: "database"
+   table: "table"
+   user: "root"
+   password: "nebula"
+   sentence: "select postgre-field-0, postgre-field-1, postgre-field-2 from table"
+   fields: [postgre-field-0, postgre-field-1, postgre-field-2]
+   nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2]
+   vertex: {
+     field: postgre-field-0
+     # policy: "hash"
+   }
+   batch: 256
+   partition: 32
+ }
]

# Processing edges
@@ -340,6 +365,7 @@
field:parquet-field-1
#policy:hash
}
+ ranking: parquet-field-2
batch: 256
partition: 32
}
@@ -405,6 +431,7 @@
nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2]
source: hive-field-0
target: hive-field-1
+ ranking: hive-field-2
batch: 256
partition: 32
}
@@ -428,6 +455,7 @@
target: {
field: b.neo4j-target-field
}
+ ranking: neo4j-field-2
partition: 10
batch: 1000
check_point_path: /tmp/test
@@ -452,6 +480,7 @@
target: {
field: hbase-column-h
}
+ ranking: hbase-column-t
partition: 10
batch: 1000
}
@@ -498,6 +527,7 @@
nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2]
source: kafka-field-0
target: kafka-field-1
+ ranking: kafka-field-2
partition: 10
batch: 1000
interval.seconds: 10
@@ -527,6 +557,7 @@
target:{
field: maxcompute-field-3
}
+ ranking: maxcompute-field-4
partition:10
batch:10
}
@@ -554,9 +585,38 @@
field:clickhouse-field-1
#policy:hash
}
+ ranking:clickhouse-field-3
batch: 256
partition: 32
}

+ # PostgreSQL
+ {
+   name: edge-name-8
+   type: {
+     source: postgresql
+     sink: client
+   }
+   host: "127.0.0.1"
+   port: "5432"
+   database: "database"
+   table: "table"
+   user: "root"
+   password: "nebula"
+   sentence: "select postgre-field-0, postgre-field-1, postgre-field-2 from table"
+   fields: [postgre-field-0, postgre-field-1, postgre-field-2]
+   nebula.fields: [nebula-field-0, nebula-field-1, nebula-field-2]
+   source: {
+     field: postgre-field-0
+     # policy: "hash"
+   }
+   target: {
+     field: postgre-field-1
+     # policy: "hash"
+   }
+   ranking: postgre-field-2
+   batch: 256
+   partition: 32
+ }
]
}
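One consequence for these examples: with the null default in place, the sentence line can be omitted entirely, in which case the reader loads the whole table over JDBC rather than issuing a query through Spark SQL.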
@@ -6,7 +6,17 @@
package com.vesoft.nebula.exchange.reader

import com.google.common.collect.Maps
- import com.vesoft.exchange.common.config.{ClickHouseConfigEntry, HBaseSourceConfigEntry, HiveSourceConfigEntry, JanusGraphSourceConfigEntry, MaxComputeConfigEntry, MySQLSourceConfigEntry, Neo4JSourceConfigEntry, PostgreSQLSourceConfigEntry, ServerDataSourceConfigEntry}
+ import com.vesoft.exchange.common.config.{
+   ClickHouseConfigEntry,
+   HBaseSourceConfigEntry,
+   HiveSourceConfigEntry,
+   JanusGraphSourceConfigEntry,
+   MaxComputeConfigEntry,
+   MySQLSourceConfigEntry,
+   Neo4JSourceConfigEntry,
+   PostgreSQLSourceConfigEntry,
+   ServerDataSourceConfigEntry
+ }
import com.vesoft.exchange.common.utils.HDFSUtils
import com.vesoft.nebula.exchange.utils.Neo4jUtils
import org.apache.hadoop.hbase.HBaseConfiguration
@@ -20,7 +30,10 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types.{DataTypes, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
- import org.apache.tinkerpop.gremlin.process.computer.clustering.peerpressure.{ClusterCountMapReduce, PeerPressureVertexProgram}
+ import org.apache.tinkerpop.gremlin.process.computer.clustering.peerpressure.{
+   ClusterCountMapReduce,
+   PeerPressureVertexProgram
+ }
import org.apache.tinkerpop.gremlin.spark.process.computer.SparkGraphComputer
import org.apache.tinkerpop.gremlin.spark.structure.io.PersistedOutputRDD
import org.apache.tinkerpop.gremlin.structure.util.GraphFactory
@@ -79,17 +92,22 @@ class MySQLReader(override val session: SparkSession, mysqlConfig: MySQLSourceCo
.option("user", mysqlConfig.user)
.option("password", mysqlConfig.password)
.load()
- df.createOrReplaceTempView(mysqlConfig.table)
- session.sql(sentence)
+ if (sentence != null) {
+   df.createOrReplaceTempView(mysqlConfig.table)
+   session.sql(sentence)
+ } else {
+   df
+ }
}
}

/**
-  * The PostgreSQLReader extends the ServerBaseReader
-  *
-  */
- class PostgreSQLReader(override val session: SparkSession, postgreConfig: PostgreSQLSourceConfigEntry)
-   extends ServerBaseReader(session, postgreConfig.sentence) {
+  * The PostgreSQLReader extends the ServerBaseReader
+  *
+  */
+ class PostgreSQLReader(override val session: SparkSession,
+                        postgreConfig: PostgreSQLSourceConfigEntry)
+   extends ServerBaseReader(session, postgreConfig.sentence) {
override def read(): DataFrame = {
val url =
s"jdbc:postgresql://${postgreConfig.host}:${postgreConfig.port}/${postgreConfig.database}"
@@ -101,9 +119,12 @@
.option("user", postgreConfig.user)
.option("password", postgreConfig.password)
.load()
- df.createOrReplaceTempView(postgreConfig.table)
- if(!"".equals(sentence.trim)) session.sql(sentence)
- else session.sql(s"select * from ${postgreConfig.table}")
+ if (sentence != null) {
+   df.createOrReplaceTempView(postgreConfig.table)
+   session.sql(sentence)
+ } else {
+   df
+ }
}
}

@@ -74,19 +74,24 @@ class MySQLReader(override val session: SparkSession, mysqlConfig: MySQLSourceCo
.option("user", mysqlConfig.user)
.option("password", mysqlConfig.password)
.load()
- df.createOrReplaceTempView(mysqlConfig.table)
- session.sql(sentence)
+ if (sentence != null) {
+   df.createOrReplaceTempView(mysqlConfig.table)
+   session.sql(sentence)
+ } else {
+   df
+ }
}
}

/**
-  * The PosrgreReader
-  * TODO
-  *
-  * @param session
-  * @param postgreConfig
-  */
- class PostgreSQLReader(override val session: SparkSession, postgreConfig: PostgreSQLSourceConfigEntry)
+  * The PostgreReader
+  * TODO
+  *
+  * @param session
+  * @param postgreConfig
+  */
+ class PostgreSQLReader(override val session: SparkSession,
+                        postgreConfig: PostgreSQLSourceConfigEntry)
extends ServerBaseReader(session, postgreConfig.sentence) {
override def read(): DataFrame = {
val url =
@@ -99,9 +104,12 @@
.option("user", postgreConfig.user)
.option("password", postgreConfig.password)
.load()
- df.createOrReplaceTempView(postgreConfig.table)
- if(!"".equals(sentence.trim)) session.sql(sentence)
- else session.sql(s"select * from ${postgreConfig.table}")
+ if (sentence != null) {
+   df.createOrReplaceTempView(postgreConfig.table)
+   session.sql(sentence)
+ } else {
+   df
+ }
}
}
