Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Io jms fix ack message checkpoint #22932

Merged
merged 25 commits into from
Oct 19, 2022
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
81257d6
[BEAM-11828] => Fix read message queue implementation
rvballada Apr 26, 2022
6c8cdbb
Merge branch 'apache:master' into io_jms_fix_ack_message_checkpoint
rvballada Apr 26, 2022
d5d9bcd
[BEAM-11828] => New implementation to fix acknowledgment
rvballada Jun 3, 2022
c5ab8ef
[BEAM-11828] => Some refactoring (remove drainMessage method)
rvballada Jun 21, 2022
e93c31a
Merge branch 'apache:master' into io_jms_fix_ack_message_checkpoint
rvballada Aug 25, 2022
3d3ce77
Merge branch 'apache:master' into io_jms_fix_ack_message_checkpoint
rvballada Aug 29, 2022
d16d154
Update CHANGES.md
rvballada Aug 29, 2022
5b14193
Pull request review:
rvballada Sep 29, 2022
0a83c78
Pull request review:
rvballada Sep 30, 2022
8158363
Adding some unit tests to new JmsCheckpointMark discard method
rvballada Oct 4, 2022
490b849
Code review: discard checkpoint and clear messages at beginning of fi…
rvballada Oct 14, 2022
b16e9f6
Throw an IllegalStateException when adding message when checkpoint is…
rvballada Oct 14, 2022
4b1d6ad
Change closeTimeout from long to Duration
rvballada Oct 14, 2022
aa24d45
Check that closeTimeout is non-negative
rvballada Oct 14, 2022
5e3c4be
Merge branch 'apache:master' into io_jms_fix_ack_message_checkpoint
rvballada Oct 17, 2022
d91bc23
Add package-private fields to perform tests
rvballada Oct 17, 2022
803b964
Code review: update comment
rvballada Oct 18, 2022
602cb2c
Code review: add comment
rvballada Oct 18, 2022
b7bb897
Code review: remove empty space
rvballada Oct 18, 2022
497b191
Code review: use ExecutorOptions to get an instance of ScheduledExecut…
rvballada Oct 18, 2022
c3888bb
Code review: avoid Thread.sleep with ExecutorService.awaitTermination…
rvballada Oct 18, 2022
c5dd19f
Apply suggestions from code review
lukecwik Oct 18, 2022
0a8c633
Apply suggestions from code review
lukecwik Oct 18, 2022
a8df80b
Apply suggestions from code review
lukecwik Oct 18, 2022
2611331
Update sdks/java/io/jms/src/test/java/org/apache/beam/sdk/io/jms/JmsI…
lukecwik Oct 18, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@

## Bugfixes

* Fixed JmsIO acknowledgment issue (https://github.com/apache/beam/issues/20814)
* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
## Known Issues

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.jms.JMSException;
import javax.jms.Message;
import org.apache.beam.sdk.io.UnboundedSource;
import org.checkerframework.checker.nullness.qual.Nullable;
Expand All @@ -38,33 +37,21 @@ class JmsCheckpointMark implements UnboundedSource.CheckpointMark, Serializable

private static final Logger LOG = LoggerFactory.getLogger(JmsCheckpointMark.class);

private Instant oldestMessageTimestamp = Instant.now();
private transient List<Message> messages = new ArrayList<>();
private transient JmsIO.UnboundedJmsReader<?> reader;
private transient List<Message> messagesToAck;
private final int readerHash;
rvballada marked this conversation as resolved.
Show resolved Hide resolved

private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

JmsCheckpointMark() {}

void add(Message message) throws Exception {
lock.writeLock().lock();
try {
Instant currentMessageTimestamp = new Instant(message.getJMSTimestamp());
if (currentMessageTimestamp.isBefore(oldestMessageTimestamp)) {
oldestMessageTimestamp = currentMessageTimestamp;
}
messages.add(message);
} finally {
lock.writeLock().unlock();
}
JmsCheckpointMark(JmsIO.UnboundedJmsReader<?> reader, @Nullable List<Message> messagesToAck) {
this.reader = reader;
this.messagesToAck = messagesToAck;
this.readerHash = System.identityHashCode(reader);
}

Instant getOldestMessageTimestamp() {
lock.readLock().lock();
try {
return this.oldestMessageTimestamp;
} finally {
lock.readLock().unlock();
}
// Reset messagesToAck to an empty list on deserialization (the field is transient).
private void readObject(java.io.ObjectInputStream stream)
throws IOException, ClassNotFoundException {
stream.defaultReadObject();
messagesToAck = new ArrayList<>();
}

/**
Expand All @@ -73,47 +60,38 @@ Instant getOldestMessageTimestamp() {
* batch is a good bound for future messages.
*/
@Override
public void finalizeCheckpoint() {
lock.writeLock().lock();
public void finalizeCheckpoint() throws IOException {
try {
for (Message message : messages) {
try {
LOG.debug("Finalize Checkpoint {} {}", reader, messagesToAck.size());
if (reader.active.get() && reader != null) {
rvballada marked this conversation as resolved.
Show resolved Hide resolved
for (Message message : messagesToAck) {
rvballada marked this conversation as resolved.
Show resolved Hide resolved
message.acknowledge();
Instant currentMessageTimestamp = new Instant(message.getJMSTimestamp());
if (currentMessageTimestamp.isAfter(oldestMessageTimestamp)) {
oldestMessageTimestamp = currentMessageTimestamp;
}
} catch (Exception e) {
LOG.error("Exception while finalizing message: ", e);
reader.watermark.updateAndGet(
prev -> Math.max(currentMessageTimestamp.getMillis(), prev));
}
}
messages.clear();
} catch (JMSException e) {
throw new IOException("Exception while finalizing message ", e);
} finally {
lock.writeLock().unlock();
reader = null;
}
}

// Reset messages to an empty list on deserialization (the field is transient).
private void readObject(java.io.ObjectInputStream stream)
throws IOException, ClassNotFoundException {
stream.defaultReadObject();
messages = new ArrayList<>();
}

@Override
public boolean equals(@Nullable Object o) {
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
if (!(o instanceof JmsCheckpointMark)) {
return false;
}
JmsCheckpointMark that = (JmsCheckpointMark) o;
return oldestMessageTimestamp.equals(that.oldestMessageTimestamp);
return readerHash == that.readerHash;
rvballada marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public int hashCode() {
return Objects.hash(oldestMessageTimestamp);
return readerHash;
rvballada marked this conversation as resolved.
Show resolved Hide resolved
}
}
138 changes: 107 additions & 31 deletions sdks/java/io/jms/src/main/java/org/apache/beam/sdk/io/jms/JmsIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,18 @@
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
Expand All @@ -57,6 +62,7 @@
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;
import org.joda.time.Instant;
Expand Down Expand Up @@ -446,7 +452,7 @@ public List<UnboundedJmsSource<T>> split(int desiredNumSplits, PipelineOptions o
@Override
public UnboundedJmsReader<T> createReader(
PipelineOptions options, JmsCheckpointMark checkpointMark) {
return new UnboundedJmsReader<T>(this, checkpointMark);
return new UnboundedJmsReader<T>(this);
}

@Override
Expand All @@ -463,23 +469,24 @@ public Coder<T> getOutputCoder() {
static class UnboundedJmsReader<T> extends UnboundedReader<T> {

private UnboundedJmsSource<T> source;
private JmsCheckpointMark checkpointMark;
private Connection connection;
private Session session;
private MessageConsumer consumer;
private AutoScaler autoScaler;

private T currentMessage;
private Message currentJmsMessage;
private Instant currentTimestamp;

public UnboundedJmsReader(UnboundedJmsSource<T> source, JmsCheckpointMark checkpointMark) {
Set<Message> messagesToAck;
rvballada marked this conversation as resolved.
Show resolved Hide resolved
AtomicBoolean active = new AtomicBoolean(true);
AtomicLong watermark = new AtomicLong(0);

public UnboundedJmsReader(UnboundedJmsSource<T> source) {
this.source = source;
if (checkpointMark != null) {
this.checkpointMark = checkpointMark;
} else {
this.checkpointMark = new JmsCheckpointMark();
}
this.currentMessage = null;
this.messagesToAck = new HashSet<>();
watermark.getAndSet(System.currentTimeMillis());
}

@Override
Expand All @@ -501,6 +508,7 @@ public boolean start() throws IOException {
this.autoScaler = spec.getAutoScaler();
}
this.autoScaler.start();

} catch (Exception e) {
throw new IOException("Error connecting to JMS", e);
}
Expand All @@ -520,28 +528,32 @@ public boolean start() throws IOException {
} catch (Exception e) {
throw new IOException("Error creating JMS consumer", e);
}
this.active.set(true);

return advance();
}

@Override
public boolean advance() throws IOException {
try {
Message message = this.consumer.receiveNoWait();
if (active.get()) {
try {
Message message = this.consumer.receiveNoWait();

if (message == null) {
currentMessage = null;
return false;
if (message == null) {
currentMessage = null;
return false;
}
currentJmsMessage = message;
messagesToAck.add(message);
currentMessage = this.source.spec.getMessageMapper().mapMessage(message);
currentTimestamp = new Instant(message.getJMSTimestamp());
return true;

} catch (Exception e) {
throw new IOException(e);
}

checkpointMark.add(message);

currentMessage = this.source.spec.getMessageMapper().mapMessage(message);
currentTimestamp = new Instant(message.getJMSTimestamp());

return true;
} catch (Exception e) {
throw new IOException(e);
} else {
return false;
}
}

Expand All @@ -555,7 +567,10 @@ public T getCurrent() throws NoSuchElementException {

@Override
public Instant getWatermark() {
return checkpointMark.getOldestMessageTimestamp();
if (watermark == null) {
return new Instant(0);
}
return new Instant(watermark.get());
}

@Override
Expand All @@ -568,7 +583,9 @@ public Instant getCurrentTimestamp() {

@Override
public CheckpointMark getCheckpointMark() {
return checkpointMark;
List<Message> msgToAcks = Lists.newArrayList(messagesToAck);
messagesToAck.clear();
return new JmsCheckpointMark(this, msgToAcks);
}

@Override
Expand All @@ -583,28 +600,87 @@ public long getTotalBacklogBytes() {

@Override
public void close() throws IOException {
// Clear the active flag first: advance() returns false once it is cleared, and
// maybeCloseClient() only releases the JMS resources when the reader is inactive.
active.set(false);
maybeCloseClient();
}

private void maybeCloseClient() throws IOException {
try {
if (consumer != null) {
consumer.close();
consumer = null;
if (!active.get()) {
doClose();
}
if (session != null) {
session.close();
session = null;
} catch (Exception e) {
throw new IOException(e);
}
}

/**
 * Tears the reader down in a fixed order: acknowledges the last received message, stops the
 * autoscaler, then closes the consumer, the session and finally the connection.
 */
private void doClose() {
acknowledgeLastMessage();
closeAutoscaler();
closeConsumer();
closeSession();
closeConnection();
}

/**
 * Acknowledges the most recently received JMS message, if there is one.
 *
 * <p>The reference is cleared afterwards whether or not the acknowledgment succeeded. {@code
 * doClose()} can run more than once (from {@code close()} and again from {@code finalize()});
 * without clearing it, the second pass would try to acknowledge the same message on an
 * already-closed session and log a spurious error.
 */
private void acknowledgeLastMessage() {
  if (currentJmsMessage == null) {
    return;
  }
  try {
    currentJmsMessage.acknowledge();
  } catch (JMSException e) {
    LOG.error("Impossible to acknowledge last message", e);
  } finally {
    currentJmsMessage = null;
  }
}

/**
 * Stops and closes the JMS connection, if one is open. Failures are logged, never thrown.
 *
 * <p>{@code stop()} and {@code close()} are guarded separately so that a failure while stopping
 * does not skip the close and leak the connection. The field is cleared only after a successful
 * close, so a later teardown pass can retry.
 */
private void closeConnection() {
  if (connection == null) {
    return;
  }
  try {
    connection.stop();
  } catch (Exception e) {
    LOG.error("Error stopping connection", e);
  }
  try {
    connection.close();
    connection = null;
  } catch (Exception e) {
    LOG.error("Error closing connection", e);
  }
}

/** Closes the JMS session if one is open; a failure is logged and the field is left set. */
private void closeSession() {
  if (session == null) {
    return;
  }
  try {
    session.close();
    session = null;
  } catch (Exception e) {
    LOG.error("Error closing session", e);
  }
}

/** Closes the JMS consumer if one is open; a failure is logged and the field is left set. */
private void closeConsumer() {
  if (consumer == null) {
    return;
  }
  try {
    consumer.close();
    consumer = null;
  } catch (Exception e) {
    LOG.error("Error closing consumer", e);
  }
}

private void closeAutoscaler() {
try {
if (autoScaler != null) {
autoScaler.stop();
autoScaler = null;
}
} catch (Exception e) {
throw new IOException(e);
LOG.error("Error closing autoscaler", e);
}
}

/**
 * Last-resort cleanup: runs the same teardown as an explicit close in case the reader is
 * garbage-collected without having been closed.
 */
@Override
protected void finalize() {
  this.doClose();
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -504,8 +504,7 @@ public void testCheckpointMarkSafety() throws Exception {
/** Test the checkpoint mark default coder, which is actually AvroCoder. */
@Test
public void testCheckpointMarkDefaultCoder() throws Exception {
JmsCheckpointMark jmsCheckpointMark = new JmsCheckpointMark();
jmsCheckpointMark.add(new ActiveMQMessage());
JmsCheckpointMark jmsCheckpointMark = new JmsCheckpointMark(null, null);
Coder coder = new JmsIO.UnboundedJmsSource(null).getCheckpointMarkCoder();
CoderProperties.coderSerializable(coder);
CoderProperties.coderDecodeEncodeEqual(coder, jmsCheckpointMark);
Expand Down