# Copyright 2024 Red Hat, Inc. # All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. from neutron_tempest_plugin import config from neutron_tempest_plugin.scenario import constants from tempest.common import waiters from tempest.lib.common.utils import data_utils from tempest.lib.common.utils import test_utils from tempest.lib import decorators import testtools from whitebox_neutron_tempest_plugin.tests.scenario import base CONF = config.CONF WB_CONF = config.CONF.whitebox_neutron_plugin_options class NetworkPortTestManyVmsBase(base.BaseTempestWhiteboxTestCase): credentials = ['primary', 'admin'] def _update_quota(self, quota_item, quota_value): quota_set = self.quotas_client.show_quota_set( self.client.tenant_id)['quota_set'] default_quota_value = quota_set[quota_item] self.quotas_client.update_quota_set( self.client.tenant_id, force=True, **{quota_item: quota_value}) self.addCleanup( self.quotas_client.update_quota_set, self.client.tenant_id, **{quota_item: default_quota_value}) @testtools.skipIf(CONF.compute_feature_enabled.console_output, 'With console_output set to enabled, VMs are spawned ' 'too slowly.') def _test_port_status_when_many_vms( self, stop_vms_before_compute_reboot=False): if not self.image_ref: raise self.skipException( "Default image is too heavy for launching many VMs. " "Alternative image was not found.") network = self.create_network() self.create_subnet(network, cidr="192.168.1.0/24") self.keypair = self.create_keypair() servers_count = WB_CONF.servers_count self.flavors_client = self.os_admin.compute.FlavorsClient() self.quotas_client = self.os_admin.compute.QuotasClient() vcpus = self.flavors_client.show_flavor( self.flavor_ref)['flavor']['vcpus'] self._update_quota('cores', servers_count * vcpus) self._update_quota('instances', servers_count) name = self._testMethodName params = { 'flavor_ref': self.flavor_ref, 'image_ref': self.image_ref, 'networks': [{'uuid': network['id']}], 'key_name': self.keypair['name'], 'name': data_utils.rand_name(name) } server = self.create_server(**params)['server'] params.update({ 'min_count': servers_count - 1, 'scheduler_hints': {'same_host': server['id']}}) self.create_server(**params) client = self.os_primary.servers_client servers = client.list_servers(**{'name': name})['servers'] for server in servers: self.addCleanup( test_utils.call_and_ignore_notfound_exc, waiters.wait_for_server_termination, client, server['id']) self.addCleanup( test_utils.call_and_ignore_notfound_exc, client.delete_server, server['id']) for server in servers: # since the test creates a high number of servers, it may take # a long time until they are ACTIVE self.wait_for_server_status( server, constants.SERVER_STATUS_ACTIVE, extra_timeout=300) if stop_vms_before_compute_reboot: for server in servers: client.stop_server(server['id']) for server in servers: waiters.wait_for_server_status( client, server['id'], 'SHUTOFF') az_list = self.os_admin.az_client.list_availability_zones(detail=True) zones = [zone['zoneName'] for zone in az_list['availabilityZoneInfo'] if zone['zoneName'] != 'internal'] ports = [] for zone in zones: zone_ports = self.client.list_ports( network_id=network['id'], device_owner='compute:' + zone)['ports'] ports.extend(zone_ports) self.assertEqual( len(ports), servers_count, 'Number of ports does not match number of servers.') if not stop_vms_before_compute_reboot: for port in ports: self._check_port(port) def _check_port(self, port): self.assertEqual( port['status'], 'ACTIVE', 'A nonactive port found') class NetworkPortTestManyVmsOvn(NetworkPortTestManyVmsBase, base.BaseTempestTestCaseOvn): def _get_nbdb_port_state(self, port_id): cmd = "%s %s get logical_switch_port %s up" % (self.nbctl, self.opt, port_id) return self.run_on_master_controller(cmd).rstrip() def _check_port(self, port): super(NetworkPortTestManyVmsOvn, self)._check_port(port) self.assertEqual( self._get_nbdb_port_state(port['id']), 'true', 'Port is not up in NB DB') @decorators.idempotent_id('6a6d4ae6-2d61-4b0e-b8fb-5aef5292dccc') def test_port_status_when_many_vms_ovn(self): self.opt = "--no-leader-only" self._test_port_status_when_many_vms()