| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990 | import datetime
import logging
import time
import click
from werkzeug.exceptions import NotFound
import app
from configs import dify_config
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from models.model import (
    App,
    Message,
    MessageAgentThought,
    MessageAnnotation,
    MessageChain,
    MessageFeedback,
    MessageFile,
)
from models.web import SavedMessage
from services.feature_service import FeatureService
_logger = logging.getLogger(__name__)
@app.celery.task(queue="dataset")
def clean_messages():
    click.echo(click.style("Start clean messages.", fg="green"))
    start_at = time.perf_counter()
    plan_sandbox_clean_message_day = datetime.datetime.now() - datetime.timedelta(
        days=dify_config.PLAN_SANDBOX_CLEAN_MESSAGE_DAY_SETTING
    )
    while True:
        try:
            # Main query with join and filter
            messages = (
                db.session.query(Message)
                .filter(Message.created_at < plan_sandbox_clean_message_day)
                .order_by(Message.created_at.desc())
                .limit(100)
                .all()
            )
        except NotFound:
            break
        if not messages:
            break
        for message in messages:
            plan_sandbox_clean_message_day = message.created_at
            app = db.session.query(App).filter_by(id=message.app_id).first()
            if not app:
                _logger.warning(
                    "Expected App record to exist, but none was found, app_id=%s, message_id=%s",
                    message.app_id,
                    message.id,
                )
                continue
            features_cache_key = f"features:{app.tenant_id}"
            plan_cache = redis_client.get(features_cache_key)
            if plan_cache is None:
                features = FeatureService.get_features(app.tenant_id)
                redis_client.setex(features_cache_key, 600, features.billing.subscription.plan)
                plan = features.billing.subscription.plan
            else:
                plan = plan_cache.decode()
            if plan == "sandbox":
                # clean related message
                db.session.query(MessageFeedback).filter(MessageFeedback.message_id == message.id).delete(
                    synchronize_session=False
                )
                db.session.query(MessageAnnotation).filter(MessageAnnotation.message_id == message.id).delete(
                    synchronize_session=False
                )
                db.session.query(MessageChain).filter(MessageChain.message_id == message.id).delete(
                    synchronize_session=False
                )
                db.session.query(MessageAgentThought).filter(MessageAgentThought.message_id == message.id).delete(
                    synchronize_session=False
                )
                db.session.query(MessageFile).filter(MessageFile.message_id == message.id).delete(
                    synchronize_session=False
                )
                db.session.query(SavedMessage).filter(SavedMessage.message_id == message.id).delete(
                    synchronize_session=False
                )
                db.session.query(Message).filter(Message.id == message.id).delete()
                db.session.commit()
    end_at = time.perf_counter()
    click.echo(click.style("Cleaned messages from db success latency: {}".format(end_at - start_at), fg="green"))
 |