
Chore: remove add-node restriction from workflow (#26218)

Co-authored-by: -LAN- <laipz8200@outlook.com>
tags/1.9.1
zxhlyh committed 1 month ago
commit 915023b809
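In short, this change removes the nested-parallel restriction end to end: the WORKFLOW_PARALLEL_DEPTH_LIMIT setting and its .env defaults, the draft-config endpoints that exposed it to the editor, and the front-end checkParallelLimit / checkNestedParallelLimit guards (together with the LimitTips banner and the showTips / workflowConfig store state) that enforced branch count and nesting depth when adding or connecting nodes. For orientation, here is a rough TypeScript sketch of the depth check being deleted, reconstructed from the removed code below; the names come from the diff, but the sketch itself is illustrative rather than actual project code:

// Illustrative reconstruction of the removed client-side depth guard (not project code).
// Before this commit the editor fetched
//   GET /apps/{appId}/workflows/draft/config  ->  { parallel_depth_limit: 3 }
// (plus a RAG-pipeline equivalent) and kept the result in the store as `workflowConfig`.
type WorkflowConfig = { parallel_depth_limit?: number }
type ParallelInfoItem = { parallelNodeId: string; depth: number }

const PARALLEL_DEPTH_LIMIT = 3 // old fallback from web/app/components/workflow/constants.ts

// Gist of the removed checkNestedParallelLimit: reject the edit once any parallel
// branch is nested deeper than the configured limit.
function exceedsDepthLimit(parallelList: ParallelInfoItem[], config?: WorkflowConfig): boolean {
  const limit = config?.parallel_depth_limit || PARALLEL_DEPTH_LIMIT
  return parallelList.some(parallel => parallel.depth > limit)
}

// A branch nested four levels deep used to trip the guard and surface the LimitTips banner.
console.log(exceedsDepthLimit([{ parallelNodeId: 'node-1', depth: 4 }])) // true
// After this commit none of these checks run, so nesting depth is no longer capped in the editor.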

api/.env.example (+0, -1)

@@ -468,7 +468,6 @@ INDEXING_MAX_SEGMENTATION_TOKENS_LENGTH=4000
WORKFLOW_MAX_EXECUTION_STEPS=500
WORKFLOW_MAX_EXECUTION_TIME=1200
WORKFLOW_CALL_MAX_DEPTH=5
-WORKFLOW_PARALLEL_DEPTH_LIMIT=3
MAX_VARIABLE_SIZE=204800

# GraphEngine Worker Pool Configuration

api/configs/feature/__init__.py (+0, -5)

@@ -577,11 +577,6 @@ class WorkflowConfig(BaseSettings):
default=5,
)

-WORKFLOW_PARALLEL_DEPTH_LIMIT: PositiveInt = Field(
-description="Maximum allowed depth for nested parallel executions",
-default=3,
-)
-
MAX_VARIABLE_SIZE: PositiveInt = Field(
description="Maximum size in bytes for a single variable in workflows. Default to 200 KB.",
default=200 * 1024,

api/controllers/console/app/workflow.py (+0, -19)

@@ -9,7 +9,6 @@ from sqlalchemy.orm import Session
from werkzeug.exceptions import Forbidden, InternalServerError, NotFound

import services
-from configs import dify_config
from controllers.console import api, console_ns
from controllers.console.app.error import ConversationCompletedError, DraftWorkflowNotExist, DraftWorkflowNotSync
from controllers.console.app.wraps import get_app_model
@@ -797,24 +796,6 @@ class ConvertToWorkflowApi(Resource):
}


-@console_ns.route("/apps/<uuid:app_id>/workflows/draft/config")
-class WorkflowConfigApi(Resource):
-"""Resource for workflow configuration."""
-
-@api.doc("get_workflow_config")
-@api.doc(description="Get workflow configuration")
-@api.doc(params={"app_id": "Application ID"})
-@api.response(200, "Workflow configuration retrieved successfully")
-@setup_required
-@login_required
-@account_initialization_required
-@get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
-def get(self, app_model: App):
-return {
-"parallel_depth_limit": dify_config.WORKFLOW_PARALLEL_DEPTH_LIMIT,
-}
-
-
@console_ns.route("/apps/<uuid:app_id>/workflows")
class PublishedAllWorkflowApi(Resource):
@api.doc("get_all_published_workflows")

api/controllers/console/datasets/rag_pipeline/rag_pipeline_workflow.py (+0, -17)

@@ -9,7 +9,6 @@ from sqlalchemy.orm import Session
from werkzeug.exceptions import Forbidden, InternalServerError, NotFound

import services
-from configs import dify_config
from controllers.console import api
from controllers.console.app.error import (
ConversationCompletedError,
@@ -609,18 +608,6 @@ class DefaultRagPipelineBlockConfigApi(Resource):
return rag_pipeline_service.get_default_block_config(node_type=block_type, filters=filters)


-class RagPipelineConfigApi(Resource):
-"""Resource for rag pipeline configuration."""
-
-@setup_required
-@login_required
-@account_initialization_required
-def get(self, pipeline_id):
-return {
-"parallel_depth_limit": dify_config.WORKFLOW_PARALLEL_DEPTH_LIMIT,
-}
-
-
class PublishedAllRagPipelineApi(Resource):
@setup_required
@login_required
@@ -985,10 +972,6 @@ api.add_resource(
DraftRagPipelineApi,
"/rag/pipelines/<uuid:pipeline_id>/workflows/draft",
)
-api.add_resource(
-RagPipelineConfigApi,
-"/rag/pipelines/<uuid:pipeline_id>/workflows/draft/config",
-)
api.add_resource(
DraftRagPipelineRunApi,
"/rag/pipelines/<uuid:pipeline_id>/workflows/draft/run",

api/tests/integration_tests/.env.example (+0, -1)

@@ -167,7 +167,6 @@ INDEXING_MAX_SEGMENTATION_TOKENS_LENGTH=4000
WORKFLOW_MAX_EXECUTION_STEPS=500
WORKFLOW_MAX_EXECUTION_TIME=1200
WORKFLOW_CALL_MAX_DEPTH=5
-WORKFLOW_PARALLEL_DEPTH_LIMIT=3
MAX_VARIABLE_SIZE=204800

# App configuration

api/tests/unit_tests/configs/test_dify_config.py (+0, -2)

@@ -40,8 +40,6 @@ def test_dify_config(monkeypatch: pytest.MonkeyPatch):
# annotated field with configured value
assert config.HTTP_REQUEST_MAX_WRITE_TIMEOUT == 30

-assert config.WORKFLOW_PARALLEL_DEPTH_LIMIT == 3
-
# values from pyproject.toml
assert Version(config.project.version) >= Version("1.0.0")


docker/.env.example (+0, -1)

@@ -881,7 +881,6 @@ WORKFLOW_MAX_EXECUTION_STEPS=500
WORKFLOW_MAX_EXECUTION_TIME=1200
WORKFLOW_CALL_MAX_DEPTH=5
MAX_VARIABLE_SIZE=204800
-WORKFLOW_PARALLEL_DEPTH_LIMIT=3
WORKFLOW_FILE_UPLOAD_LIMIT=10

# GraphEngine Worker Pool Configuration

docker/docker-compose.yaml (+0, -1)

@@ -402,7 +402,6 @@ x-shared-env: &shared-api-worker-env
WORKFLOW_MAX_EXECUTION_TIME: ${WORKFLOW_MAX_EXECUTION_TIME:-1200}
WORKFLOW_CALL_MAX_DEPTH: ${WORKFLOW_CALL_MAX_DEPTH:-5}
MAX_VARIABLE_SIZE: ${MAX_VARIABLE_SIZE:-204800}
-WORKFLOW_PARALLEL_DEPTH_LIMIT: ${WORKFLOW_PARALLEL_DEPTH_LIMIT:-3}
WORKFLOW_FILE_UPLOAD_LIMIT: ${WORKFLOW_FILE_UPLOAD_LIMIT:-10}
GRAPH_ENGINE_MIN_WORKERS: ${GRAPH_ENGINE_MIN_WORKERS:-1}
GRAPH_ENGINE_MAX_WORKERS: ${GRAPH_ENGINE_MAX_WORKERS:-10}

web/app/components/rag-pipeline/hooks/use-pipeline-config.ts (+0, -10)

@@ -14,16 +14,6 @@ export const usePipelineConfig = () => {
const pipelineId = useStore(s => s.pipelineId)
const workflowStore = useWorkflowStore()

-const handleUpdateWorkflowConfig = useCallback((config: Record<string, any>) => {
-const { setWorkflowConfig } = workflowStore.getState()
-
-setWorkflowConfig(config)
-}, [workflowStore])
-useWorkflowConfig(
-pipelineId ? `/rag/pipelines/${pipelineId}/workflows/draft/config` : '',
-handleUpdateWorkflowConfig,
-)
-
const handleUpdateNodesDefaultConfigs = useCallback((nodesDefaultConfigs: Record<string, any> | Record<string, any>[]) => {
const { setNodesDefaultConfigs } = workflowStore.getState()
let res: Record<string, any> = {}

web/app/components/workflow-app/hooks/use-workflow-init.ts (+0, -7)

@@ -33,13 +33,6 @@ export const useWorkflowInit = () => {
workflowStore.setState({ appId: appDetail.id, appName: appDetail.name })
}, [appDetail.id, workflowStore])

-const handleUpdateWorkflowConfig = useCallback((config: Record<string, any>) => {
-const { setWorkflowConfig } = workflowStore.getState()
-
-setWorkflowConfig(config)
-}, [workflowStore])
-useWorkflowConfig(`/apps/${appDetail.id}/workflows/draft/config`, handleUpdateWorkflowConfig)
-
const handleUpdateWorkflowFileUploadConfig = useCallback((config: FileUploadConfigResponse) => {
const { setFileUploadConfig } = workflowStore.getState()
setFileUploadConfig(config)

web/app/components/workflow/constants.ts (+0, -2)

@@ -35,8 +35,6 @@ export const NODE_LAYOUT_HORIZONTAL_PADDING = 60
export const NODE_LAYOUT_VERTICAL_PADDING = 60
export const NODE_LAYOUT_MIN_DISTANCE = 100

-export const PARALLEL_DEPTH_LIMIT = 3
-
export const RETRIEVAL_OUTPUT_STRUCT = `{
"content": "",
"title": "",

web/app/components/workflow/hooks/use-nodes-interactions.ts (+12, -33)

@@ -70,7 +70,7 @@ export const useNodesInteractions = () => {
const reactflow = useReactFlow()
const { store: workflowHistoryStore } = useWorkflowHistoryStore()
const { handleSyncWorkflowDraft } = useNodesSyncDraft()
-const { checkNestedParallelLimit, getAfterNodesInSameBranch } = useWorkflow()
+const { getAfterNodesInSameBranch } = useWorkflow()
const { getNodesReadOnly } = useNodesReadOnly()
const { getWorkflowReadOnly } = useWorkflowReadOnly()
const { handleSetHelpline } = useHelpline()
@@ -436,21 +436,13 @@ export const useNodesInteractions = () => {
draft.push(newEdge)
})

-if (checkNestedParallelLimit(newNodes, newEdges, targetNode)) {
-setNodes(newNodes)
-setEdges(newEdges)
+setNodes(newNodes)
+setEdges(newEdges)

-handleSyncWorkflowDraft()
-saveStateToHistory(WorkflowHistoryEvent.NodeConnect, {
-nodeId: targetNode?.id,
-})
-}
-else {
-const { setConnectingNodePayload, setEnteringNodePayload }
-= workflowStore.getState()
-setConnectingNodePayload(undefined)
-setEnteringNodePayload(undefined)
-}
+handleSyncWorkflowDraft()
+saveStateToHistory(WorkflowHistoryEvent.NodeConnect, {
+nodeId: targetNode?.id,
+})
},
[
getNodesReadOnly,
@@ -458,7 +450,6 @@ export const useNodesInteractions = () => {
workflowStore,
handleSyncWorkflowDraft,
saveStateToHistory,
-checkNestedParallelLimit,
],
)

@@ -934,13 +925,8 @@ export const useNodesInteractions = () => {
if (newEdge) draft.push(newEdge)
})

-if (checkNestedParallelLimit(newNodes, newEdges, prevNode)) {
-setNodes(newNodes)
-setEdges(newEdges)
-}
-else {
-return false
-}
+setNodes(newNodes)
+setEdges(newEdges)
}
if (!prevNodeId && nextNodeId) {
const nextNodeIndex = nodes.findIndex(node => node.id === nextNodeId)
@@ -1087,17 +1073,11 @@ export const useNodesInteractions = () => {
draft.push(newEdge)
})

-if (checkNestedParallelLimit(newNodes, newEdges, nextNode)) {
-setNodes(newNodes)
-setEdges(newEdges)
-}
-else {
-return false
-}
+setNodes(newNodes)
+setEdges(newEdges)
}
else {
-if (checkNestedParallelLimit(newNodes, edges)) setNodes(newNodes)
-else return false
+setNodes(newNodes)
}
}
if (prevNodeId && nextNodeId) {
@@ -1297,7 +1277,6 @@ export const useNodesInteractions = () => {
saveStateToHistory,
workflowStore,
getAfterNodesInSameBranch,
-checkNestedParallelLimit,
nodesMetaDataMap,
],
)

web/app/components/workflow/hooks/use-workflow.ts (+1, -53)

@@ -2,7 +2,6 @@ import {
useCallback,
} from 'react'
import { uniqBy } from 'lodash-es'
-import { useTranslation } from 'react-i18next'
import {
getIncomers,
getOutgoers,
@@ -24,9 +23,7 @@ import {
useStore,
useWorkflowStore,
} from '../store'
-import { getParallelInfo } from '../utils'
import {
-PARALLEL_DEPTH_LIMIT,
SUPPORT_OUTPUT_VARS_NODE,
} from '../constants'
import type { IterationNodeType } from '../nodes/iteration/types'
@@ -44,7 +41,6 @@ import {
import { CUSTOM_ITERATION_START_NODE } from '@/app/components/workflow/nodes/iteration-start/constants'
import { CUSTOM_LOOP_START_NODE } from '@/app/components/workflow/nodes/loop-start/constants'
import { basePath } from '@/utils/var'
-import { MAX_PARALLEL_LIMIT } from '@/config'
import { useNodesMetaData } from '.'

export const useIsChatMode = () => {
@@ -54,9 +50,7 @@ export const useIsChatMode = () => {
}

export const useWorkflow = () => {
-const { t } = useTranslation()
const store = useStoreApi()
-const workflowStore = useWorkflowStore()
const { getAvailableBlocks } = useAvailableBlocks()
const { nodesMap } = useNodesMetaData()

@@ -290,20 +284,6 @@ export const useWorkflow = () => {
return isUsed
}, [isVarUsedInNodes])

-const checkParallelLimit = useCallback((nodeId: string, nodeHandle = 'source') => {
-const {
-edges,
-} = store.getState()
-const connectedEdges = edges.filter(edge => edge.source === nodeId && edge.sourceHandle === nodeHandle)
-if (connectedEdges.length > MAX_PARALLEL_LIMIT - 1) {
-const { setShowTips } = workflowStore.getState()
-setShowTips(t('workflow.common.parallelTip.limit', { num: MAX_PARALLEL_LIMIT }))
-return false
-}
-
-return true
-}, [store, workflowStore, t])
-
const getRootNodesById = useCallback((nodeId: string) => {
const {
getNodes,
@@ -374,33 +354,6 @@ export const useWorkflow = () => {
return startNodes
}, [nodesMap, getRootNodesById])

-const checkNestedParallelLimit = useCallback((nodes: Node[], edges: Edge[], targetNode?: Node) => {
-const startNodes = getStartNodes(nodes, targetNode)
-
-for (let i = 0; i < startNodes.length; i++) {
-const {
-parallelList,
-hasAbnormalEdges,
-} = getParallelInfo(startNodes[i], nodes, edges)
-const { workflowConfig } = workflowStore.getState()
-
-if (hasAbnormalEdges)
-return false
-
-for (let i = 0; i < parallelList.length; i++) {
-const parallel = parallelList[i]
-
-if (parallel.depth > (workflowConfig?.parallel_depth_limit || PARALLEL_DEPTH_LIMIT)) {
-const { setShowTips } = workflowStore.getState()
-setShowTips(t('workflow.common.parallelTip.depthLimit', { num: (workflowConfig?.parallel_depth_limit || PARALLEL_DEPTH_LIMIT) }))
-return false
-}
-}
-}
-
-return true
-}, [t, workflowStore, getStartNodes])
-
const isValidConnection = useCallback(({ source, sourceHandle, target }: Connection) => {
const {
edges,
@@ -410,9 +363,6 @@ export const useWorkflow = () => {
const sourceNode: Node = nodes.find(node => node.id === source)!
const targetNode: Node = nodes.find(node => node.id === target)!

-if (!checkParallelLimit(source!, sourceHandle || 'source'))
-return false
-
if (sourceNode.type === CUSTOM_NOTE_NODE || targetNode.type === CUSTOM_NOTE_NODE)
return false

@@ -445,7 +395,7 @@ export const useWorkflow = () => {
}

return !hasCycle(targetNode)
-}, [store, checkParallelLimit, getAvailableBlocks])
+}, [store, getAvailableBlocks])

return {
getNodeById,
@@ -457,8 +407,6 @@ export const useWorkflow = () => {
isVarUsedInNodes,
removeUsedVarInNodes,
isNodeVarsUsedInNodes,
-checkParallelLimit,
-checkNestedParallelLimit,
isValidConnection,
getBeforeNodeById,
getIterationNodeChildren,

web/app/components/workflow/index.tsx (+0, -2)

@@ -71,7 +71,6 @@ import PanelContextmenu from './panel-contextmenu'
import NodeContextmenu from './node-contextmenu'
import SelectionContextmenu from './selection-contextmenu'
import SyncingDataModal from './syncing-data-modal'
-import LimitTips from './limit-tips'
import { setupScrollToNodeListener } from './utils/node-navigation'
import {
useStore,
@@ -378,7 +377,6 @@ export const Workflow: FC<WorkflowProps> = memo(({
/>
)
}
-<LimitTips />
{children}
<ReactFlow
nodeTypes={nodeTypes}

web/app/components/workflow/limit-tips.tsx (+0, -39, file deleted)

@@ -1,39 +0,0 @@
import {
RiAlertFill,
RiCloseLine,
} from '@remixicon/react'
import { useStore } from './store'
import ActionButton from '@/app/components/base/action-button'

const LimitTips = () => {
const showTips = useStore(s => s.showTips)
const setShowTips = useStore(s => s.setShowTips)

if (!showTips)
return null

return (
<div className='absolute bottom-16 left-1/2 z-[9] flex h-10 -translate-x-1/2 items-center rounded-xl border border-components-panel-border bg-components-panel-bg-blur p-2 shadow-md'>
<div
className='absolute inset-0 rounded-xl opacity-[0.4]'
style={{
background: 'linear-gradient(92deg, rgba(247, 144, 9, 0.25) 0%, rgba(255, 255, 255, 0.00) 100%)',
}}
></div>
<div className='flex h-5 w-5 items-center justify-center'>
<RiAlertFill className='h-4 w-4 text-text-warning-secondary' />
</div>
<div className='system-xs-medium mx-1 px-1 text-text-primary'>
{showTips}
</div>
<ActionButton
className='z-[1]'
onClick={() => setShowTips('')}
>
<RiCloseLine className='h-4 w-4' />
</ActionButton>
</div>
)
}

export default LimitTips

web/app/components/workflow/nodes/_base/components/next-step/add.tsx (+2, -7)

@@ -12,7 +12,6 @@ import {
useAvailableBlocks,
useNodesInteractions,
useNodesReadOnly,
-useWorkflow,
} from '@/app/components/workflow/hooks'
import BlockSelector from '@/app/components/workflow/block-selector'
import type {
@@ -39,7 +38,6 @@ const Add = ({
const { handleNodeAdd } = useNodesInteractions()
const { nodesReadOnly } = useNodesReadOnly()
const { availableNextBlocks } = useAvailableBlocks(nodeData.type, nodeData.isInIteration || nodeData.isInLoop)
-const { checkParallelLimit } = useWorkflow()

const handleSelect = useCallback<OnSelectBlock>((type, toolDefaultValue) => {
handleNodeAdd(
@@ -52,14 +50,11 @@ const Add = ({
prevNodeSourceHandle: sourceHandle,
},
)
-}, [nodeId, sourceHandle, handleNodeAdd])
+}, [handleNodeAdd])

const handleOpenChange = useCallback((newOpen: boolean) => {
-if (newOpen && !checkParallelLimit(nodeId, sourceHandle))
-return
-
setOpen(newOpen)
-}, [checkParallelLimit, nodeId, sourceHandle])
+}, [])

const tip = useMemo(() => {
if (isFailBranch)

web/app/components/workflow/nodes/_base/components/node-handle.tsx (+2, -5)

@@ -22,7 +22,6 @@ import {
useIsChatMode,
useNodesInteractions,
useNodesReadOnly,
-useWorkflow,
} from '../../../hooks'
import {
useStore,
@@ -132,7 +131,6 @@ export const NodeSourceHandle = memo(({
const { availableNextBlocks } = useAvailableBlocks(data.type, data.isInIteration || data.isInLoop)
const isConnectable = !!availableNextBlocks.length
const isChatMode = useIsChatMode()
-const { checkParallelLimit } = useWorkflow()

const connected = data._connectedSourceHandleIds?.includes(handleId)
const handleOpenChange = useCallback((v: boolean) => {
@@ -140,9 +138,8 @@ export const NodeSourceHandle = memo(({
}, [])
const handleHandleClick = useCallback((e: MouseEvent) => {
e.stopPropagation()
-if (checkParallelLimit(id, handleId))
-setOpen(v => !v)
-}, [checkParallelLimit, id, handleId])
+setOpen(v => !v)
+}, [])
const handleSelect = useCallback((type: BlockEnum, toolDefaultValue?: ToolDefaultValue) => {
handleNodeAdd(
{

web/app/components/workflow/store/workflow/workflow-slice.ts (+0, -8)

@@ -29,10 +29,6 @@ export type WorkflowSliceShape = {
setControlPromptEditorRerenderKey: (controlPromptEditorRerenderKey: number) => void
showImportDSLModal: boolean
setShowImportDSLModal: (showImportDSLModal: boolean) => void
-showTips: string
-setShowTips: (showTips: string) => void
-workflowConfig?: Record<string, any>
-setWorkflowConfig: (workflowConfig: Record<string, any>) => void
fileUploadConfig?: FileUploadConfigResponse
setFileUploadConfig: (fileUploadConfig: FileUploadConfigResponse) => void
}
@@ -59,10 +55,6 @@ export const createWorkflowSlice: StateCreator<WorkflowSliceShape> = set => ({
setControlPromptEditorRerenderKey: controlPromptEditorRerenderKey => set(() => ({ controlPromptEditorRerenderKey })),
showImportDSLModal: false,
setShowImportDSLModal: showImportDSLModal => set(() => ({ showImportDSLModal })),
-showTips: '',
-setShowTips: showTips => set(() => ({ showTips })),
-workflowConfig: undefined,
-setWorkflowConfig: workflowConfig => set(() => ({ workflowConfig })),
fileUploadConfig: undefined,
setFileUploadConfig: fileUploadConfig => set(() => ({ fileUploadConfig })),
})

web/app/components/workflow/utils/workflow.ts (+0, -156)

@@ -1,12 +1,8 @@
import {
-getConnectedEdges,
-getIncomers,
getOutgoers,
} from 'reactflow'
import { v4 as uuid4 } from 'uuid'
import {
-groupBy,
-isEqual,
uniqBy,
} from 'lodash-es'
import type {
@@ -168,158 +164,6 @@ export const changeNodesAndEdgesId = (nodes: Node[], edges: Edge[]) => {
return [newNodes, newEdges] as [Node[], Edge[]]
}

type ParallelInfoItem = {
parallelNodeId: string
depth: number
isBranch?: boolean
}
type NodeParallelInfo = {
parallelNodeId: string
edgeHandleId: string
depth: number
}
type NodeHandle = {
node: Node
handle: string
}
type NodeStreamInfo = {
upstreamNodes: Set<string>
downstreamEdges: Set<string>
}
export const getParallelInfo = (startNode: Node, nodes: Node[], edges: Edge[]) => {
if (!startNode)
throw new Error('Start node not found')

const parallelList = [] as ParallelInfoItem[]
const nextNodeHandles = [{ node: startNode, handle: 'source' }]
let hasAbnormalEdges = false

const traverse = (firstNodeHandle: NodeHandle) => {
const nodeEdgesSet = {} as Record<string, Set<string>>
const totalEdgesSet = new Set<string>()
const nextHandles = [firstNodeHandle]
const streamInfo = {} as Record<string, NodeStreamInfo>
const parallelListItem = {
parallelNodeId: '',
depth: 0,
} as ParallelInfoItem
const nodeParallelInfoMap = {} as Record<string, NodeParallelInfo>
nodeParallelInfoMap[firstNodeHandle.node.id] = {
parallelNodeId: '',
edgeHandleId: '',
depth: 0,
}

while (nextHandles.length) {
const currentNodeHandle = nextHandles.shift()!
const { node: currentNode, handle: currentHandle = 'source' } = currentNodeHandle
const currentNodeHandleKey = currentNode.id
const connectedEdges = edges.filter(edge => edge.source === currentNode.id && edge.sourceHandle === currentHandle)
const connectedEdgesLength = connectedEdges.length
const outgoers = nodes.filter(node => connectedEdges.some(edge => edge.target === node.id))
const incomers = getIncomers(currentNode, nodes, edges)

if (!streamInfo[currentNodeHandleKey]) {
streamInfo[currentNodeHandleKey] = {
upstreamNodes: new Set<string>(),
downstreamEdges: new Set<string>(),
}
}

if (nodeEdgesSet[currentNodeHandleKey]?.size > 0 && incomers.length > 1) {
const newSet = new Set<string>()
for (const item of totalEdgesSet) {
if (!streamInfo[currentNodeHandleKey].downstreamEdges.has(item))
newSet.add(item)
}
if (isEqual(nodeEdgesSet[currentNodeHandleKey], newSet)) {
parallelListItem.depth = nodeParallelInfoMap[currentNode.id].depth
nextNodeHandles.push({ node: currentNode, handle: currentHandle })
break
}
}

if (nodeParallelInfoMap[currentNode.id].depth > parallelListItem.depth)
parallelListItem.depth = nodeParallelInfoMap[currentNode.id].depth

outgoers.forEach((outgoer) => {
const outgoerConnectedEdges = getConnectedEdges([outgoer], edges).filter(edge => edge.source === outgoer.id)
const sourceEdgesGroup = groupBy(outgoerConnectedEdges, 'sourceHandle')
const incomers = getIncomers(outgoer, nodes, edges)

if (outgoers.length > 1 && incomers.length > 1)
hasAbnormalEdges = true

Object.keys(sourceEdgesGroup).forEach((sourceHandle) => {
nextHandles.push({ node: outgoer, handle: sourceHandle })
})
if (!outgoerConnectedEdges.length)
nextHandles.push({ node: outgoer, handle: 'source' })

const outgoerKey = outgoer.id
if (!nodeEdgesSet[outgoerKey])
nodeEdgesSet[outgoerKey] = new Set<string>()

if (nodeEdgesSet[currentNodeHandleKey]) {
for (const item of nodeEdgesSet[currentNodeHandleKey])
nodeEdgesSet[outgoerKey].add(item)
}

if (!streamInfo[outgoerKey]) {
streamInfo[outgoerKey] = {
upstreamNodes: new Set<string>(),
downstreamEdges: new Set<string>(),
}
}

if (!nodeParallelInfoMap[outgoer.id]) {
nodeParallelInfoMap[outgoer.id] = {
...nodeParallelInfoMap[currentNode.id],
}
}

if (connectedEdgesLength > 1) {
const edge = connectedEdges.find(edge => edge.target === outgoer.id)!
nodeEdgesSet[outgoerKey].add(edge.id)
totalEdgesSet.add(edge.id)

streamInfo[currentNodeHandleKey].downstreamEdges.add(edge.id)
streamInfo[outgoerKey].upstreamNodes.add(currentNodeHandleKey)

for (const item of streamInfo[currentNodeHandleKey].upstreamNodes)
streamInfo[item].downstreamEdges.add(edge.id)

if (!parallelListItem.parallelNodeId)
parallelListItem.parallelNodeId = currentNode.id

const prevDepth = nodeParallelInfoMap[currentNode.id].depth + 1
const currentDepth = nodeParallelInfoMap[outgoer.id].depth

nodeParallelInfoMap[outgoer.id].depth = Math.max(prevDepth, currentDepth)
}
else {
for (const item of streamInfo[currentNodeHandleKey].upstreamNodes)
streamInfo[outgoerKey].upstreamNodes.add(item)

nodeParallelInfoMap[outgoer.id].depth = nodeParallelInfoMap[currentNode.id].depth
}
})
}

parallelList.push(parallelListItem)
}

while (nextNodeHandles.length) {
const nodeHandle = nextNodeHandles.shift()!
traverse(nodeHandle)
}

return {
parallelList,
hasAbnormalEdges,
}
}

export const hasErrorHandleNode = (nodeType?: BlockEnum) => {
return nodeType === BlockEnum.LLM || nodeType === BlockEnum.Tool || nodeType === BlockEnum.HttpRequest || nodeType === BlockEnum.Code
}
