Python Networking — HTTP, Sockets & APIs
Networking is essential for web scraping, API consumption, and building network applications. Python provides both low-level socket programming and high-level HTTP libraries.
Learning Objectives
- Make HTTP requests with the requests library
- Understand socket programming basics (TCP/UDP)
- Build API clients with proper authentication and error handling
- Handle network errors, timeouts, and retries gracefully
- Use sessions for connection pooling and cookie persistence
- Resolve hostnames with DNS lookups and perform basic port scanning
HTTP with requests
The third-party requests library (install it with `pip install requests`) is the de facto standard for HTTP in Python:
import requests
# Basic GET request
# requests has no default timeout; without one a stalled server hangs the script.
response = requests.get('https://api.github.com/users/python', timeout=10)
print(response.status_code)  # 200
print(response.headers['content-type'])  # application/json
print(response.json())  # Parsed JSON response

# GET with query parameters
params = {'q': 'python', 'per_page': 10, 'sort': 'stars'}
response = requests.get(
    'https://api.github.com/search/repositories',
    params=params,
    timeout=10,
)
print(response.url)  # Full URL with encoded parameters
POST, PUT, PATCH, DELETE
import requests
# Every call passes a timeout: requests waits indefinitely when none is given.

# POST with JSON body (json= serializes the dict and sets the Content-Type header)
data = {'username': 'alice', 'email': 'alice@example.com'}
response = requests.post('https://httpbin.org/post', json=data, timeout=10)
print(response.json())

# POST with form data (data= sends the dict form-encoded)
form_data = {'username': 'alice', 'password': 'secret'}
response = requests.post('https://httpbin.org/post', data=form_data, timeout=10)

# PUT request
response = requests.put(
    'https://httpbin.org/put',
    json={'id': 1, 'name': 'Updated'},
    timeout=10,
)

# PATCH request
response = requests.patch(
    'https://httpbin.org/patch',
    json={'name': 'Patched'},
    timeout=10,
)

# DELETE request
response = requests.delete('https://httpbin.org/delete', timeout=10)
Request Headers and Authentication
import requests
# Every call passes a timeout: requests waits indefinitely when none is given.

# Custom headers
headers = {
    'User-Agent': 'MyApp/1.0',
    'Accept': 'application/json',
    'Content-Type': 'application/json',
}
response = requests.get('https://api.github.com', headers=headers, timeout=10)

# Basic Authentication
response = requests.get(
    'https://api.github.com/user',
    auth=('username', 'password'),
    timeout=10,
)

# Bearer Token Authentication
headers = {'Authorization': 'Bearer your_token_here'}
response = requests.get(
    'https://api.example.com/protected',
    headers=headers,
    timeout=10,
)

# API Key in query parameters
# NOTE: the key becomes part of the URL (see response.url), so it can end up
# wherever URLs are logged.
params = {'api_key': 'your_key_here', 'query': 'python'}
response = requests.get(
    'https://api.example.com/search',
    params=params,
    timeout=10,
)
Error Handling
Proper error handling is critical for network applications:
import requests
from requests.exceptions import (
HTTPError, ConnectionError, Timeout, RequestException
)
def safe_request(url, method='GET', **kwargs):
    """Make an HTTP request and report failures instead of raising.

    Args:
        url: Target URL.
        method: HTTP verb passed to ``requests.request`` (default ``'GET'``).
        **kwargs: Extra keyword arguments forwarded to ``requests.request``.
            A caller-supplied ``timeout`` overrides the 10-second default.

    Returns:
        The response object on success, or ``None`` when the request failed
        or the server answered with a 4xx/5xx status. A short diagnostic is
        printed for every failure.
    """
    # Apply the default with setdefault: passing ``timeout=10`` next to
    # ``**kwargs`` raises TypeError as soon as the caller supplies a timeout.
    kwargs.setdefault('timeout', 10)
    status_hints = {
        404: "Resource not found",
        401: "Authentication required",
        429: "Rate limited — retry later",
    }
    try:
        response = requests.request(method, url, **kwargs)
        response.raise_for_status()  # Raise HTTPError for 4xx/5xx
        return response
    except HTTPError as e:
        status = e.response.status_code
        print(f"HTTP error {status}: {e}")
        if status in status_hints:
            print(status_hints[status])
        return None
    except Timeout:
        # Checked before ConnectionError: requests' ConnectTimeout inherits
        # from both, and "timed out" is the more specific diagnosis.
        print(f"Request to {url} timed out")
        return None
    except ConnectionError:
        print(f"Failed to connect to {url}")
        return None
    except RequestException as e:
        # Base class of all requests errors; catches whatever is left.
        print(f"Request failed: {e}")
        return None
Retry Logic with Exponential Backoff
import requests
import time
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
def create_session_with_retries(max_retries=3, backoff_factor=0.5):
    """Build a ``requests.Session`` whose adapters retry failed requests.

    Args:
        max_retries: Value passed to ``Retry`` as ``total``.
        backoff_factor: Value passed to ``Retry`` as ``backoff_factor``.

    Returns:
        A session with one retrying adapter mounted for both ``http://`` and
        ``https://``. Retries are limited to HEAD, GET and OPTIONS requests
        and are also triggered by 429, 500, 502, 503 and 504 responses.
    """
    retrying_adapter = HTTPAdapter(
        max_retries=Retry(
            total=max_retries,
            backoff_factor=backoff_factor,
            status_forcelist=[429, 500, 502, 503, 504],
            allowed_methods=["HEAD", "GET", "OPTIONS"],
        )
    )
    session = requests.Session()
    # The same adapter serves both URL schemes.
    for scheme in ("http://", "https://"):
        session.mount(scheme, retrying_adapter)
    return session
# Use the session
session = create_session_with_retries()
# Retrying does not add a timeout; without one a stalled server still hangs.
response = session.get('https://api.example.com/data', timeout=10)
Sessions and Cookies
Sessions maintain state across requests and enable connection pooling:
import requests
# Session for login flow
with requests.Session() as s:
    # Login
    login_data = {'username': 'alice', 'password': 'secret'}
    s.post('https://example.com/login', data=login_data, timeout=10)

    # Cookies are automatically maintained
    dashboard = s.get('https://example.com/dashboard', timeout=10)
    profile = s.get('https://example.com/profile', timeout=10)

    # Session-level settings
    s.headers.update({'User-Agent': 'MyApp/1.0'})
    s.auth = ('api_key', 'your_key')

    # All requests in session use these settings.
    # A timeout is not a session-level setting, so it is passed on every call.
    response = s.get('https://api.example.com/data', timeout=10)
Connection Pooling Benefits
import requests
import time
# time.perf_counter() is the clock meant for measuring intervals; time.time()
# follows the wall clock and can jump if the system time is adjusted.

# Without session — new connection each time
start = time.perf_counter()
for _ in range(10):
    requests.get('https://httpbin.org/get', timeout=10)
print(f"Without session: {time.perf_counter() - start:.2f}s")

# With session — reuses connection
start = time.perf_counter()
with requests.Session() as s:
    for _ in range(10):
        s.get('https://httpbin.org/get', timeout=10)
print(f"With session: {time.perf_counter() - start:.2f}s")
# The session loop is usually faster because the connection is reused
# instead of being set up again for every request.
Socket Programming
Low-level networking with Python's socket module:
TCP Client
import socket
# Simple TCP client
def tcp_client(host, port, message, timeout=10.0):
    """Send a message to a TCP server and return its first reply.

    Args:
        host: Server hostname or IP address.
        port: Server TCP port.
        message: Text to send; encoded as UTF-8.
        timeout: Seconds to wait for the connection and for each socket
            operation. ``None`` waits indefinitely.

    Returns:
        The bytes from a single ``recv`` call (at most 4096), decoded as
        UTF-8. A longer reply is truncated.

    Raises:
        OSError: The connection failed or timed out.
        UnicodeDecodeError: The reply is not valid UTF-8.
    """
    # The timeout given to create_connection stays set on the socket, so it
    # also bounds the recv below instead of letting it block forever.
    with socket.create_connection((host, port), timeout=timeout) as sock:
        # Send data
        sock.sendall(message.encode('utf-8'))
        # Receive response
        response = sock.recv(4096)
    return response.decode('utf-8')
# HTTP GET using raw TCP
def http_get(host, path='/', timeout=10.0):
    """Make a plain-HTTP GET request over a raw socket on port 80.

    Args:
        host: Server hostname; also sent as the ``Host`` header.
        path: Request path placed verbatim in the request line.
        timeout: Seconds to wait for the connection and for each socket
            operation. ``None`` waits indefinitely.

    Returns:
        Everything the server sent (status line, headers and body) as one
        string. Bytes that are not valid UTF-8 are replaced.

    Raises:
        OSError: The connection failed or timed out.
    """
    with socket.create_connection((host, 80), timeout=timeout) as sock:
        # "Connection: close" makes the server close the socket after the
        # response, which is how the read loop below knows it is done.
        request = f"GET {path} HTTP/1.1\r\nHost: {host}\r\nConnection: close\r\n\r\n"
        sock.sendall(request.encode('utf-8'))
        # recv returns b'' once the peer closes. Joining the chunks once
        # avoids re-copying the buffer on every ``+=``.
        response = b''.join(iter(lambda: sock.recv(4096), b''))
    return response.decode('utf-8', errors='replace')
TCP Server
import socket
import threading
def handle_client(conn, addr):
    """Echo everything a single client sends until it disconnects.

    Args:
        conn: Connected socket for this client; closed on return.
        addr: Client address, used only in log output.
    """
    print(f"New connection from {addr}")
    with conn:
        while True:
            data = conn.recv(1024)
            if not data:
                # An empty read means the client closed its side.
                break
            # Decode leniently and only for logging: a strict decode raises
            # on binary input or on a multi-byte character split across two
            # recv calls, which would kill this handler.
            text = data.decode('utf-8', errors='replace')
            print(f"Received from {addr}: {text}")
            # Echo the raw bytes so the payload is returned unmodified.
            conn.sendall(b"Echo: " + data)
    print(f"Connection from {addr} closed")
def tcp_server(host='0.0.0.0', port=8888):
    """Run a threaded TCP echo server; does not return.

    Args:
        host: Address to bind; the default listens on all IPv4 interfaces.
        port: TCP port to listen on.
    """
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    with listener:
        # SO_REUSEADDR is set before bind so the address can be reused.
        listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        listener.bind((host, port))
        listener.listen(5)
        print(f"Server listening on {host}:{port}")
        while True:
            client_conn, client_addr = listener.accept()
            # One daemon thread per client, so the accept loop never blocks
            # on a slow connection.
            threading.Thread(
                target=handle_client,
                args=(client_conn, client_addr),
                daemon=True,
            ).start()
UDP Socket
import socket
# UDP client — connectionless
def udp_client(host, port, message, timeout=5.0):
    """Send one UDP datagram and wait for one reply.

    Args:
        host: Destination hostname or IP address.
        port: Destination UDP port.
        message: Text to send; encoded as UTF-8.
        timeout: Seconds to wait for the reply. ``None`` waits indefinitely.

    Returns:
        The reply payload (up to 4096 bytes) decoded as UTF-8.

    Raises:
        socket.timeout: No reply arrived within ``timeout`` seconds.
        UnicodeDecodeError: The reply is not valid UTF-8.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        # UDP gives no delivery guarantee: if the request or the reply is
        # lost, an untimed recvfrom would block forever.
        s.settimeout(timeout)
        s.sendto(message.encode('utf-8'), (host, port))
        data, _ = s.recvfrom(4096)
    return data.decode('utf-8')
# UDP server
def udp_server(host='0.0.0.0', port=9999):
    """Run a UDP echo server; does not return.

    Args:
        host: Address to bind; the default listens on all IPv4 interfaces.
        port: UDP port to listen on.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        s.bind((host, port))
        print(f"UDP server listening on {host}:{port}")
        while True:
            data, addr = s.recvfrom(1024)
            # Decode leniently and only for logging: a strict decode would
            # let one non-UTF-8 datagram raise and stop the server for every
            # client.
            text = data.decode('utf-8', errors='replace')
            print(f"Received from {addr}: {text}")
            # Echo the raw bytes so the payload is returned unmodified.
            s.sendto(b"Echo: " + data, addr)
DNS Resolution
import socket
# Resolve hostname to IP
hostname = 'google.com'
ip_address = socket.gethostbyname(hostname)
print(f"{hostname} resolves to {ip_address}")

# Get all IPs for a hostname
ips = socket.getaddrinfo(hostname, None)
# Each entry is (family, socktype, proto, canonname, sockaddr), and the same
# address can appear in more than one entry. dict.fromkeys removes duplicates
# while keeping the order.
unique_ips = dict.fromkeys(sockaddr[0] for *_, sockaddr in ips)
for resolved_ip in unique_ips:
    print(f"IP: {resolved_ip}")

# Reverse DNS lookup
ip = '8.8.8.8'
# gethostbyaddr returns (primary name, alias list, address list); unpacking
# it avoids rebinding ``hostname`` to a tuple.
primary_name, _aliases, _addresses = socket.gethostbyaddr(ip)
print(f"{ip} reverse DNS: {primary_name}")
Port Scanning
import socket
from concurrent.futures import ThreadPoolExecutor
def scan_port(host, port, timeout=1):
    """Check whether a TCP connection to ``host:port`` can be opened.

    Args:
        host: Hostname or IP address to probe.
        port: TCP port to probe.
        timeout: Seconds to wait for the connection attempt.

    Returns:
        ``(port, True)`` if the connection succeeded, otherwise
        ``(port, False)``.
    """
    try:
        # The connection is closed again as soon as it has been opened.
        with socket.create_connection((host, port), timeout=timeout):
            return port, True
    except OSError:
        # socket.timeout and ConnectionRefusedError are OSError subclasses,
        # so catching the base class covers all of them.
        return port, False
def scan_ports(host, port_range=range(1, 1024), max_workers=100):
    """Probe many ports on one host concurrently.

    Args:
        host: Hostname or IP address to scan.
        port_range: Iterable of port numbers to probe.
        max_workers: Size of the thread pool.

    Returns:
        Sorted list of the ports that accepted a connection. Each open port
        is also printed as it is collected.
    """
    found = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map() submits every probe up front and yields the results in
        # submission order.
        outcomes = pool.map(lambda p: scan_port(host, p), port_range)
        for port, is_open in outcomes:
            if not is_open:
                continue
            found.append(port)
            print(f"Port {port}: OPEN")
    found.sort()
    return found
# Usage: scan ports 1-99 on the local machine
target_host = 'localhost'
open_ports = scan_ports(target_host, range(1, 100))
print(f"Open ports: {open_ports}")
Real-World Examples
Example 1: REST API Client
import requests
from typing import Optional, Dict, Any
class APIClient:
    """Reusable REST API client that sends a bearer token on every request.

    All calls share one ``requests.Session``. Call ``close()`` when done, or
    use the client as a context manager, to release its connections.
    """

    def __init__(self, base_url: str, api_key: str, timeout: float = 30):
        """Create the client.

        Args:
            base_url: API root; a trailing ``/`` is stripped.
            api_key: Token sent as ``Authorization: Bearer <api_key>``.
            timeout: Default per-request timeout in seconds.
        """
        self.base_url = base_url.rstrip('/')
        self.timeout = timeout
        self.session = requests.Session()
        self.session.headers.update({
            'Authorization': f'Bearer {api_key}',
            'Content-Type': 'application/json',
            'Accept': 'application/json',
        })

    def __enter__(self) -> 'APIClient':
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.close()

    def close(self) -> None:
        """Close the underlying session."""
        self.session.close()

    def _url(self, endpoint: str) -> str:
        """Join ``endpoint`` onto the base URL with exactly one ``/``."""
        if not endpoint:
            return self.base_url
        # Accept both '/users' and 'users'; plain concatenation would turn
        # the latter into 'https://hostusers'.
        return f"{self.base_url}/{endpoint.lstrip('/')}"

    def _request(self, method: str, endpoint: str, **kwargs) -> Optional[Dict[str, Any]]:
        """Make an authenticated API request.

        Returns:
            The decoded JSON body, or ``None`` for a 404 response or a
            response without a body.

        Raises:
            requests.exceptions.HTTPError: Any 4xx/5xx status other than 404.
        """
        kwargs.setdefault('timeout', self.timeout)
        try:
            response = self.session.request(method, self._url(endpoint), **kwargs)
            response.raise_for_status()
        except requests.exceptions.HTTPError as e:
            if e.response.status_code == 404:
                return None
            raise
        # 204 No Content (and any other empty body) has nothing to decode;
        # response.json() would raise on it.
        if response.status_code == 204 or not response.content:
            return None
        return response.json()

    def get(self, endpoint: str, params: Optional[Dict] = None) -> Optional[Dict]:
        """GET ``endpoint`` with optional query parameters."""
        return self._request('GET', endpoint, params=params)

    def post(self, endpoint: str, data: Dict) -> Optional[Dict]:
        """POST ``data`` to ``endpoint`` as a JSON body."""
        return self._request('POST', endpoint, json=data)

    def put(self, endpoint: str, data: Dict) -> Optional[Dict]:
        """PUT ``data`` to ``endpoint`` as a JSON body."""
        return self._request('PUT', endpoint, json=data)

    def delete(self, endpoint: str) -> bool:
        """DELETE ``endpoint``.

        Returns:
            ``True`` on a successful status, ``False`` on any 4xx/5xx status.
        """
        try:
            response = self.session.delete(self._url(endpoint), timeout=self.timeout)
            response.raise_for_status()
        except requests.exceptions.HTTPError:
            return False
        return True
# Usage: list the active users, then create a new one
client = APIClient('https://api.example.com', 'your_api_key')
active_filter = {'active': True}
users = client.get('/users', params=active_filter)
alice = {'name': 'Alice', 'email': 'alice@example.com'}
new_user = client.post('/users', alice)
Example 2: Webhook Server
from http.server import HTTPServer, BaseHTTPRequestHandler
import json
class WebhookHandler(BaseHTTPRequestHandler):
    """Handle incoming webhook POST requests whose body is a JSON object.

    NOTE(review): nothing here authenticates the sender. If the webhook
    provider signs its requests, verify the signature before processing.
    """

    # Largest request body accepted; bigger requests are rejected unread so
    # a client cannot make the server buffer an arbitrarily large payload.
    MAX_BODY_BYTES = 1024 * 1024

    def do_POST(self):
        """Validate the request, dispatch the payload and send a reply.

        Replies 400 for a bad ``Content-Length`` or a body that is not a
        JSON object, 413 for an oversized body, 500 when processing raises,
        and 200 with ``{"status": "ok"}`` otherwise.
        """
        try:
            content_length = int(self.headers.get('Content-Length', 0))
        except ValueError:
            self._reply(400, b'Invalid Content-Length')
            return
        if content_length < 0:
            # A negative size would make rfile.read() read until EOF.
            self._reply(400, b'Invalid Content-Length')
            return
        if content_length > self.MAX_BODY_BYTES:
            self._reply(413, b'Payload too large')
            return

        body = self.rfile.read(content_length)
        try:
            payload = json.loads(body)
        except ValueError:
            # Covers JSONDecodeError and the UnicodeDecodeError raised for
            # bodies that are not valid UTF-8; both subclass ValueError.
            self._reply(400, b'Invalid JSON')
            return
        if not isinstance(payload, dict):
            # process_webhook calls payload.get(), so a list or scalar
            # cannot be handled.
            self._reply(400, b'Invalid JSON')
            return

        print(f"Received webhook: {payload}")
        try:
            # Process the webhook
            self.process_webhook(payload)
        except Exception as exc:
            # Request boundary: log the failure and still answer the sender
            # rather than dropping the connection without a response.
            self.log_error("Webhook processing failed: %r", exc)
            self._reply(500, b'Webhook processing failed')
            return

        # Send success response
        self._reply(200, json.dumps({'status': 'ok'}).encode(), 'application/json')

    def _reply(self, status, body, content_type='text/plain'):
        """Send a complete response with the given status code and body."""
        self.send_response(status)
        self.send_header('Content-Type', content_type)
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def process_webhook(self, payload):
        """Dispatch the payload according to its ``event`` field.

        ``payment.received`` and ``user.created`` events pass
        ``payload['data']`` to ``handle_payment`` and ``handle_new_user``
        respectively; both functions must be defined elsewhere. Any other
        event is ignored.

        Raises:
            KeyError: A recognized event has no ``data`` field.
        """
        event_type = payload.get('event')
        if event_type == 'payment.received':
            handle_payment(payload['data'])
        elif event_type == 'user.created':
            handle_new_user(payload['data'])
def run_webhook_server(port=8080):
    """Serve webhooks on all IPv4 interfaces until interrupted.

    Args:
        port: TCP port to listen on.
    """
    # The context manager closes the listening socket when serve_forever()
    # exits, including when it exits through an exception such as Ctrl+C.
    with HTTPServer(('0.0.0.0', port), WebhookHandler) as server:
        print(f"Webhook server running on port {port}")
        server.serve_forever()
Example 3: Download Manager
import requests
import os
from concurrent.futures import ThreadPoolExecutor
from urllib.parse import urlparse
def download_file(url, output_dir='.'):
    """Download one file, printing progress when the size is known.

    Args:
        url: File URL. The local name is the last segment of the URL path,
            or ``'download'`` when the path has none.
        output_dir: Directory to write into; created if missing.

    Returns:
        Path of the written file.

    Raises:
        requests.exceptions.RequestException: The request failed or the
            server answered with a 4xx/5xx status.
        OSError: The file could not be written.
    """
    filename = os.path.basename(urlparse(url).path) or 'download'
    filepath = os.path.join(output_dir, filename)
    if output_dir:
        # Lets the function be called on its own; '' means the current
        # directory, which makedirs would reject.
        os.makedirs(output_dir, exist_ok=True)

    # With stream=True the connection stays open until the body is consumed
    # or the response is closed; the context manager closes it even when
    # writing fails part-way.
    with requests.get(url, stream=True, timeout=30) as response:
        response.raise_for_status()
        # 0 means the server sent no Content-Length, so no progress is shown.
        total_size = int(response.headers.get('content-length', 0))
        downloaded = 0
        with open(filepath, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                if not chunk:
                    continue
                f.write(chunk)
                downloaded += len(chunk)
                if total_size:
                    progress = (downloaded / total_size) * 100
                    print(f"\rDownloading {filename}: {progress:.1f}%", end='')
    print(f"\nDownloaded: {filepath}")
    return filepath
def download_multiple(urls, max_workers=5, output_dir='.'):
    """Download several files concurrently using a thread pool.

    Args:
        urls: Iterable of file URLs.
        max_workers: Size of the thread pool.
        output_dir: Directory to write into; created if missing.

    Returns:
        Paths of the files that downloaded successfully, in the order of
        ``urls``. A failed download is reported with a printed message and
        left out of the result.
    """
    os.makedirs(output_dir, exist_ok=True)
    completed = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        pending = []
        for url in urls:
            pending.append(pool.submit(download_file, url, output_dir))
        # Collect in submission order; one failure does not stop the rest.
        for job in pending:
            try:
                completed.append(job.result())
            except Exception as err:
                print(f"Download failed: {err}")
    return completed
Common Mistakes
| Mistake | Problem | Solution |
|---|---|---|
| No timeout on requests | Requests hang forever | Always set timeout= parameter |
| Not checking status codes | Silent failures | Use raise_for_status() |
| Creating new connections | Slow for multiple requests | Use requests.Session() |
| Not handling exceptions | Crashes on network errors | Use try/except with specific exceptions |
| Blocking in main thread | UI freezes, poor UX | Use threading or async for concurrent requests |
| Not closing sockets | Resource leaks | Use context managers (with statements) |
Best Practices
# 1. Always use timeouts
response = requests.get(url, timeout=(3.05, 30))  # (connect, read)

# 2. Use sessions for multiple requests
with requests.Session() as s:
    s.headers.update({'Authorization': 'Bearer token'})
    for url in urls:
        # A session does not supply a timeout, so rule 1 applies to every call.
        s.get(url, timeout=(3.05, 30))
# 3. Handle rate limiting
import time
from requests.exceptions import HTTPError
def rate_limited_request(url, max_retries=3):
    """GET ``url``, waiting and retrying while the server answers 429.

    Args:
        url: URL to fetch.
        max_retries: Maximum number of attempts.

    Returns:
        The first response whose status is not 429.

    Raises:
        requests.exceptions.HTTPError: A non-429 response had a 4xx/5xx status.
        Exception: Every attempt was answered with 429.
    """
    for attempt in range(max_retries):
        response = requests.get(url, timeout=10)
        if response.status_code != 429:
            response.raise_for_status()
            return response
        if attempt == max_retries - 1:
            # No attempt follows, so waiting would only delay the error.
            break
        try:
            wait_time = int(response.headers.get('Retry-After', 60))
        except ValueError:
            # Retry-After may also be an HTTP date, which int() cannot
            # parse; fall back to the default delay.
            wait_time = 60
        # time.sleep() rejects negative values.
        wait_time = max(wait_time, 0)
        print(f"Rate limited. Waiting {wait_time}s...")
        time.sleep(wait_time)
    raise Exception("Max retries exceeded")
# 4. Validate SSL certificates (don't disable verification in production)
# verify=True is the default, don't change this; the timeout follows rule 1.
response = requests.get(url, verify=True, timeout=10)

# 5. Use connection pooling for high-throughput applications
from requests.adapters import HTTPAdapter
session = requests.Session()
adapter = HTTPAdapter(pool_connections=10, pool_maxsize=10)
# Only https:// URLs use this adapter; http:// keeps the session's default one.
session.mount('https://', adapter)
Key Takeaways
- Use `requests` for HTTP — it's simpler and more Pythonic than raw sockets
- Always set `timeout` on requests to prevent hanging
- Use `raise_for_status()` to detect HTTP errors automatically
- Sessions maintain cookies and enable connection pooling for better performance
- Use the `json=` parameter for JSON request bodies instead of manually encoding
- For low-level control (TCP/UDP), use the `socket` module with context managers
- Handle network errors gracefully — networks are unreliable by nature