Python Control with rclpy
This section focuses on controlling robots using Python with the rclpy library, which is the Python client library for ROS 2. You'll learn how to create nodes, handle parameters, and control robot hardware from Python scripts.
Introduction to rclpy
rclpy provides Python bindings for ROS 2, allowing you to write ROS 2 nodes in Python. It's particularly useful for rapid prototyping, testing, and applications where Python's ecosystem provides advantages.
Basic Node Structure
Every rclpy node follows a similar structure:
import rclpy
from rclpy.node import Node
class RobotController(Node):
def __init__(self):
super().__init__('robot_controller')
# Initialize publishers, subscribers, services, etc.
self.get_logger().info('Robot Controller node initialized')
def main(args=None):
rclpy.init(args=args)
node = RobotController()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Working with Parameters
Parameters allow you to configure your node without recompiling:
import rclpy
from rclpy.node import Node
class ParameterNode(Node):
def __init__(self):
super().__init__('parameter_node')
# Declare parameters with default values
self.declare_parameter('robot_name', 'humanoid_robot')
self.declare_parameter('max_speed', 1.0)
self.declare_parameter('safety_distance', 0.5)
# Access parameter values
self.robot_name = self.get_parameter('robot_name').value
self.max_speed = self.get_parameter('max_speed').value
self.safety_distance = self.get_parameter('safety_distance').value
self.get_logger().info(f'Robot: {self.robot_name}, Max Speed: {self.max_speed}')
def main(args=None):
rclpy.init(args=args)
node = ParameterNode()
rclpy.spin_once(node, timeout_sec=1) # Run once to log parameters
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Robot Movement Control
Here's an example of controlling a robot's movement using Twist messages:
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist
class MovementController(Node):
def __init__(self):
super().__init__('movement_controller')
# Create publisher for cmd_vel topic
self.cmd_vel_publisher = self.create_publisher(Twist, 'cmd_vel', 10)
# Create a timer to send commands periodically
self.timer = self.create_timer(0.1, self.send_command)
# Initialize movement parameters
self.linear_speed = 0.5 # m/s
self.angular_speed = 0.5 # rad/s
self.get_logger().info('Movement Controller initialized')
def send_command(self):
msg = Twist()
# Set linear and angular velocities
msg.linear.x = self.linear_speed
msg.angular.z = self.angular_speed
self.cmd_vel_publisher.publish(msg)
def main(args=None):
rclpy.init(args=args)
controller = MovementController()
try:
rclpy.spin(controller)
except KeyboardInterrupt:
controller.get_logger().info('Stopping movement controller')
finally:
# Stop the robot before shutting down
stop_msg = Twist()
controller.cmd_vel_publisher.publish(stop_msg)
controller.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Joint Control for Humanoid Robots
For humanoid robots, you'll often need to control individual joints:
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import JointState
import math
class JointController(Node):
def __init__(self):
super().__init__('joint_controller')
# Publisher for joint commands
self.joint_publisher = self.create_publisher(JointState, 'joint_commands', 10)
# Timer for sending joint commands
self.timer = self.create_timer(0.05, self.send_joint_commands)
# Define joint names for a humanoid robot
self.joint_names = [
'left_hip_joint', 'left_knee_joint', 'left_ankle_joint',
'right_hip_joint', 'right_knee_joint', 'right_ankle_joint',
'left_shoulder_joint', 'left_elbow_joint', 'left_wrist_joint',
'right_shoulder_joint', 'right_elbow_joint', 'right_wrist_joint'
]
self.time = 0.0
self.get_logger().info('Joint Controller initialized')
def send_joint_commands(self):
msg = JointState()
msg.name = self.joint_names
msg.position = []
# Generate sinusoidal joint movements
for i, joint_name in enumerate(self.joint_names):
# Create different movement patterns for different joint groups
if 'hip' in joint_name:
position = 0.2 * math.sin(self.time * 2.0)
elif 'knee' in joint_name:
position = 0.3 * math.sin(self.time * 2.0 + math.pi/2)
elif 'shoulder' in joint_name:
position = 0.5 * math.sin(self.time * 1.5)
else:
position = 0.1 * math.sin(self.time * 1.0)
msg.position.append(position)
msg.header.stamp = self.get_clock().now().to_msg()
self.joint_publisher.publish(msg)
self.time += 0.05
def main(args=None):
rclpy.init(args=args)
controller = JointController()
try:
rclpy.spin(controller)
except KeyboardInterrupt:
controller.get_logger().info('Stopping joint controller')
finally:
controller.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Error Handling and Safety
When controlling robots, it's crucial to implement proper error handling:
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist
from sensor_msgs.msg import LaserScan
import threading
import time
class SafeController(Node):
def __init__(self):
super().__init__('safe_controller')
self.cmd_vel_publisher = self.create_publisher(Twist, 'cmd_vel', 10)
self.scan_subscriber = self.create_subscription(
LaserScan, 'scan', self.scan_callback, 10)
self.safety_distance = 0.5 # meters
self.obstacle_detected = False
self.desired_linear_speed = 0.5
self.desired_angular_speed = 0.0
# Timer for control loop
self.control_timer = self.create_timer(0.1, self.control_loop)
self.get_logger().info('Safe Controller initialized')
def scan_callback(self, msg):
# Check for obstacles in front of the robot
if len(msg.ranges) > 0:
# Check the front 30 degrees
front_ranges = msg.ranges[:len(msg.ranges)//12] + msg.ranges[-len(msg.ranges)//12:]
min_distance = min([r for r in front_ranges if r > 0 and not r > 10], default=float('inf'))
self.obstacle_detected = min_distance < self.safety_distance
if self.obstacle_detected:
self.get_logger().warn(f'Obstacle detected at {min_distance:.2f}m')
def control_loop(self):
cmd_msg = Twist()
if self.obstacle_detected:
# Stop the robot when obstacle is detected
cmd_msg.linear.x = 0.0
cmd_msg.angular.z = 0.0
self.get_logger().info('Obstacle detected - stopping robot')
else:
# Apply desired velocities
cmd_msg.linear.x = self.desired_linear_speed
cmd_msg.angular.z = self.desired_angular_speed
self.cmd_vel_publisher.publish(cmd_msg)
def main(args=None):
rclpy.init(args=args)
controller = SafeController()
try:
rclpy.spin(controller)
except KeyboardInterrupt:
controller.get_logger().info('Shutting down safe controller')
finally:
# Always stop the robot before exiting
stop_msg = Twist()
controller.cmd_vel_publisher.publish(stop_msg)
controller.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Hands-on Exercise
Create a Python node that implements a simple state machine for a humanoid robot:
- Define states: IDLE, WALKING, TURNING, STOPPING
- Implement transitions based on sensor input
- Control the robot to move forward until it detects an obstacle
- When an obstacle is detected, turn in a random direction
- Resume forward motion after turning
This exercise will help you understand how to implement robot control logic using Python and rclpy.