Speaker-Listener with MATD3¶
This tutorial shows how to train an MATD3 agent on the simple speaker listener multi-particle environment.
Performance of trained MATD3 algorithm on 6 random episodes¶
What is MATD3?¶
MATD3 (Multi-Agent Twin Delayed Deep Deterministic Policy Gradients) extends the MADDPG (Multi-Agent Deep Deterministic Policy Gradients) algorithm to reduce overestimation bias in multi-agent domains through the use of a second set of critic networks and delayed updates of the policy networks. This enables superior performance when compared to MADDPG. For further information on MATD3, check out the documentation.
Compatible Action Spaces¶
| | | | |
|---|---|---|---|
| ❌ | ✔️ | ❌ | ❌ |
Code¶
Train multiple agents using MATD3¶
The following code should run without any issues. The comments are designed to help you understand how to use PettingZoo with AgileRL. If you have any questions, please feel free to ask in the Discord server.
"""This tutorial shows how to train an MATD3 agent on the simple speaker listener multi-particle environment.
Authors: Michael (https://github.com/mikepratt1), Nickua (https://github.com/nicku-a)
"""
import os
import numpy as np
import torch
from pettingzoo.mpe import simple_speaker_listener_v4
from agilerl.algorithms import MATD3
from agilerl.algorithms.core.registry import HyperparameterConfig, RLParameter
from agilerl.components.multi_agent_replay_buffer import MultiAgentReplayBuffer
from agilerl.hpo.mutation import Mutations
from agilerl.hpo.tournament import TournamentSelection
from agilerl.utils.utils import (
create_population,
default_progress_bar,
make_multi_agent_vect_envs,
)
if __name__ == "__main__":
    # Training entry point: builds a population of MATD3 agents, trains them on
    # vectorised speaker-listener environments with evolutionary HPO
    # (tournament selection + mutation), then checkpoints the elite agent.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print("===== AgileRL Online Multi-Agent Demo =====")

    # Define the network configuration
    NET_CONFIG = {
        "latent_dim": 64,
        "encoder_config": {
            "hidden_size": [64],  # Encoder hidden layer sizes
        },
        "head_config": {
            "hidden_size": [64],  # Head hidden layer sizes
        },
    }

    # Define the initial hyperparameters
    INIT_HP = {
        "POPULATION_SIZE": 4,
        "ALGO": "MATD3",  # Algorithm
        "BATCH_SIZE": 128,  # Batch size
        "O_U_NOISE": True,  # Ornstein Uhlenbeck action noise
        "EXPL_NOISE": 0.1,  # Action noise scale
        "MEAN_NOISE": 0.0,  # Mean action noise
        "THETA": 0.15,  # Rate of mean reversion in OU noise
        "DT": 0.01,  # Timestep for OU noise
        "LR_ACTOR": 0.0001,  # Actor learning rate
        "LR_CRITIC": 0.001,  # Critic learning rate
        "GAMMA": 0.95,  # Discount factor
        "MEMORY_SIZE": 100000,  # Max memory buffer size
        "LEARN_STEP": 100,  # Learning frequency
        "TAU": 0.01,  # For soft update of target parameters
        "POLICY_FREQ": 2,  # Policy update frequency (delayed policy updates)
    }

    num_envs = 8

    def make_env():
        """Create one continuous-action speaker-listener parallel environment."""
        return simple_speaker_listener_v4.parallel_env(continuous_actions=True)

    env = make_multi_agent_vect_envs(env=make_env, num_envs=num_envs)

    # Configure the multi-agent algo input arguments
    observation_spaces = [env.single_observation_space(agent) for agent in env.agents]
    action_spaces = [env.single_action_space(agent) for agent in env.agents]

    # Add the agent IDs to the initial hyperparameter dictionary
    INIT_HP["AGENT_IDS"] = env.agents

    # Mutation config for RL hyperparameters
    hp_config = HyperparameterConfig(
        lr_actor=RLParameter(min=1e-4, max=1e-2),
        lr_critic=RLParameter(min=1e-4, max=1e-2),
        batch_size=RLParameter(min=8, max=512, dtype=int),
        learn_step=RLParameter(
            min=20,
            max=200,
            dtype=int,
            grow_factor=1.5,
            shrink_factor=0.75,
        ),
    )

    # Create a population ready for evolutionary hyper-parameter optimisation
    pop: list[MATD3] = create_population(
        INIT_HP["ALGO"],
        observation_spaces,
        action_spaces,
        NET_CONFIG,
        INIT_HP,
        hp_config=hp_config,
        population_size=INIT_HP["POPULATION_SIZE"],
        num_envs=num_envs,
        device=device,
    )

    # Configure the multi-agent replay buffer (shared by the whole population)
    field_names = ["obs", "action", "reward", "next_obs", "done"]
    memory = MultiAgentReplayBuffer(
        INIT_HP["MEMORY_SIZE"],
        field_names=field_names,
        agent_ids=INIT_HP["AGENT_IDS"],
        device=device,
    )

    # Instantiate a tournament selection object (used for HPO)
    tournament = TournamentSelection(
        tournament_size=2,  # Tournament selection size
        elitism=True,  # Elitism in tournament selection
        population_size=INIT_HP["POPULATION_SIZE"],  # Population size
        eval_loop=1,  # Evaluate using last N fitness scores
    )

    # Instantiate a mutations object (used for HPO)
    mutations = Mutations(
        no_mutation=0.2,  # Probability of no mutation
        architecture=0.2,  # Probability of architecture mutation
        new_layer_prob=0.2,  # Probability of new layer mutation
        parameters=0.2,  # Probability of parameter mutation
        activation=0,  # Probability of activation function mutation
        rl_hp=0.2,  # Probability of RL hyperparameter mutation
        mutation_sd=0.1,  # Mutation strength
        rand_seed=1,
        device=device,
    )

    # Define training loop parameters
    max_steps = 2_000_000  # Max steps (default: 2000000)
    learning_delay = 0  # Steps before starting learning
    evo_steps = 10_000  # Evolution frequency
    eval_steps = None  # Evaluation steps per episode - go until done
    eval_loop = 1  # Number of evaluation episodes
    elite = pop[0]  # Assign a placeholder "elite" agent
    total_steps = 0

    # TRAINING LOOP
    print("Training...")
    pbar = default_progress_bar(max_steps)
    # Run generations until any agent in the population reaches max_steps
    while np.less([agent.steps[-1] for agent in pop], max_steps).all():
        pop_episode_scores = []
        for agent in pop:  # Loop through population
            agent.set_training_mode(True)
            obs, info = env.reset()  # Reset environment at start of episode
            scores = np.zeros(num_envs)  # Running score of each parallel env
            completed_episode_scores = []
            steps = 0

            # Each vectorised step advances num_envs environment steps
            for idx_step in range(evo_steps // num_envs):
                action, raw_action = agent.get_action(
                    obs=obs,
                    infos=info,
                )  # Predict action
                next_obs, reward, termination, truncation, info = env.step(
                    action,
                )  # Act in environment

                # Sum rewards over agents to get one score per environment
                scores += np.sum(np.array(list(reward.values())).transpose(), axis=-1)
                total_steps += num_envs
                steps += num_envs

                # Save experiences to replay buffer
                memory.save_to_memory(
                    obs,
                    raw_action,
                    reward,
                    next_obs,
                    termination,
                    is_vectorised=True,
                )

                # Learn according to learning frequency
                # Handle learn steps > num_envs
                if agent.learn_step > num_envs:
                    learn_step = agent.learn_step // num_envs
                    if (
                        idx_step % learn_step == 0
                        and len(memory) >= agent.batch_size
                        and memory.counter > learning_delay
                    ):
                        experiences = memory.sample(
                            agent.batch_size,
                        )  # Sample replay buffer
                        agent.learn(
                            experiences,
                        )  # Learn according to agent's RL algorithm
                # Handle num_envs > learn step; learn multiple times per step in env
                elif (
                    len(memory) >= agent.batch_size and memory.counter > learning_delay
                ):
                    for _ in range(num_envs // agent.learn_step):
                        experiences = memory.sample(
                            agent.batch_size,
                        )  # Sample replay buffer
                        agent.learn(
                            experiences,
                        )  # Learn according to agent's RL algorithm

                obs = next_obs

                # Calculate scores and reset noise for finished episodes
                reset_noise_indices = []
                term_array = np.array(list(termination.values())).transpose()
                trunc_array = np.array(list(truncation.values())).transpose()
                for idx, (d, t) in enumerate(
                    zip(term_array, trunc_array, strict=False),
                ):
                    # An env's episode is over once any agent terminates or truncates
                    if np.any(d) or np.any(t):
                        completed_episode_scores.append(scores[idx])
                        agent.scores.append(scores[idx])
                        scores[idx] = 0
                        reset_noise_indices.append(idx)
                agent.reset_action_noise(reset_noise_indices)

            # Each agent contributes 1/len(pop) of a generation's progress
            pbar.update(evo_steps // len(pop))

            agent.steps[-1] += steps
            pop_episode_scores.append(completed_episode_scores)

        # Evaluate population
        fitnesses = [
            agent.test(
                env,
                max_steps=eval_steps,
                loop=eval_loop,
            )
            for agent in pop
        ]
        mean_scores = [
            (
                np.mean(episode_scores)
                if len(episode_scores) > 0
                else "0 completed episodes"
            )
            for episode_scores in pop_episode_scores
        ]

        pbar.write(
            f"--- Global steps {total_steps} ---\n"
            f"Steps {[agent.steps[-1] for agent in pop]}\n"
            f"Scores: {mean_scores}\n"
            f"Fitnesses: {[f'{fitness:.2f}' for fitness in fitnesses]}\n"
            f"5 fitness avgs: {[f'{np.mean(agent.fitness[-5:]):.2f}' for agent in pop]}\n"
            f"Mutations: {[agent.mut for agent in pop]}",
        )

        # Tournament selection and population mutation
        elite, pop = tournament.select(pop)
        pop = mutations.mutation(pop)

        # Update step counter
        for agent in pop:
            agent.steps.append(agent.steps[-1])

    # Save the trained algorithm
    path = "./models/MATD3"
    filename = "MATD3_trained_agent.pt"
    os.makedirs(path, exist_ok=True)
    save_path = os.path.join(path, filename)
    elite.save_checkpoint(save_path)

    pbar.close()
    env.close()
Watch the trained agents play¶
The following code allows you to load your saved MATD3 algorithm from the previous training block, test the algorithm's performance, and then visualise a number of episodes as a gif.
import os
import imageio
import numpy as np
import torch
from pettingzoo.mpe import simple_speaker_listener_v4
from PIL import Image, ImageDraw
from agilerl.algorithms import MATD3
# Define function to return image
def _label_with_episode_number(frame, episode_num):
    """Return ``frame`` as a PIL image with an episode label drawn on it.

    Args:
        frame: Image array accepted by ``Image.fromarray`` (in this script,
            the value returned by ``env.render()``).
        episode_num: Zero-based episode index; the label shows
            ``episode_num + 1``.

    Returns:
        The PIL image built from ``frame`` with the text
        ``"Episode: <n>"`` drawn near the top-left corner.
    """
    im = Image.fromarray(frame)
    drawer = ImageDraw.Draw(im)

    # Pick a contrasting colour: white text on dark frames, black on light ones
    text_color = (255, 255, 255) if np.mean(frame) < 128 else (0, 0, 0)

    # Position the label relative to the image size
    drawer.text(
        (im.size[0] / 20, im.size[1] / 18),
        f"Episode: {episode_num + 1}",
        fill=text_color,
    )
    return im
if __name__ == "__main__":
    # Inference entry point: loads the saved MATD3 checkpoint, runs a number of
    # rendered episodes, prints per-episode rewards and writes the frames to a gif.
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Configure the environment
    env = simple_speaker_listener_v4.parallel_env(
        continuous_actions=True,
        render_mode="rgb_array",
    )
    env.reset()  # Reset before reading the agent list
    agent_ids = env.agents

    # Load the saved agent
    path = "./models/MATD3/MATD3_trained_agent.pt"
    matd3 = MATD3.load(path, device)

    # Define test loop parameters
    episodes = 10  # Number of episodes to test agent on
    max_steps = 25  # Max number of steps to take in the environment in each episode

    rewards = []  # List to collect total episodic reward
    frames = []  # List to collect frames
    indi_agent_rewards = {
        agent_id: [] for agent_id in agent_ids
    }  # Dictionary to collect individual agent rewards

    # Test loop for inference
    for ep in range(episodes):
        obs, info = env.reset()
        agent_reward = dict.fromkeys(agent_ids, 0)
        score = 0
        for _step in range(max_steps):
            # Get next action from agent (second return value is not needed here)
            action, _ = matd3.get_action(obs, infos=info)

            # Save the frame for this step and append to frames list
            frame = env.render()
            frames.append(_label_with_episode_number(frame, episode_num=ep))

            # Take action in environment
            obs, reward, termination, truncation, info = env.step(
                {agent: a.squeeze() for agent, a in action.items()},
            )

            # Save agent's reward for this step in this episode
            for agent_id, r in reward.items():
                agent_reward[agent_id] += r

            # Running total score for the episode across all agents
            score = sum(agent_reward.values())

            # Stop episode if any agent has terminated or been truncated
            if any(truncation.values()) or any(termination.values()):
                break

        rewards.append(score)

        # Record agent specific episodic reward
        for agent_id in agent_ids:
            indi_agent_rewards[agent_id].append(agent_reward[agent_id])

        print("-" * 15, f"Episode: {ep}", "-" * 15)
        print("Episodic Reward: ", rewards[-1])
        for agent_id, reward_list in indi_agent_rewards.items():
            print(f"{agent_id} reward: {reward_list[-1]}")

    env.close()

    # Save the gif to specified path
    gif_path = "./videos/"
    os.makedirs(gif_path, exist_ok=True)
    # NOTE(review): the unit of `duration` (seconds vs. milliseconds per frame)
    # depends on the installed imageio version/plugin - confirm before relying on it.
    imageio.mimwrite(
        os.path.join(gif_path, "speaker_listener.gif"),
        frames,
        duration=10,
    )