Queue System
RouteMQ includes a powerful background task queue system similar to Laravel's queue functionality. This allows you to defer time-consuming tasks (like sending emails, processing data, generating reports) to background workers, keeping your MQTT message handlers fast and responsive.
Overview
The queue system consists of several components:
Job: A class that defines a task to be executed in the background
Queue Manager: Dispatches jobs to queues
Queue Driver: Handles storage and retrieval of jobs (Redis or Database)
Queue Worker: Processes jobs from the queue
Architecture
┌─────────────┐
│ Your Code │
└──────┬──────┘
│ dispatch(job)
▼
┌─────────────┐
│Queue Manager│
└──────┬──────┘
│ push
▼
┌─────────────────┐
│ Queue Driver │
│ (Redis/Database)│
└──────┬──────────┘
│ pop
▼
┌─────────────┐
│Queue Worker │
│ job.handle()│
└─────────────┘Quick Start
# 1. Create a job
from core.job import Job
class SendEmailJob(Job):
max_tries = 3
queue = "emails"
def __init__(self):
super().__init__()
self.to = None
self.subject = None
async def handle(self):
# Send email logic
print(f"Sending email to {self.to}")
# 2. Dispatch the job
from core.queue.queue_manager import dispatch
job = SendEmailJob()
job.to = "[email protected]"
job.subject = "Welcome!"
await dispatch(job)
# 3. Run the worker
# python main.py --queue-work --queue emailsKey Features
✅ Laravel-style API - Familiar syntax for Laravel developers
✅ Two Queue Drivers - Redis (fast) or Database (persistent)
✅ Automatic Retries - Configurable retry logic with delays
✅ Multiple Queues - Organize jobs by priority or type
✅ Delayed Jobs - Schedule jobs to run later
✅ Failed Job Tracking - Inspect and retry failed jobs
✅ Docker Support - Production-ready deployment
✅ Graceful Shutdown - Workers handle SIGTERM/SIGINT
Documentation
Getting Started - Installation and configuration
Creating Jobs - Define background tasks
Dispatching Jobs - Send jobs to queues
Running Workers - Process jobs in background
Queue Drivers - Redis vs Database queues
Failed Jobs - Handle and retry failures
Best Practices - Tips for production use
Example Use Cases
1. Email Notifications
from core.queue.queue_manager import dispatch
from app.jobs.send_email_job import SendEmailJob
async def handle_user_signup(context):
job = SendEmailJob()
job.to = context["payload"]["email"]
job.template = "welcome"
await dispatch(job)2. Data Processing
from core.queue.queue_manager import queue
job = ProcessDataJob()
job.device_id = device_id
job.sensor_data = data
await queue.push(job, queue="data-processing")3. Scheduled Reports
# Schedule report for 1 hour later
job = GenerateReportJob()
job.user_id = user_id
await queue.later(3600, job) # 3600 seconds = 1 hourNext Steps
Configure your queue - Set up Redis or Database
Create your first job - Define a background task
Dispatch jobs - Send jobs from your code
Run workers - Process jobs in background
Last updated