ABEJA Tech Blog

中の人の興味のある情報を発信していきます

iPhone でロボットアームをテレオペ! ~ RViz2 編 ~

こんにちは、 ABEJA でソフトウェアエンジニアをしている三枝 (@moriaki3193) です。ABEJAアドベントカレンダー2025の5日目の記事を担当させていただきます!

今年の9月より新設されたエンボディドインテリジェンスグループという組織でロボティクスの社会実装をテーマに開発業務に取り組むことになったのですが、私はこれまでロボティクス機能の開発経験がありませんでした。この記事は、そんな自分がキャッチアップを兼ねて iPhone でロボットアームをテレオペする機能を開発してみた、というものです。ただし、まだ実機で動作確認をするのは怖かったので、今回は RViz2 を利用してシミュレーションを行うところまでをスコープとしました。お手柔らかにお付き合いください。

【重要!!!】また、同日に同僚のプロジェクトマネージャが記事を出すのですが、ひょっとするとテレオペがいらなくなるかもしれない世界について解説しているのでそちらも合わせてお楽しみください!


つくるもの

RViz2 上でのシミュレーション結果

(エンドエフェクタが下を向いてしまっているのはご愛嬌です、余裕があれば修正します...)

今回実装するものは 1. ロボットアームを操作するための iOS アプリ 2. 操作内容を受け取りロボットアームの状態を制御する ROS 2 パッケージの2つです。

iOS アプリの機能としては、当初はロボットアームに対して「土台部分を 90 度回転させて」「先端部分を 10cm 下げて」というように各関節の状態を送信するような構成を考えていたのですが、今回は trzy/robot-arm リポジトリのような先例の構成を真似て、デバイスの実際の位置に対応するように先端部分(エンドエフェクタ)の位置のみを送信するものとしました。

そのようにして指定されるエンドエフェクタの位置をロボットアームに変換する機能を ROS 2 パッケージとして実装します。 A. iOS アプリからのデータを受け取る機能 B. 指定される位置に到達するように各関節の状態を制御する機能の2つの機能が必要となるため、それらをパッケージ内のノードとして実装します。また、結果を確認するためにそれぞれの機能を連携し、可視化のためのツールを起動するスクリプトも作成します。

今回の実装の全体像

上記の図は、テレオペという機能を実現する上での構成要素の関係性を表したものです。たくさんの登場人物がいるように見えますが、実装が必要になるのは左側の上3つです。 ROS 2 には有用な実装がさまざまにコミュニティに公開されているので、それらを活用することで開発しなければならないものを削減できます。広く利用されてきた機能性を組み合わせて再活用し、新たな機能性を開発できることが ROS 2 の強みの一つであると感じました。

とはいえ、自前で用意しなければならない実装の内容自体はある程度大きなものとなってしまうため、私が今回用意したもの全体をブログに掲載することは難しく、重要な部分のみを抜粋しています。執筆時から更新・改善されている部分もあるので、(時間が許せば)リポジトリとして全体を公開できればと考えています...

つくりかた

Teleoperator

まずはロボットアームのエンドエフェクタの座標、姿勢やグリッパーの開閉具合の情報を送信する iOS アプリを開発していきます。個人的な好みで The Composable Architecture (TCA) を採用していますが、実装上の要点についてはご理解いただけると思います。

Apple 公式のフレームワークに ARKit があります。その名前の通り拡張現実を扱うものであり、デバイスが認識している現実世界についての情報を取得するための API が提供されています。今回はそれらを利用して3次元空間内での座標を取得する機能を実装します。

まずは ARKit を通じて取得できるデータを表現するモデル DevicePose を定義します。データが取得された日時を表す timestamp 、3次元空間内の座標 position と姿勢角 orientation からなる構造体です:

/* `Models/DevicePose.swift` */
import Foundation
import simd

struct DevisePose: Equatable, Sendable {
  var timestamp: TimeInterval

  var position: SIMD3<Float>
  var orientation: simd_quatf
}

次に ARKit を利用してデータを取得するクライアント ARTrackingClient を実装していきます。 TCA を利用しているので、まずはインタフェースとなる構造体を宣言します:

/* `Dependencies/ARTrackingClient/ARTrackingClient.swift` */
struct ARTrackingClient: Sendable {
  var startEventTracking: @Sendable () -> AsyncStream<TrackingEvent>
  var startTracking: @Sendable () -> AsyncStream<DevicePose>
  var stopTracking: @Sendable () async -> Void
}

ここでの var startTracking: @Sendable () -> AsyncStream<DevicePose> が、 ARKit を通じて逐次的にデータを取得していく機能を担うメソッドです。 これらインタフェースに準拠する実装を定義します(とはいっても、ここでは後述の ARSessionManager のシングルトンインスタンスを参照してメソッドを呼び出しているだけですが...):

/* `Dependencies/ARTrackingClient/ARTrackingClient+DepenencyKey.swift` */
extension ARTrackingClient: DepenencyKey {
  static let liveValue: Self = {
    let manager = ARSessionManager.shared

    return .init(
      startEventTracking: { manager.eventStream() },
      startTracking: { manager.poseStream() },
      stopTracking: { manager.pauseSession() }
    )
  }()
}

それでは具体的に ARKit からデータを取得する実装について見ていきます。

ARSessionManagerARSessionState を内部的に所有し、それに紐づく ARSessionHandler がデータを吸い上げるという構成になっています。この ARSessionHandlerARSessionDelegate プロトコルに準拠しているので、 ARKit が実際に取得するデータをアプリに連携できるという点がポイントです:

/* `Dependencies/ARTrackingClient/ARTrackingClient+DepenencyKey.swift` */
fileprivate class ARSessionManager: Sendable {
  static let shared = ARSessionManager()

  private let state = ARSessionState()

  func poseStream() -> AsyncStream<DevicePose> {
    AsyncStream { [state] continuation in
      Task { @MainActor in
        guard let handler = state.handler else {
          continuation.finish()
          return
        }

        let cancellable = handler.posePublisher
          .sink { continuation.yield($0) }

        continuation.onTermination = { _ in
          cancellable.cancel()
        }
      }
    }
  }
}
/* `Dependencies/ARTrackingClient/ARTrackingClient+DepenencyKey.swift` */
fileprivate final class ARSessionState: @unchecked Sendable {
  private let lock = NSLock()

  private var _handler: ARSessionHandler?

  var handler: ARSessionHandler? {
    get { lock.withLock { _handler } }
    set { lock.withLock { _handler = newValue } }
  }
}
/* `Dependencies/ARTrackingClient/ARTrackingClient+DepenencyKey.swift` */
@MainActor
fileprivate final class ARSessionHandler: NSObject, ARSessionDelegate {
  private let poseSubject = PassthroughSubject<DevicePose, Never>()

  var posePublisher: AnyPublisher<DevicePose, Never> {
    poseSubject.eraseToAnyPublisher()
  }

  nonisolated func session(_ session: ARSession, didUpdate frame: ARFrame) {
    guard frame.camera.trackingState != .notAvailable else { return }
    guard CVPixelBufferGetWidth(frame.capturedImage) > 0 else { return }

    let transform = frame.camera.transform

    let pose = DevicePose(
      timestamp: frame.timestamp,
      position: SIMD3(transform.columns.3.x, transform.columns.3.y, transform.columns.3.z),
      orientation: simd_quatf(transform)
    )

    Task { @MainActor in
      poseSubject.send(pose)
    }
  }
}

ここまでで AR の機能を利用して座標などを取得する依存性を準備できたので、ビジネスロジック (Feature) を実装してビューから機能を呼び出せるようにしましょう:

import ComposableArchitecture

@Reducer
struct TrackingFeature {
  // ここまでに実装した `ARTrackingClient`
  @Dependency(\.arTrackingClient) var arTrackingClient
  // 後述のブリッジサーバーにデータを転送する依存性(ここでは割愛)
  @Dependency(\.teleop) var teleop

  @ObservableState
  struct State: Equatable {
    var currentPose: DevicePose = .zero
  }

  enum Action {
    case arEvent(TrackingEvent)
  }

  var body: some ReducerOf<Self> {
    Reduce { state, action in
      switch action {
      case .arEvent(let event):
        switch event {
        case .poseUpdated(let pose):
          state.currentPose = pose

          return .run { _ in
            try? await teleop.sendPose(pose)
          }
        }

        return .none
      }
  }
}

その他の実装の詳細についてはこちらでは割愛しますが、ここまでの実装を UI と連携すると以下のようにデバイスの位置が取得できていることを確認できます:

iPhone を動かすと座標も変化する様子
GIF が荒く数値が読み取りづらいこともまたご愛嬌です...

Interface

ブリッジサーバーがトピックに配信するメッセージのインタフェースを定義する ROS 2 パッケージを作成します。

ros2 pkg create iphone_teleop_interface \
  --build-type ament_cmake \
  --destination-directory ${HOME}/ros2_ws/src/local \
  --description 'iPhone teleoperation interface' \
  --maintainer-name "${YOUR_NAME}" \
  --maintainer-email "${YOUR_EMAIL}" \
  --license MIT

上記のコマンドで初期化されたディレクトリに msg/EEPose.msg ファイルを作成し、 ROS 2 ノード間でやりとりされるメッセージのインタフェースを宣言します:

std_msgs/Header header

# End-effector position in robot coordinate system (meters)
float32[3] position

# Gripper opening amount (0.0 = closed, 1.0 = fully open)
float32 gripper_open

# Gripper rotation in degrees
float32 gripper_rotate

宣言したインタフェースをパッケージに含めるために、 CMakeLists.txt に以下のような設定を追加します:

rosidl_generate_interfaces(${PROJECT_NAME}
  "msg/EEPose.msg"
  DEPENDENCIES std_msgs
)

また、 ROS 2 パッケージとしての(ビルド時や実行時の)依存関係を宣言するタグを package.xml に追加します:

  <buildtool_depend>rosidl_default_generators</buildtool_depend>

  <depend>std_msgs</depend>

  <exec_depend>rosidl_default_runtime</exec_depend>

  <member_of_group>rosidl_interface_packages</member_of_group>

パッケージをビルドして、以下のようなコマンドでインタフェースが表示できれば OK です:

colcon build --packages-select iphone_teleop_interface
source install/setup.bash

ros2 pkg list | grep iphone_teleop_interface
ros2 interface package iphone_teleop_interface
ros2 interface show iphone_teleop_interface/msg/EEPose

Bridge Server

iPhone アプリが送信するデバイスの位置情報などを受け取り、 ROS 2 の世界に再配信する役割を担う BridgeServer ノードを実装します。

ここでのブリッジ(橋渡し)とは、ネットワーク経由で配信されるデータを ROS 2 トピックへと EEPose の形式で配信 (Publish) することを指します。以下に実装の具体例を示します:

"""Bridge Server.
"""
import asyncio
import json
import math
import threading
from datetime import datetime
from typing import Any, cast

import numpy as np
import rclpy
from iphone_teleop_interface.msg import EEPose
from rclpy.executors import ExternalShutdownException
from rclpy.node import Node
from std_msgs.msg import Header

from .message import MessageHandler
from .network import TcpServer


class BridgeServer(Node, MessageHandler):

  def __init__(self) -> None:
    super().__init__('bridge_server')

    self.declare_parameter('tcp_port', 8000)

    self.declare_parameter('topic_name', '/ee_pose')
    self.declare_parameter('qos_profile', 10)

    tcp_port = int(self.get_parameter('tcp_port').value)

    topic_name = str(self.get_parameter('topic_name').value)
    qos_profile = int(self.get_parameter('qos_profile').value)

    self._publisher = self.create_publisher(EEPose, topic_name, qos_profile)

    self._tcp_server = TcpServer(message_handler=self, port=tcp_port)

  async def run(self) -> None:
    await self._tcp_server.run()

  def handle_connect(self) -> None:
    pass

  def handle_disconnect(self) -> None:
    pass

  def handle_message(self, data: bytes, timestamp: float) -> None:
    ts = datetime.fromtimestamp(timestamp)
    parsed = cast(dict[str, Any], json.loads(data.decode('utf-8')))

    if (id_ := parsed.get('__id')) is None:
      return

    match id_:
      case 'PoseStateMessage':
        position = cast(list[float], parsed['gripperDeltaPosition'])
        rotation = float(parsed['gripperRotateDegrees'])
        open_amount = float(parsed['gripperOpenAmount'])

        x =   position[0]
        y = - position[2]
        z =   position[1]

        msg = EEPose()

        msg.header = Header()
        msg.header.frame_id = 'ee'
        msg.header.stamp.sec = math.floor(ts.timestamp())
        msg.header.stamp.nanosec = int(ts.microsecond * 1_000)

        msg.position = np.array([x, y, z], dtype=np.float32).tolist()
        msg.gripper_open = open_amount
        msg.gripper_rotate = rotation

        self._publisher.publish(msg)


def main(args: list[str] | None = None) -> None:
  rclpy.init(args=args)

  node = BridgeServer()
  loop = asyncio.new_event_loop()

  def target() -> None:
    asyncio.set_event_loop(loop)
    loop.run_until_complete(node.run())

  thread = threading.Thread(target=target, daemon=True)
  thread.start()

  try:
    node.get_logger().info('bridge server node started')
    rclpy.spin(node)

  except KeyboardInterrupt:
    node.get_logger().info('keyboard interrupt, shutting down bridge server...')

  except ExternalShutdownException:
    pass

  finally:
    node.destroy_node()

    if rclpy.ok():
      rclpy.shutdown()

    loop.call_soon_threadsafe(loop.stop)
    thread.join(timeout=10.0)

Motion Planner

BridgeServer ノードが ROS 2 の世界に持ち込んだ入力データを利用して、ロボットアームの関節変位を具体的に求める MotionPlanner ノードを実装します。

ブリッジサーバーが転送するデータはエンドエフェクタの3次元空間内での位置座標であり、ロボットアームを動かすためには最終的にエンドエフェクタが「その位置」に来るように各関節の角度を求めるという 逆運動学 という問題を解く必要があります。逆運動学どのような問題であるかについては他に詳細かつ丁寧に解説されている記事があるため説明を割愛しますが、ここでの実装においては LeRobot リポジトリにおいても利用されている PlaCo という Python ライブラリを利用しています:

"""Motion Planner (for myCobot 280 Jetson Nano).
"""
import os
from typing import cast

import numpy as np
import placo
import rclpy
from ament_index_python.packages import get_package_share_directory
from iphone_teleop_interface.msg import EEPose
from rclpy.executors import ExternalShutdownException
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
from sensor_msgs.msg import JointState
from std_msgs.msg import Header


class MotionPlanner(Node):

  def __init__(self) -> None:
    super().__init__('motion_planner')

    self.declare_parameter('states_topic_name', '/joint_states')
    states_topic_name = str(self.get_parameter('states_topic_name').value)

    self.declare_parameter('target_topic_name', '/ee_pose')
    target_topic_name = str(self.get_parameter('target_topic_name').value)

    self.declare_parameter('control_rate', 50.0)
    control_rate = float(self.get_parameter('control_rate').value)

    self.declare_parameter('kp', 1.0)
    kp = float(self.get_parameter('kp').value)

    self.declare_parameter('kd', 1.0)
    kd = float(self.get_parameter('kd').value)

    self.get_logger().info(f'control_rate: {control_rate} Hz, kp: {kp}, kd: {kd}')

    qos_profile = QoSProfile(history=HistoryPolicy.KEEP_LAST, depth=1, reliability=ReliabilityPolicy.RELIABLE)

    self._publisher = self.create_publisher(JointState, states_topic_name, qos_profile)
    self._subscription = self.create_subscription(EEPose, target_topic_name, self._target_callback, qos_profile)

    urdf_file_path = os.path.join(get_package_share_directory('mycobot_description'), 'urdf', 'mycobot_280_jn', 'mycobot_280_jn_adaptive_gripper.urdf')
    target_frame_name = 'gripper_base'

    self._robot = placo.RobotWrapper(urdf_file_path)
    self._target_frame_name = target_frame_name

    self._arm_joint_names = [
      'joint2_to_joint1',
      'joint3_to_joint2',
      'joint4_to_joint3',
      'joint5_to_joint4',
      'joint6_to_joint5',
      'joint6output_to_joint6',
    ]

    for joint_name in self._arm_joint_names:
      self._robot.set_joint(joint_name, 0.0)

    self._robot.update_kinematics()

    ee_pose = self._robot.get_T_world_frame(self._target_frame_name)

    self._solver = placo.KinematicsSolver(self._robot)
    self._solver.dt = 1.0 / control_rate
    self._solver.mask_fbase(True)
    self._solver.enable_joint_limits(True)

    # 以下の制約も導入したかったものの、 URDF ファイルが不十分なのか
    # 更新頻度が高く処理が間に合わないのか、動作しなかったためここではコメントアウトしています...
    # self._solver.enable_velocity_limits(True)
    # self._solver.add_avoid_self_collisions_constraint()

    self._tip_task = self._solver.add_frame_task(self._target_frame_name, ee_pose)
    self._tip_task.configure(self._target_frame_name, 'soft', kp, kd)

    self._target_pose: np.ndarray | None = None
    self._current_joint_positions = [0.0] * len(self._arm_joint_names)

    self._timer = self.create_timer(1.0 / control_rate, self._timer_callback)

  def _timer_callback(self) -> None:
    if self._target_pose is None:
      return

    self._tip_task.T_world_frame = self._target_pose

    self._solver.solve(True)
    self._robot.update_kinematics()

    for idx, joint_name in enumerate(self._arm_joint_names):
      self._current_joint_positions[idx] = self._robot.get_joint(joint_name)

    joint_states = JointState()

    # Set header
    joint_states.header = Header()
    joint_states.header.stamp = self.get_clock().now().to_msg()

    # Set joint names and positions
    joint_states.name = self._arm_joint_names
    joint_states.position = self._current_joint_positions

    self._publisher.publish(joint_states)

  def _target_callback(self, msg: EEPose) -> None:
    x, y, z = cast(tuple[float, float, float], msg.position)

    self.get_logger().debug(f'target: x={x:.3f}, y={y:.3f}, z={z:.3f}')

    # [
    #   [ 1.0,  0.0,  0.0,  0.0],
    #   [ 0.0,  1.0,  0.0,  0.0],
    #   [ 0.0,  0.0,  1.0,  0.0],
    #   [ 0.0,  0.0,  0.0,  1.0],
    # ]
    pose = np.eye(4)

    # [
    #   [ 1.0,  0.0,  0.0,    x],
    #   [ 0.0,  1.0,  0.0,    y],
    #   [ 0.0,  0.0,  1.0,    z],
    #   [ 0.0,  0.0,  0.0,  1.0],
    # ]
    pose[0:3, 3] = np.array([x, y, z])

    # [
    #   [ 0.0,  0.0,  1.0,    x],
    #   [-1.0,  0.0,  0.0,    y],
    #   [ 0.0, -1.0,  0.0,    z],
    #   [ 0.0,  0.0,  0.0,  1.0],
    # ]
    pose[0:3, 0:3] = np.array([
      [ 0.0,  0.0,  1.0],
      [-1.0,  0.0,  0.0],
      [ 0.0, -1.0,  0.0],
    ])

    self._target_pose = pose


def main(args: list[str] | None = None) -> None:
  rclpy.init(args=args)

  node = MotionPlanner()

  try:
    node.get_logger().info('motion planner node started')
    rclpy.spin(node)

  except KeyboardInterrupt:
    node.get_logger().info('keyboard interrupt, shutting down motion planner...')

  except ExternalShutdownException:
    pass

  finally:
    node.destroy_node()

    if rclpy.ok():
      rclpy.shutdown()

Launch Scripts

ここまでで実装した BridgeServerMotionPlanner を、ロボットアームの関節変位や RViz2 上に表示するノードと一緒に起動する Launch スクリプトを実装します。 こちらもスクリプト全体を貼り付けているため長い内容となってしまっていますが、①ロボットアームの URDF ファイルを指定し、②一緒に起動したいノードをその際のパラメータを指定して宣言して、③ROS 2 が指定する方法としての LaunchDescription を返すような関数 generate_launch_description としてまとめているということを行なっています:

import os

from ament_index_python.packages import get_package_share_directory
from launch import LaunchDescription
from launch_ros.actions import Node


def generate_launch_description() -> LaunchDescription:
  pkg_dir = get_package_share_directory('mycobot_description')
  urdf_path = os.path.join(pkg_dir, 'urdf', 'mycobot_280_jn', 'mycobot_280_jn_adaptive_gripper.urdf')

  with open(urdf_path, mode='r') as fp:
    robot_description = fp.read()

  pkg_dir = get_package_share_directory('iphone_teleop')
  rviz_path = p if os.path.isfile((p := os.path.join(pkg_dir, 'rviz', 'sim.rviz'))) else None

  robot_state_publisher_node = Node(
    package='robot_state_publisher',
    executable='robot_state_publisher',
    name='robot_state_publisher',
    output='screen',
    parameters=[{
      'robot_description': robot_description,
      'use_sim_time': False,
    }],
  )

  joint_state_publisher_node = Node(
    package='joint_state_publisher',
    executable='joint_state_publisher',
    name='joint_state_publisher',
    output='screen',
    parameters=[{
      'source_list': ['/joint_states', ],
    }],
  )

  bridge_server_node = Node(
    package='iphone_teleop',
    executable='bridge_server',
    name='bridge_server',
    output='screen',
  )

  motion_planner_node = Node(
    package='iphone_teleop',
    executable='motion_planner',
    name='motion_planner',
    output='screen',
  )

  rviz2_node = Node(
    package='rviz2',
    executable='rviz2',
    name='rviz2',
    output='screen',
    arguments=[] if rviz_path is None else ['-d', rviz_path],
  )

  return LaunchDescription([
    robot_state_publisher_node,
    joint_state_publisher_node,
    bridge_server_node,
    motion_planner_node,
    rviz2_node,
  ])

ここまでの実装を確認してみましょう。 ROS 2 における Launch スクリプトは、以下のようにスクリプトが含まれるパッケージ名とスクリプト名をともなって launch コマンドで起動できます:

ros2 launch iphone_teleop sim.launch.py

RViz2 の画面が起動し、冒頭の GIF アニメのような結果になれば成功です!🎉

まとめ

実装の具体例を詰め込んだため、内容が冗長になり読みづらくなってしまった部分があるかもしれませんが、 iPhone でロボットアームをテレオペする機能開発の具体的なイメージを説明できたのではないでしょうか...?

とはいえ、今回は必要最低限の機能のみを実装したに過ぎません。エンドエフェクタの姿勢により重点を置いた逆運動学の解を求めるように修正が必要だったり、(定義するだけしておいて実はまだ使われていない)グリッパーの状態の連携であったり、模倣学習などに応用するのであればビジョンの情報を収集するようなノードを追加したりと、まだまだ改善の余地が盛り沢山です。

もう少し機能性を安定させてシミュレーションも完璧!となった暁には実機での動作確認を行い、今回のような Tech Blog として発信できればと思っています。ここまでお付き合いいただきありがとうございましたmm

We Are Hiring!

ABEJAは、テクノロジーの社会実装に取り組んでいます。 技術はもちろん、技術をどのようにして社会やビジネスに組み込んでいくかを考えるのが好きな方は、下記採用ページからエントリーください! (新卒の方やインターンシップのエントリーもお待ちしております!) careers.abejainc.com

特に下記ポジションの募集を強化しています!ぜひ御覧ください!

トランスフォーメーション領域:データサイエンティスト

トランスフォーメーション領域:データサイエンティスト(ミドル)

トランスフォーメーション領域:データサイエンティスト(シニア)