diff --git a/lib/remote/apilistener-configsync.cpp b/lib/remote/apilistener-configsync.cpp index a12db0bca73..62d342f0fd3 100644 --- a/lib/remote/apilistener-configsync.cpp +++ b/lib/remote/apilistener-configsync.cpp @@ -7,6 +7,7 @@ #include "base/configtype.hpp" #include "base/json.hpp" #include "base/convert.hpp" +#include "base/defer.hpp" #include "config/vmops.hpp" #include @@ -104,6 +105,15 @@ Value ApiListener::ConfigUpdateObjectAPIHandler(const MessageOrigin::Ptr& origin return Empty; } + // Wait for the object name to become available for processing and block it immediately. + // Doing so guarantees that only one cluster event (create/update/delete) of a given + // object is being processed at any given time. + listener->m_ObjectConfigChangeLock.Lock(ptype, objName); + + Defer unlockAndNotify([&listener, &ptype, &objName]{ + listener->m_ObjectConfigChangeLock.Unlock(ptype, objName); + }); + ConfigObject::Ptr object = ctype->GetObject(objName); String config = params->Get("config"); @@ -258,6 +268,15 @@ Value ApiListener::ConfigDeleteObjectAPIHandler(const MessageOrigin::Ptr& origin return Empty; } + // Wait for the object name to become available for processing and block it immediately. + // Doing so guarantees that only one cluster event (create/update/delete) of a given + // object is being processed at any given time. + listener->m_ObjectConfigChangeLock.Lock(ptype, objName); + + Defer unlockAndNotify([&listener, &ptype, &objName]{ + listener->m_ObjectConfigChangeLock.Unlock(ptype, objName); + }); + ConfigObject::Ptr object = ctype->GetObject(objName); if (!object) { @@ -462,3 +481,37 @@ void ApiListener::SendRuntimeConfigObjects(const JsonRpcConnection::Ptr& aclient Log(LogInformation, "ApiListener") << "Finished syncing runtime objects to endpoint '" << endpoint->GetName() << "'."; } + +/** + * Locks the specified object name of the given type. If it is already locked, the call blocks until the lock is released. + * + * @param Type::Ptr ptype The type of the object you want to lock + * @param String objName The object name you want to lock + */ +void ObjectNameMutex::Lock(const Type::Ptr& ptype, const String& objName) +{ + std::unique_lock lock(m_Mutex); + m_CV.wait(lock, [this, &ptype, &objName]{ + auto& locked = m_LockedObjectNames[ptype.get()]; + return locked.find(objName) == locked.end(); + }); + + // Add object name to the locked list again to block all other threads that try + // to process a message affecting the same object. + m_LockedObjectNames[ptype.get()].emplace(objName); +} + +/** + * Unlocks the specified object name of the given type. + * + * @param Type::Ptr ptype The type of the object you want to unlock + * @param String objName The name of the object you want to unlock + */ +void ObjectNameMutex::Unlock(const Type::Ptr& ptype, const String& objName) +{ + { + std::unique_lock lock(m_Mutex); + m_LockedObjectNames[ptype.get()].erase(objName); + } + m_CV.notify_all(); +} diff --git a/lib/remote/apilistener.hpp b/lib/remote/apilistener.hpp index fced0a8afb1..a070652e6d0 100644 --- a/lib/remote/apilistener.hpp +++ b/lib/remote/apilistener.hpp @@ -71,6 +71,26 @@ enum class ApiCapabilities : uint_fast64_t IfwApiCheckCommand = 1u << 1u, }; +/** + * Allows you to easily lock/unlock a specific object of a given type by its name. + * + * That way, locking an object "this" of type Host does not affect an object "this" of + * type "Service" nor an object "other" of type "Host". + * + * @ingroup remote + */ +class ObjectNameMutex +{ +public: + void Lock(const Type::Ptr& ptype, const String& objName); + void Unlock(const Type::Ptr& ptype, const String& objName); + +private: + std::mutex m_Mutex; + std::condition_variable m_CV; + std::map> m_LockedObjectNames; +}; + /** * @ingroup remote */ @@ -257,6 +277,9 @@ class ApiListener final : public ObjectImpl mutable std::mutex m_ActivePackageStagesLock; std::map m_ActivePackageStages; + /* ensures that at most one create/update/delete is being processed per object at each time */ + mutable ObjectNameMutex m_ObjectConfigChangeLock; + void UpdateActivePackageStagesCache(); }; diff --git a/lib/remote/configobjectutility.cpp b/lib/remote/configobjectutility.cpp index 62c910b41f4..9502bde9595 100644 --- a/lib/remote/configobjectutility.cpp +++ b/lib/remote/configobjectutility.cpp @@ -5,6 +5,7 @@ #include "remote/apilistener.hpp" #include "config/configcompiler.hpp" #include "config/configitem.hpp" +#include "base/atomic-file.hpp" #include "base/configwriter.hpp" #include "base/exception.hpp" #include "base/dependencygraph.hpp" @@ -198,13 +199,21 @@ bool ConfigObjectUtility::CreateObject(const Type::Ptr& type, const String& full return false; } + // AtomicFile doesn't create not yet existing directories, so we have to do it by ourselves. Utility::MkDirP(Utility::DirName(path), 0700); - std::ofstream fp(path.CStr(), std::ofstream::out | std::ostream::trunc); + // Using AtomicFile guarantees that two different threads simultaneously creating and loading the same + // configuration file do not interfere with each other, as the configuration is stored in a unique temp file. + // When one thread fails to pass object validation, it only deletes its temporary file and does not affect + // the other thread in any way. + AtomicFile fp(path, 0644); fp << config; - fp.close(); + // Flush the output buffer to catch any errors ASAP and handle them accordingly! + // Note: AtomicFile places these configs in a temp file and will be automatically + // discarded when it is not committed before going out of scope. + fp.flush(); - std::unique_ptr expr = ConfigCompiler::CompileFile(path, String(), "_api"); + std::unique_ptr expr = ConfigCompiler::CompileText(path, config, String(), "_api"); try { ActivationScope ascope; @@ -225,9 +234,7 @@ bool ConfigObjectUtility::CreateObject(const Type::Ptr& type, const String& full if (!ConfigItem::CommitItems(ascope.GetContext(), upq, newItems, true)) { if (errors) { Log(LogNotice, "ConfigObjectUtility") - << "Failed to commit config item '" << fullName << "'. Aborting and removing config path '" << path << "'."; - - Utility::Remove(path); + << "Failed to commit config item '" << fullName << "'."; for (const boost::exception_ptr& ex : upq.GetExceptions()) { errors->Add(DiagnosticInformation(ex, false)); @@ -248,9 +255,7 @@ bool ConfigObjectUtility::CreateObject(const Type::Ptr& type, const String& full if (!ConfigItem::ActivateItems(newItems, true, false, false, cookie)) { if (errors) { Log(LogNotice, "ConfigObjectUtility") - << "Failed to activate config object '" << fullName << "'. Aborting and removing config path '" << path << "'."; - - Utility::Remove(path); + << "Failed to activate config object '" << fullName << "'."; for (const boost::exception_ptr& ex : upq.GetExceptions()) { errors->Add(DiagnosticInformation(ex, false)); @@ -275,6 +280,9 @@ bool ConfigObjectUtility::CreateObject(const Type::Ptr& type, const String& full ConfigObject::Ptr obj = ctype->GetObject(fullName); if (obj) { + // Object has surpassed the compiling/validation processes, we can safely commit the file! + fp.Commit(); + Log(LogInformation, "ConfigObjectUtility") << "Created and activated object '" << fullName << "' of type '" << type->GetName() << "'."; } else { @@ -283,8 +291,6 @@ bool ConfigObjectUtility::CreateObject(const Type::Ptr& type, const String& full } } catch (const std::exception& ex) { - Utility::Remove(path); - if (errors) errors->Add(DiagnosticInformation(ex, false));