この記事はABEJA Advent Calender20日目、及びSnowflake Advent Calender23日目の記事になります。
はじめに
こんにちは@Takayoshi_maです。今日はGoogle Analytics(UA) → Airbyte → Snowflake (Snowpark for Python)という流れでデータの抽出、加工を行います。
各々の技術について大雑把に紹介
Snowflake
Snowflake社(ティッカーコード: SNOW)が提供しているSaasのDWHで、よくBigQueryやRedshiftと比較されることが多いと思います。尚、Snowflake社は投資の神様ウォーレンバフェット率いるバークシャーハサウェイが2020年に巨額の投資を行ったことでも知られています。Snowflakeはマルチクラウドプラットフォーム(AWS, GCP, Azure)に対応しており高いパフォーマンスとコスパを誇るDWHです。特にAWSがメインの場合、Redshiftに変わるDWHとしてチョイスされるケースが増えてきていると思います。特にRedshiftの課金体系(起動している時間にお金がかかる)とSnowflakeの課金体系(処理をした分だけお金がかかる)が大きく異なるため、この辺りが判断材料の一つになってくるケースも多い気がします。
Snowpark
2021年末にリリースされたSnowflakeの新機能で、現在はScala/Jave/Pythonなどの言語をサポートしています。対応言語に関してはApache Sparkと似ていますね。 よく同じDWHとして比較されるBigQueryにもUDFがありますが、両者で異なる言語をサポートしていますし、うまく棲み分けができているのでは無いでしょうか? またSnowparkですが、その記述はApache Sparkと比較的似ています。なので例えばSnowpark for Pythonに慣れていない人でもPySparkと同じような書き方でデータ処理ができてしまうことも多いです。これは覚えておくとググる時に結構重宝する気がします。
Airbyte
2020年にサンフランシスコで創業され、わずか1年でシリーズAに到達したスタートアップ企業Airbyte社がOSSで公開しているデータ統合プラットフォームです。今年になってぼちぼち触る機会が出てきたのですが、紹介の意味も込めて記事にさせて頂きました。データ転送によく使われる技術としてdigdag+emblukがよく例として挙げられると思いますが、Airbyteも少しずつ増えてきている印象
環境構築
それではまず環境構築から簡単に書いていきます。
Airbyte
公式から出ている Deploy Airbyte - Airbyte Documentation に従ってDockerコンテナ群を立ち上げます。非常に簡単! ひとまず今回はローカルマシン上で起動します。
#!/bin/bash git clone https://github.com/airbytehq/airbyte.git cd airbyte docker compose up # 公式では"docker-compose up"でしたがV2なのでこれで
立ち上がったらlocalhost:8000
にアクセスし、メールアドレスを入力して進みます。
これでAirbyteの初期画面に行けたと思います。
ここから"Set up your first connection"ボタンを押下すると、以下のような画面が出てきますので、取り敢えずGoogle Analyticsを選択します。
尚、Airbyteにデフォルトで用意されているConnectionはかなり多岐に渡ります。以下の公式サイトをご参照ください
Connector Catalog - Airbyte Documentation
一応以下に2022年9月時点の情報を掲載しておきます
▶︎ Source(クリックすると展開します)
▶︎ Destinations(クリックすると展開します)
Connector | Product Release Stage | Available in Cloud? |
---|---|---|
Amazon SQS | Alpha | Yes |
Amazon Datalake | Alpha | No |
AzureBlobStorage | Alpha | Yes |
BigQuery | Generally Available | Yes |
Cassandra | Alpha | Yes |
Chargify (Keen) | Alpha | Yes |
ClickHouse | Alpha | Yes |
Databricks | Alpha | Yes |
DynamoDB | Alpha | Yes |
Elasticsearch | Alpha | Yes |
End-to-End Testing | Alpha | Yes |
Firebolt | Alpha | Yes |
Google Cloud Storage (GCS) | Beta | Yes |
Google Pubsub | Alpha | Yes |
Google Sheets | Alpha | Yes |
Kafka | Alpha | No |
Keen | Alpha | No |
Kinesis | Alpha | No |
Local CSV | Alpha | No |
Local JSON | Alpha | No |
MariaDB ColumnStore | Alpha | Yes |
MeiliSearch | Alpha | Yes |
MongoDB | Alpha | Yes |
MQTT | Alpha | Yes |
MS SQL Server | Alpha | Yes |
MySQL | Alpha | Yes |
Oracle | Alpha | Yes |
Postgres | Alpha | Yes |
Pulsar | Alpha | Yes |
RabbitMQ | Alpha | Yes |
Redis | Alpha | Yes |
Redshift | Beta | Yes |
Rockset | Alpha | Yes |
S3 | Generally Available | Yes |
Scylla | Alpha | Yes |
SFTP JSON | Alpha | Yes |
Snowflake | Generally Available | Yes |
SQLite | Alpha | No |
Streamr | Alpha | No |
TiDB | Alpha | No |
Google Analytics
今回はUAを使います。やり方に関しては以下の公式ドキュメントを参考にしました。 (Google Analyticsの導入については既にできているものとして一旦割愛します)
Google Analytics (Universal Analytics) - Airbyte Documentation
Google Analyticsと同じGoogleアカウントログイン後、GCPコンソールに移動します。既存のプロジェクトでも新規プロジェクトでも良いのでサービスカウントを作成、権限はデフォルトのまま作りました
次に、作成したアカウントの秘密鍵(JSON)を作成、自動的に鍵がダウンロードされるので保管します。その後Google Analyticsのホーム画面から左下の歯車をクリックし、「アカウントのアクセス管理をクリック」
右上の+ボタンから新規ユーザーの追加をクリックし、先ほど作成したユーザーアカウントのメールアドレスを追加、権限は閲覧者でOK、その後GCPのAPIとサービスからAnalytics Reporting APIを有効化します
最後に、Airbyteの画面に戻って、Sourceの追加から諸々の情報を入力します
Key | Value |
---|---|
Credentials | Service Account Key Aucentificationを選択 |
Service Account JSON Key | 作成した秘密鍵 |
Replication Start Data | 同期を開始した日付 |
View ID | 対象の view id |
Custom Reports (Optional) | 欲しい情報 |
Data request time increment in days (Optional) | 1日あたりの同期回数 |
Google Search Console
SEOと言えばGoogleAnalyticsともう一つGoogle Search Consoleが鉄板かと思います。自分もAnalyticsのリッチすぎる情報を消化しきれず、ついついこっちの方をメインで見ることが多い気がします。 本来であればサチコもSourceとして追加したいのですが、こちらはAPIの関係上GoogleWorkspaceの管理者IDが(多分)必要になってくるようです。今回はあくまでも個人利用であるため、Sourceの追加は割愛します。一応、以下のドキュメントにSourceの追加方法について詳しく掲載されています。
Google Search Console - Airbyte Documentation
Snowflake
先ほどはAirbyteのSources登録からGoogleAnalyticsを追加しましたが、引き続きDestinationsとしてSnowflkeを登録していきます。具体的な方法についてはAirbyteの公式ドキュメントに非常に分かりやすく記載されております。
Snowflake - Airbyte Document -
まず初めに現在のアカウントにSECURITYADMIN以上のロールが割り当てられていることを確認します。直接GUIで確認しても良いし、ワークシートを開いて以下のクエリを叩くことでも確認可能です。
-- Acountで確認 SHOW PARAMETERS LIKE 'network_policy' IN ACCOUNT; -- またはUserで確認 SHOW PARAMETERS LIKE 'network_policy' IN USER <username>;
ワークシートを開いて以下のクエリを実行し、Airbyte用に各種エンティティ(DWH, DB, Schema, User, Role)を作成します。クエリは公式からそのまま引用しています
-- set variables (these need to be uppercase) set airbyte_role = 'AIRBYTE_ROLE'; set airbyte_username = 'AIRBYTE_USER'; set airbyte_warehouse = 'AIRBYTE_WAREHOUSE'; set airbyte_database = 'AIRBYTE_DATABASE'; set airbyte_schema = 'AIRBYTE_SCHEMA'; -- set user password set airbyte_password = 'password'; begin; -- create Airbyte role use role securityadmin; create role if not exists identifier($airbyte_role); grant role identifier($airbyte_role) to role SYSADMIN; -- create Airbyte user create user if not exists identifier($airbyte_username) password = $airbyte_password default_role = $airbyte_role default_warehouse = $airbyte_warehouse; grant role identifier($airbyte_role) to user identifier($airbyte_username); -- change role to sysadmin for warehouse / database steps use role sysadmin; -- create Airbyte warehouse create warehouse if not exists identifier($airbyte_warehouse) warehouse_size = xsmall warehouse_type = standard auto_suspend = 60 auto_resume = true initially_suspended = true; -- create Airbyte database create database if not exists identifier($airbyte_database); -- grant Airbyte warehouse access grant USAGE on warehouse identifier($airbyte_warehouse) to role identifier($airbyte_role); -- grant Airbyte database access grant OWNERSHIP on database identifier($airbyte_database) to role identifier($airbyte_role); commit; begin; USE DATABASE identifier($airbyte_database); -- create schema for Airbyte data CREATE SCHEMA IF NOT EXISTS identifier($airbyte_schema); commit; begin; -- grant Airbyte schema access grant OWNERSHIP on schema identifier($airbyte_schema) to role identifier($airbyte_role); commit;
クエリ実行後には以下のように各エンティティが作られていることが確認できます。
Role | DB, Schema | User |
---|---|---|
次にデータのローディング場所についての設定を行います。SnowflakeではInternal Stage, S3(AWS), GCS(GCP), AzureBolbStorage(Azure)の中から選べます。Internal Stageを使う際はもちろんその分の料金がかかってきますが、低量であれば気にするほどではありません。尚、今回はInternal Stageを使っていくので特に設定の必要はありません。
そして、Airbyteから各情報を入力していきます。公式ドキュメントに必要な情報があるのでそれに沿っていく形で良いと思います。一応今回はシンプルにパスワード認証で設定しています。尚、ブログ用の簡易的な構築とは言え、流石にそれだけだと怖いのでSnowflake側のセキュリティ設定でIP制限だけかけておきました。ちなみにAirbyte側にホスト名を入力する箇所がありますが、ホスト名の取得については以下のクエリを叩きます
select system$whitelist();
Connection
SourceとDestinationの設定ができたので次は両者をつなげていきます。まずAirbyteのConnectionタブをクリックした後に、先ほどのSourceとDestinationを選択していきます。
その後、更新頻度や紐づける情報などを設定していきます。
もちろん個別に設定することも可能ですが、以下のように一括で設定を反映することも可能です。
Syncされたテーブルですが、以下のような形で保存されていることが確認できます。ここではデフォルトの連携で入るそれぞれのテーブルに関する説明は省きます。
Snowpark
サンプルとして、自身のプロ野球データサイトにおけるGoogle Analyticsのデータを使用しています。
今回の処理
以下のような定義でViewを作成していきます。Snowpark公式ドキュメントの中でクロス集計に関する情報を見つけることができなかったため、前半部分はSQLで記述しています。
""" session.py Create a session and return a connection to the Snowflake database """ import os from snowflake.snowpark import Session def create_session() -> Session: params = { 'account': os.environ['ACCOUNT'], 'user': os.environ['SNOWFLAKE_USER'], 'password': os.environ['SNOWFLAKE_PASS'], 'role': os.environ['SNOWFLAKE_ROLE'], 'warehouse': os.environ['SNOWFLAKE_WAREHOUSE'], 'database': os.environ['DB_NAME'], 'schema': os.environ['SCHEMA_NAME'] } session = Session.builder.configs(params).create() return session
""" main.py """ import os from dotenv import load_dotenv from snowflake.snowpark.functions import col from snowflake.snowpark.dataframe import DataFrame from session import create_session load_dotenv() session = create_session() # 下記テーブル(df_device, SQLで記述)と見た目を揃えるためSQLで記述 query_overview = f""" select ga_date , ga_users , ga_exitrate , ga_newusers , ga_sessions , ga_pageviews , ga_avgtimeonpage , ga_sessionsperuser , ga_avgsessionduration , ga_pageviewspersession from {os.environ["DB_NAME"]}.{os.environ["SCHEMA_NAME"]}.npb_website_overview """ # Snowparkでのクロス集計の記述が不明なため生SQLで記述 query_device = f""" select ga_date , sum(case ga_devicecategory when 'desktop' then ga_users else 0 end) as desktop_user , sum(case ga_devicecategory when 'mobile' then ga_users else 0 end) as mobile_user , sum(case ga_devicecategory when 'tablet' then ga_users else 0 end) as tablets_user , sum(case ga_devicecategory when 'desktop' then ga_pageviews else 0 end) as desktop_pageviews , sum(case ga_devicecategory when 'mobile' then ga_pageviews else 0 end) as mobile_pageviews , sum(case ga_devicecategory when 'tablet' then ga_pageviews else 0 end) as tablets_pageviews from {os.environ["DB_NAME"]}.{os.environ["SCHEMA_NAME"]}.npb_devices group by ga_date """ df = session.sql(query_overview) df_device = session.sql(query_device) def create_per_person_df(df: DataFrame, device_name: str) -> DataFrame: """ デバイス毎の1人あたりページビュー数を算出する Args: df (DataFrame): npb_devicesテーブルを想定 device_name (str): デバイス名 (desktop, mobile, tablets) """ _df = df.filter(col(f'{device_name}_user') > 0) _df = _df.withColumn(f'{device_name}_pageviews_per_person', col(f'{device_name}_pageviews') / col(f'{device_name}_user')) _df = _df.select('ga_date', f'{device_name}_pageviews_per_person') return _df df_desktop = create_per_person_df(df_device, 'desktop') df_mobile = create_per_person_df(df_device, 'mobile') df_tablets = create_per_person_df(df_device, 'tablets') df = df.join(df_desktop, ['ga_date'], 'left') df = df.join(df_mobile, ['ga_date'], 'left') df = df.join(df_tablets, ['ga_date'], 'left') df = df.sort('ga_date') # Viewを保存 df.create_or_replace_view( f'{os.environ["DB_NAME"]}.{os.environ["SCHEMA_NAME"]}.npb_my_view')
npb_website_overview
に必要な情報はほとんど入っていますが、今回はそれにデバイス毎の1ユーザーあたりページビュー数を追加します。レスポンシブサイトとはいえ、モバイルとタブレット、デスクトップでUI/UXに差は出てきてしまいますが、果たしてどの程度ページビュー数や滞在時間に差があるのかの検証です。
ちなみに上記の処理を実行するとSnowflake上にSQLが発行されます。確認したところ以下のSQLが発行されていました。
create or replace view AIRBYTE_DATABASE.AIRBYTE_SCHEMA.NPB_MY_VIEW( GA_DATE, GA_USERS, GA_EXITRATE, GA_NEWUSERS, GA_SESSIONS, GA_PAGEVIEWS, GA_AVGTIMEONPAGE, GA_SESSIONSPERUSER, GA_AVGSESSIONDURATION, GA_PAGEVIEWSPERSESSION, DESKTOP_PAGEVIEWS_PER_PERSON, MOBILE_PAGEVIEWS_PER_PERSON, TABLETS_PAGEVIEWS_PER_PERSON ) as SELECT * FROM ( SELECT * FROM ( SELECT * FROM (( SELECT "GA_DATE" AS "GA_DATE", "GA_USERS" AS "GA_USERS", "GA_EXITRATE" AS "GA_EXITRATE", "GA_NEWUSERS" AS "GA_NEWUSERS", "GA_SESSIONS" AS "GA_SESSIONS", "GA_PAGEVIEWS" AS "GA_PAGEVIEWS", "GA_AVGTIMEONPAGE" AS "GA_AVGTIMEONPAGE", "GA_SESSIONSPERUSER" AS "GA_SESSIONSPERUSER", "GA_AVGSESSIONDURATION" AS "GA_AVGSESSIONDURATION", "GA_PAGEVIEWSPERSESSION" AS "GA_PAGEVIEWSPERSESSION", "DESKTOP_PAGEVIEWS_PER_PERSON" AS "DESKTOP_PAGEVIEWS_PER_PERSON", "MOBILE_PAGEVIEWS_PER_PERSON" AS "MOBILE_PAGEVIEWS_PER_PERSON" FROM ( SELECT * FROM (( SELECT "GA_DATE" AS "GA_DATE", "GA_USERS" AS "GA_USERS", "GA_EXITRATE" AS "GA_EXITRATE", "GA_NEWUSERS" AS "GA_NEWUSERS", "GA_SESSIONS" AS "GA_SESSIONS", "GA_PAGEVIEWS" AS "GA_PAGEVIEWS", "GA_AVGTIMEONPAGE" AS "GA_AVGTIMEONPAGE", "GA_SESSIONSPERUSER" AS "GA_SESSIONSPERUSER", "GA_AVGSESSIONDURATION" AS "GA_AVGSESSIONDURATION", "GA_PAGEVIEWSPERSESSION" AS "GA_PAGEVIEWSPERSESSION", "DESKTOP_PAGEVIEWS_PER_PERSON" AS "DESKTOP_PAGEVIEWS_PER_PERSON" FROM ( SELECT * FROM (( SELECT "GA_DATE" AS "GA_DATE", "GA_USERS" AS "GA_USERS", "GA_EXITRATE" AS "GA_EXITRATE", "GA_NEWUSERS" AS "GA_NEWUSERS", "GA_SESSIONS" AS "GA_SESSIONS", "GA_PAGEVIEWS" AS "GA_PAGEVIEWS", "GA_AVGTIMEONPAGE" AS "GA_AVGTIMEONPAGE", "GA_SESSIONSPERUSER" AS "GA_SESSIONSPERUSER", "GA_AVGSESSIONDURATION" AS "GA_AVGSESSIONDURATION", "GA_PAGEVIEWSPERSESSION" AS "GA_PAGEVIEWSPERSESSION" FROM ( select ga_date , ga_users , ga_exitrate , ga_newusers , ga_sessions , ga_pageviews , ga_avgtimeonpage , ga_sessionsperuser , ga_avgsessionduration , ga_pageviewspersession from AIRBYTE_DATABASE.AIRBYTE_SCHEMA.npb_website_overview )) AS SNOWPARK_TEMP_TABLE_Y6CSYKKW6K LEFT OUTER JOIN ( SELECT "GA_DATE" AS "GA_DATE", "DESKTOP_PAGEVIEWS_PER_PERSON" AS "DESKTOP_PAGEVIEWS_PER_PERSON" FROM ( SELECT "GA_DATE", "DESKTOP_PAGEVIEWS_PER_PERSON" FROM ( SELECT "GA_DATE", "DESKTOP_USER", "MOBILE_USER", "TABLETS_USER", "DESKTOP_PAGEVIEWS", "MOBILE_PAGEVIEWS", "TABLETS_PAGEVIEWS", ("DESKTOP_PAGEVIEWS" / "DESKTOP_USER") AS "DESKTOP_PAGEVIEWS_PER_PERSON" FROM ( SELECT * FROM ( select ga_date , sum(case ga_devicecategory when 'desktop' then ga_users else 0 end) as desktop_user , sum(case ga_devicecategory when 'mobile' then ga_users else 0 end) as mobile_user , sum(case ga_devicecategory when 'tablet' then ga_users else 0 end) as tablets_user , sum(case ga_devicecategory when 'desktop' then ga_pageviews else 0 end) as desktop_pageviews , sum(case ga_devicecategory when 'mobile' then ga_pageviews else 0 end) as mobile_pageviews , sum(case ga_devicecategory when 'tablet' then ga_pageviews else 0 end) as tablets_pageviews from AIRBYTE_DATABASE.AIRBYTE_SCHEMA.npb_devices group by ga_date ) WHERE ("DESKTOP_USER" > 0 :: bigint))))) AS SNOWPARK_TEMP_TABLE_F1KQPY4Y1A USING (ga_date)))) AS SNOWPARK_TEMP_TABLE_DZJ0YH3IKI LEFT OUTER JOIN ( SELECT "GA_DATE" AS "GA_DATE", "MOBILE_PAGEVIEWS_PER_PERSON" AS "MOBILE_PAGEVIEWS_PER_PERSON" FROM ( SELECT "GA_DATE", "MOBILE_PAGEVIEWS_PER_PERSON" FROM ( SELECT "GA_DATE", "DESKTOP_USER", "MOBILE_USER", "TABLETS_USER", "DESKTOP_PAGEVIEWS", "MOBILE_PAGEVIEWS", "TABLETS_PAGEVIEWS", ("MOBILE_PAGEVIEWS" / "MOBILE_USER") AS "MOBILE_PAGEVIEWS_PER_PERSON" FROM ( SELECT * FROM ( select ga_date , sum(case ga_devicecategory when 'desktop' then ga_users else 0 end) as desktop_user , sum(case ga_devicecategory when 'mobile' then ga_users else 0 end) as mobile_user , sum(case ga_devicecategory when 'tablet' then ga_users else 0 end) as tablets_user , sum(case ga_devicecategory when 'desktop' then ga_pageviews else 0 end) as desktop_pageviews , sum(case ga_devicecategory when 'mobile' then ga_pageviews else 0 end) as mobile_pageviews , sum(case ga_devicecategory when 'tablet' then ga_pageviews else 0 end) as tablets_pageviews from AIRBYTE_DATABASE.AIRBYTE_SCHEMA.npb_devices group by ga_date ) WHERE ("MOBILE_USER" > 0 :: bigint))))) AS SNOWPARK_TEMP_TABLE_Q2L8KVPG8W USING (ga_date)))) AS SNOWPARK_TEMP_TABLE_3J9UYSEAIC LEFT OUTER JOIN ( SELECT "GA_DATE" AS "GA_DATE", "TABLETS_PAGEVIEWS_PER_PERSON" AS "TABLETS_PAGEVIEWS_PER_PERSON" FROM ( SELECT "GA_DATE", "TABLETS_PAGEVIEWS_PER_PERSON" FROM ( SELECT "GA_DATE", "DESKTOP_USER", "MOBILE_USER", "TABLETS_USER", "DESKTOP_PAGEVIEWS", "MOBILE_PAGEVIEWS", "TABLETS_PAGEVIEWS", ("TABLETS_PAGEVIEWS" / "TABLETS_USER") AS "TABLETS_PAGEVIEWS_PER_PERSON" FROM ( SELECT * FROM ( select ga_date , sum(case ga_devicecategory when 'desktop' then ga_users else 0 end) as desktop_user , sum(case ga_devicecategory when 'mobile' then ga_users else 0 end) as mobile_user , sum(case ga_devicecategory when 'tablet' then ga_users else 0 end) as tablets_user , sum(case ga_devicecategory when 'desktop' then ga_pageviews else 0 end) as desktop_pageviews , sum(case ga_devicecategory when 'mobile' then ga_pageviews else 0 end) as mobile_pageviews , sum(case ga_devicecategory when 'tablet' then ga_pageviews else 0 end) as tablets_pageviews from AIRBYTE_DATABASE.AIRBYTE_SCHEMA.npb_devices group by ga_date ) WHERE ("TABLETS_USER" > 0 :: bigint))))) AS SNOWPARK_TEMP_TABLE_EKH4EDRB3T USING (ga_date))) ORDER BY "GA_DATE" ASC NULLS FIRST);
一見するとすっきりに見えますが、自分で書いた生SQLの部分がそう見えるだけで、実際には結構複雑です。見やすいよう整形したSQLを一応下記に残しておきます。
▶︎ SQL(長いので折り畳んでいます)
create or replace view AIRBYTE_DATABASE.AIRBYTE_SCHEMA.NPB_MY_VIEW( GA_DATE, GA_USERS, GA_EXITRATE, GA_NEWUSERS, GA_SESSIONS, GA_PAGEVIEWS, GA_AVGTIMEONPAGE, GA_SESSIONSPERUSER, GA_AVGSESSIONDURATION, GA_PAGEVIEWSPERSESSION, DESKTOP_PAGEVIEWS_PER_PERSON, MOBILE_PAGEVIEWS_PER_PERSON, TABLETS_PAGEVIEWS_PER_PERSON ) as SELECT * FROM ( SELECT * FROM ( SELECT * FROM ( ( SELECT "GA_DATE" AS "GA_DATE", "GA_USERS" AS "GA_USERS", "GA_EXITRATE" AS "GA_EXITRATE", "GA_NEWUSERS" AS "GA_NEWUSERS", "GA_SESSIONS" AS "GA_SESSIONS", "GA_PAGEVIEWS" AS "GA_PAGEVIEWS", "GA_AVGTIMEONPAGE" AS "GA_AVGTIMEONPAGE", "GA_SESSIONSPERUSER" AS "GA_SESSIONSPERUSER", "GA_AVGSESSIONDURATION" AS "GA_AVGSESSIONDURATION", "GA_PAGEVIEWSPERSESSION" AS "GA_PAGEVIEWSPERSESSION", "DESKTOP_PAGEVIEWS_PER_PERSON" AS "DESKTOP_PAGEVIEWS_PER_PERSON", "MOBILE_PAGEVIEWS_PER_PERSON" AS "MOBILE_PAGEVIEWS_PER_PERSON" FROM ( SELECT * FROM ( ( SELECT "GA_DATE" AS "GA_DATE", "GA_USERS" AS "GA_USERS", "GA_EXITRATE" AS "GA_EXITRATE", "GA_NEWUSERS" AS "GA_NEWUSERS", "GA_SESSIONS" AS "GA_SESSIONS", "GA_PAGEVIEWS" AS "GA_PAGEVIEWS", "GA_AVGTIMEONPAGE" AS "GA_AVGTIMEONPAGE", "GA_SESSIONSPERUSER" AS "GA_SESSIONSPERUSER", "GA_AVGSESSIONDURATION" AS "GA_AVGSESSIONDURATION", "GA_PAGEVIEWSPERSESSION" AS "GA_PAGEVIEWSPERSESSION", "DESKTOP_PAGEVIEWS_PER_PERSON" AS "DESKTOP_PAGEVIEWS_PER_PERSON" FROM ( SELECT * FROM ( ( SELECT "GA_DATE" AS "GA_DATE", "GA_USERS" AS "GA_USERS", "GA_EXITRATE" AS "GA_EXITRATE", "GA_NEWUSERS" AS "GA_NEWUSERS", "GA_SESSIONS" AS "GA_SESSIONS", "GA_PAGEVIEWS" AS "GA_PAGEVIEWS", "GA_AVGTIMEONPAGE" AS "GA_AVGTIMEONPAGE", "GA_SESSIONSPERUSER" AS "GA_SESSIONSPERUSER", "GA_AVGSESSIONDURATION" AS "GA_AVGSESSIONDURATION", "GA_PAGEVIEWSPERSESSION" AS "GA_PAGEVIEWSPERSESSION" FROM ( select ga_date, ga_users, ga_exitrate, ga_newusers, ga_sessions, ga_pageviews, ga_avgtimeonpage, ga_sessionsperuser, ga_avgsessionduration, ga_pageviewspersession from AIRBYTE_DATABASE.AIRBYTE_SCHEMA.npb_website_overview ) ) AS SNOWPARK_TEMP_TABLE_Y6CSYKKW6K LEFT OUTER JOIN ( SELECT "GA_DATE" AS "GA_DATE", "DESKTOP_PAGEVIEWS_PER_PERSON" AS "DESKTOP_PAGEVIEWS_PER_PERSON" FROM ( SELECT "GA_DATE", "DESKTOP_PAGEVIEWS_PER_PERSON" FROM ( SELECT "GA_DATE", "DESKTOP_USER", "MOBILE_USER", "TABLETS_USER", "DESKTOP_PAGEVIEWS", "MOBILE_PAGEVIEWS", "TABLETS_PAGEVIEWS", ("DESKTOP_PAGEVIEWS" / "DESKTOP_USER") AS "DESKTOP_PAGEVIEWS_PER_PERSON" FROM ( SELECT * FROM ( select ga_date, sum( case ga_devicecategory when 'desktop' then ga_users else 0 end ) as desktop_user, sum( case ga_devicecategory when 'mobile' then ga_users else 0 end ) as mobile_user, sum( case ga_devicecategory when 'tablet' then ga_users else 0 end ) as tablets_user, sum( case ga_devicecategory when 'desktop' then ga_pageviews else 0 end ) as desktop_pageviews, sum( case ga_devicecategory when 'mobile' then ga_pageviews else 0 end ) as mobile_pageviews, sum( case ga_devicecategory when 'tablet' then ga_pageviews else 0 end ) as tablets_pageviews from AIRBYTE_DATABASE.AIRBYTE_SCHEMA.npb_devices group by ga_date ) WHERE ("DESKTOP_USER" > 0 :: bigint) ) ) ) ) AS SNOWPARK_TEMP_TABLE_F1KQPY4Y1A USING (ga_date) ) ) ) AS SNOWPARK_TEMP_TABLE_DZJ0YH3IKI LEFT OUTER JOIN ( SELECT "GA_DATE" AS "GA_DATE", "MOBILE_PAGEVIEWS_PER_PERSON" AS "MOBILE_PAGEVIEWS_PER_PERSON" FROM ( SELECT "GA_DATE", "MOBILE_PAGEVIEWS_PER_PERSON" FROM ( SELECT "GA_DATE", "DESKTOP_USER", "MOBILE_USER", "TABLETS_USER", "DESKTOP_PAGEVIEWS", "MOBILE_PAGEVIEWS", "TABLETS_PAGEVIEWS", ("MOBILE_PAGEVIEWS" / "MOBILE_USER") AS "MOBILE_PAGEVIEWS_PER_PERSON" FROM ( SELECT * FROM ( select ga_date, sum( case ga_devicecategory when 'desktop' then ga_users else 0 end ) as desktop_user, sum( case ga_devicecategory when 'mobile' then ga_users else 0 end ) as mobile_user, sum( case ga_devicecategory when 'tablet' then ga_users else 0 end ) as tablets_user, sum( case ga_devicecategory when 'desktop' then ga_pageviews else 0 end ) as desktop_pageviews, sum( case ga_devicecategory when 'mobile' then ga_pageviews else 0 end ) as mobile_pageviews, sum( case ga_devicecategory when 'tablet' then ga_pageviews else 0 end ) as tablets_pageviews from AIRBYTE_DATABASE.AIRBYTE_SCHEMA.npb_devices group by ga_date ) WHERE ("MOBILE_USER" > 0 :: bigint) ) ) ) ) AS SNOWPARK_TEMP_TABLE_Q2L8KVPG8W USING (ga_date) ) ) ) AS SNOWPARK_TEMP_TABLE_3J9UYSEAIC LEFT OUTER JOIN ( SELECT "GA_DATE" AS "GA_DATE", "TABLETS_PAGEVIEWS_PER_PERSON" AS "TABLETS_PAGEVIEWS_PER_PERSON" FROM ( SELECT "GA_DATE", "TABLETS_PAGEVIEWS_PER_PERSON" FROM ( SELECT "GA_DATE", "DESKTOP_USER", "MOBILE_USER", "TABLETS_USER", "DESKTOP_PAGEVIEWS", "MOBILE_PAGEVIEWS", "TABLETS_PAGEVIEWS", ("TABLETS_PAGEVIEWS" / "TABLETS_USER") AS "TABLETS_PAGEVIEWS_PER_PERSON" FROM ( SELECT * FROM ( select ga_date, sum( case ga_devicecategory when 'desktop' then ga_users else 0 end ) as desktop_user, sum( case ga_devicecategory when 'mobile' then ga_users else 0 end ) as mobile_user, sum( case ga_devicecategory when 'tablet' then ga_users else 0 end ) as tablets_user, sum( case ga_devicecategory when 'desktop' then ga_pageviews else 0 end ) as desktop_pageviews, sum( case ga_devicecategory when 'mobile' then ga_pageviews else 0 end ) as mobile_pageviews, sum( case ga_devicecategory when 'tablet' then ga_pageviews else 0 end ) as tablets_pageviews from AIRBYTE_DATABASE.AIRBYTE_SCHEMA.npb_devices group by ga_date ) WHERE ("TABLETS_USER" > 0 :: bigint) ) ) ) ) AS SNOWPARK_TEMP_TABLE_EKH4EDRB3T USING (ga_date) ) ) ORDER BY "GA_DATE" ASC NULLS FIRST );
どうでしょう?整形後で少し見やすくした場合でも、一見すると「??」って感じですね。正直頭に直ぐには入ってこないです。一応、自分でもSQL記述してみましたが、以下のような形の方が見やすいかなとは思います。(それでもSnowparkと比較すると冗長)
create or replace view airbyte_database.airbyte_schema.npb_view2 as /* デバイス毎の1人あたりページビュー数を算出する */ with users_by_device as ( select ga_date , sum(case ga_devicecategory when 'desktop' then ga_users else 0 end) as desktop_user , sum(case ga_devicecategory when 'mobile' then ga_users else 0 end) as mobile_user , sum(case ga_devicecategory when 'tablet' then ga_users else 0 end) as tablets_user , sum(case ga_devicecategory when 'desktop' then ga_pageviews else 0 end) as desktop_pageviews , sum(case ga_devicecategory when 'mobile' then ga_pageviews else 0 end) as mobile_pageviews , sum(case ga_devicecategory when 'tablet' then ga_pageviews else 0 end) as tablets_pageviews from airbyte_database.airbyte_schema.npb_devices group by ga_date ), t_desktop as ( select ga_date , desktop_pageviews / desktop_user as desktop_views_per_person from users_by_device where desktop_user > 0 ), t_mobile as ( select ga_date , mobile_pageviews / mobile_user as mobile_views_per_person from users_by_device where mobile_user > 0 ), t_tablets as ( select ga_date , tablets_pageviews / tablets_user as tablets_views_per_person from users_by_device where tablets_user > 0 ) select ga_date , ga_users , ga_exitrate , ga_newusers , ga_sessions , ga_pageviews , ga_avgtimeonpage , ga_sessionsperuser , ga_avgsessionduration , ga_pageviewspersession , t_desktop.desktop_views_per_person , t_mobile.mobile_views_per_person , t_tablets.tablets_views_per_person from airbyte_database.airbyte_schema.npb_website_overview left join t_desktop using(ga_date) left join t_mobile using(ga_date) left join t_tablets using(ga_date) order by ga_date
結局Snowparkが使えると何がいいか
色々あると思いますが、記述量を減らせること、UDFとして定義しておくことで保守性や可読性を高められる点などだと思います。 例えばSQLで全てのカラムに対して以下のような置換処理を行うとします。
select nullif('column1', 'N/A') as a, nullif('column2', 'N/A') as a, -- 中略 nullif('column1', '') as a, nullif('column2', '') as b, -- 中略 nullif('column1', ' ') as a, nullif('column2', ' ') as b, -- 以下略
カラムの数や対象の文字列次第では、これは最早手に負えません。バグの温床、コードの複雑化、何より書くのが大変すぎます。これがSnowparkだと
df = df.replace('N/A', None) df = df.replace('', None) df = df.replace(' ', None)
3行です
最後に
ABEJAでは共に働く仲間を募集しています。 幅広い職種を募集しており、尖ったスキルが生かせる場があるかもしれません!ご興味ある方は是非ご応募ください。