Reinforcement Learning Basics
Reinforcement learning (RL) is learning through trial and error. An agent takes actions in an environment, receives rewards, and learns to maximize cumulative reward. Unlike supervised learning, there are no labels – only delayed feedback.
RL Agent-Environment Loop
The RL Framework
An agent interacts with an environment at discrete timesteps. At each step, it observes a state, takes an action, receives a reward, and transitions to a new state.
import numpy as np
import matplotlib.pyplot as plt
from collections import defaultdict
import warnings
warnings.filterwarnings('ignore')
Multi-Armed Bandits
The simplest RL problem: choose one of K actions repeatedly to maximize reward.
class MultiArmedBandit:
"""K-armed bandit with Gaussian rewards."""
def __init__(self, k=10, seed=42):
self.k = k
np.random.seed(seed)
self.true_values = np.random.normal(0, 1, k) # true action values
self.optimal_action = np.argmax(self.true_values)
def pull(self, action):
"""Pull arm and return reward."""
return np.random.normal(self.true_values[action], 1)
def regret(self, action):
"""Instantaneous regret."""
return self.true_values[self.optimal_action] - self.true_values[action]
# Greedy strategy
def greedy(bandit, n_steps):
q_estimates = np.zeros(bandit.k)
counts = np.zeros(bandit.k)
rewards = []
for t in range(n_steps):
action = np.argmax(q_estimates)
reward = bandit.pull(action)
counts[action] += 1
q_estimates[action] += (reward - q_estimates[action]) / counts[action]
rewards.append(reward)
return np.array(rewards)
# Epsilon-greedy strategy
def epsilon_greedy(bandit, n_steps, epsilon=0.1):
q_estimates = np.zeros(bandit.k)
counts = np.zeros(bandit.k)
rewards = []
for t in range(n_steps):
if np.random.random() < epsilon:
action = np.random.randint(bandit.k) # explore
else:
action = np.argmax(q_estimates) # exploit
reward = bandit.pull(action)
counts[action] += 1
q_estimates[action] += (reward - q_estimates[action]) / counts[action]
rewards.append(reward)
return np.array(rewards)
# UCB1 (Upper Confidence Bound)
def ucb1(bandit, n_steps):
q_estimates = np.zeros(bandit.k)
counts = np.zeros(bandit.k)
rewards = []
for t in range(1, n_steps + 1):
# Select action with highest UCB
ucb_values = q_estimates + np.sqrt(2 * np.log(t) / (counts + 1e-10))
action = np.argmax(ucb_values)
reward = bandit.pull(action)
counts[action] += 1
q_estimates[action] += (reward - q_estimates[action]) / counts[action]
rewards.append(reward)
return np.array(rewards)
# Thompson Sampling
def thompson_sampling(bandit, n_steps):
# Maintain Beta distributions for each arm
alpha = np.ones(bandit.k) # successes
beta = np.ones(bandit.k) # failures
rewards = []
for t in range(n_steps):
# Sample from each arm's Beta distribution
samples = np.random.beta(alpha, beta)
action = np.argmax(samples)
reward = bandit.pull(action)
rewards.append(reward)
# Update (simplified: positive reward → success)
if reward > 0:
alpha[action] += 1
else:
beta[action] += 1
return np.array(rewards)
# Compare strategies
bandit = MultiArmedBandit(k=10)
n_steps = 1000
strategies = {
'Greedy': greedy(bandit, n_steps),
'ε-Greedy (0.1)': epsilon_greedy(bandit, n_steps, 0.1),
'UCB1': ucb1(bandit, n_steps),
'Thompson Sampling': thompson_sampling(bandit, n_steps)
}
print("Cumulative rewards after 1000 steps:")
for name, rewards in strategies.items():
print(f" {name}: {rewards.sum():.1f}")
Markov Decision Processes (MDPs)
An MDP is defined by states, actions, transition probabilities, rewards, and a discount factor.
class GridWorld:
"""Simple 4x4 grid world MDP."""
def __init__(self, size=4):
self.size = size
self.n_states = size * size
self.n_actions = 4 # up, down, left, right
self.goal = size * size - 1
self.start = 0
def reset(self):
self.state = self.start
return self.state
def step(self, action):
"""Take action and return (next_state, reward, done)."""
row, col = divmod(self.state, self.size)
if action == 0: # up
row = max(0, row - 1)
elif action == 1: # down
row = min(self.size - 1, row + 1)
elif action == 2: # left
col = max(0, col - 1)
elif action == 3: # right
col = min(self.size - 1, col + 1)
self.state = row * self.size + col
reward = 1 if self.state == self.goal else -0.01
done = self.state == self.goal
return self.state, reward, done
def get_transitions(self, state, action):
"""Get transition probabilities (deterministic in this env)."""
row, col = divmod(state, self.size)
if action == 0:
row = max(0, row - 1)
elif action == 1:
row = min(self.size - 1, row + 1)
elif action == 2:
col = max(0, col - 1)
elif action == 3:
col = min(self.size - 1, col + 1)
next_state = row * self.size + col
return [(1.0, next_state)]
env = GridWorld(size=4)
print(f"States: {env.n_states}, Actions: {env.n_actions}")
print(f"Goal state: {env.goal}")
Value Iteration
Computing optimal values by iterating the Bellman equation.
def value_iteration(env, gamma=0.99, theta=1e-6):
"""Compute optimal state values."""
V = np.zeros(env.n_states)
policy = np.zeros(env.n_states, dtype=int)
iteration = 0
while True:
delta = 0
for s in range(env.n_states):
if s == env.goal:
continue
v_old = V[s]
# Bellman optimality update
q_values = np.zeros(env.n_actions)
for a in range(env.n_actions):
transitions = env.get_transitions(s, a)
for prob, next_state in transitions:
reward = 1 if next_state == env.goal else -0.01
q_values[a] += prob * (reward + gamma * V[next_state])
V[s] = np.max(q_values)
policy[s] = np.argmax(q_values)
delta = max(delta, abs(v_old - V[s]))
iteration += 1
if delta < theta:
break
return V, policy, iteration
V, policy, iterations = value_iteration(env)
print(f"Value iteration converged in {iterations} iterations")
print(f"State 0 value: {V[0]:.4f}")
print(f"Optimal action at state 0: {['UP', 'DOWN', 'LEFT', 'RIGHT'][policy[0]]}")
Policy Iteration
交替进行ç–略评估和ç–略改进。
def policy_iteration(env, gamma=0.99):
"""Compute optimal policy via policy iteration."""
policy = np.random.randint(0, env.n_actions, env.n_states)
V = np.zeros(env.n_states)
iteration = 0
while True:
# Policy evaluation
while True:
delta = 0
for s in range(env.n_states):
if s == env.goal:
continue
v_old = V[s]
a = policy[s]
transitions = env.get_transitions(s, a)
V[s] = sum(prob * (1 if ns == env.goal else -0.01 + gamma * V[ns])
for prob, ns in transitions)
delta = max(delta, abs(v_old - V[s]))
if delta < 1e-8:
break
# Policy improvement
policy_stable = True
for s in range(env.n_states):
if s == env.goal:
continue
old_action = policy[s]
q_values = np.zeros(env.n_actions)
for a in range(env.n_actions):
transitions = env.get_transitions(s, a)
q_values[a] = sum(prob * (1 if ns == env.goal else -0.01 + gamma * V[ns])
for prob, ns in transitions)
policy[s] = np.argmax(q_values)
if old_action != policy[s]:
policy_stable = False
iteration += 1
if policy_stable:
break
return V, policy, iteration
V_pi, policy_pi, iters = policy_iteration(env)
print(f"Policy iteration converged in {iters} sweeps")
Q-Learning (Model-Free)
Q-learning learns action values directly from experience without knowing transition probabilities. The Q-learning update rule adjusts estimates toward the Bellman optimum:
where is the learning rate, is the discount factor, and is the TD target.
def q_learning(env, n_episodes=1000, alpha=0.1, gamma=0.99, epsilon=0.1):
"""Tabular Q-learning."""
Q = np.zeros((env.n_states, env.n_actions))
rewards_per_episode = []
for episode in range(n_episodes):
state = env.reset()
total_reward = 0
done = False
while not done:
# Epsilon-greedy action selection
if np.random.random() < epsilon:
action = np.random.randint(env.n_actions)
else:
action = np.argmax(Q[state])
next_state, reward, done = env.step(action)
total_reward += reward
# Q-learning update
best_next = np.max(Q[next_state])
Q[state, action] += alpha * (reward + gamma * best_next - Q[state, action])
state = next_state
rewards_per_episode.append(total_reward)
policy = np.argmax(Q, axis=1)
return Q, policy, rewards_per_episode
Q, learned_policy, rewards = q_learning(env, n_episodes=2000)
print(f"Final average reward: {np.mean(rewards[-100:]):.3f}")
print(f"Learned policy at start: {['UP', 'DOWN', 'LEFT', 'RIGHT'][learned_policy[0]]}")
SARSA (On-Policy Learning)
def sarsa(env, n_episodes=1000, alpha=0.1, gamma=0.99, epsilon=0.1):
"""SARSA: on-policy temporal difference learning."""
Q = np.zeros((env.n_states, env.n_actions))
for episode in range(n_episodes):
state = env.reset()
if np.random.random() < epsilon:
action = np.random.randint(env.n_actions)
else:
action = np.argmax(Q[state])
done = False
while not done:
next_state, reward, done = env.step(action)
if np.random.random() < epsilon:
next_action = np.random.randint(env.n_actions)
else:
next_action = np.argmax(Q[next_state])
# SARSA update
Q[state, action] += alpha * (
reward + gamma * Q[next_state, next_action] - Q[state, action]
)
state, action = next_state, next_action
return Q, np.argmax(Q, axis=1)
Q_sarsa, policy_sarsa = sarsa(env)
print("SARSA policy at start:", ['UP', 'DOWN', 'LEFT', 'RIGHT'][policy_sarsa[0]])
Deep Q-Network (DQN) Concept
import torch
import torch.nn as nn
class DQN(nn.Module):
"""Simple DQN for tabular-like problems."""
def __init__(self, n_states, n_actions, hidden=64):
super().__init__()
self.net = nn.Sequential(
nn.Linear(n_states, hidden),
nn.ReLU(),
nn.Linear(hidden, hidden),
nn.ReLU(),
nn.Linear(hidden, n_actions)
)
def forward(self, x):
return self.net(x)
# Experience replay buffer
class ReplayBuffer:
def __init__(self, capacity=10000):
self.buffer = []
self.capacity = capacity
def push(self, state, action, reward, next_state, done):
if len(self.buffer) >= self.capacity:
self.buffer.pop(0)
self.buffer.append((state, action, reward, next_state, done))
def sample(self, batch_size):
indices = np.random.choice(len(self.buffer), batch_size, replace=False)
batch = [self.buffer[i] for i in indices]
states, actions, rewards, next_states, dones = zip(*batch)
return (
torch.FloatTensor(np.array(states)),
torch.LongTensor(actions),
torch.FloatTensor(rewards),
torch.FloatTensor(np.array(next_states)),
torch.FloatTensor(dones)
)
# One training step
model = DQN(16, 4)
target_model = DQN(16, 4)
target_model.load_state_dict(model.state_dict())
buffer = ReplayBuffer()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
# Simulate one step
state = np.eye(16)[0] # one-hot state
action = 0
next_state = np.eye(16)[4]
reward = -0.01
done = False
buffer.push(state, action, reward, next_state, done)
states, actions, rewards, next_states, dones = buffer.sample(1)
q_values = model(states)
q_value = q_values[0, actions[0]]
target = rewards[0] + 0.99 * target_model(next_states).max() * (1 - dones[0])
loss = (q_value - target) ** 2
loss.backward()
optimizer.step()
print(f"DQN loss: {loss.item():.6f}")
Best Practices
- Start with tabular methods – understand fundamentals before deep RL
- Tune epsilon carefully – too high wastes exploration, too low gets stuck
- Use target networks – stabilize deep Q-learning training
- Experience replay – break correlations in sequential data
- Discount factor γ – closer to 1 for long horizons, lower for myopic agents
- Evaluate thoroughly – RL is notoriously unstable; run multiple seeds
Summary
Reinforcement learning learns optimal behavior through interaction. Multi-armed bandits introduce exploration, MDPs formalize sequential decision-making, and Q-learning/policy gradient provide practical algorithms. Master these foundations before tackling deep RL.