Examples
Practical examples and use cases for RouteMQ applications.
Topics
IoT Device Management - Managing IoT devices with MQTT
Sensor Data Collection - Collecting and processing sensor data
Real-time Notifications - Push notifications and alerts
API Gateway - Using RouteMQ as an API gateway
Chat Application - Building a real-time chat system
Complete IoT Example
This example shows a complete IoT device management system:
Device Router
# app/routers/devices.py
from core.router import Router
from app.controllers.device_controller import DeviceController
from app.middleware.auth import AuthMiddleware
from app.middleware.rate_limit import RateLimitMiddleware
router = Router()
auth = AuthMiddleware()
rate_limit = RateLimitMiddleware(max_requests=100, window_seconds=60)
with router.group(prefix="devices", middleware=[auth, rate_limit]) as devices:
devices.on("register/{device_id}", DeviceController.register_device, qos=1)
devices.on("heartbeat/{device_id}", DeviceController.heartbeat, qos=0)
devices.on("data/{device_id}", DeviceController.receive_data, qos=1)
devices.on("control/{device_id}", DeviceController.control_device, qos=2, shared=True)Device Controller
# app/controllers/device_controller.py
from core.controller import Controller
from core.redis_manager import redis_manager
from app.models.device import Device
import json
import time
class DeviceController(Controller):
@staticmethod
async def register_device(device_id, payload, client):
"""Register a new device"""
device_info = {
"device_id": device_id,
"name": payload.get("name"),
"type": payload.get("type"),
"firmware": payload.get("firmware"),
"registered_at": time.time()
}
# Store in Redis for quick access
await redis_manager.set_json(f"device:{device_id}", device_info, ex=86400)
# Store in database for persistence
device = Device(**device_info)
await device.save()
# Publish registration confirmation
response_topic = f"devices/{device_id}/register/response"
client.publish(response_topic, json.dumps({
"status": "registered",
"device_id": device_id,
"timestamp": time.time()
}))
return {"status": "registered"}
@staticmethod
async def receive_data(device_id, payload, client):
"""Receive and process device data"""
# Update last seen timestamp
await redis_manager.set(f"device:{device_id}:last_seen", time.time(), ex=3600)
# Process sensor data
if "temperature" in payload:
await DeviceController._process_temperature(device_id, payload["temperature"])
if "humidity" in payload:
await DeviceController._process_humidity(device_id, payload["humidity"])
return {"status": "processed"}Sensor Data Pipeline
# app/routers/sensors.py
from core.router import Router
from app.controllers.sensor_controller import SensorController
from app.middleware.validation import ValidationMiddleware
router = Router()
validation = ValidationMiddleware(schema="sensor_data.json")
with router.group(prefix="sensors", middleware=[validation]) as sensors:
sensors.on("temperature/{sensor_id}", SensorController.handle_temperature, qos=1)
sensors.on("batch/{location}", SensorController.handle_batch, qos=2, shared=True, worker_count=3)Real-time Chat System
# app/routers/chat.py
from core.router import Router
from app.controllers.chat_controller import ChatController
from app.middleware.auth import AuthMiddleware
router = Router()
auth = AuthMiddleware()
with router.group(prefix="chat", middleware=[auth]) as chat:
chat.on("message/{room_id}", ChatController.handle_message, qos=1)
chat.on("join/{room_id}", ChatController.join_room, qos=1)
chat.on("leave/{room_id}", ChatController.leave_room, qos=1)Performance Monitoring Example
# app/middleware/performance.py
from core.middleware import Middleware
from core.redis_manager import redis_manager
import time
class PerformanceMiddleware(Middleware):
async def handle(self, context, next_handler):
start_time = time.time()
topic = context['topic']
# Track request count
await redis_manager.incr(f"stats:requests:{topic}:count")
await redis_manager.incr("stats:requests:total")
# Execute handler
result = await next_handler(context)
# Track processing time
processing_time = time.time() - start_time
await redis_manager.set_json(f"stats:requests:{topic}:last_time", {
"duration": processing_time,
"timestamp": time.time()
}, ex=3600)
# Track slow requests
if processing_time > 1.0: # Over 1 second
await redis_manager.incr(f"stats:slow_requests:{topic}")
return resultNext Steps
IoT Device Management - Complete IoT example
Sensor Data Collection - Data processing patterns
API Gateway - Gateway implementation
Last updated