You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

workflow_app_service.py 4.6KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127
  1. import uuid
  2. from datetime import datetime
  3. from sqlalchemy import and_, func, or_, select
  4. from sqlalchemy.orm import Session
  5. from core.workflow.entities.workflow_execution import WorkflowExecutionStatus
  6. from models import Account, App, EndUser, WorkflowAppLog, WorkflowRun
  7. from models.enums import CreatorUserRole
  8. class WorkflowAppService:
  9. def get_paginate_workflow_app_logs(
  10. self,
  11. *,
  12. session: Session,
  13. app_model: App,
  14. keyword: str | None = None,
  15. status: WorkflowExecutionStatus | None = None,
  16. created_at_before: datetime | None = None,
  17. created_at_after: datetime | None = None,
  18. page: int = 1,
  19. limit: int = 20,
  20. created_by_end_user_session_id: str | None = None,
  21. created_by_account: str | None = None,
  22. ) -> dict:
  23. """
  24. Get paginate workflow app logs using SQLAlchemy 2.0 style
  25. :param session: SQLAlchemy session
  26. :param app_model: app model
  27. :param keyword: search keyword
  28. :param status: filter by status
  29. :param created_at_before: filter logs created before this timestamp
  30. :param created_at_after: filter logs created after this timestamp
  31. :param page: page number
  32. :param limit: items per page
  33. :param created_by_end_user_session_id: filter by end user session id
  34. :param created_by_account: filter by account email
  35. :return: Pagination object
  36. """
  37. # Build base statement using SQLAlchemy 2.0 style
  38. stmt = select(WorkflowAppLog).where(
  39. WorkflowAppLog.tenant_id == app_model.tenant_id, WorkflowAppLog.app_id == app_model.id
  40. )
  41. if keyword or status:
  42. stmt = stmt.join(WorkflowRun, WorkflowRun.id == WorkflowAppLog.workflow_run_id)
  43. if keyword:
  44. keyword_like_val = f"%{keyword[:30].encode('unicode_escape').decode('utf-8')}%".replace(r"\u", r"\\u")
  45. keyword_conditions = [
  46. WorkflowRun.inputs.ilike(keyword_like_val),
  47. WorkflowRun.outputs.ilike(keyword_like_val),
  48. # filter keyword by end user session id if created by end user role
  49. and_(WorkflowRun.created_by_role == "end_user", EndUser.session_id.ilike(keyword_like_val)),
  50. ]
  51. # filter keyword by workflow run id
  52. keyword_uuid = self._safe_parse_uuid(keyword)
  53. if keyword_uuid:
  54. keyword_conditions.append(WorkflowRun.id == keyword_uuid)
  55. stmt = stmt.outerjoin(
  56. EndUser,
  57. and_(WorkflowRun.created_by == EndUser.id, WorkflowRun.created_by_role == CreatorUserRole.END_USER),
  58. ).where(or_(*keyword_conditions))
  59. if status:
  60. stmt = stmt.where(WorkflowRun.status == status)
  61. # Add time-based filtering
  62. if created_at_before:
  63. stmt = stmt.where(WorkflowAppLog.created_at <= created_at_before)
  64. if created_at_after:
  65. stmt = stmt.where(WorkflowAppLog.created_at >= created_at_after)
  66. # Filter by end user session id or account email
  67. if created_by_end_user_session_id:
  68. stmt = stmt.join(
  69. EndUser,
  70. and_(
  71. WorkflowAppLog.created_by == EndUser.id,
  72. WorkflowAppLog.created_by_role == CreatorUserRole.END_USER,
  73. EndUser.session_id == created_by_end_user_session_id,
  74. ),
  75. )
  76. if created_by_account:
  77. stmt = stmt.join(
  78. Account,
  79. and_(
  80. WorkflowAppLog.created_by == Account.id,
  81. WorkflowAppLog.created_by_role == CreatorUserRole.ACCOUNT,
  82. Account.email == created_by_account,
  83. ),
  84. )
  85. stmt = stmt.order_by(WorkflowAppLog.created_at.desc())
  86. # Get total count using the same filters
  87. count_stmt = select(func.count()).select_from(stmt.subquery())
  88. total = session.scalar(count_stmt) or 0
  89. # Apply pagination limits
  90. offset_stmt = stmt.offset((page - 1) * limit).limit(limit)
  91. # Execute query and get items
  92. items = list(session.scalars(offset_stmt).all())
  93. return {
  94. "page": page,
  95. "limit": limit,
  96. "total": total,
  97. "has_more": total > page * limit,
  98. "data": items,
  99. }
  100. @staticmethod
  101. def _safe_parse_uuid(value: str):
  102. # fast check
  103. if len(value) < 32:
  104. return None
  105. try:
  106. return uuid.UUID(value)
  107. except ValueError:
  108. return None