Python WebSockets — Real-Time Communication
WebSockets enable bidirectional, real-time communication between client and server. Unlike HTTP (request-response), WebSockets maintain a persistent connection for live updates. This tutorial covers the WebSocket protocol, building servers with FastAPI, handling concurrent connections, and building real-world applications.
Learning Objectives
- Understand WebSocket vs HTTP trade-offs
- Build WebSocket servers with FastAPI
- Handle concurrent connections with a connection manager
- Implement chat and notification systems
- Add authentication to WebSocket connections
- Build a complete real-time notification system
WebSocket vs HTTP
Architecture Diagram
HTTP (Request-Response):
Client --► "Give me data" --► Server
Client ◄-- "Here is data" ◄-- Server
(Connection closes)
WebSocket (Persistent):
Client ◄---- Persistent Connection ----► Server
Client --► "Hello" ----------------------► Server
Client ◄-- "Response" ◄------------------ Server
Client ◄-- "Update" ◄------------------ Server (server can push!)
Client --► "Message" --------------------► Server
Feature Comparison
| Feature | HTTP | WebSocket |
|---|---|---|
| Connection | Request-response | Persistent, bidirectional |
| Server push | Polling/SSE only | Native |
| Overhead | Headers per request | Minimal after handshake |
| Use case | REST APIs, pages | Chat, gaming, live updates |
| Protocol | http:// | ws:// or wss:// |
| Browser support | Universal | Universal |
Use cases: Chat apps, live dashboards, multiplayer games, real-time notifications, live sports scores, collaborative editing.
FastAPI WebSocket Server
Basic Echo Server
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
app = FastAPI()
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
try:
while True:
data = await websocket.receive_text()
await websocket.send_text(f"Echo: {data}")
except WebSocketDisconnect:
print("Client disconnected")
Connection Manager
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List, Dict, Set
class ConnectionManager:
"""Manages active WebSocket connections."""
def __init__(self):
self.active_connections: List[WebSocket] = []
self.user_connections: Dict[str, Set[WebSocket]] = {}
async def connect(self, websocket: WebSocket, user_id: str = None):
await websocket.accept()
self.active_connections.append(websocket)
if user_id:
if user_id not in self.user_connections:
self.user_connections[user_id] = set()
self.user_connections[user_id].add(websocket)
def disconnect(self, websocket: WebSocket, user_id: str = None):
self.active_connections.remove(websocket)
if user_id and user_id in self.user_connections:
self.user_connections[user_id].discard(websocket)
if not self.user_connections[user_id]:
del self.user_connections[user_id]
async def send_personal(self, message: str, websocket: WebSocket):
await websocket.send_text(message)
async def send_to_user(self, user_id: str, message: str):
if user_id in self.user_connections:
for connection in self.user_connections[user_id]:
await connection.send_text(message)
async def broadcast(self, message: str):
for connection in self.active_connections:
await connection.send_text(message)
def get_online_users(self) -> List[str]:
return list(self.user_connections.keys())
manager = ConnectionManager()
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
await manager.connect(websocket, client_id)
try:
await manager.broadcast(f"{client_id} joined the chat")
while True:
data = await websocket.receive_text()
await manager.send_personal(f"You said: {data}", websocket)
await manager.broadcast(f"{client_id}: {data}")
except WebSocketDisconnect:
manager.disconnect(websocket, client_id)
await manager.broadcast(f"{client_id} left the chat")
WebSocket Client
Basic Client
import asyncio
import websockets
async def client():
async with websockets.connect("ws://localhost:8000/ws/alice") as ws:
# Send message
await ws.send("Hello, server!")
# Receive response
response = await ws.recv()
print(f"Received: {response}")
asyncio.run(client())
Multi-User Client
import asyncio
import websockets
import json
async def user_client(user_id: str, messages: list):
async with websockets.connect(f"ws://localhost:8000/ws/{user_id}") as ws:
for message in messages:
await ws.send(message)
response = await ws.recv()
print(f"{user_id} received: {response}")
async def main():
await asyncio.gather(
user_client("alice", ["Hi!", "How are you?"]),
user_client("bob", ["Hey!", "Good thanks!"]),
)
asyncio.run(main())
Real-Time Chat Application
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Query
from fastapi.responses import HTMLResponse
from typing import Dict, Set
import json
from datetime import datetime
app = FastAPI()
class ChatRoom:
def __init__(self):
self.connections: Dict[str, WebSocket] = {}
self.history: list = []
async def connect(self, user_id: str, websocket: WebSocket):
await websocket.accept()
self.connections[user_id] = websocket
await self.broadcast({
"type": "user_joined",
"user_id": user_id,
"users": list(self.connections.keys()),
"timestamp": datetime.now().isoformat(),
})
def disconnect(self, user_id: str):
if user_id in self.connections:
del self.connections[user_id]
async def broadcast(self, message: dict):
self.history.append(message)
for connection in self.connections.values():
await connection.send_json(message)
async def send_to_user(self, user_id: str, message: dict):
if user_id in self.connections:
await self.connections[user_id].send_json(message)
rooms: Dict[str, ChatRoom] = {}
@app.websocket("/ws/chat/{room_id}")
async def chat_endpoint(
websocket: WebSocket,
room_id: str,
user_id: str = Query(...)
):
if room_id not in rooms:
rooms[room_id] = ChatRoom()
room = rooms[room_id]
await room.connect(user_id, websocket)
try:
# Send chat history
for msg in room.history[-50:]: # Last 50 messages
await websocket.send_json(msg)
while True:
data = await websocket.receive_json()
if data["type"] == "message":
await room.broadcast({
"type": "message",
"user_id": user_id,
"content": data["content"],
"timestamp": datetime.now().isoformat(),
})
elif data["type"] == "private":
target = data["target"]
await room.send_to_user(target, {
"type": "private",
"from": user_id,
"content": data["content"],
"timestamp": datetime.now().isoformat(),
})
except WebSocketDisconnect:
room.disconnect(user_id)
await room.broadcast({
"type": "user_left",
"user_id": user_id,
"users": list(room.connections.keys()),
"timestamp": datetime.now().isoformat(),
})
Real-Time Notification System
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException
from fastapi.responses import HTMLResponse
from typing import Dict, Set, List
from datetime import datetime
import json
import asyncio
app = FastAPI()
class NotificationManager:
def __init__(self):
self.subscriptions: Dict[str, WebSocket] = {}
self.notification_queue: Dict[str, List[dict]] = {}
async def subscribe(self, user_id: str, ws: WebSocket):
await ws.accept()
self.subscriptions[user_id] = ws
# Send queued notifications
if user_id in self.notification_queue:
for notification in self.notification_queue[user_id]:
await ws.send_json(notification)
del self.notification_queue[user_id]
def unsubscribe(self, user_id: str):
if user_id in self.subscriptions:
del self.subscriptions[user_id]
async def notify(self, user_id: str, notification: dict):
notification["timestamp"] = datetime.now().isoformat()
if user_id in self.subscriptions:
try:
await self.subscriptions[user_id].send_json(notification)
except:
self.unsubscribe(user_id)
# Queue for later
self._queue_notification(user_id, notification)
else:
self._queue_notification(user_id, notification)
async def broadcast(self, notification: dict):
notification["timestamp"] = datetime.now().isoformat()
disconnected = []
for user_id, ws in self.subscriptions.items():
try:
await ws.send_json(notification)
except:
disconnected.append(user_id)
for user_id in disconnected:
self.unsubscribe(user_id)
def _queue_notification(self, user_id: str, notification: dict):
if user_id not in self.notification_queue:
self.notification_queue[user_id] = []
self.notification_queue[user_id].append(notification)
# Keep only last 100 queued notifications
self.notification_queue[user_id] = self.notification_queue[user_id][-100:]
manager = NotificationManager()
@app.websocket("/ws/notifications")
async def notification_endpoint(
websocket: WebSocket,
user_id: str
):
await manager.subscribe(user_id, websocket)
try:
while True:
# Keep connection alive, handle client messages
data = await websocket.receive_text()
if data == "ping":
await websocket.send_json({"type": "pong"})
except WebSocketDisconnect:
manager.unsubscribe(user_id)
# API to send notifications
@app.post("/api/notify/{user_id}")
async def send_notification(user_id: str, notification: dict):
await manager.notify(user_id, notification)
return {"status": "sent"}
@app.post("/api/broadcast")
async def broadcast_notification(notification: dict):
await manager.broadcast(notification)
return {"status": "broadcast"}
Authentication for WebSockets
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Query
import jwt
SECRET_KEY = "your-secret-key"
async def authenticate_websocket(websocket: WebSocket, token: str) -> dict:
"""Authenticate WebSocket connection via JWT token."""
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
return payload
except jwt.ExpiredSignatureError:
await websocket.close(code=4001, reason="Token expired")
return None
except jwt.InvalidTokenError:
await websocket.close(code=4002, reason="Invalid token")
return None
@app.websocket("/ws/authenticated")
async def authenticated_websocket(
websocket: WebSocket,
token: str = Query(...)
):
user = await authenticate_websocket(websocket, token)
if not user:
return
await websocket.accept()
try:
while True:
data = await websocket.receive_text()
await websocket.send_json({
"user": user["user_id"],
"message": data,
})
except WebSocketDisconnect:
pass
Common Mistakes
| Mistake | Problem | Solution |
|---|---|---|
| No heartbeat/ping | Stale connections accumulate | Implement ping/pong |
| Not handling disconnects | Memory leaks | Always use try/except WebSocketDisconnect |
| No authentication | Anyone can connect | Validate tokens on connection |
| No message validation | Crashes from bad data | Validate all incoming messages |
| Blocking operations | Blocks event loop | Use async/await for I/O |
| No connection limits | Resource exhaustion | Limit concurrent connections |
| Not broadcasting errors | Users unaware of issues | Notify on errors |
Best Practices
- Handle disconnections gracefully — always use try/except WebSocketDisconnect
- Use connection managers — track active connections efficiently
- Broadcast messages to all connected clients when needed
- Add authentication for WebSocket connections
- Use ping/pong to detect stale connections
- Limit message size — prevent abuse
- Queue messages for offline users — ensure delivery
- Use JSON for structured messages
- Add reconnection logic on the client side
- Monitor connection count — prevent resource exhaustion
Key Takeaways
- WebSockets are full-duplex (both directions simultaneously)
- Use WebSockets for real-time features (chat, gaming, live data)
- Handle disconnections gracefully with try/except
- Use connection managers to track active connections
- Broadcast messages to all connected clients when needed
- Consider authentication for WebSocket connections
- Use ping/pong to detect stale connections
- Queue messages for offline users
- Always validate incoming messages
- Monitor connection count to prevent resource exhaustion