[Tech Blog] Managing Machine Learning Architecture Using Vertex AI

Hi, I’m Naoki Komoto (河本 直起), working as a Machine Learning Engineer at AnyMind.

At AnyMind, we are developing our MLOps environment, and in a previous article we introduced our data infrastructure for machine learning using BigQuery View Table.

In a previous article on our Vertex AI batch prediction infrastructure, we described the architecture as it was when we first adopted Vertex AI. Since then, we have made several improvements in the course of operation, and this article introduces those improvements.

Current Flow of Machine Learning Process

First, as background, we describe the current flow from data generation to the serving of prediction results.

Currently, we create input data, train models, run batch prediction, and serve prediction results in the following manner:

First, input data is prepared as a View Table on BigQuery, and the model training pipeline trains a model based on that data. The subsequent batch prediction pipeline makes predictions using the trained model and stores the results in BigQuery and Firestore (Datastore mode). Firestore (Datastore mode) is used for serving the prediction results.

The product application sends the keys (e.g., a user ID for per-user prediction) and metadata (e.g., the number of top results needed for a recommendation) to the prediction result serving API. The serving API then retrieves the prediction results from Firestore (Datastore mode) based on the request and returns them. The serving API is deployed on Cloud Run.

For more information on the input data generation process, please refer to the following article:

BigQuery View Table for Machine Learning Feature Store

Please refer to the following article for details on model training, batch prediction, and the serving of prediction results:

Machine Learning Batch Prediction Architecture Using Vertex AI

Improvements

Currently, we use a monorepo for input data creation, as described in the article above.

For the model training and batch prediction pipelines and the prediction result serving API, on the other hand, each project has its own repository. We chose this approach because there was not much processing common to all projects, so there was little benefit to standardization, and because best practices for development had not yet been established, so the risk of standardizing too early was high.

Subsequent development across multiple projects revealed, to some extent, which parts should be standardized and which should be implemented per project. In addition, the maintenance costs and the cost of new development caused by the project-by-project approach became too high to ignore, so we decided to proceed with partial standardization.

Specifically, for the model training and batch prediction pipelines, we standardized deployment and monitoring, and created common components for the processes shared across projects. For the prediction result serving API, we modularized the common processes. We also created template repositories for both the model training and batch prediction pipelines and the prediction result serving API, in order to reduce the work required for new development and the variability in development methods among developers.

As a result, for project-based development, we only need to develop project-specific components and pipelines for the model training and batch prediction pipelines, and interfaces for the prediction result serving API.

In the following sections, we present the details of these improvements.

Standardization of Vertex Pipelines Deployment Flow

Here is what we are currently doing to deploy Vertex Pipelines:

  1. Push component images to Google Container Registry (GCR)
  2. Add configuration information to component definition files
  3. Load component definition files common to all projects
  4. Compile pipeline and push to Google Cloud Storage (GCS)
  5. Create pipeline job (for one-time run)
  6. Create pipeline scheduler (for periodic run)

When a pipeline job is a one-time run, the overall structure of the Vertex Pipelines deployment process is as follows:

Every repository is deployed by submitting build configuration files to Cloud Build via CircleCI.

Before pipeline deployment, the pipeline job creation function has already been deployed on Cloud Functions, and a package containing the pipeline deployment process (the pipeline deployment package) has been released to Artifact Registry. The component images of the common components are stored in GCR and their definition files in GCS.

If the pipeline job is a one-time run, updating the project-specific pipeline repository builds the Cloud Builder image for pipeline deployment and installs and executes the pipeline deployment package. Then, after the definition files of the components common to all projects are loaded, the project-specific components are pushed, and the pipeline is compiled and pushed. After that, the pipeline job creation function is called with the job configuration to create the pipeline job.

If the pipeline job is a scheduled run, the structure is as follows:

When the project-specific pipeline repository is updated, the installed package for the deployment process creates a scheduler, which periodically calls the function for creating pipeline jobs. This ensures that pipeline jobs are created on a regular basis.

The following steps of the deployment flow are covered in detail in a previous article:

  • Push component images to Google Container Registry (GCR)
  • Add configuration information to component definition files
  • Compile pipeline and push to Google Cloud Storage (GCS)

Please refer to that article for more information:

Vertex Pipelines (Kubeflow) for Machine Learning Model Training Pipeline

Here, we present the details of the following processes and how we standardized them:

  • Load component definition files common to all projects
  • Create pipeline job (for one-time run)
  • Create pipeline scheduler (for periodic run)

Creation of Common Components between Projects

First, we explain the creation of components shared across projects. Common components are deployed from a repository separate from each pipeline’s repository, and each pipeline loads the deployed common components at deployment time.

As mentioned above, for the model training and batch prediction pipelines, a separate repository is created for each project. For this reason, Kubeflow components that are commonly used across projects are created and deployed from their own repository, and each project’s pipeline refers to these components when it is deployed.

The following two items are required to use a component:

  • Component Image
  • Component definition file (component.yaml)

Since the path to the component image is described in the component definition file, only the component definition file needs to be loaded from the project pipeline. Therefore, the common component repository builds the component images, pushes them to GCR, and stores the component definition files in GCS.
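For reference, a Kubeflow (KFP v1) component definition file is a short YAML spec whose container image points at the image pushed to GCR. A minimal sketch of what one of our common components’ component.yaml might look like (the names, inputs, and paths here are hypothetical):

# e.g. stored as register_prediction_to_bigquery/component.yaml in GCS
name: Register prediction results to BigQuery
description: Load batch prediction results into a BigQuery table
inputs:
  - {name: prediction_gcs_path, type: String}
  - {name: destination_table, type: String}
implementation:
  container:
    image: gcr.io/<project-id>/common/register_prediction_to_bigquery:latest
    command: [python, task.py]
    args: [
      --prediction_gcs_path, {inputValue: prediction_gcs_path},
      --destination_table, {inputValue: destination_table},
    ]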

Component definition files are automatically downloaded from GCS to the local filesystem during each pipeline’s deployment by the Vertex Pipelines deployment processing module described later.

The loading process for deployed common components is as follows:

import os
from pathlib import Path
from gcsfs import GCSFileSystem


def _load_common_components(
    common_component_download_dir: str,
    common_component_dir: str
    ) -> None:
    fs = GCSFileSystem()
    common_component_dirs = fs.ls(common_component_dir)
    for component_dir in common_component_dirs:
        component_file = os.path.join(component_dir, "component.yaml")
        component_name = component_dir.split("/")[-1]
        local_path = os.path.join(
            common_component_download_dir, component_name, "component.yaml")
        Path(local_path).parent.mkdir(parents=True, exist_ok=True)
        fs.get(component_file, local_path)


_load_common_components(
    common_component_download_dir='local_path_load_component_file_to',
    common_component_dir="gcs_path_component_is_uploaded")

Since the component definition files are automatically loaded when the pipeline is deployed, common components can be referenced in the same way as project-specific components by specifying their directory as follows:

component_store = kfp.components.ComponentStore(
    local_search_paths=[
        "component/",  # local directory project specific component definition files locate
        "component/common/"  # local directory common component definition files locate
    ]
)

project_specific_component_op = component_store.load_component('project_specific_component')
common_component_op = component_store.load_component('common_component')

Commonly used components include the following processes, for example (a sketch of one of them follows the list):

  • Register prediction results to BigQuery
  • Register prediction results to Firestore (Datastore mode)
  • Obtain execution start time
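As an illustration, the task.py of the Firestore (Datastore mode) registration component could look roughly like the following sketch (the Kind name, key field, and record schema are hypothetical, and the actual component also handles concerns such as retries):

import argparse
import json
from google.cloud import datastore


def register_predictions(kind: str, predictions_path: str) -> None:
    # each line of the input file is assumed to be a JSON object with
    # a "key" field and a "prediction" payload (hypothetical schema)
    client = datastore.Client()
    entities = []
    with open(predictions_path) as f:
        for line in f:
            record = json.loads(line)
            entity = datastore.Entity(key=client.key(kind, record["key"]))
            entity.update({"prediction": record["prediction"]})
            entities.append(entity)
    # Datastore mode allows up to 500 entities per commit
    for i in range(0, len(entities), 500):
        client.put_multi(entities[i:i + 500])


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--kind", type=str, required=True)
    parser.add_argument("--predictions_path", type=str, required=True)
    args = parser.parse_args()
    register_predictions(kind=args.kind, predictions_path=args.predictions_path)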

Creation of Pipeline Job

Next, we explain the pipeline job creation process. A function for creating pipeline jobs is deployed on Cloud Functions, and pipeline jobs are created by calling this function.

The function for creating pipeline jobs is as follows:

import os
import json
import dataclasses
from google.cloud import aiplatform


# set project id as environment variable in docker image from cloudbuild.yaml
_PROJECT_ID = os.getenv('PROJECT_ID')
_PIPELINE_SERVICE_ACCOUNT = f"service-account-for-pipeline-job-creation@{_PROJECT_ID}.iam.gserviceaccount.com"


class TypeValidationError(Exception):
    pass


@dataclasses.dataclass
class Arguments:
    display_name: str
    pipeline_spec_uri: str
    pipeline_root: str
    location: str = "asia-east1"
    enable_caching: bool = False

    def __post_init__(self):
        # validate types
        fields = dataclasses.fields(self)
        for field in fields:
            value = getattr(self, field.name)
            if not isinstance(value, field.type):
                raise TypeValidationError(
                    f"{field.name} must be {field.type}")


def main(request):
    # decode http request payload and translate into JSON object
    request = request.get_data()
    try:
        request = json.loads(request.decode())
    except ValueError as e:
        print(f"Error decoding JSON: {e}")
        return "JSON Error", 400
    print(f'request: {request}')

    try:
        # validate and set default value
        arguments = Arguments(**request)
    except TypeError as e:
        print(f"Error getting arguments: {e}")
        return "Invalid Argument", 400
    except TypeValidationError as e:
        print(f"Error validating argument type: {e}")
        return "Invalid Argument Schema", 400

    print("initialize aiplatform client")
    aiplatform.init(
        project=_PROJECT_ID,
        location=arguments.location,
    )

    print("create pipeline job")
    try:
        job = aiplatform.PipelineJob(
            display_name=arguments.display_name,
            template_path=arguments.pipeline_spec_uri,
            pipeline_root=arguments.pipeline_root,
            enable_caching=arguments.enable_caching
        )
        job.submit(
            service_account=_PIPELINE_SERVICE_ACCOUNT
        )
        print("job submitted")

        # send success message to slack

    except Exception as e:
        
        # send error message to slack

        return "Internal Error", 500
    return "Job Completed", 200 

In the previous article, this function was invoked via Pub/Sub. To simplify the processing, it has been changed to be invoked with HTTP requests. Also, schema validation was previously performed by the Pub/Sub message schema, but type validation is now performed inside the function.

Although omitted in the code above, when the pipeline job creation process completes or fails, a message is sent to Slack as a notification.
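The notification itself can be as simple as a POST to a Slack incoming webhook. A minimal sketch (the webhook URL is assumed to be passed as an environment variable; the actual message contains more context, such as the pipeline name and a link to the job):

import os
import requests

# hypothetical: the incoming webhook URL is provided to the function as an environment variable
_SLACK_WEBHOOK_URL = os.environ["SLACK_WEBHOOK_URL"]


def notify_slack(text: str) -> None:
    # post a plain-text message to the Slack incoming webhook
    response = requests.post(_SLACK_WEBHOOK_URL, json={"text": text})
    response.raise_for_status()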

To call this function during deployment, the following generic client module for Cloud Functions was developed:

import json
import requests
import google.oauth2.id_token
import google.auth.transport.requests


class FunctionClient:
    def __init__(
        self,
        project_id: str,
        location: str,
        function_name: str
        ) -> None:  
        request = google.auth.transport.requests.Request()
        self.url = f'https://{location}-{project_id}.cloudfunctions.net/{function_name}'
        self.token = google.oauth2.id_token.fetch_id_token(request, self.url)
    
    def request(
        self,
        request_parameter: dict
        ):
        response = requests.post(
            self.url, 
            headers={
                'Authorization': f"Bearer {self.token}", 
                "Content-Type": "application/json"},
            data=json.dumps(request_parameter)
        )
        print(f"status code: {response.status_code}")
        print(f"reason: {response.reason}")

To create a pipeline job, this module is called with the pipeline job configuration information as follows:

import dataclasses


_CREATE_PIPELINE_JOB_FUNCTION_NAME = "pipeline-job-creation-function-name"
_CREATE_PIPELINE_JOB_LOCATION = "pipeline-job-creation-function-location"


@dataclasses.dataclass
class _PipelineJobMessage:
    display_name: str
    location: str
    pipeline_spec_uri: str
    pipeline_root: str
    enable_caching: bool


function_client = FunctionClient(
    project_id="project_id",
    location=_CREATE_PIPELINE_JOB_LOCATION,
    function_name=_CREATE_PIPELINE_JOB_FUNCTION_NAME
)
message = dataclasses.asdict(
    _PipelineJobMessage(
        display_name="pipeline_job_display_name",
        location="pipeline_job_location",
        pipeline_spec_uri="pipeline_spec_uri",
        pipeline_root="pipeline_root",
        enable_caching="pipeline_job_enable_caching"
    )
)
function_client.request(
    request_parameter=message
)

Scheduled Run of Pipeline Job

Next, we explain the scheduled execution of pipeline jobs. When deploying a pipeline that requires periodic execution, a Cloud Scheduler job is created that sends messages containing the pipeline job settings to the pipeline job creation function above on the specified schedule.

Specifically, the following generic client module for creating Cloud Scheduler jobs was developed:

import json
import dataclasses
from google.cloud import scheduler_v1


@dataclasses.dataclass
class SchedulerSetting:
    name: str
    schedule: str
    location: str = "asia-east1"
    description: str = ""
    time_zone: str = "UTC"
    is_pause: bool = False


class SchedulerClient:
    def __init__(
        self,
        project_id: str
        ) -> None:
        self.project_id = project_id
        self.client = scheduler_v1.CloudSchedulerClient()

    def _if_scheduler_exists(
        self,
        scheduler_name: str,
        location: str
        ) -> bool:
        # get existing job
        list_job_request = scheduler_v1.ListJobsRequest(
            parent=f"projects/{self.project_id}/locations/{location}",
        )
        page_result = self.client.list_jobs(request=list_job_request)
        existing_scheduler_names = [i.name for i in page_result]
        return scheduler_name in existing_scheduler_names
    
    def _delete_scheduler(
        self,
        scheduler_name: str
        ) -> None:
        # delete existing job
        delete_job_request = scheduler_v1.DeleteJobRequest(
            name=scheduler_name
        )
        response = self.client.delete_job(request=delete_job_request)
        print(f"deleted: {response}")

    def _create_scheduler(
        self,
        job: scheduler_v1.Job,
        location: str
        ) -> None:
        # create scheduler if not exists
        create_job_request = scheduler_v1.CreateJobRequest(
            parent=f"projects/{self.project_id}/locations/{location}",
            job=job
        )
        response = self.client.create_job(request=create_job_request)
        print(f"created: {response}")

    def _pause_scheduler(
        self,
        scheduler_name: str
        ) -> None:
        pause_job_request = scheduler_v1.PauseJobRequest(
            name=scheduler_name
        )
        response = self.client.pause_job(request=pause_job_request)
        print(f"paused: {response}")

    def _delete_scheduler_if_exists(
        self,
        scheduler_name: str,
        location: str
        ) -> None:
        # delete previous scheduler if exists
        if self._if_scheduler_exists(
            scheduler_name=scheduler_name,
            location=location):
            print(f"delete previous scheduler: {scheduler_name}")
            self._delete_scheduler(scheduler_name)

    def _pause_scheduler_if_is_pause(
        self,
        scheduler_name: str,
        is_pause: bool
        ) -> None:
        # pause scheduler if needed
        if is_pause:
            print(f"pause scheduler: {scheduler_name}")
            self._pause_scheduler(
                scheduler_name=scheduler_name)

    def create_pubsub_trigger(
        self,
        scheduler_setting: SchedulerSetting,
        pubsub_topic_id: str,
        message: dict = {}
        ):
        self._delete_scheduler_if_exists(
            scheduler_name=scheduler_setting.name,
            location=scheduler_setting.location
        )

        string_message = json.dumps(message) 
        pubsub_target = scheduler_v1.PubsubTarget(
            topic_name=f"projects/{self.project_id}/topics/{pubsub_topic_id}",
            data=string_message.encode("utf-8")
        )
        print(f"create scheduler triggering pubsub: {scheduler_setting.name}")

        # create scheduler
        create_job = scheduler_v1.Job(
            name=scheduler_setting.name,
            description=scheduler_setting.description,
            schedule=scheduler_setting.schedule,
            pubsub_target=pubsub_target,
            time_zone=scheduler_setting.time_zone
        )
        self._create_scheduler(
            job=create_job,
            location=scheduler_setting.location)
        print(f"created scheduler: {scheduler_setting.name}")

        self._pause_scheduler_if_is_pause(
            scheduler_name=scheduler_setting.name,
            is_pause=scheduler_setting.is_pause
        )

    def create_function_trigger(
        self,
        scheduler_setting: SchedulerSetting,
        function_location: str,
        function_name: str,
        service_account: str,
        message: dict = {}
        ):
        self._delete_scheduler_if_exists(
            scheduler_name=scheduler_setting.name,
            location=scheduler_setting.location
        )
        url = f'https://{function_location}-{self.project_id}.cloudfunctions.net/{function_name}'
        print(f"http target uri: {url}")
        print(f"http target sa: {service_account}")
        token = scheduler_v1.OidcToken(
            service_account_email=service_account)

        string_message = json.dumps(message) 
        http_target = scheduler_v1.HttpTarget(
            uri=url,
            http_method=scheduler_v1.HttpMethod.POST,
            oidc_token=token,
            body=string_message.encode("utf-8")
        )
        print(f"create scheduler triggering function: : {scheduler_setting.name}")

        # create scheduler
        create_job = scheduler_v1.Job(
            name=scheduler_setting.name,
            description=scheduler_setting.description,
            schedule=scheduler_setting.schedule,
            http_target=http_target,
            time_zone=scheduler_setting.time_zone
        )
        self._create_scheduler(
            job=create_job,
            location=scheduler_setting.location)
        print(f"created scheduler: {scheduler_setting.name}")

        self._pause_scheduler_if_is_pause(
            scheduler_name=scheduler_setting.name,
            is_pause=scheduler_setting.is_pause
        )

A scheduler is created by calling this Cloud Scheduler client module with a message containing the pipeline job settings, as shown below:

import dataclasses


_CREATE_PIPELINE_JOB_FUNCTION_NAME = "pipeline-job-creation-function-name"
_CREATE_PIPELINE_JOB_LOCATION = "pipeline-job-creation-function-location"
_DEPLOYMENT_SERVICE_ACCOUNT = "service-account-for-pipeline-deployment@{project_id}.iam.gserviceaccount.com"


@dataclasses.dataclass
class _PipelineJobSetting:
    display_name: str
    location: str
    pipeline_spec_uri: str
    pipeline_root: str
    enable_caching: bool


scheduler_client = SchedulerClient(
    project_id="project_id"
)
scheduler_setting = SchedulerSetting(
    name="scheduler_name",
    schedule="cron_schedule",
    location="scheduler_location",
    description="scheduler_description",
    time_zone="scheduler_time_zone",
    is_pause="if_scheduler_is_paused"
)
message = dataclasses.asdict(
    _PipelineJobSetting(
        display_name="pipeline_job_display_name",
        location="pipeline_job_location",
        pipeline_spec_uri="pipeline_spec_uri",
        pipeline_root="pipeline_root",
        enable_caching="pipeline_job_enable_caching"
    )
)
scheduler_client.create_function_trigger(
    scheduler_setting=scheduler_setting,
    function_location=_CREATE_PIPELINE_JOB_LOCATION,
    function_name=_CREATE_PIPELINE_JOB_FUNCTION_NAME,
    service_account=_DEPLOYMENT_SERVICE_ACCOUNT.format(
        project_id="project_id"),
    message=message
)

Modularization of Vertex Pipelines Deployment

The pipeline deployment processes introduced so far, with the exception of the pipeline job creation function deployed on Cloud Functions, are modularized for common use across projects. This Vertex Pipelines deployment processing module is released as a Python package to Artifact Registry and is installed and executed when each pipeline is deployed.

This deployment processing module performs the following processes:

  1. Add configuration information to project’s component definition files
  2. Load common component definition files
  3. Compile pipeline and push to Google Cloud Storage (GCS)
  4. Call the function to create a job for one-time execution or create a scheduler for periodic execution
  5. Notify Slack of completed/failed jobs

In addition, the module contains values that should be shared among projects, such as the time zone of the scheduler and the format of the paths where various files are saved. Therefore, when developing each project, only the values that are specific to that project need to be passed in.

Deployment of Vertex Pipelines

Pipeline deployment is done by preparing a Cloud Builder image with the above package installed and executing the pipeline definition file with it.

The code for the pipeline file is as follows. It includes the definition of the pipeline and a call to the deployment module with the information necessary for deployment (scheduler and pipeline job settings) as arguments.

import os
from pathlib import Path
import kfp
from vertex_pipeline_deployer import (
    PipelineConfig,
    PipelineDeployer
)


# set as environment variable from cloudbuild.yaml
PROJECT_ID = os.environ['PROJECT_ID']
MODEL_VERSION = os.environ['MODEL_VERSION']
PROJECT_NAME = os.environ['PROJECT_NAME']

_IS_SCHEDULED = True  # if pipeline is scheduled or not

# get pipeline name from file name
PIPELINE_NAME = Path(__file__).name.split(".")[0]

# [fixed] set up pipeline specific configs
pipeline_conf = PipelineConfig(
    name=PIPELINE_NAME,
    project_id=PROJECT_ID,
    project_name=PROJECT_NAME,
    model_version=MODEL_VERSION,
    description="pipeline_description",
    enable_caching="pipeline_enable_caching"
)
if _IS_SCHEDULED:
    pipeline_conf.set_scheduler(
        schedule="scheduler_schedule",
        description="scheduler_description",
        is_pause="scheduler_is_pause"
    )


# define pipeline parameter
pipeline_parameter = {
    "param_1": "aaa",
    "param_2": "bbb"
}


# global variable for pipeline compile
PIPELINE_LOCATION = pipeline_conf.location
PIPELINE_ID = pipeline_conf.id


@kfp.dsl.pipeline(
    name=PIPELINE_ID, 
    description=pipeline_conf.description, 
    pipeline_root=pipeline_conf.root
    )
def pipeline(
    param_1: str,
    param_2: str
    ):
    
    # load components
    component_store = kfp.components.ComponentStore(
        local_search_paths=[
            "component/",  # local directory project specific component definition files locate
            "component/common/"  # local directory common component definition files locate
        ]
    )

    component_1_op = component_store.load_component('component_1')
    component_2_op = component_store.load_component('component_2')
    
    component_1_task = component_1_op(
        param_1=param_1
    )
    component_2_task = component_2_op(
        param_2=param_2
    ).after(component_1_task)
    
    
# deploy pipeline and its parameter
pipeline_deployer = PipelineDeployer(
    pipeline_config=pipeline_conf
)
pipeline_deployer.deploy(
    pipeline_func=pipeline,
    pipeline_parameter=pipeline_parameter
)

Here, PipelineDeployer executes the deployment process, and PipelineConfig is a data class containing the information necessary for deployment. These two are the interfaces of the deployment processing module from the pipeline’s point of view.

PipelineConfig, as mentioned above, holds default values, naming rules, and so on. For example, the naming rule for the pipeline display_name should be the same across projects, so the display_name is generated inside PipelineConfig from the passed model version, project name, and pipeline name.

In the actual code, configuration values are set in config files for each environment and loaded from the pipeline file, so that values can be set according to the environment.
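For example, this can be done by keeping one YAML file per environment and selecting it with an environment variable. A rough sketch (the file layout, keys, and the ENV variable are hypothetical):

import os
import yaml

# e.g. "dev" / "stg" / "prd", set as an environment variable from cloudbuild.yaml (hypothetical)
_ENV = os.environ["ENV"]


def load_pipeline_config(config_dir: str = "config") -> dict:
    # load the settings for the current environment, e.g. config/dev.yaml
    with open(os.path.join(config_dir, f"{_ENV}.yaml")) as f:
        return yaml.safe_load(f)


# keys such as schedule, enable_caching and description are then
# passed to PipelineConfig instead of being hard-coded in the pipeline file
config = load_pipeline_config()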

This file is executed on Cloud Build using the aforementioned Cloud Builder image as follows:

steps:
  - id: Deploy Pipeline
    name: "gcr_path_of_deployment_cloud_builder:latest"
    entrypoint: 'python'
    args: ['path_of_pipeline.py']
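The pipeline file reads PROJECT_ID, MODEL_VERSION and PROJECT_NAME from environment variables, so these have to be provided by the build step. One way to do this (a sketch, assuming the model version and project name are supplied as user-defined Cloud Build substitutions) is to add an env section to the step:

steps:
  - id: Deploy Pipeline
    name: "gcr_path_of_deployment_cloud_builder:latest"
    entrypoint: 'python'
    args: ['path_of_pipeline.py']
    env:
      - 'PROJECT_ID=$PROJECT_ID'           # built-in substitution
      - 'MODEL_VERSION=${_MODEL_VERSION}'  # user-defined substitution
      - 'PROJECT_NAME=${_PROJECT_NAME}'    # user-defined substitution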

In summary, the processes in the pipeline file at deployment are as follows:

  1. Push component images to Google Container Registry (GCR)
  2. Call the deployment processing module

The called deployment processing module then performs the following processing:

  1. Add configuration information to project’s component definition files
  2. Load common component definition files
  3. Compile pipeline and push to Google Cloud Storage (GCS)
  4. Call the function to create a job for one-time execution or create a scheduler for periodic execution
  5. Notify Slack of completed/failed jobs

In the case of a periodic run, the created scheduler calls the pipeline job creation function on the specified schedule, and the pipeline job is created.

Monitoring Pipeline Job Execution

As mentioned above, the pipeline job creation function and the pipeline deployment package send notifications in the following cases:

  • Scheduler creation completes or fails
  • Pipeline job creation completes or fails

In addition to these, a function to monitor the execution status of pipeline jobs is deployed on Cloud Functions. This function sends notifications to Slack in the following cases (a rough sketch follows the list):

  • When a pipeline job finishes
  • When a pipeline job fails
  • When a pipeline job continues to run for longer than a specified time
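The details are omitted here, but the monitoring boils down to listing recent pipeline jobs and checking their states. A rough sketch (the threshold and the Slack helper are hypothetical, and the real function also filters by creation time and keeps track of jobs it has already notified about):

import os
import datetime
import requests
from google.cloud import aiplatform
from google.cloud.aiplatform_v1.types import PipelineState

# hypothetical: provided as environment variables when deploying the function
_PROJECT_ID = os.environ["PROJECT_ID"]
_SLACK_WEBHOOK_URL = os.environ["SLACK_WEBHOOK_URL"]
_LOCATION = "asia-east1"
_MAX_RUNNING_HOURS = 6  # hypothetical threshold for "running too long"


def _notify_slack(text: str) -> None:
    requests.post(_SLACK_WEBHOOK_URL, json={"text": text})


def main(request):
    # Cloud Functions entry point: report finished, failed and long-running jobs
    aiplatform.init(project=_PROJECT_ID, location=_LOCATION)
    now = datetime.datetime.now(datetime.timezone.utc)

    # in practice, restrict the listing with a filter (e.g. on create_time)
    # and skip jobs that were already notified about
    for job in aiplatform.PipelineJob.list():
        if job.state == PipelineState.PIPELINE_STATE_SUCCEEDED:
            _notify_slack(f"pipeline job finished: {job.display_name}")
        elif job.state == PipelineState.PIPELINE_STATE_FAILED:
            _notify_slack(f"pipeline job failed: {job.display_name}")
        elif job.state == PipelineState.PIPELINE_STATE_RUNNING:
            running_hours = (now - job.create_time).total_seconds() / 3600
            if running_hours > _MAX_RUNNING_HOURS:
                _notify_slack(
                    f"pipeline job has been running for {running_hours:.1f}h: "
                    f"{job.display_name}")
    return "OK", 200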

We used the following article by ZOZO Technologies as a reference when developing this monitoring:

Kubeflow PipelinesからVertex Pipelinesへの移行による運用コスト削減 (Reducing operational costs by migrating from Kubeflow Pipelines to Vertex Pipelines)

Template Repository

As described above, the model training and batch prediction pipelines can be deployed by calling the deployment module, and the common components can be used for processes shared across projects. However, the loading and calling of the deployment processing module and the use of common components still have to be written in each project’s code. For this reason, we have prepared template repositories in which developers only need to implement the pipeline and component processing and change settings.

Kubeflow has a feature called Lightweight Python Components, which allows components to be defined as Python functions. This is a very useful feature for quick experiments, but using it in production code allows too much freedom in how code is written. To prevent that, the template repository restricts this feature and enforces the use of regular container components only.

In addition, the following separation is defined so that domain logic is not written in pipelines or components (task.py):

  • Specific processes such as data loading, model training and preprocessing are implemented as modules
  • Components (task.py) just call the modules
  • Pipelines describe a workflow of these components

Therefore, the structure of the directory containing pipelines, components and modules is simplified as follows:

.
├── module
├── component
│   ├── component_1
│   │   ├── task.py
│   │   └── component.yaml
│   └── component_2
│       ├── task.py
│       └── component.yaml
└── pipeline
    ├── train.py
    └── batch_predict.py
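Under this layout, a component’s task.py stays thin. A minimal sketch of component_1’s task.py (the module and argument names are hypothetical):

import argparse

# domain logic lives under module/ and is copied into the component image;
# the module name here is hypothetical
from module import preprocess


def main() -> None:
    parser = argparse.ArgumentParser()
    parser.add_argument("--param_1", type=str, required=True)
    args = parser.parse_args()

    # the component only wires arguments to the module
    preprocess.run(param_1=args.param_1)


if __name__ == "__main__":
    main()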

Separation of model training and batch prediction pipelines

As shown in the directory structure above, the model training pipeline and the batch prediction pipeline are defined in separate files. Therefore, pipeline jobs are also created separately for each.

After training is complete, the model training pipeline stores information about the trained models (e.g., the countries the model was trained for, the model’s version, and the training start time) together with a completion flag. The batch prediction pipeline waits for the completion flag to be created and then proceeds with prediction based on the stored model information. This is how the model training pipeline and the batch prediction pipeline are connected.
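For illustration, the waiting side can be a small component that polls for the flag. A rough sketch, assuming the completion flag is written as an object on GCS (the path format, polling interval, and timeout are hypothetical):

import time
from gcsfs import GCSFileSystem


def wait_for_completion_flag(
    flag_gcs_path: str,
    polling_interval_seconds: int = 60,
    timeout_seconds: int = 6 * 60 * 60
    ) -> None:
    # block until the training pipeline writes its completion flag to GCS
    fs = GCSFileSystem()
    waited = 0
    while not fs.exists(flag_gcs_path):
        if waited >= timeout_seconds:
            raise TimeoutError(
                f"completion flag not found within {timeout_seconds}s: {flag_gcs_path}")
        time.sleep(polling_interval_seconds)
        waited += polling_interval_seconds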

Standardization of Prediction Result Serving API

Up to this point, we have focused on the model training and batch prediction pipelines and have introduced standardization efforts for them. Finally, we would like to introduce our efforts to standardize the prediction result serving API as well.

The prediction result serving API simply fetches and returns the prediction results that the batch prediction pipeline stored in Firestore (Datastore mode), in response to a request from the product application.

The prediction result serving API is standardized by packaging the process of fetching prediction results from Firestore (Datastore mode).

The retrieval process takes one or more key values of the prediction results, looks up the corresponding entities in the Firestore (Datastore mode) Kind, and returns their contents. This hides the Firestore (Datastore mode) details from the caller. The package also absorbs the naming rules for Kinds and other conventions decided by the Machine Learning Engineer team.
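Inside the package, the fetch itself boils down to a keyed lookup. A minimal sketch (the class name and entity schema are hypothetical, and the actual package also encapsulates the Kind naming rules mentioned above):

from typing import List, Optional
from google.cloud import datastore


class PredictionFetcher:
    # fetches prediction results stored in Firestore (Datastore mode)

    def __init__(self, project_id: str, kind: str) -> None:
        self._client = datastore.Client(project=project_id)
        self._kind = kind

    def fetch(self, key_values: List[str]) -> List[Optional[dict]]:
        # build Datastore keys from the requested key values and look them up
        keys = [self._client.key(self._kind, key_value) for key_value in key_values]
        entities = self._client.get_multi(keys)
        found = {entity.key.name: dict(entity) for entity in entities}
        # preserve the request order; missing keys come back as None
        return [found.get(key_value) for key_value in key_values]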

This allows us to simply write the interface and, if necessary, additional formatting processes when developing individual projects.
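For example, an individual project’s endpoint might be as small as the following sketch, assuming FastAPI is used for the serving API on Cloud Run and reusing the hypothetical PredictionFetcher above (the actual framework, request schema, and formatting differ per project):

from typing import List
from fastapi import FastAPI
from pydantic import BaseModel

# hypothetical import of the fetcher from the shared package
from prediction_result_fetcher import PredictionFetcher

app = FastAPI()
fetcher = PredictionFetcher(project_id="project_id", kind="recommendation_kind_name")


class PredictionRequest(BaseModel):
    user_ids: List[str]
    top_n: int = 10


@app.post("/predictions")
def get_predictions(request: PredictionRequest):
    results = fetcher.fetch(key_values=request.user_ids)
    # project-specific formatting: trim each recommendation list to top_n
    # (the "items" field is a hypothetical part of the stored entity schema)
    return {
        user_id: (result["items"][:request.top_n] if result else [])
        for user_id, result in zip(request.user_ids, results)
    }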

For the prediction result serving API, we have also created a template repository, which contains basic API configuration, processing, and interface descriptions. The package is installed at the time of deployment.

Summary

We develop new configurations on a project-by-project basis when they are needed, and then standardize them if there is merit in doing so.

So far, we have been able to standardize machine learning projects that use batch prediction as described above, but we have not yet done so for projects that require online prediction, for example. We hope to introduce that architecture in a future article once it is finalized.

Other Articles on AnyMind’s MLOps

You can read more about our MLOps efforts at AnyMind in the following article:
