Pārlūkot izejas kodu

Fix/dsl kb encrypt (#17353)

tags/1.2.0
Dongyu Li pirms 7 mēnešiem
vecāks
revīzija
2e9997110a
Revīzijas autora e-pasta adrese nav piesaistīta nevienam kontam
1 mainītis faili ar 48 papildinājumiem un 1 dzēšanām
  1. 48
    1
      api/services/app_dsl_service.py

+ 48
- 1
api/services/app_dsl_service.py Parādīt failu

import base64
import hashlib
import logging import logging
import uuid import uuid
from collections.abc import Mapping from collections.abc import Mapping
from uuid import uuid4 from uuid import uuid4


import yaml # type: ignore import yaml # type: ignore
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
from packaging import version from packaging import version
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
from sqlalchemy import select from sqlalchemy import select
unique_hash = current_draft_workflow.unique_hash unique_hash = current_draft_workflow.unique_hash
else: else:
unique_hash = None unique_hash = None
graph = workflow_data.get("graph", {})
for node in graph.get("nodes", []):
if node.get("data", {}).get("type", "") == NodeType.KNOWLEDGE_RETRIEVAL.value:
dataset_ids = node["data"].get("dataset_ids", [])
node["data"]["dataset_ids"] = [
decrypted_id
for dataset_id in dataset_ids
if (decrypted_id := self.decrypt_dataset_id(encrypted_data=dataset_id, tenant_id=app.tenant_id))
]
workflow_service.sync_draft_workflow( workflow_service.sync_draft_workflow(
app_model=app, app_model=app,
graph=workflow_data.get("graph", {}), graph=workflow_data.get("graph", {}),
if not workflow: if not workflow:
raise ValueError("Missing draft workflow configuration, please check.") raise ValueError("Missing draft workflow configuration, please check.")


export_data["workflow"] = workflow.to_dict(include_secret=include_secret)
workflow_dict = workflow.to_dict(include_secret=include_secret)
for node in workflow_dict.get("graph", {}).get("nodes", []):
if node.get("data", {}).get("type", "") == NodeType.KNOWLEDGE_RETRIEVAL.value:
dataset_ids = node["data"].get("dataset_ids", [])
node["data"]["dataset_ids"] = [
cls.encrypt_dataset_id(dataset_id=dataset_id, tenant_id=app_model.tenant_id)
for dataset_id in dataset_ids
]
export_data["workflow"] = workflow_dict
dependencies = cls._extract_dependencies_from_workflow(workflow) dependencies = cls._extract_dependencies_from_workflow(workflow)
export_data["dependencies"] = [ export_data["dependencies"] = [
jsonable_encoder(d.model_dump()) jsonable_encoder(d.model_dump())
return [] return []


return DependenciesAnalysisService.get_leaked_dependencies(tenant_id=tenant_id, dependencies=dependencies) return DependenciesAnalysisService.get_leaked_dependencies(tenant_id=tenant_id, dependencies=dependencies)

@staticmethod
def _generate_aes_key(tenant_id: str) -> bytes:
"""Generate AES key based on tenant_id"""
return hashlib.sha256(tenant_id.encode()).digest()

@classmethod
def encrypt_dataset_id(cls, dataset_id: str, tenant_id: str) -> str:
"""Encrypt dataset_id using AES-CBC mode"""
key = cls._generate_aes_key(tenant_id)
iv = key[:16]
cipher = AES.new(key, AES.MODE_CBC, iv)
ct_bytes = cipher.encrypt(pad(dataset_id.encode(), AES.block_size))
return base64.b64encode(ct_bytes).decode()

@classmethod
def decrypt_dataset_id(cls, encrypted_data: str, tenant_id: str) -> str | None:
"""AES decryption"""
try:
key = cls._generate_aes_key(tenant_id)
iv = key[:16]
cipher = AES.new(key, AES.MODE_CBC, iv)
pt = unpad(cipher.decrypt(base64.b64decode(encrypted_data)), AES.block_size)
return pt.decode()
except Exception:
return None

Notiek ielāde…
Atcelt
Saglabāt