[Tech Blog] BigQuery View Table for Machine Learning Feature Store

Hi, I’m Naoki Komoto (河本 直起), working as a Machine Learning Engineer at AnyMind.

At AnyMind, we are developing our MLOps environment from scratch, and in a previous article we introduced our machine learning batch prediction architecture using Vertex AI.

In this article, continuing from that work, we introduce the data infrastructure for machine learning that we are currently using.

Background

Cloud Composer (Airflow) for Machine Learning Data Pipeline

As noted in that article, the model training pipeline used to load datasets directly from the product-side RDB, as shown below.

The problem was that the RDB was chosen to meet product requirements and was not suited to processing data for machine learning. We solved this by introducing the following functions, with Cloud Composer (Airflow) as the data pipeline:

  • A function to copy datasets from the RDB to BigQuery
  • A function to create model training datasets from the copied data

This enabled us to separate the datasets for machine learning from the datasets for the product, and to run data processing dedicated to machine learning.

Since that article, we have handed over the RDB-to-BigQuery data transfer to the product-side development team, and we have added a process that creates a common dataset for machine learning. The current architecture is as follows.

The common datasets are defined in a form that can be used across machine learning projects, absorbing product dataset-specific rules and aggregating various statistics. The common dataset also references the batch prediction results of each project, so that a project's model can be trained on the prediction results of other projects.

The completion of the batch prediction workflow and other upstream workflows is confirmed by a flag file, which each workflow creates in Google Cloud Storage (GCS) when it finishes.
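As a minimal sketch of this mechanism (the bucket and flag object names are hypothetical), each workflow writes an empty flag object when it finishes, and dependent workflows check for its existence:

from google.cloud import storage

# hypothetical bucket and flag object names, for illustration only
FLAG_BUCKET = "ml-workflow-flags"
FLAG_PATH = "batch_prediction/2023-01-01/_SUCCESS"


def write_completion_flag(bucket_name: str, flag_path: str) -> None:
    """Called by a workflow after it completes."""
    client = storage.Client()
    client.bucket(bucket_name).blob(flag_path).upload_from_string("")


def is_workflow_completed(bucket_name: str, flag_path: str) -> bool:
    """Called by dependent workflows to confirm that the upstream workflow has finished."""
    client = storage.Client()
    return client.bucket(bucket_name).blob(flag_path).exists()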

Problems

The following problems were identified while operating Cloud Composer (Airflow).

  • Knowledge of Airflow is a must during development
  • Difficulty in separating data processing from model training process

Regarding the first, our machine learning engineers are in charge of everything from problem setup, model investigation and validation to development and operation, and the scope of development includes data creation, model training, and model serving. This made it necessary to reduce the amount of knowledge required for development, so that more resources could be devoted to the essential tasks: problem formulation, model investigation, and validation. In addition, Airflow was more than we needed, because the data creation pipeline for machine learning does not involve processing with very complex dependencies.

Regarding the second, we intended to strictly separate the scope of the data creation pipeline (Cloud Composer) from that of the model training pipeline (Vertex Pipelines) by preparing different environments for the two, so that the input data would not depend on the model training process. However, although the only processing originally envisioned for Airflow was pipeline execution of SQL queries, Airflow could also implement model training processing, so development proceeded in a way that blurred the distinction between the two.

Why BigQuery View Table?

The above problems can be solved with a framework that can register SQL queries and resolve their dependencies.

BigQuery View Tables allow you to register a SQL query and obtain its results simply by reading the table, and we found that dependencies between tables can be resolved by devising a suitable deployment method. In addition, BigQuery View Tables are referenced in the same way as regular tables, eliminating the need to modify the model training pipelines that already reference those tables. For these reasons, we decided to build our data infrastructure for machine learning on BigQuery View Tables.

The use of BigQuery View Table also provided the following benefits:

  • Eliminates the need for the model training pipeline to wait for the data creation pipeline to finish processing
  • Schedule changes can be made in one place

A BigQuery View Table executes the registered SQL query at the time it is referenced, retrieving the latest data at that point. Regarding the first benefit, with the data creation pipeline the model training pipeline had to detect its completion; this is not necessary with BigQuery View Tables. There is also no need for the monitoring and recovery that the data creation pipeline required, and if processing of a BigQuery View Table fails due to a network error or similar, the retry can simply be executed from the model training pipeline. This has reduced operational costs, including psychological ones.
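As a minimal sketch of such a retry on the model training pipeline side (the View table name is hypothetical), the view is read exactly like a regular table, and the registered query runs at read time:

import time

import pandas as pd
from google.cloud import bigquery

# hypothetical fully qualified View table name, for illustration only
FEATURE_VIEW = "my-project.ml_project_dataset.training_features"


def load_training_features(max_retries: int = 3) -> pd.DataFrame:
    """Read the View table; the latest data is returned because the query runs at read time."""
    client = bigquery.Client()
    query = f"SELECT * FROM `{FEATURE_VIEW}`"
    for attempt in range(max_retries):
        try:
            return client.query(query).to_dataframe()
        except Exception:
            # e.g. transient network errors: simply retry from the training pipeline
            if attempt == max_retries - 1:
                raise
            time.sleep(30)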

Regarding the second benefit, when using Cloud Composer both the model training pipeline and the data creation pipeline had to be rescheduled every time the overall execution schedule changed. Now we simply reschedule the model training pipeline. Having only one change point reduces the risk of an omitted change.

Architecture

The overall architecture is as follows. The parts that previously used Cloud Composer are all replaced with BigQuery View Tables. The next section describes why we separated the datasets in this way.

Separating Datasets

Stopping the use of Cloud Composer makes it harder to describe dependencies between table creation jobs. To prevent tables from referencing each other in arbitrary ways and to clarify the role of each table, we separated the datasets according to the following rules.

  1. Dataset for tables that reference product datasets: refer to the product datasets and resolve product dataset-specific rules
  2. Dataset for intermediate tables common to all projects: refer to 1. and generalize the tables for subsequent final table creation
  3. Dataset for final tables common to all projects: refer to 2. and perform join processing between tables
  4. Dataset for tables that reference batch prediction results: refer to the batch prediction log tables and transform them into a usable form
  5. Dataset for project-specific tables: refer to 3. and 4. and create the tables for a specific project that the model training pipeline will reference

We also allow a table to reference other tables in the same dataset, as long as each table is created in the correct dataset.

Registering BigQuery View Tables

query
├── dataset_name_a
│       ├── table_name_1.sql
│       └── table_name_2.sql
└── dataset_name_b
        └── table_name_3.sql

archived_query
├── dataset_name_a
│       └── table_name_4.sql
└── dataset_name_b

The above directory structure is prepared in the repository from which View tables are deployed. When a SQL query is placed under query/target_dataset_name/ as table_name.sql, the query is registered as a View table. If you modify a registered .sql file, the changes are reflected in the View table.

To delete an existing View table, move the target table's .sql file under archived_query/target_dataset_name/. This could also have been implemented by deleting any table whose query no longer exists in the directory, but we implemented it by archiving the query instead, because we wanted to handle cases where data generation is stopped once and the table is needed again later. In that case, you simply move the target file from archived_query/target_dataset_name/ back to query/target_dataset_name/.

Deployment

The View table is deployed with the following code.

Here, if a table is deployed at the same time as the tables that reference it, creation may fail depending on the order of execution, because a referenced table may not exist yet. Therefore, we deploy recursively, retrying the failed tables so that tables whose referenced tables already exist are created first.

import os
import glob
import dataclasses
from typing import ClassVar, List
from pyhocon import ConfigFactory
from google.cloud import bigquery
from google.cloud.exceptions import NotFound


# pass project id as environment variable to Docker Image from cloudbuild.yaml
PROJECT_ID = os.environ['PROJECT_ID']

# glob patterns for the query directories shown above; assumed here, but these
# could instead be loaded from a HOCON config file via ConfigFactory.parse_file
conf = ConfigFactory.from_dict({
    "query_path_format": "query/*/*.sql",
    "archived_query_path_format": "archived_query/*/*.sql",
})


class InvalidQueryError(Exception):
    pass


@dataclasses.dataclass
class ViewTableConf:
    project_id: str
    dataset: str
    table_name: str
    original_query: str

    view_id: str = dataclasses.field(init=False)
    query: str = dataclasses.field(init=False)

    VIEW_ID_FORMAT: ClassVar[str] = "{project_id}.{dataset}.{table_name}"

    def __post_init__(self):
        self.view_id = self.VIEW_ID_FORMAT.format(
            project_id=self.project_id,
            dataset=self.dataset,
            table_name=self.table_name
        )
        self.query = self.original_query.replace(
                "_PROJECT_ID_", self.project_id
            )


def load_table_info_file(
    query_path_format: str
    ):
    query_files = glob.glob(query_path_format)
    for query_file in query_files:
        with open(query_file, 'r') as f:
            query = f.read()

        # extract dataset and table name from the file path (query/<dataset>/<table_name>.sql)
        dir_path, file_name = os.path.split(query_file)
        table_name = file_name.split(".")[0]
        dataset = os.path.basename(dir_path)
        yield dataset, table_name, query
        

class ViewDeployer:
    def __init__(
        self,
        project_id: str
        ) -> None:
        self.project_id = project_id
        self.client = bigquery.Client(project=project_id)

    def _is_view_exist(
        self,
        view_id
        ):
        """
        check if view table already exists
        """
        try:
            _ = self.client.get_table(view_id)
            return True
        except Exception:
            return False

    def deploy(
        self,
        view_id: str,
        view_query: str
        ) -> bool:
        """
        deploy view table
        """
        is_succeed = True

        print(f"Deploy {view_id}")
        view = bigquery.Table(view_id)
        view.view_query = view_query

        is_view_exist = self._is_view_exist(view_id)
        try:
            if not is_view_exist:
                # create new one
                view = self.client.create_table(view, exists_ok=False)
                print(f"Created: {str(view.reference)}\n")
            else:
                # if already created, update the existing one by recreating it
                self.client.delete_table(view_id)
                view = self.client.create_table(view)
                print(f"Updated: {str(view.reference)}\n")
        except Exception as e:
            print(f"Deployment failed: {str(view.reference)}")
            print(f"{e}\n")
            is_succeed = False
        
        return is_succeed
    
    def delete(
        self,
        view_id: str
        ) -> bool:
        """
        delete view table
        """
        is_succeed = True
        try:
            self.client.delete_table(view_id)
            print(f"Deleted: {view_id}\n")
        except NotFound:
            print(f"Already deleted: {view_id}")
        except Exception:
            is_succeed = False
        return is_succeed


def deploy_recursive(
    view_deployer: ViewDeployer,
    undeployed_view_table_confs: List[ViewTableConf]
    ):
    """
    deploy with solving dependency between tables
    """
    failed_view_table_confs = []
    for undeployed_view_table_conf in undeployed_view_table_confs:
        is_succeed = view_deployer.deploy(
            view_id=undeployed_view_table_conf.view_id,
            view_query=undeployed_view_table_conf.query
        )
        if not is_succeed:
            failed_view_table_confs.append(undeployed_view_table_conf)
    
    # retry the failed tables if any table succeeded in this pass;
    # if none succeeded, raise an error and stop
    if len(failed_view_table_confs) > 0:
        if len(undeployed_view_table_confs) > len(failed_view_table_confs):
            deploy_recursive(view_deployer, failed_view_table_confs)
        else:
            msg = f"Deployment process failed\n Probable tables: {[i.view_id for i in failed_view_table_confs]}"
            raise InvalidQueryError(msg)


def main():
    view_deployer = ViewDeployer(project_id=PROJECT_ID)

    print("Delete archived view tables\n")
    for dataset, table_name, original_query in load_table_info_file(conf.archived_query_path_format):
        view_table_conf = ViewTableConf(
            project_id=PROJECT_ID,
            dataset=dataset,
            table_name=table_name,
            original_query=original_query
        )
        view_deployer.delete(
            view_id=view_table_conf.view_id
        )

    print("Deploy view tables\n")
    view_table_confs = []
    for dataset, table_name, original_query in load_table_info_file(conf.query_path_format):
        view_table_conf = ViewTableConf(
            project_id=PROJECT_ID,
            dataset=dataset,
            table_name=table_name,
            original_query=original_query
        )
        view_table_confs.append(view_table_conf)
    deploy_recursive(view_deployer, view_table_confs)

if __name__ == "__main__":
    main()

This script is executed by Cloud Build, triggered via CircleCI. The part of each query that specifies the project ID is written as _PROJECT_ID_; the actual project ID is obtained via Cloud Build at deployment time and substituted in, so the project ID can be specified dynamically.

Saving Data Used for Model Training

The data used to train models must be retained for model debugging. Since a BigQuery View Table always returns the latest results of its query, the model training pipeline stores the data in GCS at the time it is loaded.
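As a sketch of this step (the destination path is hypothetical), the pipeline can write the loaded data to GCS, for example as a Parquet file keyed by execution date:

import pandas as pd

# hypothetical destination path, for illustration only
SNAPSHOT_URI = "gs://ml-training-data-snapshots/training_features/2023-01-01/data.parquet"


def snapshot_training_data(df: pd.DataFrame) -> None:
    # keep a copy of the exact data used for training so the model can be debugged later
    df.to_parquet(SNAPSHOT_URI)  # writing to gs:// requires gcsfs and pyarrow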

To reduce costs, an object lifecycle rule is set on the destination GCS bucket, and the data is deleted after a set period of time.
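A minimal sketch of configuring such a rule with the Cloud Storage client (the bucket name and retention period are assumptions):

from google.cloud import storage


def set_snapshot_retention(bucket_name: str = "ml-training-data-snapshots", days: int = 90) -> None:
    """Delete objects in the snapshot bucket once they are older than the given number of days."""
    client = storage.Client()
    bucket = client.get_bucket(bucket_name)
    bucket.add_lifecycle_delete_rule(age=days)
    bucket.patch()  # apply the updated lifecycle configuration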

Disadvantages of BigQuery View Table

The major disadvantages of using BigQuery View Tables compared to regular tables are as follows.

  • Computation cost is higher because the query is executed each time the data is loaded
  • Data loading takes more time

These can be handled without modifying the code on the loading side, by caching query results or by using Materialized Views. We are also considering replacing some of the View tables with regular tables created by Cloud Composer if necessary.
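For example, a View table that has become too expensive could be re-registered as a materialized view with the same client library (a sketch only; the table name is hypothetical, and materialized views restrict the SQL they support):

from google.cloud import bigquery

# hypothetical fully qualified table id, for illustration only
MVIEW_ID = "my-project.ml_common_dataset.heavy_features_mv"


def create_materialized_view(query: str) -> None:
    client = bigquery.Client()
    mview = bigquery.Table(MVIEW_ID)
    # BigQuery precomputes and periodically refreshes the results in the background
    mview.mview_query = query
    client.create_table(mview)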

In addition, compared to using Cloud Composer, the following is a disadvantage:

  • Cannot describe workflow processing

In this regard, since it is currently sufficient to resolve dependencies between tables, and those dependencies are not complex, the deployment flow described above solves the problem. However, if a future project requires workflow processing, we would consider using BigQuery View Tables and Cloud Composer together for that project only.
