[Go SDK] Implement State and Timer support #20510

Closed
damccorm opened this issue Jun 4, 2022 · 2 comments

@damccorm
Contributor

damccorm commented Jun 4, 2022

There's presently no mechanism to specify timers in the Go SDK, or to use them at all. The work involves designing the user-facing code and mechanisms, and plumbing timers through properly. For example, they can't conflict with other user-facing constructs like emitter functions and iterator functions.

However, there's an abundance of work to handle before starting on state and timers.

While timers should work in batch, they're commonly more appropriate for streaming, which the SDK doesn't support very well at the moment. DoFns need to be able to self-checkpoint in order to behave as a streaming source (early checkpointing allows a bundle to self-terminate, so it can be rescheduled later, or serve as a minor way to split work across multiple workers). We should also implement Triggers and advanced/custom WindowFns first, as those are simpler ways to get some of the advanced functionality that timers allow for. We also need to be able to set and propagate the watermark correctly through the SDK (and validate that we do).

See the programming guide for a fuller description of State and Timers
https://beam.apache.org/documentation/programming-guide/#state-and-timers 

  • Design an idiomatic Go approach to Timers and State processing for DoFns
    ** Go doesn’t support annotation-like constructs, with the exception of struct field tags.
    ** Design likely requires new framework-side marker types.
    ** Design likely requires using field tags.
    ** Needs to allow customization for state types. (Easier post-generics in Go, but a design that doesn’t require generics would be viable sooner.)

  • State concerns:
    ** Should support deferred batch reads of multiple states
    ** Needs to be expandable to handle ValueState, CombiningState, and BagState

  • Timer concerns: 
    ** Needs to handle Event and Processing Time timers.
    ** Dynamic Timer tags (likely the one and only way to handle Go SDK timers)
    ** Needs to introduce an “OnTimer” method, and associated validation.
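
The design points above (marker types, field tags, and "OnTimer" validation) can be sketched with plain reflection. This is only an illustration under stated assumptions, not the SDK's design: `ValueState`, `EventTimer`, the `beam` tag key, the `OnTimer(tag string)` signature, and the `declared` helper are all hypothetical names invented for this example.

```go
package main

import (
	"fmt"
	"reflect"
)

// ValueState and EventTimer are hypothetical marker types for this sketch;
// they are not types from the Beam Go SDK.
type ValueState struct{}
type EventTimer struct{}

var (
	valueStateType = reflect.TypeOf(ValueState{})
	eventTimerType = reflect.TypeOf(EventTimer{})
)

// countFn is an illustrative DoFn. The `beam` tag key and its values are
// assumptions made for this example.
type countFn struct {
	Count  ValueState `beam:"count"`
	Expiry EventTimer `beam:"expiry"`
	Limit  int        // untagged fields are ignored
}

// OnTimer is the callback that the validation below requires whenever a
// timer field is declared. Its signature is a placeholder.
func (f *countFn) OnTimer(tag string) {}

// noCallbackFn declares a timer but has no OnTimer method, so it is invalid.
type noCallbackFn struct {
	Expiry EventTimer `beam:"expiry"`
}

// declared lists the tagged state and timer fields of a DoFn struct pointer
// and rejects timer declarations that lack an OnTimer method.
func declared(fn interface{}) ([]string, error) {
	t := reflect.TypeOf(fn)
	if t == nil || t.Kind() != reflect.Ptr || t.Elem().Kind() != reflect.Struct {
		return nil, fmt.Errorf("want pointer to struct, got %v", t)
	}
	var found []string
	hasTimer := false
	st := t.Elem()
	for i := 0; i < st.NumField(); i++ {
		f := st.Field(i)
		name, ok := f.Tag.Lookup("beam")
		if !ok {
			continue
		}
		switch f.Type {
		case valueStateType:
			found = append(found, "state:"+name)
		case eventTimerType:
			found = append(found, "timer:"+name)
			hasTimer = true
		default:
			return nil, fmt.Errorf("field %s: beam tag on unsupported type %v", f.Name, f.Type)
		}
	}
	if _, ok := t.MethodByName("OnTimer"); hasTimer && !ok {
		return nil, fmt.Errorf("%v declares timers but has no OnTimer method", st)
	}
	return found, nil
}

func main() {
	fmt.Println(declared(&countFn{}))
	fmt.Println(declared(&noCallbackFn{}))
}
```

Keying discovery on the field's type rather than on the tag alone means the tag only has to carry the user-chosen name, which keeps the tag syntax small. A real design would also need to cover processing-time timers, dynamic timer tags, and the other state kinds.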

The locations that need changing are similar to those for Map Side Inputs: https://issues.apache.org/jira/browse/BEAM-3293 

In the execution layer, the new forms would need to be added, as was done for side inputs in exec/sideinput.go:
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/sideinput.go
and in the inputs layer, which provides the actual abstraction using reflection:
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/input.go

State and Timers need specific handling beyond that, though, since State leverages the State API in a more sophisticated way than side inputs do. The State API manager implementation is handled in the harness: https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/harness/statemgr.go 

The funcx package would need to be updated to detect the new parameter forms
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/funcx/fn.go
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/funcx/sideinput.go

as well as the DoFn graph validation code
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/graph/fn.go#L566

They would need to be correctly translated into the pipeline protos:
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/graphx/translate.go#L315
and finally back to the newly created handlers in the exec package.
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/exec/translate.go#L402

The SideInputCache would need to be changed to be a full UserState cache (https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/core/runtime/harness/harness.go#L101), as the state_caching protocol URN doesn't make a distinction between side inputs and user state, and we should not break behavior.

It's likely other changes are necessary to handle specifics for state and timers.

If implemented pre-generics, the code generator frontend and backend would need to be updated to detect the new forms and, if necessary, generate efficient map access functions with no reflection overhead:

https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/util/shimx/generate.go
https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/util/starcgenx/starcgenx.go

Unit tests must be added throughout, and integration tests should be added to verify the functionality against portable Beam runners.
https://github.com/apache/beam/tree/master/sdks/go/test/integration/primitives

And of course, the user GoDoc should be updated to document the new support.

Imported from Jira BEAM-10660. Original Jira may contain additional context.
Reported by: lostluck.

@damccorm
Contributor Author

damccorm commented Aug 16, 2022

Since this is rather large, I broke it down into 2 issues. These should be completed before marking this done:

@lostluck
Contributor

Timers are now supported in the Go SDK, so marking this as completed.

@github-actions github-actions bot added this to the 2.48.0 Release milestone May 11, 2023