Hierarchical Skills Tutorial¶
This tutorial provides an introductory guide to using AgileRL to learn skills and apply these in a hierarchical fashion. AgileRL’s single-agent algorithms allow users to train agents to perform specific skills, and then combine these in a learned order to achieve an outcome.
Hierarchical reinforcement learning can be used to learn to complete tasks efficiently by breaking down problems into smaller sub-tasks or ‘skills’. For example, training a robot hand to build a stack of blocks from scratch is very difficult - but by breaking this down into stages such as moving fingers, grasping a block, lifting the block and building a tower, we can guide agents to perform the overall task. We can then also allow an agent to learn the order in which to perform these skills to achieve the overall goal.
This tutorial uses hierarchical reinforcement learning to train agents to solve the LunarLander Gymnasium environment. The overall task is broken down into three skills which are each trained separately:
Stabilize the lander, minimizing movement in all directions
Move to the center of the environment in the x-direction
Land on the landing pad
A selector agent is also trained to select which skill to execute.
Code¶
The following code should run without any issues. The comments are designed to help you understand how to use Gymnasium environments with AgileRL. If you have any questions, please feel free to ask in the Discord server.
Imports¶
Importing the following packages, functions and classes will enable us to run the tutorial.
Imports
import os
from datetime import datetime
import numpy as np
import torch
import wandb
from tqdm import trange
from agilerl.algorithms.ppo import PPO
from agilerl.training.train_on_policy import train_on_policy
from agilerl.utils.utils import initialPopulation, makeSkillVectEnvs, makeVectEnvs
from agilerl.wrappers.learning import Skill
Defining skills¶
To define the skills to be learned by our agent, we modify the reward from our environment. This is a form of curriculum learning. For example, if we want the agent to learn the skill of moving to the center of the environment in the x-direction, then we can introduce a negative reward scaled by the distance from the lander to the center.
To define these skills, we can use the AgileRL Skill class, which acts as a wrapper around the environment. Only the skill_reward method needs to be overridden in order to encourage our agent to learn what we want. This method takes observation, reward, terminated, truncated and info as inputs and returns the same five values, with the reward (and, where needed, the termination flag) adjusted for the skill. We can also define variables in the class constructor. In this tutorial, for the LunarLander environment, we define three skills: StabilizeSkill, CenterSkill and LandingSkill.
Stabilize
class StabilizeSkill(Skill):
    """Skill wrapper whose reward encourages the lander to hold still.

    Every non-terminal step starts from a reward of 1.0, which is reduced by
    the squared, ten-times-scaled change in x, y and theta since the
    previously recorded step.
    """

    def __init__(self, env):
        super().__init__(env)
        self.theta_level = 0
        # Per-episode trace of the observation components being tracked.
        self.history = {"x": [], "y": [], "theta": []}

    def skill_reward(self, observation, reward, terminated, truncated, info):
        """Replace the environment reward with the stabilize-skill reward.

        :param observation: environment observation; indices 0, 1 and 4 are
            read as x, y and theta.
        :param reward: reward from the wrapped environment (always replaced).
        :param terminated: termination flag from the wrapped environment.
        :param truncated: truncation flag from the wrapped environment.
        :param info: info object, passed through unchanged.
        :return: ``(observation, reward, terminated, truncated, info)``.
        """
        if terminated or truncated:
            # The underlying episode ended: fixed penalty and a fresh trace.
            self.history = {"x": [], "y": [], "theta": []}
            return observation, -100.0, terminated, truncated, info

        current = {
            "x": observation[0],
            "y": observation[1],
            "theta": observation[4],
        }
        reward, terminated, truncated = 1.0, 0, 0

        # Movement can only be penalised once a previous step is on record.
        if len(self.history["x"]) > 0:
            for key in ("x", "y", "theta"):
                reward -= (abs(self.history[key][-1] - current[key]) * 10) ** 2

        for key, value in current.items():
            self.history[key].append(value)

        # More than 300 recorded steps: end the skill episode with a bonus.
        if len(self.history["x"]) > 300:
            reward = 10.0
            terminated = True
            self.history = {"x": [], "y": [], "theta": []}
            self.env.reset()

        return observation, reward, terminated, truncated, info
Center
class CenterSkill(Skill):
    """Skill wrapper whose reward pulls the lander towards ``x_center``.

    Every non-terminal step starts from a reward of 1.0. Once a previous step
    is on record, it is reduced by the squared, doubled x distance to the
    center and by the squared, ten-times-scaled change in y and theta.
    """

    def __init__(self, env):
        super().__init__(env)
        self.x_center = 0
        # Per-episode trace of the observation components being tracked.
        self.history = {"y": [], "theta": []}

    def skill_reward(self, observation, reward, terminated, truncated, info):
        """Replace the environment reward with the center-skill reward.

        :param observation: environment observation; indices 0, 1 and 4 are
            read as x, y and theta.
        :param reward: reward from the wrapped environment (always replaced).
        :param terminated: termination flag from the wrapped environment.
        :param truncated: truncation flag from the wrapped environment.
        :param info: info object, passed through unchanged.
        :return: ``(observation, reward, terminated, truncated, info)``.
        """
        if terminated or truncated:
            # The underlying episode ended: fixed penalty and a fresh trace.
            self.history = {"y": [], "theta": []}
            return observation, -1000.0, terminated, truncated, info

        x = observation[0]
        current = {"y": observation[1], "theta": observation[4]}
        reward, terminated, truncated = 1.0, 0, 0

        # Penalties apply only once a previous step is on record.
        if len(self.history["y"]) > 0:
            reward -= abs((self.x_center - x) * 2) ** 2
            for key in ("y", "theta"):
                reward -= (abs(self.history[key][-1] - current[key]) * 10) ** 2

        for key, value in current.items():
            self.history[key].append(value)

        # More than 300 recorded steps: end the skill episode with a bonus.
        if len(self.history["y"]) > 300:
            reward = 10.0
            terminated = True
            self.history = {"y": [], "theta": []}
            self.env.reset()

        return observation, reward, terminated, truncated, info
Landing
class LandingSkill(Skill):
    """Skill wrapper whose reward pulls the lander towards the landing zone.

    Terminal steps keep the wrapped environment's reward. Every other step is
    rewarded with 1.0 minus the squared x and y distances to the landing
    point and the absolute difference between theta and ``theta_level``.
    """

    def __init__(self, env):
        super().__init__(env)
        self.x_landing = 0
        self.y_landing = 0
        self.theta_level = 0

    def skill_reward(self, observation, reward, terminated, truncated, info):
        """Return the landing-skill reward for one environment step.

        :param observation: environment observation; indices 0, 1 and 4 are
            read as x, y and theta.
        :param reward: reward from the wrapped environment; kept only on
            terminal steps.
        :param terminated: termination flag from the wrapped environment.
        :param truncated: truncation flag from the wrapped environment.
        :param info: info object, passed through unchanged.
        :return: ``(observation, reward, terminated, truncated, info)``.
        """
        if terminated or truncated:
            # Leave the environment's own end-of-episode outcome untouched.
            return observation, reward, terminated, truncated, info

        x, y, theta = observation[0], observation[1], observation[4]

        shaped_reward = 1.0
        shaped_reward -= abs(self.x_landing - x) ** 2
        shaped_reward -= abs(self.y_landing - y) ** 2
        shaped_reward -= abs(self.theta_level - theta)

        return observation, shaped_reward, 0, 0, info
Training skills¶
Once the skills have been defined, training agents to solve them is very straightforward using AgileRL. In this tutorial we will train PPO
agents, but this is equally possible with any on- or off-policy single-agent algorithm.
Training skills individually
First define the initial hyperparameters and skill objects:
# Network used by every agent in this tutorial.
NET_CONFIG = {
    "arch": "mlp",  # Multi-layer perceptron
    "hidden_size": [64, 64],  # Actor hidden layer sizes
}

# Hyperparameters shared by the skill agents and the selector agent.
INIT_HP = {
    "ENV_NAME": "LunarLander-v2",
    "ALGO": "PPO",
    "POPULATION_SIZE": 1,  # Number of agents in the population
    "DISCRETE_ACTIONS": True,  # The action space is discrete
    "BATCH_SIZE": 128,  # Batch size
    "LR": 1e-3,  # Learning rate
    "GAMMA": 0.99,  # Discount factor
    "GAE_LAMBDA": 0.95,  # Lambda for general advantage estimation
    "ACTION_STD_INIT": 0.6,  # Initial action standard deviation
    "CLIP_COEF": 0.2,  # Surrogate clipping coefficient
    "ENT_COEF": 0.01,  # Entropy coefficient
    "VF_COEF": 0.5,  # Value function coefficient
    "MAX_GRAD_NORM": 0.5,  # Maximum norm for gradient clipping
    "TARGET_KL": None,  # Target KL divergence threshold
    "TARGET_SCORE": 2000,
    "EPISODES": 1000,
    "EVO_EPOCHS": 5,
    "UPDATE_EPOCHS": 4,  # Number of policy update epochs
    # Swap image channels dimension from last to first [H, W, C] -> [C, H, W]
    "CHANNELS_LAST": False,
    "WANDB": True,
}

# Use the GPU when one is available.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

# Trained agents and skills are written to (and later read from) this folder.
save_dir = "./models/PPO"
os.makedirs(save_dir, exist_ok=True)

# Skill name -> wrapper class that defines the skill's reward.
skills = {
    "stabilize": StabilizeSkill,
    "center": CenterSkill,
    "landing": LandingSkill,
}
Now loop through the skills and use the AgileRL training function to efficiently train for each one.
# Train one PPO agent per skill and save its checkpoint.
for skill, skill_wrapper in skills.items():
    # Vectorized environment wrapped with this skill's reward.
    env = makeSkillVectEnvs(INIT_HP["ENV_NAME"], skill_wrapper, num_envs=1)

    # A discrete observation space exposes `.n`; otherwise fall back to `.shape`.
    try:
        state_dim = env.single_observation_space.n
    except Exception:
        state_dim = env.single_observation_space.shape
        one_hot = False  # Does not require one-hot encoding
    else:
        one_hot = True  # Requires one-hot encoding

    # Same probe for the action space.
    try:
        action_dim = env.single_action_space.n
    except Exception:
        action_dim = env.single_action_space.shape[0]

    if INIT_HP["CHANNELS_LAST"]:
        # [H, W, C] -> [C, H, W]
        state_dim = (state_dim[2], state_dim[0], state_dim[1])

    pop = initialPopulation(
        algo="PPO",
        state_dim=state_dim,
        action_dim=action_dim,
        one_hot=one_hot,
        net_config=NET_CONFIG,
        INIT_HP=INIT_HP,
        population_size=INIT_HP["POPULATION_SIZE"],
        device=device,
    )

    trained_pop, pop_fitnesses = train_on_policy(
        env=env,
        env_name=f"{INIT_HP['ENV_NAME']}-{skill}",
        algo=INIT_HP["ALGO"],
        pop=pop,
        swap_channels=INIT_HP["CHANNELS_LAST"],
        n_episodes=INIT_HP["EPISODES"],  # Max number of training episodes
        evo_epochs=INIT_HP["EVO_EPOCHS"],  # Evolution frequency
        evo_loop=3,  # Number of evaluation episodes per agent
        target=INIT_HP["TARGET_SCORE"],  # Target score for early stopping
        tournament=None,  # No tournament selection
        mutation=None,  # No mutations
        wb=INIT_HP["WANDB"],  # Weights and Biases tracking
    )

    # Keep the first agent of the trained population as this skill's policy.
    trained_pop[0].saveCheckpoint(
        os.path.join(save_dir, f"PPO_trained_agent_{skill}.pt")
    )

    env.close()
The selector agent¶
Now the skills have been learned, we can train a hierarchical selector agent to decide which skill to execute. This meta-policy should optimise the original “meta-reward” of the environment, and so we no longer need to use a skill wrapper. Instead, we can load an agent for each skill, whose policy we can execute if called upon. It is also important to define how many timesteps each skill should be executed for, before we query the meta-policy again and decide which skill to use next. These skill-agents and skill durations can be defined in a dictionary.
Loading and defining skill agents
# The skill selector chooses which of the learned skills to run.
# Load the checkpoint saved for each skill first.
stabilize_agent, center_agent, landing_agent = (
    PPO.load(os.path.join(save_dir, f"PPO_trained_agent_{skill_name}.pt"))
    for skill_name in ("stabilize", "center", "landing")
)

# Selector action -> skill name, skill policy and how many environment steps
# the skill runs before the selector is queried again.
trained_skills = {
    0: {"skill": "stabilize", "agent": stabilize_agent, "skill_duration": 40},
    1: {"skill": "center", "agent": center_agent, "skill_duration": 40},
    2: {"skill": "landing", "agent": landing_agent, "skill_duration": 40},
}
Next we can define the variables we will need in our training loop.
Setting up training
# The selector is trained on the unwrapped environment reward.
env = makeVectEnvs(INIT_HP["ENV_NAME"], num_envs=1)

# A discrete observation space exposes `.n`; otherwise fall back to `.shape`.
try:
    state_dim = env.single_observation_space.n
except Exception:
    state_dim = env.single_observation_space.shape
    one_hot = False  # Does not require one-hot encoding
else:
    one_hot = True  # Requires one-hot encoding

# One selector action per trained skill.
action_dim = len(trained_skills)

if INIT_HP["CHANNELS_LAST"]:
    # [H, W, C] -> [C, H, W]
    state_dim = (state_dim[2], state_dim[0], state_dim[1])

pop = initialPopulation(
    algo="PPO",
    state_dim=state_dim,
    action_dim=action_dim,
    one_hot=one_hot,
    net_config=NET_CONFIG,
    INIT_HP=INIT_HP,
    population_size=INIT_HP["POPULATION_SIZE"],
    device=device,
)

if INIT_HP["WANDB"]:
    run_timestamp = datetime.now().strftime("%m%d%Y%H%M%S")
    wandb.init(
        # Project this run is logged under
        project="EvoWrappers",
        name=f"{INIT_HP['ENV_NAME']}-EvoHPO-{INIT_HP['ALGO']}-{run_timestamp}",
        # Hyperparameters and run metadata to track
        config={
            "algo": f"Evo HPO {INIT_HP['ALGO']}",
            "env": INIT_HP["ENV_NAME"],
            "INIT_HP": INIT_HP,
        },
    )

# Progress bar with one tick per training episode.
bar_format = "{l_bar}{bar:10}| {n:4}/{total_fmt} [{elapsed:>7}<{remaining:>7}, {rate_fmt}{postfix}]"
pbar = trange(
    INIT_HP["EPISODES"],
    unit="ep",
    bar_format=bar_format,
    ascii=True,
    dynamic_ncols=True,
)

total_steps = 0
Finally, we can run the training loop for the selector agent. Each skill agent’s policy is executed in the environment for the number of timesteps defined in the trained_skills
dictionary.
Training the selector agent
# RL training loop for the selector agent.
#
# One selector step = choose a skill, then run that skill's policy in the
# environment for up to `skill_duration` steps, summing the rewards.
for idx_epi in pbar:
    for agent in pop:  # Loop through population
        state = env.reset()[0]  # Reset environment at start of episode
        score = 0
        states = []
        actions = []
        log_probs = []
        rewards = []
        terminations = []
        values = []

        for idx_step in range(500):
            # Ask the selector which skill to run next.
            action, log_prob, _, value = agent.getAction(state)
            # Keep the observation the selector acted on: `state` is advanced
            # by the skill below, and the rollout must pair each action,
            # log-prob and value with the state they were computed from.
            selector_state = state

            chosen_skill = trained_skills[action[0]]
            skill_agent = chosen_skill["agent"]
            skill_duration = chosen_skill["skill_duration"]

            # Internal loop to execute the trained skill.
            reward = 0
            for skill_step in range(skill_duration):
                if state[0][6] or state[0][7]:
                    # If landed, do nothing
                    next_state, skill_reward, termination, truncation, _ = env.step(
                        [0]
                    )
                else:
                    skill_action, _, _, _ = skill_agent.getAction(state)
                    next_state, skill_reward, termination, truncation, _ = env.step(
                        skill_action
                    )  # Act in environment
                reward += skill_reward
                # Always move on to the newest observation, including on the
                # step that ends an episode, so the next selector decision is
                # not made from a stale state.
                # NOTE(review): assumes the vectorized env auto-resets and
                # returns the new episode's first observation - confirm.
                state = next_state
                if np.any(termination) or np.any(truncation):
                    break
            score += reward

            states.append(selector_state)
            actions.append(action)
            log_probs.append(log_prob)
            rewards.append(reward)
            terminations.append(termination)
            values.append(value)

        agent.scores.append(score)

        # Learn according to agent's RL algorithm
        agent.learn(
            (
                states,
                actions,
                log_probs,
                rewards,
                terminations,
                values,
                next_state,
            )
        )

        # Steps are counted in selector decisions, not environment steps.
        agent.steps[-1] += idx_step + 1
        total_steps += idx_step + 1

    if (idx_epi + 1) % INIT_HP["EVO_EPOCHS"] == 0:
        # Average of each agent's most recent (up to 20) episode scores.
        mean_scores = np.mean([agent.scores[-20:] for agent in pop], axis=1)
        if INIT_HP["WANDB"]:
            wandb.log(
                {
                    "global_step": total_steps,
                    "train/mean_score": np.mean(mean_scores),
                }
            )
        print(
            f"\n--- Epoch {idx_epi + 1} ---\n"
            f"Score avgs:\t{mean_scores}\n"
            f"Steps:\t\t{total_steps}\n",
            end="\r",
        )

if INIT_HP["WANDB"]:
    wandb.finish()
env.close()

# Save the trained selector
filename = "PPO_trained_agent_selector.pt"
save_path = os.path.join(save_dir, filename)
pop[0].saveCheckpoint(save_path)
Trained model weights¶
Trained model weights are provided in our GitHub repository at AgileRL/tutorials/Skills/models
. Take a look and see if you can achieve better performance!
Rendering agents¶
We can visualise the performance of the skills agents individually, or when combined by the selector agent, as a gif.
Rendering individual skills
import os
import gymnasium as gym
import imageio
import numpy as np
import torch
from PIL import Image
from agilerl.algorithms.ppo import PPO
# Resizes frames to make file size smaller
def resize_frames(frames, fraction):
    """Return copies of ``frames`` scaled by ``fraction`` in both dimensions.

    :param frames: iterable of image arrays accepted by ``Image.fromarray``.
    :param fraction: scale factor applied to each frame's width and height.
    :return: list of resized frames as numpy arrays.
    """

    def _scaled(frame):
        img = Image.fromarray(frame)
        target_size = (int(img.width * fraction), int(img.height * fraction))
        return np.array(img.resize(target_size))

    return [_scaled(frame) for frame in frames]
if __name__ == "__main__":
    # Render each trained skill agent on its own and save one gif per skill.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    env = gym.make("LunarLander-v2", render_mode="rgb_array")

    skills = ["stabilize", "center", "landing"]

    # Folder the gifs are written to
    gif_path = "./videos/"
    os.makedirs(gif_path, exist_ok=True)

    for skill in skills:
        # Load the saved algorithm into the PPO object
        path = f"./models/PPO/PPO_trained_agent_{skill}.pt"  # Path to saved agent checkpoint
        agent = PPO.load(path)

        # Define test loop parameters
        episodes = 3  # Number of episodes to test agent on
        max_steps = 500  # Max number of steps to take in the environment in each episode
        rewards = []  # Total reward of each episode
        frames = []  # Rendered frames for this skill's gif

        print("============================================")
        print(f"Skill: {skill}")

        # Test loop for inference
        for ep in range(episodes):
            state, _ = env.reset()  # Reset environment at start of episode
            frames.append(env.render())
            score = 0
            for idx_step in range(max_steps):
                if state[6] or state[7]:
                    # Observation flags 6/7 are set: take action 0 (no-op)
                    action = [0]
                else:
                    # Get next action from agent
                    action, _, _, _ = agent.getAction(state)
                next_state, reward, termination, truncation, _ = env.step(
                    action[0]
                )  # Act in environment

                # Save the frame for this step and append to frames list
                frames.append(env.render())

                score += reward

                # Stop episode once it has terminated or been truncated
                if termination or truncation:
                    break

                state = next_state

            rewards.append(score)

            print("-" * 15, f"Episode: {ep+1}", "-" * 15)
            # idx_step is a 0-based index, so the number of steps taken is +1
            print(f"Episode length: {idx_step + 1}")
            print(f"Score: {score}")

        print("============================================")

        # Keep every second frame to reduce the gif size
        frames = frames[::2]

        # Save the gif to specified path
        imageio.mimwrite(
            os.path.join(gif_path, f"LunarLander-v2_{skill}.gif"),
            frames,
            duration=40,
            loop=0,
        )

    env.close()
Rendering the hierarchical policy
import os
import gymnasium as gym
import imageio
import numpy as np
import torch
from PIL import Image, ImageDraw, ImageFont
from agilerl.algorithms.ppo import PPO
# Resizes frames to make file size smaller
def resize_frames(frames, fraction):
    """Return copies of ``frames`` scaled by ``fraction`` in both dimensions.

    :param frames: iterable of image arrays accepted by ``Image.fromarray``.
    :param fraction: scale factor applied to each frame's width and height.
    :return: list of resized frames as numpy arrays.
    """

    def _scaled(frame):
        img = Image.fromarray(frame)
        target_size = (int(img.width * fraction), int(img.height * fraction))
        return np.array(img.resize(target_size))

    return [_scaled(frame) for frame in frames]
def add_text_to_image(
    image_array, text, position, font_size=30, font_color=(153, 255, 255)
):
    """Add text to an image represented as a numpy array.

    :param image_array: numpy array of the image.
    :param text: string of text to add.
    :param position: tuple (x, y) for the position of the text.
    :param font_size: size of the font. Default is 30.
    :param font_color: fill colour passed to Pillow's ``ImageDraw.text``; for
        an RGB image this is an (R, G, B) tuple. Default is (153, 255, 255),
        a pale cyan.
    :return: Modified image as numpy array.
    """
    image = Image.fromarray(image_array)

    # Fall back to Pillow's built-in font when arial.ttf cannot be loaded;
    # font_size is not applied to the fallback font.
    try:
        font = ImageFont.truetype("arial.ttf", font_size)
    except OSError:
        font = ImageFont.load_default()

    draw = ImageDraw.Draw(image)
    draw.text(position, text, font=font, fill=font_color)

    modified_image_array = np.array(image)
    return modified_image_array
if __name__ == "__main__":
    # Render the hierarchical policy: the selector picks a skill, the skill's
    # policy acts, and every frame is labelled with the active skill's name.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    env = gym.make("LunarLander-v2", render_mode="rgb_array")

    # Trained skill policies
    stabilize_agent = PPO.load("./models/PPO/PPO_trained_agent_stabilize.pt")
    center_agent = PPO.load("./models/PPO/PPO_trained_agent_center.pt")
    landing_agent = PPO.load("./models/PPO/PPO_trained_agent_landing.pt")

    # Selector action -> skill name, policy and number of steps to run it for
    trained_skills = {
        0: {"skill": "stabilize", "agent": stabilize_agent, "skill_duration": 40},
        1: {"skill": "center", "agent": center_agent, "skill_duration": 40},
        2: {"skill": "landing", "agent": landing_agent, "skill_duration": 40},
    }

    # Trained selector policy
    selector_path = "./models/PPO/PPO_trained_agent_selector.pt"
    agent = PPO.load(selector_path)

    # Test loop parameters
    episodes = 3  # Number of episodes to test agent on
    max_steps = 100  # Max number of selector decisions in each episode
    rewards = []  # List to collect total episodic reward
    frames = []  # List to collect frames

    print("============================================")
    print("Skill selector")

    # Test loop for inference
    for ep in range(episodes):
        state, _ = env.reset()  # Reset environment at start of episode
        frames.append(env.render())
        score = 0
        steps = 0  # Environment steps taken this episode
        for idx_step in range(max_steps):
            # Ask the selector which skill to run
            action, log_prob, _, value = agent.getAction(state)
            selected = trained_skills[action[0]]
            skill_name = selected["skill"]
            skill_agent = selected["agent"]
            skill_duration = selected["skill_duration"]

            # Run the selected skill's policy in the environment
            reward = 0
            for skill_step in range(skill_duration):
                if state[6] or state[7]:
                    # Observation flags 6/7 are set: take action 0 (no-op)
                    env_action = 0
                else:
                    skill_action, _, _, _ = skill_agent.getAction(state)
                    env_action = skill_action[0]
                next_state, skill_reward, termination, truncation, _ = env.step(
                    env_action
                )

                # Label the rendered frame with the active skill
                frames.append(add_text_to_image(env.render(), skill_name, (450, 35)))

                reward += skill_reward
                steps += 1

                if termination or truncation:
                    break
                state = next_state
            score += reward

            # Stop the episode once the environment has finished
            if termination or truncation:
                break

        print("-" * 15, f"Episode: {ep+1}", "-" * 15)
        print(f"Episode length: {steps}")
        print(f"Score: {score}")

    print("============================================")

    # Optionally shrink the frames first with resize_frames(frames, 0.5).
    # Keep every second frame to reduce the gif size.
    frames = frames[::2]

    # Save the gif to specified path
    gif_path = "./videos/"
    os.makedirs(gif_path, exist_ok=True)
    imageio.mimwrite(
        os.path.join("./videos/", "LunarLander-v2_selector.gif"),
        frames,
        duration=40,
        loop=0,
    )

    env.close()
Full training code¶
Full code
import os
from datetime import datetime
import numpy as np
import torch
import wandb
from tqdm import trange
from agilerl.algorithms.ppo import PPO
from agilerl.training.train_on_policy import train_on_policy
from agilerl.utils.utils import initialPopulation, makeSkillVectEnvs, makeVectEnvs
from agilerl.wrappers.learning import Skill
class StabilizeSkill(Skill):
    """Skill wrapper whose reward encourages the lander to hold still.

    Every non-terminal step starts from a reward of 1.0, which is reduced by
    the squared, ten-times-scaled change in x, y and theta since the
    previously recorded step.
    """

    def __init__(self, env):
        super().__init__(env)
        self.theta_level = 0
        # Per-episode trace of the observation components being tracked.
        self.history = {"x": [], "y": [], "theta": []}

    def skill_reward(self, observation, reward, terminated, truncated, info):
        """Replace the environment reward with the stabilize-skill reward.

        :param observation: environment observation; indices 0, 1 and 4 are
            read as x, y and theta.
        :param reward: reward from the wrapped environment (always replaced).
        :param terminated: termination flag from the wrapped environment.
        :param truncated: truncation flag from the wrapped environment.
        :param info: info object, passed through unchanged.
        :return: ``(observation, reward, terminated, truncated, info)``.
        """
        if terminated or truncated:
            # The underlying episode ended: fixed penalty and a fresh trace.
            self.history = {"x": [], "y": [], "theta": []}
            return observation, -100.0, terminated, truncated, info

        current = {
            "x": observation[0],
            "y": observation[1],
            "theta": observation[4],
        }
        reward, terminated, truncated = 1.0, 0, 0

        # Movement can only be penalised once a previous step is on record.
        if len(self.history["x"]) > 0:
            for key in ("x", "y", "theta"):
                reward -= (abs(self.history[key][-1] - current[key]) * 10) ** 2

        for key, value in current.items():
            self.history[key].append(value)

        # More than 300 recorded steps: end the skill episode with a bonus.
        if len(self.history["x"]) > 300:
            reward = 10.0
            terminated = True
            self.history = {"x": [], "y": [], "theta": []}
            self.env.reset()

        return observation, reward, terminated, truncated, info
class CenterSkill(Skill):
    """Skill wrapper whose reward pulls the lander towards ``x_center``.

    Every non-terminal step starts from a reward of 1.0. Once a previous step
    is on record, it is reduced by the squared, doubled x distance to the
    center and by the squared, ten-times-scaled change in y and theta.
    """

    def __init__(self, env):
        super().__init__(env)
        self.x_center = 0
        # Per-episode trace of the observation components being tracked.
        self.history = {"y": [], "theta": []}

    def skill_reward(self, observation, reward, terminated, truncated, info):
        """Replace the environment reward with the center-skill reward.

        :param observation: environment observation; indices 0, 1 and 4 are
            read as x, y and theta.
        :param reward: reward from the wrapped environment (always replaced).
        :param terminated: termination flag from the wrapped environment.
        :param truncated: truncation flag from the wrapped environment.
        :param info: info object, passed through unchanged.
        :return: ``(observation, reward, terminated, truncated, info)``.
        """
        if terminated or truncated:
            # The underlying episode ended: fixed penalty and a fresh trace.
            self.history = {"y": [], "theta": []}
            return observation, -1000.0, terminated, truncated, info

        x = observation[0]
        current = {"y": observation[1], "theta": observation[4]}
        reward, terminated, truncated = 1.0, 0, 0

        # Penalties apply only once a previous step is on record.
        if len(self.history["y"]) > 0:
            reward -= abs((self.x_center - x) * 2) ** 2
            for key in ("y", "theta"):
                reward -= (abs(self.history[key][-1] - current[key]) * 10) ** 2

        for key, value in current.items():
            self.history[key].append(value)

        # More than 300 recorded steps: end the skill episode with a bonus.
        if len(self.history["y"]) > 300:
            reward = 10.0
            terminated = True
            self.history = {"y": [], "theta": []}
            self.env.reset()

        return observation, reward, terminated, truncated, info
class LandingSkill(Skill):
    """Skill wrapper whose reward pulls the lander towards the landing zone.

    Terminal steps keep the wrapped environment's reward. Every other step is
    rewarded with 1.0 minus the squared x and y distances to the landing
    point and the absolute difference between theta and ``theta_level``.
    """

    def __init__(self, env):
        super().__init__(env)
        self.x_landing = 0
        self.y_landing = 0
        self.theta_level = 0

    def skill_reward(self, observation, reward, terminated, truncated, info):
        """Return the landing-skill reward for one environment step.

        :param observation: environment observation; indices 0, 1 and 4 are
            read as x, y and theta.
        :param reward: reward from the wrapped environment; kept only on
            terminal steps.
        :param terminated: termination flag from the wrapped environment.
        :param truncated: truncation flag from the wrapped environment.
        :param info: info object, passed through unchanged.
        :return: ``(observation, reward, terminated, truncated, info)``.
        """
        if terminated or truncated:
            # Leave the environment's own end-of-episode outcome untouched.
            return observation, reward, terminated, truncated, info

        x, y, theta = observation[0], observation[1], observation[4]

        shaped_reward = 1.0
        shaped_reward -= abs(self.x_landing - x) ** 2
        shaped_reward -= abs(self.y_landing - y) ** 2
        shaped_reward -= abs(self.theta_level - theta)

        return observation, shaped_reward, 0, 0, info
if __name__ == "__main__":
    # Full training script: train one PPO agent per skill, then train a PPO
    # selector agent that chooses which trained skill to execute.

    # Network used by every agent in this tutorial.
    NET_CONFIG = {
        "arch": "mlp",  # Network architecture
        "hidden_size": [64, 64],  # Actor hidden size
    }

    # Hyperparameters shared by the skill agents and the selector agent.
    INIT_HP = {
        "ENV_NAME": "LunarLander-v2",
        "ALGO": "PPO",
        "POPULATION_SIZE": 1,  # Population size
        "DISCRETE_ACTIONS": True,  # Discrete action space
        "BATCH_SIZE": 128,  # Batch size
        "LR": 1e-3,  # Learning rate
        "GAMMA": 0.99,  # Discount factor
        "GAE_LAMBDA": 0.95,  # Lambda for general advantage estimation
        "ACTION_STD_INIT": 0.6,  # Initial action standard deviation
        "CLIP_COEF": 0.2,  # Surrogate clipping coefficient
        "ENT_COEF": 0.01,  # Entropy coefficient
        "VF_COEF": 0.5,  # Value function coefficient
        "MAX_GRAD_NORM": 0.5,  # Maximum norm for gradient clipping
        "TARGET_KL": None,  # Target KL divergence threshold
        "TARGET_SCORE": 2000,
        "EPISODES": 1000,
        "EVO_EPOCHS": 5,
        "UPDATE_EPOCHS": 4,  # Number of policy update epochs
        # Swap image channels dimension from last to first [H, W, C] -> [C, H, W]
        "CHANNELS_LAST": False,
        "WANDB": True,
    }

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Directory to save trained agents and skills
    save_dir = "./models/PPO"
    os.makedirs(save_dir, exist_ok=True)

    # Skill name -> wrapper class that defines the skill's reward.
    skills = {
        "stabilize": StabilizeSkill,
        "center": CenterSkill,
        "landing": LandingSkill,
    }

    # ---- Stage 1: train one agent per skill ----
    for skill in skills.keys():
        env = makeSkillVectEnvs(
            INIT_HP["ENV_NAME"], skills[skill], num_envs=1
        )  # Create environment

        try:
            state_dim = env.single_observation_space.n  # Discrete observation space
            one_hot = True  # Requires one-hot encoding
        except Exception:
            state_dim = (
                env.single_observation_space.shape
            )  # Continuous observation space
            one_hot = False  # Does not require one-hot encoding
        try:
            action_dim = env.single_action_space.n  # Discrete action space
        except Exception:
            action_dim = env.single_action_space.shape[0]  # Continuous action space

        if INIT_HP["CHANNELS_LAST"]:
            state_dim = (state_dim[2], state_dim[0], state_dim[1])

        pop = initialPopulation(
            algo="PPO",  # Algorithm
            state_dim=state_dim,  # State dimension
            action_dim=action_dim,  # Action dimension
            one_hot=one_hot,  # One-hot encoding
            net_config=NET_CONFIG,  # Network configuration
            INIT_HP=INIT_HP,  # Initial hyperparameters
            population_size=INIT_HP["POPULATION_SIZE"],  # Population size
            device=device,
        )

        trained_pop, pop_fitnesses = train_on_policy(
            env=env,  # Gym-style environment
            env_name=f"{INIT_HP['ENV_NAME']}-{skill}",  # Environment name
            algo=INIT_HP["ALGO"],  # Algorithm
            pop=pop,  # Population of agents
            swap_channels=INIT_HP[
                "CHANNELS_LAST"
            ],  # Swap image channel from last to first
            n_episodes=INIT_HP["EPISODES"],  # Max number of training episodes
            evo_epochs=INIT_HP["EVO_EPOCHS"],  # Evolution frequency
            evo_loop=3,  # Number of evaluation episodes per agent
            target=INIT_HP["TARGET_SCORE"],  # Target score for early stopping
            tournament=None,  # Tournament selection object
            mutation=None,  # Mutations object
            wb=INIT_HP["WANDB"],  # Weights and Biases tracking
        )

        # Save the trained algorithm
        filename = f"PPO_trained_agent_{skill}.pt"
        save_path = os.path.join(save_dir, filename)
        trained_pop[0].saveCheckpoint(save_path)

        env.close()

    # ---- Stage 2: train the skill selector ----
    # Now train the skill selector, which will choose which of the learned skills to use
    # First load the learned skill agents
    stabilize_agent = PPO.load(os.path.join(save_dir, "PPO_trained_agent_stabilize.pt"))
    center_agent = PPO.load(os.path.join(save_dir, "PPO_trained_agent_center.pt"))
    landing_agent = PPO.load(os.path.join(save_dir, "PPO_trained_agent_landing.pt"))

    # Selector action -> skill name, skill policy and how many environment
    # steps the skill runs before the selector is queried again.
    trained_skills = {
        0: {"skill": "stabilize", "agent": stabilize_agent, "skill_duration": 40},
        1: {"skill": "center", "agent": center_agent, "skill_duration": 40},
        2: {"skill": "landing", "agent": landing_agent, "skill_duration": 40},
    }

    env = makeVectEnvs(INIT_HP["ENV_NAME"], num_envs=1)  # Create environment

    try:
        state_dim = env.single_observation_space.n  # Discrete observation space
        one_hot = True  # Requires one-hot encoding
    except Exception:
        state_dim = env.single_observation_space.shape  # Continuous observation space
        one_hot = False  # Does not require one-hot encoding

    action_dim = len(
        trained_skills
    )  # Selector will be trained to choose which trained skill to use

    if INIT_HP["CHANNELS_LAST"]:
        state_dim = (state_dim[2], state_dim[0], state_dim[1])

    pop = initialPopulation(
        algo="PPO",  # Algorithm
        state_dim=state_dim,  # State dimension
        action_dim=action_dim,  # Action dimension
        one_hot=one_hot,  # One-hot encoding
        net_config=NET_CONFIG,  # Network configuration
        INIT_HP=INIT_HP,  # Initial hyperparameters
        population_size=INIT_HP["POPULATION_SIZE"],  # Population size
        device=device,
    )

    if INIT_HP["WANDB"]:
        wandb.init(
            # set the wandb project where this run will be logged
            project="EvoWrappers",
            name="{}-EvoHPO-{}-{}".format(
                INIT_HP["ENV_NAME"],
                INIT_HP["ALGO"],
                datetime.now().strftime("%m%d%Y%H%M%S"),
            ),
            # track hyperparameters and run metadata
            config={
                "algo": f"Evo HPO {INIT_HP['ALGO']}",
                "env": INIT_HP["ENV_NAME"],
                "INIT_HP": INIT_HP,
            },
        )

    bar_format = "{l_bar}{bar:10}| {n:4}/{total_fmt} [{elapsed:>7}<{remaining:>7}, {rate_fmt}{postfix}]"
    pbar = trange(
        INIT_HP["EPISODES"],
        unit="ep",
        bar_format=bar_format,
        ascii=True,
        dynamic_ncols=True,
    )

    total_steps = 0

    # RL training loop for the selector agent.
    #
    # One selector step = choose a skill, then run that skill's policy in the
    # environment for up to `skill_duration` steps, summing the rewards.
    for idx_epi in pbar:
        for agent in pop:  # Loop through population
            state = env.reset()[0]  # Reset environment at start of episode
            score = 0
            states = []
            actions = []
            log_probs = []
            rewards = []
            terminations = []
            values = []

            for idx_step in range(500):
                # Ask the selector which skill to run next.
                action, log_prob, _, value = agent.getAction(state)
                # Keep the observation the selector acted on: `state` is
                # advanced by the skill below, and the rollout must pair each
                # action, log-prob and value with the state they came from.
                selector_state = state

                chosen_skill = trained_skills[action[0]]
                skill_agent = chosen_skill["agent"]
                skill_duration = chosen_skill["skill_duration"]

                # Internal loop to execute the trained skill.
                reward = 0
                for skill_step in range(skill_duration):
                    if state[0][6] or state[0][7]:
                        # If landed, do nothing
                        next_state, skill_reward, termination, truncation, _ = env.step(
                            [0]
                        )
                    else:
                        skill_action, _, _, _ = skill_agent.getAction(state)
                        next_state, skill_reward, termination, truncation, _ = env.step(
                            skill_action
                        )  # Act in environment
                    reward += skill_reward
                    # Always move on to the newest observation, including on
                    # the step that ends an episode, so the next selector
                    # decision is not made from a stale state.
                    # NOTE(review): assumes the vectorized env auto-resets and
                    # returns the new episode's first observation - confirm.
                    state = next_state
                    if np.any(termination) or np.any(truncation):
                        break
                score += reward

                states.append(selector_state)
                actions.append(action)
                log_probs.append(log_prob)
                rewards.append(reward)
                terminations.append(termination)
                values.append(value)

            agent.scores.append(score)

            # Learn according to agent's RL algorithm
            agent.learn(
                (
                    states,
                    actions,
                    log_probs,
                    rewards,
                    terminations,
                    values,
                    next_state,
                )
            )

            # Steps are counted in selector decisions, not environment steps.
            agent.steps[-1] += idx_step + 1
            total_steps += idx_step + 1

        if (idx_epi + 1) % INIT_HP["EVO_EPOCHS"] == 0:
            # Average of each agent's most recent (up to 20) episode scores.
            mean_scores = np.mean([agent.scores[-20:] for agent in pop], axis=1)
            if INIT_HP["WANDB"]:
                wandb.log(
                    {
                        "global_step": total_steps,
                        "train/mean_score": np.mean(mean_scores),
                    }
                )
            print(
                f"\n--- Epoch {idx_epi + 1} ---\n"
                f"Score avgs:\t{mean_scores}\n"
                f"Steps:\t\t{total_steps}\n",
                end="\r",
            )

    if INIT_HP["WANDB"]:
        wandb.finish()
    env.close()

    # Save the trained selector
    filename = "PPO_trained_agent_selector.pt"
    save_path = os.path.join(save_dir, filename)
    pop[0].saveCheckpoint(save_path)