For this lab, install the following packages ad hoc in the devcontainer:
!uv add pygame
!uv add swig
!uv add gymnasium[box2d]
CartPole¶
CartPole is a classic control problem in which a pole must be kept upright on a cart that can move back and forth.
The Problem¶
State: 4 continuous values (cart position, cart velocity, pole angle, pole angular velocity)
Actions: 2 discrete actions (push the cart to the left or to the right)
Reward: +1 for every timestep the pole stays upright
Goal: Keep the pole upright for as long as possible (max 500 timesteps); the sketch below summarizes when an episode ends
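For reference, here is a minimal sketch of when a CartPole-v1 episode ends. The constant and function names below are illustrative and not part of this lab's code; the thresholds are the documented defaults of the environment.
# Sketch: CartPole-v1 termination and truncation conditions
POLE_ANGLE_LIMIT = 0.2095  # ~12 degrees, in radians
CART_POSITION_LIMIT = 2.4  # half-length of the track
MAX_STEPS = 500  # the episode is truncated after this many steps
def episode_over(state, step):
    cart_position, _, pole_angle, _ = state
    terminated = abs(pole_angle) > POLE_ANGLE_LIMIT or abs(cart_position) > CART_POSITION_LIMIT
    truncated = step >= MAX_STEPS
    return terminated, truncated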
Setup¶
We use:
Gymnasium: a framework for RL environments (originally from OpenAI)
Stable-Baselines3: high-quality PyTorch implementations of RL algorithms
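As a quick reference, every experiment in this lab follows the same Gymnasium reset/step pattern. This is a minimal sketch, separate from the lab's own cells:
import gymnasium as gym
env = gym.make("CartPole-v1")  # build an environment by name
state, info = env.reset(seed=0)  # start an episode and get the initial observation
action = env.action_space.sample()  # any valid action (here: sampled at random)
state, reward, terminated, truncated, info = env.step(action)  # advance one timestep
env.close()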
import gymnasium as gym
import numpy as np
import pandas as pd
import plotly.express as px
import torch
Exploring the Environment¶
# Create the CartPole environment
env = gym.make("CartPole-v1", render_mode="rgb_array")
# Reset environment to get initial state
state, info = env.reset(seed=42)
print("=== CartPole Environment ===")
print(f"State space: {env.observation_space}")
print(f"Action space: {env.action_space}")
print(f"\nInitial state: {state}")
print("\nState components:")
print(f" [0] Cart Position: {state[0]:.3f}")
print(f" [1] Cart Velocity: {state[1]:.3f}")
print(f" [2] Pole Angle: {state[2]:.3f}")
print(f" [3] Pole Angular Velocity: {state[3]:.3f}")
print("\nPossible actions:")
print(" 0: Push cart to the LEFT")
print(" 1: Push cart to the RIGHT")Random baseline agent¶
Before we train an intelligent model, we first look at how a random agent (one that takes random actions) performs. This gives us a baseline.
# Test random agent
def evaluate_random_agent(env, n_episodes=10, seed=42):
"""
Evaluate a random agent that takes random actions.
Args:
env: Gymnasium environment
n_episodes: Number of episodes to run
seed: Random seed for reproducibility
Returns
-------
List of episode rewards
"""
episode_rewards = []
for episode in range(n_episodes):
state, info = env.reset(seed=seed + episode)
episode_reward = 0
done = False
truncated = False
while not (done or truncated):
# Random action
action = env.action_space.sample()
state, reward, done, truncated, info = env.step(action)
episode_reward += reward
episode_rewards.append(episode_reward)
return episode_rewards
# Evaluate random agent
random_rewards = evaluate_random_agent(env, n_episodes=100)
print("=== Random Agent Performance ===")
print(f"Average reward: {np.mean(random_rewards):.2f} ± {np.std(random_rewards):.2f}")
print(f"Min reward: {np.min(random_rewards):.2f}")
print(f"Max reward: {np.max(random_rewards):.2f}")
px.histogram(random_rewards, nbins=20, title="Random Agent: Reward Distribution").add_vline(
x=np.mean(random_rewards),
line_dash="dash",
line_color="red",
annotation_text=f"Mean: {np.mean(random_rewards):.1f}",
).show()
px.line(
y=random_rewards,
title="Random Agent: Reward per Episode",
labels={"x": "Episode", "y": "Reward"},
).add_hline(
y=np.mean(random_rewards),
line_dash="dash",
line_color="red",
annotation_text=f"Mean: {np.mean(random_rewards):.1f}",
).show()
Training with a Deep Q-Network (DQN)¶
Now we will train a Deep Q-Network (DQN) to learn an intelligent policy. DQN is a value-based method that uses a neural network to approximate the optimal Q-function.
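As a reminder of standard DQN (not specific to this notebook): the network parameters $\theta$ are trained to minimize the temporal-difference error against a slowly updated target network with parameters $\theta^{-}$:

$$\mathcal{L}(\theta) = \mathbb{E}_{(s, a, r, s')}\Big[\big(r + \gamma \max_{a'} Q_{\theta^{-}}(s', a') - Q_{\theta}(s, a)\big)^{2}\Big]$$

Here $\gamma$ is the discount factor (gamma=0.99 below), and the target network is refreshed every target_update_interval steps.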
from stable_baselines3 import DQN
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.monitor import Monitor
# Create a fresh environment for training
env = gym.make("CartPole-v1")
# Create DQN model with better hyperparameters
# The neural network will learn Q(s,a) for each state-action pair
model = DQN(
"MlpPolicy", # Multi-Layer Perceptron policy network
env,
learning_rate=1e-3,
buffer_size=50000,
learning_starts=1000, # Start learning after more experiences
batch_size=64, # Larger batch size for more stable learning
tau=1.0,
gamma=0.99, # Discount factor
train_freq=4,
target_update_interval=250,
exploration_fraction=0.1,
exploration_initial_eps=1.0,
exploration_final_eps=0.02, # Lower final exploration
# verbose=1, # Show training progress
tensorboard_log=None,
)
# Train the agent for longer
model.learn(total_timesteps=100000, progress_bar=True)
# Evaluate the trained model (wrap env with Monitor to avoid warning)
eval_env = Monitor(gym.make("CartPole-v1"))
mean_reward, std_reward = evaluate_policy(model, eval_env, n_eval_episodes=100, deterministic=True)
eval_env.close()
print(f" Random Agent: {np.mean(random_rewards):.2f} ± {np.std(random_rewards):.2f}")
print(f" Trained DQN: {mean_reward:.2f} ± {std_reward:.2f}")Visualisatie¶
from IPython.display import HTML
from matplotlib import animation
from matplotlib import pyplot as plt
def create_animation(frames, interval=50):
"""
Create an animation from frames.
Args:
frames: List of RGB arrays
interval: Delay between frames in milliseconds
Returns
-------
matplotlib animation object
"""
fig, ax = plt.subplots(figsize=(8, 6))
ax.axis("off")
# Display first frame
img = ax.imshow(frames[0])
def animate(frame_idx):
img.set_array(frames[frame_idx])
ax.set_title(f"Step {frame_idx}/{len(frames) - 1}", fontsize=14)
return [img]
anim = animation.FuncAnimation(fig, animate, frames=len(frames), interval=interval, blit=True)
plt.close() # Don't show the static figure
return anim
# Visualize trained agent
def visualize_agent(model, env, n_steps=1000):
"""
Run agent in environment and collect frames for visualization.
Args:
model: Trained RL model
env: Gymnasium environment
n_steps: Maximum number of steps
Returns
-------
frames, rewards, actions
"""
frames = []
rewards_list = []
actions_list = []
state, info = env.reset(seed=42)
frames.append(env.render())
for _ in range(n_steps):
# Get action from trained policy (deterministic)
action, _states = model.predict(state, deterministic=True)
actions_list.append(int(action))
# Take action in environment
state, reward, done, truncated, info = env.step(action)
rewards_list.append(reward)
frames.append(env.render())
if done or truncated:
break
return frames, rewards_list, actions_list
# Create environment with rendering
env_render = gym.make("CartPole-v1", render_mode="rgb_array")
frames, rewards_list, actions_list = visualize_agent(model, env_render, n_steps=1000)
env_render.close()
print(f"\nEpisode lasted {len(rewards_list)} steps")
print(f"Total reward: {sum(rewards_list):.0f}")
print(f"Action distribution: LEFT={actions_list.count(0)}, RIGHT={actions_list.count(1)}")# Create animation of the trained CartPole agent
print("Creating animation...")
anim = create_animation(frames, interval=50)
# Display the animation
HTML(anim.to_jshtml())
Analysis¶
DQN learns a Q-function that predicts, for every state-action combination, the expected cumulative reward (return).
With a greedy policy, the agent always chooses the action with the highest Q-value:
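In standard notation, with discount factor $\gamma$:

$$Q(s, a) = \mathbb{E}\Big[\sum_{k=0}^{\infty} \gamma^{k} r_{t+k+1} \,\Big|\, s_t = s,\ a_t = a\Big] \qquad\text{and}\qquad \pi(s) = \arg\max_{a} Q(s, a)$$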
# Analyze Q-values for different states
def analyze_q_values(model, env, n_samples=100):
"""
Sample random states and analyze Q-values.
Args:
model: Trained DQN model
env: Gymnasium environment
n_samples: Number of states to sample
Returns
-------
states, q_values_left, q_values_right, chosen_actions
"""
states = []
q_values_left = []
q_values_right = []
chosen_actions = []
for _ in range(n_samples):
state, _ = env.reset()
states.append(state)
# Get Q-values for both actions
with torch.no_grad():
q_values = model.q_net(torch.FloatTensor(state).unsqueeze(0))
q_values_left.append(q_values[0, 0].item())
q_values_right.append(q_values[0, 1].item())
chosen_actions.append(torch.argmax(q_values).item())
return np.array(states), q_values_left, q_values_right, chosen_actions
# Analyze Q-values
states, q_left, q_right, actions = analyze_q_values(model, env, n_samples=300)
print("\n=== Q-Value Analysis ===")
print(f"Average Q-value for LEFT: {np.mean(q_left):.2f}")
print(f"Average Q-value for RIGHT: {np.mean(q_right):.2f}")
print(f"Q-value range: [{min(q_left + q_right):.2f}, {max(q_left + q_right):.2f}]")
# Heatmap: Q-values based on Pole Angle and Cart Position
# Create a grid of states to visualize Q-values
print("\nCreating Q-value heatmap...")
angle_range = np.linspace(-0.3, 0.3, 40)
position_range = np.linspace(-2.4, 2.4, 40)
q_grid_left = np.zeros((len(angle_range), len(position_range)))
q_grid_right = np.zeros((len(angle_range), len(position_range)))
for i, angle in enumerate(angle_range):
for j, position in enumerate(position_range):
# Create a state with this angle and position, zero velocities
test_state = np.array([position, 0.0, angle, 0.0])
with torch.no_grad():
q_values = model.q_net(torch.FloatTensor(test_state).unsqueeze(0))
q_grid_left[i, j] = q_values[0, 0].item()
q_grid_right[i, j] = q_values[0, 1].item()
# Plot Q-value difference heatmap
q_diff_grid = q_grid_right - q_grid_left
fig = px.imshow(
q_diff_grid,
x=position_range,
y=angle_range,
color_continuous_scale="RdBu_r",
color_continuous_midpoint=0,
title="Learned Policy: Q(RIGHT) - Q(LEFT) for Different States",
labels={"x": "Cart Position", "y": "Pole Angle (radians)", "color": "Q(RIGHT) - Q(LEFT)"},
aspect="auto",
)
fig.update_layout(
xaxis_title="Cart Position (negative = left of center, positive = right of center)",
yaxis_title="Pole Angle (negative = leaning left, positive = leaning right)",
height=500,
)
fig.show()
print("\n💡 Interpretation of the Heatmap:")
print("- BLUE: Agent prefers action LEFT (push cart to the left)")
print("- RED: Agent prefers action RIGHT (push cart to the right)")
print("- The diagonal structure shows the learned strategy:")
print(" → When pole leans left, push left")
print(" → When pole leans right, push right")
print("- At the edges (extreme cart positions), the agent adjusts the strategy")
print(" to keep the cart within bounds")Training met PPO (Proximal Policy Optimization)¶
# Import PPO
from stable_baselines3 import PPO
# Create fresh environment
env_ppo = gym.make("CartPole-v1")
# Create PPO model
# PPO learns a policy π(a|s) directly, not Q-values
model_ppo = PPO(
"MlpPolicy",
env_ppo,
learning_rate=3e-4,
n_steps=2048,
batch_size=64,
n_epochs=10,
gamma=0.99,
gae_lambda=0.95,
clip_range=0.2,
verbose=0,
)
# Train PPO
model_ppo.learn(total_timesteps=50000, progress_bar=True)
# Evaluate PPO (wrap env with Monitor to avoid warning)
eval_env_ppo = Monitor(gym.make("CartPole-v1"))
mean_reward_ppo, std_reward_ppo = evaluate_policy(
model_ppo, eval_env_ppo, n_eval_episodes=100, deterministic=True
)
eval_env_ppo.close()
print(f"Random Agent: {np.mean(random_rewards):.2f} ± {np.std(random_rewards):.2f}")
print(f"DQN Agent: {mean_reward:.2f} ± {std_reward:.2f}")
print(f"PPO Agent: {mean_reward_ppo:.2f} ± {std_reward_ppo:.2f}")
# Visualize comparison
df_comparison = pd.DataFrame(
{
"Algorithm": ["Random", "DQN", "PPO"],
"Mean Reward": [np.mean(random_rewards), mean_reward, mean_reward_ppo],
"Std": [np.std(random_rewards), std_reward, std_reward_ppo],
}
)
fig = px.bar(
df_comparison,
x="Algorithm",
y="Mean Reward",
error_y="Std",
title="Algorithm Performance Comparison on CartPole-v1",
color="Algorithm",
color_discrete_map={"Random": "gray", "DQN": "blue", "PPO": "green"},
text=[
f"{m:.1f}±{s:.1f}"
for m, s in zip(df_comparison["Mean Reward"], df_comparison["Std"], strict=False)
],
)
fig.add_hline(y=500, line_dash="dash", line_color="red", annotation_text="Maximum possible (500)")
fig.update_layout(yaxis_range=[0, 550])
fig.show()
LunarLander¶
Let's now look at a more complex problem: LunarLander-v3. Here a lunar lander must land safely on a landing pad.
The Problem¶
State: 8 continuous values (position, velocity, angle, angular velocity, leg contact)
Actions: 4 discrete actions (do nothing, fire left engine, fire main engine, fire right engine)
Rewards:
+100 to +140 for a successful landing
-100 for a crash
Small negative rewards for fuel consumption
Positive rewards for getting closer to the landing zone
Goal: Land safely with minimal fuel consumption
# Create LunarLander environment
env_lunar = gym.make("LunarLander-v3")
# Explore the environment
state, info = env_lunar.reset(seed=42)
print("=== LunarLander-v2 Environment ===")
print(f"State space: {env_lunar.observation_space}")
print(f"Action space: {env_lunar.action_space}")
print(f"\nInitial state shape: {state.shape}")
print(f"State: {state}")
print("\nState components:")
print(" [0] X position")
print(" [1] Y position")
print(" [2] X velocity")
print(" [3] Y velocity")
print(" [4] Angle")
print(" [5] Angular velocity")
print(" [6] Left leg contact (0=no, 1=yes)")
print(" [7] Right leg contact (0=no, 1=yes)")
print("\nActions:")
print(" 0: Do nothing")
print(" 1: Fire left engine")
print(" 2: Fire main engine")
print(" 3: Fire right engine")
# Test random agent on LunarLander
print("\n=== Testing Random Agent ===")
random_rewards_lunar = evaluate_random_agent(env_lunar, n_episodes=20, seed=42)
print(f"Random Agent: {np.mean(random_rewards_lunar):.2f} ± {np.std(random_rewards_lunar):.2f}")
print("(Note: Negative rewards mean crashes!)")
Training with PPO on LunarLander¶
# Train PPO on LunarLander
model_lunar = PPO(
"MlpPolicy",
env_lunar,
learning_rate=3e-4,
n_steps=1024,
batch_size=64,
n_epochs=4,
gamma=0.999,
gae_lambda=0.98,
clip_range=0.2,
verbose=0,
)
# Train for longer since this is more complex
model_lunar.learn(total_timesteps=500000, progress_bar=True)
# Evaluate (wrap env with Monitor to avoid warning)
eval_env_lunar = Monitor(gym.make("LunarLander-v3"))
mean_reward_lunar, std_reward_lunar = evaluate_policy(
model_lunar, eval_env_lunar, n_eval_episodes=50, deterministic=True
)
eval_env_lunar.close()
print(f"Random Agent: {np.mean(random_rewards_lunar):.2f} ± {np.std(random_rewards_lunar):.2f}")
print(f"Trained PPO: {mean_reward_lunar:.2f} ± {std_reward_lunar:.2f}")
print("\nNote: Score > 200 is considered solved!")
status = "SOLVED ✓" if mean_reward_lunar > 200 else "Needs more training"
print(f"Status: {status}")
# Visualize performance
df_lunar = pd.DataFrame(
{
"Algorithm": ["Random", "PPO"],
"Mean Reward": [np.mean(random_rewards_lunar), mean_reward_lunar],
"Std": [np.std(random_rewards_lunar), std_reward_lunar],
}
)
fig = px.bar(
df_lunar,
x="Algorithm",
y="Mean Reward",
error_y="Std",
title="LunarLander-v2 Performance",
color="Algorithm",
color_discrete_map={"Random": "gray", "PPO": "green"},
)
fig.add_hline(y=200, line_dash="dash", line_color="red", annotation_text="Solved threshold (200)")
fig.add_hline(y=0, line_color="black")
fig.show()
Visualization¶
# Visualize trained LunarLander agent
env_lunar_render = gym.make("LunarLander-v3", render_mode="rgb_array")
frames_lunar, rewards_lunar, actions_lunar = visualize_agent(
model_lunar, env_lunar_render, n_steps=500
)
env_lunar_render.close()
print("\n=== Episode Analysis ===")
print(f"Episode length: {len(rewards_lunar)} steps")
print(f"Total reward: {sum(rewards_lunar):.1f}")
print(
f"Final outcome: {'SUCCESS ✓' if sum(rewards_lunar) > 200 else 'CRASH' if sum(rewards_lunar) < 0 else 'PARTIAL'}"
)
print("\nAction usage:")
action_names = ["Do nothing", "Left engine", "Main engine", "Right engine"]
for action_id, action_name in enumerate(action_names):
count = actions_lunar.count(action_id)
percentage = (count / len(actions_lunar)) * 100
print(f" {action_name}: {count} times ({percentage:.1f}%)")
# Show action distribution
df_actions = pd.DataFrame(
{"Action": action_names, "Count": [actions_lunar.count(i) for i in range(4)]}
)
px.bar(
df_actions,
x="Action",
y="Count",
title=f"LunarLander Action Distribution (Total Reward: {sum(rewards_lunar):.1f})",
).show()
# Create animation of the trained LunarLander agent
print("Creating LunarLander animation...")
anim_lunar = create_animation(frames_lunar, interval=50)
# Display the animation
HTML(anim_lunar.to_jshtml())
