Added query per subscription and queryKeys along with the messages#11202
Added query per subscription and queryKeys along with the messages#11202
Conversation
📝 WalkthroughWalkthroughSubscriptions now store per-connection query-key maps per channel (queries normalized to query keys; empty queries use the empty-string key). Connection records no longer store raw queries. The messaging adapter’s subscribe/unsubscribe/hasSubscriber/getSubscribers were updated to use the new query-key mapping and return a mapping of receiverId → matched queryKeys. Realtime transport switched from bulk sends to per-connection sends that include the matched subscription IDs/queryKeys and increment a totalMessages telemetry counter. Realtime::subscribe signature was changed to accept a Estimated code review effort🎯 4 (Complex) | ⏱️ ~45 minutes 🚥 Pre-merge checks | ✅ 2 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Security Scan Results for PRDocker Image Scan Results
Source Code Scan Results🎉 No vulnerabilities found! |
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@src/Appwrite/Messaging/Adapter/Realtime.php`:
- Around line 96-100: The connections entry no longer stores the 'queries'
field, causing later code (and app/realtime.php access like
$realtime->connections[$connection]['queries']) to fail and losing query
persistence; update the code that sets $this->connections[$identifier] (the
array assignment in the Realtime adapter) to include a 'queries' key populated
from the subscribe() parameter or default to an empty array, and ensure all
reads of $this->connections[...]['queries'] use null-safe access (e.g., treat
missing as []) so connections are initialized with persistent queries and future
accesses won't throw.
🧹 Nitpick comments (3)
src/Appwrite/Messaging/Adapter/Realtime.php (2)
87-94: Unused variable$listflagged by static analysis.The variable
$listfrom the foreach iteration is not used within the loop body. Consider usingarray_keys()or suppressing with underscore convention if your codebase supports it.♻️ Suggested fix
- foreach ($channels as $channel => $list) { + foreach (array_keys($channels) as $channel) { if (!isset($this->subscriptions[$projectId][$role][$channel][$identifier])) { $this->subscriptions[$projectId][$role][$channel][$identifier] = []; } foreach ($queryKeys as $queryKey) { $this->subscriptions[$projectId][$role][$channel][$identifier][$queryKey] = true; } }
115-122: Unused variable$listflagged by static analysis.Same issue as in
subscribe()- the$listvariable is unused.♻️ Suggested fix
- foreach ($channels as $channel => $list) { + foreach (array_keys($channels) as $channel) { unset($this->subscriptions[$projectId][$role][$channel][$connection]); // dropping connection will drop the queries as well if (empty($this->subscriptions[$projectId][$role][$channel])) { unset($this->subscriptions[$projectId][$role][$channel]); // Remove channel when no connections } }tests/e2e/Services/Realtime/RealtimeCustomClientQueryTest.php (1)
1588-1603: Consider consolidating attribute creation and sleep calls.The consecutive
sleep(2)calls after each attribute creation (lines 1590 and 1602) could be consolidated. Create both attributes first, then sleep once.♻️ Suggested optimization
// Attributes used by queries $this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections/' . $collectionId . '/attributes/string', array_merge([ 'content-type' => 'application/json', 'x-appwrite-project' => $projectId, 'x-appwrite-key' => $this->getProject()['apiKey'] ]), [ 'key' => 'status', 'size' => 256, 'required' => false, ]); - sleep(2); - $this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections/' . $collectionId . '/attributes/string', array_merge([ 'content-type' => 'application/json', 'x-appwrite-project' => $projectId, 'x-appwrite-key' => $this->getProject()['apiKey'] ]), [ 'key' => 'category', 'size' => 256, 'required' => false, ]); - sleep(2); + sleep(3); // Wait for both attributes to be ready
✨ Benchmark results
⚡ Benchmark Comparison
|
There was a problem hiding this comment.
Actionable comments posted: 0
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
app/realtime.php (1)
743-754: Preserve query filters when re-subscribing after authentication.The new subscribe call omits queries, so any existing query filters are dropped after authentication, widening subscriptions (and potentially leaking data). Capture the existing query keys for this connection and re-parse them before re-subscribing. This same preservation should also be applied to the permissionsChanged re-subscribe path to avoid silently widening subscriptions there.
🛠️ Proposed fix to preserve query filters
- $realtime->subscribe($realtime->connections[$connection]['projectId'], $connection, $roles, $channels); + $projectId = $realtime->connections[$connection]['projectId']; + $queryKeys = []; + foreach ($realtime->subscriptions[$projectId] ?? [] as $roleSubscriptions) { + foreach (array_keys($channels) as $channel) { + if (isset($roleSubscriptions[$channel][$connection])) { + $queryKeys = array_merge($queryKeys, array_keys($roleSubscriptions[$channel][$connection])); + } + } + } + $queryKeys = array_values(array_unique($queryKeys)); + $queries = \in_array('', $queryKeys, true) ? [] : Query::parseQueries($queryKeys); + + $realtime->subscribe($projectId, $connection, $roles, $channels, $queries);
🧹 Nitpick comments (1)
app/realtime.php (1)
492-510: Count only successful sends in telemetry.If a connection is closed or the send fails, the current logic still increments message counters. Consider guarding the increment with the send result so telemetry and stats reflect actual deliveries.
📊 Suggested adjustment
- $server->send( + if ($server->send( [$connectionId], json_encode([ 'type' => 'event', 'data' => $data ]) - ); - $totalMessages++; + )) { + $totalMessages++; + }
There was a problem hiding this comment.
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
tests/e2e/Services/Realtime/RealtimeCustomClientQueryTest.php (1)
1410-1422: Duplicate document ID may cause test failure.Line 1415 attempts to create a document with
$targetDocId, but this ID was already used to create a document at Line 1368. Depending on the API behavior, this could either fail with a duplicate ID error or update the existing document, neither of which matches the test intent of creating a new document that matches the ID query but not the status query.🐛 Proposed fix: use a unique ID
+ $targetDocId2 = ID::unique(); // Create document with matching ID but wrong status - should NOT receive event (only one query matches) $this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections/' . $collectionId . '/documents', array_merge([ 'content-type' => 'application/json', 'x-appwrite-project' => $projectId, ], $this->getHeaders()), [ - 'documentId' => $targetDocId, + 'documentId' => $targetDocId2, 'data' => [ 'status' => 'inactive' ],Note: If the intent is to test updating an existing document, consider using a PATCH request instead and updating the test comment accordingly.
app/realtime.php (1)
476-480: Queries lost on permission change re-subscription.When permissions change, the code attempts to retrieve
queriesfrom$realtime->connections[$connection]['queries'](line 477). However, thesubscribe()method insrc/Appwrite/Messaging/Adapter/Realtime.phponly storesprojectId,roles, andchannelsin the$this->connectionsarray (lines 96-100)—it never persists thequeriesfield. This means the?? []fallback always applies, causing users to lose their subscription queries after a permission change event. Thequeriesare processed and stored in the internal$this->subscriptionsstructure during subscribe, but are not available in$this->connectionsfor later retrieval.
🤖 Fix all issues with AI agents
In `@app/realtime.php`:
- Line 489: The Console::log call is concatenating an array
(array_values($receivers)) into a string, causing an "Array-to-string
conversion" warning and incorrect output; update the debug log in the
Console::log invocation that references $receivers (the "[Debug][Worker
{$workerId}] QueryKeys" line) to convert the array to a string (for example
using implode(', ', array_values($receivers)) or
json_encode(array_values($receivers))) so the actual values are logged instead
of "Array".
🧹 Nitpick comments (1)
tests/e2e/Services/Realtime/RealtimeCustomClientQueryTest.php (1)
1431-1467: Clarify test logic with comments or restructure.The test creates a document with
status => 'active'(Line 1439) but then immediately creates another document withstatus => 'inactive'(Lines 1449-1461), and expects to receive the first document's event only at Line 1463.The inline comments explain this behavior, but the assertion at Lines 1464-1467 validates the queryKeys for the active document, not the inactive one. This is correct but could be clearer. Consider adding an explicit assertion for the document ID to confirm which event was received.
📝 Suggested clarification
$data = json_decode($client->receive(), true); + // Verify we received the active document event (from lines 1431-1444) + $this->assertEquals('active', $data['data']['payload']['status']); $this->assertIsArray($data['data']['queryKeys']); $this->assertEquals(1, count($data['data']['queryKeys'])); $this->assertNotContains(Query::equal('status', ['inactive'])->toString(), $data['data']['queryKeys']); $this->assertContains(Query::equal('status', ['active'])->toString(), $data['data']['queryKeys']);
35b0385 to
3c2f0cc
Compare
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Fix all issues with AI agents
In `@app/realtime.php`:
- Around line 510-538: The event payload currently only sets subscriptions (in
the foreach over $receivers where $matchedSubscriptions is available) but omits
the matched query keys returned by getSubscribers(); modify the loop that
prepares $data (in realtime.php where $data = $event['data'] and
$data['subscriptions'] = array_keys($matchedSubscriptions)) to also attach the
matched query keys so SDKs can filter per-subscription (e.g., add a field like
$data['queryKeys'] mapping each subscription id to its matched queries or
include the matched queries alongside each subscription id) before calling
$server->send; ensure the added field uses the existing $matchedSubscriptions
variable and keeps the payload JSON structure consistent.
| $receivers = $realtime->getSubscribers($event); // [connectionId => [subId => queries]] | ||
|
|
||
| if (App::isDevelopment() && !empty($receivers)) { | ||
| Console::log("[Debug][Worker {$workerId}] Receivers: " . count($receivers)); | ||
| Console::log("[Debug][Worker {$workerId}] Receivers Connection IDs: " . json_encode($receivers)); | ||
| Console::log("[Debug][Worker {$workerId}] Receivers Connection IDs: " . json_encode(array_keys($receivers))); | ||
| Console::log("[Debug][Worker {$workerId}] Event Query: " . json_encode(array_values($receivers))); | ||
| Console::log("[Debug][Worker {$workerId}] Event: " . $payload); | ||
| } | ||
|
|
||
| $server->send( | ||
| $receivers, | ||
| json_encode([ | ||
| 'type' => 'event', | ||
| 'data' => $event['data'] | ||
| ]) | ||
| ); | ||
| $totalMessages = 0; | ||
|
|
||
| foreach ($receivers as $connectionId => $matchedSubscriptions) { | ||
| $data = $event['data']; | ||
| // Send matched subscription IDs | ||
| $data['subscriptions'] = array_keys($matchedSubscriptions); | ||
|
|
||
| $server->send( | ||
| [$connectionId], | ||
| json_encode([ | ||
| 'type' => 'event', | ||
| 'data' => $data | ||
| ]) | ||
| ); | ||
| $totalMessages++; | ||
| } | ||
|
|
||
| if (($num = count($receivers)) > 0) { | ||
| $register->get('telemetry.messageSentCounter')->add($num); | ||
| $stats->incr($event['project'], 'messages', $num); | ||
| if ($totalMessages > 0) { | ||
| $register->get('telemetry.messageSentCounter')->add($totalMessages); | ||
| $stats->incr($event['project'], 'messages', $totalMessages); |
There was a problem hiding this comment.
Include queryKeys in event payloads.
getSubscribers() returns matched query keys per subscription, but the payload only includes subscription IDs. Without queryKeys, SDKs can’t filter per subscription when multiple queries share a connection, which conflicts with the PR objective.
💡 Suggested fix
$data = $event['data'];
// Send matched subscription IDs
$data['subscriptions'] = array_keys($matchedSubscriptions);
+ // Send matched query keys per subscription
+ $data['queryKeys'] = $matchedSubscriptions;🤖 Prompt for AI Agents
In `@app/realtime.php` around lines 510 - 538, The event payload currently only
sets subscriptions (in the foreach over $receivers where $matchedSubscriptions
is available) but omits the matched query keys returned by getSubscribers();
modify the loop that prepares $data (in realtime.php where $data =
$event['data'] and $data['subscriptions'] = array_keys($matchedSubscriptions))
to also attach the matched query keys so SDKs can filter per-subscription (e.g.,
add a field like $data['queryKeys'] mapping each subscription id to its matched
queries or include the matched queries alongside each subscription id) before
calling $server->send; ensure the added field uses the existing
$matchedSubscriptions variable and keeps the payload JSON structure consistent.
There was a problem hiding this comment.
but isn't this expected and handled via slot based subscription id builder?
if two channels are having exactly having queries then they will receive the same payload and should have the same id
if you check
public static function constructSubscriptions(array $channelNames, callable $getQueryParam): array
and read this
/**
* Constructs subscriptions from query parameters.
*
* Reconstructs subscription structure from query params where subscription indices can span multiple channels.
* Format: {channel}[subscriptionIndex][]=query1&{channel}[subscriptionIndex][]=query2
*
* Example:
* - tests[0][]=select(*) → subscription 0: channels=["tests"]
* - tests[1][]=equal(...) & prod[1][]=equal(...) → subscription 1: channels=["tests", "prod"]
*
There was a problem hiding this comment.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Let me examine the constructSubscriptions method to understand the architecture better
ast-grep --pattern $'public static function constructSubscriptions($$$) {
$$$
}'
Length of output: 0
🏁 Script executed:
#!/bin/bash
# View the constructSubscriptions implementation from the realtime class
rg -n -A 50 "public static function constructSubscriptions" src/Appwrite/Messaging/Adapter/Realtime.php app/realtime.php
Length of output: 4564
@ArnabChatterjee20k You're absolutely right, and I apologize for the confusion. I misunderstood the slot-based subscription architecture.
After reviewing constructSubscriptions(), the flow is clear:
- Client side: Subscriptions are grouped by index (slot), e.g.,
tests[0][]=select(*),tests[1][]=equal(...) - Server side: Each index gets a unique subscription ID via
ID::unique(), returned assubscriptions: {0: "sub_abc", 1: "sub_def"} - SDK mapping: SDK stores
subscriptionId → callback(e.g.,{"sub_abc": callback1, "sub_def": callback2}) - Event routing: When an event arrives with
subscriptions: ["sub_abc"], the SDK looks up and invokes the correct callback
The subscription IDs already uniquely identify which channel+query combination matched, so queryKeys are indeed redundant. The SDK can route messages to the correct callbacks using subscription IDs alone.
My original suggestion was incorrect. The current implementation properly supports multiple subscriptions per connection by using subscription IDs as the routing key. Thank you for the clarification! 🙏
🧬 Code Graph Analysis Results
- app/realtime.php
- Include: constructSubscriptions (complete)
- Lines: 359-406
- Description: Constructs subscriptions from the provided channel names and a callback to fetch per-channel query parameters. Returns an array indexed by subscriptionIndex where each entry contains:
- channels: string[] (channels included in this subscription)
- queries: Query[] (per-subscription queries, parsed and validated)
- Key behavior:
- If a channel has no channel-specific query params, it treats that channel as subscription 0 with a default select(*) query.
- For each channel, converts its per-channel query param into subscriptions, grouping by subscription index and aggregating queries per index.
- Validates allowed query methods (via convertQueries) and ensures proper handling of single/multi-query formats.
- Parameters:
- channelNames: array of channel names
- getQueryParam: callable(channel) that returns the channel’s query param (or null)
- Returns: array of subscriptions by index; each element has 'channels' and 'queries' keys
- Exceptions: throws QueryException for invalid queries or unsupported query methods
- Include: getSubscribers
- Lines: 255-314
- Description: Determines receivers for an event by inspecting current subscriptions, roles, and channels. Produces an array mapping connection IDs to their matched subscriptions (with associated query strings) based on OR across subscriptions and AND within a subscription.
- Key behavior:
- Iterates through project roles and channels, checking if a connection should receive the event.
- For each subscription, parses query strings and uses runtime query filtering to decide matches.
- Aggregates matches per connection and subscription.
- Include: convertChannels
- Lines: 323-342
- Description: Normalizes channel names for account-related channels. Removes account.* entries and rewrites account to account.{userId} when applicable.
- Parameters:
- channels: array of channel names
- userId: user identifier to substitute for account-based channels
- Returns: normalized channels array
- Summary: fromPayload (summary, not full snippet)
- File: app/realtime.php
- Purpose: Derives channels, roles, and permissions from an event payload. Used to determine which channels/roles receive events and how to scope permissions when creating subscriptions or handling events.
- Parameters (per codebase context): event name, payload Document, project, database, collection, and bucket context; returns an array with keys:
- channels: string[]
- roles: string[]
- permissionsChanged: bool
- projectId: string|null
- Notes: Used to map event payloads to the appropriate Realtime channels and access controls before subscriptions are established or events are dispatched. It handles various resource types (users, rules, projects, teams, databases, buckets, functions, sites) and computes channel/role sets accordingly.
- Include: constructSubscriptions (complete)
What does this PR do?
Issue
Currently , query getting set at the connection level during subscriptions resulting in overwriting of the queries and multiple subscriptions creating unexpected results
Solution
Setting at the subscription level with querykeys resulting query per subscription
Note
Usage
Sdk subscription methods should use the query keys to filter out what message to pass to the onMessage callback of the subscription
subscription1 receives all the query keys with ''
Test Plan
(Write your test plan here. If you changed any code, please provide us with clear instructions on how you verified your changes work. Screenshots may also be helpful.)
Related PRs and Issues
Checklist