Merge "DC decoupling - clean up certmon"

This commit is contained in:
Zuul 2025-04-08 19:52:23 +00:00 committed by Gerrit Code Review
commit a6d42645f8
12 changed files with 39 additions and 1788 deletions

View File

@ -253,6 +253,7 @@ RESTART_CONTROLLER_MANAGER=0
RESTART_SCHEDULER=0
RESTART_SYSINV=0
RESTART_CERT_MON=0
RESTART_DC_CERT_MON=0
RESTART_ETCD=0
# Fist check the validity of the Root CAs in /etc/kubernetes/pki/ca.crt and /etc/etcd/ca.crt
@ -306,6 +307,9 @@ if [ ${ERR} -eq 0 ]; then
if [ ${result} -eq 0 ]; then
RESTART_SYSINV=1
RESTART_CERT_MON=1
# dccertmon is only provisioned in DC systems, so there
# won't be any impacts if it is restarted in AIO as well.
RESTART_DC_CERT_MON=1
elif [ ${result} -eq 1 ]; then
ERR_REASON="Failed to renew admin.conf certificate."
ERR=1
@ -496,6 +500,14 @@ if [ ${RESTART_CERT_MON} -eq 1 ]; then
ERR=2
fi
fi
# Restart dccert-mon since it's using credentials from admin.conf
if [ ${RESTART_DC_CERT_MON} -eq 1 ]; then
sm-restart-safe service dccertmon
if [ $? -ne 0 ]; then
ERR_REASON="Failed to restart dccertmon service."
ERR=2
fi
fi
# Restart etcd server
if [ ${RESTART_ETCD} -eq 1 ]; then
sm-restart-safe service etcd

View File

@ -1,4 +1,4 @@
# Copyright (c) 2020-2024 Wind River Systems, Inc.
# Copyright (c) 2020-2025 Wind River Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -21,35 +21,13 @@ import eventlet
import greenlet
from oslo_config import cfg
from oslo_log import log
from oslo_serialization import base64
from oslo_service import periodic_task
from sysinv.cert_mon import subcloud_audit_queue
from sysinv.cert_mon import utils
from sysinv.cert_mon import watcher
from sysinv.common import constants
from sysinv.common import utils as cutils
LOG = log.getLogger(__name__)
TASK_NAME_PAUSE_AUDIT = 'pause'
INVALID_SUBCLOUD_AUDIT_DEPLOY_STATES = [
# Secondary subclouds should not be audited as they are expected
# to be managed by a peer system controller (geo-redundancy feat.)
'create-complete',
'create-failed',
'pre-rehome',
'rehome-failed',
'rehome-pending',
'rehoming',
'secondary',
'secondary-failed',
]
cert_mon_opts = [
cfg.IntOpt('audit_interval',
default=86400, # 24 hours
help='Interval to run certificate audit'),
cfg.IntOpt('retry_interval',
default=10 * 60, # retry every 10 minutes
help='Interval to reattempt accessing external system '
@ -58,28 +36,6 @@ cert_mon_opts = [
default=14, # retry 14 times to give at least 2 hours to recover
help='Max number of reattempts accessing external system '
'if failure occurred'),
cfg.BoolOpt('startup_audit_all',
default=False,
help='Audit all subclouds on startup'),
cfg.IntOpt('network_retry_interval',
default=180, # every 3 minutes
help='Max times to reattempt accessing external system '
'if network failure occurred'),
cfg.IntOpt('network_max_retry',
default=30,
help='Interval to reattempt accessing external system '
'if network failure occurred'),
cfg.IntOpt('audit_batch_size',
default=40,
help='Batch size of subcloud audits per audit_interval'),
cfg.IntOpt('audit_greenpool_size',
default=20,
help='Size of subcloud audit greenpool.'
'Set to 0 to disable use of greenpool '
'(force serial audit).'),
cfg.IntOpt('certificate_timeout_secs',
default=5,
help='Connection timeout for certificate check (in seconds)'),
]
CONF = cfg.CONF
@ -90,252 +46,17 @@ class CertificateMonManager(periodic_task.PeriodicTasks):
def __init__(self):
super(CertificateMonManager, self).__init__(CONF)
self.mon_threads = []
self.audit_thread = None
self.token_cache = utils.get_internal_token_cache()
self.dc_token_cache = utils.get_dc_token_cache()
self.dc_monitor = None
self.task_thread = None
self.systemlocalcacert_monitor = None
self.restapicert_monitor = None
self.registrycert_monitor = None
self.openldapcert_monitor = None
self.reattempt_monitor_tasks = []
self.sc_audit_queue = subcloud_audit_queue.SubcloudAuditPriorityQueue()
if CONF.certmon.audit_greenpool_size > 0:
self.sc_audit_pool = eventlet.greenpool.GreenPool(
size=CONF.certmon.audit_greenpool_size)
else:
self.sc_audit_pool = None
def periodic_tasks(self, context, raise_on_error=False):
"""Tasks to be run at a periodic interval."""
return self.run_periodic_tasks(context, raise_on_error=raise_on_error)
@periodic_task.periodic_task(spacing=CONF.certmon.audit_interval)
def audit_sc_cert_start(self, context):
"""Kicks an audit of all subclouds.
By default this task runs once every 24 hours.
"""
# auditing subcloud certificate
dc_role = utils.get_dc_role()
if dc_role != constants.DISTRIBUTED_CLOUD_ROLE_SYSTEMCONTROLLER:
# Do nothing if it is not systemcontroller
return
all_subclouds = utils.get_subclouds_from_dcmanager(
self.token_cache.get_token(), INVALID_SUBCLOUD_AUDIT_DEPLOY_STATES
)
# Update sysinv endpoint cache
management_ips = {sc["name"]: sc["management_ip"] for sc in all_subclouds}
utils.SubcloudSysinvEndpointCache.cache_endpoints_by_ip(management_ips)
LOG.info("Periodic: begin subcloud certificate audit: %d subclouds"
% len(all_subclouds))
for sc in all_subclouds:
try:
self.sc_audit_queue.enqueue(
subcloud_audit_queue.SubcloudAuditData(sc['name']))
except subcloud_audit_queue.SubcloudAuditException as exc:
# Log as warn because we can see this if the watch has fired
# near the same time as we are auditing the subcloud
LOG.warn("Failed to enqueue subcloud audit: %s", str(exc))
def on_start_audit(self):
"""
On service start audit
Audit all subclouds that are out-of-sync
"""
dc_role = utils.get_dc_role()
if dc_role != constants.DISTRIBUTED_CLOUD_ROLE_SYSTEMCONTROLLER:
# Do nothing if it is not systemcontroller
return
if CONF.certmon.startup_audit_all:
LOG.info("Service start startup_audit_all: audit all subclouds")
self.audit_sc_cert_start(None)
return
all_subclouds = utils.get_subclouds_from_dcmanager(
self.token_cache.get_token(), INVALID_SUBCLOUD_AUDIT_DEPLOY_STATES
)
# Update sysinv endpoint cache
management_ips = {sc["name"]: sc["management_ip"] for sc in all_subclouds}
utils.SubcloudSysinvEndpointCache.cache_endpoints_by_ip(management_ips)
LOG.info(
"Service start: begin subcloud certificate audit [#sc: %d, batch: %s]"
% (len(all_subclouds), CONF.certmon.audit_batch_size)
)
for subcloud in all_subclouds:
if subcloud[utils.ENDPOINT_TYPE_DC_CERT] != utils.SYNC_STATUS_IN_SYNC:
subcloud_name = subcloud['name']
if self.sc_audit_queue.contains(subcloud_name):
LOG.info("%s is not in-sync but already under audit"
% subcloud_name)
else:
LOG.info("%s is not in-sync, adding it to audit"
% subcloud_name)
self.sc_audit_queue.enqueue(
subcloud_audit_queue.SubcloudAuditData(subcloud_name))
if self.sc_audit_queue.qsize() > 0:
LOG.info("Startup audit: %d subcloud(s) to be audited" %
self.sc_audit_queue.qsize())
else:
LOG.info("Startup audit: all subclouds are in-sync")
@periodic_task.periodic_task(spacing=5)
def audit_sc_cert_task(self, context):
"""This task runs every N seconds, and is responsible for running
a single subcloud through its next step in the subcloud audit process.
Pull up to <batch_count> number of ready-to-audit subcloud audit
data items from the sc_audit_queue, and spawn each item to be
executed via the GreenPool (or directly invoke the audit if the
GreenPool is disabled).
"""
for batch_count in range(CONF.certmon.audit_batch_size):
if self.sc_audit_queue.qsize() < 1:
# Nothing to do
return
# Only continue if the next in queue is ready to be audited
# Peek into the timestamp of the next item in our priority queue
next_audit_timestamp = self.sc_audit_queue.queue[0][0]
if next_audit_timestamp > int(time.time()):
LOG.debug("audit_sc_cert_task: no audits ready for "
"processing, qsize=%s"
% self.sc_audit_queue.qsize())
return
_, sc_audit_item = self.sc_audit_queue.get()
LOG.debug(
"audit_sc_cert_task: enqueue subcloud audit %s, "
"qsize:%s, batch:%s" %
(sc_audit_item, self.sc_audit_queue.qsize(), batch_count))
# This item is ready for audit
if self.sc_audit_pool is not None:
self.sc_audit_pool.spawn_n(self.do_subcloud_audit,
sc_audit_item)
else:
self.do_subcloud_audit(sc_audit_item)
eventlet.sleep()
def do_subcloud_audit(self, sc_audit_item):
"""A wrapper function to ensure the subcloud audit task is marked done
within sc_audit_queue"""
try:
self._subcloud_audit(sc_audit_item)
except Exception:
LOG.exception("An error occurred during the subcloud audit task")
finally:
self.sc_audit_queue.task_done()
def _subcloud_audit(self, sc_audit_item):
"""Invoke a subcloud audit"""
subcloud_name = sc_audit_item.name
LOG.info("Auditing subcloud %s, attempt #%s [qsize: %s]"
% (subcloud_name,
sc_audit_item.audit_count,
self.sc_audit_queue.qsize()))
def my_dc_token():
"""Ensure we always have a valid token"""
return self.dc_token_cache.get_token()
# Abort audit if subcloud is in a valid deploy status
subcloud = utils.get_subcloud(my_dc_token(), subcloud_name)
if subcloud['deploy-status'] in INVALID_SUBCLOUD_AUDIT_DEPLOY_STATES:
LOG.info(f"Subcloud {subcloud_name} is in an invalid deploy status:"
f" {subcloud['deploy-status']}, aborting audit")
return
subcloud_sysinv_url = None
try:
subcloud_sysinv_url = utils.SubcloudSysinvEndpointCache.build_endpoint(
subcloud["management-start-ip"]
)
utils.SubcloudSysinvEndpointCache.update_endpoints(
{subcloud_name: subcloud_sysinv_url}
)
sc_ssl_cert = utils.get_endpoint_certificate(
subcloud_sysinv_url,
timeout_secs=CONF.certmon.certificate_timeout_secs)
except Exception:
if not utils.is_subcloud_online(subcloud_name, my_dc_token()):
LOG.warn("Subcloud is not online, aborting audit: %s"
% subcloud_name)
return
# Handle network-level issues
# Re-enqueue the subcloud for reauditing
max_attempts = CONF.certmon.network_max_retry
if sc_audit_item.audit_count < max_attempts:
LOG.exception("Cannot retrieve ssl certificate for %s "
"via: %s (requeuing audit)"
% (subcloud_name, subcloud_sysinv_url))
self.requeue_audit_subcloud(sc_audit_item,
CONF.certmon.network_retry_interval)
else:
LOG.exception("Cannot retrieve ssl certificate for %s via: %s; "
"maximum retry limit exceeded [%d], giving up"
% (subcloud_name, subcloud_sysinv_url, max_attempts))
utils.update_subcloud_status(my_dc_token(), subcloud_name,
utils.SYNC_STATUS_OUT_OF_SYNC)
return
try:
secret = utils.get_sc_intermediate_ca_secret(subcloud_name)
check_list = ['ca.crt', 'tls.crt', 'tls.key']
for item in check_list:
if item not in secret.data:
raise Exception('%s certificate data missing: %s'
% (subcloud_name, item))
txt_ssl_cert = base64.decode_as_text(secret.data['tls.crt'])
txt_ssl_key = base64.decode_as_text(secret.data['tls.key'])
txt_ca_cert = base64.decode_as_text(secret.data['ca.crt'])
except Exception:
# Handle certificate-level issues
if not utils.is_subcloud_online(subcloud_name, my_dc_token()):
LOG.exception("Error getting subcloud intermediate cert. "
"Subcloud is not online, aborting audit: %s"
% subcloud_name)
return
LOG.exception("Cannot audit ssl certificate on %s. "
"Certificate is not ready." % subcloud_name)
# certificate is not ready, no reaudit. Will be picked up
# by certificate MODIFIED event if it comes back
return
cert_chain = txt_ssl_cert + txt_ca_cert
if not cutils.verify_intermediate_ca_cert(cert_chain, sc_ssl_cert):
# The subcloud needs renewal.
LOG.info("Updating %s intermediate CA as it is out-of-sync" %
subcloud_name)
# reaudit this subcloud after delay
self.requeue_audit_subcloud(sc_audit_item)
try:
utils.update_subcloud_ca_cert(my_dc_token(),
subcloud_name,
subcloud_sysinv_url,
txt_ca_cert,
txt_ssl_cert,
txt_ssl_key)
except Exception:
LOG.exception("Failed to update intermediate CA on %s"
% subcloud_name)
utils.update_subcloud_status(my_dc_token(), subcloud_name,
utils.SYNC_STATUS_OUT_OF_SYNC)
else:
LOG.info("%s intermediate CA cert is in-sync" % subcloud_name)
utils.update_subcloud_status(my_dc_token(), subcloud_name,
utils.SYNC_STATUS_IN_SYNC)
@periodic_task.periodic_task(spacing=CONF.certmon.retry_interval)
def retry_monitor_task(self, context):
# Failed tasks that need to be reattempted will be taken care here
@ -347,8 +68,6 @@ class CertificateMonManager(periodic_task.PeriodicTasks):
LOG.info("Start retry_monitor_task: #tasks in queue: %s" %
num_tasks)
# NOTE: this loop can potentially retry ALL subclouds, which
# may be a resource concern.
for task in tasks:
task_id = task.get_id()
LOG.info("retry_monitor_task: %s, attempt: %s"
@ -371,18 +90,8 @@ class CertificateMonManager(periodic_task.PeriodicTasks):
eventlet.sleep(0.1)
LOG.debug("End retry_monitor_task")
def start_audit(self):
LOG.info("Auditing interval %s" % CONF.certmon.audit_interval)
utils.init_keystone_auth_opts()
self.audit_thread = eventlet.greenthread.spawn(self.audit_cert_loop)
self.on_start_audit()
def init_dc_monitor(self):
self.dc_monitor = watcher.DC_CertWatcher()
self.dc_monitor.initialize(
audit_subcloud=lambda subcloud_name:
self.audit_subcloud(subcloud_name, allow_requeue=True),
invalid_deploy_states=INVALID_SUBCLOUD_AUDIT_DEPLOY_STATES)
def start_periodic_tasks(self):
self.task_thread = eventlet.greenthread.spawn(self.periodic_tasks_loop)
def init_systemlocalcacert_monitor(self):
self.systemlocalcacert_monitor = watcher.SystemLocalCACert_CertWatcher()
@ -400,7 +109,7 @@ class CertificateMonManager(periodic_task.PeriodicTasks):
self.openldapcert_monitor = watcher.OpenldapCert_CertWatcher()
self.openldapcert_monitor.initialize()
def start_monitor(self, dc_role):
def start_monitor(self):
while True:
try:
# init system-local-ca RCA cert monitor
@ -409,11 +118,6 @@ class CertificateMonManager(periodic_task.PeriodicTasks):
self.init_restapicert_monitor()
self.init_registrycert_monitor()
self.init_openldapcert_monitor()
# init dc monitor only if running in DC role
if dc_role in (constants.DISTRIBUTED_CLOUD_ROLE_SYSTEMCONTROLLER,
constants.DISTRIBUTED_CLOUD_ROLE_SUBCLOUD):
self.init_dc_monitor()
except Exception as e:
LOG.exception(e)
time.sleep(5)
@ -435,68 +139,28 @@ class CertificateMonManager(periodic_task.PeriodicTasks):
eventlet.greenthread.spawn(self.monitor_cert,
self.openldapcert_monitor))
if dc_role in (constants.DISTRIBUTED_CLOUD_ROLE_SYSTEMCONTROLLER,
constants.DISTRIBUTED_CLOUD_ROLE_SUBCLOUD):
self.mon_threads.append(
eventlet.greenthread.spawn(self.monitor_cert, self.dc_monitor))
def stop_monitor(self):
for mon_thread in self.mon_threads:
mon_thread.kill()
mon_thread.wait()
self.mon_threads = []
def stop_audit(self):
if self.audit_thread:
self.audit_thread.kill()
self.audit_thread.wait()
self.audit_thread = None
def stop_periodic_tasks(self):
if self.task_thread:
self.task_thread.kill()
self.task_thread.wait()
self.task_thread = None
def audit_cert_loop(self):
def periodic_tasks_loop(self):
while True:
try:
self.run_periodic_tasks(context=None)
time.sleep(1)
time.sleep(60)
except greenlet.GreenletExit:
break
except Exception as e:
LOG.exception(e)
def requeue_audit_subcloud(self, sc_audit_item, delay_secs=60):
if not self.sc_audit_queue.contains(sc_audit_item.name):
self.sc_audit_queue.enqueue(sc_audit_item, delay_secs)
def audit_subcloud(self, subcloud_name, allow_requeue=False, priority=None):
"""Enqueue a subcloud audit
allow_requeue: This can come from a watch after a DC certificate renew.
i.e., outside of the periodic subcloud audit tasks.
We allow a re-enqueue here with a new delay.
priority: When set, this value will be used instead of timestamp for
the position in the queue
"""
# Since there's no way to remove a subcloud from the queue, we always
# enqueue again if it came from a notification (priority == 0)
if self.sc_audit_queue.contains(subcloud_name) and priority != 0:
if (allow_requeue
and self.sc_audit_queue.enqueued_subcloud_names.count(
subcloud_name) < 2):
LOG.info("audit_subcloud: requeing %s" % subcloud_name)
else:
LOG.debug("audit_subcloud: ignoring %s, already in queue"
% subcloud_name)
return
self.sc_audit_queue.enqueue(
subcloud_audit_queue.SubcloudAuditData(subcloud_name),
allow_requeue=allow_requeue,
priority=priority,
)
def subcloud_sysinv_endpoint_update(self, subcloud_name, sysinv_url):
utils.SubcloudSysinvEndpointCache.update_endpoints(
{subcloud_name: sysinv_url}
)
def monitor_cert(self, monitor):
while True:
# never exit until exit signal received

View File

@ -1,145 +0,0 @@
# Copyright (c) 2020, 2022 Wind River Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# SPDX-License-Identifier: Apache-2.0
import pecan
from pecan import hooks
from oslo_context import context as base_context
from oslo_utils import encodeutils
from sysinv.api.policies import base as base_policy
from sysinv.common import policy
ALLOWED_WITHOUT_AUTH = '/'
class RequestContext(base_context.RequestContext):
"""Stores information about the security context.
The context encapsulates information related to the user accessing the
the system, as well as additional request information.
"""
def __init__(self, auth_token=None, user=None, project=None,
domain=None, user_domain=None, project_domain=None,
is_admin=None, read_only=False, show_deleted=False,
request_id=None, auth_url=None, trusts=None,
user_name=None, project_name=None, domain_name=None,
user_domain_name=None, project_domain_name=None,
auth_token_info=None, region_name=None, roles=None,
password=None, **kwargs):
"""Initializer of request context."""
# We still have 'tenant' param because oslo_context still use it.
super(RequestContext, self).__init__(
auth_token=auth_token, user=user, tenant=project,
domain=domain, user_domain=user_domain,
project_domain=project_domain, roles=roles,
read_only=read_only, show_deleted=show_deleted,
request_id=request_id)
# request_id might be a byte array
self.request_id = encodeutils.safe_decode(self.request_id)
# we save an additional 'project' internally for use
self.project = project
# Session for DB access
self._session = None
self.auth_url = auth_url
self.trusts = trusts
self.user_name = user_name
self.project_name = project_name
self.domain_name = domain_name
self.user_domain_name = user_domain_name
self.project_domain_name = project_domain_name
self.auth_token_info = auth_token_info
self.region_name = region_name
self.roles = roles or []
self.password = password
# Check user is admin or not
if is_admin is None:
self.is_admin = policy.authorize(
base_policy.ADMIN_IN_SYSTEM_PROJECTS, {}, self.to_dict(),
do_raise=False)
else:
self.is_admin = is_admin
def to_dict(self):
return {
'auth_url': self.auth_url,
'auth_token': self.auth_token,
'auth_token_info': self.auth_token_info,
'user': self.user,
'user_name': self.user_name,
'user_domain': self.user_domain,
'user_domain_name': self.user_domain_name,
'project': self.project,
'project_name': self.project_name,
'project_domain': self.project_domain,
'project_domain_name': self.project_domain_name,
'domain': self.domain,
'domain_name': self.domain_name,
'trusts': self.trusts,
'region_name': self.region_name,
'roles': self.roles,
'show_deleted': self.show_deleted,
'is_admin': self.is_admin,
'request_id': self.request_id,
'password': self.password,
}
@classmethod
def from_dict(cls, values):
return cls(**values)
def get_admin_context(show_deleted=False):
return RequestContext(is_admin=True, show_deleted=show_deleted)
def get_service_context(**args):
"""An abstraction layer for getting service context.
There could be multiple cloud backends for cert-mon to use. This
abstraction layer provides an indirection for cert-mon to get the
credentials of 'dcmanager' user on the specific cloud. By default,
this credential refers to the credentials built for dcmanager middleware
in an OpenStack cloud.
"""
pass
class AuthHook(hooks.PecanHook):
def before(self, state):
if state.request.path == ALLOWED_WITHOUT_AUTH:
return
req = state.request
identity_status = req.headers.get('X-Identity-Status')
service_identity_status = req.headers.get('X-Service-Identity-Status')
if (identity_status == 'Confirmed' or
service_identity_status == 'Confirmed'):
return
if req.headers.get('X-Auth-Token'):
msg = 'Auth token is invalid: %s' % req.headers['X-Auth-Token']
else:
msg = 'Authentication required'
msg = "Failed to validate access token: %s" % str(msg)
pecan.abort(status_code=401, detail=msg)

View File

@ -1,95 +0,0 @@
# Copyright (c) 2020, 2022 Wind River Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# SPDX-License-Identifier: Apache-2.0
import eventlet
from oslo_config import cfg
import oslo_messaging
from oslo_serialization import jsonutils
from oslo_log import log as logging
from sysinv.cert_mon import context
TRANSPORT = None
LOG = logging.getLogger(__name__)
class RequestContextSerializer(oslo_messaging.Serializer):
def __init__(self, base):
self._base = base
def serialize_entity(self, ctxt, entity):
if not self._base:
return entity
return self._base.serialize_entity(ctxt, entity)
def deserialize_entity(self, ctxt, entity):
if not self._base:
return entity
return self._base.deserialize_entity(ctxt, entity)
@staticmethod
def serialize_context(ctxt):
return ctxt.to_dict()
@staticmethod
def deserialize_context(ctxt):
return context.RequestContext.from_dict(ctxt)
class JsonPayloadSerializer(oslo_messaging.NoOpSerializer):
@classmethod
def serialize_entity(cls, context, entity):
return jsonutils.to_primitive(entity, convert_instances=True)
def setup(url=None, optional=False):
"""Initialise the oslo_messaging layer."""
global TRANSPORT
if url and url.startswith("fake://"):
# NOTE: oslo_messaging fake driver uses time.sleep
# for task switch, so we need to monkey_patch it
eventlet.monkey_patch(time=True)
if not TRANSPORT:
oslo_messaging.set_transport_defaults('dcmanager')
exmods = ['dcmanager.common.exception']
try:
TRANSPORT = oslo_messaging.get_transport(
cfg.CONF, url, allowed_remote_exmods=exmods)
except oslo_messaging.InvalidTransportURL as e:
TRANSPORT = None
if not optional or e.url:
raise
LOG.info(TRANSPORT)
def cleanup():
"""Cleanup the oslo_messaging layer."""
global TRANSPORT
if TRANSPORT:
TRANSPORT.cleanup()
TRANSPORT = None
def get_rpc_server(target, endpoint):
"""Return a configured oslo_messaging rpc server."""
serializer = RequestContextSerializer(JsonPayloadSerializer())
return oslo_messaging.get_rpc_server(TRANSPORT, target, [endpoint],
executor='eventlet',
serializer=serializer)

View File

@ -1,4 +1,4 @@
# Copyright (c) 2020-2023 Wind River Systems, Inc.
# Copyright (c) 2020-2023, 2025 Wind River Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -15,23 +15,12 @@
#
# SPDX-License-Identifier: Apache-2.0
import time
from oslo_config import cfg
from oslo_log import log as logging
import oslo_messaging
from oslo_service import service
from sysinv.cert_mon import messaging as rpc_messaging
from sysinv.cert_mon import utils
from sysinv.cert_mon.certificate_mon_manager import CertificateMonManager
from sysinv.common import constants
RPC_API_VERSION = '1.0'
TOPIC_DCMANAGER_NOTFICATION = 'DCMANAGER-NOTIFICATION'
DC_ROLE_TIMEOUT_SECONDS = 180
DC_ROLE_DELAY_SECONDS = 5
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
@ -42,92 +31,15 @@ class CertificateMonitorService(service.Service):
def __init__(self):
super(CertificateMonitorService, self).__init__()
self.rpc_api_version = RPC_API_VERSION
self.topic = TOPIC_DCMANAGER_NOTFICATION
self._rpc_server = None
self.target = None
self.dc_role = utils.DC_ROLE_UNDETECTED
self.manager = CertificateMonManager()
def start(self):
super(CertificateMonitorService, self).start()
self._get_dc_role()
self.manager.start_monitor(self.dc_role)
# Note: self.dc_role can be None (if non-DC system):
if self.dc_role == constants.DISTRIBUTED_CLOUD_ROLE_SYSTEMCONTROLLER:
self.manager.start_audit()
self.target = oslo_messaging.Target(
version=self.rpc_api_version,
server=CONF.host,
topic=self.topic)
self._rpc_server = rpc_messaging.get_rpc_server(self.target, self)
self._rpc_server.start()
elif self.dc_role == constants.DISTRIBUTED_CLOUD_ROLE_SUBCLOUD:
self.manager.start_audit()
utils.init_keystone_auth_opts()
self.manager.start_monitor()
self.manager.start_periodic_tasks()
def stop(self):
if self.dc_role == constants.DISTRIBUTED_CLOUD_ROLE_SYSTEMCONTROLLER:
self._stop_rpc_server()
self.manager.stop_audit()
elif self.dc_role == constants.DISTRIBUTED_CLOUD_ROLE_SUBCLOUD:
self.manager.stop_audit()
self.manager.stop_periodic_tasks()
self.manager.stop_monitor()
super(CertificateMonitorService, self).stop()
rpc_messaging.cleanup()
def _get_dc_role(self):
if self.dc_role != utils.DC_ROLE_UNDETECTED:
return self.dc_role
utils.init_keystone_auth_opts()
delay = DC_ROLE_DELAY_SECONDS
max_dc_role_attempts = DC_ROLE_TIMEOUT_SECONDS // delay
dc_role_attempts = 1
while dc_role_attempts < max_dc_role_attempts:
try:
self.dc_role = utils.get_dc_role()
return self.dc_role
except Exception as e:
LOG.info("Unable to get DC role: %s [attempt: %s]",
str(e), dc_role_attempts)
time.sleep(delay)
dc_role_attempts += 1
raise Exception('Failed to obtain DC role from keystone')
def _stop_rpc_server(self):
# Stop RPC server
# only for DISTRIBUTED_CLOUD_ROLE_SYSTEMCONTROLLER
if self._rpc_server:
try:
self._rpc_server.stop()
self._rpc_server.wait()
LOG.info('Engine service stopped successfully')
except Exception as ex:
LOG.error('Failed to stop engine service: %s' % ex)
LOG.exception(ex)
# TODO(gherzman): verify if it's possible to add the subcloud management IP
# as a parameter as a way to further optimize the audit request during
# subcloud deployment. Might require passing the parameter to the audit
# enqueue method as well.
def subcloud_online(self, context, subcloud_name=None):
"""
Trigger a subcloud online audit
"""
LOG.info("%s is online. An online audit is queued"
% subcloud_name)
# Set priority to 0 for the audit to be processed immediately
# Multiple audits with priority 0 will be processed in FIFO order
self.manager.audit_subcloud(subcloud_name, priority=0)
def subcloud_managed(self, context, subcloud_name=None):
"""
Trigger a subcloud audit
"""
LOG.info("%s is managed. An audit is queued"
% subcloud_name)
self.manager.audit_subcloud(subcloud_name)
def subcloud_sysinv_endpoint_update(self, ctxt, subcloud_name, endpoint):
"""Update sysinv endpoint of dc token cache"""
LOG.info("Update subcloud: %s sysinv endpoint" % subcloud_name)
self.manager.subcloud_sysinv_endpoint_update(subcloud_name, endpoint)

View File

@ -1,115 +0,0 @@
# Copyright (c) 2021-2022 Wind River Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# SPDX-License-Identifier: Apache-2.0
import heapq
import time
from eventlet.queue import PriorityQueue
from oslo_log import log
LOG = log.getLogger(__name__)
class SubcloudAuditData(object):
"""Representation of a subcloud under audit.
"""
def __init__(self, name, audit_count=0):
self.name = name
self.audit_count = audit_count
self.timestamp = 0
def __eq__(self, other):
return self.name == other.name
def __lt__(self, other):
"""Used in sorting the PriorityQueue"""
return self.timestamp < other.timestamp
def __hash__(self):
return hash(self.name)
def __str__(self):
return "SubcloudAuditData: {name: %s, audit_count: %s}" % (
self.name, self.audit_count)
class SubcloudAuditException(Exception):
"""Indicates subcloud audit issue"""
pass
class SubcloudAuditPriorityQueue(PriorityQueue):
"""A subclass of PriorityQueue which tracks enqueued subclouds"""
def _init(self, maxsize=None):
self.enqueued_subcloud_names = list()
PriorityQueue._init(self, maxsize)
@staticmethod
def __get_next_audit_timestamp(delay_secs):
next_audit_timestamp = int(time.time())
if delay_secs > 0:
next_audit_timestamp += delay_secs
return next_audit_timestamp
def contains(self, subcloud_name):
"""Check if subcloud is under audit"""
return subcloud_name in self.enqueued_subcloud_names
def enqueue(
self,
sc_audit_item,
delay_secs=0,
timestamp=None,
allow_requeue=False,
priority=None,
):
"""Custom top-level method to enqueue a subcloud in the audit
- convert delay to timestamp
- increment audit_count
"""
if (sc_audit_item.name in self.enqueued_subcloud_names
and not allow_requeue and priority != 0):
raise SubcloudAuditException("Subcloud already enqueued: %s"
% sc_audit_item.name)
if timestamp is None:
timestamp = self.__get_next_audit_timestamp(delay_secs)
else:
timestamp += delay_secs
# If priority is set, use it as the timestamp so we can move
# the subcloud to the front of the queue
if priority is not None:
timestamp = priority
# this PriorityQueue is ordered by the next timestamp:
sc_audit_item.audit_count += 1
sc_audit_item.timestamp = timestamp
self.put(
(timestamp, sc_audit_item)
)
def _get(self, heappop=heapq.heappop):
"""Modifies PriorityQueue.get() to track audited subcloud names"""
item = PriorityQueue._get(self, heappop)
self.enqueued_subcloud_names.remove(item[1].name)
return item
def _put(self, item, heappush=heapq.heappush):
"""Modifies PriorityQueue.put() to track audited subcloud names"""
subcloud_audit = item[1]
self.enqueued_subcloud_names.append(subcloud_audit.name)
LOG.info("Enqueued: %s" % str(subcloud_audit))
PriorityQueue._put(self, item, heappush)

View File

@ -1,4 +1,4 @@
# Copyright (c) 2020-2024 Wind River Systems, Inc.
# Copyright (c) 2020-2025 Wind River Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -16,21 +16,13 @@
# SPDX-License-Identifier: Apache-2.0
import json
import netaddr
import os
import re
import ssl
import socket
import tempfile
import requests
from eventlet.green import subprocess
from six.moves.urllib.parse import urlparse
from oslo_concurrency import lockutils
from oslo_config import cfg
from oslo_log import log
from oslo_utils import encodeutils
from oslo_serialization import base64
from six.moves.urllib.request import Request
from six.moves.urllib.error import HTTPError
from six.moves.urllib.error import URLError
@ -38,28 +30,9 @@ from six.moves.urllib.request import urlopen
from sysinv.common import constants
from sysinv.openstack.common.keystone_objects import Token
from sysinv.common import kubernetes as sys_kube
# Subcloud sync status
ENDPOINT_TYPE_DC_CERT = 'dc-cert'
SYNC_STATUS_UNKNOWN = "unknown"
SYNC_STATUS_IN_SYNC = "in-sync"
SYNC_STATUS_OUT_OF_SYNC = "out-of-sync"
DEPLOY_STATE_DONE = 'complete'
MANAGEMENT_UNMANAGED = "unmanaged"
MANAGEMENT_MANAGED = "managed"
AVAILABILITY_OFFLINE = "offline"
AVAILABILITY_ONLINE = "online"
CERT_NAMESPACE_SYS_CONTROLLER = 'dc-cert'
CERT_NAMESPACE_SUBCLOUD_CONTROLLER = 'sc-cert'
DC_ROLE_UNDETECTED = 'unknown'
ENDPOINT_LOCK_NAME = "sysinv-endpoints"
CERT_INSTALL_LOCK_NAME = "sysinv-certs"
LOG = log.getLogger(__name__)
@ -68,212 +41,6 @@ CONF = cfg.CONF
dc_role = DC_ROLE_UNDETECTED
internal_token_cache = None
dc_token_cache = None
def update_admin_ep_cert(token, ca_crt, tls_crt, tls_key):
service_type = 'platform'
service_name = 'sysinv'
sysinv_url = token.get_service_internal_url(service_type, service_name)
api_cmd = sysinv_url + '/certificate/certificate_renew'
api_cmd_payload = dict()
api_cmd_payload['certtype'] = constants.CERTIFICATE_TYPE_ADMIN_ENDPOINT
resp = rest_api_request(token, "POST", api_cmd, json.dumps(api_cmd_payload))
if 'result' in resp and resp['result'] == 'OK':
LOG.info('Update admin endpoint certificate request succeeded')
else:
LOG.error('Request response %s' % resp)
raise Exception('Update admin endpoint certificate failed')
def verify_adminep_cert_chain():
"""
Verify admin endpoint certificate chain & delete if invalid
:param context: an admin context.
:return: True/False if chain is valid
* Retrieve ICA & AdminEP cert secrets from k8s
* base64 decode ICA cert (tls.crt from SC_INTERMEDIATE_CA_SECRET_NAME)
* & adminep (tls.crt from SC_ADMIN_ENDPOINT_SECRET_NAME)
* & store the crts in tempfiles
* Run openssl verify against RootCA to verify the chain
"""
kube_op = sys_kube.KubeOperator()
secret_ica = kube_op.kube_get_secret(constants.SC_INTERMEDIATE_CA_SECRET_NAME,
CERT_NAMESPACE_SUBCLOUD_CONTROLLER)
if 'tls.crt' not in secret_ica.data:
raise Exception('%s tls.crt (ICA) data missing'
% (constants.SC_INTERMEDIATE_CA_SECRET_NAME))
secret_adminep = kube_op.kube_get_secret(constants.SC_ADMIN_ENDPOINT_SECRET_NAME,
CERT_NAMESPACE_SUBCLOUD_CONTROLLER)
if 'tls.crt' not in secret_adminep.data:
raise Exception('%s tls.crt data missing'
% (constants.SC_ADMIN_ENDPOINT_SECRET_NAME))
txt_ca_crt = base64.decode_as_text(secret_ica.data['tls.crt'])
txt_tls_crt = base64.decode_as_text(secret_adminep.data['tls.crt'])
with tempfile.NamedTemporaryFile() as ca_tmpfile:
ca_tmpfile.write(txt_ca_crt.encode('utf8'))
ca_tmpfile.flush()
with tempfile.NamedTemporaryFile() as adminep_tmpfile:
adminep_tmpfile.write(txt_tls_crt.encode('utf8'))
adminep_tmpfile.flush()
cmd = ['openssl', 'verify', '-CAfile', constants.DC_ROOT_CA_CERT_PATH,
'-untrusted', ca_tmpfile.name, adminep_tmpfile.name]
proc = subprocess.Popen(cmd, stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True)
stdout, stderr = proc.communicate()
proc.wait()
if 0 == proc.returncode:
LOG.info('verify_adminep_cert_chain passed. Valid chain')
return True
else:
LOG.info('verify_adminep_cert_chain: Chain is invalid\n%s\n%s'
% (stdout, stderr))
res = kube_op.kube_delete_secret(constants.SC_ADMIN_ENDPOINT_SECRET_NAME,
CERT_NAMESPACE_SUBCLOUD_CONTROLLER)
LOG.info('Deleting AdminEP secret due to invalid chain. %s:%s, result %s, msg %s'
% (CERT_NAMESPACE_SUBCLOUD_CONTROLLER,
constants.SC_ADMIN_ENDPOINT_SECRET_NAME,
res.status, res.message))
return False
def dc_get_service_endpoint_url(token,
service_name='dcmanager',
service_type='dcmanager',
region=constants.SYSTEM_CONTROLLER_REGION):
"""Pulls the dcmanager service internal URL from the given token"""
url = token.get_service_internal_url(service_type, service_name, region)
if url:
LOG.debug('%s %s endpoint %s' % (region, service_name, url))
return url
else:
LOG.error('Cannot find %s endpoint for %s' % (service_name, region))
raise Exception('Cannot find %s endpoint for %s' % (service_name, region))
def update_subcloud_ca_cert(
token, sc_name, sysinv_url, ca_crt, tls_crt, tls_key):
api_cmd = sysinv_url + '/certificate/certificate_renew'
api_cmd_payload = dict()
api_cmd_payload['certtype'] = \
constants.CERTIFICATE_TYPE_ADMIN_ENDPOINT_INTERMEDIATE_CA
api_cmd_payload['root_ca_crt'] = ca_crt
api_cmd_payload['sc_ca_cert'] = tls_crt
api_cmd_payload['sc_ca_key'] = tls_key
timeout = int(CONF.endpoint_cache.http_connect_timeout)
resp = rest_api_request(token, "POST", api_cmd,
json.dumps(api_cmd_payload), timeout=timeout)
if 'result' in resp and resp['result'] == 'OK':
LOG.info('Update %s intermediate CA cert request succeed' % sc_name)
else:
LOG.error('Request response %s' % resp)
raise Exception('Update %s intermediate CA cert failed' % sc_name)
def get_subcloud(token, subcloud_name):
api_url = dc_get_service_endpoint_url(token)
api_cmd = api_url + '/subclouds/%s' % subcloud_name
LOG.info('api_cmd %s' % api_cmd)
resp = rest_api_request(token, "GET", api_cmd)
return resp
def load_subclouds(resp, invalid_deploy_states=None):
sc_list = []
for obj in resp["subclouds"]:
if invalid_deploy_states and obj["deploy-status"] in invalid_deploy_states:
continue
sc = {}
sc["name"] = obj["name"]
sc["region-name"] = obj["region-name"]
sc["management-state"] = obj["management-state"]
sc["availability-status"] = obj["availability-status"]
sc["sync_status"] = obj["sync_status"]
sc["management_ip"] = obj["management-start-ip"]
for ss in obj["endpoint_sync_status"]:
sc[ss["endpoint_type"]] = ss["sync_status"]
sc_list.append(sc)
return sc_list
def get_subclouds_from_dcmanager(token, invalid_deploy_states=None):
api_url = dc_get_service_endpoint_url(token)
api_cmd = api_url + '/subclouds'
LOG.debug('api_cmd %s' % api_cmd)
resp = rest_api_request(token, "GET", api_cmd)
return load_subclouds(resp, invalid_deploy_states)
def is_subcloud_online(subcloud_name, token=None):
"""Check if subcloud is online"""
if not token:
token = get_cached_token()
subcloud_info = get_subcloud(token, subcloud_name)
if not subcloud_info:
LOG.error('Cannot find subcloud %s' % subcloud_name)
return False
return subcloud_info['availability-status'] == AVAILABILITY_ONLINE
def query_subcloud_online_with_deploy_state(
subcloud_name, invalid_deploy_states=None, token=None
):
"""Check if subcloud is online and not in an invalid deploy state"""
if not token:
token = get_cached_token()
subcloud_info = get_subcloud(token, subcloud_name)
if not subcloud_info:
LOG.error("Cannot find subcloud %s" % subcloud_name)
return False, None, None
subcloud_valid_state = False
if (
invalid_deploy_states
and subcloud_info["deploy-status"] in invalid_deploy_states
):
subcloud_valid_state = False
else:
subcloud_valid_state = (
subcloud_info["availability-status"] == AVAILABILITY_ONLINE
)
return (
subcloud_valid_state,
subcloud_info["availability-status"],
subcloud_info["deploy-status"],
)
def update_subcloud_status(token, subcloud_name, status):
api_url = dc_get_service_endpoint_url(token)
api_cmd = api_url + '/subclouds/%s/update_status' % subcloud_name
api_cmd_payload = dict()
api_cmd_payload['endpoint'] = ENDPOINT_TYPE_DC_CERT
api_cmd_payload['status'] = status
resp = rest_api_request(token, "PATCH",
api_cmd, json.dumps(api_cmd_payload))
if 'result' in resp and resp['result'] == 'OK':
LOG.info('Updated subcloud %s status: %s' % (subcloud_name, status))
else:
LOG.error("Failed to update subcloud %s status to '%s', resp=%s"
% (subcloud_name, status, resp))
raise Exception('Update subcloud status failed, subcloud=%s'
% subcloud_name)
def rest_api_request(token, method, api_cmd,
@ -395,25 +162,6 @@ def get_token():
return token
def get_dc_token(region_name=constants.SYSTEM_CONTROLLER_REGION):
"""Get token for the dcmanager user.
Note: Although region_name can be specified, the token used here is a
"project-scoped" token (i.e., not specific to the subcloud/region name).
A token obtained using one region_name can be re-used across any
subcloud. We take advantage of this in our DC token caching strategy.
"""
token = _get_token(
CONF.endpoint_cache.auth_uri + '/auth/tokens',
CONF.endpoint_cache.project_name,
CONF.endpoint_cache.username,
CONF.endpoint_cache.password,
CONF.endpoint_cache.user_domain_name,
CONF.endpoint_cache.project_domain_name,
region_name)
return token
def _get_token(auth_url,
auth_project,
username,
@ -511,73 +259,6 @@ def init_keystone_auth_opts():
cfg.CONF.register_opts(endpoint_opts, group=endpoint_cache_group.name)
def get_subcloud_secrets():
"""get subcloud name and ICA secret name pairs from k8s secret
Every subcloud comes with an ICA entry in k8s secret
:return: dict of subcloud name and ICA secret name pairs
"""
secret_pattern = re.compile('-adminep-ca-certificate$')
kube_op = sys_kube.KubeOperator()
secret_list = kube_op.kube_list_secret(ENDPOINT_TYPE_DC_CERT)
dict = {}
for secret in secret_list:
secret_name = secret.metadata.name
m = secret_pattern.search(secret_name)
if m:
start = m.start()
if start > 0:
dict.update({secret_name[0:m.start()]: secret_name})
return dict
def get_subclouds():
"""get name of all subclouds from k8s secret
Every subcloud comes with an ICA entry in k8s secret
:return: list of subcloud names
"""
subcloud_secrets = get_subcloud_secrets()
return list(subcloud_secrets.keys())
def get_intermediate_ca_secret_name(sc):
return '{}-adminep-ca-certificate'.format(sc)
def get_sc_intermediate_ca_secret(sc):
secret_name = get_intermediate_ca_secret_name(sc)
kube_op = sys_kube.KubeOperator()
return kube_op.kube_get_secret(secret_name, CERT_NAMESPACE_SYS_CONTROLLER)
def get_endpoint_certificate(endpoint, timeout_secs=10):
url = urlparse(endpoint)
host = url.hostname
port = url.port
if timeout_secs is not None and timeout_secs > 0:
# The call to ssl.get_server_certificate blocks for a long time if the
# server is not available. A timeout is not available in python 2.7.
# See https://bugs.python.org/issue31870
# Until the timeout=<val> option is available in
# get_server_certificate(), we first check if the port is open
# by connecting using a timeout, then we do the certificate check:
sock = None
try:
sock = socket.create_connection((host, port), timeout=timeout_secs)
except Exception:
LOG.warn("get_endpoint_certificate: connection failed to %s:%s",
host, port)
raise
finally:
if sock is not None:
sock.close()
return ssl.get_server_certificate((host, port))
def get_dc_role():
global dc_role
if dc_role == DC_ROLE_UNDETECTED:
@ -600,39 +281,6 @@ def get_dc_role():
return dc_role
def get_isystems_uuid(token):
uuid = ''
sysinv_url = token.get_service_internal_url(constants.SERVICE_TYPE_PLATFORM, constants.SYSINV_USERNAME)
api_cmd = sysinv_url + '/isystems'
res = rest_api_request(token, "GET", api_cmd)['isystems']
if len(res) == 1:
system = res[0]
uuid = system['uuid']
else:
raise Exception('Failed to access system data')
return uuid
def enable_https(token, system_uuid):
ret = True
sysinv_url = token.get_service_internal_url(constants.SERVICE_TYPE_PLATFORM, constants.SYSINV_USERNAME)
api_cmd = sysinv_url + '/isystems/' + system_uuid
patch = []
patch.append({'op': 'replace', 'path': '/https_enabled', 'value': 'true'})
resp = rest_api_request(token, "PATCH", api_cmd, json.dumps(patch))
if resp['capabilities']['https_enabled'] is True:
LOG.info('Enable https patch request succeeded')
else:
ret = False
LOG.exception('Enable https failed! resp=%s' % resp)
return ret
def upload_request_with_data(token, url, **kwargs):
headers = {"X-Auth-Token": token.get_id()}
files = {'file': ("for_upload",
@ -698,24 +346,6 @@ def list_platform_certificates(token):
return rest_api_request(token, "GET", api_cmd)
def uninstall_ca_certificate(token, uuid, cert_type):
"""Uninstall Trusted CA certificate using the sysinv API
:param token: the token to access the sysinv API
:param uuid: the installed certificate uuid
:param cert_type: the type of the certificate. Currently only
'ssl_ca' is supported
"""
LOG.info('Uninstalling certificate %s.' % uuid)
if cert_type != constants.CERT_MODE_SSL_CA:
LOG.error('Cannot uninstall CA certificate of type %s.' % cert_type)
return
sysinv_url = token.get_service_internal_url(constants.SERVICE_TYPE_PLATFORM,
constants.SYSINV_USERNAME)
api_cmd = sysinv_url + '/certificate/' + uuid
rest_api_request(token, "DELETE", api_cmd)
def update_platform_cert(token, cert_type, pem_file_path, force=False):
"""Update a platform certificate using the sysinv API
:param token: the token to access the sysinv API
@ -748,19 +378,16 @@ def update_platform_cert(token, cert_type, pem_file_path, force=False):
class TokenCache(object):
"""Simple token cache. This class holds one keystone token.
"""
token_getters = {'internal': get_token, 'dc': get_dc_token}
def __init__(self, token_type):
self._token = None
self._token_type = token_type
self._getter_func = self.token_getters[token_type]
def get_token(self):
"""Get a new token if required; otherwise use the cached token"""
if not self._token or self._token.is_expired():
LOG.debug("TokenCache %s, Acquiring new token, previous token: %s",
self._token_type, self._token)
self._token = self._getter_func()
self._token = get_token()
else:
LOG.debug("TokenCache %s, Token is still valid, reusing token: %s",
self._token_type, self._token)
@ -776,74 +403,3 @@ def get_internal_token_cache():
def get_cached_token():
return get_internal_token_cache().get_token()
def get_dc_token_cache():
global dc_token_cache
if not dc_token_cache:
dc_token_cache = TokenCache("dc")
return dc_token_cache
def get_cached_dc_token():
return get_dc_token_cache().get_token()
class SubcloudSysinvEndpointCache(object):
# Maps subcloud name to sysinv endpoint
cached_endpoints = {}
@classmethod
@lockutils.synchronized(ENDPOINT_LOCK_NAME)
def get_endpoint(cls, region_name: str, dc_token=None):
"""Retrieve the sysinv endpoint for the given region.
:param region_name: The subcloud region name.
:param dc_token: dcmanager token, if it's present and the endpoint is
not already cached, the endpoint will be obtained by querying dcmanager.
"""
endpoint = cls.cached_endpoints.get(region_name)
if endpoint is None:
if dc_token is None:
LOG.error(f'Cannot find sysinv endpoint for {region_name}')
raise Exception(f'Cannot find sysinv endpoint for {region_name}')
# Try to get it from dcmanager, this should rarely happen as the
# cache is already populated during cert-mon audit during service
# startup
LOG.info("Unable to find cached sysinv endpoint, querying dcmanager")
subcloud = get_subcloud(dc_token, region_name)
endpoint = cls.build_endpoint(subcloud["management-start-ip"])
cls.cached_endpoints[region_name] = endpoint
return endpoint
@classmethod
@lockutils.synchronized(ENDPOINT_LOCK_NAME)
def update_endpoints(cls, endpoints_dict: dict):
"""Update the cached endpoints with the provided dictionary.
:param endpoints_dict: A dictionary mapping region names to endpoint URLs.
"""
cls.cached_endpoints.update(endpoints_dict)
@classmethod
@lockutils.synchronized(ENDPOINT_LOCK_NAME)
def cache_endpoints_by_ip(cls, subcloud_mgmt_ips: dict):
"""Cache endpoints based on management IPs.
:param subcloud_mgmt_ips: A dictionary mapping region names to management IPs.
"""
endpoints = {}
for region, ip in subcloud_mgmt_ips.items():
endpoints[region] = cls.build_endpoint(ip)
cls.cached_endpoints.clear()
cls.cached_endpoints.update(endpoints)
@staticmethod
def build_endpoint(ip: str):
"""Build the sysinv endpoint from the subcloud management IP.
:param ip: The management IP of a subcloud.
"""
formatted_ip = f"[{ip}]" if netaddr.IPAddress(ip).version == 6 else ip
return f"https://{formatted_ip}:6386/v1"

View File

@ -1,4 +1,4 @@
# Copyright (c) 2020-2024 Wind River Systems, Inc.
# Copyright (c) 2020-2025 Wind River Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -17,8 +17,6 @@
import hashlib
import json
import os
import re
from cryptography import x509
from cryptography.hazmat.backends import default_backend
@ -29,6 +27,7 @@ from kubernetes import config
from kubernetes import watch
from kubernetes.client import Configuration
from kubernetes.client.rest import ApiException
from oslo_concurrency import lockutils
from oslo_config import cfg
from oslo_log import log
from oslo_serialization import base64
@ -55,11 +54,10 @@ CONF = cfg.CONF
class MonitorContext(object):
"""Context data for watches"""
def __init__(self):
self.dc_role = None
self.kubernetes_namespace = None
def initialize(self):
self.dc_role = utils.get_dc_role()
pass
# Reuse cached tokens across all contexts
# (i.e. all watches reuse these caches)
@ -69,11 +67,6 @@ class MonitorContext(object):
"""Uses the cached local access token"""
return utils.get_cached_token()
@staticmethod
def get_dc_token():
"""Uses the cached DC token for subcloud"""
return utils.get_cached_dc_token()
class CertUpdateEventData(object):
def __init__(self, event_data):
@ -368,33 +361,6 @@ class CertWatcher(object):
raise
class DC_CertWatcher(CertWatcher):
def __init__(self):
super(DC_CertWatcher, self).__init__()
def initialize(self, audit_subcloud, invalid_deploy_states):
self.context.initialize()
dc_role = self.context.dc_role
LOG.info('DC role: %s' % dc_role)
if dc_role == constants.DISTRIBUTED_CLOUD_ROLE_SUBCLOUD:
ns = utils.CERT_NAMESPACE_SUBCLOUD_CONTROLLER
elif dc_role == constants.DISTRIBUTED_CLOUD_ROLE_SYSTEMCONTROLLER:
ns = utils.CERT_NAMESPACE_SYS_CONTROLLER
else:
ns = ''
self.namespace = ns
self.context.kubernetes_namespace = ns
self.register_listener(AdminEndpointRenew(self.context))
if dc_role == constants.DISTRIBUTED_CLOUD_ROLE_SYSTEMCONTROLLER:
self.register_listener(
DCIntermediateCertRenew(
self.context, audit_subcloud, invalid_deploy_states
)
)
self.register_listener(RootCARenew(self.context))
class SystemLocalCACert_CertWatcher(CertWatcher):
def __init__(self):
super(SystemLocalCACert_CertWatcher, self).__init__()
@ -505,223 +471,6 @@ class CertificateRenew(CertWatcherListener):
self.update_certificate(event_data)
class AdminEndpointRenew(CertificateRenew):
def __init__(self, context):
super(AdminEndpointRenew, self).__init__(context)
role = self.context.dc_role
if role == constants.DISTRIBUTED_CLOUD_ROLE_SYSTEMCONTROLLER:
self.secret_name = constants.DC_ADMIN_ENDPOINT_SECRET_NAME
elif role == constants.DISTRIBUTED_CLOUD_ROLE_SUBCLOUD:
self.secret_name = constants.SC_ADMIN_ENDPOINT_SECRET_NAME
else:
self.secret_name = None
def check_filter(self, event_data):
if self.secret_name == event_data.secret_name:
return self.certificate_is_ready(event_data)
else:
return False
def update_certificate(self, event_data):
token = self.context.get_token()
role = self.context.dc_role
utils.update_admin_ep_cert(token, event_data.ca_crt, event_data.tls_crt,
event_data.tls_key)
# In subclouds, it was observed that sometimes old ICA was used
# to sign adminep-cert. Here we run a verification to confirm that
# the chain is valid & delete secret if chain fails
if role == constants.DISTRIBUTED_CLOUD_ROLE_SUBCLOUD:
utils.verify_adminep_cert_chain()
class DCIntermediateCertRenew(CertificateRenew):
def __init__(self, context, audit_subcloud, invalid_deploy_states):
super(DCIntermediateCertRenew, self).__init__(context)
self.invalid_deploy_states = invalid_deploy_states
self.secret_pattern = re.compile('-adminep-ca-certificate$')
self.audit_subcloud = audit_subcloud
def check_filter(self, event_data):
search_result = self.secret_pattern.search(event_data.secret_name)
if search_result and search_result.start() > 0:
# Ensure subcloud is in a valid deploy-status and online (watch
# events can fire for secrets before the subcloud first comes online)
subcloud_name = self._get_subcloud_name(event_data)
try:
(
subcloud_valid_state,
availability_status,
deploy_status,
) = utils.query_subcloud_online_with_deploy_state(
subcloud_name,
invalid_deploy_states=self.invalid_deploy_states,
token=self.context.get_token(),
)
if not subcloud_valid_state:
LOG.info(
"%s check_filter: subcloud %s is ignored, "
"availability=%s, deploy_status: %s",
self.__class__.__name__,
subcloud_name,
availability_status,
deploy_status,
)
return False
except Exception:
LOG.exception(
"Failed to check subcloud availability: %s" % subcloud_name
)
return False
return self.certificate_is_ready(event_data)
else:
return False
def _get_subcloud_name(self, event_data):
m = self.secret_pattern.search(event_data.secret_name)
return event_data.secret_name[0:m.start()]
def update_certificate(self, event_data):
subcloud_name = self._get_subcloud_name(event_data)
LOG.info('update_certificate: subcloud %s %s', subcloud_name,
event_data)
token = self.context.get_dc_token()
subcloud_sysinv_url = utils.SubcloudSysinvEndpointCache.get_endpoint(
subcloud_name, token
)
utils.update_subcloud_ca_cert(token,
subcloud_name,
subcloud_sysinv_url,
event_data.ca_crt,
event_data.tls_crt,
event_data.tls_key)
self.audit_subcloud(subcloud_name)
def action_failed(self, event_data):
sc_name = self._get_subcloud_name(event_data)
LOG.info('Attempt to update intermediate CA cert for %s has failed' %
sc_name)
# verify subcloud is under managed and online
token = self.context.get_token()
sc = utils.get_subcloud(token, sc_name)
if not sc:
LOG.error('Cannot find subcloud %s' % sc_name)
else:
LOG.info('%s is %s %s. Software version %s' %
(sc_name,
sc['management-state'],
sc['availability-status'],
sc['software-version']))
if sc['management-state'] == utils.MANAGEMENT_MANAGED:
# don't do anything until subcloud managed
for status in sc['endpoint_sync_status']:
if status['endpoint_type'] == utils.ENDPOINT_TYPE_DC_CERT and \
status['sync_status'] != utils.SYNC_STATUS_OUT_OF_SYNC:
LOG.info('Updating %s intermediate CA has failed. Mark %s '
'as dc-cert %s' % (sc_name, sc_name,
utils.SYNC_STATUS_OUT_OF_SYNC))
# update subcloud to dc-cert out-of-sync b/c last intermediate
# CA cert was not updated successfully
# an audit (default within 24 hours) will pick up and reattempt
dc_token = self.context.get_dc_token()
utils.update_subcloud_status(dc_token, sc_name,
utils.SYNC_STATUS_OUT_OF_SYNC)
break
class RootCARenew(CertificateRenew):
def __init__(self, context):
super(RootCARenew, self).__init__(context)
self.secrets_to_recreate = []
def notify_changed(self, event_data):
if self.check_filter(event_data):
self.do_action(event_data)
else:
return False
def check_filter(self, event_data):
if self.context.dc_role != \
constants.DISTRIBUTED_CLOUD_ROLE_SYSTEMCONTROLLER or \
event_data.secret_name != constants.DC_ADMIN_ROOT_CA_SECRET_NAME or \
not self.certificate_is_ready(event_data):
return False
# check against current root CA cert to see if it is updated
if not os.path.isfile(constants.DC_ROOT_CA_CERT_PATH):
return True
with open(constants.DC_ROOT_CA_CERT_PATH, 'r') as f:
crt = f.read()
m = hashlib.md5()
m.update(encodeutils.safe_encode(event_data.ca_crt))
md5sum = m.hexdigest()
if crt.strip() != event_data.ca_crt:
LOG.info('%s check_filter[%s]: root ca certificate has changed. md5sum %s'
% (self.__class__.__name__, event_data.secret_name, md5sum))
# a root CA update, all required secrets needs to be recreated
self.secrets_to_recreate = []
return True
else:
LOG.info('%s check_filter[%s]: root ca certificate remains the same. md5sum %s'
% (self.__class__.__name__, event_data.secret_name, md5sum))
return False
def do_action(self, event_data):
LOG.info('%s do_action: %s' % (self.__class__.__name__, event_data))
if len(self.secrets_to_recreate) == 0:
self.update_certificate(event_data)
self.recreate_secrets()
def action_failed(self, event_data):
LOG.Error('Updating root CA certificate has failed.')
if len(self.secrets_to_recreate) > 0:
LOG.Error('%s are not refreshed' % self.secrets_to_recreate)
self.secrets_to_recreate = []
def update_certificate(self, event_data):
# currently the root CA cert renewal does not replace private key
# This is not ideal it is caused by a cert-manager issue.
# The root CA cert is to be updated by when the admin endpoint
# certification is updated
# https://github.com/jetstack/cert-manager/issues/2978
self.secrets_to_recreate = self.get_secrets_to_recreate()
LOG.info('Secrets to be recreated %s' % self.secrets_to_recreate)
@staticmethod
def get_secrets_to_recreate():
secret_names = list(utils.get_subcloud_secrets().values())
secret_names.insert(0, constants.DC_ADMIN_ENDPOINT_SECRET_NAME)
return secret_names
def recreate_secrets(self):
kube_op = sys_kube.KubeOperator()
secret_list = self.secrets_to_recreate[:]
for secret in secret_list:
try:
LOG.info('Recreate %s:%s' % (utils.CERT_NAMESPACE_SYS_CONTROLLER, secret))
kube_op.kube_delete_secret(secret, utils.CERT_NAMESPACE_SYS_CONTROLLER)
except Exception as e:
LOG.error('Deleting secret %s:%s. Error %s' %
(utils.CERT_NAMESPACE_SYS_CONTROLLER, secret, e))
else:
self.secrets_to_recreate.remove(secret)
if len(self.secrets_to_recreate) > 0:
# raise exception to keep reattempting
raise Exception('Some secrets were not recreated successfully')
class TrustedCARenew(CertificateRenew):
"""Handles a renew event for a certificate that must be installed as a trusted platform cert.
"""
@ -800,7 +549,7 @@ class PlatformCertRenew(CertificateRenew):
else:
return False
@utils.lockutils.synchronized(utils.CERT_INSTALL_LOCK_NAME)
@lockutils.synchronized(utils.CERT_INSTALL_LOCK_NAME)
def update_platform_certificate(self, event_data, cert_type, force=False):
"""Update a platform certificate

View File

@ -1,4 +1,4 @@
# Copyright (c) 2020, 2022 Wind River Systems, Inc.
# Copyright (c) 2020, 2022, 2025 Wind River Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -17,7 +17,6 @@
from oslo_config import cfg
from oslo_log import log as logging
from oslo_service import service
from sysinv.cert_mon import messaging
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
@ -27,16 +26,8 @@ def main():
logging.register_options(CONF)
CONF(project='sysinv', prog='certmon')
common_opts = [
cfg.StrOpt('host',
default='localhost',
help='hostname of the machine')
]
CONF.register_opts(common_opts)
logging.set_defaults()
logging.setup(cfg.CONF, 'certmon')
messaging.setup()
from sysinv.cert_mon import service as cert_mon
LOG.info("Configuration:")

View File

@ -1,5 +1,5 @@
#
# Copyright (c) 2020 Wind River Systems, Inc.
# Copyright (c) 2020, 2025 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
@ -12,14 +12,11 @@ from sysinv.common import utils
class CertMonPuppet(openstack.OpenstackBasePuppet):
"""Class to encapsulate puppet operations for sysinv configuration"""
SYSINV_SERVICE_NAME = 'sysinv'
DC_SERVICE_NAME = 'dcmanager'
def get_secure_static_config(self):
sysinv_kspass = self._get_service_password(self.SYSINV_SERVICE_NAME)
dc_kspass = self._get_service_password(self.DC_SERVICE_NAME)
return {
'sysinv::certmon::local_keystone_password': sysinv_kspass,
'sysinv::certmon::dc_keystone_password': dc_kspass,
}
def get_system_config(self):
@ -41,19 +38,6 @@ class CertMonPuppet(openstack.OpenstackBasePuppet):
'sysinv::certmon::local_region_name': self._keystone_region_name(),
})
if self._distributed_cloud_role() == constants.DISTRIBUTED_CLOUD_ROLE_SYSTEMCONTROLLER:
dc_user = self._get_service_user_name(self.DC_SERVICE_NAME),
config.update({
# The auth info for DC authentication
'sysinv::certmon::dc_keystone_auth_uri': self._keystone_auth_uri(host),
'sysinv::certmon::dc_keystone_identity_uri': self._keystone_identity_uri(host),
'sysinv::certmon::dc_keystone_project_domain': self._get_service_project_domain_name(),
'sysinv::certmon::dc_keystone_tenant': self._get_service_project_name(),
'sysinv::certmon::dc_keystone_user': dc_user,
'sysinv::certmon::dc_keystone_user_domain': self._get_service_user_domain_name(),
'sysinv::certmon::dc_region_name': self._keystone_region_name(),
})
return config
def get_public_url(self):

View File

@ -1,4 +1,4 @@
# Copyright (c) 2020-2024 Wind River Systems, Inc.
# Copyright (c) 2020-2025 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
@ -10,15 +10,12 @@ import filecmp
import json
import mock
import os.path
import time
from oslo_serialization import base64
from sysinv.common import constants
from sysinv.common import exception
from sysinv.cert_mon import service as cert_mon
from sysinv.cert_mon import certificate_mon_manager as cert_mon_manager
from sysinv.cert_mon import subcloud_audit_queue
from sysinv.cert_mon import utils as cert_mon_utils
from sysinv.cert_mon import watcher as cert_mon_watcher
from sysinv.openstack.common.keystone_objects import Token
@ -84,26 +81,6 @@ class CertMonTestCase(base.DbTestCase):
region_name = 'RegionOne'
return Token(token_json, token_id, region_name)
def test_get_isystems_uuid(self):
isystems_file = self.get_data_file_path("isystems")
with open(isystems_file, 'r') as ifile:
self.rest_api_request_result = json.load(ifile)
token = self.keystone_token
ret = cert_mon_utils.get_isystems_uuid(token)
assert ret == 'fdc60cf3-3330-4438-859d-b0da19e9663d'
def test_enable_https(self):
isystems_file = self.get_data_file_path("isystems")
with open(isystems_file, 'r') as ifile:
isystems_json = json.load(ifile)
# The PATCH api response doesn't include the 'isystems[]' json list section
self.rest_api_request_result = isystems_json['isystems'][0]
token = self.keystone_token
ret = cert_mon_utils.enable_https(token, 'fdc60cf3-3330-4438-859d-b0da19e9663d')
assert ret is True
def test_list_platform_certificates(self):
patcher = mock.patch('sysinv.cert_mon.utils.rest_api_request')
mocked_rest_api_get = patcher.start()
@ -124,33 +101,6 @@ class CertMonTestCase(base.DbTestCase):
actual_certificates = cert_mon_utils.list_platform_certificates(self.keystone_token)
self.assertEqual(actual_certificates, mock_certificates)
def test_uninstall_ca_certificate(self):
patcher = mock.patch('sysinv.cert_mon.utils.rest_api_request')
mocked_rest_api_req = patcher.start()
self.addCleanup(patcher.stop)
mock_uuid = '12345678-9abc-defg-hijk-lmnopqrstuvw'
cert_type = constants.CERT_MODE_SSL_CA
method = 'DELETE'
class AnyStringContaining(str):
def __eq__(self, string):
return self in string
cert_mon_utils.uninstall_ca_certificate(self.keystone_token, mock_uuid, cert_type)
mocked_rest_api_req.assert_called_once_with(self.keystone_token, method, AnyStringContaining(mock_uuid))
def test_uninstall_ca_certificate_not_allowed_type(self):
patcher = mock.patch('sysinv.cert_mon.utils.rest_api_request')
mocked_rest_api_req = patcher.start()
self.addCleanup(patcher.stop)
mock_uuid = '12345678-9abc-defg-hijk-lmnopqrstuvw'
cert_type = 'invalid'
cert_mon_utils.uninstall_ca_certificate(self.keystone_token, mock_uuid, cert_type)
mocked_rest_api_req.assert_not_called()
def get_registry_watcher(self):
class FakeContext(object):
def get_token(self):
@ -554,88 +504,6 @@ class CertMonTestCase(base.DbTestCase):
}
}
def test_audit_sc_cert_task_shallow(self):
"""Test the audit_sc_cert_task basic queuing functionality.
Mocks beginning at do_subcloud_audit"""
with mock.patch.object(cert_mon_manager.CertificateMonManager,
"do_subcloud_audit") as mock_do_subcloud_audit:
mock_do_subcloud_audit.return_value = None
cmgr = cert_mon_manager.CertificateMonManager()
cmgr.use_sc_audit_pool = False # easier for testing in serial
cmgr.sc_audit_queue.enqueue(
subcloud_audit_queue.SubcloudAuditData("test1"), delay_secs=1)
cmgr.sc_audit_queue.enqueue(
subcloud_audit_queue.SubcloudAuditData("test2"), delay_secs=2)
self.assertEqual(cmgr.sc_audit_queue.qsize(), 2)
# Run audit immediately, it should not have picked up anything
cmgr.audit_sc_cert_task(None)
mock_do_subcloud_audit.assert_not_called()
self.assertEqual(cmgr.sc_audit_queue.qsize(), 2)
time.sleep(3)
cmgr.audit_sc_cert_task(None)
# It should now be drained:
mock_do_subcloud_audit.assert_called()
self.assertEqual(cmgr.sc_audit_queue.qsize(), 0)
mock_do_subcloud_audit.reset_mock()
cmgr.audit_sc_cert_task(None)
mock_do_subcloud_audit.assert_not_called()
def test_audit_sc_cert_task_deep(self):
"""Test the audit_sc_cert_task basic queuing functionality"""
with mock.patch.multiple("sysinv.cert_mon.utils",
SubcloudSysinvEndpointCache=mock.DEFAULT,
get_endpoint_certificate=mock.DEFAULT,
get_sc_intermediate_ca_secret=mock.DEFAULT,
is_subcloud_online=mock.DEFAULT,
get_token=mock.DEFAULT,
get_dc_token=mock.DEFAULT,
update_subcloud_status=mock.DEFAULT,
update_subcloud_ca_cert=mock.DEFAULT) \
as utils_mock:
# returns an SSL cert in PEM-encoded string
utils_mock["SubcloudSysinvEndpointCache"].get_endpoint.return_value \
= "https://example.com"
utils_mock["get_endpoint_certificate"].return_value \
= self._get_valid_certificate_pem()
utils_mock["get_sc_intermediate_ca_secret"].return_value \
= self._get_sc_intermediate_ca_secret()
utils_mock["is_subcloud_online"].return_value = True
utils_mock["get_dc_token"].return_value = None # don"t care
utils_mock["update_subcloud_status"].return_value = None
utils_mock["update_subcloud_ca_cert"].return_value = None
# also need to mock the TokenCache
with mock.patch.multiple("sysinv.cert_mon.utils.TokenCache",
get_token=mock.DEFAULT) \
as token_cache_mock:
token_cache_mock["get_token"].return_value = None # don"t care
cmgr = cert_mon_manager.CertificateMonManager()
cmgr.use_sc_audit_pool = False # easier for testing in serial
cmgr.sc_audit_queue.enqueue(
subcloud_audit_queue.SubcloudAuditData("test1"),
delay_secs=1)
cmgr.sc_audit_queue.enqueue(
subcloud_audit_queue.SubcloudAuditData("test2"),
delay_secs=2)
self.assertEqual(cmgr.sc_audit_queue.qsize(), 2)
# Run audit immediately, it should not have picked up anything
cmgr.audit_sc_cert_task(None)
self.assertEqual(cmgr.sc_audit_queue.qsize(), 2)
time.sleep(3)
cmgr.audit_sc_cert_task(None)
# It should now be drained:
self.assertEqual(cmgr.sc_audit_queue.qsize(), 0)
def test_token_cache(self):
"""Basic test case for TokenCache"""
@ -651,7 +519,7 @@ class CertMonTestCase(base.DbTestCase):
token_cache = cert_mon_utils.TokenCache('internal')
# override the cache getter function for our test:
token_cache._getter_func = get_cache_test_token
cert_mon_utils.get_token = get_cache_test_token
token = token_cache.get_token()
self.assertEqual(token.get_id(), "token1")

View File

@ -1,130 +0,0 @@
# Copyright (c) 2021 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
"""
Test class for Sysinv subcloud_audit
"""
import time
from sysinv.cert_mon.subcloud_audit_queue import SubcloudAuditData
from sysinv.cert_mon.subcloud_audit_queue import SubcloudAuditException
from sysinv.cert_mon.subcloud_audit_queue import SubcloudAuditPriorityQueue
from sysinv.tests.db import base
class SubcloudAuditTestCase(base.DbTestCase):
"""Test cases for subcloud_audit.py classes"""
def setUp(self):
super(SubcloudAuditTestCase, self).setUp()
# Set up objects for testing
self.sc_audit_queue = SubcloudAuditPriorityQueue()
def tearDown(self):
super(SubcloudAuditTestCase, self).tearDown()
def test_audit_item(self):
print("Running test_audit_item")
item1 = SubcloudAuditData("item1")
self.assertEqual(item1.name, "item1")
self.assertEqual(item1.audit_count, 0)
self.assertEqual(item1, SubcloudAuditData("item1", 0))
self.assertEqual(item1, SubcloudAuditData("item1", 1))
def test_subcloud_audit_queue_single(self):
sc_name = "subcloud1"
subcloud = SubcloudAuditData(sc_name)
self.sc_audit_queue.enqueue(subcloud)
assert self.sc_audit_queue.contains(sc_name)
assert self.sc_audit_queue.qsize() == 1
# peek using the underlying queue
_, sc_audit_item1 = self.sc_audit_queue.queue[0]
assert sc_audit_item1.name == sc_name
assert sc_audit_item1.audit_count == 1
def test_subcloud_audit_queue_multiple(self):
subclouds = [SubcloudAuditData("subcloud%s" % i) for i in range(20)]
delay = 0
for i in range(20):
self.sc_audit_queue.enqueue(subclouds[i], delay)
delay += 10
assert self.sc_audit_queue.qsize() == 20
_, first = self.sc_audit_queue.get()
assert first.name == subclouds[0].name
assert not self.sc_audit_queue.contains(subclouds[0].name)
assert self.sc_audit_queue.qsize() == 19
# re-enqueue with no delay; it should come out first again
self.sc_audit_queue.enqueue(first, 0)
_, first = self.sc_audit_queue.get()
assert first.name == subclouds[0].name
timestamp, second = self.sc_audit_queue.get()
assert second.name == subclouds[1].name
# The time now should be well under the timestamp for this item
assert int(time.time()) < timestamp
def test_subcloud_audit_queue_custom_timestamp(self):
subclouds = [SubcloudAuditData("subcloud%s" % i) for i in range(20)]
timestamp = 0
for i in range(20):
self.sc_audit_queue.enqueue(subclouds[i], timestamp=timestamp)
timestamp += 10
assert self.sc_audit_queue.qsize() == 20
_, first = self.sc_audit_queue.get()
assert first.name == subclouds[0].name
assert not self.sc_audit_queue.contains(subclouds[0].name)
assert self.sc_audit_queue.qsize() == 19
# re-enqueue with no delay; it should come out first again
self.sc_audit_queue.enqueue(first, timestamp=0)
_, first = self.sc_audit_queue.get()
assert first.name == subclouds[0].name
assert first == subclouds[0]
self.sc_audit_queue.enqueue(subclouds[0], timestamp=10000)
prev_timestamp = 0
for i in range(19):
next_timestamp, next_item = self.sc_audit_queue.get()
assert next_timestamp > prev_timestamp
assert next_item.name != subclouds[0].name
prev_timestamp = next_timestamp
next_timestamp, next_item = self.sc_audit_queue.get()
assert next_timestamp == 10000
assert next_item.name == subclouds[0].name
def test_subcloud_audit_requeue(self):
subclouds = [SubcloudAuditData("subcloud%s" % i, 0) for i in range(20)]
timestamp = 0
for i in range(20):
self.sc_audit_queue.enqueue(subclouds[i], timestamp=timestamp)
timestamp += 10
assert self.sc_audit_queue.qsize() == 20
assert self.sc_audit_queue.contains(subclouds[0].name)
got_exception = False
try:
self.sc_audit_queue.enqueue(subclouds[0], timestamp=timestamp)
except SubcloudAuditException:
got_exception = True
assert got_exception
got_exception = False
try:
self.sc_audit_queue.enqueue(
subclouds[0], timestamp=timestamp, allow_requeue=True
)
except SubcloudAuditException:
got_exception = True
assert not got_exception
count = 0
for name in self.sc_audit_queue.enqueued_subcloud_names:
if name == subclouds[0].name:
count += 1
assert count == 2