An HTTP service that performs peer forwarding of `Event`s between Data Prepper nodes for aggregation. Peer forwarding is currently supported by the `aggregate`, `service_map_stateful`, and `otel_trace_raw` processors.
Peer Forwarder groups events based on the identification keys provided by the processors. For `service_map_stateful` and `otel_trace_raw`, the identification key is `traceId` by default and cannot be configured. For the `aggregate` processor, it is configurable using the `identification_keys` configuration option. You can find more information about identification keys here.
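To make the grouping concrete, here is a minimal Python sketch of key-based routing. It assumes a consistent-hash ring with virtual nodes; the hash function (MD5), the virtual node count, and the names `HashRing` and `peer_for` are illustrative assumptions, not Data Prepper's API. What it shows is the property peer forwarding relies on: every event with the same identification key values lands on the same peer.

```python
import bisect
import hashlib


class HashRing:
    """Hypothetical consistent-hash ring mapping identification key values to peers."""

    def __init__(self, peers, virtual_nodes=128):
        # Place each peer on the ring several times to spread keys evenly.
        self._ring = []
        for peer in peers:
            for i in range(virtual_nodes):
                self._ring.append((self._hash(f"{peer}#{i}"), peer))
        self._ring.sort()
        self._hashes = [h for h, _ in self._ring]

    @staticmethod
    def _hash(value):
        # MD5 is used here only as a stable, well-distributed hash (assumption).
        return int.from_bytes(hashlib.md5(value.encode("utf-8")).digest()[:8], "big")

    def peer_for(self, event, identification_keys):
        # Build the routing key from the identification key values, e.g. traceId.
        routing_key = "|".join(str(event.get(k)) for k in identification_keys)
        h = self._hash(routing_key)
        # Take the first virtual node at or after the hash, wrapping around the ring.
        idx = bisect.bisect_left(self._hashes, h) % len(self._hashes)
        return self._ring[idx][1]


ring = HashRing(["data-prepper1", "data-prepper2", "data-prepper3"])
a = ring.peer_for({"traceId": "abc123", "span": 1}, ["traceId"])
b = ring.peer_for({"traceId": "abc123", "span": 2}, ["traceId"])
print(a == b)  # True: events sharing a traceId go to the same peer
```

Hashing on the key values, rather than distributing events round-robin, is what lets a stateful processor see every event of a group on one node; a consistent-hash ring also limits how many keys move when a peer joins or leaves.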
Peer discovery is currently provided by a static list, a DNS record lookup, or AWS Cloud Map.
Static discovery mode allows a Data Prepper node to discover nodes using a list of IP addresses or domain names.
```yaml
peer_forwarder:
  discovery_mode: static
  static_endpoints: ["data-prepper1", "data-prepper2"]
```
We recommend using DNS discovery over static discovery when scaling out a Data Prepper cluster. The core concept is to configure a DNS provider to return a list of Data Prepper hosts when given a single domain name. This is done with DNS A records, which map a single domain name to a list of IP addresses.
```yaml
peer_forwarder:
  discovery_mode: dns
  domain_name: "data-prepper-cluster.my-domain.net"
```
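The difference between the discovery modes can be sketched in a few lines of Python. This is not how Data Prepper implements discovery internally; `discover_peers` and `fake_resolve` are hypothetical names, and the standard-library resolver `socket.getaddrinfo` stands in for the DNS lookup. Static mode returns the configured list unchanged, while DNS mode turns one domain name into one peer per A record.

```python
import socket


def discover_peers(config, resolve=socket.getaddrinfo):
    """Return peer addresses for a peer_forwarder config dict (illustrative sketch)."""
    mode = config.get("discovery_mode", "local_node")
    if mode == "local_node":
        # No peers: every event is processed on this node.
        return []
    if mode == "static":
        # The configured IP addresses or domain names are used as-is.
        return list(config["static_endpoints"])
    if mode == "dns":
        # One lookup on a single domain name; each A record yields one peer IP.
        infos = resolve(config["domain_name"], None, socket.AF_INET, socket.SOCK_STREAM)
        # getaddrinfo returns (family, type, proto, canonname, sockaddr); sockaddr[0] is the IP.
        # Duplicates are removed and the result sorted for a deterministic order.
        return sorted({info[4][0] for info in infos})
    raise ValueError(f"discovery_mode not handled by this sketch: {mode}")


def fake_resolve(host, port, family, type_):
    # Stand-in for a DNS provider returning two A records for the cluster domain.
    return [
        (socket.AF_INET, socket.SOCK_STREAM, 6, "", ("10.0.0.2", 0)),
        (socket.AF_INET, socket.SOCK_STREAM, 6, "", ("10.0.0.1", 0)),
    ]


dns_config = {"discovery_mode": "dns", "domain_name": "data-prepper-cluster.my-domain.net"}
print(discover_peers(dns_config, resolve=fake_resolve))  # ['10.0.0.1', '10.0.0.2']
```

Injecting the resolver keeps the sketch testable offline; with the default `socket.getaddrinfo` it performs a real lookup.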
AWS Cloud Map provides API-based service discovery as well as DNS-based service discovery.
Peer forwarder can use the API-based service discovery. To support this, you must have an existing namespace configured for API instance discovery. You can create a new one by following the instructions in the Cloud Map documentation.
Your Data Prepper configuration needs to include:
* `aws_cloud_map_namespace_name` - Set to your Cloud Map namespace name.
* `aws_cloud_map_service_name` - Set to the service name within your specified namespace.
* `aws_region` - The AWS region where your namespace exists.
* `discovery_mode` - Set to `aws_cloud_map`.

Your Data Prepper configuration can optionally include:
* `aws_cloud_map_query_parameters` - Key/value pairs to filter the results based on the custom attributes attached to an instance. Only instances that match all the specified key/value pairs are returned.
Example configuration:
```yaml
peer_forwarder:
  discovery_mode: aws_cloud_map
  aws_cloud_map_namespace_name: "my-namespace"
  aws_cloud_map_service_name: "data-prepper-cluster"
  aws_cloud_map_query_parameters:
    instance_type: "r5.xlarge"
  aws_region: "us-east-1"
```
Data Prepper must also run with the necessary permissions. The following IAM policy shows the required permissions.
```json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "CloudMapPeerForwarder",
      "Effect": "Allow",
      "Action": "servicediscovery:DiscoverInstances",
      "Resource": "*"
    }
  ]
}
```
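Cloud Map applies `aws_cloud_map_query_parameters` on its side (the `DiscoverInstances` API accepts `QueryParameters`). The Python sketch below only mimics that match-all rule locally, on hypothetical instance records shaped like entries of a `DiscoverInstances` response (`InstanceId`, `Attributes`), to show why an instance that lacks one of the attributes is excluded. `filter_instances` is a hypothetical helper, not part of Data Prepper.

```python
def filter_instances(instances, query_parameters):
    """Keep instances whose custom attributes match every query parameter (sketch)."""
    # all() gives AND semantics: one missing or mismatched key excludes the instance.
    return [
        inst
        for inst in instances
        if all(inst.get("Attributes", {}).get(key) == value
               for key, value in query_parameters.items())
    ]


# Hypothetical instances registered in the "data-prepper-cluster" service.
instances = [
    {"InstanceId": "dp-1", "Attributes": {"AWS_INSTANCE_IPV4": "10.0.0.1", "instance_type": "r5.xlarge"}},
    {"InstanceId": "dp-2", "Attributes": {"AWS_INSTANCE_IPV4": "10.0.0.2", "instance_type": "m5.large"}},
    {"InstanceId": "dp-3", "Attributes": {"AWS_INSTANCE_IPV4": "10.0.0.3"}},
]
matches = filter_instances(instances, {"instance_type": "r5.xlarge"})
print([inst["Attributes"]["AWS_INSTANCE_IPV4"] for inst in matches])  # ['10.0.0.1']
```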
* `port` (Optional): An `int` between 0 and 65535 representing the port the peer forwarder server runs on. Default value is `4994`.
* `request_timeout` (Optional): An `int` representing the request timeout in milliseconds for the Peer Forwarder HTTP server. Default value is `10000`.
* `server_thread_count` (Optional): An `int` representing the number of threads used by the Peer Forwarder server. Defaults to `200`.
* `client_thread_count` (Optional): An `int` representing the number of threads used by the Peer Forwarder client. Defaults to `200`.
* `maxConnectionCount` (Optional): An `int` representing the maximum number of open connections for the Peer Forwarder server. Default value is `500`.
* `discovery_mode` (Optional): A `String` representing the peer discovery mode to be used. Allowable values are `local_node`, `static`, `dns`, and `aws_cloud_map`. Defaults to `local_node`, which processes events locally.
* `static_endpoints` (Optional): A `list` containing the endpoints of all Data Prepper instances. Required if `discovery_mode` is set to `static`.
* `domain_name` (Optional): A `String` representing a single domain name to query DNS against. Typically used by creating multiple DNS A records for the same domain. Required if `discovery_mode` is set to `dns`.
* `aws_cloud_map_namespace_name` (Optional): A `String` representing the Cloud Map namespace when using AWS Cloud Map service discovery. Required if `discovery_mode` is set to `aws_cloud_map`.
* `aws_cloud_map_service_name` (Optional): A `String` representing the Cloud Map service when using AWS Cloud Map service discovery. Required if `discovery_mode` is set to `aws_cloud_map`.
* `aws_cloud_map_query_parameters` (Optional): A `Map` of key/value pairs to filter the results based on the custom attributes attached to an instance. Only instances that match all the specified key/value pairs are returned.
* `buffer_size` (Optional): An `int` representing the maximum number of unchecked records the buffer accepts (number of unchecked records = number of records written into the buffer + number of in-flight records not yet checked by the Checkpointing API). Default is `512`.
* `batch_size` (Optional): An `int` representing the maximum number of records the buffer returns on read. Default is `48`.
* `batch_delay` (Optional): An `int` representing the maximum duration in milliseconds to retrieve `batch_size` records from the peer forwarder buffer. If `batch_size` has not been reached before this duration is exceeded, a partial batch is used. If this value is set to 0, all available records up to the batch size are returned immediately. If the buffer is empty, the buffer blocks for up to 5 milliseconds to wait for records. Default value is `3000`.
* `aws_region` (Optional): A `String` representing the AWS region to use for ACM, S3, or AWS Cloud Map. Required if `use_acm_certificate_for_ssl` is set to `true`, if `ssl_certificate_file` and `ssl_key_file` are AWS S3 paths, or if `discovery_mode` is set to `aws_cloud_map`.
* `drain_timeout` (Optional): A `Duration` representing the wait time for the peer forwarder to complete processing data before shutdown.
* `forwarding_batch_size` (Optional): An `int` representing the maximum number of records to send in each request to a peer. Default value is `1500`; maximum value is `15000`.
* `forwarding_batch_queue_depth` (Optional): An `int` representing the depth of the batching queue. This value is a scalar used to determine the size of the `LinkedBlockingQueue`s used for batching records before they are sent to a peer. The queue size is determined by the formula `workers` * `forwarding_batch_size` * `forwarding_batch_queue_depth`. Default value is `1`.
* `forwarding_batch_timeout` (Optional): A `Duration` representing the maximum time that can elapse between flushing batches to a peer. Default is `3s`.
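The defaults, the `discovery_mode` requirements, and the batching-queue formula listed above can be checked with a small Python sketch. It is not Data Prepper's configuration loader: `resolve_config` and `batching_queue_capacity` are hypothetical helpers that only encode the constraints stated in this list, and `workers` is passed in as a plain parameter because its value comes from outside the `peer_forwarder` block.

```python
# Defaults copied from the option list above.
DEFAULTS = {
    "port": 4994,
    "request_timeout": 10000,
    "server_thread_count": 200,
    "client_thread_count": 200,
    "maxConnectionCount": 500,
    "discovery_mode": "local_node",
    "buffer_size": 512,
    "batch_size": 48,
    "batch_delay": 3000,
    "forwarding_batch_size": 1500,
    "forwarding_batch_queue_depth": 1,
}

# Options each discovery_mode requires, per the option list above.
REQUIRED_BY_MODE = {
    "local_node": [],
    "static": ["static_endpoints"],
    "dns": ["domain_name"],
    "aws_cloud_map": ["aws_cloud_map_namespace_name", "aws_cloud_map_service_name", "aws_region"],
}


def resolve_config(user_config):
    """Merge user options over the documented defaults and check documented constraints."""
    config = {**DEFAULTS, **user_config}
    if not 0 <= config["port"] <= 65535:
        raise ValueError("port must be between 0 and 65535")
    if config["forwarding_batch_size"] > 15000:
        raise ValueError("forwarding_batch_size must not exceed 15000")
    mode = config["discovery_mode"]
    if mode not in REQUIRED_BY_MODE:
        raise ValueError(f"unknown discovery_mode: {mode}")
    missing = [key for key in REQUIRED_BY_MODE[mode] if key not in config]
    if missing:
        raise ValueError(f"discovery_mode {mode} requires: {', '.join(missing)}")
    return config


def batching_queue_capacity(config, workers):
    """Documented formula: workers * forwarding_batch_size * forwarding_batch_queue_depth."""
    return workers * config["forwarding_batch_size"] * config["forwarding_batch_queue_depth"]


config = resolve_config({"discovery_mode": "dns", "domain_name": "data-prepper-cluster.my-domain.net"})
print(batching_queue_capacity(config, workers=4))  # 6000
```

With the defaults, each unit of `forwarding_batch_queue_depth` adds room for one full `forwarding_batch_size` batch per worker, so 4 workers give 4 * 1500 * 1 = 6000 queued records.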
The following SSL options set up the trust manager that the peer forwarding client uses to connect to other Data Prepper instances.
* `ssl` (Optional): A `boolean` that enables TLS/SSL. Default value is `true`.
* `ssl_certificate_file` (Optional): A `String` representing the SSL certificate chain file path or AWS S3 path. S3 path example: `s3://<bucketName>/<path>`. Defaults to `config/default_certificate.pem`, which is the default certificate file. Read more about how the certificate file is generated here.
* `ssl_key_file` (Optional): A `String` representing the SSL key file path or AWS S3 path. S3 path example: `s3://<bucketName>/<path>`. Defaults to `config/default_private_key.pem`, which is the default private key file. Read more about how the private key file is generated here.
* `ssl_insecure_disable_verification` (Optional): A `boolean` that disables verification of the server's TLS certificate chain. Default value is `false`.
* `ssl_fingerprint_verification_only` (Optional): A `boolean` that disables verification of the server's TLS certificate chain and instead verifies only the certificate fingerprint. Default value is `false`.
* `use_acm_certificate_for_ssl` (Optional): A `boolean` that enables TLS/SSL using the certificate and private key from AWS Certificate Manager (ACM). Default is `false`.
* `acm_certificate_arn` (Optional): A `String` representing the ACM certificate ARN. The ACM certificate takes precedence over an S3 or local file system certificate. Required if `use_acm_certificate_for_ssl` is set to `true`.
* `acm_private_key_password` (Optional): A `String` representing the ACM private key password that will be used to decrypt the private key. If it is not provided, a random password is generated.
* `acm_certificate_timeout_millis` (Optional): An `int` representing the timeout in milliseconds for ACM to get certificates. Default value is `120000`.
* `aws_region` (Optional): A `String` representing the AWS region to use for ACM, S3, or AWS Cloud Map. Required if `use_acm_certificate_for_ssl` is set to `true`, if `ssl_certificate_file` and `ssl_key_file` are AWS S3 paths, or if `discovery_mode` is set to `aws_cloud_map`.
```yaml
peer_forwarder:
  ssl: true
  ssl_certificate_file: "<cert-file-path>"
  ssl_key_file: "<private-key-file-path>"
```
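Two of the SSL options above encode decisions that are easy to get wrong: which certificate source wins, and what "verifies only the certificate fingerprint" compares. The Python sketch below illustrates both under stated assumptions. `certificate_source` and `sha256_fingerprint` are hypothetical helpers; the sketch requires `aws_region` if either path is in S3 (a conservative reading of the `aws_region` rule), and it assumes a SHA-256 digest for the fingerprint, since this section does not name the algorithm Data Prepper uses.

```python
import hashlib


def certificate_source(config):
    """Pick the certificate source following the precedence described above (sketch)."""
    if config.get("use_acm_certificate_for_ssl", False):
        # ACM takes precedence over S3 or local files and needs an ARN and a region.
        for key in ("acm_certificate_arn", "aws_region"):
            if key not in config:
                raise ValueError(f"{key} is required when use_acm_certificate_for_ssl is true")
        return ("acm", config["acm_certificate_arn"])
    cert = config.get("ssl_certificate_file", "config/default_certificate.pem")
    key_file = config.get("ssl_key_file", "config/default_private_key.pem")
    uses_s3 = cert.startswith("s3://") or key_file.startswith("s3://")
    if uses_s3 and "aws_region" not in config:
        raise ValueError("aws_region is required when certificate or key paths are in S3")
    return ("s3" if cert.startswith("s3://") else "file", cert)


def sha256_fingerprint(der_bytes):
    """Colon-separated SHA-256 fingerprint of a DER-encoded certificate (assumed digest)."""
    digest = hashlib.sha256(der_bytes).hexdigest().upper()
    return ":".join(digest[i:i + 2] for i in range(0, len(digest), 2))


print(certificate_source({}))  # ('file', 'config/default_certificate.pem')
```

Fingerprint-only verification pins one exact certificate instead of trusting a CA chain, which is why it sidesteps chain validation; in Python, `ssl.PEM_cert_to_DER_cert` converts a PEM certificate into the DER bytes the helper expects.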
* `authentication` (Optional): A `Map` that enables mTLS. It can be either `mutual_tls` or `unauthenticated`. Default value is `unauthenticated`.
```yaml
peer_forwarder:
  authentication:
    mutual_tls:
```
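Data Prepper configures TLS in Java, so the following Python `ssl` sketch is only an analogy for what switching `authentication` to `mutual_tls` changes on the server side: the server stops accepting peers that do not present a trusted client certificate. `peer_forwarder_server_context` is a hypothetical helper, and certificate loading is left as comments so the sketch runs without files.

```python
import ssl


def peer_forwarder_server_context(config):
    """Server-side TLS context showing what authentication: mutual_tls changes (sketch)."""
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
    # An empty `mutual_tls:` entry in YAML parses to None, so test for the key, not its value.
    authentication = config.get("authentication") or {"unauthenticated": None}
    if "mutual_tls" in authentication:
        # mTLS: the handshake fails unless the connecting peer presents a trusted certificate.
        ctx.verify_mode = ssl.CERT_REQUIRED
    # A real server would also call ctx.load_cert_chain(...) with the certificate and key files,
    # and ctx.load_verify_locations(...) with the CA that signs peer certificates.
    return ctx


mtls = peer_forwarder_server_context({"authentication": {"mutual_tls": None}})
print(mtls.verify_mode == ssl.CERT_REQUIRED)  # True
```

A server context created with `PROTOCOL_TLS_SERVER` does not request client certificates by default, which corresponds to the `unauthenticated` default.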
Core Peer Forwarder introduces the following custom metrics. All metrics are prefixed by `core.peerForwarder`.
* `requestForwardingLatency`: measures the latency of requests forwarded by the peer forwarder client.
* `requestProcessingLatency`: measures the latency of requests processed by the peer forwarder server.

* `requests`: measures the total number of forwarded requests.
* `requestsFailed`: measures the total number of failed requests, that is, requests with an HTTP response code other than `200`.
* `requestsSuccessful`: measures the total number of successful requests, that is, requests with HTTP response code `200`.
* `requestsTooLarge`: measures the total number of requests that are too large to be written to the peer forwarder buffer. These requests receive HTTP response code `413`.
* `requestTimeouts`: measures the total number of requests that timed out while writing content to the peer forwarder buffer. These requests receive HTTP response code `408`.
* `requestsUnprocessable`: measures the total number of requests that failed due to an unprocessable entity. These requests receive HTTP response code `422`.
* `badRequests`: measures the total number of requests with a bad request format. These requests receive HTTP response code `400`.
* `recordsSuccessfullyForwarded`: measures the total number of records forwarded successfully.
* `recordsFailedForwarding`: measures the total number of records that failed to be forwarded.
* `recordsToBeForwarded`: measures the total number of records to be forwarded.
* `recordsToBeProcessedLocally`: measures the total number of records to be processed locally.
* `recordsActuallyProcessedLocally`: measures the total number of records actually processed locally. This is the sum of `recordsToBeProcessedLocally` and `recordsFailedForwarding`.
* `recordsReceivedFromPeers`: measures the total number of records received from remote peers.

* `peerEndpoints`: measures the number of dynamically discovered peer Data Prepper endpoints. For `static` mode, the size is fixed.
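Reading the metric descriptions literally, each server response increments `requestsSuccessful` or `requestsFailed`, and a few status codes also increment a more specific counter; the local-processing total is a plain sum. The Python sketch below encodes only that reading of the list. `counters_for_status` and `records_actually_processed_locally` are hypothetical helpers, not Data Prepper's metrics code.

```python
# Specific counters keyed by HTTP status code, as listed above.
SPECIFIC_COUNTERS = {
    400: "badRequests",
    408: "requestTimeouts",
    413: "requestsTooLarge",
    422: "requestsUnprocessable",
}


def counters_for_status(status_code):
    """Counters a response with this status code falls under (literal reading of the list)."""
    if status_code == 200:
        return ["requestsSuccessful"]
    # Any non-200 response counts as failed; some codes also have a dedicated counter.
    names = ["requestsFailed"]
    if status_code in SPECIFIC_COUNTERS:
        names.append(SPECIFIC_COUNTERS[status_code])
    return names


def records_actually_processed_locally(to_be_processed_locally, failed_forwarding):
    # The documented sum implies that records which fail to forward are processed locally.
    return to_be_processed_locally + failed_forwarding


print(counters_for_status(413))  # ['requestsFailed', 'requestsTooLarge']
print(records_actually_processed_locally(10, 3))  # 13
```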