
Fixes bug #1008628 1. Edit openstack-common.conf and import nova/openstack/common/timeutils.py 2. Move time related functions from utils.py to timeutils.py 3. Replace following functions in utils.py with timeutils.py - isotime - parse_isotime - strtime - parse_strtime - normalize_time - is_older_than - utcnow_ts - utcnow - set_time_override - advance_time_delta - advance_time_seconds - clear_time_override 4. Remove datetime related functions and datetime related unittests Change-Id: I9a92be286fb071b6237dd39495d88dae106e2ce0
887 lines
34 KiB
Python
887 lines
34 KiB
Python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
|
|
|
# Copyright 2011 Justin Santa Barbara
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import __builtin__
|
|
import datetime
|
|
import hashlib
|
|
import os
|
|
import os.path
|
|
import shutil
|
|
import socket
|
|
import StringIO
|
|
import tempfile
|
|
|
|
import eventlet
|
|
from eventlet import greenpool
|
|
import iso8601
|
|
import mox
|
|
|
|
import nova
|
|
from nova import exception
|
|
from nova import flags
|
|
from nova.openstack.common import timeutils
|
|
from nova import test
|
|
from nova import utils
|
|
|
|
|
|
FLAGS = flags.FLAGS
|
|
|
|
|
|
class ExecuteTestCase(test.TestCase):
|
|
def test_retry_on_failure(self):
|
|
fd, tmpfilename = tempfile.mkstemp()
|
|
_, tmpfilename2 = tempfile.mkstemp()
|
|
try:
|
|
fp = os.fdopen(fd, 'w+')
|
|
fp.write('''#!/bin/sh
|
|
# If stdin fails to get passed during one of the runs, make a note.
|
|
if ! grep -q foo
|
|
then
|
|
echo 'failure' > "$1"
|
|
fi
|
|
# If stdin has failed to get passed during this or a previous run, exit early.
|
|
if grep failure "$1"
|
|
then
|
|
exit 1
|
|
fi
|
|
runs="$(cat $1)"
|
|
if [ -z "$runs" ]
|
|
then
|
|
runs=0
|
|
fi
|
|
runs=$(($runs + 1))
|
|
echo $runs > "$1"
|
|
exit 1
|
|
''')
|
|
fp.close()
|
|
os.chmod(tmpfilename, 0755)
|
|
self.assertRaises(exception.ProcessExecutionError,
|
|
utils.execute,
|
|
tmpfilename, tmpfilename2, attempts=10,
|
|
process_input='foo',
|
|
delay_on_retry=False)
|
|
fp = open(tmpfilename2, 'r+')
|
|
runs = fp.read()
|
|
fp.close()
|
|
self.assertNotEquals(runs.strip(), 'failure', 'stdin did not '
|
|
'always get passed '
|
|
'correctly')
|
|
runs = int(runs.strip())
|
|
self.assertEquals(runs, 10,
|
|
'Ran %d times instead of 10.' % (runs,))
|
|
finally:
|
|
os.unlink(tmpfilename)
|
|
os.unlink(tmpfilename2)
|
|
|
|
def test_unknown_kwargs_raises_error(self):
|
|
self.assertRaises(exception.NovaException,
|
|
utils.execute,
|
|
'/usr/bin/env', 'true',
|
|
this_is_not_a_valid_kwarg=True)
|
|
|
|
def test_check_exit_code_boolean(self):
|
|
utils.execute('/usr/bin/env', 'false', check_exit_code=False)
|
|
self.assertRaises(exception.ProcessExecutionError,
|
|
utils.execute,
|
|
'/usr/bin/env', 'false', check_exit_code=True)
|
|
|
|
def test_no_retry_on_success(self):
|
|
fd, tmpfilename = tempfile.mkstemp()
|
|
_, tmpfilename2 = tempfile.mkstemp()
|
|
try:
|
|
fp = os.fdopen(fd, 'w+')
|
|
fp.write('''#!/bin/sh
|
|
# If we've already run, bail out.
|
|
grep -q foo "$1" && exit 1
|
|
# Mark that we've run before.
|
|
echo foo > "$1"
|
|
# Check that stdin gets passed correctly.
|
|
grep foo
|
|
''')
|
|
fp.close()
|
|
os.chmod(tmpfilename, 0755)
|
|
utils.execute(tmpfilename,
|
|
tmpfilename2,
|
|
process_input='foo',
|
|
attempts=2)
|
|
finally:
|
|
os.unlink(tmpfilename)
|
|
os.unlink(tmpfilename2)
|
|
|
|
|
|
class GetFromPathTestCase(test.TestCase):
|
|
def test_tolerates_nones(self):
|
|
f = utils.get_from_path
|
|
|
|
input = []
|
|
self.assertEquals([], f(input, "a"))
|
|
self.assertEquals([], f(input, "a/b"))
|
|
self.assertEquals([], f(input, "a/b/c"))
|
|
|
|
input = [None]
|
|
self.assertEquals([], f(input, "a"))
|
|
self.assertEquals([], f(input, "a/b"))
|
|
self.assertEquals([], f(input, "a/b/c"))
|
|
|
|
input = [{'a': None}]
|
|
self.assertEquals([], f(input, "a"))
|
|
self.assertEquals([], f(input, "a/b"))
|
|
self.assertEquals([], f(input, "a/b/c"))
|
|
|
|
input = [{'a': {'b': None}}]
|
|
self.assertEquals([{'b': None}], f(input, "a"))
|
|
self.assertEquals([], f(input, "a/b"))
|
|
self.assertEquals([], f(input, "a/b/c"))
|
|
|
|
input = [{'a': {'b': {'c': None}}}]
|
|
self.assertEquals([{'b': {'c': None}}], f(input, "a"))
|
|
self.assertEquals([{'c': None}], f(input, "a/b"))
|
|
self.assertEquals([], f(input, "a/b/c"))
|
|
|
|
input = [{'a': {'b': {'c': None}}}, {'a': None}]
|
|
self.assertEquals([{'b': {'c': None}}], f(input, "a"))
|
|
self.assertEquals([{'c': None}], f(input, "a/b"))
|
|
self.assertEquals([], f(input, "a/b/c"))
|
|
|
|
input = [{'a': {'b': {'c': None}}}, {'a': {'b': None}}]
|
|
self.assertEquals([{'b': {'c': None}}, {'b': None}], f(input, "a"))
|
|
self.assertEquals([{'c': None}], f(input, "a/b"))
|
|
self.assertEquals([], f(input, "a/b/c"))
|
|
|
|
def test_does_select(self):
|
|
f = utils.get_from_path
|
|
|
|
input = [{'a': 'a_1'}]
|
|
self.assertEquals(['a_1'], f(input, "a"))
|
|
self.assertEquals([], f(input, "a/b"))
|
|
self.assertEquals([], f(input, "a/b/c"))
|
|
|
|
input = [{'a': {'b': 'b_1'}}]
|
|
self.assertEquals([{'b': 'b_1'}], f(input, "a"))
|
|
self.assertEquals(['b_1'], f(input, "a/b"))
|
|
self.assertEquals([], f(input, "a/b/c"))
|
|
|
|
input = [{'a': {'b': {'c': 'c_1'}}}]
|
|
self.assertEquals([{'b': {'c': 'c_1'}}], f(input, "a"))
|
|
self.assertEquals([{'c': 'c_1'}], f(input, "a/b"))
|
|
self.assertEquals(['c_1'], f(input, "a/b/c"))
|
|
|
|
input = [{'a': {'b': {'c': 'c_1'}}}, {'a': None}]
|
|
self.assertEquals([{'b': {'c': 'c_1'}}], f(input, "a"))
|
|
self.assertEquals([{'c': 'c_1'}], f(input, "a/b"))
|
|
self.assertEquals(['c_1'], f(input, "a/b/c"))
|
|
|
|
input = [{'a': {'b': {'c': 'c_1'}}},
|
|
{'a': {'b': None}}]
|
|
self.assertEquals([{'b': {'c': 'c_1'}}, {'b': None}], f(input, "a"))
|
|
self.assertEquals([{'c': 'c_1'}], f(input, "a/b"))
|
|
self.assertEquals(['c_1'], f(input, "a/b/c"))
|
|
|
|
input = [{'a': {'b': {'c': 'c_1'}}},
|
|
{'a': {'b': {'c': 'c_2'}}}]
|
|
self.assertEquals([{'b': {'c': 'c_1'}}, {'b': {'c': 'c_2'}}],
|
|
f(input, "a"))
|
|
self.assertEquals([{'c': 'c_1'}, {'c': 'c_2'}], f(input, "a/b"))
|
|
self.assertEquals(['c_1', 'c_2'], f(input, "a/b/c"))
|
|
|
|
self.assertEquals([], f(input, "a/b/c/d"))
|
|
self.assertEquals([], f(input, "c/a/b/d"))
|
|
self.assertEquals([], f(input, "i/r/t"))
|
|
|
|
def test_flattens_lists(self):
|
|
f = utils.get_from_path
|
|
|
|
input = [{'a': [1, 2, 3]}]
|
|
self.assertEquals([1, 2, 3], f(input, "a"))
|
|
self.assertEquals([], f(input, "a/b"))
|
|
self.assertEquals([], f(input, "a/b/c"))
|
|
|
|
input = [{'a': {'b': [1, 2, 3]}}]
|
|
self.assertEquals([{'b': [1, 2, 3]}], f(input, "a"))
|
|
self.assertEquals([1, 2, 3], f(input, "a/b"))
|
|
self.assertEquals([], f(input, "a/b/c"))
|
|
|
|
input = [{'a': {'b': [1, 2, 3]}}, {'a': {'b': [4, 5, 6]}}]
|
|
self.assertEquals([1, 2, 3, 4, 5, 6], f(input, "a/b"))
|
|
self.assertEquals([], f(input, "a/b/c"))
|
|
|
|
input = [{'a': [{'b': [1, 2, 3]}, {'b': [4, 5, 6]}]}]
|
|
self.assertEquals([1, 2, 3, 4, 5, 6], f(input, "a/b"))
|
|
self.assertEquals([], f(input, "a/b/c"))
|
|
|
|
input = [{'a': [1, 2, {'b': 'b_1'}]}]
|
|
self.assertEquals([1, 2, {'b': 'b_1'}], f(input, "a"))
|
|
self.assertEquals(['b_1'], f(input, "a/b"))
|
|
|
|
def test_bad_xpath(self):
|
|
f = utils.get_from_path
|
|
|
|
self.assertRaises(exception.NovaException, f, [], None)
|
|
self.assertRaises(exception.NovaException, f, [], "")
|
|
self.assertRaises(exception.NovaException, f, [], "/")
|
|
self.assertRaises(exception.NovaException, f, [], "/a")
|
|
self.assertRaises(exception.NovaException, f, [], "/a/")
|
|
self.assertRaises(exception.NovaException, f, [], "//")
|
|
self.assertRaises(exception.NovaException, f, [], "//a")
|
|
self.assertRaises(exception.NovaException, f, [], "a//a")
|
|
self.assertRaises(exception.NovaException, f, [], "a//a/")
|
|
self.assertRaises(exception.NovaException, f, [], "a/a/")
|
|
|
|
def test_real_failure1(self):
|
|
# Real world failure case...
|
|
# We weren't coping when the input was a Dictionary instead of a List
|
|
# This led to test_accepts_dictionaries
|
|
f = utils.get_from_path
|
|
|
|
inst = {'fixed_ip': {'floating_ips': [{'address': '1.2.3.4'}],
|
|
'address': '192.168.0.3'},
|
|
'hostname': ''}
|
|
|
|
private_ips = f(inst, 'fixed_ip/address')
|
|
public_ips = f(inst, 'fixed_ip/floating_ips/address')
|
|
self.assertEquals(['192.168.0.3'], private_ips)
|
|
self.assertEquals(['1.2.3.4'], public_ips)
|
|
|
|
def test_accepts_dictionaries(self):
|
|
f = utils.get_from_path
|
|
|
|
input = {'a': [1, 2, 3]}
|
|
self.assertEquals([1, 2, 3], f(input, "a"))
|
|
self.assertEquals([], f(input, "a/b"))
|
|
self.assertEquals([], f(input, "a/b/c"))
|
|
|
|
input = {'a': {'b': [1, 2, 3]}}
|
|
self.assertEquals([{'b': [1, 2, 3]}], f(input, "a"))
|
|
self.assertEquals([1, 2, 3], f(input, "a/b"))
|
|
self.assertEquals([], f(input, "a/b/c"))
|
|
|
|
input = {'a': [{'b': [1, 2, 3]}, {'b': [4, 5, 6]}]}
|
|
self.assertEquals([1, 2, 3, 4, 5, 6], f(input, "a/b"))
|
|
self.assertEquals([], f(input, "a/b/c"))
|
|
|
|
input = {'a': [1, 2, {'b': 'b_1'}]}
|
|
self.assertEquals([1, 2, {'b': 'b_1'}], f(input, "a"))
|
|
self.assertEquals(['b_1'], f(input, "a/b"))
|
|
|
|
|
|
class GenericUtilsTestCase(test.TestCase):
|
|
def test_parse_server_string(self):
|
|
result = utils.parse_server_string('::1')
|
|
self.assertEqual(('::1', ''), result)
|
|
result = utils.parse_server_string('[::1]:8773')
|
|
self.assertEqual(('::1', '8773'), result)
|
|
result = utils.parse_server_string('2001:db8::192.168.1.1')
|
|
self.assertEqual(('2001:db8::192.168.1.1', ''), result)
|
|
result = utils.parse_server_string('[2001:db8::192.168.1.1]:8773')
|
|
self.assertEqual(('2001:db8::192.168.1.1', '8773'), result)
|
|
result = utils.parse_server_string('192.168.1.1')
|
|
self.assertEqual(('192.168.1.1', ''), result)
|
|
result = utils.parse_server_string('192.168.1.2:8773')
|
|
self.assertEqual(('192.168.1.2', '8773'), result)
|
|
result = utils.parse_server_string('192.168.1.3')
|
|
self.assertEqual(('192.168.1.3', ''), result)
|
|
result = utils.parse_server_string('www.example.com:8443')
|
|
self.assertEqual(('www.example.com', '8443'), result)
|
|
result = utils.parse_server_string('www.example.com')
|
|
self.assertEqual(('www.example.com', ''), result)
|
|
# error case
|
|
result = utils.parse_server_string('www.exa:mple.com:8443')
|
|
self.assertEqual(('', ''), result)
|
|
|
|
def test_hostname_unicode_sanitization(self):
|
|
hostname = u"\u7684.test.example.com"
|
|
self.assertEqual("test.example.com",
|
|
utils.sanitize_hostname(hostname))
|
|
|
|
def test_hostname_sanitize_periods(self):
|
|
hostname = "....test.example.com..."
|
|
self.assertEqual("test.example.com",
|
|
utils.sanitize_hostname(hostname))
|
|
|
|
def test_hostname_sanitize_dashes(self):
|
|
hostname = "----test.example.com---"
|
|
self.assertEqual("test.example.com",
|
|
utils.sanitize_hostname(hostname))
|
|
|
|
def test_hostname_sanitize_characters(self):
|
|
hostname = "(#@&$!(@*--#&91)(__=+--test-host.example!!.com-0+"
|
|
self.assertEqual("91----test-host.example.com-0",
|
|
utils.sanitize_hostname(hostname))
|
|
|
|
def test_hostname_translate(self):
|
|
hostname = "<}\x1fh\x10e\x08l\x02l\x05o\x12!{>"
|
|
self.assertEqual("hello", utils.sanitize_hostname(hostname))
|
|
|
|
def test_bool_from_str(self):
|
|
self.assertTrue(utils.bool_from_str('1'))
|
|
self.assertTrue(utils.bool_from_str('2'))
|
|
self.assertTrue(utils.bool_from_str('-1'))
|
|
self.assertTrue(utils.bool_from_str('true'))
|
|
self.assertTrue(utils.bool_from_str('True'))
|
|
self.assertTrue(utils.bool_from_str('tRuE'))
|
|
self.assertFalse(utils.bool_from_str('False'))
|
|
self.assertFalse(utils.bool_from_str('false'))
|
|
self.assertFalse(utils.bool_from_str('0'))
|
|
self.assertFalse(utils.bool_from_str(None))
|
|
self.assertFalse(utils.bool_from_str('junk'))
|
|
|
|
def test_generate_glance_url(self):
|
|
generated_url = utils.generate_glance_url()
|
|
actual_url = "http://%s:%d" % (FLAGS.glance_host, FLAGS.glance_port)
|
|
self.assertEqual(generated_url, actual_url)
|
|
|
|
def test_read_cached_file(self):
|
|
self.mox.StubOutWithMock(os.path, "getmtime")
|
|
os.path.getmtime(mox.IgnoreArg()).AndReturn(1)
|
|
self.mox.ReplayAll()
|
|
|
|
cache_data = {"data": 1123, "mtime": 1}
|
|
data = utils.read_cached_file("/this/is/a/fake", cache_data)
|
|
self.assertEqual(cache_data["data"], data)
|
|
|
|
def test_read_modified_cached_file(self):
|
|
self.mox.StubOutWithMock(os.path, "getmtime")
|
|
self.mox.StubOutWithMock(__builtin__, 'open')
|
|
os.path.getmtime(mox.IgnoreArg()).AndReturn(2)
|
|
|
|
fake_contents = "lorem ipsum"
|
|
fake_file = self.mox.CreateMockAnything()
|
|
fake_file.read().AndReturn(fake_contents)
|
|
fake_context_manager = self.mox.CreateMockAnything()
|
|
fake_context_manager.__enter__().AndReturn(fake_file)
|
|
fake_context_manager.__exit__(mox.IgnoreArg(),
|
|
mox.IgnoreArg(),
|
|
mox.IgnoreArg())
|
|
|
|
__builtin__.open(mox.IgnoreArg()).AndReturn(fake_context_manager)
|
|
|
|
self.mox.ReplayAll()
|
|
cache_data = {"data": 1123, "mtime": 1}
|
|
self.reload_called = False
|
|
|
|
def test_reload(reloaded_data):
|
|
self.assertEqual(reloaded_data, fake_contents)
|
|
self.reload_called = True
|
|
|
|
data = utils.read_cached_file("/this/is/a/fake", cache_data,
|
|
reload_func=test_reload)
|
|
self.assertEqual(data, fake_contents)
|
|
self.assertTrue(self.reload_called)
|
|
|
|
def test_generate_password(self):
|
|
password = utils.generate_password()
|
|
self.assertTrue([c for c in password if c in '0123456789'])
|
|
self.assertTrue([c for c in password
|
|
if c in 'abcdefghijklmnopqrstuvwxyz'])
|
|
self.assertTrue([c for c in password
|
|
if c in 'ABCDEFGHIJKLMNOPQRSTUVWXYZ'])
|
|
|
|
def test_read_file_as_root(self):
|
|
def fake_execute(*args, **kwargs):
|
|
if args[1] == 'bad':
|
|
raise exception.ProcessExecutionError
|
|
return 'fakecontents', None
|
|
|
|
self.stubs.Set(utils, 'execute', fake_execute)
|
|
contents = utils.read_file_as_root('good')
|
|
self.assertEqual(contents, 'fakecontents')
|
|
self.assertRaises(exception.FileNotFound,
|
|
utils.read_file_as_root, 'bad')
|
|
|
|
def test_strcmp_const_time(self):
|
|
self.assertTrue(utils.strcmp_const_time('abc123', 'abc123'))
|
|
self.assertFalse(utils.strcmp_const_time('a', 'aaaaa'))
|
|
self.assertFalse(utils.strcmp_const_time('ABC123', 'abc123'))
|
|
|
|
def test_temporary_chown(self):
|
|
def fake_execute(*args, **kwargs):
|
|
if args[0] == 'chown':
|
|
fake_execute.uid = args[1]
|
|
self.stubs.Set(utils, 'execute', fake_execute)
|
|
|
|
with tempfile.NamedTemporaryFile() as f:
|
|
with utils.temporary_chown(f.name, owner_uid=2):
|
|
self.assertEqual(fake_execute.uid, 2)
|
|
self.assertEqual(fake_execute.uid, os.getuid())
|
|
|
|
def test_service_is_up(self):
|
|
fts_func = datetime.datetime.fromtimestamp
|
|
fake_now = 1000
|
|
down_time = 5
|
|
|
|
self.flags(service_down_time=down_time)
|
|
self.mox.StubOutWithMock(timeutils, 'utcnow')
|
|
|
|
# Up (equal)
|
|
timeutils.utcnow().AndReturn(fts_func(fake_now))
|
|
service = {'updated_at': fts_func(fake_now - down_time),
|
|
'created_at': fts_func(fake_now - down_time)}
|
|
self.mox.ReplayAll()
|
|
result = utils.service_is_up(service)
|
|
self.assertTrue(result)
|
|
|
|
self.mox.ResetAll()
|
|
# Up
|
|
timeutils.utcnow().AndReturn(fts_func(fake_now))
|
|
service = {'updated_at': fts_func(fake_now - down_time + 1),
|
|
'created_at': fts_func(fake_now - down_time + 1)}
|
|
self.mox.ReplayAll()
|
|
result = utils.service_is_up(service)
|
|
self.assertTrue(result)
|
|
|
|
self.mox.ResetAll()
|
|
# Down
|
|
timeutils.utcnow().AndReturn(fts_func(fake_now))
|
|
service = {'updated_at': fts_func(fake_now - down_time - 1),
|
|
'created_at': fts_func(fake_now - down_time - 1)}
|
|
self.mox.ReplayAll()
|
|
result = utils.service_is_up(service)
|
|
self.assertFalse(result)
|
|
|
|
def test_xhtml_escape(self):
|
|
self.assertEqual('"foo"', utils.xhtml_escape('"foo"'))
|
|
self.assertEqual(''foo'', utils.xhtml_escape("'foo'"))
|
|
|
|
def test_hash_file(self):
|
|
data = 'Mary had a little lamb, its fleece as white as snow'
|
|
flo = StringIO.StringIO(data)
|
|
h1 = utils.hash_file(flo)
|
|
h2 = hashlib.sha1(data).hexdigest()
|
|
self.assertEquals(h1, h2)
|
|
|
|
|
|
class IsUUIDLikeTestCase(test.TestCase):
|
|
def assertUUIDLike(self, val, expected):
|
|
result = utils.is_uuid_like(val)
|
|
self.assertEqual(result, expected)
|
|
|
|
def test_good_uuid(self):
|
|
val = 'aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa'
|
|
self.assertUUIDLike(val, True)
|
|
|
|
def test_integer_passed(self):
|
|
val = 1
|
|
self.assertUUIDLike(val, False)
|
|
|
|
def test_non_uuid_string_passed(self):
|
|
val = 'foo-fooo'
|
|
self.assertUUIDLike(val, False)
|
|
|
|
def test_non_uuid_string_passed2(self):
|
|
val = 'xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx'
|
|
self.assertUUIDLike(val, False)
|
|
|
|
def test_gen_valid_uuid(self):
|
|
self.assertUUIDLike(str(utils.gen_uuid()), True)
|
|
|
|
|
|
class MonkeyPatchTestCase(test.TestCase):
|
|
"""Unit test for utils.monkey_patch()."""
|
|
def setUp(self):
|
|
super(MonkeyPatchTestCase, self).setUp()
|
|
self.example_package = 'nova.tests.monkey_patch_example.'
|
|
self.flags(
|
|
monkey_patch=True,
|
|
monkey_patch_modules=[self.example_package + 'example_a' + ':'
|
|
+ self.example_package + 'example_decorator'])
|
|
|
|
def test_monkey_patch(self):
|
|
utils.monkey_patch()
|
|
nova.tests.monkey_patch_example.CALLED_FUNCTION = []
|
|
from nova.tests.monkey_patch_example import example_a
|
|
from nova.tests.monkey_patch_example import example_b
|
|
|
|
self.assertEqual('Example function', example_a.example_function_a())
|
|
exampleA = example_a.ExampleClassA()
|
|
exampleA.example_method()
|
|
ret_a = exampleA.example_method_add(3, 5)
|
|
self.assertEqual(ret_a, 8)
|
|
|
|
self.assertEqual('Example function', example_b.example_function_b())
|
|
exampleB = example_b.ExampleClassB()
|
|
exampleB.example_method()
|
|
ret_b = exampleB.example_method_add(3, 5)
|
|
|
|
self.assertEqual(ret_b, 8)
|
|
package_a = self.example_package + 'example_a.'
|
|
self.assertTrue(package_a + 'example_function_a'
|
|
in nova.tests.monkey_patch_example.CALLED_FUNCTION)
|
|
|
|
self.assertTrue(package_a + 'ExampleClassA.example_method'
|
|
in nova.tests.monkey_patch_example.CALLED_FUNCTION)
|
|
self.assertTrue(package_a + 'ExampleClassA.example_method_add'
|
|
in nova.tests.monkey_patch_example.CALLED_FUNCTION)
|
|
package_b = self.example_package + 'example_b.'
|
|
self.assertFalse(package_b + 'example_function_b'
|
|
in nova.tests.monkey_patch_example.CALLED_FUNCTION)
|
|
self.assertFalse(package_b + 'ExampleClassB.example_method'
|
|
in nova.tests.monkey_patch_example.CALLED_FUNCTION)
|
|
self.assertFalse(package_b + 'ExampleClassB.example_method_add'
|
|
in nova.tests.monkey_patch_example.CALLED_FUNCTION)
|
|
|
|
|
|
class TestGreenLocks(test.TestCase):
|
|
def test_concurrent_green_lock_succeeds(self):
|
|
"""Verify spawn_n greenthreads with two locks run concurrently.
|
|
|
|
This succeeds with spawn but fails with spawn_n because lockfile
|
|
gets the same thread id for both spawn_n threads. Our workaround
|
|
of using the GreenLockFile will work even if the issue is fixed.
|
|
"""
|
|
self.completed = False
|
|
with utils.tempdir() as tmpdir:
|
|
|
|
def locka(wait):
|
|
a = utils.GreenLockFile(os.path.join(tmpdir, 'a'))
|
|
a.acquire()
|
|
wait.wait()
|
|
a.release()
|
|
self.completed = True
|
|
|
|
def lockb(wait):
|
|
b = utils.GreenLockFile(os.path.join(tmpdir, 'b'))
|
|
b.acquire()
|
|
wait.wait()
|
|
b.release()
|
|
|
|
wait1 = eventlet.event.Event()
|
|
wait2 = eventlet.event.Event()
|
|
pool = greenpool.GreenPool()
|
|
pool.spawn_n(locka, wait1)
|
|
pool.spawn_n(lockb, wait2)
|
|
wait2.send()
|
|
eventlet.sleep(0)
|
|
wait1.send()
|
|
pool.waitall()
|
|
self.assertTrue(self.completed)
|
|
|
|
|
|
class TestLockCleanup(test.TestCase):
|
|
"""unit tests for utils.cleanup_file_locks()"""
|
|
|
|
def setUp(self):
|
|
super(TestLockCleanup, self).setUp()
|
|
|
|
self.pid = os.getpid()
|
|
self.dead_pid = self._get_dead_pid()
|
|
self.tempdir = tempfile.mkdtemp()
|
|
self.flags(lock_path=self.tempdir)
|
|
self.lock_name = 'nova-testlock'
|
|
self.lock_file = os.path.join(FLAGS.lock_path,
|
|
self.lock_name + '.lock')
|
|
self.hostname = socket.gethostname()
|
|
print self.pid, self.dead_pid
|
|
try:
|
|
os.unlink(self.lock_file)
|
|
except OSError as (errno, strerror):
|
|
if errno == 2:
|
|
pass
|
|
|
|
def tearDown(self):
|
|
shutil.rmtree(self.tempdir)
|
|
super(TestLockCleanup, self).tearDown()
|
|
|
|
def _get_dead_pid(self):
|
|
"""get a pid for a process that does not exist"""
|
|
|
|
candidate_pid = self.pid - 1
|
|
while os.path.exists(os.path.join('/proc', str(candidate_pid))):
|
|
candidate_pid -= 1
|
|
if candidate_pid == 1:
|
|
return 0
|
|
return candidate_pid
|
|
|
|
def _get_sentinel_name(self, hostname, pid, thread='MainThread'):
|
|
return os.path.join(FLAGS.lock_path,
|
|
'%s.%s-%d' % (hostname, thread, pid))
|
|
|
|
def _create_sentinel(self, hostname, pid, thread='MainThread'):
|
|
name = self._get_sentinel_name(hostname, pid, thread)
|
|
open(name, 'wb').close()
|
|
return name
|
|
|
|
def test_clean_stale_locks(self):
|
|
"""verify locks for dead processes are cleaned up"""
|
|
|
|
# create sentinels for two processes, us and a 'dead' one
|
|
# no active lock
|
|
sentinel1 = self._create_sentinel(self.hostname, self.pid)
|
|
sentinel2 = self._create_sentinel(self.hostname, self.dead_pid)
|
|
|
|
utils.cleanup_file_locks()
|
|
|
|
self.assertTrue(os.path.exists(sentinel1))
|
|
self.assertFalse(os.path.exists(self.lock_file))
|
|
self.assertFalse(os.path.exists(sentinel2))
|
|
|
|
os.unlink(sentinel1)
|
|
|
|
def test_clean_stale_locks_active(self):
|
|
"""verify locks for dead processes are cleaned with an active lock """
|
|
|
|
# create sentinels for two processes, us and a 'dead' one
|
|
# create an active lock for us
|
|
sentinel1 = self._create_sentinel(self.hostname, self.pid)
|
|
sentinel2 = self._create_sentinel(self.hostname, self.dead_pid)
|
|
os.link(sentinel1, self.lock_file)
|
|
|
|
utils.cleanup_file_locks()
|
|
|
|
self.assertTrue(os.path.exists(sentinel1))
|
|
self.assertTrue(os.path.exists(self.lock_file))
|
|
self.assertFalse(os.path.exists(sentinel2))
|
|
|
|
os.unlink(sentinel1)
|
|
os.unlink(self.lock_file)
|
|
|
|
def test_clean_stale_with_threads(self):
|
|
"""verify locks for multiple threads are cleaned up """
|
|
|
|
# create sentinels for four threads in our process, and a 'dead'
|
|
# process. no lock.
|
|
sentinel1 = self._create_sentinel(self.hostname, self.pid, 'Default-1')
|
|
sentinel2 = self._create_sentinel(self.hostname, self.pid, 'Default-2')
|
|
sentinel3 = self._create_sentinel(self.hostname, self.pid, 'Default-3')
|
|
sentinel4 = self._create_sentinel(self.hostname, self.pid, 'Default-4')
|
|
sentinel5 = self._create_sentinel(self.hostname, self.dead_pid,
|
|
'Default-1')
|
|
|
|
utils.cleanup_file_locks()
|
|
|
|
self.assertTrue(os.path.exists(sentinel1))
|
|
self.assertTrue(os.path.exists(sentinel2))
|
|
self.assertTrue(os.path.exists(sentinel3))
|
|
self.assertTrue(os.path.exists(sentinel4))
|
|
self.assertFalse(os.path.exists(self.lock_file))
|
|
self.assertFalse(os.path.exists(sentinel5))
|
|
|
|
os.unlink(sentinel1)
|
|
os.unlink(sentinel2)
|
|
os.unlink(sentinel3)
|
|
os.unlink(sentinel4)
|
|
|
|
def test_clean_stale_with_threads_active(self):
|
|
"""verify locks for multiple threads are cleaned up """
|
|
|
|
# create sentinels for four threads in our process, and a 'dead'
|
|
# process
|
|
sentinel1 = self._create_sentinel(self.hostname, self.pid, 'Default-1')
|
|
sentinel2 = self._create_sentinel(self.hostname, self.pid, 'Default-2')
|
|
sentinel3 = self._create_sentinel(self.hostname, self.pid, 'Default-3')
|
|
sentinel4 = self._create_sentinel(self.hostname, self.pid, 'Default-4')
|
|
sentinel5 = self._create_sentinel(self.hostname, self.dead_pid,
|
|
'Default-1')
|
|
|
|
os.link(sentinel1, self.lock_file)
|
|
|
|
utils.cleanup_file_locks()
|
|
|
|
self.assertTrue(os.path.exists(sentinel1))
|
|
self.assertTrue(os.path.exists(sentinel2))
|
|
self.assertTrue(os.path.exists(sentinel3))
|
|
self.assertTrue(os.path.exists(sentinel4))
|
|
self.assertTrue(os.path.exists(self.lock_file))
|
|
self.assertFalse(os.path.exists(sentinel5))
|
|
|
|
os.unlink(sentinel1)
|
|
os.unlink(sentinel2)
|
|
os.unlink(sentinel3)
|
|
os.unlink(sentinel4)
|
|
os.unlink(self.lock_file)
|
|
|
|
def test_clean_bogus_lockfiles(self):
|
|
"""verify lockfiles are cleaned """
|
|
|
|
lock1 = os.path.join(FLAGS.lock_path, 'nova-testlock1.lock')
|
|
lock2 = os.path.join(FLAGS.lock_path, 'nova-testlock2.lock')
|
|
lock3 = os.path.join(FLAGS.lock_path, 'testlock3.lock')
|
|
|
|
open(lock1, 'wb').close()
|
|
open(lock2, 'wb').close()
|
|
open(lock3, 'wb').close()
|
|
|
|
utils.cleanup_file_locks()
|
|
|
|
self.assertFalse(os.path.exists(lock1))
|
|
self.assertFalse(os.path.exists(lock2))
|
|
self.assertTrue(os.path.exists(lock3))
|
|
|
|
os.unlink(lock3)
|
|
|
|
|
|
class AuditPeriodTest(test.TestCase):
|
|
|
|
def setUp(self):
|
|
super(AuditPeriodTest, self).setUp()
|
|
#a fairly random time to test with
|
|
self.test_time = datetime.datetime(second=23,
|
|
minute=12,
|
|
hour=8,
|
|
day=5,
|
|
month=3,
|
|
year=2012)
|
|
timeutils.set_time_override(override_time=self.test_time)
|
|
|
|
def tearDown(self):
|
|
timeutils.clear_time_override()
|
|
super(AuditPeriodTest, self).tearDown()
|
|
|
|
def test_hour(self):
|
|
begin, end = utils.last_completed_audit_period(unit='hour')
|
|
self.assertEquals(begin, datetime.datetime(
|
|
hour=7,
|
|
day=5,
|
|
month=3,
|
|
year=2012))
|
|
self.assertEquals(end, datetime.datetime(
|
|
hour=8,
|
|
day=5,
|
|
month=3,
|
|
year=2012))
|
|
|
|
def test_hour_with_offset_before_current(self):
|
|
begin, end = utils.last_completed_audit_period(unit='hour@10')
|
|
self.assertEquals(begin, datetime.datetime(
|
|
minute=10,
|
|
hour=7,
|
|
day=5,
|
|
month=3,
|
|
year=2012))
|
|
self.assertEquals(end, datetime.datetime(
|
|
minute=10,
|
|
hour=8,
|
|
day=5,
|
|
month=3,
|
|
year=2012))
|
|
|
|
def test_hour_with_offset_after_current(self):
|
|
begin, end = utils.last_completed_audit_period(unit='hour@30')
|
|
self.assertEquals(begin, datetime.datetime(
|
|
minute=30,
|
|
hour=6,
|
|
day=5,
|
|
month=3,
|
|
year=2012))
|
|
self.assertEquals(end, datetime.datetime(
|
|
minute=30,
|
|
hour=7,
|
|
day=5,
|
|
month=3,
|
|
year=2012))
|
|
|
|
def test_day(self):
|
|
begin, end = utils.last_completed_audit_period(unit='day')
|
|
self.assertEquals(begin, datetime.datetime(
|
|
day=4,
|
|
month=3,
|
|
year=2012))
|
|
self.assertEquals(end, datetime.datetime(
|
|
day=5,
|
|
month=3,
|
|
year=2012))
|
|
|
|
def test_day_with_offset_before_current(self):
|
|
begin, end = utils.last_completed_audit_period(unit='day@6')
|
|
self.assertEquals(begin, datetime.datetime(
|
|
hour=6,
|
|
day=4,
|
|
month=3,
|
|
year=2012))
|
|
self.assertEquals(end, datetime.datetime(
|
|
hour=6,
|
|
day=5,
|
|
month=3,
|
|
year=2012))
|
|
|
|
def test_day_with_offset_after_current(self):
|
|
begin, end = utils.last_completed_audit_period(unit='day@10')
|
|
self.assertEquals(begin, datetime.datetime(
|
|
hour=10,
|
|
day=3,
|
|
month=3,
|
|
year=2012))
|
|
self.assertEquals(end, datetime.datetime(
|
|
hour=10,
|
|
day=4,
|
|
month=3,
|
|
year=2012))
|
|
|
|
def test_month(self):
|
|
begin, end = utils.last_completed_audit_period(unit='month')
|
|
self.assertEquals(begin, datetime.datetime(
|
|
day=1,
|
|
month=2,
|
|
year=2012))
|
|
self.assertEquals(end, datetime.datetime(
|
|
day=1,
|
|
month=3,
|
|
year=2012))
|
|
|
|
def test_month_with_offset_before_current(self):
|
|
begin, end = utils.last_completed_audit_period(unit='month@2')
|
|
self.assertEquals(begin, datetime.datetime(
|
|
day=2,
|
|
month=2,
|
|
year=2012))
|
|
self.assertEquals(end, datetime.datetime(
|
|
day=2,
|
|
month=3,
|
|
year=2012))
|
|
|
|
def test_month_with_offset_after_current(self):
|
|
begin, end = utils.last_completed_audit_period(unit='month@15')
|
|
self.assertEquals(begin, datetime.datetime(
|
|
day=15,
|
|
month=1,
|
|
year=2012))
|
|
self.assertEquals(end, datetime.datetime(
|
|
day=15,
|
|
month=2,
|
|
year=2012))
|
|
|
|
def test_year(self):
|
|
begin, end = utils.last_completed_audit_period(unit='year')
|
|
self.assertEquals(begin, datetime.datetime(
|
|
day=1,
|
|
month=1,
|
|
year=2011))
|
|
self.assertEquals(end, datetime.datetime(
|
|
day=1,
|
|
month=1,
|
|
year=2012))
|
|
|
|
def test_year_with_offset_before_current(self):
|
|
begin, end = utils.last_completed_audit_period(unit='year@2')
|
|
self.assertEquals(begin, datetime.datetime(
|
|
day=1,
|
|
month=2,
|
|
year=2011))
|
|
self.assertEquals(end, datetime.datetime(
|
|
day=1,
|
|
month=2,
|
|
year=2012))
|
|
|
|
def test_year_with_offset_after_current(self):
|
|
begin, end = utils.last_completed_audit_period(unit='year@6')
|
|
self.assertEquals(begin, datetime.datetime(
|
|
day=1,
|
|
month=6,
|
|
year=2010))
|
|
self.assertEquals(end, datetime.datetime(
|
|
day=1,
|
|
month=6,
|
|
year=2011))
|