Writing a Simple FSM (Python)
This tutorial shows how to create a simple Finite State Machine (FSM)
using YASMIN in Python. We will create two custom states,
FooState and BarState, and transition
between them. This example demonstrates the fundamental concepts of
state machines: defining states with specific behaviors, managing
state transitions, and sharing data between states using the
blackboard. Python's syntax makes it easy to create readable and
maintainable state machines.
1. Create the FooState
First, we define the FooState class inheriting from
yasmin.State. This state will increment a counter and
switch outcomes based on the counter value. The constructor
initializes two possible outcomes: "outcome1" (to
continue to BarState) and "outcome2" (to finish
execution). In the execute() method, we check if the
counter is less than 3 - if so, we increment it, store a message in
the blackboard, and return "outcome1" to transition to
BarState. After 3 executions, we return "outcome2" to end
the state machine.
class FooState(State):
"""
Represents the Foo state in the state machine.
Attributes:
counter (int): Counter to track the number of executions of this state.
"""
def __init__(self) -> None:
"""
Initializes the FooState instance, setting up the outcomes.
Outcomes:
outcome1: Indicates the state should continue to the Bar state.
outcome2: Indicates the state should finish execution and return.
"""
super().__init__(["outcome1", "outcome2"])
self.counter = 0
def execute(self, blackboard: Blackboard) -> str:
"""
Executes the logic for the Foo state.
Args:
blackboard (Blackboard): The shared data structure for states.
Returns:
str: The outcome of the execution, which can be "outcome1" or "outcome2".
Raises:
Exception: May raise exceptions related to state execution.
"""
yasmin.YASMIN_LOG_INFO("Executing state FOO")
time.sleep(3)
if self.counter < 3:
self.counter += 1
blackboard["foo_str"] = f"Counter: {self.counter}"
return "outcome1"
else:
return "outcome2"
2. Create the BarState
Next, we define the BarState, which serves as an
intermediary state in our FSM. This state has only one possible
outcome: "outcome3", which will transition back to
FooState, creating a loop. In the execute() method, we
retrieve the string that FooState stored in the blackboard (using
dictionary-style access with blackboard["foo_str"]) and
log it using YASMIN's logging system. This demonstrates inter-state
communication through the blackboard - a shared data structure that
allows states to pass information to each other. The
time.sleep(3) call simulates processing time.
class BarState(State):
"""
Represents the Bar state in the state machine.
"""
def __init__(self) -> None:
"""
Initializes the BarState instance, setting up the outcome.
Outcomes:
outcome3: Indicates the state should transition back to the Foo state.
"""
super().__init__(outcomes=["outcome3"])
def execute(self, blackboard: Blackboard) -> str:
"""
Executes the logic for the Bar state.
Args:
blackboard (Blackboard): The shared data structure for states.
Returns:
str: The outcome of the execution, which will always be "outcome3".
Raises:
Exception: May raise exceptions related to state execution.
"""
yasmin.YASMIN_LOG_INFO("Executing state BAR")
time.sleep(3)
yasmin.YASMIN_LOG_INFO(blackboard["foo_str"])
return "outcome3"
3. Main Function
In the main function, we initialize ROS 2 and construct the complete
state machine. First, rclpy.init() initializes the ROS 2
Python client library, and set_ros_loggers() configures
YASMIN to use ROS logging. We create a StateMachine with
"outcome4" as the terminal outcome that ends execution.
Using the add_state() method, we register both FooState
and BarState, defining a dictionary of transitions for each: FooState
can transition to BarState ("outcome1") or terminate
("outcome2"), while BarState always loops back to
FooState ("outcome3"). The
YasminViewerPub publishes state machine data for
real-time visualization in a web interface. Finally, we execute the
state machine by calling sm(), handle keyboard interrupts
gracefully, and shut down ROS 2.
def main() -> None:
yasmin.YASMIN_LOG_INFO("yasmin_demo")
rclpy.init()
set_ros_loggers()
# Create a finite state machine (FSM)
sm = StateMachine(outcomes=["outcome4"])
# Add states to the FSM
sm.add_state(
"FOO",
FooState(),
transitions={
"outcome1": "BAR",
"outcome2": "outcome4",
},
)
sm.add_state(
"BAR",
BarState(),
transitions={
"outcome3": "FOO",
},
)
# Publish FSM information for visualization
viewer = YasminViewerPub(sm, "YASMIN_DEMO")
# Execute the FSM
try:
outcome = sm()
yasmin.YASMIN_LOG_INFO(outcome)
except KeyboardInterrupt:
if sm.is_running():
sm.cancel_state()
finally:
viewer.cleanup()
del sm
# Shutdown ROS 2 if it's running
if rclpy.ok():
rclpy.shutdown()
if __name__ == "__main__":
main()
4. Run the Demo
Execute the Python script to see the state machine transition between FOO and BAR states:
ros2 run yasmin_demos yasmin_demo.py
YASMIN