Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 64 additions & 8 deletions src/Appwrite/Event/Event.php
Original file line number Diff line number Diff line change
Expand Up @@ -518,10 +518,11 @@ public static function parseEventPattern(string $pattern): array
*
* @param string $pattern
* @param array $params
* @param ?Document $database
* @return array
* @throws \InvalidArgumentException
*/
public static function generateEvents(string $pattern, array $params = []): array
public static function generateEvents(string $pattern, array $params = [], ?Document $database = null): array
{
// $params = \array_filter($params, fn($param) => !\is_array($param));
$paramKeys = \array_keys($params);
Expand All @@ -530,6 +531,12 @@ public static function generateEvents(string $pattern, array $params = []): arra
$patterns = [];

$parsed = self::parseEventPattern($pattern);
// to switch the resource types from databases to the required prefix
// eg; all databases events get fired with databases. prefix which mainly depicts legacy type
// so a projection from databases to the actual prefix
if ((str_contains($pattern, 'databases.') && $database && $database->getAttribute('type') !== 'legacy')) {
$parsed = self::getDatabaseTypeEvents($database, $parsed);
}
$type = $parsed['type'];
$resource = $parsed['resource'];
$subType = $parsed['subType'];
Expand Down Expand Up @@ -630,8 +637,8 @@ public static function generateEvents(string $pattern, array $params = []): arra
$eventValues = \array_values($events);

/**
* Return a combined list of table, collection events.
*/
* Return a combined list of table, collection events and if tablesdb present then include all for backward compatibility
*/
return Event::mirrorCollectionEvents($pattern, $eventValues[0], $eventValues);
}

Expand Down Expand Up @@ -671,16 +678,41 @@ private static function mirrorCollectionEvents(string $pattern, string $firstEve
'attributes' => 'columns',
];

$databasesEventMap = [
'tablesdb' => 'databases',
'tables' => 'collections',
'rows' => 'documents',
'columns' => 'attributes'
];

if (
str_contains($pattern, 'databases.') &&
str_contains($firstEvent, 'collections')
(
str_contains($pattern, 'databases.') &&
str_contains($firstEvent, 'collections')
) ||
(
str_contains($firstEvent, 'tablesdb.')
)
) {
$pairedEvents = [];

foreach ($events as $event) {
$pairedEvents[] = $event;

if (str_contains($event, 'collections')) {
// tablesdb needs databases event with tables and collections
if (str_contains($event, 'tablesdb')) {
$databasesSideEvent = str_replace(
array_keys($databasesEventMap),
array_values($databasesEventMap),
$event
);
$pairedEvents[] = $databasesSideEvent;
$tableSideEvent = str_replace(
array_keys($tableEventMap),
array_values($tableEventMap),
$databasesSideEvent
);
$pairedEvents[] = $tableSideEvent;
} elseif (str_contains($event, 'collections')) {
$tableSideEvent = str_replace(
array_keys($tableEventMap),
array_values($tableEventMap),
Expand All @@ -692,8 +724,32 @@ private static function mirrorCollectionEvents(string $pattern, string $firstEve

$events = $pairedEvents;
}
// mirrored events can have duplicates in case of smaller events
return array_unique($events);
}
Comment thread
abnegate marked this conversation as resolved.

return $events;
/**
* Maps event terminology based on database type
*/
private static function getDatabaseTypeEvents(Document $database, array $event): array
{
$eventMap = [];
switch ($database->getAttribute('type')) {
case 'tablesdb':
$eventMap = [
'databases' => 'tablesdb',
'documents' => 'rows',
'collections' => 'tables',
'attributes' => 'columns',
];
break;
}
foreach ($event as $eventKey => $eventValue) {
if (isset($eventMap[$eventValue])) {
$event[$eventKey] = $eventMap[$eventValue];
}
}
return $event;
}

/**
Expand Down
2 changes: 1 addition & 1 deletion src/Appwrite/Event/Realtime.php
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public function trigger(): string|bool
return false;
}

$allEvents = Event::generateEvents($this->getEvent(), $this->getParams());
$allEvents = Event::generateEvents($this->getEvent(), $this->getParams(), $this->getContext('database'));

$payload = new Document($this->getPayload());

Expand Down
76 changes: 70 additions & 6 deletions src/Appwrite/Messaging/Adapter/Realtime.php
Original file line number Diff line number Diff line change
Expand Up @@ -491,6 +491,7 @@ public static function fromPayload(string $event, Document $payload, ?Document $
$roles = [Role::team(ID::custom($parts[1]))->toString()];
break;
case 'databases':
case 'tablesdb':
$resource = $parts[4] ?? '';
if (in_array($resource, ['columns', 'attributes', 'indexes'])) {
$channels[] = 'console';
Expand All @@ -508,14 +509,26 @@ public static function fromPayload(string $event, Document $payload, ?Document $
$tableId = $payload->getAttribute('$tableId', '');
$collectionId = $payload->getAttribute('$collectionId', '');
$resourceId = $tableId ?: $collectionId;
$channels = [];
// backward compat(tablesdb will have databases channels + tablesdb prefixed channels)
if ($parts[0] === 'databases' || $parts[0] === 'tablesdb') {
$prefix = 'databases';

$channels[] = 'rows';
$channels[] = 'databases.' . $database->getId() . '.tables.' . $resourceId . '.rows';
$channels[] = 'databases.' . $database->getId() . '.tables.' . $resourceId . '.rows.' . $payload->getId();
$channels = self::getDatabaseChannels('legacy', $database->getId(), $resourceId, $payload->getId(), $prefix);

$channels[] = 'documents';
$channels[] = 'databases.' . $database->getId() . '.collections.' . $resourceId . '.documents';
$channels[] = 'databases.' . $database->getId() . '.collections.' . $resourceId . '.documents.' . $payload->getId();
$channels = array_unique([
...$channels,
...self::getDatabaseChannels('tablesdb', $database->getId(), $resourceId, $payload->getId(), $prefix)
]);
}

// prefixed channels -> tablesdb
if ($parts[0] !== 'databases') {
$channels = array_unique([
...$channels,
...self::getDatabaseChannels($parts[0], $database->getId(), $resourceId, $payload->getId()),
]);
}

$roles = $collection->getAttribute('documentSecurity', false)
? \array_merge($collection->getRead(), $payload->getRead())
Expand Down Expand Up @@ -572,4 +585,55 @@ public static function fromPayload(string $event, Document $payload, ?Document $
'projectId' => $projectId
];
}

/**
* Generate realtime channels for database events
*
* @param string $type The database API type
* @param string $databaseId The database ID
* @param string $resourceId The collection/table ID
* @param string $payloadId The document/row ID
* @param string $prefixOverride Override the channel prefix when different API types share the same terminology but need different prefixes
* @return array Array of channel names
*/
private static function getDatabaseChannels(
string $type = 'databases',
string $databaseId = '',
string $resourceId = '',
string $payloadId = '',
string $prefixOverride = '',
): array {
$basePrefix = $prefixOverride ?: $type;

if (!$databaseId || !$resourceId || !$payloadId) {
return [];
}

$channels = [];

switch ($type) {
case 'legacy':
if (empty($prefixOverride)) {
$basePrefix = 'databases';
}
$channels[] = 'documents';
$channels[] = "{$basePrefix}.{$databaseId}.collections.{$resourceId}.documents";
$channels[] = "{$basePrefix}.{$databaseId}.collections.{$resourceId}.documents.{$payloadId}";
break;
case 'tablesdb':
$channels[] = 'rows';
$channels[] = "{$basePrefix}.{$databaseId}.tables.{$resourceId}.rows";
$channels[] = "{$basePrefix}.{$databaseId}.tables.{$resourceId}.rows.{$payloadId}";
break;

default:
$basePrefix = 'databases';
$channels[] = 'documents';
$channels[] = "{$basePrefix}.{$databaseId}.collections.{$resourceId}.documents";
$channels[] = "{$basePrefix}.{$databaseId}.collections.{$resourceId}.documents.{$payloadId}";
break;

}
return $channels;
}
}
120 changes: 120 additions & 0 deletions tests/e2e/Services/Realtime/RealtimeCustomClientTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -2559,6 +2559,126 @@ public function testChannelMemberships(): void
$client->close();
}

public function testChannelsTablesDB()
{
$user = $this->getUser();
$session = $user['session'] ?? '';
$projectId = $this->getProject()['$id'];

$database = $this->client->call(Client::METHOD_POST, '/tablesdb', array_merge([
'content-type' => 'application/json',
'x-appwrite-project' => $projectId,
'x-appwrite-key' => $this->getProject()['apiKey'],
], $this->getHeaders()), [
'databaseId' => ID::unique(),
'name' => 'TablesDB Realtime DB',
]);

$databaseId = $database['body']['$id'];

$table = $this->client->call(Client::METHOD_POST, '/tablesdb/' . $databaseId . '/tables', array_merge([
'content-type' => 'application/json',
'x-appwrite-project' => $projectId,
'x-appwrite-key' => $this->getProject()['apiKey'],
], $this->getHeaders()), [
'tableId' => ID::unique(),
'name' => 'Actors',
'permissions' => [
Permission::read(Role::any()),
Permission::create(Role::any()),
Permission::update(Role::any()),
Permission::delete(Role::any()),
],
]);

$tableId = $table['body']['$id'];

$column = $this->client->call(Client::METHOD_POST, '/tablesdb/' . $databaseId . '/tables/' . $tableId . '/columns/string', array_merge([
'content-type' => 'application/json',
'x-appwrite-project' => $projectId,
'x-appwrite-key' => $this->getProject()['apiKey'],
], $this->getHeaders()), [
'key' => 'name',
'size' => 256,
'required' => true,
]);

$this->assertEquals(202, $column['headers']['status-code']);

$this->assertEventually(function () use ($databaseId, $tableId) {
$column = $this->client->call(Client::METHOD_GET, '/tablesdb/' . $databaseId . '/tables/' . $tableId . '/columns/name', array_merge([
'content-type' => 'application/json',
'x-appwrite-project' => $this->getProject()['$id'],
'x-appwrite-key' => $this->getProject()['apiKey'],
], $this->getHeaders()));

$this->assertEquals(200, $column['headers']['status-code']);
$this->assertEquals('available', $column['body']['status']);
}, 120000, 500);

$client = $this->getWebsocket(['documents', 'collections'], [
'origin' => 'http://localhost',
'cookie' => 'a_session_' . $projectId . '=' . $session,
]);

$response = json_decode($client->receive(), true);

$this->assertArrayHasKey('type', $response);
$this->assertArrayHasKey('data', $response);
$this->assertEquals('connected', $response['type']);
$this->assertNotEmpty($response['data']);
$this->assertCount(2, $response['data']['channels']);
$this->assertContains('documents', $response['data']['channels']);

$rowId = ID::unique();

$row = $this->client->call(Client::METHOD_POST, '/tablesdb/' . $databaseId . '/tables/' . $tableId . '/rows', array_merge([
'content-type' => 'application/json',
'x-appwrite-project' => $projectId,
'x-appwrite-key' => $this->getProject()['apiKey'],
], $this->getHeaders()), [
'rowId' => $rowId,
'data' => [
'name' => 'Chris Evans',
],
'permissions' => [
Permission::read(Role::any()),
Permission::update(Role::any()),
Permission::delete(Role::any()),
],
]);

$this->assertEquals(201, $row['headers']['status-code']);

$response = json_decode($client->receive(), true);

$this->assertArrayHasKey('type', $response);
$this->assertArrayHasKey('data', $response);
$this->assertEquals('event', $response['type']);
$this->assertNotEmpty($response['data']);
$this->assertArrayHasKey('timestamp', $response['data']);

// Core channels for tablesdb row events
$this->assertContains('rows', $response['data']['channels']);

$this->assertContains("tablesdb.{$databaseId}.tables.{$tableId}.rows", $response['data']['channels']);
$this->assertContains("tablesdb.{$databaseId}.tables.{$tableId}.rows.{$rowId}", $response['data']['channels']);

// Collections-style compatibility channels
$this->assertContains('documents', $response['data']['channels']);
$this->assertContains("databases.{$databaseId}.tables.{$tableId}.rows", $response['data']['channels']);
$this->assertContains("databases.{$databaseId}.tables.{$tableId}.rows.{$rowId}", $response['data']['channels']);
$this->assertContains("databases.{$databaseId}.collections.{$tableId}.documents", $response['data']['channels']);
$this->assertContains("databases.{$databaseId}.collections.{$tableId}.documents.{$rowId}", $response['data']['channels']);

// Primary event should still be present
$this->assertContains("databases.{$databaseId}.tables.{$tableId}.rows.{$rowId}.create", $response['data']['events']);
$this->assertNotEmpty($response['data']['payload']);
$this->assertEquals('Chris Evans', $response['data']['payload']['name']);

$client->close();
}

public function testChannelDatabaseTransaction()
{
$user = $this->getUser();
Expand Down
Loading