| @@ -83,6 +83,7 @@ class LangFuseDataTrace(BaseTraceInstance): | |||
| metadata=metadata, | |||
| session_id=trace_info.conversation_id, | |||
| tags=["message", "workflow"], | |||
| version=trace_info.workflow_run_version, | |||
| ) | |||
| self.add_trace(langfuse_trace_data=trace_data) | |||
| workflow_span_data = LangfuseSpan( | |||
| @@ -108,6 +109,7 @@ class LangFuseDataTrace(BaseTraceInstance): | |||
| metadata=metadata, | |||
| session_id=trace_info.conversation_id, | |||
| tags=["workflow"], | |||
| version=trace_info.workflow_run_version, | |||
| ) | |||
| self.add_trace(langfuse_trace_data=trace_data) | |||
| @@ -172,37 +174,7 @@ class LangFuseDataTrace(BaseTraceInstance): | |||
| } | |||
| ) | |||
| # add span | |||
| if trace_info.message_id: | |||
| span_data = LangfuseSpan( | |||
| id=node_execution_id, | |||
| name=node_type, | |||
| input=inputs, | |||
| output=outputs, | |||
| trace_id=trace_id, | |||
| start_time=created_at, | |||
| end_time=finished_at, | |||
| metadata=metadata, | |||
| level=(LevelEnum.DEFAULT if status == "succeeded" else LevelEnum.ERROR), | |||
| status_message=trace_info.error or "", | |||
| parent_observation_id=trace_info.workflow_run_id, | |||
| ) | |||
| else: | |||
| span_data = LangfuseSpan( | |||
| id=node_execution_id, | |||
| name=node_type, | |||
| input=inputs, | |||
| output=outputs, | |||
| trace_id=trace_id, | |||
| start_time=created_at, | |||
| end_time=finished_at, | |||
| metadata=metadata, | |||
| level=(LevelEnum.DEFAULT if status == "succeeded" else LevelEnum.ERROR), | |||
| status_message=trace_info.error or "", | |||
| ) | |||
| self.add_span(langfuse_span_data=span_data) | |||
| # add generation span | |||
| if process_data and process_data.get("model_mode") == "chat": | |||
| total_token = metadata.get("total_tokens", 0) | |||
| prompt_tokens = 0 | |||
| @@ -226,10 +198,10 @@ class LangFuseDataTrace(BaseTraceInstance): | |||
| ) | |||
| node_generation_data = LangfuseGeneration( | |||
| name="llm", | |||
| id=node_execution_id, | |||
| name=node_name, | |||
| trace_id=trace_id, | |||
| model=process_data.get("model_name"), | |||
| parent_observation_id=node_execution_id, | |||
| start_time=created_at, | |||
| end_time=finished_at, | |||
| input=inputs, | |||
| @@ -237,11 +209,30 @@ class LangFuseDataTrace(BaseTraceInstance): | |||
| metadata=metadata, | |||
| level=(LevelEnum.DEFAULT if status == "succeeded" else LevelEnum.ERROR), | |||
| status_message=trace_info.error or "", | |||
| parent_observation_id=trace_info.workflow_run_id if trace_info.message_id else None, | |||
| usage=generation_usage, | |||
| ) | |||
| self.add_generation(langfuse_generation_data=node_generation_data) | |||
| # add normal span | |||
| else: | |||
| span_data = LangfuseSpan( | |||
| id=node_execution_id, | |||
| name=node_name, | |||
| input=inputs, | |||
| output=outputs, | |||
| trace_id=trace_id, | |||
| start_time=created_at, | |||
| end_time=finished_at, | |||
| metadata=metadata, | |||
| level=(LevelEnum.DEFAULT if status == "succeeded" else LevelEnum.ERROR), | |||
| status_message=trace_info.error or "", | |||
| parent_observation_id=trace_info.workflow_run_id if trace_info.message_id else None, | |||
| ) | |||
| self.add_span(langfuse_span_data=span_data) | |||
| def message_trace(self, trace_info: MessageTraceInfo, **kwargs): | |||
| # get message file data | |||
| file_list = trace_info.file_list | |||
| @@ -284,7 +275,7 @@ class LangFuseDataTrace(BaseTraceInstance): | |||
| ) | |||
| self.add_trace(langfuse_trace_data=trace_data) | |||
| # start add span | |||
| # add generation | |||
| generation_usage = GenerationUsage( | |||
| input=trace_info.message_tokens, | |||
| output=trace_info.answer_tokens, | |||