Skip to content

Commit

Permalink
Information Service (AliceO2Group#6)
Browse files Browse the repository at this point in the history
InformationService to broadcast list of tasks and their objects

Add an information service that collects the list of objects published by all the tasks and make it available to clients. There are 2 binaries : qcInfoService and qcInfoServiceDump. The latter is to check what is being produced by the Information Service.

The information service can use a file as input if there are no tasks running but one wants to simulate a normal use.
QC-57
  • Loading branch information
Barthelemy authored Mar 13, 2018
1 parent 1d1c54d commit 40ac89f
Show file tree
Hide file tree
Showing 17 changed files with 776 additions and 20 deletions.
16 changes: 16 additions & 0 deletions Framework/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ set(SRCS
src/SpyDevice.cxx
src/SpyMainFrame.cxx
src/CcdbDatabase.cxx
src/InformationService.cxx
src/InformationServiceDump.cxx
)

set(HEADERS # needed for the dictionary generation
Expand Down Expand Up @@ -134,6 +136,20 @@ O2_GENERATE_EXECUTABLE(
BUCKET_NAME ${BUCKET_NAME}
)

O2_GENERATE_EXECUTABLE(
EXE_NAME qcInfoService
SOURCES src/runInformationService.cxx
MODULE_LIBRARY_NAME ${LIBRARY_NAME}
BUCKET_NAME ${BUCKET_NAME}
)

O2_GENERATE_EXECUTABLE(
EXE_NAME qcInfoServiceDump
SOURCES src/runInformationServiceDump.cxx
MODULE_LIBRARY_NAME ${LIBRARY_NAME}
BUCKET_NAME ${BUCKET_NAME}
)

if (FAIRROOT_FOUND)
O2_GENERATE_EXECUTABLE(
EXE_NAME alfaTestReceiver
Expand Down
32 changes: 29 additions & 3 deletions Framework/alfa.json
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,19 @@
"rateLogging": 0
}
]
},
{
"name": "information-service-out",
"sockets": [
{
"type": "pub",
"method": "connect",
"address": "tcp://localhost:5560",
"sndBufSize": 10,
"rcvBufSize": 10,
"rateLogging": 0
}
]
}
]
},
Expand All @@ -28,9 +41,22 @@
{
"type": "pub",
"method": "bind",
"address": "tcp://*:5556",
"sndBufSize": 100,
"rcvBufSize": 100,
"address": "tcp://*:5557",
"sndBufSize": 1,
"rcvBufSize": 1,
"rateLogging": 0
}
]
},
{
"name": "information-service-out",
"sockets": [
{
"type": "pub",
"method": "connect",
"address": "tcp://localhost:5560",
"sndBufSize": 1,
"rcvBufSize": 1,
"rateLogging": 0
}
]
Expand Down
10 changes: 4 additions & 6 deletions Framework/include/QualityControl/ObjectsManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,10 @@ class ObjectsManager

public:
ObjectsManager(TaskConfig &taskConfig);

/// Destructor
virtual ~ObjectsManager();

void startPublishing(TObject *obj, std::string objectName = "");

// todo stoppublishing
void setQuality(std::string objectName, Quality quality);
// todo stop publishing

Quality getQuality(std::string objectName);

/// \brief Add a check to the object defined by objectName.
Expand Down Expand Up @@ -71,6 +66,9 @@ class ObjectsManager
iterator end()
{ return mMonitorObjects.end(); }

std::string getObjectsListString()
{ return mObjectsList.GetString().Data(); }

private:
void UpdateIndex(const std::string &nonEmptyName) ;

Expand Down
3 changes: 3 additions & 0 deletions Framework/include/QualityControl/TaskDevice.h
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ class TaskDevice : public FairMQDevice
void monitorCycle();
unsigned long publish();
static void CustomCleanupTMessage(void *data, void *object);
void sendToInformationService(std::string objectsListString);

private:
std::string mTaskName;
Expand All @@ -65,6 +66,8 @@ class TaskDevice : public FairMQDevice
std::unique_ptr<AliceO2::DataSampling::SamplerInterface> mSampler;
o2::quality_control::core::TaskInterface *mTask;
std::shared_ptr<ObjectsManager> mObjectsManager;
// InformationServiceSender infoServiceSender;
std::string lastListSent;

// stats
int mTotalNumberObjectsPublished;
Expand Down
217 changes: 217 additions & 0 deletions Framework/src/InformationService.cxx
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
// Copyright CERN and copyright holders of ALICE O2. This software is
// distributed under the terms of the GNU General Public License v3 (GPL
// Version 3), copied verbatim in the file "COPYING".
//
// See http://alice-o2.web.cern.ch/license for full licensing information.
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
//

///
/// \author bvonhall
/// \file InformationService.cxx
///

#include "InformationService.h"
#include "QualityControl/QcInfoLogger.h"
#include <options/FairMQProgOptions.h>

using namespace std;
typedef boost::tokenizer<boost::char_separator<char> > t_tokenizer;
using namespace o2::quality_control::core;

int timeOutIntervals = 5; // in seconds

InformationService::InformationService() : th(nullptr), mFakeDataIndex(0)
{
OnData("tasks_input", &InformationService::handleTaskInputData);
OnData("request_data", &InformationService::handleRequestData);
}

void InformationService::Init()
{
string fakeDataFile = fConfig->GetValue<string>("fake-data-file");

// todo put this in a method
if (fakeDataFile != "") {
readFakeDataFile(fakeDataFile);
}
}

InformationService::~InformationService()
{
}

void InformationService::checkTimedOut()
{
string line = mFakeData[mFakeDataIndex % mFakeData.size()];
handleTaskInputData(line);
mFakeDataIndex++;

// restart timer
mTimer->expires_at(mTimer->expires_at() + boost::posix_time::seconds(timeOutIntervals));
mTimer->async_wait(boost::bind(&InformationService::checkTimedOut, this));
}

bool InformationService::handleRequestData(FairMQMessagePtr &request, int /*index*/)
{
string requestParam = string(static_cast<char *>(request->GetData()), request->GetSize());
LOG(INFO) << "Received request from client: \"" << requestParam << "\"";

string *result = nullptr;
if (requestParam == "all") {
result = new string(produceJsonAll());
} else {
if (mCacheTasksData.count(requestParam) > 0) {
result = new string(produceJson(requestParam));
} else {
result = new string("{\"error\": \"no such task\"}");
}
}

LOG(INFO) << "Sending reply to client.";
FairMQMessagePtr reply(NewMessage(const_cast<char *>(result->c_str()), // data
result->length(), // size
[](void * /*data*/,
void *object) { delete static_cast<string *>(object); }, // deletion callback
result)); // object that manages the data
if (Send(reply, "request_data") <= 0) {
LOG(ERROR) << "error sending reply";
}
return true; // keep running
}

bool InformationService::handleTaskInputData(FairMQMessagePtr &msg, int /*index*/)
{
string *receivedData = new std::string(static_cast<char *>(msg->GetData()), msg->GetSize());
LOG(INFO) << "Received data, processing...";
LOG(INFO) << " " << *receivedData;

handleTaskInputData(*receivedData);

return true; // keep running
}

bool InformationService::handleTaskInputData(std::string receivedData)
{
std::string taskName = getTaskName(&receivedData);
LOG(DEBUG) << "task : " << taskName;

// check if new data
boost::hash<std::string> string_hash;
size_t hash = string_hash(receivedData);
if (mCacheTasksObjectsHash.count(taskName) > 0) {
if (mCacheTasksObjectsHash.count(taskName) > 0 && hash == mCacheTasksObjectsHash[taskName]) {
LOG(INFO) << "Data already known, we skip it";
return true;
}
}
mCacheTasksObjectsHash[taskName] = hash;

// parse
vector<string> objects = getObjects(&receivedData);

// store
mCacheTasksData[taskName] = objects;

// json
string *json = new std::string(produceJson(taskName));

// publish
sendJson(json);
}

void InformationService::readFakeDataFile(std::string fakeDataFile)
{
std::string line;
std::ifstream myfile(fakeDataFile);
if (!myfile) //Always test the file open.
{
LOG(ERROR) << "Error opening fake data file";
return;
}
mFakeData.clear();
while (std::getline(myfile, line)) {
mFakeData.push_back(line);
}

// start a timer to use the fake data
mTimer = new boost::asio::deadline_timer(io, boost::posix_time::seconds(timeOutIntervals));
mTimer->async_wait(boost::bind(&InformationService::checkTimedOut, this));
th = new thread([&] { io.run(); });
}

vector<string> InformationService::getObjects(string *receivedData)
{
vector<string> objects;
std::string objectsString = receivedData->substr(receivedData->find(":") + 1, receivedData->length());
LOG(DEBUG) << "objects : " << objectsString;
boost::char_separator<char> sep(",");
t_tokenizer tok(objectsString, sep);
for (t_tokenizer::iterator beg = tok.begin(); beg != tok.end(); ++beg) {
objects.push_back(*beg);
}
return objects;
}

std::string InformationService::getTaskName(std::string *receivedData)
{
return receivedData->substr(0, receivedData->find(":"));
}

pt::ptree InformationService::buildTaskNode(std::string taskName)
{
pt::ptree task_node;
task_node.put("name", taskName);
pt::ptree objects_node;
for (auto &object : mCacheTasksData[taskName]) {
pt::ptree object_node;
object_node.put("id", object);
objects_node.push_back(std::make_pair("", object_node));
}
task_node.add_child("objects", objects_node);
return task_node;
}

std::string InformationService::produceJson(std::string taskName)
{
pt::ptree taskNode = buildTaskNode(taskName);

std::stringstream ss;
pt::json_parser::write_json(ss, taskNode);
LOG(DEBUG) << "json : " << endl << ss.str();
// QcInfoLogger::GetInstance() << infologger::Debug << "json : \n" << *json << infologger::endm;
return ss.str();
}

std::string InformationService::produceJsonAll()
{
string result;
pt::ptree main_node;

pt::ptree tasksListNode;
for (const auto &taskTuple : mCacheTasksData) {
pt::ptree taskNode = buildTaskNode(taskTuple.first);
tasksListNode.push_back(std::make_pair("", taskNode));
}
main_node.add_child("tasks", tasksListNode);

std::stringstream ss;
pt::json_parser::write_json(ss, main_node);
LOG(DEBUG) << "json : " << endl << ss.str();
return ss.str();
}

void InformationService::sendJson(std::string *json)
{
FairMQMessagePtr msg2(NewMessage(const_cast<char *>(json->c_str()),
json->length(),
[](void * /*data*/, void *object) { delete static_cast<string *>(object); },
json));
int ret = Send(msg2, "updates_output");
if (ret < 0) {
LOG(ERROR) << "Error sending update";
}
}
Loading

0 comments on commit 40ac89f

Please sign in to comment.