From aaa0cb8f12be43aea3c0cb513cae9295d433950e Mon Sep 17 00:00:00 2001 From: Roee Agiman Date: Wed, 29 Nov 2017 11:37:14 +0200 Subject: [PATCH] Added scenario: spawn instance with port associated to security group This test creates two ports on the same network, with a pre-made security group, and then spawning two instances and assigning them with the ports created - to verify that the security group is inherited and enforced. Change-Id: I40f8b0cac360594a65fb9f6023930af3322cac58 --- .../scenario/test_security_groups.py | 60 ++++++++++++++++--- 1 file changed, 51 insertions(+), 9 deletions(-) diff --git a/neutron_tempest_plugin/scenario/test_security_groups.py b/neutron_tempest_plugin/scenario/test_security_groups.py index 248d0bda8..9503fe361 100644 --- a/neutron_tempest_plugin/scenario/test_security_groups.py +++ b/neutron_tempest_plugin/scenario/test_security_groups.py @@ -26,13 +26,13 @@ from neutron_tempest_plugin.scenario import constants as const CONF = config.CONF -class NetworkDefaultSecGroupTest(base.BaseTempestTestCase): +class NetworkSecGroupTest(base.BaseTempestTestCase): credentials = ['primary', 'admin'] required_extensions = ['router', 'security-group'] @classmethod def resource_setup(cls): - super(NetworkDefaultSecGroupTest, cls).resource_setup() + super(NetworkSecGroupTest, cls).resource_setup() # setup basic topology for servers we can log into it cls.network = cls.create_network() cls.subnet = cls.create_subnet(cls.network) @@ -40,15 +40,26 @@ class NetworkDefaultSecGroupTest(base.BaseTempestTestCase): cls.create_router_interface(router['id'], cls.subnet['id']) cls.keypair = cls.create_keypair() - def create_vm_testing_sec_grp(self, num_servers=2, security_groups=None): + def create_vm_testing_sec_grp(self, num_servers=2, security_groups=None, + ports=None): + """Create instance for security group testing + :param num_servers (int): number of servers to spawn + :param security_groups (list): list of security groups + :param ports* (list): list of ports + *Needs to be the same length as num_servers + """ servers, fips, server_ssh_clients = ([], [], []) for i in range(num_servers): - servers.append(self.create_server( - flavor_ref=CONF.compute.flavor_ref, - image_ref=CONF.compute.image_ref, - key_name=self.keypair['name'], - networks=[{'uuid': self.network['id']}], - security_groups=security_groups)) + server_args = { + 'flavor_ref': CONF.compute.flavor_ref, + 'image_ref': CONF.compute.image_ref, + 'key_name': self.keypair['name'], + 'networks': [{'uuid': self.network['id']}], + 'security_groups': security_groups + } + if ports is not None: + server_args['networks'][0].update({'port': ports[i]['id']}) + servers.append(self.create_server(**server_args)) for i, server in enumerate(servers): waiters.wait_for_server_status( self.os_primary.servers_client, server['server']['id'], @@ -246,3 +257,34 @@ class NetworkDefaultSecGroupTest(base.BaseTempestTestCase): # make sure ICMP connectivity doesn't work from framework self.ping_ip_address(fips[0]['floating_ip_address'], should_succeed=False) + + @decorators.idempotent_id('f07d0159-8f9e-4faa-87f5-a869ab0ad488') + def test_multiple_ports_secgroup_inheritance(self): + """This test creates two ports with security groups, then + boots two instances and verify that the security group was + inherited properly and enforced in these instances. + """ + # create a security group and make it loginable and pingable + secgrp = self.os_primary.network_client.create_security_group( + name=data_utils.rand_name('secgrp')) + self.create_loginable_secgroup_rule( + secgroup_id=secgrp['security_group']['id']) + self.create_pingable_secgroup_rule( + secgroup_id=secgrp['security_group']['id']) + # add security group to cleanup + self.security_groups.append(secgrp['security_group']) + # create two ports with fixed IPs and the security group created + ports = [] + for i in range(2): + ports.append(self.create_port( + self.network, fixed_ips=[{'subnet_id': self.subnets[0]['id']}], + security_groups=[secgrp['security_group']['id']])) + # spawn instances with the ports created + server_ssh_clients, fips, servers = self.create_vm_testing_sec_grp( + ports=ports) + # verify ICMP reachability and ssh connectivity + for fip in fips: + self.ping_ip_address(fip['floating_ip_address']) + self.check_connectivity(fip['floating_ip_address'], + CONF.validation.image_ssh_user, + self.keypair['private_key'])