Petting Zoo Async Vector Environment
Class for vectorizing PettingZoo parallel environments. It works with both the default PettingZoo parallel environments and custom ones.
# Default pettingzoo environment
from agilerl.vector.pz_async_vec_env import AsyncPettingZooVecEnv
from pettingzoo.mpe import simple_speaker_listener_v4

num_envs = 4
# One factory function per sub-environment
vec_env = AsyncPettingZooVecEnv(
    [
        lambda: simple_speaker_listener_v4.parallel_env()
        for _ in range(num_envs)
    ]
)
observations, infos = vec_env.reset()
for step in range(25):
    # For every agent, sample one action per sub-environment
    actions = {
        agent: [vec_env.single_action_space(agent).sample() for n in range(num_envs)]
        for agent in vec_env.agents
    }
    observations, rewards, terminations, truncations, infos = vec_env.step(actions)
# Custom pettingzoo environment
from agilerl.vector.pz_async_vec_env import AsyncPettingZooVecEnv

num_envs = 4
# CustomEnv stands for your own pettingzoo ParallelEnv subclass, defined or imported elsewhere
vec_env = AsyncPettingZooVecEnv([lambda: CustomEnv() for _ in range(num_envs)])
observations, infos = vec_env.reset()
for step in range(25):
    # For every agent, sample one action per sub-environment
    actions = {
        agent: [vec_env.single_action_space(agent).sample() for n in range(num_envs)]
        for agent in vec_env.agents
    }
    observations, rewards, terminations, truncations, infos = vec_env.step(actions)
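In both examples the actions dictionary is keyed by agent, and each value holds one action per sub-environment. The sketch below only illustrates that layout by regrouping it into one dictionary per sub-environment. `split_actions` is a hypothetical helper, not part of AgileRL, and the agent names and action values are made up for the example; how the library dispatches actions internally may differ.

```python
from typing import Any, Dict, List


def split_actions(actions: Dict[str, List[Any]], num_envs: int) -> List[Dict[str, Any]]:
    """Regroup {agent: [a_env0, a_env1, ...]} into one {agent: action} dict per env."""
    for agent, batch in actions.items():
        if len(batch) != num_envs:
            raise ValueError(f"{agent} has {len(batch)} actions, expected {num_envs}")
    return [
        {agent: batch[i] for agent, batch in actions.items()}
        for i in range(num_envs)
    ]


# Illustrative batched actions for two agents across three sub-environments
batched = {"speaker_0": [0, 1, 2], "listener_0": [4, 3, 2]}
per_env = split_actions(batched, num_envs=3)
```

Each entry of `per_env` is the action dictionary that a single, non-vectorized parallel environment would receive in its own step call.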
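Parameters
- class agilerl.vector.pz_async_vec_env.AsyncPettingZooVecEnv(env_fns: List[Callable[[], PettingZooVecEnv | ParallelEnv]], copy: bool = True, context: str | None = None)
Vectorized PettingZoo environment that runs multiple environments in parallel.
- Parameters:
env_fns (List[Callable[[], PettingZooVecEnv | ParallelEnv]]) – Functions that create the sub-environments
copy (bool, optional) – Defaults to True
context (str | None, optional) – Defaults to None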
- call(name: str, *args: Any, **kwargs: Any) -> Any
Call a method from each parallel environment with args and kwargs.
- Parameters:
name (str) – Name of the method or property to call
*args – Positional arguments to apply to the method call.
**kwargs – Keyword arguments to apply to the method call.
- call_async(name: str, *args: Any, **kwargs: Any) -> None
Calls the method named name asynchronously in each sub-environment, applying args and kwargs to the method.
- Parameters:
name (str) – Name of the method or property to call
*args – Positional arguments to apply to the method call.
**kwargs – Keyword arguments to apply to the method call.
- call_wait(timeout: float | None = None) -> Any
Calls all parent pipes and waits for the results.
- Parameters:
timeout (int | float | None, optional) – Number of seconds before the call to call_wait() times out. If None (default), the call to call_wait() never times out, defaults to None
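The send-then-wait pattern behind call_async() and call_wait() can be sketched with standard-library pipes. This is a minimal illustration, not AgileRL's implementation: `TinyAsyncCaller`, `Counter`, and `worker` are hypothetical names, threads stand in for the subprocesses, and the timeout is applied per pipe rather than to the whole call.

```python
import multiprocessing as mp
import threading


class Counter:
    """Hypothetical stand-in for a sub-environment."""

    def __init__(self, start):
        self.value = start

    def add(self, amount):
        self.value += amount
        return self.value


def worker(conn, env):
    # Serve (name, args, kwargs) requests until a None sentinel arrives.
    while True:
        msg = conn.recv()
        if msg is None:
            break
        name, args, kwargs = msg
        attr = getattr(env, name)
        # Methods are called; plain properties are returned as they are.
        conn.send(attr(*args, **kwargs) if callable(attr) else attr)


class TinyAsyncCaller:
    def __init__(self, envs):
        self.parent_pipes, self.threads = [], []
        for env in envs:
            parent, child = mp.Pipe()
            thread = threading.Thread(target=worker, args=(child, env), daemon=True)
            thread.start()
            self.parent_pipes.append(parent)
            self.threads.append(thread)

    def call_async(self, name, *args, **kwargs):
        # Fire the request at every worker without waiting for replies.
        for pipe in self.parent_pipes:
            pipe.send((name, args, kwargs))

    def call_wait(self, timeout=None):
        results = []
        for pipe in self.parent_pipes:
            # poll(None) blocks indefinitely; poll(t) returns False after t seconds.
            if not pipe.poll(timeout):
                raise TimeoutError(f"no reply within {timeout} second(s)")
            results.append(pipe.recv())
        return tuple(results)

    def close(self):
        for pipe in self.parent_pipes:
            pipe.send(None)
        for thread in self.threads:
            thread.join()


caller = TinyAsyncCaller([Counter(0), Counter(10)])
caller.call_async("add", 5)
added = caller.call_wait(timeout=1.0)
caller.call_async("value")
values = caller.call_wait()
caller.close()
```

Splitting the request from the wait lets the caller do other work while every worker is busy, which is the motivation for the paired `*_async` and `*_wait` methods on this page.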
- close_extras(timeout: float | None = None, terminate: bool = False) -> None
Close the environments and clean up the extra resources (processes and pipes).
- Parameters:
timeout (int | float | None, optional) – Number of seconds before the call to close() times out. If None, the call to close() never times out. If the call to close() times out, then all processes are terminated, defaults to None
terminate (bool, optional) – If True, then the close() operation is forced and all processes are terminated, defaults to False
- get_attr(name: str) Any ¶
Get a property from each parallel environment.
- Parameters:
name (str) – Name of property to get from each individual environment
- get_observations() -> Dict[str, ndarray | Dict[str, ndarray] | Tuple[ArrayLike, ...]]
Get the observations from the environments.
- Returns:
Observations from the environments
- Return type:
Dict[str, NumpyObsType]
- reset(*, seed: int | None = None, options: Dict[str, Any] | None = None) -> Tuple[Dict[str, ndarray | Dict[str, ndarray] | Tuple[ArrayLike, ...]], Dict[str, Any]]
Reset all the environments and return two dictionaries of batched observations and infos.
- reset_async(seed: int | None = None, options: Dict[str, Any] | None = None) -> None
Send calls to the reset methods of the sub-environments. To get the results of these calls, you may invoke reset_wait().
- reset_wait(timeout: float | None = None) -> Tuple[Dict[str, ndarray | Dict[str, ndarray] | Tuple[ArrayLike, ...]], Dict[str, Any]]
Waits for the calls triggered by reset_async() to finish and returns the results.
- Parameters:
timeout (int | float | None, optional) – Number of seconds before the call to reset_wait times out. If None, the call to reset_wait never times out, defaults to None
- set_attr(name: str, values: Any) -> None
Sets an attribute of the sub-environments.
- Parameters:
name (str) – Name of the property to be set in each individual environment.
values (list[Any] | tuple[Any] | object) – Values of the property to be set to. If values is a list or tuple, then it corresponds to the values for each individual environment, otherwise a single value is set for all environments.
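The rule for values described above can be sketched as a small helper. `expand_values` is a hypothetical function, not part of AgileRL, and the length check is an assumption about how a mismatch would be handled.

```python
from typing import Any, List


def expand_values(values: Any, num_envs: int) -> List[Any]:
    """Return one value per sub-environment, following the set_attr description."""
    if isinstance(values, (list, tuple)):
        # A list or tuple supplies one value per sub-environment.
        if len(values) != num_envs:
            # Assumed behaviour: reject a sequence of the wrong length.
            raise ValueError(f"got {len(values)} values for {num_envs} environments")
        return list(values)
    # Any other object is applied to every sub-environment.
    return [values] * num_envs


per_env_values = expand_values([0.1, 0.2, 0.3], num_envs=3)
shared_value = expand_values(0.5, num_envs=3)
```

One consequence of this rule is that an attribute whose intended value is itself a list has to be wrapped once per sub-environment, otherwise it is read as per-environment values.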
- step_async(actions: List[List[int | float | ndarray | Tensor]]) -> None
Tell all the environments to start taking a step with the given actions. Call step_wait() to get the results of the step. Do not call this while a previous step_async() call is still pending.
- step_wait(timeout: float | None = None) -> Tuple[Dict[str, ndarray | Dict[str, ndarray] | Tuple[ArrayLike, ...]], Dict[str, ndarray], Dict[str, ndarray], Dict[str, ndarray], Dict[str, Any]]
Wait for the calls to step in each sub-environment to finish.
- Parameters:
timeout (int | float | None, optional) – Number of seconds before the call to step_wait times out. If None, the call to step_wait never times out, defaults to None
- Returns:
Tuple of observations, rewards, terminations, truncations, and infos
- Return type:
Tuple[Dict[str, NumpyObsType], Dict[str, np.ndarray], Dict[str, np.ndarray], Dict[str, np.ndarray], Dict[str, Any]]
- class agilerl.vector.pz_async_vec_env.Observations(shared_memory: Dict[str, RawArray | Tuple[RawArray, ...] | Dict[str, RawArray]], obs_spaces: Dict[str, Space], num_envs: int)¶
Class for storing observations with a dictionary interface
Create shared memory for multi-agent observations.
Set the observation for a given environment. Handles Dict and Tuple spaces.
- agilerl.vector.pz_async_vec_env.process_transition(transitions: Tuple[Dict[str, ndarray | Dict[str, ndarray] | Tuple[ArrayLike, ...]], ...], obs_spaces: Dict[str, Space], transition_names: List[str], agents: List[str]) -> List[Dict[str, ndarray | Dict[str, ndarray] | Tuple[ArrayLike, ...]]]
Process a transition, adding placeholder values for killed sub-agents.
- agilerl.vector.pz_async_vec_env.get_placeholder_value(agent: str, transition_name: str, obs_spaces: Dict[str, Space] | None = None) Any ¶
Used to obtain a placeholder value to return for associated experience when an agent is killed or is inactive for the current step.
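The idea of padding a transition with placeholders for killed or inactive agents can be sketched as follows. `pad_transition`, the `PLACEHOLDERS` table, and the agent names are all hypothetical; the placeholder values AgileRL actually returns are not stated on this page, so the ones below are assumptions chosen only for illustration.

```python
import copy
from typing import Any, Dict, List

# Assumed placeholder values, for illustration only (not taken from AgileRL)
PLACEHOLDERS: Dict[str, Any] = {
    "reward": 0.0,
    "terminated": True,
    "truncated": False,
    "info": {},
}


def pad_transition(
    transition: Dict[str, Any], agents: List[str], transition_name: str
) -> Dict[str, Any]:
    """Return a dict with an entry for every agent, filling gaps with a placeholder."""
    padded = {}
    for agent in agents:
        if agent in transition:
            padded[agent] = transition[agent]
        else:
            # Copy so that mutable placeholders are never shared between agents.
            padded[agent] = copy.deepcopy(PLACEHOLDERS[transition_name])
    return padded


# "agent_1" was killed, so the environment returned no reward for it
rewards = pad_transition({"agent_0": 1.5}, ["agent_0", "agent_1"], "reward")
infos = pad_transition({}, ["agent_0", "agent_1"], "info")
```

Padding keeps every per-agent dictionary the same shape at every step, so results from sub-environments with different sets of live agents can still be batched together.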
- agilerl.vector.pz_async_vec_env._async_worker(index: int, env_fn: Callable[[], ParallelEnv], pipe: Connection, parent_pipe: Connection, shared_memory: Dict[str, RawArray], error_queue: Queue, agents: List[str]) -> None
Worker function to run the environment in a subprocess.
- Parameters:
index (int) – Subprocess index
env_fn (callable) – Function that creates the environment
pipe (Connection) – Child pipe object for sending data to the main process
parent_pipe (Connection) – Parent pipe object
shared_memory (Dict[str, RawArray]) – Shared memory for each agent
error_queue (mp.Queue) – Queue object for collecting subprocess errors to communicate back to the main process
observation_shapes (Dict[str, Tuple[int]]) – Shapes of observations
observation_widths (Dict[str, int]) – Flattened observation widths
observation_dtypes (Dict[str, np.dtype]) – Observation dtypes
agents (List[str]) – Sub-agent names