Writing a Simple FSM (Python)

This tutorial shows how to create a simple Finite State Machine (FSM) using YASMIN in Python. We will create two custom states, FooState and BarState, and transition between them. This example demonstrates the fundamental concepts of state machines: defining states with specific behaviors, managing state transitions, and sharing data between states using the blackboard. Python's syntax makes it easy to create readable and maintainable state machines.

1. Create the FooState

First, we define the FooState class inheriting from yasmin.State. This state will increment a counter and switch outcomes based on the counter value. The constructor initializes two possible outcomes: "outcome1" (to continue to BarState) and "outcome2" (to finish execution). In the execute() method, we check if the counter is less than 3 - if so, we increment it, store a message in the blackboard, and return "outcome1" to transition to BarState. After 3 executions, we return "outcome2" to end the state machine.

class FooState(State):
    """
    Represents the Foo state in the state machine.

    Attributes:
        counter (int): Counter to track the number of executions of this state.
    """

    def __init__(self) -> None:
        """
        Initializes the FooState instance, setting up the outcomes.

        Outcomes:
            outcome1: Indicates the state should continue to the Bar state.
            outcome2: Indicates the state should finish execution and return.
        """
        super().__init__(["outcome1", "outcome2"])
        self.counter = 0

    def execute(self, blackboard: Blackboard) -> str:
        """
        Executes the logic for the Foo state.

        Args:
            blackboard (Blackboard): The shared data structure for states.

        Returns:
            str: The outcome of the execution, which can be "outcome1" or "outcome2".

        Raises:
            Exception: May raise exceptions related to state execution.
        """
        yasmin.YASMIN_LOG_INFO("Executing state FOO")
        time.sleep(3)

        if self.counter < 3:
            self.counter += 1
            blackboard["foo_str"] = f"Counter: {self.counter}"
            return "outcome1"
        else:
            return "outcome2"

2. Create the BarState

Next, we define the BarState, which serves as an intermediary state in our FSM. This state has only one possible outcome: "outcome3", which will transition back to FooState, creating a loop. In the execute() method, we retrieve the string that FooState stored in the blackboard (using dictionary-style access with blackboard["foo_str"]) and log it using YASMIN's logging system. This demonstrates inter-state communication through the blackboard - a shared data structure that allows states to pass information to each other. The time.sleep(3) call simulates processing time.

class BarState(State):
    """
    Represents the Bar state in the state machine.
    """

    def __init__(self) -> None:
        """
        Initializes the BarState instance, setting up the outcome.

        Outcomes:
            outcome3: Indicates the state should transition back to the Foo state.
        """
        super().__init__(outcomes=["outcome3"])

    def execute(self, blackboard: Blackboard) -> str:
        """
        Executes the logic for the Bar state.

        Args:
            blackboard (Blackboard): The shared data structure for states.

        Returns:
            str: The outcome of the execution, which will always be "outcome3".

        Raises:
            Exception: May raise exceptions related to state execution.
        """
        yasmin.YASMIN_LOG_INFO("Executing state BAR")
        time.sleep(3)

        yasmin.YASMIN_LOG_INFO(blackboard["foo_str"])
        return "outcome3"

3. Main Function

In the main function, we initialize ROS 2 and construct the complete state machine. First, rclpy.init() initializes the ROS 2 Python client library, and set_ros_loggers() configures YASMIN to use ROS logging. We create a StateMachine with "outcome4" as the terminal outcome that ends execution. Using the add_state() method, we register both FooState and BarState, defining a dictionary of transitions for each: FooState can transition to BarState ("outcome1") or terminate ("outcome2"), while BarState always loops back to FooState ("outcome3"). The YasminViewerPub publishes state machine data for real-time visualization in a web interface. Finally, we execute the state machine by calling sm(), handle keyboard interrupts gracefully, and shut down ROS 2.

def main() -> None:
    yasmin.YASMIN_LOG_INFO("yasmin_demo")
    rclpy.init()
    set_ros_loggers()

    # Create a finite state machine (FSM)
    sm = StateMachine(outcomes=["outcome4"])

    # Add states to the FSM
    sm.add_state(
        "FOO",
        FooState(),
        transitions={
            "outcome1": "BAR",
            "outcome2": "outcome4",
        },
    )
    sm.add_state(
        "BAR",
        BarState(),
        transitions={
            "outcome3": "FOO",
        },
    )

    # Publish FSM information for visualization
    viewer = YasminViewerPub(sm, "YASMIN_DEMO")

    # Execute the FSM
    try:
        outcome = sm()
        yasmin.YASMIN_LOG_INFO(outcome)
    except KeyboardInterrupt:
        if sm.is_running():
            sm.cancel_state()
    finally:
        viewer.cleanup()
        del sm

        # Shutdown ROS 2 if it's running
        if rclpy.ok():
            rclpy.shutdown()


if __name__ == "__main__":
    main()

4. Run the Demo

Execute the Python script to see the state machine transition between FOO and BAR states:

ros2 run yasmin_demos yasmin_demo.py