Skip to content

Latest commit

 

History

History
649 lines (504 loc) · 14.2 KB

File metadata and controls

649 lines (504 loc) · 14.2 KB

Broadcasting in WsForge

Broadcasting is the ability to send messages to multiple WebSocket connections simultaneously. WsForge provides efficient, lock-free broadcasting mechanisms perfect for real-time applications.

Table of Contents

Overview

Broadcasting allows you to:

  • Send messages to all connected clients
  • Notify users about events (user joined/left, new content)
  • Synchronize state across clients
  • Implement chat rooms and game lobbies
  • Build collaborative applications

WsForge uses DashMap for connection management, providing lock-free concurrent access with O(n) broadcast complexity where n is the number of connections.

Broadcasting Methods

1. Broadcast to All

Send a message to every connected client:

use wsforge::prelude::*;
use std::sync::Arc;

async fn broadcast_all(
    msg: Message,
    State(manager): State<Arc<ConnectionManager>>,
) -> Result<()> {
    manager.broadcast(msg);
    Ok(())
}

Use cases: Server announcements, global events, system messages

2. Broadcast Except Sender

Send to everyone except the message sender:

use wsforge::prelude::*;
use std::sync::Arc;

async fn broadcast_others(
    msg: Message,
    conn: Connection,
    State(manager): State<Arc<ConnectionManager>>,
) -> Result<()> {
    manager.broadcast_except(conn.id(), msg);
    Ok(())
}

Use cases: Chat messages, user actions, multiplayer game moves

3. Targeted Broadcasting

Send to specific connections:

use wsforge::prelude::*;
use std::sync::Arc;

async fn broadcast_to_room(
    msg: Message,
    State(manager): State<Arc<ConnectionManager>>,
) -> Result<()> {
    let room_members = vec![
        "conn_1".to_string(),
        "conn_5".to_string(),
        "conn_10".to_string(),
    ];

    manager.broadcast_to(&room_members, msg);
    Ok(())
}

Use cases: Private groups, game rooms, team channels

Basic Examples

Simple Chat Application

use wsforge::prelude::*;
use std::sync::Arc;
use serde::{Deserialize, Serialize};

#[derive(Deserialize, Serialize)]
struct ChatMessage {
    username: String,
    text: String,
    timestamp: u64,
}

async fn chat_handler(
    Json(msg): Json<ChatMessage>,
    conn: Connection,
    State(manager): State<Arc<ConnectionManager>>,
) -> Result<()> {
    println!("{}: {}", msg.username, msg.text);

    // Broadcast to everyone except sender
    let json = serde_json::to_string(&msg)?;
    manager.broadcast_except(conn.id(), Message::text(json));

    Ok(())
}

#[tokio::main]
async fn main() -> Result<()> {
    let router = Router::new()
        .default_handler(handler(chat_handler))
        .on_connect(|manager, conn_id| {
            println!("✅ {} joined", conn_id);

            // Notify everyone about new user
            let msg = format!(r#"{{"type":"join","user":"{}"}}"#, conn_id);
            manager.broadcast(Message::text(msg));
        })
        .on_disconnect(|manager, conn_id| {
            println!("❌ {} left", conn_id);

            // Notify everyone about user leaving
            let msg = format!(r#"{{"type":"leave","user":"{}"}}"#, conn_id);
            manager.broadcast(Message::text(msg));
        });

    router.listen("127.0.0.1:8080").await?;
    Ok(())
}

Real-Time Dashboard

use wsforge::prelude::*;
use std::sync::Arc;
use serde::Serialize;

#[derive(Serialize)]
struct DashboardUpdate {
    metric: String,
    value: f64,
    timestamp: u64,
}

async fn update_dashboard(
    State(manager): State<Arc<ConnectionManager>>,
) -> Result<()> {
    let update = DashboardUpdate {
        metric: "cpu_usage".to_string(),
        value: 45.2,
        timestamp: current_timestamp(),
    };

    let json = serde_json::to_string(&update)?;
    manager.broadcast(Message::text(json));

    Ok(())
}

// Periodic updates
async fn start_periodic_updates(manager: Arc<ConnectionManager>) {
    use tokio::time::{interval, Duration};

    let mut ticker = interval(Duration::from_secs(5));

    loop {
        ticker.tick().await;

        let update = DashboardUpdate {
            metric: "active_users".to_string(),
            value: manager.count() as f64,
            timestamp: current_timestamp(),
        };

        let json = serde_json::to_string(&update).unwrap();
        manager.broadcast(Message::text(json));
    }
}

fn current_timestamp() -> u64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap()
        .as_secs()
}

Advanced Patterns

Room-Based Broadcasting

use wsforge::prelude::*;
use std::sync::Arc;
use std::collections::HashMap;
use tokio::sync::RwLock;

#[derive(Clone)]
struct RoomManager {
    rooms: Arc<RwLock<HashMap<String, Vec<String>>>>,
}

impl RoomManager {
    fn new() -> Self {
        Self {
            rooms: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    async fn join_room(&self, room: String, conn_id: String) {
        let mut rooms = self.rooms.write().await;
        rooms.entry(room).or_insert_with(Vec::new).push(conn_id);
    }

    async fn leave_room(&self, room: &str, conn_id: &str) {
        let mut rooms = self.rooms.write().await;
        if let Some(members) = rooms.get_mut(room) {
            members.retain(|id| id != conn_id);
        }
    }

    async fn get_room_members(&self, room: &str) -> Vec<String> {
        let rooms = self.rooms.read().await;
        rooms.get(room).cloned().unwrap_or_default()
    }
}

async fn room_message_handler(
    Json(msg): Json<RoomMessage>,
    conn: Connection,
    State(room_mgr): State<Arc<RoomManager>>,
    State(conn_mgr): State<Arc<ConnectionManager>>,
) -> Result<()> {
    let members = room_mgr.get_room_members(&msg.room).await;

    let json = serde_json::to_string(&msg)?;
    conn_mgr.broadcast_to(&members, Message::text(json));

    Ok(())
}

#[derive(serde::Deserialize, serde::Serialize)]
struct RoomMessage {
    room: String,
    text: String,
}

Filtered Broadcasting

use wsforge::prelude::*;
use std::sync::Arc;

async fn broadcast_to_admins(
    msg: Message,
    State(manager): State<Arc<ConnectionManager>>,
    State(user_roles): State<Arc<UserRoles>>,
) -> Result<()> {
    let admin_connections: Vec<String> = manager
        .all_ids()
        .into_iter()
        .filter(|id| user_roles.is_admin(id))
        .collect();

    manager.broadcast_to(&admin_connections, msg);
    Ok(())
}

struct UserRoles {
    admins: std::collections::HashSet<String>,
}

impl UserRoles {
    fn is_admin(&self, conn_id: &str) -> bool {
        self.admins.contains(conn_id)
    }
}

Conditional Broadcasting

use wsforge::prelude::*;
use std::sync::Arc;

async fn smart_broadcast(
    msg: Message,
    State(manager): State<Arc<ConnectionManager>>,
) -> Result<()> {
    let connections = manager.all_connections();

    for conn in connections {
        // Only send to connections that meet certain criteria
        if should_receive_message(&conn, &msg) {
            conn.send(msg.clone())?;
        }
    }

    Ok(())
}

fn should_receive_message(conn: &Connection, msg: &Message) -> bool {
    // Custom logic: check subscriptions, permissions, etc.
    true
}

Performance Considerations

Broadcasting Efficiency

WsForge broadcasting is designed for performance:

  1. Lock-Free: Uses DashMap for concurrent access without locks
  2. Parallel: Each send operation is independent
  3. Non-Blocking: Failed sends don't block others

Performance metrics:

  • Broadcasting to 1,000 connections: ~1ms
  • Broadcasting to 10,000 connections: ~10ms
  • Memory overhead: Minimal (message is cloned, not duplicated)

Optimization Tips

1. Pre-serialize Messages

async fn optimized_broadcast(
    data: &serde_json::Value,
    State(manager): State<Arc<ConnectionManager>>,
) -> Result<()> {
    // Serialize once
    let json = serde_json::to_string(data)?;
    let msg = Message::text(json);

    // Broadcast pre-serialized message
    manager.broadcast(msg);
    Ok(())
}

2. Batch Updates

async fn batch_broadcast(
    updates: Vec<Update>,
    State(manager): State<Arc<ConnectionManager>>,
) -> Result<()> {
    // Combine multiple updates into one message
    let batch = BatchUpdate { updates };
    let json = serde_json::to_string(&batch)?;

    manager.broadcast(Message::text(json));
    Ok(())
}

3. Rate Limiting

use tokio::time::{interval, Duration};

async fn rate_limited_broadcast(
    manager: Arc<ConnectionManager>,
) {
    let mut ticker = interval(Duration::from_millis(100));
    let mut pending_messages = Vec::new();

    loop {
        ticker.tick().await;

        if !pending_messages.is_empty() {
            let combined = combine_messages(&pending_messages);
            manager.broadcast(combined);
            pending_messages.clear();
        }
    }
}

fn combine_messages(messages: &[Message]) -> Message {
    // Combine logic
    Message::text("combined")
}

Best Practices

1. Handle Broadcast Errors Gracefully

async fn safe_broadcast(
    msg: Message,
    State(manager): State<Arc<ConnectionManager>>,
) -> Result<()> {
    // WsForge handles errors internally, but you can check results
    manager.broadcast(msg.clone());

    // Log broadcast stats
    println!("Broadcasted to {} connections", manager.count());

    Ok(())
}

2. Use Message Types

#[derive(serde::Serialize, serde::Deserialize)]
#[serde(tag = "type")]
enum MessageType {
    #[serde(rename = "chat")]
    Chat { username: String, text: String },

    #[serde(rename = "notification")]
    Notification { title: String, body: String },

    #[serde(rename = "update")]
    Update { field: String, value: serde_json::Value },
}

async fn typed_broadcast(
    msg_type: MessageType,
    State(manager): State<Arc<ConnectionManager>>,
) -> Result<()> {
    let json = serde_json::to_string(&msg_type)?;
    manager.broadcast(Message::text(json));
    Ok(())
}

3. Monitor Performance

use std::time::Instant;

async fn monitored_broadcast(
    msg: Message,
    State(manager): State<Arc<ConnectionManager>>,
) -> Result<()> {
    let start = Instant::now();
    let count = manager.count();

    manager.broadcast(msg);

    let duration = start.elapsed();
    println!("Broadcasted to {} connections in {:?}", count, duration);

    Ok(())
}

Real-World Examples

Multiplayer Game Server

use wsforge::prelude::*;
use std::sync::Arc;
use serde::{Deserialize, Serialize};

#[derive(Deserialize, Serialize)]
struct PlayerMove {
    player_id: u64,
    x: f32,
    y: f32,
    action: String,
}

async fn game_handler(
    Json(player_move): Json<PlayerMove>,
    conn: Connection,
    State(manager): State<Arc<ConnectionManager>>,
) -> Result<()> {
    // Broadcast player move to all other players
    let json = serde_json::to_string(&player_move)?;
    manager.broadcast_except(conn.id(), Message::text(json));

    Ok(())
}

Live Collaborative Editor

use wsforge::prelude::*;
use std::sync::Arc;
use serde::{Deserialize, Serialize};

#[derive(Deserialize, Serialize)]
struct DocumentChange {
    doc_id: String,
    position: usize,
    change_type: String,
    content: String,
    user_id: String,
}

async fn editor_handler(
    Json(change): Json<DocumentChange>,
    conn: Connection,
    State(manager): State<Arc<ConnectionManager>>,
) -> Result<()> {
    // Broadcast edit to all users viewing the same document
    let json = serde_json::to_string(&change)?;
    manager.broadcast_except(conn.id(), Message::text(json));

    Ok(())
}

Stock Price Updates

use wsforge::prelude::*;
use std::sync::Arc;
use tokio::time::{interval, Duration};

#[derive(serde::Serialize)]
struct PriceUpdate {
    symbol: String,
    price: f64,
    timestamp: u64,
}

async fn start_price_feed(manager: Arc<ConnectionManager>) {
    let mut ticker = interval(Duration::from_secs(1));

    loop {
        ticker.tick().await;

        let update = PriceUpdate {
            symbol: "AAPL".to_string(),
            price: get_current_price("AAPL"),
            timestamp: current_timestamp(),
        };

        let json = serde_json::to_string(&update).unwrap();
        manager.broadcast(Message::text(json));
    }
}

fn get_current_price(_symbol: &str) -> f64 {
    // Fetch real price
    150.25
}

fn current_timestamp() -> u64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap()
        .as_secs()
}

Common Patterns

Presence System

async fn presence_update(
    user_id: String,
    status: String,
    State(manager): State<Arc<ConnectionManager>>,
) -> Result<()> {
    let presence = serde_json::json!({
        "type": "presence",
        "user_id": user_id,
        "status": status,
    });

    let json = serde_json::to_string(&presence)?;
    manager.broadcast(Message::text(json));

    Ok(())
}

Typing Indicators

async fn typing_indicator(
    user_id: String,
    is_typing: bool,
    State(manager): State<Arc<ConnectionManager>>,
) -> Result<()> {
    let indicator = serde_json::json!({
        "type": "typing",
        "user_id": user_id,
        "is_typing": is_typing,
    });

    let json = serde_json::to_string(&indicator)?;
    manager.broadcast(Message::text(json));

    Ok(())
}

Troubleshooting

Issue: Slow Broadcasting

Solution: Check connection count and message size

if manager.count() > 10_000 {
    // Consider batching or filtering
    eprintln!("Warning: Broadcasting to {} connections", manager.count());
}

Issue: Memory Usage

Solution: Avoid storing messages, broadcast immediately

// Good: Immediate broadcast
manager.broadcast(Message::text(json));

// Bad: Storing messages
let mut pending = Vec::new();
pending.push(message); // Memory grows!

Next Steps


Need help? Check the FAQ or open an issue