Nav2 Demo (Hierarchical FSM + ROS 2 Action)

This tutorial demonstrates how to integrate YASMIN with ROS 2 Navigation (Nav2) using a hierarchical state machine. Hierarchical FSMs allow you to nest state machines within states, creating modular and reusable navigation behaviors. This pattern is ideal for complex robot tasks where high-level planning (waypoint selection) is separated from low-level execution (navigation to each point).

⚠️ Prerequisites - Create a ROS 2 Package First:

Before following this tutorial, you need to create a new ROS 2 package to contain your Nav2 state machine code. The nav2_demo source files shown here are not included in the default yasmin_demos package.

Create your package with:

ros2 pkg create --build-type ament_python my_nav2_demo \
  --dependencies yasmin yasmin_ros nav2_msgs geometry_msgs

Then add the source code from this tutorial to your new package.

Overview

The demo creates a two-level hierarchical state machine. The parent state machine handles waypoint management: it creates a dictionary of named waypoint coordinates (entrance, bathroom, livingroom, kitchen, bedroom), randomly selects a subset for the mission, and delegates navigation to a child state machine. The child state machine implements the navigation loop: it retrieves the next waypoint from the list, converts coordinates to a Pose message, sends it to Nav2's NavigateToPose action server, and repeats until all waypoints are visited. This structure demonstrates state machine composition, shared blackboard data between levels, and ROS 2 action integration for real robot navigation.

The workflow consists of:

  1. Create Waypoints: Initialize a dictionary with 5 predefined locations (x, y, orientation_z, orientation_w).
  2. Select Random Subset: Choose N random waypoints from the dictionary based on waypoints_num blackboard parameter.
  3. Navigate (Hierarchical): Enter the nested navigation state machine that loops through selected waypoints, sending each to Nav2's action server.

Running the Demo

After creating your package and adding the source code, build and run:

# Build your package
colcon build --packages-select my_nav2_demo
source install/setup.bash

# Run the demo
ros2 run my_nav2_demo nav2_demo

Note: Make sure you have Nav2 running with a valid map before executing this demo.

Source Code

You can find the full source code in the yasmin_demos package.

import random
import rclpy
from geometry_msgs.msg import Pose
from nav2_msgs.action import NavigateToPose

import yasmin
from yasmin import CbState, Blackboard, StateMachine
from yasmin_ros import ActionState
from yasmin_ros import set_ros_loggers
from yasmin_ros.basic_outcomes import SUCCEED, ABORT, CANCEL
from yasmin_viewer import YasminViewerPub

# Constants for state outcomes
HAS_NEXT = "has_next"  ##< Indicates there are more waypoints
END = "end"  ##< Indicates no more waypoints


class Nav2State(ActionState):
    """
    ActionState for navigating to a specified pose using ROS 2 Navigation.
    """

    def __init__(self) -> None:
        """
        Initializes the Nav2State.

        Calls the parent constructor to set up the action with:
        - Action type: NavigateToPose
        - Action name: /navigate_to_pose
        - Callback for goal creation: create_goal_handler
        - Outcomes: None, since it will use default outcomes (SUCCEED, ABORT, CANCEL)
        """
        super().__init__(
            NavigateToPose,  # action type
            "/navigate_to_pose",  # action name
            self.create_goal_handler,  # callback to create the goal
            None,  # outcomes
            None,  # callback to process the response
        )

    def create_goal_handler(self, blackboard: Blackboard) -> NavigateToPose.Goal:
        """
        Creates a goal for navigation based on the current pose in the blackboard.

        Args:
            blackboard (Blackboard): The blackboard instance holding current state data.

        Returns:
            NavigateToPose.Goal: The constructed goal for the navigation action.
        """
        goal = NavigateToPose.Goal()
        goal.pose.pose = blackboard["pose"]
        goal.pose.header.frame_id = "map"  # Set the reference frame to 'map'
        return goal


def create_waypoints(blackboard: Blackboard) -> str:
    """
    Initializes waypoints in the blackboard for navigation.

    Args:
        blackboard (Blackboard): The blackboard instance to store waypoints.

    Returns:
        str: Outcome indicating success (SUCCEED).
    """
    blackboard["waypoints"] = {
        "entrance": [1.25, 6.30, -0.78, 0.67],
        "bathroom": [4.89, 1.64, 0.0, 1.0],
        "livingroom": [1.55, 4.03, -0.69, 0.72],
        "kitchen": [3.79, 6.77, 0.99, 0.12],
        "bedroom": [7.50, 4.89, 0.76, 0.65],
    }
    return SUCCEED

def take_random_waypoint(blackboard: Blackboard) -> str:
    """
    Selects a random set of waypoints from the available waypoints.

    Args:
        blackboard (Blackboard): The blackboard instance to store random waypoints.

    Returns:
        str: Outcome indicating success (SUCCEED).
    """
    blackboard["random_waypoints"] = random.sample(
        list(blackboard["waypoints"].keys()), blackboard["waypoints_num"]
    )
    return SUCCEED

def get_next_waypoint(blackboard: Blackboard) -> str:
    """
    Retrieves the next waypoint from the list of random waypoints.

    Updates the blackboard with the pose of the next waypoint.

    Args:
        blackboard (Blackboard): The blackboard instance holding current state data.

    Returns:
        str: Outcome indicating whether there is a next waypoint (HAS_NEXT) or if
             navigation is complete (END).
    """
    if not blackboard["random_waypoints"]:
        return END

    wp_name = blackboard["random_waypoints"].pop(0)  # Get the next waypoint name
    wp = blackboard["waypoints"][wp_name]  # Get the waypoint coordinates

    pose = Pose()
    pose.position.x = wp[0]
    pose.position.y = wp[1]
    pose.orientation.z = wp[2]
    pose.orientation.w = wp[3]

    blackboard["pose"] = pose  # Update blackboard with new pose
    blackboard["text"] = f"I have reached waypoint {wp_name}"

    return HAS_NEXT


def main() -> None:
    # Initialize ROS 2
    rclpy.init()

    # Set ROS 2 loggers
    set_ros_loggers()
    yasmin.YASMIN_LOG_INFO("yasmin_nav2_demo")

    # Create state machines
    sm = StateMachine(outcomes=[SUCCEED, ABORT, CANCEL], handle_sigint=True)
    nav_sm = StateMachine(outcomes=[SUCCEED, ABORT, CANCEL])

    # Add states to the state machine
    sm.add_state(
        "CREATING_WAYPOINTS",
        CbState([SUCCEED], create_waypoints),
        transitions={
            SUCCEED: "TAKING_RANDOM_WAYPOINTS",
        },
    )
    sm.add_state(
        "TAKING_RANDOM_WAYPOINTS",
        CbState([SUCCEED], take_random_waypoint),
        transitions={
            SUCCEED: "NAVIGATING",
        },
    )
    nav_sm.add_state(
        "GETTING_NEXT_WAYPOINT",
        CbState([END, HAS_NEXT], get_next_waypoint),
        transitions={
            END: SUCCEED,
            HAS_NEXT: "NAVIGATING",
        },
    )
    nav_sm.add_state(
        "NAVIGATING",
        Nav2State(),
        transitions={
            SUCCEED: "GETTING_NEXT_WAYPOINT",
            CANCEL: CANCEL,
            ABORT: ABORT,
        },
    )
    sm.add_state(
        "NAVIGATING",
        nav_sm,
        transitions={
            SUCCEED: SUCCEED,
            CANCEL: CANCEL,
            ABORT: ABORT,
        },
    )

    # Publish FSM information for visualization
    YasminViewerPub(sm, "YASMIN_NAV2_DEMO")

    # Set the number of waypoints
    blackboard = Blackboard()
    blackboard["waypoints_num"] = 2

    # Execute the FSM
    try:
        outcome = sm(blackboard)
        yasmin.YASMIN_LOG_INFO(outcome)
    except Exception as e:
        yasmin.YASMIN_LOG_WARN(e)

    # Shutdown ROS 2 if it's running
    if rclpy.ok():
        rclpy.shutdown()


if __name__ == "__main__":
    main()