Monitor Demo (Python)
This tutorial demonstrates how to monitor ROS 2 topics using YASMIN's
MonitorState. This state subscribes to a topic and
processes incoming messages, allowing your state machine to react to
sensor data or other published information. The MonitorState is
particularly valuable in robotics applications where you need to wait
for specific sensor readings, check for environmental conditions, or
synchronize with external events before proceeding with other tasks.
Background
The MonitorState is useful when you need to:
- Wait for a specific message to arrive on a topic
- Process a certain number of messages before transitioning
- React to changing data from sensors or other ROS nodes
- Implement timeout behavior if no messages arrive within a time limit
- Monitor robot state (odometry, battery, sensors) for decision making
1. Create the MonitorState
Create a custom monitor state that subscribes to the
/odom topic. The constructor takes seven parameters: the
message type (Odometry), the topic name
("odom"), a list of custom outcomes, the monitor handler
callback, QoS profile for subscription (we use
qos_profile_sensor_data for reliable sensor data),
message queue size, and timeout in seconds. The
monitor_handler method is invoked for each received
message. It logs the entire odometry message, decrements a counter,
and returns "outcome2" when the target number of messages
have been processed, or "outcome1" to continue
monitoring. If no message arrives within the timeout period, the state
automatically returns the TIMEOUT outcome.
from yasmin_ros import MonitorState
from yasmin_ros.basic_outcomes import TIMEOUT, CANCEL
from nav_msgs.msg import Odometry
from rclpy.qos import qos_profile_sensor_data
class PrintOdometryState(MonitorState):
"""
MonitorState subclass to handle Odometry messages.
This state monitors Odometry messages from the specified ROS topic,
logging them and transitioning based on the number of messages received.
Attributes:
times (int): The number of messages to monitor before transitioning
to the next outcome.
Args:
times (int): The initial count of how many Odometry messages to
process before changing state.
Methods:
monitor_handler(blackboard: Blackboard, msg: Odometry) -> str:
Handles incoming Odometry messages, logging the message and
returning the appropriate outcome based on the remaining count.
"""
def __init__(self, times: int) -> None:
"""
Initializes the PrintOdometryState.
Args:
times (int): The number of Odometry messages to monitor before
transitioning to the next outcome.
"""
super().__init__(
Odometry, # msg type
"odom", # topic name
["outcome1", "outcome2"], # outcomes
self.monitor_handler, # monitor handler callback
qos=qos_profile_sensor_data, # qos for the topic subscription
msg_queue=10, # queue for the monitor handler callback
timeout=10, # timeout to wait for messages in seconds
)
self.times = times
def monitor_handler(self, blackboard: Blackboard, msg: Odometry) -> str:
"""
Handles incoming Odometry messages.
This method is called whenever a new Odometry message is received.
It logs the message, decrements the count of messages to process,
and determines the next state outcome.
Args:
blackboard (Blackboard): The shared data storage for states.
msg (Odometry): The incoming Odometry message.
Returns:
str: The next state outcome, either "outcome1" to continue
monitoring or "outcome2" to transition to the next state.
Exceptions:
None
"""
yasmin.YASMIN_LOG_INFO(msg)
self.times -= 1
if self.times <= 0:
return "outcome2"
return "outcome1"
2. Build the State Machine
Initialize ROS 2 and configure the state machine with the monitor
state, defining transitions for all possible outcomes. The state loops
back to itself with outcome1 to continue monitoring
messages, exits with outcome2 when the target number of
messages is reached, and handles error conditions like
TIMEOUT (no messages received within the timeout period)
and CANCEL (manual cancellation) by transitioning to the
terminal outcome outcome4. This comprehensive transition
mapping ensures graceful handling of all execution paths - successful
completion, timeout scenarios, and user interruption.
rclpy.init()
set_ros_loggers()
sm = StateMachine(outcomes=["outcome4"])
sm.add_state(
"PRINTING_ODOM",
PrintOdometryState(5), # Monitor 5 messages
transitions={
"outcome1": "PRINTING_ODOM", # Loop to continue monitoring
"outcome2": "outcome4", # Exit when done
TIMEOUT: "outcome4", # Exit on timeout
CANCEL: "outcome4", # Exit on cancel
},
)
YasminViewerPub(sm, "YASMIN_MONITOR_DEMO")
3. Execute
Execute the state machine with proper exception handling to manage keyboard interrupts and ensure clean shutdown.
# Publish FSM information
viewer = YasminViewerPub(sm, "YASMIN_MONITOR_DEMO")
# Execute FSM
try:
outcome = sm()
yasmin.YASMIN_LOG_INFO(outcome)
except KeyboardInterrupt:
if sm.is_running():
sm.cancel_state()
finally:
viewer.cleanup()
del sm
# Shutdown ROS 2 if it's running
if rclpy.ok():
rclpy.shutdown()
if __name__ == "__main__":
main()
4. Run the Demo
Ensure you have a node publishing odometry data (e.g., from a robot simulator or real robot), then run:
ros2 run yasmin_demos monitor_demo.py
Expected Behavior
-
The state will wait for messages on the
/odomtopic - Each message is logged and the counter decrements
- After 5 messages, the state returns "outcome2" and the FSM exits
- If no messages arrive within 10 seconds, the state returns TIMEOUT
Key Parameters
| Parameter | Description |
|---|---|
qos |
Quality of Service profile for the subscription |
msg_queue |
Number of messages to buffer |
timeout |
Seconds to wait before returning TIMEOUT |
YASMIN