From 46bcd38717a785a74bce36a4250fb14f8de7cd4b Mon Sep 17 00:00:00 2001 From: Adit Sarfaty Date: Wed, 5 Dec 2018 14:57:31 +0200 Subject: [PATCH] Policy plugin floating IPs support Including unit test & CI tests for routers & floating IPs, and necessary routing related fixes. Change-Id: If4f38e6081f1b68369102a1e9839feac8b33530e --- devstack/nsx_p/devstackgaterc | 8 +- devstack/tools/nsxp_cleanup.py | 39 +- vmware_nsx/plugins/nsx_p/plugin.py | 90 ++++- .../tests/unit/common_plugin/common_v3.py | 173 ++++++++ vmware_nsx/tests/unit/nsx_p/test_plugin.py | 378 ++++++++++++++++++ vmware_nsx/tests/unit/nsx_v3/test_plugin.py | 222 +++------- 6 files changed, 694 insertions(+), 216 deletions(-) create mode 100644 vmware_nsx/tests/unit/common_plugin/common_v3.py diff --git a/devstack/nsx_p/devstackgaterc b/devstack/nsx_p/devstackgaterc index b39bb1dc5a..0a46f5c613 100644 --- a/devstack/nsx_p/devstackgaterc +++ b/devstack/nsx_p/devstackgaterc @@ -39,12 +39,8 @@ r="$r|(?:tempest\.api\.network\.admin\.test_external_network_extension\.External # Some ICMP types are not supported by the NSX backend r="$r|(?:tempest\.api\.network\.test_security_groups\.SecGroupTest\.test_create_security_group_rule_with_icmp_type_code.*)" -# Temporarily exclude packages which are not yet supported by the P plugin -r="$r|(?:tempest\.api\.network\.admin\.test_floating_ips_admin_actions.*)" -r="$r|(?:tempest\.api\.network\.admin\.test_routers.*)" -r="$r|(?:tempest\.api\.network\.admin\.test_routers_negative.*)" -r="$r|(?:tempest\.api\.network\.test_floating_ips.*)" -r="$r|(?:tempest\.api\.network\.test_routers.*)" +# TODO(asarfaty): Make this test pass +r="$r|(?:tempest\.api\.network\.test_routers_negative\.RoutersNegativeTest\.test_router_remove_interface_in_use_returns_409.*)" # End list of exclusions. r="$r)" diff --git a/devstack/tools/nsxp_cleanup.py b/devstack/tools/nsxp_cleanup.py index 489e5c86f8..3c28c85fad 100755 --- a/devstack/tools/nsxp_cleanup.py +++ b/devstack/tools/nsxp_cleanup.py @@ -70,16 +70,16 @@ class NSXClient(object): # allow admin user to delete entities created # under openstack principal identity allow_overwrite_header=True) - self.nsxlib = v3.NsxPolicyLib(nsxlib_config) + self.nsxpolicy = v3.NsxPolicyLib(nsxlib_config) def get_nsx_os_domains(self): - domains = self.get_os_resources(self.nsxlib.domain.list()) + domains = self.get_os_resources(self.nsxpolicy.domain.list()) return [d['id'] for d in domains] def cleanup_domains(self, domains): """Delete all OS created NSX Policy segments ports per segment""" for domain_id in domains: - self.nsxlib.domain.delete(domain_id) + self.nsxpolicy.domain.delete(domain_id) def get_os_resources(self, resources): """ @@ -95,8 +95,8 @@ class NSXClient(object): Retrieve all NSX policy groups & maps created from OpenStack (by tags) If the DB is available - use only objects in the neutron DB """ - groups = self.get_os_resources(self.nsxlib.group.list(domain_id)) - maps = self.get_os_resources(self.nsxlib.comm_map.list(domain_id)) + groups = self.get_os_resources(self.nsxpolicy.group.list(domain_id)) + maps = self.get_os_resources(self.nsxpolicy.comm_map.list(domain_id)) if self.neutron_db: db_sgs = self.neutron_db.get_security_groups() @@ -110,36 +110,43 @@ class NSXClient(object): print("Number of OS Communication maps of domain %s to be deleted: " "%s" % (domain_id, len(maps))) for m in maps: - self.nsxlib.comm_map.delete(domain_id, m['id']) + self.nsxpolicy.comm_map.delete(domain_id, m['id']) print("Number of OS Groups of domain %s to be deleted: " "%s" % (domain_id, len(groups))) for grp in groups: - self.nsxlib.group.delete(domain_id, grp['id']) + self.nsxpolicy.group.delete(domain_id, grp['id']) def get_os_nsx_tier1_routers(self): """ Retrieve all NSX policy routers created from OpenStack (by tags) If the DB is available - use only objects in the neutron DB """ - routers = self.get_os_resources(self.nsxlib.tier1.list()) + routers = self.get_os_resources(self.nsxpolicy.tier1.list()) if routers and self.neutron_db: db_routers = self.neutron_db.get_routers() routers = [r for r in routers if r['id'] in db_routers] return routers + def cleanup_tier1_nat_rules(self, tier1_uuid): + rules = self.nsxpolicy.tier1_nat_rule.list(tier1_uuid) + for rule in rules: + self.nsxpolicy.tier1_nat_rule.delete(tier1_uuid, rule['id']) + def cleanup_tier1_routers(self): """Delete all OS created NSX Policy routers""" routers = self.get_os_nsx_tier1_routers() print("Number of OS Tier1 routers to be deleted: %s" % len(routers)) for rtr in routers: - self.nsxlib.tier1.delete(rtr['id']) + # remove all nat rules from this router before deletion + self.cleanup_tier1_nat_rules(rtr['id']) + self.nsxpolicy.tier1.delete(rtr['id']) def get_os_nsx_segments(self): """ Retrieve all NSX policy segments created from OpenStack (by tags) If the DB is available - use only objects in the neutron DB """ - segments = self.get_os_resources(self.nsxlib.segment.list()) + segments = self.get_os_resources(self.nsxpolicy.segment.list()) if segments and self.neutron_db: db_networks = self.neutron_db.get_networks() segments = [s for s in segments if s['id'] in db_networks] @@ -153,8 +160,8 @@ class NSXClient(object): # Delete all the ports self.cleanup_segment_ports(s['id']) # Disassociate from a tier1 router - self.nsxlib.segment.update(s['id'], tier1_id=None) - self.nsxlib.segment.delete(s['id']) + self.nsxpolicy.segment.update(s['id'], tier1_id=None) + self.nsxpolicy.segment.delete(s['id']) def get_os_nsx_segment_ports(self, segment_id): """ @@ -162,7 +169,7 @@ class NSXClient(object): If the DB is available - use only objects in the neutron DB """ segment_ports = self.get_os_resources( - self.nsxlib.segment_port.list(segment_id)) + self.nsxpolicy.segment_port.list(segment_id)) if segment_ports and self.neutron_db: db_ports = self.neutron_db.get_ports() segment_ports = [s for s in segment_ports if s['id'] in db_ports] @@ -172,7 +179,7 @@ class NSXClient(object): """Delete all OS created NSX Policy segments ports per segment""" segment_ports = self.get_os_nsx_segment_ports(segment_id) for p in segment_ports: - self.nsxlib.segment_port.delete(segment_id, p['id']) + self.nsxpolicy.segment_port.delete(segment_id, p['id']) def get_os_nsx_services(self): """ @@ -180,7 +187,7 @@ class NSXClient(object): (by tags) If the DB is available - use only objects in the neutron DB """ - services = self.get_os_resources(self.nsxlib.service.list()) + services = self.get_os_resources(self.nsxpolicy.service.list()) if services and self.neutron_db: db_rules = self.neutron_db.get_security_groups_rules() services = [s for s in services if s['id'] in db_rules] @@ -191,7 +198,7 @@ class NSXClient(object): services = self.get_os_nsx_services() print("Number of OS rule services to be deleted: %s" % len(services)) for srv in services: - self.nsxlib.service.delete(srv['id']) + self.nsxpolicy.service.delete(srv['id']) def cleanup_all(self): """ diff --git a/vmware_nsx/plugins/nsx_p/plugin.py b/vmware_nsx/plugins/nsx_p/plugin.py index 3248523cb2..3869d17cda 100644 --- a/vmware_nsx/plugins/nsx_p/plugin.py +++ b/vmware_nsx/plugins/nsx_p/plugin.py @@ -704,6 +704,10 @@ class NsxPolicyPlugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin, # first update neutron (this will perform all types of validations) port_data = self.get_port(context, port_id) net_id = port_data['network_id'] + # if needed, check to see if this is a port owned by + # a l3 router. If so, we should prevent deletion here + if l3_port_check: + self.prevent_l3_port_deletion(context, port_id) self.disassociate_floatingips(context, port_id) super(NsxPolicyPlugin, self).delete_port(context, port_id) @@ -882,7 +886,8 @@ class NsxPolicyPlugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin, action=policy_constants.NAT_ACTION_SNAT, #sequence_number=GW_NAT_PRI # TODO(asarfaty) handle priorities translated_network=gw_ip, - source_network=subnet['cidr']) + source_network=subnet['cidr'], + firewall_match=policy_constants.NAT_FIREWALL_MATCH_INTERNAL) def _get_snat_rule_id(self, subnet): return 'S-' + subnet['id'] @@ -899,7 +904,8 @@ class NsxPolicyPlugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin, nat_rule_id=self._get_no_dnat_rule_id(subnet), action=policy_constants.NAT_ACTION_NO_DNAT, #sequence_number=GW_NAT_PRI # TODO(asarfaty) handle priorities - destination_network=subnet['cidr']) + destination_network=subnet['cidr'], + firewall_match=policy_constants.NAT_FIREWALL_MATCH_BYPASS) def _del_subnet_no_dnat_rule(self, router_id, subnet): # Delete the previously created NO-DNAT rules @@ -946,6 +952,9 @@ class NsxPolicyPlugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin, if edge_cluster: self.nsxpolicy.tier1.set_edge_cluster_path( router_id, edge_cluster) + else: + LOG.error("Tier0 %s does not have an edge cluster", + new_tier0_uuid) if actions['remove_snat_rules']: for subnet in router_subnets: @@ -1059,6 +1068,7 @@ class NsxPolicyPlugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin, context, router_id, router) def add_router_interface(self, context, router_id, interface_info): + LOG.info("Adding router %s interface %s", router_id, interface_info) network_id = self._get_interface_network(context, interface_info) extern_net = self._network_is_external(context, network_id) router_db = self._get_router(context, router_id) @@ -1087,8 +1097,10 @@ class NsxPolicyPlugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin, net['name'] or 'network', network_id) segment_id = self._get_network_nsx_segment_id(context, network_id) subnet = self.get_subnet(context, info['subnet_ids'][0]) + cidr_prefix = int(subnet['cidr'].split('/')[1]) + gw_addr = "%s/%s" % (subnet['gateway_ip'], cidr_prefix) pol_subnet = policy_defs.Subnet( - gateway_address=("%s/32" % subnet.get('gateway_ip'))) + gateway_address=gw_addr) self.nsxpolicy.segment.update(segment_id, name=net_name, tier1_id=router_id, @@ -1116,6 +1128,7 @@ class NsxPolicyPlugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin, return info def remove_router_interface(self, context, router_id, interface_info): + LOG.info("Removing router %s interface %s", router_id, interface_info) # find the subnet - it is need for removing the SNAT rule subnet = subnet_id = None if 'port_id' in interface_info: @@ -1158,6 +1171,38 @@ class NsxPolicyPlugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin, {'id': network_id, 'e': ex}) return info + def _get_fip_snat_rule_id(self, fip_id): + return 'S-' + fip_id + + def _get_fip_dnat_rule_id(self, fip_id): + return 'D-' + fip_id + + def _add_fip_nat_rules(self, tier1_id, fip_id, ext_ip, int_ip): + self.nsxpolicy.tier1_nat_rule.create_or_overwrite( + 'snat for fip %s' % fip_id, + tier1_id, + nat_rule_id=self._get_fip_snat_rule_id(fip_id), + action=policy_constants.NAT_ACTION_SNAT, + translated_network=ext_ip, + source_network=int_ip, + firewall_match=policy_constants.NAT_FIREWALL_MATCH_INTERNAL) + self.nsxpolicy.tier1_nat_rule.create_or_overwrite( + 'dnat for fip %s' % fip_id, + tier1_id, + nat_rule_id=self._get_fip_dnat_rule_id(fip_id), + action=policy_constants.NAT_ACTION_DNAT, + translated_network=int_ip, + destination_network=ext_ip, + firewall_match=policy_constants.NAT_FIREWALL_MATCH_INTERNAL) + + def _delete_fip_nat_rules(self, tier1_id, fip_id): + self.nsxpolicy.tier1_nat_rule.delete( + tier1_id, + nat_rule_id=self._get_fip_snat_rule_id(fip_id)) + self.nsxpolicy.tier1_nat_rule.delete( + tier1_id, + nat_rule_id=self._get_fip_dnat_rule_id(fip_id)) + def create_floatingip(self, context, floatingip): new_fip = super(NsxPolicyPlugin, self).create_floatingip( context, floatingip, initial_status=( @@ -1167,42 +1212,51 @@ class NsxPolicyPlugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin, router_id = new_fip['router_id'] if not router_id: return new_fip - #TODO(asarfaty): Update the NSX router + + try: + self._add_fip_nat_rules( + router_id, new_fip['id'], + new_fip['floating_ip_address'], + new_fip['fixed_ip_address']) + except nsx_lib_exc.ManagerError: + with excutils.save_and_reraise_exception(): + self.delete_floatingip(context, new_fip['id']) + return new_fip def delete_floatingip(self, context, fip_id): fip = self.get_floatingip(context, fip_id) router_id = fip['router_id'] - port_id = fip['port_id'] - LOG.debug("Deleting floating IP %s. Router %s, Port %s", - fip_id, router_id, port_id) - if router_id: - #TODO(asarfaty): Update the NSX router - pass + self._delete_fip_nat_rules(router_id, fip_id) super(NsxPolicyPlugin, self).delete_floatingip(context, fip_id) def update_floatingip(self, context, fip_id, floatingip): old_fip = self.get_floatingip(context, fip_id) - old_port_id = old_fip['port_id'] new_status = (const.FLOATINGIP_STATUS_ACTIVE if floatingip['floatingip'].get('port_id') else const.FLOATINGIP_STATUS_DOWN) new_fip = super(NsxPolicyPlugin, self).update_floatingip( context, fip_id, floatingip) router_id = new_fip['router_id'] - new_port_id = new_fip['port_id'] + + if (old_fip['router_id'] and + (not router_id or old_fip['router_id'] != router_id)): + # Delete the old rules (if the router did not change - rewriting + # the rules with _add_fip_nat_rules is enough) + self._delete_fip_nat_rules(old_fip['router_id'], fip_id) if router_id: - #TODO(asarfaty): Update the NSX router - LOG.debug("Updating floating IP %s. Router %s, Port %s " - "(old port %s)", - fip_id, router_id, new_port_id, old_port_id) + self._add_fip_nat_rules( + router_id, new_fip['id'], + new_fip['floating_ip_address'], + new_fip['fixed_ip_address']) if new_fip['status'] != new_status: new_fip['status'] = new_status self.update_floatingip_status(context, fip_id, new_status) + return new_fip def disassociate_floatingips(self, context, port_id): @@ -1213,8 +1267,8 @@ class NsxPolicyPlugin(agentschedulers_db.AZDhcpAgentSchedulerDbMixin, if not fip_db.router_id: continue if fip_db.router_id: - # TODO(asarfaty): Update the NSX logical router - pass + # Delete the old rules + self._delete_fip_nat_rules(fip_db.router_id, fip_db.id) self.update_floatingip_status(context, fip_db.id, const.FLOATINGIP_STATUS_DOWN) diff --git a/vmware_nsx/tests/unit/common_plugin/common_v3.py b/vmware_nsx/tests/unit/common_plugin/common_v3.py new file mode 100644 index 0000000000..49f2029420 --- /dev/null +++ b/vmware_nsx/tests/unit/common_plugin/common_v3.py @@ -0,0 +1,173 @@ +# Copyright 2018 VMware, Inc. +# All Rights Reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import contextlib + +import decorator + +from neutron.tests.unit.db import test_db_base_plugin_v2 as test_plugin + + +class FixExternalNetBaseTest(object): + """Base class providing utilities for handling tests which require updating + a network to be external, which is not supported for the NSX-v3 and NSX-P + plugins. + """ + def setUp(self, *args, **kwargs): + self.original_subnet = self.subnet + self.original_network = self.network + self.subnet_calls = [] + super(FixExternalNetBaseTest, self).setUp(*args, **kwargs) + + def _set_net_external(self, net_id): + # This action is not supported by the V3 plugin + pass + + def _create_external_network(self): + data = {'network': {'name': 'net1', + 'router:external': 'True', + 'tenant_id': 'tenant_one', + 'provider:physical_network': 'stam'}} + network_req = self.new_create_request('networks', data) + network = self.deserialize(self.fmt, + network_req.get_response(self.api)) + return network + + def external_subnet(self, **kwargs): + if 'network' in kwargs: + return self.original_subnet(**kwargs) + ext_net = self._create_external_network() + return self.original_subnet(network=ext_net, **kwargs) + + def external_subnet_by_list(self, *args, **kwargs): + if len(self.subnet_calls) > 0: + result = self.subnet_calls[0](*args, **kwargs) + del self.subnet_calls[0] + else: + # back to normal + self.subnet = self.original_subnet + result = self.subnet(*args, **kwargs) + return result + + @contextlib.contextmanager + def floatingip_with_assoc(self, port_id=None, fmt=None, fixed_ip=None, + public_cidr='11.0.0.0/24', set_context=False, + tenant_id=None, **kwargs): + # Override super implementation to avoid changing the network to + # external after creation + with self._create_l3_ext_network() as ext_net,\ + self.subnet(network=ext_net, cidr=public_cidr, + set_context=set_context, + tenant_id=tenant_id) as public_sub: + private_port = None + if port_id: + private_port = self._show('ports', port_id) + with test_plugin.optional_ctx( + private_port, self.port, + set_context=set_context, + tenant_id=tenant_id) as private_port: + with self.router(set_context=set_context, + tenant_id=tenant_id) as r: + sid = private_port['port']['fixed_ips'][0]['subnet_id'] + private_sub = {'subnet': {'id': sid}} + floatingip = None + + self._add_external_gateway_to_router( + r['router']['id'], + public_sub['subnet']['network_id']) + self._router_interface_action( + 'add', r['router']['id'], + private_sub['subnet']['id'], None) + + floatingip = self._make_floatingip( + fmt or self.fmt, + public_sub['subnet']['network_id'], + port_id=private_port['port']['id'], + fixed_ip=fixed_ip, + tenant_id=tenant_id, + set_context=set_context, + **kwargs) + yield floatingip + + if floatingip: + self._delete('floatingips', + floatingip['floatingip']['id']) + + @contextlib.contextmanager + def floatingip_no_assoc(self, private_sub, fmt=None, + set_context=False, flavor_id=None, **kwargs): + # override super code to create an external subnet in advanced + with self.external_subnet(cidr='12.0.0.0/24') as public_sub: + with self.floatingip_no_assoc_with_public_sub( + private_sub, fmt, set_context, public_sub, + flavor_id, **kwargs) as (f, r): + # Yield only the floating ip object + yield f + + +# Override subnet/network creation in some tests to create external +# networks immediately instead of updating it post creation, which the +# v3 plugin does not support +@decorator.decorator +def with_external_subnet(f, *args, **kwargs): + obj = args[0] + obj.subnet = obj.external_subnet + result = f(*args, **kwargs) + obj.subnet = obj.original_subnet + return result + + +def init_subnet_calls(self, n): + self.subnet_calls = [] + for i in range(0, n - 1): + self.subnet_calls.append(self.subnet) + self.subnet_calls.append(self.external_subnet) + + +def call_with_subnet_calls(self, f, *args, **kwargs): + self.subnet = self.external_subnet_by_list + result = f(*args, **kwargs) + self.subnet = self.original_subnet + return result + + +@decorator.decorator +def with_external_subnet_once(f, *args, **kwargs): + obj = args[0] + init_subnet_calls(obj, 1) + return call_with_subnet_calls(obj, f, *args, **kwargs) + + +@decorator.decorator +def with_external_subnet_second_time(f, *args, **kwargs): + obj = args[0] + init_subnet_calls(obj, 2) + return call_with_subnet_calls(obj, f, *args, **kwargs) + + +@decorator.decorator +def with_external_subnet_third_time(f, *args, **kwargs): + obj = args[0] + init_subnet_calls(obj, 3) + return call_with_subnet_calls(obj, f, *args, **kwargs) + + +@decorator.decorator +def with_external_network(f, *args, **kwargs): + obj = args[0] + obj.network = obj.external_network + result = f(*args, **kwargs) + obj.network = obj.original_network + return result diff --git a/vmware_nsx/tests/unit/nsx_p/test_plugin.py b/vmware_nsx/tests/unit/nsx_p/test_plugin.py index 1f7b21c2f1..d6f671e9ff 100644 --- a/vmware_nsx/tests/unit/nsx_p/test_plugin.py +++ b/vmware_nsx/tests/unit/nsx_p/test_plugin.py @@ -21,6 +21,7 @@ from webob import exc from neutron.extensions import securitygroup as secgrp from neutron.tests.unit.db import test_db_base_plugin_v2 +from neutron.tests.unit.extensions import test_l3 as test_l3_plugin from neutron.tests.unit.extensions import test_securitygroup from neutron_lib.api.definitions import external_net as extnet_apidef @@ -28,8 +29,15 @@ from neutron_lib.api.definitions import port_security as psec from neutron_lib.api.definitions import portbindings from neutron_lib.api.definitions import provider_net as pnet from neutron_lib.api.definitions import vlantransparent as vlan_apidef +from neutron_lib.callbacks import events +from neutron_lib.callbacks import registry +from neutron_lib.callbacks import resources +from neutron_lib import constants from neutron_lib import context +from neutron_lib.plugins import directory +from vmware_nsx.common import utils +from vmware_nsx.tests.unit.common_plugin import common_v3 from vmware_nsxlib.v3 import exceptions as nsxlib_exc from vmware_nsxlib.v3 import nsx_constants @@ -544,3 +552,373 @@ class NsxPTestSecurityGroup(NsxPPluginTestCaseMixin, psec.PORTSECURITY), **kwargs) self.assertEqual(res.status_int, exc.HTTPBadRequest.code) + + +class TestL3NatTestCase(common_v3.FixExternalNetBaseTest, + NsxPPluginTestCaseMixin, + test_l3_plugin.L3NatDBIntTestCase): + + # TODO(asarfaty): also add the tests from: + # test_l3_plugin.L3BaseForIntTests + # test_address_scope.AddressScopeTestCase + # test_ext_route.ExtraRouteDBTestCaseBase + def setUp(self, *args, **kwargs): + super(TestL3NatTestCase, self).setUp(*args, **kwargs) + self.original_subnet = self.subnet + self.original_network = self.network + + self.plugin_instance = directory.get_plugin() + self._plugin_name = "%s.%s" % ( + self.plugin_instance.__module__, + self.plugin_instance.__class__.__name__) + self._plugin_class = self.plugin_instance.__class__ + + def external_network(self, name='net1', + admin_state_up=True, + fmt=None, **kwargs): + if not name: + name = 'l3_ext_net' + physical_network = 'abc' + net_type = utils.NetworkTypes.L3_EXT + providernet_args = {pnet.NETWORK_TYPE: net_type, + pnet.PHYSICAL_NETWORK: physical_network} + return self.original_network(name=name, + admin_state_up=admin_state_up, + fmt=fmt, + router__external=True, + providernet_args=providernet_args, + arg_list=(pnet.NETWORK_TYPE, + pnet.PHYSICAL_NETWORK)) + + def _create_l3_ext_network(self, physical_network='abc'): + name = 'l3_ext_net' + net_type = utils.NetworkTypes.L3_EXT + providernet_args = {pnet.NETWORK_TYPE: net_type, + pnet.PHYSICAL_NETWORK: physical_network} + return self.network(name=name, + router__external=True, + providernet_args=providernet_args, + arg_list=(pnet.NETWORK_TYPE, + pnet.PHYSICAL_NETWORK)) + + def test_floatingip_create_different_fixed_ip_same_port(self): + self.skipTest('Multiple fixed ips on a port are not supported') + + def test_router_add_interface_multiple_ipv4_subnet_port_returns_400(self): + self.skipTest('Multiple fixed ips on a port are not supported') + + def test_router_add_interface_multiple_ipv6_subnet_port(self): + self.skipTest('Multiple fixed ips on a port are not supported') + + def test_floatingip_update_different_fixed_ip_same_port(self): + self.skipTest('Multiple fixed ips on a port are not supported') + + def test_create_multiple_floatingips_same_fixed_ip_same_port(self): + self.skipTest('Multiple fixed ips on a port are not supported') + + def test__notify_gateway_port_ip_changed(self): + self.skipTest('not supported') + + def test__notify_gateway_port_ip_not_changed(self): + self.skipTest('not supported') + + def test_floatingip_via_router_interface_returns_201(self): + self.skipTest('not supported') + + def test_floatingip_via_router_interface_returns_404(self): + self.skipTest('not supported') + + def test_network_update_external(self): + # This plugin does not support updating the external flag of a network + self.skipTest('not supported') + + def test_network_update_external_failure(self): + # This plugin does not support updating the external flag of a network + self.skipTest('not supported') + + def test_router_add_gateway_dup_subnet1_returns_400(self): + self.skipTest('not supported') + + def test_router_add_interface_dup_subnet2_returns_400(self): + self.skipTest('not supported') + + def test_router_add_interface_ipv6_port_existing_network_returns_400(self): + self.skipTest('not supported') + + def test_routes_update_for_multiple_routers(self): + self.skipTest('not supported') + + def test_floatingip_multi_external_one_internal(self): + self.skipTest('not supported') + + def test_floatingip_same_external_and_internal(self): + self.skipTest('not supported') + + def test_route_update_with_external_route(self): + self.skipTest('not supported') + + def test_floatingip_update_subnet_gateway_disabled(self): + self.skipTest('not supported') + + def test_floatingip_update_to_same_port_id_twice(self): + self.skipTest('Plugin changes floating port status') + + def test_router_add_interface_by_port_other_tenant_address_out_of_pool( + self): + # multiple fixed ips per port are not supported + self.skipTest('not supported') + + def test_router_add_interface_by_port_other_tenant_address_in_pool(self): + # multiple fixed ips per port are not supported + self.skipTest('not supported') + + def test_router_add_interface_by_port_admin_address_out_of_pool(self): + # multiple fixed ips per port are not supported + self.skipTest('not supported') + + def test_router_add_gateway_no_subnet(self): + self.skipTest('No support for no subnet gateway set') + + def test_create_router_gateway_fails(self): + self.skipTest('not supported') + + def test_router_remove_ipv6_subnet_from_interface(self): + self.skipTest('not supported') + + def test_router_add_interface_multiple_ipv6_subnets_same_net(self): + self.skipTest('not supported') + + def test_router_add_interface_multiple_ipv4_subnets(self): + self.skipTest('not supported') + + @common_v3.with_external_subnet + def test_router_update_gateway_with_external_ip_used_by_gw(self): + super(TestL3NatTestCase, + self).test_router_update_gateway_with_external_ip_used_by_gw() + + @common_v3.with_external_subnet + def test_router_update_gateway_with_invalid_external_ip(self): + super(TestL3NatTestCase, + self).test_router_update_gateway_with_invalid_external_ip() + + @common_v3.with_external_subnet + def test_router_update_gateway_with_invalid_external_subnet(self): + super(TestL3NatTestCase, + self).test_router_update_gateway_with_invalid_external_subnet() + + @common_v3.with_external_network + def test_router_update_gateway_with_different_external_subnet(self): + super(TestL3NatTestCase, + self).test_router_update_gateway_with_different_external_subnet() + + @common_v3.with_external_subnet_once + def test_router_update_gateway_with_existed_floatingip(self): + super(TestL3NatTestCase, + self).test_router_update_gateway_with_existed_floatingip() + + @common_v3.with_external_network + def test_router_update_gateway_add_multiple_prefixes_ipv6(self): + super(TestL3NatTestCase, + self).test_router_update_gateway_add_multiple_prefixes_ipv6() + + @common_v3.with_external_network + def test_router_concurrent_delete_upon_subnet_create(self): + super(TestL3NatTestCase, + self).test_router_concurrent_delete_upon_subnet_create() + + @common_v3.with_external_network + def test_router_update_gateway_upon_subnet_create_ipv6(self): + super(TestL3NatTestCase, + self).test_router_update_gateway_upon_subnet_create_ipv6() + + @common_v3.with_external_network + def test_router_update_gateway_upon_subnet_create_max_ips_ipv6(self): + super( + TestL3NatTestCase, + self).test_router_update_gateway_upon_subnet_create_max_ips_ipv6() + + @common_v3.with_external_subnet_second_time + def test_router_add_interface_cidr_overlapped_with_gateway(self): + super(TestL3NatTestCase, + self).test_router_add_interface_cidr_overlapped_with_gateway() + + @common_v3.with_external_subnet + def test_router_add_gateway_dup_subnet2_returns_400(self): + super(TestL3NatTestCase, + self).test_router_add_gateway_dup_subnet2_returns_400() + + @common_v3.with_external_subnet + def test_router_update_gateway(self): + super(TestL3NatTestCase, + self).test_router_update_gateway() + + @common_v3.with_external_subnet + def test_router_create_with_gwinfo(self): + super(TestL3NatTestCase, + self).test_router_create_with_gwinfo() + + @common_v3.with_external_subnet + def test_router_clear_gateway_callback_failure_returns_409(self): + super(TestL3NatTestCase, + self).test_router_clear_gateway_callback_failure_returns_409() + + @common_v3.with_external_subnet + def test_router_create_with_gwinfo_ext_ip(self): + super(TestL3NatTestCase, + self).test_router_create_with_gwinfo_ext_ip() + + @common_v3.with_external_network + def test_router_create_with_gwinfo_ext_ip_subnet(self): + super(TestL3NatTestCase, + self).test_router_create_with_gwinfo_ext_ip_subnet() + + @common_v3.with_external_subnet_second_time + def test_router_delete_with_floatingip_existed_returns_409(self): + super(TestL3NatTestCase, + self).test_router_delete_with_floatingip_existed_returns_409() + + @common_v3.with_external_subnet + def test_router_add_and_remove_gateway_tenant_ctx(self): + super(TestL3NatTestCase, + self).test_router_add_and_remove_gateway_tenant_ctx() + + @common_v3.with_external_subnet_second_time + def test_router_add_interface_by_port_cidr_overlapped_with_gateway(self): + super(TestL3NatTestCase, self).\ + test_router_add_interface_by_port_cidr_overlapped_with_gateway() + + @common_v3.with_external_network + def test_router_add_gateway_multiple_subnets_ipv6(self): + super(TestL3NatTestCase, + self).test_router_add_gateway_multiple_subnets_ipv6() + + @common_v3.with_external_subnet + def test_router_add_and_remove_gateway(self): + super(TestL3NatTestCase, + self).test_router_add_and_remove_gateway() + + @common_v3.with_external_subnet + def test_floatingip_list_with_sort(self): + super(TestL3NatTestCase, + self).test_floatingip_list_with_sort() + + @common_v3.with_external_subnet_once + def test_floatingip_with_assoc_fails(self): + super(TestL3NatTestCase, + self).test_floatingip_with_assoc_fails() + + @common_v3.with_external_subnet_second_time + def test_floatingip_update_same_fixed_ip_same_port(self): + super(TestL3NatTestCase, + self).test_floatingip_update_same_fixed_ip_same_port() + + @common_v3.with_external_subnet + def test_floatingip_list_with_pagination_reverse(self): + super(TestL3NatTestCase, + self).test_floatingip_list_with_pagination_reverse() + + @common_v3.with_external_subnet_once + def test_floatingip_association_on_unowned_router(self): + super(TestL3NatTestCase, + self).test_floatingip_association_on_unowned_router() + + @common_v3.with_external_network + def test_delete_ext_net_with_disassociated_floating_ips(self): + super(TestL3NatTestCase, + self).test_delete_ext_net_with_disassociated_floating_ips() + + @common_v3.with_external_network + def test_create_floatingip_with_subnet_and_invalid_fip_address(self): + super( + TestL3NatTestCase, + self).test_create_floatingip_with_subnet_and_invalid_fip_address() + + @common_v3.with_external_subnet + def test_create_floatingip_with_duplicated_specific_ip(self): + super(TestL3NatTestCase, + self).test_create_floatingip_with_duplicated_specific_ip() + + @common_v3.with_external_subnet + def test_create_floatingip_with_subnet_id_non_admin(self): + super(TestL3NatTestCase, + self).test_create_floatingip_with_subnet_id_non_admin() + + @common_v3.with_external_subnet + def test_floatingip_list_with_pagination(self): + super(TestL3NatTestCase, + self).test_floatingip_list_with_pagination() + + @common_v3.with_external_subnet + def test_create_floatingips_native_quotas(self): + super(TestL3NatTestCase, + self).test_create_floatingips_native_quotas() + + @common_v3.with_external_network + def test_create_floatingip_with_multisubnet_id(self): + super(TestL3NatTestCase, + self).test_create_floatingip_with_multisubnet_id() + + @common_v3.with_external_network + def test_create_floatingip_with_subnet_id_and_fip_address(self): + super(TestL3NatTestCase, + self).test_create_floatingip_with_subnet_id_and_fip_address() + + @common_v3.with_external_subnet + def test_create_floatingip_with_specific_ip(self): + super(TestL3NatTestCase, + self).test_create_floatingip_with_specific_ip() + + @common_v3.with_external_network + def test_create_floatingip_ipv6_and_ipv4_network_creates_ipv4(self): + super(TestL3NatTestCase, + self).test_create_floatingip_ipv6_and_ipv4_network_creates_ipv4() + + @common_v3.with_external_subnet_once + def test_create_floatingip_non_admin_context_agent_notification(self): + super( + TestL3NatTestCase, + self).test_create_floatingip_non_admin_context_agent_notification() + + @common_v3.with_external_subnet + def test_create_floatingip_no_ext_gateway_return_404(self): + super(TestL3NatTestCase, + self).test_create_floatingip_no_ext_gateway_return_404() + + @common_v3.with_external_subnet + def test_create_floatingip_with_specific_ip_out_of_allocation(self): + super(TestL3NatTestCase, + self).test_create_floatingip_with_specific_ip_out_of_allocation() + + @common_v3.with_external_subnet_third_time + def test_floatingip_update_different_router(self): + super(TestL3NatTestCase, + self).test_floatingip_update_different_router() + + def test_floatingip_update(self): + super(TestL3NatTestCase, self).test_floatingip_update( + expected_status=constants.FLOATINGIP_STATUS_DOWN) + + @common_v3.with_external_subnet_second_time + def test_floatingip_with_invalid_create_port(self): + self._test_floatingip_with_invalid_create_port(self._plugin_name) + + def test_router_add_gateway_notifications(self): + with self.router() as r,\ + self._create_l3_ext_network() as ext_net,\ + self.subnet(network=ext_net): + with mock.patch.object(registry, 'notify') as notify: + self._add_external_gateway_to_router( + r['router']['id'], ext_net['network']['id']) + expected = [mock.call( + resources.ROUTER_GATEWAY, + events.AFTER_CREATE, mock.ANY, + context=mock.ANY, gw_ips=mock.ANY, + network_id=mock.ANY, router_id=mock.ANY)] + notify.assert_has_calls(expected) + + def test_router_add_gateway_no_subnet_forbidden(self): + with self.router() as r: + with self._create_l3_ext_network() as n: + self._add_external_gateway_to_router( + r['router']['id'], n['network']['id'], + expected_code=exc.HTTPBadRequest.code) diff --git a/vmware_nsx/tests/unit/nsx_v3/test_plugin.py b/vmware_nsx/tests/unit/nsx_v3/test_plugin.py index 0bab66111b..02e38fe05f 100644 --- a/vmware_nsx/tests/unit/nsx_v3/test_plugin.py +++ b/vmware_nsx/tests/unit/nsx_v3/test_plugin.py @@ -13,8 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import contextlib - import decorator import mock @@ -61,6 +59,7 @@ from vmware_nsx.common import utils from vmware_nsx.plugins.nsx_v3 import plugin as nsx_plugin from vmware_nsx.services.lbaas.nsx_v3.v2 import lb_driver_v2 from vmware_nsx.tests import unit as vmware +from vmware_nsx.tests.unit.common_plugin import common_v3 from vmware_nsx.tests.unit.extensions import test_metadata from vmware_nsxlib.tests.unit.v3 import mocks as nsx_v3_mocks from vmware_nsxlib.tests.unit.v3 import nsxlib_testcase @@ -1895,6 +1894,7 @@ class TestL3ExtensionManager(object): class L3NatTest(test_l3_plugin.L3BaseForIntTests, NsxV3PluginTestCaseMixin, + common_v3.FixExternalNetBaseTest, test_address_scope.AddressScopeTestCase): def setUp(self, plugin=PLUGIN_NAME, ext_mgr=None, @@ -1932,22 +1932,6 @@ class L3NatTest(test_l3_plugin.L3BaseForIntTests, NsxV3PluginTestCaseMixin, # This action is not supported by the V3 plugin pass - def _create_external_network(self): - data = {'network': {'name': 'net1', - 'router:external': 'True', - 'tenant_id': 'tenant_one', - 'provider:physical_network': 'stam'}} - network_req = self.new_create_request('networks', data) - network = self.deserialize(self.fmt, - network_req.get_response(self.api)) - return network - - def external_subnet(self, **kwargs): - if 'network' in kwargs: - return self.original_subnet(**kwargs) - ext_net = self._create_external_network() - return self.original_subnet(network=ext_net, **kwargs) - def external_network(self, name='net1', admin_state_up=True, fmt=None, **kwargs): @@ -1965,16 +1949,6 @@ class L3NatTest(test_l3_plugin.L3BaseForIntTests, NsxV3PluginTestCaseMixin, arg_list=(pnet.NETWORK_TYPE, pnet.PHYSICAL_NETWORK)) - def external_subnet_by_list(self, *args, **kwargs): - if len(self.subnet_calls) > 0: - result = self.subnet_calls[0](*args, **kwargs) - del self.subnet_calls[0] - else: - # back to normal - self.subnet = self.original_subnet - result = self.subnet(*args, **kwargs) - return result - def test_floatingip_create_different_fixed_ip_same_port(self): self.skipTest('Multiple fixed ips on a port are not supported') @@ -2019,212 +1993,108 @@ class TestL3NatTestCase(L3NatTest, for k, v in expected: self.assertEqual(net['network'][k], v) - @contextlib.contextmanager - def floatingip_with_assoc(self, port_id=None, fmt=None, fixed_ip=None, - public_cidr='11.0.0.0/24', set_context=False, - tenant_id=None, **kwargs): - # Override super implementation to avoid changing the network to - # external after creation - with self._create_l3_ext_network() as ext_net,\ - self.subnet(network=ext_net, cidr=public_cidr, - set_context=set_context, - tenant_id=tenant_id) as public_sub: - private_port = None - if port_id: - private_port = self._show('ports', port_id) - with test_plugin.optional_ctx( - private_port, self.port, - set_context=set_context, - tenant_id=tenant_id) as private_port: - with self.router(set_context=set_context, - tenant_id=tenant_id) as r: - sid = private_port['port']['fixed_ips'][0]['subnet_id'] - private_sub = {'subnet': {'id': sid}} - floatingip = None - - self._add_external_gateway_to_router( - r['router']['id'], - public_sub['subnet']['network_id']) - self._router_interface_action( - 'add', r['router']['id'], - private_sub['subnet']['id'], None) - - floatingip = self._make_floatingip( - fmt or self.fmt, - public_sub['subnet']['network_id'], - port_id=private_port['port']['id'], - fixed_ip=fixed_ip, - tenant_id=tenant_id, - set_context=set_context, - **kwargs) - yield floatingip - - if floatingip: - self._delete('floatingips', - floatingip['floatingip']['id']) - - @contextlib.contextmanager - def floatingip_no_assoc(self, private_sub, fmt=None, - set_context=False, flavor_id=None, **kwargs): - # override super code to create an external subnet in advanced - with self.external_subnet(cidr='12.0.0.0/24') as public_sub: - with self.floatingip_no_assoc_with_public_sub( - private_sub, fmt, set_context, public_sub, - flavor_id, **kwargs) as (f, r): - # Yield only the floating ip object - yield f - - # Override subnet/network creation in some tests to create external - # networks immediately instead of updating it post creation, which the - # v3 plugin does not support - @decorator.decorator - def with_external_subnet(f, *args, **kwargs): - obj = args[0] - obj.subnet = obj.external_subnet - result = f(*args, **kwargs) - obj.subnet = obj.original_subnet - return result - - def _init_subnet_calls(self, n): - self.subnet_calls = [] - for i in range(0, n - 1): - self.subnet_calls.append(self.subnet) - self.subnet_calls.append(self.external_subnet) - - def _call_with_subnet_calls(self, f, *args, **kwargs): - self.subnet = self.external_subnet_by_list - result = f(*args, **kwargs) - self.subnet = self.original_subnet - return result - - @decorator.decorator - def with_external_subnet_once(f, *args, **kwargs): - obj = args[0] - obj._init_subnet_calls(1) - return obj._call_with_subnet_calls(f, *args, **kwargs) - - @decorator.decorator - def with_external_subnet_second_time(f, *args, **kwargs): - obj = args[0] - obj._init_subnet_calls(2) - return obj._call_with_subnet_calls(f, *args, **kwargs) - - @decorator.decorator - def with_external_subnet_third_time(f, *args, **kwargs): - obj = args[0] - obj._init_subnet_calls(3) - return obj._call_with_subnet_calls(f, *args, **kwargs) - - @decorator.decorator - def with_external_network(f, *args, **kwargs): - obj = args[0] - obj.network = obj.external_network - result = f(*args, **kwargs) - obj.network = obj.original_network - return result - - @with_external_subnet + @common_v3.with_external_subnet def test_router_update_gateway_with_external_ip_used_by_gw(self): super(TestL3NatTestCase, self).test_router_update_gateway_with_external_ip_used_by_gw() - @with_external_subnet + @common_v3.with_external_subnet def test_router_update_gateway_with_invalid_external_ip(self): super(TestL3NatTestCase, self).test_router_update_gateway_with_invalid_external_ip() - @with_external_subnet + @common_v3.with_external_subnet def test_router_update_gateway_with_invalid_external_subnet(self): super(TestL3NatTestCase, self).test_router_update_gateway_with_invalid_external_subnet() - @with_external_network + @common_v3.with_external_network def test_router_update_gateway_with_different_external_subnet(self): super(TestL3NatTestCase, self).test_router_update_gateway_with_different_external_subnet() - @with_external_subnet_once + @common_v3.with_external_subnet_once def test_router_update_gateway_with_existed_floatingip(self): super(TestL3NatTestCase, self).test_router_update_gateway_with_existed_floatingip() - @with_external_network + @common_v3.with_external_network def test_router_update_gateway_add_multiple_prefixes_ipv6(self): super(TestL3NatTestCase, self).test_router_update_gateway_add_multiple_prefixes_ipv6() - @with_external_network + @common_v3.with_external_network def test_router_concurrent_delete_upon_subnet_create(self): super(TestL3NatTestCase, self).test_router_concurrent_delete_upon_subnet_create() - @with_external_network + @common_v3.with_external_network def test_router_update_gateway_upon_subnet_create_ipv6(self): super(TestL3NatTestCase, self).test_router_update_gateway_upon_subnet_create_ipv6() - @with_external_network + @common_v3.with_external_network def test_router_update_gateway_upon_subnet_create_max_ips_ipv6(self): super( TestL3NatTestCase, self).test_router_update_gateway_upon_subnet_create_max_ips_ipv6() - @with_external_subnet_second_time + @common_v3.with_external_subnet_second_time def test_router_add_interface_cidr_overlapped_with_gateway(self): super(TestL3NatTestCase, self).test_router_add_interface_cidr_overlapped_with_gateway() - @with_external_subnet + @common_v3.with_external_subnet def test_router_add_gateway_dup_subnet2_returns_400(self): super(TestL3NatTestCase, self).test_router_add_gateway_dup_subnet2_returns_400() - @with_external_subnet + @common_v3.with_external_subnet def test_router_update_gateway(self): super(TestL3NatTestCase, self).test_router_update_gateway() - @with_external_subnet + @common_v3.with_external_subnet def test_router_create_with_gwinfo(self): super(TestL3NatTestCase, self).test_router_create_with_gwinfo() - @with_external_subnet + @common_v3.with_external_subnet def test_router_clear_gateway_callback_failure_returns_409(self): super(TestL3NatTestCase, self).test_router_clear_gateway_callback_failure_returns_409() - @with_external_subnet + @common_v3.with_external_subnet def test_router_create_with_gwinfo_ext_ip(self): super(TestL3NatTestCase, self).test_router_create_with_gwinfo_ext_ip() - @with_external_network + @common_v3.with_external_network def test_router_create_with_gwinfo_ext_ip_subnet(self): super(TestL3NatTestCase, self).test_router_create_with_gwinfo_ext_ip_subnet() - @with_external_subnet_second_time + @common_v3.with_external_subnet_second_time def test_router_delete_with_floatingip_existed_returns_409(self): super(TestL3NatTestCase, self).test_router_delete_with_floatingip_existed_returns_409() - @with_external_subnet + @common_v3.with_external_subnet def test_router_add_and_remove_gateway_tenant_ctx(self): super(TestL3NatTestCase, self).test_router_add_and_remove_gateway_tenant_ctx() - @with_external_subnet_second_time + @common_v3.with_external_subnet_second_time def test_router_add_interface_by_port_cidr_overlapped_with_gateway(self): super(TestL3NatTestCase, self).\ test_router_add_interface_by_port_cidr_overlapped_with_gateway() - @with_external_network + @common_v3.with_external_network def test_router_add_gateway_multiple_subnets_ipv6(self): super(TestL3NatTestCase, self).test_router_add_gateway_multiple_subnets_ipv6() - @with_external_subnet + @common_v3.with_external_subnet def test_router_add_and_remove_gateway(self): super(TestL3NatTestCase, self).test_router_add_and_remove_gateway() @@ -2241,99 +2111,99 @@ class TestL3NatTestCase(L3NatTest, def test_floatingip_via_router_interface_returns_404(self): self.skipTest('not supported') - @with_external_subnet + @common_v3.with_external_subnet def test_floatingip_list_with_sort(self): super(TestL3NatTestCase, self).test_floatingip_list_with_sort() - @with_external_subnet_once + @common_v3.with_external_subnet_once def test_floatingip_with_assoc_fails(self): super(TestL3NatTestCase, self).test_floatingip_with_assoc_fails() - @with_external_subnet_second_time + @common_v3.with_external_subnet_second_time def test_floatingip_update_same_fixed_ip_same_port(self): super(TestL3NatTestCase, self).test_floatingip_update_same_fixed_ip_same_port() - @with_external_subnet + @common_v3.with_external_subnet def test_floatingip_list_with_pagination_reverse(self): super(TestL3NatTestCase, self).test_floatingip_list_with_pagination_reverse() - @with_external_subnet_once + @common_v3.with_external_subnet_once def test_floatingip_association_on_unowned_router(self): super(TestL3NatTestCase, self).test_floatingip_association_on_unowned_router() - @with_external_network + @common_v3.with_external_network def test_delete_ext_net_with_disassociated_floating_ips(self): super(TestL3NatTestCase, self).test_delete_ext_net_with_disassociated_floating_ips() - @with_external_network + @common_v3.with_external_network def test_create_floatingip_with_subnet_and_invalid_fip_address(self): super( TestL3NatTestCase, self).test_create_floatingip_with_subnet_and_invalid_fip_address() - @with_external_subnet + @common_v3.with_external_subnet def test_create_floatingip_with_duplicated_specific_ip(self): super(TestL3NatTestCase, self).test_create_floatingip_with_duplicated_specific_ip() - @with_external_subnet + @common_v3.with_external_subnet def test_create_floatingip_with_subnet_id_non_admin(self): super(TestL3NatTestCase, self).test_create_floatingip_with_subnet_id_non_admin() - @with_external_subnet + @common_v3.with_external_subnet def test_floatingip_list_with_pagination(self): super(TestL3NatTestCase, self).test_floatingip_list_with_pagination() - @with_external_subnet + @common_v3.with_external_subnet def test_create_floatingips_native_quotas(self): super(TestL3NatTestCase, self).test_create_floatingips_native_quotas() - @with_external_network + @common_v3.with_external_network def test_create_floatingip_with_multisubnet_id(self): super(TestL3NatTestCase, self).test_create_floatingip_with_multisubnet_id() - @with_external_network + @common_v3.with_external_network def test_create_floatingip_with_subnet_id_and_fip_address(self): super(TestL3NatTestCase, self).test_create_floatingip_with_subnet_id_and_fip_address() - @with_external_subnet + @common_v3.with_external_subnet def test_create_floatingip_with_specific_ip(self): super(TestL3NatTestCase, self).test_create_floatingip_with_specific_ip() - @with_external_network + @common_v3.with_external_network def test_create_floatingip_ipv6_and_ipv4_network_creates_ipv4(self): super(TestL3NatTestCase, self).test_create_floatingip_ipv6_and_ipv4_network_creates_ipv4() - @with_external_subnet_once + @common_v3.with_external_subnet_once def test_create_floatingip_non_admin_context_agent_notification(self): super( TestL3NatTestCase, self).test_create_floatingip_non_admin_context_agent_notification() - @with_external_subnet + @common_v3.with_external_subnet def test_create_floatingip_no_ext_gateway_return_404(self): super(TestL3NatTestCase, self).test_create_floatingip_no_ext_gateway_return_404() - @with_external_subnet + @common_v3.with_external_subnet def test_create_floatingip_with_specific_ip_out_of_allocation(self): super(TestL3NatTestCase, self).test_create_floatingip_with_specific_ip_out_of_allocation() - @with_external_subnet_third_time + @common_v3.with_external_subnet_third_time def test_floatingip_update_different_router(self): super(TestL3NatTestCase, self).test_floatingip_update_different_router() @@ -2359,7 +2229,7 @@ class TestL3NatTestCase(L3NatTest, super(TestL3NatTestCase, self).test_floatingip_update( expected_status=constants.FLOATINGIP_STATUS_DOWN) - @with_external_subnet_second_time + @common_v3.with_external_subnet_second_time def test_floatingip_with_invalid_create_port(self): self._test_floatingip_with_invalid_create_port(self._plugin_name) @@ -3111,7 +2981,7 @@ class ExtGwModeTestCase(test_ext_gw_mode.ExtGwModeIntTestCase, obj.subnet = obj.original_subnet return result - @with_external_subnet + @common_v3.with_external_subnet def _test_router_update_ext_gwinfo(self, snat_input_value, snat_expected_value=False, expected_http_code=exc.HTTPOk.code): @@ -3120,11 +2990,11 @@ class ExtGwModeTestCase(test_ext_gw_mode.ExtGwModeIntTestCase, snat_expected_value=snat_expected_value, expected_http_code=expected_http_code) - @with_external_subnet + @common_v3.with_external_subnet def test_router_gateway_set_retry(self): super(ExtGwModeTestCase, self).test_router_gateway_set_retry() - @with_external_subnet + @common_v3.with_external_subnet def _test_router_create_show_ext_gwinfo(self, *args, **kwargs): return super(ExtGwModeTestCase, self)._test_router_create_show_ext_gwinfo(*args, **kwargs)