Skip to content

Commit

Permalink
TimeSeries insertion filters for close samples (#3003)
Browse files Browse the repository at this point in the history
* TimeSeries insertion filters for close samples

* fix

* fix

* fix

* fix

* fix

---------

Co-authored-by: Vladyslav Vildanov <117659936+vladvildanov@users.noreply.github.com>
  • Loading branch information
ofekshenawa and vladvildanov committed Jul 11, 2024
1 parent 7539858 commit 8a0c59b
Show file tree
Hide file tree
Showing 2 changed files with 186 additions and 19 deletions.
56 changes: 42 additions & 14 deletions timeseries_commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,25 +40,32 @@ type TimeseriesCmdable interface {
}

type TSOptions struct {
Retention int
ChunkSize int
Encoding string
DuplicatePolicy string
Labels map[string]string
Retention int
ChunkSize int
Encoding string
DuplicatePolicy string
Labels map[string]string
IgnoreMaxTimeDiff int64
IgnoreMaxValDiff float64
}
type TSIncrDecrOptions struct {
Timestamp int64
Retention int
ChunkSize int
Uncompressed bool
Labels map[string]string
Timestamp int64
Retention int
ChunkSize int
Uncompressed bool
DuplicatePolicy string
Labels map[string]string
IgnoreMaxTimeDiff int64
IgnoreMaxValDiff float64
}

type TSAlterOptions struct {
Retention int
ChunkSize int
DuplicatePolicy string
Labels map[string]string
Retention int
ChunkSize int
DuplicatePolicy string
Labels map[string]string
IgnoreMaxTimeDiff int64
IgnoreMaxValDiff float64
}

type TSCreateRuleOptions struct {
Expand Down Expand Up @@ -223,6 +230,9 @@ func (c cmdable) TSAddWithArgs(ctx context.Context, key string, timestamp interf
args = append(args, label, value)
}
}
if options.IgnoreMaxTimeDiff != 0 || options.IgnoreMaxValDiff != 0 {
args = append(args, "IGNORE", options.IgnoreMaxTimeDiff, options.IgnoreMaxValDiff)
}
}
cmd := NewIntCmd(ctx, args...)
_ = c(ctx, cmd)
Expand Down Expand Up @@ -264,6 +274,9 @@ func (c cmdable) TSCreateWithArgs(ctx context.Context, key string, options *TSOp
args = append(args, label, value)
}
}
if options.IgnoreMaxTimeDiff != 0 || options.IgnoreMaxValDiff != 0 {
args = append(args, "IGNORE", options.IgnoreMaxTimeDiff, options.IgnoreMaxValDiff)
}
}
cmd := NewStatusCmd(ctx, args...)
_ = c(ctx, cmd)
Expand Down Expand Up @@ -292,6 +305,9 @@ func (c cmdable) TSAlter(ctx context.Context, key string, options *TSAlterOption
args = append(args, label, value)
}
}
if options.IgnoreMaxTimeDiff != 0 || options.IgnoreMaxValDiff != 0 {
args = append(args, "IGNORE", options.IgnoreMaxTimeDiff, options.IgnoreMaxValDiff)
}
}
cmd := NewStatusCmd(ctx, args...)
_ = c(ctx, cmd)
Expand Down Expand Up @@ -351,12 +367,18 @@ func (c cmdable) TSIncrByWithArgs(ctx context.Context, key string, timestamp flo
if options.Uncompressed {
args = append(args, "UNCOMPRESSED")
}
if options.DuplicatePolicy != "" {
args = append(args, "DUPLICATE_POLICY", options.DuplicatePolicy)
}
if options.Labels != nil {
args = append(args, "LABELS")
for label, value := range options.Labels {
args = append(args, label, value)
}
}
if options.IgnoreMaxTimeDiff != 0 || options.IgnoreMaxValDiff != 0 {
args = append(args, "IGNORE", options.IgnoreMaxTimeDiff, options.IgnoreMaxValDiff)
}
}
cmd := NewIntCmd(ctx, args...)
_ = c(ctx, cmd)
Expand Down Expand Up @@ -391,12 +413,18 @@ func (c cmdable) TSDecrByWithArgs(ctx context.Context, key string, timestamp flo
if options.Uncompressed {
args = append(args, "UNCOMPRESSED")
}
if options.DuplicatePolicy != "" {
args = append(args, "DUPLICATE_POLICY", options.DuplicatePolicy)
}
if options.Labels != nil {
args = append(args, "LABELS")
for label, value := range options.Labels {
args = append(args, label, value)
}
}
if options.IgnoreMaxTimeDiff != 0 || options.IgnoreMaxValDiff != 0 {
args = append(args, "IGNORE", options.IgnoreMaxTimeDiff, options.IgnoreMaxValDiff)
}
}
cmd := NewIntCmd(ctx, args...)
_ = c(ctx, cmd)
Expand Down
149 changes: 144 additions & 5 deletions timeseries_commands_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
Expect(client.Close()).NotTo(HaveOccurred())
})

It("should TSCreate and TSCreateWithArgs", Label("timeseries", "tscreate", "tscreateWithArgs"), func() {
It("should TSCreate and TSCreateWithArgs", Label("timeseries", "tscreate", "tscreateWithArgs", "NonRedisEnterprise"), func() {
result, err := client.TSCreate(ctx, "1").Result()
Expect(err).NotTo(HaveOccurred())
Expect(result).To(BeEquivalentTo("OK"))
Expand Down Expand Up @@ -62,10 +62,60 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
resultInfo, err = client.TSInfo(ctx, keyName).Result()
Expect(err).NotTo(HaveOccurred())
Expect(strings.ToUpper(resultInfo["duplicatePolicy"].(string))).To(BeEquivalentTo(dup))

}
// Test insertion filters
opt = &redis.TSOptions{IgnoreMaxTimeDiff: 5, DuplicatePolicy: "LAST", IgnoreMaxValDiff: 10.0}
result, err = client.TSCreateWithArgs(ctx, "ts-if-1", opt).Result()
Expect(err).NotTo(HaveOccurred())
Expect(result).To(BeEquivalentTo("OK"))
resultAdd, err := client.TSAdd(ctx, "ts-if-1", 1000, 1.0).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo(1000))
resultAdd, err = client.TSAdd(ctx, "ts-if-1", 1010, 11.0).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo(1010))
resultAdd, err = client.TSAdd(ctx, "ts-if-1", 1013, 10.0).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo(1010))
resultAdd, err = client.TSAdd(ctx, "ts-if-1", 1020, 11.5).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo(1020))
resultAdd, err = client.TSAdd(ctx, "ts-if-1", 1021, 22.0).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo(1021))

rangePoints, err := client.TSRange(ctx, "ts-if-1", 1000, 1021).Result()
Expect(err).NotTo(HaveOccurred())
Expect(len(rangePoints)).To(BeEquivalentTo(4))
Expect(rangePoints).To(BeEquivalentTo([]redis.TSTimestampValue{
{Timestamp: 1000, Value: 1.0},
{Timestamp: 1010, Value: 11.0},
{Timestamp: 1020, Value: 11.5},
{Timestamp: 1021, Value: 22.0}}))
// Test insertion filters with other duplicate policy
opt = &redis.TSOptions{IgnoreMaxTimeDiff: 5, IgnoreMaxValDiff: 10.0}
result, err = client.TSCreateWithArgs(ctx, "ts-if-2", opt).Result()
Expect(err).NotTo(HaveOccurred())
Expect(result).To(BeEquivalentTo("OK"))
resultAdd1, err := client.TSAdd(ctx, "ts-if-1", 1000, 1.0).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd1).To(BeEquivalentTo(1000))
resultAdd1, err = client.TSAdd(ctx, "ts-if-1", 1010, 11.0).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd1).To(BeEquivalentTo(1010))
resultAdd1, err = client.TSAdd(ctx, "ts-if-1", 1013, 10.0).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd1).To(BeEquivalentTo(1013))

rangePoints, err = client.TSRange(ctx, "ts-if-1", 1000, 1013).Result()
Expect(err).NotTo(HaveOccurred())
Expect(len(rangePoints)).To(BeEquivalentTo(3))
Expect(rangePoints).To(BeEquivalentTo([]redis.TSTimestampValue{
{Timestamp: 1000, Value: 1.0},
{Timestamp: 1010, Value: 11.0},
{Timestamp: 1013, Value: 10.0}}))
})
It("should TSAdd and TSAddWithArgs", Label("timeseries", "tsadd", "tsaddWithArgs"), func() {
It("should TSAdd and TSAddWithArgs", Label("timeseries", "tsadd", "tsaddWithArgs", "NonRedisEnterprise"), func() {
result, err := client.TSAdd(ctx, "1", 1, 1).Result()
Expect(err).NotTo(HaveOccurred())
Expect(result).To(BeEquivalentTo(1))
Expand Down Expand Up @@ -138,9 +188,23 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
resultGet, err = client.TSGet(ctx, "tsami-1").Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultGet.Value).To(BeEquivalentTo(5))
// Insertion filters
opt = &redis.TSOptions{IgnoreMaxTimeDiff: 5, IgnoreMaxValDiff: 10.0, DuplicatePolicy: "LAST"}
result, err = client.TSAddWithArgs(ctx, "ts-if-1", 1000, 1.0, opt).Result()
Expect(err).NotTo(HaveOccurred())
Expect(result).To(BeEquivalentTo(1000))

result, err = client.TSAddWithArgs(ctx, "ts-if-1", 1004, 3.0, opt).Result()
Expect(err).NotTo(HaveOccurred())
Expect(result).To(BeEquivalentTo(1000))

rangePoints, err := client.TSRange(ctx, "ts-if-1", 1000, 1004).Result()
Expect(err).NotTo(HaveOccurred())
Expect(len(rangePoints)).To(BeEquivalentTo(1))
Expect(rangePoints).To(BeEquivalentTo([]redis.TSTimestampValue{{Timestamp: 1000, Value: 1.0}}))
})

It("should TSAlter", Label("timeseries", "tsalter"), func() {
It("should TSAlter", Label("timeseries", "tsalter", "NonRedisEnterprise"), func() {
result, err := client.TSCreate(ctx, "1").Result()
Expect(err).NotTo(HaveOccurred())
Expect(result).To(BeEquivalentTo("OK"))
Expand Down Expand Up @@ -179,6 +243,33 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
resultInfo, err = client.TSInfo(ctx, "1").Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultInfo["duplicatePolicy"]).To(BeEquivalentTo("min"))
// Test insertion filters
resultAdd, err := client.TSAdd(ctx, "ts-if-1", 1000, 1.0).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo(1000))
resultAdd, err = client.TSAdd(ctx, "ts-if-1", 1010, 11.0).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo(1010))
resultAdd, err = client.TSAdd(ctx, "ts-if-1", 1013, 10.0).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo(1013))

alterOpt := &redis.TSAlterOptions{IgnoreMaxTimeDiff: 5, IgnoreMaxValDiff: 10.0, DuplicatePolicy: "LAST"}
resultAlter, err = client.TSAlter(ctx, "ts-if-1", alterOpt).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAlter).To(BeEquivalentTo("OK"))

resultAdd, err = client.TSAdd(ctx, "ts-if-1", 1015, 11.5).Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultAdd).To(BeEquivalentTo(1013))

rangePoints, err := client.TSRange(ctx, "ts-if-1", 1000, 1013).Result()
Expect(err).NotTo(HaveOccurred())
Expect(len(rangePoints)).To(BeEquivalentTo(3))
Expect(rangePoints).To(BeEquivalentTo([]redis.TSTimestampValue{
{Timestamp: 1000, Value: 1.0},
{Timestamp: 1010, Value: 11.0},
{Timestamp: 1013, Value: 10.0}}))
})

It("should TSCreateRule and TSDeleteRule", Label("timeseries", "tscreaterule", "tsdeleterule"), func() {
Expand Down Expand Up @@ -216,7 +307,7 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
Expect(resultInfo["rules"]).To(BeEquivalentTo(map[interface{}]interface{}{}))
})

It("should TSIncrBy, TSIncrByWithArgs, TSDecrBy and TSDecrByWithArgs", Label("timeseries", "tsincrby", "tsdecrby", "tsincrbyWithArgs", "tsdecrbyWithArgs"), func() {
It("should TSIncrBy, TSIncrByWithArgs, TSDecrBy and TSDecrByWithArgs", Label("timeseries", "tsincrby", "tsdecrby", "tsincrbyWithArgs", "tsdecrbyWithArgs", "NonRedisEnterprise"), func() {
for i := 0; i < 100; i++ {
_, err := client.TSIncrBy(ctx, "1", 1).Result()
Expect(err).NotTo(HaveOccurred())
Expand Down Expand Up @@ -277,6 +368,54 @@ var _ = Describe("RedisTimeseries commands", Label("timeseries"), func() {
resultInfo, err = client.TSInfo(ctx, "4").Result()
Expect(err).NotTo(HaveOccurred())
Expect(resultInfo["chunkSize"]).To(BeEquivalentTo(128))

// Test insertion filters INCRBY
opt = &redis.TSIncrDecrOptions{Timestamp: 1000, IgnoreMaxTimeDiff: 5, IgnoreMaxValDiff: 10.0, DuplicatePolicy: "LAST"}
res, err := client.TSIncrByWithArgs(ctx, "ts-if-1", 1.0, opt).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(BeEquivalentTo(1000))

res, err = client.TSIncrByWithArgs(ctx, "ts-if-1", 3.0, &redis.TSIncrDecrOptions{Timestamp: 1000}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(BeEquivalentTo(1000))

rangePoints, err := client.TSRange(ctx, "ts-if-1", 1000, 1004).Result()
Expect(err).NotTo(HaveOccurred())
Expect(len(rangePoints)).To(BeEquivalentTo(1))
Expect(rangePoints).To(BeEquivalentTo([]redis.TSTimestampValue{{Timestamp: 1000, Value: 1.0}}))

res, err = client.TSIncrByWithArgs(ctx, "ts-if-1", 10.1, &redis.TSIncrDecrOptions{Timestamp: 1000}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(BeEquivalentTo(1000))

rangePoints, err = client.TSRange(ctx, "ts-if-1", 1000, 1004).Result()
Expect(err).NotTo(HaveOccurred())
Expect(len(rangePoints)).To(BeEquivalentTo(1))
Expect(rangePoints).To(BeEquivalentTo([]redis.TSTimestampValue{{Timestamp: 1000, Value: 11.1}}))

// Test insertion filters DECRBY
opt = &redis.TSIncrDecrOptions{Timestamp: 1000, IgnoreMaxTimeDiff: 5, IgnoreMaxValDiff: 10.0, DuplicatePolicy: "LAST"}
res, err = client.TSDecrByWithArgs(ctx, "ts-if-2", 1.0, opt).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(BeEquivalentTo(1000))

res, err = client.TSDecrByWithArgs(ctx, "ts-if-2", 3.0, &redis.TSIncrDecrOptions{Timestamp: 1000}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(BeEquivalentTo(1000))

rangePoints, err = client.TSRange(ctx, "ts-if-2", 1000, 1004).Result()
Expect(err).NotTo(HaveOccurred())
Expect(len(rangePoints)).To(BeEquivalentTo(1))
Expect(rangePoints).To(BeEquivalentTo([]redis.TSTimestampValue{{Timestamp: 1000, Value: -1.0}}))

res, err = client.TSDecrByWithArgs(ctx, "ts-if-2", 10.1, &redis.TSIncrDecrOptions{Timestamp: 1000}).Result()
Expect(err).NotTo(HaveOccurred())
Expect(res).To(BeEquivalentTo(1000))

rangePoints, err = client.TSRange(ctx, "ts-if-2", 1000, 1004).Result()
Expect(err).NotTo(HaveOccurred())
Expect(len(rangePoints)).To(BeEquivalentTo(1))
Expect(rangePoints).To(BeEquivalentTo([]redis.TSTimestampValue{{Timestamp: 1000, Value: -11.1}}))
})

It("should TSGet", Label("timeseries", "tsget"), func() {
Expand Down

0 comments on commit 8a0c59b

Please sign in to comment.