You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

file_lifecycle.py 18KB


  """ClickZetta Volume file lifecycle management.

  This module provides file lifecycle management features including version control,
  automatic cleanup, backup and restore.
  Supports complete lifecycle management for knowledge base files.
  """
  6. import json
  7. import logging
  8. from dataclasses import asdict, dataclass
  9. from datetime import datetime
  10. from enum import Enum
  11. from typing import Any, Optional
  # Module-level logger shared by every class in this file.
  logger = logging.getLogger(__name__)
  class FileStatus(Enum):
      """Lifecycle states a managed file can be in.

      Each member's value is the plain string used when the status is
      written out as text.
      """

      ACTIVE = "active"  # Active status
      ARCHIVED = "archived"  # Archived
      DELETED = "deleted"  # Deleted (soft delete)
      BACKUP = "backup"  # Backup file
  @dataclass
  class FileMetadata:
      """Metadata record describing one managed file.

      Instances convert to and from plain dictionaries (``to_dict`` /
      ``from_dict``) in which timestamps are ISO-8601 strings and the status
      is the enum's string value.
      """

      filename: str
      size: int | None
      created_at: datetime
      modified_at: datetime
      version: int | None
      status: FileStatus
      checksum: Optional[str] = None
      tags: Optional[dict[str, str]] = None
      parent_version: Optional[int] = None

      def to_dict(self) -> dict:
          """Convert to dictionary format.

          Returns:
              A dictionary of all fields, with ``created_at`` / ``modified_at``
              rendered as ISO-8601 strings and ``status`` as its string value.
          """
          data = asdict(self)
          data["created_at"] = self.created_at.isoformat()
          data["modified_at"] = self.modified_at.isoformat()
          data["status"] = self.status.value
          return data

      @classmethod
      def from_dict(cls, data: dict) -> "FileMetadata":
          """Create an instance from a dictionary such as ``to_dict`` produces.

          Args:
              data: Mapping of field names to values. It is not modified.
                  Keys that are not fields of this dataclass are ignored, and
                  timestamps may be ISO-8601 strings or ``datetime`` objects.

          Returns:
              The reconstructed metadata record.

          Raises:
              KeyError: If ``created_at``, ``modified_at`` or ``status`` is missing.
              ValueError: If a timestamp string or the status value is invalid.
              TypeError: If another required field is missing.
          """
          # Drop unknown keys so extra entries do not make the constructor fail.
          known_fields = cls.__dataclass_fields__
          payload = {key: value for key, value in data.items() if key in known_fields}

          for key in ("created_at", "modified_at"):
              value = payload[key]
              # Only parse strings; an already-built datetime is used as is.
              if isinstance(value, str):
                  payload[key] = datetime.fromisoformat(value)

          payload["status"] = FileStatus(payload["status"])
          return cls(**payload)
  class FileLifecycleManager:
      """Version, archive, soft-delete and clean up files kept on a storage backend.

      Metadata for every managed file lives in one JSON document
      (``.dify_file_metadata.json``) on the same backend. Earlier versions are
      copied under ``.versions/`` as ``<filename>.v<version>`` and soft-deleted
      files are copied under ``.deleted/`` before the original is removed.

      The storage object is only used through ``save``, ``load_once``,
      ``exists``, ``delete`` and ``scan``.
      """

      # Lifecycle operation name -> operation name handed to the permission manager.
      _OPERATION_PERMISSIONS = {
          "save": "save",
          "load": "load_once",
          "delete": "delete",
          "archive": "delete",  # Archive requires delete permission
          "restore": "save",  # Restore requires write permission
          "cleanup": "delete",  # Cleanup requires delete permission
          "read": "load_once",
          "write": "save",
      }

      def __init__(self, storage, dataset_id: Optional[str] = None):
          """Initialize lifecycle manager.

          Args:
              storage: ClickZetta Volume storage instance
              dataset_id: Dataset ID (for Table Volume)
          """
          self._storage = storage
          self._dataset_id = dataset_id
          self._metadata_file = ".dify_file_metadata.json"
          self._version_prefix = ".versions/"
          self._backup_prefix = ".backups/"
          self._deleted_prefix = ".deleted/"
          # Get permission manager (if exists); without one every operation is allowed.
          self._permission_manager: Optional[Any] = getattr(storage, "_permission_manager", None)

      def save_with_lifecycle(self, filename: str, data: bytes, tags: Optional[dict[str, str]] = None) -> FileMetadata:
          """Save a file, keeping a copy of the previous version and updating metadata.

          Args:
              filename: File name
              data: File content
              tags: File tags

          Returns:
              Metadata of the newly saved version.

          Raises:
              VolumePermissionError: If the "save" permission check fails.
              Exception: Any error raised while saving is logged and re-raised.
          """
          if not self._check_permission(filename, "save"):
              from .volume_permissions import VolumePermissionError

              raise VolumePermissionError(
                  f"Permission denied for lifecycle save operation on file: {filename}",
                  operation="save",
                  volume_type=self._volume_type(),
                  dataset_id=self._dataset_id,
              )

          try:
              # 1. Check if old version exists
              metadata_dict = self._load_metadata()
              current_metadata = metadata_dict.get(filename)

              # 2. If old version exists, copy it aside before it is overwritten
              if current_metadata:
                  self._create_version_backup(filename, current_metadata)

              # 3. Calculate file information
              now = datetime.now()
              checksum = self._calculate_checksum(data)
              parent_version = current_metadata.get("version") if current_metadata else None
              # A missing or None stored version counts as 0 so the first save is version 1.
              new_version = (parent_version or 0) + 1

              # 4. Save new file
              self._storage.save(filename, data)

              # 5. Create metadata; the original creation time survives re-saves
              created_at = now
              if current_metadata:
                  previous_created_at = current_metadata.get("created_at")
                  if isinstance(previous_created_at, str):
                      created_at = datetime.fromisoformat(previous_created_at)
                  elif previous_created_at is not None:
                      created_at = previous_created_at

              file_metadata = FileMetadata(
                  filename=filename,
                  size=len(data),
                  created_at=created_at,
                  modified_at=now,
                  version=new_version,
                  status=FileStatus.ACTIVE,
                  checksum=checksum,
                  tags=tags or {},
                  parent_version=parent_version,
              )

              # 6. Update metadata
              metadata_dict[filename] = file_metadata.to_dict()
              self._save_metadata(metadata_dict)

              logger.info("File %s saved with lifecycle management, version %s", filename, new_version)
              return file_metadata
          except Exception:
              logger.exception("Failed to save file with lifecycle")
              raise

      def get_file_metadata(self, filename: str) -> Optional[FileMetadata]:
          """Get file metadata.

          Args:
              filename: File name

          Returns:
              File metadata, or None if there is no entry or it cannot be parsed.
          """
          try:
              entry = self._load_metadata().get(filename)
              if entry is None:
                  return None
              return FileMetadata.from_dict(entry)
          except Exception:
              logger.exception("Failed to get file metadata for %s", filename)
              return None

      def list_file_versions(self, filename: str) -> list[FileMetadata]:
          """List the known versions of a file, newest first.

          Only the current version is returned for now: historical version
          files are scanned and their names validated, but no metadata is
          built for them.

          Args:
              filename: File name

          Returns:
              File version list, sorted by version number in descending order.
              Empty when the file is unknown or an error occurs.
          """
          try:
              versions: list[FileMetadata] = []

              current_metadata = self.get_file_metadata(filename)
              if current_metadata:
                  versions.append(current_metadata)

              # Get historical versions
              try:
                  version_file_prefix = f"{self._version_prefix}{filename}.v"
                  for file_path in self._storage.scan(self._dataset_id or "", files=True):
                      if not file_path.startswith(version_file_prefix):
                          continue
                      version_str = file_path.split(".v")[-1].split(".")[0]
                      try:
                          int(version_str)
                      except ValueError:
                          continue
                      # TODO: read metadata for the version file and append it
                      # to `versions`; nothing is recorded for it yet.
              except Exception as e:
                  # If version files cannot be scanned, only return the current version.
                  logger.debug("Could not scan version files for %s: %s", filename, e)

              return sorted(versions, key=lambda item: item.version or 0, reverse=True)
          except Exception:
              logger.exception("Failed to list file versions for %s", filename)
              return []

      def restore_version(self, filename: str, version: int) -> bool:
          """Restore file to specified version.

          The restored content is written through ``save_with_lifecycle``, so
          it becomes a new version tagged with ``restored_from`` and the
          content it replaces is copied aside by that call.

          Args:
              filename: File name
              version: Version number to restore

          Returns:
              Whether restore succeeded
          """
          try:
              version_filename = f"{self._version_prefix}{filename}.v{version}"

              if not self._storage.exists(version_filename):
                  logger.warning("Version %s of %s not found", version, filename)
                  return False

              version_data = self._storage.load_once(version_filename)

              # No explicit backup here: save_with_lifecycle already backs up
              # the current version before overwriting it.
              self.save_with_lifecycle(filename, version_data, {"restored_from": str(version)})
              return True
          except Exception:
              logger.exception("Failed to restore %s to version %s", filename, version)
              return False

      def archive_file(self, filename: str) -> bool:
          """Mark a file as archived in the metadata (the file itself is not moved).

          Args:
              filename: File name

          Returns:
              Whether archive succeeded
          """
          if not self._check_permission(filename, "archive"):
              logger.warning("Permission denied for archive operation on file: %s", filename)
              return False

          try:
              metadata_dict = self._load_metadata()
              entry = metadata_dict.get(filename)
              if entry is None:
                  logger.warning("File %s not found in metadata", filename)
                  return False

              entry["status"] = FileStatus.ARCHIVED.value
              entry["modified_at"] = datetime.now().isoformat()
              self._save_metadata(metadata_dict)

              logger.info("File %s archived successfully", filename)
              return True
          except Exception:
              logger.exception("Failed to archive file %s", filename)
              return False

      def soft_delete_file(self, filename: str) -> bool:
          """Soft delete file (move to deleted directory).

          The content is copied to ``.deleted/<filename>.<timestamp>``, the
          original is deleted, and the metadata entry (if any) is marked deleted.

          Args:
              filename: File name

          Returns:
              Whether delete succeeded
          """
          if not self._check_permission(filename, "delete"):
              logger.warning("Permission denied for soft delete operation on file: %s", filename)
              return False

          try:
              if not self._storage.exists(filename):
                  logger.warning("File %s not found", filename)
                  return False

              file_data = self._storage.load_once(filename)

              # Copy first, delete second, so the content is never lost midway.
              now = datetime.now()
              deleted_filename = f"{self._deleted_prefix}{filename}.{now.strftime('%Y%m%d_%H%M%S')}"
              self._storage.save(deleted_filename, file_data)
              self._storage.delete(filename)

              metadata_dict = self._load_metadata()
              entry = metadata_dict.get(filename)
              if entry is not None:
                  entry["status"] = FileStatus.DELETED.value
                  entry["modified_at"] = now.isoformat()
                  self._save_metadata(metadata_dict)

              logger.info("File %s soft deleted successfully", filename)
              return True
          except Exception:
              logger.exception("Failed to soft delete file %s", filename)
              return False

      def cleanup_old_versions(self, max_versions: int = 5, max_age_days: int = 30) -> int:
          """Delete version files beyond the newest ``max_versions`` of each file.

          Args:
              max_versions: Maximum number of versions to keep per file. A
                  negative value is rejected and nothing is deleted.
              max_age_days: Maximum retention days for version files.
                  Currently not applied: no file age is read from storage here.

          Returns:
              Number of files cleaned (the count so far if an error interrupts).
          """
          cleaned_count = 0
          if max_versions < 0:
              logger.warning("Invalid max_versions %s, skipping version cleanup", max_versions)
              return cleaned_count

          try:
              all_files = self._storage.scan(self._dataset_id or "", files=True)

              # Group version files by the file they belong to.
              file_versions: dict[str, list[tuple[int, str]]] = {}
              for version_file in all_files:
                  if not version_file.startswith(self._version_prefix):
                      continue
                  relative_name = version_file[len(self._version_prefix) :]
                  # Split on the LAST ".v" so names that themselves contain
                  # ".v" (e.g. "my.video.mp4") are still parsed correctly.
                  base_filename, separator, version_part = relative_name.rpartition(".v")
                  if not separator:
                      continue
                  try:
                      version_num = int(version_part.split(".")[0])
                  except ValueError:
                      continue
                  file_versions.setdefault(base_filename, []).append((version_num, version_file))

              # Keep the newest max_versions versions of each file, delete the rest.
              for versions in file_versions.values():
                  versions.sort(key=lambda item: item[0], reverse=True)
                  for _, version_file in versions[max_versions:]:
                      self._storage.delete(version_file)
                      cleaned_count += 1
                      logger.debug("Cleaned old version: %s", version_file)

              logger.info("Cleaned %d old version files", cleaned_count)
          except Exception as e:
              logger.warning("Could not scan for version files: %s", e)

          return cleaned_count

      def get_storage_statistics(self) -> dict[str, Any]:
          """Get storage statistics computed from the metadata file.

          Returns:
              Dictionary with ``total_files``, ``active_files``,
              ``archived_files``, ``deleted_files``, ``total_size``,
              ``versions_count`` (sum of each file's current version number),
              ``oldest_file`` (earliest ``created_at``) and ``newest_file``
              (latest ``modified_at``). Empty dictionary on failure.
          """
          try:
              metadata_dict = self._load_metadata()

              stats: dict[str, Any] = {
                  "total_files": len(metadata_dict),
                  "active_files": 0,
                  "archived_files": 0,
                  "deleted_files": 0,
                  "total_size": 0,
                  "versions_count": 0,
                  "oldest_file": None,
                  "newest_file": None,
              }
              status_counters = {
                  FileStatus.ACTIVE: "active_files",
                  FileStatus.ARCHIVED: "archived_files",
                  FileStatus.DELETED: "deleted_files",
              }

              oldest_date = None
              newest_date = None

              for filename, metadata in metadata_dict.items():
                  file_meta = FileMetadata.from_dict(metadata)

                  counter_key = status_counters.get(file_meta.status)
                  if counter_key is not None:
                      stats[counter_key] += 1

                  stats["total_size"] += file_meta.size or 0
                  stats["versions_count"] += file_meta.version or 0

                  if oldest_date is None or file_meta.created_at < oldest_date:
                      oldest_date = file_meta.created_at
                      stats["oldest_file"] = filename

                  if newest_date is None or file_meta.modified_at > newest_date:
                      newest_date = file_meta.modified_at
                      stats["newest_file"] = filename

              return stats
          except Exception:
              logger.exception("Failed to get storage statistics")
              return {}

      def _volume_type(self) -> Any:
          """Return the storage's ``volume_type`` setting, or ``"unknown"``.

          Works whether the storage's ``_config`` is a dictionary, an object
          with attributes, or absent.
          """
          config = getattr(self._storage, "_config", None)
          if isinstance(config, dict):
              return config.get("volume_type", "unknown")
          return getattr(config, "volume_type", "unknown")

      def _create_version_backup(self, filename: str, metadata: dict):
          """Copy the current file content to its version file (best effort).

          Failures are logged as warnings and never raised.
          """
          try:
              current_data = self._storage.load_once(filename)
              version_filename = f"{self._version_prefix}{filename}.v{metadata['version']}"
              self._storage.save(version_filename, current_data)
              logger.debug("Created version backup: %s", version_filename)
          except Exception as e:
              logger.warning("Failed to create version backup for %s: %s", filename, e)

      def _load_metadata(self) -> dict[str, Any]:
          """Load the metadata file.

          Returns:
              The parsed metadata, or an empty dictionary when the file does
              not exist, is empty, or cannot be read or parsed.
          """
          try:
              if not self._storage.exists(self._metadata_file):
                  return {}
              metadata_content = self._storage.load_once(self._metadata_file)
              result = json.loads(metadata_content.decode("utf-8"))
              return dict(result) if result else {}
          except Exception as e:
              logger.warning("Failed to load metadata: %s", e)
              return {}

      def _save_metadata(self, metadata_dict: dict):
          """Serialize the metadata as UTF-8 JSON and save it; errors are logged and re-raised."""
          try:
              metadata_content = json.dumps(metadata_dict, indent=2, ensure_ascii=False)
              self._storage.save(self._metadata_file, metadata_content.encode("utf-8"))
              logger.debug("Metadata saved successfully")
          except Exception:
              logger.exception("Failed to save metadata")
              raise

      def _calculate_checksum(self, data: bytes) -> str:
          """Return the hexadecimal MD5 digest of ``data``."""
          import hashlib

          # The digest only identifies content; flagging it as non-security
          # use keeps MD5 available on builds that restrict it.
          return hashlib.md5(data, usedforsecurity=False).hexdigest()

      def _check_permission(self, filename: str, operation: str) -> bool:
          """Check file operation permission.

          Args:
              filename: File name (used for logging only)
              operation: Operation type

          Returns:
              True if permission granted, False otherwise
          """
          # If no permission manager, allow by default
          if not self._permission_manager:
              return True

          try:
              # Unmapped operation names are passed through unchanged.
              mapped_operation = self._OPERATION_PERMISSIONS.get(operation, operation)
              result = self._permission_manager.validate_operation(mapped_operation, self._dataset_id)
              return bool(result)
          except Exception:
              logger.exception("Permission check failed for %s operation %s", filename, operation)
              # Safe default: deny access when permission check fails
              return False