
# rag_pipeline_dsl_service.py

import base64
import hashlib
import json
import logging
import uuid
from collections.abc import Mapping
from datetime import UTC, datetime
from enum import StrEnum
from typing import Optional, cast
from urllib.parse import urlparse
from uuid import uuid4

import yaml  # type: ignore
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
from flask_login import current_user
from packaging import version
from pydantic import BaseModel, Field
from sqlalchemy import select
from sqlalchemy.orm import Session

from core.helper import ssrf_proxy
from core.model_runtime.utils.encoders import jsonable_encoder
from core.plugin.entities.plugin import PluginDependency
from core.workflow.nodes.enums import NodeType
from core.workflow.nodes.knowledge_retrieval.entities import KnowledgeRetrievalNodeData
from core.workflow.nodes.llm.entities import LLMNodeData
from core.workflow.nodes.parameter_extractor.entities import ParameterExtractorNodeData
from core.workflow.nodes.question_classifier.entities import QuestionClassifierNodeData
from core.workflow.nodes.tool.entities import ToolNodeData
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from factories import variable_factory
from models import Account
from models.dataset import Dataset, DatasetCollectionBinding, Pipeline
from models.workflow import Workflow, WorkflowType
from services.entities.knowledge_entities.rag_pipeline_entities import (
    KnowledgeConfiguration,
    RagPipelineDatasetCreateEntity,
)
from services.plugin.dependencies_analysis import DependenciesAnalysisService

logger = logging.getLogger(__name__)

IMPORT_INFO_REDIS_KEY_PREFIX = "app_import_info:"
CHECK_DEPENDENCIES_REDIS_KEY_PREFIX = "app_check_dependencies:"
IMPORT_INFO_REDIS_EXPIRY = 10 * 60  # 10 minutes
DSL_MAX_SIZE = 10 * 1024 * 1024  # 10 MB
CURRENT_DSL_VERSION = "0.1.0"

class ImportMode(StrEnum):
    YAML_CONTENT = "yaml-content"
    YAML_URL = "yaml-url"


class ImportStatus(StrEnum):
    COMPLETED = "completed"
    COMPLETED_WITH_WARNINGS = "completed-with-warnings"
    PENDING = "pending"
    FAILED = "failed"


class RagPipelineImportInfo(BaseModel):
    id: str
    status: ImportStatus
    pipeline_id: Optional[str] = None
    current_dsl_version: str = CURRENT_DSL_VERSION
    imported_dsl_version: str = ""
    error: str = ""
    dataset_id: Optional[str] = None


class CheckDependenciesResult(BaseModel):
    leaked_dependencies: list[PluginDependency] = Field(default_factory=list)

def _check_version_compatibility(imported_version: str) -> ImportStatus:
    """Determine the import status based on a comparison of DSL versions."""
    try:
        current_ver = version.parse(CURRENT_DSL_VERSION)
        imported_ver = version.parse(imported_version)
    except version.InvalidVersion:
        return ImportStatus.FAILED

    # If the imported version is newer than the current one, always return PENDING
    if imported_ver > current_ver:
        return ImportStatus.PENDING

    # If the imported major version is older than the current major version, return PENDING
    if imported_ver.major < current_ver.major:
        return ImportStatus.PENDING

    # If the imported minor version is older than the current minor version, return COMPLETED_WITH_WARNINGS
    if imported_ver.minor < current_ver.minor:
        return ImportStatus.COMPLETED_WITH_WARNINGS

    # Otherwise the versions differ at most in the micro component: return COMPLETED
    return ImportStatus.COMPLETED

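# Illustrative mapping of inputs to statuses, assuming CURRENT_DSL_VERSION = "0.1.0"
# (comment-only sketch, not executed anywhere in this module):
#   _check_version_compatibility("0.2.0")         -> ImportStatus.PENDING                  (newer than current)
#   _check_version_compatibility("0.1.0")         -> ImportStatus.COMPLETED                (same version)
#   _check_version_compatibility("0.0.9")         -> ImportStatus.COMPLETED_WITH_WARNINGS  (older minor version)
#   _check_version_compatibility("not-a-version") -> ImportStatus.FAILED                   (unparsable version)
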
class RagPipelinePendingData(BaseModel):
    import_mode: str
    yaml_content: str
    pipeline_id: str | None


class CheckDependenciesPendingData(BaseModel):
    dependencies: list[PluginDependency]
    pipeline_id: str | None

class RagPipelineDslService:
    def __init__(self, session: Session):
        self._session = session
    def import_rag_pipeline(
        self,
        *,
        account: Account,
        import_mode: str,
        yaml_content: Optional[str] = None,
        yaml_url: Optional[str] = None,
        pipeline_id: Optional[str] = None,
        dataset: Optional[Dataset] = None,
    ) -> RagPipelineImportInfo:
        """Import a RAG pipeline from YAML content or a URL."""
        import_id = str(uuid.uuid4())

        # Validate import mode
        try:
            mode = ImportMode(import_mode)
        except ValueError:
            raise ValueError(f"Invalid import_mode: {import_mode}")

        # Get YAML content
        content: str = ""
        if mode == ImportMode.YAML_URL:
            if not yaml_url:
                return RagPipelineImportInfo(
                    id=import_id,
                    status=ImportStatus.FAILED,
                    error="yaml_url is required when import_mode is yaml-url",
                )
            try:
                parsed_url = urlparse(yaml_url)
                if (
                    parsed_url.scheme == "https"
                    and parsed_url.netloc == "github.com"
                    and parsed_url.path.endswith((".yml", ".yaml"))
                ):
                    # Rewrite GitHub blob URLs to their raw-content equivalents, e.g.
                    # https://github.com/org/repo/blob/main/pipeline.yml
                    #   -> https://raw.githubusercontent.com/org/repo/main/pipeline.yml
                    yaml_url = yaml_url.replace("https://github.com", "https://raw.githubusercontent.com")
                    yaml_url = yaml_url.replace("/blob/", "/")
                response = ssrf_proxy.get(yaml_url.strip(), follow_redirects=True, timeout=(10, 10))
                response.raise_for_status()
                content = response.content.decode()

                if len(content) > DSL_MAX_SIZE:
                    return RagPipelineImportInfo(
                        id=import_id,
                        status=ImportStatus.FAILED,
                        error="File size exceeds the limit of 10MB",
                    )

                if not content:
                    return RagPipelineImportInfo(
                        id=import_id,
                        status=ImportStatus.FAILED,
                        error="Empty content from url",
                    )
            except Exception as e:
                return RagPipelineImportInfo(
                    id=import_id,
                    status=ImportStatus.FAILED,
                    error=f"Error fetching YAML from URL: {str(e)}",
                )
        elif mode == ImportMode.YAML_CONTENT:
            if not yaml_content:
                return RagPipelineImportInfo(
                    id=import_id,
                    status=ImportStatus.FAILED,
                    error="yaml_content is required when import_mode is yaml-content",
                )
            content = yaml_content
        # Process YAML content
        try:
            # Parse YAML to validate format
            data = yaml.safe_load(content)
            if not isinstance(data, dict):
                return RagPipelineImportInfo(
                    id=import_id,
                    status=ImportStatus.FAILED,
                    error="Invalid YAML format: content must be a mapping",
                )

            # Validate and fix DSL version
            if not data.get("version"):
                data["version"] = "0.1.0"
            if not data.get("kind") or data.get("kind") != "rag-pipeline":
                data["kind"] = "rag-pipeline"

            imported_version = data.get("version", "0.1.0")
            # The version must be a string; YAML may parse a bare 0.1 as a float
            if not isinstance(imported_version, str):
                raise ValueError(f"Invalid version type, expected str, got {type(imported_version)}")
            status = _check_version_compatibility(imported_version)

            # Extract pipeline data
            pipeline_data = data.get("rag_pipeline")
            if not pipeline_data:
                return RagPipelineImportInfo(
                    id=import_id,
                    status=ImportStatus.FAILED,
                    error="Missing rag_pipeline data in YAML content",
                )

            # If pipeline_id is provided, check that the pipeline exists
            pipeline = None
            if pipeline_id:
                stmt = select(Pipeline).where(
                    Pipeline.id == pipeline_id,
                    Pipeline.tenant_id == account.current_tenant_id,
                )
                pipeline = self._session.scalar(stmt)
                if not pipeline:
                    return RagPipelineImportInfo(
                        id=import_id,
                        status=ImportStatus.FAILED,
                        error="Pipeline not found",
                    )

            # If the import needs confirmation (version mismatch), store the import info in Redis
            if status == ImportStatus.PENDING:
                pending_data = RagPipelinePendingData(
                    import_mode=import_mode,
                    yaml_content=content,
                    pipeline_id=pipeline_id,
                )
                redis_client.setex(
                    f"{IMPORT_INFO_REDIS_KEY_PREFIX}{import_id}",
                    IMPORT_INFO_REDIS_EXPIRY,
                    pending_data.model_dump_json(),
                )
                return RagPipelineImportInfo(
                    id=import_id,
                    status=status,
                    pipeline_id=pipeline_id,
                    imported_dsl_version=imported_version,
                )
            # Extract dependencies
            dependencies = data.get("dependencies", [])
            check_dependencies_pending_data = None
            if dependencies:
                check_dependencies_pending_data = [PluginDependency.model_validate(d) for d in dependencies]

            # Create or update pipeline
            pipeline = self._create_or_update_pipeline(
                pipeline=pipeline,
                data=data,
                account=account,
                dependencies=check_dependencies_pending_data,
            )

            # create dataset
            name = pipeline.name
            description = pipeline.description
            icon_type = data.get("rag_pipeline", {}).get("icon_type")
            icon = data.get("rag_pipeline", {}).get("icon")
            icon_background = data.get("rag_pipeline", {}).get("icon_background")
            icon_url = data.get("rag_pipeline", {}).get("icon_url")
            workflow = data.get("workflow", {})
            graph = workflow.get("graph", {})
            nodes = graph.get("nodes", [])
            dataset_id = None
            for node in nodes:
                if node.get("data", {}).get("type") == "knowledge_index":
                    knowledge_configuration = node.get("data", {}).get("knowledge_configuration", {})
                    knowledge_configuration = KnowledgeConfiguration(**knowledge_configuration)
                    if (
                        dataset
                        and pipeline.is_published
                        and dataset.chunk_structure != knowledge_configuration.chunk_structure
                    ):
                        raise ValueError("Chunk structure is not compatible with the published pipeline")
                    else:
                        dataset = Dataset(
                            tenant_id=account.current_tenant_id,
                            name=name,
                            description=description,
                            icon_info={
                                "type": icon_type,
                                "icon": icon,
                                "background": icon_background,
                                "url": icon_url,
                            },
                            indexing_technique=knowledge_configuration.indexing_technique,
                            created_by=account.id,
                            retrieval_model=knowledge_configuration.retrieval_model.model_dump(),
                            runtime_mode="rag_pipeline",
                            chunk_structure=knowledge_configuration.chunk_structure,
                        )
                    if knowledge_configuration.indexing_technique == "high_quality":
                        dataset_collection_binding = (
                            db.session.query(DatasetCollectionBinding)
                            .filter(
                                DatasetCollectionBinding.provider_name
                                == knowledge_configuration.embedding_model_provider,
                                DatasetCollectionBinding.model_name == knowledge_configuration.embedding_model,
                                DatasetCollectionBinding.type == "dataset",
                            )
                            .order_by(DatasetCollectionBinding.created_at)
                            .first()
                        )
                        if not dataset_collection_binding:
                            dataset_collection_binding = DatasetCollectionBinding(
                                provider_name=knowledge_configuration.embedding_model_provider,
                                model_name=knowledge_configuration.embedding_model,
                                collection_name=Dataset.gen_collection_name_by_id(str(uuid.uuid4())),
                                type="dataset",
                            )
                            db.session.add(dataset_collection_binding)
                            db.session.commit()
                        dataset_collection_binding_id = dataset_collection_binding.id
                        dataset.collection_binding_id = dataset_collection_binding_id
                        dataset.embedding_model = knowledge_configuration.embedding_model
                        dataset.embedding_model_provider = knowledge_configuration.embedding_model_provider
                    elif knowledge_configuration.indexing_technique == "economy":
                        dataset.keyword_number = knowledge_configuration.keyword_number
                    dataset.pipeline_id = pipeline.id
                    self._session.add(dataset)
                    self._session.commit()
                    dataset_id = dataset.id
            if not dataset_id:
                raise ValueError("DSL is not valid, please check the Knowledge Index node.")

            return RagPipelineImportInfo(
                id=import_id,
                status=status,
                pipeline_id=pipeline.id,
                dataset_id=dataset_id,
                imported_dsl_version=imported_version,
            )
        except yaml.YAMLError as e:
            return RagPipelineImportInfo(
                id=import_id,
                status=ImportStatus.FAILED,
                error=f"Invalid YAML format: {str(e)}",
            )
        except Exception as e:
            logger.exception("Failed to import RAG pipeline")
            return RagPipelineImportInfo(
                id=import_id,
                status=ImportStatus.FAILED,
                error=str(e),
            )

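    # A minimal sketch of the YAML shape this importer reads (illustrative only; the exact
    # knowledge_configuration fields are defined by KnowledgeConfiguration elsewhere):
    #
    #   version: "0.1.0"
    #   kind: rag-pipeline
    #   rag_pipeline:
    #     name: my-pipeline
    #     description: ...
    #   workflow:
    #     graph:
    #       nodes:
    #         - data:
    #             type: knowledge_index
    #             knowledge_configuration: { ... }
    #   dependencies: []
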
    def confirm_import(self, *, import_id: str, account: Account) -> RagPipelineImportInfo:
        """
        Confirm an import that requires confirmation
        """
        redis_key = f"{IMPORT_INFO_REDIS_KEY_PREFIX}{import_id}"
        pending_data = redis_client.get(redis_key)
        if not pending_data:
            return RagPipelineImportInfo(
                id=import_id,
                status=ImportStatus.FAILED,
                error="Import information expired or does not exist",
            )

        try:
            if not isinstance(pending_data, str | bytes):
                return RagPipelineImportInfo(
                    id=import_id,
                    status=ImportStatus.FAILED,
                    error="Invalid import information",
                )

            pending_data = RagPipelinePendingData.model_validate_json(pending_data)
            data = yaml.safe_load(pending_data.yaml_content)

            pipeline = None
            if pending_data.pipeline_id:
                stmt = select(Pipeline).where(
                    Pipeline.id == pending_data.pipeline_id,
                    Pipeline.tenant_id == account.current_tenant_id,
                )
                pipeline = self._session.scalar(stmt)

            # Create or update pipeline
            pipeline = self._create_or_update_pipeline(
                pipeline=pipeline,
                data=data,
                account=account,
            )

            # create dataset
            name = pipeline.name
            description = pipeline.description
            icon_type = data.get("rag_pipeline", {}).get("icon_type")
            icon = data.get("rag_pipeline", {}).get("icon")
            icon_background = data.get("rag_pipeline", {}).get("icon_background")
            icon_url = data.get("rag_pipeline", {}).get("icon_url")
            workflow = data.get("workflow", {})
            graph = workflow.get("graph", {})
            nodes = graph.get("nodes", [])
            dataset = None  # no existing dataset is passed in on confirmation
            dataset_id = None
            for node in nodes:
                if node.get("data", {}).get("type") == "knowledge_index":
                    knowledge_configuration = node.get("data", {}).get("knowledge_configuration", {})
                    knowledge_configuration = KnowledgeConfiguration(**knowledge_configuration)
                    if not dataset:
                        dataset = Dataset(
                            tenant_id=account.current_tenant_id,
                            name=name,
                            description=description,
                            icon_info={
                                "type": icon_type,
                                "icon": icon,
                                "background": icon_background,
                                "url": icon_url,
                            },
                            indexing_technique=knowledge_configuration.indexing_technique,
                            created_by=account.id,
                            retrieval_model=knowledge_configuration.retrieval_model.model_dump(),
                            runtime_mode="rag_pipeline",
                            chunk_structure=knowledge_configuration.chunk_structure,
                        )
                    else:
                        dataset.indexing_technique = knowledge_configuration.indexing_technique
                        dataset.retrieval_model = knowledge_configuration.retrieval_model.model_dump()
                        dataset.runtime_mode = "rag_pipeline"
                        dataset.chunk_structure = knowledge_configuration.chunk_structure
                    if knowledge_configuration.indexing_technique == "high_quality":
                        dataset_collection_binding = (
                            db.session.query(DatasetCollectionBinding)
                            .filter(
                                DatasetCollectionBinding.provider_name
                                == knowledge_configuration.embedding_model_provider,
                                DatasetCollectionBinding.model_name == knowledge_configuration.embedding_model,
                                DatasetCollectionBinding.type == "dataset",
                            )
                            .order_by(DatasetCollectionBinding.created_at)
                            .first()
                        )
                        if not dataset_collection_binding:
                            dataset_collection_binding = DatasetCollectionBinding(
                                provider_name=knowledge_configuration.embedding_model_provider,
                                model_name=knowledge_configuration.embedding_model,
                                collection_name=Dataset.gen_collection_name_by_id(str(uuid.uuid4())),
                                type="dataset",
                            )
                            db.session.add(dataset_collection_binding)
                            db.session.commit()
                        dataset_collection_binding_id = dataset_collection_binding.id
                        dataset.collection_binding_id = dataset_collection_binding_id
                        dataset.embedding_model = knowledge_configuration.embedding_model
                        dataset.embedding_model_provider = knowledge_configuration.embedding_model_provider
                    elif knowledge_configuration.indexing_technique == "economy":
                        dataset.keyword_number = knowledge_configuration.keyword_number
                    dataset.pipeline_id = pipeline.id
                    self._session.add(dataset)
                    self._session.commit()
                    dataset_id = dataset.id
            if not dataset_id:
                raise ValueError("DSL is not valid, please check the Knowledge Index node.")

            # Delete import info from Redis
            redis_client.delete(redis_key)

            return RagPipelineImportInfo(
                id=import_id,
                status=ImportStatus.COMPLETED,
                pipeline_id=pipeline.id,
                dataset_id=dataset_id,
                current_dsl_version=CURRENT_DSL_VERSION,
                imported_dsl_version=data.get("version", "0.1.0"),
            )

        except Exception as e:
            logger.exception("Error confirming import")
            return RagPipelineImportInfo(
                id=import_id,
                status=ImportStatus.FAILED,
                error=str(e),
            )

    def check_dependencies(
        self,
        *,
        pipeline: Pipeline,
    ) -> CheckDependenciesResult:
        """Check dependencies"""
        # Get dependencies from Redis
        redis_key = f"{CHECK_DEPENDENCIES_REDIS_KEY_PREFIX}{pipeline.id}"
        dependencies = redis_client.get(redis_key)
        if not dependencies:
            return CheckDependenciesResult()

        # Extract dependencies
        dependencies = CheckDependenciesPendingData.model_validate_json(dependencies)

        # Get leaked dependencies
        leaked_dependencies = DependenciesAnalysisService.get_leaked_dependencies(
            tenant_id=pipeline.tenant_id, dependencies=dependencies.dependencies
        )
        return CheckDependenciesResult(
            leaked_dependencies=leaked_dependencies,
        )

    def _create_or_update_pipeline(
        self,
        *,
        pipeline: Optional[Pipeline],
        data: dict,
        account: Account,
        dependencies: Optional[list[PluginDependency]] = None,
    ) -> Pipeline:
        """Create a new pipeline or update an existing one."""
        pipeline_data = data.get("rag_pipeline", {})

        # Set icon type
        icon_type_value = pipeline_data.get("icon_type")
        if icon_type_value in ["emoji", "link"]:
            icon_type = icon_type_value
        else:
            icon_type = "emoji"
        icon = str(pipeline_data.get("icon", ""))

        # Initialize pipeline based on mode
        workflow_data = data.get("workflow")
        if not workflow_data or not isinstance(workflow_data, dict):
            raise ValueError("Missing workflow data for rag pipeline")

        environment_variables_list = workflow_data.get("environment_variables", [])
        environment_variables = [
            variable_factory.build_environment_variable_from_mapping(obj) for obj in environment_variables_list
        ]
        conversation_variables_list = workflow_data.get("conversation_variables", [])
        conversation_variables = [
            variable_factory.build_conversation_variable_from_mapping(obj) for obj in conversation_variables_list
        ]
        rag_pipeline_variables_list = workflow_data.get("rag_pipeline_variables", [])

        graph = workflow_data.get("graph", {})
        for node in graph.get("nodes", []):
            if node.get("data", {}).get("type", "") == NodeType.KNOWLEDGE_RETRIEVAL.value:
                dataset_ids = node["data"].get("dataset_ids", [])
                node["data"]["dataset_ids"] = [
                    decrypted_id
                    for dataset_id in dataset_ids
                    if (
                        decrypted_id := self.decrypt_dataset_id(
                            encrypted_data=dataset_id,
                            tenant_id=account.current_tenant_id,
                        )
                    )
                ]

        if pipeline:
            # Update existing pipeline
            pipeline.name = pipeline_data.get("name", pipeline.name)
            pipeline.description = pipeline_data.get("description", pipeline.description)
            pipeline.updated_by = account.id
        else:
            if account.current_tenant_id is None:
                raise ValueError("Current tenant is not set")

            # Create new pipeline
            pipeline = Pipeline()
            pipeline.id = str(uuid4())
            pipeline.tenant_id = account.current_tenant_id
            pipeline.name = pipeline_data.get("name", "")
            pipeline.description = pipeline_data.get("description", "")
            pipeline.created_by = account.id
            pipeline.updated_by = account.id

        self._session.add(pipeline)
        self._session.commit()

        # save dependencies
        if dependencies:
            redis_client.setex(
                f"{CHECK_DEPENDENCIES_REDIS_KEY_PREFIX}{pipeline.id}",
                IMPORT_INFO_REDIS_EXPIRY,
                CheckDependenciesPendingData(pipeline_id=pipeline.id, dependencies=dependencies).model_dump_json(),
            )

        workflow = (
            db.session.query(Workflow)
            .filter(
                Workflow.tenant_id == pipeline.tenant_id,
                Workflow.app_id == pipeline.id,
                Workflow.version == "draft",
            )
            .first()
        )

        # create draft workflow if not found
        if not workflow:
            workflow = Workflow(
                tenant_id=pipeline.tenant_id,
                app_id=pipeline.id,
                features="{}",
                type=WorkflowType.RAG_PIPELINE.value,
                version="draft",
                graph=json.dumps(graph),
                created_by=account.id,
                environment_variables=environment_variables,
                conversation_variables=conversation_variables,
                rag_pipeline_variables=rag_pipeline_variables_list,
            )
            db.session.add(workflow)
            db.session.flush()
            pipeline.workflow_id = workflow.id
        else:
            workflow.graph = json.dumps(graph)
            workflow.updated_by = account.id
            workflow.updated_at = datetime.now(UTC).replace(tzinfo=None)
            workflow.environment_variables = environment_variables
            workflow.conversation_variables = conversation_variables
            workflow.rag_pipeline_variables = rag_pipeline_variables_list

        # commit db session changes
        db.session.commit()

        return pipeline

    @classmethod
    def export_rag_pipeline_dsl(cls, pipeline: Pipeline, include_secret: bool = False) -> str:
        """
        Export pipeline
        :param pipeline: Pipeline instance
        :param include_secret: Whether to include secret variables
        :return: DSL YAML string
        """
        dataset = pipeline.dataset
        if not dataset:
            raise ValueError("Missing dataset for rag pipeline")
        icon_info = dataset.icon_info
        export_data = {
            "version": CURRENT_DSL_VERSION,
            "kind": "rag-pipeline",
            "rag_pipeline": {
                "name": pipeline.name,
                "icon": icon_info.get("icon", "📙") if icon_info else "📙",
                "icon_type": icon_info.get("icon_type", "emoji") if icon_info else "emoji",
                "icon_background": icon_info.get("icon_background", "#FFEAD5") if icon_info else "#FFEAD5",
                "description": pipeline.description,
            },
        }
        cls._append_workflow_export_data(export_data=export_data, pipeline=pipeline, include_secret=include_secret)
        return yaml.dump(export_data, allow_unicode=True)  # type: ignore

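    # Rough shape of the exported document produced above (illustrative only; the "workflow"
    # mapping is whatever Workflow.to_dict() returns, with dataset IDs encrypted):
    #
    #   version: "0.1.0"
    #   kind: rag-pipeline
    #   rag_pipeline: { name, icon, icon_type, icon_background, description }
    #   workflow: { graph: {...}, ... }
    #   dependencies: [ ... ]
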
    @classmethod
    def _append_workflow_export_data(cls, *, export_data: dict, pipeline: Pipeline, include_secret: bool) -> None:
        """
        Append workflow export data
        :param export_data: export data
        :param pipeline: Pipeline instance
        """
        workflow = (
            db.session.query(Workflow)
            .filter(
                Workflow.tenant_id == pipeline.tenant_id,
                Workflow.app_id == pipeline.id,
                Workflow.version == "draft",
            )
            .first()
        )
        if not workflow:
            raise ValueError("Missing draft workflow configuration, please check.")

        workflow_dict = workflow.to_dict(include_secret=include_secret)
        for node in workflow_dict.get("graph", {}).get("nodes", []):
            if node.get("data", {}).get("type", "") == NodeType.KNOWLEDGE_RETRIEVAL.value:
                dataset_ids = node["data"].get("dataset_ids", [])
                node["data"]["dataset_ids"] = [
                    cls.encrypt_dataset_id(dataset_id=dataset_id, tenant_id=pipeline.tenant_id)
                    for dataset_id in dataset_ids
                ]
        export_data["workflow"] = workflow_dict
        dependencies = cls._extract_dependencies_from_workflow(workflow)
        export_data["dependencies"] = [
            jsonable_encoder(d.model_dump())
            for d in DependenciesAnalysisService.generate_dependencies(
                tenant_id=pipeline.tenant_id, dependencies=dependencies
            )
        ]

    @classmethod
    def _extract_dependencies_from_workflow(cls, workflow: Workflow) -> list[str]:
        """
        Extract dependencies from workflow
        :param workflow: Workflow instance
        :return: dependencies list, e.g. ["langgenius/google"]
        """
        graph = workflow.graph_dict
        dependencies = cls._extract_dependencies_from_workflow_graph(graph)
        return dependencies

    @classmethod
    def _extract_dependencies_from_workflow_graph(cls, graph: Mapping) -> list[str]:
        """
        Extract dependencies from workflow graph
        :param graph: Workflow graph
        :return: dependencies list, e.g. ["langgenius/google"]
        """
        dependencies = []
        for node in graph.get("nodes", []):
            try:
                typ = node.get("data", {}).get("type")
                match typ:
                    case NodeType.TOOL.value:
                        tool_entity = ToolNodeData(**node["data"])
                        dependencies.append(
                            DependenciesAnalysisService.analyze_tool_dependency(tool_entity.provider_id),
                        )
                    case NodeType.LLM.value:
                        llm_entity = LLMNodeData(**node["data"])
                        dependencies.append(
                            DependenciesAnalysisService.analyze_model_provider_dependency(llm_entity.model.provider),
                        )
                    case NodeType.QUESTION_CLASSIFIER.value:
                        question_classifier_entity = QuestionClassifierNodeData(**node["data"])
                        dependencies.append(
                            DependenciesAnalysisService.analyze_model_provider_dependency(
                                question_classifier_entity.model.provider
                            ),
                        )
                    case NodeType.PARAMETER_EXTRACTOR.value:
                        parameter_extractor_entity = ParameterExtractorNodeData(**node["data"])
                        dependencies.append(
                            DependenciesAnalysisService.analyze_model_provider_dependency(
                                parameter_extractor_entity.model.provider
                            ),
                        )
                    case NodeType.KNOWLEDGE_RETRIEVAL.value:
                        knowledge_retrieval_entity = KnowledgeRetrievalNodeData(**node["data"])
                        if knowledge_retrieval_entity.retrieval_mode == "multiple":
                            if knowledge_retrieval_entity.multiple_retrieval_config:
                                if (
                                    knowledge_retrieval_entity.multiple_retrieval_config.reranking_mode
                                    == "reranking_model"
                                ):
                                    if knowledge_retrieval_entity.multiple_retrieval_config.reranking_model:
                                        dependencies.append(
                                            DependenciesAnalysisService.analyze_model_provider_dependency(
                                                knowledge_retrieval_entity.multiple_retrieval_config.reranking_model.provider
                                            ),
                                        )
                                elif (
                                    knowledge_retrieval_entity.multiple_retrieval_config.reranking_mode
                                    == "weighted_score"
                                ):
                                    if knowledge_retrieval_entity.multiple_retrieval_config.weights:
                                        vector_setting = (
                                            knowledge_retrieval_entity.multiple_retrieval_config.weights.vector_setting
                                        )
                                        dependencies.append(
                                            DependenciesAnalysisService.analyze_model_provider_dependency(
                                                vector_setting.embedding_provider_name
                                            ),
                                        )
                        elif knowledge_retrieval_entity.retrieval_mode == "single":
                            model_config = knowledge_retrieval_entity.single_retrieval_config
                            if model_config:
                                dependencies.append(
                                    DependenciesAnalysisService.analyze_model_provider_dependency(
                                        model_config.model.provider
                                    ),
                                )
                    case _:
                        # TODO: Handle default case or unknown node types
                        pass
            except Exception as e:
                logger.exception("Error extracting node dependency", exc_info=e)

        return dependencies

    @classmethod
    def _extract_dependencies_from_model_config(cls, model_config: Mapping) -> list[str]:
        """
        Extract dependencies from model config
        :param model_config: model config dict
        :return: dependencies list, e.g. ["langgenius/google"]
        """
        dependencies = []
        try:
            # completion model
            model_dict = model_config.get("model", {})
            if model_dict:
                dependencies.append(
                    DependenciesAnalysisService.analyze_model_provider_dependency(model_dict.get("provider", ""))
                )

            # reranking model
            dataset_configs = model_config.get("dataset_configs", {})
            if dataset_configs:
                for dataset_config in dataset_configs.get("datasets", {}).get("datasets", []):
                    if dataset_config.get("reranking_model"):
                        dependencies.append(
                            DependenciesAnalysisService.analyze_model_provider_dependency(
                                dataset_config.get("reranking_model", {})
                                .get("reranking_provider_name", {})
                                .get("provider")
                            )
                        )

            # tools
            agent_configs = model_config.get("agent_mode", {})
            if agent_configs:
                for agent_config in agent_configs.get("tools", []):
                    dependencies.append(
                        DependenciesAnalysisService.analyze_tool_dependency(agent_config.get("provider_id"))
                    )
        except Exception as e:
            logger.exception("Error extracting model config dependency", exc_info=e)

        return dependencies

    @classmethod
    def get_leaked_dependencies(cls, tenant_id: str, dsl_dependencies: list[dict]) -> list[PluginDependency]:
        """
        Returns the leaked dependencies in the current workspace
        """
        dependencies = [PluginDependency(**dep) for dep in dsl_dependencies]
        if not dependencies:
            return []

        return DependenciesAnalysisService.get_leaked_dependencies(tenant_id=tenant_id, dependencies=dependencies)

    @staticmethod
    def _generate_aes_key(tenant_id: str) -> bytes:
        """Generate AES key based on tenant_id"""
        return hashlib.sha256(tenant_id.encode()).digest()

    @classmethod
    def encrypt_dataset_id(cls, dataset_id: str, tenant_id: str) -> str:
        """Encrypt dataset_id using AES-CBC mode"""
        key = cls._generate_aes_key(tenant_id)
        iv = key[:16]
        cipher = AES.new(key, AES.MODE_CBC, iv)
        ct_bytes = cipher.encrypt(pad(dataset_id.encode(), AES.block_size))
        return base64.b64encode(ct_bytes).decode()

    @classmethod
    def decrypt_dataset_id(cls, encrypted_data: str, tenant_id: str) -> str | None:
        """AES decryption"""
        try:
            key = cls._generate_aes_key(tenant_id)
            iv = key[:16]
            cipher = AES.new(key, AES.MODE_CBC, iv)
            pt = unpad(cipher.decrypt(base64.b64decode(encrypted_data)), AES.block_size)
            return pt.decode()
        except Exception:
            return None

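    # Round-trip sketch (illustrative; "tenant-123" and the dataset UUID are made-up values):
    #
    #   token = RagPipelineDslService.encrypt_dataset_id(dataset_id=str(uuid4()), tenant_id="tenant-123")
    #   assert RagPipelineDslService.decrypt_dataset_id(encrypted_data=token, tenant_id="tenant-123") is not None
    #
    # Decrypting with a different tenant_id yields garbage or None, since the key (and IV)
    # are derived from the tenant_id via SHA-256 in _generate_aes_key().
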
    @staticmethod
    def create_rag_pipeline_dataset(
        tenant_id: str,
        rag_pipeline_dataset_create_entity: RagPipelineDatasetCreateEntity,
    ):
        # check if dataset name already exists
        if (
            db.session.query(Dataset)
            .filter_by(name=rag_pipeline_dataset_create_entity.name, tenant_id=tenant_id)
            .first()
        ):
            raise ValueError(f"Dataset with name {rag_pipeline_dataset_create_entity.name} already exists.")

        with Session(db.engine) as session:
            rag_pipeline_dsl_service = RagPipelineDslService(session)
            account = cast(Account, current_user)
            rag_pipeline_import_info: RagPipelineImportInfo = rag_pipeline_dsl_service.import_rag_pipeline(
                account=account,
                import_mode=ImportMode.YAML_CONTENT.value,
                yaml_content=rag_pipeline_dataset_create_entity.yaml_content,
                dataset=None,
            )
            return {
                "id": rag_pipeline_import_info.id,
                "dataset_id": rag_pipeline_import_info.dataset_id,
                "pipeline_id": rag_pipeline_import_info.pipeline_id,
                "status": rag_pipeline_import_info.status,
                "imported_dsl_version": rag_pipeline_import_info.imported_dsl_version,
                "current_dsl_version": rag_pipeline_import_info.current_dsl_version,
                "error": rag_pipeline_import_info.error,
            }