ABEJA Tech Blog

中の人の興味のある情報を発信していきます

Celery入門──非同期処理・定期実行・可観測性まで解説

はじめに

ABEJA 新卒の和田です。

本稿では「重い処理をバックグラウンドに逃がし、ユーザー操作を速く保つ」ための仕組みとして Celery を紹介します。

私は新卒エンジニアとして入社してすぐ、プロジェクトにて、LLM技術を用いたデータ取得・加工パイプラインを作成のため、 Celery を使った非同期バッチ処理の開発に携わりました。最初は Celery のアーキテクチャや設定パラメータの意味がまったく理解できず、何度も詰まりました。そのたびに先輩やドキュメントを頼りに調べ、実際にモックを作成したりしながら理解を深めてきた経験を、本記事でまとめて共有したいと思ったのが執筆のきっかけです。

CeleryはPythonで書かれた分散タスクキューフレームワークであり、Python を用いたマイクロサービス開発においては、Celery は成熟した選択肢の一つです。シンプルな構成から大規模分散処理まで拡張でき、Redis や RabbitMQ との親和性も高く、ドキュメント・事例も豊富です。

ウェブアプリケーションやマイクロサービスでは、ユーザ操作の応答性能を担保しつつ、大量のバッチ処理や外部API呼び出し(例:LLM API による応答生成など)、レポート生成(10秒〜1分以上かかるような処理)を同時に走らせたい場面が多々あります。

「メインプロセスで時間のかかる処理はしない」「結果だけ後で受け取る」「決まった時間に定期実行」「複数タスクを順番・並列・集計で動かす」――これらを自前で作るのは骨が折れるうえにバグの温床になるケースがあります。Celery を使えば、わずかな設定とデコレータだけでこれらをシームレスに扱えます。

今回、私達が Celery を使ったのは、主に以下の2つの処理に対応するためでした。

  • 定期的に統計を集計する処理を、毎時・毎日などのスケジュールで実行したい(=Celery Beat)
  • 順番に依存関係のある複数ステップ処理を、確実に順序通りに実行したい(=Chain / Group)

こうした処理は「メインの処理の中に同期的に書く」ことも一応可能ではありますが、処理の遅延やスケーリングの難しさ、エラー時の再送制御などを考えると、専用のタスクキューで分離する方が遥かに安全かつ保守的です。

Celery は Python ネイティブに近い直感的な記法と運用のしやすさがあり、当時のチーム構成でも十分に自走できると判断し、採用に至りました。

しかし実際に数多くのパラメータを目にすると、「何をどうチューニングすればいいのか」「失敗回避のベストプラクティスは?」と迷いがちです。

本稿は、

  1. Celery の全体像 をざっくり把握したい人
  2. すでに Celery を動かしているが深い理解が足りない人
  3. これから導入を検討する人 向けに、最小限の基礎から運用・スケーリング・可観測性までカバーします。

1. Celery のアーキテクチャと三大要素

Celery は大きく次の3つで構成されます。

  1. Producer:アプリ本体。.delay() などでタスクをキューへ投入
  2. Broker:Redis/RabbitMQ など。タスクメッセージを蓄え、配信する
  3. Worker:受け取ったタスクメッセージを実行し、処理結果を Backend に保存
  4. Backend:Redis/DB/S3 など。完了したタスクの戻り値を保管し、Producer が後で取得

1.1 Broker の選び方

  • Redis
    • メモリ上キー・バリュー:高速・セットアップ簡単
    • 複数DB:redis://localhost:6379/0 はキュー、…/1 は結果用など
  • RabbitMQ
    • 高機能なメッセージング:Exchange、TTL、Dead Letter…
    • 複数シナリオで細かなルーティングが必要な場合に

1.2 Worker の実行モデル

プールタイプ 説明 向き
prefork マルチプロセス CPU バウンド
gevent コルーチン/イベントループ I/O バウンド
threads マルチスレッド 軽量並列
celery -A tasks worker \
  --loglevel=info \
  --concurrency=8 \
  --pool=gevent \
  --prefetch-multiplier=1 \
  --max-tasks-per-child=100 \
  --events
  • concurrency:同時実行プロセス数
  • prefetch-multiplier=1:ワーカー間でタスクが偏らないよう調整
  • max-tasks-per-child=100:一定回数処理後にプロセスを再起動し、メモリリークを抑制
  • -events:Flower やエクスポーター向けに内部イベントを出力

1.3 Backend の選択と結果管理

  • 保存先:Redis、RPC、SQLAlchemy(DB)、Amazon S3 など
  • TTL の指定がないと、結果データが無限に蓄積される
  • 結果不要タスクは @app.task(ignore_result=True) でストアをスキップ

2. シンプルタスクとリトライ戦略

2.1 タスクの定義と呼び出し方

まず、Celery ではタスクは @app.task デコレータを付けた関数で定義します。 定義したタスクは「非同期で投げる」または「ワークフローの一部として扱う」など、複数の呼び出し方ができます。

app = Celery('demo', broker='redis://localhost:6379/0', backend='redis://localhost:6379/1')

@app.task
def add(x, y):
    return x + y

2.2 タスクの呼び出し方法:.delay() と .s() の違い

  • .delay():シンプルな非同期実行
from tasks import add

# 非同期でタスクを即時キューに投入
result = add.delay(2, 3)

# 結果を取得
print(result.get(timeout=10))  # => 5
  • 使いやすい糖衣構文(syntax sugar)
  • .apply_async(args=(2, 3)) と同じ動作
  • キューに即時投入され、バックグラウンドで処理される
  • .s():ワークフロー(Chain / Group / Chord)向けの「タスク署名」
from celery import chain

# 署名オブジェクト(signature)を作成
add_sig = add.s(4, 5) 

# ワークフロー構築に使える(例:Chain)
flow = chain(add.s(1, 2), add.s(10))  # (1 + 2) の結果を add(…, 10) に渡す
result = flow.apply_async()
print(result.get(timeout=10))  # => 13
  • 引数を固定したタスクの「テンプレート」を作るイメージ
  • Chain / Group / Chord などのワークフローで使用される

2.3 障害耐性:自動リトライ

from celery.exceptions import MaxRetriesExceededError

@app.task(bind=True, max_retries=3, default_retry_delay=5)
def fetch_url(self, url):
    try:
        return requests.get(url, timeout=3).json()
    except requests.RequestException as exc:
        try:
            self.retry(exc=exc)
        except MaxRetriesExceededError:
            self.get_logger().error(f"fetch_url failed: {url}")
            return None
  • bind=Trueself.retry(), self.request.id, self.get_logger() が使用可能
  • max_retriesdefault_retry_delay:再試行回数・間隔
  • 上限到達時に MaxRetriesExceededError を捕捉し、フォールバック処理が可能

3. 複雑ワークフロー:Chain・Group・Chord

from celery import chain, group, chord

# Chain:直列依存
chain(fetch.s(), process.s(), save.s()).apply_async()

# Group:並列タスク
res = group(add.s(1,2), add.s(3,4)).apply_async()
print(res.get())  # [3,7]

# Chord:並列後に集計
res = chord(group(add.s(1,2), add.s(3,4)), sum_results.s()).apply_async()
print(res.get())  # 10
  • Chain:依存関係のある連続処理
  • Group:非依存処理の並列化
  • Chord:並列処理完了後にまとめ処理

3.1 よくある落とし穴とその回避法

私自身も最初の実装で、タスクを .delay() で連続呼び出ししていたために、 task1(DBへの前提データ作成)が完了する前に task2(そのデータを参照する処理)が先に走ってしまい、IntegrityError が頻発しました。

当初は「Celery の順番が信用できない?」と戸惑ったのですが、これは当然の仕様で、 非同期実行は基本的に順序保証されないためです。

この問題は .s() で signature を明示し、group() で括ってから chain() に渡すことで解決しました。

# NG例:順序がバラバラ
task1.delay()
task2.delay()

# OK例:Group + Chainで順序を明示
chain(
    group(task1.s(), task2.s()),  # これで順序を管理
    finalize.s()
).apply_async()

4. 定期ジョブ:Celery Beat

from celery.schedules import crontab

app.conf.beat_schedule = {
    'cleanup-hourly': {
        'task': 'tasks.cleanup_temp',
        'schedule': crontab(minute=0),
    },
    'stats-every-30s': {
        'task': 'tasks.emit_stats',
        'schedule': 30.0,
    },
}
app.conf.timezone = 'Asia/Tokyo'

HA対策

  • RedBeat:Redisベースの分散ロック付きスケジューラ
  • django-celery-beat:Django管理画面でスケジュールを動的管理

5. 可観測性とアラート

5.1 JSON ログ

import logging.config
from logging_config import LOG_CONFIG

logging.config.dictConfig(LOG_CONFIG)
  • JSON一行ログを Fluentd/Elasticsearch/Kibana に流し込める

5.2 メトリクス

  • celery-prometheus-exporter(純Python版)
celery-prometheus-exporter --broker=${CELERY_BROKER_URL} --port=9119
  • Prometheusscrape_configs に追加
  • Grafana:公式テンプレート「Celery Overview」を活用

6. ローカルモック&スケーリング(Docker Compose)

version: '3.8'
services:
  redis: { image: redis:7-alpine }
  worker:
    build: .; command: celery -A tasks worker --events
    deploy: { replicas: 4 }
  beat:   { build: .; command: celery -A tasks beat }
  flower: { build: .; command: celery -A tasks flower --port=5555 }
  exporter:
    image: python:3.12-slim
    command: celery-prometheus-exporter --broker=${CELERY_BROKER_URL} --port=9119
  • ローカルで docker-compose up --scale worker=4 が手軽なスケール体験に最適

7. ベストプラクティスまとめ

  1. Broker/Backend 分離:同じ Redis 併用は避け、I/O 衝突を防止
  2. リトライ戦略宣言max_retriesdefault_retry_delay で障害耐性を確保
  3. プリフェッチ制御prefetch-multiplier=1 でワーカー間の負荷を均等化
  4. 時間制限soft_time_limit で優雅にクリーンアップ、time_limit で強制終了
  5. メモリ管理max_tasks_per_child を設定し、プロセス肥大化を回避
  6. 可観測性:JSONログ+Prometheus+Grafana+アラートで運用負荷を激減
  7. HA BeatRedBeatdjango-celery-beat で定期ジョブ重複防止

おわりに

Celery は設定項目が多岐に渡る分、パワフルかつ柔軟ですが、同時に “設定迷子” になりやすいミドルウェアです。本稿で示したアーキテクチャ理解、タスク設計パターン、運用・可観測性・スケーリング手法を土台に、自社サービスに最適化してみてください。 疑問や運用ノウハウのアップデートがあれば、ぜひコメントや社内勉強会で取り上げてみてください!

We Are Hiring!

ABEJAは、テクノロジーの社会実装に取り組んでいます。 技術はもちろん、技術をどのようにして社会やビジネスに組み込んでいくかを考えるのが好きな方は、下記採用ページからエントリーください! (新卒の方やインターンシップのエントリーもお待ちしております!)

careers.abejainc.com