Distributed Training
AgileRL can also be used for distributed training if you have multiple devices you want to take advantage of. We use the HuggingFace Accelerate library to implement this in an open manner, without hiding behind too many layers of abstraction. This should make implementations simple, but also highly customisable, by continuing to expose the PyTorch training loop beneath it all.
To launch distributed training scripts in bash, use accelerate launch. To customise the distributed training properties, pass a configuration file with the --config_file argument. An example
config file has been provided at configs/accelerate/accelerate.yaml.
Putting this all together, launching a distributed training script can be done as follows:
accelerate launch --config_file configs/accelerate/accelerate.yaml demo_online_distributed.py
There are some key considerations to bear in mind when implementing a distributed training run:
- If you only want to execute something once, rather than repeating it for each process (e.g. printing a statement or logging to W&B), then guard it with if accelerator.is_main_process:
- Training happens in parallel on each device, meaning that steps in an RL environment happen on each device too. In order to count the number of global training steps taken, you must multiply the number of steps you have taken on a single device by the number of devices (assuming they are equal). If you want to use distributed training to train more quickly, and normally you would train for 100,000 steps on one device, you can now train for just 25,000 steps if using four devices.
Below is an example of a distributed training loop.
Example Distributed Training Loop
import os

import numpy as np
from accelerate import Accelerator
from torch.utils.data import DataLoader
from tqdm import trange

from agilerl.algorithms.core.registry import HyperparameterConfig, RLParameter
from agilerl.components.data import ReplayDataset
from agilerl.components.replay_buffer import ReplayBuffer
from agilerl.components.sampler import Sampler
from agilerl.hpo.mutation import Mutations
from agilerl.hpo.tournament import TournamentSelection
from agilerl.utils.utils import create_population, make_vect_envs, observation_space_channels_to_first
# Create the Accelerate handle used by every process of the distributed run.
accelerator = Accelerator()

# Synchronise all processes around the banner so it is printed exactly once,
# by the main process only.
accelerator.wait_for_everyone()
if accelerator.is_main_process:
    print("===== AgileRL Online Distributed Demo =====")
accelerator.wait_for_everyone()
# Network configuration: hidden layer sizes for the encoder and for the head.
NET_CONFIG = dict(
    encoder_config=dict(hidden_size=[32, 32]),  # Encoder hidden size
    head_config=dict(hidden_size=[32, 32]),  # Head hidden size
)
# Initial hyperparameters shared by every member of the population.
INIT_HP = dict(
    DOUBLE=True,  # Use double Q-learning in DQN or CQN
    BATCH_SIZE=128,  # Batch size
    LR=1e-3,  # Learning rate
    GAMMA=0.99,  # Discount factor
    LEARN_STEP=1,  # Learning frequency
    TAU=1e-3,  # For soft update of target network parameters
    # Swap image channels dimension last to first [H, W, C] -> [C, H, W]
    CHANNELS_LAST=False,
    POP_SIZE=4,  # Population size
)
# Create vectorized environment
num_envs = 8
env = make_vect_envs("LunarLander-v3", num_envs=num_envs)  # Create environment

# Spaces of a single (non-vectorized) environment, used to build the agents
observation_space = env.single_observation_space
action_space = env.single_action_space

# Only convert the observation space when CHANNELS_LAST is enabled in INIT_HP
if INIT_HP["CHANNELS_LAST"]:
    observation_space = observation_space_channels_to_first(observation_space)
# RL hyperparameter configuration for mutations.
# batch_size and learn_step must remain integers (learn_step is used inside
# range() in the training loop), so integer mutation is requested via dtype=int.
hp_config = HyperparameterConfig(
    lr=RLParameter(min=1e-4, max=1e-2),
    batch_size=RLParameter(min=8, max=64, dtype=int),
    learn_step=RLParameter(
        min=1, max=120, dtype=int, grow_factor=1.5, shrink_factor=0.75
    ),
)

# Build the population of agents; hp_config is passed in so the mutation
# ranges defined above are actually applied to the agents.
pop = create_population(
    algo="DQN",  # RL algorithm
    observation_space=observation_space,  # State dimension
    action_space=action_space,  # Action dimension
    net_config=NET_CONFIG,  # Network configuration
    INIT_HP=INIT_HP,  # Initial hyperparameters
    hp_config=hp_config,  # RL hyperparameter configuration
    population_size=INIT_HP["POP_SIZE"],  # Population size
    num_envs=num_envs,  # No. vectorized envs
    accelerator=accelerator,  # Accelerator
)
# Replay buffer holding up to 10000 entries, placed on this process's device
memory = ReplayBuffer(max_size=10000, device=accelerator.device)

# Wrap the buffer in a dataset that is configured with the initial batch size
replay_dataset = ReplayDataset(memory, INIT_HP["BATCH_SIZE"])

# batch_size=None turns off the DataLoader's own automatic batching; the
# loader is then handed to Accelerate so it is prepared for distributed use
replay_dataloader = accelerator.prepare(DataLoader(replay_dataset, batch_size=None))

# Sampler used by the training loop to draw experiences
sampler = Sampler(dataloader=replay_dataloader, dataset=replay_dataset)
# Tournament selection operator used to pick the next generation
tournament = TournamentSelection(
    population_size=INIT_HP["POP_SIZE"],  # Population size
    tournament_size=2,  # Tournament selection size
    elitism=True,  # Elitism in tournament selection
    eval_loop=1,  # Evaluate using last N fitness scores
)

# Mutation operator applied to the selected population
mutations = Mutations(
    accelerator=accelerator,  # Accelerator
    rand_seed=1,  # Random seed
    mutation_sd=0.1,  # Mutation strength
    no_mutation=0.4,  # No mutation
    architecture=0.2,  # Architecture mutation
    new_layer_prob=0.2,  # New layer mutation
    parameters=0.2,  # Network parameters mutation
    activation=0,  # Activation layer mutation
    rl_hp=0.2,  # Learning HP mutation
)
# Training schedule
max_steps = 200_000  # Max steps
learning_delay = 1_000  # Steps before starting learning
evo_steps = 10_000  # Evolution frequency

# Evaluation settings
eval_steps = None  # Evaluation steps per episode - go until done
eval_loop = 1  # Number of evaluation episodes

# Exploration params
eps_start = 1.0  # Max exploration
eps_end = 0.1  # Min exploration
eps_decay = 0.995  # Decay factor applied to epsilon
epsilon = eps_start

# Running count of environment steps taken by this process
total_steps = 0

# Directory used to exchange checkpoints between processes
accel_temp_models_path = "/".join(("models", "LunarLander-v3"))
# Only the main process creates the checkpoint directory.
if accelerator.is_main_process:
    # exist_ok=True makes this a no-op when the directory already exists and
    # avoids the separate exists()-then-create check.
    os.makedirs(accel_temp_models_path, exist_ok=True)

# Printed by every process, so each device in use is reported.
print(f"\nDistributed training on {accelerator.device}...")
# TRAINING LOOP
print("Training...")
# Progress bar is shown only on the local main process
pbar = trange(max_steps, unit="step", disable=not accelerator.is_local_main_process)

# Run generations for as long as every agent is still below max_steps
while np.less([agent.steps[-1] for agent in pop], max_steps).all():
    accelerator.wait_for_everyone()
    pop_episode_scores = []
    for agent in pop:  # Loop through population
        state, info = env.reset()  # Reset environment at start of episode
        scores = np.zeros(num_envs)  # Running score of each vectorized env
        completed_episode_scores = []
        steps = 0
        epsilon = eps_start

        # Every env.step() call advances num_envs environments, so
        # evo_steps // num_envs iterations amount to (at most) evo_steps
        # environment steps for this agent per generation.
        for idx_step in range(evo_steps // num_envs):
            # Get next action from agent
            action = agent.get_action(state, epsilon)
            # Decay epsilon for exploration, never going below eps_end
            epsilon = max(eps_end, epsilon * eps_decay)

            # Act in environment
            next_state, reward, terminated, truncated, info = env.step(action)
            scores += np.array(reward)
            steps += num_envs
            total_steps += num_envs

            # Collect scores for completed episodes
            for idx, (d, t) in enumerate(zip(terminated, truncated)):
                if d or t:
                    completed_episode_scores.append(scores[idx])
                    agent.scores.append(scores[idx])
                    scores[idx] = 0  # Start a fresh score for that env

            # Save experience to replay buffer
            memory.save_to_memory_vect_envs(
                state, action, reward, next_state, terminated
            )

            # Learn according to learning frequency
            if memory.counter > learning_delay and len(memory) >= agent.batch_size:
                for _ in range(num_envs // agent.learn_step):
                    # Sample dataloader
                    experiences = sampler.sample(agent.batch_size)
                    # Learn according to agent's RL algorithm
                    agent.learn(experiences)

            state = next_state

        pbar.update(evo_steps // len(pop))

        agent.steps[-1] += steps
        pop_episode_scores.append(completed_episode_scores)

    # Reset epsilon start to latest decayed value for next round of population training
    eps_start = epsilon

    # Evaluate population
    fitnesses = [
        agent.test(
            env,
            swap_channels=INIT_HP["CHANNELS_LAST"],
            max_steps=eval_steps,
            loop=eval_loop,
        )
        for agent in pop
    ]
    # Mean training score per agent, or a placeholder message when that agent
    # finished no episode during this generation
    mean_scores = [
        (
            np.mean(episode_scores)
            if len(episode_scores) > 0
            else "0 completed episodes"
        )
        for episode_scores in pop_episode_scores
    ]

    # Report once, from the main process only
    if accelerator.is_main_process:
        print(f"--- Global steps {total_steps} ---")
        print(f"Steps {[agent.steps[-1] for agent in pop]}")
        print(f"Scores: {mean_scores}")
        print(f'Fitnesses: {["%.2f" % fitness for fitness in fitnesses]}')
        print(
            f'5 fitness avgs: {["%.2f" % np.mean(agent.fitness[-5:]) for agent in pop]}'
        )

    # Tournament selection and population mutation
    accelerator.wait_for_everyone()
    for model in pop:
        model.unwrap_models()
    accelerator.wait_for_everyone()
    # The main process performs selection and mutation, then writes one
    # checkpoint per population member ...
    if accelerator.is_main_process:
        elite, pop = tournament.select(pop)
        pop = mutations.mutation(pop)
        for pop_i, model in enumerate(pop):
            model.save_checkpoint(f"{accel_temp_models_path}/DQN_{pop_i}.pt")
    accelerator.wait_for_everyone()
    # ... and every other process loads those checkpoints after the barrier
    if not accelerator.is_main_process:
        for pop_i, model in enumerate(pop):
            model.load_checkpoint(f"{accel_temp_models_path}/DQN_{pop_i}.pt")
    accelerator.wait_for_everyone()
    for model in pop:
        model.wrap_models()

    # Update step counter
    for agent in pop:
        agent.steps.append(agent.steps[-1])

pbar.close()
env.close()