Improvements to Kafka checks
- Update the backend details in the doc sent to ES
- Close the Kafka consumer
- Increase the `end_time` used to get the Kafka offset
  This is because, unlike ES, the timestamp here is the one assigned by
  Kafka when the log message is recorded in Kafka. In the case of ES,
  the timestamp added by FluentD becomes the timestamp field in ES, so
  we can query accurately for log messages between the `start_time` and
  `end_time` of the log generation test. In the case of Kafka, however,
  we cannot accurately check for the log messages generated during the
  test, because the records in Kafka and their corresponding offset
  timestamps could fall well after the `end_time` of the test as the
  messages slowly show up in the Kafka cluster, and there is no way to
  use the timestamp from FluentD as the timestamp for the Kafka offset.
  The FluentD timestamp is only available inside the content of the
  Kafka record. We now check for all messages from the `start_time` up
  to `end_time` plus the `timeout` configured in the CR.
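
  For illustration, a minimal sketch of the offset lookup this changes,
  assuming the kafka-python client (whose dict-based `offsets_for_times`
  matches the code below); the bootstrap server, topic, and timestamp
  values are hypothetical:

      from kafka import KafkaConsumer, TopicPartition

      consumer = KafkaConsumer(bootstrap_servers="localhost:9092")
      tp = TopicPartition("logs", 0)
      consumer.assign([tp])

      start_time, end_time, timeout = 1621814400, 1621814460, 600  # epoch seconds

      # offsets_for_times() takes milliseconds and returns, per partition, the
      # earliest offset whose broker-assigned timestamp is >= the given value.
      beg = consumer.offsets_for_times({tp: start_time * 1000})
      # Records may land in Kafka well after the test's end_time, so the upper
      # bound is extended by the CR timeout instead of stopping at end_time.
      last = consumer.offsets_for_times({tp: (end_time + timeout) * 1000})
      consumer.close()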

Signed-off-by: Sai Sindhur Malleni <smalleni@redhat.com>
smalleni committed May 24, 2021
1 parent d36f5b7 commit 709e81f
Showing 1 changed file with 9 additions and 2 deletions.
11 changes: 9 additions & 2 deletions snafu/log_generator_wrapper/trigger_log_generator.py
@@ -92,6 +92,7 @@ def _json_payload(self, data):
             "kafka_topic": self.kafka_topic,
             "backend": "kafka",
         }
+        payload.update(backend)
         payload.update(data)
         return payload

@@ -225,7 +226,7 @@ def _check_kafka(self, start_time, end_time):
             ) # get all offsets with timestamp in ms greater than or equal to start_time
             last = consumer.offsets_for_times(
                 {tp: end_time * 1000}
-            ) # get all offsets with timestamp in ms greater than or equal to start_time
+            ) # get all offsets with timestamp in ms greater than or equal to end_time
             if beg[tp] is None:
                 continue # continue to next partition if no offsets found during test in this partition
             consumer.seek(tp, beg[tp].offset) # seek to the offset corresponding to start_time of test
@@ -246,6 +247,7 @@ def _check_kafka(self, start_time, end_time):
                     log_dict["message"] == self.my_message
                 ): # check to make sure the message stored matches the message generated
                     count += 1
+        consumer.close()
         return count

     def emit_actions(self):
@@ -273,7 +275,12 @@ def emit_actions(self):
         elif self.es_url:
             messages_received = self._check_es(int(start_time), int(end_time))
         elif self.kafka_bootstrap_server:
-            messages_received = self._check_kafka(start_time, end_time)
+            messages_received = self._check_kafka(start_time, end_time + self.timeout)
+            logger.info(
+                "Current messages received is {}, waiting more time for kafka".format(
+                    messages_received
+                )
+            )
         if messages_received == message_count:
             received_all_messages = True
         else:
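As a usage note (not part of this commit), wrapping the consumer in a
try/finally would guarantee the new `consumer.close()` runs even if the
message check raises; a sketch with a hypothetical bootstrap server:

    from kafka import KafkaConsumer

    consumer = KafkaConsumer(bootstrap_servers="localhost:9092")
    try:
        count = 0
        # seek to the start offset, poll records, and count matches here
    finally:
        consumer.close()  # releases sockets even on errors mid-check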
