Skip to content

Commit

Permalink
async logger test - correctness and performance checkings added
Browse files Browse the repository at this point in the history
  • Loading branch information
remibettan committed Jan 14, 2021
1 parent e54c618 commit 2ab8086
Show file tree
Hide file tree
Showing 3 changed files with 183 additions and 37 deletions.
2 changes: 1 addition & 1 deletion CMake/global_config.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ macro(global_set_flags)

#Performance improvement with Ubuntu 18/20
if(UNIX AND (NOT ANDROID_NDK_TOOLCHAIN_INCLUDED))
message(INFO "Asynchronous ELPP invoked - experimental")
message(INFO "Asynchronous ELPP invoked")
add_definitions(-DELPP_EXPERIMENTAL_ASYNC)
endif()

Expand Down
17 changes: 5 additions & 12 deletions third-party/easyloggingpp/src/easylogging++.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2108,10 +2108,10 @@ Storage::Storage(const LogBuilderPtr& defaultLogBuilder) :
addFlag(LoggingFlag::AllowVerboseIfModuleNotSpecified);
#if ELPP_ASYNC_LOGGING
installLogDispatchCallback<base::AsyncLogDispatchCallback>(std::string("AsyncLogDispatchCallback"));
ELPP_INTERNAL_INFO(1, "ELPP ASYNC logger");
ELPP_INTERNAL_INFO(1, "ELPP ASYNC logger selected");
#else
installLogDispatchCallback<base::DefaultLogDispatchCallback>(std::string("DefaultLogDispatchCallback"));
ELPP_INTERNAL_INFO(1, "ELPP sync logger");
ELPP_INTERNAL_INFO(1, "ELPP sync logger selected");
#endif // ELPP_ASYNC_LOGGING
#if defined(ELPP_FEATURE_ALL) || defined(ELPP_FEATURE_PERFORMANCE_TRACKING)
installPerformanceTrackingCallback<base::DefaultPerformanceTrackingCallback>
Expand All @@ -2120,7 +2120,6 @@ Storage::Storage(const LogBuilderPtr& defaultLogBuilder) :
ELPP_INTERNAL_INFO(1, "Easylogging++ has been initialized");
#if ELPP_ASYNC_LOGGING
m_asyncDispatchWorker->start();
ELPP_INTERNAL_INFO(1, "Storage::Done");
#endif // ELPP_ASYNC_LOGGING
}

Expand Down Expand Up @@ -2313,12 +2312,6 @@ void DefaultLogDispatchCallback::dispatch(base::type::string_t&& logLine) {
void AsyncLogDispatchCallback::handle(const LogDispatchData* data) {
base::type::string_t logLine = data->logMessage()->logger()->logBuilder()->build(data->logMessage(),
data->dispatchAction() == base::DispatchAction::NormalLog);
/*if (data->dispatchAction() == base::DispatchAction::NormalLog
&& data->logMessage()->logger()->typedConfigurations()->toStandardOutput(data->logMessage()->level())) {
if (ELPP->hasFlag(LoggingFlag::ColoredTerminalOutput))
data->logMessage()->logger()->logBuilder()->convertToColoredOutput(&logLine, data->logMessage()->level());
ELPP_COUT << ELPP_COUT_LINE(logLine);
}*/
// Save resources and only queue if we want to write to file otherwise just ignore handler
auto conf = data->logMessage()->logger()->typedConfigurations();
if (conf->toStandardOutput(data->logMessage()->level()) ||
Expand Down Expand Up @@ -2436,11 +2429,11 @@ void AsyncDispatchWorker::handle(AsyncLogItem* logItem) {
// This method is used in order to transfer all the logs:
// from the "write queue" - queue in which all the logs are added by the other threads
// to the "read queue" - queue from which the logs are read and dispatched by the async logger's thread
// This double buffer mechanism avoids from the other threads the need to wait writing their logs
// until other logs are read from the queue.
// This double buffer mechanism minimizes the inter-thread locking time, improving log's bandwidth and
// preventing costly stalls due to log flushes to HD.
void AsyncDispatchWorker::fetchLogQueue()
{
if (ELPP && ELPP->asyncLogWriteQueue() && ELPP->asyncLogWriteQueue()->size() > 0) {
if (ELPP && ELPP->asyncLogWriteQueue() && ELPP->asyncLogWriteQueue()->size()) {
base::threading::ScopedLock scopedLockW(ELPP->asyncLogWriteQueue()->lock());
base::threading::ScopedLock scopedLockR(ELPP->asyncLogReadQueue()->lock());
ELPP->asyncLogWriteQueue()->appendTo(ELPP->asyncLogReadQueue());
Expand Down
201 changes: 177 additions & 24 deletions unit-tests/log/test-multi-thread-logs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,74 @@
#include "log-common.h"
#include <atomic>


#ifdef ELPP_EXPERIMENTAL_ASYNC

std::atomic<int> atomic_integer(0);
std::chrono::milliseconds global_ms = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch());
std::chrono::milliseconds max_time = (std::chrono::milliseconds)0;
std::chrono::milliseconds min_time = (std::chrono::milliseconds)50;
std::chrono::milliseconds avg_time = (std::chrono::milliseconds)0;
int max_time_iteration = -1;
int number_of_iterations = 10000;
const int number_of_iterations = 10000;
const int number_of_threads = 10;
//thresholds
const int checked_log_write_time = 10; //ms
const int required_log_write_time = 50; //ms
const int checked_lag_time = 15; //ms
const int required_lag_time = 50; //ms
//----------------------- HELPER METHODS START -------------------------
int get_time_from_line(std::string line)
{
auto time_word = line.substr(7, 12);
auto hours = stoi(time_word.substr(0, 2));
auto minutes = stoi(time_word.substr(3, 5));
auto seconds = stoi(time_word.substr(6, 8));
auto ms = stoi(time_word.substr(9, 12));
//std::cout << "h:m:s:ms = " << hours << ":" << minutes << ":" << seconds << ":" << ms << std::endl;
return (ms + seconds * 1000 + minutes * 1000 * 60 + hours * 1000 * 60 * 60);
}


std::string get_last_line(std::ifstream& in)
{
std::string line;
for (int i = 0; i < (number_of_iterations * number_of_threads) - 1; ++i)
{
getline(in, line);
}
return line;
}

// inputs are use as 2 points that define a line:
// first_time_ms, first_value : x1, y1
// second_time_ms, second_value : x2, y2
// value_in_log : y3
// return value will be x3
int get_log_ideal_time(int first_time_ms, int first_value, int second_time_ms, int second_value, int value_in_log)
{
int ideal_time = value_in_log * (second_time_ms - first_time_ms) / (second_value - first_value) +
(first_time_ms * (second_value / (second_value - first_value))) - second_time_ms * (first_value / (second_value - first_value));

return ideal_time;
}

std::pair<unsigned int, int> get_next_50_lines_time_val(std::ifstream& ifs)
{
unsigned int avg_time_ms = -1;
int avg_value = -1;
for (int i = 0; i < 50; ++i) {
std::string line;
getline(ifs, line);
avg_time_ms += get_time_from_line(line);
avg_value += stoi(line.substr(line.find_last_of(" ")));
}
avg_time_ms /= 50;
avg_value /= 50;
return std::make_pair(avg_time_ms, avg_value);
}

//----------------------- HELPER METHODS END ---------------------------

#ifdef ELPP_EXPERIMENTAL_ASYNC
TEST_CASE("async logger", "[log][async_log]")
{
size_t n_callbacks = 0;
Expand All @@ -27,37 +86,31 @@ TEST_CASE("async logger", "[log][async_log]")
int iterations = 0;
auto start = std::chrono::steady_clock::now();

while (std::chrono::steady_clock::now() - start < std::chrono::seconds(20))
while (iterations < number_of_iterations)
{
std::stringstream ss;
int value_to_check = (required_value) + 10 * iterations;
ss << "atomic integer = " << ++atomic_integer << " and required value = " << value_to_check;
ss << "atomic integer = " << ++atomic_integer;
auto start_ms = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch());
rs2::log(RS2_LOG_SEVERITY_DEBUG, ss.str().c_str());
std::chrono::milliseconds ms = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch());
std::chrono::milliseconds delta_ms = ms - start_ms;
if (delta_ms > max_time) {
max_time = delta_ms; max_time_iteration = value_to_check;
}
if (delta_ms > max_time) max_time = delta_ms;
if (delta_ms < min_time) min_time = delta_ms;
avg_time += delta_ms;
start_ms = ms;
++iterations;
/*bool value_result = (atomic_integer <= value_to_check + 3) && (atomic_integer >= value_to_check - 3);
if (!value_result)
int a = 1;*/
/*bool performance_result = (delta_ms < (std::chrono::milliseconds)20);
if (!performance_result)
int a = 1;*/
//std::cout << " , delta_ms = " << delta_ms.count() << std::endl;
}
};
rs2::log_to_file(RS2_LOG_SEVERITY_DEBUG);
//rs2::log_to_callback(RS2_LOG_SEVERITY_DEBUG, callback);
global_ms = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch());
const char* log_file_path = ".//multi-thread-logs.log";
try {
remove(log_file_path);
}
catch (...) {}
rs2::log_to_file(RS2_LOG_SEVERITY_DEBUG, log_file_path);

std::vector<std::thread> threads;

for (int i = 0; i < 10; ++i)
for (int i = 0; i < number_of_threads; ++i)
{
threads.push_back(std::thread(func, i + 1));
}
Expand All @@ -68,10 +121,110 @@ TEST_CASE("async logger", "[log][async_log]")
}

std::this_thread::sleep_for(std::chrono::milliseconds(5000));
std::cout << "max time = " << max_time.count() << " ms, on iteration: " << max_time_iteration << std::endl;
std::cout << "min time = " << min_time.count() << " ms" << std::endl;
std::cout << "avg time = " << (float) (avg_time.count()) / (number_of_iterations * 10.f) << " ms" << std::endl;
REQUIRE(max_time.count() < 10);
std::stringstream ss;
CAPTURE(max_time);
CAPTURE(min_time);
CAPTURE(avg_time);
CHECK_NOFAIL(max_time.count() < checked_log_write_time);
REQUIRE(max_time.count() < required_log_write_time);

// checking logs correctness - that all the integer's values have been logged
{
// preparing array of number of iterations * number of threads
std::array<char, number_of_iterations * number_of_threads> check_table;
// setting all cells to 0
memset(&check_table[0], 0, check_table.size());
// marking cells at the position of each log integer value to 1
std::ifstream check_file(log_file_path, std::ios::in);
if (check_file.good())
{
std::string line;
while (std::getline(check_file, line)) {
auto value_in_log = stoi(line.substr(line.find_last_of(" ")));
REQUIRE(value_in_log > 0);
REQUIRE((value_in_log - 1) < (number_of_iterations* number_of_threads));
check_table.at(value_in_log - 1) = 1;
}
// checking that all cells have value 1, which means that all logs have been sent and received
std::for_each(check_table.cbegin(), check_table.cend(), [](const char c) {
REQUIRE(c == 1);
});
check_file.close();
}
}

// checking logs order in means of logging time
{
std::ifstream check_file(log_file_path, std::ios::in);
int overall_ms = 0;
int overall_logs = 0;
if (check_file.good())
{
// get data to calculate "ideal" line
// using average from 50 first logs as one point,
// and average of 50 last points as second point
auto first_50_pair = get_next_50_lines_time_val(check_file);
unsigned int first_50_avg_time_ms = first_50_pair.first;
int first_50_avg_value = first_50_pair.second;

std::string line;
// the aim of this loop is to get the file's cursor before the 50 last lines,
// considering it is already after the 50 first lines
for (int i = 0; i < (number_of_iterations * number_of_threads) - 1 - 50 - 50; ++i)
getline(check_file, line);

auto last_50_pair = get_next_50_lines_time_val(check_file);
unsigned int last_50_avg_time_ms = last_50_pair.first;
int last_50_avg_value = last_50_pair.second;

// clear and return to start of the file
check_file.clear();
check_file.seekg(0);

// go over each line and check that its logging time is not "far away" in means of time than the "ideal" time
int max_delta = 0;
while (getline(check_file, line)) {
auto value_in_log = stoi(line.substr(line.find_last_of(" ")));
auto log_time_ms = get_time_from_line(line);
auto log_ideal_time_ms = get_log_ideal_time(first_50_avg_time_ms, first_50_avg_value, last_50_avg_time_ms, last_50_avg_value, value_in_log);
auto delta = std::abs(log_ideal_time_ms - log_time_ms);
if (delta > max_delta) max_delta = delta;
}
CHECK_NOFAIL(max_delta < checked_lag_time);
REQUIRE(max_delta < required_lag_time);
}
check_file.close();
}

// checking number of logs throughput per ms
{
std::ifstream check_file(log_file_path, std::ios::in);
int overall_ms = 0;
int overall_logs = 0;
if (check_file.good())
{
std::string line;
while (std::getline(check_file, line)) {
auto previous_time = get_time_from_line(line);
int logs_counter_in_one_ms = 1;
while (std::getline(check_file, line))
{
auto current_time = get_time_from_line(line);
if (current_time != previous_time)
{
overall_logs += logs_counter_in_one_ms;
++overall_ms;
break;
}
else
++logs_counter_in_one_ms;
}
}
}
check_file.close();
float average_logs_in_one_ms = (float)overall_logs / overall_ms;
REQUIRE(average_logs_in_one_ms >= 20.0F);
}
}
#endif

Expand Down

0 comments on commit 2ab8086

Please sign in to comment.