Monitor Demo (Python)

This tutorial demonstrates how to monitor ROS 2 topics using YASMIN's MonitorState. This state subscribes to a topic and processes incoming messages, allowing your state machine to react to sensor data or other published information. The MonitorState is particularly valuable in robotics applications where you need to wait for specific sensor readings, check for environmental conditions, or synchronize with external events before proceeding with other tasks.

Background

The MonitorState is useful when you need to:

1. Create the MonitorState

Create a custom monitor state that subscribes to the /odom topic. The constructor takes seven parameters: the message type (Odometry), the topic name ("odom"), a list of custom outcomes, the monitor handler callback, QoS profile for subscription (we use qos_profile_sensor_data for reliable sensor data), message queue size, and timeout in seconds. The monitor_handler method is invoked for each received message. It logs the entire odometry message, decrements a counter, and returns "outcome2" when the target number of messages have been processed, or "outcome1" to continue monitoring. If no message arrives within the timeout period, the state automatically returns the TIMEOUT outcome.

from yasmin_ros import MonitorState
from yasmin_ros.basic_outcomes import TIMEOUT, CANCEL
from nav_msgs.msg import Odometry
from rclpy.qos import qos_profile_sensor_data

class PrintOdometryState(MonitorState):
    """
    MonitorState subclass to handle Odometry messages.

    This state monitors Odometry messages from the specified ROS topic,
    logging them and transitioning based on the number of messages received.

    Attributes:
        times (int): The number of messages to monitor before transitioning
                     to the next outcome.

    Args:
        times (int): The initial count of how many Odometry messages to
                     process before changing state.

    Methods:
        monitor_handler(blackboard: Blackboard, msg: Odometry) -> str:
            Handles incoming Odometry messages, logging the message and
            returning the appropriate outcome based on the remaining count.
    """

    def __init__(self, times: int) -> None:
        """
        Initializes the PrintOdometryState.

        Args:
            times (int): The number of Odometry messages to monitor before
                         transitioning to the next outcome.
        """
        super().__init__(
            Odometry,  # msg type
            "odom",  # topic name
            ["outcome1", "outcome2"],  # outcomes
            self.monitor_handler,  # monitor handler callback
            qos=qos_profile_sensor_data,  # qos for the topic subscription
            msg_queue=10,  # queue for the monitor handler callback
            timeout=10,  # timeout to wait for messages in seconds
        )
        self.times = times

    def monitor_handler(self, blackboard: Blackboard, msg: Odometry) -> str:
        """
        Handles incoming Odometry messages.

        This method is called whenever a new Odometry message is received.
        It logs the message, decrements the count of messages to process,
        and determines the next state outcome.

        Args:
            blackboard (Blackboard): The shared data storage for states.
            msg (Odometry): The incoming Odometry message.

        Returns:
            str: The next state outcome, either "outcome1" to continue
                 monitoring or "outcome2" to transition to the next state.

        Exceptions:
            None
        """
        yasmin.YASMIN_LOG_INFO(msg)

        self.times -= 1

        if self.times <= 0:
            return "outcome2"

        return "outcome1"

2. Build the State Machine

Initialize ROS 2 and configure the state machine with the monitor state, defining transitions for all possible outcomes. The state loops back to itself with outcome1 to continue monitoring messages, exits with outcome2 when the target number of messages is reached, and handles error conditions like TIMEOUT (no messages received within the timeout period) and CANCEL (manual cancellation) by transitioning to the terminal outcome outcome4. This comprehensive transition mapping ensures graceful handling of all execution paths - successful completion, timeout scenarios, and user interruption.

rclpy.init()
set_ros_loggers()

sm = StateMachine(outcomes=["outcome4"])

sm.add_state(
    "PRINTING_ODOM",
    PrintOdometryState(5),  # Monitor 5 messages
    transitions={
        "outcome1": "PRINTING_ODOM",  # Loop to continue monitoring
        "outcome2": "outcome4",  # Exit when done
        TIMEOUT: "outcome4",  # Exit on timeout
        CANCEL: "outcome4",  # Exit on cancel
    },
)

YasminViewerPub(sm, "YASMIN_MONITOR_DEMO")

3. Execute

Execute the state machine with proper exception handling to manage keyboard interrupts and ensure clean shutdown.

    # Publish FSM information
    viewer = YasminViewerPub(sm, "YASMIN_MONITOR_DEMO")

    # Execute FSM
    try:
        outcome = sm()
        yasmin.YASMIN_LOG_INFO(outcome)
    except KeyboardInterrupt:
        if sm.is_running():
            sm.cancel_state()
    finally:
        viewer.cleanup()
        del sm

        # Shutdown ROS 2 if it's running
        if rclpy.ok():
            rclpy.shutdown()


if __name__ == "__main__":
    main()

4. Run the Demo

Ensure you have a node publishing odometry data (e.g., from a robot simulator or real robot), then run:

ros2 run yasmin_demos monitor_demo.py

Expected Behavior

Key Parameters

Parameter Description
qos Quality of Service profile for the subscription
msg_queue Number of messages to buffer
timeout Seconds to wait before returning TIMEOUT