From 77453989bc0e6a20bc27dbcdddfa75b8f1c2d448 Mon Sep 17 00:00:00 2001 From: Roberto Preghenella Date: Thu, 13 Feb 2020 11:54:24 +0530 Subject: [PATCH 1/4] Remove usage of embedded file-reader for compressor. Standardise the output. --- Detectors/TOF/compression/CMakeLists.txt | 14 +- .../TOFCompression/CompressedInspectorTask.h | 2 - .../TOFCompression/CompressedWriterTask.h | 48 ---- .../include/TOFCompression/Compressor.h | 22 +- .../include/TOFCompression/CompressorTask.h | 5 - .../include/TOFCompression/RawDataFrame.h | 64 ----- .../include/TOFCompression/RawReaderTask.h | 50 ---- .../src/CompressedInspectorTask.cxx | 169 ++++++------- .../compression/src/CompressedWriterTask.cxx | 84 ------- Detectors/TOF/compression/src/Compressor.cxx | 223 ++---------------- .../TOF/compression/src/CompressorTask.cxx | 75 +++--- .../TOF/compression/src/RawReaderTask.cxx | 100 -------- .../compression/src/TOFCompressionLinkDef.h | 19 -- .../src/tof-compressed-inspector.cxx | 48 ++++ .../TOF/compression/src/tof-compressor.cxx | 42 ++-- 15 files changed, 220 insertions(+), 745 deletions(-) delete mode 100644 Detectors/TOF/compression/include/TOFCompression/CompressedWriterTask.h delete mode 100644 Detectors/TOF/compression/include/TOFCompression/RawDataFrame.h delete mode 100644 Detectors/TOF/compression/include/TOFCompression/RawReaderTask.h delete mode 100644 Detectors/TOF/compression/src/CompressedWriterTask.cxx delete mode 100644 Detectors/TOF/compression/src/RawReaderTask.cxx delete mode 100644 Detectors/TOF/compression/src/TOFCompressionLinkDef.h create mode 100644 Detectors/TOF/compression/src/tof-compressed-inspector.cxx diff --git a/Detectors/TOF/compression/CMakeLists.txt b/Detectors/TOF/compression/CMakeLists.txt index 2b019648e5896..78debc8f8f445 100644 --- a/Detectors/TOF/compression/CMakeLists.txt +++ b/Detectors/TOF/compression/CMakeLists.txt @@ -10,20 +10,20 @@ o2_add_library(TOFCompression SOURCES src/Compressor.cxx - src/RawReaderTask.cxx src/CompressorTask.cxx - src/CompressedWriterTask.cxx - src/CompressedInspectorTask.cxx + src/CompressedInspectorTask.cxx PUBLIC_LINK_LIBRARIES O2::TOFBase O2::Framework O2::Headers O2::DataFormatsTOF O2::DetectorsRaw ) -o2_target_root_dictionary(TOFCompression - HEADERS include/TOFCompression/RawDataFrame.h - ) - o2_add_executable(compressor COMPONENT_NAME tof SOURCES src/tof-compressor.cxx PUBLIC_LINK_LIBRARIES O2::TOFCompression ) + +o2_add_executable(compressed-inspector + COMPONENT_NAME tof + SOURCES src/tof-compressed-inspector.cxx + PUBLIC_LINK_LIBRARIES O2::TOFCompression + ) diff --git a/Detectors/TOF/compression/include/TOFCompression/CompressedInspectorTask.h b/Detectors/TOF/compression/include/TOFCompression/CompressedInspectorTask.h index 49ca97b86e9a9..365a700cf30ba 100644 --- a/Detectors/TOF/compression/include/TOFCompression/CompressedInspectorTask.h +++ b/Detectors/TOF/compression/include/TOFCompression/CompressedInspectorTask.h @@ -39,8 +39,6 @@ class CompressedInspectorTask : public Task void init(InitContext& ic) final; void run(ProcessingContext& pc) final; - static DataProcessorSpec getSpec(); - private: bool mStatus = false; TFile* mFile = nullptr; diff --git a/Detectors/TOF/compression/include/TOFCompression/CompressedWriterTask.h b/Detectors/TOF/compression/include/TOFCompression/CompressedWriterTask.h deleted file mode 100644 index 6ba4c4c3e5b3f..0000000000000 --- a/Detectors/TOF/compression/include/TOFCompression/CompressedWriterTask.h +++ /dev/null @@ -1,48 +0,0 @@ -// Copyright CERN and copyright holders of ALICE O2. This software is -// distributed under the terms of the GNU General Public License v3 (GPL -// Version 3), copied verbatim in the file "COPYING". -// -// See http://alice-o2.web.cern.ch/license for full licensing information. -// -// In applying this license CERN does not waive the privileges and immunities -// granted to it by virtue of its status as an Intergovernmental Organization -// or submit itself to any jurisdiction. - -/// @file CompressedWriterTask.h -/// @author Roberto Preghenella -/// @since 2019-12-18 -/// @brief TOF compressed data writer task - -#ifndef O2_TOF_COMPRESSEDWRITERTASK -#define O2_TOF_COMPRESSEDWRITERTASK - -#include "Framework/Task.h" -#include "Framework/DataProcessorSpec.h" -#include - -using namespace o2::framework; - -namespace o2 -{ -namespace tof -{ - -class CompressedWriterTask : public Task -{ - public: - CompressedWriterTask() = default; - ~CompressedWriterTask() override = default; - void init(InitContext& ic) final; - void run(ProcessingContext& pc) final; - - static DataProcessorSpec getSpec(); - - private: - bool mStatus = false; - std::ofstream mFile; -}; - -} // namespace tof -} // namespace o2 - -#endif /* O2_TOF_COMPRESSEDWRITERTASK */ diff --git a/Detectors/TOF/compression/include/TOFCompression/Compressor.h b/Detectors/TOF/compression/include/TOFCompression/Compressor.h index 8bee30ea20d20..29311b1e94535 100644 --- a/Detectors/TOF/compression/include/TOFCompression/Compressor.h +++ b/Detectors/TOF/compression/include/TOFCompression/Compressor.h @@ -33,7 +33,7 @@ class Compressor public: Compressor() = default; - ~Compressor(); + ~Compressor() = default; inline bool run() { @@ -43,16 +43,11 @@ class Compressor return false; }; - bool init(); - bool open(const std::string inFileName, const std::string outFileName); - bool close(); - inline bool read() { return decoderRead(); }; inline void rewind() { decoderRewind(); encoderRewind(); }; - inline bool write() { return encoderWrite(); }; void checkSummary(); void resetCounters(); @@ -79,10 +74,6 @@ class Compressor /** decoder private functions and data members **/ - bool decoderInit(); - bool decoderOpen(const std::string name); - bool decoderRead(); - bool decoderClose(); bool decoderParanoid(); inline void decoderRewind() { mDecoderPointer = reinterpret_cast(mDecoderBuffer); }; inline void decoderNext() @@ -97,9 +88,7 @@ class Compressor std::ifstream mDecoderFile; char* mDecoderBuffer = nullptr; - bool mOwnDecoderBuffer = false; - long mDecoderBufferSize = 8192; - // long mDecoderBufferSize = 1048576; + long mDecoderBufferSize; uint32_t* mDecoderPointer = nullptr; uint32_t* mDecoderPointerMax = nullptr; uint32_t* mDecoderPointerNext = nullptr; @@ -114,18 +103,13 @@ class Compressor /** encoder private functions and data members **/ - bool encoderInit(); - bool encoderOpen(const std::string name); - bool encoderWrite(); - bool encoderClose(); void encoderSpider(int itrm); inline void encoderRewind() { mEncoderPointer = reinterpret_cast(mEncoderBuffer); }; inline void encoderNext() { mEncoderPointer++; }; std::ofstream mEncoderFile; char* mEncoderBuffer = nullptr; - bool mOwnEncoderBuffer = false; - long mEncoderBufferSize = 1048576; + long mEncoderBufferSize; uint32_t* mEncoderPointer = nullptr; uint32_t* mEncoderPointerMax = nullptr; uint32_t* mEncoderPointerStart = nullptr; diff --git a/Detectors/TOF/compression/include/TOFCompression/CompressorTask.h b/Detectors/TOF/compression/include/TOFCompression/CompressorTask.h index 950675411ccc8..f26fff902ba29 100644 --- a/Detectors/TOF/compression/include/TOFCompression/CompressorTask.h +++ b/Detectors/TOF/compression/include/TOFCompression/CompressorTask.h @@ -19,7 +19,6 @@ #include "Framework/Task.h" #include "Framework/DataProcessorSpec.h" #include "TOFCompression/Compressor.h" -#include "TOFCompression/RawDataFrame.h" #include using namespace o2::framework; @@ -37,12 +36,8 @@ class CompressorTask : public Task void init(InitContext& ic) final; void run(ProcessingContext& pc) final; - static DataProcessorSpec getSpec(); - private: Compressor mCompressor; - int mTicks = 0; - RawDataFrame mDataFrame; }; } // namespace tof diff --git a/Detectors/TOF/compression/include/TOFCompression/RawDataFrame.h b/Detectors/TOF/compression/include/TOFCompression/RawDataFrame.h deleted file mode 100644 index 8156345e57947..0000000000000 --- a/Detectors/TOF/compression/include/TOFCompression/RawDataFrame.h +++ /dev/null @@ -1,64 +0,0 @@ -// Copyright CERN and copyright holders of ALICE O2. This software is -// distributed under the terms of the GNU General Public License v3 (GPL -// Version 3), copied verbatim in the file "COPYING". -// -// See http://alice-o2.web.cern.ch/license for full licensing information. -// -// In applying this license CERN does not waive the privileges and immunities -// granted to it by virtue of its status as an Intergovernmental Organization -// or submit itself to any jurisdiction. - -/// @file RawDataFrame.h -/// @author Roberto Preghenella -/// @since 2019-12-18 -/// @brief TOF raw data frame - -#ifndef O2_TOF_RAWDATAFRAME -#define O2_TOF_RAWDATAFRAME - -namespace o2 -{ -namespace tof -{ - -class RawDataFrame -{ - - public: - RawDataFrame(int size = 1048576) - : mSize(size), mBuffer(new char[size]){}; - RawDataFrame(const RawDataFrame& other) - : mSize(other.mSize), mBuffer(new char[other.mSize]) - { - for (int i = 0; i < mSize; ++i) - mBuffer[i] = other.mBuffer[i]; - }; - RawDataFrame& operator=(const RawDataFrame& other) - { - if (&other == this) - return *this; - if (mSize != other.mSize) { - delete[] mBuffer; - mSize = other.mSize; - mBuffer = new char[mSize]; - } - for (int i = 0; i < mSize; ++i) - mBuffer[i] = other.mBuffer[i]; - return *this; - }; - ~RawDataFrame() { delete[] mBuffer; }; - int getSize() const { return mSize; }; - char* getBuffer() const { return mBuffer; }; - - // private: - - int mSize; - char* mBuffer; // [mSize] - - ClassDef(RawDataFrame, 1); -}; - -} // namespace tof -} // namespace o2 - -#endif /* O2_TOF_RAWDATAFRAME */ diff --git a/Detectors/TOF/compression/include/TOFCompression/RawReaderTask.h b/Detectors/TOF/compression/include/TOFCompression/RawReaderTask.h deleted file mode 100644 index bea7bbf6c57aa..0000000000000 --- a/Detectors/TOF/compression/include/TOFCompression/RawReaderTask.h +++ /dev/null @@ -1,50 +0,0 @@ -// Copyright CERN and copyright holders of ALICE O2. This software is -// distributed under the terms of the GNU General Public License v3 (GPL -// Version 3), copied verbatim in the file "COPYING". -// -// See http://alice-o2.web.cern.ch/license for full licensing information. -// -// In applying this license CERN does not waive the privileges and immunities -// granted to it by virtue of its status as an Intergovernmental Organization -// or submit itself to any jurisdiction. - -/// @file RawReaderTask.h -/// @author Roberto Preghenella -/// @since 2019-12-18 -/// @brief TOF raw reader task - -#ifndef O2_TOF_RAWREADERTASK -#define O2_TOF_RAWREADERTASK - -#include "Framework/Task.h" -#include "Framework/DataProcessorSpec.h" -#include "TOFCompression/RawDataFrame.h" -#include - -using namespace o2::framework; - -namespace o2 -{ -namespace tof -{ - -class RawReaderTask : public Task -{ - public: - RawReaderTask() = default; - ~RawReaderTask() override = default; - void init(InitContext& ic) final; - void run(ProcessingContext& pc) final; - - static DataProcessorSpec getSpec(); - - private: - bool mStatus = false; - std::ifstream mFile; - std::string mBuffer; -}; - -} // namespace tof -} // namespace o2 - -#endif /* O2_TOF_RAWREADERTASK */ diff --git a/Detectors/TOF/compression/src/CompressedInspectorTask.cxx b/Detectors/TOF/compression/src/CompressedInspectorTask.cxx index 5fa4d1c97cad9..cbd4e47884198 100644 --- a/Detectors/TOF/compression/src/CompressedInspectorTask.cxx +++ b/Detectors/TOF/compression/src/CompressedInspectorTask.cxx @@ -14,7 +14,6 @@ /// @brief TOF compressed data inspector task #include "TOFCompression/CompressedInspectorTask.h" -#include "TOFCompression/RawDataFrame.h" #include "Framework/ControlService.h" #include "Framework/ConfigParamRegistry.h" @@ -26,8 +25,6 @@ #include "TH1F.h" #include "TH2F.h" -//#define VERBOSE - using namespace o2::framework; namespace o2 @@ -69,7 +66,6 @@ void CompressedInspectorTask::init(InitContext& ic) mFile->Close(); }; ic.services().get().set(CallbackService::Id::Stop, finishFunction); - // ic.services().get().set(CallbackService::Id::EndOfStream, finishFunction); } void CompressedInspectorTask::run(ProcessingContext& pc) @@ -83,128 +79,117 @@ void CompressedInspectorTask::run(ProcessingContext& pc) } /** receive input **/ - auto dataFrame = pc.inputs().get("dataframe"); - auto pointer = dataFrame->mBuffer; + for (auto& input : pc.inputs()) { + + /** input **/ + const auto* headerIn = DataRefUtils::getHeader(input); + auto payloadIn = const_cast(input.payload); + auto payloadInSize = headerIn->payloadSize; - /** process input **/ - while (pointer < (dataFrame->mBuffer + dataFrame->mSize)) { - auto rdh = reinterpret_cast(pointer); + /** process input **/ + auto pointer = payloadIn; + while (pointer < (payloadIn + payloadInSize)) { + auto rdh = reinterpret_cast(pointer); - /** RDH close detected **/ - if (rdh->stop) { + /** RDH close detected **/ + if (rdh->stop) { #ifdef VERBOSE - std::cout << "--- RDH close detected" << std::endl; - o2::raw::HBFUtils::printRDH(*rdh); + std::cout << "--- RDH close detected" << std::endl; + o2::raw::HBFUtils::printRDH(*rdh); #endif - pointer += rdh->offsetToNext; - continue; - } + pointer += rdh->offsetToNext; + continue; + } #ifdef VERBOSE - std::cout << "--- RDH open detected" << std::endl; - o2::raw::HBFUtils::printRDH(*rdh); + std::cout << "--- RDH open detected" << std::endl; + o2::raw::HBFUtils::printRDH(*rdh); #endif - pointer += rdh->headerSize; + pointer += rdh->headerSize; - while (pointer < (reinterpret_cast(rdh) + rdh->memorySize)) { + while (pointer < (reinterpret_cast(rdh) + rdh->memorySize)) { - auto word = reinterpret_cast(pointer); - if ((*word & 0x80000000) != 0x80000000) { - printf(" %08x [ERROR] \n ", *(uint32_t*)pointer); - return; - } + auto word = reinterpret_cast(pointer); + if ((*word & 0x80000000) != 0x80000000) { + printf(" %08x [ERROR] \n ", *(uint32_t*)pointer); + return; + } - /** crate header detected **/ - auto crateHeader = reinterpret_cast(pointer); + /** crate header detected **/ + auto crateHeader = reinterpret_cast(pointer); #ifdef VERBOSE - printf(" %08x CrateHeader (drmID=%d) \n ", *(uint32_t*)pointer, crateHeader->drmID); + printf(" %08x CrateHeader (drmID=%d) \n ", *(uint32_t*)pointer, crateHeader->drmID); #endif - for (int ibit = 0; ibit < 11; ++ibit) - if (crateHeader->slotEnableMask & (1 << ibit)) - mHistos2D["slotEnableMask"]->Fill(crateHeader->drmID, ibit + 2); - pointer += 4; + for (int ibit = 0; ibit < 11; ++ibit) + if (crateHeader->slotEnableMask & (1 << ibit)) + mHistos2D["slotEnableMask"]->Fill(crateHeader->drmID, ibit + 2); + pointer += 4; - /** crate orbit expected **/ - auto crateOrbit = reinterpret_cast(pointer); + /** crate orbit expected **/ + auto crateOrbit = reinterpret_cast(pointer); #ifdef VERBOSE - printf(" %08x CrateOrbit (orbit=0x%08x) \n ", *(uint32_t*)pointer, crateOrbit->orbitID); + printf(" %08x CrateOrbit (orbit=0x%08x) \n ", *(uint32_t*)pointer, crateOrbit->orbitID); #endif - pointer += 4; + pointer += 4; - while (true) { - word = reinterpret_cast(pointer); + while (true) { + word = reinterpret_cast(pointer); - /** crate trailer detected **/ - if (*word & 0x80000000) { - auto crateTrailer = reinterpret_cast(pointer); + /** crate trailer detected **/ + if (*word & 0x80000000) { + auto crateTrailer = reinterpret_cast(pointer); #ifdef VERBOSE - printf(" %08x CrateTrailer (numberOfDiagnostics=%d) \n ", *(uint32_t*)pointer, crateTrailer->numberOfDiagnostics); + printf(" %08x CrateTrailer (numberOfDiagnostics=%d) \n ", *(uint32_t*)pointer, crateTrailer->numberOfDiagnostics); #endif - pointer += 4; + pointer += 4; - /** loop over diagnostics **/ - for (int i = 0; i < crateTrailer->numberOfDiagnostics; ++i) { - auto diagnostic = reinterpret_cast(pointer); + /** loop over diagnostics **/ + for (int i = 0; i < crateTrailer->numberOfDiagnostics; ++i) { + auto diagnostic = reinterpret_cast(pointer); #ifdef VERBOSE - printf(" %08x Diagnostic (slotId=%d) \n ", *(uint32_t*)pointer, diagnostic->slotID); + printf(" %08x Diagnostic (slotId=%d) \n ", *(uint32_t*)pointer, diagnostic->slotID); #endif - mHistos2D["diagnostic"]->Fill(crateHeader->drmID, diagnostic->slotID); - pointer += 4; - } + mHistos2D["diagnostic"]->Fill(crateHeader->drmID, diagnostic->slotID); + pointer += 4; + } - break; - } + break; + } - /** frame header detected **/ - auto frameHeader = reinterpret_cast(pointer); + /** frame header detected **/ + auto frameHeader = reinterpret_cast(pointer); #ifdef VERBOSE - printf(" %08x FrameHeader (numberOfHits=%d) \n ", *(uint32_t*)pointer, frameHeader->numberOfHits); + printf(" %08x FrameHeader (numberOfHits=%d) \n ", *(uint32_t*)pointer, frameHeader->numberOfHits); #endif - mHistos1D["hHisto"]->Fill(frameHeader->numberOfHits); - pointer += 4; + mHistos1D["hHisto"]->Fill(frameHeader->numberOfHits); + pointer += 4; - /** loop over hits **/ - for (int i = 0; i < frameHeader->numberOfHits; ++i) { - auto packedHit = reinterpret_cast(pointer); + /** loop over hits **/ + for (int i = 0; i < frameHeader->numberOfHits; ++i) { + auto packedHit = reinterpret_cast(pointer); #ifdef VERBOSE - printf(" %08x PackedHit (tdcID=%d) \n ", *(uint32_t*)pointer, packedHit->tdcID); + printf(" %08x PackedHit (tdcID=%d) \n ", *(uint32_t*)pointer, packedHit->tdcID); #endif - auto indexE = packedHit->channel + - 8 * packedHit->tdcID + - 120 * packedHit->chain + - 240 * (frameHeader->trmID - 3) + - 2400 * crateHeader->drmID; - int time = packedHit->time; - time += (frameHeader->frameID << 13); - - mHistos1D["indexE"]->Fill(indexE); - mHistos1D["time"]->Fill(time); - mHistos1D["tot"]->Fill(packedHit->tot); - pointer += 4; + auto indexE = packedHit->channel + + 8 * packedHit->tdcID + + 120 * packedHit->chain + + 240 * (frameHeader->trmID - 3) + + 2400 * crateHeader->drmID; + int time = packedHit->time; + time += (frameHeader->frameID << 13); + + mHistos1D["indexE"]->Fill(indexE); + mHistos1D["time"]->Fill(time); + mHistos1D["tot"]->Fill(packedHit->tot); + pointer += 4; + } } } - } - pointer = reinterpret_cast(rdh) + rdh->offsetToNext; + pointer = reinterpret_cast(rdh) + rdh->offsetToNext; + } } - - /** write to file **/ - // mFile.write(dataFrame->mBuffer, dataFrame->mSize); -} - -DataProcessorSpec CompressedInspectorTask::getSpec() -{ - std::vector inputs; - std::vector outputs; - - return DataProcessorSpec{ - "tof-compressed-inspector", - Inputs{InputSpec("dataframe", o2::header::gDataOriginTOF, "CMPDATAFRAME", 0, Lifetime::Timeframe)}, - Outputs{}, - AlgorithmSpec{adaptFromTask()}, - Options{ - {"tof-inspector-filename", VariantType::String, "inspector.root", {"Name of the inspector output file"}}}}; } } // namespace tof diff --git a/Detectors/TOF/compression/src/CompressedWriterTask.cxx b/Detectors/TOF/compression/src/CompressedWriterTask.cxx deleted file mode 100644 index 4f06bfb88c13e..0000000000000 --- a/Detectors/TOF/compression/src/CompressedWriterTask.cxx +++ /dev/null @@ -1,84 +0,0 @@ -// Copyright CERN and copyright holders of ALICE O2. This software is -// distributed under the terms of the GNU General Public License v3 (GPL -// Version 3), copied verbatim in the file "COPYING". -// -// See http://alice-o2.web.cern.ch/license for full licensing information. -// -// In applying this license CERN does not waive the privileges and immunities -// granted to it by virtue of its status as an Intergovernmental Organization -// or submit itself to any jurisdiction. - -/// @file CompressedWriterTask.cxx -/// @author Roberto Preghenella -/// @since 2019-12-18 -/// @brief TOF compressed data writer task - -#include "TOFCompression/CompressedWriterTask.h" -#include "TOFCompression/RawDataFrame.h" -#include "Framework/ControlService.h" -#include "Framework/ConfigParamRegistry.h" - -using namespace o2::framework; - -namespace o2 -{ -namespace tof -{ - -void CompressedWriterTask::init(InitContext& ic) -{ - LOG(INFO) << "CompressedWriter init"; - auto filename = ic.options().get("tof-compressed-filename"); - - /** open file **/ - if (mFile.is_open()) { - LOG(WARNING) << "a file was already open, closing"; - mFile.close(); - } - mFile.open(filename.c_str(), std::fstream::out | std::fstream::binary); - if (!mFile.is_open()) { - LOG(ERROR) << "cannot open output file: " << filename; - mStatus = true; - return; - } - - auto finishFunction = [this]() { - LOG(INFO) << "CompressedWriter finish"; - mFile.close(); - }; - ic.services().get().set(CallbackService::Id::Stop, finishFunction); -} - -void CompressedWriterTask::run(ProcessingContext& pc) -{ - LOG(DEBUG) << "CompressedWriter run"; - - /** check status **/ - if (mStatus) { - pc.services().get().readyToQuit(QuitRequest::Me); - return; - } - - /** receive input **/ - auto dataFrame = pc.inputs().get("dataframe"); - - /** write to file **/ - mFile.write(dataFrame->mBuffer, dataFrame->mSize); -} - -DataProcessorSpec CompressedWriterTask::getSpec() -{ - std::vector inputs; - std::vector outputs; - - return DataProcessorSpec{ - "tof-compressed-writer", - Inputs{InputSpec("dataframe", o2::header::gDataOriginTOF, "CMPDATAFRAME", 0, Lifetime::Timeframe)}, // inputs - Outputs{}, // outputs - AlgorithmSpec{adaptFromTask()}, // call constructor + execute init (check) - Options{ - {"tof-compressed-filename", VariantType::String, "/dev/null", {"Name of the compressed output file"}}}}; -} - -} // namespace tof -} // namespace o2 diff --git a/Detectors/TOF/compression/src/Compressor.cxx b/Detectors/TOF/compression/src/Compressor.cxx index 3aa30ebed9b1f..7494d1e79ce11 100644 --- a/Detectors/TOF/compression/src/Compressor.cxx +++ b/Detectors/TOF/compression/src/Compressor.cxx @@ -21,11 +21,14 @@ #include #define DECODER_PARANOID -//#define DECODER_VERBOSE -//#define ENCODER_VERBOSE -//#define CHECKER_VERBOSE -//#define CHECKER_COUNTER +#define DECODER_VERBOSE +#define ENCODER_VERBOSE +#define CHECKER_VERBOSE +#define CHECKER_COUNTER +#ifdef DECODER_PARANOID +#warning "Building code with DecoderParanoid option. This may limit the speed." +#endif #ifdef DECODER_VERBOSE #warning "Building code with DecoderVerbose option. This may limit the speed." #endif @@ -96,208 +99,6 @@ namespace o2 namespace tof { -Compressor::~Compressor() -{ - if (mDecoderBuffer && mOwnDecoderBuffer) - delete[] mDecoderBuffer; - if (mEncoderBuffer && mOwnDecoderBuffer) - delete[] mEncoderBuffer; -} - -bool Compressor::init() -{ - if (decoderInit()) - return true; - if (encoderInit()) - return true; - return false; -} - -bool Compressor::open(const std::string inFileName, const std::string outFileName) -{ - if (decoderOpen(inFileName)) - return true; - if (encoderOpen(outFileName)) - return true; - return false; -} - -bool Compressor::close() -{ - if (decoderClose()) - return true; - if (encoderClose()) - return true; - return false; -} - -bool Compressor::decoderInit() -{ -#ifdef DECODER_VERBOSE - if (mDecoderVerbose) { - std::cout << colorBlue - << "--- INITIALISE DECODER BUFFER: " << mDecoderBufferSize << " bytes" - << colorReset - << std::endl; - } -#endif - if (mDecoderBuffer && mOwnDecoderBuffer) { - std::cout << colorYellow - << "--- a buffer was already allocated, cleaning" - << colorReset - << std::endl; - delete[] mDecoderBuffer; - } - mDecoderBuffer = new char[mDecoderBufferSize]; - mOwnDecoderBuffer = true; - return false; -} - -bool Compressor::encoderInit() -{ -#ifdef ENCODER_VERBOSE - if (mEncoderVerbose) { - std::cout << colorBlue - << "--- INITIALISE ENCODER BUFFER: " << mEncoderBufferSize << " bytes" - << colorReset - << std::endl; - } -#endif - if (mEncoderBuffer && mOwnEncoderBuffer) { - std::cout << colorYellow - << "-W- a buffer was already allocated, cleaning" - << colorReset - << std::endl; - delete[] mEncoderBuffer; - } - mEncoderBuffer = new char[mEncoderBufferSize]; - mOwnEncoderBuffer = true; - encoderRewind(); - return false; -} - -bool Compressor::decoderOpen(const std::string name) -{ - if (mDecoderFile.is_open()) { - std::cout << colorYellow - << "-W- a file was already open, closing" - << colorReset - << std::endl; - mDecoderFile.close(); - } - mDecoderFile.open(name.c_str(), std::fstream::in | std::fstream::binary); - if (!mDecoderFile.is_open()) { - std::cerr << colorRed - << "-E- Cannot open input file: " << name - << colorReset - << std::endl; - return true; - } - return false; -} - -bool Compressor::encoderOpen(const std::string name) -{ - if (mEncoderFile.is_open()) { - std::cout << colorYellow - << "-W- a file was already open, closing" - << colorReset - << std::endl; - mEncoderFile.close(); - } - mEncoderFile.open(name.c_str(), std::fstream::out | std::fstream::binary); - if (!mEncoderFile.is_open()) { - std::cerr << colorRed << "-E- Cannot open output file: " << name - << colorReset - << std::endl; - return true; - } - return false; -} - -bool Compressor::decoderClose() -{ - if (mDecoderFile.is_open()) { - mDecoderFile.close(); - return false; - } - return true; -} - -bool Compressor::encoderClose() -{ - if (mEncoderFile.is_open()) - mEncoderFile.close(); - return false; -} - -bool Compressor::decoderRead() -{ - if (!mDecoderFile.is_open()) { - std::cout << colorRed << "--- no input file is open" - << colorReset - << std::endl; - return true; - } - - char* inputPointer = mDecoderBuffer; - mDecoderFile.read(inputPointer, 64); - mDecoderBufferSize = 64; - auto rdh = reinterpret_cast(inputPointer); - while (!rdh->stop) { - mDecoderFile.read(inputPointer + rdh->headerSize, rdh->offsetToNext - rdh->headerSize); - mDecoderBufferSize += (rdh->offsetToNext - rdh->headerSize); - inputPointer += rdh->offsetToNext; - mDecoderFile.read(inputPointer, 64); - mDecoderBufferSize += 64; - rdh = reinterpret_cast(inputPointer); - } - - /** check end of file **/ - if (mDecoderFile.eof()) { - std::cout << colorRed << "--- Nothing else to read" - << colorReset - << std::endl; - return true; - } - -#ifdef DECODER_VERBOSE - if (mDecoderVerbose) { - std::cout << colorBlue - << "--- DECODER READ HBF: " << mDecoderBufferSize << " bytes" - << colorReset - << std::endl; - } -#endif - - return false; -} - -bool Compressor::encoderWrite() -{ -#ifdef ENCODER_VERBOSE - if (mEncoderVerbose) { - std::cout << colorBlue - << "--- ENCODER WRITE BUFFER: " << getEncoderByteCounter() << " bytes" - << colorReset - << std::endl; - } -#endif - mEncoderFile.write(mEncoderBuffer, getEncoderByteCounter()); - encoderRewind(); - return false; -} - -bool Compressor::decoderParanoid() -{ - if (mDecoderPointer >= mDecoderPointerMax) { - printf("%s %08x [ERROR] fatal error: beyond memory size %s \n", colorRed, *mDecoderPointer, colorReset); - mDecoderFatal = true; - return true; - } - return false; -} - bool Compressor::processHBF() { @@ -1000,6 +801,16 @@ bool Compressor::processDRM() return false; } +bool Compressor::decoderParanoid() +{ + if (mDecoderPointer >= mDecoderPointerMax) { + printf("%s %08x [ERROR] fatal error: beyond memory size %s \n", colorRed, *mDecoderPointer, colorReset); + mDecoderFatal = true; + return true; + } + return false; +} + void Compressor::encoderSpider(int itrm) { int slotId = itrm + 3; diff --git a/Detectors/TOF/compression/src/CompressorTask.cxx b/Detectors/TOF/compression/src/CompressorTask.cxx index 24e5dec907ebb..96c631165ae1f 100644 --- a/Detectors/TOF/compression/src/CompressorTask.cxx +++ b/Detectors/TOF/compression/src/CompressorTask.cxx @@ -16,7 +16,11 @@ #include "TOFCompression/CompressorTask.h" #include "Framework/ControlService.h" #include "Framework/ConfigParamRegistry.h" -#include "Framework/WorkflowSpec.h" +#include "Framework/RawDeviceService.h" +#include "Framework/DeviceSpec.h" +#include "Framework/DataSpecUtils.h" + +#include using namespace o2::framework; @@ -29,12 +33,9 @@ void CompressorTask::init(InitContext& ic) { LOG(INFO) << "Compressor init"; - /** link encoder output buffer **/ - mCompressor.setEncoderBuffer(const_cast(mDataFrame.mBuffer)); - - auto decoderVerbose = ic.options().get("tof-compressor-decoder-verbose"); - auto encoderVerbose = ic.options().get("tof-compressor-encoder-verbose"); - auto checkerVerbose = ic.options().get("tof-compressor-checker-verbose"); + auto decoderVerbose = ic.options().get("decoder-verbose"); + auto encoderVerbose = ic.options().get("encoder-verbose"); + auto checkerVerbose = ic.options().get("checker-verbose"); mCompressor.setDecoderVerbose(decoderVerbose); mCompressor.setEncoderVerbose(encoderVerbose); @@ -51,38 +52,48 @@ void CompressorTask::run(ProcessingContext& pc) { LOG(DEBUG) << "Compressor run"; + /** set encoder output buffer **/ + char bufferOut[1048576]; + mCompressor.setEncoderBuffer(bufferOut); + mCompressor.setEncoderBufferSize(1048576); + + auto device = pc.services().get().device(); + auto outputRoutes = pc.services().get().spec().outputs; + auto fairMQChannel = outputRoutes.at(0).channel; + /** receive input **/ for (auto& input : pc.inputs()) { - const auto* header = DataRefUtils::getHeader(input); - auto payload = const_cast(input.payload); - auto payloadSize = header->payloadSize; - mCompressor.setDecoderBuffer(payload); - mCompressor.setDecoderBufferSize(payloadSize); + + /** input **/ + auto headerIn = DataRefUtils::getHeader(input); + auto dataProcessingHeaderIn = DataRefUtils::getHeader(input); + auto payloadIn = const_cast(input.payload); + auto payloadInSize = headerIn->payloadSize; + mCompressor.setDecoderBuffer(payloadIn); + mCompressor.setDecoderBufferSize(payloadInSize); /** run **/ mCompressor.run(); - - /** push output **/ - mDataFrame.mSize = mCompressor.getEncoderByteCounter(); - pc.outputs().snapshot(Output{o2::header::gDataOriginTOF, "CMPDATAFRAME", 0, Lifetime::Timeframe}, mDataFrame); + auto payloadOutSize = mCompressor.getEncoderByteCounter(); + auto payloadMessage = device->NewMessage(payloadOutSize); + std::memcpy(payloadMessage->GetData(), bufferOut, payloadOutSize); + + /** output **/ + auto headerOut = *headerIn; + auto dataProcessingHeaderOut = *dataProcessingHeaderIn; + headerOut.dataDescription = "CRAWDATA"; + headerOut.payloadSize = payloadOutSize; + o2::header::Stack headerStack{headerOut, dataProcessingHeaderOut}; + auto headerMessage = device->NewMessage(headerStack.size()); + std::memcpy(headerMessage->GetData(), headerStack.data(), headerStack.size()); + + /** send **/ + FairMQParts parts; + parts.AddPart(std::move(headerMessage)); + parts.AddPart(std::move(payloadMessage)); + device->Send(parts, fairMQChannel); } } -DataProcessorSpec CompressorTask::getSpec() -{ - std::vector inputs; - std::vector outputs; - - return DataProcessorSpec{ - "tof-compressor", - select("x:TOF/RAWDATA"), - Outputs{OutputSpec(o2::header::gDataOriginTOF, "CMPDATAFRAME", 0, Lifetime::Timeframe)}, - AlgorithmSpec{adaptFromTask()}, - Options{ - {"tof-compressor-decoder-verbose", VariantType::Bool, false, {"Decoder verbose flag"}}, - {"tof-compressor-encoder-verbose", VariantType::Bool, false, {"Encoder verbose flag"}}, - {"tof-compressor-checker-verbose", VariantType::Bool, false, {"Checker verbose flag"}}}}; -} - } // namespace tof } // namespace o2 diff --git a/Detectors/TOF/compression/src/RawReaderTask.cxx b/Detectors/TOF/compression/src/RawReaderTask.cxx deleted file mode 100644 index cf8a0997c3201..0000000000000 --- a/Detectors/TOF/compression/src/RawReaderTask.cxx +++ /dev/null @@ -1,100 +0,0 @@ -// Copyright CERN and copyright holders of ALICE O2. This software is -// distributed under the terms of the GNU General Public License v3 (GPL -// Version 3), copied verbatim in the file "COPYING". -// -// See http://alice-o2.web.cern.ch/license for full licensing information. -// -// In applying this license CERN does not waive the privileges and immunities -// granted to it by virtue of its status as an Intergovernmental Organization -// or submit itself to any jurisdiction. - -/// @file RawReaderTask.cxx -/// @author Roberto Preghenella -/// @since 2019-12-18 -/// @brief TOF raw reader task - -#include "TOFCompression/RawReaderTask.h" -#include "Framework/ControlService.h" -#include "Framework/ConfigParamRegistry.h" -#include "Headers/RAWDataHeader.h" - -using namespace o2::framework; - -namespace o2 -{ -namespace tof -{ - -void RawReaderTask::init(InitContext& ic) -{ - LOG(INFO) << "RawReader init"; - auto filename = ic.options().get("tof-raw-filename"); - mBuffer.reserve(1048576); - - /** open file **/ - if (mFile.is_open()) { - LOG(WARNING) << "a file was already open, closing"; - mFile.close(); - } - mFile.open(filename.c_str(), std::fstream::in | std::fstream::binary); - if (!mFile.is_open()) { - LOG(ERROR) << "cannot open input file: " << filename; - mStatus = true; - return; - } -} - -void RawReaderTask::run(ProcessingContext& pc) -{ - LOG(DEBUG) << "RawReader run"; - - /** check status **/ - if (mStatus) { - pc.services().get().endOfStream(); - pc.services().get().readyToQuit(QuitRequest::Me); - return; - } - - /** read full HBF **/ - int headerSize = sizeof(o2::header::RAWDataHeader); - char* inputPointer = mBuffer.data(); - mFile.read(inputPointer, headerSize); - inputPointer += headerSize; - auto rdh = reinterpret_cast(inputPointer); - while (!rdh->stop) { - auto dataSize = rdh->offsetToNext - headerSize; - mFile.read(inputPointer, dataSize); - inputPointer += dataSize; - mFile.read(inputPointer, headerSize); - rdh = reinterpret_cast(inputPointer); - inputPointer += headerSize; - } - mBuffer.resize(inputPointer - mBuffer.data()); - - auto freefct = [](void* data, void* hint) {}; // simply ignore the cleanup for the test - pc.outputs().adoptChunk(Output{o2::header::gDataOriginTOF, "RAWDATAFRAME", 0, Lifetime::Timeframe}, mBuffer.data(), mBuffer.size(), freefct, nullptr); - - /** check eof **/ - if (mFile.eof()) { - LOG(WARNING) << "nothig else to read"; - mFile.close(); - mStatus = true; - } -} - -DataProcessorSpec RawReaderTask::getSpec() -{ - std::vector inputs; - std::vector outputs; - - return DataProcessorSpec{ - "tof-raw-reader", - Inputs{}, // inputs - Outputs{OutputSpec(o2::header::gDataOriginTOF, "RAWDATAFRAME", 0, Lifetime::Timeframe)}, // outputs - AlgorithmSpec{adaptFromTask()}, // call constructor + execute init (check) - Options{ - {"tof-raw-filename", VariantType::String, "", {"Name of the raw input file"}}}}; -} - -} // namespace tof -} // namespace o2 diff --git a/Detectors/TOF/compression/src/TOFCompressionLinkDef.h b/Detectors/TOF/compression/src/TOFCompressionLinkDef.h deleted file mode 100644 index ffe7749e8f728..0000000000000 --- a/Detectors/TOF/compression/src/TOFCompressionLinkDef.h +++ /dev/null @@ -1,19 +0,0 @@ -// Copyright CERN and copyright holders of ALICE O2. This software is -// distributed under the terms of the GNU General Public License v3 (GPL -// Version 3), copied verbatim in the file "COPYING". -// -// See http://alice-o2.web.cern.ch/license for full licensing information. -// -// In applying this license CERN does not waive the privileges and immunities -// granted to it by virtue of its status as an Intergovernmental Organization -// or submit itself to any jurisdiction. - -#ifdef __CLING__ - -#pragma link off all globals; -#pragma link off all classes; -#pragma link off all functions; - -#pragma link C++ class o2::tof::RawDataFrame + ; - -#endif diff --git a/Detectors/TOF/compression/src/tof-compressed-inspector.cxx b/Detectors/TOF/compression/src/tof-compressed-inspector.cxx new file mode 100644 index 0000000000000..23a42623033c7 --- /dev/null +++ b/Detectors/TOF/compression/src/tof-compressed-inspector.cxx @@ -0,0 +1,48 @@ +// Copyright CERN and copyright holders of ALICE O2. This software is +// distributed under the terms of the GNU General Public License v3 (GPL +// Version 3), copied verbatim in the file "COPYING". +// +// See http://alice-o2.web.cern.ch/license for full licensing information. +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +/// @file tof-compressor.cxx +/// @author Roberto Preghenella +/// @since 2019-12-18 +/// @brief Basic DPL workflow for TOF raw data compression + +#include "TOFCompression/CompressedInspectorTask.h" +#include "Framework/WorkflowSpec.h" +#include "Framework/ConfigParamSpec.h" +#include "FairLogger.h" + +using namespace o2::framework; + +// add workflow options, note that customization needs to be declared before +// including Framework/runDataProcessing +void customize(std::vector& workflowOptions) +{ + auto inputDesc = ConfigParamSpec{"input-desc", VariantType::String, "CRAWDATA", {"Input specs description string"}}; + workflowOptions.push_back(inputDesc); +} + +#include "Framework/runDataProcessing.h" // the main driver + +/// This function hooks up the the workflow specifications into the DPL driver. +WorkflowSpec defineDataProcessing(ConfigContext const& cfgc) +{ + auto inputDesc = cfgc.options().get("input-desc"); + + WorkflowSpec workflow; + workflow.emplace_back(DataProcessorSpec{ + "tof-compressed-inspector", + select(std::string("x:TOF/" + inputDesc).c_str()), + Outputs{}, + AlgorithmSpec{adaptFromTask()}, + Options{ + {"tof-inspector-filename", VariantType::String, "inspector.root", {"Name of the inspector output file"}}}}); + + return workflow; +} diff --git a/Detectors/TOF/compression/src/tof-compressor.cxx b/Detectors/TOF/compression/src/tof-compressor.cxx index 085e04ea55cc9..64c53dfe26bb3 100644 --- a/Detectors/TOF/compression/src/tof-compressor.cxx +++ b/Detectors/TOF/compression/src/tof-compressor.cxx @@ -13,13 +13,10 @@ /// @since 2019-12-18 /// @brief Basic DPL workflow for TOF raw data compression -#include "TOFCompression/RawReaderTask.h" #include "TOFCompression/CompressorTask.h" -#include "TOFCompression/CompressedWriterTask.h" -#include "TOFCompression/CompressedInspectorTask.h" #include "Framework/WorkflowSpec.h" #include "Framework/ConfigParamSpec.h" -#include "Framework/runDataProcessing.h" // the main driver +#include "Framework/ConcreteDataMatcher.h" #include "FairLogger.h" using namespace o2::framework; @@ -28,23 +25,34 @@ using namespace o2::framework; // including Framework/runDataProcessing void customize(std::vector& workflowOptions) { - // workflowOptions.push_back(ConfigParamSpec{"verbose", o2::framework::VariantType::Bool, false, {"Verbose flag"}}); + auto inputDesc = ConfigParamSpec{"input-desc", VariantType::String, "RAWDATA", {"Input specs description string"}}; + auto outputDesc = ConfigParamSpec{"output-desc", VariantType::String, "CRAWDATA", {"Output specs description string"}}; + + workflowOptions.push_back(inputDesc); + workflowOptions.push_back(outputDesc); } +#include "Framework/runDataProcessing.h" // the main driver + /// This function hooks up the the workflow specifications into the DPL driver. WorkflowSpec defineDataProcessing(ConfigContext const& cfgc) { - LOG(INFO) << "TOF COMPRESSION WORKFLOW configuration"; - - // auto verbose = cfgc.options().get("verbose"); - - // add devices (Spec) to the workflow - WorkflowSpec specs; - // specs.emplace_back(o2::tof::RawReaderTask::getSpec()); - specs.emplace_back(o2::tof::CompressorTask::getSpec()); - specs.emplace_back(o2::tof::CompressedWriterTask::getSpec()); - specs.emplace_back(o2::tof::CompressedInspectorTask::getSpec()); - LOG(INFO) << "Number of active devices = " << specs.size(); - return std::move(specs); + auto inputDesc = cfgc.options().get("input-desc"); + auto outputDesc = cfgc.options().get("output-desc"); + std::vector outputs; + outputs.emplace_back(OutputSpec(ConcreteDataTypeMatcher{"TOF", "CRAWDATA"})); + + WorkflowSpec workflow; + workflow.emplace_back(DataProcessorSpec{ + "tof-compressor", + select(std::string("x:TOF/" + inputDesc).c_str()), + outputs, + AlgorithmSpec{adaptFromTask()}, + Options{ + {"decoder-verbose", VariantType::Bool, false, {"Decoder verbose flag"}}, + {"encoder-verbose", VariantType::Bool, false, {"Encoder verbose flag"}}, + {"checker-verbose", VariantType::Bool, false, {"Checker verbose flag"}}}}); + + return workflow; } From f20935054d28b225d7fd6ddda5297a7e8af2dee2 Mon Sep 17 00:00:00 2001 From: noferini Date: Mon, 24 Feb 2020 12:32:08 +0100 Subject: [PATCH 2/4] add tof compressed decoding task --- .../tofworkflow/src/tof-reco-workflow.cxx | 16 +- Detectors/TOF/base/include/TOFBase/Strip.h | 2 +- .../TOF/base/include/TOFBase/WindowFiller.h | 5 +- Detectors/TOF/base/src/WindowFiller.cxx | 22 ++ .../include/TOFReconstruction/Decoder.h | 3 + Detectors/TOF/reconstruction/src/Decoder.cxx | 39 ++- .../include/TOFSimulation/Digitizer.h | 4 +- Detectors/TOF/simulation/src/Digitizer.cxx | 26 +- Detectors/TOF/workflow/CMakeLists.txt | 2 +- ...wReaderSpec.h => CompressedDecodingTask.h} | 39 +-- .../workflow/src/CompressedDecodingTask.cxx | 228 ++++++++++++++++++ Detectors/TOF/workflow/src/RawReaderSpec.cxx | 113 --------- .../TOF/workflow/src/TOFRawWriterSpec.cxx | 6 +- .../src/TOFDigitizerSpec.cxx | 33 ++- 14 files changed, 390 insertions(+), 148 deletions(-) rename Detectors/TOF/workflow/include/TOFWorkflow/{RawReaderSpec.h => CompressedDecodingTask.h} (55%) create mode 100644 Detectors/TOF/workflow/src/CompressedDecodingTask.cxx delete mode 100644 Detectors/TOF/workflow/src/RawReaderSpec.cxx diff --git a/Detectors/GlobalTrackingWorkflow/tofworkflow/src/tof-reco-workflow.cxx b/Detectors/GlobalTrackingWorkflow/tofworkflow/src/tof-reco-workflow.cxx index 75203878dd32b..1d7bfca17aae2 100644 --- a/Detectors/GlobalTrackingWorkflow/tofworkflow/src/tof-reco-workflow.cxx +++ b/Detectors/GlobalTrackingWorkflow/tofworkflow/src/tof-reco-workflow.cxx @@ -17,13 +17,13 @@ #include "GlobalTrackingWorkflow/TrackTPCITSReaderSpec.h" #include "TOFWorkflow/DigitReaderSpec.h" #include "TOFWorkflow/TOFDigitWriterSpec.h" -#include "TOFWorkflow/RawReaderSpec.h" #include "TOFWorkflow/ClusterReaderSpec.h" #include "TOFWorkflow/TOFClusterizerSpec.h" #include "TOFWorkflow/TOFClusterWriterSpec.h" #include "TOFWorkflow/TOFMatchedWriterSpec.h" #include "TOFWorkflow/TOFCalibWriterSpec.h" #include "TOFWorkflow/TOFRawWriterSpec.h" +#include "TOFWorkflow/CompressedDecodingTask.h" #include "Framework/WorkflowSpec.h" #include "Framework/ConfigParamSpec.h" #include "TOFWorkflow/RecoWorkflowSpec.h" @@ -52,6 +52,7 @@ void customize(std::vector& workflowOptions) workflowOptions.push_back(ConfigParamSpec{"tof-lanes", o2::framework::VariantType::Int, 1, {"number of parallel lanes up to the matcher, TBI"}}); workflowOptions.push_back(ConfigParamSpec{"use-ccdb", o2::framework::VariantType::Bool, false, {"enable access to ccdb tof calibration objects"}}); workflowOptions.push_back(ConfigParamSpec{"use-fit", o2::framework::VariantType::Bool, false, {"enable access to fit info for calibration"}}); + workflowOptions.push_back(ConfigParamSpec{"input-desc", o2::framework::VariantType::String, "CRAWDATA", {"Input specs description string"}}); } #include "Framework/runDataProcessing.h" // the main driver @@ -110,6 +111,10 @@ WorkflowSpec defineDataProcessing(ConfigContext const& cfgc) if (outputType.rfind("raw") < outputType.size()) writeraw = 1; + bool dgtinput = 0; + if (inputType == "digits") { + dgtinput = 1; + } bool clusterinput = 0; if (inputType == "clusters") { clusterinput = 1; @@ -135,7 +140,7 @@ WorkflowSpec defineDataProcessing(ConfigContext const& cfgc) if (clusterinput) { LOG(INFO) << "Insert TOF Cluster Reader"; specs.emplace_back(o2::tof::getClusterReaderSpec(useMC)); - } else if (!rawinput) { + } else if (dgtinput) { // TOF clusterizer LOG(INFO) << "Insert TOF Digit reader from file"; specs.emplace_back(o2::tof::getDigitReaderSpec(useMC)); @@ -144,9 +149,10 @@ WorkflowSpec defineDataProcessing(ConfigContext const& cfgc) LOG(INFO) << "Insert TOF Raw writer"; specs.emplace_back(o2::tof::getTOFRawWriterSpec()); } - } else { - LOG(INFO) << "Insert TOF Raw Reader"; - specs.emplace_back(o2::tof::getRawReaderSpec()); + } else if (rawinput) { + LOG(INFO) << "Insert TOF Compressed Raw Decoder"; + auto inputDesc = cfgc.options().get("input-desc"); + specs.emplace_back(o2::tof::getCompressedDecodingSpec(inputDesc)); useMC = 0; if (writedigit) { diff --git a/Detectors/TOF/base/include/TOFBase/Strip.h b/Detectors/TOF/base/include/TOFBase/Strip.h index f2e868023aa17..015ed765e029e 100644 --- a/Detectors/TOF/base/include/TOFBase/Strip.h +++ b/Detectors/TOF/base/include/TOFBase/Strip.h @@ -56,7 +56,7 @@ class Strip /// Empties the point container /// @param option unused - void clear(); + void clear() { mDigits.clear(); } /// Change the chip index /// @param index New chip index diff --git a/Detectors/TOF/base/include/TOFBase/WindowFiller.h b/Detectors/TOF/base/include/TOFBase/WindowFiller.h index 1b6fd22da53c0..ee009b7cc519a 100644 --- a/Detectors/TOF/base/include/TOFBase/WindowFiller.h +++ b/Detectors/TOF/base/include/TOFBase/WindowFiller.h @@ -29,9 +29,11 @@ class WindowFiller void initObj(); + void reset(); + Int_t getCurrentReadoutWindow() const { return mReadoutWindowCurrent; } void setCurrentReadoutWindow(Double_t value) { mReadoutWindowCurrent = value; } - void setEventTime(double value) { mEventTime = value; } + void setEventTime(double value) { mEventTime = value - mTF * o2::constants::lhc::LHCOrbitNS * 256; } std::vector* getDigitPerTimeFrame() { return &mDigitsPerTimeFrame; } std::vector* getReadoutWindowData() { return &mReadoutWindowData; } @@ -45,6 +47,7 @@ class WindowFiller protected: // info TOF timewindow + Int_t mTF = 0; Int_t mReadoutWindowCurrent = 0; Int_t mFirstOrbit = 0; Int_t mFirstBunch = 0; diff --git a/Detectors/TOF/base/src/WindowFiller.cxx b/Detectors/TOF/base/src/WindowFiller.cxx index 8fb883e293509..77fb13571f5be 100644 --- a/Detectors/TOF/base/src/WindowFiller.cxx +++ b/Detectors/TOF/base/src/WindowFiller.cxx @@ -78,6 +78,28 @@ void WindowFiller::initObj() } } //______________________________________________________________________ +void WindowFiller::reset() +{ + mIcurrentReadoutWindow = 0; + mReadoutWindowCurrent = 0; + + for (Int_t i = 0; i < MAXWINDOWS; i++) { + for (Int_t j = 0; j < Geo::NSTRIPS; j++) { + mStrips[i][j].clear(); + } + } + mFutureDigits.clear(); + + mStripsCurrent = &(mStrips[0]); + mStripsNext[0] = &(mStrips[1]); + + mDigitsPerTimeFrame.clear(); + mReadoutWindowData.clear(); + + mFirstOrbit = 0; + mFirstBunch = 0; +} +//______________________________________________________________________ void WindowFiller::fillDigitsInStrip(std::vector* strips, int channel, int tdc, int tot, int nbc, UInt_t istrip, Int_t triggerorbit, Int_t triggerbunch) { (*strips)[istrip].addDigit(channel, tdc, tot * Geo::NTOTBIN_PER_NS, nbc, 0, triggerorbit, triggerbunch); diff --git a/Detectors/TOF/reconstruction/include/TOFReconstruction/Decoder.h b/Detectors/TOF/reconstruction/include/TOFReconstruction/Decoder.h index 534c06b883757..eeee1b4838304 100644 --- a/Detectors/TOF/reconstruction/include/TOFReconstruction/Decoder.h +++ b/Detectors/TOF/reconstruction/include/TOFReconstruction/Decoder.h @@ -45,6 +45,9 @@ class Decoder : public WindowFiller bool decode(); void readTRM(int icru, int icrate, int orbit, int bunchid); + void InsertDigit(int icrate, int itrm, int itdc, int ichain, int channel, int orbit, int bunchid, int time_ext, int tdc, int tot); + void FillWindows(); + void clear(); bool close(); void setVerbose(bool val) { mVerbose = val; }; diff --git a/Detectors/TOF/reconstruction/src/Decoder.cxx b/Detectors/TOF/reconstruction/src/Decoder.cxx index 5945819a19300..4f5659028922e 100644 --- a/Detectors/TOF/reconstruction/src/Decoder.cxx +++ b/Detectors/TOF/reconstruction/src/Decoder.cxx @@ -103,6 +103,36 @@ bool Decoder::close() return false; } +void Decoder::clear() +{ + reset(); +} + +void Decoder::InsertDigit(int icrate, int itrm, int itdc, int ichain, int channel, int orbit, int bunchid, int time_ext, int tdc, int tot) +{ + std::array digitInfo; + + fromRawHit2Digit(icrate, itrm, itdc, ichain, channel, orbit, bunchid, time_ext + tdc, tot, digitInfo); + + mHitDecoded++; + + int isnext = digitInfo[3] * Geo::BC_IN_WINDOW_INV; + + if (isnext >= MAXWINDOWS) { // accumulate all digits which are not in the first windows + + insertDigitInFuture(digitInfo[0], digitInfo[1], digitInfo[2], digitInfo[3], 0, digitInfo[4], digitInfo[5]); + } else { + std::vector* cstrip = mStripsCurrent; // first window + if (isnext) + cstrip = mStripsNext[isnext - 1]; // next window + + UInt_t istrip = digitInfo[0] / Geo::NPADS; + + // add digit + fillDigitsInStrip(cstrip, digitInfo[0], digitInfo[1], digitInfo[2], digitInfo[3], istrip); + } +} + void Decoder::readTRM(int icru, int icrate, int orbit, int bunchid) { @@ -272,11 +302,16 @@ bool Decoder::decode() // return a vector of digits in a TOF readout window // since digits are not yet divided in tof readout window we need to do it // flushOutputContainer does the job + FillWindows(); + + return false; +} + +void Decoder::FillWindows() +{ std::vector digTemp; flushOutputContainer(digTemp); printf("hit decoded = %d (digits not filled = %lu)\n", mHitDecoded, mFutureDigits.size()); - - return false; } void Decoder::printCrateInfo(int icru) const diff --git a/Detectors/TOF/simulation/include/TOFSimulation/Digitizer.h b/Detectors/TOF/simulation/include/TOFSimulation/Digitizer.h index c670a308a9865..a4f3403f83c47 100644 --- a/Detectors/TOF/simulation/include/TOFSimulation/Digitizer.h +++ b/Detectors/TOF/simulation/include/TOFSimulation/Digitizer.h @@ -35,7 +35,9 @@ class Digitizer : public WindowFiller void init(); - void process(const std::vector* hits, std::vector* digits); + void newTF(); + + int process(const std::vector* hits, std::vector* digits); void setCalibApi(CalibApi* calibApi) { mCalibApi = calibApi; } diff --git a/Detectors/TOF/simulation/src/Digitizer.cxx b/Detectors/TOF/simulation/src/Digitizer.cxx index b237a2b1f146c..503c60f430c69 100644 --- a/Detectors/TOF/simulation/src/Digitizer.cxx +++ b/Detectors/TOF/simulation/src/Digitizer.cxx @@ -82,11 +82,13 @@ void Digitizer::init() //______________________________________________________________________ -void Digitizer::process(const std::vector* hits, std::vector* digits) +int Digitizer::process(const std::vector* hits, std::vector* digits) { // hits array of TOF hits for a given simulated event // digits passed from external to be filled, in continuous readout mode we will push it on mDigitsPerTimeFrame vector of vectors of digits + // printf("process event time = %f with %ld hits\n",mEventTime,hits->size()); + Int_t readoutwindow = Int_t((mEventTime - Geo::BC_TIME * (Geo::OVERLAP_IN_BC + 2)) * Geo::READOUTWINDOW_INV); // event time shifted by 2 BC as safe margin before to change current readout window to account for decalibration if (mContinuous && readoutwindow > mReadoutWindowCurrent) { // if we are moving in future readout windows flush previous ones (only for continuous readout mode) @@ -98,6 +100,9 @@ void Digitizer::process(const std::vector* hits, std::vector* di } // close loop readout window } // close if continuous + if (mReadoutWindowCurrent >= 256 * Geo::NWINDOW_IN_ORBIT) // new TF + return 1; + for (auto& hit : *hits) { //TODO: put readout window counting/selection @@ -108,6 +113,8 @@ void Digitizer::process(const std::vector* hits, std::vector* di digits->clear(); fillOutputContainer(*digits); } + + return 0; } //______________________________________________________________________ @@ -306,7 +313,7 @@ void Digitizer::addDigit(Int_t channel, UInt_t istrip, Double_t time, Float_t x, if (isnext < 0) { LOG(ERROR) << "error: isnext =" << isnext << "(current window = " << mReadoutWindowCurrent << ")" - << "\n"; + << " nbc = " << nbc << " -- event time = " << mEventTime << " -- TF = " << mTF << "\n"; return; } @@ -335,7 +342,7 @@ void Digitizer::addDigit(Int_t channel, UInt_t istrip, Double_t time, Float_t x, iscurrent = false; } - // printf("add TOF digit c=%i n=%i\n",iscurrent,isnext); + //printf("add TOF digit c=%i n=%i\n",iscurrent,isnext); std::vector* strips; o2::dataformats::MCTruthContainer* mcTruthContainer; @@ -490,7 +497,16 @@ Float_t Digitizer::getEffZ(Float_t z) //______________________________________________________________________ Float_t Digitizer::getFractionOfCharge(Float_t x, Float_t z) { return 1; } +//______________________________________________________________________ +void Digitizer::newTF() +{ + reset(); + mMCTruthOutputContainerPerTimeFrame.clear(); + + mTF++; + printf("New TF = %d\n", mTF); +} //______________________________________________________________________ void Digitizer::initParameters() { @@ -783,10 +799,12 @@ void Digitizer::fillOutputContainer(std::vector& digits) // filling the digit container doing a loop on all strips for (auto& strip : *mStripsCurrent) { strip.fillOutputContainer(digits); + if (strip.getNumberOfDigits()) + printf("strip size = %d - digit size = %d\n", strip.getNumberOfDigits(), digits.size()); } if (mContinuous) { - // printf("%i) # TOF digits = %lu (%p)\n", mIcurrentReadoutWindow, digits.size(), mStripsCurrent); + //printf("%i) # TOF digits = %lu (%p)\n", mIcurrentReadoutWindow, digits.size(), mStripsCurrent); int first = mDigitsPerTimeFrame.size(); int ne = digits.size(); ReadoutWindowData info(first, ne); diff --git a/Detectors/TOF/workflow/CMakeLists.txt b/Detectors/TOF/workflow/CMakeLists.txt index a117852cfe3f2..25fd0dd508a1b 100644 --- a/Detectors/TOF/workflow/CMakeLists.txt +++ b/Detectors/TOF/workflow/CMakeLists.txt @@ -10,12 +10,12 @@ o2_add_library(TOFWorkflowUtils SOURCES src/DigitReaderSpec.cxx - src/RawReaderSpec.cxx src/ClusterReaderSpec.cxx src/TOFClusterizerSpec.cxx src/TOFClusterWriterSpec.cxx src/TOFDigitWriterSpec.cxx src/TOFRawWriterSpec.cxx + src/CompressedDecodingTask.cxx PUBLIC_LINK_LIBRARIES O2::Framework O2::TOFBase O2::DataFormatsTOF O2::TOFReconstruction) diff --git a/Detectors/TOF/workflow/include/TOFWorkflow/RawReaderSpec.h b/Detectors/TOF/workflow/include/TOFWorkflow/CompressedDecodingTask.h similarity index 55% rename from Detectors/TOF/workflow/include/TOFWorkflow/RawReaderSpec.h rename to Detectors/TOF/workflow/include/TOFWorkflow/CompressedDecodingTask.h index 9a54e1c38fedd..94cfa13fde891 100644 --- a/Detectors/TOF/workflow/include/TOFWorkflow/RawReaderSpec.h +++ b/Detectors/TOF/workflow/include/TOFWorkflow/CompressedDecodingTask.h @@ -8,19 +8,19 @@ // granted to it by virtue of its status as an Intergovernmental Organization // or submit itself to any jurisdiction. -/// @file RawReaderSpec.h +/// @file CompressedDecodingTask.h +/// @author Francesco Noferini +/// @since 2020-02-25 +/// @brief TOF compressed data decoding task -#ifndef O2_TOF_RAWREADER -#define O2_TOF_RAWREADER +#ifndef O2_TOF_COMPRESSEDDECODINGTASK +#define O2_TOF_COMPRESSEDDECODINGTASK -#include "TFile.h" - -#include "Framework/DataProcessorSpec.h" #include "Framework/Task.h" +#include "Framework/DataProcessorSpec.h" +#include #include "TOFReconstruction/Decoder.h" #include "TOFBase/Digit.h" -#include "SimulationDataFormat/MCCompLabel.h" -#include "SimulationDataFormat/MCTruthContainer.h" using namespace o2::framework; @@ -29,25 +29,30 @@ namespace o2 namespace tof { -class RawReader : public Task +class CompressedDecodingTask : public Task { public: - RawReader() = default; - ~RawReader() override = default; + CompressedDecodingTask() = default; + ~CompressedDecodingTask() override = default; void init(InitContext& ic) final; void run(ProcessingContext& pc) final; + void postData(ProcessingContext& pc); + private: - int mState = 0; - std::string mFilename; + bool mStatus = false; + o2::tof::compressed::Decoder mDecoder; std::vector> mDigits; + int mNTF = 0; + int mNCrateOpenTF = 0; + int mNCrateCloseTF = 0; + bool mHasToBePosted = false; + int mInitOrbit = 0; }; -/// create a processor spec -/// read simulated TOF raws from a root file -framework::DataProcessorSpec getRawReaderSpec(); +framework::DataProcessorSpec getCompressedDecodingSpec(std::string inputDesc); } // namespace tof } // namespace o2 -#endif /* O2_TOF_RAWREADER */ +#endif /* O2_TOF_COMPRESSEDDECODINGTASK */ diff --git a/Detectors/TOF/workflow/src/CompressedDecodingTask.cxx b/Detectors/TOF/workflow/src/CompressedDecodingTask.cxx new file mode 100644 index 0000000000000..3657df5ed54f7 --- /dev/null +++ b/Detectors/TOF/workflow/src/CompressedDecodingTask.cxx @@ -0,0 +1,228 @@ +// Copyright CERN and copyright holders of ALICE O2. This software is +// distributed under the terms of the GNU General Public License v3 (GPL +// Version 3), copied verbatim in the file "COPYING". +// +// See http://alice-o2.web.cern.ch/license for full licensing information. +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +/// @file CompressedDecodingTask.cxx +/// @author Francesco Noferini +/// @since 2020-02-25 +/// @brief TOF compressed data decoding task + +#include "TOFWorkflow/CompressedDecodingTask.h" +#include "Framework/ControlService.h" +#include "Framework/ConfigParamRegistry.h" + +#include "Headers/RAWDataHeader.h" +#include "DataFormatsTOF/CompressedDataFormat.h" +#include "DetectorsRaw/HBFUtils.h" +#include "DataFormatsParameters/GRPObject.h" +#include "Framework/WorkflowSpec.h" + +using namespace o2::framework; + +namespace o2 +{ +namespace tof +{ + +void CompressedDecodingTask::init(InitContext& ic) +{ + LOG(INFO) << "CompressedDecoding init"; + + auto finishFunction = [this]() { + LOG(INFO) << "CompressedDecoding finish"; + }; + ic.services().get().set(CallbackService::Id::Stop, finishFunction); +} + +void CompressedDecodingTask::postData(ProcessingContext& pc) +{ + mHasToBePosted = false; + mDecoder.FillWindows(); + + // send output message + std::vector* alldigits = mDecoder.getDigitPerTimeFrame(); + std::vector* row = mDecoder.getReadoutWindowData(); + + int n_tof_window = row->size(); + int n_orbits = n_tof_window / 3; + int digit_size = alldigits->size(); + + LOG(INFO) << "TOF: N tof window decoded = " << n_tof_window << "(orbits = " << n_orbits << ") with " << digit_size << " digits"; + + // add digits in the output snapshot + pc.outputs().snapshot(Output{o2::header::gDataOriginTOF, "DIGITS", 0, Lifetime::Timeframe}, *alldigits); + pc.outputs().snapshot(Output{o2::header::gDataOriginTOF, "READOUTWINDOW", 0, Lifetime::Timeframe}, *row); + + static o2::parameters::GRPObject::ROMode roMode = o2::parameters::GRPObject::CONTINUOUS; + + LOG(INFO) << "TOF: Sending ROMode= " << roMode << " to GRPUpdater"; + pc.outputs().snapshot(Output{o2::header::gDataOriginTOF, "ROMode", 0, Lifetime::Timeframe}, roMode); + + mDecoder.clear(); + + LOG(INFO) << "TOF: TF = " << mNTF << " - Crate in " << mNCrateOpenTF; + + mNTF++; + mNCrateOpenTF = 0; + mNCrateCloseTF = 0; +} + +void CompressedDecodingTask::run(ProcessingContext& pc) +{ + LOG(INFO) << "CompressedDecoding run"; + + /** check status **/ + if (mStatus) { + pc.services().get().readyToQuit(QuitRequest::Me); + return; + } + + /** receive input **/ + for (auto& input : pc.inputs()) { + + /** input **/ + const auto* headerIn = DataRefUtils::getHeader(input); + auto payloadIn = const_cast(input.payload); + auto payloadInSize = headerIn->payloadSize; + + /** process input **/ + auto pointer = payloadIn; + while (pointer < (payloadIn + payloadInSize)) { + auto rdh = reinterpret_cast(pointer); + + /** RDH close detected **/ + if (rdh->stop) { +#ifdef VERBOSE + std::cout << "--- RDH close detected" << std::endl; + o2::raw::HBFUtils::printRDH(*rdh); +#endif + if (rdh->heartbeatOrbit == 255 + mInitOrbit) { + mNCrateCloseTF++; + printf("New TF close RDH %d\n", rdh->feeId); + } + pointer += rdh->offsetToNext; + continue; + } + +#ifdef VERBOSE + std::cout << "--- RDH open detected" << std::endl; + o2::raw::HBFUtils::printRDH(*rdh); +#endif + + if ((rdh->pageCnt == 0) && (rdh->triggerType & o2::trigger::TF)) { + mNCrateOpenTF++; + mInitOrbit = rdh->heartbeatOrbit; + printf("New TF open RDH %d\n", rdh->feeId); + } + + pointer += rdh->headerSize; + + while (pointer < (reinterpret_cast(rdh) + rdh->memorySize)) { + + auto word = reinterpret_cast(pointer); + if ((*word & 0x80000000) != 0x80000000) { + printf(" %08x [ERROR] \n ", *(uint32_t*)pointer); + return; + } + + /** crate header detected **/ + auto crateHeader = reinterpret_cast(pointer); +#ifdef VERBOSE + printf(" %08x CrateHeader (drmID=%d) \n ", *(uint32_t*)pointer, crateHeader->drmID); +#endif + pointer += 4; + + /** crate orbit expected **/ + auto crateOrbit = reinterpret_cast(pointer); +#ifdef VERBOSE + printf(" %08x CrateOrbit (orbit=0x%08x) \n ", *(uint32_t*)pointer, crateOrbit->orbitID); +#endif + pointer += 4; + + while (true) { + word = reinterpret_cast(pointer); + + /** crate trailer detected **/ + if (*word & 0x80000000) { + auto crateTrailer = reinterpret_cast(pointer); +#ifdef VERBOSE + printf(" %08x CrateTrailer (numberOfDiagnostics=%d) \n ", *(uint32_t*)pointer, crateTrailer->numberOfDiagnostics); +#endif + pointer += 4; + + /** loop over diagnostics **/ + for (int i = 0; i < crateTrailer->numberOfDiagnostics; ++i) { + auto diagnostic = reinterpret_cast(pointer); +#ifdef VERBOSE + printf(" %08x Diagnostic (slotId=%d) \n ", *(uint32_t*)pointer, diagnostic->slotID); +#endif + pointer += 4; + } + + break; + } + + /** frame header detected **/ + auto frameHeader = reinterpret_cast(pointer); +#ifdef VERBOSE + printf(" %08x FrameHeader (numberOfHits=%d) \n ", *(uint32_t*)pointer, frameHeader->numberOfHits); +#endif + pointer += 4; + + /** loop over hits **/ + for (int i = 0; i < frameHeader->numberOfHits; ++i) { + auto packedHit = reinterpret_cast(pointer); +#ifdef VERBOSE + printf(" %08x PackedHit (tdcID=%d) \n ", *(uint32_t*)pointer, packedHit->tdcID); +#endif + auto indexE = packedHit->channel + + 8 * packedHit->tdcID + + 120 * packedHit->chain + + 240 * (frameHeader->trmID - 3) + + 2400 * crateHeader->drmID; + int time = packedHit->time; + time += (frameHeader->frameID << 13); + + // fill hit + mDecoder.InsertDigit(crateHeader->drmID, frameHeader->trmID, packedHit->tdcID, packedHit->chain, packedHit->channel, crateOrbit->orbitID, crateHeader->bunchID, frameHeader->frameID << 13, packedHit->time, packedHit->tot); + + pointer += 4; + } + } + } + + pointer = reinterpret_cast(rdh) + rdh->offsetToNext; + } + } + + if (mNCrateOpenTF == 72 && mNCrateOpenTF == mNCrateCloseTF) + mHasToBePosted = true; + + if (mHasToBePosted) { + postData(pc); + } +} + +DataProcessorSpec getCompressedDecodingSpec(std::string inputDesc) +{ + std::vector outputs; + outputs.emplace_back(o2::header::gDataOriginTOF, "DIGITS", 0, Lifetime::Timeframe); + outputs.emplace_back(o2::header::gDataOriginTOF, "READOUTWINDOW", 0, Lifetime::Timeframe); + outputs.emplace_back(o2::header::gDataOriginTOF, "ROMode", 0, Lifetime::Timeframe); + + return DataProcessorSpec{ + "tof-compressed-decoder", + select(std::string("x:TOF/" + inputDesc).c_str()), + outputs, + AlgorithmSpec{adaptFromTask()}, + Options{}}; +} + +} // namespace tof +} // namespace o2 diff --git a/Detectors/TOF/workflow/src/RawReaderSpec.cxx b/Detectors/TOF/workflow/src/RawReaderSpec.cxx deleted file mode 100644 index 2630283aa3510..0000000000000 --- a/Detectors/TOF/workflow/src/RawReaderSpec.cxx +++ /dev/null @@ -1,113 +0,0 @@ -// Copyright CERN and copyright holders of ALICE O2. This software is -// distributed under the terms of the GNU General Public License v3 (GPL -// Version 3), copied verbatim in the file "COPYING". -// -// See http://alice-o2.web.cern.ch/license for full licensing information. -// -// In applying this license CERN does not waive the privileges and immunities -// granted to it by virtue of its status as an Intergovernmental Organization -// or submit itself to any jurisdiction. - -/// @file RawReaderSpec.cxx - -#include - -#include "TTree.h" - -#include "Framework/ControlService.h" -#include "Framework/ConfigParamRegistry.h" -#include "TOFWorkflow/RawReaderSpec.h" -#include "DataFormatsParameters/GRPObject.h" -#include "TOFBase/Geo.h" -#include - -using namespace o2::framework; -using namespace o2::tof; - -namespace o2 -{ -namespace tof -{ - -void RawReader::init(InitContext& ic) -{ - LOG(INFO) << "Init Raw reader!"; - mFilename = ic.options().get("tof-raw-infile"); - mState = 1; - - /* - std::ifstream f(mFilename.c_str(), std::ifstream::in); - - if(f.good()){ - mState = 1; - LOG(INFO) << "TOF: TOF Raw file " << mFilename.c_str() << " found"; - f.close(); - } - else{ - mState = 2; - LOG(ERROR) << "TOF: TOF Raw file " << mFilename.c_str() << " not found"; - } -*/ -} - -void RawReader::run(ProcessingContext& pc) -{ - if (mState != 1) { - return; - } - - printf("Run TOF compressed decoding\n"); - - mState = 2; - - o2::tof::compressed::Decoder decoder; - - decoder.open(mFilename.c_str()); - decoder.setVerbose(0); - - // decode raw to digit here - std::vector digitsTemp; - - printf("start decoding raw\n"); - decoder.decode(); - printf("end decoding raw\n"); - - std::vector* alldigits = decoder.getDigitPerTimeFrame(); - std::vector* row = decoder.getReadoutWindowData(); - - int n_tof_window = row->size(); - int n_orbits = n_tof_window / 3; - int digit_size = alldigits->size(); - - LOG(INFO) << "TOF: N tof window decoded = " << n_tof_window << "(orbits = " << n_orbits << ") with " << digit_size << " digits"; - - // add digits in the output snapshot - pc.outputs().snapshot(Output{o2::header::gDataOriginTOF, "DIGITS", 0, Lifetime::Timeframe}, *alldigits); - pc.outputs().snapshot(Output{o2::header::gDataOriginTOF, "READOUTWINDOW", 0, Lifetime::Timeframe}, *row); - - static o2::parameters::GRPObject::ROMode roMode = o2::parameters::GRPObject::CONTINUOUS; - - LOG(INFO) << "TOF: Sending ROMode= " << roMode << " to GRPUpdater"; - pc.outputs().snapshot(Output{o2::header::gDataOriginTOF, "ROMode", 0, Lifetime::Timeframe}, roMode); - - //pc.services().get().readyToQuit(QuitRequest::Me); - pc.services().get().endOfStream(); -} - -DataProcessorSpec getRawReaderSpec() -{ - std::vector outputs; - outputs.emplace_back(o2::header::gDataOriginTOF, "DIGITS", 0, Lifetime::Timeframe); - outputs.emplace_back(o2::header::gDataOriginTOF, "READOUTWINDOW", 0, Lifetime::Timeframe); - outputs.emplace_back(o2::header::gDataOriginTOF, "ROMode", 0, Lifetime::Timeframe); - - return DataProcessorSpec{ - "tof-raw-reader", - Inputs{}, - outputs, - AlgorithmSpec{adaptFromTask()}, - Options{{"tof-raw-infile", VariantType::String, "cmptof.bin", {"Name of the input file"}}}}; -} - -} // namespace tof -} // namespace o2 diff --git a/Detectors/TOF/workflow/src/TOFRawWriterSpec.cxx b/Detectors/TOF/workflow/src/TOFRawWriterSpec.cxx index 94350e230976e..c831baf43ff1f 100644 --- a/Detectors/TOF/workflow/src/TOFRawWriterSpec.cxx +++ b/Detectors/TOF/workflow/src/TOFRawWriterSpec.cxx @@ -50,6 +50,10 @@ void RawWriter::run(ProcessingContext& pc) int nwindowperorbit = Geo::NWINDOW_IN_ORBIT; int nwindowintimeframe = 256 * nwindowperorbit; + int nwindowFilled = nwindow; + if (nwindowFilled % nwindowintimeframe) { + nwindowFilled = (nwindowFilled / nwindowintimeframe + 1) * nwindowintimeframe; + } std::vector emptyWindow; @@ -57,7 +61,7 @@ void RawWriter::run(ProcessingContext& pc) std::vector> digitWindows; - for (int i = 0; i < nwindowintimeframe; i += nwindowperorbit) { // encode 3 tof windows (1 orbit) + for (int i = 0; i < nwindowFilled; i += nwindowperorbit) { // encode 3 tof windows (1 orbit) if (verbosity) printf("----------\nwindow = %d - %d\n----------\n", i, i + nwindowperorbit - 1); diff --git a/Steer/DigitizerWorkflow/src/TOFDigitizerSpec.cxx b/Steer/DigitizerWorkflow/src/TOFDigitizerSpec.cxx index 3a6a9b949cec4..604a5e7e67edd 100644 --- a/Steer/DigitizerWorkflow/src/TOFDigitizerSpec.cxx +++ b/Steer/DigitizerWorkflow/src/TOFDigitizerSpec.cxx @@ -144,7 +144,34 @@ DataProcessorSpec getTOFDigitizerSpec(int channel, bool useCCDB) // call actual digitization procedure labels->clear(); digits->clear(); - digitizer->process(&hits, digits.get()); + if (digitizer->process(&hits, digits.get())) { + // Post Data + if (digitizer->isContinuous()) { + digits->clear(); + labels->clear(); + digitizer->flushOutputContainer(*digits.get()); + } + + std::vector* digitsVector = digitizer->getDigitPerTimeFrame(); + std::vector* readoutwindow = digitizer->getReadoutWindowData(); + std::vector>* mcLabVecOfVec = digitizer->getMCTruthPerTimeFrame(); + + LOG(INFO) << "Post " << digitsVector->size() << " digits in " << readoutwindow->size() << " RO windows"; + + // here we have all digits and we can send them to consumer (aka snapshot it onto output) + pc.outputs().snapshot(Output{o2::header::gDataOriginTOF, "DIGITS", 0, Lifetime::Timeframe}, *digitsVector); + pc.outputs().snapshot(Output{o2::header::gDataOriginTOF, "DIGITSMCTR", 0, Lifetime::Timeframe}, *mcLabVecOfVec); + pc.outputs().snapshot(Output{o2::header::gDataOriginTOF, "READOUTWINDOW", 0, Lifetime::Timeframe}, *readoutwindow); + LOG(INFO) << "TOF: Sending ROMode= " << roMode << " to GRPUpdater"; + pc.outputs().snapshot(Output{o2::header::gDataOriginTOF, "ROMode", 0, Lifetime::Timeframe}, roMode); + + // go to new TF + digitizer->newTF(); + + digitizer->setEventTime(timesview[collID].timeNS); + + digitizer->process(&hits, digits.get()); + } // copy digits into accumulator //std::copy(digits->begin(), digits->end(), std::back_inserter(*digitsAccum.get())); //labelAccum.mergeAtBack(*labels); @@ -155,13 +182,14 @@ DataProcessorSpec getTOFDigitizerSpec(int channel, bool useCCDB) digits->clear(); labels->clear(); digitizer->flushOutputContainer(*digits.get()); - LOG(INFO) << "FLUSHING LEFTOVER STUFF " << digits->size(); } std::vector* digitsVector = digitizer->getDigitPerTimeFrame(); std::vector* readoutwindow = digitizer->getReadoutWindowData(); std::vector>* mcLabVecOfVec = digitizer->getMCTruthPerTimeFrame(); + LOG(INFO) << "Post " << digitsVector->size() << " digits in " << readoutwindow->size() << " RO windows"; + // here we have all digits and we can send them to consumer (aka snapshot it onto output) pc.outputs().snapshot(Output{o2::header::gDataOriginTOF, "DIGITS", 0, Lifetime::Timeframe}, *digitsVector); pc.outputs().snapshot(Output{o2::header::gDataOriginTOF, "DIGITSMCTR", 0, Lifetime::Timeframe}, *mcLabVecOfVec); @@ -174,6 +202,7 @@ DataProcessorSpec getTOFDigitizerSpec(int channel, bool useCCDB) // we should be only called once; tell DPL that this process is ready to exit pc.services().get().readyToQuit(QuitRequest::Me); + finished = true; }; From eb5bc89f411f951c1e6023f4212a5cf0a6e252ee Mon Sep 17 00:00:00 2001 From: Roberto Preghenella Date: Wed, 26 Feb 2020 11:20:03 +0530 Subject: [PATCH 3/4] Add base compressed-decoder class and adapt compressed decoder workflow --- Detectors/TOF/compression/CMakeLists.txt | 5 +- .../src/CompressedInspectorTask.cxx | 196 --------------- Detectors/TOF/compression/src/Compressor.cxx | 10 +- .../src/tof-compressed-inspector.cxx | 2 +- Detectors/TOF/reconstruction/CMakeLists.txt | 2 + .../include/TOFReconstruction/DecoderBase.h | 93 +++++++ .../TOF/reconstruction/src/DecoderBase.cxx | 237 ++++++++++++++++++ Detectors/TOF/workflow/CMakeLists.txt | 2 +- .../TOFWorkflow/CompressedDecodingTask.h | 10 +- .../TOFWorkflow}/CompressedInspectorTask.h | 14 +- .../workflow/src/CompressedDecodingTask.cxx | 138 +++------- .../workflow/src/CompressedInspectorTask.cxx | 134 ++++++++++ 12 files changed, 528 insertions(+), 315 deletions(-) delete mode 100644 Detectors/TOF/compression/src/CompressedInspectorTask.cxx create mode 100644 Detectors/TOF/reconstruction/include/TOFReconstruction/DecoderBase.h create mode 100644 Detectors/TOF/reconstruction/src/DecoderBase.cxx rename Detectors/TOF/{compression/include/TOFCompression => workflow/include/TOFWorkflow}/CompressedInspectorTask.h (67%) create mode 100644 Detectors/TOF/workflow/src/CompressedInspectorTask.cxx diff --git a/Detectors/TOF/compression/CMakeLists.txt b/Detectors/TOF/compression/CMakeLists.txt index 78debc8f8f445..9dcbfc25f2c94 100644 --- a/Detectors/TOF/compression/CMakeLists.txt +++ b/Detectors/TOF/compression/CMakeLists.txt @@ -11,7 +11,6 @@ o2_add_library(TOFCompression SOURCES src/Compressor.cxx src/CompressorTask.cxx - src/CompressedInspectorTask.cxx PUBLIC_LINK_LIBRARIES O2::TOFBase O2::Framework O2::Headers O2::DataFormatsTOF O2::DetectorsRaw ) @@ -25,5 +24,7 @@ o2_add_executable(compressor o2_add_executable(compressed-inspector COMPONENT_NAME tof SOURCES src/tof-compressed-inspector.cxx - PUBLIC_LINK_LIBRARIES O2::TOFCompression + PUBLIC_LINK_LIBRARIES O2::TOFWorkflowUtils ) + + diff --git a/Detectors/TOF/compression/src/CompressedInspectorTask.cxx b/Detectors/TOF/compression/src/CompressedInspectorTask.cxx deleted file mode 100644 index cbd4e47884198..0000000000000 --- a/Detectors/TOF/compression/src/CompressedInspectorTask.cxx +++ /dev/null @@ -1,196 +0,0 @@ -// Copyright CERN and copyright holders of ALICE O2. This software is -// distributed under the terms of the GNU General Public License v3 (GPL -// Version 3), copied verbatim in the file "COPYING". -// -// See http://alice-o2.web.cern.ch/license for full licensing information. -// -// In applying this license CERN does not waive the privileges and immunities -// granted to it by virtue of its status as an Intergovernmental Organization -// or submit itself to any jurisdiction. - -/// @file CompressedInspectorTask.cxx -/// @author Roberto Preghenella -/// @since 2020-01-25 -/// @brief TOF compressed data inspector task - -#include "TOFCompression/CompressedInspectorTask.h" -#include "Framework/ControlService.h" -#include "Framework/ConfigParamRegistry.h" - -#include "Headers/RAWDataHeader.h" -#include "DataFormatsTOF/CompressedDataFormat.h" -#include "DetectorsRaw/HBFUtils.h" - -#include "TFile.h" -#include "TH1F.h" -#include "TH2F.h" - -using namespace o2::framework; - -namespace o2 -{ -namespace tof -{ - -void CompressedInspectorTask::init(InitContext& ic) -{ - LOG(INFO) << "CompressedInspector init"; - auto filename = ic.options().get("tof-inspector-filename"); - - /** open file **/ - if (mFile && mFile->IsOpen()) { - LOG(WARNING) << "a file was already open, closing"; - mFile->Close(); - delete mFile; - } - mFile = TFile::Open(filename.c_str(), "RECREATE"); - if (!mFile || !mFile->IsOpen()) { - LOG(ERROR) << "cannot open output file: " << filename; - mStatus = true; - return; - } - - mHistos1D["hHisto"] = new TH1F("hHisto", "", 1000, 0., 1000.); - mHistos1D["time"] = new TH1F("hTime", ";time (24.4 ps)", 2097152, 0., 2097152.); - mHistos1D["tot"] = new TH1F("hTOT", ";ToT (48.8 ps)", 2048, 0., 2048.); - mHistos1D["indexE"] = new TH1F("hIndexE", ";index EO", 172800, 0., 172800.); - mHistos2D["slotEnableMask"] = new TH2F("hSlotEnableMask", ";crate;slot", 72, 0., 72., 12, 1., 13.); - mHistos2D["diagnostic"] = new TH2F("hDiagnostic", ";crate;slot", 72, 0., 72., 12, 1., 13.); - - auto finishFunction = [this]() { - LOG(INFO) << "CompressedInspector finish"; - for (auto& histo : mHistos1D) - histo.second->Write(); - for (auto& histo : mHistos2D) - histo.second->Write(); - mFile->Close(); - }; - ic.services().get().set(CallbackService::Id::Stop, finishFunction); -} - -void CompressedInspectorTask::run(ProcessingContext& pc) -{ - LOG(DEBUG) << "CompressedInspector run"; - - /** check status **/ - if (mStatus) { - pc.services().get().readyToQuit(QuitRequest::Me); - return; - } - - /** receive input **/ - for (auto& input : pc.inputs()) { - - /** input **/ - const auto* headerIn = DataRefUtils::getHeader(input); - auto payloadIn = const_cast(input.payload); - auto payloadInSize = headerIn->payloadSize; - - /** process input **/ - auto pointer = payloadIn; - while (pointer < (payloadIn + payloadInSize)) { - auto rdh = reinterpret_cast(pointer); - - /** RDH close detected **/ - if (rdh->stop) { -#ifdef VERBOSE - std::cout << "--- RDH close detected" << std::endl; - o2::raw::HBFUtils::printRDH(*rdh); -#endif - pointer += rdh->offsetToNext; - continue; - } - -#ifdef VERBOSE - std::cout << "--- RDH open detected" << std::endl; - o2::raw::HBFUtils::printRDH(*rdh); -#endif - - pointer += rdh->headerSize; - - while (pointer < (reinterpret_cast(rdh) + rdh->memorySize)) { - - auto word = reinterpret_cast(pointer); - if ((*word & 0x80000000) != 0x80000000) { - printf(" %08x [ERROR] \n ", *(uint32_t*)pointer); - return; - } - - /** crate header detected **/ - auto crateHeader = reinterpret_cast(pointer); -#ifdef VERBOSE - printf(" %08x CrateHeader (drmID=%d) \n ", *(uint32_t*)pointer, crateHeader->drmID); -#endif - for (int ibit = 0; ibit < 11; ++ibit) - if (crateHeader->slotEnableMask & (1 << ibit)) - mHistos2D["slotEnableMask"]->Fill(crateHeader->drmID, ibit + 2); - pointer += 4; - - /** crate orbit expected **/ - auto crateOrbit = reinterpret_cast(pointer); -#ifdef VERBOSE - printf(" %08x CrateOrbit (orbit=0x%08x) \n ", *(uint32_t*)pointer, crateOrbit->orbitID); -#endif - pointer += 4; - - while (true) { - word = reinterpret_cast(pointer); - - /** crate trailer detected **/ - if (*word & 0x80000000) { - auto crateTrailer = reinterpret_cast(pointer); -#ifdef VERBOSE - printf(" %08x CrateTrailer (numberOfDiagnostics=%d) \n ", *(uint32_t*)pointer, crateTrailer->numberOfDiagnostics); -#endif - pointer += 4; - - /** loop over diagnostics **/ - for (int i = 0; i < crateTrailer->numberOfDiagnostics; ++i) { - auto diagnostic = reinterpret_cast(pointer); -#ifdef VERBOSE - printf(" %08x Diagnostic (slotId=%d) \n ", *(uint32_t*)pointer, diagnostic->slotID); -#endif - mHistos2D["diagnostic"]->Fill(crateHeader->drmID, diagnostic->slotID); - pointer += 4; - } - - break; - } - - /** frame header detected **/ - auto frameHeader = reinterpret_cast(pointer); -#ifdef VERBOSE - printf(" %08x FrameHeader (numberOfHits=%d) \n ", *(uint32_t*)pointer, frameHeader->numberOfHits); -#endif - mHistos1D["hHisto"]->Fill(frameHeader->numberOfHits); - pointer += 4; - - /** loop over hits **/ - for (int i = 0; i < frameHeader->numberOfHits; ++i) { - auto packedHit = reinterpret_cast(pointer); -#ifdef VERBOSE - printf(" %08x PackedHit (tdcID=%d) \n ", *(uint32_t*)pointer, packedHit->tdcID); -#endif - auto indexE = packedHit->channel + - 8 * packedHit->tdcID + - 120 * packedHit->chain + - 240 * (frameHeader->trmID - 3) + - 2400 * crateHeader->drmID; - int time = packedHit->time; - time += (frameHeader->frameID << 13); - - mHistos1D["indexE"]->Fill(indexE); - mHistos1D["time"]->Fill(time); - mHistos1D["tot"]->Fill(packedHit->tot); - pointer += 4; - } - } - } - - pointer = reinterpret_cast(rdh) + rdh->offsetToNext; - } - } -} - -} // namespace tof -} // namespace o2 diff --git a/Detectors/TOF/compression/src/Compressor.cxx b/Detectors/TOF/compression/src/Compressor.cxx index 7494d1e79ce11..16242b40abebf 100644 --- a/Detectors/TOF/compression/src/Compressor.cxx +++ b/Detectors/TOF/compression/src/Compressor.cxx @@ -20,11 +20,11 @@ #include #include -#define DECODER_PARANOID -#define DECODER_VERBOSE -#define ENCODER_VERBOSE -#define CHECKER_VERBOSE -#define CHECKER_COUNTER +//#define DECODER_PARANOID +//#define DECODER_VERBOSE +//#define ENCODER_VERBOSE +//#define CHECKER_VERBOSE +//#define CHECKER_COUNTER #ifdef DECODER_PARANOID #warning "Building code with DecoderParanoid option. This may limit the speed." diff --git a/Detectors/TOF/compression/src/tof-compressed-inspector.cxx b/Detectors/TOF/compression/src/tof-compressed-inspector.cxx index 23a42623033c7..a0f0b20a8ef06 100644 --- a/Detectors/TOF/compression/src/tof-compressed-inspector.cxx +++ b/Detectors/TOF/compression/src/tof-compressed-inspector.cxx @@ -13,7 +13,7 @@ /// @since 2019-12-18 /// @brief Basic DPL workflow for TOF raw data compression -#include "TOFCompression/CompressedInspectorTask.h" +#include "TOFWorkflow/CompressedInspectorTask.h" #include "Framework/WorkflowSpec.h" #include "Framework/ConfigParamSpec.h" #include "FairLogger.h" diff --git a/Detectors/TOF/reconstruction/CMakeLists.txt b/Detectors/TOF/reconstruction/CMakeLists.txt index 15f96109faad0..3bdd192d96c42 100644 --- a/Detectors/TOF/reconstruction/CMakeLists.txt +++ b/Detectors/TOF/reconstruction/CMakeLists.txt @@ -11,6 +11,7 @@ o2_add_library(TOFReconstruction SOURCES src/DataReader.cxx src/Clusterer.cxx src/ClustererTask.cxx src/Encoder.cxx + src/DecoderBase.cxx src/Decoder.cxx PUBLIC_LINK_LIBRARIES O2::TOFBase O2::DataFormatsTOF O2::SimulationDataFormat @@ -21,4 +22,5 @@ o2_target_root_dictionary(TOFReconstruction include/TOFReconstruction/Clusterer.h include/TOFReconstruction/ClustererTask.h include/TOFReconstruction/Encoder.h + include/TOFReconstruction/DecoderBase.h include/TOFReconstruction/Decoder.h) diff --git a/Detectors/TOF/reconstruction/include/TOFReconstruction/DecoderBase.h b/Detectors/TOF/reconstruction/include/TOFReconstruction/DecoderBase.h new file mode 100644 index 0000000000000..34a4fb991b27b --- /dev/null +++ b/Detectors/TOF/reconstruction/include/TOFReconstruction/DecoderBase.h @@ -0,0 +1,93 @@ +// Copyright CERN and copyright holders of ALICE O2. This software is +// distributed under the terms of the GNU General Public License v3 (GPL +// Version 3), copied verbatim in the file "COPYING". +// +// See http://alice-o2.web.cern.ch/license for full licensing information. +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +/// @file Decoder.h +/// @author Roberto Preghenella +/// @since 2020-02-24 +/// @brief TOF compressed data decoder + +#ifndef O2_TOF_DECODERBASE +#define O2_TOF_DECODERBASE + +#include +#include +#include +#include +#include "Headers/RAWDataHeader.h" +#include "DataFormatsTOF/CompressedDataFormat.h" + +namespace o2 +{ +namespace tof +{ +namespace compressed +{ + +class DecoderBase +{ + + public: + DecoderBase() = default; + ~DecoderBase() = default; + + inline bool run() + { + rewind(); + while (!processHBF()) + ; + return false; + }; + + inline void rewind() + { + decoderRewind(); + }; + + void setDecoderVerbose(bool val) { mDecoderVerbose = val; }; + void setDecoderBuffer(char* val) { mDecoderBuffer = val; }; + void setDecoderBufferSize(long val) { mDecoderBufferSize = val; }; + + private: + /** handlers **/ + + virtual void rdhHandler(const o2::header::RAWDataHeader* rdh){}; + virtual void headerHandler(const CrateHeader_t* crateHeader, const CrateOrbit_t* crateOrbit){}; + + virtual void frameHandler(const CrateHeader_t* crateHeader, const CrateOrbit_t* crateOrbit, + const FrameHeader_t* frameHeader, const PackedHit_t* packedHits){}; + + virtual void trailerHandler(const CrateHeader_t* crateHeader, const CrateOrbit_t* crateOrbit, + const CrateTrailer_t* crateTrailer, const Diagnostic_t* diagnostics){}; + + bool processHBF(); + bool processDRM(); + + /** decoder private functions and data members **/ + inline void decoderRewind() { mDecoderPointer = reinterpret_cast(mDecoderBuffer); }; + + char* mDecoderBuffer = nullptr; + long mDecoderBufferSize; + uint32_t* mDecoderPointer = nullptr; + uint32_t* mDecoderPointerMax = nullptr; + uint32_t* mDecoderPointerNext = nullptr; + o2::header::RAWDataHeader* mDecoderRDH; + bool mDecoderVerbose = false; + bool mDecoderError = false; + bool mDecoderFatal = false; + char mDecoderSaveBuffer[1048576]; + uint32_t mDecoderSaveBufferDataSize = 0; + uint32_t mDecoderSaveBufferDataLeft = 0; +}; + +} // namespace compressed +} // namespace tof +} // namespace o2 + +#endif /** O2_TOF_DECODERBASE **/ diff --git a/Detectors/TOF/reconstruction/src/DecoderBase.cxx b/Detectors/TOF/reconstruction/src/DecoderBase.cxx new file mode 100644 index 0000000000000..04ac3c3c96711 --- /dev/null +++ b/Detectors/TOF/reconstruction/src/DecoderBase.cxx @@ -0,0 +1,237 @@ +// Copyright CERN and copyright holders of ALICE O2. This software is +// distributed under the terms of the GNU General Public License v3 (GPL +// Version 3), copied verbatim in the file "COPYING". +// +// See http://alice-o2.web.cern.ch/license for full licensing information. +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +/// @file Decoder.cxx +/// @author Roberto Preghenella +/// @since 2020-02-24 +/// @brief TOF compressed data decoder base class + +#include "TOFReconstruction/DecoderBase.h" +#include "DetectorsRaw/HBFUtils.h" + +#include +#include + +//#define DECODER_PARANOID +//#define DECODER_VERBOSE + +#ifdef DECODER_PARANOID +#warning "Building code with DecoderParanoid option. This may limit the speed." +#endif +#ifdef DECODER_VERBOSE +#warning "Building code with DecoderVerbose option. This may limit the speed." +#endif + +#define colorReset "\033[0m" +#define colorRed "\033[1;31m" +#define colorGreen "\033[1;32m" +#define colorYellow "\033[1;33m" +#define colorBlue "\033[1;34m" + +namespace o2 +{ +namespace tof +{ +namespace compressed +{ + +bool DecoderBase::processHBF() +{ + +#ifdef DECODER_VERBOSE + if (mDecoderVerbose) { + std::cout << colorBlue + << "--- PROCESS HBF" + << colorReset + << std::endl; + } +#endif + + mDecoderRDH = reinterpret_cast(mDecoderPointer); + auto rdh = mDecoderRDH; + + /** loop until RDH close **/ + while (!rdh->stop) { + +#ifdef DECODER_VERBOSE + if (mDecoderVerbose) { + std::cout << colorBlue + << "--- RDH open/continue detected" + << colorReset + << std::endl; + o2::raw::HBFUtils::printRDH(*rdh); + } +#endif + + /** rdh handler **/ + rdhHandler(rdh); + + auto headerSize = rdh->headerSize; + auto memorySize = rdh->memorySize; + auto offsetToNext = rdh->offsetToNext; + auto drmPayload = memorySize - headerSize; + + /** copy DRM payload to save buffer **/ + std::memcpy(mDecoderSaveBuffer + mDecoderSaveBufferDataSize, reinterpret_cast(rdh) + headerSize, drmPayload); + mDecoderSaveBufferDataSize += drmPayload; + + /** move to next RDH **/ + rdh = reinterpret_cast(reinterpret_cast(rdh) + offsetToNext); + + /** check next RDH is within buffer **/ + if (reinterpret_cast(rdh) < mDecoderBuffer + mDecoderBufferSize) + continue; + + /** otherwise return **/ + return true; + } + +#ifdef DECODER_VERBOSE + if (mDecoderVerbose) { + std::cout << colorBlue + << "--- RDH close detected" + << colorReset + << std::endl; + o2::raw::HBFUtils::printRDH(*rdh); + } +#endif + + /** process DRM data **/ + mDecoderPointer = reinterpret_cast(mDecoderSaveBuffer); + mDecoderPointerMax = reinterpret_cast(mDecoderSaveBuffer + mDecoderSaveBufferDataSize); + while (mDecoderPointer < mDecoderPointerMax) { + if (processDRM()) + break; + } + mDecoderSaveBufferDataSize = 0; + + /** rdh handler **/ + rdhHandler(rdh); + +#ifdef DECODER_VERBOSE + if (mDecoderVerbose) { + std::cout << colorBlue + << "--- END PROCESS HBF" + << colorReset + << std::endl; + } +#endif + + /** move to next RDH **/ + mDecoderPointer = reinterpret_cast(reinterpret_cast(rdh) + rdh->offsetToNext); + + /** check next RDH is within buffer **/ + if (reinterpret_cast(mDecoderPointer) < mDecoderBuffer + mDecoderBufferSize) + return false; + + /** otherwise return **/ + return true; +} + +bool DecoderBase::processDRM() +{ + +#ifdef DECODER_VERBOSE + if (mDecoderVerbose) { + std::cout << colorBlue << "--- PROCESS DRM" + << colorReset + << std::endl; + } +#endif + + if ((*mDecoderPointer & 0x80000000) != 0x80000000) { +#ifdef DECODER_VERBOSE + if (mDecoderVerbose) { + printf(" %08x [ERROR] \n ", *mDecoderPointer); + } +#endif + return true; + } + + /** crate header detected **/ + auto crateHeader = reinterpret_cast(mDecoderPointer); +#ifdef DECODER_VERBOSE + if (mDecoderVerbose) { + printf(" %08x CrateHeader (drmID=%d) \n ", *mDecoderPointer, crateHeader->drmID); + } +#endif + mDecoderPointer++; + + /** crate orbit expected **/ + auto crateOrbit = reinterpret_cast(mDecoderPointer); +#ifdef DECODER_VERBOSE + if (mDecoderVerbose) { + printf(" %08x CrateOrbit (orbit=0x%08x) \n ", *mDecoderPointer, crateOrbit->orbitID); + } +#endif + mDecoderPointer++; + + /** header handler **/ + headerHandler(crateHeader, crateOrbit); + + while (true) { + + /** crate trailer detected **/ + if (*mDecoderPointer & 0x80000000) { + auto crateTrailer = reinterpret_cast(mDecoderPointer); +#ifdef DECODER_VERBOSE + if (mDecoderVerbose) { + printf(" %08x CrateTrailer (numberOfDiagnostics=%d) \n ", *mDecoderPointer, crateTrailer->numberOfDiagnostics); + } +#endif + mDecoderPointer++; + auto diagnostics = reinterpret_cast(mDecoderPointer); +#ifdef DECODER_VERBOSE + if (mDecoderVerbose) { + for (int i = 0; i < crateTrailer->numberOfDiagnostics; ++i) { + auto diagnostic = reinterpret_cast(mDecoderPointer + i); + printf(" %08x Diagnostic (slotId=%d) \n ", *(mDecoderPointer + i), diagnostic->slotID); + } + } +#endif + mDecoderPointer += crateTrailer->numberOfDiagnostics; + + /** trailer handler **/ + trailerHandler(crateHeader, crateOrbit, crateTrailer, diagnostics); + + return false; + } + + /** frame header detected **/ + auto frameHeader = reinterpret_cast(mDecoderPointer); +#ifdef DECODER_VERBOSE + if (mDecoderVerbose) { + printf(" %08x FrameHeader (numberOfHits=%d) \n ", *mDecoderPointer, frameHeader->numberOfHits); + } +#endif + mDecoderPointer++; + auto packedHits = reinterpret_cast(mDecoderPointer); +#ifdef DECODER_VERBOSE + if (mDecoderVerbose) { + for (int i = 0; i < frameHeader->numberOfHits; ++i) { + auto packedHit = reinterpret_cast(mDecoderPointer + 1); + printf(" %08x PackedHit (tdcID=%d) \n ", *(mDecoderPointer + 1), packedHit->tdcID); + packedHits++; + } + } +#endif + mDecoderPointer += frameHeader->numberOfHits; + + /** frame handler **/ + frameHandler(crateHeader, crateOrbit, frameHeader, packedHits); + } + + /** should never reach here **/ + return false; +} + +} // namespace compressed +} // namespace tof +} // namespace o2 diff --git a/Detectors/TOF/workflow/CMakeLists.txt b/Detectors/TOF/workflow/CMakeLists.txt index 25fd0dd508a1b..6711de02442c8 100644 --- a/Detectors/TOF/workflow/CMakeLists.txt +++ b/Detectors/TOF/workflow/CMakeLists.txt @@ -16,6 +16,6 @@ o2_add_library(TOFWorkflowUtils src/TOFDigitWriterSpec.cxx src/TOFRawWriterSpec.cxx src/CompressedDecodingTask.cxx + src/CompressedInspectorTask.cxx PUBLIC_LINK_LIBRARIES O2::Framework O2::TOFBase O2::DataFormatsTOF O2::TOFReconstruction) - diff --git a/Detectors/TOF/workflow/include/TOFWorkflow/CompressedDecodingTask.h b/Detectors/TOF/workflow/include/TOFWorkflow/CompressedDecodingTask.h index 94cfa13fde891..22d68aa53afc0 100644 --- a/Detectors/TOF/workflow/include/TOFWorkflow/CompressedDecodingTask.h +++ b/Detectors/TOF/workflow/include/TOFWorkflow/CompressedDecodingTask.h @@ -19,6 +19,7 @@ #include "Framework/Task.h" #include "Framework/DataProcessorSpec.h" #include +#include "TOFReconstruction/DecoderBase.h" #include "TOFReconstruction/Decoder.h" #include "TOFBase/Digit.h" @@ -29,7 +30,9 @@ namespace o2 namespace tof { -class CompressedDecodingTask : public Task +using namespace compressed; + +class CompressedDecodingTask : public DecoderBase, public Task { public: CompressedDecodingTask() = default; @@ -40,6 +43,11 @@ class CompressedDecodingTask : public Task void postData(ProcessingContext& pc); private: + /** decoding handlers **/ + void rdhHandler(const o2::header::RAWDataHeader* rdh); + void frameHandler(const CrateHeader_t* crateHeader, const CrateOrbit_t* crateOrbit, + const FrameHeader_t* frameHeader, const PackedHit_t* packedHits); + bool mStatus = false; o2::tof::compressed::Decoder mDecoder; std::vector> mDigits; diff --git a/Detectors/TOF/compression/include/TOFCompression/CompressedInspectorTask.h b/Detectors/TOF/workflow/include/TOFWorkflow/CompressedInspectorTask.h similarity index 67% rename from Detectors/TOF/compression/include/TOFCompression/CompressedInspectorTask.h rename to Detectors/TOF/workflow/include/TOFWorkflow/CompressedInspectorTask.h index 365a700cf30ba..d4987dcfa8950 100644 --- a/Detectors/TOF/compression/include/TOFCompression/CompressedInspectorTask.h +++ b/Detectors/TOF/workflow/include/TOFWorkflow/CompressedInspectorTask.h @@ -18,6 +18,7 @@ #include "Framework/Task.h" #include "Framework/DataProcessorSpec.h" +#include "TOFReconstruction/DecoderBase.h" #include class TFile; @@ -31,7 +32,9 @@ namespace o2 namespace tof { -class CompressedInspectorTask : public Task +using namespace compressed; + +class CompressedInspectorTask : public DecoderBase, public Task { public: CompressedInspectorTask() = default; @@ -40,6 +43,15 @@ class CompressedInspectorTask : public Task void run(ProcessingContext& pc) final; private: + /** decoding handlers **/ + void headerHandler(const CrateHeader_t* crateHeader, const CrateOrbit_t* crateOrbit) override; + + void frameHandler(const CrateHeader_t* crateHeader, const CrateOrbit_t* crateOrbit, + const FrameHeader_t* frameHeader, const PackedHit_t* packedHits) override; + + void trailerHandler(const CrateHeader_t* crateHeader, const CrateOrbit_t* crateOrbit, + const CrateTrailer_t* crateTrailer, const Diagnostic_t* diagnostics) override; + bool mStatus = false; TFile* mFile = nullptr; std::map mHistos1D; diff --git a/Detectors/TOF/workflow/src/CompressedDecodingTask.cxx b/Detectors/TOF/workflow/src/CompressedDecodingTask.cxx index 3657df5ed54f7..69620da39e069 100644 --- a/Detectors/TOF/workflow/src/CompressedDecodingTask.cxx +++ b/Detectors/TOF/workflow/src/CompressedDecodingTask.cxx @@ -91,114 +91,9 @@ void CompressedDecodingTask::run(ProcessingContext& pc) auto payloadIn = const_cast(input.payload); auto payloadInSize = headerIn->payloadSize; - /** process input **/ - auto pointer = payloadIn; - while (pointer < (payloadIn + payloadInSize)) { - auto rdh = reinterpret_cast(pointer); - - /** RDH close detected **/ - if (rdh->stop) { -#ifdef VERBOSE - std::cout << "--- RDH close detected" << std::endl; - o2::raw::HBFUtils::printRDH(*rdh); -#endif - if (rdh->heartbeatOrbit == 255 + mInitOrbit) { - mNCrateCloseTF++; - printf("New TF close RDH %d\n", rdh->feeId); - } - pointer += rdh->offsetToNext; - continue; - } - -#ifdef VERBOSE - std::cout << "--- RDH open detected" << std::endl; - o2::raw::HBFUtils::printRDH(*rdh); -#endif - - if ((rdh->pageCnt == 0) && (rdh->triggerType & o2::trigger::TF)) { - mNCrateOpenTF++; - mInitOrbit = rdh->heartbeatOrbit; - printf("New TF open RDH %d\n", rdh->feeId); - } - - pointer += rdh->headerSize; - - while (pointer < (reinterpret_cast(rdh) + rdh->memorySize)) { - - auto word = reinterpret_cast(pointer); - if ((*word & 0x80000000) != 0x80000000) { - printf(" %08x [ERROR] \n ", *(uint32_t*)pointer); - return; - } - - /** crate header detected **/ - auto crateHeader = reinterpret_cast(pointer); -#ifdef VERBOSE - printf(" %08x CrateHeader (drmID=%d) \n ", *(uint32_t*)pointer, crateHeader->drmID); -#endif - pointer += 4; - - /** crate orbit expected **/ - auto crateOrbit = reinterpret_cast(pointer); -#ifdef VERBOSE - printf(" %08x CrateOrbit (orbit=0x%08x) \n ", *(uint32_t*)pointer, crateOrbit->orbitID); -#endif - pointer += 4; - - while (true) { - word = reinterpret_cast(pointer); - - /** crate trailer detected **/ - if (*word & 0x80000000) { - auto crateTrailer = reinterpret_cast(pointer); -#ifdef VERBOSE - printf(" %08x CrateTrailer (numberOfDiagnostics=%d) \n ", *(uint32_t*)pointer, crateTrailer->numberOfDiagnostics); -#endif - pointer += 4; - - /** loop over diagnostics **/ - for (int i = 0; i < crateTrailer->numberOfDiagnostics; ++i) { - auto diagnostic = reinterpret_cast(pointer); -#ifdef VERBOSE - printf(" %08x Diagnostic (slotId=%d) \n ", *(uint32_t*)pointer, diagnostic->slotID); -#endif - pointer += 4; - } - - break; - } - - /** frame header detected **/ - auto frameHeader = reinterpret_cast(pointer); -#ifdef VERBOSE - printf(" %08x FrameHeader (numberOfHits=%d) \n ", *(uint32_t*)pointer, frameHeader->numberOfHits); -#endif - pointer += 4; - - /** loop over hits **/ - for (int i = 0; i < frameHeader->numberOfHits; ++i) { - auto packedHit = reinterpret_cast(pointer); -#ifdef VERBOSE - printf(" %08x PackedHit (tdcID=%d) \n ", *(uint32_t*)pointer, packedHit->tdcID); -#endif - auto indexE = packedHit->channel + - 8 * packedHit->tdcID + - 120 * packedHit->chain + - 240 * (frameHeader->trmID - 3) + - 2400 * crateHeader->drmID; - int time = packedHit->time; - time += (frameHeader->frameID << 13); - - // fill hit - mDecoder.InsertDigit(crateHeader->drmID, frameHeader->trmID, packedHit->tdcID, packedHit->chain, packedHit->channel, crateOrbit->orbitID, crateHeader->bunchID, frameHeader->frameID << 13, packedHit->time, packedHit->tot); - - pointer += 4; - } - } - } - - pointer = reinterpret_cast(rdh) + rdh->offsetToNext; - } + DecoderBase::setDecoderBuffer(payloadIn); + DecoderBase::setDecoderBufferSize(payloadInSize); + DecoderBase::run(); } if (mNCrateOpenTF == 72 && mNCrateOpenTF == mNCrateCloseTF) @@ -209,6 +104,33 @@ void CompressedDecodingTask::run(ProcessingContext& pc) } } +void CompressedDecodingTask::rdhHandler(const o2::header::RAWDataHeader* rdh) +{ + + // rdh close + if (rdh->stop && rdh->heartbeatOrbit == 255 + mInitOrbit) { + mNCrateCloseTF++; + printf("New TF close RDH %d\n", rdh->feeId); + return; + } + + // rdh open + if ((rdh->pageCnt == 0) && (rdh->triggerType & o2::trigger::TF)) { + mNCrateOpenTF++; + mInitOrbit = rdh->heartbeatOrbit; + printf("New TF open RDH %d\n", rdh->feeId); + } +}; + +void CompressedDecodingTask::frameHandler(const CrateHeader_t* crateHeader, const CrateOrbit_t* crateOrbit, + const FrameHeader_t* frameHeader, const PackedHit_t* packedHits) +{ + for (int i = 0; i < frameHeader->numberOfHits; ++i) { + auto packedHit = packedHits + i; + mDecoder.InsertDigit(crateHeader->drmID, frameHeader->trmID, packedHit->tdcID, packedHit->chain, packedHit->channel, crateOrbit->orbitID, crateHeader->bunchID, frameHeader->frameID << 13, packedHit->time, packedHit->tot); + } +}; + DataProcessorSpec getCompressedDecodingSpec(std::string inputDesc) { std::vector outputs; diff --git a/Detectors/TOF/workflow/src/CompressedInspectorTask.cxx b/Detectors/TOF/workflow/src/CompressedInspectorTask.cxx new file mode 100644 index 0000000000000..6357ac351d2b0 --- /dev/null +++ b/Detectors/TOF/workflow/src/CompressedInspectorTask.cxx @@ -0,0 +1,134 @@ +// Copyright CERN and copyright holders of ALICE O2. This software is +// distributed under the terms of the GNU General Public License v3 (GPL +// Version 3), copied verbatim in the file "COPYING". +// +// See http://alice-o2.web.cern.ch/license for full licensing information. +// +// In applying this license CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +/// @file CompressedInspectorTask.cxx +/// @author Roberto Preghenella +/// @since 2020-01-25 +/// @brief TOF compressed data inspector task + +#include "TOFWorkflow/CompressedInspectorTask.h" +#include "Framework/ControlService.h" +#include "Framework/ConfigParamRegistry.h" + +#include "Headers/RAWDataHeader.h" +#include "DataFormatsTOF/CompressedDataFormat.h" + +#include "TFile.h" +#include "TH1F.h" +#include "TH2F.h" + +using namespace o2::framework; + +namespace o2 +{ +namespace tof +{ + +void CompressedInspectorTask::init(InitContext& ic) +{ + LOG(INFO) << "CompressedInspector init"; + auto filename = ic.options().get("tof-inspector-filename"); + + /** open file **/ + if (mFile && mFile->IsOpen()) { + LOG(WARNING) << "a file was already open, closing"; + mFile->Close(); + delete mFile; + } + mFile = TFile::Open(filename.c_str(), "RECREATE"); + if (!mFile || !mFile->IsOpen()) { + LOG(ERROR) << "cannot open output file: " << filename; + mStatus = true; + return; + } + + mHistos1D["hHisto"] = new TH1F("hHisto", "", 1000, 0., 1000.); + mHistos1D["time"] = new TH1F("hTime", ";time (24.4 ps)", 2097152, 0., 2097152.); + mHistos1D["timebc"] = new TH1F("hTimeBC", ";time (24.4 ps)", 1024, 0., 1024.); + mHistos1D["tot"] = new TH1F("hTOT", ";ToT (48.8 ps)", 2048, 0., 2048.); + mHistos1D["indexE"] = new TH1F("hIndexE", ";index EO", 172800, 0., 172800.); + mHistos2D["slotEnableMask"] = new TH2F("hSlotEnableMask", ";crate;slot", 72, 0., 72., 12, 1., 13.); + mHistos2D["diagnostic"] = new TH2F("hDiagnostic", ";crate;slot", 72, 0., 72., 12, 1., 13.); + + auto finishFunction = [this]() { + LOG(INFO) << "CompressedInspector finish"; + for (auto& histo : mHistos1D) + histo.second->Write(); + for (auto& histo : mHistos2D) + histo.second->Write(); + mFile->Close(); + }; + ic.services().get().set(CallbackService::Id::Stop, finishFunction); +} + +void CompressedInspectorTask::run(ProcessingContext& pc) +{ + LOG(DEBUG) << "CompressedInspector run"; + + /** check status **/ + if (mStatus) { + pc.services().get().readyToQuit(QuitRequest::Me); + return; + } + + /** receive input **/ + for (auto& input : pc.inputs()) { + + /** input **/ + const auto* headerIn = DataRefUtils::getHeader(input); + auto payloadIn = const_cast(input.payload); + auto payloadInSize = headerIn->payloadSize; + + setDecoderBuffer(payloadIn); + setDecoderBufferSize(payloadInSize); + DecoderBase::run(); + } +} + +void CompressedInspectorTask::headerHandler(const CrateHeader_t* crateHeader, const CrateOrbit_t* crateOrbit) +{ + for (int ibit = 0; ibit < 11; ++ibit) + if (crateHeader->slotEnableMask & (1 << ibit)) + mHistos2D["slotEnableMask"]->Fill(crateHeader->drmID, ibit + 2); +}; + +void CompressedInspectorTask::frameHandler(const CrateHeader_t* crateHeader, const CrateOrbit_t* crateOrbit, + const FrameHeader_t* frameHeader, const PackedHit_t* packedHits) +{ + mHistos1D["hHisto"]->Fill(frameHeader->numberOfHits); + for (int i = 0; i < frameHeader->numberOfHits; ++i) { + auto packedHit = packedHits + i; + auto indexE = packedHit->channel + + 8 * packedHit->tdcID + + 120 * packedHit->chain + + 240 * (frameHeader->trmID - 3) + + 2400 * crateHeader->drmID; + int time = packedHit->time; + int timebc = time % 1024; + time += (frameHeader->frameID << 13); + + mHistos1D["indexE"]->Fill(indexE); + mHistos1D["time"]->Fill(time); + mHistos1D["timebc"]->Fill(timebc); + mHistos1D["tot"]->Fill(packedHit->tot); + } +}; + +void CompressedInspectorTask::trailerHandler(const CrateHeader_t* crateHeader, const CrateOrbit_t* crateOrbit, + const CrateTrailer_t* crateTrailer, const Diagnostic_t* diagnostics) +{ + for (int i = 0; i < crateTrailer->numberOfDiagnostics; ++i) { + auto diagnostic = diagnostics + i; + mHistos2D["diagnostic"]->Fill(crateHeader->drmID, diagnostic->slotID); + } +}; + +} // namespace tof +} // namespace o2 From 467618dfe29d005853669d00ced7eb9096df78a5 Mon Sep 17 00:00:00 2001 From: noferini Date: Wed, 26 Feb 2020 13:04:33 +0100 Subject: [PATCH 4/4] finalize multi-frame treatement in tof chain --- Detectors/TOF/simulation/src/Digitizer.cxx | 4 +- .../TOF/workflow/src/TOFDigitWriterSpec.cxx | 21 ++++--- .../src/TOFDigitizerSpec.cxx | 60 ++++++++++--------- 3 files changed, 47 insertions(+), 38 deletions(-) diff --git a/Detectors/TOF/simulation/src/Digitizer.cxx b/Detectors/TOF/simulation/src/Digitizer.cxx index 503c60f430c69..bc205c567529c 100644 --- a/Detectors/TOF/simulation/src/Digitizer.cxx +++ b/Detectors/TOF/simulation/src/Digitizer.cxx @@ -100,8 +100,8 @@ int Digitizer::process(const std::vector* hits, std::vector* dig } // close loop readout window } // close if continuous - if (mReadoutWindowCurrent >= 256 * Geo::NWINDOW_IN_ORBIT) // new TF - return 1; + // if (mReadoutWindowCurrent >= 256 * Geo::NWINDOW_IN_ORBIT) // new TF + // return 1; for (auto& hit : *hits) { //TODO: put readout window counting/selection diff --git a/Detectors/TOF/workflow/src/TOFDigitWriterSpec.cxx b/Detectors/TOF/workflow/src/TOFDigitWriterSpec.cxx index 253e30d9ea548..9d7a0dcbd7a09 100644 --- a/Detectors/TOF/workflow/src/TOFDigitWriterSpec.cxx +++ b/Detectors/TOF/workflow/src/TOFDigitWriterSpec.cxx @@ -35,10 +35,12 @@ template TBranch* getOrMakeBranch(TTree& tree, std::string brname, T* ptr) { if (auto br = tree.GetBranch(brname.c_str())) { - br->SetAddress(static_cast(&ptr)); + printf("Re-use output branch %s\n",brname.c_str()); + br->SetAddress(static_cast(ptr)); return br; } // otherwise make it + printf("Create output branch %s\n",brname.c_str()); return tree.Branch(brname.c_str(), ptr); } @@ -54,9 +56,13 @@ DataProcessorSpec getTOFDigitWriterSpec(bool useMC) auto outputfile = std::make_shared(filename.c_str(), "RECREATE"); auto outputtree = std::make_shared(treename.c_str(), treename.c_str()); + int *nCalls = new int; + *nCalls = 0; + // the callback to be set as hook at stop of processing for the framework - auto finishWriting = [outputfile, outputtree]() { - outputtree->SetEntries(1); + auto finishWriting = [outputfile, outputtree, nCalls]() { + printf("finish writing with %d entries in the tree\n",*nCalls); + outputtree->SetEntries(*nCalls); outputtree->Write(); outputfile->Close(); }; @@ -66,15 +72,16 @@ DataProcessorSpec getTOFDigitWriterSpec(bool useMC) // using by-copy capture of the worker instance shared pointer // the shared pointer makes sure to clean up the instance when the processing // function gets out of scope - auto processingFct = [outputfile, outputtree, useMC](ProcessingContext& pc) { + auto processingFct = [outputfile, outputtree, useMC, nCalls](ProcessingContext& pc) { static bool finished = false; if (finished) { // avoid being executed again when marked as finished; return; } - + (*nCalls)++; // retrieve the digits from the input auto indata = pc.inputs().get*>("tofdigits"); + LOG(INFO) << "Call " << *nCalls; LOG(INFO) << "RECEIVED DIGITS SIZE " << indata->size(); auto row = pc.inputs().get*>("readoutwin"); LOG(INFO) << "RECEIVED READOUT WINDOWS " << row->size(); @@ -100,8 +107,8 @@ DataProcessorSpec getTOFDigitWriterSpec(bool useMC) labelbr->Fill(); } - finished = true; - pc.services().get().readyToQuit(QuitRequest::Me); +// finished = true; +// pc.services().get().readyToQuit(QuitRequest::Me); }; // return the actual processing function as a lambda function using variables diff --git a/Steer/DigitizerWorkflow/src/TOFDigitizerSpec.cxx b/Steer/DigitizerWorkflow/src/TOFDigitizerSpec.cxx index 604a5e7e67edd..741efd94c5549 100644 --- a/Steer/DigitizerWorkflow/src/TOFDigitizerSpec.cxx +++ b/Steer/DigitizerWorkflow/src/TOFDigitizerSpec.cxx @@ -144,34 +144,35 @@ DataProcessorSpec getTOFDigitizerSpec(int channel, bool useCCDB) // call actual digitization procedure labels->clear(); digits->clear(); - if (digitizer->process(&hits, digits.get())) { - // Post Data - if (digitizer->isContinuous()) { - digits->clear(); - labels->clear(); - digitizer->flushOutputContainer(*digits.get()); - } - - std::vector* digitsVector = digitizer->getDigitPerTimeFrame(); - std::vector* readoutwindow = digitizer->getReadoutWindowData(); - std::vector>* mcLabVecOfVec = digitizer->getMCTruthPerTimeFrame(); - - LOG(INFO) << "Post " << digitsVector->size() << " digits in " << readoutwindow->size() << " RO windows"; - - // here we have all digits and we can send them to consumer (aka snapshot it onto output) - pc.outputs().snapshot(Output{o2::header::gDataOriginTOF, "DIGITS", 0, Lifetime::Timeframe}, *digitsVector); - pc.outputs().snapshot(Output{o2::header::gDataOriginTOF, "DIGITSMCTR", 0, Lifetime::Timeframe}, *mcLabVecOfVec); - pc.outputs().snapshot(Output{o2::header::gDataOriginTOF, "READOUTWINDOW", 0, Lifetime::Timeframe}, *readoutwindow); - LOG(INFO) << "TOF: Sending ROMode= " << roMode << " to GRPUpdater"; - pc.outputs().snapshot(Output{o2::header::gDataOriginTOF, "ROMode", 0, Lifetime::Timeframe}, roMode); - - // go to new TF - digitizer->newTF(); - - digitizer->setEventTime(timesview[collID].timeNS); - - digitizer->process(&hits, digits.get()); - } + digitizer->process(&hits, digits.get()); + // if (digitizer->process(&hits, digits.get())) { + // // Post Data + // if (digitizer->isContinuous()) { + // digits->clear(); + // labels->clear(); + // digitizer->flushOutputContainer(*digits.get()); + // } + + // std::vector* digitsVector = digitizer->getDigitPerTimeFrame(); + // std::vector* readoutwindow = digitizer->getReadoutWindowData(); + // std::vector>* mcLabVecOfVec = digitizer->getMCTruthPerTimeFrame(); + + // LOG(INFO) << "Post " << digitsVector->size() << " digits in " << readoutwindow->size() << " RO windows"; + + // // here we have all digits and we can send them to consumer (aka snapshot it onto output) + // pc.outputs().snapshot(Output{o2::header::gDataOriginTOF, "DIGITS", 0, Lifetime::Timeframe}, *digitsVector); + // pc.outputs().snapshot(Output{o2::header::gDataOriginTOF, "DIGITSMCTR", 0, Lifetime::Timeframe}, *mcLabVecOfVec); + // pc.outputs().snapshot(Output{o2::header::gDataOriginTOF, "READOUTWINDOW", 0, Lifetime::Timeframe}, *readoutwindow); + // LOG(INFO) << "TOF: Sending ROMode= " << roMode << " to GRPUpdater"; + // pc.outputs().snapshot(Output{o2::header::gDataOriginTOF, "ROMode", 0, Lifetime::Timeframe}, roMode); + + // // go to new TF + // digitizer->newTF(); + + // digitizer->setEventTime(timesview[collID].timeNS); + + // digitizer->process(&hits, digits.get()); + // } // copy digits into accumulator //std::copy(digits->begin(), digits->end(), std::back_inserter(*digitsAccum.get())); //labelAccum.mergeAtBack(*labels); @@ -201,7 +202,8 @@ DataProcessorSpec getTOFDigitizerSpec(int channel, bool useCCDB) LOG(INFO) << "Digitization took " << timer.CpuTime() << "s"; // we should be only called once; tell DPL that this process is ready to exit - pc.services().get().readyToQuit(QuitRequest::Me); + pc.services().get().endOfStream(); + //pc.services().get().readyToQuit(QuitRequest::Me); finished = true; };