Procházet zdrojové kódy

Feat/add langsmith dotted order (#10856)

tags/0.12.0
Joe před 11 měsíci
rodič
revize
2dc29cfee3
Žádný účet není propojen s e-mailovou adresou tvůrce revize

+ 1
- 0
api/core/ops/langsmith_trace/entities/langsmith_trace_entity.py Zobrazit soubor

reference_example_id: Optional[str] = Field(None, description="Reference example ID associated with the run") reference_example_id: Optional[str] = Field(None, description="Reference example ID associated with the run")
input_attachments: Optional[dict[str, Any]] = Field(None, description="Input attachments of the run") input_attachments: Optional[dict[str, Any]] = Field(None, description="Input attachments of the run")
output_attachments: Optional[dict[str, Any]] = Field(None, description="Output attachments of the run") output_attachments: Optional[dict[str, Any]] = Field(None, description="Output attachments of the run")
dotted_order: Optional[str] = Field(None, description="Dotted order of the run")


@field_validator("inputs", "outputs") @field_validator("inputs", "outputs")
@classmethod @classmethod

+ 19
- 1
api/core/ops/langsmith_trace/langsmith_trace.py Zobrazit soubor

LangSmithRunType, LangSmithRunType,
LangSmithRunUpdateModel, LangSmithRunUpdateModel,
) )
from core.ops.utils import filter_none_values
from core.ops.utils import filter_none_values, generate_dotted_order
from extensions.ext_database import db from extensions.ext_database import db
from models.model import EndUser, MessageFile from models.model import EndUser, MessageFile
from models.workflow import WorkflowNodeExecution from models.workflow import WorkflowNodeExecution
self.generate_name_trace(trace_info) self.generate_name_trace(trace_info)


def workflow_trace(self, trace_info: WorkflowTraceInfo): def workflow_trace(self, trace_info: WorkflowTraceInfo):
trace_id = trace_info.message_id or trace_info.workflow_app_log_id or trace_info.workflow_run_id
message_dotted_order = (
generate_dotted_order(trace_info.message_id, trace_info.start_time) if trace_info.message_id else None
)
workflow_dotted_order = generate_dotted_order(
trace_info.workflow_app_log_id or trace_info.workflow_run_id,
trace_info.workflow_data.created_at,
message_dotted_order,
)

if trace_info.message_id: if trace_info.message_id:
message_run = LangSmithRunModel( message_run = LangSmithRunModel(
id=trace_info.message_id, id=trace_info.message_id,
}, },
tags=["message", "workflow"], tags=["message", "workflow"],
error=trace_info.error, error=trace_info.error,
trace_id=trace_id,
dotted_order=message_dotted_order,
) )
self.add_run(message_run) self.add_run(message_run)


error=trace_info.error, error=trace_info.error,
tags=["workflow"], tags=["workflow"],
parent_run_id=trace_info.message_id or None, parent_run_id=trace_info.message_id or None,
trace_id=trace_id,
dotted_order=workflow_dotted_order,
) )


self.add_run(langsmith_run) self.add_run(langsmith_run)
else: else:
run_type = LangSmithRunType.tool run_type = LangSmithRunType.tool


node_dotted_order = generate_dotted_order(node_execution_id, created_at, workflow_dotted_order)
langsmith_run = LangSmithRunModel( langsmith_run = LangSmithRunModel(
total_tokens=node_total_tokens, total_tokens=node_total_tokens,
name=node_type, name=node_type,
}, },
parent_run_id=trace_info.workflow_app_log_id or trace_info.workflow_run_id, parent_run_id=trace_info.workflow_app_log_id or trace_info.workflow_run_id,
tags=["node_execution"], tags=["node_execution"],
id=node_execution_id,
trace_id=trace_id,
dotted_order=node_dotted_order,
) )


self.add_run(langsmith_run) self.add_run(langsmith_run)

+ 17
- 0
api/core/ops/utils.py Zobrazit soubor

from contextlib import contextmanager from contextlib import contextmanager
from datetime import datetime from datetime import datetime
from typing import Optional, Union


from extensions.ext_database import db from extensions.ext_database import db
from models.model import Message from models.model import Message
return [replace_text_with_content(item) for item in data] return [replace_text_with_content(item) for item in data]
else: else:
return data return data


def generate_dotted_order(
run_id: str, start_time: Union[str, datetime], parent_dotted_order: Optional[str] = None
) -> str:
"""
generate dotted_order for langsmith
"""
start_time = datetime.fromisoformat(start_time) if isinstance(start_time, str) else start_time
timestamp = start_time.strftime("%Y%m%dT%H%M%S%f")[:-3] + "Z"
current_segment = f"{timestamp}{run_id}"

if parent_dotted_order is None:
return current_segment

return f"{parent_dotted_order}.{current_segment}"

Načítá se…
Zrušit
Uložit