自社ブログの記事閲覧数をdbtとBigQueryで分析してランキングを作成する
2024-09-13
#目次
#はじめに
今回は、自社ブログに閲覧数ランキングを導入していきたいと思います。
以前、データ分析基盤を作ったとき、dbt, BigQueryを利用した経験があります。そのときの経験を活かして、今回は、Google AnalyticsのデータをBigQueryにExportし、dbtで集計処理を行い、閲覧数ランキングを取得する方法をご紹介します。
#実現方法の検討
Google Analyticsは、すでに導入されているので、それを利用して閲覧数ランキングを導入することを検討します。
- 案1: Google AnalyticsのレポートAPIを利用して閲覧数ランキングを取得する
- 案2: Google AnalyticsのデータをBigQueryへExportし、dbtで集計して閲覧数ランキングを取得する
案1は、手っ取り早く実装できますし、おすすめです。しかし、今回は案2を選択します。理由は、BigQueryで独自にデータ分析することで、余計なデータを除去したり、データを加工することができるからです。また、dbtを利用することで、データの加工処理を管理しやすくなりますので、そのデモンストレーションも兼ねて案2を選択します。実用においても、BigQueryにExportしておくと、永続化できるのでとりあえずExportしておくと良いと思います。Google Analyticsのデータは、データの保持期間があるので。
#システム構成
システム構成のbeforeとafterを示します。赤い破線が変更点です。以下概要。
- Google AnalyticsのデータをBigQueryにExportする
- dbtで集計処理を行う
- GoでAPIを作成し、閲覧数ランキングを取得する
#before
#after
#BigQueryにGoogle AnalyticsのデータをExportする
Google Analyticsを導入したらできるだけ早く、BigQueryにデータをExportしておくと良いと思います。設定をしたその日以降のデータがExportされました。過去に遡ってはExportされませんでした。
詳細は、こちらの記事を参照してください。
ボタンをポチポチするだけで、設定は完了です。簡単です。
#dbtで集計処理を行う
#dbtのプロジェクトを作成する
# venvを作成する
python -m venv .venv
. .venv/bin/activate
# package管理にryeを利用し、dbtをインストールする
rye add dbt-core dbt-bigquery
# dbtのプロジェクトを作成する
dbt init analytics
- profiles.ymlを作成する
analytics:
target: "{{ env_var('DBT_TARGET', 'dev') }}"
outputs:
dev:
type: bigquery
method: oauth
project: <your project_id>
schema: dbtschema
threads: 4
location: asia-northeast1
prd:
type: bigquery
method: oauth
project: <your project_id>
schema: dbtschema
threads: 4
location: asia-northeast1
個人的には、profiles.yml
は、プロジェクト直下において、dbt run --profiles-dir .
で運用するのが好みです。CIや本番環境で切り替えるのが楽だからです。
上記のschema
は、環境ごとに書き換えたいのでダミー値です。dbt_project.yml
に記述します。
dbt_project.yml
のmodelsを設定する。
models:
analytics:
staging:
+materialized: view
+schema: |
{%- if target.name == "prd" -%} analytics_staging_prd
{%- elif target.name == "dev" -%} analytics_staging_dev
{%- else -%} invalid_database
{%- endif -%}
intermediate:
+materialized: ephemeral
+schema: |
{%- if target.name == "prd" -%} analytics_intermediate_prd
{%- elif target.name == "dev" -%} analytics_intermediate_dev
{%- else -%} invalid_database
{%- endif -%}
marts:
+materialized: table
+schema: |
{%- if target.name == "prd" -%} analytics_marts_prd
{%- elif target.name == "dev" -%} analytics_marts_dev
{%- else -%} invalid_database
{%- endif -%}
macros/get_custom_schema.sql
を作成する
{% macro generate_schema_name(custom_schema_name, node) -%}
{%- set default_schema = target.schema -%}
{%- if custom_schema_name is none -%}
{{ default_schema }}
{%- else -%}
{{ custom_schema_name | trim }}
{%- endif -%}
{%- endmacro %}
ここでカスタムスキーマを利用するようにしています。
#gcloud default credentialsを設定する
bigqueryにアクセスするために、gcloudの認証情報を設定します。
gcloud auth application-default login
#modelsを作成する
dbtのベストプラクティスに従って、以下のようなモデルを作成します。
#source
models/source.yml
を作成する
version: 2
sources:
- name: analytics
schema: analytics_219943355
tables:
- name: events
identifier: events_*
schemaの部分は、BigQueryのデータセットIDです。これはExportによって自動的に決まるようです。
Google AnalyticsからExportされるとevents_YYYYMMDD
のような日付によるシャーディングされたテーブルが作成されます。identifierでワイルドカードを指定することで、シャーディングされたテーブル全体にアクセスできます。
#staging
stagingではViewを作成しますが、後続の処理で、シャーディングされた日付でフィルタをしたいので、以下のようなViewを作成します。
models/staging/stg_analytics__events.sql
を作成する
SELECT
*,
cast(parse_date('%Y%m%d', event_date) AS timestamp) AS event_date_ts,
_table_suffix AS table_suffix,
cast(parse_date('%Y%m%d', _table_suffix) AS timestamp) AS table_suffix_ts,
(
SELECT value.string_value
FROM unnest(event_params) AS params
WHERE params.key = 'page_location'
) AS page_location,
(
SELECT value.string_value
FROM unnest(event_params) AS params
WHERE params.key = 'page_title'
) AS page_title,
timestamp_micros(event_timestamp) AS event_timestamp_ts
FROM
{{ source('analytics', 'events') }}
その他に必要なカラム
event_date | イベント発生日 |
event_timestamp | イベント発生日時 |
event_name | イベント名 |
event_params | イベントパラメータ page_location: ページのURL page_title: ページのタイトル |
これだけあれば、閲覧数ランキングを取得することができます。
event_paramsは、RECORD
型で、key
とvalue
のペアで構成されています。値を取り出すのは、unnest
を利用して取り出します。
stringやintegerなどの型を適切に変換しておくと、後続の処理が楽になります。
#intermediate
models/intermediate/int_events__count_pv.sql
を作成する
WITH filtered AS (
SELECT
event_date_ts,
page_location,
page_title,
count(*) AS pv_count
FROM {{ ref('stg_analytics__events') }}
WHERE event_name = 'page_view'
and page_location LIKE 'https://cloudandbuild.jp/blog/%'
{% if not should_full_refresh() %}
and table_suffix_ts >= timestamp(date_sub(current_date, interval 2 day))
and event_timestamp_ts >= timestamp(date_sub(current_date, interval 2 day))
{% endif %}
GROUP BY event_date_ts, page_location, page_title
)
SELECT * FROM filtered
2日前までのデータだけ処理するようにします。後のmartsでは、incremental modelで差分をマージしていくためです。
こうすることで、全量処理ではなく差分のみ処理するので、データ処理量を削減できます。
テーブルの構造などを破壊的変更したときは、full-refreshする必要があるので、should_full_refresh()
で判定しています。
#marts
models/marts/blog_pv.sql
を作成する
{% set partitions_to_replace = [
'timestamp(current_date)',
'timestamp(date_sub(current_date, interval 2 day))'
] %}
{{
config(
materialized = 'incremental',
incremental_strategy = 'insert_overwrite',
partition_by = {'field': 'event_date_ts', 'data_type': 'timestamp'},
partitions = partitions_to_replace
)
}}
WITH int_events AS (
SELECT * FROM {{ ref('int_events__count_pv') }}
{% if is_incremental() %}
WHERE
event_date_ts IN (
{{ partitions_to_replace | join(',') }}
)
{% endif %}
),
filtered AS (
SELECT *
FROM int_events
WHERE page_location LIKE 'https://cloudandbuild.jp/blog/%'
)
SELECT *
FROM filtered
blog_pv
では、日付パーティションのテーブルを作成します。intermediate
でデータの加工はほぼ終わっているので、このテーブルは単に実体化するのが主な目的です。このテーブルは日毎のPV数を保持します。これ自体でも役に立ちそうなので、martsに配置しています。
上記で作成したテーブルから最新の7日間のPV数のランキングを作成します。
models/marts/blog_pv_ranking.sql
を作成する
WITH filtered AS (
SELECT * FROM {{ ref('blog_pv') }}
WHERE
event_date_ts >= timestamp(date_sub(current_date, INTERVAL 7 DAY))
),
last_title AS (
SELECT
f1.page_location,
f1.page_title
FROM filtered AS f1
INNER JOIN (
SELECT
page_location,
max(event_date_ts) AS max_event_date_ts
FROM filtered GROUP BY page_location
) AS f2
ON
f1.page_location = f2.page_location
AND f1.event_date_ts = f2.max_event_date_ts
WHERE f1.page_title NOT LIKE '404%'
),
agg AS (
SELECT
filtered.page_location,
sum(filtered.pv_count) AS pv_count
FROM filtered
GROUP BY filtered.page_location
),
final AS (
SELECT
agg.page_location,
page_title,
pv_count,
rank() OVER (ORDER BY pv_count DESC) AS pv_rank
FROM agg
JOIN last_title using(page_location)
)
SELECT * FROM final
これで、ブログ記事の閲覧数ランキングを作成できました。
日付についてはあまり厳密ではありませんが、要件によってはタイムゾーンを考慮するなどして、適切に処理すると良いでしょう。
#Docker imageを作成する
k8sでdbtを動かす場合は、Docker imageを作成しておきます。
FROM ghcr.io/dbt-labs/dbt-core:1.8.3
WORKDIR /dbt
COPY requirements.lock pyproject.toml .python-version README.md ./
RUN PYTHONDONTWRITEBYTECODE=1 pip install --no-cache-dir -r requirements.lock
COPY . .
RUN dbt deps
CMD ["dbt", "run", "--profiles-dir", "."]
このimageをビルドして、k8sにデプロイします。cronjobで定期実行すると良いでしょう。
この記事では、k8sのmanifestは割愛します。
#閲覧数ランキングを取得する
GoでAPIを作成して、閲覧数ランキングを取得できるようにします。
APIの部分は割愛して、BigQueryからデータを取得する部分のコードを抜粋しています。
- bigquery clientを作成する
import (
"cloud.google.com/go/bigquery"
)
func NewBQClient(ctx context.Context, cfg *config.DWH) (*BQClient, func(), error) {
cli, err := bigquery.NewClient(ctx, cfg.ProjectID)
if err != nil {
return nil, nil, err
}
return &BQClient{
cli: cli,
cfg: cfg,
}, func() {
cli.Close()
}, nil
}
- bigqueryのテーブルの行をマッピングする構造体を作成する
type BlogPVRankRow struct {
PageLocation string `bigquery:"page_location"`
PageTitle string `bigquery:"page_title"`
PvCount int `bigquery:"pv_count"`
PvRank int `bigquery:"pv_rank"`
}
- bigqueryからデータを取得する
const blogPvRankTableName = "blog_pv_rank"
func tableName(datasetID, tableName string) string {
return fmt.Sprintf("`%s.%s`", datasetID, tableName)
}
func (r *BQClient) getBlogRanking(ctx context.Context) (*models.BlogRanking, error) {
c := r.cli
// Construct a query.
q := c.Query(fmt.Sprintf(`
SELECT *
FROM %s
ORDER BY pv_rank ASC
LIMIT 5
`, tableName(r.cfg.DatasetID, blogPvRankTableName)))
// Execute the query.
it, err := q.Read(ctx)
if err != nil {
return nil, fmt.Errorf("failed to read: %v", err)
}
// Iterate through the results.
var rows []*BlogPVRankRow
for {
var val BlogPVRankRow
err := it.Next(&val)
if err == iterator.Done {
break
}
if err != nil {
return nil, fmt.Errorf("failed to iterate: %v", err)
}
rows = append(rows, &val)
}
return toBlogRankingModel(rows), nil
}
これで、閲覧数ランキングを取得することができました。
実際には、APIを実装して、フロントエンドから取得していますが、割愛します。
あと、毎回BigQueryからデータを取得すると、パフォーマンスが悪いので、キャッシュを利用するなどして、パフォーマンスを向上させると良いでしょう。
#まとめ
今回は、自社ブログの記事閲覧数をdbtとBigQueryで分析してランキングを作成する方法をご紹介しました。
dbtを利用することで、データの加工処理を管理しやすくなります。また、Google AnalyticsはBigQueryにExportしておくことで、永続化できるのでとりあえずExportしておくと良いと思います。Google Analyticsのデータだけだと、Google AnalyticsのレポートAPIでできることとさほど変わりませんが、その他のデータがあればクロス分析することもできるので、データはとりあえず残しておきたいですね。