ABEJA Tech Blog

Sharing information that our team members find interesting

Building an ELT (EL) pipeline: GA → Airbyte → Snowflake (Snowpark for Python)

This article is the Day 20 entry of the ABEJA Advent Calendar and the Day 23 entry of the Snowflake Advent Calendar.

Introduction

Hello, this is @Takayoshi_ma. Today I extract and transform data along the flow Google Analytics (UA) → Airbyte → Snowflake (Snowpark for Python).


A rough introduction to each technology

Snowflake

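Snowflake is a SaaS data warehouse (DWH) provided by Snowflake Inc. (ticker symbol: SNOW), and it is often compared with BigQuery and Redshift. Snowflake Inc. is also known for the large investment made in 2020 by Berkshire Hathaway, the company led by legendary investor Warren Buffett. Snowflake runs on multiple clouds (AWS, GCP, Azure) and boasts high performance and cost-effectiveness. When AWS is the main platform in particular, I think it is increasingly chosen as an alternative to Redshift. The billing models also differ substantially: Redshift charges for the time the cluster is running, whereas Snowflake charges for the processing actually performed, so this often seems to be one of the deciding factors.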

Snowpark

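Snowpark is a Snowflake feature released at the end of 2021, and it currently supports languages such as Scala, Java, and Python. In terms of supported languages it resembles Apache Spark. BigQuery, which is often compared with Snowflake as a fellow DWH, also has UDFs, but the two support different languages, so perhaps they have each carved out their own niche. Snowpark's API is also fairly similar to Apache Spark's. As a result, even people who are not used to Snowpark for Python can often process data by writing code much as they would in PySpark. Keeping this in mind comes in handy when searching the web for answers.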

Airbyte

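Airbyte is a data integration platform released as OSS by Airbyte, a startup founded in San Francisco in 2020 that reached Series A in just one year. I started getting chances to work with it this year, so I wrote this article partly as an introduction. Digdag + Embulk is often cited as the go-to stack for data transfer, but my impression is that Airbyte usage is gradually increasing.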


Environment setup

First, a brief walkthrough of the environment setup.

Airbyte

Following the official guide, Deploy Airbyte - Airbyte Documentation, I start the set of Docker containers. Very easy! For now, I run it on my local machine.

#!/bin/bash

git clone https://github.com/airbytehq/airbyte.git
cd airbyte
docker compose up   # the official docs use "docker-compose up", but I am on Compose V2, so I use this form
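The containers may take a while to come up on the first run, so a small readiness check can be handy before opening the browser. The following is a minimal sketch, assuming the UI is served at http://localhost:8000 as in this walkthrough; `http_probe` and `wait_until_up` are hypothetical helper names of my own, not part of Airbyte.

```python
import time
import urllib.error
import urllib.request
from typing import Callable


def http_probe(url: str, timeout: float = 3.0) -> bool:
    """Return True if anything answers HTTP at `url`, even with an error status."""
    try:
        with urllib.request.urlopen(url, timeout=timeout):
            return True
    except urllib.error.HTTPError:
        # The server answered (for example 401 when auth is enabled), so it is up.
        return True
    except (urllib.error.URLError, OSError):
        # Connection refused, timeout, DNS failure, etc.
        return False


def wait_until_up(probe: Callable[[], bool], attempts: int = 30, delay: float = 5.0) -> bool:
    """Call `probe` up to `attempts` times, sleeping `delay` seconds between tries."""
    for i in range(attempts):
        if probe():
            return True
        if i < attempts - 1:
            time.sleep(delay)
    return False


# Example usage (assumed URL; uncomment to poll the local Airbyte UI):
# wait_until_up(lambda: http_probe("http://localhost:8000"))
```

Passing the probe in as a callable keeps the retry loop independent of the network, which makes it easy to try out with a fake probe.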

Once it is up, open localhost:8000, enter your email address, and continue.

You should now be on Airbyte's initial screen.

Click the "Set up your first connection" button here and a source selection screen appears; for now, select Google Analytics.

Airbyte ships with a very wide range of connectors by default. See the official page below.

Connector Catalog - Airbyte Documentation

For reference, the list as of September 2022 is reproduced below.

Sources

Connector | Product Release Stage | Available in Cloud?
3PL Central | Alpha | No
Airtable | Alpha | Yes
AlloyDb | Alpha | Yes
Amazon Ads | Beta | Yes
Amazon Seller Partner | Alpha | Yes
Amazon SQS | Alpha | Yes
Amplitude | Generally Available | Yes
Apify Dataset | Alpha | Yes
Appstore | Alpha | No
Asana | Alpha | No
AWS CloudTrail | Alpha | Yes
Azure Table Storage | Alpha | Yes
BambooHR | Alpha | No
Baton | Alpha | No
BigCommerce | Alpha | Yes
BigQuery | Alpha | Yes
Bing Ads | Generally Available | Yes
Braintree | Alpha | Yes
Cart.com | Alpha | No
Chargebee | Beta | Yes
Chargify | Alpha | No
Chartmogul | Alpha | Yes
ClickHouse | Alpha | Yes
Close.com | Alpha | Yes
CockroachDB | Alpha | No
Commercetools | Alpha | No
Confluence | Alpha | No
Customer.io | Alpha | No
Db2 | Alpha | No
Delighted | Alpha | Yes
Dixa | Alpha | Yes
Dockerhub | Alpha | Yes
Drift | Alpha | No
Drupal | Alpha | No
Elasticsearch | Alpha | No
End-to-End Testing | Alpha | Yes
Exchange Rates API | Alpha | Yes
Facebook Marketing | Generally Available | Yes
Facebook Pages | Alpha | No
Faker | Alpha | Yes
File | Beta | Yes
Firebolt | Alpha | Yes
Flexport | Alpha | No
Freshdesk | Beta | Yes
Freshsales | Alpha | No
Freshservice | Alpha | No
GitHub | Generally Available | Yes
GitLab | Alpha | Yes
Glassfrog | Alpha | No
Google Ads | Generally Available | Yes
Google Analytics (v4) | Alpha | No
Google Analytics (Universal Analytics) | Generally Available | Yes
Google Directory | Alpha | Yes
Google Search Console | Beta | Yes
Google Sheets | Generally Available | Yes
Google Workspace Admin Reports | Alpha | Yes
Greenhouse | Alpha | Yes
Harness | Alpha | No
Harvest | Beta | No
http-request | Alpha | No
HubSpot | Generally Available | Yes
Instagram | Generally Available | Yes
Intercom | Generally Available | Yes
Iterable | Alpha | Yes
Jenkins | Alpha | No
Jira | Alpha | No
Kafka | Alpha | No
Klaviyo | Beta | Yes
Kustomer | Alpha | Yes
Kyriba | Alpha | No
Lemlist | Alpha | Yes
Lever | Alpha | No
LinkedIn Ads | Generally Available | Yes
LinkedIn Pages | Alpha | No
Linnworks | Alpha | Yes
Looker | Alpha | Yes
Magento | Alpha | No
Mailchimp | Generally Available | Yes
Marketo | Beta | Yes
Metabase | Alpha | Yes
Microsoft Dynamics AX | Alpha | No
Microsoft Dynamics Customer Engagement | Alpha | No
Microsoft Dynamics GP | Alpha | No
Microsoft Dynamics NAV | Alpha | No
Microsoft SQL Server (MSSQL) | Alpha | Yes
Microsoft Teams | Alpha | Yes
Mixpanel | Beta | Yes
Monday | Alpha | Yes
Mongo DB | Alpha | Yes
My Hours | Alpha | Yes
MySQL | Alpha | Yes
Notion | Beta | No
Okta | Alpha | Yes
OneSignal | Alpha | No
OpenWeather | Alpha | No
Oracle DB | Alpha | Yes
Oracle PeopleSoft | Alpha | No
Oracle Siebel CRM | Alpha | No
Orb | Alpha | Yes
Orbit | Alpha | Yes
Outreach | Alpha | No
PagerDuty | Alpha | No
PayPal Transaction | Beta | Yes
Paystack | Alpha | No
PersistIq | Alpha | Yes
Pinterest | Alpha | No
Pipedrive | Alpha | No
Pivotal Tracker | Alpha | No
Plaid | Alpha | No
PokéAPI | Alpha | Yes
Postgres | Generally Available | Yes
PostHog | Alpha | Yes
PrestaShop | Alpha | Yes
Qualaroo | Alpha | Yes
QuickBooks | Alpha | No
Recharge | Beta | Yes
Recurly | Alpha | Yes
Redshift | Alpha | Yes
Retently | Alpha | Yes
S3 | Generally Available | Yes
Salesforce | Generally Available | Yes
Salesloft | Alpha | No
SAP Business One | Alpha | No
SearchMetrics | Alpha | No
Sendgrid | Alpha | Yes
Sentry | Alpha | Yes
SFTP | Alpha | Yes
Shopify | Alpha | No
Short.io | Alpha | Yes
Slack | Beta | Yes
Smartsheets | Beta | Yes
Snapchat Marketing | Beta | Yes
Snowflake | Alpha | Yes
Spree Commerce | Alpha | No
Square | Alpha | Yes
Strava | Alpha | No
Stripe | Generally Available | Yes
Sugar CRM | Alpha | No
SurveyMonkey | Beta | Yes
Tempo | Alpha | Yes
TiDB | Alpha | No
TikTok Marketing | Generally Available | Yes
Trello | Alpha | No
Twilio | Beta | Yes
Typeform | Alpha | Yes
US Census | Alpha | Yes
VictorOps | Alpha | No
Webflow | Alpha | Yes
WooCommerce | Alpha | No
Wordpress | Alpha | No
YouTube Analytics | Beta | Yes
Zencart | Alpha | No
Zendesk Chat | Beta | Yes
Zendesk Sunshine | Alpha | Yes
Zendesk Support | Generally Available | Yes
Zendesk Talk | Alpha | Yes
Zenloop | Alpha | Yes
Zoho CRM | Alpha | No
Zoom | Alpha | No
Zuora | Alpha | Yes

Destinations

Connector | Product Release Stage | Available in Cloud?
Amazon SQS | Alpha | Yes
Amazon Datalake | Alpha | No
AzureBlobStorage | Alpha | Yes
BigQuery | Generally Available | Yes
Cassandra | Alpha | Yes
Chargify (Keen) | Alpha | Yes
ClickHouse | Alpha | Yes
Databricks | Alpha | Yes
DynamoDB | Alpha | Yes
Elasticsearch | Alpha | Yes
End-to-End Testing | Alpha | Yes
Firebolt | Alpha | Yes
Google Cloud Storage (GCS) | Beta | Yes
Google Pubsub | Alpha | Yes
Google Sheets | Alpha | Yes
Kafka | Alpha | No
Keen | Alpha | No
Kinesis | Alpha | No
Local CSV | Alpha | No
Local JSON | Alpha | No
MariaDB ColumnStore | Alpha | Yes
MeiliSearch | Alpha | Yes
MongoDB | Alpha | Yes
MQTT | Alpha | Yes
MS SQL Server | Alpha | Yes
MySQL | Alpha | Yes
Oracle | Alpha | Yes
Postgres | Alpha | Yes
Pulsar | Alpha | Yes
RabbitMQ | Alpha | Yes
Redis | Alpha | Yes
Redshift | Beta | Yes
Rockset | Alpha | Yes
S3 | Generally Available | Yes
Scylla | Alpha | Yes
SFTP JSON | Alpha | Yes
Snowflake | Generally Available | Yes
SQLite | Alpha | No
Streamr | Alpha | No
TiDB | Alpha | No

Google Analytics

This time I use UA. I followed the official documentation below. (I assume Google Analytics itself is already set up, so I skip that part.)

Google Analytics (Universal Analytics) - Airbyte Documentation

After logging in with the same Google account used for Google Analytics, go to the GCP console. In either an existing or a new project, create a service account; I left its permissions at the defaults.

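Next, create a private key (JSON) for that service account; the key is downloaded automatically, so store it safely. Then, from the Google Analytics home screen, click the gear icon at the bottom left and click "Account Access Management".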

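Click the + button at the top right, choose to add a new user, and enter the email address of the service account created earlier; the Viewer role is sufficient. Then enable the Analytics Reporting API under APIs & Services in GCP.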

Finally, return to the Airbyte screen and fill in the following fields when adding the Source.

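Key | Value
Credentials | Select Service Account Key Authentication
Service Account JSON Key | The private key created above
Replication Start Date | The date from which to start syncing
View ID | The target view ID
Custom Reports (Optional) | The reports you want
Data request time increment in days (Optional) | Number of days of data fetched per API request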
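As I understand the connector documentation, "Data request time increment in days" sets how many days of data each API request covers, so a date range is fetched in consecutive windows. The sketch below only illustrates that idea and is not Airbyte's actual implementation; `date_windows` and the sample dates are hypothetical.

```python
from datetime import date, timedelta
from typing import Iterator, Tuple


def date_windows(start: date, end: date, window_days: int) -> Iterator[Tuple[date, date]]:
    """Yield inclusive (window_start, window_end) pairs covering start..end."""
    if window_days < 1:
        raise ValueError("window_days must be at least 1")
    current = start
    while current <= end:
        # The last window is clipped so it never runs past `end`.
        window_end = min(current + timedelta(days=window_days - 1), end)
        yield current, window_end
        current = window_end + timedelta(days=1)


# Hypothetical range: ten days fetched four days at a time.
windows = list(date_windows(date(2022, 12, 1), date(2022, 12, 10), window_days=4))
for w_start, w_end in windows:
    print(w_start.isoformat(), "->", w_end.isoformat())
```

A larger window means fewer requests for the same range, at the cost of each request covering more data.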

Google Search Console

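When it comes to SEO, Google Search Console is the other staple alongside Google Analytics. I find the information in Analytics too rich to digest, so I often end up mainly looking at Search Console instead. Ideally I would add Search Console as a Source as well, but because of how the API works it seems to (probably) require a Google Workspace administrator ID. Since this is purely personal use, I skip adding that Source here. For reference, the documentation below explains in detail how to add it.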

Google Search Console - Airbyte Documentation

Snowflake

Earlier I added Google Analytics as an Airbyte Source; next I register Snowflake as a Destination. The concrete steps are explained very clearly in Airbyte's official documentation.

Snowflake - Airbyte Documentation

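First, confirm that the current account has been assigned a role of SECURITYADMIN or higher; you can check this directly in the GUI. In addition, you can open a worksheet and run the queries below to check whether a network policy is set on the account or on a specific user.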

-- Check at the account level
SHOW PARAMETERS LIKE 'network_policy' IN ACCOUNT;

-- Or check for a specific user
SHOW PARAMETERS LIKE 'network_policy' IN USER <username>;

Open a worksheet and run the following queries to create the entities Airbyte needs (warehouse, database, schema, user, role). The queries are quoted as-is from the official documentation.

-- set variables (these need to be uppercase)
set airbyte_role = 'AIRBYTE_ROLE';
set airbyte_username = 'AIRBYTE_USER';
set airbyte_warehouse = 'AIRBYTE_WAREHOUSE';
set airbyte_database = 'AIRBYTE_DATABASE';
set airbyte_schema = 'AIRBYTE_SCHEMA';

-- set user password
set airbyte_password = 'password';

begin;

-- create Airbyte role
use role securityadmin;
create role if not exists identifier($airbyte_role);
grant role identifier($airbyte_role) to role SYSADMIN;

-- create Airbyte user
create user if not exists identifier($airbyte_username)
password = $airbyte_password
default_role = $airbyte_role
default_warehouse = $airbyte_warehouse;

grant role identifier($airbyte_role) to user identifier($airbyte_username);

-- change role to sysadmin for warehouse / database steps
use role sysadmin;

-- create Airbyte warehouse
create warehouse if not exists identifier($airbyte_warehouse)
warehouse_size = xsmall
warehouse_type = standard
auto_suspend = 60
auto_resume = true
initially_suspended = true;

-- create Airbyte database
create database if not exists identifier($airbyte_database);

-- grant Airbyte warehouse access
grant USAGE
on warehouse identifier($airbyte_warehouse)
to role identifier($airbyte_role);

-- grant Airbyte database access
grant OWNERSHIP
on database identifier($airbyte_database)
to role identifier($airbyte_role);

commit;

begin;

USE DATABASE identifier($airbyte_database);

-- create schema for Airbyte data
CREATE SCHEMA IF NOT EXISTS identifier($airbyte_schema);

commit;

begin;

-- grant Airbyte schema access
grant OWNERSHIP
on schema identifier($airbyte_schema)
to role identifier($airbyte_role);

commit;

After running the queries, you can confirm that each entity has been created.

[Screenshots: Role / DB, Schema / User]

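Next, configure where the data is staged for loading. With Snowflake you can choose among Internal Stage, S3 (AWS), GCS (GCP), and Azure Blob Storage (Azure). Using Internal Stage does of course incur charges for that storage, but for small volumes it is not worth worrying about. I use Internal Stage this time, so no extra configuration is needed.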

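Then enter the details in Airbyte. The official documentation lists the required fields, so simply following it is fine. This time I kept things simple and used password authentication. Even for a simple setup built for a blog post, that alone felt risky, so I also applied an IP restriction in Snowflake's security settings. Airbyte asks for a host name; you can obtain it by running the query below.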

select system$whitelist();
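The query returns a JSON array, and the host to enter in Airbyte is the one for the Snowflake deployment itself. The sketch below assumes each entry has `type`, `host`, and `port` keys and that the deployment entry is typed `SNOWFLAKE_DEPLOYMENT`; the sample hosts are placeholders and `deployment_host` is a hypothetical helper, so check the shape against your own query result.

```python
import json
from typing import Optional

# Hypothetical sample of the query result; every host below is a placeholder.
sample_result = json.dumps([
    {"type": "SNOWFLAKE_DEPLOYMENT", "host": "xy12345.ap-northeast-1.aws.snowflakecomputing.com", "port": 443},
    {"type": "STAGE", "host": "example-stage-bucket.s3.amazonaws.com", "port": 443},
    {"type": "OCSP_CACHE", "host": "ocsp.snowflakecomputing.com", "port": 80},
])


def deployment_host(whitelist_json: str) -> Optional[str]:
    """Return the host of the first SNOWFLAKE_DEPLOYMENT entry, or None if absent."""
    for entry in json.loads(whitelist_json):
        if entry.get("type") == "SNOWFLAKE_DEPLOYMENT":
            return entry.get("host")
    return None


print(deployment_host(sample_result))
```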

Connection

With the Source and Destination configured, the next step is to connect them. Click the Connection tab in Airbyte, then select the Source and Destination created above.

After that, configure the sync frequency, which data to sync, and so on.

You can of course configure each item individually, but you can also apply settings in bulk.

You can then confirm that the synced tables have been stored in Snowflake. I omit an explanation of each table that the default sync creates.


Snowpark

As sample data, I use the Google Analytics data from my own professional baseball data site.

Processing in this article

I create a view with the definition below. I could not find information on cross-tabulation in the official Snowpark documentation, so the first half is written in SQL.

"""
session.py

Create a session and return a connection to the Snowflake database
"""
import os

from snowflake.snowpark import Session


def create_session() -> Session:
    params = {
        'account': os.environ['ACCOUNT'],
        'user': os.environ['SNOWFLAKE_USER'],
        'password': os.environ['SNOWFLAKE_PASS'],
        'role': os.environ['SNOWFLAKE_ROLE'],
        'warehouse': os.environ['SNOWFLAKE_WAREHOUSE'],
        'database': os.environ['DB_NAME'],
        'schema': os.environ['SCHEMA_NAME']
    }
    session = Session.builder.configs(params).create()
    return session
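session.py reads its settings with `os.environ[...]`, which raises `KeyError` at the first missing variable. As a small optional sketch (the variable names are copied from session.py; `missing_vars` is a hypothetical helper), all missing settings can be reported at once before the session is created:

```python
import os
from typing import Iterable, List, Mapping

# Variable names copied from session.py.
REQUIRED_VARS = (
    "ACCOUNT",
    "SNOWFLAKE_USER",
    "SNOWFLAKE_PASS",
    "SNOWFLAKE_ROLE",
    "SNOWFLAKE_WAREHOUSE",
    "DB_NAME",
    "SCHEMA_NAME",
)


def missing_vars(environ: Mapping[str, str], required: Iterable[str] = REQUIRED_VARS) -> List[str]:
    """Return the required variable names that are unset or empty."""
    return [name for name in required if not environ.get(name)]


missing = missing_vars(os.environ)
if missing:
    print("Missing environment variables:", ", ".join(missing))
```

Taking the environment as a parameter keeps the check easy to try with a plain dict instead of the real process environment.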
"""
main.py
"""

import os

from dotenv import load_dotenv
from snowflake.snowpark.functions import col
from snowflake.snowpark.dataframe import DataFrame

from session import create_session


load_dotenv()
session = create_session()

# Written in SQL to match the style of the query below (df_device, also written in SQL)
query_overview = f"""
    select
        ga_date
        , ga_users
        , ga_exitrate
        , ga_newusers
        , ga_sessions
        , ga_pageviews
        , ga_avgtimeonpage
        , ga_sessionsperuser
        , ga_avgsessionduration
        , ga_pageviewspersession
    from {os.environ["DB_NAME"]}.{os.environ["SCHEMA_NAME"]}.npb_website_overview
"""

# Written in raw SQL because I could not find how to express a cross-tabulation in Snowpark
query_device = f"""
    select
        ga_date
        , sum(case ga_devicecategory when 'desktop' then ga_users else 0 end) as desktop_user
        , sum(case ga_devicecategory when 'mobile' then ga_users else 0 end) as mobile_user
        , sum(case ga_devicecategory when 'tablet' then ga_users else 0 end) as tablets_user
        , sum(case ga_devicecategory when 'desktop' then ga_pageviews else 0 end) as desktop_pageviews
        , sum(case ga_devicecategory when 'mobile' then ga_pageviews else 0 end) as mobile_pageviews
        , sum(case ga_devicecategory when 'tablet' then ga_pageviews else 0 end) as tablets_pageviews
    from {os.environ["DB_NAME"]}.{os.environ["SCHEMA_NAME"]}.npb_devices
    group by ga_date
"""

df = session.sql(query_overview)
df_device = session.sql(query_device)


def create_per_person_df(df: DataFrame, device_name: str) -> DataFrame:
    """Compute pageviews per user for one device.

    Args:
        df (DataFrame): assumed to come from the npb_devices table (df_device)
        device_name (str): device name (desktop, mobile, tablets)
    """
    _df = df.filter(col(f'{device_name}_user') > 0)
    _df = _df.withColumn(f'{device_name}_pageviews_per_person',
                         col(f'{device_name}_pageviews') / col(f'{device_name}_user'))
    _df = _df.select('ga_date', f'{device_name}_pageviews_per_person')
    return _df


df_desktop = create_per_person_df(df_device, 'desktop')
df_mobile = create_per_person_df(df_device, 'mobile')
df_tablets = create_per_person_df(df_device, 'tablets')

df = df.join(df_desktop, ['ga_date'], 'left')
df = df.join(df_mobile, ['ga_date'], 'left')
df = df.join(df_tablets, ['ga_date'], 'left')

df = df.sort('ga_date')

# Save the result as a view
df.create_or_replace_view(
    f'{os.environ["DB_NAME"]}.{os.environ["SCHEMA_NAME"]}.npb_my_view')

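npb_website_overview already contains most of the information needed, but here I add pageviews per user for each device. Even on a responsive site, the UI/UX inevitably differs among mobile, tablet, and desktop, so this is a check on how much pageviews and time on site actually differ. Incidentally, running the code above issues SQL on Snowflake. When I checked, the following SQL had been issued.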

create or replace view AIRBYTE_DATABASE.AIRBYTE_SCHEMA.NPB_MY_VIEW(
    GA_DATE,
    GA_USERS,
    GA_EXITRATE,
    GA_NEWUSERS,
    GA_SESSIONS,
    GA_PAGEVIEWS,
    GA_AVGTIMEONPAGE,
    GA_SESSIONSPERUSER,
    GA_AVGSESSIONDURATION,
    GA_PAGEVIEWSPERSESSION,
    DESKTOP_PAGEVIEWS_PER_PERSON,
    MOBILE_PAGEVIEWS_PER_PERSON,
    TABLETS_PAGEVIEWS_PER_PERSON
) as  SELECT  *  FROM ( SELECT  *  FROM ( SELECT  *  FROM (( SELECT "GA_DATE" AS "GA_DATE", "GA_USERS" AS "GA_USERS", "GA_EXITRATE" AS "GA_EXITRATE", "GA_NEWUSERS" AS "GA_NEWUSERS", "GA_SESSIONS" AS "GA_SESSIONS", "GA_PAGEVIEWS" AS "GA_PAGEVIEWS", "GA_AVGTIMEONPAGE" AS "GA_AVGTIMEONPAGE", "GA_SESSIONSPERUSER" AS "GA_SESSIONSPERUSER", "GA_AVGSESSIONDURATION" AS "GA_AVGSESSIONDURATION", "GA_PAGEVIEWSPERSESSION" AS "GA_PAGEVIEWSPERSESSION", "DESKTOP_PAGEVIEWS_PER_PERSON" AS "DESKTOP_PAGEVIEWS_PER_PERSON", "MOBILE_PAGEVIEWS_PER_PERSON" AS "MOBILE_PAGEVIEWS_PER_PERSON" FROM ( SELECT  *  FROM (( SELECT "GA_DATE" AS "GA_DATE", "GA_USERS" AS "GA_USERS", "GA_EXITRATE" AS "GA_EXITRATE", "GA_NEWUSERS" AS "GA_NEWUSERS", "GA_SESSIONS" AS "GA_SESSIONS", "GA_PAGEVIEWS" AS "GA_PAGEVIEWS", "GA_AVGTIMEONPAGE" AS "GA_AVGTIMEONPAGE", "GA_SESSIONSPERUSER" AS "GA_SESSIONSPERUSER", "GA_AVGSESSIONDURATION" AS "GA_AVGSESSIONDURATION", "GA_PAGEVIEWSPERSESSION" AS "GA_PAGEVIEWSPERSESSION", "DESKTOP_PAGEVIEWS_PER_PERSON" AS "DESKTOP_PAGEVIEWS_PER_PERSON" FROM ( SELECT  *  FROM (( SELECT "GA_DATE" AS "GA_DATE", "GA_USERS" AS "GA_USERS", "GA_EXITRATE" AS "GA_EXITRATE", "GA_NEWUSERS" AS "GA_NEWUSERS", "GA_SESSIONS" AS "GA_SESSIONS", "GA_PAGEVIEWS" AS "GA_PAGEVIEWS", "GA_AVGTIMEONPAGE" AS "GA_AVGTIMEONPAGE", "GA_SESSIONSPERUSER" AS "GA_SESSIONSPERUSER", "GA_AVGSESSIONDURATION" AS "GA_AVGSESSIONDURATION", "GA_PAGEVIEWSPERSESSION" AS "GA_PAGEVIEWSPERSESSION" FROM (
    select
        ga_date
        , ga_users
        , ga_exitrate
        , ga_newusers
        , ga_sessions
        , ga_pageviews
        , ga_avgtimeonpage
        , ga_sessionsperuser
        , ga_avgsessionduration
        , ga_pageviewspersession
    from AIRBYTE_DATABASE.AIRBYTE_SCHEMA.npb_website_overview
)) AS SNOWPARK_TEMP_TABLE_Y6CSYKKW6K LEFT OUTER JOIN ( SELECT "GA_DATE" AS "GA_DATE", "DESKTOP_PAGEVIEWS_PER_PERSON" AS "DESKTOP_PAGEVIEWS_PER_PERSON" FROM ( SELECT "GA_DATE", "DESKTOP_PAGEVIEWS_PER_PERSON" FROM ( SELECT "GA_DATE", "DESKTOP_USER", "MOBILE_USER", "TABLETS_USER", "DESKTOP_PAGEVIEWS", "MOBILE_PAGEVIEWS", "TABLETS_PAGEVIEWS", ("DESKTOP_PAGEVIEWS" / "DESKTOP_USER") AS "DESKTOP_PAGEVIEWS_PER_PERSON" FROM ( SELECT  *  FROM (
    select
        ga_date
        , sum(case ga_devicecategory when 'desktop' then ga_users else 0 end) as desktop_user
        , sum(case ga_devicecategory when 'mobile' then ga_users else 0 end) as mobile_user
        , sum(case ga_devicecategory when 'tablet' then ga_users else 0 end) as tablets_user
        , sum(case ga_devicecategory when 'desktop' then ga_pageviews else 0 end) as desktop_pageviews
        , sum(case ga_devicecategory when 'mobile' then ga_pageviews else 0 end) as mobile_pageviews
        , sum(case ga_devicecategory when 'tablet' then ga_pageviews else 0 end) as tablets_pageviews
    from AIRBYTE_DATABASE.AIRBYTE_SCHEMA.npb_devices
    group by ga_date
) WHERE ("DESKTOP_USER" > 0 :: bigint))))) AS SNOWPARK_TEMP_TABLE_F1KQPY4Y1A USING (ga_date)))) AS SNOWPARK_TEMP_TABLE_DZJ0YH3IKI LEFT OUTER JOIN ( SELECT "GA_DATE" AS "GA_DATE", "MOBILE_PAGEVIEWS_PER_PERSON" AS "MOBILE_PAGEVIEWS_PER_PERSON" FROM ( SELECT "GA_DATE", "MOBILE_PAGEVIEWS_PER_PERSON" FROM ( SELECT "GA_DATE", "DESKTOP_USER", "MOBILE_USER", "TABLETS_USER", "DESKTOP_PAGEVIEWS", "MOBILE_PAGEVIEWS", "TABLETS_PAGEVIEWS", ("MOBILE_PAGEVIEWS" / "MOBILE_USER") AS "MOBILE_PAGEVIEWS_PER_PERSON" FROM ( SELECT  *  FROM (
    select
        ga_date
        , sum(case ga_devicecategory when 'desktop' then ga_users else 0 end) as desktop_user
        , sum(case ga_devicecategory when 'mobile' then ga_users else 0 end) as mobile_user
        , sum(case ga_devicecategory when 'tablet' then ga_users else 0 end) as tablets_user
        , sum(case ga_devicecategory when 'desktop' then ga_pageviews else 0 end) as desktop_pageviews
        , sum(case ga_devicecategory when 'mobile' then ga_pageviews else 0 end) as mobile_pageviews
        , sum(case ga_devicecategory when 'tablet' then ga_pageviews else 0 end) as tablets_pageviews
    from AIRBYTE_DATABASE.AIRBYTE_SCHEMA.npb_devices
    group by ga_date
) WHERE ("MOBILE_USER" > 0 :: bigint))))) AS SNOWPARK_TEMP_TABLE_Q2L8KVPG8W USING (ga_date)))) AS SNOWPARK_TEMP_TABLE_3J9UYSEAIC LEFT OUTER JOIN ( SELECT "GA_DATE" AS "GA_DATE", "TABLETS_PAGEVIEWS_PER_PERSON" AS "TABLETS_PAGEVIEWS_PER_PERSON" FROM ( SELECT "GA_DATE", "TABLETS_PAGEVIEWS_PER_PERSON" FROM ( SELECT "GA_DATE", "DESKTOP_USER", "MOBILE_USER", "TABLETS_USER", "DESKTOP_PAGEVIEWS", "MOBILE_PAGEVIEWS", "TABLETS_PAGEVIEWS", ("TABLETS_PAGEVIEWS" / "TABLETS_USER") AS "TABLETS_PAGEVIEWS_PER_PERSON" FROM ( SELECT  *  FROM (
    select
        ga_date
        , sum(case ga_devicecategory when 'desktop' then ga_users else 0 end) as desktop_user
        , sum(case ga_devicecategory when 'mobile' then ga_users else 0 end) as mobile_user
        , sum(case ga_devicecategory when 'tablet' then ga_users else 0 end) as tablets_user
        , sum(case ga_devicecategory when 'desktop' then ga_pageviews else 0 end) as desktop_pageviews
        , sum(case ga_devicecategory when 'mobile' then ga_pageviews else 0 end) as mobile_pageviews
        , sum(case ga_devicecategory when 'tablet' then ga_pageviews else 0 end) as tablets_pageviews
    from AIRBYTE_DATABASE.AIRBYTE_SCHEMA.npb_devices
    group by ga_date
) WHERE ("TABLETS_USER" > 0 :: bigint))))) AS SNOWPARK_TEMP_TABLE_EKH4EDRB3T USING (ga_date))) ORDER BY "GA_DATE" ASC NULLS FIRST);

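At first glance it looks tidy, but that is only because of the raw SQL I wrote myself; the generated statement is actually fairly complex. For reference, a formatted version of the SQL is left below.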

SQL (formatted)

create
or replace view AIRBYTE_DATABASE.AIRBYTE_SCHEMA.NPB_MY_VIEW(
  GA_DATE,
  GA_USERS,
  GA_EXITRATE,
  GA_NEWUSERS,
  GA_SESSIONS,
  GA_PAGEVIEWS,
  GA_AVGTIMEONPAGE,
  GA_SESSIONSPERUSER,
  GA_AVGSESSIONDURATION,
  GA_PAGEVIEWSPERSESSION,
  DESKTOP_PAGEVIEWS_PER_PERSON,
  MOBILE_PAGEVIEWS_PER_PERSON,
  TABLETS_PAGEVIEWS_PER_PERSON
) as
SELECT
  *
FROM
  (
    SELECT
      *
    FROM
      (
        SELECT
          *
        FROM
          (
            (
              SELECT
                "GA_DATE" AS "GA_DATE",
                "GA_USERS" AS "GA_USERS",
                "GA_EXITRATE" AS "GA_EXITRATE",
                "GA_NEWUSERS" AS "GA_NEWUSERS",
                "GA_SESSIONS" AS "GA_SESSIONS",
                "GA_PAGEVIEWS" AS "GA_PAGEVIEWS",
                "GA_AVGTIMEONPAGE" AS "GA_AVGTIMEONPAGE",
                "GA_SESSIONSPERUSER" AS "GA_SESSIONSPERUSER",
                "GA_AVGSESSIONDURATION" AS "GA_AVGSESSIONDURATION",
                "GA_PAGEVIEWSPERSESSION" AS "GA_PAGEVIEWSPERSESSION",
                "DESKTOP_PAGEVIEWS_PER_PERSON" AS "DESKTOP_PAGEVIEWS_PER_PERSON",
                "MOBILE_PAGEVIEWS_PER_PERSON" AS "MOBILE_PAGEVIEWS_PER_PERSON"
              FROM
                (
                  SELECT
                    *
                  FROM
                    (
                      (
                        SELECT
                          "GA_DATE" AS "GA_DATE",
                          "GA_USERS" AS "GA_USERS",
                          "GA_EXITRATE" AS "GA_EXITRATE",
                          "GA_NEWUSERS" AS "GA_NEWUSERS",
                          "GA_SESSIONS" AS "GA_SESSIONS",
                          "GA_PAGEVIEWS" AS "GA_PAGEVIEWS",
                          "GA_AVGTIMEONPAGE" AS "GA_AVGTIMEONPAGE",
                          "GA_SESSIONSPERUSER" AS "GA_SESSIONSPERUSER",
                          "GA_AVGSESSIONDURATION" AS "GA_AVGSESSIONDURATION",
                          "GA_PAGEVIEWSPERSESSION" AS "GA_PAGEVIEWSPERSESSION",
                          "DESKTOP_PAGEVIEWS_PER_PERSON" AS "DESKTOP_PAGEVIEWS_PER_PERSON"
                        FROM
                          (
                            SELECT
                              *
                            FROM
                              (
                                (
                                  SELECT
                                    "GA_DATE" AS "GA_DATE",
                                    "GA_USERS" AS "GA_USERS",
                                    "GA_EXITRATE" AS "GA_EXITRATE",
                                    "GA_NEWUSERS" AS "GA_NEWUSERS",
                                    "GA_SESSIONS" AS "GA_SESSIONS",
                                    "GA_PAGEVIEWS" AS "GA_PAGEVIEWS",
                                    "GA_AVGTIMEONPAGE" AS "GA_AVGTIMEONPAGE",
                                    "GA_SESSIONSPERUSER" AS "GA_SESSIONSPERUSER",
                                    "GA_AVGSESSIONDURATION" AS "GA_AVGSESSIONDURATION",
                                    "GA_PAGEVIEWSPERSESSION" AS "GA_PAGEVIEWSPERSESSION"
                                  FROM
                                    (
                                      select
                                        ga_date,
                                        ga_users,
                                        ga_exitrate,
                                        ga_newusers,
                                        ga_sessions,
                                        ga_pageviews,
                                        ga_avgtimeonpage,
                                        ga_sessionsperuser,
                                        ga_avgsessionduration,
                                        ga_pageviewspersession
                                      from
                                        AIRBYTE_DATABASE.AIRBYTE_SCHEMA.npb_website_overview
                                    )
                                ) AS SNOWPARK_TEMP_TABLE_Y6CSYKKW6K
                                LEFT OUTER JOIN (
                                  SELECT
                                    "GA_DATE" AS "GA_DATE",
                                    "DESKTOP_PAGEVIEWS_PER_PERSON" AS "DESKTOP_PAGEVIEWS_PER_PERSON"
                                  FROM
                                    (
                                      SELECT
                                        "GA_DATE",
                                        "DESKTOP_PAGEVIEWS_PER_PERSON"
                                      FROM
                                        (
                                          SELECT
                                            "GA_DATE",
                                            "DESKTOP_USER",
                                            "MOBILE_USER",
                                            "TABLETS_USER",
                                            "DESKTOP_PAGEVIEWS",
                                            "MOBILE_PAGEVIEWS",
                                            "TABLETS_PAGEVIEWS",
                                            ("DESKTOP_PAGEVIEWS" / "DESKTOP_USER") AS "DESKTOP_PAGEVIEWS_PER_PERSON"
                                          FROM
                                            (
                                              SELECT
                                                *
                                              FROM
                                                (
                                                  select
                                                    ga_date,
                                                    sum(
                                                      case
                                                        ga_devicecategory
                                                        when 'desktop' then ga_users
                                                        else 0
                                                      end
                                                    ) as desktop_user,
                                                    sum(
                                                      case
                                                        ga_devicecategory
                                                        when 'mobile' then ga_users
                                                        else 0
                                                      end
                                                    ) as mobile_user,
                                                    sum(
                                                      case
                                                        ga_devicecategory
                                                        when 'tablet' then ga_users
                                                        else 0
                                                      end
                                                    ) as tablets_user,
                                                    sum(
                                                      case
                                                        ga_devicecategory
                                                        when 'desktop' then ga_pageviews
                                                        else 0
                                                      end
                                                    ) as desktop_pageviews,
                                                    sum(
                                                      case
                                                        ga_devicecategory
                                                        when 'mobile' then ga_pageviews
                                                        else 0
                                                      end
                                                    ) as mobile_pageviews,
                                                    sum(
                                                      case
                                                        ga_devicecategory
                                                        when 'tablet' then ga_pageviews
                                                        else 0
                                                      end
                                                    ) as tablets_pageviews
                                                  from
                                                    AIRBYTE_DATABASE.AIRBYTE_SCHEMA.npb_devices
                                                  group by
                                                    ga_date
                                                )
                                              WHERE
                                                ("DESKTOP_USER" > 0 :: bigint)
                                            )
                                        )
                                    )
                                ) AS SNOWPARK_TEMP_TABLE_F1KQPY4Y1A USING (ga_date)
                              )
                          )
                      ) AS SNOWPARK_TEMP_TABLE_DZJ0YH3IKI
                      LEFT OUTER JOIN (
                        SELECT
                          "GA_DATE" AS "GA_DATE",
                          "MOBILE_PAGEVIEWS_PER_PERSON" AS "MOBILE_PAGEVIEWS_PER_PERSON"
                        FROM
                          (
                            SELECT
                              "GA_DATE",
                              "MOBILE_PAGEVIEWS_PER_PERSON"
                            FROM
                              (
                                SELECT
                                  "GA_DATE",
                                  "DESKTOP_USER",
                                  "MOBILE_USER",
                                  "TABLETS_USER",
                                  "DESKTOP_PAGEVIEWS",
                                  "MOBILE_PAGEVIEWS",
                                  "TABLETS_PAGEVIEWS",
                                  ("MOBILE_PAGEVIEWS" / "MOBILE_USER") AS "MOBILE_PAGEVIEWS_PER_PERSON"
                                FROM
                                  (
                                    SELECT
                                      *
                                    FROM
                                      (
                                        select
                                          ga_date,
                                          sum(
                                            case
                                              ga_devicecategory
                                              when 'desktop' then ga_users
                                              else 0
                                            end
                                          ) as desktop_user,
                                          sum(
                                            case
                                              ga_devicecategory
                                              when 'mobile' then ga_users
                                              else 0
                                            end
                                          ) as mobile_user,
                                          sum(
                                            case
                                              ga_devicecategory
                                              when 'tablet' then ga_users
                                              else 0
                                            end
                                          ) as tablets_user,
                                          sum(
                                            case
                                              ga_devicecategory
                                              when 'desktop' then ga_pageviews
                                              else 0
                                            end
                                          ) as desktop_pageviews,
                                          sum(
                                            case
                                              ga_devicecategory
                                              when 'mobile' then ga_pageviews
                                              else 0
                                            end
                                          ) as mobile_pageviews,
                                          sum(
                                            case
                                              ga_devicecategory
                                              when 'tablet' then ga_pageviews
                                              else 0
                                            end
                                          ) as tablets_pageviews
                                        from
                                          AIRBYTE_DATABASE.AIRBYTE_SCHEMA.npb_devices
                                        group by
                                          ga_date
                                      )
                                    WHERE
                                      ("MOBILE_USER" > 0 :: bigint)
                                  )
                              )
                          )
                      ) AS SNOWPARK_TEMP_TABLE_Q2L8KVPG8W USING (ga_date)
                    )
                )
            ) AS SNOWPARK_TEMP_TABLE_3J9UYSEAIC
            LEFT OUTER JOIN (
              SELECT
                "GA_DATE" AS "GA_DATE",
                "TABLETS_PAGEVIEWS_PER_PERSON" AS "TABLETS_PAGEVIEWS_PER_PERSON"
              FROM
                (
                  SELECT
                    "GA_DATE",
                    "TABLETS_PAGEVIEWS_PER_PERSON"
                  FROM
                    (
                      SELECT
                        "GA_DATE",
                        "DESKTOP_USER",
                        "MOBILE_USER",
                        "TABLETS_USER",
                        "DESKTOP_PAGEVIEWS",
                        "MOBILE_PAGEVIEWS",
                        "TABLETS_PAGEVIEWS",
                        ("TABLETS_PAGEVIEWS" / "TABLETS_USER") AS "TABLETS_PAGEVIEWS_PER_PERSON"
                      FROM
                        (
                          SELECT
                            *
                          FROM
                            (
                              select
                                ga_date,
                                sum(
                                  case
                                    ga_devicecategory
                                    when 'desktop' then ga_users
                                    else 0
                                  end
                                ) as desktop_user,
                                sum(
                                  case
                                    ga_devicecategory
                                    when 'mobile' then ga_users
                                    else 0
                                  end
                                ) as mobile_user,
                                sum(
                                  case
                                    ga_devicecategory
                                    when 'tablet' then ga_users
                                    else 0
                                  end
                                ) as tablets_user,
                                sum(
                                  case
                                    ga_devicecategory
                                    when 'desktop' then ga_pageviews
                                    else 0
                                  end
                                ) as desktop_pageviews,
                                sum(
                                  case
                                    ga_devicecategory
                                    when 'mobile' then ga_pageviews
                                    else 0
                                  end
                                ) as mobile_pageviews,
                                sum(
                                  case
                                    ga_devicecategory
                                    when 'tablet' then ga_pageviews
                                    else 0
                                  end
                                ) as tablets_pageviews
                              from
                                AIRBYTE_DATABASE.AIRBYTE_SCHEMA.npb_devices
                              group by
                                ga_date
                            )
                          WHERE
                            ("TABLETS_USER" > 0 :: bigint)
                        )
                    )
                )
            ) AS SNOWPARK_TEMP_TABLE_EKH4EDRB3T USING (ga_date)
          )
      )
    ORDER BY
      "GA_DATE" ASC NULLS FIRST
  );

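What do you think? Even after formatting it to be a bit more readable, the first reaction is still "??". Frankly, it does not sink in right away. I also wrote the SQL by hand, and I think the form below is easier to read (though it is still verbose compared with Snowpark).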

create or replace view airbyte_database.airbyte_schema.npb_view2 as
    /*
        Compute the number of page views per person for each device
    */
    with users_by_device as (
        select 
            ga_date
            , sum(case ga_devicecategory when 'desktop' then ga_users else 0 end) as desktop_user
            , sum(case ga_devicecategory when 'mobile' then ga_users else 0 end) as mobile_user
            , sum(case ga_devicecategory when 'tablet' then ga_users else 0 end) as tablets_user
            , sum(case ga_devicecategory when 'desktop' then ga_pageviews else 0 end) as desktop_pageviews
            , sum(case ga_devicecategory when 'mobile' then ga_pageviews else 0 end) as mobile_pageviews
            , sum(case ga_devicecategory when 'tablet' then ga_pageviews else 0 end) as tablets_pageviews
        from airbyte_database.airbyte_schema.npb_devices
        group by ga_date
    ), t_desktop as (
        select
            ga_date
            , desktop_pageviews / desktop_user as desktop_views_per_person
        from users_by_device
        where desktop_user > 0
    ), t_mobile as (
        select
            ga_date
            , mobile_pageviews / mobile_user as mobile_views_per_person
        from users_by_device
        where mobile_user > 0
    ), t_tablets as (
        select
            ga_date
            , tablets_pageviews / tablets_user as tablets_views_per_person
        from users_by_device
        where tablets_user > 0
    )

    select
        ga_date
        , ga_users
        , ga_exitrate
        , ga_newusers
        , ga_sessions
        , ga_pageviews
        , ga_avgtimeonpage
        , ga_sessionsperuser
        , ga_avgsessionduration
        , ga_pageviewspersession
        , t_desktop.desktop_views_per_person
        , t_mobile.mobile_views_per_person
        , t_tablets.tablets_views_per_person
    from
        airbyte_database.airbyte_schema.npb_website_overview
        left join t_desktop using(ga_date)
        left join t_mobile using(ga_date)
        left join t_tablets using(ga_date)
    order by ga_date
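
To make the logic of the three per-device CTEs concrete, here is a minimal plain-Python sketch of the same calculation. It does not connect to Snowflake. The sample rows and the function name `views_per_person` are made up for illustration, and `None` stands in for the NULL that the left join yields when a day has no users for a device.

```python
from collections import defaultdict

# Hypothetical sample rows shaped like npb_devices:
# (ga_date, ga_devicecategory, ga_users, ga_pageviews)
ROWS = [
    ("20221201", "desktop", 10, 40),
    ("20221201", "mobile", 20, 30),
    ("20221202", "desktop", 5, 10),
    ("20221202", "tablet", 0, 0),
]

DEVICES = ("desktop", "mobile", "tablet")


def views_per_person(rows):
    """Return {ga_date: {device: pageviews / users, or None if users == 0}}."""
    users = defaultdict(lambda: dict.fromkeys(DEVICES, 0))
    views = defaultdict(lambda: dict.fromkeys(DEVICES, 0))

    # Same role as the sum(case ... end) columns grouped by ga_date.
    for ga_date, device, ga_users, ga_pageviews in rows:
        day_users, day_views = users[ga_date], views[ga_date]
        if device in DEVICES:
            day_users[device] += ga_users
            day_views[device] += ga_pageviews

    # Same role as the "where <device>_user > 0" filters: skip the division
    # when there are no users, so that day ends up with no value.
    result = {}
    for ga_date in sorted(users):
        result[ga_date] = {
            device: (
                views[ga_date][device] / users[ga_date][device]
                if users[ga_date][device] > 0
                else None
            )
            for device in DEVICES
        }
    return result


if __name__ == "__main__":
    for ga_date, per_device in views_per_person(ROWS).items():
        print(ga_date, per_device)
```

The point of filtering on the user count before dividing is the same as in the view: a day with zero users for a device gets an empty value rather than a division-by-zero error.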

So what is the benefit of being able to use Snowpark?

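There are many, but I think the main ones are that you write less code, and that defining logic as UDFs improves maintainability and readability. For example, suppose you want to apply the following replacements to every column in SQL.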

select
  nullif(column1, 'N/A') as a,
  nullif(column2, 'N/A') as b,
  -- (snip)
  nullif(column1, '') as a,
  nullif(column2, '') as b,
  -- (snip)
  nullif(column1, ' ') as a,
  nullif(column2, ' ') as b,
  -- (rest omitted)

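Depending on the number of columns and target strings, this quickly gets out of hand. It is a breeding ground for bugs, it makes the code more complex, and above all it is far too much work to write. With Snowpark, this becomes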

df = df.replace('N/A', None)
df = df.replace('', None)
df = df.replace(' ', None)

Three lines.
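
For comparison, if you had to stay in plain SQL, a common workaround is to generate the expressions from code instead of typing them by hand. The following is only a rough sketch under that assumption: the helper names `nullif_chain` and `build_select` and the table name `my_table` are hypothetical, and column names are inserted as-is without any quoting or validation. It nests one `nullif` per placeholder so that all replacements apply to the same column.

```python
def nullif_chain(column, placeholders):
    """Wrap `column` in one NULLIF per placeholder, innermost first."""
    expr = column
    for placeholder in placeholders:
        # Escape single quotes so the placeholder is a valid SQL string literal.
        literal = "'" + placeholder.replace("'", "''") + "'"
        expr = f"nullif({expr}, {literal})"
    return f"{expr} as {column}"


def build_select(table, columns, placeholders):
    """Build a SELECT that applies the NULLIF chain to every column."""
    select_list = ",\n  ".join(nullif_chain(c, placeholders) for c in columns)
    return f"select\n  {select_list}\nfrom {table}"


if __name__ == "__main__":
    # "my_table" is a made-up table name for illustration only.
    print(build_select("my_table", ["column1", "column2"], ["N/A", "", " "]))
```

Even in this small form, the generated SQL grows with both the number of columns and the number of placeholder strings, which is the overhead the Snowpark version avoids.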


In closing

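ABEJA is looking for people to work with us. We are hiring for a wide range of roles, and there may well be a place where your specialized skills can shine! If you are interested, please apply.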

careers.abejainc.com