Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement fast-failover for MessageRecvManager and DataClientManager #243

Merged
merged 6 commits into from
May 25, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,9 @@ public void onChannelInactive(ConnectionId connectionId) {
@Override
public void exceptionCaught(TransportException cause,
ConnectionId connectionId) {
// TODO: implement failover
LOG.error("Channel for connectionId {} occurred exception",
connectionId, cause);
DataClientManager.this.connManager.closeClient(connectionId);
DataClientManager.this.sender.transportExceptionCaught(cause, connectionId);
Radeity marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ public void onDone(Channel channel, ChannelFuture future) {
}
}

public void onSuccess(Channel channel, ChannelFuture future) {
public void onSuccess(Channel channel, ChannelFuture future) {
if (LOG.isDebugEnabled()) {
LOG.debug("Successfully send data to '{}'",
TransportUtil.remoteAddress(channel));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@
package org.apache.hugegraph.computer.core.receiver;

import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hugegraph.computer.core.common.ComputerContext;
import org.apache.hugegraph.computer.core.common.Constants;
Expand Down Expand Up @@ -60,7 +63,9 @@ public class MessageRecvManager implements Manager, MessageHandler {

private int workerCount;
private int expectedFinishMessages;
private CountDownLatch finishMessagesLatch;
private CompletableFuture<Void> finishMessagesFuture;
private AtomicInteger finishMessagesCount;

private long waitFinishMessagesTimeout;
private long superstep;

Expand All @@ -71,6 +76,7 @@ public MessageRecvManager(ComputerContext context,
this.fileManager = fileManager;
this.sortManager = sortManager;
this.superstep = Constants.INPUT_SUPERSTEP;
this.finishMessagesCount = new AtomicInteger();
}

@Override
Expand All @@ -90,8 +96,9 @@ public void init(Config config) {
this.workerCount = config.get(ComputerOptions.JOB_WORKERS_COUNT);
// One for vertex and one for edge.
this.expectedFinishMessages = this.workerCount * 2;
this.finishMessagesLatch = new CountDownLatch(
this.expectedFinishMessages);
this.finishMessagesFuture = new CompletableFuture<>();
this.finishMessagesCount.set(this.expectedFinishMessages);

this.waitFinishMessagesTimeout = config.get(
ComputerOptions.WORKER_WAIT_FINISH_MESSAGES_TIMEOUT);
}
Expand All @@ -103,8 +110,9 @@ public void beforeSuperstep(Config config, int superstep) {
this.messagePartitions = new ComputeMessageRecvPartitions(
this.context, fileGenerator, this.sortManager);
this.expectedFinishMessages = this.workerCount;
this.finishMessagesLatch = new CountDownLatch(
this.expectedFinishMessages);
this.finishMessagesFuture = new CompletableFuture<>();
this.finishMessagesCount.set(this.expectedFinishMessages);

this.superstep = superstep;

if (this.superstep == Constants.INPUT_SUPERSTEP + 1) {
Expand Down Expand Up @@ -138,35 +146,20 @@ public void onChannelInactive(ConnectionId connectionId) {
@Override
public void exceptionCaught(TransportException cause,
ConnectionId connectionId) {
// TODO: implement failover
LOG.warn("Exception caught for connection:{}, root cause:",
LOG.error("Exception caught for connection:{}, root cause:",
connectionId, cause);
this.finishMessagesFuture.completeExceptionally(cause);
}

public void waitReceivedAllMessages() {
try {
boolean status = this.finishMessagesLatch.await(
this.waitFinishMessagesTimeout,
TimeUnit.MILLISECONDS);
if (!status) {
throw new ComputerException(
"Expect %s finish-messages received in %s ms, " +
"%s absence in superstep %s",
this.expectedFinishMessages,
this.waitFinishMessagesTimeout,
this.finishMessagesLatch.getCount(),
this.superstep);
}
} catch (InterruptedException e) {
throw new ComputerException(
"Thread is interrupted while waiting %s " +
"finish-messages received in %s ms, " +
"%s absence in superstep %s",
e,
this.expectedFinishMessages,
this.waitFinishMessagesTimeout,
this.finishMessagesLatch.getCount(),
this.superstep);
this.finishMessagesFuture.get(this.waitFinishMessagesTimeout, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
throw new ComputerException("Time out while waiting %s finish-messages received in %s ms in superstep %s",
this.expectedFinishMessages, this.waitFinishMessagesTimeout, this.superstep, e);
} catch (InterruptedException | ExecutionException e) {
throw new ComputerException("Error while waiting %s finish-messages received in %s ms in superstep %s",
this.expectedFinishMessages, this.waitFinishMessagesTimeout, this.superstep, e);
Radeity marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand Down Expand Up @@ -214,7 +207,11 @@ public void onStarted(ConnectionId connectionId) {
@Override
public void onFinished(ConnectionId connectionId) {
LOG.debug("ConnectionId {} finished", connectionId);
this.finishMessagesLatch.countDown();
int messageIdx = this.finishMessagesCount.decrementAndGet();
if (messageIdx == 0) {
coderzc marked this conversation as resolved.
Show resolved Hide resolved
this.finishMessagesFuture.complete(null);
this.finishMessagesCount.set(0);
coderzc marked this conversation as resolved.
Show resolved Hide resolved
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public void startSend(MessageType type) {
}

/**
* Finsih send message, send the last buffer and put an END signal
* Finish send message, send the last buffer and put an END signal
* into queue
* @param type the message type
*/
Expand Down Expand Up @@ -277,10 +277,10 @@ private void sendControlMessageToWorkers(Set<Integer> workerIds,
}
} catch (TimeoutException e) {
throw new ComputerException("Timeout(%sms) to wait for " +
"controling message(%s) to finished",
"controlling message(%s) to finished",
e, timeout, type);
} catch (InterruptedException | ExecutionException e) {
throw new ComputerException("Failed to wait for controling " +
throw new ComputerException("Failed to wait for controlling " +
"message(%s) to finished", e, type);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import java.util.concurrent.CompletableFuture;

import org.apache.hugegraph.computer.core.common.exception.TransportException;
import org.apache.hugegraph.computer.core.network.ConnectionId;
import org.apache.hugegraph.computer.core.network.message.MessageType;

public interface MessageSender {
Expand All @@ -37,4 +39,10 @@ CompletableFuture<Void> send(int workerId, MessageType type)
* @param message message payload
*/
void send(int workerId, QueuedMessage message) throws InterruptedException;

/**
* Invoked when an exception is thrown while processing a message on the
* channel associated with the given connectionId.
*/
void transportExceptionCaught(TransportException cause, ConnectionId connectionId);
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.hugegraph.computer.core.common.exception.TransportException;
import org.apache.hugegraph.computer.core.config.ComputerOptions;
import org.apache.hugegraph.computer.core.config.Config;
import org.apache.hugegraph.computer.core.network.ConnectionId;
import org.apache.hugegraph.computer.core.network.TransportClient;
import org.apache.hugegraph.computer.core.network.message.MessageType;
import org.apache.hugegraph.concurrent.BarrierEvent;
Expand Down Expand Up @@ -103,6 +104,15 @@ public void send(int workerId, QueuedMessage message)
channel.queue.put(message);
}

/**
 * Fails the pending future of every worker channel whose client uses the
 * given connection, so anything waiting on that future observes the
 * transport error instead of waiting further.
 *
 * @param cause        the transport exception reported for the connection
 * @param connectionId the connection on which the exception occurred
 */
@Override
public void transportExceptionCaught(TransportException cause, ConnectionId connectionId) {
    for (WorkerChannel channel : this.channels) {
        if (!channel.client.connectionId().equals(connectionId)) {
            continue;
        }
        /*
         * NOTE(review): futureRef presumably holds no future while no
         * control message is in flight - confirm against WorkerChannel.
         * Guard against that case so an exception arriving at that moment
         * does not turn into a NullPointerException in this callback.
         */
        CompletableFuture<?> pending = channel.futureRef.get();
        if (pending != null) {
            pending.completeExceptionally(cause);
        }
    }
}

public Runnable notBusyNotifier() {
/*
* DataClientHandler.sendAvailable() will call it when client
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

import java.util.concurrent.CompletableFuture;

import org.apache.hugegraph.computer.core.common.exception.TransportException;
import org.apache.hugegraph.computer.core.network.ConnectionId;
import org.apache.hugegraph.computer.core.network.message.MessageType;
import org.apache.hugegraph.computer.core.sender.MessageSender;
import org.apache.hugegraph.computer.core.sender.QueuedMessage;
Expand All @@ -36,4 +38,9 @@ public CompletableFuture<Void> send(int workerId, MessageType type) {
public void send(int workerId, QueuedMessage message) {
// pass
}

/**
 * Intentionally a no-op: this implementation does nothing when a
 * transport exception is reported for a connection.
 */
@Override
public void transportExceptionCaught(TransportException cause, ConnectionId connectionId) {
// pass
}
}