Tech Blog

Facebook Icon Twitter Icon Linkedin Icon

AnyMind Group

Facebook Icon Twitter Icon Linkedin Icon

[Tech Blog] 機械学習導入初期フェーズにおける Vertex AI Models / Endpoints を用いたオンライン推論

こんにちは。AnyMind で機械学習エンジニアをしている河本直起です。AnyMind では 0 からデータパイプライン含めた MLOps 環境を作成しています。

以前の記事では、データパイプラインモデル学習パイプラインと、MLOps 環境の作成に向けた取り組みをご紹介させていただきました。

今回は、導入初期フェーズにおいて取っていた Vertex Pipelines (Kubeflow)、Vertex AI Models / Endpoints を使用したオンライン推論の仕組みについてご紹介できればと思います。

背景

モデル学習パイプラインについての記事でも触れた通り、AnyMind ではプロジェクトごとにさまざまな方法でのモデルのサービング方法が取られ属人化していました。そこで、共通の仕組みを整えることで、属人化に伴う改善・運用コストの増大を解決することが必要となっていました。

このような課題がある一方で、機械学習エンジニアとして複数機能のリリースに向けた開発を行う中で MLOps 環境の開発を行う必要があり、開発工数をそこまで多く割くことができませんでした。そのため、段階的に改善・拡張することを前提に、簡易的な共通の仕組みを作る必要がありました。

設計

開発にかかる工数を削減するため

  • プロダクトで用意されている RDB を使用すること
  • 全ての結果をオンライン推論で返すこと

を決め、一旦以下のような設計で開発を行いました。

モデルの学習はこちらの記事で紹介した通り、Vertex Pipelines を用いて行っています。ここで、特徴量はプロダクトアプリケーションの RDB から取得されます。そして、モデルが学習されれば学習されたモデル、推論ロジックを含んだイメージを Vertex AI Models にアップロードし、それが Vertex AI Endpoints へとデプロイされます。プロダクトアプリケーションは RDB から特徴量を取得し、それを Vertex AI Endpoints に対してリクエストすることで推論結果を得ます。特徴量の取得・成形、Vertex AI Models へのイメージのアップロード、Vertex AI Endpoints へのデプロイは Vertex Pipelines から実行される形となっています。。

Vertex AI Models / Endpoints を選んだ理由

推論を行う API は Cloud Run などを用いて開発してもよかったんですが、最終的に Vertex AI Models / Endpoints を用いて実装することに決定しました。

一つ目の理由は、Vertex Pipelines (Kubeflow) 上で行われているモデル学習パイプラインとの結合が簡単だったことです。モデルを用いた推論を行う API は、そのモデルの学習が完了して初めてリリースされます。しかし、Cloud Run などのサービスでそのリリースフローを実現するためには、そのための機能を推論 API 側で開発する必要があります。一方で、Vertex AI Models / Endpoints では Python 向け SDK である google-cloud-aiplatform を用いることで、そのパイプラインの一部として推論 API のリリースを組み込むことができます。

また、推論に使われる機械学習モデルは定期的に学習され切り替えられる必要があります。Vertex AI Models / Endpoints ではインターフェースと内部で使用するモデルが切り分けられているため、リリース後のモデルの切り替えもgoogle-cloud-aiplatform を用いてパイプラインの一部として簡単に行うことができます。

二つ目の理由は、将来的にバッチ推論を行うことを想定していたためです。バッチ推論とオンライン推論では求められるマシンリソースが大きく異なります。一方で、その二つの間での処理の差異は推論結果の際に繋がるため、その二つの間で共通の処理が行われるように制約をかける必要があります。Vertex Models は Vertex Endpoints にデプロイすればオンライン推論を行うことができ、同時に Vertex Batch Prediction としてバッチ推論にも利用可能です。

これらが、Vertex AI Models / Endpoints を選んだ理由です。

導入内容

以下では、具体的な導入内容について紹介します。

推論 API のデプロイ

学習されたモデルをロードし、特徴量を入力として推論結果を出力する簡単な API を FastAPI で実装しています。この際、Vertex AI Models で定められたインターフェースを満たす必要があります。

そして、以下のように Vertex AI Models のアップロード、アップロードされた Vertex AI Models の Vertex AI Endpoints へのデプロイを行う関数を用意します。その関数をコンポーネントとして、Vertex Pipelines (Kubeflow) で行われているモデル学習処理が完了した後に実行するという形で、モデルのサービングを行なっています。

def upload_predict_model(
        project_id: str,
        region: str,
        model_display_name: str,
        predict_image_uri: str,
        predict_route: str = "/predict"
    ) -> str:
    """
    light weight component to upload image as vertex ai model
    """
    from google.cloud import aiplatform

    # initialize vertex ai client
    aiplatform.init(project=project_id, location=region)

    # deploy model to endpoint
    model = aiplatform.Model.upload(
        display_name=model_display_name,
        serving_container_image_uri=predict_image_uri,
        serving_container_predict_route=predict_route,
        serving_container_health_route="/health_check",
        serving_container_ports=[8080]
    )
    return model_display_name


def deploy_predict_model_to_endpoint(
        service_account: str,
        project_id: str,
        region: str,
        machine_type: str, 
        min_replica_count: int, 
        max_replica_count: int, 
        endpoint_display_name: str,
        input_model_display_name: str
    ) -> None:
    from google.cloud import aiplatform
    """
    light weight component to deploy uploaded vertex ai model to endpoint
    """

    aiplatform.init(project=project_id, location=region)

    # get existing endpoint
    exist_endpoints = aiplatform.Endpoint.list(
        filter=f'display_name={endpoint_display_name}',
        order_by="create_time"
    )
    # create new if not exist
    if len(exist_endpoints) == 0:
        endpoint = aiplatform.Endpoint.create(
            display_name=endpoint_display_name
        )
    else:
        # use latest endpoint
        endpoint = exist_endpoints[-1]

    # get deployed model
    deployed_models = endpoint.list_models()

    # get uploaded model
    uploaded_models = aiplatform.Model.list(
        filter=f'display_name="{input_model_display_name}"',
        order_by="create_time"
    )
    if len(uploaded_models) == 0:
        raise ValueError(f"model_display_name [{input_model_display_name}] does not exist")
    else:
        uploaded_model = uploaded_models[-1]

    # deploy uploaded model
    endpoint.deploy(
        model=uploaded_model,
        traffic_percentage=100,
        machine_type=machine_type,
        min_replica_count=min_replica_count,
        max_replica_count=max_replica_count,
        service_account=service_account
    )

    # undeploy other models
    for deployed_model in deployed_models:
        print(f"undeploy: {deployed_model.id}")
        endpoint.undeploy(deployed_model_id=deployed_model.id)

全体フロー

推論 API がデプロイされるまでの全体的なフローは以下のようになっています。

推論 API のイメージは Container Resistry に Cloud Build を通して事前にアップロードされており、モデルの学習が完了すれば上記コンポーネントがキックされ、推論 API イメージが Vertex AI Models としてアップロードされ、Vertex AI Endpoints へデプロイされます。学習済みのモデルは推論 API コンテナからロードされる形になっています。

課題

この仕組みは段階的に拡張・改善していくことを前提に開発したものであり、以下のような課題を持っています。

  • バッチ推論ができないこと
  • 特徴量の取得をクライアント側で実装する必要があること
  • インターフェースが固定されないこと

二つ目に関しては、責任範囲と開発範囲の不一致により不必要なコミュニケーションや開発コストに繋がる一方で、生成が共通しないことによる特徴量の不一致にも繋がります。三つ目に関しては、使用する特徴量がインターフェースに含まれるため、使用する特徴量を変更するたびにインターフェースも変わり、その度にクライアント側での開発が必要となります。これらは、モデルの改善を行う上でのボトルネックとなります。

これらの課題は現在解消されており、その内容については次の記事でご紹介できればと思います。

終わりに

今回は初期フェーズで取っていた機械学習モデルを用いたオンライン推論の仕組みについてご紹介させていただきました。ご参考になれば幸いです。

Latest News