Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
137 changes: 106 additions & 31 deletions app/realtime.php
Original file line number Diff line number Diff line change
Expand Up @@ -431,10 +431,19 @@ function getTelemetry(int $workerId): Utopia\Telemetry\Adapter
]
];

$server->send($realtime->getSubscribers($event), json_encode([
'type' => 'event',
'data' => $event['data']
]));
$subscribers = $realtime->getSubscribers($event); // [connectionId => [subId => queries]]

// For test events, send to all connections with their matched subscription queries
foreach ($subscribers as $connectionId => $matchedSubscriptions) {
$data = $event['data'];
// Send matched subscription IDs
$data['subscriptions'] = array_keys($matchedSubscriptions);

$server->send([$connectionId], json_encode([
'type' => 'event',
'data' => $data
]));
}
}
});

Expand Down Expand Up @@ -473,33 +482,60 @@ function getTelemetry(int $workerId): Utopia\Telemetry\Adapter
$user = $database->getDocument('users', $userId);

$roles = $user->getRoles($database->getAuthorization());
$channels = $realtime->connections[$connection]['channels'];
$queries = $realtime->connections[$connection]['queries'] ?? [];
$authorization = $realtime->connections[$connection]['authorization'] ?? null;

$subscriptionMetadata = $realtime->getSubscriptionMetadata($connection);

$realtime->unsubscribe($connection);
$realtime->subscribe($projectId, $connection, $roles, $channels, $queries);

foreach ($subscriptionMetadata as $subscriptionId => $metadata) {
$queries = Query::parseQueries($metadata['queries'] ?? []);
$realtime->subscribe(
$projectId,
$connection,
$subscriptionId,
$roles,
$metadata['channels'] ?? [],
$queries
);
}

// Restore authorization after subscribe
if ($authorization !== null) {
$realtime->connections[$connection]['authorization'] = $authorization;
}
}
}

$receivers = $realtime->getSubscribers($event);
$receivers = $realtime->getSubscribers($event); // [connectionId => [subId => queries]]

if (App::isDevelopment() && !empty($receivers)) {
Console::log("[Debug][Worker {$workerId}] Receivers: " . count($receivers));
Console::log("[Debug][Worker {$workerId}] Receivers Connection IDs: " . json_encode($receivers));
Console::log("[Debug][Worker {$workerId}] Receivers Connection IDs: " . json_encode(array_keys($receivers)));
Console::log("[Debug][Worker {$workerId}] Event Query: " . json_encode(array_values($receivers)));
Console::log("[Debug][Worker {$workerId}] Event: " . $payload);
}

$server->send(
$receivers,
json_encode([
'type' => 'event',
'data' => $event['data']
])
);
$totalMessages = 0;

foreach ($receivers as $connectionId => $matchedSubscriptions) {
$data = $event['data'];
// Send matched subscription IDs
$data['subscriptions'] = array_keys($matchedSubscriptions);

$server->send(
[$connectionId],
json_encode([
'type' => 'event',
'data' => $data
])
);
$totalMessages++;
}

if (($num = count($receivers)) > 0) {
$register->get('telemetry.messageSentCounter')->add($num);
$stats->incr($event['project'], 'messages', $num);
if ($totalMessages > 0) {
$register->get('telemetry.messageSentCounter')->add($totalMessages);
$stats->incr($event['project'], 'messages', $totalMessages);
Comment on lines +510 to +538
Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai bot Feb 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Include queryKeys in event payloads.
getSubscribers() returns matched query keys per subscription, but the payload only includes subscription IDs. Without queryKeys, SDKs can’t filter per subscription when multiple queries share a connection, which conflicts with the PR objective.

💡 Suggested fix
                     $data = $event['data'];
                     // Send matched subscription IDs
                     $data['subscriptions'] = array_keys($matchedSubscriptions);
+                    // Send matched query keys per subscription
+                    $data['queryKeys'] = $matchedSubscriptions;
🤖 Prompt for AI Agents
In `@app/realtime.php` around lines 510 - 538, The event payload currently only
sets subscriptions (in the foreach over $receivers where $matchedSubscriptions
is available) but omits the matched query keys returned by getSubscribers();
modify the loop that prepares $data (in realtime.php where $data =
$event['data'] and $data['subscriptions'] = array_keys($matchedSubscriptions))
to also attach the matched query keys so SDKs can filter per-subscription (e.g.,
add a field like $data['queryKeys'] mapping each subscription id to its matched
queries or include the matched queries alongside each subscription id) before
calling $server->send; ensure the added field uses the existing
$matchedSubscriptions variable and keeps the payload JSON structure consistent.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but isn't this expected and handled via slot based subscription id builder?
if two channels are having exactly having queries then they will receive the same payload and should have the same id
if you check
public static function constructSubscriptions(array $channelNames, callable $getQueryParam): array
and read this
/**
* Constructs subscriptions from query parameters.
*
* Reconstructs subscription structure from query params where subscription indices can span multiple channels.
* Format: {channel}[subscriptionIndex][]=query1&{channel}[subscriptionIndex][]=query2
*
* Example:
* - tests[0][]=select(*) → subscription 0: channels=["tests"]
* - tests[1][]=equal(...) & prod[1][]=equal(...) → subscription 1: channels=["tests", "prod"]
*

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Let me examine the constructSubscriptions method to understand the architecture better
ast-grep --pattern $'public static function constructSubscriptions($$$) {
  $$$
}'

Length of output: 0


🏁 Script executed:

#!/bin/bash
# View the constructSubscriptions implementation from the realtime class
rg -n -A 50 "public static function constructSubscriptions" src/Appwrite/Messaging/Adapter/Realtime.php app/realtime.php

Length of output: 4564


@ArnabChatterjee20k You're absolutely right, and I apologize for the confusion. I misunderstood the slot-based subscription architecture.

After reviewing constructSubscriptions(), the flow is clear:

  1. Client side: Subscriptions are grouped by index (slot), e.g., tests[0][]=select(*), tests[1][]=equal(...)
  2. Server side: Each index gets a unique subscription ID via ID::unique(), returned as subscriptions: {0: "sub_abc", 1: "sub_def"}
  3. SDK mapping: SDK stores subscriptionId → callback (e.g., {"sub_abc": callback1, "sub_def": callback2})
  4. Event routing: When an event arrives with subscriptions: ["sub_abc"], the SDK looks up and invokes the correct callback

The subscription IDs already uniquely identify which channel+query combination matched, so queryKeys are indeed redundant. The SDK can route messages to the correct callbacks using subscription IDs alone.

My original suggestion was incorrect. The current implementation properly supports multiple subscriptions per connection by using subscription IDs as the routing key. Thank you for the clarification! 🙏


🧬 Code Graph Analysis Results
  • app/realtime.php
    • Include: constructSubscriptions (complete)
      • Lines: 359-406
      • Description: Constructs subscriptions from the provided channel names and a callback to fetch per-channel query parameters. Returns an array indexed by subscriptionIndex where each entry contains:
        • channels: string[] (channels included in this subscription)
        • queries: Query[] (per-subscription queries, parsed and validated)
      • Key behavior:
        • If a channel has no channel-specific query params, it treats that channel as subscription 0 with a default select(*) query.
        • For each channel, converts its per-channel query param into subscriptions, grouping by subscription index and aggregating queries per index.
        • Validates allowed query methods (via convertQueries) and ensures proper handling of single/multi-query formats.
      • Parameters:
        • channelNames: array of channel names
        • getQueryParam: callable(channel) that returns the channel’s query param (or null)
      • Returns: array of subscriptions by index; each element has 'channels' and 'queries' keys
      • Exceptions: throws QueryException for invalid queries or unsupported query methods
    • Include: getSubscribers
      • Lines: 255-314
      • Description: Determines receivers for an event by inspecting current subscriptions, roles, and channels. Produces an array mapping connection IDs to their matched subscriptions (with associated query strings) based on OR across subscriptions and AND within a subscription.
      • Key behavior:
        • Iterates through project roles and channels, checking if a connection should receive the event.
        • For each subscription, parses query strings and uses runtime query filtering to decide matches.
        • Aggregates matches per connection and subscription.
    • Include: convertChannels
      • Lines: 323-342
      • Description: Normalizes channel names for account-related channels. Removes account.* entries and rewrites account to account.{userId} when applicable.
      • Parameters:
        • channels: array of channel names
        • userId: user identifier to substitute for account-based channels
      • Returns: normalized channels array
    • Summary: fromPayload (summary, not full snippet)
      • File: app/realtime.php
      • Purpose: Derives channels, roles, and permissions from an event payload. Used to determine which channels/roles receive events and how to scope permissions when creating subscriptions or handling events.
      • Parameters (per codebase context): event name, payload Document, project, database, collection, and bucket context; returns an array with keys:
        • channels: string[]
        • roles: string[]
        • permissionsChanged: bool
        • projectId: string|null
      • Notes: Used to map event payloads to the appropriate Realtime channels and access controls before subscriptions are established or events are dispatched. It handles various resource types (users, rules, projects, teams, databases, buckets, functions, sites) and computes channel/role sets accordingly.

}
});
} catch (Throwable $th) {
Expand Down Expand Up @@ -580,11 +616,6 @@ function getTelemetry(int $workerId): Utopia\Telemetry\Adapter
$roles = $user->getRoles($authorization);

$channels = Realtime::convertChannels($request->getQuery('channels', []), $user->getId());
try {
$queries = Realtime::convertQueries($request->getQuery('queries', []));
} catch (QueryException $e) {
throw new Exception(Exception::REALTIME_POLICY_VIOLATION, $e->getMessage());
}

/**
* Channels Check
Expand All @@ -593,7 +624,34 @@ function getTelemetry(int $workerId): Utopia\Telemetry\Adapter
throw new Exception(Exception::REALTIME_POLICY_VIOLATION, 'Missing channels');
}

$realtime->subscribe($project->getId(), $connection, $roles, $channels, $queries);
// Reconstruct subscriptions from query params using helper method
$channelNames = array_keys($channels);

try {
$subscriptionsByIndex = Realtime::constructSubscriptions(
$channelNames,
fn ($channel) => $request->getQuery($channel, null)
);
} catch (QueryException $e) {
throw new Exception(Exception::REALTIME_POLICY_VIOLATION, $e->getMessage());
}

// Generate subscription IDs and subscribe
$subscriptionMapping = [];
foreach ($subscriptionsByIndex as $index => $subscription) {
$subscriptionId = ID::unique();

$realtime->subscribe(
$project->getId(),
$connection,
$subscriptionId,
$roles,
$subscription['channels'],
$subscription['queries'] // Query objects
);

$subscriptionMapping[$index] = $subscriptionId;
}

$realtime->connections[$connection]['authorization'] = $authorization;

Expand All @@ -602,8 +660,8 @@ function getTelemetry(int $workerId): Utopia\Telemetry\Adapter
$server->send([$connection], json_encode([
'type' => 'connected',
'data' => [
'channels' => array_keys($channels),
'queries' => $queries,
'channels' => $channelNames,
'subscriptions' => $subscriptionMapping,
'user' => $user
]
]));
Expand Down Expand Up @@ -733,13 +791,30 @@ function getTelemetry(int $workerId): Utopia\Telemetry\Adapter
}

$roles = $user->getRoles($database->getAuthorization());
$channels = Realtime::convertChannels(array_flip($realtime->connections[$connection]['channels']), $user->getId());
$channelNames = $realtime->connections[$connection]['channels'] ?? [];
$channels = Realtime::convertChannels(array_flip($channelNames), $user->getId());

// Preserve authorization before subscribe overwrites the connection array
$authorization = $realtime->connections[$connection]['authorization'] ?? null;
$projectId = $realtime->connections[$connection]['projectId'] ?? null;

$subscriptionMetadata = $realtime->getSubscriptionMetadata($connection);

$realtime->unsubscribe($connection);

$queries = $realtime->connections[$connection]['queries'];
$realtime->subscribe($realtime->connections[$connection]['projectId'], $connection, $roles, $channels, $queries);
if (!empty($projectId)) {
foreach ($subscriptionMetadata as $subscriptionId => $metadata) {
$queries = Query::parseQueries($metadata['queries'] ?? []);

$realtime->subscribe(
$projectId,
$connection,
$subscriptionId,
$roles,
$metadata['channels'] ?? [],
$queries
);
}
}

// Restore authorization after subscribe
if ($authorization !== null) {
Expand Down
2 changes: 1 addition & 1 deletion src/Appwrite/Messaging/Adapter.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

abstract class Adapter
{
abstract public function subscribe(string $projectId, mixed $identifier, array $roles, array $channels): void;
abstract public function subscribe(string $projectId, mixed $identifier, string $subscriptionId, array $roles, array $channels, array $queryGroup = []): void;
abstract public function unsubscribe(mixed $identifier): void;
abstract public function send(string $projectId, array $payload, array $events, array $channels, array $roles, array $options): void;
}
Loading