Copy class AdvancedWhitelistMiddleware(RateLimitMiddleware):
"""Rate limiting with advanced whitelisting capabilities"""
def __init__(self, whitelist_config: Dict[str, Any] = None, *args, **kwargs):
super().__init__(*args, **kwargs)
self.whitelist_config = whitelist_config or {}
# Different types of whitelists
self.topic_whitelist = set(self.whitelist_config.get('topics', []))
self.client_whitelist = set(self.whitelist_config.get('clients', []))
self.ip_whitelist = set(self.whitelist_config.get('ip_addresses', []))
self.user_whitelist = set(self.whitelist_config.get('users', []))
self.role_whitelist = set(self.whitelist_config.get('roles', []))
# Conditional whitelisting
self.conditional_whitelist = self.whitelist_config.get('conditional', [])
def _is_whitelisted(self, key: str, context: Dict[str, Any] = None) -> bool:
"""Advanced whitelist checking"""
# Topic-based whitelisting
topic = context.get('topic', '') if context else ''
if self._matches_topic_whitelist(topic):
return True
# Client-based whitelisting
if self._matches_client_whitelist(context):
return True
# IP-based whitelisting
if self._matches_ip_whitelist(context):
return True
# User-based whitelisting
if self._matches_user_whitelist(context):
return True
# Role-based whitelisting
if self._matches_role_whitelist(context):
return True
# Conditional whitelisting
if self._matches_conditional_whitelist(context):
return True
return False
def _matches_topic_whitelist(self, topic: str) -> bool:
"""Check topic against whitelist patterns"""
for pattern in self.topic_whitelist:
if self._topic_matches_pattern(topic, pattern):
return True
return False
def _matches_client_whitelist(self, context: Dict[str, Any]) -> bool:
"""Check client ID against whitelist"""
if not context:
return False
# Check various client identifiers
identifiers = [
context.get('user_id'),
context.get('device_id'),
context.get('client_id'),
context.get('auth_data', {}).get('key_id')
]
for identifier in identifiers:
if identifier and identifier in self.client_whitelist:
return True
return False
def _matches_ip_whitelist(self, context: Dict[str, Any]) -> bool:
"""Check IP address against whitelist"""
if not context:
return False
client_ip = context.get('client_ip')
return client_ip in self.ip_whitelist if client_ip else False
def _matches_user_whitelist(self, context: Dict[str, Any]) -> bool:
"""Check user against whitelist"""
if not context:
return False
user_id = context.get('user_id')
return user_id in self.user_whitelist if user_id else False
def _matches_role_whitelist(self, context: Dict[str, Any]) -> bool:
"""Check user roles against whitelist"""
if not context:
return False
user_roles = context.get('roles', [])
return any(role in self.role_whitelist for role in user_roles)
def _matches_conditional_whitelist(self, context: Dict[str, Any]) -> bool:
"""Check conditional whitelist rules"""
if not context:
return False
for condition in self.conditional_whitelist:
if self._evaluate_condition(condition, context):
return True
return False
def _evaluate_condition(self, condition: Dict[str, Any], context: Dict[str, Any]) -> bool:
"""Evaluate a conditional whitelist rule"""
condition_type = condition.get('type')
if condition_type == 'time_range':
return self._check_time_range(condition, context)
elif condition_type == 'payload_field':
return self._check_payload_field(condition, context)
elif condition_type == 'request_size':
return self._check_request_size(condition, context)
elif condition_type == 'custom_header':
return self._check_custom_header(condition, context)
return False
def _check_time_range(self, condition: Dict, context: Dict) -> bool:
"""Check if current time is within specified range"""
import datetime
now = datetime.datetime.now()
start_hour = condition.get('start_hour', 0)
end_hour = condition.get('end_hour', 23)
return start_hour <= now.hour <= end_hour
def _check_payload_field(self, condition: Dict, context: Dict) -> bool:
"""Check payload field value"""
payload = context.get('payload', {})
field = condition.get('field')
expected_value = condition.get('value')
return payload.get(field) == expected_value
def _check_request_size(self, condition: Dict, context: Dict) -> bool:
"""Check request size"""
payload = context.get('payload', {})
max_size = condition.get('max_size', 1024)
payload_size = len(str(payload))
return payload_size <= max_size
def _check_custom_header(self, condition: Dict, context: Dict) -> bool:
"""Check custom header value"""
headers = context.get('headers', {})
header_name = condition.get('header')
expected_value = condition.get('value')
return headers.get(header_name) == expected_value
# Advanced whitelist configuration
whitelist_config = {
'topics': [
'admin/*',
'emergency/*',
'system/health'
],
'clients': [
'user:admin_user',
'device:critical_device_001',
'api_key:admin_key_123'
],
'ip_addresses': [
'192.168.1.100', # Admin workstation
'10.0.0.50' # Monitoring server
],
'users': [
'admin_user',
'system_user'
],
'roles': [
'admin',
'system_operator'
],
'conditional': [
{
'type': 'time_range',
'start_hour': 9,
'end_hour': 17,
'description': 'Business hours exemption'
},
{
'type': 'payload_field',
'field': 'priority',
'value': 'emergency',
'description': 'Emergency priority messages'
},
{
'type': 'custom_header',
'header': 'X-Bypass-Rate-Limit',
'value': 'true',
'description': 'Explicit bypass header'
}
]
}
advanced_whitelist_limiter = AdvancedWhitelistMiddleware(
max_requests=100,
window_seconds=60,
whitelist_config=whitelist_config
)