Skip to content

Commit 38b2eed

Browse files
loks0nclaude
andcommitted
Refactor usage metrics to stateless publisher pattern
Co-Authored-By: Claude Opus 4.6 <[email protected]>
1 parent 7d15d7e commit 38b2eed

50 files changed

Lines changed: 1052 additions & 929 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

CONTRIBUTING.md

Lines changed: 29 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -409,14 +409,17 @@ Next follow the appropriate steps below depending on whether you're adding the m
409409

410410
**API**
411411

412-
In file `app/controllers/shared/api.php` On the database listener, add to an existing or create a new switch case. Add a call to the usage worker with your new metric const like so:
412+
In file `app/controllers/shared/api.php` On the database listener, add to an existing or create a new switch case. Accumulate metrics in the usage context like so:
413413

414414
```php
415415
case $document->getCollection() === 'teams':
416-
$queueForStatsUsage
416+
$usage
417417
->addMetric(METRIC_TEAMS, $value); // per project
418418
break;
419419
```
420+
421+
The metrics will be automatically published by the shutdown hook at the end of the request. There is no need to manually trigger or publish.
422+
420423
There are cases when you need to handle metric that has a parent entity, like buckets.
421424
Files are linked to a parent bucket, you should verify you remove the files stats when you delete a bucket.
422425

@@ -425,14 +428,14 @@ In that case you need also to handle children removal using addReduce() method c
425428
```php
426429

427430
case $document->getCollection() === 'buckets': //buckets
428-
$queueForStatsUsage
431+
$usage
429432
->addMetric(METRIC_BUCKETS, $value); // per project
430433
if ($event === Database::EVENT_DOCUMENT_DELETE) {
431-
$queueForStatsUsage
434+
$usage
432435
->addReduce($document);
433436
}
434437
break;
435-
438+
436439
```
437440

438441
In addition, you will also need to add some logic to the `reduce()` method of the Usage worker located in `/src/Appwrite/Platform/Workers/Usage.php`, like so:
@@ -460,8 +463,12 @@ case $document->getCollection() === 'buckets':
460463

461464
**Background worker**
462465

463-
You need to inject the usage queue in the desired worker on the constructor method
466+
You need to inject the usage context and publisher in the desired worker on the constructor method
464467
```php
468+
use Appwrite\Usage\Context;
469+
use Appwrite\Event\Publisher\Usage as UsagePublisher;
470+
use Appwrite\Event\Message\Usage as UsageMessage;
471+
465472
/**
466473
* @throws Exception
467474
*/
@@ -474,24 +481,32 @@ public function __construct()
474481
->inject('dbForProject')
475482
->inject('queueForFunctions')
476483
->inject('queueForEvents')
477-
->inject('queueForStatsUsage')
484+
->inject('usage')
485+
->inject('publisherForUsage')
478486
->inject('log')
479-
->callback(fn (Message $message, Database $dbForProject, Func $queueForFunctions, Event $queueForEvents, StatsUsage $queueForStatsUsage, Log $log) => $this->action($message, $dbForProject, $queueForFunctions, $queueForEvents, $queueForStatsUsage, $log));
487+
->callback(fn (Message $message, Database $dbForProject, Func $queueForFunctions, Event $queueForEvents, Context $usage, UsagePublisher $publisherForUsage, Log $log) => $this->action($message, $dbForProject, $queueForFunctions, $queueForEvents, $usage, $publisherForUsage, $log));
480488
}
481489
```
482490

483-
and then trigger the queue with the new metric like so:
491+
and then accumulate metrics, create a message, and publish like so:
484492

485493
```php
486-
$queueForStatsUsage
494+
$usage
487495
->addMetric(METRIC_BUILDS, 1)
488496
->addMetric(METRIC_BUILDS_STORAGE, $build->getAttribute('size', 0))
489497
->addMetric(METRIC_BUILDS_COMPUTE, (int)$build->getAttribute('duration', 0) * 1000)
490-
->addMetric(str_replace('{functionInternalId}', $function->getSequence(), METRIC_FUNCTION_ID_BUILDS), 1)
498+
->addMetric(str_replace('{functionInternalId}', $function->getSequence(), METRIC_FUNCTION_ID_BUILDS), 1)
491499
->addMetric(str_replace('{functionInternalId}', $function->getSequence(), METRIC_FUNCTION_ID_BUILDS_STORAGE), $build->getAttribute('size', 0))
492-
->addMetric(str_replace('{functionInternalId}', $function->getSequence(), METRIC_FUNCTION_ID_BUILDS_COMPUTE), (int)$build->getAttribute('duration', 0) * 1000)
493-
->setProject($project)
494-
->trigger();
500+
->addMetric(str_replace('{functionInternalId}', $function->getSequence(), METRIC_FUNCTION_ID_BUILDS_COMPUTE), (int)$build->getAttribute('duration', 0) * 1000);
501+
502+
// Publish the accumulated metrics (workers don't have shutdown hooks)
503+
$message = new UsageMessage(
504+
project: $project,
505+
metrics: $usage->getMetrics(),
506+
reduce: $usage->getReduce()
507+
);
508+
$publisherForUsage->enqueue($message);
509+
$usage->reset();
495510
```
496511

497512

app/cli.php

Lines changed: 35 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
11
<?php
22

3-
require_once __DIR__ . '/init.php';
3+
require_once __DIR__.'/init.php';
44

55
use Appwrite\Event\Certificate;
66
use Appwrite\Event\Delete;
77
use Appwrite\Event\Func;
88
use Appwrite\Event\StatsResources;
9-
use Appwrite\Event\StatsUsage;
109
use Appwrite\Platform\Appwrite;
1110
use Appwrite\Runtimes\Runtimes;
1211
use Appwrite\Utopia\Database\Documents\User;
@@ -39,15 +38,15 @@
3938
Config::setParam('runtimes', (new Runtimes('v5'))->getAll(supported: false));
4039

4140
// require controllers after overwriting runtimes
42-
require_once __DIR__ . '/controllers/general.php';
41+
require_once __DIR__.'/controllers/general.php';
4342

4443
global $register;
4544

4645
$platform = new Appwrite();
4746
$args = $platform->getEnv('argv');
4847

4948
\array_shift($args);
50-
if (!isset($args[0])) {
49+
if (! isset($args[0])) {
5150
Console::error('Missing task name');
5251
Console::exit(1);
5352
}
@@ -85,6 +84,7 @@
8584
$setResource('authorization', function () {
8685
$authorization = new Authorization();
8786
$authorization->disable();
87+
8888
return $authorization;
8989
}, []);
9090

@@ -113,7 +113,7 @@
113113
$collections = Config::getParam('collections', [])['console'];
114114
$last = \array_key_last($collections);
115115

116-
if (!($dbForPlatform->exists($dbForPlatform->getDatabase(), $last))) { /** TODO cache ready variable using registry */
116+
if (! ($dbForPlatform->exists($dbForPlatform->getDatabase(), $last))) { /** TODO cache ready variable using registry */
117117
throw new Exception('Tables not ready yet.');
118118
}
119119

@@ -122,10 +122,10 @@
122122
Console::warning($err->getMessage());
123123
sleep($sleep);
124124
}
125-
} while ($attempts < $maxAttempts && !$ready);
125+
} while ($attempts < $maxAttempts && ! $ready);
126126

127-
if (!$ready) {
128-
throw new Exception("Console is not ready yet. Please try again later.");
127+
if (! $ready) {
128+
throw new Exception('Console is not ready yet. Please try again later.');
129129
}
130130

131131
return $dbForPlatform;
@@ -153,7 +153,7 @@
153153
$dsn = new DSN($project->getAttribute('database'));
154154
} catch (\InvalidArgumentException) {
155155
// TODO: Temporary until all projects are using shared tables
156-
$dsn = new DSN('mysql://' . $project->getAttribute('database'));
156+
$dsn = new DSN('mysql://'.$project->getAttribute('database'));
157157
}
158158

159159
if (isset($databases[$dsn->getHost()])) {
@@ -163,13 +163,13 @@
163163
if (\in_array($dsn->getHost(), $sharedTables)) {
164164
$database
165165
->setSharedTables(true)
166-
->setTenant((int)$project->getSequence())
166+
->setTenant((int) $project->getSequence())
167167
->setNamespace($dsn->getParam('namespace'));
168168
} else {
169169
$database
170170
->setSharedTables(false)
171171
->setTenant(null)
172-
->setNamespace('_' . $project->getSequence());
172+
->setNamespace('_'.$project->getSequence());
173173
}
174174

175175
return $database;
@@ -184,13 +184,13 @@
184184
if (\in_array($dsn->getHost(), $sharedTables)) {
185185
$database
186186
->setSharedTables(true)
187-
->setTenant((int)$project->getSequence())
187+
->setTenant((int) $project->getSequence())
188188
->setNamespace($dsn->getParam('namespace'));
189189
} else {
190190
$database
191191
->setSharedTables(false)
192192
->setTenant(null)
193-
->setNamespace('_' . $project->getSequence());
193+
->setNamespace('_'.$project->getSequence());
194194
}
195195

196196
$database
@@ -207,8 +207,9 @@
207207
$database = null;
208208

209209
return function (?Document $project = null) use ($pools, $cache, $database, $authorization) {
210-
if ($database !== null && $project !== null && !$project->isEmpty() && $project->getId() !== 'console') {
211-
$database->setTenant((int)$project->getSequence());
210+
if ($database !== null && $project !== null && ! $project->isEmpty() && $project->getId() !== 'console') {
211+
$database->setTenant((int) $project->getSequence());
212+
212213
return $database;
213214
}
214215

@@ -224,8 +225,8 @@
224225
->setMaxQueryValues(APP_DATABASE_QUERY_MAX_VALUES);
225226

226227
// set tenant
227-
if ($project !== null && !$project->isEmpty() && $project->getId() !== 'console') {
228-
$database->setTenant((int)$project->getSequence());
228+
if ($project !== null && ! $project->isEmpty() && $project->getId() !== 'console') {
229+
$database->setTenant((int) $project->getSequence());
229230
}
230231

231232
return $database;
@@ -243,14 +244,17 @@
243244
$setResource('publisherMigrations', function (BrokerPool $publisher) {
244245
return $publisher;
245246
}, ['publisher']);
246-
$setResource('publisherStatsUsage', function (BrokerPool $publisher) {
247-
return $publisher;
248-
}, ['publisher']);
249247
$setResource('publisherMessaging', function (BrokerPool $publisher) {
250248
return $publisher;
251249
}, ['publisher']);
252-
$setResource('queueForStatsUsage', function (Publisher $publisher) {
253-
return new StatsUsage($publisher);
250+
$setResource('usage', function () {
251+
return new \Appwrite\Usage\Context();
252+
}, []);
253+
$setResource('publisherForUsage', function (Publisher $publisher) {
254+
$queueName = \Utopia\System\System::getEnv('_APP_STATS_USAGE_QUEUE_NAME', \Appwrite\Event\Event::STATS_USAGE_QUEUE_NAME);
255+
$queue = new \Utopia\Queue\Queue($queueName, 'utopia-queue');
256+
257+
return new \Appwrite\Event\Publisher\Usage($publisher, $queue);
254258
}, ['publisher']);
255259
$setResource('queueForStatsResources', function (Publisher $publisher) {
256260
return new StatsResources($publisher);
@@ -266,12 +270,12 @@
266270
}, ['publisher']);
267271
$setResource('logError', function (Registry $register) {
268272
return function (Throwable $error, string $namespace, string $action) use ($register) {
269-
Console::error('[Error] Timestamp: ' . date('c', time()));
270-
Console::error('[Error] Type: ' . get_class($error));
271-
Console::error('[Error] Message: ' . $error->getMessage());
272-
Console::error('[Error] File: ' . $error->getFile());
273-
Console::error('[Error] Line: ' . $error->getLine());
274-
Console::error('[Error] Trace: ' . $error->getTraceAsString());
273+
Console::error('[Error] Timestamp: '.date('c', time()));
274+
Console::error('[Error] Type: '.get_class($error));
275+
Console::error('[Error] Message: '.$error->getMessage());
276+
Console::error('[Error] File: '.$error->getFile());
277+
Console::error('[Error] Line: '.$error->getLine());
278+
Console::error('[Error] Trace: '.$error->getTraceAsString());
275279

276280
$logger = $register->get('logger');
277281

@@ -308,9 +312,9 @@
308312

309313
try {
310314
$responseCode = $logger->addLog($log);
311-
Console::info('Error log pushed with status code: ' . $responseCode);
315+
Console::info('Error log pushed with status code: '.$responseCode);
312316
} catch (Throwable $th) {
313-
Console::error('Error pushing log: ' . $th->getMessage());
317+
Console::error('Error pushing log: '.$th->getMessage());
314318
}
315319
}
316320
};
@@ -341,5 +345,5 @@
341345
$cli->shutdown()->action(fn () => Timer::clearAll());
342346

343347
Runtime::enableCoroutine(SWOOLE_HOOK_ALL);
344-
require_once __DIR__ . '/init/span.php';
348+
require_once __DIR__.'/init/span.php';
345349
run($cli->run(...));

app/controllers/api/account.php

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
use Appwrite\Event\Event;
1515
use Appwrite\Event\Mail;
1616
use Appwrite\Event\Messaging;
17-
use Appwrite\Event\StatsUsage;
1817
use Appwrite\Extend\Exception;
1918
use Appwrite\Hooks\Hooks;
2019
use Appwrite\Network\Validator\Email as EmailValidator;
@@ -28,6 +27,7 @@
2827
use Appwrite\SDK\Response as SDKResponse;
2928
use Appwrite\Template\Template;
3029
use Appwrite\URL\URL as URLParser;
30+
use Appwrite\Usage\Context;
3131
use Appwrite\Utopia\Database\Documents\User;
3232
use Appwrite\Utopia\Database\Validator\CustomId;
3333
use Appwrite\Utopia\Database\Validator\Queries\Identities;
@@ -2801,12 +2801,12 @@ function sendSessionAlert(Locale $locale, Document $user, Document $project, arr
28012801
->inject('queueForMessaging')
28022802
->inject('locale')
28032803
->inject('timelimit')
2804-
->inject('queueForStatsUsage')
2804+
->inject('usage')
28052805
->inject('plan')
28062806
->inject('store')
28072807
->inject('proofForCode')
28082808
->inject('authorization')
2809-
->action(function (string $userId, string $phone, Request $request, Response $response, User $user, Document $project, array $platform, Database $dbForProject, Event $queueForEvents, Messaging $queueForMessaging, Locale $locale, callable $timelimit, StatsUsage $queueForStatsUsage, array $plan, Store $store, ProofsCode $proofForCode, Authorization $authorization) {
2809+
->action(function (string $userId, string $phone, Request $request, Response $response, User $user, Document $project, array $platform, Database $dbForProject, Event $queueForEvents, Messaging $queueForMessaging, Locale $locale, callable $timelimit, Context $usage, array $plan, Store $store, ProofsCode $proofForCode, Authorization $authorization) {
28102810
if (empty(System::getEnv('_APP_SMS_PROVIDER'))) {
28112811
throw new Exception(Exception::GENERAL_PHONE_DISABLED, 'Phone provider not configured');
28122812
}
@@ -2955,16 +2955,13 @@ function sendSessionAlert(Locale $locale, Document $user, Document $project, arr
29552955
$countryCode = $helper->parse($phone)->getCountryCode();
29562956

29572957
if (!empty($countryCode)) {
2958-
$queueForStatsUsage
2958+
$usage
29592959
->addMetric(str_replace('{countryCode}', $countryCode, METRIC_AUTH_METHOD_PHONE_COUNTRY_CODE), 1);
29602960
}
29612961
} catch (NumberParseException $e) {
29622962
// Ignore invalid phone number for country code stats
29632963
}
2964-
$queueForStatsUsage
2965-
->addMetric(METRIC_AUTH_METHOD_PHONE, 1)
2966-
->setProject($project)
2967-
->trigger();
2964+
$usage->addMetric(METRIC_AUTH_METHOD_PHONE, 1);
29682965
}
29692966

29702967
$token->setAttribute('secret', $secret);
@@ -4199,11 +4196,11 @@ function sendSessionAlert(Locale $locale, Document $user, Document $project, arr
41994196
->inject('project')
42004197
->inject('locale')
42014198
->inject('timelimit')
4202-
->inject('queueForStatsUsage')
4199+
->inject('usage')
42034200
->inject('plan')
42044201
->inject('proofForCode')
42054202
->inject('authorization')
4206-
->action(function (Request $request, Response $response, User $user, Database $dbForProject, Event $queueForEvents, Messaging $queueForMessaging, Document $project, Locale $locale, callable $timelimit, StatsUsage $queueForStatsUsage, array $plan, ProofsCode $proofForCode, Authorization $authorization) {
4203+
->action(function (Request $request, Response $response, User $user, Database $dbForProject, Event $queueForEvents, Messaging $queueForMessaging, Document $project, Locale $locale, callable $timelimit, Context $usage, array $plan, ProofsCode $proofForCode, Authorization $authorization) {
42074204
if (empty(System::getEnv('_APP_SMS_PROVIDER'))) {
42084205
throw new Exception(Exception::GENERAL_PHONE_DISABLED, 'Phone provider not configured');
42094206
}
@@ -4288,16 +4285,13 @@ function sendSessionAlert(Locale $locale, Document $user, Document $project, arr
42884285
$countryCode = $helper->parse($phone)->getCountryCode();
42894286

42904287
if (!empty($countryCode)) {
4291-
$queueForStatsUsage
4288+
$usage
42924289
->addMetric(str_replace('{countryCode}', $countryCode, METRIC_AUTH_METHOD_PHONE_COUNTRY_CODE), 1);
42934290
}
42944291
} catch (NumberParseException $e) {
42954292
// Ignore invalid phone number for country code stats
42964293
}
4297-
$queueForStatsUsage
4298-
->addMetric(METRIC_AUTH_METHOD_PHONE, 1)
4299-
->setProject($project)
4300-
->trigger();
4294+
$usage->addMetric(METRIC_AUTH_METHOD_PHONE, 1);
43014295
}
43024296

43034297
$verification->setAttribute('secret', $secret);

0 commit comments

Comments
 (0)