您最多选择25个主题 主题必须以字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符

installed_app.py 6.8KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182
  1. import logging
  2. from typing import Any
  3. from flask import request
  4. from flask_login import current_user
  5. from flask_restx import Resource, inputs, marshal_with, reqparse
  6. from sqlalchemy import and_
  7. from werkzeug.exceptions import BadRequest, Forbidden, NotFound
  8. from controllers.console import api
  9. from controllers.console.explore.wraps import InstalledAppResource
  10. from controllers.console.wraps import account_initialization_required, cloud_edition_billing_resource_check
  11. from extensions.ext_database import db
  12. from fields.installed_app_fields import installed_app_list_fields
  13. from libs.datetime_utils import naive_utc_now
  14. from libs.login import login_required
  15. from models import App, InstalledApp, RecommendedApp
  16. from services.account_service import TenantService
  17. from services.app_service import AppService
  18. from services.enterprise.enterprise_service import EnterpriseService
  19. from services.feature_service import FeatureService
  20. logger = logging.getLogger(__name__)
  21. class InstalledAppsListApi(Resource):
  22. @login_required
  23. @account_initialization_required
  24. @marshal_with(installed_app_list_fields)
  25. def get(self):
  26. app_id = request.args.get("app_id", default=None, type=str)
  27. current_tenant_id = current_user.current_tenant_id
  28. if app_id:
  29. installed_apps = (
  30. db.session.query(InstalledApp)
  31. .where(and_(InstalledApp.tenant_id == current_tenant_id, InstalledApp.app_id == app_id))
  32. .all()
  33. )
  34. else:
  35. installed_apps = db.session.query(InstalledApp).where(InstalledApp.tenant_id == current_tenant_id).all()
  36. current_user.role = TenantService.get_user_role(current_user, current_user.current_tenant)
  37. installed_app_list: list[dict[str, Any]] = [
  38. {
  39. "id": installed_app.id,
  40. "app": installed_app.app,
  41. "app_owner_tenant_id": installed_app.app_owner_tenant_id,
  42. "is_pinned": installed_app.is_pinned,
  43. "last_used_at": installed_app.last_used_at,
  44. "editable": current_user.role in {"owner", "admin"},
  45. "uninstallable": current_tenant_id == installed_app.app_owner_tenant_id,
  46. }
  47. for installed_app in installed_apps
  48. if installed_app.app is not None
  49. ]
  50. # filter out apps that user doesn't have access to
  51. if FeatureService.get_system_features().webapp_auth.enabled:
  52. user_id = current_user.id
  53. app_ids = [installed_app["app"].id for installed_app in installed_app_list]
  54. webapp_settings = EnterpriseService.WebAppAuth.batch_get_app_access_mode_by_id(app_ids)
  55. # Pre-filter out apps without setting or with sso_verified
  56. filtered_installed_apps = []
  57. app_id_to_app_code = {}
  58. for installed_app in installed_app_list:
  59. app_id = installed_app["app"].id
  60. webapp_setting = webapp_settings.get(app_id)
  61. if not webapp_setting or webapp_setting.access_mode == "sso_verified":
  62. continue
  63. app_code = AppService.get_app_code_by_id(str(app_id))
  64. app_id_to_app_code[app_id] = app_code
  65. filtered_installed_apps.append(installed_app)
  66. app_codes = list(app_id_to_app_code.values())
  67. # Batch permission check
  68. permissions = EnterpriseService.WebAppAuth.batch_is_user_allowed_to_access_webapps(
  69. user_id=user_id,
  70. app_codes=app_codes,
  71. )
  72. # Keep only allowed apps
  73. res = []
  74. for installed_app in filtered_installed_apps:
  75. app_id = installed_app["app"].id
  76. app_code = app_id_to_app_code[app_id]
  77. if permissions.get(app_code):
  78. res.append(installed_app)
  79. installed_app_list = res
  80. logger.debug("installed_app_list: %s, user_id: %s", installed_app_list, user_id)
  81. installed_app_list.sort(
  82. key=lambda app: (
  83. -app["is_pinned"],
  84. app["last_used_at"] is None,
  85. -app["last_used_at"].timestamp() if app["last_used_at"] is not None else 0,
  86. )
  87. )
  88. return {"installed_apps": installed_app_list}
  89. @login_required
  90. @account_initialization_required
  91. @cloud_edition_billing_resource_check("apps")
  92. def post(self):
  93. parser = reqparse.RequestParser()
  94. parser.add_argument("app_id", type=str, required=True, help="Invalid app_id")
  95. args = parser.parse_args()
  96. recommended_app = db.session.query(RecommendedApp).where(RecommendedApp.app_id == args["app_id"]).first()
  97. if recommended_app is None:
  98. raise NotFound("App not found")
  99. current_tenant_id = current_user.current_tenant_id
  100. app = db.session.query(App).where(App.id == args["app_id"]).first()
  101. if app is None:
  102. raise NotFound("App not found")
  103. if not app.is_public:
  104. raise Forbidden("You can't install a non-public app")
  105. installed_app = (
  106. db.session.query(InstalledApp)
  107. .where(and_(InstalledApp.app_id == args["app_id"], InstalledApp.tenant_id == current_tenant_id))
  108. .first()
  109. )
  110. if installed_app is None:
  111. # todo: position
  112. recommended_app.install_count += 1
  113. new_installed_app = InstalledApp(
  114. app_id=args["app_id"],
  115. tenant_id=current_tenant_id,
  116. app_owner_tenant_id=app.tenant_id,
  117. is_pinned=False,
  118. last_used_at=naive_utc_now(),
  119. )
  120. db.session.add(new_installed_app)
  121. db.session.commit()
  122. return {"message": "App installed successfully"}
  123. class InstalledAppApi(InstalledAppResource):
  124. """
  125. update and delete an installed app
  126. use InstalledAppResource to apply default decorators and get installed_app
  127. """
  128. def delete(self, installed_app):
  129. if installed_app.app_owner_tenant_id == current_user.current_tenant_id:
  130. raise BadRequest("You can't uninstall an app owned by the current tenant")
  131. db.session.delete(installed_app)
  132. db.session.commit()
  133. return {"result": "success", "message": "App uninstalled successfully"}, 204
  134. def patch(self, installed_app):
  135. parser = reqparse.RequestParser()
  136. parser.add_argument("is_pinned", type=inputs.boolean)
  137. args = parser.parse_args()
  138. commit_args = False
  139. if "is_pinned" in args:
  140. installed_app.is_pinned = args["is_pinned"]
  141. commit_args = True
  142. if commit_args:
  143. db.session.commit()
  144. return {"result": "success", "message": "App info updated successfully"}
  145. api.add_resource(InstalledAppsListApi, "/installed-apps")
  146. api.add_resource(InstalledAppApi, "/installed-apps/<uuid:installed_app_id>")