選択できるのは25トピックまでです。 トピックは、先頭が英数字で、英数字とダッシュ('-')を使用した35文字以内のものにしてください。

ext_otel.py 9.2KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215
  1. import atexit
  2. import logging
  3. import os
  4. import platform
  5. import socket
  6. import sys
  7. from typing import Union
  8. import flask
  9. from celery.signals import worker_init # type: ignore
  10. from flask_login import user_loaded_from_request, user_logged_in # type: ignore
  11. from configs import dify_config
  12. from dify_app import DifyApp
  13. @user_logged_in.connect
  14. @user_loaded_from_request.connect
  15. def on_user_loaded(_sender, user):
  16. if dify_config.ENABLE_OTEL:
  17. from opentelemetry.trace import get_current_span
  18. if user:
  19. current_span = get_current_span()
  20. if current_span:
  21. current_span.set_attribute("service.tenant.id", user.current_tenant_id)
  22. current_span.set_attribute("service.user.id", user.id)
  23. def init_app(app: DifyApp):
  24. from opentelemetry.semconv.trace import SpanAttributes
  25. def is_celery_worker():
  26. return "celery" in sys.argv[0].lower()
  27. def instrument_exception_logging():
  28. exception_handler = ExceptionLoggingHandler()
  29. logging.getLogger().addHandler(exception_handler)
  30. def init_flask_instrumentor(app: DifyApp):
  31. meter = get_meter("http_metrics", version=dify_config.CURRENT_VERSION)
  32. _http_response_counter = meter.create_counter(
  33. "http.server.response.count",
  34. description="Total number of HTTP responses by status code, method and target",
  35. unit="{response}",
  36. )
  37. def response_hook(span: Span, status: str, response_headers: list):
  38. if span and span.is_recording():
  39. if status.startswith("2"):
  40. span.set_status(StatusCode.OK)
  41. else:
  42. span.set_status(StatusCode.ERROR, status)
  43. status = status.split(" ")[0]
  44. status_code = int(status)
  45. status_class = f"{status_code // 100}xx"
  46. attributes: dict[str, str | int] = {"status_code": status_code, "status_class": status_class}
  47. request = flask.request
  48. if request and request.url_rule:
  49. attributes[SpanAttributes.HTTP_TARGET] = str(request.url_rule.rule)
  50. if request and request.method:
  51. attributes[SpanAttributes.HTTP_METHOD] = str(request.method)
  52. _http_response_counter.add(1, attributes)
  53. instrumentor = FlaskInstrumentor()
  54. if dify_config.DEBUG:
  55. logging.info("Initializing Flask instrumentor")
  56. instrumentor.instrument_app(app, response_hook=response_hook)
  57. def init_sqlalchemy_instrumentor(app: DifyApp):
  58. with app.app_context():
  59. engines = list(app.extensions["sqlalchemy"].engines.values())
  60. SQLAlchemyInstrumentor().instrument(enable_commenter=True, engines=engines)
  61. def setup_context_propagation():
  62. # Configure propagators
  63. set_global_textmap(
  64. CompositePropagator(
  65. [
  66. TraceContextTextMapPropagator(), # W3C trace context
  67. B3Format(), # B3 propagation (used by many systems)
  68. ]
  69. )
  70. )
  71. def shutdown_tracer():
  72. provider = trace.get_tracer_provider()
  73. if hasattr(provider, "force_flush"):
  74. provider.force_flush()
  75. class ExceptionLoggingHandler(logging.Handler):
  76. """Custom logging handler that creates spans for logging.exception() calls"""
  77. def emit(self, record):
  78. try:
  79. if record.exc_info:
  80. tracer = get_tracer_provider().get_tracer("dify.exception.logging")
  81. with tracer.start_as_current_span(
  82. "log.exception",
  83. attributes={
  84. "log.level": record.levelname,
  85. "log.message": record.getMessage(),
  86. "log.logger": record.name,
  87. "log.file.path": record.pathname,
  88. "log.file.line": record.lineno,
  89. },
  90. ) as span:
  91. span.set_status(StatusCode.ERROR)
  92. span.record_exception(record.exc_info[1])
  93. span.set_attribute("exception.type", record.exc_info[0].__name__)
  94. span.set_attribute("exception.message", str(record.exc_info[1]))
  95. except Exception:
  96. pass
  97. from opentelemetry import trace
  98. from opentelemetry.exporter.otlp.proto.http.metric_exporter import OTLPMetricExporter
  99. from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
  100. from opentelemetry.instrumentation.celery import CeleryInstrumentor
  101. from opentelemetry.instrumentation.flask import FlaskInstrumentor
  102. from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
  103. from opentelemetry.metrics import get_meter, get_meter_provider, set_meter_provider
  104. from opentelemetry.propagate import set_global_textmap
  105. from opentelemetry.propagators.b3 import B3Format
  106. from opentelemetry.propagators.composite import CompositePropagator
  107. from opentelemetry.sdk.metrics import MeterProvider
  108. from opentelemetry.sdk.metrics.export import ConsoleMetricExporter, PeriodicExportingMetricReader
  109. from opentelemetry.sdk.resources import Resource
  110. from opentelemetry.sdk.trace import TracerProvider
  111. from opentelemetry.sdk.trace.export import (
  112. BatchSpanProcessor,
  113. ConsoleSpanExporter,
  114. )
  115. from opentelemetry.sdk.trace.sampling import ParentBasedTraceIdRatio
  116. from opentelemetry.semconv.resource import ResourceAttributes
  117. from opentelemetry.trace import Span, get_tracer_provider, set_tracer_provider
  118. from opentelemetry.trace.propagation.tracecontext import TraceContextTextMapPropagator
  119. from opentelemetry.trace.status import StatusCode
  120. setup_context_propagation()
  121. # Initialize OpenTelemetry
  122. # Follow Semantic Convertions 1.32.0 to define resource attributes
  123. resource = Resource(
  124. attributes={
  125. ResourceAttributes.SERVICE_NAME: dify_config.APPLICATION_NAME,
  126. ResourceAttributes.SERVICE_VERSION: f"dify-{dify_config.CURRENT_VERSION}-{dify_config.COMMIT_SHA}",
  127. ResourceAttributes.PROCESS_PID: os.getpid(),
  128. ResourceAttributes.DEPLOYMENT_ENVIRONMENT: f"{dify_config.DEPLOY_ENV}-{dify_config.EDITION}",
  129. ResourceAttributes.HOST_NAME: socket.gethostname(),
  130. ResourceAttributes.HOST_ARCH: platform.machine(),
  131. "custom.deployment.git_commit": dify_config.COMMIT_SHA,
  132. ResourceAttributes.HOST_ID: platform.node(),
  133. ResourceAttributes.OS_TYPE: platform.system().lower(),
  134. ResourceAttributes.OS_DESCRIPTION: platform.platform(),
  135. ResourceAttributes.OS_VERSION: platform.version(),
  136. }
  137. )
  138. sampler = ParentBasedTraceIdRatio(dify_config.OTEL_SAMPLING_RATE)
  139. provider = TracerProvider(resource=resource, sampler=sampler)
  140. set_tracer_provider(provider)
  141. exporter: Union[OTLPSpanExporter, ConsoleSpanExporter]
  142. metric_exporter: Union[OTLPMetricExporter, ConsoleMetricExporter]
  143. if dify_config.OTEL_EXPORTER_TYPE == "otlp":
  144. exporter = OTLPSpanExporter(
  145. endpoint=dify_config.OTLP_BASE_ENDPOINT + "/v1/traces",
  146. headers={"Authorization": f"Bearer {dify_config.OTLP_API_KEY}"},
  147. )
  148. metric_exporter = OTLPMetricExporter(
  149. endpoint=dify_config.OTLP_BASE_ENDPOINT + "/v1/metrics",
  150. headers={"Authorization": f"Bearer {dify_config.OTLP_API_KEY}"},
  151. )
  152. else:
  153. # Fallback to console exporter
  154. exporter = ConsoleSpanExporter()
  155. metric_exporter = ConsoleMetricExporter()
  156. provider.add_span_processor(
  157. BatchSpanProcessor(
  158. exporter,
  159. max_queue_size=dify_config.OTEL_MAX_QUEUE_SIZE,
  160. schedule_delay_millis=dify_config.OTEL_BATCH_EXPORT_SCHEDULE_DELAY,
  161. max_export_batch_size=dify_config.OTEL_MAX_EXPORT_BATCH_SIZE,
  162. export_timeout_millis=dify_config.OTEL_BATCH_EXPORT_TIMEOUT,
  163. )
  164. )
  165. reader = PeriodicExportingMetricReader(
  166. metric_exporter,
  167. export_interval_millis=dify_config.OTEL_METRIC_EXPORT_INTERVAL,
  168. export_timeout_millis=dify_config.OTEL_METRIC_EXPORT_TIMEOUT,
  169. )
  170. set_meter_provider(MeterProvider(resource=resource, metric_readers=[reader]))
  171. if not is_celery_worker():
  172. init_flask_instrumentor(app)
  173. CeleryInstrumentor(tracer_provider=get_tracer_provider(), meter_provider=get_meter_provider()).instrument()
  174. instrument_exception_logging()
  175. init_sqlalchemy_instrumentor(app)
  176. atexit.register(shutdown_tracer)
  177. def is_enabled():
  178. return dify_config.ENABLE_OTEL
  179. @worker_init.connect(weak=False)
  180. def init_celery_worker(*args, **kwargs):
  181. if dify_config.ENABLE_OTEL:
  182. from opentelemetry.instrumentation.celery import CeleryInstrumentor
  183. from opentelemetry.metrics import get_meter_provider
  184. from opentelemetry.trace import get_tracer_provider
  185. tracer_provider = get_tracer_provider()
  186. metric_provider = get_meter_provider()
  187. if dify_config.DEBUG:
  188. logging.info("Initializing OpenTelemetry for Celery worker")
  189. CeleryInstrumentor(tracer_provider=tracer_provider, meter_provider=metric_provider).instrument()