Merge "Implement compare-and-swap for instance update"

This commit is contained in:
Jenkins 2015-08-07 18:28:24 +00:00 committed by Gerrit Code Review
commit acdb72bfac
8 changed files with 346 additions and 90 deletions

View File

@ -730,17 +730,18 @@ def instance_get_all_hung_in_rebooting(context, reboot_window):
return IMPL.instance_get_all_hung_in_rebooting(context, reboot_window)
def instance_update(context, instance_uuid, values):
def instance_update(context, instance_uuid, values, expected=None):
"""Set the given properties on an instance and update it.
Raises NotFound if instance does not exist.
"""
return IMPL.instance_update(context, instance_uuid, values)
return IMPL.instance_update(context, instance_uuid, values,
expected=expected)
def instance_update_and_get_original(context, instance_uuid, values,
columns_to_join=None):
columns_to_join=None, expected=None):
"""Set the given properties on an instance and update it. Return
a shallow copy of the original instance reference, as well as the
updated one.
@ -754,7 +755,8 @@ def instance_update_and_get_original(context, instance_uuid, values,
Raises NotFound if instance does not exist.
"""
rv = IMPL.instance_update_and_get_original(context, instance_uuid, values,
columns_to_join=columns_to_join)
columns_to_join=columns_to_join,
expected=expected)
return rv

View File

@ -30,6 +30,7 @@ from oslo_db import api as oslo_db_api
from oslo_db import exception as db_exc
from oslo_db import options as oslo_db_options
from oslo_db.sqlalchemy import session as db_session
from oslo_db.sqlalchemy import update_match
from oslo_db.sqlalchemy import utils as sqlalchemyutils
from oslo_log import log as logging
from oslo_utils import excutils
@ -2426,15 +2427,29 @@ def instance_get_all_hung_in_rebooting(context, reboot_window):
manual_joins=[])
@require_context
def instance_update(context, instance_uuid, values):
instance_ref = _instance_update(context, instance_uuid, values)[1]
return instance_ref
def _retry_instance_update():
"""Wrap with oslo_db_api.wrap_db_retry, and also retry on
UnknownInstanceUpdateConflict.
"""
exception_checker = \
lambda exc: isinstance(exc, (exception.UnknownInstanceUpdateConflict,))
return oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True,
exception_checker=exception_checker)
@require_context
@_retry_instance_update()
def instance_update(context, instance_uuid, values, expected=None):
session = get_session()
with session.begin():
return _instance_update(context, session, instance_uuid,
values, expected)
@require_context
@_retry_instance_update()
def instance_update_and_get_original(context, instance_uuid, values,
columns_to_join=None):
columns_to_join=None, expected=None):
"""Set the given properties on an instance and update it. Return
a shallow copy of the original instance reference, as well as the
updated one.
@ -2451,9 +2466,14 @@ def instance_update_and_get_original(context, instance_uuid, values,
Raises NotFound if instance does not exist.
"""
return _instance_update(context, instance_uuid, values,
copy_old_instance=True,
columns_to_join=columns_to_join)
session = get_session()
with session.begin():
instance_ref = _instance_get_by_uuid(context, instance_uuid,
columns_to_join=columns_to_join,
session=session)
return (copy.copy(instance_ref),
_instance_update(context, session, instance_uuid, values,
expected, original=instance_ref))
# NOTE(danms): This updates the instance's metadata list in-place and in
@ -2490,73 +2510,122 @@ def _instance_metadata_update_in_place(context, instance, metadata_type, model,
instance[metadata_type].append(newitem)
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
def _instance_update(context, instance_uuid, values, copy_old_instance=False,
columns_to_join=None):
session = get_session()
def _instance_update(context, session, instance_uuid, values, expected,
original=None):
if not uuidutils.is_uuid_like(instance_uuid):
raise exception.InvalidUUID(instance_uuid)
with session.begin():
instance_ref = _instance_get_by_uuid(context, instance_uuid,
session=session,
columns_to_join=columns_to_join)
if "expected_task_state" in values:
# it is not a db column so always pop out
expected = values.pop("expected_task_state")
if not isinstance(expected, (tuple, list, set)):
expected = (expected,)
actual_state = instance_ref["task_state"]
if actual_state not in expected:
if actual_state == task_states.DELETING:
raise exception.UnexpectedDeletingTaskStateError(
actual=actual_state, expected=expected)
else:
raise exception.UnexpectedTaskStateError(
actual=actual_state, expected=expected)
if "expected_vm_state" in values:
expected = values.pop("expected_vm_state")
if not isinstance(expected, (tuple, list, set)):
expected = (expected,)
actual_state = instance_ref["vm_state"]
if actual_state not in expected:
raise exception.UnexpectedVMStateError(actual=actual_state,
expected=expected)
if expected is None:
expected = {}
else:
# Coerce all single values to singleton lists
expected = {k: [None] if v is None else sqlalchemyutils.to_list(v)
for (k, v) in six.iteritems(expected)}
instance_hostname = instance_ref['hostname'] or ''
if ("hostname" in values and
values["hostname"].lower() != instance_hostname.lower()):
_validate_unique_server_name(context,
session,
values['hostname'])
# Extract 'expected_' values from values dict, as these aren't actually
# updates
for field in ('task_state', 'vm_state'):
expected_field = 'expected_%s' % field
if expected_field in values:
value = values.pop(expected_field, None)
# Coerce all single values to singleton lists
if value is None:
expected[field] = [None]
else:
expected[field] = sqlalchemyutils.to_list(value)
if copy_old_instance:
old_instance_ref = copy.copy(instance_ref)
# Values which need to be updated separately
metadata = values.pop('metadata', None)
system_metadata = values.pop('system_metadata', None)
_handle_objects_related_type_conversions(values)
# Hostname is potentially unique, but this is enforced in code rather
# than the DB. The query below races, but the number of users of
# osapi_compute_unique_server_name_scope is small, and a robust fix
# will be complex. This is intentionally left as is for the moment.
if 'hostname' in values:
_validate_unique_server_name(context, session, values['hostname'])
compare = models.Instance(uuid=instance_uuid, **expected)
try:
instance_ref = model_query(context, models.Instance,
project_only=True, session=session).\
update_on_match(compare, 'uuid', values)
except update_match.NoRowsMatched:
# Update failed. Try to find why and raise a specific error.
# We should get here only because our expected values were not current
# when update_on_match executed. Having failed, we now have a hint that
# the values are out of date and should check them.
# This code is made more complex because we are using repeatable reads.
# If we have previously read the original instance in the current
# transaction, reading it again will return the same data, even though
# the above update failed because it has changed: it is not possible to
# determine what has changed in this transaction. In this case we raise
# UnknownInstanceUpdateConflict, which will cause the operation to be
# retried in a new transaction.
# Because of the above, if we have previously read the instance in the
# current transaction it will have been passed as 'original', and there
# is no point refreshing it. If we have not previously read the
# instance, we can fetch it here and we will get fresh data.
if original is None:
original = _instance_get_by_uuid(context, instance_uuid,
session=session)
conflicts_expected = {}
conflicts_actual = {}
for (field, expected_values) in six.iteritems(expected):
actual = original[field]
if actual not in expected_values:
conflicts_expected[field] = expected_values
conflicts_actual[field] = actual
# Exception properties
exc_props = {
'instance_uuid': instance_uuid,
'expected': conflicts_expected,
'actual': conflicts_actual
}
# There was a conflict, but something (probably the MySQL read view,
# but possibly an exceptionally unlikely second race) is preventing us
# from seeing what it is. When we go round again we'll get a fresh
# transaction and a fresh read view.
if len(conflicts_actual) == 0:
raise exception.UnknownInstanceUpdateConflict(**exc_props)
# Task state gets special handling for convenience. We raise the
# specific error UnexpectedDeletingTaskStateError or
# UnexpectedTaskStateError as appropriate
if 'task_state' in conflicts_actual:
conflict_task_state = conflicts_actual['task_state']
if conflict_task_state == task_states.DELETING:
exc = exception.UnexpectedDeletingTaskStateError
else:
exc = exception.UnexpectedTaskStateError
# Everything else is an InstanceUpdateConflict
else:
old_instance_ref = None
exc = exception.InstanceUpdateConflict
metadata = values.get('metadata')
if metadata is not None:
_instance_metadata_update_in_place(context, instance_ref,
'metadata',
models.InstanceMetadata,
values.pop('metadata'),
session)
raise exc(**exc_props)
system_metadata = values.get('system_metadata')
if system_metadata is not None:
_instance_metadata_update_in_place(context, instance_ref,
'system_metadata',
models.InstanceSystemMetadata,
values.pop('system_metadata'),
session)
if metadata is not None:
_instance_metadata_update_in_place(context, instance_ref,
'metadata',
models.InstanceMetadata,
metadata, session)
_handle_objects_related_type_conversions(values)
instance_ref.update(values)
session.add(instance_ref)
if system_metadata is not None:
_instance_metadata_update_in_place(context, instance_ref,
'system_metadata',
models.InstanceSystemMetadata,
system_metadata, session)
return (old_instance_ref, instance_ref)
return instance_ref
def instance_add_security_group(context, instance_uuid, security_group_id):

View File

@ -1433,9 +1433,18 @@ class InstanceUserDataMalformed(NovaException):
msg_fmt = _("User data needs to be valid base 64.")
class UnexpectedTaskStateError(NovaException):
msg_fmt = _("Unexpected task state: expecting %(expected)s but "
"the actual state is %(actual)s")
class InstanceUpdateConflict(NovaException):
msg_fmt = _("Conflict updating instance %(instance_uuid)s. "
"Expected: %(expected)s. Actual: %(actual)s")
class UnknownInstanceUpdateConflict(InstanceUpdateConflict):
msg_fmt = _("Conflict updating instance %(instance_uuid)s, but we were "
"unable to determine the cause")
class UnexpectedTaskStateError(InstanceUpdateConflict):
pass
class UnexpectedDeletingTaskStateError(UnexpectedTaskStateError):
@ -1451,11 +1460,6 @@ class InstanceActionEventNotFound(NovaException):
msg_fmt = _("Event %(event)s not found for action id %(action_id)s")
class UnexpectedVMStateError(NovaException):
msg_fmt = _("Unexpected VM state: expecting %(expected)s but "
"the actual state is %(actual)s")
class CryptoCAFileNotFound(FileNotFound):
msg_fmt = _("The CA file for %(project)s could not be found")

View File

@ -1786,7 +1786,9 @@ class ComputeTestCase(BaseTestCase):
with mock.patch.object(instance, 'save') as mock_save:
mock_save.side_effect = exception.UnexpectedDeletingTaskStateError(
actual='foo', expected='bar')
instance_uuid=instance['uuid'],
expected={'task_state': 'bar'},
actual={'task_state': 'foo'})
self.compute.build_and_run_instance(self.context, instance, {}, {},
{}, block_device_mapping=[])
self.assertTrue(mock_save.called)
@ -3159,7 +3161,9 @@ class ComputeTestCase(BaseTestCase):
def test_snapshot_fails_with_task_state_error(self):
deleting_state_error = exception.UnexpectedDeletingTaskStateError(
expected=task_states.IMAGE_SNAPSHOT, actual=task_states.DELETING)
instance_uuid='fake_uuid',
expected={'task_state': task_states.IMAGE_SNAPSHOT},
actual={'task_state': task_states.DELETING})
self._test_snapshot_deletes_image_on_failure(
'error', deleting_state_error)
self.assertTrue(self.fake_image_delete_called)

View File

@ -1268,7 +1268,9 @@ class _ComputeAPIUnitTestMixIn(object):
self.context, delta, fake_inst).AndReturn(fake_quotas)
exc = exception.UnexpectedTaskStateError(
actual=task_states.RESIZE_REVERTING, expected=None)
instance_uuid=fake_inst['uuid'],
actual={'task_state': task_states.RESIZE_REVERTING},
expected={'task_state': [None]})
fake_inst.save(expected_task_state=[None]).AndRaise(exc)
fake_quotas.rollback()

View File

@ -3032,8 +3032,8 @@ class ComputeManagerBuildInstanceTestCase(test.NoDBTestCase):
def test_build_and_run_unexpecteddeleting_exception(self):
self._test_build_and_run_exceptions(
exception.UnexpectedDeletingTaskStateError(expected='',
actual=''))
exception.UnexpectedDeletingTaskStateError(
instance_uuid='fake_uuid', expected={}, actual={}))
def test_build_and_run_buildabort_exception(self):
self._test_build_and_run_exceptions(exception.BuildAbortException(
@ -3281,7 +3281,9 @@ class ComputeManagerBuildInstanceTestCase(test.NoDBTestCase):
return_value=self.network_info),
mock.patch.object(self.instance, 'save',
side_effect=exception.UnexpectedDeletingTaskStateError(
actual=task_states.DELETING, expected='None')),
instance_uuid='fake_uuid',
actual={'task_state': task_states.DELETING},
expected={'task_state': None})),
) as (_build_networks_for_instance, save):
try:
@ -3319,8 +3321,10 @@ class ComputeManagerBuildInstanceTestCase(test.NoDBTestCase):
'_build_networks_for_instance') as _build_networks:
exc = exception.UnexpectedDeletingTaskStateError
_build_networks.side_effect = exc(actual=task_states.DELETING,
expected='None')
_build_networks.side_effect = exc(
instance_uuid='fake_uuid',
actual={'task_state': task_states.DELETING},
expected={'task_state': None})
try:
with self.compute._build_resources(self.context, self.instance,
@ -3413,7 +3417,7 @@ class ComputeManagerBuildInstanceTestCase(test.NoDBTestCase):
self, mock_save, mock_build_network, mock_info_wait):
mock_build_network.return_value = self.network_info
mock_save.side_effect = exception.UnexpectedTaskStateError(
expected='', actual='')
instance_uuid='fake_uuid', expected={}, actual={})
try:
with self.compute._build_resources(self.context, self.instance,
self.requested_networks, self.security_groups,

View File

@ -460,8 +460,9 @@ class ConductorTestCase(_BaseTestCase, test.TestCase):
def test_instance_update_expected_exceptions(self):
errors = (exc.InvalidUUID(uuid='foo'),
exc.InstanceNotFound(instance_id=1),
exc.UnexpectedTaskStateError(expected='foo',
actual='bar'))
exc.UnexpectedTaskStateError(instance_uuid='fake_uuid',
expected={'task_state': 'foo'},
actual={'task_state': 'bar'}))
self._test_expected_exceptions(
'instance_update', self.conductor.instance_update,
errors, None, {'foo': 'bar'}, None)

View File

@ -29,6 +29,7 @@ from oslo_config import cfg
from oslo_db import api as oslo_db_api
from oslo_db import exception as db_exc
from oslo_db.sqlalchemy import test_base
from oslo_db.sqlalchemy import update_match
from oslo_db.sqlalchemy import utils as sqlalchemyutils
from oslo_serialization import jsonutils
from oslo_utils import timeutils
@ -48,6 +49,7 @@ from sqlalchemy import Table
from nova import block_device
from nova.compute import arch
from nova.compute import task_states
from nova.compute import vm_states
from nova import context
from nova import db
@ -2465,7 +2467,7 @@ class InstanceTestCase(test.TestCase, ModelsObjectComparatorMixin):
def test_instance_update_with_unexpected_vm_state(self):
instance = self.create_instance_with_args(vm_state='foo')
self.assertRaises(exception.UnexpectedVMStateError,
self.assertRaises(exception.InstanceUpdateConflict,
db.instance_update, self.ctxt, instance['uuid'],
{'host': 'h1', 'expected_vm_state': ('spam', 'bar')})
@ -2532,7 +2534,7 @@ class InstanceTestCase(test.TestCase, ModelsObjectComparatorMixin):
# Make sure instance faults is deleted as well
self.assertEqual(0, len(faults[uuid]))
def test_instance_update_with_and_get_original(self):
def test_instance_update_and_get_original(self):
instance = self.create_instance_with_args(vm_state='building')
(old_ref, new_ref) = db.instance_update_and_get_original(self.ctxt,
instance['uuid'], {'vm_state': 'needscoffee'})
@ -2588,6 +2590,174 @@ class InstanceTestCase(test.TestCase, ModelsObjectComparatorMixin):
# 4. the "old" object is detached from this Session.
self.assertTrue(old_insp.detached)
def test_instance_update_and_get_original_conflict_race(self):
# Ensure that we retry if update_on_match fails for no discernable
# reason
instance = self.create_instance_with_args()
orig_update_on_match = update_match.update_on_match
# Reproduce the conditions of a race between fetching and updating the
# instance by making update_on_match fail for no discernable reason the
# first time it is called, but work normally the second time.
with mock.patch.object(update_match, 'update_on_match',
side_effect=[update_match.NoRowsMatched,
orig_update_on_match]):
db.instance_update_and_get_original(
self.ctxt, instance['uuid'], {'metadata': {'mk1': 'mv3'}})
self.assertEqual(update_match.update_on_match.call_count, 2)
def test_instance_update_and_get_original_conflict_race_fallthrough(self):
# Ensure that is update_match continuously fails for no discernable
# reason, we evantually raise UnknownInstanceUpdateConflict
instance = self.create_instance_with_args()
# Reproduce the conditions of a race between fetching and updating the
# instance by making update_on_match fail for no discernable reason.
with mock.patch.object(update_match, 'update_on_match',
side_effect=update_match.NoRowsMatched):
self.assertRaises(exception.UnknownInstanceUpdateConflict,
db.instance_update_and_get_original,
self.ctxt,
instance['uuid'],
{'metadata': {'mk1': 'mv3'}})
def test_instance_update_and_get_original_expected_host(self):
# Ensure that we allow update when expecting a host field
instance = self.create_instance_with_args()
(orig, new) = db.instance_update_and_get_original(
self.ctxt, instance['uuid'], {'host': None},
expected={'host': 'h1'})
self.assertIsNone(new['host'])
def test_instance_update_and_get_original_expected_host_fail(self):
# Ensure that we detect a changed expected host and raise
# InstanceUpdateConflict
instance = self.create_instance_with_args()
try:
db.instance_update_and_get_original(
self.ctxt, instance['uuid'], {'host': None},
expected={'host': 'h2'})
except exception.InstanceUpdateConflict as ex:
self.assertEqual(ex.kwargs['instance_uuid'], instance['uuid'])
self.assertEqual(ex.kwargs['actual'], {'host': 'h1'})
self.assertEqual(ex.kwargs['expected'], {'host': ['h2']})
else:
self.fail('InstanceUpdateConflict was not raised')
def test_instance_update_and_get_original_expected_host_none(self):
# Ensure that we allow update when expecting a host field of None
instance = self.create_instance_with_args(host=None)
(old, new) = db.instance_update_and_get_original(
self.ctxt, instance['uuid'], {'host': 'h1'},
expected={'host': None})
self.assertEqual('h1', new['host'])
def test_instance_update_and_get_original_expected_host_none_fail(self):
# Ensure that we detect a changed expected host of None and raise
# InstanceUpdateConflict
instance = self.create_instance_with_args()
try:
db.instance_update_and_get_original(
self.ctxt, instance['uuid'], {'host': None},
expected={'host': None})
except exception.InstanceUpdateConflict as ex:
self.assertEqual(ex.kwargs['instance_uuid'], instance['uuid'])
self.assertEqual(ex.kwargs['actual'], {'host': 'h1'})
self.assertEqual(ex.kwargs['expected'], {'host': [None]})
else:
self.fail('InstanceUpdateConflict was not raised')
def test_instance_update_and_get_original_expected_task_state_single_fail(self): # noqa
# Ensure that we detect a changed expected task and raise
# UnexpectedTaskStateError
instance = self.create_instance_with_args()
try:
db.instance_update_and_get_original(
self.ctxt, instance['uuid'], {
'host': None,
'expected_task_state': task_states.SCHEDULING
})
except exception.UnexpectedTaskStateError as ex:
self.assertEqual(ex.kwargs['instance_uuid'], instance['uuid'])
self.assertEqual(ex.kwargs['actual'], {'task_state': None})
self.assertEqual(ex.kwargs['expected'],
{'task_state': [task_states.SCHEDULING]})
else:
self.fail('UnexpectedTaskStateError was not raised')
def test_instance_update_and_get_original_expected_task_state_single_pass(self): # noqa
# Ensure that we allow an update when expected task is correct
instance = self.create_instance_with_args()
(orig, new) = db.instance_update_and_get_original(
self.ctxt, instance['uuid'], {
'host': None,
'expected_task_state': None
})
self.assertIsNone(new['host'])
def test_instance_update_and_get_original_expected_task_state_multi_fail(self): # noqa
# Ensure that we detect a changed expected task and raise
# UnexpectedTaskStateError when there are multiple potential expected
# tasks
instance = self.create_instance_with_args()
try:
db.instance_update_and_get_original(
self.ctxt, instance['uuid'], {
'host': None,
'expected_task_state': [task_states.SCHEDULING,
task_states.REBUILDING]
})
except exception.UnexpectedTaskStateError as ex:
self.assertEqual(ex.kwargs['instance_uuid'], instance['uuid'])
self.assertEqual(ex.kwargs['actual'], {'task_state': None})
self.assertEqual(ex.kwargs['expected'],
{'task_state': [task_states.SCHEDULING,
task_states.REBUILDING]})
else:
self.fail('UnexpectedTaskStateError was not raised')
def test_instance_update_and_get_original_expected_task_state_multi_pass(self): # noqa
# Ensure that we allow an update when expected task is in a list of
# expected tasks
instance = self.create_instance_with_args()
(orig, new) = db.instance_update_and_get_original(
self.ctxt, instance['uuid'], {
'host': None,
'expected_task_state': [task_states.SCHEDULING, None]
})
self.assertIsNone(new['host'])
def test_instance_update_and_get_original_expected_task_state_deleting(self): # noqa
# Ensure that we raise UnepectedDeletingTaskStateError when task state
# is not as expected, and it is DELETING
instance = self.create_instance_with_args(
task_state=task_states.DELETING)
try:
db.instance_update_and_get_original(
self.ctxt, instance['uuid'], {
'host': None,
'expected_task_state': task_states.SCHEDULING
})
except exception.UnexpectedDeletingTaskStateError as ex:
self.assertEqual(ex.kwargs['instance_uuid'], instance['uuid'])
self.assertEqual(ex.kwargs['actual'],
{'task_state': task_states.DELETING})
self.assertEqual(ex.kwargs['expected'],
{'task_state': [task_states.SCHEDULING]})
else:
self.fail('UnexpectedDeletingTaskStateError was not raised')
def test_instance_update_unique_name(self):
context1 = context.RequestContext('user1', 'p1')
context2 = context.RequestContext('user2', 'p2')