From 5156107e8f80ac849d8be45d8938cb654e9de89e Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Sun, 22 Aug 2021 00:09:07 +0200 Subject: [PATCH 1/2] DPL: handle region notifications as required by GPU processing * Adds an option, --expected-region-callbacks , to wait for enough callback before going to start. * Deliver callbacks on the main thread, always. --- .../include/Framework/DataProcessingDevice.h | 4 + Framework/Core/src/DataProcessingDevice.cxx | 77 ++++++++++++------- Framework/Core/src/DeviceSpecHelpers.cxx | 2 + Framework/Core/src/runDataProcessing.cxx | 1 + 4 files changed, 56 insertions(+), 28 deletions(-) diff --git a/Framework/Core/include/Framework/DataProcessingDevice.h b/Framework/Core/include/Framework/DataProcessingDevice.h index 249fbc4b305c6..db12bebf7247d 100644 --- a/Framework/Core/include/Framework/DataProcessingDevice.h +++ b/Framework/Core/include/Framework/DataProcessingDevice.h @@ -61,6 +61,7 @@ struct DeviceContext { DeviceState* state = nullptr; ComputingQuotaEvaluator* quotaEvaluator = nullptr; DataProcessingStats* stats = nullptr; + int expectedRegionCallbacks = 0; }; struct DataProcessorContext { @@ -166,6 +167,9 @@ class DataProcessingDevice : public FairMQDevice std::vector mHandles; /// Handles to use to schedule work. std::vector mStreams; /// Information about the task running in the associated mHandle. ComputingQuotaEvaluator& mQuotaEvaluator; /// The component which evaluates if the offer can be used to run a task + /// Handle to wake up the main loop from other threads + /// e.g. when FairMQ notifies some callback in an asynchronous way + uv_async_t* mAwakeHandle = nullptr; }; } // namespace o2::framework diff --git a/Framework/Core/src/DataProcessingDevice.cxx b/Framework/Core/src/DataProcessingDevice.cxx index e2f8b25b66d76..48792bfddb9ea 100644 --- a/Framework/Core/src/DataProcessingDevice.cxx +++ b/Framework/Core/src/DataProcessingDevice.cxx @@ -84,6 +84,12 @@ void on_idle_timer(uv_timer_t* handle) state->loopReason |= DeviceState::TIMER_EXPIRED; } +void on_communication_requested(uv_async_t* s) +{ + DeviceState* state = (DeviceState*)s->data; + state->loopReason |= DeviceState::METRICS_MUST_FLUSH; +} + DataProcessingDevice::DataProcessingDevice(RunningDeviceRef ref, ServiceRegistry& registry) : mSpec{registry.get().devices[ref.index]}, mState{registry.get()}, @@ -94,7 +100,8 @@ DataProcessingDevice::DataProcessingDevice(RunningDeviceRef ref, ServiceRegistry mConfigRegistry{nullptr}, mServiceRegistry{registry}, mAllocator{&mTimingInfo, ®istry, mSpec.outputs}, - mQuotaEvaluator{registry.get()} + mQuotaEvaluator{registry.get()}, + mAwakeHandle{nullptr} { /// FIXME: move erro handling to a service? if (mError != nullptr) { @@ -127,6 +134,29 @@ DataProcessingDevice::DataProcessingDevice(RunningDeviceRef ref, ServiceRegistry // One task for now. mStreams.resize(1); mHandles.resize(1); + + mDeviceContext.device = this; + mDeviceContext.spec = &mSpec; + mDeviceContext.state = &mState; + mDeviceContext.quotaEvaluator = &mQuotaEvaluator; + mDeviceContext.stats = &mStats; + + mAwakeHandle = (uv_async_t*)malloc(sizeof(uv_async_t)); + assert(mState.loop); + int res = uv_async_init(mState.loop, mAwakeHandle, on_communication_requested); + mAwakeHandle->data = &mState; + if (res < 0) { + LOG(ERROR) << "Unable to initialise subscription"; + } + + /// This should post a message on the queue... + SubscribeToNewTransition("dpl", [wakeHandle = mAwakeHandle, deviceContext = &mDeviceContext](fair::mq::Transition t) { + int res = uv_async_send(wakeHandle); + if (res < 0) { + LOG(ERROR) << "Unable to notify subscription"; + } + LOG(debug) << "State transition requested"; + }); } // Callback to execute the processing. Notice how the data is @@ -193,11 +223,6 @@ void on_socket_polled(uv_poll_t* poller, int status, int events) // We do nothing, all the logic for now stays in DataProcessingDevice::doRun() } -void on_communication_requested(uv_async_t* s) -{ - DeviceState* state = (DeviceState*)s->data; - state->loopReason |= DeviceState::METRICS_MUST_FLUSH; -} /// This takes care of initialising the device from its specification. In /// particular it needs to: @@ -275,22 +300,6 @@ void DataProcessingDevice::Init() mState.inputChannelInfos[ci].state = InputChannelState::Pull; } } - uv_async_t* wakeHandle = (uv_async_t*)malloc(sizeof(uv_async_t)); - assert(mState.loop); - int res = uv_async_init(mState.loop, wakeHandle, on_communication_requested); - wakeHandle->data = &mState; - if (res < 0) { - LOG(ERROR) << "Unable to initialise subscription"; - } - - /// This should post a message on the queue... - SubscribeToNewTransition("dpl", [wakeHandle](fair::mq::Transition t) { - int res = uv_async_send(wakeHandle); - if (res < 0) { - LOG(ERROR) << "Unable to notify subscription"; - } - LOG(debug) << "State transition requested"; - }); } void on_signal_callback(uv_signal_t* handle, int signum) @@ -355,8 +364,11 @@ void DataProcessingDevice::InitTask() uv_async_init(mState.loop, mState.awakeMainThread, on_awake_main_thread); } + mDeviceContext.expectedRegionCallbacks = std::stoi(fConfig->GetValue("expected-region-callbacks")); + for (auto& channel : fChannels) { channel.second.at(0).Transport()->SubscribeToRegionEvents([this, + &context = mDeviceContext, ®istry = mServiceRegistry, &pendingRegionInfos = mPendingRegionInfos, ®ionInfoMutex = mRegionInfoMutex](FairMQRegionInfo info) { @@ -366,13 +378,10 @@ void DataProcessingDevice::InitTask() LOG(debug) << "ptr: " << info.ptr; LOG(debug) << "size: " << info.size; LOG(debug) << "flags: " << info.flags; + context.expectedRegionCallbacks -= 1; pendingRegionInfos.push_back(info); - // When not running we can handle the callbacks synchronously. - if (this->GetCurrentState() != fair::mq::State::Running) { - handleRegionCallbacks(registry, pendingRegionInfos); - } else { - uv_async_send(registry.get().awakeMainThread); - } + // We always want to handle these on the main loop + uv_async_send(registry.get().awakeMainThread); }); } @@ -475,6 +484,18 @@ void DataProcessingDevice::InitTask() // do so on a per thread basis, with fine grained locks. mDataProcessorContexes.resize(1); this->fillContext(mDataProcessorContexes.at(0), mDeviceContext); + + /// We now run an event loop also in InitTask. This is needed to: + /// * Make sure region registration callbacks are invoked + /// on the main thread. + /// * Wait for enough callbacks to be delivered before moving to START + while (mDeviceContext.expectedRegionCallbacks > 0 && uv_run(mState.loop, UV_RUN_ONCE)) { + // Handle callbacks if any + { + std::lock_guard lock(mRegionInfoMutex); + handleRegionCallbacks(mServiceRegistry, mPendingRegionInfos); + } + } } void DataProcessingDevice::fillContext(DataProcessorContext& context, DeviceContext& deviceContext) diff --git a/Framework/Core/src/DeviceSpecHelpers.cxx b/Framework/Core/src/DeviceSpecHelpers.cxx index 269a9afe0654c..cfd637c1d3a85 100644 --- a/Framework/Core/src/DeviceSpecHelpers.cxx +++ b/Framework/Core/src/DeviceSpecHelpers.cxx @@ -1122,6 +1122,7 @@ void DeviceSpecHelpers::prepareArguments(bool defaultQuiet, bool defaultStopped, realOdesc.add_options()("severity", bpo::value()); realOdesc.add_options()("child-driver", bpo::value()); realOdesc.add_options()("rate", bpo::value()); + realOdesc.add_options()("expected-region-callbacks", bpo::value()); realOdesc.add_options()("environment", bpo::value()); realOdesc.add_options()("stacktrace-on-signal", bpo::value()); realOdesc.add_options()("post-fork-command", bpo::value()); @@ -1270,6 +1271,7 @@ boost::program_options::options_description DeviceSpecHelpers::getForwardedDevic ("plugin-search-path,S", bpo::value(), "FairMQ plugins search path") // ("control-port", bpo::value(), "Utility port to be used by O2 Control") // ("rate", bpo::value(), "rate for a data source device (Hz)") // + ("expected-region-callbacks", bpo::value(), "region callbacks to expect before starting") // ("shm-monitor", bpo::value(), "whether to use the shared memory monitor") // ("channel-prefix", bpo::value()->default_value(""), "prefix to use for multiplexing multiple workflows in the same session") // ("shm-segment-size", bpo::value(), "size of the shared memory segment in bytes") // diff --git a/Framework/Core/src/runDataProcessing.cxx b/Framework/Core/src/runDataProcessing.cxx index b4004c6127b61..a5f72e5568e9e 100644 --- a/Framework/Core/src/runDataProcessing.cxx +++ b/Framework/Core/src/runDataProcessing.cxx @@ -1101,6 +1101,7 @@ int doChild(int argc, char** argv, ServiceRegistry& serviceRegistry, optsDesc.add_options()("monitoring-backend", bpo::value()->default_value("default"), "monitoring backend info") // ("driver-client-backend", bpo::value()->default_value(defaultDriverClient), "backend for device -> driver communicataon: stdout://: use stdout, ws://: use websockets") // ("infologger-severity", bpo::value()->default_value(""), "minimum FairLogger severity to send to InfoLogger") // + ("expected-region-callbacks", bpo::value()->default_value("0"), "how many region callbacks we are expecting") // ("configuration,cfg", bpo::value()->default_value("command-line"), "configuration backend") // ("infologger-mode", bpo::value()->default_value(""), "O2_INFOLOGGER_MODE override"); r.fConfig.AddToCmdLineOptions(optsDesc, true); From 4047257c3926219173eac0d34e868a40089b2a4b Mon Sep 17 00:00:00 2001 From: Giulio Eulisse <10544+ktf@users.noreply.github.com> Date: Mon, 23 Aug 2021 10:22:33 +0200 Subject: [PATCH 2/2] Fix formatting --- Framework/Core/src/runDataProcessing.cxx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Framework/Core/src/runDataProcessing.cxx b/Framework/Core/src/runDataProcessing.cxx index a5f72e5568e9e..5893885687e78 100644 --- a/Framework/Core/src/runDataProcessing.cxx +++ b/Framework/Core/src/runDataProcessing.cxx @@ -1101,7 +1101,7 @@ int doChild(int argc, char** argv, ServiceRegistry& serviceRegistry, optsDesc.add_options()("monitoring-backend", bpo::value()->default_value("default"), "monitoring backend info") // ("driver-client-backend", bpo::value()->default_value(defaultDriverClient), "backend for device -> driver communicataon: stdout://: use stdout, ws://: use websockets") // ("infologger-severity", bpo::value()->default_value(""), "minimum FairLogger severity to send to InfoLogger") // - ("expected-region-callbacks", bpo::value()->default_value("0"), "how many region callbacks we are expecting") // + ("expected-region-callbacks", bpo::value()->default_value("0"), "how many region callbacks we are expecting") // ("configuration,cfg", bpo::value()->default_value("command-line"), "configuration backend") // ("infologger-mode", bpo::value()->default_value(""), "O2_INFOLOGGER_MODE override"); r.fConfig.AddToCmdLineOptions(optsDesc, true);