
Currently the tox py34 target uses testtools.run to run a subset of our test harness. We need to be able to use pretty_tox.sh just like py27 as we make progress with py34 support. The first step is to make sure we can discover all the tests using: python -m subunit.run discover -t . ./nova/tests/ --list So, we need to fix a bunch of things for the discovery to work including updating to a new version of websockify. In the xen code, we should keep the original import and add except for py34 as xen uses an older python that does not work with six.moves. Depends-On: Ib4ef2e79b28b7180e564b3d6dc2188456c66c08a Change-Id: I88b6746da6136a7386a173f6cacd42f0b470deee
635 lines
24 KiB
Python
635 lines
24 KiB
Python
# Copyright 2010 United States Government as represented by the
|
|
# Administrator of the National Aeronautics and Space Administration.
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
"""Unit tests for the API endpoint."""
|
|
|
|
import random
|
|
import re
|
|
from six.moves import StringIO
|
|
|
|
import boto
|
|
import boto.connection
|
|
from boto.ec2 import regioninfo
|
|
from boto import exception as boto_exc
|
|
# Newer versions of boto use their own wrapper on top of
# httplib.HTTPResponse.  When boto.connection exposes an HTTPResponse
# attribute, use boto.connection as the "httplib" namespace so the fake
# connection below builds that class; otherwise fall back to the standard
# HTTP client module (via six, for Python 2/3 compatibility).
if hasattr(boto.connection, 'HTTPResponse'):
    httplib = boto.connection
else:
    from six.moves import http_client as httplib
|
|
import fixtures
|
|
from oslo_utils import versionutils
|
|
import webob
|
|
|
|
from nova.api import auth
|
|
from nova.api import ec2
|
|
from nova.api.ec2 import ec2utils
|
|
from nova import block_device
|
|
from nova import context
|
|
from nova import exception
|
|
from nova import test
|
|
from nova.tests.unit import matchers
|
|
|
|
|
|
class FakeHttplibSocket(object):
    """A fake socket implementation for httplib.HTTPResponse, trivial.

    The complete response text is kept in an in-memory ``StringIO`` buffer,
    which ``makefile`` hands out in place of a real socket file object.
    """

    def __init__(self, response_string):
        """Keep ``response_string`` and wrap it in a readable buffer."""
        self.response_string = response_string
        self._buffer = StringIO(response_string)

    def makefile(self, _mode, _other=None):
        """Returns the socket's internal buffer.

        Both arguments are ignored.  ``_other`` is optional so the method
        works whether the caller passes a mode plus a buffer size
        (``makefile('rb', 0)``) or only a mode (``makefile('rb')``).
        """
        return self._buffer
|
|
|
|
|
|
class FakeHttplibConnection(object):
    """A fake httplib.HTTPConnection for boto to use.

    Requests made via this connection are translated into webob requests
    and routed into our WSGI app; the app's response is then turned back
    into the HTTPResponse that boto expects.
    """

    def __init__(self, app, host, is_secure=False):
        """Remember the WSGI app and the host to stamp on each request.

        :param app: WSGI application that will serve the requests
        :param host: value assigned to ``req.host`` on every request
        :param is_secure: accepted but not used by this fake
        """
        self.app = app
        self.host = host

    def request(self, method, path, data, headers):
        """Run one request through the WSGI app and stash the response.

        The result is available afterwards through ``getresponse()`` and
        ``getresponsebody()``.  Note that ``headers`` is mutated: an
        ``Accept: text/html`` entry is set on it.
        """
        req = webob.Request.blank(path)
        req.method = method
        req.body = data
        req.headers = headers
        req.headers['Accept'] = 'text/html'
        req.host = self.host
        # Call the WSGI app, get the HTTP response
        resp = str(req.get_response(self.app))
        # For some reason, the response doesn't have "HTTP/1.0 " prepended; I
        # guess that's a function the web server usually provides.
        resp = "HTTP/1.0 %s" % resp
        self.sock = FakeHttplibSocket(resp)
        self.http_response = httplib.HTTPResponse(self.sock)
        # NOTE(vish): boto is accessing private variables for some reason
        self._HTTPConnection__response = self.http_response
        # Parse the status line and headers out of the fake socket.
        self.http_response.begin()

    def getresponse(self):
        """Return the HTTPResponse built by the last ``request`` call."""
        return self.http_response

    def getresponsebody(self):
        """Return the raw response text (status line, headers and body)."""
        return self.sock.response_string

    def close(self):
        """Required for compatibility with boto/tornado."""
        pass
|
|
|
|
|
|
class XmlConversionTestCase(test.NoDBTestCase):
    """Unit test api xml conversion."""

    def test_number_conversion(self):
        """Check how ec2utils._try_convert maps strings to Python values."""
        conv = ec2utils._try_convert

        # The literal string 'None' converts to None.
        self.assertIsNone(conv('None'))

        # Booleans, in any of the three capitalisations exercised here.
        self.assertEqual(conv('True'), True)
        self.assertEqual(conv('TRUE'), True)
        self.assertEqual(conv('true'), True)
        self.assertEqual(conv('False'), False)
        self.assertEqual(conv('FALSE'), False)
        self.assertEqual(conv('false'), False)

        # Decimal integers, floats and hexadecimal integers.
        self.assertEqual(conv('0'), 0)
        self.assertEqual(conv('42'), 42)
        self.assertEqual(conv('3.14'), 3.14)
        self.assertEqual(conv('-57.12'), -57.12)
        self.assertEqual(conv('0x57'), 0x57)
        self.assertEqual(conv('-0x57'), -0x57)
        self.assertEqual(conv('-'), '-')
        self.assertEqual(conv('-0'), 0)
        self.assertEqual(conv('0.0'), 0.0)
        self.assertEqual(conv('1e-8'), 0.0)
        self.assertEqual(conv('-1e-8'), 0.0)

        # Anything that is not a valid number comes back unchanged.
        self.assertEqual(conv('0xDD8G'), '0xDD8G')
        self.assertEqual(conv('0XDD8G'), '0XDD8G')
        self.assertEqual(conv('-stringy'), '-stringy')
        self.assertEqual(conv('stringy'), 'stringy')
        self.assertEqual(conv('add'), 'add')
        self.assertEqual(conv('remove'), 'remove')
        self.assertEqual(conv(''), '')
|
|
|
|
|
|
class Ec2utilsTestCase(test.NoDBTestCase):
    """Unit tests for ec2utils and the block_device mapping helpers."""

    def test_ec2_id_to_id(self):
        """EC2 ids decode to the integer held in their hex suffix."""
        self.assertEqual(ec2utils.ec2_id_to_id('i-0000001e'), 30)
        self.assertEqual(ec2utils.ec2_id_to_id('ami-1d'), 29)
        self.assertEqual(ec2utils.ec2_id_to_id('snap-0000001c'), 28)
        self.assertEqual(ec2utils.ec2_id_to_id('vol-0000001b'), 27)

    def test_bad_ec2_id(self):
        """A malformed EC2 id raises InvalidEc2Id."""
        self.assertRaises(exception.InvalidEc2Id,
                          ec2utils.ec2_id_to_id,
                          'badone')

    def test_id_to_ec2_id(self):
        """Integers encode to EC2 ids with the expected prefix and padding."""
        self.assertEqual(ec2utils.id_to_ec2_id(30), 'i-0000001e')
        self.assertEqual(ec2utils.id_to_ec2_id(29, 'ami-%08x'), 'ami-0000001d')
        self.assertEqual(ec2utils.id_to_ec2_snap_id(28), 'snap-0000001c')
        self.assertEqual(ec2utils.id_to_ec2_vol_id(27), 'vol-0000001b')

    def test_dict_from_dotted_str(self):
        """Dotted request keys are folded into a nested dict."""
        in_str = [('BlockDeviceMapping.1.DeviceName', '/dev/sda1'),
                  ('BlockDeviceMapping.1.Ebs.SnapshotId', 'snap-0000001c'),
                  ('BlockDeviceMapping.1.Ebs.VolumeSize', '80'),
                  ('BlockDeviceMapping.1.Ebs.DeleteOnTermination', 'false'),
                  ('BlockDeviceMapping.2.DeviceName', '/dev/sdc'),
                  ('BlockDeviceMapping.2.VirtualName', 'ephemeral0')]
        expected_dict = {
            'block_device_mapping': {
                '1': {'device_name': '/dev/sda1',
                      'ebs': {'snapshot_id': 'snap-0000001c',
                              'volume_size': 80,
                              'delete_on_termination': False}},
                '2': {'device_name': '/dev/sdc',
                      'virtual_name': 'ephemeral0'}}}
        out_dict = ec2utils.dict_from_dotted_str(in_str)

        self.assertThat(out_dict, matchers.DictMatches(expected_dict))

    def test_properties_root_defice_name(self):
        """An explicit root_device_name wins over the 'root' mapping."""
        mappings = [{"device": "/dev/sda1", "virtual": "root"}]
        properties0 = {'mappings': mappings}
        properties1 = {'root_device_name': '/dev/sdb', 'mappings': mappings}

        root_device_name = block_device.properties_root_device_name(
            properties0)
        self.assertEqual(root_device_name, '/dev/sda1')

        root_device_name = block_device.properties_root_device_name(
            properties1)
        self.assertEqual(root_device_name, '/dev/sdb')

    def test_regex_from_ec2_regex(self):
        """EC2 wildcard patterns translate to the expected Python regexes."""
        def _test_re(ec2_regex, expected, literal, match=True):
            # Check the generated pattern text, then whether it matches
            # (or, with match=False, fails to match) the given literal.
            regex = ec2utils.regex_from_ec2_regex(ec2_regex)
            self.assertEqual(regex, expected)
            if match:
                self.assertIsNotNone(re.match(regex, literal))
            else:
                self.assertIsNone(re.match(regex, literal))

        # Expected patterns are raw strings so that \A, \Z, \* and \? are
        # not treated as (invalid) string escapes; the values are unchanged.

        # wildcards
        _test_re('foo', r'\Afoo\Z(?s)', 'foo')
        _test_re('foo', r'\Afoo\Z(?s)', 'baz', match=False)
        _test_re('foo?bar', r'\Afoo.bar\Z(?s)', 'foo bar')
        # '?' stands for exactly one character, so several spaces between
        # the words must not match.
        _test_re('foo?bar', r'\Afoo.bar\Z(?s)', 'foo   bar', match=False)
        _test_re('foo*bar', r'\Afoo.*bar\Z(?s)', 'foo QUUX bar')

        # backslashes and escaped wildcards
        _test_re('foo\\', r'\Afoo\\\Z(?s)', 'foo\\')
        _test_re('foo*bar', r'\Afoo.*bar\Z(?s)', 'zork QUUX bar', match=False)
        _test_re('foo\\?bar', r'\Afoo[?]bar\Z(?s)', 'foo?bar')
        _test_re('foo\\?bar', r'\Afoo[?]bar\Z(?s)', 'foo bar', match=False)
        _test_re('foo\\*bar', r'\Afoo[*]bar\Z(?s)', 'foo*bar')
        _test_re('foo\\*bar', r'\Afoo[*]bar\Z(?s)', 'foo bar', match=False)

        # analog to the example given in the EC2 API docs
        ec2_regex = r'\*nova\?\end'
        expected = r'\A[*]nova[?]\\end\Z(?s)'
        literal = r'*nova?\end'
        _test_re(ec2_regex, expected, literal)

    def test_mapping_prepend_dev(self):
        """Bare swap/ephemeral device names gain a '/dev/' prefix."""
        mappings = [
            {'virtual': 'ami',
             'device': 'sda1'},
            {'virtual': 'root',
             'device': '/dev/sda1'},

            {'virtual': 'swap',
             'device': 'sdb1'},
            {'virtual': 'swap',
             'device': '/dev/sdb2'},

            {'virtual': 'ephemeral0',
             'device': 'sdc1'},
            {'virtual': 'ephemeral1',
             'device': '/dev/sdc1'}]
        # The 'ami' entry is expected to keep its bare device name.
        expected_result = [
            {'virtual': 'ami',
             'device': 'sda1'},
            {'virtual': 'root',
             'device': '/dev/sda1'},

            {'virtual': 'swap',
             'device': '/dev/sdb1'},
            {'virtual': 'swap',
             'device': '/dev/sdb2'},

            {'virtual': 'ephemeral0',
             'device': '/dev/sdc1'},
            {'virtual': 'ephemeral1',
             'device': '/dev/sdc1'}]
        self.assertThat(block_device.mappings_prepend_dev(mappings),
                        matchers.DictListMatches(expected_result))
|
|
|
|
|
|
class ApiEc2TestCase(test.TestCase):
    """Unit test for the cloud controller on an EC2 API."""

    def setUp(self):
        """Build the EC2 WSGI stack with a pre-injected request context."""
        super(ApiEc2TestCase, self).setUp()
        self.host = '127.0.0.1'
        # NOTE(vish): skipping the Authorizer
        roles = ['sysadmin', 'netadmin']
        ctxt = context.RequestContext('fake', 'fake', roles=roles)
        self.app = auth.InjectContext(
            ctxt,
            ec2.FaultWrapper(
                ec2.RequestLogging(
                    ec2.Requestify(
                        ec2.Authorizer(ec2.Executor()),
                        'nova.api.ec2.cloud.CloudController'))))
        self.useFixture(fixtures.FakeLogger('boto'))

    @staticmethod
    def _random_name(alphabet="sdiuisudfsdcnpaqwertasd", min_len=4,
                     max_len=8):
        """Return a random string of min_len..max_len chars from alphabet."""
        return "".join(random.choice(alphabet)
                       for _ in range(random.randint(min_len, max_len)))

    def expect_http(self, host=None, is_secure=False, api_version=None):
        """Create a fresh EC2 connection backed by a fake HTTP connection.

        A boto EC2 connection aimed at ``self.host`` is stored as
        ``self.ec2`` and its ``new_http_connection`` is stubbed so that the
        next call is expected to hand back a FakeHttplibConnection wired to
        ``self.app``.  Callers must run ``self.mox.ReplayAll()`` afterwards.

        :param host: host expected in the ``new_http_connection`` call; on
            the two boto >= 2 branches a falsy value falls back to
            ``self.host`` (with ``:8773`` appended on the pre-2.14 branch)
        :param is_secure: value expected in the ``new_http_connection``
            call; the boto connection itself is always created with
            ``is_secure=False``
        :param api_version: when truthy, overrides ``self.ec2.APIVersion``
        :returns: the FakeHttplibConnection, also stored as ``self.http``
        """
        self.ec2 = boto.connect_ec2(
            aws_access_key_id='fake',
            aws_secret_access_key='fake',
            is_secure=False,
            region=regioninfo.RegionInfo(None, 'test', self.host),
            port=8773,
            path='/services/Cloud')
        if api_version:
            self.ec2.APIVersion = api_version

        self.mox.StubOutWithMock(self.ec2, 'new_http_connection')
        self.http = FakeHttplibConnection(
            self.app, '%s:8773' % (self.host), False)
        # The signature of new_http_connection differs between boto
        # releases, so record the expectation matching the installed one.
        if versionutils.is_compatible('2.14', boto.Version, same_major=False):
            self.ec2.new_http_connection(host or self.host, 8773,
                                         is_secure).AndReturn(self.http)
        elif versionutils.is_compatible('2', boto.Version, same_major=False):
            self.ec2.new_http_connection(host or '%s:8773' % (self.host),
                                         is_secure).AndReturn(self.http)
        else:
            self.ec2.new_http_connection(host, is_secure).AndReturn(self.http)
        return self.http

    def test_xmlns_version_matches_request_version(self):
        """The response body must mention the requested API version."""
        self.expect_http(api_version='2010-10-30')
        self.mox.ReplayAll()

        # Any request should be fine
        self.ec2.get_all_instances()
        self.assertIn(self.ec2.APIVersion, self.http.getresponsebody(),
                      'The version in the xmlns of the response does '
                      'not match the API version given in the request.')

    def test_describe_instances(self):
        """Test that, after creating a user and a project, the describe
        instances call to the API works properly.
        """
        self.expect_http()
        self.mox.ReplayAll()
        self.assertEqual(self.ec2.get_all_instances(), [])

    def test_terminate_invalid_instance(self):
        # Attempt to terminate an invalid instance.
        self.expect_http()
        self.mox.ReplayAll()
        self.assertRaises(boto_exc.EC2ResponseError,
                          self.ec2.terminate_instances, "i-00000005")

    def test_get_all_key_pairs(self):
        """Test that, after creating a user and project and generating
        a key pair, that the API call to list key pairs works properly.
        """
        keyname = self._random_name()
        self.expect_http()
        self.mox.ReplayAll()
        self.ec2.create_key_pair(keyname)
        rv = self.ec2.get_all_key_pairs()
        results = [k for k in rv if k.name == keyname]
        self.assertEqual(len(results), 1)

    def test_create_duplicate_key_pair(self):
        """Test that, after successfully generating a keypair,
        requesting a second keypair with the same name fails sanely.
        """
        self.expect_http()
        self.mox.ReplayAll()
        self.ec2.create_key_pair('test')

        try:
            self.ec2.create_key_pair('test')
        except boto_exc.EC2ResponseError as e:
            # Any other error code is a test failure.
            self.assertEqual('InvalidKeyPair.Duplicate', e.code)
        else:
            self.fail('Exception not raised.')

    def test_get_all_security_groups(self):
        # Test that we can retrieve security groups.
        self.expect_http()
        self.mox.ReplayAll()

        rv = self.ec2.get_all_security_groups()

        self.assertEqual(len(rv), 1)
        self.assertEqual(rv[0].name, 'default')

    def test_create_delete_security_group(self):
        # Test that we can create a security group.
        self.expect_http()
        self.mox.ReplayAll()

        security_group_name = self._random_name()

        self.ec2.create_security_group(security_group_name, 'test group')

        self.expect_http()
        self.mox.ReplayAll()

        rv = self.ec2.get_all_security_groups()
        # The new group is listed alongside 'default'.
        self.assertEqual(len(rv), 2)
        self.assertIn(security_group_name, [group.name for group in rv])

        self.expect_http()
        self.mox.ReplayAll()

        self.ec2.delete_security_group(security_group_name)

    def test_group_name_valid_chars_security_group(self):
        """Test that we sanely handle invalid security group names.

        EC2 API Spec states we should only accept alphanumeric characters,
        spaces, dashes, and underscores. Amazon implementation
        accepts more characters - so, [:print:] is ok.
        """
        bad_strict_ec2 = "aa \t\x01\x02\x7f"
        bad_amazon_ec2 = "aa #^% -=99"
        # Each entry is (ec2_strict_validation, group name, description).
        test_raise = [
            (True, bad_amazon_ec2, "test desc"),
            (True, "test name", bad_amazon_ec2),
            (False, bad_strict_ec2, "test desc"),
        ]
        for t in test_raise:
            self.expect_http()
            self.mox.ReplayAll()
            self.flags(ec2_strict_validation=t[0])
            self.assertRaises(boto_exc.EC2ResponseError,
                              self.ec2.create_security_group,
                              t[1],
                              t[2])
        test_accept = [
            (False, bad_amazon_ec2, "test desc"),
            (False, "test name", bad_amazon_ec2),
        ]
        for t in test_accept:
            self.expect_http()
            self.mox.ReplayAll()
            self.flags(ec2_strict_validation=t[0])
            self.ec2.create_security_group(t[1], t[2])
            self.expect_http()
            self.mox.ReplayAll()
            self.ec2.delete_security_group(t[1])

    def test_group_name_valid_length_security_group(self):
        """Test that we sanely handle invalid security group names.

        API Spec states that the length should not exceed 255 char.
        """
        self.expect_http()
        self.mox.ReplayAll()

        # Test block group_name > 255 chars
        security_group_name = self._random_name(
            "poiuytrewqasdfghjklmnbvc", 256, 266)

        self.assertRaises(boto_exc.EC2ResponseError,
                          self.ec2.create_security_group,
                          security_group_name,
                          'test group')

    def test_authorize_revoke_security_group_cidr(self):
        """Test that we can add and remove CIDR based rules
        to a security group
        """
        self.expect_http()
        self.mox.ReplayAll()

        security_group_name = self._random_name()

        group = self.ec2.create_security_group(security_group_name,
                                               'test group')

        self.expect_http()
        self.mox.ReplayAll()
        group.connection = self.ec2

        group.authorize('tcp', 80, 81, '0.0.0.0/0')
        group.authorize('icmp', -1, -1, '0.0.0.0/0')
        group.authorize('udp', 80, 81, '0.0.0.0/0')
        group.authorize('tcp', 1, 65535, '0.0.0.0/0')
        group.authorize('udp', 1, 65535, '0.0.0.0/0')
        group.authorize('icmp', 1, 0, '0.0.0.0/0')
        group.authorize('icmp', 0, 1, '0.0.0.0/0')
        group.authorize('icmp', 0, 0, '0.0.0.0/0')

        def _assert(message, *args):
            # group.authorize(*args) must fail with HTTP 400 and an error
            # message containing `message`.
            try:
                group.authorize(*args)
            except boto_exc.EC2ResponseError as e:
                self.assertEqual(e.status, 400, 'Expected status to be 400')
                self.assertIn(message, e.error_message)
            else:
                raise self.failureException('EC2ResponseError not raised')

        # Invalid CIDR address
        _assert('Invalid CIDR', 'tcp', 80, 81, '0.0.0.0/0444')
        # Missing ports
        _assert('Not enough parameters', 'tcp', '0.0.0.0/0')
        # from port cannot be greater than to port
        _assert('Invalid port range', 'tcp', 100, 1, '0.0.0.0/0')
        # For tcp, negative values are not allowed
        _assert('Invalid port range', 'tcp', -1, 1, '0.0.0.0/0')
        # For tcp, valid port range 1-65535
        _assert('Invalid port range', 'tcp', 1, 65599, '0.0.0.0/0')
        # Invalid Cidr for ICMP type
        _assert('Invalid CIDR', 'icmp', -1, -1, '0.0.444.0/4')
        # Invalid protocol
        _assert('Invalid IP protocol', 'xyz', 1, 14, '0.0.0.0/0')
        # Invalid port
        _assert('Invalid input received: To and From ports must be integers',
                'tcp', " ", "81", '0.0.0.0/0')
        # Invalid icmp port
        _assert('Invalid input received: '
                'Type and Code must be integers for ICMP protocol type',
                'icmp', " ", "81", '0.0.0.0/0')
        # Invalid CIDR Address
        _assert('Invalid CIDR', 'icmp', -1, -1, '0.0.0.0')
        # Invalid CIDR Address
        _assert('Invalid CIDR', 'icmp', -1, -1, '0.0.0.0/')
        # Invalid Cidr ports
        _assert('Invalid port range', 'icmp', 1, 256, '0.0.0.0/0')

        self.expect_http()
        self.mox.ReplayAll()

        rv = self.ec2.get_all_security_groups()

        group = [grp for grp in rv if grp.name == security_group_name][0]

        # Only the eight successful authorize calls above left a rule.
        self.assertEqual(len(group.rules), 8)
        self.assertEqual(int(group.rules[0].from_port), 80)
        self.assertEqual(int(group.rules[0].to_port), 81)
        self.assertEqual(len(group.rules[0].grants), 1)
        self.assertEqual(str(group.rules[0].grants[0]), '0.0.0.0/0')

        self.expect_http()
        self.mox.ReplayAll()
        group.connection = self.ec2

        group.revoke('tcp', 80, 81, '0.0.0.0/0')
        group.revoke('icmp', -1, -1, '0.0.0.0/0')
        group.revoke('udp', 80, 81, '0.0.0.0/0')
        group.revoke('tcp', 1, 65535, '0.0.0.0/0')
        group.revoke('udp', 1, 65535, '0.0.0.0/0')
        group.revoke('icmp', 1, 0, '0.0.0.0/0')
        group.revoke('icmp', 0, 1, '0.0.0.0/0')
        group.revoke('icmp', 0, 0, '0.0.0.0/0')

        self.expect_http()
        self.mox.ReplayAll()

        self.ec2.delete_security_group(security_group_name)

        self.expect_http()
        self.mox.ReplayAll()
        group.connection = self.ec2

        rv = self.ec2.get_all_security_groups()

        self.assertEqual(len(rv), 1)
        self.assertEqual(rv[0].name, 'default')

    def test_authorize_revoke_security_group_cidr_v6(self):
        """Test that we can add and remove CIDR based rules
        to a security group for IPv6
        """
        self.expect_http()
        self.mox.ReplayAll()

        security_group_name = self._random_name()

        group = self.ec2.create_security_group(security_group_name,
                                               'test group')

        self.expect_http()
        self.mox.ReplayAll()
        group.connection = self.ec2

        group.authorize('tcp', 80, 81, '::/0')

        self.expect_http()
        self.mox.ReplayAll()

        rv = self.ec2.get_all_security_groups()

        group = [grp for grp in rv if grp.name == security_group_name][0]
        self.assertEqual(len(group.rules), 1)
        self.assertEqual(int(group.rules[0].from_port), 80)
        self.assertEqual(int(group.rules[0].to_port), 81)
        self.assertEqual(len(group.rules[0].grants), 1)
        self.assertEqual(str(group.rules[0].grants[0]), '::/0')

        self.expect_http()
        self.mox.ReplayAll()
        group.connection = self.ec2

        group.revoke('tcp', 80, 81, '::/0')

        self.expect_http()
        self.mox.ReplayAll()

        self.ec2.delete_security_group(security_group_name)

        self.expect_http()
        self.mox.ReplayAll()
        group.connection = self.ec2

        rv = self.ec2.get_all_security_groups()

        self.assertEqual(len(rv), 1)
        self.assertEqual(rv[0].name, 'default')

    def test_authorize_revoke_security_group_foreign_group(self):
        """Test that we can grant and revoke another security group access
        to a security group
        """
        self.expect_http()
        self.mox.ReplayAll()

        rand_string = 'sdiuisudfsdcnpaqwertasd'
        security_group_name = self._random_name(rand_string)
        other_security_group_name = self._random_name(rand_string)

        group = self.ec2.create_security_group(security_group_name,
                                               'test group')

        self.expect_http()
        self.mox.ReplayAll()

        other_group = self.ec2.create_security_group(other_security_group_name,
                                                     'some other group')

        self.expect_http()
        self.mox.ReplayAll()
        group.connection = self.ec2

        group.authorize(src_group=other_group)

        self.expect_http()
        self.mox.ReplayAll()

        rv = self.ec2.get_all_security_groups()

        # I don't bother checking that we actually find it here,
        # because the create/delete unit test further up should
        # be good enough for that.
        for group in rv:
            if group.name == security_group_name:
                self.assertEqual(len(group.rules), 3)
                self.assertEqual(len(group.rules[0].grants), 1)
                self.assertEqual(str(group.rules[0].grants[0]),
                                 '%s-%s' % (other_security_group_name, 'fake'))

        self.expect_http()
        self.mox.ReplayAll()

        rv = self.ec2.get_all_security_groups()

        for group in rv:
            if group.name == security_group_name:
                self.expect_http()
                self.mox.ReplayAll()
                group.connection = self.ec2
                group.revoke(src_group=other_group)

        self.expect_http()
        self.mox.ReplayAll()

        self.ec2.delete_security_group(security_group_name)
        self.ec2.delete_security_group(other_security_group_name)
|