| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251 | 
							- import uuid
 - from collections.abc import Generator, Mapping
 - from typing import Any, Optional, Union
 - 
 - from openai._exceptions import RateLimitError
 - 
 - from configs import dify_config
 - from core.app.apps.advanced_chat.app_generator import AdvancedChatAppGenerator
 - from core.app.apps.agent_chat.app_generator import AgentChatAppGenerator
 - from core.app.apps.chat.app_generator import ChatAppGenerator
 - from core.app.apps.completion.app_generator import CompletionAppGenerator
 - from core.app.apps.workflow.app_generator import WorkflowAppGenerator
 - from core.app.entities.app_invoke_entities import InvokeFrom
 - from core.app.features.rate_limiting import RateLimit
 - from libs.helper import RateLimiter
 - from models.model import Account, App, AppMode, EndUser
 - from models.workflow import Workflow
 - from services.billing_service import BillingService
 - from services.errors.app import WorkflowIdFormatError, WorkflowNotFoundError
 - from services.errors.llm import InvokeRateLimitError
 - from services.workflow_service import WorkflowService
 - 
 - 
 - class AppGenerateService:
 -     system_rate_limiter = RateLimiter("app_daily_rate_limiter", dify_config.APP_DAILY_RATE_LIMIT, 86400)
 - 
 -     @classmethod
 -     def generate(
 -         cls,
 -         app_model: App,
 -         user: Union[Account, EndUser],
 -         args: Mapping[str, Any],
 -         invoke_from: InvokeFrom,
 -         streaming: bool = True,
 -     ):
 -         """
 -         App Content Generate
 -         :param app_model: app model
 -         :param user: user
 -         :param args: args
 -         :param invoke_from: invoke from
 -         :param streaming: streaming
 -         :return:
 -         """
 -         # system level rate limiter
 -         if dify_config.BILLING_ENABLED:
 -             # check if it's free plan
 -             limit_info = BillingService.get_info(app_model.tenant_id)
 -             if limit_info["subscription"]["plan"] == "sandbox":
 -                 if cls.system_rate_limiter.is_rate_limited(app_model.tenant_id):
 -                     raise InvokeRateLimitError(
 -                         "Rate limit exceeded, please upgrade your plan "
 -                         f"or your RPD was {dify_config.APP_DAILY_RATE_LIMIT} requests/day"
 -                     )
 -                 cls.system_rate_limiter.increment_rate_limit(app_model.tenant_id)
 - 
 -         # app level rate limiter
 -         max_active_request = cls._get_max_active_requests(app_model)
 -         rate_limit = RateLimit(app_model.id, max_active_request)
 -         request_id = RateLimit.gen_request_key()
 -         try:
 -             request_id = rate_limit.enter(request_id)
 -             if app_model.mode == AppMode.COMPLETION.value:
 -                 return rate_limit.generate(
 -                     CompletionAppGenerator.convert_to_event_stream(
 -                         CompletionAppGenerator().generate(
 -                             app_model=app_model, user=user, args=args, invoke_from=invoke_from, streaming=streaming
 -                         ),
 -                     ),
 -                     request_id=request_id,
 -                 )
 -             elif app_model.mode == AppMode.AGENT_CHAT.value or app_model.is_agent:
 -                 return rate_limit.generate(
 -                     AgentChatAppGenerator.convert_to_event_stream(
 -                         AgentChatAppGenerator().generate(
 -                             app_model=app_model, user=user, args=args, invoke_from=invoke_from, streaming=streaming
 -                         ),
 -                     ),
 -                     request_id,
 -                 )
 -             elif app_model.mode == AppMode.CHAT.value:
 -                 return rate_limit.generate(
 -                     ChatAppGenerator.convert_to_event_stream(
 -                         ChatAppGenerator().generate(
 -                             app_model=app_model, user=user, args=args, invoke_from=invoke_from, streaming=streaming
 -                         ),
 -                     ),
 -                     request_id=request_id,
 -                 )
 -             elif app_model.mode == AppMode.ADVANCED_CHAT.value:
 -                 workflow_id = args.get("workflow_id")
 -                 workflow = cls._get_workflow(app_model, invoke_from, workflow_id)
 -                 return rate_limit.generate(
 -                     AdvancedChatAppGenerator.convert_to_event_stream(
 -                         AdvancedChatAppGenerator().generate(
 -                             app_model=app_model,
 -                             workflow=workflow,
 -                             user=user,
 -                             args=args,
 -                             invoke_from=invoke_from,
 -                             streaming=streaming,
 -                         ),
 -                     ),
 -                     request_id=request_id,
 -                 )
 -             elif app_model.mode == AppMode.WORKFLOW.value:
 -                 workflow_id = args.get("workflow_id")
 -                 workflow = cls._get_workflow(app_model, invoke_from, workflow_id)
 -                 return rate_limit.generate(
 -                     WorkflowAppGenerator.convert_to_event_stream(
 -                         WorkflowAppGenerator().generate(
 -                             app_model=app_model,
 -                             workflow=workflow,
 -                             user=user,
 -                             args=args,
 -                             invoke_from=invoke_from,
 -                             streaming=streaming,
 -                             call_depth=0,
 -                             workflow_thread_pool_id=None,
 -                         ),
 -                     ),
 -                     request_id,
 -                 )
 -             else:
 -                 raise ValueError(f"Invalid app mode {app_model.mode}")
 -         except RateLimitError as e:
 -             raise InvokeRateLimitError(str(e))
 -         except Exception:
 -             rate_limit.exit(request_id)
 -             raise
 -         finally:
 -             if not streaming:
 -                 rate_limit.exit(request_id)
 - 
 -     @staticmethod
 -     def _get_max_active_requests(app: App) -> int:
 -         """
 -         Get the maximum number of active requests allowed for an app.
 - 
 -         Returns the smaller value between app's custom limit and global config limit.
 -         A value of 0 means infinite (no limit).
 - 
 -         Args:
 -             app: The App model instance
 - 
 -         Returns:
 -             The maximum number of active requests allowed
 -         """
 -         app_limit = app.max_active_requests or 0
 -         config_limit = dify_config.APP_MAX_ACTIVE_REQUESTS
 - 
 -         # Filter out infinite (0) values and return the minimum, or 0 if both are infinite
 -         limits = [limit for limit in [app_limit, config_limit] if limit > 0]
 -         return min(limits) if limits else 0
 - 
 -     @classmethod
 -     def generate_single_iteration(cls, app_model: App, user: Account, node_id: str, args: Any, streaming: bool = True):
 -         if app_model.mode == AppMode.ADVANCED_CHAT.value:
 -             workflow = cls._get_workflow(app_model, InvokeFrom.DEBUGGER)
 -             return AdvancedChatAppGenerator.convert_to_event_stream(
 -                 AdvancedChatAppGenerator().single_iteration_generate(
 -                     app_model=app_model, workflow=workflow, node_id=node_id, user=user, args=args, streaming=streaming
 -                 )
 -             )
 -         elif app_model.mode == AppMode.WORKFLOW.value:
 -             workflow = cls._get_workflow(app_model, InvokeFrom.DEBUGGER)
 -             return AdvancedChatAppGenerator.convert_to_event_stream(
 -                 WorkflowAppGenerator().single_iteration_generate(
 -                     app_model=app_model, workflow=workflow, node_id=node_id, user=user, args=args, streaming=streaming
 -                 )
 -             )
 -         else:
 -             raise ValueError(f"Invalid app mode {app_model.mode}")
 - 
 -     @classmethod
 -     def generate_single_loop(cls, app_model: App, user: Account, node_id: str, args: Any, streaming: bool = True):
 -         if app_model.mode == AppMode.ADVANCED_CHAT.value:
 -             workflow = cls._get_workflow(app_model, InvokeFrom.DEBUGGER)
 -             return AdvancedChatAppGenerator.convert_to_event_stream(
 -                 AdvancedChatAppGenerator().single_loop_generate(
 -                     app_model=app_model, workflow=workflow, node_id=node_id, user=user, args=args, streaming=streaming
 -                 )
 -             )
 -         elif app_model.mode == AppMode.WORKFLOW.value:
 -             workflow = cls._get_workflow(app_model, InvokeFrom.DEBUGGER)
 -             return AdvancedChatAppGenerator.convert_to_event_stream(
 -                 WorkflowAppGenerator().single_loop_generate(
 -                     app_model=app_model, workflow=workflow, node_id=node_id, user=user, args=args, streaming=streaming
 -                 )
 -             )
 -         else:
 -             raise ValueError(f"Invalid app mode {app_model.mode}")
 - 
 -     @classmethod
 -     def generate_more_like_this(
 -         cls,
 -         app_model: App,
 -         user: Union[Account, EndUser],
 -         message_id: str,
 -         invoke_from: InvokeFrom,
 -         streaming: bool = True,
 -     ) -> Union[Mapping, Generator]:
 -         """
 -         Generate more like this
 -         :param app_model: app model
 -         :param user: user
 -         :param message_id: message id
 -         :param invoke_from: invoke from
 -         :param streaming: streaming
 -         :return:
 -         """
 -         return CompletionAppGenerator().generate_more_like_this(
 -             app_model=app_model, message_id=message_id, user=user, invoke_from=invoke_from, stream=streaming
 -         )
 - 
 -     @classmethod
 -     def _get_workflow(cls, app_model: App, invoke_from: InvokeFrom, workflow_id: Optional[str] = None) -> Workflow:
 -         """
 -         Get workflow
 -         :param app_model: app model
 -         :param invoke_from: invoke from
 -         :param workflow_id: optional workflow id to specify a specific version
 -         :return:
 -         """
 -         workflow_service = WorkflowService()
 - 
 -         # If workflow_id is specified, get the specific workflow version
 -         if workflow_id:
 -             try:
 -                 workflow_uuid = uuid.UUID(workflow_id)
 -             except ValueError:
 -                 raise WorkflowIdFormatError(f"Invalid workflow_id format: '{workflow_id}'. ")
 -             workflow = workflow_service.get_published_workflow_by_id(app_model=app_model, workflow_id=workflow_id)
 -             if not workflow:
 -                 raise WorkflowNotFoundError(f"Workflow not found with id: {workflow_id}")
 -             return workflow
 - 
 -         if invoke_from == InvokeFrom.DEBUGGER:
 -             # fetch draft workflow by app_model
 -             workflow = workflow_service.get_draft_workflow(app_model=app_model)
 - 
 -             if not workflow:
 -                 raise ValueError("Workflow not initialized")
 -         else:
 -             # fetch published workflow by app_model
 -             workflow = workflow_service.get_published_workflow(app_model=app_model)
 - 
 -             if not workflow:
 -                 raise ValueError("Workflow not published")
 - 
 -         return workflow
 
 
  |