Proper termination sequence

edenhill edited this page Nov 18, 2014 · 8 revisions

librdkafka is asynchronous by nature, and this also applies to the rd_kafka_destroy() call: it only tells librdkafka to decommission the provided instance. The decommissioning itself (cleanly closing network connections, terminating internal threads, etc.) takes some time to complete. For a clean shutdown, an application should therefore perform the following steps in order:

     /* 1) Make sure all outstanding requests are transmitted and handled. */
     while (rd_kafka_outq_len(rk) > 0)
         rd_kafka_poll(rk, 50);

     /* 2) Destroy the topic and handle objects */
     rd_kafka_topic_destroy(rkt);
     rd_kafka_destroy(rk);

     /* 3) Wait for ALL rdkafka handles to decommission.
      *    This is typically done at the very end of an application.
      *    The timeout is in milliseconds; 2000 is only an illustrative value. */
     rd_kafka_wait_destroyed(2000);

Effects of skipping each of the steps above:

    1. Outstanding produce or offsetCommit requests may be dropped.
    2. librdkafka will continue to operate on the handles, resulting in actual memory leaks.
    3. Cosmetic memory leaks are reported by memory profilers (e.g., valgrind).

Speeding up termination

To speed up termination, an application can configure a termination signal that librdkafka will use internally to quickly cancel any outstanding I/O waits. Make sure to block this signal in your application.

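   char tmp[16];
   snprintf(tmp, sizeof(tmp), "%i", SIGIO);  /* Or whatever signal you decide */
   /* The property value is the signal number formatted as a string. */
   if (rd_kafka_conf_set(rk_conf, "internal.termination.signal", tmp,
                         errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
       fprintf(stderr, "%s\n", errstr);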