Chat Application
This guide demonstrates how to build a real-time chat application using RouteMQ with features like multi-room support, private messaging, user presence, and message history.
Overview
The chat application handles:
Real-time messaging between users
Multiple chat rooms and channels
Private direct messages
User presence and status tracking
Message history and persistence
File sharing and media messages
Message reactions and threading
Typing indicators
Architecture
Chat Clients <-> MQTT <-> RouteMQ Chat Router <-> Redis/Database
                                              <-> WebSocket Service
                                              <-> Message Storage
                                              <-> User Presence
Chat Router Setup
# app/routers/chat.py
from core.router import Router
from app.controllers.chat_controller import ChatController
from app.middleware.auth import AuthMiddleware
from app.middleware.rate_limit import RateLimitMiddleware
from app.middleware.profanity_filter import ProfanityFilterMiddleware
from app.middleware.message_validation import MessageValidationMiddleware
router = Router()

# Middleware instances shared by the route groups below.
auth = AuthMiddleware()
rate_limit = RateLimitMiddleware(max_requests=100, window_seconds=60)  # Stricter for chat
profanity_filter = ProfanityFilterMiddleware()
message_validation = MessageValidationMiddleware()

# NOTE(review): every handler referenced here must exist on ChatController when
# this module is imported; several (e.g. leave_room, edit_message) are not part
# of the controller excerpt shown in this guide.

# Chat routes: every topic below is prefixed with "chat/" and passes through
# authentication, rate limiting and message validation.
with router.group(prefix="chat", middleware=[auth, rate_limit, message_validation]) as chat:
    # Room management
    chat.on("rooms/create", ChatController.create_room, qos=2)
    chat.on("rooms/join/{room_id}", ChatController.join_room, qos=1)
    chat.on("rooms/leave/{room_id}", ChatController.leave_room, qos=1)
    chat.on("rooms/list", ChatController.list_rooms, qos=1)
    chat.on("rooms/{room_id}/info", ChatController.get_room_info, qos=1)

    # Messaging: new content additionally passes through the profanity filter.
    with chat.group(middleware=[profanity_filter]) as messaging:
        messaging.on("message/{room_id}", ChatController.send_room_message, qos=1)
        # The placeholder is named after the handler's parameter
        # (send_private_message(target_user_id, ...)); assumes RouteMQ hands
        # path placeholders to the handler as keyword arguments — TODO confirm.
        messaging.on("private/{target_user_id}", ChatController.send_private_message, qos=1)
        messaging.on("broadcast", ChatController.send_broadcast_message, qos=1)

    # Message management
    # NOTE(review): edit/reply also carry user content but bypass the profanity
    # filter — confirm whether that is intended.
    chat.on("message/{message_id}/edit", ChatController.edit_message, qos=1)
    chat.on("message/{message_id}/delete", ChatController.delete_message, qos=1)
    chat.on("message/{message_id}/react", ChatController.add_reaction, qos=0)
    chat.on("message/{message_id}/reply", ChatController.reply_to_message, qos=1)

    # User presence and status (QoS 0: losing one of these updates is acceptable)
    chat.on("presence/online", ChatController.set_user_online, qos=0)
    chat.on("presence/offline", ChatController.set_user_offline, qos=0)
    chat.on("presence/away", ChatController.set_user_away, qos=0)
    chat.on("typing/{room_id}/start", ChatController.start_typing, qos=0)
    chat.on("typing/{room_id}/stop", ChatController.stop_typing, qos=0)

    # Message history
    chat.on("history/{room_id}", ChatController.get_message_history, qos=1)
    chat.on("search/{room_id}", ChatController.search_messages, qos=1)

    # File sharing
    chat.on("file/upload/{room_id}", ChatController.handle_file_upload, qos=2)
    chat.on("file/share/{room_id}", ChatController.share_file, qos=1)

# Admin routes
# NOTE(review): only authentication is applied here; authorisation (is the
# caller an admin?) has to happen inside the handlers.
with router.group(prefix="chat/admin", middleware=[auth]) as admin:
    admin.on("rooms/{room_id}/moderate", ChatController.moderate_room, qos=2)
    admin.on("users/{user_id}/mute", ChatController.mute_user, qos=2)
    admin.on("users/{user_id}/ban", ChatController.ban_user, qos=2)
    admin.on("messages/cleanup", ChatController.cleanup_messages, qos=1)

# Chat Controller Implementation
# app/controllers/chat_controller.py
from core.controller import Controller
from core.redis_manager import redis_manager
from app.models.user import User
from app.services.chat_service import ChatService
from app.services.file_service import FileService
from app.services.notification_service import NotificationService
import json
import time
import uuid
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
from enum import Enum
class MessageType(Enum):
    """Kinds of chat message; the values are the strings stored in message payloads."""

    TEXT = "text"
    IMAGE = "image"
    FILE = "file"
    SYSTEM = "system"  # used by the controller for its own room notices
class UserStatus(Enum):
    """Presence states of a user; the values are the strings kept in Redis."""

    ONLINE = "online"
    AWAY = "away"
    OFFLINE = "offline"
class ChatController(Controller):
    """Static MQTT handlers for rooms, messaging, reactions, typing and presence.

    Every public handler receives the decoded ``payload``, the MQTT ``client``
    and reads the caller's id from ``kwargs['context']['user_id']`` (presumably
    populated by the auth middleware — verify against the router setup). All
    chat state is kept in Redis through ``redis_manager``.
    """

    # Types a client may submit. "system" is reserved for notices generated by
    # this controller so that users cannot forge join/creation announcements.
    _CLIENT_MESSAGE_TYPES = (
        MessageType.TEXT.value,
        MessageType.IMAGE.value,
        MessageType.FILE.value,
    )
    _MESSAGE_TTL_SECONDS = 86400 * 30  # 30 days
    _MAX_HISTORY_LIMIT = 100  # upper bound for a single history page

    @staticmethod
    async def create_room(payload: Dict[str, Any], client, **kwargs):
        """Create a chat room and make the caller its first member and admin.

        Args:
            payload: ``name`` (required) plus optional ``description``,
                ``type`` (default ``"public"``) and ``max_members`` (default 100).
            client: MQTT client used to publish the "room created" notice.
            **kwargs: Carries ``context`` with the caller's ``user_id``.

        Returns:
            Dict with ``status``, ``room_id`` and the stored ``room_data``.

        Raises:
            ValueError: If no user id is available or ``name`` is missing.
        """
        try:
            creator_id = ChatController._require_user_id(kwargs)
            room_name = payload.get("name")
            room_description = payload.get("description", "")
            room_type = payload.get("type", "public")  # public, private, direct
            max_members = payload.get("max_members", 100)

            if not room_name:
                raise ValueError("Room name is required")

            room_id = str(uuid.uuid4())
            now = time.time()
            room_data = {
                "id": room_id,
                "name": room_name,
                "description": room_description,
                "type": room_type,
                "creator_id": creator_id,
                "created_at": now,
                "max_members": max_members,
                "member_count": 1,
                "last_activity": now,
            }

            # Store room data
            await redis_manager.set_json(
                f"room:{room_id}", room_data, ex=ChatController._MESSAGE_TTL_SECONDS
            )
            # join_room increments this counter, so it has to start at 1 (the
            # creator); otherwise it would lag one behind the real membership.
            await redis_manager.set(f"room:{room_id}:member_count", 1)

            # Add to rooms list
            await redis_manager.sadd("rooms:all", room_id)
            if room_type == "public":
                await redis_manager.sadd("rooms:public", room_id)

            # Add creator as member and admin
            await redis_manager.sadd(f"room:{room_id}:members", creator_id)
            await redis_manager.sadd(f"user:{creator_id}:rooms", room_id)
            await redis_manager.sadd(f"room:{room_id}:admins", creator_id)

            # Announce the new room
            system_message = {
                "id": str(uuid.uuid4()),
                "room_id": room_id,
                "type": MessageType.SYSTEM.value,
                "content": f"Room '{room_name}' created",
                "timestamp": time.time(),
                "sender": "system",
            }
            await ChatController._store_message(system_message)
            await ChatController._broadcast_to_room(room_id, system_message, client)

            return {
                "status": "room_created",
                "room_id": room_id,
                "room_data": room_data,
            }
        except Exception as e:
            print(f"Error creating room: {e}")
            raise

    @staticmethod
    async def join_room(room_id: str, payload: Dict[str, Any], client, **kwargs):
        """Add the caller to a room and send them the recent history.

        Returns:
            ``{"status": "already_member", ...}`` when the caller is already in
            the room, otherwise ``{"status": "joined", ...}`` with the new
            member count.

        Raises:
            ValueError: If the room does not exist, is a direct conversation
                the caller is not part of, is full, or the caller is banned.
        """
        try:
            user_id = ChatController._require_user_id(kwargs)

            # Check if room exists
            room_data = await redis_manager.get_json(f"room:{room_id}")
            if not room_data:
                raise ValueError("Room not found")

            # Direct-message rooms have predictable ids ("private_<a>_<b>") and
            # joining delivers the recent history, so only the two recorded
            # participants may ever be members.
            participants = room_data.get("participants")
            if participants is not None and user_id not in participants:
                raise ValueError("Cannot join a private conversation")

            # Check if user is already a member
            if await redis_manager.sismember(f"room:{room_id}:members", user_id):
                return {"status": "already_member", "room_id": room_id}

            # Check room capacity
            member_count = await redis_manager.scard(f"room:{room_id}:members")
            if member_count >= room_data.get("max_members", 100):
                raise ValueError("Room is full")

            # Check if user is banned
            if await redis_manager.sismember(f"room:{room_id}:banned", user_id):
                raise ValueError("User is banned from this room")

            # Add user to room
            await redis_manager.sadd(f"room:{room_id}:members", user_id)
            await redis_manager.sadd(f"user:{user_id}:rooms", room_id)
            await redis_manager.incr(f"room:{room_id}:member_count")

            username = await ChatController._display_name(user_id)

            # Send join message
            join_message = {
                "id": str(uuid.uuid4()),
                "room_id": room_id,
                "type": MessageType.SYSTEM.value,
                "content": f"{username} joined the room",
                "timestamp": time.time(),
                "sender": "system",
                "user_id": user_id,
            }
            await ChatController._store_message(join_message)
            await ChatController._broadcast_to_room(room_id, join_message, client)

            # Send recent message history to new member
            await ChatController._send_recent_history(room_id, user_id, client)

            return {
                "status": "joined",
                "room_id": room_id,
                "member_count": member_count + 1,
            }
        except Exception as e:
            print(f"Error joining room {room_id}: {e}")
            raise

    @staticmethod
    async def send_room_message(room_id: str, payload: Dict[str, Any], client, **kwargs):
        """Store a message from the caller and deliver it to every room member.

        Args:
            room_id: Target room.
            payload: ``content`` (required), optional ``type`` (text/image/file,
                default text) and ``reply_to`` (id of the message replied to).

        Raises:
            ValueError: If content is missing, the type is not allowed, or the
                caller is not a member of the room or is muted in it.
        """
        try:
            sender_id = ChatController._require_user_id(kwargs)
            content = payload.get("content")
            message_type = payload.get("type", MessageType.TEXT.value)
            reply_to = payload.get("reply_to")  # Message ID this is replying to

            if not content:
                raise ValueError("Message content is required")
            ChatController._validate_client_message_type(message_type)

            # Check if user is member of room
            if not await redis_manager.sismember(f"room:{room_id}:members", sender_id):
                raise ValueError("User is not a member of this room")

            # Check if user is muted
            if await redis_manager.sismember(f"room:{room_id}:muted", sender_id):
                raise ValueError("User is muted in this room")

            message_id = str(uuid.uuid4())
            message = {
                "id": message_id,
                "room_id": room_id,
                "sender_id": sender_id,
                "content": content,
                "type": message_type,
                "timestamp": time.time(),
                "reply_to": reply_to,
                "reactions": {},
                "edited": False,
            }
            message["sender_name"] = await ChatController._display_name(sender_id)

            await ChatController._store_message(message)

            # Update room last activity
            await redis_manager.set(f"room:{room_id}:last_activity", time.time())

            await ChatController._broadcast_to_room(room_id, message, client)

            # Update user's message count
            await redis_manager.incr(f"user:{sender_id}:message_count")

            return {
                "status": "message_sent",
                "message_id": message_id,
                "timestamp": message["timestamp"],
            }
        except Exception as e:
            print(f"Error sending message to room {room_id}: {e}")
            raise

    @staticmethod
    async def send_private_message(target_user_id: str, payload: Dict[str, Any], client, **kwargs):
        """Send a direct message, creating the two-person room on first use.

        The message is published to both participants; the recipient's unread
        counter is incremented and a push notification is requested when their
        stored status is anything other than online.

        Raises:
            ValueError: If the caller targets themselves, content is missing or
                the message type is not allowed.
        """
        try:
            sender_id = ChatController._require_user_id(kwargs)

            # Compare as strings: the target comes from the topic (str) while
            # the authenticated id may be of another type.
            if str(sender_id) == str(target_user_id):
                raise ValueError("Cannot send message to yourself")

            content = payload.get("content")
            message_type = payload.get("type", MessageType.TEXT.value)
            if not content:
                raise ValueError("Message content is required")
            ChatController._validate_client_message_type(message_type)

            # Create or get private room ID
            room_id = await ChatController._get_or_create_private_room(sender_id, target_user_id)

            message_id = str(uuid.uuid4())
            message = {
                "id": message_id,
                "room_id": room_id,
                "sender_id": sender_id,
                "target_id": target_user_id,
                "content": content,
                "type": message_type,
                "timestamp": time.time(),
                "is_private": True,
                "read": False,
            }
            message["sender_name"] = await ChatController._display_name(sender_id)

            await ChatController._store_message(message)

            # Send to both users
            await ChatController._send_to_user(sender_id, message, client)
            await ChatController._send_to_user(target_user_id, message, client)

            # Update unread count for target user
            await redis_manager.incr(f"user:{target_user_id}:unread_messages")

            # Send push notification to target user if offline
            target_status = await redis_manager.get(f"user:{target_user_id}:status")
            if isinstance(target_status, bytes):
                # Redis clients return bytes unless response decoding is on;
                # without this the comparison below could never match.
                target_status = target_status.decode()
            if target_status != UserStatus.ONLINE.value:
                await NotificationService.send_message_notification(target_user_id, message)

            return {
                "status": "private_message_sent",
                "message_id": message_id,
                "room_id": room_id,
            }
        except Exception as e:
            print(f"Error sending private message: {e}")
            raise

    @staticmethod
    async def start_typing(room_id: str, payload: Dict[str, Any], client, **kwargs):
        """Flag the caller as typing in a room and tell the other members.

        The flag is a Redis key that expires after 10 seconds. Non-members get
        ``{"status": "not_member"}`` and nothing is broadcast.
        """
        try:
            user_id = ChatController._require_user_id(kwargs)

            # Check if user is member
            if not await redis_manager.sismember(f"room:{room_id}:members", user_id):
                return {"status": "not_member"}

            # Set typing indicator
            await redis_manager.setex(f"typing:{room_id}:{user_id}", 10, "1")  # 10 seconds

            username = await ChatController._display_name(user_id)

            typing_message = {
                "type": "typing_start",
                "room_id": room_id,
                "user_id": user_id,
                "username": username,
                "timestamp": time.time(),
            }
            await ChatController._broadcast_to_room(
                room_id, typing_message, client, exclude_user=user_id
            )

            return {"status": "typing_started"}
        except Exception as e:
            print(f"Error setting typing indicator: {e}")
            raise

    @staticmethod
    async def stop_typing(room_id: str, payload: Dict[str, Any], client, **kwargs):
        """Clear the caller's typing flag and tell the other room members.

        Non-members get ``{"status": "not_member"}``, mirroring start_typing,
        so outsiders cannot push events into rooms they do not belong to.
        """
        try:
            user_id = ChatController._require_user_id(kwargs)

            if not await redis_manager.sismember(f"room:{room_id}:members", user_id):
                return {"status": "not_member"}

            # Remove typing indicator
            await redis_manager.delete(f"typing:{room_id}:{user_id}")

            typing_message = {
                "type": "typing_stop",
                "room_id": room_id,
                "user_id": user_id,
                "timestamp": time.time(),
            }
            await ChatController._broadcast_to_room(
                room_id, typing_message, client, exclude_user=user_id
            )

            return {"status": "typing_stopped"}
        except Exception as e:
            print(f"Error stopping typing indicator: {e}")
            raise

    @staticmethod
    async def add_reaction(message_id: str, payload: Dict[str, Any], client, **kwargs):
        """Record the caller's emoji reaction on a message and broadcast the update.

        Adding the same emoji twice for the same user is a no-op on the stored
        list, but the update is still broadcast.

        Raises:
            ValueError: If ``emoji`` is missing, the message does not exist, or
                the caller is not a member of the message's room.
        """
        try:
            user_id = ChatController._require_user_id(kwargs)
            emoji = payload.get("emoji")
            if not emoji:
                raise ValueError("Emoji is required")

            message = await redis_manager.get_json(f"message:{message_id}")
            if not message:
                raise ValueError("Message not found")

            # Check if user is member of the room
            room_id = message["room_id"]
            if not await redis_manager.sismember(f"room:{room_id}:members", user_id):
                raise ValueError("User is not a member of this room")

            # NOTE: read-modify-write on the stored JSON; concurrent reactions
            # to the same message can overwrite one another.
            reactions = message.get("reactions", {})
            reacting_users = reactions.setdefault(emoji, [])
            if user_id not in reacting_users:
                reacting_users.append(user_id)
            message["reactions"] = reactions

            await redis_manager.set_json(
                f"message:{message_id}", message, ex=ChatController._MESSAGE_TTL_SECONDS
            )

            reaction_message = {
                "type": "reaction_added",
                "message_id": message_id,
                "emoji": emoji,
                "user_id": user_id,
                "reactions": reactions,
                "timestamp": time.time(),
            }
            await ChatController._broadcast_to_room(room_id, reaction_message, client)

            return {"status": "reaction_added", "reactions": reactions}
        except Exception as e:
            print(f"Error adding reaction: {e}")
            raise

    @staticmethod
    async def get_message_history(room_id: str, payload: Dict[str, Any], client, **kwargs):
        """Return one page of a room's messages, newest first.

        Args:
            room_id: Room whose timeline is read.
            payload: Optional ``limit`` (default 50, clamped to 1..100) and
                ``before`` (timestamp; only strictly older messages are returned).

        Raises:
            ValueError: If the caller is not a member, or ``limit``/``before``
                are not numeric.
        """
        try:
            user_id = ChatController._require_user_id(kwargs)

            # Check if user is member
            if not await redis_manager.sismember(f"room:{room_id}:members", user_id):
                raise ValueError("User is not a member of this room")

            try:
                limit = int(payload.get("limit", 50))
            except (TypeError, ValueError):
                raise ValueError("limit must be an integer") from None
            # Clamp so one request cannot pull the whole timeline.
            limit = max(1, min(limit, ChatController._MAX_HISTORY_LIMIT))

            before_timestamp = payload.get("before")  # For pagination
            if before_timestamp is None:
                max_score = "+inf"
            else:
                try:
                    # "(" makes the bound exclusive, so paging with the oldest
                    # timestamp of the previous page does not return that
                    # message a second time.
                    max_score = f"({float(before_timestamp)}"
                except (TypeError, ValueError):
                    raise ValueError("before must be a numeric timestamp") from None

            message_ids = await redis_manager.zrevrangebyscore(
                f"room:{room_id}:messages",
                max_score,
                "-inf",
                start=0,
                num=limit,
            )

            messages = []
            for message_id in message_ids:
                message = await redis_manager.get_json(f"message:{message_id}")
                if message:  # the message key may have expired before its index entry
                    messages.append(message)

            return {
                "status": "success",
                "room_id": room_id,
                "messages": messages,
                "count": len(messages),
            }
        except Exception as e:
            print(f"Error getting message history: {e}")
            raise

    @staticmethod
    async def set_user_online(payload: Dict[str, Any], client, **kwargs):
        """Mark the caller online and notify the members of their rooms.

        The status key expires after 5 minutes.
        NOTE(review): the ``users:online`` set entry has no expiry and is not
        removed anywhere in this class.
        """
        try:
            user_id = ChatController._require_user_id(kwargs)

            # Set user status
            await redis_manager.setex(
                f"user:{user_id}:status", 300, UserStatus.ONLINE.value
            )  # 5 minutes
            await redis_manager.set(f"user:{user_id}:last_seen", time.time())

            # Add to online users set
            await redis_manager.sadd("users:online", user_id)

            # Broadcast status to user's rooms
            await ChatController._broadcast_user_status(user_id, UserStatus.ONLINE.value, client)

            return {"status": "online"}
        except Exception as e:
            print(f"Error setting user online: {e}")
            raise

    # Helper methods

    @staticmethod
    def _require_user_id(kwargs: Dict[str, Any]):
        """Return the caller's id from ``kwargs['context']`` or raise ValueError."""
        user_id = (kwargs.get('context') or {}).get('user_id')
        if user_id is None:
            raise ValueError("Authenticated user is required")
        return user_id

    @staticmethod
    def _validate_client_message_type(message_type) -> None:
        """Reject message types that clients are not allowed to submit."""
        if message_type not in ChatController._CLIENT_MESSAGE_TYPES:
            raise ValueError("Invalid message type")

    @staticmethod
    async def _display_name(user_id) -> str:
        """Return the user's username, or ``User_<id>`` when no record is found."""
        user = await User.find_by_id(user_id)
        return user.username if user else f"User_{user_id}"

    @staticmethod
    async def _store_message(message: Dict[str, Any]):
        """Persist a message in Redis and index it in its room's timeline.

        ``message`` must contain ``id``, ``room_id`` and ``timestamp``.
        """
        message_id = message["id"]
        room_id = message["room_id"]
        timestamp = message["timestamp"]

        # Store message data
        await redis_manager.set_json(
            f"message:{message_id}", message, ex=ChatController._MESSAGE_TTL_SECONDS
        )

        # Add to room's message timeline (sorted set scored by timestamp)
        await redis_manager.zadd(f"room:{room_id}:messages", {message_id: timestamp})

        # Keep only the newest 1000 entries: ranks 0..-1001 are the oldest ones.
        await redis_manager.zremrangebyrank(f"room:{room_id}:messages", 0, -1001)

        # Add to global message index for search
        if message.get("type") == MessageType.TEXT.value:
            await redis_manager.zadd("messages:all", {message_id: timestamp})

    @staticmethod
    async def _broadcast_to_room(room_id: str, message: Dict[str, Any], client, exclude_user: str = None):
        """Publish ``message`` to every member of the room except ``exclude_user``."""
        members = await redis_manager.smembers(f"room:{room_id}:members")
        for member_id in members:
            if exclude_user and member_id == exclude_user:
                continue
            await ChatController._send_to_user(member_id, message, client)

    @staticmethod
    async def _send_to_user(user_id: str, message: Dict[str, Any], client):
        """Publish ``message`` as JSON on the user's personal topic."""
        # In a real implementation, this would send via WebSocket
        # For now, we'll publish to a user-specific topic
        user_topic = f"chat/user/{user_id}/messages"
        client.publish(user_topic, json.dumps(message))

    @staticmethod
    async def _get_or_create_private_room(user1_id: str, user2_id: str) -> str:
        """Return the id of the two users' direct room, creating it if absent.

        The id is built from the sorted pair, so it is the same whichever user
        sends first.
        NOTE(review): ids containing "_" can make two different pairs map to
        the same room id — confirm the id format rules this out.
        """
        # key=str keeps sorting well-defined if one id is an int and one a str.
        sorted_users = sorted([user1_id, user2_id], key=str)
        room_id = f"private_{sorted_users[0]}_{sorted_users[1]}"

        if not await redis_manager.exists(f"room:{room_id}"):
            now = time.time()
            room_data = {
                "id": room_id,
                "type": "private",
                "participants": sorted_users,
                "created_at": now,
                "last_activity": now,
            }
            await redis_manager.set_json(
                f"room:{room_id}", room_data, ex=ChatController._MESSAGE_TTL_SECONDS
            )

            # Add users as members
            for user_id in sorted_users:
                await redis_manager.sadd(f"room:{room_id}:members", user_id)
                await redis_manager.sadd(f"user:{user_id}:rooms", room_id)

        return room_id

    @staticmethod
    async def _send_recent_history(room_id: str, user_id: str, client):
        """Send the room's 20 newest messages to one user, oldest first."""
        message_ids = await redis_manager.zrevrange(f"room:{room_id}:messages", 0, 19)

        messages = []
        for message_id in message_ids:
            message = await redis_manager.get_json(f"message:{message_id}")
            if message:
                messages.append(message)

        # zrevrange yields newest first; clients get chronological order.
        messages.reverse()

        history_message = {
            "type": "message_history",
            "room_id": room_id,
            "messages": messages,
        }
        await ChatController._send_to_user(user_id, history_message, client)

    @staticmethod
    async def _broadcast_user_status(user_id: str, status: str, client):
        """Tell everyone who shares a room with the user about their new status."""
        user_rooms = await redis_manager.smembers(f"user:{user_id}:rooms")

        username = await ChatController._display_name(user_id)
        status_message = {
            "type": "user_status",
            "user_id": user_id,
            "username": username,
            "status": status,
            "timestamp": time.time(),
        }

        # The payload carries no room id, so a peer sharing several rooms with
        # the user needs it only once: collect the distinct recipients first.
        recipients = set()
        for room_id in user_rooms:
            recipients.update(await redis_manager.smembers(f"room:{room_id}:members"))
        recipients.discard(user_id)

        for member_id in recipients:
            await ChatController._send_to_user(member_id, status_message, client)

# Message Validation Middleware
# app/middleware/message_validation.py
from core.middleware import Middleware
import re
class MessageValidationMiddleware(Middleware):
    """Validate and normalise the ``content`` field of incoming chat payloads.

    Payloads without a ``content`` key (and payloads that are not dicts) are
    passed through untouched.
    """

    MAX_CONTENT_LENGTH = 4000  # characters
    SUSPICIOUS_DOMAINS = ('bit.ly', 'tinyurl.com')  # Add more as needed

    async def handle(self, context, next_handler):
        """Check ``context['payload']['content']`` and call the next handler.

        Over-capitalised content is replaced by its lower-case form and content
        mentioning a suspicious domain sets ``payload['flagged'] = True``.

        Raises:
            ValueError: If content is not a string, exceeds 4000 characters or
                is empty/whitespace only.
        """
        payload = context.get('payload', {})

        # Only dict payloads can carry a "content" field; anything else (raw
        # strings, None) is not this middleware's concern.
        if isinstance(payload, dict) and 'content' in payload:
            content = payload['content']

            if not isinstance(content, str):
                raise ValueError("Message content must be a string")

            # Length validation
            if len(content) > self.MAX_CONTENT_LENGTH:
                raise ValueError("Message too long (max 4000 characters)")
            if not content.strip():
                raise ValueError("Message cannot be empty")

            # Basic content validation
            if self._contains_excessive_caps(content):
                payload['content'] = content.lower()

            # URL validation (simplified)
            if self._contains_suspicious_urls(content):
                payload['flagged'] = True

        return await next_handler(context)

    def _contains_excessive_caps(self, text: str) -> bool:
        """Return True when more than 70% of a 10+ character message is upper case."""
        if len(text) < 10:
            return False
        caps_ratio = sum(1 for c in text if c.isupper()) / len(text)
        return caps_ratio > 0.7

    def _contains_suspicious_urls(self, text: str) -> bool:
        """Return True when the text mentions a known link-shortener domain.

        The match is case-insensitive because host names are; otherwise
        "BIT.LY/x" would slip through.
        """
        lowered = text.casefold()
        return any(domain in lowered for domain in self.SUSPICIOUS_DOMAINS)

# Usage Examples
Join Room and Send Message
# Join room: chat/rooms/join/room_123
{
"user_preferences": {
"notifications": true
}
}
# Send message: chat/message/room_123
{
"content": "Hello everyone!",
"type": "text"
}
Private Message
# Send private message: chat/private/user_456
{
"content": "Hey, how are you?",
"type": "text"
}
Add Reaction
# Add reaction: chat/message/msg_789/react
{
"emoji": "👍"
}
Typing Indicator
# Start typing: chat/typing/room_123/start
{}
# Stop typing: chat/typing/room_123/stop
{}
This chat application provides comprehensive real-time messaging capabilities with room management, private messaging, presence tracking, and moderation features.
Integration with Frontend
The chat system can be integrated with web clients using WebSocket connections or Server-Sent Events (SSE) for real-time updates. Mobile clients can use MQTT directly or HTTP APIs with push notifications for offline message delivery.
Last updated