# Worker Processes
RouteMQ supports horizontal scaling through MQTT shared subscriptions, allowing multiple worker processes to handle messages from the same topic for high-throughput applications.
## Shared Subscriptions Overview
MQTT shared subscriptions distribute messages across multiple subscribers, enabling load balancing and horizontal scaling:
```
┌─────────────────┐      ┌─────────────────┐
│   MQTT Broker   │      │  Shared Group   │
│                 │      │    "workers"    │
│  Topic:         │      │                 │
│  sensors/+/data │─────▶│  ┌──────────┐   │
│                 │      │  │ Worker 1 │   │
│  Messages:      │      │  └──────────┘   │
│  [Msg1, Msg2,   │      │  ┌──────────┐   │
│   Msg3, Msg4,   │      │  │ Worker 2 │   │
│   Msg5, Msg6]   │      │  └──────────┘   │
│                 │      │  ┌──────────┐   │
│                 │      │  │ Worker 3 │   │
│                 │      │  └──────────┘   │
└─────────────────┘      └─────────────────┘
        │
        │  Load balancing (round-robin or
        │  broker-specific distribution):
        │    Worker 1: Msg1, Msg4
        │    Worker 2: Msg2, Msg5
        │    Worker 3: Msg3, Msg6
        ▼
```

## Enabling Shared Subscriptions
### Route Configuration
Enable shared subscriptions for specific routes:
```python
from core.router import Router
from app.controllers.alert_controller import AlertController
from app.controllers.sensor_controller import SensorController
from app.controllers.telemetry_controller import TelemetryController

router = Router()

# Regular subscription (single instance)
router.on("alerts/{device_id}", AlertController.handle_alert)

# Shared subscription with 3 workers
router.on("sensors/{device_id}/data",
          SensorController.process_data,
          shared=True,
          worker_count=3)

# High-throughput route with 5 workers
router.on("telemetry/bulk",
          TelemetryController.process_bulk,
          shared=True,
          worker_count=5,
          qos=1)
```

### Group Configuration
Configure the shared subscription group name:
```bash
# Environment variable
MQTT_GROUP_NAME=production_workers
```

```python
# Or in code
worker_manager = WorkerManager(router, group_name="production_workers")
```

## Worker Manager
The `WorkerManager` handles the lifecycle of worker processes:

### Starting Workers
```python
from core.worker_manager import WorkerManager

# Initialize worker manager
worker_manager = WorkerManager(
    router=main_router,
    group_name="production_workers",
    router_directory="app.routers"
)

# Start workers for shared subscriptions
worker_manager.start_workers()

# Or specify a custom worker count
worker_manager.start_workers(num_workers=8)
```
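Lifecycle management also needs a shutdown path. Below is a minimal sketch of a teardown method, assuming `self.workers` holds `multiprocessing.Process` handles as in the manager shown next; RouteMQ's actual `WorkerManager` API may differ:

```python
def stop_workers(self, timeout: float = 5.0):
    """Terminate all worker processes and wait for them to exit."""
    for process in self.workers:
        process.terminate()       # ask each worker to exit (SIGTERM)
    for process in self.workers:
        process.join(timeout)     # wait for a graceful shutdown
        if process.is_alive():
            process.kill()        # force-kill any straggler
    self.workers.clear()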
### Worker Process Management

```python
class WorkerManager:
    def start_workers(self, num_workers: int = None):
        """Start worker processes based on route configuration"""
        shared_routes = self.get_shared_routes_info()
        if not shared_routes:
            self.logger.info("No shared routes found, skipping worker startup")
            return

        # Calculate total workers needed
        total_workers = num_workers or sum(route['worker_count'] for route in shared_routes)

        # Start worker processes
        for worker_id in range(total_workers):
            process = multiprocessing.Process(
                target=worker_process_main,
                args=(worker_id, self.router_directory, shared_routes,
                      self.broker_config, self.group_name)
            )
            process.start()
            self.workers.append(process)

        self.logger.info(f"Started {total_workers} worker processes")
```

## Worker Process Architecture
### Process Isolation
Each worker runs in a separate process:
```python
import logging
from typing import Dict, List

def worker_process_main(worker_id: int, router_directory: str,
                        shared_routes: List[Dict], broker_config: Dict,
                        group_name: str):
    """Main function for an individual worker process"""
    # Setup logging for this worker
    logging.basicConfig(
        level=logging.INFO,
        format=f'%(asctime)s - Worker-{worker_id} - %(name)s - %(levelname)s - %(message)s'
    )

    # Create worker instance
    worker = WorkerProcess(worker_id, router_directory, shared_routes,
                           broker_config, group_name)

    # Run the worker
    worker.run()
```

### Independent Router Loading
Each worker loads its own router instance:
```python
class WorkerProcess:
    def setup_router(self):
        """Each worker loads routes independently"""
        try:
            registry = RouterRegistry(self.router_directory)
            self.router = registry.discover_and_load_routers()
            self.logger.info(f"Worker {self.worker_id} loaded {len(self.router.routes)} routes")
        except Exception as e:
            self.logger.error(f"Worker {self.worker_id} failed to load router: {e}")
            self.router = Router()  # Fall back to an empty router
```

### MQTT Client per Worker
Each worker maintains its own MQTT connection:
```python
import uuid

def setup_client(self):
    """Setup a dedicated MQTT client for this worker"""
    from paho.mqtt import client as mqtt_client

    # Unique client ID per worker
    client_id = f"mqtt-worker-{self.worker_id}-{uuid.uuid4().hex[:8]}"
    self.client = mqtt_client.Client(client_id=client_id)
    self.client.on_connect = self._on_connect
    self.client.on_message = self._on_message

    # Configure authentication
    if self.broker_config.get('username'):
        self.client.username_pw_set(
            self.broker_config['username'],
            self.broker_config['password']
        )
```
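A worker's `run()` method ties these pieces together. The sketch below is one plausible shape, assuming `broker_config` carries `host`/`port` keys (an assumption, not RouteMQ's confirmed API): paho's network loop runs in a background thread while an asyncio loop serves the async dispatches scheduled by `_on_message` (shown later in this page):

```python
import asyncio

def run(self):
    """Connect and process messages until the process is stopped."""
    self.setup_router()
    self.setup_client()

    # Event loop used by _on_message to schedule async dispatch
    self.loop = asyncio.new_event_loop()

    self.client.connect(
        self.broker_config.get('host', 'localhost'),
        self.broker_config.get('port', 1883)
    )
    self.client.loop_start()      # paho network loop in a background thread
    try:
        self.loop.run_forever()   # serve scheduled dispatch coroutines
    finally:
        self.client.loop_stop()
        self.client.disconnect()
```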
## Shared Subscription Topics

### Topic Format

Shared subscriptions use the `$share` prefix:
```
# Regular subscription
topic: "sensors/device123/data"

# Shared subscription
topic: "$share/workers/sensors/+/data"

# Format: $share/{group_name}/{topic_pattern}
```

### Subscription Management
Workers automatically subscribe to shared topics:
```python
def _on_connect(self, client, userdata, flags, rc):
    """Subscribe to shared topics when connected"""
    self.logger.info(f"Worker {self.worker_id} connected with result code {rc}")
    for route_info in self.shared_routes:
        # Create shared subscription topic
        shared_topic = f"$share/{self.group_name}/{route_info['mqtt_topic']}"
        self.logger.info(f"Worker {self.worker_id} subscribing to {shared_topic}")
        client.subscribe(shared_topic, route_info['qos'])
```

### Topic Extraction
Workers extract the original topic from shared subscription messages:
```python
import asyncio

def _on_message(self, client, userdata, msg):
    """Handle message from shared subscription"""
    # Original topic from shared subscription
    actual_topic = msg.topic
    shared_prefix = f"$share/{self.group_name}/"
    if msg.topic.startswith(shared_prefix):
        # Strip shared prefix to get the actual topic
        actual_topic = msg.topic[len(shared_prefix):]

    payload = msg.payload.decode()

    # Paho callbacks are synchronous, so schedule the async dispatch
    # on the worker's event loop (self.loop, created in run());
    # route matching then uses the actual topic
    asyncio.run_coroutine_threadsafe(
        self.router.dispatch(actual_topic, payload, client),
        self.loop
    )
```

## Load Balancing Strategies
### Broker-Based Distribution

Most MQTT brokers distribute shared-subscription messages round-robin across the group:
```
# Messages distributed evenly across workers
Worker 1: Messages 1, 4, 7, 10, ...
Worker 2: Messages 2, 5, 8, 11, ...
Worker 3: Messages 3, 6, 9, 12, ...
```
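You can observe this behavior outside RouteMQ with the mosquitto command-line clients (hostname and topics below are placeholders; shared subscriptions require an MQTT 5 broker or a broker-specific extension):

```bash
# Terminals 1 and 2: join the same shared group
mosquitto_sub -h broker.example.com -V mqttv5 -t '$share/workers/sensors/+/data'

# Terminal 3: publish test messages and watch them alternate
# between the two subscribers
for i in 1 2 3 4; do
  mosquitto_pub -h broker.example.com -t "sensors/device$i/data" -m "{\"seq\": $i}"
done
```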
### Route-Specific Worker Counts

Configure different worker counts per route:

```python
# Low-throughput routes - single worker
router.on("config/update", ConfigController.update, shared=False)

# Medium-throughput routes - 2 workers
router.on("alerts/{device_id}", AlertController.handle, shared=True, worker_count=2)

# High-throughput routes - 5 workers
router.on("sensors/+/data", SensorController.process, shared=True, worker_count=5)

# Bulk processing - 10 workers
router.on("telemetry/bulk", BulkController.process, shared=True, worker_count=10)
```

## Scaling Patterns
### Auto-Scaling Based on Load

A manager subclass can periodically compare observed message rates against a per-worker capacity target and rescale:
```python
import asyncio

class AdaptiveWorkerManager(WorkerManager):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.message_rates = {}
        self.scaling_enabled = True

    async def monitor_and_scale(self):
        """Monitor message rates and adjust worker count"""
        while self.scaling_enabled:
            for route in self.get_shared_routes_info():
                current_rate = await self.get_message_rate(route['topic'])
                optimal_workers = self.calculate_optimal_workers(current_rate)
                if optimal_workers != route['worker_count']:
                    await self.scale_workers(route['topic'], optimal_workers)
            await asyncio.sleep(60)  # Check every minute

    def calculate_optimal_workers(self, message_rate: float) -> int:
        """Calculate optimal worker count based on message rate"""
        # Example: 1 worker per 100 messages/second, capped at 10
        return max(1, min(10, int(message_rate / 100)))
```
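`get_message_rate` and `scale_workers` are left to the application. As one hypothetical building block, a simple in-process tracker can supply per-topic rates:

```python
import time

class MessageRateTracker:
    """Tracks per-topic message rates over rolling sample windows."""

    def __init__(self):
        self.counts = {}
        self.window_starts = {}

    def record(self, topic: str):
        """Call once per received message."""
        self.window_starts.setdefault(topic, time.monotonic())
        self.counts[topic] = self.counts.get(topic, 0) + 1

    def rate(self, topic: str) -> float:
        """Messages/second since the topic's window began, then reset."""
        start = self.window_starts.get(topic)
        if start is None:
            return 0.0
        elapsed = time.monotonic() - start
        count = self.counts.pop(topic, 0)
        self.window_starts[topic] = time.monotonic()
        return count / elapsed if elapsed > 0 else 0.0
```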
### Geographic Distribution

```python
# Region-specific worker groups
worker_manager_us = WorkerManager(router, group_name="us_east_workers")
worker_manager_eu = WorkerManager(router, group_name="eu_west_workers")
worker_manager_asia = WorkerManager(router, group_name="asia_workers")

# Each region handles its own message volume
worker_manager_us.start_workers(num_workers=5)
worker_manager_eu.start_workers(num_workers=3)
worker_manager_asia.start_workers(num_workers=2)
```

## Performance Characteristics
### Throughput Benefits

Illustrative figures; actual throughput depends on the broker, payload size, and handler cost:
```
# Single process (no shared subscription)
Throughput: ~1,000 messages/second

# 3 workers with shared subscription
Throughput: ~2,800 messages/second (2.8x improvement)

# 5 workers with shared subscription
Throughput: ~4,500 messages/second (4.5x improvement)

# Diminishing returns past the optimal point
# 10 workers: ~6,000 messages/second (overhead increases)
```

### Latency Considerations
- **Process Startup**: initial latency for worker process creation
- **Message Distribution**: small overhead for shared subscription routing
- **Context Switching**: minimal impact due to process isolation
### Memory Usage
```
# Approximate memory per worker process
Base Process:      ~20MB
Router + Routes:    ~5MB
MQTT Client:        ~2MB
Application Code:  ~10MB
Total per Worker:  ~37MB

# 5 workers = ~185MB total
# vs. single process = ~37MB
```

## Configuration and Deployment
### Environment Configuration
```bash
# .env file
MQTT_BROKER=mqtt.example.com
MQTT_PORT=1883
MQTT_USERNAME=worker_user
MQTT_PASSWORD=worker_pass
MQTT_GROUP_NAME=production_workers

# Worker configuration
WORKER_COUNT=5
WORKER_AUTO_SCALE=true
WORKER_MIN_COUNT=2
WORKER_MAX_COUNT=10
```
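A sketch of how these variables might be consumed at startup (names mirror the `.env` above; the `python-dotenv` dependency and this loading shape are assumptions, not RouteMQ's confirmed config loader):

```python
import os

from dotenv import load_dotenv

load_dotenv()  # read .env into the process environment

broker_config = {
    'host': os.getenv('MQTT_BROKER', 'localhost'),
    'port': int(os.getenv('MQTT_PORT', '1883')),
    'username': os.getenv('MQTT_USERNAME'),
    'password': os.getenv('MQTT_PASSWORD'),
}
worker_count = int(os.getenv('WORKER_COUNT', '3'))
group_name = os.getenv('MQTT_GROUP_NAME', 'workers')
```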
### Docker Deployment

```dockerfile
# Dockerfile
FROM python:3.11-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .

# Default to 3 workers
ENV WORKER_COUNT=3

CMD ["python", "main.py", "--workers"]
```
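Because workers coordinate only through the broker, scaling out can be as simple as running more containers in the same group (image tag and env values are placeholders):

```bash
# Two containers, same shared group - the broker load-balances across both
docker run -d --env-file .env -e WORKER_COUNT=3 routemq:latest
docker run -d --env-file .env -e WORKER_COUNT=3 routemq:latest
```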
### Kubernetes Scaling

```yaml
# k8s-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: routemq-workers
spec:
  replicas: 3  # Number of pods
  selector:
    matchLabels:
      app: routemq-workers
  template:
    metadata:
      labels:
        app: routemq-workers
    spec:
      containers:
        - name: worker
          image: routemq:latest
          env:
            - name: WORKER_COUNT
              value: "2"  # Workers per pod
            - name: MQTT_GROUP_NAME
              value: "k8s-workers"
```
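With 3 replicas and 2 workers per pod, this deployment runs 6 worker processes in total, all competing in the same shared group. Scaling out then only requires adding pods:

```bash
kubectl scale deployment routemq-workers --replicas=5
```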
## Monitoring and Observability

### Worker Health Monitoring
```python
import asyncio
import logging

class WorkerHealthMonitor:
    def __init__(self, worker_manager: WorkerManager):
        self.worker_manager = worker_manager
        self.health_checks = {}
        self.logger = logging.getLogger(__name__)

    async def monitor_workers(self):
        """Monitor worker process health"""
        while True:
            # Iterate over a copy so restart_worker can mutate the list
            for worker in list(self.worker_manager.workers):
                if not worker.is_alive():
                    self.logger.error(f"Worker {worker.pid} died, restarting...")
                    await self.restart_worker(worker)
            await asyncio.sleep(30)  # Check every 30 seconds

    async def restart_worker(self, failed_worker):
        """Restart a failed worker process"""
        # Remove failed worker
        self.worker_manager.workers.remove(failed_worker)

        # Start replacement worker
        new_worker = self.worker_manager.start_single_worker()
        self.worker_manager.workers.append(new_worker)
```

### Metrics Collection
```python
class WorkerMetrics:
    def __init__(self):
        self.message_counts = {}
        self.processing_times = {}
        self.error_counts = {}

    def record_message_processed(self, worker_id: int, processing_time: float):
        """Record successful message processing"""
        self.message_counts[worker_id] = self.message_counts.get(worker_id, 0) + 1
        self.processing_times.setdefault(worker_id, []).append(processing_time)

    def record_error(self, worker_id: int, error_type: str):
        """Record processing error"""
        key = f"{worker_id}:{error_type}"
        self.error_counts[key] = self.error_counts.get(key, 0) + 1

    def get_worker_stats(self, worker_id: int) -> dict:
        """Get performance stats for a worker"""
        times = self.processing_times.get(worker_id, [])
        return {
            'messages_processed': self.message_counts.get(worker_id, 0),
            'avg_processing_time': sum(times) / len(times) if times else 0,
            'error_count': sum(v for k, v in self.error_counts.items()
                               if k.startswith(f"{worker_id}:"))
        }
```
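A quick usage example of the collector above:

```python
metrics = WorkerMetrics()
metrics.record_message_processed(worker_id=1, processing_time=0.012)
metrics.record_error(worker_id=1, error_type="TimeoutError")

print(metrics.get_worker_stats(1))
# {'messages_processed': 1, 'avg_processing_time': 0.012, 'error_count': 1}
```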
## Best Practices

### When to Use Shared Subscriptions
✅ **Use for:**

- High-throughput topics (>100 messages/second)
- CPU-intensive processing
- I/O-bound operations that can be parallelized
- Bulk data processing

❌ **Don't use for:**

- Low-frequency topics (<10 messages/second)
- Order-dependent processing
- Stateful operations requiring message sequence
- Simple, fast operations (<1ms processing time)
### Worker Count Guidelines
```python
import math

# Calculation formula
optimal_workers = min(
    max_workers_allowed,
    max(
        min_workers_required,
        math.ceil(message_rate_per_second / messages_per_worker_per_second)
    )
)

# Example calculations:
# 500 msg/sec, 50 msg/worker/sec → 10 workers
# 100 msg/sec, 50 msg/worker/sec → 2 workers
#  50 msg/sec, 50 msg/worker/sec → 1 worker
```

### Resource Management
- **Memory**: monitor memory usage per worker
- **CPU**: ensure enough CPU cores are available for the worker count
- **Connections**: limit database connections per worker (see the sketch below)
- **File Handles**: monitor open file descriptors
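Per-worker connection pools multiply across processes, so cap them explicitly. A sketch using SQLAlchemy-style pool arguments (the DSN, driver, and SQLAlchemy dependency are assumptions; adjust to your stack):

```python
from sqlalchemy.ext.asyncio import create_async_engine

# 5 workers × (pool_size 5 + max_overflow 2) = at most 35 connections total
engine = create_async_engine(
    "mysql+aiomysql://user:pass@db/routemq",  # placeholder DSN
    pool_size=5,
    max_overflow=2,
)
```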
### Error Handling
```python
import asyncio
from typing import Any

class RobustWorkerProcess(WorkerProcess):
    async def handle_message_with_retry(self, topic: str, payload: Any):
        """Handle message with retry logic"""
        max_retries = 3
        for attempt in range(max_retries):
            try:
                await self.router.dispatch(topic, payload, self.client)
                return
            except Exception as e:
                self.logger.warning(f"Attempt {attempt + 1} failed: {e}")
                if attempt == max_retries - 1:
                    # Final attempt failed - send to dead letter queue
                    await self.send_to_dlq(topic, payload, str(e))
                else:
                    # Exponential backoff before retrying
                    await asyncio.sleep(2 ** attempt)
```
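`send_to_dlq` is application-defined; one simple option is republishing the failed message to a dead-letter topic (the `dlq/` prefix is a convention for this sketch, not a RouteMQ API):

```python
import json
import time
from typing import Any

async def send_to_dlq(self, topic: str, payload: Any, error: str):
    """Republish a failed message to a dead-letter topic with context."""
    dlq_message = json.dumps({
        'original_topic': topic,
        'payload': str(payload),
        'error': error,
        'failed_at': time.time(),
    })
    self.client.publish(f"dlq/{topic}", dlq_message, qos=1)
```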
## Troubleshooting

### Common Issues

**Workers not receiving messages:**
```bash
# Check shared subscription support on the broker
mosquitto_pub -h broker -t '$share/test/topic' -m 'test'

# Verify group name consistency
grep MQTT_GROUP_NAME .env
```

**Uneven load distribution:**
```python
# Monitor message distribution
for worker_id, count in worker_metrics.message_counts.items():
    print(f"Worker {worker_id}: {count} messages")
# Expected: roughly equal distribution
```

**High memory usage:**
```python
# Monitor worker memory
import psutil

for worker in worker_manager.workers:
    process = psutil.Process(worker.pid)
    print(f"Worker {worker.pid}: {process.memory_info().rss / 1024 / 1024:.1f}MB")
```

### Debug Mode
```bash
# Enable debug logging for workers
ROUTEMQ_LOG_LEVEL=DEBUG
```

```python
# Worker-specific debugging
class DebugWorkerProcess(WorkerProcess):
    def _on_message(self, client, userdata, msg):
        self.logger.debug(f"Worker {self.worker_id} received: {msg.topic}")
        super()._on_message(client, userdata, msg)
        self.logger.debug(f"Worker {self.worker_id} finished processing")
```

## Next Steps
- **Controllers** - implement business logic for worker processes
- **Deployment** - deploy workers in production
- **Monitoring** - monitor worker performance