
feat: implement parallel send data in load graph step #248

Merged — 8 commits from the Feature-84 branch into apache:master
May 29, 2023

Conversation

@Radeity (Member) commented May 25, 2023

Purpose of the PR

Main Changes

  • During the graph loading step, introduce a thread pool to send vertices and edges to the target worker in parallel (a minimal sketch follows below).
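For illustration, here is a minimal, self-contained sketch of the intended pattern: a fixed send thread pool, one CompletableFuture per slice of the input, and a join on all of them. SliceSender and its methods are hypothetical stand-ins rather than the project's real classes (the actual change touches WorkerInputManager and LoadService), so treat this as a shape, not the implementation.

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class LoadGraphSendSketch {

    /** Hypothetical sender: drains one slice of the input and ships it to the target worker. */
    interface SliceSender {
        void sendVertices();
        void sendEdges();
    }

    public static void loadGraph(List<SliceSender> slices, int sendThreadNum) {
        ExecutorService sendExecutor = Executors.newFixedThreadPool(sendThreadNum);
        try {
            // Send all vertices in parallel, one task per slice of the input.
            CompletableFuture.allOf(slices.stream()
                    .map(s -> CompletableFuture.runAsync(s::sendVertices, sendExecutor))
                    .toArray(CompletableFuture[]::new)).join();
            // Then send all edges in parallel on the same pool.
            CompletableFuture.allOf(slices.stream()
                    .map(s -> CompletableFuture.runAsync(s::sendEdges, sendExecutor))
                    .toArray(CompletableFuture[]::new)).join();
        } finally {
            sendExecutor.shutdown();
        }
    }
}

Bounding the pool to a configurable sendThreadNum keeps the number of concurrent senders predictable instead of spawning one thread per split.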

Verifying these changes

  • This change is already covered by existing tests.

Does this PR potentially affect the following parts?

  • Nope
  • Dependencies (add/update license info)
  • Modify configurations
  • The public API
  • Other effects (typed here)

Documentation Status

  • Doc - TODO
  • Doc - Done
  • Doc - NO Need

@coderzc (Member) previously approved these changes May 25, 2023

LGTM

@Radeity marked this pull request as draft May 25, 2023 12:47
@javeme (Contributor) left a comment

tiny comments

while (iterator.hasNext()) {
    Vertex vertex = iterator.next();
    this.sendManager.sendEdge(vertex);
    futures.add(CompletableFuture.runAsync(() -> consumer.accept(vertex),
                this.sendExecutor));

Contributor comment: prefer to align with CompletableFuture.

                sendThreadNum);
    }

    private Integer inputLoaderThreadNum(Config config) {

Contributor comment: inputLoaderThreadNum => inputSendThreadNum?

@Radeity (Member, Author) commented May 25, 2023

Hi @javeme, I find that creating a future for each vertex (edge) will cause OOM. I will change the implementation to handle each inputSplit in parallel tomorrow, so I have drafted this PR temporarily. I would appreciate it if you could help review after the modification.
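For reference, a minimal sketch of the two shapes discussed here, with hypothetical InputSplit and send stand-ins rather than the project's real classes: futurePerVertex creates one future per vertex (the OOM-prone variant described above), while taskPerSplit creates one task per input split so the futures list stays bounded.

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;

public class PerSplitSendSketch {

    /** Hypothetical stand-in for one input split's vertices. */
    interface InputSplit {
        Iterator<String> vertices();
    }

    // One CompletableFuture per vertex: with millions of vertices the futures list
    // (and the captured lambdas) grows without bound and can exhaust the heap.
    static List<CompletableFuture<Void>> futurePerVertex(Iterator<String> vertices,
                                                         Consumer<String> send,
                                                         ExecutorService executor) {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        while (vertices.hasNext()) {
            String vertex = vertices.next();
            futures.add(CompletableFuture.runAsync(() -> send.accept(vertex), executor));
        }
        return futures;
    }

    // One task per input split: the number of futures is bounded by the number of
    // splits rather than the number of vertices.
    static List<CompletableFuture<Void>> taskPerSplit(List<InputSplit> splits,
                                                      Consumer<String> send,
                                                      ExecutorService executor) {
        List<CompletableFuture<Void>> futures = new ArrayList<>(splits.size());
        for (InputSplit split : splits) {
            futures.add(CompletableFuture.runAsync(() -> {
                Iterator<String> it = split.vertices();
                while (it.hasNext()) {
                    send.accept(it.next());
                }
            }, executor));
        }
        return futures;
    }
}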

@codecov bot commented May 26, 2023

Codecov Report

Merging #248 (e5475dd) into master (f7d10a8) will increase coverage by 0.04%.
The diff coverage is 96.36%.

@@             Coverage Diff              @@
##             master     #248      +/-   ##
============================================
+ Coverage     85.78%   85.82%   +0.04%     
- Complexity     3230     3237       +7     
============================================
  Files           344      344              
  Lines         12076    12105      +29     
  Branches       1088     1090       +2     
============================================
+ Hits          10359    10389      +30     
+ Misses         1190     1188       -2     
- Partials        527      528       +1     
Impacted Files Coverage Δ
...egraph/computer/core/input/WorkerInputManager.java 95.74% <93.33%> (-4.26%) ⬇️
...ugegraph/computer/core/config/ComputerOptions.java 98.88% <100.00%> (+0.01%) ⬆️
...ugegraph/computer/core/compute/ComputeManager.java 86.51% <100.00%> (ø)
...gegraph/computer/core/worker/load/LoadService.java 91.48% <100.00%> (+0.79%) ⬆️

... and 6 files with indirect coverage changes



this.loadServices = new ArrayList<>(this.sendThreadNum);
for (int i = 0; i < this.sendThreadNum; i++) {
    this.loadServices.add(new LoadService(context));

Contributor comment: it seems we don't need to create multiple LoadService instances; just try calling LoadService.createIteratorFromVertex() sendThreadNum times.

Member Author reply: Hi @javeme, thanks for your comment. I moved the fetcher into IteratorFromVertex and IteratorFromEdge, and also replaced the creation of multiple LoadService instances as you suggested.

@Radeity marked this pull request as ready for review May 27, 2023 09:27
     */
    private GraphFetcher fetcher;

    public IteratorFromVertex(Config config, InputSplitRpcService rpcService) {

Contributor comment: can we just pass in a GraphFetcher fetcher arg?

@Radeity (Member, Author) replied May 27, 2023:

It seems that we can't; the fetcher has a localBatch property, and different threads cannot share the same localBatch.

Contributor comment: we can create the GraphFetcher when constructing a new IteratorFromVertex().

Member Author reply: Do you mean like this?

    public Iterator<Vertex> createIteratorFromVertex() {
        GraphFetcher fetcher = InputSourceFactory.createGraphFetcher(this.config, this.rpcService);
        return new IteratorFromVertex(fetcher);
    }

@Radeity force-pushed the Feature-84 branch 2 times, most recently from ab11331 to 5f74db7, May 27, 2023 13:55
@javeme (Contributor) previously approved these changes May 27, 2023

LGTM

@imbajin previously approved these changes May 28, 2023
@javeme (Contributor) commented May 28, 2023

CI error:

Error:  Tests run: 563, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 218.399 s <<< FAILURE! - in org.apache.hugegraph.computer.suite.unit.UnitTestSuite
Error:  testRunAlgorithmFromHdfs(org.apache.hugegraph.computer.algorithm.centrality.degree.DegreeCentralityTest)  Time elapsed: 4.379 s  <<< FAILURE!
java.lang.AssertionError: [java.util.concurrent.CompletionException: org.apache.hugegraph.computer.core.common.exception.ComputerException: An exception occurred during parallel sending edges, null]
	at org.apache.hugegraph.computer.algorithm.centrality.degree.DegreeCentralityTest.testRunAlgorithmFromHdfs(DegreeCentralityTest.java:50)

[INFO] 
[INFO] Results:
[INFO] 
Error:  Failures: 
Error:    DegreeCentralityTest.testRunAlgorithmFromHdfs:50->AlgorithmTestBase.runAlgorithm:147 [java.util.concurrent.CompletionException: org.apache.hugegraph.computer.core.common.exception.ComputerException:
  An exception occurred during parallel sending edges, null]
[INFO] 
Error:  Tests run: 563, Failures: 1, Errors: 0, Skipped: 0

@Radeity (Member, Author) commented May 28, 2023

Hi @javeme, I've checked the recent CI logs; the error is caused by an IOException: Filesystem closed when reading data from HDFS and checking whether it's open. In addition, I found that some TransportException were thrown earlier, so I suspect there may be something wrong with the CI environment.

@Radeity marked this pull request as draft May 29, 2023 04:28
@Radeity (Member, Author) commented May 29, 2023

Sorry, it's my own fault; I'm doing some code profiling now ...

@Radeity marked this pull request as ready for review May 29, 2023 08:04
@Radeity (Member, Author) commented May 29, 2023

Hi @javeme, I've debugged carefully and found the real cause: FileElementFetcher maintains an inputReader, and when that inputReader is an HDFSFileReader, only one client is cached per URI (according to the setting). If one thread closes its fetcher and inputReader, the client is removed from the map, so the other threads can no longer get this client and then throw IOException: Filesystem closed.

Thus, I use a fetcher array to manage all fetchers in LoadService and close them together after all edges and vertices have been sent. PTAL. cc @coderzc @imbajin
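For context, a minimal sketch of the "close them together" idea. Fetcher and FetcherFactory below are hypothetical stand-ins for the real GraphFetcher and InputSourceFactory, and the local array plays the role of the fetcher array kept in LoadService; the real code may differ in detail.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FetcherLifecycleSketch {

    /** Hypothetical fetcher: each thread needs its own instance (its own local batch),
     *  but HDFS-backed instances may share one cached FileSystem client per URI. */
    interface Fetcher {
        void fetchAndSend();
        void close();
    }

    interface FetcherFactory {
        Fetcher create();
    }

    public static void loadInParallel(FetcherFactory factory, int sendThreadNum) {
        // Hold every fetcher in one array so no thread closes its fetcher early:
        // closing one HDFS-backed fetcher would also close the cached FileSystem
        // client shared by the other threads ("Filesystem closed").
        Fetcher[] fetchers = new Fetcher[sendThreadNum];
        for (int i = 0; i < sendThreadNum; i++) {
            fetchers[i] = factory.create();
        }

        ExecutorService sendExecutor = Executors.newFixedThreadPool(sendThreadNum);
        try {
            List<CompletableFuture<Void>> futures = new ArrayList<>(sendThreadNum);
            for (Fetcher fetcher : fetchers) {
                futures.add(CompletableFuture.runAsync(fetcher::fetchAndSend, sendExecutor));
            }
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        } finally {
            sendExecutor.shutdown();
            // Close all fetchers together, only after every vertex and edge has been sent.
            for (Fetcher fetcher : fetchers) {
                fetcher.close();
            }
        }
    }
}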

@javeme (Contributor) left a comment

Great, it's clear now.
Thanks for your analysis and feedback on the root cause.

public void init() {
    assert this.rpcService != null;
    for (int i = 0; i < this.fetcherNum; i++) {
        this.fetchers[i] = InputSourceFactory.createGraphFetcher(this.config, this.rpcService);

Contributor comment: could you please also add a comment explaining why we hold a fetchers array?


@javeme (Contributor) left a comment

LGTM

@coderzc (Member) left a comment

LGTM

@imbajin merged commit 7df3b43 into apache:master May 29, 2023
@Radeity deleted the Feature-84 branch May 31, 2023 02:26
Development

Successfully merging this pull request may close these issues.

[Feature] Implement parallel loading of data