Drop usage of RpcProxy from DhcpPluginApi

Drop the usage of the RpcProxy compatibility class from the
DhcpPluginApi RPC client class.  The implementation has been updated
to use the appropariate APIs from oslo.messaging directly.

Part of blueprint drop-rpc-compat.

Change-Id: I81bfd761707c4c587b12877668c4399efe3d652e
This commit is contained in:
Russell Bryant 2014-11-07 15:11:42 +01:00
parent 8edf980078
commit 0b924798ff
2 changed files with 66 additions and 118 deletions

View File

@ -399,7 +399,7 @@ class DhcpAgent(manager.Manager):
pm.disable()
class DhcpPluginApi(n_rpc.RpcProxy):
class DhcpPluginApi(object):
"""Agent side of the dhcp rpc API.
API version history:
@ -409,76 +409,66 @@ class DhcpPluginApi(n_rpc.RpcProxy):
"""
BASE_RPC_API_VERSION = '1.1'
def __init__(self, topic, context, use_namespaces):
super(DhcpPluginApi, self).__init__(
topic=topic, default_version=self.BASE_RPC_API_VERSION)
self.context = context
self.host = cfg.CONF.host
self.use_namespaces = use_namespaces
target = messaging.Target(topic=topic, version='1.1')
self.client = n_rpc.get_client(target)
def get_active_networks_info(self):
"""Make a remote process call to retrieve all network info."""
networks = self.call(self.context,
self.make_msg('get_active_networks_info',
host=self.host))
cctxt = self.client.prepare()
networks = cctxt.call(self.context, 'get_active_networks_info',
host=self.host)
return [dhcp.NetModel(self.use_namespaces, n) for n in networks]
def get_network_info(self, network_id):
"""Make a remote process call to retrieve network info."""
network = self.call(self.context,
self.make_msg('get_network_info',
network_id=network_id,
host=self.host))
cctxt = self.client.prepare()
network = cctxt.call(self.context, 'get_network_info',
network_id=network_id, host=self.host)
if network:
return dhcp.NetModel(self.use_namespaces, network)
def get_dhcp_port(self, network_id, device_id):
"""Make a remote process call to get the dhcp port."""
port = self.call(self.context,
self.make_msg('get_dhcp_port',
network_id=network_id,
device_id=device_id,
host=self.host))
cctxt = self.client.prepare()
port = cctxt.call(self.context, 'get_dhcp_port',
network_id=network_id, device_id=device_id,
host=self.host)
if port:
return dhcp.DictModel(port)
def create_dhcp_port(self, port):
"""Make a remote process call to create the dhcp port."""
port = self.call(self.context,
self.make_msg('create_dhcp_port',
port=port,
host=self.host))
cctxt = self.client.prepare()
port = cctxt.call(self.context, 'create_dhcp_port',
port=port, host=self.host)
if port:
return dhcp.DictModel(port)
def update_dhcp_port(self, port_id, port):
"""Make a remote process call to update the dhcp port."""
port = self.call(self.context,
self.make_msg('update_dhcp_port',
port_id=port_id,
port=port,
host=self.host))
cctxt = self.client.prepare()
port = cctxt.call(self.context, 'update_dhcp_port',
port_id=port_id, port=port, host=self.host)
if port:
return dhcp.DictModel(port)
def release_dhcp_port(self, network_id, device_id):
"""Make a remote process call to release the dhcp port."""
return self.call(self.context,
self.make_msg('release_dhcp_port',
network_id=network_id,
device_id=device_id,
host=self.host))
cctxt = self.client.prepare()
return cctxt.call(self.context, 'release_dhcp_port',
network_id=network_id, device_id=device_id,
host=self.host)
def release_port_fixed_ip(self, network_id, device_id, subnet_id):
"""Make a remote process call to release a fixed_ip on the port."""
return self.call(self.context,
self.make_msg('release_port_fixed_ip',
network_id=network_id,
subnet_id=subnet_id,
device_id=device_id,
host=self.host))
cctxt = self.client.prepare()
return cctxt.call(self.context, 'release_port_fixed_ip',
network_id=network_id, subnet_id=subnet_id,
device_id=device_id, host=self.host)
class NetworkCache(object):

View File

@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import copy
import sys
import uuid
@ -30,6 +31,7 @@ from neutron.agent.linux import interface
from neutron.common import config as common_config
from neutron.common import constants as const
from neutron.common import exceptions
from neutron import context
from neutron.tests import base
@ -929,101 +931,57 @@ class TestDhcpAgentEventHandler(base.BaseTestCase):
class TestDhcpPluginApiProxy(base.BaseTestCase):
def setUp(self):
super(TestDhcpPluginApiProxy, self).setUp()
self.proxy = dhcp_agent.DhcpPluginApi('foo', {}, None)
self.proxy.host = 'foo'
def _test_dhcp_api(self, method, **kwargs):
ctxt = context.get_admin_context()
proxy = dhcp_agent.DhcpPluginApi('foo', ctxt, None)
proxy.host = 'foo'
self.call_p = mock.patch.object(self.proxy, 'call')
self.call = self.call_p.start()
self.make_msg_p = mock.patch.object(self.proxy, 'make_msg')
self.make_msg = self.make_msg_p.start()
with contextlib.nested(
mock.patch.object(proxy.client, 'call'),
mock.patch.object(proxy.client, 'prepare'),
) as (
rpc_mock, prepare_mock
):
prepare_mock.return_value = proxy.client
rpc_mock.return_value = kwargs.pop('return_value', [])
def test_get_network_info(self):
self.call.return_value = dict(a=1)
retval = self.proxy.get_network_info('netid')
self.assertEqual(retval.a, 1)
self.assertTrue(self.call.called)
self.make_msg.assert_called_once_with('get_network_info',
network_id='netid',
host='foo')
prepare_args = {}
if 'version' in kwargs:
prepare_args['version'] = kwargs.pop('version')
def test_get_dhcp_port(self):
self.call.return_value = dict(a=1)
retval = self.proxy.get_dhcp_port('netid', 'devid')
self.assertEqual(retval.a, 1)
self.assertTrue(self.call.called)
self.make_msg.assert_called_once_with('get_dhcp_port',
network_id='netid',
device_id='devid',
host='foo')
retval = getattr(proxy, method)(**kwargs)
self.assertEqual(retval, rpc_mock.return_value)
def test_get_dhcp_port_none(self):
self.call.return_value = None
self.assertIsNone(self.proxy.get_dhcp_port('netid', 'devid'))
prepare_mock.assert_called_once_with(**prepare_args)
kwargs['host'] = proxy.host
rpc_mock.assert_called_once_with(ctxt, method, **kwargs)
def test_get_active_networks_info(self):
self.proxy.get_active_networks_info()
self.make_msg.assert_called_once_with('get_active_networks_info',
host='foo')
self._test_dhcp_api('get_active_networks_info')
def test_get_network_info(self):
self._test_dhcp_api('get_network_info', network_id='fake_id',
return_value=None)
def test_get_dhcp_port(self):
self._test_dhcp_api('get_dhcp_port', network_id='fake_id',
device_id='fake_id_2', return_value=None)
def test_create_dhcp_port(self):
port_body = (
{'port':
{'name': '', 'admin_state_up': True,
'network_id': fake_network.id,
'tenant_id': fake_network.tenant_id,
'fixed_ips': [{'subnet_id': fake_fixed_ip1.subnet_id}],
'device_id': mock.ANY}})
self.proxy.create_dhcp_port(port_body)
self.make_msg.assert_called_once_with('create_dhcp_port',
port=port_body,
host='foo')
def test_create_dhcp_port_none(self):
self.call.return_value = None
port_body = (
{'port':
{'name': '', 'admin_state_up': True,
'network_id': fake_network.id,
'tenant_id': fake_network.tenant_id,
'fixed_ips': [{'subnet_id': fake_fixed_ip1.subnet_id}],
'device_id': mock.ANY}})
self.assertIsNone(self.proxy.create_dhcp_port(port_body))
def test_update_dhcp_port_none(self):
self.call.return_value = None
port_body = {'port': {'fixed_ips':
[{'subnet_id': fake_fixed_ip1.subnet_id}]}}
self.assertIsNone(self.proxy.update_dhcp_port(fake_port1.id,
port_body))
self._test_dhcp_api('create_dhcp_port', port='fake_port',
return_value=None)
def test_update_dhcp_port(self):
port_body = {'port': {'fixed_ips':
[{'subnet_id': fake_fixed_ip1.subnet_id}]}}
self.proxy.update_dhcp_port(fake_port1.id, port_body)
self.make_msg.assert_called_once_with('update_dhcp_port',
port_id=fake_port1.id,
port=port_body,
host='foo')
self._test_dhcp_api('update_dhcp_port', port_id='fake_id',
port='fake_port', return_value=None)
def test_release_dhcp_port(self):
self.proxy.release_dhcp_port('netid', 'devid')
self.assertTrue(self.call.called)
self.make_msg.assert_called_once_with('release_dhcp_port',
network_id='netid',
device_id='devid',
host='foo')
self._test_dhcp_api('release_dhcp_port', network_id='fake_id',
device_id='fake_id_2')
def test_release_port_fixed_ip(self):
self.proxy.release_port_fixed_ip('netid', 'devid', 'subid')
self.assertTrue(self.call.called)
self.make_msg.assert_called_once_with('release_port_fixed_ip',
network_id='netid',
subnet_id='subid',
device_id='devid',
host='foo')
self._test_dhcp_api('release_port_fixed_ip', network_id='fake_id',
device_id='fake_id_2', subnet_id='fake_id_3')
class TestNetworkCache(base.BaseTestCase):