Overcome complex terrain with a hybrid controller

Summary: In this tutorial, we will illustrate how CPG-based and rule-based controllers fail to produce robust locomotion over complex terrain. We will then build a hybrid controller that combines CPGs with sensory feedback to overcome these deficits and generate more robust walking.

In the Controlling locomotion with CPGs and Rule-based controller tutorials, we demonstrated how different control strategies can lead to effective walking over simple, flat terrain. However, in the real world, animals locomote over complex surfaces. To achieve this, animals likely use a combination of CPGs and sensory feedback to produce agile, adaptive body kinematics. To explore how sensory feedback can facilitate locomotion over rugged surfaces, we have developed three rugged terrain types that complement our baseline smooth terrain: one with gaps perpendicular to the initial heading of the fly, one with blocks of variable height, and one that mixes the two. Using these new terrain types, we will now demonstrate how to examine the efficacy of different bioinspired locomotor control strategies.

Defining the arena

The environment that the fly is situated in is defined by the Arena class. So far, we have been using the default terrain: FlatTerrain. Several other arenas are included in the FlyGym package, including GappedTerrain, BlocksTerrain, and MixedTerrain. In future tutorials, we will introduce more arenas with sensory (visual, olfactory) features. The user can define custom arenas by inheriting from the flygym.mujoco.arena.BaseArena abstract class.
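For reference, a custom arena can be as simple as a single ground geometry. The following is a minimal, illustrative sketch, not code from the FlyGym source: it assumes that, like the built-in terrains, a subclass builds a root_element MJCF model via dm_control and implements get_spawn_position; consult the flygym.mujoco.arena API reference for the exact contract. The class name TiltedPlatformTerrain and its parameters are hypothetical.

import numpy as np
from dm_control import mjcf
from flygym.mujoco.arena import BaseArena


class TiltedPlatformTerrain(BaseArena):
    """Hypothetical example: a single large box floor, tilted about the y-axis."""

    def __init__(self, size=(50, 50), tilt_deg=5.0, friction=(1.0, 0.005, 0.0001)):
        self.root_element = mjcf.RootElement()
        self.friction = friction
        # one large box acting as the floor, rotated by `tilt_deg` about the y-axis
        self.root_element.worldbody.add(
            "geom",
            type="box",
            size=(size[0], size[1], 1),
            pos=(0, 0, -1),
            euler=(0, np.deg2rad(tilt_deg), 0),
            rgba=(0.45, 0.55, 0.65, 1),
            friction=friction,
        )

    def get_spawn_position(self, rel_pos, rel_angle):
        # spawn the fly exactly where the user requested (no offset)
        return rel_pos, rel_angle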

Let’s start by defining a couple of terrains:

from flygym.mujoco.arena import FlatTerrain, GappedTerrain, BlocksTerrain, MixedTerrain


def get_arena(arena_type):
    if arena_type == "flat":
        return FlatTerrain()
    elif arena_type == "gapped":
        return GappedTerrain()
    elif arena_type == "blocks":
        return BlocksTerrain()
    elif arena_type == "mixed":
        return MixedTerrain()
    else:
        raise ValueError(f"Unknown arena type: {arena_type}")

Let’s put a fly into each of these terrains, run 0.01 seconds of the simulation so the fly can stabilize on the floor, and visualize how the fly looks in these different terrain types:

import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm, trange
from flygym.mujoco import Parameters, NeuroMechFly
from flygym.mujoco.examples import PreprogrammedSteps

preprogrammed_steps = PreprogrammedSteps()  # we will use the neutral pose from this

sim_params = Parameters(
    timestep=1e-4,
    enable_adhesion=True,
    render_fps=101,
    render_playspeed=1,
    render_playspeed_text=False,
)

fig, axs = plt.subplots(2, 2, figsize=(6, 6), tight_layout=True)
terrain_types = ["flat", "gapped", "blocks", "mixed"]
for i, terrain_type in tqdm(enumerate(terrain_types)):
    ax = axs.flat[i]
    terrain = get_arena(terrain_type)
    nmf = NeuroMechFly(sim_params=sim_params, arena=terrain, spawn_pos=(0, 0, 0.2))
    nmf.reset()
    for _ in range(100):
        action = {
            "joints": preprogrammed_steps.default_pose,
            "adhesion": np.ones(6, dtype=np.int32),
        }
        nmf.step(action)
        nmf.render()
    ax.imshow(nmf._frames[-1])
    ax.axis("off")
    ax.set_title(f"{terrain_type.title()} terrain")

fig.savefig("./outputs/complex_terrain_overview.png")
4it [00:13,  3.29s/it]
https://github.com/NeLy-EPFL/_media/blob/main/flygym/complex_terrain_overview.png?raw=true

Basic CPG-based and rule-based controllers

Do the CPG-based and rule-based controllers work well over complex terrain? Let’s run the simulation for 0.5 seconds using each of these controllers on each of the different terrain types:

from flygym.mujoco.examples.cpg_controller import CPGNetwork, run_cpg_simulation
from flygym.mujoco.examples.rule_based_controller import (
    RuleBasedSteppingCoordinator,
    construct_rules_graph,
    run_rule_based_simulation,
)

run_time = 0.5
timestep = 1e-4

for controller_name in ["CPG-based", "Rule-based"]:
    for terrain_name in terrain_types:
        print(f"* Running {controller_name} controller on {terrain_name} terrain")

        terrain = get_arena(terrain_name)

        # Initialize the simulation
        sim_params = Parameters(
            timestep=1e-4,
            render_mode="saved",
            render_playspeed=0.1,
            enable_adhesion=True,
            draw_adhesion=True,
        )
        nmf = NeuroMechFly(sim_params=sim_params, arena=terrain, spawn_pos=(0, 0, 0.2))

        if controller_name == "CPG-based":
            intrinsic_freqs = np.ones(6) * 12
            intrinsic_amps = np.ones(6) * 1
            phase_biases = np.pi * np.array(
                [
                    [0, 1, 0, 1, 0, 1],
                    [1, 0, 1, 0, 1, 0],
                    [0, 1, 0, 1, 0, 1],
                    [1, 0, 1, 0, 1, 0],
                    [0, 1, 0, 1, 0, 1],
                    [1, 0, 1, 0, 1, 0],
                ]
            )
            coupling_weights = (phase_biases > 0) * 10
            convergence_coefs = np.ones(6) * 20
            cpg_network = CPGNetwork(
                timestep=timestep,
                intrinsic_freqs=intrinsic_freqs,
                intrinsic_amps=intrinsic_amps,
                coupling_weights=coupling_weights,
                phase_biases=phase_biases,
                convergence_coefs=convergence_coefs,
            )
            run_cpg_simulation(nmf, cpg_network, preprogrammed_steps, run_time)
        elif controller_name == "Rule-based":
            weights = {
                "rule1": -10,
                "rule2_ipsi": 2.5,
                "rule2_contra": 1,
                "rule3_ipsi": 3.0,
                "rule3_contra": 2.0,
            }
            rules_graph = construct_rules_graph()
            controller = RuleBasedSteppingCoordinator(
                timestep=timestep,
                rules_graph=rules_graph,
                weights=weights,
                preprogrammed_steps=preprogrammed_steps,
            )
            run_rule_based_simulation(nmf, controller, run_time)
        else:
            raise ValueError(f"Unknown controller: {controller_name}")

        x_pos = nmf.get_observation()["fly"][0, 0]
        print(f"Final x position: {x_pos:.4f} mm")

        nmf.save_video(f"./outputs/{controller_name}_{terrain_name}.mp4")
* Running CPG-based controller on flat terrain
100%|██████████| 5000/5000 [00:13<00:00, 371.84it/s]
Final x position: 4.9144 mm
* Running CPG-based controller on gapped terrain
100%|██████████| 5000/5000 [00:11<00:00, 440.77it/s]
Final x position: 2.9075 mm
* Running CPG-based controller on blocks terrain
100%|██████████| 5000/5000 [00:10<00:00, 470.04it/s]
Final x position: 4.2828 mm
* Running CPG-based controller on mixed terrain
100%|██████████| 5000/5000 [00:11<00:00, 449.06it/s]
Final x position: 4.0499 mm
* Running Rule-based controller on flat terrain
100%|██████████| 5000/5000 [00:18<00:00, 272.57it/s]
Final x position: 2.6169 mm
* Running Rule-based controller on gapped terrain
100%|██████████| 5000/5000 [00:12<00:00, 413.62it/s]
Final x position: 1.0374 mm
* Running Rule-based controller on blocks terrain
100%|██████████| 5000/5000 [00:13<00:00, 377.23it/s]
Final x position: 2.0600 mm
* Running Rule-based controller on mixed terrain
100%|██████████| 5000/5000 [00:12<00:00, 407.71it/s]
Final x position: 2.3158 mm

Though we have only tested one initial condition (i.e., one spawn position) per controller and terrain, we can already begin to observe that the CPG-based and rule-based controllers may not be very robust over complex terrain. In fact, if we run 20 initial conditions for 1 second each, we get a result like the one reported in the NeuroMechFly 2.0 paper (Wang-Chen et al., 2023):

https://github.com/NeLy-EPFL/_media/blob/main/flygym/cpg_rule_based_comparison.png?raw=true
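If you would like to reproduce a benchmark of this kind, a loop along the following lines is one way to do it. This is only a sketch, not the exact protocol used in the paper: the ±0.2 mm spawn-position perturbation and the choice of gapped terrain are arbitrary, only the CPG-based controller is shown, and the sketch reuses sim_params and the CPG parameters defined in the loop above.

# Sketch of a multi-trial benchmark (not the paper's exact protocol): rerun the
# CPG-based controller from slightly perturbed spawn positions and record the
# forward displacement. The +/- 0.2 mm perturbation range is an arbitrary choice.
rng = np.random.default_rng(seed=0)
displacements = []
for trial in range(20):
    dx, dy = rng.uniform(-0.2, 0.2, size=2)
    terrain = get_arena("gapped")
    nmf = NeuroMechFly(sim_params=sim_params, arena=terrain, spawn_pos=(dx, dy, 0.2))
    cpg_network = CPGNetwork(  # fresh network so no state carries over across trials
        timestep=timestep,
        intrinsic_freqs=intrinsic_freqs,
        intrinsic_amps=intrinsic_amps,
        coupling_weights=coupling_weights,
        phase_biases=phase_biases,
        convergence_coefs=convergence_coefs,
    )
    run_cpg_simulation(nmf, cpg_network, preprogrammed_steps, 1.0)
    displacements.append(nmf.get_observation()["fly"][0, 0] - dx)
print(f"Mean forward displacement over 20 trials: {np.mean(displacements):.2f} mm")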

We can look more closely at some examples of failed locomotion:

CPG-based controller over gapped terrain:

Rule-based controller over gapped terrain:

In the next section, we will show that by combining CPGs with sensory feedback, we can build a more robust controller.

Building a hybrid controller

Now, we will build a hybrid controller that integrates both feedforward oscillators and feedback-based mechanisms that reposition the legs if they get stuck. As described in the NeuroMechFly 2.0 paper, we will detect the following conditions:

  1. Retraction: In principle, with the tripod gait, there should always be three legs on the ground. Therefore, if any leg is extended farther than the third most extended leg in the z-direction, this leg may be stuck in a hole. This rule will lift the leg to recover it from a stuck position.

  2. Stumbling: In principle, only the tip of the tarsus of each leg should contact the ground. Therefore, we will consider the fly to be stumbling if the tibia or upper tarsal segments (1 and 2) collide with the ground, resulting in a supra-threshold force against the direction of the fly’s heading. To correct for stumbling, we will lift the stumbling leg (both checks are sketched schematically right after this list).
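To make the two conditions concrete before assembling the full controller, here is a schematic version of both checks. It uses the same observation fields ("fly", "end_effectors", "contact_forces", "fly_orientation") as the complete simulation loop later in this tutorial; leg_sensor_idx stands in for the per-leg stumbling sensor indices that we will construct below, and the helper function names are ours, not part of FlyGym.

# Schematic only; the full loop below integrates these checks with the CPG outputs.
def detect_retraction(obs, margin=0.05):
    """Return the index of a leg that appears stuck in a hole, or None."""
    # depth of each end effector below the fly's z position (larger = more extended)
    end_effector_z_pos = obs["fly"][0][2] - obs["end_effectors"][:, 2]
    sorted_idx = np.argsort(end_effector_z_pos)
    sorted_depth = end_effector_z_pos[sorted_idx]
    if sorted_depth[-1] > sorted_depth[-3] + margin:  # far below the 3rd most extended leg
        return sorted_idx[-1]
    return None


def detect_stumbling(obs, leg_sensor_idx, force_threshold=-1.0):
    """Return True if any tibia/upper-tarsus contact force opposes the fly's heading."""
    contact_forces = obs["contact_forces"][leg_sensor_idx, :]
    force_proj = np.dot(contact_forces, obs["fly_orientation"])
    return (force_proj < force_threshold).any()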

To implement these rules, we will create a variable for each rule that keeps track of the extent to which a given leg should be lifted:

retraction_correction = np.zeros(6)
stumbling_correction = np.zeros(6)

We will also define a vector representing how each DoF should be adjusted to implement leg lifting. We will call this \(\vec{v}_\text{leg}\).

correction_vectors = {
    # "leg pos": (Coxa, Coxa_roll, Coxa_yaw, Femur, Femur_roll, Tibia, Tarsus1)
    # unit: radian
    "F": np.array([0, 0, 0, -0.02, 0, 0.016, 0]),
    "M": np.array([-0.015, 0, 0, 0.004, 0, 0.01, -0.008]),
    "H": np.array([0, 0, 0, -0.01, 0, 0.005, 0]),
}

That is, when the leg should be lifted, we will increment the joint angles on this leg by \(\vec{v}_\text{leg}\) scaled by a factor defining the extent of correction. When the condition is no longer met, we will reduce the correction term until it reaches zero (i.e., no adjustment), so that the target angles applied to the simulator are those suggested by the corresponding CPGs.

Next, we need to define the factor controlling the extent of correction. Recall that we will progressively lift the leg when an adjustment is necessary. Therefore, let’s also define the rate of adjustment \(k_\text{inc}\) applied while the condition is met and the rate of recovery \(k_\text{dec}\) applied once the condition is no longer met:

correction_rates = {
    # "rule": (increment rate, decrement rate). unit: 1/sec
    "retraction": (500, 1000 / 3),
    "stumbling": (2000, 500),
}

Concretely, we will initialize the amount of correction \(c\) to 0. This variable is unitless. For every \(t\) amount of time that the condition is met, we increment \(c\) by \(k_\text{inc}t\), where \(k_\text{inc}\) is the appropriate correction rate. Similarly, for every \(t\) amount of time that the condition is not met, we decrement \(c\) by \(k_\text{dec}t\) until it reaches 0. We then adjust the leg joint angles by adding \(c\vec{v}_\text{leg}\) to them.
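In code, the per-timestep update of \(c\) for a single leg and a single rule therefore looks like the following schematic, where condition_met, c, cpg_target_angles, and leg are placeholders; the full simulation loop below performs exactly this update, once per rule and per leg.

# schematic per-timestep update of the correction amount c for one leg and one rule
k_inc, k_dec = correction_rates["retraction"]  # rates in 1/s
if condition_met:  # e.g. the leg appears stuck in a hole
    c += k_inc * timestep
else:
    c = max(0, c - k_dec * timestep)  # decay back to zero once the condition clears
# the adjustment applied on top of the CPG-driven target angles
adjusted_angles = cpg_target_angles + c * correction_vectors[leg[1]]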

We should also define a threshold for the stumbling force. Note that a negative number indicates a force against the direction in which the fly is facing:

stumbling_force_threshold = -1

Next, we will define the underlying CPG network as we did in the tutorial on CPG-based control:

run_time = 1
timestep = 1e-4

# Define leg raise correction vectors
correction_vectors = {
    # "leg pos": (Coxa, Coxa_roll, Coxa_yaw, Femur, Femur_roll, Tibia, Tarsus1)
    "F": np.array([0, 0, 0, -0.02, 0, 0.016, 0]),
    "M": np.array([-0.015, 0, 0, 0.004, 0, 0.01, -0.008]),
    "H": np.array([0, 0, 0, -0.01, 0, 0.005, 0]),
}

# Define leg raise rates
correction_rates = {"retraction": (500, 1000 / 3), "stumbling": (2000, 500)}

# Initialize CPG network
intrinsic_freqs = np.ones(6) * 12
intrinsic_amps = np.ones(6) * 1
phase_biases = np.pi * np.array(
    [
        [0, 1, 0, 1, 0, 1],
        [1, 0, 1, 0, 1, 0],
        [0, 1, 0, 1, 0, 1],
        [1, 0, 1, 0, 1, 0],
        [0, 1, 0, 1, 0, 1],
        [1, 0, 1, 0, 1, 0],
    ]
)
coupling_weights = (phase_biases > 0) * 10
convergence_coefs = np.ones(6) * 20
cpg_network = CPGNetwork(
    timestep=1e-4,
    intrinsic_freqs=intrinsic_freqs,
    intrinsic_amps=intrinsic_amps,
    coupling_weights=coupling_weights,
    phase_biases=phase_biases,
    convergence_coefs=convergence_coefs,
)

Similarly, let’s define the preprogrammed steps:

# Initialize preprogrammed steps
preprogrammed_steps = PreprogrammedSteps()

… and the NeuroMechFly simulation over mixed terrain. We will enable contact detection on all tibial and tarsal segments so that stumbling can be detected:

# Initialize NeuroMechFly simulation
sim_params = Parameters(
    timestep=1e-4,
    render_mode="saved",
    render_playspeed=0.1,
    enable_adhesion=True,
    draw_adhesion=True,
)
contact_sensor_placements = [
    f"{leg}{segment}"
    for leg in preprogrammed_steps.legs
    for segment in ["Tibia", "Tarsus1", "Tarsus2", "Tarsus3", "Tarsus4", "Tarsus5"]
]
arena = MixedTerrain()
nmf = NeuroMechFly(
    sim_params=sim_params,
    arena=arena,
    init_pose="stretch",
    control="position",
    spawn_pos=(0, 0, 0.2),
    contact_sensor_placements=contact_sensor_placements,
)

Let’s build a dictionary containing the indices of the contact sensors on each leg. These will be used to detect stumbling:

detected_segments = ["Tibia", "Tarsus1", "Tarsus2"]
stumbling_sensors = {leg: [] for leg in preprogrammed_steps.legs}
for i, sensor_name in enumerate(nmf.contact_sensor_placements):
    leg = sensor_name.split("/")[1][:2]  # sensor_name: e.g. "Animat/LFTarsus1"
    segment = sensor_name.split("/")[1][2:]
    if segment in detected_segments:
        stumbling_sensors[leg].append(i)
stumbling_sensors = {k: np.array(v) for k, v in stumbling_sensors.items()}

As a sanity check, let’s make sure that the number of stumble sensors per leg is as expected:

if any(v.size != len(detected_segments) for v in stumbling_sensors.values()):
    raise RuntimeError(
        "Contact detection must be enabled for all tibia, tarsus1, and tarsus2 "
        "segments for stumbling detection."
    )

We are now ready to write the main simulation loop. We will implement and execute the entire loop before explaining its constituent components:

obs, info = nmf.reset()
for _ in trange(int(run_time / nmf.sim_params.timestep)):
    # retraction rule: is any leg stuck in a hole and needs to be retracted?
    end_effector_z_pos = obs["fly"][0][2] - obs["end_effectors"][:, 2]
    end_effector_z_pos_sorted_idx = np.argsort(end_effector_z_pos)
    end_effector_z_pos_sorted = end_effector_z_pos[end_effector_z_pos_sorted_idx]
    if end_effector_z_pos_sorted[-1] > end_effector_z_pos_sorted[-3] + 0.05:
        leg_to_correct_retraction = end_effector_z_pos_sorted_idx[-1]
    else:
        leg_to_correct_retraction = None

    cpg_network.step()
    joints_angles = []
    adhesion_onoff = []
    for i, leg in enumerate(preprogrammed_steps.legs):
        # update retraction correction amounts
        if i == leg_to_correct_retraction:  # lift leg
            increment = correction_rates["retraction"][0] * nmf.timestep
            retraction_correction[i] += increment
            nmf.change_segment_color(f"{leg}Tibia", (1, 0, 0, 1))
        else:  # condition no longer met, lower leg
            decrement = correction_rates["retraction"][1] * nmf.timestep
            retraction_correction[i] = max(0, retraction_correction[i] - decrement)
            nmf.change_segment_color(f"{leg}Tibia", (0.5, 0.5, 0.5, 1))

        # update stumbling correction amounts
        contact_forces = obs["contact_forces"][stumbling_sensors[leg], :]
        fly_orientation = obs["fly_orientation"]
        # force projection should be negative if against fly orientation
        force_proj = np.dot(contact_forces, fly_orientation)
        if (force_proj < stumbling_force_threshold).any():
            increment = correction_rates["stumbling"][0] * nmf.timestep
            stumbling_correction[i] += increment
            nmf.change_segment_color(f"{leg}Femur", (1, 0, 0, 1))
        else:
            decrement = correction_rates["stumbling"][1] * nmf.timestep
            stumbling_correction[i] = max(0, stumbling_correction[i] - decrement)
            nmf.change_segment_color(f"{leg}Femur", (0.5, 0.5, 0.5, 1))

        # retraction correction has priority
        if retraction_correction[i] > 0:
            net_correction = retraction_correction[i]
        else:
            net_correction = stumbling_correction[i]

        # get target angles from CPGs and apply correction
        my_joints_angles = preprogrammed_steps.get_joint_angles(
            leg, cpg_network.curr_phases[i], cpg_network.curr_magnitudes[i]
        )
        my_joints_angles += net_correction * correction_vectors[leg[1]]
        joints_angles.append(my_joints_angles)

        # get adhesion on/off signal
        my_adhesion_onoff = preprogrammed_steps.get_adhesion_onoff(
            leg, cpg_network.curr_phases[i]
        )
        adhesion_onoff.append(my_adhesion_onoff)

    action = {
        "joints": np.array(np.concatenate(joints_angles)),
        "adhesion": np.array(adhesion_onoff).astype(int),
    }
    obs, reward, terminated, truncated, info = nmf.step(action)
    nmf.render()
100%|██████████| 10000/10000 [00:23<00:00, 432.09it/s]

At each simulation time step, we first check whether the retraction rule is met. This depends on whether any leg is extended farther than the third most extended leg in the z-direction by a margin of 0.05 mm. This margin is important because contact calculations in the physics simulator are imperfect, sometimes causing the leg to penetrate the floor by a small amount. If more than one leg meets this condition, only the most extended leg is corrected:

# retraction rule: does a leg need to be retracted from a hole?
end_effector_z_pos = obs["fly"][0][2] - obs["end_effectors"][:, 2]
end_effector_z_pos_sorted_idx = np.argsort(end_effector_z_pos)
end_effector_z_pos_sorted = end_effector_z_pos[end_effector_z_pos_sorted_idx]
if end_effector_z_pos_sorted[-1] > end_effector_z_pos_sorted[-3] + 0.05:
    leg_to_correct_retraction = end_effector_z_pos_sorted_idx[-1]
else:
    leg_to_correct_retraction = None

Then, we have an inner loop that iterates over all legs. The joint angles and adhesion on/off signals are calculated here. We first update the amount of correction \(c\) for the retraction rule:

# update amount of retraction correction
if i == leg_to_correct_retraction:  # lift leg
    increment = correction_rates["retraction"][0] * nmf.timestep
    retraction_correction[i] += increment
    nmf.change_segment_color(f"{leg}Tibia", (1, 0, 0, 1))
else:  # condition no longer met, lower leg
    decrement = correction_rates["retraction"][1] * nmf.timestep
    retraction_correction[i] = max(0, retraction_correction[i] - decrement)
    nmf.change_segment_color(f"{leg}Tibia", (0.5, 0.5, 0.5, 1))

Similarly, we update the correction amount \(c\) for the stumbling rule:

# update amount of stumbling correction
contact_forces = obs["contact_forces"][stumbling_sensors[leg], :]
fly_orientation = obs["fly_orientation"]
# force projection should be negative if against fly orientation
force_proj = np.dot(contact_forces, fly_orientation)
if (force_proj < stumbling_force_threshold).any():
    increment = correction_rates["stumbling"][0] * nmf.timestep
    stumbling_correction[i] += increment
    nmf.change_segment_color(f"{leg}Femur", (1, 0, 0, 1))
else:
    decrement = correction_rates["stumbling"][1] * nmf.timestep
    stumbling_correction[i] = max(0, stumbling_correction[i] - decrement)
    nmf.change_segment_color(f"{leg}Femur", (0.5, 0.5, 0.5, 1))

In the case that both rules are active for the same leg, we will only apply the retraction correction:

# retraction correction is prioritized
if retraction_correction[i] > 0:
    net_correction = retraction_correction[i]
else:
    net_correction = stumbling_correction[i]

Let’s first get the target joint angles based purely on the CPG phase and the preprogrammed step. Then, we apply the lifting correction:

# get target angles from CPGs and apply correction
my_joints_angles = preprogrammed_steps.get_joint_angles(
    leg, cpg_network.curr_phases[i], cpg_network.curr_magnitudes[i]
)
my_joints_angles += net_correction * correction_vectors[leg[1]]
joints_angles.append(my_joints_angles)

Finally, we can obtain the adhesion on/off signal based on the leg phase as well:

# get adhesion on/off signal
my_adhesion_onoff = preprogrammed_steps.get_adhesion_onoff(
    leg, cpg_network.curr_phases[i]
)
adhesion_onoff.append(my_adhesion_onoff)

We now have all we need to feed the action into the NeuroMechFly simulation. Don’t forget to call .render() to record the video correctly.

action = {
    "joints": np.array(np.concatenate(joints_angles)),
    "adhesion": np.array(adhesion_onoff).astype(int),
}
obs, reward, terminated, truncated, info = nmf.step(action)
nmf.render()

Let’s visualize the results:

nmf.save_video("./outputs/hybrid_controller_mixed_terrain.mp4")

Even based on this single example, the hybrid controller appears more robust than the CPG-based or rule-based controllers. Indeed, by running 20 simulations for each controller over each terrain type with different initial conditions, we obtained the following results, which show that the hybrid controller outperforms the other two (see the NeuroMechFly 2.0 paper for details):

https://github.com/NeLy-EPFL/_media/blob/main/flygym/cpg_rule_based_hybrid_comparison.png?raw=true

These results demonstrate how rugged terrain can expose failure modes for controllers that otherwise work well on flat terrain, and how you can use NeuroMechFly to benchmark different control strategies that go beyond the classical dichotomy of CPG-based versus rule-based control.

In the next tutorial, we will refactor our hybrid controller code into a Python class that implements the Gym interface. This will allow us to show how to build control models with different degrees of abstraction and preprogrammed computations.