Concurrence Demo (Python)

This tutorial demonstrates how to run multiple states concurrently using YASMIN's Concurrence container. Concurrence allows you to execute multiple states in parallel and define outcome mappings based on the results of these states. This is essential in robotics for scenarios like monitoring multiple sensors simultaneously, executing independent behaviors in parallel, or waiting for multiple conditions to be satisfied before proceeding. The Concurrence container handles all the complexity of thread management, synchronization, and outcome resolution, providing a clean interface for parallel state execution.

Background

The Concurrence container runs several states in parallel and combines their individual outcomes into a single outcome for the container.

1. Create the FooState

This state increments a counter and returns different outcomes based on the counter value. It executes relatively quickly (2 seconds) and stores a formatted string in the blackboard. The three different outcomes ("outcome1", "outcome2", "outcome3") demonstrate how concurrent states can progress through stages and produce varying results. The counter is incremented after determining the outcome, so the state's behavior evolves with each execution. This pattern is useful for tasks that have distinct phases or thresholds, like collecting sensor samples, performing iterations of a computation, or tracking progress toward a goal.

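# Imports used by the snippets in this tutorial
import time

import rclpy
import yasmin
from yasmin import Blackboard, Concurrence, State, StateMachine
from yasmin_ros import set_ros_loggers
from yasmin_viewer import YasminViewerPub


class FooState(State):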
    """
    Represents the Foo state in the state machine.

    Attributes:
        counter (int): Counter to track the number of executions of this state.
    """

    def __init__(self) -> None:
        """
        Initializes the FooState instance, setting up the outcomes.

        Outcomes:
            outcome1: Indicates the state should continue.
            outcome2: Indicates the state should continue.
            outcome3: Indicates the state should finish execution and return.
        """
        super().__init__(["outcome1", "outcome2", "outcome3"])
        self.counter = 0

    def execute(self, blackboard: Blackboard) -> str:
        """
        Executes the logic for the Foo state.

        Args:
            blackboard (Blackboard): The shared data structure for states.

        Returns:
            str: The outcome of the execution.

        Raises:
            Exception: May raise exceptions related to state execution.
        """
        yasmin.YASMIN_LOG_INFO("Executing state FOO")
        time.sleep(2)

        blackboard["foo_str"] = f"Counter: {self.counter}"

        if self.counter < 3:
            outcome = "outcome1"
        elif self.counter < 5:
            outcome = "outcome2"
        else:
            outcome = "outcome3"

        yasmin.YASMIN_LOG_INFO("Finishing state FOO")
        self.counter += 1
        return outcome
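
To make the counter thresholds concrete, the sketch below reproduces only the branching logic of FooState.execute as a plain function and lists the outcome of each successive execution. The names foo_outcome and outcome_sequence are hypothetical helpers for illustration and are not part of YASMIN.

```python
from typing import List


def foo_outcome(counter: int) -> str:
    """Mirror the branching in FooState.execute for a given counter value."""
    if counter < 3:
        return "outcome1"
    if counter < 5:
        return "outcome2"
    return "outcome3"


def outcome_sequence(executions: int) -> List[str]:
    """Outcomes of consecutive executions, with the counter starting at 0."""
    # The counter is incremented after the outcome is chosen, so the
    # n-th execution (0-based) sees counter == n.
    return [foo_outcome(counter) for counter in range(executions)]


sequence = outcome_sequence(7)
print(sequence)
```

Under these thresholds the first three executions yield outcome1, the next two yield outcome2, and every later execution yields outcome3.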

2. Create the BarState

This state takes longer to execute (4 seconds) and attempts to read from the blackboard. The key point here is concurrent access to shared data. FooState writes foo_str after about 2 seconds and BarState reads it after about 4 seconds, so the key is normally present by the time BarState looks for it, but nothing in the code enforces that ordering. BarState therefore checks for the key with the in operator before reading it. This illustrates an important principle in concurrent programming: never assume that data written by another state is already available. BarState always returns "outcome3", which contrasts with FooState's variable outcomes. The logs help visualize the parallel execution: "Executing state FOO" and "Executing state BAR" appear at nearly the same time.

class BarState(State):
    """
    Represents the Bar state in the state machine.
    """

    def __init__(self) -> None:
        """
        Initializes the BarState instance, setting up the outcome.

        Outcomes:
            outcome3: This state will always return this outcome
        """
        super().__init__(outcomes=["outcome3"])

    def execute(self, blackboard: Blackboard) -> str:
        """
        Executes the logic for the Bar state.

        Args:
            blackboard (Blackboard): The shared data structure for states.

        Returns:
            str: The outcome of the execution, which will always be "outcome3".

        Raises:
            Exception: May raise exceptions related to state execution.
        """
        yasmin.YASMIN_LOG_INFO("Executing state BAR")
        time.sleep(4)

        if "foo_str" in blackboard:
            yasmin.YASMIN_LOG_INFO(blackboard["foo_str"])
        else:
            yasmin.YASMIN_LOG_INFO("Blackboard does not yet contain 'foo_str'")

        yasmin.YASMIN_LOG_INFO("Finishing state BAR")
        return "outcome3"
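
The check-before-read pattern can be tried without ROS 2. The following is a minimal sketch in which a plain dict and a threading.Lock stand in for the blackboard; this is an assumption made for illustration, not how YASMIN's Blackboard is implemented. The helpers write_foo_str and read_foo_str are hypothetical.

```python
import threading
from typing import Dict

MISSING = "Blackboard does not yet contain 'foo_str'"


def write_foo_str(board: Dict[str, str], lock: threading.Lock, counter: int) -> None:
    """Writer side: store the formatted counter, as FooState does."""
    with lock:
        board["foo_str"] = f"Counter: {counter}"


def read_foo_str(board: Dict[str, str], lock: threading.Lock) -> str:
    """Reader side: check that the key exists before reading it, as BarState does."""
    with lock:
        if "foo_str" in board:
            return board["foo_str"]
        return MISSING


board: Dict[str, str] = {}
lock = threading.Lock()

# Reading before the writer has run takes the fallback branch.
early_read = read_foo_str(board, lock)

# Run the writer in its own thread and wait for it to finish.
writer = threading.Thread(target=write_foo_str, args=(board, lock, 0))
writer.start()
writer.join()

# Reading after the writer has finished returns the stored value.
late_read = read_foo_str(board, lock)

print(early_read)
print(late_read)
```

The join call forces the ordering here so that both branches are shown deterministically. In the demo the ordering depends only on the two sleep durations, which is why the existence check is kept.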

3. Create the Concurrence Container

Initialize ROS 2 and create a Concurrence container that executes FooState and BarState simultaneously. The outcome_map parameter defines how the individual outcomes are combined: each key is an outcome of the container, and its value lists the outcome that each child state must produce for that key to be selected. Here, the container returns outcome1 when FOO returns outcome1 and BAR returns outcome3, and outcome2 when FOO returns outcome2 and BAR returns outcome3. Any other combination, such as FOO returning outcome3, falls back to default_outcome, which is "defaulted" in this demo.

rclpy.init()
set_ros_loggers()

sm = StateMachine(outcomes=["outcome4"])

foo_state = FooState()
bar_state = BarState()

concurrence_state = Concurrence(
    states={
        "FOO": foo_state,
        "BAR": bar_state,
    },
    default_outcome="defaulted",
    outcome_map={
        "outcome1": {
            "FOO": "outcome1",
            "BAR": "outcome3",
        },
        "outcome2": {
            "FOO": "outcome2",
            "BAR": "outcome3",
        },
    },
)
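
The resolution rule described above can be modelled in a few lines of plain Python. This is a simplified sketch, not YASMIN's implementation: resolve_outcome and run_concurrently are hypothetical helpers, child states are replaced by simple callables, and returning the first matching entry of the map is an assumption.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict

OutcomeMap = Dict[str, Dict[str, str]]

OUTCOME_MAP: OutcomeMap = {
    "outcome1": {"FOO": "outcome1", "BAR": "outcome3"},
    "outcome2": {"FOO": "outcome2", "BAR": "outcome3"},
}


def resolve_outcome(outcome_map: OutcomeMap, default_outcome: str,
                    results: Dict[str, str]) -> str:
    """Return the first mapped outcome whose requirements all match the results."""
    for outcome, required in outcome_map.items():
        if all(results.get(name) == expected for name, expected in required.items()):
            return outcome
    return default_outcome


def run_concurrently(states: Dict[str, Callable[[], str]],
                     outcome_map: OutcomeMap, default_outcome: str) -> str:
    """Run every callable in its own thread, wait for all, then resolve."""
    with ThreadPoolExecutor(max_workers=len(states)) as pool:
        futures = {name: pool.submit(fn) for name, fn in states.items()}
        results = {name: future.result() for name, future in futures.items()}
    return resolve_outcome(outcome_map, default_outcome, results)


matched = run_concurrently(
    {"FOO": lambda: "outcome1", "BAR": lambda: "outcome3"},
    OUTCOME_MAP, "defaulted")
unmatched = run_concurrently(
    {"FOO": lambda: "outcome3", "BAR": lambda: "outcome3"},
    OUTCOME_MAP, "defaulted")

print(matched)
print(unmatched)
```

In this model the container waits for every child before resolving, so the slower state determines how long one concurrent execution takes.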

4. Add Concurrence to State Machine

Add the concurrence container to the state machine with self-looping transitions for outcome1 and outcome2, so that the container is executed repeatedly. The defaulted outcome exits the loop by transitioning to outcome4, the state machine's only final outcome.

sm.add_state(
    "CONCURRENCE",
    concurrence_state,
    transitions={
        "outcome1": "CONCURRENCE",
        "outcome2": "CONCURRENCE",
        "defaulted": "outcome4",
    },
)

YasminViewerPub(sm, "YASMIN_CONCURRENCE_DEMO")
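
The effect of these transitions can be traced by hand. The sketch below follows the transition table for a scripted sequence of container outcomes; the sequence is derived from FooState's counter thresholds combined with the outcome map from step 3. The function follow_transitions is a hypothetical helper and does not use YASMIN.

```python
from typing import Dict, Iterable, Set, Tuple

TRANSITIONS: Dict[str, Dict[str, str]] = {
    "CONCURRENCE": {
        "outcome1": "CONCURRENCE",
        "outcome2": "CONCURRENCE",
        "defaulted": "outcome4",
    },
}
FINAL_OUTCOMES: Set[str] = {"outcome4"}


def follow_transitions(start: str, state_outcomes: Iterable[str]) -> Tuple[str, int]:
    """Follow the table until a final outcome is reached.

    Returns the final outcome and the number of state executions it took.
    """
    current = start
    executions = 0
    for outcome in state_outcomes:
        executions += 1
        current = TRANSITIONS[current][outcome]
        if current in FINAL_OUTCOMES:
            return current, executions
    raise RuntimeError("outcome sequence ended before reaching a final outcome")


# FOO yields outcome1 three times, outcome2 twice, then outcome3, which
# matches no entry in the outcome map and therefore yields "defaulted".
scripted = ["outcome1"] * 3 + ["outcome2"] * 2 + ["defaulted"]
final_outcome, executions = follow_transitions("CONCURRENCE", scripted)

print(final_outcome, executions)
```

With this sequence the container runs six times before the state machine ends with outcome4.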

5. Execute the State Machine

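Execute the state machine inside a try/except/finally block so that a KeyboardInterrupt cancels the running state, and the viewer publisher and ROS 2 are cleaned up in every case. The snippet is the tail of main(), which is why it is indented; in the full script, the code from steps 3 and 4 forms the first part of that function. It repeats the viewer publisher from step 4, this time keeping a reference so that it can be cleaned up (create it only once in the full script).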

    # Publish FSM information for visualization
    viewer = YasminViewerPub(sm, "YASMIN_CONCURRENCE_DEMO")

    # Execute the FSM
    try:
        outcome = sm()
        yasmin.YASMIN_LOG_INFO(outcome)
    except KeyboardInterrupt:
        if sm.is_running():
            sm.cancel_state()
    finally:
        viewer.cleanup()
        del sm

        # Shutdown ROS 2 if it's running
        if rclpy.ok():
            rclpy.shutdown()


if __name__ == "__main__":
    main()
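
The control flow of this block can be checked without ROS 2. In the following sketch, FakeMachine and FakeViewer are hypothetical stand-ins that expose only the methods the snippet above calls; they are not YASMIN classes, and the fake machine simulates an interrupt by raising KeyboardInterrupt itself.

```python
from typing import List


class FakeMachine:
    """Stand-in for the state machine that is interrupted while running."""

    def __init__(self) -> None:
        self.running = False
        self.cancelled = False

    def __call__(self) -> str:
        self.running = True
        raise KeyboardInterrupt  # simulate Ctrl+C during execution

    def is_running(self) -> bool:
        return self.running

    def cancel_state(self) -> None:
        self.cancelled = True
        self.running = False


class FakeViewer:
    """Stand-in for the viewer publisher."""

    def __init__(self) -> None:
        self.cleaned_up = False

    def cleanup(self) -> None:
        self.cleaned_up = True


def run_with_cleanup(machine: FakeMachine, viewer: FakeViewer) -> List[str]:
    """Same try/except/finally shape as the demo, recording what happened."""
    events: List[str] = []
    try:
        outcome = machine()
        events.append(f"finished:{outcome}")
    except KeyboardInterrupt:
        if machine.is_running():
            machine.cancel_state()
            events.append("cancelled")
    finally:
        # Runs on both the normal and the interrupted path.
        viewer.cleanup()
        events.append("cleaned_up")
    return events


machine = FakeMachine()
viewer = FakeViewer()
events = run_with_cleanup(machine, viewer)
print(events)
```

The cleanup step sits in the finally clause so that it runs whether the machine finishes normally or is interrupted.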

6. Run the Demo

Execute the concurrence demonstration to observe how multiple states run in parallel and how their outcomes are combined.

ros2 run yasmin_demos concurrence_demo.py

How It Works

The outcome_map defines when the concurrence completes: