Introduction
Microservices architecture structures an application as a collection of loosely coupled services. This tutorial covers service discovery, API gateways, and inter-service communication patterns.
Service Discovery
# service_registry.py
import requests
import time
from typing import Dict, List
class ServiceRegistry:
    """Minimal client for a Consul agent's HTTP API.

    Supports registering a service instance (with an HTTP health check)
    and looking up the instances of a named service.
    """

    def __init__(self, consul_url: str = "http://localhost:8500", timeout: float = 5.0):
        """Store the Consul base URL and the per-request timeout.

        Args:
            consul_url: Base URL of the Consul agent.
            timeout: Seconds to wait for Consul before giving up. Without
                a timeout ``requests`` waits indefinitely on a hung agent.
        """
        self.consul_url = consul_url
        self.timeout = timeout

    def register(self, service_name: str, host: str, port: int):
        """Register one service instance with the Consul agent.

        The instance is registered with an HTTP health check against
        ``http://{host}:{port}/health`` on a 10 second interval.

        Raises:
            requests.HTTPError: If Consul rejects the registration.
            requests.RequestException: On connection failure or timeout.
        """
        url = f"{self.consul_url}/v1/agent/service/register"
        payload = {
            "ID": f"{service_name}-{host}-{port}",
            "Name": service_name,
            "Address": host,
            "Port": port,
            "Check": {
                "HTTP": f"http://{host}:{port}/health",
                "Interval": "10s",
            },
        }
        response = requests.put(url, json=payload, timeout=self.timeout)
        # A rejected registration must not pass silently: the caller would
        # believe the service is discoverable when it is not.
        response.raise_for_status()

    def discover(self, service_name: str, passing_only: bool = True) -> List[Dict]:
        """Return the known instances of *service_name*.

        Args:
            service_name: Name the instances were registered under.
            passing_only: When true (the default), ask Consul to return only
                instances whose health checks are all passing. Pass ``False``
                to get every registered instance regardless of health.

        Returns:
            A list of ``{"host": ..., "port": ...}`` dicts, possibly empty.

        Raises:
            requests.HTTPError: If Consul answers with an error status.
            requests.RequestException: On connection failure or timeout.
        """
        url = f"{self.consul_url}/v1/health/service/{service_name}"
        params = {"passing": "true"} if passing_only else None
        response = requests.get(url, params=params, timeout=self.timeout)
        response.raise_for_status()

        instances = []
        for entry in response.json():
            service = entry["Service"]
            # Consul leaves Service.Address empty when the service was
            # registered without one; the node's address applies then.
            host = service["Address"] or entry.get("Node", {}).get("Address", "")
            instances.append({"host": host, "port": service["Port"]})
        return instances
class ServiceClient:
    """Calls a named service, resolving its address through a registry.

    Discovered instances are cached and re-queried once the cache is older
    than ``CACHE_TTL_SECONDS``.
    """

    # Maximum age, in seconds, of the cached instance list.
    CACHE_TTL_SECONDS = 30

    def __init__(self, service_name: str, registry: "ServiceRegistry"):
        """Remember the target service and the registry used to locate it.

        Args:
            service_name: Name to look up in the registry.
            registry: Object exposing ``discover(service_name)``.
        """
        self.service_name = service_name
        self.registry = registry
        self.instances = []
        self.last_update = 0

    def _refresh_instances(self):
        """Re-query the registry when the cache is stale or empty."""
        cache_expired = time.time() - self.last_update > self.CACHE_TTL_SECONDS
        # An empty result is never treated as fresh: otherwise a lookup made
        # before the service came up would keep failing for the whole TTL.
        if cache_expired or not self.instances:
            self.instances = self.registry.discover(self.service_name)
            self.last_update = time.time()

    def get_instance(self):
        """Return the first cached instance as a ``{"host", "port"}`` dict.

        Raises:
            RuntimeError: If the registry knows no instance of the service.
        """
        self._refresh_instances()
        if not self.instances:
            raise RuntimeError(f"No instances of {self.service_name}")
        return self.instances[0]

    def call(self, path: str, timeout: float = 5.0):
        """GET *path* on the selected instance and return the decoded JSON body.

        Args:
            path: Request path, appended as-is after ``host:port``.
            timeout: Seconds to wait for the service before giving up.

        Raises:
            RuntimeError: If no instance is available.
            requests.RequestException: On connection failure or timeout.
        """
        instance = self.get_instance()
        url = f"http://{instance['host']}:{instance['port']}{path}"
        return requests.get(url, timeout=timeout).json()
API Gateway
# gateway.py
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse
import httpx
# The ASGI application object.
app = FastAPI()

# Route configuration: public path -> "host:port" of the backing service.
ROUTES = dict(
    [
        ("/api/users", "user-service:8001"),
        ("/api/products", "product-service:8002"),
        ("/api/orders", "order-service:8003"),
    ]
)
# Request headers that must not be copied to the upstream call: "host" and
# "content-length" are recomputed by the HTTP client for the new request, and
# the rest are hop-by-hop headers that only apply to a single connection.
_SKIPPED_REQUEST_HEADERS = frozenset(
    {
        "host",
        "content-length",
        "connection",
        "keep-alive",
        "proxy-authenticate",
        "proxy-authorization",
        "te",
        "trailer",
        "transfer-encoding",
        "upgrade",
    }
)


def _resolve_route(full_path: str):
    """Return the ``host:port`` target for *full_path*, or ``None`` if unrouted.

    A route matches when the path equals the route prefix or lies beneath it
    (``/api/users`` matches ``/api/users/42`` but not ``/api/usersX``). When
    several prefixes match, the longest one wins.
    """
    matches = [
        prefix
        for prefix in ROUTES
        if full_path == prefix or full_path.startswith(f"{prefix.rstrip('/')}/")
    ]
    if not matches:
        return None
    return ROUTES[max(matches, key=len)]


@app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE"])
async def gateway(path: str, request: Request):
    """Forward the incoming request to the service that owns its path.

    The method, path, query string, body and end-to-end headers are passed to
    the upstream service; its status code, body and content type are relayed
    back unchanged.

    Raises:
        HTTPException: 404 when no route matches, 504 when the upstream call
            times out, 502 when the upstream service cannot be reached.
    """
    # Local import keeps this block self-contained; JSONResponse cannot relay
    # non-JSON or empty upstream bodies.
    from fastapi.responses import Response

    full_path = f"/{path}"
    service = _resolve_route(full_path)
    if service is None:
        raise HTTPException(status_code=404, detail="Route not found")

    url = f"http://{service}{request.url.path}"
    if request.url.query:
        # request.url.path excludes the query string, so re-attach it.
        url = f"{url}?{request.url.query}"

    headers = {
        name: value
        for name, value in request.headers.items()
        if name.lower() not in _SKIPPED_REQUEST_HEADERS
    }
    body = await request.body()

    # Forward request
    try:
        async with httpx.AsyncClient() as client:
            upstream = await client.request(
                method=request.method,
                url=url,
                headers=headers,
                content=body,
            )
    except httpx.TimeoutException as exc:
        raise HTTPException(status_code=504, detail="Upstream service timed out") from exc
    except httpx.RequestError as exc:
        raise HTTPException(status_code=502, detail="Upstream service unavailable") from exc

    # Relay the raw bytes rather than re-parsing JSON, so empty (e.g. 204) and
    # non-JSON responses pass through instead of raising a decode error.
    return Response(
        content=upstream.content,
        status_code=upstream.status_code,
        media_type=upstream.headers.get("content-type"),
    )
Inter-Service Communication
# sync_communication.py
import httpx
import asyncio
class ServiceCommunicator:
    """Request/response helper for calling sibling services over HTTP.

    Owns a single ``httpx.AsyncClient`` that is reused for every call. Close
    it with ``await communicator.aclose()`` or use the object as an async
    context manager so the connection pool is released::

        async with ServiceCommunicator() as comm:
            user = await comm.call_user_service(1)
    """

    def __init__(
        self,
        timeout: float = 5.0,
        user_service_url: str = "http://user-service:8001",
        product_service_url: str = "http://product-service:8002",
    ):
        """Create the shared HTTP client.

        Args:
            timeout: Timeout in seconds applied to every request.
            user_service_url: Base URL of the user service.
            product_service_url: Base URL of the product service.
        """
        self.client = httpx.AsyncClient(timeout=timeout)
        self.user_service_url = user_service_url
        self.product_service_url = product_service_url

    async def aclose(self):
        """Close the underlying HTTP client and its pooled connections."""
        await self.client.aclose()

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        await self.aclose()

    async def call_service(self, url: str, method: str = "GET", **kwargs):
        """Send a request and return the decoded JSON body.

        Extra keyword arguments are passed straight to
        ``httpx.AsyncClient.request``. The response status is not checked, so
        error responses with a JSON body are returned like any other.
        """
        response = await self.client.request(method, url, **kwargs)
        return response.json()

    async def call_user_service(self, user_id: int):
        """Fetch ``/users/{user_id}`` from the user service."""
        return await self.call_service(f"{self.user_service_url}/users/{user_id}")

    async def call_product_service(self, product_id: int):
        """Fetch ``/products/{product_id}`` from the product service."""
        return await self.call_service(f"{self.product_service_url}/products/{product_id}")
# async_communication.py
import aio_pika
async def publish_message(
    exchange_name: str,
    message: dict,
    routing_key: str = "order.created",
    amqp_url: str = "amqp://localhost",
):
    """Publish *message* as JSON to an existing exchange.

    Args:
        exchange_name: Name of the exchange to publish to; it is looked up,
            not declared.
        message: JSON-serializable payload.
        routing_key: Routing key attached to the message.
        amqp_url: URL of the AMQP broker.

    Raises:
        TypeError: If *message* contains values ``json.dumps`` cannot encode.
    """
    import json  # local import keeps this block self-contained

    connection = await aio_pika.connect_robust(amqp_url)
    # Leaving the block closes the connection, including when publishing fails.
    async with connection:
        channel = await connection.channel()
        exchange = await channel.get_exchange(exchange_name)
        await exchange.publish(
            aio_pika.Message(
                # JSON rather than str(dict): a Python repr cannot be parsed
                # reliably by consumers.
                body=json.dumps(message).encode("utf-8"),
                content_type="application/json",
            ),
            routing_key=routing_key,
        )
Practice Problems
- Implement a health check endpoint for each microservice
- Create a circuit breaker pattern to handle service failures
- Add authentication at the API gateway level
- Implement distributed tracing across services
- Create a message-based async communication system