2023.10.10
CloudDataflowでCloudSQLからBigQueryへデータ連携をする
みなさんこんにちは、グループ研究開発本部 AI研究開発室のK.Fです。
担当プロジェクトでは、GCP上にデータ基盤が構築されており、データレイクにCloud Storage、データマートにBigQuery、ジョブの依存・実行管理にCloud Composerを利用しています。Cloud Composerの導入により、データレイクからデータマート、ダッシュボード用のテーブル/ビューを集計するときは、失敗時の再実行に強い作りにできています。しかし、サービスのDBが動いているCloud SQL(postgresql)からBigQueryへのデータ連携がレガシーな作りになっており、失敗時の再実行に強い作りになっていないです。今回は、Cloud SQLからBigQueryへのデータ連携にCloud Dataflowを利用できないか、を検証していきます。
結論
- GCPのマネージドETLツールであるCloud Dataflowを検証してみた
- Google提供のテンプレートを利用するのが楽だた、少し書き換えるくらいならさくっとできる
- 真価を発揮するのはETLのTの部分だが(今回は試していない)、ELはそこそこ簡単にできる
Cloud Dataflowとは
Cloud Dataflowとは、GCPのサーバーレスかつマネージドのETLツールです。中身はApache Beamという分散処理フレームワークです(Apache Beamは、GoogleがApacheにdataflowの中身の技術を寄贈したものがOSS化されたもののようです)。弊社の過去のブログでもdataflowを検証した記事があるので、先に読んでいただくと理解が深まるかもしれません。
Dataflowの利用は、Googleが提供しているテンプレートがあり、これが利用できるのであれば利用するのが、最も楽です。このテンプレートにないものを実行したい場合は、自分でテンプレートを用意することになるのですが、テンプレートの作成は大きく分けて2種類あり、flex-templateとクラシックテンプレートと呼ばれています。flex-templateがdockerを利用したもので、クラシックテンプレートはflex-templateが登場する以前に使われていたものを、クラシックと冠して残したもののようです。なので、flex-templateを利用するのが、オーソドックスです。
templateの実装には、Apache BeamのSDKを利用します。SDKの開発言語は、Java/Python/Golangの選択肢があるみたいですが、実装が Java > Python >> golangの順番でリッチなので、こだわったことをしたいのであればJavaを選択するのが良さそうです。
Cloud Dataflowの検証
上述の理由からflex-templateでJavaのSDKを利用して検証を進めていきます。
flex-templateの実装
Cloud SQL(postgresql)に接続し、BigQueryにデータをロードするタスクを実行したいのですが、満たしたい要件として以下があります。
- 失敗時に再実行できるようにしたい(冪等性がある)
- 現在連携しているテーブルが、BigQuery上で日付分割テーブルになっており、そこを変えたくない
Google提供のJdbcToBigQueryというテンプレートがこちらにあるのですが、テーブルの作成方法が、CREATE_NEVER(テーブルが存在しない場合は失敗する)、WRITE_APPEND(実行時にレコードをappendする)になっているため、再実行の冪等性を担保することができません。なので、このテンプレートをベースに作成方法をCREATE_IF_NEEDED(テーブルが存在しない場合は作成する)、WRITE_TRUNCATED(テーブルが既に存在する場合は置き換える)に変更します。また、CREATE_IF_NEEDEDの場合はテーブルスキーマを指定する必要があるので、スキーマも指定できるように変更します。
step2のWriteToBigQueryの部分を以下のように書き換えます。
.apply(
"Write to BigQuery",
BigQueryIO.writeTableRows()
.withoutValidation()
.withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE)
.withCustomGcsTempLocation(options.getBigQueryLoadingTemporaryDirectory())
.withSchema(
NestedValueProvider.of(
options.getSchema(),
new SerializableFunction() {
@Override
public TableSchema apply(String jsonText) {
TableSchema tableSchema = new TableSchema();
List fields = new ArrayList();
try {
JSONArray bqSchemaJsonArray = new JSONArray(jsonText);
for (int i = 0; i < bqSchemaJsonArray.length(); i++) {
JSONObject inputField = bqSchemaJsonArray
.getJSONObject(i);
TableFieldSchema field = new TableFieldSchema()
.setName(inputField.getString("name"))
.setType(inputField.getString("type"));
if (inputField.has("mode")) {
field.setMode(inputField.getString("mode"));
}
fields.add(field);
}
tableSchema.setFields(fields);
} catch (Exception e) {
throw new RuntimeException(e);
}
return tableSchema;
}
}))
.to(options.getOutputTable()));
flex-templateのbuild
templateの実装が完了したので、dataflowで利用できるようにflex-templateのbuildをしていきます。
dockerfileの作成
FROM maven:3-openjdk-17 AS build_and_test
COPY . /app
WORKDIR /app
RUN mvn clean package -Dmaven.test.skip=true
FROM gcr.io/dataflow-templates-base/java17-template-launcher-base:latest
ARG MAIN_CLASS
COPY --from=build_and_test /app/target/solution-bundled-0.1.0.jar .
ENV FLEX_TEMPLATE_JAVA_CLASSPATH="solution-bundled-0.1.0.jar"
ENV FLEX_TEMPLATE_JAVA_MAIN_CLASS="${MAIN_CLASS}"
docker buildして、container registoryに登録
docker build --build-arg "MAIN_CLASS=main" -t "$IMAGE_PATH" "." docker push "$IMAGE_PATH"
flex templateのbuild
gcloud dataflow flex-template build "$TEMPLATE_GCS_URL" --image "$IMAGE_PATH" --sdk-language "JAVA" --metadata-file "/path-to-metadata-json"
ここまで実行すると、$TEMPLATE_GCS_URLに指定したtemplateの情報を記載してjsonファイルがアップロードされます。このjsonファイルを指定してflex-templateの実行を行います。
一つ注意点なのが、筆者の環境もそうなのですが、aarch64のCPUを搭載したPC、M1/M2 Macを利用している場合は、ローカルでbuildしたdocker imageがそのままdataflowの実行環境で動かすことができません(aarch64でbuildしたimageをamd64の環境でrunすることができないため)。cloud buildのamd64環境を利用してdocker buildを実行するのが良いでしょう。
cloudbuild.yamlの作成
steps:
- name: "gcr.io/cloud-builders/docker"
args: [
"build",
"--build-arg",
"MAIN_CLASS=${_MAIN_CLASS}",
"-t",
"${_IMAGE_PATH}:latest",
"."
]
- name: "gcr.io/cloud-builders/docker"
args: [
"push",
"${_IMAGE_PATH}:latest"
]
- name: "gcr.io/cloud-builders/gcloud"
args: [
"dataflow",
"flex-template",
"build",
"${_TEMPLATE_GCS_URL}",
"--image",
"${_IMAGE_PATH}:latest",
"--sdk-language",
"JAVA",
"--metadata-file",
"/path-to-metadata-json"
]
substitutions:
_MAIN_CLASS: "main" # default value
_TEMPLATE_GCS_URL: "gs://gcs_location_to_store_metadata"
_IMAGE_PATH: "gcr.io/gcr_location_to_push_image"
このような感じでyamlファイルを作成し、以下のコマンドを実行することで、docker imageのbuildからflex templateのbuildまでをCI環境で行ってくれます。
gcloud builds submit --config=build/cloudbuild.yaml --substitutions="_MAIN_CLASS=main,_TEMPLATE_GCS_URL=gs://gcs_loation_to_store_metadata,_IMAGE_PATH=gcr.io/gcr_location_to_push_image"
flex-templateの実行
dataflowの実行には、gcloudコマンドを利用する方法とrest-apiを叩く方法の2通りあります。gcloudコマンドを利用する場合は以下のようになります。
gcloud dataflow flex-template run "flex-template-job-`date +%Y%m%d-%H%M%S`" \ --template-file-gcs-location "$TEMPLATE_GCS_URL" \ --parameters outputTable="$OUTPUT_TABLE" \ --parameters schema="$SCHEMA" \ --parameters bigQueryLoadingTemporaryDirectory="$BIGQUERY_LOADING_TEMPORARY_DIRECTORY" \ --region "$REGION"
まとめ
今回は、dataflowのflex-templateを利用して、cloud sqlのデータをbigqueryにloadする処理を実装してみました。flex-templateを利用することで、dataflowの実行環境を自由にカスタマイズすることができるようになります。また、CI環境を利用することで、flex-templateのbuildから実行までを自動化することができます。
最後に
グループ研究開発本部 AI研究開発室では、データサイエンティスト/機械学習エンジニアを募集しています。ビッグデータの解析業務などAI研究開発室にご興味を持って頂ける方がいらっしゃいましたら 、ぜひ募集要項一覧からご応募をお願いします。 一緒に勉強しながら楽しく働きたい方のご応募をお待ちしております。
グループ研究開発本部の最新情報をTwitterで配信中です。ぜひフォローください。
Follow @GMO_RD

