Merge "Implement compare-and-swap for instance update"
This commit is contained in:
commit
998fd4ccb2
@ -730,17 +730,18 @@ def instance_get_all_hung_in_rebooting(context, reboot_window):
|
||||
return IMPL.instance_get_all_hung_in_rebooting(context, reboot_window)
|
||||
|
||||
|
||||
def instance_update(context, instance_uuid, values):
|
||||
def instance_update(context, instance_uuid, values, expected=None):
|
||||
"""Set the given properties on an instance and update it.
|
||||
|
||||
Raises NotFound if instance does not exist.
|
||||
|
||||
"""
|
||||
return IMPL.instance_update(context, instance_uuid, values)
|
||||
return IMPL.instance_update(context, instance_uuid, values,
|
||||
expected=expected)
|
||||
|
||||
|
||||
def instance_update_and_get_original(context, instance_uuid, values,
|
||||
columns_to_join=None):
|
||||
columns_to_join=None, expected=None):
|
||||
"""Set the given properties on an instance and update it. Return
|
||||
a shallow copy of the original instance reference, as well as the
|
||||
updated one.
|
||||
@ -754,7 +755,8 @@ def instance_update_and_get_original(context, instance_uuid, values,
|
||||
Raises NotFound if instance does not exist.
|
||||
"""
|
||||
rv = IMPL.instance_update_and_get_original(context, instance_uuid, values,
|
||||
columns_to_join=columns_to_join)
|
||||
columns_to_join=columns_to_join,
|
||||
expected=expected)
|
||||
return rv
|
||||
|
||||
|
||||
|
@ -30,6 +30,7 @@ from oslo_db import api as oslo_db_api
|
||||
from oslo_db import exception as db_exc
|
||||
from oslo_db import options as oslo_db_options
|
||||
from oslo_db.sqlalchemy import session as db_session
|
||||
from oslo_db.sqlalchemy import update_match
|
||||
from oslo_db.sqlalchemy import utils as sqlalchemyutils
|
||||
from oslo_log import log as logging
|
||||
from oslo_utils import excutils
|
||||
@ -2426,15 +2427,29 @@ def instance_get_all_hung_in_rebooting(context, reboot_window):
|
||||
manual_joins=[])
|
||||
|
||||
|
||||
@require_context
|
||||
def instance_update(context, instance_uuid, values):
|
||||
instance_ref = _instance_update(context, instance_uuid, values)[1]
|
||||
return instance_ref
|
||||
def _retry_instance_update():
|
||||
"""Wrap with oslo_db_api.wrap_db_retry, and also retry on
|
||||
UnknownInstanceUpdateConflict.
|
||||
"""
|
||||
exception_checker = \
|
||||
lambda exc: isinstance(exc, (exception.UnknownInstanceUpdateConflict,))
|
||||
return oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True,
|
||||
exception_checker=exception_checker)
|
||||
|
||||
|
||||
@require_context
|
||||
@_retry_instance_update()
|
||||
def instance_update(context, instance_uuid, values, expected=None):
|
||||
session = get_session()
|
||||
with session.begin():
|
||||
return _instance_update(context, session, instance_uuid,
|
||||
values, expected)
|
||||
|
||||
|
||||
@require_context
|
||||
@_retry_instance_update()
|
||||
def instance_update_and_get_original(context, instance_uuid, values,
|
||||
columns_to_join=None):
|
||||
columns_to_join=None, expected=None):
|
||||
"""Set the given properties on an instance and update it. Return
|
||||
a shallow copy of the original instance reference, as well as the
|
||||
updated one.
|
||||
@ -2451,9 +2466,14 @@ def instance_update_and_get_original(context, instance_uuid, values,
|
||||
|
||||
Raises NotFound if instance does not exist.
|
||||
"""
|
||||
return _instance_update(context, instance_uuid, values,
|
||||
copy_old_instance=True,
|
||||
columns_to_join=columns_to_join)
|
||||
session = get_session()
|
||||
with session.begin():
|
||||
instance_ref = _instance_get_by_uuid(context, instance_uuid,
|
||||
columns_to_join=columns_to_join,
|
||||
session=session)
|
||||
return (copy.copy(instance_ref),
|
||||
_instance_update(context, session, instance_uuid, values,
|
||||
expected, original=instance_ref))
|
||||
|
||||
|
||||
# NOTE(danms): This updates the instance's metadata list in-place and in
|
||||
@ -2490,73 +2510,122 @@ def _instance_metadata_update_in_place(context, instance, metadata_type, model,
|
||||
instance[metadata_type].append(newitem)
|
||||
|
||||
|
||||
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
|
||||
def _instance_update(context, instance_uuid, values, copy_old_instance=False,
|
||||
columns_to_join=None):
|
||||
session = get_session()
|
||||
|
||||
def _instance_update(context, session, instance_uuid, values, expected,
|
||||
original=None):
|
||||
if not uuidutils.is_uuid_like(instance_uuid):
|
||||
raise exception.InvalidUUID(instance_uuid)
|
||||
|
||||
with session.begin():
|
||||
instance_ref = _instance_get_by_uuid(context, instance_uuid,
|
||||
session=session,
|
||||
columns_to_join=columns_to_join)
|
||||
if "expected_task_state" in values:
|
||||
# it is not a db column so always pop out
|
||||
expected = values.pop("expected_task_state")
|
||||
if not isinstance(expected, (tuple, list, set)):
|
||||
expected = (expected,)
|
||||
actual_state = instance_ref["task_state"]
|
||||
if actual_state not in expected:
|
||||
if actual_state == task_states.DELETING:
|
||||
raise exception.UnexpectedDeletingTaskStateError(
|
||||
actual=actual_state, expected=expected)
|
||||
else:
|
||||
raise exception.UnexpectedTaskStateError(
|
||||
actual=actual_state, expected=expected)
|
||||
if "expected_vm_state" in values:
|
||||
expected = values.pop("expected_vm_state")
|
||||
if not isinstance(expected, (tuple, list, set)):
|
||||
expected = (expected,)
|
||||
actual_state = instance_ref["vm_state"]
|
||||
if actual_state not in expected:
|
||||
raise exception.UnexpectedVMStateError(actual=actual_state,
|
||||
expected=expected)
|
||||
if expected is None:
|
||||
expected = {}
|
||||
else:
|
||||
# Coerce all single values to singleton lists
|
||||
expected = {k: [None] if v is None else sqlalchemyutils.to_list(v)
|
||||
for (k, v) in six.iteritems(expected)}
|
||||
|
||||
instance_hostname = instance_ref['hostname'] or ''
|
||||
if ("hostname" in values and
|
||||
values["hostname"].lower() != instance_hostname.lower()):
|
||||
_validate_unique_server_name(context,
|
||||
session,
|
||||
values['hostname'])
|
||||
# Extract 'expected_' values from values dict, as these aren't actually
|
||||
# updates
|
||||
for field in ('task_state', 'vm_state'):
|
||||
expected_field = 'expected_%s' % field
|
||||
if expected_field in values:
|
||||
value = values.pop(expected_field, None)
|
||||
# Coerce all single values to singleton lists
|
||||
if value is None:
|
||||
expected[field] = [None]
|
||||
else:
|
||||
expected[field] = sqlalchemyutils.to_list(value)
|
||||
|
||||
if copy_old_instance:
|
||||
old_instance_ref = copy.copy(instance_ref)
|
||||
# Values which need to be updated separately
|
||||
metadata = values.pop('metadata', None)
|
||||
system_metadata = values.pop('system_metadata', None)
|
||||
|
||||
_handle_objects_related_type_conversions(values)
|
||||
|
||||
# Hostname is potentially unique, but this is enforced in code rather
|
||||
# than the DB. The query below races, but the number of users of
|
||||
# osapi_compute_unique_server_name_scope is small, and a robust fix
|
||||
# will be complex. This is intentionally left as is for the moment.
|
||||
if 'hostname' in values:
|
||||
_validate_unique_server_name(context, session, values['hostname'])
|
||||
|
||||
compare = models.Instance(uuid=instance_uuid, **expected)
|
||||
try:
|
||||
instance_ref = model_query(context, models.Instance,
|
||||
project_only=True, session=session).\
|
||||
update_on_match(compare, 'uuid', values)
|
||||
except update_match.NoRowsMatched:
|
||||
# Update failed. Try to find why and raise a specific error.
|
||||
|
||||
# We should get here only because our expected values were not current
|
||||
# when update_on_match executed. Having failed, we now have a hint that
|
||||
# the values are out of date and should check them.
|
||||
|
||||
# This code is made more complex because we are using repeatable reads.
|
||||
# If we have previously read the original instance in the current
|
||||
# transaction, reading it again will return the same data, even though
|
||||
# the above update failed because it has changed: it is not possible to
|
||||
# determine what has changed in this transaction. In this case we raise
|
||||
# UnknownInstanceUpdateConflict, which will cause the operation to be
|
||||
# retried in a new transaction.
|
||||
|
||||
# Because of the above, if we have previously read the instance in the
|
||||
# current transaction it will have been passed as 'original', and there
|
||||
# is no point refreshing it. If we have not previously read the
|
||||
# instance, we can fetch it here and we will get fresh data.
|
||||
if original is None:
|
||||
original = _instance_get_by_uuid(context, instance_uuid,
|
||||
session=session)
|
||||
|
||||
conflicts_expected = {}
|
||||
conflicts_actual = {}
|
||||
for (field, expected_values) in six.iteritems(expected):
|
||||
actual = original[field]
|
||||
if actual not in expected_values:
|
||||
conflicts_expected[field] = expected_values
|
||||
conflicts_actual[field] = actual
|
||||
|
||||
# Exception properties
|
||||
exc_props = {
|
||||
'instance_uuid': instance_uuid,
|
||||
'expected': conflicts_expected,
|
||||
'actual': conflicts_actual
|
||||
}
|
||||
|
||||
# There was a conflict, but something (probably the MySQL read view,
|
||||
# but possibly an exceptionally unlikely second race) is preventing us
|
||||
# from seeing what it is. When we go round again we'll get a fresh
|
||||
# transaction and a fresh read view.
|
||||
if len(conflicts_actual) == 0:
|
||||
raise exception.UnknownInstanceUpdateConflict(**exc_props)
|
||||
|
||||
# Task state gets special handling for convenience. We raise the
|
||||
# specific error UnexpectedDeletingTaskStateError or
|
||||
# UnexpectedTaskStateError as appropriate
|
||||
if 'task_state' in conflicts_actual:
|
||||
conflict_task_state = conflicts_actual['task_state']
|
||||
if conflict_task_state == task_states.DELETING:
|
||||
exc = exception.UnexpectedDeletingTaskStateError
|
||||
else:
|
||||
exc = exception.UnexpectedTaskStateError
|
||||
|
||||
# Everything else is an InstanceUpdateConflict
|
||||
else:
|
||||
old_instance_ref = None
|
||||
exc = exception.InstanceUpdateConflict
|
||||
|
||||
metadata = values.get('metadata')
|
||||
if metadata is not None:
|
||||
_instance_metadata_update_in_place(context, instance_ref,
|
||||
'metadata',
|
||||
models.InstanceMetadata,
|
||||
values.pop('metadata'),
|
||||
session)
|
||||
raise exc(**exc_props)
|
||||
|
||||
system_metadata = values.get('system_metadata')
|
||||
if system_metadata is not None:
|
||||
_instance_metadata_update_in_place(context, instance_ref,
|
||||
'system_metadata',
|
||||
models.InstanceSystemMetadata,
|
||||
values.pop('system_metadata'),
|
||||
session)
|
||||
if metadata is not None:
|
||||
_instance_metadata_update_in_place(context, instance_ref,
|
||||
'metadata',
|
||||
models.InstanceMetadata,
|
||||
metadata, session)
|
||||
|
||||
_handle_objects_related_type_conversions(values)
|
||||
instance_ref.update(values)
|
||||
session.add(instance_ref)
|
||||
if system_metadata is not None:
|
||||
_instance_metadata_update_in_place(context, instance_ref,
|
||||
'system_metadata',
|
||||
models.InstanceSystemMetadata,
|
||||
system_metadata, session)
|
||||
|
||||
return (old_instance_ref, instance_ref)
|
||||
return instance_ref
|
||||
|
||||
|
||||
def instance_add_security_group(context, instance_uuid, security_group_id):
|
||||
|
Loading…
x
Reference in New Issue
Block a user