From 3b3abf0302e7ed87d86b948a092e355a890fce36 Mon Sep 17 00:00:00 2001 From: justxd22 Date: Sun, 12 Apr 2026 19:41:52 +0200 Subject: [PATCH] feat: add opt-in event retention purge (#359) --- CONFIGURATION.md | 3 + resources/default-settings.yaml | 7 + src/@types/repositories.ts | 15 +++ src/@types/services.ts | 4 + src/@types/settings.ts | 15 +++ src/app/maintenance-worker.ts | 38 +++++- src/factories/maintenance-service-factory.ts | 11 ++ src/factories/maintenance-worker-factory.ts | 8 +- src/repositories/event-repository.ts | 97 +++++++++++++- src/services/maintenance-service.ts | 38 ++++++ test/unit/app/maintenance-worker.spec.ts | 86 +++++++++++++ .../repositories/event-repository.spec.ts | 78 +++++++++++ .../unit/services/maintenance-service.spec.ts | 121 ++++++++++++++++++ 13 files changed, 517 insertions(+), 4 deletions(-) create mode 100644 src/factories/maintenance-service-factory.ts create mode 100644 src/services/maintenance-service.ts create mode 100644 test/unit/app/maintenance-worker.spec.ts create mode 100644 test/unit/services/maintenance-service.spec.ts diff --git a/CONFIGURATION.md b/CONFIGURATION.md index cd06011a..979fac3b 100644 --- a/CONFIGURATION.md +++ b/CONFIGURATION.md @@ -109,6 +109,9 @@ Running `nostream` for the first time creates the settings file in ` extends Pick, keyof Promise & ExposedPromiseKeys> { @@ -21,6 +35,7 @@ export interface IEventRepository { deleteByPubkeyAndIds(pubkey: Pubkey, ids: EventId[]): Promise deleteByPubkeyExceptKinds(pubkey: Pubkey, excludedKinds: number[]): Promise hasActiveRequestToVanish(pubkey: Pubkey): Promise + deleteExpiredAndRetained(options?: EventRetentionOptions): Promise } export interface IInvoiceRepository { diff --git a/src/@types/services.ts b/src/@types/services.ts index c1427cfb..e9fc6480 100644 --- a/src/@types/services.ts +++ b/src/@types/services.ts @@ -1,6 +1,10 @@ import { Invoice } from './invoice' import { Pubkey } from './base' +export interface IMaintenanceService { + clearOldEvents(): Promise +} + export interface IPaymentsService { getInvoiceFromPaymentsProcessor(invoice: string | Invoice): Promise> createInvoice( diff --git a/src/@types/settings.ts b/src/@types/settings.ts index a8121f7b..3dbf1bbf 100644 --- a/src/@types/settings.ts +++ b/src/@types/settings.ts @@ -69,6 +69,20 @@ export interface EventWhitelists { ipAddresses?: string[] } +export interface EventRetentionKindLimits { + whitelist?: (EventKinds | EventKindsRange)[] +} + +export interface EventRetentionPubkeyLimits { + whitelist?: Pubkey[] +} + +export interface EventRetentionLimits { + maxDays?: number + kind?: EventRetentionKindLimits + pubkey?: EventRetentionPubkeyLimits +} + export interface EventLimits { eventId?: EventIdLimits pubkey?: PubkeyLimits @@ -77,6 +91,7 @@ export interface EventLimits { content?: ContentLimits | ContentLimits[] rateLimits?: EventRateLimit[] whitelists?: EventWhitelists + retention?: EventRetentionLimits } export interface ClientSubscriptionLimits { diff --git a/src/app/maintenance-worker.ts b/src/app/maintenance-worker.ts index 04dd4b7e..686590c8 100644 --- a/src/app/maintenance-worker.ts +++ b/src/app/maintenance-worker.ts @@ -1,22 +1,25 @@ +import { IMaintenanceService, IPaymentsService } from '../@types/services' import { mergeDeepLeft, path, pipe } from 'ramda' import { IRunnable } from '../@types/base' import { createLogger } from '../factories/logger-factory' import { delayMs } from '../utils/misc' import { InvoiceStatus } from '../@types/invoice' -import { IPaymentsService } from '../@types/services' import { Settings } from '../@types/settings' const UPDATE_INVOICE_INTERVAL = 60000 +const CLEAR_OLD_EVENTS_TIMEOUT_MS = 5000 const debug = createLogger('maintenance-worker') export class MaintenanceWorker implements IRunnable { private interval: NodeJS.Timeout | undefined + private isRunning = false public constructor( private readonly process: NodeJS.Process, private readonly paymentsService: IPaymentsService, + private readonly maintenanceService: IMaintenanceService, private readonly settings: () => Settings, ) { this.process @@ -27,14 +30,43 @@ export class MaintenanceWorker implements IRunnable { .on('unhandledRejection', this.onError.bind(this)) } + private async clearOldEventsSafely(): Promise { + try { + await Promise.race([ + this.maintenanceService.clearOldEvents(), + delayMs(CLEAR_OLD_EVENTS_TIMEOUT_MS).then(() => { + throw new Error(`clearOldEvents timed out after ${CLEAR_OLD_EVENTS_TIMEOUT_MS}ms`) + }), + ]) + } catch (error) { + debug('unable to clear old events: %o', error) + } + } + public run(): void { - this.interval = setInterval(() => this.onSchedule(), UPDATE_INVOICE_INTERVAL) + this.interval = setInterval(async () => { + if (this.isRunning) { + debug('skipping scheduled maintenance run because previous run is still in progress') + return + } + + this.isRunning = true + try { + await this.onSchedule() + } catch (error) { + this.onError(error as Error) + } finally { + this.isRunning = false + } + }, UPDATE_INVOICE_INTERVAL) } private async onSchedule(): Promise { const currentSettings = this.settings() + const clearOldEventsPromise = this.clearOldEventsSafely() if (!path(['payments','enabled'], currentSettings)) { + await clearOldEventsPromise return } @@ -84,6 +116,8 @@ export class MaintenanceWorker implements IRunnable { debug('updated %d of %d invoices successfully', successful, invoices.length) } + + await clearOldEventsPromise } private onError(error: Error) { diff --git a/src/factories/maintenance-service-factory.ts b/src/factories/maintenance-service-factory.ts new file mode 100644 index 00000000..3edb9ea5 --- /dev/null +++ b/src/factories/maintenance-service-factory.ts @@ -0,0 +1,11 @@ +import { getMasterDbClient, getReadReplicaDbClient } from '../database/client' +import { createSettings } from './settings-factory' +import { EventRepository } from '../repositories/event-repository' +import { MaintenanceService } from '../services/maintenance-service' + +export const createMaintenanceService = () => { + return new MaintenanceService( + new EventRepository(getMasterDbClient(), getReadReplicaDbClient()), + createSettings + ) +} diff --git a/src/factories/maintenance-worker-factory.ts b/src/factories/maintenance-worker-factory.ts index 5f0f7abc..9eddcd1d 100644 --- a/src/factories/maintenance-worker-factory.ts +++ b/src/factories/maintenance-worker-factory.ts @@ -1,7 +1,13 @@ +import { createMaintenanceService } from './maintenance-service-factory' import { createPaymentsService } from './payments-service-factory' import { createSettings } from './settings-factory' import { MaintenanceWorker } from '../app/maintenance-worker' export const maintenanceWorkerFactory = () => { - return new MaintenanceWorker(process, createPaymentsService(), createSettings) + return new MaintenanceWorker( + process, + createPaymentsService(), + createMaintenanceService(), + createSettings + ) } diff --git a/src/repositories/event-repository.ts b/src/repositories/event-repository.ts index 021f9955..12e8406e 100644 --- a/src/repositories/event-repository.ts +++ b/src/repositories/event-repository.ts @@ -31,7 +31,7 @@ import { import { ContextMetadataKey, EventDeduplicationMetadataKey, EventExpirationTimeMetadataKey, EventKinds } from '../constants/base' import { DatabaseClient, EventId } from '../@types/base' import { DBEvent, Event } from '../@types/event' -import { IEventRepository, IQueryResult } from '../@types/repositories' +import { EventPurgeCounts, EventRetentionOptions, IEventRepository, IQueryResult } from '../@types/repositories' import { toBuffer, toJSON } from '../utils/transform' import { createLogger } from '../factories/logger-factory' import { isGenericTagQuery } from '../utils/filter' @@ -322,4 +322,99 @@ export class EventRepository implements IEventRepository { return Boolean(result) } + + public deleteExpiredAndRetained(options?: EventRetentionOptions): Promise { + const now = Math.floor(Date.now() / 1000) + const maxDays = options?.maxDays + + if (typeof maxDays !== 'number' || isNaN(maxDays) || maxDays <= 0) { + debug('skipping purge: retention.maxDays is not a positive number') + return Promise.resolve({ + deleted: 0, + expired: 0, + retained: 0, + }) + } + + const retentionLimit = now - (maxDays * 86400) + const batchSize = 1000 + + debug('deleting expired and retained events (retentionLimit: %d, now: %d, batchSize: %d)', retentionLimit, now, batchSize) + + const kindWhitelist = [ + ...(Array.isArray(options?.kindWhitelist) ? options.kindWhitelist : []), + EventKinds.REQUEST_TO_VANISH, + ].reduce<(number | [number, number])[]>((result, item) => { + const key = Array.isArray(item) + ? `range:${item[0]}-${item[1]}` + : `kind:${item}` + + if (!result.some((existing) => { + const existingKey = Array.isArray(existing) + ? `range:${existing[0]}-${existing[1]}` + : `kind:${existing}` + return existingKey === key + })) { + result.push(item) + } + + return result + }, []) + + const candidates = this.masterDbClient('events') + .select('event_id') + .where(function () { + this.where('expires_at', '<', now) + .orWhereNotNull('deleted_at') + .orWhere('event_created_at', '<', retentionLimit) + }) + .modify((query) => { + query.whereNot((builder) => { + kindWhitelist.forEach((kindOrRange) => { + if (Array.isArray(kindOrRange)) { + builder.orWhereBetween('event_kind', kindOrRange) + } else { + builder.orWhere('event_kind', kindOrRange) + } + }) + }) + + if (Array.isArray(options?.pubkeyWhitelist) && options.pubkeyWhitelist.length > 0) { + query.whereNotIn('event_pubkey', map(toBuffer)(options.pubkeyWhitelist)) + } + }) + .limit(batchSize) + + const query = this.masterDbClient('events') + .whereIn('event_id', candidates) + .del(['deleted_at', 'expires_at', 'event_created_at']) + + const mapToCounts = (deletedRows: Pick[]): EventPurgeCounts => deletedRows.reduce((counts, row) => { + if (row.deleted_at) { + counts.deleted += 1 + } else if (typeof row.expires_at === 'number' && row.expires_at < now) { + counts.expired += 1 + } else if (row.event_created_at < retentionLimit) { + counts.retained += 1 + } + + return counts + }, { + deleted: 0, + expired: 0, + retained: 0, + }) + + const getPromise = () => query.then((rows: any) => mapToCounts(rows)) + + return { + then: ( + onfulfilled?: ((value: EventPurgeCounts) => T1 | PromiseLike) | null, + onrejected?: ((reason: any) => T2 | PromiseLike) | null, + ) => getPromise().then(onfulfilled as any, onrejected as any), + catch: (onrejected?: ((reason: any) => T | PromiseLike) | null) => getPromise().catch(onrejected as any), + finally: (onfinally?: (() => void) | null) => getPromise().finally(onfinally as any), + toString: (): string => query.toString(), + } as Promise & { toString(): string } + } } diff --git a/src/services/maintenance-service.ts b/src/services/maintenance-service.ts new file mode 100644 index 00000000..0fd36f28 --- /dev/null +++ b/src/services/maintenance-service.ts @@ -0,0 +1,38 @@ +import { createLogger } from '../factories/logger-factory' +import { IEventRepository } from '../@types/repositories' +import { IMaintenanceService } from '../@types/services' +import { Settings } from '../@types/settings' + +const debug = createLogger('maintenance-service') + +export class MaintenanceService implements IMaintenanceService { + public constructor( + private readonly eventRepository: IEventRepository, + private readonly settings: () => Settings, + ) {} + + public async clearOldEvents(): Promise { + const currentSettings = this.settings() + const retention = currentSettings.limits?.event?.retention + const maxDays = retention?.maxDays + + if (typeof maxDays !== 'number' || isNaN(maxDays) || maxDays <= 0) { + return + } + + try { + debug('purging deleted, expired and old events') + const deletedCounts = await this.eventRepository.deleteExpiredAndRetained({ + maxDays, + kindWhitelist: retention?.kind?.whitelist, + pubkeyWhitelist: retention?.pubkey?.whitelist, + }) + const totalDeleted = deletedCounts.deleted + deletedCounts.expired + deletedCounts.retained + if (totalDeleted > 0) { + console.info(`[Maintenance] Deleted events: deleted=${deletedCounts.deleted}, expired=${deletedCounts.expired}, retained=${deletedCounts.retained}.`) + } + } catch (error) { + console.error('Unable to purge events. Reason:', error) + } + } +} diff --git a/test/unit/app/maintenance-worker.spec.ts b/test/unit/app/maintenance-worker.spec.ts new file mode 100644 index 00000000..a901fe0e --- /dev/null +++ b/test/unit/app/maintenance-worker.spec.ts @@ -0,0 +1,86 @@ +import * as chai from 'chai' +import * as sinon from 'sinon' +import sinonChai from 'sinon-chai' + +chai.use(sinonChai) + +const { expect } = chai + +import { IMaintenanceService, IPaymentsService } from '../../../src/@types/services' +import { MaintenanceWorker } from '../../../src/app/maintenance-worker' +import { Settings } from '../../../src/@types/settings' + +describe('MaintenanceWorker', () => { + let worker: MaintenanceWorker + let sandbox: sinon.SinonSandbox + let paymentsService: sinon.SinonStubbedInstance + let maintenanceService: sinon.SinonStubbedInstance + let settings: sinon.SinonStub + let processMock: any + + beforeEach(() => { + sandbox = sinon.createSandbox() + paymentsService = { + getPendingInvoices: sandbox.stub(), + getInvoiceFromPaymentsProcessor: sandbox.stub(), + updateInvoiceStatus: sandbox.stub(), + confirmInvoice: sandbox.stub(), + sendInvoiceUpdateNotification: sandbox.stub(), + } as any + maintenanceService = { + clearOldEvents: sandbox.stub(), + } as any + settings = sandbox.stub() + processMock = { + on: sandbox.stub().returnsThis(), + } + + worker = new MaintenanceWorker( + processMock as any, + paymentsService as any, + maintenanceService as any, + settings as any, + ) + }) + + afterEach(() => { + sandbox.restore() + }) + + describe('onSchedule', () => { + it('calls maintenance service and processes invoices', async () => { + const currentSettings: Settings = { + info: {} as any, + network: {} as any, + payments: { + enabled: true, + } as any, + } + settings.returns(currentSettings) + maintenanceService.clearOldEvents.resolves() + paymentsService.getPendingInvoices.resolves([]) + + await (worker as any).onSchedule() + + expect(maintenanceService.clearOldEvents).to.have.been.calledOnce + expect(paymentsService.getPendingInvoices).to.have.been.calledOnce + }) + + it('calls maintenance service even if payments are disabled', async () => { + const currentSettings: Settings = { + info: {} as any, + network: {} as any, + payments: { + enabled: false, + } as any, + } + settings.returns(currentSettings) + maintenanceService.clearOldEvents.resolves() + + await (worker as any).onSchedule() + + expect(maintenanceService.clearOldEvents).to.have.been.calledOnce + expect(paymentsService.getPendingInvoices).not.to.have.been.called + }) + }) +}) diff --git a/test/unit/repositories/event-repository.spec.ts b/test/unit/repositories/event-repository.spec.ts index 04ceec26..07bdf99f 100644 --- a/test/unit/repositories/event-repository.spec.ts +++ b/test/unit/repositories/event-repository.spec.ts @@ -489,6 +489,84 @@ describe('EventRepository', () => { }) }) + describe('deleteExpiredAndRetained', () => { + let clock: sinon.SinonFakeTimers + beforeEach(() => { + clock = sinon.useFakeTimers(1000000000) // 1970-01-12T13:46:40.000Z + }) + + afterEach(() => { + clock.restore() + }) + + it('does not delete anything when retention is not set', async () => { + const result = await repository.deleteExpiredAndRetained() + + expect(result).to.deep.equal({ + deleted: 0, + expired: 0, + retained: 0, + }) + }) + + it('does not delete anything when retention.maxDays is zero or negative', async () => { + expect(await repository.deleteExpiredAndRetained({ maxDays: 0 })).to.deep.equal({ + deleted: 0, + expired: 0, + retained: 0, + }) + expect(await repository.deleteExpiredAndRetained({ maxDays: -1 })).to.deep.equal({ + deleted: 0, + expired: 0, + retained: 0, + }) + }) + + it('deletes expired, deleted and old events when retention.maxDays is set', () => { + const query = repository.deleteExpiredAndRetained({ + maxDays: 7, + }).toString() + + expect(query).to.equal('delete from "events" where "event_id" in (select "event_id" from "events" where ("expires_at" < 1000000 or "deleted_at" is not null or "event_created_at" < 395200) and not ("event_kind" = 62) limit 1000) returning "deleted_at", "expires_at", "event_created_at"') + }) + + it('excludes whitelisted kinds and pubkeys from purge', () => { + const query = repository.deleteExpiredAndRetained({ + maxDays: 7, + kindWhitelist: [62], + pubkeyWhitelist: ['001122'], + }).toString() + + expect(query).to.equal('delete from "events" where "event_id" in (select "event_id" from "events" where ("expires_at" < 1000000 or "deleted_at" is not null or "event_created_at" < 395200) and not ("event_kind" = 62) and "event_pubkey" not in (X\'001122\') limit 1000) returning "deleted_at", "expires_at", "event_created_at"') + }) + + it('always excludes kind 62 from purge, even when no kind whitelist is configured', () => { + const query = repository.deleteExpiredAndRetained({ + maxDays: 7, + }).toString() + + expect(query).to.equal('delete from "events" where "event_id" in (select "event_id" from "events" where ("expires_at" < 1000000 or "deleted_at" is not null or "event_created_at" < 395200) and not ("event_kind" = 62) limit 1000) returning "deleted_at", "expires_at", "event_created_at"') + }) + + it('excludes whitelisted kind ranges from purge', () => { + const query = repository.deleteExpiredAndRetained({ + maxDays: 7, + kindWhitelist: [[10000, 20000]], + }).toString() + + expect(query).to.equal('delete from "events" where "event_id" in (select "event_id" from "events" where ("expires_at" < 1000000 or "deleted_at" is not null or "event_created_at" < 395200) and not ("event_kind" between 10000 and 20000 or "event_kind" = 62) limit 1000) returning "deleted_at", "expires_at", "event_created_at"') + }) + + it('excludes a complex mix of kinds and ranges from purge', () => { + const query = repository.deleteExpiredAndRetained({ + maxDays: 7, + kindWhitelist: [0, 62, [30000, 40000]], + }).toString() + + expect(query).to.equal('delete from "events" where "event_id" in (select "event_id" from "events" where ("expires_at" < 1000000 or "deleted_at" is not null or "event_created_at" < 395200) and not ("event_kind" = 0 or "event_kind" = 62 or "event_kind" between 30000 and 40000) limit 1000) returning "deleted_at", "expires_at", "event_created_at"') + }) + }) + describe('upsert', () => { it('replaces event based on event_pubkey and event_kind', () => { const event: Event = { diff --git a/test/unit/services/maintenance-service.spec.ts b/test/unit/services/maintenance-service.spec.ts new file mode 100644 index 00000000..ddd3b78e --- /dev/null +++ b/test/unit/services/maintenance-service.spec.ts @@ -0,0 +1,121 @@ +import * as chai from 'chai' +import * as sinon from 'sinon' +import sinonChai from 'sinon-chai' + +chai.use(sinonChai) + +const { expect } = chai + +import { IEventRepository } from '../../../src/@types/repositories' +import { MaintenanceService } from '../../../src/services/maintenance-service' +import { Settings } from '../../../src/@types/settings' + +describe('MaintenanceService', () => { + let service: MaintenanceService + let sandbox: sinon.SinonSandbox + let eventRepository: sinon.SinonStubbedInstance + let settings: sinon.SinonStub + + beforeEach(() => { + sandbox = sinon.createSandbox() + eventRepository = { + deleteExpiredAndRetained: sandbox.stub(), + } as any + settings = sandbox.stub() + + service = new MaintenanceService( + eventRepository as any, + settings as any, + ) + }) + + afterEach(() => { + sandbox.restore() + }) + + describe('clearOldEvents', () => { + it('purges events when retention.maxDays is a positive number', async () => { + const consoleInfoStub = sandbox.stub(console, 'info') + const currentSettings: Settings = { + limits: { + event: { + retention: { + maxDays: 30, + kind: { + whitelist: [62], + }, + pubkey: { + whitelist: ['aabbcc'], + }, + }, + }, + } as any, + } as any + settings.returns(currentSettings) + eventRepository.deleteExpiredAndRetained.resolves({ + deleted: 4, + expired: 3, + retained: 3, + }) + + await service.clearOldEvents() + + expect(eventRepository.deleteExpiredAndRetained).to.have.been.calledOnceWithExactly({ + maxDays: 30, + kindWhitelist: [62], + pubkeyWhitelist: ['aabbcc'], + }) + expect(consoleInfoStub).to.have.been.calledOnceWithExactly('[Maintenance] Deleted events: deleted=4, expired=3, retained=3.') + }) + + it('does not purge events when retention.maxDays is -1', async () => { + const currentSettings: Settings = { + limits: { + event: { + retention: { + maxDays: -1, + }, + }, + } as any, + } as any + settings.returns(currentSettings) + + await service.clearOldEvents() + + expect(eventRepository.deleteExpiredAndRetained).not.to.have.been.called + }) + + it('does not purge events when retention is not configured', async () => { + const currentSettings: Settings = { + limits: { + event: {}, + } as any, + } as any + settings.returns(currentSettings) + + await service.clearOldEvents() + + expect(eventRepository.deleteExpiredAndRetained).not.to.have.been.called + }) + + it('handles error during purge', async () => { + const currentSettings: Settings = { + limits: { + event: { + retention: { + maxDays: 30, + }, + }, + } as any, + } as any + settings.returns(currentSettings) + eventRepository.deleteExpiredAndRetained.rejects(new Error('DB Error')) + sandbox.stub(console, 'error') + + // Should not throw + await service.clearOldEvents() + + expect(eventRepository.deleteExpiredAndRetained).to.have.been.calledOnce + }) + }) +})