20. เจาะลึกซอร์สโค้ด (Source Code Breakdown)
ในบทนี้ เราจะมาดูเบื้องหลังการทำงานของหุ่นยนต์ Batly R1 ผ่านซอร์สโค้ดหลัก 3 ส่วน ได้แก่ Firmware ของไมโครคอนโทรลเลอร์, โปรแกรมควบคุมด้วยคีย์บอร์ด และไฟล์ตั้งค่าระบบนำทาง
20.1. Firmware: micro-ROS บน RP2040 (batlyr1.ino)
โค้ดนี้ทำงานบนบอร์ด Raspberry Pi Pico (RP2040) ทำหน้าที่เป็น Low-level Controller เชื่อมต่อกับเซนเซอร์และ Actuator ต่างๆ แล้วสื่อสารกับ ROS 2 Agent ผ่านโปรโตคอล micro-ROS
20.1.1. ไฟล์: batlyr1.ino
/*****************************************************************************************
RP2040 ALL-IN-ONE – ROS 2 VOICE ALERT, STATUS LIGHTS & SERVO SWING
*****************************************************************************************/
#include <FastLED.h>
#include <Arduino_LSM6DSOX.h>
#include <MadgwickAHRS.h>
#include <micro_ros_arduino.h>
#include <rcl/rcl.h>
#include <rclc/rclc.h>
#include <rclc/executor.h>
#include <std_msgs/msg/bool.h>
#include <sensor_msgs/msg/imu.h>
#include <geometry_msgs/msg/twist.h>
#include <hardware/watchdog.h>
// ======================= SERVO (RP2040_ISR) ========================
#if ( defined(ARDUINO_ARCH_RP2040) || defined(ARDUINO_RASPBERRY_PI_PICO) || defined(ARDUINO_ADAFRUIT_FEATHER_RP2040) || \
defined(ARDUINO_GENERIC_RP2040) ) && !defined(ARDUINO_ARCH_MBED)
#if !defined(RP2040_ISR_SERVO_USING_MBED)
#define RP2040_ISR_SERVO_USING_MBED false
#endif
#elif ( defined(ARDUINO_NANO_RP2040_CONNECT) || defined(ARDUINO_RASPBERRY_PI_PICO) || \
defined(ARDUINO_ADAFRUIT_FEATHER_RP2040) || \
defined(ARDUINO_GENERIC_RP2040) ) && defined(ARDUINO_ARCH_MBED)
#if !defined(RP2040_ISR_SERVO_USING_MBED)
#define RP2040_ISR_SERVO_USING_MBED true
#endif
#endif
#include "RP2040_ISR_Servo.h"
#define SERVO_PIN_A0 A0
#define MIN_MICROS 800
#define MAX_MICROS 2450
#define MIN_POS 60
#define MAX_POS 120
#define CENTER_POS 90
int servoIndex = -1;
int current_servo_pos = CENTER_POS;
bool servo_direction_up = true;
unsigned long last_servo_ms = 0;
const int servo_speed_ms = 8;
// ======================= FASTLED ========================
#define DATA_PIN 17
#define NUM_LEDS 6
#define BRIGHTNESS 150
CRGB leds[NUM_LEDS];
// ======================= ULTRASONIC =====================
#define TRIG_PIN 10
#define ECHO_PIN 9
unsigned long last_ultrasonic_ms = 0;
// ======================= SYSTEM STATE ===================
enum State { WAITING_AGENT, AGENT_CONNECTED };
State current_state = WAITING_AGENT;
bool alert_mode = false;
bool is_moving = false;
// ======================= IMU & TIMING ===================
Madgwick filter;
float g_ax, g_ay, g_az;
float g_gx, g_gy, g_gz;
unsigned long last_imu_ms = 0;
const unsigned long imu_interval = 20; // 50Hz
// ======================= micro-ROS ======================
rcl_allocator_t allocator;
rclc_support_t support;
rcl_node_t node;
rcl_publisher_t pub_imu;
rcl_publisher_t pub_alert;
rcl_subscription_t sub_cmd_vel;
geometry_msgs__msg__Twist msg_cmd_vel;
std_msgs__msg__Bool msg_alert;
rclc_executor_t executor;
sensor_msgs__msg__Imu msg_imu;
// CALLBACK: รับค่าความเร็วจาก ROS 2 (Teleop/Navigation)
void cmd_vel_callback(const void * msgin) {
const geometry_msgs__msg__Twist * msg = (const geometry_msgs__msg__Twist *)msgin;
is_moving = (abs(msg->linear.x) > 0.01 || abs(msg->angular.z) > 0.01);
}
// =======================================================
// SERVO TASK
// =======================================================
void updateServoTask() {
if (servoIndex == -1) return;
if (alert_mode) {
// โหมดแจ้งเตือน: สวิงซ้าย-ขวาต่อเนื่องรวดเร็ว
if (millis() - last_servo_ms >= servo_speed_ms) {
last_servo_ms = millis();
if (servo_direction_up) {
current_servo_pos += 2;
if (current_servo_pos >= MAX_POS) {
current_servo_pos = MAX_POS;
servo_direction_up = false;
}
} else {
current_servo_pos -= 2;
if (current_servo_pos <= MIN_POS) {
current_servo_pos = MIN_POS;
servo_direction_up = true;
}
}
RP2040_ISR_Servos.setPosition(servoIndex, current_servo_pos);
}
} else {
// โหมดปกติ: กลับไปหยุดนิ่งที่ 90 องศา
if (current_servo_pos != CENTER_POS) {
if (millis() - last_servo_ms >= 15) {
last_servo_ms = millis();
if (current_servo_pos < CENTER_POS) current_servo_pos++;
else if (current_servo_pos > CENTER_POS) current_servo_pos--;
RP2040_ISR_Servos.setPosition(servoIndex, current_servo_pos);
}
}
}
}
// =======================================================
// 🔥 STARTUP ANIMATION (ORIGINAL)
// =======================================================
void startupAnimation() {
int left_out[] = {2, 1, 0};
int right_out[] = {3, 4, 5};
int left_in[] = {0, 1, 2};
int right_in[] = {5, 4, 3};
FastLED.clear();
for (int i = 0; i < 3; i++) {
leds[left_out[i]] = CHSV(150, 80, 90);
leds[right_out[i]] = CHSV(150, 80, 90);
FastLED.show();
delay(120);
}
FastLED.clear();
for (int i = 0; i < 3; i++) {
leds[left_in[i]] = CHSV(150, 80, 90);
leds[right_in[i]] = CHSV(150, 80, 90);
FastLED.show();
delay(120);
}
FastLED.clear();
int speed = 160;
for (int i = 0; i < 3; i++) {
leds[left_in[i]] = CHSV(150, 100, 120);
leds[right_in[i]] = CHSV(150, 100, 120);
FastLED.show();
delay(speed);
speed = max(speed - 35, 45);
}
FastLED.clear(true);
FastLED.show();
}
// =======================================================
// LED TASK
// =======================================================
void updateLEDTask() {
static uint8_t brightness = 0;
if (current_state == WAITING_AGENT) {
if (brightness < 200) brightness++;
fill_solid(leds, NUM_LEDS, CHSV(150, 70, brightness));
}
else {
if (alert_mode) {
static bool blink = false;
static unsigned long last_blink = 0;
if(millis() - last_blink > 300) {
last_blink = millis();
blink = !blink;
}
fill_solid(leds, NUM_LEDS, blink ? CRGB::Red : CRGB::Black);
}
else if (is_moving) {
fill_solid(leds, NUM_LEDS, CHSV(150, 180, 200));
}
else {
fill_solid(leds, NUM_LEDS, CHSV(110, 140, 200));
}
}
FastLED.show();
}
void updateIMUTask() {
if (current_state != AGENT_CONNECTED) return;
if (millis() - last_imu_ms < imu_interval) return;
last_imu_ms = millis();
if (!IMU.accelerationAvailable() || !IMU.gyroscopeAvailable()) return;
IMU.readAcceleration(g_ax, g_ay, g_az);
IMU.readGyroscope(g_gx, g_gy, g_gz);
msg_imu.header.stamp.sec = millis() / 1000;
msg_imu.header.stamp.nanosec = (millis() % 1000) * 1000000;
msg_imu.linear_acceleration.x = g_ax * 9.81;
msg_imu.linear_acceleration.y = g_ay * 9.81;
msg_imu.linear_acceleration.z = g_az * 9.81;
rcl_publish(&pub_imu, &msg_imu, NULL);
}
void updateUltrasonicTask() {
if (current_state != AGENT_CONNECTED) return;
if (millis() - last_ultrasonic_ms < 80) return;
last_ultrasonic_ms = millis();
digitalWrite(TRIG_PIN, LOW);
delayMicroseconds(2);
digitalWrite(TRIG_PIN, HIGH);
delayMicroseconds(10);
digitalWrite(TRIG_PIN, LOW);
long d = pulseIn(ECHO_PIN, HIGH, 15000);
int cm = (d > 0) ? (d * 0.034 / 2) : 999;
bool prev_alert = alert_mode;
alert_mode = (cm > 0 && cm < 45);
if (alert_mode != prev_alert) {
msg_alert.data = alert_mode;
rcl_publish(&pub_alert, &msg_alert, NULL);
}
}
// =======================================================
// micro-ROS RECONNECTION & ENTITIES
// =======================================================
bool create_entities() {
allocator = rcl_get_default_allocator();
if (rclc_support_init(&support, 0, NULL, &allocator) != RCL_RET_OK) return false;
if (rclc_node_init_default(&node, "rp2040_robot_node", "", &support) != RCL_RET_OK) return false;
rclc_publisher_init_default(&pub_imu, &node, ROSIDL_GET_MSG_TYPE_SUPPORT(sensor_msgs, msg, Imu), "imu/data");
rclc_publisher_init_default(&pub_alert, &node, ROSIDL_GET_MSG_TYPE_SUPPORT(std_msgs, msg, Bool), "ultrasonic_alert");
rclc_subscription_init_default(&sub_cmd_vel, &node, ROSIDL_GET_MSG_TYPE_SUPPORT(geometry_msgs, msg, Twist), "cmd_vel");
executor = rclc_executor_get_zero_initialized_executor();
rclc_executor_init(&executor, &support.context, 1, &allocator);
rclc_executor_add_subscription(&executor, &sub_cmd_vel, &msg_cmd_vel, &cmd_vel_callback, ON_NEW_DATA);
return true;
}
void destroy_entities() {
rcl_publisher_fini(&pub_imu, &node);
rcl_publisher_fini(&pub_alert, &node);
rcl_subscription_fini(&sub_cmd_vel, &node);
rclc_executor_fini(&executor);
rcl_node_fini(&node);
rclc_support_fini(&support);
}
void updateMicroROSTask() {
static unsigned long last_check = 0;
if (millis() - last_check < 500) return;
last_check = millis();
if (rmw_uros_ping_agent(50, 1) == RMW_RET_OK) {
if (current_state == WAITING_AGENT) {
if (create_entities()) current_state = AGENT_CONNECTED;
else destroy_entities();
}
} else if (current_state == AGENT_CONNECTED) {
watchdog_reboot(0, 0, 0);
}
}
// =======================================================
// SETUP / LOOP
// =======================================================
void setup() {
watchdog_enable(3000, 1);
FastLED.addLeds<WS2812B, DATA_PIN, GRB>(leds, NUM_LEDS);
FastLED.setBrightness(BRIGHTNESS);
pinMode(TRIG_PIN, OUTPUT);
pinMode(ECHO_PIN, INPUT);
// Setup Servo
servoIndex = RP2040_ISR_Servos.setupServo(SERVO_PIN_A0, MIN_MICROS, MAX_MICROS);
if (servoIndex != -1) {
RP2040_ISR_Servos.setPosition(servoIndex, CENTER_POS);
}
IMU.begin();
filter.begin(50);
set_microros_transports();
startupAnimation();
}
void loop() {
watchdog_update();
updateMicroROSTask();
if (current_state == AGENT_CONNECTED) {
rclc_executor_spin_some(&executor, RCL_MS_TO_NS(5));
updateIMUTask();
updateUltrasonicTask();
}
updateServoTask();
updateLEDTask();
}
20.1.2. อธิบายการทำงานของโค้ด (Firmware)
การเชื่อมต่อ micro-ROS:
ใช้ฟังก์ชัน
updateMicroROSTask()เพื่อตรวจสอบสถานะการเชื่อมต่อกับ ROS 2 Agent ตลอดเวลา (Ping Agent)ถ้า Agent หายไป (
rmw_uros_ping_agentไม่ตอบกลับ) บอร์ดจะสั่ง Reboot ตัวเองเพื่อพยายามเชื่อมต่อใหม่ (watchdog_reboot)มีการสร้าง Entities (Publisher/Subscriber) ผ่านฟังก์ชัน
create_entities()
IMU Sensor (LSM6DSOX):
อ่านค่า Acceleration และ Gyroscope ส่งไปที่หัวข้อ
imu/dataทำงานที่ความถี่ 50Hz
Ultrasonic Sensor:
วัดระยะทางเพื่อตรวจจับสิ่งกีดขวาง
หากระยะน้อยกว่า 45 cm จะเปลี่ยนตัวแปร
alert_modeเป็น True และส่งข้อความแจ้งเตือนไปที่หัวข้อultrasonic_alert
Servo Motor:
โหมดปกติ: เซอร์โวจะหยุดนิ่งที่ตรงกลาง (90 องศา)
โหมดแจ้งเตือน (Alert Mode): เซอร์โวจะสวิงซ้าย-ขวาอย่างรวดเร็ว (เหมือนอาการตกใจ)
LED Status (FastLED):
สีฟ้า (Breathing): รอการเชื่อมต่อ Agent
สีเขียวสว่าง: เชื่อมต่อแล้วและหุ่นยนต์หยุดนิ่ง
สีน้ำเงิน: หุ่นยนต์กำลังเคลื่อนที่ (
is_movingเป็น True โดยเช็คจากcmd_vel)สีแดงกระพริบ: แจ้งเตือนสิ่งกีดขวาง (Alert Mode)
20.2. Teleoperation: การควบคุมด้วยคีย์บอร์ด (teleop_keyboard.py)
Node นี้เขียนด้วย Python ทำหน้าที่รับค่าการกดปุ่มจากผู้ใช้ แล้วแปลงเป็นข้อความ geometry_msgs/Twist เพื่อสั่งความเร็วหุ่นยนต์
20.2.1. ไฟล์: teleop_keyboard.py
#!/usr/bin/env python3
import threading
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist, TwistStamped
import sys
from select import select
if sys.platform == 'win32':
import msvcrt
else:
import termios
import tty
msg = """
Reading from the keyboard and Publishing to Twist!
---------------------------
Moving around:
7 8 9
4 5 6
1 2 3
q/w : increase/decrease max speeds by 0.04 and 0.2
a/s : increase/decrease only linear speed by 0.04
z/x : increase/decrease only angular speed by 0.2
CTRL-C to quit
"""
moveBindings = {
'2': (-1, 0, 0, 0),
'1': (-1, 0, 0, -1),
'4': (0, 0, 0, 1),
'6': (0, 0, 0, -1),
'3': (-1, 0, 0, 1),
'8': (1, 0, 0, 0),
'7': (1, 0, 0, 1),
'9': (1, 0, 0, -1),
}
speedBindings = {
'q': (0.04, 0.2),
'w': (-0.04, -0.2),
'a': (0.04, 0),
's': (-0.04, 0),
'z': (0, 0.2),
'x': (0, -0.2),
}
class TeleopTwistKeyboard(Node):
def __init__(self):
super().__init__('teleop_twist_keyboard')
# Parameters
self.speed = self.declare_parameter("speed", 0.04).value
self.turn = self.declare_parameter("turn", 0.2).value
self.speed_limit = self.declare_parameter("speed_limit", 2.04).value
self.turn_limit = self.declare_parameter("turn_limit", 10.0).value
self.key_timeout = self.declare_parameter("key_timeout", 0.5).value
self.stamped = self.declare_parameter("stamped", False).value
self.twist_frame = self.declare_parameter("frame_id", "").value
# Publisher
self.publisher = self.create_publisher(TwistStamped if self.stamped else Twist, 'cmd_vel', 10)
# Initialize variables
self.x = 0
self.y = 0
self.z = 0
self.th = 0
# Start publishing thread
self.publish_thread = PublishThread(self.publisher, self.stamped, self.twist_frame)
self.publish_thread.start()
def run(self):
settings = save_terminal_settings()
try:
print(msg)
print(self.vels(self.speed, self.turn))
while True:
key = get_key(settings, self.key_timeout)
if key in moveBindings:
self.x, self.y, self.z, self.th = moveBindings[key]
elif key in speedBindings:
self.speed = min(self.speed_limit, max(self.speed + speedBindings[key][0], 0.0))
self.turn = min(self.turn_limit, max(self.turn + speedBindings[key][1], 0.0))
print(self.vels(self.speed, self.turn))
else:
if key == '' and self.x == 0 and self.y == 0 and self.z == 0 and self.th == 0:
continue
self.x, self.y, self.z, self.th = 0, 0, 0, 0
if key == '\x03': # CTRL-C
break
self.publish_thread.update(self.x, self.y, self.z, self.th, self.speed, self.turn)
except Exception as e:
self.get_logger().error(str(e))
finally:
self.publish_thread.stop()
restore_terminal_settings(settings)
def vels(self, speed, turn):
return f"currently:\tLinear {speed:.2f} m/s\tAngular {turn:.2f} rad/s"
class PublishThread(threading.Thread):
def __init__(self, publisher, stamped, twist_frame):
super().__init__()
self.publisher = publisher
self.stamped = stamped
self.twist_frame = twist_frame
self.x = 0.0
self.y = 0.0
self.z = 0.0
self.th = 0.0
self.speed = 0.0
self.turn = 0.0
self.done = False
self.condition = threading.Condition()
def update(self, x, y, z, th, speed, turn):
with self.condition:
self.x = float(x)
self.y = float(y)
self.z = float(z)
self.th = float(th)
self.speed = float(speed)
self.turn = float(turn)
self.condition.notify()
def run(self):
while not self.done:
with self.condition:
self.condition.wait()
msg = TwistStamped() if self.stamped else Twist()
if self.stamped:
msg.header.stamp = rclpy.time.Time().to_msg()
msg.header.frame_id = self.twist_frame
msg.linear.x = self.x * self.speed
msg.linear.y = self.y * self.speed
msg.linear.z = self.z
msg.angular.z = self.th * self.turn
self.publisher.publish(msg)
def stop(self):
self.done = True
self.update(0, 0, 0, 0, 0, 0)
def get_key(settings, timeout):
if sys.platform == 'win32':
return msvcrt.getwch()
else:
tty.setraw(sys.stdin.fileno())
rlist, _, _ = select([sys.stdin], [], [], timeout)
key = sys.stdin.read(1) if rlist else ''
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, settings)
return key
def save_terminal_settings():
if sys.platform == 'win32':
return None
return termios.tcgetattr(sys.stdin)
def restore_terminal_settings(settings):
if sys.platform == 'win32':
return
termios.tcsetattr(sys.stdin, termios.TCSADRAIN, settings)
def main():
rclpy.init()
node = TeleopTwistKeyboard()
try:
node.run()
except KeyboardInterrupt:
pass
finally:
rclpy.shutdown()
if __name__ == '__main__':
main()
20.2.2. อธิบายการทำงานของโค้ด (Teleop)
การอ่านค่าคีย์บอร์ด: ใช้ไลบรารี
termiosและtty(บน Linux) เพื่ออ่านค่าจาก Keyboard แบบ Real-time โดยไม่ต้องกด EnterMapping ปุ่มกด:
ใช้ Numpad 8, 2, 4, 6 เพื่อเคลื่อนที่ (หน้า, หลัง, ซ้าย, ขวา)
ใช้ปุ่ม q, z เพื่อเพิ่มความเร็ว
Publish Thread: แยก Thread สำหรับการส่งค่า (
publisher.publish) ออกจาก Thread ที่รับค่าคีย์บอร์ด เพื่อให้การส่งข้อมูลมีความต่อเนื่องแม้ผู้ใช้จะไม่ได้กดปุ่มรัวๆ