702 lines
26 KiB
Python
702 lines
26 KiB
Python
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
#
|
|
|
|
import contextlib
|
|
from io import StringIO
|
|
import itertools
|
|
import sys
|
|
from unittest import mock
|
|
import urllib.parse as urlparse
|
|
|
|
import fixtures
|
|
from oslo_utils import encodeutils
|
|
from oslotest import base
|
|
import requests
|
|
|
|
from gbpclient import gbpshell as shell
|
|
from gbpclient.v2_0 import client
|
|
from neutronclient.common import constants
|
|
from neutronclient.common import exceptions
|
|
from neutronclient.tests.unit import test_http
|
|
from openstackclient.tests.unit import fakes
|
|
|
|
API_VERSION = "2.0"
|
|
TOKEN = test_http.AUTH_TOKEN
|
|
ENDURL = test_http.END_URL
|
|
REQUEST_ID = 'test_request_id'
|
|
|
|
|
|
@contextlib.contextmanager
|
|
def capture_std_streams():
|
|
fake_stdout, fake_stderr = StringIO(), StringIO()
|
|
stdout, stderr = sys.stdout, sys.stderr
|
|
try:
|
|
sys.stdout, sys.stderr = fake_stdout, fake_stderr
|
|
yield fake_stdout, fake_stderr
|
|
finally:
|
|
sys.stdout, sys.stderr = stdout, stderr
|
|
|
|
|
|
class ParserException(Exception):
|
|
pass
|
|
|
|
|
|
class FakeStdout(fakes.FakeStdout):
|
|
def isatty(self):
|
|
return True
|
|
|
|
|
|
class MyRequest(requests.Request):
|
|
def __init__(self, method=None):
|
|
self.method = method
|
|
|
|
|
|
class MyResp(requests.Response):
|
|
def __init__(self, status_code, headers=None, reason=None,
|
|
request=None, url=None):
|
|
self.status_code = status_code
|
|
self.headers = headers or {}
|
|
self.reason = reason
|
|
self.request = request or MyRequest()
|
|
self.url = url
|
|
|
|
|
|
class MyApp(object):
|
|
def __init__(self, _stdout):
|
|
self.stdout = _stdout
|
|
|
|
|
|
def end_url(path, query=None):
|
|
_url_str = ENDURL + "/v" + API_VERSION + path
|
|
return query and _url_str + "?" + query or _url_str
|
|
|
|
|
|
class MyUrlComparator(object):
|
|
def __init__(self, lhs, client):
|
|
self.lhs = lhs
|
|
self.client = client
|
|
|
|
def __eq__(self, rhs):
|
|
lhsp = urlparse.urlparse(self.lhs)
|
|
rhsp = urlparse.urlparse(rhs)
|
|
|
|
lhs_qs = urlparse.parse_qsl(lhsp.query)
|
|
rhs_qs = urlparse.parse_qsl(rhsp.query)
|
|
|
|
return (lhsp.scheme == rhsp.scheme and
|
|
lhsp.netloc == rhsp.netloc and
|
|
lhsp.path == rhsp.path and
|
|
len(lhs_qs) == len(rhs_qs) and
|
|
set(lhs_qs) == set(rhs_qs))
|
|
|
|
def __str__(self):
|
|
return self.lhs
|
|
|
|
def __repr__(self):
|
|
return str(self)
|
|
|
|
|
|
class MyComparator(object):
|
|
def __init__(self, lhs, client):
|
|
self.lhs = lhs
|
|
self.client = client
|
|
|
|
def _com_dict(self, lhs, rhs):
|
|
if len(lhs) != len(rhs):
|
|
return False
|
|
for key, value in lhs.items():
|
|
if key not in rhs:
|
|
return False
|
|
rhs_value = rhs[key]
|
|
if not self._com(value, rhs_value):
|
|
return False
|
|
return True
|
|
|
|
def _com_list(self, lhs, rhs):
|
|
if len(lhs) != len(rhs):
|
|
return False
|
|
for lhs_value in lhs:
|
|
if lhs_value not in rhs:
|
|
return False
|
|
return True
|
|
|
|
def _com(self, lhs, rhs):
|
|
if lhs is None:
|
|
return rhs is None
|
|
if isinstance(lhs, dict):
|
|
if not isinstance(rhs, dict):
|
|
return False
|
|
return self._com_dict(lhs, rhs)
|
|
if isinstance(lhs, list):
|
|
if not isinstance(rhs, list):
|
|
return False
|
|
return self._com_list(lhs, rhs)
|
|
if isinstance(lhs, tuple):
|
|
if not isinstance(rhs, tuple):
|
|
return False
|
|
return self._com_list(lhs, rhs)
|
|
return lhs == rhs
|
|
|
|
def __eq__(self, rhs):
|
|
if self.client:
|
|
rhs = self.client.deserialize(rhs, 200)
|
|
return self._com(self.lhs, rhs)
|
|
|
|
def __repr__(self):
|
|
if self.client:
|
|
return self.client.serialize(self.lhs)
|
|
return str(self.lhs)
|
|
|
|
|
|
class ContainsKeyValue(object):
|
|
"""Checks whether key/value pair(s) are included in a dict parameter.
|
|
|
|
This class just checks whether specifid key/value pairs passed in
|
|
__init__() are included in a dict parameter. The comparison does not
|
|
fail even if other key/value pair(s) exists in a target dict.
|
|
"""
|
|
|
|
def __init__(self, expected):
|
|
self._expected = expected
|
|
|
|
def __eq__(self, other):
|
|
if not isinstance(other, dict):
|
|
return False
|
|
for key, value in self._expected.items():
|
|
if key not in other:
|
|
return False
|
|
if other[key] != value:
|
|
return False
|
|
return True
|
|
|
|
def __repr__(self):
|
|
return ('<%s (expected: %s)>' %
|
|
(self.__class__.__name__, self._expected))
|
|
|
|
|
|
class IsA(object):
|
|
"""Checks whether the parameter is of specific type."""
|
|
|
|
def __init__(self, expected_type):
|
|
self._expected_type = expected_type
|
|
|
|
def __eq__(self, other):
|
|
return isinstance(other, self._expected_type)
|
|
|
|
def __repr__(self):
|
|
return ('<%s (expected: %s)>' %
|
|
(self.__class__.__name__, self._expected_type))
|
|
|
|
|
|
class CLITestV20Base(base.BaseTestCase):
|
|
|
|
test_id = 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa'
|
|
id_field = 'id'
|
|
|
|
non_admin_status_resources = []
|
|
|
|
def _find_resourceid(self, client, resource, name_or_id,
|
|
cmd_resource=None, parent_id=None):
|
|
return name_or_id
|
|
|
|
def setUp(self, plurals=None):
|
|
"""Prepare the test environment."""
|
|
super(CLITestV20Base, self).setUp()
|
|
client.Client.EXTED_PLURALS.update(constants.PLURALS)
|
|
if plurals is not None:
|
|
client.Client.EXTED_PLURALS.update(plurals)
|
|
self.metadata = {'plurals': client.Client.EXTED_PLURALS}
|
|
self.endurl = ENDURL
|
|
self.fake_stdout = FakeStdout()
|
|
|
|
self.addCleanup(mock.patch.stopall)
|
|
mock.patch('sys.stdout', new=self.fake_stdout).start()
|
|
mock.patch('neutronclient.neutron.v2_0.find_resourceid_by_name_or_id',
|
|
new=self._find_resourceid).start()
|
|
mock.patch('neutronclient.neutron.v2_0.find_resourceid_by_id',
|
|
new=self._find_resourceid).start()
|
|
|
|
self.client = client.Client(token=TOKEN, endpoint_url=self.endurl)
|
|
|
|
def register_non_admin_status_resource(self, resource_name):
|
|
# TODO(amotoki):
|
|
# It is recommended to define
|
|
# "non_admin_status_resources in each test class rather than
|
|
# using register_non_admin_status_resource method.
|
|
|
|
# If we change self.non_admin_status_resources like this,
|
|
# we need to ensure this should be an instance variable
|
|
# to avoid changing the class variable.
|
|
if (id(self.non_admin_status_resources) ==
|
|
id(self.__class__.non_admin_status_resources)):
|
|
self.non_admin_status_resources = (self.__class__.
|
|
non_admin_status_resources[:])
|
|
self.non_admin_status_resources.append(resource_name)
|
|
|
|
def _test_create_resource(self, resource, cmd, name, myid, args,
|
|
position_names, position_values,
|
|
tenant_id=None, tags=None, admin_state_up=True,
|
|
extra_body=None, cmd_resource=None,
|
|
parent_id=None, **kwargs):
|
|
if not cmd_resource:
|
|
cmd_resource = resource
|
|
body = {resource: {}, }
|
|
if tenant_id:
|
|
body[resource].update({'tenant_id': tenant_id})
|
|
if tags:
|
|
body[resource].update({'tags': tags})
|
|
if extra_body:
|
|
body[resource].update(extra_body)
|
|
body[resource].update(kwargs)
|
|
|
|
for i in range(len(position_names)):
|
|
body[resource].update({position_names[i]: position_values[i]})
|
|
ress = {resource:
|
|
{self.id_field: myid}, }
|
|
if name:
|
|
ress[resource].update({'name': name})
|
|
resstr = self.client.serialize(ress)
|
|
# url method body
|
|
resource_plural = self.client.get_resource_plural(cmd_resource)
|
|
path = getattr(self.client, resource_plural + "_path")
|
|
if parent_id:
|
|
path = path % parent_id
|
|
mock_body = MyComparator(body, self.client)
|
|
cmd_parser = cmd.get_parser('create_' + resource)
|
|
resp = (MyResp(200), resstr)
|
|
|
|
with mock.patch.object(
|
|
cmd, "get_client", return_value=self.client
|
|
) as mock_get_client, mock.patch.object(
|
|
self.client.httpclient, "request", return_value=resp
|
|
) as mock_request:
|
|
shell.run_command(cmd, cmd_parser, args)
|
|
|
|
self.assert_mock_multiple_calls_with_same_arguments(
|
|
mock_get_client, mock.call(), None)
|
|
|
|
mock_request.assert_called_once_with(
|
|
end_url(path), 'POST',
|
|
body=mock_body,
|
|
headers=ContainsKeyValue(
|
|
{'X-Auth-Token': TOKEN}))
|
|
|
|
_str = self.fake_stdout.make_string()
|
|
self.assertIn(myid, _str)
|
|
if name:
|
|
self.assertIn(name, _str)
|
|
|
|
def _test_list_resources(self, resources, cmd, detail=False, tags=(),
|
|
fields_1=(), fields_2=(), page_size=None,
|
|
sort_key=(), sort_dir=(), response_contents=None,
|
|
base_args=None, path=None, cmd_resources=None,
|
|
parent_id=None, output_format=None, query=""):
|
|
if not cmd_resources:
|
|
cmd_resources = resources
|
|
if response_contents is None:
|
|
contents = [{self.id_field: 'myid1', },
|
|
{self.id_field: 'myid2', }, ]
|
|
else:
|
|
contents = response_contents
|
|
reses = {resources: contents}
|
|
resstr = self.client.serialize(reses)
|
|
# url method body
|
|
args = base_args if base_args is not None else []
|
|
if detail:
|
|
args.append('-D')
|
|
if fields_1:
|
|
for field in fields_1:
|
|
args.append('--fields')
|
|
args.append(field)
|
|
|
|
if tags:
|
|
args.append('--')
|
|
args.append("--tag")
|
|
for tag in tags:
|
|
args.append(tag)
|
|
tag_query = urlparse.urlencode(
|
|
{'tag': encodeutils.safe_encode(tag)})
|
|
if query:
|
|
query += "&" + tag_query
|
|
else:
|
|
query = tag_query
|
|
if (not tags) and fields_2:
|
|
args.append('--')
|
|
if fields_2:
|
|
args.append("--fields")
|
|
for field in fields_2:
|
|
args.append(field)
|
|
if detail:
|
|
query = query and query + '&verbose=True' or 'verbose=True'
|
|
for field in itertools.chain(fields_1, fields_2):
|
|
if query:
|
|
query += "&fields=" + field
|
|
else:
|
|
query = "fields=" + field
|
|
if page_size:
|
|
args.append("--page-size")
|
|
args.append(str(page_size))
|
|
if query:
|
|
query += "&limit=%s" % page_size
|
|
else:
|
|
query = "limit=%s" % page_size
|
|
if sort_key:
|
|
for key in sort_key:
|
|
args.append('--sort-key')
|
|
args.append(key)
|
|
if query:
|
|
query += '&'
|
|
query += 'sort_key=%s' % key
|
|
if sort_dir:
|
|
len_diff = len(sort_key) - len(sort_dir)
|
|
if len_diff > 0:
|
|
sort_dir = tuple(sort_dir) + ('asc',) * len_diff
|
|
elif len_diff < 0:
|
|
sort_dir = sort_dir[:len(sort_key)]
|
|
for dir in sort_dir:
|
|
args.append('--sort-dir')
|
|
args.append(dir)
|
|
if query:
|
|
query += '&'
|
|
query += 'sort_dir=%s' % dir
|
|
if path is None:
|
|
path = getattr(self.client, cmd_resources + "_path")
|
|
if parent_id:
|
|
path = path % parent_id
|
|
if output_format:
|
|
args.append('-f')
|
|
args.append(output_format)
|
|
cmd_parser = cmd.get_parser("list_" + cmd_resources)
|
|
resp = (MyResp(200), resstr)
|
|
|
|
with mock.patch.object(cmd, "get_client",
|
|
return_value=self.client) as mock_get_client, \
|
|
mock.patch.object(self.client.httpclient, "request",
|
|
return_value=resp) as mock_request:
|
|
shell.run_command(cmd, cmd_parser, args)
|
|
|
|
self.assert_mock_multiple_calls_with_same_arguments(
|
|
mock_get_client, mock.call(), None)
|
|
mock_request.assert_called_once_with(
|
|
MyUrlComparator(end_url(path, query), self.client),
|
|
'GET',
|
|
body=None,
|
|
headers=ContainsKeyValue({'X-Auth-Token': TOKEN}))
|
|
_str = self.fake_stdout.make_string()
|
|
if response_contents is None:
|
|
self.assertIn('myid1', _str)
|
|
return _str
|
|
|
|
def _test_list_resources_with_pagination(self, resources, cmd,
|
|
base_args=None,
|
|
cmd_resources=None,
|
|
parent_id=None, query=""):
|
|
if not cmd_resources:
|
|
cmd_resources = resources
|
|
|
|
path = getattr(self.client, cmd_resources + "_path")
|
|
if parent_id:
|
|
path = path % parent_id
|
|
fake_query = "marker=myid2&limit=2"
|
|
reses1 = {resources: [{'id': 'myid1', },
|
|
{'id': 'myid2', }],
|
|
'%s_links' % resources: [{'href': end_url(path, fake_query),
|
|
'rel': 'next'}]}
|
|
reses2 = {resources: [{'id': 'myid3', },
|
|
{'id': 'myid4', }]}
|
|
resstr1 = self.client.serialize(reses1)
|
|
resstr2 = self.client.serialize(reses2)
|
|
cmd_parser = cmd.get_parser("list_" + cmd_resources)
|
|
args = base_args if base_args is not None else []
|
|
mock_request_calls = [
|
|
mock.call(
|
|
end_url(path, query), 'GET',
|
|
body=None,
|
|
headers=ContainsKeyValue({'X-Auth-Token': TOKEN})),
|
|
mock.call(
|
|
MyUrlComparator(end_url(path, fake_query),
|
|
self.client), 'GET',
|
|
body=None,
|
|
headers=ContainsKeyValue({'X-Auth-Token': TOKEN}))]
|
|
mock_request_resp = [(MyResp(200), resstr1), (MyResp(200), resstr2)]
|
|
|
|
with mock.patch.object(cmd, "get_client",
|
|
return_value=self.client) as mock_get_client, \
|
|
mock.patch.object(self.client.httpclient,
|
|
"request") as mock_request:
|
|
mock_request.side_effect = mock_request_resp
|
|
shell.run_command(cmd, cmd_parser, args)
|
|
|
|
self.assert_mock_multiple_calls_with_same_arguments(
|
|
mock_get_client, mock.call(), None)
|
|
self.assertEqual(2, mock_request.call_count)
|
|
mock_request.assert_has_calls(mock_request_calls)
|
|
|
|
def _test_update_resource(self, resource, cmd, myid, args, extrafields,
|
|
cmd_resource=None, parent_id=None):
|
|
if not cmd_resource:
|
|
cmd_resource = resource
|
|
|
|
body = {resource: extrafields}
|
|
path = getattr(self.client, cmd_resource + "_path")
|
|
if parent_id:
|
|
path = path % (parent_id, myid)
|
|
else:
|
|
path = path % myid
|
|
mock_body = MyComparator(body, self.client)
|
|
|
|
cmd_parser = cmd.get_parser("update_" + cmd_resource)
|
|
resp = (MyResp(204), None)
|
|
|
|
with mock.patch.object(cmd, "get_client",
|
|
return_value=self.client) as mock_get_client, \
|
|
mock.patch.object(self.client.httpclient, "request",
|
|
return_value=resp) as mock_request:
|
|
shell.run_command(cmd, cmd_parser, args)
|
|
|
|
self.assert_mock_multiple_calls_with_same_arguments(
|
|
mock_get_client, mock.call(), None)
|
|
mock_request.assert_called_once_with(
|
|
MyUrlComparator(end_url(path), self.client),
|
|
'PUT',
|
|
body=mock_body,
|
|
headers=ContainsKeyValue({'X-Auth-Token': TOKEN}))
|
|
_str = self.fake_stdout.make_string()
|
|
self.assertIn(myid, _str)
|
|
|
|
def _test_show_resource(self, resource, cmd, myid, args, fields=(),
|
|
cmd_resource=None, parent_id=None):
|
|
if not cmd_resource:
|
|
cmd_resource = resource
|
|
|
|
query = "&".join(["fields=%s" % field for field in fields])
|
|
expected_res = {resource:
|
|
{self.id_field: myid,
|
|
'name': 'myname', }, }
|
|
resstr = self.client.serialize(expected_res)
|
|
path = getattr(self.client, cmd_resource + "_path")
|
|
if parent_id:
|
|
path = path % (parent_id, myid)
|
|
else:
|
|
path = path % myid
|
|
cmd_parser = cmd.get_parser("show_" + cmd_resource)
|
|
resp = (MyResp(200), resstr)
|
|
|
|
with mock.patch.object(cmd, "get_client",
|
|
return_value=self.client) as mock_get_client, \
|
|
mock.patch.object(self.client.httpclient, "request",
|
|
return_value=resp) as mock_request:
|
|
shell.run_command(cmd, cmd_parser, args)
|
|
|
|
self.assert_mock_multiple_calls_with_same_arguments(
|
|
mock_get_client, mock.call(), None)
|
|
mock_request.assert_called_once_with(
|
|
end_url(path, query), 'GET',
|
|
body=None,
|
|
headers=ContainsKeyValue({'X-Auth-Token': TOKEN}))
|
|
_str = self.fake_stdout.make_string()
|
|
self.assertIn(myid, _str)
|
|
self.assertIn('myname', _str)
|
|
|
|
def _test_set_path_and_delete(self, path, parent_id, myid,
|
|
mock_request_calls, mock_request_returns,
|
|
delete_fail=False):
|
|
return_val = 404 if delete_fail else 204
|
|
if parent_id:
|
|
path = path % (parent_id, myid)
|
|
else:
|
|
path = path % (myid)
|
|
mock_request_returns.append((MyResp(return_val), None))
|
|
mock_request_calls.append(mock.call(
|
|
end_url(path), 'DELETE',
|
|
body=None,
|
|
headers=ContainsKeyValue({'X-Auth-Token': TOKEN})))
|
|
|
|
def _test_delete_resource(self, resource, cmd, myid, args,
|
|
cmd_resource=None, parent_id=None,
|
|
extra_id=None, delete_fail=False):
|
|
mock_request_calls = []
|
|
mock_request_returns = []
|
|
if not cmd_resource:
|
|
cmd_resource = resource
|
|
path = getattr(self.client, cmd_resource + "_path")
|
|
self._test_set_path_and_delete(path, parent_id, myid,
|
|
mock_request_calls,
|
|
mock_request_returns)
|
|
# extra_id is used to test for bulk_delete
|
|
if extra_id:
|
|
self._test_set_path_and_delete(path, parent_id, extra_id,
|
|
mock_request_calls,
|
|
mock_request_returns,
|
|
delete_fail)
|
|
cmd_parser = cmd.get_parser("delete_" + cmd_resource)
|
|
|
|
with mock.patch.object(cmd, "get_client",
|
|
return_value=self.client) as mock_get_client, \
|
|
mock.patch.object(self.client.httpclient,
|
|
"request") as mock_request:
|
|
mock_request.side_effect = mock_request_returns
|
|
shell.run_command(cmd, cmd_parser, args)
|
|
|
|
self.assert_mock_multiple_calls_with_same_arguments(
|
|
mock_get_client, mock.call(), None)
|
|
mock_request.assert_has_calls(mock_request_calls)
|
|
_str = self.fake_stdout.make_string()
|
|
self.assertIn(myid, _str)
|
|
if extra_id:
|
|
self.assertIn(extra_id, _str)
|
|
|
|
def assert_mock_multiple_calls_with_same_arguments(
|
|
self, mocked_method, expected_call, count):
|
|
if count is None:
|
|
self.assertLessEqual(1, mocked_method.call_count)
|
|
else:
|
|
self.assertEqual(count, mocked_method.call_count)
|
|
mocked_method.assert_has_calls(
|
|
[expected_call] * mocked_method.call_count)
|
|
|
|
def check_parser_ext(self, cmd, args, verify_args, ext):
|
|
cmd_parser = self.cmd.get_parser('check_parser')
|
|
cmd_parser = ext.get_parser(cmd_parser)
|
|
stderr = StringIO()
|
|
with fixtures.MonkeyPatch('sys.stderr', stderr):
|
|
try:
|
|
parsed_args = cmd_parser.parse_args(args)
|
|
except SystemExit:
|
|
raise ParserException("Argument parse failed: %s" %
|
|
stderr.getvalue())
|
|
for av in verify_args:
|
|
attr, value = av
|
|
if attr:
|
|
self.assertIn(attr, parsed_args)
|
|
self.assertEqual(value, getattr(parsed_args, attr))
|
|
return parsed_args
|
|
|
|
|
|
class ClientV2TestJson(CLITestV20Base):
|
|
|
|
pass
|
|
|
|
|
|
class CLITestV20ExceptionHandler(CLITestV20Base):
|
|
|
|
def _test_exception_handler_v20(
|
|
self, expected_exception, status_code, expected_msg,
|
|
error_type=None, error_msg=None, error_detail=None,
|
|
error_content=None):
|
|
if error_content is None:
|
|
error_content = {'NeutronError': {'type': error_type,
|
|
'message': error_msg,
|
|
'detail': error_detail}}
|
|
|
|
e = self.assertRaises(expected_exception,
|
|
client.exception_handler_v20,
|
|
status_code, error_content)
|
|
self.assertEqual(status_code, e.status_code)
|
|
|
|
if expected_msg is None:
|
|
if error_detail:
|
|
expected_msg = '\n'.join([error_msg, error_detail])
|
|
else:
|
|
expected_msg = error_msg
|
|
self.assertEqual(expected_msg, e.message)
|
|
|
|
def test_exception_handler_v20_neutron_known_error(self):
|
|
# TODO(Sumit): This needs to be adapted for GBP
|
|
pass
|
|
|
|
def test_exception_handler_v20_neutron_known_error_without_detail(self):
|
|
# TODO(Sumit): This needs to be adapted for GBP
|
|
pass
|
|
|
|
def test_exception_handler_v20_unknown_error_to_per_code_exception(self):
|
|
for status_code, client_exc in list(
|
|
exceptions.HTTP_EXCEPTION_MAP.items()):
|
|
error_msg = 'Unknown error'
|
|
error_detail = 'This is detail'
|
|
self._test_exception_handler_v20(
|
|
client_exc, status_code,
|
|
error_msg + '\n' + error_detail,
|
|
'UnknownError', error_msg, error_detail)
|
|
|
|
def test_exception_handler_v20_neutron_unknown_status_code(self):
|
|
error_msg = 'Unknown error'
|
|
error_detail = 'This is detail'
|
|
self._test_exception_handler_v20(
|
|
exceptions.NeutronClientException, 501,
|
|
error_msg + '\n' + error_detail,
|
|
'UnknownError', error_msg, error_detail)
|
|
|
|
def test_exception_handler_v20_bad_neutron_error(self):
|
|
error_content = {'NeutronError': {'unknown_key': 'UNKNOWN'}}
|
|
self._test_exception_handler_v20(
|
|
exceptions.NeutronClientException, 500,
|
|
expected_msg={'unknown_key': 'UNKNOWN'},
|
|
error_content=error_content)
|
|
|
|
def test_exception_handler_v20_error_dict_contains_message(self):
|
|
error_content = {'message': 'This is an error message'}
|
|
self._test_exception_handler_v20(
|
|
exceptions.NeutronClientException, 500,
|
|
expected_msg='This is an error message',
|
|
error_content=error_content)
|
|
|
|
def test_exception_handler_v20_error_dict_not_contain_message(self):
|
|
error_content = {'error': 'This is an error message'}
|
|
expected_msg = '%s-%s' % (500, error_content)
|
|
self._test_exception_handler_v20(
|
|
exceptions.NeutronClientException, 500,
|
|
expected_msg=expected_msg,
|
|
error_content=error_content)
|
|
|
|
def test_exception_handler_v20_default_fallback(self):
|
|
error_content = 'This is an error message'
|
|
expected_msg = '%s-%s' % (500, error_content)
|
|
self._test_exception_handler_v20(
|
|
exceptions.NeutronClientException, 500,
|
|
expected_msg=expected_msg,
|
|
error_content=error_content)
|
|
|
|
def test_exception_status(self):
|
|
e = exceptions.BadRequest()
|
|
self.assertEqual(e.status_code, 400)
|
|
|
|
e = exceptions.BadRequest(status_code=499)
|
|
self.assertEqual(e.status_code, 499)
|
|
|
|
# SslCertificateValidationError has no explicit status_code,
|
|
# but should have a 'safe' defined fallback.
|
|
e = exceptions.SslCertificateValidationError()
|
|
self.assertIsNotNone(e.status_code)
|
|
|
|
e = exceptions.SslCertificateValidationError(status_code=599)
|
|
self.assertEqual(e.status_code, 599)
|
|
|
|
def test_connection_failed(self):
|
|
self.client.httpclient.auth_token = 'token'
|
|
excp = requests.exceptions.ConnectionError('Connection refused')
|
|
|
|
with mock.patch.object(self.client.httpclient, "request",
|
|
side_effect=excp) as mock_request:
|
|
error = self.assertRaises(exceptions.ConnectionFailed,
|
|
self.client.get, '/test')
|
|
|
|
mock_request.assert_called_once_with(
|
|
end_url('/test'), 'GET',
|
|
body=None,
|
|
headers=ContainsKeyValue(
|
|
{'X-Auth-Token': 'token'}))
|
|
# NB: ConnectionFailed has no explicit status_code, so this
|
|
# tests that there is a fallback defined.
|
|
self.assertIsNotNone(error.status_code)
|