選択できるのは25トピックまでです。 トピックは、先頭が英数字で、英数字とダッシュ('-')を使用した35文字以内のものにしてください。

ext_otel.py 5.6KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130
  1. import atexit
  2. import os
  3. import platform
  4. import socket
  5. from typing import Union
  6. from flask_login import user_loaded_from_request, user_logged_in # type: ignore
  7. from opentelemetry import trace
  8. from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter
  9. from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
  10. from opentelemetry.instrumentation.flask import FlaskInstrumentor
  11. from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
  12. from opentelemetry.metrics import set_meter_provider
  13. from opentelemetry.propagate import set_global_textmap
  14. from opentelemetry.propagators.b3 import B3Format
  15. from opentelemetry.propagators.composite import CompositePropagator
  16. from opentelemetry.sdk.metrics import MeterProvider
  17. from opentelemetry.sdk.metrics.export import ConsoleMetricExporter, PeriodicExportingMetricReader
  18. from opentelemetry.sdk.resources import Resource
  19. from opentelemetry.sdk.trace import TracerProvider
  20. from opentelemetry.sdk.trace.export import (
  21. BatchSpanProcessor,
  22. ConsoleSpanExporter,
  23. )
  24. from opentelemetry.sdk.trace.sampling import ParentBasedTraceIdRatio
  25. from opentelemetry.semconv.resource import ResourceAttributes
  26. from opentelemetry.trace import Span, get_current_span, set_tracer_provider
  27. from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
  28. from opentelemetry.trace.status import StatusCode
  29. from configs import dify_config
  30. from dify_app import DifyApp
  31. @user_logged_in.connect
  32. @user_loaded_from_request.connect
  33. def on_user_loaded(_sender, user):
  34. if user:
  35. current_span = get_current_span()
  36. if current_span:
  37. current_span.set_attribute("service.tenant.id", user.current_tenant_id)
  38. current_span.set_attribute("service.user.id", user.id)
  39. def init_app(app: DifyApp):
  40. if dify_config.ENABLE_OTEL:
  41. setup_context_propagation()
  42. # Initialize OpenTelemetry
  43. # Follow Semantic Convertions 1.32.0 to define resource attributes
  44. resource = Resource(
  45. attributes={
  46. ResourceAttributes.SERVICE_NAME: dify_config.APPLICATION_NAME,
  47. ResourceAttributes.SERVICE_VERSION: f"dify-{dify_config.CURRENT_VERSION}-{dify_config.COMMIT_SHA}",
  48. ResourceAttributes.PROCESS_PID: os.getpid(),
  49. ResourceAttributes.DEPLOYMENT_ENVIRONMENT: f"{dify_config.DEPLOY_ENV}-{dify_config.EDITION}",
  50. ResourceAttributes.HOST_NAME: socket.gethostname(),
  51. ResourceAttributes.HOST_ARCH: platform.machine(),
  52. "custom.deployment.git_commit": dify_config.COMMIT_SHA,
  53. ResourceAttributes.HOST_ID: platform.node(),
  54. ResourceAttributes.OS_TYPE: platform.system().lower(),
  55. ResourceAttributes.OS_DESCRIPTION: platform.platform(),
  56. ResourceAttributes.OS_VERSION: platform.version(),
  57. }
  58. )
  59. sampler = ParentBasedTraceIdRatio(dify_config.OTEL_SAMPLING_RATE)
  60. provider = TracerProvider(resource=resource, sampler=sampler)
  61. set_tracer_provider(provider)
  62. exporter: Union[OTLPSpanExporter, ConsoleSpanExporter]
  63. metric_exporter: Union[OTLPMetricExporter, ConsoleMetricExporter]
  64. if dify_config.OTEL_EXPORTER_TYPE == "otlp":
  65. exporter = OTLPSpanExporter(
  66. endpoint=dify_config.OTLP_BASE_ENDPOINT + "/v1/traces",
  67. headers={"Authorization": f"Bearer {dify_config.OTLP_API_KEY}"},
  68. )
  69. metric_exporter = OTLPMetricExporter(
  70. endpoint=dify_config.OTLP_BASE_ENDPOINT + "/v1/metrics",
  71. headers={"Authorization": f"Bearer {dify_config.OTLP_API_KEY}"},
  72. )
  73. else:
  74. # Fallback to console exporter
  75. exporter = ConsoleSpanExporter()
  76. metric_exporter = ConsoleMetricExporter()
  77. provider.add_span_processor(
  78. BatchSpanProcessor(
  79. exporter,
  80. max_queue_size=dify_config.OTEL_MAX_QUEUE_SIZE,
  81. schedule_delay_millis=dify_config.OTEL_BATCH_EXPORT_SCHEDULE_DELAY,
  82. max_export_batch_size=dify_config.OTEL_MAX_EXPORT_BATCH_SIZE,
  83. export_timeout_millis=dify_config.OTEL_BATCH_EXPORT_TIMEOUT,
  84. )
  85. )
  86. reader = PeriodicExportingMetricReader(
  87. metric_exporter,
  88. export_interval_millis=dify_config.OTEL_METRIC_EXPORT_INTERVAL,
  89. export_timeout_millis=dify_config.OTEL_METRIC_EXPORT_TIMEOUT,
  90. )
  91. set_meter_provider(MeterProvider(resource=resource, metric_readers=[reader]))
  92. def response_hook(span: Span, status: str, response_headers: list):
  93. if span and span.is_recording():
  94. if status.startswith("2"):
  95. span.set_status(StatusCode.OK)
  96. else:
  97. span.set_status(StatusCode.ERROR, status)
  98. instrumentor = FlaskInstrumentor()
  99. instrumentor.instrument_app(app, response_hook=response_hook)
  100. with app.app_context():
  101. engines = list(app.extensions["sqlalchemy"].engines.values())
  102. SQLAlchemyInstrumentor().instrument(enable_commenter=True, engines=engines)
  103. atexit.register(shutdown_tracer)
  104. def setup_context_propagation():
  105. # Configure propagators
  106. set_global_textmap(
  107. CompositePropagator(
  108. [
  109. TraceContextTextMapPropagator(), # W3C trace context
  110. B3Format(), # B3 propagation (used by many systems)
  111. ]
  112. )
  113. )
  114. def shutdown_tracer():
  115. provider = trace.get_tracer_provider()
  116. if hasattr(provider, "force_flush"):
  117. provider.force_flush()