
こんにちは、 ABEJA でソフトウェアエンジニアをしている三枝 (@moriaki3193) です。ABEJAアドベントカレンダー2025の5日目の記事を担当させていただきます!
今年の9月より新設されたエンボディドインテリジェンスグループという組織でロボティクスの社会実装をテーマに開発業務に取り組むことになったのですが、私はこれまでロボティクス機能の開発経験がありませんでした。この記事は、そんな自分がキャッチアップを兼ねて iPhone でロボットアームをテレオペする機能を開発してみた、というものです。ただし、まだ実機で動作確認をするのは怖かったので、今回は RViz2 を利用してシミュレーションを行うところまでをスコープとしました。お手柔らかにお付き合いください。
【重要!!!】また、同日に同僚のプロジェクトマネージャが記事を出すのですが、ひょっとするとテレオペがいらなくなるかもしれない世界について解説しているのでそちらも合わせてお楽しみください!
つくるもの

(エンドエフェクタが下を向いてしまっているのはご愛嬌です、余裕があれば修正します...)
今回実装するものは 1. ロボットアームを操作するための iOS アプリ 2. 操作内容を受け取りロボットアームの状態を制御する ROS 2 パッケージの2つです。
iOS アプリの機能としては、当初はロボットアームに対して「土台部分を 90 度回転させて」「先端部分を 10cm 下げて」というように各関節の状態を送信するような構成を考えていたのですが、今回は trzy/robot-arm リポジトリのような先例の構成を真似て、デバイスの実際の位置に対応するように先端部分(エンドエフェクタ)の位置のみを送信するものとしました。
そのようにして指定されるエンドエフェクタの位置をロボットアームに変換する機能を ROS 2 パッケージとして実装します。 A. iOS アプリからのデータを受け取る機能 B. 指定される位置に到達するように各関節の状態を制御する機能の2つの機能が必要となるため、それらをパッケージ内のノードとして実装します。また、結果を確認するためにそれぞれの機能を連携し、可視化のためのツールを起動するスクリプトも作成します。

上記の図は、テレオペという機能を実現する上での構成要素の関係性を表したものです。たくさんの登場人物がいるように見えますが、実装が必要になるのは左側の上3つです。 ROS 2 には有用な実装がさまざまにコミュニティに公開されているので、それらを活用することで開発しなければならないものを削減できます。広く利用されてきた機能性を組み合わせて再活用し、新たな機能性を開発できることが ROS 2 の強みの一つであると感じました。
とはいえ、自前で用意しなければならない実装の内容自体はある程度大きなものとなってしまうため、私が今回用意したもの全体をブログに掲載することは難しく、重要な部分のみを抜粋しています。執筆時から更新・改善されている部分もあるので、(時間が許せば)リポジトリとして全体を公開できればと考えています...
つくりかた
Teleoperator
まずはロボットアームのエンドエフェクタの座標、姿勢やグリッパーの開閉具合の情報を送信する iOS アプリを開発していきます。個人的な好みで The Composable Architecture (TCA) を採用していますが、実装上の要点についてはご理解いただけると思います。
Apple 公式のフレームワークに ARKit があります。その名前の通り拡張現実を扱うものであり、デバイスが認識している現実世界についての情報を取得するための API が提供されています。今回はそれらを利用して3次元空間内での座標を取得する機能を実装します。
まずは ARKit を通じて取得できるデータを表現するモデル DevicePose を定義します。データが取得された日時を表す timestamp 、3次元空間内の座標 position と姿勢角 orientation からなる構造体です:
/* `Models/DevicePose.swift` */ import Foundation import simd struct DevisePose: Equatable, Sendable { var timestamp: TimeInterval var position: SIMD3<Float> var orientation: simd_quatf }
次に ARKit を利用してデータを取得するクライアント ARTrackingClient を実装していきます。 TCA を利用しているので、まずはインタフェースとなる構造体を宣言します:
/* `Dependencies/ARTrackingClient/ARTrackingClient.swift` */ struct ARTrackingClient: Sendable { var startEventTracking: @Sendable () -> AsyncStream<TrackingEvent> var startTracking: @Sendable () -> AsyncStream<DevicePose> var stopTracking: @Sendable () async -> Void }
ここでの var startTracking: @Sendable () -> AsyncStream<DevicePose> が、 ARKit を通じて逐次的にデータを取得していく機能を担うメソッドです。
これらインタフェースに準拠する実装を定義します(とはいっても、ここでは後述の ARSessionManager のシングルトンインスタンスを参照してメソッドを呼び出しているだけですが...):
/* `Dependencies/ARTrackingClient/ARTrackingClient+DepenencyKey.swift` */ extension ARTrackingClient: DepenencyKey { static let liveValue: Self = { let manager = ARSessionManager.shared return .init( startEventTracking: { manager.eventStream() }, startTracking: { manager.poseStream() }, stopTracking: { manager.pauseSession() } ) }() }
それでは具体的に ARKit からデータを取得する実装について見ていきます。
ARSessionManager が ARSessionState を内部的に所有し、それに紐づく ARSessionHandler がデータを吸い上げるという構成になっています。この ARSessionHandler が ARSessionDelegate プロトコルに準拠しているので、 ARKit が実際に取得するデータをアプリに連携できるという点がポイントです:
/* `Dependencies/ARTrackingClient/ARTrackingClient+DepenencyKey.swift` */ fileprivate class ARSessionManager: Sendable { static let shared = ARSessionManager() private let state = ARSessionState() func poseStream() -> AsyncStream<DevicePose> { AsyncStream { [state] continuation in Task { @MainActor in guard let handler = state.handler else { continuation.finish() return } let cancellable = handler.posePublisher .sink { continuation.yield($0) } continuation.onTermination = { _ in cancellable.cancel() } } } } }
/* `Dependencies/ARTrackingClient/ARTrackingClient+DepenencyKey.swift` */ fileprivate final class ARSessionState: @unchecked Sendable { private let lock = NSLock() private var _handler: ARSessionHandler? var handler: ARSessionHandler? { get { lock.withLock { _handler } } set { lock.withLock { _handler = newValue } } } }
/* `Dependencies/ARTrackingClient/ARTrackingClient+DepenencyKey.swift` */ @MainActor fileprivate final class ARSessionHandler: NSObject, ARSessionDelegate { private let poseSubject = PassthroughSubject<DevicePose, Never>() var posePublisher: AnyPublisher<DevicePose, Never> { poseSubject.eraseToAnyPublisher() } nonisolated func session(_ session: ARSession, didUpdate frame: ARFrame) { guard frame.camera.trackingState != .notAvailable else { return } guard CVPixelBufferGetWidth(frame.capturedImage) > 0 else { return } let transform = frame.camera.transform let pose = DevicePose( timestamp: frame.timestamp, position: SIMD3(transform.columns.3.x, transform.columns.3.y, transform.columns.3.z), orientation: simd_quatf(transform) ) Task { @MainActor in poseSubject.send(pose) } } }
ここまでで AR の機能を利用して座標などを取得する依存性を準備できたので、ビジネスロジック (Feature) を実装してビューから機能を呼び出せるようにしましょう:
import ComposableArchitecture @Reducer struct TrackingFeature { // ここまでに実装した `ARTrackingClient` @Dependency(\.arTrackingClient) var arTrackingClient // 後述のブリッジサーバーにデータを転送する依存性(ここでは割愛) @Dependency(\.teleop) var teleop @ObservableState struct State: Equatable { var currentPose: DevicePose = .zero } enum Action { case arEvent(TrackingEvent) } var body: some ReducerOf<Self> { Reduce { state, action in switch action { case .arEvent(let event): switch event { case .poseUpdated(let pose): state.currentPose = pose return .run { _ in try? await teleop.sendPose(pose) } } return .none } } }
その他の実装の詳細についてはこちらでは割愛しますが、ここまでの実装を UI と連携すると以下のようにデバイスの位置が取得できていることを確認できます:

GIF が荒く数値が読み取りづらいこともまたご愛嬌です...
Interface
ブリッジサーバーがトピックに配信するメッセージのインタフェースを定義する ROS 2 パッケージを作成します。
ros2 pkg create iphone_teleop_interface \ --build-type ament_cmake \ --destination-directory ${HOME}/ros2_ws/src/local \ --description 'iPhone teleoperation interface' \ --maintainer-name "${YOUR_NAME}" \ --maintainer-email "${YOUR_EMAIL}" \ --license MIT
上記のコマンドで初期化されたディレクトリに msg/EEPose.msg ファイルを作成し、 ROS 2 ノード間でやりとりされるメッセージのインタフェースを宣言します:
std_msgs/Header header # End-effector position in robot coordinate system (meters) float32[3] position # Gripper opening amount (0.0 = closed, 1.0 = fully open) float32 gripper_open # Gripper rotation in degrees float32 gripper_rotate
宣言したインタフェースをパッケージに含めるために、 CMakeLists.txt に以下のような設定を追加します:
rosidl_generate_interfaces(${PROJECT_NAME} "msg/EEPose.msg" DEPENDENCIES std_msgs )
また、 ROS 2 パッケージとしての(ビルド時や実行時の)依存関係を宣言するタグを package.xml に追加します:
<buildtool_depend>rosidl_default_generators</buildtool_depend> <depend>std_msgs</depend> <exec_depend>rosidl_default_runtime</exec_depend> <member_of_group>rosidl_interface_packages</member_of_group>
パッケージをビルドして、以下のようなコマンドでインタフェースが表示できれば OK です:
colcon build --packages-select iphone_teleop_interface source install/setup.bash ros2 pkg list | grep iphone_teleop_interface ros2 interface package iphone_teleop_interface ros2 interface show iphone_teleop_interface/msg/EEPose
Bridge Server
iPhone アプリが送信するデバイスの位置情報などを受け取り、 ROS 2 の世界に再配信する役割を担う BridgeServer ノードを実装します。
ここでのブリッジ(橋渡し)とは、ネットワーク経由で配信されるデータを ROS 2 トピックへと EEPose の形式で配信 (Publish) することを指します。以下に実装の具体例を示します:
"""Bridge Server. """ import asyncio import json import math import threading from datetime import datetime from typing import Any, cast import numpy as np import rclpy from iphone_teleop_interface.msg import EEPose from rclpy.executors import ExternalShutdownException from rclpy.node import Node from std_msgs.msg import Header from .message import MessageHandler from .network import TcpServer class BridgeServer(Node, MessageHandler): def __init__(self) -> None: super().__init__('bridge_server') self.declare_parameter('tcp_port', 8000) self.declare_parameter('topic_name', '/ee_pose') self.declare_parameter('qos_profile', 10) tcp_port = int(self.get_parameter('tcp_port').value) topic_name = str(self.get_parameter('topic_name').value) qos_profile = int(self.get_parameter('qos_profile').value) self._publisher = self.create_publisher(EEPose, topic_name, qos_profile) self._tcp_server = TcpServer(message_handler=self, port=tcp_port) async def run(self) -> None: await self._tcp_server.run() def handle_connect(self) -> None: pass def handle_disconnect(self) -> None: pass def handle_message(self, data: bytes, timestamp: float) -> None: ts = datetime.fromtimestamp(timestamp) parsed = cast(dict[str, Any], json.loads(data.decode('utf-8'))) if (id_ := parsed.get('__id')) is None: return match id_: case 'PoseStateMessage': position = cast(list[float], parsed['gripperDeltaPosition']) rotation = float(parsed['gripperRotateDegrees']) open_amount = float(parsed['gripperOpenAmount']) x = position[0] y = - position[2] z = position[1] msg = EEPose() msg.header = Header() msg.header.frame_id = 'ee' msg.header.stamp.sec = math.floor(ts.timestamp()) msg.header.stamp.nanosec = int(ts.microsecond * 1_000) msg.position = np.array([x, y, z], dtype=np.float32).tolist() msg.gripper_open = open_amount msg.gripper_rotate = rotation self._publisher.publish(msg) def main(args: list[str] | None = None) -> None: rclpy.init(args=args) node = BridgeServer() loop = asyncio.new_event_loop() def target() -> None: asyncio.set_event_loop(loop) loop.run_until_complete(node.run()) thread = threading.Thread(target=target, daemon=True) thread.start() try: node.get_logger().info('bridge server node started') rclpy.spin(node) except KeyboardInterrupt: node.get_logger().info('keyboard interrupt, shutting down bridge server...') except ExternalShutdownException: pass finally: node.destroy_node() if rclpy.ok(): rclpy.shutdown() loop.call_soon_threadsafe(loop.stop) thread.join(timeout=10.0)
Motion Planner
BridgeServer ノードが ROS 2 の世界に持ち込んだ入力データを利用して、ロボットアームの関節変位を具体的に求める MotionPlanner ノードを実装します。
ブリッジサーバーが転送するデータはエンドエフェクタの3次元空間内での位置座標であり、ロボットアームを動かすためには最終的にエンドエフェクタが「その位置」に来るように各関節の角度を求めるという 逆運動学 という問題を解く必要があります。逆運動学どのような問題であるかについては他に詳細かつ丁寧に解説されている記事があるため説明を割愛しますが、ここでの実装においては LeRobot リポジトリにおいても利用されている PlaCo という Python ライブラリを利用しています:
"""Motion Planner (for myCobot 280 Jetson Nano). """ import os from typing import cast import numpy as np import placo import rclpy from ament_index_python.packages import get_package_share_directory from iphone_teleop_interface.msg import EEPose from rclpy.executors import ExternalShutdownException from rclpy.node import Node from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy from sensor_msgs.msg import JointState from std_msgs.msg import Header class MotionPlanner(Node): def __init__(self) -> None: super().__init__('motion_planner') self.declare_parameter('states_topic_name', '/joint_states') states_topic_name = str(self.get_parameter('states_topic_name').value) self.declare_parameter('target_topic_name', '/ee_pose') target_topic_name = str(self.get_parameter('target_topic_name').value) self.declare_parameter('control_rate', 50.0) control_rate = float(self.get_parameter('control_rate').value) self.declare_parameter('kp', 1.0) kp = float(self.get_parameter('kp').value) self.declare_parameter('kd', 1.0) kd = float(self.get_parameter('kd').value) self.get_logger().info(f'control_rate: {control_rate} Hz, kp: {kp}, kd: {kd}') qos_profile = QoSProfile(history=HistoryPolicy.KEEP_LAST, depth=1, reliability=ReliabilityPolicy.RELIABLE) self._publisher = self.create_publisher(JointState, states_topic_name, qos_profile) self._subscription = self.create_subscription(EEPose, target_topic_name, self._target_callback, qos_profile) urdf_file_path = os.path.join(get_package_share_directory('mycobot_description'), 'urdf', 'mycobot_280_jn', 'mycobot_280_jn_adaptive_gripper.urdf') target_frame_name = 'gripper_base' self._robot = placo.RobotWrapper(urdf_file_path) self._target_frame_name = target_frame_name self._arm_joint_names = [ 'joint2_to_joint1', 'joint3_to_joint2', 'joint4_to_joint3', 'joint5_to_joint4', 'joint6_to_joint5', 'joint6output_to_joint6', ] for joint_name in self._arm_joint_names: self._robot.set_joint(joint_name, 0.0) self._robot.update_kinematics() ee_pose = self._robot.get_T_world_frame(self._target_frame_name) self._solver = placo.KinematicsSolver(self._robot) self._solver.dt = 1.0 / control_rate self._solver.mask_fbase(True) self._solver.enable_joint_limits(True) # 以下の制約も導入したかったものの、 URDF ファイルが不十分なのか # 更新頻度が高く処理が間に合わないのか、動作しなかったためここではコメントアウトしています... # self._solver.enable_velocity_limits(True) # self._solver.add_avoid_self_collisions_constraint() self._tip_task = self._solver.add_frame_task(self._target_frame_name, ee_pose) self._tip_task.configure(self._target_frame_name, 'soft', kp, kd) self._target_pose: np.ndarray | None = None self._current_joint_positions = [0.0] * len(self._arm_joint_names) self._timer = self.create_timer(1.0 / control_rate, self._timer_callback) def _timer_callback(self) -> None: if self._target_pose is None: return self._tip_task.T_world_frame = self._target_pose self._solver.solve(True) self._robot.update_kinematics() for idx, joint_name in enumerate(self._arm_joint_names): self._current_joint_positions[idx] = self._robot.get_joint(joint_name) joint_states = JointState() # Set header joint_states.header = Header() joint_states.header.stamp = self.get_clock().now().to_msg() # Set joint names and positions joint_states.name = self._arm_joint_names joint_states.position = self._current_joint_positions self._publisher.publish(joint_states) def _target_callback(self, msg: EEPose) -> None: x, y, z = cast(tuple[float, float, float], msg.position) self.get_logger().debug(f'target: x={x:.3f}, y={y:.3f}, z={z:.3f}') # [ # [ 1.0, 0.0, 0.0, 0.0], # [ 0.0, 1.0, 0.0, 0.0], # [ 0.0, 0.0, 1.0, 0.0], # [ 0.0, 0.0, 0.0, 1.0], # ] pose = np.eye(4) # [ # [ 1.0, 0.0, 0.0, x], # [ 0.0, 1.0, 0.0, y], # [ 0.0, 0.0, 1.0, z], # [ 0.0, 0.0, 0.0, 1.0], # ] pose[0:3, 3] = np.array([x, y, z]) # [ # [ 0.0, 0.0, 1.0, x], # [-1.0, 0.0, 0.0, y], # [ 0.0, -1.0, 0.0, z], # [ 0.0, 0.0, 0.0, 1.0], # ] pose[0:3, 0:3] = np.array([ [ 0.0, 0.0, 1.0], [-1.0, 0.0, 0.0], [ 0.0, -1.0, 0.0], ]) self._target_pose = pose def main(args: list[str] | None = None) -> None: rclpy.init(args=args) node = MotionPlanner() try: node.get_logger().info('motion planner node started') rclpy.spin(node) except KeyboardInterrupt: node.get_logger().info('keyboard interrupt, shutting down motion planner...') except ExternalShutdownException: pass finally: node.destroy_node() if rclpy.ok(): rclpy.shutdown()
Launch Scripts
ここまでで実装した BridgeServer と MotionPlanner を、ロボットアームの関節変位や RViz2 上に表示するノードと一緒に起動する Launch スクリプトを実装します。
こちらもスクリプト全体を貼り付けているため長い内容となってしまっていますが、①ロボットアームの URDF ファイルを指定し、②一緒に起動したいノードをその際のパラメータを指定して宣言して、③ROS 2 が指定する方法としての LaunchDescription を返すような関数 generate_launch_description としてまとめているということを行なっています:
import os from ament_index_python.packages import get_package_share_directory from launch import LaunchDescription from launch_ros.actions import Node def generate_launch_description() -> LaunchDescription: pkg_dir = get_package_share_directory('mycobot_description') urdf_path = os.path.join(pkg_dir, 'urdf', 'mycobot_280_jn', 'mycobot_280_jn_adaptive_gripper.urdf') with open(urdf_path, mode='r') as fp: robot_description = fp.read() pkg_dir = get_package_share_directory('iphone_teleop') rviz_path = p if os.path.isfile((p := os.path.join(pkg_dir, 'rviz', 'sim.rviz'))) else None robot_state_publisher_node = Node( package='robot_state_publisher', executable='robot_state_publisher', name='robot_state_publisher', output='screen', parameters=[{ 'robot_description': robot_description, 'use_sim_time': False, }], ) joint_state_publisher_node = Node( package='joint_state_publisher', executable='joint_state_publisher', name='joint_state_publisher', output='screen', parameters=[{ 'source_list': ['/joint_states', ], }], ) bridge_server_node = Node( package='iphone_teleop', executable='bridge_server', name='bridge_server', output='screen', ) motion_planner_node = Node( package='iphone_teleop', executable='motion_planner', name='motion_planner', output='screen', ) rviz2_node = Node( package='rviz2', executable='rviz2', name='rviz2', output='screen', arguments=[] if rviz_path is None else ['-d', rviz_path], ) return LaunchDescription([ robot_state_publisher_node, joint_state_publisher_node, bridge_server_node, motion_planner_node, rviz2_node, ])
ここまでの実装を確認してみましょう。 ROS 2 における Launch スクリプトは、以下のようにスクリプトが含まれるパッケージ名とスクリプト名をともなって launch コマンドで起動できます:
ros2 launch iphone_teleop sim.launch.py
RViz2 の画面が起動し、冒頭の GIF アニメのような結果になれば成功です!🎉
まとめ
実装の具体例を詰め込んだため、内容が冗長になり読みづらくなってしまった部分があるかもしれませんが、 iPhone でロボットアームをテレオペする機能開発の具体的なイメージを説明できたのではないでしょうか...?
とはいえ、今回は必要最低限の機能のみを実装したに過ぎません。エンドエフェクタの姿勢により重点を置いた逆運動学の解を求めるように修正が必要だったり、(定義するだけしておいて実はまだ使われていない)グリッパーの状態の連携であったり、模倣学習などに応用するのであればビジョンの情報を収集するようなノードを追加したりと、まだまだ改善の余地が盛り沢山です。
もう少し機能性を安定させてシミュレーションも完璧!となった暁には実機での動作確認を行い、今回のような Tech Blog として発信できればと思っています。ここまでお付き合いいただきありがとうございましたmm
We Are Hiring!
ABEJAは、テクノロジーの社会実装に取り組んでいます。 技術はもちろん、技術をどのようにして社会やビジネスに組み込んでいくかを考えるのが好きな方は、下記採用ページからエントリーください! (新卒の方やインターンシップのエントリーもお待ちしております!) careers.abejainc.com
特に下記ポジションの募集を強化しています!ぜひ御覧ください!
