OpenTelemetryを活用してGCP × Python × マイクロサービス構成のObservabilityを高める

こんにちは、マネーフォワード ケッサイのデータチームで与信審査システムやデータ基盤の開発をしているkamillleです。 今回はOpenTelemetryとGoogle Trace, Loggingなどを組み合わせてCloud Run, Functions上で動くPythonアプリケーションのObservabilityを高めている取り組みの中から、外部のドキュメントなどに情報が記載されておらず対応に苦労したポイントをご紹介したいと思います。

GCPにデプロイするPythonアプリケーションへのOpenTelemetry導入に関してはGoogle CloudのドキュメントやGMOさん、キャディさんのブログに詳しく書かれているため割愛します。

Google Cloud のドキュメント

GMO さん: GCP Cloud Trace を使ってみた

キャディさん: Python プロジェクトに OpenTelemetry を導入する

なお、今回利用しているサンプルコードはGitHub上で mfkessai/opentelemetry-python-sample-app としてパブリックに公開しているため、参考になればと思います。

Introduction

技術的な紹介の前に以下の2点について紹介させてください。

  • Observability向上に取り組む背景
  • Observability向上の対象となっている与信審査システム

Observability向上に取り組む背景

簡単に書くと事業の成長に合わせて与信審査システムもスケールさせる必要があり、スケールにあたってどこが課題なのか、どこが課題になりそうかなど「システムの内部で何が起きているのか」を判断可能な状態にするためです。

具体的には、

弊社では掛け売りのプロセスを代行する事業を展開しており、私が所属するデータチームは掛け売りプロセス内で必要となる与信審査を自動で行うシステムの開発に責任を持っています。導入社数が増えたり、既存のお客様のお取引数が増えるのに合わせて弊社で行っている与信審査の総数も増えるため、サービスの成長にあわせて与信審査システムもスケールする必要がある、ということになります。

Observability向上の対象となっている与信審査システム

与信審査システムは複数のアプリケーション群で成り立つマイクロサービス構成となっており、一般的なWeb APIや機械学習モデルの推論機能のAPIなど様々なものがあります。基本的にはFastAPIかFlaskで作られていて、FastAPI製アプリケーションはCloud Runに、Flask製アプリケーションはCloud Functionsにデプロイされています。

導入にあたって難しかった4点

ここからはOpenTelemetryを使ったObservability向上において難しかった4点をご紹介します。

1. Spanに任意のラベル情報を付与する

  • どのアプリケーションが発行したか
  • どの環境(本番、ステージングなど)で発行したか

上記2つをカスタムラベルとしてSpanに付与しています。ラベルがあることで本番環境のアプリケーションAのSpanが含まれているトレースのみ表示する、といった絞り込みがCloud Traceで可能になります。 カスタムラベルの付与は下記コードで行っており、こちらのGitHub Issueで紹介されていた方法を参考にしています。

from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.resources import Resource
from opentelemetry.exporter.cloud_trace import CloudTraceSpanExporter

def init_trace(service_name: str, environment: str):
    ...
    tracer_provider = TracerProvider(
        # service.name, service.environment という名前のカスタムラベルを付与する
        resource=Resource.create(
            {
                "service.name": service_name,
                "service.environment": environment,
            }
        ),
    )
    # resource_regex に渡した正規表現にマッチするラベル名のみがSpanに紐付けられる
    # Cloud Trace ではSpanに紐付けられるラベルが32個までと決まっており、
    # 不用意にその制限に当たるラベルが付与されないようにこのような設定が必要となっている
    # FYI: https://github.com/GoogleCloudPlatform/python-docs-samples/issues/7300#issuecomment-1164737444
    exporter = CloudTraceSpanExporter(resource_regex="service.*")
    ...


init_trace(service_name: "sample-application", environment: "production")

Cloud Traceの検索フォームにLABEL:service.name:flask-on-cloud-functionsLABEL:service.environment:productionと入力することで該当するSpanを含んだトレースを検索できます。

search-with-custom-labels-on-cloud-trace

2. 分散トレーシングの実現

Cloud Run, Cloud Functions上で動くアプリケーションがリクエストを受け取るとHTTPリクエストヘッダにX-Cloud-Trace-Contextがセットされており、ここにTrace ID, Span IDが入る仕組みととなっています。この仕組みを利用し、マイクロサービス間で通信を行う際に自分が受け取ったX-Cloud-Trace-Contextをヘッダに付与してリクエストを発行することで分散トレーシングを実現しています。

packageを入れる等でもっと簡単に分散トレーシングを実現する方法もありそうですが情報にたどり着くことができなかった & 分散トレーシング実現のためのコード数は少なく済んでおり他の方法に乗り換えることが容易な状態であるため弊社では下記のようなコードを使っています。

import requests
from opentelemetry import trace


class TraceContextManager:
    @classmethod
    def get_trace_context(cls):
        """
        分散トレーシングするにはアプリケーション間で同じX-Cloud-Trace-Contextを引き回す必要がある
        X-Cloud-Trace-Context のフォーマットは`TRACE_ID/SPAN_ID;o=TRACE_TRUE`
        https://cloud.google.com/trace/docs/setup#force-trace
        """
        return f"{cls.__get_trace_id()}/{cls.__get_span_id()}"

    @classmethod
    def __span_context(cls):
        return trace.get_current_span().get_span_context()

    @classmethod
    def __get_trace_id(cls):
        """
        Trace IDはOpenTelemetryの仕様では128 ビットの番号を表す32文字の16進数値だが
        OpenTelemetryのPython package内だと別フォーマットで管理されているため
        パッケージ側のフォーマット関数をかませる必要がある
        """
        return trace.format_trace_id(cls.__span_context().trace_id)

    @classmethod
    def __get_span_id(cls):
        return cls.__span_context().span_id


headers = {}
trace_context = TraceContextManager.get_trace_context()
if trace_context:
    headers["X-Cloud-Trace-Context"] = trace_context
res = requests.post(url, json={}, headers=headers)

3. Flask on Cloud Functionsの場合、Trace IDがバラけてしまう

Cloud FunctionsにデプロイしたFlaskアプリケーションの発行するスパンが異なるTrace IDに紐付いてしまうことが起きており、どこで問題が起きているのか・なぜそれが起きているのか把握することが困難な状態となっていました。こちらのIssueで質問したところWSGIを考慮しmiddlewareの設定を行う必要があることがわかり、下記のようなコードを利用しています。

from flask import Flask, current_app
from opentelemetry.instrumentation.wsgi import OpenTelemetryMiddleware


app = Flask(__name__)
# middlewareの設定を行う
with current_app.app_context():
    current_app.wsgi_app = OpenTelemetryMiddleware(current_app.wsgi_app)


def handler(req: Request) -> Response:
    ...

4. Sentryのエラーに対応するトレースやログを容易に見つけられるようにする

Sentryのcustom tagという仕組みを使い、Trace IDをエラーに紐づけてSentryに送信しています。 Sentryに表示されるTrace IDをCloud TraceやLoggingで検索し、該当エラーに関連したトレースやログを簡単に見つけられます。

from opentelemetry import trace


class TraceContextManager:
    @classmethod
    def get_trace_id(cls):
        """
        Trace IDはOpenTelemetryの仕様では128 ビットの番号を表す32文字の16進数値だが
        OpenTelemetryのPython package内だと別フォーマットで管理されているため
        パッケージ側のフォーマット関数をかませる必要がある
        """
        return trace.format_trace_id(cls.__span_context().trace_id)

    @classmethod
    def __span_context(cls):
        return trace.get_current_span().get_span_context()


@app.get("/")
def handler():
  # gcp-trace-idとしてTrace IDをタグ付与する
  sentry_sdk.set_tag("gcp-trace-id", TraceContextManager.get_trace_id())
  ...

終わりに

上記4点を乗り越えて、Observablity向上の運用にあたって最低限必要と考えていた体験を揃えられたため与信審査システムへの導入を行いました。

現状では月に2回30分ほどエンジニアメンバーで集まりObservablitity向上に取り組むo11y会を開催しており、Cloud Trace, Cloud Logging, Cloud Monitoringを画面共有しながらスケールの課題点やシステムの挙動がつかみにくい点を特定し、改善活動に繋げています。 メンバーによって与信審査システムに対する理解度にギャップがありましたがo11y会を通して埋まってきており、当初の目的達成とは別に副次的な効果をもたらしてくれています。

最後まで読んでいただきありがとうございました。良い年をお過ごしください。