Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Io jms fix ack message checkpoint #22932

Merged
merged 25 commits into from
Oct 19, 2022
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
81257d6
[BEAM-11828] => Fix read message queue implementation
rvballada Apr 26, 2022
6c8cdbb
Merge branch 'apache:master' into io_jms_fix_ack_message_checkpoint
rvballada Apr 26, 2022
d5d9bcd
[BEAM-11828] => New implementation to fix acknowledgment
rvballada Jun 3, 2022
c5ab8ef
[BEAM-11828] => Some refactoring (remove drainMessage method)
rvballada Jun 21, 2022
e93c31a
Merge branch 'apache:master' into io_jms_fix_ack_message_checkpoint
rvballada Aug 25, 2022
3d3ce77
Merge branch 'apache:master' into io_jms_fix_ack_message_checkpoint
rvballada Aug 29, 2022
d16d154
Update CHANGES.md
rvballada Aug 29, 2022
5b14193
Pull request review:
rvballada Sep 29, 2022
0a83c78
Pull request review:
rvballada Sep 30, 2022
8158363
Adding some unit tests to new JmsCHeckpointMark discard method
rvballada Oct 4, 2022
490b849
Code review: discard checkpoint and clear messages at beginning of fi…
rvballada Oct 14, 2022
b16e9f6
Throw an IllegalStateException when adding message when checkpoint is…
rvballada Oct 14, 2022
4b1d6ad
Change closeTimeout from long to Duration
rvballada Oct 14, 2022
aa24d45
CHeck that closeTimeout is non negative
rvballada Oct 14, 2022
5e3c4be
Merge branch 'apache:master' into io_jms_fix_ack_message_checkpoint
rvballada Oct 17, 2022
d91bc23
Add private package fields to perform testd
rvballada Oct 17, 2022
803b964
Code review: update comment
rvballada Oct 18, 2022
602cb2c
Code review: add comment
rvballada Oct 18, 2022
b7bb897
Code review: remove empty space
rvballada Oct 18, 2022
497b191
Code review: use ExecutorOptions to get an instance of ShceduleExecut…
rvballada Oct 18, 2022
c3888bb
Code review: avoid Thread.sleep with ExecutorService.awaitTermination…
rvballada Oct 18, 2022
c5dd19f
Apply suggestions from code review
lukecwik Oct 18, 2022
0a8c633
Apply suggestions from code review
lukecwik Oct 18, 2022
a8df80b
Apply suggestions from code review
lukecwik Oct 18, 2022
2611331
Update sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsI…
lukecwik Oct 18, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@

## Bugfixes

* Fixed JmsIO acknowledgment issue (https://github.com/apache/beam/issues/20814)
* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
## Known Issues

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.jms.Message;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;
import org.slf4j.Logger;
Expand All @@ -41,13 +42,20 @@ class JmsCheckpointMark implements UnboundedSource.CheckpointMark, Serializable
private Instant oldestMessageTimestamp = Instant.now();
private transient List<Message> messages = new ArrayList<>();

private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@VisibleForTesting transient boolean discarded = false;

@VisibleForTesting final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

JmsCheckpointMark() {}

void add(Message message) throws Exception {
lock.writeLock().lock();
try {
if (discarded) {
throw new IllegalStateException(
String.format(
"Attempting to add message %s to checkpoint that is discarded.", message));
}
Instant currentMessageTimestamp = new Instant(message.getJMSTimestamp());
if (currentMessageTimestamp.isBefore(oldestMessageTimestamp)) {
oldestMessageTimestamp = currentMessageTimestamp;
Expand All @@ -67,6 +75,15 @@ Instant getOldestMessageTimestamp() {
}
rvballada marked this conversation as resolved.
Show resolved Hide resolved
}

void discard() {
lock.writeLock().lock();
try {
this.discarded = true;
} finally {
lock.writeLock().unlock();
}
}

/**
* Acknowledge all outstanding message. Since we believe that messages will be delivered in
* timestamp order, and acknowledged messages will not be retried, the newest message in this
Expand All @@ -76,6 +93,10 @@ Instant getOldestMessageTimestamp() {
public void finalizeCheckpoint() {
lock.writeLock().lock();
try {
if (discarded) {
messages.clear();
return;
}
for (Message message : messages) {
try {
message.acknowledge();
Expand All @@ -98,6 +119,7 @@ private void readObject(java.io.ObjectInputStream stream)
throws IOException, ClassNotFoundException {
stream.defaultReadObject();
messages = new ArrayList<>();
discarded = false;
}

@Override
Expand Down
108 changes: 91 additions & 17 deletions sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
Expand Down Expand Up @@ -123,11 +126,13 @@
public class JmsIO {

private static final Logger LOG = LoggerFactory.getLogger(JmsIO.class);
private static final Duration DEFAULT_CLOSE_TIMEOUT = Duration.millis(60000L);

public static Read<JmsRecord> read() {
return new AutoValue_JmsIO_Read.Builder<JmsRecord>()
.setMaxNumRecords(Long.MAX_VALUE)
.setCoder(SerializableCoder.of(JmsRecord.class))
.setCloseTimeout(DEFAULT_CLOSE_TIMEOUT)
.setMessageMapper(
(MessageMapper<JmsRecord>)
new MessageMapper<JmsRecord>() {
Expand Down Expand Up @@ -162,7 +167,10 @@ public JmsRecord mapMessage(Message message) throws Exception {
}

public static <T> Read<T> readMessage() {
return new AutoValue_JmsIO_Read.Builder<T>().setMaxNumRecords(Long.MAX_VALUE).build();
return new AutoValue_JmsIO_Read.Builder<T>()
.setMaxNumRecords(Long.MAX_VALUE)
.setCloseTimeout(DEFAULT_CLOSE_TIMEOUT)
.build();
}

public static <EventT> Write<EventT> write() {
Expand Down Expand Up @@ -206,6 +214,8 @@ public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>>

abstract @Nullable AutoScaler getAutoScaler();

abstract Duration getCloseTimeout();

abstract Builder<T> builder();

@AutoValue.Builder
Expand All @@ -230,6 +240,8 @@ abstract static class Builder<T> {

abstract Builder<T> setAutoScaler(AutoScaler autoScaler);

abstract Builder<T> setCloseTimeout(Duration closeTimeout);

abstract Read<T> build();
}

Expand Down Expand Up @@ -364,6 +376,18 @@ public Read<T> withAutoScaler(AutoScaler autoScaler) {
return builder().setAutoScaler(autoScaler).build();
}

/**
* Sets the amount of time to wait for callbacks from the runner stating that the output has
* been durably persisted before closing the connection to the JMS broker. Any callbacks that do
* not occur will cause any unacknowledged messages to be returned to the JMS broker and
* redelivered to other clients.
lukecwik marked this conversation as resolved.
Show resolved Hide resolved
*/
public Read<T> withCloseTimeout(Duration closeTimeout) {
checkArgument(closeTimeout != null, "closeTimeout can not be null");
checkArgument(closeTimeout.getMillis() >= 0, "Close timeout must be non-negative.");
return builder().setCloseTimeout(closeTimeout).build();
rvballada marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public PCollection<T> expand(PBegin input) {
checkArgument(getConnectionFactory() != null, "withConnectionFactory() is required");
Expand Down Expand Up @@ -446,7 +470,7 @@ public List<UnboundedJmsSource<T>> split(int desiredNumSplits, PipelineOptions o
@Override
public UnboundedJmsReader<T> createReader(
PipelineOptions options, JmsCheckpointMark checkpointMark) {
return new UnboundedJmsReader<T>(this, checkpointMark);
return new UnboundedJmsReader<T>(this);
}

@Override
Expand All @@ -472,13 +496,9 @@ static class UnboundedJmsReader<T> extends UnboundedReader<T> {
private T currentMessage;
private Instant currentTimestamp;

public UnboundedJmsReader(UnboundedJmsSource<T> source, JmsCheckpointMark checkpointMark) {
public UnboundedJmsReader(UnboundedJmsSource<T> source) {
this.source = source;
if (checkpointMark != null) {
this.checkpointMark = checkpointMark;
} else {
this.checkpointMark = new JmsCheckpointMark();
}
this.checkpointMark = new JmsCheckpointMark();
this.currentMessage = null;
}

Expand Down Expand Up @@ -582,29 +602,83 @@ public long getTotalBacklogBytes() {
}

@Override
public void close() throws IOException {
public void close() {
doClose();
}

@SuppressWarnings("FutureReturnValueIgnored")
private void doClose() {

try {
if (consumer != null) {
consumer.close();
consumer = null;
closeAutoscaler();
closeConsumer();
ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use the scheduled executor service from ExecutorOptions once it is merged from #23234

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes ok, I noticed that

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do we get the executor service from ExecutorOptions? As far as I can see, PipelineOptions should be then a reader field, but how do we get the ExecutorService from PipelineOptions?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pipelineOptions.as(ExecutorOptions.class).getScheduledExecutorService(), the key part being the as(...) method

executorService.schedule(
() -> {
LOG.debug(
"Closing session and connection after delay {}", source.spec.getCloseTimeout());
// Discard the checkpoints and set the reader as inactive
checkpointMark.discard();
closeSession();
closeConnection();
},
source.spec.getCloseTimeout().getMillis(),
TimeUnit.MILLISECONDS);
rvballada marked this conversation as resolved.
Show resolved Hide resolved

} catch (Exception e) {
LOG.error("Error closing reader", e);
}
}

private void closeConnection() {
try {
if (connection != null) {
connection.stop();
connection.close();
connection = null;
}
} catch (Exception e) {
LOG.error("Error closing connection", e);
}
}

private void closeSession() {
try {
if (session != null) {
session.close();
session = null;
}
if (connection != null) {
connection.stop();
connection.close();
connection = null;
} catch (Exception e) {
LOG.error("Error closing session" + e.getMessage(), e);
}
}

private void closeConsumer() {
try {
if (consumer != null) {
consumer.close();
consumer = null;
}
} catch (Exception e) {
LOG.error("Error closing consumer", e);
}
}

private void closeAutoscaler() {
try {
if (autoScaler != null) {
autoScaler.stop();
autoScaler = null;
}
} catch (Exception e) {
throw new IOException(e);
LOG.error("Error closing autoscaler", e);
}
}

@Override
protected void finalize() {
doClose();
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
lukecwik marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -67,6 +68,7 @@
import org.apache.beam.sdk.transforms.SerializableBiFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables;
import org.joda.time.Duration;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
lukecwik marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -505,7 +507,6 @@ public void testCheckpointMarkSafety() throws Exception {
@Test
public void testCheckpointMarkDefaultCoder() throws Exception {
JmsCheckpointMark jmsCheckpointMark = new JmsCheckpointMark();
jmsCheckpointMark.add(new ActiveMQMessage());
Coder coder = new JmsIO.UnboundedJmsSource(null).getCheckpointMarkCoder();
CoderProperties.coderSerializable(coder);
CoderProperties.coderDecodeEncodeEqual(coder, jmsCheckpointMark);
Expand Down Expand Up @@ -555,6 +556,101 @@ public void testCustomAutoscaler() throws IOException {
verify(autoScaler, times(1)).stop();
}

@Test
public void testCloseWithTimeout() throws IOException {

lukecwik marked this conversation as resolved.
Show resolved Hide resolved
Duration closeTimeout = Duration.millis(2000L);
JmsIO.Read spec =
JmsIO.read()
.withConnectionFactory(connectionFactory)
.withUsername(USERNAME)
.withPassword(PASSWORD)
.withQueue(QUEUE)
.withCloseTimeout(closeTimeout);

JmsIO.UnboundedJmsSource source = new JmsIO.UnboundedJmsSource(spec);
JmsIO.UnboundedJmsReader reader = source.createReader(null, null);

reader.start();
reader.close();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would suggest using a mock ScheduledExecutorService that you set on the PipelineOptions object when creating the reader. This way you can inject here in the test and capture the runnable/callable directly without needing to have a test reliant on Thread.sleep

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do we get the executor service from ExecutorOptions? As far as I can see, PipelineOptions should be then a reader field, but how do we get the ExecutorService from PipelineOptions?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pipelineOptions.as(ExecutorOptions.class).getScheduledExecutorService(), the key part being the as(...) method

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes I get it. I don't know exactly how to mock it.


boolean discarded = getDiscardedValue(reader);
assertFalse(discarded);
lukecwik marked this conversation as resolved.
Show resolved Hide resolved
try {
Thread.sleep(closeTimeout.getMillis() + 1000);
} catch (InterruptedException ignored) {
}
discarded = getDiscardedValue(reader);
assertTrue(discarded);
lukecwik marked this conversation as resolved.
Show resolved Hide resolved
}

private boolean getDiscardedValue(JmsIO.UnboundedJmsReader reader) {
JmsCheckpointMark checkpoint = (JmsCheckpointMark) reader.getCheckpointMark();
checkpoint.lock.readLock().lock();
try {
return checkpoint.discarded;
} finally {
checkpoint.lock.readLock().unlock();
}
}

@Test
public void testDiscardCheckpointMark() throws Exception {

rvballada marked this conversation as resolved.
Show resolved Hide resolved
Connection connection =
connectionFactoryWithSyncAcksAndWithoutPrefetch.createConnection(USERNAME, PASSWORD);
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
MessageProducer producer = session.createProducer(session.createQueue(QUEUE));
for (int i = 0; i < 10; i++) {
producer.send(session.createTextMessage("test " + i));
}
producer.close();
session.close();
connection.close();

JmsIO.Read spec =
JmsIO.read()
.withConnectionFactory(connectionFactoryWithSyncAcksAndWithoutPrefetch)
.withUsername(USERNAME)
.withPassword(PASSWORD)
.withQueue(QUEUE);
JmsIO.UnboundedJmsSource source = new JmsIO.UnboundedJmsSource(spec);
JmsIO.UnboundedJmsReader reader = source.createReader(null, null);

// start the reader and move to the first record
assertTrue(reader.start());

// consume 3 messages (NB: start already consumed the first message)
rvballada marked this conversation as resolved.
Show resolved Hide resolved
for (int i = 0; i < 3; i++) {
assertTrue(reader.advance());
}

// the messages are still pending in the queue (no ACK yet)
assertEquals(10, count(QUEUE));

// we finalize the checkpoint
reader.getCheckpointMark().finalizeCheckpoint();

// the checkpoint finalize ack the messages, and so they are not pending in the queue anymore
assertEquals(6, count(QUEUE));

// we read the 6 pending messages
for (int i = 0; i < 6; i++) {
assertTrue(reader.advance());
}

// still 6 pending messages as we didn't finalize the checkpoint
assertEquals(6, count(QUEUE));

// But here we discard the checkpoint
((JmsCheckpointMark) reader.getCheckpointMark()).discard();
// we finalize the checkpoint: no more message in the queue
rvballada marked this conversation as resolved.
Show resolved Hide resolved
reader.getCheckpointMark().finalizeCheckpoint();

assertEquals(6, count(QUEUE));
}

private int count(String queue) throws Exception {
Connection connection = connectionFactory.createConnection(USERNAME, PASSWORD);
connection.start();
Expand Down