You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

datasource_provider_service.py 27KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656
  1. import logging
  2. from typing import Any
  3. from flask_login import current_user
  4. from sqlalchemy.orm import Session
  5. from constants import HIDDEN_VALUE, UNKNOWN_VALUE
  6. from core.helper import encrypter
  7. from core.helper.name_generator import generate_incremental_name
  8. from core.helper.provider_cache import NoOpProviderCredentialCache
  9. from core.model_runtime.entities.provider_entities import FormType
  10. from core.model_runtime.errors.validate import CredentialsValidateFailedError
  11. from core.plugin.entities.plugin import DatasourceProviderID
  12. from core.plugin.impl.datasource import PluginDatasourceManager
  13. from core.tools.entities.tool_entities import CredentialType
  14. from core.tools.utils.encryption import ProviderConfigCache, ProviderConfigEncrypter, create_provider_encrypter
  15. from extensions.ext_database import db
  16. from extensions.ext_redis import redis_client
  17. from models.oauth import DatasourceOauthParamConfig, DatasourceOauthTenantParamConfig, DatasourceProvider
  18. logger = logging.getLogger(__name__)
  19. class DatasourceProviderService:
  20. """
  21. Datasource Provider Service
  22. """
    def __init__(self) -> None:
        """Create the service and the ``PluginDatasourceManager`` it uses for plugin lookups."""
        self.provider_manager = PluginDatasourceManager()
  25. def get_default_credentials(self, tenant_id: str, provider: str, plugin_id: str) -> dict[str, Any]:
  26. """
  27. get default credentials
  28. """
  29. with Session(db.engine) as session:
  30. datasource_provider = (
  31. session.query(DatasourceProvider)
  32. .filter_by(tenant_id=tenant_id, provider=provider, plugin_id=plugin_id)
  33. .order_by(DatasourceProvider.is_default.desc(), DatasourceProvider.created_at.asc())
  34. .first()
  35. )
  36. if not datasource_provider:
  37. return {}
  38. return datasource_provider.encrypted_credentials
  39. def update_datasource_provider_name(
  40. self, tenant_id: str, datasource_provider_id: DatasourceProviderID, name: str, credential_id: str
  41. ):
  42. """
  43. update datasource provider name
  44. """
  45. with Session(db.engine) as session:
  46. target_provider = (
  47. session.query(DatasourceProvider)
  48. .filter_by(
  49. tenant_id=tenant_id,
  50. id=credential_id,
  51. provider=datasource_provider_id.provider_name,
  52. plugin_id=datasource_provider_id.plugin_id,
  53. )
  54. .first()
  55. )
  56. if target_provider is None:
  57. raise ValueError("provider not found")
  58. if target_provider.name == name:
  59. return
  60. # check name is exist
  61. if (
  62. session.query(DatasourceProvider)
  63. .filter_by(
  64. tenant_id=tenant_id,
  65. name=name,
  66. provider=datasource_provider_id.provider_name,
  67. plugin_id=datasource_provider_id.plugin_id,
  68. )
  69. .count()
  70. > 0
  71. ):
  72. raise ValueError("name is already exists")
  73. target_provider.name = name
  74. session.commit()
  75. return
  76. def set_default_datasource_provider(
  77. self, tenant_id: str, datasource_provider_id: DatasourceProviderID, credential_id: str
  78. ):
  79. """
  80. set default datasource provider
  81. """
  82. with Session(db.engine) as session:
  83. # get provider
  84. target_provider = (
  85. session.query(DatasourceProvider)
  86. .filter_by(
  87. tenant_id=tenant_id,
  88. id=credential_id,
  89. provider=datasource_provider_id.provider_name,
  90. plugin_id=datasource_provider_id.plugin_id,
  91. )
  92. .first()
  93. )
  94. if target_provider is None:
  95. raise ValueError("provider not found")
  96. # clear default provider
  97. session.query(DatasourceProvider).filter_by(
  98. tenant_id=tenant_id,
  99. provider=target_provider.provider,
  100. plugin_id=target_provider.plugin_id,
  101. is_default=True,
  102. ).update({"is_default": False})
  103. # set new default provider
  104. target_provider.is_default = True
  105. session.commit()
  106. return {"result": "success"}
  107. def setup_oauth_custom_client_params(
  108. self,
  109. tenant_id: str,
  110. datasource_provider_id: DatasourceProviderID,
  111. client_params: dict | None,
  112. enabled: bool | None,
  113. ):
  114. """
  115. setup oauth custom client params
  116. """
  117. if client_params is None and enabled is None:
  118. return
  119. with Session(db.engine) as session:
  120. tenant_oauth_client_params = (
  121. session.query(DatasourceOauthTenantParamConfig)
  122. .filter_by(
  123. tenant_id=tenant_id,
  124. provider=datasource_provider_id.provider_name,
  125. plugin_id=datasource_provider_id.plugin_id,
  126. )
  127. .first()
  128. )
  129. if not tenant_oauth_client_params:
  130. tenant_oauth_client_params = DatasourceOauthTenantParamConfig(
  131. tenant_id=tenant_id,
  132. provider=datasource_provider_id.provider_name,
  133. plugin_id=datasource_provider_id.plugin_id,
  134. client_params={},
  135. enabled=False,
  136. )
  137. session.add(tenant_oauth_client_params)
  138. if client_params is not None:
  139. encrypter, _ = self.get_oauth_encrypter(tenant_id, datasource_provider_id)
  140. original_params = (
  141. encrypter.decrypt(tenant_oauth_client_params.client_params) if tenant_oauth_client_params else {}
  142. )
  143. new_params: dict = {
  144. key: value if value != HIDDEN_VALUE else original_params.get(key, UNKNOWN_VALUE)
  145. for key, value in client_params.items()
  146. }
  147. tenant_oauth_client_params.client_params = encrypter.encrypt(new_params)
  148. if enabled is not None:
  149. tenant_oauth_client_params.enabled = enabled
  150. session.commit()
  151. def is_system_oauth_params_exist(self, datasource_provider_id: DatasourceProviderID) -> bool:
  152. """
  153. check if system oauth params exist
  154. """
  155. with Session(db.engine).no_autoflush as session:
  156. return (
  157. session.query(DatasourceOauthParamConfig)
  158. .filter_by(provider=datasource_provider_id.provider_name, plugin_id=datasource_provider_id.plugin_id)
  159. .first()
  160. is not None
  161. )
  162. def is_tenant_oauth_params_enabled(self, tenant_id: str, datasource_provider_id: DatasourceProviderID) -> bool:
  163. """
  164. check if tenant oauth params is enabled
  165. """
  166. with Session(db.engine).no_autoflush as session:
  167. return (
  168. session.query(DatasourceOauthTenantParamConfig)
  169. .filter_by(
  170. tenant_id=tenant_id,
  171. provider=datasource_provider_id.provider_name,
  172. plugin_id=datasource_provider_id.plugin_id,
  173. enabled=True,
  174. )
  175. .count()
  176. > 0
  177. )
  178. def get_tenant_oauth_client(
  179. self, tenant_id: str, datasource_provider_id: DatasourceProviderID, mask: bool = False
  180. ) -> dict[str, Any] | None:
  181. """
  182. get tenant oauth client
  183. """
  184. with Session(db.engine).no_autoflush as session:
  185. tenant_oauth_client_params = (
  186. session.query(DatasourceOauthTenantParamConfig)
  187. .filter_by(
  188. tenant_id=tenant_id,
  189. provider=datasource_provider_id.provider_name,
  190. plugin_id=datasource_provider_id.plugin_id,
  191. )
  192. .first()
  193. )
  194. if tenant_oauth_client_params:
  195. encrypter, _ = self.get_oauth_encrypter(tenant_id, datasource_provider_id)
  196. if mask:
  197. return encrypter.mask_tool_credentials(encrypter.decrypt(tenant_oauth_client_params.client_params))
  198. else:
  199. return encrypter.decrypt(tenant_oauth_client_params.client_params)
  200. return None
  201. def get_oauth_encrypter(
  202. self, tenant_id: str, datasource_provider_id: DatasourceProviderID
  203. ) -> tuple[ProviderConfigEncrypter, ProviderConfigCache]:
  204. """
  205. get oauth encrypter
  206. """
  207. datasource_provider = self.provider_manager.fetch_datasource_provider(
  208. tenant_id=tenant_id, provider_id=str(datasource_provider_id)
  209. )
  210. if not datasource_provider.declaration.oauth_schema:
  211. raise ValueError("Datasource provider oauth schema not found")
  212. client_schema = datasource_provider.declaration.oauth_schema.client_schema
  213. return create_provider_encrypter(
  214. tenant_id=tenant_id,
  215. config=[x.to_basic_provider_config() for x in client_schema],
  216. cache=NoOpProviderCredentialCache(),
  217. )
  218. def get_oauth_client(self, tenant_id: str, datasource_provider_id: DatasourceProviderID) -> dict[str, Any] | None:
  219. """
  220. get oauth client
  221. """
  222. provider = datasource_provider_id.provider_name
  223. plugin_id = datasource_provider_id.plugin_id
  224. with Session(db.engine).no_autoflush as session:
  225. # get tenant oauth client params
  226. tenant_oauth_client_params = (
  227. session.query(DatasourceOauthTenantParamConfig)
  228. .filter_by(
  229. tenant_id=tenant_id,
  230. provider=provider,
  231. plugin_id=plugin_id,
  232. enabled=True,
  233. )
  234. .first()
  235. )
  236. if tenant_oauth_client_params:
  237. encrypter, _ = self.get_oauth_encrypter(tenant_id, datasource_provider_id)
  238. return encrypter.decrypt(tenant_oauth_client_params.client_params)
  239. # fallback to system oauth client params
  240. oauth_client_params = (
  241. session.query(DatasourceOauthParamConfig).filter_by(provider=provider, plugin_id=plugin_id).first()
  242. )
  243. if oauth_client_params:
  244. return oauth_client_params.system_credentials
  245. raise ValueError(f"Please configure oauth client params(system/tenant) for {plugin_id}/{provider}")
  246. @staticmethod
  247. def generate_next_datasource_provider_name(
  248. session: Session, tenant_id: str, provider_id: DatasourceProviderID, credential_type: CredentialType
  249. ) -> str:
  250. db_providers = (
  251. session.query(DatasourceProvider)
  252. .filter_by(
  253. tenant_id=tenant_id,
  254. provider=provider_id.provider_name,
  255. plugin_id=provider_id.plugin_id,
  256. auth_type=credential_type.value,
  257. )
  258. .all()
  259. )
  260. return generate_incremental_name(
  261. [provider.name for provider in db_providers],
  262. f"{credential_type.get_name()}",
  263. )
  264. def add_datasource_oauth_provider(
  265. self,
  266. name: str | None,
  267. tenant_id: str,
  268. provider_id: DatasourceProviderID,
  269. avatar_url: str | None,
  270. credentials: dict,
  271. ) -> None:
  272. """
  273. add datasource oauth provider
  274. """
  275. credential_type = CredentialType.OAUTH2
  276. with Session(db.engine) as session:
  277. lock = f"datasource_provider_create_lock:{tenant_id}_{provider_id}_{credential_type.value}"
  278. with redis_client.lock(lock, timeout=20):
  279. db_provider_name = name
  280. if not db_provider_name:
  281. db_provider_name = self.generate_next_datasource_provider_name(
  282. session=session,
  283. tenant_id=tenant_id,
  284. provider_id=provider_id,
  285. credential_type=credential_type,
  286. )
  287. else:
  288. if (
  289. session.query(DatasourceProvider)
  290. .filter_by(
  291. tenant_id=tenant_id,
  292. name=db_provider_name,
  293. provider=provider_id.provider_name,
  294. plugin_id=provider_id.plugin_id,
  295. auth_type=credential_type.value,
  296. )
  297. .count()
  298. > 0
  299. ):
  300. db_provider_name = generate_incremental_name(
  301. [
  302. provider.name
  303. for provider in session.query(DatasourceProvider).filter_by(
  304. tenant_id=tenant_id,
  305. provider=provider_id.provider_name,
  306. plugin_id=provider_id.plugin_id,
  307. )
  308. ],
  309. db_provider_name,
  310. )
  311. provider_credential_secret_variables = self.extract_secret_variables(
  312. tenant_id=tenant_id, provider_id=f"{provider_id}", credential_type=credential_type.value
  313. )
  314. for key, value in credentials.items():
  315. if key in provider_credential_secret_variables:
  316. # if send [__HIDDEN__] in secret input, it will be same as original value
  317. credentials[key] = encrypter.encrypt_token(tenant_id, value)
  318. datasource_provider = DatasourceProvider(
  319. tenant_id=tenant_id,
  320. name=db_provider_name,
  321. provider=provider_id.provider_name,
  322. plugin_id=provider_id.plugin_id,
  323. auth_type=credential_type.value,
  324. encrypted_credentials=credentials,
  325. avatar_url=avatar_url or "default",
  326. )
  327. session.add(datasource_provider)
  328. session.commit()
  329. def add_datasource_api_key_provider(
  330. self,
  331. name: str | None,
  332. tenant_id: str,
  333. provider_id: DatasourceProviderID,
  334. credentials: dict,
  335. ) -> None:
  336. """
  337. validate datasource provider credentials.
  338. :param tenant_id:
  339. :param provider:
  340. :param credentials:
  341. """
  342. provider_name = provider_id.provider_name
  343. plugin_id = provider_id.plugin_id
  344. with Session(db.engine) as session:
  345. lock = f"datasource_provider_create_lock:{tenant_id}_{provider_id}_api_key"
  346. with redis_client.lock(lock, timeout=20):
  347. db_provider_name = name or self.generate_next_datasource_provider_name(
  348. session=session,
  349. tenant_id=tenant_id,
  350. provider_id=provider_id,
  351. credential_type=CredentialType.API_KEY,
  352. )
  353. # check name is exist
  354. if session.query(DatasourceProvider).filter_by(tenant_id=tenant_id, name=db_provider_name).count() > 0:
  355. raise ValueError("Authorization name is already exists")
  356. credential_valid = self.provider_manager.validate_provider_credentials(
  357. tenant_id=tenant_id,
  358. user_id=current_user.id,
  359. provider=provider_name,
  360. plugin_id=plugin_id,
  361. credentials=credentials,
  362. )
  363. if credential_valid:
  364. provider_credential_secret_variables = self.extract_secret_variables(
  365. tenant_id=tenant_id, provider_id=f"{provider_id}", credential_type=CredentialType.API_KEY.value
  366. )
  367. for key, value in credentials.items():
  368. if key in provider_credential_secret_variables:
  369. # if send [__HIDDEN__] in secret input, it will be same as original value
  370. credentials[key] = encrypter.encrypt_token(tenant_id, value)
  371. datasource_provider = DatasourceProvider(
  372. tenant_id=tenant_id,
  373. name=db_provider_name,
  374. provider=provider_name,
  375. plugin_id=plugin_id,
  376. auth_type="api_key",
  377. encrypted_credentials=credentials,
  378. )
  379. db.session.add(datasource_provider)
  380. db.session.commit()
  381. else:
  382. raise CredentialsValidateFailedError()
  383. def extract_secret_variables(self, tenant_id: str, provider_id: str, credential_type: str) -> list[str]:
  384. """
  385. Extract secret input form variables.
  386. :param credential_form_schemas:
  387. :return:
  388. """
  389. datasource_provider = self.provider_manager.fetch_datasource_provider(
  390. tenant_id=tenant_id, provider_id=provider_id
  391. )
  392. credential_form_schemas = []
  393. if credential_type == "api_key":
  394. credential_form_schemas = datasource_provider.declaration.credentials_schema
  395. elif credential_type == "oauth2":
  396. if not datasource_provider.declaration.oauth_schema:
  397. raise ValueError("Datasource provider oauth schema not found")
  398. credential_form_schemas = datasource_provider.declaration.oauth_schema.credentials_schema
  399. else:
  400. raise ValueError(f"Invalid credential type: {credential_type}")
  401. secret_input_form_variables = []
  402. for credential_form_schema in credential_form_schemas:
  403. if credential_form_schema.type.value == FormType.SECRET_INPUT.value:
  404. secret_input_form_variables.append(credential_form_schema.name)
  405. return secret_input_form_variables
  406. def get_datasource_credentials(self, tenant_id: str, provider: str, plugin_id: str) -> list[dict]:
  407. """
  408. get datasource credentials.
  409. :param tenant_id: workspace id
  410. :param provider_id: provider id
  411. :return:
  412. """
  413. # Get all provider configurations of the current workspace
  414. datasource_providers: list[DatasourceProvider] = (
  415. db.session.query(DatasourceProvider)
  416. .filter(
  417. DatasourceProvider.tenant_id == tenant_id,
  418. DatasourceProvider.provider == provider,
  419. DatasourceProvider.plugin_id == plugin_id,
  420. )
  421. .all()
  422. )
  423. if not datasource_providers:
  424. return []
  425. copy_credentials_list = []
  426. default_provider = (
  427. db.session.query(DatasourceProvider.id)
  428. .filter_by(tenant_id=tenant_id, provider=provider, plugin_id=plugin_id)
  429. .order_by(DatasourceProvider.is_default.desc(), DatasourceProvider.created_at.asc())
  430. .first()
  431. )
  432. default_provider_id = default_provider.id if default_provider else None
  433. for datasource_provider in datasource_providers:
  434. encrypted_credentials = datasource_provider.encrypted_credentials
  435. # Get provider credential secret variables
  436. credential_secret_variables = self.extract_secret_variables(
  437. tenant_id=tenant_id,
  438. provider_id=f"{plugin_id}/{provider}",
  439. credential_type=datasource_provider.auth_type,
  440. )
  441. # Obfuscate provider credentials
  442. copy_credentials = encrypted_credentials.copy()
  443. for key, value in copy_credentials.items():
  444. if key in credential_secret_variables:
  445. copy_credentials[key] = encrypter.obfuscated_token(value)
  446. copy_credentials_list.append(
  447. {
  448. "credential": copy_credentials,
  449. "type": datasource_provider.auth_type,
  450. "name": datasource_provider.name,
  451. "avatar_url": datasource_provider.avatar_url,
  452. "id": datasource_provider.id,
  453. "is_default": default_provider_id and datasource_provider.id == default_provider_id,
  454. }
  455. )
  456. return copy_credentials_list
  457. def get_all_datasource_credentials(self, tenant_id: str) -> list[dict]:
  458. """
  459. get datasource credentials.
  460. :return:
  461. """
  462. # get all plugin providers
  463. manager = PluginDatasourceManager()
  464. datasources = manager.fetch_installed_datasource_providers(tenant_id)
  465. datasource_credentials = []
  466. for datasource in datasources:
  467. datasource_provider_id = DatasourceProviderID(f"{datasource.plugin_id}/{datasource.provider}")
  468. credentials = self.get_datasource_credentials(
  469. tenant_id=tenant_id, provider=datasource.provider, plugin_id=datasource.plugin_id
  470. )
  471. datasource_credentials.append(
  472. {
  473. "provider": datasource.provider,
  474. "plugin_id": datasource.plugin_id,
  475. "plugin_unique_identifier": datasource.plugin_unique_identifier,
  476. "icon": datasource.declaration.identity.icon,
  477. "name": datasource.declaration.identity.name,
  478. "label": datasource.declaration.identity.label.model_dump(),
  479. "description": datasource.declaration.identity.description.model_dump(),
  480. "author": datasource.declaration.identity.author,
  481. "credentials_list": credentials,
  482. "credential_schema": [
  483. credential.model_dump() for credential in datasource.declaration.credentials_schema
  484. ],
  485. "oauth_schema": {
  486. "client_schema": [
  487. client_schema.model_dump()
  488. for client_schema in datasource.declaration.oauth_schema.client_schema
  489. ],
  490. "credentials_schema": [
  491. credential_schema.model_dump()
  492. for credential_schema in datasource.declaration.oauth_schema.credentials_schema
  493. ],
  494. "oauth_custom_client_params": self.get_tenant_oauth_client(
  495. tenant_id, datasource_provider_id, mask=True
  496. ),
  497. "is_oauth_custom_client_enabled": self.is_tenant_oauth_params_enabled(
  498. tenant_id, datasource_provider_id
  499. ),
  500. "is_system_oauth_params_exists": self.is_system_oauth_params_exist(datasource_provider_id),
  501. }
  502. if datasource.declaration.oauth_schema
  503. else None,
  504. }
  505. )
  506. return datasource_credentials
  507. def get_real_datasource_credentials(self, tenant_id: str, provider: str, plugin_id: str) -> list[dict]:
  508. """
  509. get datasource credentials.
  510. :param tenant_id: workspace id
  511. :param provider_id: provider id
  512. :return:
  513. """
  514. # Get all provider configurations of the current workspace
  515. datasource_providers: list[DatasourceProvider] = (
  516. db.session.query(DatasourceProvider)
  517. .filter(
  518. DatasourceProvider.tenant_id == tenant_id,
  519. DatasourceProvider.provider == provider,
  520. DatasourceProvider.plugin_id == plugin_id,
  521. )
  522. .all()
  523. )
  524. if not datasource_providers:
  525. return []
  526. copy_credentials_list = []
  527. for datasource_provider in datasource_providers:
  528. encrypted_credentials = datasource_provider.encrypted_credentials
  529. # Get provider credential secret variables
  530. credential_secret_variables = self.extract_secret_variables(
  531. tenant_id=tenant_id,
  532. provider_id=f"{plugin_id}/{provider}",
  533. credential_type=datasource_provider.auth_type,
  534. )
  535. # Obfuscate provider credentials
  536. copy_credentials = encrypted_credentials.copy()
  537. for key, value in copy_credentials.items():
  538. if key in credential_secret_variables:
  539. copy_credentials[key] = encrypter.decrypt_token(tenant_id, value)
  540. copy_credentials_list.append(
  541. {
  542. "credentials": copy_credentials,
  543. "type": datasource_provider.auth_type,
  544. }
  545. )
  546. return copy_credentials_list
  547. def update_datasource_credentials(
  548. self, tenant_id: str, auth_id: str, provider: str, plugin_id: str, credentials: dict
  549. ) -> None:
  550. """
  551. update datasource credentials.
  552. """
  553. credential_valid = self.provider_manager.validate_provider_credentials(
  554. tenant_id=tenant_id,
  555. user_id=current_user.id,
  556. provider=provider,
  557. plugin_id=plugin_id,
  558. credentials=credentials,
  559. )
  560. if credential_valid:
  561. # Get all provider configurations of the current workspace
  562. datasource_provider = (
  563. db.session.query(DatasourceProvider)
  564. .filter_by(tenant_id=tenant_id, id=auth_id, provider=provider, plugin_id=plugin_id)
  565. .first()
  566. )
  567. if not datasource_provider:
  568. raise ValueError("Datasource provider not found")
  569. else:
  570. provider_credential_secret_variables = self.extract_secret_variables(
  571. tenant_id=tenant_id,
  572. provider_id=f"{plugin_id}/{provider}",
  573. credential_type=datasource_provider.auth_type,
  574. )
  575. original_credentials = datasource_provider.encrypted_credentials
  576. for key, value in credentials.items():
  577. if key in provider_credential_secret_variables:
  578. # if send [__HIDDEN__] in secret input, it will be same as original value
  579. if value == HIDDEN_VALUE and key in original_credentials:
  580. original_value = encrypter.encrypt_token(tenant_id, original_credentials[key])
  581. credentials[key] = encrypter.encrypt_token(tenant_id, original_value)
  582. else:
  583. credentials[key] = encrypter.encrypt_token(tenant_id, value)
  584. datasource_provider.encrypted_credentials = credentials
  585. db.session.commit()
  586. else:
  587. raise CredentialsValidateFailedError()
  588. def remove_datasource_credentials(self, tenant_id: str, auth_id: str, provider: str, plugin_id: str) -> None:
  589. """
  590. remove datasource credentials.
  591. :param tenant_id: workspace id
  592. :param provider: provider name
  593. :param plugin_id: plugin id
  594. :return:
  595. """
  596. datasource_provider = (
  597. db.session.query(DatasourceProvider)
  598. .filter_by(tenant_id=tenant_id, id=auth_id, provider=provider, plugin_id=plugin_id)
  599. .first()
  600. )
  601. if datasource_provider:
  602. db.session.delete(datasource_provider)
  603. db.session.commit()