BigQueryサブスクリプションを使ってイベントデータをBigQueryに取り込む

2023-02-14

#はじめに

BigQuery サブスクリプション かなり前の記事ですが、便利そうな機能で気になっていました。今回はBigQueryサブスクリプションを試していきたいと思います。

#BigQuery サブスクリプションとは

公式ドキュメントには下記のように書かれています。

BigQuery サブスクリプションは、メッセージを受信すると、既存の BigQuery テーブルにメッセージを書き込みます。サブスクライバー クライアントを個別に構成する必要はありません。Google Cloud コンソール、Google Cloud CLI、クライアント ライブラリ、または Pub/Sub API を使用して、BigQuery サブスクリプションの作成、更新、一覧表示、接続解除、削除を行います。

多くの場合、Dataflow を使用して BigQuery に書き込む単純なデータ取り込みパイプラインの代わりとして、BigQuery サブスクリプションには次の利点があります。

  • シンプルな導入。 BigQuery サブスクリプションは、コンソール、Google Cloud CLI、クライアント ライブラリ、または Pub/Sub API から単一のワークフローで設定できます。
  • 低コスト。 Dataflow ジョブを含む、同様の Pub/Sub パイプラインの追加のコストとレイテンシがなくなります。この費用の最適化は、保存前に追加の処理を必要としないメッセージング システムに便利です。
  • モニタリングを最小限に抑える。 BigQuery のサブスクリプションはマルチテナントの Pub/Sub サービスの一部であるため、個別のモニタリング ジョブを実行する必要はありません。

引用元:BigQuery サブスクリプション

PubSubトピックに紐づくBigQueryサブスクリプションを作っておいて、PubSubメッセージを送信するとBigQueryにデータが書き込まれるようです。Dataflowジョブなどを使わず、データをBigQueryに取り込めるので、低コストでシンプルに導入できそうですね。

#BigQueryサブスクリプションを試す

PubSubのトピックスキーマは使わなくても、BigQueryサブスクリプションを使うことはできます。しかし、トピックスキーマを使うことでBigQueryにデータを取り込むとき、データフィールドをカラムに安全にマッピングすることができます。使わない場合は、data というカラムにPayloadがそのまま入るので取り出す時に大変です。

#トピックスキーマを設計する

スキーマにはプロトコルバッファを利用します。スキーマはサンプルで出てくるものをそのまま使います。
トピックスキーマ作成

#トピックを作成する

トピック作成

#BigQueryのデータセット・テーブルを作成する

BigQueryテーブルは、下記を参考にしました。メタデータなどが書き込まれるフィールドは決まっています。その他にトピックスキーマで追加したフィールドを追加しました。

https://cloud.google.com/pubsub/docs/bigquery?hl=ja#properties_subscription

CREATE TABLE sub_test.events(
    data STRING,
    subscription_name STRING,
    message_id STRING,
    publish_time TIMESTAMP,
    attributes JSON,
    string_field STRING,
    float_field FLOAT64,
    boolean_field BOOLEAN,
)

#サブスクリプションを作成する

配信タイプにBigQueryへの書き込みがあります。これがBigQueryサブスクリプションです。
他にトピックスキーマを使用する, メタデータを書き込むにチェックを入れます。
サブスクリプション作成

#メッセージを送信する

CloudEventsのGoSDKを使ってメッセージを送信します。

export GOOGLE_CLOUD_PROJECT=<your project id>
export PUBSUB_TOPIC=sub_topic
gcloud auth login application-default
go run main.go 
2023/02/14 22:13:03 sent, accepted: true

main.goはcloudeventsのPubsubサンプルを使いました。一部送るデータをプロトコルバッファのスキーマに合わせて変更しています。

/*
 Copyright 2021 The CloudEvents Authors
 SPDX-License-Identifier: Apache-2.0
*/

package main

import (
	"context"
	"log"
	"os"

	cepubsub "github.com/cloudevents/sdk-go/protocol/pubsub/v2"
	cloudevents "github.com/cloudevents/sdk-go/v2"
	"github.com/kelseyhightower/envconfig"
)

type envConfig struct {
	ProjectID string `envconfig:"GOOGLE_CLOUD_PROJECT" required:"true"`

	TopicID string `envconfig:"PUBSUB_TOPIC" default:"demo_cloudevents" required:"true"`
}

// Example is a basic data struct.
type Example struct {
	// プロトコルバッファで定義したスキーマと同じデータ構造とする
	StringField  string  `json:"string_field"`
	FloatField   float64 `json:"float_field"`
	BooleanField bool    `json:"boolean_field"`
}

func main() {
	var env envConfig
	if err := envconfig.Process("", &env); err != nil {
		log.Printf("[ERROR] Failed to process env var: %s", err)
		os.Exit(1)
	}

	t, err := cepubsub.New(context.Background(),
		cepubsub.WithProjectID(env.ProjectID),
		cepubsub.WithTopicID(env.TopicID))
	if err != nil {
		log.Printf("failed to create pubsub transport, %s", err.Error())
		os.Exit(1)
	}
	c, err := cloudevents.NewClient(t, cloudevents.WithTimeNow(), cloudevents.WithUUIDs())
	if err != nil {
		log.Printf("failed to create client, %s", err.Error())
		os.Exit(1)
	}

	event := cloudevents.NewEvent()
	event.SetType("com.cloudevents.sample.sent")
	event.SetSource("github.com/cloudevents/sdk-go/samples/pubsub/sender/")
	_ = event.SetData("application/json", &Example{
		// テストデータを送る
		StringField:  "test",
		FloatField:   1234.5678,
		BooleanField: true,
	})

	if result := c.Send(context.Background(), event); cloudevents.IsUndelivered(result) {
		log.Printf("failed to send: %v", result)
		os.Exit(1)
	} else {
		log.Printf("sent, accepted: %t", cloudevents.IsACK(result))
	}

	os.Exit(0)
}

BigQueryにデータが書き込まれていることが確認してみます。トピックスキーマで指定したフィールドが自動的にBigQueryのテーブルにマッピングされて登録されています。
BigQuery

attributes部分には以下のようなJSONデータが入っていました。CloudEventsで指定したメタデータが入っていることが確認できます。

{
    "googclient_schemarevisionid": "c04cabd0",
    "ce-specversion": "1.0",
    "Content-Type": "application/json",
    "googclient_schemaencoding": "JSON",
    "ce-time": "2023-02-14T13:13:01.533288Z",
    "ce-source": "github.com/cloudevents/sdk-go/samples/pubsub/sender/",
    "ce-id": "77e9e0b4-5e40-42ed-b7ad-e8ba59c435bc",
    "ce-type": "com.cloudevents.sample.sent",
    "googclient_schemaname": "projects/go-flutter-gcp/schemas/test_schema"
}

#まとめ

今回は、BigQueryサブスクリプションを試してみました。
かなり簡単に導入することができて驚きました。一方で、BigQuery サブスクリプションだと時間でパーティションするテーブルに書き込んだり、スキーマをアップデートなどの運用や、エラーが起きた時の対応など、実際の運用にのせられるかはもう少し検証が必要そうだなと感じました。