Hi, I’m Naoki Komoto (河本 直起), working as a Machine Learning Engineer at AnyMind.
At AnyMind, we are developing our MLOps environment from scratch. In a previous article, we introduced our machine learning batch prediction architecture using Vertex AI.
In this article, as a continuation of our previous article on data infrastructure for machine learning, we would like to introduce the data infrastructure for machine learning that we are currently using.
Background
Cloud Composer (Airflow) for Machine Learning Data Pipeline
As noted in this article, the model training pipeline used to load datasets directly from the product-side RDB, as shown below.
One problem was that the RDB was chosen to meet product requirements and was not suitable for processing data for machine learning. We solved that problem by introducing the following functions, with Cloud Composer (Airflow) serving as the data pipeline.
- A function to copy datasets from the RDB to BigQuery
- A function to create model training datasets from the copied data
This enabled us to separate the datasets for machine learning from the product datasets and to run data processing dedicated to machine learning.
Since that article, we have handed over the RDB-to-BigQuery data transfer to the development team on the product side, and we have added a process that creates common datasets for machine learning. The current architecture is as follows.
The common datasets are defined in a form that is easy to use across machine learning projects: they absorb product-dataset-specific rules and aggregate various statistics. The common dataset also references the batch prediction results of each project so that a project's model can be trained using the prediction results of other projects.
The completion of the batch prediction workflow and of other workflows it depends on is confirmed by a flag file, which each workflow creates in Google Cloud Storage (GCS) when it finishes.
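As an illustration, writing and checking such a completion flag can be done with the google-cloud-storage client along the following lines. This is only a minimal sketch, and the bucket name and flag path are hypothetical.

from google.cloud import storage

def write_completion_flag(bucket_name: str, flag_path: str) -> None:
    """Write an empty flag object to GCS to signal that this workflow has finished."""
    client = storage.Client()
    client.bucket(bucket_name).blob(flag_path).upload_from_string("")

def is_workflow_completed(bucket_name: str, flag_path: str) -> bool:
    """Check whether the upstream workflow has written its completion flag."""
    client = storage.Client()
    return client.bucket(bucket_name).blob(flag_path).exists()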
Problems
The following problems were identified while operating Cloud Composer (Airflow).
- Knowledge of Airflow is a must during development
- Difficulty in separating data processing from model training process
Regarding the first, our machine learning engineers are in charge of everything from problem setup, model investigation and validation to development and operation, and the scope of development includes data creation, model training, and model serving. We therefore needed to reduce the amount of knowledge required for development so that more resources could be devoted to the essential tasks: problem formulation and model investigation and validation. In addition, Airflow was more than we needed, because the data creation pipelines for machine learning do not involve processing with very complex dependencies.
Regarding the second, we intended to strictly separate the scope of the data creation pipeline (Cloud Composer) from that of the model training pipeline (Vertex Pipelines) by preparing different environments for the two, so that the input data would not depend on the model training process. However, although the only processing originally envisioned for Airflow was the pipelined execution of SQL queries, Airflow could also host model training processing, so development proceeded in a way that blurred the distinction between the two.
Why BigQuery View Table?
The above problems can be solved with a framework that can register SQL queries and resolve their dependencies.
BigQuery View Table allows you to register a SQL query and obtain its results simply by reading the table, and we found that dependencies between queries can be resolved by devising the deployment method. In addition, BigQuery View Tables are referenced in exactly the same way as regular tables, so no modification is needed in the model training pipelines that already reference those tables. For these reasons, we decided to build our data infrastructure for machine learning around BigQuery View Tables.
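For instance, registering a query as a view and then reading it like any other table looks roughly like this with the google-cloud-bigquery client. The project, dataset, and table names below are hypothetical.

from google.cloud import bigquery

client = bigquery.Client(project="my-project")  # hypothetical project

# register a SQL query as a View table (names are examples only)
view = bigquery.Table("my-project.ml_common.user_stats")
view.view_query = """
    SELECT user_id, COUNT(*) AS n_actions
    FROM `my-project.product_copy.user_actions`
    GROUP BY user_id
"""
client.create_table(view, exists_ok=True)

# the view is then read exactly like a regular table
rows = client.query(
    "SELECT * FROM `my-project.ml_common.user_stats` LIMIT 10"
).result()
for row in rows:
    print(row["user_id"], row["n_actions"])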
The use of BigQuery View Table also provided the following benefits:
- The model training pipeline no longer needs to wait for the data creation pipeline to finish processing
- Schedule changes can be made in one place
A BigQuery View Table executes the registered SQL query at the time it is referenced, so it always returns the latest data at that point. Regarding the first point, when using the data creation pipeline the model training pipeline had to detect its completion, but this is unnecessary with BigQuery View Tables. There is also no need for the monitoring and recovery that the data creation pipeline required; if processing of a View table fails due to a network error or similar, the retry can simply be executed from the model training pipeline. This has reduced operational costs, including psychological ones.
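Such a retry can live entirely on the reading side, for example as below. This is a minimal sketch assuming the google-cloud-bigquery client; the table name and retry settings are hypothetical.

from google.api_core.retry import Retry
from google.cloud import bigquery

client = bigquery.Client(project="my-project")  # hypothetical project

# retry transient API errors when reading the View table from the training pipeline;
# the retry parameters and table name are illustrative only
retry = Retry(initial=1.0, maximum=60.0, deadline=300.0)
rows = client.query(
    "SELECT * FROM `my-project.ml_project_a.training_data`",
    retry=retry,
).result()
df = rows.to_dataframe()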
Regarding the second point, when using Cloud Composer, both the model training pipeline and the data creation pipeline had to be rescheduled every time the overall execution schedule changed, whereas now we simply reschedule the model training pipeline. Having a single point of change reduces the risk of missed updates.
Architecture
The overall architecture is as follows. The parts that used Cloud Composer above have all been replaced with BigQuery View Tables. The next section describes why we separated the datasets in this way.
Separating Datasets
Without Cloud Composer, it is harder to describe dependencies between table creation jobs. To keep tables from referencing each other arbitrarily and to clarify the role of each table, we separated the datasets according to the following rules.
1. Dataset for tables that reference product datasets: references the product datasets and resolves product-dataset-specific rules
2. Dataset for intermediate tables common to all projects: references 1. and generalizes the tables in preparation for the final tables
3. Dataset for final tables common to all projects: references 2. and performs join processing between tables
4. Dataset for tables that reference batch prediction results: references the batch prediction log tables and transforms them into a usable form
5. Dataset for project-specific tables: references 3. and 4. and creates the tables for a specific project that the model training pipeline reads
We also allow a table to reference other tables in the same dataset, as long as each table is created in the appropriate dataset.
Registering BigQuery View Tables
query
├── dataset_name_a
│ ├── table_name_1.sql
│ └── table_name_2.sql
└── dataset_name_b
└── table_name_3.sql
archived_query
├── dataset_name_a
│ └── table_name_4.sql
└── dataset_name_b
The above directory structure is prepared in the repository from which the View tables are deployed. When a SQL query is placed under query/target_dataset_name/ as table_name.sql, the query is registered as a View table. If you modify a registered .sql file, the change is reflected in the View table.
To delete an existing View table, move the .sql file of the target table under archived_query/target_dataset_name/. This could also have been implemented by deleting any table whose file no longer exists in the directory, but we chose to archive the query instead, because we wanted to handle the case where data generation is stopped once and the table needs to be generated again later. In that case, you simply move the target file from under archived_query/target_dataset_name/ back to under query/target_dataset_name/.
Deployment
The View tables are deployed with the following code.
If a referenced table is released at the same time as the tables that reference it, creation may fail depending on the order, because the referenced table does not yet exist. We therefore create the views recursively: on each pass, every view whose referenced tables already exist is created, and the failed ones are retried until no further progress can be made.
import os
import glob
import dataclasses
from typing import ClassVar, List
from pyhocon import ConfigFactory
from google.cloud import bigquery
from google.cloud.exceptions import Conflict, NotFound
# the project id is passed to the Docker image as an environment variable from cloudbuild.yaml
PROJECT_ID = os.environ['PROJECT_ID']
# `conf` (used in main() below) provides the glob patterns for the query directories,
# e.g. "query/*/*.sql" and "archived_query/*/*.sql"; in our setup it is loaded from a
# HOCON file with pyhocon's ConfigFactory (the loading code is omitted here)
class InvalidQueryError(Exception):
pass
@dataclasses.dataclass
class ViewTableConf:
project_id: str
dataset: str
table_name: str
original_query: str
view_id: str = dataclasses.field(init=False)
query: str = dataclasses.field(init=False)
VIEW_ID_FORMAT: ClassVar[str] = "{project_id}.{dataset}.{table_name}"
def __post_init__(self):
self.view_id = self.VIEW_ID_FORMAT.format(
project_id=self.project_id,
dataset=self.dataset,
table_name=self.table_name
)
self.query = self.original_query.replace(
"_PROJECT_ID_", self.project_id
)
def load_table_info_file(
query_path_format: str
):
query_files = glob.glob(query_path_format)
for query_file in query_files:
with open(query_file, 'r') as f:
query = f.read()
        # extract the dataset and table names from the file path
        dir_path, file_name = os.path.split(query_file)
        table_name = file_name.split(".")[0]
        dataset = os.path.basename(dir_path)
yield dataset, table_name, query
class ViewDeployer:
def __init__(
self,
project_id: str
) -> None:
self.project_id = project_id
self.client = bigquery.Client(project=project_id)
def _is_view_exist(
self,
view_id
):
"""
check if view table already exists
"""
try:
_ = self.client.get_table(view_id)
return True
except Exception:
return False
def deploy(
self,
view_id: str,
view_query: str
) -> bool:
"""
deploy view table
"""
is_succeed = True
print(f"Deploy {view_id}")
view = bigquery.Table(view_id)
view.view_query = view_query
is_view_exist = self._is_view_exist(view_id)
try:
if not is_view_exist:
# create new one
view = self.client.create_table(view, exists_ok=False)
print(f"Created: {str(view.reference)}\n")
else:
                # if already created, update the existing one
self.client.delete_table(view_id)
view = self.client.create_table(view)
print(f"Updated: {str(view.reference)}\n")
except Exception as e:
print(f"Deployment failed: {str(view.reference)}")
print(f"{e}\n")
is_succeed = False
return is_succeed
def delete(
self,
view_id: str
) -> bool:
"""
delete view table
"""
is_succeed = True
try:
self.client.delete_table(view_id)
print(f"Deleted: {view_id}\n")
except NotFound:
print(f"Already deleted: {view_id}")
except Exception:
is_succeed = False
return is_succeed
def deploy_recursive(
view_deployer: ViewDeployer,
undeployed_view_table_confs: List[ViewTableConf]
):
"""
deploy with solving dependency between tables
"""
failed_view_table_confs = []
for undeployed_view_table_conf in undeployed_view_table_confs:
is_succeed = view_deployer.deploy(
view_id=undeployed_view_table_conf.view_id,
view_query=undeployed_view_table_conf.query
)
if not is_succeed:
failed_view_table_confs.append(undeployed_view_table_conf)
    # if no table was deployed successfully in this trial, raise an error and stop
if len(failed_view_table_confs) > 0:
if len(undeployed_view_table_confs) > len(failed_view_table_confs):
deploy_recursive(view_deployer, failed_view_table_confs)
else:
msg = f"Deployment process failed\n Probable tables: {[i.view_id for i in failed_view_table_confs]}"
raise InvalidQueryError(msg)
def main():
view_deployer = ViewDeployer(project_id=PROJECT_ID)
print("Delete archived view tables\n")
for dataset, table_name, original_query in load_table_info_file(conf.archived_query_path_format):
view_table_conf = ViewTableConf(
project_id=PROJECT_ID,
dataset=dataset,
table_name=table_name,
original_query=original_query
)
view_deployer.delete(
view_id=view_table_conf.view_id
)
print("Deploy view tables\n")
view_table_confs = []
for dataset, table_name, original_query in load_table_info_file(conf.query_path_format):
view_table_conf = ViewTableConf(
project_id=PROJECT_ID,
dataset=dataset,
table_name=table_name,
original_query=original_query
)
view_table_confs.append(view_table_conf)
deploy_recursive(view_deployer, view_table_confs)
if __name__ == "__main__":
main()
This script is executed by Cloud Build, triggered via CircleCI. The part of each query that specifies the project id is written as _PROJECT_ID_; the actual project id is obtained from Cloud Build at deployment time and substituted in, so the project id can be specified dynamically.
Saving Data Used for Model Training
The data used to train models must be retained for model debugging. Since a BigQuery View Table always returns the latest result of its query, the model training pipeline stores the data it loads in GCS at load time.
To reduce costs, an object lifecycle rule is set on the destination GCS bucket so that the data is deleted after a set period of time.
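For reference, a minimal sketch of this snapshotting might look as follows. The view, bucket, file path, and 90-day retention period are all hypothetical.

import datetime
from google.cloud import bigquery, storage

PROJECT_ID = "my-project"                              # hypothetical
VIEW_ID = "my-project.ml_project_a.training_data"      # hypothetical View table
BUCKET_NAME = "ml-training-data-snapshots"             # hypothetical bucket

# load the latest data from the View table
bq_client = bigquery.Client(project=PROJECT_ID)
df = bq_client.query(f"SELECT * FROM `{VIEW_ID}`").result().to_dataframe()

# save a snapshot of exactly what the model was trained on
gcs_client = storage.Client(project=PROJECT_ID)
snapshot_path = f"snapshots/{datetime.date.today():%Y-%m-%d}/training_data.csv"
gcs_client.bucket(BUCKET_NAME).blob(snapshot_path).upload_from_string(
    df.to_csv(index=False), content_type="text/csv"
)

# delete snapshots automatically after 90 days via a bucket lifecycle rule
bucket = gcs_client.get_bucket(BUCKET_NAME)
bucket.add_lifecycle_delete_rule(age=90)
bucket.patch()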
Disadvantages of BigQuery View Table
The major disadvantages of using BigQuery View Tables compared to regular tables are as follows.
- Computation cost becomes higher because calculations are performed each time data is loaded
- Data loading takes more time
These can be handled without modifying the code on the loading side, either by caching query results or by using Materialized Views. We are also considering replacing some of the View tables with regular tables built with Cloud Composer if it becomes necessary.
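As an example, an expensive view could be replaced by a materialized view while the reading side keeps querying it like any other table. The sketch below uses hypothetical dataset and table names, and note that BigQuery materialized views only support a limited class of queries (e.g. aggregations over a base table).

from google.cloud import bigquery

client = bigquery.Client(project="my-project")  # hypothetical project

# create a materialized view over a base table; BigQuery refreshes it automatically
ddl = """
CREATE MATERIALIZED VIEW `my-project.ml_common.user_stats_mv` AS
SELECT user_id, COUNT(*) AS n_actions
FROM `my-project.product_copy.user_actions`
GROUP BY user_id
"""
client.query(ddl).result()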
In addition, compared to using Cloud Composer, there is the following disadvantage:
- Cannot describe workflow processing
In this regard, since it is currently sufficient to resolve dependencies between tables and those dependencies are not complex, the deployment flow described above is enough. However, if a future project requires genuine workflow processing, we would consider using BigQuery View Tables together with Cloud Composer for that project only.