# Copyright (c) 2020 Red Hat, Inc. # # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. import netaddr from neutron_lib import constants from neutron_tempest_plugin.common import ip from neutron_tempest_plugin.common import ssh from neutron_tempest_plugin import config from neutron_tempest_plugin.scenario import base from oslo_log import log as logging from tempest.lib.common.utils import data_utils from tempest.lib import decorators from tempest.lib import exceptions LOG = logging.getLogger(__name__) CONF = config.CONF MIN_VLAN_ID = 1 MAX_VLAN_ID = 4094 # TODO(eolivare): split upstream class in two classes: a base class and a class # with the tests. This is needed in order to import the base class here and to # not duplicate code class VlanTransparencyWhiteboxBaseTest(base.BaseTempestTestCase): credentials = ['primary', 'admin'] force_tenant_isolation = False keypairs_client = None servers_client = None required_extensions = ['vlan-transparent', 'allowed-address-pairs'] @classmethod def resource_setup(cls): super(VlanTransparencyWhiteboxBaseTest, cls).resource_setup() # setup basic topology for servers we can log into cls.rand_name = data_utils.rand_name( cls.__name__.rsplit('.', 1)[-1]) try: cls.network = cls.create_network(name=cls.rand_name, vlan_transparent=True) except exceptions.ServerFault as exc: msg = 'Backend does not support VLAN Transparency.' if exc.resp_body['message'] == msg: raise cls.skipException(msg) else: raise exc cls.subnet = cls.create_subnet(network=cls.network, name=cls.rand_name) cls.router = cls.create_router_by_client() cls.create_router_interface(cls.router['id'], cls.subnet['id']) cls.keypair = cls.create_keypair(name=cls.rand_name, client=cls.keypairs_client) cls.vms = {} cls.security_group = cls.create_security_group(name=cls.rand_name) cls.create_loginable_secgroup_rule( cls.security_group['id'], client=cls.client) # create security group rules for icmpv6 rulesets = [{'protocol': constants.PROTO_NAME_IPV6_ICMP, 'ethertype': 'IPv6', 'direction': 'ingress'}] cls.create_secgroup_rules( rulesets, cls.security_group['id'], client=cls.client) if CONF.neutron_plugin_options.default_image_is_advanced: cls.flavor_ref = CONF.compute.flavor_ref cls.image_ref = CONF.compute.image_ref else: cls.flavor_ref = \ CONF.neutron_plugin_options.advanced_image_flavor_ref cls.image_ref = CONF.neutron_plugin_options.advanced_image_ref @classmethod def skip_checks(cls): super(VlanTransparencyWhiteboxBaseTest, cls).skip_checks() if not (CONF.neutron_plugin_options.advanced_image_ref or CONF.neutron_plugin_options.default_image_is_advanced): raise cls.skipException( 'Advanced image is required to run these tests.') def _create_port_and_server(self, server_name, port_name, port_security=True, allowed_address_pairs=None, second_network=None): if port_security: sec_groups = [self.security_group['id']] else: sec_groups = None first_port_name = 'first_' + port_name if second_network else port_name self.vms[server_name]['port'] = self.create_port( network=self.network, name=first_port_name, security_groups=sec_groups, port_security_enabled=port_security, allowed_address_pairs=allowed_address_pairs) networks = [{'port': self.vms[server_name]['port']['id']}] if second_network: # Some test need servers with two ports: First one is used for # management (ssh access via FIP) and second one is used for VLAN # Transparency testing second_port_name = 'second_' + port_name self.vms[server_name]['second_port'] = self.create_port( network=second_network, name=second_port_name, security_groups=sec_groups, port_security_enabled=port_security) networks.append( {'port': self.vms[server_name]['second_port']['id']}) return self.create_server(flavor_ref=self.flavor_ref, image_ref=self.image_ref, key_name=self.keypair['name'], networks=networks, name=server_name, client=self.servers_client)['server'] def _configure_vlan_transparent(self, port, ssh_client, vlan_tag, vlan_ip): ip_command = ip.IPCommand(ssh_client=ssh_client) addresses = ip_command.list_addresses(port=port) port_iface = ip.get_port_device_name(addresses, port) subport_iface = ip_command.configure_vlan_transparent( port=port, vlan_tag=vlan_tag, ip_addresses=[vlan_ip]) for address in ip_command.list_addresses(ip_addresses=vlan_ip): self.assertEqual(subport_iface, address.device.name) self.assertEqual(port_iface, address.device.parent) break else: self.fail("Sub-port fixed IP not found on server.") def _configure_vlan_transparent_second_port( self, port, ssh_client, vlan_tag, mask): ip_command = ip.IPCommand(ssh_client=ssh_client) port_device = ip_command.get_nic_name_by_mac(port['mac_address']) subport_device = '{!s}.{!s}'.format(port_device, vlan_tag) ip_address = port['fixed_ips'][0]['ip_address'] ip_address_mask = '{!s}/{!s}'.format(ip_address, mask) # delete address from the untagged interface in case the address has # been assigned to it if ip_command.list_addresses(device=port_device, ip_addresses=[ip_address]): ip_command.delete_address(ip_address_mask, port_device) if netaddr.valid_ipv6(ip_address): LOG.debug('IPv6 routes are removed from the untagged interface' ' to avoid them to collide with the tagged one') # TODO(eolivare): implement list_routes for IPv6 in # neutron-tempest-plugin routesv6 = ip_command.execute( '-6', 'route', 'show', 'dev', port_device).splitlines() for route in routesv6: destv6 = route.strip().split()[0] # TODO(eolivare): implement delete_route for IPv6 in # neutron-tempest-plugin ip_command.execute( '-6', 'route', 'del', destv6, 'dev', port_device) # Create vlan interface (e.g.: eth1.100) ip_command.add_link(link=port_device, name=subport_device, link_type='vlan', segmentation_id=vlan_tag) ip_command.set_link(device=subport_device, state='up') ip_command.add_address(address=ip_address_mask, device=subport_device) for address in ip_command.list_addresses( ip_addresses=[ip_address]): self.assertEqual(subport_device, address.device.name) self.assertEqual(port_device, address.device.parent) break else: self.fail("Sub-port fixed IP not found on server.") def _create_ssh_client(self, ssh_ip): if CONF.neutron_plugin_options.default_image_is_advanced: username = CONF.validation.image_ssh_user else: username = CONF.neutron_plugin_options.advanced_image_ssh_user return ssh.Client(host=ssh_ip, username=username, pkey=self.keypair['private_key']) def _check_remote_connectivity_vlans(self, untagged=False, num_vlans=1, should_succeed=True): # All tests use two VMs, so this is a list of length 2 vms = list(self.vms.values()) # If untagged is True, num_vlans is ignored if untagged: self.check_remote_connectivity( vms[0]['ssh_client'], vms[1]['port']['fixed_ips'][0]['ip_address'], servers=[vms[0]['vm'], vms[1]['vm']], should_succeed=should_succeed) else: for i in range(num_vlans): self.check_remote_connectivity( vms[0]['ssh_client'], vms[1]['vlan_ipmasks'][i].split('/')[0], servers=[vms[0]['vm'], vms[1]['vm']], should_succeed=should_succeed) def _check_remote_connectivity_vlan_second_port(self): # All tests use two VMs, so this is a list of length 2 vms = list(self.vms.values()) dest_ip = vms[1]['second_port']['fixed_ips'][0]['ip_address'] # if second_port, num_vlans and untagged are ignored self.check_remote_connectivity( vms[0]['ssh_client'], dest_ip, servers=[vms[0]['vm'], vms[1]['vm']]) def _check_remote_connectivity_vlans_mtu(self): # All tests use two VMs, so this is a list of length 2 vms = list(self.vms.values()) # According to BZ1973445, MTU value from transparent VLAN interfaces # should be reduced by 4 new_mtu = self.network['mtu'] - 4 for vm in vms: ip_command = ip.IPCommand(ssh_client=vm['ssh_client']) for address in ip_command.list_addresses( ip_addresses=vm['vlan_ipmasks'][0].split('/')[0]): device = address.device.name break command = ['set', 'mtu', new_mtu, 'dev', device] ip_command.execute('link', *command) for pkt_size in range(new_mtu - 20, new_mtu + 20, 2): self.check_remote_connectivity( vms[0]['ssh_client'], vms[1]['vlan_ipmasks'][0].split('/')[0], servers=[vms[0]['vm'], vms[1]['vm']], mtu=pkt_size) def _create_multivlan_vlan_transparency_resources( self, num_vlans=1, port_security=True, use_allowed_address_pairs=False): vlan_tags = [] vlan_ipmask_templates = [] for i in range(num_vlans): while True: vlan_tag = data_utils.rand_int_id(start=MIN_VLAN_ID, end=MAX_VLAN_ID) vlan_ipmask_template = '192.168.%d.{ip_last_byte}/24' % ( vlan_tag % 256) if (vlan_tag not in vlan_tags and vlan_ipmask_template not in vlan_ipmask_templates): vlan_tags.append(vlan_tag) vlan_ipmask_templates.append(vlan_ipmask_template) break # Two servers are created with one port each one # VLANs are created on those ports for i in range(2): server_name = 'server-%s-%d' % (self.rand_name, i) port_name = 'port-%s-%d' % (self.rand_name, i) self.vms[server_name] = {} vlan_ipmasks_per_vm = [] for vlan_ipmask_template in vlan_ipmask_templates: vlan_ipmasks_per_vm.append(vlan_ipmask_template.format( ip_last_byte=(i + 1) * 10)) self.vms[server_name]['vlan_ipmasks'] = vlan_ipmasks_per_vm if use_allowed_address_pairs: allowed_address_pairs = [] for vlan_ipmask in vlan_ipmasks_per_vm: allowed_address_pairs.append({'ip_address': vlan_ipmask}) else: allowed_address_pairs = None self.vms[server_name]['vm'] = self._create_port_and_server( server_name=server_name, port_name=port_name, port_security=port_security, allowed_address_pairs=allowed_address_pairs) if self.network['router:external']: ssh_ip = self.vms[server_name]['port'][ 'fixed_ips'][0]['ip_address'] else: self.vms[server_name]['floating_ip'] = self.create_floatingip( port=self.vms[server_name]['port']) ssh_ip = self.vms[server_name]['floating_ip'][ 'floating_ip_address'] self.vms[server_name]['ssh_client'] = self._create_ssh_client( ssh_ip=ssh_ip) self.check_connectivity( host=ssh_ip, ssh_user=self.vms[server_name]['ssh_client'].username, ssh_key=self.vms[server_name]['ssh_client'].pkey) for j in range(num_vlans): self._configure_vlan_transparent( port=self.vms[server_name]['port'], ssh_client=self.vms[server_name]['ssh_client'], vlan_tag=vlan_tags[j], vlan_ip=self.vms[server_name]['vlan_ipmasks'][j]) if port_security: # Ping from vm0 to vm1 via VLAN interfaces should fail because # we haven't allowed ICMP self._check_remote_connectivity_vlans(num_vlans=num_vlans, should_succeed=False) # allow intra-security-group traffic sg_rule = self.create_pingable_secgroup_rule( self.security_group['id'], client=self.client) self.addCleanup( self.client.delete_security_group_rule, sg_rule['id']) class MultiVlanTransparencyTest(VlanTransparencyWhiteboxBaseTest): num_vlans = 4 @decorators.idempotent_id('2e2909cc-8d1d-46ae-983e-ad87376736d4') def test_multivlan_transparent_port_sec_disabled(self): self._create_multivlan_vlan_transparency_resources( num_vlans=self.num_vlans, port_security=False, use_allowed_address_pairs=False) self._check_remote_connectivity_vlans(num_vlans=self.num_vlans) self._check_remote_connectivity_vlans(untagged=True) @decorators.idempotent_id('853a00ce-ae5c-456c-be5e-54e2c28da891') def test_multivlan_transparent_allowed_address_pairs(self): self._create_multivlan_vlan_transparency_resources( num_vlans=self.num_vlans, port_security=True, use_allowed_address_pairs=True) self._check_remote_connectivity_vlans(num_vlans=self.num_vlans) self._check_remote_connectivity_vlans(untagged=True) @decorators.idempotent_id('bde09eef-310c-4353-9d37-fe479e88ee01') def test_vlan_transparent_different_vlans_drops(self): vlan_tag_vm0 = data_utils.rand_int_id(start=MIN_VLAN_ID, end=MAX_VLAN_ID - 1) vlan_tag_vm1 = vlan_tag_vm0 + 1 vlan_tags = [vlan_tag_vm0, vlan_tag_vm1] vlan_ipmasks = ['192.168.0.10/24', '192.168.0.20/24'] # Two servers are created with one port each one # VLANs are created on those ports for i in range(2): server_name = 'server-%s-%d' % (self.rand_name, i) port_name = 'port-%s-%d' % (self.rand_name, i) self.vms[server_name] = {} self.vms[server_name]['vlan_ipmasks'] = [vlan_ipmasks[i]] self.vms[server_name]['vm'] = self._create_port_and_server( server_name=server_name, port_name=port_name, port_security=False) self.vms[server_name]['floating_ip'] = self.create_floatingip( port=self.vms[server_name]['port']) ssh_ip = self.vms[server_name]['floating_ip'][ 'floating_ip_address'] self.vms[server_name]['ssh_client'] = self._create_ssh_client( ssh_ip=ssh_ip) self.check_connectivity( host=self.vms[server_name]['floating_ip'][ 'floating_ip_address'], ssh_user=self.vms[server_name]['ssh_client'].username, ssh_key=self.vms[server_name]['ssh_client'].pkey) self._configure_vlan_transparent( port=self.vms[server_name]['port'], ssh_client=self.vms[server_name]['ssh_client'], vlan_tag=vlan_tags[i], vlan_ip=vlan_ipmasks[i]) self._check_remote_connectivity_vlans(should_succeed=False) self._check_remote_connectivity_vlans(untagged=True) class MultiPortVlanTransparencyTest(VlanTransparencyWhiteboxBaseTest): def _test_vlan_transparent_connectivity_two_ports( self, vlan_cidr, port_security, ip_version=4): vlan_tag = data_utils.rand_int_id(start=MIN_VLAN_ID, end=MAX_VLAN_ID - 1) second_rand_name = 'second_' + self.rand_name second_network = self.create_network( name=second_rand_name, vlan_transparent=True, port_security_enabled=port_security) self.create_subnet( network=second_network, name=second_rand_name, cidr=vlan_cidr, enable_dhcp=False, ip_version=ip_version) # Two servers are created with two ports each one # First port is for management (ssh access via FIP) # Second port is used for VLAN Transparent testing for i in range(2): server_name = 'server-%s-%d' % (self.rand_name, i) port_name = 'port-%s-%d' % (self.rand_name, i) self.vms[server_name] = {} self.vms[server_name]['vm'] = self._create_port_and_server( server_name=server_name, port_name=port_name, second_network=second_network) self.vms[server_name]['floating_ip'] = self.create_floatingip( port=self.vms[server_name]['port']) ssh_ip = self.vms[server_name]['floating_ip'][ 'floating_ip_address'] self.vms[server_name]['ssh_client'] = self._create_ssh_client( ssh_ip=ssh_ip) self.check_connectivity( host=self.vms[server_name]['floating_ip'][ 'floating_ip_address'], ssh_user=self.vms[server_name]['ssh_client'].username, ssh_key=self.vms[server_name]['ssh_client'].pkey) self._configure_vlan_transparent_second_port( port=self.vms[server_name]['second_port'], ssh_client=self.vms[server_name]['ssh_client'], vlan_tag=vlan_tag, mask=vlan_cidr.split('/')[1]) if port_security: # allow icmp traffic sg_rule = self.create_pingable_secgroup_rule( self.security_group['id'], client=self.client) self.addCleanup( self.client.delete_security_group_rule, sg_rule['id']) self._check_remote_connectivity_vlan_second_port() @decorators.idempotent_id('d7653461-46ae-4d55-9ad1-85ef66084739') def test_vlan_transparent_connectivity_two_ports(self): self._test_vlan_transparent_connectivity_two_ports( vlan_cidr='192.168.100.0/24', port_security=True) @decorators.idempotent_id('4953921d-f94b-45cb-b569-be0eb6720df2') def test_vlan_transparency_ipv6_port_sec_disabled(self): self._test_vlan_transparent_connectivity_two_ports( vlan_cidr='2001:db8::/64', port_security=False, ip_version=6) @decorators.idempotent_id('07a3ad80-d6b0-4644-8a33-198dabae6b2f') def test_vlan_transparency_ipv6_port_sec_enabled(self): self._test_vlan_transparent_connectivity_two_ports( vlan_cidr='2001:db9::/64', port_security=True, ip_version=6) class MtuVlanTransparencyTest(VlanTransparencyWhiteboxBaseTest): @decorators.idempotent_id('8e51bc66-c19e-44d5-860a-7e42a1aea722') def test_vlan_transparent_packet_length_greater_mtu(self): self._create_multivlan_vlan_transparency_resources( port_security=False, use_allowed_address_pairs=False) self._check_remote_connectivity_vlans_mtu() class ProviderNetworkVlanTransparencyTest(VlanTransparencyWhiteboxBaseTest): num_vlans = 1 @classmethod def setup_clients(cls): super(ProviderNetworkVlanTransparencyTest, cls).setup_clients() cls.client = cls.os_adm.network_client cls.keypairs_client = cls.os_adm.keypairs_client cls.servers_client = cls.os_adm.servers_client @classmethod def resource_setup(cls): super(ProviderNetworkVlanTransparencyTest, cls).resource_setup() # no tenant network is used during this test # existing external network is used instead cls.network = cls.os_admin.network_client.list_networks(**{ 'admin_state_up': True, 'router:external': True})['networks'][-1] if cls.network['vlan_transparent'] is not True: raise cls.skipException( 'This test are only executed when VLAN Transparency is ' 'enabled on the external network.') if cls.network['shared'] is not True: cls.addClassResourceCleanup( cls.os_admin.network_client.update_network, cls.network['id'], shared=False) cls.os_admin.network_client.update_network( cls.network['id'], shared=True) @decorators.idempotent_id('8e51bc66-c19e-44d5-860a-7e42a1aea722') def test_vlan_transparent_provider_network_port_sec_disabled(self): self._create_multivlan_vlan_transparency_resources( num_vlans=self.num_vlans, port_security=False, use_allowed_address_pairs=False) self._check_remote_connectivity_vlans(num_vlans=self.num_vlans) self._check_remote_connectivity_vlans(untagged=True) @decorators.idempotent_id('ab472bfd-2a2b-4731-a018-c6adc87994d0') def test_vlan_transparent_provider_network_port_sec_enabled(self): self._create_multivlan_vlan_transparency_resources( num_vlans=self.num_vlans, port_security=True, use_allowed_address_pairs=True) self._check_remote_connectivity_vlans(num_vlans=self.num_vlans) self._check_remote_connectivity_vlans(untagged=True)