Broadcasting is the ability to send messages to multiple WebSocket connections simultaneously. WsForge provides efficient, lock-free broadcasting mechanisms perfect for real-time applications.
- Overview
- Broadcasting Methods
- Basic Examples
- Advanced Patterns
- Performance Considerations
- Best Practices
- Real-World Examples
Broadcasting allows you to:
- Send messages to all connected clients
- Notify users about events (user joined/left, new content)
- Synchronize state across clients
- Implement chat rooms and game lobbies
- Build collaborative applications
WsForge uses DashMap for connection management, providing lock-free concurrent access with O(n) broadcast complexity where n is the number of connections.
Send a message to every connected client:
use wsforge::prelude::*;
use std::sync::Arc;
async fn broadcast_all(
msg: Message,
State(manager): State<Arc<ConnectionManager>>,
) -> Result<()> {
manager.broadcast(msg);
Ok(())
}
Use cases: Server announcements, global events, system messages
Send to everyone except the message sender:
use wsforge::prelude::*;
use std::sync::Arc;
async fn broadcast_others(
msg: Message,
conn: Connection,
State(manager): State<Arc<ConnectionManager>>,
) -> Result<()> {
manager.broadcast_except(conn.id(), msg);
Ok(())
}
Use cases: Chat messages, user actions, multiplayer game moves
Send to specific connections:
use wsforge::prelude::*;
use std::sync::Arc;
async fn broadcast_to_room(
msg: Message,
State(manager): State<Arc<ConnectionManager>>,
) -> Result<()> {
let room_members = vec![
"conn_1".to_string(),
"conn_5".to_string(),
"conn_10".to_string(),
];
manager.broadcast_to(&room_members, msg);
Ok(())
}
Use cases: Private groups, game rooms, team channels
use wsforge::prelude::*;
use std::sync::Arc;
use serde::{Deserialize, Serialize};
#[derive(Deserialize, Serialize)]
struct ChatMessage {
username: String,
text: String,
timestamp: u64,
}
async fn chat_handler(
Json(msg): Json<ChatMessage>,
conn: Connection,
State(manager): State<Arc<ConnectionManager>>,
) -> Result<()> {
println!("{}: {}", msg.username, msg.text);
// Broadcast to everyone except sender
let json = serde_json::to_string(&msg)?;
manager.broadcast_except(conn.id(), Message::text(json));
Ok(())
}
#[tokio::main]
async fn main() -> Result<()> {
let router = Router::new()
.default_handler(handler(chat_handler))
.on_connect(|manager, conn_id| {
println!("✅ {} joined", conn_id);
// Notify everyone about new user
let msg = format!(r#"{{"type":"join","user":"{}"}}"#, conn_id);
manager.broadcast(Message::text(msg));
})
.on_disconnect(|manager, conn_id| {
println!("❌ {} left", conn_id);
// Notify everyone about user leaving
let msg = format!(r#"{{"type":"leave","user":"{}"}}"#, conn_id);
manager.broadcast(Message::text(msg));
});
router.listen("127.0.0.1:8080").await?;
Ok(())
}
use wsforge::prelude::*;
use std::sync::Arc;
use serde::Serialize;
#[derive(Serialize)]
struct DashboardUpdate {
metric: String,
value: f64,
timestamp: u64,
}
async fn update_dashboard(
State(manager): State<Arc<ConnectionManager>>,
) -> Result<()> {
let update = DashboardUpdate {
metric: "cpu_usage".to_string(),
value: 45.2,
timestamp: current_timestamp(),
};
let json = serde_json::to_string(&update)?;
manager.broadcast(Message::text(json));
Ok(())
}
// Periodic updates
async fn start_periodic_updates(manager: Arc<ConnectionManager>) {
use tokio::time::{interval, Duration};
let mut ticker = interval(Duration::from_secs(5));
loop {
ticker.tick().await;
let update = DashboardUpdate {
metric: "active_users".to_string(),
value: manager.count() as f64,
timestamp: current_timestamp(),
};
let json = serde_json::to_string(&update).unwrap();
manager.broadcast(Message::text(json));
}
}
fn current_timestamp() -> u64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs()
}
use wsforge::prelude::*;
use std::sync::Arc;
use std::collections::HashMap;
use tokio::sync::RwLock;
#[derive(Clone)]
struct RoomManager {
rooms: Arc<RwLock<HashMap<String, Vec<String>>>>,
}
impl RoomManager {
fn new() -> Self {
Self {
rooms: Arc::new(RwLock::new(HashMap::new())),
}
}
async fn join_room(&self, room: String, conn_id: String) {
let mut rooms = self.rooms.write().await;
rooms.entry(room).or_insert_with(Vec::new).push(conn_id);
}
async fn leave_room(&self, room: &str, conn_id: &str) {
let mut rooms = self.rooms.write().await;
if let Some(members) = rooms.get_mut(room) {
members.retain(|id| id != conn_id);
}
}
async fn get_room_members(&self, room: &str) -> Vec<String> {
let rooms = self.rooms.read().await;
rooms.get(room).cloned().unwrap_or_default()
}
}
async fn room_message_handler(
Json(msg): Json<RoomMessage>,
conn: Connection,
State(room_mgr): State<Arc<RoomManager>>,
State(conn_mgr): State<Arc<ConnectionManager>>,
) -> Result<()> {
let members = room_mgr.get_room_members(&msg.room).await;
let json = serde_json::to_string(&msg)?;
conn_mgr.broadcast_to(&members, Message::text(json));
Ok(())
}
#[derive(serde::Deserialize, serde::Serialize)]
struct RoomMessage {
room: String,
text: String,
}
use wsforge::prelude::*;
use std::sync::Arc;
async fn broadcast_to_admins(
msg: Message,
State(manager): State<Arc<ConnectionManager>>,
State(user_roles): State<Arc<UserRoles>>,
) -> Result<()> {
let admin_connections: Vec<String> = manager
.all_ids()
.into_iter()
.filter(|id| user_roles.is_admin(id))
.collect();
manager.broadcast_to(&admin_connections, msg);
Ok(())
}
struct UserRoles {
admins: std::collections::HashSet<String>,
}
impl UserRoles {
fn is_admin(&self, conn_id: &str) -> bool {
self.admins.contains(conn_id)
}
}
use wsforge::prelude::*;
use std::sync::Arc;
async fn smart_broadcast(
msg: Message,
State(manager): State<Arc<ConnectionManager>>,
) -> Result<()> {
let connections = manager.all_connections();
for conn in connections {
// Only send to connections that meet certain criteria
if should_receive_message(&conn, &msg) {
conn.send(msg.clone())?;
}
}
Ok(())
}
fn should_receive_message(conn: &Connection, msg: &Message) -> bool {
// Custom logic: check subscriptions, permissions, etc.
true
}
WsForge broadcasting is designed for performance:
- Lock-Free: Uses DashMap for concurrent access without locks
- Parallel: Each send operation is independent
- Non-Blocking: Failed sends don't block others
Performance metrics:
- Broadcasting to 1,000 connections: ~1ms
- Broadcasting to 10,000 connections: ~10ms
- Memory overhead: Minimal (message is cloned, not duplicated)
async fn optimized_broadcast(
data: &serde_json::Value,
State(manager): State<Arc<ConnectionManager>>,
) -> Result<()> {
// Serialize once
let json = serde_json::to_string(data)?;
let msg = Message::text(json);
// Broadcast pre-serialized message
manager.broadcast(msg);
Ok(())
}
async fn batch_broadcast(
updates: Vec<Update>,
State(manager): State<Arc<ConnectionManager>>,
) -> Result<()> {
// Combine multiple updates into one message
let batch = BatchUpdate { updates };
let json = serde_json::to_string(&batch)?;
manager.broadcast(Message::text(json));
Ok(())
}
use tokio::time::{interval, Duration};
async fn rate_limited_broadcast(
manager: Arc<ConnectionManager>,
) {
let mut ticker = interval(Duration::from_millis(100));
let mut pending_messages = Vec::new();
loop {
ticker.tick().await;
if !pending_messages.is_empty() {
let combined = combine_messages(&pending_messages);
manager.broadcast(combined);
pending_messages.clear();
}
}
}
fn combine_messages(messages: &[Message]) -> Message {
// Combine logic
Message::text("combined")
}
async fn safe_broadcast(
msg: Message,
State(manager): State<Arc<ConnectionManager>>,
) -> Result<()> {
// WsForge handles errors internally, but you can check results
manager.broadcast(msg.clone());
// Log broadcast stats
println!("Broadcasted to {} connections", manager.count());
Ok(())
}
#[derive(serde::Serialize, serde::Deserialize)]
#[serde(tag = "type")]
enum MessageType {
#[serde(rename = "chat")]
Chat { username: String, text: String },
#[serde(rename = "notification")]
Notification { title: String, body: String },
#[serde(rename = "update")]
Update { field: String, value: serde_json::Value },
}
async fn typed_broadcast(
msg_type: MessageType,
State(manager): State<Arc<ConnectionManager>>,
) -> Result<()> {
let json = serde_json::to_string(&msg_type)?;
manager.broadcast(Message::text(json));
Ok(())
}
use std::time::Instant;
async fn monitored_broadcast(
msg: Message,
State(manager): State<Arc<ConnectionManager>>,
) -> Result<()> {
let start = Instant::now();
let count = manager.count();
manager.broadcast(msg);
let duration = start.elapsed();
println!("Broadcasted to {} connections in {:?}", count, duration);
Ok(())
}
use wsforge::prelude::*;
use std::sync::Arc;
use serde::{Deserialize, Serialize};
#[derive(Deserialize, Serialize)]
struct PlayerMove {
player_id: u64,
x: f32,
y: f32,
action: String,
}
async fn game_handler(
Json(player_move): Json<PlayerMove>,
conn: Connection,
State(manager): State<Arc<ConnectionManager>>,
) -> Result<()> {
// Broadcast player move to all other players
let json = serde_json::to_string(&player_move)?;
manager.broadcast_except(conn.id(), Message::text(json));
Ok(())
}
use wsforge::prelude::*;
use std::sync::Arc;
use serde::{Deserialize, Serialize};
#[derive(Deserialize, Serialize)]
struct DocumentChange {
doc_id: String,
position: usize,
change_type: String,
content: String,
user_id: String,
}
async fn editor_handler(
Json(change): Json<DocumentChange>,
conn: Connection,
State(manager): State<Arc<ConnectionManager>>,
) -> Result<()> {
// Broadcast edit to all users viewing the same document
let json = serde_json::to_string(&change)?;
manager.broadcast_except(conn.id(), Message::text(json));
Ok(())
}
use wsforge::prelude::*;
use std::sync::Arc;
use tokio::time::{interval, Duration};
#[derive(serde::Serialize)]
struct PriceUpdate {
symbol: String,
price: f64,
timestamp: u64,
}
async fn start_price_feed(manager: Arc<ConnectionManager>) {
let mut ticker = interval(Duration::from_secs(1));
loop {
ticker.tick().await;
let update = PriceUpdate {
symbol: "AAPL".to_string(),
price: get_current_price("AAPL"),
timestamp: current_timestamp(),
};
let json = serde_json::to_string(&update).unwrap();
manager.broadcast(Message::text(json));
}
}
fn get_current_price(_symbol: &str) -> f64 {
// Fetch real price
150.25
}
fn current_timestamp() -> u64 {
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs()
}
async fn presence_update(
user_id: String,
status: String,
State(manager): State<Arc<ConnectionManager>>,
) -> Result<()> {
let presence = serde_json::json!({
"type": "presence",
"user_id": user_id,
"status": status,
});
let json = serde_json::to_string(&presence)?;
manager.broadcast(Message::text(json));
Ok(())
}
async fn typing_indicator(
user_id: String,
is_typing: bool,
State(manager): State<Arc<ConnectionManager>>,
) -> Result<()> {
let indicator = serde_json::json!({
"type": "typing",
"user_id": user_id,
"is_typing": is_typing,
});
let json = serde_json::to_string(&indicator)?;
manager.broadcast(Message::text(json));
Ok(())
}
Solution: Check connection count and message size
if manager.count() > 10_000 {
// Consider batching or filtering
eprintln!("Warning: Broadcasting to {} connections", manager.count());
}
Solution: Avoid storing messages, broadcast immediately
// Good: Immediate broadcast
manager.broadcast(Message::text(json));
// Bad: Storing messages
let mut pending = Vec::new();
pending.push(message); // Memory grows!
- State Management - Manage shared data
- Routing - Route messages to specific handlers
- Performance - Optimize your application
Need help? Check the FAQ or open an issue