Skip to article frontmatterSkip to article content
Site not loading correctly?

This may be due to an incorrect BASE_URL configuration. See the MyST Documentation for reference.

Voor dit labo dien je volgende packages ad-hoc te installeren in de devcontainer:

!uv add pygame
!uv add swig
!uv add gymnasium[box2d]

CartPole

CartPole is een klassiek controle probleem waarbij een staaf rechtop moet blijven op een kar die heen en weer kan bewegen.

CartPole

Het Probleem

  • State: 4 continue waarden (positie kar, snelheid kar, hoek staaf, hoeksnelheid staaf)

  • Actions: 2 discrete acties (duw naar links of rechts)

  • Reward: +1 voor elke tijdstap waarbij de staaf rechtop blijft

  • Doel: Hou de staaf zo lang mogelijk rechtop (max 500 tijdstappen)

Setup

We gebruiken:

  • Gymnasium: Een framework voor RL omgevingen (oorspronkelijk van OpenAI)

  • Stable-Baselines3: Kwalitatieve PyTorch implementaties van RL algoritmes

import gymnasium as gym
import numpy as np
import pandas as pd
import plotly.express as px
import torch

De Omgeving Verkennen

# Create the CartPole environment
env = gym.make("CartPole-v1", render_mode="rgb_array")

# Reset environment to get initial state
state, info = env.reset(seed=42)

print("=== CartPole Environment ===")
print(f"State space: {env.observation_space}")
print(f"Action space: {env.action_space}")
print(f"\nInitial state: {state}")
print("\nState components:")
print(f"  [0] Cart Position: {state[0]:.3f}")
print(f"  [1] Cart Velocity: {state[1]:.3f}")
print(f"  [2] Pole Angle: {state[2]:.3f}")
print(f"  [3] Pole Angular Velocity: {state[3]:.3f}")
print("\nPossible actions:")
print("  0: Push cart to the LEFT")
print("  1: Push cart to the RIGHT")

Random baseline agent

Voordat we een intelligent model trainen, kijken we eerst hoe een random agent (die willekeurige acties neemt) presteert. Dit geeft ons een baseline.

# Test random agent
def evaluate_random_agent(env, n_episodes=10, seed=42):
    """
    Evaluate a random agent that takes random actions.

    Args:
        env: Gymnasium environment
        n_episodes: Number of episodes to run
        seed: Random seed for reproducibility

    Returns
    -------
        List of episode rewards
    """
    episode_rewards = []

    for episode in range(n_episodes):
        state, info = env.reset(seed=seed + episode)
        episode_reward = 0
        done = False
        truncated = False

        while not (done or truncated):
            # Random action
            action = env.action_space.sample()
            state, reward, done, truncated, info = env.step(action)
            episode_reward += reward

        episode_rewards.append(episode_reward)

    return episode_rewards


# Evaluate random agent
random_rewards = evaluate_random_agent(env, n_episodes=100)

print("=== Random Agent Performance ===")
print(f"Average reward: {np.mean(random_rewards):.2f} ± {np.std(random_rewards):.2f}")
print(f"Min reward: {np.min(random_rewards):.2f}")
print(f"Max reward: {np.max(random_rewards):.2f}")

px.histogram(random_rewards, nbins=20, title="Random Agent: Reward Distribution").add_vline(
    x=np.mean(random_rewards),
    line_dash="dash",
    line_color="red",
    annotation_text=f"Mean: {np.mean(random_rewards):.1f}",
).show()
px.line(
    y=random_rewards,
    title="Random Agent: Reward per Episode",
    labels={"x": "Episode", "y": "Reward"},
).add_hline(
    y=np.mean(random_rewards),
    line_dash="dash",
    line_color="red",
    annotation_text=f"Mean: {np.mean(random_rewards):.1f}",
).show()

Training met Deep Q-Network (DQN)

Nu gaan we een Deep Q-Network (DQN) trainen om een intelligente policy te leren. DQN is een value-based methode die een neural network gebruikt om de optimale QQ-functie Q(s,a)Q^*(s,a) te benaderen.

from stable_baselines3 import DQN
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.monitor import Monitor
# Create a fresh environment for training
env = gym.make("CartPole-v1")

# Create DQN model with better hyperparameters
# The neural network will learn Q(s,a) for each state-action pair
model = DQN(
    "MlpPolicy",  # Multi-Layer Perceptron policy network
    env,
    learning_rate=1e-3,
    buffer_size=50000,
    learning_starts=1000,  # Start learning after more experiences
    batch_size=64,  # Larger batch size for more stable learning
    tau=1.0,
    gamma=0.99,  # Discount factor
    train_freq=4,
    target_update_interval=250,
    exploration_fraction=0.1,
    exploration_initial_eps=1.0,
    exploration_final_eps=0.02,  # Lower final exploration
    # verbose=1,  # Show training progress
    tensorboard_log=None,
)
# Train the agent for longer
model.learn(total_timesteps=100000, progress_bar=True)

# Evaluate the trained model (wrap env with Monitor to avoid warning)
eval_env = Monitor(gym.make("CartPole-v1"))
mean_reward, std_reward = evaluate_policy(model, eval_env, n_eval_episodes=100, deterministic=True)
eval_env.close()

print(f"  Random Agent: {np.mean(random_rewards):.2f} ± {np.std(random_rewards):.2f}")
print(f"  Trained DQN:  {mean_reward:.2f} ± {std_reward:.2f}")

Visualisatie

from IPython.display import HTML
from matplotlib import animation
from matplotlib import pyplot as plt


def create_animation(frames, interval=50):
    """
    Create an animation from frames.

    Args:
        frames: List of RGB arrays
        interval: Delay between frames in milliseconds

    Returns
    -------
        matplotlib animation object
    """
    fig, ax = plt.subplots(figsize=(8, 6))
    ax.axis("off")

    # Display first frame
    img = ax.imshow(frames[0])

    def animate(frame_idx):
        img.set_array(frames[frame_idx])
        ax.set_title(f"Step {frame_idx}/{len(frames) - 1}", fontsize=14)
        return [img]

    anim = animation.FuncAnimation(fig, animate, frames=len(frames), interval=interval, blit=True)

    plt.close()  # Don't show the static figure
    return anim
# Visualize trained agent
def visualize_agent(model, env, n_steps=1000):
    """
    Run agent in environment and collect frames for visualization.

    Args:
        model: Trained RL model
        env: Gymnasium environment
        n_steps: Maximum number of steps

    Returns
    -------
        frames, rewards, actions
    """
    frames = []
    rewards_list = []
    actions_list = []

    state, info = env.reset(seed=42)
    frames.append(env.render())

    for _ in range(n_steps):
        # Get action from trained policy (deterministic)
        action, _states = model.predict(state, deterministic=True)
        actions_list.append(int(action))

        # Take action in environment
        state, reward, done, truncated, info = env.step(action)
        rewards_list.append(reward)
        frames.append(env.render())

        if done or truncated:
            break

    return frames, rewards_list, actions_list


# Create environment with rendering
env_render = gym.make("CartPole-v1", render_mode="rgb_array")
frames, rewards_list, actions_list = visualize_agent(model, env_render, n_steps=1000)
env_render.close()

print(f"\nEpisode lasted {len(rewards_list)} steps")
print(f"Total reward: {sum(rewards_list):.0f}")
print(f"Action distribution: LEFT={actions_list.count(0)}, RIGHT={actions_list.count(1)}")
# Create animation of the trained CartPole agent
print("Creating animation...")
anim = create_animation(frames, interval=50)

# Display the animation
HTML(anim.to_jshtml())

Analyse

DQN leert een Q-function Q(s,a)Q(s,a) die voor elke state-action combinatie voorspelt wat de verwachte cumulatieve reward (return) is.

Bij een greedy policy, kiest de agent altijd actie met de hoogste Q-waarde:

π(s)=argmaxaQ(s,a)\pi(s) = \arg\max_a Q(s,a)
# Analyze Q-values for different states
def analyze_q_values(model, env, n_samples=100):
    """
    Sample random states and analyze Q-values.

    Args:
        model: Trained DQN model
        env: Gymnasium environment
        n_samples: Number of states to sample

    Returns
    -------
        states, q_values, actions
    """
    states = []
    q_values_left = []
    q_values_right = []
    chosen_actions = []

    for _ in range(n_samples):
        state, _ = env.reset()
        states.append(state)

        # Get Q-values for both actions
        with torch.no_grad():
            q_values = model.q_net(torch.FloatTensor(state).unsqueeze(0))
            q_values_left.append(q_values[0, 0].item())
            q_values_right.append(q_values[0, 1].item())
            chosen_actions.append(torch.argmax(q_values).item())

    return np.array(states), q_values_left, q_values_right, chosen_actions


# Analyze Q-values
states, q_left, q_right, actions = analyze_q_values(model, env, n_samples=300)

print("\n=== Q-Value Analysis ===")
print(f"Average Q-value for LEFT: {np.mean(q_left):.2f}")
print(f"Average Q-value for RIGHT: {np.mean(q_right):.2f}")
print(f"Q-value range: [{min(q_left + q_right):.2f}, {max(q_left + q_right):.2f}]")

# Heatmap: Q-values based on Pole Angle and Cart Position
# Create a grid of states to visualize Q-values
print("\nCreating Q-value heatmap...")
angle_range = np.linspace(-0.3, 0.3, 40)
position_range = np.linspace(-2.4, 2.4, 40)
q_grid_left = np.zeros((len(angle_range), len(position_range)))
q_grid_right = np.zeros((len(angle_range), len(position_range)))

for i, angle in enumerate(angle_range):
    for j, position in enumerate(position_range):
        # Create a state with this angle and position, zero velocities
        test_state = np.array([position, 0.0, angle, 0.0])
        with torch.no_grad():
            q_values = model.q_net(torch.FloatTensor(test_state).unsqueeze(0))
            q_grid_left[i, j] = q_values[0, 0].item()
            q_grid_right[i, j] = q_values[0, 1].item()

# Plot Q-value difference heatmap
q_diff_grid = q_grid_right - q_grid_left

fig = px.imshow(
    q_diff_grid,
    x=position_range,
    y=angle_range,
    color_continuous_scale="RdBu_r",
    color_continuous_midpoint=0,
    title="Learned Policy: Q(RIGHT) - Q(LEFT) for Different States",
    labels={"x": "Cart Position", "y": "Pole Angle (radians)", "color": "Q(RIGHT) - Q(LEFT)"},
    aspect="auto",
)
fig.update_layout(
    xaxis_title="Cart Position (negative = left of center, positive = right of center)",
    yaxis_title="Pole Angle (negative = leaning left, positive = leaning right)",
    height=500,
)
fig.show()

print("\n💡 Interpretation of the Heatmap:")
print("- BLUE: Agent prefers action LEFT (push cart to the left)")
print("- RED: Agent prefers action RIGHT (push cart to the right)")
print("- The diagonal structure shows the learned strategy:")
print("  → When pole leans left, push left")
print("  → When pole leans right, push right")
print("- At the edges (extreme cart positions), the agent adjusts the strategy")
print("  to keep the cart within bounds")

Training met PPO (Proximal Policy Optimization)

# Import PPO
from stable_baselines3 import PPO

# Create fresh environment
env_ppo = gym.make("CartPole-v1")

# Create PPO model
# PPO learns a policy π(a|s) directly, not Q-values
model_ppo = PPO(
    "MlpPolicy",
    env_ppo,
    learning_rate=3e-4,
    n_steps=2048,
    batch_size=64,
    n_epochs=10,
    gamma=0.99,
    gae_lambda=0.95,
    clip_range=0.2,
    verbose=0,
)

# Train PPO
model_ppo.learn(total_timesteps=50000, progress_bar=True)

# Evaluate PPO (wrap env with Monitor to avoid warning)
eval_env_ppo = Monitor(gym.make("CartPole-v1"))
mean_reward_ppo, std_reward_ppo = evaluate_policy(
    model_ppo, eval_env_ppo, n_eval_episodes=100, deterministic=True
)
eval_env_ppo.close()

print(f"Random Agent: {np.mean(random_rewards):.2f} ± {np.std(random_rewards):.2f}")
print(f"DQN Agent:    {mean_reward:.2f} ± {std_reward:.2f}")
print(f"PPO Agent:    {mean_reward_ppo:.2f} ± {std_reward_ppo:.2f}")

# Visualize comparison
df_comparison = pd.DataFrame(
    {
        "Algorithm": ["Random", "DQN", "PPO"],
        "Mean Reward": [np.mean(random_rewards), mean_reward, mean_reward_ppo],
        "Std": [np.std(random_rewards), std_reward, std_reward_ppo],
    }
)
fig = px.bar(
    df_comparison,
    x="Algorithm",
    y="Mean Reward",
    error_y="Std",
    title="Algorithm Performance Comparison on CartPole-v1",
    color="Algorithm",
    color_discrete_map={"Random": "gray", "DQN": "blue", "PPO": "green"},
    text=[
        f"{m:.1f}±{s:.1f}"
        for m, s in zip(df_comparison["Mean Reward"], df_comparison["Std"], strict=False)
    ],
)
fig.add_hline(y=500, line_dash="dash", line_color="red", annotation_text="Maximum possible (500)")
fig.update_layout(yaxis_range=[0, 550])
fig.show()

LunarLander

Laten we nu een complexer probleem bekijken: LunarLander-v2. Hier moet een maanlander veilig landen op een landingsplatform.

LunarLander

Het Probleem

  • State: 8 continue waarden (positie, snelheid, hoek, hoeksnelheid, been-contact)

  • Actions: 4 discrete acties (niets, linker motor, hoofd motor, rechter motor)

  • Rewards:

    • +100 tot +140 voor succesvolle landing

    • -100 voor crash

    • Kleine negatieve rewards voor brandstofverbruik

    • Positieve rewards voor dichter bij landingszone

  • Doel: Land veilig met minimaal brandstofverbruik

# Create LunarLander environment
env_lunar = gym.make("LunarLander-v3")

# Explore the environment
state, info = env_lunar.reset(seed=42)

print("=== LunarLander-v2 Environment ===")
print(f"State space: {env_lunar.observation_space}")
print(f"Action space: {env_lunar.action_space}")
print(f"\nInitial state shape: {state.shape}")
print(f"State: {state}")
print("\nState components:")
print("  [0] X position")
print("  [1] Y position")
print("  [2] X velocity")
print("  [3] Y velocity")
print("  [4] Angle")
print("  [5] Angular velocity")
print("  [6] Left leg contact (0=no, 1=yes)")
print("  [7] Right leg contact (0=no, 1=yes)")
print("\nActions:")
print("  0: Do nothing")
print("  1: Fire left engine")
print("  2: Fire main engine")
print("  3: Fire right engine")

# Test random agent on LunarLander
print("\n=== Testing Random Agent ===")
random_rewards_lunar = evaluate_random_agent(env_lunar, n_episodes=20, seed=42)
print(f"Random Agent: {np.mean(random_rewards_lunar):.2f} ± {np.std(random_rewards_lunar):.2f}")
print("(Note: Negative rewards mean crashes!)")

Training met PPO op LunarLander

# Train PPO on LunarLander
model_lunar = PPO(
    "MlpPolicy",
    env_lunar,
    learning_rate=3e-4,
    n_steps=1024,
    batch_size=64,
    n_epochs=4,
    gamma=0.999,
    gae_lambda=0.98,
    clip_range=0.2,
    verbose=0,
)

# Train for longer since this is more complex
model_lunar.learn(total_timesteps=500000, progress_bar=True)

# Evaluate (wrap env with Monitor to avoid warning)
eval_env_lunar = Monitor(gym.make("LunarLander-v3"))
mean_reward_lunar, std_reward_lunar = evaluate_policy(
    model_lunar, eval_env_lunar, n_eval_episodes=50, deterministic=True
)
eval_env_lunar.close()

print(f"Random Agent: {np.mean(random_rewards_lunar):.2f} ± {np.std(random_rewards_lunar):.2f}")
print(f"Trained PPO:  {mean_reward_lunar:.2f} ± {std_reward_lunar:.2f}")
print("\nNote: Score > 200 is considered solved!")
status = "SOLVED ✓" if mean_reward_lunar > 200 else "Needs more training"
print(f"Status: {status}")

# Visualize performance
df_lunar = pd.DataFrame(
    {
        "Algorithm": ["Random", "PPO"],
        "Mean Reward": [np.mean(random_rewards_lunar), mean_reward_lunar],
        "Std": [np.std(random_rewards_lunar), std_reward_lunar],
    }
)
fig = px.bar(
    df_lunar,
    x="Algorithm",
    y="Mean Reward",
    error_y="Std",
    title="LunarLander-v2 Performance",
    color="Algorithm",
    color_discrete_map={"Random": "gray", "PPO": "green"},
)
fig.add_hline(y=200, line_dash="dash", line_color="red", annotation_text="Solved threshold (200)")
fig.add_hline(y=0, line_color="black")
fig.show()

Visualisatie

# Visualize trained LunarLander agent
env_lunar_render = gym.make("LunarLander-v3", render_mode="rgb_array")
frames_lunar, rewards_lunar, actions_lunar = visualize_agent(
    model_lunar, env_lunar_render, n_steps=500
)
env_lunar_render.close()

print("\n=== Episode Analysis ===")
print(f"Episode length: {len(rewards_lunar)} steps")
print(f"Total reward: {sum(rewards_lunar):.1f}")
print(
    f"Final outcome: {'SUCCESS ✓' if sum(rewards_lunar) > 200 else 'CRASH' if sum(rewards_lunar) < 0 else 'PARTIAL'}"
)
print("\nAction usage:")
action_names = ["Do nothing", "Left engine", "Main engine", "Right engine"]
for action_id, action_name in enumerate(action_names):
    count = actions_lunar.count(action_id)
    percentage = (count / len(actions_lunar)) * 100
    print(f"  {action_name}: {count} times ({percentage:.1f}%)")

# Show action distribution
df_actions = pd.DataFrame(
    {"Action": action_names, "Count": [actions_lunar.count(i) for i in range(4)]}
)
px.bar(
    df_actions,
    x="Action",
    y="Count",
    title=f"LunarLander Action Distribution (Total Reward: {sum(rewards_lunar):.1f})",
).show()
# Create animation of the trained LunarLander agent
print("Creating LunarLander animation...")
anim_lunar = create_animation(frames_lunar, interval=50)

# Display the animation
HTML(anim_lunar.to_jshtml())