Nav2 Demo (Hierarchical FSM + ROS 2 Action)
This tutorial demonstrates how to integrate YASMIN with ROS 2 Navigation (Nav2) using a hierarchical state machine. Hierarchical FSMs allow you to nest state machines within states, creating modular and reusable navigation behaviors. This pattern is ideal for complex robot tasks where high-level planning (waypoint selection) is separated from low-level execution (navigation to each point).
Before following this tutorial, you need to create a new ROS 2 package to contain your Nav2 state machine code. The nav2_demo source files shown here are not included in the default yasmin_demos package.
Create your package with:
ros2 pkg create --build-type ament_python my_nav2_demo \
--dependencies yasmin yasmin_ros nav2_msgs geometry_msgs
Then add the source code from this tutorial to your new package.
Overview
The demo creates a two-level hierarchical state machine. The parent
state machine handles waypoint management: it creates a dictionary of
named waypoint coordinates (entrance, bathroom, livingroom, kitchen,
bedroom), randomly selects a subset for the mission, and delegates
navigation to a child state machine. The child state machine
implements the navigation loop: it retrieves the next waypoint from
the list, converts coordinates to a Pose message, sends
it to Nav2's NavigateToPose action server, and repeats
until all waypoints are visited. This structure demonstrates state
machine composition, shared blackboard data between levels, and ROS 2
action integration for real robot navigation.
The workflow consists of:
- Create Waypoints: Initialize a dictionary with 5 predefined locations (x, y, orientation_z, orientation_w).
-
Select Random Subset: Choose N random waypoints
from the dictionary based on
waypoints_numblackboard parameter. - Navigate (Hierarchical): Enter the nested navigation state machine that loops through selected waypoints, sending each to Nav2's action server.
Running the Demo
After creating your package and adding the source code, build and run:
# Build your package
colcon build --packages-select my_nav2_demo
source install/setup.bash
# Run the demo
ros2 run my_nav2_demo nav2_demo
Note: Make sure you have Nav2 running with a valid map before executing this demo.
Source Code
You can find the full source code in the yasmin_demos package.
import random
import rclpy
from geometry_msgs.msg import Pose
from nav2_msgs.action import NavigateToPose
import yasmin
from yasmin import CbState, Blackboard, StateMachine
from yasmin_ros import ActionState
from yasmin_ros import set_ros_loggers
from yasmin_ros.basic_outcomes import SUCCEED, ABORT, CANCEL
from yasmin_viewer import YasminViewerPub
# Constants for state outcomes
HAS_NEXT = "has_next" ##< Indicates there are more waypoints
END = "end" ##< Indicates no more waypoints
class Nav2State(ActionState):
"""
ActionState for navigating to a specified pose using ROS 2 Navigation.
"""
def __init__(self) -> None:
"""
Initializes the Nav2State.
Calls the parent constructor to set up the action with:
- Action type: NavigateToPose
- Action name: /navigate_to_pose
- Callback for goal creation: create_goal_handler
- Outcomes: None, since it will use default outcomes (SUCCEED, ABORT, CANCEL)
"""
super().__init__(
NavigateToPose, # action type
"/navigate_to_pose", # action name
self.create_goal_handler, # callback to create the goal
None, # outcomes
None, # callback to process the response
)
def create_goal_handler(self, blackboard: Blackboard) -> NavigateToPose.Goal:
"""
Creates a goal for navigation based on the current pose in the blackboard.
Args:
blackboard (Blackboard): The blackboard instance holding current state data.
Returns:
NavigateToPose.Goal: The constructed goal for the navigation action.
"""
goal = NavigateToPose.Goal()
goal.pose.pose = blackboard["pose"]
goal.pose.header.frame_id = "map" # Set the reference frame to 'map'
return goal
def create_waypoints(blackboard: Blackboard) -> str:
"""
Initializes waypoints in the blackboard for navigation.
Args:
blackboard (Blackboard): The blackboard instance to store waypoints.
Returns:
str: Outcome indicating success (SUCCEED).
"""
blackboard["waypoints"] = {
"entrance": [1.25, 6.30, -0.78, 0.67],
"bathroom": [4.89, 1.64, 0.0, 1.0],
"livingroom": [1.55, 4.03, -0.69, 0.72],
"kitchen": [3.79, 6.77, 0.99, 0.12],
"bedroom": [7.50, 4.89, 0.76, 0.65],
}
return SUCCEED
def take_random_waypoint(blackboard: Blackboard) -> str:
"""
Selects a random set of waypoints from the available waypoints.
Args:
blackboard (Blackboard): The blackboard instance to store random waypoints.
Returns:
str: Outcome indicating success (SUCCEED).
"""
blackboard["random_waypoints"] = random.sample(
list(blackboard["waypoints"].keys()), blackboard["waypoints_num"]
)
return SUCCEED
def get_next_waypoint(blackboard: Blackboard) -> str:
"""
Retrieves the next waypoint from the list of random waypoints.
Updates the blackboard with the pose of the next waypoint.
Args:
blackboard (Blackboard): The blackboard instance holding current state data.
Returns:
str: Outcome indicating whether there is a next waypoint (HAS_NEXT) or if
navigation is complete (END).
"""
if not blackboard["random_waypoints"]:
return END
wp_name = blackboard["random_waypoints"].pop(0) # Get the next waypoint name
wp = blackboard["waypoints"][wp_name] # Get the waypoint coordinates
pose = Pose()
pose.position.x = wp[0]
pose.position.y = wp[1]
pose.orientation.z = wp[2]
pose.orientation.w = wp[3]
blackboard["pose"] = pose # Update blackboard with new pose
blackboard["text"] = f"I have reached waypoint {wp_name}"
return HAS_NEXT
def main() -> None:
# Initialize ROS 2
rclpy.init()
# Set ROS 2 loggers
set_ros_loggers()
yasmin.YASMIN_LOG_INFO("yasmin_nav2_demo")
# Create state machines
sm = StateMachine(outcomes=[SUCCEED, ABORT, CANCEL], handle_sigint=True)
nav_sm = StateMachine(outcomes=[SUCCEED, ABORT, CANCEL])
# Add states to the state machine
sm.add_state(
"CREATING_WAYPOINTS",
CbState([SUCCEED], create_waypoints),
transitions={
SUCCEED: "TAKING_RANDOM_WAYPOINTS",
},
)
sm.add_state(
"TAKING_RANDOM_WAYPOINTS",
CbState([SUCCEED], take_random_waypoint),
transitions={
SUCCEED: "NAVIGATING",
},
)
nav_sm.add_state(
"GETTING_NEXT_WAYPOINT",
CbState([END, HAS_NEXT], get_next_waypoint),
transitions={
END: SUCCEED,
HAS_NEXT: "NAVIGATING",
},
)
nav_sm.add_state(
"NAVIGATING",
Nav2State(),
transitions={
SUCCEED: "GETTING_NEXT_WAYPOINT",
CANCEL: CANCEL,
ABORT: ABORT,
},
)
sm.add_state(
"NAVIGATING",
nav_sm,
transitions={
SUCCEED: SUCCEED,
CANCEL: CANCEL,
ABORT: ABORT,
},
)
# Publish FSM information for visualization
YasminViewerPub(sm, "YASMIN_NAV2_DEMO")
# Set the number of waypoints
blackboard = Blackboard()
blackboard["waypoints_num"] = 2
# Execute the FSM
try:
outcome = sm(blackboard)
yasmin.YASMIN_LOG_INFO(outcome)
except Exception as e:
yasmin.YASMIN_LOG_WARN(e)
# Shutdown ROS 2 if it's running
if rclpy.ok():
rclpy.shutdown()
if __name__ == "__main__":
main()
YASMIN