Skip to main content

ROS 2 Execution Systems

This section covers implementing execution systems for robotics using ROS 2 (Robot Operating System 2). You'll learn to create robust execution frameworks that can execute plans generated by LLMs and other planning systems.

Understanding ROS 2 Execution

ROS 2 execution systems are responsible for:

  • Converting high-level plans into executable robot actions
  • Managing action execution and monitoring
  • Handling failures and recovery
  • Coordinating multiple robot subsystems
  • Providing feedback to planning systems

Key Components

  1. Action Clients: Interface with action servers
  2. Action Servers: Execute specific robot behaviors
  3. State Machines: Coordinate complex execution sequences
  4. Monitoring Systems: Track execution progress
  5. Recovery Mechanisms: Handle failures and exceptions

Implementing Basic ROS 2 Actions

import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
from nav2_msgs.action import NavigateToPose
from geometry_msgs.msg import PoseStamped
from tf2_ros import TransformException
from tf2_ros.buffer import Buffer
from tf2_ros.transform_listener import TransformListener
import time

class NavigationActionClient(Node):
def __init__(self):
super().__init__('navigation_action_client')

# Create action client for navigation
self._action_client = ActionClient(self, NavigateToPose, 'navigate_to_pose')

# TF buffer for coordinate transformations
self.tf_buffer = Buffer()
self.tf_listener = TransformListener(self.tf_buffer, self)

def send_goal(self, x, y, theta):
"""
Send navigation goal to the action server
"""
# Wait for the action server to be available
self._action_client.wait_for_server()

# Create goal message
goal_msg = NavigateToPose.Goal()
goal_msg.pose.header.frame_id = 'map'
goal_msg.pose.header.stamp = self.get_clock().now().to_msg()

# Set the target pose
goal_msg.pose.pose.position.x = x
goal_msg.pose.pose.position.y = y
goal_msg.pose.pose.position.z = 0.0

# Convert theta (yaw) to quaternion
import math
from geometry_msgs.msg import Quaternion
goal_msg.pose.pose.orientation = Quaternion(
x=0.0,
y=0.0,
z=math.sin(theta / 2.0),
w=math.cos(theta / 2.0)
)

# Send the goal
self._send_goal_future = self._action_client.send_goal_async(
goal_msg,
feedback_callback=self.feedback_callback
)

self._send_goal_future.add_done_callback(self.goal_response_callback)

def goal_response_callback(self, future):
"""
Handle goal response from the action server
"""
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().info('Goal rejected')
return

self.get_logger().info('Goal accepted')

self._get_result_future = goal_handle.get_result_async()
self._get_result_future.add_done_callback(self.get_result_callback)

def get_result_callback(self, future):
"""
Handle result from the action server
"""
result = future.result().result
self.get_logger().info(f'Result: {result}')
rclpy.shutdown()

def feedback_callback(self, feedback_msg):
"""
Handle feedback during navigation
"""
feedback = feedback_msg.feedback
self.get_logger().info(f'Feedback: {feedback.current_pose}')

def main(args=None):
rclpy.init(args=args)

action_client = NavigationActionClient()

# Send a navigation goal
action_client.send_goal(1.0, 2.0, 0.0) # Navigate to (1, 2) with 0 rotation

rclpy.spin(action_client)

if __name__ == '__main__':
main()

Manipulation Action Client

import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
from std_msgs.msg import String
from geometry_msgs.msg import Pose
import json

class ManipulationActionClient(Node):
def __init__(self):
super().__init__('manipulation_action_client')

# Publishers for manipulation commands
self.grasp_pub = self.create_publisher(String, '/manipulation/grasp', 10)
self.place_pub = self.create_publisher(String, '/manipulation/place', 10)
self.move_pub = self.create_publisher(String, '/manipulation/move', 10)

# State tracking
self.current_object = None
self.is_holding_object = False

def grasp_object(self, object_name, object_pose):
"""
Execute grasp action
"""
command = {
'action': 'grasp',
'object': object_name,
'pose': {
'position': {
'x': object_pose.position.x,
'y': object_pose.position.y,
'z': object_pose.position.z
},
'orientation': {
'x': object_pose.orientation.x,
'y': object_pose.orientation.y,
'z': object_pose.orientation.z,
'w': object_pose.orientation.w
}
},
'timestamp': self.get_clock().now().nanoseconds / 1e9
}

msg = String()
msg.data = json.dumps(command)
self.grasp_pub.publish(msg)

self.get_logger().info(f'Grasping object: {object_name}')
self.current_object = object_name
self.is_holding_object = True

def place_object(self, target_pose):
"""
Execute place action
"""
command = {
'action': 'place',
'target_pose': {
'position': {
'x': target_pose.position.x,
'y': target_pose.position.y,
'z': target_pose.position.z
},
'orientation': {
'x': target_pose.orientation.x,
'y': target_pose.orientation.y,
'z': target_pose.orientation.z,
'w': target_pose.orientation.w
}
},
'timestamp': self.get_clock().now().nanoseconds / 1e9
}

msg = String()
msg.data = json.dumps(command)
self.place_pub.publish(msg)

self.get_logger().info('Placing object')
self.current_object = None
self.is_holding_object = False

def move_to_pose(self, target_pose):
"""
Move manipulator to target pose
"""
command = {
'action': 'move_to_pose',
'target_pose': {
'position': {
'x': target_pose.position.x,
'y': target_pose.position.y,
'z': target_pose.position.z
},
'orientation': {
'x': target_pose.orientation.x,
'y': target_pose.orientation.y,
'z': target_pose.orientation.z,
'w': target_pose.orientation.w
}
},
'timestamp': self.get_clock().now().nanoseconds / 1e9
}

msg = String()
msg.data = json.dumps(command)
self.move_pub.publish(msg)

self.get_logger().info('Moving to target pose')

Advanced Execution Systems

Behavior Tree Implementation

from enum import Enum
import time
from typing import Any, Dict, List, Optional

class NodeStatus(Enum):
SUCCESS = 1
FAILURE = 2
RUNNING = 3

class BehaviorTreeNode:
def __init__(self, name: str):
self.name = name
self.status = NodeStatus.RUNNING

def tick(self) -> NodeStatus:
"""
Execute one cycle of the behavior
"""
raise NotImplementedError

class ActionNode(BehaviorTreeNode):
def __init__(self, name: str, action_func):
super().__init__(name)
self.action_func = action_func
self.is_running = False

def tick(self) -> NodeStatus:
if not self.is_running:
# Start the action
self.is_running = True
result = self.action_func()

if result:
self.is_running = False
return NodeStatus.SUCCESS
else:
return NodeStatus.RUNNING
else:
# Action is running, check if it's completed
# In a real implementation, this would check the action status
return NodeStatus.RUNNING

class SequenceNode(BehaviorTreeNode):
def __init__(self, name: str, children: List[BehaviorTreeNode]):
super().__init__(name)
self.children = children
self.current_child_idx = 0

def tick(self) -> NodeStatus:
while self.current_child_idx < len(self.children):
child_status = self.children[self.current_child_idx].tick()

if child_status == NodeStatus.FAILURE:
# Reset for next time
self.current_child_idx = 0
return NodeStatus.FAILURE
elif child_status == NodeStatus.RUNNING:
return NodeStatus.RUNNING
elif child_status == NodeStatus.SUCCESS:
# Move to next child
self.current_child_idx += 1

# All children succeeded
self.current_child_idx = 0
return NodeStatus.SUCCESS

class SelectorNode(BehaviorTreeNode):
def __init__(self, name: str, children: List[BehaviorTreeNode]):
super().__init__(name)
self.children = children
self.current_child_idx = 0

def tick(self) -> NodeStatus:
while self.current_child_idx < len(self.children):
child_status = self.children[self.current_child_idx].tick()

if child_status == NodeStatus.SUCCESS:
# Reset for next time
self.current_child_idx = 0
return NodeStatus.SUCCESS
elif child_status == NodeStatus.RUNNING:
return NodeStatus.RUNNING
elif child_status == NodeStatus.FAILURE:
# Try next child
self.current_child_idx += 1

# All children failed
self.current_child_idx = 0
return NodeStatus.FAILURE

class RobotBehaviorTree:
def __init__(self, root_node: BehaviorTreeNode):
self.root = root_node

def execute(self) -> NodeStatus:
"""
Execute one cycle of the behavior tree
"""
return self.root.tick()

# Example usage of behavior tree
def create_pick_and_place_behavior_tree(navigation_client, manipulation_client):
"""
Create a behavior tree for pick and place task
"""
# Define actions
def navigate_to_object():
# Implementation would call navigation client
print("Navigating to object...")
time.sleep(1) # Simulate navigation
return True # Return True when navigation complete

def detect_object():
print("Detecting object...")
time.sleep(0.5)
return True # Return True when object detected

def grasp_object():
print("Grasping object...")
time.sleep(1)
return True # Return True when grasp successful

def navigate_to_target():
print("Navigating to target location...")
time.sleep(1)
return True # Return True when navigation complete

def place_object():
print("Placing object...")
time.sleep(1)
return True # Return True when placement complete

# Create action nodes
navigate_to_obj_node = ActionNode("navigate_to_object", navigate_to_object)
detect_obj_node = ActionNode("detect_object", detect_object)
grasp_obj_node = ActionNode("grasp_object", grasp_object)
navigate_to_target_node = ActionNode("navigate_to_target", navigate_to_target)
place_obj_node = ActionNode("place_object", place_object)

# Create sequence for pick task
pick_sequence = SequenceNode("pick_sequence", [
navigate_to_obj_node,
detect_obj_node,
grasp_obj_node
])

# Create sequence for place task
place_sequence = SequenceNode("place_sequence", [
navigate_to_target_node,
place_obj_node
])

# Create main sequence
main_sequence = SequenceNode("main_sequence", [
pick_sequence,
place_sequence
])

return RobotBehaviorTree(main_sequence)

State Machine Implementation

from enum import Enum
import time
from typing import Any, Dict

class RobotState(Enum):
IDLE = 1
NAVIGATING = 2
MANIPULATING = 3
PERCEIVING = 4
ERROR = 5
SHUTDOWN = 6

class RobotStateMachine:
def __init__(self, navigation_client, manipulation_client, perception_client):
self.state = RobotState.IDLE
self.navigation_client = navigation_client
self.manipulation_client = manipulation_client
self.perception_client = perception_client
self.current_task = None
self.last_state_change = time.time()

def update(self):
"""
Update the state machine based on current state and conditions
"""
current_time = time.time()

if self.state == RobotState.IDLE:
self.handle_idle_state()
elif self.state == RobotState.NAVIGATING:
self.handle_navigation_state()
elif self.state == RobotState.MANIPULATING:
self.handle_manipulation_state()
elif self.state == RobotState.PERCEIVING:
self.handle_perception_state()
elif self.state == RobotState.ERROR:
self.handle_error_state()
elif self.state == RobotState.SHUTDOWN:
return # Stay in shutdown state

def handle_idle_state(self):
"""
Handle idle state - wait for tasks
"""
# Check for new tasks
# In a real implementation, this would subscribe to task messages
pass

def handle_navigation_state(self):
"""
Handle navigation state
"""
# Check if navigation is complete
# In a real implementation, this would check action server status
if self.is_navigation_complete():
self.set_state(RobotState.IDLE)

def handle_manipulation_state(self):
"""
Handle manipulation state
"""
# Check if manipulation is complete
if self.is_manipulation_complete():
self.set_state(RobotState.IDLE)

def handle_perception_state(self):
"""
Handle perception state
"""
# Check if perception task is complete
if self.is_perception_complete():
self.set_state(RobotState.IDLE)

def handle_error_state(self):
"""
Handle error state
"""
# Try to recover from error
if self.attempt_recovery():
self.set_state(RobotState.IDLE)
else:
# Stay in error state or shutdown
pass

def is_navigation_complete(self) -> bool:
"""
Check if navigation task is complete
"""
# Implementation would check navigation action status
return False

def is_manipulation_complete(self) -> bool:
"""
Check if manipulation task is complete
"""
# Implementation would check manipulation action status
return False

def is_perception_complete(self) -> bool:
"""
Check if perception task is complete
"""
# Implementation would check perception task status
return False

def attempt_recovery(self) -> bool:
"""
Attempt to recover from error state
"""
# Implementation would include recovery procedures
return True

def set_state(self, new_state: RobotState):
"""
Set new state and perform any required state change actions
"""
if self.state != new_state:
self.get_logger().info(f'State change: {self.state} -> {new_state}')
self.state = new_state
self.last_state_change = time.time()

def get_logger(self):
"""
Simple logger for the state machine
"""
class Logger:
def info(self, msg):
print(f"INFO: {msg}")
def error(self, msg):
print(f"ERROR: {msg}")
return Logger()

def request_navigation(self, x: float, y: float, theta: float):
"""
Request navigation to a specific pose
"""
if self.state == RobotState.IDLE:
self.navigation_client.send_goal(x, y, theta)
self.set_state(RobotState.NAVIGATING)
else:
self.get_logger().error(f'Cannot navigate from state: {self.state}')

def request_manipulation(self, action: str, params: Dict[str, Any]):
"""
Request manipulation action
"""
if self.state == RobotState.IDLE:
if action == 'grasp':
self.manipulation_client.grasp_object(
params['object_name'],
params['object_pose']
)
self.set_state(RobotState.MANIPULATING)
elif action == 'place':
self.manipulation_client.place_object(params['target_pose'])
self.set_state(RobotState.MANIPULATING)
else:
self.get_logger().error(f'Cannot manipulate from state: {self.state}')

Integration with Planning Systems

Execution Manager for LLM Plans

import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from action_msgs.msg import GoalStatus
import json
import threading
import time

class PlanExecutionManager(Node):
def __init__(self):
super().__init__('plan_execution_manager')

# Initialize clients
self.navigation_client = NavigationActionClient()
self.manipulation_client = ManipulationActionClient()

# Publishers and subscribers
self.plan_sub = self.create_subscription(
String, '/llm_plan/result', self.plan_callback, 10)
self.execution_feedback_pub = self.create_publisher(
String, '/execution_feedback', 10)
self.status_pub = self.create_publisher(String, '/execution_status', 10)

# Execution tracking
self.current_plan = None
self.current_step = 0
self.execution_active = False
self.execution_thread = None

self.get_logger().info('Plan Execution Manager initialized')

def plan_callback(self, msg):
"""
Handle incoming plans from LLM planner
"""
try:
plan_data = json.loads(msg.data)
plan = plan_data.get('plan', {})
task = plan_data.get('task', 'Unknown task')

self.get_logger().info(f'Received plan for task: {task}')
self.get_logger().info(f'Plan has {len(plan.get("steps", []))} steps')

# Start execution of the plan
self.execute_plan(plan)

except json.JSONDecodeError:
self.get_logger().error('Invalid JSON in plan message')
except Exception as e:
self.get_logger().error(f'Error processing plan: {e}')

def execute_plan(self, plan: Dict[str, Any]):
"""
Execute the given plan
"""
if self.execution_active:
self.get_logger().warn('Plan execution already active, stopping current plan')
self.stop_execution()

self.current_plan = plan
self.current_step = 0
self.execution_active = True

# Start execution in a separate thread
self.execution_thread = threading.Thread(target=self._execute_plan_thread)
self.execution_thread.start()

def _execute_plan_thread(self):
"""
Execute the plan in a separate thread
"""
steps = self.current_plan.get('steps', [])

for i, step in enumerate(steps):
if not self.execution_active:
break

self.current_step = i
self.get_logger().info(f'Executing step {i + 1}/{len(steps)}: {step.get("action", "unknown")}')

# Execute the step
success = self.execute_step(step)

# Publish feedback
feedback_msg = String()
feedback_msg.data = json.dumps({
'step_id': f"step_{i}",
'status': 'success' if success else 'failure',
'details': {'step_number': i, 'action': step.get('action')},
'timestamp': time.time()
})
self.execution_feedback_pub.publish(feedback_msg)

if not success:
self.get_logger().error(f'Step {i} failed')
break

self.execution_active = False
self.get_logger().info('Plan execution completed')

def execute_step(self, step: Dict[str, Any]) -> bool:
"""
Execute a single step of the plan
"""
action = step.get('action', '').lower()
parameters = step.get('parameters', {})

try:
if 'navigate' in action or 'move to' in action or 'go to' in action:
return self.execute_navigation_step(parameters)
elif 'grasp' in action or 'pick' in action or 'take' in action:
return self.execute_manipulation_step(action, parameters)
elif 'place' in action or 'put' in action:
return self.execute_placement_step(parameters)
elif 'detect' in action or 'find' in action or 'look for' in action:
return self.execute_perception_step(parameters)
else:
self.get_logger().warn(f'Unknown action: {action}')
return False

except Exception as e:
self.get_logger().error(f'Error executing step: {e}')
return False

def execute_navigation_step(self, parameters: Dict[str, Any]) -> bool:
"""
Execute navigation step
"""
try:
x = parameters.get('x', 0.0)
y = parameters.get('y', 0.0)
theta = parameters.get('theta', 0.0)

self.get_logger().info(f'Navigating to ({x}, {y}, {theta})')

# Call navigation client
self.navigation_client.send_goal(x, y, theta)

# Wait for completion (in a real system, this would be asynchronous)
time.sleep(2) # Simulate navigation time

return True
except Exception as e:
self.get_logger().error(f'Navigation error: {e}')
return False

def execute_manipulation_step(self, action: str, parameters: Dict[str, Any]) -> bool:
"""
Execute manipulation step
"""
try:
if 'grasp' in action or 'pick' in action or 'take' in action:
object_name = parameters.get('object', 'unknown')
object_pose = parameters.get('pose', {})

# Convert dict pose to ROS Pose message
from geometry_msgs.msg import Pose
ros_pose = Pose()
ros_pose.position.x = object_pose.get('x', 0.0)
ros_pose.position.y = object_pose.get('y', 0.0)
ros_pose.position.z = object_pose.get('z', 0.0)

self.manipulation_client.grasp_object(object_name, ros_pose)

# Wait for completion
time.sleep(2)

return True
else:
return False
except Exception as e:
self.get_logger().error(f'Manipulation error: {e}')
return False

def execute_placement_step(self, parameters: Dict[str, Any]) -> bool:
"""
Execute placement step
"""
try:
target_pose = parameters.get('target_pose', {})

# Convert dict pose to ROS Pose message
from geometry_msgs.msg import Pose
ros_pose = Pose()
ros_pose.position.x = target_pose.get('x', 0.0)
ros_pose.position.y = target_pose.get('y', 0.0)
ros_pose.position.z = target_pose.get('z', 0.0)

self.manipulation_client.place_object(ros_pose)

# Wait for completion
time.sleep(2)

return True
except Exception as e:
self.get_logger().error(f'Placement error: {e}')
return False

def execute_perception_step(self, parameters: Dict[str, Any]) -> bool:
"""
Execute perception step
"""
try:
target_object = parameters.get('target', 'unknown')
self.get_logger().info(f'Detecting object: {target_object}')

# In a real system, this would call perception services
time.sleep(1) # Simulate perception time

return True
except Exception as e:
self.get_logger().error(f'Perception error: {e}')
return False

def stop_execution(self):
"""
Stop current plan execution
"""
self.execution_active = False
if self.execution_thread:
self.execution_thread.join(timeout=1.0)

def destroy_node(self):
"""
Clean up before node destruction
"""
self.stop_execution()
super().destroy_node()

Error Handling and Recovery

Robust Execution with Error Handling

class RobustExecutionManager(PlanExecutionManager):
def __init__(self):
super().__init__()

# Add error handling components
self.max_retries = 3
self.retry_delay = 1.0
self.error_recovery_enabled = True

# Error statistics
self.error_stats = {
'total_errors': 0,
'recovered_errors': 0,
'fatal_errors': 0
}

def execute_step_with_retry(self, step: Dict[str, Any]) -> bool:
"""
Execute a step with retry logic
"""
action = step.get('action', '').lower()
parameters = step.get('parameters', {})

for attempt in range(self.max_retries):
self.get_logger().info(f'Attempt {attempt + 1} for action: {action}')

try:
success = self.execute_step(step)

if success:
return True
else:
self.get_logger().warn(f'Step failed on attempt {attempt + 1}')

if attempt < self.max_retries - 1:
time.sleep(self.retry_delay * (2 ** attempt)) # Exponential backoff

except Exception as e:
self.get_logger().error(f'Exception on attempt {attempt + 1}: {e}')
self.error_stats['total_errors'] += 1

if attempt < self.max_retries - 1:
time.sleep(self.retry_delay * (2 ** attempt))

# All retries failed
self.error_stats['fatal_errors'] += 1
self.get_logger().error(f'All retries failed for action: {action}')
return False

def execute_plan_with_recovery(self, plan: Dict[str, Any]):
"""
Execute plan with error recovery capabilities
"""
if self.execution_active:
self.stop_execution()

self.current_plan = plan
self.current_step = 0
self.execution_active = True

self.execution_thread = threading.Thread(
target=self._execute_plan_with_recovery_thread
)
self.execution_thread.start()

def _execute_plan_with_recovery_thread(self):
"""
Execute plan with recovery in a separate thread
"""
steps = self.current_plan.get('steps', [])

for i, step in enumerate(steps):
if not self.execution_active:
break

self.current_step = i
self.get_logger().info(f'Executing step {i + 1}/{len(steps)}: {step.get("action", "unknown")}')

# Execute the step with retry and recovery
success = self.execute_step_with_retry(step)

if not success:
# Try to recover from failure
if self.error_recovery_enabled:
recovered = self.attempt_recovery(step, i)
if recovered:
self.error_stats['recovered_errors'] += 1
self.get_logger().info(f'Recovered from step {i} failure')
continue # Continue with next step
else:
self.get_logger().error(f'Failed to recover from step {i} failure')
break
else:
self.get_logger().error(f'Step {i} failed and recovery disabled')
break

# Publish success feedback
feedback_msg = String()
feedback_msg.data = json.dumps({
'step_id': f"step_{i}",
'status': 'success',
'details': {'step_number': i, 'action': step.get('action')},
'timestamp': time.time()
})
self.execution_feedback_pub.publish(feedback_msg)

self.execution_active = False
self.get_logger().info('Plan execution completed')

def attempt_recovery(self, failed_step: Dict[str, Any], step_index: int) -> bool:
"""
Attempt to recover from a failed step
"""
action = failed_step.get('action', '').lower()

# Implement recovery strategies based on action type
if 'navigate' in action:
return self.recover_navigation_failure(failed_step, step_index)
elif 'grasp' in action or 'manipulat' in action:
return self.recover_manipulation_failure(failed_step, step_index)
else:
return self.generic_recovery(failed_step, step_index)

def recover_navigation_failure(self, failed_step: Dict[str, Any], step_index: int) -> bool:
"""
Recovery strategy for navigation failures
"""
try:
self.get_logger().info('Attempting navigation recovery...')

# Try alternative navigation approach
# This could involve:
# - Clearing costmaps
# - Re-localizing
# - Using different planner
# - Manual intervention request

# For now, simulate recovery
time.sleep(1)
return True

except Exception as e:
self.get_logger().error(f'Navigation recovery failed: {e}')
return False

def recover_manipulation_failure(self, failed_step: Dict[str, Any], step_index: int) -> bool:
"""
Recovery strategy for manipulation failures
"""
try:
self.get_logger().info('Attempting manipulation recovery...')

# Try alternative manipulation approach
# This could involve:
# - Re-approaching the object
# - Different grasp strategy
# - Requesting human assistance

# For now, simulate recovery
time.sleep(1)
return True

except Exception as e:
self.get_logger().error(f'Manipulation recovery failed: {e}')
return False

def generic_recovery(self, failed_step: Dict[str, Any], step_index: int) -> bool:
"""
Generic recovery for unknown failure types
"""
try:
self.get_logger().info('Attempting generic recovery...')

# Generic recovery steps
time.sleep(1)
return True

except Exception as e:
self.get_logger().error(f'Generic recovery failed: {e}')
return False

Performance Monitoring

Execution Performance Monitoring

import psutil
import time
from collections import deque
import statistics

class ExecutionPerformanceMonitor:
def __init__(self):
self.process = psutil.Process()
self.metrics = {
'execution_times': deque(maxlen=100),
'cpu_percentages': deque(maxlen=100),
'memory_usage': deque(maxlen=100),
'success_rates': deque(maxlen=100)
}
self.start_time = time.time()

def record_execution_time(self, execution_time: float):
"""
Record execution time for a step
"""
self.metrics['execution_times'].append(execution_time)

def record_system_metrics(self):
"""
Record system performance metrics
"""
cpu_percent = self.process.cpu_percent()
memory_percent = self.process.memory_percent()

self.metrics['cpu_percentages'].append(cpu_percent)
self.metrics['memory_usage'].append(memory_percent)

def record_success_rate(self, success: bool):
"""
Record success/failure of an execution
"""
self.metrics['success_rates'].append(1.0 if success else 0.0)

def get_performance_summary(self) -> Dict[str, Any]:
"""
Get performance summary statistics
"""
summary = {
'uptime': time.time() - self.start_time,
'current_cpu': self.process.cpu_percent(),
'current_memory': self.process.memory_percent(),
'process_id': self.process.pid
}

if self.metrics['execution_times']:
summary['avg_execution_time'] = statistics.mean(self.metrics['execution_times'])
summary['min_execution_time'] = min(self.metrics['execution_times'])
summary['max_execution_time'] = max(self.metrics['execution_times'])

if self.metrics['cpu_percentages']:
summary['avg_cpu'] = statistics.mean(self.metrics['cpu_percentages'])

if self.metrics['memory_usage']:
summary['avg_memory'] = statistics.mean(self.metrics['memory_usage'])

if self.metrics['success_rates']:
summary['success_rate'] = statistics.mean(self.metrics['success_rates'])

return summary

class MonitoredExecutionManager(RobustExecutionManager):
def __init__(self):
super().__init__()
self.performance_monitor = ExecutionPerformanceMonitor()

def execute_step_with_monitoring(self, step: Dict[str, Any]) -> bool:
"""
Execute a step with performance monitoring
"""
start_time = time.time()

# Record system metrics
self.performance_monitor.record_system_metrics()

# Execute the step
success = self.execute_step_with_retry(step)

# Record execution time
execution_time = time.time() - start_time
self.performance_monitor.record_execution_time(execution_time)

# Record success rate
self.performance_monitor.record_success_rate(success)

return success

def get_performance_report(self):
"""
Get performance report
"""
return self.performance_monitor.get_performance_summary()

Hands-on Exercise

Create a complete ROS 2 execution system for your robot that:

  1. Implements action clients for navigation and manipulation
  2. Creates behavior trees or state machines for complex tasks
  3. Integrates with your LLM planning system
  4. Includes error handling and recovery mechanisms
  5. Provides performance monitoring and feedback

This exercise will help you build a robust execution framework that can reliably execute plans generated by AI systems.