Monitor Demo (Python)

This tutorial demonstrates how to monitor ROS 2 topics using YASMIN's MonitorState. This state subscribes to a topic and processes incoming messages, allowing your state machine to react to sensor data or other published information. The MonitorState is particularly valuable in robotics applications where you need to wait for specific sensor readings, check for environmental conditions, or synchronize with external events before proceeding with other tasks.

Background

The MonitorState is useful when you need to:

1. Create the MonitorState

Create a custom monitor state that subscribes to the /odom topic. The constructor takes seven parameters: the message type (Odometry), the topic name ("odom"), a list of custom outcomes, the monitor handler callback, QoS profile for subscription (we use qos_profile_sensor_data for reliable sensor data), message queue size, and timeout in seconds. The monitor_handler method is invoked for each received message. It logs the entire odometry message, decrements a counter, and returns "outcome2" when the target number of messages have been processed, or "outcome1" to continue monitoring. If no message arrives within the timeout period, the state automatically returns the TIMEOUT outcome.

from yasmin_ros import MonitorState
from yasmin_ros.basic_outcomes import TIMEOUT, CANCEL
from nav_msgs.msg import Odometry
from rclpy.qos import qos_profile_sensor_data

class PrintOdometryState(MonitorState):
    """
    MonitorState subclass to handle Odometry messages.

    This state monitors Odometry messages from the specified ROS topic,
    logging them and transitioning based on the number of messages received.
    """

    def __init__(self) -> None:
        """
        Initializes the PrintOdometryState.
        """
        super().__init__(
            Odometry,  # msg type
            "odom",  # topic name
            ["outcome1", "outcome2"],  # outcomes
            self.monitor_handler,  # monitor handler callback
            qos=qos_profile_sensor_data,  # qos for the topic subscription
            msg_queue=10,  # queue for the monitor handler callback
            timeout=10,  # timeout to wait for messages in seconds
        )
        self.times = 5

    def monitor_handler(self, blackboard: Blackboard, msg: Odometry) -> str:
        """
        Handles incoming Odometry messages.

        This method is called whenever a new Odometry message is received.
        It logs the message, decrements the count of messages to process,
        and determines the next state outcome.

        Args:
            blackboard (Blackboard): The shared data storage for states.
            msg (Odometry): The incoming Odometry message.

        Returns:
            str: The next state outcome, either "outcome1" to continue
                 monitoring or "outcome2" to transition to the next state.

        Exceptions:
            None
        """
        yasmin.YASMIN_LOG_INFO(msg)

        self.times -= 1

        if self.times <= 0:
            return "outcome2"

        return "outcome1"

2. Build the State Machine

Initialize ROS 2 and configure the state machine with the monitor state, defining transitions for all possible outcomes. The state loops back to itself with outcome1 to continue monitoring messages, exits with outcome2 when the target number of messages is reached, and handles error conditions like TIMEOUT (no messages received within the timeout period) and CANCEL (manual cancellation) by transitioning to the terminal outcome outcome4. This comprehensive transition mapping ensures graceful handling of all execution paths - successful completion, timeout scenarios, and user interruption.

def main() -> None:
    # Initialize ROS 2
    rclpy.init()

    # Set ROS 2 logs
    set_ros_loggers()
    yasmin.YASMIN_LOG_INFO("yasmin_monitor_demo")

    # Create a finite state machine (FSM)
    sm = StateMachine(outcomes=["outcome4"], handle_sigint=True)

    # Add states to the FSM
    sm.add_state(
        "PRINTING_ODOM",
        PrintOdometryState(),
        transitions={
            "outcome1": "PRINTING_ODOM",
            "outcome2": "outcome4",
            TIMEOUT: "outcome4",
            CANCEL: "outcome4",
        },
    )

    # Publish FSM information for visualization
    YasminViewerPub(sm, "YASMIN_MONITOR_DEMO")

3. Execute

Execute the state machine with proper exception handling to manage keyboard interrupts and ensure clean shutdown.

    # Publish FSM information for visualization
    YasminViewerPub(sm, "YASMIN_MONITOR_DEMO")

    # Execute the FSM
    try:
        outcome = sm()
        yasmin.YASMIN_LOG_INFO(outcome)
    except Exception as e:
        yasmin.YASMIN_LOG_WARN(e)

    # Shutdown ROS 2 if it's running
    if rclpy.ok():
        rclpy.shutdown()


if __name__ == "__main__":
    main()

4. Run the Demo

Ensure you have a node publishing odometry data (e.g., from a robot simulator or real robot), then run:

ros2 run yasmin_demos monitor_demo.py

Expected Behavior

Key Parameters

Parameter Description
qos Quality of Service profile for the subscription
msg_queue Number of messages to buffer
timeout Seconds to wait before returning TIMEOUT