Hierarchical Skills Tutorial

This tutorial provides an introductory guide to using AgileRL to learn skills and apply these in a hierarchical fashion. AgileRL’s single-agent algorithms allow users to train agents to perform specific skills, and then combine these in a learned order to achieve an outcome.

Hierarchical reinforcement learning can complete tasks efficiently by breaking problems down into smaller sub-tasks, or ‘skills’. For example, training a robot hand to build a stack of blocks from scratch is very difficult, but breaking the task into stages such as moving fingers, grasping a block, lifting the block and building a tower lets us guide the agent towards the overall task. We can then let an agent learn the order in which to perform these skills to achieve the overall goal.

../../_images/LunarLander-v2_stabilize.gif

Stabilize skill

../../_images/LunarLander-v2_center.gif

Center skill

../../_images/LunarLander-v2_landing.gif

Landing skill

../../_images/LunarLander-v2_selector.gif

Selector - combining skills

This tutorial uses hierarchical reinforcement learning to train agents to solve the LunarLander Gymnasium environment. The overall task is broken down into three skills, each trained separately:

  • Stabilize the lander, minimizing movement in all directions

  • Move to the center of the environment in the x-direction

  • Land on the landing pad

A selector agent is also trained to select which skill to execute.

../../_images/LunarLander_hierarchy.png

Fig. 1: Hierarchical structure of LunarLander agents

Code

The following code should run without any issues. The comments are designed to help you understand how to train and combine skills with AgileRL. If you have any questions, please feel free to ask in the Discord server.

Imports

Importing the following packages, functions and classes will enable us to run the tutorial.

Imports
import os
from datetime import datetime

import numpy as np
import torch
import wandb
from tqdm import trange

from agilerl.algorithms.ppo import PPO
from agilerl.training.train_on_policy import train_on_policy
from agilerl.utils.utils import initialPopulation, makeSkillVectEnvs, makeVectEnvs
from agilerl.wrappers.learning import Skill

Defining skills

To define the skills to be learned by our agent, we modify the reward from our environment. This is a form of curriculum learning. For example, if we want the agent to learn the skill of moving to the center of the environment in the x-direction, then we can introduce a negative reward scaled by the distance from the lander to the center.
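As a rough illustration of that idea, the sketch below computes a shaped reward in plain Python. The function name `shaped_center_reward` and its `base` and `scale` parameters are made up for this example and are not part of AgileRL; the quadratic form and the scale of 2 mirror the CenterSkill class defined later in this tutorial.

```python
def shaped_center_reward(x, x_center=0.0, base=1.0, scale=2.0):
    """Return a per-step reward that shrinks with squared distance from x_center.

    Hypothetical helper for illustration only; not an AgileRL function.
    """
    return base - (abs(x_center - x) * scale) ** 2


# The reward is highest at the center and falls off quadratically away from it
for x in (0.0, 0.25, 0.5, 1.0):
    print(f"x={x:.2f} -> reward={shaped_center_reward(x):.2f}")
```

Because the penalty grows with the square of the distance, small offsets cost little while large offsets dominate the reward, which pushes the agent towards the center without punishing minor drift too harshly.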

To define these skills, we can use the AgileRL Skill class, which acts as a wrapper around the environment. Only the skill_reward method needs to be overridden to encourage our agent to learn the behaviour we want. This method takes observation, reward, terminated, truncated and info as inputs and returns the same five values, modified as needed. We can also define variables in the class constructor. In this tutorial, for the LunarLander environment, we define three skills: StabilizeSkill, CenterSkill and LandingSkill.
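The wrapper pattern described above can be sketched without any dependencies. The classes `ToyEnv`, `ToySkill` and `ToyCenterSkill` below are hypothetical stand-ins written for this example. They assume that the wrapper's step method simply forwards the environment's output through skill_reward, which may differ from how AgileRL's Skill class is implemented internally.

```python
class ToyEnv:
    """Tiny stand-in environment with a Gymnasium-style step() signature."""

    def __init__(self):
        self.x = 1.0

    def reset(self):
        self.x = 1.0
        return [self.x], {}

    def step(self, action):
        # action -1 moves left, +1 moves right
        self.x += 0.25 * action
        terminated = abs(self.x) > 2.0
        return [self.x], 0.0, terminated, False, {}


class ToySkill:
    """Stand-in skill wrapper (assumption): step() routes env output via skill_reward()."""

    def __init__(self, env):
        self.env = env

    def reset(self):
        return self.env.reset()

    def step(self, action):
        return self.skill_reward(*self.env.step(action))

    def skill_reward(self, observation, reward, terminated, truncated, info):
        # Default behaviour: pass everything through unchanged
        return observation, reward, terminated, truncated, info


class ToyCenterSkill(ToySkill):
    def skill_reward(self, observation, reward, terminated, truncated, info):
        # Replace the environment reward with one that favours x close to 0
        reward = 1.0 - abs(observation[0])
        return observation, reward, terminated, truncated, info


env = ToyCenterSkill(ToyEnv())
obs, _ = env.reset()
obs, reward, terminated, truncated, info = env.step(-1)
print(obs, reward, terminated)
```

Only the subclass knows about the skill-specific reward, so the same environment can be reused for every skill by swapping the wrapper.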

Stabilize
class StabilizeSkill(Skill):
   def __init__(self, env):
      super().__init__(env)

      self.theta_level = 0
      self.history = {"x": [], "y": [], "theta": []}

   def skill_reward(self, observation, reward, terminated, truncated, info):
      if terminated or truncated:
            reward = -100.0
            self.history = {"x": [], "y": [], "theta": []}
            return observation, reward, terminated, truncated, info

      reward, terminated, truncated = 1.0, 0, 0
      x, y, theta = observation[0], observation[1], observation[4]

      # Ensure there are previous observations to compare with
      if len(self.history["x"]) == 0:
            self.history["x"].append(x)
            self.history["y"].append(y)
            self.history["theta"].append(theta)
            return observation, reward, terminated, truncated, info

      # Minimise x movement
      reward -= (abs(self.history["x"][-1] - x) * 10) ** 2
      # Minimise y movement
      reward -= (abs(self.history["y"][-1] - y) * 10) ** 2
      # Minimise change in tilt angle
      reward -= (abs(self.history["theta"][-1] - theta) * 10) ** 2

      self.history["x"].append(x)
      self.history["y"].append(y)
      self.history["theta"].append(theta)

      # Reset episode if longer than 300 steps
      if len(self.history["x"]) > 300:
            reward = 10.0
            terminated = True
            self.history = {"x": [], "y": [], "theta": []}
            self.env.reset()

      return observation, reward, terminated, truncated, info
Center
class CenterSkill(Skill):
   def __init__(self, env):
      super().__init__(env)

      self.x_center = 0
      self.history = {"y": [], "theta": []}

   def skill_reward(self, observation, reward, terminated, truncated, info):
      if terminated or truncated:
            reward = -1000.0
            self.history = {"y": [], "theta": []}
            return observation, reward, terminated, truncated, info

      reward, terminated, truncated = 1.0, 0, 0
      x, y, theta = observation[0], observation[1], observation[4]

      # Ensure there are previous observations to compare with
      if len(self.history["y"]) == 0:
            self.history["y"].append(y)
            self.history["theta"].append(theta)
            return observation, reward, terminated, truncated, info

      # Minimise x distance to center
      reward -= abs((self.x_center - x) * 2) ** 2
      # Minimise y movement
      reward -= (abs(self.history["y"][-1] - y) * 10) ** 2
      # Minimise change in tilt angle
      reward -= (abs(self.history["theta"][-1] - theta) * 10) ** 2

      self.history["y"].append(y)
      self.history["theta"].append(theta)

      # Reset episode if longer than 300 steps
      if len(self.history["y"]) > 300:
            reward = 10.0
            terminated = True
            self.history = {"y": [], "theta": []}
            self.env.reset()

      return observation, reward, terminated, truncated, info
Landing
class LandingSkill(Skill):
   def __init__(self, env):
      super().__init__(env)

      self.x_landing = 0
      self.y_landing = 0
      self.theta_level = 0

   def skill_reward(self, observation, reward, terminated, truncated, info):
      if terminated or truncated:
            return observation, reward, terminated, truncated, info

      x, y, theta = observation[0], observation[1], observation[4]
      reward, terminated, truncated = 1.0, 0, 0

      # Minimise x distance to landing zone
      reward -= (abs(self.x_landing - x)) ** 2
      # Minimise y distance to landing zone
      reward -= (abs(self.y_landing - y)) ** 2
      # Minimise tilt angle
      reward -= abs(self.theta_level - theta)

      return observation, reward, terminated, truncated, info

Training skills

Once the skills have been defined, training agents to perform them with AgileRL is straightforward. In this tutorial we train PPO agents, but any on- or off-policy single-agent algorithm could be used instead.

Training skills individually

First define the initial hyperparameters and skill objects:

NET_CONFIG = {
   "arch": "mlp",  # Network architecture
   "hidden_size": [64, 64],  # Actor hidden size
}

INIT_HP = {
   "ENV_NAME": "LunarLander-v2",
   "ALGO": "PPO",
   "POPULATION_SIZE": 1,  # Population size
   "DISCRETE_ACTIONS": True,  # Discrete action space
   "BATCH_SIZE": 128,  # Batch size
   "LR": 1e-3,  # Learning rate
   "GAMMA": 0.99,  # Discount factor
   "GAE_LAMBDA": 0.95,  # Lambda for generalized advantage estimation
   "ACTION_STD_INIT": 0.6,  # Initial action standard deviation
   "CLIP_COEF": 0.2,  # Surrogate clipping coefficient
   "ENT_COEF": 0.01,  # Entropy coefficient
   "VF_COEF": 0.5,  # Value function coefficient
   "MAX_GRAD_NORM": 0.5,  # Maximum norm for gradient clipping
   "TARGET_KL": None,  # Target KL divergence threshold
   "TARGET_SCORE": 2000,
   "EPISODES": 1000,
   "EVO_EPOCHS": 5,
   "UPDATE_EPOCHS": 4,  # Number of policy update epochs
   # Swap image channels dimension from last to first [H, W, C] -> [C, H, W]
   "CHANNELS_LAST": False,
   "WANDB": True,
}

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Directory to save trained agents and skills
save_dir = "./models/PPO"
os.makedirs(save_dir, exist_ok=True)

skills = {
   "stabilize": StabilizeSkill,
   "center": CenterSkill,
   "landing": LandingSkill,
}

Now loop through the skills and use the AgileRL training function to train an agent for each one.

for skill in skills.keys():
   env = makeSkillVectEnvs(
         INIT_HP["ENV_NAME"], skills[skill], num_envs=1
   )  # Create environment

   try:
         state_dim = env.single_observation_space.n  # Discrete observation space
         one_hot = True  # Requires one-hot encoding
   except Exception:
         state_dim = (
            env.single_observation_space.shape
         )  # Continuous observation space
         one_hot = False  # Does not require one-hot encoding
   try:
         action_dim = env.single_action_space.n  # Discrete action space
   except Exception:
         action_dim = env.single_action_space.shape[0]  # Continuous action space

   if INIT_HP["CHANNELS_LAST"]:
         state_dim = (state_dim[2], state_dim[0], state_dim[1])

   pop = initialPopulation(
         algo="PPO",  # Algorithm
         state_dim=state_dim,  # State dimension
         action_dim=action_dim,  # Action dimension
         one_hot=one_hot,  # One-hot encoding
         net_config=NET_CONFIG,  # Network configuration
         INIT_HP=INIT_HP,  # Initial hyperparameters
         population_size=INIT_HP["POPULATION_SIZE"],  # Population size
         device=device,
   )

   trained_pop, pop_fitnesses = train_on_policy(
         env=env,  # Gym-style environment
         env_name=f"{INIT_HP['ENV_NAME']}-{skill}",  # Environment name
         algo=INIT_HP["ALGO"],  # Algorithm
         pop=pop,  # Population of agents
         swap_channels=INIT_HP[
            "CHANNELS_LAST"
         ],  # Swap image channel from last to first
         n_episodes=INIT_HP["EPISODES"],  # Max number of training episodes
         evo_epochs=INIT_HP["EVO_EPOCHS"],  # Evolution frequency
         evo_loop=3,  # Number of evaluation episodes per agent
         target=INIT_HP["TARGET_SCORE"],  # Target score for early stopping
         tournament=None,  # Tournament selection object
         mutation=None,  # Mutations object
         wb=INIT_HP["WANDB"],  # Weights and Biases tracking
   )

   # Save the trained algorithm
   filename = f"PPO_trained_agent_{skill}.pt"
   save_path = os.path.join(save_dir, filename)
   trained_pop[0].saveCheckpoint(save_path)

   env.close()
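The loop above infers dimensions by catching exceptions when an attribute is missing. A more explicit variant is sketched below, assuming that discrete spaces expose an `n` attribute and continuous ones a `shape` attribute. The helper `space_dims` is hypothetical, and `SimpleNamespace` objects stand in for real Gymnasium spaces so the example runs on its own.

```python
from types import SimpleNamespace


def space_dims(space):
    """Return (dim, is_discrete) for a Gymnasium-like space object.

    Hypothetical helper: discrete spaces are assumed to expose `.n`,
    continuous ones `.shape`.
    """
    if hasattr(space, "n"):
        return space.n, True
    return space.shape, False


# Stand-ins for LunarLander's spaces: 8 continuous observations, 4 discrete actions
observation_space = SimpleNamespace(shape=(8,))
action_space = SimpleNamespace(n=4)

state_dim, one_hot = space_dims(observation_space)
action_dim, _ = space_dims(action_space)
print(state_dim, one_hot, action_dim)
```

Note that the tutorial code takes `shape[0]` for a continuous action space, whereas this helper returns the full shape, so it would need a small adjustment before being used for actions.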

The selector agent

Now that the skills have been learned, we can train a hierarchical selector agent to decide which skill to execute. This meta-policy should optimise the original “meta-reward” of the environment, so we no longer need a skill wrapper. Instead, we load an agent for each skill, whose policy is executed when the selector chooses it. It is also important to define how many timesteps each skill should run for before we query the meta-policy again and decide which skill to use next. These skill agents and skill durations can be defined in a dictionary.
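To make the role of the skill duration concrete, here is a minimal, dependency-free sketch. `CountdownEnv`, `run_skill` and the `toy_skills` dictionary are invented for this example. Plain functions stand in for trained agents, and a fixed sequence of choices stands in for the selector's decisions.

```python
class CountdownEnv:
    """Toy environment that terminates after a fixed number of steps."""

    def __init__(self, horizon=100):
        self.horizon = horizon
        self.t = 0

    def reset(self):
        self.t = 0
        return self.t, {}

    def step(self, action):
        self.t += 1
        reward = 1.0 if action == 1 else 0.0
        terminated = self.t >= self.horizon
        return self.t, reward, terminated, False, {}


def run_skill(env, policy, state, duration):
    """Execute one skill policy for up to `duration` steps and sum its rewards."""
    total_reward, terminated, truncated = 0.0, False, False
    for _ in range(duration):
        state, reward, terminated, truncated, _ = env.step(policy(state))
        total_reward += reward
        if terminated or truncated:
            break  # Hand control back early if the episode ends
    return state, total_reward, terminated, truncated


# Hypothetical skills: plain functions stand in for trained agents
toy_skills = {
    0: {"skill": "idle", "policy": lambda s: 0, "skill_duration": 40},
    1: {"skill": "thrust", "policy": lambda s: 1, "skill_duration": 40},
}

env = CountdownEnv(horizon=100)
state, _ = env.reset()
for choice in (1, 0, 1):  # Fixed sequence in place of the selector's decisions
    skill = toy_skills[choice]
    state, reward, terminated, truncated = run_skill(
        env, skill["policy"], state, skill["skill_duration"]
    )
    print(skill["skill"], state, reward, terminated)
```

A longer skill duration means fewer decisions for the selector but coarser control; the value of 40 used in this tutorial is one possible trade-off.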

Loading and defining skill agents
# Now train the skill selector, which will choose which of the learned skills to use
# First load the learned skill agents
stabilize_agent = PPO.load(os.path.join(save_dir, "PPO_trained_agent_stabilize.pt"))
center_agent = PPO.load(os.path.join(save_dir, "PPO_trained_agent_center.pt"))
landing_agent = PPO.load(os.path.join(save_dir, "PPO_trained_agent_landing.pt"))

trained_skills = {
   0: {"skill": "stabilize", "agent": stabilize_agent, "skill_duration": 40},
   1: {"skill": "center", "agent": center_agent, "skill_duration": 40},
   2: {"skill": "landing", "agent": landing_agent, "skill_duration": 40},
}

Next we can define the variables we will need in our training loop.

Setting up training
env = makeVectEnvs(INIT_HP["ENV_NAME"], num_envs=1)  # Create environment

try:
   state_dim = env.single_observation_space.n  # Discrete observation space
   one_hot = True  # Requires one-hot encoding
except Exception:
   state_dim = env.single_observation_space.shape  # Continuous observation space
   one_hot = False  # Does not require one-hot encoding

action_dim = len(
   trained_skills
)  # Selector will be trained to choose which trained skill to use

if INIT_HP["CHANNELS_LAST"]:
   state_dim = (state_dim[2], state_dim[0], state_dim[1])

pop = initialPopulation(
   algo="PPO",  # Algorithm
   state_dim=state_dim,  # State dimension
   action_dim=action_dim,  # Action dimension
   one_hot=one_hot,  # One-hot encoding
   net_config=NET_CONFIG,  # Network configuration
   INIT_HP=INIT_HP,  # Initial hyperparameters
   population_size=INIT_HP["POPULATION_SIZE"],  # Population size
   device=device,
)

if INIT_HP["WANDB"]:
   wandb.init(
         # set the wandb project where this run will be logged
         project="EvoWrappers",
         name="{}-EvoHPO-{}-{}".format(
            INIT_HP["ENV_NAME"],
            INIT_HP["ALGO"],
            datetime.now().strftime("%m%d%Y%H%M%S"),
         ),
         # track hyperparameters and run metadata
         config={
            "algo": f"Evo HPO {INIT_HP['ALGO']}",
            "env": INIT_HP["ENV_NAME"],
            "INIT_HP": INIT_HP,
         },
   )

bar_format = "{l_bar}{bar:10}| {n:4}/{total_fmt} [{elapsed:>7}<{remaining:>7}, {rate_fmt}{postfix}]"
pbar = trange(
   INIT_HP["EPISODES"],
   unit="ep",
   bar_format=bar_format,
   ascii=True,
   dynamic_ncols=True,
)

total_steps = 0

Finally, we can run the training loop for the selector agent. Each skill agent’s policy is executed in the environment for the number of timesteps defined in the trained_skills dictionary.
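From the selector's point of view, each executed skill is a single step. The sketch below shows one possible way to record the selector's experience, assuming each stored state is the one the selector saw when it chose the skill and each stored reward is the sum collected while that skill ran. The names `collect_selector_rollout`, `select` and `execute` are hypothetical and only illustrate the bookkeeping; they are not AgileRL functions.

```python
def collect_selector_rollout(select, execute, state, n_decisions):
    """Record one (state, action, reward) sample per selector decision."""
    states, actions, rewards = [], [], []
    for _ in range(n_decisions):
        action = select(state)  # Selector picks a skill index
        next_state, reward = execute(state, action)  # Skill runs for its duration
        states.append(state)  # State in which the decision was made
        actions.append(action)
        rewards.append(reward)  # Reward summed over the skill's steps
        state = next_state
    return states, actions, rewards, state


# Stub selector and skill executor, for illustration only
def select(state):
    return state % 3


def execute(state, action):
    # Pretend every skill advances the environment by 40 steps
    return state + 40, float(action)


states, actions, rewards, final_state = collect_selector_rollout(
    select, execute, state=0, n_decisions=3
)
print(states, actions, rewards, final_state)
```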

Training the selector agent
# RL training loop
for idx_epi in pbar:
   for agent in pop:  # Loop through population
         state = env.reset()[0]  # Reset environment at start of episode
         score = 0

         states = []
         actions = []
         log_probs = []
         rewards = []
         terminations = []
         values = []

         for idx_step in range(500):
            # Get next action from agent
            action, log_prob, _, value = agent.getAction(state)
            decision_state = state  # State in which the selector chose this skill

            # Internal loop to execute trained skill
            skill_agent = trained_skills[action[0]]["agent"]
            skill_duration = trained_skills[action[0]]["skill_duration"]
            reward = 0
            for skill_step in range(skill_duration):
               # If landed, do nothing
               if state[0][6] or state[0][7]:
                     next_state, skill_reward, termination, truncation, _ = env.step(
                        [0]
                     )
               else:
                     skill_action, _, _, _ = skill_agent.getAction(state)
                     next_state, skill_reward, termination, truncation, _ = env.step(
                        skill_action
                     )  # Act in environment
               reward += skill_reward
               if np.any(termination) or np.any(truncation):
                     break
               state = next_state
            score += reward

            states.append(decision_state)
            actions.append(action)
            log_probs.append(log_prob)
            rewards.append(reward)
            terminations.append(termination)
            values.append(value)

         agent.scores.append(score)

         # Learn according to agent's RL algorithm
         agent.learn(
            (
               states,
               actions,
               log_probs,
               rewards,
               terminations,
               values,
               next_state,
            )
         )

         agent.steps[-1] += idx_step + 1
         total_steps += idx_step + 1

   if (idx_epi + 1) % INIT_HP["EVO_EPOCHS"] == 0:
         mean_scores = np.mean([agent.scores[-20:] for agent in pop], axis=1)
         if INIT_HP["WANDB"]:
            wandb.log(
               {
                     "global_step": total_steps,
                     "train/mean_score": np.mean(mean_scores),
               }
            )
         print(
            f"""
            --- Epoch {idx_epi + 1} ---
            Score avgs:\t{mean_scores}
            Steps:\t\t{total_steps}
            """,
            end="\r",
         )

if INIT_HP["WANDB"]:
   wandb.finish()
env.close()

# Save the trained selector
filename = "PPO_trained_agent_selector.pt"
save_path = os.path.join(save_dir, filename)
pop[0].saveCheckpoint(save_path)

Trained model weights

Trained model weights are provided in our GitHub repository at AgileRL/tutorials/Skills/models. Take a look and see if you can achieve better performance!

Rendering agents

We can visualise the performance of the skill agents individually, or when combined by the selector agent, as a gif.

Rendering individual skills
import os

import gymnasium as gym
import imageio
import numpy as np
import torch
from PIL import Image

from agilerl.algorithms.ppo import PPO


# Resizes frames to make file size smaller
def resize_frames(frames, fraction):
    resized_frames = []
    for frame in frames:
        img = Image.fromarray(frame)
        new_width = int(img.width * fraction)
        new_height = int(img.height * fraction)
        img_resized = img.resize((new_width, new_height))
        resized_frames.append(np.array(img_resized))

    return resized_frames


if __name__ == "__main__":
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    env = gym.make("LunarLander-v2", render_mode="rgb_array")

    skills = ["stabilize", "center", "landing"]

    for skill in skills:
        # Load the saved algorithm into the PPO object
        path = f"./models/PPO/PPO_trained_agent_{skill}.pt"  # Path to saved agent checkpoint
        agent = PPO.load(path)

        # Define test loop parameters
        episodes = 3  # Number of episodes to test agent on
        max_steps = (
            500  # Max number of steps to take in the environment in each episode
        )

        rewards = []  # List to collect total episodic reward
        frames = []  # List to collect frames

        print("============================================")
        print(f"Skill: {skill}")

        # Test loop for inference
        for ep in range(episodes):
            state, _ = env.reset()  # Reset environment at start of episode
            frames.append(env.render())
            score = 0
            for idx_step in range(max_steps):
                # Get next action from agent
                if state[6] or state[7]:
                    action = [0]
                else:
                    action, log_prob, _, value = agent.getAction(state)
                next_state, reward, termination, truncation, _ = env.step(
                    action[0]
                )  # Act in environment

                # Save the frame for this step and append to frames list
                frames.append(env.render())

                score += reward

                # Stop episode if any agents have terminated
                if termination or truncation:
                    break

                state = next_state

            print("-" * 15, f"Episode: {ep+1}", "-" * 15)
            print(f"Episode length: {idx_step + 1}")
            print(f"Score: {score}")

        print("============================================")

        frames = frames[::2]

        # Save the gif to specified path
        gif_path = "./videos/"
        os.makedirs(gif_path, exist_ok=True)
        imageio.mimwrite(
            os.path.join("./videos/", f"LunarLander-v2_{skill}.gif"),
            frames,
            duration=40,
            loop=0,
        )

    env.close()
Rendering the hierarchical policy
import os

import gymnasium as gym
import imageio
import numpy as np
import torch
from PIL import Image, ImageDraw, ImageFont

from agilerl.algorithms.ppo import PPO


# Resizes frames to make file size smaller
def resize_frames(frames, fraction):
    resized_frames = []
    for frame in frames:
        img = Image.fromarray(frame)
        new_width = int(img.width * fraction)
        new_height = int(img.height * fraction)
        img_resized = img.resize((new_width, new_height))
        resized_frames.append(np.array(img_resized))

    return resized_frames


def add_text_to_image(
    image_array, text, position, font_size=30, font_color=(153, 255, 255)
):
    """Add text to an image represented as a numpy array.

    :param image_array: numpy array of the image.
    :param text: string of text to add.
    :param position: tuple (x, y) for the position of the text.
    :param font_size: size of the font. Default is 30.
    :param font_color: color of the font as an RGB tuple, as used by PIL. Default is light cyan (153, 255, 255).
    :return: Modified image as numpy array.
    """
    image = Image.fromarray(image_array)
    try:
        font = ImageFont.truetype("arial.ttf", font_size)
    except OSError:
        font = ImageFont.load_default()
    draw = ImageDraw.Draw(image)
    draw.text(position, text, font=font, fill=font_color)
    modified_image_array = np.array(image)

    return modified_image_array


if __name__ == "__main__":
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    env = gym.make("LunarLander-v2", render_mode="rgb_array")

    stabilize_agent = PPO.load("./models/PPO/PPO_trained_agent_stabilize.pt")
    center_agent = PPO.load("./models/PPO/PPO_trained_agent_center.pt")
    landing_agent = PPO.load("./models/PPO/PPO_trained_agent_landing.pt")

    trained_skills = {
        0: {"skill": "stabilize", "agent": stabilize_agent, "skill_duration": 40},
        1: {"skill": "center", "agent": center_agent, "skill_duration": 40},
        2: {"skill": "landing", "agent": landing_agent, "skill_duration": 40},
    }

    # Load the saved algorithm into the PPO object
    selector_path = (
        "./models/PPO/PPO_trained_agent_selector.pt"  # Path to saved agent checkpoint
    )
    agent = PPO.load(selector_path)

    # Define test loop parameters
    episodes = 3  # Number of episodes to test agent on
    max_steps = 100  # Max number of skill selections per episode (each runs for up to skill_duration steps)

    rewards = []  # List to collect total episodic reward
    frames = []  # List to collect frames

    print("============================================")
    print("Skill selector")

    # Test loop for inference
    for ep in range(episodes):
        state, _ = env.reset()  # Reset environment at start of episode
        frames.append(env.render())
        score = 0
        steps = 0
        for idx_step in range(max_steps):
            # Get next action from agent
            action, log_prob, _, value = agent.getAction(state)

            # Internal loop to execute trained skill
            skill_name = trained_skills[action[0]]["skill"]
            skill_agent = trained_skills[action[0]]["agent"]
            skill_duration = trained_skills[action[0]]["skill_duration"]
            reward = 0
            for skill_step in range(skill_duration):
                if state[6] or state[7]:
                    next_state, skill_reward, termination, truncation, _ = env.step(0)
                else:
                    skill_action, _, _, _ = skill_agent.getAction(state)
                    next_state, skill_reward, termination, truncation, _ = env.step(
                        skill_action[0]
                    )  # Act in environment

                # Save the frame for this step and append to frames list
                frame = env.render()
                frame = add_text_to_image(frame, skill_name, (450, 35))
                frames.append(frame)

                reward += skill_reward
                steps += 1
                if termination or truncation:
                    break
                state = next_state
            score += reward

            # Stop episode if any agents have terminated
            if termination or truncation:
                break

            state = next_state

        print("-" * 15, f"Episode: {ep+1}", "-" * 15)
        print(f"Episode length: {steps}")
        print(f"Score: {score}")

    print("============================================")

    # frames = resize_frames(frames, 0.5)
    frames = frames[::2]

    # Save the gif to specified path
    gif_path = "./videos/"
    os.makedirs(gif_path, exist_ok=True)
    imageio.mimwrite(
        os.path.join("./videos/", "LunarLander-v2_selector.gif"),
        frames,
        duration=40,
        loop=0,
    )

    env.close()

Full training code

Full code
import os
from datetime import datetime

import numpy as np
import torch
import wandb
from tqdm import trange

from agilerl.algorithms.ppo import PPO
from agilerl.training.train_on_policy import train_on_policy
from agilerl.utils.utils import initialPopulation, makeSkillVectEnvs, makeVectEnvs
from agilerl.wrappers.learning import Skill


class StabilizeSkill(Skill):
    def __init__(self, env):
        super().__init__(env)

        self.theta_level = 0
        self.history = {"x": [], "y": [], "theta": []}

    def skill_reward(self, observation, reward, terminated, truncated, info):
        if terminated or truncated:
            reward = -100.0
            self.history = {"x": [], "y": [], "theta": []}
            return observation, reward, terminated, truncated, info

        reward, terminated, truncated = 1.0, 0, 0
        x, y, theta = observation[0], observation[1], observation[4]

        # Ensure there are previous observations to compare with
        if len(self.history["x"]) == 0:
            self.history["x"].append(x)
            self.history["y"].append(y)
            self.history["theta"].append(theta)
            return observation, reward, terminated, truncated, info

        # Minimise x movement
        reward -= (abs(self.history["x"][-1] - x) * 10) ** 2
        # Minimise y movement
        reward -= (abs(self.history["y"][-1] - y) * 10) ** 2
        # Minimise change in tilt angle
        reward -= (abs(self.history["theta"][-1] - theta) * 10) ** 2

        self.history["x"].append(x)
        self.history["y"].append(y)
        self.history["theta"].append(theta)

        # Reset episode if longer than 300 steps
        if len(self.history["x"]) > 300:
            reward = 10.0
            terminated = True
            self.history = {"x": [], "y": [], "theta": []}
            self.env.reset()

        return observation, reward, terminated, truncated, info


class CenterSkill(Skill):
    def __init__(self, env):
        super().__init__(env)

        self.x_center = 0
        self.history = {"y": [], "theta": []}

    def skill_reward(self, observation, reward, terminated, truncated, info):
        if terminated or truncated:
            reward = -1000.0
            self.history = {"y": [], "theta": []}
            return observation, reward, terminated, truncated, info

        reward, terminated, truncated = 1.0, 0, 0
        x, y, theta = observation[0], observation[1], observation[4]

        # Ensure there are previous observations to compare with
        if len(self.history["y"]) == 0:
            self.history["y"].append(y)
            self.history["theta"].append(theta)
            return observation, reward, terminated, truncated, info

        # Minimise x distance to center
        reward -= abs((self.x_center - x) * 2) ** 2
        # Minimise y movement
        reward -= (abs(self.history["y"][-1] - y) * 10) ** 2
        # Minimise change in tilt angle
        reward -= (abs(self.history["theta"][-1] - theta) * 10) ** 2

        self.history["y"].append(y)
        self.history["theta"].append(theta)

        # Reset episode if longer than 300 steps
        if len(self.history["y"]) > 300:
            reward = 10.0
            terminated = True
            self.history = {"y": [], "theta": []}
            self.env.reset()

        return observation, reward, terminated, truncated, info


class LandingSkill(Skill):
    def __init__(self, env):
        super().__init__(env)

        self.x_landing = 0
        self.y_landing = 0
        self.theta_level = 0

    def skill_reward(self, observation, reward, terminated, truncated, info):
        if terminated or truncated:
            return observation, reward, terminated, truncated, info

        x, y, theta = observation[0], observation[1], observation[4]
        reward, terminated, truncated = 1.0, 0, 0

        # Minimise x distance to landing zone
        reward -= (abs(self.x_landing - x)) ** 2
        # Minimise y distance to landing zone
        reward -= (abs(self.y_landing - y)) ** 2
        # Minimise tilt angle
        reward -= abs(self.theta_level - theta)

        return observation, reward, terminated, truncated, info


if __name__ == "__main__":
    NET_CONFIG = {
        "arch": "mlp",  # Network architecture
        "hidden_size": [64, 64],  # Actor hidden size
    }

    INIT_HP = {
        "ENV_NAME": "LunarLander-v2",
        "ALGO": "PPO",
        "POPULATION_SIZE": 1,  # Population size
        "DISCRETE_ACTIONS": True,  # Discrete action space
        "BATCH_SIZE": 128,  # Batch size
        "LR": 1e-3,  # Learning rate
        "GAMMA": 0.99,  # Discount factor
        "GAE_LAMBDA": 0.95,  # Lambda for generalized advantage estimation
        "ACTION_STD_INIT": 0.6,  # Initial action standard deviation
        "CLIP_COEF": 0.2,  # Surrogate clipping coefficient
        "ENT_COEF": 0.01,  # Entropy coefficient
        "VF_COEF": 0.5,  # Value function coefficient
        "MAX_GRAD_NORM": 0.5,  # Maximum norm for gradient clipping
        "TARGET_KL": None,  # Target KL divergence threshold
        "TARGET_SCORE": 2000,
        "EPISODES": 1000,
        "EVO_EPOCHS": 5,
        "UPDATE_EPOCHS": 4,  # Number of policy update epochs
        # Swap image channels dimension from last to first [H, W, C] -> [C, H, W]
        "CHANNELS_LAST": False,
        "WANDB": True,
    }

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Directory to save trained agents and skills
    save_dir = "./models/PPO"
    os.makedirs(save_dir, exist_ok=True)

    skills = {
        "stabilize": StabilizeSkill,
        "center": CenterSkill,
        "landing": LandingSkill,
    }

    for skill in skills.keys():
        env = makeSkillVectEnvs(
            INIT_HP["ENV_NAME"], skills[skill], num_envs=1
        )  # Create environment

        try:
            state_dim = env.single_observation_space.n  # Discrete observation space
            one_hot = True  # Requires one-hot encoding
        except Exception:
            state_dim = (
                env.single_observation_space.shape
            )  # Continuous observation space
            one_hot = False  # Does not require one-hot encoding
        try:
            action_dim = env.single_action_space.n  # Discrete action space
        except Exception:
            action_dim = env.single_action_space.shape[0]  # Continuous action space

        if INIT_HP["CHANNELS_LAST"]:
            state_dim = (state_dim[2], state_dim[0], state_dim[1])

        pop = initialPopulation(
            algo="PPO",  # Algorithm
            state_dim=state_dim,  # State dimension
            action_dim=action_dim,  # Action dimension
            one_hot=one_hot,  # One-hot encoding
            net_config=NET_CONFIG,  # Network configuration
            INIT_HP=INIT_HP,  # Initial hyperparameters
            population_size=INIT_HP["POPULATION_SIZE"],  # Population size
            device=device,
        )

        trained_pop, pop_fitnesses = train_on_policy(
            env=env,  # Gym-style environment
            env_name=f"{INIT_HP['ENV_NAME']}-{skill}",  # Environment name
            algo=INIT_HP["ALGO"],  # Algorithm
            pop=pop,  # Population of agents
            swap_channels=INIT_HP[
                "CHANNELS_LAST"
            ],  # Swap image channel from last to first
            n_episodes=INIT_HP["EPISODES"],  # Max number of training episodes
            evo_epochs=INIT_HP["EVO_EPOCHS"],  # Evolution frequency
            evo_loop=3,  # Number of evaluation episodes per agent
            target=INIT_HP["TARGET_SCORE"],  # Target score for early stopping
            tournament=None,  # Tournament selection object (None: not used)
            mutation=None,  # Mutations object (None: not used)
            wb=INIT_HP["WANDB"],  # Weights and Biases tracking
        )

        # Save the trained algorithm
        filename = f"PPO_trained_agent_{skill}.pt"
        save_path = os.path.join(save_dir, filename)
        trained_pop[0].saveCheckpoint(save_path)

        env.close()

    # Now train the skill selector, which will choose which of the learned skills to use
    # First load the learned skill agents
    stabilize_agent = PPO.load(os.path.join(save_dir, "PPO_trained_agent_stabilize.pt"))
    center_agent = PPO.load(os.path.join(save_dir, "PPO_trained_agent_center.pt"))
    landing_agent = PPO.load(os.path.join(save_dir, "PPO_trained_agent_landing.pt"))

    trained_skills = {
        0: {"skill": "stabilize", "agent": stabilize_agent, "skill_duration": 40},
        1: {"skill": "center", "agent": center_agent, "skill_duration": 40},
        2: {"skill": "landing", "agent": landing_agent, "skill_duration": 40},
    }

    env = makeVectEnvs(INIT_HP["ENV_NAME"], num_envs=1)  # Create environment

    try:
        state_dim = env.single_observation_space.n  # Discrete observation space
        one_hot = True  # Requires one-hot encoding
    except Exception:
        state_dim = env.single_observation_space.shape  # Continuous observation space
        one_hot = False  # Does not require one-hot encoding

    action_dim = len(
        trained_skills
    )  # Selector will be trained to choose which trained skill to use

    if INIT_HP["CHANNELS_LAST"]:
        state_dim = (state_dim[2], state_dim[0], state_dim[1])

    pop = initialPopulation(
        algo="PPO",  # Algorithm
        state_dim=state_dim,  # State dimension
        action_dim=action_dim,  # Action dimension
        one_hot=one_hot,  # One-hot encoding
        net_config=NET_CONFIG,  # Network configuration
        INIT_HP=INIT_HP,  # Initial hyperparameters
        population_size=INIT_HP["POPULATION_SIZE"],  # Population size
        device=device,
    )

    if INIT_HP["WANDB"]:
        wandb.init(
            # set the wandb project where this run will be logged
            project="EvoWrappers",
            name="{}-EvoHPO-{}-{}".format(
                INIT_HP["ENV_NAME"],
                INIT_HP["ALGO"],
                datetime.now().strftime("%m%d%Y%H%M%S"),
            ),
            # track hyperparameters and run metadata
            config={
                "algo": f"Evo HPO {INIT_HP['ALGO']}",
                "env": INIT_HP["ENV_NAME"],
                "INIT_HP": INIT_HP,
            },
        )

    bar_format = "{l_bar}{bar:10}| {n:4}/{total_fmt} [{elapsed:>7}<{remaining:>7}, {rate_fmt}{postfix}]"
    pbar = trange(
        INIT_HP["EPISODES"],
        unit="ep",
        bar_format=bar_format,
        ascii=True,
        dynamic_ncols=True,
    )

    total_steps = 0

    # RL training loop
    for idx_epi in pbar:
        for agent in pop:  # Loop through population
            state = env.reset()[0]  # Reset environment at start of episode
            score = 0

            states = []
            actions = []
            log_probs = []
            rewards = []
            terminations = []
            values = []

            for idx_step in range(500):
                # Get the selector's next action, i.e. which skill to execute
                action, log_prob, _, value = agent.getAction(state)
                selector_state = state  # State in which the skill was selected

                # Internal loop to execute trained skill
                skill_agent = trained_skills[action[0]]["agent"]
                skill_duration = trained_skills[action[0]]["skill_duration"]
                reward = 0  # Reward accumulated while the skill runs
                for skill_step in range(skill_duration):
                    # If either leg is in contact with the ground (observation
                    # indices 6 and 7), take the no-op action
                    if state[0][6] or state[0][7]:
                        next_state, skill_reward, termination, truncation, _ = env.step(
                            [0]
                        )
                    else:
                        skill_action, _, _, _ = skill_agent.getAction(state)
                        next_state, skill_reward, termination, truncation, _ = env.step(
                            skill_action
                        )  # Act in environment
                    reward += skill_reward
                    state = next_state
                    if np.any(termination) or np.any(truncation):
                        break
                score += reward

                # Store the state the selector acted on, not the post-skill state
                states.append(selector_state)
                actions.append(action)
                log_probs.append(log_prob)
                rewards.append(reward)
                terminations.append(termination)
                values.append(value)

            agent.scores.append(score)

            # Learn according to agent's RL algorithm
            agent.learn(
                (
                    states,
                    actions,
                    log_probs,
                    rewards,
                    terminations,
                    values,
                    next_state,
                )
            )

            agent.steps[-1] += idx_step + 1
            total_steps += idx_step + 1

        # Periodically report each agent's mean score over its last 20 episodes
        if (idx_epi + 1) % INIT_HP["EVO_EPOCHS"] == 0:
            mean_scores = np.mean([agent.scores[-20:] for agent in pop], axis=1)
            if INIT_HP["WANDB"]:
                wandb.log(
                    {
                        "global_step": total_steps,
                        "train/mean_score": np.mean(mean_scores),
                    }
                )
            print(
                f"""
                --- Epoch {idx_epi + 1} ---
                Score avgs:\t{mean_scores}
                Steps:\t\t{total_steps}
                """,
                end="\r",
            )

    if INIT_HP["WANDB"]:
        wandb.finish()
    env.close()

    # Save the trained selector
    filename = "PPO_trained_agent_selector.pt"
    save_path = os.path.join(save_dir, filename)
    pop[0].saveCheckpoint(save_path)
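
The selector loop above mixes the hierarchical logic with AgileRL and Gymnasium details. The following is a minimal, self-contained sketch of the same control flow in plain Python. It is an illustration under stated assumptions, not the tutorial's implementation: ToyEnv, the hand-written skills and greedy_selector are hypothetical stand-ins for LunarLander, the trained PPO skill agents and the learned selector. It shows the two points that matter at the selector level: the reward for one selector action is the sum of the rewards collected while the chosen skill runs, and the stored state is the one in which the skill was chosen.

```python
class ToyEnv:
    """Hypothetical 1-D environment: the agent must drive position x to 0."""

    def __init__(self, start=10, max_steps=50):
        self.start = start
        self.max_steps = max_steps

    def reset(self):
        self.x = self.start
        self.t = 0
        return self.x

    def step(self, action):
        # action: 0 = stay, 1 = move left, 2 = move right
        self.x += {0: 0, 1: -1, 2: 1}[action]
        self.t += 1
        reward = -abs(self.x)  # Closer to 0 is better
        terminated = self.x == 0  # Goal reached
        truncated = self.t >= self.max_steps  # Step limit reached
        return self.x, reward, terminated, truncated


# Hypothetical hand-written skills; in the tutorial these are trained PPO agents
skills = {
    0: {"skill": "stay", "policy": lambda x: 0, "skill_duration": 3},
    1: {"skill": "left", "policy": lambda x: 1, "skill_duration": 3},
    2: {"skill": "right", "policy": lambda x: 2, "skill_duration": 3},
}


def run_skill(env, state, skill):
    """Run one skill for up to skill_duration primitive steps and sum the rewards."""
    total_reward = 0.0
    terminated = truncated = False
    for _ in range(skill["skill_duration"]):
        state, reward, terminated, truncated = env.step(skill["policy"](state))
        total_reward += reward
        if terminated or truncated:
            break
    return state, total_reward, terminated, truncated


def rollout(env, selector):
    """Collect selector-level transitions: (state, skill index, summed reward)."""
    state = env.reset()
    transitions = []
    done = False
    while not done:
        skill_idx = selector(state)  # Selector chooses a skill, not a primitive action
        next_state, reward, terminated, truncated = run_skill(
            env, state, skills[skill_idx]
        )
        # Record the state in which the skill was chosen
        transitions.append((state, skill_idx, reward))
        state = next_state
        done = terminated or truncated
    return transitions


def greedy_selector(x):
    """Stand-in for the learned selector: always move towards 0."""
    if x > 0:
        return 1
    if x < 0:
        return 2
    return 0


transitions = rollout(ToyEnv(start=10), greedy_selector)
for state, skill_idx, reward in transitions:
    print(state, skills[skill_idx]["skill"], reward)
```

Each printed row is one selector decision. The selector takes far fewer decisions than there are primitive environment steps, which is what lets a learned selector reason over skills rather than individual actions.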