Petting Zoo Async Vector Environment

Class for vectorizing PettingZoo parallel environments. It works with both the default PettingZoo environments and custom ones.

# Default pettingzoo environment
from agilerl.vector.pz_async_vec_env import AsyncPettingZooVecEnv
from pettingzoo.mpe import simple_speaker_listener_v4

num_envs = 4
env = simple_speaker_listener_v4.parallel_env()
vec_env = AsyncPettingZooVecEnv([lambda: env for _ in range(num_envs)])
observations, infos = vec_env.reset()
for step in range(25):
    actions = {
        agent: [vec_env.single_action_space(agent).sample() for n in range(num_envs)]
        for agent in vec_env.agents
    }
    observations, rewards, terminations, truncations, infos = vec_env.step(actions)

# Custom pettingzoo environment
from agilerl.vector.pz_async_vec_env import AsyncPettingZooVecEnv

env = CustomEnv()  # CustomEnv stands for your own PettingZoo parallel environment class
num_envs = 4
vec_env = AsyncPettingZooVecEnv([lambda: env for _ in range(num_envs)])
observations, infos = vec_env.reset()
for step in range(25):
    actions = {
        agent: [vec_env.single_action_space(agent).sample() for n in range(num_envs)]
        for agent in vec_env.agents
    }
    observations, rewards, terminations, truncations, infos = vec_env.step(actions)

Parameters

class agilerl.vector.pz_async_vec_env.AsyncPettingZooVecEnv(env_fns, experience_spec=None, copy=True, context=None)

Vectorized PettingZoo environment that runs multiple environments in parallel, each in its own subprocess.

Parameters:
  • env_fns (list[Callable]) – Functions that create the environment

  • copy (bool, optional) – Boolean flag to copy the observation data when it is returned with either .step() or .reset(), recommended, defaults to True

  • context (optional) – Context for multiprocessing, defaults to None
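
The env_fns argument is a list of zero-argument callables, one per sub-environment. The sketch below shows one way to build such a list so that every callable constructs its own instance. DummyParallelEnv and make_env_fns are hypothetical names used only for illustration and are not part of agilerl. functools.partial is used here because partial objects can be pickled when the wrapped callable is importable, which lambdas generally cannot; whether that matters depends on the multiprocessing context in use.

```python
from functools import partial


class DummyParallelEnv:
    """Hypothetical stand-in for a PettingZoo parallel environment."""

    def __init__(self, max_cycles=25):
        self.max_cycles = max_cycles


def make_env_fns(env_cls, num_envs, **env_kwargs):
    """Return num_envs zero-argument callables, each building a new env."""
    return [partial(env_cls, **env_kwargs) for _ in range(num_envs)]


env_fns = make_env_fns(DummyParallelEnv, num_envs=4, max_cycles=50)
envs = [fn() for fn in env_fns]  # each call constructs a separate instance
```

A list built this way could be passed as env_fns in place of the lambda-based list shown in the examples above.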

call(name, *args, **kwargs)

Call a method from each parallel environment with args and kwargs.

Parameters:
  • name (str) – Name of the method or property to call

  • *args

    Positional arguments to apply to the method call.

  • **kwargs

    Keyword arguments to apply to the method call.

call_async(name, *args, **kwargs)

Calls the method with the given name asynchronously in each parallel environment, applying args and kwargs to the method call. Use call_wait() to collect the results.

Parameters:
  • name (str) – Name of the method or property to call

  • *args

    Positional arguments to apply to the method call.

  • **kwargs

    Keyword arguments to apply to the method call.

call_wait(timeout=None)

Calls all parent pipes and waits for the results.

Parameters:

timeout (int | float | None, optional) – Number of seconds before the call to call_wait() times out. If None, the call to call_wait() never times out, defaults to None
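
The call_async() and call_wait() pair follows a send-then-collect pattern: the request is first sent down every pipe, and the replies are gathered afterwards. The following is a minimal sketch of that pattern using only the standard library. It is not the library's implementation: Counter, worker, and demo are hypothetical names, the workers are threads rather than subprocesses so the example stays self-contained, and the timeout is applied per pipe, which is an assumption.

```python
import threading
from multiprocessing import Pipe


class Counter:
    """Hypothetical stand-in for a sub-environment."""

    def __init__(self, start):
        self.value = start

    def add(self, amount):
        self.value += amount
        return self.value


def worker(conn, obj):
    """Serve (name, args, kwargs) requests until None is received."""
    while True:
        request = conn.recv()
        if request is None:
            break
        name, args, kwargs = request
        attr = getattr(obj, name)
        # Methods are called; plain attributes are returned as-is.
        conn.send(attr(*args, **kwargs) if callable(attr) else attr)


def call_async(parent_conns, name, *args, **kwargs):
    """Send the request to every worker without waiting for replies."""
    for conn in parent_conns:
        conn.send((name, args, kwargs))


def call_wait(parent_conns, timeout=None):
    """Collect one reply per worker; timeout=None blocks indefinitely."""
    results = []
    for conn in parent_conns:
        if not conn.poll(timeout):
            raise TimeoutError(f"no reply within {timeout} seconds")
        results.append(conn.recv())
    return tuple(results)


def demo(num_workers=3):
    """Run the pattern end to end with thread-based workers."""
    pipes = [Pipe() for _ in range(num_workers)]
    parents = [parent for parent, _ in pipes]
    threads = [
        threading.Thread(target=worker, args=(child, Counter(i)))
        for i, (_, child) in enumerate(pipes)
    ]
    for t in threads:
        t.start()
    call_async(parents, "add", 10)   # method call
    added = call_wait(parents, timeout=5)
    call_async(parents, "value")     # attribute lookup
    values = call_wait(parents, timeout=5)
    for conn in parents:
        conn.send(None)              # ask each worker to exit
    for t in threads:
        t.join()
    return added, values
```

Separating the send from the wait lets all workers run at the same time; a blocking call per worker would serialize them.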

close_extras(timeout=None, terminate=False)

Close the environments & clean up the extra resources (processes and pipes).

Parameters:
  • timeout (int | float | None, optional) – Number of seconds before the call to close() times out. If None, the call to close() never times out. If the call to close() times out, then all processes are terminated, defaults to None

  • terminate (bool, optional) – If True, then the close() operation is forced and all processes are terminated, defaults to False

get_attr(name)

Get a property from each parallel environment.

Parameters:

name (str) – Name of property to get from each individual environment

render()

Returns the rendered frames from the parallel environments.

reset(*, seed=None, options=None)

Reset all the environments and return two dictionaries of batched observations and infos.

Parameters:
  • seed (None | int, optional) – Random seed, defaults to None

  • options (dict[str, Any], optional) – Options dictionary, defaults to None
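
The batched dictionaries returned by reset() are keyed by agent, with one entry per sub-environment under each key. As a rough illustration of that layout, the sketch below regroups a list of per-environment dictionaries into one agent-keyed dictionary. batch_by_agent is a hypothetical helper, plain lists stand in for arrays, and the observation values are made up.

```python
def batch_by_agent(per_env):
    """Regroup [{agent: value}, ...] (one dict per env) into {agent: [values]}."""
    agents = list(per_env[0])
    return {agent: [env_dict[agent] for env_dict in per_env] for agent in agents}


# Made-up observations from two sub-environments.
per_env_obs = [
    {"speaker_0": [0.0, 1.0], "listener_0": [0.5]},
    {"speaker_0": [1.0, 0.0], "listener_0": [0.7]},
]
batched = batch_by_agent(per_env_obs)
```

Here batched["speaker_0"] holds the speaker's observation from each sub-environment in order, so index i always refers to environment i.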

reset_async(seed=None, options=None)

Send calls to the reset methods of the sub-environments.

To get the results of these calls, you may invoke reset_wait().

Parameters:
  • seed (None | int, optional) – Random seed, defaults to None

  • options (dict[str, Any], optional) – Options dictionary, defaults to None

reset_wait(timeout=None)

Waits for the calls triggered by reset_async() to finish and returns the results.

Parameters:

timeout (int | float | None, optional) – Number of seconds before the call to reset_wait() times out. If None, the call to reset_wait() never times out, defaults to None

set_attr(name, values)

Sets an attribute of the sub-environments.

Parameters:
  • name (str) – Name of the property to be set in each individual environment.

  • values (list[Any] | tuple[Any] | object) – Values of the property to be set to. If values is a list or tuple, then it corresponds to the values for each individual environment, otherwise a single value is set for all environments.
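
The rule for values can be sketched as a small helper: a list or tuple supplies one value per environment, and anything else is repeated for every environment. expand_values is a hypothetical name, and the length check is an assumption about how a mismatch might be handled rather than documented behaviour.

```python
def expand_values(values, num_envs):
    """Return one value per environment following the set_attr rule."""
    if not isinstance(values, (list, tuple)):
        # A single object is applied to every environment.
        return [values] * num_envs
    if len(values) != num_envs:
        # Assumption: a per-environment sequence must match num_envs.
        raise ValueError(
            f"expected {num_envs} values, got {len(values)}"
        )
    return list(values)
```

For example, expand_values(0.5, 3) gives three copies of 0.5, while expand_values((1, 2, 3), 3) assigns a different value to each environment.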

step_async(actions)

Tell all the environments to start taking a step with the given actions. Call step_wait() to get the results of the step. You should not call this if a step_async run is already pending.

Parameters:

actions (list[list[int | float | np.ndarray]]) – List of length num_envs, where each sublist contains the actions for every agent in the corresponding environment
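
The usage examples at the top of this page pass step() a dictionary keyed by agent, whereas step_async() is documented as taking one list of actions per environment. The sketch below shows how the first layout can be rearranged into the second. actions_by_env is a hypothetical helper, and whether step() performs this exact conversion internally is an assumption.

```python
def actions_by_env(actions, agents, num_envs):
    """Turn {agent: [action per env]} into [[action per agent] per env]."""
    return [[actions[agent][i] for agent in agents] for i in range(num_envs)]


# Made-up discrete actions for three sub-environments.
batched = {"speaker_0": [0, 1, 2], "listener_0": [4, 3, 2]}
per_env = actions_by_env(batched, ["speaker_0", "listener_0"], num_envs=3)
```

Each sublist of per_env follows the order of the agents argument, so the first entry is always the speaker's action and the second the listener's.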

step_wait(timeout=None)

Wait for the calls to step in each sub-environment to finish.

Parameters:

timeout (int | float | None, optional) – Number of seconds before the call to step_wait() times out. If None, the call to step_wait() never times out, defaults to None

class agilerl.vector.pz_async_vec_env.AsyncState(value)

The AsyncVectorEnv possible states given the different actions.
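
A state enum of this kind is typically used to reject an asynchronous call while another one is still pending, which is the situation step_async() warns about. The sketch below illustrates the idea with hypothetical names (SketchState, PendingGuard); the member names and error type are assumptions and may differ from those in AsyncState.

```python
from enum import Enum


class SketchState(Enum):
    """Hypothetical states; the real AsyncState members may differ."""

    DEFAULT = "default"
    WAITING_STEP = "step"


class PendingGuard:
    """Tracks whether a step has been started but not yet collected."""

    def __init__(self):
        self.state = SketchState.DEFAULT

    def step_async(self):
        if self.state is not SketchState.DEFAULT:
            raise RuntimeError("step_async called while a call is pending")
        self.state = SketchState.WAITING_STEP

    def step_wait(self):
        if self.state is not SketchState.WAITING_STEP:
            raise RuntimeError("step_wait called without a pending step_async")
        self.state = SketchState.DEFAULT
```

With this guard, calling step_async() twice in a row raises an error, and calling step_wait() returns the guard to its default state.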

class agilerl.vector.pz_async_vec_env.PettingZooExperienceSpec(num_envs)

Class for formatting experiences when being returned by a vectorized environment

Parameters:

num_envs (int) – Number of environments to vectorize

class agilerl.vector.pz_async_vec_env.Observations(shared_memory, exp_spec, num_envs)

Class for storing observations with a dictionary interface

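Parameters:
  • shared_memory – Shared memory that the sub-environments write their observations to

  • exp_spec (PettingZooExperienceSpec) – Experience specification

  • num_envs (int) – Number of environments to vectorize

class agilerl.vector.pz_async_vec_env.SharedMemory(num_envs, exp_spec, context)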

Class to hold the shared memory object that each of the subprocesses will write their observation to.

Parameters:
  • num_envs (int) – Number of environments to vectorize

  • exp_spec (PettingZooExperienceSpec) – Experience specification

  • context (BaseContext) – Multiprocessing context
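
To illustrate the idea of a shared buffer that each subprocess writes its observation into, the sketch below allocates one flat array and gives every environment its own slice. It is a simplified illustration, not the layout used by SharedMemory: the helper names are hypothetical, a single agent with a fixed observation width is assumed, and the array is created without a lock on the assumption that each environment only ever writes to its own slice.

```python
import multiprocessing as mp


def make_buffer(num_envs, width, context=None):
    """Allocate a zero-filled shared array of num_envs * width doubles."""
    ctx = context or mp.get_context()
    # lock=False: each environment is assumed to write a disjoint slice.
    return ctx.Array("d", num_envs * width, lock=False)


def write_observation(buffer, env_index, width, flat_obs):
    """Copy a flattened observation into the slice owned by env_index."""
    start = env_index * width
    buffer[start:start + width] = list(flat_obs)


def read_observation(buffer, env_index, width):
    """Return a copy of the slice owned by env_index as a list."""
    start = env_index * width
    return list(buffer[start:start + width])
```

Reading returns a new list, so later writes to the buffer do not change values that were already read; this is the same concern the copy flag of AsyncPettingZooVecEnv addresses for returned observations.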

agilerl.vector.pz_async_vec_env._async_worker(index, env_fn, pipe, parent_pipe, shared_memory, error_queue, observation_shapes, observation_widths, observation_dtypes, agents)

Parameters:
  • index (int) – Subprocess index

  • env_fn (callable) – Function that creates the environment

  • pipe (Connection) – Child pipe object for sending data to the main process

  • parent_pipe (Connection) – Parent pipe object

  • shared_memory (List[multiprocessing.Array]) – List of shared memories.

  • error_queue (mp.Queue) – Queue object for collecting subprocess errors to communicate back to the main process

  • observation_shapes (Dict[str, Tuple[int]]) – Shapes of observations

  • observation_widths (Dict[str, int]) – Flattened observation widths

  • observation_dtypes (Dict[str, np.dtype]) – Observation dtypes

  • agents (str) – Sub-agent names
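
The observation_shapes and observation_widths arguments are related: a flattened width is presumably the number of elements in the corresponding shape. The sketch below computes widths from shapes under that assumption. flattened_widths is a hypothetical helper and the agent names and shapes are made up.

```python
from math import prod


def flattened_widths(observation_shapes):
    """Map each agent to the element count of its observation shape."""
    return {agent: prod(shape) for agent, shape in observation_shapes.items()}


# Made-up shapes: a vector observation and a 2 x 4 observation.
shapes = {"agent_0": (3,), "agent_1": (2, 4)}
widths = flattened_widths(shapes)
```

A worker that knows the width and shape for an agent can write the flattened observation into shared memory and the main process can reshape it on the way out.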