Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Framework/Core/include/Framework/DataProcessingDevice.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ struct DeviceContext {
DeviceState* state = nullptr;
ComputingQuotaEvaluator* quotaEvaluator = nullptr;
DataProcessingStats* stats = nullptr;
int expectedRegionCallbacks = 0;
};

struct DataProcessorContext {
Expand Down Expand Up @@ -166,6 +167,9 @@ class DataProcessingDevice : public FairMQDevice
std::vector<uv_work_t> mHandles; /// Handles to use to schedule work.
std::vector<TaskStreamInfo> mStreams; /// Information about the task running in the associated mHandle.
ComputingQuotaEvaluator& mQuotaEvaluator; /// The component which evaluates if the offer can be used to run a task
/// Handle to wake up the main loop from other threads
/// e.g. when FairMQ notifies some callback in an asynchronous way
uv_async_t* mAwakeHandle = nullptr;
};

} // namespace o2::framework
Expand Down
77 changes: 49 additions & 28 deletions Framework/Core/src/DataProcessingDevice.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,12 @@ void on_idle_timer(uv_timer_t* handle)
state->loopReason |= DeviceState::TIMER_EXPIRED;
}

void on_communication_requested(uv_async_t* s)
{
DeviceState* state = (DeviceState*)s->data;
state->loopReason |= DeviceState::METRICS_MUST_FLUSH;
}

DataProcessingDevice::DataProcessingDevice(RunningDeviceRef ref, ServiceRegistry& registry)
: mSpec{registry.get<RunningWorkflowInfo const>().devices[ref.index]},
mState{registry.get<DeviceState>()},
Expand All @@ -94,7 +100,8 @@ DataProcessingDevice::DataProcessingDevice(RunningDeviceRef ref, ServiceRegistry
mConfigRegistry{nullptr},
mServiceRegistry{registry},
mAllocator{&mTimingInfo, &registry, mSpec.outputs},
mQuotaEvaluator{registry.get<ComputingQuotaEvaluator>()}
mQuotaEvaluator{registry.get<ComputingQuotaEvaluator>()},
mAwakeHandle{nullptr}
{
/// FIXME: move erro handling to a service?
if (mError != nullptr) {
Expand Down Expand Up @@ -127,6 +134,29 @@ DataProcessingDevice::DataProcessingDevice(RunningDeviceRef ref, ServiceRegistry
// One task for now.
mStreams.resize(1);
mHandles.resize(1);

mDeviceContext.device = this;
mDeviceContext.spec = &mSpec;
mDeviceContext.state = &mState;
mDeviceContext.quotaEvaluator = &mQuotaEvaluator;
mDeviceContext.stats = &mStats;

mAwakeHandle = (uv_async_t*)malloc(sizeof(uv_async_t));
assert(mState.loop);
int res = uv_async_init(mState.loop, mAwakeHandle, on_communication_requested);
mAwakeHandle->data = &mState;
if (res < 0) {
LOG(ERROR) << "Unable to initialise subscription";
}

/// This should post a message on the queue...
SubscribeToNewTransition("dpl", [wakeHandle = mAwakeHandle, deviceContext = &mDeviceContext](fair::mq::Transition t) {
int res = uv_async_send(wakeHandle);
if (res < 0) {
LOG(ERROR) << "Unable to notify subscription";
}
LOG(debug) << "State transition requested";
});
}

// Callback to execute the processing. Notice how the data is
Expand Down Expand Up @@ -193,11 +223,6 @@ void on_socket_polled(uv_poll_t* poller, int status, int events)
// We do nothing, all the logic for now stays in DataProcessingDevice::doRun()
}

void on_communication_requested(uv_async_t* s)
{
DeviceState* state = (DeviceState*)s->data;
state->loopReason |= DeviceState::METRICS_MUST_FLUSH;
}

/// This takes care of initialising the device from its specification. In
/// particular it needs to:
Expand Down Expand Up @@ -275,22 +300,6 @@ void DataProcessingDevice::Init()
mState.inputChannelInfos[ci].state = InputChannelState::Pull;
}
}
uv_async_t* wakeHandle = (uv_async_t*)malloc(sizeof(uv_async_t));
assert(mState.loop);
int res = uv_async_init(mState.loop, wakeHandle, on_communication_requested);
wakeHandle->data = &mState;
if (res < 0) {
LOG(ERROR) << "Unable to initialise subscription";
}

/// This should post a message on the queue...
SubscribeToNewTransition("dpl", [wakeHandle](fair::mq::Transition t) {
int res = uv_async_send(wakeHandle);
if (res < 0) {
LOG(ERROR) << "Unable to notify subscription";
}
LOG(debug) << "State transition requested";
});
}

void on_signal_callback(uv_signal_t* handle, int signum)
Expand Down Expand Up @@ -355,8 +364,11 @@ void DataProcessingDevice::InitTask()
uv_async_init(mState.loop, mState.awakeMainThread, on_awake_main_thread);
}

mDeviceContext.expectedRegionCallbacks = std::stoi(fConfig->GetValue<std::string>("expected-region-callbacks"));

for (auto& channel : fChannels) {
channel.second.at(0).Transport()->SubscribeToRegionEvents([this,
&context = mDeviceContext,
&registry = mServiceRegistry,
&pendingRegionInfos = mPendingRegionInfos,
&regionInfoMutex = mRegionInfoMutex](FairMQRegionInfo info) {
Expand All @@ -366,13 +378,10 @@ void DataProcessingDevice::InitTask()
LOG(debug) << "ptr: " << info.ptr;
LOG(debug) << "size: " << info.size;
LOG(debug) << "flags: " << info.flags;
context.expectedRegionCallbacks -= 1;
pendingRegionInfos.push_back(info);
// When not running we can handle the callbacks synchronously.
if (this->GetCurrentState() != fair::mq::State::Running) {
handleRegionCallbacks(registry, pendingRegionInfos);
} else {
uv_async_send(registry.get<DeviceState>().awakeMainThread);
}
// We always want to handle these on the main loop
uv_async_send(registry.get<DeviceState>().awakeMainThread);
});
}

Expand Down Expand Up @@ -475,6 +484,18 @@ void DataProcessingDevice::InitTask()
// do so on a per thread basis, with fine grained locks.
mDataProcessorContexes.resize(1);
this->fillContext(mDataProcessorContexes.at(0), mDeviceContext);

/// We now run an event loop also in InitTask. This is needed to:
/// * Make sure region registration callbacks are invoked
/// on the main thread.
/// * Wait for enough callbacks to be delivered before moving to START
while (mDeviceContext.expectedRegionCallbacks > 0 && uv_run(mState.loop, UV_RUN_ONCE)) {
// Handle callbacks if any
{
std::lock_guard<std::mutex> lock(mRegionInfoMutex);
handleRegionCallbacks(mServiceRegistry, mPendingRegionInfos);
}
}
}

void DataProcessingDevice::fillContext(DataProcessorContext& context, DeviceContext& deviceContext)
Expand Down
2 changes: 2 additions & 0 deletions Framework/Core/src/DeviceSpecHelpers.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -1122,6 +1122,7 @@ void DeviceSpecHelpers::prepareArguments(bool defaultQuiet, bool defaultStopped,
realOdesc.add_options()("severity", bpo::value<std::string>());
realOdesc.add_options()("child-driver", bpo::value<std::string>());
realOdesc.add_options()("rate", bpo::value<std::string>());
realOdesc.add_options()("expected-region-callbacks", bpo::value<std::string>());
realOdesc.add_options()("environment", bpo::value<std::string>());
realOdesc.add_options()("stacktrace-on-signal", bpo::value<std::string>());
realOdesc.add_options()("post-fork-command", bpo::value<std::string>());
Expand Down Expand Up @@ -1270,6 +1271,7 @@ boost::program_options::options_description DeviceSpecHelpers::getForwardedDevic
("plugin-search-path,S", bpo::value<std::string>(), "FairMQ plugins search path") //
("control-port", bpo::value<std::string>(), "Utility port to be used by O2 Control") //
("rate", bpo::value<std::string>(), "rate for a data source device (Hz)") //
("expected-region-callbacks", bpo::value<std::string>(), "region callbacks to expect before starting") //
("shm-monitor", bpo::value<std::string>(), "whether to use the shared memory monitor") //
("channel-prefix", bpo::value<std::string>()->default_value(""), "prefix to use for multiplexing multiple workflows in the same session") //
("shm-segment-size", bpo::value<std::string>(), "size of the shared memory segment in bytes") //
Expand Down
1 change: 1 addition & 0 deletions Framework/Core/src/runDataProcessing.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -1101,6 +1101,7 @@ int doChild(int argc, char** argv, ServiceRegistry& serviceRegistry,
optsDesc.add_options()("monitoring-backend", bpo::value<std::string>()->default_value("default"), "monitoring backend info") //
("driver-client-backend", bpo::value<std::string>()->default_value(defaultDriverClient), "backend for device -> driver communicataon: stdout://: use stdout, ws://: use websockets") //
("infologger-severity", bpo::value<std::string>()->default_value(""), "minimum FairLogger severity to send to InfoLogger") //
("expected-region-callbacks", bpo::value<std::string>()->default_value("0"), "how many region callbacks we are expecting") //
("configuration,cfg", bpo::value<std::string>()->default_value("command-line"), "configuration backend") //
("infologger-mode", bpo::value<std::string>()->default_value(""), "O2_INFOLOGGER_MODE override");
r.fConfig.AddToCmdLineOptions(optsDesc, true);
Expand Down