Worker Manager API

API reference for the RouteMQ WorkerManager class and the worker processes it manages for horizontal scaling.

WorkerManager Class

The WorkerManager class manages multiple worker processes so that MQTT message processing can scale horizontally through shared subscriptions. With a shared subscription, the broker delivers each matching message to only one subscriber in the group, so the load is spread across the worker processes in high-throughput scenarios.

Import

from core.worker_manager import WorkerManager

Constructor

WorkerManager(router, group_name=None, router_directory="app.routers")

Parameters:

  • router (Router): Router instance containing route definitions

  • group_name (str, optional): MQTT shared subscription group name. Defaults to the value of the MQTT_GROUP_NAME environment variable, or "mqtt_framework_group" if that variable is not set

  • router_directory (str): Python module path for dynamic router loading. Default: "app.routers"
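The group_name default described above can be sketched as a small lookup. This is an illustration only: resolve_group_name and its environ parameter are hypothetical names, and the assumption that an explicit argument takes priority over the environment variable is not confirmed by this reference.

```python
import os

DEFAULT_GROUP_NAME = "mqtt_framework_group"


def resolve_group_name(group_name=None, environ=None):
    """Return the explicit name, else MQTT_GROUP_NAME, else the default.

    Hypothetical helper: it mirrors the documented default, not the
    actual WorkerManager code.
    """
    env = os.environ if environ is None else environ
    if group_name is not None:
        return group_name
    return env.get("MQTT_GROUP_NAME", DEFAULT_GROUP_NAME)
```

Passing a plain dict as environ keeps the helper easy to exercise without touching the real process environment.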

Example:

from core.router import Router
from core.worker_manager import WorkerManager

# Create router with shared routes.
# TelemetryController is assumed to be imported from your application code.
router = Router()
router.on("telemetry/{sensor_id}", TelemetryController.handle_data,
          shared=True, worker_count=5)

# Create worker manager
worker_manager = WorkerManager(
    router=router,
    group_name="telemetry_workers",
    router_directory="app.routers"
)

Core Methods

start_workers(num_workers=None)

Start worker processes for handling shared subscriptions.

Signature:

start_workers(num_workers=None)

Parameters:

  • num_workers (int, optional): Number of workers to start. If None, uses router.get_total_workers_needed()

Example:
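A minimal sketch of the defaulting rule for num_workers, written against a stand-in router so it runs on its own. resolve_worker_count and StubRouter are hypothetical; only get_total_workers_needed() and the num_workers parameter come from this reference.

```python
class StubRouter:
    """Stand-in for Router; only the one method used here is modelled."""

    def __init__(self, total):
        self._total = total

    def get_total_workers_needed(self):
        return self._total


def resolve_worker_count(router, num_workers=None):
    """Use the explicit count, or fall back to the router's own total."""
    if num_workers is None:
        return router.get_total_workers_needed()
    return num_workers


# With a real manager the two cases would correspond to:
#   worker_manager.start_workers()               # router decides the count
#   worker_manager.start_workers(num_workers=3)  # explicit count
router = StubRouter(total=5)
print(resolve_worker_count(router))
print(resolve_worker_count(router, num_workers=3))
```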

stop_workers()

Stop all worker processes gracefully.

Signature:

stop_workers()

Example:
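One way to make sure stop_workers() always runs is to pair it with start_workers() in a try/finally block. The helper below is a sketch: run_with_workers and serve are hypothetical names, and manager is any object that exposes the two documented methods.

```python
def run_with_workers(manager, serve, num_workers=None):
    """Start workers, run serve(), and always stop the workers afterwards."""
    manager.start_workers(num_workers)
    try:
        return serve()
    finally:
        # Runs on normal return, on exceptions, and on KeyboardInterrupt.
        manager.stop_workers()
```

Here serve would typically be the blocking main loop of the application.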

get_worker_count()

Get the number of currently active worker processes.

Signature:

get_worker_count()

Returns: int - Number of active workers

Example:
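Because get_worker_count() reports the number of active workers, it can be polled after startup. The following is a sketch under that assumption; wait_for_workers, timeout, and interval are hypothetical names that are not part of the documented API.

```python
import time


def wait_for_workers(manager, expected, timeout=10.0, interval=0.5):
    """Poll get_worker_count() until `expected` workers are active.

    Returns True once the count is reached, False if `timeout` seconds pass.
    """
    deadline = time.monotonic() + timeout
    while True:
        if manager.get_worker_count() >= expected:
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)
```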

get_shared_routes_info()

Extract information about routes that require shared subscriptions.

Signature:

get_shared_routes_info()

Returns: List of dictionaries containing shared route information

Example:
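The exact keys of the returned dictionaries are not listed in this reference. The sketch below assumes each entry carries a "topic" and a "worker_count" key, mirroring the arguments passed to router.on(); both key names, the default of 1, and the sample routes are assumptions for illustration.

```python
def shared_routes_info(routes):
    """Keep only shared routes and describe each one as a plain dict."""
    return [
        {"topic": r["topic"], "worker_count": r.get("worker_count", 1)}
        for r in routes
        if r.get("shared")
    ]


# Hypothetical route definitions; only the first one is shared.
routes = [
    {"topic": "telemetry/{sensor_id}", "shared": True, "worker_count": 5},
    {"topic": "status/{device_id}", "shared": False},
]

info = shared_routes_info(routes)
for entry in info:
    print(f"{entry['topic']}: {entry['worker_count']} worker(s)")
```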

Configuration

Worker processes are configured through environment variables:

MQTT Broker Configuration

Example Configuration
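As a rough sketch of reading worker configuration from the environment: only MQTT_GROUP_NAME and its default are confirmed by this reference. The other variable names (MQTT_BROKER, MQTT_PORT, MQTT_USERNAME, MQTT_PASSWORD) and the BrokerSettings type are assumptions for illustration; 1883 is the standard unencrypted MQTT port.

```python
import os
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class BrokerSettings:
    host: str
    port: int
    username: Optional[str]
    password: Optional[str]
    group_name: str


def load_broker_settings(environ=None):
    """Build settings from environment variables, with fallbacks."""
    env = os.environ if environ is None else environ
    return BrokerSettings(
        host=env.get("MQTT_BROKER", "localhost"),   # assumed variable name
        port=int(env.get("MQTT_PORT", "1883")),     # assumed variable name
        username=env.get("MQTT_USERNAME"),          # assumed variable name
        password=env.get("MQTT_PASSWORD"),          # assumed variable name
        group_name=env.get("MQTT_GROUP_NAME", "mqtt_framework_group"),
    )
```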

WorkerProcess Class

Individual worker process that handles MQTT subscriptions. This class is used internally by WorkerManager.

Key Features

  • Isolated Process: Each worker runs in a separate process for true parallelism

  • Router Loading: Dynamically loads router configuration from specified directory

  • MQTT Connection: Maintains its own MQTT client connection

  • Shared Subscriptions: Uses MQTT shared subscription feature for load balancing

  • Error Handling: Graceful error handling for message processing failures

Worker Lifecycle

  1. Router Setup: Load router configuration from specified directory

  2. MQTT Client Setup: Create and configure MQTT client with unique client ID

  3. Connection: Connect to MQTT broker

  4. Subscription: Subscribe to shared topics for load balancing

  5. Message Processing: Process incoming messages through router and middleware

  6. Shutdown: Clean disconnect and resource cleanup
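Steps 2 and 4 of the lifecycle can be illustrated with two small helpers. MQTT shared subscriptions use topic filters of the form $share/<group>/<filter>. The sketch assumes that a route parameter such as {sensor_id} maps to the single-level wildcard +, and to_shared_filter and make_client_id are hypothetical names, not part of the RouteMQ API.

```python
import re
import uuid

# Matches one route parameter such as "{sensor_id}".
_PARAM = re.compile(r"\{[^/{}]+\}")


def to_shared_filter(route_topic, group_name):
    """Turn 'telemetry/{sensor_id}' into '$share/<group>/telemetry/+'."""
    topic_filter = _PARAM.sub("+", route_topic)
    return f"$share/{group_name}/{topic_filter}"


def make_client_id(worker_id):
    """Build a client ID that differs for every worker process."""
    return f"worker-{worker_id}-{uuid.uuid4().hex[:8]}"


print(to_shared_filter("telemetry/{sensor_id}", "telemetry_workers"))
print(make_client_id(1))
```

Every worker needs its own client ID because a broker disconnects an existing session when another client connects with the same ID.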

Usage Patterns

Basic Usage

Advanced Configuration

Dynamic Scaling
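This reference documents no method for adding or removing individual workers, so one simple approach is to restart the pool with a new size. The sketch below assumes start_workers() can be called again after stop_workers(); scale_to is a hypothetical helper.

```python
def scale_to(manager, target):
    """Restart the worker pool with `target` workers if the count differs.

    Returns True if a restart happened, False if nothing changed.
    """
    if manager.get_worker_count() == target:
        return False
    manager.stop_workers()
    manager.start_workers(num_workers=target)
    return True
```

A full restart briefly pauses message handling in this process, so it suits occasional adjustments better than frequent ones.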

Health Monitoring
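A basic health check can compare get_worker_count() with the number of workers that should be running. This is a sketch only: check_workers, the logger name, and the shape of the returned report are assumptions.

```python
import logging

logger = logging.getLogger("worker_health")


def check_workers(manager, expected):
    """Return a small health report based on get_worker_count()."""
    active = manager.get_worker_count()
    healthy = active >= expected
    if not healthy:
        logger.warning("only %d of %d workers active", active, expected)
    return {"active": active, "expected": expected, "healthy": healthy}
```

The report can be logged periodically or exposed through whatever monitoring endpoint the application already has.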

Best Practices

1. Proper Shutdown Handling
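A common pattern is to let signal handlers only set a flag and to call stop_workers() from the main thread once the flag is set. The sketch below follows that pattern; wait_for_shutdown and its parameters are hypothetical, and it must be called from the main thread because Python only allows signal handlers to be installed there.

```python
import signal
import threading


def wait_for_shutdown(manager, signals=(signal.SIGINT, signal.SIGTERM),
                      stop_event=None):
    """Block until a termination signal arrives, then stop the workers."""
    if stop_event is None:
        stop_event = threading.Event()

    def _handle(signum, frame):
        # Keep the handler tiny: just record that shutdown was requested.
        stop_event.set()

    for sig in signals:
        signal.signal(sig, _handle)

    try:
        # Wake up periodically so pending signals are processed promptly.
        while not stop_event.wait(0.5):
            pass
    finally:
        manager.stop_workers()
```

Typical use would be to call start_workers() first and then wait_for_shutdown(worker_manager) as the last statement of the main script.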

2. Worker Count Optimization
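As a starting point for sizing, the requested worker count can be capped relative to the number of CPU cores. The heuristic below is an assumption, not a RouteMQ recommendation: suggest_worker_count and the max_per_cpu factor are hypothetical, and I/O-bound handlers may tolerate a higher ratio than CPU-bound ones.

```python
import os


def suggest_worker_count(requested, max_per_cpu=2, cpu_count=None):
    """Cap a requested worker count at `max_per_cpu` workers per CPU core."""
    cpus = cpu_count if cpu_count is not None else (os.cpu_count() or 1)
    return max(1, min(requested, cpus * max_per_cpu))
```

The result could then be passed as num_workers to start_workers().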

3. Resource Management

4. Error Recovery

5. Logging and Monitoring

Troubleshooting

Common Issues

Workers Not Starting

Workers Dying

Memory Leaks

The Worker Manager API lets RouteMQ applications scale horizontally by spreading high-throughput MQTT workloads across multiple worker processes.
