Publisher Demo (Python)
This tutorial demonstrates how to publish ROS 2 messages from within a
YASMIN state machine using the PublisherState. This is
useful for sending commands to actuators, broadcasting status updates,
or generating test data. Publishing states integrate with other state
machine logic, allowing you to coordinate complex behaviors that
involve both sensing (subscribing) and acting (publishing).
Background
The PublisherState allows states to:
- Publish messages to ROS 2 topics as part of state execution
- Generate message content dynamically from the blackboard
- Send periodic updates or commands as part of a control loop
- Coordinate publishing with other state machine operations
- Maintain state across multiple publish operations
1. Create the PublisherState
Create a custom publisher state that publishes integer messages to the
count topic. The constructor takes three parameters: the
message type (Int32), the topic name
("count"), and a callback function that creates the
message. The create_int_msg method is called each time
the state executes. It retrieves the current counter from the
blackboard using blackboard.get(), increments it, stores
it back with blackboard.set(), and creates an Int32
message with the new value. This demonstrates stateful publishing:
the counter persists across state executions thanks to the blackboard,
allowing you to track progress or maintain sequences.
import yasmin
from yasmin import Blackboard
from yasmin_ros import PublisherState
from std_msgs.msg import Int32


class PublishIntState(PublisherState):
    """
    PublishIntState is a YASMIN ROS publisher state that sends incrementing integers
    to the 'count' topic using std_msgs.msg.Int32 messages.
    This state increments a counter on the blackboard and publishes it.
    """

    def __init__(self):
        """
        Initializes the PublishIntState with the topic 'count' and a message creation callback.
        """
        super().__init__(Int32, "count", self.create_int_msg)

    def create_int_msg(self, blackboard: Blackboard) -> Int32:
        """
        Generates a std_msgs.msg.Int32 message with an incremented counter value.

        Args:
            blackboard (Blackboard): The shared data store between states.

        Returns:
            Int32: A ROS message containing the updated counter.
        """
        # Get and increment the counter from the blackboard
        counter = blackboard.get("counter")
        counter += 1
        blackboard.set("counter", counter)

        # Log the message creation
        yasmin.YASMIN_LOG_INFO(f"Creating message {counter}")

        # Create and return the message
        msg = Int32()
        msg.data = counter
        return msg
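The counter logic in create_int_msg can be tried out without a ROS 2 installation. The following is a minimal sketch under stated assumptions: DictBlackboard and FakeInt32 are hypothetical stand-ins written for this example (they are not part of YASMIN or std_msgs) and mimic only the get()/set() calls and the data field used above.

```python
from dataclasses import dataclass


class DictBlackboard:
    """Hypothetical stand-in for yasmin.Blackboard; only get() and set() are mimicked."""

    def __init__(self) -> None:
        self._data = {}

    def get(self, key):
        return self._data[key]

    def set(self, key, value) -> None:
        self._data[key] = value


@dataclass
class FakeInt32:
    """Hypothetical stand-in for std_msgs.msg.Int32 (a single 'data' field)."""

    data: int = 0


def create_int_msg(blackboard) -> FakeInt32:
    # Same read-increment-write sequence as the tutorial callback
    counter = blackboard.get("counter") + 1
    blackboard.set("counter", counter)
    return FakeInt32(data=counter)


blackboard = DictBlackboard()
blackboard.set("counter", 0)

# Each call sees the value left behind by the previous call
published = [create_int_msg(blackboard).data for _ in range(3)]
print(published)
```

Because the counter lives on the blackboard rather than on the state object, any other state that shares the blackboard can read or reset it.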
2. Create a Check State
Create a callback state to decide when to stop publishing. The
check_count function sleeps for one second to simulate processing,
then retrieves the current counter and the maximum count from the
blackboard and compares them. It returns "outcome1" when the counter
has reached the maximum (signaling completion), or "outcome2" to
continue the publish-check loop. This pattern of alternating between
action (publishing) and evaluation (checking) is common in state
machines for implementing controlled sequences, rate-limited
operations, or conditional loops based on accumulated state.
import time

import yasmin
from yasmin import Blackboard, CbState
from yasmin_ros.basic_outcomes import SUCCEED


def check_count(blackboard: Blackboard) -> str:
    """
    Checks the current counter against a max threshold to determine state transition.

    Args:
        blackboard (Blackboard): The shared data store between states.

    Returns:
        str: The outcome string ('outcome1' or 'outcome2').
    """
    # Simulate processing time
    time.sleep(1)

    # Retrieve the counter and max value from blackboard
    count = blackboard.get("counter")
    max_count = blackboard.get("max_count")
    yasmin.YASMIN_LOG_INFO(f"Checking count: {count}")

    # Determine and return the outcome based on the counter value
    if count >= max_count:
        return "outcome1"  # Done
    else:
        return "outcome2"  # Continue
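The boundary behavior of the comparison can be checked in isolation. This sketch assumes a delay-free variant named check_count_no_delay (a name invented for this example) and passes a plain dict in place of the blackboard, which works here only because the function calls nothing but get().

```python
def check_count_no_delay(blackboard) -> str:
    """Same comparison as check_count, without the one-second sleep or logging."""
    count = blackboard.get("counter")
    max_count = blackboard.get("max_count")
    return "outcome1" if count >= max_count else "outcome2"


# A plain dict stands in for the blackboard (assumption: only get() is needed)
below = check_count_no_delay({"counter": 9, "max_count": 10})
at_limit = check_count_no_delay({"counter": 10, "max_count": 10})
above = check_count_no_delay({"counter": 11, "max_count": 10})

print(below, at_limit, above)
```

Using >= rather than == means the loop still terminates if the counter ever starts above max_count.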
3. Build and Execute State Machine
Define the main function, which initializes ROS 2, creates the state machine with the publishing and checking states, registers an on_shutdown handler that cancels a running state machine, starts the viewer publisher, and executes the machine with error handling and cleanup.
import rclpy

import yasmin
from yasmin import Blackboard, CbState, StateMachine
from yasmin_ros.basic_outcomes import SUCCEED
from yasmin_ros.ros_logs import set_ros_loggers
from yasmin_viewer import YasminViewerPub


def main() -> None:
    yasmin.YASMIN_LOG_INFO("yasmin_publisher_demo")
    rclpy.init()

    # Configure YASMIN to use ROS-based logging
    set_ros_loggers()

    # Create the state machine with 'SUCCEED' as the terminal outcome
    sm = StateMachine([SUCCEED])

    # Ensure the state machine cancels on shutdown
    def on_shutdown():
        if sm.is_running():
            sm.cancel_state()

    rclpy.get_default_context().on_shutdown(on_shutdown)

    # Add the publishing state which loops until the condition is met
    sm.add_state(
        "PUBLISHING_INT",
        PublishIntState(),
        {
            SUCCEED: "CHECKING_COUNTS",
        },
    )

    # Add the conditional check state
    sm.add_state(
        "CHECKING_COUNTS",
        CbState(["outcome1", "outcome2"], check_count),
        {
            "outcome1": SUCCEED,
            "outcome2": "PUBLISHING_INT",
        },
    )

    # Launch YASMIN Viewer publisher for state visualization
    viewer = YasminViewerPub(sm, "YASMIN_PUBLISHER_DEMO")

    # Initialize blackboard with counter values
    blackboard = Blackboard()
    blackboard.set("counter", 0)
    blackboard.set("max_count", 10)

    # Run the state machine and log the outcome
    try:
        outcome = sm(blackboard)
        yasmin.YASMIN_LOG_INFO(outcome)
    except Exception as e:
        yasmin.YASMIN_LOG_INFO(str(e))
    finally:
        viewer.cleanup()
        del sm

        # Shutdown ROS 2 if it's running
        if rclpy.ok():
            rclpy.shutdown()


if __name__ == "__main__":
    main()
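To make the control flow of the two-state loop easier to follow, the sketch below replays the same transition table in plain Python. It is an illustration only and does not use YASMIN: the run helper, the dict-based blackboard, and the local SUCCEED constant are assumptions introduced for this example, and nothing is actually published to ROS 2.

```python
SUCCEED = "succeeded"  # local stand-in for the terminal outcome (assumed value)


def publish(bb: dict, published: list) -> str:
    # Mirrors PUBLISHING_INT: increment the counter and record the "published" value
    bb["counter"] += 1
    published.append(bb["counter"])
    return SUCCEED


def check(bb: dict, published: list) -> str:
    # Mirrors CHECKING_COUNTS, without the one-second delay
    return "outcome1" if bb["counter"] >= bb["max_count"] else "outcome2"


STATES = {"PUBLISHING_INT": publish, "CHECKING_COUNTS": check}

# Same outcome-to-target mapping as the add_state calls in main
TRANSITIONS = {
    "PUBLISHING_INT": {SUCCEED: "CHECKING_COUNTS"},
    "CHECKING_COUNTS": {"outcome1": SUCCEED, "outcome2": "PUBLISHING_INT"},
}


def run(start: str, bb: dict):
    """Follow transitions until the target is no longer a state name."""
    published, visited = [], []
    current = start
    while current in STATES:
        visited.append(current)
        outcome = STATES[current](bb, published)
        current = TRANSITIONS[current][outcome]
    return current, published, visited


final, published, visited = run("PUBLISHING_INT", {"counter": 0, "max_count": 10})
print(final, published, len(visited))
```

With counter starting at 0 and max_count at 10, the loop alternates between the two states ten times each before reaching the terminal outcome.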
4. Run the Demo
Execute the publisher demo to observe the state machine publishing sequential integer messages.
ros2 run yasmin_demos publisher_demo.py
5. Monitor Published Messages
In another terminal, echo the topic to see the published messages:
ros2 topic echo /count
Expected Behavior
- The state machine publishes incrementing integers (1-10) to /count
- After each publish, it checks if the max count is reached
- Once counter >= max_count, the FSM exits with SUCCEED