こんにちは。AnyMind で機械学習エンジニアをしている河本直起です。
AnyMind では 0 から MLOps 環境を作成しており、前回の記事では Vertex AI を用いた機械学習バッチ推論基盤の構成について紹介させていただきました。
今回は、以前のデータ基盤についての記事の続編として、現在使用している機械学習向けデータ基盤についてご紹介できればと思います。
背景
Cloud Composer (Airflow) を用いた機械学習向けデータ基盤の作成
上の記事で書いたように、従来は以下のようにモデル学習パイプラインがプロダクト側の RDB を直接参照するという形になっていました。
RDB はプロダクト要件に合わせて採用されたものであり、機械学習用のデータ処理に向かないことが課題となっていましたが、
- RDB から BigQuery へのデータのコピー処理
- コピーされたデータを用いたモデル学習用データセットの作成処理
を Cloud Composer (Airflow) をデータパイプラインとして以下のように導入することで、機械学習向けのデータセットとプロダクト向けのデータセットを分離すること、そして機械学習向けのデータ処理の実行を可能にすることができました。
また、その後 RDB から BigQuery へのデータ転送部分に関してはプロダクト側の開発チームに引き継ぎました。また、機械学習用の共通のデータセットの作成処理を追加し、全体としては以下のような構成としました。共通のデータセットは機械学習プロジェクト間で汎用的に使いやすい形に定義されていて、プロダクトデータセット固有のルールの吸収や、各種統計量などの集計を行なっています。そして、プロジェクトのモデル学習に他プロジェクトの予測結果を利用できるようにするため、共通のデータセットは各プロジェクトのバッチ推論の結果を参照します。
バッチ推論処理の完了や依存するDAGの完了は、Google Cloud Storage (GCS) にフラグファイルを作成することによって確認されるようになっています。
課題
ここで、Cloud Composer (Airflow) を運用していく中で
- 開発時に必須で Airflow の知識が必要になること
- データ処理とモデル学習処理の切り分けが明確になりづらいこと
という課題が見えてきました。
一つ目に関して、弊社では機械学習エンジニアが問題設定、モデルの調査・検証から開発・運用までを担当し、開発範囲にはデータ作成からモデル学習、モデルのサービングまでが含まれます。そのため、本質的なタスクである問題設定、モデルの調査・検証に割くことのできるリソースを確保するため、開発に必要な知識を減らす必要が出てきました。加えて、機械学習向けデータ作成パイプラインではそれほど複雑な依存関係を持つ処理は行わないため、Airflow はオーバースペックでした。
二つ目に関して、データ作成用のパイプライン(Cloud Composer)とモデル学習用のパイプライン(Vertex Pipelines)で違う環境を用意することによって、その二つの担当する範囲を厳密に切り分け、インプットとなるデータがモデル学習処理に依存しないようにすることを意図していました。しかし、本来 Airflow で行う処理として想定していたのは SQL クエリのパイプライン実行のみだったんですが、Airflow でもモデル学習の処理が実装できてしまうため、その切り分けが曖昧な形で開発が進んでしまうということが起こりました。
BigQuery View Table を選んだ理由
上記課題は、SQL クエリを記述でき、その依存関係を解決できるフレームワークがあれば解決可能です。
BigQuery View Table は SQL クエリを登録すれば、テーブルを参照する形でその実行結果を取得できるもので、デプロイ方法を工夫すれば依存関係を解決できることがわかりました。加えて、BigQuery View Table は通常のテーブルと参照方法が変わらないため、テーブルを参照しているモデル学習パイプラインへの修正が必要なくなります。これら理由から、BigQuery View Table をベースに機械学習向けデータ基盤を作成することに決定しました。
また、BigQuery View Table を使用することによって、
- モデル学習パイプラインからデータ作成パイプラインの処理の完了を待つ必要がなくなる
- スケジュールの変更が一箇所で済む
というメリットがありました。BigQuery View Table は、参照されたタイミングで登録した SQL クエリが実行され、その時点で最新のデータを取得できます。一つ目に関して、データ作成パイプラインを使用していたときはモデル学習パイプラインからその完了を検知する必要がありましたが、BigQuery View Table を使用する場合はそのような処理が必要ありません。また、データ作成パイプラインで必要だった処理の監視やリカバリも行う必要がなく、通信エラーなどで BigQuery View Table での処理が失敗した場合は、モデル学習パイプラインからリトライなどの処理を行うことができます。それによって、運用コストが心理的なものも含めて減らすことができました。
二つ目に関して、パイプライン全体の実行スケジュールを変更する度にモデル学習パイプラインとデータ作成パイプライン両方のスケジュールを変更する必要がありましたが、モデル学習パイプラインのスケジュールを変更するだけで済むようになりました。変更箇所が一箇所のみになることによって、変更漏れのリスクを減らすことができました。
全体構成
全体構成は以下のようになっています。上記既存構成のデータ作成パイプライン部分、Cloud Composer を使用している部分を全て BigQuery View Table に置き換えています。データセットの新規切り分けに関しては次で説明します。
データセットの切り分け
Cloud Composer の利用を止めることで、テーブル作成ジョブ間の依存関係を記述しづらくなります。それに対して、テーブルが互いに参照し合うことを防ぐため、テーブルごとの役割を明確するために、以下のようにデータセットを切り分けて対処しています。
- プロダクトデータセット参照テーブル格納用:プロダクトデータセットを参照し、プロダクトデータセット固有のルールなどを解消する
- プロジェクト共通中間テーブル格納用:1.を参照し、後続の最終テーブル作成に向けたテーブルの汎用化処理を行う
- プロジェクト共通最終テーブル格納用:2.を参照し、テーブル間の結合処理を行う
- モデル推論結果参照テーブル格納用:モデルの推論結果ログを参照し、利用しやすい形に成形する。
- プロジェクト用テーブル格納用:3.4.を参照し、対象プロジェクトのモデル学習パイプラインが参照するテーブルを作成する。
これら切り分けに従っていれば、同じデータセット内にある他のテーブルを参照することもできるようにしています。
View テーブルの登録方法
query
├── dataset_name_a
│ ├── table_name_1.sql
│ └── table_name_2.sql
└── dataset_name_b
└── table_name_3.sql
archived_query
├── dataset_name_a
│ └── table_name_4.sql
└── dataset_name_b
View テーブルのデプロイを行うリポジトリに上記のようなディレクトリを用意しています。ここに、query/対象のデータセット名/
以下に、テーブル名.sql
として SQL クエリを記述すれば、そのクエリが View テーブルとして登録されるという形になっています。登録済みの .sql ファイルを変更した場合は、その変更が View テーブルに反映されます。
また、既存の View テーブルを削除したい場合は、対象テーブルの .sql ファイルを archived_query/対象のデータセット名/
以下に移動することで削除されるようになっています。こちらに関しては、ディレクトリに存在しないテーブルを削除するという形でも実装可能ですが、一度生成を止めたがまた生成したいというケースに対応するため、クエリをアーカイブするという形で実装しました。その場合、archived_query/対象のデータセット名/
以下から query/対象のデータセット名/
以下に対象ファイルを移動することで対応可能です。
デプロイフロー
View テーブルのデプロイは以下を実行することによって行なっています。
ここで、参照元のテーブルを参照するテーブルと同時にリリースしようとした場合、参照元のテーブルが存在しないために作成の順序によっては実行が失敗する可能性があります。そのため、参照元のテーブルが存在するテーブルから順次再帰的に作成していくという処理を実装しています。
また、こちらは CircleCI 経由でキックされた Cloud Build によって実行されるようになっています。クエリ内で project id を指定する部分は _PROJECT_ID_
として記述し、デプロイ時にデプロイ先の project id を Cloud Build 経由で取得し、上書きする形で動的に project id を指定できるようにしています。
import os
import glob
import dataclasses
from typing import ClassVar, List
from pyhocon import ConfigFactory
from google.cloud import bigquery
from google.cloud.exceptions import Conflict, NotFound
# pass project id as environment variable to Docker Image from cloudbuild.yaml
PROJECT_ID = os.environ['PROJECT_ID']
class InvalidQueryError(Exception):
pass
@dataclasses.dataclass
class ViewTableConf:
project_id: str
dataset: str
table_name: str
original_query: str
view_id: str = dataclasses.field(init=False)
query: str = dataclasses.field(init=False)
VIEW_ID_FORMAT: ClassVar[str] = "{project_id}.{dataset}.{table_name}"
def __post_init__(self):
self.view_id = self.VIEW_ID_FORMAT.format(
project_id=self.project_id,
dataset=self.dataset,
table_name=self.table_name
)
self.query = self.original_query.replace(
"_PROJECT_ID_", self.project_id
)
def load_table_info_file(
query_path_format: str
):
query_files = glob.glob(query_path_format)
for query_file in query_files:
with open(query_file, 'r') as f:
query = f.read()
# extract dataset and table name from file path
dir, file_name = os.path.split(query_file)
table_name = file_name.split(".")[0]
dataset = os.path.basename(dir)
yield dataset, table_name, query
class ViewDeployer:
def __init__(
self,
project_id: str
) -> None:
self.project_id = project_id
self.client = bigquery.Client(project=project_id)
def _is_view_exist(
self,
view_id
):
"""
check if view table already exists
"""
try:
_ = self.client.get_table(view_id)
return True
except Exception:
return False
def deploy(
self,
view_id: str,
view_query: str
) -> bool:
"""
deploy view table
"""
is_succeed = True
print(f"Deploy {view_id}")
view = bigquery.Table(view_id)
view.view_query = view_query
is_view_exist = self._is_view_exist(view_id)
try:
if not is_view_exist:
# create new one
view = self.client.create_table(view, exists_ok=False)
print(f"Created: {str(view.reference)}\n")
else:
# if already created, update exisiting one
self.client.delete_table(view_id)
view = self.client.create_table(view)
print(f"Updated: {str(view.reference)}\n")
except Exception as e:
print(f"Deployment failed: {str(view.reference)}")
print(f"{e}\n")
is_succeed = False
return is_succeed
def delete(
self,
view_id: str
) -> bool:
"""
delete view table
"""
is_succeed = True
try:
self.client.delete_table(view_id)
print(f"Deleted: {view_id}\n")
except NotFound:
print(f"Already deleted: {view_id}")
except Exception:
is_succeed = False
return is_succeed
def deploy_recursive(
view_deployer: ViewDeployer,
undeployed_view_table_confs: List[ViewTableConf]
):
"""
deploy with solving dependency between tables
"""
failed_view_table_confs = []
for undeployed_view_table_conf in undeployed_view_table_confs:
is_succeed = view_deployer.deploy(
view_id=undeployed_view_table_conf.view_id,
view_query=undeployed_view_table_conf.query
)
if not is_succeed:
failed_view_table_confs.append(undeployed_view_table_conf)
# if no tables are succeededly deployed on this trial, raise Error and finish process
if len(failed_view_table_confs) > 0:
if len(undeployed_view_table_confs) > len(failed_view_table_confs):
deploy_recursive(view_deployer, failed_view_table_confs)
else:
msg = f"Deployment process failed\n Probable tables: {[i.view_id for i in failed_view_table_confs]}"
raise InvalidQueryError(msg)
def main():
view_deployer = ViewDeployer(project_id=PROJECT_ID)
print("Delete archived view tables\n")
for dataset, table_name, original_query in load_table_info_file(conf.archived_query_path_format):
view_table_conf = ViewTableConf(
project_id=PROJECT_ID,
dataset=dataset,
table_name=table_name,
original_query=original_query
)
view_deployer.delete(
view_id=view_table_conf.view_id
)
print("Deploy view tables\n")
view_table_confs = []
for dataset, table_name, original_query in load_table_info_file(conf.query_path_format):
view_table_conf = ViewTableConf(
project_id=PROJECT_ID,
dataset=dataset,
table_name=table_name,
original_query=original_query
)
view_table_confs.append(view_table_conf)
deploy_recursive(view_deployer, view_table_confs)
if __name__ == "__main__":
main()
モデル学習に使われたデータの保存
モデルのデバックのためモデルの学習に使用したデータを残しておく必要があります。BigQuery View Table は常にその時のクエリの実行結果を返すため、モデル学習パイプライン側で、データのロード時に取得されたデータを GCS に保存するようにして対応しています。
コスト削減のため、保存先のバケットにはオブジェクトのライフサイクルを設定し、設定期間以上が経過したデータを削除するようにしています。
BigQuery View Table のデメリット
通常のテーブルと比較して、BigQuery View Table を使用するデメリットとしては大きくは以下が考えられます。
- データのロード時に都度計算が走るため、コストが大きくなる
- データのロードに時間がかかる
これらに関しては、クエリ結果のキャッシュや Materialized View を用いて、ロードする側のコードを修正せずに対応することが可能です。また、今後データによっては Cloud Composer を用いて通常のテーブルに置き換えることも考えています。
また、Cloud Composer 使用時と比較すれば、以下がデメリットとなります。
- ワークフロー処理を記述できない
こちらに関して、現状はテーブル間の依存関係を解決できればよく、その依存関係も複雑なものにはなり得ないため、前述のデプロイフローを用いることで解決しています。ただ、もし今後ワークフロー処理が必要なプロジェクトが発生すれば、そのプロジェクトに限定してBigQuery View Table と Cloud Composer を併用することを考えています。