こんにちは。AnyMind で機械学習エンジニアをしている河本直起です。
AnyMind では 0 からモデル学習パイプラインを含めた MLOps 環境を作成しています。
前回の記事では、Cloud Composer (Airflow) を用いて作成した機械学習向けデータ基盤について現状と将来的なプラン含めてご紹介させていただきました。
今回は、Vertex Pipelines (Kubeflow) を用いて実装している現状のモデル学習パイプラインについて、ご紹介できればと思います。
背景 / 課題
まず、Vertex Pipelines (Kubeflow) 導入前にどのような状態だったのか、どのようなことが課題だったのかについて説明します。
モデル学習・サービング処理の属人化
従来 AnyMind では notebook 環境で作成したモデルをレポジトリに配置してリリースする、単一のバッチで全ての学習処理を行うなど、プロジェクトごとに様々なモデル学習・サービング方法がとられ属人化していました。加えて、モデル学習をスケジュール実行する仕組みや、モデルの学習とそのモデルの API へのデプロイやバッチ推論を一貫して実行する仕組みも存在せず、こちらもマニュアル実行を含む各々の方法で行われていました。
そのため、必要な機能を持った共通のモデル学習基盤を用意することで、属人化を解消し運用や改善にかかるコストを下げる必要がありました。
ワークフロー処理の必要性
AnyMind の提供するサービスは基本的に複数国・言語を跨ぐものとなっています。同時に、例えば AnyTag というプラットフォームでは同じ機能を Instagram、Youtube など複数 SNS に渡って提供する必要があります。
SNS によって特徴量が違ったり、同じ特徴量でも言語ごとに違う処理を行う必要があったり、国ごとにモデルを作成する必要があるなど、同じ機械学習機能に向けた学習処理でも、内部で分岐・並列処理を行う必要があります。しかし、従来の環境ではそのような処理に対応できませんでした。
検証実行時間の長さ・検証コードとプロダクションコードの不一致
上記背景に伴い、モデル検証に必要な処理が複数パターン存在するため、検証処理の実行に非常に時間がかかる用になっていました。また、機械学習モデルに関わる開発全般で発生する問題として、検証が完了した後にその検証に使用したコードをプロダクション向けに書き換える必要があります。検証処理の実行時間に加えて、その書き換えにかかる時間もボトルネックとなっていました。
Vertex Pipelines (Kubeflow) を選んだ理由
Vertex Pipelines (Kubeflow) は以下の特徴をもつため、上記課題に適したものとなっていました。
- モデルの学習・サービング処理をワークフローとして記述できること
- コンポーネントとしてコードの再利用が可能であること
- コンポーネントにおいて前回と同じインプットで実行する場合、キャッシュされた前回実行時の結果を再利用できること
加えて、AnyMind では少人数で複数の機械学習機能を開発・運用しているため、フルマネージドサービスであり必要とされる知識が技術が少ないこと、メンテナンスに割かれるリソースが少ないことも選んだ理由となっています。
一方で、現状の Kubeflow の Python 用 SDK (kfp v2) ではスケジュール実行用のインターフェースが用意されていないなど要件を満たさない部分もありましたが、Google Cloud Platform の Python 用 API を使うことで全て簡単に開発できることがわかったので自前で開発することにしました。
導入内容
以下では、実際にどのような形で導入したのかを説明します。
デプロイ単位の切り分け
現状モデル学習パイプラインはプロジェクト単位でリポジトリを切り分けており、コンポーネントもプロジェクト単位で作成しています。現状プロジェクト共通の処理がそこまでなく共通化するメリットが少ないのに対して、実装におけるベストプラクティスがまだ固まっていないため共通化するリスクが高いと考えたためです。
プロジェクト単位で作成されたリポジトリ内に学習処理と推論処理を格納することで、その二者間で前処理など共通の処理を使い回すことができることができるようになっています。それにより、学習と推論での処理の不一致を解決することができています。
コンポーネントのビルド
Kubeflow においてコンポーネントの記述方法は以下の二種類存在します。
- yaml ファイルにインターフェースを、python ファイルに処理を定義する方法
- python の function としてインターフェースと処理を定義する方法
実行イメージとディレクトリの切り分けが明確になり、プロジェクト間での再利用も簡単になるため、モデル作成などプロジェクト固有の処理に関しては全て前者で記述しています。
一方で、前回のジョブの完了を wait する処理、実行開始時間を取得する処理など、パイプラインに依存するコンポーネントは後者で記述しています。パイプラインと同じディレクトリに格納し、パイプラインと同じイメージ上で実行される形にしています。
コンポーネントのビルドは Cloud Build を通して以下のように行なっており、ビルドされたコンポーネントイメージは Container Registry 上に格納されています。
substitutions:
_VERSION: version_of_pipeline
_PROJECT_NAME: project_name
# from circle ci
_COMMIT_SHA: none
steps:
- id: pull cache component image
name: 'gcr.io/cloud-builders/docker'
entrypoint: 'bash'
args: ['-c', 'docker pull asia.gcr.io/$PROJECT_ID/$_PROJECT_NAME/$_VERSION/workflow/component/component_name:latest || exit 0']
- id: build component image
name: 'gcr.io/cloud-builders/docker'
args: [
'build',
'-f', './image/workflow/component/component_name/Dockerfile',
'-t', 'asia.gcr.io/$PROJECT_ID/$_PROJECT_NAME/$_VERSION/workflow/component/component_name:latest',
'-t', 'asia.gcr.io/$PROJECT_ID/$_PROJECT_NAME/$_VERSION/workflow/component/component_name:$_COMMIT_SHA',
'--cache-from', 'asia.gcr.io/$PROJECT_ID/$_PROJECT_NAME/$_VERSION/workflow/component/component_name:latest',
'.'
]
- id: push component image to gcr
name: 'gcr.io/cloud-builders/docker'
args: ['push', 'asia.gcr.io/$PROJECT_ID/$_PROJECT_NAME/$_VERSION/workflow/component/component_name:latest']
images:
- 'asia.gcr.io/$PROJECT_ID/$_PROJECT_NAME/$_VERSION/workflow/component/component_name:$_COMMIT_SHA'
パイプラインのビルド
パイプラインのビルド処理は python ファイル上にパイプラインの定義、そのパラメータと共に記述しています。
行う処理としては
- コンポーネント定義ファイルの上書き
- パイプラインのコンパイル
- コンパイルされたパイプライン・そのパラメータを Cloud Storage へアップロード
- 一回切りの実行の場合はパイプラインジョブ作成用 Cloud Pub/Sub トピックにメッセージを発行
- 定期実行の場合は Cloud Scheduler 作成用 Cloud Pub/Sub トピックにメッセージを発行
となっています。各種項目の詳細に関しては後述します。
サンプルとして、以下のような処理を行なっています。
import os
import json
import kfp
from kfp.v2.compiler import Compiler
from gcsfs import GCSFileSystem
from google.cloud import pubsub
# set to image from cloud build
PROJECT_ID = os.environ['PROJECT_ID']
VERSION = os.environ['VERSION']
PROJECT_NAME = os.environ['PROJECT_NAME']
# sample pipeline parameters
PIPELINE_PARAMETERS = {
"param1": "value_of_param1",
"param2": "value_of_param2",
}
# sample pipeline definition
@kfp.dsl.pipeline(
name="id of pipeline",
description="description of pipeline",
pipeline_root="root of pipeline"
)
def sample_pipeline(
param1: str,
param2: str
):
# load light weight components
component1_op = kfp.components.create_component_from_func(
func=component1, base_image="gcr path to pipeline image",
)
# load components from yaml file
components_store = kfp.components.ComponentStore(
local_search_paths=["src/workflow/component"])
component2_op = components_store.load_component('component2')
# define workflow
component1_task = component1_op(
param1=param1
)
component2_task = component2_op(
param2=param2
).after(component1_task)
# compile and save pipeline
Compiler().compile(
pipeline_func=sample_pipeline,
package_path="pipeline.json"
)
# save parameter
with open('pipeline_values.json', 'w') as f:
json.dump(PIPELINE_PARAMETERS, f)
# put pipeline and values to cloud storage
fs = GCSFileSystem()
fs.put("pipeline.json", "storage path of pipeline")
fs.put("pipeline_values.json", "storage path of pipeline parameter")
# publish message to cloud pub/sub
publisher = pubsub.PublisherClient()
IS_ENABLE_SCHEDULING = True # if pipeline is scheduled or not
if not IS_ENABLE_SCHEDULING:
# if schedule is not enabled, just run once
topic_message = {
"version": "version of pipeline",
"pipeline": {
"location": "location of pipeline",
"pipeline_spec_uri": "storage path of pipeline",
"pipeline_root": "root of pipeline",
"pipeline_parameter_values_uri": "storage path of pipeline parameter",
"enable_caching": False
}
}
topic_path = publisher.topic_path(PROJECT_ID, "name of pipeline creation topic")
else:
topic_message = {
"version": "version of pipeline",
"scheduler": {
"location": "location of scheduler",
"schedule": "cron schedule of scheduler",
"description": "desctiption of scheduler",
"time_zone": "time zone of scheduler",
"is_pause": False
},
"pipeline": {
"location": "location of pipeline",
"pipeline_spec_uri": "storage path of pipeline",
"pipeline_root": "root of pipeline",
"pipeline_parameter_values_uri": "storage path of pipeline parameter",
"enable_caching": False
}
}
topic_path = publisher.topic_path(PROJECT_ID, "name of scheduler creation topic")
future = publisher.publish(topic_path, json.dumps(topic_message).encode("utf-8"))
message_id = future.result()
上記処理は以下のように Cloud Build から実行されています。
substitutions:
_VERSION: version_of_pipeline
_PROJECT_NAME: project_name
# from circle ci
_COMMIT_SHA: none
steps:
# pipeline
- id: pull cache pipeline image
name: 'gcr.io/cloud-builders/docker'
entrypoint: 'bash'
args: ['-c', 'docker pull asia.gcr.io/$PROJECT_ID/$_PROJECT_NAME/$_MODEL_VERSION/workflow/pipeline:latest || exit 0']
- id: build pipeline image
name: 'gcr.io/cloud-builders/docker'
args: [
'build',
'-f', './image/workflow/pipeline/Dockerfile',
'--build-arg', 'PROJECT_ID=$PROJECT_ID',
'--build-arg', 'VERSION=$_VERSION',
'--build-arg', 'PROJECT_NAME=$_PROJECT_NAME',
'-t', 'asia.gcr.io/$PROJECT_ID/$_PROJECT_NAME/$_MODEL_VERSION/workflow/pipeline:latest',
'-t', 'asia.gcr.io/$PROJECT_ID/$_PROJECT_NAME/$_MODEL_VERSION/workflow/pipeline:$_COMMIT_SHA',
'--cache-from', 'asia.gcr.io/$PROJECT_ID/$_PROJECT_NAME/$_MODEL_VERSION/workflow/pipeline:latest',
'.'
]
- id: push pipeline image
name: 'gcr.io/cloud-builders/docker'
args: ['push', 'asia.gcr.io/$PROJECT_ID/$_PROJECT_NAME/$_MODEL_VERSION/workflow/pipeline:latest']
- id: deploy pipeline
name: "asia.gcr.io/$PROJECT_ID/$_PROJECT_NAME/$_MODEL_VERSION/workflow/pipeline:latest"
entrypoint: 'python'
args: ['src/workflow/pipeline/pipeline.py']
コンポーネント定義ファイルの上書き
コンポーネント定義ファイルは以下のように実行イメージの Container Registry でのパスを持ちますが、コンポーネントはステージング・プロダクションなど環境ごとに project を分けてデプロイされています。
name: Sample Component
inputs:
- name: param1
type: String
implementation:
container:
image: asia.gcr.io/{project_id}/{project_name}/{version}/workflow/component/batch_predict:latest
args: [
--param1, {inputValue: param1},
]
そのため、project id を上書きする処理をパイプライン実行時に行なっています。この時に、同時にリリースバージョンも反映させています。
具体的には以下のような処理を上記パイプラインビルド実行時に追加しています。
def overwrite_component_definition_file(project_id, project_name, version):
component_files = glob.glob('src/workflow/component/*/component.yaml')
for component_file in component_files:
with open(component_file, 'r') as f:
data = yaml.safe_load(f)
data["implementation"]["container"]["image"] = data["implementation"]["container"]["image"].format(
project_id=project_id, project_name=project_name, version=version)
with open(component_file, "w") as f:
yaml.dump(data, f)
# set variables to components
overwrite_component_definition_file(
project_id=PROJECT_ID,
project_name=PROJECT_NAME
version=VERSION
)
パイプラインジョブの作成
パイプラインジョブの作成処理は、以下のように Cloud Functions として切り出し、Cloud Pub/Sub トピックへのメッセージの発行によって実行されるようになっています。
メッセージには以下の情報が含まれます。
- コンパイルされたパイプラインファイルの保存先 Cloud Storage のパス
- パイプラインのパラメータを格納したファイルの保存先 Cloud Storage のパス
- その他バージョンなどの情報
これら情報を持ったメッセージを Cloud Pub/Sub トピックへと発行することで、Cloud Functions がそれら情報をもとにパイプラインジョブを作成します。
Cloud Functions 内部での処理は以下のようになっています。
import os
import json
import base64
from gcsfs import GCSFileSystem
from google.cloud import aiplatform
PROJECT_ID = os.getenv('PROJECT_ID')
PIPELINE_SERVICE_ACCOUNT = "pipeline_service_account"
def main(event, context):
# decode http request payload and translate into JSON object
settings = json.loads(base64.b64decode(event['data']).decode('utf-8'))
# get arguments
display_name = settings['project_name'] + "-" + settings["model_version"]
pipeline_spec_uri = settings["pipeline"]['pipeline_spec_uri']
pipeline_parameter_values_uri = settings["pipeline"]['pipeline_parameter_values_uri']
pipeline_root = settings["pipeline"]["pipeline_root"]
location = settings["pipeline"]["location"]
enable_caching = settings["pipeline"]["enable_caching"]
# load pipeline parameters
fs = GCSFileSystem()
with fs.open(pipeline_parameter_values_uri, 'r') as f:
parameter_values = json.load(f)
# initialize aiplatform client
aiplatform.init(
project=PROJECT_ID,
location=location,
)
# create pipeline job
job = aiplatform.PipelineJob(
display_name=display_name,
template_path=pipeline_spec_uri,
pipeline_root=pipeline_root,
enable_caching=enable_caching,
parameter_values=parameter_values
)
job.submit(
service_account=PIPELINE_SERVICE_ACCOUNT
)
定期実行用スケジューラの作成
パイプラインジョブの定期実行は Cloud Scheduler を用いて行なっています。以下のように、上記パイプラインジョブ作成に必要な情報をメッセージとして保持し、設定したスケジュールでその情報を上記 Cloud Pub/Sub トピックへ発行する形となっています。
その情報を保持した Cloud Scheduler の作成は Cloud Functions によって行われており、パイプライン同様に Cloud Pub/Sub を通して実行されます。スケジューラ作成、その後の定期的なパイプラインジョブ作成のフローは以下のようになっています。
Cloud Scheduler の作成は以下のように行なっています。
import os
import json
import base64
from google.cloud import scheduler_v1
PROJECT_ID = os.getenv('PROJECT_ID')
PUBSUB_TOPIC_ID = "scheduler_creation_topic_id"
def main(event, context):
# decode http request payload and translate into JSON object
settings = json.loads(base64.b64decode(event['data']).decode('utf-8'))
job_id = settings['project_name'] + "-" + settings["model_version"]
location = settings["scheduler"]["location"]
time_zone = settings["scheduler"]["time_zone"]
description = settings["scheduler"]["description"]
schedule = settings["scheduler"]["schedule"]
scheduler_name = f"projects/{PROJECT_ID}/locations/{location}/jobs/{job_id}"
pipeline_settings = {}
pipeline_settings["project_name"] = settings['project_name']
pipeline_settings["model_version"] = settings['model_version']
pipeline_settings["name_postfix"] = settings['name_postfix']
pipeline_settings["pipeline"] = settings["pipeline"]
# create pipeline settings
string_pipeline_settings = json.dumps(pipeline_settings)
pubsub_target = scheduler_v1.PubsubTarget(
topic_name=f"projects/{PROJECT_ID}/topics/{PUBSUB_TOPIC_ID}",
data=string_pipeline_settings.encode("utf-8")
)
# create scheduler job
scheduler_job = scheduler_v1.Job(
name=scheduler_name,
description=description,
schedule=schedule,
pubsub_target=pubsub_target,
time_zone=time_zone
)
client = scheduler_v1.CloudSchedulerClient()
# get existing scheduler
list_job_request = scheduler_v1.ListJobsRequest(
parent=f"projects/{PROJECT_ID}/locations/{location}",
)
page_result = client.list_jobs(request=list_job_request)
exist_jobs = [i.name for i in page_result]
# delete existing scheduler
if scheduler_name in exist_jobs:
delete_job_request = scheduler_v1.DeleteJobRequest(
name=scheduler_name
)
response = client.delete_job(request=delete_job_request)
print(f"deleted: {response}")
# create new scheduler
create_job_request = scheduler_v1.CreateJobRequest(
parent=f"projects/{PROJECT_ID}/locations/{location}",
job=scheduler_job
)
response = client.create_job(request=create_job_request)
# pause scheduler if needed
if settings["scheduler"]["is_pause"]:
pause_job_request = scheduler_v1.PauseJobRequest(
name=scheduler_name
)
response = client.pause_job(request=pause_job_request)
ビルド全体処理
ここまでの内容をまとめると、定期実行パイプラインの全体的なビルド処理は以下のようになります。
ここで、Cloud Build は CircleCI によって submit されますが、ここでステージング、プロダクションなどどの環境下の情報をブランチに応じて substitution として渡しています。以降のビルド処理はその渡された情報に応じて、対応する環境の config ファイルを取得するなど環境に応じた値の切り分けを行なっています。
他ワークフローとの接続
以下では、モデル学習パイプラインがデータパイプラインやバッチ推論とどのように接続しているかについて説明します。
データ生成
AnyMind ではデータ生成処理を Cloud Composer (Airflow) に分離しています。そのため、データ生成パイプラインが処理完了後に時間情報を持ったフラグファイルを作成し、モデル学習パイプラインにフラグが作成されるまで wait し作成が確認されれば完了するコンポーネントを用意しています。コンポーネントの完了後、以降のコンポーネントが実行されていく形になっています。
バッチ推論
学習されたモデルによるバッチ推論、その結果のデータストアへの登録処理は、Vertex Pipelines (Kubeflow) 上で別のパイプラインジョブとして作成されるようになっています。モデルは月に一回更新するがバッチ推論は日に一回実行したいなど、モデル学習の頻度とバッチ推論の頻度が異なることがあるためです。
ここでもデータ生成処理と同様に、モデル学習パイプラインが全体処理の完了後に時間情報を持ったフラグ生成を行い、バッチ推論パイプラインにはそのフラグが作成されるまで wait するコンポーネントを用意しています。モデルが全て学習されてからバッチ推論が実行される形になっています。
終わりに
今回は AnyMind における機械学習システムとしての Vertex Pipelines (Kubeflow) の導入についてご紹介させていただきました。参考になれば幸いです。