|
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166 |
- import traceback
- import typing
-
- import click
- from celery import shared_task # type: ignore
-
- from core.helper import marketplace
- from core.helper.marketplace import MarketplacePluginDeclaration
- from core.plugin.entities.plugin import PluginInstallationSource
- from core.plugin.impl.plugin import PluginInstaller
- from models.account import TenantPluginAutoUpgradeStrategy
-
- RETRY_TIMES_OF_ONE_PLUGIN_IN_ONE_TENANT = 3
-
-
- cached_plugin_manifests: dict[str, typing.Union[MarketplacePluginDeclaration, None]] = {}
-
-
- def marketplace_batch_fetch_plugin_manifests(
- plugin_ids_plain_list: list[str],
- ) -> list[MarketplacePluginDeclaration]:
- global cached_plugin_manifests
- # return marketplace.batch_fetch_plugin_manifests(plugin_ids_plain_list)
- not_included_plugin_ids = [
- plugin_id for plugin_id in plugin_ids_plain_list if plugin_id not in cached_plugin_manifests
- ]
- if not_included_plugin_ids:
- manifests = marketplace.batch_fetch_plugin_manifests_ignore_deserialization_error(not_included_plugin_ids)
- for manifest in manifests:
- cached_plugin_manifests[manifest.plugin_id] = manifest
-
- if (
- len(manifests) == 0
- ): # this indicates that the plugin not found in marketplace, should set None in cache to prevent future check
- for plugin_id in not_included_plugin_ids:
- cached_plugin_manifests[plugin_id] = None
-
- result: list[MarketplacePluginDeclaration] = []
- for plugin_id in plugin_ids_plain_list:
- final_manifest = cached_plugin_manifests.get(plugin_id)
- if final_manifest is not None:
- result.append(final_manifest)
-
- return result
-
-
- @shared_task(queue="plugin")
- def process_tenant_plugin_autoupgrade_check_task(
- tenant_id: str,
- strategy_setting: TenantPluginAutoUpgradeStrategy.StrategySetting,
- upgrade_time_of_day: int,
- upgrade_mode: TenantPluginAutoUpgradeStrategy.UpgradeMode,
- exclude_plugins: list[str],
- include_plugins: list[str],
- ):
- try:
- manager = PluginInstaller()
-
- click.echo(
- click.style(
- "Checking upgradable plugin for tenant: {}".format(tenant_id),
- fg="green",
- )
- )
-
- if strategy_setting == TenantPluginAutoUpgradeStrategy.StrategySetting.DISABLED:
- return
-
- # get plugin_ids to check
- plugin_ids: list[tuple[str, str, str]] = [] # plugin_id, version, unique_identifier
- click.echo(click.style("Upgrade mode: {}".format(upgrade_mode), fg="green"))
-
- if upgrade_mode == TenantPluginAutoUpgradeStrategy.UpgradeMode.PARTIAL and include_plugins:
- all_plugins = manager.list_plugins(tenant_id)
-
- for plugin in all_plugins:
- if plugin.source == PluginInstallationSource.Marketplace and plugin.plugin_id in include_plugins:
- plugin_ids.append(
- (
- plugin.plugin_id,
- plugin.version,
- plugin.plugin_unique_identifier,
- )
- )
-
- elif upgrade_mode == TenantPluginAutoUpgradeStrategy.UpgradeMode.EXCLUDE:
- # get all plugins and remove excluded plugins
- all_plugins = manager.list_plugins(tenant_id)
- plugin_ids = [
- (plugin.plugin_id, plugin.version, plugin.plugin_unique_identifier)
- for plugin in all_plugins
- if plugin.source == PluginInstallationSource.Marketplace and plugin.plugin_id not in exclude_plugins
- ]
- elif upgrade_mode == TenantPluginAutoUpgradeStrategy.UpgradeMode.ALL:
- all_plugins = manager.list_plugins(tenant_id)
- plugin_ids = [
- (plugin.plugin_id, plugin.version, plugin.plugin_unique_identifier)
- for plugin in all_plugins
- if plugin.source == PluginInstallationSource.Marketplace
- ]
-
- if not plugin_ids:
- return
-
- plugin_ids_plain_list = [plugin_id for plugin_id, _, _ in plugin_ids]
-
- manifests = marketplace_batch_fetch_plugin_manifests(plugin_ids_plain_list)
-
- if not manifests:
- return
-
- for manifest in manifests:
- for plugin_id, version, original_unique_identifier in plugin_ids:
- if manifest.plugin_id != plugin_id:
- continue
-
- try:
- current_version = version
- latest_version = manifest.latest_version
-
- def fix_only_checker(latest_version, current_version):
- latest_version_tuple = tuple(int(val) for val in latest_version.split("."))
- current_version_tuple = tuple(int(val) for val in current_version.split("."))
-
- if (
- latest_version_tuple[0] == current_version_tuple[0]
- and latest_version_tuple[1] == current_version_tuple[1]
- ):
- return latest_version_tuple[2] != current_version_tuple[2]
- return False
-
- version_checker = {
- TenantPluginAutoUpgradeStrategy.StrategySetting.LATEST: lambda latest_version,
- current_version: latest_version != current_version,
- TenantPluginAutoUpgradeStrategy.StrategySetting.FIX_ONLY: fix_only_checker,
- }
-
- if version_checker[strategy_setting](latest_version, current_version):
- # execute upgrade
- new_unique_identifier = manifest.latest_package_identifier
-
- marketplace.record_install_plugin_event(new_unique_identifier)
- click.echo(
- click.style(
- "Upgrade plugin: {} -> {}".format(original_unique_identifier, new_unique_identifier),
- fg="green",
- )
- )
- task_start_resp = manager.upgrade_plugin(
- tenant_id,
- original_unique_identifier,
- new_unique_identifier,
- PluginInstallationSource.Marketplace,
- {
- "plugin_unique_identifier": new_unique_identifier,
- },
- )
- except Exception as e:
- click.echo(click.style("Error when upgrading plugin: {}".format(e), fg="red"))
- traceback.print_exc()
- break
-
- except Exception as e:
- click.echo(click.style("Error when checking upgradable plugin: {}".format(e), fg="red"))
- traceback.print_exc()
- return
|