Import Query Result to BigQuery

特定DirectoryにあるSQL fileを実行して、結果をBigQueryに入れるScriptを書いた.

  1. Background
  2. Import Query Result to BigQuery
  3. See Also

Background

Kaizen PlatformではBigQueryre:dashを使ってProjectの定量KPIの可視化をしていて、定期的に振り返りの機会を設けている.

これを実施・運用する上で困ったのが、UserのPVなどをplotする際に社内のUserかどうかがBigQueryに格納しているDataだけでは判別がつかないことだった.
(社内UserのIDリストを user_id NOT IN (...) に貼り付けるという真心こもったOperationが行われていた.)

Kaizen Platformでは数ヶ月に一度 Kaizen Week という日頃のプロジェクトを一時停止して、積みタスクや、リファクタリング、新しいツールの試験・導入などの時間を確保しようという試みがある. ちょうど今週がその Kaizen Week だったので、ここを改善しようと思った.

解決策としては2通り考えられる. 一つがLogにUserの属性を埋め込む方法、もう一つはBigQuery外部のDatabase (今回は社内のMySQL) からUserの属性を参照する方法だが、今回は二つ目の方法をとることにした.

外部DatabaseをBigQueryから参照する方法だが、Query Engineでうまい具合にJOINする方法 (Presto) と、外部DatabaseのDataをBigQueryにimportする方法が考えられた. 一つ目の方法はこれぐらい軽いことをやりるのにわざわさ導入するのはなって気がした (あくまで気がした) ので、外部DatabaseのDataをBigQueryにimportすることにした.

今回の場合、とりあえずUserのTableをうまい具合にBigQueryにimportするだけで良かったが、今後もカジュアルに外部DatabaseのDataをBigQueryにimportしたいという要望があったので、特定の場所にSQL fileを配置するだけで、それらを実行した結果をBigQueryにimportできるようにした.

Import Query Result to BigQuery

How

始めはGoでScriptを書いていたが、ふとEmbulkが使えないかと思って調べてみると、

  • MySQLのInput pluginとBigQueryのOutput pluginは当然ある.
  • MySQLのInput pluginで任意のQueryが実行できる.
  • Queryの実行結果に対応するSchemaからBigQueryのSchemaを生成できると良かったが (別でPlugin書けばできそう) 、今回はSQL fileと別に .schema.json でBigQueryのSchemaを用意することにする.
  • Configulation fileのExtensionを liquid にするとLiquid Template Engineが使用できる.
    • env によって外部から値を差し込むことが可能.

であり、Scriptの中で特定の場所にあるSQL fileをとってきてその情報で env を差し替えることによりQueryそれぞれでConfigulation fileを作成する必要がなく、Embulkで上の目標が達成できそうだったのでEmbulkを採用することにした.

Do it

Embulkはinstall済みだとして、

$ embulk gem install embulk-input-mysql
$ embulk gem install embulk-output-bigquery

で、今回必要なPluginsをinstallする.

in:
  type: mysql
  host: {{ env.mysql_host }}
  user: {{ env.mysql_user }}
  {% if env.mysql_password %}
  password: {{ env.mysql_password }}
  {% endif %}
  database:  {{ env.mysql_database }}
  query: {{ env.query }}
out:
  type: bigquery
  file_ext: csv
  auth_method: private_key
  service_account_email: {{ env.service_account_email }}
  p12_keyfile: {{ env.p12_keyfile }}
  path_prefix: /tmp/import_query_result_to_bq/
  file_ext: csv
  delete_from_local_when_job_end: 1
  project: your_project
  dataset: {{ env.dataset }}
  table: {{ env.table }}_%Y%m%d
  source_format: CSV
  formatter:
    type: csv
    header_line: false
  schema_file: {{ env.schema_file }}
  auto_create_table: 1

のように Liquid Template Engine を使用し env で設定可能な config.yml.liquid を用意し、

#!/bin/bash

QUERY_DIR=query
DATASET=tmp

export service_account_email=example@developer.gserviceaccount.com
export p12_keyfile=/path/to/p12_keyfile.p12

export mysql_host=localhost
export mysql_user=root
export mysql_password=password
export mysql_database=your_database
export dataset=$DATASET

for file in $QUERY_DIR/*.sql; do
  export query=`cat $file`
  filename=${file##*/}
  basename=${filename%.*}
  dir=$(cd $(dirname $file) && pwd)
  export table=$basename
  export schema_file=$dir/$basename.schema.json
  embulk run config.yml.liquid
done

のようなScriptを import_query_result_to_bigquery として用意し実行権限をつけ、 QUERY_DIR に設定したDirectoryに、

SELECT id FROM users WHENE is_admin = 1;

admin_users.sql として、

[
  { "name": "id", "type": "STRING" }
]

admin_users.schema.json として配置し、

$ ./import_query_result_to_bigquery

を実行すると、 admin_users.sql のQueryの実行結果を DATASET で設定したBigQueryのDatasetにTable名 admin_users_20160114 (PrefixはSQL file名で、Suffixは年月日) としてimportできる.

Operation

Query files

*.sql*.schema.json は専用のGitHubのRepositioryを作成して、そこに集約し、Scriptの実行前に QUERY_DIR で指定したDirectoryに展開する.

これによって、新しくQueryを追加する際に、GitHub上で完結できる.

Query in BigQuery

cronなどでDailyのJobとして実行するとして、Table名のSuffixとして _%Y%m%d がついているので、BigQuery上では TABLE_DATE_RANGE を使用して、

SELECT * FROM TABLE_DATE_RANGE(tmp.admin_users_, DATE_ADD(CURRENT_TIMESTAMP(), -1, 'DAY'), CURRENT_TIMESTAMP());

のようにすると、当日のTableを対象としてQueryを実行できる.


EmbulkのPlugin機構と Liquid Template Engine のおかげで簡単なScriptで業務が改善した.


See Also