ABEJA Tech Blog

中の人の興味のある情報を発信していきます

Raspberry Pi で作るWebカメラ映像配信(物体検出機能付き)

はじめに

ABEJA 新卒エンジニアの和田です。

趣味で購入した Raspberry Pi が使われずに眠っていたため、USB 接続の Web カメラでリアルタイム映像を取得し、何か面白いことをしたいと思い立ちました。本記事では、低コストかつシンプルに映像入力環境を整える方法として、Raspberry Pi と USB カメラを用いたリアルタイム物体認識ストリーミングの構築手順を解説します。必要なのは Raspberry Pi 4 Model B と USB カメラだけ。数十分で映像のキャプチャから物体検出、ブラウザ配信までを一気に実現できます。この仕組みを作ることで、まず自宅に設置したカメラ映像から人物や荷物の動きを自動で検知し、必要に応じて通知を受け取るホーム監視システムを構築できます。

セットアップ

必要なもの

種類 数量 備考
Raspberry Pi 4 Model B 1台 4GB/8GB どちらでも可
microSDカード 1枚 16GB 以上推奨
USB-C 電源アダプタ 1台 5V/3A 以上
USB Webカメラ 1台 UVC 対応のもの
LANケーブルまたは Wi-Fi 1本/環境 ネットワーク接続用
PC/Mac/Linux 1台 Raspberry Pi Imager,SSH クライアント用
HDMI ケーブル/モニタ 任意 初回セットアップ時にあると便利
USB キーボード/マウス 任意 初回セットアップ時にあると便利

Raspberry Pi OS の書き込み

まずはPCにRaspberry Pi Imagerをインストールします。Imagerを起動したら、インストール先のRaspberry Pi本体、OSイメージ、そしてmicroSDカードを選択して書き込みを行います。書き込みオプションでWi-FiやSSHの設定を済ませた後、microSDカードをRaspberry Piに挿入して起動してください。

項目
OS Debian GNU/Linux 12 (bookworm)
Kernel 5.15.84-v7l+

USBカメラ認識の確認

以下を実行してUSBカメラが認識されており、Video4Linuxデバイスが生成されているか確かめます。

# 出力例に 'Webcam' または 'Camera' が含まれていることを確認
lsusb

# /dev/video0 が存在することを確認
ls /dev/video*

依存ライブラリ

パッケージ バージョン
Python 3.11.2
uv 0.7.13

環境セットアップ

TensorFlow Lite ランタイムが NumPy 1.x 向けにビルドされているため、numpyはバージョンを指定しています。

# プロジェクトディレクトリ作成&移動
mkdir ~/pi-object-detect && cd ~/pi-object-detect


# uv 初期化&依存追加
uv init
uv add flask opencv-python numpy<2.0 tflite-runtime
uv sync

モデル&ラベルのダウンロード

wget https://github.com/google-coral/test_data/raw/master/ssd_mobilenet_v2_coco_quant_postprocess.tflite -O detect.tflite
wget https://github.com/google-coral/test_data/raw/master/coco_labels.txt -O labelmap.txt

Step 1:Flask+MJPEG ストリーミングの骨子

今回は「Python だけで」「追加ビルド不要」「ブラウザ互換性抜群」「手軽に動く」というメリットを優先して MJPEG+Flask を選択しました。 Camera._update() を常時スレッドで回し、フレーム更新と配信を分離しています。そして、generate_mjpeg() は取得済み JPEG バイト列を連続返却します。

import cv2  
import threading  
from flask import Flask, Response  

app = Flask(__name__)

class Camera:
    def __init__(self):
        self.cap = cv2.VideoCapture(0)
        self.cap.set(cv2.CAP_PROP_FRAME_WIDTH, 640)
        self.cap.set(cv2.CAP_PROP_FRAME_HEIGHT, 480)
        self.frame = None
        threading.Thread(target=self._update, daemon=True).start()

    def _update(self):
        while True:
            ret, f = self.cap.read()
            if ret:
                self.frame = f

    def get_frame(self):
        if self.frame is None: return None
        _, j = cv2.imencode('.jpg', self.frame)
        return j.tobytes()

camera = Camera()

def generate_mjpeg():
    while True:
        frame = camera.get_frame()
        if frame:
            yield (b'--frame\r\n'
                   b'Content-Type: image/jpeg\r\n\r\n' + frame + b'\r\n')

@app.route('/stream')
def stream():
    return Response(generate_mjpeg(),
                    mimetype='multipart/x-mixed-replace; boundary=frame')

Step 2:TFLite 物体検出の組み込み

2-1 モデル/ラベルロード

import numpy as np
from tflite_runtime.interpreter import Interpreter

interpreter = Interpreter(model_path='detect.tflite')
interpreter.allocate_tensors()
id_in, id_out = interpreter.get_input_details(), interpreter.get_output_details()
h, w = id_in[0]['shape'][1:3]

with open('labelmap.txt') as f:
    labels = [l.strip() for l in f]

2-2 _update() 内の前処理~推論~描画

モデル入力サイズとデータ型を動的に取得します。スコア閾値は用途に応じて調整可能です。

# 既存の _update() 内、フレーム取得直後に追加
rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
resized = cv2.resize(rgb, (w, h))
data = np.expand_dims(resized, 0)
if id_in[0]['dtype'] == np.float32:
    data = (data - 127.5) / 127.5

interpreter.set_tensor(id_in[0]['index'], data)
interpreter.invoke()

boxes   = interpreter.get_tensor(id_out[0]['index'])[0]
classes = interpreter.get_tensor(id_out[1]['index'])[0]
scores  = interpreter.get_tensor(id_out[2]['index'])[0]

H, W, _ = frame.shape
for box, cls, sc in zip(boxes, classes, scores):
    if sc < 0.5: continue
    y1, x1, y2, x2 = box
    l, r = int(x1*W), int(x2*W)
    t, b = int(y1*H), int(y2*H)
    cv2.rectangle(frame, (l, t), (r, b), (0,255,0), 2)
    txt = f"{labels[int(cls)]}: {sc*100:.1f}%"
    cv2.putText(frame, txt, (l, t-5),
                cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0,255,0), 2)

Step 3:動作確認

uv run python app.py

ブラウザで http://<Pi_IP>:5000/stream にアクセスし、検出ボックス付き映像を確認します。

おわりに

本記事では、Raspberry Pi+USBカメラを使ったリアルタイム物体認識ストリーミングの構築手順をご紹介しました。 低コストで手軽に始められる環境を整え、 Flask+MJPEGによるシンプルなライブ配信の仕組みを実装し、 TensorFlow Liteを組み合わせて物体検出をリアルタイムに動作させました。 これらの手法を応用すれば、監視カメラやスマート家電、ロボットの視覚システムなど、さまざまなIoT/ロボット×AIアプリケーションに発展させることが可能です。

We Are Hiring!

ABEJAは、テクノロジーの社会実装に取り組んでいます。 技術はもちろん、技術をどのようにして社会やビジネスに組み込んでいくかを考えるのが好きな方は、下記採用ページからエントリーください! (新卒の方やインターンシップのエントリーもお待ちしております!)

careers.abejainc.com