Path integration
================
.. note::
**Author:** Sibo Wang-Chen
The code presented in this notebook has been simplified and
restructured for display in a notebook format. A more complete and better
structured implementation can be found in the
`examples folder of the FlyGym repository on
GitHub `__.
This tutorial is available in ``.ipynb`` format in the
`notebooks folder of the FlyGym repository `_.
**Summary:** In this tutorial, we will show how the position and heading
of the animal can be estimated by integrating mechanosensory signals, a
process known as path integration.
Introduction
------------
In the previous tutorials, we have demonstrated how brain-level
processes can drive the motor system via *descending control*.
*Ascending* motor signals are a complement to descending information as
they convey information, often mechanosensory in nature, back to the
brain. Ascending signals are predicted to inform brain-level
action selection, motor planning, and sensory contextualization (see
`Chen et al., 2023 `__). In
this tutorial and the next, we will demonstrate how incorporating
ascending motor feedback signals allows us to model behaviors that are
critical to the animal’s survival and functioning.
Animals, including flies, estimate their own orientation and distance
traveled (“odometry”) to perform path integration when navigating the
world. A superb example of path integration was demonstrated in the desert
ant *Cataglyphis fortis* (see `review by Wolf,
2011 `__). While the ant takes an
exploratory outbound path in search of a food source, it can return to
the nest in a straight line. Furthermore, if the experimenter moves the
ant to a different location upon finding the food, the ant still takes
the “correct” path back to the nest, but starting from the location
where it has been “air dropped” (as shown below). These results show
that the ant must be using idiothetic cues, rather than sensory input,
to navigate — similar to how sailors used to navigate featureless oceans
by “dead reckoning.”
.. figure:: https://github.com/NeLy-EPFL/_media/blob/main/flygym/path_integration/pathint_schematic.png?raw=true
:width: 300
The fly *Drosophila melanogaster* also performs path integration,
especially when navigating to find food sources (see `Kim & Dickinson,
2017 `__, `Behbahani et al.,
2021 `__). While the source
of the idiothetic cues are unknown, they may, in principle, be derived
using ascending proprioceptive and tactile signals from the legs and
motor system. As a demonstration, we will attempt to estimate the
changes in the fly’s orientation (shown below in green) and displacement
(shown below in purple) based on proprioceptive and tactile information.
By integrating these changes over time, we aim to reconstruct the path
of the fly in space (right).
.. figure:: https://github.com/NeLy-EPFL/_media/blob/main/flygym/path_integration/pathint_integration.png?raw=true
:width: 600
The algorithm
-------------
Having hand-waved at how we aim to accomplish path integration, we will
try to make our proposed algorithm more precise. We can partition our
task into two sub-problems:
1. How do we predict the changes in heading and forward displacement
from proprioception and tactile information?
2. With these estimated “deltas” in heading and forward displacement,
how do we integrate them to find the position?
The first question can be tackled in the following way: for each leg, we
accumulate stride lengths by computing the forward translation of the
leg tip relative to the thorax when the leg is pushing. Proprioception
gives us the positions of the leg tips; tactile sensing tells us whether
the leg was in contact with the ground, which we use as a proxy to
detect leg pushing. Together, proprioception and tactile sensing allow
us to calculate the stride lengths. Then, we compute the differences and
sums of the left and right total stride lengths for each pair of legs
over a small time window. These left-right differences and their sums
are then used to predict the change in heading and forward displacement
respectively. We will use a simple linear regression model for this
purpose.
However, we have glossed over one important detail: to predict the
change in heading and forward displacement, we need the *changes* in the
cumulated left-right differences and sums. But to calculate these
changes, we must first define a *time scale*. This underlies our
solution to the second part of the problem. We record the cumulated
left-right differences and sums at every point in time during the
walking simulation. Then, for every time :math:`t`, can compute the
changes in the left-right differences and sums from time :math:`t-\tau`
(:math:`\tau` being the time scale) to time :math:`t`. If we use these
“delta differences” and “delta sums” to predict “delta heading” and
“delta forward displacement,” we can get the changes in heading and
forward displacement in the same time scale. Therefore, we can simply
re-normalize the predicted values by the time scale, and integrate the
position in 2D. This process can be shown in the following schematic:
.. figure:: https://github.com/NeLy-EPFL/_media/blob/main/flygym/path_integration/pathint_prediction.png?raw=true
:width: 400
In the next sections, we will test this algorithm.
Collecting walking data
-----------------------
To train the models, we first need to collect data where the fly walks
in a trajectory similar to foraging desert ants. To this end, we will
construct a scenario in which the fly model performs random exploration
of a featureless environment. Here, the fly alternates between forward
walking and in-place turning. We will control turning in a Poisson
process with a rate :math:`\lambda_\text{turn}=2\text{ s}^{-1}`. This
turning rate is quite high compared to the range of typical fly
behavior. This is to deliberately make path integration more difficult.
When the fly executes a turn, we will apply a fixed asymmetrical
descending drive of
:math:`[{\rm DN}_\text{inner}, {\rm DN}_\text{outer}]` which has the
following values:
- :math:`[{\rm DN}_\text{inner}, {\rm DN}_\text{outer}] = [-0.2, 1.0]`
for the tripod and tetrapod gaits
- :math:`[{\rm DN}_\text{inner}, {\rm DN}_\text{outer}] = [0.4, 1.0]`
for the wave gait
These choices lead to qualitatively similar turning across gait types.
The direction of the turn is chosen at random. We will sample the
duration of the turn (and therefore the angle turned) from a normal
distribution :math:`\mathcal{N}(0.4\text{ s}, 0.1\text{ s})`. The fly
receives no visual information — akin to navigating in the dark. We will
use the `hybrid turning
controller `__,
but with the correction amounts set to 0 for simplicity.
.. code-block:: ipython3
import numpy as np
import pickle
from tqdm import trange
from pathlib import Path
from typing import Optional
from flygym import Fly, Camera
from flygym.preprogrammed import get_cpg_biases
from flygym.examples.path_integration import PathIntegrationArenaFlat
Let’s define the discrete walking states as an ``Enum`` class (see
`this tutorial `__ for more
information on Enum if you are not familiar with it, but this is not
required).
.. code-block:: ipython3
from enum import Enum
class WalkingState(Enum):
FORWARD = 0
TURN_LEFT = 1
TURN_RIGHT = 2
STOP = 3
Next, we will define a random exploration controller that controls the
switch between straight walking and turning in a Poisson process, as
discussed above. We will implement this controller as a class with a
``.step`` method, which returns the next state and the corresponding
descending drives. Note that this controller is detached from the
physics simulation — it only tells us which walking state the simulated
fly *should* be in in the next step.
In a Poisson process, the cumulative distribution function of the
exponential distribution is
.. math:: F(x) = 1 - e^{-\lambda x}
Therefore, the probability that the transition will happen within the
next :math:`\Delta t` seconds is
.. math:: P(T_{\rm turn} \leq {\rm d} t) = 1 - e^{-\lambda \Delta t}
where :math:`\Delta t` is the simulation time step and
:math:`T_{\rm turn}` is the time until the next transition to turning.
As a result, we will change the state to turning if and only if a scalar
uniformly randomly sampled from 0 to 1 is greater than
:math:`e^{-\lambda \Delta t}`.
.. code-block:: ipython3
from typing import Union
class RandomExplorationController:
"""This controller drives a random exploration: the fly transitions
between forward walking and turning in a Poisson process. When the fly
turns, the turn direction is chosen randomly and the turn duration is
drawn from a normal distribution.
"""
def __init__(
self,
dt: float,
forward_dn_drive: tuple[float, float] = (1.0, 1.0),
left_turn_dn_drive: tuple[float, float] = (-0.4, 1.2),
right_turn_dn_drive: tuple[float, float] = (1.2, -0.4),
turn_duration_mean: float = 0.4,
turn_duration_std: float = 0.1,
lambda_turn: float = 1.0,
seed: int = 0,
init_time: float = 0.1,
) -> None:
"""
Parameters
----------
dt : float
Time step of the simulation.
forward_dn_drive : tuple[float, float], optional
DN drives for forward walking, by default (1.0, 1.0).
left_turn_dn_drive : tuple[float, float], optional
DN drives for turning left, by default (-0.4, 1.2).
right_turn_dn_drive : tuple[float, float], optional
DN drives for turning right, by default (1.2, -0.4).
turn_duration_mean : float, optional
Mean of the turn duration distribution in seconds, by default
0.4.
turn_duration_std : float, optional
Standard deviation of the turn duration distribution in
seconds, by default 0.1.
lambda_turn : float, optional
Rate of the Poisson process for turning, by default 1.0.
seed : int, optional
Random seed, by default 0.
init_time : float, optional
Initial time, in seconds, during which the fly walks straight,
by default 0.1.
"""
self.random_state = np.random.RandomState(seed)
self.dt = dt
self.init_time = init_time
self.curr_time = 0.0
self.curr_state: WalkingState = WalkingState.FORWARD
self._curr_turn_duration: Union[None, float] = None
# DN drives
self.dn_drives = {
WalkingState.FORWARD: np.array(forward_dn_drive),
WalkingState.TURN_LEFT: np.array(left_turn_dn_drive),
WalkingState.TURN_RIGHT: np.array(right_turn_dn_drive),
}
# Turning related parameters
self.turn_duration_mean = turn_duration_mean
self.turn_duration_std = turn_duration_std
self.lambda_turn = lambda_turn
def step(self):
"""
Update the fly's walking state.
Returns
-------
WalkingState
The next state of the fly.
tuple[float, float]
The next DN drives.
"""
# Upon spawning, just walk straight for a bit (init_time) for things to settle
if self.curr_time < self.init_time:
self.curr_time += self.dt
return WalkingState.FORWARD, self.dn_drives[WalkingState.FORWARD]
# Forward -> turn transition
if self.curr_state == WalkingState.FORWARD:
# exponential function defining how likely it is that transition will NOT
# happen in the next time step
p_nochange = np.exp(-self.lambda_turn * self.dt)
if self.random_state.rand() > p_nochange:
# decide turn duration and direction
self._curr_turn_duration = self.random_state.normal(
self.turn_duration_mean, self.turn_duration_std
)
self.curr_state = self.random_state.choice(
[WalkingState.TURN_LEFT, WalkingState.TURN_RIGHT]
)
self.curr_state_start_time = self.curr_time
# Turn -> forward transition
if self.curr_state in (WalkingState.TURN_LEFT, WalkingState.TURN_RIGHT):
if self.curr_time - self.curr_state_start_time > self._curr_turn_duration:
self.curr_state = WalkingState.FORWARD
self.curr_state_start_time = self.curr_time
self.curr_time += self.dt
return self.curr_state, self.dn_drives[self.curr_state]
As discussed, we will use the hybrid turning controller that we have
implemented previously. However, still missing from the
``HybridTurningController`` class is the ability to find the coordinates
of the leg tips (or any point at all) in the reference frame of the fly.
We will now extend ``HybridTurningController`` to a new
``PathIntegrationController`` class that has a
``.absolute_to_relative_pos`` method that does exactly this. We will add
a ``"stride_diff_unmasked"`` key to the observation that records how
much the leg tips have shifted from the previous simulation time step.
More precisely, for each leg,
.. math::
\text{stride_diff_unmasked}[i] =
\text{rel_leg_tip_pos}[i] - \text{rel_leg_tip_pos}[i - 1]
where :math:`\text{rel_leg_tip_pos}[i]` is the position of the tip of
this leg at the :math:`i`-th step.
.. code-block:: ipython3
from flygym.examples.locomotion import HybridTurningController
class PathIntegrationController(HybridTurningController):
"""
A wrapper of ``HybridTurningController`` that records variables that
are used to perform path integration.
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._last_end_effector_pos: Union[None, np.ndarray] = None
self.total_stride_lengths_hist = []
self.heading_estimate_hist = []
self.pos_estimate_hist = []
def step(self, action):
"""
Same as ``HybridTurningController.step``, but also records the
stride for each leg (i.e., how much the leg tip has moved in the
fly's egocentric frame since the last step) in the observation
space under the key "stride_diff_unmasked". Note that this
calculation does not take into account whether the "stride" is
actually made during a power stroke (i.e., stance phase); it only
reports the change in end effector positions in an "unmasked"
manner. The user should post-process it using the contact mask as a
part of the model.
"""
obs, reward, terminated, truncated, info = super().step(action)
# Calculate stride since last step for each leg
ee_pos_rel = self.absolute_to_relative_pos(
obs["end_effectors"][:, :2], obs["fly"][0, :2], obs["fly_orientation"][:2]
)
if self._last_end_effector_pos is None:
ee_diff = np.zeros_like(ee_pos_rel)
else:
ee_diff = ee_pos_rel - self._last_end_effector_pos
self._last_end_effector_pos = ee_pos_rel
obs["stride_diff_unmasked"] = ee_diff
return obs, reward, terminated, truncated, info
@staticmethod
def absolute_to_relative_pos(
pos: np.ndarray, base_pos: np.ndarray, heading: np.ndarray
) -> np.ndarray:
rel_pos = pos - base_pos
heading = heading / np.linalg.norm(heading)
angle = np.arctan2(heading[1], heading[0])
rot_matrix = np.array(
[[np.cos(-angle), -np.sin(-angle)], [np.sin(-angle), np.cos(-angle)]]
)
pos_rotated = np.dot(rel_pos, rot_matrix.T)
return pos_rotated
Now, we are ready to write a ``run_simulation`` function that interfaces
the state switching controller with the physics-embedded NeuroMechFly
simulation:
.. code-block:: ipython3
def run_simulation(
seed: int = 0,
running_time: float = 20.0,
gait: str = "tripod",
output_dir: Optional[Path] = None,
):
contact_sensor_placements = [
f"{leg}{segment}"
for leg in ["LF", "LM", "LH", "RF", "RM", "RH"]
for segment in ["Tibia", "Tarsus1", "Tarsus2", "Tarsus3", "Tarsus4", "Tarsus5"]
]
fly = Fly(
enable_adhesion=True,
draw_adhesion=True,
contact_sensor_placements=contact_sensor_placements,
spawn_pos=(0, 0, 0.25),
)
arena = PathIntegrationArenaFlat()
cam = Camera(fly=fly, camera_id="birdeye_cam", play_speed=0.5, timestamp_text=True)
sim = PathIntegrationController(
phase_biases=get_cpg_biases(gait),
fly=fly,
arena=arena,
cameras=[cam],
timestep=1e-4,
correction_rates={"retraction": (0, 0), "stumbling": (0, 0)},
)
random_exploration_controller = RandomExplorationController(
dt=sim.timestep,
lambda_turn=2,
seed=seed,
forward_dn_drive=(1.0, 1.0),
left_turn_dn_drive=(0.2, 1.0) if gait == "wave" else (-0.2, 1.0),
right_turn_dn_drive=(1.0, 0.2) if gait == "wave" else (1.0, -0.2),
)
obs, info = sim.reset(0)
obs_hist, info_hist, action_hist = [], [], []
_real_heading_buffer = []
for i in trange(int(running_time / sim.timestep)):
walking_state, dn_drive = random_exploration_controller.step()
action_hist.append(dn_drive)
obs, reward, terminated, truncated, info = sim.step(dn_drive)
# Get real heading
orientation_x, orientation_y = obs["fly_orientation"][:2]
real_heading = np.arctan2(orientation_y, orientation_x)
_real_heading_buffer.append(real_heading)
obs_hist.append(obs)
info_hist.append(info)
# Save data if output_dir is provided
if output_dir is not None:
output_dir.mkdir(parents=True, exist_ok=True)
with open(output_dir / "sim_data.pkl", "wb") as f:
data = {
"obs_hist": obs_hist,
"info_hist": info_hist,
"action_hist": action_hist,
}
pickle.dump(data, f)
Let’s run a 1-second simulation and plot the fly’s trajectory:
.. code-block:: ipython3
from pathlib import Path
output_dir = Path("outputs/path_integration/")
output_dir.mkdir(parents=True, exist_ok=True)
run_simulation(
seed=0, running_time=1.0, gait="tripod", output_dir=output_dir
)
.. parsed-literal::
100%|██████████| 10000/10000 [00:36<00:00, 270.96it/s]
.. code-block:: ipython3
import matplotlib.pyplot as plt
with open(output_dir / "sim_data.pkl", "rb") as f:
sim_data = pickle.load(f)
trajectory = np.array([obs["fly"][0, :2] for obs in sim_data["obs_hist"]])
fig, ax = plt.subplots(figsize=(5, 4), tight_layout=True)
ax.plot(trajectory[:, 0], trajectory[:, 1], label="Trajectory")
ax.plot([0], [0], "ko", label="Origin")
ax.legend()
ax.set_aspect("equal")
ax.set_xlabel("x position (mm)")
ax.set_ylabel("y position (mm)")
fig.savefig(output_dir / "trajectory_sample_1s.png")
.. figure:: https://github.com/NeLy-EPFL/_media/blob/main/flygym/path_integration/trajectory_sample_1s.png?raw=true
We can also plot the recorded shifts in leg tip positions relative to
the fly’s thorax:
.. code-block:: ipython3
stride_diff_unmasked = np.array(
[x["stride_diff_unmasked"] for x in sim_data["obs_hist"]]
)
t_grid = np.arange(stride_diff_unmasked.shape[0]) * 1e-4
fig, axs = plt.subplots(3, 1, figsize=(5, 5), tight_layout=True, sharex=True)
for i, leg_pair in enumerate(["Front", "Middle", "Hind"]):
ax = axs[i]
ax.axhline(0, color="k", linestyle="-", lw=1)
left_ts = stride_diff_unmasked[:, i, :]
right_ts = stride_diff_unmasked[:, i + 3, :]
ax.plot(t_grid, left_ts[:, 0], lw=1, label="Left")
ax.plot(t_grid, right_ts[:, 0], lw=1, label="Right")
ax.set_ylim(-0.02, 0.02)
if i == 0:
ax.legend(ncols=2, loc="lower right")
if i == 2:
ax.set_xlabel("Time (s)")
ax.set_ylabel("x offset (mm)")
ax.set_title(f"{leg_pair} legs")
fig.savefig(output_dir / "ee_shift_1s.png")
.. figure:: https://github.com/NeLy-EPFL/_media/blob/main/flygym/path_integration/ee_shift_1s.png?raw=true
This plot shows the time series of the change in the x position (along
the anterior-posterior axis) of the leg tips from the previous time
step. Note that the values can be both positive and negative. This is
because we are simply reporting the shift in the claw positions without
taking into account whether the legs are in stance or swing yet.
In the NeuroMechFly v2 paper, we ran 15 trials with different random
seeds for each of the three gaits: tripod gait, tetrapod gait, and wave
gait. Each trial was 20 seconds long. In this tutorial, we will use only
5 trials for the tripod gait. We have uploaded the simulation data of
all trials to a SFTP server. Instead of running these simulations
yourself (which would take roughly 20 minutes on a machine with 5+
cores), you can simply download the data by running the following code
block:
.. code-block:: ipython3
# TODO. We are working with our IT team to set up a gateway to share these data publicly
# in a secure manner. We aim to update this by the end of June. Please reach out to us
# by email in the meantime.
.. code-block:: ipython3
exploration_data_dir = (
Path.home() / "Data/flygym_demo_data/path_integration/random_exploration/"
)
if not exploration_data_dir.is_dir():
raise FileNotFoundError(
"Pregenerated simulation data not found. Please download it from TODO."
)
else:
print(f"[OK] Pregenerated simulation data found. Ready to proceed.")
.. parsed-literal::
[OK] Pregenerated simulation data found. Ready to proceed.
Extracting input and target variables from simulation data
----------------------------------------------------------
Let’s start by loading basic information — time series of end effector
positions, ground contact forces, descending drives, fly orientation,
and fly position — from the simulation data files.
.. code-block:: ipython3
import gc
def load_trial_data(trial_dir: Path) -> dict[str, np.ndarray]:
"""Load simulation data from trial.
The difference between ``load_trial_data`` and ``extract_variables`` is
that the former loads the raw data from the trial (i.e., physics
simulation). The latter extracts variables from these raw data subject
to additional parameters such as time scale. For each trial, we only
call ``load_trial_data`` once, but we may call ``extract_variables``
multiple times with different parameters.
Parameters
----------
trial_dir : Path
Path to the directory containing the trial data saved in
``exploration.run_simulation``.
Returns
-------
dict[str, np.ndarray]
Dictionary containing the following keys, each mapping to a time
series saved as a numpy array:
* "end_effector_pos_diff": End effector positions.
* "contact_force": Contact forces.
* "dn_drive": DN drives.
* "fly_orientation_xy": Fly orientation in the form of a unit vector
on the xy plane.
* "fly_orientation_angle": Fly orientation in the form of an angle
in radians.
* "fly_pos": Fly position.
"""
with open(trial_dir / "sim_data.pkl", "rb") as f:
sim_data = pickle.load(f)
obs_hist = sim_data["obs_hist"]
# End effector positions
end_effector_pos_diff = np.array(
[obs["stride_diff_unmasked"] for obs in obs_hist], dtype=np.float32
)
# Contact forces
contact_force_ts = np.array(
[obs["contact_forces"] for obs in obs_hist], dtype=np.float32
)
contact_force_ts = np.linalg.norm(contact_force_ts, axis=2) # calc force magnitude
contact_force_ts = contact_force_ts.reshape(-1, 6, 6).sum(axis=2) # total per leg
# Fly position
fly_pos_ts = np.array([obs["fly"][0, :2] for obs in obs_hist], dtype=np.float32)
# Heading
fly_orientation_xy = np.array(
[obs["fly_orientation"][:2] for obs in obs_hist], dtype=np.float32
)
fly_orientation_angle = np.arctan2(
fly_orientation_xy[:, 1], fly_orientation_xy[:, 0]
)
# Clear RAM right away manually to avoid memory fragmentation
del sim_data
gc.collect()
return {
"end_effector_pos_diff": end_effector_pos_diff.astype(np.float32),
"contact_force": contact_force_ts.astype(np.float32),
"fly_orientation_xy": fly_orientation_xy.astype(np.float32),
"fly_orientation_angle": fly_orientation_angle.astype(np.float32),
"fly_pos": fly_pos_ts.astype(np.float32),
}
.. code-block:: ipython3
trial_data = []
num_trials = 5
for seed in range(num_trials):
print(f"Loading trial {seed + 1} of {num_trials}...")
trial_dir = exploration_data_dir / f"seed={seed}_gait=tripod"
data = load_trial_data(trial_dir)
trial_data.append(data)
.. parsed-literal::
Loading trial 1 of 5...
Loading trial 2 of 5...
Loading trial 3 of 5...
Loading trial 4 of 5...
Loading trial 5 of 5...
.. code-block:: ipython3
trial_data[0].keys()
.. parsed-literal::
dict_keys(['end_effector_pos_diff', 'contact_force', 'fly_orientation_xy', 'fly_orientation_angle', 'fly_pos'])
We will perform some sanity tests on the data. As before, we can
visualize the per-step change in end effector (leg tip) positions over 1
second of simulation, but this time in 2D:
.. code-block:: ipython3
data = trial_data[0]
fig, axs = plt.subplots(1, 3, figsize=(9, 3), tight_layout=True)
for i, leg_pair in enumerate(["Front", "Middle", "Hind"]):
ax = axs[i]
ax.axvline(0, color="k", linestyle="--", lw=1)
ax.axhline(0, color="k", linestyle="--", lw=1)
ax.plot(
data["end_effector_pos_diff"][10000:20000, i, 0],
data["end_effector_pos_diff"][10000:20000, i, 1],
lw=1,
label="Left",
)
ax.plot(
data["end_effector_pos_diff"][10000:20000, i + 3, 0],
data["end_effector_pos_diff"][10000:20000, i + 3, 1],
lw=1,
label="Right"
)
ax.set_aspect("equal")
ax.set_xlim(-0.02, 0.02)
ax.set_ylim(-0.02, 0.02)
ax.set_title(f"{leg_pair} leg tips")
if i == 0:
ax.set_xlabel("x offset (mm)")
ax.set_ylabel("y offset (mm)")
ax.legend(ncols=2, loc="lower center")
fig.savefig(output_dir / "ee_shift_2d.png")
.. figure:: https://github.com/NeLy-EPFL/_media/blob/main/flygym/path_integration/ee_shift_2d.png?raw=true
.. code-block:: ipython3
t_grid = np.arange(data["contact_force"].shape[0]) * 1e-4
fig, axs = plt.subplots(3, 1, figsize=(9, 6), tight_layout=True, sharex=True)
for i, leg_pair in enumerate(["Front", "Middle", "Hind"]):
ax = axs[i]
ax.plot(t_grid, data["contact_force"][:, i], lw=1, label="Left")
ax.plot(t_grid, data["contact_force"][:, i + 3], lw=1, label="Right")
ax.set_xlim(2.5, 3)
ax.set_ylim(0, 30)
ax.set_title(f"{leg_pair} leg contact force")
ax.set_ylabel("Force (mN)")
if i == 2:
ax.set_xlabel("Time (s)")
if i == 0:
ax.legend(ncols=2, loc="upper right")
fig.savefig(output_dir / "ee_contact_force.png")
.. figure:: https://github.com/NeLy-EPFL/_media/blob/main/flygym/path_integration/ee_contact_force.png?raw=true
From the contact force time series, we can observe that:
1. There are roughly 6 groups of non-zero blocks per time series. These
are the 6 stance phases per line over the period of 0.5 seconds (the
CPG frequency is 12 Hz).
2. The two sides are not necessarily symmetrical. This is because the
fly might turn during walking.
3. The hind leg has a lower signal-to-noise ratio than the front and
middle legs.
Next, we will inspect the fly’s orientation and position:
.. code-block:: ipython3
fig, axs = plt.subplots(1, 2, figsize=(8, 3), tight_layout=True)
unwrapped_heading = np.unwrap(data["fly_orientation_angle"])
axs[0].plot(t_grid, np.rad2deg(unwrapped_heading), lw=1, color="k")
axs[0].set_xlabel("Time (s)")
axs[0].set_ylabel(r"Heading ($^\circ$)")
axs[0].set_title("Fly heading")
axs[1].plot(data["fly_pos"][:, 0], data["fly_pos"][:, 1], lw=1, color="k")
axs[1].plot([0], [0], "ko", label="Origin")
axs[1].set_aspect("equal")
axs[1].set_xlabel("x position (mm)")
axs[1].set_ylabel("y position (mm)")
axs[1].legend(loc="lower right")
axs[1].set_title("Fly trajectory")
fig.savefig(output_dir / "heading_and_trajectory.png")
.. figure:: https://github.com/NeLy-EPFL/_media/blob/main/flygym/path_integration/heading_and_trajectory.png?raw=true
Recall the algorithm that we have proposed. To train the models, we need
to collect the following *input* variables to the model:
- Difference in the left-right *sum* of cumulated stride lengths,
``stride_total_diff_lrsum``
- Difference in the left-right *difference* of cumulated stride
lengths, ``stride_total_diff_lrdiff``
… and the following *target* variables (i.e., what the models are
supposed to predict):
- Difference in the fly’s heading, ``heading_diff``
- Difference in the fly’s total forward displacement,
``forward_disp_total_diff``
There are two things to note here:
1. We have not implemented the calculation of stride lengths yet;
``stride_diff_unmasked`` is only the shift of the leg tip position
from one time step to the next.
2. As discussed in the Algorithm section, the differences above are
calculated over a predefined time scale :math:`\tau`.
To calculate the cumulated stride lengths given
``stride_diff_unmasked``, we need to mask it with a boolean time series
indicating whether the leg is “pushing” (as opposed to swinging) before
taking the cumulative sum. More precisely,
.. math::
\begin{gather*}
\text{stride_total}[0] = 0 \\
\text{stride_total}[i] = \text{stride_total}[i - 1] +
\big( \text{mask}[i] \cdot \text{ stride_diff_unmasked}[i] \big)
\quad \text{for } i > 0
\end{gather*}
where :math:`\text{mask}[i]` is a boolean indicating whether the leg is
in the power stroke (push). In our example, we will use the ground
contact force to determine if the leg is in contact with the floor. If
it is, then the leg is executing a power stroke. We will use a threshold
of 0.5 mN, 1 mN, and 3mN for the front, middle, and hind legs
respectively.
Once we have the cumulative stride lengths for each leg, we can
calculate how it changes over the predefined time scale :math:`\tau`:
.. math::
\text{stride_total_diff}[i] =
\text{stride_total}[i] - \text{stride_total}[i - \text{window_len}]
where :math:`\text{window_len} = \tau / \Delta t` is the number of
simulation steps over the time scale :math:`\tau`.
With this, we can finally calculate the changes in the left-right sum
and left-right difference of cumulative stride lengths for each leg pair
over time:
.. math::
\begin{align*}
\text{stride_total_diff_lrsum}[i] &=
\text{stride_total_diff}_\text{left}[i] +
\text{stride_total_diff}_\text{right}[i] \\
\text{stride_total_diff_lrdiff}[i] &=
\text{stride_total_diff}_\text{left}[i] -
\text{stride_total_diff}_\text{right}[i] \\
\end{align*}
Having extracted the *input* variables, we will next extract the target
*output* variables: the changes in the fly’s heading and forward
displacement over the same time scale.
Calculating the change in the fly’s heading is straightforward: for each
simulation step :math:`i`,
.. math::
\text{heading_diff}[i] = \text{heading}[i] - \text{heading}[i - \text{window_len}]
\quad \text{wrapped to $[-\pi, \pi)$}
where :math:`\text{heading}` is the heading angle.
To calculate the change in the fly’s forward displacement, we first have to
accumulate the forward displacement from one step to the next over the
whole simulation. We will call this variable
:math:`\text{forward_disp}`. This sounds simply like the total travel
distance, but the critical difference is that at the scale of single
simulation steps, we discard lateral movements. Then, similar to the
change in heading, we can simply calculate the cumulative forward
displacement over the time period of :math:`\tau`:
.. math::
\begin{gather*}
\text{forward_disp}[0] = 0, \\
\text{forward_disp}[i] =
\text{forward_disp}[i - 1] + \text{d_forward_disp}[i]
\quad\text{for } i > 0
\end{gather*}
where
.. math::
\text{d_forward_disp}[i] = (\overrightarrow{\text{position}}[i] -
\overrightarrow{\text{position}}[i - 1]) \cdot
\begin{bmatrix}
\cos(\text{heading}[i])\\
\sin(\text{heading}[i])
\end{bmatrix}
where :math:`\overrightarrow{\text{position}}[i]` is the fly’s vector
position (x-y) at simulation step :math:`i`.
With this, the change in total forward displacement is:
.. math::
\text{forward_disp_diff}[i] =
\text{forward_disp}[i] -
\text{forward_disp}[i - \text{window_len}]
Let’s implement a function that extracts these variables:
.. code-block:: ipython3
def extract_variables(
trial_data: dict[str, np.ndarray],
time_scale: float,
contact_force_thr: tuple[float, float, float],
dt: float = 1e-4,
) -> dict[str, np.ndarray]:
"""
Extract variables used for path integration from trial data.
The difference between ``load_trial_data`` and ``extract_variables`` is
that the former loads the raw data from the trial (i.e., physics
simulation). The latter extracts variables from these raw data subject
to additional parameters such as time scale. For each trial, we only
call ``load_trial_data`` once, but we may call ``extract_variables``
multiple times with different parameters.
Parameters
----------
trial_data : dict[str, np.ndarray]
Dictionary containing trial data.
time_scale : float
Time scale for path integration.
contact_force_thr : tuple[float, float, float]
Thresholds for contact forces. These are used to determine whether
a leg is in contact with the ground.
dt : float, optional
Time step of the physics simulation in the trial, by default 1e-4.
"""
window_len = int(time_scale / dt)
# contact force thresholds: (3,) -> (6,), for both sides
contact_force_thr = np.array([*contact_force_thr, *contact_force_thr])
# Mechanosensory signal ==========
# Calculate total stride (Σstride) for each side
stride_left = trial_data["end_effector_pos_diff"][:, :3, 0] # (L, 3)
stride_right = trial_data["end_effector_pos_diff"][:, 3:, 0] # (L, 3)
contact_mask = trial_data["contact_force"] > contact_force_thr[None, :] # (L, 6)
stride_left = (stride_left * contact_mask[:, :3])
stride_right = (stride_right * contact_mask[:, 3:])
stride_total_left = np.cumsum(stride_left, axis=0)
stride_total_right = np.cumsum(stride_right, axis=0)
# Calculate difference in Σstride over proprioceptive time window (ΔΣstride)
stride_total_diff_left = (
stride_total_left[window_len:] - stride_total_left[:-window_len]
)
stride_total_diff_right = (
stride_total_right[window_len:] - stride_total_right[:-window_len]
)
# Calculate sum and difference in ΔΣstride over two sides
stride_total_diff_lrsum = stride_total_diff_left + stride_total_diff_right
stride_total_diff_lrdiff = stride_total_diff_left - stride_total_diff_right
# Change in locomotion state (heading & displacement) ==========
# Calculate change in fly orientation over proprioceptive time window (Δheading)
fly_orientation_xy = trial_data["fly_orientation_xy"]
fly_orientation_angle = trial_data["fly_orientation_angle"]
heading_diff = (
fly_orientation_angle[window_len:] - fly_orientation_angle[:-window_len]
)
heading_diff = (heading_diff + np.pi) % (2 * np.pi) - np.pi # wrap to [-π, π]
# Same for displacement projected in the direction of fly's heading
# Use projection formula: proj_v(u) = (u · v) / (v · v) * v where v is the fly's
# heading vector and u is the change in position
fly_disp_xy = np.diff(trial_data["fly_pos"], axis=0, prepend=0)
fly_orientation_xy_norm = np.linalg.norm(fly_orientation_xy, axis=1)
fly_orientation_xy_unit = fly_orientation_xy / fly_orientation_xy_norm[:, None]
udotv = np.sum(fly_disp_xy * fly_orientation_xy_unit, axis=1)
vdotv = np.sum(fly_orientation_xy_unit * fly_orientation_xy_unit, axis=1)
forward_disp_mag = udotv / vdotv
forward_disp_total = np.cumsum(forward_disp_mag)
forward_disp_total_diff = (
forward_disp_total[window_len:] - forward_disp_total[:-window_len]
)
return {
"stride_total_diff_lrsum": stride_total_diff_lrsum.astype(np.float32),
"stride_total_diff_lrdiff": stride_total_diff_lrdiff.astype(np.float32),
"heading_diff": heading_diff.astype(np.float32),
"forward_disp_total_diff": forward_disp_total_diff.astype(np.float32),
}
Let’s use this function to extract the input and target variables at a
time scale of 0.32 s using a contact force threshold of 0.5 mN, 1 mN,
and 3 mN for the front, middle, and hind legs respectively:
.. code-block:: ipython3
time_scale = 0.32
contact_force_thr = (0.5, 1, 3)
extracted_variables = [
extract_variables(data, time_scale, contact_force_thr) for data in trial_data
]
We are trying to predict the change in forward displacement from the
changes in left-right sums, and the change in heading from the
left-right differences. Let’s plot these variable in one trial to decide
if these are qualitatively good predictors:
.. code-block:: ipython3
ext_vars = extracted_variables[0]
t_grid_trim = t_grid[-ext_vars["stride_total_diff_lrsum"].shape[0] :]
fig, axs = plt.subplots(2, 1, figsize=(9, 6), tight_layout=True, sharex=True)
axs[0].axhline(0, color="k", linestyle="--", lw=1)
for i, leg in enumerate(["Front", "Middle", "Hind"]):
axs[0].plot(
t_grid_trim,
-ext_vars["stride_total_diff_lrsum"][:, i],
lw=1,
label=f"ΔL-R sum ({leg.lower()})",
)
axs[0].plot(
t_grid_trim,
ext_vars["forward_disp_total_diff"],
lw=2,
color="k",
label="Δforward displacement",
)
axs[0].legend(loc="upper center", ncol=4)
axs[0].set_ylabel("Length (mm)")
axs[0].set_ylim(-2, 10)
axs[0].set_title("Δforward displacement predictors and target")
axs[1].axhline(0, color="k", linestyle="--", lw=1)
for i, leg in enumerate(["Front", "Middle", "Hind"]):
axs[1].plot(
t_grid_trim,
-ext_vars["stride_total_diff_lrdiff"][:, i],
lw=1,
label=f"ΔL-R difference ({leg.lower()})",
)
axs[1].plot(
t_grid_trim,
-ext_vars["heading_diff"],
lw=2,
color="k",
label="Δheading",
)
axs[1].legend(loc="upper center", ncol=4)
axs[1].set_ylabel("Length (mm)")
axs[1].set_xlabel("Time (s)")
axs[1].set_ylim(-6, 6)
axs[1].set_title("Δheading predictors and target")
fig.savefig(output_dir / "pathint_predictors_and_target.png")
.. figure:: https://github.com/NeLy-EPFL/_media/blob/main/flygym/path_integration/pathint_predictors_and_target.png?raw=true
We observe that the inputs (blue, orange, and green lines) indeed seem
to be good predictors of the target (black lines). Next, we will train
the prediction models based on our proposed algorithm.
Training models to predict changes in locomotor state
-----------------------------------------------------
Once the input and target variables have been extracted, training the
models themselves is relatively easy. As discussed, we will train two
linear models to predict the changes in forward displacement and heading
using changes in the left-right sums and differences in cumulative
stride lengths:
.. math::
\begin{align*}
\text{heading_diff_pred}[i] &=
\sum_{pos\in\{\text{front}, \text{middle}, \text{hind}\}}
\big(
k_{pos}^{({\rm h})} \cdot \text{stride_total_diff_lrsum}_{pos}[i]
\big) + b^{({\rm h})} \\
\text{forward_disp_diff_pred}[i] &=
\sum_{pos\in\{\text{front}, \text{middle}, \text{hind}\}}
\big(
k_{pos}^{({\rm fd})} \cdot \text{stride_total_diff_lrdiff}_{pos}[i]
\big) + b^{({\rm fd})} \\
\end{align*}
where :math:`\text{heading_diff_pred}` and
:math:`\text{forward_disp_diff_pred}` are the model’s predictions of
:math:`\text{heading_diff}` and :math:`\text{forward_disp_diff}`;
:math:`k_{pos}^{({\rm h})}`, :math:`b^{({\rm h})}`,
:math:`k_{pos}^{({\rm fd})}`, and :math:`b^{({\rm fd})}` are the
parameters to be fitted. While we are using all three pairs of legs in
this example, a different set of legs can be used instead.
Recall that we have 5 trials per gait type. We will concatenate the
first 4 trials to form the training set, and then use the last trial for
testing.
.. code-block:: ipython3
stride_total_diff_lrsum_train = np.concatenate(
[ext_vars["stride_total_diff_lrsum"] for ext_vars in extracted_variables[:4]]
)
stride_total_diff_lrdiff_train = np.concatenate(
[ext_vars["stride_total_diff_lrdiff"] for ext_vars in extracted_variables[:4]]
)
heading_diff_train = np.concatenate(
[ext_vars["heading_diff"] for ext_vars in extracted_variables[:4]]
)
forward_disp_total_diff_train = np.concatenate(
[ext_vars["forward_disp_total_diff"] for ext_vars in extracted_variables[:4]]
)
stride_total_diff_lrsum_test = extracted_variables[4]["stride_total_diff_lrsum"]
stride_total_diff_lrdiff_test = extracted_variables[4]["stride_total_diff_lrdiff"]
heading_diff_test = extracted_variables[4]["heading_diff"]
forward_disp_total_diff_test = extracted_variables[4]["forward_disp_total_diff"]
Then, we will train the linear models using the ``LinearRegression``
class from scikit-learn.
.. code-block:: ipython3
from sklearn.linear_model import LinearRegression
def fit_1d_linear_model(x: np.ndarray, y: np.ndarray) -> tuple[LinearRegression, float]:
model = LinearRegression()
model.fit(x, y)
r2 = model.score(x, y)
return model, r2
heading_model, heading_model_r2 = fit_1d_linear_model(
stride_total_diff_lrdiff_train, heading_diff_train
)
fwd_disp_model, fwd_disp_model_r2 = fit_1d_linear_model(
stride_total_diff_lrsum_train, forward_disp_total_diff_train
)
print("Δheading model:")
print(f" coefficients (front, middle, hind legs): {heading_model.coef_}")
print(f" intercept: {heading_model.intercept_}")
print(f" r2 score (training set): {heading_model_r2}")
print("Δforward displacement model:")
print(f" coefficients (front, middle, hind legs): {fwd_disp_model.coef_}")
print(f" intercept: {fwd_disp_model.intercept_}")
print(f" r2 score (training set): {fwd_disp_model_r2}")
.. parsed-literal::
Δheading model:
coefficients (front, middle, hind legs): [0.24994847 0.18084855 0.02075428]
intercept: -0.006393308751285076
r2 score (training set): 0.961056459682645
Δforward displacement model:
coefficients (front, middle, hind legs): [-0.5035825 -0.3369246 0.00302732]
intercept: 0.49183177947998047
r2 score (training set): 0.9717244581507696
Integrating changes in locomotor state to estimate position
-----------------------------------------------------------
Now that we have built models that can estimate the changes in heading
and forward displacement, we will integrate these changes to estimate
the fly’s location in space. To do this, we essentially reverse the
process of extracting the change signals: whereas previously we have
taken the per-step changes in cumulative stride lengths as an estimation
of instantaneous changes, we will now sum these changes as an
approximation of continuous integration.
More formally, from the model-predicted change in heading,
:math:`\text{heading_diff_pred}`, the estimated heading can be given
by
.. math::
\text{heading_pred}[i] =
\sum_{i'=0}^i \frac{\text{heading_diff_pred}[i']}{\text{window_len}}
where, once again, :math:`\text{window_len} = \tau / \Delta t` is the
number of simulation steps over the time scale :math:`\tau`.
To obtain the estimated position vector,
:math:`\overrightarrow{\text{position_pred}}`, we have to take into
account the fact that the change in *forward* displacement must be
integrated in the direction of the fly’s instantaneous heading:
.. math::
\overrightarrow{\text{position_pred}}[i] =
\sum_{i'=0}^i \frac{\text{fwd_disp_diff_pred}[i']}{\text{window_len}}
\begin{bmatrix}
\cos(\text{heading_pred}[i'])\\
\sin(\text{heading_pred}[i'])
\end{bmatrix}
We will now implement this integration logic:
.. code-block:: ipython3
from typing import Callable
def path_integrate(
trial_data: dict[str, np.ndarray],
heading_model: Callable,
displacement_model: Callable,
time_scale: float,
contact_force_thr: tuple[float, float, float],
dt: float = 1e-4,
):
"""
Perform path integration on trial data.
Parameters
----------
trial_data : dict[str, np.ndarray]
Dictionary containing trial data.
heading_model : Callable
Model for predicting change in heading.
displacement_model : Callable
Model for predicting change in displacement.
time_scale : float
Time scale for path integration.
contact_force_thr : tuple[float, float, float]
Thresholds for contact forces. These are used to determine whether
a leg is in contact with the ground.
dt : float
Time step of the physics simulation in the trial.
Returns
-------
dict[str, np.ndarray]
Dictionary containing the following keys:
* "heading_pred": Predicted heading.
* "heading_actual": Actual heading.
* "pos_pred": Predicted position.
* "pos_actual": Actual position.
* "heading_diff_pred": Predicted change in heading.
* "heading_diff_actual": Actual change in heading.
* "displacement_diff_pred": Predicted change in displacement.
* "displacement_diff_actual": Actual change in displacement.
"""
window_len = int(time_scale / dt)
variables = extract_variables(
trial_data, time_scale=time_scale, contact_force_thr=contact_force_thr, dt=dt
)
# Integrate heading
heading_diff_pred = heading_model(variables["stride_total_diff_lrdiff"])
heading_pred = np.cumsum(heading_diff_pred / window_len)
# Path int. not performed when not enough data is available. Start from the real
# heading at the moment when path int. actually starts.
hx_start, hy_start = trial_data["fly_orientation_xy"][window_len, :]
real_heading_start = np.arctan2(hy_start, hx_start)
heading_pred += real_heading_start
# Integrate displacement
displacement_diff_pred = displacement_model(variables["stride_total_diff_lrsum"])
displacement_diff_x_pred = np.cos(heading_pred) * displacement_diff_pred
displacement_diff_y_pred = np.sin(heading_pred) * displacement_diff_pred
pos_x_pred = np.cumsum(displacement_diff_x_pred / window_len)
pos_y_pred = np.cumsum(displacement_diff_y_pred / window_len)
pos_x_pred += trial_data["fly_pos"][window_len, 0]
pos_y_pred += trial_data["fly_pos"][window_len, 1]
pos_pred = np.concatenate([pos_x_pred[:, None], pos_y_pred[:, None]], axis=1)
# Pad with NaN where prediction not available
padding = np.full(window_len, np.nan)
heading_pred = np.concatenate([padding, heading_pred])
pos_pred = np.concatenate([np.full((window_len, 2), np.nan), pos_pred], axis=0)
heading_diff_pred = np.concatenate([padding, heading_diff_pred])
heading_diff_actual = np.concatenate([padding, variables["heading_diff"]])
displacement_diff_pred = np.concatenate([padding, displacement_diff_pred])
displacement_diff_actual = np.concatenate(
[padding, variables["forward_disp_total_diff"]]
)
return {
"heading_pred": heading_pred,
"heading_actual": trial_data["fly_orientation_angle"],
"pos_pred": pos_pred,
"pos_actual": trial_data["fly_pos"],
"heading_diff_pred": heading_diff_pred,
"heading_diff_actual": heading_diff_actual,
"displacement_diff_pred": displacement_diff_pred,
"displacement_diff_actual": displacement_diff_actual,
}
We can run this function on the last trial, which has been reserved for
testing:
.. code-block:: ipython3
path_integration_res = path_integrate(
trial_data[4],
heading_model.predict, # this is LinearRegression's method for making prediction
fwd_disp_model.predict, # "
time_scale,
contact_force_thr,
)
… and inspect the time series of predicted vs. actual changes in heading
and forward displacement on this test dataset.
.. code-block:: ipython3
fig, axs = plt.subplots(2, 1, figsize=(6, 4), tight_layout=True, sharex=True)
axs[0].plot(
t_grid,
np.rad2deg(path_integration_res["heading_diff_actual"]),
lw=1,
color="black",
label="Actual",
)
axs[0].plot(
t_grid,
np.rad2deg(path_integration_res["heading_diff_pred"]),
lw=1,
color="tab:red",
label="Predicted",
)
axs[0].set_ylabel(r"Δheading ($^\circ$)")
axs[0].set_ylim(-90, 90)
axs[0].legend(loc="lower left", ncols=2)
axs[1].plot(
t_grid,
np.rad2deg(path_integration_res["displacement_diff_actual"]),
lw=1,
color="black",
label="Actual",
)
axs[1].plot(
t_grid,
np.rad2deg(path_integration_res["displacement_diff_pred"]),
lw=1,
color="tab:red",
label="Predicted",
)
axs[1].set_ylabel("Δfwd. disp. (mm)")
axs[1].set_ylim(50, 300)
axs[1].legend(loc="lower left", ncols=2)
axs[1].set_xlabel("Time (s)")
fig.savefig(output_dir / "path_integration_diff.png")
.. figure:: https://github.com/NeLy-EPFL/_media/blob/main/flygym/path_integration/path_integration_diff.png?raw=true
:width: 500
Similarly, we can plot the integrated estimation of heading and total
forward displacement:
.. code-block:: ipython3
fig, axs = plt.subplots(2, 1, figsize=(6, 4), tight_layout=True, sharex=True)
axs[0].plot(
t_grid,
np.rad2deg(np.unwrap(path_integration_res["heading_actual"])),
lw=1,
color="black",
label="Actual",
)
axs[0].plot(
t_grid,
np.rad2deg(path_integration_res["heading_pred"]),
lw=1,
color="tab:red",
label="Predicted",
)
axs[0].set_ylabel(r"Heading ($^\circ$)")
axs[0].legend(loc="lower left", ncols=2)
fwd_disp_total_actual = np.cumsum(
np.nan_to_num(path_integration_res["displacement_diff_actual"])
) / (time_scale / 1e-4)
fwd_disp_total_pred = np.cumsum(
np.nan_to_num(path_integration_res["displacement_diff_pred"])
) / (time_scale / 1e-4)
axs[1].plot(
t_grid,
fwd_disp_total_actual,
lw=1,
color="black",
label="Actual",
)
axs[1].plot(
t_grid,
fwd_disp_total_pred,
lw=1,
color="tab:red",
label="Predicted",
)
axs[1].set_ylabel("Cumulative fwd. disp. (mm)")
axs[1].legend(loc="lower right", ncols=2)
axs[1].set_xlabel("Time (s)")
fig.savefig(output_dir / "path_integration_cumulative.png")
.. figure:: https://github.com/NeLy-EPFL/_media/blob/main/flygym/path_integration/path_integration_cumulative.png?raw=true
:width: 500
Finally, we can plot the estimated and true trajectories of the fly:
.. code-block:: ipython3
fig, ax = plt.subplots(figsize=(4, 4), tight_layout=True)
ax.plot(
path_integration_res["pos_actual"][:, 0],
path_integration_res["pos_actual"][:, 1],
lw=1,
color="black",
label="Actual",
)
ax.plot(
path_integration_res["pos_pred"][:, 0],
path_integration_res["pos_pred"][:, 1],
lw=1,
color="tab:red",
label="Predicted",
)
ax.plot([0], [0], "ko", label="Origin")
ax.set_aspect("equal")
ax.set_xlabel("x position (mm)")
ax.set_ylabel("y position (mm)")
ax.set_xlim(-50, 50)
ax.set_ylim(-50, 50)
ax.legend(loc="lower left")
fig.savefig(output_dir / "path_integration_trajectory.png")
.. figure:: https://github.com/NeLy-EPFL/_media/blob/main/flygym/path_integration/path_integration_trajectory.png?raw=true
:width: 500
We can observe that, although the model gives excellent predictions in
heading and forward displacement, small errors in heading can lead to
larger errors in the final position estimation. This is simply due to
the fact that walking straight in a slightly wrong direction amplifies
the error in the estimated position. Therefore, while path integration
based solely on idiothetic cues is possible, calibration of the
integrator based on sensory inputs appears to be critical.