# Basic Rate Limiting
Get started with RouteMQ's rate limiting middleware to control message processing rates and protect your application from abuse.
## Quick Setup
The simplest way to add rate limiting to your routes:
```python
from app.middleware.rate_limit import RateLimitMiddleware

# Basic rate limiting - 100 requests per minute
rate_limiter = RateLimitMiddleware(
    max_requests=100,
    window_seconds=60
)

# Apply to a route
router.on("api/{endpoint}",
          ApiController.handle,
          middleware=[rate_limiter])
```

## Configuration Options
### Basic Parameters
```python
rate_limiter = RateLimitMiddleware(
    max_requests=50,           # Maximum requests allowed
    window_seconds=60,         # Time window in seconds
    strategy="sliding_window"  # Rate limiting algorithm
)
```

## Common Configurations
### API Endpoints
```python
# Standard API rate limiting
api_rate_limit = RateLimitMiddleware(
    max_requests=1000,   # 1000 requests
    window_seconds=3600  # per hour
)

router.on("api/{endpoint}",
          ApiController.handle,
          middleware=[api_rate_limit])
```

### IoT Device Data
```python
# Device telemetry rate limiting
device_rate_limit = RateLimitMiddleware(
    max_requests=100,  # 100 messages
    window_seconds=60  # per minute
)

router.on("devices/{device_id}/telemetry",
          DeviceController.handle_telemetry,
          middleware=[device_rate_limit])
```

### Public Endpoints
```python
# More restrictive for public access
public_rate_limit = RateLimitMiddleware(
    max_requests=50,   # 50 requests
    window_seconds=60  # per minute
)

router.on("public/{endpoint}",
          PublicController.handle,
          middleware=[public_rate_limit])
```

## Understanding Rate Limiting
### How It Works
1. **Request arrives**: a message is received on a topic.
2. **Key generation**: a unique key is generated (by default, based on the topic).
3. **Count check**: the current request count is checked against the limit.
4. **Decision**: the request is either allowed or blocked.
5. **Counter update**: the request count is incremented if the request was allowed (see the sketch below).
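To make the flow concrete, here is a minimal sketch of these five steps as a fixed-window counter kept in a plain dict. This is an illustration only, not the middleware's actual implementation (which supports sliding windows and Redis storage):

```python
import time

# Illustrative fixed-window counter - not the middleware's actual code
_counters: dict[str, tuple[int, float]] = {}  # key -> (count, window_start)

def allow_request(topic: str, max_requests: int, window_seconds: int) -> bool:
    key = f"topic:{topic}"                      # steps 1-2: key generation
    count, start = _counters.get(key, (0, time.monotonic()))
    if time.monotonic() - start >= window_seconds:
        count, start = 0, time.monotonic()      # window expired: reset the counter
    if count >= max_requests:                   # steps 3-4: check the limit, decide
        return False
    _counters[key] = (count + 1, start)         # step 5: increment only when allowed
    return True
```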
### Rate Limit Response
When the rate limit is exceeded, the middleware returns:
```json
{
  "error": "rate_limit_exceeded",
  "message": "Rate limit exceeded. Try again in 45 seconds.",
  "rate_limit": {
    "max_requests": 100,
    "window_seconds": 60,
    "remaining": 0,
    "reset_time": 45
  }
}
```

### Context Information
Rate limiting information is added to the context:
```python
async def handle_request(self, context):
    rate_limit_info = context.get('rate_limit', {})

    if rate_limit_info.get('exceeded'):
        # Handle rate limit exceeded case
        pass
    else:
        remaining = rate_limit_info.get('remaining', 0)
        print(f"Remaining requests: {remaining}")
```

## Storage Backends
### Redis Backend (Recommended)
For distributed applications, use Redis for rate limiting:
```python
# Ensure Redis is enabled in your application
from core.redis_manager import redis_manager

# Rate limiter automatically uses Redis if available
rate_limiter = RateLimitMiddleware(
    max_requests=100,
    window_seconds=60,
    redis_key_prefix="api_rate_limit"  # Custom prefix
)
```

Benefits of the Redis backend:
* Shared across multiple application instances
* Persistent across application restarts
* High performance with atomic operations
* Supports advanced algorithms such as sliding windows (sketched below)
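For intuition, a sliding-window counter on Redis can be built from a sorted set of request timestamps. The sketch below uses redis-py's asyncio client and is an illustration of the technique, not RouteMQ's internal code; a production version would wrap the check-then-add sequence in a Lua script to make it fully atomic:

```python
import time
import uuid
import redis.asyncio as redis

async def sliding_window_allow(r: redis.Redis, key: str,
                               max_requests: int, window_seconds: int) -> bool:
    now = time.time()
    pipe = r.pipeline()
    pipe.zremrangebyscore(key, 0, now - window_seconds)  # evict expired entries
    pipe.zcard(key)                                      # count what's left
    _, current = await pipe.execute()
    if current >= max_requests:
        return False
    # Record this request: unique member, scored by timestamp
    await r.zadd(key, {uuid.uuid4().hex: now})
    await r.expire(key, window_seconds)
    return True
```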
### Memory Backend (Fallback)
When Redis is unavailable, an in-memory fallback is used:
```python
# Enable fallback to memory storage
rate_limiter = RateLimitMiddleware(
    max_requests=100,
    window_seconds=60,
    fallback_enabled=True  # Default: True
)
```

Memory backend characteristics:
* Per-instance rate limiting only
* Lost on application restart
* No coordination between instances
* Suitable for single-instance deployments (see the sketch below for the idea)
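A per-instance sliding window can be kept as a deque of timestamps per key. Again, this is a hedged sketch of the general technique, not the middleware's actual code:

```python
import time
from collections import defaultdict, deque

_windows: defaultdict[str, deque] = defaultdict(deque)  # key -> request timestamps

def memory_allow(key: str, max_requests: int, window_seconds: int) -> bool:
    now = time.monotonic()
    window = _windows[key]
    while window and now - window[0] >= window_seconds:  # evict old timestamps
        window.popleft()
    if len(window) >= max_requests:
        return False
    window.append(now)
    return True
```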
## Error Handling
### Custom Error Messages
```python
rate_limiter = RateLimitMiddleware(
    max_requests=100,
    window_seconds=60,
    custom_error_message="Too many requests! Please slow down and try again later."
)
```

### Graceful Degradation
```python
# Disable fallback - fail open: requests pass through unlimited if Redis fails
strict_rate_limiter = RateLimitMiddleware(
    max_requests=100,
    window_seconds=60,
    fallback_enabled=False  # Allow all requests if Redis is unavailable
)

# Enable fallback - use memory if Redis fails
resilient_rate_limiter = RateLimitMiddleware(
    max_requests=100,
    window_seconds=60,
    fallback_enabled=True   # Use in-memory fallback
)
```
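The choice boils down to picking a failure mode when the backend is down. Here is an illustrative sketch of that decision, using a simple INCR/EXPIRE fixed-window counter and the `memory_allow` helper from the memory-backend sketch above; the helper names and algorithm are assumptions for illustration, not RouteMQ's internals:

```python
import redis.asyncio as redis
from redis.exceptions import ConnectionError as RedisConnectionError

async def check_limit(r: redis.Redis, key: str, limit: int, window: int,
                      fallback_enabled: bool) -> bool:
    try:
        # Normal path: shared fixed-window counter in Redis (INCR + EXPIRE)
        count = await r.incr(key)
        if count == 1:
            await r.expire(key, window)
        return count <= limit
    except RedisConnectionError:
        if fallback_enabled:
            return memory_allow(key, limit, window)  # degrade to per-instance limiting
        return True  # fail open: requests pass through unlimited
```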
## Monitoring Rate Limits

### Logging
Rate limiting events are automatically logged:
```
INFO - Rate limit middleware initialized: 100 req/60s, strategy: sliding_window
DEBUG - Rate limit check passed for key: topic:api/users, remaining: 95
WARNING - Rate limit exceeded for key: topic:api/users
```

### Context Information
Access rate limit status in your handlers:
```python
class ApiController(Controller):
    async def handle_request(self, context):
        rate_limit = context.get('rate_limit', {})

        # Log rate limit status
        self.logger.info(f"Rate limit remaining: {rate_limit.get('remaining', 0)}")

        # Add rate limit headers to response
        response = {"data": "success"}
        if not rate_limit.get('exceeded'):
            response['rate_limit'] = {
                'remaining': rate_limit.get('remaining'),
                'reset_time': rate_limit.get('reset_time')
            }
        return response
```

## Testing Rate Limits
### Unit Testing
```python
import pytest
from unittest.mock import AsyncMock

from app.middleware.rate_limit import RateLimitMiddleware

@pytest.mark.asyncio
async def test_rate_limit_allows_requests_within_limit():
    """Test that requests within the limit are allowed"""
    rate_limiter = RateLimitMiddleware(
        max_requests=5,
        window_seconds=60,
        fallback_enabled=True  # Use memory for testing
    )

    context = {'topic': 'test/endpoint'}
    handler = AsyncMock(return_value="success")

    # First 5 requests should be allowed
    for i in range(5):
        result = await rate_limiter.handle(context.copy(), handler)
        assert result == "success"
        assert handler.call_count == i + 1

@pytest.mark.asyncio
async def test_rate_limit_blocks_excess_requests():
    """Test that requests exceeding the limit are blocked"""
    rate_limiter = RateLimitMiddleware(
        max_requests=2,
        window_seconds=60,
        fallback_enabled=True
    )

    context = {'topic': 'test/endpoint'}
    handler = AsyncMock(return_value="success")

    # First 2 requests allowed
    for i in range(2):
        result = await rate_limiter.handle(context.copy(), handler)
        assert result == "success"

    # 3rd request should be blocked
    result = await rate_limiter.handle(context.copy(), handler)
    assert isinstance(result, dict)
    assert result['error'] == 'rate_limit_exceeded'
    assert handler.call_count == 2  # Handler not called for blocked request
```
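You can also exercise window expiry with a short window and a real sleep. This sketch reuses the `pytest` and `AsyncMock` imports above and assumes the memory fallback evicts entries by wall-clock time; verify that assumption against the middleware before relying on it:

```python
import asyncio

@pytest.mark.asyncio
async def test_rate_limit_resets_after_window():
    """Sketch: the limit should clear once the window has elapsed"""
    rate_limiter = RateLimitMiddleware(
        max_requests=1,
        window_seconds=1,
        fallback_enabled=True
    )
    context = {'topic': 'test/endpoint'}
    handler = AsyncMock(return_value="success")

    assert await rate_limiter.handle(context.copy(), handler) == "success"
    blocked = await rate_limiter.handle(context.copy(), handler)
    assert blocked['error'] == 'rate_limit_exceeded'

    await asyncio.sleep(1.1)  # let the window expire
    assert await rate_limiter.handle(context.copy(), handler) == "success"
```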
### Integration Testing

```python
@pytest.mark.asyncio
async def test_rate_limit_with_router():
    """Test rate limiting integrated with the router"""
    from core.router import Router

    # Create a rate-limited route
    rate_limiter = RateLimitMiddleware(max_requests=3, window_seconds=60)

    router = Router()
    router.on("test/{id}",
              lambda ctx: {"result": "success"},
              middleware=[rate_limiter])

    # Test within limit
    for i in range(3):
        result = await router.dispatch("test/123", {}, None)
        assert result["result"] == "success"

    # Test rate limit exceeded
    result = await router.dispatch("test/123", {}, None)
    assert result["error"] == "rate_limit_exceeded"
```

## Performance Considerations
### Key Generation Efficiency
The default key generator uses the topic, which means rate limiting is applied per topic:
```python
# Default behavior - rate limit per topic
context = {'topic': 'api/users'}
# Key: "topic:api/users"

context = {'topic': 'api/orders'}
# Key: "topic:api/orders"  # Different limit counter
```
### Redis Connection Pooling

Rate limiting middleware uses the shared Redis connection pool from `redis_manager`, ensuring efficient connection reuse.
### Memory Usage
The in-memory fallback automatically cleans up expired entries to prevent memory leaks:
```python
# Automatic cleanup runs every 60 seconds
# Old entries are removed based on window_seconds
```
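The cleanup amounts to periodically dropping timestamps that have aged out and deleting keys that become empty. A minimal sketch of that idea, using the deque structure from the memory-backend sketch above (not the actual implementation):

```python
import time
from collections import deque

def cleanup(windows: dict, window_seconds: int) -> None:
    """Drop expired timestamps; remove keys once they are empty."""
    cutoff = time.monotonic() - window_seconds
    for key in list(windows):
        entries: deque = windows[key]
        while entries and entries[0] <= cutoff:
            entries.popleft()
        if not entries:
            del windows[key]
```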
## Common Use Cases

### API Rate Limiting
```python
# Standard REST API rate limiting
api_rate_limit = RateLimitMiddleware(
    max_requests=1000,   # 1000 requests per hour
    window_seconds=3600,
    strategy="sliding_window"
)

# Apply to all API routes
with router.group(prefix="api", middleware=[api_rate_limit]) as api:
    api.on("users/{id}", UserController.get_user)
    api.on("orders/{id}", OrderController.get_order)
```

### Device Telemetry
```python
# IoT device data ingestion
telemetry_rate_limit = RateLimitMiddleware(
    max_requests=500,     # 500 messages per minute
    window_seconds=60,
    strategy="token_bucket",
    burst_allowance=100   # Allow bursts up to 600
)

router.on("devices/{device_id}/data",
          DeviceController.ingest_data,
          middleware=[telemetry_rate_limit])
```
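For intuition, a token bucket grants a steady refill rate plus headroom for bursts. Here is a self-contained sketch of the algorithm sized to match the configuration above; it illustrates the general technique, not the middleware's internal code:

```python
import time

class TokenBucket:
    """Refill `rate` tokens per second up to `capacity`; spend one per request."""

    def __init__(self, rate: float, capacity: float):
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self.updated = time.monotonic()

    def allow(self) -> bool:
        now = time.monotonic()
        elapsed = now - self.updated
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.updated = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

# ~500 requests/minute sustained, bursts up to 600 - mirrors the config above
bucket = TokenBucket(rate=500 / 60, capacity=600)
```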
### Public Endpoints

```python
# Public API with stricter limits
public_rate_limit = RateLimitMiddleware(
    max_requests=100,     # 100 requests per hour
    window_seconds=3600,
    custom_error_message="Public API rate limit reached. Please try again later."
)

router.on("public/weather/{city}",
          WeatherController.get_weather,
          middleware=[public_rate_limit])
```

## Troubleshooting
### Common Issues
**Rate limiting not working:**

* Check that Redis is enabled and accessible
* Verify the `fallback_enabled` setting
* Check logs for Redis connection errors

**Different behavior between instances:**

* Ensure all instances use the same Redis backend
* Check Redis key prefix consistency
* Verify Redis configuration

**Memory usage growing:**

* The memory fallback automatically cleans up expired entries
* Check cleanup interval settings
* Monitor memory usage in single-instance deployments
### Debug Logging
Enable debug logging to troubleshoot rate limiting:
```python
import logging
logging.getLogger('app.middleware.rate_limit').setLevel(logging.DEBUG)
```

## Next Steps
* **Rate Limiting Strategies** - Learn about different algorithms
* **Topic-Specific Limits** - Set custom limits per topic
* **Client-Based Limiting** - Rate limit by client instead of topic
* **Advanced Features** - Whitelisting, custom messages, and more