Skip to content

MCP Events: Topic-based PUB/SUB server-to-client events#1

Open
elijahr wants to merge 15 commits intomainfrom
mcp-events
Open

MCP Events: Topic-based PUB/SUB server-to-client events#1
elijahr wants to merge 15 commits intomainfrom
mcp-events

Conversation

@elijahr
Copy link
Copy Markdown

@elijahr elijahr commented Apr 8, 2026

Summary

  • Adds events/subscribe, events/unsubscribe, events/list, and events/emit as new MCP protocol methods
  • SubscriptionRegistry with MQTT-style wildcard matching (+/#) and at-most-once delivery
  • RetainedValueStore with TTL expiry and async safety
  • Client-side event handler with topic filtering and subscription tracking
  • 64 tests covering types, registry, and end-to-end roundtrip

Context

Reference implementation for MCP SEP: topic-based server-to-client events.
This is the protocol foundation layer that fastmcp and opencode build on.

Test plan

  • uv run pytest tests/test_event_types.py tests/test_subscription_registry.py tests/test_event_roundtrip.py -x -v

Copy link
Copy Markdown

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request restructures the project documentation into a dedicated docs/ directory and introduces a new events system for server-to-client notifications. It also implements idle session cleanup for Streamable HTTP, adds metadata support for resources, and improves transport reliability and UTF-8 handling. Review feedback suggests enhancing logging consistency in the conformance client and refactoring duplicated validation and error-response logic in the HTTP session manager to improve maintainability.

Comment on lines +344 to +346
if len(sys.argv) < 2:
print(f"Usage: {sys.argv[0]} <server-url>", file=sys.stderr)
sys.exit(1)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The main entry point uses print() to stderr for usage instructions. Consider using logging.error() for consistency with the rest of the script's logging setup.

Comment on lines +75 to +78
if session_idle_timeout is not None and session_idle_timeout <= 0:
raise ValueError("session_idle_timeout must be a positive number of seconds")
if stateless and session_idle_timeout is not None:
raise RuntimeError("session_idle_timeout is not supported in stateless mode")
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The logic for validating session_idle_timeout is duplicated in the constructor. Consider moving this validation to a helper method or using a property setter to ensure consistency.

Comment on lines +311 to +317
error_response = JSONRPCError(
jsonrpc="2.0",
id="server-error",
error=ErrorData(
code=INVALID_REQUEST,
message="Session not found",
),
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The error response construction is repeated. Consider defining a helper method to generate standard JSON-RPC error responses to reduce code duplication.

@elijahr
Copy link
Copy Markdown
Author

elijahr commented Apr 8, 2026

@gemini-code-assist please re-review

Copy link
Copy Markdown

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request implements an events system with publish/subscribe functionality, adds metadata support to resources, and improves OAuth 2.1 conformance. It also refactors the documentation, introduces conformance testing tools, and adds idle session management for Streamable HTTP. The review identified a bug in the MQTT-style topic matching regex for root wildcards and recommended refactoring duplicated matching logic into a shared utility. Additional feedback suggested caching compiled regex patterns for performance and moving local imports to the module level.

Comment thread src/mcp/server/events.py Outdated
raise ValueError("'#' wildcard is only valid as the last segment")
# Use (/.*)?$ so that # matches zero or more trailing segments.
# e.g. "a/#" -> "^a(/.*)?$" matches "a", "a/b", "a/b/c"
return re.compile("^" + "/".join(regex_parts) + "(/.*)?$")
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The regex generated for the MQTT multi-level wildcard # is incorrect when it is the only segment in the pattern. If pattern is #, regex_parts will be empty, and the resulting regex will be ^(/.*)?$. This regex only matches strings starting with a forward slash (or an empty string), failing to match common topics like foo. Per MQTT specification, a root # should match all topics.

Suggested change
return re.compile("^" + "/".join(regex_parts) + "(/.*)?$")
return re.compile("^" + "/".join(regex_parts) + "(/.*)?$") if regex_parts else re.compile("^.*$")

Comment thread src/mcp/server/events.py
return event

async def get_matching(self, pattern: str) -> list[RetainedEvent]:
"""Return all non-expired retained events whose topic matches *pattern*."""
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The regex for the topic pattern is compiled on every call to get_matching. For better performance, especially when many retained events are stored, consider caching the compiled regex patterns, similar to how SubscriptionRegistry does it.

Comment thread src/mcp/client/session.py
Comment on lines +283 to +304
def _topic_matches_subscriptions(self, topic: str) -> bool:
"""Check if a topic matches any of our subscribed patterns."""
import re as _re

for pattern in self._subscribed_patterns:
parts = pattern.split("/")
regex_parts: list[str] = []
for i, part in enumerate(parts):
if part == "#":
regex = "^" + "/".join(regex_parts) + "(/.*)?$"
if _re.match(regex, topic):
return True
break
elif part == "+":
regex_parts.append("[^/]+")
else:
regex_parts.append(_re.escape(part))
else:
regex = "^" + "/".join(regex_parts) + "$"
if _re.match(regex, topic):
return True
return False
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The MQTT topic matching logic is duplicated here and in _handle_event. It also contains the same bug regarding the root # wildcard as found in src/mcp/server/events.py. Consider moving the pattern-to-regex conversion logic to a shared utility module (e.g., src/mcp/shared/topic_utils.py) to ensure consistency and maintainability.

Comment thread src/mcp/client/session.py Outdated

def _topic_matches_subscriptions(self, topic: str) -> bool:
"""Check if a topic matches any of our subscribed patterns."""
import re as _re
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Importing re inside a method that may be called frequently (on every incoming event) is inefficient. Move the import to the top level of the module.

Comment thread src/mcp/server/session.py Outdated
Comment on lines +221 to +225
from ulid import ULID

event_id = str(ULID())
if timestamp is None:
from datetime import datetime, timezone
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Since python-ulid is now a required dependency in pyproject.toml, these imports should be moved to the top level of the module instead of being inside the method.

@elijahr
Copy link
Copy Markdown
Author

elijahr commented Apr 8, 2026

@gemini-code-assist please re-review

Copy link
Copy Markdown

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request performs a major documentation restructure, moving content from the README to a dedicated docs/ directory and adding several project policy files. It introduces a new events system for server-to-client notifications with wildcard subscriptions, adds metadata support to resources, and improves server reliability through idle session timeouts and better transport-close handling. A review comment correctly identifies an opportunity to reduce code duplication by centralizing the MQTT-style topic matching logic into a shared utility.

Comment thread src/mcp/client/session.py
Comment on lines +284 to +338
def _topic_matches_subscriptions(self, topic: str) -> bool:
"""Check if a topic matches any of our subscribed patterns."""
for pattern in self._subscribed_patterns:
parts = pattern.split("/")
regex_parts: list[str] = []
for i, part in enumerate(parts):
if part == "#":
if regex_parts:
regex = "^" + "/".join(regex_parts) + "(/.*)?$"
else:
regex = "^.*$"
if re.match(regex, topic):
return True
break
elif part == "+":
regex_parts.append("[^/]+")
else:
regex_parts.append(re.escape(part))
else:
regex = "^" + "/".join(regex_parts) + "$"
if re.match(regex, topic):
return True
return False

async def _handle_event(self, params: types.EventParams) -> None:
"""Dispatch an incoming event to the registered handler."""
if self._event_handler is None:
return

if self._subscribed_patterns and not self._topic_matches_subscriptions(params.topic):
return

if self._event_topic_filter is not None:
parts = self._event_topic_filter.split("/")
regex_parts: list[str] = []
matched = False
for i, part in enumerate(parts):
if part == "#":
if regex_parts:
regex = "^" + "/".join(regex_parts) + "(/.*)?$"
else:
regex = "^.*$"
matched = bool(re.match(regex, params.topic))
break
elif part == "+":
regex_parts.append("[^/]+")
else:
regex_parts.append(re.escape(part))
else:
regex = "^" + "/".join(regex_parts) + "$"
matched = bool(re.match(regex, params.topic))
if not matched:
return

await self._event_handler(params)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The logic for converting an MQTT-style topic pattern to a regular expression and matching it against a topic is duplicated here in _topic_matches_subscriptions and _handle_event. A similar function, _pattern_to_regex, also exists in src/mcp/server/events.py.

To improve maintainability and reduce code duplication, consider extracting this logic into a shared utility function. This function could take a pattern and a topic and return whether they match. You could place this utility in a shared module, for example mcp.shared.events_utils.

For example, you could have a function like:

# in a new mcp/shared/events_utils.py
import re

def topic_matches_pattern(topic: str, pattern: str) -> bool:
    # ... implementation of regex conversion and matching ...

This would simplify both _topic_matches_subscriptions and _handle_event significantly.

@elijahr
Copy link
Copy Markdown
Author

elijahr commented Apr 9, 2026

@gemini-code-assist please re-review

Copy link
Copy Markdown

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a comprehensive event system for the MCP Python SDK, enabling server-to-client push notifications with MQTT-style topic wildcards and retained values. Key enhancements include metadata support for resources, session idle timeouts for Streamable HTTP, improved transport closure handling, and a significant overhaul of the documentation. Feedback focuses on performance optimizations for event matching, specifically recommending the caching of compiled regex patterns to avoid overhead during high event volumes. Additionally, a logic correction was suggested to ensure that events are dropped when no matching subscriptions exist, aligning the implementation with the provided documentation.

Comment thread src/mcp/client/session.py
Comment on lines +265 to +273
def set_event_handler(
self,
handler: EventHandlerFnT,
*,
topic_filter: str | None = None,
) -> None:
"""Register a callback for incoming event notifications."""
self._event_handler = handler
self._event_topic_filter = topic_filter
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The topic_filter provided to set_event_handler should be pre-compiled into a regex pattern during registration rather than being reconstructed and recompiled on every incoming event in _handle_event. This will improve performance when processing high volumes of events.

Comment thread src/mcp/client/session.py
if self._event_handler is None:
return

if self._subscribed_patterns and not self._topic_matches_subscriptions(params.topic):
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The subscription enforcement logic here contradicts the documentation and creates inconsistent behavior.

According to docs/events.md:247, events that do not match any active subscription should be dropped. However, the current implementation if self._subscribed_patterns and ... only performs the check if the client has at least one subscription. This means a client with zero subscriptions will receive all events pushed by the server, but as soon as it subscribes to a single topic, it becomes restricted.

Removing the self._subscribed_patterns and check will ensure that events are only processed if they match an active subscription, aligning the code with the documented behavior.

Suggested change
if self._subscribed_patterns and not self._topic_matches_subscriptions(params.topic):
if not self._topic_matches_subscriptions(params.topic):

Comment thread src/mcp/client/session.py
Comment on lines +284 to +307
def _topic_matches_subscriptions(self, topic: str) -> bool:
"""Check if a topic matches any of our subscribed patterns."""
for pattern in self._subscribed_patterns:
parts = pattern.split("/")
regex_parts: list[str] = []
for i, part in enumerate(parts):
if part == "#":
if regex_parts:
regex = "^" + "/".join(regex_parts) + "(/.*)?$"
else:
regex = "^.*$"
if re.match(regex, topic):
return True
break
elif part == "+":
regex_parts.append("[^/]+")
else:
regex_parts.append(re.escape(part))
else:
regex = "^" + "/".join(regex_parts) + "$"
if re.match(regex, topic):
return True
return False

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The MQTT-style topic matching logic in _topic_matches_subscriptions is inefficient and duplicated.

  1. Performance: The code repeatedly splits pattern strings and constructs regex patterns for every incoming event. This will become a significant CPU bottleneck for clients with multiple subscriptions or high event volume. Compiled regexes should be cached.
  2. Duplication: The logic for converting MQTT wildcards (+, #) to regex is duplicated in _handle_event (lines 316-336) and is also present in src/mcp/server/events.py.
  3. Validation: This implementation lacks the validation enforced on the server side (e.g., ensuring # is the last segment).

Consider refactoring the conversion logic into a shared utility function and caching the compiled regex patterns within the ClientSession (e.g., using a dict[str, re.Pattern]).

@elijahr
Copy link
Copy Markdown
Author

elijahr commented Apr 9, 2026

@gemini-code-assist please re-review

Copy link
Copy Markdown

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a major update to the MCP Python SDK, featuring a new event system for server-to-client push notifications, support for resource metadata, and an idle timeout mechanism for stateful HTTP sessions. The project structure is refined by moving detailed documentation to a dedicated docs/ directory and adding formal roadmap, versioning, and dependency policies. Feedback suggests standardizing timeout validation logic and error messages, avoiding hardcoded identifiers in session error responses, and optimizing memory usage when closing response streams by avoiding full dictionary copies.

Comment on lines +75 to +76
if session_idle_timeout is not None and session_idle_timeout <= 0:
raise ValueError("session_idle_timeout must be a positive number of seconds")
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The validation logic for session_idle_timeout should be consistent with other timeout configurations in the SDK. Consider using a more descriptive error message or a standard validation pattern if available.

Comment on lines +311 to +317
error_response = JSONRPCError(
jsonrpc="2.0",
id="server-error",
error=ErrorData(
code=INVALID_REQUEST,
message="Session not found",
),
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The error response for a missing session ID uses a hardcoded 'server-error' ID. Consider using a more dynamic or standard identifier if the protocol allows, or ensure this ID is documented as a specific error case.

Comment thread src/mcp/shared/session.py
for id, stream in self._response_streams.items():
# Snapshot: stream.send() wakes the waiter, whose finally pops
# from _response_streams before the next __next__() call.
for id, stream in list(self._response_streams.items()):
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Using list(self._response_streams.items()) creates a full copy of the dictionary items. If this dictionary is large, consider using an iterator or a more memory-efficient approach to avoid unnecessary allocations during connection closure.

elijahr added 4 commits April 8, 2026 23:08
…nt events

Implements events/subscribe, events/unsubscribe, events/list, and events/emit
as new MCP protocol methods. Includes subscription registry with MQTT-style
wildcard matching (+/#), retained value store with TTL expiry, and client-side
event handler with topic filtering and subscription tracking.

Reference implementation for MCP SEP (Specification Enhancement Proposal)
for topic-based server-to-client events.
elijahr added 6 commits April 8, 2026 23:27
Read session_id from InitializeResult._meta after initialize and expose
as session.session_id property. Enables client-side construction of
session-scoped event topics for the {session_id} authorization convention.
ProvenanceEnvelope wraps events with client-assessed provenance metadata
(server, trust tier, topic, source) for safe injection into LLM context,
with to_dict(), to_xml() (XML-escaped), and from_event() factory methods.

EventQueue provides priority-aware buffering with 4 deques (urgent/high/
normal/low) that drain in strict priority order, supporting partial drains
via max_count. Priority is resolved from the highest-priority
requestedEffect; events without effects default to normal.

Fixed a bug in the plan's _resolve_priority where the baseline was "normal"
(rank 2), causing "low" priority events to be promoted to "normal".
Changed baseline to "low" (rank 3) so all priority levels resolve correctly.
- Fix 1 (test_priority_from_multiple_effects): add low-priority event and drain with max_count=1 to prove multi-effect event lands in urgent bucket
- Fix 2 (test_priority_no_effects): add low-priority event enqueued first and assert drain order proves no-effects defaults to normal
- Fix 3 (test_to_xml_basic): exact string equality, assert event_id/received_at absent when not set
- Fix 4 (test_to_xml_with_special_chars_in_attrs): verify escaped forms (single-quote wrapping for ", &lt; for <) with exact string equality
- Fix 5 (test_from_event_extracts_fields): replace "T" in string check with datetime.fromisoformat() validation
@elijahr
Copy link
Copy Markdown
Author

elijahr commented Apr 10, 2026

@gemini-code-assist please re-review

Sort imports and reformat lines in tests/client/test_events.py and
src/mcp/client/events.py to pass pre-commit ruff checks.
Copy link
Copy Markdown

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request implements a topic-based server-to-client event system for the MCP SDK, enabling push notifications with MQTT-style wildcard patterns, retained values, and advisory effect hints. The implementation includes server-side subscription management and retained value storage, along with client-side event handling, filtering, and priority-aware buffering. Feedback suggests that the current linear scan in the subscription matching logic may present a performance bottleneck for high-volume servers, recommending a more efficient data structure like a trie for future optimization.

Comment thread src/mcp/server/events.py
Comment on lines +68 to +81
async def match(self, topic: str) -> set[str]:
"""Return session IDs whose subscriptions match *topic*.

Each session appears at most once (at-most-once delivery guarantee).
"""
async with self._lock:
result: set[str] = set()
for session_id, patterns in self._subscriptions.items():
for pattern in patterns:
regex = self._compile(pattern)
if regex.match(topic):
result.add(session_id)
break # at-most-once per session
return result
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The match method performs a linear scan over all sessions and all their patterns. While acceptable for a reference implementation or small-scale deployments, this $O(N \times M)$ complexity (where $N$ is the number of sessions and $M$ is the average number of subscriptions per session) could become a performance bottleneck in a server with many active sessions and high event volume. Consider using a trie-based structure or a specialized topic tree for more efficient matching if high throughput is required.

@elijahr
Copy link
Copy Markdown
Author

elijahr commented Apr 10, 2026

@gemini-code-assist please re-review

Copy link
Copy Markdown

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request implements a topic-based event system for server-to-client push notifications, featuring MQTT-style wildcards, retained values, and advisory effect hints. It introduces server-side subscription registries and retained value stores, as well as client-side utilities for priority-aware buffering and provenance tracking. Feedback identifies opportunities to support multiple event handlers on the client session, optimize server-side lock usage during topic matching, and improve MQTT compliance for the '+' wildcard to match empty segments.

Comment thread src/mcp/client/session.py
Comment on lines +151 to +153
self._event_handler: EventHandlerFnT | None = None
self._event_topic_filter: str | None = None
self._event_topic_filter_regex: re.Pattern[str] | None = None
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The current implementation only supports a single global event handler. If a user uses the @on_event decorator multiple times, each subsequent call will overwrite the previous handler. This is likely unexpected behavior for a PUB/SUB system. Consider maintaining a list of handlers and their associated filters.

Suggested change
self._event_handler: EventHandlerFnT | None = None
self._event_topic_filter: str | None = None
self._event_topic_filter_regex: re.Pattern[str] | None = None
self._event_handlers: list[tuple[EventHandlerFnT, re.Pattern[str] | None]] = []

Comment thread src/mcp/client/session.py
Comment on lines +295 to +311
def set_event_handler(
self,
handler: EventHandlerFnT,
*,
topic_filter: str | None = None,
) -> None:
"""Register a callback for incoming event notifications.

If *topic_filter* is provided, it is compiled once here and the
cached regex is reused for every incoming event. The filter uses
the same MQTT-style wildcard syntax as subscription patterns
(``+`` for a single segment, ``#`` as a trailing multi-segment
wildcard).
"""
self._event_handler = handler
self._event_topic_filter = topic_filter
self._event_topic_filter_regex = pattern_to_regex(topic_filter) if topic_filter is not None else None
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Renaming this to add_event_handler and appending to a list would allow multiple handlers to coexist, fixing the overwriting issue when using the decorator or multiple manual registrations.

Suggested change
def set_event_handler(
self,
handler: EventHandlerFnT,
*,
topic_filter: str | None = None,
) -> None:
"""Register a callback for incoming event notifications.
If *topic_filter* is provided, it is compiled once here and the
cached regex is reused for every incoming event. The filter uses
the same MQTT-style wildcard syntax as subscription patterns
(``+`` for a single segment, ``#`` as a trailing multi-segment
wildcard).
"""
self._event_handler = handler
self._event_topic_filter = topic_filter
self._event_topic_filter_regex = pattern_to_regex(topic_filter) if topic_filter is not None else None
def add_event_handler(
self,
handler: EventHandlerFnT,
*,
topic_filter: str | None = None,
) -> None:
"""Register a callback for incoming event notifications.
If *topic_filter* is provided, it is compiled once here and the
cached regex is reused for every incoming event. The filter uses
the same MQTT-style wildcard syntax as subscription patterns
(``+`` for a single segment, ``#`` as a trailing multi-segment
wildcard).
"""
regex = pattern_to_regex(topic_filter) if topic_filter is not None else None
self._event_handlers.append((handler, regex))

Comment thread src/mcp/client/session.py
Comment on lines +352 to +361
if self._event_handler is None:
return

if self._subscribed_patterns and not self._topic_matches_subscriptions(params.topic):
return

if self._event_topic_filter_regex is not None and not self._event_topic_filter_regex.match(params.topic):
return

await self._event_handler(params)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Awaiting the event handler directly in the reader loop means that a slow or blocking handler will stop the client from receiving any further messages (including responses to pending requests). Consider spawning a background task for each handler execution, though be mindful of event ordering if that is a requirement.

else:
return re.compile("^.*$")
elif part == "+":
regex_parts.append("[^/]+")
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The + wildcard in MQTT is intended to match exactly one topic level, which can be an empty string (e.g., a//c matches a/+/c). The current regex [^/]+ requires at least one character, which will fail to match empty levels. Using [^/]* is more compliant with MQTT semantics.

Suggested change
regex_parts.append("[^/]+")
regex_parts.append("[^/]*")

Comment thread src/mcp/server/events.py
Comment on lines +73 to +81
async with self._lock:
result: set[str] = set()
for session_id, patterns in self._subscriptions.items():
for pattern in patterns:
regex = self._compile(pattern)
if regex.match(topic):
result.add(session_id)
break # at-most-once per session
return result
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Holding the asyncio.Lock during the entire matching loop (which involves synchronous regex execution) can lead to significant performance bottlenecks and block other operations like add or remove if there are many sessions or patterns. It is better to snapshot the subscriptions under the lock and perform the matching outside of it.

Suggested change
async with self._lock:
result: set[str] = set()
for session_id, patterns in self._subscriptions.items():
for pattern in patterns:
regex = self._compile(pattern)
if regex.match(topic):
result.add(session_id)
break # at-most-once per session
return result
async with self._lock:
# Snapshot subscriptions to minimize lock duration
snapshot = {sid: list(patterns) for sid, patterns in self._subscriptions.items()}
result: set[str] = set()
for session_id, patterns in snapshot.items():
for pattern in patterns:
regex = self._compile(pattern)
if regex.match(topic):
result.add(session_id)
break # at-most-once per session
return result

@elijahr
Copy link
Copy Markdown
Author

elijahr commented Apr 10, 2026

@gemini-code-assist please re-review

Copy link
Copy Markdown

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a comprehensive event system for the Model Context Protocol (MCP), enabling topic-based server-to-client push notifications. It includes new documentation, client and server-side subscription registries, a retained value store, and MQTT-style wildcard matching. Feedback focuses on improving the robustness of ISO 8601 timestamp parsing for Python 3.10 compatibility and optimizing expiration checks in the RetainedValueStore by reducing redundant system calls.

Comment thread src/mcp/server/events.py
Comment on lines +154 to +165
def _is_expired(self, topic: str) -> bool:
"""Check if a retained value has expired based on its ``expires_at``."""
expires_at = self._expires.get(topic)
if expires_at is None:
return False
try:
expiry = datetime.fromisoformat(expires_at)
if expiry.tzinfo is None:
expiry = expiry.replace(tzinfo=timezone.utc)
return datetime.now(timezone.utc) >= expiry
except (ValueError, TypeError):
return False
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The datetime.fromisoformat method in Python 3.10 (which this project supports) does not handle the 'Z' suffix for UTC. This will cause a ValueError for valid ISO 8601 strings ending in 'Z', leading _is_expired to incorrectly return False. Additionally, accepting an optional now parameter allows for optimization in batch operations like get_matching.

Suggested change
def _is_expired(self, topic: str) -> bool:
"""Check if a retained value has expired based on its ``expires_at``."""
expires_at = self._expires.get(topic)
if expires_at is None:
return False
try:
expiry = datetime.fromisoformat(expires_at)
if expiry.tzinfo is None:
expiry = expiry.replace(tzinfo=timezone.utc)
return datetime.now(timezone.utc) >= expiry
except (ValueError, TypeError):
return False
def _is_expired(self, topic: str, now: datetime | None = None) -> bool:
"""Check if a retained value has expired based on its ``expires_at``."""
expires_at = self._expires.get(topic)
if expires_at is None:
return False
try:
ts = expires_at
if ts.endswith("Z"):
ts = ts[:-1] + "+00:00"
expiry = datetime.fromisoformat(ts)
if expiry.tzinfo is None:
expiry = expiry.replace(tzinfo=timezone.utc)
if now is None:
now = datetime.now(timezone.utc)
return now >= expiry
except (ValueError, TypeError):
return False

Comment thread src/mcp/server/events.py
Comment on lines +128 to +146
async def get_matching(self, pattern: str) -> list[RetainedEvent]:
"""Return all non-expired retained events whose topic matches *pattern*."""
async with self._lock:
if pattern not in self._regex_cache:
self._regex_cache[pattern] = _pattern_to_regex(pattern)
regex = self._regex_cache[pattern]
result: list[RetainedEvent] = []
expired_topics: list[str] = []
for topic, event in self._store.items():
if self._is_expired(topic):
expired_topics.append(topic)
continue
if regex.match(topic):
result.append(event)
# Clean up expired entries
for topic in expired_topics:
del self._store[topic]
self._expires.pop(topic, None)
return result
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

In get_matching, _is_expired is called for every topic in the store. Since _is_expired calls datetime.now(timezone.utc) internally, this results in redundant system calls. It is more efficient to capture the current time once at the start of get_matching and pass it to _is_expired.

Suggested change
async def get_matching(self, pattern: str) -> list[RetainedEvent]:
"""Return all non-expired retained events whose topic matches *pattern*."""
async with self._lock:
if pattern not in self._regex_cache:
self._regex_cache[pattern] = _pattern_to_regex(pattern)
regex = self._regex_cache[pattern]
result: list[RetainedEvent] = []
expired_topics: list[str] = []
for topic, event in self._store.items():
if self._is_expired(topic):
expired_topics.append(topic)
continue
if regex.match(topic):
result.append(event)
# Clean up expired entries
for topic in expired_topics:
del self._store[topic]
self._expires.pop(topic, None)
return result
async def get_matching(self, pattern: str) -> list[RetainedEvent]:
"""Return all non-expired retained events whose topic matches *pattern*."""
async with self._lock:
if pattern not in self._regex_cache:
self._regex_cache[pattern] = _pattern_to_regex(pattern)
regex = self._regex_cache[pattern]
result: list[RetainedEvent] = []
expired_topics: list[str] = []
now = datetime.now(timezone.utc)
for topic, event in self._store.items():
if self._is_expired(topic, now):
expired_topics.append(topic)
continue
if regex.match(topic):
result.append(event)
# Clean up expired entries
for topic in expired_topics:
del self._store[topic]
self._expires.pop(topic, None)
return result

Lines 215-217 (meta session_id extraction) and 239 (session_id property)
in client/session.py were uncovered. Two new tests exercise these paths:
one where _meta contains a string session_id and one where it contains
a non-string value (integer), verifying the isinstance guard.
@elijahr
Copy link
Copy Markdown
Author

elijahr commented Apr 10, 2026

@gemini-code-assist please re-review

Copy link
Copy Markdown

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a comprehensive event system for the Model Context Protocol (MCP), enabling topic-based server-to-client push notifications with support for MQTT-style wildcards, retained values, and advisory effect hints. The implementation includes a server-side subscription registry and retained value store, as well as client-side utilities for event handling, priority-aware buffering, and provenance metadata for LLM context injection. Feedback focuses on aligning the wildcard regex with MQTT standards and addressing potential memory leaks and performance bottlenecks caused by unbounded caches and linear matching logic in the server-side registry.

Comment on lines +37 to +38
elif part == "+":
regex_parts.append("[^/]+")
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The regex for the + wildcard currently uses [^/]+, which requires at least one character in the topic segment. However, according to the MQTT specification (which this implementation aims to follow), the + wildcard matches exactly one topic level, and that level can be empty. For example, sport/+ should match sport/. Changing the regex to [^/]* would align with standard MQTT behavior by allowing empty segments to match.

Suggested change
elif part == "+":
regex_parts.append("[^/]+")
elif part == "+":
regex_parts.append("[^/]*")

Comment thread src/mcp/server/events.py
# session_id -> set of raw pattern strings
self._subscriptions: dict[str, set[str]] = {}
# Cache compiled regexes: pattern string -> compiled regex
self._compiled: dict[str, re.Pattern[str]] = {}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The _compiled dictionary in SubscriptionRegistry acts as a cache for compiled regular expressions but is never pruned. If clients subscribe to many unique topic patterns (e.g., patterns containing unique IDs), this dictionary will grow indefinitely, potentially leading to a memory leak. Consider using a size-limited cache (like functools.lru_cache on a helper function or a custom eviction policy) to prevent unbounded memory growth.

Comment thread src/mcp/server/events.py
Comment on lines +68 to +81
async def match(self, topic: str) -> set[str]:
"""Return session IDs whose subscriptions match *topic*.

Each session appears at most once (at-most-once delivery guarantee).
"""
async with self._lock:
result: set[str] = set()
for session_id, patterns in self._subscriptions.items():
for pattern in patterns:
regex = self._compile(pattern)
if regex.match(topic):
result.add(session_id)
break # at-most-once per session
return result
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The match method iterates over every session and every pattern registered to those sessions while holding an asynchronous lock. This results in $O(N_{sessions} \times N_{patterns})$ complexity per event emission. For a server with a large number of subscribers, this will become a significant performance bottleneck and block other registry operations (like add or remove). For better scalability, consider using a more efficient data structure for topic matching, such as a Trie or a specialized subscription tree.

Comment thread src/mcp/server/events.py
self._lock = asyncio.Lock()
self._store: dict[str, RetainedEvent] = {}
self._expires: dict[str, str] = {} # topic -> ISO 8601 expires_at
self._regex_cache: dict[str, re.Pattern[str]] = {}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Similar to the SubscriptionRegistry, the _regex_cache in RetainedValueStore is an unbounded dictionary that caches compiled regexes for every pattern used in get_matching. This poses a risk of memory exhaustion if a large variety of patterns are queried over time. Implementing a cache eviction strategy or using a fixed-size cache is recommended.

@elijahr
Copy link
Copy Markdown
Author

elijahr commented Apr 10, 2026

@gemini-code-assist please re-review

Copy link
Copy Markdown

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a comprehensive 'Events' feature to the MCP SDK, enabling topic-based server-to-client push notifications using MQTT-style wildcards. Key additions include server-side subscription management and retained value stores, client-side priority-aware event buffering, and provenance metadata for LLM context injection. Feedback highlights a potential inconsistency in client-side filtering when all topics are unsubscribed and a compatibility issue with ISO 8601 timestamp parsing in Python versions prior to 3.11.

Comment thread src/mcp/client/session.py
Comment on lines +355 to +356
if self._subscribed_patterns and not self._topic_matches_subscriptions(params.topic):
return
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The current logic for client-side event filtering is inconsistent when a client unsubscribes from all topics. If self._subscribed_patterns becomes empty after previously having subscriptions, the client reverts to "pass-through" mode where it accepts all events from the server. This contradicts the expected behavior where unsubscribing from everything should stop the delivery of those events (or at least keep the filter strict).

Consider adding a flag to track if the client has ever entered "subscription mode" to maintain strict filtering even when the subscription set is empty.

Comment thread src/mcp/server/events.py
if expires_at is None:
return False
try:
expiry = datetime.fromisoformat(expires_at)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

In Python versions prior to 3.11, datetime.fromisoformat() does not support the Z suffix for UTC. Since this is a library that may be used in environments running Python 3.10, it is safer to handle the Z suffix explicitly to avoid ValueError when parsing ISO 8601 timestamps provided by clients or external sources.

            expiry_str = expires_at.replace("Z", "+00:00") if expires_at.endswith("Z") else expires_at
            expiry = datetime.fromisoformat(expiry_str)

@elijahr
Copy link
Copy Markdown
Author

elijahr commented Apr 10, 2026

@gemini-code-assist please re-review

Copy link
Copy Markdown

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a topic-based event system for server-to-client push notifications, featuring MQTT-style wildcard patterns, retained values, and advisory effect hints. The implementation includes server-side subscription management via a new registry and store, as well as client-side event handling with priority-aware buffering. Feedback was provided regarding a potential performance bottleneck in the server-side subscription matching logic, which currently scales linearly with the number of sessions and patterns.

Comment thread src/mcp/server/events.py
Comment on lines +68 to +81
async def match(self, topic: str) -> set[str]:
"""Return session IDs whose subscriptions match *topic*.

Each session appears at most once (at-most-once delivery guarantee).
"""
async with self._lock:
result: set[str] = set()
for session_id, patterns in self._subscriptions.items():
for pattern in patterns:
regex = self._compile(pattern)
if regex.match(topic):
result.add(session_id)
break # at-most-once per session
return result
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The current implementation of match iterates over all sessions and all patterns for each session, resulting in $O(N \times M)$ complexity where $N$ is the number of sessions and $M$ is the average number of subscriptions per session. While MCP servers typically handle a small number of sessions, this could become a performance bottleneck if the number of sessions or subscriptions grows. Consider refactoring the registry to map patterns to sets of session IDs (e.g., dict[str, set[str]]) to reduce the matching complexity to $O(\text{unique patterns})$.

@elijahr
Copy link
Copy Markdown
Author

elijahr commented Apr 10, 2026

@gemini-code-assist please re-review

Copy link
Copy Markdown

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces topic-based server-to-client push notifications (Events) to the MCP SDK. It includes a new events.md documentation file, server-side components like SubscriptionRegistry and RetainedValueStore, and client-side support for subscribing to and handling events with priority-aware buffering. Feedback focuses on improving the efficiency of the RetainedValueStore by pre-parsing expiration timestamps and optimizing the _is_expired check within loops. Additionally, the documentation examples for subscription handlers should be updated to use a stable session identifier instead of the transient request_id.

Comment thread docs/events.md
Comment on lines +166 to +179
async def handle_subscribe(req: EventSubscribeRequest):
ctx = request_ctx.get()
subscribed = []
for pattern in req.params.topics:
await registry.add(str(ctx.request_id), pattern)
subscribed.append(SubscribedTopic(pattern=pattern))

retained: list[RetainedEvent] = []
for pattern in req.params.topics:
retained.extend(await store.get_matching(pattern))

return ServerResult(
EventSubscribeResult(subscribed=subscribed, retained=retained)
)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

In this example, ctx.request_id is used as the session identifier for the subscription registry. This is misleading because request_id is unique to the events/subscribe request itself, not the session. Subscriptions should be tied to a stable session identifier that persists across multiple requests from the same client. A better choice in this context would be str(id(ctx.session)), which identifies the ServerSession object associated with the connection.

Suggested change
async def handle_subscribe(req: EventSubscribeRequest):
ctx = request_ctx.get()
subscribed = []
for pattern in req.params.topics:
await registry.add(str(ctx.request_id), pattern)
subscribed.append(SubscribedTopic(pattern=pattern))
retained: list[RetainedEvent] = []
for pattern in req.params.topics:
retained.extend(await store.get_matching(pattern))
return ServerResult(
EventSubscribeResult(subscribed=subscribed, retained=retained)
)
async def handle_subscribe(req: EventSubscribeRequest):
ctx = request_ctx.get()
subscribed = []
session_id = str(id(ctx.session))
for pattern in req.params.topics:
await registry.add(session_id, pattern)
subscribed.append(SubscribedTopic(pattern=pattern))
retained: list[RetainedEvent] = []
for pattern in req.params.topics:
retained.extend(await store.get_matching(pattern))
return ServerResult(
EventSubscribeResult(subscribed=subscribed, retained=retained)
)

Comment thread docs/events.md
Comment on lines +182 to +188
async def handle_unsubscribe(req: EventUnsubscribeRequest):
ctx = request_ctx.get()
for pattern in req.params.topics:
await registry.remove(str(ctx.request_id), pattern)
return ServerResult(
EventUnsubscribeResult(unsubscribed=req.params.topics)
)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Similar to the subscribe handler, the unsubscribe handler should use a stable session identifier instead of the request_id of the unsubscribe request.

Suggested change
async def handle_unsubscribe(req: EventUnsubscribeRequest):
ctx = request_ctx.get()
for pattern in req.params.topics:
await registry.remove(str(ctx.request_id), pattern)
return ServerResult(
EventUnsubscribeResult(unsubscribed=req.params.topics)
)
async def handle_unsubscribe(req: EventUnsubscribeRequest):
ctx = request_ctx.get()
session_id = str(id(ctx.session))
for pattern in req.params.topics:
await registry.remove(session_id, pattern)
return ServerResult(
EventUnsubscribeResult(unsubscribed=req.params.topics)
)

Comment thread src/mcp/server/events.py
def __init__(self) -> None:
self._lock = asyncio.Lock()
self._store: dict[str, RetainedEvent] = {}
self._expires: dict[str, str] = {} # topic -> ISO 8601 expires_at
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The _expires dictionary currently stores expiration timestamps as strings, which requires repeated parsing in _is_expired (especially inside the loop in get_matching). Storing parsed datetime objects would improve performance.

Suggested change
self._expires: dict[str, str] = {} # topic -> ISO 8601 expires_at
self._expires: dict[str, datetime] = {} # topic -> expires_at

Comment thread src/mcp/server/events.py
Comment on lines +107 to +114
async def set(self, topic: str, event: RetainedEvent, expires_at: str | None = None) -> None:
"""Store or replace the retained value for *topic*."""
async with self._lock:
self._store[topic] = event
if expires_at is not None:
self._expires[topic] = expires_at
else:
self._expires.pop(topic, None)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Parse the expires_at string into a datetime object once during set to avoid repeated parsing during lookups and matching.

Suggested change
async def set(self, topic: str, event: RetainedEvent, expires_at: str | None = None) -> None:
"""Store or replace the retained value for *topic*."""
async with self._lock:
self._store[topic] = event
if expires_at is not None:
self._expires[topic] = expires_at
else:
self._expires.pop(topic, None)
async def set(self, topic: str, event: RetainedEvent, expires_at: str | None = None) -> None:
"""Store or replace the retained value for *topic*."""
async with self._lock:
self._store[topic] = event
if expires_at is not None:
try:
expiry = datetime.fromisoformat(expires_at)
if expiry.tzinfo is None:
expiry = expiry.replace(tzinfo=timezone.utc)
self._expires[topic] = expiry
except (ValueError, TypeError):
self._expires.pop(topic, None)
else:
self._expires.pop(topic, None)

Comment thread src/mcp/server/events.py
Comment on lines +128 to +146
async def get_matching(self, pattern: str) -> list[RetainedEvent]:
"""Return all non-expired retained events whose topic matches *pattern*."""
async with self._lock:
if pattern not in self._regex_cache:
self._regex_cache[pattern] = _pattern_to_regex(pattern)
regex = self._regex_cache[pattern]
result: list[RetainedEvent] = []
expired_topics: list[str] = []
for topic, event in self._store.items():
if self._is_expired(topic):
expired_topics.append(topic)
continue
if regex.match(topic):
result.append(event)
# Clean up expired entries
for topic in expired_topics:
del self._store[topic]
self._expires.pop(topic, None)
return result
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

In get_matching, the current time is retrieved via datetime.now() for every topic in the store. It is more efficient to capture the current time once at the start of the method and pass it to _is_expired.

Suggested change
async def get_matching(self, pattern: str) -> list[RetainedEvent]:
"""Return all non-expired retained events whose topic matches *pattern*."""
async with self._lock:
if pattern not in self._regex_cache:
self._regex_cache[pattern] = _pattern_to_regex(pattern)
regex = self._regex_cache[pattern]
result: list[RetainedEvent] = []
expired_topics: list[str] = []
for topic, event in self._store.items():
if self._is_expired(topic):
expired_topics.append(topic)
continue
if regex.match(topic):
result.append(event)
# Clean up expired entries
for topic in expired_topics:
del self._store[topic]
self._expires.pop(topic, None)
return result
async def get_matching(self, pattern: str) -> list[RetainedEvent]:
"""Return all non-expired retained events whose topic matches *pattern*."""
async with self._lock:
if pattern not in self._regex_cache:
self._regex_cache[pattern] = _pattern_to_regex(pattern)
regex = self._regex_cache[pattern]
result: list[RetainedEvent] = []
expired_topics: list[str] = []
now = datetime.now(timezone.utc)
for topic, event in self._store.items():
if self._is_expired(topic, now):
expired_topics.append(topic)
continue
if regex.match(topic):
result.append(event)
# Clean up expired entries
for topic in expired_topics:
del self._store[topic]
self._expires.pop(topic, None)
return result

Comment thread src/mcp/server/events.py
Comment on lines +154 to +165
def _is_expired(self, topic: str) -> bool:
"""Check if a retained value has expired based on its ``expires_at``."""
expires_at = self._expires.get(topic)
if expires_at is None:
return False
try:
expiry = datetime.fromisoformat(expires_at)
if expiry.tzinfo is None:
expiry = expiry.replace(tzinfo=timezone.utc)
return datetime.now(timezone.utc) >= expiry
except (ValueError, TypeError):
return False
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Update _is_expired to work with pre-parsed datetime objects and accept an optional now parameter to avoid redundant datetime.now() calls in loops.

    def _is_expired(self, topic: str, now: datetime | None = None) -> bool:
        """Check if a retained value has expired based on its ``expires_at``."""
        expiry = self._expires.get(topic)
        if expiry is None:
            return False
        if now is None:
            now = datetime.now(timezone.utc)
        return now >= expiry

@elijahr
Copy link
Copy Markdown
Author

elijahr commented Apr 10, 2026

@gemini-code-assist please re-review

Copy link
Copy Markdown

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a comprehensive event system for the Model Context Protocol (MCP), enabling topic-based server-to-client push notifications. It includes server-side components like SubscriptionRegistry and RetainedValueStore for managing MQTT-style wildcard subscriptions and cached events, as well as client-side utilities for priority-aware event buffering and provenance tracking. Feedback is provided regarding the need for error handling in the client's event dispatch loop to prevent user-provided handlers from crashing the session message processing.

Comment thread src/mcp/client/session.py
if self._event_topic_filter_regex is not None and not self._event_topic_filter_regex.match(params.topic):
return

await self._event_handler(params)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The _handle_event method awaits the registered _event_handler without a try-except block. If a user-provided handler raises an exception, it will propagate up and potentially disrupt the session's message processing loop. It might be safer to wrap this call in a try-except block and log any errors from the handler to ensure session stability.

Suggested change
await self._event_handler(params)
try:
await self._event_handler(params)
except Exception as e:
self._logger.error(f"Error in event handler: {e}")

elijahr added 2 commits April 11, 2026 10:07
Align attribute names: use 'trust' (not 'server_trust') in XML output.
Add priority and correlation_id fields to envelope. Update from_event()
to extract priority from requestedEffects. XML format now matches
opencode's formatMcpEvents and typescript-sdk's ProvenanceEnvelope.
- Remove requestedEffects and correlationId from EventParams
- Remove timestamp from EventParams and RetainedEvent
- Add top-level priority field to EventParams
- Make EventParams.payload optional (None for signal events)
- Update EventQueue to read priority directly
- Add kind (required), suggestedHandle, and retained fields to
  EventTopicDescriptor
- Add retained flag to RetainedEvent
- Drop EventEffect class (effects model replaced by priority +
  suggestedHandle + client-side handle)
- Update ProvenanceEnvelope: drop correlation_id, fix XML attribute
  order to spec (server, topic, priority, event_id, trust, source),
  always emit trust attribute
- Rename {session_id} to {agent_id} in topic pattern docstrings and
  docs (only where referring to application-level identity; MCP
  transport session ids unchanged)
- Update emit_event server API: drop correlation_id/requested_effects/
  timestamp kwargs, add priority kwarg
- Update tests and docs/events.md to match v2
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant