Vision#

Summary: In this tutorial, we will build a simple model to control the fly to follow a moving sphere. By doing so, we will also demonstrate how one can create a custom arena.

Animals typically navigate over rugged terrain to reach attractive objects (eg. potential mates, food sources) and to avoid repulsive features (eg. pheromones from predators) and obstacles. Animals use a hierarchical controller to achieve these goals: processing higher-order sensory signals, using them to select the next action, and translating these decisions into descending commands that drive lower-level motor systems. We aimed to simulate this sensorimotor hierarchy by adding vision and olfaction to NeuroMechFly.

Retina simulation#

A fly’s compound eye consists of ∼700–750 individual units called ommatidia arranged in a hexagonal pattern (see the left panel of the figure below from the droso4schools project; see also this article from the Arizona Retina Project). To emulate this, we attached a color camera to each of our model’s compound eyes (top right panel). We then transformed each camera image into 721 bins, representing ommatidia. Based on previous studies, we assume a 270° combined azimuth for the fly’s field of view, with a ∼17° binocular overlap. Visual sensitivity has evolved to highlight ethologically relevant color spectra at different locations in the environment. Here, as an initial step toward enabling this heterogeneity in our model, we implemented yellow- and pale-type ommatidia—sensitive to the green and blue channels of images rendered by the physics simulator—randomly assigned at a 7:3 ratio (as reported in Rister et al, 2013). Users can substitute the green and blue channel values with the desired light intensities sensed by yellow- and pale-type ommatidia to achieve more biorealistic chromatic vision.

https://github.com/NeLy-EPFL/_media/blob/main/flygym/vision.png?raw=true

In NeuroMechFly, the main interface to interact with the fly is the Retina class. It is embedded into the NeuroMechFly class as its .retina attribute if enable_vision is set to True in NeuroMechFly.sim_params. Here, let’s reproduce the top right panel of the figure above by putting the fly into an arena with three pillars and simulating the fly’s visual experience.

To start, we do the necessary imports:

1import matplotlib.pyplot as plt
2import numpy as np
3from tqdm import trange
4from gymnasium.utils.env_checker import check_env
5
6from flygym.mujoco import Parameters
7from flygym.mujoco.arena import FlatTerrain
8from flygym.mujoco.examples.obstacle_arena import ObstacleOdorArena
9from flygym.mujoco.examples.turning_controller import HybridTurningNMF

We have pre-implemented an ObstacleOdorArena class with visual obstacles and an odor source. The details of this class is beyond the scope of this tutorial, but you can refer to it on the FlyGym github repository:

 1# We start by creating a simple arena
 2flat_terrain_arena = FlatTerrain()
 3
 4# Then, we add visual and olfactory features on top of it
 5arena = ObstacleOdorArena(
 6    terrain=flat_terrain_arena,
 7    obstacle_positions=np.array([(7.5, 0), (12.5, 5), (17.5, -5)]),
 8    marker_size=0.5,
 9    obstacle_colors=[(0.14, 0.14, 0.2, 1), (0.2, 0.8, 0.2, 1), (0.2, 0.2, 0.8, 1)],
10    user_camera_settings=((13, -18, 9), (np.deg2rad(65), 0, 0), 45),
11)

Let’s put the fly in it and simulate 500 timesteps so the fly can stand on the floor in a stable manner:

 1contact_sensor_placements = [
 2    f"{leg}{segment}"
 3    for leg in ["LF", "LM", "LH", "RF", "RM", "RH"]
 4    for segment in ["Tibia", "Tarsus1", "Tarsus2", "Tarsus3", "Tarsus4", "Tarsus5"]
 5]
 6sim_params = Parameters(
 7    render_playspeed=0.2,
 8    render_camera="user_cam",
 9    enable_vision=True,
10    render_raw_vision=True,
11    enable_olfaction=True,
12)
13nmf = HybridTurningNMF(
14    sim_params=sim_params,
15    arena=arena,
16    spawn_pos=(13, -5, 0.2),
17    spawn_orientation=(0, 0, np.pi / 2 + np.deg2rad(70)),
18    contact_sensor_placements=contact_sensor_placements,
19)
20
21for i in range(500):
22    obs, reward, terminated, truncated, info = nmf.step(np.zeros(2))
23    nmf.render()
24
25fig, ax = plt.subplots(figsize=(4, 3), tight_layout=True)
26ax.imshow(nmf._frames[-1])
27ax.axis("off")
28fig.savefig("./outputs/vision_sim_env.png")
https://github.com/NeLy-EPFL/_media/blob/main/flygym/vision_sim_env.png?raw=true

We can access the intensities sensed by the fly’s ommatidia from the observation:

1print(obs["vision"])
2print("Shape:", obs["vision"].shape)
3print("Data type:", obs["vision"].dtype)
[[[0.9913793  0.        ]
  [1.         0.        ]
  [1.         0.        ]
  ...
  [0.31609195 0.        ]
  [0.29803923 0.        ]
  [0.29803923 0.        ]]

 [[0.9913793  0.        ]
  [1.         0.        ]
  [1.         0.        ]
  ...
  [0.29803923 0.        ]
  [0.3752228  0.        ]
  [0.4        0.        ]]]
Shape: (2, 721, 2)
Data type: float32

This gives us a (2, 722, 2) array representing the light intensities sensed by the ommatidia. The values are normalized to [0, 1]. The first dimension is for the two eyes (left and right in that order). The second dimension is for the 721 ommatidia per eye. The third dimension is for the two color channels (yellow- and pale-type in that order). For each ommatidia, only one of the two numbers along the last dimension is nonzero. The yellow- and pale-type ommatidia are split at a 7:3 ratio. We can verify this below:

1nonzero_idx = np.nonzero(obs["vision"])
2unique_vals, val_counts = np.unique(nonzero_idx[2], return_counts=True)
3val_counts / val_counts.sum()
array([0.70041609, 0.29958391])

But this is array representation is not good for visualization. We can use the hex_pxls_to_human_readable method of the retina to convert it into a normal [0, 256) 8-bit RGB image that can be plotted:

 1vision_left = nmf.retina.hex_pxls_to_human_readable(obs["vision"][0, :, :])
 2vision_right = nmf.retina.hex_pxls_to_human_readable(obs["vision"][1, :, :])
 3
 4fig, axs = plt.subplots(1, 2, figsize=(6, 3), tight_layout=True)
 5axs[0].imshow(vision_left, cmap="gray", vmin=0, vmax=255)
 6axs[0].axis("off")
 7axs[0].set_title("Left eye")
 8axs[1].imshow(vision_right, cmap="gray", vmin=0, vmax=255)
 9axs[1].axis("off")
10axs[1].set_title("Right eye")
11fig.savefig("./outputs/vision_sim.png")
https://github.com/NeLy-EPFL/_media/blob/main/flygym/vision_sim.png?raw=true

Since render_raw_vision is set to True in the parameters, we can access the raw RGB vision through the info dictionary before pixels are binned into ommatidia:

1fig, axs = plt.subplots(1, 2, figsize=(6, 3), tight_layout=True)
2axs[0].imshow(info["raw_vision"][0, :, :, :].astype(np.uint8))
3axs[0].axis("off")
4axs[0].set_title("Left eye")
5axs[1].imshow(info["raw_vision"][1, :, :, :].astype(np.uint8))
6axs[1].axis("off")
7axs[1].set_title("Right eye")
8fig.savefig("./outputs/vision_sim_raw.png")
https://github.com/NeLy-EPFL/_media/blob/main/flygym/vision_sim_raw.png?raw=true

We observe that the ommatidia covering the blue and the green pillars seem to have a bimodal distribution in intensity. This is because the ommatidia are stochastically split into yellow- and pale-types, and they have different sensitivities to different colors.

A dynamic arena with a moving sphere#

The next step is to create a custom arena with a moving sphere. To do this, we will implement a MovingObjArena class that inherits from the flygym.mujoco.arena.BaseArena class. A complete, functioning implementation of this class is provided under flygym.mujoco.examples.vision on the FlyGym repository. We start by defining some attributes in its __init__ method:

class MovingObjArena(BaseArena):
    """Flat terrain with a hovering moving object.

    Attributes
    ----------
    arena : mjcf.RootElement
        The arena object that the terrain is built on.
    ball_pos : Tuple[float,float,float]
        The position of the floating object in the arena.

    Parameters
    ----------
    size : Tuple[int, int]
        The size of the terrain in (x, y) dimensions.
    friction : Tuple[float, float, float]
        Sliding, torsional, and rolling friction coefficients, by default
        (1, 0.005, 0.0001)
    obj_radius : float
        Radius of the spherical floating object in mm.
    obj_spawn_pos : Tuple[float,float,float]
        Initial position of the object, by default (0, 2, 1).
    move_direction : str
        Which way the ball moves toward first. Can be "left", "right", or
        "random".
    move_speed : float
        Speed of the moving object.
    """

    def __init__(
        self,
        size: Tuple[float, float] = (300, 300),
        friction: Tuple[float, float, float] = (1, 0.005, 0.0001),
        obj_radius: float = 1,
        init_ball_pos: Tuple[float, float] = (5, 0),
        move_speed: float = 8,
        move_direction: str = "right",
    ):
        self.init_ball_pos = (*init_ball_pos, obj_radius)
        self.ball_pos = np.array(self.init_ball_pos, dtype="float32")
        self.friction = friction
        self.move_speed = move_speed
        self.curr_time = 0
        self.move_direction = move_direction
        if move_direction == "left":
            self.y_mult = 1
        elif move_direction == "right":
            self.y_mult = -1
        elif move_direction == "random":
            self.y_mult = np.random.choice([-1, 1])
        else:
            raise ValueError("Invalid move_direction")

        ...

Next, we define a root_element attribute. The simulated world is represented as a tree of objects, each attached to a parent. For example, the eyes of the fly are attached to the head, which is in turn attached to the thorax — the base of the NeuroMechFly model. Note that this tree is merely a representation of objects and their relation to each other; there does not necessarily have to be a visual or anatomical link between the parent and child objects. For example, the base of the NeuroMechFly model — the thorax — is attached to the world, but the link between the thorax and the world is a free joint, meaning that it is free to move in all 6 degrees of freedom. The root element is the root of this tree.

...

self.root_element = mjcf.RootElement()

...

Then, we will add the moving sphere. It will be attached to the root element:

...

# Add ball
obstacle = self.root_element.asset.add(
    "material", name="obstacle", reflectance=0.1
)
self.root_element.worldbody.add(
    "body", name="ball_mocap", mocap=True, pos=self.ball_pos, gravcomp=1
)
self.object_body = self.root_element.find("body", "ball_mocap")
self.object_body.add(
    "geom",
    name="ball",
    type="sphere",
    size=(obj_radius, obj_radius),
    rgba=(0.0, 0.0, 0.0, 1),
    material=obstacle,
)

...

Let’s also add some cameras so we can visualize the scene from different angles. This concludes the definition of our __init__ method.

...

# Add camera
self.birdeye_cam = self.root_element.worldbody.add(
    "camera",
    name="birdeye_cam",
    mode="fixed",
    pos=(15, 0, 35),
    euler=(0, 0, 0),
    fovy=45,
)
self.birdeye_cam_zoom = self.root_element.worldbody.add(
    "camera",
    name="birdeye_cam_zoom",
    mode="fixed",
    pos=(15, 0, 20),
    euler=(0, 0, 0),
    fovy=45,
)

Next, let’s define a get_spawn_position class. This is applies an offset to the user-defined fly spawn position. For example, if there is a stage in your arena that is 1 mm high, and you want to place the fly on this stage, then you might want to apply a transformation to the user-specified relative spawn position and return rel_pos + np.array([0, 0, 1]) as the effective spawn position. In our case, we have a flat arena, so we will just return the spawn position and orientation as is:

def get_spawn_position(self, rel_pos, rel_angle):
    return rel_pos, rel_angle

The arena also has a step method. Usually, in static arenas, this is left empty. However, since we want the sphere to move in our arena, we need to implement this method so the sphere is moved appropriately every step of the simulation:

def step(self, dt, physics):
    heading_vec = np.array([1, 2 * np.cos(self.curr_time * 3) * self.y_mult])
    heading_vec /= np.linalg.norm(heading_vec)
    self.ball_pos[:2] += self.move_speed * heading_vec * dt
    physics.bind(self.object_body).mocap_pos = self.ball_pos
    self.curr_time += dt

Finally, let’s implement a reset method:

def reset(self, physics):
    if self.move_direction == "random":
        self.y_mult = np.random.choice([-1, 1])
    self.curr_time = 0
    self.ball_pos = np.array(self.init_ball_pos, dtype="float32")
    physics.bind(self.object_body).mocap_pos = self.ball_pos

Visual feature preprocessing#

We will preprocess the visual feature by computing the x-y position of the object on the retina along with its size relative to the whole retinal image. We do this by applying binary thresholding to the image and calculating its size and center of mass. This is a good example to once again showcase the benefit of encapsulating preprogrammed logic into the Markov Decision Process (implemented as a Gym environment). If you haven’t, read the tutorial on building a turning controller to see how this is done.

Recall that in the HybridTurningController, we implemented the purple arrow in the following figure, encapsulating the CPG network and the sensory feedback-based correction rules:

https://github.com/NeLy-EPFL/_media/blob/main/flygym/mdp.png?raw=true

Here, we will build yet another layer on top of HybridTurningController, implementing the aforementioned sensory preprocessing logic (cyan arrow) and encapsulating it inside the new MDP. As before, a complete, functioning implementation of this class is provided under flygym.mujoco.examples.vision on the FlyGym repository.

We start by defining an __init__ method. This time, we will specify the threshold used in the binary thresholding step. Any pixel darker than this number will be considered part of the black sphere. We will also define a decision interval \(\tau\): the turning signal is recomputed every \(\tau\) seconds. We will compute the center of mass (COM) of all ommatidia so that later when we need to compute the COM of the object (a masked subset of pixels), we can simply take the average of the COMs of the selected pixels. Finally, we will override the definition of the observation space with a 6-dimensional one (x, y positions of the object per side, plus the relative size of the object per side).

class VisualTaxis(HybridTurningNMF):
    def __init__(self, obj_threshold=0.15, decision_interval=0.05, **kwargs):
        super().__init__(**kwargs)

        self.obj_threshold = obj_threshold
        self.decision_interval = decision_interval
        self.num_substeps = int(self.decision_interval / self.timestep)
        self.visual_inputs_hist = []

        self.coms = np.empty((self.retina.num_ommatidia_per_eye, 2))
        for i in range(self.retina.num_ommatidia_per_eye):
            mask = self.retina.ommatidia_id_map == i + 1
            self.coms[i, :] = np.argwhere(mask).mean(axis=0)

        self.observation_space = spaces.Box(0, 1, shape=(6,))

Next, let’s implement the visual preprocessing logic discussed above:

def _process_visual_observation(self, raw_obs):
    features = np.zeros((2, 3))
    for i, ommatidia_readings in enumerate(raw_obs["vision"]):
        is_obj = ommatidia_readings.max(axis=1) < self.obj_threshold
        is_obj_coords = self.coms[is_obj]
        if is_obj_coords.shape[0] > 0:
            features[i, :2] = is_obj_coords.mean(axis=0)
        features[i, 2] = is_obj_coords.shape[0]
    features[:, 0] /= self.retina.nrows  # normalize y_center
    features[:, 1] /= self.retina.ncols  # normalize x_center
    features[:, 2] /= self.retina.num_ommatidia_per_eye  # normalize area
    return features.flatten()

In the step method, we will replace the raw observation with the output of the _process_visual_observation method. We will also record the retina images every time the simulation renders a frame for the recorded video. This way, we can visualize the retina images along with the recorded video later:

def step(self, control_signal):
    for _ in range(self.num_substeps):
        raw_obs, _, _, _, _ = super().step(control_signal)
        render_res = super().render()
        if render_res is not None:
            # record visual inputs too because they will be played in the video
            self.visual_inputs_hist.append(raw_obs["vision"].copy())
    visual_features = self._process_visual_observation(raw_obs)
    return visual_features, 0, False, False, {}

Finally, we implement the reset method:

def reset(self, seed=0, **kwargs):
    raw_obs, _ = super().reset(seed=seed)
    return self._process_visual_observation(raw_obs), {}

Implementing a object tracking controller#

Now that we have implemented the arena and the new Gym environment, we just need to define the actual controller logic that outputs the 2D descending representation based on the extracted visual features. As a proof-of-concept, we have hand-tuned the following relationship:

\[\begin{split}\delta_i = \begin{cases} \min(\max(k a_i + b, \delta_\text{min}), \delta_\text{max}) & \text{if } A_i > A_\text{thr} \\ 1 & \text{otherwise} \end{cases}\end{split}\]

where \(\delta_i\) is the descending modulation signal on side \(i\); \(a_i\) is the azimuth of the object expressed as the deviation from the anterior edge of the eye’s field of view, normalized by the horizontal field of view of the retina; \(A_i\) is the relative size of the object on the arena; \(k=-3\), \(b=1\) are parameters describing the response curve; \(\delta_\text{min}=0.2\), \(\delta_\text{max}=1\) describe the range of the descending signal; \(A_\text{thr} = 1\%\) is the threshold below which the object is considered unseen from the eye.

Before we implement this in the main simulation loop, let’s instantiate our arena and Gym environment:

 1from flygym.mujoco.examples.vision import MovingObjArena, VisualTaxis
 2
 3obj_threshold = 0.15
 4decision_interval = 0.05
 5contact_sensor_placements = [
 6    f"{leg}{segment}"
 7    for leg in ["LF", "LM", "LH", "RF", "RM", "RH"]
 8    for segment in ["Tibia", "Tarsus1", "Tarsus2", "Tarsus3", "Tarsus4", "Tarsus5"]
 9]
10arena = MovingObjArena()
11sim_params = Parameters(
12    render_camera="birdeye_cam",
13    render_playspeed=0.5,
14    render_window_size=(800, 608),
15    enable_adhesion=True,
16    enable_vision=True,
17)
18nmf = VisualTaxis(
19    obj_threshold=obj_threshold,
20    decision_interval=decision_interval,
21    sim_params=sim_params,
22    arena=arena,
23    contact_sensor_placements=contact_sensor_placements,
24)

As before, let’s check if this environment complies with the Gym interface. Despite a few warnings on design choices, no errors should be raised.

1check_env(nmf)
/opt/homebrew/Caskroom/miniforge/base/envs/flygym0.2/lib/python3.11/site-packages/gymnasium/utils/env_checker.py:247: UserWarning: WARN: For Box action spaces, we recommend using a symmetric and normalized space (range=[-1, 1] or [0, 1]). See https://stable-baselines3.readthedocs.io/en/master/guide/rl_tips.html for more information.
  logger.warn(
/opt/homebrew/Caskroom/miniforge/base/envs/flygym0.2/lib/python3.11/site-packages/gymnasium/utils/env_checker.py:125: UserWarning: WARN: The default seed argument in reset should be None, otherwise the environment will by default always be deterministic. Actual default: 0
  logger.warn(
/opt/homebrew/Caskroom/miniforge/base/envs/flygym0.2/lib/python3.11/site-packages/gymnasium/utils/passive_env_checker.py:175: UserWarning: WARN: The default seed argument in Env.reset should be None, otherwise the environment will by default always be deterministic. Actual default: seed=0
  logger.warn(
/opt/homebrew/Caskroom/miniforge/base/envs/flygym0.2/lib/python3.11/site-packages/gymnasium/utils/env_checker.py:321: UserWarning: WARN: Not able to test alternative render modes due to the environment not having a spec. Try instantialising the environment through gymnasium.make
  logger.warn(

Now, we implement the main simulation loop:

 1def calc_ipsilateral_speed(deviation, is_found):
 2    if not is_found:
 3        return 1.0
 4    else:
 5        return np.clip(1 - deviation * 3, 0.2, 1.0)
 6
 7
 8num_substeps = int(decision_interval / sim_params.timestep)
 9
10obs_hist = []
11deviations_hist = []
12control_signal_hist = []
13
14obs, _ = nmf.reset()
15for i in trange(70):
16    left_deviation = 1 - obs[1]
17    right_deviation = obs[4]
18    left_found = obs[2] > 0.01
19    right_found = obs[5] > 0.01
20    if not left_found:
21        left_deviation = np.nan
22    if not right_found:
23        right_deviation = np.nan
24    control_signal = np.array(
25        [
26            calc_ipsilateral_speed(left_deviation, left_found),
27            calc_ipsilateral_speed(right_deviation, right_found),
28        ]
29    )
30
31    obs, _, _, _, _ = nmf.step(control_signal)
32    obs_hist.append(obs)
33    deviations_hist.append([left_deviation, right_deviation])
34    control_signal_hist.append(control_signal)
100%|██████████| 70/70 [02:33<00:00,  2.19s/it]

To inspect the recorded video:

1nmf.save_video("./outputs/object_following.mp4")

We can use the save_video_with_vision_insets utility function to regenerate this video, but with insets at the bottom illustrating the visual experience of the fly:

1from flygym.mujoco.vision.visualize import save_video_with_vision_insets
2
3save_video_with_vision_insets(
4    nmf, "./outputs/object_following_with_retina_images.mp4", nmf.visual_inputs_hist
5)
WARNING:imageio_ffmpeg:IMAGEIO FFMPEG_WRITER WARNING: input image is not divisible by macro_block_size=16, resizing from (800, 763) to (800, 768) to ensure video compatibility with most codecs and players. To prevent resizing, make your input image divisible by the macro_block_size or set the macro_block_size to 1 (risking incompatibility).