ABEJA Tech Blog

中の人の興味のある情報を発信していきます

慣れてきたらチャレンジしてみよう!BigQueryのパフォーマンス最適化

自己紹介

こんにちは、真壁(@Takayoshi_ma)です。先日読んだGoogle Cloudの公式ドキュメントが個人的に勉強になったので内容についてまとめていくと共に、それを補助する形で色々と自分なりに追記していこうと思います。なお、公式ドキュメントで紹介されている幾つかの基礎的な部分については割愛させていただきました。

cloud.google.com

I/Oや通信的観点

cloud.google.com

DBのチューニングでいつも問題になってくるのがI/Oだと思います。ディスクへのアクセスが増えることで一気に作業効率が下がります。

分割されたクエリを取り除く

ここではベストプラクティスとしてパーティショニングのことに触れています。パーティショニングとは、データベースにおけるテーブル内のデータを分割して保持する機能であり、うまく利用することで不必要なデータを初めから除外し、パフォーマンスを向上させることができます。 BigQueryではテーブル作成時に指定しておけば、自動で _PARTITIONTIME という疑似列を追加し、ロード日単位でテーブルを分割してくれる機能も存在します。

-- 例
WHERE _PARTITIONTIME
BETWEEN TIMESTAMP("2016-01-01")
    AND TIMESTAMP("2016-01-31")

過剰なワイルドカード テーブルを避ける

これもよく使われると思います。個人的にこんな風に日付情報を語尾につけることで管理しているパターンをよく見ます。後述ですが、このようにして日付単位で細かくテーブルを分割することをシャーディングと言います。

`{project}.{dataset}.hoge_fuga_yyyymmdd`

このように管理することで複数のテーブル(もちろんスキーマに互換性があることが条件!)に対してクエリを投げることが可能となります。公式ではこのサフィックスを無秩序に命名してしまったり、意識せずに大量のテーブルにクエリを投げることで、予期せぬ挙動になってしまうことを危惧しています。

(下記文は公式からの引用)

たとえば `FROM bigquery-public-data.noaa_gsod.gsod194*` は、
ワイルドカードに一致するテーブルが少ないため、
`FROM bigquery-public-data.noaa_gsod.*` よりもパフォーマンスが良好になります。

テーブルを日付別にシャーディングすることを避ける & テーブルの過度な分割を回避する

先ほどチラッと出てきましたが、サフィックスに日付を入れてテーブルを分割したものをシャーディングテーブル(日付別テーブル)といいます。個人的にこれには利点もあり、例えば

  • 一部の日付テーブルの削除、入れ替えが用意
  • 特定の日付に対してのみ処理を行えるので高速

などが挙げられると思います。しかし公式では前述の_PARTITIONTIMEで代用できるところはそれでやった方がよりベストプラクティスとの見解を示しています。シャーディングされたテーブルでは、BigQueryで各分割のスキーマ、メタデータ、および権限を保持する必要が出てきてしまうのですが、この分割があまりに多すぎると、各分割の情報を保持する必要からオーバーヘッドの増加によりパフォーマンス低下につながる可能性があります。

JOIN を使用する前にデータを削減する

公式ドキュメントで書いていることをまとめると、GROUP BYJOINの両方を行う場合、その順番が問われないのであれば、先にGROUP BYをしてレコード数を減らした上で、後半にJOINをした方が良いよってことでした。 まあ、それ自体はそうだろうなといった感じの人も多いと思うので、一応ここでは改めてJOINについて深掘りしていきます。必要ないよって方は飛ばしてください。

RDBMSの学習をしていると、

結合には以下の3つが存在しており、明示的に選択せずともオプティマイザの方でより最適化なアルゴリズムを選択している。

みたいなことが始めの方に出てくるかと思います(一部のRDSMBでは明示的にアルゴリズムを選択することも可能)。

頭の整理のために一応メモしておくと

  • Nested Loop
    • 手順
      • 駆動表にある全行($M$行)に対して、内部表($N$行)から結合キー列の値がマッチするものを探して結合する。よって普通に処理すると $O(MN)$
    • 特徴
      • 小さいテーブルを駆動表にする、かつ内部表にある結合キーに対してインデックスを付与することで容易に検索できるようになるため、処理が高速化される
  • Hash
    • 手順
      • 結合する2つのテーブルの結合キーからハッシュテーブルを作成する
      • 同じハッシュ値同士を結合する。ハッシュ化したことにより探索が高速
    • 特徴
      • 大きなテーブル同士の結合に対して有利になりやすい
      • 特にインデックスを付与せずとも高速に働く
      • 一度にハッシュテーブルを作成する必要があり、メモリを多く消費する。メモリが溢れた場合、TEMP落ちが発生、余計なI/Oが発生することで処理が遅くなる
      • ハッシュ値で比較することになるので等価結合($=$)でしか使えない、要は $<$ や $>$ などが使えない
  • Sort merge
    • 手順
      • 結合する2つのテーブルの結合キーでソートする
      • 大小が固定された状態で上から順に2つのテーブルを探索していく
    • 特徴
      • 大きなテーブル同士の結合に対して有利になりやすい
      • ソートの処理自体が重く、多くのメモリを消費する
      • 始めからソートされている状態でないと、あまり効率的でない

上記はRDBMSの復習でしたが、これを頭に入れていた方がBigQueryで使用される結合アルゴリズムの理解も進みます。BigQueryのオプティマイザでは、最適なクエリプランになるよう内部でどういったアルゴリズムを選択するのかコントロールしています。

cloud.google.com

上のページで以下の2つの結合の紹介がされています。分かり易くまとめると以下のような手順

  • ブロードキャスト結合
    • 小さいテーブルのデータが、BigQueryのワーカーノードにコピーされます。これにより、すべてのワーカーノードが小さいテーブルの全データを持つことになります。
    • 各ワーカーノードは、大きいテーブルのデータを一部分ずつ処理します。このとき、各ワーカーノードは自分が持っている小さいテーブルのデータを使って結合を実行します。
    • 最後に、各ワーカーノードの結果が集約されて最終結果が得られます。

ブロードキャスト結合の利点は、大きいテーブルのデータを複数のワーカーノードに分散して処理できるため、結合処理が高速に行われることです。また、小さいテーブルのデータがすべてのワーカーノードにコピーされるため、ネットワークを介したデータのやり取りが少なくなります。ただし、ブロードキャスト結合は、小さいテーブルが非常に大きくなると効率が悪くなります。なぜなら、すべてのワーカーノードに小さいテーブルのデータをコピーするため、メモリ使用量が増加し、ネットワーク帯域幅も圧迫されるからです。

  • ハッシュ結合
    • まず、結合キー(結合条件に使用される列)に基づいて、両方のテーブルのデータをハッシュ関数を使ってパーティションに分割します。これにより、同じハッシュ値を持つ行は同じパーティションに格納されます。
    • 各パーティションは、BigQueryのワーカーノードに割り当てられます。ワーカーノードは、自分の担当するパーティションのデータをメモリ上にハッシュテーブルとして構築します。
    • 次に、各ワーカーノードは、もう一方のテーブルのデータを読み取り、ハッシュテーブルを使って結合キーが一致する行を探し、結合を行います。
    • 最後に、各ワーカーノードの結果が集約されて最終結果が得られます。

ハッシュ結合の利点は、結合キーに基づいてデータをパーティションに分割するため、結合処理が高速に行われることです。また、ハッシュテーブルを使うことで、結合キーが一致する行を効率的に探すことができます。ただし、ハッシュ結合は、両方のテーブルのデータがワーカーノードのメモリに収まらない場合には効率が悪くなります。

また、ハッシュ結合で行われるワーカーノード間で効率的に分散処理するための手法をシャッフルと言いますが、例えばこれは大きなテーブルをGROUP BY(集約)する際にも使用されます。 シャッフルは、大規模なデータ処理タスクを効率的に実行するために重要な役割を果たしています。ただし、シャッフルにはネットワークのオーバーヘッドが伴うため、できるだけ最小限に抑えることが望ましいです。脳死で大きなテーブルの結合や集計を行う前に、クエリの最適化や、適切なクラスタリングとパーティショニングを使用することで、シャッフルのオーバーヘッドを削減できないだろうか検討することが非常に重要になってきます。

コンピューティング的観点

cloud.google.com

結合パターンを最適化する

先ほども出てきましたが、BigQueryのオプティマイザではその都度最適な結合アルゴリズムを選択していますが、それを最大限生かすためのTipsが紹介されていました。具体的にはデータを結合する順序で、大きいテーブルから始めていくと良い可能性があるとのことです。 大きなテーブルから順に結合していくことで以下のようなメリットがあります。

  • ブロードキャスト結合の効率化:

    • ブロードキャスト結合は、片方のテーブルが小さい場合に効率的な結合手法です。大きなテーブルから順に結合していくことで、結合対象のテーブルが徐々に小さくなるため、ブロードキャスト結合が効果的に利用できます。小さいテーブルをすべてのワーカーノードにコピーすることで、各ワーカーは大きなテーブルのデータの一部を処理しながら結合を行います。これにより、結合処理が高速化されます。
  • データのフィルタリングとシャッフルの削減:

    • 大きなテーブルから順に結合していくことで、結合処理によってデータがフィルタリングされ、次の結合処理で扱うデータ量が徐々に減少します。これにより、ワーカーノード間でのデータのシャッフル(再分配)が削減され、ネットワークのオーバーヘッドが低減されます。シャッフルが減少すると、クエリの実行時間も短縮され、パフォーマンスが向上します。

結合で INT64 データ型を使用して費用を削減し、比較パフォーマンスを向上させる

BigQueryでは主キーがインデックス化されていないため、文字列の検索に時間がかかります。特に長い文字列だとその分手間がかかります。そこで結合に使用するキーをなるべくINT64にすることで、より低コストで高いパフォーマンスを発揮することが可能です。

同じ共通テーブル式(CTE)を複数回評価するのを避ける

いわゆるWITH句の話です。WITH句(共通テーブル式、CTE)を使用し、その結果が複数回参照される場合、その評価が一度だけ行われるのか、それとも複数回行われるのかは、BigQueryのオプティマイザに依存しますが、基本的にBigQueryではCTEは各参照毎に行われることが多いようです。(おそらくメモリの関係?)なので、場合によりけりですが、一時テーブルとして時限的なテーブルを実体化しておくことも検討した方が良い場合もあると公式では述べています。

クエリのアンチパターン

cloud.google.com

自己結合

安易に結合に走るのでなく、window関数を使うことを検討した方が良いと公式では述べています。SQLっぽい。 言葉だけだと説明しづらいので具体例を挙げていきます。

create or replace table `myproject.mydataset.player`
(
  id int64
  , name string
  , height int64
);

insert into `myproject.mydataset.player`
values
  (1, 'ryota', 168),
  (2, 'hisashi', 184),
  (3, 'kaede', 187),
  (4, 'hanamichi', 189),
  (5, 'takenori', 197)
;

このテーブルから身長のランキングを取得してみます。これくらい簡単な例だと自己結合よりもウィンドウ関数を使うやり方の方が真っ先に思いつくかと思いますが、、、

-- 自己結合ver
select
  p1.id
  , p1.name
  , p1.height
  , count(p2.height) as rank
from
  `myproject.mydataset.player` p1
left join
  `myproject.mydataset.player` p2
  on p1.height <= p2.height
group by
  p1.id, p1.name, p1.height
order by
  p1.height desc;
-- window関数 ver
select
  id
  , name
  , height
  , rank() over (order by height desc) as rank
from
  `myproject.mydataset.player`
order by
  height desc

自己結合時のクエリプランを確認すると一目瞭然ですが、結合時に行数がどうしても多くなってしまう一方、window関数ではそれが抑え切れています。尚、時間に関してはサンプルのテーブルが小さいためいまいち分かりづらいですが、大きなテーブルだと差が出てくる傾向にあります。

自己結合 分析関数

もう一つ、より実践的な例を取り上げます。下記テーブルはDAU(Daily Active Users)に関する表ですが、ここに前日にアクセスがあったかどうかのフラグ列を追加していきます。

create or replace table `myproject.mydataset.dau`
(
  access_date datetime
  , user_id int64
);

insert into `myproject.mydataset.dau`
values
  ('2023-05-03', 3928),
  ('2023-05-03', 308),
  ('2023-05-03', 732),
  ('2023-05-03', 9811),
  ('2023-05-03', 1782),
  ('2023-05-03', 3795),
  ('2023-05-02', 9811),
  ('2023-05-02', 732),
  ('2023-05-02', 3795),
  ('2023-05-01', 8830),
  ('2023-05-01', 9811),
  ('2023-05-01', 308),
  ('2023-05-01', 7408),
  ('2023-05-01', 2200)
;

/*
DAUテーブル
PK: access_date, user_id
*/
select *
from `myproject.mydataset.dau`
order by access_date desc
;

まず始めに自己結合を使ったやり方です。自己結合を行う際に、同じuser_idかつ+1シフトしたaccess_dateと紐づけることで、前日にアクセスがあったかどうかのフラグを立てます。 直感的にも理解しやすいためSQLに慣れてない人だと真っ先に思いつく方法な気がします。

select
  d1.user_id
  , d1.access_date
  , d2.user_id is not null as is_access_yesterday
from
  `myproject.mydataset.dau` d1
  left join `myproject.mydataset.dau` d2
    on (
      d1.user_id = d2.user_id
      and d1.access_date = date_add(d2.access_date, interval 1 day)
    )
order by
  user_id, d1.access_date desc, is_access_yesterday

これをwindow関数でやろうとするとこうなります。user_idでカットしたうえで、ウィンドウ関数LAGを適用し、一つ前のaccess_dateを取得します。取得されたaccess_dateが自身のaccess_dateの前日であればフラグが立ちます。

select
  user_id
  , access_date
  , case
      when lag(access_date) over (partition by user_id order by access_date) = date_sub(access_date, interval 1 day) then true
      else false
    end as is_access_yesterday
from
  `myproject.mydataset.dau`
order by
  user_id, access_date desc

一般的な手続型の言語と違い、宣言型の一つであるSQLは、文よりも式中心で書いていったほうがスマートになる傾向がありますが、上の例はまさにCASE式や関数を用いて書いてるあたり非常にSQLチックだなと思います。

データスキュー

例えば、user_id単位でカットを行う際やJOIN句での結合を行う際、一部の値だけサイズが大きいとそこがボトルネックとなってしまうと公式は述べています。具体的にはuser_idでカウントをとった結果が以下のような場合

select
  user_id
  , count(user_id) as cnt
from `myproject.mydataset.sample`
group by user_id
order by user_id

圧倒的にnullに偏っていることが分かります。GitQueryではスロット間でパーティションを共有することができません。そのため一部のパーティションのサイズが大きくなってしまい処理に時間がかかってしまいます。またリソースがあまりに大きいとresources exceedエラーが発生したり、スロットのシャッフル制限(メモリ圧縮の2TB)に達した場合もディスクへの書き込みI/Oが発生してしまったりするなど更にパフォーマンスが低下する可能性があります。そこでこういった場合、可能であればできるだけ早い段階でデータをフィルタ処理することが最適であると述べています。

公式にはない項目ですが個人的に気になったので

上の章のページを見ていた際に、公式で以下の例が出ていました。前者よりも後者の方が効率的だよねって話。まあこれ自体は改めての解説も必要ないかと思います。

-- 前者
table1.my_id NOT IN (
  SELECT my_id
  FROM table2
  );

-- 後者
table1.my_id NOT IN (
  SELECT DISTINCT my_id
  FROM table2
  )

そこで、上のクエリを眺めてて思ったのですが、存在判定でよく使われる手法に、EXISTSを使った相関サブクエリ、NOT INを使った判定、あとは結合などの手法があるかと思います。簡単な例でイメージだけ書くと(それぞれの手法がどのようなステップを踏んでいるかの詳細については、このブログでは割愛させていただきます。)

-- 外部結合パターンで存在しないキーを判定パターン
select id
from a
left join b
  on a.id = b.id
where
  b.id is null

-- 相関サブクエリで1行1行存在判定パターン
select id
from a
where not exists (
  select 1 from b
  where a.id = b.id
)

-- 個人的にこれはあまり良くないイメージあったんだけどそうでもないらしい?
select id from a
except
select id from b

-- NOT INを使う方法、INとは違い全てのidを順にアクセスしていく必要があり、一般的に遅いと言われている
select id
from a
where id not in (
  select id from b
)

BigQueryだとこの辺ってどうなってるんだっけか?と気になったので気になってググったところ、この記事を見つけました。

zenn.dev

結果だけまとめると

  • 差分を求めたい場合

    • NOT INのパフォーマンスが悪かった
    • LEFT JOINNOT EXISTSEXCEPTは同程度
  • 積集合を求めたい場合

    • INNER JOIN EXISTS INTERSECT IN全てステージ数、シャッフルされたバイト数一緒になって

という実験結果だったようです。NOT INを使うときは注意が必要かもしれません。

単一行を更新または挿入する DML ステートメント

表題の処理については避けるよう公式では述べています。UPDATEINSERTに関してはバッチ処理が基本です。 そもそもBigQueryが一般的なRDBMSと違い、行指向ではなく列指向であることからも、更新や挿入、あるいは1行単位でのSELECTがリアルタイムで頻繁に発生するものはRDBMS、集計や分析で使用する場合はBigQueryなどの使い分けを意識することが大切かと思います。

非正規化の検討

公式ドキュメントの前に、簡単ではありますが、一応正規化について確認しておきます。多くの人が第1正規化〜第3正規化まで学習した経験があるかと思います。

正規化の種類 説明
第1正規化(1NF) 非正規化テーブルから繰り返し項目を排除する
第2正規化(2NF) 1NFの条件を満たす上で、部分関数従属のない形にテーブルを分割する
第3正規化(3NF) 2NFの条件を満たす上で、推移的関数従属のない形にテーブルを分割する

その中でも最も基本と言えるのが非正規形から第1正規系への変換です。では繰り返し項目とは一体どのようなものでしょうか?ここでは以下の2つの例を挙げてみました。表現が違うだけで実質どちらも同じです。

  • 横に広がってるパターン
    • 例えばitem_name_4などが出てくる可能性もあり対応できない
order_id item_name_1 item_name_2 item_name_3 ...
001 りんご みかん もも ...
002 オレンジ いちご null ...
  • 縦に広がってるパターン
order_id item_name
001 りんご
みかん
もも
002 オレンジ
いちご

非正規化テーブルは更新処理で問題になるパターンが多いです。しかしBigQueryなどのDWHはRDBMSと違い更新よりも集計などで使用されることが多く、その場合非正規化テーブルを使用することでリソースの有効活用やコンピューティングの最適化につながる可能性があります。

ネストされて繰り返されているフィールドを使用する

cloud.google.com

ここからは公式のクエリを使って例を見ていきます。まず以下の2パターンでテーブルを作成します。クエリのコメントなどは自分でつけているので参考程度に見てくれると幸いです。

/*
正規化テーブル
PK: user_id, post_id
*/
create or replace table `myproject.mydataset.stackoverflow` as (
  select
    user_id
    , post_id
    , creation_date
  from
    `bigquery-public-data.stackoverflow.comments`
);

/*
非正規化テーブル
PK: user_id
*/
create or replace table `myproject.mydataset.stackoverflow_nested` as (
  select
    user_id
    , array_agg(struct(post_id, creation_date) order by creation_date asc) as comments
  from
    `bigquery-public-data.stackoverflow.comments`
  group by
    user_id
);

これら2つのテーブルに対して以下のクエリを投げます。やりたいことは各ユーザーのファーストコメントの取得です。

-- 正規化テーブルに対する集計処理
select
  user_id,
  array_agg(struct(post_id, creation_date as earliest_comment) order by creation_date asc limit 1)[offset(0)].*
from
  `myproject.mydataset.stackoverflow`
group by user_id
order by user_id asc;

-- 非正規化テーブルに対する集計処理
select
  user_id,
  (select as struct post_id, creation_date as earliest_comment from unnest(comments) order by creation_date asc limit 1).*
from
  `myproject.mydataset.stackoverflow_nested`
order by user_id asc;

当たり前といえば当たり前ですが、初めから集計されている非正規化テーブルの方がパフォーマンスの面で上にきます。 あと、これは余談なのですが、同じことをやれと言われた場合、自分は真っ先にwindow関数とpartitionを組み合わせる方法を思いつくのですが、そうではなくSTRUCTを使ってやってるとこが勉強になりました。

採用情報

ABEJAでは共に働く仲間を募集しています。気になった方は是非ご連絡ください。

careers.abejainc.com