Action Client Demo (Python)

This tutorial demonstrates how to integrate ROS 2 actions into a YASMIN state machine using the ActionState class. Actions suit long-running tasks that report feedback during execution and can be canceled, such as navigation, manipulation, or data processing. We use the Fibonacci action as an example to show how to send a goal, monitor feedback, and handle the result. ActionState manages the action client lifecycle, handles timeouts, and plugs into the state machine like any other state.

Prerequisites

Start the Fibonacci action server in a separate terminal before running this demo:

ros2 run yasmin_demos fibonacci_action_server

1. Create the ActionState

Create a custom state that inherits from ActionState. The constructor call passes six arguments: the action type (Fibonacci), the action name ("/fibonacci"), a callback that creates the goal, the optional custom outcomes (None here, which keeps the defaults SUCCEED, ABORT, and CANCEL), a response handler, and a feedback handler. create_goal_handler reads the Fibonacci order from the blackboard and builds the goal request. response_handler receives the final result and stores the sequence in the blackboard. print_feedback is called each time the server publishes feedback, so the partial sequence can be monitored as it is computed; this feedback channel is one of the key advantages of actions over services.

import yasmin
from yasmin import Blackboard
from yasmin_ros import ActionState
from yasmin_ros.basic_outcomes import SUCCEED
from example_interfaces.action import Fibonacci

class FibonacciState(ActionState):
    """
    Class representing the state of the Fibonacci action.

    Inherits from ActionState and implements methods to handle the
    Fibonacci action in a finite state machine.

    Attributes:
        None
    """

    def __init__(self) -> None:
        """
        Initializes the FibonacciState.

        Sets up the action type and the action name for the Fibonacci
        action. Initializes goal, response handler, and feedback
        processing callbacks.

        Args:
            None

        Returns:
            None
        """
        super().__init__(
            Fibonacci,  # action type
            "/fibonacci",  # action name
            self.create_goal_handler,  # callback to create the goal
            None,  # custom outcomes; None keeps the defaults (SUCCEED, ABORT, CANCEL)
            self.response_handler,  # callback to process the response
            self.print_feedback,  # callback to process the feedback
        )

    def create_goal_handler(self, blackboard: Blackboard) -> Fibonacci.Goal:
        """
        Creates the goal for the Fibonacci action.

        This method retrieves the input value from the blackboard and
        populates the Fibonacci goal.

        Args:
            blackboard (Blackboard): The blackboard containing the state
            information.

        Returns:
            Fibonacci.Goal: The populated goal object for the Fibonacci action.

        Raises:
            KeyError: If the expected key is not present in the blackboard.
        """
        goal = Fibonacci.Goal()
        goal.order = blackboard["n"]  # Retrieve the input value 'n' from the blackboard
        return goal

    def response_handler(self, blackboard: Blackboard, response: Fibonacci.Result) -> str:
        """
        Handles the response from the Fibonacci action.

        This method processes the result of the Fibonacci action and
        stores it in the blackboard.

        Args:
            blackboard (Blackboard): The blackboard to store the result.
            response (Fibonacci.Result): The result object from the Fibonacci action.

        Returns:
            str: Outcome of the operation, typically SUCCEED.

        Raises:
            None
        """
        blackboard["fibo_res"] = (
            response.sequence
        )  # Store the result sequence in the blackboard
        return SUCCEED

    def print_feedback(
        self, blackboard: Blackboard, feedback: Fibonacci.Feedback
    ) -> None:
        """
        Prints feedback from the Fibonacci action.

        This method logs the partial sequence received during the action.

        Args:
            blackboard (Blackboard): The blackboard (not used in this method).
            feedback (Fibonacci.Feedback): The feedback object from the Fibonacci action.

        Returns:
            None

        Raises:
            None
        """
        yasmin.YASMIN_LOG_INFO(f"Received feedback: {list(feedback.sequence)}")
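
The order in which the three callbacks fire can be tried out without ROS. The following is a minimal plain-Python sketch, not YASMIN's implementation: run_fake_fibonacci_action is a hypothetical stand-in for ActionState, the blackboard is a plain dict, the outcome string is a placeholder, and the loop assumes the server computes the sequence the way the standard ROS 2 Fibonacci tutorial server does.

```python
from typing import Callable, Dict, List

SUCCEED = "succeeded"  # placeholder outcome string used only in this sketch


def run_fake_fibonacci_action(
    blackboard: Dict[str, object],
    create_goal: Callable[[Dict[str, object]], int],
    on_feedback: Callable[[Dict[str, object], List[int]], None],
    on_result: Callable[[Dict[str, object], List[int]], str],
) -> str:
    """Hypothetical stand-in for ActionState: goal, then feedback, then result."""
    order = create_goal(blackboard)  # 1. build the goal from the blackboard
    sequence = [0, 1]
    for i in range(1, order):
        sequence.append(sequence[i] + sequence[i - 1])
        on_feedback(blackboard, list(sequence))  # 2. partial sequence so far
    return on_result(blackboard, sequence)  # 3. final result decides the outcome


feedback_log: List[List[int]] = []


def create_goal(blackboard: Dict[str, object]) -> int:
    return int(blackboard["n"])


def on_feedback(blackboard: Dict[str, object], partial: List[int]) -> None:
    feedback_log.append(partial)


def on_result(blackboard: Dict[str, object], sequence: List[int]) -> str:
    blackboard["fibo_res"] = sequence
    return SUCCEED


blackboard: Dict[str, object] = {"n": 10}
outcome = run_fake_fibonacci_action(blackboard, create_goal, on_feedback, on_result)
print(outcome, blackboard["fibo_res"])
```

The feedback handler runs once per computed term, while the response handler runs once at the end and is the only callback that returns an outcome.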

2. Create Result Printing State

Define a callback state to print the Fibonacci sequence result stored in the blackboard after the action completes successfully.

import yasmin
from yasmin import Blackboard, CbState
from yasmin_ros.basic_outcomes import SUCCEED

def print_result(blackboard: Blackboard) -> str:
    """
    Prints the result of the Fibonacci action.

    This function logs the final result stored in the blackboard.

    Args:
        blackboard (Blackboard): The blackboard containing the result.

    Returns:
        str: Outcome of the operation, typically SUCCEED.

    Raises:
        None
    """
    yasmin.YASMIN_LOG_INFO(f"Result: {blackboard['fibo_res']}")
    return SUCCEED
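
A callback state pairs a list of declared outcomes with a function that returns one of them. The sketch below models that contract in plain Python under stated assumptions: MiniCbState is a hypothetical class written for illustration, not yasmin's CbState, and the blackboard is a plain dict.

```python
from typing import Callable, Dict, List


class MiniCbState:
    """Hypothetical, simplified model of a callback state (not yasmin's CbState)."""

    def __init__(
        self, outcomes: List[str], cb: Callable[[Dict[str, object]], str]
    ) -> None:
        self.outcomes = set(outcomes)
        self.cb = cb

    def __call__(self, blackboard: Dict[str, object]) -> str:
        outcome = self.cb(blackboard)
        # Reject outcomes that the state never declared
        if outcome not in self.outcomes:
            raise ValueError(f"Undeclared outcome: {outcome!r}")
        return outcome


printed: List[str] = []


def print_result(blackboard: Dict[str, object]) -> str:
    printed.append(f"Result: {blackboard['fibo_res']}")
    return "succeeded"


state = MiniCbState(["succeeded"], print_result)
outcome = state({"fibo_res": [0, 1, 1, 2, 3]})
print(printed[0], outcome)
```

Declaring the outcomes up front is what lets the state machine check that every outcome has a transition before it runs.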

3. Build the State Machine

Assemble the action client state and result printing state into a sequential state machine. Initialize the blackboard with the Fibonacci order value that will be sent as the action goal.

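import rclpy
import yasmin
from yasmin import Blackboard, CbState, StateMachine
from yasmin_ros.basic_outcomes import ABORT, CANCEL, SUCCEED
from yasmin_ros.ros_logs import set_ros_loggers  # import path may differ between YASMIN versions
from yasmin_viewer import YasminViewerPub


def main() -> None:
    # Initialize ROS 2 and route YASMIN logs through the ROS 2 loggers
    rclpy.init()
    set_ros_loggers()

    # Create the state machine with a single final outcome
    sm = StateMachine(outcomes=["outcome4"])

    # Call the action, then print the result if the action succeeded
    sm.add_state(
        "CALLING_FIBONACCI",
        FibonacciState(),
        transitions={
            SUCCEED: "PRINTING_RESULT",
            CANCEL: "outcome4",
            ABORT: "outcome4",
        },
    )
    sm.add_state(
        "PRINTING_RESULT",
        CbState([SUCCEED], print_result),
        transitions={SUCCEED: "outcome4"},
    )

    # Create the initial blackboard with the input value
    blackboard = Blackboard()
    blackboard["n"] = 10  # Fibonacci order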
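
The transition table above can be read as a lookup from (state, outcome) to the next state. The following plain-Python sketch walks that table under stated assumptions: the outcome strings are placeholders for SUCCEED, CANCEL, and ABORT, walk is a hypothetical helper, and execution is assumed to begin at the first state added.

```python
from typing import Dict, List

# Same shape as the transitions passed to add_state, with placeholder outcome strings
TRANSITIONS: Dict[str, Dict[str, str]] = {
    "CALLING_FIBONACCI": {
        "succeeded": "PRINTING_RESULT",
        "canceled": "outcome4",
        "aborted": "outcome4",
    },
    "PRINTING_RESULT": {"succeeded": "outcome4"},
}
FINAL_OUTCOMES = {"outcome4"}


def walk(start: str, state_outcomes: Dict[str, str]) -> List[str]:
    """Follow transitions from `start` until a final outcome is reached."""
    path = [start]
    current = start
    while current not in FINAL_OUTCOMES:
        current = TRANSITIONS[current][state_outcomes[current]]
        path.append(current)
    return path


happy_path = walk(
    "CALLING_FIBONACCI",
    {"CALLING_FIBONACCI": "succeeded", "PRINTING_RESULT": "succeeded"},
)
aborted_path = walk("CALLING_FIBONACCI", {"CALLING_FIBONACCI": "aborted"})
print(happy_path)
print(aborted_path)
```

Only the success path visits PRINTING_RESULT; a canceled or aborted action goes straight to outcome4, so print_result never reads a missing fibo_res key.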

4. Execute

Run the state machine with proper exception handling to gracefully handle interruptions and ensure clean shutdown of ROS resources.

    # Continuation of the code from step 3: `sm` and `blackboard` already exist

    # Publish FSM information; keep the handle so it can be cleaned up below
    viewer = YasminViewerPub(sm, "YASMIN_ACTION_CLIENT_DEMO")

    # Execute the FSM
    try:
        outcome = sm(blackboard)
        yasmin.YASMIN_LOG_INFO(outcome)
    except KeyboardInterrupt:
        if sm.is_running():
            sm.cancel_state()  # Cancel the state if interrupted
    finally:
        viewer.cleanup()
        del sm

        # Shutdown ROS 2 if it's running
        if rclpy.ok():
            rclpy.shutdown()


if __name__ == "__main__":
    main()
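
The order of events during an interrupted run can be checked in isolation. The sketch below is a plain-Python model under stated assumptions: FakeStateMachine is a hypothetical stub that raises KeyboardInterrupt as soon as it is called, and the event strings stand in for the real cancel and cleanup calls.

```python
from typing import Dict, List


class FakeStateMachine:
    """Hypothetical stub that is interrupted as soon as it starts running."""

    def __init__(self) -> None:
        self.running = False

    def __call__(self, blackboard: Dict[str, object]) -> str:
        self.running = True
        raise KeyboardInterrupt  # simulate Ctrl+C during execution

    def is_running(self) -> bool:
        return self.running

    def cancel_state(self) -> None:
        self.running = False


def run(sm: FakeStateMachine, events: List[str]) -> List[str]:
    try:
        events.append(sm({}))
    except KeyboardInterrupt:
        if sm.is_running():
            sm.cancel_state()
            events.append("canceled")
    finally:
        # Runs on both the normal and the interrupted path
        events.append("cleanup")
    return events


events = run(FakeStateMachine(), [])
print(events)
```

Putting the cleanup in the finally block means it runs whether the state machine returns an outcome or is interrupted.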

5. Run the Demo

Start the action server first so that the /fibonacci action is available:

ros2 run yasmin_demos fibonacci_action_server

Then run the client to send the goal and receive feedback and results:

ros2 run yasmin_demos action_client_demo.py

Expected Behavior