Redis Manager API
Complete API reference for the RouteMQ Redis Manager class and Redis integration patterns.
RedisManager Class
The RedisManager class provides async Redis operations with connection pooling, error handling, and graceful fallback when Redis is unavailable. It implements the singleton pattern to ensure one Redis connection per application.
Import
from core.redis_manager import redis_manager
Global Instance
RouteMQ provides a pre-configured global Redis manager instance:
# Use the global instance (recommended)
from core.redis_manager import redis_manager
# Or create your own instance
from core.redis_manager import RedisManager
custom_redis = RedisManager()
Configuration
Configure Redis through environment variables:
# Enable/disable Redis
ENABLE_REDIS=true
# Connection settings
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_DB=0
REDIS_PASSWORD=your_password
REDIS_USERNAME=your_username
# Connection pool settings
REDIS_MAX_CONNECTIONS=10
REDIS_SOCKET_TIMEOUT=5.0
REDIS_SOCKET_CONNECT_TIMEOUT=5.0
Connection Management
initialize()
Initialize Redis connection pool and test connectivity.
Signature:
async def initialize() -> bool
Returns: bool - True if connection successful, False otherwise
Example:
# Initialize Redis connection
success = await redis_manager.initialize()
if success:
    print("Redis connected successfully")
else:
    print("Redis connection failed")
disconnect()
Close Redis connections and clean up resources.
Signature:
async def disconnect() -> None
Example:
# Clean shutdown
await redis_manager.disconnect()
is_enabled()
Check if Redis is enabled and available.
Signature:
def is_enabled() -> bool
Returns: bool - True if Redis is enabled and connected
Example:
if redis_manager.is_enabled():
    # Use Redis operations
    await redis_manager.set("key", "value")
else:
    # Fallback to alternative storage
    local_cache["key"] = "value"
get_client()
Get the underlying Redis client instance for advanced operations.
Signature:
def get_client() -> Optional[Redis]
Returns: Redis client instance or None if not enabled
Example:
# Advanced Redis operations
client = redis_manager.get_client()
if client:
    await client.pipeline().set("key1", "value1").set("key2", "value2").execute()
Basic Operations
get(key)
Get value by key.
Signature:
async def get(key: str) -> Optional[str]
Parameters:
key(str): Redis key
Returns: str | None - Value as string or None if not found
Example:
# Get a value
value = await redis_manager.get("user:123:name")
if value:
    print(f"User name: {value}")
set(key, value, ex=None, px=None, nx=False, xx=False)
Set key-value pair with optional expiration and conditions.
Signature:
async def set(key: str, value: Union[str, int, float], ex: Optional[int] = None,
              px: Optional[int] = None, nx: bool = False, xx: bool = False) -> bool
Parameters:
key(str): Redis key
value(str | int | float): Value to set
ex(int, optional): Expire time in seconds
px(int, optional): Expire time in milliseconds
nx(bool): Only set if key doesn't exist
xx(bool): Only set if key exists
Returns: bool - True if successful
Example:
# Simple set
await redis_manager.set("session:abc123", "user_data")
# Set with expiration (1 hour)
await redis_manager.set("temp:token", "xyz789", ex=3600)
# Set only if key doesn't exist
success = await redis_manager.set("unique:id", "123", nx=True)
# Set only if key exists
success = await redis_manager.set("existing:key", "new_value", xx=True)
delete(*keys)
Delete one or more keys.
Signature:
async def delete(*keys: str) -> int
Parameters:
keys(str): One or more keys to delete
Returns: int - Number of keys deleted
Example:
# Delete single key
deleted = await redis_manager.delete("temp:data")
# Delete multiple keys
deleted = await redis_manager.delete("key1", "key2", "key3")
print(f"Deleted {deleted} keys")
exists(key)
Check if key exists.
Signature:
async def exists(key: str) -> bool
Parameters:
key(str): Redis key to check
Returns: bool - True if key exists
Example:
if await redis_manager.exists("user:123"):
    user_data = await redis_manager.get("user:123")
expire(key, time)
Set expiration time for a key.
Signature:
async def expire(key: str, time: int) -> bool
Parameters:
key(str): Redis key
time(int): Expiration time in seconds
Returns: bool - True if successful
Example:
# Set key to expire in 5 minutes
await redis_manager.expire("session:abc123", 300)
ttl(key)
Get time to live for a key.
Signature:
async def ttl(key: str) -> int
Parameters:
key(str): Redis key
Returns: int - TTL in seconds, -1 if no expiry, -2 if key doesn't exist
Example:
ttl = await redis_manager.ttl("session:abc123")
if ttl > 0:
    print(f"Session expires in {ttl} seconds")
elif ttl == -1:
    print("Session never expires")
else:
    print("Session doesn't exist")
Numeric Operations
incr(key, amount=1)
Increment key value by amount.
Signature:
async def incr(key: str, amount: int = 1) -> Optional[int]
Parameters:
key(str): Redis key
amount(int): Amount to increment (default: 1)
Returns: int | None - New value or None if error
Example:
# Increment counter
new_value = await redis_manager.incr("page:views")
print(f"Page views: {new_value}")
# Increment by custom amount
views = await redis_manager.incr("api:calls", 5)
# Rate limiting example
current_requests = await redis_manager.incr("rate_limit:user:123")
if current_requests > 100:
    raise ValueError("Rate limit exceeded")
Hash Operations
hset(name, key=None, value=None, mapping=None)
Set hash field(s).
Signature:
async def hset(name: str, key: str = None, value: str = None,
               mapping: Dict[str, Any] = None) -> int
Parameters:
name(str): Hash name
key(str): Field key (for single field)
value(str): Field value (for single field)
mapping(dict): Dictionary of field-value pairs
Returns: int - Number of fields added
Example:
# Set single hash field
await redis_manager.hset("user:123", "name", "John Doe")
# Set multiple hash fields
await redis_manager.hset("user:123", mapping={
    "email": "john.doe@example.com",
    "age": "30",
    "status": "active"
})
hget(name, key)
Get hash field value.
Signature:
async def hget(name: str, key: str) -> Optional[str]
Parameters:
name(str): Hash name
key(str): Field key
Returns: str | None - Field value or None
Example:
# Get user name from hash
name = await redis_manager.hget("user:123", "name")
if name:
    print(f"User name: {name}")
# Get user profile data
email = await redis_manager.hget("user:123", "email")
age = await redis_manager.hget("user:123", "age")
JSON Operations
set_json(key, value, ex=None, px=None, nx=False, xx=False)
Serialize and store JSON data.
Signature:
async def set_json(key: str, value: Any, ex: Optional[int] = None,
                   px: Optional[int] = None, nx: bool = False, xx: bool = False) -> bool
Parameters:
key(str): Redis key
value(Any): Value to serialize as JSON
Other parameters same as set()
Returns: bool - True if successful
Example:
# Store complex data structure
user_data = {
    "id": 123,
    "name": "John Doe",
    "preferences": {
        "theme": "dark",
        "notifications": True
    },
    "tags": ["admin", "developer"]
}
await redis_manager.set_json("user:123:profile", user_data, ex=3600)
get_json(key)
Retrieve and deserialize JSON data.
Signature:
async def get_json(key: str) -> Optional[Any]
Parameters:
key(str): Redis key
Returns: Any | None - Deserialized value or None
Example:
# Retrieve complex data structure
user_data = await redis_manager.get_json("user:123:profile")
if user_data:
    print(f"User: {user_data['name']}")
    print(f"Theme: {user_data['preferences']['theme']}")
Common Usage Patterns
Caching
async def get_user_with_cache(user_id: str):
    """Get user data with Redis caching.

    Looks up ``user:<user_id>`` through ``redis_manager.get_json`` and returns
    the cached value when it is truthy. Otherwise the user is loaded with
    ``database.get_user`` and, if truthy, stored in the cache for one hour
    before being returned.

    Args:
        user_id: Identifier used both for the cache key and the database lookup.

    Returns:
        The cached or freshly loaded user, or whatever falsy value
        ``database.get_user`` returned when no user was found.
    """
    cache_key = f"user:{user_id}"

    # Try cache first
    cached_user = await redis_manager.get_json(cache_key)
    if cached_user:
        return cached_user

    # Fetch from database
    user = await database.get_user(user_id)
    if user:
        # Cache for 1 hour. Falsy results are never cached, so repeated
        # lookups for a missing user always reach the database.
        await redis_manager.set_json(cache_key, user, ex=3600)
    return user


# Session Management
import json
import time
import uuid
from typing import Optional


class SessionManager:
    """Store user sessions as JSON documents under ``session:<id>`` keys."""

    @staticmethod
    async def create_session(user_id: str, data: dict) -> str:
        """Create a new user session.

        Args:
            user_id: Owner of the session; stored under the ``user_id`` field.
            data: Extra fields merged into the session document. Because it is
                unpacked last, a ``user_id`` or ``created_at`` entry in
                ``data`` overrides the generated value.

        Returns:
            The newly generated session id (a UUID4 string).
        """
        session_id = str(uuid.uuid4())
        session_key = f"session:{session_id}"
        session_data = {
            "user_id": user_id,
            "created_at": time.time(),
            **data
        }
        # Session expires in 24 hours
        await redis_manager.set_json(session_key, session_data, ex=86400)
        return session_id

    @staticmethod
    async def get_session(session_id: str) -> Optional[dict]:
        """Get session data.

        Returns:
            Whatever ``redis_manager.get_json`` yields for the session key.
        """
        session_key = f"session:{session_id}"
        return await redis_manager.get_json(session_key)

    @staticmethod
    async def delete_session(session_id: str) -> bool:
        """Delete session.

        Returns:
            True when ``redis_manager.delete`` reports at least one removed key.
        """
        session_key = f"session:{session_id}"
        deleted = await redis_manager.delete(session_key)
        return deleted > 0


# Rate Limiting
import time
import uuid


class RateLimiter:
    """Sliding-window rate limiter that keeps one sorted-set entry per request."""

    @staticmethod
    async def check_rate_limit(identifier: str, max_requests: int = 100,
                               window_seconds: int = 3600) -> bool:
        """Check if request is within rate limit.

        Every call records the current request in the sorted set
        ``rate_limit:<identifier>`` (including calls that end up over the
        limit) and then counts the entries left in the window.

        Args:
            identifier: Caller identity used to build the Redis key.
            max_requests: Maximum number of requests allowed per window.
            window_seconds: Window length in seconds; also used as the key TTL.

        Returns:
            True when the request count is within ``max_requests``, or when no
            Redis client is available; False otherwise.
        """
        client = redis_manager.get_client()
        if not client:
            return True  # Allow if Redis unavailable

        current_time = int(time.time())
        window_start = current_time - window_seconds

        # Use sorted set for sliding window
        key = f"rate_limit:{identifier}"

        # Sorted-set members are unique, so using the bare timestamp as the
        # member would collapse all requests made within the same second into
        # a single entry. A random suffix keeps one entry per request.
        member = f"{current_time}:{uuid.uuid4().hex}"

        pipe = client.pipeline()
        pipe.zremrangebyscore(key, 0, window_start)  # Remove old entries
        pipe.zadd(key, {member: current_time})
        pipe.zcard(key)
        pipe.expire(key, window_seconds)
        results = await pipe.execute()

        current_requests = results[2]  # Count from zcard
        return current_requests <= max_requests


# Distributed Locking
import asyncio
import time
import uuid


class DistributedLock:
    """Redis-backed lock usable as an async context manager.

    The lock is a key ``lock:<key>`` whose value is a random identifier unique
    to this instance. ``timeout`` serves two purposes: it is the maximum time
    ``acquire`` keeps retrying, and it is the expiry (in seconds) set on the
    lock key.
    """

    # Lua script to atomically check and delete: the key is removed only when
    # it still holds this instance's identifier.
    _RELEASE_SCRIPT = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"""

    def __init__(self, key: str, timeout: int = 10, retry_interval: float = 0.001):
        """Prepare a lock for ``key`` without contacting Redis.

        Args:
            key: Resource name; stored under ``lock:<key>``.
            timeout: Seconds to keep retrying in ``acquire`` and TTL of the key.
            retry_interval: Seconds to sleep between acquisition attempts.
        """
        self.key = f"lock:{key}"
        self.timeout = timeout
        self.retry_interval = retry_interval
        self.identifier = str(uuid.uuid4())

    async def acquire(self) -> bool:
        """Acquire distributed lock.

        Returns:
            True once the key was set, False if ``timeout`` elapsed first.
        """
        # A monotonic clock keeps the deadline immune to wall-clock changes.
        deadline = time.monotonic() + self.timeout
        while time.monotonic() < deadline:
            # Try to acquire lock
            if await redis_manager.set(self.key, self.identifier, ex=self.timeout, nx=True):
                return True
            # Wait before retrying
            await asyncio.sleep(self.retry_interval)
        return False

    async def release(self) -> bool:
        """Release distributed lock.

        Returns:
            True if the script deleted the key, False if it did not or no
            Redis client is available.
        """
        client = redis_manager.get_client()
        if client:
            result = await client.eval(self._RELEASE_SCRIPT, 1, self.key, self.identifier)
            return bool(result)
        return False

    async def __aenter__(self):
        """Acquire the lock or raise ``TimeoutError``."""
        if not await self.acquire():
            raise TimeoutError(f"Could not acquire lock: {self.key}")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Release the lock; exceptions from the body are not suppressed."""
        await self.release()
# Usage
async def critical_section():
    """Run ``process_critical_resource`` while holding the ``resource:123`` lock."""
    async with DistributedLock("resource:123", timeout=30):
        # Only one process can execute this at a time
        await process_critical_resource()


# Pub/Sub Messaging
import json
import logging

logger = logging.getLogger(__name__)


class PubSubManager:
    """Publish and consume JSON-encoded messages over Redis pub/sub."""

    @staticmethod
    async def publish(channel: str, message: dict):
        """Publish message to channel.

        The message is serialized with ``json.dumps``. Nothing is sent when no
        Redis client is available.
        """
        client = redis_manager.get_client()
        if client:
            await client.publish(channel, json.dumps(message))

    @staticmethod
    async def subscribe(channels: list, handler: callable):
        """Subscribe to channels and handle messages.

        Runs until the listener stops or the task is cancelled. Each payload
        of type ``message`` is decoded with ``json.loads`` and passed to
        ``await handler(channel, data)``. Returns immediately when no Redis
        client is available.

        Args:
            channels: Channel names to subscribe to.
            handler: Awaitable callback taking ``(channel, data)``.
        """
        client = redis_manager.get_client()
        if not client:
            return

        pubsub = client.pubsub()
        try:
            # Subscribing inside the try block ensures the pubsub object is
            # closed even when the subscription itself fails.
            await pubsub.subscribe(*channels)
            async for message in pubsub.listen():
                if message['type'] != 'message':
                    continue
                try:
                    data = json.loads(message['data'])
                    await handler(message['channel'], data)
                except Exception as e:
                    # Deliberately broad: one bad payload or handler failure
                    # must not stop the subscription loop.
                    logger.error(f"Error handling pubsub message: {e}")
        finally:
            await pubsub.close()
# Usage
async def handle_notification(channel: str, data: dict):
    """Print a notification received on ``channel`` together with its payload."""
    print(f"Received notification on {channel}: {data}")
# Publish
await PubSubManager.publish("notifications", {
    "type": "user_login",
    "user_id": 123,
    "timestamp": time.time()
})
# Subscribe
await PubSubManager.subscribe(["notifications"], handle_notification)
Error Handling
The Redis Manager handles errors gracefully:
# All operations return sensible defaults on error
value = await redis_manager.get("nonexistent_key") # Returns None
success = await redis_manager.set("key", "value") # Returns False if failed
count = await redis_manager.delete("key") # Returns 0 if failed
# Check if Redis is available before critical operations
if redis_manager.is_enabled():
    await redis_manager.set("important_data", data)
else:
    # Fallback to alternative storage
    await database.save_data(data)
Best Practices
1. Use JSON Methods for Complex Data
# Good - structured data
await redis_manager.set_json("user:123", {"name": "John", "age": 30})
# Avoid - manual serialization
await redis_manager.set("user:123", json.dumps({"name": "John", "age": 30}))
2. Set Appropriate Expiration Times
# Cache data with reasonable TTL
await redis_manager.set_json("cache:expensive_query", result, ex=3600) # 1 hour
# Session data with longer TTL
await redis_manager.set_json("session:abc123", session_data, ex=86400) # 24 hours
3. Use Descriptive Key Patterns
# Good - clear, hierarchical keys
"user:123:profile"
"session:abc123:data"
"cache:product:456"
"rate_limit:user:123"
# Avoid - unclear keys
"u123"
"data"
"temp"
4. Handle Redis Unavailability
async def robust_cache_get(key: str):
    """Get from cache with fallback.

    Returns:
        The result of ``redis_manager.get_json(key)`` when Redis is enabled,
        otherwise None.
    """
    if redis_manager.is_enabled():
        return await redis_manager.get_json(key)
    return None  # Graceful fallback
from typing import Any, Optional


async def robust_cache_set(key: str, value: Any, ex: Optional[int] = None):
    """Set cache with error handling.

    Args:
        key: Redis key to write.
        value: Value handed to ``redis_manager.set_json``.
        ex: Optional expiry forwarded as ``ex``; None means no expiry is passed.

    Returns:
        The result of ``redis_manager.set_json`` when Redis is enabled,
        otherwise False.
    """
    if redis_manager.is_enabled():
        return await redis_manager.set_json(key, value, ex=ex)
    return False  # Fail silently if Redis unavailable


# Last updated