Roman Safronov bf09f02aca Add L3HA OVN tests
Moved tests from the downstream plugin with minimal changes.
Setup function changed significantly in order to support
environments where routers can be scheduled on compute nodes.
The base _create_server function was changed to support spawning
a VM on a specific host, or on any host not in a given list of hosts.
The tests are supported currently only on environments where ovn
routers are located on compute or standalone networker nodes.
The test that shuts down a node requires this
patch [1].
Also:
Since _create_server return value was changed (simplified),
adjusted some qos tests that could be affected.
Removed redundant 'return None' statements from some base functions.

[1] https://review.opendev.org/c/x/whitebox-neutron-tempest-plugin/+/913851

Change-Id: Ic7d46a9a432089f1cd1a30af7639e9824e464c6d
2024-04-01 19:12:30 +03:00

1087 lines
46 KiB
Python

# Copyright 2019 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import base64
import json
import os
import random
import re
import time
import yaml
import netaddr
from netifaces import AF_INET
from netifaces import ifaddresses
from netifaces import interfaces
from neutron_lib import constants
from neutron_tempest_plugin.common import shell
from neutron_tempest_plugin.common import ssh
from neutron_tempest_plugin.common import utils as common_utils
from neutron_tempest_plugin.scenario import base
from oslo_log import log
from tempest.common import utils
from tempest.common import waiters
from tempest import config
from tempest.lib.common import fixed_network
from tempest.lib.common.utils import data_utils
from tempest.lib.common.utils import test_utils
from whitebox_neutron_tempest_plugin.common import tcpdump_capture as capture
from whitebox_neutron_tempest_plugin.common import utils as local_utils
# Tempest configuration object shared by the whole module.
CONF = config.CONF
# Module-level logger.
LOG = log.getLogger(__name__)
# Shortcut to the whitebox plugin option group of the tempest configuration.
WB_CONF = CONF.whitebox_neutron_plugin_options
# Base class of the whitebox scenario tests. Its methods provide node
# discovery, command execution on the deployment nodes and helpers that
# create servers and test topologies.
class BaseTempestWhiteboxTestCase(base.BaseTempestTestCase):
# Credential types requested for the tests: primary and admin.
credentials = ['primary', 'admin']
@classmethod
def resource_setup(cls):
    """Initialise class attributes shared by the whitebox tests.

    Sets the IPv6 flag derived from the identity URI, the default image,
    flavor and ssh user, the flags telling whether OVN / SR-IOV agents are
    reported by neutron and, depending on ``openstack_type``, the clients
    used to run commands on the master node.
    """
    super(BaseTempestWhiteboxTestCase, cls).resource_setup()
    identity_uri = CONF.identity.uri
    # The text found between "[" and "]" of the URI is tested as IPv6.
    bracketed = identity_uri[
        identity_uri.find("[") + 1:identity_uri.find("]")]
    cls.is_ipv6 = bool(netaddr.valid_ipv6(bracketed))
    cls.image_ref = CONF.compute.image_ref
    cls.flavor_ref = CONF.compute.flavor_ref
    cls.username = CONF.validation.image_ssh_user

    agents = cls.os_admin.network.AgentsClient().list_agents()['agents']
    cls.has_ovn_support = bool(
        [agent for agent in agents if 'ovn' in agent['binary']])
    cls.has_sriov_support = bool(
        [agent for agent in agents if 'sriov' in agent['binary']])

    # deployer tool dependent variables
    deployer = WB_CONF.openstack_type
    if deployer == 'devstack':
        cls.master_node_client = cls.get_node_client('localhost')
        cls.master_cont_cmd_executor = cls.run_on_master_controller
        cls.neutron_api_prefix = ''
        cls.neutron_conf = WB_CONF.neutron_config
    elif deployer == 'podified':
        cls.proxy_host_key = cls._get_podified_proxy_host_key()
        proxy_client = cls.get_node_client(
            host=WB_CONF.proxy_host_address,
            username=WB_CONF.proxy_host_user,
            pkey=f"{cls.proxy_host_key}")
        # The proxy host acts as master node and as command executor.
        cls.proxy_host_client = proxy_client
        cls.master_node_client = proxy_client
        cls.master_cont_cmd_executor = proxy_client
    else:
        LOG.warning(("Unrecognized deployer tool '{}', plugin supports "
                     "openstack_type as devstack/podified."
                     .format(WB_CONF.openstack_type)))
@classmethod
def run_on_master_controller(cls, cmd):
    """Run a shell command on the master node and return its output.

    On 'podified' the command is executed through the proxy host client,
    on 'devstack' it is executed locally. No output is produced for any
    other ``openstack_type``, so the function fails on the unbound
    ``output`` name in that case.

    :param cmd(str): Command line to execute.
    :returns: Command output with surrounding whitespace stripped.
    """
    deployer = WB_CONF.openstack_type
    if deployer == 'podified':
        output = cls.proxy_host_client.exec_command(cmd)
    elif deployer == 'devstack':
        stdout, stderr = local_utils.run_local_cmd(cmd)
        LOG.debug("Stderr: {}".format(stderr.decode()))
        output = stdout.decode()
    LOG.debug("Output: {}".format(output))
    return output.strip()
def get_host_for_server(self, server_id):
    """Return the 'OS-EXT-SRV-ATTR:host' value of the given server.

    The server details are fetched with the admin servers client.
    """
    show_server = self.os_admin.servers_client.show_server
    return show_server(server_id)['server']['OS-EXT-SRV-ATTR:host']
@classmethod
def get_external_gateway(cls):
    """Return the gateway IP of the first IPv4 subnet of the public network.

    Nothing is returned when ``public_network_id`` is not configured or
    when no IPv4 subnet of that network has a gateway.
    """
    public_network_id = CONF.network.public_network_id
    if public_network_id:
        listing = cls.os_admin.network_client.list_subnets(
            network_id=public_network_id)
        for subnet in listing['subnets']:
            gateway_ip = subnet['gateway_ip']
            if gateway_ip and subnet['ip_version'] == constants.IP_VERSION_4:
                return gateway_ip
@staticmethod
def get_node_client(
        host, username=WB_CONF.overcloud_ssh_user, pkey=None,
        key_filename=WB_CONF.overcloud_key_file):
    """Build an ssh client for a node.

    :param host(str): Address of the node.
    :param username(str): User to log in as.
    :param pkey(str): Private key content. When given, it is used instead
                      of ``key_filename``.
    :param key_filename(str): Path to a private key file.
    :returns: ssh.Client instance.
    """
    auth = {'pkey': pkey} if pkey else {'key_filename': key_filename}
    return ssh.Client(host=host, username=username, **auth)
def find_different_compute_host(self, exclude_hosts):
    """Return the name of the first compute node not in ``exclude_hosts``.

    :param exclude_hosts(list): Host names that must not be chosen.
    :raises: self.skipException when every compute node is excluded.
    """
    eligible = (
        node['name'] for node in self.nodes
        if node['is_compute'] and node['name'] not in exclude_hosts)
    for name in eligible:
        return name
    raise self.skipException(
        "Not able to find a different compute than: {}".format(
            exclude_hosts))
def get_local_ssh_client(self, network):
    """Build an ssh client to the local host.

    The host address is the local IPv4 address found inside the IPv4
    subnet of ``network``. The user is the one reported by ``whoami``.
    """
    local_ip = self._get_local_ip_from_network(
        self.get_subnet_cidr(network, 4))
    current_user = shell.execute_local_command('whoami').stdout.rstrip()
    return ssh.Client(
        host=local_ip,
        username=current_user,
        key_filename=WB_CONF.overcloud_key_file)
def get_subnet_cidr(self, network, ip_version):
    """Return the CIDR of the first subnet of ``network`` with ``ip_version``.

    Subnets are looked up one by one with the admin network client.
    Nothing is returned when no subnet has the requested IP version.
    """
    for subnet_id in network['subnets']:
        details = self.os_admin.network_client.show_subnet(subnet_id)
        subnet = details['subnet']
        if subnet['ip_version'] != ip_version:
            continue
        return subnet['cidr']
def find_node_client(self, node_name):
    """Return the 'client' entry of the node called ``node_name``.

    Nothing is returned when no node has that name.
    """
    matching = (node for node in self.nodes if node['name'] == node_name)
    for node in matching:
        return node['client']
@staticmethod
def _get_local_ip_from_network(network):
    """Return the first local IPv4 address that belongs to ``network``.

    Only the first IPv4 address of every local interface is considered.
    Nothing is returned when none of them is inside ``network``.

    :param network(str): Network in CIDR notation.
    """
    local_addresses = []
    for iface in interfaces():
        if AF_INET in ifaddresses(iface):
            local_addresses.append(ifaddresses(iface)[AF_INET][0]['addr'])
    for candidate in local_addresses:
        if netaddr.IPAddress(candidate) in netaddr.IPNetwork(network):
            return candidate
def get_fip_port_details(self, fip):
    """Return the floating IP port that carries the address of ``fip``.

    Floating IP ports of the public network are listed with the admin
    client. The first one whose first fixed IP equals
    ``fip['floating_ip_address']`` is returned, otherwise nothing.
    """
    fip_ports = self.os_admin.network_client.list_ports(
        network_id=CONF.network.public_network_id,
        device_owner=constants.DEVICE_OWNER_FLOATINGIP)['ports']
    for candidate in fip_ports:
        fixed_ips = candidate.get('fixed_ips')
        if not fixed_ips:
            continue
        if fixed_ips[0]['ip_address'] == fip['floating_ip_address']:
            return candidate
@classmethod
def get_podified_nodes_data(cls):
    """Read the inventory on the proxy host and describe its nodes.

    The inventory file is read through the proxy host client. One dict
    with 'name', 'ip', 'user' and 'key' is produced for every host of the
    'computes' group followed by every host of the 'ocps' group.

    :returns: List of node description dicts.
    """
    def describe_node(node, host_vars):
        if 'ocp' in node:
            node_name = node.replace("ocp", "master")
            key_path = host_vars['ansible_ssh_private_key_file']
            # save path of ocp nodes key (if not yet), we'll need it later
            if not hasattr(cls, 'ocp_nodes_key_path'):
                cls.ocp_nodes_key_path = key_path.replace(
                    '~', '/home/{}'.format(WB_CONF.proxy_host_user))
            node_key = key_path.split('/')[-1]
        else:
            node_name = node
            node_key = 'id_cifw_key'
        return {
            'name': node_name,
            'ip': host_vars['ansible_host'],
            'user': host_vars['ansible_user'],
            'key': node_key}

    inventory = yaml.safe_load(
        cls.proxy_host_client.exec_command(
            'cat ' + WB_CONF.proxy_host_inventory_path))
    nodes = []
    for group in ('computes', 'ocps'):
        group_hosts = inventory[group]['hosts']
        for node in group_hosts:
            nodes.append(describe_node(node, group_hosts[node]))
    return nodes
@classmethod
def _get_podified_proxy_host_key(cls):
    """Return the proxy host private key wrapped in OpenSSH markers.

    The key body is the 'key' entry of the JSON document stored in the
    ``proxy_host_key_data`` option.
    """
    header = '-----BEGIN OPENSSH PRIVATE KEY-----\n'
    footer = '-----END OPENSSH PRIVATE KEY-----\n'
    key_body = json.loads(WB_CONF.proxy_host_key_data)['key']
    return f"{header}{key_body}{footer}"
@classmethod
def append_node(cls, host, is_compute=False, is_networker=False):
    """Register a node in ``cls.nodes`` or update its role flags.

    A node already known by its short host name only gets its
    'is_networker' / 'is_compute' flags raised. Otherwise an ssh client is
    created and the node is appended, flagged as controller when rabbit or
    galera processes are found on it.

    :param host(str): Host name, possibly fully qualified.
    :param is_compute(bool): Whether the host acts as a compute node.
    :param is_networker(bool): Whether the host acts as a networker node.
    """
    hostname = host.split('.')[0]
    for known in cls.nodes:
        if known['name'] != hostname:
            continue
        # Flags are only ever switched on, never back off.
        known['is_networker'] = known['is_networker'] or is_networker
        known['is_compute'] = known['is_compute'] or is_compute
        return

    if WB_CONF.openstack_type == 'podified':
        for entry in cls.nodes_data:
            LOG.debug(
                "hostname='{}', name='{}'".format(hostname, entry['name']))
            if entry['name'] == hostname:
                extra_params = {
                    'client': cls.get_node_client(
                        host=entry['ip'], username=entry['user'],
                        pkey=f"{cls.keys_data[entry['key']]}")}
                break
    else:
        extra_params = {'client': cls.get_node_client(host)}

    new_node = {
        'name': hostname,
        'is_networker': is_networker,
        'is_controller': False,
        'is_compute': is_compute,
        **extra_params}
    # Here we are checking if there are controller-specific
    # processes running on the node
    processes = new_node['client'].exec_command(
        r"ps ax | grep 'rabbit\|galera' | grep -v grep || true")
    if processes.strip() != "":
        new_node['is_controller'] = True
    cls.nodes.append(new_node)
@classmethod
def discover_nodes(cls):
    """Fill ``cls.nodes`` with the compute and networker nodes.

    On 'podified' the inventory data and the ssh keys are collected first.
    Hypervisor hosts are registered as computes, then hosts running the
    OVN gateway agent (or neutron-l3-agent without OVN) as networkers.
    """
    if WB_CONF.openstack_type == 'podified':
        cls.nodes_data = cls.get_podified_nodes_data()
        cls.keys_data = {
            'id_cifw_key': cls.proxy_host_key,
            'devscripts_key': cls.proxy_host_client.exec_command(
                'cat ' + cls.ocp_nodes_key_path)}

    agents = cls.os_admin.network.AgentsClient().list_agents()['agents']
    if cls.has_ovn_support:
        field, wanted = 'agent_type', 'OVN Controller Gateway agent'
    else:
        field, wanted = 'binary', 'neutron-l3-agent'
    l3_agent_hosts = [
        agent['host'] for agent in agents if agent[field] == wanted]

    hypervisors = cls.os_admin.hv_client.list_hypervisors()['hypervisors']
    compute_hosts = [hv['hypervisor_hostname'] for hv in hypervisors]

    cls.nodes = []
    for host in compute_hosts:
        cls.append_node(host, is_compute=True)
    for host in l3_agent_hosts:
        cls.append_node(host, is_networker=True)
def get_node_setting(self, node_name, setting):
    """Return entry ``setting`` of the node called ``node_name``.

    Nothing is returned when no node has that name.
    """
    matching = (node for node in self.nodes if node_name == node['name'])
    for node in matching:
        return node[setting]
@classmethod
def get_pod_of_service(cls, service='neutron'):
    """Return the name of the pod running ``service``.

    The pod list is queried through the proxy host client. Nothing is
    returned for services other than 'neutron'.
    """
    # (rsafrono) at this moment only neutron service pod handled
    # since it's the only that existing tests are using
    if service == 'neutron':
        pod_listing = cls.proxy_host_client.exec_command(
            "oc get pods | grep neutron | grep -v meta | "
            "cut -d' ' -f1")
        return pod_listing.strip()
@classmethod
def get_configs_of_service(cls, service='neutron'):
    """Return the paths of the config files found in the service pod.

    Files are searched in the directory of the ``neutron_config`` option.
    Nothing is returned for services other than 'neutron'.
    """
    # (rsafrono) at this moment only neutron configs were handled
    # since it's the only service that existing tests are using
    if service == 'neutron':
        pod = cls.get_pod_of_service(service)
        config_dir = os.path.split(WB_CONF.neutron_config)[0]
        listing = cls.proxy_host_client.exec_command(
            'oc rsh {} find {} -type f'.format(pod, config_dir))
        return listing.strip().split('\n')
@classmethod
def check_service_setting(
        cls, host, service='neutron', config_files=None,
        section='DEFAULT', param='', value='True',
        msg='Required config value is missing', skip_if_fails=True):
    """Check if a service on a node has a setting with a value in config

    :param host(dict): Dictionary with host-related parameters,
                       host['client'] is a required parameter
    :param service(str): Name of the containerized service.
    :param config_files(list): List with paths to config files. List makes
                               sense on podified where e.g. neutron has
                               2 config files with same sections.
                               The list is iterated as is, so it has to
                               be provided by the caller.
    :param section(str): Section in the config file.
    :param param(str): Name of the option to read from the section.
    :param value(str): Expected value, looked up as a substring of the
                       command output.
    :param msg(str): Message to print in case of expected value not found
    :param skip_if_fails(bool): skip if the check fails - if it fails and
                                skip_if_fails is False, return False.
    :returns: True as soon as one config file contains the value.
    """
    service_prefix = ""
    if WB_CONF.openstack_type == 'podified':
        service_prefix = "oc rsh {}".format(
            cls.get_pod_of_service(service))
    cmd_prefix = "crudini --get"
    for config_file in config_files:
        setting = "{} {} {}".format(config_file, section, param)
        # "|| true" keeps a missing option from failing the ssh command.
        cmd = "{} {} {} || true".format(
            service_prefix, cmd_prefix, setting)
        LOG.debug("Command = '{}'".format(cmd))
        result = host['client'].exec_command(cmd)
        LOG.debug("Result = '{}'".format(result))
        if value in result:
            return True
    if not skip_if_fails:
        return False
    raise cls.skipException(msg)
@classmethod
def reset_node_service(cls, service_name, ssh_client,
                       wait_until_active=True, timeout=30):
    """Restart a systemd service on a node and optionally wait for it.

    :param service_name(str): Name of the service. Every non-letter
                              character is turned into a '?' glob.
    :param ssh_client(Client): SSH client of the node.
    :param wait_until_active(bool): Whether to wait until systemd reports
                                    the service as active.
    :param timeout(int): Seconds to wait for the service to be active.
    :raises RuntimeError: when the service is not active after
                          ``timeout`` seconds. The message carries the
                          ``systemctl status`` output taken at that time.
    """
    # NOTE(mblue): Globbing works on podified/devstack/tripleo
    service_glob = re.sub(r'[^a-zA-Z]', '?', service_name)
    host_ip = ssh_client.__dict__['host']
    LOG.debug("Restarting service '%s' on host '%s'.",
              service_glob, host_ip)
    ssh_client.exec_command(
        'sudo systemctl restart *{}.service'.format(service_glob))
    if not wait_until_active:
        return

    def _is_service_active():
        return 'active' in ssh_client.exec_command(
            'sudo systemctl is-active *{}.service; true'.format(
                service_glob))

    LOG.debug("Waiting for service '%s' to become active on host '%s'.",
              service_glob, host_ip)
    try:
        common_utils.wait_until_true(
            _is_service_active, timeout=timeout, sleep=5)
    except common_utils.WaitTimeout:
        # Collect the status only once the wait has failed: building the
        # exception up front would run the status command before waiting
        # and report the state right after the restart.
        status = ssh_client.exec_command(
            'sudo systemctl status *{}.service; true'.format(
                service_glob))
        raise RuntimeError(
            "Timed out {} seconds, service {} (globbing '*{}.service') "
            "didn't become active after restart.\n\n'''\n{}\n'''".format(
                timeout, service_name, service_glob, status))
    LOG.debug("Service '%s' active on host '%s'.",
              service_glob, host_ip)
# Boot a server on a network and optionally associate a floating IP with
# its port.
#
# :param create_floating_ip(bool): Whether to create a floating IP for the
#     server port.
# :param exclude_hosts(list): Hosts the server must not be spawned on.
#     Ignored when kwargs['host'] names one of them.
# :param network(dict): Network to plug the server into, self.network when
#     omitted.
# :param kwargs: Passed to servers_client.create_server(); 'flavorRef',
#     'imageRef' and 'networks' are always overwritten here.
# :returns: dict with 'port', 'fip' (None when no floating IP was
#     requested) and 'server'.
def _create_server(
self, create_floating_ip=True, exclude_hosts=None,
network=None, **kwargs):
network = network or self.network
kwargs.setdefault('name', data_utils.rand_name('server-test'))
kwargs['flavorRef'] = self.flavor_ref
kwargs['imageRef'] = self.image_ref
kwargs['networks'] = [{'uuid': network['id']}]
# Default key pair and security group are used unless the caller gave some.
if not kwargs.get('key_name'):
kwargs['key_name'] = self.keypair['name']
if not kwargs.get('security_groups'):
kwargs['security_groups'] = [{
'name': self.security_groups[-1]['name']}]
if exclude_hosts:
exclude_hosts_ignored = False
if kwargs.get('host') and (kwargs['host'] in exclude_hosts):
exclude_hosts_ignored = True
LOG.debug("'exclude_hosts' parameter contains same value as "
"'host' so it will be ignored, i.e. 'host' will be "
"used")
else:
# Pick a compute node outside of the excluded list and request it.
kwargs['host'] = self.find_different_compute_host(
exclude_hosts)
# Admin clients are used whenever a target host is requested, primary
# clients otherwise.
if kwargs.get('host'):
servers_client = self.os_admin.servers_client
network_client = self.os_admin.network_client
else:
servers_client = self.os_primary.servers_client
network_client = self.os_primary.network_client
server = servers_client.create_server(**kwargs)['server']
# Both cleanups ignore "not found" errors; the termination wait is
# registered before the delete call.
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
waiters.wait_for_server_termination,
servers_client,
server['id'])
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
servers_client.delete_server,
server['id'])
# exclude_hosts_ignored only exists when exclude_hosts is set, the "and"
# short-circuit keeps it from being read otherwise.
if exclude_hosts and not exclude_hosts_ignored:
if self.get_host_for_server(server['id']) in exclude_hosts:
self.fail("Failed to spawn a server on a host other than in "
"this list: '{}'. Can not proceed.".format(
' '.join(exclude_hosts)))
self.wait_for_server_active(server, client=servers_client)
# The first port of the server on the network is the one returned.
port = self.client.list_ports(
network_id=network['id'],
device_id=server['id'])['ports'][0]
if create_floating_ip:
fip = network_client.create_floatingip(
floating_network_id=CONF.network.public_network_id,
port_id=port['id'])['floatingip']
self.addCleanup(test_utils.call_and_ignore_notfound_exc,
network_client.delete_floatingip,
fip['id'])
else:
fip = None
return {'port': port, 'fip': fip, 'server': server}
def _create_server_for_topology(
        self, network_id=None, port_type=None,
        different_host=None, port_qos_policy_id=None):
    """Boot a server for a test topology and attach an ssh client to it.

    :param network_id(str): Network to plug the server into, the id of
                            self.network when omitted.
    :param port_type(str): vnic type of a port created up front for the
                           server. No port is pre-created when omitted.
    :param different_host(dict): Server that the new one must not share a
                                 host with. Only honoured when
                                 CONF.compute.min_compute_nodes > 1.
    :param port_qos_policy_id(str): QoS policy id of the pre-created port.
    :returns: The server dict extended with an 'ssh_client' entry.
    :raises: self.skipException when the server landed on the same host
             as ``different_host``.
    """
    network_id = network_id or self.network['id']
    if port_type:
        port_kwargs = {'binding:vnic_type': port_type,
                       'qos_policy_id': port_qos_policy_id}
        created_port = self.create_port(
            network={'id': network_id}, **port_kwargs)
        networks = [{'port': created_port['id']}]
    else:
        networks = [{'uuid': network_id}]

    params = {
        'flavor_ref': self.flavor_ref,
        'image_ref': self.image_ref,
        'key_name': self.keypair['name'],
        'networks': networks,
        'security_groups': [
            {'name': self.secgroup['security_group']['name']}],
        'name': data_utils.rand_name(self._testMethodName)
    }
    if port_type == 'direct-physical':
        net_vlan = self.client.show_network(
            network_id)['network']['provider:segmentation_id']
        params['user_data'] = build_user_data(net_vlan)
        params['config_drive'] = True

    enforce_other_host = (
        different_host and CONF.compute.min_compute_nodes > 1)
    if enforce_other_host:
        params['scheduler_hints'] = {
            'different_host': different_host['id']}
    server = self.create_server(**params)['server']
    if enforce_other_host:
        reference_host = self.get_host_for_server(different_host['id'])
        if reference_host == self.get_host_for_server(server['id']):
            raise self.skipException(
                'Failed to run the VM on a different hypervisor, make '
                'sure that DifferentHostFilter is in the list of '
                'enabled nova scheduler filters')

    port = self.client.list_ports(device_id=server['id'])['ports'][0]
    # Servers on the public network are reached on their fixed IP, the
    # others through a floating IP.
    if network_id == CONF.network.public_network_id:
        access_ip_address = port['fixed_ips'][0]['ip_address']
    else:
        access_ip_address = self.create_floatingip(
            port=port)['floating_ip_address']
    server['ssh_client'] = ssh.Client(access_ip_address,
                                      self.username,
                                      pkey=self.keypair['private_key'])
    return server
def _create_vms_by_topology(
        self, topology='internal', port_type=None, ipv6=False,
        different_host=True, num_vms_created=2):
    """Function for creating desired topology for the test

    Available topologies:
    * internal(default): sender and receiver are on tenant network
    * external: sender and receiver are on external(public) network
    * east-west: sender and receiver are on different tenant networks
    * north-south: sender is on external and receiver on tenant network

    :param topology(str): one of 4 available topologies to use (see list
        above)
    :param port_type(str): type of port to use. If omitted, default port
        type will be used. Can be set to 'direct' or 'direct-physical'
        for SR-IOV environments.
    :param ipv6(bool): whether to also add an IPv6 subnet to every tenant
        network created here.
    :param different_host(bool): whether to force vms to run on different
        host.
    :param num_vms_created(int): number of vms to create, 1 or 2.
        default is 2.
    :returns: sender if num_vms_created is 1, else sender and receiver
    """
    # num_vms_created can be 1 or 2
    self.assertIn(num_vms_created, [1, 2], "num_vms_created can be 1 or 2")

    def _create_local_network():
        # ``router`` is looked up when the helper is called, after it has
        # been bound further down.
        network = self.create_network()
        subnet_index = len(self.reserved_subnet_cidrs)
        subnet = self.create_subnet(
            network, cidr='192.168.%d.0/24' % subnet_index)
        self.create_router_interface(router['id'], subnet['id'])
        if ipv6:
            ra_address_mode = 'dhcpv6-stateless'
            ipv6_subnet = self.create_subnet(
                network,
                cidr='2001:{:x}::/64'.format(200 + subnet_index),
                ip_version=6,
                ipv6_ra_mode=ra_address_mode,
                ipv6_address_mode=ra_address_mode)
            self.create_router_interface(router['id'], ipv6_subnet['id'])
        return network

    if topology != 'external':
        router = (getattr(self, "router", None) or
                  self.create_router_by_client())

    if topology in ('external', 'north-south'):
        external_network = self.client.show_network(
            CONF.network.public_network_id)['network']
        if not external_network['shared']:
            self.skipTest("External network is not shared")
        src_network = external_network
    else:
        src_network = _create_local_network()
    sender = self._create_server_for_topology(
        network_id=src_network['id'],
        port_type=port_type)

    if topology in ('external', 'internal'):
        dst_network = src_network
    else:
        dst_network = _create_local_network()
    if num_vms_created == 1:
        return sender

    receiver = self._create_server_for_topology(
        different_host=sender if different_host else None,
        network_id=dst_network['id'],
        port_type=port_type)
    return sender, receiver
class BaseTempestTestCaseAdvanced(BaseTempestWhiteboxTestCase):
    """Base class skips test suites unless advanced image is available,
    also defines handy test settings for advanced image use.
    """

    @classmethod
    def skip_checks(cls):
        """Skip the tests when no advanced image is configured."""
        super(BaseTempestTestCaseAdvanced, cls).skip_checks()
        plugin_opts = CONF.neutron_plugin_options
        if not (plugin_opts.advanced_image_ref or
                plugin_opts.default_image_is_advanced):
            raise cls.skipException(
                "This test requires advanced image and tools")

    @classmethod
    def resource_setup(cls):
        """Select flavor, image and ssh user of the advanced image."""
        super(BaseTempestTestCaseAdvanced, cls).resource_setup()
        plugin_opts = CONF.neutron_plugin_options
        if plugin_opts.default_image_is_advanced:
            # The default image already is the advanced one.
            cls.flavor_ref = CONF.compute.flavor_ref
            cls.image_ref = CONF.compute.image_ref
            cls.username = CONF.validation.image_ssh_user
        else:
            cls.flavor_ref = plugin_opts.advanced_image_flavor_ref
            cls.image_ref = plugin_opts.advanced_image_ref
            cls.username = plugin_opts.advanced_image_ssh_user
# Base class for tests that capture traffic on the deployment nodes and
# compare the nodes that saw the packets with the expected ones.
class TrafficFlowTest(BaseTempestWhiteboxTestCase):
# Tenant isolation is explicitly not forced for these tests.
force_tenant_isolation = False
@classmethod
@utils.requires_ext(extension="router", service="network")
def skip_checks(cls):
    """Skip unless a public network is set and flow tests are enabled."""
    super(TrafficFlowTest, cls).skip_checks()
    skip_reason = None
    if not CONF.network.public_network_id:
        skip_reason = 'The public_network_id option must be specified.'
    elif not WB_CONF.run_traffic_flow_tests:
        skip_reason = ("CONF.whitebox_neutron_plugin_options."
                       "run_traffic_flow_tests set to False.")
    if skip_reason is not None:
        raise cls.skipException(skip_reason)
@classmethod
def resource_setup(cls):
    """Resolve the external IPv4 gateway and discover the nodes.

    The tests are skipped when no external IPv4 gateway is found.
    """
    super(TrafficFlowTest, cls).resource_setup()
    gateway_ip = cls.get_external_gateway()
    cls.gateway_external_ip = gateway_ip
    if not gateway_ip:
        raise cls.skipException("IPv4 gateway is not configured "
                                "for public network or public_network_id "
                                "is not configured.")
    cls.discover_nodes()
def _start_captures(self, interface, filters):
    """Start a tcpdump capture fixture on every discovered node.

    The fixture of each node is stored under its 'capture' key. A two
    second pause follows once all fixtures are set up.
    """
    for node in self.nodes:
        tcpdump = capture.TcpdumpCapture(
            node['client'], interface, filters)
        node['capture'] = tcpdump
        self.useFixture(tcpdump)
    time.sleep(2)
def _stop_captures(self):
for node in self.nodes:
node['capture'].stop()
def check_east_west_icmp_flow(
        self, dst_ip, expected_routing_nodes, expected_macs, ssh_client):
    """Check that traffic routed as expected within a tenant network

    Both directions are supported.
    Traffic is captured on
    CONF.whitebox_neutron_plugin_options.node_tunnel_interface.
    Use values:
    genev_sys_6081 for OVN
    vxlanxx for ML2/OVS with VXLAN tunnels
    <vlanid> for ML2/OVS with VLAN tunnels

    :param dst_ip(str): Destination IP address that we check route to
    :param expected_routing_nodes(list): Hostnames of expected gateways,
                                   nodes on tunnel interface of which we
                                   expect to find ethernet frames with
                                   packets that we send
    :param expected_macs(tuple): pair of MAC addresses of ports that we
                                 expect to find on the captured packets.
                                 A list holding two such pairs is
                                 accepted as well.
    :param ssh_client(Client): SSH client object of the origin of traffic
                               (the one that we send traffic from)
    :raises TypeError: when expected_macs is neither a tuple nor a list.
    """
    interface = CONF.whitebox_neutron_plugin_options.node_tunnel_interface

    # create filters
    macs_type = type(expected_macs)
    if macs_type is tuple:
        first_mac, second_mac = expected_macs[0], expected_macs[1]
        filters = 'icmp and ether host {0} and ether host {1}'.format(
            first_mac, second_mac)
    elif macs_type is list:
        first_pair, second_pair = expected_macs[0], expected_macs[1]
        filters = ('"icmp and ((ether host {0} and ether host {1}) '
                   'or (ether host {2} and ether host {3}))"').format(
                       first_pair[0], first_pair[1],
                       second_pair[0], second_pair[1])
    else:
        raise TypeError(expected_macs)

    self._start_captures(interface, filters)
    self.check_remote_connectivity(ssh_client, dst_ip, ping_count=2)
    self._stop_captures()

    LOG.debug('Expected routing nodes: {}'.format(
        ','.join(expected_routing_nodes)))
    # A node routed the traffic when its capture is not empty.
    actual_routing_nodes = []
    for node in self.nodes:
        if not node['capture'].is_empty():
            actual_routing_nodes.append(node['name'])
    LOG.debug('Actual routing nodes: {}'.format(
        ','.join(actual_routing_nodes)))
    self.assertCountEqual(expected_routing_nodes, actual_routing_nodes)
def check_north_south_icmp_flow(
        self, dst_ip, expected_routing_nodes, expected_mac, ssh_client,
        ignore_outbound=False):
    """Check that traffic routed as expected between internal and external
    networks. Both directions are supported.

    :param dst_ip(str): Destination IP address that we check route to
    :param expected_routing_nodes(list): Hostnames of expected gateways,
                                   nodes on external interface of which we
                                   expect to find ethernet frames with
                                   packets that we send
    :param expected_mac(str): MAC address of a port that we expect to find
                              on the expected gateway external interface
    :param ssh_client(Client): SSH client object of the origin of traffic
                               (the one that we send traffic from)
    :param ignore_outbound(bool): Whether to ignore outbound packets.
                                  This helps to avoid false positives.
    """
    interface = WB_CONF.node_ext_interface
    inbound = '-Qin' if ignore_outbound else ''
    size = None
    if WB_CONF.bgp:
        # With BGP the packets are matched on echo requests of a random
        # size sent to dst_ip instead of on a MAC address.
        filters = "{} icmp and icmp[0] == 8".format(inbound)
        # Adjust payload size adding icmp header size
        header_size = 44 if netaddr.valid_ipv6(dst_ip) else 28
        size = random.randint(0, 50) + header_size
        # Filter including ip size packet
        filters += " and ip[2:2]=={} and ip dst {}".format(size, dst_ip)
    else:
        filters = '{} icmp and ether host {}'.format(inbound, expected_mac)

    self._start_captures(interface, filters)
    # if the host is localhost, don't use remote connectivity,
    # ping directly on the host
    local_hosts = ('localhost', '127.0.0.1', '0:0:0:0:0:0:0:1', '::1')
    if ssh_client.host in local_hosts:
        self.ping_ip_address(dst_ip, mtu=size, should_succeed=True)
        # tcpdump requires a delay between capturing packets and writing
        # them to its file.
        time.sleep(2)
    else:
        self.check_remote_connectivity(
            ssh_client, dst_ip, mtu=size, ping_count=2)
    self._stop_captures()

    LOG.debug('Expected routing nodes: {}'.format(expected_routing_nodes))
    # A node routed the traffic when its capture is not empty.
    actual_routing_nodes = []
    for node in self.nodes:
        if not node['capture'].is_empty():
            actual_routing_nodes.append(node['name'])
    LOG.debug('Actual routing nodes: {}'.format(
        ','.join(actual_routing_nodes)))
    self.assertCountEqual(expected_routing_nodes, actual_routing_nodes)
# Base class for OVN tests; its methods wrap ovn-nbctl / ovn-sbctl queries
# that are run through run_on_master_controller().
class BaseTempestTestCaseOvn(BaseTempestWhiteboxTestCase):
@classmethod
def resource_setup(cls):
    """Skip without OVN agents, otherwise prepare the OVN DB commands.

    Stores the ovn-nbctl / ovn-sbctl command prefixes and the matching
    ovsdb-client monitor commands on the class.
    """
    super(BaseTempestTestCaseOvn, cls).resource_setup()
    if not cls.has_ovn_support:
        raise cls.skipException(
            "OVN agents not found. This test is supported only on "
            "openstack environments with OVN support.")
    nbctl, sbctl = cls._get_ovn_dbs()
    cls.nbctl = nbctl
    cls.sbctl = sbctl
    nb_monitor, sb_monitor = cls._get_ovn_db_monitor_cmds()
    cls.nbmonitorcmd = nb_monitor
    cls.sbmonitorcmd = sb_monitor
@classmethod
def _get_ovn_db_monitor_cmds(cls):
    """Build the ovsdb-client monitor commands for the NB and SB DBs.

    The commands are derived from ``cls.nbctl`` and ``cls.sbctl``.

    :returns: (nb_command, sb_command) tuple on 'podified' and 'devstack',
              nothing for any other ``openstack_type``.
    """
    monitorcmdprefix = 'sudo timeout 300 ovsdb-client monitor -f json '
    deployer = WB_CONF.openstack_type
    if deployer == 'podified':
        # (rsafrono) still need to re-check if works properly
        unprivileged_prefix = monitorcmdprefix.replace('sudo', '')
        nb_monitor_cmd = cls.nbctl.replace(
            'ovn-nbctl',
            '{} punix:/tmp/ovnnb_db.sock'.format(unprivileged_prefix))
        # NOTE(review): 'ovsnb_db.sock' does not follow the 'ovnnb_db' /
        # 'ovnsb_db' naming used elsewhere in this file - possibly a typo
        # for 'ovnsb_db.sock', confirm on a podified environment.
        sb_monitor_cmd = cls.sbctl.replace(
            'ovn-sbctl',
            '{} punix:/tmp/ovsnb_db.sock'.format(unprivileged_prefix))
        return (nb_monitor_cmd, sb_monitor_cmd)
    if deployer == 'devstack':
        # this regex search will return the connection string
        # (tcp:IP:port or ssl:IP:port) and in case of TLS,
        # will also include the TLS options
        db_option = re.compile(r'--db=(.*)$')
        nb_connection = db_option.search(cls.nbctl).group(1)
        sb_connection = db_option.search(cls.sbctl).group(1)
        return (monitorcmdprefix + nb_connection,
                monitorcmdprefix + sb_connection)
@classmethod
def _get_ovn_dbs(cls):
    """Build the ovn-nbctl and ovn-sbctl command prefixes.

    On 'podified' the commands run inside the ovsdbserver pods through
    ``oc rsh``, on 'devstack' they use the local unix sockets with sudo.

    :returns: [nbctl, sbctl] list, nothing for any other
              ``openstack_type``.
    """
    deployer = WB_CONF.openstack_type
    if deployer == 'podified':
        sb_pod = cls.proxy_host_client.exec_command(
            "oc get pods | grep ovsdbserver-sb | cut -f1 -d' '").strip()
        sb_prefix = 'oc rsh {}'.format(sb_pod)
        # The NB pod name is derived from the SB one.
        nb_prefix = sb_prefix.replace('sb', 'nb')
        template = "{} ovn-{}ctl"
        return [template.format(nb_prefix, 'nb'),
                template.format(sb_prefix, 'sb')]
    if deployer == 'devstack':
        sbdb = "unix:/usr/local/var/run/ovn/ovnsb_db.sock"
        nbdb = sbdb.replace('sb', 'nb')
        template = "sudo ovn-{}ctl --db={}"
        return [template.format('nb', nbdb), template.format('sb', sbdb)]
def get_router_gateway_chassis(self, router_port_id):
    """Return the short host name of the chassis hosting a gateway port.

    Waits up to 30 seconds for the 'cr-lrp-<router_port_id>' port binding
    to get a chassis and fails the test otherwise. The chassis id found is
    kept in ``self.chassis_id``.
    """
    binding_cmd = "{} get port_binding cr-lrp-{} chassis".format(
        self.sbctl, router_port_id)
    LOG.debug("Waiting until port is bound to chassis")
    self.chassis_id = None

    def _port_binding_exist():
        self.chassis_id = self.run_on_master_controller(binding_cmd)
        LOG.debug("chassis_id = '{}'".format(self.chassis_id))
        # '[]' is what the query returns while no chassis is set.
        return self.chassis_id != '[]'

    try:
        common_utils.wait_until_true(_port_binding_exist,
                                     timeout=30, sleep=5)
    except common_utils.WaitTimeout:
        self.fail("Port is not bound to chassis")

    hostname_cmd = "{} get chassis {} hostname".format(
        self.sbctl, self.chassis_id)
    LOG.debug("Running '{}' on the master node".format(hostname_cmd))
    hostname = self.run_on_master_controller(hostname_cmd)
    return hostname.replace('"', '').split('.')[0]
def get_router_gateway_chassis_list(self, router_port_id):
    """Return one entry per line of `lrp-get-gateway-chassis` output.

    From each line, the text between the last underscore and the next
    whitespace is kept.
    """
    listing = self.run_on_master_controller(
        self.nbctl + " lrp-get-gateway-chassis lrp-" + router_port_id)
    entry_pattern = re.compile(r'.*_(.*?)\s.*')
    return [entry_pattern.sub(r'\1', line) for line in listing.splitlines()]
def get_router_gateway_chassis_by_id(self, chassis_id):
    """Return the short host name of the chassis with ``chassis_id``.

    Double quotes are dropped from the southbound DB answer and only the
    part before the first dot is returned.
    """
    query = self.sbctl + " get chassis " + chassis_id + " hostname"
    hostname = self.run_on_master_controller(query).rstrip()
    unquoted = hostname.replace('"', '')
    return unquoted.split('.')[0]
def get_router_port_gateway_mtu(self, router_port_id):
    """Return the ``gateway_mtu`` option of a logical router port as int."""
    query = (self.nbctl + " get logical_router_port lrp-" + router_port_id +
             " options:gateway_mtu")
    raw_value = self.run_on_master_controller(query)
    # The value comes back wrapped in double quotes.
    return int(raw_value.rstrip().strip('"'))
def get_item_uuid(self, db, item, search_string):
    """Return the ``_uuid`` column of records found in an OVN DB.

    :param db(str): 'sb' queries the southbound DB, any other value the
                    northbound one.
    :param item(str): Table to search.
    :param search_string(str): Condition passed to the `find` command.
    """
    if db == 'sb':
        ovn_db = self.sbctl
    else:
        ovn_db = self.nbctl
    query = "".join([
        ovn_db, " find ", item, " ", search_string,
        " | grep _uuid | awk '{print $3}'"])
    return self.run_on_master_controller(query)
def get_datapath_tunnel_key(self, search_string):
    """Return the ``tunnel_key`` of datapath bindings matching a condition.

    :param search_string(str): Condition passed to the southbound
                               `find datapath_binding` command.
    """
    query = "".join([
        self.sbctl, " find datapath_binding ", search_string,
        " | grep tunnel_key | awk '{print $3}'"])
    return self.run_on_master_controller(query)
def get_logical_switch(self, port):
    """Returns logical switch name that port is connected to

    Function gets the logical switch name without its ID from the
    `ovn-nbctl lsp-get-ls <PORT_NAME>` command. An empty string is
    returned when the output holds no 'neutron-' name.
    """
    output = self.run_on_master_controller(
        '{cmd} lsp-get-ls {port}'.format(cmd=self.nbctl, port=port))
    found = re.search('neutron-[^)]*', output)
    return found.group() if found else ''
def get_physical_net(self, port):
    """Returns physical network name that port has configured with

    Physical network name is saved as option in the logical switch port
    record in OVN north database. It can be queried with
    `ovn-nbctl lsp-get-options <PORT_NAME>` command but this output may
    contain more than one option so it is better to get the value with
    `ovn-nbctl get Logical_Switch_Port <PORT_NAME> options:network_name`
    command
    """
    query = '{} get Logical_Switch_Port {} options:network_name'.format(
        self.nbctl, port)
    return self.run_on_master_controller(query)
def verify_that_segment_deleted(self, segment_id):
    """Checks that the segment id is not in the OVN database

    There shouldn't be 'provnet-<SEGMENT_ID>' port in the OVN database
    after the segment has been deleted
    """
    query = '{} find Logical_Switch_Port name=provnet-{}'.format(
        self.nbctl, segment_id)
    found = self.run_on_master_controller(query)
    self.assertEqual(found, '')
# user_data_cmd is used to generate a VLAN interface on VM instances with PF
# ports
# It is a cloud-config template with two '%s' placeholders (the ifcfg file
# name suffix and the DEVICE value) that build_user_data() fills in.
user_data_cmd = """
#cloud-config
write_files:
- path: "/etc/sysconfig/network-scripts/ifcfg-%s"
owner: "root"
permissions: "777"
content: |
DEVICE="%s"
BOOTPROTO="dhcp"
ONBOOT="yes"
VLAN="yes"
PERSISTENT_DHCLIENT="yes"
runcmd:
- [ sh, -c , "systemctl restart NetworkManager" ]
"""
# Tab characters are removed from the template.
user_data_cmd = user_data_cmd.replace('\t', '')
def build_user_data(net_vlan):
    """user_data is required when direct-physical (PF) ports are used

    :param net_vlan: VLAN id appended to the default instance interface
                     name to form the VLAN interface name.
    :returns: base64 encoded (bytes) cloud-config built from
              user_data_cmd.
    """
    if_full_name = '%s.%s' % (WB_CONF.default_instance_interface, net_vlan)
    rendered = user_data_cmd % (if_full_name, if_full_name)
    return base64.b64encode(rendered.encode("utf-8"))
class ProviderBaseTest(BaseTempestWhiteboxTestCase):
"""Base class for tests using provider networks, such as provider routed
networks or sriov scenarios
Admin user is needed to create ports on the existing provisioning network
"""
# Servers deleted by resource_cleanup(). The list is a class attribute.
servers = []
# Optional client overrides; the defaults are used when they stay None
# (see resource_setup() and resource_cleanup()).
keypairs_client = None
secgroup_client = None
servers_client = None
# Not read by the code visible in this part of the file.
extra_dhcp_opts = None
@classmethod
def create_loginable_secgroup_rule(cls, secgroup_id=None,
                                   client=None):
    """This rule is intended to permit inbound IPv4 and IPv6 ssh

    Two ingress TCP rules for port 22 are created: one with ip_version 6
    followed by one without an explicit IP version.
    """
    ssh_rule = {
        'security_group_id': secgroup_id,
        'client': client,
        'protocol': 'tcp',
        'direction': 'ingress',
        'port_range_min': 22,
        'port_range_max': 22,
    }
    for version_args in ({'ip_version': 6}, {}):
        cls.create_security_group_rule(**ssh_rule, **version_args)
@classmethod
def create_pingable_secgroup_rule(cls, secgroup_id=None,
                                  client=None):
    """This rule is intended to permit inbound IPv4 and IPv6 ping

    An ingress 'icmp' rule is created first, then an ingress 'icmpv6'
    rule with ip_version 6.
    """
    common = {
        'security_group_id': secgroup_id,
        'client': client,
        'direction': 'ingress',
    }
    per_protocol = (
        {'protocol': 'icmp'},
        {'protocol': 'icmpv6', 'ip_version': 6},
    )
    for protocol_args in per_protocol:
        cls.create_security_group_rule(**common, **protocol_args)
@classmethod
def resource_setup(cls):
    """Create the keypair and security group used to log into servers.

    After the parent setup, a keypair is created with cls.keypairs_client
    and a randomly named security group is created with
    cls.secgroup_client when it is set, or with cls.client otherwise.
    The group is recorded in cls.security_groups and receives the ssh and
    ping ingress rules, which are always created through cls.client.
    """
    super(ProviderBaseTest, cls).resource_setup()
    # setup basic topology for servers we can log into it
    cls.keypair = cls.create_keypair(client=cls.keypairs_client)
    secgroup_name = data_utils.rand_name('secgroup')
    secgroup_creator = cls.secgroup_client or cls.client
    cls.secgroup = secgroup_creator.create_security_group(
        name=secgroup_name)['security_group']
    cls.security_groups.append(cls.secgroup)
    for add_rules in (cls.create_loginable_secgroup_rule,
                      cls.create_pingable_secgroup_rule):
        add_rules(secgroup_id=cls.secgroup['id'], client=cls.client)
@classmethod
def resource_cleanup(cls):
    """Delete the servers recorded in cls.servers, then run parent cleanup.

    Servers are handled one at a time: each one is deleted and then waited
    for until it is terminated. cls.servers_client is used when set, the
    primary servers client otherwise.

    The parent cleanup always runs, even when deleting or waiting for a
    server raises; in that case the exception still propagates afterwards.
    """
    client = cls.servers_client or cls.os_primary.servers_client
    try:
        for server in cls.servers:
            cls._try_delete_resource(client.delete_server,
                                     server['id'])
            waiters.wait_for_server_termination(client,
                                                server['id'])
        # 'servers' is a class-level list that _create_server() keeps
        # appending to. Empty it in place once every server is gone so
        # that later test classes do not process stale entries again.
        del cls.servers[:]
    finally:
        # Do not leak the remaining class resources when a server fails to
        # terminate.
        super(ProviderBaseTest, cls).resource_cleanup()
@classmethod
def create_network_with_port(cls, cidr, gateway=True, **kwargs):
    """Create a network, a subnet on it and a port on that network.

    The created objects are stored in cls.network, cls.subnet and
    cls.port_id.

    :param cidr: CIDR of the subnet to create
    :param gateway: when false, the subnet is created with gateway=None
    :param kwargs: extra arguments forwarded to create_port
    :returns: dict with the id of the created port under the 'port' key
    """
    cls.network = cls.create_network()
    subnet_args = {'cidr': cidr}
    if not gateway:
        # some subnets need to be created without a default gateway
        # e.g.: when a server is created with two ports, one of them should
        # not include a default gateway
        subnet_args['gateway'] = None
    cls.subnet = cls.create_subnet(cls.network, **subnet_args)
    created_port = cls.create_port(network=cls.network, **kwargs)
    cls.port_id = created_port['id']
    return {'port': cls.port_id}
def build_create_port_body_and_secgroups(self, port_type, secgroup):
    """Build the port creation body and the server security groups.

    Both are needed to create ports, whatever their types are (normal,
    macvtap, direct or direct-physical). Security groups are only set when
    the port type is not 'direct', 'direct-physical' or 'macvtap'.

    :param port_type: value stored as 'binding:vnic_type' of the port
    :param secgroup: security group dict; its 'id' and 'name' are used
    :returns: tuple (create_port_body, security_groups)
    """
    if port_type in ('direct', 'direct-physical', 'macvtap'):
        port_body, server_secgroups = {}, []
    else:
        port_body = {'security_groups': [secgroup['id']]}
        server_secgroups = [{'name': secgroup['name']}]
    port_body.update({
        'binding:vnic_type': port_type,
        'name': "_".join(('port', port_type)),
    })
    dhcp_opts = self.extra_dhcp_opts
    if dhcp_opts is not None:
        port_body['extra_dhcp_opts'] = dhcp_opts
    return (port_body, server_secgroups)
def _create_server(self, **kwargs):
    """Create a server with the class defaults and remember it.

    The 'client', 'flavor_ref', 'image_ref' and 'key_name' arguments are
    always set here, replacing any value passed by the caller. The created
    server is appended to self.servers.

    :param kwargs: extra arguments forwarded to create_server
    :returns: the 'server' entry of the create_server result
    """
    kwargs.update(
        client=self.servers_client or self.os_primary.servers_client,
        flavor_ref=self.flavor_ref,
        image_ref=self.image_ref,
        key_name=self.keypair['name'])
    created = self.create_server(**kwargs)['server']
    self.servers.append(created)
    return created
def _create_network_port(self, port_type,
                         reuse_port=False,
                         use_provider_net=True,
                         subnet_id=None,
                         reused_tenant_net=None,
                         cidr=None,
                         gateway=True):
    """Prepare the port and boot parameters for a server.

    :param port_type: vnic type of the port
    :param reuse_port: with use_provider_net, skip the port creation and
                       use the last port of self.ports
    :param use_provider_net: use the network named by
                             CONF.network.floating_network_name
    :param subnet_id: when set, subnets and segments are looked up by the
                      name 'segment<subnet_id>'
    :param reused_tenant_net: existing network to create the port on when
                              use_provider_net is false
    :param cidr: CIDR used when a new network has to be created
    :param gateway: forwarded to create_network_with_port
    :returns: tuple (security_groups, port, user_data, config_drive);
              user_data and config_drive are only set for
              'direct-physical' ports
    """
    port_body, security_groups = \
        self.build_create_port_body_and_secgroups(
            port_type, self.secgroup)
    # Only meaningful (and only read) when subnet_id is set.
    segment_name = 'segment' + subnet_id if subnet_id else None

    if use_provider_net:
        self.network = fixed_network.get_network_from_name(
            CONF.network.floating_network_name, self.client)
        if not reuse_port:
            if subnet_id:
                named_subnets = self.client.list_subnets(name=segment_name)
                fixed_ip = {
                    'subnet_id': named_subnets['subnets'][-1]['id']}
            else:
                fixed_ip = {'subnet_id': self.network['subnets'][-1]}
            self.create_port(
                network=self.network,
                fixed_ips=[fixed_ip],
                **port_body)
        port = {'port': self.ports[-1]['id']}
    elif reused_tenant_net:
        self.network = reused_tenant_net
        new_port = self.create_port(network=self.network, **port_body)
        port = {'port': new_port['id']}
    else:
        port = self.create_network_with_port(
            cidr=cidr, gateway=gateway, **port_body)

    # The VLAN id is always looked up, whatever the port type is.
    if subnet_id:
        named_segments = self.client.list_segments(name=segment_name)
        net_vlan = named_segments['segments'][-1]['segmentation_id']
    else:
        net_id = self.network['id']
        admin_net_client = self.admin_manager.network_client
        net_details = admin_net_client.show_network(net_id)['network']
        net_vlan = net_details['provider:segmentation_id']

    if port_type == 'direct-physical':
        return (security_groups, port, build_user_data(net_vlan), True)
    return (security_groups, port, "", False)
def check_port_status(self, port_type,
                      port_index=-1, server_index=-1):
    """Wait for a server port to become active and verify its details.

    The port must belong to the network named by
    CONF.network.floating_network_name, have admin_state_up equal to True
    and have the expected vnic type.

    :param port_type: expected 'binding:vnic_type' of the port
    :param port_index: index into self.ports (default: last created port)
    :param server_index: index into self.servers (default: last created
                         server)
    :returns: the port details returned by show_port
    """
    # by default, use last created port (-1) and last created server (-1)
    port_id = self.ports[port_index]['id']
    server_id = self.servers[server_index]['id']
    waiters.wait_for_interface_status(
        self.os_adm.interfaces_client, server_id, port_id,
        constants.PORT_STATUS_ACTIVE)
    port_details = self.client.show_port(port_id)['port']
    provider_net = fixed_network.get_network_from_name(
        CONF.network.floating_network_name, self.client)
    expected_fields = (
        ('network_id', provider_net['id']),
        ('admin_state_up', True),
        ('binding:vnic_type', port_type),
    )
    for field, expected in expected_fields:
        self.assertEqual(port_details[field], expected)
    return port_details