Skip to content

Commit

Permalink
Fixed #113 - a race condition that could result in duplicate events t…
Browse files Browse the repository at this point in the history
…o be emitted on reconnect
  • Loading branch information
shyiko committed Sep 20, 2016
1 parent f608aa8 commit 5fa7e76
Show file tree
Hide file tree
Showing 4 changed files with 236 additions and 111 deletions.
5 changes: 5 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@
All notable changes to this project will be documented in this file.
This project adheres to [Semantic Versioning](http://semver.org/).

## [0.4.2](https://github.com/shyiko/mysql-binlog-connector-java/compare/0.4.1...0.4.2) - 2016-09-19

### Fixed
- A race condition that could result in duplicate events to be emitted on reconnect ([#113](https://github.com/shyiko/mysql-binlog-connector-java/issues/113)).

## [0.4.1](https://github.com/shyiko/mysql-binlog-connector-java/compare/0.4.0...0.4.1) - 2016-08-31

### Fixed
Expand Down
230 changes: 120 additions & 110 deletions src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public X509Certificate[] getAcceptedIssuers() {
private SocketFactory socketFactory;
private SSLSocketFactory sslSocketFactory;

private PacketChannel channel;
private volatile PacketChannel channel;
private volatile boolean connected;

private ThreadFactory threadFactory;
Expand All @@ -150,9 +150,8 @@ public X509Certificate[] getAcceptedIssuers() {
private long keepAliveConnectTimeout = TimeUnit.SECONDS.toMillis(3);

private volatile ExecutorService keepAliveThreadExecutor;
private long keepAliveThreadShutdownTimeout = TimeUnit.SECONDS.toMillis(6);

private final Lock shutdownLock = new ReentrantLock();
private final Lock connectLock = new ReentrantLock();

/**
* Alias for BinaryLogClient("localhost", 3306, <no schema> = null, username, password).
Expand Down Expand Up @@ -397,69 +396,84 @@ public void setThreadFactory(ThreadFactory threadFactory) {
* @throws IOException if anything goes wrong while trying to connect
*/
public void connect() throws IOException {
if (connected) {
if (!connectLock.tryLock()) {
throw new IllegalStateException("BinaryLogClient is already connected");
}
GreetingPacket greetingPacket;
boolean notifyWhenDisconnected = false;
try {
try {
Socket socket = socketFactory != null ? socketFactory.createSocket() : new Socket();
socket.connect(new InetSocketAddress(hostname, port));
channel = new PacketChannel(socket);
if (channel.getInputStream().peek() == -1) {
throw new EOFException();
channel = openChannel();
GreetingPacket greetingPacket = receiveGreeting();
authenticate(greetingPacket);
connectionId = greetingPacket.getThreadId();
if (binlogFilename == null) {
fetchBinlogFilenameAndPosition();
}
if (binlogPosition < 4) {
if (logger.isLoggable(Level.WARNING)) {
logger.warning("Binary log position adjusted from " + binlogPosition + " to " + 4);
}
binlogPosition = 4;
}
ChecksumType checksumType = fetchBinlogChecksum();
if (checksumType != ChecksumType.NONE) {
confirmSupportOfChecksum(checksumType);
}
requestBinaryLogStream();
} catch (IOException e) {
throw new IOException("Failed to connect to MySQL on " + hostname + ":" + port +
". Please make sure it's running.", e);
disconnectChannel();
throw e;
}
greetingPacket = receiveGreeting();
authenticate(greetingPacket);
if (binlogFilename == null) {
fetchBinlogFilenameAndPosition();
}
if (binlogPosition < 4) {
if (logger.isLoggable(Level.WARNING)) {
logger.warning("Binary log position adjusted from " + binlogPosition + " to " + 4);
connected = true;
notifyWhenDisconnected = true;
if (logger.isLoggable(Level.INFO)) {
String position;
synchronized (gtidSetAccessLock) {
position = gtidSet != null ? gtidSet.toString() : binlogFilename + "/" + binlogPosition;
}
binlogPosition = 4;
logger.info("Connected to " + hostname + ":" + port + " at " + position +
" (" + (blocking ? "sid:" + serverId + ", " : "") + "cid:" + connectionId + ")");
}
ChecksumType checksumType = fetchBinlogChecksum();
if (checksumType != ChecksumType.NONE) {
confirmSupportOfChecksum(checksumType);
synchronized (lifecycleListeners) {
for (LifecycleListener lifecycleListener : lifecycleListeners) {
lifecycleListener.onConnect(this);
}
}
requestBinaryLogStream();
} catch (IOException e) {
if (channel != null && channel.isOpen()) {
channel.close();
if (keepAlive && !isKeepAliveThreadRunning()) {
spawnKeepAliveThread();
}
throw e;
}
connected = true;
connectionId = greetingPacket.getThreadId();
if (logger.isLoggable(Level.INFO)) {
String position;
ensureEventDataDeserializer(EventType.ROTATE, RotateEventDataDeserializer.class);
synchronized (gtidSetAccessLock) {
position = gtidSet != null ? gtidSet.toString() : binlogFilename + "/" + binlogPosition;
if (gtidSet != null) {
ensureEventDataDeserializer(EventType.GTID, GtidEventDataDeserializer.class);
}
}
logger.info("Connected to " + hostname + ":" + port + " at " + position +
" (" + (blocking ? "sid:" + serverId + ", " : "") + "cid:" + connectionId + ")");
}
synchronized (lifecycleListeners) {
for (LifecycleListener lifecycleListener : lifecycleListeners) {
lifecycleListener.onConnect(this);
listenForEventPackets();
} finally {
connectLock.unlock();
if (notifyWhenDisconnected) {
synchronized (lifecycleListeners) {
for (LifecycleListener lifecycleListener : lifecycleListeners) {
lifecycleListener.onDisconnect(this);
}
}
}
}
if (keepAlive && !isKeepAliveThreadRunning()) {
spawnKeepAliveThread();
}
ensureEventDataDeserializer(EventType.ROTATE, RotateEventDataDeserializer.class);
synchronized (gtidSetAccessLock) {
if (gtidSet != null) {
ensureEventDataDeserializer(EventType.GTID, GtidEventDataDeserializer.class);
}

private PacketChannel openChannel() throws IOException {
try {
Socket socket = socketFactory != null ? socketFactory.createSocket() : new Socket();
socket.connect(new InetSocketAddress(hostname, port));
PacketChannel channel = new PacketChannel(socket);
if (channel.getInputStream().peek() == -1) {
throw new EOFException();
}
return channel;
} catch (IOException e) {
throw new IOException("Failed to connect to MySQL on " + hostname + ":" + port +
". Please make sure it's running.", e);
}
listenForEventPackets();
}

private GreetingPacket receiveGreeting() throws IOException {
Expand Down Expand Up @@ -540,51 +554,46 @@ private void authenticate(GreetingPacket greetingPacket) throws IOException {
}

private void spawnKeepAliveThread() {
keepAliveThreadExecutor = Executors.newSingleThreadExecutor(new ThreadFactory() {
final ExecutorService threadExecutor =
Executors.newSingleThreadExecutor(new ThreadFactory() {

@Override
public Thread newThread(Runnable runnable) {
return newNamedThread(runnable, "blc-keepalive-" + hostname + ":" + port);
}
});
keepAliveThreadExecutor.submit(new Runnable() {
@Override
public Thread newThread(Runnable runnable) {
return newNamedThread(runnable, "blc-keepalive-" + hostname + ":" + port);
}
});
threadExecutor.submit(new Runnable() {
@Override
public void run() {
while (true) {
while (!threadExecutor.isShutdown()) {
try {
Thread.sleep(keepAliveInterval);
} catch (InterruptedException e) {
// expected in case of disconnect
}
shutdownLock.lock();
if (threadExecutor.isShutdown()) {
return;
}
try {
if (keepAliveThreadExecutor.isShutdown()) {
return;
channel.write(new PingCommand());
} catch (IOException e) {
if (logger.isLoggable(Level.INFO)) {
logger.info("Trying to restore lost connection to " + hostname + ":" + port);
}
try {
channel.write(new PingCommand());
} catch (IOException e) {
if (logger.isLoggable(Level.INFO)) {
logger.info("Trying to restore lost connection to " + hostname + ":" + port);
}
try {
if (isConnected()) {
disconnectChannel();
}
connect(keepAliveConnectTimeout);
} catch (Exception ce) {
if (logger.isLoggable(Level.WARNING)) {
logger.warning("Failed to restore connection to " + hostname + ":" + port +
". Next attempt in " + keepAliveInterval + "ms");
}
terminateConnect();
connect(keepAliveConnectTimeout);
} catch (Exception ce) {
if (logger.isLoggable(Level.WARNING)) {
logger.warning("Failed to restore connection to " + hostname + ":" + port +
". Next attempt in " + keepAliveInterval + "ms");
}
}
} finally {
shutdownLock.unlock();
}
}
}
});
keepAliveThreadExecutor = threadExecutor;
}

private Thread newNamedThread(Runnable runnable, String threadName) {
Expand Down Expand Up @@ -895,7 +904,7 @@ public void registerLifecycleListener(LifecycleListener lifecycleListener) {
/**
* Unregister all lifecycle listener of specific type.
*/
public synchronized void unregisterLifecycleListener(Class<? extends LifecycleListener> listenerClass) {
public void unregisterLifecycleListener(Class<? extends LifecycleListener> listenerClass) {
synchronized (lifecycleListeners) {
Iterator<LifecycleListener> iterator = lifecycleListeners.iterator();
while (iterator.hasNext()) {
Expand All @@ -910,7 +919,7 @@ public synchronized void unregisterLifecycleListener(Class<? extends LifecycleLi
/**
* Unregister single lifecycle listener.
*/
public synchronized void unregisterLifecycleListener(LifecycleListener eventListener) {
public void unregisterLifecycleListener(LifecycleListener eventListener) {
synchronized (lifecycleListeners) {
lifecycleListeners.remove(eventListener);
}
Expand All @@ -922,48 +931,49 @@ public synchronized void unregisterLifecycleListener(LifecycleListener eventList
* As the result following {@link #connect()} resumes client from where it left off.
*/
public void disconnect() throws IOException {
shutdownLock.lock();
try {
if (isKeepAliveThreadRunning()) {
keepAliveThreadExecutor.shutdownNow();
}
disconnectChannel();
} finally {
shutdownLock.unlock();
terminateKeepAliveThread();
terminateConnect();
}

private void terminateKeepAliveThread() {
ExecutorService keepAliveThreadExecutor = this.keepAliveThreadExecutor;
if (keepAliveThreadExecutor == null) {
return;
}
if (isKeepAliveThreadRunning()) {
waitForKeepAliveThreadToBeTerminated();
keepAliveThreadExecutor.shutdownNow();
while (!awaitTerminationInterruptibly(keepAliveThreadExecutor,
Long.MAX_VALUE, TimeUnit.NANOSECONDS)) {
// ignore
}
}

private void waitForKeepAliveThreadToBeTerminated() {
boolean terminated = false;
private static boolean awaitTerminationInterruptibly(ExecutorService executorService, long timeout, TimeUnit unit) {
try {
terminated = keepAliveThreadExecutor.awaitTermination(keepAliveThreadShutdownTimeout,
TimeUnit.MILLISECONDS);
return executorService.awaitTermination(timeout, unit);
} catch (InterruptedException e) {
if (logger.isLoggable(Level.WARNING)) {
logger.log(Level.WARNING, e.getMessage());
}
return false;
}
if (!terminated) {
throw new IllegalStateException("BinaryLogClient was unable to shut keep alive thread down in " +
keepAliveThreadShutdownTimeout + "ms");
}

private void terminateConnect() throws IOException {
do {
disconnectChannel();
} while (!tryLockInterruptibly(connectLock, 1000, TimeUnit.MILLISECONDS));
connectLock.unlock();
}

private static boolean tryLockInterruptibly(Lock lock, long time, TimeUnit unit) {
try {
return lock.tryLock(time, unit);
} catch (InterruptedException e) {
return false;
}
}

private void disconnectChannel() throws IOException {
try {
connected = false;
if (channel != null && channel.isOpen()) {
channel.close();
}
} finally {
synchronized (lifecycleListeners) {
for (LifecycleListener lifecycleListener : lifecycleListeners) {
lifecycleListener.onDisconnect(this);
}
}
connected = false;
if (channel != null && channel.isOpen()) {
channel.close();
}
}

Expand Down
Loading

0 comments on commit 5fa7e76

Please sign in to comment.