Skip to content

Commit

Permalink
Fixed #113 - a race condition that could result in duplicate events t…
Browse files Browse the repository at this point in the history
…o be emitted on reconnect
  • Loading branch information
shyiko committed Sep 20, 2016
1 parent f608aa8 commit 481f63b
Show file tree
Hide file tree
Showing 4 changed files with 196 additions and 56 deletions.
5 changes: 5 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@
All notable changes to this project will be documented in this file.
This project adheres to [Semantic Versioning](http://semver.org/).

## [0.4.2](https://github.com/shyiko/mysql-binlog-connector-java/compare/0.4.1...0.4.2) - 2016-09-19

### Fixed
- A race condition that could result in duplicate events to be emitted on reconnect ([#113](https://github.com/shyiko/mysql-binlog-connector-java/issues/113)).

## [0.4.1](https://github.com/shyiko/mysql-binlog-connector-java/compare/0.4.0...0.4.1) - 2016-08-31

### Fixed
Expand Down
135 changes: 80 additions & 55 deletions src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ public X509Certificate[] getAcceptedIssuers() {
private SocketFactory socketFactory;
private SSLSocketFactory sslSocketFactory;

private PacketChannel channel;
private volatile PacketChannel channel;
private volatile boolean connected;

private ThreadFactory threadFactory;
Expand All @@ -152,7 +152,10 @@ public X509Certificate[] getAcceptedIssuers() {
private volatile ExecutorService keepAliveThreadExecutor;
private long keepAliveThreadShutdownTimeout = TimeUnit.SECONDS.toMillis(6);

private final Lock shutdownLock = new ReentrantLock();
private final Lock connectLock = new ReentrantLock();
private final Lock disconnectLock = new ReentrantLock();
// used to prevent channel reinitialization after it was closed in #disconnectChannel().
private volatile boolean awaitingConnectTermination;

/**
* Alias for BinaryLogClient("localhost", 3306, <no schema> = null, username, password).
Expand Down Expand Up @@ -397,69 +400,80 @@ public void setThreadFactory(ThreadFactory threadFactory) {
* @throws IOException if anything goes wrong while trying to connect
*/
public void connect() throws IOException {
if (connected) {
if (!connectLock.tryLock()) {
throw new IllegalStateException("BinaryLogClient is already connected");
}
GreetingPacket greetingPacket;
try {
try {
Socket socket = socketFactory != null ? socketFactory.createSocket() : new Socket();
socket.connect(new InetSocketAddress(hostname, port));
channel = new PacketChannel(socket);
if (channel.getInputStream().peek() == -1) {
throw new EOFException();
channel = establishConnection();
if (awaitingConnectTermination) {
throw new IOException();
}
GreetingPacket greetingPacket = receiveGreeting();
authenticate(greetingPacket);
connectionId = greetingPacket.getThreadId();
if (binlogFilename == null) {
fetchBinlogFilenameAndPosition();
}
if (binlogPosition < 4) {
if (logger.isLoggable(Level.WARNING)) {
logger.warning("Binary log position adjusted from " + binlogPosition + " to " + 4);
}
binlogPosition = 4;
}
ChecksumType checksumType = fetchBinlogChecksum();
if (checksumType != ChecksumType.NONE) {
confirmSupportOfChecksum(checksumType);
}
requestBinaryLogStream();
} catch (IOException e) {
throw new IOException("Failed to connect to MySQL on " + hostname + ":" + port +
". Please make sure it's running.", e);
}
greetingPacket = receiveGreeting();
authenticate(greetingPacket);
if (binlogFilename == null) {
fetchBinlogFilenameAndPosition();
if (channel != null && channel.isOpen()) {
channel.close();
}
throw e;
}
if (binlogPosition < 4) {
if (logger.isLoggable(Level.WARNING)) {
logger.warning("Binary log position adjusted from " + binlogPosition + " to " + 4);
connected = true;
if (logger.isLoggable(Level.INFO)) {
String position;
synchronized (gtidSetAccessLock) {
position = gtidSet != null ? gtidSet.toString() : binlogFilename + "/" + binlogPosition;
}
binlogPosition = 4;
logger.info("Connected to " + hostname + ":" + port + " at " + position +
" (" + (blocking ? "sid:" + serverId + ", " : "") + "cid:" + connectionId + ")");
}
ChecksumType checksumType = fetchBinlogChecksum();
if (checksumType != ChecksumType.NONE) {
confirmSupportOfChecksum(checksumType);
synchronized (lifecycleListeners) {
for (LifecycleListener lifecycleListener : lifecycleListeners) {
lifecycleListener.onConnect(this);
}
}
requestBinaryLogStream();
} catch (IOException e) {
if (channel != null && channel.isOpen()) {
channel.close();
if (keepAlive && !isKeepAliveThreadRunning()) {
spawnKeepAliveThread();
}
throw e;
}
connected = true;
connectionId = greetingPacket.getThreadId();
if (logger.isLoggable(Level.INFO)) {
String position;
ensureEventDataDeserializer(EventType.ROTATE, RotateEventDataDeserializer.class);
synchronized (gtidSetAccessLock) {
position = gtidSet != null ? gtidSet.toString() : binlogFilename + "/" + binlogPosition;
}
logger.info("Connected to " + hostname + ":" + port + " at " + position +
" (" + (blocking ? "sid:" + serverId + ", " : "") + "cid:" + connectionId + ")");
}
synchronized (lifecycleListeners) {
for (LifecycleListener lifecycleListener : lifecycleListeners) {
lifecycleListener.onConnect(this);
if (gtidSet != null) {
ensureEventDataDeserializer(EventType.GTID, GtidEventDataDeserializer.class);
}
}
listenForEventPackets();
} finally {
connectLock.unlock();
}
if (keepAlive && !isKeepAliveThreadRunning()) {
spawnKeepAliveThread();
}
ensureEventDataDeserializer(EventType.ROTATE, RotateEventDataDeserializer.class);
synchronized (gtidSetAccessLock) {
if (gtidSet != null) {
ensureEventDataDeserializer(EventType.GTID, GtidEventDataDeserializer.class);
}

private PacketChannel establishConnection() throws IOException {
try {
Socket socket = socketFactory != null ? socketFactory.createSocket() : new Socket();
socket.connect(new InetSocketAddress(hostname, port));
PacketChannel channel = new PacketChannel(socket);
if (channel.getInputStream().peek() == -1) {
throw new EOFException();
}
return channel;
} catch (IOException e) {
throw new IOException("Failed to connect to MySQL on " + hostname + ":" + port +
". Please make sure it's running.", e);
}
listenForEventPackets();
}

private GreetingPacket receiveGreeting() throws IOException {
Expand Down Expand Up @@ -556,7 +570,7 @@ public void run() {
} catch (InterruptedException e) {
// expected in case of disconnect
}
shutdownLock.lock();
disconnectLock.lock();
try {
if (keepAliveThreadExecutor.isShutdown()) {
return;
Expand All @@ -580,7 +594,7 @@ public void run() {
}
}
} finally {
shutdownLock.unlock();
disconnectLock.unlock();
}
}
}
Expand Down Expand Up @@ -895,7 +909,7 @@ public void registerLifecycleListener(LifecycleListener lifecycleListener) {
/**
* Unregister all lifecycle listener of specific type.
*/
public synchronized void unregisterLifecycleListener(Class<? extends LifecycleListener> listenerClass) {
public void unregisterLifecycleListener(Class<? extends LifecycleListener> listenerClass) {
synchronized (lifecycleListeners) {
Iterator<LifecycleListener> iterator = lifecycleListeners.iterator();
while (iterator.hasNext()) {
Expand All @@ -910,7 +924,7 @@ public synchronized void unregisterLifecycleListener(Class<? extends LifecycleLi
/**
* Unregister single lifecycle listener.
*/
public synchronized void unregisterLifecycleListener(LifecycleListener eventListener) {
public void unregisterLifecycleListener(LifecycleListener eventListener) {
synchronized (lifecycleListeners) {
lifecycleListeners.remove(eventListener);
}
Expand All @@ -922,14 +936,14 @@ public synchronized void unregisterLifecycleListener(LifecycleListener eventList
* As the result following {@link #connect()} resumes client from where it left off.
*/
public void disconnect() throws IOException {
shutdownLock.lock();
disconnectLock.lock();
try {
if (isKeepAliveThreadRunning()) {
keepAliveThreadExecutor.shutdownNow();
}
disconnectChannel();
} finally {
shutdownLock.unlock();
disconnectLock.unlock();
}
if (isKeepAliveThreadRunning()) {
waitForKeepAliveThreadToBeTerminated();
Expand Down Expand Up @@ -959,6 +973,7 @@ private void disconnectChannel() throws IOException {
channel.close();
}
} finally {
waitForConnectToTerminate();
synchronized (lifecycleListeners) {
for (LifecycleListener lifecycleListener : lifecycleListeners) {
lifecycleListener.onDisconnect(this);
Expand All @@ -967,6 +982,16 @@ private void disconnectChannel() throws IOException {
}
}

private void waitForConnectToTerminate() {
awaitingConnectTermination = true;
try {
connectLock.lock();
connectLock.unlock();
} finally {
awaitingConnectTermination = false;
}
}

/**
* {@link BinaryLogClient}'s event listener.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
import com.github.shyiko.mysql.binlog.event.deserialization.EventHeaderV4Deserializer;
import com.github.shyiko.mysql.binlog.event.deserialization.QueryEventDataDeserializer;
import com.github.shyiko.mysql.binlog.io.BufferedSocketInputStream;
import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
import com.github.shyiko.mysql.binlog.network.AuthenticationException;
import com.github.shyiko.mysql.binlog.network.ServerException;
import com.github.shyiko.mysql.binlog.network.SocketFactory;
import org.mockito.InOrder;
import org.testng.SkipException;
import org.testng.annotations.AfterClass;
Expand All @@ -42,10 +44,15 @@

import java.io.Closeable;
import java.io.EOFException;
import java.io.FilterInputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.math.BigDecimal;
import java.math.MathContext;
import java.net.Socket;
import java.net.SocketException;
import java.sql.Connection;
import java.sql.DriverManager;
Expand All @@ -62,7 +69,10 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;

Expand Down Expand Up @@ -754,6 +764,106 @@ public void execute(Statement statement) throws SQLException {
}
}

@Test
public void testReconnectRaceCondition() throws Exception {
// this test relies on SO_RCVBUF (sysctl -a | grep rcvbuf)
// a more reliable way would be to use buffered 2-level concurrent filter input stream
try {
client.disconnect();
final BinaryLogClient binaryLogClient =
new BinaryLogClient(slave.hostname, slave.port, slave.username, slave.password);
final Lock inputStreamLock = new ReentrantLock();
final AtomicBoolean breakOutputStream = new AtomicBoolean();
binaryLogClient.setSocketFactory(new SocketFactory() {

@Override
public Socket createSocket() throws SocketException {
return new Socket() {

@Override
public InputStream getInputStream() throws IOException {
return new FilterInputStream(new BufferedSocketInputStream(super.getInputStream())) {

@Override
public int read(byte[] b, int off, int len) throws IOException {
int read = super.read(b, off, len);
inputStreamLock.lock();
inputStreamLock.unlock();
return read;
}
};
}

@Override
public OutputStream getOutputStream() throws IOException {
return new FilterOutputStream(super.getOutputStream()) {

@Override
public void write(int b) throws IOException {
if (breakOutputStream.get()) {
binaryLogClient.setSocketFactory(null);
throw new IOException();
}
super.write(b);
}
};
}
};
}
});
binaryLogClient.registerEventListener(eventListener);
binaryLogClient.setKeepAliveInterval(TimeUnit.MILLISECONDS.toMillis(100));
binaryLogClient.connect(DEFAULT_TIMEOUT);
try {
eventListener.waitFor(EventType.FORMAT_DESCRIPTION, 1, DEFAULT_TIMEOUT);
master.execute(new Callback<Statement>() {
@Override
public void execute(Statement statement) throws SQLException {
statement.execute("insert into bikini_bottom values('SpongeBob')");
}
});
eventListener.waitFor(WriteRowsEventData.class, 1, DEFAULT_TIMEOUT);
// lock input stream
inputStreamLock.lock();
// fill input stream buffer
master.execute(new Callback<Statement>() {
@Override
public void execute(Statement statement) throws SQLException {
statement.execute("insert into bikini_bottom values('Patrick')");
statement.execute("insert into bikini_bottom values('Rocky')");
}
});
// trigger reconnect
final CountDownLatch reconnect = new CountDownLatch(1);
binaryLogClient.registerLifecycleListener(new BinaryLogClient.AbstractLifecycleListener() {

@Override
public void onConnect(BinaryLogClient client) {
reconnect.countDown();
}
});
breakOutputStream.set(true);
// wait for connection to be reestablished
reconnect.await(DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS);
// unlock input stream (from previous connection)
inputStreamLock.unlock();
master.execute(new Callback<Statement>() {
@Override
public void execute(Statement statement) throws SQLException {
statement.execute("delete from bikini_bottom where name = 'Patrick'");
}
});
eventListener.waitFor(DeleteRowsEventData.class, 1, DEFAULT_TIMEOUT);
// assert that no events were delivered twice
eventListener.waitFor(WriteRowsEventData.class, 2, DEFAULT_TIMEOUT);
} finally {
binaryLogClient.disconnect();
}
} finally {
client.connect(DEFAULT_TIMEOUT);
}
}

@AfterMethod
public void afterEachTest() throws Exception {
final CountDownLatch latch = new CountDownLatch(1);
Expand Down
2 changes: 1 addition & 1 deletion supplement/codequality/checkstyle.xml
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@
<property name="max" value="120"/>
</module>
<module name="MethodLength">
<property name="max" value="65"/>
<property name="max" value="100"/>
</module>
<module name="ParameterNumber">
<property name="max" value="5"/>
Expand Down

0 comments on commit 481f63b

Please sign in to comment.