PettingZoo Async Vector Environment
Class for vectorizing PettingZoo parallel environments. It works with both the built-in PettingZoo parallel environments and custom ones.
# Default PettingZoo environment
from agilerl.vector.pz_async_vec_env import AsyncPettingZooVecEnv
from pettingzoo.mpe import simple_speaker_listener_v4

num_envs = 4
# Each callable creates one independent copy of the environment in its own subprocess
vec_env = AsyncPettingZooVecEnv(
    [
        lambda: simple_speaker_listener_v4.parallel_env()
        for _ in range(num_envs)
    ]
)
observations, infos = vec_env.reset()
for step in range(25):
    # For every agent, sample one action per sub-environment
    actions = {
        agent: [vec_env.single_action_space(agent).sample() for _ in range(num_envs)]
        for agent in vec_env.agents
    }
    observations, rewards, terminations, truncations, infos = vec_env.step(actions)
vec_env.close()
# Custom PettingZoo environment
from agilerl.vector.pz_async_vec_env import AsyncPettingZooVecEnv

num_envs = 4
# CustomEnv stands for your own PettingZoo ParallelEnv subclass
vec_env = AsyncPettingZooVecEnv([lambda: CustomEnv() for _ in range(num_envs)])
observations, infos = vec_env.reset()
for step in range(25):
    actions = {
        agent: [vec_env.single_action_space(agent).sample() for _ in range(num_envs)]
        for agent in vec_env.agents
    }
    observations, rewards, terminations, truncations, infos = vec_env.step(actions)
vec_env.close()
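In the examples above, actions are built as a dictionary that maps each agent to a list with one action per sub-environment. Each worker steps only one environment, so at some point that batch has to be split per environment. The sketch below shows the transposition in plain Python. The helper `split_actions_per_env` is hypothetical and is not part of the AgileRL API.

```python
from __future__ import annotations

from typing import Any


def split_actions_per_env(
    actions: dict[str, list[Any]], num_envs: int
) -> list[dict[str, Any]]:
    """Turn {agent: [a_env0, a_env1, ...]} into [{agent: a_env0}, {agent: a_env1}, ...].

    Hypothetical helper for illustration; not part of the AgileRL API.
    """
    for agent, batch in actions.items():
        # Every agent must supply exactly one action per sub-environment
        if len(batch) != num_envs:
            raise ValueError(
                f"agent {agent!r} has {len(batch)} actions, expected {num_envs}"
            )
    return [
        {agent: batch[env_idx] for agent, batch in actions.items()}
        for env_idx in range(num_envs)
    ]


batched = {"speaker_0": [0, 1, 2, 3], "listener_0": [4, 3, 2, 1]}
per_env = split_actions_per_env(batched, num_envs=4)
print(per_env[1])  # {'speaker_0': 1, 'listener_0': 3}
```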
Parameters
- class agilerl.vector.pz_async_vec_env.AsyncPettingZooVecEnv(env_fns: list[Callable[[], PettingZooVecEnv | ParallelEnv]], copy: bool = True, context: str | None = None)
Vectorized PettingZoo environment that runs multiple environments in parallel, each in its own subprocess.
- Parameters:
env_fns (list[Callable]) – Functions that each create one sub-environment
copy (bool, optional) – Whether to copy the observation data returned by .step() or .reset(). Recommended. Defaults to True
context (str, optional) – Context for multiprocessing. Choose between “spawn”, “fork”, or “forkserver”. Defaults to None
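The `copy` flag matters because observations come back through shared memory that is rewritten on every step. This stdlib-only sketch imitates that situation with a single reused list. `SharedBufferEnv` is a hypothetical stand-in, not AgileRL code. The sketch assumes that with `copy=False` the returned observations alias the shared buffer.

```python
from __future__ import annotations


class SharedBufferEnv:
    """Hypothetical stand-in for a vectorized env whose workers reuse one buffer."""

    def __init__(self, copy: bool) -> None:
        self.copy = copy
        self._buffer = [0.0, 0.0]  # plays the role of the shared-memory block
        self._t = 0

    def step(self) -> list[float]:
        self._t += 1
        # The "worker" overwrites the buffer in place on every step
        self._buffer[:] = [float(self._t), 10.0 * self._t]
        # copy=True hands back an independent snapshot; copy=False hands back the buffer itself
        return list(self._buffer) if self.copy else self._buffer


def first_obs_after_two_steps(copy: bool) -> list[float]:
    env = SharedBufferEnv(copy=copy)
    first = env.step()
    env.step()  # a later step reuses the same buffer
    return first


print(first_obs_after_two_steps(copy=True))   # [1.0, 10.0]
print(first_obs_after_two_steps(copy=False))  # [2.0, 20.0]
```

Without a copy, a reference kept from an earlier step silently changes when the next step is written. This is why `copy=True` is the recommended default.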
- call(name: str, *args: Any, **kwargs: Any) → Any
Call a method on each parallel environment with args and kwargs, and return the results.
- Parameters:
name (str) – Name of the method or property to call
*args – Positional arguments to apply to the method call.
**kwargs – Keyword arguments to apply to the method call.
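The sketch below shows what `call` resolves to inside each sub-environment. It assumes the Gymnasium vector-env convention: the attribute is looked up by name and invoked with `*args`/`**kwargs` if it is callable, otherwise its value is returned as-is. The function `call_on_envs` and the `ToyEnv` class are hypothetical. The sketch runs in-process without the worker pipes and collects one result per sub-environment into a tuple.

```python
from __future__ import annotations

from typing import Any


def call_on_envs(envs: list[Any], name: str, *args: Any, **kwargs: Any) -> tuple[Any, ...]:
    """Hypothetical in-process version of `call`: one result per sub-environment."""
    results = []
    for env in envs:
        attr = getattr(env, name)
        # Methods are invoked; plain properties/attributes are returned unchanged
        results.append(attr(*args, **kwargs) if callable(attr) else attr)
    return tuple(results)


class ToyEnv:
    """Hypothetical sub-environment with one method and one plain attribute."""

    def __init__(self, idx: int) -> None:
        self.idx = idx
        self.render_mode = None

    def scaled_index(self, factor: int, offset: int = 0) -> int:
        return self.idx * factor + offset


envs = [ToyEnv(i) for i in range(3)]
print(call_on_envs(envs, "scaled_index", 2, offset=1))  # (1, 3, 5)
print(call_on_envs(envs, "render_mode"))  # (None, None, None)
```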
- call_async(name: str, *args: Any, **kwargs: Any) → None
Asynchronously call the method or property called name on each parallel environment, applying args and kwargs. Use call_wait() to get the results.
- Parameters:
name (str) – Name of the method or property to call
*args – Positional arguments to apply to the method call.
**kwargs – Keyword arguments to apply to the method call.
- call_wait(timeout: float | None = None) → Any
Poll all parent pipes and wait for the results of the calls started by call_async().
- Parameters:
timeout (int | float | None, optional) – Number of seconds before the call to call_wait() times out. If None, the call to call_wait() never times out. Defaults to None
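The split between `call_async()` and `call_wait()` follows a send-then-poll pattern over pipes. The minimal sketch below shows that pattern. It makes two simplifying assumptions: workers are threads rather than subprocesses, so the example runs anywhere, and each worker simply echoes the request. `toy_worker`, `call_async` and `call_wait` here are hypothetical stand-ins, not AgileRL's implementation. The timeout is applied as a single deadline across all pipes.

```python
from __future__ import annotations

import threading
import time
from multiprocessing import Pipe
from multiprocessing.connection import Connection
from typing import Any


def toy_worker(conn: Connection, idx: int) -> None:
    """Hypothetical worker: answers ("call", (name, args, kwargs)) until told to close."""
    while True:
        command, data = conn.recv()
        if command == "close":
            break
        name, args, kwargs = data
        # A real worker would look up `name` on its environment; this one just echoes
        conn.send((idx, name, args, kwargs))
    conn.close()


def call_async(parent_conns: list[Connection], name: str, *args: Any, **kwargs: Any) -> None:
    # Send the request to every worker without waiting for any reply
    for conn in parent_conns:
        conn.send(("call", (name, args, kwargs)))


def call_wait(parent_conns: list[Connection], timeout: float | None = None) -> tuple[Any, ...]:
    # One shared deadline, so the total wait is bounded by `timeout`, not n * timeout
    deadline = None if timeout is None else time.monotonic() + timeout
    for conn in parent_conns:
        remaining = None if deadline is None else max(deadline - time.monotonic(), 0.0)
        if not conn.poll(remaining):  # poll(None) blocks until data arrives
            raise TimeoutError(f"call_wait timed out after {timeout} seconds")
    return tuple(conn.recv() for conn in parent_conns)


parents, threads = [], []
for i in range(2):
    parent, child = Pipe()
    worker = threading.Thread(target=toy_worker, args=(child, i), daemon=True)
    worker.start()
    parents.append(parent)
    threads.append(worker)

call_async(parents, "get_seed", 7)
results = call_wait(parents, timeout=5.0)
print(results)  # ((0, 'get_seed', (7,), {}), (1, 'get_seed', (7,), {}))

for parent, worker in zip(parents, threads):
    parent.send(("close", None))
    worker.join()
    parent.close()
```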
- close_extras(*, timeout: float | None = None, terminate: bool = False, **kwargs: Any) → None
Close the environments and clean up the extra resources (processes and pipes).
- Parameters:
timeout (int | float | None, optional) – Number of seconds before the call to close() times out. If None, the call to close() never times out. If the call to close() times out, all processes are terminated. Defaults to None
terminate (bool, optional) – If True, the close() operation is forced and all processes are terminated. Defaults to False
- get_attr(name: str) → Any
Get a property from each parallel environment.
- Parameters:
name (str) – Name of property to get from each individual environment
- get_observations() → dict[str, ndarray | dict[str, ndarray] | tuple[ndarray, ...]]
Get the observations from the environments.
- reset(*, seed: int | None = None, options: dict[str, Any] | None = None) → tuple[dict[str, ndarray | dict[str, ndarray] | tuple[ndarray, ...]], dict[str, Any]]
Reset all the environments and return two dictionaries of batched observations and infos.
- reset_async(seed: int | None = None, options: dict[str, Any] | None = None) → None
Send calls to the reset methods of the sub-environments. To get the results of these calls, invoke reset_wait().
- reset_wait(timeout: float | None = None) → tuple[dict[str, ndarray | dict[str, ndarray] | tuple[ndarray, ...]], dict[str, Any]]
Wait for the calls triggered by reset_async() to finish and return the results.
- Parameters:
timeout (int | float | None, optional) – Number of seconds before the call to reset_wait() times out. If None, the call to reset_wait() never times out. Defaults to None
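`reset()` and `reset_async()` take a single `seed` for the whole vector of environments. A common convention, used by Gymnasium's `AsyncVectorEnv`, expands an integer seed into one seed per sub-environment by adding the environment index. This keeps runs reproducible while the sub-environments still differ. Whether AgileRL applies exactly this rule is an assumption here, and `expand_seed` is a hypothetical helper.

```python
from __future__ import annotations


def expand_seed(seed: int | None, num_envs: int) -> list[int | None]:
    """Hypothetical helper: derive one seed per sub-environment from a single seed."""
    if seed is None:
        # No seed given: let every sub-environment seed itself
        return [None] * num_envs
    # Offset by the environment index so sub-environments differ but stay reproducible
    return [seed + i for i in range(num_envs)]


print(expand_seed(42, num_envs=4))    # [42, 43, 44, 45]
print(expand_seed(None, num_envs=2))  # [None, None]
```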
- set_attr(name: str, values: Any) → None
Set an attribute of the sub-environments.
- Parameters:
name (str) – Name of the property to be set in each individual environment.
values (list[Any] | tuple[Any] | object) – Values to set the property to. If values is a list or tuple, its entries correspond to the individual environments, one value each; otherwise, the single value is set for all environments.
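The broadcasting rule for `values` can be sketched in-process as follows. `set_attr_on_envs` is hypothetical and applies the rule exactly as described above. Checking that a list or tuple has one entry per environment is an assumption added for safety.

```python
from __future__ import annotations

from types import SimpleNamespace
from typing import Any


def set_attr_on_envs(envs: list[Any], name: str, values: Any) -> None:
    """Hypothetical in-process version of the set_attr broadcasting rule."""
    if isinstance(values, (list, tuple)):
        # A list or tuple is read as one value per environment
        if len(values) != len(envs):
            raise ValueError(f"expected {len(envs)} values, got {len(values)}")
        per_env = list(values)
    else:
        # Any other object is given to every environment
        per_env = [values] * len(envs)
    for env, value in zip(envs, per_env):
        setattr(env, name, value)


envs = [SimpleNamespace() for _ in range(3)]
set_attr_on_envs(envs, "difficulty", 0.5)
set_attr_on_envs(envs, "seed", [1, 2, 3])
print([(e.difficulty, e.seed) for e in envs])  # [(0.5, 1), (0.5, 2), (0.5, 3)]
```

One consequence of this rule is that a list-valued attribute cannot be broadcast directly. To give every environment the same list, wrap it once per environment, for example `[[0, 1]] * 3`.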
- step_async(actions: list[list[int | float | ndarray | Tensor]]) → None
Tell all the environments to start taking a step with the given actions. Call step_wait() to get the results of the step. Do not call this method while a previous step_async() call is still pending.
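The rule "do not call step_async() while a step is pending" is usually enforced with a small state machine. The sketch below is modeled on the `AsyncState` enum in Gymnasium's async vector env. `PendingGuard` is hypothetical, and AgileRL's own bookkeeping may differ.

```python
from enum import Enum


class AsyncState(Enum):
    DEFAULT = "default"
    WAITING_RESET = "reset"
    WAITING_STEP = "step"
    WAITING_CALL = "call"


class PendingGuard:
    """Hypothetical tracker allowing at most one outstanding async operation."""

    def __init__(self) -> None:
        self.state = AsyncState.DEFAULT

    def begin(self, op: AsyncState) -> None:
        # Refuse to start a new operation while another is still pending
        if self.state is not AsyncState.DEFAULT:
            raise RuntimeError(
                f"cannot start {op.value}: a {self.state.value} call is still pending"
            )
        self.state = op

    def end(self, op: AsyncState) -> None:
        # The matching *_wait() must correspond to the pending *_async() call
        if self.state is not op:
            raise RuntimeError(f"no pending {op.value} call to wait for")
        self.state = AsyncState.DEFAULT


guard = PendingGuard()
guard.begin(AsyncState.WAITING_STEP)      # step_async()
try:
    guard.begin(AsyncState.WAITING_STEP)  # second step_async() before step_wait()
except RuntimeError as err:
    print(err)  # cannot start step: a step call is still pending
guard.end(AsyncState.WAITING_STEP)        # step_wait()
```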
- step_wait(timeout: float | None = None) → tuple[dict[str, ndarray | dict[str, ndarray] | tuple[ndarray, ...]], dict[str, ndarray], dict[str, ndarray], dict[str, ndarray], dict[str, Any]]
Wait for the calls to step in each sub-environment to finish.
- Parameters:
timeout (int | float | None, optional) – Number of seconds before the call to step_wait() times out. If None, the call to step_wait() never times out. Defaults to None
- Returns:
Tuple of observations, rewards, terminations, truncations, and infos
- Return type:
tuple[dict[str, NumpyObsType], dict[str, np.ndarray], dict[str, np.ndarray], dict[str, np.ndarray], dict[str, Any]]
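`step_wait()` returns rewards, terminations and truncations as dictionaries keyed by agent, with one entry per sub-environment under each key. The stdlib sketch below shows that regrouping using plain lists where the real method returns `np.ndarray`. `batch_by_agent` is hypothetical. The regrouping only works when every agent appears in every sub-environment's result, which is the gap the placeholder values added by `process_transition` fill.

```python
from __future__ import annotations

from typing import Any


def batch_by_agent(
    per_env: list[dict[str, Any]], agents: list[str]
) -> dict[str, list[Any]]:
    """Hypothetical helper: {agent: [value_env0, value_env1, ...]} from per-env dicts."""
    # Raises KeyError if an agent is missing from any sub-environment's result
    return {agent: [env_result[agent] for env_result in per_env] for agent in agents}


rewards_per_env = [
    {"speaker_0": 1.0, "listener_0": 0.0},
    {"speaker_0": 0.5, "listener_0": -1.0},
]
rewards = batch_by_agent(rewards_per_env, ["speaker_0", "listener_0"])
print(rewards)  # {'speaker_0': [1.0, 0.5], 'listener_0': [0.0, -1.0]}
```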
- class agilerl.vector.pz_async_vec_env.Observations(shared_memory: dict[str, RawArray | tuple[RawArray, ...] | dict[str, RawArray]], obs_spaces: dict[str, Space], num_envs: int)
Class for storing observations with a dictionary interface.
Create shared memory for multi-agent observations.
Set the observation for a given environment. Handles Dict and Tuple spaces.
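The sketch below shows one way to lay out shared memory for multi-agent observations. Each agent gets a flat float64 block of `num_envs * width` entries, and environment `i` owns the slice starting at `i * width`. Only flat Box-style observations are covered, not the Dict and Tuple handling mentioned above. The helpers `create_agent_buffers`, `set_env_obs` and `get_env_obs`, and the widths used, are hypothetical illustrations of the idea rather than the `Observations` class itself.

```python
from __future__ import annotations

from multiprocessing.sharedctypes import RawArray
from typing import Any


def create_agent_buffers(obs_widths: dict[str, int], num_envs: int) -> dict[str, Any]:
    # One flat, zero-initialised float64 ("d") block per agent holding num_envs observations
    return {agent: RawArray("d", num_envs * width) for agent, width in obs_widths.items()}


def set_env_obs(buffers: dict[str, Any], obs_widths: dict[str, int],
                agent: str, env_idx: int, flat_obs: list[float]) -> None:
    width = obs_widths[agent]
    if len(flat_obs) != width:
        raise ValueError(f"expected {width} values for {agent!r}, got {len(flat_obs)}")
    start = env_idx * width
    # Environment env_idx owns the slice [start, start + width)
    buffers[agent][start:start + width] = flat_obs


def get_env_obs(buffers: dict[str, Any], obs_widths: dict[str, int],
                agent: str, env_idx: int) -> list[float]:
    width = obs_widths[agent]
    start = env_idx * width
    return buffers[agent][start:start + width]  # slicing a ctypes array returns a list


obs_widths = {"speaker_0": 3, "listener_0": 11}  # illustrative widths
buffers = create_agent_buffers(obs_widths, num_envs=4)
set_env_obs(buffers, obs_widths, "speaker_0", 2, [0.1, 0.2, 0.3])
print(get_env_obs(buffers, obs_widths, "speaker_0", 2))  # [0.1, 0.2, 0.3]
print(get_env_obs(buffers, obs_widths, "speaker_0", 1))  # [0.0, 0.0, 0.0]
```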
- agilerl.vector.pz_async_vec_env.process_transition(transitions: tuple[dict[str, ndarray | dict[str, ndarray] | tuple[ndarray, ...]], ...], obs_spaces: dict[str, Space], transition_names: list[str], agents: list[str]) → list[dict[str, ndarray | dict[str, ndarray] | tuple[ndarray, ...]]]
Process transitions, adding placeholder values for agents that have been killed.
- agilerl.vector.pz_async_vec_env.get_placeholder_value(agent: str, transition_name: str, obs_spaces: dict[str, Space] | None = None) → Any
Obtain a placeholder value to return for associated experience when an agent is killed or is inactive for the current step.
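Filling in placeholders for killed or inactive agents can be sketched as follows. Any agent missing from a sub-environment's result gets a placeholder chosen by transition name. The specific placeholder values below, such as a zero-filled observation and `terminated=True`, are assumptions for illustration only; AgileRL's `get_placeholder_value` may return different values. Both helpers are hypothetical.

```python
from __future__ import annotations

from typing import Any


def placeholder_value(transition_name: str, obs_width: int) -> Any:
    """Hypothetical placeholders; the values AgileRL actually uses may differ."""
    if transition_name == "observation":
        return [0.0] * obs_width  # assumed: zero-filled observation
    if transition_name == "reward":
        return 0.0
    if transition_name == "terminated":
        return True  # assumed: a killed agent counts as terminated
    if transition_name == "truncated":
        return False
    if transition_name == "info":
        return {}  # a fresh dict per call, so environments never share one object
    raise ValueError(f"unknown transition name {transition_name!r}")


def fill_missing_agents(per_env: list[dict[str, Any]], transition_name: str,
                        agents: list[str], obs_widths: dict[str, int]) -> list[dict[str, Any]]:
    filled = []
    for env_result in per_env:
        complete = dict(env_result)
        for agent in agents:
            # An agent absent from this env's dict was killed or is inactive this step
            if agent not in complete:
                complete[agent] = placeholder_value(transition_name, obs_widths[agent])
        filled.append(complete)
    return filled


rewards = [{"speaker_0": 1.0, "listener_0": 0.5}, {"speaker_0": 0.2}]
print(fill_missing_agents(rewards, "reward", ["speaker_0", "listener_0"],
                          {"speaker_0": 3, "listener_0": 11}))
# [{'speaker_0': 1.0, 'listener_0': 0.5}, {'speaker_0': 0.2, 'listener_0': 0.0}]
```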
- agilerl.vector.pz_async_vec_env._async_worker(index: int, env_fn: Callable[[], ParallelEnv], pipe: Connection, parent_pipe: Connection, shared_memory: dict[str, RawArray], error_queue: Queue, agents: list[str]) → None
Worker function to run the environment in a subprocess.
- Parameters:
index (int) – Subprocess index
env_fn (callable) – Function that creates the environment
pipe (Connection) – Child pipe object for sending data to the main process
parent_pipe (Connection) – Parent pipe object
shared_memory (dict[str, RawArray]) – Shared memory for the observations
error_queue (mp.Queue) – Queue object for collecting subprocess errors to communicate back to the main process
agents (list[str]) – Agent names
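The general shape of such a worker is a command loop. It receives `(command, data)` from its pipe, dispatches to the environment, replies with `(result, success)`, and reports exceptions through the error queue. The minimal sketch below runs the worker in a thread for portability, so it uses `queue.Queue` instead of `mp.Queue`, and it omits the shared-memory observation writes. The command names, the `(result, success)` reply format, `toy_worker` and `CounterEnv` are all hypothetical and not taken from `_async_worker`.

```python
from __future__ import annotations

import queue
import threading
import traceback
from multiprocessing import Pipe
from multiprocessing.connection import Connection
from typing import Any, Callable


class CounterEnv:
    """Hypothetical two-agent toy env: each observation is an agent's running total."""

    def __init__(self) -> None:
        self.agents = ["a", "b"]
        self.totals: dict[str, int] = {}

    def reset(self, seed: int | None = None, options: dict | None = None):
        self.totals = {agent: 0 for agent in self.agents}
        return dict(self.totals), {agent: {} for agent in self.agents}

    def step(self, actions: dict[str, int]):
        for agent, action in actions.items():
            self.totals[agent] += action
        rewards = {agent: float(actions[agent]) for agent in self.agents}
        terminations = {agent: self.totals[agent] >= 3 for agent in self.agents}
        truncations = {agent: False for agent in self.agents}
        infos = {agent: {} for agent in self.agents}
        return dict(self.totals), rewards, terminations, truncations, infos


def toy_worker(index: int, env_fn: Callable[[], Any], pipe: Connection,
               error_queue: queue.Queue) -> None:
    """Hypothetical command loop: reply (result, success) for each (command, data)."""
    env = env_fn()
    try:
        while True:
            command, data = pipe.recv()
            if command == "reset":
                pipe.send((env.reset(**data), True))
            elif command == "step":
                pipe.send((env.step(data), True))
            elif command == "close":
                pipe.send((None, True))
                break
            else:
                raise RuntimeError(f"Unknown command {command!r}")
    except Exception as exc:
        # Report the failure to the main process instead of dying silently
        error_queue.put((index, type(exc).__name__, str(exc), traceback.format_exc()))
        pipe.send((None, False))
    finally:
        pipe.close()


parent, child = Pipe()
errors: queue.Queue = queue.Queue()
worker = threading.Thread(target=toy_worker, args=(0, CounterEnv, child, errors), daemon=True)
worker.start()

parent.send(("reset", {"seed": 0}))
print(parent.recv())  # (({'a': 0, 'b': 0}, {'a': {}, 'b': {}}), True)
parent.send(("step", {"a": 2, "b": 1}))
(obs, rewards, terms, truncs, infos), ok = parent.recv()
print(obs, ok)  # {'a': 2, 'b': 1} True
parent.send(("bogus", None))
print(parent.recv())  # (None, False)
worker.join()
print(errors.get_nowait()[:3])  # (0, 'RuntimeError', "Unknown command 'bogus'")
```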