ai-agent-engine

Lightweight AI agent orchestration engine with YAML workflows and multi-LLM routing.

Define workflows as simple YAML files. The engine resolves step dependencies, routes each step to the optimal LLM provider (OpenAI, Anthropic), and executes them with built-in safety controls and observability.

Architecture

graph TB
    Client[Client / API]
    API[FastAPI Gateway]
    Config[YAML Config Loader]
    Router[LLM Router]
    Orchestrator[Workflow Orchestrator]

    RouterAgent[Router Agent]
    Planner[Planner Agent]
    Executor[Executor Agent]
    Validator[Validator Agent]

    OpenAI[OpenAI API]
    Anthropic[Anthropic API]
    Local[Local LLM]

    Safety[Safety Layer]
    Observe[Observability]

    Client --> API
    API --> Config
    Config --> Orchestrator
    Orchestrator --> Router
    Router --> OpenAI
    Router --> Anthropic
    Router --> Local
    Orchestrator --> RouterAgent
    Orchestrator --> Planner
    Orchestrator --> Executor
    Orchestrator --> Validator
    Orchestrator --> Safety
    Orchestrator --> Observe

Key Features

  • Declarative YAML Workflows -- Define multi-step AI pipelines as YAML with dependency graphs (DAGs)
  • Multi-LLM Routing -- Task-based routing across OpenAI and Anthropic with cost-aware fallback chains
  • 4 Built-in Agent Patterns -- Router, Planner, Executor, Validator -- composable building blocks
  • Safety Layer -- Token-bucket rate limiting, per-workflow cost controls, prompt injection filtering
  • Observability -- Structured logging (structlog), metrics collector, trace ID propagation
  • FastAPI Gateway -- REST API with authentication, CORS, and rate limiting
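The token-bucket rate limiting mentioned above can be pictured as a bucket that refills at a steady rate and spends one token per request. A minimal standalone sketch of the idea (illustrative only, not the engine's actual safety-layer class):

```python
import time

class TokenBucket:
    """Minimal token bucket: `rate` tokens refill per second, capped at
    `capacity`. Each request consumes one token; empty bucket = rejected."""

    def __init__(self, rate: float, capacity: int):
        self.rate = rate
        self.capacity = capacity
        self.tokens = float(capacity)  # start full
        self.last = time.monotonic()

    def allow(self) -> bool:
        now = time.monotonic()
        # Refill proportionally to elapsed time, never exceeding capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

bucket = TokenBucket(rate=5, capacity=10)
print(bucket.allow())  # True -- bucket starts full
```

Bursts up to `capacity` are allowed, then requests are throttled to the sustained `rate`.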

Quick Start

git clone https://github.com/ForwardCodeSolutions/ai-agent-engine.git
cd ai-agent-engine
cp .env.example .env
# Edit .env with your API keys
docker compose up -d

The API is available at http://localhost:8001/api/v1.

Local Development

uv sync
make dev    # Starts service via docker compose up -d
make check  # Lint + tests

Workflow Example

workflows/document-analyzer.yaml:

name: document-analyzer
description: Analyzes a document and creates a structured summary
version: "1.0"

settings:
  max_tokens: 4000
  timeout_seconds: 30
  cost_limit_usd: 0.50

steps:
  - id: classify
    agent: router
    model: gpt-4o-mini
    prompt: "Determine document type: legal, technical, general"

  - id: analyze
    agent: planner
    model: auto
    prompt: "Break down the analysis into steps for document type {classify.output}"
    depends_on: [classify]

  - id: execute
    agent: executor
    model: auto
    prompt: "Execute the analysis steps: {analyze.output}"
    depends_on: [analyze]

  - id: validate
    agent: validator
    model: gpt-4o-mini
    prompt: "Validate quality and completeness of the analysis: {execute.output}"
    depends_on: [execute]

Steps reference previous outputs with {step_id.output}. The engine resolves the DAG and executes in dependency order.
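Dependency resolution boils down to a topological sort of the step graph plus substitution of {step_id.output} placeholders before each step runs. A minimal sketch of both mechanisms using the standard library (not the engine's actual orchestrator code):

```python
from graphlib import TopologicalSorter

# Step graph for document-analyzer: each step maps to its depends_on list
steps = {
    "classify": [],
    "analyze": ["classify"],
    "execute": ["analyze"],
    "validate": ["execute"],
}

# Topological sort yields a valid execution order (dependencies first)
order = list(TopologicalSorter(steps).static_order())
print(order)  # ['classify', 'analyze', 'execute', 'validate']

# Before a step runs, {step_id.output} placeholders are filled from
# the outputs of already-completed steps
outputs = {"classify": "technical"}
prompt = "Break down the analysis into steps for document type {classify.output}"
for step_id, output in outputs.items():
    prompt = prompt.replace("{" + step_id + ".output}", output)
print(prompt)  # ...for document type technical
```

`TopologicalSorter` also raises `CycleError` on circular dependencies, which is the same class of error the engine's workflow validator reports.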

API Endpoints

All endpoints are under /api/v1. Protected endpoints require the X-API-Key header when API_KEY is set in .env.

Health (public)

curl http://localhost:8001/api/v1/health
{"status": "ok", "version": "0.1.0"}

Execute a Workflow

curl -X POST http://localhost:8001/api/v1/workflows/execute \
  -H "Content-Type: application/json" \
  -H "X-API-Key: your-api-key" \
  -d '{"workflow_name": "document-analyzer"}'
{
  "execution_id": "a1b2c3d4e5f67890",
  "workflow_name": "document-analyzer",
  "status": "completed",
  "steps": {
    "classify": "completed",
    "analyze": "completed",
    "execute": "completed",
    "validate": "completed"
  },
  "total_input_tokens": 120,
  "total_output_tokens": 480,
  "total_cost_usd": 0.0042
}

List Workflows

curl http://localhost:8001/api/v1/workflows \
  -H "X-API-Key: your-api-key"
[
  {"name": "document-analyzer", "description": "Analyzes a document and creates a structured summary", "version": "1.0", "steps": 4},
  {"name": "research-assistant", "description": "Researches a topic and synthesizes findings into a report", "version": "1.0", "steps": 4},
  {"name": "customer-support", "description": "Handles customer support requests with intent classification and routing", "version": "1.0", "steps": 4}
]

Validate a Workflow

curl -X POST http://localhost:8001/api/v1/workflows/validate \
  -H "Content-Type: application/json" \
  -H "X-API-Key: your-api-key" \
  -d '{"yaml_content": "name: test\ndescription: test\nversion: \"1.0\"\nsteps:\n  - id: s1\n    agent: router\n    model: auto\n    prompt: classify"}'
{"valid": true, "workflow_name": "test", "steps": 1, "errors": []}

List Models

curl http://localhost:8001/api/v1/models \
  -H "X-API-Key: your-api-key"
[
  {"name": "gpt-4o-mini", "provider": "openai", "cost_per_1k_input_tokens": 0.00015, "cost_per_1k_output_tokens": 0.0006},
  {"name": "claude-sonnet-4-0", "provider": "anthropic", "cost_per_1k_input_tokens": 0.003, "cost_per_1k_output_tokens": 0.015}
]
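Request costs follow directly from these per-1k-token rates. A quick worked example (hypothetical helper, not part of the engine's API) using the 120 input / 480 output tokens from the execution response above at gpt-4o-mini rates:

```python
def request_cost(input_tokens: int, output_tokens: int,
                 cost_per_1k_in: float, cost_per_1k_out: float) -> float:
    """Cost in USD of one request, given per-1k-token rates."""
    return (input_tokens / 1000) * cost_per_1k_in + (output_tokens / 1000) * cost_per_1k_out

# gpt-4o-mini rates from the table above
cost = request_cost(120, 480, 0.00015, 0.0006)
print(f"${cost:.6f}")  # $0.000306
```

When `model: auto` routes steps across providers, the reported `total_cost_usd` is the sum of such per-request costs at each model's own rates.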

Get Metrics

curl http://localhost:8001/api/v1/metrics \
  -H "X-API-Key: your-api-key"
{
  "total_requests": 12,
  "total_failures": 0,
  "total_input_tokens": 360,
  "total_output_tokens": 1440,
  "total_cost_usd": 0.0126,
  "average_latency_ms": 245.5,
  "requests_by_model": {"gpt-4o-mini": 8, "claude-sonnet-4-0": 4},
  "cost_by_model": {"gpt-4o-mini": 0.003, "claude-sonnet-4-0": 0.0096}
}

Get Execution Status

curl http://localhost:8001/api/v1/workflows/a1b2c3d4e5f67890/status \
  -H "X-API-Key: your-api-key"

Design Decisions

Architecture decisions are documented as ADRs:

ADR Decision
ADR-001 YAML-based declarative workflows
ADR-002 Multi-LLM router with cost-aware fallback
ADR-003 Four built-in agent patterns
ADR-004 Structured observability with trace propagation
ADR-005 Safety layer with rate limiting and cost controls

How to Extend

Add a Custom Agent

Create a class that extends BaseAgent and register it:

from agent_engine.core.base_agent import AgentContext, AgentResult, BaseAgent
from agent_engine.llm.router import TaskComplexity

class SummarizerAgent(BaseAgent):
    @property
    def agent_type(self) -> str:
        return "summarizer"

    async def execute(self, context: AgentContext) -> AgentResult:
        prompt = "Summarize the following concisely:\n\n" + context.prompt
        return await self._call_llm(prompt, context, TaskComplexity.SIMPLE)

# Register it
registry.register("summarizer", SummarizerAgent)

Then use it in a workflow:

steps:
  - id: summarize
    agent: summarizer
    model: auto
    prompt: "Summarize: {previous_step.output}"
    depends_on: [previous_step]

Add a Custom LLM Provider

Implement the BaseLLMProvider interface:

from collections.abc import AsyncIterator

from agent_engine.llm.base import BaseLLMProvider
from agent_engine.models.llm import LLMRequest, LLMResponse

class MyProvider(BaseLLMProvider):
    @property
    def provider_name(self) -> str:
        return "my-provider"

    @property
    def supported_models(self) -> list[str]:
        return ["my-model-v1"]

    async def complete(self, request: LLMRequest) -> LLMResponse:
        # Call your LLM API here
        ...

    async def stream(self, request: LLMRequest) -> AsyncIterator[str]:
        # Stream tokens from your LLM API
        ...

Then pass it to the LLMRouter:

router = LLMRouter(providers=[my_provider, openai_provider])

Tech Stack

Component Technology
Language Python 3.11+
Framework FastAPI + Uvicorn
Models Pydantic v2 + pydantic-settings
LLM Clients httpx (async)
Workflow Config PyYAML
Logging structlog
Package Manager uv + hatchling
Linting ruff
Testing pytest + pytest-asyncio (208 tests, 88% coverage)
Container Docker + Docker Compose

Test Suite

208 tests, 88% code coverage. All tests use MockProvider — no real API calls.

make test    # Run all tests with coverage
make check   # Lint + tests

Unit Tests (tests/unit/)

File What it covers
test_llm_router.py Model resolution, fallback chains, cost-aware routing
test_agents.py All 4 agent patterns, registry, token tracking
test_orchestrator.py Topological sort, DAG execution, prompt variable resolution, error handling
test_loader.py YAML parsing, validation, file/directory loading
test_validator.py Dependency validation, cycle detection, duplicate IDs
test_safety.py Rate limiter, cost controller, content filter, safety manager
test_observability.py Trace IDs, metrics collector, latency measurement
test_models.py Pydantic model validation for all domain models
test_health.py Health endpoint
test_example_workflows.py All 3 example YAML workflows load, validate, and execute

Integration Tests (tests/integration/)

File What it covers
test_api.py All API endpoints, auth, rate limiting, CORS
test_workflow_execution.py End-to-end workflow execution via API
test_workflow_dag_execution.py Full pipeline for all 3 YAML workflows, failed step skips dependents, {step_id.output} propagation
test_llm_routing_quality.py Simple→cheap model, complex→powerful model, fallback on provider error, cost-aware preference
test_safety_full_flow.py Rate limit blocks excess requests, cost limit stops workflow, content filter blocks injection, API requires auth
test_agent_patterns_integration.py RouterAgent classifies, PlannerAgent produces steps, ExecutorAgent uses context, ValidatorAgent checks quality
test_concurrent_workflows.py Parallel execution, context isolation, slow workflow doesn't block fast
test_observability_metrics.py Metrics accumulate across executions, trace ID consistency, per-model cost tracking


License

MIT
