| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283 | 
							- from typing import Optional, cast
 - 
 - from core.app.entities.app_invoke_entities import ModelConfigWithCredentialsEntity
 - from core.memory.token_buffer_memory import TokenBufferMemory
 - from core.model_runtime.entities.message_entities import PromptMessage
 - from core.model_runtime.entities.model_entities import ModelPropertyKey
 - from core.model_runtime.model_providers.__base.large_language_model import LargeLanguageModel
 - from core.prompt.entities.advanced_prompt_entities import MemoryConfig
 - 
 - 
 - class PromptTransform:
 -     def _append_chat_histories(self, memory: TokenBufferMemory,
 -                                memory_config: MemoryConfig,
 -                                prompt_messages: list[PromptMessage],
 -                                model_config: ModelConfigWithCredentialsEntity) -> list[PromptMessage]:
 -         rest_tokens = self._calculate_rest_token(prompt_messages, model_config)
 -         histories = self._get_history_messages_list_from_memory(memory, memory_config, rest_tokens)
 -         prompt_messages.extend(histories)
 - 
 -         return prompt_messages
 - 
 -     def _calculate_rest_token(self, prompt_messages: list[PromptMessage],
 -                               model_config: ModelConfigWithCredentialsEntity) -> int:
 -         rest_tokens = 2000
 - 
 -         model_context_tokens = model_config.model_schema.model_properties.get(ModelPropertyKey.CONTEXT_SIZE)
 -         if model_context_tokens:
 -             model_type_instance = model_config.provider_model_bundle.model_type_instance
 -             model_type_instance = cast(LargeLanguageModel, model_type_instance)
 - 
 -             curr_message_tokens = model_type_instance.get_num_tokens(
 -                 model_config.model,
 -                 model_config.credentials,
 -                 prompt_messages
 -             )
 - 
 -             max_tokens = 0
 -             for parameter_rule in model_config.model_schema.parameter_rules:
 -                 if (parameter_rule.name == 'max_tokens'
 -                         or (parameter_rule.use_template and parameter_rule.use_template == 'max_tokens')):
 -                     max_tokens = (model_config.parameters.get(parameter_rule.name)
 -                                   or model_config.parameters.get(parameter_rule.use_template)) or 0
 - 
 -             rest_tokens = model_context_tokens - max_tokens - curr_message_tokens
 -             rest_tokens = max(rest_tokens, 0)
 - 
 -         return rest_tokens
 - 
 -     def _get_history_messages_from_memory(self, memory: TokenBufferMemory,
 -                                           memory_config: MemoryConfig,
 -                                           max_token_limit: int,
 -                                           human_prefix: Optional[str] = None,
 -                                           ai_prefix: Optional[str] = None) -> str:
 -         """Get memory messages."""
 -         kwargs = {
 -             "max_token_limit": max_token_limit
 -         }
 - 
 -         if human_prefix:
 -             kwargs['human_prefix'] = human_prefix
 - 
 -         if ai_prefix:
 -             kwargs['ai_prefix'] = ai_prefix
 - 
 -         if memory_config.window.enabled and memory_config.window.size is not None and memory_config.window.size > 0:
 -             kwargs['message_limit'] = memory_config.window.size
 - 
 -         return memory.get_history_prompt_text(
 -             **kwargs
 -         )
 - 
 -     def _get_history_messages_list_from_memory(self, memory: TokenBufferMemory,
 -                                                memory_config: MemoryConfig,
 -                                                max_token_limit: int) -> list[PromptMessage]:
 -         """Get memory messages."""
 -         return memory.get_history_prompt_messages(
 -             max_token_limit=max_token_limit,
 -             message_limit=memory_config.window.size
 -             if (memory_config.window.enabled
 -                and memory_config.window.size is not None
 -                and memory_config.window.size > 0)
 -             else 10
 -         )
 
 
  |