
langasync

50% cost savings on LLM APIs with zero code changes.


Quick Start · Features · Documentation · Providers · Contributing

langasync lets you use provider batch APIs (OpenAI, Anthropic, Google Gemini, AWS Bedrock) with your existing LangChain chains. Wrap your chain, submit inputs, get results at half the cost.

from langasync import batch_chain

# Your existing chain — no changes needed
chain = prompt | model | parser

# Wrap for batch processing
batch_wrapper = batch_chain(chain)

# Submit and retrieve results
job = await batch_wrapper.submit(inputs)
results = await job.get_results()

Why langasync?

Provider batch APIs offer 50% cost savings on tokens for workloads that can tolerate 24-hour turnaround. But they require completely different code patterns — file uploads, polling, result parsing.

langasync abstracts all of that:

| Without langasync | With langasync |
| --- | --- |
| Rewrite chains as batch requests | Same chain, just wrapped |
| Manage file uploads (OpenAI) | Automatic |
| Build custom polling logic | Built-in BatchPoller |
| Parse provider-specific responses | Unified BatchItem |
| Handle partial failures manually | Automatic success/error separation |
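The "custom polling logic" row refers to boilerplate like the following, a minimal stdlib sketch of what every project ends up re-implementing without a poller abstraction (`check_status` is a hypothetical stand-in for a provider status call, not part of langasync):

```python
import asyncio

async def check_status(job_id: str) -> str:
    # Hypothetical stand-in for a provider "get batch status" call.
    return "completed"

async def poll_until_done(job_id: str, interval: float = 60.0) -> str:
    # Loop until the job reaches a terminal state, sleeping between checks.
    while True:
        status = await check_status(job_id)
        if status in ("completed", "failed", "cancelled", "expired"):
            return status
        await asyncio.sleep(interval)

print(asyncio.run(poll_until_done("batch_123", interval=0.01)))
```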

Quick Start

Installation

pip install langasync

Set your API key

export OPENAI_API_KEY=sk-...
# or for Anthropic:
export ANTHROPIC_API_KEY=sk-ant-...
# or for Google Gemini:
export GOOGLE_API_KEY=AIza...

For AWS Bedrock, see the Bedrock Setup Guide — it requires an S3 bucket, IAM role, and model access configuration.

Basic Usage

import asyncio
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langasync import batch_chain, BatchPoller, BatchStatus

prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful assistant."),
    ("user", "Explain {topic} in one paragraph.")
])
model = ChatOpenAI(model="gpt-4o-mini")
chain = prompt | model

batch_wrapper = batch_chain(chain)

async def main():
    # Submit — returns immediately
    job = await batch_wrapper.submit([
        {"topic": "quantum computing"},
        {"topic": "machine learning"},
        {"topic": "blockchain"},
    ])
    print(f"Batch submitted: {job.job_id}")

    # Poll until complete — batch APIs typically take minutes to hours
    poller = BatchPoller()
    async for result in poller.wait_all():
        if result.status_info.status == BatchStatus.COMPLETED:
            for r in result.results:
                print(r.content)

asyncio.run(main())

Features

Drop-in Batch Support

Wrap any LangChain chain with batch_chain(). Prompts, models, parsers — all work automatically.

chain = prompt | model | parser
batch_wrapper = batch_chain(chain)

Structured Output

Full support for Pydantic output parsers and schemas:

from pydantic import BaseModel
from langchain_core.output_parsers import PydanticOutputParser

class Analysis(BaseModel):
    sentiment: str
    confidence: float

parser = PydanticOutputParser(pydantic_object=Analysis)
chain = prompt | model | parser

Tool Calling

.bind_tools() works out of the box:

model = ChatOpenAI().bind_tools([my_tool])
chain = prompt | model

Multimodal (Images & PDFs)

Pass images and documents as part of your batch inputs:

from langchain_core.messages import HumanMessage, SystemMessage

batch_wrapper = batch_chain(model)

await batch_wrapper.submit([
    [
        SystemMessage(content="You are a helpful assistant."),
        HumanMessage(content=[
            {"type": "text", "text": "Describe this image."},
            {"type": "image", "url": "https://example.com/photo.jpg"},
        ]),
    ],
])
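For local files, one common pattern is to inline the bytes as a base64 data URL instead of a remote URL. A hedged sketch (`image_block` is a hypothetical helper; whether a given provider's batch API accepts data URLs is an assumption to verify):

```python
import base64

def image_block(data: bytes, mime: str = "image/png") -> dict:
    # Hypothetical helper: encode local image bytes as a data URL
    # in the same {"type": "image", "url": ...} shape used above.
    encoded = base64.b64encode(data).decode("ascii")
    return {"type": "image", "url": f"data:{mime};base64,{encoded}"}

block = image_block(b"\x89PNG\r\n", mime="image/png")
print(block["url"][:21])
```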

Job Persistence

Batch jobs can take up to 24 hours. Job metadata persists automatically — resume after process restart:

from langasync import BatchPoller

# Later, in a new process
poller = BatchPoller()

async for result in poller.wait_all():
    print(f"Job {result.job_id}: {result.status_info.status}")

Note: langasync persists job metadata (IDs, status) but not results. Save your results when you receive them (OpenAI and Anthropic delete after ~30 days, Gemini after 48 hours, Bedrock persists in your S3 bucket).
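Since results are not persisted by the library, saving them on receipt might look like this stdlib sketch (`save_results` is an illustrative helper, not part of langasync):

```python
import json
import tempfile
from pathlib import Path

def save_results(job_id: str, results: list, out_dir: str) -> Path:
    # Write results to disk as soon as they arrive; providers delete
    # them from their side after the retention window.
    out = Path(out_dir)
    out.mkdir(parents=True, exist_ok=True)
    path = out / f"{job_id}.json"
    path.write_text(json.dumps(results, indent=2))
    return path

saved = save_results("batch_123", [{"content": "..."}], tempfile.mkdtemp())
print(saved.name)
```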

Partial Failure Handling

Get successful results even when some requests fail:

result = await job.get_results()

for r in result.results:
    if r.success:
        print(r.content)
    else:
        print(f"Failed: {r.error}")
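Conceptually, the separation is a simple partition on the success flag. A minimal sketch using a plain dataclass as a stand-in for langasync's unified BatchItem (the real class may carry more fields):

```python
from dataclasses import dataclass
from typing import Optional

@dataclass
class Item:
    # Hypothetical stand-in for langasync's unified BatchItem.
    success: bool
    content: Optional[str] = None
    error: Optional[str] = None

def split(items):
    # The success/error separation the library performs automatically.
    ok = [i for i in items if i.success]
    failed = [i for i in items if not i.success]
    return ok, failed

ok, failed = split([Item(True, content="hi"), Item(False, error="rate_limited")])
print(len(ok), len(failed))
```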

Supported Providers

| Provider | Status | Batch API | Savings |
| --- | --- | --- | --- |
| OpenAI | ✅ Supported | Batch API | 50% |
| Anthropic | ✅ Supported | Message Batches | 50% |
| Google Gemini | ✅ Supported | Batch API | 50% |
| AWS Bedrock (Claude) | ✅ Supported | Batch Inference (Setup Guide) | 50% |
| Azure OpenAI | 🔜 Planned | — | — |

Documentation

API Reference

| Function / Class | Description |
| --- | --- |
| batch_chain(chain) | Wrap a LangChain chain for batch processing |
| BatchPoller() | Poll pending jobs and retrieve results |
| LangasyncSettings | Configuration via env vars or constructor |
| BatchJobService | Service layer: create(), get(), list() batch jobs |
| BatchJobHandle | Returned by submit() / create(); provides get_results(), cancel() |
| BatchStatus | Enum: PENDING, IN_PROGRESS, COMPLETED, FAILED, CANCELLED, EXPIRED |
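For reference, the status values can be modeled as a plain enum with a terminal-state check. A sketch mirroring the statuses listed above, not the library's actual class:

```python
from enum import Enum

class BatchStatus(Enum):
    # Mirrors the statuses listed in the table above; the string
    # values here are illustrative assumptions.
    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"
    EXPIRED = "expired"

TERMINAL = {BatchStatus.COMPLETED, BatchStatus.FAILED,
            BatchStatus.CANCELLED, BatchStatus.EXPIRED}

def is_done(status: BatchStatus) -> bool:
    # A job is finished once it reaches any terminal state.
    return status in TERMINAL

print(is_done(BatchStatus.IN_PROGRESS), is_done(BatchStatus.COMPLETED))
```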

Configuration

langasync reads configuration from environment variables or a .env file automatically:

# Provider API keys
OPENAI_API_KEY=sk-...
ANTHROPIC_API_KEY=sk-ant-...
GOOGLE_API_KEY=AIza...

# AWS Bedrock (see docs/bedrock-setup.md for full setup guide)
AWS_ACCESS_KEY_ID=AKIA...
AWS_SECRET_ACCESS_KEY=...
AWS_REGION=us-east-1
BEDROCK_S3_BUCKET=my-batch-bucket
BEDROCK_ROLE_ARN=arn:aws:iam::123456789012:role/BedrockBatchRole

# Optional overrides
OPENAI_BASE_URL=https://api.openai.com/v1
ANTHROPIC_BASE_URL=https://api.anthropic.com
GOOGLE_BASE_URL=https://generativelanguage.googleapis.com/v1beta
LANGASYNC_BATCH_POLL_INTERVAL=60.0
LANGASYNC_BASE_STORAGE_PATH=./langasync_jobs

Or configure programmatically:

from langasync import LangasyncSettings, batch_chain

settings = LangasyncSettings(
    openai_api_key="sk-...",
    batch_poll_interval=30.0,
    base_storage_path="./my_jobs",
)
batch_wrapper = batch_chain(chain, settings)
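How an env override like LANGASYNC_BATCH_POLL_INTERVAL typically resolves against a default can be sketched as follows; this is an illustration of env-based settings resolution, not langasync's actual internals:

```python
import os

def poll_interval(default: float = 60.0) -> float:
    # Environment value, when set, wins over the built-in default.
    raw = os.environ.get("LANGASYNC_BATCH_POLL_INTERVAL")
    return float(raw) if raw else default

os.environ["LANGASYNC_BATCH_POLL_INTERVAL"] = "30.0"
print(poll_interval())
```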

Examples

See examples/ for complete working examples:

# Submit a batch job
python examples/openai_example.py run

# Fetch results (can run later, after restart)
python examples/openai_example.py fetch

Development

Setup

git clone https://github.com/langasync/langasync.git
cd langasync
pip install -e ".[dev]"
pre-commit install

Running Tests

# Unit tests (mocked, fast)
pytest tests

# Integration tests (requires API keys)
pytest tests/integration -o "addopts="

Contributing

Contributions are welcome!

  1. Fork the repository
  2. Create a feature branch (git checkout -b feature/amazing-feature)
  3. Make your changes
  4. Run tests (pytest)
  5. Submit a pull request

License

langasync is licensed under Apache 2.0. Free to use for any purpose — personal projects, commercial products, production workloads.
