Skip to content

Commit

Permalink
Return unknown backlog on Kinesis if reader not started (apache#26953)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhoufek authored and bullet03 committed Aug 11, 2023
1 parent be052ec commit 75bff69
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
## Bugfixes

* Fixed X (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Fixed KinesisIO `NullPointerException` when a progress check is made before the reader is started (IO) ([#23868](https://github.com/apache/beam/issues/23868))

## Known Issues

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.io.IOException;
import java.util.NoSuchElementException;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
import org.apache.beam.sdk.io.aws2.kinesis.KinesisIO.Read;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Duration;
Expand Down Expand Up @@ -159,6 +160,11 @@ public UnboundedSource.CheckpointMark getCheckpointMark() {
*/
@Override
public long getSplitBacklogBytes() {
// Safety check in case a progress check is made for the start method is called.
if (shardReadersPool == null) {
return UnboundedReader.BACKLOG_UNKNOWN;
}

Instant latestRecordTimestamp = shardReadersPool.getLatestRecordTimestamp();

if (latestRecordTimestamp.equals(BoundedWindow.TIMESTAMP_MIN_VALUE)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.io.IOException;
import java.util.NoSuchElementException;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Duration;
import org.joda.time.Instant;
Expand Down Expand Up @@ -150,6 +151,11 @@ public void getSplitBacklogBytesShouldReturnLastSeenValueWhenKinesisExceptionsOc
assertThat(reader.getSplitBacklogBytes()).isEqualTo(20);
}

@Test
public void getSplitBacklogBytesShouldReturnUnknownIfNotStarted() {
assertThat(reader.getSplitBacklogBytes()).isEqualTo(UnboundedReader.BACKLOG_UNKNOWN);
}

@Test
public void getSplitBacklogBytesShouldReturnLastSeenValueWhenCalledFrequently()
throws TransientKinesisException, IOException {
Expand Down

0 comments on commit 75bff69

Please sign in to comment.