Remove RPC Hybrid Mode

The RPC Hybrid Mode using both RabbitMQ and ZeroMQ was only
required during the upgrade from a previous version (21.12).
This change remove this mode.

NOTE: The unit tests use a fake implementation of the
openstack.common.rpc backend (RabbitMQ). These tests must
be adjusted so the legacy openstack rpc code can be removed
from the source tree.
I've marked this as TODO(RemoveOpenstackRPCBackend) to
be fixed at a later time.

Test Plan:
PASS: build-pkgs
PASS: Build ISO, install, bootstrap and host-unlock

Story: 2011343
Task: 51659

Change-Id: I521cffbe4604769b3f7ffa6cd464b6b41897cc26
Signed-off-by: Alyson Deives Pereira <alyson.deivespereira@windriver.com>
This commit is contained in:
Alyson Deives Pereira 2025-02-04 18:23:58 -03:00
parent 9d497fb343
commit 4e4f7e5b03
10 changed files with 49 additions and 197 deletions

View File

@ -1,5 +1,5 @@
#
# Copyright (c) 2016-2024 Wind River Systems, Inc.
# Copyright (c) 2016-2025 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
#
@ -40,7 +40,6 @@ from tsconfig.tsconfig import CONTROLLER_UPGRADE_FLAG
from tsconfig.tsconfig import CONTROLLER_UPGRADE_COMPLETE_FLAG
from tsconfig.tsconfig import CONTROLLER_UPGRADE_FAIL_FLAG
from tsconfig.tsconfig import CONTROLLER_UPGRADE_STARTED_FLAG
from tsconfig.tsconfig import SYSINV_HYBRID_RPC_FLAG
from controllerconfig.common import constants
from controllerconfig import utils as cutils
@ -924,11 +923,6 @@ def upgrade_controller(from_release, to_release):
LOG.error("Failed to stop %s service" % "sysinv-agent")
raise
# Creating Sysinv Hybrid Mode flag
# TODO(RPCHybridMode): This is only required for 21.12 -> 22.12 upgrades.
# Remove in future release.
open(SYSINV_HYBRID_RPC_FLAG, "w").close()
# Mount required filesystems from mate controller
LOG.info("Mounting filesystems")
nfs_mount_filesystem(PLATFORM_PATH)

View File

@ -17,7 +17,7 @@
# License for the specific language governing permissions and limitations
# under the License.
#
# Copyright (c) 2013-2024 Wind River Systems, Inc.
# Copyright (c) 2013-2025 Wind River Systems, Inc.
#
@ -69,15 +69,12 @@ from sysinv.common import constants
from sysinv.common import exception
from sysinv.common import service
from sysinv.common import utils
from sysinv.objects import base as objects_base
from sysinv.puppet import common as puppet
from sysinv.conductor import rpcapiproxy as conductor_rpcapi
from sysinv.openstack.common.rpc.common import Timeout
from sysinv.openstack.common.rpc.common import serialize_remote_exception
from sysinv.openstack.common.rpc import service as rpc_service
from sysinv.openstack.common.rpc.common import RemoteError
from sysinv.zmq_rpc.zmq_rpc import ZmqRpcServer
from sysinv.zmq_rpc.zmq_rpc import is_rpc_hybrid_mode_active
import tsconfig.tsconfig as tsc
@ -174,26 +171,12 @@ class AgentManager(service.PeriodicService):
def __init__(self, host, topic):
self.host = host
self.topic = topic
serializer = objects_base.SysinvObjectSerializer()
super(AgentManager, self).__init__()
self._rpc_service = None
self._zmq_rpc_service = None
# TODO(RPCHybridMode): Usage of RabbitMQ RPC is only required for
# 21.12 -> 22.12 upgrades.
# Remove this in new releases, when it's no longer necessary do the
# migration work through RabbitMQ and ZeroMQ
# NOTE: If more switches are necessary before RabbitMQ removal,
# refactor this into an RPC layer
if not CONF.rpc_backend_zeromq or is_rpc_hybrid_mode_active():
self._rpc_service = rpc_service.Service(self.host, self.topic,
manager=self,
serializer=serializer)
if CONF.rpc_backend_zeromq:
self._zmq_rpc_service = ZmqRpcServer(
self,
CONF.rpc_zeromq_bind_ip,
CONF.rpc_zeromq_agent_bind_port)
self._zmq_rpc_service = ZmqRpcServer(
self,
CONF.rpc_zeromq_bind_ip,
CONF.rpc_zeromq_agent_bind_port)
self._report_to_conductor_iplatform_avail_flag = False
self._report_to_conductor_fpga_info = True
@ -232,8 +215,7 @@ class AgentManager(service.PeriodicService):
def start(self):
super(AgentManager, self).start()
if self._rpc_service:
self._rpc_service.start()
if self._zmq_rpc_service:
self._zmq_rpc_service.run()
# Do not collect inventory and report to conductor at startup in
@ -247,12 +229,10 @@ class AgentManager(service.PeriodicService):
if tsc.system_mode == constants.SYSTEM_MODE_SIMPLEX:
utils.touch(SYSINV_READY_FLAG)
def stop(self):
if self._rpc_service:
self._rpc_service.stop()
def stop(self, graceful=False):
if self._zmq_rpc_service:
self._zmq_rpc_service.stop()
super(AgentManager, self).stop()
super(AgentManager, self).stop(graceful)
def _report_to_conductor(self):
""" Initial inventory report to conductor required
@ -2395,26 +2375,6 @@ class AgentManager(service.PeriodicService):
LOG.exception("Sysinv Agent exception updating ipv"
"conductor.")
# TODO(RPCHybridMode): This is only useful for 21.12 -> 22.12 upgrades.
# Remove this method in new releases, when it's no longer necessary to
# perform upgrade through hybrid mode messaging system
def delete_sysinv_hybrid_state(self, context, host_uuid):
"""Delete the Sysinv flag of Hybrid Mode.
:param host_uuid: ihost uuid unique id
:return: None
"""
if self._ihost_uuid and self._ihost_uuid == host_uuid:
if os.path.exists(tsc.SYSINV_HYBRID_RPC_FLAG):
utils.delete_if_exists(tsc.SYSINV_HYBRID_RPC_FLAG)
LOG.info("Sysinv Hybrid Mode deleted.")
LOG.info("Sysinv services will be restarted")
# pylint: disable=not-callable
subprocess.call(['/usr/bin/sysinv-service-restart.sh'])
else:
LOG.info("Hybrid flag doesn't exist. Ignoring delete.")
def report_initial_inventory(self, context, host_uuid):
# conductor requests re-report initial inventory
if self._ihost_uuid and self._ihost_uuid == host_uuid:

View File

@ -16,7 +16,7 @@
# License for the specific language governing permissions and limitations
# under the License.
#
# Copyright (c) 2013-2023 Wind River Systems, Inc.
# Copyright (c) 2013-2023,2025 Wind River Systems, Inc.
#
"""
@ -294,22 +294,6 @@ class AgentAPI(sysinv.openstack.common.rpc.proxy.RpcProxy):
retimer_included=retimer_included),
topic=topic)
# TODO(RPCHybridMode): This is only useful for 21.12 -> 22.12 upgrades.
# Remove this method in new releases, when it's no longer necessary to
# perform upgrade through hybrid mode messaging system
def delete_sysinv_hybrid_state(self, context, host_uuid):
"""Asynchronously, have the agent to delete sysinv hybrid
mode flag
:param context: request context.
:param host_uuid: ihost uuid unique id
:returns: pass or fail
"""
return self.cast(context,
self.make_msg('delete_sysinv_hybrid_state',
host_uuid=host_uuid))
def report_initial_inventory(self, context, host_uuid):
""" Synchronously, request the agent to re-report initial inventory
"""

View File

@ -1,4 +1,4 @@
# Copyright (c) 2022 Wind River Systems, Inc.
# Copyright (c) 2022,2025 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
@ -7,7 +7,6 @@ from oslo_log import log
import sysinv.agent.rpcapi as rpcapi
from sysinv.agent.rpcapizmq import AgentAPI as ZMQAgentAPI
from sysinv.agent.rpcapi import AgentAPI as AMQPAgentAPI
from sysinv.zmq_rpc.zmq_rpc import is_rpc_hybrid_mode_active
LOG = log.getLogger(__name__)
MANAGER_TOPIC = rpcapi.MANAGER_TOPIC
@ -16,11 +15,17 @@ MANAGER_TOPIC = rpcapi.MANAGER_TOPIC
def AgentAPI(topic=None):
rpc_backend = cfg.CONF.rpc_backend
rpc_backend_zeromq = cfg.CONF.rpc_backend_zeromq
rpc_backend_hybrid_mode = is_rpc_hybrid_mode_active()
LOG.debug("Current agent rpc_backend: {} "
"use_zeromq: {} hybrid_mode: {}".format(rpc_backend,
rpc_backend_zeromq,
rpc_backend_hybrid_mode))
"use_zeromq: {}".format(rpc_backend,
rpc_backend_zeromq
))
if rpc_backend_zeromq:
return ZMQAgentAPI(topic)
# TODO(RemoveOpenstackRPCBackend): The legacy openstack.common.rpc backend
# is not used anymore. However the unit tests use a fake implementation of
# this backend. Unit tests need to be adjusted to use a mockable/fake
# version zeromq rpc backend and then the legacy openstack.common.rpc code
# can be removed from source tree.
# We return the AMQPAgentAPI here just for the unit tests.
LOG.warn(f"Using rpc_backend {rpc_backend} is only intended for unit tests")
return AMQPAgentAPI(topic)

View File

@ -1,4 +1,4 @@
# Copyright (c) 2022 Wind River Systems, Inc.
# Copyright (c) 2022,2025 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
@ -11,8 +11,6 @@ from oslo_log import log
from sysinv.agent.rpcapi import AgentAPI as BaseAgentAPI
from sysinv.agent.rpcapi import MANAGER_TOPIC
from sysinv.zmq_rpc.zmq_rpc import ZmqRpcClient
from sysinv.zmq_rpc.zmq_rpc import is_rpc_hybrid_mode_active
from sysinv.zmq_rpc.zmq_rpc import is_zmq_backend_available
CONF = cfg.CONF
LOG = log.getLogger(__name__)
@ -27,36 +25,10 @@ class AgentAPI(ZmqRpcClient, BaseAgentAPI):
super(AgentAPI, self).__init__(host, port, topic)
def call(self, context, msg, topic=None, version=None, timeout=None):
if is_rpc_hybrid_mode_active():
host_uuid = msg['args']['host_uuid']
if not is_zmq_backend_available(host_uuid):
LOG.debug("RPC hybrid mode is active and agent zmq backend is "
"not yet available in host {}. Calling RPC call "
"method {} through rabbitmq".format(host_uuid,
msg['method']))
rpcapi = BaseAgentAPI()
return rpcapi.call(context, msg, topic, version, timeout)
return super(AgentAPI, self).call(context, msg, timeout)
def cast(self, context, msg, topic=None, version=None):
if is_rpc_hybrid_mode_active():
host_uuid = msg['args']['host_uuid']
if not is_zmq_backend_available(host_uuid):
LOG.debug("RPC hybrid mode is active and agent zmq backend is "
"not yet available in host {}. Calling RPC cast "
"method {} through rabbitmq".format(host_uuid,
msg['method']))
rpcapi = BaseAgentAPI()
return rpcapi.cast(context, msg, topic, version)
return super(AgentAPI, self).cast(context, msg)
def fanout_cast(self, context, msg, topic=None, version=None):
if is_rpc_hybrid_mode_active():
method = msg['method']
LOG.debug("RPC hybrid mode is active. Calling RPC fanout_cast "
"method {} through rabbitmq and zmq".format(method))
rpcapi = BaseAgentAPI()
rpcapi.fanout_cast(context, msg, topic, version)
return super(AgentAPI, self).fanout_cast(context, msg)

View File

@ -16,7 +16,7 @@
# License for the specific language governing permissions and limitations
# under the License.
#
# Copyright (c) 2013-2024 Wind River Systems, Inc.
# Copyright (c) 2013-2025 Wind River Systems, Inc.
#
"""Conduct all activity related system inventory.
@ -132,9 +132,7 @@ from sysinv.conductor import keystone_listener
from sysinv.db import api as dbapi
from sysinv.loads.loads import LoadImport
from sysinv import objects
from sysinv.objects import base as objects_base
from sysinv.objects import kube_app as kubeapp_obj
from sysinv.openstack.common.rpc import service as rpc_service
from sysinv.puppet import common as puppet_common
from sysinv.puppet import puppet
from sysinv.puppet import interface as pinterface
@ -143,7 +141,6 @@ from sysinv.helm.lifecycle_constants import LifecycleConstants
from sysinv.helm.lifecycle_hook import LifecycleHookInfo
from sysinv.helm import common
from sysinv.zmq_rpc.zmq_rpc import ZmqRpcServer
from sysinv.zmq_rpc.zmq_rpc import is_rpc_hybrid_mode_active
MANAGER_TOPIC = 'sysinv.conductor_manager'
@ -322,26 +319,12 @@ class ConductorManager(service.PeriodicService):
def __init__(self, host, topic):
self.host = host
self.topic = topic
serializer = objects_base.SysinvObjectSerializer()
super(ConductorManager, self).__init__()
self._rpc_service = None
self._zmq_rpc_service = None
# TODO(RPCHybridMode): Usage of RabbitMQ RPC is only required for
# 21.12 -> 22.12 upgrades.
# Remove this in new releases, when it's no longer necessary do the
# migration work through RabbitMQ and ZeroMQ
# NOTE: If more switches are necessary before RabbitMQ removal,
# refactor this into an RPC layer
if not CONF.rpc_backend_zeromq or is_rpc_hybrid_mode_active():
self._rpc_service = rpc_service.Service(self.host, self.topic,
manager=self,
serializer=serializer)
if CONF.rpc_backend_zeromq:
self._zmq_rpc_service = ZmqRpcServer(
self,
CONF.rpc_zeromq_conductor_bind_ip,
CONF.rpc_zeromq_conductor_bind_port)
self._zmq_rpc_service = ZmqRpcServer(
self,
CONF.rpc_zeromq_conductor_bind_ip,
CONF.rpc_zeromq_conductor_bind_port)
self.dbapi = None
self.fm_api = None
@ -423,9 +406,6 @@ class ConductorManager(service.PeriodicService):
# accept API calls and run periodic tasks after
# initializing conductor manager service
if self._rpc_service:
self._rpc_service.start()
if self._zmq_rpc_service:
self._zmq_rpc_service.run()
@ -577,12 +557,10 @@ class ConductorManager(service.PeriodicService):
""" Periodic tasks are run at pre-specified intervals. """
return self.run_periodic_tasks(context, raise_on_error=raise_on_error)
def stop(self):
if self._rpc_service:
self._rpc_service.stop()
def stop(self, graceful=False):
if self._zmq_rpc_service:
self._zmq_rpc_service.stop()
super(ConductorManager, self).stop()
super(ConductorManager, self).stop(graceful)
@contextmanager
def session(self):
@ -15531,16 +15509,6 @@ class ConductorManager(service.PeriodicService):
# Delete upgrade record
self.dbapi.software_upgrade_destroy(upgrade.uuid)
# TODO(RPCHybridMode): This is only useful for 21.12 -> 22.12 upgrades.
# Remove this in new releases, when it's no longer necessary
# do the migration work through RabbitMQ and ZeroMQ
if (tsc.system_mode != constants.SYSTEM_MODE_SIMPLEX):
rpcapi = agent_rpcapi.AgentAPI()
controller_1 = self.dbapi.ihost_get_by_hostname(
constants.CONTROLLER_1_HOSTNAME)
LOG.info("Deleting Sysinv Hybrid state")
rpcapi.delete_sysinv_hybrid_state(context, controller_1['uuid'])
# TODO(fcorream): This is just needed for upgrade from R7 to R8
# need to remove the flag that disables the use of FQDN during the
# upgrade

View File

@ -1,16 +1,13 @@
# Copyright (c) 2022 Wind River Systems, Inc.
# Copyright (c) 2022,2025 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
import os
from oslo_config import cfg
from oslo_log import log
import sysinv.conductor.rpcapi as rpcapi
from sysinv.conductor.rpcapi import ConductorAPI as AMQPConductorAPI
from sysinv.conductor.rpcapizmq import ConductorAPI as ZMQConductorAPI
from sysinv.zmq_rpc.zmq_rpc import is_rpc_hybrid_mode_active
from sysinv.zmq_rpc.zmq_rpc import check_connection
LOG = log.getLogger(__name__)
MANAGER_TOPIC = rpcapi.MANAGER_TOPIC
@ -18,30 +15,18 @@ MANAGER_TOPIC = rpcapi.MANAGER_TOPIC
def ConductorAPI(topic=None):
rpc_backend_zeromq = cfg.CONF.rpc_backend_zeromq
rpc_backend_hybrid_mode = is_rpc_hybrid_mode_active()
rpc_backend = cfg.CONF.rpc_backend
LOG.debug("Current conductor rpc_backend: {} "
"use_zeromq: {} hybrid_mode: {}".format(rpc_backend,
rpc_backend_zeromq,
rpc_backend_hybrid_mode))
# Hybrid mode is expected to be defined for controller-1 only during upgrade
# all other nodes should be running ZeroMQ exclusively
if rpc_backend_hybrid_mode:
# in controller-1 agent, we need to know if conductor
# is able to listen to ZeroRPC.
# If conductor is running on same host, we know it is running in
# hybrid mode, and we assume ZeroMQ is preferred.
# Otherwise, it can be conductor running on controller-0 before
# migrate to ZeroMQ, so we verify before send the RPC call
# if ZeroMQ is running and if yes, use it, otherwise use RabbitMQ
if os.path.isfile("/var/run/sysinv-conductor.pid"):
return ZMQConductorAPI(topic)
else:
if check_connection(cfg.CONF.rpc_zeromq_conductor_bind_ip,
cfg.CONF.rpc_zeromq_conductor_bind_port):
return ZMQConductorAPI(topic)
else:
return AMQPConductorAPI(topic)
"use_zeromq: {}".format(rpc_backend,
rpc_backend_zeromq
))
if rpc_backend_zeromq:
return ZMQConductorAPI(topic)
# TODO(RemoveOpenstackRPCBackend): The legacy openstack.common.rpc backend
# is not used anymore. However the unit tests use a fake implementation of
# this backend. Unit tests need to be adjusted to use a mockable/fake
# version zeromq rpc backend and then the legacy openstack.common.rpc code
# can be removed from source tree.
# We return the AMQPConductorAPI here just for the unit tests.
LOG.warn(f"Using rpc_backend {rpc_backend} is only intended for unit tests")
return AMQPConductorAPI(topic)

View File

@ -36,6 +36,11 @@ class ConfFixture(config_fixture.Config):
super(ConfFixture, self).setUp()
self.conf.set_default('host', 'fake-mini')
# TODO(RemoveOpenstackRPCBackend): The openstack.common.rpc backend
# is not used anymore. The unit tests use a fake implementation of
# this backend. This needs to be adjusted to use a mockable/fake
# version zeromq rpc backend. Then remove the openstack.common.rpc code
# from source tree.
self.conf.set_default('rpc_backend',
'sysinv.openstack.common.rpc.impl_fake')
self.conf.set_default('rpc_backend_zeromq', False)

View File

@ -1,11 +1,10 @@
# Copyright (c) 2022-2024 Wind River Systems, Inc.
# Copyright (c) 2022-2025 Wind River Systems, Inc.
#
# SPDX-License-Identifier: Apache-2.0
import dns.resolver
import zerorpc
import eventlet
import os
from eventlet import greenthread
from oslo_log import log
@ -19,7 +18,6 @@ from sysinv.zmq_rpc.client_provider import ClientProvider
from sysinv.zmq_rpc.serializer import decode
from sysinv.zmq_rpc.serializer import encode
import sysinv.openstack.common.rpc.common as rpc_common
import tsconfig.tsconfig as tsc
LOG = log.getLogger(__name__)
@ -163,7 +161,8 @@ class ZmqRpcClient(object):
if not client:
host_uuid = kwargs.get('host_uuid', None)
if host_uuid is None:
raise Exception("Missing host_uuid parameter for rpc endpoint")
raise Exception(f"Remote call/cast to {method} is missing "
f"host_uuid parameter for rpc endpoint")
dbapi = api.get_instance()
host = dbapi.ihost_get(host_uuid)
if (utils.is_fqdn_ready_to_use()
@ -269,22 +268,6 @@ class ZmqRpcClient(object):
return endpoints
# TODO(RPCHybridMode): This function is only useful for 21.12 -> 22.12 upgrades.
# Remove in future release.
def is_rpc_hybrid_mode_active():
return os.path.isfile(tsc.SYSINV_HYBRID_RPC_FLAG)
# TODO(RPCHybridMode): This function is only useful for 21.12 -> 22.12 upgrades.
# Remove in future release.
def is_zmq_backend_available(host_uuid):
dbapi = api.get_instance()
host = dbapi.ihost_get(host_uuid)
host_upgrade = dbapi.host_upgrade_get_by_host(host.id)
target_load = dbapi.load_get(host_upgrade.target_load)
return target_load.software_version >= tsc.SW_VERSION_22_12
def get_tcp_endpoint(host, port):
return "tcp://[{}]:{}".format(host, port)

View File

@ -1,5 +1,5 @@
"""
Copyright (c) 2014-2024 Wind River Systems, Inc.
Copyright (c) 2014-2025 Wind River Systems, Inc.
SPDX-License-Identifier: Apache-2.0
@ -272,10 +272,6 @@ UPGRADE_ABORT_FLAG = os.path.join(
PTP_UPDATE_PARAMETERS_DONE = '.update_ptp_parameters_done'
PTP_UPDATE_PARAMETERS_FLAG = os.path.join(CONFIG_PATH,
PTP_UPDATE_PARAMETERS_DONE)
# TODO(RPCHybridMode): This is required only for 21.12 -> 22.12 upgrades.
# Remove in future release.
SYSINV_HYBRID_RPC_FLAG = os.path.join(
PLATFORM_CONF_PATH, '.sysinv_hybrid_rpc')
# Set on controller-0 (by controller-1) to indicate that data migration has
# started