From 590152ab51b916f0b1da542301cbc12ea50bd1ca Mon Sep 17 00:00:00 2001 From: zhoufek Date: Wed, 31 May 2023 13:35:46 -0400 Subject: [PATCH] Return unknown backlog on Kinesis if reader not started --- CHANGES.md | 1 + .../org/apache/beam/sdk/io/aws2/kinesis/KinesisReader.java | 6 ++++++ .../apache/beam/sdk/io/aws2/kinesis/KinesisReaderTest.java | 6 ++++++ 3 files changed, 13 insertions(+) diff --git a/CHANGES.md b/CHANGES.md index 79fdcaf7f19e4..af9855057d6bb 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -75,6 +75,7 @@ ## Bugfixes * Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)). +* Fixed KinesisIO `NullPointerException` when a progress check is made before the reader is started (IO) ([#23868](https://github.com/apache/beam/issues/23868)) ## Known Issues diff --git a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisReader.java b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisReader.java index 77184ba5ab8f9..1ca37d23cd8cf 100644 --- a/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisReader.java +++ b/sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisReader.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.util.NoSuchElementException; import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader; import org.apache.beam.sdk.io.aws2.kinesis.KinesisIO.Read; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.joda.time.Duration; @@ -159,6 +160,11 @@ public UnboundedSource.CheckpointMark getCheckpointMark() { */ @Override public long getSplitBacklogBytes() { + // Safety check in case a progress check is made for the start method is called. + if (shardReadersPool == null) { + return UnboundedReader.BACKLOG_UNKNOWN; + } + Instant latestRecordTimestamp = shardReadersPool.getLatestRecordTimestamp(); if (latestRecordTimestamp.equals(BoundedWindow.TIMESTAMP_MIN_VALUE)) { diff --git a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisReaderTest.java b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisReaderTest.java index 7674b66f32934..c063e817244c2 100644 --- a/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisReaderTest.java +++ b/sdks/java/io/amazon-web-services2/src/test/java/org/apache/beam/sdk/io/aws2/kinesis/KinesisReaderTest.java @@ -29,6 +29,7 @@ import java.io.IOException; import java.util.NoSuchElementException; import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.joda.time.Duration; import org.joda.time.Instant; @@ -150,6 +151,11 @@ public void getSplitBacklogBytesShouldReturnLastSeenValueWhenKinesisExceptionsOc assertThat(reader.getSplitBacklogBytes()).isEqualTo(20); } + @Test + public void getSplitBacklogBytesShouldReturnUnknownIfNotStarted() { + assertThat(reader.getSplitBacklogBytes()).isEqualTo(UnboundedReader.BACKLOG_UNKNOWN); + } + @Test public void getSplitBacklogBytesShouldReturnLastSeenValueWhenCalledFrequently() throws TransientKinesisException, IOException {