- from core.plugin.entities.endpoint import EndpointEntityWithInstance
- from core.plugin.manager.base import BasePluginManager
-
-
- class PluginEndpointManager(BasePluginManager):
- def create_endpoint(
- self, tenant_id: str, user_id: str, plugin_unique_identifier: str, name: str, settings: dict
- ) -> bool:
- """
- Create an endpoint for the given plugin.
-
- Errors will be raised if any error occurs.
- """
- return self._request_with_plugin_daemon_response(
- "POST",
- f"plugin/{tenant_id}/endpoint/setup",
- bool,
- headers={
- "Content-Type": "application/json",
- },
- data={
- "user_id": user_id,
- "plugin_unique_identifier": plugin_unique_identifier,
- "settings": settings,
- "name": name,
- },
- )
-
- def list_endpoints(self, tenant_id: str, user_id: str, page: int, page_size: int):
- """
- List all endpoints for the given tenant and user.
- """
- return self._request_with_plugin_daemon_response(
- "GET",
- f"plugin/{tenant_id}/endpoint/list",
- list[EndpointEntityWithInstance],
- params={"page": page, "page_size": page_size},
- )
-
- def list_endpoints_for_single_plugin(self, tenant_id: str, user_id: str, plugin_id: str, page: int, page_size: int):
- """
- List all endpoints for the given tenant, user and plugin.
- """
- return self._request_with_plugin_daemon_response(
- "GET",
- f"plugin/{tenant_id}/endpoint/list/plugin",
- list[EndpointEntityWithInstance],
- params={"plugin_id": plugin_id, "page": page, "page_size": page_size},
- )
-
- def update_endpoint(self, tenant_id: str, user_id: str, endpoint_id: str, name: str, settings: dict):
- """
- Update the settings of the given endpoint.
- """
- return self._request_with_plugin_daemon_response(
- "POST",
- f"plugin/{tenant_id}/endpoint/update",
- bool,
- data={
- "user_id": user_id,
- "endpoint_id": endpoint_id,
- "name": name,
- "settings": settings,
- },
- headers={
- "Content-Type": "application/json",
- },
- )
-
- def delete_endpoint(self, tenant_id: str, user_id: str, endpoint_id: str):
- """
- Delete the given endpoint.
- """
- return self._request_with_plugin_daemon_response(
- "POST",
- f"plugin/{tenant_id}/endpoint/remove",
- bool,
- data={
- "endpoint_id": endpoint_id,
- },
- headers={
- "Content-Type": "application/json",
- },
- )
-
- def enable_endpoint(self, tenant_id: str, user_id: str, endpoint_id: str):
- """
- Enable the given endpoint.
- """
- return self._request_with_plugin_daemon_response(
- "POST",
- f"plugin/{tenant_id}/endpoint/enable",
- bool,
- data={
- "endpoint_id": endpoint_id,
- },
- headers={
- "Content-Type": "application/json",
- },
- )
-
- def disable_endpoint(self, tenant_id: str, user_id: str, endpoint_id: str):
- """
- Disable the given endpoint.
- """
- return self._request_with_plugin_daemon_response(
- "POST",
- f"plugin/{tenant_id}/endpoint/disable",
- bool,
- data={
- "endpoint_id": endpoint_id,
- },
- headers={
- "Content-Type": "application/json",
- },
- )
|