Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions Framework/Core/scripts/dpl-mcp-server/dpl_mcp_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,37 @@ async def get_logs(max_lines: int = 100) -> str:
return "\n".join(lines)


@mcp.tool()
async def enable_signpost(device: str, streams: list[str]) -> str:
"""Enable one or more signpost log streams for a DPL device.

Signpost streams produce detailed trace output visible in the device logs.
Use get_logs() after subscribing to see the output.

Known stream names (full form): ch.cern.aliceo2.device,
ch.cern.aliceo2.completion, ch.cern.aliceo2.monitoring_service,
ch.cern.aliceo2.data_processor_context, ch.cern.aliceo2.stream_context.

Args:
device: Device name as shown by list_devices, or "" for the driver.
streams: List of full signpost log names to enable.
"""
await _send({"cmd": "enable_signpost", "device": device, "streams": streams})
return f"Enabled {len(streams)} signpost stream(s) for '{device or 'driver'}': {', '.join(streams)}"


@mcp.tool()
async def disable_signpost(device: str, streams: list[str]) -> str:
"""Disable one or more signpost log streams for a DPL device.

Args:
device: Device name as shown by list_devices, or "" for the driver.
streams: List of full signpost log names to disable.
"""
await _send({"cmd": "disable_signpost", "device": device, "streams": streams})
return f"Disabled {len(streams)} signpost stream(s) for '{device or 'driver'}': {', '.join(streams)}"


@mcp.tool()
async def get_updates(max_updates: int = 50) -> str:
"""Drain and return buffered metric update frames received since the last call.
Expand Down
71 changes: 71 additions & 0 deletions Framework/Core/src/StatusWebSocketHandler.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,15 @@
#include "StatusWebSocketHandler.h"
#include "DPLWebSocket.h"
#include "DriverServerContext.h"
#include "Framework/DeviceControl.h"
#include "Framework/DeviceController.h"
#include "Framework/DeviceInfo.h"
#include "Framework/DeviceMetricsInfo.h"
#include "Framework/DeviceSpec.h"
#include "Framework/DeviceState.h"
#include "Framework/DeviceStateEnums.h"
#include "Framework/LogParsingHelpers.h"
#include "Framework/Signpost.h"
#include <algorithm>
#include <cstdio>
#include <string>
Expand Down Expand Up @@ -250,6 +254,10 @@ void StatusWebSocketHandler::frame(char const* data, size_t s)
handleSubscribeLogs(deviceName);
} else if (cmd == "unsubscribe_logs") {
handleUnsubscribeLogs(deviceName);
} else if (cmd == "enable_signpost") {
handleEnableSignpost(deviceName, extractArrayField(msg, "streams"));
} else if (cmd == "disable_signpost") {
handleDisableSignpost(deviceName, extractArrayField(msg, "streams"));
}
}

Expand Down Expand Up @@ -433,6 +441,69 @@ size_t StatusWebSocketHandler::findDeviceIndex(std::string_view name) const
return SIZE_MAX;
}

void StatusWebSocketHandler::handleEnableSignpost(std::string_view deviceName, std::string_view streamsArr)
{
if (streamsArr.empty()) {
return;
}
if (deviceName.empty()) {
// Driver process — toggle in-process via o2_walk_logs.
forEachStringInArray(streamsArr, [](std::string_view streamName) {
std::string target(streamName);
o2_walk_logs([](char const* name, void* l, void* context) -> bool {
auto* log = static_cast<_o2_log_t*>(l);
if (static_cast<std::string*>(context)->compare(name) == 0) {
_o2_log_set_stacktrace(log, log->defaultStacktrace);
return false;
}
return true;
}, &target);
});
} else {
size_t di = findDeviceIndex(deviceName);
if (di == SIZE_MAX || di >= mContext.controls->size() || !(*mContext.controls)[di].controller) {
return;
}
auto* controller = (*mContext.controls)[di].controller;
forEachStringInArray(streamsArr, [controller](std::string_view name) {
std::string cmd = "/signpost:enable ";
cmd += name;
controller->write(cmd.c_str(), cmd.size());
});
}
}

void StatusWebSocketHandler::handleDisableSignpost(std::string_view deviceName, std::string_view streamsArr)
{
if (streamsArr.empty()) {
return;
}
if (deviceName.empty()) {
forEachStringInArray(streamsArr, [](std::string_view streamName) {
std::string target(streamName);
o2_walk_logs([](char const* name, void* l, void* context) -> bool {
auto* log = static_cast<_o2_log_t*>(l);
if (static_cast<std::string*>(context)->compare(name) == 0) {
_o2_log_set_stacktrace(log, 0);
return false;
}
return true;
}, &target);
});
} else {
size_t di = findDeviceIndex(deviceName);
if (di == SIZE_MAX || di >= mContext.controls->size() || !(*mContext.controls)[di].controller) {
return;
}
auto* controller = (*mContext.controls)[di].controller;
forEachStringInArray(streamsArr, [controller](std::string_view name) {
std::string cmd = "/signpost:disable ";
cmd += name;
controller->write(cmd.c_str(), cmd.size());
});
}
}

void StatusWebSocketHandler::handleSubscribeLogs(std::string_view deviceName)
{
size_t di = findDeviceIndex(deviceName);
Expand Down
13 changes: 13 additions & 0 deletions Framework/Core/src/StatusWebSocketHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,17 @@ struct WSDPLHandler;
/// {"cmd":"unsubscribe_logs","device":"<name>"}
/// → driver stops pushing log lines for the device
///
/// {"cmd":"enable_signpost","device":"<name>","streams":["device","completion",...]}
/// → enable the named signpost log streams for a device (or the driver if device=="")
/// → known streams: "device","completion","monitoring_service","data_processor_context","stream_context"
///
/// {"cmd":"disable_signpost","device":"<name>","streams":["device","completion",...]}
/// → disable the named signpost log streams for a device
///
/// {"cmd":"list_signposts"}
/// → driver replies with {"type":"signposts_list","streams":["device","completion",...]}
/// → lists the known stream names
///
/// Protocol (driver → client):
/// {"type":"snapshot","devices":[{"name","pid","active","streamingState","deviceState"},...]}
/// → sent once on connect; contains no metrics or logs
Expand Down Expand Up @@ -84,6 +95,8 @@ struct StatusWebSocketHandler : public WebSocketHandler {
void handleUnsubscribe(std::string_view deviceName, std::string_view metricsJson);
void handleSubscribeLogs(std::string_view deviceName);
void handleUnsubscribeLogs(std::string_view deviceName);
void handleEnableSignpost(std::string_view deviceName, std::string_view streamsArr);
void handleDisableSignpost(std::string_view deviceName, std::string_view streamsArr);
size_t findDeviceIndex(std::string_view name) const;

DriverServerContext& mContext;
Expand Down
66 changes: 29 additions & 37 deletions Framework/Core/src/WSDriverClient.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -188,48 +188,40 @@ void on_connect(uv_connect_t* connection, int status)
state.tracingFlags = tracingFlags;
});

client->observe("/log-streams", [ref = context->ref](std::string_view cmd) {
auto& state = ref.get<DeviceState>();
static constexpr int prefixSize = std::string_view{"/log-streams "}.size();
if (prefixSize > cmd.size()) {
LOG(error) << "Malformed log-streams request";
client->observe("/signpost:enable", [](std::string_view cmd) {
static constexpr int prefixSize = std::string_view{"/signpost:enable "}.size();
if (cmd.size() <= prefixSize) {
LOG(error) << "Malformed /signpost:enable request";
return;
}
cmd.remove_prefix(prefixSize);
int logStreams = 0;
std::string name(cmd.substr(prefixSize));
o2_walk_logs([](char const* logName, void* l, void* context) -> bool {
auto* log = static_cast<_o2_log_t*>(l);
auto* target = static_cast<std::string*>(context);
if (*target == logName) {
_o2_log_set_stacktrace(log, log->defaultStacktrace);
return false;
}
return true;
}, &name);
});

auto error = std::from_chars(cmd.data(), cmd.data() + cmd.size(), logStreams);
if (error.ec != std::errc()) {
LOG(error) << "Malformed log-streams mask";
client->observe("/signpost:disable", [](std::string_view cmd) {
static constexpr int prefixSize = std::string_view{"/signpost:disable "}.size();
if (cmd.size() <= prefixSize) {
LOG(error) << "Malformed /signpost:disable request";
return;
}
LOGP(info, "Logstreams flags set to {}", logStreams);
state.logStreams = logStreams;
if ((state.logStreams & DeviceState::LogStreams::DEVICE_LOG) != 0) {
O2_LOG_ENABLE(device);
} else {
O2_LOG_DISABLE(device);
}
if ((state.logStreams & DeviceState::LogStreams::COMPLETION_LOG) != 0) {
O2_LOG_ENABLE(completion);
} else {
O2_LOG_DISABLE(completion);
}
if ((state.logStreams & DeviceState::LogStreams::MONITORING_SERVICE_LOG) != 0) {
O2_LOG_ENABLE(monitoring_service);
} else {
O2_LOG_DISABLE(monitoring_service);
}
if ((state.logStreams & DeviceState::LogStreams::DATA_PROCESSOR_CONTEXT_LOG) != 0) {
O2_LOG_ENABLE(data_processor_context);
} else {
O2_LOG_DISABLE(data_processor_context);
}
if ((state.logStreams & DeviceState::LogStreams::STREAM_CONTEXT_LOG) != 0) {
O2_LOG_ENABLE(stream_context);
} else {
O2_LOG_DISABLE(stream_context);
}
std::string name(cmd.substr(prefixSize));
o2_walk_logs([](char const* logName, void* l, void* context) -> bool {
auto* log = static_cast<_o2_log_t*>(l);
auto* target = static_cast<std::string*>(context);
if (*target == logName) {
_o2_log_set_stacktrace(log, 0);
return false;
}
return true;
}, &name);
});

// Client will be filled in the line after. I can probably have a single
Expand Down
27 changes: 18 additions & 9 deletions Framework/GUISupport/src/FrameworkGUIDeviceInspector.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -400,16 +400,25 @@ void displayDeviceInspector(DeviceSpec const& spec,
}
}

bool logsChanged = false;
if (ImGui::CollapsingHeader("Signposts", ImGuiTreeNodeFlags_DefaultOpen)) {
logsChanged = ImGui::CheckboxFlags("Device", &control.logStreams, DeviceState::LogStreams::DEVICE_LOG);
logsChanged = ImGui::CheckboxFlags("Completion", &control.logStreams, DeviceState::LogStreams::COMPLETION_LOG);
logsChanged = ImGui::CheckboxFlags("Monitoring", &control.logStreams, DeviceState::LogStreams::MONITORING_SERVICE_LOG);
logsChanged = ImGui::CheckboxFlags("DataProcessorContext", &control.logStreams, DeviceState::LogStreams::DATA_PROCESSOR_CONTEXT_LOG);
logsChanged = ImGui::CheckboxFlags("StreamContext", &control.logStreams, DeviceState::LogStreams::STREAM_CONTEXT_LOG);
if (logsChanged && control.controller) {
std::string cmd = fmt::format("/log-streams {}", control.logStreams);
control.controller->write(cmd.c_str(), cmd.size());
static const struct {
const char* label;
int bit;
const char* fullName;
} kStreams[] = {
{"Device", DeviceState::LogStreams::DEVICE_LOG, "ch.cern.aliceo2.device"},
{"Completion", DeviceState::LogStreams::COMPLETION_LOG, "ch.cern.aliceo2.completion"},
{"Monitoring", DeviceState::LogStreams::MONITORING_SERVICE_LOG, "ch.cern.aliceo2.monitoring_service"},
{"DataProcessorContext", DeviceState::LogStreams::DATA_PROCESSOR_CONTEXT_LOG, "ch.cern.aliceo2.data_processor_context"},
{"StreamContext", DeviceState::LogStreams::STREAM_CONTEXT_LOG, "ch.cern.aliceo2.stream_context"},
};
for (auto const& s : kStreams) {
if (ImGui::CheckboxFlags(s.label, &control.logStreams, s.bit) && control.controller) {
bool enabled = (control.logStreams & s.bit) != 0;
std::string cmd = enabled ? fmt::format("/signpost:enable {}", s.fullName)
: fmt::format("/signpost:disable {}", s.fullName);
control.controller->write(cmd.c_str(), cmd.size());
}
}
}

Expand Down
Loading