Skip to content

Commit

Permalink
Skip write to time_series_v4 table for the same fingerprint in curren… (
Browse files Browse the repository at this point in the history
  • Loading branch information
srikanthccv authored Jun 13, 2024
1 parent b519ab7 commit f60d60d
Show file tree
Hide file tree
Showing 3 changed files with 18 additions and 42 deletions.
57 changes: 15 additions & 42 deletions exporter/clickhousemetricsexporter/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
clickhouse "github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"github.com/google/uuid"
"github.com/jellydator/ttlcache/v3"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -64,6 +65,8 @@ type clickHouse struct {
database string
maxTimeSeriesInQuery int

cache *ttlcache.Cache[uint64, bool]

timeSeriesRW sync.RWMutex
// Maintains the lookup map for fingerprints that are
// written to time series table. This map is used to eliminate the
Expand Down Expand Up @@ -113,11 +116,19 @@ func NewClickHouse(params *ClickHouseParams) (base.Storage, error) {
return nil, fmt.Errorf("could not connect to clickhouse: %s", err)
}

cache := ttlcache.New[uint64, bool](
ttlcache.WithTTL[uint64, bool](45*time.Minute),
ttlcache.WithDisableTouchOnHit[uint64, bool](),
)

go cache.Start()

ch := &clickHouse{
conn: conn,
l: l,
database: options.Auth.Database,
maxTimeSeriesInQuery: params.MaxTimeSeriesInQuery,
cache: cache,

timeSeries: make(map[uint64]struct{}, 8192),

Expand Down Expand Up @@ -295,48 +306,6 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr
return err
}

// Write to distributed_time_series_v3 table
err = func() error {

statement, err := ch.conn.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s (env, temporality, metric_name, fingerprint, timestamp_ms, labels, description, unit, type, is_monotonic) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", ch.database, DISTRIBUTED_TIME_SERIES_TABLE_V3), driver.WithReleaseConnection())
if err != nil {
return err
}
timestamp := model.Now().Time().UnixMilli()
for fingerprint, labels := range newTimeSeries {
encodedLabels := string(marshalLabels(labels, make([]byte, 0, 128)))
meta := metricNameToMeta[fingerprintToName[fingerprint][nameLabel]]
err = statement.Append(
fingerprintToName[fingerprint][envLabel],
meta.Temporality.String(),
fingerprintToName[fingerprint][nameLabel],
fingerprint,
timestamp,
encodedLabels,
meta.Description,
meta.Unit,
meta.Typ.String(),
meta.IsMonotonic,
)
if err != nil {
return err
}
}

start := time.Now()
err = statement.Send()
ctx, _ = tag.New(ctx,
tag.Upsert(exporterKey, string(component.DataTypeMetrics.String())),
tag.Upsert(tableKey, DISTRIBUTED_TIME_SERIES_TABLE_V3),
)
stats.Record(ctx, writeLatencyMillis.M(int64(time.Since(start).Milliseconds())))
return err
}()

if err != nil {
return err
}

metrics := map[string]usage.Metric{}
err = func() error {
ctx := context.Background()
Expand Down Expand Up @@ -447,6 +416,9 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr
unixMilli := model.Now().Time().UnixMilli() / 3600000 * 3600000

for fingerprint, labels := range timeSeries {
if ch.cache.Get(fingerprint) != nil && ch.cache.Get(fingerprint).Value() {
continue
}
encodedLabels := string(marshalLabels(labels, make([]byte, 0, 128)))
meta := metricNameToMeta[fingerprintToName[fingerprint][nameLabel]]
err = statement.Append(
Expand All @@ -464,6 +436,7 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr
if err != nil {
return err
}
ch.cache.Set(fingerprint, true, ttlcache.DefaultTTL)
}

start := time.Now()
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ require (
github.com/gorilla/mux v1.8.1
github.com/gorilla/websocket v1.5.0
github.com/hashicorp/golang-lru v1.0.2
github.com/jellydator/ttlcache/v3 v3.2.0
github.com/knadh/koanf v1.5.0
github.com/lightstep/go-expohisto v1.0.0
github.com/oklog/ulid v1.3.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -753,6 +753,8 @@ github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh6
github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs=
github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY=
github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc=
github.com/jellydator/ttlcache/v3 v3.2.0 h1:6lqVJ8X3ZaUwvzENqPAobDsXNExfUJd61u++uW8a3LE=
github.com/jellydator/ttlcache/v3 v3.2.0/go.mod h1:hi7MGFdMAwZna5n2tuvh63DvFLzVKySzCVW6+0gA2n4=
github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik=
github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
Expand Down

0 comments on commit f60d60d

Please sign in to comment.