# Scaling
Scale RouteMQ applications to handle increased load through horizontal and vertical scaling strategies.
## Scaling Overview
RouteMQ supports multiple scaling approaches:
- **Horizontal Scaling**: Add more application instances
- **Vertical Scaling**: Increase resources per instance
- **Database Scaling**: Scale the database layer independently
- **MQTT Broker Scaling**: Scale the message broker infrastructure
- **Auto-scaling**: Automatic scaling based on metrics
## Horizontal Scaling
### Shared Subscription Scaling
RouteMQ's built-in shared subscription support enables horizontal scaling:
```python
# Enable shared subscriptions in your routes
from core.router import Router

router = Router()

# Scale with multiple workers for high-throughput routes
router.on("sensors/{device_id}/data",
          SensorController.process_data,
          shared=True,
          worker_count=5)  # 5 workers for this route

# Different scaling for different routes
router.on("alerts/{device_id}",
          AlertController.handle_alert,
          shared=True,
          worker_count=2)  # 2 workers for alerts

# Non-shared for order-dependent processing
router.on("commands/{device_id}",
          CommandController.execute_command,
          shared=False)  # Single worker maintains order
```
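Under the hood, MQTT 5 shared subscriptions prefix the topic filter with `$share/<group>/`, and the broker load-balances each message across the group's subscribers. RouteMQ builds these filters for you; the helper below is purely illustrative, to show what the broker sees:

```python
def shared_topic(group: str, topic_filter: str) -> str:
    """Build an MQTT 5 shared-subscription filter, e.g.
    shared_topic("swarm_workers", "sensors/+/data")
    -> "$share/swarm_workers/sensors/+/data"
    Each message on the underlying topic is delivered to exactly
    one subscriber in the group."""
    return f"$share/{group}/{topic_filter}"
```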
### Container-Based Horizontal Scaling

#### Docker Swarm Scaling
```yaml
# docker-compose.swarm.yml
version: '3.8'

services:
  routemq:
    image: routemq:production
    deploy:
      replicas: 5  # Run 5 instances
      restart_policy:
        condition: on-failure
        delay: 5s
        max_attempts: 3
      update_config:
        parallelism: 1
        delay: 10s
        failure_action: rollback
      resources:
        limits:
          cpus: '1.0'
          memory: 512M
        reservations:
          cpus: '0.25'
          memory: 128M
    environment:
      - MQTT_GROUP_NAME=swarm_workers
      - WORKER_COUNT=3
    networks:
      - routemq-network

networks:
  routemq-network:
    driver: overlay
    attachable: true
```

Deploy with Docker Swarm:
```bash
# Initialize swarm
docker swarm init

# Deploy stack
docker stack deploy -c docker-compose.swarm.yml routemq

# Scale services
docker service scale routemq_routemq=10

# Check service status
docker service ls
docker service ps routemq_routemq
```

#### Kubernetes Horizontal Scaling
```yaml
# k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: routemq
  namespace: production
spec:
  replicas: 5
  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxSurge: 1
      maxUnavailable: 1
  selector:
    matchLabels:
      app: routemq
  template:
    metadata:
      labels:
        app: routemq
    spec:
      containers:
      - name: routemq
        image: routemq:v1.0.0
        ports:
        - containerPort: 8080
        env:
        - name: MQTT_GROUP_NAME
          value: "k8s_workers"
        - name: WORKER_COUNT
          value: "3"
        envFrom:
        - configMapRef:
            name: routemq-config
        - secretRef:
            name: routemq-secrets
        resources:
          requests:
            memory: "256Mi"
            cpu: "250m"
          limits:
            memory: "512Mi"
            cpu: "500m"
        livenessProbe:
          httpGet:
            path: /health
            port: 8080
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe:
          httpGet:
            path: /ready
            port: 8080
          initialDelaySeconds: 5
          periodSeconds: 5
```

Scale the Kubernetes deployment:
```bash
# Scale manually
kubectl scale deployment routemq --replicas=10

# Check scaling status
kubectl get deployment routemq
kubectl get pods -l app=routemq
```
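The probes above assume the application serves `/health` and `/ready` on port 8080. If your deployment does not already expose these, a minimal sketch with aiohttp (endpoint names and checks are assumptions, not part of RouteMQ itself) could look like:

```python
# health_server.py - hypothetical probe endpoints, assuming aiohttp is installed
from aiohttp import web

async def health(request: web.Request) -> web.Response:
    # Liveness: the process is up and the event loop is responsive
    return web.json_response({"status": "ok"})

async def ready(request: web.Request) -> web.Response:
    # Readiness: replace with real checks (MQTT connected, DB reachable)
    return web.json_response({"status": "ready"})

def create_app() -> web.Application:
    app = web.Application()
    app.add_routes([web.get("/health", health), web.get("/ready", ready)])
    return app

if __name__ == "__main__":
    web.run_app(create_app(), port=8080)
```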
### Multi-Instance Configuration

#### Instance Coordination
```python
# config/scaling.py
import hashlib
import os
import socket
import uuid

class InstanceConfig:
    """Configuration for multi-instance coordination"""

    def __init__(self):
        self.instance_id = self._generate_instance_id()
        self.group_name = os.getenv('MQTT_GROUP_NAME', 'default_group')
        self.total_instances = int(os.getenv('TOTAL_INSTANCES', '1'))
        self.instance_index = int(os.getenv('INSTANCE_INDEX', '0'))

    def _generate_instance_id(self):
        """Generate a unique instance ID"""
        hostname = socket.gethostname()
        pod_name = os.getenv('HOSTNAME', hostname)  # Kubernetes pod name
        unique_suffix = str(uuid.uuid4())[:8]
        return f"{pod_name}-{unique_suffix}"

    def get_shared_subscription_group(self):
        """Get group name for shared subscriptions"""
        return f"{self.group_name}_{self.instance_index}"

    def should_handle_route(self, route_hash: str) -> bool:
        """Determine if this instance should handle a specific route"""
        # Use a deterministic digest for load distribution: the built-in
        # hash() is salted per process, so instances would disagree
        digest = int(hashlib.md5(route_hash.encode()).hexdigest(), 16)
        return digest % self.total_instances == self.instance_index
```
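For example, with `TOTAL_INSTANCES=3` and `INSTANCE_INDEX=1` (values chosen for illustration), each key maps to exactly one instance:

```python
import os

os.environ.update(TOTAL_INSTANCES="3", INSTANCE_INDEX="1",
                  MQTT_GROUP_NAME="k8s_workers")
cfg = InstanceConfig()

print(cfg.get_shared_subscription_group())  # k8s_workers_1
# Exactly one of the three instances returns True for any given key
print(cfg.should_handle_route("sensors/42/data"))
```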
#### Load Distribution Strategies

```python
# config/load_distribution.py
import hashlib
import random
from typing import Dict, List

class LoadDistributionStrategy:
    """Strategies for distributing load across instances"""

    @staticmethod
    def round_robin(instances: List[str], request_counter: int) -> str:
        """Round-robin distribution"""
        return instances[request_counter % len(instances)]

    @staticmethod
    def hash_based(instances: List[str], key: str) -> str:
        """Hash-based distribution for consistency"""
        hash_value = int(hashlib.md5(key.encode()).hexdigest(), 16)
        return instances[hash_value % len(instances)]

    @staticmethod
    def weighted_distribution(instances: Dict[str, int], total_weight: int) -> str:
        """Weighted distribution based on instance capacity"""
        weight_sum = 0
        random_value = random.randint(1, total_weight)

        for instance, weight in instances.items():
            weight_sum += weight
            if random_value <= weight_sum:
                return instance

        # Fallback to first instance
        return list(instances.keys())[0]

    @staticmethod
    def least_connections(instance_connections: Dict[str, int]) -> str:
        """Route to instance with the fewest active connections"""
        return min(instance_connections.items(), key=lambda x: x[1])[0]
```
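A quick illustration of the two deterministic strategies (instance names are placeholders):

```python
instances = ["routemq-0", "routemq-1", "routemq-2"]

# The same key always lands on the same instance
assert (LoadDistributionStrategy.hash_based(instances, "device-42")
        == LoadDistributionStrategy.hash_based(instances, "device-42"))

# Round-robin cycles through instances as the counter increases
chosen = [LoadDistributionStrategy.round_robin(instances, i) for i in range(4)]
print(chosen)  # ['routemq-0', 'routemq-1', 'routemq-2', 'routemq-0']
```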
## Vertical Scaling

### Resource Optimization

#### CPU Scaling
```yaml
# Increase CPU resources
services:
  routemq:
    deploy:
      resources:
        limits:
          cpus: '4.0'   # Increased from 1.0
          memory: 2G    # Increased memory accordingly
        reservations:
          cpus: '2.0'   # Higher baseline
          memory: 1G
```

```python
# Optimize CPU usage in application
import asyncio
import concurrent.futures
import os

class CPUOptimizedProcessor:
    """CPU-optimized message processor"""

    def __init__(self, max_workers=None):
        # A thread pool keeps the event loop responsive; note that for
        # pure-Python CPU-bound work a ProcessPoolExecutor sidesteps the GIL
        self.executor = concurrent.futures.ThreadPoolExecutor(
            max_workers=max_workers or os.cpu_count()
        )

    async def process_cpu_intensive_task(self, data):
        """Process CPU-intensive tasks in a worker pool"""
        loop = asyncio.get_event_loop()

        # Run CPU-bound work outside the event loop
        result = await loop.run_in_executor(
            self.executor,
            self._cpu_intensive_function,
            data
        )
        return result

    def _cpu_intensive_function(self, data):
        """CPU-intensive processing function"""
        # Complex calculations, data transformation, etc.
        pass
```
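Because the GIL limits what threads can do for pure-Python computation, a process pool is often the better fit for truly CPU-bound handlers. A minimal sketch of the same pattern (the `transform` function is a stand-in):

```python
import asyncio
import concurrent.futures

# Process pool for GIL-bound workloads
process_pool = concurrent.futures.ProcessPoolExecutor()

def transform(payload: bytes) -> int:
    # Stand-in for heavy parsing or number crunching
    return sum(payload)

async def handle(payload: bytes) -> int:
    loop = asyncio.get_running_loop()
    # Arguments must be picklable when crossing process boundaries
    return await loop.run_in_executor(process_pool, transform, payload)
```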
#### Memory Scaling

```python
# config/memory_optimization.py
import gc
import logging

import psutil

class MemoryManager:
    """Memory management for vertical scaling"""

    def __init__(self, max_memory_percent=80):
        self.max_memory_percent = max_memory_percent
        self.logger = logging.getLogger('MemoryManager')

    def check_memory_usage(self):
        """Monitor memory usage"""
        process = psutil.Process()
        memory_info = process.memory_info()
        memory_percent = process.memory_percent()

        if memory_percent > self.max_memory_percent:
            self.logger.warning(f"High memory usage: {memory_percent:.1f}%")
            self._optimize_memory()

        return {
            'memory_mb': memory_info.rss / 1024 / 1024,
            'memory_percent': memory_percent
        }

    def _optimize_memory(self):
        """Optimize memory usage"""
        # Force garbage collection
        gc.collect()
        # Clear internal caches if available
        self._clear_application_caches()

    def _clear_application_caches(self):
        """Clear application-specific caches"""
        # Clear Redis cache
        # Clear in-memory data structures
        # Reset connection pools if needed
        pass
```
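Wired into the event loop, the manager can run as a periodic background task (the interval is an arbitrary choice):

```python
import asyncio

async def memory_watchdog(manager: MemoryManager, interval_seconds: int = 60):
    # Periodically sample memory and trigger optimization when needed
    while True:
        stats = manager.check_memory_usage()
        print(f"rss={stats['memory_mb']:.0f}MB ({stats['memory_percent']:.1f}%)")
        await asyncio.sleep(interval_seconds)

# At startup: asyncio.create_task(memory_watchdog(MemoryManager()))
```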
### Worker Process Scaling

#### Dynamic Worker Adjustment
```python
# core/dynamic_worker_manager.py
import asyncio
import logging

import psutil

class DynamicWorkerManager:
    """Dynamically adjust worker count based on load.

    The _get_message_rate, _get_queue_depth, _start_worker, and
    _stop_worker hooks are application-specific and left to the
    implementer."""

    def __init__(self, min_workers=2, max_workers=10):
        self.min_workers = min_workers
        self.max_workers = max_workers
        self.current_workers = min_workers
        self.metrics = {
            'messages_per_second': 0,
            'cpu_usage': 0,
            'memory_usage': 0,
            'queue_depth': 0
        }

    async def monitor_and_scale(self):
        """Monitor metrics and adjust worker count"""
        while True:
            await self._collect_metrics()
            optimal_workers = self._calculate_optimal_workers()

            if optimal_workers != self.current_workers:
                await self._adjust_workers(optimal_workers)

            await asyncio.sleep(30)  # Check every 30 seconds

    async def _collect_metrics(self):
        """Collect performance metrics"""
        # CPU and memory metrics
        self.metrics['cpu_usage'] = psutil.cpu_percent(interval=1)
        self.metrics['memory_usage'] = psutil.virtual_memory().percent

        # Application-specific metrics
        self.metrics['messages_per_second'] = await self._get_message_rate()
        self.metrics['queue_depth'] = await self._get_queue_depth()

    def _calculate_optimal_workers(self) -> int:
        """Calculate the optimal number of workers"""
        # Base calculation on CPU usage
        cpu_factor = min(2.0, self.metrics['cpu_usage'] / 50.0)
        # Adjust for message rate
        message_factor = min(2.0, self.metrics['messages_per_second'] / 100.0)
        # Adjust for queue depth
        queue_factor = min(2.0, self.metrics['queue_depth'] / 1000.0)

        # Calculate target workers
        target_workers = int(
            self.min_workers * max(cpu_factor, message_factor, queue_factor)
        )

        # Apply limits
        return max(self.min_workers, min(target_workers, self.max_workers))

    async def _adjust_workers(self, target_workers: int):
        """Adjust worker count"""
        if target_workers > self.current_workers:
            # Scale up
            for _ in range(target_workers - self.current_workers):
                await self._start_worker()
        elif target_workers < self.current_workers:
            # Scale down
            for _ in range(self.current_workers - target_workers):
                await self._stop_worker()

        self.current_workers = target_workers
        logging.info(f"Scaled workers to {target_workers}")
```
## Database Scaling

### Read Replicas
```python
# config/database_scaling.py
import os
import random

from sqlalchemy.ext.asyncio import create_async_engine

class DatabaseScalingConfig:
    """Database scaling configuration"""

    def __init__(self):
        self.write_engine = create_async_engine(
            self._get_write_connection_string(),
            pool_size=20,
            max_overflow=30
        )
        self.read_engines = [
            create_async_engine(
                self._get_read_connection_string(replica),
                pool_size=15,
                max_overflow=25
            )
            for replica in self._get_read_replicas()
        ]

    def get_write_engine(self):
        """Get engine for write operations"""
        return self.write_engine

    def get_read_engine(self):
        """Get engine for read operations (load balanced)"""
        return random.choice(self.read_engines)

    def _get_write_connection_string(self):
        """Connection string for the primary (env variable name is illustrative)"""
        return os.getenv('DB_WRITE_URL', 'mysql+aiomysql://user:pass@primary/routemq')

    def _get_read_connection_string(self, replica: str):
        """Connection string for a read replica (URL format is illustrative)"""
        return f"mysql+aiomysql://user:pass@{replica}/routemq"

    def _get_read_replicas(self):
        """Get list of read replica hosts"""
        replicas = os.getenv('DB_READ_REPLICAS', '').split(',')
        return [replica.strip() for replica in replicas if replica.strip()]
```
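Read/write splitting then becomes an explicit choice at the call site. A sketch, assuming the env-based config above and a `readings` table:

```python
from sqlalchemy import text

db = DatabaseScalingConfig()

async def save_reading(payload: dict):
    # Writes always go to the primary
    async with db.get_write_engine().begin() as conn:
        await conn.execute(text("INSERT INTO readings (data) VALUES (:d)"),
                           {"d": str(payload)})

async def recent_readings():
    # Reads are spread across the replicas
    async with db.get_read_engine().connect() as conn:
        result = await conn.execute(text("SELECT * FROM readings LIMIT 10"))
        return result.fetchall()
```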
### Connection Pool Scaling

```python
# config/connection_pool_scaling.py
class ConnectionPoolScaler:
    """Scale database connection pools based on load"""

    def __init__(self):
        self.base_pool_size = 10
        self.max_pool_size = 50
        self.current_pool_size = self.base_pool_size

    async def scale_pool_based_on_load(self, engine):
        """Scale connection pool based on current load"""
        pool = engine.pool

        # Get pool metrics
        checked_out = pool.checkedout()
        pool_size = pool.size()

        utilization = checked_out / pool_size if pool_size > 0 else 0

        # Scale up if utilization is high
        if utilization > 0.8 and pool_size < self.max_pool_size:
            new_size = min(pool_size + 5, self.max_pool_size)
            await self._resize_pool(engine, new_size)
        # Scale down if utilization is low
        elif utilization < 0.3 and pool_size > self.base_pool_size:
            new_size = max(pool_size - 2, self.base_pool_size)
            await self._resize_pool(engine, new_size)

    async def _resize_pool(self, engine, new_size: int):
        """SQLAlchemy pools cannot be resized in place; the usual approach
        is to dispose the engine and recreate it with the new pool_size."""
        self.current_pool_size = new_size
        # await engine.dispose(), then rebuild via create_async_engine(...)
```
## Auto-scaling

### Kubernetes Horizontal Pod Autoscaler
```yaml
# k8s/hpa.yaml
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: routemq-hpa
  namespace: production
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: routemq
  minReplicas: 2
  maxReplicas: 20
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70
  - type: Resource
    resource:
      name: memory
      target:
        type: Utilization
        averageUtilization: 80
  - type: Pods
    pods:
      metric:
        name: messages_per_second
      target:
        type: AverageValue
        averageValue: "100"
  behavior:
    scaleUp:
      stabilizationWindowSeconds: 60
      policies:
      - type: Percent
        value: 100
        periodSeconds: 15
    scaleDown:
      stabilizationWindowSeconds: 300
      policies:
      - type: Percent
        value: 50
        periodSeconds: 60
```
### Custom Metrics for Auto-scaling

```python
# monitoring/custom_metrics.py
import asyncio
import logging
import time

from kubernetes import client, config

class CustomMetricsExporter:
    """Export custom metrics for the Kubernetes HPA.

    Note: the custom.metrics.k8s.io API is normally served read-only by
    a metrics adapter; writing to it directly only works if your adapter
    accepts pushed values. Exposing metrics for an adapter to scrape
    (see the sketch below) is the more common pattern."""

    def __init__(self):
        config.load_incluster_config()  # or config.load_kube_config() for local use
        self.custom_api = client.CustomObjectsApi()
        self.metrics = {}

    async def export_message_rate_metric(self, messages_per_second: float):
        """Export message rate metric for HPA"""
        metric = {
            "apiVersion": "custom.metrics.k8s.io/v1beta1",
            "kind": "MetricValue",
            "metadata": {
                "name": "messages_per_second",
                "namespace": "production"
            },
            "value": str(messages_per_second),
            "timestamp": time.time()
        }

        try:
            self.custom_api.create_namespaced_custom_object(
                group="custom.metrics.k8s.io",
                version="v1beta1",
                namespace="production",
                plural="metrics",
                body=metric
            )
        except Exception as e:
            logging.error(f"Failed to export metric: {e}")

    async def monitor_and_export_metrics(self):
        """Continuously monitor and export metrics"""
        while True:
            # Collect application metrics
            message_rate = await self._get_message_rate()
            queue_depth = await self._get_queue_depth()

            # Export to Kubernetes
            await self.export_message_rate_metric(message_rate)

            await asyncio.sleep(30)
```
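In most clusters the `messages_per_second` Pods metric reaches the HPA through prometheus-adapter scraping an application endpoint rather than direct API writes. A minimal sketch with `prometheus_client` (the metric name and port are assumptions):

```python
from prometheus_client import Gauge, start_http_server

# Gauge scraped by Prometheus; prometheus-adapter can map it to the
# "messages_per_second" Pods metric the HPA above consumes
MESSAGES_PER_SECOND = Gauge(
    "routemq_messages_per_second",
    "Messages processed per second by this instance",
)

def start_metrics_endpoint(port: int = 9100) -> None:
    # Exposes /metrics for Prometheus to scrape
    start_http_server(port)

# In the message loop: MESSAGES_PER_SECOND.set(current_rate)
```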
### Cloud Auto-scaling

#### AWS ECS Auto-scaling
```json
{
  "service": "routemq-service",
  "cluster": "production-cluster",
  "taskDefinition": "routemq:latest",
  "desiredCount": 3,
  "deploymentConfiguration": {
    "maximumPercent": 200,
    "minimumHealthyPercent": 50
  },
  "autoScalingSettings": {
    "targetTrackingScalingPolicies": [
      {
        "targetValue": 70.0,
        "predefinedMetricSpecification": {
          "predefinedMetricType": "ECSServiceAverageCPUUtilization"
        },
        "scaleOutCooldown": 300,
        "scaleInCooldown": 300
      },
      {
        "targetValue": 80.0,
        "predefinedMetricSpecification": {
          "predefinedMetricType": "ECSServiceAverageMemoryUtilization"
        }
      }
    ]
  }
}
```
#### AWS Lambda Auto-scaling

```yaml
# serverless.yml for Lambda deployment
service: routemq-lambda

provider:
  name: aws
  runtime: python3.9
  memorySize: 512
  timeout: 30

functions:
  processMessage:
    handler: lambda_handler.process_message
    events:
      - sqs:
          arn:
            Fn::GetAtt:
              - MessageQueue
              - Arn
          batchSize: 10
    reservedConcurrency: 100  # Limit concurrent executions

resources:
  Resources:
    MessageQueue:
      Type: AWS::SQS::Queue
      Properties:
        VisibilityTimeoutSeconds: 60
        RedrivePolicy:
          deadLetterTargetArn:
            Fn::GetAtt:
              - DeadLetterQueue
              - Arn
          maxReceiveCount: 3
```
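The `lambda_handler.process_message` handler referenced above is not shown in this guide; a hypothetical sketch of what it might look like for SQS batch events:

```python
# lambda_handler.py - illustrative handler matching the serverless.yml above
import json

def process_message(event, context):
    # SQS delivers up to batchSize records per invocation
    for record in event.get("Records", []):
        payload = json.loads(record["body"])
        # Route the payload to the appropriate handler here
        print(f"Processing message {record['messageId']}: {payload}")
    # Raising an exception would return the whole batch to the queue
    return {"statusCode": 200}
```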
## Performance Monitoring for Scaling

### Scaling Metrics Dashboard
```python
# monitoring/scaling_dashboard.py
import time
from typing import Dict

class ScalingMetricsDashboard:
    """Dashboard for monitoring scaling metrics"""

    def __init__(self):
        self.metrics_history = []
        self.scaling_events = []

    def record_scaling_event(self, event_type: str, old_count: int,
                             new_count: int, reason: str):
        """Record a scaling event"""
        event = {
            'timestamp': time.time(),
            'type': event_type,  # 'scale_up' or 'scale_down'
            'old_count': old_count,
            'new_count': new_count,
            'reason': reason
        }
        self.scaling_events.append(event)

    def get_scaling_summary(self, hours: int = 24) -> Dict:
        """Get scaling summary for the last N hours"""
        cutoff_time = time.time() - (hours * 3600)
        recent_events = [
            event for event in self.scaling_events
            if event['timestamp'] >= cutoff_time
        ]

        scale_ups = len([e for e in recent_events if e['type'] == 'scale_up'])
        scale_downs = len([e for e in recent_events if e['type'] == 'scale_down'])

        return {
            'period_hours': hours,
            'total_scaling_events': len(recent_events),
            'scale_up_events': scale_ups,
            'scale_down_events': scale_downs,
            'events': recent_events[-10:]  # Last 10 events
        }
```
### Load Testing for Scaling Validation

```python
# testing/load_test.py
import asyncio
import json
import time
from concurrent.futures import ThreadPoolExecutor

import paho.mqtt.client as mqtt

class LoadTester:
    """Load testing tool for validating scaling"""

    def __init__(self, broker_host: str, broker_port: int = 1883):
        self.broker_host = broker_host
        self.broker_port = broker_port
        self.clients = []
        self.message_count = 0

    async def run_load_test(self, num_clients: int, messages_per_second: int,
                            duration_seconds: int):
        """Run load test with specified parameters"""
        print(f"Starting load test: {num_clients} clients, "
              f"{messages_per_second} msg/s, {duration_seconds}s")

        # Create MQTT clients
        for i in range(num_clients):
            client = mqtt.Client(f"load_test_client_{i}")
            client.connect(self.broker_host, self.broker_port, 60)
            self.clients.append(client)

        # Calculate message interval
        interval = 1.0 / messages_per_second if messages_per_second > 0 else 1.0

        start_time = time.time()
        end_time = start_time + duration_seconds

        with ThreadPoolExecutor(max_workers=num_clients) as executor:
            while time.time() < end_time:
                # Send messages from all clients
                tasks = [executor.submit(self._send_test_message, client)
                         for client in self.clients]

                # Wait for all messages to be sent
                for task in tasks:
                    task.result()

                self.message_count += num_clients
                await asyncio.sleep(interval)

        # Cleanup
        for client in self.clients:
            client.disconnect()

        total_time = time.time() - start_time
        actual_rate = self.message_count / total_time

        print("Load test completed:")
        print(f"  Messages sent: {self.message_count}")
        print(f"  Actual rate: {actual_rate:.1f} msg/s")
        print(f"  Duration: {total_time:.1f}s")

    def _send_test_message(self, client):
        """Send a test message"""
        message = {
            'timestamp': time.time(),
            'test_data': 'load_test_message',
            'message_id': self.message_count
        }
        topic = f"test/load/{self.message_count % 100}"  # Distribute across topics
        client.publish(topic, json.dumps(message), qos=1)
```
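Running a test is then one coroutine call (the broker address is a placeholder):

```python
import asyncio

async def main():
    tester = LoadTester("localhost", 1883)
    # Each cycle sends one message per client, so 20 clients at
    # 10 cycles/s is roughly 200 msg/s aggregate
    await tester.run_load_test(num_clients=20,
                               messages_per_second=10,
                               duration_seconds=60)

asyncio.run(main())
```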
## Scaling Best Practices

### Guidelines
- **Start Small**: Begin with minimal resources and scale up based on actual load
- **Monitor Continuously**: Use comprehensive monitoring to understand scaling needs
- **Test Scaling**: Regularly test scaling scenarios under load
- **Plan for Peak Load**: Consider peak usage patterns and seasonal variations
- **Automate Scaling**: Use auto-scaling to respond quickly to load changes
### Common Pitfalls

- **Over-provisioning**: Wasting resources on unused capacity
- **Under-provisioning**: Causing performance issues during peak load
- **Ignoring Dependencies**: Scaling the application without considering database/MQTT broker limits
- **Missing Monitoring**: Scaling without proper metrics and alerting
- **State Management**: Not considering stateful operations when scaling horizontally
### Resource Planning
```python
# planning/capacity_planning.py
class CapacityPlanner:
    """Tool for capacity planning and scaling decisions"""

    def __init__(self):
        self.baseline_metrics = {
            'messages_per_instance': 1000,  # Messages/hour per instance
            'cpu_per_message': 0.01,        # CPU seconds per message
            'memory_per_message': 0.1,      # MB per message
            'db_connections_per_instance': 5,
        }

    def calculate_required_instances(self, expected_messages_per_hour: int) -> dict:
        """Calculate required instances for expected load"""
        # Calculate based on message throughput
        throughput_instances = (expected_messages_per_hour /
                                self.baseline_metrics['messages_per_instance'])

        # Calculate based on CPU
        cpu_hours = (expected_messages_per_hour *
                     self.baseline_metrics['cpu_per_message']) / 3600
        cpu_instances = cpu_hours / 1.0  # Assume 1 CPU hour per instance

        # Calculate based on memory
        memory_mb = expected_messages_per_hour * self.baseline_metrics['memory_per_message']
        memory_instances = memory_mb / 512  # Assume 512MB per instance

        # Use the maximum requirement
        required_instances = max(throughput_instances, cpu_instances, memory_instances)

        # Add safety margin
        recommended_instances = int(required_instances * 1.2)  # 20% safety margin

        return {
            'expected_load': expected_messages_per_hour,
            'throughput_instances': throughput_instances,
            'cpu_instances': cpu_instances,
            'memory_instances': memory_instances,
            'recommended_instances': max(1, recommended_instances),
            'safety_margin': 0.2
        }
```
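A worked example with the baseline numbers above: at 50,000 messages/hour, throughput needs 50 instances, CPU needs 50,000 × 0.01 / 3600 ≈ 0.14, and memory needs 5,000 MB / 512 ≈ 9.8, so throughput dominates and the 20% margin yields 60:

```python
planner = CapacityPlanner()
plan = planner.calculate_required_instances(expected_messages_per_hour=50_000)

print(plan['throughput_instances'])         # 50.0
print(round(plan['cpu_instances'], 2))      # 0.14
print(round(plan['memory_instances'], 1))   # 9.8
print(plan['recommended_instances'])        # 60
```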
## Next Steps

- **Load Balancing** - Distribute traffic across scaled instances
- **Security** - Secure your scaled deployment
- **Monitoring** - Monitor your scaled infrastructure
- **Production Configuration** - Optimize configuration for scaling