Lightweight AI agent orchestration engine with YAML workflows and multi-LLM routing.
Define workflows as simple YAML files. The engine resolves step dependencies, routes each step to the optimal LLM provider (OpenAI, Anthropic), and executes them with built-in safety controls and observability.
```mermaid
graph TB
    Client[Client / API]
    API[FastAPI Gateway]
    Config[YAML Config Loader]
    Router[LLM Router]
    Orchestrator[Workflow Orchestrator]
    RouterAgent[Router Agent]
    Planner[Planner Agent]
    Executor[Executor Agent]
    Validator[Validator Agent]
    OpenAI[OpenAI API]
    Anthropic[Anthropic API]
    Local[Local LLM]
    Safety[Safety Layer]
    Observe[Observability]

    Client --> API
    API --> Config
    Config --> Orchestrator
    Orchestrator --> Router
    Router --> OpenAI
    Router --> Anthropic
    Router --> Local
    Orchestrator --> RouterAgent
    Orchestrator --> Planner
    Orchestrator --> Executor
    Orchestrator --> Validator
    Orchestrator --> Safety
    Orchestrator --> Observe
```
- Declarative YAML Workflows -- Define multi-step AI pipelines as YAML with dependency graphs (DAGs)
- Multi-LLM Routing -- Task-based routing across OpenAI and Anthropic with cost-aware fallback chains
- 4 Built-in Agent Patterns -- Router, Planner, Executor, Validator -- composable building blocks
- Safety Layer -- Token-bucket rate limiting, per-workflow cost controls, prompt injection filtering
- Observability -- Structured logging (structlog), metrics collector, trace ID propagation
- FastAPI Gateway -- REST API with authentication, CORS, and rate limiting
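The cost-aware fallback routing mentioned above can be sketched roughly as follows. This is a simplified illustration, not the engine's actual router: `Model`, `pick_with_fallback`, and the `healthy` set are invented names for the sketch.

```python
from dataclasses import dataclass


@dataclass
class Model:
    name: str
    cost_per_1k_input: float
    cost_per_1k_output: float


def pick_with_fallback(candidates: list[Model], healthy: set[str]) -> Model:
    """Prefer the cheapest model; walk up the fallback chain if a provider is down."""
    for model in sorted(
        candidates, key=lambda m: m.cost_per_1k_input + m.cost_per_1k_output
    ):
        if model.name in healthy:
            return model
    raise RuntimeError("no healthy provider available")


models = [
    Model("claude-sonnet-4-0", 0.003, 0.015),
    Model("gpt-4o-mini", 0.00015, 0.0006),
]
# gpt-4o-mini is cheapest, but its provider is marked unhealthy here,
# so the router falls back to the next model in cost order.
choice = pick_with_fallback(models, healthy={"claude-sonnet-4-0"})
```

The same cost ordering means that when every provider is healthy, the cheapest capable model always wins.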
```bash
git clone https://github.com/ForwardCodeSolutions/ai-agent-engine.git
cd ai-agent-engine
cp .env.example .env
# Edit .env with your API keys
docker compose up -d
```

The API is available at http://localhost:8001/api/v1.
```bash
uv sync
make dev    # Starts service via docker compose up -d
make check  # Lint + tests
```

`workflows/document-analyzer.yaml`:
```yaml
name: document-analyzer
description: Analyzes a document and creates a structured summary
version: "1.0"

settings:
  max_tokens: 4000
  timeout_seconds: 30
  cost_limit_usd: 0.50

steps:
  - id: classify
    agent: router
    model: gpt-4o-mini
    prompt: "Determine document type: legal, technical, general"

  - id: analyze
    agent: planner
    model: auto
    prompt: "Break down the analysis into steps for document type {classify.output}"
    depends_on: [classify]

  - id: execute
    agent: executor
    model: auto
    prompt: "Execute the analysis steps: {analyze.output}"
    depends_on: [analyze]

  - id: validate
    agent: validator
    model: gpt-4o-mini
    prompt: "Validate quality and completeness of the analysis: {execute.output}"
    depends_on: [execute]
```

Steps reference previous outputs with `{step_id.output}`. The engine resolves the DAG and executes steps in dependency order.
All endpoints are under `/api/v1`. Protected endpoints require the `X-API-Key` header when `API_KEY` is set in `.env`.
```bash
curl http://localhost:8001/api/v1/health
```

```json
{"status": "ok", "version": "0.1.0"}
```

```bash
curl -X POST http://localhost:8001/api/v1/workflows/execute \
  -H "Content-Type: application/json" \
  -H "X-API-Key: your-api-key" \
  -d '{"workflow_name": "document-analyzer"}'
```

```json
{
  "execution_id": "a1b2c3d4e5f67890",
  "workflow_name": "document-analyzer",
  "status": "completed",
  "steps": {
    "classify": "completed",
    "analyze": "completed",
    "execute": "completed",
    "validate": "completed"
  },
  "total_input_tokens": 120,
  "total_output_tokens": 480,
  "total_cost_usd": 0.0042
}
```

```bash
curl http://localhost:8001/api/v1/workflows \
  -H "X-API-Key: your-api-key"
```

```json
[
  {"name": "document-analyzer", "description": "Analyzes a document and creates a structured summary", "version": "1.0", "steps": 4},
  {"name": "research-assistant", "description": "Researches a topic and synthesizes findings into a report", "version": "1.0", "steps": 4},
  {"name": "customer-support", "description": "Handles customer support requests with intent classification and routing", "version": "1.0", "steps": 4}
]
```

```bash
curl -X POST http://localhost:8001/api/v1/workflows/validate \
  -H "Content-Type: application/json" \
  -H "X-API-Key: your-api-key" \
  -d '{"yaml_content": "name: test\ndescription: test\nversion: \"1.0\"\nsteps:\n  - id: s1\n    agent: router\n    model: auto\n    prompt: classify"}'
```

```json
{"valid": true, "workflow_name": "test", "steps": 1, "errors": []}
```

```bash
curl http://localhost:8001/api/v1/models \
  -H "X-API-Key: your-api-key"
```

```json
[
  {"name": "gpt-4o-mini", "provider": "openai", "cost_per_1k_input_tokens": 0.00015, "cost_per_1k_output_tokens": 0.0006},
  {"name": "claude-sonnet-4-0", "provider": "anthropic", "cost_per_1k_input_tokens": 0.003, "cost_per_1k_output_tokens": 0.015}
]
```

```bash
curl http://localhost:8001/api/v1/metrics \
  -H "X-API-Key: your-api-key"
```

```json
{
  "total_requests": 12,
  "total_failures": 0,
  "total_input_tokens": 360,
  "total_output_tokens": 1440,
  "total_cost_usd": 0.0126,
  "average_latency_ms": 245.5,
  "requests_by_model": {"gpt-4o-mini": 8, "claude-sonnet-4-0": 4},
  "cost_by_model": {"gpt-4o-mini": 0.003, "claude-sonnet-4-0": 0.0096}
}
```

```bash
curl http://localhost:8001/api/v1/workflows/a1b2c3d4e5f67890/status \
  -H "X-API-Key: your-api-key"
```

Architecture decisions are documented as ADRs:
| ADR | Decision |
|---|---|
| ADR-001 | YAML-based declarative workflows |
| ADR-002 | Multi-LLM router with cost-aware fallback |
| ADR-003 | Four built-in agent patterns |
| ADR-004 | Structured observability with trace propagation |
| ADR-005 | Safety layer with rate limiting and cost controls |
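The rate limiting behind ADR-005 is token-bucket based. The pattern can be sketched like this; it is a minimal illustration of the general technique, not the engine's safety layer, and `TokenBucket`/`allow` are invented names.

```python
import time


class TokenBucket:
    """Simplified token bucket: refills `rate` tokens/sec up to a `capacity` burst."""

    def __init__(self, rate: float, capacity: float) -> None:
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self.last = time.monotonic()

    def allow(self, cost: float = 1.0) -> bool:
        now = time.monotonic()
        # Refill proportionally to elapsed time, capped at capacity.
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= cost:
            self.tokens -= cost
            return True
        return False


# Capacity 2 allows a burst of two requests; the third is rejected
# until the bucket refills at 5 tokens/sec.
bucket = TokenBucket(rate=5.0, capacity=2.0)
results = [bucket.allow() for _ in range(3)]
```

The same `cost` parameter generalizes to per-workflow cost controls: charge each request the estimated USD cost instead of a flat token.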
Create a class that extends BaseAgent and register it:

```python
from agent_engine.core.base_agent import AgentContext, AgentResult, BaseAgent
from agent_engine.llm.router import TaskComplexity


class SummarizerAgent(BaseAgent):
    @property
    def agent_type(self) -> str:
        return "summarizer"

    async def execute(self, context: AgentContext) -> AgentResult:
        prompt = "Summarize the following concisely:\n\n" + context.prompt
        return await self._call_llm(prompt, context, TaskComplexity.SIMPLE)


# Register it
registry.register("summarizer", SummarizerAgent)
```

Then use it in a workflow:
```yaml
steps:
  - id: summarize
    agent: summarizer
    model: auto
    prompt: "Summarize: {previous_step.output}"
    depends_on: [previous_step]
```

Implement the BaseLLMProvider interface:
```python
from collections.abc import AsyncIterator

from agent_engine.llm.base import BaseLLMProvider
from agent_engine.models.llm import LLMRequest, LLMResponse


class MyProvider(BaseLLMProvider):
    @property
    def provider_name(self) -> str:
        return "my-provider"

    @property
    def supported_models(self) -> list[str]:
        return ["my-model-v1"]

    async def complete(self, request: LLMRequest) -> LLMResponse:
        # Call your LLM API here
        ...

    async def stream(self, request: LLMRequest) -> AsyncIterator[str]:
        # Stream tokens from your LLM API
        ...
```

Then pass it to the LLMRouter:

```python
router = LLMRouter(providers=[my_provider, openai_provider])
```

| Component | Technology |
|---|---|
| Language | Python 3.11+ |
| Framework | FastAPI + Uvicorn |
| Models | Pydantic v2 + pydantic-settings |
| LLM Clients | httpx (async) |
| Workflow Config | PyYAML |
| Logging | structlog |
| Package Manager | uv + hatchling |
| Linting | ruff |
| Testing | pytest + pytest-asyncio (208 tests, 88% coverage) |
| Container | Docker + Docker Compose |
208 tests, 88% code coverage. All tests use MockProvider — no real API calls.
```bash
make test   # Run all tests with coverage
make check  # Lint + tests
```

| File | What it covers |
|---|---|
| `test_llm_router.py` | Model resolution, fallback chains, cost-aware routing |
| `test_agents.py` | All 4 agent patterns, registry, token tracking |
| `test_orchestrator.py` | Topological sort, DAG execution, prompt variable resolution, error handling |
| `test_loader.py` | YAML parsing, validation, file/directory loading |
| `test_validator.py` | Dependency validation, cycle detection, duplicate IDs |
| `test_safety.py` | Rate limiter, cost controller, content filter, safety manager |
| `test_observability.py` | Trace IDs, metrics collector, latency measurement |
| `test_models.py` | Pydantic model validation for all domain models |
| `test_health.py` | Health endpoint |
| `test_example_workflows.py` | All 3 example YAML workflows load, validate, and execute |
| File | What it covers |
|---|---|
| `test_api.py` | All API endpoints, auth, rate limiting, CORS |
| `test_workflow_execution.py` | End-to-end workflow execution via API |
| `test_workflow_dag_execution.py` | Full pipeline for all 3 YAML workflows, failed step skips dependents, `{step_id.output}` propagation |
| `test_llm_routing_quality.py` | Simple→cheap model, complex→powerful model, fallback on provider error, cost-aware preference |
| `test_safety_full_flow.py` | Rate limit blocks excess requests, cost limit stops workflow, content filter blocks injection, API requires auth |
| `test_agent_patterns_integration.py` | RouterAgent classifies, PlannerAgent produces steps, ExecutorAgent uses context, ValidatorAgent checks quality |
| `test_concurrent_workflows.py` | Parallel execution, context isolation, slow workflow doesn't block fast |
| `test_observability_metrics.py` | Metrics accumulate across executions, trace ID consistency, per-model cost tracking |
- Architecture
- API Reference
- Workflow Specification
- Code Conventions
- Testing Strategy
- Architecture Decision Records
MIT