On-Policy Rollout Buffer

On-policy RL algorithms like PPO and A2C require collecting experiences from the current policy and computing advantages before updating the policy. Unlike off-policy algorithms that can reuse old experiences, on-policy methods need fresh data from each policy iteration.

The rollout buffer is designed to efficiently collect experiences from vectorized environments, compute Generalized Advantage Estimation (GAE), and provide properly formatted batches for policy updates. After each training iteration, the buffer is typically reset since the experiences are no longer valid for the updated policy.

During environment interaction, call RolloutBuffer.add() to store transitions. Once the rollout is complete, call RolloutBuffer.compute_returns_and_advantages() to compute the advantages and returns. Finally, call RolloutBuffer.get_tensor_batch() to retrieve the data, optionally sampled into a minibatch, for policy optimization.

from agilerl.components.rollout_buffer import RolloutBuffer

# `env` and `device` are assumed to be defined elsewhere.
buffer = RolloutBuffer(
    capacity=2048,  # Number of steps to collect per environment
    observation_space=env.observation_space,
    action_space=env.action_space,
    num_envs=8,  # Number of parallel environments
    device=device,
    gae_lambda=0.95,  # GAE lambda parameter
    gamma=0.99,  # Discount factor
)

The RolloutBuffer can also collect experiences for POMDPs and sample sequences for truncated backpropagation through time (BPTT). To enable this, set recurrent=True and pass a hidden_state_architecture describing the hidden states to store.

from agilerl.typing import BPTTSequenceType

buffer = RolloutBuffer(
    capacity=2048,  # Number of steps to collect per environment
    observation_space=env.observation_space,
    action_space=env.action_space,
    num_envs=8,  # Number of parallel environments
    device=device,
    gae_lambda=0.95,  # GAE lambda parameter
    gamma=0.99,  # Discount factor
    recurrent=True,
    hidden_state_architecture=hidden_state_architecture,
    bptt_sequence_type=BPTTSequenceType.MAXIMUM,
)

Note

The bptt_sequence_type parameter determines how sequences are sampled from the buffer:
  • BPTTSequenceType.CHUNKED: Samples as many unique sequences of length max_seq_len as possible from the buffer.

  • BPTTSequenceType.MAXIMUM: Samples as many (overlapping) sequences of length max_seq_len as possible from the buffer.

  • BPTTSequenceType.FIFTY_PERCENT_OVERLAPPING: Samples as many (overlapping) sequences of length max_seq_len as possible from the buffer, with up to 50% overlap between sequences.
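
One way to picture the three sampling modes is as sliding windows that differ only in their stride. The sketch below is illustrative and does not reproduce the library's implementation: the helper sequence_starts and the stride chosen for each mode are assumptions made for this example.

```python
def sequence_starts(num_steps: int, max_seq_len: int, stride: int) -> list[int]:
    """Start indices of every full window of `max_seq_len` steps (hypothetical helper)."""
    if max_seq_len <= 0 or stride <= 0:
        raise ValueError("max_seq_len and stride must be positive")
    return list(range(0, num_steps - max_seq_len + 1, stride))


num_steps, max_seq_len = 8, 4

# Assumed strides, chosen only to illustrate each mode:
# CHUNKED: windows never share a step.
chunked = sequence_starts(num_steps, max_seq_len, stride=max_seq_len)
# MAXIMUM: every possible window, shifted one step at a time.
maximum = sequence_starts(num_steps, max_seq_len, stride=1)
# FIFTY_PERCENT_OVERLAPPING: each window shares half its steps with the previous one.
half_overlap = sequence_starts(num_steps, max_seq_len, stride=max(1, max_seq_len // 2))

print(chunked)       # [0, 4]
print(maximum)       # [0, 1, 2, 3, 4]
print(half_overlap)  # [0, 2, 4]
```

More overlap yields more training sequences from the same rollout, at the cost of more correlated samples and more computation per update.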

Parameters

class agilerl.components.rollout_buffer.RolloutBuffer(capacity: int, observation_space: Space, action_space: Space, num_envs: int = 1, device: str = 'cpu', gae_lambda: float = 0.95, gamma: float = 0.99, recurrent: bool = False, hidden_state_architecture: dict[str, tuple[int, int, int]] | None = None, use_gae: bool = True, wrap_at_capacity: bool = False, max_seq_len: int | None = None, bptt_sequence_type: BPTTSequenceType = BPTTSequenceType.CHUNKED)

Rollout buffer for collecting experiences and computing advantages for RL algorithms. This buffer is designed to handle vectorized environments efficiently.

Parameters:
  • capacity (int) – Maximum number of timesteps to store in the buffer (per environment).

  • observation_space (gym.spaces.Space) – Observation space of the environment.

  • action_space (gym.spaces.Space) – Action space of the environment.

  • num_envs (int) – Number of parallel environments.

  • device (str, optional) – Device to store tensors on, defaults to “cpu”.

  • gae_lambda (float, optional) – Lambda parameter for GAE, defaults to 0.95.

  • gamma (float, optional) – Discount factor, defaults to 0.99.

  • recurrent (bool, optional) – Whether to store hidden states, defaults to False.

  • hidden_state_architecture (dict[str, tuple[int, int, int]], optional) – Architecture of hidden states if used, defaults to None.

  • use_gae (bool, optional) – Whether to compute GAE advantages, defaults to True.

  • wrap_at_capacity (bool, optional) – Whether to wrap the buffer at capacity, defaults to False. This is mainly useful for off-policy algorithms; on-policy algorithms should leave it as False in most cases.

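  • max_seq_len (int, optional) – Maximum sequence length for BPTT, defaults to None.

  • bptt_sequence_type (BPTTSequenceType, optional) – How sequences are sampled from the buffer for BPTT, defaults to BPTTSequenceType.CHUNKED.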

add(obs: ndarray | dict[str, ndarray] | tuple[ndarray, ...] | Tensor | TensorDict | tuple[Tensor, ...] | dict[str, Tensor] | Number | list[ReasoningPrompts] | ReasoningPrompts, action: ndarray | Tensor, reward: float | ndarray, done: bool | ndarray, value: float | ndarray, log_prob: float | ndarray, next_obs: ndarray | dict[str, ndarray] | tuple[ndarray, ...] | Tensor | TensorDict | tuple[Tensor, ...] | dict[str, Tensor] | Number | list[ReasoningPrompts] | ReasoningPrompts | None = None, hidden_state: dict[str, ndarray | Tensor] | None = None, next_hidden_state: dict[str, ndarray | Tensor] | None = None, episode_start: bool | ndarray | None = None, action_mask: ndarray | Tensor | None = None) None

Add a new batch of observations and associated data from vectorized environments to the buffer.

Parameters:
  • obs (ObservationType) – Current observation batch (shape: (num_envs, *obs_shape))

  • action (ArrayOrTensor) – Action batch taken (shape: (num_envs, *action_shape))

  • reward (float | np.ndarray) – Reward batch received (shape: (num_envs,))

  • done (bool | np.ndarray) – Done flag batch (shape: (num_envs,))

  • value (float | np.ndarray) – Value estimate batch (shape: (num_envs,))

  • log_prob (float | np.ndarray) – Log probability batch of the actions (shape: (num_envs,))

  • next_obs (ObservationType | None) – Next observation batch (shape: (num_envs, *obs_shape)), defaults to None

  • hidden_state (dict[str, ArrayOrTensor] | None) – Current hidden state batch (shape: (num_envs, hidden_size)), defaults to None

  • next_hidden_state (dict[str, ArrayOrTensor] | None) – Next hidden state batch (shape: (num_envs, hidden_size)), defaults to None

  • episode_start (bool | np.ndarray | None) – Episode start flag batch (shape: (num_envs,)), defaults to None

  • action_mask (ArrayOrTensor | None) – Action mask batch (shape: (num_envs, mask_size)), 1=legal 0=illegal, defaults to None

compute_returns_and_advantages(last_value: ndarray | Tensor, last_done: ndarray | Tensor) None

Compute returns and advantages for the stored experiences using GAE or Monte Carlo.

Parameters:
  • last_value (ArrayOrTensor) – Value estimate for the last observation in each environment (shape: (num_envs,))

  • last_done (ArrayOrTensor) – Done flag for the last state in each environment (shape: (num_envs,))
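
For readers unfamiliar with GAE, the backward recursion can be sketched in NumPy as follows. This is a minimal illustration, not the buffer's actual code: the function compute_gae is hypothetical, and it assumes that dones[t] flags observation t as the first of a new episode, so the bootstrap from step t + 1 is masked by 1 - dones[t + 1] and the final step uses last_value and last_done.

```python
import numpy as np


def compute_gae(rewards, values, dones, last_value, last_done,
                gamma=0.99, gae_lambda=0.95):
    """Hypothetical helper. rewards, values, dones: (num_steps, num_envs);
    last_value, last_done: (num_envs,). Returns (returns, advantages)."""
    rewards = np.asarray(rewards, dtype=np.float64)
    values = np.asarray(values, dtype=np.float64)
    dones = np.asarray(dones, dtype=np.float64)
    num_steps = rewards.shape[0]

    advantages = np.zeros_like(rewards)
    last_gae = np.zeros(rewards.shape[1])
    for t in reversed(range(num_steps)):
        if t == num_steps - 1:
            # Bootstrap from the value of the observation after the rollout.
            next_value = np.asarray(last_value, dtype=np.float64)
            next_non_terminal = 1.0 - np.asarray(last_done, dtype=np.float64)
        else:
            next_value = values[t + 1]
            next_non_terminal = 1.0 - dones[t + 1]
        # One-step TD error, with the bootstrap zeroed at episode boundaries.
        delta = rewards[t] + gamma * next_value * next_non_terminal - values[t]
        # Exponentially weighted sum of TD errors.
        last_gae = delta + gamma * gae_lambda * next_non_terminal * last_gae
        advantages[t] = last_gae

    returns = advantages + values
    return returns, advantages


# Three steps, two environments; the second environment resets at step 2.
rewards = np.ones((3, 2))
values = np.zeros((3, 2))
dones = np.array([[0, 0], [0, 0], [0, 1]])
returns, advantages = compute_gae(
    rewards, values, dones,
    last_value=np.array([0.5, 0.5]), last_done=np.array([0, 0]),
    gamma=1.0, gae_lambda=1.0,
)
```

With gamma and gae_lambda both set to 1 the advantages reduce to undiscounted reward sums plus the bootstrap value, which makes the effect of the episode boundary in the second environment easy to check by hand.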

get(batch_size: int | None = None) dict[str, ndarray | dict[str, ndarray]]

Get data from the buffer, flattened and optionally sampled into minibatches.

Parameters:

batch_size (int | None) – Size of the minibatch to sample. If None, returns all data. Defaults to None.

Returns:

Dictionary containing flattened buffer data arrays.

Return type:

dict[str, np.ndarray | dict[str, np.ndarray]]
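
The flattening and sampling described above can be illustrated with plain NumPy. This is a sketch under assumptions: the dictionary keys, array shapes, and sampling without replacement are chosen for the example and may not match the buffer's internal layout.

```python
import numpy as np

capacity, num_envs, obs_dim = 4, 2, 3
rng = np.random.default_rng(0)

# Stand-in for stored rollout data, laid out as (capacity, num_envs, ...).
# The key names here are hypothetical.
stored = {
    "observations": rng.normal(size=(capacity, num_envs, obs_dim)),
    "advantages": rng.normal(size=(capacity, num_envs)),
}

# Flatten: merge the time and environment axes into a single sample axis.
flat = {
    key: arr.reshape(capacity * num_envs, *arr.shape[2:])
    for key, arr in stored.items()
}

# Sample one minibatch of distinct transitions, using the same indices for every key.
batch_size = 4
indices = rng.choice(capacity * num_envs, size=batch_size, replace=False)
minibatch = {key: arr[indices] for key, arr in flat.items()}

print(flat["observations"].shape)       # (8, 3)
print(minibatch["observations"].shape)  # (4, 3)
```

Indexing every array with the same indices keeps each observation aligned with its own advantage.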

get_minibatch_sequences(batch_size: int) Generator[tuple[dict[str, Tensor], dict[str, Tensor]], None, None]

Get a minibatch of sequences from the buffer.

Parameters:

batch_size (int) – The number of sequences to sample.

Returns:

A TensorDict containing the minibatch of sequences.

Return type:

Generator[tuple[dict[str, torch.Tensor], dict[str, torch.Tensor]], None, None]

get_tensor_batch(batch_size: int | None = None, device: str | None = None) dict[str, Tensor | dict[str, Tensor]]

Get data from the buffer as PyTorch tensors, flattened and optionally sampled. The output is a TensorDict.

Parameters:
  • batch_size (int | None) – Size of batch to sample, if None returns all data, defaults to None.

  • device (str | None) – Device to put tensors on, defaults to None (uses self.device).

Returns:

TensorDict containing the data.

Return type:

dict[str, torch.Tensor | dict[str, torch.Tensor]]

prepare_sequence_tensors(device: str | None = None) TensorDict

Return a TensorDict with all of the possible sequences in the buffer for the observations, actions, and hidden states. We pad the sequences to the same length to obtain a TensorDict with batch_size [num_sequences, max_sequence_length] for efficient truncated BPTT.

Parameters:

device (str | None) – Device to put tensors on, defaults to None (uses self.device).

Returns:

Dictionary with tensor sequences.

Return type:

TensorDict
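
Padding variable-length sequences to a common length, as described for prepare_sequence_tensors, can be sketched with NumPy as follows. The helper pad_sequences and the validity mask it returns are assumptions made for illustration; the source states only that sequences are padded to the same length.

```python
import numpy as np


def pad_sequences(sequences, max_seq_len, pad_value=0.0):
    """Hypothetical helper: stack sequences of shape (length, *features) into
    one array of shape (num_sequences, max_seq_len, *features)."""
    feature_shape = sequences[0].shape[1:]
    padded = np.full(
        (len(sequences), max_seq_len, *feature_shape), pad_value, dtype=np.float64
    )
    # True where a step holds real data, False where it is padding.
    mask = np.zeros((len(sequences), max_seq_len), dtype=bool)
    for i, seq in enumerate(sequences):
        length = min(len(seq), max_seq_len)
        padded[i, :length] = seq[:length]
        mask[i, :length] = True
    return padded, mask


# One full-length sequence and one that ended early (e.g. at an episode boundary).
sequences = [np.ones((4, 2)), np.ones((2, 2))]
padded, mask = pad_sequences(sequences, max_seq_len=4)

print(padded.shape)      # (2, 4, 2)
print(mask.sum(axis=1))  # [4 2]
```

A mask like this is a common way to exclude padded steps from the loss during truncated BPTT.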

reset() None

Reset the buffer pointer and full flag.

size() int

Get current number of transitions stored in the buffer.

Returns:

Current number of transitions.

Return type:

int