Skip to content

Commit

Permalink
[logger] Make map access thread safe and proper terminate thread
Browse files Browse the repository at this point in the history
  • Loading branch information
kcudnik committed Jul 23, 2021
1 parent a90529f commit bf02298
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 29 deletions.
112 changes: 85 additions & 27 deletions common/logger.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,11 @@
#include "consumerstatetable.h"
#include "producerstatetable.h"

namespace swss {
using namespace swss;

void err_exit(const char *fn, int ln, int e, const char *fmt, ...)
#define MUTEX std::lock_guard<std::mutex> _lock(getInstance().m_mutex);

void swss::err_exit(const char *fn, int ln, int e, const char *fmt, ...)
{
va_list ap;
char buff[1024];
Expand All @@ -31,13 +33,34 @@ void err_exit(const char *fn, int ln, int e, const char *fmt, ...)
abort();
}

Logger::~Logger() {
if (m_settingThread) {
terminateSettingThread = true;
Logger::~Logger()
{
terminateSettingThread();
}

void Logger::terminateSettingThread()
{
// can't be executed under mutex, since it can cause deadlock

if (m_settingThread)
{
m_terminateSettingThread = true;

m_settingThread->join();

m_settingThread = nullptr;

m_terminateSettingThread = false;
}
}

void Logger::restartSettingThread()
{
terminateSettingThread();

m_settingThread.reset(new std::thread(&Logger::settingThread, this));
}

const Logger::PriorityStringMap Logger::priorityStringMap = {
{ "EMERG", SWSS_EMERG },
{ "ALERT", SWSS_ALERT },
Expand Down Expand Up @@ -85,17 +108,27 @@ void Logger::swssOutputNotify(const std::string &component, const std::string &o
}
}

void Logger::linkToDbWithOutput(const std::string &dbName, const PriorityChangeNotify& prioNotify, const std::string& defPrio, const OutputChangeNotify& outputNotify, const std::string& defOutput)
void Logger::linkToDbWithOutput(
const std::string& dbName,
const PriorityChangeNotify& prioNotify,
const std::string& defPrio,
const OutputChangeNotify& outputNotify,
const std::string& defOutput)
{
auto& logger = getInstance();

// Initialize internal DB with observer
logger.m_settingChangeObservers.insert(std::make_pair(dbName, std::make_pair(prioNotify, outputNotify)));
std::string prio, output;

{
MUTEX;

// Initialize internal DB with observer
logger.m_settingChangeObservers.insert(std::make_pair(dbName, std::make_pair(prioNotify, outputNotify)));
}

DBConnector db("LOGLEVEL_DB", 0);
auto keys = db.keys("*");

std::string key = dbName + ":" + dbName;
std::string prio, output;
bool doUpdate = false;
auto prioPtr = db.hget(key, DAEMON_LOGLEVEL);
auto outputPtr = db.hget(key, DAEMON_LOGOUTPUT);
Expand Down Expand Up @@ -130,8 +163,14 @@ void Logger::linkToDbWithOutput(const std::string &dbName, const PriorityChangeN
table.set(dbName, fieldValues);
}

logger.m_currentPrios[dbName] = prio;
logger.m_currentOutputs[dbName] = output;
{
MUTEX;
logger.m_currentPrios[dbName] = prio;
logger.m_currentOutputs[dbName] = output;
}

// execute callback outside mutex

prioNotify(dbName, prio);
outputNotify(dbName, output);
}
Expand All @@ -143,10 +182,9 @@ void Logger::linkToDb(const std::string &dbName, const PriorityChangeNotify& pri

void Logger::linkToDbNative(const std::string &dbName, const char * defPrio)
{
auto& logger = getInstance();

linkToDb(dbName, swssPrioNotify, defPrio);
logger.m_settingThread.reset(new std::thread(&Logger::settingThread, &logger));

getInstance().restartSettingThread();
}

Logger &Logger::getInstance()
Expand All @@ -171,10 +209,12 @@ void Logger::settingThread()
DBConnector db("LOGLEVEL_DB", 0);
std::map<std::string, std::shared_ptr<ConsumerStateTable>> selectables;

while (!terminateSettingThread)
while (!m_terminateSettingThread)
{
if (selectables.size() < m_settingChangeObservers.size())
{
MUTEX;

for (const auto& i : m_settingChangeObservers)
{
const std::string &dbName = i.first;
Expand Down Expand Up @@ -208,9 +248,17 @@ void Logger::settingThread()
dynamic_cast<ConsumerStateTable *>(selectable)->pop(koValues);
std::string key = kfvKey(koValues), op = kfvOp(koValues);

if ((op != SET_COMMAND) || (m_settingChangeObservers.find(key) == m_settingChangeObservers.end()))
std::pair<PriorityChangeNotify, OutputChangeNotify> pair;

{
continue;
MUTEX;

if ((op != SET_COMMAND) || (m_settingChangeObservers.find(key) == m_settingChangeObservers.end()))
{
continue;
}

pair = m_settingChangeObservers.at(key);
}

auto values = kfvFieldsValues(koValues);
Expand All @@ -219,13 +267,21 @@ void Logger::settingThread()
const std::string &field = fvField(i), &value = fvValue(i);
if ((field == DAEMON_LOGLEVEL) && (value != m_currentPrios[key]))
{
m_currentPrios[key] = value;
m_settingChangeObservers[key].first(key, value);
{
MUTEX;
m_currentPrios[key] = value;
}

pair.first(key, value);
}
else if ((field == DAEMON_LOGOUTPUT) && (value != m_currentOutputs[key]))
{
m_currentOutputs[key] = value;
m_settingChangeObservers[key].second(key, value);
{
MUTEX;
m_currentOutputs[key] = value;
}

pair.second(key, value);
}

break;
Expand All @@ -246,14 +302,16 @@ void Logger::write(Priority prio, const char *fmt, ...)

if (m_output == SWSS_SYSLOG)
{
vsyslog(prio, fmt, ap);
vsyslog(prio, fmt, ap);
}
else
{
std::stringstream ss;
ss << std::setw(6) << std::right << priorityToString(prio);
ss << fmt << std::endl;
std::lock_guard<std::mutex> lock(m_mutex);

MUTEX;

if (m_output == SWSS_STDOUT)
{
vprintf(ss.str().c_str(), ap);
Expand Down Expand Up @@ -283,7 +341,9 @@ void Logger::wthrow(Priority prio, const char *fmt, ...)
std::stringstream ss;
ss << std::setw(6) << std::right << priorityToString(prio);
ss << fmt << std::endl;
std::lock_guard<std::mutex> lock(m_mutex);

MUTEX;

if (m_output == SWSS_STDOUT)
{
vprintf(ss.str().c_str(), ap);
Expand Down Expand Up @@ -363,5 +423,3 @@ Logger::ScopeTimer::~ScopeTimer()

Logger::getInstance().write(swss::Logger::SWSS_NOTICE, ":- %s: %s took %lf sec", m_fun, m_msg.c_str(), duration);
}

};
13 changes: 11 additions & 2 deletions common/logger.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,14 @@ class Logger
static Logger &getInstance();
static void setMinPrio(Priority prio);
static Priority getMinPrio();
static void linkToDbWithOutput(const std::string &dbName, const PriorityChangeNotify& prioNotify, const std::string& defPrio, const OutputChangeNotify& outputNotify, const std::string& defOutput);

static void linkToDbWithOutput(
const std::string& dbName,
const PriorityChangeNotify& prioNotify,
const std::string& defPrio,
const OutputChangeNotify& outputNotify,
const std::string& defOutput);

static void linkToDb(const std::string &dbName, const PriorityChangeNotify& notify, const std::string& defPrio);
// Must be called after all linkToDb to start select from DB
static void linkToDbNative(const std::string &dbName, const char * defPrio="NOTICE");
Expand Down Expand Up @@ -137,6 +144,8 @@ class Logger
static void swssOutputNotify(const std::string &component, const std::string &outputStr);

void settingThread();
void terminateSettingThread();
void restartSettingThread();

LogSettingChangeObservers m_settingChangeObservers;
std::map<std::string, std::string> m_currentPrios;
Expand All @@ -145,7 +154,7 @@ class Logger
std::atomic<Output> m_output = { SWSS_SYSLOG };
std::unique_ptr<std::thread> m_settingThread;
std::mutex m_mutex;
volatile bool terminateSettingThread = false;
volatile bool m_terminateSettingThread = false;
};

}
Expand Down

0 comments on commit bf02298

Please sign in to comment.