
[Tech Blog] Vertex Pipelines (Kubeflow) for Machine Learning Model Training Pipeline

Hi, I’m Naoki Komoto (河本 直起), working as a Machine Learning Engineer at AnyMind.

At AnyMind, we are developing our MLOps environment, including the model training pipeline, from scratch.

In a previous article, I introduced our data pipeline built on Cloud Composer (Airflow), including its current setup and future plans.

In this article, I’d like to introduce our ML model training pipeline using Vertex Pipelines (Kubeflow).

Background / Problems

First, let me explain the background and the problems we faced before introducing Vertex Pipelines (Kubeflow).

Individualization of Model Training and Model Serving Code

Previously at AnyMind, each project had its own way of training and serving models, depending on the developer. For example, in one project a model was trained in a notebook environment and pushed to the repository, while in another a single batch function handled the entire training. In addition, there was no functionality to schedule model training, or to consistently train models, deploy them to the API, and run batch prediction; each of these was done in its own way, including manual execution.

Therefore, we needed a common model training infrastructure that provides the necessary functionality, reducing project-specific implementations and lowering the cost of operation and improvement.

Need for Workflow Processing

AnyMind’s services are offered across languages and countries. At the same time, a platform such as AnyTag needs to provide the same functions across multiple social networks, such as Instagram and YouTube. Different social networks have different features, the same feature may need to be processed differently for each language, and models need to be created for each country.

For these reasons, the model training process for a single machine learning function requires workflow capabilities such as branching and parallel processing. However, the previous environment could not support such processing.

Time Required for Experimentation and for Rewriting Experimentation Code

Given this background, model experimentation required multiple processing patterns, which made the experimentation process time-consuming to run.

Another problem that arises in general development involving machine learning models is that the code used for experimentation must be rewritten for production after experimentation is complete. In addition to the execution time of the experimentation process, the time required for this rewriting was also a bottleneck.

Why Vertex Pipelines (Kubeflow)?

Vertex Pipelines (Kubeflow) was well suited to these problems because of the following features:

  • Ability to develop the model training and serving process as a workflow
  • Reusability of code as components
  • Reuse of cached results from previous runs when a component is executed with the same inputs

Additionally, at AnyMind we develop and manage multiple ML functions with a small team. Because Vertex Pipelines (Kubeflow) is a fully managed service, it requires less knowledge, a smaller skill set, and fewer resources to maintain. That is another reason we chose Vertex Pipelines (Kubeflow).

On the other hand, some requirements are not satisfied by Vertex Pipelines (Kubeflow). For example, Kubeflow’s current Python SDK (kfp v2) does not provide an interface for scheduled execution. However, since it turned out to be easy to build this ourselves using Google Cloud’s Python APIs, we decided to implement it on our own.

Implementation Details

In the following, I will describe how we actually implemented the system.

Deployment Unit Isolation

Currently, the model training pipeline has a separate repository for each project, and components are also created on a per-project basis. The reason is that there are currently few processes common to all projects, so there is little advantage in sharing them, and because best practices for implementation have not yet been established, the risk of premature commonization is high.

By storing the model training process and the prediction process in the same repository, common steps such as preprocessing can be shared between them. This has allowed us to eliminate processing discrepancies between the model training process and the prediction process.
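
As a rough sketch, the repository layout implied by the build configurations shown later in this post looks like the following (directory and file names are taken from the paths in those configs; main.py is a hypothetical placeholder for a component’s process):

image/
  workflow/
    component/
      component_name/
        Dockerfile
    pipeline/
      Dockerfile
src/
  workflow/
    component/
      component_name/
        component.yaml   # component interface
        main.py          # component process (hypothetical file name)
    pipeline/
      pipeline.py        # pipeline definition and build process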

Building Components

In Kubeflow, there are two ways to write components:

  • Defining the interface in a YAML file and the process in a Python file
  • Defining the interface and process as a Python function

All project-specific components, such as model creation, are written in the former way, because it keeps the separation of images and directories clear and makes it easy to reuse components between projects.

On the other hand, components that depend on the pipeline, such as a component that waits for a previous job’s completion or one that gets the execution time, are written in the latter way. These components are placed in the same directory as the pipeline and run on the same image as the pipeline.
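
For illustration, a pipeline-dependent component of the latter kind might look roughly like the following. This is a minimal sketch: the function name and its logic are assumptions, and the loading call mirrors the create_component_from_func usage shown in the pipeline code later in this post.

import kfp


def get_executed_time() -> str:
    # imports must live inside the function body for function-based components
    from datetime import datetime
    return datetime.utcnow().strftime("%Y%m%d%H%M%S")


# loaded in the pipeline definition and run on the pipeline's own image
get_executed_time_op = kfp.components.create_component_from_func(
    func=get_executed_time, base_image="gcr path to pipeline image")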

Component images are built by Cloud Build as follows, and the built images are stored in Container Registry:

substitutions:
  _VERSION: version_of_pipeline
  _PROJECT_NAME: project_name
  # from circle ci
  _COMMIT_SHA: none

steps:
  - id: pull cache component image
    name: 'gcr.io/cloud-builders/docker'
    entrypoint: 'bash'
    args: ['-c', 'docker pull asia.gcr.io/$PROJECT_ID/$_PROJECT_NAME/$_VERSION/workflow/component/component_name:latest || exit 0']
  - id: build component image
    name: 'gcr.io/cloud-builders/docker'
    args: [
        'build',
        '-f', './image/workflow/component/component_name/Dockerfile',
        '-t', 'asia.gcr.io/$PROJECT_ID/$_PROJECT_NAME/$_VERSION/workflow/component/component_name:latest',
        '-t', 'asia.gcr.io/$PROJECT_ID/$_PROJECT_NAME/$_VERSION/workflow/component/component_name:$_COMMIT_SHA',
        '--cache-from', 'asia.gcr.io/$PROJECT_ID/$_PROJECT_NAME/$_VERSION/workflow/component/component_name:latest',
        '.'
    ]
  - id: push component image to gcr
    name: 'gcr.io/cloud-builders/docker'
    args: ['push', 'asia.gcr.io/$PROJECT_ID/$_PROJECT_NAME/$_VERSION/workflow/component/component_name:latest']

images:
  - 'asia.gcr.io/$PROJECT_ID/$_PROJECT_NAME/$_VERSION/workflow/component/component_name:$_COMMIT_SHA'

Building the Pipeline

The pipeline build process is written in a Python file, together with the workflow definition and parameters.

The build consists of the following steps:

  • Overwrite component definition files
  • Compile pipeline
  • Upload compiled pipeline and its parameters to Cloud Storage
  • For one-time execution, publish a message to the Cloud Pub/Sub topic for pipeline job creation
  • For scheduled execution, publish a message to the Cloud Pub/Sub topic for Cloud Scheduler creation

The details of each step are described later.

The following is sample code for building the pipeline:

import os
import json
import kfp
from kfp.v2.compiler import Compiler
from gcsfs import GCSFileSystem
from google.cloud import pubsub


# set to image from cloud build
PROJECT_ID = os.environ['PROJECT_ID']
VERSION = os.environ['VERSION']
PROJECT_NAME = os.environ['PROJECT_NAME']


# sample pipeline parameters
PIPELINE_PARAMETERS = {
    "param1": "value_of_param1",
    "param2": "value_of_param2",
}


# sample pipeline definition
@kfp.dsl.pipeline(
    name="id of pipeline", 
    description="description of pipeline", 
    pipeline_root="root of pipeline"
    )
def sample_pipeline(
    param1: str,
    param2: str
    ):
    
    # load light weight components
    component1_op = kfp.components.create_component_from_func(
        func=component1, base_image="gcr path to pipeline image",
    )

    # load components from yaml file
    components_store = kfp.components.ComponentStore(
        local_search_paths=["src/workflow/component"])
    component2_op = components_store.load_component('component2')

    # define workflow
    component1_task = component1_op(
        param1=param1
    )
    component2_task = component2_op(
        param2=param2
    ).after(component1_task)


# compile and save pipeline
Compiler().compile(
    pipeline_func=sample_pipeline,
    package_path="pipeline.json"
)


# save parameter
with open('pipeline_values.json', 'w') as f:
    json.dump(PIPELINE_PARAMETERS, f)


# put pipeline and values to cloud storage
fs = GCSFileSystem()
fs.put("pipeline.json", "storage path of pipeline")
fs.put("pipeline_values.json", "storage path of pipeline parameter")


# publish message to cloud pub/sub
publisher = pubsub.PublisherClient()
IS_ENABLE_SCHEDULING = True  # if pipeline is scheduled or not
if not IS_ENABLE_SCHEDULING:
    # if schedule is not enabled, just run once
    topic_message = {
        "version": "version of pipeline",
        "pipeline": {
            "location": "location of pipeline",
            "pipeline_spec_uri": "storage path of pipeline",
            "pipeline_root": "root of pipeline",
            "pipeline_parameter_values_uri": "storage path of pipeline parameter",
            "enable_caching": False
        }
    }
    topic_path = publisher.topic_path(PROJECT_ID, "name of pipeline creation topic")
else:
    topic_message = {
        "version": "version of pipeline",
        "scheduler": {
            "location": "location of scheduler",
            "schedule": "cron schedule of scheduler",
            "description": "desctiption of scheduler",
            "time_zone": "time zone of scheduler",
            "is_pause": False
        },
        "pipeline": {
            "location": "location of pipeline",
            "pipeline_spec_uri": "storage path of pipeline",
            "pipeline_root": "root of pipeline",
            "pipeline_parameter_values_uri": "storage path of pipeline parameter",
            "enable_caching": False
        }
    }
    topic_path = publisher.topic_path(PROJECT_ID, "name of scheduler creation topic")

future = publisher.publish(topic_path, json.dumps(topic_message).encode("utf-8"))
message_id = future.result()

The above process is executed from Cloud Build as follows:

substitutions:
  _VERSION: version_of_pipeline
  _PROJECT_NAME: project_name
  # from circle ci
  _COMMIT_SHA: none

steps:
  # pipeline
  - id: pull cache pipeline image
    name: 'gcr.io/cloud-builders/docker'
    entrypoint: 'bash'
    args: ['-c', 'docker pull asia.gcr.io/$PROJECT_ID/$_PROJECT_NAME/$_VERSION/workflow/pipeline:latest || exit 0']
  - id: build pipeline image
    name: 'gcr.io/cloud-builders/docker'
    args: [
        'build',
        '-f', './image/workflow/pipeline/Dockerfile',
        '--build-arg', 'PROJECT_ID=$PROJECT_ID', 
        '--build-arg', 'VERSION=$_VERSION', 
        '--build-arg', 'PROJECT_NAME=$_PROJECT_NAME', 
        '-t', 'asia.gcr.io/$PROJECT_ID/$_PROJECT_NAME/$_VERSION/workflow/pipeline:latest',
        '-t', 'asia.gcr.io/$PROJECT_ID/$_PROJECT_NAME/$_VERSION/workflow/pipeline:$_COMMIT_SHA',
        '--cache-from', 'asia.gcr.io/$PROJECT_ID/$_PROJECT_NAME/$_VERSION/workflow/pipeline:latest',
        '.'
    ]
  - id: push pipeline image
    name: 'gcr.io/cloud-builders/docker'
    args: ['push', 'asia.gcr.io/$PROJECT_ID/$_PROJECT_NAME/$_VERSION/workflow/pipeline:latest']
  - id: deploy pipeline
    name: "asia.gcr.io/$PROJECT_ID/$_PROJECT_NAME/$_MODEL_VERSION/workflow/pipeline:latest"
    entrypoint: 'python'
    args: ['src/workflow/pipeline/pipeline.py']

Overwriting component definition file

The component definition file contains the Container Registry path of the execution image, as shown below. However, components are deployed to a separate GCP project for each environment, such as staging and production.

name: Sample Component

inputs:
- name: param1
  type: String

implementation:
  container:
    image: asia.gcr.io/{project_id}/{project_name}/{version}/workflow/component/batch_predict:latest
    args: [
      --param1, {inputValue: param1},
    ]

Therefore, the project ID is overwritten when the pipeline is built. At the same time, the release version is also reflected.

Specifically, the following process is added to the above pipeline building process.

import glob
import yaml


def overwrite_component_definition_file(project_id, project_name, version):
    component_files = glob.glob('src/workflow/component/*/component.yaml')
    for component_file in component_files:
        with open(component_file, 'r') as f:
            data = yaml.safe_load(f)
        data["implementation"]["container"]["image"] = data["implementation"]["container"]["image"].format(
            project_id=project_id, project_name=project_name, version=version)
        with open(component_file, "w") as f:
            yaml.dump(data, f)


# set variables to components
overwrite_component_definition_file(
    project_id=PROJECT_ID,
    project_name=PROJECT_NAME,
    version=VERSION
)

Creating pipeline job

The process of creating pipeline jobs is separated out into Cloud Functions and triggered by publishing a message to a Cloud Pub/Sub topic, as shown below.

The message contains the following information:

  • Cloud Storage path where the compiled pipeline file is saved
  • Cloud Storage path where the pipeline parameters are saved
  • Other information, such as the release version

By publishing a message with this information to the Cloud Pub/Sub topic, the Cloud Function creates a pipeline job based on it.

The process inside Cloud Functions is as follows:

import os
import json
import base64
from gcsfs import GCSFileSystem
from google.cloud import aiplatform

PROJECT_ID = os.getenv('PROJECT_ID')
PIPELINE_SERVICE_ACCOUNT = "pipeline_service_account"

def main(event, context):
    # decode http request payload and translate into JSON object
    settings = json.loads(base64.b64decode(event['data']).decode('utf-8'))

    # get arguments
    display_name = settings['project_name'] + "-" + settings["model_version"]
    pipeline_spec_uri = settings["pipeline"]['pipeline_spec_uri']
    pipeline_parameter_values_uri = settings["pipeline"]['pipeline_parameter_values_uri']
    pipeline_root = settings["pipeline"]["pipeline_root"]
    location = settings["pipeline"]["location"]
    enable_caching = settings["pipeline"]["enable_caching"]

    # load pipeline parameters
    fs = GCSFileSystem()
    with fs.open(pipeline_parameter_values_uri, 'r') as f:
        parameter_values = json.load(f)

    # initialize aiplatform client
    aiplatform.init(
        project=PROJECT_ID,
        location=location,
    )

    # create pipeline job
    job = aiplatform.PipelineJob(
        display_name=display_name,
        template_path=pipeline_spec_uri,
        pipeline_root=pipeline_root,
        enable_caching=enable_caching,
        parameter_values=parameter_values
    )
    job.submit(
        service_account=PIPELINE_SERVICE_ACCOUNT
    )

Creating a Scheduler for Periodic Execution

Pipeline jobs can be created periodically using Cloud Scheduler. As shown below, the Cloud Scheduler job holds the information needed to create the above pipeline job as its message body, and publishes it to the above Cloud Pub/Sub topic on the configured schedule.

The Cloud Scheduler job itself is created by a Cloud Function, which, like pipeline creation, is triggered through Cloud Pub/Sub. Once created, the scheduler periodically publishes the pipeline message, which in turn triggers periodic pipeline job creation.

Cloud Scheduler is created as follows:

import os
import json
import base64
from google.cloud import scheduler_v1

PROJECT_ID = os.getenv('PROJECT_ID')
PUBSUB_TOPIC_ID = "scheduler_creation_topic_id"


def main(event, context):
    # decode http request payload and translate into JSON object
    settings = json.loads(base64.b64decode(event['data']).decode('utf-8'))

    job_id = settings['project_name'] + "-" + settings["model_version"]
    location = settings["scheduler"]["location"]
    time_zone = settings["scheduler"]["time_zone"]
    description = settings["scheduler"]["description"]
    schedule = settings["scheduler"]["schedule"]
    scheduler_name = f"projects/{PROJECT_ID}/locations/{location}/jobs/{job_id}"

    pipeline_settings = {}
    pipeline_settings["project_name"] = settings['project_name']
    pipeline_settings["model_version"] = settings['model_version']
    pipeline_settings["name_postfix"] = settings['name_postfix']
    pipeline_settings["pipeline"] = settings["pipeline"]

    # create pipeline settings
    string_pipeline_settings = json.dumps(pipeline_settings) 
    pubsub_target = scheduler_v1.PubsubTarget(
        topic_name=f"projects/{PROJECT_ID}/topics/{PUBSUB_TOPIC_ID}",
        data=string_pipeline_settings.encode("utf-8")
    )

    # create scheduler job
    scheduler_job = scheduler_v1.Job(
        name=scheduler_name,
        description=description,
        schedule=schedule,
        pubsub_target=pubsub_target,
        time_zone=time_zone
    )
    client = scheduler_v1.CloudSchedulerClient()

    # get existing scheduler
    list_job_request = scheduler_v1.ListJobsRequest(
        parent=f"projects/{PROJECT_ID}/locations/{location}",
    )
    page_result = client.list_jobs(request=list_job_request)
    exist_jobs = [i.name for i in page_result]

    # delete existing scheduler
    if scheduler_name in exist_jobs:
        delete_job_request = scheduler_v1.DeleteJobRequest(
            name=scheduler_name
        )
        response = client.delete_job(request=delete_job_request)
        print(f"deleted: {response}")

    # create new scheduler
    create_job_request = scheduler_v1.CreateJobRequest(
        parent=f"projects/{PROJECT_ID}/locations/{location}",
        job=scheduler_job
    )
    response = client.create_job(request=create_job_request)

    # pause scheduler if needed
    if settings["scheduler"]["is_pause"]:
        pause_job_request = scheduler_v1.PauseJobRequest(
            name=scheduler_name
        )
        response = client.pause_job(request=pause_job_request)

Overall Build Process

To summarize what has been said so far, the overall build process for the periodic execution of a pipeline is as follows:

Cloud Build is submitted from CircleCI, which passes information about the target environment (staging, production, etc.) as a substitution depending on the branch. The build process then uses this information to separate values by environment, for example by loading the config file for the corresponding environment.
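
For example, the CircleCI job might submit the build roughly like this (a sketch: the substitution values are illustrative, and in practice an additional environment-specific substitution or config would be selected depending on the branch):

gcloud builds submit . \
  --config=cloudbuild.yaml \
  --substitutions=_VERSION=v1,_PROJECT_NAME=sample_project,_COMMIT_SHA=$CIRCLE_SHA1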

Connection with Other Workflows

Below I describe how the model training pipeline connects to the data pipeline and batch prediction.

Data Creation Workflow

At AnyMind, the data creation process is isolated in Cloud Composer (Airflow). For this reason, the data creation pipeline writes a flag file with time information when it finishes processing, and the model training pipeline has a component that waits until this flag is created and completes once its creation is confirmed. The subsequent components run only after this component has completed.
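
As an illustration, the waiting component might look roughly like the following. This is a minimal sketch, assuming the data creation pipeline writes a flag file to Cloud Storage; the flag path, polling interval, and timeout are assumptions.

def wait_for_data_creation(flag_uri: str, timeout_seconds: int = 3600) -> None:
    # poll Cloud Storage until the flag file written by the data pipeline appears
    import time
    from gcsfs import GCSFileSystem

    fs = GCSFileSystem()
    waited = 0
    while not fs.exists(flag_uri):
        if waited >= timeout_seconds:
            raise TimeoutError(f"flag file not found: {flag_uri}")
        time.sleep(60)
        waited += 60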

Batch Inference Creation Workflow

Batch prediction with trained models and registration of the results to the data store are done by a separate pipeline job on Vertex Pipelines (Kubeflow). This is because the frequency of model training and the frequency of batch prediction may differ; for example, a model may be updated once a month while batch prediction is performed once a day.

As with the data creation process, the model training pipeline creates flags with time information after the overall process completes, and the batch prediction pipeline has a component that waits until those flags have been created, following the same waiting pattern sketched above. Batch prediction is therefore executed only after all models have been trained.

Summary

In this article, I have introduced our implementation of Vertex Pipelines (Kubeflow) as part of the machine learning system at AnyMind. I hope you find it useful.
