Skip to content

Commit

Permalink
[prism] add windowing strategy (#25518)
Browse files Browse the repository at this point in the history
Co-authored-by: lostluck <13907733+lostluck@users.noreply.github.com>
  • Loading branch information
lostluck and lostluck committed Feb 18, 2023
1 parent de7eb2d commit 977e531
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 0 deletions.
50 changes: 50 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/engine/strategy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package engine

import (
"fmt"
"time"

"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
)

type winStrat interface {
EarliestCompletion(typex.Window) mtime.Time
}

type defaultStrat struct{}

func (ws defaultStrat) EarliestCompletion(w typex.Window) mtime.Time {
return w.MaxTimestamp()
}

func (defaultStrat) String() string {
return "default"
}

type sessionStrat struct {
GapSize time.Duration
}

func (ws sessionStrat) EarliestCompletion(w typex.Window) mtime.Time {
return w.MaxTimestamp().Add(ws.GapSize)
}

func (ws sessionStrat) String() string {
return fmt.Sprintf("session[GapSize:%v]", ws.GapSize)
}
45 changes: 45 additions & 0 deletions sdks/go/pkg/beam/runners/prism/internal/engine/strategy_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package engine

import (
"testing"
"time"

"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/mtime"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/graph/window"
"github.com/apache/beam/sdks/v2/go/pkg/beam/core/typex"
)

func TestEarliestCompletion(t *testing.T) {
tests := []struct {
strat winStrat
input typex.Window
want mtime.Time
}{
{defaultStrat{}, window.GlobalWindow{}, mtime.EndOfGlobalWindowTime},
{defaultStrat{}, window.IntervalWindow{Start: 0, End: 4}, 3},
{defaultStrat{}, window.IntervalWindow{Start: mtime.MinTimestamp, End: mtime.MaxTimestamp}, mtime.MaxTimestamp - 1},
{sessionStrat{}, window.IntervalWindow{Start: 0, End: 4}, 3},
{sessionStrat{GapSize: 3 * time.Millisecond}, window.IntervalWindow{Start: 0, End: 4}, 6},
}

for _, test := range tests {
if got, want := test.strat.EarliestCompletion(test.input), test.want; got != want {
t.Errorf("%v.EarliestCompletion(%v)) = %v, want %v", test.strat, test.input, got, want)
}
}
}

0 comments on commit 977e531

Please sign in to comment.