|
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316 |
- import { useCallback } from 'react'
- import {
- useReactFlow,
- useStoreApi,
- } from 'reactflow'
- import produce from 'immer'
- import { useStore, useWorkflowStore } from '@/app/components/workflow/store'
- import { WorkflowRunningStatus } from '@/app/components/workflow/types'
- import { useWorkflowUpdate } from '@/app/components/workflow/hooks/use-workflow-interactions'
- import { useWorkflowRunEvent } from '@/app/components/workflow/hooks/use-workflow-run-event/use-workflow-run-event'
- import type { IOtherOptions } from '@/service/base'
- import { ssePost } from '@/service/base'
- import { stopWorkflowRun } from '@/service/workflow'
- import type { VersionHistory } from '@/types/workflow'
- import { useNodesSyncDraft } from './use-nodes-sync-draft'
- import { useSetWorkflowVarsWithValue } from '@/app/components/workflow/hooks/use-fetch-workflow-inspect-vars'
- import { useInvalidAllLastRun } from '@/service/use-workflow'
- import { FlowType } from '@/types/common'
-
- export const usePipelineRun = () => {
- const store = useStoreApi()
- const workflowStore = useWorkflowStore()
- const reactflow = useReactFlow()
- const { doSyncWorkflowDraft } = useNodesSyncDraft()
- const { handleUpdateWorkflowCanvas } = useWorkflowUpdate()
-
- const {
- handleWorkflowStarted,
- handleWorkflowFinished,
- handleWorkflowFailed,
- handleWorkflowNodeStarted,
- handleWorkflowNodeFinished,
- handleWorkflowNodeIterationStarted,
- handleWorkflowNodeIterationNext,
- handleWorkflowNodeIterationFinished,
- handleWorkflowNodeLoopStarted,
- handleWorkflowNodeLoopNext,
- handleWorkflowNodeLoopFinished,
- handleWorkflowNodeRetry,
- handleWorkflowAgentLog,
- handleWorkflowTextChunk,
- handleWorkflowTextReplace,
- } = useWorkflowRunEvent()
-
- const handleBackupDraft = useCallback(() => {
- const {
- getNodes,
- edges,
- } = store.getState()
- const { getViewport } = reactflow
- const {
- backupDraft,
- setBackupDraft,
- environmentVariables,
- } = workflowStore.getState()
-
- if (!backupDraft) {
- setBackupDraft({
- nodes: getNodes(),
- edges,
- viewport: getViewport(),
- environmentVariables,
- })
- doSyncWorkflowDraft()
- }
- }, [reactflow, workflowStore, store, doSyncWorkflowDraft])
-
- const handleLoadBackupDraft = useCallback(() => {
- const {
- backupDraft,
- setBackupDraft,
- setEnvironmentVariables,
- } = workflowStore.getState()
-
- if (backupDraft) {
- const {
- nodes,
- edges,
- viewport,
- environmentVariables,
- } = backupDraft
- handleUpdateWorkflowCanvas({
- nodes,
- edges,
- viewport,
- })
- setEnvironmentVariables(environmentVariables)
- setBackupDraft(undefined)
- }
- }, [handleUpdateWorkflowCanvas, workflowStore])
-
- const pipelineId = useStore(s => s.pipelineId)
- const invalidAllLastRun = useInvalidAllLastRun(FlowType.ragPipeline, pipelineId)
- const { fetchInspectVars } = useSetWorkflowVarsWithValue({
- flowType: FlowType.ragPipeline,
- flowId: pipelineId!,
- })
-
- const handleRun = useCallback(async (
- params: any,
- callback?: IOtherOptions,
- ) => {
- const {
- getNodes,
- setNodes,
- } = store.getState()
- const newNodes = produce(getNodes(), (draft) => {
- draft.forEach((node) => {
- node.data.selected = false
- node.data._runningStatus = undefined
- })
- })
- setNodes(newNodes)
- await doSyncWorkflowDraft()
-
- const {
- onWorkflowStarted,
- onWorkflowFinished,
- onNodeStarted,
- onNodeFinished,
- onIterationStart,
- onIterationNext,
- onIterationFinish,
- onLoopStart,
- onLoopNext,
- onLoopFinish,
- onNodeRetry,
- onAgentLog,
- onError,
- ...restCallback
- } = callback || {}
- const { pipelineId } = workflowStore.getState()
- workflowStore.setState({ historyWorkflowData: undefined })
- const workflowContainer = document.getElementById('workflow-container')
-
- const {
- clientWidth,
- clientHeight,
- } = workflowContainer!
-
- const url = `/rag/pipelines/${pipelineId}/workflows/draft/run`
-
- const {
- setWorkflowRunningData,
- } = workflowStore.getState()
- setWorkflowRunningData({
- result: {
- status: WorkflowRunningStatus.Running,
- },
- tracing: [],
- resultText: '',
- })
-
- ssePost(
- url,
- {
- body: params,
- },
- {
- onWorkflowStarted: (params) => {
- handleWorkflowStarted(params)
-
- if (onWorkflowStarted)
- onWorkflowStarted(params)
- },
- onWorkflowFinished: (params) => {
- handleWorkflowFinished(params)
- fetchInspectVars({})
- invalidAllLastRun()
-
- if (onWorkflowFinished)
- onWorkflowFinished(params)
- },
- onError: (params) => {
- handleWorkflowFailed()
-
- if (onError)
- onError(params)
- },
- onNodeStarted: (params) => {
- handleWorkflowNodeStarted(
- params,
- {
- clientWidth,
- clientHeight,
- },
- )
-
- if (onNodeStarted)
- onNodeStarted(params)
- },
- onNodeFinished: (params) => {
- handleWorkflowNodeFinished(params)
-
- if (onNodeFinished)
- onNodeFinished(params)
- },
- onIterationStart: (params) => {
- handleWorkflowNodeIterationStarted(
- params,
- {
- clientWidth,
- clientHeight,
- },
- )
-
- if (onIterationStart)
- onIterationStart(params)
- },
- onIterationNext: (params) => {
- handleWorkflowNodeIterationNext(params)
-
- if (onIterationNext)
- onIterationNext(params)
- },
- onIterationFinish: (params) => {
- handleWorkflowNodeIterationFinished(params)
-
- if (onIterationFinish)
- onIterationFinish(params)
- },
- onLoopStart: (params) => {
- handleWorkflowNodeLoopStarted(
- params,
- {
- clientWidth,
- clientHeight,
- },
- )
-
- if (onLoopStart)
- onLoopStart(params)
- },
- onLoopNext: (params) => {
- handleWorkflowNodeLoopNext(params)
-
- if (onLoopNext)
- onLoopNext(params)
- },
- onLoopFinish: (params) => {
- handleWorkflowNodeLoopFinished(params)
-
- if (onLoopFinish)
- onLoopFinish(params)
- },
- onNodeRetry: (params) => {
- handleWorkflowNodeRetry(params)
-
- if (onNodeRetry)
- onNodeRetry(params)
- },
- onAgentLog: (params) => {
- handleWorkflowAgentLog(params)
-
- if (onAgentLog)
- onAgentLog(params)
- },
- onTextChunk: (params) => {
- handleWorkflowTextChunk(params)
- },
- onTextReplace: (params) => {
- handleWorkflowTextReplace(params)
- },
- ...restCallback,
- },
- )
- }, [
- store,
- workflowStore,
- doSyncWorkflowDraft,
- handleWorkflowStarted,
- handleWorkflowFinished,
- handleWorkflowFailed,
- handleWorkflowNodeStarted,
- handleWorkflowNodeFinished,
- handleWorkflowNodeIterationStarted,
- handleWorkflowNodeIterationNext,
- handleWorkflowNodeIterationFinished,
- handleWorkflowNodeLoopStarted,
- handleWorkflowNodeLoopNext,
- handleWorkflowNodeLoopFinished,
- handleWorkflowNodeRetry,
- handleWorkflowTextChunk,
- handleWorkflowTextReplace,
- handleWorkflowAgentLog,
- ],
- )
-
- const handleStopRun = useCallback((taskId: string) => {
- const { pipelineId } = workflowStore.getState()
-
- stopWorkflowRun(`/rag/pipelines/${pipelineId}/workflow-runs/tasks/${taskId}/stop`)
- }, [workflowStore])
-
- const handleRestoreFromPublishedWorkflow = useCallback((publishedWorkflow: VersionHistory) => {
- const nodes = publishedWorkflow.graph.nodes.map(node => ({ ...node, selected: false, data: { ...node.data, selected: false } }))
- const edges = publishedWorkflow.graph.edges
- const viewport = publishedWorkflow.graph.viewport!
- handleUpdateWorkflowCanvas({
- nodes,
- edges,
- viewport,
- })
-
- workflowStore.getState().setEnvironmentVariables(publishedWorkflow.environment_variables || [])
- workflowStore.getState().setRagPipelineVariables?.(publishedWorkflow.rag_pipeline_variables || [])
- }, [handleUpdateWorkflowCanvas, workflowStore])
-
- return {
- handleBackupDraft,
- handleLoadBackupDraft,
- handleRun,
- handleStopRun,
- handleRestoreFromPublishedWorkflow,
- }
- }
|