PHP API
Everything in Queuety is accessible through the Queuety\Queuety static class. It is the single entry point for dispatching jobs, building workflows, driving state machines, managing queues, and accessing internal subsystems.
use Queuety\Queuety;
Standalone core access
Queuety's DB-backed runtime can be initialized outside WordPress.
That standalone path is for the orchestration core:
- jobs and workers
- workflows and signals
- state machines and events
- artifacts, logs, metrics, and schedules
It is not a replacement for the WordPress adapter. Queuety::on_action(),
replace_wp_cron(), the plugin bootstrap, and the real wp queuety ...
registration path still require WordPress.
Queuety::init( Connection $conn )
Initialize Queuety with a direct PDO-backed connection.
use Queuety\Connection;
use Queuety\Queuety;
$conn = new Connection(
host: '127.0.0.1:3306',
dbname: 'wordpress',
user: 'root',
password: 'secret',
prefix: 'wp_',
);
Queuety::init( $conn );
Queuety::ensure_schema()
Ensure the Queuety runtime tables exist for the active connection.
Queuety::init( $conn );
Queuety::ensure_schema();
For the full model and the WordPress boundary, see Standalone Use.
Jobs
Queuety::dispatch( $handler, $payload )
Dispatch a job. Accepts either a handler name string or a Contracts\Job instance. Returns a PendingJob builder for chaining options.
// Handler name + payload (original API)
Queuety::dispatch( 'send_email', [ 'to' => 'user@example.com' ] );
// Contracts\Job instance (v0.6.0+)
Queuety::dispatch( new SendEmailJob( to: 'user@example.com' ) );
// With options
Queuety::dispatch( 'send_email', [ 'to' => 'user@example.com' ] )
    ->on_queue( 'emails' )
    ->with_priority( Priority::High )
    ->delay( 300 )
    ->max_attempts( 5 )
    ->rate_limit( 10, 60 )
    ->unique()
    ->after( $other_job_id );
// Get the job ID
$job_id = Queuety::dispatch( 'send_email', $payload )->id();
When a Contracts\Job instance is passed, the serializer uses the fully qualified class name (FQCN) as the handler name and the public properties as the payload. The $payload parameter is ignored when $handler is a Job instance.
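To make the FQCN-plus-public-properties idea concrete, here is a minimal sketch in plain PHP. It is not Queuety's serializer: the describe_job() helper and the job class below are hypothetical and exist only for illustration. It relies on the fact that get_object_vars(), when called from outside the class, returns only the accessible (public) properties.

```php
<?php
declare(strict_types=1);

// Hypothetical job class used only for this illustration.
final class SendEmailJob
{
    public function __construct(
        public string $to,
        public string $subject = '',
        private string $internal_note = '', // private, so it is not exposed below
    ) {}
}

/**
 * Illustrative helper (an assumption, not part of Queuety): derive a handler
 * name and a payload array from a job object.
 */
function describe_job( object $job ): array
{
    return [
        'handler' => $job::class,             // fully qualified class name
        'payload' => get_object_vars( $job ), // public properties only, from this scope
    ];
}

$row = describe_job( new SendEmailJob( to: 'user@example.com', subject: 'Hi' ) );
print_r( $row );
```

Under these assumptions, a private property never reaches the payload, which is one reason to keep job data in public constructor-promoted properties.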
PendingJob methods
| Method | Description |
|---|---|
on_queue( string $queue ) | Set the target queue |
with_priority( Priority $priority ) | Set the priority level |
delay( int $seconds ) | Delay before the job becomes available |
max_attempts( int $max ) | Maximum retry attempts |
rate_limit( int $max, int $window ) | Rate limit: max executions per window seconds |
unique() | Prevent duplicate dispatches |
after( int $job_id ) | Wait for another job to complete first |
concurrency_group( string $group, int $limit ) | Put the job into a shared worker-admission group |
cost_units( int $units ) | Attach relative execution cost metadata to the dispatch |
id() | Force dispatch and return the job ID |
concurrency_group() and cost_units() feed the broader Resource Management model, so the same dispatch can participate in count-based concurrency limits, weighted queue or provider budgets, and worker admission heuristics.
Queuety::dispatch_sync( $handler, $payload )
Execute a job synchronously in the current process without adding it to the queue.
// With a Job instance
Queuety::dispatch_sync( new SendEmailJob( to: 'user@example.com', subject: 'Hi', body: 'Hello' ) );
// With a handler name and payload
Queuety::dispatch_sync( 'send_email', [ 'to' => 'user@example.com' ] );
The job's handle() method is called directly. No database row is created, no worker is involved, and middleware is not applied.
Queuety::batch( $jobs )
Dispatch multiple jobs in a single multi-row INSERT.
$job_ids = Queuety::batch( [
    [ 'handler' => 'send_email', 'payload' => [ 'to' => 'user@example.com' ], 'queue' => 'emails' ],
    [ 'handler' => 'send_email', 'payload' => [ 'to' => 'user@example.com' ], 'priority' => 2 ],
] );
Queuety::create_batch( $jobs )
Create a BatchBuilder for dispatching a group of jobs with callbacks and progress tracking. See Batching for full details.
$batch = Queuety::create_batch( [
new ImportUsersJob( $chunk_1 ),
new ImportUsersJob( $chunk_2 ),
] )
    ->name( 'Import users' )
    ->then( ImportCompleteHandler::class )
    ->catch( ImportFailedHandler::class )
    ->finally( ImportCleanupHandler::class )
    ->allow_failures()
    ->on_queue( 'imports' )
    ->dispatch();
BatchBuilder methods
| Method | Returns | Description |
|---|---|---|
name( string $name ) | self | Set a name for the batch |
on_queue( string $queue ) | self | Set the queue for all jobs in the batch |
then( string $handler_class ) | self | Handler called when the batch completes successfully |
catch( string $handler_class ) | self | Handler called when the first job in the batch fails |
finally( string $handler_class ) | self | Handler called when the batch finishes (success or failure) |
allow_failures() | self | Allow the then callback to fire even with failures |
dispatch() | Batch | Create the batch row and all jobs, returns the Batch value object |
Queuety::find_batch( $id )
Find a batch by ID. Returns a Batch value object or null.
$batch = Queuety::find_batch( $batch_id );
if ( null !== $batch ) {
echo $batch->progress() . '% complete';
}
Worker control
Queuety::run_worker( $queue = 'default', $once = false )
Run one worker loop directly.
Queuety::run_worker( 'default' );
Queuety::run_worker( 'critical,default', true );
Queuety::run_worker_pool( $workers, $queue = 'default' )
Run a fixed-size worker pool directly.
Queuety::run_worker_pool( 4, 'providers' );
Queuety::run_auto_scaling_worker_pool( $min_workers, $max_workers, $queue = 'default' )
Run an adaptive worker pool directly.
Queuety::run_auto_scaling_worker_pool( 2, 6, 'providers' );
Batch value object
| Method/Property | Type | Description |
|---|---|---|
$id | int | Batch ID |
$name | string|null | Optional batch name |
$total_jobs | int | Total number of jobs in the batch |
$pending_jobs | int | Jobs still pending |
$failed_jobs | int | Number of failed jobs |
$failed_job_ids | array | IDs of failed jobs |
$created_at | DateTimeImmutable | When the batch was created |
$finished_at | DateTimeImmutable|null | When the batch finished |
$cancelled_at | DateTimeImmutable|null | When the batch was cancelled |
progress() | int | Completion percentage (0-100) |
finished() | bool | Whether the batch has finished |
has_failures() | bool | Whether any job failed |
cancelled() | bool | Whether the batch was cancelled |
Queuety::chain( $jobs )
Create a ChainBuilder for sequential job execution. Each job depends on the previous one completing successfully.
$first_job_id = Queuety::chain( [
new FetchDataJob( $url ),
new ProcessDataJob(),
new NotifyCompleteJob(),
] )
    ->on_queue( 'pipeline' )
    ->catch( ChainFailedHandler::class )
    ->dispatch();
ChainBuilder methods
| Method | Returns | Description |
|---|---|---|
on_queue( string $queue ) | self | Set the queue for all jobs in the chain |
catch( string $handler_class ) | self | Handler called when any job in the chain is permanently buried |
dispatch() | int | Create all jobs with sequential dependencies, returns the first job ID |
Queuety::retry( $job_id )
Retry a specific job. Resets it to pending status.
Queuety::retry( 42 );
Queuety::retry_buried()
Retry all buried jobs. Returns the count of jobs retried.
$count = Queuety::retry_buried();
Queuety::purge( $older_than_days )
Purge completed jobs. Returns the count of jobs purged.
$count = Queuety::purge(); // uses QUEUETY_RETENTION_DAYS
$count = Queuety::purge( 30 ); // purge jobs older than 30 days
Queues
Queuety::stats( $queue )
Get job counts grouped by status.
$stats = Queuety::stats(); // all queues
$stats = Queuety::stats( 'emails' ); // specific queue
// ['pending' => 12, 'processing' => 3, 'completed' => 150, 'failed' => 2, 'buried' => 1]
Queuety::buried( $queue )
Get all buried jobs.
$jobs = Queuety::buried();
$jobs = Queuety::buried( 'emails' );
Queuety::pause( $queue )
Pause a queue so workers skip it.
Queuety::pause( 'emails' );
Queuety::resume( $queue )
Resume a paused queue.
Queuety::resume( 'emails' );
Queuety::is_paused( $queue )
Check if a queue is paused.
if ( Queuety::is_paused( 'emails' ) ) {
// Queue is paused
}
Workflows
Queuety::workflow( $name )
Start building a workflow. Returns a WorkflowBuilder.
$workflow_id = Queuety::workflow( 'generate_report' )
    ->then( FetchDataHandler::class )
    ->then( CallLLMHandler::class )
    ->then( FormatOutputHandler::class )
    ->dispatch( [ 'user_id' => 42 ] );
WorkflowBuilder methods
| Method | Description |
|---|---|
then( string $class, ?string $name ) | Add a sequential step. Optional name for conditional branching. |
parallel( array $classes ) | Add a parallel step group. |
fan_out( string $items_key, string $handler_class, ?string $result_key = null, JoinMode $join_mode = JoinMode::All, ?int $quorum = null, ?string $reducer_class = null, ?string $name = null ) | Add a dynamic fan-out step that expands one branch job per item in workflow state. |
repeat_while( string $target_step, ?string $state_key = null, mixed $expected = true, ?string $name = null, ?string $condition_class = null, ?int $max_iterations = null ) | Jump back to an earlier named step while a public state key or LoopCondition class matches. |
repeat_until( string $target_step, ?string $state_key = null, mixed $expected = true, ?string $name = null, ?string $condition_class = null, ?int $max_iterations = null ) | Jump back to an earlier named step until a public state key or LoopCondition class matches. |
sub_workflow( string $name, WorkflowBuilder $builder ) | Add a sub-workflow step. |
sleep( int $seconds, int $minutes, int $hours, int $days ) | Add a durable timer step. All durations are summed. |
wait_for_signal( string $name, ?string $result_key = null, ?string $step_name = null, array $match_payload = [], ?string $correlation_key = null ) | Add a signal wait step. Optional match_payload and correlation_key let you ignore unrelated events. |
wait_for_signals( array $signal_names, WaitMode $mode = WaitMode::All, ?string $result_key = null, ?string $name = null, array $match_payload = [], ?string $correlation_key = null ) | Wait for all or any of several signals, with optional payload matching and correlation. |
await_approval( string $signal_name = 'approval', ?string $result_key = 'approval', ?string $name = null, array $match_payload = [], ?string $correlation_key = null ) | Wait for a human approval signal. |
await_decision( string $approve_signal = 'approved', string $reject_signal = 'rejected', ?string $result_key = 'decision', ?string $name = null, array $match_payload = [], ?string $correlation_key = null ) | Wait for either approval or rejection and store a structured decision payload. |
await_input( string $signal_name = 'input', ?string $result_key = 'input', ?string $name = null, array $match_payload = [], ?string $correlation_key = null ) | Wait for a human input signal. |
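await_workflow( int\|string $workflow, ?string $result_key = null, ?string $name = null ) | Wait for another workflow to finish before continuing. |
await_workflows( array\|string $workflows, WaitMode $mode = WaitMode::All, ?string $result_key = null, ?string $name = null, ?int $quorum = null ) | Wait for several workflows to finish, using the given wait mode and optional quorum. |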
await_workflow_group( string $group_key, WaitMode $mode = WaitMode::All, ?int $quorum = null, ?string $result_key = null, ?string $name = null ) | Wait on a previously spawned workflow group by internal group key. |
spawn_workflows( string $items_key, WorkflowBuilder $workflow_builder, string $result_key = 'spawned_workflow_ids', string $payload_key = 'item', bool $inherit_state = true, ?string $name = null, ?string $group_key = null ) | Dispatch one top-level workflow per runtime item, optionally tagging the spawned set with a durable group key. |
spawn_agents( string $items_key, WorkflowBuilder $workflow_builder, string $result_key = 'agent_workflow_ids', string $payload_key = 'agent_task', bool $inherit_state = true, ?string $name = null, ?string $group_key = null ) | Agent-oriented alias for spawn_workflows(). |
await_agents( array\|string $workflows = 'agent_workflow_ids', WaitMode $mode = WaitMode::All, ?string $result_key = 'agent_results', ?string $name = null, ?int $quorum = null ) | Agent-oriented alias for await_workflows(). |
await_agent_group( string $group_key = 'agents', WaitMode $mode = WaitMode::All, ?int $quorum = null, ?string $result_key = 'agent_results', ?string $name = null ) | Agent-oriented alias for await_workflow_group(). |
version( string $version ) | Tag the workflow definition with an application-level version for inspection and exports. |
idempotency_key( string $key ) | Make workflow dispatch durable for a caller-supplied key. Dispatching again with the same key returns the original workflow ID. |
max_transitions( int $max ) | Fail the workflow if more than $max step transitions complete. |
max_fan_out_items( int $max ) | Fail the workflow if a fan_out() step tries to expand more than $max items. |
max_state_bytes( int $bytes ) | Fail the workflow if the public state grows beyond $bytes. |
max_cost_units( int $max ) | Fail the workflow if completed steps consume more than $max cost units. |
max_spawned_workflows( int $max ) | Fail the workflow before dispatching more than $max child workflows. |
compensate_with( string $class ) | Attach a compensation handler to the most recently added step. |
compensate_on_failure() | Run completed-step compensations automatically when the workflow fails. |
on_queue( string $queue ) | Set the queue for all steps. |
with_priority( Priority $priority ) | Set the priority for all steps. |
max_attempts( int $max ) | Set max retry attempts per step. |
on_cancel( string $class ) | Register a cleanup handler class for cancellation. See Cancellation. |
prune_state_after( int $steps = 2 ) | Enable automatic state pruning. Removes step outputs older than the threshold. See State Pruning. |
must_complete_within( int $seconds, int $minutes, int $hours, int $days ) | Set a deadline for the workflow. All durations are summed. See Deadlines. |
on_deadline( string $class ) | Register a handler called when the workflow deadline is exceeded. See Deadlines. |
dispatch( array $payload ) | Dispatch the workflow. Returns the workflow ID. |
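The table notes that sleep() and must_complete_within() sum all of their duration arguments. The arithmetic can be sketched as follows. The total_seconds() helper is hypothetical, and the zero defaults are an assumption, since the table does not show default values.

```php
<?php
declare(strict_types=1);

/**
 * Illustrative helper (not part of Queuety): collapse mixed duration
 * arguments into a single number of seconds by summing them.
 * Assumption: every argument defaults to 0.
 */
function total_seconds( int $seconds = 0, int $minutes = 0, int $hours = 0, int $days = 0 ): int
{
    return $seconds
        + ( $minutes * 60 )
        + ( $hours * 3600 )
        + ( $days * 86400 );
}

echo total_seconds( minutes: 30, hours: 1 ), "\n";              // 5400
echo total_seconds( seconds: 30, minutes: 5, hours: 1 ), "\n";  // 3930
```

Read this way, a call that passes both minutes and hours describes one combined duration, not two alternatives.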
Queuety::workflow_status( $workflow_id )
Get workflow status. Returns a WorkflowState object or null.
$state = Queuety::workflow_status( $workflow_id );
echo $state->name; // 'generate_report'
echo $state->status->value; // 'running'
echo $state->current_step; // 1
echo $state->total_steps; // 3
print_r( $state->state ); // accumulated state array
echo $state->wait_type; // 'signal' or 'workflow' while blocked
echo $state->wait_mode; // 'all', 'any', or 'quorum'
echo $state->definition_version; // 'research.v2'
echo $state->definition_hash; // '4c7a...'
echo $state->idempotency_key; // 'brief:42:research'
echo $state->artifact_count; // 2
print_r( $state->budget ); // configured workflow budget + current usage
print_r( $state->wait_details ); // matched/remaining blockers, result key, correlation info
print_r( $state->artifact_keys ); // ['draft', 'citations']
WorkflowState also exposes:
- waiting_for: the signal names or workflow IDs currently blocking the run
- current_step_name: the current step's configured name, if any
- wait_mode: the current wait mode (all, any, or quorum) while blocked
- wait_details: extra inspection data such as matched vs remaining blockers
- definition_version: the value set by version(), if any
- definition_hash: a deterministic hash of the workflow definition
- idempotency_key: the durable dispatch key, if any
- budget: configured limits plus current usage, if guardrails are enabled
- artifact_count: number of stored artifacts for the workflow
- artifact_keys: stored artifact keys for quick inspection
Queuety::retry_workflow( $workflow_id )
Retry a failed workflow from its failed step.
Queuety::retry_workflow( $workflow_id );
Throws RuntimeException if the workflow has already been compensated after a failure.
Queuety::pause_workflow( $workflow_id )
Pause a running workflow. Already queued work for the current step may still finish, but later steps are not enqueued until you resume the workflow.
Queuety::pause_workflow( $workflow_id );
Queuety::resume_workflow( $workflow_id )
Resume a paused workflow.
Queuety::resume_workflow( $workflow_id );
Queuety::cancel_workflow( $workflow_id )
Cancel a workflow and run any registered cleanup handlers. Sets the status to cancelled and buries all pending and processing jobs for the workflow.
Queuety::cancel_workflow( $workflow_id );
Throws RuntimeException if the workflow is already completed or cancelled. See Cancellation for details.
Queuety::rewind_workflow( $workflow_id, $to_step )
Rewind a workflow to a previous step and re-run from there. The state is restored from the event log snapshot at the given step, and the next step is enqueued.
Queuety::rewind_workflow( $workflow_id, to_step: 1 );
Throws RuntimeException if no state snapshot exists for the target step. Requires the Workflow Event Log. See Time Travel for details.
Queuety::fork_workflow( $workflow_id )
Fork a running workflow into an independent copy at its current state. Returns the new workflow ID.
$forked_id = Queuety::fork_workflow( $workflow_id );
The forked workflow gets its own ID and operates independently from the original. See Forking for details.
Queuety::export_workflow( $workflow_id )
Export a workflow's full execution history as a JSON-serializable array. Includes the workflow row, all jobs, workflow events, and log entries.
$data = Queuety::export_workflow( $workflow_id );
See Export and Replay for the JSON structure and use cases.
Queuety::replay_workflow( $data )
Replay an exported workflow in the current environment. Creates a new workflow from the export data and returns the new workflow ID.
$new_id = Queuety::replay_workflow( $data );
See Export and Replay for details.
Queuety::replace_wp_cron()
Replace the built-in WordPress cron system with Queuety's scheduler. Intercepts wp_schedule_event(), wp_unschedule_event(), and wp_get_scheduled_event() and routes recurring events through the Queuety worker.
Queuety::replace_wp_cron();
Only works when WordPress is loaded. See WP-Cron Replacement for details.
Queuety::restore_wp_cron()
Restore WordPress default cron behavior by removing the Queuety cron bridge hooks.
Queuety::restore_wp_cron();
See WP-Cron Replacement for limitations.
Queuety::signal( $workflow_id, $name, $data )
Send a signal to a workflow. If the workflow is currently waiting for this signal, it resumes immediately. Otherwise, the signal is stored and picked up when the workflow reaches the corresponding wait_for_signal step.
Queuety::signal( $workflow_id, 'approved' );
// With data merged into workflow state
Queuety::signal( $workflow_id, 'approved', [
    'approved_by' => 'user@example.com',
] );
See Workflow Signals for details.
Queuety::approve_workflow( $workflow_id, $data = [], $signal_name = 'approval' )
Semantic wrapper around signal() for approval flows.
Queuety::approve_workflow( $workflow_id, [
    'reviewer' => 'user@example.com',
] );
Queuety::reject_workflow( $workflow_id, $data = [], $signal_name = 'rejected' )
Semantic wrapper around signal() for rejection flows.
Queuety::reject_workflow( $workflow_id, [
'reason' => 'needs citations',
] );
Queuety::submit_workflow_input( $workflow_id, $data = [], $signal_name = 'input' )
Semantic wrapper around signal() for structured human input.
Queuety::submit_workflow_input( $workflow_id, [
'notes' => 'Ship after legal review',
] );
Queuety::put_artifact( $workflow_id, $artifact_key, $content, $kind = 'json', $step_index = null, $metadata = [] )
Store one durable artifact for a workflow.
Queuety::put_artifact(
$workflow_id,
'research_brief',
[ 'summary' => 'done' ],
'json',
metadata: [ 'source' => 'agent' ],
);
Queuety::put_current_artifact( $artifact_key, $content, $kind = 'json', $metadata = [] )
Store one artifact for the currently executing workflow step.
Queuety::put_current_artifact(
'draft',
[ 'status' => 'ready' ],
'json',
);
Queuety::workflow_artifact( $workflow_id, $artifact_key )
Fetch one stored artifact for a workflow.
$artifact = Queuety::workflow_artifact( $workflow_id, 'draft' );
Queuety::workflow_artifacts( $workflow_id, $include_content = false )
List artifacts for a workflow.
$artifacts = Queuety::workflow_artifacts( $workflow_id );
Queuety::delete_workflow_artifact( $workflow_id, $artifact_key )
Delete one artifact for a workflow.
Queuety::delete_workflow_artifact( $workflow_id, 'draft' );
State Machines
Queuety::machine( $name )
Start building a state machine. Returns a StateMachineBuilder.
use Queuety\Enums\StateMachineStatus;
$machine_id = Queuety::machine( 'agent_session' )
    ->state( 'awaiting_user' )
    ->on( 'user_message', 'planning' )
    ->state( 'planning' )
    ->action( PlanSessionAction::class )
    ->on( 'planned', 'completed' )
    ->state( 'completed', StateMachineStatus::Completed )
    ->dispatch( [ 'thread_id' => 42 ] );
StateMachineBuilder methods
| Method | Description |
|---|---|
initial( string $state_name ) | Set the initial state explicitly. |
state( string $state_name, ?StateMachineStatus $terminal_status = null ) | Start configuring one state. |
action( string $action_class ) | Register a queued entry action for the current state. |
on( string $event, string $target, ?string $guard_class = null, ?string $name = null ) | Register one event transition from the current state. |
on_queue( string $queue ) | Set the queue used for queued state actions. |
with_priority( Priority $priority ) | Set the priority for queued state actions. |
max_attempts( int $max_attempts ) | Set the retry limit for queued state actions. |
version( string $version ) | Tag the machine definition with an application-level version. |
idempotency_key( string $key ) | Make dispatch durable for a caller-supplied key. Dispatching again with the same key returns the original machine ID. |
dispatch( array $initial_state = [] ) | Dispatch the machine and return the machine ID. |
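The idempotency_key() behaviour described in the table (a second dispatch with the same key returns the original ID) can be pictured with a small in-memory model. This is only a sketch: the InMemoryDispatcher class is hypothetical, and Queuety itself persists keys in its database tables; this sketch keeps them in a PHP array.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-in that models idempotent dispatch in memory.
final class InMemoryDispatcher
{
    /** @var array<string, int> Maps idempotency keys to the ID first issued for them. */
    private array $ids_by_key = [];
    private int $next_id = 1;

    public function dispatch( ?string $idempotency_key = null ): int
    {
        // A known key short-circuits and returns the original ID.
        if ( null !== $idempotency_key && isset( $this->ids_by_key[ $idempotency_key ] ) ) {
            return $this->ids_by_key[ $idempotency_key ];
        }

        $id = $this->next_id++;

        // Remember the key so later dispatches resolve to the same ID.
        if ( null !== $idempotency_key ) {
            $this->ids_by_key[ $idempotency_key ] = $id;
        }

        return $id;
    }
}

$dispatcher = new InMemoryDispatcher();
$first      = $dispatcher->dispatch( 'thread:42' );
$second     = $dispatcher->dispatch( 'thread:42' );
$other      = $dispatcher->dispatch( 'thread:43' );

var_dump( $first === $second ); // bool(true)
var_dump( $first === $other );  // bool(false)
```

The practical consequence is that a caller can safely retry a dispatch after a timeout, as long as it reuses the same key.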
State-entry action classes may also expose an optional config() method with the same resource-policy keys that workflow steps support: concurrency_group, concurrency_limit, and cost_units.
Queuety::dispatch_state_machine_definition( $definition, $initial_state = [], $options = [] )
Dispatch a serialized state machine definition bundle. Most application code should prefer machine()->dispatch(). This lower-level method exists so adapters can start a machine without depending on the builder directly.
$machine_id = Queuety::dispatch_state_machine_definition(
$definition,
[ 'thread_id' => 42 ],
[ 'idempotency_key' => 'thread:42' ],
);
Queuety::machine_status( $machine_id )
Get the current machine status. Returns a StateMachineState object or null.
$status = Queuety::machine_status( $machine_id );
echo $status->name; // 'agent_session'
echo $status->status->value; // 'waiting_event'
echo $status->current_state; // 'awaiting_review'
print_r( $status->state ); // public machine state
print_r( $status->available_events ); // valid incoming events in the current state
StateMachineState also exposes:
- definition_version
- definition_hash
- idempotency_key
- error_message
- current_action
- terminal_status
Queuety::machine_event( $machine_id, $event_name, $payload = [] )
Send one external event into a waiting machine.
Queuety::machine_event(
$machine_id,
'approve',
[
'approved' => true,
'reviewer' => 'user@example.com',
]
);
The payload is available to transition guards and is merged into the public machine state when the event is accepted.
Queuety::list_machines( $limit = 50, $status = null )
List persisted machine rows for inspection.
$machines = Queuety::list_machines();
$waiting = Queuety::list_machines( status: 'waiting_event' );
Queuety::machine_timeline( $machine_id, $limit = 100, $offset = 0 )
Get the durable event timeline for one machine.
$timeline = Queuety::machine_timeline( $machine_id );
$page_two = Queuety::machine_timeline( $machine_id, 100, 100 );
See State Machines for the lifecycle model and usage patterns.
Workflow templates
Queuety::define_workflow( $name )
Create a workflow builder for use as a template.
$builder = Queuety::define_workflow( 'onboarding' )
    ->then( CreateAccountHandler::class )
    ->then( SendWelcomeEmailHandler::class );
Queuety::register_workflow_template( $builder )
Register a workflow template.
Queuety::register_workflow_template( $builder );
Queuety::run_workflow( $name, $payload, $options = [] )
Dispatch a registered workflow template by name. Returns the workflow ID.
$workflow_id = Queuety::run_workflow( 'onboarding', [ 'email' => 'user@example.com' ] );
$workflow_id = Queuety::run_workflow(
    'onboarding',
    [ 'email' => 'user@example.com' ],
    [ 'idempotency_key' => 'onboarding:user@example.com' ],
);
Throws RuntimeException if the template is not registered.
Queuety::dispatch_workflow_definition( $definition, $payload, $options = [] )
Dispatch a workflow from a runtime definition bundle.
Most application code should prefer run_workflow() or workflow()->dispatch(). This lower-level method exists so reusable adapters can dispatch a serialized workflow definition without depending on internal workflow manager methods.
$workflow_id = Queuety::dispatch_workflow_definition(
$definition,
    [ 'email' => 'user@example.com' ],
    [ 'idempotency_key' => 'onboarding:user@example.com' ],
);
Queuety::on_action( $hook, $workflow, $map = null, $when = null, $idempotency_key = null, $priority = 10, $accepted_args = null )
Register a WordPress action that dispatches a workflow.
Queuety::on_action(
'save_post',
workflow: 'content_review',
map: static fn ( int $post_id, object $post, bool $update ): array => [
'post_id' => $post_id,
'post_type' => $post->post_type,
'update' => $update,
],
when: static fn ( int $post_id, object $post ): bool => 'post' === $post->post_type,
idempotency_key: static fn ( int $post_id ): string => "save_post:{$post_id}",
);
$workflow may be either:
- a registered workflow template name
- an inline WorkflowBuilder
map, when, and idempotency_key run synchronously inside the current WordPress request. Only the workflow steps themselves run asynchronously after dispatch.
See Action Triggers for the full guide.
Queuety::workflow_templates()
Get the workflow template registry.
$registry = Queuety::workflow_templates();
$template = $registry->get( 'onboarding' ); // WorkflowTemplate or null
Queuety::load_workflows( $directory, $recursive )
Load and register all workflow files from a directory. Each .php file should return a WorkflowBuilder. Returns the number of workflows registered.
$count = Queuety::load_workflows( __DIR__ . '/workflows/' );
$count = Queuety::load_workflows( __DIR__ . '/workflows/', recursive: true );
Queuety::load_workflow_file( $file_path )
Load and register a single workflow file. Returns the WorkflowTemplate or null if the file does not return a builder.
$template = Queuety::load_workflow_file( __DIR__ . '/workflows/onboard-user.php' );
Scheduling
Queuety::schedule( $handler, $payload )
Create a recurring schedule. Returns a PendingSchedule builder.
Queuety::schedule( 'cleanup_handler' )
->every( '1 hour' )
->on_queue( 'maintenance' );
Queuety::schedule( 'nightly_report' )
    ->cron( '0 3 * * *' )
    ->on_queue( 'reports' );
PendingSchedule methods
| Method | Description |
|---|---|
every( string $interval ) | Set an interval expression (e.g. 1 hour, 30 minutes) |
cron( string $expression ) | Set a 5-field cron expression |
overlap( OverlapPolicy $policy ) | Set the overlap policy (default: Allow). See Scheduling. |
on_queue( string $queue ) | Set the target queue (default: default) |
id() | Force dispatch and return the schedule ID |
Queuety::scheduler()
Get the internal Scheduler instance.
$scheduler = Queuety::scheduler();
$schedules = $scheduler->list();
$scheduler->remove( 'handler_name' );
$count = $scheduler->tick(); // manually trigger
Handler registration
Queuety::register( $name, $class )
Register a handler class under a name.
Queuety::register( 'send_email', SendEmailHandler::class );
Queuety::discover_handlers( $directory, $namespace )
Auto-discover and register handler classes from a directory. Returns the count of handlers registered.
$count = Queuety::discover_handlers( __DIR__ . '/handlers', 'MyPlugin\\Handlers' );
Queuety::registry()
Get the handler registry.
$registry = Queuety::registry();
Observability
Queuety::logger()
Get the Logger instance for querying log entries.
$logger = Queuety::logger();
$logs = $logger->for_job( 42 );
$logs = $logger->for_workflow( 7 );
$logs = $logger->for_handler( 'send_email', 50 );
$logs = $logger->for_event( LogEvent::Failed, 50 );
$logs = $logger->since( new \DateTimeImmutable( '-1 hour' ), 100 );
$count = $logger->purge( 30 );
Queuety::metrics()
Get the Metrics instance.
$stats = Queuety::metrics()->handler_stats( 60 ); // last 60 minutes
Queuety::webhook_notifier()
Get the WebhookNotifier instance.
$notifier = Queuety::webhook_notifier();
$id = $notifier->register( 'job.buried', 'https://hooks.slack.com/...' );
$webhooks = $notifier->list();
$notifier->remove( $id );
Queuety::rate_limiter()
Get the RateLimiter instance.
$limiter = Queuety::rate_limiter();
$limiter->register( 'call_openai', 60, 60 );
Cache
Queuety::cache()
Get the active cache backend instance.
$cache = Queuety::cache();
$cache->set( 'my_key', 'my_value', 60 );
$value = $cache->get( 'my_key' );
Queuety::set_cache( $cache )
Override the cache backend. Call this before Queuety::init() to use a custom implementation.
Queuety::set_cache( new RedisCache() );
Queuety::init( $connection );
QUEUETY_CACHE_TTL
Default cache TTL in seconds (default: 5). Set this constant in wp-config.php to change the default TTL used by internal cache operations.
define( 'QUEUETY_CACHE_TTL', 10 );
See Caching for details on backends and auto-detection.
Workflow event log
Queuety::workflow_events()
Get the WorkflowEventLog instance.
$log = Queuety::workflow_events();
Queuety::workflow_timeline( $workflow_id, $limit = 100, $offset = 0 )
Get the full timeline of events for a workflow. Returns an array of event rows ordered by ID.
$events = Queuety::workflow_timeline( $workflow_id );
$next = Queuety::workflow_timeline( $workflow_id, 100, 100 );
Each event row contains: id, workflow_id, step_index, handler, event, state_snapshot, step_output, error_message, duration_ms, created_at.
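Because the timeline is paged with $limit and $offset, reading a long history means looping until a short page comes back. A minimal sketch of that loop follows. The fetch_all_rows() helper is hypothetical and takes the page fetcher as a callable, so it runs here against stand-in data instead of a database.

```php
<?php
declare(strict_types=1);

/**
 * Illustrative helper (not part of Queuety): collect every row from a
 * limit/offset style API.
 *
 * @param callable(int, int): array $fetch_page Receives ( $limit, $offset ) and returns one page.
 */
function fetch_all_rows( callable $fetch_page, int $page_size = 100 ): array
{
    $rows   = [];
    $offset = 0;

    do {
        $page    = $fetch_page( $page_size, $offset );
        $rows    = array_merge( $rows, $page );
        $offset += $page_size;
    } while ( count( $page ) === $page_size ); // a short or empty page means we reached the end

    return $rows;
}

// Stand-in data source so the sketch runs without a database.
$fake_events = range( 1, 250 );

$all = fetch_all_rows(
    static fn ( int $limit, int $offset ): array => array_slice( $fake_events, $offset, $limit )
);

echo count( $all ), "\n"; // 250
```

With Queuety loaded, the callable would presumably wrap the documented call, for example `static fn ( int $limit, int $offset ): array => Queuety::workflow_timeline( $workflow_id, $limit, $offset )`. The same loop shape applies to machine_timeline().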
Queuety::workflow_state_at( $workflow_id, $step_index )
Get the state snapshot at a specific workflow step. Returns the full workflow state array as it existed after the given step completed, or null if not found.
$state = Queuety::workflow_state_at( $workflow_id, 2 );
See Workflow Event Log for use cases and CLI commands.
Internal instances
These methods return internal instances for advanced use cases.
| Method | Returns | Description |
|---|---|---|
Queuety::queue() | Queue | Queue operations (claim, release, bury) |
Queuety::worker() | Worker | Worker process control |
Queuety::workflow_manager() | Workflow | Workflow orchestration |
Queuety::machines() | StateMachine | State machine lifecycle management |
Queuety::connection() | Connection | PDO database connection |
Queuety::batch_manager() | BatchManager | Batch lifecycle management (cancel, prune) |
Queuety::chunk_store() | ChunkStore | Chunk persistence for streaming steps |
Queuety::workflow_events() | WorkflowEventLog | Workflow event timeline and state snapshots |
Queuety::resource_manager() | ResourceManager | Worker admission profiles, weighted budgets, and system-memory inspection |
Queuety::cache() | Cache | Cache backend instance |
Testing
Queuety::fake()
Replace the queue with an in-memory fake for testing. Returns a QueueFake instance that records dispatched jobs without touching the database. create_batch() also records fake batch metadata; chains are asserted via the individual pushed jobs they dispatch.
$fake = Queuety::fake();
SendEmailJob::dispatch( 'user@example.com', 'Hello', 'Hi!' );
$fake->assert_pushed( SendEmailJob::class );
Queuety::reset()
Reset all internal state, including the queue fake. Call in tearDown().
Queuety::reset();
QueueFake methods
| Method | Description |
|---|---|
| assert_pushed( string $class, ?Closure $callback ) | Assert a job class was pushed. Optional callback filters by job data. |
| assert_pushed_times( string $class, int $count ) | Assert a job was pushed exactly N times. |
| assert_not_pushed( string $class ) | Assert a job was never pushed. |
| assert_nothing_pushed() | Assert no jobs were pushed at all. |
| assert_batched( ?Closure $callback ) | Assert a batch was dispatched. Optional callback filters by batch data. |
| pushed( string $class ) | Get all recorded dispatches for a class. Returns an array. |
| batches() | Get all recorded batches. Returns an array. |
| reset() | Clear all recorded jobs and batches. |
See Testing for full examples.
Interfaces
Handler
Interface for simple job handlers.
interface Handler {
public function handle( array $payload ): void;
public function config(): array;
}
config() may return queue, max_attempts, backoff, rate_limit, concurrency_group, concurrency_limit, and cost_units.
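As a minimal sketch of a handler that satisfies this interface: the block below declares a local copy of Handler so it runs without the library, and the class name SendEmailHandler, the in-memory $sent log, and the example address are assumptions made for illustration. Only the queue and max_attempts keys are used, because the value shapes of the other config keys are not specified here.

```php
<?php
declare(strict_types=1);

// Local copy of the documented interface so the sketch runs standalone.
// In a real project, import the library's Handler interface instead.
interface Handler {
    public function handle( array $payload ): void;
    public function config(): array;
}

// Hypothetical handler: records recipients in memory instead of sending mail.
final class SendEmailHandler implements Handler {
    /** @var string[] Recipients this handler has processed. */
    public array $sent = [];

    public function handle( array $payload ): void {
        // Fail loudly on a malformed payload rather than silently skipping it.
        if ( empty( $payload['to'] ) ) {
            throw new InvalidArgumentException( 'Missing "to" in payload.' );
        }
        $this->sent[] = (string) $payload['to'];
    }

    public function config(): array {
        // Only keys named in the documentation above are returned.
        return [
            'queue'        => 'emails',
            'max_attempts' => 5,
        ];
    }
}

$handler = new SendEmailHandler();
$handler->handle( [ 'to' => 'user@example.test' ] );
```

Throwing from handle() is used here to signal failure; how the worker reacts to the exception depends on the retry settings in effect.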
Step
Interface for workflow step handlers.
interface Step {
public function handle( array $state ): array;
public function config(): array;
}
config() may return max_attempts, backoff, concurrency_group, concurrency_limit, and cost_units.
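A step handler could look roughly like the sketch below. It assumes that the array returned by handle() is merged into the workflow state, which the final array_merge() call imitates; the interface is redeclared locally so the block is runnable on its own, and FetchUserStep and its state keys are hypothetical.

```php
<?php
declare(strict_types=1);

// Local copy of the documented interface so the sketch runs standalone.
interface Step {
    public function handle( array $state ): array;
    public function config(): array;
}

// Hypothetical step: derives a user name from a user ID held in state.
final class FetchUserStep implements Step {
    public function handle( array $state ): array {
        $user_id = (int) ( $state['user_id'] ?? 0 );
        if ( $user_id <= 0 ) {
            throw new RuntimeException( 'user_id is required in workflow state.' );
        }
        // Return only the new keys this step contributes.
        return [ 'user_name' => 'user-' . $user_id ];
    }

    public function config(): array {
        return [ 'max_attempts' => 3 ];
    }
}

$state  = [ 'user_id' => 42 ];
$output = ( new FetchUserStep() )->handle( $state );

// Stand-in for the runtime: merge the step output into state (assumed behavior).
$state = array_merge( $state, $output );
```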
Contracts\FanOutHandler
Interface for dynamic fan-out branch handlers.
namespace Queuety\Contracts;
interface FanOutHandler {
public function handle_item( array $state, mixed $item, int $index ): array;
public function config(): array;
}
config() may return max_attempts, backoff, rate_limit, concurrency_group, concurrency_limit, and cost_units.
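The following sketch shows one possible fan-out branch handler. The interface is redeclared locally (without the Queuety\Contracts namespace) so the block runs standalone, ResizeImageBranch and the target_width state key are hypothetical, and the foreach loop is only a stand-in for the runtime, which would hand each item to handle_item() as its own branch.

```php
<?php
declare(strict_types=1);

// Local copy of the documented interface so the sketch runs standalone.
interface FanOutHandler {
    public function handle_item( array $state, mixed $item, int $index ): array;
    public function config(): array;
}

// Hypothetical branch handler: describes a resize operation for one image.
final class ResizeImageBranch implements FanOutHandler {
    public function handle_item( array $state, mixed $item, int $index ): array {
        // Shared settings come from workflow state; the item is branch-specific.
        $width = (int) ( $state['target_width'] ?? 800 );

        return [
            'index'  => $index,
            'source' => (string) $item,
            'width'  => $width,
        ];
    }

    public function config(): array {
        return [ 'max_attempts' => 2 ];
    }
}

$branch  = new ResizeImageBranch();
$state   = [ 'target_width' => 320 ];
$results = [];

// Stand-in for the runtime invoking one branch per item.
foreach ( [ 'a.jpg', 'b.jpg' ] as $index => $item ) {
    $results[] = $branch->handle_item( $state, $item, $index );
}
```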
Contracts\JoinReducer
Optional reducer for fan-out results.
namespace Queuety\Contracts;
interface JoinReducer {
public function reduce( array $state, array $fan_out ): array;
}
Contracts\Compensation
Interface for step compensation handlers.
namespace Queuety\Contracts;
interface Compensation {
public function handle( array $state ): void;
}
Contracts\LoopCondition
Interface for loop predicates used by repeat_until() and repeat_while() when a state-key comparison is too limited.
namespace Queuety\Contracts;
interface LoopCondition {
public function matches( array $state ): bool;
}
Contracts\StateAction
Interface for queued state-entry actions.
namespace Queuety\Contracts;
interface StateAction {
public function handle( array $state, ?string $event = null, array $event_payload = [] ): array|string;
}
Return public state updates. Include _event and optionally _event_payload when the action should transition immediately.
State actions may also define an optional config() method. Queuety reads concurrency_group, concurrency_limit, and cost_units from it when the entry action is queued.
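A state-entry action might be sketched as follows. The interface is copied locally so the block runs standalone; ReviewOrderAction, the order_total key, the 100.0 threshold, the approve event name, and the config values are all hypothetical. The value shapes used in config() (a string group name and an integer limit) are assumptions, and the string return variant of handle() is not shown.

```php
<?php
declare(strict_types=1);

// Local copy of the documented interface so the sketch runs standalone.
interface StateAction {
    public function handle( array $state, ?string $event = null, array $event_payload = [] ): array|string;
}

// Hypothetical entry action: auto-approves small orders.
final class ReviewOrderAction implements StateAction {
    public function handle( array $state, ?string $event = null, array $event_payload = [] ): array|string {
        $total   = (float) ( $state['order_total'] ?? 0 );
        $updates = [ 'reviewed' => true ];

        if ( $total < 100.0 ) {
            // Ask for an immediate transition by including _event.
            $updates['_event']         = 'approve';
            $updates['_event_payload'] = [ 'reason' => 'below_threshold' ];
        }

        return $updates;
    }

    // Optional config(); the value shapes here are assumed, not documented.
    public function config(): array {
        return [
            'concurrency_group' => 'order-review',
            'concurrency_limit' => 2,
        ];
    }
}

$action = new ReviewOrderAction();
$small  = $action->handle( [ 'order_total' => 40 ] );
$large  = $action->handle( [ 'order_total' => 500 ] );
```

In this sketch the small order carries _event and would transition right away, while the large order only updates state and waits for an external event.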
Contracts\StateGuard
Interface for state-machine transition guards.
namespace Queuety\Contracts;
interface StateGuard {
public function allows( array $state, array $event_payload, string $event ): bool;
}
Contracts\Job
Interface for self-contained dispatchable job classes. Job classes encapsulate their payload as public properties and execution logic in handle().
namespace Queuety\Contracts;
interface Job {
public function handle(): void;
}
Job classes can optionally define a middleware() method returning an array of Contracts\Middleware instances.
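To illustrate the "payload as public properties" idea, here is a small sketch with a local copy of the Job interface. SendWelcomeEmailJob and its static $delivered log are hypothetical, and get_object_vars() is used only as a stand-in to show which values are visible as public properties; it is not a claim about how Queuety's serializer is implemented.

```php
<?php
declare(strict_types=1);

// Local copy of the documented interface so the sketch runs standalone.
interface Job {
    public function handle(): void;
}

// Hypothetical job: payload lives in public (promoted) constructor properties.
final class SendWelcomeEmailJob implements Job {
    /** @var string[] In-memory delivery log, static so it is not part of the payload. */
    public static array $delivered = [];

    public function __construct(
        public string $to,
        public string $subject = 'Welcome',
    ) {}

    public function handle(): void {
        // Execution logic reads the payload from the instance itself.
        self::$delivered[] = "{$this->to}: {$this->subject}";
    }
}

$job = new SendWelcomeEmailJob( to: 'user@example.test' );

// Called from outside the class, this returns only public, non-static properties.
$payload = get_object_vars( $job );

$job->handle();
```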
Contracts\Cache
Interface for cache backends. See Caching for built-in implementations and custom backends.
namespace Queuety\Contracts;
interface Cache {
public function get( string $key ): mixed;
public function set( string $key, mixed $value, int $ttl = 0 ): void;
public function delete( string $key ): void;
public function has( string $key ): bool;
public function add( string $key, mixed $value, int $ttl = 0 ): bool;
public function flush(): void;
}
| Method | Returns | Description |
|---|---|---|
| get( $key ) | mixed | Retrieve a value. Returns null if not found or expired. |
| set( $key, $value, $ttl ) | void | Store a value. TTL of 0 means no expiry. |
| delete( $key ) | void | Remove a value. |
| has( $key ) | bool | Check if a key exists and is not expired. |
| add( $key, $value, $ttl ) | bool | Atomic set-if-not-exists. Returns true if the value was set. |
| flush() | void | Remove all items from the cache. |
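A custom backend only has to honor the semantics in the table above. The sketch below is a hypothetical in-memory ArrayCache, with the interface redeclared locally so the block runs standalone. It is not one of the built-in implementations, and its add() is only trivially atomic because everything happens inside a single PHP process.

```php
<?php
declare(strict_types=1);

// Local copy of the documented interface so the sketch runs standalone.
interface Cache {
    public function get( string $key ): mixed;
    public function set( string $key, mixed $value, int $ttl = 0 ): void;
    public function delete( string $key ): void;
    public function has( string $key ): bool;
    public function add( string $key, mixed $value, int $ttl = 0 ): bool;
    public function flush(): void;
}

// Hypothetical per-process backend, useful for tests or single-worker setups.
final class ArrayCache implements Cache {
    /** @var array<string, array{value: mixed, expires_at: int}> */
    private array $items = [];

    public function get( string $key ): mixed {
        return $this->has( $key ) ? $this->items[ $key ]['value'] : null;
    }

    public function set( string $key, mixed $value, int $ttl = 0 ): void {
        $this->items[ $key ] = [
            'value'      => $value,
            // 0 marks "no expiry", matching the documented TTL semantics.
            'expires_at' => $ttl > 0 ? time() + $ttl : 0,
        ];
    }

    public function delete( string $key ): void {
        unset( $this->items[ $key ] );
    }

    public function has( string $key ): bool {
        if ( ! isset( $this->items[ $key ] ) ) {
            return false;
        }
        $expires_at = $this->items[ $key ]['expires_at'];
        if ( 0 !== $expires_at && $expires_at <= time() ) {
            unset( $this->items[ $key ] ); // Evict lazily on read.
            return false;
        }
        return true;
    }

    public function add( string $key, mixed $value, int $ttl = 0 ): bool {
        if ( $this->has( $key ) ) {
            return false;
        }
        $this->set( $key, $value, $ttl );
        return true;
    }

    public function flush(): void {
        $this->items = [];
    }
}

$cache  = new ArrayCache();
$first  = $cache->add( 'lock:import', 'worker-1', 30 );
$second = $cache->add( 'lock:import', 'worker-2', 30 );
```

The two add() calls show the set-if-not-exists contract: the first caller stores its value and the second is refused while the key is still live. A backend shared between workers would need the underlying store to provide that guarantee.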
WorkflowEventLog
Records workflow step transitions with full state snapshots. Access via Queuety::workflow_events().
| Method | Returns | Description |
|---|---|---|
| record_step_started( int $workflow_id, int $step_index, string $handler ) | void | Record a step_started event |
| record_step_completed( int $workflow_id, int $step_index, string $handler, array $state_snapshot, array $step_output, int $duration_ms ) | void | Record a step_completed event with full state snapshot |
| record_step_failed( int $workflow_id, int $step_index, string $handler, string $error, int $duration_ms ) | void | Record a step_failed event |
| get_timeline( int $workflow_id, ?int $limit = 100, int $offset = 0 ) | array | Get workflow events in order, with optional paging |
| get_state_at_step( int $workflow_id, int $step_index ) | array\|null | Get the state snapshot after a specific step completed |
| prune( int $older_than_days ) | int | Delete events older than the given number of days. Returns rows deleted. |
Contracts\StreamingStep
Interface for workflow step handlers that produce a stream of chunks. Each yielded value is persisted to the database immediately. See Streaming Steps.
namespace Queuety\Contracts;
interface StreamingStep {
public function stream( array $state, array $existing_chunks = [] ): \Generator;
public function on_complete( array $chunks, array $state ): array;
public function config(): array;
}
| Method | Description |
|---|---|
| stream( $state, $existing_chunks ) | Generator that yields string chunks. Each yield triggers an immediate DB write. On retry, $existing_chunks contains previously persisted chunks. |
| on_complete( $chunks, $state ) | Called after the stream finishes. Returns data to merge into workflow state. |
| config() | Optional configuration. Supports max_attempts, backoff, concurrency_group, concurrency_limit, and cost_units. |
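The resume behavior of stream() can be sketched like this. The interface is redeclared locally so the block runs standalone, SentenceStreamStep and the sentences state key are hypothetical, and the loop at the bottom merely imitates a worker retrying after one chunk was already persisted. It also assumes that on_complete() receives the previously persisted chunks together with the new ones.

```php
<?php
declare(strict_types=1);

// Local copy of the documented interface so the sketch runs standalone.
interface StreamingStep {
    public function stream( array $state, array $existing_chunks = [] ): \Generator;
    public function on_complete( array $chunks, array $state ): array;
    public function config(): array;
}

// Hypothetical streaming step: emits one sentence per chunk.
final class SentenceStreamStep implements StreamingStep {
    public function stream( array $state, array $existing_chunks = [] ): \Generator {
        $sentences = $state['sentences'] ?? [];
        $total     = count( $sentences );

        // On retry, skip what was already persisted and continue from there.
        for ( $i = count( $existing_chunks ); $i < $total; $i++ ) {
            yield (string) $sentences[ $i ];
        }
    }

    public function on_complete( array $chunks, array $state ): array {
        // Data returned here is merged into workflow state.
        return [ 'summary' => implode( ' ', $chunks ) ];
    }

    public function config(): array {
        return [ 'max_attempts' => 3 ];
    }
}

$step     = new SentenceStreamStep();
$state    = [ 'sentences' => [ 'One.', 'Two.', 'Three.' ] ];
$existing = [ 'One.' ]; // Pretend the first attempt persisted one chunk.

// Stand-in for the worker: collect new chunks after the persisted ones.
$chunks = $existing;
foreach ( $step->stream( $state, $existing ) as $chunk ) {
    $chunks[] = $chunk;
}

$result = $step->on_complete( $chunks, $state );
```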
ChunkStore
Manages chunk persistence for streaming steps. Access via Queuety::chunk_store().
| Method | Returns | Description |
|---|---|---|
| get_chunks( int $job_id ) | string[] | Fetch all chunks for a job, ordered by chunk index |
| append_chunk( int $job_id, int $chunk_index, string $content, ?int $workflow_id, ?int $step_index ) | void | Append a single chunk |
| clear_chunks( int $job_id ) | void | Delete all chunks for a job |
| chunk_count( int $job_id ) | int | Count chunks for a job |
| get_accumulated( int $job_id ) | string | Concatenate all chunks into one string |
Heartbeat
Static helper for sending activity heartbeats from inside step handlers and streaming steps. See Heartbeats.
| Method | Description |
|---|---|
| Heartbeat::init( int $job_id, Connection $conn ) | Set the job context. Called automatically by the worker. |
| Heartbeat::beat( array $progress = [] ) | Send a heartbeat. Updates reserved_at and optionally stores progress data in heartbeat_data. |
| Heartbeat::clear() | Clear the job context. Called automatically by the worker. |
| Heartbeat::current_job_id() | Get the current job ID (for testing). |
Contracts\Middleware
Interface for job middleware. Middleware wraps job execution in an onion-style pipeline.
namespace Queuety\Contracts;
interface Middleware {
public function handle( object $job, \Closure $next ): void;
}
Traits
Dispatchable
Provides static dispatch methods on job classes implementing Contracts\Job. Constructor arguments are passed through to create the job instance.
use Queuety\Contracts\Job;
use Queuety\Dispatchable;
class SendEmailJob implements Job {
use Dispatchable;
// ...
}
// Dispatches a new SendEmailJob instance
SendEmailJob::dispatch( '[email protected]', 'Subject', 'Body' );
| Method | Returns | Description |
|---|---|---|
| dispatch( ...$args ) | PendingJob | Create an instance and dispatch it |
| dispatch_if( bool $condition, ...$args ) | PendingJob\|null | Dispatch only if $condition is true |
| dispatch_unless( bool $condition, ...$args ) | PendingJob\|null | Dispatch only if $condition is false |
| with_chain( array $jobs ) | ChainBuilder | Start a chain from this job class |
// Conditional dispatch
SendEmailJob::dispatch_if( $user->wants_email, $user->email, 'Hello', 'Hi!' );
SendEmailJob::dispatch_unless( $maintenance_mode, $user->email, 'Hello', 'Hi!' );
// Chain from a job class
FetchDataJob::with_chain( [
new ProcessDataJob(),
new NotifyCompleteJob(),
] )->dispatch();
Job properties
Job classes can declare public properties that the worker reads via reflection to override retry and timeout behavior.
| Property | Type | Description |
|---|---|---|
| $tries | int | Maximum number of attempts before the job is buried |
| $timeout | int | Maximum execution time in seconds (requires pcntl) |
| $backoff | array | Escalating delay values in seconds for each retry attempt |
class ImportProductsJob implements Job {
use Dispatchable;
public int $tries = 5;
public int $timeout = 120;
public array $backoff = [ 10, 30, 60, 120 ];
// ...
}
failed() hook
When a job is buried after exhausting all retries, the worker calls a failed() method on the job instance if defined.
public function failed( \Throwable $exception ): void {
error_log( "Job failed: {$exception->getMessage()}" );
}
Built-in middleware
All middleware classes implement Contracts\Middleware and are located in the Queuety\Middleware namespace.
| Class | Description |
|---|---|
| RateLimited( int $max, int $window ) | Enforce a rate limit of $max executions per $window seconds |
| Timeout( int $seconds ) | Kill the job after $seconds seconds (requires pcntl) |
| UniqueJob( ?string $key ) | Prevent concurrent execution via a database lock |
| WithoutOverlapping( string $key, int $release_after ) | Prevent overlapping execution with automatic lock expiry |
| ThrottlesExceptions( int $max_attempts, int $decay_minutes ) | Throttle exceptions to prevent job storms when an external service is down |
See Middleware for usage details and examples.
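Custom middleware follows the same Contracts\Middleware shape as the built-in classes. The sketch below uses a local copy of the interface and a hypothetical TraceMiddleware to show the onion ordering. The run_through() helper is an illustrative stand-in for the worker, not Queuety's actual pipeline, and it assumes that $next is invoked with the job as its only argument.

```php
<?php
declare(strict_types=1);

// Local copy of the documented interface so the sketch runs standalone.
interface Middleware {
    public function handle( object $job, \Closure $next ): void;
}

// Hypothetical middleware: records when it runs relative to the job.
final class TraceMiddleware implements Middleware {
    /** @var string[] Shared trace of pipeline events. */
    public static array $trace = [];

    public function __construct( private string $label ) {}

    public function handle( object $job, \Closure $next ): void {
        self::$trace[] = "{$this->label}:before";
        $next( $job ); // Pass control inward (assumed calling convention).
        self::$trace[] = "{$this->label}:after";
    }
}

// Illustrative runner: wraps $core so the first middleware is outermost.
function run_through( object $job, array $middleware, \Closure $core ): void {
    $pipeline = array_reduce(
        array_reverse( $middleware ),
        static fn ( \Closure $next, Middleware $m ): \Closure =>
            static fn ( object $job ) => $m->handle( $job, $next ),
        $core
    );
    $pipeline( $job );
}

run_through(
    new stdClass(),
    [ new TraceMiddleware( 'outer' ), new TraceMiddleware( 'inner' ) ],
    static function ( object $job ): void {
        TraceMiddleware::$trace[] = 'handle'; // Stand-in for the job's handle().
    }
);
```

Each layer runs its "before" work, hands off to the next layer, and resumes after the job returns, so the trace reads outer, inner, job, inner, outer. A middleware that skips calling $next short-circuits everything inside it.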
Enums
Priority
enum Priority: int {
case Low = 0;
case Normal = 1;
case High = 2;
case Urgent = 3;
}
JobStatus
enum JobStatus: string {
case Pending = 'pending';
case Processing = 'processing';
case Completed = 'completed';
case Failed = 'failed';
case Buried = 'buried';
}
WorkflowStatus
enum WorkflowStatus: string {
case Running = 'running';
case Completed = 'completed';
case Failed = 'failed';
case Paused = 'paused';
case WaitingSignal = 'waiting_signal';
case WaitingWorkflow = 'waiting_workflow';
case Cancelled = 'cancelled';
}
StateMachineStatus
enum StateMachineStatus: string {
case Running = 'running';
case WaitingEvent = 'waiting_event';
case Completed = 'completed';
case Failed = 'failed';
case Paused = 'paused';
case Cancelled = 'cancelled';
}
BackoffStrategy
enum BackoffStrategy: string {
case Fixed = 'fixed';
case Linear = 'linear';
case Exponential = 'exponential';
}
JoinMode
enum JoinMode: string {
case All = 'all';
case FirstSuccess = 'first_success';
case Quorum = 'quorum';
}
WaitMode
enum WaitMode: string {
case All = 'all';
case Any = 'any';
}
LogEvent
enum LogEvent: string {
case Started = 'started';
case Completed = 'completed';
case Failed = 'failed';
case Buried = 'buried';
case Retried = 'retried';
case WorkflowStarted = 'workflow_started';
case WorkflowCompleted = 'workflow_completed';
case WorkflowFailed = 'workflow_failed';
case WorkflowPaused = 'workflow_paused';
case WorkflowResumed = 'workflow_resumed';
case WorkflowCancelled = 'workflow_cancelled';
case WorkflowRewound = 'workflow_rewound';
case WorkflowForked = 'workflow_forked';
case WorkflowDeadlineExceeded = 'workflow_deadline_exceeded';
case Debug = 'debug';
}
OverlapPolicy
enum OverlapPolicy: string {
case Allow = 'allow';
case Skip = 'skip';
case Buffer = 'buffer';
}
See Scheduling for details on how each policy works.
Attributes
#[QueuetyHandler]
PHP 8 attribute for auto-registration. See PHP Attributes.
use Queuety\Attributes\QueuetyHandler;
#[QueuetyHandler( name: 'send_email', queue: 'emails', max_attempts: 5 )]
class SendEmailHandler implements Handler {
// ...
}
CLI bridge
Queuety::cli_command()
Returns the published WP-CLI root command name.
The default root command is queuety. Define QUEUETY_CLI_COMMAND before Queuety boots if the command should be published under a different name.
Queuety::cli_command_map()
Returns the serializable Queuety CLI catalog for harnesses that need to understand the shipped command surface without scraping WP-CLI classes.
Each entry includes:
- a stable operation ID
- the relative CLI path
- the WP-CLI handler class and method
- the possible execution targets
- the adapter that normalizes raw CLI args into an execution plan
Queuety::resolve_cli_command( $path, $args, $assoc_args )
Turns a parsed CLI invocation into the execution plan the harness should run.
$plan = Queuety::resolve_cli_command(
[ 'workflow', 'export' ],
[ '42' ],
[ 'output' => '/tmp/workflow.json' ]
);
echo $plan['callable']; // Queuety\Queuety::export_workflow_to_file
print_r( $plan['arguments'] );
Execution plans are intentionally transport-aware:
- commands resolve to public PHP callables with normalized arguments
- the catalog lists every possible execution target up front, while callable and transport identify the branch chosen for the parsed input
This is the standard Queuety uses for agent harnesses: stable operation ID, public API target, and adapter-owned normalization for CLI-only concerns like queue defaults, JSON payload decoding, worker-pool branching, and workflow export file handling.