Skip to main content
Version: Next

Execute Task

Service

Execute a task as a service.

import rclpy
from rclpy.node import Node

from clearpath_task_msgs.srv import ExecuteTask


class ExecuteTask(Node):

def __init__(self):
super().__init__('execute_task_client')
self._srv_client = self.create_client(ExecuteTask, 'autonomy/task/execute_as_srv')
while not self._srv_client.wait_for_service(timeout_sec=1.0):
self.get_logger().info('service not available, waiting again...')
self.req = ExecuteTask.Request()

def send_request(self):
# TODO
self.future = self._srv_client.call_async(self.req)
rclpy.spin_until_future_complete(self, self.future)
return self.future.result()

if __name__ == '__main__':
rclpy.init(args=args)
execute_task_client = PauseAutonomy()
response = execute_task_client.send_request()
if response.success:
execute_task_client.get_logger().info('Executing task: '))
else:
execute_task_client.get_logger().error('Failed to execute task: ')

execute_task_client.destroy_node()
rclpy.shutdown()

Action

Execute a task as an action.

import rclpy
from rclpy.action import ActionClient
from rclpy.node import Node

from clearpath_task_msgs.action import ExecuteTask


class ExecuteTask(Node):

def __init__(self):
super().__init__('execute_task_action_client')
self._action_client = ActionClient(self, ExecuteTask, 'autonomy/task/execute_as_action')

def send_goal(self):
goal_msg = ExecuteTask.Goal()

# TODO: populate goal message

self._action_client.wait_for_server()

self._send_goal_future = self._action_client.send_goal_async(goal_msg, feedback_callback=self.feedback_callback)

self._send_goal_future.add_done_callback(self.goal_response_callback)

def goal_response_callback(self, future):
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().info('Goal rejected :(')
return

self.get_logger().info('Goal accepted :)')

self._get_result_future = goal_handle.get_result_async()
self._get_result_future.add_done_callback(self.get_result_callback)

def get_result_callback(self, future):
result = future.result().result
self.get_logger().info('Result: {0}'.format(result.sequence))
rclpy.shutdown()

def feedback_callback(self, feedback_msg):
feedback = feedback_msg.feedback
self.get_logger().info('Received feedback: {0}'.format(feedback.partial_sequence))


if __name__ == '__main__':
rclpy.init(args=args)
action_client = ExecuteTask()
action_client.send_goal()
rclpy.spin(action_client)