- Introduction
- Core Concepts
- Fundamentals
- Intermediate Topics
- Advanced Patterns
- Real-World Examples
- Best Practices
- Common Pitfalls
- Quick Reference
Asynchronous programming is a programming paradigm that allows a program to handle multiple operations concurrently without waiting for each operation to complete before starting the next one. Instead of blocking execution while waiting for I/O operations (like network requests, file operations, or database queries), the program can switch to other tasks and return to the waiting operation when it's ready.
Analogy: Think of a restaurant kitchen. In a synchronous kitchen, the chef would:
- Start cooking dish A
- Wait for dish A to finish completely
- Start cooking dish B
- Wait for dish B to finish completely
In an asynchronous kitchen, the chef would:
- Start cooking dish A (put it in the oven)
- While dish A is cooking, start preparing dish B
- Check on dish A when the timer goes off
- Continue multitasking between dishes
Synchronous Code:
import time

def download_file(name):
    print(f"Starting download: {name}")
    time.sleep(2)  # Simulating download time
    print(f"Finished download: {name}")
    return f"data_{name}"

# Sequential execution
start = time.time()
result1 = download_file("file1")
result2 = download_file("file2")
result3 = download_file("file3")
print(f"Total time: {time.time() - start:.2f} seconds")
# Output: ~6 seconds (2 + 2 + 2)

Asynchronous Code:
import asyncio
import time

async def download_file(name):
    print(f"Starting download: {name}")
    await asyncio.sleep(2)  # Simulating download time
    print(f"Finished download: {name}")
    return f"data_{name}"

# Concurrent execution
async def main():
    start = time.time()
    results = await asyncio.gather(
        download_file("file1"),
        download_file("file2"),
        download_file("file3")
    )
    print(f"Total time: {time.time() - start:.2f} seconds")
    # Output: ~2 seconds (all running concurrently)

asyncio.run(main())

Use async/await when:
- Making multiple network requests (APIs, web scraping)
- Performing database operations with many queries
- Reading/writing multiple files
- Handling WebSocket connections
- Building web servers that handle many concurrent connections
- Any I/O-bound operations that involve waiting
Don't use async/await when:
- Performing CPU-intensive calculations (use multiprocessing instead)
- Simple scripts with sequential operations
- When there's no I/O waiting time to optimize
- Working with libraries that don't support async (creates complexity without benefit)
The event loop is the heart of async programming in Python. It's a continuously running loop that:
- Checks which coroutines are ready to run
- Executes them until they hit an await statement
- Switches to other coroutines that are ready
- Returns to previous coroutines when their awaited operations complete
Event Loop Cycle:
┌─────────────────────────────────────┐
│ Check ready coroutines │
│ ↓ │
│ Execute coroutine until 'await' │
│ ↓ │
│ Switch to another coroutine │
│ ↓ │
│ Check for completed I/O operations │
│ ↓ │
└─────────────────────────────────────┘
            ↺ (repeat indefinitely)
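The cycle above can be observed directly: when a coroutine hits an await, the loop switches to whichever coroutine is ready next. A minimal sketch (the worker names and the zero-second sleep are illustrative; asyncio.sleep(0) simply yields control back to the loop):

```python
import asyncio

order = []  # records the interleaving of the two coroutines

async def worker(name):
    order.append(f"{name}-start")
    await asyncio.sleep(0)  # yield control back to the event loop
    order.append(f"{name}-end")

async def main():
    # Both coroutines start; each pauses at its await, letting the other run
    await asyncio.gather(worker("A"), worker("B"))

asyncio.run(main())
print(order)  # ['A-start', 'B-start', 'A-end', 'B-end']
```

Both workers reach their first print before either finishes, which is exactly the switching the diagram describes.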
A coroutine is a special function that can pause its execution and resume later. In Python, you create a coroutine by defining a function with async def.
# This is a coroutine function
async def my_coroutine():
    print("Coroutine is running")
    await asyncio.sleep(1)
    print("Coroutine finished")
    return "result"

# Calling it returns a coroutine object (not the result!)
coro = my_coroutine()
print(type(coro))  # <class 'coroutine'>

# You need to await it or run it in an event loop
result = asyncio.run(my_coroutine())  # Runs the coroutine

A Task is a wrapper around a coroutine that schedules it to run on the event loop. Tasks allow coroutines to run concurrently.
async def main():
    # Create a task (starts running immediately)
    task = asyncio.create_task(my_coroutine())

    # Do other work while task runs in background
    print("Task is running in background")

    # Wait for task to complete
    result = await task

A Future is a low-level object that represents a result that will be available in the future. Most of the time, you'll work with coroutines and tasks rather than futures directly. Futures are the building blocks that tasks are built upon.
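For illustration, here is a minimal sketch of a Future wired up by hand (the set_after helper and the short delay are made up for the example): awaiting the future suspends until some other coroutine calls set_result() on it.

```python
import asyncio

async def set_after(fut, delay, value):
    # Producer side: fill in the future once the value is known
    await asyncio.sleep(delay)
    fut.set_result(value)

async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()  # low-level Future tied to this loop
    asyncio.create_task(set_after(fut, 0.01, "hello"))
    result = await fut  # suspends here until set_result is called
    return result

result = asyncio.run(main())
print(result)  # hello
```

Tasks do essentially this bookkeeping for you, which is why you rarely create futures directly.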
- async def: defines a coroutine function
- await: pauses execution until the awaited coroutine completes
- async with: asynchronous context manager
- async for: asynchronous iteration
async def example():
    # await pauses this coroutine until do_something() completes
    result = await do_something()

    # async with for resources that need async setup/cleanup
    async with aiohttp.ClientSession() as session:
        pass

    # async for to iterate over async generators
    async for item in async_generator():
        process(item)

Any function defined with async def becomes a coroutine function:
import asyncio

async def greet(name):
    """A simple async function"""
    await asyncio.sleep(1)  # Simulate some async work
    return f"Hello, {name}!"

# This creates a coroutine object, but doesn't run it yet
coro = greet("Alice")

You can only use await inside an async function:
async def main():
    # Correct: await the coroutine
    result = await greet("Alice")
    print(result)  # "Hello, Alice!"

    # Wrong: this would just create a coroutine object without running it
    # result = greet("Alice")  # ⚠️ Produces a coroutine, not a result

asyncio.run(main())

To run async code from synchronous code (like the top level of your script), use asyncio.run():
import asyncio

async def main():
    print("Starting")
    await asyncio.sleep(1)
    print("Finished")

# Entry point: runs the main coroutine
if __name__ == "__main__":
    asyncio.run(main())

Important:
- asyncio.run() creates a new event loop, runs the coroutine, and closes the loop
- Call it only once, at the program entry point
- Don't call it inside an async function (the loop is already running)
See the complete example in examples/01_basic_async.py
import asyncio
import time

async def make_coffee():
    print("Starting to make coffee...")
    await asyncio.sleep(2)  # Brewing takes 2 seconds
    print("Coffee is ready!")
    return "Hot Coffee"

async def make_toast():
    print("Starting to make toast...")
    await asyncio.sleep(1)  # Toasting takes 1 second
    print("Toast is ready!")
    return "Crispy Toast"

async def make_breakfast():
    start = time.time()
    # Sequential: total time = 2 + 1 = 3 seconds
    coffee = await make_coffee()
    toast = await make_toast()
    elapsed = time.time() - start
    print(f"Breakfast ready in {elapsed:.1f} seconds: {coffee} and {toast}")

asyncio.run(make_breakfast())

asyncio.gather() runs multiple coroutines concurrently and waits for all of them to complete:
import asyncio

async def fetch_data(id, delay):
    await asyncio.sleep(delay)
    return f"Data {id}"

async def main():
    # Run all three concurrently
    results = await asyncio.gather(
        fetch_data(1, 2),
        fetch_data(2, 1),
        fetch_data(3, 3)
    )
    print(results)  # ['Data 1', 'Data 2', 'Data 3']

asyncio.run(main())

See examples/03_gather_example.py for more details.
Key features:
- Returns results in the same order as the coroutines were passed
- If one coroutine raises an exception, gather() raises it (unless return_exceptions=True)
- All coroutines start immediately when gather() is called
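Both the ordering and the concurrency claims are easy to verify with a short, self-contained sketch (delays shortened to fractions of a second): task 1 has the longest delay yet still appears first in the results, and the total time tracks the longest delay, not the sum.

```python
import asyncio
import time

async def fetch_data(id, delay):
    await asyncio.sleep(delay)
    return f"Data {id}"

async def main():
    start = time.monotonic()
    # Task 1 is slowest, yet its result still comes first in the list
    results = await asyncio.gather(
        fetch_data(1, 0.3),
        fetch_data(2, 0.1),
        fetch_data(3, 0.2),
    )
    elapsed = time.monotonic() - start
    return results, elapsed

results, elapsed = asyncio.run(main())
print(results)  # ['Data 1', 'Data 2', 'Data 3'] -- argument order, not finish order
print(elapsed < 0.6)  # True: ~0.3s total, not 0.6s (the sum of the delays)
```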
create_task() schedules a coroutine to run in the background:

async def background_work():
    await asyncio.sleep(2)
    print("Background work done")

async def main():
    # Start the task (begins running immediately)
    task = asyncio.create_task(background_work())

    # Do other work
    print("Doing other things...")
    await asyncio.sleep(1)

    # Wait for the background task to finish
    await task

asyncio.run(main())

See examples/04_task_creation.py for more details.

Difference from gather():
- create_task() returns immediately; the task runs in the background
- gather() waits for all coroutines to complete
- Use create_task() when you want to start something and wait for it later
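A small sketch of that difference (the event list and the short delays are illustrative): the task starts running at main's first await, makes progress while main is sleeping, and is already finished by the time it is finally awaited.

```python
import asyncio

events = []  # records who runs when

async def background_work():
    events.append("background started")
    await asyncio.sleep(0.05)
    events.append("background done")

async def main():
    task = asyncio.create_task(background_work())  # begins at the next await point
    events.append("main continues")  # runs before the task gets its first turn
    await asyncio.sleep(0.1)  # background work finishes during this wait
    await task  # already done; completes immediately
    events.append("main done")

asyncio.run(main())
print(events)
# ['main continues', 'background started', 'background done', 'main done']
```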
asyncio.wait_for() raises asyncio.TimeoutError when an operation exceeds its deadline:

async def slow_operation():
    await asyncio.sleep(5)
    return "Done"

async def main():
    try:
        result = await asyncio.wait_for(slow_operation(), timeout=2.0)
    except asyncio.TimeoutError:
        print("Operation timed out!")

asyncio.run(main())

asyncio.wait() offers finer control, such as returning as soon as the first task completes:

async def main():
    tasks = [
        asyncio.create_task(fetch_data(1, 1)),
        asyncio.create_task(fetch_data(2, 2)),
        asyncio.create_task(fetch_data(3, 3))
    ]
    # Wait for the first task to complete
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    print(f"First result: {done.pop().result()}")
    # Cancel remaining tasks
    for task in pending:
        task.cancel()

asyncio.run(main())

asyncio.as_completed() yields results in the order they finish:

async def main():
    coros = [fetch_data(i, 3 - i) for i in range(1, 4)]
    for coro in asyncio.as_completed(coros):
        result = await coro
        print(f"Got result: {result}")
    # Results arrive in completion order, not creation order

asyncio.run(main())

Async context managers handle resources that need async setup or cleanup:
class AsyncResource:
    async def __aenter__(self):
        print("Acquiring resource...")
        await asyncio.sleep(1)
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("Releasing resource...")
        await asyncio.sleep(1)

    async def do_work(self):
        print("Working...")
        await asyncio.sleep(1)

async def main():
    async with AsyncResource() as resource:
        await resource.do_work()
    # Resource is automatically released here

asyncio.run(main())

See examples/05_async_context_manager.py for a complete example.
Async iterators allow you to iterate over data that arrives asynchronously:
async def async_range(count):
    """An async generator"""
    for i in range(count):
        await asyncio.sleep(1)
        yield i

async def main():
    async for number in async_range(5):
        print(f"Got number: {number}")

asyncio.run(main())

Async comprehensions:

async def main():
    # Async list comprehension
    results = [x async for x in async_range(5)]
    print(results)  # [0, 1, 2, 3, 4]

    # Async dict comprehension
    squared = {x: x**2 async for x in async_range(5)}

asyncio.run(main())

See examples/06_async_iterator.py for more details.
Semaphores limit the number of concurrent operations:
async def fetch_url(url, semaphore):
    async with semaphore:
        # Only N tasks can be here at once
        print(f"Fetching {url}")
        await asyncio.sleep(1)
        return f"Content from {url}"

async def main():
    # Limit to 3 concurrent fetches
    semaphore = asyncio.Semaphore(3)
    urls = [f"http://example.com/{i}" for i in range(10)]
    tasks = [fetch_url(url, semaphore) for url in urls]
    results = await asyncio.gather(*tasks)

asyncio.run(main())

Locks ensure mutual exclusion:
lock = asyncio.Lock()
shared_resource = 0

async def increment():
    global shared_resource
    async with lock:
        # Only one coroutine can be here at a time
        temp = shared_resource
        await asyncio.sleep(0.1)  # Simulate work
        shared_resource = temp + 1

async def main():
    await asyncio.gather(*[increment() for _ in range(10)])
    print(shared_resource)  # 10 (without the lock, it might be less)

asyncio.run(main())

See examples/07_semaphore_rate_limiting.py for a real-world example.
Queues allow coroutines to communicate safely:
async def producer(queue, producer_id):
    for i in range(5):
        item = f"item-{producer_id}-{i}"
        await queue.put(item)
        print(f"Produced: {item}")
        await asyncio.sleep(0.5)

async def consumer(queue, consumer_id):
    while True:
        item = await queue.get()
        print(f"Consumer {consumer_id} consumed: {item}")
        await asyncio.sleep(1)
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=10)

    # Start producers and consumers
    producers = [asyncio.create_task(producer(queue, i)) for i in range(2)]
    consumers = [asyncio.create_task(consumer(queue, i)) for i in range(3)]

    # Wait for all items to be produced
    await asyncio.gather(*producers)

    # Wait for the queue to be empty
    await queue.join()

    # Cancel the consumers
    for c in consumers:
        c.cancel()

asyncio.run(main())

See examples/08_producer_consumer.py for a complete example.
Error handling works similarly to synchronous code, but with some nuances:
async def risky_operation():
    await asyncio.sleep(1)
    raise ValueError("Something went wrong!")

async def main():
    try:
        await risky_operation()
    except ValueError as e:
        print(f"Caught error: {e}")

asyncio.run(main())

With gather() and multiple coroutines:
async def may_fail(n):
    if n == 2:
        raise ValueError(f"Task {n} failed")
    return f"Result {n}"

async def main():
    # By default, gather raises the first exception
    try:
        results = await asyncio.gather(
            may_fail(1),
            may_fail(2),
            may_fail(3)
        )
    except ValueError as e:
        print(f"Caught: {e}")

    # With return_exceptions=True, exceptions are returned as results
    results = await asyncio.gather(
        may_fail(1),
        may_fail(2),
        may_fail(3),
        return_exceptions=True
    )
    print(results)  # ['Result 1', ValueError(...), 'Result 3']

asyncio.run(main())

See examples/10_error_handling.py for more patterns.
Tasks can be cancelled:

async def long_running():
    try:
        print("Starting...")
        await asyncio.sleep(10)
        print("Finished!")
    except asyncio.CancelledError:
        print("Task was cancelled!")
        raise  # Important: re-raise to complete cancellation

async def main():
    task = asyncio.create_task(long_running())
    await asyncio.sleep(2)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("Main: task was cancelled")

asyncio.run(main())

Timeouts with wait_for():

async def main():
    try:
        await asyncio.wait_for(long_running(), timeout=2.0)
    except asyncio.TimeoutError:
        print("Timed out!")

asyncio.run(main())

Sometimes you need to call blocking (synchronous) code from async code. Use run_in_executor():
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_operation(n):
    """A regular synchronous function that blocks"""
    print(f"Blocking for {n} seconds...")
    time.sleep(n)
    return f"Blocked for {n}s"

async def main():
    loop = asyncio.get_running_loop()
    # Run the blocking code in a thread pool
    with ThreadPoolExecutor() as executor:
        result = await loop.run_in_executor(
            executor,
            blocking_operation,
            3
        )
    print(result)

asyncio.run(main())

When to use:
- Calling legacy synchronous libraries
- File I/O with libraries that don't support async
- CPU-bound operations (use ProcessPoolExecutor for these)
See examples/11_mixing_sync_async.py for examples.
Using aiohttp for concurrent HTTP requests:
import aiohttp
import asyncio

async def fetch_page(session, url):
    async with session.get(url) as response:
        return await response.text()

async def main():
    urls = [
        'https://example.com',
        'https://example.org',
        'https://example.net'
    ]
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_page(session, url) for url in urls]
        pages = await asyncio.gather(*tasks)
    for url, page in zip(urls, pages):
        print(f"{url}: {len(page)} bytes")

asyncio.run(main())

See examples/09_http_requests.py for a complete example.
Using aiofiles for non-blocking file operations:
import aiofiles
import asyncio

async def read_file(filename):
    async with aiofiles.open(filename, 'r') as f:
        contents = await f.read()
    return contents

async def write_file(filename, content):
    async with aiofiles.open(filename, 'w') as f:
        await f.write(content)

async def main():
    # Write and read multiple files concurrently
    await asyncio.gather(
        write_file('file1.txt', 'Content 1'),
        write_file('file2.txt', 'Content 2'),
        write_file('file3.txt', 'Content 3')
    )
    contents = await asyncio.gather(
        read_file('file1.txt'),
        read_file('file2.txt'),
        read_file('file3.txt')
    )
    print(contents)

asyncio.run(main())

Using asyncpg for PostgreSQL:
import asyncpg
import asyncio

async def fetch_users(pool):
    async with pool.acquire() as connection:
        rows = await connection.fetch('SELECT * FROM users')
    return rows

async def main():
    # Create a connection pool
    pool = await asyncpg.create_pool(
        user='user',
        password='password',
        database='mydb',
        host='localhost'
    )
    # Run multiple queries concurrently
    results = await asyncio.gather(
        fetch_users(pool),
        fetch_users(pool),
        fetch_users(pool)
    )
    await pool.close()

asyncio.run(main())

Combining semaphores with delays:
async def rate_limited_fetch(url, semaphore, delay=1.0):
    async with semaphore:
        print(f"Fetching {url}")
        # Your fetch logic here
        await asyncio.sleep(0.5)
        # Wait before releasing the semaphore slot
        await asyncio.sleep(delay)
        return f"Data from {url}"

async def main():
    # Allow 5 concurrent requests, with a 1-second pause inside each slot
    semaphore = asyncio.Semaphore(5)
    urls = [f"https://api.example.com/item/{i}" for i in range(20)]
    tasks = [rate_limited_fetch(url, semaphore) for url in urls]
    results = await asyncio.gather(*tasks)

asyncio.run(main())

See examples/12_real_world_api.py for a complete production-ready example.
Avoid async when:

- CPU-bound tasks: use multiprocessing instead

# Bad: async won't help with CPU-bound work
async def compute():
    return sum(i**2 for i in range(10_000_000))

# Good: use multiprocessing
from concurrent.futures import ProcessPoolExecutor

def compute():
    return sum(i**2 for i in range(10_000_000))

with ProcessPoolExecutor() as executor:
    result = executor.submit(compute).result()  # submit returns a Future

- No I/O waiting: if your code doesn't wait for I/O, async adds overhead
- Simple sequential scripts: async adds complexity without benefit
- Libraries don't support it: forcing async onto sync libraries creates messy code
I/O-bound (good for async):
- Network requests
- File operations
- Database queries
- User input
- Any operation where you're waiting for external resources
CPU-bound (bad for async):
- Mathematical calculations
- Image processing
- Video encoding
- Data parsing
- Cryptographic operations
Hybrid approach:
async def process_data():
    # I/O-bound: fetch data (async)
    data = await fetch_from_api()

    # CPU-bound: process data (run in a process pool)
    loop = asyncio.get_running_loop()
    with ProcessPoolExecutor() as executor:
        result = await loop.run_in_executor(executor, heavy_computation, data)
    return result

- Connection pooling: reuse connections instead of creating new ones
# Bad: creates a new session for every request
async def fetch(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()

# Good: reuse one session
async def fetch_all(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_one(session, url) for url in urls]
        return await asyncio.gather(*tasks)

async def fetch_one(session, url):
    async with session.get(url) as response:
        return await response.text()

- Limit concurrency: don't overwhelm resources

# Use semaphores to limit concurrent operations
semaphore = asyncio.Semaphore(10)  # Max 10 concurrent

- Batch operations: group operations when possible

# Instead of many small queries
results = [await db.query(id) for id in ids]
# Use bulk queries
results = await db.query_many(ids)

- Monitor performance: use timing to verify async is helping

import time
start = time.time()
results = await asyncio.gather(*tasks)
print(f"Completed in {time.time() - start:.2f}s")
Use pytest with pytest-asyncio:
import pytest

@pytest.mark.asyncio
async def test_async_function():
    result = await my_async_function()
    assert result == expected_value

@pytest.mark.asyncio
async def test_with_mock():
    # Mock async functions
    async def mock_fetch():
        return "mocked data"

    result = await process_data(mock_fetch)
    assert result == "processed: mocked data"

# Wrong: creates a coroutine but doesn't run it
async def main():
    result = my_async_function()  # ⚠️ Forgot await!
    print(result)  # Prints <coroutine object ...>

# Correct
async def main():
    result = await my_async_function()  # ✓ Proper await
    print(result)

Python will warn you: RuntimeWarning: coroutine was never awaited
# Bad: blocks the entire event loop
async def bad_async():
    time.sleep(5)  # ⚠️ Blocks everything!
    return "done"

# Good: allows other coroutines to run
async def good_async():
    await asyncio.sleep(5)  # ✓ Non-blocking
    return "done"

Rule: never use blocking operations in async functions without run_in_executor()
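On Python 3.9 and later, asyncio.to_thread() is a convenient shorthand for the same idea: it runs a blocking call in the default thread pool without managing an executor yourself. A minimal sketch (blocking_io is a stand-in for any synchronous call):

```python
import asyncio
import time

def blocking_io():
    # A regular synchronous function that would stall the event loop
    time.sleep(0.1)
    return "done"

async def main():
    # Runs blocking_io in a worker thread; the event loop stays responsive
    result = await asyncio.to_thread(blocking_io)
    return result

result = asyncio.run(main())
print(result)  # done
```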
# Bad: the exception is silently ignored
async def main():
    task = asyncio.create_task(may_fail())
    # Task runs in the background; the exception is never seen
    await asyncio.sleep(5)

# Good: wait for the task to surface exceptions
async def main():
    task = asyncio.create_task(may_fail())
    await asyncio.sleep(5)
    await task  # ✓ Exception will be raised here

# Also good: use gather to handle multiple tasks
async def main():
    await asyncio.gather(
        may_fail(),
        may_fail(),
        return_exceptions=True  # Or handle exceptions explicitly
    )

# Wrong: can't call async from sync directly
def sync_function():
    result = await async_function()  # ⚠️ SyntaxError!

# Correct: use asyncio.run() or run_in_executor()
def sync_function():
    result = asyncio.run(async_function())  # ✓

# Wrong: creating a new event loop inside async code
async def async_function():
    result = asyncio.run(another_async())  # ⚠️ Loop already running!

# Correct: just await it
async def async_function():
    result = await another_async()  # ✓

# Bad: the session may not be closed properly
async def fetch(url):
    session = aiohttp.ClientSession()
    response = await session.get(url)
    return await response.text()
    # ⚠️ Session never closed!

# Good: use a context manager
async def fetch(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.text()
    # ✓ Automatically closed

# Bad: creates 1 million concurrent tasks
async def main():
    urls = [f"http://api.com/{i}" for i in range(1_000_000)]
    await asyncio.gather(*[fetch(url) for url in urls])
    # ⚠️ May exhaust resources!

# Good: use a semaphore to limit concurrency
async def main():
    semaphore = asyncio.Semaphore(100)
    urls = [f"http://api.com/{i}" for i in range(1_000_000)]
    await asyncio.gather(*[fetch(url, semaphore) for url in urls])
    # ✓ Max 100 concurrent requests

| Function | Purpose | Example |
|---|---|---|
| asyncio.run(coro) | Run a coroutine (entry point) | asyncio.run(main()) |
| await coro | Wait for a coroutine to complete | result = await fetch() |
| asyncio.create_task(coro) | Schedule a coroutine as a task | task = asyncio.create_task(work()) |
| asyncio.gather(*coros) | Run multiple coroutines concurrently | results = await asyncio.gather(a(), b()) |
| asyncio.wait_for(coro, timeout) | Run with a timeout | await asyncio.wait_for(slow(), 5.0) |
| asyncio.sleep(seconds) | Async sleep (non-blocking) | await asyncio.sleep(1) |
| asyncio.Queue() | Async queue for communication | queue = asyncio.Queue() |
| asyncio.Semaphore(n) | Limit concurrent operations | sem = asyncio.Semaphore(10) |
| asyncio.Lock() | Mutual exclusion | lock = asyncio.Lock() |
| loop.run_in_executor(ex, fn, *args) | Run sync code in a thread/process | await loop.run_in_executor(None, sync_fn) |
Define an async function:

async def my_function():
    await asyncio.sleep(1)
    return "result"

Run from the top level:

if __name__ == "__main__":
    asyncio.run(main())

Run coroutines concurrently:

results = await asyncio.gather(task1(), task2(), task3())

Add a timeout:

try:
    result = await asyncio.wait_for(slow_task(), timeout=5.0)
except asyncio.TimeoutError:
    print("Timed out!")

Limit concurrency:

semaphore = asyncio.Semaphore(5)

async def limited_task():
    async with semaphore:
        await do_work()

Async context manager:

async with resource() as r:
    await r.do_work()

Async iteration:

async for item in async_generator():
    process(item)

Handle errors:

try:
    result = await risky_operation()
except Exception as e:
    handle_error(e)

Run work in the background:

task = asyncio.create_task(background_work())
# Do other things...
result = await task  # Wait for it when needed

Do you need async?
│
├─ Are you waiting for I/O? (network, files, database)
│ ├─ Yes → Use async ✓
│ └─ No → Don't use async
│
├─ Are you doing CPU-intensive work?
│ ├─ Yes → Use multiprocessing, not async
│ └─ No → Continue
│
├─ Do you have multiple I/O operations that can run concurrently?
│ ├─ Yes → Use async with gather() or create_task() ✓
│ └─ No → Async might not help
│
└─ Do your libraries support async?
├─ Yes → Use async ✓
└─ No → Use run_in_executor() or stick with sync
- Official Documentation: asyncio — Asynchronous I/O
- PEP 492: Coroutines with async and await syntax
- Libraries:
  - aiohttp: async HTTP client/server
  - aiofiles: async file operations
  - asyncpg: async PostgreSQL driver
  - motor: async MongoDB driver
  - httpx: modern async HTTP client
- Write an async program that fetches data from 5 URLs concurrently
- Implement a rate-limited API client using semaphores
- Create a producer-consumer system with multiple producers and consumers
- Build an async web scraper that respects rate limits
- Implement retry logic with exponential backoff for async operations
See the examples/ directory for complete, runnable implementations of these concepts!