From f1bbae6084751ea0f0f6986387bf68377ad39241 Mon Sep 17 00:00:00 2001 From: Eduardo Silva Date: Thu, 15 Feb 2024 20:37:43 -0600 Subject: [PATCH] out_opentelemetry: enhancements for log body and attributes handling (fix #8359) The following patch fix and enhance the OpenTelemetry output connector when handling log records. In Fluent Bit world, we deal with ton of unstructured log records which comes from variety of sources, or just simply raw text files. When converting those lines to structured messages, there was no option to define what will be the log body and log attributes and everything is packaged inside log body by default. This patch enhance the previous behavior by allowing the following: - log body: optionally define multiple record accessor patterns that tries to match a key or sub-key from the record structure. For the first matched key, it's value is used as part of the body content. If no matches exists, the whole record is set inside the body. - log attributes: if the log record contains native metadata, the keys are added as OpenTelemetry Attributes. if the log body was populated by using a record accessor pattern as described above, the remaining keys that were not used are added as attributes. To achieve the desired new behavior, the configuration needs to use the new configuration property called 'logs_body_key', which can be used to define multiple record accessor patterns. e.g: pipeline: inputs: - name: dummy metadata: '{"meta": "data"}' dummy: '{"name": "bill", "lastname": "gates", "log": {"abc": {"def":123}, "saveme": true}}' outputs: - name: opentelemetry match: '*' host: localhost port: 4318 logs_body_key: $name logs_body_key: $log['abc']['def'] In the example above, the dummy input plugin will generate a record with metadata, in the output side, the plugin will lookup in order for $name and then $log['abc']['def']. $name will match so 'bill' will become the value of the body and the remaining content as attributes. Here is the output of the vanilla Otel Collector when inspecting the content it receives: Body: Str(bill) Attributes: -> meta: Str(data) -> lastname: Str(gates) -> log: Map({"abc":{"def":123},"saveme":true}) Signed-off-by: Eduardo Silva --- plugins/out_opentelemetry/opentelemetry.c | 305 +++++++++++++----- plugins/out_opentelemetry/opentelemetry.h | 20 ++ .../out_opentelemetry/opentelemetry_conf.c | 153 ++++++++- 3 files changed, 397 insertions(+), 81 deletions(-) diff --git a/plugins/out_opentelemetry/opentelemetry.c b/plugins/out_opentelemetry/opentelemetry.c index 509aa1d7c78..784b1d95a69 100644 --- a/plugins/out_opentelemetry/opentelemetry.c +++ b/plugins/out_opentelemetry/opentelemetry.c @@ -41,6 +41,7 @@ extern void cmt_encode_opentelemetry_destroy(cfl_sds_t text); #include "opentelemetry.h" #include "opentelemetry_conf.h" + static inline Opentelemetry__Proto__Common__V1__AnyValue *msgpack_object_to_otlp_any_value(struct msgpack_object *o); static inline void otlp_any_value_destroy(Opentelemetry__Proto__Common__V1__AnyValue *value); @@ -67,17 +68,19 @@ static inline void otlp_kvarray_destroy(Opentelemetry__Proto__Common__V1__KeyVal static inline void otlp_kvpair_destroy(Opentelemetry__Proto__Common__V1__KeyValue *kvpair) { - if (kvpair != NULL) { - if (kvpair->key != NULL) { - flb_free(kvpair->key); - } + if (kvpair == NULL) { + return; + } - if (kvpair->value != NULL) { - otlp_any_value_destroy(kvpair->value); - } + if (kvpair->key != NULL) { + flb_free(kvpair->key); + } - flb_free(kvpair); + if (kvpair->value != NULL) { + otlp_any_value_destroy(kvpair->value); } + + flb_free(kvpair); } static inline void otlp_kvlist_destroy(Opentelemetry__Proto__Common__V1__KeyValueList *kvlist) @@ -181,10 +184,12 @@ static int http_post(struct opentelemetry_context *ctx, if (ret == 0) { compressed = FLB_TRUE; - } else { + } + else { flb_plg_error(ctx->ins, "cannot gzip payload, disabling compression"); } - } else { + } + else { final_body = (void *) body; final_body_len = body_len; } @@ -280,10 +285,8 @@ static int http_post(struct opentelemetry_context *ctx, out_ret = FLB_RETRY; } else { - if (ctx->log_response_payload && - c->resp.payload != NULL && - c->resp.payload_size > 0) { - flb_plg_info(ctx->ins, "%s:%i, HTTP status=%i\n%.*s", + if (ctx->log_response_payload && c->resp.payload != NULL && c->resp.payload_size > 2) { + flb_plg_info(ctx->ins, "%s:%i, HTTP status=%i%.*s", ctx->host, ctx->port, c->resp.status, (int) c->resp.payload_size, @@ -654,7 +657,6 @@ static inline Opentelemetry__Proto__Common__V1__KeyValue **msgpack_map_to_otlp_k *entry_count = o->via.map.size; result = flb_calloc(*entry_count, sizeof(Opentelemetry__Proto__Common__V1__KeyValue *)); - if (result != NULL) { for (index = 0; index < *entry_count; index++) { kv = &o->via.map.ptr[index]; @@ -743,19 +745,159 @@ static inline Opentelemetry__Proto__Common__V1__AnyValue *msgpack_object_to_otlp return result; } +static inline int log_record_set_body(struct opentelemetry_context *ctx, + Opentelemetry__Proto__Logs__V1__LogRecord *log_records, struct flb_log_event *event, + struct flb_record_accessor **out_ra_match) +{ + int ret; + struct mk_list *head; + struct opentelemetry_body_key *bk; + msgpack_object *s_key = NULL; + msgpack_object *o_key = NULL; + msgpack_object *o_val = NULL; + Opentelemetry__Proto__Common__V1__AnyValue *log_object = NULL; + + *out_ra_match = NULL; + mk_list_foreach(head, &ctx->log_body_key_list) { + bk = mk_list_entry(head, struct opentelemetry_body_key, _head); + + ret = flb_ra_get_kv_pair(bk->ra, *event->body, &s_key, &o_key, &o_val); + if (ret == 0) { + log_object = msgpack_object_to_otlp_any_value(o_val); + + /* Link the record accessor pattern that matched */ + *out_ra_match = bk->ra; + break; + } + + log_object = NULL; + } + + /* At this point the record accessor patterns found nothing, so we just package the whole record */ + if (!log_object) { + log_object = msgpack_object_to_otlp_any_value(event->body); + } + + if (!log_object) { + flb_plg_error(ctx->ins, "log event conversion failure"); + return -1; + } + + /* try to find the following keys: message or log, if found */ + log_records->body = log_object; + return 0; +} + +static int log_record_set_attributes(struct opentelemetry_context *ctx, + Opentelemetry__Proto__Logs__V1__LogRecord *log_record, struct flb_log_event *event, + struct flb_record_accessor *ra_match) +{ + int i; + int ret; + int attr_count = 0; + int unpacked = FLB_FALSE; + size_t array_size; + void *out_buf; + size_t offset = 0; + size_t out_size; + msgpack_object_kv *kv; + msgpack_object *metadata; + msgpack_unpacked result; + Opentelemetry__Proto__Common__V1__KeyValue **buf; + + /* Maximum array size is the total number of root keys in metadata and record keys */ + array_size = event->body->via.map.size; + + /* log metadata (metada that comes from original Fluent Bit record ) */ + metadata = event->metadata; + if (metadata) { + array_size += metadata->via.map.size; + } + + /* + * Remove the keys from the record that were added to the log body and create a new output + * buffer. If there are matches, meaning that a new output buffer was created, ret will + * be FLB_TRUE, if no matches exists it returns FLB_FALSE. + */ + if (ctx->mp_accessor && ra_match) { + /* + * if ra_match is not NULL, it means that the log body was populated with a key from the record + * and the variable holds a reference to the record accessor that matched the key. + * + * Since 'likely' the mp_accessor context can have multiple record accessor patterns, + * we need to make sure to remove 'only' the one that was used in the log body, + * the approach we take is to disable all the patterns, enable the single one that + * matched, process and then re-enable all of them. + */ + flb_mp_accessor_set_active(ctx->mp_accessor, FLB_FALSE); + + /* Only active the one that matched */ + flb_mp_accessor_set_active_by_pattern(ctx->mp_accessor, + ra_match->pattern, + FLB_TRUE); + + /* Remove the undesired key */ + ret = flb_mp_accessor_keys_remove(ctx->mp_accessor, event->body, &out_buf, &out_size); + if (ret) { + msgpack_unpacked_init(&result); + msgpack_unpack_next(&result, out_buf, out_size, &offset); + + array_size += result.data.via.map.size; + unpacked = FLB_TRUE; + } + /* Enable all the mp_accessors */ + flb_mp_accessor_set_active(ctx->mp_accessor, FLB_TRUE); + } + + /* allocate an array to hold the converted map entries */ + buf = flb_calloc(array_size, sizeof(Opentelemetry__Proto__Common__V1__KeyValue *)); + if (!buf) { + flb_errno(); + if (unpacked) { + msgpack_unpacked_destroy(&result); + flb_free(out_buf); + } + return -1; + } + + /* pack metadata */ + for (i = 0; i < metadata->via.map.size; i++) { + kv = &metadata->via.map.ptr[i]; + buf[i] = msgpack_kv_to_otlp_any_value(kv); + attr_count++; + } + + /* remaining fields that were not added to log body */ + if (unpacked) { + /* iterate the map and reference each elemento as an OTLP value */ + for (i = 0; i < result.data.via.map.size; i++) { + kv = &result.data.via.map.ptr[i]; + buf[attr_count] = msgpack_kv_to_otlp_any_value(kv); + attr_count++; + } + msgpack_unpacked_destroy(&result); + flb_free(out_buf); + } + + log_record->attributes = buf; + log_record->n_attributes = attr_count; + return 0; +} + static int flush_to_otel(struct opentelemetry_context *ctx, struct flb_event_chunk *event_chunk, Opentelemetry__Proto__Logs__V1__LogRecord **logs, size_t log_count) { + int ret; + void *body; + unsigned len; + Opentelemetry__Proto__Collector__Logs__V1__ExportLogsServiceRequest export_logs; Opentelemetry__Proto__Logs__V1__ScopeLogs scope_log; Opentelemetry__Proto__Logs__V1__ResourceLogs resource_log; Opentelemetry__Proto__Logs__V1__ResourceLogs *resource_logs[1]; Opentelemetry__Proto__Logs__V1__ScopeLogs *scope_logs[1]; - void *body; - unsigned len; - int res; opentelemetry__proto__collector__logs__v1__export_logs_service_request__init(&export_logs); opentelemetry__proto__logs__v1__resource_logs__init(&resource_log); @@ -781,15 +923,15 @@ static int flush_to_otel(struct opentelemetry_context *ctx, opentelemetry__proto__collector__logs__v1__export_logs_service_request__pack(&export_logs, body); - // send post request to opentelemetry with content type application/x-protobuf - res = http_post(ctx, body, len, + /* send post request to opentelemetry with content type application/x-protobuf */ + ret = http_post(ctx, body, len, event_chunk->tag, flb_sds_len(event_chunk->tag), ctx->logs_uri); flb_free(body); - return res; + return ret; } static int process_logs(struct flb_event_chunk *event_chunk, @@ -797,25 +939,22 @@ static int process_logs(struct flb_event_chunk *event_chunk, struct flb_input_instance *ins, void *out_context, struct flb_config *config) { + int ret; + size_t i; size_t log_record_count; Opentelemetry__Proto__Logs__V1__LogRecord **log_record_list; Opentelemetry__Proto__Logs__V1__LogRecord *log_records; - Opentelemetry__Proto__Common__V1__AnyValue *log_object; struct flb_log_event_decoder *decoder; struct flb_log_event event; - size_t index; struct opentelemetry_context *ctx; - int res; + struct flb_record_accessor *ra_match; ctx = (struct opentelemetry_context *) out_context; - log_record_list = (Opentelemetry__Proto__Logs__V1__LogRecord **) \ - flb_calloc(ctx->batch_size, - sizeof(Opentelemetry__Proto__Logs__V1__LogRecord *)); - - if (log_record_list == NULL) { + log_record_list = (Opentelemetry__Proto__Logs__V1__LogRecord **) + flb_calloc(ctx->batch_size, sizeof(Opentelemetry__Proto__Logs__V1__LogRecord *)); + if (!log_record_list) { flb_errno(); - return -1; } @@ -823,72 +962,71 @@ static int process_logs(struct flb_event_chunk *event_chunk, flb_calloc(ctx->batch_size, sizeof(Opentelemetry__Proto__Logs__V1__LogRecord)); - if (log_records == NULL) { + if (!log_records) { flb_errno(); - flb_free(log_record_list); - return -2; } - for(index = 0 ; index < ctx->batch_size ; index++) { - log_record_list[index] = &log_records[index]; + for (i = 0 ; i < ctx->batch_size ; i++) { + log_record_list[i] = &log_records[i]; } - decoder = flb_log_event_decoder_create((char *) event_chunk->data, - event_chunk->size); - + decoder = flb_log_event_decoder_create((char *) event_chunk->data, event_chunk->size); if (decoder == NULL) { flb_plg_error(ctx->ins, "could not initialize record decoder"); - flb_free(log_record_list); flb_free(log_records); - return -1; } log_record_count = 0; - res = FLB_OK; - - while (flb_log_event_decoder_next(decoder, &event) == 0 && - res == FLB_OK) { + ret = FLB_OK; + while (flb_log_event_decoder_next(decoder, &event) == FLB_EVENT_DECODER_SUCCESS) { + ra_match = NULL; opentelemetry__proto__logs__v1__log_record__init(&log_records[log_record_count]); - log_records[log_record_count].attributes = \ - msgpack_map_to_otlp_kvarray(event.metadata, - &log_records[log_record_count].n_attributes); - log_object = msgpack_object_to_otlp_any_value(event.body); + /* + * Set the record body by using the logic defined in the configuration by + * the 'logs_body_key' properties. + * + * Note that the reference set in `out_body_parent_key` is the parent/root key that holds the content + * that was discovered. We get that reference so we can easily filter it out when composing + * the final list of attributes. + */ + ret = log_record_set_body(ctx, &log_records[log_record_count], &event, &ra_match); + if (ret == -1) { + /* the only possible fail path is a problem with a memory allocation, let's suggest a FLB_RETRY */ + ret = FLB_RETRY; + break; + } - if (log_object == NULL) { - flb_plg_error(ctx->ins, "log event conversion failure"); - res = FLB_ERROR; - continue; + /* set attributes from metadata and remaining fields from the main record */ + ret = log_record_set_attributes(ctx, &log_records[log_record_count], &event, ra_match); + if (ret == -1) { + /* as before, it can only fail on a memory allocation */ + ret = FLB_RETRY; + break; } + ret = FLB_OK; - log_records[log_record_count].body = log_object; + /* set timestamp */ log_records[log_record_count].time_unix_nano = flb_time_to_nanosec(&event.timestamp); - log_record_count++; if (log_record_count >= ctx->batch_size) { - res = flush_to_otel(ctx, - event_chunk, - log_record_list, - log_record_count); - + ret = flush_to_otel(ctx, event_chunk, log_record_list, log_record_count); clear_array(log_record_list, log_record_count); - log_record_count = 0; } } flb_log_event_decoder_destroy(decoder); - if (log_record_count > 0 && - res == FLB_OK) { - res = flush_to_otel(ctx, + if (log_record_count > 0 && ret == FLB_OK) { + ret = flush_to_otel(ctx, event_chunk, log_record_list, log_record_count); @@ -899,13 +1037,13 @@ static int process_logs(struct flb_event_chunk *event_chunk, flb_free(log_record_list); flb_free(log_records); - return res; + return ret; } static int process_metrics(struct flb_event_chunk *event_chunk, - struct flb_output_flush *out_flush, - struct flb_input_instance *ins, void *out_context, - struct flb_config *config) + struct flb_output_flush *out_flush, + struct flb_input_instance *ins, void *out_context, + struct flb_config *config) { int c = 0; int ok; @@ -1165,11 +1303,34 @@ static struct flb_config_map config_map[] = { 0, FLB_TRUE, offsetof(struct opentelemetry_context, metrics_uri), "Specify an optional HTTP URI for the target OTel endpoint." }, + + { + FLB_CONFIG_MAP_INT, "batch_size", DEFAULT_LOG_RECORD_BATCH_SIZE, + 0, FLB_TRUE, offsetof(struct opentelemetry_context, batch_size), + "Set the maximum number of log records to be flushed at a time" + }, + { + FLB_CONFIG_MAP_STR, "compress", NULL, + 0, FLB_FALSE, 0, + "Set payload compression mechanism. Option available is 'gzip'" + }, + + /* + * Logs Properties + * --------------- + */ { FLB_CONFIG_MAP_STR, "logs_uri", "/v1/logs", 0, FLB_TRUE, offsetof(struct opentelemetry_context, logs_uri), "Specify an optional HTTP URI for the target OTel endpoint." }, + + { + FLB_CONFIG_MAP_STR, "logs_body_key", NULL, + FLB_CONFIG_MAP_MULT, FLB_TRUE, offsetof(struct opentelemetry_context, log_body_key_list_str), + "Specify an optional HTTP URI for the target OTel endpoint." + }, + { FLB_CONFIG_MAP_STR, "traces_uri", "/v1/traces", 0, FLB_TRUE, offsetof(struct opentelemetry_context, traces_uri), @@ -1180,16 +1341,6 @@ static struct flb_config_map config_map[] = { 0, FLB_TRUE, offsetof(struct opentelemetry_context, log_response_payload), "Specify if the response paylod should be logged or not" }, - { - FLB_CONFIG_MAP_INT, "batch_size", DEFAULT_LOG_RECORD_BATCH_SIZE, - 0, FLB_TRUE, offsetof(struct opentelemetry_context, batch_size), - "Set the maximum number of log records to be flushed at a time" - }, - { - FLB_CONFIG_MAP_STR, "compress", NULL, - 0, FLB_FALSE, 0, - "Set payload compression mechanism. Option available is 'gzip'" - }, /* EOF */ {0} }; diff --git a/plugins/out_opentelemetry/opentelemetry.h b/plugins/out_opentelemetry/opentelemetry.h index c49d25622d4..6ef6b2bab87 100644 --- a/plugins/out_opentelemetry/opentelemetry.h +++ b/plugins/out_opentelemetry/opentelemetry.h @@ -21,6 +21,8 @@ #define FLB_OUT_OPENTELEMETRY_H #include +#include +#include #define FLB_OPENTELEMETRY_CONTENT_TYPE_HEADER_NAME "Content-Type" #define FLB_OPENTELEMETRY_MIME_PROTOBUF_LITERAL "application/x-protobuf" @@ -33,6 +35,12 @@ */ #define DEFAULT_LOG_RECORD_BATCH_SIZE "1000" +struct opentelemetry_body_key { + flb_sds_t key; + struct flb_record_accessor *ra; + struct mk_list _head; +}; + /* Plugin context */ struct opentelemetry_context { /* HTTP Auth */ @@ -60,9 +68,21 @@ struct opentelemetry_context { /* config reader for 'add_label' */ struct mk_list *add_labels; + /* + * list of linked list body keys given at configuration: note this list is just a slist, + * of strings, once is parsed, it populate the final list in 'log_body_key_list' + */ + struct mk_list *log_body_key_list_str; + + /* head of linked list body keys populated once log_body_key_list_str is parsed */ + struct mk_list log_body_key_list; + /* internal labels ready to append */ struct mk_list kv_labels; + /* special accessor with list of patterns used to populate log metadata */ + struct flb_mp_accessor *mp_accessor; + /* Upstream connection to the backend server */ struct flb_upstream *u; diff --git a/plugins/out_opentelemetry/opentelemetry_conf.c b/plugins/out_opentelemetry/opentelemetry_conf.c index f85876d18dd..83f14bbe153 100644 --- a/plugins/out_opentelemetry/opentelemetry_conf.c +++ b/plugins/out_opentelemetry/opentelemetry_conf.c @@ -26,6 +26,122 @@ #include "opentelemetry.h" #include "opentelemetry_conf.h" +/* create a single entry of log_body_key */ +static int log_body_key_create(struct opentelemetry_context *ctx, char *ra_pattern) +{ + struct opentelemetry_body_key *bk; + + bk = flb_calloc(1, sizeof(struct opentelemetry_body_key)); + if (!bk) { + flb_errno(); + return -1; + } + + bk->key = flb_sds_create(ra_pattern); + if (!bk->key) { + flb_free(bk); + return -1; + } + + bk->ra = flb_ra_create(ra_pattern, FLB_TRUE); + if (!bk->ra) { + flb_plg_error(ctx->ins, + "could not process event_field with pattern '%s'", + ra_pattern); + flb_sds_destroy(bk->key); + flb_free(bk); + return -1; + } + + mk_list_add(&bk->_head, &ctx->log_body_key_list); + + return 0; +} + +/* process and instance the list of body key patterns */ +static int log_body_key_list_create(struct opentelemetry_context *ctx) +{ + int ret; + struct mk_list *head; + struct flb_config_map_val *mv; + + /* If no log_body_key are defined, set the default ones */ + if (!ctx->log_body_key_list_str || mk_list_size(ctx->log_body_key_list_str) == 0) { + ret = log_body_key_create(ctx, "$log"); + if (ret != 0) { + return -1; + } + + ret = log_body_key_create(ctx, "$message"); + if (ret != 0) { + return -1; + } + + return 0; + } + + /* Iterate the list of log body keys defined in the configuration and initiate them */ + flb_config_map_foreach(head, mv, ctx->log_body_key_list_str) { + ret = log_body_key_create(ctx, mv->val.str); + if (ret != 0) { + return -1; + } + } + + return 0; +} + +static void log_body_key_list_destroy(struct opentelemetry_context *ctx) +{ + struct mk_list *tmp; + struct mk_list *head; + struct opentelemetry_body_key *bk; + + mk_list_foreach_safe(head, tmp, &ctx->log_body_key_list) { + bk = mk_list_entry(head, struct opentelemetry_body_key, _head); + flb_sds_destroy(bk->key); + flb_ra_destroy(bk->ra); + mk_list_del(&bk->_head); + flb_free(bk); + } +} + +static int metadata_mp_accessor_create(struct opentelemetry_context *ctx) +{ + int ret; + struct mk_list *head; + struct mk_list slist; + struct opentelemetry_body_key *bk; + struct flb_mp_accessor *mpa; + + ret = flb_slist_create(&slist); + if (ret != 0) { + return -1; + } + + /* Iterate the list of log body keys and create a mp_accessor for each one */ + mk_list_foreach(head, &ctx->log_body_key_list) { + bk = mk_list_entry(head, struct opentelemetry_body_key, _head); + + ret = flb_slist_add(&slist, bk->key); + if (ret != 0) { + flb_slist_destroy(&slist); + return -1; + } + } + + mpa = flb_mp_accessor_create(&slist); + if (!mpa) { + flb_slist_destroy(&slist); + return -1; + } + + ctx->mp_accessor = mpa; + flb_slist_destroy(&slist); + + return 0; +} + static int config_add_labels(struct flb_output_instance *ins, struct opentelemetry_context *ctx) { @@ -119,8 +235,7 @@ static char *sanitize_uri(char *uri){ return uri; } -struct opentelemetry_context *flb_opentelemetry_context_create( - struct flb_output_instance *ins, struct flb_config *config) +struct opentelemetry_context *flb_opentelemetry_context_create(struct flb_output_instance *ins, struct flb_config *config) { int ret; int io_flags = 0; @@ -142,6 +257,7 @@ struct opentelemetry_context *flb_opentelemetry_context_create( } ctx->ins = ins; mk_list_init(&ctx->kv_labels); + mk_list_init(&ctx->log_body_key_list); ret = flb_output_config_map_set(ins, (void *) ctx); if (ret == -1) { @@ -155,6 +271,7 @@ struct opentelemetry_context *flb_opentelemetry_context_create( return NULL; } + check_proxy(ins, ctx, host, port, protocol, metrics_uri); check_proxy(ins, ctx, host, port, protocol, logs_uri); @@ -202,6 +319,8 @@ struct opentelemetry_context *flb_opentelemetry_context_create( ctx->host = ins->host.name; ctx->port = ins->host.port; + + /* Logs Properties */ if (logs_uri == NULL) { flb_plg_trace(ctx->ins, "Could not allocate memory for sanitized " @@ -211,6 +330,26 @@ struct opentelemetry_context *flb_opentelemetry_context_create( ctx->logs_uri = logs_uri; } + /* list of 'logs_body_key' */ + ret = log_body_key_list_create(ctx); + if (ret != 0) { + flb_opentelemetry_context_destroy(ctx); + return NULL; + } + + /* + * Add the pattern to the mp_accessor list: for every key that populates the log body, we need + * it also in the mp_accessor list so remaining keys are set into the metadata field. + * + * This process is far from being optimal since we are kind of duplicating the logic, however + * we can simply use the API already exists in place, let's optimize later (if needed). + */ + ret = metadata_mp_accessor_create(ctx); + if (ret != 0) { + flb_opentelemetry_context_destroy(ctx); + return NULL; + } + if (traces_uri == NULL) { flb_plg_trace(ctx->ins, "Could not allocate memory for sanitized " @@ -244,8 +383,7 @@ struct opentelemetry_context *flb_opentelemetry_context_create( return ctx; } -void flb_opentelemetry_context_destroy( - struct opentelemetry_context *ctx) +void flb_opentelemetry_context_destroy(struct opentelemetry_context *ctx) { if (!ctx) { return; @@ -257,6 +395,13 @@ void flb_opentelemetry_context_destroy( flb_upstream_destroy(ctx->u); } + /* release log_body_key_list */ + log_body_key_list_destroy(ctx); + + if (ctx->mp_accessor) { + flb_mp_accessor_destroy(ctx->mp_accessor); + } + flb_free(ctx->proxy_host); flb_free(ctx); }