Queue System

RouteMQ includes a background task queue system modeled on Laravel's queues. It lets you defer time-consuming tasks (such as sending emails, processing data, or generating reports) to background workers, so your MQTT message handlers stay fast and responsive.

Overview

The queue system consists of several components:

  • Job: A class that defines a task to be executed in the background

  • Queue Manager: Dispatches jobs to queues

  • Queue Driver: Handles storage and retrieval of jobs (Redis or Database)

  • Queue Worker: Processes jobs from the queue

Architecture

┌─────────────┐
│  Your Code  │
└──────┬──────┘
       │ dispatch(job)
       ▼
┌─────────────┐
│Queue Manager│
└──────┬──────┘
       │ push
       ▼
┌─────────────────┐
│  Queue Driver   │
│ (Redis/Database)│
└──────┬──────────┘
       │ pop
       ▼
┌─────────────┐
│Queue Worker │
│ job.handle()│
└─────────────┘
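
The flow in the diagram can be sketched with a toy in-memory driver. This is only an illustration of the dispatch, push, pop, handle sequence: `InMemoryDriver`, `QueueManager`, `work`, and `GreetJob` are hypothetical stand-ins written for this example, not RouteMQ's actual classes, and the real Redis and Database drivers persist jobs outside the process.

```python
import asyncio
from collections import defaultdict, deque


class Job:
    """Stand-in for a base job class; only the pieces used in this sketch."""
    queue = "default"

    async def handle(self):
        raise NotImplementedError


class InMemoryDriver:
    """Hypothetical driver: one FIFO per queue name, kept in process memory."""

    def __init__(self):
        self._queues = defaultdict(deque)

    async def push(self, job, queue):
        self._queues[queue].append(job)

    async def pop(self, queue):
        pending = self._queues[queue]
        return pending.popleft() if pending else None


class QueueManager:
    """Hypothetical manager: routes each job to the queue named on the job."""

    def __init__(self, driver):
        self.driver = driver

    async def dispatch(self, job):
        await self.driver.push(job, job.queue)


async def work(driver, queue):
    """Drain one queue, awaiting each job's handle(); returns the job count."""
    processed = 0
    while (job := await driver.pop(queue)) is not None:
        await job.handle()
        processed += 1
    return processed


class GreetJob(Job):
    queue = "greetings"

    def __init__(self, name, log):
        self.name = name
        self.log = log

    async def handle(self):
        self.log.append(f"hello {self.name}")


async def demo():
    log = []
    driver = InMemoryDriver()
    manager = QueueManager(driver)
    await manager.dispatch(GreetJob("ada", log))
    await manager.dispatch(GreetJob("lin", log))
    processed = await work(driver, "greetings")
    return processed, log


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The point of the split is that the code calling `dispatch` never touches the driver directly, so the storage backend can change without touching job or handler code.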

Quick Start

# 1. Create a job
from core.job import Job

class SendEmailJob(Job):
    max_tries = 3
    queue = "emails"

    def __init__(self):
        super().__init__()
        self.to = None
        self.subject = None

    async def handle(self):
        # Send email logic
        print(f"Sending email to {self.to}")

# 2. Dispatch the job
from core.queue.queue_manager import dispatch

job = SendEmailJob()
job.to = "user@example.com"  # placeholder address
job.subject = "Welcome!"
await dispatch(job)

# 3. Run the worker
# python main.py --queue-work --queue emails
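
The `max_tries = 3` attribute on the job above caps how many times a worker attempts it. A minimal sketch of what such a retry loop might look like follows; `run_with_retries`, `FlakyJob`, the `retry_delay` attribute, and the `failed_jobs` list are assumptions made for illustration and are not taken from RouteMQ's worker code.

```python
import asyncio


class FlakyJob:
    """Hypothetical job that fails twice before succeeding."""
    max_tries = 3
    retry_delay = 0.01  # seconds between attempts; assumed attribute name

    def __init__(self):
        self.attempts = 0

    async def handle(self):
        self.attempts += 1
        if self.attempts < 3:
            raise RuntimeError("temporary failure")


async def run_with_retries(job, failed_jobs):
    """Await job.handle() up to job.max_tries times.

    Returns True on success. If every attempt raises, the job and the last
    error are appended to failed_jobs and False is returned.
    """
    for attempt in range(1, job.max_tries + 1):
        try:
            await job.handle()
            return True
        except Exception as exc:
            if attempt == job.max_tries:
                failed_jobs.append((job, repr(exc)))
                return False
            # Wait before the next attempt.
            await asyncio.sleep(job.retry_delay)
    return False  # only reached when max_tries is 0


if __name__ == "__main__":
    failed = []
    print(asyncio.run(run_with_retries(FlakyJob(), failed)), failed)
```

Recording the job after the final failure, rather than discarding it, is what makes later inspection and manual retry possible.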

Key Features

  • Laravel-style API - Familiar syntax for Laravel developers

  • Two Queue Drivers - Redis (fast) or Database (persistent)

  • Automatic Retries - Configurable retry logic with delays

  • Multiple Queues - Organize jobs by priority or type

  • Delayed Jobs - Schedule jobs to run later

  • Failed Job Tracking - Inspect and retry failed jobs

  • Docker Support - Production-ready deployment

  • Graceful Shutdown - Workers handle SIGTERM/SIGINT
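
To illustrate the graceful shutdown feature, here is a sketch of a worker loop that stops taking new jobs once a stop flag is set but lets the job in progress finish. `worker_loop`, `install_signal_handlers`, and the `pop` callable are hypothetical names for this example; how RouteMQ's worker actually wires its signal handling is not shown in this page.

```python
import asyncio
import signal


async def worker_loop(pop, stop, poll_interval=0.01):
    """Process jobs until `stop` is set; a job that has started always finishes.

    `pop` is an async callable returning the next job (an async callable)
    or None when the queue is empty.
    """
    processed = 0
    while not stop.is_set():
        job = await pop()
        if job is None:
            # Nothing queued: wait briefly, but wake early if asked to stop.
            try:
                await asyncio.wait_for(stop.wait(), timeout=poll_interval)
            except asyncio.TimeoutError:
                pass
            continue
        await job()
        processed += 1
    return processed


def install_signal_handlers(stop):
    """Set `stop` on SIGTERM/SIGINT. Must be called from a running event loop."""
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGTERM, signal.SIGINT):
        try:
            loop.add_signal_handler(sig, stop.set)
        except NotImplementedError:
            pass  # some event loops (e.g. on Windows) lack signal handlers
```

Because the flag is only checked between jobs, a SIGTERM from a container runtime never interrupts a job halfway through; the trade-off is that shutdown takes as long as the current job does.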

Example Use Cases

1. Email Notifications

from core.queue.queue_manager import dispatch
from app.jobs.send_email_job import SendEmailJob

async def handle_user_signup(context):
    job = SendEmailJob()
    job.to = context["payload"]["email"]
    job.template = "welcome"
    await dispatch(job)

2. Data Processing

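from core.queue.queue_manager import queue
# Assumed module path, mirroring send_email_job in the previous example
from app.jobs.process_data_job import ProcessDataJob

# Inside an async handler; device_id and data come from the incoming message
job = ProcessDataJob()
job.device_id = device_id
job.sensor_data = data
await queue.push(job, queue="data-processing")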

3. Scheduled Reports

# Schedule a report to run 1 hour from now (same `queue` import as in example 2)
job = GenerateReportJob()
job.user_id = user_id
await queue.later(3600, job)  # delay in seconds: 3600 s = 1 hour
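
One common way to implement a call like `later(3600, job)` is to store each job with the timestamp at which it becomes due and only hand it to a worker once that time has passed. The sketch below shows that idea with a heap. `DelayedQueue` and `pop_ready` are hypothetical, and the explicit `now` parameter exists only to make the example deterministic; this is not a description of how RouteMQ's drivers store delayed jobs.

```python
import heapq
import itertools


class DelayedQueue:
    """Hypothetical delayed queue: a job becomes visible once its due time passes."""

    def __init__(self):
        self._heap = []
        # Tie-breaker so two jobs with the same due time are never compared.
        self._counter = itertools.count()

    def later(self, delay_seconds, job, now):
        """Schedule `job` to become available `delay_seconds` after `now`."""
        due = now + delay_seconds
        heapq.heappush(self._heap, (due, next(self._counter), job))

    def pop_ready(self, now):
        """Return the earliest job that is due at `now`, or None if none is."""
        if self._heap and self._heap[0][0] <= now:
            return heapq.heappop(self._heap)[2]
        return None


if __name__ == "__main__":
    q = DelayedQueue()
    q.later(3600, "report", now=0)
    print(q.pop_ready(now=1800), q.pop_ready(now=3600))
```

In a real worker, `now` would come from a clock such as `time.time()`, and the worker would poll `pop_ready` alongside the immediately available jobs.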

Next Steps

  1. Configure your queue - Set up Redis or Database

  2. Create your first job - Define a background task

  3. Dispatch jobs - Send jobs from your code

  4. Run workers - Process jobs in background
