Merge "Deprecate wrappertask decorator"
This commit is contained in:
commit
9f931119ba
@ -1005,7 +1005,6 @@ class Resource(status.ResourceStatus):
|
||||
action
|
||||
)
|
||||
|
||||
@scheduler.wrappertask
|
||||
def _do_action(self, action, pre_func=None, resource_data=None):
|
||||
"""Perform a transition to a new state via a specified action.
|
||||
|
||||
@ -1028,7 +1027,7 @@ class Resource(status.ResourceStatus):
|
||||
pre_func()
|
||||
|
||||
handler_args = [resource_data] if resource_data is not None else []
|
||||
yield self.action_handler_task(action, args=handler_args)
|
||||
yield from self.action_handler_task(action, args=handler_args)
|
||||
|
||||
def _update_stored_properties(self):
|
||||
old_props = self._stored_properties_data
|
||||
@ -1189,7 +1188,6 @@ class Resource(status.ResourceStatus):
|
||||
message="%s" % error_message)
|
||||
raise
|
||||
|
||||
@scheduler.wrappertask
|
||||
def create(self):
|
||||
"""Create the resource.
|
||||
|
||||
@ -1203,18 +1201,18 @@ class Resource(status.ResourceStatus):
|
||||
raise exception.ResourceFailure(exc, self, action)
|
||||
|
||||
if self.external_id is not None:
|
||||
yield self._do_action(self.ADOPT,
|
||||
resource_data={
|
||||
'resource_id': self.external_id})
|
||||
yield self.check()
|
||||
yield from self._do_action(self.ADOPT,
|
||||
resource_data={
|
||||
'resource_id': self.external_id})
|
||||
yield from self.check()
|
||||
return
|
||||
|
||||
# This method can be called when we replace a resource, too. In that
|
||||
# case, a hook has already been dealt with in `Resource.update` so we
|
||||
# shouldn't do it here again:
|
||||
if self.stack.action == self.stack.CREATE:
|
||||
yield self._break_if_required(
|
||||
self.CREATE, environment.HOOK_PRE_CREATE)
|
||||
yield from self._break_if_required(self.CREATE,
|
||||
environment.HOOK_PRE_CREATE)
|
||||
|
||||
LOG.info('creating %s', self)
|
||||
|
||||
@ -1239,13 +1237,13 @@ class Resource(status.ResourceStatus):
|
||||
delay = timeutils.retry_backoff_delay(count[action],
|
||||
jitter_max=2.0)
|
||||
waiter = scheduler.TaskRunner(self.pause)
|
||||
yield waiter.as_task(timeout=delay)
|
||||
yield from waiter.as_task(timeout=delay)
|
||||
elif action == self.CREATE:
|
||||
# Only validate properties in first create call.
|
||||
pre_func = self.properties.validate
|
||||
|
||||
try:
|
||||
yield self._do_action(action, pre_func)
|
||||
yield from self._do_action(action, pre_func)
|
||||
if action == self.CREATE:
|
||||
first_failure = None
|
||||
break
|
||||
@ -1282,8 +1280,8 @@ class Resource(status.ResourceStatus):
|
||||
raise first_failure
|
||||
|
||||
if self.stack.action == self.stack.CREATE:
|
||||
yield self._break_if_required(
|
||||
self.CREATE, environment.HOOK_POST_CREATE)
|
||||
yield from self._break_if_required(self.CREATE,
|
||||
environment.HOOK_POST_CREATE)
|
||||
|
||||
@staticmethod
|
||||
def pause():
|
||||
@ -1312,7 +1310,7 @@ class Resource(status.ResourceStatus):
|
||||
adopt.
|
||||
"""
|
||||
self._update_stored_properties()
|
||||
return self._do_action(self.ADOPT, resource_data=resource_data)
|
||||
yield from self._do_action(self.ADOPT, resource_data=resource_data)
|
||||
|
||||
def handle_adopt(self, resource_data=None):
|
||||
resource_id, data, metadata = self._get_resource_info(resource_data)
|
||||
@ -1606,7 +1604,6 @@ class Resource(status.ResourceStatus):
|
||||
elif new_template_id is not None:
|
||||
self.store(lock=lock)
|
||||
|
||||
@scheduler.wrappertask
|
||||
def update(self, after, before=None, prev_resource=None,
|
||||
new_template_id=None, new_requires=None):
|
||||
"""Return a task to update the resource.
|
||||
@ -1634,8 +1631,8 @@ class Resource(status.ResourceStatus):
|
||||
|
||||
after_props, before_props = self._prepare_update_props(after, before)
|
||||
|
||||
yield self._break_if_required(
|
||||
self.UPDATE, environment.HOOK_PRE_UPDATE)
|
||||
yield from self._break_if_required(self.UPDATE,
|
||||
environment.HOOK_PRE_UPDATE)
|
||||
|
||||
try:
|
||||
registry = self.stack.env.registry
|
||||
@ -1695,9 +1692,9 @@ class Resource(status.ResourceStatus):
|
||||
if new_template_id is not None:
|
||||
self.current_template_id = new_template_id
|
||||
|
||||
yield self.action_handler_task(action,
|
||||
args=[after, tmpl_diff,
|
||||
prop_diff])
|
||||
yield from self.action_handler_task(action,
|
||||
args=[after, tmpl_diff,
|
||||
prop_diff])
|
||||
except UpdateReplace:
|
||||
with excutils.save_and_reraise_exception():
|
||||
self.current_template_id = self.old_template_id
|
||||
@ -1710,8 +1707,8 @@ class Resource(status.ResourceStatus):
|
||||
if new_requires is not None:
|
||||
self.requires = new_requires
|
||||
|
||||
yield self._break_if_required(
|
||||
self.UPDATE, environment.HOOK_POST_UPDATE)
|
||||
yield from self._break_if_required(self.UPDATE,
|
||||
environment.HOOK_POST_UPDATE)
|
||||
|
||||
def prepare_for_replace(self):
|
||||
"""Prepare resource for replacing.
|
||||
@ -1731,7 +1728,6 @@ class Resource(status.ResourceStatus):
|
||||
"""
|
||||
pass
|
||||
|
||||
@scheduler.wrappertask
|
||||
def check(self):
|
||||
"""Checks that the physical resource is in its expected state.
|
||||
|
||||
@ -1753,7 +1749,7 @@ class Resource(status.ResourceStatus):
|
||||
raise failure
|
||||
|
||||
with self.frozen_properties():
|
||||
yield self._do_action(action)
|
||||
yield from self._do_action(action)
|
||||
else:
|
||||
if self.state == (self.INIT, self.COMPLETE):
|
||||
# No need to store status; better to leave the resource in
|
||||
@ -1778,7 +1774,6 @@ class Resource(status.ResourceStatus):
|
||||
if invalid_checks:
|
||||
raise exception.Error('; '.join(invalid_checks))
|
||||
|
||||
@scheduler.wrappertask
|
||||
def suspend(self):
|
||||
"""Return a task to suspend the resource.
|
||||
|
||||
@ -1798,9 +1793,8 @@ class Resource(status.ResourceStatus):
|
||||
|
||||
LOG.info('suspending %s', self)
|
||||
with self.frozen_properties():
|
||||
yield self._do_action(action)
|
||||
yield from self._do_action(action)
|
||||
|
||||
@scheduler.wrappertask
|
||||
def resume(self):
|
||||
"""Return a task to resume the resource.
|
||||
|
||||
@ -1820,18 +1814,16 @@ class Resource(status.ResourceStatus):
|
||||
|
||||
LOG.info('resuming %s', self)
|
||||
with self.frozen_properties():
|
||||
yield self._do_action(action)
|
||||
yield from self._do_action(action)
|
||||
|
||||
@scheduler.wrappertask
|
||||
def snapshot(self):
|
||||
"""Snapshot the resource and return the created data, if any."""
|
||||
LOG.info('snapshotting %s', self)
|
||||
with self.frozen_properties():
|
||||
yield self._do_action(self.SNAPSHOT)
|
||||
yield from self._do_action(self.SNAPSHOT)
|
||||
|
||||
@scheduler.wrappertask
|
||||
def delete_snapshot(self, data):
|
||||
yield self.action_handler_task('delete_snapshot', args=[data])
|
||||
yield from self.action_handler_task('delete_snapshot', args=[data])
|
||||
|
||||
def physical_resource_name(self):
|
||||
if self.id is None or self.action == self.INIT:
|
||||
@ -1981,7 +1973,6 @@ class Resource(status.ResourceStatus):
|
||||
return self.resource_id
|
||||
return None
|
||||
|
||||
@scheduler.wrappertask
|
||||
def delete(self):
|
||||
"""A task to delete the resource.
|
||||
|
||||
@ -2009,8 +2000,8 @@ class Resource(status.ResourceStatus):
|
||||
# case, a hook has already been dealt with in `Resource.update` so we
|
||||
# shouldn't do it here again:
|
||||
if self.stack.action == self.stack.DELETE:
|
||||
yield self._break_if_required(
|
||||
self.DELETE, environment.HOOK_PRE_DELETE)
|
||||
yield from self._break_if_required(self.DELETE,
|
||||
environment.HOOK_PRE_DELETE)
|
||||
|
||||
LOG.info('deleting %s', self)
|
||||
|
||||
@ -2045,20 +2036,19 @@ class Resource(status.ResourceStatus):
|
||||
delay = timeutils.retry_backoff_delay(count,
|
||||
jitter_max=2.0)
|
||||
waiter = scheduler.TaskRunner(self.pause)
|
||||
yield waiter.as_task(timeout=delay)
|
||||
yield from waiter.as_task(timeout=delay)
|
||||
with excutils.exception_filter(should_retry):
|
||||
yield self.action_handler_task(action,
|
||||
*action_args)
|
||||
yield from self.action_handler_task(action,
|
||||
*action_args)
|
||||
break
|
||||
|
||||
if self.stack.action == self.stack.DELETE:
|
||||
yield self._break_if_required(
|
||||
self.DELETE, environment.HOOK_POST_DELETE)
|
||||
yield from self._break_if_required(self.DELETE,
|
||||
environment.HOOK_POST_DELETE)
|
||||
|
||||
@scheduler.wrappertask
|
||||
def destroy(self):
|
||||
"""A task to delete the resource and remove it from the database."""
|
||||
yield self.delete()
|
||||
yield from self.delete()
|
||||
|
||||
if self.id is None:
|
||||
return
|
||||
|
@ -15,6 +15,7 @@ import functools
|
||||
import sys
|
||||
import types
|
||||
|
||||
import debtcollector
|
||||
import eventlet
|
||||
from oslo_log import log as logging
|
||||
from oslo_utils import encodeutils
|
||||
@ -293,7 +294,11 @@ class TaskRunner(object):
|
||||
return self.__nonzero__()
|
||||
|
||||
|
||||
def wrappertask(task): # noqa: C901
|
||||
@debtcollector.removals.remove(message="Use the Python 3 'yield from' keyword "
|
||||
"in place of 'yield', instead of "
|
||||
"decorating with @wrappertask.",
|
||||
stacklevel=1)
|
||||
def wrappertask(task):
|
||||
"""Decorator for a task that needs to drive a subtask.
|
||||
|
||||
This is essentially a replacement for the Python 3-only "yield from"
|
||||
@ -311,6 +316,9 @@ def wrappertask(task): # noqa: C901
|
||||
|
||||
@functools.wraps(task)
|
||||
def wrapper(*args, **kwargs):
|
||||
# This could be simplified by using 'yield from' for the parent loop
|
||||
# as well, but not without adding yet another frame to the stack
|
||||
# for the subtasks.
|
||||
parent = task(*args, **kwargs)
|
||||
|
||||
try:
|
||||
@ -321,28 +329,7 @@ def wrappertask(task): # noqa: C901
|
||||
while True:
|
||||
try:
|
||||
if isinstance(subtask, types.GeneratorType):
|
||||
subtask_running = True
|
||||
try:
|
||||
step = next(subtask)
|
||||
except StopIteration:
|
||||
subtask_running = False
|
||||
|
||||
while subtask_running:
|
||||
try:
|
||||
yield step
|
||||
except GeneratorExit:
|
||||
subtask.close()
|
||||
raise
|
||||
except: # noqa
|
||||
try:
|
||||
step = subtask.throw(*sys.exc_info())
|
||||
except StopIteration:
|
||||
subtask_running = False
|
||||
else:
|
||||
try:
|
||||
step = next(subtask)
|
||||
except StopIteration:
|
||||
subtask_running = False
|
||||
yield from subtask
|
||||
else:
|
||||
yield subtask
|
||||
except GeneratorExit:
|
||||
|
@ -1155,7 +1155,6 @@ class Stack(collections.Mapping):
|
||||
|
||||
return {'resource_data': data['resources'].get(resource.name)}
|
||||
|
||||
@scheduler.wrappertask
|
||||
def stack_task(self, action, reverse=False, post_func=None,
|
||||
aggregate_exceptions=False, pre_completion_func=None,
|
||||
notify=None):
|
||||
@ -1204,12 +1203,11 @@ class Stack(collections.Mapping):
|
||||
lambda x: {})
|
||||
|
||||
@functools.wraps(getattr(resource.Resource, action_method))
|
||||
@scheduler.wrappertask
|
||||
def resource_action(r):
|
||||
# Find e.g resource.create and call it
|
||||
handle = getattr(r, action_method)
|
||||
|
||||
yield handle(**handle_kwargs(r))
|
||||
yield from handle(**handle_kwargs(r))
|
||||
|
||||
if action == self.CREATE:
|
||||
stk_defn.update_resource_data(self.defn, r.name, r.node_data())
|
||||
@ -1225,7 +1223,7 @@ class Stack(collections.Mapping):
|
||||
aggregate_exceptions=aggregate_exceptions)
|
||||
|
||||
try:
|
||||
yield action_task()
|
||||
yield from action_task()
|
||||
except scheduler.Timeout:
|
||||
stack_status = self.FAILED
|
||||
reason = '%s timed out' % action.title()
|
||||
@ -1591,7 +1589,6 @@ class Stack(collections.Mapping):
|
||||
|
||||
self.state_set(self.action, self.FAILED, str(reason))
|
||||
|
||||
@scheduler.wrappertask
|
||||
def update_task(self, newstack, action=UPDATE,
|
||||
msg_queue=None, notify=None):
|
||||
if action not in (self.UPDATE, self.ROLLBACK, self.RESTORE):
|
||||
@ -1674,8 +1671,8 @@ class Stack(collections.Mapping):
|
||||
check_message = functools.partial(self._check_for_message,
|
||||
msg_queue)
|
||||
try:
|
||||
yield updater.as_task(timeout=self.timeout_secs(),
|
||||
progress_callback=check_message)
|
||||
yield from updater.as_task(timeout=self.timeout_secs(),
|
||||
progress_callback=check_message)
|
||||
finally:
|
||||
self.reset_dependencies()
|
||||
|
||||
@ -1691,7 +1688,7 @@ class Stack(collections.Mapping):
|
||||
# so we roll back to the original state
|
||||
should_rollback = self._update_exception_handler(e, action)
|
||||
if should_rollback:
|
||||
yield self.update_task(oldstack, action=self.ROLLBACK)
|
||||
yield from self.update_task(oldstack, action=self.ROLLBACK)
|
||||
except BaseException as e:
|
||||
with excutils.save_and_reraise_exception():
|
||||
self._update_exception_handler(e, action)
|
||||
|
@ -45,7 +45,6 @@ class StackUpdate(object):
|
||||
else:
|
||||
return '%s Update' % str(self.existing_stack)
|
||||
|
||||
@scheduler.wrappertask
|
||||
def __call__(self):
|
||||
"""Return a co-routine that updates the stack."""
|
||||
|
||||
@ -63,10 +62,10 @@ class StackUpdate(object):
|
||||
error_wait_time=get_error_wait_time)
|
||||
|
||||
if not self.rollback:
|
||||
yield cleanup_prev()
|
||||
yield from cleanup_prev()
|
||||
|
||||
try:
|
||||
yield updater()
|
||||
yield from updater()
|
||||
finally:
|
||||
self.previous_stack.reset_dependencies()
|
||||
|
||||
@ -76,12 +75,11 @@ class StackUpdate(object):
|
||||
else:
|
||||
return self._process_existing_resource_update(res)
|
||||
|
||||
@scheduler.wrappertask
|
||||
def _remove_backup_resource(self, prev_res):
|
||||
if prev_res.state not in ((prev_res.INIT, prev_res.COMPLETE),
|
||||
(prev_res.DELETE, prev_res.COMPLETE)):
|
||||
LOG.debug("Deleting backup resource %s", prev_res.name)
|
||||
yield prev_res.destroy()
|
||||
yield from prev_res.destroy()
|
||||
|
||||
@staticmethod
|
||||
def _exchange_stacks(existing_res, prev_res):
|
||||
@ -91,7 +89,6 @@ class StackUpdate(object):
|
||||
prev_stack.add_resource(existing_res)
|
||||
existing_stack.add_resource(prev_res)
|
||||
|
||||
@scheduler.wrappertask
|
||||
def _create_resource(self, new_res):
|
||||
res_name = new_res.name
|
||||
|
||||
@ -110,7 +107,7 @@ class StackUpdate(object):
|
||||
return
|
||||
|
||||
LOG.debug("Deleting backup Resource %s", res_name)
|
||||
yield prev_res.destroy()
|
||||
yield from prev_res.destroy()
|
||||
|
||||
# Back up existing resource
|
||||
if res_name in self.existing_stack:
|
||||
@ -131,7 +128,7 @@ class StackUpdate(object):
|
||||
self.previous_stack.t.add_resource(new_res.t)
|
||||
self.previous_stack.t.store(self.previous_stack.context)
|
||||
|
||||
yield new_res.create()
|
||||
yield from new_res.create()
|
||||
|
||||
self._update_resource_data(new_res)
|
||||
|
||||
@ -160,7 +157,6 @@ class StackUpdate(object):
|
||||
stk_defn.update_resource_data(self.new_stack.defn,
|
||||
resource.name, node_data)
|
||||
|
||||
@scheduler.wrappertask
|
||||
def _process_new_resource_update(self, new_res):
|
||||
res_name = new_res.name
|
||||
|
||||
@ -169,9 +165,9 @@ class StackUpdate(object):
|
||||
is_substituted = existing_res.check_is_substituted(type(new_res))
|
||||
if type(existing_res) is type(new_res) or is_substituted:
|
||||
try:
|
||||
yield self._update_in_place(existing_res,
|
||||
new_res,
|
||||
is_substituted)
|
||||
yield from self._update_in_place(existing_res,
|
||||
new_res,
|
||||
is_substituted)
|
||||
except resource.UpdateReplace:
|
||||
pass
|
||||
else:
|
||||
@ -195,7 +191,7 @@ class StackUpdate(object):
|
||||
else:
|
||||
self._check_replace_restricted(new_res)
|
||||
|
||||
yield self._create_resource(new_res)
|
||||
yield from self._create_resource(new_res)
|
||||
|
||||
def _update_in_place(self, existing_res, new_res, is_substituted=False):
|
||||
existing_snippet = self.existing_snippets[existing_res.name]
|
||||
@ -214,15 +210,15 @@ class StackUpdate(object):
|
||||
existing_res.stack.resources[existing_res.name] = substitute
|
||||
existing_res = substitute
|
||||
existing_res.converge = self.new_stack.converge
|
||||
return existing_res.update(new_snippet, existing_snippet,
|
||||
prev_resource=prev_res)
|
||||
yield from existing_res.update(new_snippet, existing_snippet,
|
||||
prev_resource=prev_res)
|
||||
|
||||
@scheduler.wrappertask
|
||||
def _process_existing_resource_update(self, existing_res):
|
||||
res_name = existing_res.name
|
||||
|
||||
if res_name in self.previous_stack:
|
||||
yield self._remove_backup_resource(self.previous_stack[res_name])
|
||||
backup_res = self.previous_stack[res_name]
|
||||
yield from self._remove_backup_resource(backup_res)
|
||||
|
||||
if res_name in self.new_stack:
|
||||
new_res = self.new_stack[res_name]
|
||||
@ -231,7 +227,7 @@ class StackUpdate(object):
|
||||
return
|
||||
|
||||
if existing_res.stack is not self.previous_stack:
|
||||
yield existing_res.destroy()
|
||||
yield from existing_res.destroy()
|
||||
|
||||
if res_name not in self.new_stack:
|
||||
self.existing_stack.remove_resource(res_name)
|
||||
|
@ -2110,6 +2110,7 @@ class StackUpdateTest(common.HeatTestCase):
|
||||
|
||||
def update(self, after, before=None, prev_resource=None):
|
||||
ResourceTypeB.count_b += 1
|
||||
yield
|
||||
|
||||
resource._register_class('ResourceTypeB', ResourceTypeB)
|
||||
|
||||
@ -2124,6 +2125,7 @@ class StackUpdateTest(common.HeatTestCase):
|
||||
|
||||
def update(self, after, before=None, prev_resource=None):
|
||||
ResourceTypeA.count_a += 1
|
||||
yield
|
||||
|
||||
resource._register_class('ResourceTypeA', ResourceTypeA)
|
||||
|
||||
|
@ -6,6 +6,7 @@ pbr!=2.1.0,>=2.0.0 # Apache-2.0
|
||||
Babel!=2.4.0,>=2.3.4 # BSD
|
||||
croniter>=0.3.4 # MIT License
|
||||
cryptography>=2.1 # BSD/Apache-2.0
|
||||
debtcollector>=1.19.0 # Apache-2.0
|
||||
eventlet!=0.18.3,!=0.20.1,!=0.21.0,!=0.23.0,!=0.25.0,>=0.18.2 # MIT
|
||||
keystoneauth1>=3.18.0 # Apache-2.0
|
||||
keystonemiddleware>=4.17.0 # Apache-2.0
|
||||
|
Loading…
Reference in New Issue
Block a user