From afd50d27ec4e44f5379666eb2e9cc4302f8f505a Mon Sep 17 00:00:00 2001
From: Raphael Lima
Date: Thu, 6 Feb 2025 18:35:14 -0300
Subject: [PATCH] Orchestrator manager and worker processes skeleton

This commit creates the skeleton for the redesign of dcmanager's
orchestrator to a manager and worker process.

Test plan:
Note: The test plan had to be executed in a limited manner as the
services were not created. Therefore, the processing was set up using
threads and calling the worker class directly instead of using an RPC
call.
1. PASS: Create a kube-rootca-strategy for two subclouds and apply.
Verify they complete the orchestration successfully and the strategy's
state is updated afterwards.
2. PASS: Verify that the manager's monitoring starts after the apply
request and stops once the strategy is set to a finished state, i.e.
failed, complete or aborted.
3. PASS: Verify that the manager's strategy deletion activates the
monitoring thread and destroys the strategy once the steps are deleted.
4. PASS: Run an orchestration without the worker manager setup and
verify that it can complete and abort successfully.
5. PASS: Apply an orchestration, restart the service and verify that
the manager's monitoring is restarted as well.

Story: 2011311
Task: 51670

Change-Id: I19592a50c47c5a0608e6e95a915b71423bcd97df
Signed-off-by: Raphael Lima
---
 .../dcmanager/common/exceptions.py            |   4 +
 distributedcloud/dcmanager/db/api.py          |  43 +-
 .../dcmanager/db/sqlalchemy/api.py            |  56 +-
 .../orchestrator/orchestrator_manager.py      | 664 ++++++++++++++++++
 .../orchestrator/orchestrator_worker.py       | 597 ++++++++++++++++
 .../orchestrator/strategies/__init__.py       |   0
 .../dcmanager/orchestrator/strategies/base.py | 133 ++++
 .../orchestrator/strategies/firmware.py       |  55 ++
 .../orchestrator/strategies/kube_rootca.py    |  55 ++
 .../orchestrator/strategies/kubernetes.py     |  72 ++
 .../orchestrator/strategies/patch.py          |  59 ++
 .../orchestrator/strategies/prestage.py       |  56 ++
 .../orchestrator/strategies/software.py       |  58 ++
 13 files changed, 1829 insertions(+), 23 deletions(-)
 create mode 100644 distributedcloud/dcmanager/orchestrator/orchestrator_manager.py
 create mode 100644 distributedcloud/dcmanager/orchestrator/orchestrator_worker.py
 create mode 100644 distributedcloud/dcmanager/orchestrator/strategies/__init__.py
 create mode 100644 distributedcloud/dcmanager/orchestrator/strategies/base.py
 create mode 100644 distributedcloud/dcmanager/orchestrator/strategies/firmware.py
 create mode 100644 distributedcloud/dcmanager/orchestrator/strategies/kube_rootca.py
 create mode 100644 distributedcloud/dcmanager/orchestrator/strategies/kubernetes.py
 create mode 100644 distributedcloud/dcmanager/orchestrator/strategies/patch.py
 create mode 100644 distributedcloud/dcmanager/orchestrator/strategies/prestage.py
 create mode 100644 distributedcloud/dcmanager/orchestrator/strategies/software.py

diff --git a/distributedcloud/dcmanager/common/exceptions.py b/distributedcloud/dcmanager/common/exceptions.py
index a4fa46d0c..697c2fd2a 100644
--- a/distributedcloud/dcmanager/common/exceptions.py
+++ b/distributedcloud/dcmanager/common/exceptions.py
@@ -311,6 +311,10 @@ class VaultLoadMissingError(DCManagerException):
     message = _("No matching: %(file_type)s found in vault: %(vault_dir)s")


+class StrategyNotFound(NotFound):
+    message = _("Strategy with type %(strategy_type)s doesn't exist.")
+
+
 class StrategyStepNotFound(NotFound):
     message = _("StrategyStep with subcloud_id %(subcloud_id)s doesn't exist.")

diff --git a/distributedcloud/dcmanager/db/api.py
b/distributedcloud/dcmanager/db/api.py index 880e4fee2..a7999d593 100644 --- a/distributedcloud/dcmanager/db/api.py +++ b/distributedcloud/dcmanager/db/api.py @@ -1,5 +1,5 @@ # Copyright (c) 2015 Ericsson AB. -# Copyright (c) 2017-2024 Wind River Systems, Inc. +# Copyright (c) 2017-2025 Wind River Systems, Inc. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -25,6 +25,7 @@ from oslo_config import cfg from oslo_db import api from dccommon import consts as dccommon_consts +from dcmanager.common import consts from dcmanager.db.sqlalchemy import models CONF = cfg.CONF @@ -896,9 +897,36 @@ def strategy_step_get_by_name(context, name): return IMPL.strategy_step_get_by_name(context, name) -def strategy_step_get_all(context): +def strategy_step_get_all(context, steps_id=None, last_update_threshold=None): """Retrieve all patch strategy steps.""" - return IMPL.strategy_step_get_all(context) + return IMPL.strategy_step_get_all( + context, steps_id=steps_id, last_update_threshold=last_update_threshold + ) + + +def strategy_step_count_all_states(context): + """Retrieve the count of steps in each possible state""" + return IMPL.strategy_step_count_all_states(context) + + +def strategy_step_states_to_dict(states): + """Convert a list of strategy step states and their count to a dictionary""" + + # Pre-fill the dict with the required states to avoid key errors + content = { + consts.STRATEGY_STATE_INITIAL: 0, + consts.STRATEGY_STATE_COMPLETE: 0, + consts.STRATEGY_STATE_ABORTED: 0, + consts.STRATEGY_STATE_FAILED: 0, + "total": 0, + } + + # The states object is presented as [("initial", 2)] + for state in states: + content[state[0]] = state[1] + content["total"] += state[1] + + return content def strategy_step_bulk_create(context, subcloud_ids, stage, state, details): @@ -926,19 +954,20 @@ def strategy_step_update( ) -def strategy_step_update_all(context, filters, values): +def strategy_step_update_all(context, filters, values, steps_id=None): """Updates all strategy steps :param context: request context object :param filters: filters to be applied in the query :param values: values to be set for the specified strategies + :param steps_id: list of strategy steps to update """ - return IMPL.strategy_step_update_all(context, filters, values) + return IMPL.strategy_step_update_all(context, filters, values, steps_id) -def strategy_step_destroy_all(context): +def strategy_step_destroy_all(context, steps_id=None): """Destroy all the patch strategy steps.""" - return IMPL.strategy_step_destroy_all(context) + return IMPL.strategy_step_destroy_all(context, steps_id) ################### diff --git a/distributedcloud/dcmanager/db/sqlalchemy/api.py b/distributedcloud/dcmanager/db/sqlalchemy/api.py index e35def3b5..d3c62cedb 100644 --- a/distributedcloud/dcmanager/db/sqlalchemy/api.py +++ b/distributedcloud/dcmanager/db/sqlalchemy/api.py @@ -33,6 +33,7 @@ from oslo_utils import uuidutils import sqlalchemy from sqlalchemy import bindparam from sqlalchemy import desc +from sqlalchemy import func from sqlalchemy import insert from sqlalchemy import or_ from sqlalchemy.orm import exc @@ -991,7 +992,7 @@ def sw_update_strategy_get(context, update_type=None): query = query.filter_by(type=update_type) result = query.first() if not result: - raise exception.NotFound() + raise exception.StrategyNotFound(strategy_type=update_type) return result @@ -1906,15 +1907,32 @@ def strategy_step_get_by_name(context, name): @db_session_cleanup @require_context -def 
strategy_step_get_all(context): - result = ( - model_query(context, models.StrategyStep) - .filter_by(deleted=0) - .order_by(models.StrategyStep.id) - .all() - ) +def strategy_step_get_all(context, steps_id=None, last_update_threshold=None): + with read_session() as session: + query = ( + session.query(models.StrategyStep) + .filter_by(deleted=0) + .options(joinedload("*")) + ) - return result + if steps_id: + query = query.filter(models.StrategyStep.id.in_(steps_id)) + + if last_update_threshold: + query = query.filter(models.StrategyStep.updated_at < last_update_threshold) + + return query.order_by(models.StrategyStep.id).all() + + +@db_session_cleanup +@require_context +def strategy_step_count_all_states(context): + with read_session() as session: + return ( + session.query(models.StrategyStep.state, func.count(models.StrategyStep.id)) + .filter_by(deleted=0) + .group_by(models.StrategyStep.state) + ).all() @db_session_cleanup @@ -1987,33 +2005,39 @@ def strategy_step_update( @db_session_cleanup @require_admin_context -def strategy_step_update_all(context, filters, values): +def strategy_step_update_all(context, filters, values, steps_id=None): """Updates all strategy steps :param context: request context object :param filters: filters to be applied in the query :param values: values to be set for the specified strategies + :param steps_id: list of strategy steps to update """ with write_session() as session: query = session.query(models.StrategyStep).filter_by(deleted=0) + if steps_id: + query = query.filter(models.StrategyStep.id.in_(steps_id)) + for key, value in filters.items(): attribute = getattr(models.StrategyStep, key, None) if attribute: query = query.filter(attribute == value) - query.update(values) + query.update(values, synchronize_session="fetch") @db_session_cleanup @require_admin_context -def strategy_step_destroy_all(context): +def strategy_step_destroy_all(context, steps_id=None): with write_session() as session: - strategy_step_stages = strategy_step_get_all(context) - if strategy_step_stages: - for strategy_step_ref in strategy_step_stages: - session.delete(strategy_step_ref) + query = session.query(models.StrategyStep).filter_by(deleted=0) + + if steps_id: + query = query.filter(models.StrategyStep.id.in_(steps_id)) + + query.delete(synchronize_session="fetch") ########################## diff --git a/distributedcloud/dcmanager/orchestrator/orchestrator_manager.py b/distributedcloud/dcmanager/orchestrator/orchestrator_manager.py new file mode 100644 index 000000000..a516e0fa4 --- /dev/null +++ b/distributedcloud/dcmanager/orchestrator/orchestrator_manager.py @@ -0,0 +1,664 @@ +# Copyright 2017 Ericsson AB. +# Copyright (c) 2017-2025 Wind River Systems, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+#
+
+import datetime
+import os
+import shutil
+import threading
+
+import eventlet
+from oslo_log import log as logging
+from oslo_utils import timeutils
+from tsconfig.tsconfig import SW_VERSION
+
+from dccommon import consts as dccommon_consts
+from dccommon import ostree_mount
+from dcmanager.audit import rpcapi as dcmanager_audit_rpc_client
+from dcmanager.common import consts
+from dcmanager.common import context
+from dcmanager.common import exceptions
+from dcmanager.common import manager
+from dcmanager.common import prestage
+from dcmanager.common import scheduler
+from dcmanager.common import utils
+from dcmanager.db import api as db_api
+from dcmanager.orchestrator.validators.firmware_validator import (
+    FirmwareStrategyValidator,
+)
+from dcmanager.orchestrator.validators.kube_root_ca_validator import (
+    KubeRootCaStrategyValidator,
+)
+from dcmanager.orchestrator.validators.kubernetes_validator import (
+    KubernetesStrategyValidator,
+)
+from dcmanager.orchestrator.validators.patch_validator import PatchStrategyValidator
+from dcmanager.orchestrator.validators.prestage_validator import (
+    PrestageStrategyValidator,
+)
+from dcmanager.orchestrator.validators.sw_deploy_validator import (
+    SoftwareDeployStrategyValidator,
+)
+from dcmanager.orchestrator.orchestrator_worker import OrchestratorWorker
+
+LOG = logging.getLogger(__name__)
+ORCHESTRATION_STRATEGY_MONITORING_INTERVAL = 30
+
+
+# TODO(rlima): keep this class name until the new service is created. It should
+# remain a placeholder for now so the current orchestrator service can be used
+# just by replacing the files
+class SwUpdateManager(manager.Manager):
+    """Manages tasks related to software updates."""
+
+    def __init__(self, *args, **kwargs):
+        LOG.debug("SwUpdateManager initialization...")
+
+        super().__init__(service_name="sw_update_manager", *args, **kwargs)
+        self.context = context.get_admin_context()
+
+        # Used to protect strategies when an atomic read/update is required.
+        self.strategy_lock = threading.Lock()
+
+        # Used to notify dcmanager-audit
+        self.audit_rpc_client = dcmanager_audit_rpc_client.ManagerAuditClient()
+
+        # Used to determine the continuous execution of the strategy monitoring
+        self._monitor_strategy = False
+
+        # Strategy validators, indexed by strategy type
+        self.strategy_validators = {
+            consts.SW_UPDATE_TYPE_SOFTWARE: SoftwareDeployStrategyValidator(),
+            consts.SW_UPDATE_TYPE_FIRMWARE: FirmwareStrategyValidator(),
+            consts.SW_UPDATE_TYPE_KUBERNETES: KubernetesStrategyValidator(),
+            consts.SW_UPDATE_TYPE_KUBE_ROOTCA_UPDATE: KubeRootCaStrategyValidator(),
+            consts.SW_UPDATE_TYPE_PATCH: PatchStrategyValidator(),
+            consts.SW_UPDATE_TYPE_PRESTAGE: PrestageStrategyValidator(),
+        }
+
+        self.orchestrator_worker = OrchestratorWorker(self.audit_rpc_client)
+        self.thread_group_manager = scheduler.ThreadGroupManager(thread_pool_size=10)
+
+        # When starting the manager service, it is necessary to confirm if there
+        # are any strategies in a state different from initial, because that means
+        # the service was unexpectedly restarted and the periodic strategy monitoring
+        # should be restarted to finish the original processing.
+        try:
+            strategy = db_api.sw_update_strategy_get(self.context)
+
+            if strategy and strategy.state not in [
+                consts.SW_UPDATE_STATE_INITIAL,
+                consts.SW_UPDATE_STATE_COMPLETE,
+                consts.SW_UPDATE_STATE_ABORTED,
+                consts.SW_UPDATE_STATE_FAILED,
+            ]:
+                LOG.info(
+                    f"An active {strategy.type} strategy was found, restarting "
+                    "its monitoring"
+                )
+                self.periodic_strategy_monitoring(strategy.type)
+        except exceptions.StrategyNotFound:
+            LOG.debug(
+                "There isn't an active strategy to orchestrate, skipping monitoring"
+            )
+
+    def periodic_strategy_monitoring(self, strategy_type):
+        # Reset the flag to start the monitoring
+        self._monitor_strategy = True
+
+        while self._monitor_strategy:
+            try:
+                eventlet.greenthread.sleep(ORCHESTRATION_STRATEGY_MONITORING_INTERVAL)
+                self._periodic_strategy_monitoring_loop(strategy_type)
+            except eventlet.greenlet.GreenletExit:
+                # Exit the execution
+                return
+            except Exception:
+                LOG.exception("An error occurred in the strategy monitoring loop")
+
+    def _create_and_send_step_batches(self, strategy_type, steps):
+        steps_to_orchestrate = list()
+        # chunksize = (len(steps) + CONF.orchestrator_worker_workers) // (
+        #     CONF.orchestrator_workers
+        # )
+
+        for step in steps:
+            steps_to_orchestrate.append(step.id)
+
+            # if len(steps_to_orchestrate) == chunksize:
+            #     self.orchestrator_worker_rpc_client.orchestrate(
+            #         self.context, steps_to_orchestrate
+            #     )
+            #     LOG.info(f"Sent steps to orchestrate: {steps_to_orchestrate}")
+            #     steps_to_orchestrate = []
+
+        if steps_to_orchestrate:
+            self.thread_group_manager.start(
+                self.periodic_strategy_monitoring, strategy_type
+            )
+            self.thread_group_manager.start(
+                self.orchestrator_worker.orchestrate,
+                steps_to_orchestrate,
+                strategy_type,
+            )
+            # self.orchestrator_worker_rpc_client.orchestrate(
+            #     self.context, steps_to_orchestrate
+            # )
+            LOG.info(f"Sent final steps to orchestrate: {steps_to_orchestrate}")
+
+        if steps:
+            LOG.info("Finished sending steps to orchestrate")
+
+    def _verify_pending_steps(self, strategy_type):
+        """Verifies if there are any steps that were not updated within the threshold
+
+        If there are, send them to be processed by the workers.
+
+        :param strategy_type: the type of the strategy being monitored
+        :return: True if there are pending steps and False otherwise
+        """
+
+        # TODO(rlima): create a configuration variable for the seconds once the
+        # threshold is set
+        last_update_threshold = timeutils.utcnow() - datetime.timedelta(seconds=120)
+
+        steps_to_process = db_api.strategy_step_get_all(
+            self.context, last_update_threshold=last_update_threshold
+        )
+
+        if steps_to_process:
+            self._create_and_send_step_batches(strategy_type, steps_to_process)
+            return True
+
+        return False
+
+    def _periodic_strategy_monitoring_loop(self, strategy_type):
+        """Verifies strategy and subcloud states"""
+
+        strategy = db_api.sw_update_strategy_get(self.context, strategy_type)
+
+        if (
+            strategy.state == consts.SW_UPDATE_STATE_APPLYING
+            and self._verify_pending_steps(strategy_type)
+        ):
+            return
+
+        # When the strategy is not in a finished state, it is necessary to verify the
+        # step's state to update the strategy accordingly.
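+        # strategy_step_count_all_states() returns (state, count) pairs; the
+        # helper pre-fills the states required below so the lookups never
+        # raise a KeyError.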
+        steps_count = db_api.strategy_step_states_to_dict(
+            db_api.strategy_step_count_all_states(self.context)
+        )
+        total_steps = steps_count["total"]
+
+        if strategy.state in [
+            consts.SW_UPDATE_STATE_APPLYING,
+            consts.SW_UPDATE_STATE_ABORTING,
+        ]:
+            LOG.debug(
+                f"The {strategy.type} strategy is not complete, verifying "
+                "possible state update"
+            )
+
+            new_state = None
+            failed_steps = steps_count[consts.STRATEGY_STATE_FAILED]
+            complete_steps = steps_count[consts.STRATEGY_STATE_COMPLETE]
+            aborted_steps = steps_count[consts.STRATEGY_STATE_ABORTED]
+
+            # If all steps are completed, the strategy state is to be updated
+            if total_steps == failed_steps + complete_steps + aborted_steps:
+                if failed_steps > 0:
+                    new_state = consts.SW_UPDATE_STATE_FAILED
+                elif aborted_steps > 0:
+                    new_state = consts.SW_UPDATE_STATE_ABORTED
+                else:
+                    new_state = consts.SW_UPDATE_STATE_COMPLETE
+
+            if new_state:
+                # Once the strategy is set to a finished state, it does not need to
+                # be monitored anymore until it is requested to delete, so the
+                # execution is stopped
+                with self.strategy_lock:
+                    db_api.sw_update_strategy_update(
+                        self.context,
+                        update_type=strategy_type,
+                        state=new_state,
+                    )
+                self._monitor_strategy = False
+        elif strategy.state == consts.SW_UPDATE_STATE_ABORT_REQUESTED:
+            # When the strategy is set to abort requested, it needs to have all of
+            # the steps in initial state updated to aborted before proceeding
+            if steps_count[consts.STRATEGY_STATE_INITIAL] == 0:
+                with self.strategy_lock:
+                    db_api.sw_update_strategy_update(
+                        self.context,
+                        update_type=strategy_type,
+                        state=consts.SW_UPDATE_STATE_ABORTING,
+                    )
+        elif strategy.state == consts.SW_UPDATE_STATE_DELETING:
+            # If all steps were deleted, delete the strategy
+            if total_steps == 0:
+                with self.strategy_lock:
+                    db_api.sw_update_strategy_destroy(self.context, strategy_type)
+                self._monitor_strategy = False
+
+    def stop(self):
+        self.thread_group_manager.stop()
+
+    # todo(abailey): dc-vault actions are normally done by dcorch-api-proxy
+    # However this situation is unique since the strategy drives vault contents
+    def _vault_upload(self, vault_dir, src_file):
+        """Copies the file to the dc-vault, and returns the new path"""
+        # make sure the vault directory exists; create it if it is missing
+        if not os.path.isdir(vault_dir):
+            os.makedirs(vault_dir)
+        # determine the destination name for the file
+        dest_file = os.path.join(vault_dir, os.path.basename(src_file))
+        # copy the file to the vault dir
+        # use 'copy' to preserve file system permissions
+        # note: if the dest and src are the same file, this operation fails
+        shutil.copy(src_file, dest_file)
+        return dest_file
+
+    def _vault_remove(self, vault_dir, vault_file):
+        """Removes the file from the dc-vault."""
+        # no point in deleting if the file does not exist
+        if os.path.isfile(vault_file):
+            # no point in deleting if the file is not under a vault path
+            if vault_file.startswith(os.path.abspath(vault_dir) + os.sep):
+                # remove it
+                os.remove(vault_file)
+
+    def _process_extra_args_creation(self, strategy_type, extra_args):
+        if extra_args:
+            # cert-file extra_arg needs vault handling for kube rootca update
+            if strategy_type == consts.SW_UPDATE_TYPE_KUBE_ROOTCA_UPDATE:
+                # extra_args can be 'cert-file' or 'subject / expiry_date'
+                # but combining both is not supported
+                cert_file = extra_args.get(consts.EXTRA_ARGS_CERT_FILE)
+                expiry_date = extra_args.get(consts.EXTRA_ARGS_EXPIRY_DATE)
+                subject = extra_args.get(consts.EXTRA_ARGS_SUBJECT)
+                if expiry_date:
+                    is_valid, reason = utils.validate_expiry_date(expiry_date)
+                    if not is_valid:
+                        raise
exceptions.BadRequest(resource="strategy", msg=reason)
+                if subject:
+                    is_valid, reason = utils.validate_certificate_subject(subject)
+                    if not is_valid:
+                        raise exceptions.BadRequest(resource="strategy", msg=reason)
+                if cert_file:
+                    if expiry_date or subject:
+                        raise exceptions.BadRequest(
+                            resource="strategy",
+                            msg=(
+                                "Invalid extra args. <cert-file> cannot be specified "
+                                "along with <subject> or <expiry_date>."
+                            ),
+                        )
+                    # copy the cert-file to the vault
+                    vault_file = self._vault_upload(consts.CERTS_VAULT_DIR, cert_file)
+                    # update extra_args with the new path (in the vault)
+                    extra_args[consts.EXTRA_ARGS_CERT_FILE] = vault_file
+
+    def _process_extra_args_deletion(self, strategy_type, extra_args):
+        if extra_args:
+            # cert-file extra_arg needs vault handling for kube rootca update
+            if strategy_type == consts.SW_UPDATE_TYPE_KUBE_ROOTCA_UPDATE:
+                cert_file = extra_args.get(consts.EXTRA_ARGS_CERT_FILE)
+                if cert_file:
+                    # remove this cert file from the vault
+                    self._vault_remove(consts.CERTS_VAULT_DIR, cert_file)
+
+    def create_sw_update_strategy(self, context, payload):
+        """Create software update strategy.
+
+        :param context: request context object
+        :param payload: strategy configuration
+        """
+
+        LOG.info(f"Creating software update strategy of type {payload['type']}.")
+
+        # Don't create a strategy if one exists. No need to filter by type
+        try:
+            strategy = db_api.sw_update_strategy_get(context, update_type=None)
+        except exceptions.StrategyNotFound:
+            pass
+        else:
+            msg = f"Strategy of type: '{strategy.type}' already exists"
+
+            LOG.error(
+                "Failed creating software update strategy of type "
+                f"{payload['type']}. {msg}"
+            )
+            raise exceptions.BadRequest(resource="strategy", msg=msg)
+
+        single_group = None
+        subcloud_group = payload.get("subcloud_group")
+
+        if subcloud_group:
+            single_group = utils.subcloud_group_get_by_ref(context, subcloud_group)
+            subcloud_apply_type = single_group.update_apply_type
+            max_parallel_subclouds = single_group.max_parallel_subclouds
+        else:
+            subcloud_apply_type = payload.get("subcloud-apply-type")
+            max_parallel_subclouds_str = payload.get("max-parallel-subclouds")
+
+            if not max_parallel_subclouds_str:
+                max_parallel_subclouds = None
+            else:
+                max_parallel_subclouds = int(max_parallel_subclouds_str)
+
+        stop_on_failure = payload.get("stop-on-failure") in ["true"]
+        force = payload.get(consts.EXTRA_ARGS_FORCE) in ["true"]
+
+        # Has the user specified a specific subcloud?
+        cloud_name = payload.get("cloud_name")
+        strategy_type = payload.get("type")
+        prestage_global_validated = False
+
+        # Has the user specified for_sw_deploy flag for prestage strategy?
+        if strategy_type == consts.SW_UPDATE_TYPE_PRESTAGE:
+            for_sw_deploy = payload.get(consts.PRESTAGE_FOR_SW_DEPLOY) in ["true"]
+
+        if cloud_name:
+            # Make sure subcloud exists
+            try:
+                subcloud = db_api.subcloud_get_by_name(context, cloud_name)
+            except exceptions.SubcloudNameNotFound:
+                msg = f"Subcloud {cloud_name} does not exist"
+                LOG.error(
+                    "Failed creating software update strategy of type "
+                    f"{payload['type']}. 
{msg}" + ) + raise exceptions.BadRequest(resource="strategy", msg=msg) + + # TODO(rlima): move prestage to its validator + if strategy_type == consts.SW_UPDATE_TYPE_PRESTAGE: + # Do initial validation for subcloud + try: + prestage.global_prestage_validate(payload) + prestage_global_validated = True + prestage.initial_subcloud_validate(subcloud) + except exceptions.PrestagePreCheckFailedException as ex: + raise exceptions.BadRequest(resource="strategy", msg=str(ex)) + else: + self.strategy_validators[strategy_type].validate_strategy_requirements( + context, subcloud.id, subcloud.name, force + ) + + extra_args = None + if strategy_type != consts.SW_UPDATE_TYPE_PRESTAGE: + extra_args = self.strategy_validators[strategy_type].build_extra_args( + payload + ) + # Don't create a strategy if any of the subclouds is online and the + # relevant sync status is unknown. + # When the count is greater than 0, that means there are invalid subclouds + # and the execution should abort. + count_invalid_subclouds = db_api.subcloud_count_invalid_for_strategy_type( + context, + self.strategy_validators[strategy_type].endpoint_type, + single_group.id if subcloud_group else None, + cloud_name, + ) + if count_invalid_subclouds > 0: + msg = ( + f"{self.strategy_validators[strategy_type].endpoint_type} " + "sync status is unknown for one or more subclouds" + ) + LOG.error( + "Failed creating software update strategy of type " + f"{payload['type']}. {msg}" + ) + raise exceptions.BadRequest(resource="strategy", msg=msg) + + # handle extra_args processing such as staging to the vault + self._process_extra_args_creation(strategy_type, extra_args) + + if consts.SUBCLOUD_APPLY_TYPE_SERIAL == subcloud_apply_type: + max_parallel_subclouds = 1 + + if max_parallel_subclouds is None: + max_parallel_subclouds = ( + consts.DEFAULT_SUBCLOUD_GROUP_MAX_PARALLEL_SUBCLOUDS + ) + + valid_subclouds = db_api.subcloud_get_all_valid_for_strategy_step_creation( + context, + self.strategy_validators[strategy_type].endpoint_type, + single_group.id if subcloud_group else None, + cloud_name, + self.strategy_validators[strategy_type].build_availability_status_filter(), + self.strategy_validators[strategy_type].build_sync_status_filter(force), + ) + + # TODO(rlima): move this step to validators + if strategy_type == consts.SW_UPDATE_TYPE_PATCH: + # TODO(nicodemos): Remove the support for patch strategy in stx-11 + for subcloud, _ in valid_subclouds: + # We need to check the software version of the subcloud and + # the system controller. If the software versions are the same, we + # cannot apply the patch. + if subcloud.software_version == SW_VERSION: + msg = ( + f"Subcloud {subcloud.name} has the same software version as " + f"the system controller. The {strategy_type} strategy can " + "only be used for subclouds running the previous release." + ) + LOG.error( + "Failed creating software update strategy of type " + f"{payload['type']}. 
{msg}" + ) + raise exceptions.BadRequest(resource="strategy", msg=msg) + elif strategy_type == consts.SW_UPDATE_TYPE_SOFTWARE: + filtered_valid_subclouds = list() + + for subcloud, sync_status in valid_subclouds: + if sync_status == dccommon_consts.SYNC_STATUS_OUT_OF_SYNC: + filtered_valid_subclouds.append((subcloud, sync_status)) + + if filtered_valid_subclouds: + software_version = utils.get_major_release( + payload.get(consts.EXTRA_ARGS_RELEASE_ID) + ) + ostree_mount.validate_ostree_iso_mount(software_version) + + valid_subclouds = filtered_valid_subclouds + + elif strategy_type == consts.SW_UPDATE_TYPE_PRESTAGE: + if not prestage_global_validated: + try: + prestage.global_prestage_validate(payload) + except exceptions.PrestagePreCheckFailedException as ex: + raise exceptions.BadRequest(resource="strategy", msg=str(ex)) + + extra_args = { + consts.EXTRA_ARGS_SYSADMIN_PASSWORD: payload.get( + consts.EXTRA_ARGS_SYSADMIN_PASSWORD + ), + consts.EXTRA_ARGS_FORCE: force, + consts.PRESTAGE_SOFTWARE_VERSION: ( + payload.get(consts.PRESTAGE_REQUEST_RELEASE) + ), + consts.PRESTAGE_FOR_SW_DEPLOY: for_sw_deploy, + } + + filtered_valid_subclouds = [] + for subcloud, sync_status in valid_subclouds: + warn_msg = f"Excluding subcloud from prestage strategy: {subcloud.name}" + # Do initial validation for subcloud + try: + prestage.initial_subcloud_validate(subcloud) + filtered_valid_subclouds.append((subcloud, sync_status)) + except exceptions.PrestagePreCheckFailedException: + LOG.warn(warn_msg) + valid_subclouds = filtered_valid_subclouds + + if not valid_subclouds: + # handle extra_args processing such as removing from the vault + self._process_extra_args_deletion(strategy_type, extra_args) + msg = "Strategy has no steps to apply" + LOG.error( + "Failed creating software update strategy of type " + f"{payload['type']}. {msg}" + ) + raise exceptions.BadRequest(resource="strategy", msg=msg) + + # Create the strategy + strategy = db_api.sw_update_strategy_create( + context, + strategy_type, + subcloud_apply_type, + max_parallel_subclouds, + stop_on_failure, + consts.SW_UPDATE_STATE_INITIAL, + extra_args=extra_args, + ) + db_api.strategy_step_bulk_create( + context, + [subcloud.id for subcloud, _ in valid_subclouds], + stage=consts.STAGE_SUBCLOUD_ORCHESTRATION_CREATED, + state=consts.STRATEGY_STATE_INITIAL, + details="", + ) + # Clear the error_description field for all subclouds that will + # perform orchestration. + update_form = {"error_description": consts.ERROR_DESC_EMPTY} + db_api.subcloud_bulk_update_by_ids( + context, + [subcloud.id for subcloud, _ in valid_subclouds], + update_form, + ) + + LOG.info( + f"Finished creating software update strategy of type {payload['type']}." + ) + + return db_api.sw_update_strategy_db_model_to_dict(strategy) + + def delete_sw_update_strategy(self, context, update_type=None): + """Delete software update strategy. + + :param context: request context object. 
+ :param update_type: the type to filter on querying + """ + LOG.info("Deleting software update strategy.") + + # Ensure our read/update of the strategy is done without interference + # The strategy object is common to all workers (patch, upgrades, etc) + with self.strategy_lock: + # Retrieve the existing strategy from the database + sw_update_strategy = db_api.sw_update_strategy_get( + context, update_type=update_type + ) + + # Semantic checking + if sw_update_strategy.state not in [ + consts.SW_UPDATE_STATE_INITIAL, + consts.SW_UPDATE_STATE_COMPLETE, + consts.SW_UPDATE_STATE_FAILED, + consts.SW_UPDATE_STATE_ABORTED, + ]: + raise exceptions.BadRequest( + resource="strategy", + msg="Strategy in state %s cannot be deleted" + % sw_update_strategy.state, + ) + + # Set the state to deleting + sw_update_strategy = db_api.sw_update_strategy_update( + context, state=consts.SW_UPDATE_STATE_DELETING, update_type=update_type + ) + + # Trigger the orchestration + steps = db_api.strategy_step_get_all(context) + self._create_and_send_step_batches(sw_update_strategy.type, steps) + + LOG.info( + f"Subcloud orchestration delete triggered for {sw_update_strategy.type}" + ) + + # handle extra_args processing such as removing from the vault + self._process_extra_args_deletion( + sw_update_strategy.type, sw_update_strategy.extra_args + ) + + strategy_dict = db_api.sw_update_strategy_db_model_to_dict(sw_update_strategy) + return strategy_dict + + def apply_sw_update_strategy(self, context, update_type=None): + """Apply software update strategy. + + :param context: request context object. + :param update_type: the type to filter on querying + """ + LOG.info("Applying software update strategy.") + + # Ensure our read/update of the strategy is done without interference + with self.strategy_lock: + # Retrieve the existing strategy from the database + sw_update_strategy = db_api.sw_update_strategy_get( + context, update_type=update_type + ) + + # Semantic checking + if sw_update_strategy.state != consts.SW_UPDATE_STATE_INITIAL: + raise exceptions.BadRequest( + resource="strategy", + msg="Strategy in state %s cannot be applied" + % sw_update_strategy.state, + ) + + # Set the state to applying + sw_update_strategy = db_api.sw_update_strategy_update( + context, state=consts.SW_UPDATE_STATE_APPLYING, update_type=update_type + ) + + # Trigger the orchestration + steps = db_api.strategy_step_get_all(context) + self._create_and_send_step_batches(sw_update_strategy.type, steps) + + LOG.info( + f"Subcloud orchestration apply triggered for {sw_update_strategy.type}" + ) + + strategy_dict = db_api.sw_update_strategy_db_model_to_dict(sw_update_strategy) + return strategy_dict + + def abort_sw_update_strategy(self, context, update_type=None): + """Abort software update strategy. + + :param context: request context object. + :param update_type: the type to filter on querying + """ + LOG.info("Aborting software update strategy.") + + # Ensure our read/update of the strategy is done without interference + with self.strategy_lock: + # Retrieve the existing strategy from the database + sw_update_strategy = db_api.sw_update_strategy_get( + context, update_type=update_type + ) + + # Semantic checking + if sw_update_strategy.state != consts.SW_UPDATE_STATE_APPLYING: + raise exceptions.BadRequest( + resource="strategy", + msg="Strategy in state %s cannot be aborted" + % sw_update_strategy.state, + ) + + # Set the state to abort requested, which will trigger + # the orchestration to abort... 
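+        # The workers then move any steps still in the initial state to
+        # aborted; once no initial steps remain, the monitoring loop advances
+        # the strategy from abort requested to aborting.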
+        sw_update_strategy = db_api.sw_update_strategy_update(
+            context, state=consts.SW_UPDATE_STATE_ABORT_REQUESTED
+        )
+        strategy_dict = db_api.sw_update_strategy_db_model_to_dict(sw_update_strategy)
+        return strategy_dict
diff --git a/distributedcloud/dcmanager/orchestrator/orchestrator_worker.py b/distributedcloud/dcmanager/orchestrator/orchestrator_worker.py
new file mode 100644
index 000000000..6103e932e
--- /dev/null
+++ b/distributedcloud/dcmanager/orchestrator/orchestrator_worker.py
@@ -0,0 +1,597 @@
+# Copyright 2017 Ericsson AB.
+# Copyright (c) 2017-2025 Wind River Systems, Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+import datetime
+import os
+import threading
+import time
+
+from keystoneauth1 import exceptions as keystone_exceptions
+from oslo_log import log as logging
+
+from dccommon import consts as dccommon_consts
+from dccommon.drivers.openstack import vim
+from dcmanager.common import consts
+from dcmanager.common import context
+from dcmanager.common import exceptions
+from dcmanager.common import scheduler
+from dcmanager.db import api as db_api
+from dcmanager.orchestrator.strategies.firmware import FirmwareStrategy
+from dcmanager.orchestrator.strategies.kube_rootca import KubeRootcaStrategy
+from dcmanager.orchestrator.strategies.kubernetes import KubernetesStrategy
+from dcmanager.orchestrator.strategies.patch import PatchStrategy
+from dcmanager.orchestrator.strategies.prestage import PrestageStrategy
+from dcmanager.orchestrator.strategies.software import SoftwareStrategy
+
+LOG = logging.getLogger(__name__)
+DEFAULT_SLEEP_TIME_IN_SECONDS = 10
+MANAGER_SLEEP_TIME_IN_SECONDS = 30
+
+
+class OrchestratorWorker(object):
+    """Orchestrator worker
+
+    This class is responsible for orchestrating strategy steps based on the requests
+    from the orchestrator manager, which sends the strategy type and the steps to
+    process.
+    """
+
+    def __init__(self, audit_rpc_client):
+        self.context = context.get_admin_context()
+        # Keeps track of greenthreads we create to do work.
+        self.thread_group_manager = scheduler.ThreadGroupManager(thread_pool_size=5000)
+        # Track the worker created for each subcloud.
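+        # During apply, each region maps to a (state, greenthread) tuple;
+        # during delete, it maps directly to the delete greenthread.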
+ self.subcloud_workers = dict() + # self.orchestrator_rpc_client = ( + # orchestrator_rpc_client.OrchestratorManagerClient() + # ) + self.pid = os.getpid() + # Determines if the worker is still processing the strategy steps or if the + # execution should finish + self._processing = True + # Determines if the worker should not process new steps + self._stop = threading.Event() + # Time for the orchestration to sleep after every loop + self._sleep_time = DEFAULT_SLEEP_TIME_IN_SECONDS + # Strategies orchestration process + self.strategies = { + consts.SW_UPDATE_TYPE_FIRMWARE: FirmwareStrategy(audit_rpc_client), + consts.SW_UPDATE_TYPE_KUBE_ROOTCA_UPDATE: KubeRootcaStrategy( + audit_rpc_client + ), + consts.SW_UPDATE_TYPE_KUBERNETES: KubernetesStrategy(audit_rpc_client), + consts.SW_UPDATE_TYPE_PATCH: PatchStrategy(audit_rpc_client), + consts.SW_UPDATE_TYPE_PRESTAGE: PrestageStrategy(audit_rpc_client), + consts.SW_UPDATE_TYPE_SOFTWARE: SoftwareStrategy(audit_rpc_client), + } + + @staticmethod + def _get_subcloud_name(step): + """Get the subcloud name for a step""" + + if step.subcloud_id is None: + # This is the SystemController. + return dccommon_consts.DEFAULT_REGION_NAME + return step.subcloud.name + + @staticmethod + def _get_region_name(step): + """Get the region name for a step""" + + if step.subcloud_id is None: + # This is the SystemController. + return dccommon_consts.DEFAULT_REGION_NAME + return step.subcloud.region_name + + @staticmethod + def _format_update_details(last_state, info): + # Optionally include the last state, since the current state is likely 'failed' + if last_state: + details = "%s: %s" % (last_state, info) + else: + details = str(info) + # details cannot exceed 1000 chars. inform user to check full logs + if len(details) > 1000: + details = ( + "Error message longer than 1000 characters, " + "please check orchestrator logs for additional details." + ) + return details + + def orchestrate(self, steps_id, strategy_type): + LOG.info(f"({self.pid}) Orchestration starting for {strategy_type}") + # Reset the control flags since a new process started + self._stop.clear() + self._processing = True + + while self._processing: + try: + LOG.debug(f"({self.pid}) Orchestration is running for {strategy_type}") + + strategy = db_api.sw_update_strategy_get( + self.context, update_type=strategy_type + ) + + if strategy.state in [ + consts.SW_UPDATE_STATE_APPLYING, + consts.SW_UPDATE_STATE_ABORTING, + ]: + self.strategies[strategy_type]._pre_apply_setup() + self._apply(strategy, steps_id) + elif strategy.state == consts.SW_UPDATE_STATE_ABORT_REQUESTED: + self._abort(strategy, steps_id) + elif strategy.state == consts.SW_UPDATE_STATE_DELETING: + self._delete(strategy, steps_id) + self.strategies[strategy_type]._post_delete_teardown() + except exceptions.StrategyNotFound: + self._processing = False + LOG.error( + f"({self.pid}) A strategy of type {strategy_type} was not found" + ) + except Exception: + # We catch all exceptions to avoid terminating the thread. + LOG.exception( + f"({self.pid}) Orchestration got an unexpected exception when " + f"processing strategy {strategy_type}" + ) + + # Wake up every so often to see if there is work to do. 
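+            # The sleep interval is tuned by _adjust_sleep_time() based on how
+            # many subclouds are being processed in parallel.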
+ time.sleep(self._sleep_time) + + LOG.info(f"({self.pid}) Orchestration finished for {strategy_type}") + self.thread_group_manager.stop() + + def _adjust_sleep_time(self, number_of_subclouds, strategy_type): + prev_sleep_time = self._sleep_time + + if number_of_subclouds <= 0: + new_sleep_time = DEFAULT_SLEEP_TIME_IN_SECONDS + else: + new_sleep_time = min( + (DEFAULT_SLEEP_TIME_IN_SECONDS * 60) + / min(number_of_subclouds, consts.MAX_PARALLEL_SUBCLOUDS_LIMIT), + DEFAULT_SLEEP_TIME_IN_SECONDS, + ) + + if new_sleep_time != prev_sleep_time: + self._sleep_time = new_sleep_time + LOG.debug( + f"({self.pid}) Adjusted {strategy_type} orch thread sleep time " + f"from {prev_sleep_time} to {self._sleep_time} " + f"based on {number_of_subclouds} parallel subclouds." + ) + + def _update_subcloud_deploy_status(self, subcloud): + # If an exception occurs during the create/apply of the VIM strategy, the + # deploy_status will be set to 'apply-strategy-failed'. If we retry the + # orchestration and the process completes successfully, we need to update the + # deploy_status to 'complete'. + if subcloud.deploy_status != consts.DEPLOY_STATE_DONE: + # Update deploy state for subclouds to complete + db_api.subcloud_update( + self.context, + subcloud.id, + deploy_status=consts.DEPLOY_STATE_DONE, + ) + + def _delete_subcloud_worker(self, region, subcloud_id): + db_api.strategy_step_update( + self.context, + subcloud_id, + stage=consts.STAGE_SUBCLOUD_ORCHESTRATION_PROCESSED, + ) + if region in self.subcloud_workers: + # The orchestration for this subcloud has either completed/failed/aborted, + # remove it from the dictionary. + LOG.debug(f"({self.pid}) Remove {region} from subcloud workers dict") + del self.subcloud_workers[region] + + def strategy_step_update( + self, strategy_type, subcloud_id, state=None, details=None, stage=None + ): + """Update the strategy step in the DB + + Sets the start and finished timestamp if necessary, based on state. + """ + started_at = None + finished_at = None + if state == self.strategies[strategy_type].starting_state: + started_at = datetime.datetime.now() + elif state in [ + consts.STRATEGY_STATE_COMPLETE, + consts.STRATEGY_STATE_ABORTED, + consts.STRATEGY_STATE_FAILED, + ]: + finished_at = datetime.datetime.now() + # Return the updated object, in case we need to use its updated values + return db_api.strategy_step_update( + self.context, + subcloud_id, + stage=stage, + state=state, + details=details, + started_at=started_at, + finished_at=finished_at, + ) + + def _perform_state_action(self, strategy_type, region, step): + """Extensible state handler for processing and transitioning states""" + + try: + LOG.info( + f"({self.pid}) Strategy: {strategy_type} Stage: {step.stage}, " + f"State: {step.state}, Subcloud: {self._get_subcloud_name(step)}" + ) + # Instantiate the state operator and perform the state actions + state_operator = self.strategies[strategy_type].determine_state_operator( + region, step + ) + state_operator.registerStopEvent(self._stop) + next_state = state_operator.perform_state_action(step) + self.strategy_step_update( + strategy_type, step.subcloud_id, state=next_state, details="" + ) + except exceptions.StrategySkippedException as ex: + LOG.info( + f"({self.pid}) Skipping subcloud, Strategy: {strategy_type} " + f"Stage: {step.stage}, State: {step.state}, " + f"Subcloud: {step.subcloud_name}" + ) + # Transition immediately to complete. 
Update the details to show + # that this subcloud has been skipped + self.strategy_step_update( + strategy_type, + step.subcloud_id, + state=consts.STRATEGY_STATE_COMPLETE, + details=self._format_update_details(None, str(ex)), + ) + except Exception as ex: + # Catch ALL exceptions and set the strategy to failed + LOG.exception( + f"({self.pid}) Failed! Strategy: {strategy_type} Stage: {step.stage}, " + f"State: {step.state}, Subcloud: {step.subcloud_name}" + ) + self.strategy_step_update( + strategy_type, + step.subcloud_id, + state=consts.STRATEGY_STATE_FAILED, + details=self._format_update_details(step.state, str(ex)), + ) + + def _process_update_step(self, strategy_type, region, step, log_error=False): + """Manages the green thread for calling perform_state_action""" + + if region in self.subcloud_workers: + if self.subcloud_workers[region][0] == step.state: + # A worker already exists. Let it finish whatever it was doing. + if log_error: + LOG.error( + f"({self.pid}) Worker should not exist for {region} " + f"in strategy {strategy_type}." + ) + else: + LOG.info( + f"({self.pid}) Update worker exists for {region} " + f"in strategy {strategy_type}." + ) + else: + LOG.info( + f"({self.pid}) Starting a new worker for region {region} at state " + f"{step.state} (update) in strategy {strategy_type}" + ) + # Advance to the next state. The previous greenthread has exited, + # create a new one. + self.subcloud_workers[region] = ( + step.state, + self.thread_group_manager.start( + self._perform_state_action, strategy_type, region, step + ), + ) + else: + # This is the first state. Create a greenthread to start processing the + # update for the subcloud and invoke the perform_state_action method. + LOG.info( + f"({self.pid}) Starting a new worker for region {region} at state " + f"{step.state} in strategy {strategy_type}" + ) + self.subcloud_workers[region] = ( + step.state, + self.thread_group_manager.start( + self._perform_state_action, strategy_type, region, step + ), + ) + + def _apply(self, strategy, steps_id): + """Apply a strategy""" + + LOG.debug(f"({self.pid}) Applying strategy {strategy.type}") + steps = db_api.strategy_step_get_all(self.context, steps_id) + # Adjust sleep time based on the number of subclouds being processed + # in parallel + self._adjust_sleep_time(len(steps), strategy.type) + + for step in steps: + if step.state == consts.STRATEGY_STATE_COMPLETE: + # Update deploy state for subclouds to complete + self._update_subcloud_deploy_status(step.subcloud) + # This step is complete + self._delete_subcloud_worker( + step.subcloud.region_name, step.subcloud_id + ) + continue + elif step.state == consts.STRATEGY_STATE_ABORTED: + # This step was aborted + self._delete_subcloud_worker( + step.subcloud.region_name, step.subcloud_id + ) + continue + elif step.state == consts.STRATEGY_STATE_FAILED: + self._delete_subcloud_worker( + step.subcloud.region_name, step.subcloud_id + ) + # This step has failed and needs no further action + if step.subcloud_id is None: + # Strategy on SystemController failed. We are done. 
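+                    # Steps with a None subcloud_id target the SystemController
+                    # itself rather than a subcloud.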
+ LOG.info( + f"({self.pid}) Stopping strategy {strategy.type} due to " + "failure while processing update step on SystemController" + ) + # TODO(rlima): Remove this request and replace with the + # stop rpc call + # with self.strategy_lock: + # db_api.sw_update_strategy_update( + # self.context, + # state=consts.SW_UPDATE_STATE_FAILED, + # update_type=self.update_type, + # ) + self._sleep_time = DEFAULT_SLEEP_TIME_IN_SECONDS + # Trigger audit to update the sync status for each subcloud. + self.strategies[strategy.type].trigger_audit() + return + elif strategy.stop_on_failure: + # We have been told to stop on failures + self._stop.set() + break + continue + # We have found the first step that isn't complete or failed. + break + else: + # The strategy application is complete + self.subcloud_workers.clear() + self._sleep_time = DEFAULT_SLEEP_TIME_IN_SECONDS + + # Trigger audit to update the sync status for each subcloud. + LOG.info(f"({self.pid}) Trigger audit for {strategy.type}") + self.strategies[strategy.type].trigger_audit() + return + + # The worker is not allowed to process new steps. It should only finish the + # ones it is currently executing before it quits + if self._stop.is_set(): + work_remaining = False + # We are going to stop after the steps that are in progress finish. + if len(self.subcloud_workers) > 0: + work_remaining = True + + if not work_remaining: + # We have completed the remaining steps + LOG.info( + f"({self.pid}) Stopping strategy {strategy.type} due to failure" + ) + self._sleep_time = DEFAULT_SLEEP_TIME_IN_SECONDS + # Trigger audit to update the sync status for each subcloud. + self.strategies[strategy.type].trigger_audit() + + for step in steps: + region = self._get_region_name(step) + if not self._processing: + LOG.info( + f"({self.pid}) Exiting {strategy.type} strategy because task " + "is stopped" + ) + self.subcloud_workers.clear() + self._sleep_time = DEFAULT_SLEEP_TIME_IN_SECONDS + return + if step.state in [ + consts.STRATEGY_STATE_FAILED, + consts.STRATEGY_STATE_COMPLETE, + consts.STRATEGY_STATE_ABORTED, + ]: + LOG.debug( + f"({self.pid}) Intermediate step is {step.state} for strategy " + f"{strategy.type}" + ) + self._delete_subcloud_worker(region, step.subcloud_id) + elif step.state == consts.STRATEGY_STATE_INITIAL: + if ( + # TODO(rlima): replace the calculation to consider the + # amount of workers + strategy.max_parallel_subclouds > len(self.subcloud_workers) + and not self._stop.is_set() + ): + # Don't start upgrading this subcloud if it has been unmanaged by + # the user. If orchestration was already started, it will be allowed + # to complete. + if ( + step.subcloud_id is not None + and step.subcloud.management_state + == dccommon_consts.MANAGEMENT_UNMANAGED + ): + message = f"Subcloud {step.subcloud_name} is unmanaged." 
+ LOG.warn(f"({self.pid}) {message}") + self.strategy_step_update( + strategy.type, + step.subcloud_id, + state=consts.STRATEGY_STATE_FAILED, + details=message, + ) + continue + + # We are just getting started, enter the first state + # Use the updated value for calling process_update_step + step = self.strategy_step_update( + strategy.type, + step.subcloud_id, + stage=consts.STAGE_SUBCLOUD_ORCHESTRATION_STARTED, + state=self.strategies[strategy.type].starting_state, + ) + # Starting state should log an error if greenthread exists + self._process_update_step( + strategy.type, region, step, log_error=True + ) + else: + self._process_update_step(strategy.type, region, step, log_error=False) + + def _abort(self, strategy, steps_id): + """Abort a strategy""" + + LOG.info(f"({self.pid}) Aborting strategy {strategy.type}") + + # Only strategy steps that did not start processing can be updated to aborted + filters = {"state": consts.STRATEGY_STATE_INITIAL} + values = {"state": consts.STRATEGY_STATE_ABORTED, "details": ""} + + # Currently, the orchestrator only supports executing a single strategy at + # a time and there isn't any database relationship between the steps and the + # strategy, so we just update all the steps + db_api.strategy_step_update_all(self.context, filters, values, steps_id) + + # Since the steps were just recently updated, the manager needs to confirm + # all workers completed the request before proceeding, so the sleep is longer + # to avoid unnecessary requests + self._sleep_time = MANAGER_SLEEP_TIME_IN_SECONDS + + def _do_delete_subcloud_strategy(self, strategy_type, region, step): + """Delete the vim strategy in the subcloud""" + + LOG.info( + f"({self.pid}) Deleting vim strategy:({strategy_type}) for " + f"region:({region})" + ) + + # First check if the strategy has been created. + try: + vim_client = self.strategies[strategy_type].get_vim_client(region) + subcloud_strategy = vim_client.get_strategy( + self.strategies[strategy_type].vim_strategy_name + ) + except (keystone_exceptions.EndpointNotFound, IndexError): + LOG.warn( + f"({self.pid}) Endpoint for subcloud: {region} not found in " + f"strategy {strategy_type}." + ) + return + except Exception: + # Strategy doesn't exist so there is nothing to do + return + + if subcloud_strategy.state in [ + vim.STATE_BUILDING, + vim.STATE_APPLYING, + vim.STATE_ABORTING, + ]: + # Can't delete a vim strategy in these states + LOG.warn( + f"({self.pid}) Vim strategy:(" + f"{self.strategies[strategy_type].vim_strategy_name}) for region:(" + f"{region}) in wrong state:({subcloud_strategy.state}) for delete." + ) + return + + # If we are here, we need to delete the strategy + try: + vim_client.delete_strategy(self.strategies[strategy_type].vim_strategy_name) + except Exception: + LOG.warn( + f"({self.pid}) Vim strategy:" + f"({self.strategies[strategy_type].vim_strategy_name}) delete failed " + f"for region:({region})" + ) + return + + def _delete_subcloud_strategy(self, strategy_type, region, step): + """Delete the strategy in the subcloud + + Removes the worker reference after the operation is complete. + """ + + try: + self._do_delete_subcloud_strategy(strategy_type, region, step) + except Exception as e: + LOG.exception(e) + finally: + # The worker is done. 
+            if region in self.subcloud_workers:
+                del self.subcloud_workers[region]
+
+    def _delete(self, strategy, steps_id):
+        """Delete an update strategy"""
+
+        LOG.info(f"({self.pid}) Deleting strategy {strategy.type}")
+
+        steps = db_api.strategy_step_get_all(self.context, steps_id)
+
+        # Adjust sleep time based on the number of subclouds being processed
+        # in parallel
+        self._adjust_sleep_time(len(steps), strategy.type)
+
+        for step in steps:
+            region = self._get_region_name(step)
+
+            if region in self.subcloud_workers:
+                # A worker already exists. Let it finish whatever it was doing.
+                LOG.debug(
+                    f"({self.pid}) Worker already exists for {region} in strategy "
+                    f"{strategy.type}."
+                )
+            else:
+                # Create a greenthread to delete the subcloud strategy
+                delete_thread = self.thread_group_manager.start(
+                    self._delete_subcloud_strategy, strategy.type, region, step
+                )
+                if delete_thread:
+                    self.subcloud_workers[region] = delete_thread
+
+            if self._stop.is_set():
+                LOG.info(
+                    f"({self.pid}) Exiting because task is stopped for strategy "
+                    f"{strategy.type}"
+                )
+                return
+
+        # Wait up to 180 seconds so that the last 100 workers can complete their execution
+        counter = 0
+        while len(self.subcloud_workers) > 0:
+            time.sleep(10)
+            counter = counter + 1
+            if counter > 18:
+                break
+
+        # Remove the strategy from the database if all workers have completed their
+        # execution
+        try:
+            db_api.strategy_step_destroy_all(self.context, steps_id)
+        except Exception as e:
+            LOG.exception(
+                f"({self.pid}) exception during delete in strategy {strategy.type}"
+            )
+            raise e
+        finally:
+            # The orchestration is complete, halt the processing
+            self._processing = False
+            self._sleep_time = DEFAULT_SLEEP_TIME_IN_SECONDS
+
+        LOG.info(f"({self.pid}) Finished deleting strategy {strategy.type}")
diff --git a/distributedcloud/dcmanager/orchestrator/strategies/__init__.py b/distributedcloud/dcmanager/orchestrator/strategies/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/distributedcloud/dcmanager/orchestrator/strategies/base.py b/distributedcloud/dcmanager/orchestrator/strategies/base.py
new file mode 100644
index 000000000..dcbe2e235
--- /dev/null
+++ b/distributedcloud/dcmanager/orchestrator/strategies/base.py
@@ -0,0 +1,133 @@
+# Copyright 2017 Ericsson AB.
+# Copyright (c) 2017-2025 Wind River Systems, Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+
+import abc
+
+from oslo_log import log as logging
+
+from dccommon import consts as dccommon_consts
+from dccommon.drivers.openstack.patching_v1 import PatchingClient
+from dccommon.drivers.openstack.sdk_platform import OpenStackDriver
+from dccommon.drivers.openstack.software_v1 import SoftwareClient
+from dccommon.drivers.openstack.sysinv_v1 import SysinvClient
+from dccommon.drivers.openstack import vim
+from dcmanager.common import context
+from dcmanager.common import utils
+
+LOG = logging.getLogger(__name__)
+
+
+class BaseStrategy(object):
+    """Base strategy
+
+    This class is responsible for specifying common methods used by strategies
+    during their orchestration.
+    """
+
+    # each subclass must provide the STATE_OPERATORS
+    STATE_OPERATORS = {}
+
+    def __init__(
+        self,
+        audit_rpc_client,
+        update_type,
+        vim_strategy_name,
+        starting_state,
+    ):
+        # Context object for RPC queries
+        self.context = context.get_admin_context()
+        # Used to notify dcmanager-audit to trigger an audit
+        self.audit_rpc_client = audit_rpc_client
+        # The update type for the orch thread
+        self.update_type = update_type
+        # The vim strategy name for the orch thread
+        self.vim_strategy_name = vim_strategy_name
+        # When an apply is initiated, this is the first state
+        self.starting_state = starting_state
+        # Track if the strategy setup function was executed
+        self._setup = False
+
+    @abc.abstractmethod
+    def trigger_audit(self):
+        """Subclass MUST override this method"""
+        LOG.warn(
+            "(%s) BaseStrategy subclass must override trigger_audit" % self.update_type
+        )
+
+    def _pre_apply_setup(self):
+        """Setup performed once before a strategy starts to apply"""
+        if not self._setup:
+            LOG.info("(%s) BaseStrategy Pre-Apply Setup" % self.update_type)
+            self._setup = True
+            self.pre_apply_setup()
+
+    def pre_apply_setup(self):
+        """Subclass can override this method"""
+
+    def _post_delete_teardown(self):
+        """Cleanup code executed once after deleting a strategy"""
+        if self._setup:
+            LOG.info("(%s) BaseStrategy Post-Delete Teardown" % self.update_type)
+            self._setup = False
+            self.post_delete_teardown()
+
+    def post_delete_teardown(self):
+        """Subclass can override this method"""
+
+    @staticmethod
+    def get_ks_client(region_name=dccommon_consts.DEFAULT_REGION_NAME):
+        """This will get a cached keystone client (and token)
+
+        throws an exception if the keystone client cannot be initialized
+        """
+        os_client = OpenStackDriver(
+            region_name=region_name,
+            region_clients=None,
+            fetch_subcloud_ips=utils.fetch_subcloud_mgmt_ips,
+        )
+        return os_client.keystone_client
+
+    @staticmethod
+    def get_vim_client(region_name=dccommon_consts.DEFAULT_REGION_NAME):
+        ks_client = BaseStrategy.get_ks_client(region_name)
+        return vim.VimClient(region_name, ks_client.session)
+
+    @staticmethod
+    def get_sysinv_client(region_name=dccommon_consts.DEFAULT_REGION_NAME):
+        ks_client = BaseStrategy.get_ks_client(region_name)
+        endpoint = ks_client.endpoint_cache.get_endpoint("sysinv")
+        return SysinvClient(region_name, ks_client.session, endpoint=endpoint)
+
+    @staticmethod
+    def get_software_client(region_name=dccommon_consts.DEFAULT_REGION_NAME):
+        ks_client = BaseStrategy.get_ks_client(region_name)
+        return SoftwareClient(
+            ks_client.session,
+            endpoint=ks_client.endpoint_cache.get_endpoint("usm"),
+        )
+
+    @staticmethod
+    def get_patching_client(region_name=dccommon_consts.DEFAULT_REGION_NAME):
+        ks_client = BaseStrategy.get_ks_client(region_name)
+        return PatchingClient(region_name, ks_client.session)
+
+    def
+    def determine_state_operator(self, region_name, strategy_step):
+        """Return the state operator for the current state"""
+
+        state_operator = self.STATE_OPERATORS.get(strategy_step.state)
+        if state_operator is None:
+            # Fail fast with a clear error instead of trying to call None below
+            raise KeyError(
+                f"No state operator is registered for state: {strategy_step.state}"
+            )
+        # Instantiate and return the state_operator class
+        return state_operator(region_name=region_name)
diff --git a/distributedcloud/dcmanager/orchestrator/strategies/firmware.py b/distributedcloud/dcmanager/orchestrator/strategies/firmware.py
new file mode 100644
index 000000000..a6ef349bb
--- /dev/null
+++ b/distributedcloud/dcmanager/orchestrator/strategies/firmware.py
@@ -0,0 +1,55 @@
+# Copyright 2017 Ericsson AB.
+# Copyright (c) 2017-2021, 2024-2025 Wind River Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from dccommon.drivers.openstack import vim
+from dcmanager.common import consts
+from dcmanager.orchestrator.states.firmware.applying_vim_strategy import (
+    ApplyingVIMStrategyState,
+)
+from dcmanager.orchestrator.states.firmware.creating_vim_strategy import (
+    CreatingVIMStrategyState,
+)
+from dcmanager.orchestrator.states.firmware.finishing_fw_update import (
+    FinishingFwUpdateState,
+)
+from dcmanager.orchestrator.states.firmware.importing_firmware import (
+    ImportingFirmwareState,
+)
+from dcmanager.orchestrator.strategies.base import BaseStrategy
+
+
+class FirmwareStrategy(BaseStrategy):
+    """Firmware orchestration strategy"""
+
+    # Every state in fw orchestration must have an operator
+    STATE_OPERATORS = {
+        consts.STRATEGY_STATE_IMPORTING_FIRMWARE: ImportingFirmwareState,
+        consts.STRATEGY_STATE_CREATING_FW_UPDATE_STRATEGY: CreatingVIMStrategyState,
+        consts.STRATEGY_STATE_APPLYING_FW_UPDATE_STRATEGY: ApplyingVIMStrategyState,
+        consts.STRATEGY_STATE_FINISHING_FW_UPDATE: FinishingFwUpdateState,
+    }
+
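+    # The constructor arguments map onto BaseStrategy(audit_rpc_client,
+    # update_type, vim_strategy_name, starting_state); the same pattern is
+    # used by every strategy module below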
+    def __init__(self, audit_rpc_client):
+        super().__init__(
+            audit_rpc_client,
+            consts.SW_UPDATE_TYPE_FIRMWARE,
+            vim.STRATEGY_NAME_FW_UPDATE,
+            consts.STRATEGY_STATE_IMPORTING_FIRMWARE,
+        )
+
+    def trigger_audit(self):
+        """Trigger an audit for firmware"""
+        self.audit_rpc_client.trigger_firmware_audit(self.context)
diff --git a/distributedcloud/dcmanager/orchestrator/strategies/kube_rootca.py b/distributedcloud/dcmanager/orchestrator/strategies/kube_rootca.py
new file mode 100644
index 000000000..f52cd4b20
--- /dev/null
+++ b/distributedcloud/dcmanager/orchestrator/strategies/kube_rootca.py
@@ -0,0 +1,55 @@
+#
+# Copyright (c) 2020-2021, 2024-2025 Wind River Systems, Inc.
+#
+# SPDX-License-Identifier: Apache-2.0
+#
+from dccommon.drivers.openstack import vim
+from dcmanager.common import consts
+from dcmanager.orchestrator.states.kube_rootca.applying_vim_strategy import (
+    ApplyingVIMKubeRootcaUpdateStrategyState,
+)
+from dcmanager.orchestrator.states.kube_rootca.creating_vim_strategy import (
+    CreatingVIMKubeRootcaUpdateStrategyState,
+)
+from dcmanager.orchestrator.states.kube_rootca.pre_check import (
+    KubeRootcaUpdatePreCheckState,
+)
+from dcmanager.orchestrator.states.kube_rootca.start_update import (
+    KubeRootcaUpdateStartState,
+)
+from dcmanager.orchestrator.states.kube_rootca.upload_cert import (
+    KubeRootcaUpdateUploadCertState,
+)
+from dcmanager.orchestrator.strategies.base import BaseStrategy
+
+
+class KubeRootcaStrategy(BaseStrategy):
+    """Kube RootCA orchestration strategy"""
+
+    # Reassign constants to avoid line length issues
+    PRE_CHECK = consts.STRATEGY_STATE_KUBE_ROOTCA_UPDATE_PRE_CHECK
+    START = consts.STRATEGY_STATE_KUBE_ROOTCA_UPDATE_START
+    UPLOAD_CERT = consts.STRATEGY_STATE_KUBE_ROOTCA_UPDATE_UPLOAD_CERT
+    CREATE_VIM_STRATEGY = consts.STRATEGY_STATE_CREATING_VIM_KUBE_ROOTCA_UPDATE_STRATEGY
+    APPLY_VIM_STRATEGY = consts.STRATEGY_STATE_APPLYING_VIM_KUBE_ROOTCA_UPDATE_STRATEGY
+
+    # Every state in kube rootca orchestration must have an operator
+    STATE_OPERATORS = {
+        PRE_CHECK: KubeRootcaUpdatePreCheckState,
+        START: KubeRootcaUpdateStartState,
+        UPLOAD_CERT: KubeRootcaUpdateUploadCertState,
+        CREATE_VIM_STRATEGY: CreatingVIMKubeRootcaUpdateStrategyState,
+        APPLY_VIM_STRATEGY: ApplyingVIMKubeRootcaUpdateStrategyState,
+    }
+
+    def __init__(self, audit_rpc_client):
+        super().__init__(
+            audit_rpc_client,
+            consts.SW_UPDATE_TYPE_KUBE_ROOTCA_UPDATE,
+            vim.STRATEGY_NAME_KUBE_ROOTCA_UPDATE,
+            consts.STRATEGY_STATE_KUBE_ROOTCA_UPDATE_PRE_CHECK,
+        )
+
+    def trigger_audit(self):
+        """Trigger an audit for kube rootca update"""
+        self.audit_rpc_client.trigger_kube_rootca_update_audit(self.context)
diff --git a/distributedcloud/dcmanager/orchestrator/strategies/kubernetes.py b/distributedcloud/dcmanager/orchestrator/strategies/kubernetes.py
new file mode 100644
index 000000000..77ed94763
--- /dev/null
+++ b/distributedcloud/dcmanager/orchestrator/strategies/kubernetes.py
@@ -0,0 +1,72 @@
+# Copyright 2017 Ericsson AB.
+# Copyright (c) 2017-2021, 2024-2025 Wind River Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# +from dccommon.drivers.openstack import vim +from dcmanager.common import consts +from dcmanager.orchestrator.cache.shared_cache_repository import SharedCacheRepository +from dcmanager.orchestrator.states.kube.applying_vim_kube_upgrade_strategy import ( + ApplyingVIMKubeUpgradeStrategyState, +) +from dcmanager.orchestrator.states.kube.creating_vim_kube_upgrade_strategy import ( + CreatingVIMKubeUpgradeStrategyState, +) +from dcmanager.orchestrator.states.kube.pre_check import KubeUpgradePreCheckState +from dcmanager.orchestrator.strategies.base import BaseStrategy + + +class KubernetesStrategy(BaseStrategy): + """Kubernetes orchestration strategy""" + + # Reassign constants to avoid line length issues + PRE_CHECK = consts.STRATEGY_STATE_KUBE_UPGRADE_PRE_CHECK + CREATE_VIM_STRATEGY = consts.STRATEGY_STATE_KUBE_CREATING_VIM_KUBE_UPGRADE_STRATEGY + APPLY_VIM_STRATEGY = consts.STRATEGY_STATE_KUBE_APPLYING_VIM_KUBE_UPGRADE_STRATEGY + + # every state in kube orchestration must have an operator + # The states are listed here in their typical execution order + STATE_OPERATORS = { + PRE_CHECK: KubeUpgradePreCheckState, + CREATE_VIM_STRATEGY: CreatingVIMKubeUpgradeStrategyState, + APPLY_VIM_STRATEGY: ApplyingVIMKubeUpgradeStrategyState, + } + + def __init__(self, audit_rpc_client): + super().__init__( + audit_rpc_client, + consts.SW_UPDATE_TYPE_KUBERNETES, + vim.STRATEGY_NAME_KUBE_UPGRADE, + consts.STRATEGY_STATE_KUBE_UPGRADE_PRE_CHECK, + ) + + # Initialize shared cache instances for the states that require them + self._shared_caches = SharedCacheRepository(self.update_type) + self._shared_caches.initialize_caches() + + def trigger_audit(self): + """Trigger an audit for kubernetes""" + self.audit_rpc_client.trigger_kubernetes_audit(self.context) + + def pre_apply_setup(self): + # Restart caches for next strategy so that we always have the + # latest RegionOne data at the moment the strategy is applied + self._shared_caches.initialize_caches() + super().pre_apply_setup() + + def determine_state_operator(self, region_name, strategy_step): + state = super().determine_state_operator(region_name, strategy_step) + # Share the cache with the state object + state.add_shared_caches(self._shared_caches) + return state diff --git a/distributedcloud/dcmanager/orchestrator/strategies/patch.py b/distributedcloud/dcmanager/orchestrator/strategies/patch.py new file mode 100644 index 000000000..9ba6aa566 --- /dev/null +++ b/distributedcloud/dcmanager/orchestrator/strategies/patch.py @@ -0,0 +1,59 @@ +# Copyright 2017 Ericsson AB. +# Copyright (c) 2017-2025 Wind River Systems, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
+#
+
+# TODO(nicodemos): Remove this file and all patch states once support for
+# patching is fully removed
+
+from dccommon.drivers.openstack import vim
+from dcmanager.common import consts
+from dcmanager.orchestrator.states.patch.applying_vim_patch_strategy import (
+    ApplyingVIMPatchStrategyState,
+)
+from dcmanager.orchestrator.states.patch.creating_vim_patch_strategy import (
+    CreatingVIMPatchStrategyState,
+)
+from dcmanager.orchestrator.states.patch.pre_check import PreCheckState
+from dcmanager.orchestrator.states.patch.updating_patches import UpdatingPatchesState
+from dcmanager.orchestrator.strategies.base import BaseStrategy
+
+
+class PatchStrategy(BaseStrategy):
+    """Patch orchestration strategy"""
+
+    # Reassign constants to avoid line length issues
+    PRE_CHECK = consts.STRATEGY_STATE_PRE_CHECK
+    UPDATING_PATCHES = consts.STRATEGY_STATE_UPDATING_PATCHES
+    CREATE_VIM_STRATEGY = consts.STRATEGY_STATE_CREATING_VIM_PATCH_STRATEGY
+    APPLY_VIM_STRATEGY = consts.STRATEGY_STATE_APPLYING_VIM_PATCH_STRATEGY
+
+    # Every state in patch orchestration must have an operator
+    STATE_OPERATORS = {
+        PRE_CHECK: PreCheckState,
+        UPDATING_PATCHES: UpdatingPatchesState,
+        CREATE_VIM_STRATEGY: CreatingVIMPatchStrategyState,
+        APPLY_VIM_STRATEGY: ApplyingVIMPatchStrategyState,
+    }
+
+    def __init__(self, audit_rpc_client):
+        super().__init__(
+            audit_rpc_client,
+            consts.SW_UPDATE_TYPE_PATCH,
+            vim.STRATEGY_NAME_SW_PATCH,
+            starting_state=consts.STRATEGY_STATE_PRE_CHECK,
+        )
+
+    def trigger_audit(self):
+        """Trigger an audit for patching"""
+        self.audit_rpc_client.trigger_patch_audit(self.context)
diff --git a/distributedcloud/dcmanager/orchestrator/strategies/prestage.py b/distributedcloud/dcmanager/orchestrator/strategies/prestage.py
new file mode 100644
index 000000000..a697c717e
--- /dev/null
+++ b/distributedcloud/dcmanager/orchestrator/strategies/prestage.py
@@ -0,0 +1,56 @@
+# Copyright (c) 2022-2025 Wind River Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from dcmanager.common import consts
+from dcmanager.orchestrator.cache.shared_cache_repository import SharedCacheRepository
+from dcmanager.orchestrator.states.prestage import states
+from dcmanager.orchestrator.strategies.base import BaseStrategy
+
+
+class PrestageStrategy(BaseStrategy):
+    """Prestage orchestration strategy"""
+
+    # Every state in prestage orchestration must have an operator
+    # The states are listed here in their typical execution order
+    STATE_OPERATORS = {
+        consts.STRATEGY_STATE_PRESTAGE_PRE_CHECK: states.PrestagePreCheckState,
+        consts.STRATEGY_STATE_PRESTAGE_PACKAGES: states.PrestagePackagesState,
+        consts.STRATEGY_STATE_PRESTAGE_IMAGES: states.PrestageImagesState,
+    }
+
+    def __init__(self, audit_rpc_client):
+        super().__init__(
+            audit_rpc_client,
+            consts.SW_UPDATE_TYPE_PRESTAGE,
+            # Prestage does not create a vim strategy
+            None,
+            consts.STRATEGY_STATE_PRESTAGE_PRE_CHECK,
+        )
+        # Initialize shared cache instances for the states that require them.
+        # Prestage reuses the shared caches of the software update type.
+        self._shared_caches = SharedCacheRepository(consts.SW_UPDATE_TYPE_SOFTWARE)
+        self._shared_caches.initialize_caches()
+
+    def pre_apply_setup(self):
+        # Restart caches for next strategy
+        self._shared_caches.initialize_caches()
+        super().pre_apply_setup()
+
+    def determine_state_operator(self, region_name, strategy_step):
+        state = super().determine_state_operator(region_name, strategy_step)
+        # Share the cache with the state object
+        state.add_shared_caches(self._shared_caches)
+        return state
+
+    def trigger_audit(self):
+        """Trigger an audit (intentionally a no-op for prestage)"""
diff --git a/distributedcloud/dcmanager/orchestrator/strategies/software.py b/distributedcloud/dcmanager/orchestrator/strategies/software.py
new file mode 100644
index 000000000..68d8ab9f4
--- /dev/null
+++ b/distributedcloud/dcmanager/orchestrator/strategies/software.py
@@ -0,0 +1,58 @@
+#
+# Copyright (c) 2023-2025 Wind River Systems, Inc.
+# +# SPDX-License-Identifier: Apache-2.0 +# + +from dccommon.drivers.openstack import vim +from dcmanager.common import consts +from dcmanager.orchestrator.cache.shared_cache_repository import SharedCacheRepository +from dcmanager.orchestrator.states.software.apply_vim_software_strategy import ( + ApplyVIMSoftwareStrategyState, +) +from dcmanager.orchestrator.states.software.create_vim_software_strategy import ( + CreateVIMSoftwareStrategyState, +) +from dcmanager.orchestrator.states.software.finish_strategy import FinishStrategyState +from dcmanager.orchestrator.states.software.install_license import InstallLicenseState +from dcmanager.orchestrator.states.software.pre_check import PreCheckState +from dcmanager.orchestrator.strategies.base import BaseStrategy + + +class SoftwareStrategy(BaseStrategy): + """Software orchestration strategy""" + + # every state in sw deploy orchestration should have an operator + STATE_OPERATORS = { + consts.STRATEGY_STATE_SW_PRE_CHECK: PreCheckState, + consts.STRATEGY_STATE_SW_INSTALL_LICENSE: InstallLicenseState, + consts.STRATEGY_STATE_SW_CREATE_VIM_STRATEGY: CreateVIMSoftwareStrategyState, + consts.STRATEGY_STATE_SW_APPLY_VIM_STRATEGY: ApplyVIMSoftwareStrategyState, + consts.STRATEGY_STATE_SW_FINISH_STRATEGY: FinishStrategyState, + } + + def __init__(self, audit_rpc_client): + super().__init__( + audit_rpc_client, + consts.SW_UPDATE_TYPE_SOFTWARE, # software update strategy type + vim.STRATEGY_NAME_SW_USM, # strategy type used by vim + consts.STRATEGY_STATE_SW_PRE_CHECK, # starting state + ) + + # Initialize shared cache instances for the states that require them + self._shared_caches = SharedCacheRepository(consts.SW_UPDATE_TYPE_SOFTWARE) + self._shared_caches.initialize_caches() + + def trigger_audit(self): + """Trigger an audit for software""" + self.audit_rpc_client.trigger_software_audit(self.context) + + def pre_apply_setup(self): + # Restart caches for next strategy + self._shared_caches.initialize_caches() + super().pre_apply_setup() + + def determine_state_operator(self, region_name, strategy_step): + state = super().determine_state_operator(region_name, strategy_step) + state.add_shared_caches(self._shared_caches) + return state
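--

Reviewer note (illustrative only, not part of the patch): a concrete strategy
plugs into BaseStrategy by mapping each orchestration state to an operator
class and overriding trigger_audit. The sketch below shows that pattern;
ExampleState, ExampleStrategy and the literal "example state" key are
hypothetical placeholders, while the firmware constants reused here are the
ones introduced by this patch.

    from dccommon.drivers.openstack import vim
    from dcmanager.common import consts
    from dcmanager.orchestrator.strategies.base import BaseStrategy


    class ExampleState(object):
        # Hypothetical state operator; real operators live under
        # dcmanager/orchestrator/states and implement the per-step logic
        def __init__(self, region_name):
            self.region_name = region_name


    class ExampleStrategy(BaseStrategy):
        """Hypothetical strategy showing the BaseStrategy extension points"""

        # Every orchestration state must map to an operator class; the literal
        # key stands in for a consts.STRATEGY_STATE_* constant
        STATE_OPERATORS = {"example state": ExampleState}

        def __init__(self, audit_rpc_client):
            super().__init__(
                audit_rpc_client,
                consts.SW_UPDATE_TYPE_FIRMWARE,  # update type (reused for sketch)
                vim.STRATEGY_NAME_FW_UPDATE,  # vim strategy name for that type
                "example state",  # starting state
            )

        def trigger_audit(self):
            # A real subclass calls the matching dcmanager-audit RPC here
            self.audit_rpc_client.trigger_firmware_audit(self.context)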