Merge "Consolidate resource locking with state changes"
commit c2995c73df
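This merge folds the old separate lock acquire/release round trips into the resource's normal state writes: under convergence, moving to IN_PROGRESS takes the engine lock and reaching a terminal state releases it, both through a single compare-and-swap update keyed on engine_id and an atomic_key counter. A minimal standalone sketch of that pattern (illustrative only, not Heat code; the class and function names below are made up):

# Sketch of the compare-and-swap locking this commit consolidates into store().
class FakeResourceRow(object):
    """Stand-in for a DB row with an engine_id lock and a version counter."""

    def __init__(self):
        self.engine_id = None
        self.atomic_key = None
        self.values = {}

    def select_and_update(self, values, expected_engine_id, atomic_key):
        # Succeeds only if nobody else changed the row since we last read it.
        if self.engine_id != expected_engine_id or self.atomic_key != atomic_key:
            return False
        self.values.update(values)
        self.engine_id = values.get('engine_id', self.engine_id)
        self.atomic_key = (atomic_key or 0) + 1
        return True


def store_with_lock(row, rs, lock, engine_id, known_atomic_key):
    """Mirror of the new _store_with_lock() idea: acquire/release in one write."""
    if lock == 'acquire':
        rs['engine_id'] = engine_id          # take the lock
        expected = None                      # only if nobody holds it
    else:  # release
        rs['engine_id'] = None               # drop the lock
        expected = engine_id                 # only if we still hold it
    if not row.select_and_update(rs, expected, known_atomic_key):
        raise RuntimeError('resource is locked by another engine')


row = FakeResourceRow()
store_with_lock(row, {'status': 'IN_PROGRESS'}, 'acquire', 'engine-1', None)
store_with_lock(row, {'status': 'COMPLETE'}, 'release', 'engine-1', row.atomic_key)
print(row.values, row.engine_id)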
@@ -97,6 +97,9 @@ class PollDelay(Exception):
 class Resource(status.ResourceStatus):
     BASE_ATTRIBUTES = (SHOW, ) = (attributes.SHOW_ATTR, )
 
+    LOCK_ACTIONS = (LOCK_NONE, LOCK_ACQUIRE, LOCK_RELEASE) = (
+        0, 1, -1)
+
     # If True, this resource must be created before it can be referenced.
     strict_dependency = True
 
@@ -254,6 +257,8 @@ class Resource(status.ResourceStatus):
         self.replaced_by = None
         self.current_template_id = None
         self.root_stack_id = None
+        self._calling_engine_id = None
+        self._atomic_key = None
 
         if stack.cache_data is None:
             resource = stack.db_resource_get(name)
@@ -296,6 +301,7 @@ class Resource(status.ResourceStatus):
         self.replaced_by = resource.replaced_by
         self.current_template_id = resource.current_template_id
         self.root_stack_id = resource.root_stack_id
+        self._atomic_key = resource.atomic_key
 
     @property
     def external_id(self):
@@ -448,7 +454,8 @@ class Resource(status.ResourceStatus):
         LOG.debug('Setting metadata for %s', six.text_type(self))
         if refresh:
             metadata = merge_metadata(metadata, db_res.rsrc_metadata)
-        db_res.update_metadata(metadata)
+        if db_res.update_metadata(metadata):
+            self._incr_atomic_key()
         self._rsrc_metadata = metadata
 
     def handle_metadata_reset(self):
@@ -747,6 +754,12 @@ class Resource(status.ResourceStatus):
     def glance(self):
         return self.client('glance')
 
+    def _incr_atomic_key(self):
+        if self._atomic_key is None:
+            self._atomic_key = 1
+        else:
+            self._atomic_key = self._atomic_key + 1
+
     @contextlib.contextmanager
     def _action_recorder(self, action, expected_exceptions=tuple()):
         """Return a context manager to record the progress of an action.
@@ -759,12 +772,22 @@ class Resource(status.ResourceStatus):
         Expected exceptions are re-raised, with the Resource moved to the
         COMPLETE state.
         """
+        if self.stack.convergence:
+            lock_acquire = self.LOCK_ACQUIRE
+            lock_release = self.LOCK_RELEASE
+        else:
+            lock_acquire = lock_release = self.LOCK_NONE
+
         try:
-            self.state_set(action, self.IN_PROGRESS)
+            self.state_set(action, self.IN_PROGRESS, lock=lock_acquire)
             yield
+        except exception.UpdateInProgress as ex:
+            with excutils.save_and_reraise_exception():
+                LOG.info('Update in progress for %s', self.name)
         except expected_exceptions as ex:
             with excutils.save_and_reraise_exception():
-                self.state_set(action, self.COMPLETE, six.text_type(ex))
+                self.state_set(action, self.COMPLETE, six.text_type(ex),
+                               lock=lock_release)
                 LOG.debug('%s', six.text_type(ex))
         except Exception as ex:
             LOG.info('%(action)s: %(info)s',
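With the lock= plumbing above, the IN_PROGRESS transition doubles as lock acquisition and the terminal transition as release, but only for convergence stacks. A simplified sketch of that state-to-lock mapping (assumed shape, mirroring _action_recorder rather than reproducing it):

# Sketch only: how state transitions map to lock actions under convergence.
LOCK_NONE, LOCK_ACQUIRE, LOCK_RELEASE = 0, 1, -1


def lock_actions_for(convergence):
    """Convergence locks on IN_PROGRESS and unlocks on the final state."""
    if convergence:
        return LOCK_ACQUIRE, LOCK_RELEASE
    return LOCK_NONE, LOCK_NONE


def record_action(state_set, convergence, do_work):
    """Drive one action the way _action_recorder does, minus the details."""
    acquire, release = lock_actions_for(convergence)
    state_set('CREATE', 'IN_PROGRESS', lock=acquire)       # takes the row lock
    try:
        do_work()
    except Exception as exc:
        state_set('CREATE', 'FAILED', reason=str(exc), lock=release)
        raise
    state_set('CREATE', 'COMPLETE', lock=release)          # drops the row lock


def demo_state_set(action, status, reason='state changed', lock=LOCK_NONE):
    print(action, status, lock)


record_action(demo_state_set, convergence=True, do_work=lambda: None)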
@@ -772,7 +795,8 @@ class Resource(status.ResourceStatus):
                       "info": six.text_type(self)},
                      exc_info=True)
             failure = exception.ResourceFailure(ex, self, action)
-            self.state_set(action, self.FAILED, six.text_type(failure))
+            self.state_set(action, self.FAILED, six.text_type(failure),
+                           lock=lock_release)
             raise failure
         except BaseException as exc:
             with excutils.save_and_reraise_exception():
@@ -781,11 +805,12 @@ class Resource(status.ResourceStatus):
                     msg = '%s aborted' % action
                     if reason:
                         msg += ' (%s)' % reason
-                    self.state_set(action, self.FAILED, msg)
+                    self.state_set(action, self.FAILED, msg,
+                                   lock=lock_release)
                 except Exception:
                     LOG.exception('Error marking resource as failed')
         else:
-            self.state_set(action, self.COMPLETE)
+            self.state_set(action, self.COMPLETE, lock=lock_release)
 
     def action_handler_task(self, action, args=None, action_prefix=None):
         """A task to call the Resource subclass's handler methods for action.
@@ -903,19 +928,19 @@ class Resource(status.ResourceStatus):
     def create_convergence(self, template_id, resource_data, engine_id,
                            timeout, progress_callback=None):
         """Creates the resource by invoking the scheduler TaskRunner."""
-        with self.lock(engine_id):
-            self.requires = list(
-                set(data.primary_key for data in resource_data.values()
-                    if data is not None)
-            )
-            self.current_template_id = template_id
-            if self.stack.adopt_stack_data is None:
-                runner = scheduler.TaskRunner(self.create)
-            else:
-                adopt_data = self.stack._adopt_kwargs(self)
-                runner = scheduler.TaskRunner(self.adopt, **adopt_data)
+        self._calling_engine_id = engine_id
+        self.requires = list(
+            set(data.primary_key for data in resource_data.values()
+                if data is not None)
+        )
+        self.current_template_id = template_id
+        if self.stack.adopt_stack_data is None:
+            runner = scheduler.TaskRunner(self.create)
+        else:
+            adopt_data = self.stack._adopt_kwargs(self)
+            runner = scheduler.TaskRunner(self.adopt, **adopt_data)
 
-            runner(timeout=timeout, progress_callback=progress_callback)
+        runner(timeout=timeout, progress_callback=progress_callback)
 
     def validate_external(self):
         if self.external_id is not None:
@@ -1165,55 +1190,74 @@ class Resource(status.ResourceStatus):
         resource_data and existing resource's requires, then updates the
         resource by invoking the scheduler TaskRunner.
         """
-        def update_tmpl_id_and_requires():
+        def update_templ_id_and_requires(persist=True):
             self.current_template_id = template_id
             self.requires = list(
                 set(data.primary_key for data in resource_data.values()
                     if data is not None)
             )
+            if not persist:
+                return
 
-        with self.lock(engine_id):
-            registry = new_stack.env.registry
-            new_res_def = new_stack.t.resource_definitions(
-                new_stack)[self.name]
-            new_res_type = registry.get_class_to_instantiate(
-                new_res_def.resource_type, resource_name=self.name)
-            restricted_actions = registry.get_rsrc_restricted_actions(
-                self.name)
-            is_substituted = self.check_is_substituted(new_res_type)
-            if type(self) is not new_res_type and not is_substituted:
-                self._check_for_convergence_replace(restricted_actions)
+            rs = {'current_template_id': self.current_template_id,
+                  'updated_at': self.updated_time,
+                  'requires': self.requires}
+            if not resource_objects.Resource.select_and_update_by_id(
+                    self.context, self.id, rs, expected_engine_id=None,
+                    atomic_key=self._atomic_key):
+                LOG.info("Resource %s is locked, can't set template",
+                         six.text_type(self))
+                LOG.debug('Resource id:%(resource_id)s locked. '
+                          'Expected atomic_key:%(atomic_key)s, '
+                          'accessing from engine_id:%(engine_id)s',
+                          {'resource_id': self.id,
+                           'atomic_key': self._atomic_key,
+                           'engine_id': self._calling_engine_id})
+                raise exception.UpdateInProgress(self.name)
+            self._incr_atomic_key()
 
-            action_rollback = self.stack.action == self.stack.ROLLBACK
-            status_in_progress = self.stack.status == self.stack.IN_PROGRESS
-            if action_rollback and status_in_progress and self.replaced_by:
-                try:
-                    self.restore_prev_rsrc(convergence=True)
-                except Exception as e:
-                    failure = exception.ResourceFailure(e, self, self.action)
-                    self.state_set(self.UPDATE, self.FAILED,
-                                   six.text_type(failure))
-                    raise failure
+        self._calling_engine_id = engine_id
+        registry = new_stack.env.registry
+        new_res_def = new_stack.t.resource_definitions(
+            new_stack)[self.name]
+        new_res_type = registry.get_class_to_instantiate(
+            new_res_def.resource_type, resource_name=self.name)
+        restricted_actions = registry.get_rsrc_restricted_actions(
+            self.name)
+        is_substituted = self.check_is_substituted(new_res_type)
+        if type(self) is not new_res_type and not is_substituted:
+            self._check_for_convergence_replace(restricted_actions)
 
-            # Use new resource as update method if existing resource
-            # need to be substituted.
-            if is_substituted:
-                substitute = new_res_type(self.name, self.t, self.stack)
-                self.stack.resources[self.name] = substitute
-                updater = substitute.update
-            else:
-                updater = self.update
-            runner = scheduler.TaskRunner(updater, new_res_def)
+        action_rollback = self.stack.action == self.stack.ROLLBACK
+        status_in_progress = self.stack.status == self.stack.IN_PROGRESS
+        if action_rollback and status_in_progress and self.replaced_by:
             try:
-                runner(timeout=timeout, progress_callback=progress_callback)
-                update_tmpl_id_and_requires()
-            except UpdateReplace:
-                raise
-            except BaseException:
-                with excutils.save_and_reraise_exception():
-                    update_tmpl_id_and_requires()
-            else:
-                update_tmpl_id_and_requires()
+                self.restore_prev_rsrc(convergence=True)
+            except Exception as e:
+                failure = exception.ResourceFailure(e, self, self.action)
+                self.state_set(self.UPDATE, self.FAILED,
+                               six.text_type(failure))
+                raise failure
+
+        # Use new resource as update method if existing resource
+        # need to be substituted.
+        if is_substituted:
+            substitute = new_res_type(self.name, self.t, self.stack)
+            self.stack.resources[self.name] = substitute
+            substitute._calling_engine_id = engine_id
+            updater = substitute.update
+        else:
+            updater = self.update
+        runner = scheduler.TaskRunner(
+            updater, new_res_def,
+            update_templ_func=update_templ_id_and_requires)
+        try:
+            runner(timeout=timeout, progress_callback=progress_callback)
+        except UpdateReplace:
+            raise
+        except BaseException:
+            with excutils.save_and_reraise_exception():
+                update_templ_id_and_requires(persist=True)
 
     def preview_update(self, after, before, after_props, before_props,
                        prev_resource, check_init_complete=False):
@@ -1309,7 +1353,8 @@ class Resource(status.ResourceStatus):
             return False
 
     @scheduler.wrappertask
-    def update(self, after, before=None, prev_resource=None):
+    def update(self, after, before=None, prev_resource=None,
+               update_templ_func=None):
         """Return a task to update the resource.
 
         Subclasses should provide a handle_update() method to customise update,
@@ -1346,11 +1391,15 @@ class Resource(status.ResourceStatus):
                                       after_props,
                                       before_props,
                                       prev_resource):
+                if update_templ_func is not None:
+                    update_templ_func(persist=True)
                 return
             else:
                 if not self._needs_update(after, before,
                                           after_props, before_props,
                                           prev_resource):
+                    if update_templ_func is not None:
+                        update_templ_func(persist=True)
                     return
 
             if not self.stack.convergence:
@@ -1382,6 +1431,9 @@ class Resource(status.ResourceStatus):
                 self.t = after
                 self.reparse()
                 self._update_stored_properties()
+                if update_templ_func is not None:
+                    # template/requires will be persisted by _action_recorder()
+                    update_templ_func(persist=False)
 
         except exception.ResourceActionRestricted as ae:
             # catch all ResourceActionRestricted exceptions
@@ -1610,12 +1662,12 @@ class Resource(status.ResourceStatus):
             if (db_res.current_template_id == template_id):
                 # Following update failure is ignorable; another
                 # update might have locked/updated the resource.
-                db_res.select_and_update(
-                    {'needed_by': self.needed_by,
-                     'replaces': None},
-                    atomic_key=db_res.atomic_key,
-                    expected_engine_id=None
-                )
+                if db_res.select_and_update(
+                        {'needed_by': self.needed_by,
+                         'replaces': None},
+                        atomic_key=db_res.atomic_key,
+                        expected_engine_id=None):
+                    self._incr_atomic_key()
 
     def delete_convergence(self, template_id, input_data, engine_id, timeout,
                            progress_callback=None):
@@ -1629,22 +1681,22 @@ class Resource(status.ResourceStatus):
         replaced by more recent resource, then delete this and update
         the replacement resource's needed_by and replaces fields.
         """
-        with self.lock(engine_id):
-            self.needed_by = list(set(v for v in input_data.values()
-                                      if v is not None))
+        self._calling_engine_id = engine_id
+        self.needed_by = list(set(v for v in input_data.values()
+                                  if v is not None))
 
-            if self.current_template_id != template_id:
-                # just delete the resources in INIT state
-                if self.action == self.INIT:
-                    try:
-                        resource_objects.Resource.delete(self.context, self.id)
-                    except exception.NotFound:
-                        pass
-                else:
-                    runner = scheduler.TaskRunner(self.delete)
-                    runner(timeout=timeout,
-                           progress_callback=progress_callback)
-                self._update_replacement_data(template_id)
+        if self.current_template_id != template_id:
+            # just delete the resources in INIT state
+            if self.action == self.INIT:
+                try:
+                    resource_objects.Resource.delete(self.context, self.id)
+                except exception.NotFound:
+                    pass
+            else:
+                runner = scheduler.TaskRunner(self.delete)
+                runner(timeout=timeout,
+                       progress_callback=progress_callback)
+            self._update_replacement_data(template_id)
 
     def handle_delete(self):
         """Default implementation; should be overridden by resources."""
@@ -1762,7 +1814,7 @@ class Resource(status.ResourceStatus):
         except Exception as ex:
             LOG.warning('db error %s', ex)
 
-    def store(self, set_metadata=False):
+    def store(self, set_metadata=False, lock=LOCK_NONE):
         """Create the resource in the database.
 
         If self.id is set, we update the existing stack.
@@ -1793,14 +1845,43 @@ class Resource(status.ResourceStatus):
             self._rsrc_metadata = metadata
 
         if self.id is not None:
-            resource_objects.Resource.update_by_id(
-                self.context, self.id, rs)
+            if (lock == self.LOCK_NONE or self._calling_engine_id is None):
+                resource_objects.Resource.update_by_id(
+                    self.context, self.id, rs)
+                if (lock != self.LOCK_NONE and
+                        self._calling_engine_id is None):
+                    LOG.warning('no calling_engine_id in store %s',
+                                str(rs))
+            else:
+                self._store_with_lock(rs, lock)
         else:
             new_rs = resource_objects.Resource.create(self.context, rs)
             self.id = new_rs.id
             self.uuid = new_rs.uuid
             self.created_time = new_rs.created_at
 
+    def _store_with_lock(self, rs, lock):
+        if lock == self.LOCK_ACQUIRE:
+            rs['engine_id'] = self._calling_engine_id
+            expected_engine_id = None
+        else:  # self.LOCK_RELEASE
+            expected_engine_id = self._calling_engine_id
+            rs['engine_id'] = None
+        if resource_objects.Resource.select_and_update_by_id(
+                self.context, self.id, rs, expected_engine_id,
+                self._atomic_key):
+            self._incr_atomic_key()
+        else:
+            LOG.info('Resource %s is locked or does not exist',
+                     six.text_type(self))
+            LOG.debug('Resource id:%(resource_id)s locked or does not exist. '
+                      'Expected atomic_key:%(atomic_key)s, '
+                      'accessing from engine_id:%(engine_id)s',
+                      {'resource_id': self.id,
+                       'atomic_key': self._atomic_key,
+                       'engine_id': self._calling_engine_id})
+            raise exception.UpdateInProgress(self.name)
+
     def _add_event(self, action, status, reason):
         """Add a state change event to the database."""
         physical_res_id = self.resource_id or self.physical_resource_name()
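Because every successful locked write in _store_with_lock() goes through select_and_update_by_id and bumps atomic_key, a writer holding a stale snapshot loses the race instead of silently overwriting the row. A small in-memory sketch of that race (illustrative only, standing in for the real DB API):

# Illustrative race between two engines over one optimistically locked row.
class Row(object):
    def __init__(self):
        self.engine_id = None
        self.atomic_key = 0

    def select_and_update(self, engine_id, expected_engine_id, atomic_key):
        # The update only matches when both the lock holder and the version
        # counter are what the caller last saw; otherwise zero rows change.
        if (self.engine_id, self.atomic_key) != (expected_engine_id, atomic_key):
            return False
        self.engine_id = engine_id
        self.atomic_key += 1
        return True


row = Row()
seen = row.atomic_key                                          # both engines read the row
assert row.select_and_update('engine-A', None, seen)           # A acquires the lock
assert not row.select_and_update('engine-B', None, seen)       # B loses the race
assert row.select_and_update(None, 'engine-A', row.atomic_key)  # A releases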
@@ -1813,61 +1894,17 @@ class Resource(status.ResourceStatus):
 
     @contextlib.contextmanager
     def lock(self, engine_id):
-        self._acquire(engine_id)
+        self._calling_engine_id = engine_id
         try:
+            self._store_with_lock({}, self.LOCK_ACQUIRE)
             yield
-        except:  # noqa
+        except exception.UpdateInProgress:
+            raise
+        except BaseException:
             with excutils.save_and_reraise_exception():
-                self._release(engine_id)
+                self._store_with_lock({}, self.LOCK_RELEASE)
         else:
-            self._release(engine_id)
-
-    def _acquire(self, engine_id):
-        updated_ok = False
-        try:
-            rs = resource_objects.Resource.get_obj(self.context, self.id)
-            updated_ok = rs.select_and_update(
-                {'engine_id': engine_id},
-                atomic_key=rs.atomic_key,
-                expected_engine_id=None)
-        except Exception as ex:
-            LOG.error('DB error %s', ex)
-
-        if not updated_ok:
-            LOG.info('Resource %s is locked for update; deferring',
-                     six.text_type(self))
-            LOG.debug(('Resource id:%(resource_id)s with '
-                       'atomic_key:%(atomic_key)s, locked '
-                       'by engine_id:%(rs_engine_id)s/%(engine_id)s') % {
-                           'resource_id': rs.id, 'atomic_key': rs.atomic_key,
-                           'rs_engine_id': rs.engine_id,
-                           'engine_id': engine_id})
-            raise exception.UpdateInProgress(self.name)
-
-    def _release(self, engine_id):
-        rs = None
-        try:
-            rs = resource_objects.Resource.get_obj(self.context, self.id)
-        except (exception.NotFound, exception.EntityNotFound):
-            # ignore: Resource is deleted holding a lock-on
-            return
-
-        atomic_key = rs.atomic_key
-        if atomic_key is None:
-            atomic_key = 0
-
-        updated_ok = rs.select_and_update(
-            {'engine_id': None,
-             'current_template_id': self.current_template_id,
-             'updated_at': self.updated_time,
-             'requires': self.requires,
-             'needed_by': self.needed_by},
-            expected_engine_id=engine_id,
-            atomic_key=atomic_key)
-
-        if not updated_ok:
-            LOG.warning('Failed to unlock resource %s', self.name)
+            self._store_with_lock({}, self.LOCK_RELEASE)
 
     def _resolve_all_attributes(self, attr):
         """Method for resolving all attributes.
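The slimmed-down lock() keeps the context-manager interface for callers such as resource_mark_unhealthy while delegating the row writes to _store_with_lock(), releasing on both the success and failure paths. A generic sketch of that acquire-on-enter, release-on-exit shape (assumed helper, not the real API):

import contextlib


@contextlib.contextmanager
def locked(store_with_lock):
    """Acquire on entry; release exactly once whether the body fails or not."""
    store_with_lock('acquire')
    try:
        yield
    except BaseException:
        store_with_lock('release')   # still release before re-raising
        raise
    else:
        store_with_lock('release')


calls = []
with locked(calls.append):
    calls.append('work')
print(calls)   # ['acquire', 'work', 'release']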
@@ -2003,7 +2040,8 @@ class Resource(status.ResourceStatus):
         self.action = self.INIT
         self.status = self.COMPLETE
 
-    def state_set(self, action, status, reason="state changed"):
+    def state_set(self, action, status, reason="state changed",
+                  lock=LOCK_NONE):
         if action not in self.ACTIONS:
             raise ValueError(_("Invalid action %s") % action)
 
@@ -2016,7 +2054,7 @@ class Resource(status.ResourceStatus):
         self.action = action
         self.status = status
         self.status_reason = reason
-        self.store(set_metadata)
+        self.store(set_metadata, lock=lock)
 
         if new_state != old_state:
             self._add_event(action, status, reason)
@@ -261,6 +261,14 @@ class Resource(
                                       atomic_key=atomic_key,
                                       expected_engine_id=expected_engine_id)
 
+    @classmethod
+    def select_and_update_by_id(cls, context, resource_id,
+                                values, expected_engine_id=None,
+                                atomic_key=0):
+        return db_api.resource_update(context, resource_id, values,
+                                      atomic_key=atomic_key,
+                                      expected_engine_id=expected_engine_id)
+
     def refresh(self):
         resource_db = db_api.resource_get(self._context, self.id, refresh=True)
         return self.__class__._from_db_object(
@@ -282,3 +290,6 @@ class Resource(
             if not rows_updated:
                 action = _('metadata setting for resource %s') % self.name
                 raise exception.ConcurrentTransaction(action=action)
+            return True
+        else:
+            return False
@@ -676,25 +676,25 @@ class StackResourcesServiceTest(common.HeatTestCase):
     @tools.stack_context('service_mark_unhealthy_lock_converge_test_stk',
                          convergence=True)
     def test_mark_unhealthy_stack_lock_convergence(self):
-        mock_acquire = self.patchobject(res.Resource,
-                                        '_acquire',
-                                        return_value=None)
+        mock_store_with_lock = self.patchobject(res.Resource,
+                                                '_store_with_lock',
+                                                return_value=None)
 
         self.eng.resource_mark_unhealthy(self.ctx, self.stack.identifier(),
                                          'WebServer', True,
                                          resource_status_reason="")
 
-        mock_acquire.assert_called_once_with(self.eng.engine_id)
+        self.assertEqual(2, mock_store_with_lock.call_count)
 
     @tools.stack_context('service_mark_unhealthy_lockexc_converge_test_stk',
                          convergence=True)
     def test_mark_unhealthy_stack_lock_exc_convergence(self):
-        def _acquire(*args, **kwargs):
+        def _store_with_lock(*args, **kwargs):
             raise exception.UpdateInProgress(self.stack.name)
 
         self.patchobject(
             res.Resource,
-            '_acquire',
+            '_store_with_lock',
             return_value=None,
             side_effect=exception.UpdateInProgress(self.stack.name))
         ex = self.assertRaises(dispatcher.ExpectedException,
@@ -159,6 +159,7 @@ class ResourceWithProps(GenericResource):
     properties_schema = {
         'Foo': properties.Schema(properties.Schema.STRING),
         'FooInt': properties.Schema(properties.Schema.INTEGER)}
+    atomic_key = None
 
 
 class ResourceWithPropsRefPropOnDelete(ResourceWithProps):
@@ -1996,17 +1996,6 @@ class ResourceTest(common.HeatTestCase):
         self.assertEqual(engine_id, rs.engine_id)
         self.assertEqual(atomic_key, rs.atomic_key)
 
-    @mock.patch.object(resource_objects.Resource, 'get_obj')
-    @mock.patch.object(resource_objects.Resource, 'select_and_update')
-    def test_release_ignores_not_found_error(self, mock_sau, mock_get_obj):
-        tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo')
-        res = generic_rsrc.GenericResource('test_res', tmpl, self.stack)
-        res.store()
-        res._acquire('engine-id')
-        mock_get_obj.side_effect = exception.NotFound()
-        res._release('engine-id')
-        self.assertFalse(mock_sau.called)
-
     def test_create_convergence(self):
         tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo')
         res = generic_rsrc.GenericResource('test_res', tmpl, self.stack)
@@ -2024,7 +2013,7 @@ class ResourceTest(common.HeatTestCase):
         self.assertTrue(mock_create.called)
 
         self.assertItemsEqual([1, 3], res.requires)
-        self._assert_resource_lock(res.id, None, 2)
+        self._assert_resource_lock(res.id, None, None)
 
     def test_create_convergence_throws_timeout(self):
         tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo')
@@ -2058,7 +2047,8 @@ class ResourceTest(common.HeatTestCase):
                           res.create_convergence, self.stack.t.id, res_data,
                           'engine-007', self.dummy_timeout, self.dummy_event)
         self.assertItemsEqual([5, 3], res.requires)
-        self._assert_resource_lock(res.id, None, 2)
+        # The locking happens in create which we mocked out
+        self._assert_resource_lock(res.id, None, None)
 
     @mock.patch.object(resource.Resource, 'adopt')
     def test_adopt_convergence_ok(self, mock_adopt):
@@ -2079,7 +2069,7 @@ class ResourceTest(common.HeatTestCase):
         mock_adopt.assert_called_once_with(
             resource_data={'resource_id': 'fluffy'})
         self.assertItemsEqual([5, 3], res.requires)
-        self._assert_resource_lock(res.id, None, 2)
+        self._assert_resource_lock(res.id, None, None)
 
     def test_adopt_convergence_bad_data(self):
         tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo')
@@ -2097,7 +2087,9 @@ class ResourceTest(common.HeatTestCase):
         exc = self.assertRaises(exception.ResourceFailure, tr)
         self.assertIn('Resource ID was not provided', six.text_type(exc))
 
-    def test_update_convergence(self):
+    @mock.patch.object(resource.Resource, 'update_template_diff_properties')
+    @mock.patch.object(resource.Resource, '_needs_update')
+    def test_update_convergence(self, mock_nu, mock_utd):
         tmpl = template.Template({
             'HeatTemplateFormatVersion': '2012-12-12',
             'Resources': {
@@ -2122,19 +2114,18 @@ class ResourceTest(common.HeatTestCase):
         new_temp.store(stack.context)
         new_stack = parser.Stack(utils.dummy_context(), 'test_stack',
                                  new_temp, stack_id=self.stack.id)
+        res.stack.convergence = True
 
         res_data = {(1, True): {u'id': 4, u'name': 'A', 'attrs': {}},
                     (2, True): {u'id': 3, u'name': 'B', 'attrs': {}}}
         res_data = node_data.load_resources_data(res_data)
-        pcb = mock.Mock()
-        with mock.patch.object(resource.Resource, 'update') as mock_update:
-            tr = scheduler.TaskRunner(res.update_convergence, new_temp.id,
-                                      res_data, 'engine-007', 120, new_stack,
-                                      pcb)
-            tr()
-        self.assertTrue(mock_update.called)
+        tr = scheduler.TaskRunner(res.update_convergence, new_temp.id,
+                                  res_data, 'engine-007', 120, new_stack)
+        tr()
 
         self.assertItemsEqual([3, 4], res.requires)
+        self.assertEqual(res.action, resource.Resource.UPDATE)
+        self.assertEqual(res.status, resource.Resource.COMPLETE)
         self._assert_resource_lock(res.id, None, 2)
 
     def test_update_convergence_throws_timeout(self):
@@ -2210,7 +2201,9 @@ class ResourceTest(common.HeatTestCase):
                                   self.dummy_event)
         self.assertRaises(resource.UpdateReplace, tr)
 
-    def test_update_in_progress_convergence(self):
+    @mock.patch.object(resource.Resource, '_needs_update')
+    @mock.patch.object(resource.Resource, '_check_for_convergence_replace')
+    def test_update_in_progress_convergence(self, mock_cfcr, mock_nu):
         tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo')
         res = generic_rsrc.GenericResource('test_res', tmpl, self.stack)
         res.requires = [1, 2]
@@ -2219,21 +2212,34 @@ class ResourceTest(common.HeatTestCase):
         rs.update_and_save({'engine_id': 'not-this'})
         self._assert_resource_lock(res.id, 'not-this', None)
 
+        res.stack.convergence = True
+
         res_data = {(1, True): {u'id': 4, u'name': 'A', 'attrs': {}},
                     (2, True): {u'id': 3, u'name': 'B', 'attrs': {}}}
         res_data = node_data.load_resources_data(res_data)
+        tmpl = template.Template({
+            'HeatTemplateFormatVersion': '2012-12-12',
+            'Resources': {
+                'test_res': {'Type': 'ResourceWithPropsType'}
+            }}, env=self.env)
+        new_stack = parser.Stack(utils.dummy_context(), 'test_stack',
+                                 tmpl, stack_id=self.stack.id)
         tr = scheduler.TaskRunner(res.update_convergence, 'template_key',
                                   res_data, 'engine-007', self.dummy_timeout,
-                                  mock.ANY, self.dummy_event)
+                                  new_stack)
         ex = self.assertRaises(exception.UpdateInProgress, tr)
         msg = ("The resource %s is already being updated." %
                res.name)
         self.assertEqual(msg, six.text_type(ex))
+        # ensure requirements are not updated for failed resource
+        self.assertEqual([1, 2], res.requires)
+        rs = resource_objects.Resource.get_obj(self.stack.context, res.id)
+        self.assertEqual([1, 2], rs.requires)
 
-    @mock.patch.object(resource.Resource, 'update')
-    def test_update_resource_convergence_failed(self, mock_update):
+    @mock.patch.object(resource.Resource, 'update_template_diff_properties')
+    @mock.patch.object(resource.Resource, '_needs_update')
+    def test_update_resource_convergence_failed(self,
+                                                mock_needs_update,
+                                                mock_update_template_diff):
         tmpl = template.Template({
             'HeatTemplateFormatVersion': '2012-12-12',
             'Resources': {
@@ -2259,26 +2265,25 @@ class ResourceTest(common.HeatTestCase):
         res_data = {(1, True): {u'id': 4, u'name': 'A', 'attrs': {}},
                     (2, True): {u'id': 3, u'name': 'B', 'attrs': {}}}
         res_data = node_data.load_resources_data(res_data)
         exc = Exception(_('Resource update failed'))
         new_stack = parser.Stack(utils.dummy_context(), 'test_stack',
                                  new_temp, stack_id=self.stack.id)
-        dummy_ex = exception.ResourceFailure(exc, res, action=res.UPDATE)
-        mock_update.side_effect = dummy_ex
 
+        res.stack.convergence = True
+        res._calling_engine_id = 'engine-9'
+
         tr = scheduler.TaskRunner(res.update_convergence, new_temp.id,
                                   res_data, 'engine-007', 120, new_stack,
                                   self.dummy_event)
         self.assertRaises(exception.ResourceFailure, tr)
 
-        expected_rsrc_def = new_temp.resource_definitions(self.stack)[res.name]
-        mock_update.assert_called_once_with(expected_rsrc_def)
         # check if current_template_id was updated
         self.assertEqual(new_temp.id, res.current_template_id)
         # check if requires was updated
         self.assertItemsEqual([3, 4], res.requires)
-        self._assert_resource_lock(res.id, None, 2)
+        self.assertEqual(res.action, resource.Resource.UPDATE)
+        self.assertEqual(res.status, resource.Resource.FAILED)
+        self._assert_resource_lock(res.id, None, 3)
 
-    @mock.patch.object(resource.Resource, 'update')
-    def test_update_resource_convergence_update_replace(self, mock_update):
+    def test_update_resource_convergence_update_replace(self):
         tmpl = template.Template({
             'HeatTemplateFormatVersion': '2012-12-12',
             'Resources': {
@@ -2301,10 +2306,11 @@ class ResourceTest(common.HeatTestCase):
             }}, env=self.env)
         new_temp.store(stack.context)
 
+        res.stack.convergence = True
+
         res_data = {(1, True): {u'id': 4, u'name': 'A', 'attrs': {}},
                     (2, True): {u'id': 3, u'name': 'B', 'attrs': {}}}
         res_data = node_data.load_resources_data(res_data)
-        mock_update.side_effect = resource.UpdateReplace
         new_stack = parser.Stack(utils.dummy_context(), 'test_stack',
                                  new_temp, stack_id=self.stack.id)
         tr = scheduler.TaskRunner(res.update_convergence, new_temp.id,
@@ -2312,13 +2318,11 @@ class ResourceTest(common.HeatTestCase):
                                   self.dummy_event)
         self.assertRaises(resource.UpdateReplace, tr)
 
-        expected_rsrc_def = new_temp.resource_definitions(self.stack)[res.name]
-        mock_update.assert_called_once_with(expected_rsrc_def)
         # ensure that current_template_id was not updated
         self.assertEqual(stack.t.id, res.current_template_id)
         # ensure that requires was not updated
         self.assertItemsEqual([2], res.requires)
-        self._assert_resource_lock(res.id, None, 2)
+        self._assert_resource_lock(res.id, None, None)
 
     def test_convergence_update_replace_rollback(self):
         rsrc_def = rsrc_defn.ResourceDefinition('test_res',
@@ -2386,7 +2390,7 @@ class ResourceTest(common.HeatTestCase):
         tr()
         self.assertTrue(mock_delete.called)
         self.assertTrue(res._update_replacement_data.called)
-        self._assert_resource_lock(res.id, None, 2)
+        self._assert_resource_lock(res.id, None, None)
 
     def test_delete_convergence_does_not_delete_same_template_resource(self):
         tmpl = rsrc_defn.ResourceDefinition('test_res', 'Foo')
@@ -2410,6 +2414,7 @@ class ResourceTest(common.HeatTestCase):
         res_id = res.id
         res.handle_delete = mock.Mock(side_effect=ValueError('test'))
         self._assert_resource_lock(res.id, None, None)
+        res.stack.convergence = True
         tr = scheduler.TaskRunner(res.delete_convergence, 2, {}, 'engine-007',
                                   self.dummy_timeout, self.dummy_event)
         self.assertRaises(exception.ResourceFailure, tr)
@@ -2428,12 +2433,13 @@ class ResourceTest(common.HeatTestCase):
         res.status = res.COMPLETE
         res.action = res.CREATE
         res.store()
+        self.stack.convergence = True
+        res._calling_engine_id = 'engine-9'
         rs = resource_objects.Resource.get_obj(self.stack.context, res.id)
         rs.update_and_save({'engine_id': 'not-this'})
         self._assert_resource_lock(res.id, 'not-this', None)
 
-        tr = scheduler.TaskRunner(res.delete_convergence, 1, {}, 'engine-007',
-                                  self.dummy_timeout, self.dummy_event)
+        tr = scheduler.TaskRunner(res.delete)
         ex = self.assertRaises(exception.UpdateInProgress, tr)
         msg = ("The resource %s is already being updated." %
                res.name)
@@ -55,3 +55,8 @@ class AodhAlarmTest(scenario_base.ScenarioTestsBase):
         # Note: there is little point waiting more than 60s+time to scale up.
         self.assertTrue(test.call_until_true(
             120, 2, self.check_instance_count, stack_identifier, 2))
+
+        # Temporarily avoids a race condition, addressed in the
+        # next change https://review.openstack.org/#/c/449351/
+        import time
+        time.sleep(3)