Microservices - Service Discovery, API Gateway, Communication

MicroservicesMicroservices ArchitectureFree Lesson

Advertisement

Introduction

Microservices architecture structures an application as a collection of loosely coupled services. This tutorial covers service discovery, API gateways, and inter-service communication patterns.

Service Discovery

# service_registry.py
import requests
import time
from typing import Dict, List

class ServiceRegistry:
    def __init__(self, consul_url: str = "http://localhost:8500"):
        self.consul_url = consul_url
    
    def register(self, service_name: str, host: str, port: int):
        url = f"{self.consul_url}/v1/agent/service/register"
        payload = {
            "ID": f"{service_name}-{host}-{port}",
            "Name": service_name,
            "Address": host,
            "Port": port,
            "Check": {
                "HTTP": f"http://{host}:{port}/health",
                "Interval": "10s"
            }
        }
        requests.put(url, json=payload)
    
    def discover(self, service_name: str) -> List[Dict]:
        url = f"{self.consul_url}/v1/health/service/{service_name}"
        response = requests.get(url)
        services = response.json()
        return [
            {"host": s['Service']['Address'], "port": s['Service']['Port']}
            for s in services
        ]

class ServiceClient:
    def __init__(self, service_name: str, registry: ServiceRegistry):
        self.service_name = service_name
        self.registry = registry
        self.instances = []
        self.last_update = 0
    
    def _refresh_instances(self):
        if time.time() - self.last_update > 30:
            self.instances = self.registry.discover(self.service_name)
            self.last_update = time.time()
    
    def get_instance(self):
        self._refresh_instances()
        if not self.instances:
            raise Exception(f"No instances of {self.service_name}")
        return self.instances[0]
    
    def call(self, path: str):
        instance = self.get_instance()
        url = f"http://{instance['host']}:{instance['port']}{path}"
        return requests.get(url).json()

API Gateway

# gateway.py
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse
import httpx

app = FastAPI()

# Route configuration
ROUTES = {
    "/api/users": "user-service:8001",
    "/api/products": "product-service:8002",
    "/api/orders": "order-service:8003",
}

@app.api_route("/{path:path}", methods=["GET", "POST", "PUT", "DELETE"])
async def gateway(path: str, request: Request):
    full_path = f"/{path}"
    
    if full_path not in ROUTES:
        raise HTTPException(status_code=404, detail="Route not found")
    
    service = ROUTES[full_path]
    method = request.method
    
    # Forward request
    async with httpx.AsyncClient() as client:
        response = await client.request(
            method=method,
            url=f"http://{service}{request.url.path}",
            headers=dict(request.headers),
            content=await request.body()
        )
    
    return JSONResponse(
        content=response.json(),
        status_code=response.status_code
    )

Inter-Service Communication

# sync_communication.py
import httpx
import asyncio

class ServiceCommunicator:
    def __init__(self, timeout: float = 5.0):
        self.client = httpx.AsyncClient(timeout=timeout)
    
    async def call_service(self, url: str, method: str = "GET", **kwargs):
        response = await self.client.request(method, url, **kwargs)
        return response.json()
    
    async def call_user_service(self, user_id: int):
        return await self.call_service(f"http://user-service:8001/users/{user_id}")
    
    async def call_product_service(self, product_id: int):
        return await self.call_service(f"http://product-service:8002/products/{product_id}")

# async_communication.py
import aio_pika

async def publish_message(exchange_name: str, message: dict):
    connection = await aio_pika.connect_robust("amqp://localhost")
    channel = await connection.channel()
    exchange = await channel.get_exchange(exchange_name)
    
    await exchange.publish(
        aio_pika.Message(body=str(message).encode()),
        routing_key="order.created"
    )

Practice Problems

  1. Implement a health check endpoint for each microservice
  2. Create a circuit breaker pattern to handle service failures
  3. Add authentication at the API gateway level
  4. Implement distributed tracing across services
  5. Create a message-based async communication system

Advertisement

Need Expert Python Help?

Get personalized tutoring, project support, or professional consulting.

Advertisement