You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

clear_free_plan_tenant_expired_logs.py 13KB


  1. import datetime
  2. import json
  3. import logging
  4. import time
  5. from concurrent.futures import ThreadPoolExecutor
  6. import click
  7. from flask import Flask, current_app
  8. from sqlalchemy.orm import Session
  9. from configs import dify_config
  10. from core.model_runtime.utils.encoders import jsonable_encoder
  11. from extensions.ext_database import db
  12. from extensions.ext_storage import storage
  13. from models.account import Tenant
  14. from models.model import App, Conversation, Message
  15. from models.workflow import WorkflowNodeExecutionModel, WorkflowRun
  16. from services.billing_service import BillingService
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
  18. class ClearFreePlanTenantExpiredLogs:
  19. @classmethod
  20. def process_tenant(cls, flask_app: Flask, tenant_id: str, days: int, batch: int):
  21. with flask_app.app_context():
  22. apps = db.session.query(App).filter(App.tenant_id == tenant_id).all()
  23. app_ids = [app.id for app in apps]
  24. while True:
  25. with Session(db.engine).no_autoflush as session:
  26. messages = (
  27. session.query(Message)
  28. .filter(
  29. Message.app_id.in_(app_ids),
  30. Message.created_at < datetime.datetime.now() - datetime.timedelta(days=days),
  31. )
  32. .limit(batch)
  33. .all()
  34. )
  35. if len(messages) == 0:
  36. break
  37. storage.save(
  38. f"free_plan_tenant_expired_logs/"
  39. f"{tenant_id}/messages/{datetime.datetime.now().strftime('%Y-%m-%d')}"
  40. f"-{time.time()}.json",
  41. json.dumps(
  42. jsonable_encoder(
  43. [message.to_dict() for message in messages],
  44. ),
  45. ).encode("utf-8"),
  46. )
  47. message_ids = [message.id for message in messages]
  48. # delete messages
  49. session.query(Message).filter(
  50. Message.id.in_(message_ids),
  51. ).delete(synchronize_session=False)
  52. session.commit()
  53. click.echo(
  54. click.style(
  55. f"[{datetime.datetime.now()}] Processed {len(message_ids)} messages for tenant {tenant_id} "
  56. )
  57. )
  58. while True:
  59. with Session(db.engine).no_autoflush as session:
  60. conversations = (
  61. session.query(Conversation)
  62. .filter(
  63. Conversation.app_id.in_(app_ids),
  64. Conversation.updated_at < datetime.datetime.now() - datetime.timedelta(days=days),
  65. )
  66. .limit(batch)
  67. .all()
  68. )
  69. if len(conversations) == 0:
  70. break
  71. storage.save(
  72. f"free_plan_tenant_expired_logs/"
  73. f"{tenant_id}/conversations/{datetime.datetime.now().strftime('%Y-%m-%d')}"
  74. f"-{time.time()}.json",
  75. json.dumps(
  76. jsonable_encoder(
  77. [conversation.to_dict() for conversation in conversations],
  78. ),
  79. ).encode("utf-8"),
  80. )
  81. conversation_ids = [conversation.id for conversation in conversations]
  82. session.query(Conversation).filter(
  83. Conversation.id.in_(conversation_ids),
  84. ).delete(synchronize_session=False)
  85. session.commit()
  86. click.echo(
  87. click.style(
  88. f"[{datetime.datetime.now()}] Processed {len(conversation_ids)}"
  89. f" conversations for tenant {tenant_id}"
  90. )
  91. )
  92. while True:
  93. with Session(db.engine).no_autoflush as session:
  94. workflow_node_executions = (
  95. session.query(WorkflowNodeExecutionModel)
  96. .filter(
  97. WorkflowNodeExecutionModel.tenant_id == tenant_id,
  98. WorkflowNodeExecutionModel.created_at
  99. < datetime.datetime.now() - datetime.timedelta(days=days),
  100. )
  101. .limit(batch)
  102. .all()
  103. )
  104. if len(workflow_node_executions) == 0:
  105. break
  106. # save workflow node executions
  107. storage.save(
  108. f"free_plan_tenant_expired_logs/"
  109. f"{tenant_id}/workflow_node_executions/{datetime.datetime.now().strftime('%Y-%m-%d')}"
  110. f"-{time.time()}.json",
  111. json.dumps(
  112. jsonable_encoder(workflow_node_executions),
  113. ).encode("utf-8"),
  114. )
  115. workflow_node_execution_ids = [
  116. workflow_node_execution.id for workflow_node_execution in workflow_node_executions
  117. ]
  118. # delete workflow node executions
  119. session.query(WorkflowNodeExecutionModel).filter(
  120. WorkflowNodeExecutionModel.id.in_(workflow_node_execution_ids),
  121. ).delete(synchronize_session=False)
  122. session.commit()
  123. click.echo(
  124. click.style(
  125. f"[{datetime.datetime.now()}] Processed {len(workflow_node_execution_ids)}"
  126. f" workflow node executions for tenant {tenant_id}"
  127. )
  128. )
  129. while True:
  130. with Session(db.engine).no_autoflush as session:
  131. workflow_runs = (
  132. session.query(WorkflowRun)
  133. .filter(
  134. WorkflowRun.tenant_id == tenant_id,
  135. WorkflowRun.created_at < datetime.datetime.now() - datetime.timedelta(days=days),
  136. )
  137. .limit(batch)
  138. .all()
  139. )
  140. if len(workflow_runs) == 0:
  141. break
  142. # save workflow runs
  143. storage.save(
  144. f"free_plan_tenant_expired_logs/"
  145. f"{tenant_id}/workflow_runs/{datetime.datetime.now().strftime('%Y-%m-%d')}"
  146. f"-{time.time()}.json",
  147. json.dumps(
  148. jsonable_encoder(
  149. [workflow_run.to_dict() for workflow_run in workflow_runs],
  150. ),
  151. ).encode("utf-8"),
  152. )
  153. workflow_run_ids = [workflow_run.id for workflow_run in workflow_runs]
  154. # delete workflow runs
  155. session.query(WorkflowRun).filter(
  156. WorkflowRun.id.in_(workflow_run_ids),
  157. ).delete(synchronize_session=False)
  158. session.commit()
  159. @classmethod
  160. def process(cls, days: int, batch: int, tenant_ids: list[str]):
  161. """
  162. Clear free plan tenant expired logs.
  163. """
  164. click.echo(click.style("Clearing free plan tenant expired logs", fg="white"))
  165. ended_at = datetime.datetime.now()
  166. started_at = datetime.datetime(2023, 4, 3, 8, 59, 24)
  167. current_time = started_at
  168. with Session(db.engine) as session:
  169. total_tenant_count = session.query(Tenant.id).count()
  170. click.echo(click.style(f"Total tenant count: {total_tenant_count}", fg="white"))
  171. handled_tenant_count = 0
  172. thread_pool = ThreadPoolExecutor(max_workers=10)
  173. def process_tenant(flask_app: Flask, tenant_id: str) -> None:
  174. try:
  175. if (
  176. not dify_config.BILLING_ENABLED
  177. or BillingService.get_info(tenant_id)["subscription"]["plan"] == "sandbox"
  178. ):
  179. # only process sandbox tenant
  180. cls.process_tenant(flask_app, tenant_id, days, batch)
  181. except Exception:
  182. logger.exception(f"Failed to process tenant {tenant_id}")
  183. finally:
  184. nonlocal handled_tenant_count
  185. handled_tenant_count += 1
  186. if handled_tenant_count % 100 == 0:
  187. click.echo(
  188. click.style(
  189. f"[{datetime.datetime.now()}] "
  190. f"Processed {handled_tenant_count} tenants "
  191. f"({(handled_tenant_count / total_tenant_count) * 100:.1f}%), "
  192. f"{handled_tenant_count}/{total_tenant_count}",
  193. fg="green",
  194. )
  195. )
  196. futures = []
  197. if tenant_ids:
  198. for tenant_id in tenant_ids:
  199. futures.append(
  200. thread_pool.submit(
  201. process_tenant,
  202. current_app._get_current_object(), # type: ignore[attr-defined]
  203. tenant_id,
  204. )
  205. )
  206. else:
  207. while current_time < ended_at:
  208. click.echo(
  209. click.style(f"Current time: {current_time}, Started at: {datetime.datetime.now()}", fg="white")
  210. )
  211. # Initial interval of 1 day, will be dynamically adjusted based on tenant count
  212. interval = datetime.timedelta(days=1)
  213. # Process tenants in this batch
  214. with Session(db.engine) as session:
  215. # Calculate tenant count in next batch with current interval
  216. # Try different intervals until we find one with a reasonable tenant count
  217. test_intervals = [
  218. datetime.timedelta(days=1),
  219. datetime.timedelta(hours=12),
  220. datetime.timedelta(hours=6),
  221. datetime.timedelta(hours=3),
  222. datetime.timedelta(hours=1),
  223. ]
  224. for test_interval in test_intervals:
  225. tenant_count = (
  226. session.query(Tenant.id)
  227. .filter(Tenant.created_at.between(current_time, current_time + test_interval))
  228. .count()
  229. )
  230. if tenant_count <= 100:
  231. interval = test_interval
  232. break
  233. else:
  234. # If all intervals have too many tenants, use minimum interval
  235. interval = datetime.timedelta(hours=1)
  236. # Adjust interval to target ~100 tenants per batch
  237. if tenant_count > 0:
  238. # Scale interval based on ratio to target count
  239. interval = min(
  240. datetime.timedelta(days=1), # Max 1 day
  241. max(
  242. datetime.timedelta(hours=1), # Min 1 hour
  243. interval * (100 / tenant_count), # Scale to target 100
  244. ),
  245. )
  246. batch_end = min(current_time + interval, ended_at)
  247. rs = (
  248. session.query(Tenant.id)
  249. .filter(Tenant.created_at.between(current_time, batch_end))
  250. .order_by(Tenant.created_at)
  251. )
  252. tenants = []
  253. for row in rs:
  254. tenant_id = str(row.id)
  255. try:
  256. tenants.append(tenant_id)
  257. except Exception:
  258. logger.exception(f"Failed to process tenant {tenant_id}")
  259. continue
  260. futures.append(
  261. thread_pool.submit(
  262. process_tenant,
  263. current_app._get_current_object(), # type: ignore[attr-defined]
  264. tenant_id,
  265. )
  266. )
  267. current_time = batch_end
  268. # wait for all threads to finish
  269. for future in futures:
  270. future.result()