Real-time Notifications
This guide demonstrates how to build a comprehensive real-time notification system using RouteMQ for push notifications, alerts, and real-time updates.
Overview
The notification system handles:
Real-time push notifications to web clients
Email and SMS alerts
System status notifications
User-specific notifications
Broadcast messages
Notification queuing and retry logic
Architecture
Event Sources -> RouteMQ -> Notification Router -> Multiple Channels
-> WebSocket/SSE
-> Email Service
-> SMS Service
-> Push Notifications
-> In-App NotificationsNotification Router Setup
# app/routers/notifications.py
from core.router import Router
from app.controllers.notification_controller import NotificationController
from app.middleware.auth import AuthMiddleware
from app.middleware.rate_limit import RateLimitMiddleware
from app.middleware.notification_filter import NotificationFilterMiddleware
router = Router()
# Middleware setup
auth = AuthMiddleware()
rate_limit = RateLimitMiddleware(max_requests=1000, window_seconds=60)
notification_filter = NotificationFilterMiddleware()
# Notification routes
with router.group(prefix="notifications", middleware=[auth, rate_limit]) as notifications:
# Real-time notifications
notifications.on("realtime/user/{user_id}", NotificationController.send_user_notification, qos=1)
notifications.on("realtime/broadcast", NotificationController.broadcast_notification, qos=1)
notifications.on("realtime/group/{group_id}", NotificationController.send_group_notification, qos=1)
# Alert notifications
notifications.on("alerts/critical/{alert_type}", NotificationController.handle_critical_alert, qos=2)
notifications.on("alerts/warning/{alert_type}", NotificationController.handle_warning_alert, qos=1)
notifications.on("alerts/info/{alert_type}", NotificationController.handle_info_alert, qos=0)
# System notifications
notifications.on("system/status", NotificationController.system_status_update, qos=1)
notifications.on("system/maintenance", NotificationController.maintenance_notification, qos=2)
# Email notifications (queued processing)
notifications.on("email/send", NotificationController.queue_email, qos=2, shared=True, worker_count=3)
notifications.on("email/batch", NotificationController.send_batch_email, qos=2, shared=True)
# SMS notifications
notifications.on("sms/send", NotificationController.send_sms, qos=2, shared=True)
# Push notifications (mobile)
notifications.on("push/mobile/{platform}", NotificationController.send_push_notification, qos=1)
# WebSocket connection management
with router.group(prefix="websocket", middleware=[auth]) as websocket:
websocket.on("connect/{user_id}", NotificationController.websocket_connect, qos=1)
websocket.on("disconnect/{user_id}", NotificationController.websocket_disconnect, qos=1)
websocket.on("heartbeat/{user_id}", NotificationController.websocket_heartbeat, qos=0)Notification Controller Implementation
# app/controllers/notification_controller.py
from core.controller import Controller
from core.redis_manager import redis_manager
from app.models.mail_log import MailLog
from app.models.user import User
from app.services.email_service import EmailService
from app.services.sms_service import SMSService
from app.services.push_service import PushService
from app.services.websocket_service import WebSocketService
import json
import time
import uuid
from typing import Dict, List, Any
from enum import Enum
from datetime import datetime, timedelta
class NotificationPriority(Enum):
LOW = "low"
NORMAL = "normal"
HIGH = "high"
CRITICAL = "critical"
class NotificationController(Controller):
@staticmethod
async def send_user_notification(user_id: str, payload: Dict[str, Any], client):
"""Send real-time notification to specific user"""
try:
notification_id = str(uuid.uuid4())
message = payload.get("message")
title = payload.get("title", "Notification")
priority = payload.get("priority", NotificationPriority.NORMAL.value)
notification_type = payload.get("type", "info")
data = payload.get("data", {})
if not message:
raise ValueError("Message is required")
# Create notification object
notification = {
"id": notification_id,
"user_id": user_id,
"title": title,
"message": message,
"type": notification_type,
"priority": priority,
"data": data,
"timestamp": time.time(),
"read": False,
"channels": []
}
# Store notification
await redis_manager.set_json(f"notification:{notification_id}", notification, ex=86400)
# Add to user's notification list
await redis_manager.lpush(f"user:{user_id}:notifications", notification_id)
await redis_manager.ltrim(f"user:{user_id}:notifications", 0, 99) # Keep last 100
# Check user's notification preferences
preferences = await NotificationController._get_user_preferences(user_id)
# Send via appropriate channels
channels_used = []
# WebSocket/SSE for real-time updates
if preferences.get("realtime", True):
await WebSocketService.send_to_user(user_id, notification)
channels_used.append("realtime")
# Email for high priority notifications
if priority in [NotificationPriority.HIGH.value, NotificationPriority.CRITICAL.value]:
if preferences.get("email", True):
await NotificationController._queue_email_notification(user_id, notification)
channels_used.append("email")
# SMS for critical notifications
if priority == NotificationPriority.CRITICAL.value:
if preferences.get("sms", False):
await NotificationController._queue_sms_notification(user_id, notification)
channels_used.append("sms")
# Push notification for mobile
if preferences.get("push", True):
await NotificationController._send_push_notification(user_id, notification)
channels_used.append("push")
# Update notification with channels used
notification["channels"] = channels_used
await redis_manager.set_json(f"notification:{notification_id}", notification, ex=86400)
# Update user's unread count
await redis_manager.incr(f"user:{user_id}:unread_notifications")
return {
"status": "sent",
"notification_id": notification_id,
"channels": channels_used
}
except Exception as e:
print(f"Error sending user notification to {user_id}: {e}")
raise
@staticmethod
async def broadcast_notification(payload: Dict[str, Any], client):
"""Send broadcast notification to all users"""
try:
message = payload.get("message")
title = payload.get("title", "System Notification")
notification_type = payload.get("type", "system")
target_groups = payload.get("target_groups", []) # Optional user groups
if not message:
raise ValueError("Message is required")
broadcast_id = str(uuid.uuid4())
# Create broadcast notification
broadcast = {
"id": broadcast_id,
"title": title,
"message": message,
"type": notification_type,
"target_groups": target_groups,
"timestamp": time.time(),
"sent_count": 0
}
# Store broadcast info
await redis_manager.set_json(f"broadcast:{broadcast_id}", broadcast, ex=86400)
# Get target users
if target_groups:
users = await NotificationController._get_users_in_groups(target_groups)
else:
users = await NotificationController._get_all_active_users()
sent_count = 0
# Send to each user
for user_id in users:
try:
# Create individual notification
user_notification = {
"id": str(uuid.uuid4()),
"user_id": user_id,
"title": title,
"message": message,
"type": notification_type,
"broadcast_id": broadcast_id,
"timestamp": time.time(),
"read": False
}
# Send via WebSocket
await WebSocketService.send_to_user(user_id, user_notification)
# Store user notification
await redis_manager.lpush(f"user:{user_id}:notifications", user_notification["id"])
await redis_manager.set_json(f"notification:{user_notification['id']}", user_notification, ex=86400)
sent_count += 1
except Exception as user_error:
print(f"Error sending broadcast to user {user_id}: {user_error}")
# Update broadcast stats
broadcast["sent_count"] = sent_count
await redis_manager.set_json(f"broadcast:{broadcast_id}", broadcast, ex=86400)
return {
"status": "broadcast_sent",
"broadcast_id": broadcast_id,
"sent_count": sent_count
}
except Exception as e:
print(f"Error sending broadcast notification: {e}")
raise
@staticmethod
async def handle_critical_alert(alert_type: str, payload: Dict[str, Any], client):
"""Handle critical system alerts"""
try:
alert_id = str(uuid.uuid4())
message = payload.get("message")
source = payload.get("source")
severity = payload.get("severity", "critical")
affected_systems = payload.get("affected_systems", [])
# Create alert notification
alert = {
"id": alert_id,
"type": alert_type,
"message": message,
"source": source,
"severity": severity,
"affected_systems": affected_systems,
"timestamp": time.time(),
"acknowledged": False
}
# Store alert
await redis_manager.set_json(f"alert:{alert_id}", alert, ex=86400)
await redis_manager.lpush("alerts:critical", alert_id)
# Get administrators and on-call personnel
admin_users = await NotificationController._get_admin_users()
oncall_users = await NotificationController._get_oncall_users(alert_type)
# Combine and deduplicate users
target_users = list(set(admin_users + oncall_users))
# Send immediate notifications via all channels
for user_id in target_users:
# Real-time notification
await WebSocketService.send_to_user(user_id, {
"type": "critical_alert",
"alert": alert,
"requires_acknowledgment": True
})
# Email notification
await NotificationController._queue_email_notification(user_id, {
"title": f"CRITICAL ALERT: {alert_type}",
"message": message,
"priority": "critical",
"data": alert
})
# SMS notification
await NotificationController._queue_sms_notification(user_id, {
"message": f"CRITICAL: {alert_type} - {message}",
"priority": "critical"
})
# Set up escalation if not acknowledged
await NotificationController._schedule_alert_escalation(alert_id, target_users)
return {
"status": "critical_alert_sent",
"alert_id": alert_id,
"notified_users": len(target_users)
}
except Exception as e:
print(f"Error handling critical alert {alert_type}: {e}")
raise
@staticmethod
async def queue_email(payload: Dict[str, Any], client):
"""Queue email for delivery"""
try:
email_id = str(uuid.uuid4())
to_email = payload.get("to")
subject = payload.get("subject")
body = payload.get("body")
html_body = payload.get("html_body")
priority = payload.get("priority", "normal")
send_at = payload.get("send_at") # Scheduled delivery
if not all([to_email, subject, body]):
raise ValueError("To email, subject, and body are required")
# Create email job
email_job = {
"id": email_id,
"to": to_email,
"subject": subject,
"body": body,
"html_body": html_body,
"priority": priority,
"send_at": send_at or time.time(),
"created_at": time.time(),
"status": "queued",
"attempts": 0,
"max_attempts": 3
}
# Store email job
await redis_manager.set_json(f"email_job:{email_id}", email_job, ex=86400)
# Add to appropriate queue based on priority
queue_name = f"email_queue:{priority}"
if send_at and send_at > time.time():
# Scheduled email
await redis_manager.zadd("email_queue:scheduled", {email_id: send_at})
else:
# Immediate email
await redis_manager.lpush(queue_name, email_id)
# Log email creation
mail_log = MailLog(
email_id=email_id,
to_email=to_email,
subject=subject,
status="queued",
created_at=datetime.now()
)
await mail_log.save()
return {"status": "email_queued", "email_id": email_id}
except Exception as e:
print(f"Error queuing email: {e}")
raise
@staticmethod
async def send_sms(payload: Dict[str, Any], client):
"""Send SMS notification"""
try:
phone_number = payload.get("phone")
message = payload.get("message")
priority = payload.get("priority", "normal")
if not all([phone_number, message]):
raise ValueError("Phone number and message are required")
# Send SMS via service
result = await SMSService.send_sms(phone_number, message)
# Log SMS
sms_log = {
"phone": phone_number,
"message": message,
"priority": priority,
"status": result.get("status"),
"timestamp": time.time()
}
await redis_manager.lpush("sms_logs", json.dumps(sms_log))
return {"status": "sms_sent", "result": result}
except Exception as e:
print(f"Error sending SMS: {e}")
raise
@staticmethod
async def websocket_connect(user_id: str, payload: Dict[str, Any], client):
"""Handle WebSocket connection"""
try:
session_id = payload.get("session_id")
device_info = payload.get("device_info", {})
if not session_id:
session_id = str(uuid.uuid4())
# Store connection info
connection_info = {
"user_id": user_id,
"session_id": session_id,
"device_info": device_info,
"connected_at": time.time(),
"last_heartbeat": time.time()
}
await redis_manager.set_json(f"websocket:{user_id}:{session_id}", connection_info, ex=3600)
await redis_manager.sadd(f"websocket:users", user_id)
await redis_manager.sadd(f"websocket:user:{user_id}:sessions", session_id)
# Send pending notifications
await NotificationController._send_pending_notifications(user_id)
return {"status": "connected", "session_id": session_id}
except Exception as e:
print(f"Error handling WebSocket connection for user {user_id}: {e}")
raise
@staticmethod
async def websocket_disconnect(user_id: str, payload: Dict[str, Any], client):
"""Handle WebSocket disconnection"""
try:
session_id = payload.get("session_id")
if session_id:
await redis_manager.delete(f"websocket:{user_id}:{session_id}")
await redis_manager.srem(f"websocket:user:{user_id}:sessions", session_id)
# Check if user has any other active sessions
active_sessions = await redis_manager.smembers(f"websocket:user:{user_id}:sessions")
if not active_sessions:
await redis_manager.srem(f"websocket:users", user_id)
return {"status": "disconnected"}
except Exception as e:
print(f"Error handling WebSocket disconnection for user {user_id}: {e}")
raise
# Helper methods
@staticmethod
async def _get_user_preferences(user_id: str) -> Dict[str, Any]:
"""Get user notification preferences"""
preferences = await redis_manager.get_json(f"user:{user_id}:notification_preferences")
if not preferences:
# Default preferences
preferences = {
"realtime": True,
"email": True,
"sms": False,
"push": True
}
return preferences
@staticmethod
async def _queue_email_notification(user_id: str, notification: Dict[str, Any]):
"""Queue email notification for user"""
user = await User.find_by_id(user_id)
if user and user.email:
email_payload = {
"to": user.email,
"subject": notification.get("title", "Notification"),
"body": notification.get("message"),
"priority": notification.get("priority", "normal")
}
# Send to email queue
await NotificationController.queue_email(email_payload, None)
@staticmethod
async def _queue_sms_notification(user_id: str, notification: Dict[str, Any]):
"""Queue SMS notification for user"""
user = await User.find_by_id(user_id)
if user and user.phone:
sms_payload = {
"phone": user.phone,
"message": notification.get("message"),
"priority": notification.get("priority", "normal")
}
# Send to SMS queue
await NotificationController.send_sms(sms_payload, None)
@staticmethod
async def _send_push_notification(user_id: str, notification: Dict[str, Any]):
"""Send push notification to user's devices"""
# Get user's registered devices
device_tokens = await redis_manager.smembers(f"user:{user_id}:push_tokens")
for token in device_tokens:
try:
await PushService.send_notification(token, notification)
except Exception as e:
print(f"Error sending push notification to token {token}: {e}")
@staticmethod
async def _get_admin_users() -> List[str]:
"""Get list of admin user IDs"""
# Implementation depends on your user management system
return await redis_manager.smembers("users:admin")
@staticmethod
async def _get_oncall_users(alert_type: str) -> List[str]:
"""Get on-call users for specific alert type"""
return await redis_manager.smembers(f"oncall:{alert_type}")
@staticmethod
async def _send_pending_notifications(user_id: str):
"""Send pending notifications to newly connected user"""
notification_ids = await redis_manager.lrange(f"user:{user_id}:notifications", 0, 9) # Last 10
for notification_id in notification_ids:
notification = await redis_manager.get_json(f"notification:{notification_id}")
if notification and not notification.get("read"):
await WebSocketService.send_to_user(user_id, notification)WebSocket Service
# app/services/websocket_service.py
from core.redis_manager import redis_manager
import json
import asyncio
class WebSocketService:
@staticmethod
async def send_to_user(user_id: str, data: dict):
"""Send data to all of user's active WebSocket connections"""
sessions = await redis_manager.smembers(f"websocket:user:{user_id}:sessions")
for session_id in sessions:
connection_info = await redis_manager.get_json(f"websocket:{user_id}:{session_id}")
if connection_info:
# In a real implementation, you would send to the actual WebSocket connection
# For now, we'll store it in a queue for the WebSocket handler to pick up
message = {
"user_id": user_id,
"session_id": session_id,
"data": data,
"timestamp": time.time()
}
await redis_manager.lpush(f"websocket:queue:{user_id}:{session_id}", json.dumps(message))
@staticmethod
async def broadcast_to_all(data: dict):
"""Broadcast data to all connected users"""
connected_users = await redis_manager.smembers("websocket:users")
for user_id in connected_users:
await WebSocketService.send_to_user(user_id, data)Usage Examples
User Notification
# Send to: notifications/realtime/user/user123
{
"title": "New Message",
"message": "You have received a new message from John",
"type": "message",
"priority": "normal",
"data": {
"message_id": "msg_456",
"sender": "john_doe"
}
}Critical Alert
# Send to: notifications/alerts/critical/system_failure
{
"message": "Database connection failed",
"source": "database_monitor",
"severity": "critical",
"affected_systems": ["user_service", "order_service"]
}Email Notification
# Send to: notifications/email/send
{
"to": "[email protected]",
"subject": "Account Security Alert",
"body": "Your account was accessed from a new device",
"priority": "high"
}This notification system provides comprehensive real-time communication capabilities with multiple delivery channels and priority handling.
Last updated