Testing
Topics
Quick Overview
python run_tests.py
# Or using uv
uv run pytest
Writing Tests
Test Organization
Best Practices
Next Steps
Last updated
python run_tests.py
# Or using uv
uv run pytest
Last updated
import unittest
from unittest.mock import Mock, AsyncMock, patch
from app.controllers.sensor_controller import SensorController
from app.middleware.rate_limit import RateLimitMiddleware
class TestSensorController(unittest.IsolatedAsyncioTestCase):
    """Unit tests for ``SensorController``.

    Built on ``IsolatedAsyncioTestCase`` so that ``async def`` test methods
    are actually awaited. Under a plain ``unittest.TestCase`` the test method
    only returns an un-awaited coroutine, so none of its assertions execute.
    """

    async def test_handle_temperature(self):
        """A temperature payload yields a ``processed`` result for its sensor."""
        # Stand-in client; ``publish`` is a plain synchronous Mock.
        # NOTE(review): if the controller awaits ``client.publish``, this
        # should be an ``AsyncMock`` instead -- confirm against the controller.
        client = Mock()
        client.publish = Mock()

        # Test data
        sensor_id = "temp_001"
        payload = {"value": 25.5, "unit": "celsius"}

        # Call controller
        result = await SensorController.handle_temperature(sensor_id, payload, client)

        # The result must report success and echo back the sensor id.
        self.assertEqual(result["status"], "processed")
        self.assertEqual(result["sensor_id"], sensor_id)
class TestRateLimitMiddleware(unittest.IsolatedAsyncioTestCase):
    """Unit tests for ``RateLimitMiddleware``.

    Built on ``IsolatedAsyncioTestCase`` so that ``async def`` test methods
    are actually awaited. Under a plain ``unittest.TestCase`` the test method
    only returns an un-awaited coroutine, so none of its assertions execute.
    """

    async def test_rate_limiting(self):
        """Two requests pass; the third in the same window is rejected."""
        # Create rate limiter: at most 2 requests per 60-second window.
        rate_limiter = RateLimitMiddleware(max_requests=2, window_seconds=60)

        # Mock context and the downstream handler the middleware delegates to.
        context = {"topic": "test/topic", "payload": {}}
        next_handler = AsyncMock(return_value={"success": True})

        # First two requests should pass
        result1 = await rate_limiter.handle(context, next_handler)
        self.assertEqual(result1["success"], True)
        result2 = await rate_limiter.handle(context, next_handler)
        self.assertEqual(result2["success"], True)

        # Third request should be rate limited; ``.get`` with a default keeps
        # the failure an assertion error rather than a KeyError if the
        # "error" key is absent.
        result3 = await rate_limiter.handle(context, next_handler)
        self.assertIn("rate_limit_exceeded", result3.get("error", ""))


# tests/
├── __init__.py
├── unit/
│ ├── test_controller.py
│ ├── test_middleware.py
│ └── test_router.py
├── integration/
│ ├── test_full_workflow.py
│ └── test_redis_integration.py
└── fixtures/
└── test_data.json