
This fixes regressions in commit 9d13ea884bff749b3975acb5eb5630e5aca4a665. Handle device name prefixes other than "tap". For example, nova hybrid interface driver uses "qvo" prefix. Also, ignore non neutron ports correctly. For example, veth pairs used to connect physical bridges. Closes-Bug: #1341465 Change-Id: I1d71c8a2cf8c2f71f0dbcfb54c9b347e24c03562
1070 lines
48 KiB
Python
1070 lines
48 KiB
Python
# Copyright (C) 2014 VA Linux Systems Japan K.K.
|
|
# Based on test for openvswitch agent(test_ovs_neutron_agent.py).
|
|
#
|
|
# Copyright (c) 2012 OpenStack Foundation.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
#
|
|
# @author: Fumihiko Kakuma, VA Linux Systems Japan K.K.
|
|
# @author: YAMAMOTO Takashi, VA Linux Systems Japan K.K.
|
|
|
|
import contextlib
|
|
|
|
import mock
|
|
import netaddr
|
|
from oslo.config import cfg
|
|
import testtools
|
|
|
|
from neutron.agent.linux import ip_lib
|
|
from neutron.agent.linux import utils
|
|
from neutron.openstack.common import importutils
|
|
from neutron.plugins.common import constants as p_const
|
|
from neutron.plugins.openvswitch.common import constants
|
|
from neutron.tests import base
|
|
from neutron.tests.unit.ofagent import fake_oflib
|
|
|
|
|
|
NOTIFIER = ('neutron.plugins.ml2.rpc.AgentNotifierApi')
|
|
OVS_LINUX_KERN_VERS_WITHOUT_VXLAN = "3.12.0"
|
|
|
|
|
|
def _mock_port(is_neutron=True, normalized_name=None):
|
|
p = mock.Mock()
|
|
p.is_neutron_port.return_value = is_neutron
|
|
if normalized_name:
|
|
p.normalized_port_name.return_value = normalized_name
|
|
return p
|
|
|
|
|
|
class OFAAgentTestCase(base.BaseTestCase):
|
|
|
|
_AGENT_NAME = 'neutron.plugins.ofagent.agent.ofa_neutron_agent'
|
|
|
|
def setUp(self):
|
|
super(OFAAgentTestCase, self).setUp()
|
|
self.fake_oflib_of = fake_oflib.patch_fake_oflib_of().start()
|
|
self.mod_agent = importutils.import_module(self._AGENT_NAME)
|
|
cfg.CONF.set_default('firewall_driver',
|
|
'neutron.agent.firewall.NoopFirewallDriver',
|
|
group='SECURITYGROUP')
|
|
self.ryuapp = mock.Mock()
|
|
cfg.CONF.register_cli_opts([
|
|
cfg.StrOpt('ofp-listen-host', default='',
|
|
help='openflow listen host'),
|
|
cfg.IntOpt('ofp-tcp-listen-port', default=6633,
|
|
help='openflow tcp listen port')
|
|
])
|
|
cfg.CONF.set_override('root_helper', 'fake_helper', group='AGENT')
|
|
|
|
|
|
class CreateAgentConfigMap(OFAAgentTestCase):
|
|
|
|
def test_create_agent_config_map_succeeds(self):
|
|
self.assertTrue(self.mod_agent.create_agent_config_map(cfg.CONF))
|
|
|
|
def test_create_agent_config_map_fails_for_invalid_tunnel_config(self):
|
|
# An ip address is required for tunneling but there is no default,
|
|
# verify this for both gre and vxlan tunnels.
|
|
cfg.CONF.set_override('tunnel_types', [p_const.TYPE_GRE],
|
|
group='AGENT')
|
|
with testtools.ExpectedException(ValueError):
|
|
self.mod_agent.create_agent_config_map(cfg.CONF)
|
|
cfg.CONF.set_override('tunnel_types', [p_const.TYPE_VXLAN],
|
|
group='AGENT')
|
|
with testtools.ExpectedException(ValueError):
|
|
self.mod_agent.create_agent_config_map(cfg.CONF)
|
|
|
|
def test_create_agent_config_map_enable_tunneling(self):
|
|
# Verify setting only enable_tunneling will default tunnel_type to GRE
|
|
cfg.CONF.set_override('tunnel_types', None, group='AGENT')
|
|
cfg.CONF.set_override('enable_tunneling', True, group='OVS')
|
|
cfg.CONF.set_override('local_ip', '10.10.10.10', group='OVS')
|
|
cfgmap = self.mod_agent.create_agent_config_map(cfg.CONF)
|
|
self.assertEqual(cfgmap['tunnel_types'], [p_const.TYPE_GRE])
|
|
|
|
def test_create_agent_config_map_fails_no_local_ip(self):
|
|
# An ip address is required for tunneling but there is no default
|
|
cfg.CONF.set_override('enable_tunneling', True, group='OVS')
|
|
with testtools.ExpectedException(ValueError):
|
|
self.mod_agent.create_agent_config_map(cfg.CONF)
|
|
|
|
def test_create_agent_config_map_fails_for_invalid_tunnel_type(self):
|
|
cfg.CONF.set_override('tunnel_types', ['foobar'], group='AGENT')
|
|
with testtools.ExpectedException(ValueError):
|
|
self.mod_agent.create_agent_config_map(cfg.CONF)
|
|
|
|
def test_create_agent_config_map_multiple_tunnel_types(self):
|
|
cfg.CONF.set_override('local_ip', '10.10.10.10', group='OVS')
|
|
cfg.CONF.set_override('tunnel_types', [p_const.TYPE_GRE,
|
|
p_const.TYPE_VXLAN], group='AGENT')
|
|
cfgmap = self.mod_agent.create_agent_config_map(cfg.CONF)
|
|
self.assertEqual(cfgmap['tunnel_types'],
|
|
[p_const.TYPE_GRE, p_const.TYPE_VXLAN])
|
|
|
|
|
|
class TestOFANeutronAgentOVSBridge(OFAAgentTestCase):
|
|
|
|
def setUp(self):
|
|
super(TestOFANeutronAgentOVSBridge, self).setUp()
|
|
self.br_name = 'bridge1'
|
|
self.root_helper = 'fake_helper'
|
|
self.ovs = self.mod_agent.OVSBridge(
|
|
self.br_name, self.root_helper, self.ryuapp)
|
|
|
|
def test_find_datapath_id(self):
|
|
with mock.patch.object(self.ovs, 'get_datapath_id',
|
|
return_value='12345'):
|
|
self.ovs.find_datapath_id()
|
|
self.assertEqual(self.ovs.datapath_id, '12345')
|
|
|
|
def _fake_get_datapath(self, app, datapath_id):
|
|
if self.ovs.retry_count >= 2:
|
|
datapath = mock.Mock()
|
|
datapath.ofproto_parser = mock.Mock()
|
|
return datapath
|
|
self.ovs.retry_count += 1
|
|
return None
|
|
|
|
def test_get_datapath_normal(self):
|
|
self.ovs.retry_count = 0
|
|
with mock.patch.object(self.mod_agent.ryu_api, 'get_datapath',
|
|
new=self._fake_get_datapath):
|
|
self.ovs.datapath_id = '0x64'
|
|
self.ovs.get_datapath(retry_max=4)
|
|
self.assertEqual(self.ovs.retry_count, 2)
|
|
|
|
def test_get_datapath_retry_out_by_default_time(self):
|
|
cfg.CONF.set_override('get_datapath_retry_times', 3, group='AGENT')
|
|
with mock.patch.object(self.mod_agent.ryu_api, 'get_datapath',
|
|
return_value=None) as mock_get_datapath:
|
|
with testtools.ExpectedException(SystemExit):
|
|
self.ovs.datapath_id = '0x64'
|
|
self.ovs.get_datapath(retry_max=3)
|
|
self.assertEqual(mock_get_datapath.call_count, 3)
|
|
|
|
def test_get_datapath_retry_out_by_specified_time(self):
|
|
with mock.patch.object(self.mod_agent.ryu_api, 'get_datapath',
|
|
return_value=None) as mock_get_datapath:
|
|
with testtools.ExpectedException(SystemExit):
|
|
self.ovs.datapath_id = '0x64'
|
|
self.ovs.get_datapath(retry_max=2)
|
|
self.assertEqual(mock_get_datapath.call_count, 2)
|
|
|
|
def test_setup_ofp_default_par(self):
|
|
with contextlib.nested(
|
|
mock.patch.object(self.ovs, 'set_protocols'),
|
|
mock.patch.object(self.ovs, 'set_controller'),
|
|
mock.patch.object(self.ovs, 'find_datapath_id'),
|
|
mock.patch.object(self.ovs, 'get_datapath'),
|
|
) as (mock_set_protocols, mock_set_controller,
|
|
mock_find_datapath_id, mock_get_datapath):
|
|
self.ovs.setup_ofp()
|
|
mock_set_protocols.assert_called_with('OpenFlow13')
|
|
mock_set_controller.assert_called_with(['tcp:127.0.0.1:6633'])
|
|
mock_get_datapath.assert_called_with(
|
|
cfg.CONF.AGENT.get_datapath_retry_times)
|
|
self.assertEqual(mock_find_datapath_id.call_count, 1)
|
|
|
|
def test_setup_ofp_specify_par(self):
|
|
controller_names = ['tcp:192.168.10.10:1234', 'tcp:172.17.16.20:5555']
|
|
with contextlib.nested(
|
|
mock.patch.object(self.ovs, 'set_protocols'),
|
|
mock.patch.object(self.ovs, 'set_controller'),
|
|
mock.patch.object(self.ovs, 'find_datapath_id'),
|
|
mock.patch.object(self.ovs, 'get_datapath'),
|
|
) as (mock_set_protocols, mock_set_controller,
|
|
mock_find_datapath_id, mock_get_datapath):
|
|
self.ovs.setup_ofp(controller_names=controller_names,
|
|
protocols='OpenFlow133',
|
|
retry_max=11)
|
|
mock_set_protocols.assert_called_with('OpenFlow133')
|
|
mock_set_controller.assert_called_with(controller_names)
|
|
mock_get_datapath.assert_called_with(11)
|
|
self.assertEqual(mock_find_datapath_id.call_count, 1)
|
|
|
|
def test_setup_ofp_with_except(self):
|
|
with contextlib.nested(
|
|
mock.patch.object(self.ovs, 'set_protocols',
|
|
side_effect=RuntimeError),
|
|
mock.patch.object(self.ovs, 'set_controller'),
|
|
mock.patch.object(self.ovs, 'find_datapath_id'),
|
|
mock.patch.object(self.ovs, 'get_datapath'),
|
|
) as (mock_set_protocols, mock_set_controller,
|
|
mock_find_datapath_id, mock_get_datapath):
|
|
with testtools.ExpectedException(SystemExit):
|
|
self.ovs.setup_ofp()
|
|
|
|
|
|
class TestOFANeutronAgent(OFAAgentTestCase):
|
|
|
|
def setUp(self):
|
|
super(TestOFANeutronAgent, self).setUp()
|
|
notifier_p = mock.patch(NOTIFIER)
|
|
notifier_cls = notifier_p.start()
|
|
self.notifier = mock.Mock()
|
|
notifier_cls.return_value = self.notifier
|
|
# Avoid rpc initialization for unit tests
|
|
cfg.CONF.set_override('rpc_backend',
|
|
'neutron.openstack.common.rpc.impl_fake')
|
|
kwargs = self.mod_agent.create_agent_config_map(cfg.CONF)
|
|
|
|
class MockFixedIntervalLoopingCall(object):
|
|
def __init__(self, f):
|
|
self.f = f
|
|
|
|
def start(self, interval=0):
|
|
self.f()
|
|
|
|
def _mk_test_dp(name):
|
|
ofp = importutils.import_module('ryu.ofproto.ofproto_v1_3')
|
|
ofpp = importutils.import_module('ryu.ofproto.ofproto_v1_3_parser')
|
|
dp = mock.Mock()
|
|
dp.ofproto = ofp
|
|
dp.ofproto_parser = ofpp
|
|
dp.__repr__ = lambda _self: name
|
|
return dp
|
|
|
|
def _mk_test_br(name):
|
|
dp = _mk_test_dp(name)
|
|
br = mock.Mock()
|
|
br.datapath = dp
|
|
br.ofproto = dp.ofproto
|
|
br.ofparser = dp.ofproto_parser
|
|
return br
|
|
|
|
with contextlib.nested(
|
|
mock.patch.object(self.mod_agent.OFANeutronAgent,
|
|
'setup_integration_br',
|
|
return_value=mock.Mock()),
|
|
mock.patch.object(self.mod_agent.OFANeutronAgent,
|
|
'setup_ancillary_bridges',
|
|
return_value=[]),
|
|
mock.patch.object(self.mod_agent.OVSBridge,
|
|
'get_local_port_mac',
|
|
return_value='00:00:00:00:00:01'),
|
|
mock.patch('neutron.agent.linux.utils.get_interface_mac',
|
|
return_value='00:00:00:00:00:01'),
|
|
mock.patch('neutron.openstack.common.loopingcall.'
|
|
'FixedIntervalLoopingCall',
|
|
new=MockFixedIntervalLoopingCall)):
|
|
self.agent = self.mod_agent.OFANeutronAgent(self.ryuapp, **kwargs)
|
|
self.agent.tun_br = _mk_test_br('tun_br')
|
|
self.datapath = mock.Mock()
|
|
self.ofparser = mock.Mock()
|
|
self.agent.phys_brs['phys-net1'] = _mk_test_br('phys_br1')
|
|
self.agent.phys_ofports['phys-net1'] = 777
|
|
self.agent.int_ofports['phys-net1'] = 666
|
|
self.datapath.ofparser = self.ofparser
|
|
self.ofparser.OFPMatch = mock.Mock()
|
|
self.ofparser.OFPMatch.return_value = mock.Mock()
|
|
self.ofparser.OFPFlowMod = mock.Mock()
|
|
self.ofparser.OFPFlowMod.return_value = mock.Mock()
|
|
self.agent.int_br.ofparser = self.ofparser
|
|
self.agent.int_br.datapath = _mk_test_dp('int_br')
|
|
|
|
self.agent.sg_agent = mock.Mock()
|
|
|
|
def _mock_port_bound(self, ofport=None, new_local_vlan=None,
|
|
old_local_vlan=None):
|
|
port = mock.Mock()
|
|
port.ofport = ofport
|
|
net_uuid = 'my-net-uuid'
|
|
if old_local_vlan is not None:
|
|
self.agent.local_vlan_map[net_uuid] = (
|
|
self.mod_agent.LocalVLANMapping(
|
|
old_local_vlan, None, None, None))
|
|
with contextlib.nested(
|
|
mock.patch.object(self.mod_agent.OVSBridge,
|
|
'set_db_attribute', return_value=True),
|
|
mock.patch.object(self.mod_agent.OVSBridge,
|
|
'db_get_val', return_value=str(old_local_vlan)),
|
|
mock.patch.object(self.agent, 'ryu_send_msg')
|
|
) as (set_ovs_db_func, get_ovs_db_func, ryu_send_msg_func):
|
|
self.agent.port_bound(port, net_uuid, 'local', None, None)
|
|
get_ovs_db_func.assert_called_once_with("Port", mock.ANY, "tag")
|
|
if new_local_vlan != old_local_vlan:
|
|
set_ovs_db_func.assert_called_once_with(
|
|
"Port", mock.ANY, "tag", str(new_local_vlan))
|
|
if ofport != -1:
|
|
ryu_send_msg_func.assert_called_once_with(
|
|
self.ofparser.OFPFlowMod.return_value)
|
|
else:
|
|
self.assertFalse(ryu_send_msg_func.called)
|
|
else:
|
|
self.assertFalse(set_ovs_db_func.called)
|
|
self.assertFalse(ryu_send_msg_func.called)
|
|
|
|
def test_port_bound_deletes_flows_for_valid_ofport(self):
|
|
self._mock_port_bound(ofport=1, new_local_vlan=1)
|
|
|
|
def test_port_bound_ignores_flows_for_invalid_ofport(self):
|
|
self._mock_port_bound(ofport=-1, new_local_vlan=1)
|
|
|
|
def test_port_bound_does_not_rewire_if_already_bound(self):
|
|
self._mock_port_bound(ofport=-1, new_local_vlan=1, old_local_vlan=1)
|
|
|
|
def _test_port_dead(self, cur_tag=None):
|
|
port = mock.Mock()
|
|
port.ofport = 1
|
|
with contextlib.nested(
|
|
mock.patch.object(self.mod_agent.OVSBridge,
|
|
'set_db_attribute', return_value=True),
|
|
mock.patch.object(self.mod_agent.OVSBridge,
|
|
'db_get_val', return_value=cur_tag),
|
|
mock.patch.object(self.agent, 'ryu_send_msg')
|
|
) as (set_ovs_db_func, get_ovs_db_func, ryu_send_msg_func):
|
|
self.agent.port_dead(port)
|
|
get_ovs_db_func.assert_called_once_with("Port", mock.ANY, "tag")
|
|
if cur_tag == self.mod_agent.DEAD_VLAN_TAG:
|
|
self.assertFalse(set_ovs_db_func.called)
|
|
self.assertFalse(ryu_send_msg_func.called)
|
|
else:
|
|
set_ovs_db_func.assert_called_once_with(
|
|
"Port", mock.ANY, "tag", str(self.mod_agent.DEAD_VLAN_TAG))
|
|
ryu_send_msg_func.assert_called_once_with(
|
|
self.ofparser.OFPFlowMod.return_value)
|
|
|
|
def test_port_dead(self):
|
|
self._test_port_dead()
|
|
|
|
def test_port_dead_with_port_already_dead(self):
|
|
self._test_port_dead(self.mod_agent.DEAD_VLAN_TAG)
|
|
|
|
def mock_scan_ports(self, port_set=None, registered_ports=None,
|
|
updated_ports=None, port_tags_dict=None):
|
|
port_tags_dict = port_tags_dict or {}
|
|
with contextlib.nested(
|
|
mock.patch.object(self.agent, '_get_ofport_names',
|
|
return_value=port_set),
|
|
mock.patch.object(self.agent.int_br, 'get_port_tag_dict',
|
|
return_value=port_tags_dict)
|
|
):
|
|
return self.agent.scan_ports(registered_ports, updated_ports)
|
|
|
|
def test_scan_ports_returns_current_only_for_unchanged_ports(self):
|
|
vif_port_set = set([1, 3])
|
|
registered_ports = set([1, 3])
|
|
expected = {'current': vif_port_set}
|
|
actual = self.mock_scan_ports(vif_port_set, registered_ports)
|
|
self.assertEqual(expected, actual)
|
|
|
|
def test_scan_ports_returns_port_changes(self):
|
|
vif_port_set = set([1, 3])
|
|
registered_ports = set([1, 2])
|
|
expected = dict(current=vif_port_set, added=set([3]), removed=set([2]))
|
|
actual = self.mock_scan_ports(vif_port_set, registered_ports)
|
|
self.assertEqual(expected, actual)
|
|
|
|
def _test_scan_ports_with_updated_ports(self, updated_ports):
|
|
vif_port_set = set([1, 3, 4])
|
|
registered_ports = set([1, 2, 4])
|
|
expected = dict(current=vif_port_set, added=set([3]),
|
|
removed=set([2]), updated=set([4]))
|
|
actual = self.mock_scan_ports(vif_port_set, registered_ports,
|
|
updated_ports)
|
|
self.assertEqual(expected, actual)
|
|
|
|
def test_scan_ports_finds_known_updated_ports(self):
|
|
self._test_scan_ports_with_updated_ports(set([4]))
|
|
|
|
def test_scan_ports_ignores_unknown_updated_ports(self):
|
|
# the port '5' was not seen on current ports. Hence it has either
|
|
# never been wired or already removed and should be ignored
|
|
self._test_scan_ports_with_updated_ports(set([4, 5]))
|
|
|
|
def test_scan_ports_ignores_updated_port_if_removed(self):
|
|
vif_port_set = set([1, 3])
|
|
registered_ports = set([1, 2])
|
|
updated_ports = set([1, 2])
|
|
expected = dict(current=vif_port_set, added=set([3]),
|
|
removed=set([2]), updated=set([1]))
|
|
actual = self.mock_scan_ports(vif_port_set, registered_ports,
|
|
updated_ports)
|
|
self.assertEqual(expected, actual)
|
|
|
|
def test_scan_ports_no_vif_changes_returns_updated_port_only(self):
|
|
vif_port_set = set([1, 2, 3])
|
|
registered_ports = set([1, 2, 3])
|
|
updated_ports = set([2])
|
|
expected = dict(current=vif_port_set, updated=set([2]))
|
|
actual = self.mock_scan_ports(vif_port_set, registered_ports,
|
|
updated_ports)
|
|
self.assertEqual(expected, actual)
|
|
|
|
def test_update_ports_returns_lost_vlan_port(self):
|
|
port = mock.Mock(port_name='tap00000001-00', ofport=1)
|
|
lvm = self.mod_agent.LocalVLANMapping(
|
|
vlan=1, network_type='1', physical_network=None, segmentation_id=1,
|
|
vif_ports={port.port_name: port})
|
|
local_vlan_map = {'1': lvm}
|
|
port_set = set(['tap00000001-00',
|
|
'tap00000003-00'])
|
|
registered_ports = set(['tap00000001-00', 'tap00000002-00'])
|
|
port_tags_dict = {'tap00000001-00': []}
|
|
expected = dict(
|
|
added=set(['tap00000003-00']),
|
|
current=set(['tap00000001-00', 'tap00000003-00']),
|
|
removed=set(['tap00000002-00']),
|
|
updated=set(['tap00000001-00'])
|
|
)
|
|
with mock.patch.dict(self.agent.local_vlan_map, local_vlan_map):
|
|
actual = self.mock_scan_ports(
|
|
port_set, registered_ports, port_tags_dict=port_tags_dict)
|
|
self.assertEqual(expected, actual)
|
|
|
|
def test_treat_devices_added_returns_true_for_missing_device(self):
|
|
with contextlib.nested(
|
|
mock.patch.object(self.agent.plugin_rpc, 'get_device_details',
|
|
side_effect=Exception()),
|
|
mock.patch.object(self.agent, '_get_ports',
|
|
return_value=[_mock_port(True, 'xxx')])):
|
|
self.assertTrue(self.agent.treat_devices_added_or_updated(['xxx']))
|
|
|
|
def _mock_treat_devices_added_updated(self, details, port, all_ports,
|
|
func_name):
|
|
"""Mock treat devices added or updated.
|
|
|
|
:param details: the details to return for the device
|
|
:param port: port name to process
|
|
:param all_ports: the port that _get_ports return
|
|
:param func_name: the function that should be called
|
|
:returns: whether the named function was called
|
|
"""
|
|
with contextlib.nested(
|
|
mock.patch.object(self.agent.plugin_rpc, 'get_device_details',
|
|
return_value=details),
|
|
mock.patch.object(self.agent, '_get_ports',
|
|
return_value=all_ports),
|
|
mock.patch.object(self.agent.plugin_rpc, 'update_device_up'),
|
|
mock.patch.object(self.agent.plugin_rpc, 'update_device_down'),
|
|
mock.patch.object(self.agent, func_name)
|
|
) as (get_dev_fn, _get_ports, upd_dev_up, upd_dev_down, func):
|
|
self.assertFalse(self.agent.treat_devices_added_or_updated([port]))
|
|
_get_ports.assert_called_once_with(self.agent.int_br)
|
|
return func.called
|
|
|
|
def test_treat_devices_added_updated_ignores_invalid_ofport(self):
|
|
port_name = 'hoge'
|
|
p1 = _mock_port(True, port_name)
|
|
p1.ofport = -1
|
|
self.assertFalse(self._mock_treat_devices_added_updated(
|
|
mock.MagicMock(), port_name, [p1], 'port_dead'))
|
|
|
|
def test_treat_devices_added_updated_marks_unknown_port_as_dead(self):
|
|
port_name = 'hoge'
|
|
p1 = _mock_port(True, port_name)
|
|
p1.ofport = 1
|
|
self.assertTrue(self._mock_treat_devices_added_updated(
|
|
mock.MagicMock(), port_name, [p1], 'port_dead'))
|
|
|
|
def test_treat_devices_added_does_not_process_missing_port(self):
|
|
with contextlib.nested(
|
|
mock.patch.object(self.agent.plugin_rpc, 'get_device_details'),
|
|
mock.patch.object(self.agent.int_br, 'get_vif_port_by_id',
|
|
return_value=None)
|
|
) as (get_dev_fn, get_vif_func):
|
|
self.assertFalse(get_dev_fn.called)
|
|
|
|
def test_treat_devices_added_updated_updates_known_port(self):
|
|
port_name = 'tapd3315981-0b'
|
|
p1 = _mock_port(False)
|
|
p2 = _mock_port(True, port_name)
|
|
ports = [p1, p2]
|
|
details = mock.MagicMock()
|
|
details.__contains__.side_effect = lambda x: True
|
|
self.assertTrue(self._mock_treat_devices_added_updated(
|
|
details, port_name, ports, 'treat_vif_port'))
|
|
|
|
def test_treat_devices_added_updated_put_port_down(self):
|
|
fake_details_dict = {'admin_state_up': False,
|
|
'port_id': 'xxx',
|
|
'device': 'xxx',
|
|
'network_id': 'yyy',
|
|
'physical_network': 'foo',
|
|
'segmentation_id': 'bar',
|
|
'network_type': 'baz'}
|
|
with contextlib.nested(
|
|
mock.patch.object(self.agent.plugin_rpc, 'get_device_details',
|
|
return_value=fake_details_dict),
|
|
mock.patch.object(self.agent, '_get_ports',
|
|
return_value=[_mock_port(True, 'xxx')]),
|
|
mock.patch.object(self.agent.plugin_rpc, 'update_device_up'),
|
|
mock.patch.object(self.agent.plugin_rpc, 'update_device_down'),
|
|
mock.patch.object(self.agent, 'treat_vif_port')
|
|
) as (get_dev_fn, _get_ports, upd_dev_up,
|
|
upd_dev_down, treat_vif_port):
|
|
self.assertFalse(self.agent.treat_devices_added_or_updated(
|
|
['xxx']))
|
|
self.assertTrue(treat_vif_port.called)
|
|
self.assertTrue(upd_dev_down.called)
|
|
_get_ports.assert_called_once_with(self.agent.int_br)
|
|
|
|
def test_treat_devices_removed_returns_true_for_missing_device(self):
|
|
with mock.patch.object(self.agent.plugin_rpc, 'update_device_down',
|
|
side_effect=Exception()):
|
|
self.assertTrue(self.agent.treat_devices_removed([{}]))
|
|
|
|
def _mock_treat_devices_removed(self, port_exists):
|
|
details = dict(exists=port_exists)
|
|
with mock.patch.object(self.agent.plugin_rpc, 'update_device_down',
|
|
return_value=details):
|
|
with mock.patch.object(self.agent, 'port_unbound') as port_unbound:
|
|
self.assertFalse(self.agent.treat_devices_removed([{}]))
|
|
self.assertTrue(port_unbound.called)
|
|
|
|
def test_treat_devices_removed_unbinds_port(self):
|
|
self._mock_treat_devices_removed(True)
|
|
|
|
def test_treat_devices_removed_ignores_missing_port(self):
|
|
self._mock_treat_devices_removed(False)
|
|
|
|
def _test_process_network_ports(self, port_info):
|
|
with contextlib.nested(
|
|
mock.patch.object(self.agent.sg_agent, "setup_port_filters"),
|
|
mock.patch.object(self.agent, "treat_devices_added_or_updated",
|
|
return_value=False),
|
|
mock.patch.object(self.agent, "treat_devices_removed",
|
|
return_value=False)
|
|
) as (setup_port_filters, device_added_updated, device_removed):
|
|
self.assertFalse(self.agent.process_network_ports(port_info))
|
|
setup_port_filters.assert_called_once_with(
|
|
port_info['added'], port_info.get('updated', set()))
|
|
device_added_updated.assert_called_once_with(
|
|
port_info['added'] | port_info.get('updated', set()))
|
|
device_removed.assert_called_once_with(port_info['removed'])
|
|
|
|
def test_process_network_ports(self):
|
|
self._test_process_network_ports(
|
|
{'current': set(['tap0']),
|
|
'removed': set(['eth0']),
|
|
'added': set(['eth1'])})
|
|
|
|
def test_process_network_port_with_updated_ports(self):
|
|
self._test_process_network_ports(
|
|
{'current': set(['tap0', 'tap1']),
|
|
'updated': set(['tap1', 'eth1']),
|
|
'removed': set(['eth0']),
|
|
'added': set(['eth1'])})
|
|
|
|
def test_report_state(self):
|
|
with mock.patch.object(self.agent.state_rpc,
|
|
"report_state") as report_st:
|
|
self.agent.int_br_device_count = 5
|
|
self.agent._report_state()
|
|
report_st.assert_called_with(self.agent.context,
|
|
self.agent.agent_state)
|
|
self.assertNotIn("start_flag", self.agent.agent_state)
|
|
self.assertEqual(
|
|
self.agent.agent_state["configurations"]["devices"],
|
|
self.agent.int_br_device_count
|
|
)
|
|
|
|
def test_network_delete(self):
|
|
with mock.patch.object(self.agent,
|
|
"reclaim_local_vlan") as recl_fn:
|
|
self.agent.network_delete("unused_context",
|
|
network_id="123")
|
|
self.assertFalse(recl_fn.called)
|
|
self.agent.local_vlan_map["123"] = "LVM object"
|
|
self.agent.network_delete("unused_context",
|
|
network_id="123")
|
|
recl_fn.assert_called_with("123")
|
|
|
|
def test_port_update(self):
|
|
port = {"id": "b1981919-f516-11e3-a8f4-08606e7f74e7",
|
|
"network_id": "124",
|
|
"admin_state_up": False}
|
|
self.agent.port_update("unused_context",
|
|
port=port,
|
|
network_type="vlan",
|
|
segmentation_id="1",
|
|
physical_network="physnet")
|
|
self.assertEqual(set(['tapb1981919-f5']), self.agent.updated_ports)
|
|
|
|
def test_setup_physical_bridges(self):
|
|
with contextlib.nested(
|
|
mock.patch.object(ip_lib, "device_exists"),
|
|
mock.patch.object(utils, "execute"),
|
|
mock.patch.object(self.mod_agent.OVSBridge, "add_port"),
|
|
mock.patch.object(self.mod_agent.OVSBridge, "delete_port"),
|
|
mock.patch.object(self.mod_agent.OVSBridge, "set_protocols"),
|
|
mock.patch.object(self.mod_agent.OVSBridge, "set_controller"),
|
|
mock.patch.object(self.mod_agent.OVSBridge, "get_datapath_id",
|
|
return_value='0xa'),
|
|
mock.patch.object(self.agent.int_br, "add_port"),
|
|
mock.patch.object(self.agent.int_br, "delete_port"),
|
|
mock.patch.object(ip_lib.IPWrapper, "add_veth"),
|
|
mock.patch.object(ip_lib.IpLinkCommand, "delete"),
|
|
mock.patch.object(ip_lib.IpLinkCommand, "set_up"),
|
|
mock.patch.object(ip_lib.IpLinkCommand, "set_mtu"),
|
|
mock.patch.object(self.mod_agent.ryu_api, "get_datapath",
|
|
return_value=self.datapath)
|
|
) as (devex_fn, utilsexec_fn,
|
|
ovs_addport_fn, ovs_delport_fn, ovs_set_protocols_fn,
|
|
ovs_set_controller_fn, ovs_datapath_id_fn, br_addport_fn,
|
|
br_delport_fn, addveth_fn, linkdel_fn, linkset_fn, linkmtu_fn,
|
|
ryu_api_fn):
|
|
devex_fn.return_value = True
|
|
parent = mock.MagicMock()
|
|
parent.attach_mock(utilsexec_fn, 'utils_execute')
|
|
parent.attach_mock(linkdel_fn, 'link_delete')
|
|
parent.attach_mock(addveth_fn, 'add_veth')
|
|
addveth_fn.return_value = (ip_lib.IPDevice("int-br-eth1"),
|
|
ip_lib.IPDevice("phy-br-eth1"))
|
|
ovs_addport_fn.return_value = "25"
|
|
br_addport_fn.return_value = "11"
|
|
self.agent.setup_physical_bridges({"physnet1": "br-eth"})
|
|
expected_calls = [mock.call.link_delete(),
|
|
mock.call.utils_execute(['/sbin/udevadm',
|
|
'settle',
|
|
'--timeout=10']),
|
|
mock.call.add_veth('int-br-eth',
|
|
'phy-br-eth')]
|
|
parent.assert_has_calls(expected_calls, any_order=False)
|
|
self.assertEqual(self.agent.int_ofports["physnet1"],
|
|
"11")
|
|
self.assertEqual(self.agent.phys_ofports["physnet1"],
|
|
"25")
|
|
|
|
def test_port_unbound(self):
|
|
with mock.patch.object(self.agent, "reclaim_local_vlan") as reclvl_fn:
|
|
self.agent.enable_tunneling = True
|
|
lvm = mock.Mock()
|
|
lvm.network_type = "gre"
|
|
lvm.vif_ports = {"vif1": mock.Mock()}
|
|
self.agent.local_vlan_map["netuid12345"] = lvm
|
|
self.agent.port_unbound("vif1", "netuid12345")
|
|
self.assertTrue(reclvl_fn.called)
|
|
reclvl_fn.called = False
|
|
|
|
lvm.vif_ports = {}
|
|
self.agent.port_unbound("vif1", "netuid12345")
|
|
self.assertEqual(reclvl_fn.call_count, 2)
|
|
|
|
lvm.vif_ports = {"vif1": mock.Mock()}
|
|
self.agent.port_unbound("vif3", "netuid12345")
|
|
self.assertEqual(reclvl_fn.call_count, 2)
|
|
|
|
def test_daemon_loop_uses_polling_manager(self):
|
|
with mock.patch(
|
|
'neutron.agent.linux.polling.get_polling_manager'
|
|
) as mock_get_pm:
|
|
fake_pm = mock.Mock()
|
|
mock_get_pm.return_value = fake_pm
|
|
fake_pm.__enter__ = mock.Mock()
|
|
fake_pm.__exit__ = mock.Mock()
|
|
with mock.patch.object(
|
|
self.agent, 'ovsdb_monitor_loop'
|
|
) as mock_loop:
|
|
self.agent.daemon_loop()
|
|
mock_get_pm.assert_called_once_with(True, 'fake_helper',
|
|
constants.DEFAULT_OVSDBMON_RESPAWN)
|
|
mock_loop.assert_called_once_with(polling_manager=fake_pm.__enter__())
|
|
|
|
def test_setup_tunnel_port_error_negative(self):
|
|
with contextlib.nested(
|
|
mock.patch.object(self.agent.tun_br, 'add_tunnel_port',
|
|
return_value='-1'),
|
|
mock.patch.object(self.mod_agent.LOG, 'error')
|
|
) as (add_tunnel_port_fn, log_error_fn):
|
|
ofport = self.agent.setup_tunnel_port(
|
|
'gre-1', 'remote_ip', p_const.TYPE_GRE)
|
|
add_tunnel_port_fn.assert_called_once_with(
|
|
'gre-1', 'remote_ip', self.agent.local_ip, p_const.TYPE_GRE,
|
|
self.agent.vxlan_udp_port, self.agent.dont_fragment)
|
|
log_error_fn.assert_called_once_with(
|
|
_("Failed to set-up %(type)s tunnel port to %(ip)s"),
|
|
{'type': p_const.TYPE_GRE, 'ip': 'remote_ip'})
|
|
self.assertEqual(ofport, 0)
|
|
|
|
def test_setup_tunnel_port_error_not_int(self):
|
|
with contextlib.nested(
|
|
mock.patch.object(self.agent.tun_br, 'add_tunnel_port',
|
|
return_value=None),
|
|
mock.patch.object(self.mod_agent.LOG, 'exception'),
|
|
mock.patch.object(self.mod_agent.LOG, 'error')
|
|
) as (add_tunnel_port_fn, log_exc_fn, log_error_fn):
|
|
ofport = self.agent.setup_tunnel_port(
|
|
'gre-1', 'remote_ip', p_const.TYPE_GRE)
|
|
add_tunnel_port_fn.assert_called_once_with(
|
|
'gre-1', 'remote_ip', self.agent.local_ip, p_const.TYPE_GRE,
|
|
self.agent.vxlan_udp_port, self.agent.dont_fragment)
|
|
log_exc_fn.assert_called_once_with(
|
|
_("ofport should have a value that can be "
|
|
"interpreted as an integer"))
|
|
log_error_fn.assert_called_once_with(
|
|
_("Failed to set-up %(type)s tunnel port to %(ip)s"),
|
|
{'type': p_const.TYPE_GRE, 'ip': 'remote_ip'})
|
|
self.assertEqual(ofport, 0)
|
|
|
|
def _create_tunnel_port_name(self, tunnel_ip, tunnel_type):
|
|
tunnel_ip_hex = '%08x' % netaddr.IPAddress(tunnel_ip, version=4)
|
|
return '%s-%s' % (tunnel_type, tunnel_ip_hex)
|
|
|
|
def test_tunnel_sync_with_valid_ip_address_and_gre_type(self):
|
|
tunnel_ip = '100.101.102.103'
|
|
self.agent.tunnel_types = ['gre']
|
|
tun_name = self._create_tunnel_port_name(tunnel_ip,
|
|
self.agent.tunnel_types[0])
|
|
fake_tunnel_details = {'tunnels': [{'ip_address': tunnel_ip}]}
|
|
with contextlib.nested(
|
|
mock.patch.object(self.agent.plugin_rpc, 'tunnel_sync',
|
|
return_value=fake_tunnel_details),
|
|
mock.patch.object(self.agent, 'setup_tunnel_port')
|
|
) as (tunnel_sync_rpc_fn, setup_tunnel_port_fn):
|
|
self.agent.tunnel_sync()
|
|
expected_calls = [mock.call(tun_name, tunnel_ip,
|
|
self.agent.tunnel_types[0])]
|
|
setup_tunnel_port_fn.assert_has_calls(expected_calls)
|
|
|
|
def test_tunnel_sync_with_valid_ip_address_and_vxlan_type(self):
|
|
tunnel_ip = '100.101.31.15'
|
|
self.agent.tunnel_types = ['vxlan']
|
|
tun_name = self._create_tunnel_port_name(tunnel_ip,
|
|
self.agent.tunnel_types[0])
|
|
fake_tunnel_details = {'tunnels': [{'ip_address': tunnel_ip}]}
|
|
with contextlib.nested(
|
|
mock.patch.object(self.agent.plugin_rpc, 'tunnel_sync',
|
|
return_value=fake_tunnel_details),
|
|
mock.patch.object(self.agent, 'setup_tunnel_port')
|
|
) as (tunnel_sync_rpc_fn, setup_tunnel_port_fn):
|
|
self.agent.tunnel_sync()
|
|
expected_calls = [mock.call(tun_name, tunnel_ip,
|
|
self.agent.tunnel_types[0])]
|
|
setup_tunnel_port_fn.assert_has_calls(expected_calls)
|
|
|
|
def test_tunnel_sync_invalid_ip_address(self):
|
|
tunnel_ip = '100.100.100.100'
|
|
self.agent.tunnel_types = ['vxlan']
|
|
tun_name = self._create_tunnel_port_name(tunnel_ip,
|
|
self.agent.tunnel_types[0])
|
|
fake_tunnel_details = {'tunnels': [{'ip_address': '300.300.300.300'},
|
|
{'ip_address': tunnel_ip}]}
|
|
with contextlib.nested(
|
|
mock.patch.object(self.agent.plugin_rpc, 'tunnel_sync',
|
|
return_value=fake_tunnel_details),
|
|
mock.patch.object(self.agent, 'setup_tunnel_port')
|
|
) as (tunnel_sync_rpc_fn, setup_tunnel_port_fn):
|
|
self.agent.tunnel_sync()
|
|
setup_tunnel_port_fn.assert_called_once_with(
|
|
tun_name, tunnel_ip, self.agent.tunnel_types[0])
|
|
|
|
def test_tunnel_update(self):
|
|
tunnel_ip = '10.10.10.10'
|
|
self.agent.tunnel_types = ['gre']
|
|
tun_name = self._create_tunnel_port_name(tunnel_ip,
|
|
self.agent.tunnel_types[0])
|
|
kwargs = {'tunnel_ip': tunnel_ip,
|
|
'tunnel_type': self.agent.tunnel_types[0]}
|
|
self.agent.setup_tunnel_port = mock.Mock()
|
|
self.agent.enable_tunneling = True
|
|
self.agent.l2_pop = False
|
|
self.agent.tunnel_update(context=None, **kwargs)
|
|
expected_calls = [mock.call(tun_name, tunnel_ip,
|
|
self.agent.tunnel_types[0])]
|
|
self.agent.setup_tunnel_port.assert_has_calls(expected_calls)
|
|
|
|
def test__provision_local_vlan_inbound_for_tunnel(self):
|
|
with mock.patch.object(self.agent, 'ryu_send_msg') as sendmsg:
|
|
self.agent._provision_local_vlan_inbound_for_tunnel(1, 'gre', 3)
|
|
|
|
ofp = importutils.import_module('ryu.ofproto.ofproto_v1_3')
|
|
ofpp = importutils.import_module('ryu.ofproto.ofproto_v1_3_parser')
|
|
expected_msg = ofpp.OFPFlowMod(
|
|
self.agent.tun_br.datapath,
|
|
instructions=[
|
|
ofpp.OFPInstructionActions(
|
|
ofp.OFPIT_APPLY_ACTIONS,
|
|
[
|
|
ofpp.OFPActionPushVlan(),
|
|
ofpp.OFPActionSetField(vlan_vid=1 |
|
|
ofp.OFPVID_PRESENT),
|
|
]),
|
|
ofpp.OFPInstructionGotoTable(table_id=10),
|
|
],
|
|
match=ofpp.OFPMatch(tunnel_id=3),
|
|
priority=1,
|
|
table_id=2)
|
|
sendmsg.assert_has_calls([mock.call(expected_msg)])
|
|
|
|
def test__provision_local_vlan_outbound(self):
|
|
with mock.patch.object(self.agent, 'ryu_send_msg') as sendmsg:
|
|
self.agent._provision_local_vlan_outbound(888, 999, 'phys-net1')
|
|
|
|
ofp = importutils.import_module('ryu.ofproto.ofproto_v1_3')
|
|
ofpp = importutils.import_module('ryu.ofproto.ofproto_v1_3_parser')
|
|
expected_msg = ofpp.OFPFlowMod(
|
|
self.agent.phys_brs['phys-net1'].datapath,
|
|
instructions=[
|
|
ofpp.OFPInstructionActions(
|
|
ofp.OFPIT_APPLY_ACTIONS,
|
|
[
|
|
ofpp.OFPActionSetField(vlan_vid=999),
|
|
ofpp.OFPActionOutput(ofp.OFPP_NORMAL, 0),
|
|
]
|
|
)
|
|
],
|
|
match=ofpp.OFPMatch(
|
|
in_port=777,
|
|
vlan_vid=888 | ofp.OFPVID_PRESENT
|
|
),
|
|
priority=4)
|
|
sendmsg.assert_has_calls([mock.call(expected_msg)])
|
|
|
|
def test__provision_local_vlan_inbound(self):
|
|
with mock.patch.object(self.agent, 'ryu_send_msg') as sendmsg:
|
|
self.agent._provision_local_vlan_inbound(888, 999, 'phys-net1')
|
|
|
|
ofp = importutils.import_module('ryu.ofproto.ofproto_v1_3')
|
|
ofpp = importutils.import_module('ryu.ofproto.ofproto_v1_3_parser')
|
|
expected_msg = ofpp.OFPFlowMod(
|
|
self.agent.int_br.datapath,
|
|
instructions=[
|
|
ofpp.OFPInstructionActions(
|
|
ofp.OFPIT_APPLY_ACTIONS,
|
|
[
|
|
ofpp.OFPActionSetField(
|
|
vlan_vid=888 | ofp.OFPVID_PRESENT
|
|
),
|
|
ofpp.OFPActionOutput(ofp.OFPP_NORMAL, 0),
|
|
]
|
|
)
|
|
],
|
|
match=ofpp.OFPMatch(in_port=666, vlan_vid=999),
|
|
priority=3)
|
|
sendmsg.assert_has_calls([mock.call(expected_msg)])
|
|
|
|
def test__reclaim_local_vlan_outbound(self):
|
|
lvm = mock.Mock()
|
|
lvm.network_type = p_const.TYPE_VLAN
|
|
lvm.segmentation_id = 555
|
|
lvm.vlan = 444
|
|
lvm.physical_network = 'phys-net1'
|
|
with mock.patch.object(self.agent, 'ryu_send_msg') as sendmsg:
|
|
self.agent._reclaim_local_vlan_outbound(lvm)
|
|
|
|
ofp = importutils.import_module('ryu.ofproto.ofproto_v1_3')
|
|
ofpp = importutils.import_module('ryu.ofproto.ofproto_v1_3_parser')
|
|
expected_msg = ofpp.OFPFlowMod(
|
|
self.agent.phys_brs['phys-net1'].datapath,
|
|
command=ofp.OFPFC_DELETE,
|
|
match=ofpp.OFPMatch(
|
|
in_port=777,
|
|
vlan_vid=444 | ofp.OFPVID_PRESENT
|
|
),
|
|
out_group=ofp.OFPG_ANY,
|
|
out_port=ofp.OFPP_ANY,
|
|
table_id=ofp.OFPTT_ALL)
|
|
sendmsg.assert_has_calls([mock.call(expected_msg)])
|
|
|
|
def test__reclaim_local_vlan_inbound(self):
|
|
lvm = mock.Mock()
|
|
lvm.network_type = p_const.TYPE_VLAN
|
|
lvm.segmentation_id = 555
|
|
lvm.vlan = 444
|
|
lvm.physical_network = 'phys-net1'
|
|
with mock.patch.object(self.agent, 'ryu_send_msg') as sendmsg:
|
|
self.agent._reclaim_local_vlan_inbound(lvm)
|
|
|
|
ofp = importutils.import_module('ryu.ofproto.ofproto_v1_3')
|
|
ofpp = importutils.import_module('ryu.ofproto.ofproto_v1_3_parser')
|
|
expected_msg = ofpp.OFPFlowMod(
|
|
self.agent.int_br.datapath,
|
|
command=ofp.OFPFC_DELETE,
|
|
match=ofpp.OFPMatch(
|
|
in_port=666,
|
|
vlan_vid=555 | ofp.OFPVID_PRESENT
|
|
),
|
|
out_group=ofp.OFPG_ANY,
|
|
out_port=ofp.OFPP_ANY,
|
|
table_id=ofp.OFPTT_ALL)
|
|
sendmsg.assert_has_calls([mock.call(expected_msg)])
|
|
|
|
def test__provision_local_vlan_outbound_flat(self):
|
|
ofp = importutils.import_module('ryu.ofproto.ofproto_v1_3')
|
|
ofpp = importutils.import_module('ryu.ofproto.ofproto_v1_3_parser')
|
|
with mock.patch.object(self.agent, 'ryu_send_msg') as sendmsg:
|
|
self.agent._provision_local_vlan_outbound(888, ofp.OFPVID_NONE,
|
|
'phys-net1')
|
|
|
|
expected_msg = ofpp.OFPFlowMod(
|
|
self.agent.phys_brs['phys-net1'].datapath,
|
|
instructions=[
|
|
ofpp.OFPInstructionActions(
|
|
ofp.OFPIT_APPLY_ACTIONS,
|
|
[
|
|
ofpp.OFPActionPopVlan(),
|
|
ofpp.OFPActionOutput(ofp.OFPP_NORMAL, 0),
|
|
]
|
|
)
|
|
],
|
|
match=ofpp.OFPMatch(
|
|
in_port=777,
|
|
vlan_vid=888 | ofp.OFPVID_PRESENT
|
|
),
|
|
priority=4)
|
|
sendmsg.assert_has_calls([mock.call(expected_msg)])
|
|
|
|
def test__provision_local_vlan_inbound_flat(self):
|
|
ofp = importutils.import_module('ryu.ofproto.ofproto_v1_3')
|
|
ofpp = importutils.import_module('ryu.ofproto.ofproto_v1_3_parser')
|
|
with mock.patch.object(self.agent, 'ryu_send_msg') as sendmsg:
|
|
self.agent._provision_local_vlan_inbound(888, ofp.OFPVID_NONE,
|
|
'phys-net1')
|
|
|
|
expected_msg = ofpp.OFPFlowMod(
|
|
self.agent.int_br.datapath,
|
|
instructions=[
|
|
ofpp.OFPInstructionActions(
|
|
ofp.OFPIT_APPLY_ACTIONS,
|
|
[
|
|
ofpp.OFPActionPushVlan(),
|
|
ofpp.OFPActionSetField(
|
|
vlan_vid=888 | ofp.OFPVID_PRESENT
|
|
),
|
|
ofpp.OFPActionOutput(ofp.OFPP_NORMAL, 0),
|
|
]
|
|
)
|
|
],
|
|
match=ofpp.OFPMatch(in_port=666, vlan_vid=ofp.OFPVID_NONE),
|
|
priority=3)
|
|
sendmsg.assert_has_calls([mock.call(expected_msg)])
|
|
|
|
def test__reclaim_local_vlan_outbound_flat(self):
|
|
lvm = mock.Mock()
|
|
lvm.network_type = p_const.TYPE_FLAT
|
|
lvm.segmentation_id = 555
|
|
lvm.vlan = 444
|
|
lvm.physical_network = 'phys-net1'
|
|
with mock.patch.object(self.agent, 'ryu_send_msg') as sendmsg:
|
|
self.agent._reclaim_local_vlan_outbound(lvm)
|
|
|
|
ofp = importutils.import_module('ryu.ofproto.ofproto_v1_3')
|
|
ofpp = importutils.import_module('ryu.ofproto.ofproto_v1_3_parser')
|
|
expected_msg = ofpp.OFPFlowMod(
|
|
self.agent.phys_brs['phys-net1'].datapath,
|
|
command=ofp.OFPFC_DELETE,
|
|
match=ofpp.OFPMatch(
|
|
in_port=777,
|
|
vlan_vid=444 | ofp.OFPVID_PRESENT
|
|
),
|
|
out_group=ofp.OFPG_ANY,
|
|
out_port=ofp.OFPP_ANY,
|
|
table_id=ofp.OFPTT_ALL)
|
|
sendmsg.assert_has_calls([mock.call(expected_msg)])
|
|
|
|
def test__reclaim_local_vlan_inbound_flat(self):
|
|
lvm = mock.Mock()
|
|
lvm.network_type = p_const.TYPE_FLAT
|
|
lvm.segmentation_id = 555
|
|
lvm.vlan = 444
|
|
lvm.physical_network = 'phys-net1'
|
|
with mock.patch.object(self.agent, 'ryu_send_msg') as sendmsg:
|
|
self.agent._reclaim_local_vlan_inbound(lvm)
|
|
|
|
ofp = importutils.import_module('ryu.ofproto.ofproto_v1_3')
|
|
ofpp = importutils.import_module('ryu.ofproto.ofproto_v1_3_parser')
|
|
expected_msg = ofpp.OFPFlowMod(
|
|
self.agent.int_br.datapath,
|
|
command=ofp.OFPFC_DELETE,
|
|
match=ofpp.OFPMatch(
|
|
in_port=666,
|
|
vlan_vid=ofp.OFPVID_NONE
|
|
),
|
|
out_group=ofp.OFPG_ANY,
|
|
out_port=ofp.OFPP_ANY,
|
|
table_id=ofp.OFPTT_ALL)
|
|
sendmsg.assert_has_calls([mock.call(expected_msg)])
|
|
|
|
def test__get_ports(self):
|
|
ofpp = importutils.import_module('ryu.ofproto.ofproto_v1_3_parser')
|
|
reply = [ofpp.OFPPortDescStatsReply(body=[ofpp.OFPPort(name='hoge',
|
|
port_no=8)])]
|
|
sendmsg = mock.Mock(return_value=reply)
|
|
self.mod_agent.ryu_api.send_msg = sendmsg
|
|
result = self.agent._get_ports(self.agent.int_br)
|
|
result = list(result) # convert generator to list.
|
|
self.assertEqual(1, len(result))
|
|
self.assertEqual('hoge', result[0].port_name)
|
|
self.assertEqual(8, result[0].ofport)
|
|
expected_msg = ofpp.OFPPortDescStatsRequest(
|
|
datapath=self.agent.int_br.datapath)
|
|
sendmsg.assert_has_calls([mock.call(app=self.agent.ryuapp,
|
|
msg=expected_msg, reply_cls=ofpp.OFPPortDescStatsReply,
|
|
reply_multi=True)])
|
|
|
|
def test__get_ofport_names(self):
|
|
names = ['p111', 'p222', 'p333']
|
|
ps = [_mock_port(True, x) for x in names]
|
|
with mock.patch.object(self.agent, '_get_ports',
|
|
return_value=ps) as _get_ports:
|
|
result = self.agent._get_ofport_names('hoge')
|
|
_get_ports.assert_called_once_with('hoge')
|
|
self.assertEqual(set(names), result)
|
|
|
|
|
|
class AncillaryBridgesTest(OFAAgentTestCase):
|
|
|
|
def setUp(self):
|
|
super(AncillaryBridgesTest, self).setUp()
|
|
notifier_p = mock.patch(NOTIFIER)
|
|
notifier_cls = notifier_p.start()
|
|
self.notifier = mock.Mock()
|
|
notifier_cls.return_value = self.notifier
|
|
# Avoid rpc initialization for unit tests
|
|
cfg.CONF.set_override('rpc_backend',
|
|
'neutron.openstack.common.rpc.impl_fake')
|
|
cfg.CONF.set_override('report_interval', 0, 'AGENT')
|
|
self.kwargs = self.mod_agent.create_agent_config_map(cfg.CONF)
|
|
|
|
def _test_ancillary_bridges(self, bridges, ancillary):
|
|
device_ids = ancillary[:]
|
|
|
|
def pullup_side_effect(self, *args):
|
|
result = device_ids.pop(0)
|
|
return result
|
|
|
|
with contextlib.nested(
|
|
mock.patch.object(self.mod_agent.OFANeutronAgent,
|
|
'setup_integration_br',
|
|
return_value=mock.Mock()),
|
|
mock.patch('neutron.agent.linux.utils.get_interface_mac',
|
|
return_value='00:00:00:00:00:01'),
|
|
mock.patch.object(self.mod_agent.OVSBridge,
|
|
'get_local_port_mac',
|
|
return_value='00:00:00:00:00:01'),
|
|
mock.patch('neutron.agent.linux.ovs_lib.get_bridges',
|
|
return_value=bridges),
|
|
mock.patch(
|
|
'neutron.agent.linux.ovs_lib.get_bridge_external_bridge_id',
|
|
side_effect=pullup_side_effect)):
|
|
self.agent = self.mod_agent.OFANeutronAgent(
|
|
self.ryuapp, **self.kwargs)
|
|
self.assertEqual(len(ancillary), len(self.agent.ancillary_brs))
|
|
if ancillary:
|
|
bridges = [br.br_name for br in self.agent.ancillary_brs]
|
|
for br in ancillary:
|
|
self.assertIn(br, bridges)
|
|
|
|
def test_ancillary_bridges_single(self):
|
|
bridges = ['br-int', 'br-ex']
|
|
self._test_ancillary_bridges(bridges, ['br-ex'])
|
|
|
|
def test_ancillary_bridges_none(self):
|
|
bridges = ['br-int']
|
|
self._test_ancillary_bridges(bridges, [])
|
|
|
|
def test_ancillary_bridges_multiple(self):
|
|
bridges = ['br-int', 'br-ex1', 'br-ex2']
|
|
self._test_ancillary_bridges(bridges, ['br-ex1', 'br-ex2'])
|