You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

ext_otel.py 7.0KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168
  1. import atexit
  2. import logging
  3. import os
  4. import platform
  5. import socket
  6. import sys
  7. from typing import Union
  8. from celery.signals import worker_init # type: ignore
  9. from flask_login import user_loaded_from_request, user_logged_in # type: ignore
  10. from opentelemetry import trace
  11. from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter
  12. from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
  13. from opentelemetry.instrumentation.celery import CeleryInstrumentor
  14. from opentelemetry.instrumentation.flask import FlaskInstrumentor
  15. from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
  16. from opentelemetry.metrics import get_meter, get_meter_provider, set_meter_provider
  17. from opentelemetry.propagate import set_global_textmap
  18. from opentelemetry.propagators.b3 import B3Format
  19. from opentelemetry.propagators.composite import CompositePropagator
  20. from opentelemetry.sdk.metrics import MeterProvider
  21. from opentelemetry.sdk.metrics.export import ConsoleMetricExporter, PeriodicExportingMetricReader
  22. from opentelemetry.sdk.resources import Resource
  23. from opentelemetry.sdk.trace import TracerProvider
  24. from opentelemetry.sdk.trace.export import (
  25. BatchSpanProcessor,
  26. ConsoleSpanExporter,
  27. )
  28. from opentelemetry.sdk.trace.sampling import ParentBasedTraceIdRatio
  29. from opentelemetry.semconv.resource import ResourceAttributes
  30. from opentelemetry.trace import Span, get_current_span, get_tracer_provider, set_tracer_provider
  31. from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
  32. from opentelemetry.trace.status import StatusCode
  33. from configs import dify_config
  34. from dify_app import DifyApp
  @user_logged_in.connect
  @user_loaded_from_request.connect
  def on_user_loaded(_sender, user):
      """Annotate the current span with the loaded user's tenant and user ids.

      Connected to both the ``user_logged_in`` and ``user_loaded_from_request``
      signals.

      Args:
          _sender: The signal sender; not used.
          user: The loaded user object. Nothing is recorded when it is falsy.
      """
      if not user:
          return

      span = get_current_span()
      if not span:
          return

      span.set_attribute("service.tenant.id", user.current_tenant_id)
      span.set_attribute("service.user.id", user.id)
  def init_app(app: DifyApp):
      """Configure OpenTelemetry tracing and metrics for the application.

      Does nothing unless ``dify_config.ENABLE_OTEL`` is truthy. Otherwise it:

      * installs the global context propagators,
      * builds a ``Resource`` describing the service, process, host and OS,
      * installs a ``TracerProvider`` (ratio-sampled via
        ``dify_config.OTEL_SAMPLING_RATE``) with a ``BatchSpanProcessor``,
      * installs a ``MeterProvider`` with a ``PeriodicExportingMetricReader``,
      * instruments Flask and Celery when ``is_celery_worker()`` is false,
      * instruments the SQLAlchemy engines,
      * registers ``shutdown_tracer`` to run at interpreter exit.

      Spans and metrics go to OTLP/HTTP exporters when
      ``dify_config.OTEL_EXPORTER_TYPE`` equals ``"otlp"``; any other value
      falls back to the console exporters.

      Args:
          app: The application to instrument.
      """
      if not dify_config.ENABLE_OTEL:
          return

      setup_context_propagation()

      # Initialize OpenTelemetry
      # Follow Semantic Conventions 1.32.0 to define resource attributes
      resource = Resource(
          attributes={
              ResourceAttributes.SERVICE_NAME: dify_config.APPLICATION_NAME,
              ResourceAttributes.SERVICE_VERSION: f"dify-{dify_config.CURRENT_VERSION}-{dify_config.COMMIT_SHA}",
              ResourceAttributes.PROCESS_PID: os.getpid(),
              ResourceAttributes.DEPLOYMENT_ENVIRONMENT: f"{dify_config.DEPLOY_ENV}-{dify_config.EDITION}",
              ResourceAttributes.HOST_NAME: socket.gethostname(),
              ResourceAttributes.HOST_ARCH: platform.machine(),
              "custom.deployment.git_commit": dify_config.COMMIT_SHA,
              ResourceAttributes.HOST_ID: platform.node(),
              ResourceAttributes.OS_TYPE: platform.system().lower(),
              ResourceAttributes.OS_DESCRIPTION: platform.platform(),
              ResourceAttributes.OS_VERSION: platform.version(),
          }
      )

      sampler = ParentBasedTraceIdRatio(dify_config.OTEL_SAMPLING_RATE)
      provider = TracerProvider(resource=resource, sampler=sampler)
      set_tracer_provider(provider)

      exporter: Union[OTLPSpanExporter, ConsoleSpanExporter]
      metric_exporter: Union[OTLPMetricExporter, ConsoleMetricExporter]
      if dify_config.OTEL_EXPORTER_TYPE == "otlp":
          # Drop trailing slashes so a base endpoint configured as
          # "http://host:4318/" does not become "http://host:4318//v1/traces".
          base_endpoint = dify_config.OTLP_BASE_ENDPOINT.rstrip("/")
          authorization = f"Bearer {dify_config.OTLP_API_KEY}"
          exporter = OTLPSpanExporter(
              endpoint=f"{base_endpoint}/v1/traces",
              headers={"Authorization": authorization},
          )
          metric_exporter = OTLPMetricExporter(
              endpoint=f"{base_endpoint}/v1/metrics",
              headers={"Authorization": authorization},
          )
      else:
          # Fallback to console exporter
          exporter = ConsoleSpanExporter()
          metric_exporter = ConsoleMetricExporter()

      provider.add_span_processor(
          BatchSpanProcessor(
              exporter,
              max_queue_size=dify_config.OTEL_MAX_QUEUE_SIZE,
              schedule_delay_millis=dify_config.OTEL_BATCH_EXPORT_SCHEDULE_DELAY,
              max_export_batch_size=dify_config.OTEL_MAX_EXPORT_BATCH_SIZE,
              export_timeout_millis=dify_config.OTEL_BATCH_EXPORT_TIMEOUT,
          )
      )

      reader = PeriodicExportingMetricReader(
          metric_exporter,
          export_interval_millis=dify_config.OTEL_METRIC_EXPORT_INTERVAL,
          export_timeout_millis=dify_config.OTEL_METRIC_EXPORT_TIMEOUT,
      )
      set_meter_provider(MeterProvider(resource=resource, metric_readers=[reader]))

      # Celery worker processes are instrumented from the worker_init signal
      # handler instead, so only the non-worker process is instrumented here.
      if not is_celery_worker():
          init_flask_instrumentor(app)
          CeleryInstrumentor(tracer_provider=get_tracer_provider(), meter_provider=get_meter_provider()).instrument()

      init_sqlalchemy_instrumentor(app)
      atexit.register(shutdown_tracer)
  def is_celery_worker():
      """Return whether this process appears to have been launched via Celery.

      The check is a case-insensitive substring match for ``"celery"`` in
      ``sys.argv[0]``.

      Returns:
          bool: ``True`` when ``sys.argv[0]`` contains ``"celery"``; ``False``
          otherwise, including when ``sys.argv`` is missing or empty.
      """
      argv = getattr(sys, "argv", None)
      # An embedded interpreter may leave sys.argv empty; treat that as
      # "not a Celery worker" instead of raising IndexError.
      if not argv:
          return False
      return "celery" in argv[0].lower()
  def init_flask_instrumentor(app: DifyApp):
      """Instrument the Flask app and count HTTP responses by status code.

      Creates the ``http.server.response.count`` counter on the
      ``http_metrics`` meter and registers a response hook that, for recording
      spans, sets the span status from the response status line and increments
      the counter.

      Args:
          app: The application passed to ``FlaskInstrumentor.instrument_app``.
      """
      http_meter = get_meter("http_metrics", version=dify_config.CURRENT_VERSION)
      response_counter = http_meter.create_counter(
          "http.server.response.count",
          description="Total number of HTTP responses by status code",
          unit="{response}",
      )

      def response_hook(span: Span, status: str, response_headers: list):
          # Absent or non-recording spans are skipped entirely, which also
          # means the counter is not incremented for them.
          if not (span and span.is_recording()):
              return

          # NOTE(review): every non-2xx status (including 3xx/4xx) is marked
          # ERROR here - confirm this is the intended mapping.
          if status.startswith("2"):
              span.set_status(StatusCode.OK)
          else:
              span.set_status(StatusCode.ERROR, status)

          # The leading token of the status line is the numeric code.
          code = int(status.split(" ")[0])
          labels = {"status_code": code, "status_class": f"{code // 100}xx"}
          response_counter.add(1, labels)

      flask_instrumentor = FlaskInstrumentor()
      if dify_config.DEBUG:
          logging.info("Initializing Flask instrumentor")
      flask_instrumentor.instrument_app(app, response_hook=response_hook)
  def init_sqlalchemy_instrumentor(app: DifyApp):
      """Instrument every SQLAlchemy engine registered on the app.

      The engines are read from ``app.extensions["sqlalchemy"].engines`` inside
      an application context and instrumented with the SQL commenter enabled.

      Args:
          app: The application whose SQLAlchemy engines are instrumented.
      """
      with app.app_context():
          sqlalchemy_ext = app.extensions["sqlalchemy"]
          registered_engines = [*sqlalchemy_ext.engines.values()]
          sqlalchemy_instrumentor = SQLAlchemyInstrumentor()
          sqlalchemy_instrumentor.instrument(enable_commenter=True, engines=registered_engines)
  def setup_context_propagation():
      """Install the global text-map propagator.

      The propagator is a composite of the W3C trace-context propagator and
      the B3 propagator, in that order.
      """
      propagators = [
          TraceContextTextMapPropagator(),  # W3C trace context
          B3Format(),  # B3 propagation (used by many systems)
      ]
      set_global_textmap(CompositePropagator(propagators))
  @worker_init.connect(weak=False)
  def init_celery_worker(*args, **kwargs):
      """Instrument Celery when the ``worker_init`` signal fires.

      Uses the tracer and meter providers that are globally registered at the
      time the signal fires. Positional and keyword arguments supplied by the
      signal are accepted and ignored.
      """
      providers = {
          "tracer_provider": get_tracer_provider(),
          "meter_provider": get_meter_provider(),
      }
      if dify_config.DEBUG:
          logging.info("Initializing OpenTelemetry for Celery worker")
      CeleryInstrumentor(**providers).instrument()
  def shutdown_tracer():
      """Flush the global tracer provider, if it supports flushing.

      Providers without a ``force_flush`` attribute are left untouched.
      """
      active_provider = trace.get_tracer_provider()
      if not hasattr(active_provider, "force_flush"):
          return
      active_provider.force_flush()