こんにちは。AnyMind で機械学習エンジニアをしている河本直起です。Anymind では 0 からデータパイプライン含めた MLOps 環境を作成しています。
今回は Cloud Composer (Airflow) を用いて実装している現状のデータパイプラインについて、現状と将来的な展望含めてご紹介できればと思います。
課題
機械学習モデルの生成に必要なデータはプロダクト側の RDB に保存されているんですが、私がチームに参加した段階ではモデル生成バッチがその RDB から直接データを取り出す形になっていました。そのため、プロダクト要件に適した RDB だと基本的に重い機械学習系のデータ処理に対応しきれないこと、機械学習系の機能が急激に増え負荷が大きくなってきたこと、RDB だと柔軟なデータを用いた検証がしづらいことなどが課題になっていました。
要件
上記課題から、まずはプロダクト側のデータセットと機械学習向けのデータセットを分離し、機械学習向けのデータセットはそれに適したデータストアに保存することが第一の要件となります。一方で、この開発に割くことのできるリソースが少なく、加えて急ぎで開発が必要だったため一旦要件を最低限にして以下のように定めました。
- 機械学習向けデータセットが BigQuery 上に存在すること
- そのデータセットが日次で更新されること
- プロジェクトごとにデータ整形処理を行う(プロジェクト間で汎用的なテーブルを用意しない)こと
設計
そこで、一旦以下のような形でプロダクト側のデータセットの複製、機械学習向けデータセットの生成を行うことに決めました。
Cloud Composer (Airflow) を選んだ理由
データ生成パイプライン用のフレームワークとしては Vertex Pipelines (Kubeflow) も検討したのですが、現状は Cloud Composer (Airflow) を採用しています。
モデル生成パイプライン用のフレームワークとして既に Vertex Pipelines (Kubeflow) を採用しているのですが、Vertex Pipelines にはフルマネージドであり開発に必要な技術要件が低いという利点がある一方で、まだ新しいサービスであり汎用的に使える既存の Component が少なく、ある程度自分達で実装する必要があります。まだ社内にモデル生成及びデータ生成の実装ノウハウやベストプラクティスが貯まっていないこともあり、データ生成処理も Vertex Pipelines で行ってしまうとコードの属人化が進んでしまう恐れがありました。それを防ぎたかったことが1つ目の理由になります。
また、データ生成処理とモデル生成処理が同じデプロイ単位に入ることはなく、その二つが混同されてしまうとより属人化が進むと考えました。使用するフレームワークを切り分けることで、その二つを厳密に切り分けることができるということが2つ目の理由になります。また、切り分けることでデータパイプラインを他チームへ委譲することも簡単になります。(Vertex Pipelines で Input/Output の定まった汎用的な Component を用意でき、それを共通で使えるようにできれば Vertex Pipelines に移行することも考えています。)
そして、Vertex Pipelines の求められる技術要件が低いという利点は、データサイエンティストとインフラエンジニアで求められるスキルセットが重なりづらいことからくると思うのですが、データ処理を実装するエンジニアは充分なインフラ知識を持ち合わせているだろうと考えたことも理由となっています。
プロダクト用データセットの複製
今回は特に上記図1におけるプロダクトデータセットの複製に焦点を当てて紹介できればと思います。
複製方法
プロダクトのデータセットは Cloud SQL 上にあるのですが、Cloud SQL から Big Query にデータを転送する場合
- 定期的にテーブル全体、もしくは一部のスナップショットをバルクで転送する
- Cloud DataFlow + Cloud Pub/Sub などを用いてトランザクションを逐次転送する
の2パターンが考えられると思います。
前述の通り今回は日次更新を想定しており、リアルタイム性を重視しないため前者を採用しました。
具体的には、以下のサンプルのようにBigQuery の federated queries を利用し BigQuery の date-sharded テーブルを作成しています。(date-partitioned ではなく date-sharded テーブルを選んだ理由は後続の Cloud Storage への転送がやりやすいためです)
query = '''
SELECT *
FROM EXTERNAL_QUERY(
"projects/PROJECT_ID/locations/LOCATION/connections/CONNECTION_NAME",
"SELECT * FROM RDB_TABLE_NAME")
'''
import_to_bq_task = BigQueryOperator(
task_id='import_to_bq',
sql=query,
destination_dataset_table=PROJECT_ID.DATASET.BQ_TABLE_NAME_{{ next_ds_nodash }}",
write_disposition="WRITE_TRUNCATE",
create_disposition="CREATE_IF_NEEDED",
allow_large_results=True,
use_legacy_sql=False
)
また、プロダクトのテーブルには大きく分けて以下の2種類が存在し、
- マスタテーブル
- ログデータテーブル
マスタテーブルに関しては実行ごとにデータを洗い替えする形に、ログデータテーブルは更新された時間をカラムとして持つため、そのカラムをもとに前回実行以降に更新されたレコードのみを取得して APPEND する形にしています。
最新以外のデータの削除
前述の通り、BigQueryに複製されたプロダクトデータセットは複数プロジェクトから参照されます。ログデータテーブルに関しては最新更新以外のレコードも蓄積されていきますが、最新データのみを取得するロジックをプロジェクトごとに実装する訳にはいかないため、BigQuery への転送後以下のサンプルのように最新のレコードのみを持つテーブルを作成する処理を行っています。
query = '''
SELECT * EXCEPT(rn)
FROM (
SELECT
*,
row_number() over (PARTITION BY id ORDER BY updated DESC) AS rn
FROM (
SELECT * FROM PROJECT_ID.DATASET.INTERMEDIATE_TABLE_NAME_{{ next_ds_nodash }}
UNION ALL
SELECT * FROM PROJECT_ID.DATASET.TABLE_NAME_{{ ds_nodash }}
)
)
WHERE rn = 1
'''
create_table_with_only_latest_record_task = BigQueryOperator(
task_id='create_table_with_only_latest_record',
sql=query,
destination_dataset_table="PROJECT_ID.DATASET.TABLE_NAME_{{ next_ds_nodash }}",
write_disposition="WRITE_TRUNCATE",
create_disposition="CREATE_IF_NEEDED",
allow_large_results=True,
use_legacy_sql=False
)
リージョンの不一致の解決
作成されたデータセットは最終的に Vertex Pipelines 上で動いているモデル生成処理の Input として参照されますが、Vertex Pipelines の対応している region とプロダクトの Cloud SQL の region が一致しておらず、また採用している region では BigQuery 内で region を跨ぐデータ生成処理ができませんでした。(us multi-region なら可能なようです)そのため、上記図のように region 間の転送が可能な Cloud Storage に一度転送し region を跨いだ転送を行っています。
Cloud Composer Environment の作成
Cloud Composer を利用するに当たって Environment を作成する必要がありますが、こちらに関しては別途レポジトリを作成し、以下のサンプルのように Cloud Build を利用し substitutions として設定を記述する形でコードベースで管理しています。
本番やステージングなど環境の切り替えは Circle CI から Cloud Build をキックする時に substitution として変数を渡すことで対応しています。
# cloudbuild.yaml
options:
dynamic_substitutions: true
substitutions:
_COMPOSER_ENVIRONMENT_NAME: your_environment_name
_COMPOSER_IMAGE_VERSION: composer-2.0.4-airflow-2.2.3
_SERVICE_ACCOUNT: your_service_account
_COMPOSER_LOCATION: your_location
_COMPOSER_ENVIRONMENT_SIZE: medium
_COMPOSER_MAX_WORKERS: "10"
# given from Circle CI file
_PROFILE: development
# environment variable to be set in comoposer environment
_COMPOSER_ENV_VARIABLES: PROFILE=${_PROFILE}
steps:
- id: create composer environment if not exist
name: 'gcr.io/cloud-builders/gcloud'
entrypoint: 'bash'
args: [
'-c', 'gcloud composer environments create $_COMPOSER_ENVIRONMENT_NAME
--location=$_COMPOSER_LOCATION --service-account=$_SERVICE_ACCOUNT --image-version=$_COMPOSER_IMAGE_VERSION
|| exit 0']
- id: reset composer env variables if changed
name: 'gcr.io/cloud-builders/gcloud'
entrypoint: 'bash'
args: [
'-c', 'gcloud composer environments update $_COMPOSER_ENVIRONMENT_NAME
--location=$_COMPOSER_LOCATION
--update-env-variables=$_COMPOSER_ENV_VARIABLES
|| exit 0']
- id: upgrade composer environment if changed
name: 'gcr.io/cloud-builders/gcloud'
entrypoint: 'bash'
args: [
'-c', 'gcloud beta composer environments update $_COMPOSER_ENVIRONMENT_NAME
--location=$_COMPOSER_LOCATION
--image-version=$_COMPOSER_IMAGE_VERSION
|| exit 0']
- id: change composer environment settings if changed (max workers)
name: 'gcr.io/cloud-builders/gcloud'
entrypoint: 'bash'
args: [
'-c', 'gcloud beta composer environments update $_COMPOSER_ENVIRONMENT_NAME
--location=$_COMPOSER_LOCATION
--max-workers=$_COMPOSER_MAX_WORKERS
|| exit 0']
- id: change composer environment settings if changed (environment size)
name: 'gcr.io/cloud-builders/gcloud'
entrypoint: 'bash'
args: [
'-c', 'gcloud beta composer environments update $_COMPOSER_ENVIRONMENT_NAME
--location=$_COMPOSER_LOCATION
--environment-size=$_COMPOSER_ENVIRONMENT_SIZE
|| exit 0']
- id: update pypi packages
name: 'gcr.io/cloud-builders/gcloud'
entrypoint: 'bash'
args: [
'-c', 'gcloud composer environments update $_COMPOSER_ENVIRONMENT_NAME
--location=$_COMPOSER_LOCATION
--update-pypi-packages-from-file=settings/anytag/requirements.txt
|| exit 0']
サービスを跨いだ処理完了確認
データの作成や転送が完了しているかを Airflow の DAG 間や後続の Vertex Pipelines (Kubeflow) との間で確認する必要があります。こちらに関しては、以下のように処理完了時に Cloud Storage 上に時間情報を含んだフラグファイルを作成し、後続の処理はそれを見て判断する、フラグファイルが作成するまで wait するなどの処理を行なっています。
将来的な展望
短期的には以下の二つを行おうと考えています。
- 機械学習用に定義されたテーブルの作成
- 更新頻度の向上
前者に関しては、まだプロダクト側で大きなテーブル定義の変更が行われる可能性があること、割ける工数が少なかったことからプロダクトのテーブルをそのまま BigQuery に複製し、機械学習プロジェクトごとに都度 Input となるテーブルを作成する方法を取りました。今後はモデル生成・サービングの観点から汎用的に使いやすい形に定義されたテーブルを作成し、そのテーブルをプロジェクト共通で使用できるようにしたいと考えています。
後者に関して、現状は日次更新となっていますが、今後オンライン推論の必要性が高まってくると考えられるため今後は更新トランザクションを保存し、最終テーブルの作成処理をより頻度高く実行できるように変更したいと考えています。
そして、上記2つが完了すれば最終的なプランとして、以下のように機械学習向けのデータストアのインターフェースを Firestore (Datastore) に統合し、後続のモデル生成処理と推論 API で共通利用することを考えています。
終わりに
今回の実装内容に関しては最低限の要件を定めていたことや Google Cloud Platform で簡単に実現できたこともあり、合計で3人日ほどで完了することができました。将来的にもあるように、長期的に考えた場合こちらがベストな状態とは言えないですが、機械学習システム導入の初期フェーズでの実装として参考になれば幸いです。