MCP Events: Topic-based PUB/SUB server-to-client events #1
Conversation
Code Review
This pull request restructures the project documentation into a dedicated docs/ directory and introduces a new events system for server-to-client notifications. It also implements idle session cleanup for Streamable HTTP, adds metadata support for resources, and improves transport reliability and UTF-8 handling. Review feedback suggests enhancing logging consistency in the conformance client and refactoring duplicated validation and error-response logic in the HTTP session manager to improve maintainability.
```python
if len(sys.argv) < 2:
    print(f"Usage: {sys.argv[0]} <server-url>", file=sys.stderr)
    sys.exit(1)
```
```python
if session_idle_timeout is not None and session_idle_timeout <= 0:
    raise ValueError("session_idle_timeout must be a positive number of seconds")
if stateless and session_idle_timeout is not None:
    raise RuntimeError("session_idle_timeout is not supported in stateless mode")
```
```python
error_response = JSONRPCError(
    jsonrpc="2.0",
    id="server-error",
    error=ErrorData(
        code=INVALID_REQUEST,
        message="Session not found",
    ),
)
```
@gemini-code-assist please re-review

Code Review
This pull request implements an events system with publish/subscribe functionality, adds metadata support to resources, and improves OAuth 2.1 conformance. It also refactors the documentation, introduces conformance testing tools, and adds idle session management for Streamable HTTP. The review identified a bug in the MQTT-style topic matching regex for root wildcards and recommended refactoring duplicated matching logic into a shared utility. Additional feedback suggested caching compiled regex patterns for performance and moving local imports to the module level.
```python
raise ValueError("'#' wildcard is only valid as the last segment")

# Use (/.*)?$ so that # matches zero or more trailing segments.
# e.g. "a/#" -> "^a(/.*)?$" matches "a", "a/b", "a/b/c"
return re.compile("^" + "/".join(regex_parts) + "(/.*)?$")
```
The regex generated for the MQTT multi-level wildcard `#` is incorrect when it is the only segment in the pattern. If `pattern` is `#`, `regex_parts` will be empty, and the resulting regex will be `^(/.*)?$`. This regex only matches strings starting with a forward slash (or the empty string), so it fails to match common topics like `foo`. Per the MQTT specification, a root `#` should match all topics.
Suggested change:

```diff
- return re.compile("^" + "/".join(regex_parts) + "(/.*)?$")
+ return re.compile("^" + "/".join(regex_parts) + "(/.*)?$") if regex_parts else re.compile("^.*$")
```
```python
return event
```

```python
async def get_matching(self, pattern: str) -> list[RetainedEvent]:
    """Return all non-expired retained events whose topic matches *pattern*."""
```
```python
def _topic_matches_subscriptions(self, topic: str) -> bool:
    """Check if a topic matches any of our subscribed patterns."""
    import re as _re

    for pattern in self._subscribed_patterns:
        parts = pattern.split("/")
        regex_parts: list[str] = []
        for i, part in enumerate(parts):
            if part == "#":
                regex = "^" + "/".join(regex_parts) + "(/.*)?$"
                if _re.match(regex, topic):
                    return True
                break
            elif part == "+":
                regex_parts.append("[^/]+")
            else:
                regex_parts.append(_re.escape(part))
        else:
            regex = "^" + "/".join(regex_parts) + "$"
            if _re.match(regex, topic):
                return True
    return False
```
The MQTT topic matching logic is duplicated here and in `_handle_event`. It also contains the same bug regarding the root `#` wildcard as found in `src/mcp/server/events.py`. Consider moving the pattern-to-regex conversion logic to a shared utility module (e.g., `src/mcp/shared/topic_utils.py`) to ensure consistency and maintainability.
```python
def _topic_matches_subscriptions(self, topic: str) -> bool:
    """Check if a topic matches any of our subscribed patterns."""
    import re as _re
```
```python
from ulid import ULID

event_id = str(ULID())
if timestamp is None:
    from datetime import datetime, timezone
```
@gemini-code-assist please re-review

Code Review
This pull request performs a major documentation restructure, moving content from the README to a dedicated docs/ directory and adding several project policy files. It introduces a new events system for server-to-client notifications with wildcard subscriptions, adds metadata support to resources, and improves server reliability through idle session timeouts and better transport-close handling. A review comment correctly identifies an opportunity to reduce code duplication by centralizing the MQTT-style topic matching logic into a shared utility.
```python
def _topic_matches_subscriptions(self, topic: str) -> bool:
    """Check if a topic matches any of our subscribed patterns."""
    for pattern in self._subscribed_patterns:
        parts = pattern.split("/")
        regex_parts: list[str] = []
        for i, part in enumerate(parts):
            if part == "#":
                if regex_parts:
                    regex = "^" + "/".join(regex_parts) + "(/.*)?$"
                else:
                    regex = "^.*$"
                if re.match(regex, topic):
                    return True
                break
            elif part == "+":
                regex_parts.append("[^/]+")
            else:
                regex_parts.append(re.escape(part))
        else:
            regex = "^" + "/".join(regex_parts) + "$"
            if re.match(regex, topic):
                return True
    return False


async def _handle_event(self, params: types.EventParams) -> None:
    """Dispatch an incoming event to the registered handler."""
    if self._event_handler is None:
        return

    if self._subscribed_patterns and not self._topic_matches_subscriptions(params.topic):
        return

    if self._event_topic_filter is not None:
        parts = self._event_topic_filter.split("/")
        regex_parts: list[str] = []
        matched = False
        for i, part in enumerate(parts):
            if part == "#":
                if regex_parts:
                    regex = "^" + "/".join(regex_parts) + "(/.*)?$"
                else:
                    regex = "^.*$"
                matched = bool(re.match(regex, params.topic))
                break
            elif part == "+":
                regex_parts.append("[^/]+")
            else:
                regex_parts.append(re.escape(part))
        else:
            regex = "^" + "/".join(regex_parts) + "$"
            matched = bool(re.match(regex, params.topic))
        if not matched:
            return

    await self._event_handler(params)
```
The logic for converting an MQTT-style topic pattern to a regular expression and matching it against a topic is duplicated here in `_topic_matches_subscriptions` and `_handle_event`. A similar function, `_pattern_to_regex`, also exists in `src/mcp/server/events.py`.

To improve maintainability and reduce code duplication, consider extracting this logic into a shared utility function. This function could take a pattern and a topic and return whether they match. You could place this utility in a shared module, for example `mcp.shared.events_utils`.
For example, you could have a function like:

```python
# in a new mcp/shared/events_utils.py
import re

def topic_matches_pattern(topic: str, pattern: str) -> bool:
    # ... implementation of regex conversion and matching ...
```

This would simplify both `_topic_matches_subscriptions` and `_handle_event` significantly.
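A minimal sketch of what that shared helper could look like, assuming the server-side semantics discussed in this thread (`+` matches one segment, `#` is only valid as the trailing segment, and a root `#` matches every topic); the names and cache size are illustrative, not part of the PR:

```python
import re
from functools import lru_cache


@lru_cache(maxsize=1024)
def _pattern_to_regex(pattern: str) -> re.Pattern[str]:
    """Compile an MQTT-style pattern, caching the compiled regex."""
    parts = pattern.split("/")
    regex_parts: list[str] = []
    for i, part in enumerate(parts):
        if part == "#":
            if i != len(parts) - 1:
                raise ValueError("'#' wildcard is only valid as the last segment")
            # Root "#" matches every topic; otherwise match the prefix
            # plus zero or more trailing segments.
            if not regex_parts:
                return re.compile("^.*$")
            return re.compile("^" + "/".join(regex_parts) + "(/.*)?$")
        elif part == "+":
            regex_parts.append("[^/]+")
        else:
            regex_parts.append(re.escape(part))
    return re.compile("^" + "/".join(regex_parts) + "$")


def topic_matches_pattern(topic: str, pattern: str) -> bool:
    """Return True if *topic* matches the MQTT-style *pattern*."""
    return _pattern_to_regex(pattern).match(topic) is not None
```

Centralizing the helper also fixes the root-`#` bug in one place and gives both client and server the same validation behavior.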
@gemini-code-assist please re-review

Code Review
This pull request introduces a comprehensive event system for the MCP Python SDK, enabling server-to-client push notifications with MQTT-style topic wildcards and retained values. Key enhancements include metadata support for resources, session idle timeouts for Streamable HTTP, improved transport closure handling, and a significant overhaul of the documentation. Feedback focuses on performance optimizations for event matching, specifically recommending the caching of compiled regex patterns to avoid overhead during high event volumes. Additionally, a logic correction was suggested to ensure that events are dropped when no matching subscriptions exist, aligning the implementation with the provided documentation.
```python
def set_event_handler(
    self,
    handler: EventHandlerFnT,
    *,
    topic_filter: str | None = None,
) -> None:
    """Register a callback for incoming event notifications."""
    self._event_handler = handler
    self._event_topic_filter = topic_filter
```
```python
if self._event_handler is None:
    return

if self._subscribed_patterns and not self._topic_matches_subscriptions(params.topic):
```
The subscription enforcement logic here contradicts the documentation and creates inconsistent behavior.

According to `docs/events.md:247`, events that do not match any active subscription should be dropped. However, the current implementation (`if self._subscribed_patterns and ...`) only performs the check if the client has at least one subscription. This means a client with zero subscriptions will receive all events pushed by the server, but as soon as it subscribes to a single topic, it becomes restricted.

Removing the `self._subscribed_patterns and` check will ensure that events are only processed if they match an active subscription, aligning the code with the documented behavior.
Suggested change:

```diff
- if self._subscribed_patterns and not self._topic_matches_subscriptions(params.topic):
+ if not self._topic_matches_subscriptions(params.topic):
```
```python
def _topic_matches_subscriptions(self, topic: str) -> bool:
    """Check if a topic matches any of our subscribed patterns."""
    for pattern in self._subscribed_patterns:
        parts = pattern.split("/")
        regex_parts: list[str] = []
        for i, part in enumerate(parts):
            if part == "#":
                if regex_parts:
                    regex = "^" + "/".join(regex_parts) + "(/.*)?$"
                else:
                    regex = "^.*$"
                if re.match(regex, topic):
                    return True
                break
            elif part == "+":
                regex_parts.append("[^/]+")
            else:
                regex_parts.append(re.escape(part))
        else:
            regex = "^" + "/".join(regex_parts) + "$"
            if re.match(regex, topic):
                return True
    return False
```
The MQTT-style topic matching logic in `_topic_matches_subscriptions` is inefficient and duplicated.

- Performance: The code repeatedly splits pattern strings and constructs regex patterns for every incoming event. This will become a significant CPU bottleneck for clients with multiple subscriptions or high event volume. Compiled regexes should be cached.
- Duplication: The logic for converting MQTT wildcards (`+`, `#`) to regex is duplicated in `_handle_event` (lines 316-336) and is also present in `src/mcp/server/events.py`.
- Validation: This implementation lacks the validation enforced on the server side (e.g., ensuring `#` is the last segment).

Consider refactoring the conversion logic into a shared utility function and caching the compiled regex patterns within the `ClientSession` (e.g., using a `dict[str, re.Pattern]`).
@gemini-code-assist please re-review

Code Review
This pull request introduces a major update to the MCP Python SDK, featuring a new event system for server-to-client push notifications, support for resource metadata, and an idle timeout mechanism for stateful HTTP sessions. The project structure is refined by moving detailed documentation to a dedicated docs/ directory and adding formal roadmap, versioning, and dependency policies. Feedback suggests standardizing timeout validation logic and error messages, avoiding hardcoded identifiers in session error responses, and optimizing memory usage when closing response streams by avoiding full dictionary copies.
```python
if session_idle_timeout is not None and session_idle_timeout <= 0:
    raise ValueError("session_idle_timeout must be a positive number of seconds")
```
```python
error_response = JSONRPCError(
    jsonrpc="2.0",
    id="server-error",
    error=ErrorData(
        code=INVALID_REQUEST,
        message="Session not found",
    ),
)
```
```diff
- for id, stream in self._response_streams.items():
+ # Snapshot: stream.send() wakes the waiter, whose finally pops
+ # from _response_streams before the next __next__() call.
+ for id, stream in list(self._response_streams.items()):
```
…nt events

Implements events/subscribe, events/unsubscribe, events/list, and events/emit as new MCP protocol methods. Includes a subscription registry with MQTT-style wildcard matching (`+`/`#`), a retained value store with TTL expiry, and a client-side event handler with topic filtering and subscription tracking. Reference implementation for an MCP SEP (Specification Enhancement Proposal) for topic-based server-to-client events.
Read session_id from InitializeResult._meta after initialize and expose
as session.session_id property. Enables client-side construction of
session-scoped event topics for the {session_id} authorization convention.
ProvenanceEnvelope wraps events with client-assessed provenance metadata (server, trust tier, topic, source) for safe injection into LLM context, with `to_dict()`, `to_xml()` (XML-escaped), and `from_event()` factory methods.

EventQueue provides priority-aware buffering with four deques (urgent/high/normal/low) that drain in strict priority order, supporting partial drains via `max_count`. Priority is resolved from the highest-priority requestedEffect; events without effects default to normal.

Fixed a bug in the plan's `_resolve_priority` where the baseline was "normal" (rank 2), causing "low" priority events to be promoted to "normal". Changed the baseline to "low" (rank 3) so all priority levels resolve correctly.
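The EventQueue scheme described above can be sketched as follows (a simplified illustration with assumed names, not the PR's actual class; note that resolving priority with `min` over the effect ranks sidesteps the baseline bug entirely):

```python
from collections import deque

# Drain order: index 0 is the highest priority.
_PRIORITY_ORDER = ("urgent", "high", "normal", "low")


class EventQueue:
    """Priority-aware buffer: four deques drained in strict priority order."""

    def __init__(self) -> None:
        self._buckets = {p: deque() for p in _PRIORITY_ORDER}

    def enqueue(self, event, requested_effects=()):
        # Resolve priority from the highest-priority requested effect;
        # events without effects default to "normal".
        priority = "normal"
        if requested_effects:
            priority = min(requested_effects, key=_PRIORITY_ORDER.index)
        self._buckets[priority].append(event)

    def drain(self, max_count=None):
        """Pop up to *max_count* events, urgent first; None drains all."""
        out = []
        for p in _PRIORITY_ORDER:
            bucket = self._buckets[p]
            while bucket and (max_count is None or len(out) < max_count):
                out.append(bucket.popleft())
        return out
```

Within a bucket, FIFO order is preserved, so a partial drain (`max_count=1`) reliably surfaces the highest-priority event first.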
- Fix 1 (test_priority_from_multiple_effects): add a low-priority event and drain with `max_count=1` to prove the multi-effect event lands in the urgent bucket
- Fix 2 (test_priority_no_effects): add a low-priority event enqueued first and assert that drain order proves no-effects defaults to normal
- Fix 3 (test_to_xml_basic): exact string equality; assert event_id/received_at are absent when not set
- Fix 4 (test_to_xml_with_special_chars_in_attrs): verify escaped forms (single-quote wrapping for `"`, `&lt;` for `<`) with exact string equality
- Fix 5 (test_from_event_extracts_fields): replace the "T"-in-string check with `datetime.fromisoformat()` validation
@gemini-code-assist please re-review
Sort imports and reformat lines in tests/client/test_events.py and src/mcp/client/events.py to pass pre-commit ruff checks.
Code Review
This pull request implements a topic-based server-to-client event system for the MCP SDK, enabling push notifications with MQTT-style wildcard patterns, retained values, and advisory effect hints. The implementation includes server-side subscription management and retained value storage, along with client-side event handling, filtering, and priority-aware buffering. Feedback suggests that the current linear scan in the subscription matching logic may present a performance bottleneck for high-volume servers, recommending a more efficient data structure like a trie for future optimization.
```python
async def match(self, topic: str) -> set[str]:
    """Return session IDs whose subscriptions match *topic*.

    Each session appears at most once (at-most-once delivery guarantee).
    """
    async with self._lock:
        result: set[str] = set()
        for session_id, patterns in self._subscriptions.items():
            for pattern in patterns:
                regex = self._compile(pattern)
                if regex.match(topic):
                    result.add(session_id)
                    break  # at-most-once per session
        return result
```
The `match` method performs a linear scan over all sessions and all their patterns. While acceptable for a reference implementation or small-scale deployments, this scales linearly with the number of sessions and patterns and may become a bottleneck for high-volume servers; a more efficient data structure, such as a trie, could be considered for future optimization.
@gemini-code-assist please re-review

Code Review
This pull request implements a topic-based event system for server-to-client push notifications, featuring MQTT-style wildcards, retained values, and advisory effect hints. It introduces server-side subscription registries and retained value stores, as well as client-side utilities for priority-aware buffering and provenance tracking. Feedback identifies opportunities to support multiple event handlers on the client session, optimize server-side lock usage during topic matching, and improve MQTT compliance for the '+' wildcard to match empty segments.
```python
self._event_handler: EventHandlerFnT | None = None
self._event_topic_filter: str | None = None
self._event_topic_filter_regex: re.Pattern[str] | None = None
```
The current implementation only supports a single global event handler. If a user applies the `@on_event` decorator multiple times, each subsequent call will overwrite the previous handler. This is likely unexpected behavior for a PUB/SUB system. Consider maintaining a list of handlers and their associated filters.
Suggested change:

```diff
- self._event_handler: EventHandlerFnT | None = None
- self._event_topic_filter: str | None = None
- self._event_topic_filter_regex: re.Pattern[str] | None = None
+ self._event_handlers: list[tuple[EventHandlerFnT, re.Pattern[str] | None]] = []
```
```python
def set_event_handler(
    self,
    handler: EventHandlerFnT,
    *,
    topic_filter: str | None = None,
) -> None:
    """Register a callback for incoming event notifications.

    If *topic_filter* is provided, it is compiled once here and the
    cached regex is reused for every incoming event. The filter uses
    the same MQTT-style wildcard syntax as subscription patterns
    (``+`` for a single segment, ``#`` as a trailing multi-segment
    wildcard).
    """
    self._event_handler = handler
    self._event_topic_filter = topic_filter
    self._event_topic_filter_regex = pattern_to_regex(topic_filter) if topic_filter is not None else None
```
Renaming this to `add_event_handler` and appending to a list would allow multiple handlers to coexist, fixing the overwriting issue when using the decorator or multiple manual registrations.
Suggested change:

```diff
- def set_event_handler(
+ def add_event_handler(
      self,
      handler: EventHandlerFnT,
      *,
      topic_filter: str | None = None,
  ) -> None:
      """Register a callback for incoming event notifications.

      If *topic_filter* is provided, it is compiled once here and the
      cached regex is reused for every incoming event. The filter uses
      the same MQTT-style wildcard syntax as subscription patterns
      (``+`` for a single segment, ``#`` as a trailing multi-segment
      wildcard).
      """
-     self._event_handler = handler
-     self._event_topic_filter = topic_filter
-     self._event_topic_filter_regex = pattern_to_regex(topic_filter) if topic_filter is not None else None
+     regex = pattern_to_regex(topic_filter) if topic_filter is not None else None
+     self._event_handlers.append((handler, regex))
```
```python
if self._event_handler is None:
    return

if self._subscribed_patterns and not self._topic_matches_subscriptions(params.topic):
    return

if self._event_topic_filter_regex is not None and not self._event_topic_filter_regex.match(params.topic):
    return

await self._event_handler(params)
```
Awaiting the event handler directly in the reader loop means that a slow or blocking handler will stop the client from receiving any further messages (including responses to pending requests). Consider spawning a background task for each handler execution, though be mindful of event ordering if that is a requirement.
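One way to decouple handler execution from the reader loop is sketched below. This is an illustration with assumed names (the SDK itself builds on anyio task groups, so a real implementation would differ), and, as the comment notes, fire-and-forget dispatch gives up strict per-topic ordering:

```python
import asyncio


class EventDispatcher:
    """Runs each event handler in its own task so a slow handler
    cannot stall the message reader loop."""

    def __init__(self, handler):
        self._handler = handler
        self._tasks: set[asyncio.Task] = set()

    def dispatch(self, params) -> None:
        # Fire-and-forget: keep a reference so the task isn't garbage
        # collected mid-flight, and drop it once it completes.
        # Note this sacrifices strict event ordering.
        task = asyncio.create_task(self._handler(params))
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)

    async def aclose(self) -> None:
        # Let in-flight handlers finish before shutdown.
        if self._tasks:
            await asyncio.gather(*self._tasks, return_exceptions=True)
```

If ordering matters, an alternative is a single dedicated consumer task reading from a bounded queue, which preserves order while still isolating the reader loop from handler latency.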
```python
else:
    return re.compile("^.*$")
elif part == "+":
    regex_parts.append("[^/]+")
```
The `+` wildcard in MQTT is intended to match exactly one topic level, which can be an empty string (e.g., `a//c` matches `a/+/c`). The current regex `[^/]+` requires at least one character, which will fail to match empty levels. Using `[^/]*` is more compliant with MQTT semantics.
Suggested change:

```diff
- regex_parts.append("[^/]+")
+ regex_parts.append("[^/]*")
```
```python
async with self._lock:
    result: set[str] = set()
    for session_id, patterns in self._subscriptions.items():
        for pattern in patterns:
            regex = self._compile(pattern)
            if regex.match(topic):
                result.add(session_id)
                break  # at-most-once per session
    return result
```
Holding the `asyncio.Lock` during the entire matching loop (which involves synchronous regex execution) can lead to significant performance bottlenecks and block other operations like `add` or `remove` if there are many sessions or patterns. It is better to snapshot the subscriptions under the lock and perform the matching outside of it.
Suggested change:

```python
async with self._lock:
    # Snapshot subscriptions to minimize lock duration
    snapshot = {sid: list(patterns) for sid, patterns in self._subscriptions.items()}
result: set[str] = set()
for session_id, patterns in snapshot.items():
    for pattern in patterns:
        regex = self._compile(pattern)
        if regex.match(topic):
            result.add(session_id)
            break  # at-most-once per session
return result
```
@gemini-code-assist please re-review

Code Review
This pull request introduces a comprehensive event system for the Model Context Protocol (MCP), enabling topic-based server-to-client push notifications. It includes new documentation, client and server-side subscription registries, a retained value store, and MQTT-style wildcard matching. Feedback focuses on improving the robustness of ISO 8601 timestamp parsing for Python 3.10 compatibility and optimizing expiration checks in the RetainedValueStore by reducing redundant system calls.
```python
def _is_expired(self, topic: str) -> bool:
    """Check if a retained value has expired based on its ``expires_at``."""
    expires_at = self._expires.get(topic)
    if expires_at is None:
        return False
    try:
        expiry = datetime.fromisoformat(expires_at)
        if expiry.tzinfo is None:
            expiry = expiry.replace(tzinfo=timezone.utc)
        return datetime.now(timezone.utc) >= expiry
    except (ValueError, TypeError):
        return False
```
The `datetime.fromisoformat` method in Python 3.10 (which this project supports) does not handle the `Z` suffix for UTC. This will cause a `ValueError` for valid ISO 8601 strings ending in `Z`, leading `_is_expired` to incorrectly return `False`. Additionally, accepting an optional `now` parameter allows for optimization in batch operations like `get_matching`.
Suggested change:

```python
def _is_expired(self, topic: str, now: datetime | None = None) -> bool:
    """Check if a retained value has expired based on its ``expires_at``."""
    expires_at = self._expires.get(topic)
    if expires_at is None:
        return False
    try:
        ts = expires_at
        if ts.endswith("Z"):
            ts = ts[:-1] + "+00:00"
        expiry = datetime.fromisoformat(ts)
        if expiry.tzinfo is None:
            expiry = expiry.replace(tzinfo=timezone.utc)
        if now is None:
            now = datetime.now(timezone.utc)
        return now >= expiry
    except (ValueError, TypeError):
        return False
```
```python
async def get_matching(self, pattern: str) -> list[RetainedEvent]:
    """Return all non-expired retained events whose topic matches *pattern*."""
    async with self._lock:
        if pattern not in self._regex_cache:
            self._regex_cache[pattern] = _pattern_to_regex(pattern)
        regex = self._regex_cache[pattern]
        result: list[RetainedEvent] = []
        expired_topics: list[str] = []
        for topic, event in self._store.items():
            if self._is_expired(topic):
                expired_topics.append(topic)
                continue
            if regex.match(topic):
                result.append(event)
        # Clean up expired entries
        for topic in expired_topics:
            del self._store[topic]
            self._expires.pop(topic, None)
        return result
```
In `get_matching`, `_is_expired` is called for every topic in the store. Since `_is_expired` calls `datetime.now(timezone.utc)` internally, this results in redundant system calls. It is more efficient to capture the current time once at the start of `get_matching` and pass it to `_is_expired`.
Suggested change:

```python
async def get_matching(self, pattern: str) -> list[RetainedEvent]:
    """Return all non-expired retained events whose topic matches *pattern*."""
    async with self._lock:
        if pattern not in self._regex_cache:
            self._regex_cache[pattern] = _pattern_to_regex(pattern)
        regex = self._regex_cache[pattern]
        result: list[RetainedEvent] = []
        expired_topics: list[str] = []
        now = datetime.now(timezone.utc)
        for topic, event in self._store.items():
            if self._is_expired(topic, now):
                expired_topics.append(topic)
                continue
            if regex.match(topic):
                result.append(event)
        # Clean up expired entries
        for topic in expired_topics:
            del self._store[topic]
            self._expires.pop(topic, None)
        return result
```
Lines 215-217 (meta `session_id` extraction) and 239 (`session_id` property) in `client/session.py` were uncovered. Two new tests exercise these paths: one where `_meta` contains a string `session_id` and one where it contains a non-string value (an integer), verifying the `isinstance` guard.
@gemini-code-assist please re-review

Code Review
This pull request introduces a comprehensive event system for the Model Context Protocol (MCP), enabling topic-based server-to-client push notifications with support for MQTT-style wildcards, retained values, and advisory effect hints. The implementation includes a server-side subscription registry and retained value store, as well as client-side utilities for event handling, priority-aware buffering, and provenance metadata for LLM context injection. Feedback focuses on aligning the wildcard regex with MQTT standards and addressing potential memory leaks and performance bottlenecks caused by unbounded caches and linear matching logic in the server-side registry.
```python
elif part == "+":
    regex_parts.append("[^/]+")
```
The regex for the `+` wildcard currently uses `[^/]+`, which requires at least one character in the topic segment. However, according to the MQTT specification (which this implementation aims to follow), the `+` wildcard matches exactly one topic level, and that level can be empty. For example, `sport/+` should match `sport/`. Changing the regex to `[^/]*` would align with standard MQTT behavior by allowing empty segments to match.
Suggested change:

```diff
  elif part == "+":
-     regex_parts.append("[^/]+")
+     regex_parts.append("[^/]*")
```
```python
# session_id -> set of raw pattern strings
self._subscriptions: dict[str, set[str]] = {}
# Cache compiled regexes: pattern string -> compiled regex
self._compiled: dict[str, re.Pattern[str]] = {}
```
The `_compiled` dictionary in `SubscriptionRegistry` acts as a cache for compiled regular expressions but is never pruned. If clients subscribe to many unique topic patterns (e.g., patterns containing unique IDs), this dictionary will grow indefinitely, potentially leading to a memory leak. Consider using a size-limited cache (like `functools.lru_cache` on a helper function or a custom eviction policy) to prevent unbounded memory growth.
```python
async def match(self, topic: str) -> set[str]:
    """Return session IDs whose subscriptions match *topic*.

    Each session appears at most once (at-most-once delivery guarantee).
    """
    async with self._lock:
        result: set[str] = set()
        for session_id, patterns in self._subscriptions.items():
            for pattern in patterns:
                regex = self._compile(pattern)
                if regex.match(topic):
                    result.add(session_id)
                    break  # at-most-once per session
        return result
```
The `match` method iterates over every session and every pattern registered to those sessions while holding an asynchronous lock. This results in O(sessions × patterns) work per published event, all while blocking concurrent registry operations (such as `add` or `remove`). For better scalability, consider using a more efficient data structure for topic matching, such as a trie or a specialized subscription tree.
```python
self._lock = asyncio.Lock()
self._store: dict[str, RetainedEvent] = {}
self._expires: dict[str, str] = {}  # topic -> ISO 8601 expires_at
self._regex_cache: dict[str, re.Pattern[str]] = {}
```
Similar to the `SubscriptionRegistry`, the `_regex_cache` in `RetainedValueStore` is an unbounded dictionary that caches compiled regexes for every pattern used in `get_matching`. This poses a risk of memory exhaustion if a large variety of patterns are queried over time. Implementing a cache eviction strategy or using a fixed-size cache is recommended.
@gemini-code-assist please re-review

Code Review
This pull request introduces a comprehensive 'Events' feature to the MCP SDK, enabling topic-based server-to-client push notifications using MQTT-style wildcards. Key additions include server-side subscription management and retained value stores, client-side priority-aware event buffering, and provenance metadata for LLM context injection. Feedback highlights a potential inconsistency in client-side filtering when all topics are unsubscribed and a compatibility issue with ISO 8601 timestamp parsing in Python versions prior to 3.11.
| if self._subscribed_patterns and not self._topic_matches_subscriptions(params.topic): | ||
| return |
The current logic for client-side event filtering is inconsistent when a client unsubscribes from all topics. If self._subscribed_patterns becomes empty after previously having subscriptions, the client reverts to "pass-through" mode where it accepts all events from the server. This contradicts the expected behavior where unsubscribing from everything should stop the delivery of those events (or at least keep the filter strict).
Consider adding a flag to track if the client has ever entered "subscription mode" to maintain strict filtering even when the subscription set is empty.
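The flag the review proposes could be kept next to the pattern set, so an empty set after the first subscribe means "drop everything" rather than "pass everything through". A minimal sketch, with illustrative names and a deliberately simplified matcher:

```python
def _matches(pattern: str, topic: str) -> bool:
    # Simplified MQTT-style match: '+' is one segment, '#' is the trailing tail.
    p, t = pattern.split("/"), topic.split("/")
    for i, seg in enumerate(p):
        if seg == "#":
            return True
        if i >= len(t) or (seg != "+" and seg != t[i]):
            return False
    return len(p) == len(t)


class EventFilter:
    def __init__(self) -> None:
        self._patterns: set[str] = set()
        self._ever_subscribed = False  # once True, an empty set stays strict

    def subscribe(self, pattern: str) -> None:
        self._ever_subscribed = True
        self._patterns.add(pattern)

    def unsubscribe(self, pattern: str) -> None:
        self._patterns.discard(pattern)

    def accepts(self, topic: str) -> bool:
        if not self._ever_subscribed:
            return True  # never entered subscription mode: pass-through
        return any(_matches(p, topic) for p in self._patterns)
```

The sticky `_ever_subscribed` flag is the whole fix: unsubscribing from the last pattern no longer silently reverts the client to pass-through mode.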
| if expires_at is None: | ||
| return False | ||
| try: | ||
| expiry = datetime.fromisoformat(expires_at) |
In Python versions prior to 3.11, datetime.fromisoformat() does not support the Z suffix for UTC. Since this is a library that may be used in environments running Python 3.10, it is safer to handle the Z suffix explicitly to avoid ValueError when parsing ISO 8601 timestamps provided by clients or external sources.
expiry_str = expires_at.replace("Z", "+00:00") if expires_at.endswith("Z") else expires_at
expiry = datetime.fromisoformat(expiry_str)
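Folding the review's two-line fix into a helper keeps the `Z` handling in one place; the `parse_expiry` name is illustrative:

```python
from datetime import datetime, timezone


def parse_expiry(expires_at: str) -> datetime:
    """Parse an ISO 8601 timestamp, accepting a 'Z' UTC suffix on Python < 3.11."""
    if expires_at.endswith("Z"):
        expires_at = expires_at[:-1] + "+00:00"
    expiry = datetime.fromisoformat(expires_at)
    if expiry.tzinfo is None:
        expiry = expiry.replace(tzinfo=timezone.utc)  # treat naive stamps as UTC
    return expiry
```

On Python 3.11+ the rewrite is a no-op in effect, since `fromisoformat` accepts `Z` natively there.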
@gemini-code-assist please re-review |
There was a problem hiding this comment.
Code Review
This pull request introduces a topic-based event system for server-to-client push notifications, featuring MQTT-style wildcard patterns, retained values, and advisory effect hints. The implementation includes server-side subscription management via a new registry and store, as well as client-side event handling with priority-aware buffering. Feedback was provided regarding a potential performance bottleneck in the server-side subscription matching logic, which currently scales linearly with the number of sessions and patterns.
| async def match(self, topic: str) -> set[str]: | ||
| """Return session IDs whose subscriptions match *topic*. | ||
|
| Each session appears at most once (at-most-once delivery guarantee). | ||
| """ | ||
| async with self._lock: | ||
| result: set[str] = set() | ||
| for session_id, patterns in self._subscriptions.items(): | ||
| for pattern in patterns: | ||
| regex = self._compile(pattern) | ||
| if regex.match(topic): | ||
| result.add(session_id) | ||
| break # at-most-once per session | ||
| return result |
The current implementation of match iterates over all sessions and all patterns for each session, so the cost of delivering one event grows with the total number of subscriptions. Consider maintaining an inverted index from topic to session IDs (dict[str, set[str]]) for exact (non-wildcard) subscriptions to reduce the matching complexity to a constant-time lookup, with a residual scan only over wildcard patterns.
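The inverted index could be combined with a wildcard fallback in a hybrid registry; this sketch uses illustrative names and a simplified matcher, not the SDK's actual classes:

```python
def _matches(pattern: str, topic: str) -> bool:
    # Simplified MQTT-style match: '+' is one segment, '#' is the trailing tail.
    p, t = pattern.split("/"), topic.split("/")
    for i, seg in enumerate(p):
        if seg == "#":
            return True
        if i >= len(t) or (seg != "+" and seg != t[i]):
            return False
    return len(p) == len(t)


class HybridRegistry:
    def __init__(self) -> None:
        self._exact: dict[str, set[str]] = {}      # topic -> session ids
        self._wildcards: dict[str, set[str]] = {}  # pattern -> session ids

    def add(self, session_id: str, pattern: str) -> None:
        has_wildcard = "+" in pattern.split("/") or "#" in pattern.split("/")
        bucket = self._wildcards if has_wildcard else self._exact
        bucket.setdefault(pattern, set()).add(session_id)

    def match(self, topic: str) -> set[str]:
        result = set(self._exact.get(topic, ()))  # O(1) for exact subscriptions
        for pattern, sessions in self._wildcards.items():
            if _matches(pattern, topic):
                result |= sessions
        return result
```

If most subscriptions are exact topics (a common shape for per-resource updates), the scan shrinks to just the wildcard minority.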
@gemini-code-assist please re-review |
There was a problem hiding this comment.
Code Review
This pull request introduces topic-based server-to-client push notifications (Events) to the MCP SDK. It includes a new events.md documentation file, server-side components like SubscriptionRegistry and RetainedValueStore, and client-side support for subscribing to and handling events with priority-aware buffering. Feedback focuses on improving the efficiency of the RetainedValueStore by pre-parsing expiration timestamps and optimizing the _is_expired check within loops. Additionally, the documentation examples for subscription handlers should be updated to use a stable session identifier instead of the transient request_id.
| async def handle_subscribe(req: EventSubscribeRequest): | ||
| ctx = request_ctx.get() | ||
| subscribed = [] | ||
| for pattern in req.params.topics: | ||
| await registry.add(str(ctx.request_id), pattern) | ||
| subscribed.append(SubscribedTopic(pattern=pattern)) | ||
|
| retained: list[RetainedEvent] = [] | ||
| for pattern in req.params.topics: | ||
| retained.extend(await store.get_matching(pattern)) | ||
|
| return ServerResult( | ||
| EventSubscribeResult(subscribed=subscribed, retained=retained) | ||
| ) |
In this example, ctx.request_id is used as the session identifier for the subscription registry. This is misleading because request_id is unique to the events/subscribe request itself, not the session. Subscriptions should be tied to a stable session identifier that persists across multiple requests from the same client. A better choice in this context would be str(id(ctx.session)), which identifies the ServerSession object associated with the connection.
| async def handle_subscribe(req: EventSubscribeRequest): | |
| ctx = request_ctx.get() | |
| subscribed = [] | |
| for pattern in req.params.topics: | |
| await registry.add(str(ctx.request_id), pattern) | |
| subscribed.append(SubscribedTopic(pattern=pattern)) | |
| retained: list[RetainedEvent] = [] | |
| for pattern in req.params.topics: | |
| retained.extend(await store.get_matching(pattern)) | |
| return ServerResult( | |
| EventSubscribeResult(subscribed=subscribed, retained=retained) | |
| ) | |
| async def handle_subscribe(req: EventSubscribeRequest): | |
| ctx = request_ctx.get() | |
| subscribed = [] | |
| session_id = str(id(ctx.session)) | |
| for pattern in req.params.topics: | |
| await registry.add(session_id, pattern) | |
| subscribed.append(SubscribedTopic(pattern=pattern)) | |
| retained: list[RetainedEvent] = [] | |
| for pattern in req.params.topics: | |
| retained.extend(await store.get_matching(pattern)) | |
| return ServerResult( | |
| EventSubscribeResult(subscribed=subscribed, retained=retained) | |
| ) |
| async def handle_unsubscribe(req: EventUnsubscribeRequest): | ||
| ctx = request_ctx.get() | ||
| for pattern in req.params.topics: | ||
| await registry.remove(str(ctx.request_id), pattern) | ||
| return ServerResult( | ||
| EventUnsubscribeResult(unsubscribed=req.params.topics) | ||
| ) |
Similar to the subscribe handler, the unsubscribe handler should use a stable session identifier instead of the request_id of the unsubscribe request.
| async def handle_unsubscribe(req: EventUnsubscribeRequest): | |
| ctx = request_ctx.get() | |
| for pattern in req.params.topics: | |
| await registry.remove(str(ctx.request_id), pattern) | |
| return ServerResult( | |
| EventUnsubscribeResult(unsubscribed=req.params.topics) | |
| ) | |
| async def handle_unsubscribe(req: EventUnsubscribeRequest): | |
| ctx = request_ctx.get() | |
| session_id = str(id(ctx.session)) | |
| for pattern in req.params.topics: | |
| await registry.remove(session_id, pattern) | |
| return ServerResult( | |
| EventUnsubscribeResult(unsubscribed=req.params.topics) | |
| ) |
| def __init__(self) -> None: | ||
| self._lock = asyncio.Lock() | ||
| self._store: dict[str, RetainedEvent] = {} | ||
| self._expires: dict[str, str] = {} # topic -> ISO 8601 expires_at |
The _expires dictionary currently stores expiration timestamps as strings, which requires repeated parsing in _is_expired (especially inside the loop in get_matching). Storing parsed datetime objects would improve performance.
| self._expires: dict[str, str] = {} # topic -> ISO 8601 expires_at | |
| self._expires: dict[str, datetime] = {} # topic -> expires_at |
| async def set(self, topic: str, event: RetainedEvent, expires_at: str | None = None) -> None: | ||
| """Store or replace the retained value for *topic*.""" | ||
| async with self._lock: | ||
| self._store[topic] = event | ||
| if expires_at is not None: | ||
| self._expires[topic] = expires_at | ||
| else: | ||
| self._expires.pop(topic, None) |
Parse the expires_at string into a datetime object once during set to avoid repeated parsing during lookups and matching.
| async def set(self, topic: str, event: RetainedEvent, expires_at: str | None = None) -> None: | |
| """Store or replace the retained value for *topic*.""" | |
| async with self._lock: | |
| self._store[topic] = event | |
| if expires_at is not None: | |
| self._expires[topic] = expires_at | |
| else: | |
| self._expires.pop(topic, None) | |
| async def set(self, topic: str, event: RetainedEvent, expires_at: str | None = None) -> None: | |
| """Store or replace the retained value for *topic*.""" | |
| async with self._lock: | |
| self._store[topic] = event | |
| if expires_at is not None: | |
| try: | |
| expiry = datetime.fromisoformat(expires_at) | |
| if expiry.tzinfo is None: | |
| expiry = expiry.replace(tzinfo=timezone.utc) | |
| self._expires[topic] = expiry | |
| except (ValueError, TypeError): | |
| self._expires.pop(topic, None) | |
| else: | |
| self._expires.pop(topic, None) |
| async def get_matching(self, pattern: str) -> list[RetainedEvent]: | ||
| """Return all non-expired retained events whose topic matches *pattern*.""" | ||
| async with self._lock: | ||
| if pattern not in self._regex_cache: | ||
| self._regex_cache[pattern] = _pattern_to_regex(pattern) | ||
| regex = self._regex_cache[pattern] | ||
| result: list[RetainedEvent] = [] | ||
| expired_topics: list[str] = [] | ||
| for topic, event in self._store.items(): | ||
| if self._is_expired(topic): | ||
| expired_topics.append(topic) | ||
| continue | ||
| if regex.match(topic): | ||
| result.append(event) | ||
| # Clean up expired entries | ||
| for topic in expired_topics: | ||
| del self._store[topic] | ||
| self._expires.pop(topic, None) | ||
| return result |
In get_matching, the current time is retrieved via datetime.now() for every topic in the store. It is more efficient to capture the current time once at the start of the method and pass it to _is_expired.
| async def get_matching(self, pattern: str) -> list[RetainedEvent]: | |
| """Return all non-expired retained events whose topic matches *pattern*.""" | |
| async with self._lock: | |
| if pattern not in self._regex_cache: | |
| self._regex_cache[pattern] = _pattern_to_regex(pattern) | |
| regex = self._regex_cache[pattern] | |
| result: list[RetainedEvent] = [] | |
| expired_topics: list[str] = [] | |
| for topic, event in self._store.items(): | |
| if self._is_expired(topic): | |
| expired_topics.append(topic) | |
| continue | |
| if regex.match(topic): | |
| result.append(event) | |
| # Clean up expired entries | |
| for topic in expired_topics: | |
| del self._store[topic] | |
| self._expires.pop(topic, None) | |
| return result | |
| async def get_matching(self, pattern: str) -> list[RetainedEvent]: | |
| """Return all non-expired retained events whose topic matches *pattern*.""" | |
| async with self._lock: | |
| if pattern not in self._regex_cache: | |
| self._regex_cache[pattern] = _pattern_to_regex(pattern) | |
| regex = self._regex_cache[pattern] | |
| result: list[RetainedEvent] = [] | |
| expired_topics: list[str] = [] | |
| now = datetime.now(timezone.utc) | |
| for topic, event in self._store.items(): | |
| if self._is_expired(topic, now): | |
| expired_topics.append(topic) | |
| continue | |
| if regex.match(topic): | |
| result.append(event) | |
| # Clean up expired entries | |
| for topic in expired_topics: | |
| del self._store[topic] | |
| self._expires.pop(topic, None) | |
| return result |
| def _is_expired(self, topic: str) -> bool: | ||
| """Check if a retained value has expired based on its ``expires_at``.""" | ||
| expires_at = self._expires.get(topic) | ||
| if expires_at is None: | ||
| return False | ||
| try: | ||
| expiry = datetime.fromisoformat(expires_at) | ||
| if expiry.tzinfo is None: | ||
| expiry = expiry.replace(tzinfo=timezone.utc) | ||
| return datetime.now(timezone.utc) >= expiry | ||
| except (ValueError, TypeError): | ||
| return False |
Update _is_expired to work with pre-parsed datetime objects and accept an optional now parameter to avoid redundant datetime.now() calls in loops.
def _is_expired(self, topic: str, now: datetime | None = None) -> bool:
"""Check if a retained value has expired based on its ``expires_at``."""
expiry = self._expires.get(topic)
if expiry is None:
return False
if now is None:
now = datetime.now(timezone.utc)
return now >= expiry
@gemini-code-assist please re-review |
There was a problem hiding this comment.
Code Review
This pull request introduces a comprehensive event system for the Model Context Protocol (MCP), enabling topic-based server-to-client push notifications. It includes server-side components like SubscriptionRegistry and RetainedValueStore for managing MQTT-style wildcard subscriptions and cached events, as well as client-side utilities for priority-aware event buffering and provenance tracking. Feedback is provided regarding the need for error handling in the client's event dispatch loop to prevent user-provided handlers from crashing the session message processing.
| if self._event_topic_filter_regex is not None and not self._event_topic_filter_regex.match(params.topic): | ||
| return | ||
|
| await self._event_handler(params) |
The _handle_event method awaits the registered _event_handler without a try-except block. If a user-provided handler raises an exception, it will propagate up and potentially disrupt the session's message processing loop. It might be safer to wrap this call in a try-except block and log any errors from the handler to ensure session stability.
| await self._event_handler(params) | |
| try: | |
| await self._event_handler(params) | |
| except Exception as e: | |
| self._logger.error(f"Error in event handler: {e}") |
Align attribute names: use 'trust' (not 'server_trust') in XML output. Add priority and correlation_id fields to envelope. Update from_event() to extract priority from requestedEffects. XML format now matches opencode's formatMcpEvents and typescript-sdk's ProvenanceEnvelope.
- Remove requestedEffects and correlationId from EventParams
- Remove timestamp from EventParams and RetainedEvent
- Add top-level priority field to EventParams
- Make EventParams.payload optional (None for signal events)
- Update EventQueue to read priority directly
- Add kind (required), suggestedHandle, and retained fields to
EventTopicDescriptor
- Add retained flag to RetainedEvent
- Drop EventEffect class (effects model replaced by priority +
suggestedHandle + client-side handle)
- Update ProvenanceEnvelope: drop correlation_id, fix XML attribute
order to spec (server, topic, priority, event_id, trust, source),
always emit trust attribute
- Rename {session_id} to {agent_id} in topic pattern docstrings and
docs (only where referring to application-level identity; MCP
transport session ids unchanged)
- Update emit_event server API: drop correlation_id/requested_effects/
timestamp kwargs, add priority kwarg
- Update tests and docs/events.md to match v2
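Given the attribute order the commit notes specify (server, topic, priority, event_id, trust, source), the envelope rendering might look roughly like this sketch; the `<event>` element name, the function signature, and the field names are assumptions rather than the SDK's actual API:

```python
from xml.sax.saxutils import quoteattr


def render_envelope(server: str, topic: str, priority: str,
                    event_id: str, trust: str, source: str,
                    payload: str) -> str:
    # Emit attributes in the fixed order the spec describes; trust is
    # always present so consumers never have to infer a default.
    attrs = [
        ("server", server), ("topic", topic), ("priority", priority),
        ("event_id", event_id), ("trust", trust), ("source", source),
    ]
    attr_str = " ".join(f"{k}={quoteattr(v)}" for k, v in attrs)
    return f"<event {attr_str}>{payload}</event>"
```

A stable attribute order matters here because downstream consumers (opencode's formatMcpEvents, the typescript-sdk) compare or template the serialized form rather than parsing it.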
Summary
Context
Reference implementation for MCP SEP: topic-based server-to-client events.
This is the protocol foundation layer that fastmcp and opencode build on.
Test plan
uv run pytest tests/test_event_types.py tests/test_subscription_registry.py tests/test_event_roundtrip.py -x -v