Dispatching Jobs

Once you've created a job, you need to dispatch it to the queue for processing. RouteMQ provides several methods for dispatching jobs.

Using the dispatch() Helper

The simplest way to dispatch a job:

from core.queue.queue_manager import dispatch
from app.jobs.send_notification_job import SendNotificationJob

# In your MQTT handler or anywhere in your code
async def handle_message(context):
    # Create and configure the job
    job = SendNotificationJob()
    job.user_id = 123
    job.message = "Your order has been shipped!"

    # Dispatch to queue
    await dispatch(job)

The dispatch() helper:

  • Uses the queue specified in the job (job.queue); see the sketch after this list

  • Uses the default connection from .env

  • Returns immediately after pushing to the queue
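
For example, setting job.queue before dispatching routes the job to that queue (a minimal sketch; the "notifications" queue name is illustrative):

from core.queue.queue_manager import dispatch
from app.jobs.send_notification_job import SendNotificationJob

job = SendNotificationJob()
job.user_id = 123
job.message = "Your order has been shipped!"
job.queue = "notifications"   # dispatch() pushes to this queue instead of "default"
await dispatch(job)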

Using the Queue Manager

For more control, use the QueueManager directly:

from core.queue.queue_manager import queue
from app.jobs.send_email_job import SendEmailJob

# Create job
job = SendEmailJob()
job.to = "[email protected]"
job.subject = "Welcome!"
job.body = "Thanks for signing up!"

# Dispatch to default queue
await queue.push(job)

# Dispatch to specific queue
await queue.push(job, queue="emails")

# Dispatch to specific connection
await queue.push(job, connection="database")

Delayed Jobs

Schedule a job to run after a delay:

from core.queue.queue_manager import queue
from app.jobs.generate_report_job import GenerateReportJob

job = GenerateReportJob()
job.report_type = "monthly"
job.user_id = 456

# Run after 1 hour (3600 seconds)
await queue.later(3600, job)

# Run after 5 minutes
await queue.later(300, job)

# Run after 24 hours
await queue.later(86400, job)

How delayed jobs work (see the sketch after this list):

  • The job is stored with an available_at timestamp

  • Workers ignore the job until that timestamp is reached

  • The Redis driver uses sorted sets for efficient delay handling

  • The database driver compares available_at against the current time
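
A simplified sketch of the two storage strategies (illustrative only, not RouteMQ's actual internals; the Redis call follows redis-py's zadd signature):

import json
import time

def delay_with_redis(redis_client, job_payload: dict, delay_seconds: int):
    # Score the job by its available_at time in a sorted set; workers later pop
    # only members whose score is <= now (e.g. via ZRANGEBYSCORE)
    available_at = time.time() + delay_seconds
    redis_client.zadd("queues:default:delayed", {json.dumps(job_payload): available_at})

# The database driver instead stores available_at as a column, and a worker
# only reserves rows WHERE available_at <= NOW().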

Bulk Dispatching

Dispatch multiple jobs at once:

from core.queue.queue_manager import queue
from app.jobs.send_notification_job import SendNotificationJob

jobs = []
for user_id in user_ids:
    job = SendNotificationJob()
    job.user_id = user_id
    job.message = "System maintenance tonight"
    jobs.append(job)

# Dispatch all jobs
await queue.bulk(jobs)

This is more efficient than dispatching jobs one by one in a loop.

Dispatching from MQTT Handlers

Example 1: Dispatch from Controller

# app/controllers/order_controller.py
from core.controller import Controller
from core.queue.queue_manager import dispatch
from app.jobs.process_order_job import ProcessOrderJob


class OrderController(Controller):
    @staticmethod
    async def handle_new_order(order_id: str, payload, client):
        print(f"Received new order {order_id}")

        # Dispatch background job for processing
        job = ProcessOrderJob()
        job.order_id = order_id
        job.order_data = payload
        await dispatch(job)

        # Respond immediately without waiting for processing
        return {"status": "accepted", "order_id": order_id}

Example 2: Dispatch with Middleware

# app/middleware/queue_middleware.py
from core.middleware import Middleware
from core.queue.queue_manager import dispatch
from app.jobs.log_message_job import LogMessageJob


class QueueMiddleware(Middleware):
    async def handle(self, context, next_handler):
        # Dispatch logging job
        job = LogMessageJob()
        job.topic = context['topic']
        job.payload = context['payload']
        await dispatch(job)

        # Continue processing
        return await next_handler(context)

Example 3: Conditional Dispatching

from core.queue.queue_manager import dispatch
from app.jobs.process_high_temp_job import ProcessHighTempJob  # adjust the path to where your job lives

async def handle_sensor_data(device_id: str, payload, client):
    temperature = payload.get('temperature')

    # Only queue processing for high temperatures
    if temperature is not None and temperature > 30:
        job = ProcessHighTempJob()
        job.device_id = device_id
        job.temperature = temperature
        await dispatch(job)

    return {"status": "received"}

Checking Queue Size

Monitor how many jobs are pending:

from core.queue.queue_manager import queue

# Check queue size
size = await queue.size("default")
print(f"Pending jobs: {size}")

# Check multiple queues
for queue_name in ["default", "emails", "reports"]:
    size = await queue.size(queue_name)
    print(f"{queue_name}: {size} jobs")

Queue Manager API Reference

push()

Push a job to the queue immediately.

await queue.push(
    job,                    # Job instance (required)
    queue="default",        # Queue name (optional)
    connection="redis"      # Connection (optional)
)

later()

Push a job with a delay.

await queue.later(
    delay,                  # Delay in seconds (required)
    job,                    # Job instance (required)
    queue="default",        # Queue name (optional)
    connection="redis"      # Connection (optional)
)

bulk()

Push multiple jobs at once.

await queue.bulk(
    jobs,                   # List of Job instances (required)
    queue="default",        # Queue name (optional)
    connection="redis"      # Connection (optional)
)

size()

Get the number of pending jobs in a queue.

size = await queue.size(
    queue="default",        # Queue name (optional)
    connection="redis"      # Connection (optional)
)

Common Patterns

Pattern 1: Fan-out

Dispatch multiple jobs from one event:

async def handle_user_signup(user_id, email):
    # Send welcome email
    email_job = SendEmailJob()
    email_job.to = email
    email_job.template = "welcome"
    await dispatch(email_job)

    # Create user profile
    profile_job = CreateProfileJob()
    profile_job.user_id = user_id
    await dispatch(profile_job)

    # Send SMS verification
    sms_job = SendSMSJob()
    sms_job.user_id = user_id
    await dispatch(sms_job)

Pattern 2: Delayed Chain

Schedule a series of jobs:

from core.queue.queue_manager import queue, dispatch

# Send welcome email immediately
welcome_job = SendEmailJob()
welcome_job.to = email
welcome_job.template = "welcome"
await dispatch(welcome_job)

# Send tips email after 1 day
tips_job = SendEmailJob()
tips_job.to = email
tips_job.template = "tips"
await queue.later(86400, tips_job)  # 24 hours

# Send feedback request after 7 days
feedback_job = SendEmailJob()
feedback_job.to = email
feedback_job.template = "feedback"
await queue.later(604800, feedback_job)  # 7 days

Pattern 3: Priority Queues

Use different queues for different priorities:

# High priority - immediate processing
if is_urgent:
    job.queue = "high-priority"
    await dispatch(job)

# Normal priority
elif is_normal:
    job.queue = "default"
    await dispatch(job)

# Low priority - background cleanup
else:
    job.queue = "low-priority"
    await dispatch(job)

Then run workers with appropriate settings:

# High priority - check every second
python main.py --queue-work --queue high-priority --sleep 1

# Normal priority - check every 3 seconds
python main.py --queue-work --queue default --sleep 3

# Low priority - check every 10 seconds
python main.py --queue-work --queue low-priority --sleep 10

Pattern 4: Rate Limiting

Prevent overwhelming external services:

async def send_to_external_api(data):
    # Dispatch job instead of calling API directly
    job = ExternalAPIJob()
    job.data = data
    await dispatch(job)

    # Workers then process these jobs at a controlled rate
    # You can also add per-job delays, e.g. await queue.later(5, job)
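
To make the pacing explicit, you can stagger jobs with later() so the external API is called at a fixed cadence (a sketch; the 5-second spacing is illustrative):

from core.queue.queue_manager import queue

async def enqueue_api_calls(items):
    # Space jobs 5 seconds apart: roughly 12 external API calls per minute
    for index, data in enumerate(items):
        job = ExternalAPIJob()
        job.data = data
        await queue.later(index * 5, job)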

Error Handling

Handle Dispatch Errors

from core.queue.queue_manager import dispatch

try:
    job = SendEmailJob()
    job.to = "[email protected]"
    await dispatch(job)
    print("Job dispatched successfully")

except Exception as e:
    print(f"Failed to dispatch job: {e}")
    # Handle error (log, retry, alert, etc.)

Verify Job Data

def create_email_job(to, subject, body):
    # Validate data before dispatching
    if not to or '@' not in to:
        raise ValueError("Invalid email address")

    if not subject:
        raise ValueError("Subject is required")

    job = SendEmailJob()
    job.to = to
    job.subject = subject
    job.body = body

    return job

# Use it
try:
    job = create_email_job(email, subject, body)
    await dispatch(job)
except ValueError as e:
    print(f"Invalid job data: {e}")

Best Practices

1. Dispatch Early, Process Later

# ✅ Good - dispatch and return quickly
async def handle_order(order_data):
    job = ProcessOrderJob()
    job.order_data = order_data
    await dispatch(job)
    return {"status": "accepted"}

# ❌ Bad - blocking the handler
async def handle_order(order_data):
    await process_order(order_data)  # Takes 30 seconds!
    return {"status": "processed"}

2. Don't Dispatch Too Much Data

# ✅ Good - store ID only
job = ProcessOrderJob()
job.order_id = order_id  # Small
await dispatch(job)

# In handle():
# order = await fetch_order(self.order_id)

# ❌ Bad - storing large data
job = ProcessOrderJob()
job.order_data = huge_dictionary  # Large payload
await dispatch(job)
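
A sketch of the fetch-in-handle side of this pattern (fetch_order and process_order are placeholders for your own data access and processing; the handle() method and order_id attribute follow the job examples in these docs):

# Job base class import omitted; use whatever base class your other jobs extend
class ProcessOrderJob(Job):
    order_id = None

    async def handle(self):
        # Only the small order_id was serialized onto the queue;
        # re-load the full record here, inside the worker
        order = await fetch_order(self.order_id)   # fetch_order: placeholder for your own loader
        await process_order(order)                 # process_order: placeholder for your own logic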

3. Use Appropriate Delays

# ✅ Good - reasonable delays
await queue.later(60, job)      # 1 minute
await queue.later(3600, job)    # 1 hour
await queue.later(86400, job)   # 1 day

# ❌ Bad - very long delays
await queue.later(31536000, job)  # 1 year - use a scheduler instead

4. Choose the Right Queue

# ✅ Good - organized by purpose
email_job.queue = "emails"
report_job.queue = "reports"
cleanup_job.queue = "low-priority"

# ❌ Bad - everything in default
job.queue = "default"  # For all jobs
