< script src="https://unpkg.com/@highlightjs/cdn-assets@11.0.0/highlight.min.js">

Tech Blog

Facebook Icon Twitter Icon Linkedin Icon

AnyMind Group

Facebook Icon Twitter Icon Linkedin Icon

[Tech Blog] Vertex AI を用いた機械学習バッチ推論基盤の運用

こんにちは。AnyMind で機械学習エンジニアをしている河本直起です。

AnyMind では 0 から MLOps 環境を作成しており、前回の記事では BigQuery View Table を用いた機械学習向けデータ基盤について紹介させていただきました。

以前の Vertex AI のバッチ推論基盤についての記事では、導入時点での構成について紹介しました。その後運用に当たりいくつかの改善を行ったため、今回の記事ではそれら改善点について個別に紹介させていただければと思います。

機械学習処理の流れ

まず、前提として現状のデータ生成から推論結果のサービングまでの流れを紹介します。

現在は以下のような形で、インプットデータの生成、モデル学習・バッチ推論と推論結果のサービングを行なっています。

まず、インプットデータが BigQuery 上に View Table として用意されており、モデル学習パイプラインがそのデータを元にモデルを生成します。後続のバッチ推論パイプラインではその生成されたモデルを用いて推論を行い、推論結果を BigQuery と Firestore (Datastore mode) に格納します。BigQuery は主に推論結果の確認や検証、機械学習間での推論結果の再利用のため、Firestore (Datastore mode) は推論結果の取得のために使われます。

プロダクトアプリケーションは推論結果が必要なキー(ユーザー単位の推論であればユーザー ID など)とメタデータ(レコメンデーションであれば上位何件が必要かなど)を推論結果のサービング API にリクエストします。そして、サービング API はリクエストを元に Firestore (Datastore mode) から推論結果を取得し、返すという形になっています。サービング API は Cloud Run 上にデプロイされています。

インプットデータ生成処理の詳細に関しては以下の記事をご参照ください。

BigQuery View Table を用いた機械学習向けデータ基盤の作成

モデル学習・バッチ推論、推論結果のサービング処理の詳細については以下の記事をご参照ください。

Vertex AI を用いた機械学習バッチ推論基盤の導入

改善点

現状、インプットデータ生成に関してはこちらの記事で書いたようにモノリポでの運用を行なっているのですが、

  • モデル学習・バッチ推論パイプライン
  • 推論結果のサービング API

に関しては、それぞれプロジェクトごとにリポジトリを切り分けて運用しています。その理由としては、プロジェクト共通の処理がそこまでなく共通化するメリットが少ないのに対して、実装におけるベストプラクティスがまだ固まっていないため共通化するリスクが高いと考えたためです。

その後、複数プロジェクトでの実装を行なったことで、共通化すべき部分とプロジェクトごとに実装すべき部分がある程度見えてきました。また、プロジェクト単位で実装することに由来するメンテナンスコストの増加や、新規実装コストが無視できないレベルになったため、一部共通化を進めることにしました。

具体的には、モデル学習・バッチ推論パイプラインに関しては、一部行っていたデプロイ処理・監視処理の共通化を進め、プロジェクト間で共通する処理のコンポーネント化を行いました。そして、推論結果のサービング API に関しては、共通処理をパッケージ化しました。そして、モデル学習・バッチ推論パイプライン、推論結果のサービング API のどちらに対しても、新規実装における作業量、開発者ごとの実装方法のブレを軽減することを目的に、テンプレートリポジトリを作成しました。

結果、プロジェクト単位の開発においては、モデル学習・バッチ推論パイプラインに関しては、プロジェクト固有のコンポーネントとパイプラインを、推論結果のサービング API に関してはインターフェースを実装すれば良いという形になりました。

以下では、これら改善点の詳細について紹介させていただければと思います。

Vertex Pipelines デプロイフローの共通化

Vertex Pipelines をデプロイするに当たって、現在行っていることは以下になります。

  1. コンポーネントイメージの Google Container Registry (GCR) へのプッシュ
  2. コンポーネント定義ファイルへの設定情報の追加
  3. プロジェクト間共通コンポーネント定義ファイルのロード
  4. パイプラインのコンパイル・Google Cloud Storage (GCS) へのプッシュ
  5. (一回限りの実行の場合)ジョブの作成
  6. (定期実行の場合)スケジューラの作成

Vertex Pipelines デプロイ処理の全体構成としては、パイプラインジョブが一回きりの実行である場合は以下のようになっています。

全て CircleCI を経由して Cloud Build に設定ファイルが submit されることによってデプロイされます。

事前にパイプラインジョブ作成用関数(Pipeline Job Creation Function)が Cloud Functions 上にデプロイされ、パイプラインのデプロイ処理を持ったパッケージ(Pipeline Deployment Package)が Artifact Registry にリリースされています。そして、プロジェクト間共通のコンポーネント(Common Components)のコンポーネントイメージが GCR に、定義ファイルが GCS に保存されています。

パイプラインジョブが一回きりの実行である場合、プロジェクト固有のパイプラインリポジトリの更新を行うと、パイプラインデプロイのための Cloud Builder イメージがビルドされ、パイプラインのデプロイ処理を持ったパッケージがインストールされ実行されます。そして、プロジェクト間共通のコンポーネントの定義ファイルをロードし、コンポーネントのプッシュやパイプラインのコンパイルとプッシュを行なった後、実行情報と共にパイプラインジョブ作成用関数が呼び出され、パイプラインジョブが作成されます。

パイプラインジョブがスケジュール実行となっている場合は以下のようになっています。プロジェクト固有のパイプラインリポジトリが更新された場合、インストールされたデプロイ処理用のパッケージがスケジューラを作成し、そのスケジューラがパイプラインジョブ作成用関数を定期的に呼び出します。それによって、定期的にパイプラインジョブが作成されるようになっています。

このデプロイフローの各詳細に関して、

  • コンポーネントイメージの Google Container Registry (GCR) へのプッシュ
  • コンポーネント定義ファイルへの設定情報の追加
  • パイプラインのコンパイル・Google Cloud Storage (GCS) へのプッシュ

については以下の記事に書いているのでご参照ください。

Vertex Pipelines (Kubeflow) の機械学習システムへの導入

ここでは、以下の処理の詳細とその共通化の方法について紹介できればと思います。

  • プロジェクト間共通コンポーネント定義ファイルのロード
  • (一回限りの実行の場合)ジョブの作成
  • (定期実行の場合)スケジューラの作成

プロジェクト間共通コンポーネントの作成

まず、以下で行っているプロジェクト間共通コンポーネントの作成についてです。各パイプラインとは別のリポジトリから共通コンポーネントがデプロイされ、パイプラインはデプロイ時に、そのデプロイ済みの共通コンポーネントをロードしています。

前述の通り、モデル学習・バッチ推論パイプラインにおいて、プロジェクトごとにリポジトリを作成するという形で運用しています。そのため、共通で使用する Kubeflow コンポーネントに関しては別でリポジトリを作成しデプロイしておき、各プロジェクトのパイプラインからはデプロイ時にそれらコンポーネントを参照するという方法をとっています。

コンポーネントを使用する際に必要なものは以下の二つになります。

  • コンポーネントイメージ
  • コンポーネント定義ファイル(component.yaml

コンポーネントイメージのパスはコンポーネント定義ファイルに記述されているため、プロジェクトのパイプラインからはコンポーネント定義ファイルのみ読み込めればいいです。そのため、共通コンポーネントのリポジトリは、コンポーネントイメージのビルド、GCR へのプッシュと、コンポーネント定義ファイルの GCS の保存を行っています。

コンポーネント定義ファイルは、後述する Vertex Pipelines デプロイ処理モジュール内で、各パイプラインのデプロイ処理時に自動的に GCS からローカルにロードしてくる形になっています。デプロイ済みの共通コンポーネントのロード処理は以下のように行っています。

import os
from pathlib import Path
from gcsfs import GCSFileSystem


def _load_common_components(
    common_component_download_dir: str,
    common_component_dir: str
    ) -> None:
    fs = GCSFileSystem()
    common_component_dirs = fs.ls(common_component_dir)
    for component_dir in common_component_dirs:
        component_file = os.path.join(component_dir, "component.yaml")
        component_name = component_dir.split("/")[-1]
        local_path = os.path.join(
            common_component_download_dir, component_name, "component.yaml")
        Path(local_path).parent.mkdir(parents=True, exist_ok=True)
        fs.get(component_file, local_path)


_load_common_components(
    common_component_download_dir='local_path_load_component_file_to',
    common_component_dir="gcs_path_component_is_uploaded")

パイプラインのデプロイ実行時に自動的にコンポーネント定義ファイルがロードされるため、パイプライン側からは以下のようにディレクトリの指定を行えば、他のコンポーネントと変わらず使用することができます。

component_store = kfp.components.ComponentStore(
    local_search_paths=[
        "component/",  # local directory project specific component definition files locate
        "component/common/"  # local directory common component definition files locate
    ]
)

project_specific_component_op = component_store.load_component('project_specific_component')
common_component_op = component_store.load_component('common_component')

共通で使用するコンポーネントとしては例えば、以下のものがあります。

  • BigQueryへの推論結果の登録
  • Firestore (Datastore mode) への推論結果の登録
  • 実行開始時間の取得

パイプラインジョブの作成

次に、以下で行なっているパイプラインジョブの作成についてです。パイプラインジョブ作成用の関数が Cloud Functions 上にデプロイされており、この関数を呼び出せばパイプラインジョブが作成されるという形にしています。

パイプラインジョブ作成用の関数は以下のようなものとなっています。

import os
import json
import dataclasses
from google.cloud import aiplatform


# set project id as environment variable in docker image from cloudbuild.yaml
_PROJECT_ID = os.getenv('PROJECT_ID')
_PIPELINE_SERVICE_ACCOUNT = f"service-account-for-pipeline-job-creation@{_PROJECT_ID}.iam.gserviceaccount.com"


class TypeValidationError(Exception):
    pass


@dataclasses.dataclass
class Arguments:
    display_name: str
    pipeline_spec_uri: str
    pipeline_root: str
    location: str = "asia-east1"
    enable_caching: bool = False

    def __post_init__(self):
        # validate types
        fields = dataclasses.fields(self)
        for field in fields:
            value = getattr(self, field.name)
            if not isinstance(value, field.type):
                raise TypeValidationError(
                    f"{field.name} must be {field.type}")


def main(request):
   # decode http request payload and translate into JSON object
    request = request.get_data()
    try: 
        request = json.loads(request.decode())
    except ValueError as e:
        print(f"Error decoding JSON: {e}")
        return "JSON Error", 400
    print(f'request: {request}')

    try:
        # validate and set default value
        arguments = Arguments(**request)
    except TypeError as e:
        print(f"Error getting arguments: {e}")
        return "Invalid Argument", 400
    except TypeValidationError as e:
        print(f"Error validating argument type: {e}")
        return "Invalid Argument Schema", 400

    print("initialize aiplatform client")
    aiplatform.init(
        project=_PROJECT_ID,
        location=arguments.location,
    )

    print("create pipeline job")
    try:
        job = aiplatform.PipelineJob(
            display_name=arguments.display_name,
            template_path=arguments.pipeline_spec_uri,
            pipeline_root=arguments.pipeline_root,
            enable_caching=arguments.enable_caching
        )
        job.submit(
            service_account=_PIPELINE_SERVICE_ACCOUNT
        )
        print("job submitted")

        # send success message to slack

    except Exception as e:
        
        # send error message to slack

        return "Internal Error", 500
    return "Job Completed", 200 

以前の記事 の時点では PubSub 経由でリクエストを受け付ける形で実装していましたが、処理の簡略化のため Http リクエストで呼び出せるように変更しました。また、以前は PubSub のメッセージスキーマによってスキーマのバリデーションを行なっていましたが、現在は関数内部で型のバリデーションを行なっています。

上記コードでは省略していますが、パイプラインジョブの作成処理の完了・失敗はこちらから Slack に通知する形になっています。

パイプラインのデプロイにおいて、以下のような Cloud Functions 用の汎用的なモジュールを用意しています。

import json
import requests
import google.oauth2.id_token
import google.auth.transport.requests


class FunctionClient:
    def __init__(
        self,
        project_id: str,
        location: str,
        function_name: str
        ) -> None:  
        request = google.auth.transport.requests.Request()
        self.url = f'https://{location}-{project_id}.cloudfunctions.net/{function_name}'
        self.token = google.oauth2.id_token.fetch_id_token(request, self.url)
    
    def request(
        self,
        request_parameter: dict
        ):
        response = requests.post(
            self.url, 
            headers={
                'Authorization': f"Bearer {self.token}", 
                "Content-Type": "application/json"},
            data=json.dumps(request_parameter)
        )
        print(f"status code: {response.status_code}")
        print(f"reason: {response.reason}")

こちらのモジュールを以下のように、パイプラインジョブの設定情報と共に呼び出すことでパイプラインジョブの作成を行っています。

import dataclasses


_CREATE_PIPELINE_JOB_FUNCTION_NAME = "pipeline-job-creation-function-name"
_CREATE_PIPELINE_JOB_LOCATION = "pipeline-job-creation-function-location"


@dataclasses.dataclass
class _PipelineJobMessage:
    display_name: str
    location: str
    pipeline_spec_uri: str
    pipeline_root: str
    enable_caching: bool


function_client = FunctionClient(
    project_id="project_id",
    location=_CREATE_PIPELINE_JOB_LOCATION,
    function_name=_CREATE_PIPELINE_JOB_FUNCTION_NAME
)
message = dataclasses.asdict(
    _PipelineJobMessage(
        display_name="pipeline_job_display_name",
        location="pipeline_job_location",
        pipeline_spec_uri="pipeline_spec_uri",
        pipeline_root="pipeline_root",
        enable_caching="pipeline_job_enable_caching"
    )
)
function_client.request(
    request_parameter=message
)

パイプラインジョブのスケジュール実行

以下で行なっているパイプラインジョブのスケジュール実行についてです。定期実行が必要なパイプラインのデプロイにおいては、上記パイプライン作成用 Cloud Functions に対して、パイプラインジョブの設定を含んだメッセージを指定したスケジュールで送信する Cloud Scheduler が作成されます。

具体的には、以下のように汎用的な Cloud Scheduler 作成用のモジュールを用意しています。

import json
import dataclasses
from google.cloud import scheduler_v1


@dataclasses.dataclass
class SchedulerSetting:
    name: str
    schedule: str
    location: str = "asia-east1"
    description: str = ""
    time_zone: str = "UTC"
    is_pause: str = False


class SchedulerClient:
    def __init__(
        self,
        project_id: str
        ) -> None:
        self.project_id = project_id
        self.client = scheduler_v1.CloudSchedulerClient()

    def _if_scheduler_exists(
        self,
        scheduler_name: str,
        location: str
        ) -> bool:
        # get existing job
        list_job_request = scheduler_v1.ListJobsRequest(
            parent=f"projects/{self.project_id}/locations/{location}",
        )
        page_result = self.client.list_jobs(request=list_job_request)
        exist_scheduker_names = [i.name for i in page_result]
        return True if scheduler_name in exist_scheduker_names else False
    
    def _delete_scheduler(
        self,
        scheduler_name: str
        ) -> None:
        # delete existing job
        delete_job_request = scheduler_v1.DeleteJobRequest(
            name=scheduler_name
        )
        response = self.client.delete_job(request=delete_job_request)
        print(f"deleted: {response}")

    def _create_scheduler(
        self,
        job: scheduler_v1.Job,
        location: str
        ) -> None:
        # create scheduler if not exists
        create_job_request = scheduler_v1.CreateJobRequest(
            parent=f"projects/{self.project_id}/locations/{location}",
            job=job
        )
        response = self.client.create_job(request=create_job_request)
        print(f"created: {response}")

    def _pause_scheduler(
        self,
        scheduler_name: str
        ) -> None:
        pause_job_request = scheduler_v1.PauseJobRequest(
            name=scheduler_name
        )
        response = self.client.pause_job(request=pause_job_request)
        print(f"paused: {response}")

    def _delete_scheduler_if_exists(
        self,
        scheduler_name: str,
        location: str
        ) -> None:
        # delete previous scheduler if exists
        if self._if_scheduler_exists(
            scheduler_name=scheduler_name,
            location=location):
            print(f"delete previous scheduler: {scheduler_name}")
            self._delete_scheduler(scheduler_name)

    def _pause_scheduler_if_is_pause(
        self,
        scheduler_name: str,
        is_pause: bool
        ) -> None:
        # pause scheduler if needed
        if is_pause:
            print(f"pause scheduler: {scheduler_name}")
            self._pause_scheduler(
                scheduler_name=scheduler_name)

    def create_pubsub_trigger(
        self,
        scheduler_setting: SchedulerSetting,
        pubsub_topic_id: str,
        message: dict = {}
        ):
        self._delete_scheduler_if_exists(
            scheduler_name=scheduler_setting.name,
            location=scheduler_setting.location
        )

        string_message = json.dumps(message) 
        pubsub_target = scheduler_v1.PubsubTarget(
            topic_name=f"projects/{self.project_id}/topics/{pubsub_topic_id}",
            data=string_message.encode("utf-8")
        )
        print(f"create scheduler triggering pubsub: {scheduler_setting.name}")

        # create scheduler
        create_job = scheduler_v1.Job(
            name=scheduler_setting.name,
            description=scheduler_setting.description,
            schedule=scheduler_setting.schedule,
            pubsub_target=pubsub_target,
            time_zone=scheduler_setting.time_zone
        )
        self._create_scheduler(
            job=create_job,
            location=scheduler_setting.location)
        print(f"created scheduler: {scheduler_setting.name}")

        self._pause_scheduler_if_is_pause(
            scheduler_name=scheduler_setting.name,
            is_pause=scheduler_setting.is_pause
        )

    def create_function_trigger(
        self,
        scheduler_setting: SchedulerSetting,
        function_location: str,
        function_name: str,
        service_account: str,
        message: dict = {}
        ):
        self._delete_scheduler_if_exists(
            scheduler_name=scheduler_setting.name,
            location=scheduler_setting.location
        )
        url = f'https://{function_location}-{self.project_id}.cloudfunctions.net/{function_name}'
        print(f"http target uri: {url}")
        print(f"http target sa: {service_account}")
        token = scheduler_v1.OidcToken(
            service_account_email=service_account)

        string_message = json.dumps(message) 
        http_target = scheduler_v1.HttpTarget(
            uri=url,
            http_method=scheduler_v1.HttpMethod.POST,
            oidc_token=token,
            body=string_message.encode("utf-8")
        )
        print(f"create scheduler triggering function: : {scheduler_setting.name}")

        # create scheduler
        create_job = scheduler_v1.Job(
            name=scheduler_setting.name,
            description=scheduler_setting.description,
            schedule=scheduler_setting.schedule,
            http_target=http_target,
            time_zone=scheduler_setting.time_zone
        )
        self._create_scheduler(
            job=create_job,
            location=scheduler_setting.location)
        print(f"created scheduler: {scheduler_setting.name}")

        self._pause_scheduler_if_is_pause(
            scheduler_name=scheduler_setting.name,
            is_pause=scheduler_setting.is_pause
        )

この Cloud Scheduler クライアントモジュールを以下のようにパイプラインジョブの設定を含めたメッセージと共に呼び出すことで、スケジューラを作成しています。

import dataclasses


_CREATE_PIPELINE_JOB_FUNCTION_NAME = "pipeline-job-creation-function-name"
_CREATE_PIPELINE_JOB_LOCATION = "pipeline-job-creation-function-location"
_DEMPLOYMENT_SERVICE_ACCOUNT = "service-account-for-pipeline-deployment@{project_id}.iam.gserviceaccount.com"


@dataclasses.dataclass
class _PipelineJobSetting:
    display_name: str
    location: str
    pipeline_spec_uri: str
    pipeline_root: str
    enable_caching: bool


scheduler_client = SchedulerClient(
    project_id="project_id"
)
scheduler_setting = SchedulerSetting(
    name="scheduler_name",
    schedule="cron_schedule",
    location="scheduler_location",
    description="scheduler_description",
    time_zone="scheduler_time_zone",
    is_pause="if_scheduler_is_paused"
)
message = dataclasses.asdict(
    _PipelineJobSetting(
        display_name="pipeline_job_display_name",
        location="pipeline_job_location",
        pipeline_spec_uri="pipeline_spec_uri",
        pipeline_root="pipeline_root",
        enable_caching="pipeline_job_enable_caching"
    )
)
scheduler_client.create_function_trigger(
    scheduler_setting=scheduler_setting,
    function_location=_CREATE_PIPELINE_JOB_LOCATION,
    function_name=_CREATE_PIPELINE_JOB_FUNCTION_NAME,
    service_account=_DEMPLOYMENT_SERVICE_ACCOUNT.format(
        project_id="project_id"),
    message=message
)

Vertex Pipelines デプロイ処理のモジュール化

ここまで紹介させていただいたパイプラインのデプロイ処理は、Cloud Functions にデプロイされているパイプラインジョブ作成関数を除いて、モジュール化してプロジェクト間共通で使用できるようにしています。この Vertex Pipelines のデプロイ処理モジュールは、以下のようにパッケージとして Artifact Registry にリリースされており、各パイプラインのデプロイ時にインストールされ実行されます。

このデプロイ処理モジュールが行うのは以下の処理となっています。

  1. コンポーネント定義ファイルへの設定情報の追加
  2. 共通コンポーネント定義ファイルのロード
  3. パイプラインのコンパイル・Google Cloud Storage (GCS) へのプッシュ
  4. 一回限りの実行の場合はジョブ作成用関数の呼び出し、定期実行の場合はスケジューラの作成
  5. 処理完了・失敗の Slack への通知

また、スケジューラのタイムゾーンや、各種ファイルの保存先のパスのフォーマットなど、プロジェクト間で共通化させたい値をモジュール内に持たせてあります。そのため、各プロジェクトでの開発においては、プロジェクト単位で設定する必要のある値だけを渡せば良いという形になっています。

Vertex Pipelines のデプロイ実行

パイプラインのデプロイは、上記パッケージをインストールした Cloud Builder イメージを用意しておき、その上でパイプラインを記述したファイルを実行することで行われます。

パイプラインファイル内の記述は簡単には以下のように、パイプラインの定義と、デプロイ処理パッケージをデプロイに必要な情報(スケジューラやパイプラインジョブの設定)を引数として呼び出す処理を含むものとなっています。

import os
from pathlib import Path
import kfp
from vertex_pipeline_deployer import (
    PipelineConfig,
    PipelineDeployer
)


# set as environment variable from cloudbuild.yaml
PROJECT_ID = os.environ['PROJECT_ID']
MODEL_VERSION = os.environ['MODEL_VERSION']
PROJECT_NAME = os.environ['PROJECT_NAME']

_IS_SCHEDULED = True  # if pipeline is scheduled or not

# get pipeline name from file name
PIPELINE_NAME = Path(__file__).name.split(".")[0]

# [fixed] set up pipeline specific configs
pipeline_conf = PipelineConfig(
    name=PIPELINE_NAME,
    project_id=PROJECT_ID,
    project_name=PROJECT_NAME,
    model_version=MODEL_VERSION,
    description="pipeline_description",
    enable_caching="pipeline_enable_caching"
)
if _IS_SCHEDULED:
    pipeline_conf.set_scheduler(
        schedule="scheduler_schedule",
        description="scheduler_description",
        is_pause="scheduler_is_pause"
    )


# define pipeline parameter
pipeline_parameter = {
    "param_1": "aaa",
    "param_2": "bbb"
}


# global variable for pipeline compile
PIPELINE_LOCATION = pipeline_conf.location
PIPELINE_ID = pipeline_conf.id


@kfp.dsl.pipeline(
    name=PIPELINE_ID, 
    description=pipeline_conf.description, 
    pipeline_root=pipeline_conf.root
    )
def pipeline(
    param_1: str,
    param_2: str
    ):
    
    # load components
    component_store = kfp.components.ComponentStore(
        local_search_paths=[
            "component/",  # local directory project specific component definition files locate
            "component/common/"  # local directory common component definition files locate
        ]
    )

    component_1_op = component_store.load_component('component_1')
    component_2_op = component_store.load_component('component_2')
    
    component_1_task = component_1_op(
        param_1=param_1
    )
    component_2_task = component_2_op(
        param_2=param_2
    ).after(component_1_task)
    
    
# deploy pipeline and its parameter
pipeline_deployer = PipelineDeployer(
    pipeline_config=pipeline_conf
)
pipeline_deployer.deploy(
    pipeline_func=pipeline,
    pipeline_parameter=pipeline_parameter
)

ここで、PipelineDeployer はデプロイ処理の実行を行い、PipelineConfig はデプロイに必要な情報を持つデータクラスとなっています。これら二つが、パイプラインから見たデプロイ処理モジュールのインターフェースとなっています。

PipelineConfig は前述の通り、デフォルト値や命名規則などを保持していて、例えば、パイプラインの display_name の命名規則は同じで合ってほしいので、PipelineConfig 内でモデルのバージョンやプロジェクト名、パイプライン名から作成されるようになっています。

また、実際には設定値は環境ごとに config ファイルを用意して読み込むことで、環境に応じて値を設定できるようにするなどしています。

こちらのファイルは Cloud Build 上で、前述の Cloud Builder イメージを用いて以下のように実行されます。

steps:
  - id: Deploy Pipeline
    name: "gcr_path_of_deployment_cloud_builder:latest"
    entrypoint: 'python'
    args: ['path_of_pipeline.py']

まとめると、デプロイ時のパイプラインファイルでの処理は以下のような流れになっています。

  1. コンポーネントイメージの Google Container Registry (GCR) へのプッシュ
  2. デプロイ処理モジュールの呼び出し

そして、呼び出されたデプロイ処理モジュールは以下の処理を行います。

  1. コンポーネント定義ファイルへの設定情報の追加
  2. 共通コンポーネント定義ファイルのロード
  3. パイプラインのコンパイル・Google Cloud Storage (GCS) へのプッシュ
  4. 一回限りの実行の場合はジョブ作成用関数の呼び出し、定期実行の場合はスケジューラの作成
  5. 処理完了・失敗の Slack への通知

定期実行の場合は、作成されたスケジューラが指定されたスケジュールでジョブ作成用関数を呼び出し、パイプラインジョブが作成されます。

パイプライン実行状況の監視

前述の通り、上記ジョブ作成用関数、デプロイ処理パッケージからは以下の成功・エラーに関する通知が飛ばされるようになっています。

  • スケジューラの作成
  • パイプラインジョブの作成

これらに加えて、パイプラインジョブの実行状況を監視する関数を Cloud Functions 上に用意しています。こちらの関数からは、以下の場合に Slack へと通知が飛ぶようになっています。

  • パイプラインジョブが完了した場合
  • パイプラインジョブが失敗した場合
  • パイプラインジョブが指定時間以上実行し続けている場合

こちらに関しては、以下の ZOZO テクノロジーズさんの記事を参考に作成しています。

Kubeflow PipelinesからVertex Pipelinesへの移行による運用コスト削減

テンプレートリポジトリの作成

モデル学習・バッチ推論パイプラインのデプロイ処理に関しては、上記の通りデプロイ処理用モジュールを呼び出せばよく、共通する処理に関しては共通のコンポーネントを使用すれば良い形になっています。ここで、デプロイ処理モジュールのロードや呼び出し、共通コンポーネントの使用などはプロジェクトのコードに各自記述していく必要があります。そのため、テンプレートリポジトリを用意し、開発時にはパイプラインやコンポーネントの処理を記述し、設定を変更するだけで良いという形にしています。

また、Kubeflow では Lightweight Python コンポーネントという、Python の関数でコンポーネントを定義できる機能があります。簡易に検証を行う際には非常に便利な機能ですが、プロダクションコードでそれを使ってしまうと記述の自由度が大きくなりすぎるため、テンプレートリポジトリで通常のコンポーネントの記述のみを行うように制限しています。

また、ドメインロジックをパイプラインやコンポーネント処理(task.py)に記述しないように、以下のような切り分けを定めています。

  • データのロード、モデルの学習や前処理など、具体的な処理はモジュールとして実装する
  • コンポーネント処理(task.py)にはモジュールの呼び出しを記述する
  • パイプラインではそれらコンポーネントのワークフロー処理を記述する

そのため、パイプライン・コンポーネント・モジュールを格納したディレクトリの構成は、簡略化すると以下のようになっています。

.
├── module
├── component
│   ├── component_1
│   │   ├── task.py
│   │   └── component.yaml
│   └── component_2
│       ├── task.py
│       └── component.yaml
└── pipeline
    ├── train.py
    └── batch_predict.py

モデル学習・バッチ推論パイプラインの分離

上記ディレクトリにあるように、モデル学習パイプラインとバッチ推論パイプラインは別ファイルに定義する形にしています。そのため、パイプラインジョブもそれぞれ別に作成されます。

モデル学習パイプラインは学習完了後、学習されたモデルの情報(モデルの作成された国やモデルのバージョン、実行開始時間など)と完了フラグを保存します。バッチ推論パイプラインはその完了フラグの作成を待ち、完了すればモデルの情報を元に推論を進めます。これにより、モデル学習バイプラインとバッチ推論パイプラインの接続を行なっています。

推論結果サービング API における共通化

これまではモデル学習・バッチ推論パイプラインに絞って共通化の取り組みを紹介してきました。最後に、推論結果サービング API についても共通化の取り組みを紹介できればと思います。

推論結果サービング API の行う処理としては、以下のように、プロダクトアプリケーションからのリクエストに応じて、モデル学習パイプラインから Firestore (Datastore mode) に格納された推論結果を取得し返すというシンプルなものとなっています。

推論結果サービング API に関しては、Firestore (Datastore mode) からの推論結果の取得処理をパッケージ化することで共通化しています。

推論結果の取得処理は、推論結果のキーとなる値一つ以上受け取り、Firestore (Datastore mode) の Kind へと変換し取得し、取得された内容を変換し返すというものになっており、呼び出し側から Firestore (Datastore mode) の仕様を隠しています。また、このパッケージ内で Kind の命名規則など、チーム内で決定したルールを吸収するようになっています。

それによって、個々のプロジェクトの開発の際はインターフェース、必要であれば追加で整形処理を記述するだけで済むようになっています。

また、推論結果のサービング API に関してもテンプレートリポジトリを作成しており、基本的な API の構成や処理、インターフェースの記述が含まれています。そして、デプロイ時にこちらのパッケージがインストールされるようになっています。

最後に

AnyMind の機械学習エンジニアチームでは、プロジェクトで新しい構成が必要な場合はプロジェクト単位で個別に実装して、その後共通化するメリットがあれば共通化していくという形で進めています。

現状、バッチ推論を用いる機械学習プロジェクトに関しては上記のように共通化できているのですが、例えばオンライン推論が必要なプロジェクトなどに関してはまだできていません。今後その辺を含めて新しく仕組み化できればまたご紹介できればと思っています。

これまでの MLOps に関する取り組み

これまでの AnyMind 機械学習エンジニアチームとしての MLOps の取り組みに関しては以下の記事で紹介しています。

Latest News