You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

ext_otel.py 8.7KB


  1. import atexit
  2. import logging
  3. import os
  4. import platform
  5. import socket
  6. import sys
  7. from typing import Union
  8. from celery.signals import worker_init # type: ignore
  9. from flask_login import user_loaded_from_request, user_logged_in # type: ignore
  10. from configs import dify_config
  11. from dify_app import DifyApp
  @user_logged_in.connect
  @user_loaded_from_request.connect
  def on_user_loaded(_sender, user):
      """Tag the current span with the tenant and user id of the loaded user.

      Connected to both the ``user_logged_in`` and ``user_loaded_from_request``
      signals. Does nothing when ``dify_config.ENABLE_OTEL`` is falsy, when no
      user is supplied, or when there is no current span.

      Args:
          _sender: Signal sender; unused.
          user: The loaded user; its ``current_tenant_id`` and ``id`` are read.
      """
      if not dify_config.ENABLE_OTEL or not user:
          return

      # Imported only on the enabled path, never at module import time.
      from opentelemetry.trace import get_current_span

      current_span = get_current_span()
      if not current_span:
          return

      span_attributes = {
          "service.tenant.id": user.current_tenant_id,
          "service.user.id": user.id,
      }
      for attribute_name, attribute_value in span_attributes.items():
          # OpenTelemetry attribute values must not be None; skip missing
          # values instead of handing the span an invalid attribute.
          if attribute_value is not None:
              current_span.set_attribute(attribute_name, attribute_value)
  def init_app(app: DifyApp):
      """Configure OpenTelemetry tracing, metrics and instrumentation for ``app``.

      Installs the global text-map propagators, a tracer provider and a meter
      provider, choosing OTLP or console exporters from
      ``dify_config.OTEL_EXPORTER_TYPE``. It then instruments Flask and Celery
      (only when not running as a Celery worker), exception logging and
      SQLAlchemy, and registers a span flush at interpreter exit.

      Args:
          app: Application whose requests and SQLAlchemy engines are instrumented.
      """
      # OpenTelemetry is imported inside the function rather than at module
      # import time; the nested helpers below use these names as closures.
      from opentelemetry import trace
      from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter
      from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
      from opentelemetry.instrumentation.celery import CeleryInstrumentor
      from opentelemetry.instrumentation.flask import FlaskInstrumentor
      from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
      from opentelemetry.metrics import get_meter, get_meter_provider, set_meter_provider
      from opentelemetry.propagate import set_global_textmap
      from opentelemetry.propagators.b3 import B3Format
      from opentelemetry.propagators.composite import CompositePropagator
      from opentelemetry.sdk.metrics import MeterProvider
      from opentelemetry.sdk.metrics.export import ConsoleMetricExporter, PeriodicExportingMetricReader
      from opentelemetry.sdk.resources import Resource
      from opentelemetry.sdk.trace import TracerProvider
      from opentelemetry.sdk.trace.export import (
          BatchSpanProcessor,
          ConsoleSpanExporter,
      )
      from opentelemetry.sdk.trace.sampling import ParentBasedTraceIdRatio
      from opentelemetry.semconv.resource import ResourceAttributes
      from opentelemetry.trace import Span, get_tracer_provider, set_tracer_provider
      from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
      from opentelemetry.trace.status import StatusCode

      def is_celery_worker():
          """Return True when the launching executable's path contains "celery"."""
          return "celery" in sys.argv[0].lower()

      def instrument_exception_logging():
          """Attach ``ExceptionLoggingHandler`` to the root logger."""
          exception_handler = ExceptionLoggingHandler()
          logging.getLogger().addHandler(exception_handler)

      def init_flask_instrumentor(app: DifyApp):
          """Instrument ``app`` with Flask tracing and an HTTP response counter."""
          meter = get_meter("http_metrics", version=dify_config.CURRENT_VERSION)
          _http_response_counter = meter.create_counter(
              "http.server.response.count", description="Total number of HTTP responses by status code", unit="{response}"
          )

          def response_hook(span: Span, status: str, response_headers: list):
              """Set the span status from the HTTP status line and count the response."""
              # NOTE(review): the counter update is kept inside the recording
              # check, as reconstructed from the flattened source — confirm.
              if not (span and span.is_recording()):
                  return

              if status.startswith("2"):
                  span.set_status(StatusCode.OK)
              else:
                  # NOTE(review): every non-2xx status (3xx redirects included)
                  # is marked ERROR — confirm this is intended.
                  span.set_status(StatusCode.ERROR, status)

              # Keep only the leading token of the status line as the numeric code.
              status = status.split(" ")[0]
              status_code = int(status)
              status_class = f"{status_code // 100}xx"
              _http_response_counter.add(1, {"status_code": status_code, "status_class": status_class})

          instrumentor = FlaskInstrumentor()
          if dify_config.DEBUG:
              logging.info("Initializing Flask instrumentor")
          instrumentor.instrument_app(app, response_hook=response_hook)

      def init_sqlalchemy_instrumentor(app: DifyApp):
          """Instrument every SQLAlchemy engine registered on ``app``."""
          with app.app_context():
              engines = list(app.extensions["sqlalchemy"].engines.values())
              SQLAlchemyInstrumentor().instrument(enable_commenter=True, engines=engines)

      def setup_context_propagation():
          """Install W3C trace-context and B3 as the global propagators."""
          # Configure propagators
          set_global_textmap(
              CompositePropagator(
                  [
                      TraceContextTextMapPropagator(),  # W3C trace context
                      B3Format(),  # B3 propagation (used by many systems)
                  ]
              )
          )

      def shutdown_tracer():
          """Flush pending spans if the current tracer provider supports it."""
          provider = trace.get_tracer_provider()
          if hasattr(provider, "force_flush"):
              provider.force_flush()

      class ExceptionLoggingHandler(logging.Handler):
          """Custom logging handler that creates spans for logging.exception() calls"""

          def emit(self, record):
              """Create a ``log.exception`` span for records that carry an exception."""
              try:
                  exc_info = record.exc_info
                  # logging.exception() outside an ``except`` block yields
                  # (None, None, None): truthy, but there is nothing to record.
                  if not exc_info or exc_info[0] is None:
                      return
                  exc_type, exc_value = exc_info[0], exc_info[1]

                  tracer = get_tracer_provider().get_tracer("dify.exception.logging")
                  with tracer.start_as_current_span(
                      "log.exception",
                      attributes={
                          "log.level": record.levelname,
                          "log.message": record.getMessage(),
                          "log.logger": record.name,
                          "log.file.path": record.pathname,
                          "log.file.line": record.lineno,
                      },
                  ) as span:
                      span.set_status(StatusCode.ERROR)
                      span.record_exception(exc_value)
                      span.set_attribute("exception.type", exc_type.__name__)
                      span.set_attribute("exception.message", str(exc_value))
              except Exception:
                  # Standard logging.Handler convention: report the failure via
                  # handleError (honours logging.raiseExceptions) instead of
                  # discarding it silently; it never propagates to the caller.
                  self.handleError(record)

      setup_context_propagation()

      # Initialize OpenTelemetry
      # Follow Semantic Conventions 1.32.0 to define resource attributes
      resource = Resource(
          attributes={
              ResourceAttributes.SERVICE_NAME: dify_config.APPLICATION_NAME,
              ResourceAttributes.SERVICE_VERSION: f"dify-{dify_config.CURRENT_VERSION}-{dify_config.COMMIT_SHA}",
              ResourceAttributes.PROCESS_PID: os.getpid(),
              ResourceAttributes.DEPLOYMENT_ENVIRONMENT: f"{dify_config.DEPLOY_ENV}-{dify_config.EDITION}",
              ResourceAttributes.HOST_NAME: socket.gethostname(),
              ResourceAttributes.HOST_ARCH: platform.machine(),
              "custom.deployment.git_commit": dify_config.COMMIT_SHA,
              ResourceAttributes.HOST_ID: platform.node(),
              ResourceAttributes.OS_TYPE: platform.system().lower(),
              ResourceAttributes.OS_DESCRIPTION: platform.platform(),
              ResourceAttributes.OS_VERSION: platform.version(),
          }
      )
      sampler = ParentBasedTraceIdRatio(dify_config.OTEL_SAMPLING_RATE)
      provider = TracerProvider(resource=resource, sampler=sampler)
      set_tracer_provider(provider)

      exporter: Union[OTLPSpanExporter, ConsoleSpanExporter]
      metric_exporter: Union[OTLPMetricExporter, ConsoleMetricExporter]
      if dify_config.OTEL_EXPORTER_TYPE == "otlp":
          # Strip trailing slashes so a base endpoint configured as
          # "http://host/" does not produce "http://host//v1/traces".
          base_endpoint = dify_config.OTLP_BASE_ENDPOINT.rstrip("/")
          exporter = OTLPSpanExporter(
              endpoint=base_endpoint + "/v1/traces",
              headers={"Authorization": f"Bearer {dify_config.OTLP_API_KEY}"},
          )
          metric_exporter = OTLPMetricExporter(
              endpoint=base_endpoint + "/v1/metrics",
              headers={"Authorization": f"Bearer {dify_config.OTLP_API_KEY}"},
          )
      else:
          # Fallback to console exporter
          exporter = ConsoleSpanExporter()
          metric_exporter = ConsoleMetricExporter()

      provider.add_span_processor(
          BatchSpanProcessor(
              exporter,
              max_queue_size=dify_config.OTEL_MAX_QUEUE_SIZE,
              schedule_delay_millis=dify_config.OTEL_BATCH_EXPORT_SCHEDULE_DELAY,
              max_export_batch_size=dify_config.OTEL_MAX_EXPORT_BATCH_SIZE,
              export_timeout_millis=dify_config.OTEL_BATCH_EXPORT_TIMEOUT,
          )
      )
      reader = PeriodicExportingMetricReader(
          metric_exporter,
          export_interval_millis=dify_config.OTEL_METRIC_EXPORT_INTERVAL,
          export_timeout_millis=dify_config.OTEL_METRIC_EXPORT_TIMEOUT,
      )
      set_meter_provider(MeterProvider(resource=resource, metric_readers=[reader]))

      if not is_celery_worker():
          # NOTE(review): Celery instrumentation is kept inside this branch, as
          # reconstructed from the flattened source; worker processes are
          # instrumented by init_celery_worker on the worker_init signal.
          init_flask_instrumentor(app)
          CeleryInstrumentor(tracer_provider=get_tracer_provider(), meter_provider=get_meter_provider()).instrument()
      instrument_exception_logging()
      init_sqlalchemy_instrumentor(app)
      atexit.register(shutdown_tracer)
  def is_enabled():
      """Return the value of ``dify_config.ENABLE_OTEL``."""
      otel_enabled = dify_config.ENABLE_OTEL
      return otel_enabled
  @worker_init.connect(weak=False)
  def init_celery_worker(*args, **kwargs):
      """Instrument Celery with the current tracer and meter providers.

      Connected to the ``worker_init`` signal. Does nothing when
      ``dify_config.ENABLE_OTEL`` is falsy. Signal arguments are ignored.
      """
      if not dify_config.ENABLE_OTEL:
          return

      from opentelemetry.instrumentation.celery import CeleryInstrumentor
      from opentelemetry.metrics import get_meter_provider
      from opentelemetry.trace import get_tracer_provider

      # Resolve both providers before logging, then hand them to the instrumentor.
      providers = {
          "tracer_provider": get_tracer_provider(),
          "meter_provider": get_meter_provider(),
      }
      if dify_config.DEBUG:
          logging.info("Initializing OpenTelemetry for Celery worker")
      CeleryInstrumentor(**providers).instrument()
  175. tracer_provider = get_tracer_provider()
  176. metric_provider = get_meter_provider()
  177. if dify_config.DEBUG:
  178. logging.info("Initializing OpenTelemetry for Celery worker")
  179. CeleryInstrumentor(tracer_provider=tracer_provider, meter_provider=metric_provider).instrument()