Skip to content

Proper termination sequence

Magnus Edenhill edited this page Apr 12, 2016 · 8 revisions

Proper termination sequence

librdkafka is asynchronous in its nature and performs most operation in its background threads. It simply tells librdkafka to decommission the provided instance but it will take some time in doing so; cleanly closing network connections, terminating internal threads, etc.

High-level KafkaConsumer

Proper termination sequence for the high-level KafkaConsumer is:

     /* 1) Close the consumer, committing final offsets, etc. */
     rd_kafka_consumer_close(rk);

     /* 2) Destroy handle object */
     rd_kafka_destroy(rk);

NOTE: There is no need to unsubscribe prior to calling consumer close()

NOTE: Any topic objects created must be destroyed prior to rd_kafka_destroy()

Effects of not doing the above, for:

    1. Final offsets are not committed.
    1. librdkafka will continue to operate on the handles. Actual memory leaks.

Producers and simple legacy Consumers

The proper termination sequence for Producers and simple legacy consumers sequence is:

     /* 1) Make sure all outstanding requests are transmitted and handled. */
     while (rd_kafka_outq_len(rk) > 0)
         rd_kafka_poll(rk, 50);

     /* 2) Destroy the topic and handle objects */
     rd_kafka_topic_destroy(rkt);
     rd_kafka_destroy(rk);

     /* NOTE: This is only needed for librdkafka versions < 0.9, since 0.9 the rd_kafka_destroy() call is blocking. */
     /* 3) Wait for ALL rdkafka handles to decommission, this is typically done at the very end of an application. */
     rd_kafka_wait_destroyed();

Effects of not doing the above, for:

    1. Outstanding produce or offsetCommit requests may be dropped
    1. librdkafka will continue to operate on the handles. Actual memory leaks.
    1. Only versions < 0.9: Cosmetic memory leaks reported by memory profilers (e.g., valgrind)

Speeding up termination

To speed up the termination of librdkafka an application can set a termination signal that will be used by librdkafka internally to quickly cancel any outstanding I/O waits. Make sure you block this signal in your application.

   char tmp[16];
   snprintf(tmp, sizeof(tmp), "%i", SIGIO);  /* Or whatever signal you decide */
   rd_kafka_conf_set(rk_conf, "internal.termination.signal", tmp, errstr, sizeof(errstr));