Skip to content

Commit

Permalink
DPL integration (AliceO2Group#7)
Browse files Browse the repository at this point in the history
QC-47

* Added dependency on O2 (alidist will need to be updated as well - i will create PR shortly)
* Created TaskDataProcessor class, which executes user modules similarly to TaskDevice, but using DPL and O2 Data Model.
* Created TaskDataProcessorFactory, which generates DataProcessorSpec of QC task.
* Created TaskInterfaceDPL, similar to TaskInterface, to be inherited by user modules.
* Moved Activity class to separate file, because it is used by two different classes from now on.
* Moved implementation of TaskFactory.create to header and made it a template - now it can be used to get both TaskInterface or TaskInterfaceDPL children.
* Created TaskDPL - exemplary workflow using QC/DPL Task
* Created SkeletonDPL - exemplary user module
  • Loading branch information
knopers8 authored and Barthelemy committed Apr 26, 2018
1 parent 15c4ad4 commit b20df41
Show file tree
Hide file tree
Showing 27 changed files with 1,195 additions and 158 deletions.
14 changes: 13 additions & 1 deletion Framework/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ set(SRCS
src/MonitorObject.cxx
src/Quality.cxx
src/ObjectsManager.cxx
src/TaskFactory.cxx
src/Checker.cxx
src/CheckInterface.cxx
src/DatabaseFactory.cxx
Expand All @@ -76,6 +75,9 @@ set(SRCS
src/CcdbDatabase.cxx
src/InformationService.cxx
src/InformationServiceDump.cxx
src/TaskDataProcessor.cxx
src/TaskDataProcessorFactory.cxx
src/TaskInterfaceDPL.cxx
)

set(HEADERS # needed for the dictionary generation
Expand All @@ -85,6 +87,8 @@ set(HEADERS # needed for the dictionary generation
include/QualityControl/SpyMainFrame.h
include/QualityControl/DatabaseInterface.h
include/QualityControl/CcdbDatabase.h
include/QualityControl/TaskDataProcessor.h
include/QualityControl/TaskDataProcessorFactory.h
)

if(MYSQL_FOUND)
Expand Down Expand Up @@ -150,6 +154,14 @@ O2_GENERATE_EXECUTABLE(
BUCKET_NAME ${BUCKET_NAME}
)

O2_GENERATE_EXECUTABLE(
EXE_NAME taskDPL
SOURCES src/TaskDPL.cxx
MODULE_LIBRARY_NAME ${LIBRARY_NAME}
BUCKET_NAME ${BUCKET_NAME}
)
Install(FILES qcTaskDplConfig.ini DESTINATION etc/)

if (FAIRROOT_FOUND)
O2_GENERATE_EXECUTABLE(
EXE_NAME alfaTestReceiver
Expand Down
17 changes: 0 additions & 17 deletions Framework/apmon.cfg

This file was deleted.

30 changes: 1 addition & 29 deletions Framework/example-default.ini
Original file line number Diff line number Diff line change
Expand Up @@ -77,32 +77,4 @@ broadcastAddress=tcp://*:5600
id=0

[Checks]
checkMeanIsAbove/threshold=1

;===============================
; Monitoring
;-------------------------------

[ApMon]
enable=0
pathToConfig=apmon.cfg

[InfluxDB]
enableUDP=1
enableHTTP=0
hostname=aido2mon-gpn
port=8087
db=qc

[InfoLoggerBackend]
enable=1

[ProcessMonitor]
enable=1
interval=10

[DerivedMetrics]
maxCacheSize=1000

[Flume]
enable=0
checkMeanIsAbove/threshold=1
41 changes: 41 additions & 0 deletions Framework/include/QualityControl/Activity.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
///
/// \file Activity.h
/// \author Barthelemy von Haller
///

#ifndef QUALITYCONTROL_CORE_ACTIVITY_H
#define QUALITYCONTROL_CORE_ACTIVITY_H

namespace o2 {
namespace quality_control {
namespace core {

/// \brief Dummy class that should be removed when there is the official one.
/// This corresponds to a Run1/2 "run".
/// \author Barthelemy von Haller
class Activity
{
public:
Activity() = default;
Activity(int id, int type) : mId(id), mType(type)
{}
/// Copy constructor
Activity (const Activity& other) = default;
/// Move constructor
Activity (Activity&& other) noexcept = default;
/// Copy assignment operator
Activity& operator= (const Activity& other) = default;
/// Move assignment operator
Activity& operator= (Activity&& other) noexcept = default;

virtual ~Activity() = default;

int mId{0};
int mType{0};
};

} // namespace core
} // namespace QualityControl
} // namespace o2

#endif // QUALITYCONTROL_CORE_ACTIVITY_H
115 changes: 115 additions & 0 deletions Framework/include/QualityControl/TaskDataProcessor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
///
/// \file TaskDataProcessor.h
/// \author Piotr Konopka
///

#ifndef TASKDATAPROCESSOR_H
#define TASKDATAPROCESSOR_H

#include <thread>
#include <mutex>
// boost (should be first but then it makes errors in fairmq)
#include <boost/serialization/array_wrapper.hpp>
#include <boost/accumulators/accumulators.hpp>
#include <boost/accumulators/statistics.hpp>
#include <boost/asio.hpp>
// O2
#include "Common/Timer.h"
#include "Configuration/ConfigurationInterface.h"
#include "Framework/DataProcessorSpec.h"
#include "Monitoring/MonitoringFactory.h"
// QC
#include "QualityControl/TaskInterfaceDPL.h"
#include "QualityControl/TaskConfig.h"

namespace ba = boost::accumulators;

namespace o2 {
namespace quality_control {
namespace core {

using namespace o2::framework;
using namespace std::chrono;

/// \brief A class driving the execution of a QC task inside DPL.
///
/// TaskDataProcessor is a port of TaskDevice class, adapted for usage inside Data Processing Layer.
/// It is responsible for retrieving details about the task via the Configuration system and the Task (indirectly).
/// It then steers the execution of the task and provides it with O2 Data Model data, provided by framework.
/// It finally publishes the MonitorObjects owned and filled by the QC task and managed by the ObjectsManager.
/// Usage:
/// \code{.cxx}
/// auto qcTask = std::make_shared<TaskDataProcessor>(taskName, configurationSource);
/// DataProcessorSpec newTask{
/// taskName,
/// qcTask->getInputsSpecs(),
/// Outputs{ qcTask->getOutputSpec() },
/// AlgorithmSpec{
/// (AlgorithmSpec::InitCallback) [qcTask = std::move(qcTask)](InitContext& initContext) {
///
/// qcTask->initCallback(initContext);
///
/// return (AlgorithmSpec::ProcessCallback) [qcTask = std::move(qcTask)] (ProcessingContext &processingContext) {
/// qcTask->processCallback(processingContext);
/// };
/// }
/// }
/// };
/// \endcode
///
/// \author Piotr Konopka
/// \author Barthelemy von Haller
class TaskDataProcessor {
public:
TaskDataProcessor(std::string taskName, std::string configurationSource);
~TaskDataProcessor();

/// \brief To be invoked during initialization of Data Processor
void initCallback(InitContext& iCtx);
/// \brief To be invoked inside Data Processor's main ProcessCallback
void processCallback(ProcessingContext& pCtx);
/// \brief To be invoked inside Data Processor's TimerCallback
void timerCallback(ProcessingContext& pCtx);

const Inputs& getInputsSpecs() { return mInputSpecs; };
const OutputSpec getOutputSpec() { return mMonitorObjectsSpec; };

private:
void populateConfig(std::string taskName);
void startOfActivity();
void endOfActivity();
void finishCycle(DataAllocator& outputs);
unsigned long publish(DataAllocator& outputs);
static void CustomCleanupTMessage(void* data, void* object);

private:
std::string mTaskName;
TaskConfig mTaskConfig;
std::shared_ptr<o2::configuration::ConfigurationInterface> mConfigFile; // used in init only
std::shared_ptr<o2::monitoring::Monitoring> mCollector;
TaskInterfaceDPL* mTask;
std::shared_ptr<ObjectsManager> mObjectsManager;
std::recursive_mutex mTaskMutex; // \todo should be plain mutex, when timer callback is implemented in dpl

// consider moving these two to TaskConfig
Inputs mInputSpecs;
OutputSpec mMonitorObjectsSpec;

int mNumberBlocks;
int mLastNumberObjects;
bool mCycleOn;
int mCycleNumber;

// stats
AliceO2::Common::Timer mStatsTimer;
int mTotalNumberObjectsPublished;
AliceO2::Common::Timer mTimerTotalDurationActivity;
ba::accumulator_set<double, ba::features<ba::tag::mean, ba::tag::variance>> mPCpus;
ba::accumulator_set<double, ba::features<ba::tag::mean, ba::tag::variance>> mPMems;
};

}
}
}

#endif // TASKDATAPROCESSOR_H
29 changes: 29 additions & 0 deletions Framework/include/QualityControl/TaskDataProcessorFactory.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
///
/// \file TaskDataProcessorFactory.h
/// \author Piotr Konopka
///

#ifndef PROJECT_TASKDATAPROCESSORFACTORY_H
#define PROJECT_TASKDATAPROCESSORFACTORY_H

#include "Framework/DataProcessorSpec.h"

namespace o2 {
namespace quality_control {
namespace core {

/// \brief Factory in charge of creating DataProcessorSpec of QC task
class TaskDataProcessorFactory
{
public:
TaskDataProcessorFactory();
virtual ~TaskDataProcessorFactory();

o2::framework::DataProcessorSpec create(std::string taskName, std::string configurationSource);
};

} // namespace core
} // namespace QualityControl
} // namespace o2

#endif //PROJECT_TASKDATAPROCESSORFACTORY_H
59 changes: 50 additions & 9 deletions Framework/include/QualityControl/TaskFactory.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,15 @@

#include <iostream>
#include <memory>
// ROOT
#include <TSystem.h>
#include <TClass.h>
#include <TROOT.h>
// O2
#include <Common/Exceptions.h>
#include "QualityControl/TaskConfig.h"
#include "QualityControl/TaskDevice.h"
#include "QualityControl/QcInfoLogger.h"

namespace o2 {
namespace quality_control {
Expand All @@ -27,15 +32,51 @@ class ObjectsManager;
/// The class loaded in the library must inherit from TaskInterface.
class TaskFactory
{
public:
TaskFactory();
virtual ~TaskFactory();

/// \brief Create a new instance of a TaskInterface.
/// The TaskInterface actual class is decided based on the parameters passed.
/// \todo make it static ?
/// \author Barthelemy von Haller
TaskInterface *create(TaskConfig &taskConfig, std::shared_ptr<ObjectsManager> objectsManager);
public:
TaskFactory() {};
virtual ~TaskFactory() {};

using FatalException = AliceO2::Common::FatalException;
using errinfo_details = AliceO2::Common::errinfo_details;
/// \brief Create a new instance of a TaskInterface.
/// The TaskInterface actual class is decided based on the parameters passed.
/// \todo make it static ?
/// \author Barthelemy von Haller
template <class T>
T* create(TaskConfig& taskConfig, std::shared_ptr <ObjectsManager> objectsManager)
{
T* result = nullptr;
QcInfoLogger& logger = QcInfoLogger::GetInstance();

// Load the library
std::string library = "lib" + taskConfig.moduleName + ".so";
logger << "Loading library " << library << AliceO2::InfoLogger::InfoLogger::endm;
if (gSystem->Load(library.c_str())) {
BOOST_THROW_EXCEPTION(FatalException() << errinfo_details("Failed to load Detector Publisher Library"));
}

// Get the class and instantiate
logger << "Loading class " << taskConfig.className << AliceO2::InfoLogger::InfoLogger::endm;
TClass* cl = TClass::GetClass(taskConfig.className.c_str());
std::string tempString("Failed to instantiate Quality Control Module");
if (!cl) {
tempString += " because no dictionary for class named \"";
tempString += taskConfig.className;
tempString += "\" could be retrieved";
BOOST_THROW_EXCEPTION(FatalException() << errinfo_details(tempString));
}
logger << "Instantiating class " << taskConfig.className << " (" << cl << ")"
<< AliceO2::InfoLogger::InfoLogger::endm;
result = static_cast<T*>(cl->New());
if (!result) {
BOOST_THROW_EXCEPTION(FatalException() << errinfo_details(tempString));
}
result->setName(taskConfig.taskName);
result->setObjectsManager(objectsManager);
logger << "QualityControl Module " << taskConfig.moduleName << " loaded " << AliceO2::InfoLogger::InfoLogger::endm;

return result;
}
};

} // namespace core
Expand Down
25 changes: 1 addition & 24 deletions Framework/include/QualityControl/TaskInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,36 +9,13 @@
#include <memory>

#include "QualityControl/ObjectsManager.h"
#include "QualityControl/Activity.h"
#include <Common/DataSet.h>

namespace o2 {
namespace quality_control {
namespace core {

/// \brief Dummy class that should be removed when there is the official one.
/// This corresponds to a Run1/2 "run".
/// \author Barthelemy von Haller
class Activity
{
public:
Activity() = default;
Activity(int id, int type) : mId(id), mType(type)
{}
/// Copy constructor
Activity (const Activity& other) = default;
/// Move constructor
Activity (Activity&& other) noexcept = default;
/// Copy assignment operator
Activity& operator= (const Activity& other) = default;
/// Move assignment operator
Activity& operator= (Activity&& other) noexcept = default;

virtual ~Activity() = default;

int mId{0};
int mType{0};
};

/// \brief Skeleton of a QC task.
///
/// Purely abstract class defining the skeleton and the common interface of a QC task.
Expand Down
Loading

0 comments on commit b20df41

Please sign in to comment.