Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

EasyLogging - add asynchronous handling (Linux) #8048

Merged
merged 17 commits into from
Jan 26, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CMake/global_config.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ macro(global_set_flags)
add_definitions(-DBUILD_EASYLOGGINGPP)
endif()

if (ENABLE_EASYLOGGINGPP_ASYNC)
add_definitions(-DEASYLOGGINGPP_ASYNC)
endif()

if(TRACE_API)
add_definitions(-DTRACE_API)
endif()
Expand Down
7 changes: 7 additions & 0 deletions CMake/lrs_options.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,10 @@ option(FORCE_LIBUVC "Explicitly turn-on libuvc backend - deprecated, use FORCE_R
option(FORCE_WINUSB_UVC "Explicitly turn-on winusb_uvc (for win7) backend - deprecated, use FORCE_RSUSB_BACKEND instead" OFF)
option(ANDROID_USB_HOST_UVC "Build UVC backend for Android - deprecated, use FORCE_RSUSB_BACKEND instead" OFF)
option(CHECK_FOR_UPDATES "Checks for versions updates" OFF)
#Performance improvement with Ubuntu 18/20
if(UNIX AND (NOT ANDROID_NDK_TOOLCHAIN_INCLUDED))
option(ENABLE_EASYLOGGINGPP_ASYNC "Switch Logger to Asynchronous Mode (set OFF for Synchronous Mode)" ON)
else()
option(ENABLE_EASYLOGGINGPP_ASYNC "Switch Logger to Asynchronous Mode (set OFF for Synchronous Mode)" OFF)
endif()

175 changes: 114 additions & 61 deletions third-party/easyloggingpp/src/easylogging++.cc
Original file line number Diff line number Diff line change
Expand Up @@ -649,22 +649,35 @@ Logger& Logger::operator=(const Logger& logger) {
}

void Logger::configure(const Configurations& configurations) {
m_isConfigured = false; // we set it to false in case if we fail
initUnflushedCount();
if (m_typedConfigurations != nullptr) {
Configurations* c = const_cast<Configurations*>(m_typedConfigurations->configurations());
if (c->hasConfiguration(Level::Global, ConfigurationType::Filename)) {
flush();
#if ELPP_ASYNC_LOGGING
if (ELPP) {
base::threading::ScopedLock scopedLockConfig(ELPP->configLock());
performConfig(configurations);
}
}
base::threading::ScopedLock scopedLock(lock());
if (m_configurations != configurations) {
m_configurations.setFromBase(const_cast<Configurations*>(&configurations));
}
base::utils::safeDelete(m_typedConfigurations);
m_typedConfigurations = new base::TypedConfigurations(&m_configurations, m_logStreamsReference);
resolveLoggerFormatSpec();
m_isConfigured = true;
else
performConfig(configurations);
#else
performConfig(configurations);
#endif // ELPP_ASYNC_LOGGING
}

void Logger::performConfig(const Configurations& configurations) {
m_isConfigured = false; // we set it to false in case if we fail
initUnflushedCount();
if (m_typedConfigurations != nullptr) {
Configurations* c = const_cast<Configurations*>(m_typedConfigurations->configurations());
if (c->hasConfiguration(Level::Global, ConfigurationType::Filename)) {
flush();
}
}
base::threading::ScopedLock scopedLock(lock());
if (m_configurations != configurations) {
m_configurations.setFromBase(const_cast<Configurations*>(&configurations));
}
base::utils::safeDelete(m_typedConfigurations);
m_typedConfigurations = new base::TypedConfigurations(&m_configurations, m_logStreamsReference);
resolveLoggerFormatSpec();
m_isConfigured = true;
}

void Logger::reconfigure(void) {
Expand Down Expand Up @@ -2072,7 +2085,8 @@ Storage::Storage(const LogBuilderPtr& defaultLogBuilder) :
m_flags(ELPP_DEFAULT_LOGGING_FLAGS),
m_vRegistry(new base::VRegistry(0, &m_flags)),
#if ELPP_ASYNC_LOGGING
m_asyncLogQueue(new base::AsyncLogQueue()),
m_asyncLogWriteQueue(new base::AsyncLogQueue()),
m_asyncLogReadQueue(new base::AsyncLogQueue()),
m_asyncDispatchWorker(asyncDispatchWorker),
#endif // ELPP_ASYNC_LOGGING
m_preRollOutCallback(base::defaultPreRollOutCallback) {
Expand All @@ -2094,8 +2108,10 @@ Storage::Storage(const LogBuilderPtr& defaultLogBuilder) :
addFlag(LoggingFlag::AllowVerboseIfModuleNotSpecified);
#if ELPP_ASYNC_LOGGING
installLogDispatchCallback<base::AsyncLogDispatchCallback>(std::string("AsyncLogDispatchCallback"));
ELPP_INTERNAL_INFO(1, "ELPP ASYNC logger selected");
#else
installLogDispatchCallback<base::DefaultLogDispatchCallback>(std::string("DefaultLogDispatchCallback"));
ELPP_INTERNAL_INFO(1, "ELPP sync logger selected");
#endif // ELPP_ASYNC_LOGGING
#if defined(ELPP_FEATURE_ALL) || defined(ELPP_FEATURE_PERFORMANCE_TRACKING)
installPerformanceTrackingCallback<base::DefaultPerformanceTrackingCallback>
Expand All @@ -2115,8 +2131,9 @@ Storage::~Storage(void) {
installLogDispatchCallback<base::DefaultLogDispatchCallback>(std::string("DefaultLogDispatchCallback"));
ELPP_INTERNAL_INFO(5, "Destroying asyncDispatchWorker");
base::utils::safeDelete(m_asyncDispatchWorker);
ELPP_INTERNAL_INFO(5, "Destroying asyncLogQueue");
base::utils::safeDelete(m_asyncLogQueue);
ELPP_INTERNAL_INFO(5, "Destroying asyncLogQueues");
base::utils::safeDelete(m_asyncLogWriteQueue);
base::utils::safeDelete(m_asyncLogReadQueue);
#endif // ELPP_ASYNC_LOGGING
ELPP_INTERNAL_INFO(5, "Destroying registeredHitCounters");
base::utils::safeDelete(m_registeredHitCounters);
Expand Down Expand Up @@ -2295,15 +2312,11 @@ void DefaultLogDispatchCallback::dispatch(base::type::string_t&& logLine) {
void AsyncLogDispatchCallback::handle(const LogDispatchData* data) {
base::type::string_t logLine = data->logMessage()->logger()->logBuilder()->build(data->logMessage(),
data->dispatchAction() == base::DispatchAction::NormalLog);
if (data->dispatchAction() == base::DispatchAction::NormalLog
&& data->logMessage()->logger()->typedConfigurations()->toStandardOutput(data->logMessage()->level())) {
if (ELPP->hasFlag(LoggingFlag::ColoredTerminalOutput))
data->logMessage()->logger()->logBuilder()->convertToColoredOutput(&logLine, data->logMessage()->level());
ELPP_COUT << ELPP_COUT_LINE(logLine);
}
// Save resources and only queue if we want to write to file otherwise just ignore handler
if (data->logMessage()->logger()->typedConfigurations()->toFile(data->logMessage()->level())) {
ELPP->asyncLogQueue()->push(AsyncLogItem(*(data->logMessage()), *data, logLine));
auto conf = data->logMessage()->logger()->typedConfigurations();
if (conf->toStandardOutput(data->logMessage()->level()) ||
conf->toFile(data->logMessage()->level())) {
ELPP->asyncLogWriteQueue()->push(AsyncLogItem(*(data->logMessage()), *data, logLine));
}
}

Expand All @@ -2315,61 +2328,76 @@ AsyncDispatchWorker::AsyncDispatchWorker() {
AsyncDispatchWorker::~AsyncDispatchWorker() {
setContinueRunning(false);
ELPP_INTERNAL_INFO(6, "Stopping dispatch worker - Cleaning log queue");
if (m_asyncWorkerThread.joinable())
m_asyncWorkerThread.join();
else
ELPP_INTERNAL_INFO(6, "logger not joinable");
clean();
ELPP_INTERNAL_INFO(6, "Log queue cleaned");
}

bool AsyncDispatchWorker::clean(void) {
std::mutex m;
std::unique_lock<std::mutex> lk(m);
cv.wait(lk, [] { return !ELPP->asyncLogQueue()->empty(); });
emptyQueue();
std::unique_lock<std::mutex> lk(_mtx);
try
{
fetchLogQueue();
emptyQueue();
}
catch(...){}
lk.unlock();
cv.notify_one();
return ELPP->asyncLogQueue()->empty();
return (ELPP && ELPP->asyncLogWriteQueue() && ELPP->asyncLogWriteQueue()->empty() && ELPP->asyncLogReadQueue() && ELPP->asyncLogReadQueue()->empty());
}

void AsyncDispatchWorker::emptyQueue(void) {
while (!ELPP->asyncLogQueue()->empty()) {
AsyncLogItem data = ELPP->asyncLogQueue()->next();
handle(&data);
base::threading::msleep(100);
}
if (ELPP && ELPP->asyncLogReadQueue()) {
for (auto i=0UL; i < ELPP->asyncLogReadQueue()->size(); i++) {
AsyncLogItem data = ELPP->asyncLogReadQueue()->next();
handle(&data);
}
}
}

void AsyncDispatchWorker::start(void) {
base::threading::msleep(5000); // 5s (why?)
setContinueRunning(true);
std::thread t1(&AsyncDispatchWorker::run, this);
t1.join();
m_asyncWorkerThread = std::thread(&AsyncDispatchWorker::run, this);
}

void AsyncDispatchWorker::handle(AsyncLogItem* logItem) {
LogDispatchData* data = logItem->data();
LogMessage* logMessage = logItem->logMessage();
Logger* logger = logMessage->logger();
//base::threading::ScopedLock scopedLock(logger->lock());
base::TypedConfigurations* conf = logger->typedConfigurations();
base::type::string_t logLine = logItem->logLine();
if (data->dispatchAction() == base::DispatchAction::NormalLog) {
if (conf->toFile(logMessage->level())) {
base::type::fstream_t* fs = conf->fileStream(logMessage->level());
if (fs != nullptr) {
fs->write(logLine.c_str(), logLine.size());
if (fs->fail()) {
ELPP_INTERNAL_ERROR("Unable to write log to file ["
<< conf->filename(logMessage->level()) << "].\n"
<< "Few possible reasons (could be something else):\n" << " * Permission denied\n"
<< " * Disk full\n" << " * Disk is not writable", true);
} else {
if (ELPP->hasFlag(LoggingFlag::ImmediateFlush) || (logger->isFlushNeeded(logMessage->level()))) {
logger->flush(logMessage->level(), fs);
if (conf) {
if (conf->toFile(logMessage->level())) {
base::type::fstream_t* fs = conf->fileStream(logMessage->level());
if (fs != nullptr) {
fs->write(logLine.c_str(), logLine.size());
if (fs->fail()) {
ELPP_INTERNAL_ERROR("Unable to write log to file ["
<< conf->filename(logMessage->level()) << "].\n"
<< "Few possible reasons (could be something else):\n" << " * Permission denied\n"
<< " * Disk full\n" << " * Disk is not writable", true);
}
else {
if (ELPP->hasFlag(LoggingFlag::ImmediateFlush) || (logger->isFlushNeeded(logMessage->level()))) {
logger->flush(logMessage->level(), fs);
}
}

}
else {
ELPP_INTERNAL_ERROR("Log file for [" << LevelHelper::convertToString(logMessage->level()) << "] "
<< "has not been configured but [TO_FILE] is configured to TRUE. [Logger ID: " << logger->id() << "]", false);
}
}
}
} else {
ELPP_INTERNAL_ERROR("Log file for [" << LevelHelper::convertToString(logMessage->level()) << "] "
<< "has not been configured but [TO_FILE] is configured to TRUE. [Logger ID: " << logger->id() << "]", false);
}
}
else if (conf->toStandardOutput(logMessage->level())) {
ELPP_COUT << ELPP_COUT_LINE(logLine);
}
}
}
# if defined(ELPP_SYSLOG)
else if (data->dispatchAction() == base::DispatchAction::SysLog) {
Expand Down Expand Up @@ -2398,11 +2426,30 @@ void AsyncDispatchWorker::handle(AsyncLogItem* logItem) {
# endif // defined(ELPP_SYSLOG)
}

// This method is used in order to transfer all the logs:
// from the "write queue" - queue in which all the logs are added by the other threads
// to the "read queue" - queue from which the logs are read and dispatched by the async logger's thread
// This double buffer mechanism minimizes the inter-thread locking time, improving log's bandwidth and
// preventing costly stalls due to log flushes to HD.
void AsyncDispatchWorker::fetchLogQueue()
remibettan marked this conversation as resolved.
Show resolved Hide resolved
{
if (ELPP && ELPP->asyncLogWriteQueue() && ELPP->asyncLogWriteQueue()->size()) {
base::threading::ScopedLock scopedLockW(ELPP->asyncLogWriteQueue()->lock());
base::threading::ScopedLock scopedLockR(ELPP->asyncLogReadQueue()->lock());
ELPP->asyncLogWriteQueue()->appendTo(ELPP->asyncLogReadQueue());
ELPP->asyncLogWriteQueue()->clear();
}
}

void AsyncDispatchWorker::run(void) {
while (continueRunning()) {
emptyQueue();
base::threading::msleep(10); // 10ms
}
while (continueRunning()) {
if (ELPP) {
base::threading::ScopedLock scopedLock(ELPP->configLock());
emptyQueue();
}
std::this_thread::sleep_for(std::chrono::milliseconds(5));
fetchLogQueue();
}
}
#endif // ELPP_ASYNC_LOGGING

Expand Down Expand Up @@ -2548,13 +2595,15 @@ MessageBuilder& MessageBuilder::operator<<(const wchar_t* msg) {
// Writer

Writer& Writer::construct(Logger* logger, bool needLock) {
if (!ELPP) return *this;
m_logger = logger;
initializeLogger(logger->id(), false, needLock);
m_messageBuilder.initialize(m_logger);
return *this;
}

Writer& Writer::construct(int count, const char* loggerIds, ...) {
if (!ELPP) return *this;
if (ELPP->hasFlag(LoggingFlag::MultiLoggerSupport)) {
va_list loggersList;
va_start(loggersList, loggerIds);
Expand Down Expand Up @@ -2602,6 +2651,7 @@ void Writer::initializeLogger(const std::string& loggerId, bool lookup, bool nee
}

void Writer::processDispatch() {
if (!ELPP) return;
#if ELPP_LOGGING_ENABLED
if (ELPP->hasFlag(LoggingFlag::MultiLoggerSupport)) {
bool firstDispatched = false;
Expand Down Expand Up @@ -3096,6 +3146,9 @@ bool Loggers::configureFromArg(const char* argKey) {
}

void Loggers::flushAll(void) {
#if ELPP_ASYNC_LOGGING
ELPP->asyncDispatchWorker()->clean();
#endif
ELPP->registeredLoggers()->flushAll();
}

Expand Down
Loading