Path Integration

Arena

class flygym.examples.path_integration.PathIntegrationArenaBase(*args, **kwargs)

Bases: object

get_spawn_position(rel_pos: ndarray, rel_angle: ndarray) tuple[ndarray, ndarray]
update_cam_pos(physics: dm_control.mjcf.Physics, fly_pos: ndarray) None
class flygym.examples.path_integration.PathIntegrationArenaFlat(*args, **kwargs)

Bases: PathIntegrationArenaBase, FlatTerrain

Flat terrain for the path integration task with some fixed camera configurations.

class flygym.examples.path_integration.PathIntegrationArenaBlocks(*args, **kwargs)

Bases: PathIntegrationArenaBase, BlocksTerrain

Blocks terrain for the path integration task with some fixed camera configurations.

get_spawn_position(rel_pos: ndarray, rel_angle: ndarray) tuple[ndarray, ndarray]

Given a relative entity spawn position and orientation (as if it was a simple flat terrain), return the adjusted position and orientation. This is useful for environments that have complex terrain (e.g. with obstacles) where the entity’s spawn position needs to be shifted accordingly.

For example, if the arena has flat terrain, this method can simply return rel_pos, rel_angle unchanged (as is the case by default). If there is are features on the ground that are 0.1 mm in height, then this method should return rel_pos + [0, 0, 0.1], rel_angle.

Parameters:
rel_posnp.ndarray

(x, y, z) position of the entity in mm as supplied by the user (before any transformation).

rel_anglenp.ndarray

Euler angle (rotation along x, y, z in radian) of the fly’s orientation as supplied by the user (before any transformation).

Returns:
np.ndarray

Adjusted (x, y, z) position of the entity.

np.ndarray

Adjusted euler angles (rotations along x, y, z in radian) of the fly’s orientation.

Controller

class flygym.examples.path_integration.RandomExplorationController(dt: float, forward_dn_drive: tuple[float, float] = (1.0, 1.0), left_turn_dn_drive: tuple[float, float] = (-0.4, 1.2), right_turn_dn_drive: tuple[float, float] = (1.2, -0.4), turn_duration_mean: float = 0.4, turn_duration_std: float = 0.1, lambda_turn: float = 1.0, seed: int = 0, init_time: float = 0.1)

Bases: object

This controller drives a random exploration: the fly transitions between forward walking and turning in a Poisson process. When the fly turns, the turn direction is chosen randomly and the turn duration is drawn from a normal distribution.

Parameters:
dtfloat

Time step of the simulation.

forward_dn_drivetuple[float, float], optional

DN drives for forward walking, by default (1.0, 1.0).

left_turn_dn_drivetuple[float, float], optional

DN drives for turning left, by default (-0.4, 1.2).

right_turn_dn_drivetuple[float, float], optional

DN drives for turning right, by default (1.2, -0.4).

turn_duration_meanfloat, optional

Mean of the turn duration distribution in seconds, by default 0.4.

turn_duration_stdfloat, optional

Standard deviation of the turn duration distribution in seconds, by default 0.1.

lambda_turnfloat, optional

Rate of the Poisson process for turning, by default 1.0.

seedint, optional

Random seed, by default 0.

init_timefloat, optional

Initial time, in seconds, during which the fly walks straight, by default 0.1.

step()

Update the fly’s walking state.

Returns:
WalkingState

The next state of the fly.

tuple[float, float]

The next DN drives.

class flygym.examples.path_integration.PathIntegrationController(*args, **kwargs)

Bases: HybridTurningController

A wrapper of HybridTurningController that records variables that are used to perform path integration.

Notes

Please refer to the “MPD Task Specifications” page of the API references for the detailed specifications of the action space, the observation space, the reward, the “terminated” and “truncated” flags, and the “info” dictionary.

static absolute_to_relative_pos(pos: ndarray, base_pos: ndarray, heading: ndarray) ndarray
property action_space
close() None

Close the simulation, save data, and release any resources.

get_observation() ObsType
get_wrapper_attr(name: str) Any

Gets the attribute name from the environment.

property gravity
metadata: dict[str, Any] = {'render_modes': []}
property np_random: Generator

Returns the environment’s internal _np_random that if not set will initialise with a random seed.

Returns:

Instances of np.random.Generator

property observation_space
render(*args, **kwargs)

Compute the render frames as specified by render_mode during the initialization of the environment.

The environment’s metadata render modes (env.metadata[“render_modes”]) should contain the possible ways to implement the render modes. In addition, list versions for most render modes is achieved through gymnasium.make which automatically applies a wrapper to collect rendered frames.

Note:

As the render_mode is known during __init__, the objects used to render the environment state should be initialised in __init__.

By convention, if the render_mode is:

  • None (default): no render is computed.

  • “human”: The environment is continuously rendered in the current display or terminal, usually for human consumption. This rendering should occur during step() and render() doesn’t need to be called. Returns None.

  • “rgb_array”: Return a single frame representing the current state of the environment. A frame is a np.ndarray with shape (x, y, 3) representing RGB values for an x-by-y pixel image.

  • “ansi”: Return a strings (str) or StringIO.StringIO containing a terminal-style text representation for each time step. The text can include newlines and ANSI escape sequences (e.g. for colors).

  • “rgb_array_list” and “ansi_list”: List based version of render modes are possible (except Human) through the wrapper, gymnasium.wrappers.RenderCollection that is automatically applied during gymnasium.make(..., render_mode="rgb_array_list"). The frames collected are popped after render() is called or reset().

Note:

Make sure that your class’s metadata "render_modes" key includes the list of supported modes.

Changed in version 0.25.0: The render function was changed to no longer accept parameters, rather these parameters should be specified in the environment initialised, i.e., gymnasium.make("CartPole-v1", render_mode="human")

render_mode: str | None = None
reset(seed=None, init_phases=None, init_magnitudes=None, **kwargs)

Reset the simulation.

Parameters:
seedint, optional

Seed for the random number generator. If None, the simulation is re-seeded without a specific seed. For reproducibility, always specify a seed.

init_phasesnp.ndarray, optional

Initial phases of the CPGs. See CPGNetwork for details.

init_magnitudesnp.ndarray, optional

Initial magnitudes of the CPGs. See CPGNetwork for details.

**kwargs

Additional keyword arguments to be passed to SingleFlySimulation.reset.

Returns:
np.ndarray

Initial observation upon reset.

dict

Additional information.

reward_range = (-inf, inf)
set_slope(slope: float, rot_axis='y')

Set the slope of the simulation environment and modify the camera orientation so that gravity is always pointing down. Changing the gravity vector might be useful during climbing simulations. The change in the camera angle has been extensively tested for the simple cameras (left, right, top, bottom, front, back) but not for the composed ones.

Parameters:
slopefloat

The desired_slope of the environment in degrees.

rot_axisstr, optional

The axis about which the slope is applied, by default “y”.

spec: EnvSpec | None = None
step(action)

Same as HybridTurningController.step, but also records the stride for each leg (i.e., how much the leg tip has moved in the fly’s egocentric frame since the last step) in the observation space under the key “stride_diff_unmasked”. Note that this calculation does not take into account whether the “stride” is actually made during a power stroke (i.e., stance phase); it only reports the change in end effector positions in an “unmasked” manner. The user should post-process it using the contact mask as a part of the model. The order of legs in stride_diff_unmasked is: LF, LM, LH, RF, RM, RH.

property unwrapped: Env[ObsType, ActType]

Returns the base non-wrapped environment.

Returns:

Env: The base non-wrapped gymnasium.Env instance

Model

class flygym.examples.path_integration.LinearModel(coefs_all, intercept, legs)

Bases: object

Simple linear model for predicting change in heading and displacement.

flygym.examples.path_integration.path_integrate(trial_data: dict[str, ndarray], heading_model: LinearModel, displacement_model: LinearModel, time_scale: float, contact_force_thr: tuple[float, float, float], legs: str, dt: float)

Perform path integration on trial data.

Parameters:
trial_datadict[str, np.ndarray]

Dictionary containing trial data.

heading_modelLinearModel

Model for predicting change in heading.

displacement_modelLinearModel

Model for predicting change in displacement.

time_scalefloat

Time scale for path integration.

contact_force_thrtuple[float, float, float]

Thresholds for contact forces. These are used to determine whether a leg is in contact with the ground.

legsstr

String indicating which legs are included. Can be any combination of “F”, “M”, and “H”.

dtfloat

Time step of the physics simulation in the trial.

Returns:
dict[str, np.ndarray]

Dictionary containing the following keys: * “heading_pred”: Predicted heading. * “heading_actual”: Actual heading. * “pos_pred”: Predicted position. * “pos_actual”: Actual position. * “heading_diff_pred”: Predicted change in heading. * “heading_diff_actual”: Actual change in heading. * “displacement_diff_pred”: Predicted change in displacement. * “displacement_diff_actual”: Actual change in displacement.

Utilities

flygym.examples.path_integration.util.get_leg_mask(legs: str) ndarray

Given a list of legs, return a boolean mask indicating which legs are included.

If legs == “F”, the mask is np.array([True, False, False]) If legs == “FM”, the mask is np.array([True, True, False]) …

flygym.examples.path_integration.util.extract_variables(trial_data: dict[str, ndarray], time_scale: float, contact_force_thr: tuple[float, float, float], legs: str, dt: float = 1e-4) dict[str, ndarray]

Extract variables used for path integration from trial data. The difference between load_trial_data and extract_variables is that the former loads the raw data from the trial (i.e., physics simulation). The latter extracts variables from these raw data subject to additional parameters such as time scale. For each trial, we only call load_trial_data once, but we may call extract_variables multiple times with different parameters.

Parameters:
trial_datadict[str, np.ndarray]

Dictionary containing trial data.

time_scalefloat

Time scale for path integration.

contact_force_thrtuple[float, float, float]

Thresholds for contact forces. These are used to determine whether a leg is in contact with the ground.

legsstr

String indicating which legs are included. Can be any combination of “F”, “M”, and “H”.

dtfloat, optional

Time step of the physics simulation in the trial, by default 1e-4.