Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#22737] Fit & Finish for Go SDK timer support. #26782

Merged
merged 18 commits into from
Jun 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 63 additions & 41 deletions sdks/go/examples/timer_wordcap/wordcap.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,12 @@
package main

import (
"bytes"
"context"
"encoding/binary"
"flag"
"fmt"
"time"

"github.com/apache/beam/sdks/v2/go/pkg/beam"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/state"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/timers"
"github.com/apache/beam/sdks/v2/go/pkg/beam/log"
Expand Down Expand Up @@ -62,21 +59,7 @@ func NewStateful() *Stateful {
}
}

func (s *Stateful) OnTimer(ctx context.Context, ts beam.EventTime, tp timers.Provider, key, timerKey, timerTag string, emit func(string, string)) {
switch timerKey {
case "outputState":
log.Infof(ctx, "Timer outputState fired on stateful for element: %v.", key)
s.OutputState.Set(tp, ts.ToTime().Add(5*time.Second), timers.WithTag("1"))
switch timerTag {
case "1":
s.OutputState.Clear(tp)
log.Infof(ctx, "Timer with tag 1 fired on outputState stateful DoFn.")
emit(timerKey, timerTag)
}
}
}

func (s *Stateful) ProcessElement(ctx context.Context, ts beam.EventTime, sp state.Provider, tp timers.Provider, key, word string, emit func(string, string)) error {
func (s *Stateful) ProcessElement(ctx context.Context, ts beam.EventTime, sp state.Provider, tp timers.Provider, key, word string, _ func(beam.EventTime, string, string)) error {
s.ElementBag.Add(sp, word)
s.MinTime.Add(sp, int64(ts))

Expand All @@ -85,23 +68,77 @@ func (s *Stateful) ProcessElement(ctx context.Context, ts beam.EventTime, sp sta
return err
}
if !ok {
toFire = int64(mtime.Now().Add(1 * time.Minute))
toFire = int64(time.Now().Add(30 * time.Second).UnixMilli())
}
minTime, _, err := s.MinTime.Read(sp)
if err != nil {
return err
}

s.OutputState.Set(tp, time.UnixMilli(toFire), timers.WithOutputTimestamp(time.UnixMilli(minTime)), timers.WithTag(word))
s.OutputState.Set(tp, time.UnixMilli(toFire), timers.WithOutputTimestamp(time.UnixMilli(minTime)))
// A timer can also be set to fire independently by giving it its own string tag.
s.OutputState.Set(tp, time.UnixMilli(toFire), timers.WithTag(word), timers.WithOutputTimestamp(time.UnixMilli(minTime)))
s.TimerTime.Write(sp, toFire)

return nil
}

// OnTimer is invoked when a timer set by this DoFn fires for the given key.
//
// Only timers in the OutputState family are handled, and the behavior depends
// on the tag of the timer that fired:
//
//   - The untagged timer ("") schedules a second timer, tagged "emit", to fire
//     5 seconds after ts.
//   - The "emit" timer reads the batched words from ElementBag, emits each one
//     under key with the timestamp stored in MinTime, and then clears
//     ElementBag, MinTime and the "emit" timer tag.
//
// Timers with any other family or tag are logged and otherwise ignored.
// Errors reading state are logged and abort the callback without emitting.
func (s *Stateful) OnTimer(ctx context.Context, ts beam.EventTime, sp state.Provider, tp timers.Provider, key string, timer timers.Context, emit func(beam.EventTime, string, string)) {
	log.Infof(ctx, "Timer fired for key %q, for family %q and tag %q", key, timer.Family, timer.Tag)

	const tag = "emit" // Tags can be arbitrary strings, but we're associating behavior with this tag in this method.

	// Check which timer has fired.
	switch timer.Family {
	case s.OutputState.Family:
		switch timer.Tag {
		case "":
			// Timers can be set within the OnTimer method.
			// In this case, set the emit tag timer to fire in 5 seconds.
			s.OutputState.Set(tp, ts.ToTime().Add(5*time.Second), timers.WithTag(tag))
		case tag:
			// When the emit tag fires, read the batched data.
			es, ok, err := s.ElementBag.Read(sp)
			if err != nil {
				log.Errorf(ctx, "error reading ElementBag: %v", err)
				return
			}
			if !ok {
				log.Infof(ctx, "No elements in bag.")
				return
			}
			minTime, _, err := s.MinTime.Read(sp)
			if err != nil {
				// Name the state cell that actually failed so the log is not
				// mistaken for an ElementBag read error.
				log.Errorf(ctx, "error reading MinTime: %v", err)
				return
			}
			log.Infof(ctx, "Emitting %d elements", len(es))
			for _, word := range es {
				emit(beam.EventTime(minTime), key, word)
			}
			// Clean up the state that has been evicted.
			s.ElementBag.Clear(sp)
			s.MinTime.Clear(sp)
			s.OutputState.ClearTag(tp, tag) // Clean up the fired timer tag. (Temporary workaround for a runner bug.)
		}
	}
}

func init() {
register.DoFn7x1[context.Context, beam.EventTime, state.Provider, timers.Provider, string, string, func(string, string), error](&Stateful{})
register.Emitter2[string, string]()
register.DoFn7x1[context.Context, beam.EventTime, state.Provider, timers.Provider, string, string, func(beam.EventTime, string, string), error](&Stateful{})
register.Emitter3[beam.EventTime, string, string]()
register.Emitter2[beam.EventTime, int64]()
register.Function1x2(toKeyedString)
}

// generateSequence adds a periodic sequence source to the pipeline under a
// "generateSequence" sub-scope. The sequence definition spans from now to
// now+duration using the given interval, and the resulting PCollection is
// returned.
func generateSequence(s beam.Scope, now time.Time, duration, interval time.Duration) beam.PCollection {
	scoped := s.Scope("generateSequence")
	end := now.Add(duration)
	definition := periodic.NewSequenceDefinition(now, end, interval)
	return periodic.Sequence(scoped, beam.Create(scoped, definition))
}

// toKeyedString pairs the fixed key "test" with b formatted as a decimal
// string zero-padded to a minimum width of three characters.
func toKeyedString(b int64) (string, string) {
	const key = "test"
	value := fmt.Sprintf("%03d", b)
	return key, value
}

func main() {
Expand All @@ -110,26 +147,11 @@ func main() {

ctx := context.Background()

p := beam.NewPipeline()
s := p.Root()

out := periodic.Impulse(s, time.Now(), time.Now().Add(5*time.Minute), 5*time.Second, true)

intOut := beam.ParDo(s, func(b []byte) int64 {
var val int64
buf := bytes.NewReader(b)
binary.Read(buf, binary.BigEndian, &val)
return val
}, out)

str := beam.ParDo(s, func(b int64) string {
return fmt.Sprintf("%03d", b)
}, intOut)
p, s := beam.NewPipelineWithRoot()

keyed := beam.ParDo(s, func(ctx context.Context, ts beam.EventTime, s string) (string, string) {
return "test", s
}, str)
out := generateSequence(s, time.Now(), 1*time.Minute, 5*time.Second)

keyed := beam.ParDo(s, toKeyedString, out)
timed := beam.ParDo(s, NewStateful(), keyed)
debug.Printf(s, "post stateful: %v", timed)

Expand Down
1 change: 0 additions & 1 deletion sdks/go/pkg/beam/core/graph/edge.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,6 @@ type MultiEdge struct {
DoFn *DoFn // ParDo
RestrictionCoder *coder.Coder // SplittableParDo
StateCoders map[string]*coder.Coder // Stateful ParDo
TimerCoders *coder.Coder // Stateful ParDo
CombineFn *CombineFn // Combine
AccumCoder *coder.Coder // Combine
Value []byte // Impulse
Expand Down
33 changes: 22 additions & 11 deletions sdks/go/pkg/beam/core/runtime/exec/coder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1228,8 +1228,7 @@ func EncodeWindowedValueHeader(enc WindowEncoder, ws []typex.Window, t typex.Eve
if err := enc.Encode(ws, w); err != nil {
return err
}
err := coder.EncodePane(p, w)
return err
return coder.EncodePane(p, w)
}

// DecodeWindowedValueHeader deserializes a windowed value header.
Expand Down Expand Up @@ -1257,49 +1256,61 @@ func DecodeWindowedValueHeader(dec WindowDecoder, r io.Reader) ([]typex.Window,
}

// encodeTimer encodes a TimerRecv into a byte stream.
// Avoids partial writes to provided writer on encoding errors.
func encodeTimer(elm ElementEncoder, win WindowEncoder, tm TimerRecv, w io.Writer) error {
var b bytes.Buffer
err := elm.Encode(tm.Key, &b)
if err != nil {
return errors.WithContext(err, "error encoding key")
}

if err := coder.EncodeStringUTF8(tm.Tag, &b); err != nil {
if err := encodeTimerSuffix(win, tm, &b); err != nil {
return err
}
w.Write(b.Bytes())

return nil
}

// encodeTimerSuffix encodes the timer directly to the provided writer.
func encodeTimerSuffix(win WindowEncoder, tm TimerRecv, w io.Writer) error {
if err := coder.EncodeStringUTF8(tm.Tag, w); err != nil {
return errors.WithContext(err, "error encoding tag")
}

if err := win.Encode(tm.Windows, &b); err != nil {
if err := win.Encode(tm.Windows, w); err != nil {
return errors.WithContext(err, "error encoding window")
}

if err := coder.EncodeBool(tm.Clear, &b); err != nil {
if err := coder.EncodeBool(tm.Clear, w); err != nil {
return errors.WithContext(err, "error encoding clear bit")
}

if !tm.Clear {
if err := coder.EncodeEventTime(tm.FireTimestamp, &b); err != nil {
if err := coder.EncodeEventTime(tm.FireTimestamp, w); err != nil {
return errors.WithContext(err, "error encoding fire timestamp")
}
if err := coder.EncodeEventTime(tm.HoldTimestamp, &b); err != nil {
if err := coder.EncodeEventTime(tm.HoldTimestamp, w); err != nil {
return errors.WithContext(err, "error encoding hold timestamp")
}
if err := coder.EncodePane(tm.Pane, &b); err != nil {
if err := coder.EncodePane(tm.Pane, w); err != nil {
return errors.WithContext(err, "error encoding paneinfo")
}
}
w.Write(b.Bytes())

return nil
}

// decodeTimer decodes timer byte encoded with standard timer coder spec.
func decodeTimer(dec ElementDecoder, win WindowDecoder, r io.Reader) (TimerRecv, error) {
tm := TimerRecv{}
key, err := dec.Decode(r)
var keyBuf bytes.Buffer
tr := io.TeeReader(r, &keyBuf)
key, err := dec.Decode(tr)
if err != nil {
return tm, errors.WithContext(err, "error decoding key")
}
tm.Key = key
tm.KeyString = keyBuf.String()

s, err := coder.DecodeStringUTF8(r)
if err != nil && err != io.EOF {
Expand Down
6 changes: 2 additions & 4 deletions sdks/go/pkg/beam/core/runtime/exec/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,10 +263,8 @@ func (n *DataSource) Process(ctx context.Context) ([]*Checkpoint, error) {
}
},
func(bcr *byteCountReader, ptransformID, timerFamilyID string) error {

if fn, ok := n.OnTimerTransforms[ptransformID].Fn.OnTimerFn(); ok {
_, err := n.OnTimerTransforms[ptransformID].InvokeTimerFn(ctx, fn, timerFamilyID, bcr)
if err != nil {
if node, ok := n.OnTimerTransforms[ptransformID]; ok {
if err := node.ProcessTimers(timerFamilyID, bcr); err != nil {
log.Warnf(ctx, "expected transform %v to have an OnTimer method attached to handle"+
"Timer Family ID: %v callback, but it did not. Please file an issue with Apache Beam"+
"if you have defined OnTimer method with reproducible code at https://github.com/apache/beam/issues", ptransformID, timerFamilyID)
Expand Down
23 changes: 22 additions & 1 deletion sdks/go/pkg/beam/core/runtime/exec/datasource_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package exec

import (
"bytes"
"context"
"fmt"
"io"
Expand Down Expand Up @@ -1020,6 +1021,8 @@ func runOnRoots(ctx context.Context, t *testing.T, p *Plan, name string, mthd fu

// TestDataManager is a test double providing the data-stream methods
// (OpenElementChan, OpenWrite, OpenTimerWrite) used by this package's tests.
type TestDataManager struct {
// Ch is the channel of Elements for the test.
// NOTE(review): presumably this is what OpenElementChan returns; its body is not visible here, so confirm.
Ch chan Elements

// TimerWrites holds one buffer per timer family name. OpenTimerWrite creates
// the map and each family's buffer lazily on first use.
TimerWrites map[string]*bytes.Buffer
}

func (dm *TestDataManager) OpenElementChan(ctx context.Context, id StreamID, expectedTimerTransforms []string) (<-chan Elements, error) {
Expand All @@ -1031,9 +1034,27 @@ func (dm *TestDataManager) OpenWrite(ctx context.Context, id StreamID) (io.Write
}

func (dm *TestDataManager) OpenTimerWrite(ctx context.Context, id StreamID, family string) (io.WriteCloser, error) {
return nil, nil
if dm.TimerWrites == nil {
dm.TimerWrites = map[string]*bytes.Buffer{}
}
buf, ok := dm.TimerWrites[family]
if !ok {
buf = &bytes.Buffer{}
dm.TimerWrites[family] = buf
}
return struct {
*bytes.Buffer
io.Closer
}{
Buffer: buf,
Closer: noopCloser{},
}, nil
}

// noopCloser supplies a Close method that does nothing, so that a plain
// buffer can be combined with it to form an io.WriteCloser.
type noopCloser struct{}

// Close always succeeds and has no effect.
func (noopCloser) Close() error {
	return nil
}

type chanWriter struct {
Ch chan Elements
Buf []byte
Expand Down
8 changes: 2 additions & 6 deletions sdks/go/pkg/beam/core/runtime/exec/fn.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ type InvokeOpts struct {
we sdf.WatermarkEstimator
sa UserStateAdapter
sr StateReader
ta UserTimerAdapter
ta *userTimerAdapter
tm DataManager
extra []any
}
Expand Down Expand Up @@ -242,11 +242,7 @@ func (n *invoker) invokeWithOpts(ctx context.Context, pn typex.PaneInfo, ws []ty
}

if n.tpIdx >= 0 {
tp, err := opts.ta.NewTimerProvider(ctx, opts.tm, ts, ws, opts.opt)
if err != nil {
return nil, err
}
n.tp = &tp
n.tp = opts.ta.NewTimerProvider(pn, ws)
args[n.tpIdx] = n.tp
}

Expand Down
Loading