On-Policy Rollout Buffer
On-policy RL algorithms like PPO and A2C require collecting experiences from the current policy and computing advantages before updating the policy. Unlike off-policy algorithms that can reuse old experiences, on-policy methods need fresh data from each policy iteration.
The rollout buffer is designed to efficiently collect experiences from vectorized environments, compute Generalized Advantage Estimation (GAE), and provide properly formatted batches for policy updates. After each training iteration, the buffer is typically reset since the experiences are no longer valid for the updated policy.
During environment interaction, use RolloutBuffer.add() to store transitions. Once an episode or rollout is complete, call RolloutBuffer.compute_returns_and_advantages() with the value estimate and done flag of the final state to calculate GAE advantages and returns. Finally, use RolloutBuffer.get_tensor_batch() to sample minibatches for policy optimization.
from agilerl.components.rollout_buffer import RolloutBuffer

buffer = RolloutBuffer(
    capacity=2048,  # Number of steps to collect per environment
    observation_space=env.observation_space,
    action_space=env.action_space,
    num_envs=8,  # Number of parallel environments
    device=device,
    gae_lambda=0.95,  # GAE lambda parameter
    gamma=0.99,  # Discount factor
)
The RolloutBuffer can also collect experiences for POMDPs and sample sequences for truncated BPTT. To enable this, specify recurrent=True and pass the hidden_state_architecture of the recurrent network.
from agilerl.typing import BPTTSequenceType

buffer = RolloutBuffer(
    capacity=2048,  # Number of steps to collect per environment
    observation_space=env.observation_space,
    action_space=env.action_space,
    num_envs=8,  # Number of parallel environments
    device=device,
    gae_lambda=0.95,  # GAE lambda parameter
    gamma=0.99,  # Discount factor
    recurrent=True,
    hidden_state_architecture=hidden_state_architecture,
    bptt_sequence_type=BPTTSequenceType.MAXIMUM,
)
Note
The bptt_sequence_type parameter determines how sequences are sampled from the buffer.
- BPTTSequenceType.CHUNKED: Samples as many unique sequences of length max_seq_len as possible from the buffer.
- BPTTSequenceType.MAXIMUM: Samples as many (overlapping) sequences of length max_seq_len as possible from the buffer.
- BPTTSequenceType.FIFTY_PERCENT_OVERLAPPING: Samples as many (overlapping) sequences of length max_seq_len as possible from the buffer, with up to 50% overlap between sequences.
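The difference between the three modes can be pictured as a choice of stride between sequence start indices. The following is a minimal sketch of that reading, not the library's implementation: the helper sequence_starts is hypothetical, and it assumes a single uninterrupted rollout with no episode boundaries and no partial sequences.

```python
def sequence_starts(num_steps: int, max_seq_len: int, stride: int) -> list[int]:
    """Return start indices of full-length windows taken every `stride` steps.

    Hypothetical helper for illustration only; it ignores episode boundaries.
    """
    if max_seq_len <= 0 or stride <= 0:
        raise ValueError("max_seq_len and stride must be positive")
    last_start = num_steps - max_seq_len  # last index where a full window fits
    return list(range(0, last_start + 1, stride))


num_steps, max_seq_len = 8, 4

# Assumed mapping: CHUNKED -> stride of max_seq_len (no overlap)
chunked = sequence_starts(num_steps, max_seq_len, stride=max_seq_len)
# Assumed mapping: FIFTY_PERCENT_OVERLAPPING -> stride of half a sequence
half_overlap = sequence_starts(num_steps, max_seq_len, stride=max(1, max_seq_len // 2))
# Assumed mapping: MAXIMUM -> stride of 1 (every possible window)
maximum = sequence_starts(num_steps, max_seq_len, stride=1)

print(chunked)       # [0, 4]
print(half_overlap)  # [0, 2, 4]
print(maximum)       # [0, 1, 2, 3, 4]
```

Under these assumptions, more overlap yields more training sequences from the same rollout, at the cost of stronger correlation between them.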
Parameters
- class agilerl.components.rollout_buffer.RolloutBuffer(capacity: int, observation_space: Space, action_space: Space, num_envs: int = 1, device: str = 'cpu', gae_lambda: float = 0.95, gamma: float = 0.99, recurrent: bool = False, hidden_state_architecture: dict[str, tuple[int, int, int]] | None = None, use_gae: bool = True, wrap_at_capacity: bool = False, max_seq_len: int | None = None, bptt_sequence_type: BPTTSequenceType = BPTTSequenceType.CHUNKED)
Rollout buffer for collecting experiences and computing advantages for RL algorithms. This buffer is designed to handle vectorized environments efficiently.
- Parameters:
capacity (int) – Maximum number of timesteps to store in the buffer (per environment).
observation_space (gym.spaces.Space) – Observation space of the environment.
action_space (gym.spaces.Space) – Action space of the environment.
num_envs (int, optional) – Number of parallel environments, defaults to 1.
device (str, optional) – Device to store tensors on, defaults to “cpu”.
gae_lambda (float, optional) – Lambda parameter for GAE, defaults to 0.95.
gamma (float, optional) – Discount factor, defaults to 0.99.
recurrent (bool, optional) – Whether to store hidden states, defaults to False.
hidden_state_architecture (dict[str, tuple[int, int, int]], optional) – Architecture of hidden states if used, defaults to None.
use_gae (bool, optional) – Whether to compute GAE advantages, defaults to True.
wrap_at_capacity (bool, optional) – Whether to wrap the buffer at capacity, defaults to False. This is mainly useful for off-policy algorithms; on-policy algorithms should leave it as False in most cases.
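max_seq_len (int, optional) – Maximum sequence length for BPTT, defaults to None.
bptt_sequence_type (BPTTSequenceType, optional) – How sequences are sampled from the buffer for BPTT, defaults to BPTTSequenceType.CHUNKED.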
- add(obs: ndarray | dict[str, ndarray] | tuple[ndarray, ...] | Tensor | TensorDict | tuple[Tensor, ...] | dict[str, Tensor] | Number | list[ReasoningPrompts] | ReasoningPrompts, action: ndarray | Tensor, reward: float | ndarray, done: bool | ndarray, value: float | ndarray, log_prob: float | ndarray, next_obs: ndarray | dict[str, ndarray] | tuple[ndarray, ...] | Tensor | TensorDict | tuple[Tensor, ...] | dict[str, Tensor] | Number | list[ReasoningPrompts] | ReasoningPrompts | None = None, hidden_state: dict[str, ndarray | Tensor] | None = None, next_hidden_state: dict[str, ndarray | Tensor] | None = None, episode_start: bool | ndarray | None = None, action_mask: ndarray | Tensor | None = None) → None
Add a new batch of observations and associated data from vectorized environments to the buffer.
- Parameters:
obs (ObservationType) – Current observation batch (shape: (num_envs, *obs_shape))
action (ArrayOrTensor) – Action batch taken (shape: (num_envs, *action_shape))
reward (float | np.ndarray) – Reward batch received (shape: (num_envs,))
done (bool | np.ndarray) – Done flag batch (shape: (num_envs,))
value (float | np.ndarray) – Value estimate batch (shape: (num_envs,))
log_prob (float | np.ndarray) – Log probability batch of the actions (shape: (num_envs,))
next_obs (ObservationType | None) – Next observation batch (shape: (num_envs, *obs_shape)), defaults to None
hidden_state (dict[str, ArrayOrTensor] | None) – Current hidden state batch (shape: (num_envs, hidden_size)), defaults to None
next_hidden_state (dict[str, ArrayOrTensor] | None) – Next hidden state batch (shape: (num_envs, hidden_size)), defaults to None
episode_start (bool | np.ndarray | None) – Episode start flag batch (shape: (num_envs,)), defaults to None
action_mask (ArrayOrTensor | None) – Action mask batch (shape: (num_envs, mask_size)), 1=legal 0=illegal, defaults to None
- compute_returns_and_advantages(last_value: ndarray | Tensor, last_done: ndarray | Tensor) → None
Compute returns and advantages for the stored experiences using GAE or Monte Carlo.
- Parameters:
last_value (ArrayOrTensor) – Value estimate for the last observation in each environment (shape: (num_envs,))
last_done (ArrayOrTensor) – Done flag for the last state in each environment (shape: (num_envs,))
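To make the roles of last_value and last_done concrete, here is a minimal pure-Python sketch of GAE over data stored as [step][env] nested lists. It is an illustration under stated assumptions, not the library's code: the function compute_gae is hypothetical, and it assumes that dones[t][e] marks a state that begins a new episode, with last_done playing the same role for the state after the final stored step. The library may use a different done convention.

```python
def compute_gae(rewards, values, dones, last_value, last_done,
                gamma=0.99, gae_lambda=0.95):
    """GAE for rollouts stored as [step][env] nested lists (illustrative only).

    Assumed convention: dones[t][e] is True when the state at step t begins a
    new episode (the transition before it ended one); last_done does the same
    for the state that follows the final stored step.
    """
    num_steps, num_envs = len(rewards), len(last_value)
    advantages = [[0.0] * num_envs for _ in range(num_steps)]
    for e in range(num_envs):
        running = 0.0  # discounted sum of TD errors, built backwards in time
        for t in reversed(range(num_steps)):
            if t == num_steps - 1:
                next_value, next_done = last_value[e], last_done[e]
            else:
                next_value, next_done = values[t + 1][e], dones[t + 1][e]
            nonterminal = 0.0 if next_done else 1.0  # no bootstrap across episodes
            delta = rewards[t][e] + gamma * next_value * nonterminal - values[t][e]
            running = delta + gamma * gae_lambda * nonterminal * running
            advantages[t][e] = running
    returns = [
        [advantages[t][e] + values[t][e] for e in range(num_envs)]
        for t in range(num_steps)
    ]
    return advantages, returns


# Two steps, one environment; gamma = lambda = 1 keeps the arithmetic easy to check.
adv, ret = compute_gae(
    rewards=[[1.0], [1.0]],
    values=[[0.5], [0.5]],
    dones=[[False], [False]],
    last_value=[0.5],
    last_done=[False],
    gamma=1.0,
    gae_lambda=1.0,
)
print(adv)  # [[2.0], [1.0]]
print(ret)  # [[2.5], [1.5]]
```

With gamma and lambda both set to 1, the return at step 0 reduces to the sum of rewards plus the bootstrapped last_value (1 + 1 + 0.5), which is a quick sanity check on the recursion.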
- get(batch_size: int | None = None) → dict[str, ndarray | dict[str, ndarray]]
Get data from the buffer, flattened and optionally sampled into minibatches.
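The two ideas in this description, flattening and minibatch sampling, can be sketched with the standard library alone. This is a hedged illustration of common PPO practice rather than what get() does internally: flatten_rollout and minibatch_indices are hypothetical helpers, and the step-major flattening order is an assumption.

```python
import random


def flatten_rollout(data):
    """Flatten a [step][env] nested list into one list of samples (step-major)."""
    return [sample for step in data for sample in step]


def minibatch_indices(num_samples, batch_size, rng):
    """Yield shuffled index lists that together cover every sample exactly once."""
    indices = list(range(num_samples))
    rng.shuffle(indices)
    for start in range(0, num_samples, batch_size):
        yield indices[start:start + batch_size]


advantages = [[0.1, 0.2], [0.3, 0.4], [0.5, 0.6]]  # 3 steps x 2 environments
flat = flatten_rollout(advantages)  # 6 samples, no step/env structure left

rng = random.Random(0)  # seeded so the shuffle is reproducible
batches = list(minibatch_indices(len(flat), batch_size=4, rng=rng))
print([len(b) for b in batches])  # [4, 2]
```

Flattening discards the time ordering, which is fine for feed-forward policies but not for recurrent ones; that is why sequence-based sampling is exposed separately.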
- get_minibatch_sequences(batch_size: int) → Generator[tuple[dict[str, Tensor], dict[str, Tensor]], None, None]
Get a minibatch of sequences from the buffer.
- get_tensor_batch(batch_size: int | None = None, device: str | None = None) → dict[str, Tensor | dict[str, Tensor]]
Get data from the buffer as PyTorch tensors, flattened and optionally sampled. The output is a TensorDict.
- prepare_sequence_tensors(device: str | None = None) → TensorDict
Return a TensorDict with all of the possible sequences in the buffer for the observations, actions, and hidden states. We pad the sequences to the same length to obtain a TensorDict with batch_size [num_sequences, max_sequence_length] for efficient truncated BPTT.
- Parameters:
device (str | None) – Device to put tensors on, defaults to None (uses self.device).
- Returns:
Dictionary with tensor sequences.
- Return type:
TensorDict
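Padding variable-length sequences to a common length is what allows them to be stacked into a single [num_sequences, max_sequence_length] batch. The sketch below shows the idea with plain lists. It is an assumption-laden illustration, not the method's implementation: pad_sequences is a hypothetical helper, right-padding with zeros is assumed, and the validity mask is an addition that the description above does not mention.

```python
def pad_sequences(sequences, max_len, pad_value=0.0):
    """Right-pad each sequence to max_len and return (padded, mask).

    mask[i][j] is True where padded[i][j] holds real data rather than padding.
    """
    padded, mask = [], []
    for seq in sequences:
        if len(seq) > max_len:
            raise ValueError("sequence longer than max_len")
        n_pad = max_len - len(seq)
        padded.append(list(seq) + [pad_value] * n_pad)
        mask.append([True] * len(seq) + [False] * n_pad)
    return padded, mask


sequences = [[1.0, 2.0, 3.0], [4.0, 5.0], [6.0]]
padded, mask = pad_sequences(sequences, max_len=3)
print(padded)  # [[1.0, 2.0, 3.0], [4.0, 5.0, 0.0], [6.0, 0.0, 0.0]]
```

A mask of this kind is one common way to keep padded positions out of the loss when the padded batch is used for truncated BPTT.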