Distributed Training

AgileRL can also be used for distributed training if you have multiple devices you want to take advantage of. We use the HuggingFace Accelerate library to implement this in an open manner, without hiding it behind too many layers of abstraction. This keeps implementations simple but highly customisable, by continuing to expose the PyTorch training loop beneath it all.
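
As a rough illustration of the underlying pattern, the sketch below shows a plain PyTorch loop with the model, optimizer and dataloader prepared by an Accelerator. The toy model and data are purely illustrative and not part of AgileRL.

from accelerate import Accelerator
import torch
from torch import nn, optim
from torch.utils.data import DataLoader, TensorDataset

accelerator = Accelerator()

# Toy model, optimizer and data, purely for illustration
model = nn.Linear(8, 1)
optimizer = optim.Adam(model.parameters(), lr=1e-3)
dataloader = DataLoader(TensorDataset(torch.randn(64, 8), torch.randn(64, 1)), batch_size=16)

# Accelerate wraps the objects for the current device and process
model, optimizer, dataloader = accelerator.prepare(model, optimizer, dataloader)

# The training loop itself is still ordinary PyTorch
for x, y in dataloader:
    optimizer.zero_grad()
    loss = nn.functional.mse_loss(model(x), y)
    accelerator.backward(loss)  # replaces loss.backward()
    optimizer.step()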

To launch distributed training scripts in bash, use accelerate launch. To customise the distributed training properties, pass a config file via the --config_file argument. An example config file is provided at configs/accelerate/accelerate.yaml.

Putting this all together, launching a distributed training script can be done as follows:

accelerate launch --config_file configs/accelerate/accelerate.yaml demo_online_distributed.py

There are some key considerations to bear in mind when implementing a distributed training run:
  • If you only want to execute something once, rather than repeating it for each process, e.g. printing a statement or logging to W&B, then wrap it in if accelerator.is_main_process: (see the sketch below).

  • Training happens in parallel on each device, meaning that steps in an RL environment are also taken on each device. To count the number of global training steps, multiply the number of steps taken on a single device by the number of devices (assuming each device takes an equal number). This also means distributed training lets you train more quickly: if you would normally train for 100,000 steps on one device, you can instead train for just 25,000 steps on each of four devices. Both points are illustrated in the sketch below.
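
Below is a minimal sketch of both points, assuming the script has been launched across multiple processes with accelerate launch; local_steps is a hypothetical counter standing in for the steps taken inside your own training loop.

from accelerate import Accelerator

accelerator = Accelerator()

local_steps = 25_000  # hypothetical: steps taken in this process's own training loop
global_steps = local_steps * accelerator.num_processes  # e.g. 25,000 steps x 4 devices = 100,000

if accelerator.is_main_process:
    # Runs once, rather than once per process, e.g. printing or logging to W&B
    print(f"Global training steps: {global_steps}")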

Example distributed training loop:

from agilerl.utils.utils import makeVectEnvs, initialPopulation
from agilerl.components.replay_buffer import ReplayBuffer
from agilerl.components.replay_data import ReplayDataset
from agilerl.components.sampler import Sampler
from agilerl.hpo.tournament import TournamentSelection
from agilerl.hpo.mutation import Mutations
from accelerate import Accelerator
import numpy as np
import os
from torch.utils.data import DataLoader
from tqdm import trange

if __name__ == '__main__':

    accelerator = Accelerator()

    NET_CONFIG = {
        'arch': 'mlp',       # Network architecture
        'hidden_size': [32, 32],  # Actor hidden size
    }

    INIT_HP = {
        'POPULATION_SIZE': 4,   # Population size
        'DOUBLE': True,         # Use double Q-learning in DQN or CQN
        'BATCH_SIZE': 128,      # Batch size
        'LR': 1e-3,             # Learning rate
        'GAMMA': 0.99,          # Discount factor
        'LEARN_STEP': 1,        # Learning frequency
        'TAU': 1e-3,            # For soft update of target network parameters
        'POLICY_FREQ': 2,       # DDPG target network update frequency vs policy network
        # Swap image channels dimension from last to first [H, W, C] -> [C, H, W]
        'CHANNELS_LAST': False
    }

    env = makeVectEnvs('LunarLander-v2', num_envs=8)   # Create environment
    try:
        state_dim = env.single_observation_space.n          # Discrete observation space
        one_hot = True                                      # Requires one-hot encoding
    except Exception:
        state_dim = env.single_observation_space.shape      # Continuous observation space
        one_hot = False                                     # Does not require one-hot encoding
    try:
        action_dim = env.single_action_space.n             # Discrete action space
    except Exception:
        action_dim = env.single_action_space.shape[0]      # Continuous action space

    if INIT_HP['CHANNELS_LAST']:
        state_dim = (state_dim[2], state_dim[0], state_dim[1])

    pop = initialPopulation(algo='DQN',                 # Algorithm
                            state_dim=state_dim,        # State dimension
                            action_dim=action_dim,      # Action dimension
                            one_hot=one_hot,            # One-hot encoding
                            net_config=NET_CONFIG,      # Network configuration
                            INIT_HP=INIT_HP,            # Initial hyperparameters
                            population_size=INIT_HP['POPULATION_SIZE'], # Population size
                            accelerator=accelerator)    # Accelerator

    field_names = ["state", "action", "reward", "next_state", "done"]
    memory = ReplayBuffer(action_dim=action_dim,      # Number of agent actions
                          memory_size=10000,          # Max replay buffer size
                          field_names=field_names)    # Field names to store in memory
    replay_dataset = ReplayDataset(memory, INIT_HP['BATCH_SIZE'])
    replay_dataloader = DataLoader(replay_dataset, batch_size=None)
    replay_dataloader = accelerator.prepare(replay_dataloader)
    sampler = Sampler(distributed=True,
                      dataset=replay_dataset,
                      dataloader=replay_dataloader)

    tournament = TournamentSelection(tournament_size=2,  # Tournament selection size
                                     elitism=True,        # Elitism in tournament selection
                                     population_size=INIT_HP['POPULATION_SIZE'],  # Population size
                                     evo_step=1)          # Evaluate using last N fitness scores

    mutations = Mutations(algo='DQN',                            # Algorithm
                          no_mutation=0.4,                       # No mutation
                          architecture=0.2,                      # Architecture mutation
                          new_layer_prob=0.2,                    # New layer mutation
                          parameters=0.2,                        # Network parameters mutation
                          activation=0,                          # Activation layer mutation
                          rl_hp=0.2,                             # Learning HP mutation
                          rl_hp_selection=['lr', 'batch_size'],  # Learning HPs to choose from
                          mutation_sd=0.1,                       # Mutation strength
                          arch=NET_CONFIG['arch'],               # Network architecture
                          rand_seed=1,                           # Random seed
                          accelerator=accelerator)               # Accelerator

    max_episodes = 1000 # Max training episodes
    max_steps = 500     # Max steps per episode

    # Exploration params
    eps_start = 1.0     # Max exploration
    eps_end = 0.1       # Min exploration
    eps_decay = 0.995   # Decay per episode
    epsilon = eps_start

    evo_epochs = 5      # Evolution frequency
    evo_loop = 1        # Number of evaluation episodes

    accel_temp_models_path = 'models/{}'.format('LunarLander-v2')
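    # Create the checkpoint directory on the main process only, to avoid a race between processes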
    if accelerator.is_main_process:
        if not os.path.exists(accel_temp_models_path):
            os.makedirs(accel_temp_models_path)

    print(f'\nDistributed training on {accelerator.device}...')

    # TRAINING LOOP
    for idx_epi in trange(max_episodes):
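        # Synchronise all processes at the start of each episode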
        accelerator.wait_for_everyone()
        for agent in pop:   # Loop through population
            state = env.reset()[0]  # Reset environment at start of episode
            score = 0
            for idx_step in range(max_steps):
                # Get next action from agent
                action = agent.getAction(state, epsilon)
                next_state, reward, done, _, _ = env.step(
                    action)   # Act in environment

                # Save experience to replay buffer
                memory.save2memoryVectEnvs(
                    state, action, reward, next_state, done)

                # Learn according to learning frequency
                if memory.counter % agent.learn_step == 0 and len(
                        memory) >= agent.batch_size:
                    # Sample dataloader
                    experiences = sampler.sample(agent.batch_size)
                    # Learn according to agent's RL algorithm
                    agent.learn(experiences)

                state = next_state
                score += reward

        # Update epsilon for exploration
        epsilon = max(eps_end, epsilon * eps_decay)

        # Now evolve population if necessary
        if (idx_epi + 1) % evo_epochs == 0:

            # Evaluate population
            fitnesses = [
                agent.test(
                    env,
                    swap_channels=False,
                    max_steps=max_steps,
                    loop=evo_loop) for agent in pop]

            if accelerator.is_main_process:
                print(f'Episode {idx_epi+1}/{max_episodes}')
                print(f'Fitnesses: {["%.2f"%fitness for fitness in fitnesses]}')
                print(f'100 fitness avgs: {["%.2f"%np.mean(agent.fitness[-100:]) for agent in pop]}')

            # Tournament selection and population mutation
            accelerator.wait_for_everyone()
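            # Unwrap models from the accelerator before selection and mutation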
            for model in pop:
                model.unwrap_models()
            accelerator.wait_for_everyone()
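            # Main process performs tournament selection and mutation, then saves checkpoints to share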
            if accelerator.is_main_process:
                elite, pop = tournament.select(pop)
                pop = mutations.mutation(pop)
                for pop_i, model in enumerate(pop):
                    model.saveCheckpoint(f'{accel_temp_models_path}/DQN_{pop_i}.pt')
            accelerator.wait_for_everyone()
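            # Other processes load the evolved agents saved by the main process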
            if not accelerator.is_main_process:
                for pop_i, model in enumerate(pop):
                    model.loadCheckpoint(f'{accel_temp_models_path}/DQN_{pop_i}.pt')
            accelerator.wait_for_everyone()
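            # Re-wrap models with the accelerator to continue distributed training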
            for model in pop:
                model.wrap_models()

    env.close()