Publisher Demo (Python)

This tutorial demonstrates how to publish ROS 2 messages from within a YASMIN state machine using the PublisherState. This is useful for sending commands to actuators, broadcasting status updates, or generating test data. Publishing states integrate with other state machine logic, allowing you to coordinate complex behaviors that involve both sensing (subscribing) and acting (publishing).

Background

The PublisherState allows states to:

1. Create the PublisherState

Create a custom publisher state that publishes integer messages to the count topic. The constructor takes three parameters: the message type (Int32), the topic name ("count"), and a callback function that creates the message. The create_int_msg method is called each time the state executes. It retrieves the current counter from the blackboard using blackboard.get(), increments it, stores it back with blackboard.set(), and returns an Int32 message with the new value. This demonstrates stateful publishing: the counter persists across state executions because it lives on the blackboard, so you can track progress or maintain sequences. The "counter" key must already be set on the blackboard before the state first runs; the main function in step 3 initializes it to 0.

import yasmin
from yasmin import Blackboard
from yasmin_ros import PublisherState
from std_msgs.msg import Int32

class PublishIntState(PublisherState):
    """
    PublishIntState is a YASMIN ROS publisher state that sends incrementing integers
    to the 'count' topic using std_msgs.msg.Int32 messages.

    This state increments a counter on the blackboard and publishes it.
    """

    def __init__(self):
        """
        Initializes the PublishIntState with the topic 'count' and a message creation callback.
        """
        super().__init__(Int32, "count", self.create_int_msg)

    def create_int_msg(self, blackboard: Blackboard) -> Int32:
        """
        Generates a std_msgs.msg.Int32 message with an incremented counter value.

        Args:
            blackboard (Blackboard): The shared data store between states.

        Returns:
            Int32: A ROS message containing the updated counter.
        """
        # Get and increment the counter from the blackboard
        counter = blackboard.get("counter")
        counter += 1
        blackboard.set("counter", counter)

        # Log the message creation
        yasmin.YASMIN_LOG_INFO(f"Creating message {counter}")

        # Create and return the message
        msg = Int32()
        msg.data = counter
        return msg
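
The read-increment-write sequence in create_int_msg can be exercised without a ROS 2 installation. The following is a minimal sketch, not part of the demo: FakeBlackboard and FakeInt32 are hypothetical stand-ins that mimic only the get/set calls and the single data field used above.

```python
from dataclasses import dataclass


class FakeBlackboard:
    """Hypothetical stand-in exposing only the get/set calls used in the tutorial."""

    def __init__(self) -> None:
        self._data = {}

    def get(self, key):
        return self._data[key]

    def set(self, key, value) -> None:
        self._data[key] = value


@dataclass
class FakeInt32:
    """Hypothetical stand-in for std_msgs.msg.Int32 (a single 'data' field)."""

    data: int = 0


def create_int_msg(blackboard) -> FakeInt32:
    # Same read-increment-write sequence as PublishIntState.create_int_msg
    counter = blackboard.get("counter") + 1
    blackboard.set("counter", counter)
    return FakeInt32(data=counter)


bb = FakeBlackboard()
bb.set("counter", 0)  # the key must exist before the first call
values = [create_int_msg(bb).data for _ in range(3)]
print(values)
```

Each call sees the value left by the previous one, which is the persistence the blackboard provides between state executions.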

2. Create a Check State

Create a callback state to decide when to stop publishing. The check_count function sleeps for one second to simulate processing, then reads the current counter and the maximum count from the blackboard and compares them. It returns "outcome1" once the counter has reached the maximum (signaling completion), or "outcome2" to continue the publish-check loop. This pattern of alternating between action (publishing) and evaluation (checking) is common in state machines for implementing controlled sequences, rate-limited operations, or conditional loops based on accumulated state.

import time

import yasmin
from yasmin import Blackboard, CbState
from yasmin_ros.basic_outcomes import SUCCEED

def check_count(blackboard: Blackboard) -> str:
    """
    Checks the current counter against a max threshold to determine state transition.

    Args:
        blackboard (Blackboard): The shared data store between states.

    Returns:
        str: The outcome string ('outcome1' or 'outcome2').
    """
    # Simulate processing time
    time.sleep(1)

    # Retrieve the counter and max value from blackboard
    count = blackboard.get("counter")
    max_count = blackboard.get("max_count")

    yasmin.YASMIN_LOG_INFO(f"Checking count: {count}")

    # Determine and return the outcome based on the counter value
    if count >= max_count:
        return "outcome1"  # Done
    else:
        return "outcome2"  # Continue
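
The comparison at the end of check_count can be isolated for quick testing. The sketch below assumes a hypothetical helper named decide, which is not part of the demo; it drops the sleep and the blackboard access so only the branch remains.

```python
def decide(count: int, max_count: int) -> str:
    """Hypothetical helper: the comparison from check_count, nothing else."""
    return "outcome1" if count >= max_count else "outcome2"


# Probe the boundary around max_count = 10
results = {count: decide(count, 10) for count in (9, 10, 11)}
print(results)
```

Because the test is >= rather than ==, the loop would still terminate if the counter ever overshot the maximum.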

3. Build and Execute State Machine

Define the main function that initializes ROS 2, creates the state machine with the publishing and checking states, registers an on_shutdown handler that cancels the state machine if it is still running, starts the viewer publisher, and executes the state machine with error handling and cleanup.

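import rclpy
import yasmin
from yasmin import Blackboard, StateMachine
from yasmin_ros import set_ros_loggers
from yasmin_viewer import YasminViewerPub


def main() -> None: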
    yasmin.YASMIN_LOG_INFO("yasmin_publisher_demo")
    rclpy.init()

    # Configure YASMIN to use ROS-based logging
    set_ros_loggers()

    # Create the state machine with 'SUCCEED' as the terminal outcome
    sm = StateMachine([SUCCEED])

    # Ensure the state machine cancels on shutdown
    def on_shutdown():
        if sm.is_running():
            sm.cancel_state()

    rclpy.get_default_context().on_shutdown(on_shutdown)

    # Add the publishing state which loops until the condition is met
    sm.add_state(
        "PUBLISHING_INT",
        PublishIntState(),
        {
            SUCCEED: "CHECKING_COUNTS",
        },
    )

    # Add the conditional check state
    sm.add_state(
        "CHECKING_COUNTS",
        CbState(["outcome1", "outcome2"], check_count),
        {
            "outcome1": SUCCEED,
            "outcome2": "PUBLISHING_INT",
        },
    )

    # Launch YASMIN Viewer publisher for state visualization
    viewer = YasminViewerPub(sm, "YASMIN_PUBLISHER_DEMO")

    # Initialize blackboard with counter values
    blackboard = Blackboard()
    blackboard.set("counter", 0)
    blackboard.set("max_count", 10)

    # Run the state machine and log the outcome
    try:
        outcome = sm(blackboard)
        yasmin.YASMIN_LOG_INFO(outcome)
    except Exception as e:
        # Report failures at error level so they stand out from normal output
        yasmin.YASMIN_LOG_ERROR(str(e))
    finally:
        viewer.cleanup()
        del sm

        # Shutdown ROS 2 if it's running
        if rclpy.ok():
            rclpy.shutdown()


if __name__ == "__main__":
    main()
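
The transition table built by the two add_state calls can be traced in plain Python to see which values the loop would produce. This is a rough sketch under stated assumptions, not YASMIN code: the STATES dictionary, the run function, and the plain dict used as a blackboard are all hypothetical, execution is assumed to start at PUBLISHING_INT (the first state added), and the string assigned to SUCCEED is a local stand-in for the constant imported from yasmin_ros.basic_outcomes.

```python
SUCCEED = "succeeded"  # local stand-in for yasmin_ros.basic_outcomes.SUCCEED


def publishing_int(bb: dict, published: list) -> str:
    # Mirrors PublishIntState: increment the counter and "publish" it
    bb["counter"] += 1
    published.append(bb["counter"])
    return SUCCEED


def checking_counts(bb: dict, published: list) -> str:
    # Mirrors check_count without the one-second sleep
    return "outcome1" if bb["counter"] >= bb["max_count"] else "outcome2"


# State name -> (callable, outcome-to-next-state map), as in the add_state calls
STATES = {
    "PUBLISHING_INT": (publishing_int, {SUCCEED: "CHECKING_COUNTS"}),
    "CHECKING_COUNTS": (
        checking_counts,
        {"outcome1": SUCCEED, "outcome2": "PUBLISHING_INT"},
    ),
}


def run(bb: dict, start: str = "PUBLISHING_INT"):
    published = []
    current = start
    # Follow transitions until the target is no longer a state name
    while current in STATES:
        execute, transitions = STATES[current]
        current = transitions[execute(bb, published)]
    return current, published


outcome, published = run({"counter": 0, "max_count": 10})
print(outcome, published)
```

With the blackboard values from main (counter 0, max_count 10), the trace publishes the integers 1 through 10 and then ends on the terminal outcome. In the real demo the one-second sleep in check_count would space the messages roughly a second apart.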

4. Run the Demo

Execute the publisher demo to observe the state machine publishing sequential integer messages.

ros2 run yasmin_demos publisher_demo.py

5. Monitor Published Messages

In another terminal, echo the topic to see the published messages:

ros2 topic echo /count

Expected Behavior