Writing Your Own ROS 2 Nodes for AD-R1M
This guide walks you through creating custom ROS 2 nodes for the AD-R1M robot platform, using the lift control service as a practical example.
Development Environment Setup
Setting Up Your Workspace
Access the development container:
docker run -it \ --network=host \ --ipc=host \ --pid=host \ --privileged \ -v /home/analog/ros_data:/ros_data \ -e RMW_IMPLEMENTATION=rmw_zenoh_cpp \ -e ROBOT_NAMESPACE=ad_r1m_0 \ working bash
Source the ROS 2 workspace:
source /ros2_ws/install/setup.bash
Create a new package (for custom nodes):
cd /ros2_ws/src ros2 pkg create --build-type ament_python my_robot_nodes --dependencies rclpy std_msgs geometry_msgs
Package Structure
A typical ROS 2 Python package structure:
my_robot_nodes/
├── my_robot_nodes/
│ ├── __init__.py
│ ├── my_node.py
│ └── my_service.py
├── resource/
│ └── my_robot_nodes
├── test/
├── package.xml
└── setup.py
Example: Creating a Lift Control Node
The AD-R1M includes a lift mechanism that can be controlled via ROS 2. This example demonstrates creating a service client and server for lift control.
Lift Service Definition
First, define the service interface. In the adrd_demo_ros2 package, the lift service is defined as:
# LiftGPIO.srv
int32 command
---
bool success
string message
Commands:
0= Hold position1= Lift up2= Lift down
Lift Server Node (Python)
#!/usr/bin/env python3
"""
Lift control server node for AD-R1M.
Receives lift commands and controls GPIO.
"""
import rclpy
from rclpy.node import Node
from adrd_demo_ros2.srv import LiftGPIO
import RPi.GPIO as GPIO
class LiftServer(Node):
def __init__(self):
super().__init__('lift_server')
# GPIO setup
self.lift_up_pin = 17
self.lift_down_pin = 27
GPIO.setmode(GPIO.BCM)
GPIO.setup(self.lift_up_pin, GPIO.OUT)
GPIO.setup(self.lift_down_pin, GPIO.OUT)
# Create service
self.srv = self.create_service(
LiftGPIO,
'/elevator_to_robot',
self.lift_callback
)
self.get_logger().info('Lift server ready')
def lift_callback(self, request, response):
command = request.command
if command == 0: # Hold
GPIO.output(self.lift_up_pin, GPIO.LOW)
GPIO.output(self.lift_down_pin, GPIO.LOW)
response.message = "Lift holding"
elif command == 1: # Up
GPIO.output(self.lift_down_pin, GPIO.LOW)
GPIO.output(self.lift_up_pin, GPIO.HIGH)
response.message = "Lift going up"
elif command == 2: # Down
GPIO.output(self.lift_up_pin, GPIO.LOW)
GPIO.output(self.lift_down_pin, GPIO.HIGH)
response.message = "Lift going down"
else:
response.success = False
response.message = f"Unknown command: {command}"
return response
response.success = True
self.get_logger().info(response.message)
return response
def destroy_node(self):
GPIO.cleanup()
super().destroy_node()
def main():
rclpy.init()
node = LiftServer()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Lift Client Node (Python)
#!/usr/bin/env python3
"""
Async lift control client for AD-R1M.
"""
import rclpy
from rclpy.node import Node
from adrd_demo_ros2.srv import LiftGPIO
class LiftClientAsync(Node):
def __init__(self):
super().__init__('lift_client')
self.client = self.create_client(LiftGPIO, '/elevator_to_robot')
while not self.client.wait_for_service(timeout_sec=1.0):
self.get_logger().info('Waiting for lift service...')
def send_command(self, command):
"""
Send lift command asynchronously.
Args:
command: 0=hold, 1=up, 2=down
Returns:
Future object for the service call
"""
request = LiftGPIO.Request()
request.command = command
return self.client.call_async(request)
def main():
rclpy.init()
client = LiftClientAsync()
# Example: Move lift up
future = client.send_command(1)
rclpy.spin_until_future_complete(client, future)
result = future.result()
print(f'Result: {result.success}, {result.message}')
client.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Creating a Publisher Node
Example: Custom Status Publisher
#!/usr/bin/env python3
"""
Publishes custom robot status messages.
"""
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
import json
class StatusPublisher(Node):
def __init__(self):
super().__init__('status_publisher')
self.publisher = self.create_publisher(String, '/robot_status', 10)
self.timer = self.create_timer(1.0, self.publish_status)
self.status = {
'state': 'idle',
'battery': 12.0,
'errors': []
}
def publish_status(self):
msg = String()
msg.data = json.dumps(self.status)
self.publisher.publish(msg)
def update_state(self, new_state):
self.status['state'] = new_state
def main():
rclpy.init()
node = StatusPublisher()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
Creating a Subscriber Node
Example: Velocity Monitor
#!/usr/bin/env python3
"""
Monitors velocity commands and logs statistics.
"""
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist
import math
class VelocityMonitor(Node):
def __init__(self):
super().__init__('velocity_monitor')
self.subscription = self.create_subscription(
Twist,
'/cmd_vel',
self.velocity_callback,
10
)
self.max_linear = 0.0
self.max_angular = 0.0
self.timer = self.create_timer(5.0, self.report_stats)
def velocity_callback(self, msg):
linear = math.sqrt(msg.linear.x**2 + msg.linear.y**2)
angular = abs(msg.angular.z)
self.max_linear = max(self.max_linear, linear)
self.max_angular = max(self.max_angular, angular)
def report_stats(self):
self.get_logger().info(
f'Max velocities - Linear: {self.max_linear:.2f} m/s, '
f'Angular: {self.max_angular:.2f} rad/s'
)
def main():
rclpy.init()
node = VelocityMonitor()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
Creating an Action Server
For long-running tasks, use ROS 2 actions.
Example: Dock Action Server
#!/usr/bin/env python3
"""
Action server for docking behavior.
"""
import rclpy
from rclpy.action import ActionServer
from rclpy.node import Node
from nav2_msgs.action import NavigateToPose
from geometry_msgs.msg import Twist
import time
class DockActionServer(Node):
def __init__(self):
super().__init__('dock_action_server')
self._action_server = ActionServer(
self,
NavigateToPose,
'dock',
self.execute_callback
)
self.cmd_pub = self.create_publisher(Twist, '/cmd_vel', 10)
async def execute_callback(self, goal_handle):
self.get_logger().info('Executing docking...')
feedback_msg = NavigateToPose.Feedback()
# Simulate docking approach
for i in range(10):
if goal_handle.is_cancel_requested:
goal_handle.canceled()
return NavigateToPose.Result()
# Move slowly forward
cmd = Twist()
cmd.linear.x = 0.1
self.cmd_pub.publish(cmd)
feedback_msg.distance_remaining = float(10 - i) / 10.0
goal_handle.publish_feedback(feedback_msg)
time.sleep(0.5)
# Stop
self.cmd_pub.publish(Twist())
goal_handle.succeed()
result = NavigateToPose.Result()
return result
def main():
rclpy.init()
node = DockActionServer()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
Building and Installing Your Package
Update setup.py with your entry points:
entry_points={ 'console_scripts': [ 'lift_server = my_robot_nodes.lift_server:main', 'lift_client = my_robot_nodes.lift_client:main', 'status_publisher = my_robot_nodes.status_publisher:main', ], },
Build the package:
cd /ros2_ws colcon build --packages-select my_robot_nodes source install/setup.bash
Run your node:
ros2 run my_robot_nodes lift_server
Creating a Launch File
# launch/my_nodes.launch.py
from launch import LaunchDescription
from launch_ros.actions import Node
def generate_launch_description():
return LaunchDescription([
Node(
package='my_robot_nodes',
executable='lift_server',
name='lift_server',
output='screen',
),
Node(
package='my_robot_nodes',
executable='status_publisher',
name='status_publisher',
output='screen',
parameters=[{
'publish_rate': 1.0,
}],
),
])
Run with:
ros2 launch my_robot_nodes my_nodes.launch.py
Best Practices
Use namespaces for multi-robot compatibility:
self.create_publisher(Twist, 'cmd_vel', 10) # Relative topic
Handle exceptions gracefully:
try: rclpy.spin(node) except KeyboardInterrupt: pass finally: node.destroy_node() rclpy.shutdown()
Use parameters for configuration:
self.declare_parameter('speed', 0.5) speed = self.get_parameter('speed').value
Log appropriately:
self.get_logger().debug('Debug info') self.get_logger().info('Normal operation') self.get_logger().warn('Warning condition') self.get_logger().error('Error occurred')
Test in isolation before integrating with the full system