Lab 07

This lab page has not yet been ported to ROS 2. It will be updated shortly.

During this lab, you should learn how to work with action servers and clients.

ROS action server and client

The ROS action servers/clients provide the ability to execute and interface preemptable tasks that can take a long time to execute. It also allows you to get a regular feedback from tasks that are currently being executed. Examples of such a task can be moving a robot to a target location or following a given path with a robot. You can check out the official python tutorial for writing action server and action client.

Action specification

The action specification have three parts, the goal (similar to services' request), the results (similar to services' response), and the feedback messages to enable reporting on the action run. The action server and action clients can be created using rclpy.action library in python.

The definition of an example action GoToPosition.action can look as follows:

#goal definition
geometry_msgs/Point position
---
#result definition
bool success
string message
---
#feedback
float32 position_error
float32 orientation_error
string message

Action server

The action server part listens for a goal message and once the goal is received, it starts executing procedures to fulfill the given goal. While working on finishing the goal, the action server can periodically report the current state of goal processing through a feedback message, but also has to check if it was not stopped through preemption request. Upon finishing or failing to fulfill the goal, the action server sends its result to the client. The action server code could look like this:

import time
import rclpy
from rclpy.node import Node
from rclpy.action import ActionServer, GoalResponse
from rclpy.executors import ExternalShutdownException
 
from your_package.action import GoToPosition
 
 
class GoToPositionActionServer(Node):
 
    def __init__(self):
        super().__init__('goto_position_action_server')
 
        self._action_server = ActionServer(self, GoToPosition, 'goto_position', 
        execute_callback=self.execute_callback, goal_callback=self.goal_callback)
 
    def execute_callback(self, goal_handle):
        self.get_logger().info('Executing goal...')
 
        success = True
        rate_hz = 10.0
        period = 1.0 / rate_hz
 
        feedback = GoToPosition.Feedback()
 
        while rclpy.ok():
 
            # goal canceled
            if goal_handle.is_cancel_requested:
                self.get_logger().info('Goal canceled')
                goal_handle.canceled()
 
                result = GoToPosition.Result()
                result.success = False
                result.message = "goal canceled"
                return result
 
            .... # do something now, compute position and orientation error
 
            feedback.message = "still working on it"
            feedback.position_error = ...
            feedback.orientation_error = ...
 
            goal_handle.publish_feedback(feedback)
 
            # check finish/fail conditions
            if #finished or #failed:
                success = True/False # based on finished or failed
                break # break the action loop
 
            time.sleep(period)
 
        # set final state before returning
        result = GoToPosition.Result()
        result.success = success
 
        if success:
            result.message = "finished successfully"
            goal_handle.succeed()
        else:
            result.message = "failed miserably"
            goal_handle.abort()
 
        return result
 
    def goal_callback(self, goal_request):
        self.get_logger().info('Received goal request')
 
        position_invalid = ...
 
        if position_invalid:
            self.get_logger().warn('Rejecting goal: position invalid')
            return GoalResponse.REJECT
 
        # Otherwise accept
        self.get_logger().info('Goal accepted')
        return GoalResponse.ACCEPT
 
def main(args=None):
    try:
        with rclpy.init(args=args):
            node = GoToPositionActionServer()
            rclpy.spin(node)
    except (KeyboardInterrupt, ExternalShutdownException):
        pass
 
 
if __name__ == '__main__':
    main()

Action client

The action client can be used to interface the action server efficiently. A simple client can look like this:

import rclpy
from rclpy.node import Node
from rclpy.action import ActionClient
from rclpy.executors import ExternalShutdownException
 
from your_package.action import GoToPosition
 
class GoToPositionActionClient(Node):
 
    def __init__(self):
        super().__init__('goto_position_action_client')
 
        self._action_client = ActionClient(self, GoToPosition, 'goto_position')
 
    def send_goal(self):
        # Wait for server
        self.get_logger().info('Waiting for action server...')
        self._action_client.wait_for_server()
 
        # Create goal
        goal_msg = GoToPosition.Goal()
        goal_msg.position = ...
 
        self.get_logger().info('Sending goal...')
 
        # Send goal asynchronously
        self._send_goal_future = self._action_client.send_goal_async(goal_msg, feedback_callback=self.feedback_callback)
 
        self._send_goal_future.add_done_callback(self.goal_response_callback)
 
    # Called when server accepts/rejects goal
    def goal_response_callback(self, future):
        goal_handle = future.result()
 
        if not goal_handle.accepted:
            self.get_logger().info('Goal rejected')
            return
 
        self.get_logger().info('Goal accepted')
 
        # Request result
        self._get_result_future = goal_handle.get_result_async()
        self._get_result_future.add_done_callback(self.get_result_callback)
 
    # Feedback callback
    def feedback_callback(self, feedback_msg):
        feedback = feedback_msg.feedback
        self.get_logger().info(f'Feedback: {feedback.message}, 'f'pos_err={feedback.position_error},
        'f'ori_err={feedback.orientation_error}')
 
    # Result callback
    def get_result_callback(self, future):
        result = future.result().result
        status = future.result().status
 
        self.get_logger().info(f'Result: success={result.success}')
        self.get_logger().info(f'Message: {result.message}')
        self.get_logger().info(f'Status: {status}')
 
        # Shutdown after receiving result
        rclpy.shutdown()
 
 
def main(args=None):
    try:
        rclpy.init(args=args)
 
        node = GoToPositionActionClient()
        node.send_goal()
 
        rclpy.spin(node)
 
    except (KeyboardInterrupt, ExternalShutdownException):
        pass
 
 
if __name__ == '__main__':
    main()

Homework 5 assignment

Read and try to understand the assignment of the homework HW5.

courses/aro/tutorials/lab07.txt · Last modified: 2026/03/30 11:39 by kratkvit