Tech Blog

Facebook Icon Twitter Icon Linkedin Icon

Facebook Icon Twitter Icon Linkedin Icon

[Tech Blog] Cloud Composer (Airflow) for Machine Learning Data Pipeline

Hi, I’m Naoki Komoto (河本 直起) working as Machine Learning Engineer in AnyMind.

In AnyMind, we’ve just started introducing machine learning functions to our products. So we are developing our MLOps environment, including the data pipeline from scratch.

In this article, I’d like to introduce our data pipeline using Cloud Composer (Airflow) including its current set up and future plans.

Problems

The dataset for the machine learning (ML) model training is originally stored in the product application’s RDB. When I joined the team, model training application directly fetches dataset from the RDB.

As RDB is chosen due to its suitability with the product application’s requirements and not machine learning, we encountered the following problems:

  • RDB does not tolerate the heavy data processing that comes with ML model training and ML experiment
  • Load on RDB is increasing as more ML functions are added

Requirements

From problems stated above, the most important requirement is to separate the product application’s dataset and the ML dataset, and store the ML dataset on a datastore more suited for ML purposes.

However, as we didn’t have much resources for development and we need the datastore as soon as possible – I decided to set the following minimum requirements as a start:

  • ML dataset is on BigQuery
  • The dataset is updated on a daily basis
  • Data processing is done using project level datasets and not a general dataset used between projects

Architecture

To satisfy these requirements, I decided to import the product application’s dataset and create a dataset for ML using the architecture below:

It’s done with Cloud Composer (Airflow). The reasons are explained below.

Why Cloud Composer (Airflow)?

I chose Cloud Composer (Airflow) as the data pipeline framework although I also considered using Vertex Pipelines (Kubeflow).

We already use Vertex Pipelines (Kubeflow) to train ML models in AnyMind. While Vertex Pipelines (Kubeflow) is a fully managed service and is easy to use without much knowledge on infrastructure, it’s still a new service. There are less commonly prepared components on Vertext Pipelines (Kubeflow) compared to Cloud Composer (Airflow) and we would need to develop them by ourselves to some extent.

We still don’t have enough knowledge or best practices on data / model creation. Because of this, it’s possible that our code base is too specific to one developer and becomes hard to generalize if the data creation is also done with Vertex Pipelines (Kubeflow). So, choosing Cloud Composer (Airflow) helps to avoid this. This is the first reason.

In ML development, data creation and model training workflows will never be in the same deployment unit. So, if these two are working on the same environment, it leads to a project’s code becoming too specific which then becomes hard to separate when there may be a need to do so. The second reason is to strictly separate them by using different pipeline services. This separation makes it easy to hand data pipelines over to other teams.

The advantage of needing lesser skillsets to use Vertex Pipelines (Kubeflow) comes from the fact that Data Scientists and Infrastructure Engineers usually don’t have common skillsets. However as an engineer developing data pipelines and hence having knowledge related to infrastructure, is also one of the reasons for choosing Cloud Composer (Airflow).

Importing product application’s dataset

I’ll explain the details of the architecture on the above figure focusing on the data importation part.

Importing to BigQuery from Cloud SQL

Product application’s dataset is stored in Cloud SQL.

There are two ways to import data from Cloud SQL to BigQuery:

  • A periodical bulk import of whole or a partial table snapshot
  • Data streaming of transactions on a table (like using Cloud DataFlow and Cloud Pub/Sub)

As a daily update of the data is expected, the first option is chosen.

BigQuery’s date-sharded table is created using BigQuery’s federated queries like in the code sample below. Using a data-sharded table and not a data-partitioned table, makes it easier to transport to Cloud Storage.

query = '''
SELECT *
FROM EXTERNAL_QUERY(
    "projects/PROJECT_ID/locations/LOCATION/connections/CONNECTION_NAME",
    "SELECT * FROM RDB_TABLE_NAME")
'''

import_to_bq_task = BigQueryOperator(
    task_id='import_to_bq',
    sql=query,
    destination_dataset_table=PROJECT_ID.DATASET.BQ_TABLE_NAME_{{ next_ds_nodash }}",
    write_disposition="WRITE_TRUNCATE",
    create_disposition="CREATE_IF_NEEDED",
    allow_large_results=True,
    use_legacy_sql=False
)

In addition, there are two types of product application tables

  • master data table
  • log data table

The whole master data table is updated on every run. For log data tables, it has an updated time column which based on it, only updates records from the previous run are fetched and appended to the existing table.

Leaving only latest records

Imported datasets on BigQuery is used for multiple ML projects. For log data table, old records are also stored on the dataset since all updated records are appended.

Because we want to avoid a case where all projects have their own logic to fetch only the latest records, tables with only the latest records are created after being imported to BigQuery like in the sample code below.

query = '''
SELECT * EXCEPT(rn)
FROM (
    SELECT 
        *,
        row_number() over (PARTITION BY id ORDER BY updated DESC) AS rn
    FROM (
        SELECT * FROM PROJECT_ID.DATASET.INTERMEDIATE_TABLE_NAME_{{ next_ds_nodash }}
        UNION ALL
        SELECT * FROM PROJECT_ID.DATASET.TABLE_NAME_{{ ds_nodash }}
    )
)
WHERE rn = 1
'''

create_table_with_only_latest_record_task = BigQueryOperator(
    task_id='create_table_with_only_latest_record',
    sql=query,
    destination_dataset_table="PROJECT_ID.DATASET.TABLE_NAME_{{ next_ds_nodash }}",
    write_disposition="WRITE_TRUNCATE",
    create_disposition="CREATE_IF_NEEDED",
    allow_large_results=True,
    use_legacy_sql=False
)

Solving region differences

Created datasets are used as the input to ML model training pipelines on Vertex AI. However the number of regions supported are limited and our product application’s Cloud SQL is not one of them.

Also, on the region that Vertex Pipelines are located, cross-region data creation is not possible (it’s possible on us multi-region).

That is why in the above architecture, datasets are imported to Cloud Storage in order to enable cross-regional data imports.

Creating Cloud Composer Environment

It is necessary to prepare an environment for Cloud Composer. For that, I prepared another repository to manage the environment’s settings. In this repository, settings are written as substitutions in Cloud Build file so that we can manage settings on code like in the sample below:

For switching settings between production, staging or development environment, it’s managed by giving variables as substitution when Circle CI kickes Cloud Build.

# cloudbuild.yaml

options:
  dynamic_substitutions: true

substitutions:
  _COMPOSER_ENVIRONMENT_NAME: your_environment_name
  _COMPOSER_IMAGE_VERSION: composer-2.0.4-airflow-2.2.3
  _SERVICE_ACCOUNT: your_service_account
  _COMPOSER_LOCATION: your_location
  _COMPOSER_ENVIRONMENT_SIZE: medium
  _COMPOSER_MAX_WORKERS: "10"
  # given from Circle CI file
  _PROFILE: development
  # environment variable to be set in comoposer environment
  _COMPOSER_ENV_VARIABLES: PROFILE=${_PROFILE}

steps:    
  - id: create composer environment if not exist
    name: 'gcr.io/cloud-builders/gcloud'
    entrypoint: 'bash'
    args: [
      '-c', 'gcloud composer environments create $_COMPOSER_ENVIRONMENT_NAME
      --location=$_COMPOSER_LOCATION --service-account=$_SERVICE_ACCOUNT --image-version=$_COMPOSER_IMAGE_VERSION
      || exit 0']
    
  - id: reset composer env variables if changed
    name: 'gcr.io/cloud-builders/gcloud'
    entrypoint: 'bash'
    args: [
      '-c', 'gcloud composer environments update $_COMPOSER_ENVIRONMENT_NAME
      --location=$_COMPOSER_LOCATION
      --update-env-variables=$_COMPOSER_ENV_VARIABLES
      || exit 0']
  
  - id: upgrade composer environment if changed
    name: 'gcr.io/cloud-builders/gcloud'
    entrypoint: 'bash'
    args: [
      '-c', 'gcloud beta composer environments update $_COMPOSER_ENVIRONMENT_NAME
      --location=$_COMPOSER_LOCATION
      --image-version=$_COMPOSER_IMAGE_VERSION
      || exit 0']
    
  - id: change composer environment settings if changed (max workers)
    name: 'gcr.io/cloud-builders/gcloud'
    entrypoint: 'bash'
    args: [
      '-c', 'gcloud beta composer environments update $_COMPOSER_ENVIRONMENT_NAME
      --location=$_COMPOSER_LOCATION
      --max-workers=$_COMPOSER_MAX_WORKERS
      || exit 0']

  - id: change composer environment settings if changed (environment size)
    name: 'gcr.io/cloud-builders/gcloud'
    entrypoint: 'bash'
    args: [
      '-c', 'gcloud beta composer environments update $_COMPOSER_ENVIRONMENT_NAME
      --location=$_COMPOSER_LOCATION
      --environment-size=$_COMPOSER_ENVIRONMENT_SIZE
      || exit 0']

  - id: update pypi packages
    name: 'gcr.io/cloud-builders/gcloud'
    entrypoint: 'bash'
    args: [
      '-c', 'gcloud composer environments update $_COMPOSER_ENVIRONMENT_NAME
      --location=$_COMPOSER_LOCATION
      --update-pypi-packages-from-file=settings/anytag/requirements.txt
      || exit 0']

Confirming if the import is completed

Following the ML model training pipeline on Vertex Pipelines (Kubeflow), we need to confirm if DAG on Cloud Composer (Airflow) when dataset imports are finished.

To do this, a flag file with time information is created on Cloud Storage when importing is completed, as per the figure below. Pipeline checks if the flag file is created or not. If it is not created, it waits for its creation.

Future plans

In the short term, these are the two tasks:

  • Create general feature store for ML
  • More frequent update of tables

For the first task, since there is a possibility that huge table schema changes may happen on the product’s application and our lack of development resources, I looked for a way to import the product’s application data tables as they are and each ML project creates its own ML input table based on the them.

So, I’m planning to prepare a general feature store for every ML project with structures suited to ML model training and model serving.

For the next task, currently this is updated on a daily basis. I plan to save the transactions of the dataset and update ML dataset table more frequently when online predictions are needed.

After the above two tasks are done, as a final plan, I am planning to make a Firestore (Datastore) unified interface for both ML prediction API and ML model training pipeline. The planed architecture is below.

Summary

For this time, because we set out the minimum requirements and Google Cloud Platform services made it easy to develop, it took just 3 man-days to develop the whole architecture.

While it was easy to develop, in the longer term this is not the best architecture as stated in future plans.

Latest News