NSX|V add vm to exclude list when the port has no port security

When a compute port with a device-id has no port security, we should
add the device to the nsx exclude list, so the spoof guard will not block it

When the first port with no security is attached to a device, it will be added
to the exclude list. When the last port is detached from the device (or deleted),
the device will be removed from the exclude list

Managing the exclude list is done by retrieving the vm moref from the DVS,
and adding this moref to the exclude list api.

In addition we now allow creating a port without port security, even if the
on the network port security is enabled.

This feature depends on 3 NSXV configuration flags:
spoofguard_enabled=True
use_dvs_features=True
use_exclude_list=True (new flag, True by default)

DocImpact:New configuration flag for this feature use_exclude_list
(True by default)

Change-Id: I3c93c78f8ceca131ee319237d99a90282ab65a3a
This commit is contained in:
Adit Sarfaty 2016-05-23 09:21:26 +03:00 committed by garyk
parent 9902635dc0
commit 14dadb6e3b
6 changed files with 322 additions and 46 deletions

View File

@ -482,6 +482,11 @@ nsxv_opts = [
default=True, default=True,
help=_("(Optional) If True then plugin will use NSXV " help=_("(Optional) If True then plugin will use NSXV "
"spoofguard component for port-security feature.")), "spoofguard component for port-security feature.")),
cfg.BoolOpt('use_exclude_list',
default=True,
help=_("(Optional) If True then plugin will use NSXV exclude "
"list component when port security is disabled and "
"spoofguard is enabled.")),
cfg.ListOpt('tenant_router_types', cfg.ListOpt('tenant_router_types',
default=['shared', 'distributed', 'exclusive'], default=['shared', 'distributed', 'exclusive'],
help=_("Ordered list of router_types to allocate as tenant " help=_("Ordered list of router_types to allocate as tenant "

View File

@ -25,6 +25,7 @@ LOG = logging.getLogger(__name__)
PORTGROUP_PREFIX = 'dvportgroup' PORTGROUP_PREFIX = 'dvportgroup'
QOS_OUT_DIRECTION = 'outgoingPackets' QOS_OUT_DIRECTION = 'outgoingPackets'
QOS_AGENT_NAME = 'dvfilter-generic-vmware' QOS_AGENT_NAME = 'dvfilter-generic-vmware'
API_FIND_ALL_BY_UUID = 'FindAllByUuid'
class DvsManager(object): class DvsManager(object):
@ -301,3 +302,20 @@ class DvsManager(object):
LOG.info(_LI("%(net_id)s delete from %(dvs)s."), LOG.info(_LI("%(net_id)s delete from %(dvs)s."),
{'net_id': net_id, {'net_id': net_id,
'dvs': dvs_utils.dvs_name_get()}) 'dvs': dvs_utils.dvs_name_get()})
def get_vm_moref(self, instance_uuid):
"""Get reference to the VM.
The method will make use of FindAllByUuid to get the VM reference.
This method finds all VM's on the backend that match the
instance_uuid, more specifically all VM's on the backend that have
'config_spec.instanceUuid' set to 'instance_uuid'.
"""
vm_refs = self._session.invoke_api(
self._session.vim,
API_FIND_ALL_BY_UUID,
self._session.vim.service_content.searchIndex,
uuid=instance_uuid,
vmSearch=True,
instanceUuid=True)
if vm_refs:
return vm_refs[0].value

View File

@ -1156,10 +1156,14 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
# First we allocate port in neutron database # First we allocate port in neutron database
neutron_db = super(NsxVPluginV2, self).create_port(context, port) neutron_db = super(NsxVPluginV2, self).create_port(context, port)
# Port port-security is decided by the port-security state on the # Port port-security is decided by the port-security state on the
# network it belongs to # network it belongs to, unless specifically specified here
port_security = self._get_network_security_binding( if validators.is_attr_set(port_data.get(psec.PORTSECURITY)):
context, neutron_db['network_id']) port_security = port_data[psec.PORTSECURITY]
port_data[psec.PORTSECURITY] = port_security else:
port_security = self._get_network_security_binding(
context, neutron_db['network_id'])
port_data[psec.PORTSECURITY] = port_security
self._process_port_port_security_create( self._process_port_port_security_create(
context, port_data, neutron_db) context, port_data, neutron_db)
# Update fields obtained from neutron db (eg: MAC address) # Update fields obtained from neutron db (eg: MAC address)
@ -1220,18 +1224,91 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
'ip_address' in port['fixed_ips'][0]): 'ip_address' in port['fixed_ips'][0]):
return port['fixed_ips'][0]['ip_address'] return port['fixed_ips'][0]['ip_address']
def _count_no_sec_ports_for_device_id(self, context, device_id):
"""Find how many compute ports with this device ID and no security
there are, so we can decide on adding / removing the device from
the exclusion list
"""
filters = {'device_id': [device_id],
'device_owner': ['compute:None']}
ports = self.get_ports(context.elevated(), filters=filters)
return len([p for p in ports
if validators.is_attr_set(p.get(ext_vnic_idx.VNIC_INDEX))
and not p[psec.PORTSECURITY]])
def _add_vm_to_exclude_list(self, context, device_id, port_id):
if (self._dvs and
cfg.CONF.nsxv.use_exclude_list):
# first time for this vm (we expect the count to be 1 already
# because the DB was already updated)
if (self._count_no_sec_ports_for_device_id(
context, device_id) <= 1):
vm_moref = self._dvs.get_vm_moref(device_id)
if vm_moref is not None:
try:
self.nsx_v.vcns.add_vm_to_exclude_list(vm_moref)
LOG.info(_LI("Add VM %(dev)s to exclude list on "
"behalf of port %(port)s: added to "
"list"),
{"dev": device_id, "port": port_id})
except n_exc.BadRequest:
LOG.error(_LE("Failed to add vm %(device)s "
"moref %(moref)s to exclusion list"),
{'device': device_id, 'moref': vm_moref})
else:
LOG.info(_LI("Add VM %(dev)s to exclude list on behalf of "
"port %(port)s: already in list"),
{"dev": device_id, "port": port_id})
def _remove_vm_from_exclude_list(self, context, device_id, port_id,
expected_count=0):
if (self._dvs and
cfg.CONF.nsxv.use_exclude_list):
# No ports left in DB (expected count is 0 or 1 depending
# on whether the DB was already updated),
# So we can remove it from the backend exclude list
if (self._count_no_sec_ports_for_device_id(
context, device_id) <= expected_count):
vm_moref = self._dvs.get_vm_moref(device_id)
if vm_moref is not None:
try:
self.nsx_v.vcns.delete_vm_from_exclude_list(vm_moref)
LOG.info(_LI("Remove VM %(dev)s from exclude list on "
"behalf of port %(port)s: removed from "
"list"),
{"dev": device_id, "port": port_id})
except n_exc.BadRequest:
LOG.error(_LE("Failed to delete vm %(device)s "
"moref %(moref)s from exclusion list"),
{'device': device_id, 'moref': vm_moref})
else:
LOG.info(_LI("Remove VM %(dev)s from exclude list on behalf "
"of port %(port)s: other ports still in list"),
{"dev": device_id, "port": port_id})
def update_port(self, context, id, port): def update_port(self, context, id, port):
with locking.LockManager.get_lock('port-update-%s' % id): with locking.LockManager.get_lock('port-update-%s' % id):
return self._update_port(context, id, port)
def _update_port(self, context, id, port): original_port = super(NsxVPluginV2, self).get_port(context, id)
is_compute_port = self._is_compute_port(original_port)
device_id = original_port['device_id']
if is_compute_port and device_id:
# Lock on the device ID to make sure we do not change/delete
# ports of the same device at the same time
with locking.LockManager.get_lock(
'port-device-%s' % device_id):
return self._update_port(context, id, port, original_port,
is_compute_port, device_id)
else:
return self._update_port(context, id, port, original_port,
is_compute_port, device_id)
def _update_port(self, context, id, port, original_port, is_compute_port,
device_id):
attrs = port[attr.PORT] attrs = port[attr.PORT]
port_data = port['port'] port_data = port['port']
original_port = super(NsxVPluginV2, self).get_port(context, id)
if addr_pair.ADDRESS_PAIRS in attrs: if addr_pair.ADDRESS_PAIRS in attrs:
self._validate_address_pairs(attrs, original_port) self._validate_address_pairs(attrs, original_port)
is_compute_port = self._is_compute_port(original_port)
device_id = original_port['device_id']
has_port_security = (cfg.CONF.nsxv.spoofguard_enabled and has_port_security = (cfg.CONF.nsxv.spoofguard_enabled and
original_port[psec.PORTSECURITY]) original_port[psec.PORTSECURITY])
@ -1242,6 +1319,11 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
msg = (_('Cannot set fixed ips and device owner together for port ' msg = (_('Cannot set fixed ips and device owner together for port '
'%s') % original_port['id']) '%s') % original_port['id'])
raise n_exc.BadRequest(resource='port', msg=msg) raise n_exc.BadRequest(resource='port', msg=msg)
# We do not support updating the port-security field (yet)
if psec.PORTSECURITY in port['port']:
msg = (_('Cannot modify the port security of port %s after port '
'creation') % original_port['id'])
raise NotImplementedError(msg)
# TODO(roeyc): create a method '_process_vnic_index_update' from the # TODO(roeyc): create a method '_process_vnic_index_update' from the
# following code block # following code block
@ -1262,13 +1344,9 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
original_port['fixed_ips']) original_port['fixed_ips'])
self._update_vnic_assigned_addresses( self._update_vnic_assigned_addresses(
context.session, original_port, vnic_id) context.session, original_port, vnic_id)
else: elif cfg.CONF.nsxv.spoofguard_enabled:
LOG.warning(_LW("port-security is disabled on port %(id)s, " # Add vm to the exclusion list, since it has no port security
"VM tools must be installed on instance " self._add_vm_to_exclude_list(context, device_id, id)
"%(device_id)s for security-groups to "
"function properly."),
{'id': id,
'device_id': original_port['device_id']})
delete_security_groups = self._check_update_deletes_security_groups( delete_security_groups = self._check_update_deletes_security_groups(
port) port)
@ -1279,8 +1357,6 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
with context.session.begin(subtransactions=True): with context.session.begin(subtransactions=True):
ret_port = super(NsxVPluginV2, self).update_port( ret_port = super(NsxVPluginV2, self).update_port(
context, id, port) context, id, port)
if psec.PORTSECURITY in port['port']:
raise NotImplementedError()
# copy values over - except fixed_ips as # copy values over - except fixed_ips as
# they've already been processed # they've already been processed
updates_fixed_ips = port['port'].pop('fixed_ips', []) updates_fixed_ips = port['port'].pop('fixed_ips', [])
@ -1372,6 +1448,11 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
if cfg.CONF.nsxv.spoofguard_enabled: if cfg.CONF.nsxv.spoofguard_enabled:
self._remove_vnic_from_spoofguard_policy( self._remove_vnic_from_spoofguard_policy(
context.session, original_port['network_id'], vnic_id) context.session, original_port['network_id'], vnic_id)
# remove vm from the exclusion list when it is detached
# from the device if it has no port security
if not original_port[psec.PORTSECURITY]:
self._remove_vm_from_exclude_list(
context, device_id, id)
self._delete_port_vnic_index_mapping(context, id) self._delete_port_vnic_index_mapping(context, id)
self._delete_dhcp_static_binding(context, original_port) self._delete_dhcp_static_binding(context, original_port)
else: else:
@ -1402,6 +1483,22 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
def delete_port(self, context, id, l3_port_check=True, def delete_port(self, context, id, l3_port_check=True,
nw_gw_port_check=True): nw_gw_port_check=True):
neutron_db_port = self.get_port(context, id)
device_id = neutron_db_port['device_id']
is_compute_port = self._is_compute_port(neutron_db_port)
if is_compute_port and device_id:
# Lock on the device ID to make sure we do not change/delete
# ports of the same device at the same time
with locking.LockManager.get_lock(
'port-device-%s' % device_id):
return self._delete_port(context, id, l3_port_check,
nw_gw_port_check, neutron_db_port)
else:
return self._delete_port(context, id, l3_port_check,
nw_gw_port_check, neutron_db_port)
def _delete_port(self, context, id, l3_port_check,
nw_gw_port_check, neutron_db_port):
"""Deletes a port on a specified Virtual Network. """Deletes a port on a specified Virtual Network.
If the port contains a remote interface attachment, the remote If the port contains a remote interface attachment, the remote
@ -1416,7 +1513,6 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
# a l3 router. If so, we should prevent deletion here # a l3 router. If so, we should prevent deletion here
if l3_port_check: if l3_port_check:
self.prevent_l3_port_deletion(context, id) self.prevent_l3_port_deletion(context, id)
neutron_db_port = self.get_port(context, id)
if neutron_db_port['device_owner'] in [constants.DEVICE_OWNER_DHCP]: if neutron_db_port['device_owner'] in [constants.DEVICE_OWNER_DHCP]:
msg = (_('Can not delete DHCP port %s') % neutron_db_port['id']) msg = (_('Can not delete DHCP port %s') % neutron_db_port['id'])
raise n_exc.BadRequest(resource='port', msg=msg) raise n_exc.BadRequest(resource='port', msg=msg)
@ -1439,6 +1535,15 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
LOG.error(_LE('Could not delete the spoofguard policy. ' LOG.error(_LE('Could not delete the spoofguard policy. '
'Exception %s'), e) 'Exception %s'), e)
if (cfg.CONF.nsxv.spoofguard_enabled and
not neutron_db_port[psec.PORTSECURITY] and
self._is_compute_port(neutron_db_port)):
device_id = neutron_db_port['device_id']
# Note that we expect to find 1 relevant port in the DB still
# because this port was not yet deleted
self._remove_vm_from_exclude_list(context, device_id, id,
expected_count=1)
self.disassociate_floatingips(context, id) self.disassociate_floatingips(context, id)
with context.session.begin(subtransactions=True): with context.session.begin(subtransactions=True):
super(NsxVPluginV2, self).delete_port(context, id) super(NsxVPluginV2, self).delete_port(context, id)

View File

@ -47,6 +47,7 @@ VDN_PREFIX = '/api/2.0/vdn'
SERVICES_PREFIX = '/api/2.0/services' SERVICES_PREFIX = '/api/2.0/services'
SPOOFGUARD_PREFIX = '/api/4.0/services/spoofguard' SPOOFGUARD_PREFIX = '/api/4.0/services/spoofguard'
TRUSTSTORE_PREFIX = '%s/%s' % (SERVICES_PREFIX, 'truststore') TRUSTSTORE_PREFIX = '%s/%s' % (SERVICES_PREFIX, 'truststore')
EXCLUDELIST_PREFIX = '/api/2.1/app/excludelist'
#LbaaS Constants #LbaaS Constants
LOADBALANCER_SERVICE = "loadbalancer/config" LOADBALANCER_SERVICE = "loadbalancer/config"
@ -760,6 +761,14 @@ class Vcns(object):
else: else:
return uri_path return uri_path
def add_vm_to_exclude_list(self, vm_id):
uri = '%s/%s' % (EXCLUDELIST_PREFIX, vm_id)
return self.do_request(HTTP_PUT, uri)
def delete_vm_from_exclude_list(self, vm_id):
uri = '%s/%s' % (EXCLUDELIST_PREFIX, vm_id)
return self.do_request(HTTP_DELETE, uri)
def get_scoping_objects(self): def get_scoping_objects(self):
uri = '%s/usermgmt/scopingobjects' % SERVICES_PREFIX uri = '%s/usermgmt/scopingobjects' % SERVICES_PREFIX
h, scoping_objects = self.do_request(HTTP_GET, uri, decode=False, h, scoping_objects = self.do_request(HTTP_GET, uri, decode=False,

View File

@ -145,6 +145,15 @@ class NsxVPluginV2TestCase(test_plugin.NeutronDbPluginV2TestCase):
plugin_instance._get_edge_id_by_rtr_id.return_value = False plugin_instance._get_edge_id_by_rtr_id.return_value = False
plugin_instance.edge_manager.is_dhcp_opt_enabled = True plugin_instance.edge_manager.is_dhcp_opt_enabled = True
def _get_core_plugin_with_dvs(self):
# enable dvs features to allow policy with QOS
cfg.CONF.set_default('use_dvs_features', True, 'nsxv')
plugin = manager.NeutronManager.get_plugin()
with mock.patch.object(dvs_utils, 'dvs_create_session'):
with mock.patch.object(dvs.DvsManager, '_get_dvs_moref'):
plugin._dvs = dvs.DvsManager()
return plugin
def test_get_vlan_network_name(self): def test_get_vlan_network_name(self):
p = manager.NeutronManager.get_plugin() p = manager.NeutronManager.get_plugin()
net_id = uuidutils.generate_uuid() net_id = uuidutils.generate_uuid()
@ -492,15 +501,6 @@ class TestNetworksV2(test_plugin.TestNetworksV2, NsxVPluginV2TestCase):
context.get_admin_context(), context.get_admin_context(),
net['network']['id'], data) net['network']['id'], data)
def _get_core_plugin_with_dvs(self):
# enable dvs features to allow policy with QOS
cfg.CONF.set_default('use_dvs_features', True, 'nsxv')
plugin = manager.NeutronManager.get_plugin()
with mock.patch.object(dvs_utils, 'dvs_create_session'):
with mock.patch.object(dvs.DvsManager, '_get_dvs_moref'):
plugin._dvs = dvs.DvsManager()
return plugin
@mock.patch.object(dvs.DvsManager, 'update_port_groups_config') @mock.patch.object(dvs.DvsManager, 'update_port_groups_config')
@mock.patch.object(qos_utils.NsxVQosRule, '_init_from_policy_id') @mock.patch.object(qos_utils.NsxVQosRule, '_init_from_policy_id')
def test_create_network_with_qos_policy(self, def test_create_network_with_qos_policy(self,
@ -3487,27 +3487,9 @@ class TestNSXPortSecurity(test_psec.TestPortSecurity,
# Security Groups can be used even when port-security is disabled # Security Groups can be used even when port-security is disabled
pass pass
def test_create_port_security_overrides_network_value(self):
pass
def test_create_port_with_security_group_and_net_sec_false(self): def test_create_port_with_security_group_and_net_sec_false(self):
pass pass
def test_create_port_security_doese_not_overrides_network_value(self):
"""NSXv plugin port port-security-enabled is decided by the networks
port-security state
"""
res = self._create_network('json', 'net1', True,
arg_list=('port_security_enabled',),
port_security_enabled=False)
net = self.deserialize('json', res)
res = self._create_port('json', net['network']['id'],
arg_list=('port_security_enabled',),
port_security_enabled=True)
port = self.deserialize('json', res)
self.assertEqual(port['port'][psec.PORTSECURITY], False)
self._delete('ports', port['port']['id'])
def test_update_port_remove_port_security_security_group(self): def test_update_port_remove_port_security_security_group(self):
pass pass
@ -3525,6 +3507,157 @@ class TestNSXPortSecurity(test_psec.TestPortSecurity,
context.get_admin_context(), context.get_admin_context(),
port['port']['id'], update_port) port['port']['id'], update_port)
def _create_compute_port(self, network_name, device_id, port_security):
# create a network without port security
res = self._create_network('json', network_name, True)
net = self.deserialize('json', res)
# create a compute port with this network and a device
res = self._create_port('json', net['network']['id'],
arg_list=('port_security_enabled',
'device_id',
'device_owner',),
port_security_enabled=port_security,
device_id=device_id,
device_owner='compute:None')
return self.deserialize('json', res)
def _add_vnic_to_port(self, port_id, add_exclude, vnic_index):
"""Add vnic to a port and check if the device was added to the
exclude list
"""
plugin = self._get_core_plugin_with_dvs()
vm_moref = 'dummy_moref'
with mock.patch.object(plugin._dvs, 'get_vm_moref',
return_value=vm_moref):
with mock.patch.object(
plugin.nsx_v.vcns,
'add_vm_to_exclude_list') as exclude_list_add:
data = {'port': {'vnic_index': vnic_index}}
self.new_update_request(
'ports', data, port_id).get_response(self.api)
if add_exclude:
# make sure the vm was added to the exclude list
exclude_list_add.assert_called_once_with(vm_moref)
else:
self.assertFalse(exclude_list_add.called)
def _del_vnic_from_port(self, port_id, del_exclude):
"""Delete the vnic & device id from the port and check if
the device was removed from the exclude list
"""
plugin = self._get_core_plugin_with_dvs()
vm_moref = 'dummy_moref'
with mock.patch.object(plugin._dvs, 'get_vm_moref',
return_value=vm_moref):
with mock.patch.object(
plugin.nsx_v.vcns,
'delete_vm_from_exclude_list') as exclude_list_del:
data = {'port': {'vnic_index': None, 'device_id': ''}}
self.new_update_request(
'ports', data, port_id).get_response(self.api)
if del_exclude:
# make sure the vm was added to the exclude list
exclude_list_del.assert_called_once_with(vm_moref)
else:
self.assertFalse(exclude_list_del.called)
def _del_port_with_vnic(self, port_id, del_exclude):
"""Delete port with vnic, and check if the device was removed
from the exclude list
"""
plugin = self._get_core_plugin_with_dvs()
vm_moref = 'dummy_moref'
with mock.patch.object(plugin._dvs, 'get_vm_moref',
return_value=vm_moref):
with mock.patch.object(
plugin.nsx_v.vcns,
'delete_vm_from_exclude_list') as exclude_list_del:
self.new_delete_request(
'ports', port_id).get_response(self.api)
if del_exclude:
# make sure the vm was added to the exclude list
exclude_list_del.assert_called_once_with(vm_moref)
else:
self.assertFalse(exclude_list_del.called)
def test_update_port_no_security_with_vnic(self):
device_id = _uuid()
# create a compute port without port security
port = self._create_compute_port('net1', device_id, False)
# add vnic to the port
self._add_vnic_to_port(port['port']['id'], True, 3)
# delete vnic from the port
self._del_vnic_from_port(port['port']['id'], True)
def test_update_multiple_port_no_security_with_vnic(self):
device_id = _uuid()
# create a compute port without port security
port1 = self._create_compute_port('net1', device_id, False)
# add vnic to the port
self._add_vnic_to_port(port1['port']['id'], True, 3)
# create another compute port without port security on the same device
port2 = self._create_compute_port('net2', device_id, False)
# add vnic to the port (no need to add to exclude list again)
self._add_vnic_to_port(port2['port']['id'], False, 4)
# delete vnics from the port
self._del_vnic_from_port(port1['port']['id'], False)
self._del_vnic_from_port(port2['port']['id'], True)
def test_update_mixed_port_no_security_with_vnic(self):
device_id = _uuid()
# create a compute port without port security
port1 = self._create_compute_port('net1', device_id, True)
# add vnic to the port
self._add_vnic_to_port(port1['port']['id'], False, 3)
irrelevant_device_id = _uuid()
# create a compute port without port security for a different device
port2 = self._create_compute_port('net1', irrelevant_device_id, True)
# add vnic to the port
self._add_vnic_to_port(port2['port']['id'], False, 3)
# create another compute port without port security on the same device
port3 = self._create_compute_port('net2', device_id, False)
# add vnic to the port (no need to add to exclude list again)
self._add_vnic_to_port(port3['port']['id'], True, 4)
# delete vnics from the port
self._del_vnic_from_port(port1['port']['id'], False)
self._del_vnic_from_port(port3['port']['id'], True)
self._del_vnic_from_port(port2['port']['id'], False)
def test_delete_port_no_security_with_vnic(self):
device_id = _uuid()
# create a compute port without port security
port = self._create_compute_port('net1', device_id, False)
# add vnic to the port
self._add_vnic_to_port(port['port']['id'], True, 3)
# delete port with the vnic
self._del_port_with_vnic(port['port']['id'], True)
def test_delete_multiple_port_no_security_with_vnic(self):
device_id = _uuid()
# create a compute port without port security
port1 = self._create_compute_port('net1', device_id, False)
# add vnic to the port
self._add_vnic_to_port(port1['port']['id'], True, 3)
# create another compute port without port security on the same device
port2 = self._create_compute_port('net2', device_id, False)
# add vnic to the port (no need to add to exclude list again)
self._add_vnic_to_port(port2['port']['id'], False, 4)
# delete ports with the vnics
self._del_port_with_vnic(port2['port']['id'], False)
self._del_port_with_vnic(port1['port']['id'], True)
class TestSharedRouterTestCase(L3NatTest, L3NatTestCaseBase, class TestSharedRouterTestCase(L3NatTest, L3NatTestCaseBase,
test_l3_plugin.L3NatTestCaseMixin, test_l3_plugin.L3NatTestCaseMixin,

View File

@ -1073,6 +1073,12 @@ class FakeVcns(object):
def inactivate_vnic_assigned_addresses(self, policy_id, vnic_id): def inactivate_vnic_assigned_addresses(self, policy_id, vnic_id):
pass pass
def add_vm_to_exclude_list(self, vm_id):
pass
def delete_vm_from_exclude_list(self, vm_id):
pass
def reset_all(self): def reset_all(self):
self._jobs.clear() self._jobs.clear()
self._edges.clear() self._edges.clear()