ROS 2 Execution Systems
This section covers implementing execution systems for robotics using ROS 2 (Robot Operating System 2). You'll learn to create robust execution frameworks that can execute plans generated by LLMs and other planning systems.
Understanding ROS 2 Execution
ROS 2 execution systems are responsible for:
- Converting high-level plans into executable robot actions
- Managing action execution and monitoring
- Handling failures and recovery
- Coordinating multiple robot subsystems
- Providing feedback to planning systems
Key Components
- Action Clients: Interface with action servers
- Action Servers: Execute specific robot behaviors
- State Machines: Coordinate complex execution sequences
- Monitoring Systems: Track execution progress
- Recovery Mechanisms: Handle failures and exceptions
Implementing Basic ROS 2 Actions
Navigation Action Client
import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
from nav2_msgs.action import NavigateToPose
from geometry_msgs.msg import PoseStamped
from tf2_ros import TransformException
from tf2_ros.buffer import Buffer
from tf2_ros.transform_listener import TransformListener
import time
class NavigationActionClient(Node):
def __init__(self):
super().__init__('navigation_action_client')
# Create action client for navigation
self._action_client = ActionClient(self, NavigateToPose, 'navigate_to_pose')
# TF buffer for coordinate transformations
self.tf_buffer = Buffer()
self.tf_listener = TransformListener(self.tf_buffer, self)
def send_goal(self, x, y, theta):
"""
Send navigation goal to the action server
"""
# Wait for the action server to be available
self._action_client.wait_for_server()
# Create goal message
goal_msg = NavigateToPose.Goal()
goal_msg.pose.header.frame_id = 'map'
goal_msg.pose.header.stamp = self.get_clock().now().to_msg()
# Set the target pose
goal_msg.pose.pose.position.x = x
goal_msg.pose.pose.position.y = y
goal_msg.pose.pose.position.z = 0.0
# Convert theta (yaw) to quaternion
import math
from geometry_msgs.msg import Quaternion
goal_msg.pose.pose.orientation = Quaternion(
x=0.0,
y=0.0,
z=math.sin(theta / 2.0),
w=math.cos(theta / 2.0)
)
# Send the goal
self._send_goal_future = self._action_client.send_goal_async(
goal_msg,
feedback_callback=self.feedback_callback
)
self._send_goal_future.add_done_callback(self.goal_response_callback)
def goal_response_callback(self, future):
"""
Handle goal response from the action server
"""
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().info('Goal rejected')
return
self.get_logger().info('Goal accepted')
self._get_result_future = goal_handle.get_result_async()
self._get_result_future.add_done_callback(self.get_result_callback)
def get_result_callback(self, future):
"""
Handle result from the action server
"""
result = future.result().result
self.get_logger().info(f'Result: {result}')
rclpy.shutdown()
def feedback_callback(self, feedback_msg):
"""
Handle feedback during navigation
"""
feedback = feedback_msg.feedback
self.get_logger().info(f'Feedback: {feedback.current_pose}')
def main(args=None):
rclpy.init(args=args)
action_client = NavigationActionClient()
# Send a navigation goal
action_client.send_goal(1.0, 2.0, 0.0) # Navigate to (1, 2) with 0 rotation
rclpy.spin(action_client)
if __name__ == '__main__':
main()
Manipulation Action Client
import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node
from std_msgs.msg import String
from geometry_msgs.msg import Pose
import json
class ManipulationActionClient(Node):
def __init__(self):
super().__init__('manipulation_action_client')
# Publishers for manipulation commands
self.grasp_pub = self.create_publisher(String, '/manipulation/grasp', 10)
self.place_pub = self.create_publisher(String, '/manipulation/place', 10)
self.move_pub = self.create_publisher(String, '/manipulation/move', 10)
# State tracking
self.current_object = None
self.is_holding_object = False
def grasp_object(self, object_name, object_pose):
"""
Execute grasp action
"""
command = {
'action': 'grasp',
'object': object_name,
'pose': {
'position': {
'x': object_pose.position.x,
'y': object_pose.position.y,
'z': object_pose.position.z
},
'orientation': {
'x': object_pose.orientation.x,
'y': object_pose.orientation.y,
'z': object_pose.orientation.z,
'w': object_pose.orientation.w
}
},
'timestamp': self.get_clock().now().nanoseconds / 1e9
}
msg = String()
msg.data = json.dumps(command)
self.grasp_pub.publish(msg)
self.get_logger().info(f'Grasping object: {object_name}')
self.current_object = object_name
self.is_holding_object = True
def place_object(self, target_pose):
"""
Execute place action
"""
command = {
'action': 'place',
'target_pose': {
'position': {
'x': target_pose.position.x,
'y': target_pose.position.y,
'z': target_pose.position.z
},
'orientation': {
'x': target_pose.orientation.x,
'y': target_pose.orientation.y,
'z': target_pose.orientation.z,
'w': target_pose.orientation.w
}
},
'timestamp': self.get_clock().now().nanoseconds / 1e9
}
msg = String()
msg.data = json.dumps(command)
self.place_pub.publish(msg)
self.get_logger().info('Placing object')
self.current_object = None
self.is_holding_object = False
def move_to_pose(self, target_pose):
"""
Move manipulator to target pose
"""
command = {
'action': 'move_to_pose',
'target_pose': {
'position': {
'x': target_pose.position.x,
'y': target_pose.position.y,
'z': target_pose.position.z
},
'orientation': {
'x': target_pose.orientation.x,
'y': target_pose.orientation.y,
'z': target_pose.orientation.z,
'w': target_pose.orientation.w
}
},
'timestamp': self.get_clock().now().nanoseconds / 1e9
}
msg = String()
msg.data = json.dumps(command)
self.move_pub.publish(msg)
self.get_logger().info('Moving to target pose')
Advanced Execution Systems
Behavior Tree Implementation
from enum import Enum
import time
from typing import Any, Dict, List, Optional
class NodeStatus(Enum):
SUCCESS = 1
FAILURE = 2
RUNNING = 3
class BehaviorTreeNode:
def __init__(self, name: str):
self.name = name
self.status = NodeStatus.RUNNING
def tick(self) -> NodeStatus:
"""
Execute one cycle of the behavior
"""
raise NotImplementedError
class ActionNode(BehaviorTreeNode):
def __init__(self, name: str, action_func):
super().__init__(name)
self.action_func = action_func
self.is_running = False
def tick(self) -> NodeStatus:
if not self.is_running:
# Start the action
self.is_running = True
result = self.action_func()
if result:
self.is_running = False
return NodeStatus.SUCCESS
else:
return NodeStatus.RUNNING
else:
# Action is running, check if it's completed
# In a real implementation, this would check the action status
return NodeStatus.RUNNING
class SequenceNode(BehaviorTreeNode):
def __init__(self, name: str, children: List[BehaviorTreeNode]):
super().__init__(name)
self.children = children
self.current_child_idx = 0
def tick(self) -> NodeStatus:
while self.current_child_idx < len(self.children):
child_status = self.children[self.current_child_idx].tick()
if child_status == NodeStatus.FAILURE:
# Reset for next time
self.current_child_idx = 0
return NodeStatus.FAILURE
elif child_status == NodeStatus.RUNNING:
return NodeStatus.RUNNING
elif child_status == NodeStatus.SUCCESS:
# Move to next child
self.current_child_idx += 1
# All children succeeded
self.current_child_idx = 0
return NodeStatus.SUCCESS
class SelectorNode(BehaviorTreeNode):
def __init__(self, name: str, children: List[BehaviorTreeNode]):
super().__init__(name)
self.children = children
self.current_child_idx = 0
def tick(self) -> NodeStatus:
while self.current_child_idx < len(self.children):
child_status = self.children[self.current_child_idx].tick()
if child_status == NodeStatus.SUCCESS:
# Reset for next time
self.current_child_idx = 0
return NodeStatus.SUCCESS
elif child_status == NodeStatus.RUNNING:
return NodeStatus.RUNNING
elif child_status == NodeStatus.FAILURE:
# Try next child
self.current_child_idx += 1
# All children failed
self.current_child_idx = 0
return NodeStatus.FAILURE
class RobotBehaviorTree:
def __init__(self, root_node: BehaviorTreeNode):
self.root = root_node
def execute(self) -> NodeStatus:
"""
Execute one cycle of the behavior tree
"""
return self.root.tick()
# Example usage of behavior tree
def create_pick_and_place_behavior_tree(navigation_client, manipulation_client):
"""
Create a behavior tree for pick and place task
"""
# Define actions
def navigate_to_object():
# Implementation would call navigation client
print("Navigating to object...")
time.sleep(1) # Simulate navigation
return True # Return True when navigation complete
def detect_object():
print("Detecting object...")
time.sleep(0.5)
return True # Return True when object detected
def grasp_object():
print("Grasping object...")
time.sleep(1)
return True # Return True when grasp successful
def navigate_to_target():
print("Navigating to target location...")
time.sleep(1)
return True # Return True when navigation complete
def place_object():
print("Placing object...")
time.sleep(1)
return True # Return True when placement complete
# Create action nodes
navigate_to_obj_node = ActionNode("navigate_to_object", navigate_to_object)
detect_obj_node = ActionNode("detect_object", detect_object)
grasp_obj_node = ActionNode("grasp_object", grasp_object)
navigate_to_target_node = ActionNode("navigate_to_target", navigate_to_target)
place_obj_node = ActionNode("place_object", place_object)
# Create sequence for pick task
pick_sequence = SequenceNode("pick_sequence", [
navigate_to_obj_node,
detect_obj_node,
grasp_obj_node
])
# Create sequence for place task
place_sequence = SequenceNode("place_sequence", [
navigate_to_target_node,
place_obj_node
])
# Create main sequence
main_sequence = SequenceNode("main_sequence", [
pick_sequence,
place_sequence
])
return RobotBehaviorTree(main_sequence)
State Machine Implementation
from enum import Enum
import time
from typing import Any, Dict
class RobotState(Enum):
IDLE = 1
NAVIGATING = 2
MANIPULATING = 3
PERCEIVING = 4
ERROR = 5
SHUTDOWN = 6
class RobotStateMachine:
def __init__(self, navigation_client, manipulation_client, perception_client):
self.state = RobotState.IDLE
self.navigation_client = navigation_client
self.manipulation_client = manipulation_client
self.perception_client = perception_client
self.current_task = None
self.last_state_change = time.time()
def update(self):
"""
Update the state machine based on current state and conditions
"""
current_time = time.time()
if self.state == RobotState.IDLE:
self.handle_idle_state()
elif self.state == RobotState.NAVIGATING:
self.handle_navigation_state()
elif self.state == RobotState.MANIPULATING:
self.handle_manipulation_state()
elif self.state == RobotState.PERCEIVING:
self.handle_perception_state()
elif self.state == RobotState.ERROR:
self.handle_error_state()
elif self.state == RobotState.SHUTDOWN:
return # Stay in shutdown state
def handle_idle_state(self):
"""
Handle idle state - wait for tasks
"""
# Check for new tasks
# In a real implementation, this would subscribe to task messages
pass
def handle_navigation_state(self):
"""
Handle navigation state
"""
# Check if navigation is complete
# In a real implementation, this would check action server status
if self.is_navigation_complete():
self.set_state(RobotState.IDLE)
def handle_manipulation_state(self):
"""
Handle manipulation state
"""
# Check if manipulation is complete
if self.is_manipulation_complete():
self.set_state(RobotState.IDLE)
def handle_perception_state(self):
"""
Handle perception state
"""
# Check if perception task is complete
if self.is_perception_complete():
self.set_state(RobotState.IDLE)
def handle_error_state(self):
"""
Handle error state
"""
# Try to recover from error
if self.attempt_recovery():
self.set_state(RobotState.IDLE)
else:
# Stay in error state or shutdown
pass
def is_navigation_complete(self) -> bool:
"""
Check if navigation task is complete
"""
# Implementation would check navigation action status
return False
def is_manipulation_complete(self) -> bool:
"""
Check if manipulation task is complete
"""
# Implementation would check manipulation action status
return False
def is_perception_complete(self) -> bool:
"""
Check if perception task is complete
"""
# Implementation would check perception task status
return False
def attempt_recovery(self) -> bool:
"""
Attempt to recover from error state
"""
# Implementation would include recovery procedures
return True
def set_state(self, new_state: RobotState):
"""
Set new state and perform any required state change actions
"""
if self.state != new_state:
self.get_logger().info(f'State change: {self.state} -> {new_state}')
self.state = new_state
self.last_state_change = time.time()
def get_logger(self):
"""
Simple logger for the state machine
"""
class Logger:
def info(self, msg):
print(f"INFO: {msg}")
def error(self, msg):
print(f"ERROR: {msg}")
return Logger()
def request_navigation(self, x: float, y: float, theta: float):
"""
Request navigation to a specific pose
"""
if self.state == RobotState.IDLE:
self.navigation_client.send_goal(x, y, theta)
self.set_state(RobotState.NAVIGATING)
else:
self.get_logger().error(f'Cannot navigate from state: {self.state}')
def request_manipulation(self, action: str, params: Dict[str, Any]):
"""
Request manipulation action
"""
if self.state == RobotState.IDLE:
if action == 'grasp':
self.manipulation_client.grasp_object(
params['object_name'],
params['object_pose']
)
self.set_state(RobotState.MANIPULATING)
elif action == 'place':
self.manipulation_client.place_object(params['target_pose'])
self.set_state(RobotState.MANIPULATING)
else:
self.get_logger().error(f'Cannot manipulate from state: {self.state}')
Integration with Planning Systems
Execution Manager for LLM Plans
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from action_msgs.msg import GoalStatus
import json
import threading
import time
class PlanExecutionManager(Node):
def __init__(self):
super().__init__('plan_execution_manager')
# Initialize clients
self.navigation_client = NavigationActionClient()
self.manipulation_client = ManipulationActionClient()
# Publishers and subscribers
self.plan_sub = self.create_subscription(
String, '/llm_plan/result', self.plan_callback, 10)
self.execution_feedback_pub = self.create_publisher(
String, '/execution_feedback', 10)
self.status_pub = self.create_publisher(String, '/execution_status', 10)
# Execution tracking
self.current_plan = None
self.current_step = 0
self.execution_active = False
self.execution_thread = None
self.get_logger().info('Plan Execution Manager initialized')
def plan_callback(self, msg):
"""
Handle incoming plans from LLM planner
"""
try:
plan_data = json.loads(msg.data)
plan = plan_data.get('plan', {})
task = plan_data.get('task', 'Unknown task')
self.get_logger().info(f'Received plan for task: {task}')
self.get_logger().info(f'Plan has {len(plan.get("steps", []))} steps')
# Start execution of the plan
self.execute_plan(plan)
except json.JSONDecodeError:
self.get_logger().error('Invalid JSON in plan message')
except Exception as e:
self.get_logger().error(f'Error processing plan: {e}')
def execute_plan(self, plan: Dict[str, Any]):
"""
Execute the given plan
"""
if self.execution_active:
self.get_logger().warn('Plan execution already active, stopping current plan')
self.stop_execution()
self.current_plan = plan
self.current_step = 0
self.execution_active = True
# Start execution in a separate thread
self.execution_thread = threading.Thread(target=self._execute_plan_thread)
self.execution_thread.start()
def _execute_plan_thread(self):
"""
Execute the plan in a separate thread
"""
steps = self.current_plan.get('steps', [])
for i, step in enumerate(steps):
if not self.execution_active:
break
self.current_step = i
self.get_logger().info(f'Executing step {i + 1}/{len(steps)}: {step.get("action", "unknown")}')
# Execute the step
success = self.execute_step(step)
# Publish feedback
feedback_msg = String()
feedback_msg.data = json.dumps({
'step_id': f"step_{i}",
'status': 'success' if success else 'failure',
'details': {'step_number': i, 'action': step.get('action')},
'timestamp': time.time()
})
self.execution_feedback_pub.publish(feedback_msg)
if not success:
self.get_logger().error(f'Step {i} failed')
break
self.execution_active = False
self.get_logger().info('Plan execution completed')
def execute_step(self, step: Dict[str, Any]) -> bool:
"""
Execute a single step of the plan
"""
action = step.get('action', '').lower()
parameters = step.get('parameters', {})
try:
if 'navigate' in action or 'move to' in action or 'go to' in action:
return self.execute_navigation_step(parameters)
elif 'grasp' in action or 'pick' in action or 'take' in action:
return self.execute_manipulation_step(action, parameters)
elif 'place' in action or 'put' in action:
return self.execute_placement_step(parameters)
elif 'detect' in action or 'find' in action or 'look for' in action:
return self.execute_perception_step(parameters)
else:
self.get_logger().warn(f'Unknown action: {action}')
return False
except Exception as e:
self.get_logger().error(f'Error executing step: {e}')
return False
def execute_navigation_step(self, parameters: Dict[str, Any]) -> bool:
"""
Execute navigation step
"""
try:
x = parameters.get('x', 0.0)
y = parameters.get('y', 0.0)
theta = parameters.get('theta', 0.0)
self.get_logger().info(f'Navigating to ({x}, {y}, {theta})')
# Call navigation client
self.navigation_client.send_goal(x, y, theta)
# Wait for completion (in a real system, this would be asynchronous)
time.sleep(2) # Simulate navigation time
return True
except Exception as e:
self.get_logger().error(f'Navigation error: {e}')
return False
def execute_manipulation_step(self, action: str, parameters: Dict[str, Any]) -> bool:
"""
Execute manipulation step
"""
try:
if 'grasp' in action or 'pick' in action or 'take' in action:
object_name = parameters.get('object', 'unknown')
object_pose = parameters.get('pose', {})
# Convert dict pose to ROS Pose message
from geometry_msgs.msg import Pose
ros_pose = Pose()
ros_pose.position.x = object_pose.get('x', 0.0)
ros_pose.position.y = object_pose.get('y', 0.0)
ros_pose.position.z = object_pose.get('z', 0.0)
self.manipulation_client.grasp_object(object_name, ros_pose)
# Wait for completion
time.sleep(2)
return True
else:
return False
except Exception as e:
self.get_logger().error(f'Manipulation error: {e}')
return False
def execute_placement_step(self, parameters: Dict[str, Any]) -> bool:
"""
Execute placement step
"""
try:
target_pose = parameters.get('target_pose', {})
# Convert dict pose to ROS Pose message
from geometry_msgs.msg import Pose
ros_pose = Pose()
ros_pose.position.x = target_pose.get('x', 0.0)
ros_pose.position.y = target_pose.get('y', 0.0)
ros_pose.position.z = target_pose.get('z', 0.0)
self.manipulation_client.place_object(ros_pose)
# Wait for completion
time.sleep(2)
return True
except Exception as e:
self.get_logger().error(f'Placement error: {e}')
return False
def execute_perception_step(self, parameters: Dict[str, Any]) -> bool:
"""
Execute perception step
"""
try:
target_object = parameters.get('target', 'unknown')
self.get_logger().info(f'Detecting object: {target_object}')
# In a real system, this would call perception services
time.sleep(1) # Simulate perception time
return True
except Exception as e:
self.get_logger().error(f'Perception error: {e}')
return False
def stop_execution(self):
"""
Stop current plan execution
"""
self.execution_active = False
if self.execution_thread:
self.execution_thread.join(timeout=1.0)
def destroy_node(self):
"""
Clean up before node destruction
"""
self.stop_execution()
super().destroy_node()
Error Handling and Recovery
Robust Execution with Error Handling
class RobustExecutionManager(PlanExecutionManager):
def __init__(self):
super().__init__()
# Add error handling components
self.max_retries = 3
self.retry_delay = 1.0
self.error_recovery_enabled = True
# Error statistics
self.error_stats = {
'total_errors': 0,
'recovered_errors': 0,
'fatal_errors': 0
}
def execute_step_with_retry(self, step: Dict[str, Any]) -> bool:
"""
Execute a step with retry logic
"""
action = step.get('action', '').lower()
parameters = step.get('parameters', {})
for attempt in range(self.max_retries):
self.get_logger().info(f'Attempt {attempt + 1} for action: {action}')
try:
success = self.execute_step(step)
if success:
return True
else:
self.get_logger().warn(f'Step failed on attempt {attempt + 1}')
if attempt < self.max_retries - 1:
time.sleep(self.retry_delay * (2 ** attempt)) # Exponential backoff
except Exception as e:
self.get_logger().error(f'Exception on attempt {attempt + 1}: {e}')
self.error_stats['total_errors'] += 1
if attempt < self.max_retries - 1:
time.sleep(self.retry_delay * (2 ** attempt))
# All retries failed
self.error_stats['fatal_errors'] += 1
self.get_logger().error(f'All retries failed for action: {action}')
return False
def execute_plan_with_recovery(self, plan: Dict[str, Any]):
"""
Execute plan with error recovery capabilities
"""
if self.execution_active:
self.stop_execution()
self.current_plan = plan
self.current_step = 0
self.execution_active = True
self.execution_thread = threading.Thread(
target=self._execute_plan_with_recovery_thread
)
self.execution_thread.start()
def _execute_plan_with_recovery_thread(self):
"""
Execute plan with recovery in a separate thread
"""
steps = self.current_plan.get('steps', [])
for i, step in enumerate(steps):
if not self.execution_active:
break
self.current_step = i
self.get_logger().info(f'Executing step {i + 1}/{len(steps)}: {step.get("action", "unknown")}')
# Execute the step with retry and recovery
success = self.execute_step_with_retry(step)
if not success:
# Try to recover from failure
if self.error_recovery_enabled:
recovered = self.attempt_recovery(step, i)
if recovered:
self.error_stats['recovered_errors'] += 1
self.get_logger().info(f'Recovered from step {i} failure')
continue # Continue with next step
else:
self.get_logger().error(f'Failed to recover from step {i} failure')
break
else:
self.get_logger().error(f'Step {i} failed and recovery disabled')
break
# Publish success feedback
feedback_msg = String()
feedback_msg.data = json.dumps({
'step_id': f"step_{i}",
'status': 'success',
'details': {'step_number': i, 'action': step.get('action')},
'timestamp': time.time()
})
self.execution_feedback_pub.publish(feedback_msg)
self.execution_active = False
self.get_logger().info('Plan execution completed')
def attempt_recovery(self, failed_step: Dict[str, Any], step_index: int) -> bool:
"""
Attempt to recover from a failed step
"""
action = failed_step.get('action', '').lower()
# Implement recovery strategies based on action type
if 'navigate' in action:
return self.recover_navigation_failure(failed_step, step_index)
elif 'grasp' in action or 'manipulat' in action:
return self.recover_manipulation_failure(failed_step, step_index)
else:
return self.generic_recovery(failed_step, step_index)
def recover_navigation_failure(self, failed_step: Dict[str, Any], step_index: int) -> bool:
"""
Recovery strategy for navigation failures
"""
try:
self.get_logger().info('Attempting navigation recovery...')
# Try alternative navigation approach
# This could involve:
# - Clearing costmaps
# - Re-localizing
# - Using different planner
# - Manual intervention request
# For now, simulate recovery
time.sleep(1)
return True
except Exception as e:
self.get_logger().error(f'Navigation recovery failed: {e}')
return False
def recover_manipulation_failure(self, failed_step: Dict[str, Any], step_index: int) -> bool:
"""
Recovery strategy for manipulation failures
"""
try:
self.get_logger().info('Attempting manipulation recovery...')
# Try alternative manipulation approach
# This could involve:
# - Re-approaching the object
# - Different grasp strategy
# - Requesting human assistance
# For now, simulate recovery
time.sleep(1)
return True
except Exception as e:
self.get_logger().error(f'Manipulation recovery failed: {e}')
return False
def generic_recovery(self, failed_step: Dict[str, Any], step_index: int) -> bool:
"""
Generic recovery for unknown failure types
"""
try:
self.get_logger().info('Attempting generic recovery...')
# Generic recovery steps
time.sleep(1)
return True
except Exception as e:
self.get_logger().error(f'Generic recovery failed: {e}')
return False
Performance Monitoring
Execution Performance Monitoring
import psutil
import time
from collections import deque
import statistics
class ExecutionPerformanceMonitor:
def __init__(self):
self.process = psutil.Process()
self.metrics = {
'execution_times': deque(maxlen=100),
'cpu_percentages': deque(maxlen=100),
'memory_usage': deque(maxlen=100),
'success_rates': deque(maxlen=100)
}
self.start_time = time.time()
def record_execution_time(self, execution_time: float):
"""
Record execution time for a step
"""
self.metrics['execution_times'].append(execution_time)
def record_system_metrics(self):
"""
Record system performance metrics
"""
cpu_percent = self.process.cpu_percent()
memory_percent = self.process.memory_percent()
self.metrics['cpu_percentages'].append(cpu_percent)
self.metrics['memory_usage'].append(memory_percent)
def record_success_rate(self, success: bool):
"""
Record success/failure of an execution
"""
self.metrics['success_rates'].append(1.0 if success else 0.0)
def get_performance_summary(self) -> Dict[str, Any]:
"""
Get performance summary statistics
"""
summary = {
'uptime': time.time() - self.start_time,
'current_cpu': self.process.cpu_percent(),
'current_memory': self.process.memory_percent(),
'process_id': self.process.pid
}
if self.metrics['execution_times']:
summary['avg_execution_time'] = statistics.mean(self.metrics['execution_times'])
summary['min_execution_time'] = min(self.metrics['execution_times'])
summary['max_execution_time'] = max(self.metrics['execution_times'])
if self.metrics['cpu_percentages']:
summary['avg_cpu'] = statistics.mean(self.metrics['cpu_percentages'])
if self.metrics['memory_usage']:
summary['avg_memory'] = statistics.mean(self.metrics['memory_usage'])
if self.metrics['success_rates']:
summary['success_rate'] = statistics.mean(self.metrics['success_rates'])
return summary
class MonitoredExecutionManager(RobustExecutionManager):
def __init__(self):
super().__init__()
self.performance_monitor = ExecutionPerformanceMonitor()
def execute_step_with_monitoring(self, step: Dict[str, Any]) -> bool:
"""
Execute a step with performance monitoring
"""
start_time = time.time()
# Record system metrics
self.performance_monitor.record_system_metrics()
# Execute the step
success = self.execute_step_with_retry(step)
# Record execution time
execution_time = time.time() - start_time
self.performance_monitor.record_execution_time(execution_time)
# Record success rate
self.performance_monitor.record_success_rate(success)
return success
def get_performance_report(self):
"""
Get performance report
"""
return self.performance_monitor.get_performance_summary()
Hands-on Exercise
Create a complete ROS 2 execution system for your robot that:
- Implements action clients for navigation and manipulation
- Creates behavior trees or state machines for complex tasks
- Integrates with your LLM planning system
- Includes error handling and recovery mechanisms
- Provides performance monitoring and feedback
This exercise will help you build a robust execution framework that can reliably execute plans generated by AI systems.