Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement parallel send data in load graph step #248

Merged
merged 8 commits into from
May 29, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,14 @@ public static synchronized ComputerOptions instance() {
""
);

public static final ConfigOption<Integer> INPUT_SEND_THREAD_NUMS =
new ConfigOption<>(
"input.send_thread_nums",
"The number of threads for parallel sending vertex or edge.",
positiveInt(),
4
);

public static final ConfigOption<Integer> SORT_THREAD_NUMS =
new ConfigOption<>(
"sort.thread_nums",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public ComputeManager(ComputerContext context, Managers managers) {
int computeThreadNum = this.partitionComputeThreadNum(context.config());
this.computeExecutor = ExecutorUtil.newFixedThreadPool(
computeThreadNum, PREFIX);
LOG.info("Created partition compute thread poll, thread num: {}",
LOG.info("Created partition compute thread pool, thread num: {}",
computeThreadNum);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,35 +17,63 @@

package org.apache.hugegraph.computer.core.input;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;
import java.util.function.Supplier;

import org.apache.hugegraph.computer.core.common.ComputerContext;
import org.apache.hugegraph.computer.core.common.exception.ComputerException;
import org.apache.hugegraph.computer.core.config.ComputerOptions;
import org.apache.hugegraph.computer.core.config.Config;
import org.apache.hugegraph.computer.core.graph.vertex.Vertex;
import org.apache.hugegraph.computer.core.manager.Manager;
import org.apache.hugegraph.computer.core.network.message.MessageType;
import org.apache.hugegraph.computer.core.rpc.InputSplitRpcService;
import org.apache.hugegraph.computer.core.sender.MessageSendManager;
import org.apache.hugegraph.computer.core.worker.load.LoadService;
import org.apache.hugegraph.util.ExecutorUtil;
import org.apache.hugegraph.util.Log;
import org.slf4j.Logger;

public class WorkerInputManager implements Manager {

private static final Logger LOG = Log.logger(WorkerInputManager.class);
private static final String PREFIX = "input-send-executor-%s";

public static final String NAME = "worker_input";

/*
* Fetch vertices and edges from the data source and convert them
* to computer-vertices and computer-edges
*/
private final LoadService loadService;

private final ExecutorService sendExecutor;
private final int sendThreadNum;

/*
* Send vertex/edge or message to target worker
*/
private final MessageSendManager sendManager;

public WorkerInputManager(ComputerContext context,
MessageSendManager sendManager) {
this.loadService = new LoadService(context);
this.sendManager = sendManager;

this.sendThreadNum = this.inputSendThreadNum(context.config());
this.sendExecutor = ExecutorUtil.newFixedThreadPool(this.sendThreadNum, PREFIX);
LOG.info("Created parallel sending thread pool, thread num: {}",
sendThreadNum);

this.loadService = new LoadService(context);
}

private Integer inputSendThreadNum(Config config) {
return config.get(ComputerOptions.INPUT_SEND_THREAD_NUMS);
}

@Override
Expand All @@ -61,35 +89,61 @@ public void init(Config config) {

@Override
public void close(Config config) {
this.loadService.close();
this.sendManager.close(config);
this.sendExecutor.shutdown();
}

public void service(InputSplitRpcService rpcService) {
this.loadService.rpcService(rpcService);
}

/**
* TODO: Load vertices and edges parallel.
* When this method finish, it means that all vertices and edges are sent,
* but there is no guarantee that all of them has been received.
*/
public void loadGraph() {
List<CompletableFuture<?>> futures = new ArrayList<>();
CompletableFuture<?> future;
this.sendManager.startSend(MessageType.VERTEX);
Iterator<Vertex> iterator = this.loadService.createIteratorFromVertex();
while (iterator.hasNext()) {
Vertex vertex = iterator.next();
this.sendManager.sendVertex(vertex);
for (int i = 0; i < this.sendThreadNum; i++) {
future = send(this.sendManager::sendVertex, this.loadService::createIteratorFromVertex);
futures.add(future);
}
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).exceptionally(e -> {
throw new ComputerException("An exception occurred during parallel " +
"sending vertices", e);
}).join();
this.sendManager.finishSend(MessageType.VERTEX);

futures.clear();

this.sendManager.startSend(MessageType.EDGE);
iterator = this.loadService.createIteratorFromEdge();
while (iterator.hasNext()) {
Vertex vertex = iterator.next();
this.sendManager.sendEdge(vertex);
for (int i = 0; i < this.sendThreadNum; i++) {
future = send(this.sendManager::sendEdge, this.loadService::createIteratorFromEdge);
futures.add(future);
}
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).exceptionally(e -> {
throw new ComputerException("An exception occurred during parallel " +
"sending edges", e);
}).join();
this.sendManager.finishSend(MessageType.EDGE);
this.sendManager.clearBuffer();
}

private CompletableFuture<?> send(Consumer<Vertex> sendConsumer,
Supplier<Iterator<Vertex>> iteratorSupplier) {
return CompletableFuture.runAsync(() -> {
Iterator<Vertex> iterator = iteratorSupplier.get();
while (iterator.hasNext()) {
Vertex vertex = iterator.next();
sendConsumer.accept(vertex);
}
try {
((AutoCloseable) iterator).close();
} catch (Exception e) {
throw new ComputerException("Failed to close iterator", e);
}
}, this.sendExecutor);
}

Radeity marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.hugegraph.computer.core.worker.load;

import java.io.IOException;
import java.util.Iterator;
import java.util.NoSuchElementException;

Expand All @@ -43,33 +44,21 @@ public class LoadService {

private final GraphFactory graphFactory;
private final Config config;
/*
* GraphFetcher include:
* VertexFetcher vertexFetcher;
* EdgeFetcher edgeFetcher;
*/
private GraphFetcher fetcher;

// Service proxy on the client
private InputSplitRpcService rpcService;
private final InputFilter inputFilter;

public LoadService(ComputerContext context) {
this.graphFactory = context.graphFactory();
this.config = context.config();
this.fetcher = null;
this.rpcService = null;
this.inputFilter = context.config().createObject(
ComputerOptions.INPUT_FILTER_CLASS);
}

public void init() {
Radeity marked this conversation as resolved.
Show resolved Hide resolved
assert this.rpcService != null;
this.fetcher = InputSourceFactory.createGraphFetcher(this.config,
this.rpcService);
}

public void close() {
this.fetcher.close();
}

public void rpcService(InputSplitRpcService rpcService) {
Expand All @@ -78,30 +67,36 @@ public void rpcService(InputSplitRpcService rpcService) {
}

public Iterator<Vertex> createIteratorFromVertex() {
return new IteratorFromVertex();
GraphFetcher fetcher = InputSourceFactory.createGraphFetcher(this.config, this.rpcService);
return new IteratorFromVertex(fetcher);
}

public Iterator<Vertex> createIteratorFromEdge() {
return new IteratorFromEdge();
GraphFetcher fetcher = InputSourceFactory.createGraphFetcher(this.config, this.rpcService);
return new IteratorFromEdge(fetcher);
}

private class IteratorFromVertex implements Iterator<Vertex> {
private class IteratorFromVertex implements Iterator<Vertex>, AutoCloseable {

private InputSplit currentSplit;

public IteratorFromVertex() {
// GraphFetcher includes VertexFetcher
private GraphFetcher fetcher;

public IteratorFromVertex(GraphFetcher fetcher) {
this.currentSplit = null;
this.fetcher = fetcher;
}

@Override
public boolean hasNext() {
VertexFetcher vertexFetcher = fetcher.vertexFetcher();
VertexFetcher vertexFetcher = this.fetcher.vertexFetcher();
while (this.currentSplit == null || !vertexFetcher.hasNext()) {
/*
* The first time or the current split is complete,
* need to fetch next input split meta
*/
this.currentSplit = fetcher.nextVertexInputSplit();
this.currentSplit = this.fetcher.nextVertexInputSplit();
if (this.currentSplit.equals(InputSplit.END_SPLIT)) {
return false;
}
Expand All @@ -116,7 +111,7 @@ public Vertex next() {
throw new NoSuchElementException();
}
org.apache.hugegraph.structure.graph.Vertex hugeVertex;
hugeVertex = fetcher.vertexFetcher().next();
hugeVertex = this.fetcher.vertexFetcher().next();
return this.convert(hugeVertex);
}

Expand All @@ -131,9 +126,14 @@ private Vertex convert(org.apache.hugegraph.structure.graph.Vertex
computerVertex.properties(properties);
return computerVertex;
}

Radeity marked this conversation as resolved.
Show resolved Hide resolved
@Override
public void close() throws IOException {
Radeity marked this conversation as resolved.
Show resolved Hide resolved
this.fetcher.close();
}
}

private class IteratorFromEdge implements Iterator<Vertex> {
private class IteratorFromEdge implements Iterator<Vertex>, AutoCloseable {

/*
* TODO: If it is an in edge, we should get the data from the in shard;
Expand All @@ -146,26 +146,30 @@ private class IteratorFromEdge implements Iterator<Vertex> {
private InputSplit currentSplit;
private Vertex currentVertex;

public IteratorFromEdge() {
// GraphFetcher includes EdgeFetcher
private GraphFetcher fetcher;

public IteratorFromEdge(GraphFetcher fetcher) {
// this.direction = config.get(ComputerOptions.EDGE_DIRECTION);
this.maxEdges = config.get(
ComputerOptions.INPUT_MAX_EDGES_IN_ONE_VERTEX);
this.currentSplit = null;
this.currentVertex = null;
this.fetcher = fetcher;
}

@Override
public boolean hasNext() {
if (InputSplit.END_SPLIT.equals(this.currentSplit)) {
return this.currentVertex != null;
}
EdgeFetcher edgeFetcher = fetcher.edgeFetcher();
EdgeFetcher edgeFetcher = this.fetcher.edgeFetcher();
while (this.currentSplit == null || !edgeFetcher.hasNext()) {
/*
* The first time or the current split is complete,
* need to fetch next input split meta
*/
this.currentSplit = fetcher.nextEdgeInputSplit();
this.currentSplit = this.fetcher.nextEdgeInputSplit();
if (this.currentSplit.equals(InputSplit.END_SPLIT)) {
return this.currentVertex != null;
}
Expand All @@ -181,7 +185,7 @@ public Vertex next() {
}

org.apache.hugegraph.structure.graph.Edge hugeEdge;
EdgeFetcher edgeFetcher = fetcher.edgeFetcher();
EdgeFetcher edgeFetcher = this.fetcher.edgeFetcher();
while (edgeFetcher.hasNext()) {
hugeEdge = edgeFetcher.next();
Edge edge = this.convert(hugeEdge);
Expand Down Expand Up @@ -227,5 +231,10 @@ private Edge convert(org.apache.hugegraph.structure.graph.Edge edge) {
computerEdge.properties(properties);
return computerEdge;
}

Radeity marked this conversation as resolved.
Show resolved Hide resolved
@Override
public void close() throws Exception {
this.fetcher.close();
}
}
}