[Bug]: apache_beam.io.kinesis.ReadDataFromKinesis UserCodeException #23868

Closed
rickiesmooth opened this issue Oct 27, 2022 · 1 comment · Fixed by #26953

rickiesmooth commented Oct 27, 2022

What happened?

Environment:

'apache-beam[gcp,aws]'==2.42.0
apache-beam==2.42.0

When I run a Dataflow job that reads from a Kinesis stream like this:

import apache_beam as beam
import logging
import argparse

from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.io.kinesis import InitialPositionInStream, ReadDataFromKinesis

def run(argv=None):
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--aws_access_key", dest="aws_access_key", help="AWS Access Key used to connect to Kinesis stream."
    )
    parser.add_argument(
        "--aws_secret_key", dest="aws_secret_key", help="AWS Secret Key used to connect to Kinesis stream."
    )

    known_args, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True
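    # Assumption: the original snippet referenced an undefined `is_streaming`;
    # a Kinesis read is unbounded, so streaming mode is enabled here.
    pipeline_options.view_as(StandardOptions).streaming = True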

    # The pipeline will be run on exiting the with block.
    with beam.Pipeline(options=pipeline_options) as p:
        # Parentheses let the chained transform span several lines.
        _ = (
            p
            | 'ReadFromKinesis' >> ReadDataFromKinesis(
                stream_name="EventsDataStream",
                aws_access_key=known_args.aws_access_key,
                aws_secret_key=known_args.aws_secret_key,
                region="us-east-1",
                initial_position_in_stream=InitialPositionInStream.LATEST
            )
        )

if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    run()

I get the following error:

INFO:apache_beam.runners.dataflow.dataflow_runner:2022-10-27T14:28:50.348Z: JOB_MESSAGE_ERROR: generic::unknown: org.apache.beam.sdk.util.UserCodeException: java.lang.NullPointerException
        at org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39)
        at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown Source)
        at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:889)
        at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:304)
        at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:241)
        at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1791)
        at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:826)
        at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:304)
        at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:241)
        at org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1791)
        at org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:145)
        at org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2360)
        at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2530)
        at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)
        at org.apache.beam.sdk.io.Read$OutputSingleSource.processElement(Read.java:1018)
        at org.apache.beam.sdk.io.Read$OutputSingleSource$DoFnInvoker.invokeProcessElement(Unknown Source)
        at org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:801)
        at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:304)
        at org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:241)
        at org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:213)
        at org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.multiplexElements(BeamFnDataInboundObserver2.java:158)
        at org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:528)
        at org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
        at org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.NullPointerException
        at org.apache.beam.sdk.io.kinesis.KinesisReader.getSplitBacklogBytes(KinesisReader.java:172)
        at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$UnboundedSourceAsSDFRestrictionTracker.getProgress(Read.java:983)
        at org.apache.beam.sdk.transforms.reflect.ByteBuddyDoFnInvokerFactory$DefaultGetSize.invokeGetSize(ByteBuddyDoFnInvokerFactory.java:431)
        at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeGetSize(Unknown Source)
        at org.apache.beam.fn.harness.FnApiDoFnRunner$SizedRestrictionNonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2322)
        at org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2530)
        at org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)
        at org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn.splitRestriction(Read.java:524)

I'm not sure how to debug this further. I believe I followed the docs, so I expected this to work. Let me know if I can supply a better reproduction case.
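
One way to narrow a trace like the one above down, as a minimal sketch only: pull the innermost `Caused by:` section out of the job log so the failing Java method and source line are easy to search for in the Beam repository. The helper name `root_cause_frame` is hypothetical and not part of Beam; it assumes the usual Java stack-trace layout (`Caused by: <exception>` followed by `at <method>(<file>:<line>)` lines).

```python
import re

# Matches a Java frame such as "at pkg.Class.method(Class.java:172)".
_FRAME = re.compile(r"at\s+([^\s(]+)\((.*)\)$")


def root_cause_frame(trace):
    """Return (exception, method, location) for the last 'Caused by:' section.

    Returns None when the trace has no 'Caused by:' line. `method` and
    `location` are None if no frame line follows the cause.
    """
    lines = [line.strip() for line in trace.splitlines()]

    # Java prints nested causes in order, so the last one is the innermost.
    cause_index = None
    for index, line in enumerate(lines):
        if line.startswith("Caused by:"):
            cause_index = index
    if cause_index is None:
        return None

    exception = lines[cause_index][len("Caused by:"):].strip()

    # The first frame after the cause is where the exception was thrown.
    for line in lines[cause_index + 1:]:
        match = _FRAME.match(line)
        if match:
            return exception, match.group(1), match.group(2)
    return exception, None, None
```

Applied to the log above, this should point at `KinesisReader.getSplitBacklogBytes` in `KinesisReader.java`, line 172, which is the frame to look up in the Java SDK sources for the Beam version in use.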

Issue Priority

Priority: 2

Issue Component

Component: io-py-aws
