|
|
|
@@ -2,13 +2,11 @@ from abc import abstractmethod |
|
|
|
from os import listdir, path |
|
|
|
from typing import Any |
|
|
|
|
|
|
|
from core.tools.entities.api_entities import UserToolProviderCredentials |
|
|
|
from core.tools.entities.tool_entities import ToolParameter, ToolProviderCredentials, ToolProviderType |
|
|
|
from core.tools.entities.values import ToolLabelEnum, default_tool_label_dict |
|
|
|
from core.tools.errors import ( |
|
|
|
ToolNotFoundError, |
|
|
|
ToolParameterValidationError, |
|
|
|
ToolProviderCredentialValidationError, |
|
|
|
ToolProviderNotFoundError, |
|
|
|
) |
|
|
|
from core.tools.provider.tool_provider import ToolProviderController |
|
|
|
@@ -84,15 +82,6 @@ class BuiltinToolProviderController(ToolProviderController): |
|
|
|
|
|
|
|
return self.credentials_schema.copy() |
|
|
|
|
|
|
|
def user_get_credentials_schema(self) -> UserToolProviderCredentials: |
|
|
|
""" |
|
|
|
returns the credentials schema of the provider, this method is used for user |
|
|
|
|
|
|
|
:return: the credentials schema |
|
|
|
""" |
|
|
|
credentials = self.credentials_schema.copy() |
|
|
|
return UserToolProviderCredentials(credentials=credentials) |
|
|
|
|
|
|
|
def get_tools(self) -> list[Tool]: |
|
|
|
""" |
|
|
|
returns a list of tools that the provider can provide |
|
|
|
@@ -222,80 +211,6 @@ class BuiltinToolProviderController(ToolProviderController): |
|
|
|
default_value = bool(default_value) |
|
|
|
|
|
|
|
tool_parameters[parameter] = default_value |
|
|
|
|
|
|
|
def validate_credentials_format(self, credentials: dict[str, Any]) -> None: |
|
|
|
""" |
|
|
|
validate the format of the credentials of the provider and set the default value if needed |
|
|
|
|
|
|
|
:param credentials: the credentials of the tool |
|
|
|
""" |
|
|
|
credentials_schema = self.credentials_schema |
|
|
|
if credentials_schema is None: |
|
|
|
return |
|
|
|
|
|
|
|
credentials_need_to_validate: dict[str, ToolProviderCredentials] = {} |
|
|
|
for credential_name in credentials_schema: |
|
|
|
credentials_need_to_validate[credential_name] = credentials_schema[credential_name] |
|
|
|
|
|
|
|
for credential_name in credentials: |
|
|
|
if credential_name not in credentials_need_to_validate: |
|
|
|
raise ToolProviderCredentialValidationError(f'credential {credential_name} not found in provider {self.identity.name}') |
|
|
|
|
|
|
|
# check type |
|
|
|
credential_schema = credentials_need_to_validate[credential_name] |
|
|
|
if credential_schema == ToolProviderCredentials.CredentialsType.SECRET_INPUT or \ |
|
|
|
credential_schema == ToolProviderCredentials.CredentialsType.TEXT_INPUT: |
|
|
|
if not isinstance(credentials[credential_name], str): |
|
|
|
raise ToolProviderCredentialValidationError(f'credential {credential_schema.label.en_US} should be string') |
|
|
|
|
|
|
|
elif credential_schema.type == ToolProviderCredentials.CredentialsType.SELECT: |
|
|
|
if not isinstance(credentials[credential_name], str): |
|
|
|
raise ToolProviderCredentialValidationError(f'credential {credential_schema.label.en_US} should be string') |
|
|
|
|
|
|
|
options = credential_schema.options |
|
|
|
if not isinstance(options, list): |
|
|
|
raise ToolProviderCredentialValidationError(f'credential {credential_schema.label.en_US} options should be list') |
|
|
|
|
|
|
|
if credentials[credential_name] not in [x.value for x in options]: |
|
|
|
raise ToolProviderCredentialValidationError(f'credential {credential_schema.label.en_US} should be one of {options}') |
|
|
|
elif credential_schema.type == ToolProviderCredentials.CredentialsType.BOOLEAN: |
|
|
|
if isinstance(credentials[credential_name], bool): |
|
|
|
pass |
|
|
|
elif isinstance(credentials[credential_name], str): |
|
|
|
if credentials[credential_name].lower() == 'true': |
|
|
|
credentials[credential_name] = True |
|
|
|
elif credentials[credential_name].lower() == 'false': |
|
|
|
credentials[credential_name] = False |
|
|
|
else: |
|
|
|
raise ToolProviderCredentialValidationError(f'credential {credential_schema.label.en_US} should be boolean') |
|
|
|
elif isinstance(credentials[credential_name], int): |
|
|
|
if credentials[credential_name] == 1: |
|
|
|
credentials[credential_name] = True |
|
|
|
elif credentials[credential_name] == 0: |
|
|
|
credentials[credential_name] = False |
|
|
|
else: |
|
|
|
raise ToolProviderCredentialValidationError(f'credential {credential_schema.label.en_US} should be boolean') |
|
|
|
else: |
|
|
|
raise ToolProviderCredentialValidationError(f'credential {credential_schema.label.en_US} should be boolean') |
|
|
|
|
|
|
|
if credentials[credential_name] or credentials[credential_name] == False: |
|
|
|
credentials_need_to_validate.pop(credential_name) |
|
|
|
|
|
|
|
for credential_name in credentials_need_to_validate: |
|
|
|
credential_schema = credentials_need_to_validate[credential_name] |
|
|
|
if credential_schema.required: |
|
|
|
raise ToolProviderCredentialValidationError(f'credential {credential_schema.label.en_US} is required') |
|
|
|
|
|
|
|
# the credential is not set currently, set the default value if needed |
|
|
|
if credential_schema.default is not None: |
|
|
|
default_value = credential_schema.default |
|
|
|
# parse default value into the correct type |
|
|
|
if credential_schema.type == ToolProviderCredentials.CredentialsType.SECRET_INPUT or \ |
|
|
|
credential_schema.type == ToolProviderCredentials.CredentialsType.TEXT_INPUT or \ |
|
|
|
credential_schema.type == ToolProviderCredentials.CredentialsType.SELECT: |
|
|
|
default_value = str(default_value) |
|
|
|
|
|
|
|
credentials[credential_name] = default_value |
|
|
|
|
|
|
|
def validate_credentials(self, credentials: dict[str, Any]) -> None: |
|
|
|
""" |