Failed Jobs
When a job fails permanently (for example, by exceeding max_tries), it is moved to failed-job storage, where it can be inspected and retried. This guide explains how to handle failed jobs.
What Makes a Job Fail?
A job fails permanently when it:
- Exceeds max_tries: it has been retried the maximum number of times
- Hits an unrecoverable error: an exception that retrying cannot resolve
- Times out: it exceeds its timeout limit on repeated attempts
Where Failed Jobs Are Stored
Database Storage (Recommended)
Failed jobs are stored in the queue_failed_jobs table:
CREATE TABLE queue_failed_jobs (
    id INT AUTO_INCREMENT PRIMARY KEY,
    connection VARCHAR(255) NOT NULL,
    queue VARCHAR(255) NOT NULL,
    payload TEXT NOT NULL,       -- serialized job; passed to Job.unserialize() when retrying
    exception TEXT NOT NULL,     -- text of the exception that caused the final failure
    failed_at DATETIME NOT NULL,
    INDEX(queue)
);

Redis Storage (Fallback)
If MySQL is disabled, failed jobs are stored in a Redis list, one per queue, under the key:
routemq:queue:failed:{queue_name}

Viewing Failed Jobs
Using MySQL
-- View all failed jobs
SELECT * FROM queue_failed_jobs
ORDER BY failed_at DESC;

-- Failed jobs by queue
SELECT queue, COUNT(*) as count
FROM queue_failed_jobs
GROUP BY queue;

-- Recent failures (last hour)
SELECT * FROM queue_failed_jobs
WHERE failed_at >= DATE_SUB(NOW(), INTERVAL 1 HOUR)
ORDER BY failed_at DESC;

-- View specific failure
SELECT id, queue, exception, failed_at
FROM queue_failed_jobs
WHERE id = 123;

-- View full payload
SELECT payload FROM queue_failed_jobs WHERE id = 123;

Using Redis CLI
# Connect to Redis
redis-cli

# Check failed job count
LLEN routemq:queue:failed:default

# View failed jobs (first 10 entries, indexes 0..9)
LRANGE routemq:queue:failed:default 0 9

# View specific job (by list index)
LINDEX routemq:queue:failed:default 0

The failed() Method
Override the failed() method in your job to handle permanent failures:
from core.job import Job
import logging
logger = logging.getLogger("MyJob")
class MyJob(Job):
async def handle(self):
# Job logic
pass
async def failed(self, exception: Exception):
"""
Called when job fails permanently.
Args:
exception: The exception that caused the final failure
"""
logger.error(f"Job failed permanently: {exception}")
# Send alert to admin
await send_admin_alert(
f"Job {self.__class__.__name__} failed",
str(exception)
)
# Log to monitoring service
await log_to_sentry(exception)
# Clean up resources
await cleanup_resources(self.resource_id)
# Update database status
await mark_as_failed(self.task_id)Common Failed Job Scenarios
Scenario 1: External API Failure
class CallExternalAPIJob(Job):
max_tries = 5
retry_after = 60 # Wait 1 minute between retries
async def handle(self):
response = await call_external_api(self.endpoint, self.data)
if not response.success:
raise Exception("API call failed")
async def failed(self, exception: Exception):
# API still failing after 5 tries
logger.error(f"API {self.endpoint} unreachable: {exception}")
# Store for manual retry later
await store_for_manual_processing(self.endpoint, self.data)
# Notify operations team
await send_slack_message("#ops", f"API {self.endpoint} is down")Scenario 2: Invalid Data
class ProcessDataJob(Job):
max_tries = 1 # Don't retry invalid data
async def handle(self):
if not self.validate_data():
raise ValueError("Invalid data format")
await process_data(self.data)
async def failed(self, exception: Exception):
# Log invalid data for investigation
logger.error(f"Invalid data: {self.data}")
# Store in error log
await save_error_log({
"data": self.data,
"error": str(exception),
"timestamp": datetime.now()
})Scenario 3: Resource Unavailable
class GenerateReportJob(Job):
max_tries = 3
retry_after = 300 # Wait 5 minutes
async def handle(self):
# Check if data is ready
if not await data_ready(self.report_id):
raise Exception("Data not ready")
await generate_report(self.report_id)
async def failed(self, exception: Exception):
# Data still not ready after 3 tries
logger.warning(f"Report {self.report_id} data not ready")
# Notify user
await send_email(
self.user_email,
"Report Delayed",
f"Your report is delayed due to data availability"
)Inspecting Failed Jobs
Get Job Details
from core.model import Model
from app.models.queue_failed_job import QueueFailedJob
from sqlalchemy import select
async def inspect_failed_job(job_id: int):
session = await Model.get_session()
result = await session.execute(
select(QueueFailedJob).where(QueueFailedJob.id == job_id)
)
failed_job = result.scalars().first()
if failed_job:
print(f"Queue: {failed_job.queue}")
print(f"Failed at: {failed_job.failed_at}")
print(f"Exception: {failed_job.exception}")
print(f"Payload: {failed_job.payload}")
await session.close()Analyze Failure Patterns
-- Most common failure reasons
-- (error_type = exception text up to the first ':')
SELECT
    SUBSTRING_INDEX(exception, ':', 1) as error_type,
    COUNT(*) as count
FROM queue_failed_jobs
GROUP BY error_type
ORDER BY count DESC;

-- Failures by hour (last 24 hours)
SELECT
    DATE_FORMAT(failed_at, '%Y-%m-%d %H:00') as hour,
    COUNT(*) as failures
FROM queue_failed_jobs
WHERE failed_at >= DATE_SUB(NOW(), INTERVAL 24 HOUR)
GROUP BY hour
ORDER BY hour;

-- Failures by queue: total count and number of distinct days with failures
SELECT
    queue,
    COUNT(*) as total_failures,
    COUNT(DISTINCT DATE(failed_at)) as days_with_failures
FROM queue_failed_jobs
GROUP BY queue;

Retrying Failed Jobs
Manual Retry
from core.job import Job
from core.queue.queue_manager import dispatch
async def retry_failed_job(failed_job_id: int):
"""Retry a specific failed job."""
# Fetch failed job from database
session = await Model.get_session()
result = await session.execute(
select(QueueFailedJob).where(QueueFailedJob.id == failed_job_id)
)
failed_job = result.scalars().first()
if not failed_job:
print(f"Failed job {failed_job_id} not found")
return
# Deserialize and dispatch again
job = Job.unserialize(failed_job.payload)
await dispatch(job)
# Delete from failed jobs
await session.delete(failed_job)
await session.commit()
await session.close()
print(f"Retried failed job {failed_job_id}")Bulk Retry
async def retry_all_failed_jobs(queue: str = "default"):
"""Retry all failed jobs in a queue."""
session = await Model.get_session()
result = await session.execute(
select(QueueFailedJob).where(QueueFailedJob.queue == queue)
)
failed_jobs = result.scalars().all()
retried = 0
for failed_job in failed_jobs:
try:
job = Job.unserialize(failed_job.payload)
await dispatch(job)
await session.delete(failed_job)
retried += 1
except Exception as e:
print(f"Failed to retry job {failed_job.id}: {e}")
await session.commit()
await session.close()
print(f"Retried {retried} failed jobs from queue '{queue}'")Cleaning Up Failed Jobs
Delete Old Failed Jobs
-- Delete failed jobs older than 30 days
DELETE FROM queue_failed_jobs
WHERE failed_at < DATE_SUB(NOW(), INTERVAL 30 DAY);

-- Delete all failed jobs from a specific queue
DELETE FROM queue_failed_jobs
WHERE queue = 'old-queue';

-- Keep only last 1000 failed jobs
-- The extra derived table ("recent") is required: MySQL rejects LIMIT inside an
-- IN (...) subquery, and does not let a subquery read the DELETE target table directly.
DELETE FROM queue_failed_jobs
WHERE id NOT IN (
    SELECT id FROM (
        SELECT id FROM queue_failed_jobs
        ORDER BY failed_at DESC
        LIMIT 1000
    ) as recent
);

Automated Cleanup Script
# cleanup_failed_jobs.py
import asyncio
from datetime import datetime, timedelta
from core.model import Model
from app.models.queue_failed_job import QueueFailedJob
async def cleanup_old_failed_jobs(days: int = 30):
"""Delete failed jobs older than specified days."""
session = await Model.get_session()
cutoff_date = datetime.utcnow() - timedelta(days=days)
result = await session.execute(
delete(QueueFailedJob).where(QueueFailedJob.failed_at < cutoff_date)
)
deleted_count = result.rowcount
await session.commit()
await session.close()
print(f"Deleted {deleted_count} failed jobs older than {days} days")
if __name__ == "__main__":
asyncio.run(cleanup_old_failed_jobs(30))Schedule with cron:
# Run cleanup daily at 2 AM (minute 0, hour 2, every day)
0 2 * * * cd /path/to/RouteMQ && /path/to/venv/bin/python cleanup_failed_jobs.py

Monitoring Failed Jobs
Alert on Failure Threshold
async def check_failure_rate():
"""Alert if failure rate exceeds threshold."""
session = await Model.get_session()
# Count failures in last hour
one_hour_ago = datetime.utcnow() - timedelta(hours=1)
result = await session.execute(
select(func.count(QueueFailedJob.id))
.where(QueueFailedJob.failed_at >= one_hour_ago)
)
failure_count = result.scalar()
if failure_count > 100: # Threshold
await send_alert(
"High Failure Rate",
f"{failure_count} jobs failed in the last hour"
)
await session.close()Dashboard Query
-- Failed jobs summary for dashboard (last 7 days, per queue)
SELECT
    queue,
    COUNT(*) as total_failures,
    MAX(failed_at) as last_failure,
    MIN(failed_at) as first_failure
FROM queue_failed_jobs
WHERE failed_at >= DATE_SUB(NOW(), INTERVAL 7 DAY)
GROUP BY queue
ORDER BY total_failures DESC;

Best Practices
1. Always Implement failed()
class MyJob(Job):
async def failed(self, exception: Exception):
# Log the failure
logger.error(f"Job failed: {exception}")
# Clean up resources
# Notify stakeholders
# Update status2. Set Appropriate max_tries
# Quick tasks - fail fast
class QuickJob(Job):
max_tries = 2
# External API calls - retry more
class APIJob(Job):
max_tries = 5
retry_after = 60
# Critical jobs - many retries
class CriticalJob(Job):
max_tries = 10
retry_after = 3003. Regular Cleanup
# Clean up old failed jobs weekly
async def weekly_cleanup():
await cleanup_old_failed_jobs(days=30)4. Monitor Failure Patterns
# Track failure types
failures_by_type = {}
for exception_type in failure_exceptions:
failures_by_type[exception_type] = count
# Alert on unusual patternsNext Steps
Last updated