こんにちは!ABEJA で ABEJA Platform 開発を行っている坂井(@Yagami360)です。
先日の記事で、ロボティクス領域の VLA モデル「π0」の内部の仕組みを解説しました。 tech-blog.abeja.asia
今回の記事では、この「π0」を LeRobot を使用して実際に動かしてます。(π0 を動かす分には、中身の仕組みは必ずしも知らなくても十分なので↑の記事はスキップしてもらってもOKです。)
とはいえ、いきなり実機で動かすのは色々ハードルがあるので、この記事では Gymnasium でのシミュレーター環境上で動かしたり(推論させたり)、ファインチューニングしたりする方法を解説します。
使用ライブラリ
本記事では、主に LeRobot と Gymnasium というライブラリを使用して、π0 モデルを動かします。(内部的には Pytorch 等も使用しますが、使い慣れたライブラリだと思うのでここでの説明は省略します)
LeRobot とは?
HuggingFace で公開されているライブラリで、ロボティクスや強化学習用のライブラリ・学習済モデル・データセット等を提供しています。
LeRobot を使用すれば、PyTorch や Tensoflow といった機械学習フレームワークで頑張って実装しなくとも、比較的手頃にロボティクス領域におけるモデルをシミュレーター環境上や SO101 といった実機等で動かすことができるようです。HuggingFace には Transformer という LLM のための有名なライブラリがありますが、LeRobot はロボティクスや強化学習領域における Transformer 的なものを目指しているのだと個人的には思っています。
LeRobot では、π0 モデルの学習済みチェックポイントや PyTorch 実装も公開されており、π0 モデルも動かすことができます。
尚、π0 モデルの公式実装や学習済みチェックポイントは、以下のレポジトリで公開されていますが、こちらの公式実装では Jax という数値計算ライブラリを使用したコードになっています。 github.com
一方で、LeRobot で公開されている学習済みチェックポイントや実装は使い慣れた numpy + PyTorch で実装になっているので、こちらのほうが使いやすいかと思います。
Gymnasium とは?
強化学習のモデルをシミュレーター上で動かす際に利用するライブラリです。
ロボティクスなどでの強化学習モデルを現実世界でいきなり動かそうとすると大変でハードルが高いですが、シミュレーターを使用すれば比較的容易に動かしたり(推論させたり)、モデルの学習を行なうことができるので、モデルの改善などを試行錯誤をやりやすくなります。
なお強化学習の分野では、OpenAIGym という OpenAI が提供していたライブラリが広く利用されてましたが、ChatGPT とかで OpenAI が Open でなくなった頃に開発終了してしましました。Gymnasium は、そんな OpenAIGym の後を引き続いで開発されているライブラリになります。
Gymnasium は LeRobot をインストールすれば自動的にインストールされます
環境構築方法
GPU インスタンスを用意する
今回の記事では、GPU ありでの学習(ファインチューニング)を行なうので、GPU インスタンスが必要になります。 π0モデルは、内部で LLM と拡散系モデルを使った生成AIモデルになるため、大量のGPUメモリ(少なくとも32GB程度<AMPなしの場合>)が必要になります。
今回は、以下環境で動作確認しています。(記事作成時点<2025-05-16>)- GPU: V100 1台 - GPU メモリ: 32 GB - Ubuntu: 22.04 - GPU Driver: 550.54.15, 550.90.07 - CUDA: 12.4 - Python: 3.10 - LeRobot: 0.1.0
Lerobot 内部の Pytorch バージョン範囲によって利用可能な CUDA バージョンが決まって、CUDA バージョンによって利用可能な GPU ドライバーバージョンがきまるので、以下の情報等を参考にして最適なバージョンでよしなに環境構築して貰えればと思います。
LeRobot をインストールする
基本的に以下公式の README に従えば OK です
今回は LeRobot の π0 モデルも使用するので、以下のコマンドも実行してください。
cd lerobot pip install -e ".[pi0]"
また今回は Gymnasium で
pusht
とaloha
というシミュレーター環境を利用するので、これらもインストールしてくださいcd lerobot pip install -e ".[pusht]" pip install -e ".[aloha]"
自分の環境では pytest もインストールしないと動かなかったので、必要に応じて pytest もインストールしてください
conda install pytest
pushT シミュレーター環境上でπ0を動かす
π0 モデルを動かすための環境構築が済んだので、まずは pusht という簡単なタスクで π0 モデルを動かせるようにします。 pusht というのは、以下のように T 文字形状の隙間にT文字形状のオブジェクトをロボットが動かしながらはめ込むタスクになります。
やってることは全然たいしたことないやつで π0 じゃなくてももっと簡単な強化学習モデルでも動かせるやつですが、まずは簡単なタスクからやったほうが記事の理解がしやすくなるかと思ってこのタスクにしています
今回の記事では、LeRobot の π0 モデルを各シミュレーター環境用にファインチューニングしたうえで、推論して動かせるようにします。
推論前にファインチューニングが必要なのは、π0 モデルの入出力層を各環境用の次元にそう形に変更し、内部のネットワーク重みもそれに応じて調整する必要があるためです。 各環境用にファインチューニングしてない π0 ベースモデルで動かそうとしても、入出力層のテンソルの shape 不一致になるので動きません。
具体的には、今回の pusht のような2次元環境場合は、π0 モデルの入力層に入力するロボットの状態は x,y 座標という2次元になり、π0 モデルの出力層からの出力データは、ロボットの平面移動方向の2次元ベクトルになります。 一方で3次元環境の場合は、π0 モデルの入力層に入力するロボットの状態は x,y,z 座標という3次元になり、π0 モデルの出力層からの出力データは、ロボットの移動方向の3次元ベクトルになるため、入出力層の shape が一致せず、入出力層のファインチューニングを行わないと動かせいといった具合です。また入出力層の shape や意味合い変更に伴う内部のネットワーク重みの調整を行わないと精度も大きく悪化します。(これらはπ0モデルに限った話ではなく、機械学習モデルにおける一般的な話です)
学習用データセット
モデルを各環境用にファインチューニングするためには、各環境用の学習用データセットが必要になります。
幸いなことに pusht というシミュレーション環境の場合は、LeRobot で pusht 用のデータセットが公開されているので、今回の記事ではこれをそのまま利用します。
LeRobot 公開用データセットは、以下のようなコードで読み込めます。
import lerobot from lerobot.common.datasets.lerobot_dataset import LeRobotDataset # 全エピソードのデータを取得したい場合 dataset = LeRobotDataset("lerobot/pusht") # 特定のエピソードのデータのみ取得したい場合 # dataset = LeRobotDataset("lerobot/pusht", episodes=[0, 5, 10]) # 利用可能なデータセットを表示したい場合 # print("List of available datasets:", lerobot.available_datasets)
この学習用データセット(lerobot/pusht
)の中身(あるエピソードの中のある時間ステップのデータ dataset[0]
)は、以下のようになっています。
{ # 環境の画像 'observation.image': tensor([[[1.0000, 0.9725, 0.9725, ..., 0.9725, 0.9725, 1.0000], [0.9725, 0.9098, 0.9098, ..., 0.9098, 0.9098, 0.9725], [0.9725, 0.9098, 0.9725, ..., 1.0000, 0.9098, 0.9725], ..., [1.0000, 0.9725, 0.9725, ..., 0.9725, 0.9725, 1.0000]]]), # ロボットの x, y 位置 'observation.state': tensor([222., 97.]), # ロボットの次の行動 'action': tensor([233., 71.]), # エピソードの値 'episode_index': tensor(0), # フレーム(時間ステップ) 'frame_index': tensor(0), # 時間情報 'timestamp': tensor(0.), # エピソードが終了したかどうかのフラグ 'next.done': tensor(False), # エピソードが成功したかどうかのフラグ 'next.success': tensor(False), # データセット内の絶対的なインデックス 'index': tensor(0), # タスクの種類を示すインデックス 'task_index': tensor(0), # ロボットへの制御指示テキスト 'task': 'Push the T-shaped block onto the T-shaped target.' }
π0 モデルは、OXE [Open X-Embodiment] データセットで学習しており、このデータセットは RLDS [Reinforcement Learning Datasets] という強化学習のフレームワーク(エピソード・ステップ・行動・報酬など)に準じたフォーマットになっていました。
この pusht 用データセットに限らず LeRobot で公開しているデータセットも、RLDS に似た形で保存されているようです。(それ故に、強化学習や模倣学習で利用しやすいデータセットになっている)
モデルのファインチューニング
LeRobot ではモデルの学習用スクリプトも公開しています
lerobot/lerobot/scripts/train.py at main · huggingface/lerobot · GitHub
この学習用スクリプトを利用すれば、簡単にモデルをファインチューニングすることができます。 pusht データセットで π0 モデルをファインチューニングするコマンドは、以下のようになります。
python lerobot/scripts/train.py \ --policy.path=lerobot/pi0 \ --dataset.repo_id=lerobot/pusht \ --env.type=pusht \ --batch_size=2 \ --num_workers=2 \ --steps=100000 \ --policy.device=cuda
各引数の意味は、以下のようになります。
--policy.path=lerobot/pi0
: LeRobot で公開しているファインチューニングしていない π0 モデルを読み込み、これをベースに追加学習(ファインチューニング)を行なう--dataset.repo_id=lerobot/pusht
:学習用データセットとして、上記 pusht 用学習用データセットを使用して学習を行なう--env.type=pusht
:pusht 用のシミュレーター環境で学習を行なう--steps
:学習ステップ数。loss 値が十分収束するステップ数を設定してください。デフォルト値の100000
とした場合自分の環境(V1001台)では、丸1日程度の学習時間がかかりました。--batch_size
:学習時のバッチサイズ。バッチサイズが大きいほど学習時間が短くなりますが、使用するGPUメモリは増えます。自分の環境ではバッチサイズ2
でも GPUメモリを32GB程度消費してこれ以上だと動かせなかったのでこの値にしていますが、自身のインスタンス環境に応じて調整してください。--num_workers
:データセット読み込み時に使用するデータローダーの並列度(デフォルト値4
)。基本的にインスタンスのCPU数と同じでよいです。今回は、他の学習スクリプトも動かしていた関係上2
にしていますが、自身のインスタンス環境に応じて調整してください。--policy.device=cuda
:GPUで学習する場合はcuda
を指定
その他引数の意味は python lerobot/scripts/train.py --help
コマンドで確認してください。今回はその他引数に関しては、デフォルト値で学習します。
学習が完了すると、ディレクトリ以下に学習済みモデルのチェックポイントが保存されるので、後段の推論ステップでこれを利用します
また以下のように wandb 関連の引数を設定すると、wandb 上で損失関数などの学習状況を可視化することもできます
python lerobot/scripts/train.py \ --wandb.project=${WANDB_PROJECT_NAME} \ --wandb.enable=true \ --policy.path=lerobot/pi0 \ --dataset.repo_id=lerobot/pusht \ --env.type=pusht \ --batch_size=2 \ --num_workers=2 \ --steps=100000 \ --policy.device=cuda
少なくとも loss 値が十分に収束しているかは確認したほうがいいです。(逆に収束しすぎると過学習になるので注意してください)
以上のようにこの学習用スクリプトを利用すれば、簡単にモデルをファインチューニングすることができるのですが、とはいえこの学習スクリプトをそのまま利用するだけでは何をやっているかわからないと思うので、以下にもっと簡略化した学習スクリプト例も公開しておきます。
import argparse import os from pathlib import Path import torch from torchvision.transforms import ToPILImage, v2 from lerobot.common.datasets.lerobot_dataset import ( LeRobotDataset, LeRobotDatasetMetadata ) from lerobot.common.datasets.transforms import ImageTransforms from lerobot.common.datasets.utils import dataset_to_policy_features from lerobot.common.policies.pi0.configuration_pi0 import PI0Config from lerobot.common.policies.pi0.modeling_pi0 import PI0Policy from lerobot.configs.types import FeatureType if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("--output_dir", type=str, default="outputs/train/pi0_pusht") parser.add_argument("--n_steps", type=int, default=10000) parser.add_argument("--batch_size", type=int, default=4) parser.add_argument("--optimizer_lr", type=float, default=2.5e-5) parser.add_argument("--optimizer_beta_1", type=float, default=0.9) parser.add_argument("--optimizer_beta_2", type=float, default=0.95) parser.add_argument("--optimizer_eps", type=float, default=1e-8) parser.add_argument("--optimizer_weight_decay", type=float, default=1e-10) parser.add_argument("--n_workers", type=int, default=4) parser.add_argument("--display_freq", type=int, default=100) parser.add_argument("--save_checkpoint_freq", type=int, default=10000) parser.add_argument("--device", type=str, default="cuda") args = parser.parse_args() for arg in vars(args): print(f"{arg}: {getattr(args, arg)}") os.makedirs(args.output_dir, exist_ok=True) # Select your device device = torch.device(args.device) # Get input-output features in dataset to fine-tuning properly input-output shapes of p0 model dataset_metadata = LeRobotDatasetMetadata("lerobot/pusht") features = dataset_to_policy_features(dataset_metadata.features) output_features = { key: ft for key, ft in features.items() if ft.type is FeatureType.ACTION } input_features = { key: ft for key, ft in features.items() if key not in output_features } print("output_features: ", output_features) print("input_features: ", input_features) # Define p0 model (policy) # class PI0Policy fine-tuninig input-output liner layer shapes by input_features and output_features. train_cfg = PI0Config( input_features=input_features, output_features=output_features ) policy = PI0Policy(train_cfg, dataset_stats=dataset_metadata.stats) policy.to(device) policy.train() # Define Transform for data-augmentation image_transforms = v2.Compose( [ v2.ColorJitter(brightness=(0.5, 1.5)), v2.ColorJitter(saturation=(0.5, 1.5)), v2.ColorJitter(contrast=(0.5, 1.5)), v2.ColorJitter(hue=(-0.1, 0.1)), v2.RandomAdjustSharpness(sharpness_factor=2, p=1), ] ) # Load dataset dataset = LeRobotDataset( "lerobot/pusht", image_transforms=image_transforms, # need specific time period input-output data when train # In p0 model, only current timestep [0.0] is used for input-output data delta_timestamps={ # pusht dataset and environment has image, state for input "observation.image": ( [i / dataset_metadata.fps for i in train_cfg.observation_delta_indices] if train_cfg.observation_delta_indices else [0.0] ), "observation.state": ( [i / dataset_metadata.fps for i in train_cfg.observation_delta_indices] if train_cfg.observation_delta_indices else [0.0] ), # pusht dataset and environment has action for output "action": ( [i / dataset_metadata.fps for i in train_cfg.action_delta_indices] if train_cfg.action_delta_indices else [0.0] ), }, ) # Define dataloader dataloader = torch.utils.data.DataLoader( dataset, batch_size=args.batch_size, num_workers=args.n_workers, shuffle=True, pin_memory=device.type != "cpu", drop_last=True, ) # define optimizer optimizer = torch.optim.Adam( policy.parameters(), lr=args.optimizer_lr, betas=(args.optimizer_beta_1, args.optimizer_beta_2), eps=args.optimizer_eps, weight_decay=args.optimizer_weight_decay, ) # Run training loop. step = 0 done = False while not done: for batch in dataloader: # set input tensor # In p0 model, use "task" as input text, so add "task" to batch batch = { k: (v.to(device) if isinstance(v, torch.Tensor) else v) for k, v in batch.items() } batch["task"] = [ "Push the block to the target position\n" ] * args.batch_size if "observation.state" in batch and batch["observation.state"].ndim == 3: # convert [batch_size, 1, state_dim] -> [batch_size, state_dim] shapes batch["observation.state"] = batch["observation.state"].squeeze(1) if step == 0: print("batch.keys: ", batch.keys()) for k, v in batch.items(): if isinstance(v, torch.Tensor): print(f"{k} shape: {v.shape}") elif isinstance(v, list) and k == "task": print(f"{k}: {v[0]}") # send input tensor to p0 model and calculate loss loss, _ = policy.forward(batch) # calucurate loss gradient with back propagation loss.backward() # update model network weights optimizer.step() # reset gradient optimizer.zero_grad() if step % args.display_freq == 0: print(f"step: {step} loss: {loss.item():.5f}") if step % args.save_checkpoint_freq == 0: output_dir = os.path.join(args.output_dir, f"{step}") os.makedirs(output_dir, exist_ok=True) policy.save_pretrained(output_dir) step += 1 if step >= args.n_steps: done = True break # Save a policy checkpoint. output_dir = os.path.join(args.output_dir, f"last") os.makedirs(output_dir, exist_ok=True) policy.save_pretrained(output_dir)
基本的には、PyTorch を使用したいつもの機械学習モデルの学習コードのように{モデル定義 → データセット読み込み → データローダー定義 → Optimizer 定義}した後に、学習ループ内で{学習用データセットのバッチをロード → 入力バッチをモデルに流す(forward) → 損失関数の loss 値計算 → loss 値の勾配計算 → Optimizer に従って誤差逆伝播法でモデルのネットワーク重み更新}を繰り返す流れになります。
その他ポイントは、以下のようになります。
π0 モデル定義
π0 モデルの定義は、LeRobot で提供している
PI0Policy
というπ0 モデルのネットワーク構造を定義したクラスを使用して行っています。from lerobot.common.policies.pi0.configuration_pi0 import PI0Config from lerobot.common.policies.pi0.modeling_pi0 import PI0Policy train_cfg = PI0Config( input_features=input_features, output_features=output_features ) policy = PI0Policy(train_cfg, dataset_stats=dataset_metadata.stats) policy.to(device) policy.train()
pushT 用の入出力層の形状にファインチューニングしたいので、pusht データセットから読み込み入出力特徴量のデータを渡しています(このデータ渡しておけば、
PI0Policy
クラス内部でよしなに入出力層を形状をファインチューニングしてくれます)dataset_metadata = LeRobotDatasetMetadata("lerobot/pusht") features = dataset_to_policy_features(dataset_metadata.features) output_features = { key: ft for key, ft in features.items() if ft.type is FeatureType.ACTION } input_features = { key: ft for key, ft in features.items() if key not in output_features } print("output_features: ", output_features) print("input_features: ", input_features)
PI0Policy
のクラスは、以下の部分で実装されています。 lerobot/lerobot/common/policies/pi0/modeling_pi0.py at main · huggingface/lerobot · GitHub今回の記事では、モデル内部の実装の説明はしませんが、以下の記事で説明しているネットワーク内部の実装が気になる方はこの
PI0Policy
の実装コードをみてみると具体的なイメージがつかめると思います。(使用するだけなら見なくても全然OKです)学習用データセットのデータオーギュメント
特に学習用データセットの数が少ないケースでは、データオーギュメント(DA)を行って学習用データセットのかさ増しを行い、モデルの汎化性能を向上させることが効果的になります。
この学習スクリプトでは、学習用データセットに含まれる環境の画像に対して、以下コードのようにランダム色調変換(輝度・コントラスト・彩度・色相・シャープネス)の DA を行った上で、π0 モデルに入力しています。
image_transforms = v2.Compose( [ v2.ColorJitter(brightness=(0.5, 1.5)), v2.ColorJitter(saturation=(0.5, 1.5)), v2.ColorJitter(contrast=(0.5, 1.5)), v2.ColorJitter(hue=(-0.1, 0.1)), v2.RandomAdjustSharpness(sharpness_factor=2, p=1), ] )
以下のオリジナルの学習スクリプトの実装でも、デフォルトではランダム色調変換のみのDAが行われているようです。
lerobot/lerobot/common/datasets/transforms.py at main · huggingface/lerobot · GitHub
画像に対してのDAでは、通常ランダムアフィン変換のDAも行なうのが一般的ですが、今回のような模倣学習やその他強化学習のタスクにおいては、ランダムアフィン変換を行なうと空間的な位置関係が破綻して逆に品質が低下するケースがあるので行っていなのだと思います。
具体的には、画像を反転させると学習用データセットに含まれるロボットの状態(x,y)座標の空間的な位置関係が破綻するといった具合です(ロボットの状態も画像と同じく反転させるDAをかければ問題ないかもですが、特に検証してないです)
この DA 手法を改善することでπ0 モデルの汎化性能を比較的容易に向上させれる気がしていますが、本記事では詳細は述べません。
モデルをシミュレーター上で動かす(推論する)
LeRobot ではモデルの推論用スクリプトも公開しています。
lerobot/lerobot/scripts/eval.py at main · huggingface/lerobot · GitHub
この推論用スクリプトを利用すれば、簡単にモデルをシミュレーター環境上で動かす(推論させる)ことができます。 pusht 用にファインチューニングした π0 モデルで推論するコマンドは、以下のようになります
python lerobot/scripts/eval.py \ --policy.path=outputs/train/2025-05-13/09-50-05_pusht_pi0/checkpoints/last/pretrained_model \ --output_dir=outputs/eval/pi0_pusht \ --env.type=pusht \ --eval.batch_size=10 \ --eval.n_episodes=10 \ --policy.device=cuda
各引数の意味は以下のようになります
--policy.path
:上記「π0モデルのファインチューニング」での学習スクリプト実行後に出力される学習済みモデルのチェックポイントパスを指定してください--env.type=pusht
:pusht 用のシミュレーター環境で推論を行なう—-eval.n_episodes
:エピソードの最大試行回数
以上のようにこの推論用スクリプトを利用すれば、簡単にモデルをシミュレーター環境で動かしてみることができます。 とはいえこの推論スクリプトをそのまま利用するだけでは何をやっているかわからないと思うので、以下にもっと簡略化した推論スクリプト例も公開しておきます。
import argparse import os import gym_pusht # noqa: F401 import gymnasium as gym import imageio import numpy as np import torch import lerobot from lerobot.common.policies.pi0.modeling_pi0 import PI0Policy if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("--output_dir", type=str, default="outputs/eval/pi0_pusht") parser.add_argument( "--load_checkpoint_dir", type=str, default="../checkpoints/09-50-05_pusht_pi0/checkpoints/last/pretrained_model", ) parser.add_argument("--max_episode_steps", type=int, default=500) parser.add_argument("--seed", type=int, default=42) parser.add_argument("--gpu_id", type=int, default=0) args = parser.parse_args() for arg in vars(args): print(f"{arg}: {getattr(args, arg)}") os.makedirs(args.output_dir, exist_ok=True) if args.gpu_id < 0: device = "cpu" else: device = "cuda" os.environ["CUDA_VISIBLE_DEVICES"] = str(args.gpu_id) np.random.seed(args.seed) torch.manual_seed(args.seed) # Define simulation environment with PushT env = gym.make( "gym_pusht/PushT-v0", # 観測データ(observation) は、ロボットの x,y位置(pos) + 環境の画像(pixels) obs_type="pixels_agent_pos", max_episode_steps=args.max_episode_steps, ) observation_np, info = env.reset(seed=args.seed) print("env.observation_space:", env.observation_space) print("env.action_space:", env.action_space) # Load pre-trained p0 model (policy) # This pre-trained model needs to finetune with the PushT environment input-output feature shapes. policy = PI0Policy.from_pretrained( args.load_checkpoint_dir, strict=False, ) policy.reset() print("Policy config:", vars(policy.config)) print("policy.config.input_features:", policy.config.input_features) print("policy.config.output_features:", policy.config.output_features) # ----------------------------------------------- # Infer p0 policy with simulation environment # ----------------------------------------------- rewards = [] frames = [] step = 0 done = False # Render initial frame frames.append(env.render()) while not done: # pusht environment has x-y position of the agent as the observation state = torch.from_numpy(observation_np["agent_pos"]).to(device) state = state.to(torch.float32) state = state.unsqueeze(0) # pusht environment has RGB image of the environment as the observation image = torch.from_numpy(observation_np["pixels"]).to(device) image = image.to(torch.float32) / 255 image = image.permute(2, 0, 1) image = image.unsqueeze(0) # set observation for the pretrained p0-policy with pusht dataset observation = { # agent's x-y position "observation.state": state, # environment's RGB image "observation.image": image, # agent's control instruction text # same as the text in the training dataset of `lerobot/pusht` "task": ["Push the T-shaped block onto the T-shaped target."], } # infer the next action based on the p0-policy with torch.inference_mode(): action = policy.select_action(observation) # step through the simulation environment and receive a new observation action_np = action.squeeze(0).to("cpu").numpy() observation_np, reward, terminated, truncated, info = env.step(action_np) print(f"{step=} {reward=} {terminated=}") # render the environment frames.append(env.render()) # keep track of all the rewards rewards.append(reward) # finish inference when the success state is reached (i.e. terminated is True), # or the maximum number of iterations is reached (i.e. truncated is True) done = terminated | truncated | done step += 1 if terminated: print("Success!") else: print("Failure!") # Get fps of simulation environment fps = env.metadata["render_fps"] # save the simulation frames as a video video_path = os.path.join(args.output_dir, f"eval_frames.mp4") imageio.mimsave(str(video_path), np.stack(frames), fps=fps) print(f"Video of the evaluation is available in '{video_path}'.")
基本的には、PyTorch を使用したいつもの機械学習モデルの推論コードのように{モデル定義 → 学習済みチェックポイント読み込み → モデルを推論モードにする → 推論時の入力データをモデルに流し推論}という流れになります。
但し、シミュレーション環境を使用しながら推論するのが、他の一般的な機械学習の推論スクリプトとの相違点になります。
まず、いつもの推論スクリプトのように学習済みチェックポイントから π0 モデルを読み込みます。(policy という変数名にしているのは、π0 モデルも他の強化学習モデルと同じく行動方策 policy をモデル化したモデルになっているためです)
policy = PI0Policy.from_pretrained(
args.load_checkpoint_dir,
strict=False,
)
今回は Gymnasium でのシミュレーターを使用しているので、以下のような gym.make()
メソッドでシミュレーター環境を定義します。
env = gym.make( "gym_pusht/PushT-v0", obs_type="pixels_agent_pos", max_episode_steps=args.max_episode_steps, )
pusht タスクのシミュレーター環境を使用する場合は、gym_pusht/PushT-v0
を指定します。
今回の pusht 用学習用データセットには、入力データとしてロボットの x,y 座標と環境の画像データが含まれており、この入力データでモデルをファインチューニングしました。
そのため obs_type="pixels_agent_pos"
を指定して、環境からの観測データ(observation)としてロボットの x,y 座標と環境の画像を取得できるようにし、学習時と推論時で同じ入力の shape や意味合いになるようにします。
次に reset()
メソッドを実行するとシミュレーター環境を初期化し、初回時間ステップにおける環境からの観測データ(observation)が得られます。
observation_np, info = env.reset(seed=args.seed)
この observation には、ロボットの x,y 座標と環境の画像データが含まれるので、これをπ0 モデルで推論可能な PyTorch の Tensor 型に変換したうえで入力し、モデルの推論を行います。
推論時の出力データは、次の時間ステップにおけるロボットの行動(action)になります。確率分布で表現される行動方策のモデル(policy)に入力データ(observation)を入力し、最も確率の高い行動(action)が出力されるイメージです。
# aloha environment has x-y position of the agent as the observation state = torch.from_numpy(observation_np["agent_pos"]).to(device) state = state.to(torch.float32) state = state.unsqueeze(0) # aloha environment has RGB image of the environment as the observation image = torch.from_numpy(observation_np["pixels"]["top"]).to(device) image = image.to(torch.float32) / 255 image = image.permute(2, 0, 1) image = image.unsqueeze(0) # p0-policy expects the following observation format observation = { # agent's x-y position "observation.state": state, # environment's RGB image "observation.images.top": image, # agent's control instruction text # same as the text in the training dataset of `lerobot/pusht` "task": ["Push the T-shaped block onto the T-shaped target."], } # infer the next action based on the p0-policy with torch.inference_mode(): action = policy.select_action(observation)
次に、step()
メソッドを実行すると、引数で指定されたロボットの行動(action)を元に1ステップ分シミュレーターを動かします。このとき次回ステップにおける観測データ(observation)が再度シミュレーターから得られるので、こらら入力データで π0 モデルを再度推論といったループを繰り返します。
# step through the simulation environment and receive a new observation action_np = action.squeeze(0).to("cpu").numpy() observation_np, reward, terminated, truncated, info = env.step(action_np)
render()
メソッドを実行するとシミュレーター環境の現在の時間ステップにおけるフレーム画像がレンダリングされます。
frames.append(env.render())
毎時間ステップ毎にこのフレーム画像をレンダリングして、これらフレーム画像を動画として結合することで、以下のような動画ファイルとして出力しています。
学習用ステップ数(1000)でファインチューニングしたπ0 モデルの場合
学習用ステップ数(10000)でファインチューニングしたπ0 モデルの場合
学習用ステップ数(60000)でファインチューニングしたπ0 モデルの場合
学習ステップが少ないうちはうまく pusht タスク(T 文字形状の隙間にT文字形状のオブジェクトをロボットが動かしながらはめ込むタスク)を成功させることができませんが、学習ステップを増やしていくと徐々にタスクを成功させることができるようになっています!
aloha シミュレーター環境上でπ0を動かす
ここまでは簡単のため、まずは pusht という非常に単純な2次元のシミュレーション環境で π0 モデルを動かしてみました。 今度はもう少しロボティクスらしいシミュレーション環境として、以下のような aloha というロボットを使用したオブジェクトの空洞部分に別のオブジェクトを差し込むタスクで動かしてみます。
とはいえ基本的な動かし方は pusht のときと同じ流れになります。
学習用データセット
今回のタスク場合も、LeRobot で学習用データセットが公開されているので、今回の記事ではこれをそのまま利用します。
この学習用データセットの中身(あるエピソードの中のある時間ステップのデータ)は、以下のようになっています。
{ # 環境の画像 'observation.images.top': tensor([[[0., 0., 0., ..., 0., 0., 0.], [0., 0., 0., ..., 0., 0., 0.], [0., 0., 0., ..., 0., 0., 0.], ..., [0., 0., 0., ..., 0., 0., 0.], [0., 0., 0., ..., 0., 0., 0.], [0., 0., 0., ..., 0., 0., 0.]]]), # ロボットの状態(state): アーム関節角度や位置情報など 'observation.state': tensor([ 0.0000, -0.9600, 1.1600, 0.0000, -0.3000, 0.0000, 0.0000, 0.0000, -0.9600, 1.1600, 0.0000, -0.3000, 0.0000, 0.0000]), # ロボットの行動(action): ロボットが次に実行するアクション(関節の目標位置や速度など) 'action': tensor([ 0.0092, -0.9342, 1.1735, -0.0046, -0.3114, -0.0061, 0.1436, -0.0061, -0.9587, 1.1582, 0.0138, -0.2991, 0.0337, -0.0312]), # エピソードの値 'episode_index': tensor(0), # フレーム(時間ステップ) 'frame_index': tensor(0), # 時間情報 'timestamp': tensor(0.), # エピソードが終了したかどうかのフラグ 'next.done': tensor(False), # データセット内の絶対的なインデックス 'index': tensor(0), # タスクの種類を示すインデックス 'task_index': tensor(0), # ロボットへの制御指示テキスト 'task': 'Insert the peg into the socket.' }
モデルのファインチューニング
pusht のときと同じく、LeRobot で提供している学習用スクリプトを使用すれば、簡単にモデルをファインチューニングすることができます。
lerobot/lerobot/scripts/train.py at main · huggingface/lerobot · GitHub
この aloha タスク用データセットで π0 モデルをファインチューニングするコマンドは、以下のようになります
python lerobot/scripts/train.py \ --policy.path=lerobot/pi0 \ --dataset.repo_id=lerobot/aloha_sim_insertion_human_image \ --env.type=aloha \ --batch_size=2 \ --num_workers=2 \ --steps=100000 \ --policy.device=cuda
学習が完了すると、ディレクトリ以下に学習済みモデルのチェックポイントが保存されるので、後段の推論ステップでこれを利用します
モデルをシミュレーター上で動かす(推論する)
pusht のときと同じく、LeRobot で提供している推論用スクリプトを使用すれば、簡単にモデルをシミュレーター環境上で動かすことができます。
https://github.com/huggingface/lerobot/blob/main/lerobot/scripts/eval.py
aloha タスク用にファインチューニングした π0 モデルで推論するコマンドは、以下のようになります。
python lerobot/scripts/eval.py \ --policy.path=outputs/train/2025-05-13/07-32-44_aloha_pi0/checkpoints/last/pretrained_model \ --output_dir=outputs/eval/pi0_aloha \ --env.type=aloha \ --eval.batch_size=10 \ --eval.n_episodes=10 \ --policy.device=cuda
pusht のときと同じく簡略化した推論スクリプト例も公開しておきます。*1
import argparse import os import gym_aloha # noqa: F401 import gymnasium as gym import imageio import numpy as np import torch import lerobot from lerobot.common.policies.pi0.modeling_pi0 import PI0Policy if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("--output_dir", type=str, default="outputs/eval/pi0_aloha") parser.add_argument( "--load_checkpoint_dir", type=str, default="../checkpoints/08-12-47_aloha_pi0/checkpoints/last/pretrained_model", ) parser.add_argument("--max_episode_steps", type=int, default=1000) parser.add_argument("--seed", type=int, default=42) parser.add_argument("--gpu_id", type=int, default=0) args = parser.parse_args() for arg in vars(args): print(f"{arg}: {getattr(args, arg)}") os.makedirs(args.output_dir, exist_ok=True) if args.gpu_id < 0: device = "cpu" else: device = "cuda" os.environ["CUDA_VISIBLE_DEVICES"] = str(args.gpu_id) np.random.seed(args.seed) torch.manual_seed(args.seed) # Define simulation environment with AlohaInsertion-v0 os.environ["MUJOCO_GL"] = "egl" env = gym.make( "gym_aloha/AlohaInsertion-v0", obs_type="pixels_agent_pos", max_episode_steps=args.max_episode_steps, ) observation_np, info = env.reset(seed=args.seed) print("env.observation_space:", env.observation_space) print("env.action_space:", env.action_space) # Load pre-trained p0 model (policy) # This pre-trained model needs to finetune with the AlohaInsertion environment input-output feature shapes. policy = PI0Policy.from_pretrained( args.load_checkpoint_dir, strict=False, ) policy.reset() print("Policy config:", vars(policy.config)) print("policy.config.input_features:", policy.config.input_features) print("policy.config.output_features:", policy.config.output_features) # ----------------------------------------------- # Infer p0 policy with simulation environment # ----------------------------------------------- rewards = [] frames = [] step = 0 done = False # Render initial frame frames.append(env.render()) while not done: # aloha environment has x-y position of the agent as the observation state = torch.from_numpy(observation_np["agent_pos"]).to(device) state = state.to(torch.float32) state = state.unsqueeze(0) # aloha environment has RGB image of the environment as the observation image = torch.from_numpy(observation_np["pixels"]["top"]).to(device) image = image.to(torch.float32) / 255 image = image.permute(2, 0, 1) image = image.unsqueeze(0) # p0-policy expects the following observation format observation = { # agent's x-y position "observation.state": state, # environment's RGB image "observation.images.top": image, # agent's control instruction text "task": ["Insert the peg into the socket"], } # infer the next action based on the p0-policy with torch.inference_mode(): action = policy.select_action(observation) # step through the simulation environment and receive a new observation action_np = action.squeeze(0).to("cpu").numpy() observation_np, reward, terminated, truncated, info = env.step(action_np) print(f"{step=} {reward=} {terminated=}") # render the environment frames.append(env.render()) # keep track of all the rewards rewards.append(reward) # finish inference when the success state is reached (i.e. terminated is True), # or the maximum number of iterations is reached (i.e. truncated is True) done = terminated | truncated | done step += 1 if terminated: print("Success!") else: print("Failure!") # Get fps of simulation environment fps = env.metadata["render_fps"] # save the simulation frames as a video video_path = os.path.join(args.output_dir, f"eval_frames.mp4") imageio.mimsave(str(video_path), np.stack(frames), fps=fps) print(f"Video of the evaluation is available in '{video_path}'.")
今回は aloha 用のタスクのシミュレーション環境にしたいので gym.make()
メソッドでのシミュレーター環境の定義時に gym_aloha/AlohaInsertion-v0
を指定します。
env = gym.make( "gym_aloha/AlohaInsertion-v0", obs_type="pixels_agent_pos", max_episode_steps=args.max_episode_steps, )
これ以外は pusht のときと同じです
この推論スクリプトを実行すると、以下のような出力動画が得られます。
学習用ステップ数(1000)でファインチューニングしたπ0 モデルの場合
学習用ステップ数(20000)でファインチューニングしたπ0 モデルの場合
学習用ステップ数(100000)でファインチューニングしたπ0 モデルの場合
学習ステップが少ないうちはうまくタスクを成功させることができませんが、学習ステップを増やしていくと徐々にタスクを成功させることができるようになっています!
まとめ
今回の記事では、以前に中身を仕組みを調査したπ0モデルを実際にシミュレーション環境上で動かしてみました。 正直今回のような簡単なシミュレーション環境では、π0 を使用しなくともより軽量で単純な強化学習モデルでも動かせるのですが、LeRobot や Gymnasium の使用方法、π0 や他の強化学習モデルのシミュレーション環境上での動かし方を学べたのは良かったと思ってます。
今後機会あれば、Genesis等を使用したより高度なシミュレーション環境や実機でもπ0や他のVLAモデルを動かせるようにしてみようかと思ってます。
We Are Hiring!
株式会社ABEJAでは共に働く仲間を募集しています!
ロボティクスやLLMに興味ある方々!機械学習プロダクトに関わるフロントエンド開発バックエンド開発に興味ある方々! こちらの採用ページから是非ご応募くださいませ!
*1:尚、環境によっては Docker 環境内で aloha 用シミュレーターを使用した推論を行おうとすると、"gymnasium.error.DependencyNotInstalled: Cannot initialize a EGL device display. This likely means that your EGL driver does not support the > PLATFORM_DEVICE extension, which is required for creating a headless rendering context.. (HINT: you need to install mujoco" のようなグラフィックデバイス関連のエラーが発生するかもです。その場合は 非 docker 環境で推論を実行するようにしてください。