- System Overview
- Component Architecture
- Data Models
- API Design
- Timeline Architecture
- Fan-out Strategy
- Scaling Strategy
- Failure Handling
Pulse is a distributed social feed platform designed to handle high read and write throughput while maintaining low latency and high availability.
Functional:
- User authentication & profiles
- Post creation and retrieval
- Social graph (follow/unfollow)
- Personalized timeline generation
- Celebrity user detection
Non-Functional:
- Latency: P95 < 200ms for timeline reads
- Availability: 99.9% uptime
- Consistency: Eventual consistency acceptable
- Scalability: Horizontal scaling capability
Responsibilities:
- Request validation & authentication
- Business logic orchestration
- Response formatting
Design Patterns:
- Dependency Injection (FastAPI's Depends)
- Repository pattern (database access)
- Middleware (CORS, logging)
Key Files:
services/
├── routers/
│ ├── auth.py # Authentication endpoints
│ ├── users.py # User management & social graph
│ ├── posts.py # Post CRUD operations
│ ├── timeline.py # Timeline generation
│ └── system.py # Health & metrics
└── main.py # Application entry point
Tables:
- users # User accounts & metadata
- posts # All posts
- follows              # Social graph relationships
Indexes:
- users.username (unique, B-tree)
- users.email (unique, B-tree)
- posts.author_id (B-tree)
- posts.created_at (B-tree)
- follows (follower_id, following_id) (composite, unique)
Data Structures:
- Sorted Sets: timeline:{user_id} → {post_id: timestamp}
- Strings: cache:{key} → JSON values
Memory Management:
- Per-timeline limit: 1000 posts
- Eviction: ZREMRANGEBYRANK (keep latest)
- No TTL (size-limited instead)
Fan-out Worker:
Process Flow:
1. Poll SQS queue (long polling, 20s)
2. Receive post_created events
3. Query followers from database
4. Write to each follower's timeline (Redis)
5. Delete message from queue
Error Handling:
- SQS visibility timeout: 30s
- Max retries: 3
- Dead letter queue for failures
- Idempotent writes
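The process flow above can be sketched as a boto3-style polling loop. This is a sketch, not the worker's actual source: the `fan_out` callback is injected (it queries followers and writes their Redis timelines), and the queue URL comes from configuration.

```python
import json

def process_message(sqs, queue_url, msg, fan_out):
    """Handle one post_created message, then delete it.

    `sqs` is a boto3 SQS client (any object with the same
    delete_message signature works); `fan_out` writes the post into
    follower timelines.
    """
    event = json.loads(msg["Body"])
    if event.get("event_type") == "post_created":
        fan_out(event)
    # Delete only after successful processing: if the worker crashes
    # before this line, the message reappears after the 30s
    # visibility timeout and another worker retries it.
    sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=msg["ReceiptHandle"])

def run_worker(sqs, queue_url, fan_out):
    while True:
        # Long polling: blocks up to 20s waiting for messages
        resp = sqs.receive_message(
            QueueUrl=queue_url,
            MaxNumberOfMessages=10,
            WaitTimeSeconds=20,
        )
        for msg in resp.get("Messages", []):
            process_message(sqs, queue_url, msg, fan_out)
```

Because timeline writes are idempotent, a message redelivered after a visibility-timeout expiry is safe to process twice.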
Queue Configuration:
Queue Name: pulse-post-created
Type: Standard (at-least-once delivery)
Visibility Timeout: 30 seconds
Message Retention: 4 days
Max Message Size: 256 KB
Message Format:
{
  "event_type": "post_created",
  "post_id": 12345,
  "author_id": 678,
  "is_celebrity": false,
  "timestamp": 1704268800.0
}

class User:
    id: int                      # Primary key
    username: str                # Unique, indexed
    email: str                   # Unique, indexed
    hashed_password: str         # bcrypt hash
    full_name: Optional[str]
    bio: Optional[str]
    is_active: bool = True
    is_celebrity: bool = False   # Auto-updated
    follower_count: int = 0      # Denormalized
    following_count: int = 0     # Denormalized
    created_at: datetime
    updated_at: datetime

Design Decision:
- Denormalized counts for performance
- Updated transactionally with follow/unfollow
- Alternative: COUNT query (slower)
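The transactional update can be sketched in plain SQL. The sketch below uses stdlib sqlite3 for self-containment; the service itself goes through SQLAlchemy, but the pattern is identical: edge insert and both counter bumps succeed or fail together.

```python
import sqlite3

def follow_user(conn, follower_id: int, following_id: int) -> None:
    """Insert the follow edge and bump both denormalized counters in
    one transaction, so the counts can never drift from the edges."""
    with conn:  # single transaction: commit on success, rollback on error
        conn.execute(
            "INSERT INTO follows (follower_id, following_id) VALUES (?, ?)",
            (follower_id, following_id),
        )
        conn.execute(
            "UPDATE users SET following_count = following_count + 1 WHERE id = ?",
            (follower_id,),
        )
        conn.execute(
            "UPDATE users SET follower_count = follower_count + 1 WHERE id = ?",
            (following_id,),
        )
```

If the edge insert violates the unique constraint (duplicate follow), the rollback also undoes any counter change.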
class Post:
    id: int                  # Primary key
    author_id: int           # Foreign key → users.id
    content: str             # Max 5000 chars
    created_at: datetime     # Indexed
    # Relationship
    author: User

Design Decision:
- Simple flat structure (no nested comments)
- Immutable after creation (no edits)
- Soft delete possible (add deleted_at)
class Follow:
    id: int               # Primary key
    follower_id: int      # Foreign key → users.id
    following_id: int     # Foreign key → users.id
    created_at: datetime
    # Unique constraint: (follower_id, following_id)
    # Indexes: follower_id, following_id

Design Decision:
- Bidirectional queries supported
- Prevents duplicate follows
- No "mutual follow" flag (computed)
Resource-based URLs:
✅ POST /posts
✅ GET /timeline
✅ POST /users/follow/{user_id}
❌ POST /createPost
❌ GET /getTimeline
JWT-based:
Header: Authorization: Bearer <token>
Token Payload:
{
  "user_id": 123,
  "username": "alice",
  "exp": 1704268800
}
Token Lifecycle:
- Login → Generate token (24h expiry)
- Include in requests → Validate & extract user
- Expired → 401 Unauthorized (re-login)
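The lifecycle above can be sketched with the standard library alone. This is only to show the mechanics — a real service would use a maintained library such as PyJWT, and `SECRET` here is an illustrative placeholder, not the project's key handling.

```python
import base64, hashlib, hmac, json, time

SECRET = b"change-me"  # illustrative; load from configuration in practice

def _b64(data: bytes) -> bytes:
    return base64.urlsafe_b64encode(data).rstrip(b"=")

def make_token(user_id: int, username: str, ttl: int = 24 * 3600) -> str:
    """Minimal HS256 JWT carrying the payload shown above."""
    header = _b64(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    payload = _b64(json.dumps({
        "user_id": user_id,
        "username": username,
        "exp": int(time.time()) + ttl,   # 24h expiry by default
    }).encode())
    signing_input = header + b"." + payload
    sig = _b64(hmac.new(SECRET, signing_input, hashlib.sha256).digest())
    return (signing_input + b"." + sig).decode()

def verify_token(token: str) -> dict:
    """Return the claims, or raise ValueError (mapped to 401 in the API)."""
    header, payload, sig = token.encode().split(b".")
    expected = _b64(hmac.new(SECRET, header + b"." + payload, hashlib.sha256).digest())
    if not hmac.compare_digest(sig, expected):
        raise ValueError("bad signature")
    pad = b"=" * (-len(payload) % 4)
    claims = json.loads(base64.urlsafe_b64decode(payload + pad))
    if claims["exp"] < time.time():
        raise ValueError("token expired")
    return claims
```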
Standardized Format:
{
  "detail": "User not found",
  "type": "NotFoundError"
}

HTTP Status Codes:
- 200: Success
- 201: Created
- 204: No Content (delete)
- 400: Bad Request (validation)
- 401: Unauthorized (auth)
- 404: Not Found
- 500: Internal Server Error
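One way to wire the standardized body to these status codes is a single exception-to-response mapping. The exception class names below are illustrative; in FastAPI this logic would live in an `@app.exception_handler`.

```python
class NotFoundError(Exception):
    pass

class AuthError(Exception):
    pass

# Exception type name → HTTP status; anything unmapped is a 500
STATUS_BY_TYPE = {
    "NotFoundError": 404,            # Not Found
    "AuthError": 401,                # Unauthorized
    "RequestValidationError": 400,   # Bad Request (validation)
}

def to_response(exc: Exception):
    """Map a raised exception to (status_code, standardized body)."""
    body = {"detail": str(exc), "type": type(exc).__name__}
    return STATUS_BY_TYPE.get(body["type"], 500), body
```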
Trigger: User creates post
Process:
1. Save post to database
2. Publish event to SQS
3. Worker fans out to followers
4. Write to Redis timelines
Pros: Fast reads (cached)
Cons: Write amplification
Best for: Normal users (<100K followers)
Trigger: User requests timeline
Process:
1. Query posts from followed celebrities
2. Merge with cached timeline
3. Sort by timestamp
Pros: No write amplification
Cons: Slower reads
Best for: Celebrities (>100K followers)
def get_timeline(user_id, limit=50):
    # 1. Get cached timeline (pushed posts)
    cached_posts = redis.zrevrange(f"timeline:{user_id}", 0, limit)

    # 2. Get celebrity posts (pulled posts)
    celebrity_ids = get_followed_celebrities(user_id)
    celebrity_posts = db.query(Post).filter(
        Post.author_id.in_(celebrity_ids)
    ).order_by(Post.created_at.desc()).limit(20)

    # 3. Merge and sort
    all_posts = merge_sort_by_timestamp(cached_posts, celebrity_posts)

    # 4. Return top N
    return all_posts[:limit]

Complexity:
- Cache hit: O(log N) - Redis sorted set range
- Celebrity query: O(M log M) - M = celebrity count
- Merge: O((N+M) log(N+M))
- Total: O((N+M) log(N+M)) ≈ O(N log N)
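The `merge_sort_by_timestamp` helper used above can be a two-way heap merge, since both inputs already arrive newest-first (ZREVRANGE and `ORDER BY created_at DESC`). This sketch assumes both sides have been hydrated into post objects with a `timestamp` attribute — the pseudocode glosses over hydrating the cached post IDs.

```python
import heapq

def merge_sort_by_timestamp(cached_posts, celebrity_posts):
    """Merge two newest-first post streams into one newest-first list.

    Both inputs are already sorted descending, so a 2-way heap merge
    suffices — no full re-sort is needed.
    """
    return list(heapq.merge(
        cached_posts, celebrity_posts,
        key=lambda p: p.timestamp,
        reverse=True,   # inputs and output are newest-first
    ))
```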
Write-through Cache:
# On post creation (via worker)
def add_to_timeline(user_id, post_id, timestamp):
    redis.zadd(f"timeline:{user_id}", {post_id: timestamp})
    redis.zremrangebyrank(f"timeline:{user_id}", 0, -1001)  # Keep latest 1000

Cache Invalidation:
# On follow/unfollow
def on_follow(follower_id, following_id):
    # Option 1: Clear cache (simple)
    redis.delete(f"timeline:{follower_id}")

    # Option 2: Backfill (complex, better UX)
    recent_posts = get_user_posts(following_id, limit=100)
    for post in recent_posts:
        redis.zadd(f"timeline:{follower_id}", {post.id: post.timestamp})

Current Implementation: Option 1 (clear cache) for simplicity
CELEBRITY_THRESHOLD = 100_000

def should_fanout(user):
    return user.follower_count < CELEBRITY_THRESHOLD

Synchronous vs. Asynchronous:
❌ Synchronous (naive):
def create_post(user, content):
    post = save_post(user, content)
    followers = get_followers(user.id)
    for follower in followers:
        add_to_timeline(follower.id, post.id)  # Blocking!
    return post

Problem: High follower count = timeout

✅ Asynchronous (our approach):
def create_post(user, content):
    post = save_post(user, content)
    if not user.is_celebrity:
        sqs.publish("post_created", post.id)  # Non-blocking
    return post

Advantage: API returns immediately, worker handles fan-out
# Worker optimization
def fan_out_to_followers(post_id, timestamp, follower_ids):
    BATCH_SIZE = 1000
    for i in range(0, len(follower_ids), BATCH_SIZE):
        batch = follower_ids[i:i + BATCH_SIZE]
        pipeline = redis.pipeline()
        for follower_id in batch:
            pipeline.zadd(f"timeline:{follower_id}", {post_id: timestamp})
        pipeline.execute()  # Single network round-trip per batch

Performance:
- Without batching: 10K followers = 10K Redis calls
- With batching: 10K followers = 10 Redis calls
Load Balancer (ALB)
├── API Instance 1
├── API Instance 2
└── API Instance 3
Auto-scaling triggers:
- CPU > 70%
- Request count > 1000/min

Stateless Design:
- No in-memory session storage
- JWT for authentication
- All state in Redis/Database
SQS Queue
├── Worker Instance 1
├── Worker Instance 2
└── Worker Instance 3
Auto-scaling triggers:
- Queue depth > 1000 messages
- Oldest message age > 5 minutes

Parallel Processing:
- Multiple workers reading same queue
- Visibility timeout hides in-flight messages from other workers (standard queues are still at-least-once, which is why timeline writes are idempotent)
┌──────────┐
│ Primary │ (writes)
└────┬─────┘
│ Replication
├─────────┬─────────┐
▼ ▼ ▼
Replica1 Replica2 Replica3
(reads) (reads) (reads)
Query Routing:
# Writes
post = Post(...)
primary_db.add(post)
# Reads
posts = read_replica_db.query(Post).all()

Shard by user_id:
- Shard 0: user_id % 4 == 0
- Shard 1: user_id % 4 == 1
- Shard 2: user_id % 4 == 2
- Shard 3: user_id % 4 == 3
Challenges:
- Cross-shard queries (followers on different shards)
- Rebalancing (add/remove shards)
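Routing a query to the right shard is then a modulo lookup. The sketch below is illustrative: `engines` stands in for one database engine/session per shard, and a cross-shard query (e.g. followers spread over shards) must fan out over all of them.

```python
NUM_SHARDS = 4

def shard_for(user_id: int) -> int:
    """Map a user to a shard by the user_id % 4 rule above."""
    return user_id % NUM_SHARDS

def engine_for(user_id: int, engines):
    """Pick the database engine for this user's shard.

    `engines` is a list of NUM_SHARDS engines (illustrative); queries
    spanning users on different shards must query every engine and
    merge the results in the application.
    """
    return engines[shard_for(user_id)]
```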
16,384 hash slots
Slot assignment: CRC16(key) % 16384
Example:
- Node 1: slots 0-5460
- Node 2: slots 5461-10922
- Node 3: slots 10923-16383
Key Distribution:
key = f"timeline:{user_id}"
slot = crc16(key) % 16384
node = get_node_for_slot(slot)

Detection:
def is_available():
    try:
        redis.ping()
        return True
    except redis.ConnectionError:
        return False

Fallback:
def get_timeline(user_id):
    # Try cache first
    if redis.is_available():
        return redis.get_timeline(user_id)
    # Fallback to database
    return db.query(Post).filter(...).all()

Impact:
- Latency: 50ms → 200ms
- Load: Database sees more queries
- Recovery: Automatic (cache gradually rebuilds)
Connection Pool:
engine = create_engine(
    url,
    pool_size=10,        # 10 persistent connections
    max_overflow=20,     # 20 temporary connections
    pool_pre_ping=True,  # Verify before using
)

Retry Logic:
from tenacity import retry, stop_after_attempt

@retry(stop=stop_after_attempt(3))
def save_post(post):
    db.add(post)
    db.commit()

Message Visibility:
1. Worker receives message
2. Visibility timeout starts (30s)
3. If worker crashes, message becomes visible again
4. Another worker processes it
Dead Letter Queue:
Max receives: 3
If failed 3 times → Move to DLQ
Manual intervention required
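The DLQ wiring is a queue attribute — for example, a `RedrivePolicy` passed to `sqs.set_queue_attributes`. The ARN below is illustrative; a real deployment reads it from configuration.

```python
import json

# Illustrative ARN, not the real account/queue
DLQ_ARN = "arn:aws:sqs:us-east-1:123456789012:pulse-post-created-dlq"

def redrive_attributes(max_receives: int = 3) -> dict:
    """Attributes for sqs.set_queue_attributes: once a message has
    been received `max_receives` times without being deleted, SQS
    moves it to the dead letter queue."""
    return {
        "RedrivePolicy": json.dumps({
            "deadLetterTargetArn": DLQ_ARN,
            "maxReceiveCount": str(max_receives),
        })
    }
```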
Graceful Shutdown:
def signal_handler(sig, frame):
    logger.info("Shutting down gracefully...")
    # Finish current message
    # Don't receive new messages
    sys.exit(0)

signal.signal(signal.SIGINT, signal_handler)

Recovery:
- In-flight messages: Reprocessed after visibility timeout
- New messages: Picked up by other workers
Before:
# N+1 query problem
posts = db.query(Post).all()
for post in posts:
    author = db.query(User).get(post.author_id)  # N queries!

After:
# Eager loading
posts = db.query(Post).options(
    joinedload(Post.author)
).all()  # 1 query with JOIN

# Reuse database connections
SessionLocal = sessionmaker(
autocommit=False,
autoflush=False,
bind=engine
)
# Per-request session
@app.get("/posts")
def get_posts(db: Session = Depends(get_db)):
    return db.query(Post).all()
# Session returned to pool automatically

# Bad: Multiple round-trips
for user_id in user_ids:
    redis.get(f"timeline:{user_id}")
# Good: Single round-trip
pipeline = redis.pipeline()
for user_id in user_ids:
    pipeline.get(f"timeline:{user_id}")
results = pipeline.execute()

API Layer:
- Request rate (requests/sec)
- Error rate (4xx, 5xx)
- Latency (P50, P95, P99)
Database:
- Query time
- Connection pool usage
- Slow queries (>100ms)
Redis:
- Hit rate
- Memory usage
- Evictions
Worker:
- Messages processed/sec
- Processing time
- Queue depth
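Queue depth — the metric driving worker auto-scaling — can be read straight from SQS. Sketch only: `sqs` is a boto3 client (or any compatible stub), and the queue URL comes from configuration.

```python
def queue_depth(sqs, queue_url: str) -> int:
    """Read SQS's approximate backlog size — the signal behind the
    'queue depth > 1000 messages' auto-scaling trigger."""
    resp = sqs.get_queue_attributes(
        QueueUrl=queue_url,
        AttributeNames=["ApproximateNumberOfMessages"],
    )
    return int(resp["Attributes"]["ApproximateNumberOfMessages"])
```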
@app.get("/system/health")
def health_check():
    return {
        "database": check_database(),
        "redis": check_redis(),
        "sqs": check_sqs(),
    }

Usage:
- Load balancer health checks
- Monitoring alerts
- Deployment verification
- JWT with expiration
- Bcrypt password hashing (cost=12)
- HTTPS only (production)
- User can only modify own resources
- Admin endpoints (if added) require role check
- Pydantic models validate all inputs
- SQL injection: SQLAlchemy parameterized queries
- XSS: Escaped output (if HTML rendering)
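Parameter binding is what makes the SQL-injection claim hold. The sketch below demonstrates the principle with stdlib sqlite3 (the service relies on SQLAlchemy, which binds parameters the same way): the user-supplied string is treated strictly as data, never as SQL.

```python
import sqlite3

def get_user_by_name(conn, username: str):
    """Look up a user with a bound parameter: a malicious value like
    "alice'; DROP TABLE users; --" cannot alter the query."""
    return conn.execute(
        "SELECT id, username FROM users WHERE username = ?",
        (username,),
    ).fetchone()
```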
- Not implemented (TODO)
- Production: Use Redis for rate limit counters
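A fixed-window counter is one common shape for that TODO. Sketch under stated assumptions: `redis` is a redis-py client (or any stub with `incr`/`expire`), limits are illustrative, and the `now` parameter exists only for testability.

```python
import time

def allow_request(redis, user_id: int, limit: int = 100,
                  window: int = 60, now: float = None) -> bool:
    """Fixed-window rate limiter: one INCR per request, keyed by user
    and window number; the first INCR in a window also sets the TTL
    so stale counters expire on their own."""
    if now is None:
        now = time.time()
    key = f"ratelimit:{user_id}:{int(now) // window}"
    count = redis.incr(key)
    if count == 1:
        redis.expire(key, window)
    return count <= limit
```

Fixed windows allow brief bursts at window boundaries; a sliding-window or token-bucket scheme trades more bookkeeping for smoother limits.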
- Cursor-based Pagination
  - Better for real-time feeds
  - Prevents duplicate/skipped posts
- Post Reactions (likes, comments)
  - New tables: reactions, comments
  - Counter cache in Redis
- Real-time Updates (WebSocket)
  - New posts appear without refresh
  - AWS AppSync or WebSocket API
- Content Delivery Network
  - Cache static content
  - Reduce API load
- Machine Learning
  - Personalized ranking
  - Spam detection
  - Recommendation engine
This architecture balances simplicity with production-readiness, demonstrating key distributed systems concepts while remaining demo-friendly.