13. Custom Action

ในบทนี้เราจะเรียนรู้การสร้าง Action แบบกำหนดเอง (Custom Action) ซึ่งเป็นกลไกการสื่อสารระหว่างโหนด (Node) ที่ออกแบบมาสำหรับ งานที่ใช้เวลานานและต้องรายงานความคืบหน้า (progress) เช่น หุ่นยนต์เคลื่อนที่ไปยังเป้าหมาย, แขนกลกำลังหยิบวัตถุ, หรือระบบนับเวลาการทำงาน

13.1. สร้าง ไฟล์ .action

เปิด VS Code ขึ้นมา สร้างโฟลเดอร์ action

_images/13.1.1.png

สร้างไฟล์ Timer.action

_images/13.1.2.png

เพิ่มเนื้อหาในไฟล์ Timer.action

int32 duration
---
bool success
---
int32 time_elapsed
_images/13.1.3.png

เพิ่มเนื้อหาใน CMakeList.txt

_images/13.1.4.png

เปิด Terminal ขึ้นมา เพื่อทำการ คอมไพล์

cd ~/ros2_ws
colcon build 
_images/13.1.5.png

ตรวจสอบว่า action ถูกสร้างสำเร็จ

ros2 interface show custom_tutorials/action/Timer
_images/13.1.6.png

13.2. ทดลองใช้ Custom Action ที่เราสร้างไว้ (Timer.action)

สร้าง โหนด timer_action_server.py และ timer_action_client.py ภายใน my_package

_images/13.2.1.png

ตัวอย่างโค้ด timer_action_server.py

import time
import rclpy
from rclpy.node import Node
from rclpy.action import ActionServer
from custom_tutorials.action import Timer

class TimerActionServer(Node):
    def __init__(self):
        super().__init__('timer_action_server')
        self._server = ActionServer(self, Timer, 'timer', self.execute_cb)
        self.get_logger().info('Timer action server started')

    def execute_cb(self, goal_handle):
        duration = goal_handle.request.duration
        fb = Timer.Feedback()
        for t in range(1, duration + 1):
            fb.time_elapsed = t
            goal_handle.publish_feedback(fb)
            time.sleep(1)
        result = Timer.Result()
        result.success = True
        goal_handle.succeed()
        return result

def main():
    rclpy.init()
    node = TimerActionServer()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

ตัวอย่างโค้ด timer_action_client.py

import rclpy
from rclpy.node import Node
from rclpy.action import ActionClient
from custom_tutorials.action import Timer

class TimerActionClient(Node):
    def __init__(self):
        super().__init__('timer_action_client')
        self._client = ActionClient(self, Timer, 'timer')

    def send_goal(self, duration):
        self._client.wait_for_server()
        goal = Timer.Goal()
        goal.duration = duration
        
        # 1. Capture the future and add a callback for the goal response
        self._send_goal_future = self._client.send_goal_async(goal, feedback_callback=self.fb_cb)
        self._send_goal_future.add_done_callback(self.goal_response_callback)

    def goal_response_callback(self, future):
        goal_handle = future.result()
        if not goal_handle.accepted:
            self.get_logger().info('Goal rejected :(')
            return

        self.get_logger().info('Goal accepted :)')

        # 2. Request the result and add a callback for when it completes
        self._get_result_future = goal_handle.get_result_async()
        self._get_result_future.add_done_callback(self.get_result_callback)

    def get_result_callback(self, future):
        # 3. Access the actual result object
        result = future.result().result
        self.get_logger().info(f'Result: {result}')
        raise SystemExit

    def fb_cb(self, fb_msg):
        self.get_logger().info(f'Feedback: time_elapsed={fb_msg.feedback.time_elapsed}')

def main():
    rclpy.init()
    node = TimerActionClient()
    node.send_goal(5)
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

ตั้งค่า setup.py

'timer_action_server = my_package.timer_action_server:main',
'timer_action_client = my_package.timer_action_client:main',
_images/13.2.2.png

เปิด Terminal ขึ้นมา ทำการ คอมไพล์

cd ~/ros2_ws
colcon build 
_images/13.2.3.png

ทดสอบการทำงานของโหนด timer_action_server.py และ timer_action_client.py กับ Timer Action

ros2 run my_package timer_action_server 
ros2 run my_package timer_action_client
_images/13.2.4.png