自社ブログの記事閲覧数をdbtとBigQueryで分析してランキングを作成する

dbt
BigQuery
Go

2024-09-13

#目次

#はじめに

今回は、自社ブログに閲覧数ランキングを導入していきたいと思います。
以前、データ分析基盤を作ったとき、dbt, BigQueryを利用した経験があります。そのときの経験を活かして、今回は、Google AnalyticsのデータをBigQueryにExportし、dbtで集計処理を行い、閲覧数ランキングを取得する方法をご紹介します。

#実現方法の検討

Google Analyticsは、すでに導入されているので、それを利用して閲覧数ランキングを導入することを検討します。

  • 案1: Google AnalyticsのレポートAPIを利用して閲覧数ランキングを取得する
  • 案2: Google AnalyticsのデータをBigQueryへExportし、dbtで集計して閲覧数ランキングを取得する

案1は、手っ取り早く実装できますし、おすすめです。しかし、今回は案2を選択します。理由は、BigQueryで独自にデータ分析することで、余計なデータを除去したり、データを加工することができるからです。また、dbtを利用することで、データの加工処理を管理しやすくなりますので、そのデモンストレーションも兼ねて案2を選択します。実用においても、BigQueryにExportしておくと、永続化できるのでとりあえずExportしておくと良いと思います。Google Analyticsのデータは、データの保持期間があるので。

#システム構成

システム構成のbeforeとafterを示します。赤い破線が変更点です。以下概要。

  • Google AnalyticsのデータをBigQueryにExportする
  • dbtで集計処理を行う
  • GoでAPIを作成し、閲覧数ランキングを取得する

#before

システム構成

#after

システム構成

#BigQueryにGoogle AnalyticsのデータをExportする

Google Analyticsを導入したらできるだけ早く、BigQueryにデータをExportしておくと良いと思います。設定をしたその日以降のデータがExportされました。過去に遡ってはExportされませんでした。

詳細は、こちらの記事を参照してください。

ボタンをポチポチするだけで、設定は完了です。簡単です。

#dbtで集計処理を行う

#dbtのプロジェクトを作成する

# venvを作成する
python -m venv .venv
. .venv/bin/activate

# package管理にryeを利用し、dbtをインストールする
rye add dbt-core dbt-bigquery

# dbtのプロジェクトを作成する
dbt init analytics
  • profiles.ymlを作成する
analytics:
  target: "{{ env_var('DBT_TARGET', 'dev') }}"
  outputs:
    dev:
      type: bigquery
      method: oauth
      project: <your project_id>
      schema: dbtschema
      threads: 4
      location: asia-northeast1
    prd:
      type: bigquery
      method: oauth
      project: <your project_id>
      schema: dbtschema
      threads: 4
      location: asia-northeast1

個人的には、profiles.ymlは、プロジェクト直下において、dbt run --profiles-dir .で運用するのが好みです。CIや本番環境で切り替えるのが楽だからです。
上記のschemaは、環境ごとに書き換えたいのでダミー値です。dbt_project.ymlに記述します。

  • dbt_project.ymlのmodelsを設定する。
models:
  analytics:
    staging:
      +materialized: view
      +schema: |
        {%- if   target.name == "prd" -%} analytics_staging_prd
        {%- elif target.name == "dev" -%} analytics_staging_dev
        {%- else -%} invalid_database
        {%- endif -%}

    intermediate:
      +materialized: ephemeral
      +schema: |
        {%- if   target.name == "prd" -%} analytics_intermediate_prd
        {%- elif target.name == "dev" -%} analytics_intermediate_dev
        {%- else -%} invalid_database
        {%- endif -%}
    marts:
      +materialized: table
      +schema: |
        {%- if   target.name == "prd" -%} analytics_marts_prd
        {%- elif target.name == "dev" -%} analytics_marts_dev
        {%- else -%} invalid_database
        {%- endif -%}
  • macros/get_custom_schema.sqlを作成する
{% macro generate_schema_name(custom_schema_name, node) -%}
  {%- set default_schema = target.schema -%}
  {%- if custom_schema_name is none -%}
    {{ default_schema }}
  {%- else -%}
    {{ custom_schema_name | trim }}
  {%- endif -%}
{%- endmacro %}

ここでカスタムスキーマを利用するようにしています。

#gcloud default credentialsを設定する

bigqueryにアクセスするために、gcloudの認証情報を設定します。

gcloud auth application-default login

#modelsを作成する

dbtのベストプラクティスに従って、以下のようなモデルを作成します。

Best practice guides | dbt

#source

  • models/source.ymlを作成する
version: 2

sources:
  - name: analytics
    schema: analytics_219943355
    tables:
      - name: events
        identifier: events_*

schemaの部分は、BigQueryのデータセットIDです。これはExportによって自動的に決まるようです。
Google AnalyticsからExportされるとevents_YYYYMMDDのような日付によるシャーディングされたテーブルが作成されます。identifierでワイルドカードを指定することで、シャーディングされたテーブル全体にアクセスできます。

#staging

stagingではViewを作成しますが、後続の処理で、シャーディングされた日付でフィルタをしたいので、以下のようなViewを作成します。

  • models/staging/stg_analytics__events.sqlを作成する
SELECT
  *,
  cast(parse_date('%Y%m%d', event_date) AS timestamp) AS event_date_ts,
  _table_suffix AS table_suffix,
  cast(parse_date('%Y%m%d', _table_suffix) AS timestamp) AS table_suffix_ts,
  (
    SELECT value.string_value
    FROM unnest(event_params) AS params
    WHERE params.key = 'page_location'
  ) AS page_location,
  (
    SELECT value.string_value
    FROM unnest(event_params) AS params
    WHERE params.key = 'page_title'
  ) AS page_title,
  timestamp_micros(event_timestamp) AS event_timestamp_ts

FROM
  {{ source('analytics', 'events') }}

その他に必要なカラム

event_date イベント発生日
event_timestamp イベント発生日時
event_name イベント名
event_params イベントパラメータ
page_location: ページのURL
page_title: ページのタイトル

これだけあれば、閲覧数ランキングを取得することができます。
event_paramsは、RECORD型で、keyvalueのペアで構成されています。値を取り出すのは、unnestを利用して取り出します。
stringやintegerなどの型を適切に変換しておくと、後続の処理が楽になります。

#intermediate

  • models/intermediate/int_events__count_pv.sqlを作成する
WITH filtered AS (
  SELECT
    event_date_ts,
    page_location,
    page_title,
    count(*) AS pv_count
  FROM {{ ref('stg_analytics__events') }}
  WHERE event_name = 'page_view'
  and page_location LIKE 'https://cloudandbuild.jp/blog/%'
  {% if not should_full_refresh() %}
    and table_suffix_ts >= timestamp(date_sub(current_date, interval 2 day))
    and event_timestamp_ts >= timestamp(date_sub(current_date, interval 2 day))
    {% endif %}
  GROUP BY event_date_ts, page_location, page_title
)

SELECT * FROM filtered

2日前までのデータだけ処理するようにします。後のmartsでは、incremental modelで差分をマージしていくためです。
こうすることで、全量処理ではなく差分のみ処理するので、データ処理量を削減できます。
テーブルの構造などを破壊的変更したときは、full-refreshする必要があるので、should_full_refresh()で判定しています。

#marts

  • models/marts/blog_pv.sqlを作成する
{% set partitions_to_replace = [
  'timestamp(current_date)',
  'timestamp(date_sub(current_date, interval 2 day))'
] %}

{{
  config(
    materialized = 'incremental',
    incremental_strategy = 'insert_overwrite',
    partition_by = {'field': 'event_date_ts', 'data_type': 'timestamp'},
    partitions = partitions_to_replace
  )
}}

WITH int_events AS (
  SELECT * FROM {{ ref('int_events__count_pv') }}
  {% if is_incremental() %}
    WHERE
      event_date_ts IN (
        {{ partitions_to_replace | join(',') }}
      )
  {% endif %}
),

filtered AS (
  SELECT *
  FROM int_events
  WHERE page_location LIKE 'https://cloudandbuild.jp/blog/%'
)

SELECT *
FROM filtered

blog_pvでは、日付パーティションのテーブルを作成します。intermediateでデータの加工はほぼ終わっているので、このテーブルは単に実体化するのが主な目的です。このテーブルは日毎のPV数を保持します。これ自体でも役に立ちそうなので、martsに配置しています。

上記で作成したテーブルから最新の7日間のPV数のランキングを作成します。

  • models/marts/blog_pv_ranking.sqlを作成する
WITH filtered AS (
  SELECT * FROM {{ ref('blog_pv') }}
  WHERE
    event_date_ts >= timestamp(date_sub(current_date, INTERVAL 7 DAY))
),

last_title AS (
  SELECT
    f1.page_location,
    f1.page_title
  FROM filtered AS f1
  INNER JOIN (
    SELECT
      page_location,
      max(event_date_ts) AS max_event_date_ts
    FROM filtered GROUP BY page_location
  ) AS f2
  ON
    f1.page_location = f2.page_location
    AND f1.event_date_ts = f2.max_event_date_ts
  WHERE f1.page_title NOT LIKE '404%'
),

agg AS (
  SELECT
    filtered.page_location,
    sum(filtered.pv_count) AS pv_count
  FROM filtered
  GROUP BY filtered.page_location
),

final AS (
  SELECT
    agg.page_location,
    page_title,
    pv_count,
    rank() OVER (ORDER BY pv_count DESC) AS pv_rank
  FROM agg
  JOIN last_title using(page_location)
)

SELECT * FROM final

これで、ブログ記事の閲覧数ランキングを作成できました。
日付についてはあまり厳密ではありませんが、要件によってはタイムゾーンを考慮するなどして、適切に処理すると良いでしょう。

#Docker imageを作成する

k8sでdbtを動かす場合は、Docker imageを作成しておきます。

FROM ghcr.io/dbt-labs/dbt-core:1.8.3
WORKDIR /dbt
COPY requirements.lock pyproject.toml .python-version README.md ./
RUN PYTHONDONTWRITEBYTECODE=1 pip install --no-cache-dir -r requirements.lock
COPY . .
RUN dbt deps
CMD ["dbt", "run", "--profiles-dir", "."]

このimageをビルドして、k8sにデプロイします。cronjobで定期実行すると良いでしょう。
この記事では、k8sのmanifestは割愛します。

#閲覧数ランキングを取得する

GoでAPIを作成して、閲覧数ランキングを取得できるようにします。
APIの部分は割愛して、BigQueryからデータを取得する部分のコードを抜粋しています。

  • bigquery clientを作成する
import (
  "cloud.google.com/go/bigquery"
)

func NewBQClient(ctx context.Context, cfg *config.DWH) (*BQClient, func(), error) {
	cli, err := bigquery.NewClient(ctx, cfg.ProjectID)
	if err != nil {
		return nil, nil, err
	}
	return &BQClient{
			cli:     cli,
			cfg:     cfg,
		}, func() {
			cli.Close()
		}, nil
}
  • bigqueryのテーブルの行をマッピングする構造体を作成する
type BlogPVRankRow struct {
	PageLocation string `bigquery:"page_location"`
	PageTitle    string `bigquery:"page_title"`
	PvCount      int    `bigquery:"pv_count"`
	PvRank       int    `bigquery:"pv_rank"`
}
  • bigqueryからデータを取得する
const blogPvRankTableName = "blog_pv_rank"

func tableName(datasetID, tableName string) string {
	return fmt.Sprintf("`%s.%s`", datasetID, tableName)
}

func (r *BQClient) getBlogRanking(ctx context.Context) (*models.BlogRanking, error) {

	c := r.cli
	// Construct a query.
	q := c.Query(fmt.Sprintf(`
SELECT *
FROM %s
ORDER BY pv_rank ASC
LIMIT 5

`, tableName(r.cfg.DatasetID, blogPvRankTableName)))
	// Execute the query.
	it, err := q.Read(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to read: %v", err)
	}
	// Iterate through the results.
	var rows []*BlogPVRankRow
	for {
		var val BlogPVRankRow
		err := it.Next(&val)
		if err == iterator.Done {
			break
		}
		if err != nil {
			return nil, fmt.Errorf("failed to iterate: %v", err)
		}

		rows = append(rows, &val)
	}
	return toBlogRankingModel(rows), nil
}

これで、閲覧数ランキングを取得することができました。
実際には、APIを実装して、フロントエンドから取得していますが、割愛します。
あと、毎回BigQueryからデータを取得すると、パフォーマンスが悪いので、キャッシュを利用するなどして、パフォーマンスを向上させると良いでしょう。

#まとめ

今回は、自社ブログの記事閲覧数をdbtとBigQueryで分析してランキングを作成する方法をご紹介しました。

dbtを利用することで、データの加工処理を管理しやすくなります。また、Google AnalyticsはBigQueryにExportしておくことで、永続化できるのでとりあえずExportしておくと良いと思います。Google Analyticsのデータだけだと、Google AnalyticsのレポートAPIでできることとさほど変わりませんが、その他のデータがあればクロス分析することもできるので、データはとりあえず残しておきたいですね。