Creating Jobs

Jobs are classes that extend the Job base class. Each job must implement the handle() method, which contains the logic to be executed in the background.

Basic Job Structure

# app/jobs/send_notification_job.py
import logging
from core.job import Job

logger = logging.getLogger("RouteMQ.Jobs.SendNotificationJob")


class SendNotificationJob(Job):
    """Send a notification to a user."""

    # Job configuration
    max_tries = 3          # Maximum retry attempts
    timeout = 60           # Maximum seconds to run
    retry_after = 10       # Seconds to wait before retry
    queue = "default"      # Queue name

    def __init__(self):
        super().__init__()
        self.user_id = None
        self.message = None

    async def handle(self) -> None:
        """Execute the job."""
        logger.info(f"Sending notification to user {self.user_id}")
        logger.info(f"Message: {self.message}")

        # Your notification logic here
        # e.g., send push notification, SMS, email, etc.

        logger.info("Notification sent successfully")

    async def failed(self, exception: Exception) -> None:
        """Called when the job fails permanently."""
        logger.error(
            f"Failed to send notification to user {self.user_id}: {str(exception)}"
        )
        # Handle failure (e.g., log to monitoring service, alert admin)

Job Properties

Configure your job's behavior with these class attributes:

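| Property    | Description                                   | Default   | Example          |
|-------------|-----------------------------------------------|-----------|------------------|
| max_tries   | Maximum number of retry attempts              | 3         | max_tries = 5    |
| timeout     | Maximum seconds the job can run               | 60        | timeout = 120    |
| retry_after | Seconds to wait before retrying after failure | 0         | retry_after = 30 |
| queue       | Queue name                                    | "default" | queue = "emails" |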
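As a sketch of how these attributes are overridden, the class below uses the values from the Example column. The Job class here is a stand-in carrying the documented defaults, defined only so the snippet runs on its own; in a real project you would import it from core.job. SendEmailJob and CleanupJob are hypothetical names.

```python
class Job:
    """Stand-in for core.job.Job, carrying the documented defaults."""

    max_tries = 3
    timeout = 60
    retry_after = 0
    queue = "default"


class SendEmailJob(Job):
    """Hypothetical job that overrides every documented property."""

    max_tries = 5        # allow up to 5 attempts
    timeout = 120        # allow up to 2 minutes per run
    retry_after = 30     # wait 30 seconds before retrying
    queue = "emails"     # use a dedicated queue

    async def handle(self) -> None:
        pass  # email-sending logic would go here


class CleanupJob(Job):
    """Overrides nothing, so it inherits the defaults."""

    async def handle(self) -> None:
        pass


print(SendEmailJob.queue, SendEmailJob.timeout)
print(CleanupJob.queue, CleanupJob.timeout)
```

Attributes that a subclass does not set fall back to the base class values through normal Python class attribute lookup.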

Custom Data in Jobs

All public instance attributes are automatically serialized and restored when the job is processed.
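To illustrate what "public instance attributes" could look like in practice, here is a minimal sketch. It assumes that "public" means a name without a leading underscore, which the text does not spell out. The Job stand-in, ProcessOrderJob, and the public_attributes helper are all hypothetical; the helper is not the framework's serializer.

```python
class Job:
    """Stand-in for core.job.Job so the sketch runs on its own."""


class ProcessOrderJob(Job):
    """Hypothetical job carrying custom data as instance attributes."""

    def __init__(self):
        super().__init__()
        self.order_id = None   # public: expected to travel with the job
        self.items = []        # public: expected to travel with the job
        self._cache = {}       # leading underscore: assumed to be skipped

    async def handle(self) -> None:
        pass


def public_attributes(job: Job) -> dict:
    """Illustrative helper: collect instance attributes whose names
    do not start with an underscore."""
    return {k: v for k, v in vars(job).items() if not k.startswith("_")}


job = ProcessOrderJob()
job.order_id = 1042
job.items = [{"sku": "A-1", "qty": 2}]
print(public_attributes(job))
```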

Supported Data Types

  • ✅ Strings, integers, floats, booleans

  • ✅ Lists and tuples

  • ✅ Dictionaries

  • ✅ None values

  • ❌ Custom objects (they won't serialize; store their IDs instead)

  • ❌ File handles

  • ❌ Database connections
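The "store IDs instead" advice can be checked before a job is dispatched. The sketch below assumes the payload is JSON-like, which matches the supported types listed above but is not confirmed by this page. User and is_json_safe are hypothetical names.

```python
import json


class User:
    """Hypothetical domain object; instances are not JSON-serializable."""

    def __init__(self, user_id: int, name: str):
        self.user_id = user_id
        self.name = name


def is_json_safe(value) -> bool:
    """Return True if json.dumps accepts the value."""
    try:
        json.dumps(value)
        return True
    except (TypeError, ValueError):
        return False


user = User(7, "Ada")

print(is_json_safe(user))          # False: a custom object
print(is_json_safe(user.user_id))  # True: store the ID instead
print(is_json_safe({"tags": ["a", "b"], "score": 1.5, "note": None}))  # True
```

Inside handle(), the job would then reload the object from its ID rather than expecting the object itself to survive the queue.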

The handle() Method

The handle() method is where your job's logic lives.

Key points:

  • Must be async (asynchronous)

  • Should return None

  • Can raise exceptions (will trigger retry)

  • Has access to self.attempts (current attempt number)

  • Has access to self.job_id (unique job identifier)
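The key points above can be sketched as follows. The Job class is a stand-in so the snippet runs without the framework, SyncInventoryJob is a hypothetical job, and the demo loop only imitates a worker. It also assumes that self.attempts is 1 on the first run, which this page does not state.

```python
import asyncio
import logging

logger = logging.getLogger("RouteMQ.Jobs.SyncInventoryJob")


class Job:
    """Stand-in for core.job.Job; the real class provides attempts and job_id."""

    def __init__(self):
        self.attempts = 0
        self.job_id = None


class SyncInventoryJob(Job):
    """Hypothetical job whose upstream call fails on the first attempt."""

    def __init__(self):
        super().__init__()
        self.warehouse_id = None

    async def handle(self) -> None:
        logger.info("Job %s, attempt %s", self.job_id, self.attempts)
        if self.attempts < 2:
            # Raising signals failure, which triggers a retry.
            raise ConnectionError("inventory service unavailable")
        logger.info("Warehouse %s synced", self.warehouse_id)


async def demo() -> list:
    job = SyncInventoryJob()
    job.job_id, job.warehouse_id = "job-1", 12
    outcomes = []
    for attempt in (1, 2):  # a real worker would set attempts itself
        job.attempts = attempt
        try:
            await job.handle()
            outcomes.append("ok")
        except ConnectionError:
            outcomes.append("retry")
    return outcomes


print(asyncio.run(demo()))
```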

The failed() Method

The failed() method is called when a job fails permanently, that is, once its attempts exceed max_tries.
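A minimal sketch of a failed() hook follows. ChargeCardJob and the ALERTS list are hypothetical; the list stands in for a monitoring or alerting service. The demo calls failed() directly after one failure, whereas a real worker would call it only after the last attempt.

```python
import asyncio

ALERTS = []  # stands in for a monitoring or alerting service


class Job:
    """Stand-in for core.job.Job so the sketch runs on its own."""


class ChargeCardJob(Job):
    """Hypothetical job that always fails, to show the failure hook."""

    def __init__(self):
        super().__init__()
        self.payment_id = None

    async def handle(self) -> None:
        raise RuntimeError("gateway timeout")

    async def failed(self, exception: Exception) -> None:
        # Record enough context to investigate without re-running the job.
        ALERTS.append(f"payment {self.payment_id} failed permanently: {exception}")


async def demo() -> None:
    job = ChargeCardJob()
    job.payment_id = 981
    try:
        await job.handle()
    except Exception as exc:
        await job.failed(exc)


asyncio.run(demo())
print(ALERTS)
```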

Job Examples

Example 1: Email Job
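One way an email job could look, as a sketch under stated assumptions: deliver_email is a hypothetical placeholder for a real SMTP or API client, the Job class is a stand-in for core.job.Job, and the property values are illustrative.

```python
import asyncio
import logging

logger = logging.getLogger("RouteMQ.Jobs.SendEmailJob")


class Job:
    """Stand-in for core.job.Job so the sketch runs on its own."""


async def deliver_email(to: str, subject: str, body: str) -> bool:
    """Hypothetical mail client call; reports failure for a malformed address."""
    await asyncio.sleep(0)
    return "@" in to


class SendEmailJob(Job):
    max_tries = 3
    timeout = 30
    retry_after = 10
    queue = "emails"

    def __init__(self):
        super().__init__()
        self.to = None
        self.subject = None
        self.body = None

    async def handle(self) -> None:
        delivered = await deliver_email(self.to, self.subject, self.body)
        if not delivered:
            # Raising marks this attempt as failed so it can be retried.
            raise ValueError(f"Could not deliver email to {self.to!r}")
        logger.info("Email sent to %s", self.to)

    async def failed(self, exception: Exception) -> None:
        logger.error("Giving up on email to %s: %s", self.to, exception)


job = SendEmailJob()
job.to, job.subject, job.body = "user@example.com", "Welcome", "Hello!"
asyncio.run(job.handle())
```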

Example 2: Data Processing Job
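A data processing job might aggregate a batch of records. In this sketch, ProcessReadingsJob, the shape of the readings, and the averages attribute are all hypothetical, and the Job class is a stand-in. A real job would persist its result rather than keep it on the instance.

```python
import asyncio


class Job:
    """Stand-in for core.job.Job so the sketch runs on its own."""


class ProcessReadingsJob(Job):
    """Hypothetical job that averages sensor readings per device."""

    timeout = 300
    queue = "data"

    def __init__(self):
        super().__init__()
        self.readings = []   # list of {"device": str, "value": float}
        self.averages = {}   # kept on the instance only for this demo

    async def handle(self) -> None:
        totals = {}
        for reading in self.readings:
            total, count = totals.get(reading["device"], (0.0, 0))
            totals[reading["device"]] = (total + reading["value"], count + 1)
        self.averages = {dev: total / count for dev, (total, count) in totals.items()}


job = ProcessReadingsJob()
job.readings = [
    {"device": "sensor-1", "value": 1.0},
    {"device": "sensor-1", "value": 3.0},
    {"device": "sensor-2", "value": 5.0},
]
asyncio.run(job.handle())
print(job.averages)
```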

Example 3: Report Generation Job
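A report job typically runs longer and writes a file. The sketch below writes a CSV and moves the blocking file I/O to a thread with asyncio.to_thread (Python 3.9+). GenerateReportJob, its attributes, and the column names are hypothetical, and the Job class is a stand-in.

```python
import asyncio
import csv
import os
import tempfile


class Job:
    """Stand-in for core.job.Job so the sketch runs on its own."""


class GenerateReportJob(Job):
    max_tries = 2
    timeout = 600
    queue = "reports"

    def __init__(self):
        super().__init__()
        self.report_name = None
        self.rows = []          # list of [device, message_count]
        self.output_dir = None

    def _write_csv(self) -> str:
        path = os.path.join(self.output_dir, f"{self.report_name}.csv")
        with open(path, "w", newline="") as fh:
            writer = csv.writer(fh)
            writer.writerow(["device", "messages"])
            writer.writerows(self.rows)
        return path

    async def handle(self) -> None:
        # File I/O blocks, so run it in a thread to keep the event loop free.
        await asyncio.to_thread(self._write_csv)


async def demo() -> list:
    with tempfile.TemporaryDirectory() as tmp:
        job = GenerateReportJob()
        job.report_name = "daily-traffic"
        job.rows = [["sensor-1", 120], ["sensor-2", 87]]
        job.output_dir = tmp
        await job.handle()
        with open(os.path.join(tmp, "daily-traffic.csv"), newline="") as fh:
            return list(csv.reader(fh))


print(asyncio.run(demo()))
```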

Job Lifecycle

Understanding how jobs are processed:
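The lifecycle can be approximated by the loop below. This is an illustrative model built only from the documented properties, not the framework's actual worker: it treats max_tries as the total number of attempts, enforces timeout with asyncio.wait_for, and sleeps for retry_after between attempts, whereas a real worker may re-queue the job instead. run_job, FlakyJob, and AlwaysFailsJob are hypothetical names.

```python
import asyncio


class Job:
    """Stand-in for core.job.Job with the documented defaults."""

    max_tries = 3
    timeout = 60
    retry_after = 0

    def __init__(self):
        self.attempts = 0

    async def handle(self) -> None:
        raise NotImplementedError

    async def failed(self, exception: Exception) -> None:
        pass


async def run_job(job: Job) -> str:
    """Run handle() until it succeeds or the attempts are used up."""
    while True:
        job.attempts += 1
        try:
            await asyncio.wait_for(job.handle(), timeout=job.timeout)
            return "completed"
        except Exception as exc:  # includes a timeout from wait_for
            if job.attempts >= job.max_tries:
                await job.failed(exc)
                return "failed"
            await asyncio.sleep(job.retry_after)


class FlakyJob(Job):
    async def handle(self) -> None:
        if self.attempts < 3:
            raise RuntimeError("transient error")


class AlwaysFailsJob(Job):
    max_tries = 2

    def __init__(self):
        super().__init__()
        self.failure_reported = False

    async def handle(self) -> None:
        raise RuntimeError("permanent error")

    async def failed(self, exception: Exception) -> None:
        self.failure_reported = True


print(asyncio.run(run_job(FlakyJob())))
print(asyncio.run(run_job(AlwaysFailsJob())))
```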

Best Practices

1. Keep Jobs Small and Focused

2. Make Jobs Idempotent

Jobs should be safe to run multiple times, because a job that raises an exception is retried and may repeat work it already completed.
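One common way to get idempotency is to record completed work under a unique key and skip the side effect when the key already exists. In this sketch the two module-level collections stand in for a database table and an email service; SendInvoiceJob and its attributes are hypothetical, and the Job class is a stand-in.

```python
import asyncio

SENT_INVOICES = set()  # stands in for a table keyed on (user_id, invoice_id)
EMAILS_SENT = []       # stands in for the side effect that must not repeat


class Job:
    """Stand-in for core.job.Job so the sketch runs on its own."""


class SendInvoiceJob(Job):
    def __init__(self):
        super().__init__()
        self.user_id = None
        self.invoice_id = None

    async def handle(self) -> None:
        key = (self.user_id, self.invoice_id)
        if key in SENT_INVOICES:
            return  # already handled on an earlier run
        EMAILS_SENT.append(key)  # perform the side effect once
        SENT_INVOICES.add(key)   # record completion


job = SendInvoiceJob()
job.user_id, job.invoice_id = 7, "INV-001"
asyncio.run(job.handle())
asyncio.run(job.handle())  # simulated retry: no second email
print(len(EMAILS_SENT))
```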

3. Set Appropriate Timeouts

4. Use Descriptive Class Names

5. Log Appropriately

Next Steps
