Controller Methods
Controller methods are the handler functions that process MQTT messages. This guide covers different patterns and best practices for writing effective controller methods.
Method Signatures
Basic Handler Pattern
@staticmethod
async def handler_name(payload, client):
"""Handle messages without route parameters"""
# Process payload
return resultSingle Parameter Handler
@staticmethod
async def handler_name(param: str, payload, client):
"""Handle messages with one route parameter"""
# Process with parameter
return resultMultiple Parameters Handler
@staticmethod
async def handler_name(param1: str, param2: str, payload, client):
"""Handle messages with multiple route parameters"""
# Process with multiple parameters
return resultParameter Types
Route Parameters
Route parameters are automatically extracted from the MQTT topic and passed to your handler:
# Route: devices/{device_id}/sensors/{sensor_type}
@staticmethod
async def handle_sensor_data(device_id: str, sensor_type: str, payload, client):
print(f"Device: {device_id}") # e.g., "device123"
print(f"Sensor: {sensor_type}") # e.g., "temperature"Payload Parameter
The payload parameter contains the parsed message data:
@staticmethod
async def handle_data(device_id: str, payload, client):
# payload is a dict if message was valid JSON
temperature = payload.get('temperature')
humidity = payload.get('humidity')
# payload is raw bytes/string if not JSON
if isinstance(payload, str):
# Handle raw string data
passClient Parameter
The client parameter is the MQTT client instance for publishing responses:
@staticmethod
async def handle_command(device_id: str, payload, client):
# Process command
result = {"status": "completed"}
# Publish response
response_topic = f"devices/{device_id}/response"
client.publish(response_topic, json.dumps(result))Handler Patterns
Fire and Forget
Simple handlers that process messages without sending responses:
from core.controller import Controller
import logging
class LoggingController(Controller):
@staticmethod
async def log_event(event_type: str, payload, client):
"""Log events without responding"""
logging.info(f"Event {event_type}: {payload}")
# No response needed
return {"logged": True}Request-Response
Handlers that process requests and send responses:
from core.controller import Controller
import json
class ApiController(Controller):
@staticmethod
async def handle_request(device_id: str, payload, client):
"""Process request and send response"""
request_id = payload.get('request_id')
# Process the request
result = await ApiController.process_request(payload)
# Send response
response = {
"request_id": request_id,
"result": result,
"timestamp": time.time()
}
response_topic = f"devices/{device_id}/response"
client.publish(response_topic, json.dumps(response))
return resultPublish-Subscribe
Handlers that receive messages and broadcast to multiple topics:
from core.controller import Controller
import json
class BroadcastController(Controller):
@staticmethod
async def handle_broadcast(channel: str, payload, client):
"""Receive and broadcast messages"""
message = payload.get('message')
sender = payload.get('sender')
# Broadcast to all subscribers
broadcast_message = {
"channel": channel,
"message": message,
"sender": sender,
"timestamp": time.time()
}
# Publish to multiple topics
client.publish(f"broadcast/{channel}/all", json.dumps(broadcast_message))
client.publish(f"notifications/{channel}", json.dumps(broadcast_message))
return {"broadcast": True}State Management
Handlers that maintain state across messages:
from core.controller import Controller
from core.redis_manager import redis_manager
import json
class StateController(Controller):
@staticmethod
async def update_state(device_id: str, payload, client):
"""Update and maintain device state"""
new_state = payload.get('state')
# Get current state
state_key = f"device:{device_id}:state"
current_state = await redis_manager.get_json(state_key) or {}
# Update state
current_state.update(new_state)
# Save updated state
await redis_manager.set_json(state_key, current_state, ex=3600)
# Notify state change
state_topic = f"devices/{device_id}/state"
client.publish(state_topic, json.dumps(current_state))
return current_stateAdvanced Patterns
Batch Processing
Handle multiple items in a single message:
from core.controller import Controller
import json
import asyncio
class BatchController(Controller):
@staticmethod
async def process_batch(payload, client):
"""Process multiple items in batch"""
items = payload.get('items', [])
# Process items concurrently
tasks = [BatchController.process_item(item) for item in items]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Send batch results
response = {
"processed": len(results),
"results": results
}
client.publish("batch/results", json.dumps(response))
return response
@staticmethod
async def process_item(item):
"""Process individual item"""
# Simulate processing
await asyncio.sleep(0.1)
return {"id": item.get('id'), "processed": True}Pipeline Processing
Chain multiple processing steps:
from core.controller import Controller
import json
class PipelineController(Controller):
@staticmethod
async def process_pipeline(device_id: str, payload, client):
"""Process data through multiple stages"""
data = payload.get('data')
# Stage 1: Validate
validated_data = await PipelineController.validate_data(data)
if not validated_data:
return {"error": "Validation failed"}
# Stage 2: Transform
transformed_data = await PipelineController.transform_data(validated_data)
# Stage 3: Enrich
enriched_data = await PipelineController.enrich_data(transformed_data)
# Stage 4: Store
stored = await PipelineController.store_data(device_id, enriched_data)
# Notify completion
result_topic = f"devices/{device_id}/pipeline/complete"
client.publish(result_topic, json.dumps({
"status": "completed",
"stages": ["validate", "transform", "enrich", "store"]
}))
return enriched_dataError Recovery
Handle errors and implement retry logic:
from core.controller import Controller
import json
import asyncio
import logging
class ReliableController(Controller):
@staticmethod
async def reliable_process(device_id: str, payload, client):
"""Process with retry logic"""
max_retries = 3
retry_delay = 1.0
for attempt in range(max_retries):
try:
result = await ReliableController.process_data(payload)
# Success - send result
response_topic = f"devices/{device_id}/success"
client.publish(response_topic, json.dumps(result))
return result
except Exception as e:
logging.warning(f"Attempt {attempt + 1} failed: {e}")
if attempt < max_retries - 1:
# Wait before retry
await asyncio.sleep(retry_delay * (2 ** attempt))
else:
# Final failure
error_topic = f"devices/{device_id}/error"
client.publish(error_topic, json.dumps({
"error": "Processing failed after retries",
"attempts": max_retries
}))
return {"error": "Processing failed"}Method Organization
Grouping Related Methods
Organize related functionality in the same controller:
class DeviceController(Controller):
# Device lifecycle methods
@staticmethod
async def handle_connect(device_id: str, payload, client):
"""Handle device connection"""
pass
@staticmethod
async def handle_disconnect(device_id: str, payload, client):
"""Handle device disconnection"""
pass
# Device control methods
@staticmethod
async def handle_restart(device_id: str, payload, client):
"""Handle device restart command"""
pass
@staticmethod
async def handle_shutdown(device_id: str, payload, client):
"""Handle device shutdown command"""
pass
# Helper methods (not handlers)
@staticmethod
async def get_device_status(device_id: str):
"""Get current device status"""
passSeparation of Concerns
Keep controllers focused on specific domains:
# Good: Focused controllers
class UserController(Controller):
"""Handle user-related operations"""
pass
class DeviceController(Controller):
"""Handle device-related operations"""
pass
class NotificationController(Controller):
"""Handle notification operations"""
pass
# Avoid: Mixed responsibilities
class MegaController(Controller):
"""Handles everything - avoid this pattern"""
passTesting Controller Methods
Example test structure for controller methods:
import pytest
from unittest.mock import AsyncMock, MagicMock
from app.controllers.device_controller import DeviceController
@pytest.mark.asyncio
async def test_handle_control():
# Arrange
device_id = "test_device"
payload = {"command": "restart"}
client = MagicMock()
# Act
result = await DeviceController.handle_control(device_id, payload, client)
# Assert
assert result["status"] == "restarted"
client.publish.assert_called_once()Next Steps
Using Redis in Controllers - Add caching and state management
Database Operations - Work with database models
Best Practices - Follow controller organization guidelines
Last updated