feat: implement parallel send data in load graph step #248
Conversation
LGTM
tiny comments
while (iterator.hasNext()) {
    Vertex vertex = iterator.next();
    this.sendManager.sendEdge(vertex);
    futures.add(CompletableFuture.runAsync(() -> consumer.accept(vertex),
                                           this.sendExecutor));
prefer to align the wrapped argument with CompletableFuture
        sendThreadNum);
}

private Integer inputLoaderThreadNum(Config config) {
inputLoaderThreadNum => inputSendThreadNum ?
Hi @javeme, I find that creating a future for each vertex (edge) will cause OOM, so I will change the implementation to handle them in parallel.
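As context for the OOM concern above: creating one CompletableFuture per element keeps every future (and its captured element) alive until the whole batch completes. A minimal sketch of the alternative, one future per worker thread draining a shared iterator (illustrative names only, not the PR's actual code):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

public class ParallelConsumeSketch {

    // One future per worker thread, each draining a shared iterator,
    // instead of one future per element (which can exhaust memory
    // when the input is large).
    static <T> void consumeInParallel(Iterator<T> iterator,
                                      Consumer<T> consumer,
                                      int threadNum) {
        ExecutorService executor = Executors.newFixedThreadPool(threadNum);
        List<CompletableFuture<Void>> futures = new ArrayList<>(threadNum);
        for (int i = 0; i < threadNum; i++) {
            futures.add(CompletableFuture.runAsync(() -> {
                while (true) {
                    T item;
                    // The iterator itself is shared, so guard access to it
                    synchronized (iterator) {
                        if (!iterator.hasNext()) {
                            break;
                        }
                        item = iterator.next();
                    }
                    consumer.accept(item);
                }
            }, executor));
        }
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        executor.shutdown();
    }

    public static void main(String[] args) {
        AtomicInteger sum = new AtomicInteger();
        consumeInParallel(List.of(1, 2, 3, 4, 5).iterator(), sum::addAndGet, 3);
        System.out.println(sum.get()); // prints 15
    }
}
```

This bounds the number of live futures at threadNum regardless of input size.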
Codecov Report
@@             Coverage Diff              @@
##             master     #248      +/-  ##
============================================
+ Coverage     85.78%   85.82%   +0.04%
- Complexity     3230     3237       +7
============================================
  Files           344      344
  Lines         12076    12105      +29
  Branches       1088     1090       +2
============================================
+ Hits          10359    10389      +30
+ Misses         1190     1188       -2
- Partials        527      528       +1
... and 6 files with indirect coverage changes
this.loadServices = new ArrayList<>(this.sendThreadNum);
for (int i = 0; i < this.sendThreadNum; i++) {
    this.loadServices.add(new LoadService(context));
It seems we don't need to create multiple LoadService instances; just try calling LoadService.createIteratorFromVertex() sendThreadNum times.
Hi @javeme, thanks for your comment. I moved fetcher into IteratorFromVertex and IteratorFromEdge, and also replaced the creation of multiple LoadService instances as you suggested.
computer-core/src/main/java/org/apache/hugegraph/computer/core/worker/load/LoadService.java (resolved)
computer-core/src/main/java/org/apache/hugegraph/computer/core/input/WorkerInputManager.java (resolved)
 */
private GraphFetcher fetcher;

public IteratorFromVertex(Config config, InputSplitRpcService rpcService) {
Can we just pass in the GraphFetcher as a fetcher arg?
It seems that we can't: fetcher has a property localBatch, and different threads cannot share the same localBatch.
We can create the GraphFetcher when calling new IteratorFromVertex().
Do you mean like this?
public Iterator<Vertex> createIteratorFromVertex() {
    GraphFetcher fetcher = InputSourceFactory.createGraphFetcher(this.config, this.rpcService);
    return new IteratorFromVertex(fetcher);
}
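If I understand the suggestion, the key point is that each call to the factory builds a fresh fetcher, so every returned iterator owns its own localBatch-style state. A tiny self-contained sketch of that per-iterator-resource pattern (names are illustrative, not the real GraphFetcher API):

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

public class PerIteratorResourceSketch {

    // Each iterator owns a private resource created at construction
    // time, so concurrent iterators never share mutable state.
    static <T> Iterator<T> newIterator(Supplier<List<T>> fetcherFactory) {
        List<T> privateBatch = fetcherFactory.get(); // fresh per iterator
        return privateBatch.iterator();
    }

    public static void main(String[] args) {
        Iterator<Integer> a = newIterator(() -> List.of(1, 2));
        Iterator<Integer> b = newIterator(() -> List.of(1, 2));
        a.next(); // advancing one iterator does not affect the other
        System.out.println(a.next() + " " + b.next()); // prints "2 1"
    }
}
```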
computer-api/src/main/java/org/apache/hugegraph/computer/core/config/ComputerOptions.java (resolved)
Force-pushed from ab11331 to 5f74db7.
LGTM
computer-core/src/main/java/org/apache/hugegraph/computer/core/input/WorkerInputManager.java (resolved)
CI error:
Error: Tests run: 563, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 218.399 s <<< FAILURE! - in org.apache.hugegraph.computer.suite.unit.UnitTestSuite
Error: testRunAlgorithmFromHdfs(org.apache.hugegraph.computer.algorithm.centrality.degree.DegreeCentralityTest) Time elapsed: 4.379 s <<< FAILURE!
java.lang.AssertionError: [java.util.concurrent.CompletionException: org.apache.hugegraph.computer.core.common.exception.ComputerException: An exception occurred during parallel sending edges, null]
at org.apache.hugegraph.computer.algorithm.centrality.degree.DegreeCentralityTest.testRunAlgorithmFromHdfs(DegreeCentralityTest.java:50)
[INFO]
[INFO] Results:
[INFO]
Error: Failures:
Error: DegreeCentralityTest.testRunAlgorithmFromHdfs:50->AlgorithmTestBase.runAlgorithm:147 [java.util.concurrent.CompletionException: org.apache.hugegraph.computer.core.common.exception.ComputerException:
An exception occurred during parallel sending edges, null]
[INFO]
Error: Tests run: 563, Failures: 1, Errors: 0, Skipped: 0
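A note on reading the log: an exception thrown inside CompletableFuture.runAsync surfaces from join() as a CompletionException wrapping the real cause, which is why the assertion above reports the nested ComputerException. A minimal reproduction (the exception message here is made up for illustration):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class UnwrapCompletionException {
    public static void main(String[] args) {
        try {
            CompletableFuture.runAsync(() -> {
                // Stand-in for a failure during parallel sending
                throw new IllegalStateException("parallel send failed");
            }).join();
        } catch (CompletionException e) {
            // Unwrap to get at the underlying error
            System.out.println(e.getCause().getMessage()); // prints "parallel send failed"
        }
    }
}
```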
Hi @javeme, I've checked the recent CI logs; the error is caused by ...
Sorry, it's my own fault; I'm doing some code profiling now ...
Hi @javeme, I've debugged carefully and found the real reason is that ... Thus, I use a fetcher array to manage all fetchers in ...
Great, it's clear now. Thanks for your analysis and feedback on the root cause.
computer-core/src/main/java/org/apache/hugegraph/computer/core/worker/load/LoadService.java (resolved)
computer-core/src/main/java/org/apache/hugegraph/computer/core/input/WorkerInputManager.java (resolved)
public void init() {
    assert this.rpcService != null;
    for (int i = 0; i < this.fetcherNum; i++) {
        this.fetchers[i] = InputSourceFactory.createGraphFetcher(this.config, this.rpcService);
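The snippet above pre-creates one fetcher per slot so that no fetcher's internal state (such as a local batch buffer) is shared between threads. A generic sketch of that fetcher-per-index pattern, with illustrative names:

```java
import java.util.function.Supplier;

public class FetcherPoolSketch<F> {

    private final Object[] fetchers;

    // Pre-create one fetcher per worker index; each worker thread only
    // ever touches its own slot, so the fetchers need no locking.
    public FetcherPoolSketch(int fetcherNum, Supplier<F> factory) {
        this.fetchers = new Object[fetcherNum];
        for (int i = 0; i < fetcherNum; i++) {
            this.fetchers[i] = factory.get();
        }
    }

    @SuppressWarnings("unchecked")
    public F fetcher(int workerIndex) {
        return (F) this.fetchers[workerIndex];
    }

    public static void main(String[] args) {
        FetcherPoolSketch<StringBuilder> pool =
                new FetcherPoolSketch<>(2, StringBuilder::new);
        // Distinct instances per index, stable instance per slot
        System.out.println(pool.fetcher(0) != pool.fetcher(1)); // prints true
        System.out.println(pool.fetcher(0) == pool.fetcher(0)); // prints true
    }
}
```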
Could you please also add a comment explaining why we hold a fetchers array?
LGTM
LGTM