ローカルでPub/Sub, Cloud Functionsを同時に動かしながら動作確認する

こんにちは、ピリカ開発チームの九鬼です。

SNSピリカのサービスでは App EngineからPub/SubにメッセージをPublish → サブスクリプションフィルタで絞り込み → Cloud FunctionsからCloud Tasks経由で再度App Engineに戻して処理するケースがあります。

f:id:pirika-inc:20211015162905p:plain
SNSピリカサービス GCPサービス間連携例

ローカル環境で開発するとき、Pub/SubからCloud FunctionsをPushサブスクリプション経由で呼び出したいことがあります。

その際のやり方について、備忘録として掲載します。

前提

以下の構成で動作することを想定しています。

(Inbound Message)
    \--> [Pub/Sub]<port=8085>
         - トピック: projects/alice/topics/bob
         - サブスクリプション: projects/alice/subscriptions/charley
             \--> [Cloud Functions]<http://127.0.0.1/emily:8080>
  • プロジェクトID: alice
  • Cloud Functions
    • 関数名: emily
    • ローカル環境での動作ポート: 8080
  • Pub/Sub

また、Functionsで立ち上げる関数は、以下の様にメッセージのみを表示するものとします。

import flask
import json


def emily(request: flask.Request):
    print("received message!: " + json.dumps(request.get_json()))
    return ""

環境構築

以下のフローで動作環境を構築します(本ケースでは、Python3.7, pipenvを利用します)。

  1. functions-frameworkのインストール
  2. functions-frameworkを立ち上げる
  3. gcloudのPub/Sub Emulatorを立ち上がる
  4. PubSub Emulator上でトピック、サブスクリプションを作成する

なお、2., 3., 4.はいずれも別のターミナル上で実施する必要があります(functions-framework、Pub/Sub Emulatorともにフォアグラウンドにプロセスが動作し続けるため)

1. functions-frameworkのインストール

pipenvを使う場合、以下の様にfunctions-frameworkをPipfileを記載した上でpipenv installを実行します。

[[source]]
url = "https://pypi.org/simple"
verify_ssl = true
name = "pypi"

[packages]
flask = "*"

[dev-packages]
flake8 = "*"
autopep8 = "*"
functions-framework = "*"

[scripts]
functions="functions-framework --target emily --debug"

2. functions-frameworkを立ち上げる

pipenvの仮想環境を使って、functions-frameworkを立ち上げます。

pipenv run functions

3. gcloudのPub/Sub Emulatorを立ち上がる

gcloudを使って、Pub/Subのエミュレータを立ち上げます。

gcloud beta emulators pubsub start --project alice

4. Pub/Sub Emulator上でトピック、サブスクリプションを作成する

以下の通りシェルスクリプト等でトピック、サブスクリプションを作成します。

PROJECT="alice"
TOPIC="bob"
SUBSCRIBER="charley"

PUBSUBPORT="8085"
FUNCTIONPORT="8080"
api_url="http://127.0.0.1:${PUBSUBPORT}/v1/projects/${PROJECT}"

# トピックを作成する
curl -X PUT ${api_url}/topics/${TOPIC}

# サブスクリプションを追加する
curl -s -X PUT ${api_url}/subscriptions/${SUBSCRIBER} \
    -H "content-type: application/json" \
    --data '{
        "topic": "projects/'${PROJECT}'/topics/'${TOPIC}'",
        "pushConfig":{"pushEndpoint": "http://127.0.0.1:'${FUNCTIONPORT}'"}
    }'

# 追加したサブスクリプションを表示する
curl -X GET ${api_url}/subscriptions

問題なく設定できていれば、コンソール上に以下の様に設定内容が出力されます。

{
  "subscriptions": [{
    "name": "projects/alice/subscriptions/charley",
    "topic": "projects/alice/topics/bob",
    "pushConfig": {
      "pushEndpoint": "http://127.0.0.1:8080"
    },
    "ackDeadlineSeconds": 10,
    "messageRetentionDuration": "604800s"
  }]
}

Pub/Sub経由でFunctionsを呼び出す

Pub/Subへのモックメッセージを作成し、Pub/SubのAPIに送信します。

以下の通りシェルスクリプト等でメッセージを作成し、curlでPub/Sub EmulatorにメッセージをPublishします。

PROJECT="alice"
TOPIC="bob"

data='
{
    "my_message": "hello world!"
}
'
data_enc=$(echo $data | base64 )   # data should be base64 encoded.
message='
{
  "messages": [
    {
      "data": "'$data_enc'",
      "attributes": '$data'
    }
  ]
}
'

url="http://127.0.0.1:8085/v1/projects/$PROJECT/topics/$TOPIC:publish"
curl $url \
    -H "Content-Type: application/json" \
    -d "$message"

送信後、functions-frameworkを立ち上げているコンソールで以下のメッセージを確認できます。

received message!: {"subscription": "projects/alice/subscriptions/charley", "message": {"data": "eyAibXlfbWVzc2FnZSI6ICJoZWxsbyB3b3JsZCEiIH0K", "messageId": "1", "attributes": {"my_message": "hello world!"}}}

付記: ローカルのApp EngineのPub/Subへの向け先をローカルのものにするには?

以下の通り、App Engine立ち上げ前に環境変数を設定しておくとOKです(リファレンス)。

自動設定する場合

以下コマンドにより、PUBSUB_EMULATOR_HOST=localhost:8085が環境変数に設定されます。

$(gcloud beta emulators pubsub env-init)

手動設定する場合

Pub/Subのローカルエミュレータのホストやポートを変更している場合、以下の様に手動設定します。

export PUBSUB_EMULATOR_HOST=localhost:8085
export PUBSUB_PROJECT_ID=alice

以上になります。ご覧いただきありがとうございます!