Keep callm and process thousands of requests without (rate) limits
Installation • Quick Start • Providers • Examples • Contributing
Building LLM-powered applications often means processing thousands of API requests. You've probably experienced:
| Problem | Without callm | With callm |
|---|---|---|
| Rate limit errors | Constant 429 errors, manual sleep/retry | Automatic RPM & TPM throttling |
| Retry logic | Write custom backoff for each project | Built-in exponential backoff with jitter |
| Token tracking | No visibility into usage | Real-time token consumption metrics |
| Boilerplate code | Copy-paste the same async code everywhere | One function call, any provider |
| Waiting for batch APIs | Provider batch APIs take up to 24 hours | Results in minutes, not hours |
| Multiple SDKs | Install openai, anthropic, cohere, ... | One library, all providers |
Stop rewriting the same parallel processing code. callm handles the infrastructure so you can focus on your application.
Testing multiple providers? Just swap the provider class: no new dependencies, no code changes. Find what works best for your use case.
pip install callm-py

From source:
git clone https://github.com/milistu/callm.git
cd callm
pip install -e .

Process 1,000 product descriptions to extract structured data in under a minute:
import asyncio

from callm import process_requests, RateLimitConfig
from callm.providers import OpenAIProvider

# Configure your provider
provider = OpenAIProvider(
    api_key="sk-...",
    model="gpt-5-mini",
    request_url="https://api.openai.com/v1/responses",
)

# Your data processing requests
products = [
    {"id": 1, "description": "Nike Air Max 90 - Classic sneakers in white/black, size 10"},
    {"id": 2, "description": "Sony WH-1000XM5 Wireless Headphones - Noise cancelling, 30hr battery"},
    # ... thousands more
]

requests = [
    {
        "input": f"Extract brand, category, and key features from: {p['description']}",
        "metadata": {"product_id": p["id"]},
    }
    for p in products
]


async def main():
    results = await process_requests(
        provider=provider,
        requests=requests,
        rate_limit=RateLimitConfig(
            max_requests_per_minute=5_000,  # Stay under your tier limit
            max_tokens_per_minute=2_000_000,
        ),
    )

    print(f"Processed {results.stats.successful} requests in {results.stats.duration_seconds:.1f}s")
    print(f"Tokens used: {results.stats.total_input_tokens + results.stats.total_output_tokens:,}")

    # Access results
    for result in results.successes:
        print(f"Product {result.metadata['product_id']}: {result.response}")

asyncio.run(main())

- Precise Rate Limiting: Token buckets for RPM and TPM, respects provider limits
- Smart Retries: Exponential backoff with jitter, automatic 429/5xx handling
- Usage Tracking: Metrics for input tokens and output tokens
- Flexible I/O: Process from Python lists or JSONL files, output to memory or disk
- Structured Outputs: Support for Pydantic models and JSON schemas
- Provider Agnostic: Same API across OpenAI, Anthropic, Gemini, DeepSeek, and more
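
To make the "token buckets for RPM and TPM" idea concrete, here is a minimal standard-library sketch of the general technique. It is not callm's implementation: the `TokenBucket` class, the `try_acquire` helper, and the injectable `clock` parameter are hypothetical names used only for illustration.

```python
import time


class TokenBucket:
    """Hypothetical per-minute bucket that refills continuously over time."""

    def __init__(self, capacity_per_minute, clock=time.monotonic):
        self.capacity = float(capacity_per_minute)
        self.refill_per_second = self.capacity / 60.0
        self.available = self.capacity  # start with a full bucket
        self.clock = clock
        self.last_refill = clock()

    def refill(self):
        """Add capacity earned since the last refill, capped at the limit."""
        now = self.clock()
        elapsed = now - self.last_refill
        self.available = min(
            self.capacity, self.available + elapsed * self.refill_per_second
        )
        self.last_refill = now


def try_acquire(request_bucket, token_bucket, estimated_tokens):
    """Admit a request only if both the RPM and the TPM bucket have room."""
    request_bucket.refill()
    token_bucket.refill()
    if request_bucket.available >= 1 and token_bucket.available >= estimated_tokens:
        request_bucket.available -= 1
        token_bucket.available -= estimated_tokens
        return True
    return False  # caller should wait and try again later
```

A scheduler built on this idea would call `try_acquire` before sending each request and sleep briefly whenever it returns `False`. Checking both buckets before consuming from either avoids spending request capacity on a call that the token limit would reject anyway.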
| Provider | Supported APIs |
|---|---|
| OpenAI | Chat, Responses, Embeddings |
| Anthropic | Messages API |
| Gemini | Generate, Embeddings |
| DeepSeek | Chat Completions |
| Cohere | Embed API |
| Voyage AI | Embeddings |
Explore real-world use cases in the examples/ directory:
| Use Case | Description |
|---|---|
| Data Extraction | Extract structured data from product listings, invoices |
| Embeddings | Generate embeddings for RAG and semantic search |
| Evaluation | Multi-judge consensus evaluation |
| Synthetic Data | Generate training data and evaluation sets |
| Classification | Sentiment analysis, content moderation |
| Translation | Dataset translation for multilingual evaluation |
callm supports four processing modes depending on your input source and output destination:
| Input | Output | Best For |
|---|---|---|
| Python list | In-memory | Small batches, interactive use |
| Python list | JSONL file | Medium batches, need persistence |
| JSONL file | JSONL file | Large batches, low memory |
| JSONL file | In-memory | Loading saved requests, testing |
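
For the file-based modes, a JSONL input has to exist before processing starts. The sketch below shows one way to produce it with the standard library. It assumes each line holds one JSON request object with the same keys as the in-memory request dicts; that layout is an assumption and is not confirmed by this README. The helper names `write_requests_jsonl` and `read_jsonl` are hypothetical.

```python
import json
from pathlib import Path


def write_requests_jsonl(requests, path):
    """Write one JSON object per line (assumed input layout)."""
    path = Path(path)
    with path.open("w", encoding="utf-8") as f:
        for request in requests:
            f.write(json.dumps(request, ensure_ascii=False) + "\n")
    return path


def read_jsonl(path):
    """Load a JSONL file back into a list of dicts, skipping blank lines."""
    with Path(path).open(encoding="utf-8") as f:
        return [json.loads(line) for line in f if line.strip()]
```

Writing requests to disk once and passing the path instead of a list keeps memory use flat for large batches, which is the trade-off the table above describes.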
# 1. List -> Memory (small batches)
results = await process_requests(
    provider=provider,
    requests=my_list,
    rate_limit=rate_limit,
)
# Access: results.successes, results.failures

# 2. List -> File (persist results)
results = await process_requests(
    provider=provider,
    requests=my_list,
    rate_limit=rate_limit,
    output_path="results.jsonl",
)

# 3. File -> File (large batches, low memory)
results = await process_requests(
    provider=provider,
    requests="input.jsonl",
    rate_limit=rate_limit,
    output_path="results.jsonl",
)

# 4. File -> Memory (reload saved requests)
results = await process_requests(
    provider=provider,
    requests="input.jsonl",
    rate_limit=rate_limit,
)

from callm import RateLimitConfig, RetryConfig

# Rate limiting (required)
rate_limit = RateLimitConfig(
    max_requests_per_minute=1000,
    max_tokens_per_minute=100_000,
)

# Retry behavior (optional, sensible defaults)
retry = RetryConfig(
    max_attempts=5,
    base_delay_seconds=0.5,
    max_delay_seconds=15.0,
    jitter=0.1,
)

results = await process_requests(
    provider=provider,
    requests=requests,
    rate_limit=rate_limit,
    retry=retry,
)

`process_requests` is the main function for parallel API request processing.
| Parameter | Type | Description |
|---|---|---|
| `provider` | `BaseProvider` | Provider instance (OpenAI, Anthropic, etc.) |
| `requests` | `list[dict] \| str` | List of request dicts or path to JSONL file |
| `rate_limit` | `RateLimitConfig` | RPM and TPM limits |
| `retry` | `RetryConfig` | Optional retry configuration |
| `output_path` | `str` | Optional path for output JSONL (enables streaming) |
| `errors_path` | `str` | Optional path for error JSONL |
| `logging_level` | `int` | Logging verbosity (default: 20/INFO) |
Returns: ProcessingResults with successes, failures, and stats.
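
The retry fields shown earlier (`base_delay_seconds`, `max_delay_seconds`, `jitter`) suggest a capped exponential schedule. The sketch below illustrates that general technique with the standard library only. How callm actually combines these values is not documented here, so the doubling factor and the reading of `jitter` as a fractional spread are assumptions, and `backoff_delay` is a hypothetical helper.

```python
import random


def backoff_delay(
    attempt,
    base_delay_seconds=0.5,
    max_delay_seconds=15.0,
    jitter=0.1,
    rng=random.random,
):
    """Delay before retry number `attempt` (1-based), in seconds.

    Assumed formula: double the base delay per attempt, cap it, then
    shift the result by up to +/- `jitter` as a fraction of the delay.
    """
    # The cap is applied before jitter, so the result can exceed it slightly.
    delay = min(max_delay_seconds, base_delay_seconds * 2 ** (attempt - 1))
    spread = delay * jitter
    # rng() is in [0, 1), so the offset falls in [-spread, +spread).
    return delay + (2 * rng() - 1) * spread
```

Under these assumptions, the un-jittered delays for five attempts are 0.5, 1, 2, 4, and 8 seconds, and any later attempt would be capped at 15 seconds. The random offset keeps many concurrent workers from retrying at the same instant after a shared 429.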
Contributions are welcome! See CONTRIBUTING.md for guidelines.
# Setup development environment
git clone https://github.com/milistu/callm.git
cd callm
uv sync --dev
uv run pre-commit install
# Run tests
uv run nox

MIT License - see LICENSE for details.
Built with 🧡 for engineers who process data at scale

