Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(concurrentbatchprocessor): adds mib suffix to max bytes setting #121

Merged
merged 2 commits into from
Dec 18, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func newBatchProcessor(set processor.CreateSettings, cfg *Config, batchFunc func
shutdownC: make(chan struct{}, 1),
metadataKeys: mks,
metadataLimit: int(cfg.MetadataCardinalityLimit),
sem: semaphore.NewWeighted(int64(cfg.MaxInFlightBytes)),
sem: semaphore.NewWeighted(int64(cfg.MaxInFlightBytesMiB)),
jmacd marked this conversation as resolved.
Show resolved Hide resolved
}
if len(bp.metadataKeys) == 0 {
bp.batcher = &singleShardBatcher{batcher: bp.newShard(nil)}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,9 +234,9 @@ func (bc *blockingConsumer) Capabilities() consumer.Capabilities {
return consumer.Capabilities{MutatesData: false}
}

// helper function to help determine a setting for cfg.MaxInFlightBytes based
// helper function to help determine a setting for cfg.MaxInFlightBytesMiB based
// on the number of requests and number of spans per request.
func calculateMaxInFlightBytes(numRequests, spansPerRequest int) uint32 {
func calculateMaxInFlightBytesMiB(numRequests, spansPerRequest int) uint32 {
sentResourceSpans := ptrace.NewTraces().ResourceSpans()
td := testdata.GenerateTraces(spansPerRequest)
spans := td.ResourceSpans().At(0).ScopeSpans().At(0).Spans()
Expand All @@ -257,7 +257,7 @@ func TestBatchProcessorCancelContext(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.SendBatchSize = 128
cfg.Timeout = 10 * time.Second
cfg.MaxInFlightBytes = calculateMaxInFlightBytes(requestCount, spansPerRequest)
cfg.MaxInFlightBytesMiB = calculateMaxInFlightBytesMiB(requestCount, spansPerRequest)
creationSet := processortest.NewNopCreateSettings()
creationSet.MetricsLevel = configtelemetry.LevelDetailed
bc := &blockingConsumer{blocking: make(chan struct{}, 1)}
Expand Down Expand Up @@ -292,7 +292,7 @@ func TestBatchProcessorCancelContext(t *testing.T) {
}, 5*time.Second, 10*time.Millisecond)

// semaphore should be fully acquired at this point.
assert.False(t, bp.batcher.(*singleShardBatcher).batcher.processor.sem.TryAcquire(int64(1)))
assert.False(t, bp.sem.TryAcquire(int64(1)))

wg.Add(1)
go func() {
Expand All @@ -307,14 +307,14 @@ func TestBatchProcessorCancelContext(t *testing.T) {
wg.Wait()

// check sending another request does not change the semaphore count, even after ConsumeTraces returns.
assert.False(t, bp.batcher.(*singleShardBatcher).batcher.processor.sem.TryAcquire(int64(1)))
assert.False(t, bp.sem.TryAcquire(int64(1)))

// signal to the blockingConsumer to return response to waiters.
bc.unblock()

// Semaphore should be released once all responses are returned. Confirm we can acquire MaxInFlightBytes bytes.
// Semaphore should be released once all responses are returned. Confirm we can acquire MaxInFlightBytesMiB bytes.
require.Eventually(t, func() bool {
return bp.batcher.(*singleShardBatcher).batcher.processor.sem.TryAcquire(int64(cfg.MaxInFlightBytes))
return bp.sem.TryAcquire(int64(cfg.MaxInFlightBytesMiB))
}, 5*time.Second, 10*time.Millisecond)
require.NoError(t, bp.Shutdown(context.Background()))
}
Expand Down Expand Up @@ -582,9 +582,9 @@ func TestBatchProcessorSentByTimeout(t *testing.T) {

func TestBatchProcessorTraceSendWhenClosing(t *testing.T) {
cfg := Config{
Timeout: 3 * time.Second,
SendBatchSize: 1000,
MaxInFlightBytes: defaultMaxBytes,
Timeout: 3 * time.Second,
SendBatchSize: 1000,
MaxInFlightBytesMiB: defaultMaxBytes,
}
sink := new(consumertest.TracesSink)

Expand Down Expand Up @@ -617,9 +617,9 @@ func TestBatchMetricProcessor_ReceivingData(t *testing.T) {
// Instantiate the batch processor with low config values to test data
// gets sent through the processor.
cfg := Config{
Timeout: 200 * time.Millisecond,
SendBatchSize: 50,
MaxInFlightBytes: defaultMaxBytes,
Timeout: 200 * time.Millisecond,
SendBatchSize: 50,
MaxInFlightBytesMiB: defaultMaxBytes,
}

requestCount := 100
Expand Down Expand Up @@ -683,9 +683,9 @@ func testBatchMetricProcessorBatchSize(t *testing.T, tel testTelemetry, useOtel
// Instantiate the batch processor with low config values to test data
// gets sent through the processor.
cfg := Config{
Timeout: 2 * time.Second,
SendBatchSize: 50,
MaxInFlightBytes: defaultMaxBytes,
Timeout: 2 * time.Second,
SendBatchSize: 50,
MaxInFlightBytesMiB: defaultMaxBytes,
}

requestCount := 100
Expand Down Expand Up @@ -757,9 +757,9 @@ func TestBatchMetrics_UnevenBatchMaxSize(t *testing.T) {

func TestBatchMetricsProcessor_Timeout(t *testing.T) {
cfg := Config{
Timeout: 100 * time.Millisecond,
SendBatchSize: 101,
MaxInFlightBytes: defaultMaxBytes,
Timeout: 100 * time.Millisecond,
SendBatchSize: 101,
MaxInFlightBytesMiB: defaultMaxBytes,
}
requestCount := 5
metricsPerRequest := 10
Expand Down Expand Up @@ -803,9 +803,9 @@ func TestBatchMetricsProcessor_Timeout(t *testing.T) {

func TestBatchMetricProcessor_Shutdown(t *testing.T) {
cfg := Config{
Timeout: 3 * time.Second,
SendBatchSize: 1000,
MaxInFlightBytes: defaultMaxBytes,
Timeout: 3 * time.Second,
SendBatchSize: 1000,
MaxInFlightBytesMiB: defaultMaxBytes,
}
requestCount := 5
metricsPerRequest := 10
Expand Down Expand Up @@ -896,20 +896,20 @@ func BenchmarkTraceSizeSpanCount(b *testing.B) {
func BenchmarkBatchMetricProcessor(b *testing.B) {
b.StopTimer()
cfg := Config{
Timeout: 100 * time.Millisecond,
SendBatchSize: 2000,
MaxInFlightBytes: defaultMaxBytes,
Timeout: 100 * time.Millisecond,
SendBatchSize: 2000,
MaxInFlightBytesMiB: defaultMaxBytes,
}
runMetricsProcessorBenchmark(b, cfg)
}

func BenchmarkMultiBatchMetricProcessor(b *testing.B) {
b.StopTimer()
cfg := Config{
Timeout: 100 * time.Millisecond,
SendBatchSize: 2000,
MetadataKeys: []string{"test", "test2"},
MaxInFlightBytes: defaultMaxBytes,
Timeout: 100 * time.Millisecond,
SendBatchSize: 2000,
MetadataKeys: []string{"test", "test2"},
MaxInFlightBytesMiB: defaultMaxBytes,
}
runMetricsProcessorBenchmark(b, cfg)
}
Expand Down Expand Up @@ -957,9 +957,9 @@ func TestBatchLogProcessor_ReceivingData(t *testing.T) {
// Instantiate the batch processor with low config values to test data
// gets sent through the processor.
cfg := Config{
Timeout: 200 * time.Millisecond,
SendBatchSize: 50,
MaxInFlightBytes: defaultMaxBytes,
Timeout: 200 * time.Millisecond,
SendBatchSize: 50,
MaxInFlightBytesMiB: defaultMaxBytes,
}

requestCount := 100
Expand Down Expand Up @@ -1023,9 +1023,9 @@ func testBatchLogProcessorBatchSize(t *testing.T, tel testTelemetry, useOtel boo
// Instantiate the batch processor with low config values to test data
// gets sent through the processor.
cfg := Config{
Timeout: 2 * time.Second,
SendBatchSize: 50,
MaxInFlightBytes: defaultMaxBytes,
Timeout: 2 * time.Second,
SendBatchSize: 50,
MaxInFlightBytesMiB: defaultMaxBytes,
}

requestCount := 100
Expand Down Expand Up @@ -1075,9 +1075,9 @@ func testBatchLogProcessorBatchSize(t *testing.T, tel testTelemetry, useOtel boo

func TestBatchLogsProcessor_Timeout(t *testing.T) {
cfg := Config{
Timeout: 3 * time.Second,
SendBatchSize: 100,
MaxInFlightBytes: defaultMaxBytes,
Timeout: 3 * time.Second,
SendBatchSize: 100,
MaxInFlightBytesMiB: defaultMaxBytes,
}
requestCount := 5
logsPerRequest := 10
Expand Down Expand Up @@ -1121,9 +1121,9 @@ func TestBatchLogsProcessor_Timeout(t *testing.T) {

func TestBatchLogProcessor_Shutdown(t *testing.T) {
cfg := Config{
Timeout: 3 * time.Second,
SendBatchSize: 1000,
MaxInFlightBytes: defaultMaxBytes,
Timeout: 3 * time.Second,
SendBatchSize: 1000,
MaxInFlightBytesMiB: defaultMaxBytes,
}
requestCount := 5
logsPerRequest := 10
Expand Down Expand Up @@ -1391,7 +1391,7 @@ func TestBatchZeroConfig(t *testing.T) {
// This is a no-op configuration. No need for a timer, no
// minimum, no mxaimum, just a pass through.
cfg := Config{
MaxInFlightBytes: defaultMaxBytes,
MaxInFlightBytesMiB: defaultMaxBytes,
}

require.NoError(t, cfg.Validate())
Expand Down Expand Up @@ -1433,8 +1433,8 @@ func TestBatchSplitOnly(t *testing.T) {
const logsPerRequest = 100

cfg := Config{
SendBatchMaxSize: maxBatch,
MaxInFlightBytes: defaultMaxBytes,
SendBatchMaxSize: maxBatch,
MaxInFlightBytesMiB: defaultMaxBytes,
}

require.NoError(t, cfg.Validate())
Expand Down
14 changes: 13 additions & 1 deletion collector/processor/concurrentbatchprocessor/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ type Config struct {

// MaxInFlightBytes limits the number of bytes in queue waiting to be
// processed by the senders.
MaxInFlightBytesMiB uint32 `mapstructure:"max_in_flight_bytes_mib"`

// Deprecated: Use MaxInFlightBytesMiB instead.
MaxInFlightBytes uint32 `mapstructure:"max_in_flight_bytes"`
}

Expand All @@ -68,5 +71,14 @@ func (cfg *Config) Validate() error {
if cfg.Timeout < 0 {
return errors.New("timeout must be greater or equal to 0")
}

// remove this check once MaxInFlightBytes is removed.
if cfg.MaxInFlightBytes != 0 {
return errors.New("max_in_flight_bytes is deprecated, use max_in_flight_bytes_mib instead")
}

if cfg.MaxInFlightBytesMiB < 0 {
return errors.New("max_in_flight_bytes_mib must be greater than or equal to 0")
}
return nil
}
}
4 changes: 2 additions & 2 deletions collector/processor/concurrentbatchprocessor/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestUnmarshalConfig(t *testing.T) {
SendBatchMaxSize: uint32(11000),
Timeout: time.Second * 10,
MetadataCardinalityLimit: 1000,
MaxInFlightBytes: 12345,
MaxInFlightBytesMiB: 12345,
}, cfg)
}

Expand Down Expand Up @@ -74,4 +74,4 @@ func TestValidateConfig_InvalidTimeout(t *testing.T) {
func TestValidateConfig_ValidZero(t *testing.T) {
cfg := &Config{}
assert.NoError(t, cfg.Validate())
}
}
4 changes: 2 additions & 2 deletions collector/processor/concurrentbatchprocessor/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const (
defaultSendBatchSize = uint32(8192)
defaultTimeout = 200 * time.Millisecond
// default inflight bytes is 2 MiB
defaultMaxBytes = 2 * 1048576
defaultMaxBytes = 2 * 1048576

// defaultMetadataCardinalityLimit should be set to the number
// of metadata configurations the user expects to submit to
Expand All @@ -45,7 +45,7 @@ func createDefaultConfig() component.Config {
return &Config{
SendBatchSize: defaultSendBatchSize,
Timeout: defaultTimeout,
MaxInFlightBytes: defaultMaxBytes,
MaxInFlightBytesMiB: defaultMaxBytes,
MetadataCardinalityLimit: defaultMetadataCardinalityLimit,
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
timeout: 10s
send_batch_size: 10000
send_batch_max_size: 11000
max_in_flight_bytes: 12345
max_in_flight_bytes_mib: 12345