From 709e81faea89a21c8e158127da89a6ba2e09eafb Mon Sep 17 00:00:00 2001
From: Sai Sindhur Malleni
Date: Mon, 24 May 2021 09:26:32 -0500
Subject: [PATCH] Improvements to Kafka checks

- Add the Kafka backend details to the doc sent to ES
- Close the Kafka consumer
- Increase the `end_time` used to get the Kafka offset

This is because, unlike ES, the timestamp used for the offset lookup is
the one assigned by Kafka when the log message is recorded in the topic.
In the case of ES, the timestamp added by Fluentd becomes the timestamp
field in ES, so we can query accurately for log messages between the
`start_time` and `end_time` of the log generation test. In the case of
Kafka, however, we cannot accurately check for the log messages
generated during the test, because the records in Kafka and the
timestamps backing their offsets can land well after the `end_time` of
the test as messages slowly show up in the Kafka cluster, and there is
no way to use the Fluentd timestamp as the timestamp for the Kafka
offset: the Fluentd timestamp lives inside the content of the Kafka
record. We now check for all messages from the `start_time` up to
`end_time` plus the `timeout` configured in the CR.

Signed-off-by: Sai Sindhur Malleni
---
 snafu/log_generator_wrapper/trigger_log_generator.py | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)

diff --git a/snafu/log_generator_wrapper/trigger_log_generator.py b/snafu/log_generator_wrapper/trigger_log_generator.py
index 2fdb6e35..f9414ef5 100755
--- a/snafu/log_generator_wrapper/trigger_log_generator.py
+++ b/snafu/log_generator_wrapper/trigger_log_generator.py
@@ -92,6 +92,7 @@ def _json_payload(self, data):
                 "kafka_topic": self.kafka_topic,
                 "backend": "kafka",
             }
+            payload.update(backend)
         payload.update(data)
         return payload
 
@@ -225,7 +226,7 @@ def _check_kafka(self, start_time, end_time):
             )  # get all offsets with timestamp in ms greater than or equal to start_time
             last = consumer.offsets_for_times(
                 {tp: end_time * 1000}
-            )  # get all offsets with timestamp in ms greater than or equal to start_time
+            )  # get all offsets with timestamp in ms greater than or equal to end_time
             if beg[tp] is None:
                 continue  # continue to next partition if no offsets found during test in this partition
             consumer.seek(tp, beg[tp].offset)  # seek to the offset corresponding to start_time of test
@@ -246,6 +247,7 @@ def _check_kafka(self, start_time, end_time):
                     log_dict["message"] == self.my_message
                 ):  # check to make sure the message stored matches the message generated
                     count += 1
+        consumer.close()
         return count
 
     def emit_actions(self):
@@ -273,7 +275,12 @@ def emit_actions(self):
             elif self.es_url:
                 messages_received = self._check_es(int(start_time), int(end_time))
             elif self.kafka_bootstrap_server:
-                messages_received = self._check_kafka(start_time, end_time)
+                messages_received = self._check_kafka(start_time, end_time + self.timeout)
+                logger.info(
+                    "Current messages received is {}, waiting more time for kafka".format(
+                        messages_received
+                    )
+                )
             if messages_received == message_count:
                 received_all_messages = True
             else:
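
To see why the query window must extend past end_time, below is a minimal
standalone sketch of the timestamp-to-offset lookup that _check_kafka relies
on. It is illustrative only and not part of the patch; the topic name,
bootstrap server address, and timestamp values are placeholder assumptions.
kafka-python's offsets_for_times returns, per partition, the earliest offset
whose broker-assigned timestamp is greater than or equal to the requested
time, so records that Kafka appends after the test's end_time would fall
outside a window that ends exactly at end_time.

    # Sketch of the offset lookup used by _check_kafka, under assumed values.
    # Topic "logs", the server address, and the times are placeholders.
    from kafka import KafkaConsumer, TopicPartition

    consumer = KafkaConsumer(
        bootstrap_servers="localhost:9092",
        consumer_timeout_ms=10000,  # stop iterating if no new records arrive
    )
    tp = TopicPartition("logs", 0)
    consumer.assign([tp])

    start_time = 1621864800  # seconds, as passed into _check_kafka
    end_time = 1621864860
    timeout = 600  # CR-configured wait; the patch passes end_time + timeout

    # offsets_for_times maps each TopicPartition to an OffsetAndTimestamp
    # (or None): the EARLIEST offset whose Kafka-assigned timestamp (ms) is
    # >= the requested time. The Fluentd event time is inside the record
    # value and plays no part in this lookup.
    beg = consumer.offsets_for_times({tp: start_time * 1000})
    last = consumer.offsets_for_times({tp: (end_time + timeout) * 1000})

    if beg[tp] is not None:
        consumer.seek(tp, beg[tp].offset)  # first record at/after start_time
        # If nothing was produced after the widened window, read to the end.
        end_offset = last[tp].offset if last[tp] else consumer.end_offsets([tp])[tp]
        for record in consumer:
            if record.offset >= end_offset:
                break  # past the window
            # record.value holds the log message (with the Fluentd timestamp
            # in its content); record.timestamp is when Kafka recorded it,
            # which may be well after the test's end_time.
    consumer.close()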