
support nebula source for flink sql connector #60

Merged (9 commits, Aug 11, 2022)

Conversation

liuxiaocs7 (Contributor)

Trying to add Flink SQL support; it is still under development.

@CLAassistant commented Jul 7, 2022

CLA assistant check
All committers have signed the CLA.

@Nicole00 self-requested a review July 7, 2022 07:45
@spike-liu mentioned this pull request Jul 15, 2022
@liuxiaocs7 changed the title from "[WIP] feat add flink sql support" to "[WIP] support nebula source for flink sql connector" Jul 15, 2022
@Nicole00 (Contributor)

Please see the conflicts. Sorry for them; they were caused by another similar piece of work that has already been merged.

@liuxiaocs7 (Contributor, Author) commented Jul 21, 2022

Please see the conflicts. Sorry for them; they were caused by another similar piece of work that has already been merged.

OK, I will resolve the conflicts against the current sink implementation as quickly as possible.

# Conflicts:
#	connector/src/main/java/org.apache.flink/connector/nebula/sink/NebulaBatchOutputFormat.java
#	connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaDynamicTableFactory.java
#	connector/src/main/java/org.apache.flink/connector/nebula/table/NebulaDynamicTableSink.java
#	connector/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
@liuxiaocs7 changed the title from "[WIP] support nebula source for flink sql connector" to "support nebula source for flink sql connector" Jul 22, 2022
@codecov-commenter

Codecov Report

Merging #60 (6dbf56b) into master (428de90) will increase coverage by 9.93%.
The diff coverage is 66.14%.

@@             Coverage Diff              @@
##             master      #60      +/-   ##
============================================
+ Coverage     49.72%   59.65%   +9.93%     
- Complexity      190      271      +81     
============================================
  Files            50       50              
  Lines          1613     1678      +65     
  Branches        153      155       +2     
============================================
+ Hits            802     1001     +199     
+ Misses          743      598     -145     
- Partials         68       79      +11     
Impacted Files Coverage Δ
...nnector/nebula/sink/NebulaVertexBatchExecutor.java 63.15% <ø> (ø)
...he.flink/connector/nebula/utils/WriteModeEnum.java 50.00% <0.00%> (-50.00%) ⬇️
...connector/nebula/table/NebulaRowDataConverter.java 62.06% <62.06%> (ø)
...nnector/nebula/table/NebulaDynamicTableSource.java 85.71% <83.33%> (+85.71%) ⬆️
...nector/nebula/table/NebulaDynamicTableFactory.java 96.87% <93.33%> (+16.02%) ⬆️
...ink/connector/nebula/source/NebulaInputFormat.java 68.75% <100.00%> (+68.75%) ⬆️
...nnector/nebula/table/NebulaRowDataInputFormat.java 100.00% <100.00%> (+100.00%) ⬆️
...nnector/nebula/connection/NebulaClientOptions.java 91.25% <0.00%> (+1.25%) ⬆️
...k/connector/nebula/statement/ExecutionOptions.java 92.59% <0.00%> (+14.81%) ⬆️
... and 7 more

Continue to review full report at Codecov.

Legend: Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 428de90...6dbf56b.

@liuxiaocs7 (Contributor, Author)

Related to #58.

UT results are as follows:

source vertices:

7> +I[63, aba, abcdefgh, 1, 1111, 22222, 6412233, 2019-01-01, 2019-01-01T04:12:12, 435463424, false, 1.2, 1.0, 03:12:12, POINT(1.0 3.0)]
14> +I[66, aba, abcdefgh, 1, 1111, 22222, 6412233, 2019-01-01, 2019-01-01T04:12:12, 435463424, false, 1.2, 1.0, 03:12:12, LINESTRING(1.0 3.0,2.0 4.0)]
13> +I[64, aba, abcdefgh, 1, 1111, 22222, 6412233, 2019-01-01, 2019-01-01T04:12:12, 435463424, false, 1.2, 1.0, 03:12:12, LINESTRING(1.0 3.0,2.0 4.0)]
6> +I[65, aba, abcdefgh, 1, 1111, 22222, 6412233, 2019-01-01, 2019-01-01T04:12:12, 435463424, false, 1.2, 1.0, 03:12:12, LINESTRING(1.0 3.0,2.0 4.0)]
12> +I[68, aba, 张三, 1, 1111, 22222, 6412233, 2019-01-01, 2019-01-01T04:12:12, 435463424, true, 1.2, 1.0, 03:12:12, POLYGON((0.0 1.0,1.0 2.0,2.0 3.0,0.0 1.0))]
2> +I[62, aba, abcdefgh, 1, 1111, 22222, 6412233, 2019-01-01, 2019-01-01T04:12:12, 435463424, false, 1.2, 1.0, 03:12:12, POINT(1.0 3.0)]
1> +I[67, 李四, abcdefgh, 1, 1111, 22222, 6412233, 2019-01-01, 2019-01-01T04:12:12, 435463424, true, 1.2, 1.0, 03:12:12, POLYGON((0.0 1.0,1.0 2.0,2.0 3.0,0.0 1.0))]
3> +I[61, aba, abcdefgh, 1, 1111, 22222, 6412233, 2019-01-01, 2019-01-01T04:12:12, 435463424, false, 1.2, 1.0, 03:12:12, POINT(1.0 3.0)]

source edges:

3> +I[61, 62, 0, aba, abcdefgh, 1, 1111, 22222, 6412233, 2019-01-01, 2019-01-01T04:12:12, 435463424, false, 1.2, 1.0, 03:12:12, POINT(1.0 3.0)]
1> +I[63, 64, 0, aba, abcdefgh, 1, 1111, 22222, 6412233, 2019-01-01, 2019-01-01T04:12:12, 435463424, false, 1.2, 1.0, 03:12:12, POINT(1.0 3.0)]
2> +I[62, 63, 0, aba, abcdefgh, 1, 1111, 22222, 6412233, 2019-01-01, 2019-01-01T04:12:12, 435463424, false, 1.2, 1.0, 03:12:12, POINT(1.0 3.0)]
15> +I[65, 66, 0, aba, abcdefgh, 1, 1111, 22222, 6412233, 2019-01-01, 2019-01-01T04:12:12, 435463424, false, 1.2, 1.0, 03:12:12, LINESTRING(1.0 3.0,2.0 4.0)]
13> +I[67, 68, 0, 李四, abcdefgh, 1, 1111, 22222, 6412233, 2019-01-01, 2019-01-01T04:12:12, 435463424, true, 1.2, 1.0, 03:12:12, POLYGON((0.0 1.0,1.0 2.0,2.0 3.0,0.0 1.0))]
11> +I[68, 61, 0, aba, 张三, 1, 1111, 22222, 6412233, 2019-01-01, 2019-01-01T04:12:12, 435463424, true, 1.2, 1.0, 03:12:12, POLYGON((0.0 1.0,1.0 2.0,2.0 3.0,0.0 1.0))]
14> +I[66, 67, 0, aba, abcdefgh, 1, 1111, 22222, 6412233, 2019-01-01, 2019-01-01T04:12:12, 435463424, false, 1.2, 1.0, 03:12:12, LINESTRING(1.0 3.0,2.0 4.0)]
16> +I[64, 65, 0, aba, abcdefgh, 1, 1111, 22222, 6412233, 2019-01-01, 2019-01-01T04:12:12, 435463424, false, 1.2, 1.0, 03:12:12, LINESTRING(1.0 3.0,2.0 4.0)]
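
As context, below is a minimal sketch (not the PR's actual test code) of how such a source scan could be issued through the Flink Table API. The abbreviated schema and the connector option keys in the DDL are illustrative assumptions, not the connector's confirmed option names. In the printed rows above, the leading "+I" marks an insert-only changelog record, and the "7>", "14>", ... prefixes are the parallel subtask indices of the print sink.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class NebulaSourceScanSketch {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Hypothetical DDL: the real option keys are defined by NebulaDynamicTableFactory.
        tableEnv.executeSql(
                "CREATE TABLE person ("
                        + "  vid BIGINT,"
                        + "  col1 STRING,"
                        + "  col2 STRING"
                        + ") WITH ("
                        + "  'connector' = 'nebula'"   // plus address/space/label options in practice
                        + ")");

        // Prints changelog rows such as +I[63, aba, abcdefgh, ...].
        tableEnv.executeSql("SELECT * FROM person").print();
    }
}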

@@ -0,0 +1,239 @@
/* Copyright (c) 2020 vesoft inc. All rights reserved.
Contributor

As a new file, the copyright date should be 2022.

Contributor (Author)

Thanks for the reminder; indeed, I will revise this.

case TINYINT:
case SMALLINT:
case INTEGER:
    return val -> (int) val.asLong();
Contributor

Suggest returning a Long rather than converting to int.

Contributor (Author)

Done.

Contributor (Author)

After the modification, all integer types now return Long and the converter works fine. However, if the column is declared as INT in the table, the RowDataSerializer (used to serialize RowData when passing it to another operator) throws an exception because a Long cannot be cast to Integer. The current temporary workaround is to declare all integer types in the table as BIGINT.
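
As a reference for the change described above, here is a minimal sketch, assuming the converter is keyed by Flink's LogicalTypeRoot and reads values from nebula-java's ValueWrapper as the diff excerpt suggests; the class and interface names are illustrative, not the PR's exact code.

import org.apache.flink.table.types.logical.LogicalType;

import com.vesoft.nebula.client.graph.data.ValueWrapper;

public class IntegerConversionSketch {

    @FunctionalInterface
    interface DeserializationConverter {
        Object convert(ValueWrapper val) throws Exception;
    }

    static DeserializationConverter createConverter(LogicalType type) {
        switch (type.getTypeRoot()) {
            case TINYINT:
            case SMALLINT:
            case INTEGER:
            case BIGINT:
                // read every integer width as a Long instead of narrowing to int,
                // so the corresponding table column should be declared as BIGINT
                return val -> val.asLong();
            default:
                throw new UnsupportedOperationException("Unsupported type: " + type);
        }
    }
}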


public static boolean checkValidWriteMode(String modeName) {
    return chooseWriteMode(modeName) != INSERT
            || INSERT.name().equalsIgnoreCase(modeName);
Contributor

What is the purpose of this function? Does it always return true?

Contributor (Author)

This function is meant to validate the write-mode parameter passed in through the WITH clause in Flink SQL. It returns true if the mode is one of INSERT, UPDATE, or DELETE (case-insensitive), and false for any other string, such as 'write-mode' = 'add'.

public static boolean checkValidWriteMode(String modeName) {
    return chooseWriteMode(modeName) != INSERT
            || INSERT.name().equalsIgnoreCase(modeName);
}

public static WriteModeEnum chooseWriteMode(String modeName) {
    if (UPDATE.name().equalsIgnoreCase(modeName)) {
        return UPDATE;
    }
    if (DELETE.name().equalsIgnoreCase(modeName)) {
        return DELETE;
    }
    return INSERT;
}

But the write-mode option has now been removed from the WITH clause, and it seems better to use the enum-typed validation that comes with Flink, so this part of the logic can be removed.
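
For reference, a minimal sketch of that enum-based validation using Flink's ConfigOptions, assuming WriteModeEnum is the connector's existing enum with INSERT/UPDATE/DELETE constants; the option key and description here are illustrative.

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class WriteModeOptionSketch {

    // assumed to mirror the connector's existing WriteModeEnum
    enum WriteModeEnum { INSERT, UPDATE, DELETE }

    public static final ConfigOption<WriteModeEnum> WRITE_MODE =
            ConfigOptions.key("write-mode")
                    .enumType(WriteModeEnum.class)
                    .defaultValue(WriteModeEnum.INSERT)
                    .withDescription("Write mode for the sink: INSERT, UPDATE or DELETE.");

    // With an enum-typed option, the table factory's validation step rejects
    // values outside the enum (e.g. 'write-mode' = 'add'), so a hand-written
    // checkValidWriteMode is no longer needed.
}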
