
This is a patch to start work on re-writing the integration tests using pytest syntax and several improvements, as proposed on the upstream meeting, and summarized at https://etherpad.opendev.org/p/horizon-pytest The new tests are to eventually replace the existing integration tests. At the moment they don't run automatically, you have to explicitly run them using tox or pytest. When the new tests are complete, we will switch to them on the gate. Change-Id: Iea38e4f9771ff3cae7ae8675863e9c488f3f6d8a
234 lines
7.8 KiB
Python
234 lines
7.8 KiB
Python
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import os
|
|
import signal
|
|
import socket
|
|
import subprocess
|
|
from threading import Thread
|
|
import time
|
|
|
|
import pytest
|
|
import xvfbwrapper
|
|
|
|
from horizon.test import webdriver
|
|
from openstack_dashboard.test.integration_tests import config as horizon_config
|
|
|
|
|
|
STASH_FAILED = pytest.StashKey[bool]()
|
|
|
|
|
|
class Session:
|
|
def __init__(self, driver, config):
|
|
self.current_user = None
|
|
self.current_project = None
|
|
self.driver = driver
|
|
self.credentials = {
|
|
'user': (
|
|
config.identity.username,
|
|
config.identity.password,
|
|
config.identity.home_project,
|
|
),
|
|
'admin': (
|
|
config.identity.admin_username,
|
|
config.identity.admin_password,
|
|
config.identity.admin_home_project,
|
|
),
|
|
}
|
|
self.logout_url = '/'.join((
|
|
config.dashboard.dashboard_url,
|
|
'auth',
|
|
'logout',
|
|
))
|
|
|
|
def login(self, user, project=None):
|
|
if project is None:
|
|
project = self.credentials[user][2]
|
|
if self.current_user != user:
|
|
username, password, home_project = self.credentials[user]
|
|
self.driver.get(self.logout_url)
|
|
user_field = self.driver.find_element_by_id('id_username')
|
|
user_field.send_keys(username)
|
|
pass_field = self.driver.find_element_by_id('id_password')
|
|
pass_field.send_keys(password)
|
|
button = self.driver.find_element_by_css_selector(
|
|
'div.panel-footer button.btn')
|
|
button.click()
|
|
self.current_user = user
|
|
self.current_project = self.driver.find_element_by_xpath(
|
|
'//*[@class="context-project"]').text
|
|
if self.current_project != project:
|
|
dropdown_project = self.driver.find_element_by_xpath(
|
|
'//*[@class="context-project"]//ancestor::ul')
|
|
dropdown_project.click()
|
|
selection = dropdown_project.find_element_by_xpath(
|
|
f'//span[contains(text(),"{project}")]')
|
|
selection.click()
|
|
self.current_project = self.driver.find_element_by_xpath(
|
|
'//*[@class="context-project"]').text
|
|
|
|
|
|
@pytest.fixture(scope='session')
|
|
def login(driver, config):
|
|
session = Session(driver, config)
|
|
return session.login
|
|
|
|
|
|
@pytest.hookimpl(tryfirst=True, hookwrapper=True)
|
|
def pytest_runtest_makereport(item, call):
|
|
"""A hook to save the failure state of a test."""
|
|
# execute all other hooks to obtain the report object
|
|
outcome = yield
|
|
rep = outcome.get_result()
|
|
|
|
item.stash[STASH_FAILED] = item.stash.get(STASH_FAILED, False) or rep.failed
|
|
|
|
|
|
@pytest.fixture(scope='function', autouse=True)
|
|
def save_screenshot(request, report_dir, driver):
|
|
yield None
|
|
if not request.node.stash.get(STASH_FAILED, False):
|
|
return
|
|
screen_path = os.path.join(report_dir, 'screenshot.png')
|
|
driver.get_screenshot_as_file(screen_path)
|
|
|
|
|
|
@pytest.fixture(scope='function', autouse=True)
|
|
def save_page_source(request, report_dir, driver):
|
|
yield None
|
|
if not request.node.stash.get(STASH_FAILED, False):
|
|
return
|
|
source_path = os.path.join(report_dir, 'page.html')
|
|
html_elem = driver.find_element_by_tag_name("html")
|
|
page_source = html_elem.get_property("innerHTML")
|
|
with open(source_path, 'w') as f:
|
|
f.write(page_source)
|
|
|
|
|
|
@pytest.fixture(scope='function', autouse=True)
|
|
def record_video(request, report_dir, xdisplay):
|
|
if not os.environ.get('FFMPEG_INSTALLED', False):
|
|
yield None
|
|
return
|
|
filepath = os.path.join(report_dir, 'video.mp4')
|
|
frame_rate = 15
|
|
display, width, height = xdisplay
|
|
command = [
|
|
'ffmpeg',
|
|
'-video_size', '{}x{}'.format(width, height),
|
|
'-framerate', str(frame_rate),
|
|
'-f', 'x11grab',
|
|
'-i', display,
|
|
filepath,
|
|
]
|
|
fnull = open(os.devnull, 'w')
|
|
popen = subprocess.Popen(command, stdout=fnull, stderr=fnull)
|
|
yield None
|
|
popen.send_signal(signal.SIGINT)
|
|
|
|
def terminate_process():
|
|
limit = time.time() + 10
|
|
while time.time() < limit:
|
|
time.sleep(0.1)
|
|
if popen.poll() is not None:
|
|
return
|
|
os.kill(popen.pid, signal.SIGTERM)
|
|
|
|
thread = Thread(target=terminate_process)
|
|
thread.start()
|
|
popen.communicate()
|
|
thread.join()
|
|
if not request.node.stash.get(STASH_FAILED, False):
|
|
os.remove(filepath)
|
|
|
|
|
|
@pytest.fixture(scope='session')
|
|
def xdisplay():
|
|
IS_SELENIUM_HEADLESS = os.environ.get('SELENIUM_HEADLESS', False)
|
|
if IS_SELENIUM_HEADLESS:
|
|
width, height = 1920, 1080
|
|
vdisplay = xvfbwrapper.Xvfb(width=width, height=height)
|
|
args = []
|
|
|
|
# workaround for memory leak in Xvfb taken from:
|
|
# http://blog.jeffterrace.com/2012/07/xvfb-memory-leak-workaround.html
|
|
args.append("-noreset")
|
|
|
|
# disables X access control
|
|
args.append("-ac")
|
|
|
|
if hasattr(vdisplay, 'extra_xvfb_args'):
|
|
# xvfbwrapper 0.2.8 or newer
|
|
vdisplay.extra_xvfb_args.extend(args)
|
|
else:
|
|
vdisplay.xvfb_cmd.extend(args)
|
|
vdisplay.start()
|
|
display = vdisplay.new_display
|
|
else:
|
|
width, height = subprocess.check_output(
|
|
'xdpyinfo | grep "dimensions:"', shell=True
|
|
).decode().split(':', 1)[1].split()[0].strip().split('x')
|
|
vdisplay = None
|
|
display = subprocess.check_output(
|
|
'xdpyinfo | grep "name of display:"', shell=True
|
|
).decode().split(':', 1)[1].strip()
|
|
yield display, width, height
|
|
if vdisplay:
|
|
vdisplay.stop()
|
|
|
|
|
|
@pytest.fixture(scope='session')
|
|
def config():
|
|
return horizon_config.get_config()
|
|
|
|
|
|
@pytest.fixture(scope='function')
|
|
def report_dir(request, config):
|
|
root_path = os.path.dirname(os.path.abspath(horizon_config.__file__))
|
|
test_name = request.node.nodeid.rsplit('/', 1)[1]
|
|
report_dir = os.path.join(
|
|
root_path, config.selenium.screenshots_directory, test_name)
|
|
if not os.path.isdir(report_dir):
|
|
os.makedirs(report_dir)
|
|
yield report_dir
|
|
try:
|
|
os.rmdir(report_dir) # delete if empty
|
|
except OSError:
|
|
pass
|
|
|
|
|
|
@pytest.fixture(scope='session')
|
|
def driver(config, xdisplay):
|
|
# Start a virtual display server for running the tests headless.
|
|
IS_SELENIUM_HEADLESS = os.environ.get('SELENIUM_HEADLESS', False)
|
|
# Increase the default Python socket timeout from nothing
|
|
# to something that will cope with slow webdriver startup times.
|
|
# This *just* affects the communication between this test process
|
|
# and the webdriver.
|
|
socket.setdefaulttimeout(60)
|
|
# Start the Selenium webdriver and setup configuration.
|
|
desired_capabilities = dict(webdriver.desired_capabilities)
|
|
desired_capabilities['loggingPrefs'] = {'browser': 'ALL'}
|
|
driver = webdriver.WebDriver(
|
|
desired_capabilities=desired_capabilities
|
|
)
|
|
if config.selenium.maximize_browser:
|
|
driver.maximize_window()
|
|
if IS_SELENIUM_HEADLESS: # force full screen in xvfb
|
|
display, width, height = xdisplay
|
|
driver.set_window_size(width, height)
|
|
|
|
driver.implicitly_wait(config.selenium.implicit_wait)
|
|
driver.set_page_load_timeout(config.selenium.page_timeout)
|
|
yield driver
|
|
driver.quit()
|