Rule-based controller

Summary: In this tutorial, we will show how locomotion can be achieved using local coordination rules in the absence of centralized mechanisms like coupled CPGs.

Note

Additional dependencies are required to follow this tutorial. In addition to the standard installation, please also run:

pip install networkx

Previously, we covered how a centralized network of coupled oscillators (CPGs) can give rise to locomotion. A more decentralized mechanism for insect locomotion has been proposed as an alternative: locomotion can emerge from the application of sensory feedback-based rules that dictate for each leg when to lift, swing, or remain in stance phase (see Walknet described in Cruse et al, 1998 and reviewed in Schilling et al, 2013). This control approach has been applied to robotic locomotor control (Schneider et al, 2012).

In this tutorial, we will implement a controller based on the first three rules of Walknet, namely:

  1. The swing (“return stroke” as described in the Walknet paper) of a leg inhibits the swing of its rostral neighboring leg.

  2. The start of the stance phase (“power stroke” as described in the Walknet paper) of a leg excites the swing of the rostral contralateral neighboring legs.

  3. The completion of the stance phase (“caudal position” as described in the Walknet paper) excites the swing of the caudal and contralateral neighboring legs.

These rules are summarized in this figure:

https://github.com/NeLy-EPFL/_media/blob/main/flygym/rule_based.png?raw=true

Preprogrammed steps, refactored

We start by loading the preprogrammed steps as explained in the tutorial Controlling locomotion with CPGs. This time, we will use the PreprogrammedSteps Python class that encapsulates much of the code implemented in the previous tutorial. The following is the documentation for this class:

class flygym.mujoco.examples.rule_based_controller.PreprogrammedSteps(path=None)

Bases: object

Preprogrammed steps by each leg extracted from experimental recordings.

Parameters:
pathstr or Path, optional

Path to the preprogrammed steps data. If None, the default preprogrammed steps data will be loaded.

Attributes:
legsList[str]

List of leg names (eg. LF for left front leg).

dofs_per_legList[str]

List of names for degrees of freedom for each leg.

durationfloat

Duration of the preprogrammed step (at 1x speed) in seconds.

neutral_posDict[str, np.ndarray]

Neutral position of DoFs for each leg. Keys are leg names; values are joint angles in the order of self.dofs_per_leg.

swing_periodDict[str, np.ndarray]

The start and end of the liftedswing phase for each leg. Keys are leg names; values are arrays of shape (2,) with the start and end of the swing normalized to [0, 2π].

property default_pose

Default pose of the fly (all legs in neutral position) given as a single array. It is ready to be used as the “joints” state in the action space of NeuroMechFly like the following: NeuroMechFly.step(action={"joints": preprogrammed_steps.default_pose}).

get_adhesion_onoff(leg, phase)

Get whether adhesion is on for a given leg at a given phase.

Parameters:
legstr

Leg name.

phasefloat or np.ndarray

Phase or array of phases of the step normalized to [0, 2π].

Returns:
bool or np.ndarray

Whether adhesion is on for the leg at the given phase(s). A boolean array of shape (n,) is returned if phase is a 1D array of n elements; a bool is returned if phase is a scalar.

get_joint_angles(leg, phase, magnitude=1)

Get joint angles for a given leg at a given phase.

Parameters:
legstr

Leg name.

phasefloat or np.ndarray

Phase or array of phases of the step normalized to [0, 2π].

magnitudefloat or np.ndarray, optional

Magnitude of the step. Default: 1 (the preprogrammed steps as provided).

Returns:
np.ndarray

Joint angles of the leg at the given phase(s). The shape of the array is (7, n) if phase is a 1D array of n elements, or (7,) if phase is a scalar.

Let’s import this class:

1from flygym.mujoco.examples.rule_based_controller import PreprogrammedSteps

We can verify that this works by regenerating the following plot from the CPGs tutorial:

 1import matplotlib.pyplot as plt
 2
 3preprogrammed_steps = PreprogrammedSteps()
 4theta_ts = np.linspace(0, 3 * 2 * np.pi, 100)
 5r_ts = np.linspace(0, 1, 100)
 6
 7fig, axs = plt.subplots(3, 2, figsize=(7, 5), sharex=True, sharey=True)
 8for i_side, side in enumerate("LR"):
 9    for i_pos, pos in enumerate("FMH"):
10        leg = f"{side}{pos}"
11        ax = axs[i_pos, i_side]
12        joint_angles = preprogrammed_steps.get_joint_angles(leg, theta_ts, r_ts)
13        for i_dof, dof_name in enumerate(preprogrammed_steps.dofs_per_leg):
14            legend = dof_name if i_pos == 0 and i_side == 0 else None
15            ax.plot(
16                theta_ts, np.rad2deg(joint_angles[i_dof, :]), linewidth=1, label=legend
17            )
18        for i_cycle in range(3):
19            my_swing_period = preprogrammed_steps.swing_period[leg]
20            theta_offset = i_cycle * 2 * np.pi
21            ax.axvspan(
22                theta_offset + my_swing_period[0],
23                theta_offset + my_swing_period[0] + my_swing_period[1],
24                color="gray",
25                linewidth=0,
26                alpha=0.2,
27                label="Swing" if i_pos == 0 and i_side == 0 and i_cycle == 0 else None,
28            )
29        if i_pos == 2:
30            ax.set_xlabel("Phase")
31            ax.set_xticks(np.pi * np.arange(7))
32            ax.set_xticklabels(["0" if x == 0 else f"{x}$\pi$" for x in np.arange(7)])
33        if i_side == 0:
34            ax.set_ylabel(r"DoF angle ($\degree$)")
35        ax.set_title(f"{leg} leg")
36        ax.set_ylim(-180, 180)
37        ax.set_yticks([-180, -90, 0, 90, 180])
38fig.legend(loc=7)
39fig.tight_layout()
40fig.subplots_adjust(right=0.8)
41fig.savefig("./outputs/preprogrammed_steps_class.png")
https://github.com/NeLy-EPFL/_media/blob/main/flygym/preprogrammed_steps_class.png?raw=true

Implementing the rules

Next, we implement the first three rules from Walknet. To encode the graph representing the local coordination rules (the first figure of this tutorial), we will construct a MultiDiGraph using the Python graph library NetworkX. This is a convenient way to manipulate a directed graph with multiple edges between the same nodes (in our case, each node represents a leg and each edge represents a coupling rule). Note that this graph representation is not strictly necessary; the user can alternatively implement the same logic using lots of lists and dictionaries in native Python.

 1import networkx as nx
 2
 3# For each rule, the keys are the source nodes and the values are the
 4# target nodes influenced by the source nodes
 5edges = {
 6    "rule1": {"LM": ["LF"], "LH": ["LM"], "RM": ["RF"], "RH": ["RM"]},
 7    "rule2": {
 8        "LF": ["RF"],
 9        "LM": ["RM", "LF"],
10        "LH": ["RH", "LM"],
11        "RF": ["LF"],
12        "RM": ["LM", "RF"],
13        "RH": ["LH", "RM"],
14    },
15    "rule3": {
16        "LF": ["RF", "LM"],
17        "LM": ["RM", "LH"],
18        "LH": ["RH"],
19        "RF": ["LF", "RM"],
20        "RM": ["LM", "RH"],
21        "RH": ["LH"],
22    },
23}
24
25# Construct the rules graph
26rules_graph = nx.MultiDiGraph()
27for rule_type, d in edges.items():
28    for src, tgt_nodes in d.items():
29        for tgt in tgt_nodes:
30            if rule_type == "rule1":
31                rule_type_detailed = rule_type
32            else:
33                side = "ipsi" if src[0] == tgt[0] else "contra"
34                rule_type_detailed = f"{rule_type}_{side}"
35            rules_graph.add_edge(src, tgt, rule=rule_type_detailed)

Next, we will implement a helper function that selects the edges given the rule and the source node. This will become handy in the next section.

1def filter_edges(graph, rule, src_node=None):
2    """Return a list of edges that match the given rule and source node.
3    The edges are returned as a list of tuples (src, tgt)."""
4    return [
5        (src, tgt)
6        for src, tgt, rule_type in graph.edges(data="rule")
7        if (rule_type == rule) and (src_node is None or src == src_node)
8    ]

Using rules_graph and the function filter_edges, let’s visualize connections for each of the three rules. The ipsilateral and contralateral connections of the same rule can have different weights, so let’s visualize them separately:

 1node_pos = {
 2    "LF": (0, 0),
 3    "LM": (0, 1),
 4    "LH": (0, 2),
 5    "RF": (1, 0),
 6    "RM": (1, 1),
 7    "RH": (1, 2),
 8}
 9fig, axs = plt.subplots(1, 5, figsize=(8, 3), tight_layout=True)
10for i, rule in enumerate(
11    ["rule1", "rule2_ipsi", "rule2_contra", "rule3_ipsi", "rule3_contra"]
12):
13    ax = axs[i]
14    selected_edges = filter_edges(rules_graph, rule)
15    nx.draw(rules_graph, pos=node_pos, edgelist=selected_edges, with_labels=True, ax=ax)
16    ax.set_title(rule)
17    ax.set_xlim(-0.3, 1.3)
18    ax.set_ylim(-0.3, 2.3)
19    ax.invert_yaxis()
20    ax.axis("on")
21plt.savefig("./outputs/rules_graph.png")
https://github.com/NeLy-EPFL/_media/blob/main/flygym/rules_graph.png?raw=true

Using this rules graph, we will proceed to implement the rule-based leg stepping coordination model. To do this, we will once again construct a Python class.

In the __init__ method of the class, we will save some metadata and initialize arrays for the contributions to the stepping likelihood scores from each of the three rules. We will also initialize an array to track the current stepping phase — that is, how far into the preprogrammed step the leg is, normalized to [0, 2π]. If a step has completed but a new step has not been initiated, the leg remains at phase 0 indefinitely. To indicate whether the legs are stepping at all, we will create a boolean mask. Finally, we will create two dictionaries to map the leg names to the leg indices and vice versa:

 1class RuleBasedSteppingCoordinator:
 2    legs = ["LF", "LM", "LH", "RF", "RM", "RH"]
 3
 4    def __init__(
 5        self, timestep, rules_graph, weights, preprogrammed_steps, margin=0.001, seed=0
 6    ):
 7        self.timestep = timestep
 8        self.rules_graph = rules_graph
 9        self.weights = weights
10        self.preprogrammed_steps = preprogrammed_steps
11        self.margin = margin
12        self.random_state = np.random.RandomState(seed)
13        self._phase_inc_per_step = (
14            2 * np.pi * (timestep / self.preprogrammed_steps.duration)
15        )
16        self.curr_step = 0
17
18        self.rule1_scores = np.zeros(6)
19        self.rule2_scores = np.zeros(6)
20        self.rule3_scores = np.zeros(6)
21
22        self.leg_phases = np.zeros(6)
23        self.mask_is_stepping = np.zeros(6, dtype=bool)
24
25        self._leg2id = {leg: i for i, leg in enumerate(self.legs)}
26        self._id2leg = {i: leg for i, leg in enumerate(self.legs)}

Let’s implement a special combined_score method with a @property decorator to provide easy access to the sum of all three scores. This way, we can access the total score simply with stepping_coordinator.combined_score. Refer to this tutorial if you want to understand how property methods work in Python.

1    @property
2    def combined_scores(self):
3        return self.rule1_scores + self.rule2_scores + self.rule3_scores

As described in the NeuroMechFly 2.0 paper, the leg with the highest positive score is stepped. If multiple legs are within a small margin of the highest score, we choose one of these legs at random to avoid bias from numerical artifacts. Let’s implement a method that selects the legs that are eligible for stepping:

1    def _get_eligible_legs(self):
2        score_thr = self.combined_scores.max()
3        score_thr = max(0, score_thr - np.abs(score_thr) * self.margin)
4        mask_is_eligible = (
5            (self.combined_scores >= score_thr)  # highest or almost highest score
6            & (self.combined_scores > 0)  # score is positive
7            & ~self.mask_is_stepping  # leg is not currently stepping
8        )
9        return np.where(mask_is_eligible)[0]

Then, let’s implement another method that chooses one of the eligible legs at random if at least one leg is eligible, and returns None if no leg can be stepped:

1    def _select_stepping_leg(self):
2        eligible_legs = self._get_eligible_legs()
3        if len(eligible_legs) == 0:
4            return None
5        return self.random_state.choice(eligible_legs)

Now, let’s write a method that applies Rule 1 based on the swing mask and the current phases of the legs:

 1def _apply_rule1(self):
 2        for i, leg in enumerate(self.legs):
 3            is_swinging = (
 4                0 < self.leg_phases[i] < self.preprogrammed_steps.swing_period[leg][1]
 5            )
 6            edges = filter_edges(self.rules_graph, "rule1", src_node=leg)
 7            for _, tgt in edges:
 8                self.rule1_scores[self._leg2id[tgt]] = (
 9                    self.weights["rule1"] if is_swinging else 0
10                )

Rules 2 and 3 are based on “early” and “late” stance periods (power stroke). We will scale their weights by γ, a ratio indicating how far the leg is into the stance phase. Let’s define a helper method that calculates γ:

1    def _select_stepping_leg(self):
2        eligible_legs = self._get_eligible_legs()
3        if len(eligible_legs) == 0:
4            return None
5        return self.random_state.choice(eligible_legs)

Now, we can implement Rule 2 and Rule 3:

 1    def _apply_rule2(self):
 2        self.rule2_scores[:] = 0
 3        for i, leg in enumerate(self.legs):
 4            stance_progress_ratio = self._get_stance_progress_ratio(leg)
 5            if stance_progress_ratio == 0:
 6                continue
 7            for side in ["ipsi", "contra"]:
 8                edges = filter_edges(self.rules_graph, f"rule2_{side}", src_node=leg)
 9                weight = self.weights[f"rule2_{side}"]
10                for _, tgt in edges:
11                    tgt_id = self._leg2id[tgt]
12                    self.rule2_scores[tgt_id] += weight * (1 - stance_progress_ratio)
13
14    def _apply_rule3(self):
15        self.rule3_scores[:] = 0
16        for i, leg in enumerate(self.legs):
17            stance_progress_ratio = self._get_stance_progress_ratio(leg)
18            if stance_progress_ratio == 0:
19                continue
20            for side in ["ipsi", "contra"]:
21                edges = filter_edges(self.rules_graph, f"rule3_{side}", src_node=leg)
22                weight = self.weights[f"rule3_{side}"]
23                for _, tgt in edges:
24                    tgt_id = self._leg2id[tgt]
25                    self.rule3_scores[tgt_id] += weight * stance_progress_ratio

Finally, let’s implement the main step() method:

 1    def step(self):
 2        if self.curr_step == 0:
 3            # The first step is always a fore leg or mid leg
 4            stepping_leg_id = self.random_state.choice([0, 1, 3, 4])
 5        else:
 6            stepping_leg_id = self._select_stepping_leg()
 7
 8        # Initiate a new step, if conditions are met for any leg
 9        if stepping_leg_id is not None:
10            self.mask_is_stepping[stepping_leg_id] = True  # start stepping this leg
11
12        # Progress all stepping legs
13        self.leg_phases[self.mask_is_stepping] += self._phase_inc_per_step
14
15        # Check if any stepping legs has completed a step
16        mask_has_newly_completed = self.leg_phases >= 2 * np.pi
17        self.leg_phases[mask_has_newly_completed] = 0
18        self.mask_is_stepping[mask_has_newly_completed] = False
19
20        # Update scores
21        self._apply_rule1()
22        self._apply_rule2()
23        self._apply_rule3()
24
25        self.curr_step += 1

This class is actually included in flygym.mujoco.examples. Let’s import it.

1from flygym.mujoco.examples.rule_based_controller import RuleBasedSteppingCoordinator

Let’s define the weights of the rules and run 1 second of simulation:

 1run_time = 1
 2timestep = 1e-4
 3
 4weights = {
 5    "rule1": -10,
 6    "rule2_ipsi": 2.5,
 7    "rule2_contra": 1,
 8    "rule3_ipsi": 3.0,
 9    "rule3_contra": 2.0,
10}
11
12controller = RuleBasedSteppingCoordinator(
13    timestep=timestep,
14    rules_graph=rules_graph,
15    weights=weights,
16    preprogrammed_steps=preprogrammed_steps,
17)
18
19score_hist_overall = []
20score_hist_rule1 = []
21score_hist_rule2 = []
22score_hist_rule3 = []
23leg_phases_hist = []
24for i in range(int(run_time / controller.timestep)):
25    controller.step()
26    score_hist_overall.append(controller.combined_scores.copy())
27    score_hist_rule1.append(controller.rule1_scores.copy())
28    score_hist_rule2.append(controller.rule2_scores.copy())
29    score_hist_rule3.append(controller.rule3_scores.copy())
30    leg_phases_hist.append(controller.leg_phases.copy())
31
32score_hist_overall = np.array(score_hist_overall)
33score_hist_rule1 = np.array(score_hist_rule1)
34score_hist_rule2 = np.array(score_hist_rule2)
35score_hist_rule3 = np.array(score_hist_rule3)
36leg_phases_hist = np.array(leg_phases_hist)

Let’s also implement a plotting helper function and visualize the leg phases and stepping likelihood scores over time:

 1def plot_time_series_multi_legs(
 2    time_series_block,
 3    timestep,
 4    spacing=10,
 5    legs=["LF", "LM", "LH", "RF", "RM", "RH"],
 6    ax=None,
 7):
 8    """Plot a time series of scores for multiple legs.
 9
10    Parameters
11    ----------
12    time_series_block : np.ndarray
13        Time series of scores for multiple legs. The shape of the array
14        should be (n, m), where n is the number of time steps and m is the
15        length of ``legs``.
16    timestep : float
17        Timestep of the time series in seconds.
18    spacing : float, optional
19        Spacing between the time series of different legs. Default: 10.
20    legs : List[str], optional
21        List of leg names. Default: ["LF", "LM", "LH", "RF", "RM", "RH"].
22    ax : matplotlib.axes.Axes, optional
23        Axes to plot on. If None, a new figure and axes will be created.
24
25    Returns
26    -------
27    matplotlib.axes.Axes
28        Axes containing the plot.
29    """
30    t_grid = np.arange(time_series_block.shape[0]) * timestep
31    spacing *= -1
32    offset = np.arange(6)[np.newaxis, :] * spacing
33    score_hist_viz = time_series_block + offset
34    if ax is None:
35        fig, ax = plt.subplots(figsize=(8, 3), tight_layout=True)
36    for i in range(len(legs)):
37        ax.axhline(offset.flatten()[i], color="k", linewidth=0.5)
38        ax.plot(t_grid, score_hist_viz[:, i])
39    ax.set_yticks(offset[0], legs)
40    ax.set_xlabel("Time (s)")
41    return ax
 1fig, axs = plt.subplots(5, 1, figsize=(8, 15), tight_layout=True)
 2
 3# Plot leg phases
 4ax = axs[0]
 5plot_time_series_multi_legs(leg_phases_hist, timestep=timestep, ax=ax)
 6ax.set_title("Leg phases")
 7
 8# Plot combined stepping scores
 9ax = axs[1]
10plot_time_series_multi_legs(score_hist_overall, timestep=timestep, spacing=18, ax=ax)
11ax.set_title("Stepping scores (combined)")
12
13# Plot stepping scores (rule 1)
14ax = axs[2]
15plot_time_series_multi_legs(score_hist_rule1, timestep=timestep, spacing=18, ax=ax)
16ax.set_title("Stepping scores (rule 1 contribution)")
17
18# Plot stepping scores (rule 2)
19ax = axs[3]
20plot_time_series_multi_legs(score_hist_rule2, timestep=timestep, spacing=18, ax=ax)
21ax.set_title("Stepping scores (rule 2 contribution)")
22
23# Plot stepping scores (rule 3)
24ax = axs[4]
25plot_time_series_multi_legs(score_hist_rule3, timestep=timestep, spacing=18, ax=ax)
26ax.set_title("Stepping scores (rule 3 contribution)")
27
28fig.savefig("./outputs/rule_based_controll_signals.png")
https://github.com/NeLy-EPFL/_media/blob/main/flygym/rule_based_control_signals.png?raw=true

Plugging the controller into the simulation

By now, we have:

  • implemented the RuleBasedSteppingCoordinator that controls the stepping of the legs

  • (re)implemented PreprogrammedSteps which controls the kinematics of each individual step given the stepping state

The final task is to put everything together and plug the control signals (joint positions) into the NeuroMechFly physics simulation:

 1import flygym.mujoco
 2from tqdm import trange
 3
 4
 5controller = RuleBasedSteppingCoordinator(
 6    timestep=timestep,
 7    rules_graph=rules_graph,
 8    weights=weights,
 9    preprogrammed_steps=preprogrammed_steps,
10)
11sim_params = flygym.mujoco.Parameters(
12    timestep=timestep,
13    render_mode="saved",
14    render_playspeed=0.1,
15    enable_adhesion=True,
16    draw_adhesion=True,
17)
18nmf = flygym.mujoco.NeuroMechFly(
19    sim_params=sim_params,
20    init_pose="stretch",
21    actuated_joints=flygym.mujoco.preprogrammed.all_leg_dofs,
22    control="position",
23)
24
25obs, info = nmf.reset()
26for i in trange(int(run_time / sim_params.timestep)):
27    controller.step()
28    joint_angles = []
29    adhesion_onoff = []
30    for leg, phase in zip(controller.legs, controller.leg_phases):
31        joint_angles_arr = preprogrammed_steps.get_joint_angles(leg, phase)
32        joint_angles.append(joint_angles_arr.flatten())
33        adhesion_onoff.append(preprogrammed_steps.get_adhesion_onoff(leg, phase))
34    action = {
35        "joints": np.concatenate(joint_angles),
36        "adhesion": np.array(adhesion_onoff),
37    }
38    obs, reward, terminated, truncated, info = nmf.step(action)
39    nmf.render()
100%|██████████| 10000/10000 [00:32<00:00, 309.08it/s]

Let’s take a look at the result:

1nmf.save_video("./outputs/rule_based_controller.mp4")