[Tempest]: dns-search-domain scenario/negative tests

1. Validate that dns-search-domain work with different type of
   routers and can be resolved or not depending on valid
   dns_search_domain set or not.

   a. valid search-domain defined at CONF.network.dns_search_domain
      dns_search_domain can be resolved.
   b. subnet without dns-search-domain set
      host in CONF.network.host_in_dns_search_domain
      cannot be resolved.
   c. host in_dns_search_domain_cannot be resolved if
      dns_search_domain does not contains the
      host_in_dns_search_domain

2. Fixed other tests along with this patch due to upstream
   tempest config attr tenant_networks_x changed to
   project_networks_x, and network client is splited into
   networks/subnets/routers/ports/security_groups and
   security_group_rules clients.

3. lbaasv2 test_create_pool_invalid_tenant_id is being
   resolved. Remove skip_because bug.

Change-Id: I8c758f7f55c4e5eb79379764b3e19dfc442471ff
This commit is contained in:
Alex Kang 2016-04-08 12:12:22 -07:00
parent 70e92ba585
commit d7d9eb9e35
11 changed files with 408 additions and 95 deletions

View File

@ -267,7 +267,6 @@ class TestPools(base.BaseTestCase):
@test.attr(type='negative')
@test.idempotent_id('94949cd4-ebc1-4af5-a220-9ebb32772fbc')
@decorators.skip_because(bug="1639223")
def test_create_pool_invalid_tenant_id_field(self):
"""Test create pool with invalid tenant_id field"""
self.increment_protocol_port()

View File

@ -0,0 +1,55 @@
# Copyright 2016 VMware Inc
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.api.network import base
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib import exceptions
from tempest import test
CONF = config.CONF
class DnsSearchDoaminsNegativeTest(base.BaseAdminNetworkTest):
@classmethod
def skip_checks(cls):
super(DnsSearchDoaminsNegativeTest, cls).skip_checks()
def create_network_with_bad_dns_search_domain(
self, dns_search_domain="vmware@com"):
networks_client = self.networks_client
subnets_client = self.subnets_client
network_name = data_utils.rand_name('dns-sear-negative')
resp = networks_client.create_network(name=network_name)
network = resp.get('network', resp)
self.addCleanup(self._try_delete_resource,
networks_client.delete_network,
network['id'])
subnet_cfg = {
'client': subnets_client,
'name': network_name,
'dns_search_domain': dns_search_domain}
# should trigger exception of BadRequest with message:
# Invalid input for dns_search_domain: ...
resp = self.create_subnet(network, **subnet_cfg)
subnet = resp.get('subnet', resp)
return (network, subnet)
@test.attr(type=['negative'])
@test.idempotent_id('11bdc214-10d7-4926-8f49-2da3d8719143')
def test_create_dns_search_domain_negative(self):
self.assertRaises(exceptions.BadRequest,
self.create_network_with_bad_dns_search_domain)

View File

@ -139,7 +139,7 @@ class MultipleTransportZonesTest(base.BaseAdminNetworkTest):
self.delete_network(net_id)
def create_router_by_type(self, router_type, name=None, **kwargs):
router_client = self.admin_client
routers_client = self.admin_manager.routers_client
router_name = name or data_utils.rand_name('mtz-')
create_kwargs = dict(name=router_name, external_gateway_info={
"network_id": CONF.network.public_network_id})
@ -148,15 +148,15 @@ class MultipleTransportZonesTest(base.BaseAdminNetworkTest):
elif router_type in ('distributed'):
create_kwargs['distributed'] = True
kwargs.update(create_kwargs)
router = router_client.create_router(**kwargs)
router = routers_client.create_router(**kwargs)
router = router['router'] if 'router' in router else router
self.addCleanup(self._try_delete_resource,
router_client.delete_router, router['id'])
routers_client.delete_router, router['id'])
self.assertEqual(router['name'], router_name)
return (router_client, router)
return (routers_client, router)
def create_router_and_add_interfaces(self, router_type, nets):
(router_client, router) = self.create_router_by_type(router_type)
(routers_client, router) = self.create_router_by_type(router_type)
if router_type == 'exclusive':
router_nsxv_name = '%s-%s' % (router['name'], router['id'])
exc_edge = self.vsm.get_edge(router_nsxv_name)
@ -167,20 +167,20 @@ class MultipleTransportZonesTest(base.BaseAdminNetworkTest):
# and router can be deleted if test is aborted.
self.addCleanup(
self._try_delete_resource,
router_client.remove_router_interface_with_subnet_id,
router['id'], subnet['id'])
router_client.add_router_interface_with_subnet_id(
routers_client.remove_router_interface,
router['id'], subnet_id=subnet['id'])
routers_client.add_router_interface(
router['id'], subnet_id=subnet['id'])
return router
def clear_router_gateway_and_interfaces(self, router, nets):
router_client = self.admin_client
router_client.update_router(router['id'],
routers_client = self.admin_manager.routers_client
routers_client.update_router(router['id'],
external_gateway_info=dict())
for net_id, (s_id, network, subnet) in six.iteritems(nets):
try:
router_client.remove_router_interface_with_subnet_id(
router['id'], subnet['id'])
routers_client.remove_router_interface(
router['id'], subnet_id=subnet['id'])
except Exception:
pass

View File

@ -64,8 +64,8 @@ class ExcRouterTest(base.BaseRouterTest):
backend create service for the exclusive router.
"""
name = data_utils.rand_name('router-')
router = self.client.create_router(
name, external_gateway_info={
router = self.routers_client.create_router(
name=name, external_gateway_info={
"network_id": CONF.network.public_network_id},
admin_state_up=False, router_type='exclusive')
self.addCleanup(self._delete_router, router['router']['id'])
@ -83,15 +83,15 @@ class ExcRouterTest(base.BaseRouterTest):
Test update an exclusive router
"""
name = data_utils.rand_name('router-')
router = self.client.create_router(
name, external_gateway_info={
router = self.routers_client.create_router(
name=name, external_gateway_info={
"network_id": CONF.network.public_network_id},
admin_state_up=False, router_type='exclusive')
self.addCleanup(self._delete_router, router['router']['id'])
self.assertEqual(router['router']['name'], name)
updated_name = 'updated' + name
update_body = self.client.update_router(router['router']['id'],
name=updated_name)
update_body = self.routers_client.update_router(
router['router']['id'], name=updated_name)
self.assertEqual(update_body['router']['name'], updated_name)
@test.attr(type='nsxv')
@ -101,18 +101,18 @@ class ExcRouterTest(base.BaseRouterTest):
Test list and show exclusive router.
"""
name = data_utils.rand_name('router-')
router = self.client.create_router(
name, external_gateway_info={
router = self.routers_client.create_router(
name=name, external_gateway_info={
"network_id": CONF.network.public_network_id},
admin_state_up=False, router_type='exclusive')
self.addCleanup(self._delete_router, router['router']['id'])
self.assertEqual(router['router']['name'], name)
# Show details of exclusive router
show_body = self.client.show_router(router['router']['id'])
show_body = self.routers_client.show_router(router['router']['id'])
self.assertEqual(show_body['router']['name'], name)
self.assertEqual(show_body['router']['admin_state_up'], False)
# List routers and verify if created router in list
list_body = self.client.list_routers()
list_body = self.routers_client.list_routers()
routers_list = [r['id'] for r in list_body['routers']]
self.assertIn(router['router']['id'], routers_list)
@ -123,20 +123,20 @@ class ExcRouterTest(base.BaseRouterTest):
Test create, update, and delete an exclusive router
"""
name = data_utils.rand_name('router-')
router = self.client.create_router(
name, external_gateway_info={
router = self.routers_client.create_router(
name=name, external_gateway_info={
"network_id": CONF.network.public_network_id},
admin_state_up=False, router_type='exclusive')
self.assertEqual(router['router']['name'], name)
# Update the name of the exclusive router
updated_name = 'updated' + name
update_body = self.client.update_router(router['router']['id'],
name=updated_name)
update_body = self.routers_client.update_router(
router['router']['id'], name=updated_name)
self.assertEqual(update_body['router']['name'], updated_name)
# Delete the exclusive router and verify it has been deleted
# from nsxv backend
self.client.delete_router(router['router']['id'])
list_body = self.client.list_routers()
self.routers_client.delete_router(router['router']['id'])
list_body = self.routers_client.list_routers()
routers_list = [r['id'] for r in list_body['routers']]
self.assertNotIn(router['router']['id'], routers_list)
nsxv_edge_name = "%s-%s" % (name, router['router']['id'])
@ -167,21 +167,21 @@ class ExcRouterTest(base.BaseRouterTest):
del_waitfor=10.0,
del_interval=1.5):
name = data_utils.rand_name('rtr-%s' % router_size)
router = self.client.create_router(
name, external_gateway_info={
router = self.routers_client.create_router(
name=name, external_gateway_info={
"network_id": CONF.network.public_network_id},
admin_state_up=False, router_type='exclusive',
router_size=router_size)
self.assertEqual(router['router']['name'], name)
# Update the name of the exclusive router
updated_name = 'updated' + name
update_body = self.client.update_router(router['router']['id'],
name=updated_name)
update_body = self.routers_client.update_router(
router['router']['id'], name=updated_name)
self.assertEqual(update_body['router']['name'], updated_name)
# Delete the exclusive router and verify it has been deleted
# from nsxv backend
self.client.delete_router(router['router']['id'])
list_body = self.client.list_routers()
self.routers_client.delete_router(router['router']['id'])
list_body = self.routers_client.list_routers()
routers_list = [r['id'] for r in list_body['routers']]
self.assertNotIn(router['router']['id'], routers_list)
nsxv_edge_name = "%s-%s" % (name, router['router']['id'])

View File

@ -128,10 +128,9 @@ class TopoDeployScenarioManager(manager.NetworkScenarioTest):
def _create_router(self, client_mgr=None, tenant_id=None,
namestart='topo-deploy', **kwargs):
client_mgr = client_mgr or self.manager
router_client = client_mgr.network_client
routers_client = getattr(client_mgr, "routers_client")
if not tenant_id:
tenant_id = router_client.tenant_id
tenant_id = routers_client.tenant_id
distributed = kwargs.pop('distributed', None)
router_type = kwargs.pop('router_type', None)
if distributed in (True, False):
@ -139,11 +138,12 @@ class TopoDeployScenarioManager(manager.NetworkScenarioTest):
elif router_type in ('shared', 'exclusive'):
kwargs['router_type'] = router_type
name = data_utils.rand_name(namestart)
result = router_client.create_router(name=name,
result = routers_client.create_router(name=name,
admin_state_up=True,
tenant_id=tenant_id,
**kwargs)
router = net_resources.DeletableRouter(client=router_client,
router = net_resources.DeletableRouter(client=routers_client,
routers_client=routers_client,
**result['router'])
self.assertEqual(router.name, name)
self.addCleanup(self.delete_wrapper, router.delete)
@ -687,9 +687,18 @@ def get_remote_client_by_password(client_ip, username, password):
return ssh_client
def delete_all_servers(tenant_servers_client):
def delete_all_servers(tenant_servers_client, trys=3):
# try at least trys+1 time to delete servers, otherwise
# network resources can not be deleted
for s in tenant_servers_client.list_servers()['servers']:
tenant_servers_client.delete_server(s['id'])
for x in range(0, trys):
try:
waitfor_servers_terminated(tenant_servers_client)
return
except Exception:
pass
# last try
waitfor_servers_terminated(tenant_servers_client)

View File

@ -71,28 +71,25 @@ class DeletableRouter(n_resources.DeletableRouter):
def add_interface(self, subnet):
# should not let subnet add interface to router as
# the router might be crated by admin.
"""
self.client.add_router_interface_with_subnbet_id(
self.id, subnet_id=subnet.id)
"""
x_method(self.client, 'add_router_interface_with_subnet_id',
self.id, subnet_id=subnet.id)
try:
self.client.add_router_interface(
self.id, subnet_id=subnet.id)
except Exception:
x_method(self.client, 'add_router_interface_with_subnet_id',
self.id, subnet_id=subnet.id)
self._subnets.add(subnet)
def delete_subnet(self, subnet):
return self.delete_interface(subnet)
def delete_interface(self, subnet):
"""
self.client.remove_router_interface_with_subnet_id(
self.id, subnet_id=subnet.id)
"""
x_method(self.client, 'remove_router_interface_with_subnet_id',
self.id, subnet_id=subnet.id)
try:
self._subnets.remove(subnet)
self.client.remove_router_interface(
self.id, subnet_id=subnet.id)
except Exception:
pass
x_method(self.client, 'remove_router_interface_with_subnet_id',
self.id, subnet_id=subnet.id)
self._subnets.remove(subnet)
def update_extra_routes(self, nexthop, destination):
return self.client.update_extra_routes(self.id, nexthop, destination)
@ -120,6 +117,6 @@ def x_method(target_obj, method_name, *args, **kwargs):
_method = getattr(target_obj, method_name, None)
if _method is None:
raise Exception("Method[%s] is not defined at instance[%s]" %
method_name, str(target_obj))
(method_name, str(target_obj)))
results = _method(*args, **kwargs)
return results

View File

@ -49,7 +49,6 @@ class TestSimpleFlatNetwork(dmgr.TopoDeployScenarioManager):
def setUp(self):
super(TestSimpleFlatNetwork, self).setUp()
self.admin_client = self.admin_manager.network_client
self.info_flat1 = FLAT_ALLOC_DICT
def tearDown(self):
@ -92,7 +91,9 @@ class TestSimpleFlatNetwork(dmgr.TopoDeployScenarioManager):
self.net_subnet = self.create_subnet(self.net_network, self.info_flat1)
# tenant actions
self.security_group = self._create_security_group(
client=self.network_client, namestart='FLAT-tenant')
security_groups_client=self.security_groups_client,
security_group_rules_client=self.security_group_rules_client,
namestart='FLAT-tenant')
security_groups = [{'name': self.security_group['name']}]
self.serv1 = self.create_server_on_network(
self.net_network, security_groups,
@ -143,7 +144,8 @@ class TestTenantConnectivity(dmgr.TopoDeployScenarioManager):
# create security_group with loginable rules
self.security_group = self._create_security_group(
security_groups_client=client_mgr.security_groups_client,
client=client_mgr.network_client, namestart='deploy-connect')
security_group_rules_client=client_mgr.security_group_rules_client,
namestart='deploy-connect')
self.network, self.subnet, self.router = self.setup_project_network(
self.public_network_id, namestart='deploy-connect')
self.check_networks(self.network, self.subnet, self.router)
@ -239,8 +241,8 @@ class TestMultiTenantsNetwork(dmgr.TopoDeployScenarioManager):
cidr_offset=0):
username, password = self.get_image_userpass()
t_security_group = self._create_security_group(
client=client_mgr.network_client,
security_groups_client=client_mgr.security_groups_client,
security_group_rules_client=client_mgr.security_group_rules_client,
namestart="deploy-multi-tenant")
t_network, t_subnet, t_router = self.setup_project_network(
self.public_network_id, client_mgr,
@ -340,7 +342,6 @@ class TestProviderRouterTenantNetwork(dmgr.TopoDeployScenarioManager):
def setUp(self):
super(TestProviderRouterTenantNetwork, self).setUp()
self.admin_client = self.admin_manager.network_client
def tearDown(self):
# do mini teardown if test failed already
@ -379,7 +380,8 @@ class TestProviderRouterTenantNetwork(dmgr.TopoDeployScenarioManager):
to_router.add_subnet(t_subnet)
t_security_group = self._create_security_group(
security_groups_client=client_mgr.security_groups_client,
client=client_mgr.network_client, namestart=namestart)
security_group_rules_client=client_mgr.security_group_rules_client,
namestart=namestart)
security_groups = [{'name': t_security_group['name']}]
t_serv = self.create_server_on_network(
t_network, security_groups,
@ -399,7 +401,7 @@ class TestProviderRouterTenantNetwork(dmgr.TopoDeployScenarioManager):
@test.idempotent_id('a31712de-33ad-4dc2-9755-1a0631a4f66a')
@test.services('compute', 'network')
def test_provider_router_project_network(self):
# provider router owned by admin_client
# provider router owned by admin_manager
self.p_router = self._create_router(
client_mgr=self.admin_manager, namestart="deploy-provider-router",
distributed=self.tenant_router_attrs.get('distributed'),
@ -553,7 +555,7 @@ def _g_service_client(req_mgr, client_name):
s_client = getattr(req_mgr, client_name, None)
if s_client:
return s_client
return req_mgr.network_client
return req_mgr.networks_client
# self vs req: there are possible 3 client managers (admin, pri, 2nd)

View File

@ -0,0 +1,227 @@
# Copyright 2016 VMware Inc
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.common import waiters
from tempest import config
from tempest.lib.common.utils import data_utils
from tempest.lib import exceptions
from tempest import test
from vmware_nsx_tempest.tests.nsxv.scenario import (
manager_topo_deployment as dmgr)
CONF = config.CONF
DNS_SEARCH_DOMAIN = 'dns_search_domain'
class TestDnsSearchDomainBasicOps(dmgr.TopoDeployScenarioManager):
"""Test dns_search_domain working in subnets w/wo dns_search_domain.
network's subnet with dns_search_domain configured:
CONF.network.host_in_search_domain can be resolved,
update dns_search_dmain='' then host can not be resolved.
network's subnet without dns_search_domain configured:
CONF.network.host_in_search_domain can not be resolved,
update dns_search_dmain to CONF.network_dns_search_domain,
then host can be resolved.
Verify above 2 scenarios under shared/exclusive/distributed routers.
"""
@classmethod
def resource_setup(cls):
super(TestDnsSearchDomainBasicOps, cls).resource_setup()
cls.dns_search_domain = CONF.network.dns_search_domain
cls.host_in_search_domain = CONF.network.host_in_search_domain
@classmethod
def resource_cleanup(cls):
# lately, server up and down take long time. let's delete servers
# before test's auto cleanup kickin.
dmgr.delete_all_servers(cls.servers_client)
super(TestDnsSearchDomainBasicOps, cls).resource_cleanup()
def create_networks(self, dns_search_domain=None, cidr_offset=0):
prefix_name = 'dns-search' if dns_search_domain else 'no-search'
network_name = data_utils.rand_name(prefix_name)
network = self.create_network(client=self.networks_client,
name=network_name)
network = network.get('network', network)
subnet_kwargs = dict(name=network_name,
dns_nameservers=CONF.network.dns_servers,
cidr_offset=cidr_offset)
if dns_search_domain:
subnet_kwargs[DNS_SEARCH_DOMAIN] = dns_search_domain
subnet = self.create_subnet(network,
client=self.subnets_client,
**subnet_kwargs)
subnet = subnet.get('subnet', subnet)
if dns_search_domain:
self.assertEqual(dns_search_domain, subnet[DNS_SEARCH_DOMAIN])
return (network, subnet, dns_search_domain)
def create_router_by_type(self, router_type, name=None, **kwargs):
create_kwargs = dict(namestart='dns-search', external_gateway_info={
"network_id": CONF.network.public_network_id})
if router_type in ('shared', 'exclusive'):
create_kwargs['router_type'] = router_type
elif router_type in ('distributed'):
create_kwargs['distributed'] = True
kwargs.update(create_kwargs)
router = self._create_router(client_mgr=self.admin_manager,
**kwargs)
return router
def create_router_and_add_interfaces(self, router_type, net_list):
router = self.create_router_by_type(router_type)
for (network, subnet, dns_search_domain) in net_list:
router.add_subnet(subnet)
return router
def setup_tenant_networks(self, router_type):
self.networks_with_search_domain = self.create_networks(
self.dns_search_domain, cidr_offset=1)
self.networks_wo_search_domain = self.create_networks(
None, cidr_offset=2)
net_list = [self.networks_with_search_domain,
self.networks_wo_search_domain]
router = self.create_router_and_add_interfaces(router_type, net_list)
return (router, net_list)
def create_security_group_with_loginable_rules(self):
security_group = self._create_security_group(
security_groups_client=self.security_groups_client,
namestart='dns-search-')
return security_group
def wait_for_servers_become_active(self, server_id_list):
servers_client = self.admin_manager.servers_client
for server_id in server_id_list:
waiters.wait_for_server_status(
servers_client, server_id, 'ACTIVE')
def create_servers_on_networks(self, networks_info, security_group):
servers_client = self.servers_client
(network, subnet, dns_search_domain) = networks_info
security_groups = [{'name': security_group['id']}]
svr = self.create_server_on_network(
network, security_groups, name=network['name'],
wait_on_boot=False,
servers_client=self.servers_client)
server_info = dict(
server=svr, network=network, subnet=subnet,
dns_search_domain=dns_search_domain,
security_group=security_group,
servers_client=servers_client)
return server_info
def create_floatingip_for_server(self, server):
username, password = self.get_image_userpass()
floatingip = super(TestDnsSearchDomainBasicOps,
self).create_floatingip_for_server(
server, client_mgr=self.admin_manager)
msg = ("Associate floatingip[%s] to server[%s]"
% (floatingip, server['name']))
self._check_floatingip_connectivity(
floatingip, server, should_connect=True, msg=msg)
serv_fip = floatingip.floating_ip_address
dmgr.rm_sshkey(serv_fip)
ssh_client = dmgr.get_remote_client_by_password(
serv_fip, username, password)
return (floatingip, ssh_client)
def _test_host_cannot_be_resolved(self):
""""test CONF.network.host_in_dns_search_dmain can not be resolved.
The network/subnet does not define dns_search_domain and
its host_in_search_domain in dns_search_domain can not be resolved.
Later, update dns_search_domain to CONF.network.dns_search_domain,
then the host can be resovled.
"""
floatingip, sshc = self.create_floatingip_for_server(
self.net_wo_search['server'])
ping_cmd = 'ping -c3 %s' % self.host_in_search_domain
self.assertRaises(exceptions.SSHExecCommandFailed,
sshc.exec_command,
ping_cmd)
subnet = self.net_wo_search['subnet']
subnet = self.subnets_client.update_subnet(
subnet['id'],
dns_search_domain=self.dns_search_domain)
subnet = subnet.get('subnet', subnet)
self.assertEqual(subnet[DNS_SEARCH_DOMAIN],
self.dns_search_domain)
# renew dhcp lease to force dns_search_domain update too
sshc.renew_lease(floatingip['fixed_ip_address'])
sshc.exec_command(ping_cmd)
def _test_host_can_be_resolved(self):
""""test CONF.network.host_in_dns_search_dmain can be resolved.
The network/subnet has dns_search_domain defined and
its host_in_search_domain is in dns_search_domain should be resolved.
Later, update dns_search_domain to '', then the host is not resovled.
"""
floatingip, sshc = self.create_floatingip_for_server(
self.net_w_search['server'])
ping_cmd = 'ping -c3 %s' % self.host_in_search_domain
sshc.exec_command(ping_cmd)
subnet = self.net_w_search['subnet']
subnet = self.subnets_client.update_subnet(
subnet['id'], dns_search_domain='')
subnet = subnet.get('subnet', subnet)
self.assertEqual(subnet[DNS_SEARCH_DOMAIN], '')
# renew dhcp lease to force dns_search_domain update too
sshc.renew_lease(floatingip['fixed_ip_address'])
self.assertRaises(exceptions.SSHExecCommandFailed,
sshc.exec_command,
ping_cmd)
# entry point for dns_search_domain test for different router-type
def run_dns_search_domain_basic_ops(self, router_type):
router, net_list = self.setup_tenant_networks(router_type)
security_group = self.create_security_group_with_loginable_rules()
self.net_w_search = self.create_servers_on_networks(
self.networks_with_search_domain, security_group)
self.net_wo_search = self.create_servers_on_networks(
self.networks_wo_search_domain, security_group)
server_id_list = [self.net_w_search['server']['id'],
self.net_wo_search['server']['id']]
self.wait_for_servers_become_active(server_id_list)
self._test_host_can_be_resolved()
self._test_host_cannot_be_resolved()
class TestDnsSearchDomainOpsOverSharedRouter(TestDnsSearchDomainBasicOps):
@test.idempotent_id('5556cdce-075c-437a-9d9d-f1e4583e9f4c')
def test_dns_search_domain_ops_over_shared_router(self):
return self.run_dns_search_domain_basic_ops('shared')
class TestDnsSearchDomainOpsOverExclusiveRouter(TestDnsSearchDomainBasicOps):
@test.idempotent_id('6878c3cf-88d2-46ef-b366-b2a49bfa1e0a')
def test_dns_search_domain_ops_over_exclusive_router(self):
return self.run_dns_search_domain_basic_ops('exclusive')
class TestDnsSearchDomainOpsOverDistributedeRouter(
TestDnsSearchDomainBasicOps):
@test.idempotent_id('ad24cb58-532a-4675-9bbc-98ec4c296716')
def test_dns_search_domain_ops_over_distributed_router(self):
return self.run_dns_search_domain_basic_ops('distributed')

View File

@ -123,21 +123,25 @@ class TestDvrBasicOps(manager.NetworkScenarioTest):
# overwrite super class who does not accept router attributes
def create_networks(self, dns_nameservers=None, **kwargs):
client = self.network_client
namestart = 'dvr-ops'
routers_client = self.routers_client
networks_client = self.networks_client
subnets_client = self.subnets_client
network = self._create_network(
client=client, networks_client=networks_client,
networks_client=networks_client,
routers_client=routers_client,
namestart=namestart,
tenant_id=self.tenant_id)
router_kwargs = {}
router_kwargs = dict(client=routers_client, namestart=namestart)
for k in kwargs.keys():
if k in ('distributed', 'router_type', 'router_size'):
router_kwargs[k] = kwargs.pop(k)
router = self._create_router(**router_kwargs)
router.set_gateway(CONF.network.public_network_id)
subnet_kwargs = dict(network=network, client=client,
subnet_kwargs = dict(network=network,
namestart=namestart,
subnets_client=subnets_client)
# use explicit check because empty list is a valid option
if dns_nameservers is not None:
@ -148,9 +152,9 @@ class TestDvrBasicOps(manager.NetworkScenarioTest):
# overwrite super class
def _create_router(self, client=None, tenant_id=None,
namestart='router-smoke', **kwargs):
namestart='dvr-ops', **kwargs):
if not client:
client = self.network_client
client = self.routers_client
if not tenant_id:
tenant_id = client.tenant_id
name = data_utils.rand_name(namestart)
@ -158,8 +162,8 @@ class TestDvrBasicOps(manager.NetworkScenarioTest):
admin_state_up=True,
tenant_id=tenant_id,
**kwargs)
router = net_resources.DeletableRouter(client=client,
**result['router'])
router = net_resources.DeletableRouter(
routers_client=client, **result['router'])
self.assertEqual(router.name, name)
self.addCleanup(self.delete_wrapper, router.delete)
return router
@ -218,7 +222,7 @@ class TestDvrBasicOps(manager.NetworkScenarioTest):
for server in self.servers:
# call the common method in the parent class
super(TestDvrBasicOps, self).\
_check_project_network_connectivity(
_check_tenant_network_connectivity(
server, ssh_login, self._get_server_key(server),
servers_for_debug=self.servers)
@ -291,8 +295,7 @@ class TestDvrBasicOps(manager.NetworkScenarioTest):
interface = self.interface_client.create_interface(
server=server['id'],
network_id=self.new_net.id)
self.addCleanup(self.network_client.wait_for_resource_deletion,
'port',
self.addCleanup(self.ports_client.wait_for_resource_deletion,
interface['port_id'])
self.addCleanup(self.delete_wrapper,
self.interface_client.delete_interface,
@ -311,7 +314,7 @@ class TestDvrBasicOps(manager.NetworkScenarioTest):
"Old port: %s. Number of new ports: %d" % (
CONF.network.build_timeout, old_port,
len(self.new_port_list)))
new_port = net_resources.DeletablePort(client=self.network_client,
new_port = net_resources.DeletablePort(client=self.ports_client,
**self.new_port_list[0])
def check_new_nic():
@ -381,7 +384,9 @@ class TestDvrBasicOps(manager.NetworkScenarioTest):
should_connect=True):
ip_address = floating_ip.floating_ip_address
private_key = self._get_server_key(self.floating_ip_tuple.server)
ssh_source = self._ssh_to_server(ip_address, private_key)
# ssh_source = self._ssh_to_server(ip_address, private_key)
ssh_source = self.get_remote_client(ip_address,
private_key=private_key)
for remote_ip in address_list:
if should_connect:

View File

@ -69,9 +69,9 @@ class TestMultipleTransportZonesBasicOps(dmgr.TopoDeployScenarioManager):
"provider_network_type",
'vxlan')
cls.MAX_MTZ = getattr(CONF.nsxv, 'max_mtz', 0) or 3
cls.admin_client = cls.admin_manager.network_client
cls.admin_networks_client = cls.admin_manager.networks_client
cls.admin_subnets_client = cls.admin_manager.subnets_client
cls.admin_routers_client = cls.admin_manager.routers_client
@classmethod
def resource_cleanup(cls):
@ -142,12 +142,12 @@ class TestMultipleTransportZonesBasicOps(dmgr.TopoDeployScenarioManager):
return router
def clear_router_gateway_and_interfaces(self, router, nets):
router_client = self.admin_client
router_client.update_router(router['id'],
routers_client = self.admin_routers_client
routers_client.update_router(router['id'],
external_gateway_info=dict())
for net_id, (s_id, network, subnet, sg) in six.iteritems(nets):
try:
router_client.remove_router_interface_with_subnet_id(
routers_client.remove_router_interface_with_subnet_id(
router['id'], subnet['id'])
except Exception:
pass
@ -160,14 +160,15 @@ class TestMultipleTransportZonesBasicOps(dmgr.TopoDeployScenarioManager):
'mtz-tenant')
# create security_group with loginable rules
security_group = self._create_security_group(
security_groups_client=self.manager.security_groups_client,
client=self.manager.network_client,
security_groups_client=self.security_groups_client,
security_group_rules_client=self.security_group_rules_client,
namestart='mtz-tenant')
nets[net_id] = [None, network, subnet, security_group]
admin_security_group = self._create_security_group(
security_groups_client=self.admin_manager.security_groups_client,
client=self.admin_manager.network_client,
namestart='mtz-')
security_group_rules_client=(
self.admin_manager.security_group_rules_client),
namestart='mtz-admin')
for cidr_step in range(0, self.MAX_MTZ):
s_id = scope_id_list[cidr_step % len(scope_id_list)]
net_id, network, subnet = self.create_mtz_network_subnet(
@ -230,7 +231,7 @@ class TestMultipleTransportZonesBasicOps(dmgr.TopoDeployScenarioManager):
s_id, network, subnet, security_group = nets[net_id]
servers_client = (self.manager.servers_client if s_id is None
else self.admin_manager.servers_client)
security_groups = [{'name': security_group['name']}]
security_groups = [{'name': security_group['id']}]
svr = self.create_server_on_network(
network, security_groups,
name=network['name'],

View File

@ -110,11 +110,12 @@ class TestLBaaSBasicOps(manager.NetworkScenarioTest):
# overwrite super class who does not accept router attributes
def create_networks(self, dns_nameservers=None, **kwargs):
client = self.network_client
namestart = 'lbv1-ops'
routers_client = self.routers_client
networks_client = self.networks_client
subnets_client = self.subnets_client
router_kwargs = {}
router_kwargs = dict(client=routers_client, namestart=namestart)
for k in kwargs.keys():
if k in ('distributed', 'router_type', 'router_size'):
router_kwargs[k] = kwargs.pop(k)
@ -122,10 +123,13 @@ class TestLBaaSBasicOps(manager.NetworkScenarioTest):
router.set_gateway(CONF.network.public_network_id)
network = self._create_network(
client=client, networks_client=networks_client,
routers_client=routers_client,
networks_client=networks_client,
namestart=namestart,
tenant_id=self.tenant_id)
subnet_kwargs = dict(network=network, client=client,
subnet_kwargs = dict(network=network,
namestart=namestart,
subnets_client=subnets_client)
# use explicit check because empty list is a valid option
if dns_nameservers is not None:
@ -136,9 +140,9 @@ class TestLBaaSBasicOps(manager.NetworkScenarioTest):
# overwrite super class
def _create_router(self, client=None, tenant_id=None,
namestart='router-smoke', **kwargs):
namestart='router-lbv1', **kwargs):
if not client:
client = self.network_client
client = self.routers_client
if not tenant_id:
tenant_id = client.tenant_id
name = data_utils.rand_name(namestart)
@ -146,8 +150,8 @@ class TestLBaaSBasicOps(manager.NetworkScenarioTest):
admin_state_up=True,
tenant_id=tenant_id,
**kwargs)
router = net_resources.DeletableRouter(client=client,
**result['router'])
router = net_resources.DeletableRouter(
routers_client=client, **result['router'])
self.assertEqual(router.name, name)
self.addCleanup(self.delete_wrapper, router.delete)
return router
@ -376,7 +380,7 @@ class TestLBaaSBasicOps(manager.NetworkScenarioTest):
protocol_port=80,
subnet_id=self.subnet.id,
pool_id=self.pool.id)
self.vip.wait_for_status('ACTIVE')
self.vip_wait_for_status(self.vip, 'ACTIVE')
if (CONF.network.public_network_id and not
CONF.network.project_networks_reachable):
self._assign_floating_ip_to_vip(self.vip)
@ -392,6 +396,20 @@ class TestLBaaSBasicOps(manager.NetworkScenarioTest):
self.ports_client.update_port(
self.vip.port_id, security_groups=[self.security_group.id])
def vip_wait_for_status(self, vip, status='ACTIVE'):
# vip is DelatableVip
interval = vip.client.build_interval
timeout = vip.client.build_timeout
start_time = time.time()
while time.time() - start_time <= timeout:
resource = vip.client.show_vip(vip.id)['vip']
if resource['status'] == status:
return
time.sleep(interval)
message = "Wait for VIP become ACTIVE"
raise exceptions.TimeoutException(message)
def _check_load_balancing(self):
"""http to load balancer to check message handled by both servers.