Running Queue Workers
Queue workers are background processes that fetch and execute jobs from the queue. This guide explains how to run and manage workers.
Starting a Worker
Basic Usage
Start a worker to process jobs from the default queue:
# Process jobs from 'default' queue
python main.py --queue-work
# Or using the routemq command
routemq --queue-work
The worker will:
Connect to Redis/MySQL based on QUEUE_CONNECTION
Poll the queue for jobs
Execute jobs as they become available
Automatically retry failed jobs
Run until stopped (Ctrl+C)
Worker Options
--queue
Specify which queue to process:
python main.py --queue-work --queue emails
python main.py --queue-work --queue high-priority
python main.py --queue-work --queue reports
--connection
Override the queue connection:
# Use Redis queue
python main.py --queue-work --connection redis
# Use database queue
python main.py --queue-work --connection database
--max-jobs
Process a maximum number of jobs then stop:
# Process 100 jobs then exit
python main.py --queue-work --max-jobs 100
# Process 1 job then exit (useful for testing)
python main.py --queue-work --max-jobs 1
--max-time
Run for a maximum time (in seconds) then stop:
# Run for 1 hour
python main.py --queue-work --max-time 3600
# Run for 8 hours
python main.py --queue-work --max-time 28800
--sleep
Seconds to sleep when no jobs are available:
# Check every second (high priority queue)
python main.py --queue-work --sleep 1
# Check every 5 seconds (normal priority)
python main.py --queue-work --sleep 5
# Check every 10 seconds (low priority)
python main.py --queue-work --sleep 10
--max-tries
Override the maximum retry attempts for all jobs:
# Retry failed jobs up to 5 times
python main.py --queue-work --max-tries 5
# Never retry (fail immediately)
python main.py --queue-work --max-tries 1
--timeout
Maximum seconds a job can run:
# 2 minute timeout
python main.py --queue-work --timeout 120
# 10 minute timeout
python main.py --queue-work --timeout 600
Multiple Workers
Run multiple workers for different queues:
# Terminal 1: High-priority queue (check every second)
python main.py --queue-work --queue high-priority --sleep 1
# Terminal 2: Default queue (check every 3 seconds)
python main.py --queue-work --queue default --sleep 3
# Terminal 3: Low-priority queue (check every 10 seconds)
python main.py --queue-work --queue low-priority --sleep 10
# Terminal 4: Email queue (dedicated worker)
python main.py --queue-work --queue emails --sleep 5
Production Deployment
Using Docker Compose
The easiest way to run workers in production:
# Start all services including workers
docker compose up -d
# Scale workers
docker compose up -d --scale queue-worker-default=5
# View worker logs
docker compose logs -f queue-worker-default
See Docker Deployment for details.
Using Supervisor
For non-Docker deployments, use Supervisor:
; /etc/supervisor/conf.d/routemq-queue.conf
[program:routemq-queue-default]
command=/path/to/venv/bin/python main.py --queue-work --queue default --sleep 3
directory=/path/to/RouteMQ
user=www-data
autostart=true
autorestart=true
redirect_stderr=true
stdout_logfile=/var/log/routemq/queue-default.log
startsecs=10
stopwaitsecs=60
[program:routemq-queue-high]
command=/path/to/venv/bin/python main.py --queue-work --queue high-priority --sleep 1
directory=/path/to/RouteMQ
user=www-data
autostart=true
autorestart=true
redirect_stderr=true
stdout_logfile=/var/log/routemq/queue-high.log
startsecs=10
stopwaitsecs=60
[program:routemq-queue-emails]
command=/path/to/venv/bin/python main.py --queue-work --queue emails --sleep 5
directory=/path/to/RouteMQ
user=www-data
autostart=true
autorestart=true
redirect_stderr=true
stdout_logfile=/var/log/routemq/queue-emails.log
startsecs=10
stopwaitsecs=60
Then manage with supervisorctl:
# Reload configuration
sudo supervisorctl reread
sudo supervisorctl update
# Start workers
sudo supervisorctl start routemq-queue-default
sudo supervisorctl start routemq-queue-high
sudo supervisorctl start routemq-queue-emails
# Check status
sudo supervisorctl status
# View logs
sudo supervisorctl tail -f routemq-queue-default
# Restart worker
sudo supervisorctl restart routemq-queue-default
# Stop worker
sudo supervisorctl stop routemq-queue-default
Using systemd
Create systemd service files:
# /etc/systemd/system/routemq-queue-default.service
[Unit]
Description=RouteMQ Queue Worker (Default)
After=network.target redis.service mysql.service
[Service]
Type=simple
User=www-data
WorkingDirectory=/path/to/RouteMQ
ExecStart=/path/to/venv/bin/python main.py --queue-work --queue default --sleep 3
Restart=always
RestartSec=10
StandardOutput=append:/var/log/routemq/queue-default.log
StandardError=append:/var/log/routemq/queue-default-error.log
[Install]
WantedBy=multi-user.target
Manage with systemctl:
# Reload systemd
sudo systemctl daemon-reload
# Enable service (start on boot)
sudo systemctl enable routemq-queue-default
# Start service
sudo systemctl start routemq-queue-default
# Check status
sudo systemctl status routemq-queue-default
# View logs
sudo journalctl -u routemq-queue-default -f
# Restart service
sudo systemctl restart routemq-queue-default
# Stop service
sudo systemctl stop routemq-queue-default
Worker Lifecycle
Understanding how workers process jobs:
1. Worker Starts
↓
2. Connect to Queue (Redis/MySQL)
↓
3. Poll for Jobs
↓
├─ Job Available
│ ↓
│ Reserve Job (mark as processing)
│ ↓
│ Execute job.handle()
│ ↓
│ ├─ Success → Delete Job
│ └─ Failure → Release or Move to Failed
│ ↓
│ Loop back to Poll
│
└─ No Jobs → Sleep → Loop back to Poll
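In code, the same loop looks roughly like the sketch below. This is a simplified illustration rather than RouteMQ's actual worker source; the reserve, delete, and release_or_fail helper names are assumed for the example, while job.handle() and the --sleep pause match the behaviour described above.
import asyncio

async def work_loop(driver, queue_name="default", sleep_seconds=3):
    """Simplified worker loop; driver method names are illustrative only."""
    while True:
        job = await driver.reserve(queue_name)     # mark a job as processing (hypothetical helper)
        if job is None:
            await asyncio.sleep(sleep_seconds)     # no jobs: wait for the --sleep interval
            continue
        try:
            await job.handle()                     # execute the job
            await driver.delete(job)               # success: remove the job (hypothetical helper)
        except Exception:
            await driver.release_or_fail(job)      # failure: retry or move to failed (hypothetical helper)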
Graceful Shutdown
Workers handle shutdown signals gracefully:
# Send SIGTERM (recommended)
kill -TERM <pid>
# Or Ctrl+C (sends SIGINT)
^C
# Worker will:
# 1. Stop accepting new jobs
# 2. Finish current job
# 3. Clean up connections
# 4. Exit
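If you write your own long-running scripts around the queue, the same graceful-stop behaviour can be reproduced with Python's standard signal module; the sketch below shows the general pattern, not RouteMQ's internal implementation.
import signal

shutting_down = False

def request_shutdown(signum, frame):
    # Flip a flag; the work loop checks it between jobs and exits cleanly.
    global shutting_down
    shutting_down = True

signal.signal(signal.SIGTERM, request_shutdown)  # kill -TERM <pid>
signal.signal(signal.SIGINT, request_shutdown)   # Ctrl+C

# In the loop: finish the job in progress, then stop polling once shutting_down is True.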
Monitoring Workers
View Worker Output
# In terminal
python main.py --queue-work --queue default
# Output:
# 2024-01-15 10:30:00 - RouteMQ.QueueWorker - INFO - Queue worker started for queue 'default'
# 2024-01-15 10:30:05 - RouteMQ.QueueWorker - INFO - Processing job 123 (attempt 1)
# 2024-01-15 10:30:07 - RouteMQ.QueueWorker - INFO - Job 123 completed successfully
Check Queue Size
from core.queue.queue_manager import queue
# Check how many jobs are waiting
size = await queue.size("default")
print(f"Pending jobs: {size}")Monitor with Redis CLI
# Connect to Redis
redis-cli
# Check queue length
LLEN routemq:queue:default
# Check delayed jobs
ZCARD routemq:queue:default:delayed
# Check reserved jobs
LLEN routemq:queue:default:reserved
# View queue contents
LRANGE routemq:queue:default 0 9   # First 10 jobs
Monitor with MySQL
-- Connect to MySQL
mysql -u root -p routemq_production
-- Check pending jobs
SELECT queue, COUNT(*) as pending
FROM queue_jobs
WHERE reserved_at IS NULL
GROUP BY queue;
-- Check reserved jobs (being processed)
SELECT queue, COUNT(*) as processing
FROM queue_jobs
WHERE reserved_at IS NOT NULL
GROUP BY queue;
-- Check failed jobs
SELECT queue, COUNT(*) as failed
FROM queue_failed_jobs
GROUP BY queue;
-- View job details
SELECT * FROM queue_jobs
WHERE queue = 'default'
ORDER BY created_at DESC
LIMIT 10;
Troubleshooting
Worker Not Processing Jobs
Check if the worker is running:
ps aux | grep "queue-work"
Check worker logs for errors:
# Docker
docker compose logs queue-worker-default
# Supervisor
sudo supervisorctl tail -f routemq-queue-default
# systemd
sudo journalctl -u routemq-queue-default -n 50
Common issues:
Queue name mismatch between dispatch and worker (see the sketch after this list)
Redis/MySQL connection issues
Jobs failing during execution
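To rule out a queue name mismatch, compare the depth of the queue the worker listens on with the queues jobs are actually dispatched to. This reuses the queue.size() call shown earlier (run it inside an async context, as with the size check above); the queue names are only examples:
from core.queue.queue_manager import queue

# A worker started with --queue default never sees jobs sitting in 'emails'.
for name in ("default", "emails", "high-priority"):
    pending = await queue.size(name)
    print(f"{name}: {pending} pending jobs")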
Jobs Timing Out
If jobs are timing out:
# Increase timeout
python main.py --queue-work --timeout 300
# Or set timeout in job class
class MyJob(Job):
    timeout = 300  # 5 minutes
High Memory Usage
If a worker's memory usage keeps growing:
# Restart worker after processing N jobs
python main.py --queue-work --max-jobs 1000
# Then use supervisor/systemd to auto-restart
Worker Stuck
If a worker seems stuck:
Send SIGTERM to gracefully stop
Check for infinite loops in job code
Add timeouts to external API calls (see the sketch after this list)
Review job logs for errors
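For the external API call point above, here is a minimal sketch of adding a deadline with asyncio.wait_for. FetchReportJob, fetch_report, report_id, and the logger name are illustrative stand-ins, while self.job_id and the class-level timeout follow the examples earlier in this guide:
import asyncio
import logging

logger = logging.getLogger("app.jobs")

async def fetch_report(report_id):
    ...  # placeholder for your real HTTP/API call

class FetchReportJob(Job):  # Job base class as in the earlier examples
    timeout = 120  # overall job timeout

    async def handle(self):
        try:
            # Give the external call its own deadline, well under the job timeout.
            data = await asyncio.wait_for(fetch_report(self.report_id), timeout=30)
            # ... process data ...
        except asyncio.TimeoutError:
            logger.warning(f"Job {self.job_id}: external API timed out")
            raise  # let the worker retry the job or move it to failed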
Best Practices
1. Run Multiple Workers
# Scale workers based on load
docker compose up -d --scale queue-worker-default=5
2. Use Different Queues
# High priority - fast polling
python main.py --queue-work --queue critical --sleep 1
# Normal priority
python main.py --queue-work --queue default --sleep 3
# Low priority - slow polling
python main.py --queue-work --queue cleanup --sleep 30
3. Set Resource Limits
# In supervisor config
[program:routemq-queue]
environment=PYTHONUNBUFFERED="1"
priority=999
startsecs=10
stopwaitsecs=60
killasgroup=true
4. Log Everything
# In jobs
logger.info(f"Processing job {self.job_id}")
logger.info(f"Job completed in {elapsed}s")5. Monitor Queue Depth
5. Monitor Queue Depth
# Alert if queue grows too large
size = await queue.size("default")
if size > 1000:
    await send_alert("Queue backlog detected")