From ebd52f2e6de96b965878f0fd5e56f089ceb1ff93 Mon Sep 17 00:00:00 2001 From: anastasia-karpinska Date: Tue, 10 Dec 2013 23:12:31 +0200 Subject: [PATCH] Switch create volume commands to Taskflow 0.1.1 - Old TaskFlow code was removed from Cinder. - TaskFlow 0.1.1 was added to Cinder requirements. - Create volume flows for volume.api, volume.manager and scheduler.manager were updated to use taskFlow 0.1.1 Partially implements: blueprint create-volume-flow Change-Id: Idbac8d001436f02978b366fbb3205ce84c847267 --- cinder/scheduler/manager.py | 24 +- cinder/taskflow/__init__.py | 1 - cinder/taskflow/decorators.py | 276 ----------- cinder/taskflow/exceptions.py | 69 --- cinder/taskflow/patterns/__init__.py | 1 - cinder/taskflow/patterns/base.py | 215 -------- cinder/taskflow/patterns/linear_flow.py | 271 ---------- cinder/taskflow/states.py | 40 -- cinder/taskflow/task.py | 67 --- cinder/taskflow/utils.py | 464 ------------------ cinder/tests/test_volume.py | 20 +- cinder/volume/api.py | 41 +- cinder/volume/flows/base.py | 29 +- cinder/volume/flows/create_volume/__init__.py | 460 +++++++---------- cinder/volume/flows/utils.py | 55 --- cinder/volume/manager.py | 55 ++- requirements.txt | 1 + taskflow.conf | 7 - 18 files changed, 247 insertions(+), 1849 deletions(-) delete mode 100644 cinder/taskflow/__init__.py delete mode 100644 cinder/taskflow/decorators.py delete mode 100644 cinder/taskflow/exceptions.py delete mode 100644 cinder/taskflow/patterns/__init__.py delete mode 100644 cinder/taskflow/patterns/base.py delete mode 100644 cinder/taskflow/patterns/linear_flow.py delete mode 100644 cinder/taskflow/states.py delete mode 100644 cinder/taskflow/task.py delete mode 100644 cinder/taskflow/utils.py delete mode 100644 cinder/volume/flows/utils.py delete mode 100644 taskflow.conf diff --git a/cinder/scheduler/manager.py b/cinder/scheduler/manager.py index 19d0a500c83..b4d342717e8 100644 --- a/cinder/scheduler/manager.py +++ b/cinder/scheduler/manager.py @@ -34,7 +34,6 @@ from cinder.openstack.common.notifier import api as notifier from cinder.volume.flows import create_volume from cinder.volume import rpcapi as volume_rpcapi -from cinder.taskflow import states scheduler_driver_opt = cfg.StrOpt('scheduler_driver', default='cinder.scheduler.filter_scheduler.' @@ -76,17 +75,18 @@ class SchedulerManager(manager.Manager): image_id=None, request_spec=None, filter_properties=None): - flow = create_volume.get_scheduler_flow(db, self.driver, - request_spec, - filter_properties, - volume_id, snapshot_id, - image_id) - assert flow, _('Schedule volume flow not retrieved') - - flow.run(context) - if flow.state != states.SUCCESS: - LOG.warn(_("Failed to successfully complete" - " schedule volume using flow: %s"), flow) + try: + flow_engine = create_volume.get_scheduler_flow(context, + db, self.driver, + request_spec, + filter_properties, + volume_id, + snapshot_id, + image_id) + except Exception: + raise exception.CinderException( + _("Failed to create scheduler manager volume flow")) + flow_engine.run() def request_service_capabilities(self, context): volume_rpcapi.VolumeAPI().publish_service_capabilities(context) diff --git a/cinder/taskflow/__init__.py b/cinder/taskflow/__init__.py deleted file mode 100644 index 1f19be57566..00000000000 --- a/cinder/taskflow/__init__.py +++ /dev/null @@ -1 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 diff --git a/cinder/taskflow/decorators.py b/cinder/taskflow/decorators.py deleted file mode 100644 index ea99d13934c..00000000000 --- a/cinder/taskflow/decorators.py +++ /dev/null @@ -1,276 +0,0 @@ -# -*- coding: utf-8 -*- - -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import collections -import functools -import inspect -import types - -# These arguments are ones that we will skip when parsing for requirements -# for a function to operate (when used as a task). -AUTO_ARGS = ('self', 'context', 'cls') - - -def is_decorated(functor): - if not isinstance(functor, (types.MethodType, types.FunctionType)): - return False - return getattr(extract(functor), '__task__', False) - - -def extract(functor): - # Extract the underlying functor if its a method since we can not set - # attributes on instance methods, this is supposedly fixed in python 3 - # and later. - # - # TODO(harlowja): add link to this fix. - assert isinstance(functor, (types.MethodType, types.FunctionType)) - if isinstance(functor, types.MethodType): - return functor.__func__ - else: - return functor - - -def _mark_as_task(functor): - setattr(functor, '__task__', True) - - -def _get_wrapped(function): - """Get the method at the bottom of a stack of decorators.""" - - if hasattr(function, '__wrapped__'): - return getattr(function, '__wrapped__') - - if not hasattr(function, 'func_closure') or not function.func_closure: - return function - - def _get_wrapped_function(function): - if not hasattr(function, 'func_closure') or not function.func_closure: - return None - - for closure in function.func_closure: - func = closure.cell_contents - - deeper_func = _get_wrapped_function(func) - if deeper_func: - return deeper_func - elif hasattr(closure.cell_contents, '__call__'): - return closure.cell_contents - - return _get_wrapped_function(function) - - -def _take_arg(a): - if a in AUTO_ARGS: - return False - # In certain decorator cases it seems like we get the function to be - # decorated as an argument, we don't want to take that as a real argument. - if isinstance(a, collections.Callable): - return False - return True - - -def wraps(fn): - """This will not be needed in python 3.2 or greater which already has this - built-in to its functools.wraps method. - """ - - def wrapper(f): - f = functools.wraps(fn)(f) - f.__wrapped__ = getattr(fn, '__wrapped__', fn) - return f - - return wrapper - - -def locked(f): - - @wraps(f) - def wrapper(self, *args, **kwargs): - with self._lock: - return f(self, *args, **kwargs) - - return wrapper - - -def task(*args, **kwargs): - """Decorates a given function and ensures that all needed attributes of - that function are set so that the function can be used as a task. - """ - - def decorator(f): - w_f = extract(f) - - def noop(*args, **kwargs): - pass - - # Mark as being a task - _mark_as_task(w_f) - - # By default don't revert this. - w_f.revert = kwargs.pop('revert_with', noop) - - # Associate a name of this task that is the module + function name. - w_f.name = "%s.%s" % (f.__module__, f.__name__) - - # Sets the version of the task. - version = kwargs.pop('version', (1, 0)) - f = _versionize(*version)(f) - - # Attach any requirements this function needs for running. - requires_what = kwargs.pop('requires', []) - f = _requires(*requires_what, **kwargs)(f) - - # Attach any optional requirements this function needs for running. - optional_what = kwargs.pop('optional', []) - f = _optional(*optional_what, **kwargs)(f) - - # Attach any items this function provides as output - provides_what = kwargs.pop('provides', []) - f = _provides(*provides_what, **kwargs)(f) - - @wraps(f) - def wrapper(*args, **kwargs): - return f(*args, **kwargs) - - return wrapper - - # This is needed to handle when the decorator has args or the decorator - # doesn't have args, python is rather weird here... - if kwargs or not args: - return decorator - else: - if isinstance(args[0], collections.Callable): - return decorator(args[0]) - else: - return decorator - - -def _versionize(major, minor=None): - """A decorator that marks the wrapped function with a major & minor version - number. - """ - - if minor is None: - minor = 0 - - def decorator(f): - w_f = extract(f) - w_f.version = (major, minor) - - @wraps(f) - def wrapper(*args, **kwargs): - return f(*args, **kwargs) - - return wrapper - - return decorator - - -def _optional(*args, **kwargs): - """Attaches a set of items that the decorated function would like as input - to the functions underlying dictionary. - """ - - def decorator(f): - w_f = extract(f) - - if not hasattr(w_f, 'optional'): - w_f.optional = set() - - w_f.optional.update([a for a in args if _take_arg(a)]) - - @wraps(f) - def wrapper(*args, **kwargs): - return f(*args, **kwargs) - - return wrapper - - # This is needed to handle when the decorator has args or the decorator - # doesn't have args, python is rather weird here... - if kwargs or not args: - return decorator - else: - if isinstance(args[0], collections.Callable): - return decorator(args[0]) - else: - return decorator - - -def _requires(*args, **kwargs): - """Attaches a set of items that the decorated function requires as input - to the functions underlying dictionary. - """ - - def decorator(f): - w_f = extract(f) - - if not hasattr(w_f, 'requires'): - w_f.requires = set() - - if kwargs.pop('auto_extract', True): - inspect_what = _get_wrapped(f) - f_args = inspect.getargspec(inspect_what).args - w_f.requires.update([a for a in f_args if _take_arg(a)]) - - w_f.requires.update([a for a in args if _take_arg(a)]) - - @wraps(f) - def wrapper(*args, **kwargs): - return f(*args, **kwargs) - - return wrapper - - # This is needed to handle when the decorator has args or the decorator - # doesn't have args, python is rather weird here... - if kwargs or not args: - return decorator - else: - if isinstance(args[0], collections.Callable): - return decorator(args[0]) - else: - return decorator - - -def _provides(*args, **kwargs): - """Attaches a set of items that the decorated function provides as output - to the functions underlying dictionary. - """ - - def decorator(f): - w_f = extract(f) - - if not hasattr(f, 'provides'): - w_f.provides = set() - - w_f.provides.update([a for a in args if _take_arg(a)]) - - @wraps(f) - def wrapper(*args, **kwargs): - return f(*args, **kwargs) - - return wrapper - - # This is needed to handle when the decorator has args or the decorator - # doesn't have args, python is rather weird here... - if kwargs or not args: - return decorator - else: - if isinstance(args[0], collections.Callable): - return decorator(args[0]) - else: - return decorator diff --git a/cinder/taskflow/exceptions.py b/cinder/taskflow/exceptions.py deleted file mode 100644 index 62deadd0ca4..00000000000 --- a/cinder/taskflow/exceptions.py +++ /dev/null @@ -1,69 +0,0 @@ -# -*- coding: utf-8 -*- - -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - - -class TaskFlowException(Exception): - """Base class for exceptions emitted from this library.""" - pass - - -class Duplicate(TaskFlowException): - """Raised when a duplicate entry is found.""" - pass - - -class NotFound(TaskFlowException): - """Raised when some entry in some object doesn't exist.""" - pass - - -class AlreadyExists(TaskFlowException): - """Raised when some entry in some object already exists.""" - pass - - -class ClosedException(TaskFlowException): - """Raised when an access on a closed object occurs.""" - pass - - -class InvalidStateException(TaskFlowException): - """Raised when a task/job/workflow is in an invalid state when an - operation is attempting to apply to said task/job/workflow. - """ - pass - - -class UnclaimableJobException(TaskFlowException): - """Raised when a job can not be claimed.""" - pass - - -class JobNotFound(TaskFlowException): - """Raised when a job entry can not be found.""" - pass - - -class MissingDependencies(InvalidStateException): - """Raised when a task has dependencies that can not be satisfied.""" - message = ("%(task)s requires %(requirements)s but no other task produces" - " said requirements") - - def __init__(self, task, requirements): - message = self.message % {'task': task, 'requirements': requirements} - super(MissingDependencies, self).__init__(message) diff --git a/cinder/taskflow/patterns/__init__.py b/cinder/taskflow/patterns/__init__.py deleted file mode 100644 index 1f19be57566..00000000000 --- a/cinder/taskflow/patterns/__init__.py +++ /dev/null @@ -1 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 diff --git a/cinder/taskflow/patterns/base.py b/cinder/taskflow/patterns/base.py deleted file mode 100644 index 20ab9d1ef99..00000000000 --- a/cinder/taskflow/patterns/base.py +++ /dev/null @@ -1,215 +0,0 @@ -# -*- coding: utf-8 -*- - -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import abc -import threading -import uuid as uuidlib - -import six - - -from cinder.taskflow import decorators -from cinder.taskflow import exceptions as exc -from cinder.taskflow import states -from cinder.taskflow import utils - - -@six.add_metaclass(abc.ABCMeta) -class Flow(object): - """The base abstract class of all flow implementations. - - It provides a set of parents to flows that have a concept of parent flows - as well as a state and state utility functions to the deriving classes. It - also provides a name and an identifier (uuid or other) to the flow so that - it can be uniquely identifed among many flows. - - Flows are expected to provide (if desired) the following methods: - - add - - add_many - - interrupt - - reset - - rollback - - run - - soft_reset - """ - - # Common states that certain actions can be performed in. If the flow - # is not in these sets of states then it is likely that the flow operation - # can not succeed. - RESETTABLE_STATES = set([ - states.INTERRUPTED, - states.SUCCESS, - states.PENDING, - states.FAILURE, - ]) - SOFT_RESETTABLE_STATES = set([ - states.INTERRUPTED, - ]) - UNINTERRUPTIBLE_STATES = set([ - states.FAILURE, - states.SUCCESS, - states.PENDING, - ]) - RUNNABLE_STATES = set([ - states.PENDING, - ]) - - def __init__(self, name, parents=None, uuid=None): - self._name = str(name) - # The state of this flow. - self._state = states.PENDING - # If this flow has a parent flow/s which need to be reverted if - # this flow fails then please include them here to allow this child - # to call the parents... - if parents: - self.parents = tuple(parents) - else: - self.parents = () - # Any objects that want to listen when a wf/task starts/stops/completes - # or errors should be registered here. This can be used to monitor - # progress and record tasks finishing (so that it becomes possible to - # store the result of a task in some persistent or semi-persistent - # storage backend). - self.notifier = utils.TransitionNotifier() - self.task_notifier = utils.TransitionNotifier() - # Ensure that modifications and/or multiple runs aren't happening - # at the same time in the same flow at the same time. - self._lock = threading.RLock() - # Assign this flow a unique identifer. - if uuid: - self._id = str(uuid) - else: - self._id = str(uuidlib.uuid4()) - - @property - def name(self): - """A non-unique name for this flow (human readable)""" - return self._name - - @property - def uuid(self): - """Uniquely identifies this flow""" - return "f-%s" % (self._id) - - @property - def state(self): - """Provides a read-only view of the flow state.""" - return self._state - - def _change_state(self, context, new_state): - was_changed = False - old_state = self.state - with self._lock: - if self.state != new_state: - old_state = self.state - self._state = new_state - was_changed = True - if was_changed: - # Don't notify while holding the lock. - self.notifier.notify(self.state, details={ - 'context': context, - 'flow': self, - 'old_state': old_state, - }) - - def __str__(self): - lines = ["Flow: %s" % (self.name)] - lines.append("%s" % (self.uuid)) - lines.append("%s" % (len(self.parents))) - lines.append("%s" % (self.state)) - return "; ".join(lines) - - @abc.abstractmethod - def add(self, task): - """Adds a given task to this flow. - - Returns the uuid that is associated with the task for later operations - before and after it is ran. - """ - raise NotImplementedError() - - @decorators.locked - def add_many(self, tasks): - """Adds many tasks to this flow. - - Returns a list of uuids (one for each task added). - """ - uuids = [] - for t in tasks: - uuids.append(self.add(t)) - return uuids - - def interrupt(self): - """Attempts to interrupt the current flow and any tasks that are - currently not running in the flow. - - Returns how many tasks were interrupted (if any). - """ - if self.state in self.UNINTERRUPTIBLE_STATES: - raise exc.InvalidStateException(("Can not interrupt when" - " in state %s") % (self.state)) - # Note(harlowja): Do *not* acquire the lock here so that the flow may - # be interrupted while running. This does mean the the above check may - # not be valid but we can worry about that if it becomes an issue. - old_state = self.state - if old_state != states.INTERRUPTED: - self._state = states.INTERRUPTED - self.notifier.notify(self.state, details={ - 'context': None, - 'flow': self, - 'old_state': old_state, - }) - return 0 - - @decorators.locked - def reset(self): - """Fully resets the internal state of this flow, allowing for the flow - to be ran again. - - Note: Listeners are also reset. - """ - if self.state not in self.RESETTABLE_STATES: - raise exc.InvalidStateException(("Can not reset when" - " in state %s") % (self.state)) - self.notifier.reset() - self.task_notifier.reset() - self._change_state(None, states.PENDING) - - @decorators.locked - def soft_reset(self): - """Partially resets the internal state of this flow, allowing for the - flow to be ran again from an interrupted state only. - """ - if self.state not in self.SOFT_RESETTABLE_STATES: - raise exc.InvalidStateException(("Can not soft reset when" - " in state %s") % (self.state)) - self._change_state(None, states.PENDING) - - @decorators.locked - def run(self, context, *args, **kwargs): - """Executes the workflow.""" - if self.state not in self.RUNNABLE_STATES: - raise exc.InvalidStateException("Unable to run flow when " - "in state %s" % (self.state)) - - @decorators.locked - def rollback(self, context, cause): - """Performs rollback of this workflow and any attached parent workflows - if present. - """ - pass diff --git a/cinder/taskflow/patterns/linear_flow.py b/cinder/taskflow/patterns/linear_flow.py deleted file mode 100644 index 16f220afe36..00000000000 --- a/cinder/taskflow/patterns/linear_flow.py +++ /dev/null @@ -1,271 +0,0 @@ -# -*- coding: utf-8 -*- - -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import collections -import logging - -from cinder.openstack.common import excutils - -from cinder.taskflow import decorators -from cinder.taskflow import exceptions as exc -from cinder.taskflow import states -from cinder.taskflow import utils - -from cinder.taskflow.patterns import base - -LOG = logging.getLogger(__name__) - - -class Flow(base.Flow): - """"A linear chain of tasks that can be applied in order as one unit and - rolled back as one unit using the reverse order that the tasks have - been applied in. - - Note(harlowja): Each task in the chain must have requirements - which are satisfied by the previous task/s in the chain. - """ - - def __init__(self, name, parents=None, uuid=None): - super(Flow, self).__init__(name, parents, uuid) - # The tasks which have been applied will be collected here so that they - # can be reverted in the correct order on failure. - self._accumulator = utils.RollbackAccumulator() - # Tasks results are stored here. Lookup is by the uuid that was - # returned from the add function. - self.results = {} - # The previously left off iterator that can be used to resume from - # the last task (if interrupted and soft-reset). - self._leftoff_at = None - # All runners to run are collected here. - self._runners = [] - self._connected = False - # The resumption strategy to use. - self.resumer = None - - @decorators.locked - def add(self, task): - """Adds a given task to this flow.""" - assert isinstance(task, collections.Callable) - r = utils.Runner(task) - r.runs_before = list(reversed(self._runners)) - self._runners.append(r) - self._reset_internals() - return r.uuid - - def _reset_internals(self): - self._connected = False - self._leftoff_at = None - - def _associate_providers(self, runner): - # Ensure that some previous task provides this input. - who_provides = {} - task_requires = runner.requires - for r in task_requires: - provider = None - for before_me in runner.runs_before: - if r in before_me.provides: - provider = before_me - break - if provider: - who_provides[r] = provider - # Ensure that the last task provides all the needed input for this - # task to run correctly. - missing_requires = task_requires - set(who_provides.keys()) - if missing_requires: - raise exc.MissingDependencies(runner, sorted(missing_requires)) - runner.providers.update(who_provides) - - def __str__(self): - lines = ["LinearFlow: %s" % (self.name)] - lines.append("%s" % (self.uuid)) - lines.append("%s" % (len(self._runners))) - lines.append("%s" % (len(self.parents))) - lines.append("%s" % (self.state)) - return "; ".join(lines) - - @decorators.locked - def remove(self, uuid): - index_removed = -1 - for (i, r) in enumerate(self._runners): - if r.uuid == uuid: - index_removed = i - break - if index_removed == -1: - raise ValueError("No runner found with uuid %s" % (uuid)) - else: - removed = self._runners.pop(index_removed) - self._reset_internals() - # Go and remove it from any runner after the removed runner since - # those runners may have had an attachment to it. - for r in self._runners[index_removed:]: - try: - r.runs_before.remove(removed) - except (IndexError, ValueError): - pass - - def __len__(self): - return len(self._runners) - - def _connect(self): - if self._connected: - return self._runners - for r in self._runners: - r.providers = {} - for r in reversed(self._runners): - self._associate_providers(r) - self._connected = True - return self._runners - - def _ordering(self): - return iter(self._connect()) - - @decorators.locked - def run(self, context, *args, **kwargs): - super(Flow, self).run(context, *args, **kwargs) - - def resume_it(): - if self._leftoff_at is not None: - return ([], self._leftoff_at) - if self.resumer: - (finished, leftover) = self.resumer.resume(self, - self._ordering()) - else: - finished = [] - leftover = self._ordering() - return (finished, leftover) - - self._change_state(context, states.STARTED) - try: - those_finished, leftover = resume_it() - except Exception: - with excutils.save_and_reraise_exception(): - self._change_state(context, states.FAILURE) - - def run_it(runner, failed=False, result=None, simulate_run=False): - try: - # Add the task to be rolled back *immediately* so that even if - # the task fails while producing results it will be given a - # chance to rollback. - rb = utils.RollbackTask(context, runner.task, result=None) - self._accumulator.add(rb) - self.task_notifier.notify(states.STARTED, details={ - 'context': context, - 'flow': self, - 'runner': runner, - }) - if not simulate_run: - result = runner(context, *args, **kwargs) - else: - if failed: - # TODO(harlowja): make this configurable?? - # If we previously failed, we want to fail again at - # the same place. - if not result: - # If no exception or exception message was provided - # or captured from the previous run then we need to - # form one for this task. - result = "%s failed running." % (runner.task) - if isinstance(result, basestring): - result = exc.InvalidStateException(result) - if not isinstance(result, Exception): - LOG.warn("Can not raise a non-exception" - " object: %s", result) - result = exc.InvalidStateException() - raise result - # Adjust the task result in the accumulator before - # notifying others that the task has finished to - # avoid the case where a listener might throw an - # exception. - rb.result = result - runner.result = result - self.results[runner.uuid] = result - self.task_notifier.notify(states.SUCCESS, details={ - 'context': context, - 'flow': self, - 'runner': runner, - }) - except Exception as e: - runner.result = e - cause = utils.FlowFailure(runner, self, e) - with excutils.save_and_reraise_exception(): - # Notify any listeners that the task has errored. - self.task_notifier.notify(states.FAILURE, details={ - 'context': context, - 'flow': self, - 'runner': runner, - }) - self.rollback(context, cause) - - if len(those_finished): - self._change_state(context, states.RESUMING) - for (r, details) in those_finished: - # Fake running the task so that we trigger the same - # notifications and state changes (and rollback that - # would have happened in a normal flow). - failed = states.FAILURE in details.get('states', []) - result = details.get('result') - run_it(r, failed=failed, result=result, simulate_run=True) - - self._leftoff_at = leftover - self._change_state(context, states.RUNNING) - if self.state == states.INTERRUPTED: - return - - was_interrupted = False - for r in leftover: - r.reset() - run_it(r) - if self.state == states.INTERRUPTED: - was_interrupted = True - break - - if not was_interrupted: - # Only gets here if everything went successfully. - self._change_state(context, states.SUCCESS) - self._leftoff_at = None - - @decorators.locked - def reset(self): - super(Flow, self).reset() - self.results = {} - self.resumer = None - self._accumulator.reset() - self._reset_internals() - - @decorators.locked - def rollback(self, context, cause): - # Performs basic task by task rollback by going through the reverse - # order that tasks have finished and asking said task to undo whatever - # it has done. If this flow has any parent flows then they will - # also be called to rollback any tasks said parents contain. - # - # Note(harlowja): if a flow can more simply revert a whole set of - # tasks via a simpler command then it can override this method to - # accomplish that. - # - # For example, if each task was creating a file in a directory, then - # it's easier to just remove the directory than to ask each task to - # delete its file individually. - self._change_state(context, states.REVERTING) - try: - self._accumulator.rollback(cause) - finally: - self._change_state(context, states.FAILURE) - # Rollback any parents flows if they exist... - for p in self.parents: - p.rollback(context, cause) diff --git a/cinder/taskflow/states.py b/cinder/taskflow/states.py deleted file mode 100644 index 48d320a70de..00000000000 --- a/cinder/taskflow/states.py +++ /dev/null @@ -1,40 +0,0 @@ -# -*- coding: utf-8 -*- - -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -# Job states. -CLAIMED = 'CLAIMED' -FAILURE = 'FAILURE' -PENDING = 'PENDING' -RUNNING = 'RUNNING' -SUCCESS = 'SUCCESS' -UNCLAIMED = 'UNCLAIMED' - -# Flow states. -FAILURE = FAILURE -INTERRUPTED = 'INTERRUPTED' -PENDING = 'PENDING' -RESUMING = 'RESUMING' -REVERTING = 'REVERTING' -RUNNING = RUNNING -STARTED = 'STARTED' -SUCCESS = SUCCESS - -# Task states. -FAILURE = FAILURE -STARTED = STARTED -SUCCESS = SUCCESS diff --git a/cinder/taskflow/task.py b/cinder/taskflow/task.py deleted file mode 100644 index 3d5d2eeed0f..00000000000 --- a/cinder/taskflow/task.py +++ /dev/null @@ -1,67 +0,0 @@ -# -*- coding: utf-8 -*- - -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import abc - -import six - -from cinder.taskflow import utils - - -@six.add_metaclass(abc.ABCMeta) -class Task(object): - """An abstraction that defines a potential piece of work that can be - applied and can be reverted to undo the work as a single unit. - """ - def __init__(self, name): - self.name = name - # An *immutable* input 'resource' name set this task depends - # on existing before this task can be applied. - self.requires = set() - # An *immutable* input 'resource' name set this task would like to - # depends on existing before this task can be applied (but does not - # strongly depend on existing). - self.optional = set() - # An *immutable* output 'resource' name set this task - # produces that other tasks may depend on this task providing. - self.provides = set() - # This identifies the version of the task to be ran which - # can be useful in resuming older versions of tasks. Standard - # major, minor version semantics apply. - self.version = (1, 0) - - def __str__(self): - return "%s==%s" % (self.name, utils.join(self.version, with_what=".")) - - @abc.abstractmethod - def __call__(self, context, *args, **kwargs): - """Activate a given task which will perform some operation and return. - - This method can be used to apply some given context and given set - of args and kwargs to accomplish some goal. Note that the result - that is returned needs to be serializable so that it can be passed - back into this task if reverting is triggered. - """ - raise NotImplementedError() - - def revert(self, context, result, cause): - """Revert this task using the given context, result that the apply - provided as well as any information which may have caused - said reversion. - """ - pass diff --git a/cinder/taskflow/utils.py b/cinder/taskflow/utils.py deleted file mode 100644 index cb66fef98c4..00000000000 --- a/cinder/taskflow/utils.py +++ /dev/null @@ -1,464 +0,0 @@ -# -*- coding: utf-8 -*- - -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import collections -import contextlib -import copy -import logging -import re -import sys -import threading -import time -import types -import uuid as uuidlib - - -from cinder.taskflow import decorators - -LOG = logging.getLogger(__name__) - - -def get_attr(task, field, default=None): - if decorators.is_decorated(task): - # If its a decorated functor then the attributes will be either - # in the underlying function of the instancemethod or the function - # itself. - task = decorators.extract(task) - return getattr(task, field, default) - - -def join(itr, with_what=","): - pieces = [str(i) for i in itr] - return with_what.join(pieces) - - -def get_many_attr(obj, *attrs): - many = [] - for a in attrs: - many.append(get_attr(obj, a, None)) - return many - - -def get_task_version(task): - """Gets a tasks *string* version, whether it is a task object/function.""" - task_version = get_attr(task, 'version') - if isinstance(task_version, (list, tuple)): - task_version = join(task_version, with_what=".") - if task_version is not None and not isinstance(task_version, basestring): - task_version = str(task_version) - return task_version - - -def get_task_name(task): - """Gets a tasks *string* name, whether it is a task object/function.""" - task_name = "" - if isinstance(task, (types.MethodType, types.FunctionType)): - # If its a function look for the attributes that should have been - # set using the task() decorator provided in the decorators file. If - # those have not been set, then we should at least have enough basic - # information (not a version) to form a useful task name. - task_name = get_attr(task, 'name') - if not task_name: - name_pieces = [a for a in get_many_attr(task, - '__module__', - '__name__') - if a is not None] - task_name = join(name_pieces, ".") - else: - task_name = str(task) - return task_name - - -def is_version_compatible(version_1, version_2): - """Checks for major version compatibility of two *string" versions.""" - if version_1 == version_2: - # Equivalent exactly, so skip the rest. - return True - - def _convert_to_pieces(version): - try: - pieces = [] - for p in version.split("."): - p = p.strip() - if not len(p): - pieces.append(0) - continue - # Clean off things like 1alpha, or 2b and just select the - # digit that starts that entry instead. - p_match = re.match(r"(\d+)([A-Za-z]*)(.*)", p) - if p_match: - p = p_match.group(1) - pieces.append(int(p)) - except (AttributeError, TypeError, ValueError): - pieces = [] - return pieces - - version_1_pieces = _convert_to_pieces(version_1) - version_2_pieces = _convert_to_pieces(version_2) - if len(version_1_pieces) == 0 or len(version_2_pieces) == 0: - return False - - # Ensure major version compatibility to start. - major1 = version_1_pieces[0] - major2 = version_2_pieces[0] - if major1 != major2: - return False - return True - - -def await(check_functor, timeout=None): - if timeout is not None: - end_time = time.time() + max(0, timeout) - else: - end_time = None - # Use the same/similar scheme that the python condition class uses. - delay = 0.0005 - while not check_functor(): - time.sleep(delay) - if end_time is not None: - remaining = end_time - time.time() - if remaining <= 0: - return False - delay = min(delay * 2, remaining, 0.05) - else: - delay = min(delay * 2, 0.05) - return True - - -class LastFedIter(object): - """An iterator which yields back the first item and then yields back - results from the provided iterator. - """ - - def __init__(self, first, rest_itr): - self.first = first - self.rest_itr = rest_itr - - def __iter__(self): - yield self.first - for i in self.rest_itr: - yield i - - -class FlowFailure(object): - """When a task failure occurs the following object will be given to revert - and can be used to interrogate what caused the failure. - """ - - def __init__(self, runner, flow, exception): - self.runner = runner - self.flow = flow - self.exc = exception - self.exc_info = sys.exc_info() - - -class RollbackTask(object): - """A helper task that on being called will call the underlying callable - tasks revert method (if said method exists). - """ - - def __init__(self, context, task, result): - self.task = task - self.result = result - self.context = context - - def __str__(self): - return str(self.task) - - def __call__(self, cause): - if ((hasattr(self.task, "revert") and - isinstance(self.task.revert, collections.Callable))): - self.task.revert(self.context, self.result, cause) - - -class Runner(object): - """A helper class that wraps a task and can find the needed inputs for - the task to run, as well as providing a uuid and other useful functionality - for users of the task. - - TODO(harlowja): replace with the task details object or a subclass of - that??? - """ - - def __init__(self, task, uuid=None): - assert isinstance(task, collections.Callable) - self.task = task - self.providers = {} - self.runs_before = [] - self.result = None - if not uuid: - self._id = str(uuidlib.uuid4()) - else: - self._id = str(uuid) - - @property - def uuid(self): - return "r-%s" % (self._id) - - @property - def requires(self): - return set(get_attr(self.task, 'requires', [])) - - @property - def provides(self): - return set(get_attr(self.task, 'provides', [])) - - @property - def optional(self): - return set(get_attr(self.task, 'optional', [])) - - @property - def version(self): - return get_task_version(self.task) - - @property - def name(self): - return get_task_name(self.task) - - def reset(self): - self.result = None - - def __str__(self): - lines = ["Runner: %s" % (self.name)] - lines.append("%s" % (self.uuid)) - lines.append("%s" % (self.version)) - return "; ".join(lines) - - def __call__(self, *args, **kwargs): - # Find all of our inputs first. - kwargs = dict(kwargs) - for (k, who_made) in self.providers.iteritems(): - if who_made.result and k in who_made.result: - kwargs[k] = who_made.result[k] - else: - kwargs[k] = None - optional_keys = self.optional - optional_missing_keys = optional_keys - set(kwargs.keys()) - if optional_missing_keys: - for k in optional_missing_keys: - for r in self.runs_before: - r_provides = r.provides - if k in r_provides and r.result and k in r.result: - kwargs[k] = r.result[k] - break - # And now finally run. - self.result = self.task(*args, **kwargs) - return self.result - - -class TransitionNotifier(object): - """A utility helper class that can be used to subscribe to - notifications of events occurring as well as allow a entity to post said - notifications to subscribers. - """ - - RESERVED_KEYS = ('details',) - ANY = '*' - - def __init__(self): - self._listeners = collections.defaultdict(list) - - def reset(self): - self._listeners = collections.defaultdict(list) - - def notify(self, state, details): - listeners = list(self._listeners.get(self.ANY, [])) - for i in self._listeners[state]: - if i not in listeners: - listeners.append(i) - if not listeners: - return - for (callback, args, kwargs) in listeners: - if args is None: - args = [] - if kwargs is None: - kwargs = {} - kwargs['details'] = details - try: - callback(state, *args, **kwargs) - except Exception: - LOG.exception(("Failure calling callback %s to notify about" - " state transition %s"), callback, state) - - def register(self, state, callback, args=None, kwargs=None): - assert isinstance(callback, collections.Callable) - for i, (cb, args, kwargs) in enumerate(self._listeners.get(state, [])): - if cb is callback: - raise ValueError("Callback %s already registered" % (callback)) - if kwargs: - for k in self.RESERVED_KEYS: - if k in kwargs: - raise KeyError(("Reserved key '%s' not allowed in " - "kwargs") % k) - kwargs = copy.copy(kwargs) - if args: - args = copy.copy(args) - self._listeners[state].append((callback, args, kwargs)) - - def deregister(self, state, callback): - if state not in self._listeners: - return - for i, (cb, args, kwargs) in enumerate(self._listeners[state]): - if cb is callback: - self._listeners[state].pop(i) - break - - -class RollbackAccumulator(object): - """A utility class that can help in organizing 'undo' like code - so that said code be rolled back on failure (automatically or manually) - by activating rollback callables that were inserted during said codes - progression. - """ - - def __init__(self): - self._rollbacks = [] - - def add(self, *callables): - self._rollbacks.extend(callables) - - def reset(self): - self._rollbacks = [] - - def __len__(self): - return len(self._rollbacks) - - def __iter__(self): - # Rollbacks happen in the reverse order that they were added. - return reversed(self._rollbacks) - - def __enter__(self): - return self - - def rollback(self, cause): - LOG.warn("Activating %s rollbacks due to %s.", len(self), cause) - for (i, f) in enumerate(self): - LOG.debug("Calling rollback %s: %s", i + 1, f) - try: - f(cause) - except Exception: - LOG.exception(("Failed rolling back %s: %s due " - "to inner exception."), i + 1, f) - - def __exit__(self, type, value, tb): - if any((value, type, tb)): - self.rollback(value) - - -class ReaderWriterLock(object): - """A simple reader-writer lock. - - Several readers can hold the lock simultaneously, and only one writer. - Write locks have priority over reads to prevent write starvation. - - Public domain @ http://majid.info/blog/a-reader-writer-lock-for-python/ - """ - - def __init__(self): - self.rwlock = 0 - self.writers_waiting = 0 - self.monitor = threading.Lock() - self.readers_ok = threading.Condition(self.monitor) - self.writers_ok = threading.Condition(self.monitor) - - @contextlib.contextmanager - def acquire(self, read=True): - """Acquire a read or write lock in a context manager.""" - try: - if read: - self.acquire_read() - else: - self.acquire_write() - yield self - finally: - self.release() - - def acquire_read(self): - """Acquire a read lock. - - Several threads can hold this typeof lock. - It is exclusive with write locks. - """ - - self.monitor.acquire() - while self.rwlock < 0 or self.writers_waiting: - self.readers_ok.wait() - self.rwlock += 1 - self.monitor.release() - - def acquire_write(self): - """Acquire a write lock. - - Only one thread can hold this lock, and only when no read locks - are also held. - """ - - self.monitor.acquire() - while self.rwlock != 0: - self.writers_waiting += 1 - self.writers_ok.wait() - self.writers_waiting -= 1 - self.rwlock = -1 - self.monitor.release() - - def release(self): - """Release a lock, whether read or write.""" - - self.monitor.acquire() - if self.rwlock < 0: - self.rwlock = 0 - else: - self.rwlock -= 1 - wake_writers = self.writers_waiting and self.rwlock == 0 - wake_readers = self.writers_waiting == 0 - self.monitor.release() - if wake_writers: - self.writers_ok.acquire() - self.writers_ok.notify() - self.writers_ok.release() - elif wake_readers: - self.readers_ok.acquire() - self.readers_ok.notifyAll() - self.readers_ok.release() - - -class LazyPluggable(object): - """A pluggable backend loaded lazily based on some value.""" - - def __init__(self, pivot, **backends): - self.__backends = backends - self.__pivot = pivot - self.__backend = None - - def __get_backend(self): - if not self.__backend: - backend_name = 'sqlalchemy' - backend = self.__backends[backend_name] - if isinstance(backend, tuple): - name = backend[0] - fromlist = backend[1] - else: - name = backend - fromlist = backend - - self.__backend = __import__(name, None, None, fromlist) - return self.__backend - - def __getattr__(self, key): - backend = self.__get_backend() - return getattr(backend, key) diff --git a/cinder/tests/test_volume.py b/cinder/tests/test_volume.py index 6109c2f8313..4d28c267555 100644 --- a/cinder/tests/test_volume.py +++ b/cinder/tests/test_volume.py @@ -30,6 +30,7 @@ import tempfile import mox from oslo.config import cfg +from taskflow.engines.action_engine import engine from cinder.backup import driver as backup_driver from cinder.brick.iscsi import iscsi @@ -46,7 +47,6 @@ from cinder.openstack.common.notifier import test_notifier from cinder.openstack.common import rpc import cinder.policy from cinder import quota -from cinder.taskflow.patterns import linear_flow from cinder import test from cinder.tests.brick.fake_lvm import FakeBrickLVM from cinder.tests import conf_fixture @@ -59,7 +59,6 @@ import cinder.volume from cinder.volume import configuration as conf from cinder.volume import driver from cinder.volume.drivers import lvm -from cinder.volume.flows import create_volume from cinder.volume import rpcapi as volume_rpcapi from cinder.volume import utils as volutils @@ -465,7 +464,7 @@ class VolumeTestCase(BaseVolumeTestCase): self.stubs.Set(self.volume.driver, 'create_volume_from_snapshot', lambda *args, **kwargs: None) - orig_flow = linear_flow.Flow.run + orig_flow = engine.ActionEngine.run def mock_flow_run(*args, **kwargs): # ensure the lock has been taken @@ -492,7 +491,7 @@ class VolumeTestCase(BaseVolumeTestCase): admin_ctxt = context.get_admin_context() # mock the flow runner so we can do some checks - self.stubs.Set(linear_flow.Flow, 'run', mock_flow_run) + self.stubs.Set(engine.ActionEngine, 'run', mock_flow_run) # locked self.volume.create_volume(self.context, volume_id=dst_vol_id, @@ -528,7 +527,7 @@ class VolumeTestCase(BaseVolumeTestCase): # mock the synchroniser so we can record events self.stubs.Set(utils, 'synchronized', self._mock_synchronized) - orig_flow = linear_flow.Flow.run + orig_flow = engine.ActionEngine.run def mock_flow_run(*args, **kwargs): # ensure the lock has been taken @@ -551,7 +550,7 @@ class VolumeTestCase(BaseVolumeTestCase): admin_ctxt = context.get_admin_context() # mock the flow runner so we can do some checks - self.stubs.Set(linear_flow.Flow, 'run', mock_flow_run) + self.stubs.Set(engine.ActionEngine, 'run', mock_flow_run) # locked self.volume.create_volume(self.context, volume_id=dst_vol_id, @@ -1816,13 +1815,6 @@ class VolumeTestCase(BaseVolumeTestCase): def fake_create_volume(*args, **kwargs): raise exception.CinderException('fake exception') - def fake_reschedule_or_error(self, context, *args, **kwargs): - self.assertFalse(context.is_admin) - self.assertNotIn('admin', context.roles) - #compare context passed in with the context we saved - self.assertDictMatch(self.saved_ctxt.__dict__, - context.__dict__) - #create context for testing ctxt = self.context.deepcopy() if 'admin' in ctxt.roles: @@ -1831,8 +1823,6 @@ class VolumeTestCase(BaseVolumeTestCase): #create one copy of context for future comparison self.saved_ctxt = ctxt.deepcopy() - self.stubs.Set(create_volume.OnFailureRescheduleTask, '_reschedule', - fake_reschedule_or_error) self.stubs.Set(self.volume.driver, 'create_volume', fake_create_volume) volume_src = tests_utils.create_volume(self.context, diff --git a/cinder/volume/api.py b/cinder/volume/api.py index c36ff6b6d67..857c107ae71 100644 --- a/cinder/volume/api.py +++ b/cinder/volume/api.py @@ -37,15 +37,12 @@ from cinder.openstack.common import timeutils import cinder.policy from cinder import quota from cinder.scheduler import rpcapi as scheduler_rpcapi -from cinder import units from cinder import utils from cinder.volume.flows import create_volume from cinder.volume import rpcapi as volume_rpcapi from cinder.volume import utils as volume_utils from cinder.volume import volume_types -from cinder.taskflow import states - volume_host_opt = cfg.BoolOpt('snapshot_same_host', default=True, @@ -147,42 +144,34 @@ class API(base.Base): return False create_what = { - 'size': size, + 'context': context, + 'raw_size': size, 'name': name, 'description': description, 'snapshot': snapshot, 'image_id': image_id, - 'volume_type': volume_type, + 'raw_volume_type': volume_type, 'metadata': metadata, - 'availability_zone': availability_zone, + 'raw_availability_zone': availability_zone, 'source_volume': source_volume, 'scheduler_hints': scheduler_hints, 'key_manager': self.key_manager, 'backup_source_volume': backup_source_volume, } - (flow, uuid) = create_volume.get_api_flow(self.scheduler_rpcapi, - self.volume_rpcapi, - self.db, - self.image_service, - check_volume_az_zone, - create_what) - assert flow, _('Create volume flow not retrieved') - flow.run(context) - if flow.state != states.SUCCESS: - raise exception.CinderException(_("Failed to successfully complete" - " create volume workflow")) - - # Extract the volume information from the task uuid that was specified - # to produce said information. - volume = None try: - volume = flow.results[uuid]['volume'] - except KeyError: - pass + flow_engine = create_volume.get_api_flow(self.scheduler_rpcapi, + self.volume_rpcapi, + self.db, + self.image_service, + check_volume_az_zone, + create_what) + except Exception: + raise exception.CinderException( + _("Failed to create api volume flow")) - # Raise an error, nobody provided it?? - assert volume, _('Expected volume result not found') + flow_engine.run() + volume = flow_engine.storage.fetch('volume') return volume @wrap_check_policy diff --git a/cinder/volume/flows/base.py b/cinder/volume/flows/base.py index 54ce06ad098..26c4805c266 100644 --- a/cinder/volume/flows/base.py +++ b/cinder/volume/flows/base.py @@ -15,7 +15,7 @@ # under the License. # For more information please visit: https://wiki.openstack.org/wiki/TaskFlow -from cinder.taskflow import task +from taskflow import task def _make_task_name(cls, addons=None): @@ -34,28 +34,7 @@ class CinderTask(task.Task): implement the given task as the task name. """ - def __init__(self, addons=None): + def __init__(self, addons=None, **kwargs): super(CinderTask, self).__init__(_make_task_name(self.__class__, - addons)) - - -class InjectTask(CinderTask): - """This injects a dict into the flow. - - This injection is done so that the keys (and values) provided can be - dependended on by tasks further down the line. Since taskflow is dependency - based this can be considered the bootstrapping task that provides an - initial set of values for other tasks to get started with. If this did not - exist then tasks would fail locating there dependent tasks and the values - said dependent tasks produce. - - Reversion strategy: N/A - """ - - def __init__(self, inject_what, addons=None): - super(InjectTask, self).__init__(addons=addons) - self.provides.update(inject_what.keys()) - self._inject = inject_what - - def __call__(self, context): - return dict(self._inject) + addons), + **kwargs) diff --git a/cinder/volume/flows/create_volume/__init__.py b/cinder/volume/flows/create_volume/__init__.py index d4fa8a20c99..7f3a6efa28a 100644 --- a/cinder/volume/flows/create_volume/__init__.py +++ b/cinder/volume/flows/create_volume/__init__.py @@ -23,6 +23,10 @@ import traceback from oslo.config import cfg +import taskflow.engines +from taskflow.patterns import linear_flow +from taskflow import task +from taskflow.utils import misc from cinder import exception from cinder.image import glance @@ -34,13 +38,9 @@ from cinder.openstack.common import strutils from cinder.openstack.common import timeutils from cinder import policy from cinder import quota -from cinder.taskflow import decorators -from cinder.taskflow.patterns import linear_flow -from cinder.taskflow import task from cinder import units from cinder import utils from cinder.volume.flows import base -from cinder.volume.flows import utils as flow_utils from cinder.volume import utils as volume_utils from cinder.volume import volume_types @@ -86,16 +86,6 @@ def _make_pretty_name(method): return ".".join(meth_pieces) -def _find_result_spec(flow): - """Find the last task that produced a valid volume_spec and returns it.""" - for there_result in flow.results.values(): - if not there_result or not 'volume_spec' in there_result: - continue - if there_result['volume_spec']: - return there_result['volume_spec'] - return None - - def _restore_source_status(context, db, volume_spec): # NOTE(harlowja): Only if the type of the volume that was being created is # the source volume type should we try to reset the source volume status @@ -171,25 +161,16 @@ class ExtractVolumeRequestTask(base.CinderTask): Reversion strategy: N/A """ - def __init__(self, image_service, az_check_functor=None): - super(ExtractVolumeRequestTask, self).__init__(addons=[ACTION]) - # This task will produce the following outputs (said outputs can be - # saved to durable storage in the future so that the flow can be - # reconstructed elsewhere and continued). - self.provides.update(['availability_zone', 'size', 'snapshot_id', - 'source_volid', 'volume_type', 'volume_type_id', - 'encryption_key_id']) - # This task requires the following inputs to operate (provided - # automatically to __call__(). This is done so that the flow can - # be reconstructed elsewhere and continue running (in the future). - # - # It also is used to be able to link tasks that produce items to tasks - # that consume items (thus allowing the linking of the flow to be - # mostly automatic). - self.requires.update(['availability_zone', 'image_id', 'metadata', - 'size', 'snapshot', 'source_volume', - 'volume_type', 'key_manager', - 'backup_source_volume']) + # This task will produce the following outputs (said outputs can be + # saved to durable storage in the future so that the flow can be + # reconstructed elsewhere and continued). + default_provides = set(['availability_zone', 'size', 'snapshot_id', + 'source_volid', 'volume_type', 'volume_type_id', + 'encryption_key_id']) + + def __init__(self, image_service, az_check_functor=None, **kwargs): + super(ExtractVolumeRequestTask, self).__init__(addons=[ACTION], + **kwargs) self.image_service = image_service self.az_check_functor = az_check_functor if not self.az_check_functor: @@ -451,9 +432,9 @@ class ExtractVolumeRequestTask(base.CinderTask): return volume_type_id - def __call__(self, context, size, snapshot, image_id, source_volume, - availability_zone, volume_type, metadata, - key_manager, backup_source_volume): + def execute(self, context, size, snapshot, image_id, source_volume, + availability_zone, volume_type, metadata, + key_manager, backup_source_volume): utils.check_exclusive_options(snapshot=snapshot, imageRef=image_id, @@ -519,16 +500,18 @@ class EntryCreateTask(base.CinderTask): Reversion strategy: remove the volume_id created from the database. """ - def __init__(self, db): - super(EntryCreateTask, self).__init__(addons=[ACTION]) - self.db = db - self.requires.update(['availability_zone', 'description', 'metadata', - 'name', 'reservations', 'size', 'snapshot_id', - 'source_volid', 'volume_type_id', - 'encryption_key_id']) - self.provides.update(['volume_properties', 'volume_id']) + default_provides = set(['volume_properties', 'volume_id', 'volume']) - def __call__(self, context, **kwargs): + def __init__(self, db): + requires = ['availability_zone', 'description', 'metadata', + 'name', 'reservations', 'size', 'snapshot_id', + 'source_volid', 'volume_type_id', 'encryption_key_id'] + super(EntryCreateTask, self).__init__(addons=[ACTION], + requires=requires) + self.db = db + self.provides.update() + + def execute(self, context, **kwargs): """Creates a database entry for the given inputs and returns details. Accesses the database and creates a new entry for the to be created @@ -569,9 +552,9 @@ class EntryCreateTask(base.CinderTask): 'volume': volume, } - def revert(self, context, result, cause): + def revert(self, context, result, **kwargs): # We never produced a result and therefore can't destroy anything. - if not result: + if isinstance(result, misc.Failure): return if context.quota_committed: # Committed quota doesn't rollback as the volume has already been @@ -603,12 +586,12 @@ class QuotaReserveTask(base.CinderTask): an automated or manual process. """ + default_provides = set(['reservations']) + def __init__(self): super(QuotaReserveTask, self).__init__(addons=[ACTION]) - self.requires.update(['size', 'volume_type_id']) - self.provides.update(['reservations']) - def __call__(self, context, size, volume_type_id): + def execute(self, context, size, volume_type_id): try: reserve_opts = {'volumes': 1, 'gigabytes': size} QUOTAS.add_volume_type_opts(context, reserve_opts, volume_type_id) @@ -648,15 +631,14 @@ class QuotaReserveTask(base.CinderTask): "already consumed)") LOG.warn(msg % {'s_pid': context.project_id, 'd_consumed': _consumed('volumes')}) - allowed = quotas['volumes'] raise exception.VolumeLimitExceeded(allowed=quotas['volumes']) else: # If nothing was reraised, ensure we reraise the initial error raise - def revert(self, context, result, cause): + def revert(self, context, result, **kwargs): # We never produced a result and therefore can't destroy anything. - if not result: + if isinstance(result, misc.Failure): return if context.quota_committed: # The reservations have already been committed and can not be @@ -691,16 +673,15 @@ class QuotaCommitTask(base.CinderTask): def __init__(self): super(QuotaCommitTask, self).__init__(addons=[ACTION]) - self.requires.update(['reservations', 'volume_properties']) - def __call__(self, context, reservations, volume_properties): + def execute(self, context, reservations, volume_properties): QUOTAS.commit(context, reservations) context.quota_committed = True return {'volume_properties': volume_properties} - def revert(self, context, result, cause): + def revert(self, context, result, **kwargs): # We never produced a result and therefore can't destroy anything. - if not result: + if isinstance(result, misc.Failure): return volume = result['volume_properties'] try: @@ -729,13 +710,14 @@ class VolumeCastTask(base.CinderTask): """ def __init__(self, scheduler_rpcapi, volume_rpcapi, db): - super(VolumeCastTask, self).__init__(addons=[ACTION]) + requires = ['image_id', 'scheduler_hints', 'snapshot_id', + 'source_volid', 'volume_id', 'volume_type', + 'volume_properties'] + super(VolumeCastTask, self).__init__(addons=[ACTION], + requires=requires) self.volume_rpcapi = volume_rpcapi self.scheduler_rpcapi = scheduler_rpcapi self.db = db - self.requires.update(['image_id', 'scheduler_hints', 'snapshot_id', - 'source_volid', 'volume_id', 'volume_type', - 'volume_properties']) def _cast_create_volume(self, context, request_spec, filter_properties): source_volid = request_spec['source_volid'] @@ -786,7 +768,7 @@ class VolumeCastTask(base.CinderTask): image_id=image_id, source_volid=source_volid) - def __call__(self, context, **kwargs): + def execute(self, context, **kwargs): scheduler_hints = kwargs.pop('scheduler_hints', None) request_spec = kwargs.copy() filter_properties = {} @@ -794,6 +776,20 @@ class VolumeCastTask(base.CinderTask): filter_properties['scheduler_hints'] = scheduler_hints self._cast_create_volume(context, request_spec, filter_properties) + def revert(self, context, result, flow_failures, **kwargs): + if isinstance(result, misc.Failure): + return + + # Restore the source volume status and set the volume to error status. + volume_id = kwargs['volume_id'] + _restore_source_status(context, self.db, kwargs) + _error_out_volume(context, self.db, volume_id) + LOG.error(_("Volume %s: create failed"), volume_id) + exc_info = False + if all(flow_failures[-1].exc_info): + exc_info = flow_failures[-1].exc_info + LOG.error(_('Unexpected build error:'), exc_info=exc_info) + class OnFailureChangeStatusTask(base.CinderTask): """Helper task that sets a volume id to status error. @@ -808,33 +804,24 @@ class OnFailureChangeStatusTask(base.CinderTask): def __init__(self, db): super(OnFailureChangeStatusTask, self).__init__(addons=[ACTION]) self.db = db - self.requires.update(['volume_id']) - self.optional.update(['volume_spec']) - def __call__(self, context, volume_id, volume_spec=None): + def execute(self, context, volume_id, volume_spec): # Save these items since we only use them if a reversion is triggered. return { 'volume_id': volume_id, 'volume_spec': volume_spec, } - def revert(self, context, result, cause): + def revert(self, context, result, flow_failures, **kwargs): + if isinstance(result, misc.Failure): + return volume_spec = result.get('volume_spec') - if not volume_spec: - # Attempt to use it from a later task that *should* have populated - # this from the database. It is not needed to be found since - # reverting will continue without it. - volume_spec = _find_result_spec(cause.flow) # Restore the source volume status and set the volume to error status. volume_id = result['volume_id'] _restore_source_status(context, self.db, volume_spec) - _error_out_volume(context, self.db, volume_id, reason=cause.exc) + _error_out_volume(context, self.db, volume_id) LOG.error(_("Volume %s: create failed"), volume_id) - exc_info = False - if all(cause.exc_info): - exc_info = cause.exc_info - LOG.error(_('Unexpected build error:'), exc_info=exc_info) class OnFailureRescheduleTask(base.CinderTask): @@ -846,10 +833,10 @@ class OnFailureRescheduleTask(base.CinderTask): """ def __init__(self, reschedule_context, db, scheduler_rpcapi): - super(OnFailureRescheduleTask, self).__init__(addons=[ACTION]) - self.requires.update(['filter_properties', 'image_id', 'request_spec', - 'snapshot_id', 'volume_id']) - self.optional.update(['volume_spec']) + requires = ['filter_properties', 'image_id', 'request_spec', + 'snapshot_id', 'volume_id', 'context'] + super(OnFailureRescheduleTask, self).__init__(addons=[ACTION], + requires=requires) self.scheduler_rpcapi = scheduler_rpcapi self.db = db self.reschedule_context = reschedule_context @@ -878,27 +865,8 @@ class OnFailureRescheduleTask(base.CinderTask): exception.ImageUnacceptable, ] - def _is_reschedulable(self, cause): - # Figure out the type of the causes exception and compare it against - # our black-list of exception types that will not cause rescheduling. - exc_type, value = cause.exc_info[:2] - # If we don't have a type from exc_info but we do have a exception in - # the cause, try to get the type from that instead. - if not value: - value = cause.exc - if not exc_type and value: - exc_type = type(value) - if exc_type and exc_type in self.no_reschedule_types: - return False - # Couldn't figure it out, by default assume whatever the cause was can - # be fixed by rescheduling. - # - # NOTE(harlowja): Crosses fingers. - return True - - def __call__(self, context, *args, **kwargs): - # Save these items since we only use them if a reversion is triggered. - return kwargs.copy() + def execute(self, **kwargs): + pass def _reschedule(self, context, cause, request_spec, filter_properties, snapshot_id, image_id, volume_id, **kwargs): @@ -919,7 +887,7 @@ class OnFailureRescheduleTask(base.CinderTask): {'volume_id': volume_id, 'method': _make_pretty_name(create_volume), 'num': num_attempts, - 'reason': _exception_to_unicode(cause.exc)}) + 'reason': cause.exception_str}) if all(cause.exc_info): # Stringify to avoid circular ref problem in json serialization @@ -958,83 +926,23 @@ class OnFailureRescheduleTask(base.CinderTask): LOG.exception(_("Volume %s: resetting 'creating' status failed"), volume_id) - def revert(self, context, result, cause): - volume_spec = result.get('volume_spec') - if not volume_spec: - # Find it from a prior task that populated this from the database. - volume_spec = _find_result_spec(cause.flow) - volume_id = result['volume_id'] + def revert(self, context, result, flow_failures, **kwargs): + # Check if we have a cause which can tell us not to reschedule. + for failure in flow_failures.values(): + if failure.check(self.no_reschedule_types): + return + volume_id = kwargs['volume_id'] # Use a different context when rescheduling. if self.reschedule_context: context = self.reschedule_context - - # If we are now supposed to reschedule (or unable to), then just - # restore the source volume status and set the volume to error status. - def do_error_revert(): - LOG.debug(_("Failing volume %s creation by altering volume status" - " instead of rescheduling"), volume_id) - _restore_source_status(context, self.db, volume_spec) - _error_out_volume(context, self.db, volume_id, reason=cause.exc) - LOG.error(_("Volume %s: create failed"), volume_id) - - # Check if we have a cause which can tell us not to reschedule. - if not self._is_reschedulable(cause): - do_error_revert() - else: try: + cause = list(flow_failures.values())[0] self._pre_reschedule(context, volume_id) - self._reschedule(context, cause, **result) + self._reschedule(context, cause, **kwargs) self._post_reschedule(context, volume_id) except exception.CinderException: LOG.exception(_("Volume %s: rescheduling failed"), volume_id) - # NOTE(harlowja): Do error volume status changing instead. - do_error_revert() - exc_info = False - if all(cause.exc_info): - exc_info = cause.exc_info - LOG.error(_('Unexpected build error:'), exc_info=exc_info) - - -class NotifySchedulerFailureTask(base.CinderTask): - """Helper task that notifies some external service on failure. - - Reversion strategy: On failure of any flow that includes this task the - request specification associated with that flow will be extracted and - sent as a payload to the notification service under the given methods - scheduler topic. - """ - - def __init__(self, method): - super(NotifySchedulerFailureTask, self).__init__(addons=[ACTION]) - self.requires.update(['request_spec', 'volume_id']) - self.method = method - self.topic = 'scheduler.%s' % self.method - self.publisher_id = notifier.publisher_id("scheduler") - - def __call__(self, context, **kwargs): - # Save these items since we only use them if a reversion is triggered. - return kwargs.copy() - - def revert(self, context, result, cause): - request_spec = result['request_spec'] - volume_id = result['volume_id'] - volume_properties = request_spec['volume_properties'] - payload = { - 'request_spec': request_spec, - 'volume_properties': volume_properties, - 'volume_id': volume_id, - 'state': 'error', - 'method': self.method, - 'reason': unicode(cause.exc), - } - try: - notifier.notify(context, self.publisher_id, self.topic, - notifier.ERROR, payload) - except exception.CinderException: - LOG.exception(_("Failed notifying on %(topic)s " - "payload %(payload)s") % {'topic': self.topic, - 'payload': payload}) class ExtractSchedulerSpecTask(base.CinderTask): @@ -1043,12 +951,12 @@ class ExtractSchedulerSpecTask(base.CinderTask): Reversion strategy: N/A """ - def __init__(self, db): - super(ExtractSchedulerSpecTask, self).__init__(addons=[ACTION]) + default_provides = set(['request_spec']) + + def __init__(self, db, **kwargs): + super(ExtractSchedulerSpecTask, self).__init__(addons=[ACTION], + **kwargs) self.db = db - self.requires.update(['image_id', 'request_spec', 'snapshot_id', - 'volume_id']) - self.provides.update(['request_spec']) def _populate_request_spec(self, context, volume_id, snapshot_id, image_id): @@ -1077,8 +985,8 @@ class ExtractSchedulerSpecTask(base.CinderTask): 'volume_type': list(dict(vol_type).iteritems()), } - def __call__(self, context, request_spec, volume_id, snapshot_id, - image_id): + def execute(self, context, request_spec, volume_id, snapshot_id, + image_id): # For RPC version < 1.2 backward compatibility if request_spec is None: request_spec = self._populate_request_spec(context, volume_id, @@ -1088,6 +996,33 @@ class ExtractSchedulerSpecTask(base.CinderTask): } +class ExtractVolumeRefTask(base.CinderTask): + """Extracts volume reference for given volume id. """ + + default_provides = 'volume_ref' + + def __init__(self, db): + super(ExtractVolumeRefTask, self).__init__(addons=[ACTION]) + self.db = db + + def execute(self, context, volume_id): + # NOTE(harlowja): this will fetch the volume from the database, if + # the volume has been deleted before we got here then this should fail. + # + # In the future we might want to have a lock on the volume_id so that + # the volume can not be deleted while its still being created? + volume_ref = self.db.volume_get(context, volume_id) + + return volume_ref + + def revert(self, context, volume_id, result, **kwargs): + if isinstance(result, misc.Failure): + return + + _error_out_volume(context, self.db, volume_id) + LOG.error(_("Volume %s: create failed"), volume_id) + + class ExtractVolumeSpecTask(base.CinderTask): """Extracts a spec of a volume to be created into a common structure. @@ -1099,22 +1034,17 @@ class ExtractVolumeSpecTask(base.CinderTask): Reversion strategy: N/A """ - def __init__(self, db): - super(ExtractVolumeSpecTask, self).__init__(addons=[ACTION]) - self.db = db - self.requires.update(['filter_properties', 'image_id', 'snapshot_id', - 'source_volid', 'volume_id']) - self.provides.update(['volume_spec', 'volume_ref']) + default_provides = 'volume_spec' - def __call__(self, context, volume_id, **kwargs): + def __init__(self, db): + requires = ['image_id', 'snapshot_id', 'source_volid'] + super(ExtractVolumeSpecTask, self).__init__(addons=[ACTION], + requires=requires) + self.db = db + + def execute(self, context, volume_ref, **kwargs): get_remote_image_service = glance.get_remote_image_service - # NOTE(harlowja): this will fetch the volume from the database, if - # the volume has been deleted before we got here then this should fail. - # - # In the future we might want to have a lock on the volume_id so that - # the volume can not be deleted while its still being created? - volume_ref = self.db.volume_get(context, volume_id) volume_name = volume_ref['name'] volume_size = utils.as_int(volume_ref['size'], quiet=False) @@ -1173,21 +1103,14 @@ class ExtractVolumeSpecTask(base.CinderTask): 'image_service': image_service, }) - return { - 'volume_spec': specs, - # NOTE(harlowja): it appears like further usage of this volume_ref - # result actually depend on it being a sqlalchemy object and not - # just a plain dictionary so thats why we are storing this here. - # - # It was attempted to refetch it when needed in subsequent tasks, - # but that caused sqlalchemy errors to occur (volume already open - # or similar). - # - # In the future where this task could fail and be recovered from we - # will need to store the volume_spec and recreate the volume_ref - # on demand. - 'volume_ref': volume_ref, - } + return specs + + def revert(self, context, result, **kwargs): + if isinstance(result, misc.Failure): + return + volume_spec = result.get('volume_spec') + # Restore the source volume status and set the volume to error status. + _restore_source_status(context, self.db, volume_spec) class NotifyVolumeActionTask(base.CinderTask): @@ -1199,12 +1122,11 @@ class NotifyVolumeActionTask(base.CinderTask): def __init__(self, db, host, event_suffix): super(NotifyVolumeActionTask, self).__init__(addons=[ACTION, event_suffix]) - self.requires.update(['volume_ref']) self.db = db self.event_suffix = event_suffix self.host = host - def __call__(self, context, volume_ref): + def execute(self, context, volume_ref): volume_id = volume_ref['id'] try: volume_utils.notify_about_volume_usage(context, volume_ref, @@ -1226,11 +1148,12 @@ class CreateVolumeFromSpecTask(base.CinderTask): Reversion strategy: N/A """ + default_provides = 'volume' + def __init__(self, db, host, driver): super(CreateVolumeFromSpecTask, self).__init__(addons=[ACTION]) self.db = db self.driver = driver - self.requires.update(['volume_spec', 'volume_ref']) # This maps the different volume specification types into the methods # that can create said volume type (aka this is a jump table). self._create_func_mapping = { @@ -1472,7 +1395,7 @@ class CreateVolumeFromSpecTask(base.CinderTask): def _create_raw_volume(self, context, volume_ref, **kwargs): return self.driver.create_volume(volume_ref) - def __call__(self, context, volume_ref, volume_spec): + def execute(self, context, volume_ref, volume_spec): # we can't do anything if the driver didn't init if not self.driver.initialized: LOG.error(_("Unable to create volume, driver not initialized")) @@ -1538,6 +1461,8 @@ class CreateVolumeFromSpecTask(base.CinderTask): {'volume_id': volume_id, 'model': model_update}) raise exception.ExportFailure(reason=ex) + return volume_ref + class CreateVolumeOnFinishTask(NotifyVolumeActionTask): """On successful volume creation this will perform final volume actions. @@ -1553,13 +1478,12 @@ class CreateVolumeOnFinishTask(NotifyVolumeActionTask): def __init__(self, db, host, event_suffix): super(CreateVolumeOnFinishTask, self).__init__(db, host, event_suffix) - self.requires.update(['volume_spec']) self.status_translation = { 'migration_target_creating': 'migration_target', } - def __call__(self, context, volume_ref, volume_spec): - volume_id = volume_ref['id'] + def execute(self, context, volume, volume_spec): + volume_id = volume['id'] new_status = self.status_translation.get(volume_spec.get('status'), 'available') update = { @@ -1573,7 +1497,7 @@ class CreateVolumeOnFinishTask(NotifyVolumeActionTask): # 'building' if this fails)?? volume_ref = self.db.volume_update(context, volume_id, update) # Now use the parent to notify. - super(CreateVolumeOnFinishTask, self).__call__(context, volume_ref) + super(CreateVolumeOnFinishTask, self).execute(context, volume_ref) except exception.CinderException: LOG.exception(_("Failed updating volume %(volume_id)s with " "%(update)s") % {'volume_id': volume_id, @@ -1605,31 +1529,26 @@ def get_api_flow(scheduler_rpcapi, volume_rpcapi, db, flow_name = ACTION.replace(":", "_") + "_api" api_flow = linear_flow.Flow(flow_name) - # This injects the initial starting flow values into the workflow so that - # the dependency order of the tasks provides/requires can be correctly - # determined. - api_flow.add(base.InjectTask(create_what, addons=[ACTION])) - api_flow.add(ExtractVolumeRequestTask(image_service, - az_check_functor)) - api_flow.add(QuotaReserveTask()) - v_uuid = api_flow.add(EntryCreateTask(db)) - api_flow.add(QuotaCommitTask()) - - # If after committing something fails, ensure we set the db to failure - # before reverting any prior tasks. - api_flow.add(OnFailureChangeStatusTask(db)) + api_flow.add(ExtractVolumeRequestTask( + image_service, + az_check_functor, + rebind={'size': 'raw_size', + 'availability_zone': 'raw_availability_zone', + 'volume_type': 'raw_volume_type'})) + api_flow.add(QuotaReserveTask(), + EntryCreateTask(db), + QuotaCommitTask()) # This will cast it out to either the scheduler or volume manager via # the rpc apis provided. api_flow.add(VolumeCastTask(scheduler_rpcapi, volume_rpcapi, db)) - # Note(harlowja): this will return the flow as well as the uuid of the - # task which will produce the 'volume' database reference (since said - # reference is returned to other callers in the api for further usage). - return (flow_utils.attach_debug_listeners(api_flow), v_uuid) + # Now load (but do not run) the flow using the provided initial data. + return taskflow.engines.load(api_flow, store=create_what) -def get_scheduler_flow(db, driver, request_spec=None, filter_properties=None, +def get_scheduler_flow(context, db, driver, request_spec=None, + filter_properties=None, volume_id=None, snapshot_id=None, image_id=None): """Constructs and returns the scheduler entrypoint flow. @@ -1643,29 +1562,23 @@ def get_scheduler_flow(db, driver, request_spec=None, filter_properties=None, 4. Uses provided driver to to then select and continue processing of volume request. """ - - flow_name = ACTION.replace(":", "_") + "_scheduler" - scheduler_flow = linear_flow.Flow(flow_name) - - # This injects the initial starting flow values into the workflow so that - # the dependency order of the tasks provides/requires can be correctly - # determined. - scheduler_flow.add(base.InjectTask({ - 'request_spec': request_spec, + create_what = { + 'context': context, + 'raw_request_spec': request_spec, 'filter_properties': filter_properties, 'volume_id': volume_id, 'snapshot_id': snapshot_id, 'image_id': image_id, - }, addons=[ACTION])) + } + + flow_name = ACTION.replace(":", "_") + "_scheduler" + scheduler_flow = linear_flow.Flow(flow_name) # This will extract and clean the spec from the starting values. - scheduler_flow.add(ExtractSchedulerSpecTask(db)) + scheduler_flow.add(ExtractSchedulerSpecTask( + db, + rebind={'request_spec': 'raw_request_spec'})) - # The decorator application here ensures that the method gets the right - # requires attributes automatically by examining the underlying functions - # arguments. - - @decorators.task def schedule_create_volume(context, request_spec, filter_properties): def _log_failure(cause): @@ -1711,16 +1624,16 @@ def get_scheduler_flow(db, driver, request_spec=None, filter_properties=None, _log_failure(e) _error_out_volume(context, db, volume_id, reason=e) - scheduler_flow.add(schedule_create_volume) + scheduler_flow.add(task.FunctorTask(schedule_create_volume)) - return flow_utils.attach_debug_listeners(scheduler_flow) + # Now load (but do not run) the flow using the provided initial data. + return taskflow.engines.load(scheduler_flow, store=create_what) -def get_manager_flow(db, driver, scheduler_rpcapi, host, volume_id, - request_spec=None, filter_properties=None, - allow_reschedule=True, - snapshot_id=None, image_id=None, source_volid=None, - reschedule_context=None): +def get_manager_flow(context, db, driver, scheduler_rpcapi, host, volume_id, + allow_reschedule, reschedule_context, request_spec, + filter_properties, snapshot_id=None, image_id=None, + source_volid=None): """Constructs and returns the manager entrypoint flow. This flow will do the following: @@ -1739,44 +1652,29 @@ def get_manager_flow(db, driver, scheduler_rpcapi, host, volume_id, flow_name = ACTION.replace(":", "_") + "_manager" volume_flow = linear_flow.Flow(flow_name) - # Determine if we are allowed to reschedule since this affects how - # failures will be handled. - if not filter_properties: - filter_properties = {} - if not request_spec and allow_reschedule: - LOG.debug(_("No request spec, will not reschedule")) - allow_reschedule = False - if not filter_properties.get('retry', None) and allow_reschedule: - LOG.debug(_("No retry filter property or associated " - "retry info, will not reschedule")) - allow_reschedule = False - # This injects the initial starting flow values into the workflow so that # the dependency order of the tasks provides/requires can be correctly # determined. - volume_flow.add(base.InjectTask({ + create_what = { + 'context': context, 'filter_properties': filter_properties, 'image_id': image_id, 'request_spec': request_spec, 'snapshot_id': snapshot_id, 'source_volid': source_volid, 'volume_id': volume_id, - }, addons=[ACTION])) + } - # We can actually just check if we should reschedule on failure ahead of - # time instead of trying to determine this later, certain values are needed - # to reschedule and without them we should just avoid rescheduling. - if not allow_reschedule: - # On failure ensure that we just set the volume status to error. - LOG.debug(_("Retry info not present, will not reschedule")) - volume_flow.add(OnFailureChangeStatusTask(db)) - else: + volume_flow.add(ExtractVolumeRefTask(db)) + + if allow_reschedule and request_spec: volume_flow.add(OnFailureRescheduleTask(reschedule_context, db, scheduler_rpcapi)) - volume_flow.add(ExtractVolumeSpecTask(db)) - volume_flow.add(NotifyVolumeActionTask(db, host, "create.start")) - volume_flow.add(CreateVolumeFromSpecTask(db, host, driver)) - volume_flow.add(CreateVolumeOnFinishTask(db, host, "create.end")) + volume_flow.add(ExtractVolumeSpecTask(db), + NotifyVolumeActionTask(db, host, "create.start"), + CreateVolumeFromSpecTask(db, host, driver), + CreateVolumeOnFinishTask(db, host, "create.end")) - return flow_utils.attach_debug_listeners(volume_flow) + # Now load (but do not run) the flow using the provided initial data. + return taskflow.engines.load(volume_flow, store=create_what) diff --git a/cinder/volume/flows/utils.py b/cinder/volume/flows/utils.py deleted file mode 100644 index 6c6677e1547..00000000000 --- a/cinder/volume/flows/utils.py +++ /dev/null @@ -1,55 +0,0 @@ -# -*- coding: utf-8 -*- - -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved. -# Copyright (c) 2013 OpenStack Foundation -# Copyright 2010 United States Government as represented by the -# Administrator of the National Aeronautics and Space Administration. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from cinder.openstack.common import log as logging - -LOG = logging.getLogger(__name__) - - -def attach_debug_listeners(flow): - """Sets up a nice set of debug listeners for the flow. - - These listeners will log when tasks/flows are transitioning from state to - state so that said states can be seen in the debug log output which is very - useful for figuring out where problems are occurring. - """ - - def flow_log_change(state, details): - # TODO(harlowja): the bug 1214083 is causing problems - LOG.debug(_("%(flow)s has moved into state %(state)s from state" - " %(old_state)s") % {'state': state, - 'old_state': details.get('old_state'), - 'flow': str(details['flow'])}) - - def task_log_change(state, details): - # TODO(harlowja): the bug 1214083 is causing problems - LOG.debug(_("%(flow)s has moved %(runner)s into state %(state)s with" - " result: %(result)s") % {'state': state, - 'flow': str(details['flow']), - 'runner': str(details['runner']), - 'result': details.get('result')}) - - # Register * for all state changes (and not selective state changes to be - # called upon) since all the changes is more useful. - flow.notifier.register('*', flow_log_change) - flow.task_notifier.register('*', task_log_change) - return flow diff --git a/cinder/volume/manager.py b/cinder/volume/manager.py index a0f4e071d5a..9d23997096e 100644 --- a/cinder/volume/manager.py +++ b/cinder/volume/manager.py @@ -61,8 +61,6 @@ from cinder.volume import rpcapi as volume_rpcapi from cinder.volume import utils as volume_utils from cinder.volume import volume_types -from cinder.taskflow import states - from eventlet.greenpool import GreenPool LOG = logging.getLogger(__name__) @@ -289,23 +287,31 @@ class VolumeManager(manager.SchedulerDependentManager): def create_volume(self, context, volume_id, request_spec=None, filter_properties=None, allow_reschedule=True, snapshot_id=None, image_id=None, source_volid=None): + """Creates and exports the volume.""" + context_saved = context.deepcopy() + context = context.elevated() + if filter_properties is None: + filter_properties = {} - flow = create_volume.get_manager_flow( - self.db, - self.driver, - self.scheduler_rpcapi, - self.host, - volume_id, - request_spec=request_spec, - filter_properties=filter_properties, - allow_reschedule=allow_reschedule, - snapshot_id=snapshot_id, - image_id=image_id, - source_volid=source_volid, - reschedule_context=context.deepcopy()) - - assert flow, _('Manager volume flow not retrieved') + try: + flow_engine = create_volume.get_manager_flow( + context, + self.db, + self.driver, + self.scheduler_rpcapi, + self.host, + volume_id, + snapshot_id=snapshot_id, + image_id=image_id, + source_volid=source_volid, + allow_reschedule=allow_reschedule, + reschedule_context=context_saved, + request_spec=request_spec, + filter_properties=filter_properties) + except Exception: + raise exception.CinderException( + _("Failed to create manager volume flow")) if snapshot_id is not None: # Make sure the snapshot is not deleted until we are done with it. @@ -317,11 +323,11 @@ class VolumeManager(manager.SchedulerDependentManager): locked_action = None def _run_flow(): - flow.run(context.elevated()) - if flow.state != states.SUCCESS: - msg = _("Failed to successfully complete manager volume " - "workflow") - raise exception.CinderException(msg) + # This code executes create volume flow. If something goes wrong, + # flow reverts all job that was done and reraises an exception. + # Otherwise, all data that was generated by flow becomes available + # in flow engine's storage. + flow_engine.run() @utils.synchronized(locked_action, external=True) def _run_flow_locked(): @@ -332,8 +338,9 @@ class VolumeManager(manager.SchedulerDependentManager): else: _run_flow_locked() - self._reset_stats() - return volume_id + # Fetch created volume from storage + volume_ref = flow_engine.storage.fetch('volume') + return volume_ref['id'] @utils.require_driver_initialized @locked_volume_operation diff --git a/requirements.txt b/requirements.txt index d029aaea81e..cef4ecd3e17 100644 --- a/requirements.txt +++ b/requirements.txt @@ -18,6 +18,7 @@ python-keystoneclient>=0.4.1 python-novaclient>=2.15.0 python-swiftclient>=1.5 Routes>=1.12.3 +taskflow>=0.1.1,<0.2 rtslib-fb>=2.1.39 six>=1.4.1 SQLAlchemy>=0.7.8,<=0.7.99 diff --git a/taskflow.conf b/taskflow.conf deleted file mode 100644 index a871ff71823..00000000000 --- a/taskflow.conf +++ /dev/null @@ -1,7 +0,0 @@ -[DEFAULT] - -# The list of primitives to copy from taskflow -primitives=flow.linear_flow,task - -# The base module to hold the copy of taskflow -base=cinder