PettingZoo Async Vector Environment

Class for vectorizing PettingZoo parallel environments, whether they are custom or ship with PettingZoo.

# Default pettingzoo environment
from agilerl.vector.pz_async_vec_env import AsyncPettingZooVecEnv
from pettingzoo.mpe import simple_speaker_listener_v4

num_envs = 4
vec_env = AsyncPettingZooVecEnv(
    [
        lambda: simple_speaker_listener_v4.parallel_env()
        for _ in range(num_envs)
    ]
)
observations, infos = vec_env.reset()
for step in range(25):
    actions = {
        agent: [vec_env.single_action_space(agent).sample() for n in range(num_envs)]
        for agent in vec_env.agents
    }
    observations, rewards, terminations, truncations, infos = vec_env.step(actions)

# Custom pettingzoo environment (CustomEnv stands for a user-defined ParallelEnv class)
from agilerl.vector.pz_async_vec_env import AsyncPettingZooVecEnv

num_envs = 4
vec_env = AsyncPettingZooVecEnv([lambda: CustomEnv() for _ in range(num_envs)])
observations, infos = vec_env.reset()
for step in range(25):
    actions = {
        agent: [vec_env.single_action_space(agent).sample() for n in range(num_envs)]
        for agent in vec_env.agents
    }
    observations, rewards, terminations, truncations, infos = vec_env.step(actions)

Parameters

class agilerl.vector.pz_async_vec_env.AsyncPettingZooVecEnv(env_fns: List[Callable[[], PettingZooVecEnv | ParallelEnv]], copy: bool = True, context: str | None = None)

Vectorized PettingZoo environment that runs multiple environments in parallel

Parameters:
  • env_fns (list[Callable]) – Functions that create the environment

  • copy (bool, optional) – If True, the observation data is copied before it is returned by .step() or .reset() (recommended), defaults to True

  • context (str | None, optional) – Context for multiprocessing, defaults to None

call(name: str, *args: Any, **kwargs: Any) Any

Call a method from each parallel environment with args and kwargs.

Parameters:
  • name (str) – Name of the method or property to call

  • *args

    Positional arguments to apply to the method call.

  • **kwargs

    Keyword arguments to apply to the method call.

call_async(name: str, *args: Any, **kwargs: Any) None

Calls the named method asynchronously in each parallel environment, applying args and kwargs to the call.

Parameters:
  • name (str) – Name of the method or property to call

  • *args

    Positional arguments to apply to the method call.

  • **kwargs

    Keyword arguments to apply to the method call.

call_wait(timeout: float | None = None) Any

Calls all parent pipes and waits for the results.

Parameters:

timeout (int | float | None, optional) – Number of seconds before the call to call_wait() times out. If None, the call to call_wait() never times out, defaults to None
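
The call_async() / call_wait() pair follows a send-then-collect pattern over pipes. The sketch below is not AgileRL's implementation: it uses threads in place of subprocesses, and ToyEnv, worker, call_async and call_wait are hypothetical stand-ins written only to illustrate the request, poll and receive cycle. It also assumes the timeout is applied per pipe, which may differ from the library.

```python
import multiprocessing as mp
import threading


class ToyEnv:
    """Hypothetical stand-in for a sub-environment."""

    def __init__(self, idx):
        self.idx = idx

    def scaled(self, factor=1):
        return self.idx * factor


def worker(conn, env):
    # Serve (name, args, kwargs) requests until a None sentinel arrives.
    while True:
        msg = conn.recv()
        if msg is None:
            return
        name, args, kwargs = msg
        attr = getattr(env, name)
        # Methods are called with the arguments; plain attributes are returned as-is.
        conn.send(attr(*args, **kwargs) if callable(attr) else attr)


def call_async(parent_conns, name, *args, **kwargs):
    # Send the request to every worker without waiting for replies.
    for conn in parent_conns:
        conn.send((name, args, kwargs))


def call_wait(parent_conns, timeout=None):
    # poll(None) blocks until data arrives; a number bounds the wait per pipe.
    results = []
    for conn in parent_conns:
        if not conn.poll(timeout):
            raise TimeoutError(f"no reply within {timeout} s")
        results.append(conn.recv())
    return tuple(results)


pipes = [mp.Pipe() for _ in range(3)]
parent_conns = [parent for parent, _ in pipes]
threads = [
    threading.Thread(target=worker, args=(child, ToyEnv(i)), daemon=True)
    for i, (_, child) in enumerate(pipes)
]
for t in threads:
    t.start()

call_async(parent_conns, "scaled", factor=10)
method_results = call_wait(parent_conns, timeout=5.0)

call_async(parent_conns, "idx")
attr_results = call_wait(parent_conns, timeout=5.0)

for conn in parent_conns:
    conn.send(None)  # ask each worker to exit
for t in threads:
    t.join()
```

Splitting the call in two lets the caller do other work between sending the requests and collecting the replies.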

close_extras(timeout: float | None = None, terminate: bool = False) None

Close the environments & clean up the extra resources (processes and pipes).

Parameters:
  • timeout (int | float | None, optional) – Number of seconds before the call to close() times out. If None, the call to close() never times out. If the call to close() times out, then all processes are terminated, defaults to None

  • terminate (bool, optional) – If True, then the close() operation is forced and all processes are terminated, defaults to False

get_attr(name: str) Any

Get a property from each parallel environment.

Parameters:

name (str) – Name of property to get from each individual environment

get_observations() Dict[str, ndarray | Dict[str, ndarray] | Tuple[_SupportsArray[dtype[Any]] | _NestedSequence[_SupportsArray[dtype[Any]]] | bool | int | float | complex | str | bytes | _NestedSequence[bool | int | float | complex | str | bytes], ...]]

Get the observations from the environments.

Returns:

Observations from the environments

Return type:

Dict[str, NumpyObsType]

render() Any

Returns the rendered frames from the parallel environments.

reset(*, seed: int | None = None, options: Dict[str, Any] | None = None) Tuple[Dict[str, ndarray | Dict[str, ndarray] | Tuple[_SupportsArray[dtype[Any]] | _NestedSequence[_SupportsArray[dtype[Any]]] | bool | int | float | complex | str | bytes | _NestedSequence[bool | int | float | complex | str | bytes], ...]], Dict[str, Any]]

Reset all the environments and return two dictionaries of batched observations and infos.

Parameters:
  • seed (None | int, optional) – Random seed, defaults to None

  • options (dict[str, Any] | None, optional) – Options dictionary, defaults to None

reset_async(seed: int | None = None, options: Dict[str, Any] | None = None) None

Send calls to the reset methods of the sub-environments.

To get the results of these calls, you may invoke reset_wait().

Parameters:
  • seed (None | int, optional) – Random seed, defaults to None

  • options (dict[str, Any] | None, optional) – Options dictionary, defaults to None

reset_wait(timeout: float | None = None) Tuple[Dict[str, ndarray | Dict[str, ndarray] | Tuple[_SupportsArray[dtype[Any]] | _NestedSequence[_SupportsArray[dtype[Any]]] | bool | int | float | complex | str | bytes | _NestedSequence[bool | int | float | complex | str | bytes], ...]], Dict[str, Any]]

Waits for the calls triggered by reset_async() to finish and returns the results.

Parameters:

timeout (int | float | None, optional) – Number of seconds before the call to reset_wait() times out. If None, the call to reset_wait() never times out, defaults to None

Returns:

Tuple of observations and infos

Return type:

Tuple[Dict[str, NumpyObsType], Dict[str, Any]]

set_attr(name: str, values: Any) None

Sets an attribute of the sub-environments.

Parameters:
  • name (str) – Name of the property to be set in each individual environment.

  • values (list[Any] | tuple[Any] | object) – Values of the property to be set to. If values is a list or tuple, then it corresponds to the values for each individual environment, otherwise a single value is set for all environments.
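
The rule for values can be sketched as a small helper. expand_values is a hypothetical name, not part of AgileRL, and raising on a length mismatch is an assumption about how such a case would be handled.

```python
def expand_values(values, num_envs):
    """Return one value per environment (hypothetical helper)."""
    if not isinstance(values, (list, tuple)):
        # A single object is shared by every environment.
        return [values for _ in range(num_envs)]
    if len(values) != num_envs:
        # Assumption: a sequence must supply exactly one value per environment.
        raise ValueError(
            f"expected {num_envs} values, got {len(values)}"
        )
    return list(values)


broadcast = expand_values(0.5, 3)
per_env = expand_values(("easy", "medium", "hard"), 3)
```

One consequence of this rule is that a list or tuple is always read as per-environment values, so setting every environment to the same list-valued attribute requires one copy of that list per environment.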

step_async(actions: List[List[int | float | ndarray | Tensor]]) None

Tell all the environments to start taking a step with the given actions. Call step_wait() to get the results of the step. You should not call this if a step_async run is already pending.

Parameters:

actions (list[list[int | float | np.ndarray]]) – List of length num_envs in which each sub-list contains the actions for every agent in a given environment
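
The usage example at the top of this page builds actions keyed by agent, while this parameter is described per environment. The sketch below shows one way to convert between the two layouts. per_agent_to_per_env is a hypothetical helper, and whether the class performs this conversion internally is an assumption.

```python
def per_agent_to_per_env(actions, agents, num_envs):
    """Turn {agent: [action per env]} into [[action per agent] per env]."""
    return [
        [actions[agent][env_idx] for agent in agents]
        for env_idx in range(num_envs)
    ]


# Illustrative agent names and discrete actions for three environments.
agents = ["speaker_0", "listener_0"]
actions = {"speaker_0": [1, 2, 0], "listener_0": [4, 3, 2]}

per_env = per_agent_to_per_env(actions, agents, num_envs=3)
```

Each inner list follows the order of agents, so the same agent order has to be used when the results are read back.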

step_wait(timeout: float | None = None) Tuple[Dict[str, ndarray | Dict[str, ndarray] | Tuple[_SupportsArray[dtype[Any]] | _NestedSequence[_SupportsArray[dtype[Any]]] | bool | int | float | complex | str | bytes | _NestedSequence[bool | int | float | complex | str | bytes], ...]], Dict[str, ndarray], Dict[str, ndarray], Dict[str, ndarray], Dict[str, Any]]

Wait for the calls to step in each sub-environment to finish.

Parameters:

timeout (int | float | None, optional) – Number of seconds before the call to step_wait times out. If None, the call to step_wait never times out, defaults to None

Returns:

Tuple of observations, rewards, terminations, truncations, and infos

Return type:

Tuple[Dict[str, NumpyObsType], Dict[str, np.ndarray], Dict[str, np.ndarray], Dict[str, np.ndarray], Dict[str, Any]]

class agilerl.vector.pz_async_vec_env.Observations(shared_memory: Dict[str, RawArray | Tuple[RawArray, ...] | Dict[str, RawArray]], obs_spaces: Dict[str, Space], num_envs: int)

Class for storing observations with a dictionary interface

Parameters:
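  • shared_memory (Dict[str, multiprocessing.RawArray | Tuple[multiprocessing.RawArray, ...] | Dict[str, multiprocessing.RawArray]]) – Per-agent shared memory that all environments write observations to.

  • obs_spaces (Dict[str, gymnasium.spaces.Space]) – Dictionary of gymnasium observation spaces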

  • num_envs (int) – Number of environments
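
A dictionary interface over shared buffers can be sketched with collections.abc.Mapping. SharedObservations is a hypothetical class, not the library's Observations: it assumes flat observations described by a per-agent width instead of gymnasium spaces, and it returns nested lists instead of NumPy arrays.

```python
from collections.abc import Mapping
import multiprocessing as mp


class SharedObservations(Mapping):
    """Hypothetical read-only, dict-like view over per-agent shared buffers."""

    def __init__(self, shared_memory, widths, num_envs):
        self._shared_memory = shared_memory
        self._widths = widths  # flattened observation width per agent
        self._num_envs = num_envs

    def __getitem__(self, agent):
        buf, width = self._shared_memory[agent], self._widths[agent]
        # One row per environment; slicing a ctypes array returns a new list.
        return [buf[i * width:(i + 1) * width] for i in range(self._num_envs)]

    def __iter__(self):
        return iter(self._shared_memory)

    def __len__(self):
        return len(self._shared_memory)


widths = {"agent_0": 2, "agent_1": 3}
num_envs = 2
shared = {agent: mp.RawArray("d", num_envs * w) for agent, w in widths.items()}
shared["agent_0"][2:4] = [0.5, 1.5]  # environment 1 writes its observation

obs = SharedObservations(shared, widths, num_envs)
first_agent_batch = obs["agent_0"]
```

Because Mapping supplies keys(), items() and membership tests from the three methods above, the view can be passed to code that expects an ordinary dictionary of batched observations.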

agilerl.vector.pz_async_vec_env.create_shared_memory(num_envs: int, obs_spaces: Dict[str, Space], context: Any) Dict[str, RawArray]

Create shared memory for multi-agent observations.

Parameters:
  • num_envs (int) – Number of environments

  • obs_spaces (Dict[str, gymnasium.spaces.Space]) – Dictionary of gymnasium observation spaces

  • context (Any) – Multiprocessing context

agilerl.vector.pz_async_vec_env.write_to_shared_memory(index: int, observation: Dict[str, ndarray | Dict[str, ndarray] | Tuple[_SupportsArray[dtype[Any]] | _NestedSequence[_SupportsArray[dtype[Any]]] | bool | int | float | complex | str | bytes | _NestedSequence[bool | int | float | complex | str | bytes], ...]], shared_memory: Dict[str, RawArray], obs_space: Dict[str, Box | Discrete | MultiDiscrete | Dict | Tuple | MultiBinary | List[Box | Discrete | MultiDiscrete | Dict | Tuple | MultiBinary]]) None

Set the observation for a given environment. Handles Dict and Tuple spaces.

Parameters:
  • index (int) – Environment index

  • observation (Dict[str, np.ndarray]) – Observation from env.step or env.reset

  • shared_memory (Dict[str, mp.Array | Tuple[mp.Array, ...] | Dict[str, mp.Array]]) – Shared memory

  • obs_space (Dict[str, gymnasium.spaces.Space]) – Observation space dictionary
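
Allocating per-agent buffers and writing one environment's observation into its slot can be sketched as follows. create_buffers and write_observation are hypothetical helpers, not the library functions: they describe each observation by a flattened width (an int, or a nested dict or tuple of ints) instead of a gymnasium space, and they store everything as doubles.

```python
import multiprocessing as mp


def create_buffers(num_envs, widths, ctx):
    """Allocate one zeroed buffer per leaf; widths may nest dicts or tuples."""
    if isinstance(widths, dict):
        return {k: create_buffers(num_envs, w, ctx) for k, w in widths.items()}
    if isinstance(widths, tuple):
        return tuple(create_buffers(num_envs, w, ctx) for w in widths)
    return ctx.RawArray("d", num_envs * widths)


def write_observation(index, observation, buffers):
    """Copy one environment's observation into its slot of every buffer."""
    if isinstance(buffers, dict):
        for key, sub in buffers.items():
            write_observation(index, observation[key], sub)
    elif isinstance(buffers, tuple):
        for sub_obs, sub in zip(observation, buffers):
            write_observation(index, sub_obs, sub)
    else:
        width = len(observation)
        buffers[index * width:(index + 1) * width] = observation


ctx = mp.get_context()
# Hypothetical layout: agent_0 has a flat observation, agent_1 a Dict-like one.
widths = {"agent_0": 2, "agent_1": {"position": 2, "velocity": 1}}
shared = create_buffers(num_envs=3, widths=widths, ctx=ctx)

write_observation(
    1,
    {"agent_0": [1.0, 2.0], "agent_1": {"position": [3.0, 4.0], "velocity": [5.0]}},
    shared,
)
```

Each environment only touches the slice selected by its own index, so several writers can fill the same buffer without overwriting one another.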

agilerl.vector.pz_async_vec_env.process_transition(transitions: Tuple[Dict[str, ndarray | Dict[str, ndarray] | Tuple[_SupportsArray[dtype[Any]] | _NestedSequence[_SupportsArray[dtype[Any]]] | bool | int | float | complex | str | bytes | _NestedSequence[bool | int | float | complex | str | bytes], ...]], ...], obs_spaces: Dict[str, Space], transition_names: List[str], agents: List[str]) List[Dict[str, ndarray | Dict[str, ndarray] | Tuple[_SupportsArray[dtype[Any]] | _NestedSequence[_SupportsArray[dtype[Any]]] | bool | int | float | complex | str | bytes | _NestedSequence[bool | int | float | complex | str | bytes], ...]]]

Process transition, adds in placeholder values for killed sub-agents.

Parameters:
  • transitions (Tuple[Dict[str, NumpyObsType], ...]) – Tuple of environment transitions

  • obs_spaces (Dict[str, gymnasium.spaces.Space]) – Observation spaces

  • transition_names (List[str]) – Names associated with the transitions

  • agents (List[str]) – List of sub-agent names

agilerl.vector.pz_async_vec_env.get_placeholder_value(agent: str, transition_name: str, obs_spaces: Dict[str, Space] | None = None) Any

Returns a placeholder value to use for the given transition when an agent is killed or inactive for the current step.

Parameters:
  • agent (str) – Agent ID

  • transition_name (str) – Name of the transition

  • obs_spaces (Dict[str, gymnasium.spaces.Space] | None, optional) – Observation spaces, defaults to None

Returns:

Placeholder value

Return type:

Any
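
Filling in entries for killed or inactive agents can be sketched as below. The placeholder values chosen here (zeros for observations, 0.0 for rewards, True for terminated and truncated, an empty dict for infos) are assumptions made for illustration and may not match what the library returns. placeholder and fill_missing are hypothetical helpers that take observation widths instead of gymnasium spaces.

```python
def placeholder(transition_name, obs_width=None):
    """Hypothetical placeholder values; the library's actual values may differ."""
    if transition_name == "observation":
        return [0.0] * obs_width
    if transition_name == "reward":
        return 0.0
    if transition_name in ("terminated", "truncated"):
        return True
    if transition_name == "info":
        return {}
    raise ValueError(f"unknown transition name: {transition_name!r}")


def fill_missing(transition, transition_name, agents, obs_widths):
    """Return a copy of one transition dict with an entry for every agent."""
    return {
        agent: (
            transition[agent]
            if agent in transition
            else placeholder(transition_name, obs_widths.get(agent))
        )
        for agent in agents
    }


agents = ["agent_0", "agent_1"]
obs_widths = {"agent_0": 2, "agent_1": 3}

# agent_1 was killed, so the environment returned nothing for it this step.
rewards = fill_missing({"agent_0": 1.5}, "reward", agents, obs_widths)
observations = fill_missing(
    {"agent_0": [0.1, 0.2]}, "observation", agents, obs_widths
)
```

Keeping an entry for every agent gives each transition dictionary the same keys at every step, which is what allows the results to be batched across environments.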

agilerl.vector.pz_async_vec_env._async_worker(index: int, env_fn: Callable[[], ParallelEnv], pipe: Connection, parent_pipe: Connection, shared_memory: Dict[str, RawArray], error_queue: Queue, agents: List[str]) None

Worker function to run the environment in a subprocess.

Parameters:
  • index (int) – Subprocess index

  • env_fn (callable) – Function that creates the environment

  • pipe (Connection) – Child pipe object for sending data to the main process

  • parent_pipe (Connection) – Parent pipe object held by the main process

  • shared_memory (Dict[str, multiprocessing.RawArray]) – Dictionary of shared memory buffers, keyed by agent.

  • error_queue (mp.Queue) – Queue object for collecting subprocess errors to communicate back to the main process

  • observation_shapes (Dict[str, Tuple[int]]) – Shapes of observations

  • observation_widths (Dict[str, int]) – Flattened observation widths

  • observation_dtypes (Dict[str, np.dtype]) – Observation dtypes

  • agents (List[str]) – Sub-agent names