If This Then AT - A high-performance AT Protocol automation service written in Rust. Process events from Jetstream (Bluesky's firehose), webhooks, and periodic schedules using a blueprint-based system with DataLogic expressions for rule evaluation.
Version: 1.0.0-rc.1 Repository: github.com/graze-social/iftta
Version 1.0.0-rc.1 - This release candidate includes comprehensive error handling, production features, and extensive testing. While suitable for production use, thorough testing is recommended for your specific use case.
ifthisthenat is designed for high throughput and low latency:
- Binary serialization reduces cache storage by ~40% compared to JSON
- Multi-layer caching with in-memory, Redis, and PostgreSQL support
- Work shedding prevents unbounded queue growth
- Configurable TTLs for fine-tuning cache freshness vs. performance
- Connection pooling for Redis and PostgreSQL minimizes overhead
- Metrics support via StatsD for production monitoring
Benchmarks:
- Condition node evaluation: ~30-50μs per evaluation
- Sentiment analysis: ~300-500μs per analysis
- Blueprint caching: 40% storage reduction with binary serialization
- Jetstream processing: Supports partitioning for horizontal scaling
- Blueprint System: Define automation workflows with ordered node evaluation
- Entry Nodes:
jetstream_entry: Process AT Protocol firehose eventswebhook_entry: Handle external webhook triggersperiodic_entry: Cron-based scheduled taskszap_entry: Zapier integration support
- Processing Nodes:
condition: Boolean flow control with DataLogic expressionstransform: Data manipulation and transformationfacet_text: Process text with AT Protocol facets/mentionssentiment_analysis: Analyze text sentimentparse_aturi: Parse AT-URI stringsget_record: Retrieve AT Protocol records
- Action Nodes:
publish_record: Create AT Protocol recordspublish_webhook_direct: Synchronous webhook deliverypublish_webhook_queue: Async webhooks with retriesdebug_action: Development logging
- Production Features:
- OAuth 2.0/2.1/OIDC authentication with AT Protocol
- Multi-instance support with Redis coordination
- Distributed leadership election
- Comprehensive error handling with structured codes
- Binary serialization for 40% storage reduction
- Sentry error tracking integration
- Health checks and graceful shutdown
- Queue & Throttling:
- Multiple adapter support (MPSC, Redis)
- Blueprint throttling (per-authority, per-AT-URI)
- Work shedding for overload protection
- Denylist system for blocking bad actors
- Rust 1.70 or later
- PostgreSQL 14+ (required)
- Redis 6.2+ (optional, for multi-instance deployments)
- Docker & Docker Compose (for development)
# Clone the repository
git clone https://github.com/graze-social/iftta.git
cd ifthisthenat
# Start development dependencies
docker-compose up -d
# Run database migrations
sqlx migrate run
# Build the project
cargo build --release
# Run tests
cargo test
# Run with debug logging
RUST_LOG=debug cargo runifthisthenat requires minimal configuration to run. Configuration is validated at startup, and the service will exit with specific error codes if validation fails.
EXTERNAL_BASE: External hostname for service endpoints (e.g.,https://iftta.example.com)HTTP_COOKIE_KEY: Cookie encryption key (generate withopenssl rand -base64 64)ISSUER_DID: Service DID (must be valid did:plc or did:web)AIP_HOSTNAME: AT Protocol OAuth hostnameAIP_CLIENT_ID: OAuth client IDAIP_CLIENT_SECRET: OAuth client secret
# Generate cookie key
export HTTP_COOKIE_KEY=$(openssl rand -base64 64)
# Run with minimal config
EXTERNAL_BASE=http://localhost:8080 \
HTTP_COOKIE_KEY=$HTTP_COOKIE_KEY \
ISSUER_DID=did:plc:test123 \
AIP_HOSTNAME=test.com \
AIP_CLIENT_ID=test \
AIP_CLIENT_SECRET=test \
DATABASE_URL=postgres://user:pass@localhost:5432/ifthisthenat \
cargo runThis will start ifthisthenat with:
- HTTP server on port 8080 (default)
- In-memory caching only
- MPSC queue adapter for async processing
- Jetstream consumer enabled (default)
- Scheduler enabled for periodic tasks (default)
- Noop throttling and denylist (default)
Jetstream → Consumer → Blueprint Cache → Evaluator → Queue → Actions
↓ ↓ ↓ ↓ ↓ ↓
Webhooks Partitions Memory/Redis DataLogic Redis Publish/
Schedules /Memory Webhook
-
Blueprint System (
src/storage/blueprint.rs,src/processor.rs)- Blueprints define automation workflows
- Each blueprint has an AT-URI identifier
- Ordered node evaluation with early termination
-
Node Types (
src/engine/)- Entry nodes:
jetstream_entry,webhook_entry,periodic_entry,zap_entry - Processing nodes:
condition,transform,facet_text,sentiment_analysis,parse_aturi,get_record - Action nodes:
publish_record,publish_webhook_direct,publish_webhook_queue,debug_action
- Entry nodes:
-
Evaluation Engine (
src/engine/evaluator.rs)- DataLogic expression evaluation
- Sequential node processing
- Async evaluation via queue adapters
-
Consumer System (
src/consumer.rs)- Jetstream event processing with partitioning
- Multi-worker thread support
- Redis or file-based cursor persistence
-
Task System (
src/tasks/)- Blueprint evaluation with queue adapters
- Webhook delivery with retry logic
- Periodic task scheduling
- Task lifecycle management
-
Storage Layer (
src/storage/)- PostgreSQL for persistent data
- Redis for caching and queues (optional)
- Binary serialization for efficiency
- Blueprint and node caching with multiple backends
- Session management for OAuth
- Evaluation result storage (filesystem or noop)
Configuration follows the 12-factor app methodology using environment variables exclusively.
EXTERNAL_BASE: External base URLHTTP_COOKIE_KEY: Cookie encryption key (generate withopenssl rand -base64 64)ISSUER_DID: Service DID (valid did:plc or did:web)AIP_HOSTNAME: AT Protocol OAuth hostnameAIP_CLIENT_ID: OAuth client IDAIP_CLIENT_SECRET: OAuth client secret
HTTP_PORT: Server port (default: 8080)HTTP_STATIC_PATH: Static file path (default:static)HTTP_CLIENT_TIMEOUT: Client timeout in seconds (default: 8)USER_AGENT: Custom user agent (default:ifthisthenat/VERSION)DATABASE_URL: PostgreSQL URL (default:postgres://username:password@localhost:5432/ifthisthenat)
AIP_OAUTH_SCOPE: OAuth scope (default:openid email profile atproto account:email)ADMIN_DIDS: Semicolon-separated admin DIDsALLOWED_IDENTITIES: Comma-separated allowed DIDs
JETSTREAM_ENABLED: Enable Jetstream consumer (default: true)JETSTREAM_COLLECTIONS: Comma-separated collections to monitorJETSTREAM_WORKER_THREADS: Number of worker threads (default: 1)JETSTREAM_PARTITION: Partition config (format:INDEX:TOTAL)JETSTREAM_PARTITION_STRATEGY: Partitioning strategy (default:metrohash)JETSTREAM_CURSOR_PATH: File path for cursor persistence
WEBHOOK_QUEUE_ENABLED: Enable webhook queue (default: false)WEBHOOK_MAX_CONCURRENT: Max concurrent webhooks (default: 10)WEBHOOK_DEFAULT_TIMEOUT_MS: Default timeout in ms (default: 30000)WEBHOOK_MAX_RETRIES: Max retry attempts (default: 3)WEBHOOK_RETRY_DELAY_MS: Delay between retries (default: 1000)WEBHOOK_LOG_BODIES: Log webhook bodies (default: false)
BLUEPRINT_QUEUE_ADAPTER: Queue adapter type (mpscorredis, default:mpsc)BLUEPRINT_QUEUE_BUFFER_SIZE: MPSC buffer size (default: 10000)BLUEPRINT_QUEUE_MAX_RETRIES: Max retries (default: 3)BLUEPRINT_THROTTLER: Throttler type (noop,redis,redis-per-identity, default:noop)BLUEPRINT_THROTTLER_AUTHORITY_LIMIT: Authority rate limitBLUEPRINT_THROTTLER_AUTHORITY_WINDOW: Authority window in secondsBLUEPRINT_THROTTLER_ATURI_LIMIT: AT-URI rate limitBLUEPRINT_THROTTLER_ATURI_WINDOW: AT-URI window in seconds
SCHEDULER_ENABLED: Enable periodic scheduler (default: true)SCHEDULER_CHECK_INTERVAL_SECS: Check interval (default: 60)SCHEDULER_CACHE_RELOAD_SECS: Cache reload interval (default: 300)SCHEDULER_MAX_CONCURRENT: Max concurrent evaluations (default: 10)
SENTRY_DSN: Sentry DSN for error trackingSENTRY_ENVIRONMENT: Environment name (default:development)SENTRY_TRACES_SAMPLE_RATE: Trace sampling rate (0.0-1.0, default: 0.01)SENTRY_DEBUG: Enable Sentry debug mode (default: false)
REDIS_URL: Redis connection URL (enables advanced features)REDIS_CURSOR_TTL_SECONDS: TTL for Redis cursors (default: 86400)DENYLIST_TYPE: Denylist type (nooporpostgres, default:noop)DISABLED_NODE_TYPES: Comma-separated node types to disableEVALUATION_STORAGE_TYPE: Storage type (nooporfilesystem, default:noop)EVALUATION_STORAGE_DIRECTORY: Directory for filesystem storage
EXTERNAL_BASE=https://iftta.example.com \
HTTP_PORT=3000 \
HTTP_COOKIE_KEY=$COOKIE_KEY \
ISSUER_DID=did:plc:yourservice \
AIP_HOSTNAME=your-oauth.com \
AIP_CLIENT_ID=your-client-id \
AIP_CLIENT_SECRET=your-client-secret \
DATABASE_URL=postgres://user:pass@localhost/iftta \
JETSTREAM_ENABLED=true \
WEBHOOK_QUEUE_ENABLED=true \
DENYLIST_TYPE=postgres \
SENTRY_DSN=your-sentry-dsn \
SENTRY_ENVIRONMENT=production \
RUST_LOG=info \
./target/release/ifthisthenatEXTERNAL_BASE=https://iftta.example.com \
HTTP_PORT=3000 \
HTTP_COOKIE_KEY=$COOKIE_KEY \
ISSUER_DID=did:plc:yourservice \
AIP_HOSTNAME=your-oauth.com \
AIP_CLIENT_ID=your-client-id \
AIP_CLIENT_SECRET=your-client-secret \
DATABASE_URL=postgres://user:pass@pghost/iftta \
REDIS_URL=redis://redishost:6379 \
JETSTREAM_ENABLED=true \
JETSTREAM_WORKER_THREADS=4 \
JETSTREAM_PARTITION=0:3 \
BLUEPRINT_QUEUE_ADAPTER=redis \
BLUEPRINT_THROTTLER=redis-per-identity \
WEBHOOK_QUEUE_ENABLED=true \
DENYLIST_TYPE=postgres \
SENTRY_DSN=your-sentry-dsn \
SENTRY_ENVIRONMENT=production \
RUST_LOG=info \
./target/release/ifthisthenatGET /xrpc/tools.graze.ifthisthenat.getBlueprints- List blueprintsGET /xrpc/tools.graze.ifthisthenat.getBlueprint- Get specific blueprintPOST /xrpc/tools.graze.ifthisthenat.updateBlueprint- Create/update blueprintPOST /xrpc/tools.graze.ifthisthenat.deleteBlueprint- Delete blueprint
GET /_health- Basic health checkGET /_health/ready- Readiness check (all components)GET /_health/live- Liveness checkGET /_health/components- Individual component status
GET /oauth/authorize- Start OAuth flowGET /oauth/callback- OAuth callbackPOST /oauth/refresh- Refresh tokens
GET /.well-known/atproto-did- AT Protocol DID documentGET /.well-known/did.json- DID documentGET /- Landing page
# Build the image
docker build -t ifthisthenat:latest .
# Run with environment file
docker run -d \
--name ifthisthenat \
--env-file .env \
-p 8080:8080 \
ifthisthenat:latestversion: '3.8'
services:
app:
build: .
ports:
- "8080:8080"
env_file: .env
depends_on:
- postgres
- redis
postgres:
image: postgres:16-alpine
environment:
POSTGRES_DB: ifthisthenat
POSTGRES_USER: iftta
POSTGRES_PASSWORD: secret
volumes:
- postgres_data:/var/lib/postgresql/data
redis:
image: redis:7-alpine
volumes:
- redis_data:/data
volumes:
postgres_data:
redis_data:The engine uses DataLogic for dynamic evaluation. Common patterns:
// Value extraction
{"val": ["path", "to", "field"]}
// Comparisons
{"==": [{"val": ["field"]}, "value"]}
// String operations
{"cat": ["prefix-", {"val": ["field"]}, "-suffix"]}
// Time functions
{"now": []}
{"datetime": [1234567890]}
// Complex conditions
{"and": [
{"==": [{"val": ["kind"]}, "commit"]},
{"contains": [{"val": ["collection"]}, "app.bsky.feed.post"]}
]}
// Hash functions for unique IDs
{"metrohash": ["string1", "string2"]}
// Parse AT-URI components
{"parse_aturi": ["at://did:plc:example/app.bsky.feed.post/abc"]}
// Process text with facets/mentions
{"facet_text": ["Hello @user.bsky.social!"]}src/
├── bin/ # Entry point
├── engine/ # Node evaluation engine
├── storage/ # Database and cache layers
├── http/ # HTTP server and handlers
├── tasks/ # Background task system
│ ├── blueprint.rs # Blueprint evaluation task
│ ├── blueprint_adapter.rs # Queue adapter for blueprints
│ ├── webhook.rs # Webhook delivery task
│ ├── scheduler.rs # Periodic task scheduler
│ └── manager.rs # Task lifecycle management
├── consumer.rs # Jetstream consumer
├── processor.rs # Blueprint processor
├── metrics.rs # Metrics publishing
├── serialization.rs # Binary serialization
└── errors.rs # Error definitions
# Run all tests
cargo test
# Run specific test
cargo test test_name
# Run with output
cargo test -- --nocapture
# Run integration tests
cargo test --test '*'# Check code
cargo check
# Format code
cargo fmt
# Lint code
cargo clippy
# Watch for changes
cargo watch -x test -x run- API Documentation - Blueprint structure and node payloads
- Development Guide - Architecture details and patterns
- Contributing Guide - How to contribute
- Changelog - Release history and changes
Contributions are welcome! Please read our Contributing Guide for details on our code of conduct and the process for submitting pull requests.
This project is licensed under the MIT License. Copyright © 2025 Nick Gerakines and Graze Social. See the LICENSE file for details.
- AT Protocol team for the protocol and specifications
- Bluesky team for Jetstream and the firehose
- Rust community for excellent libraries and tooling
- Contributors and early adopters for feedback and improvements