Queue Drivers
RouteMQ supports two queue drivers: Redis and Database. This guide explains how they work and when to use each.
Overview
Queue drivers handle the storage and retrieval of jobs. The driver you choose affects:
Performance - How fast jobs are queued and processed
Persistence - Whether jobs survive crashes
Infrastructure - What services you need to run
Scalability - How well it handles high job volumes
Redis Queue Driver
Fast, in-memory queue backed by Redis.
Features
✅ Very fast - In-memory storage for low latency
✅ High throughput - Handles thousands of jobs/second
✅ Atomic operations - Uses RPOPLPUSH for safe job claiming
✅ Delayed jobs - Efficient sorted sets for scheduling
✅ Scalable - Easy to cluster Redis for more capacity
Requirements
Redis server running (v5.0+)
ENABLE_REDIS=true in .env
QUEUE_CONNECTION=redis in .env
Configuration
# Enable Redis
ENABLE_REDIS=true
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_DB=0
REDIS_PASSWORD=
# Use Redis for queue
QUEUE_CONNECTION=redis
Data Structures
Redis queue uses different data structures for different purposes:
| Key | Purpose | Data structure |
|---|---|---|
| routemq:queue:{name} | Pending jobs | List (FIFO) |
| routemq:queue:{name}:delayed | Delayed jobs | Sorted Set (by timestamp) |
| routemq:queue:{name}:reserved | Processing jobs | List |
| routemq:queue:failed:{name} | Failed jobs | List |
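The key layout above can be captured in a small helper, and the same keys can be used to report queue depth. This is a minimal sketch, not RouteMQ's own code: `queue_keys` and `queue_depths` are hypothetical names, and `client` is assumed to be any object with redis-py style `llen()` and `zcard()` methods.

```python
def queue_keys(name):
    """Return the Redis keys used for one queue, following the layout above."""
    base = f"routemq:queue:{name}"
    return {
        "pending": base,
        "delayed": f"{base}:delayed",
        "reserved": f"{base}:reserved",
        "failed": f"routemq:queue:failed:{name}",
    }


def queue_depths(client, name):
    """Count the entries stored under each key of one queue.

    `client` is an assumption: any object exposing llen() and zcard(),
    such as a redis-py client.
    """
    keys = queue_keys(name)
    return {
        "pending": client.llen(keys["pending"]),
        "delayed": client.zcard(keys["delayed"]),  # sorted set, so ZCARD not LLEN
        "reserved": client.llen(keys["reserved"]),
        "failed": client.llen(keys["failed"]),
    }
```

With redis-py installed, a call such as `queue_depths(redis.Redis(host="localhost", port=6379, db=0), "default")` would return the four counts for the `default` queue.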
How It Works
Pushing a job:
1. Job serialized to JSON
2. RPUSH to routemq:queue:{name}
3. Job ID returned
Popping a job:
1. Check for delayed jobs ready to process
2. RPOPLPUSH from pending to reserved
3. Increment attempts
4. Return job data
Completing a job:
1. LREM from reserved list
2. Job deleted
Failing a job:
1. If attempts < max_tries:
   - LREM from reserved
   - RPUSH back to pending (or delayed)
2. Else:
   - Move to failed list
   - LREM from reserved
Advantages
Speed: Sub-millisecond latency
Throughput: Handle high job volumes
Simple: No complex queries needed
Scalable: Easy to add Redis replicas
Disadvantages
Volatility: Jobs can be lost if Redis crashes unless persistence (such as AOF) is enabled
Infrastructure: Requires Redis server
Memory: All jobs stored in RAM
Best For
High-volume applications
Real-time job processing
When you already use Redis
When speed is critical
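To make the Redis job lifecycle described under How It Works concrete, the sketch below models the pending, delayed, reserved and failed structures with plain Python containers. It is an illustrative model under stated assumptions, not RouteMQ's implementation: the class and method names are hypothetical, a heap stands in for the sorted set, and the model only guarantees FIFO order by enqueueing and dequeueing at opposite ends rather than reproducing which end each Redis command touches.

```python
import heapq
import itertools
import json
from collections import deque


class InMemoryQueueModel:
    """Toy model of the pending / delayed / reserved / failed structures."""

    def __init__(self, max_tries=3):
        self.max_tries = max_tries
        self.pending = deque()  # stands in for the pending list
        self.delayed = []       # heap of (available_at, job_id, payload)
        self.reserved = []      # stands in for the reserved list
        self.failed = []        # stands in for the failed list
        self._ids = itertools.count(1)

    def push(self, data, available_at=None):
        job = {"id": next(self._ids), "data": data, "attempts": 0}
        payload = json.dumps(job)             # 1. serialize to JSON
        if available_at is None:
            self.pending.append(payload)      # 2. enqueue on the pending list
        else:
            heapq.heappush(self.delayed, (available_at, job["id"], payload))
        return job["id"]                      # 3. return the job ID

    def pop(self, now):
        # 1. Move delayed jobs whose time has come onto the pending list.
        while self.delayed and self.delayed[0][0] <= now:
            self.pending.append(heapq.heappop(self.delayed)[2])
        if not self.pending:
            return None
        # 2-3. Claim the oldest pending job and count the attempt.
        job = json.loads(self.pending.popleft())
        job["attempts"] += 1
        self.reserved.append(job)
        return job                            # 4. return the job data

    def complete(self, job):
        self.reserved.remove(job)             # the job is simply dropped

    def fail(self, job, now, retry_delay=0):
        self.reserved.remove(job)
        if job["attempts"] < self.max_tries:  # retry: back to pending or delayed
            payload = json.dumps(job)
            if retry_delay:
                heapq.heappush(self.delayed, (now + retry_delay, job["id"], payload))
            else:
                self.pending.append(payload)
        else:                                 # out of tries: keep in the failed list
            self.failed.append(job)
```

In Redis itself the claim step is a single atomic command, which is what prevents two workers from taking the same job; the model is single-threaded and does not need that guarantee.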
Database Queue Driver
Persistent queue backed by MySQL.
Features
✅ Persistent - Jobs survive crashes
✅ ACID - Transactional guarantees
✅ Inspectable - Easy to query with SQL
✅ Reliable - No job loss
✅ No extra service - Uses existing MySQL
Requirements
MySQL server running (v8.0+)
ENABLE_MYSQL=true in .env
QUEUE_CONNECTION=database in .env
Configuration
# Enable MySQL
ENABLE_MYSQL=true
DB_HOST=localhost
DB_PORT=3306
DB_NAME=mqtt_framework
DB_USER=root
DB_PASS=your_password
# Use database for queue
QUEUE_CONNECTION=database
Database Tables
queue_jobs:
| Column | Type | Description |
|---|---|---|
| id | INT | Primary key |
| queue | VARCHAR(255) | Queue name |
| payload | TEXT | Serialized job data |
| attempts | INT | Number of attempts |
| reserved_at | DATETIME | When job was claimed (NULL if pending) |
| available_at | DATETIME | When job becomes available |
| created_at | DATETIME | When job was created |
queue_failed_jobs:
| Column | Type | Description |
|---|---|---|
| id | INT | Primary key |
| connection | VARCHAR(255) | Connection name |
| queue | VARCHAR(255) | Queue name |
| payload | TEXT | Serialized job data |
| exception | TEXT | Exception details |
| failed_at | DATETIME | When job failed |
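The `reserved_at` and `available_at` columns of queue_jobs together determine whether a row is pending, delayed or being processed. A minimal sketch of that classification follows; `job_state` is a hypothetical helper, not part of RouteMQ, and the state names are chosen here for illustration.

```python
from datetime import datetime, timedelta


def job_state(reserved_at, available_at, now):
    """Classify a queue_jobs row from its two timestamp columns."""
    if reserved_at is not None:
        return "reserved"  # a worker has claimed the job
    if available_at > now:
        return "delayed"   # not yet eligible to be popped
    return "pending"       # eligible for the next pop


now = datetime(2024, 1, 1, 12, 0, 0)
print(job_state(None, now + timedelta(minutes=5), now))  # delayed
```

The comparison mirrors the `available_at <= NOW()` condition used when popping a job, so a row whose `available_at` equals the current time counts as pending.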
How It Works
Pushing a job:
INSERT INTO queue_jobs
(queue, payload, attempts, available_at, created_at)
VALUES (?, ?, 0, ?, NOW());
Popping a job:
-- Run both statements in one transaction; FOR UPDATE SKIP LOCKED
-- lets concurrent workers skip rows that another worker has locked
SELECT * FROM queue_jobs
WHERE queue = ?
AND reserved_at IS NULL
AND available_at <= NOW()
ORDER BY id
LIMIT 1
FOR UPDATE SKIP LOCKED;
-- Mark as reserved
UPDATE queue_jobs
SET reserved_at = NOW(), attempts = attempts + 1
WHERE id = ?;
Completing a job:
DELETE FROM queue_jobs WHERE id = ?;
Failing a job:
-- If retrying
UPDATE queue_jobs
SET reserved_at = NULL,
available_at = DATE_ADD(NOW(), INTERVAL ? SECOND)
WHERE id = ?;
-- If permanently failed
INSERT INTO queue_failed_jobs
(connection, queue, payload, exception, failed_at)
VALUES (?, ?, ?, ?, NOW());
DELETE FROM queue_jobs WHERE id = ?;
Advantages
Persistence: Jobs survive crashes
Reliability: ACID transactions
Visibility: Easy to inspect with SQL
Simplicity: No additional infrastructure
Disadvantages
Speed: Slower than Redis (disk I/O)
Load: Increases database queries
Scaling: Harder to scale than Redis
Best For
Low to medium job volumes
When persistence is critical
When you don't want to manage Redis
When you need to inspect jobs easily
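The push, pop, complete and fail statements described under How It Works for the database driver can be exercised end to end with Python's built-in `sqlite3` module. This is a sketch under clear assumptions: SQLite stands in for MySQL so the example runs without a server, integer timestamps replace `NOW()`, SQLite has no `FOR UPDATE SKIP LOCKED` so a single connection stands in for row locking, and the function names, `max_tries` and `retry_delay` parameters are hypothetical.

```python
import sqlite3

SCHEMA = """
CREATE TABLE queue_jobs (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    queue TEXT NOT NULL,
    payload TEXT NOT NULL,
    attempts INTEGER NOT NULL DEFAULT 0,
    reserved_at INTEGER,
    available_at INTEGER NOT NULL,
    created_at INTEGER NOT NULL
);
CREATE TABLE queue_failed_jobs (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    connection TEXT, queue TEXT, payload TEXT, exception TEXT, failed_at INTEGER
);
"""


def connect():
    db = sqlite3.connect(":memory:")
    db.row_factory = sqlite3.Row  # allow access to columns by name
    db.executescript(SCHEMA)
    return db


def push(db, queue, payload, now, delay=0):
    cur = db.execute(
        "INSERT INTO queue_jobs (queue, payload, attempts, available_at, created_at)"
        " VALUES (?, ?, 0, ?, ?)",
        (queue, payload, now + delay, now),
    )
    db.commit()
    return cur.lastrowid


def pop(db, queue, now):
    # Select the oldest available, unreserved job (no SKIP LOCKED in SQLite).
    row = db.execute(
        "SELECT id FROM queue_jobs WHERE queue = ? AND reserved_at IS NULL"
        " AND available_at <= ? ORDER BY id LIMIT 1",
        (queue, now),
    ).fetchone()
    if row is None:
        return None
    # Mark it as reserved and count the attempt.
    db.execute(
        "UPDATE queue_jobs SET reserved_at = ?, attempts = attempts + 1 WHERE id = ?",
        (now, row["id"]),
    )
    db.commit()
    return db.execute("SELECT * FROM queue_jobs WHERE id = ?", (row["id"],)).fetchone()


def complete(db, job_id):
    db.execute("DELETE FROM queue_jobs WHERE id = ?", (job_id,))
    db.commit()


def fail(db, job, now, max_tries=3, retry_delay=60, exception=""):
    if job["attempts"] < max_tries:
        # Retry: release the job and push its availability into the future.
        db.execute(
            "UPDATE queue_jobs SET reserved_at = NULL, available_at = ? WHERE id = ?",
            (now + retry_delay, job["id"]),
        )
    else:
        # Permanent failure: record it, then remove it from the live table.
        db.execute(
            "INSERT INTO queue_failed_jobs (connection, queue, payload, exception, failed_at)"
            " VALUES ('database', ?, ?, ?, ?)",
            (job["queue"], job["payload"], exception, now),
        )
        db.execute("DELETE FROM queue_jobs WHERE id = ?", (job["id"],))
    db.commit()
```

Against MySQL, the select and update inside `pop` would need to share one transaction with the locking read, since the row lock is only held until that transaction ends.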
Comparison
| Feature | Redis | Database |
|---|---|---|
| Speed | ⭐⭐⭐⭐⭐ Very Fast | ⭐⭐⭐ Moderate |
| Persistence | ⭐⭐ Configurable | ⭐⭐⭐⭐⭐ Always |
| Scalability | ⭐⭐⭐⭐⭐ Excellent | ⭐⭐⭐ Good |
| Reliability | ⭐⭐⭐ Good | ⭐⭐⭐⭐⭐ Excellent |
| Setup | ⭐⭐⭐ Need Redis | ⭐⭐⭐⭐⭐ Use existing DB |
| Inspection | ⭐⭐⭐ Redis CLI | ⭐⭐⭐⭐⭐ SQL queries |
| Memory | ⭐⭐ All in RAM | ⭐⭐⭐⭐⭐ On disk |
Switching Drivers
You can switch drivers at any time:
# Change in .env
QUEUE_CONNECTION=redis # or 'database'
Important notes:
Jobs in the old driver won't be transferred
Let existing jobs finish before switching, or migrate them manually between drivers
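Because each driver needs both its `QUEUE_CONNECTION` value and its enable flag, a mismatch after a switch is easy to introduce. The sketch below shows one way a startup check might catch it. `resolve_queue_driver` is a hypothetical helper, and treating a missing `QUEUE_CONNECTION` as an error is an assumption; RouteMQ's actual default is not stated here.

```python
def resolve_queue_driver(env):
    """Return the driver selected by QUEUE_CONNECTION, or raise ValueError.

    `env` is any mapping of settings, for example os.environ.
    """
    required_flag = {"redis": "ENABLE_REDIS", "database": "ENABLE_MYSQL"}
    driver = env.get("QUEUE_CONNECTION", "").strip().lower()
    if driver not in required_flag:
        raise ValueError(
            f"QUEUE_CONNECTION must be 'redis' or 'database', got {driver!r}"
        )
    flag = required_flag[driver]
    if env.get(flag, "").strip().lower() != "true":
        raise ValueError(f"QUEUE_CONNECTION={driver} requires {flag}=true")
    return driver
```

Calling `resolve_queue_driver(os.environ)` before starting a worker would fail fast if, say, `QUEUE_CONNECTION=database` is set while only `ENABLE_REDIS=true` is present.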
Performance Tips
Redis
# Use a dedicated Redis database
REDIS_DB=1 # Separate from cache
# Enable persistence (optional)
# In redis.conf:
# appendonly yes
# appendfsync everysec
Database
-- Add indexes for performance
CREATE INDEX idx_queue_reserved ON queue_jobs(queue, reserved_at);
CREATE INDEX idx_available ON queue_jobs(available_at);
-- Monitor slow queries
SHOW FULL PROCESSLIST;
Monitoring
Redis
# Connect to Redis
redis-cli
# Check queue sizes
LLEN routemq:queue:default
LLEN routemq:queue:emails
LLEN routemq:queue:reports
# Check delayed jobs
ZCARD routemq:queue:default:delayed
# View pending jobs
LRANGE routemq:queue:default 0 9
# Check memory usage
INFO memory
# Monitor commands (debugging only; MONITOR adds load to the server)
MONITOR
Database
-- Check pending jobs by queue
SELECT queue, COUNT(*) as pending_jobs
FROM queue_jobs
WHERE reserved_at IS NULL
GROUP BY queue;
-- Check processing jobs
SELECT queue, COUNT(*) as processing_jobs
FROM queue_jobs
WHERE reserved_at IS NOT NULL
GROUP BY queue;
-- Check job age
SELECT queue,
MIN(created_at) as oldest_job,
MAX(created_at) as newest_job,
COUNT(*) as total_jobs
FROM queue_jobs
GROUP BY queue;
-- Check failed jobs
SELECT queue, COUNT(*) as failed_jobs
FROM queue_failed_jobs
GROUP BY queue;
-- Find stuck jobs (reserved > 1 hour ago)
SELECT * FROM queue_jobs
WHERE reserved_at < DATE_SUB(NOW(), INTERVAL 1 HOUR);
Troubleshooting
Redis Connection Issues
# Test connection
redis-cli ping
# Check if Redis is running
redis-cli INFO server
# Restart Redis
# macOS: brew services restart redis
# Linux: sudo systemctl restart redis
Database Connection Issues
# Test connection
mysql -h localhost -u root -p -e "SELECT 1"
# Check if MySQL is running
sudo systemctl status mysql
# Check for locks
mysql> SHOW PROCESSLIST;
mysql> SHOW OPEN TABLES WHERE In_use > 0;
Jobs Not Processing
Check driver configuration:
QUEUE_CONNECTION=redis # Match your setup
Verify service is running:
# Redis
redis-cli ping
# MySQL
mysql -h localhost -u root -p -e "SELECT 1"
Check worker connection:
python main.py --queue-work --connection redis
python main.py --queue-work --connection database
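When the command-line clients are not available on the worker host, a plain TCP check can at least confirm that something is listening on the expected port. This is a minimal sketch with a hypothetical `port_open` helper; it does not authenticate or speak the Redis or MySQL protocol, so a success only shows that the port accepts connections.

```python
import socket


def port_open(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:  # refused, unreachable, or timed out
        return False
```

For the default ports used in this guide, `port_open("localhost", 6379)` checks Redis and `port_open("localhost", 3306)` checks MySQL.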
Best Practices
1. Choose Based on Requirements
High volume, speed critical → Redis
Persistence critical → Database
Low volume, simple setup → Database
Already using Redis → Redis
2. Monitor Both
Even if using Redis, keep failed jobs in database:
# RedisQueue already does this
await driver.failed(connection, queue, payload, exception)
# Stores in database if available
3. Regular Cleanup
-- Clean old failed jobs (> 30 days)
DELETE FROM queue_failed_jobs
WHERE failed_at < DATE_SUB(NOW(), INTERVAL 30 DAY);
4. Backup Important Queues
# Redis
redis-cli SAVE
cp /var/lib/redis/dump.rdb /backup/
# MySQL
mysqldump -u root -p mqtt_framework queue_jobs > backup.sql