Skip to content

Commit

Permalink
Add separate good/bad sink configurations (close #388)
Browse files Browse the repository at this point in the history
  • Loading branch information
pondzix authored and peel committed Nov 10, 2023
1 parent 884b5fb commit 09c8f87
Show file tree
Hide file tree
Showing 59 changed files with 903 additions and 447 deletions.
82 changes: 56 additions & 26 deletions examples/config.kafka.extended.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -157,28 +157,17 @@ collector {
}

streams {
# Events which have successfully been collected will be stored in the good stream/topic
good = "good"

# Bad rows (https://docs.snowplowanalytics.com/docs/try-snowplow/recipes/recipe-understanding-bad-data/) will be stored in the bad stream/topic.
# The collector can currently produce two flavours of bad row:
# - a size_violation if an event is larger that the Kinesis (1MB) or SQS (256KB) limits;
# - a generic_error if a request's querystring cannot be parsed because of illegal characters
bad = "bad"

# Whether to use the incoming event's ip as the partition key for the good stream/topic
# Note: Nsq does not make use of partition key.
useIpAddressAsPartitionKey = false

# Enable the chosen sink by uncommenting the appropriate configuration
sink {
# Choose between kinesis, sqs, google-pub-sub, kafka, nsq, or stdout.
# To use stdout, comment or remove everything in the "collector.streams.sink" section except
# "enabled" which should be set to "stdout".
enabled = kafka

# Or Kafka
# Events which have successfully been collected will be stored in the good stream/topic
good {

name = "good"
brokers = "localhost:9092,another.host:9092"

## Number of retries to perform before giving up on sending a record
retries = 10
# The kafka producer has a variety of possible configuration options defined at
Expand All @@ -187,6 +176,7 @@ collector {
# "bootstrap.servers" = brokers
# "buffer.memory" = buffer.byteLimit
# "linger.ms" = buffer.timeLimit

#producerConf {
# acks = all
# "key.serializer" = "org.apache.kafka.common.serialization.StringSerializer"
Expand All @@ -197,18 +187,58 @@ collector {
# If a record is bigger, a size violation bad row is emitted instead
# Default: 1 MB
maxBytes = 1000000

# Incoming events are stored in a buffer before being sent to Kafka.
# The buffer is emptied whenever:
# - the number of stored records reaches record-limit or
# - the combined size of the stored records reaches byte-limit or
# - the time in milliseconds since the buffer was last emptied reaches time-limit
buffer {
byteLimit = 3145728
recordLimit = 500
timeLimit = 5000
}
}

# Incoming events are stored in a buffer before being sent to Kinesis/Kafka.
# Note: Buffering is not supported by NSQ.
# The buffer is emptied whenever:
# - the number of stored records reaches record-limit or
# - the combined size of the stored records reaches byte-limit or
# - the time in milliseconds since the buffer was last emptied reaches time-limit
buffer {
byteLimit = 3145728
recordLimit = 500
timeLimit = 5000
# Bad rows (https://docs.snowplowanalytics.com/docs/try-snowplow/recipes/recipe-understanding-bad-data/) will be stored in the bad stream/topic.
# The collector can currently produce two flavours of bad row:
# - a size_violation if an event is larger that the Kinesis (1MB) or SQS (256KB) limits;
# - a generic_error if a request's querystring cannot be parsed because of illegal characters
bad {

name = "bad"
brokers = "localhost:9092,another.host:9092"

## Number of retries to perform before giving up on sending a record
retries = 10
# The kafka producer has a variety of possible configuration options defined at
# https://kafka.apache.org/documentation/#producerconfigs
# Some values are set to other values from this config by default:
# "bootstrap.servers" = brokers
# "buffer.memory" = buffer.byteLimit
# "linger.ms" = buffer.timeLimit

#producerConf {
# acks = all
# "key.serializer" = "org.apache.kafka.common.serialization.StringSerializer"
# "value.serializer" = "org.apache.kafka.common.serialization.StringSerializer"
#}

# Optional. Maximum number of bytes that a single record can contain.
# If a record is bigger, a size violation bad row is emitted instead
# Default: 1 MB
maxBytes = 1000000

# Incoming events are stored in a buffer before being sent to Kafka.
# The buffer is emptied whenever:
# - the number of stored records reaches record-limit or
# - the combined size of the stored records reaches byte-limit or
# - the time in milliseconds since the buffer was last emptied reaches time-limit
buffer {
byteLimit = 3145728
recordLimit = 500
timeLimit = 5000
}
}
}

Expand Down
14 changes: 8 additions & 6 deletions examples/config.kafka.minimal.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ collector {
port = 8080

streams {
good = "good"
bad = "bad"

sink {
brokers = "localhost:9092,another.host:9092"
good {
name = "good"
brokers = "localhost:9092,another.host:9092"
}
bad {
name = "bad"
brokers = "localhost:9092,another.host:9092"
}
}
}
}
122 changes: 95 additions & 27 deletions examples/config.kinesis.extended.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -157,26 +157,19 @@ collector {
}

streams {
# Events which have successfully been collected will be stored in the good stream/topic
good = "good"

# Bad rows (https://docs.snowplowanalytics.com/docs/try-snowplow/recipes/recipe-understanding-bad-data/) will be stored in the bad stream/topic.
# The collector can currently produce two flavours of bad row:
# - a size_violation if an event is larger that the Kinesis (1MB) or SQS (256KB) limits;
# - a generic_error if a request's querystring cannot be parsed because of illegal characters

bad = "bad"

# Whether to use the incoming event's ip as the partition key for the good stream/topic
# Note: Nsq does not make use of partition key.
useIpAddressAsPartitionKey = false

# Enable the chosen sink by uncommenting the appropriate configuration
sink {
# Choose between kinesis, sqs, google-pub-sub, kafka, nsq, or stdout.
# To use stdout, comment or remove everything in the "collector.streams.sink" section except
# "enabled" which should be set to "stdout".
enabled = kinesis
good {

# Events which have successfully been collected will be stored in the good stream/topic
name = "good"

# Region where the streams are located
region = "eu-central-1"

Expand All @@ -187,15 +180,13 @@ collector {
# Thread pool size for Kinesis and SQS API requests
threadPoolSize = 10

# Optional SQS buffer for good and bad events (respectively).
# Optional SQS buffer for good events.
# When messages can't be sent to Kinesis, they will be sent to SQS.
# If not configured, sending to Kinesis will be retried.
# This should only be set up for the Kinesis sink, where it acts as a failsafe.
# For the SQS sink, the good and bad queue should be specified under streams.good and streams.bad, respectively and these settings should be ignored.
#sqsGoodBuffer = {{sqsGoodBuffer}}

#sqsBadBuffer = {{sqsBadBuffer}}

# Optional. Maximum number of bytes that a single record can contain.
# If a record is bigger, a size violation bad row is emitted instead
# Default: 192 kb
Expand Down Expand Up @@ -236,19 +227,96 @@ collector {
# This is the interval for the calls.
# /sink-health is made healthy as soon as requests are successful or records are successfully inserted.
startupCheckInterval = 1 second

# Incoming events are stored in a buffer before being sent to Kinesis.
# The buffer is emptied whenever:
# - the number of stored records reaches record-limit or
# - the combined size of the stored records reaches byte-limit or
# - the time in milliseconds since the buffer was last emptied reaches time-limit
buffer {
byteLimit = 3145728
recordLimit = 500
timeLimit = 5000
}
}

# Incoming events are stored in a buffer before being sent to Kinesis/Kafka.
# Note: Buffering is not supported by NSQ.
# The buffer is emptied whenever:
# - the number of stored records reaches record-limit or
# - the combined size of the stored records reaches byte-limit or
# - the time in milliseconds since the buffer was last emptied reaches time-limit
buffer {
byteLimit = 3145728
recordLimit = 500
timeLimit = 5000
}

# Bad rows (https://docs.snowplowanalytics.com/docs/try-snowplow/recipes/recipe-understanding-bad-data/) will be stored in the bad stream/topic.
# The collector can currently produce two flavours of bad row:
# - a size_violation if an event is larger that the Kinesis (1MB) or SQS (256KB) limits;
# - a generic_error if a request's querystring cannot be parsed because of illegal characters
bad {

name = "bad"

# Region where the streams are located
region = "eu-central-1"

## Optional endpoint url configuration to override aws kinesis endpoints,
## this can be used to specify local endpoints when using localstack
# customEndpoint = {{kinesisEndpoint}}

# Thread pool size for Kinesis and SQS API requests
threadPoolSize = 10

# Optional SQS buffer for bad events.
# When messages can't be sent to Kinesis, they will be sent to SQS.
# If not configured, sending to Kinesis will be retried.
# This should only be set up for the Kinesis sink, where it acts as a failsafe.
# For the SQS sink, the good and bad queue should be specified under streams.good and streams.bad, respectively and these settings should be ignored.
#sqsBadBuffer = {{sqsBadBuffer}}

# Optional. Maximum number of bytes that a single record can contain.
# If a record is bigger, a size violation bad row is emitted instead
# Default: 192 kb
# SQS has a record size limit of 256 kb, but records are encoded with Base64,
# which adds approximately 33% of the size, so we set the limit to 256 kb * 3/4
sqsMaxBytes = 192000

# The following are used to authenticate for the Amazon Kinesis and SQS sinks.
# If both are set to 'default', the default provider chain is used
# (see http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html)
# If both are set to 'iam', use AWS IAM Roles to provision credentials.
# If both are set to 'env', use environment variables AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY
aws {
accessKey = iam
secretKey = iam
}

# Optional
backoffPolicy {
# Minimum backoff period in milliseconds
minBackoff = 500
# Maximum backoff period in milliseconds
maxBackoff = 1500
# Failed inserts are retried forever.
# In case of just Kinesis without SQS, number of retries before setting /sink-health unhealthy.
# In case of Kinesis + SQS, number of retries with one before retrying with the other.
maxRetries = 3
}

# Optional. Maximum number of bytes that a single record can contain.
# If a record is bigger, a size violation bad row is emitted instead
# Default: 1 MB
# If SQS buffer is activated, sqsMaxBytes is used instead
maxBytes = 1000000

# When collector starts, it checks if Kinesis streams exist with describeStreamSummary
# and if SQS buffers exist with getQueueUrl (if configured).
# This is the interval for the calls.
# /sink-health is made healthy as soon as requests are successful or records are successfully inserted.
startupCheckInterval = 1 second

# Incoming events are stored in a buffer before being sent to Kinesis.
# The buffer is emptied whenever:
# - the number of stored records reaches record-limit or
# - the combined size of the stored records reaches byte-limit or
# - the time in milliseconds since the buffer was last emptied reaches time-limit
buffer {
byteLimit = 3145728
recordLimit = 500
timeLimit = 5000
}
}
}

# Telemetry sends heartbeat events to external pipeline.
Expand Down
10 changes: 6 additions & 4 deletions examples/config.kinesis.minimal.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@ collector {
port = 8080

streams {
good = "good"
bad = "bad"

sink {
good {
name = "good"
region = eu-central-1
}
bad {
name = "bad"
region = eu-central-1
}
}
Expand Down
36 changes: 20 additions & 16 deletions examples/config.nsq.extended.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -157,27 +157,14 @@ collector {
}

streams {
# Events which have successfully been collected will be stored in the good stream/topic
good = "good"

# Bad rows (https://docs.snowplowanalytics.com/docs/try-snowplow/recipes/recipe-understanding-bad-data/) will be stored in the bad stream/topic.
# The collector can currently produce two flavours of bad row:
# - a size_violation if an event is larger that the Kinesis (1MB) or SQS (256KB) limits;
# - a generic_error if a request's querystring cannot be parsed because of illegal characters
bad = "bad"

# Whether to use the incoming event's ip as the partition key for the good stream/topic
# Note: Nsq does not make use of partition key.
useIpAddressAsPartitionKey = false

# Enable the chosen sink by uncommenting the appropriate configuration
sink {
# Choose between kinesis, sqs, google-pub-sub, kafka, nsq, or stdout.
# To use stdout, comment or remove everything in the "collector.streams.sink" section except
# "enabled" which should be set to "stdout".
enabled = nsq

# Or NSQ
# Events which have successfully been collected will be stored in the good stream/topic
good {
name = "good"
## Host name for nsqd
host = "nsqHost"
## TCP port for nsqd, 4150 by default
Expand All @@ -188,6 +175,23 @@ collector {
# Default: 1 MB
maxBytes = 1000000
}

# Bad rows (https://docs.snowplowanalytics.com/docs/try-snowplow/recipes/recipe-understanding-bad-data/) will be stored in the bad stream/topic.
# The collector can currently produce two flavours of bad row:
# - a size_violation if an event is larger that the Kinesis (1MB) or SQS (256KB) limits;
# - a generic_error if a request's querystring cannot be parsed because of illegal characters
bad {
name = "bad"
## Host name for nsqd
host = "nsqHost"
## TCP port for nsqd, 4150 by default
port = 4150

# Optional. Maximum number of bytes that a single record can contain.
# If a record is bigger, a size violation bad row is emitted instead
# Default: 1 MB
maxBytes = 1000000
}
}

# Telemetry sends heartbeat events to external pipeline.
Expand Down
11 changes: 7 additions & 4 deletions examples/config.nsq.minimal.hocon
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@ collector {
port = 8080

streams {
good = "good"
bad = "bad"

sink {
good {
name = "good"
host = "nsqHost"
}

bad {
name = "bad"
host = "nsqHost"
}
}
Expand Down
Loading

0 comments on commit 09c8f87

Please sign in to comment.