From 40ac89f825cce72a3d8109e01b681889da61c570 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Barth=C3=A9l=C3=A9my=20von=20Haller?= Date: Tue, 13 Mar 2018 09:46:25 +0100 Subject: [PATCH] Information Service (#6) InformationService to broadcast list of tasks and their objects Add an information service that collects the list of objects published by all the tasks and make it available to clients. There are 2 binaries : qcInfoService and qcInfoServiceDump. The latter is to check what is being produced by the Information Service. The information service can use a file as input if there are no tasks running but one wants to simulate a normal use. QC-57 --- Framework/CMakeLists.txt | 16 ++ Framework/alfa.json | 32 ++- .../include/QualityControl/ObjectsManager.h | 10 +- Framework/include/QualityControl/TaskDevice.h | 3 + Framework/src/InformationService.cxx | 217 ++++++++++++++++++ Framework/src/InformationService.h | 98 ++++++++ Framework/src/InformationServiceDump.cxx | 64 ++++++ Framework/src/InformationServiceDump.h | 48 ++++ Framework/src/ObjectsManager.cxx | 4 +- Framework/src/TaskDevice.cxx | 30 +++ Framework/src/runInformationService.cxx | 35 +++ Framework/src/runInformationServiceDump.cxx | 32 +++ InformationService.json | 81 +++++++ Modules/Example/include/Example/ExampleTask.h | 3 +- Modules/Example/src/ExampleTask.cxx | 32 ++- README.md | 86 ++++++- infoServiceFake.json | 5 + 17 files changed, 776 insertions(+), 20 deletions(-) create mode 100644 Framework/src/InformationService.cxx create mode 100644 Framework/src/InformationService.h create mode 100644 Framework/src/InformationServiceDump.cxx create mode 100644 Framework/src/InformationServiceDump.h create mode 100644 Framework/src/runInformationService.cxx create mode 100644 Framework/src/runInformationServiceDump.cxx create mode 100644 InformationService.json create mode 100644 infoServiceFake.json diff --git a/Framework/CMakeLists.txt b/Framework/CMakeLists.txt index 8033936b9abab..3be9c2bddf928 100644 --- a/Framework/CMakeLists.txt +++ b/Framework/CMakeLists.txt @@ -74,6 +74,8 @@ set(SRCS src/SpyDevice.cxx src/SpyMainFrame.cxx src/CcdbDatabase.cxx + src/InformationService.cxx + src/InformationServiceDump.cxx ) set(HEADERS # needed for the dictionary generation @@ -134,6 +136,20 @@ O2_GENERATE_EXECUTABLE( BUCKET_NAME ${BUCKET_NAME} ) +O2_GENERATE_EXECUTABLE( + EXE_NAME qcInfoService + SOURCES src/runInformationService.cxx + MODULE_LIBRARY_NAME ${LIBRARY_NAME} + BUCKET_NAME ${BUCKET_NAME} +) + +O2_GENERATE_EXECUTABLE( + EXE_NAME qcInfoServiceDump + SOURCES src/runInformationServiceDump.cxx + MODULE_LIBRARY_NAME ${LIBRARY_NAME} + BUCKET_NAME ${BUCKET_NAME} +) + if (FAIRROOT_FOUND) O2_GENERATE_EXECUTABLE( EXE_NAME alfaTestReceiver diff --git a/Framework/alfa.json b/Framework/alfa.json index 6c6658a291a93..5af6373d30312 100644 --- a/Framework/alfa.json +++ b/Framework/alfa.json @@ -16,6 +16,19 @@ "rateLogging": 0 } ] + }, + { + "name": "information-service-out", + "sockets": [ + { + "type": "pub", + "method": "connect", + "address": "tcp://localhost:5560", + "sndBufSize": 10, + "rcvBufSize": 10, + "rateLogging": 0 + } + ] } ] }, @@ -28,9 +41,22 @@ { "type": "pub", "method": "bind", - "address": "tcp://*:5556", - "sndBufSize": 100, - "rcvBufSize": 100, + "address": "tcp://*:5557", + "sndBufSize": 1, + "rcvBufSize": 1, + "rateLogging": 0 + } + ] + }, + { + "name": "information-service-out", + "sockets": [ + { + "type": "pub", + "method": "connect", + "address": "tcp://localhost:5560", + "sndBufSize": 1, + "rcvBufSize": 1, "rateLogging": 0 } ] diff --git a/Framework/include/QualityControl/ObjectsManager.h b/Framework/include/QualityControl/ObjectsManager.h index cb5d91c41502d..5f0f098f26998 100644 --- a/Framework/include/QualityControl/ObjectsManager.h +++ b/Framework/include/QualityControl/ObjectsManager.h @@ -29,15 +29,10 @@ class ObjectsManager public: ObjectsManager(TaskConfig &taskConfig); - - /// Destructor virtual ~ObjectsManager(); - void startPublishing(TObject *obj, std::string objectName = ""); - + // todo stoppublishing void setQuality(std::string objectName, Quality quality); - // todo stop publishing - Quality getQuality(std::string objectName); /// \brief Add a check to the object defined by objectName. @@ -71,6 +66,9 @@ class ObjectsManager iterator end() { return mMonitorObjects.end(); } + std::string getObjectsListString() + { return mObjectsList.GetString().Data(); } + private: void UpdateIndex(const std::string &nonEmptyName) ; diff --git a/Framework/include/QualityControl/TaskDevice.h b/Framework/include/QualityControl/TaskDevice.h index 60dafa9cddb1a..86f310d526b6d 100644 --- a/Framework/include/QualityControl/TaskDevice.h +++ b/Framework/include/QualityControl/TaskDevice.h @@ -56,6 +56,7 @@ class TaskDevice : public FairMQDevice void monitorCycle(); unsigned long publish(); static void CustomCleanupTMessage(void *data, void *object); + void sendToInformationService(std::string objectsListString); private: std::string mTaskName; @@ -65,6 +66,8 @@ class TaskDevice : public FairMQDevice std::unique_ptr mSampler; o2::quality_control::core::TaskInterface *mTask; std::shared_ptr mObjectsManager; +// InformationServiceSender infoServiceSender; + std::string lastListSent; // stats int mTotalNumberObjectsPublished; diff --git a/Framework/src/InformationService.cxx b/Framework/src/InformationService.cxx new file mode 100644 index 0000000000000..a2232191ff700 --- /dev/null +++ b/Framework/src/InformationService.cxx @@ -0,0 +1,217 @@ +// Copyright CERN and copyright holders of ALICE O2. This software is +// distributed under the terms of the GNU General Public License v3 (GPL +// Version 3), copied verbatim in the file "COPYING". +// +// See http://alice-o2.web.cern.ch/license for full licensing information. +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. +// + +/// +/// \author bvonhall +/// \file InformationService.cxx +/// + +#include "InformationService.h" +#include "QualityControl/QcInfoLogger.h" +#include + +using namespace std; +typedef boost::tokenizer > t_tokenizer; +using namespace o2::quality_control::core; + +int timeOutIntervals = 5; // in seconds + +InformationService::InformationService() : th(nullptr), mFakeDataIndex(0) +{ + OnData("tasks_input", &InformationService::handleTaskInputData); + OnData("request_data", &InformationService::handleRequestData); +} + +void InformationService::Init() +{ + string fakeDataFile = fConfig->GetValue("fake-data-file"); + + // todo put this in a method + if (fakeDataFile != "") { + readFakeDataFile(fakeDataFile); + } +} + +InformationService::~InformationService() +{ +} + +void InformationService::checkTimedOut() +{ + string line = mFakeData[mFakeDataIndex % mFakeData.size()]; + handleTaskInputData(line); + mFakeDataIndex++; + + // restart timer + mTimer->expires_at(mTimer->expires_at() + boost::posix_time::seconds(timeOutIntervals)); + mTimer->async_wait(boost::bind(&InformationService::checkTimedOut, this)); +} + +bool InformationService::handleRequestData(FairMQMessagePtr &request, int /*index*/) +{ + string requestParam = string(static_cast(request->GetData()), request->GetSize()); + LOG(INFO) << "Received request from client: \"" << requestParam << "\""; + + string *result = nullptr; + if (requestParam == "all") { + result = new string(produceJsonAll()); + } else { + if (mCacheTasksData.count(requestParam) > 0) { + result = new string(produceJson(requestParam)); + } else { + result = new string("{\"error\": \"no such task\"}"); + } + } + + LOG(INFO) << "Sending reply to client."; + FairMQMessagePtr reply(NewMessage(const_cast(result->c_str()), // data + result->length(), // size + [](void * /*data*/, + void *object) { delete static_cast(object); }, // deletion callback + result)); // object that manages the data + if (Send(reply, "request_data") <= 0) { + LOG(ERROR) << "error sending reply"; + } + return true; // keep running +} + +bool InformationService::handleTaskInputData(FairMQMessagePtr &msg, int /*index*/) +{ + string *receivedData = new std::string(static_cast(msg->GetData()), msg->GetSize()); + LOG(INFO) << "Received data, processing..."; + LOG(INFO) << " " << *receivedData; + + handleTaskInputData(*receivedData); + + return true; // keep running +} + +bool InformationService::handleTaskInputData(std::string receivedData) +{ + std::string taskName = getTaskName(&receivedData); + LOG(DEBUG) << "task : " << taskName; + + // check if new data + boost::hash string_hash; + size_t hash = string_hash(receivedData); + if (mCacheTasksObjectsHash.count(taskName) > 0) { + if (mCacheTasksObjectsHash.count(taskName) > 0 && hash == mCacheTasksObjectsHash[taskName]) { + LOG(INFO) << "Data already known, we skip it"; + return true; + } + } + mCacheTasksObjectsHash[taskName] = hash; + + // parse + vector objects = getObjects(&receivedData); + + // store + mCacheTasksData[taskName] = objects; + + // json + string *json = new std::string(produceJson(taskName)); + + // publish + sendJson(json); +} + +void InformationService::readFakeDataFile(std::string fakeDataFile) +{ + std::string line; + std::ifstream myfile(fakeDataFile); + if (!myfile) //Always test the file open. + { + LOG(ERROR) << "Error opening fake data file"; + return; + } + mFakeData.clear(); + while (std::getline(myfile, line)) { + mFakeData.push_back(line); + } + + // start a timer to use the fake data + mTimer = new boost::asio::deadline_timer(io, boost::posix_time::seconds(timeOutIntervals)); + mTimer->async_wait(boost::bind(&InformationService::checkTimedOut, this)); + th = new thread([&] { io.run(); }); +} + +vector InformationService::getObjects(string *receivedData) +{ + vector objects; + std::string objectsString = receivedData->substr(receivedData->find(":") + 1, receivedData->length()); + LOG(DEBUG) << "objects : " << objectsString; + boost::char_separator sep(","); + t_tokenizer tok(objectsString, sep); + for (t_tokenizer::iterator beg = tok.begin(); beg != tok.end(); ++beg) { + objects.push_back(*beg); + } + return objects; +} + +std::string InformationService::getTaskName(std::string *receivedData) +{ + return receivedData->substr(0, receivedData->find(":")); +} + +pt::ptree InformationService::buildTaskNode(std::string taskName) +{ + pt::ptree task_node; + task_node.put("name", taskName); + pt::ptree objects_node; + for (auto &object : mCacheTasksData[taskName]) { + pt::ptree object_node; + object_node.put("id", object); + objects_node.push_back(std::make_pair("", object_node)); + } + task_node.add_child("objects", objects_node); + return task_node; +} + +std::string InformationService::produceJson(std::string taskName) +{ + pt::ptree taskNode = buildTaskNode(taskName); + + std::stringstream ss; + pt::json_parser::write_json(ss, taskNode); + LOG(DEBUG) << "json : " << endl << ss.str(); +// QcInfoLogger::GetInstance() << infologger::Debug << "json : \n" << *json << infologger::endm; + return ss.str(); +} + +std::string InformationService::produceJsonAll() +{ + string result; + pt::ptree main_node; + + pt::ptree tasksListNode; + for (const auto &taskTuple : mCacheTasksData) { + pt::ptree taskNode = buildTaskNode(taskTuple.first); + tasksListNode.push_back(std::make_pair("", taskNode)); + } + main_node.add_child("tasks", tasksListNode); + + std::stringstream ss; + pt::json_parser::write_json(ss, main_node); + LOG(DEBUG) << "json : " << endl << ss.str(); + return ss.str(); +} + +void InformationService::sendJson(std::string *json) +{ + FairMQMessagePtr msg2(NewMessage(const_cast(json->c_str()), + json->length(), + [](void * /*data*/, void *object) { delete static_cast(object); }, + json)); + int ret = Send(msg2, "updates_output"); + if (ret < 0) { + LOG(ERROR) << "Error sending update"; + } +} diff --git a/Framework/src/InformationService.h b/Framework/src/InformationService.h new file mode 100644 index 0000000000000..ef05d16adbc2e --- /dev/null +++ b/Framework/src/InformationService.h @@ -0,0 +1,98 @@ +// Copyright CERN and copyright holders of ALICE O2. This software is +// distributed under the terms of the GNU General Public License v3 (GPL +// Version 3), copied verbatim in the file "COPYING". +// +// See http://alice-o2.web.cern.ch/license for full licensing information. +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. +// + +/// +/// \author bvonhall +/// \file InformationService.h +/// + + +#ifndef PROJECT_INFORMATIONSERVICE_H +#define PROJECT_INFORMATIONSERVICE_H + +#include +#include "FairMQDevice.h" + +#include +#include +#include + +namespace pt = boost::property_tree; + +/// \brief Collect the list of objects published by all the tasks and make it available to clients. +/// +/// The InformationService receives the list of objects published by each task. +/// It keeps a list of all tasks and objects and send it upon request to clients. It also publishes updates when +/// new information comes from the tasks. +/// +/// See InformationService.json to know the port where updates are published. +/// See InformationService.json to know the port where to request information for all tasks (param "all") or +/// for a specific task (param ""). +/// See runInformationService.cxx for the steering code. +/// +/// Example usage : +/// qcInfoService -c /absolute/path/to/InformationService.json -n information_service \\ +/// --id information_service --mq-config /absolute/path/to/InformationService.json +/// +/// Format of the string coming from the tasks : +/// `task_id:obj0,obj1,obj2` +/// Format of the JSON output for one task or all tasks : +/// See README +/// +/// \todo Handle tasks dying and their removal from the cache and the publication of an update (heartbeat ?). +/// \todo Handle tasks sending information that they are disappearing. + +class InformationService : public FairMQDevice +{ + public: + InformationService(); + virtual ~InformationService(); + + protected: + /// Callback for data coming from qcTasks + bool handleTaskInputData(FairMQMessagePtr&, int); + /// Callback for the requests coming from clients + bool handleRequestData(FairMQMessagePtr&, int); + void Init(); + + private: + /// Extract the list of objects from the string received from the tasks + std::vector getObjects(std::string *receivedData); + /// Extract the task name from the string received from the tasks + std::string getTaskName(std::string *receivedData); + /// Produce the JSON string for the specified task + std::string produceJson(std::string taskName); + /// Produce the JSON string for all tasks and objects + std::string produceJsonAll(); + /// Send the JSON string to all clients (subscribers) + void sendJson(std::string *json); + pt::ptree buildTaskNode(std::string taskName); + void checkTimedOut(); + /// Compute and send the JSON using the inputString from a task + bool handleTaskInputData(std::string inputString); + /// Reads a file containing data in format as received from the tasks. + /// Store the items and use them at regular intervals to simulate tasks inputs. + /// Calling again this method will delete the former fake data cache. + void readFakeDataFile(std::string filePath); + + private: + std::map> mCacheTasksData; /// the list of objects names for each task + std::map mCacheTasksObjectsHash; /// used to check whether we already have received this list of objects + boost::asio::deadline_timer *mTimer; /// the asynchronous timer to check if agents have timed out + std::vector mFakeData; /// container for the fake data (if any). Each line is in a string and used in turn. + int mFakeDataIndex; + // variables for the timer + boost::asio::io_service io; + std::thread *th; + +}; + +#endif //PROJECT_INFORMATIONSERVICE_H diff --git a/Framework/src/InformationServiceDump.cxx b/Framework/src/InformationServiceDump.cxx new file mode 100644 index 0000000000000..ff79e8251872a --- /dev/null +++ b/Framework/src/InformationServiceDump.cxx @@ -0,0 +1,64 @@ +// Copyright CERN and copyright holders of ALICE O2. This software is +// distributed under the terms of the GNU General Public License v3 (GPL +// Version 3), copied verbatim in the file "COPYING". +// +// See http://alice-o2.web.cern.ch/license for full licensing information. +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. +// + +/// +/// \author bvonhall +/// \file InformationServiceDump.cxx +/// + +#include "InformationServiceDump.h" + +#include +#include +#include + +#include "FairMQLogger.h" +#include + +using namespace std; +namespace pt = boost::property_tree; + +InformationServiceDump::InformationServiceDump() +{ + OnData("info_service_input", &InformationServiceDump::HandleData); +} + +InformationServiceDump::~InformationServiceDump() +{ +} + +bool InformationServiceDump::HandleData(FairMQMessagePtr &msg, int /*index*/) +{ + string *receivedData = new std::string(static_cast(msg->GetData()), msg->GetSize()); + LOG(INFO) << "Received data : "; + LOG(INFO) << " " << *receivedData; + + string* text = new string( fConfig->GetValue("request-task")); + LOG(INFO) << "Preparing request for \"" << *text << "\""; + FairMQMessagePtr request(NewMessage(const_cast(text->c_str()), // data + text->length(), // size + [](void* /*data*/, void* object) { delete static_cast(object); }, // deletion callback + text)); // object that manages the data + LOG(INFO) << "Sending request "; + if (Send(request, "send_request") > 0) { + FairMQMessagePtr reply(NewMessage()); + if (Receive(reply, "send_request") >= 0) + { + LOG(INFO) << "Received reply from server: \"" << string(static_cast(reply->GetData()), reply->GetSize()) << "\""; + } else { + LOG(ERROR) << "Problem receiving reply"; + } + } else { + LOG(ERROR) << "problem sending request"; + } + + return true; // keep running +} diff --git a/Framework/src/InformationServiceDump.h b/Framework/src/InformationServiceDump.h new file mode 100644 index 0000000000000..b8d84ba1a0ac4 --- /dev/null +++ b/Framework/src/InformationServiceDump.h @@ -0,0 +1,48 @@ +// Copyright CERN and copyright holders of ALICE O2. This software is +// distributed under the terms of the GNU General Public License v3 (GPL +// Version 3), copied verbatim in the file "COPYING". +// +// See http://alice-o2.web.cern.ch/license for full licensing information. +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. +// + +/// +/// \author bvonhall +/// \file InformationServiceDump.h +/// + + +#ifndef PROJECT_INFORMATIONSERVICEDUMP_H +#define PROJECT_INFORMATIONSERVICEDUMP_H + +#include "FairMQDevice.h" +#include + +/// \brief Dump the publications received from the InformationService. +/// +/// Useful for checking the InformationService. +/// It will receive the updates from the tasks. Upon reception , it dumps it and send a request for all or a single +/// task data and displays the reply. +/// To decide which task the request should target, use parameter "request-task". By default it asks for all. +/// +/// See runInformationServiceDump.cxx for the steering code. +/// +/// Example usage : +/// qcInfoServiceDump -c /absolute/path/to/InformationService.json -n information_service_dump +/// --id information_service_dump --mq-config /absolute/path/to/InformationService.json +/// --request-task myTask1 +class InformationServiceDump : public FairMQDevice +{ + public: + InformationServiceDump(); + virtual ~InformationServiceDump(); + + protected: + /// Callback for data coming from InformationService + bool HandleData(FairMQMessagePtr &, int); +}; + +#endif //PROJECT_InformationServiceDump_H diff --git a/Framework/src/ObjectsManager.cxx b/Framework/src/ObjectsManager.cxx index 890dbdf689e3b..d6454402f242f 100644 --- a/Framework/src/ObjectsManager.cxx +++ b/Framework/src/ObjectsManager.cxx @@ -36,7 +36,9 @@ void ObjectsManager::startPublishing(TObject *object, std::string objectName) mMonitorObjects[nonEmptyName] = newObject; //update index - UpdateIndex(nonEmptyName); + if(objectName != MonitorObject::SYSTEM_OBJECT_PUBLICATION_LIST) { + UpdateIndex(nonEmptyName); + } } void ObjectsManager::UpdateIndex(const string &nonEmptyName) diff --git a/Framework/src/TaskDevice.cxx b/Framework/src/TaskDevice.cxx index b67e268bf3834..5c09e8bad6677 100644 --- a/Framework/src/TaskDevice.cxx +++ b/Framework/src/TaskDevice.cxx @@ -199,6 +199,8 @@ unsigned long TaskDevice::publish() sentMessages++; } + sendToInformationService(mObjectsManager->getObjectsListString()); + return sentMessages; } @@ -231,6 +233,34 @@ void TaskDevice::endOfActivity() mCollector->send(ba::mean(pmems), "QC_task_Mean_pmem_whole_run"); } +void TaskDevice::sendToInformationService(string objectsListString) +{ + string* text = new std::string(mTaskName); + *text += ":" + objectsListString; + // todo escape names with a comma or a colon + + // create message object with a pointer to the data buffer, + // its size, + // custom deletion function (called when transfer is done), + // and pointer to the object managing the data buffer + FairMQMessagePtr msg(NewMessage(const_cast(text->c_str()), + text->length(), + [](void* /*data*/, void* object) { delete static_cast(object); }, + text)); + + LOG(info) << "Sending \"" << *text << "\""; + LOG(info) << " llength : " << text->length(); + + // in case of error or transfer interruption, return false to go to IDLE state + // successfull transfer will return number of bytes transfered (can be 0 if sending an empty message). + int ret = Send(msg, "information-service-out"); + if(ret < 0) + { + LOG(error) << "Error sending" << endl; + } + +} + } } } diff --git a/Framework/src/runInformationService.cxx b/Framework/src/runInformationService.cxx new file mode 100644 index 0000000000000..00ace33bcda82 --- /dev/null +++ b/Framework/src/runInformationService.cxx @@ -0,0 +1,35 @@ +// Copyright CERN and copyright holders of ALICE O2. This software is +// distributed under the terms of the GNU General Public License v3 (GPL +// Version 3), copied verbatim in the file "COPYING". +// +// See http://alice-o2.web.cern.ch/license for full licensing information. +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. +// + +/// +/// \author bvonhall +/// \file runInformationService.cxx +/// + +#include "runFairMQDevice.h" +#include "InformationService.h" + +namespace bpo = boost::program_options; + +void addCustomOptions(bpo::options_description &options) +{ + options.add_options() + ("fake-data-file", bpo::value()->default_value(""), + "File containing JSON to use as input (useful for tests if no tasks is running). It is used to reply to requests. " + "It is reloaded every 10 seconds and if it changed it is published to the clients."); +} + +FairMQDevicePtr getDevice(const FairMQProgOptions & /*config*/) +{ + InformationService *is = new InformationService(); + + return is; +} \ No newline at end of file diff --git a/Framework/src/runInformationServiceDump.cxx b/Framework/src/runInformationServiceDump.cxx new file mode 100644 index 0000000000000..7beea76ce8935 --- /dev/null +++ b/Framework/src/runInformationServiceDump.cxx @@ -0,0 +1,32 @@ +// Copyright CERN and copyright holders of ALICE O2. This software is +// distributed under the terms of the GNU General Public License v3 (GPL +// Version 3), copied verbatim in the file "COPYING". +// +// See http://alice-o2.web.cern.ch/license for full licensing information. +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. +// + +/// +/// \author bvonhall +/// \file InformationServiceDump.cxx +/// + +#include "runFairMQDevice.h" +#include "InformationServiceDump.h" + +namespace bpo = boost::program_options; + +void addCustomOptions(bpo::options_description &options) +{ + options.add_options() + ("request-task", bpo::value()->default_value("all"), + "The name of the task it will request (default: all)"); +} + +FairMQDevicePtr getDevice(const FairMQProgOptions & /*config*/) +{ + return new InformationServiceDump(); +} \ No newline at end of file diff --git a/InformationService.json b/InformationService.json new file mode 100644 index 0000000000000..71b245c071c11 --- /dev/null +++ b/InformationService.json @@ -0,0 +1,81 @@ +{ + "fairMQOptions": { + "devices": [ + { + "key": "information_service", + "channels": [ + { + "name": "tasks_input", + "sockets": [ + { + "type": "sub", + "method": "bind", + "address": "tcp://*:5560", + "sndBufSize": 1, + "rcvBufSize": 100, + "rateLogging": 0 + } + ] + }, + { + "name": "updates_output", + "sockets": [ + { + "type": "pub", + "method": "bind", + "address": "tcp://*:5561", + "sndBufSize": 1000, + "rcvBufSize": 1000, + "rateLogging": 0 + } + ] + }, + { + "name": "request_data", + "sockets": [ + { + "type": "rep", + "method": "bind", + "address": "tcp://*:5562", + "sndBufSize": 1000, + "rcvBufSize": 1000, + "rateLogging": 0 + } + ] + } + ] + }, + { + "key": "information_service_dump", + "channels": [ + { + "name": "info_service_input", + "sockets": [ + { + "type": "sub", + "method": "connect", + "address": "tcp://localhost:5561", + "sndBufSize": 1000, + "rcvBufSize": 1000, + "rateLogging": 0 + } + ] + }, + { + "name": "send_request", + "sockets": [ + { + "type": "req", + "method": "connect", + "address": "tcp://localhost:5562", + "sndBufSize": 1000, + "rcvBufSize": 1000, + "rateLogging": 0 + } + ] + } + ] + } + ] + } +} \ No newline at end of file diff --git a/Modules/Example/include/Example/ExampleTask.h b/Modules/Example/include/Example/ExampleTask.h index 4c645219ebb9f..75fc3f5d3d0ce 100644 --- a/Modules/Example/include/Example/ExampleTask.h +++ b/Modules/Example/include/Example/ExampleTask.h @@ -47,8 +47,9 @@ class ExampleTask /*final*/: public TaskInterface // todo add back the "final" w } private: - + int mNumberCycles; TH1F *mHistos[25]; + void publishHisto(int i); }; } diff --git a/Modules/Example/src/ExampleTask.cxx b/Modules/Example/src/ExampleTask.cxx index 6120e54ba4145..e0855476b2700 100644 --- a/Modules/Example/src/ExampleTask.cxx +++ b/Modules/Example/src/ExampleTask.cxx @@ -21,6 +21,7 @@ ExampleTask::ExampleTask() for (auto &mHisto : mHistos) { mHisto = nullptr; } + mNumberCycles = 0; } ExampleTask::~ExampleTask() @@ -36,11 +37,8 @@ void ExampleTask::initialize() { QcInfoLogger::GetInstance() << "initialize ExampleTask" << AliceO2::InfoLogger::InfoLogger::endm; - for (int i = 0; i < 25; i++) { - stringstream name; - name << "array-" << i; - mHistos[i] = new TH1F(name.str().c_str(), name.str().c_str(), 100, 0, 99); - getObjectsManager()->startPublishing(mHistos[i], name.str()); + for (int i = 0; i < 24; i++) { + publishHisto(i); } // Extendable axis @@ -55,11 +53,21 @@ void ExampleTask::initialize() "QcExample"); } +void ExampleTask::publishHisto(int i) +{ + stringstream name; + name << "array-" << i; + mHistos[i] = new TH1F(name.str().c_str(), name.str().c_str(), 100, 0, 99); + getObjectsManager()->startPublishing(mHistos[i], name.str()); +} + void ExampleTask::startOfActivity(Activity &activity) { QcInfoLogger::GetInstance() << "startOfActivity" << AliceO2::InfoLogger::InfoLogger::endm; for (auto &mHisto : mHistos) { - mHisto->Reset(); + if (mHisto) { + mHisto->Reset(); + } } } @@ -71,14 +79,22 @@ void ExampleTask::startOfCycle() void ExampleTask::monitorDataBlock(DataSetReference dataSet) { mHistos[0]->Fill(dataSet->at(0)->getData()->header.dataSize); - for (int i = 1; i < 25; i++) { - mHistos[i]->FillRandom("gaus", 1); + for (auto &mHisto : mHistos) { + if (mHisto) { + mHisto->FillRandom("gaus", 1); + } } } void ExampleTask::endOfCycle() { QcInfoLogger::GetInstance() << "endOfCycle" << AliceO2::InfoLogger::InfoLogger::endm; + mNumberCycles++; + + // Add one more object just to show that we can do it + if(mNumberCycles == 3) { + publishHisto(24); + } } void ExampleTask::endOfActivity(Activity &activity) diff --git a/README.md b/README.md index 8cc2899c74c0b..e1d316721102b 100644 --- a/README.md +++ b/README.md @@ -168,6 +168,90 @@ One can also check what is stored in the database by clicking `Stop` and then switching to `Database` source. This will only work if a config file was passed to the `qcSpy` utility. +### Information Service + +The information service publishes information about the tasks currently +running and the objects they publish. It is needed by some GUIs, or +other clients. + +By default it will publish on port 5561 the json description of a task +when it is updated. A client can also request on port 5562 the information +about a specific task or about all the tasks, by passing the name of the +task as a parameter or "all" respectively. + +The JSON for a task looks like : +``` +{ + "name": "myTask_1", + "objects": [ + { + "id": "array-0" + }, + { + "id": "array-1" + }, + { + "id": "array-2" + }, + { + "id": "array-3" + }, + { + "id": "array-4" + } + ] +} +``` + +The JSON for all tasks looks like : +``` +{ + "tasks": [ + { + "name": "myTask_1", + "objects": [ + { + "id": "array-0" + }, + { + "id": "array-1" + } + ] + }, + { + "name": "myTask_2", + "objects": [ + { + "id": "array-0" + }, + { + "id": "array-1" + } + ] + } + ] +} +``` +#### Usage +``` +qcInfoService -c /absolute/path/to/InformationService.json -n information_service \ + --id information_service --mq-config /absolute/path/to/InformationService.json +``` + +The `qcInfoService` can provide fake data from a file. This is useful +to test the clients. Use the option `--fake-data-file` and provide the +absolute path to the file. The file `infoServiceFake.json` is provided +as an example. + +To check what is being output by the Information Service, one can +run the InformationServiceDump : +``` +qcInfoServiceDump -c /absolute/path/to/InformationService.json -n information_service_dump \ + --id information_service_dump --mq-config /absolute/path/to/InformationService.json + --request-task myTask1 +``` +The last parameter can be omitted to receive information about all tasks. + ## Modules development Steps to create a new module Abc @@ -184,4 +268,4 @@ From here, fill in the methods in AbcTask.cxx, and AbcCheck.cxx if needed. In case special additional dependencies are needed, create a new bucket in QualityControlModules/cmake/QualityControlModulesDependencies.cmake. - + diff --git a/infoServiceFake.json b/infoServiceFake.json new file mode 100644 index 0000000000000..1f64744a8b1cf --- /dev/null +++ b/infoServiceFake.json @@ -0,0 +1,5 @@ +myTask_1:array-0,array-1,array-2,array-3,array-4, +myTask_1:array-0,array-1,array-2,array-3,array-4,array-5, +myTask_1:array-0,array-1,array-2,array-3,array-4,array-5, +myTask_1:array-0,array-1,array-2,array-3,array-4, +myTask_2:array-0,array-1,array-2,array-3,array-4, \ No newline at end of file