diff --git a/exporter/clickhousemetricsexporter/clickhouse.go b/exporter/clickhousemetricsexporter/clickhouse.go index dfcbe889..e0a42f34 100644 --- a/exporter/clickhousemetricsexporter/clickhouse.go +++ b/exporter/clickhousemetricsexporter/clickhouse.go @@ -28,6 +28,7 @@ import ( clickhouse "github.com/ClickHouse/clickhouse-go/v2" "github.com/ClickHouse/clickhouse-go/v2/lib/driver" "github.com/google/uuid" + "github.com/jellydator/ttlcache/v3" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/sirupsen/logrus" @@ -64,6 +65,8 @@ type clickHouse struct { database string maxTimeSeriesInQuery int + cache *ttlcache.Cache[uint64, bool] + timeSeriesRW sync.RWMutex // Maintains the lookup map for fingerprints that are // written to time series table. This map is used to eliminate the @@ -113,11 +116,19 @@ func NewClickHouse(params *ClickHouseParams) (base.Storage, error) { return nil, fmt.Errorf("could not connect to clickhouse: %s", err) } + cache := ttlcache.New[uint64, bool]( + ttlcache.WithTTL[uint64, bool](45*time.Minute), + ttlcache.WithDisableTouchOnHit[uint64, bool](), + ) + + go cache.Start() + ch := &clickHouse{ conn: conn, l: l, database: options.Auth.Database, maxTimeSeriesInQuery: params.MaxTimeSeriesInQuery, + cache: cache, timeSeries: make(map[uint64]struct{}, 8192), @@ -295,48 +306,6 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr return err } - // Write to distributed_time_series_v3 table - err = func() error { - - statement, err := ch.conn.PrepareBatch(ctx, fmt.Sprintf("INSERT INTO %s.%s (env, temporality, metric_name, fingerprint, timestamp_ms, labels, description, unit, type, is_monotonic) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", ch.database, DISTRIBUTED_TIME_SERIES_TABLE_V3), driver.WithReleaseConnection()) - if err != nil { - return err - } - timestamp := model.Now().Time().UnixMilli() - for fingerprint, labels := range newTimeSeries { - encodedLabels := string(marshalLabels(labels, make([]byte, 0, 128))) - meta := metricNameToMeta[fingerprintToName[fingerprint][nameLabel]] - err = statement.Append( - fingerprintToName[fingerprint][envLabel], - meta.Temporality.String(), - fingerprintToName[fingerprint][nameLabel], - fingerprint, - timestamp, - encodedLabels, - meta.Description, - meta.Unit, - meta.Typ.String(), - meta.IsMonotonic, - ) - if err != nil { - return err - } - } - - start := time.Now() - err = statement.Send() - ctx, _ = tag.New(ctx, - tag.Upsert(exporterKey, string(component.DataTypeMetrics.String())), - tag.Upsert(tableKey, DISTRIBUTED_TIME_SERIES_TABLE_V3), - ) - stats.Record(ctx, writeLatencyMillis.M(int64(time.Since(start).Milliseconds()))) - return err - }() - - if err != nil { - return err - } - metrics := map[string]usage.Metric{} err = func() error { ctx := context.Background() @@ -447,6 +416,9 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr unixMilli := model.Now().Time().UnixMilli() / 3600000 * 3600000 for fingerprint, labels := range timeSeries { + if ch.cache.Get(fingerprint) != nil && ch.cache.Get(fingerprint).Value() { + continue + } encodedLabels := string(marshalLabels(labels, make([]byte, 0, 128))) meta := metricNameToMeta[fingerprintToName[fingerprint][nameLabel]] err = statement.Append( @@ -464,6 +436,7 @@ func (ch *clickHouse) Write(ctx context.Context, data *prompb.WriteRequest, metr if err != nil { return err } + ch.cache.Set(fingerprint, true, ttlcache.DefaultTTL) } start := time.Now() diff --git a/go.mod b/go.mod index d07e970b..f1484357 100644 --- a/go.mod +++ b/go.mod @@ -20,6 +20,7 @@ require ( github.com/gorilla/mux v1.8.1 github.com/gorilla/websocket v1.5.0 github.com/hashicorp/golang-lru v1.0.2 + github.com/jellydator/ttlcache/v3 v3.2.0 github.com/knadh/koanf v1.5.0 github.com/lightstep/go-expohisto v1.0.0 github.com/oklog/ulid v1.3.1 diff --git a/go.sum b/go.sum index d1077693..bca96a6b 100644 --- a/go.sum +++ b/go.sum @@ -753,6 +753,8 @@ github.com/jcmturner/gokrb5/v8 v8.4.4 h1:x1Sv4HaTpepFkXbt2IkL29DXRf8sOfZXo8eRKh6 github.com/jcmturner/gokrb5/v8 v8.4.4/go.mod h1:1btQEpgT6k+unzCwX1KdWMEwPPkkgBtP+F6aCACiMrs= github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= +github.com/jellydator/ttlcache/v3 v3.2.0 h1:6lqVJ8X3ZaUwvzENqPAobDsXNExfUJd61u++uW8a3LE= +github.com/jellydator/ttlcache/v3 v3.2.0/go.mod h1:hi7MGFdMAwZna5n2tuvh63DvFLzVKySzCVW6+0gA2n4= github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik= github.com/jmespath/go-jmespath v0.4.0 h1:BEgLn5cpjn8UN1mAw4NjwDrS35OdebyEtFe+9YPoQUg= github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=