From 3bb80a03337237bc0ab7a0374506da7c5692ee9f Mon Sep 17 00:00:00 2001 From: Russell Bryant Date: Mon, 1 Dec 2014 18:42:30 +0000 Subject: [PATCH] Drop RpcProxy usage from ml2 AgentNotifierApi Remove usage of the RpcProxy compatibility class from the ml2 AgentNotifierApi. The equivalent oslo.messaging APIs are now used instead. A couple of other mixin APIs had to be converted at the same time. Note that there is one very minor functional change here. The base rpc version is set to '1.0' now instead of '1.1'. The right pattern to use is to always set the base to be N.0. Any method that needs a newer version should specify it. Part of blueprint drop-rpc-compat. Change-Id: I640568e2d73c9eb7a9505db640dc1427a1ae2abe --- neutron/api/rpc/handlers/dvr_rpc.py | 8 ++-- neutron/plugins/ml2/drivers/type_tunnel.py | 9 ++-- neutron/plugins/ml2/rpc.py | 31 +++++++------- neutron/tests/unit/ml2/test_rpcapi.py | 48 +++++++--------------- 4 files changed, 35 insertions(+), 61 deletions(-) diff --git a/neutron/api/rpc/handlers/dvr_rpc.py b/neutron/api/rpc/handlers/dvr_rpc.py index ba648bb0ae..43a5d68acb 100644 --- a/neutron/api/rpc/handlers/dvr_rpc.py +++ b/neutron/api/rpc/handlers/dvr_rpc.py @@ -98,11 +98,9 @@ class DVRAgentRpcApiMixin(object): """Notify dvr mac address updates.""" if not dvr_macs: return - self.fanout_cast(context, - self.make_msg('dvr_mac_address_update', - dvr_macs=dvr_macs), - version=self.DVR_RPC_VERSION, - topic=self._get_dvr_update_topic()) + cctxt = self.client.prepare(topic=self._get_dvr_update_topic(), + version=self.DVR_RPC_VERSION, fanout=True) + cctxt.cast(context, 'dvr_mac_address_update', dvr_macs=dvr_macs) class DVRAgentRpcCallbackMixin(object): diff --git a/neutron/plugins/ml2/drivers/type_tunnel.py b/neutron/plugins/ml2/drivers/type_tunnel.py index b6f34eaa58..b0bad17834 100644 --- a/neutron/plugins/ml2/drivers/type_tunnel.py +++ b/neutron/plugins/ml2/drivers/type_tunnel.py @@ -187,8 +187,7 @@ class TunnelAgentRpcApiMixin(object): topics.UPDATE) def tunnel_update(self, context, tunnel_ip, tunnel_type): - self.fanout_cast(context, - self.make_msg('tunnel_update', - tunnel_ip=tunnel_ip, - tunnel_type=tunnel_type), - topic=self._get_tunnel_update_topic()) + cctxt = self.client.prepare(topic=self._get_tunnel_update_topic(), + fanout=True) + cctxt.cast(context, 'tunnel_update', tunnel_ip=tunnel_ip, + tunnel_type=tunnel_type) diff --git a/neutron/plugins/ml2/rpc.py b/neutron/plugins/ml2/rpc.py index 72fc955d17..d51d6d23b0 100644 --- a/neutron/plugins/ml2/rpc.py +++ b/neutron/plugins/ml2/rpc.py @@ -13,6 +13,8 @@ # License for the specific language governing permissions and limitations # under the License. +from oslo import messaging + from neutron.agent import securitygroups_rpc as sg_rpc from neutron.api.rpc.handlers import dvr_rpc from neutron.common import constants as q_const @@ -172,8 +174,7 @@ class RpcCallbacks(n_rpc.RpcCallback, LOG.debug('Port %s not found during ARP update', port_id) -class AgentNotifierApi(n_rpc.RpcProxy, - dvr_rpc.DVRAgentRpcApiMixin, +class AgentNotifierApi(dvr_rpc.DVRAgentRpcApiMixin, sg_rpc.SecurityGroupAgentRpcApiMixin, type_tunnel.TunnelAgentRpcApiMixin): """Agent side of the openvswitch rpc API. @@ -185,30 +186,26 @@ class AgentNotifierApi(n_rpc.RpcProxy, """ - BASE_RPC_API_VERSION = '1.1' - def __init__(self, topic): - super(AgentNotifierApi, self).__init__( - topic=topic, default_version=self.BASE_RPC_API_VERSION) + self.topic = topic self.topic_network_delete = topics.get_topic_name(topic, topics.NETWORK, topics.DELETE) self.topic_port_update = topics.get_topic_name(topic, topics.PORT, topics.UPDATE) + target = messaging.Target(topic=topic, version='1.0') + self.client = n_rpc.get_client(target) def network_delete(self, context, network_id): - self.fanout_cast(context, - self.make_msg('network_delete', - network_id=network_id), - topic=self.topic_network_delete) + cctxt = self.client.prepare(topic=self.topic_network_delete, + fanout=True) + cctxt.cast(context, 'network_delete', network_id=network_id) def port_update(self, context, port, network_type, segmentation_id, physical_network): - self.fanout_cast(context, - self.make_msg('port_update', - port=port, - network_type=network_type, - segmentation_id=segmentation_id, - physical_network=physical_network), - topic=self.topic_port_update) + cctxt = self.client.prepare(topic=self.topic_port_update, + fanout=True) + cctxt.cast(context, 'port_update', port=port, + network_type=network_type, segmentation_id=segmentation_id, + physical_network=physical_network) diff --git a/neutron/tests/unit/ml2/test_rpcapi.py b/neutron/tests/unit/ml2/test_rpcapi.py index a05d1dabf4..f4a5fb17ab 100644 --- a/neutron/tests/unit/ml2/test_rpcapi.py +++ b/neutron/tests/unit/ml2/test_rpcapi.py @@ -25,7 +25,6 @@ import mock from neutron.agent import rpc as agent_rpc from neutron.common import constants from neutron.common import exceptions -from neutron.common import rpc as n_rpc from neutron.common import topics from neutron.openstack.common import context from neutron.plugins.ml2.drivers import type_tunnel @@ -166,35 +165,11 @@ class RpcCallbacksTestCase(base.BaseTestCase): class RpcApiTestCase(base.BaseTestCase): - def _test_rpc_api_legacy(self, rpcapi, topic, method, rpc_method, - **kwargs): - # NOTE(russellb) This can be removed once AgentNotifierApi has been - # converted over to no longer use the RpcProxy compatibility class. - ctxt = context.RequestContext('fake_user', 'fake_project') - expected_retval = 'foo' if rpc_method == 'call' else None - expected_version = kwargs.pop('version', None) - expected_msg = rpcapi.make_msg(method, **kwargs) - - rpc = n_rpc.RpcProxy - with mock.patch.object(rpc, rpc_method) as rpc_method_mock: - rpc_method_mock.return_value = expected_retval - retval = getattr(rpcapi, method)(ctxt, **kwargs) - - self.assertEqual(retval, expected_retval) - additional_args = {} - if topic: - additional_args['topic'] = topic - if expected_version: - additional_args['version'] = expected_version - expected = [ - mock.call(ctxt, expected_msg, **additional_args) - ] - rpc_method_mock.assert_has_calls(expected) - def _test_rpc_api(self, rpcapi, topic, method, rpc_method, **kwargs): ctxt = context.RequestContext('fake_user', 'fake_project') expected_retval = 'foo' if rpc_method == 'call' else None expected_version = kwargs.pop('version', None) + fanout = kwargs.pop('fanout', False) with contextlib.nested( mock.patch.object(rpcapi.client, rpc_method), @@ -209,6 +184,10 @@ class RpcApiTestCase(base.BaseTestCase): prepare_args = {} if expected_version: prepare_args['version'] = expected_version + if fanout: + prepare_args['fanout'] = fanout + if topic: + prepare_args['topic'] = topic prepare_mock.assert_called_once_with(**prepare_args) self.assertEqual(retval, expected_retval) @@ -216,35 +195,36 @@ class RpcApiTestCase(base.BaseTestCase): def test_delete_network(self): rpcapi = plugin_rpc.AgentNotifierApi(topics.AGENT) - self._test_rpc_api_legacy( + self._test_rpc_api( rpcapi, topics.get_topic_name(topics.AGENT, topics.NETWORK, topics.DELETE), - 'network_delete', rpc_method='fanout_cast', - network_id='fake_request_spec') + 'network_delete', rpc_method='cast', + fanout=True, network_id='fake_request_spec') def test_port_update(self): rpcapi = plugin_rpc.AgentNotifierApi(topics.AGENT) - self._test_rpc_api_legacy( + self._test_rpc_api( rpcapi, topics.get_topic_name(topics.AGENT, topics.PORT, topics.UPDATE), - 'port_update', rpc_method='fanout_cast', - port='fake_port', + 'port_update', rpc_method='cast', + fanout=True, port='fake_port', network_type='fake_network_type', segmentation_id='fake_segmentation_id', physical_network='fake_physical_network') def test_tunnel_update(self): rpcapi = plugin_rpc.AgentNotifierApi(topics.AGENT) - self._test_rpc_api_legacy( + self._test_rpc_api( rpcapi, topics.get_topic_name(topics.AGENT, type_tunnel.TUNNEL, topics.UPDATE), - 'tunnel_update', rpc_method='fanout_cast', + 'tunnel_update', rpc_method='cast', + fanout=True, tunnel_ip='fake_ip', tunnel_type='gre') def test_device_details(self):