Skip to content

Commit a804cba

Browse files
loks0nclaude
andcommitted
Refactor usage metrics to stateless publisher pattern
Co-Authored-By: Claude Opus 4.6 <[email protected]>
1 parent d8f7337 commit a804cba

50 files changed

Lines changed: 852 additions & 729 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

CONTRIBUTING.md

Lines changed: 29 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -409,14 +409,16 @@ Next follow the appropriate steps below depending on whether you're adding the m
409409

410410
**API**
411411

412-
In file `app/controllers/shared/api.php` On the database listener, add to an existing or create a new switch case. Add a call to the usage worker with your new metric const like so:
412+
In file `app/controllers/shared/api.php` On the database listener, add to an existing or create a new switch case. Accumulate metrics in the usage context like so:
413413

414414
```php
415415
case $document->getCollection() === 'teams':
416-
$queueForStatsUsage
417-
->addMetric(METRIC_TEAMS, $value); // per project
416+
$usage->addMetric(METRIC_TEAMS, $value); // per project
418417
break;
419418
```
419+
420+
The metrics will be automatically published by the shutdown hook at the end of the request. There is no need to manually trigger or publish.
421+
420422
There are cases when you need to handle metric that has a parent entity, like buckets.
421423
Files are linked to a parent bucket, you should verify you remove the files stats when you delete a bucket.
422424

@@ -425,14 +427,13 @@ In that case you need also to handle children removal using addReduce() method c
425427
```php
426428

427429
case $document->getCollection() === 'buckets': //buckets
428-
$queueForStatsUsage
429-
->addMetric(METRIC_BUCKETS, $value); // per project
430+
$usage->addMetric(METRIC_BUCKETS, $value); // per project
430431
if ($event === Database::EVENT_DOCUMENT_DELETE) {
431-
$queueForStatsUsage
432+
$usage
432433
->addReduce($document);
433434
}
434435
break;
435-
436+
436437
```
437438

438439
In addition, you will also need to add some logic to the `reduce()` method of the Usage worker located in `/src/Appwrite/Platform/Workers/Usage.php`, like so:
@@ -460,8 +461,12 @@ case $document->getCollection() === 'buckets':
460461

461462
**Background worker**
462463

463-
You need to inject the usage queue in the desired worker on the constructor method
464+
You need to inject the usage context and publisher in the desired worker on the constructor method
464465
```php
466+
use Appwrite\Usage\Context;
467+
use Appwrite\Event\Publisher\Usage as UsagePublisher;
468+
use Appwrite\Event\Message\Usage as UsageMessage;
469+
465470
/**
466471
* @throws Exception
467472
*/
@@ -474,24 +479,32 @@ public function __construct()
474479
->inject('dbForProject')
475480
->inject('queueForFunctions')
476481
->inject('queueForEvents')
477-
->inject('queueForStatsUsage')
482+
->inject('usage')
483+
->inject('publisherForUsage')
478484
->inject('log')
479-
->callback(fn (Message $message, Database $dbForProject, Func $queueForFunctions, Event $queueForEvents, StatsUsage $queueForStatsUsage, Log $log) => $this->action($message, $dbForProject, $queueForFunctions, $queueForEvents, $queueForStatsUsage, $log));
485+
->callback(fn (Message $message, Database $dbForProject, Func $queueForFunctions, Event $queueForEvents, Context $usage, UsagePublisher $publisherForUsage, Log $log) => $this->action($message, $dbForProject, $queueForFunctions, $queueForEvents, $usage, $publisherForUsage, $log));
480486
}
481487
```
482488

483-
and then trigger the queue with the new metric like so:
489+
and then accumulate metrics, create a message, and publish like so:
484490

485491
```php
486-
$queueForStatsUsage
492+
$usage
487493
->addMetric(METRIC_BUILDS, 1)
488494
->addMetric(METRIC_BUILDS_STORAGE, $build->getAttribute('size', 0))
489495
->addMetric(METRIC_BUILDS_COMPUTE, (int)$build->getAttribute('duration', 0) * 1000)
490-
->addMetric(str_replace('{functionInternalId}', $function->getSequence(), METRIC_FUNCTION_ID_BUILDS), 1)
496+
->addMetric(str_replace('{functionInternalId}', $function->getSequence(), METRIC_FUNCTION_ID_BUILDS), 1)
491497
->addMetric(str_replace('{functionInternalId}', $function->getSequence(), METRIC_FUNCTION_ID_BUILDS_STORAGE), $build->getAttribute('size', 0))
492-
->addMetric(str_replace('{functionInternalId}', $function->getSequence(), METRIC_FUNCTION_ID_BUILDS_COMPUTE), (int)$build->getAttribute('duration', 0) * 1000)
493-
->setProject($project)
494-
->trigger();
498+
->addMetric(str_replace('{functionInternalId}', $function->getSequence(), METRIC_FUNCTION_ID_BUILDS_COMPUTE), (int)$build->getAttribute('duration', 0) * 1000);
499+
500+
// Publish the accumulated metrics (workers don't have shutdown hooks)
501+
$message = new UsageMessage(
502+
project: $project,
503+
metrics: $usage->getMetrics(),
504+
reduce: $usage->getReduce()
505+
);
506+
$publisherForUsage->enqueue($message);
507+
$usage->reset();
495508
```
496509

497510

app/cli.php

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,13 @@
44

55
use Appwrite\Event\Certificate;
66
use Appwrite\Event\Delete;
7+
use Appwrite\Event\Event;
78
use Appwrite\Event\Func;
9+
use Appwrite\Event\Publisher\Usage as UsagePublisher;
810
use Appwrite\Event\StatsResources;
9-
use Appwrite\Event\StatsUsage;
1011
use Appwrite\Platform\Appwrite;
1112
use Appwrite\Runtimes\Runtimes;
13+
use Appwrite\Usage\Context as UsageContext;
1214
use Appwrite\Utopia\Database\Documents\User;
1315
use Executor\Executor;
1416
use Swoole\Runtime;
@@ -29,6 +31,7 @@
2931
use Utopia\Pools\Group;
3032
use Utopia\Queue\Broker\Pool as BrokerPool;
3133
use Utopia\Queue\Publisher;
34+
use Utopia\Queue\Queue;
3235
use Utopia\Registry\Registry;
3336
use Utopia\System\System;
3437
use Utopia\Telemetry\Adapter\None as NoTelemetry;
@@ -47,7 +50,7 @@
4750
$args = $platform->getEnv('argv');
4851

4952
\array_shift($args);
50-
if (!isset($args[0])) {
53+
if (! isset($args[0])) {
5154
Console::error('Missing task name');
5255
Console::exit(1);
5356
}
@@ -85,6 +88,7 @@
8588
$setResource('authorization', function () {
8689
$authorization = new Authorization();
8790
$authorization->disable();
91+
8892
return $authorization;
8993
}, []);
9094

@@ -113,7 +117,7 @@
113117
$collections = Config::getParam('collections', [])['console'];
114118
$last = \array_key_last($collections);
115119

116-
if (!($dbForPlatform->exists($dbForPlatform->getDatabase(), $last))) { /** TODO cache ready variable using registry */
120+
if (! ($dbForPlatform->exists($dbForPlatform->getDatabase(), $last))) { /** TODO cache ready variable using registry */
117121
throw new Exception('Tables not ready yet.');
118122
}
119123

@@ -122,10 +126,10 @@
122126
Console::warning($err->getMessage());
123127
sleep($sleep);
124128
}
125-
} while ($attempts < $maxAttempts && !$ready);
129+
} while ($attempts < $maxAttempts && ! $ready);
126130

127-
if (!$ready) {
128-
throw new Exception("Console is not ready yet. Please try again later.");
131+
if (! $ready) {
132+
throw new Exception('Console is not ready yet. Please try again later.');
129133
}
130134

131135
return $dbForPlatform;
@@ -163,7 +167,7 @@
163167
if (\in_array($dsn->getHost(), $sharedTables)) {
164168
$database
165169
->setSharedTables(true)
166-
->setTenant((int)$project->getSequence())
170+
->setTenant((int) $project->getSequence())
167171
->setNamespace($dsn->getParam('namespace'));
168172
} else {
169173
$database
@@ -184,7 +188,7 @@
184188
if (\in_array($dsn->getHost(), $sharedTables)) {
185189
$database
186190
->setSharedTables(true)
187-
->setTenant((int)$project->getSequence())
191+
->setTenant((int) $project->getSequence())
188192
->setNamespace($dsn->getParam('namespace'));
189193
} else {
190194
$database
@@ -207,8 +211,9 @@
207211
$database = null;
208212

209213
return function (?Document $project = null) use ($pools, $cache, $database, $authorization) {
210-
if ($database !== null && $project !== null && !$project->isEmpty() && $project->getId() !== 'console') {
211-
$database->setTenant((int)$project->getSequence());
214+
if ($database !== null && $project !== null && ! $project->isEmpty() && $project->getId() !== 'console') {
215+
$database->setTenant((int) $project->getSequence());
216+
212217
return $database;
213218
}
214219

@@ -224,8 +229,8 @@
224229
->setMaxQueryValues(APP_DATABASE_QUERY_MAX_VALUES);
225230

226231
// set tenant
227-
if ($project !== null && !$project->isEmpty() && $project->getId() !== 'console') {
228-
$database->setTenant((int)$project->getSequence());
232+
if ($project !== null && ! $project->isEmpty() && $project->getId() !== 'console') {
233+
$database->setTenant((int) $project->getSequence());
229234
}
230235

231236
return $database;
@@ -243,15 +248,16 @@
243248
$setResource('publisherMigrations', function (BrokerPool $publisher) {
244249
return $publisher;
245250
}, ['publisher']);
246-
$setResource('publisherStatsUsage', function (BrokerPool $publisher) {
247-
return $publisher;
248-
}, ['publisher']);
249251
$setResource('publisherMessaging', function (BrokerPool $publisher) {
250252
return $publisher;
251253
}, ['publisher']);
252-
$setResource('queueForStatsUsage', function (Publisher $publisher) {
253-
return new StatsUsage($publisher);
254-
}, ['publisher']);
254+
$setResource('usage', function () {
255+
return new UsageContext();
256+
}, []);
257+
$setResource('publisherForUsage', fn (Publisher $publisher) => new UsagePublisher(
258+
$publisher,
259+
new Queue(System::getEnv('_APP_STATS_USAGE_QUEUE_NAME', Event::STATS_USAGE_QUEUE_NAME))
260+
), ['publisher']);
255261
$setResource('queueForStatsResources', function (Publisher $publisher) {
256262
return new StatsResources($publisher);
257263
}, ['publisher']);

app/controllers/api/account.php

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
use Appwrite\Event\Event;
1515
use Appwrite\Event\Mail;
1616
use Appwrite\Event\Messaging;
17-
use Appwrite\Event\StatsUsage;
1817
use Appwrite\Extend\Exception;
1918
use Appwrite\Hooks\Hooks;
2019
use Appwrite\Network\Validator\Email as EmailValidator;
@@ -28,6 +27,7 @@
2827
use Appwrite\SDK\Response as SDKResponse;
2928
use Appwrite\Template\Template;
3029
use Appwrite\URL\URL as URLParser;
30+
use Appwrite\Usage\Context;
3131
use Appwrite\Utopia\Database\Documents\User;
3232
use Appwrite\Utopia\Database\Validator\CustomId;
3333
use Appwrite\Utopia\Database\Validator\Queries\Identities;
@@ -2801,12 +2801,12 @@ function sendSessionAlert(Locale $locale, Document $user, Document $project, arr
28012801
->inject('queueForMessaging')
28022802
->inject('locale')
28032803
->inject('timelimit')
2804-
->inject('queueForStatsUsage')
2804+
->inject('usage')
28052805
->inject('plan')
28062806
->inject('store')
28072807
->inject('proofForCode')
28082808
->inject('authorization')
2809-
->action(function (string $userId, string $phone, Request $request, Response $response, User $user, Document $project, array $platform, Database $dbForProject, Event $queueForEvents, Messaging $queueForMessaging, Locale $locale, callable $timelimit, StatsUsage $queueForStatsUsage, array $plan, Store $store, ProofsCode $proofForCode, Authorization $authorization) {
2809+
->action(function (string $userId, string $phone, Request $request, Response $response, User $user, Document $project, array $platform, Database $dbForProject, Event $queueForEvents, Messaging $queueForMessaging, Locale $locale, callable $timelimit, Context $usage, array $plan, Store $store, ProofsCode $proofForCode, Authorization $authorization) {
28102810
if (empty(System::getEnv('_APP_SMS_PROVIDER'))) {
28112811
throw new Exception(Exception::GENERAL_PHONE_DISABLED, 'Phone provider not configured');
28122812
}
@@ -2955,16 +2955,12 @@ function sendSessionAlert(Locale $locale, Document $user, Document $project, arr
29552955
$countryCode = $helper->parse($phone)->getCountryCode();
29562956

29572957
if (!empty($countryCode)) {
2958-
$queueForStatsUsage
2959-
->addMetric(str_replace('{countryCode}', $countryCode, METRIC_AUTH_METHOD_PHONE_COUNTRY_CODE), 1);
2958+
$usage->addMetric(str_replace('{countryCode}', $countryCode, METRIC_AUTH_METHOD_PHONE_COUNTRY_CODE), 1);
29602959
}
29612960
} catch (NumberParseException $e) {
29622961
// Ignore invalid phone number for country code stats
29632962
}
2964-
$queueForStatsUsage
2965-
->addMetric(METRIC_AUTH_METHOD_PHONE, 1)
2966-
->setProject($project)
2967-
->trigger();
2963+
$usage->addMetric(METRIC_AUTH_METHOD_PHONE, 1);
29682964
}
29692965

29702966
$token->setAttribute('secret', $secret);
@@ -4199,11 +4195,11 @@ function sendSessionAlert(Locale $locale, Document $user, Document $project, arr
41994195
->inject('project')
42004196
->inject('locale')
42014197
->inject('timelimit')
4202-
->inject('queueForStatsUsage')
4198+
->inject('usage')
42034199
->inject('plan')
42044200
->inject('proofForCode')
42054201
->inject('authorization')
4206-
->action(function (Request $request, Response $response, User $user, Database $dbForProject, Event $queueForEvents, Messaging $queueForMessaging, Document $project, Locale $locale, callable $timelimit, StatsUsage $queueForStatsUsage, array $plan, ProofsCode $proofForCode, Authorization $authorization) {
4202+
->action(function (Request $request, Response $response, User $user, Database $dbForProject, Event $queueForEvents, Messaging $queueForMessaging, Document $project, Locale $locale, callable $timelimit, Context $usage, array $plan, ProofsCode $proofForCode, Authorization $authorization) {
42074203
if (empty(System::getEnv('_APP_SMS_PROVIDER'))) {
42084204
throw new Exception(Exception::GENERAL_PHONE_DISABLED, 'Phone provider not configured');
42094205
}
@@ -4288,16 +4284,12 @@ function sendSessionAlert(Locale $locale, Document $user, Document $project, arr
42884284
$countryCode = $helper->parse($phone)->getCountryCode();
42894285

42904286
if (!empty($countryCode)) {
4291-
$queueForStatsUsage
4292-
->addMetric(str_replace('{countryCode}', $countryCode, METRIC_AUTH_METHOD_PHONE_COUNTRY_CODE), 1);
4287+
$usage->addMetric(str_replace('{countryCode}', $countryCode, METRIC_AUTH_METHOD_PHONE_COUNTRY_CODE), 1);
42934288
}
42944289
} catch (NumberParseException $e) {
42954290
// Ignore invalid phone number for country code stats
42964291
}
4297-
$queueForStatsUsage
4298-
->addMetric(METRIC_AUTH_METHOD_PHONE, 1)
4299-
->setProject($project)
4300-
->trigger();
4292+
$usage->addMetric(METRIC_AUTH_METHOD_PHONE, 1);
43014293
}
43024294

43034295
$verification->setAttribute('secret', $secret);

0 commit comments

Comments
 (0)