Merge "Switch create volume commands to Taskflow 0.1.1"
This commit is contained in:
commit
9d0998791a
@ -34,7 +34,6 @@ from cinder.openstack.common.notifier import api as notifier
|
||||
from cinder.volume.flows import create_volume
|
||||
from cinder.volume import rpcapi as volume_rpcapi
|
||||
|
||||
from cinder.taskflow import states
|
||||
|
||||
scheduler_driver_opt = cfg.StrOpt('scheduler_driver',
|
||||
default='cinder.scheduler.filter_scheduler.'
|
||||
@ -76,17 +75,18 @@ class SchedulerManager(manager.Manager):
|
||||
image_id=None, request_spec=None,
|
||||
filter_properties=None):
|
||||
|
||||
flow = create_volume.get_scheduler_flow(db, self.driver,
|
||||
try:
|
||||
flow_engine = create_volume.get_scheduler_flow(context,
|
||||
db, self.driver,
|
||||
request_spec,
|
||||
filter_properties,
|
||||
volume_id, snapshot_id,
|
||||
volume_id,
|
||||
snapshot_id,
|
||||
image_id)
|
||||
assert flow, _('Schedule volume flow not retrieved')
|
||||
|
||||
flow.run(context)
|
||||
if flow.state != states.SUCCESS:
|
||||
LOG.warn(_("Failed to successfully complete"
|
||||
" schedule volume using flow: %s"), flow)
|
||||
except Exception:
|
||||
raise exception.CinderException(
|
||||
_("Failed to create scheduler manager volume flow"))
|
||||
flow_engine.run()
|
||||
|
||||
def request_service_capabilities(self, context):
|
||||
volume_rpcapi.VolumeAPI().publish_service_capabilities(context)
|
||||
|
@ -1 +0,0 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
@ -1,276 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import collections
|
||||
import functools
|
||||
import inspect
|
||||
import types
|
||||
|
||||
# These arguments are ones that we will skip when parsing for requirements
|
||||
# for a function to operate (when used as a task).
|
||||
AUTO_ARGS = ('self', 'context', 'cls')
|
||||
|
||||
|
||||
def is_decorated(functor):
|
||||
if not isinstance(functor, (types.MethodType, types.FunctionType)):
|
||||
return False
|
||||
return getattr(extract(functor), '__task__', False)
|
||||
|
||||
|
||||
def extract(functor):
|
||||
# Extract the underlying functor if its a method since we can not set
|
||||
# attributes on instance methods, this is supposedly fixed in python 3
|
||||
# and later.
|
||||
#
|
||||
# TODO(harlowja): add link to this fix.
|
||||
assert isinstance(functor, (types.MethodType, types.FunctionType))
|
||||
if isinstance(functor, types.MethodType):
|
||||
return functor.__func__
|
||||
else:
|
||||
return functor
|
||||
|
||||
|
||||
def _mark_as_task(functor):
|
||||
setattr(functor, '__task__', True)
|
||||
|
||||
|
||||
def _get_wrapped(function):
|
||||
"""Get the method at the bottom of a stack of decorators."""
|
||||
|
||||
if hasattr(function, '__wrapped__'):
|
||||
return getattr(function, '__wrapped__')
|
||||
|
||||
if not hasattr(function, 'func_closure') or not function.func_closure:
|
||||
return function
|
||||
|
||||
def _get_wrapped_function(function):
|
||||
if not hasattr(function, 'func_closure') or not function.func_closure:
|
||||
return None
|
||||
|
||||
for closure in function.func_closure:
|
||||
func = closure.cell_contents
|
||||
|
||||
deeper_func = _get_wrapped_function(func)
|
||||
if deeper_func:
|
||||
return deeper_func
|
||||
elif hasattr(closure.cell_contents, '__call__'):
|
||||
return closure.cell_contents
|
||||
|
||||
return _get_wrapped_function(function)
|
||||
|
||||
|
||||
def _take_arg(a):
|
||||
if a in AUTO_ARGS:
|
||||
return False
|
||||
# In certain decorator cases it seems like we get the function to be
|
||||
# decorated as an argument, we don't want to take that as a real argument.
|
||||
if isinstance(a, collections.Callable):
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def wraps(fn):
|
||||
"""This will not be needed in python 3.2 or greater which already has this
|
||||
built-in to its functools.wraps method.
|
||||
"""
|
||||
|
||||
def wrapper(f):
|
||||
f = functools.wraps(fn)(f)
|
||||
f.__wrapped__ = getattr(fn, '__wrapped__', fn)
|
||||
return f
|
||||
|
||||
return wrapper
|
||||
|
||||
|
||||
def locked(f):
|
||||
|
||||
@wraps(f)
|
||||
def wrapper(self, *args, **kwargs):
|
||||
with self._lock:
|
||||
return f(self, *args, **kwargs)
|
||||
|
||||
return wrapper
|
||||
|
||||
|
||||
def task(*args, **kwargs):
|
||||
"""Decorates a given function and ensures that all needed attributes of
|
||||
that function are set so that the function can be used as a task.
|
||||
"""
|
||||
|
||||
def decorator(f):
|
||||
w_f = extract(f)
|
||||
|
||||
def noop(*args, **kwargs):
|
||||
pass
|
||||
|
||||
# Mark as being a task
|
||||
_mark_as_task(w_f)
|
||||
|
||||
# By default don't revert this.
|
||||
w_f.revert = kwargs.pop('revert_with', noop)
|
||||
|
||||
# Associate a name of this task that is the module + function name.
|
||||
w_f.name = "%s.%s" % (f.__module__, f.__name__)
|
||||
|
||||
# Sets the version of the task.
|
||||
version = kwargs.pop('version', (1, 0))
|
||||
f = _versionize(*version)(f)
|
||||
|
||||
# Attach any requirements this function needs for running.
|
||||
requires_what = kwargs.pop('requires', [])
|
||||
f = _requires(*requires_what, **kwargs)(f)
|
||||
|
||||
# Attach any optional requirements this function needs for running.
|
||||
optional_what = kwargs.pop('optional', [])
|
||||
f = _optional(*optional_what, **kwargs)(f)
|
||||
|
||||
# Attach any items this function provides as output
|
||||
provides_what = kwargs.pop('provides', [])
|
||||
f = _provides(*provides_what, **kwargs)(f)
|
||||
|
||||
@wraps(f)
|
||||
def wrapper(*args, **kwargs):
|
||||
return f(*args, **kwargs)
|
||||
|
||||
return wrapper
|
||||
|
||||
# This is needed to handle when the decorator has args or the decorator
|
||||
# doesn't have args, python is rather weird here...
|
||||
if kwargs or not args:
|
||||
return decorator
|
||||
else:
|
||||
if isinstance(args[0], collections.Callable):
|
||||
return decorator(args[0])
|
||||
else:
|
||||
return decorator
|
||||
|
||||
|
||||
def _versionize(major, minor=None):
|
||||
"""A decorator that marks the wrapped function with a major & minor version
|
||||
number.
|
||||
"""
|
||||
|
||||
if minor is None:
|
||||
minor = 0
|
||||
|
||||
def decorator(f):
|
||||
w_f = extract(f)
|
||||
w_f.version = (major, minor)
|
||||
|
||||
@wraps(f)
|
||||
def wrapper(*args, **kwargs):
|
||||
return f(*args, **kwargs)
|
||||
|
||||
return wrapper
|
||||
|
||||
return decorator
|
||||
|
||||
|
||||
def _optional(*args, **kwargs):
|
||||
"""Attaches a set of items that the decorated function would like as input
|
||||
to the functions underlying dictionary.
|
||||
"""
|
||||
|
||||
def decorator(f):
|
||||
w_f = extract(f)
|
||||
|
||||
if not hasattr(w_f, 'optional'):
|
||||
w_f.optional = set()
|
||||
|
||||
w_f.optional.update([a for a in args if _take_arg(a)])
|
||||
|
||||
@wraps(f)
|
||||
def wrapper(*args, **kwargs):
|
||||
return f(*args, **kwargs)
|
||||
|
||||
return wrapper
|
||||
|
||||
# This is needed to handle when the decorator has args or the decorator
|
||||
# doesn't have args, python is rather weird here...
|
||||
if kwargs or not args:
|
||||
return decorator
|
||||
else:
|
||||
if isinstance(args[0], collections.Callable):
|
||||
return decorator(args[0])
|
||||
else:
|
||||
return decorator
|
||||
|
||||
|
||||
def _requires(*args, **kwargs):
|
||||
"""Attaches a set of items that the decorated function requires as input
|
||||
to the functions underlying dictionary.
|
||||
"""
|
||||
|
||||
def decorator(f):
|
||||
w_f = extract(f)
|
||||
|
||||
if not hasattr(w_f, 'requires'):
|
||||
w_f.requires = set()
|
||||
|
||||
if kwargs.pop('auto_extract', True):
|
||||
inspect_what = _get_wrapped(f)
|
||||
f_args = inspect.getargspec(inspect_what).args
|
||||
w_f.requires.update([a for a in f_args if _take_arg(a)])
|
||||
|
||||
w_f.requires.update([a for a in args if _take_arg(a)])
|
||||
|
||||
@wraps(f)
|
||||
def wrapper(*args, **kwargs):
|
||||
return f(*args, **kwargs)
|
||||
|
||||
return wrapper
|
||||
|
||||
# This is needed to handle when the decorator has args or the decorator
|
||||
# doesn't have args, python is rather weird here...
|
||||
if kwargs or not args:
|
||||
return decorator
|
||||
else:
|
||||
if isinstance(args[0], collections.Callable):
|
||||
return decorator(args[0])
|
||||
else:
|
||||
return decorator
|
||||
|
||||
|
||||
def _provides(*args, **kwargs):
|
||||
"""Attaches a set of items that the decorated function provides as output
|
||||
to the functions underlying dictionary.
|
||||
"""
|
||||
|
||||
def decorator(f):
|
||||
w_f = extract(f)
|
||||
|
||||
if not hasattr(f, 'provides'):
|
||||
w_f.provides = set()
|
||||
|
||||
w_f.provides.update([a for a in args if _take_arg(a)])
|
||||
|
||||
@wraps(f)
|
||||
def wrapper(*args, **kwargs):
|
||||
return f(*args, **kwargs)
|
||||
|
||||
return wrapper
|
||||
|
||||
# This is needed to handle when the decorator has args or the decorator
|
||||
# doesn't have args, python is rather weird here...
|
||||
if kwargs or not args:
|
||||
return decorator
|
||||
else:
|
||||
if isinstance(args[0], collections.Callable):
|
||||
return decorator(args[0])
|
||||
else:
|
||||
return decorator
|
@ -1,69 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
|
||||
class TaskFlowException(Exception):
|
||||
"""Base class for exceptions emitted from this library."""
|
||||
pass
|
||||
|
||||
|
||||
class Duplicate(TaskFlowException):
|
||||
"""Raised when a duplicate entry is found."""
|
||||
pass
|
||||
|
||||
|
||||
class NotFound(TaskFlowException):
|
||||
"""Raised when some entry in some object doesn't exist."""
|
||||
pass
|
||||
|
||||
|
||||
class AlreadyExists(TaskFlowException):
|
||||
"""Raised when some entry in some object already exists."""
|
||||
pass
|
||||
|
||||
|
||||
class ClosedException(TaskFlowException):
|
||||
"""Raised when an access on a closed object occurs."""
|
||||
pass
|
||||
|
||||
|
||||
class InvalidStateException(TaskFlowException):
|
||||
"""Raised when a task/job/workflow is in an invalid state when an
|
||||
operation is attempting to apply to said task/job/workflow.
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
class UnclaimableJobException(TaskFlowException):
|
||||
"""Raised when a job can not be claimed."""
|
||||
pass
|
||||
|
||||
|
||||
class JobNotFound(TaskFlowException):
|
||||
"""Raised when a job entry can not be found."""
|
||||
pass
|
||||
|
||||
|
||||
class MissingDependencies(InvalidStateException):
|
||||
"""Raised when a task has dependencies that can not be satisfied."""
|
||||
message = ("%(task)s requires %(requirements)s but no other task produces"
|
||||
" said requirements")
|
||||
|
||||
def __init__(self, task, requirements):
|
||||
message = self.message % {'task': task, 'requirements': requirements}
|
||||
super(MissingDependencies, self).__init__(message)
|
@ -1 +0,0 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
@ -1,215 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import abc
|
||||
import threading
|
||||
import uuid as uuidlib
|
||||
|
||||
import six
|
||||
|
||||
|
||||
from cinder.taskflow import decorators
|
||||
from cinder.taskflow import exceptions as exc
|
||||
from cinder.taskflow import states
|
||||
from cinder.taskflow import utils
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class Flow(object):
|
||||
"""The base abstract class of all flow implementations.
|
||||
|
||||
It provides a set of parents to flows that have a concept of parent flows
|
||||
as well as a state and state utility functions to the deriving classes. It
|
||||
also provides a name and an identifier (uuid or other) to the flow so that
|
||||
it can be uniquely identifed among many flows.
|
||||
|
||||
Flows are expected to provide (if desired) the following methods:
|
||||
- add
|
||||
- add_many
|
||||
- interrupt
|
||||
- reset
|
||||
- rollback
|
||||
- run
|
||||
- soft_reset
|
||||
"""
|
||||
|
||||
# Common states that certain actions can be performed in. If the flow
|
||||
# is not in these sets of states then it is likely that the flow operation
|
||||
# can not succeed.
|
||||
RESETTABLE_STATES = set([
|
||||
states.INTERRUPTED,
|
||||
states.SUCCESS,
|
||||
states.PENDING,
|
||||
states.FAILURE,
|
||||
])
|
||||
SOFT_RESETTABLE_STATES = set([
|
||||
states.INTERRUPTED,
|
||||
])
|
||||
UNINTERRUPTIBLE_STATES = set([
|
||||
states.FAILURE,
|
||||
states.SUCCESS,
|
||||
states.PENDING,
|
||||
])
|
||||
RUNNABLE_STATES = set([
|
||||
states.PENDING,
|
||||
])
|
||||
|
||||
def __init__(self, name, parents=None, uuid=None):
|
||||
self._name = str(name)
|
||||
# The state of this flow.
|
||||
self._state = states.PENDING
|
||||
# If this flow has a parent flow/s which need to be reverted if
|
||||
# this flow fails then please include them here to allow this child
|
||||
# to call the parents...
|
||||
if parents:
|
||||
self.parents = tuple(parents)
|
||||
else:
|
||||
self.parents = ()
|
||||
# Any objects that want to listen when a wf/task starts/stops/completes
|
||||
# or errors should be registered here. This can be used to monitor
|
||||
# progress and record tasks finishing (so that it becomes possible to
|
||||
# store the result of a task in some persistent or semi-persistent
|
||||
# storage backend).
|
||||
self.notifier = utils.TransitionNotifier()
|
||||
self.task_notifier = utils.TransitionNotifier()
|
||||
# Ensure that modifications and/or multiple runs aren't happening
|
||||
# at the same time in the same flow at the same time.
|
||||
self._lock = threading.RLock()
|
||||
# Assign this flow a unique identifer.
|
||||
if uuid:
|
||||
self._id = str(uuid)
|
||||
else:
|
||||
self._id = str(uuidlib.uuid4())
|
||||
|
||||
@property
|
||||
def name(self):
|
||||
"""A non-unique name for this flow (human readable)"""
|
||||
return self._name
|
||||
|
||||
@property
|
||||
def uuid(self):
|
||||
"""Uniquely identifies this flow"""
|
||||
return "f-%s" % (self._id)
|
||||
|
||||
@property
|
||||
def state(self):
|
||||
"""Provides a read-only view of the flow state."""
|
||||
return self._state
|
||||
|
||||
def _change_state(self, context, new_state):
|
||||
was_changed = False
|
||||
old_state = self.state
|
||||
with self._lock:
|
||||
if self.state != new_state:
|
||||
old_state = self.state
|
||||
self._state = new_state
|
||||
was_changed = True
|
||||
if was_changed:
|
||||
# Don't notify while holding the lock.
|
||||
self.notifier.notify(self.state, details={
|
||||
'context': context,
|
||||
'flow': self,
|
||||
'old_state': old_state,
|
||||
})
|
||||
|
||||
def __str__(self):
|
||||
lines = ["Flow: %s" % (self.name)]
|
||||
lines.append("%s" % (self.uuid))
|
||||
lines.append("%s" % (len(self.parents)))
|
||||
lines.append("%s" % (self.state))
|
||||
return "; ".join(lines)
|
||||
|
||||
@abc.abstractmethod
|
||||
def add(self, task):
|
||||
"""Adds a given task to this flow.
|
||||
|
||||
Returns the uuid that is associated with the task for later operations
|
||||
before and after it is ran.
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
@decorators.locked
|
||||
def add_many(self, tasks):
|
||||
"""Adds many tasks to this flow.
|
||||
|
||||
Returns a list of uuids (one for each task added).
|
||||
"""
|
||||
uuids = []
|
||||
for t in tasks:
|
||||
uuids.append(self.add(t))
|
||||
return uuids
|
||||
|
||||
def interrupt(self):
|
||||
"""Attempts to interrupt the current flow and any tasks that are
|
||||
currently not running in the flow.
|
||||
|
||||
Returns how many tasks were interrupted (if any).
|
||||
"""
|
||||
if self.state in self.UNINTERRUPTIBLE_STATES:
|
||||
raise exc.InvalidStateException(("Can not interrupt when"
|
||||
" in state %s") % (self.state))
|
||||
# Note(harlowja): Do *not* acquire the lock here so that the flow may
|
||||
# be interrupted while running. This does mean the the above check may
|
||||
# not be valid but we can worry about that if it becomes an issue.
|
||||
old_state = self.state
|
||||
if old_state != states.INTERRUPTED:
|
||||
self._state = states.INTERRUPTED
|
||||
self.notifier.notify(self.state, details={
|
||||
'context': None,
|
||||
'flow': self,
|
||||
'old_state': old_state,
|
||||
})
|
||||
return 0
|
||||
|
||||
@decorators.locked
|
||||
def reset(self):
|
||||
"""Fully resets the internal state of this flow, allowing for the flow
|
||||
to be ran again.
|
||||
|
||||
Note: Listeners are also reset.
|
||||
"""
|
||||
if self.state not in self.RESETTABLE_STATES:
|
||||
raise exc.InvalidStateException(("Can not reset when"
|
||||
" in state %s") % (self.state))
|
||||
self.notifier.reset()
|
||||
self.task_notifier.reset()
|
||||
self._change_state(None, states.PENDING)
|
||||
|
||||
@decorators.locked
|
||||
def soft_reset(self):
|
||||
"""Partially resets the internal state of this flow, allowing for the
|
||||
flow to be ran again from an interrupted state only.
|
||||
"""
|
||||
if self.state not in self.SOFT_RESETTABLE_STATES:
|
||||
raise exc.InvalidStateException(("Can not soft reset when"
|
||||
" in state %s") % (self.state))
|
||||
self._change_state(None, states.PENDING)
|
||||
|
||||
@decorators.locked
|
||||
def run(self, context, *args, **kwargs):
|
||||
"""Executes the workflow."""
|
||||
if self.state not in self.RUNNABLE_STATES:
|
||||
raise exc.InvalidStateException("Unable to run flow when "
|
||||
"in state %s" % (self.state))
|
||||
|
||||
@decorators.locked
|
||||
def rollback(self, context, cause):
|
||||
"""Performs rollback of this workflow and any attached parent workflows
|
||||
if present.
|
||||
"""
|
||||
pass
|
@ -1,271 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import collections
|
||||
import logging
|
||||
|
||||
from cinder.openstack.common import excutils
|
||||
|
||||
from cinder.taskflow import decorators
|
||||
from cinder.taskflow import exceptions as exc
|
||||
from cinder.taskflow import states
|
||||
from cinder.taskflow import utils
|
||||
|
||||
from cinder.taskflow.patterns import base
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class Flow(base.Flow):
|
||||
""""A linear chain of tasks that can be applied in order as one unit and
|
||||
rolled back as one unit using the reverse order that the tasks have
|
||||
been applied in.
|
||||
|
||||
Note(harlowja): Each task in the chain must have requirements
|
||||
which are satisfied by the previous task/s in the chain.
|
||||
"""
|
||||
|
||||
def __init__(self, name, parents=None, uuid=None):
|
||||
super(Flow, self).__init__(name, parents, uuid)
|
||||
# The tasks which have been applied will be collected here so that they
|
||||
# can be reverted in the correct order on failure.
|
||||
self._accumulator = utils.RollbackAccumulator()
|
||||
# Tasks results are stored here. Lookup is by the uuid that was
|
||||
# returned from the add function.
|
||||
self.results = {}
|
||||
# The previously left off iterator that can be used to resume from
|
||||
# the last task (if interrupted and soft-reset).
|
||||
self._leftoff_at = None
|
||||
# All runners to run are collected here.
|
||||
self._runners = []
|
||||
self._connected = False
|
||||
# The resumption strategy to use.
|
||||
self.resumer = None
|
||||
|
||||
@decorators.locked
|
||||
def add(self, task):
|
||||
"""Adds a given task to this flow."""
|
||||
assert isinstance(task, collections.Callable)
|
||||
r = utils.Runner(task)
|
||||
r.runs_before = list(reversed(self._runners))
|
||||
self._runners.append(r)
|
||||
self._reset_internals()
|
||||
return r.uuid
|
||||
|
||||
def _reset_internals(self):
|
||||
self._connected = False
|
||||
self._leftoff_at = None
|
||||
|
||||
def _associate_providers(self, runner):
|
||||
# Ensure that some previous task provides this input.
|
||||
who_provides = {}
|
||||
task_requires = runner.requires
|
||||
for r in task_requires:
|
||||
provider = None
|
||||
for before_me in runner.runs_before:
|
||||
if r in before_me.provides:
|
||||
provider = before_me
|
||||
break
|
||||
if provider:
|
||||
who_provides[r] = provider
|
||||
# Ensure that the last task provides all the needed input for this
|
||||
# task to run correctly.
|
||||
missing_requires = task_requires - set(who_provides.keys())
|
||||
if missing_requires:
|
||||
raise exc.MissingDependencies(runner, sorted(missing_requires))
|
||||
runner.providers.update(who_provides)
|
||||
|
||||
def __str__(self):
|
||||
lines = ["LinearFlow: %s" % (self.name)]
|
||||
lines.append("%s" % (self.uuid))
|
||||
lines.append("%s" % (len(self._runners)))
|
||||
lines.append("%s" % (len(self.parents)))
|
||||
lines.append("%s" % (self.state))
|
||||
return "; ".join(lines)
|
||||
|
||||
@decorators.locked
|
||||
def remove(self, uuid):
|
||||
index_removed = -1
|
||||
for (i, r) in enumerate(self._runners):
|
||||
if r.uuid == uuid:
|
||||
index_removed = i
|
||||
break
|
||||
if index_removed == -1:
|
||||
raise ValueError("No runner found with uuid %s" % (uuid))
|
||||
else:
|
||||
removed = self._runners.pop(index_removed)
|
||||
self._reset_internals()
|
||||
# Go and remove it from any runner after the removed runner since
|
||||
# those runners may have had an attachment to it.
|
||||
for r in self._runners[index_removed:]:
|
||||
try:
|
||||
r.runs_before.remove(removed)
|
||||
except (IndexError, ValueError):
|
||||
pass
|
||||
|
||||
def __len__(self):
|
||||
return len(self._runners)
|
||||
|
||||
def _connect(self):
|
||||
if self._connected:
|
||||
return self._runners
|
||||
for r in self._runners:
|
||||
r.providers = {}
|
||||
for r in reversed(self._runners):
|
||||
self._associate_providers(r)
|
||||
self._connected = True
|
||||
return self._runners
|
||||
|
||||
def _ordering(self):
|
||||
return iter(self._connect())
|
||||
|
||||
@decorators.locked
|
||||
def run(self, context, *args, **kwargs):
|
||||
super(Flow, self).run(context, *args, **kwargs)
|
||||
|
||||
def resume_it():
|
||||
if self._leftoff_at is not None:
|
||||
return ([], self._leftoff_at)
|
||||
if self.resumer:
|
||||
(finished, leftover) = self.resumer.resume(self,
|
||||
self._ordering())
|
||||
else:
|
||||
finished = []
|
||||
leftover = self._ordering()
|
||||
return (finished, leftover)
|
||||
|
||||
self._change_state(context, states.STARTED)
|
||||
try:
|
||||
those_finished, leftover = resume_it()
|
||||
except Exception:
|
||||
with excutils.save_and_reraise_exception():
|
||||
self._change_state(context, states.FAILURE)
|
||||
|
||||
def run_it(runner, failed=False, result=None, simulate_run=False):
|
||||
try:
|
||||
# Add the task to be rolled back *immediately* so that even if
|
||||
# the task fails while producing results it will be given a
|
||||
# chance to rollback.
|
||||
rb = utils.RollbackTask(context, runner.task, result=None)
|
||||
self._accumulator.add(rb)
|
||||
self.task_notifier.notify(states.STARTED, details={
|
||||
'context': context,
|
||||
'flow': self,
|
||||
'runner': runner,
|
||||
})
|
||||
if not simulate_run:
|
||||
result = runner(context, *args, **kwargs)
|
||||
else:
|
||||
if failed:
|
||||
# TODO(harlowja): make this configurable??
|
||||
# If we previously failed, we want to fail again at
|
||||
# the same place.
|
||||
if not result:
|
||||
# If no exception or exception message was provided
|
||||
# or captured from the previous run then we need to
|
||||
# form one for this task.
|
||||
result = "%s failed running." % (runner.task)
|
||||
if isinstance(result, basestring):
|
||||
result = exc.InvalidStateException(result)
|
||||
if not isinstance(result, Exception):
|
||||
LOG.warn("Can not raise a non-exception"
|
||||
" object: %s", result)
|
||||
result = exc.InvalidStateException()
|
||||
raise result
|
||||
# Adjust the task result in the accumulator before
|
||||
# notifying others that the task has finished to
|
||||
# avoid the case where a listener might throw an
|
||||
# exception.
|
||||
rb.result = result
|
||||
runner.result = result
|
||||
self.results[runner.uuid] = result
|
||||
self.task_notifier.notify(states.SUCCESS, details={
|
||||
'context': context,
|
||||
'flow': self,
|
||||
'runner': runner,
|
||||
})
|
||||
except Exception as e:
|
||||
runner.result = e
|
||||
cause = utils.FlowFailure(runner, self, e)
|
||||
with excutils.save_and_reraise_exception():
|
||||
# Notify any listeners that the task has errored.
|
||||
self.task_notifier.notify(states.FAILURE, details={
|
||||
'context': context,
|
||||
'flow': self,
|
||||
'runner': runner,
|
||||
})
|
||||
self.rollback(context, cause)
|
||||
|
||||
if len(those_finished):
|
||||
self._change_state(context, states.RESUMING)
|
||||
for (r, details) in those_finished:
|
||||
# Fake running the task so that we trigger the same
|
||||
# notifications and state changes (and rollback that
|
||||
# would have happened in a normal flow).
|
||||
failed = states.FAILURE in details.get('states', [])
|
||||
result = details.get('result')
|
||||
run_it(r, failed=failed, result=result, simulate_run=True)
|
||||
|
||||
self._leftoff_at = leftover
|
||||
self._change_state(context, states.RUNNING)
|
||||
if self.state == states.INTERRUPTED:
|
||||
return
|
||||
|
||||
was_interrupted = False
|
||||
for r in leftover:
|
||||
r.reset()
|
||||
run_it(r)
|
||||
if self.state == states.INTERRUPTED:
|
||||
was_interrupted = True
|
||||
break
|
||||
|
||||
if not was_interrupted:
|
||||
# Only gets here if everything went successfully.
|
||||
self._change_state(context, states.SUCCESS)
|
||||
self._leftoff_at = None
|
||||
|
||||
@decorators.locked
|
||||
def reset(self):
|
||||
super(Flow, self).reset()
|
||||
self.results = {}
|
||||
self.resumer = None
|
||||
self._accumulator.reset()
|
||||
self._reset_internals()
|
||||
|
||||
@decorators.locked
|
||||
def rollback(self, context, cause):
|
||||
# Performs basic task by task rollback by going through the reverse
|
||||
# order that tasks have finished and asking said task to undo whatever
|
||||
# it has done. If this flow has any parent flows then they will
|
||||
# also be called to rollback any tasks said parents contain.
|
||||
#
|
||||
# Note(harlowja): if a flow can more simply revert a whole set of
|
||||
# tasks via a simpler command then it can override this method to
|
||||
# accomplish that.
|
||||
#
|
||||
# For example, if each task was creating a file in a directory, then
|
||||
# it's easier to just remove the directory than to ask each task to
|
||||
# delete its file individually.
|
||||
self._change_state(context, states.REVERTING)
|
||||
try:
|
||||
self._accumulator.rollback(cause)
|
||||
finally:
|
||||
self._change_state(context, states.FAILURE)
|
||||
# Rollback any parents flows if they exist...
|
||||
for p in self.parents:
|
||||
p.rollback(context, cause)
|
@ -1,40 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
# Job states.
|
||||
CLAIMED = 'CLAIMED'
|
||||
FAILURE = 'FAILURE'
|
||||
PENDING = 'PENDING'
|
||||
RUNNING = 'RUNNING'
|
||||
SUCCESS = 'SUCCESS'
|
||||
UNCLAIMED = 'UNCLAIMED'
|
||||
|
||||
# Flow states.
|
||||
FAILURE = FAILURE
|
||||
INTERRUPTED = 'INTERRUPTED'
|
||||
PENDING = 'PENDING'
|
||||
RESUMING = 'RESUMING'
|
||||
REVERTING = 'REVERTING'
|
||||
RUNNING = RUNNING
|
||||
STARTED = 'STARTED'
|
||||
SUCCESS = SUCCESS
|
||||
|
||||
# Task states.
|
||||
FAILURE = FAILURE
|
||||
STARTED = STARTED
|
||||
SUCCESS = SUCCESS
|
@ -1,67 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import abc
|
||||
|
||||
import six
|
||||
|
||||
from cinder.taskflow import utils
|
||||
|
||||
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class Task(object):
|
||||
"""An abstraction that defines a potential piece of work that can be
|
||||
applied and can be reverted to undo the work as a single unit.
|
||||
"""
|
||||
def __init__(self, name):
|
||||
self.name = name
|
||||
# An *immutable* input 'resource' name set this task depends
|
||||
# on existing before this task can be applied.
|
||||
self.requires = set()
|
||||
# An *immutable* input 'resource' name set this task would like to
|
||||
# depends on existing before this task can be applied (but does not
|
||||
# strongly depend on existing).
|
||||
self.optional = set()
|
||||
# An *immutable* output 'resource' name set this task
|
||||
# produces that other tasks may depend on this task providing.
|
||||
self.provides = set()
|
||||
# This identifies the version of the task to be ran which
|
||||
# can be useful in resuming older versions of tasks. Standard
|
||||
# major, minor version semantics apply.
|
||||
self.version = (1, 0)
|
||||
|
||||
def __str__(self):
|
||||
return "%s==%s" % (self.name, utils.join(self.version, with_what="."))
|
||||
|
||||
@abc.abstractmethod
|
||||
def __call__(self, context, *args, **kwargs):
|
||||
"""Activate a given task which will perform some operation and return.
|
||||
|
||||
This method can be used to apply some given context and given set
|
||||
of args and kwargs to accomplish some goal. Note that the result
|
||||
that is returned needs to be serializable so that it can be passed
|
||||
back into this task if reverting is triggered.
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def revert(self, context, result, cause):
|
||||
"""Revert this task using the given context, result that the apply
|
||||
provided as well as any information which may have caused
|
||||
said reversion.
|
||||
"""
|
||||
pass
|
@ -1,464 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import collections
|
||||
import contextlib
|
||||
import copy
|
||||
import logging
|
||||
import re
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
import types
|
||||
import uuid as uuidlib
|
||||
|
||||
|
||||
from cinder.taskflow import decorators
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def get_attr(task, field, default=None):
|
||||
if decorators.is_decorated(task):
|
||||
# If its a decorated functor then the attributes will be either
|
||||
# in the underlying function of the instancemethod or the function
|
||||
# itself.
|
||||
task = decorators.extract(task)
|
||||
return getattr(task, field, default)
|
||||
|
||||
|
||||
def join(itr, with_what=","):
|
||||
pieces = [str(i) for i in itr]
|
||||
return with_what.join(pieces)
|
||||
|
||||
|
||||
def get_many_attr(obj, *attrs):
|
||||
many = []
|
||||
for a in attrs:
|
||||
many.append(get_attr(obj, a, None))
|
||||
return many
|
||||
|
||||
|
||||
def get_task_version(task):
|
||||
"""Gets a tasks *string* version, whether it is a task object/function."""
|
||||
task_version = get_attr(task, 'version')
|
||||
if isinstance(task_version, (list, tuple)):
|
||||
task_version = join(task_version, with_what=".")
|
||||
if task_version is not None and not isinstance(task_version, basestring):
|
||||
task_version = str(task_version)
|
||||
return task_version
|
||||
|
||||
|
||||
def get_task_name(task):
|
||||
"""Gets a tasks *string* name, whether it is a task object/function."""
|
||||
task_name = ""
|
||||
if isinstance(task, (types.MethodType, types.FunctionType)):
|
||||
# If its a function look for the attributes that should have been
|
||||
# set using the task() decorator provided in the decorators file. If
|
||||
# those have not been set, then we should at least have enough basic
|
||||
# information (not a version) to form a useful task name.
|
||||
task_name = get_attr(task, 'name')
|
||||
if not task_name:
|
||||
name_pieces = [a for a in get_many_attr(task,
|
||||
'__module__',
|
||||
'__name__')
|
||||
if a is not None]
|
||||
task_name = join(name_pieces, ".")
|
||||
else:
|
||||
task_name = str(task)
|
||||
return task_name
|
||||
|
||||
|
||||
def is_version_compatible(version_1, version_2):
|
||||
"""Checks for major version compatibility of two *string" versions."""
|
||||
if version_1 == version_2:
|
||||
# Equivalent exactly, so skip the rest.
|
||||
return True
|
||||
|
||||
def _convert_to_pieces(version):
|
||||
try:
|
||||
pieces = []
|
||||
for p in version.split("."):
|
||||
p = p.strip()
|
||||
if not len(p):
|
||||
pieces.append(0)
|
||||
continue
|
||||
# Clean off things like 1alpha, or 2b and just select the
|
||||
# digit that starts that entry instead.
|
||||
p_match = re.match(r"(\d+)([A-Za-z]*)(.*)", p)
|
||||
if p_match:
|
||||
p = p_match.group(1)
|
||||
pieces.append(int(p))
|
||||
except (AttributeError, TypeError, ValueError):
|
||||
pieces = []
|
||||
return pieces
|
||||
|
||||
version_1_pieces = _convert_to_pieces(version_1)
|
||||
version_2_pieces = _convert_to_pieces(version_2)
|
||||
if len(version_1_pieces) == 0 or len(version_2_pieces) == 0:
|
||||
return False
|
||||
|
||||
# Ensure major version compatibility to start.
|
||||
major1 = version_1_pieces[0]
|
||||
major2 = version_2_pieces[0]
|
||||
if major1 != major2:
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def await(check_functor, timeout=None):
|
||||
if timeout is not None:
|
||||
end_time = time.time() + max(0, timeout)
|
||||
else:
|
||||
end_time = None
|
||||
# Use the same/similar scheme that the python condition class uses.
|
||||
delay = 0.0005
|
||||
while not check_functor():
|
||||
time.sleep(delay)
|
||||
if end_time is not None:
|
||||
remaining = end_time - time.time()
|
||||
if remaining <= 0:
|
||||
return False
|
||||
delay = min(delay * 2, remaining, 0.05)
|
||||
else:
|
||||
delay = min(delay * 2, 0.05)
|
||||
return True
|
||||
|
||||
|
||||
class LastFedIter(object):
|
||||
"""An iterator which yields back the first item and then yields back
|
||||
results from the provided iterator.
|
||||
"""
|
||||
|
||||
def __init__(self, first, rest_itr):
|
||||
self.first = first
|
||||
self.rest_itr = rest_itr
|
||||
|
||||
def __iter__(self):
|
||||
yield self.first
|
||||
for i in self.rest_itr:
|
||||
yield i
|
||||
|
||||
|
||||
class FlowFailure(object):
|
||||
"""When a task failure occurs the following object will be given to revert
|
||||
and can be used to interrogate what caused the failure.
|
||||
"""
|
||||
|
||||
def __init__(self, runner, flow, exception):
|
||||
self.runner = runner
|
||||
self.flow = flow
|
||||
self.exc = exception
|
||||
self.exc_info = sys.exc_info()
|
||||
|
||||
|
||||
class RollbackTask(object):
|
||||
"""A helper task that on being called will call the underlying callable
|
||||
tasks revert method (if said method exists).
|
||||
"""
|
||||
|
||||
def __init__(self, context, task, result):
|
||||
self.task = task
|
||||
self.result = result
|
||||
self.context = context
|
||||
|
||||
def __str__(self):
|
||||
return str(self.task)
|
||||
|
||||
def __call__(self, cause):
|
||||
if ((hasattr(self.task, "revert") and
|
||||
isinstance(self.task.revert, collections.Callable))):
|
||||
self.task.revert(self.context, self.result, cause)
|
||||
|
||||
|
||||
class Runner(object):
|
||||
"""A helper class that wraps a task and can find the needed inputs for
|
||||
the task to run, as well as providing a uuid and other useful functionality
|
||||
for users of the task.
|
||||
|
||||
TODO(harlowja): replace with the task details object or a subclass of
|
||||
that???
|
||||
"""
|
||||
|
||||
def __init__(self, task, uuid=None):
|
||||
assert isinstance(task, collections.Callable)
|
||||
self.task = task
|
||||
self.providers = {}
|
||||
self.runs_before = []
|
||||
self.result = None
|
||||
if not uuid:
|
||||
self._id = str(uuidlib.uuid4())
|
||||
else:
|
||||
self._id = str(uuid)
|
||||
|
||||
@property
|
||||
def uuid(self):
|
||||
return "r-%s" % (self._id)
|
||||
|
||||
@property
|
||||
def requires(self):
|
||||
return set(get_attr(self.task, 'requires', []))
|
||||
|
||||
@property
|
||||
def provides(self):
|
||||
return set(get_attr(self.task, 'provides', []))
|
||||
|
||||
@property
|
||||
def optional(self):
|
||||
return set(get_attr(self.task, 'optional', []))
|
||||
|
||||
@property
|
||||
def version(self):
|
||||
return get_task_version(self.task)
|
||||
|
||||
@property
|
||||
def name(self):
|
||||
return get_task_name(self.task)
|
||||
|
||||
def reset(self):
|
||||
self.result = None
|
||||
|
||||
def __str__(self):
|
||||
lines = ["Runner: %s" % (self.name)]
|
||||
lines.append("%s" % (self.uuid))
|
||||
lines.append("%s" % (self.version))
|
||||
return "; ".join(lines)
|
||||
|
||||
def __call__(self, *args, **kwargs):
|
||||
# Find all of our inputs first.
|
||||
kwargs = dict(kwargs)
|
||||
for (k, who_made) in self.providers.iteritems():
|
||||
if who_made.result and k in who_made.result:
|
||||
kwargs[k] = who_made.result[k]
|
||||
else:
|
||||
kwargs[k] = None
|
||||
optional_keys = self.optional
|
||||
optional_missing_keys = optional_keys - set(kwargs.keys())
|
||||
if optional_missing_keys:
|
||||
for k in optional_missing_keys:
|
||||
for r in self.runs_before:
|
||||
r_provides = r.provides
|
||||
if k in r_provides and r.result and k in r.result:
|
||||
kwargs[k] = r.result[k]
|
||||
break
|
||||
# And now finally run.
|
||||
self.result = self.task(*args, **kwargs)
|
||||
return self.result
|
||||
|
||||
|
||||
class TransitionNotifier(object):
|
||||
"""A utility helper class that can be used to subscribe to
|
||||
notifications of events occurring as well as allow a entity to post said
|
||||
notifications to subscribers.
|
||||
"""
|
||||
|
||||
RESERVED_KEYS = ('details',)
|
||||
ANY = '*'
|
||||
|
||||
def __init__(self):
|
||||
self._listeners = collections.defaultdict(list)
|
||||
|
||||
def reset(self):
|
||||
self._listeners = collections.defaultdict(list)
|
||||
|
||||
def notify(self, state, details):
|
||||
listeners = list(self._listeners.get(self.ANY, []))
|
||||
for i in self._listeners[state]:
|
||||
if i not in listeners:
|
||||
listeners.append(i)
|
||||
if not listeners:
|
||||
return
|
||||
for (callback, args, kwargs) in listeners:
|
||||
if args is None:
|
||||
args = []
|
||||
if kwargs is None:
|
||||
kwargs = {}
|
||||
kwargs['details'] = details
|
||||
try:
|
||||
callback(state, *args, **kwargs)
|
||||
except Exception:
|
||||
LOG.exception(("Failure calling callback %s to notify about"
|
||||
" state transition %s"), callback, state)
|
||||
|
||||
def register(self, state, callback, args=None, kwargs=None):
|
||||
assert isinstance(callback, collections.Callable)
|
||||
for i, (cb, args, kwargs) in enumerate(self._listeners.get(state, [])):
|
||||
if cb is callback:
|
||||
raise ValueError("Callback %s already registered" % (callback))
|
||||
if kwargs:
|
||||
for k in self.RESERVED_KEYS:
|
||||
if k in kwargs:
|
||||
raise KeyError(("Reserved key '%s' not allowed in "
|
||||
"kwargs") % k)
|
||||
kwargs = copy.copy(kwargs)
|
||||
if args:
|
||||
args = copy.copy(args)
|
||||
self._listeners[state].append((callback, args, kwargs))
|
||||
|
||||
def deregister(self, state, callback):
|
||||
if state not in self._listeners:
|
||||
return
|
||||
for i, (cb, args, kwargs) in enumerate(self._listeners[state]):
|
||||
if cb is callback:
|
||||
self._listeners[state].pop(i)
|
||||
break
|
||||
|
||||
|
||||
class RollbackAccumulator(object):
|
||||
"""A utility class that can help in organizing 'undo' like code
|
||||
so that said code be rolled back on failure (automatically or manually)
|
||||
by activating rollback callables that were inserted during said codes
|
||||
progression.
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self._rollbacks = []
|
||||
|
||||
def add(self, *callables):
|
||||
self._rollbacks.extend(callables)
|
||||
|
||||
def reset(self):
|
||||
self._rollbacks = []
|
||||
|
||||
def __len__(self):
|
||||
return len(self._rollbacks)
|
||||
|
||||
def __iter__(self):
|
||||
# Rollbacks happen in the reverse order that they were added.
|
||||
return reversed(self._rollbacks)
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def rollback(self, cause):
|
||||
LOG.warn("Activating %s rollbacks due to %s.", len(self), cause)
|
||||
for (i, f) in enumerate(self):
|
||||
LOG.debug("Calling rollback %s: %s", i + 1, f)
|
||||
try:
|
||||
f(cause)
|
||||
except Exception:
|
||||
LOG.exception(("Failed rolling back %s: %s due "
|
||||
"to inner exception."), i + 1, f)
|
||||
|
||||
def __exit__(self, type, value, tb):
|
||||
if any((value, type, tb)):
|
||||
self.rollback(value)
|
||||
|
||||
|
||||
class ReaderWriterLock(object):
|
||||
"""A simple reader-writer lock.
|
||||
|
||||
Several readers can hold the lock simultaneously, and only one writer.
|
||||
Write locks have priority over reads to prevent write starvation.
|
||||
|
||||
Public domain @ http://majid.info/blog/a-reader-writer-lock-for-python/
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.rwlock = 0
|
||||
self.writers_waiting = 0
|
||||
self.monitor = threading.Lock()
|
||||
self.readers_ok = threading.Condition(self.monitor)
|
||||
self.writers_ok = threading.Condition(self.monitor)
|
||||
|
||||
@contextlib.contextmanager
|
||||
def acquire(self, read=True):
|
||||
"""Acquire a read or write lock in a context manager."""
|
||||
try:
|
||||
if read:
|
||||
self.acquire_read()
|
||||
else:
|
||||
self.acquire_write()
|
||||
yield self
|
||||
finally:
|
||||
self.release()
|
||||
|
||||
def acquire_read(self):
|
||||
"""Acquire a read lock.
|
||||
|
||||
Several threads can hold this typeof lock.
|
||||
It is exclusive with write locks.
|
||||
"""
|
||||
|
||||
self.monitor.acquire()
|
||||
while self.rwlock < 0 or self.writers_waiting:
|
||||
self.readers_ok.wait()
|
||||
self.rwlock += 1
|
||||
self.monitor.release()
|
||||
|
||||
def acquire_write(self):
|
||||
"""Acquire a write lock.
|
||||
|
||||
Only one thread can hold this lock, and only when no read locks
|
||||
are also held.
|
||||
"""
|
||||
|
||||
self.monitor.acquire()
|
||||
while self.rwlock != 0:
|
||||
self.writers_waiting += 1
|
||||
self.writers_ok.wait()
|
||||
self.writers_waiting -= 1
|
||||
self.rwlock = -1
|
||||
self.monitor.release()
|
||||
|
||||
def release(self):
|
||||
"""Release a lock, whether read or write."""
|
||||
|
||||
self.monitor.acquire()
|
||||
if self.rwlock < 0:
|
||||
self.rwlock = 0
|
||||
else:
|
||||
self.rwlock -= 1
|
||||
wake_writers = self.writers_waiting and self.rwlock == 0
|
||||
wake_readers = self.writers_waiting == 0
|
||||
self.monitor.release()
|
||||
if wake_writers:
|
||||
self.writers_ok.acquire()
|
||||
self.writers_ok.notify()
|
||||
self.writers_ok.release()
|
||||
elif wake_readers:
|
||||
self.readers_ok.acquire()
|
||||
self.readers_ok.notifyAll()
|
||||
self.readers_ok.release()
|
||||
|
||||
|
||||
class LazyPluggable(object):
|
||||
"""A pluggable backend loaded lazily based on some value."""
|
||||
|
||||
def __init__(self, pivot, **backends):
|
||||
self.__backends = backends
|
||||
self.__pivot = pivot
|
||||
self.__backend = None
|
||||
|
||||
def __get_backend(self):
|
||||
if not self.__backend:
|
||||
backend_name = 'sqlalchemy'
|
||||
backend = self.__backends[backend_name]
|
||||
if isinstance(backend, tuple):
|
||||
name = backend[0]
|
||||
fromlist = backend[1]
|
||||
else:
|
||||
name = backend
|
||||
fromlist = backend
|
||||
|
||||
self.__backend = __import__(name, None, None, fromlist)
|
||||
return self.__backend
|
||||
|
||||
def __getattr__(self, key):
|
||||
backend = self.__get_backend()
|
||||
return getattr(backend, key)
|
@ -30,6 +30,7 @@ import tempfile
|
||||
|
||||
import mox
|
||||
from oslo.config import cfg
|
||||
from taskflow.engines.action_engine import engine
|
||||
|
||||
from cinder.backup import driver as backup_driver
|
||||
from cinder.brick.iscsi import iscsi
|
||||
@ -46,7 +47,6 @@ from cinder.openstack.common.notifier import test_notifier
|
||||
from cinder.openstack.common import rpc
|
||||
import cinder.policy
|
||||
from cinder import quota
|
||||
from cinder.taskflow.patterns import linear_flow
|
||||
from cinder import test
|
||||
from cinder.tests.brick.fake_lvm import FakeBrickLVM
|
||||
from cinder.tests import conf_fixture
|
||||
@ -59,7 +59,6 @@ import cinder.volume
|
||||
from cinder.volume import configuration as conf
|
||||
from cinder.volume import driver
|
||||
from cinder.volume.drivers import lvm
|
||||
from cinder.volume.flows import create_volume
|
||||
from cinder.volume import rpcapi as volume_rpcapi
|
||||
from cinder.volume import utils as volutils
|
||||
|
||||
@ -465,7 +464,7 @@ class VolumeTestCase(BaseVolumeTestCase):
|
||||
self.stubs.Set(self.volume.driver, 'create_volume_from_snapshot',
|
||||
lambda *args, **kwargs: None)
|
||||
|
||||
orig_flow = linear_flow.Flow.run
|
||||
orig_flow = engine.ActionEngine.run
|
||||
|
||||
def mock_flow_run(*args, **kwargs):
|
||||
# ensure the lock has been taken
|
||||
@ -492,7 +491,7 @@ class VolumeTestCase(BaseVolumeTestCase):
|
||||
admin_ctxt = context.get_admin_context()
|
||||
|
||||
# mock the flow runner so we can do some checks
|
||||
self.stubs.Set(linear_flow.Flow, 'run', mock_flow_run)
|
||||
self.stubs.Set(engine.ActionEngine, 'run', mock_flow_run)
|
||||
|
||||
# locked
|
||||
self.volume.create_volume(self.context, volume_id=dst_vol_id,
|
||||
@ -528,7 +527,7 @@ class VolumeTestCase(BaseVolumeTestCase):
|
||||
# mock the synchroniser so we can record events
|
||||
self.stubs.Set(utils, 'synchronized', self._mock_synchronized)
|
||||
|
||||
orig_flow = linear_flow.Flow.run
|
||||
orig_flow = engine.ActionEngine.run
|
||||
|
||||
def mock_flow_run(*args, **kwargs):
|
||||
# ensure the lock has been taken
|
||||
@ -551,7 +550,7 @@ class VolumeTestCase(BaseVolumeTestCase):
|
||||
admin_ctxt = context.get_admin_context()
|
||||
|
||||
# mock the flow runner so we can do some checks
|
||||
self.stubs.Set(linear_flow.Flow, 'run', mock_flow_run)
|
||||
self.stubs.Set(engine.ActionEngine, 'run', mock_flow_run)
|
||||
|
||||
# locked
|
||||
self.volume.create_volume(self.context, volume_id=dst_vol_id,
|
||||
@ -1816,13 +1815,6 @@ class VolumeTestCase(BaseVolumeTestCase):
|
||||
def fake_create_volume(*args, **kwargs):
|
||||
raise exception.CinderException('fake exception')
|
||||
|
||||
def fake_reschedule_or_error(self, context, *args, **kwargs):
|
||||
self.assertFalse(context.is_admin)
|
||||
self.assertNotIn('admin', context.roles)
|
||||
#compare context passed in with the context we saved
|
||||
self.assertDictMatch(self.saved_ctxt.__dict__,
|
||||
context.__dict__)
|
||||
|
||||
#create context for testing
|
||||
ctxt = self.context.deepcopy()
|
||||
if 'admin' in ctxt.roles:
|
||||
@ -1831,8 +1823,6 @@ class VolumeTestCase(BaseVolumeTestCase):
|
||||
#create one copy of context for future comparison
|
||||
self.saved_ctxt = ctxt.deepcopy()
|
||||
|
||||
self.stubs.Set(create_volume.OnFailureRescheduleTask, '_reschedule',
|
||||
fake_reschedule_or_error)
|
||||
self.stubs.Set(self.volume.driver, 'create_volume', fake_create_volume)
|
||||
|
||||
volume_src = tests_utils.create_volume(self.context,
|
||||
|
@ -37,15 +37,12 @@ from cinder.openstack.common import timeutils
|
||||
import cinder.policy
|
||||
from cinder import quota
|
||||
from cinder.scheduler import rpcapi as scheduler_rpcapi
|
||||
from cinder import units
|
||||
from cinder import utils
|
||||
from cinder.volume.flows import create_volume
|
||||
from cinder.volume import rpcapi as volume_rpcapi
|
||||
from cinder.volume import utils as volume_utils
|
||||
from cinder.volume import volume_types
|
||||
|
||||
from cinder.taskflow import states
|
||||
|
||||
|
||||
volume_host_opt = cfg.BoolOpt('snapshot_same_host',
|
||||
default=True,
|
||||
@ -147,42 +144,34 @@ class API(base.Base):
|
||||
return False
|
||||
|
||||
create_what = {
|
||||
'size': size,
|
||||
'context': context,
|
||||
'raw_size': size,
|
||||
'name': name,
|
||||
'description': description,
|
||||
'snapshot': snapshot,
|
||||
'image_id': image_id,
|
||||
'volume_type': volume_type,
|
||||
'raw_volume_type': volume_type,
|
||||
'metadata': metadata,
|
||||
'availability_zone': availability_zone,
|
||||
'raw_availability_zone': availability_zone,
|
||||
'source_volume': source_volume,
|
||||
'scheduler_hints': scheduler_hints,
|
||||
'key_manager': self.key_manager,
|
||||
'backup_source_volume': backup_source_volume,
|
||||
}
|
||||
(flow, uuid) = create_volume.get_api_flow(self.scheduler_rpcapi,
|
||||
|
||||
try:
|
||||
flow_engine = create_volume.get_api_flow(self.scheduler_rpcapi,
|
||||
self.volume_rpcapi,
|
||||
self.db,
|
||||
self.image_service,
|
||||
check_volume_az_zone,
|
||||
create_what)
|
||||
except Exception:
|
||||
raise exception.CinderException(
|
||||
_("Failed to create api volume flow"))
|
||||
|
||||
assert flow, _('Create volume flow not retrieved')
|
||||
flow.run(context)
|
||||
if flow.state != states.SUCCESS:
|
||||
raise exception.CinderException(_("Failed to successfully complete"
|
||||
" create volume workflow"))
|
||||
|
||||
# Extract the volume information from the task uuid that was specified
|
||||
# to produce said information.
|
||||
volume = None
|
||||
try:
|
||||
volume = flow.results[uuid]['volume']
|
||||
except KeyError:
|
||||
pass
|
||||
|
||||
# Raise an error, nobody provided it??
|
||||
assert volume, _('Expected volume result not found')
|
||||
flow_engine.run()
|
||||
volume = flow_engine.storage.fetch('volume')
|
||||
return volume
|
||||
|
||||
@wrap_check_policy
|
||||
|
@ -15,7 +15,7 @@
|
||||
# under the License.
|
||||
|
||||
# For more information please visit: https://wiki.openstack.org/wiki/TaskFlow
|
||||
from cinder.taskflow import task
|
||||
from taskflow import task
|
||||
|
||||
|
||||
def _make_task_name(cls, addons=None):
|
||||
@ -34,28 +34,7 @@ class CinderTask(task.Task):
|
||||
implement the given task as the task name.
|
||||
"""
|
||||
|
||||
def __init__(self, addons=None):
|
||||
def __init__(self, addons=None, **kwargs):
|
||||
super(CinderTask, self).__init__(_make_task_name(self.__class__,
|
||||
addons))
|
||||
|
||||
|
||||
class InjectTask(CinderTask):
|
||||
"""This injects a dict into the flow.
|
||||
|
||||
This injection is done so that the keys (and values) provided can be
|
||||
dependended on by tasks further down the line. Since taskflow is dependency
|
||||
based this can be considered the bootstrapping task that provides an
|
||||
initial set of values for other tasks to get started with. If this did not
|
||||
exist then tasks would fail locating there dependent tasks and the values
|
||||
said dependent tasks produce.
|
||||
|
||||
Reversion strategy: N/A
|
||||
"""
|
||||
|
||||
def __init__(self, inject_what, addons=None):
|
||||
super(InjectTask, self).__init__(addons=addons)
|
||||
self.provides.update(inject_what.keys())
|
||||
self._inject = inject_what
|
||||
|
||||
def __call__(self, context):
|
||||
return dict(self._inject)
|
||||
addons),
|
||||
**kwargs)
|
||||
|
@ -23,6 +23,10 @@
|
||||
import traceback
|
||||
|
||||
from oslo.config import cfg
|
||||
import taskflow.engines
|
||||
from taskflow.patterns import linear_flow
|
||||
from taskflow import task
|
||||
from taskflow.utils import misc
|
||||
|
||||
from cinder import exception
|
||||
from cinder.image import glance
|
||||
@ -34,13 +38,9 @@ from cinder.openstack.common import strutils
|
||||
from cinder.openstack.common import timeutils
|
||||
from cinder import policy
|
||||
from cinder import quota
|
||||
from cinder.taskflow import decorators
|
||||
from cinder.taskflow.patterns import linear_flow
|
||||
from cinder.taskflow import task
|
||||
from cinder import units
|
||||
from cinder import utils
|
||||
from cinder.volume.flows import base
|
||||
from cinder.volume.flows import utils as flow_utils
|
||||
from cinder.volume import utils as volume_utils
|
||||
from cinder.volume import volume_types
|
||||
|
||||
@ -86,16 +86,6 @@ def _make_pretty_name(method):
|
||||
return ".".join(meth_pieces)
|
||||
|
||||
|
||||
def _find_result_spec(flow):
|
||||
"""Find the last task that produced a valid volume_spec and returns it."""
|
||||
for there_result in flow.results.values():
|
||||
if not there_result or not 'volume_spec' in there_result:
|
||||
continue
|
||||
if there_result['volume_spec']:
|
||||
return there_result['volume_spec']
|
||||
return None
|
||||
|
||||
|
||||
def _restore_source_status(context, db, volume_spec):
|
||||
# NOTE(harlowja): Only if the type of the volume that was being created is
|
||||
# the source volume type should we try to reset the source volume status
|
||||
@ -171,25 +161,16 @@ class ExtractVolumeRequestTask(base.CinderTask):
|
||||
Reversion strategy: N/A
|
||||
"""
|
||||
|
||||
def __init__(self, image_service, az_check_functor=None):
|
||||
super(ExtractVolumeRequestTask, self).__init__(addons=[ACTION])
|
||||
# This task will produce the following outputs (said outputs can be
|
||||
# saved to durable storage in the future so that the flow can be
|
||||
# reconstructed elsewhere and continued).
|
||||
self.provides.update(['availability_zone', 'size', 'snapshot_id',
|
||||
default_provides = set(['availability_zone', 'size', 'snapshot_id',
|
||||
'source_volid', 'volume_type', 'volume_type_id',
|
||||
'encryption_key_id'])
|
||||
# This task requires the following inputs to operate (provided
|
||||
# automatically to __call__(). This is done so that the flow can
|
||||
# be reconstructed elsewhere and continue running (in the future).
|
||||
#
|
||||
# It also is used to be able to link tasks that produce items to tasks
|
||||
# that consume items (thus allowing the linking of the flow to be
|
||||
# mostly automatic).
|
||||
self.requires.update(['availability_zone', 'image_id', 'metadata',
|
||||
'size', 'snapshot', 'source_volume',
|
||||
'volume_type', 'key_manager',
|
||||
'backup_source_volume'])
|
||||
|
||||
def __init__(self, image_service, az_check_functor=None, **kwargs):
|
||||
super(ExtractVolumeRequestTask, self).__init__(addons=[ACTION],
|
||||
**kwargs)
|
||||
self.image_service = image_service
|
||||
self.az_check_functor = az_check_functor
|
||||
if not self.az_check_functor:
|
||||
@ -451,7 +432,7 @@ class ExtractVolumeRequestTask(base.CinderTask):
|
||||
|
||||
return volume_type_id
|
||||
|
||||
def __call__(self, context, size, snapshot, image_id, source_volume,
|
||||
def execute(self, context, size, snapshot, image_id, source_volume,
|
||||
availability_zone, volume_type, metadata,
|
||||
key_manager, backup_source_volume):
|
||||
|
||||
@ -519,16 +500,18 @@ class EntryCreateTask(base.CinderTask):
|
||||
Reversion strategy: remove the volume_id created from the database.
|
||||
"""
|
||||
|
||||
def __init__(self, db):
|
||||
super(EntryCreateTask, self).__init__(addons=[ACTION])
|
||||
self.db = db
|
||||
self.requires.update(['availability_zone', 'description', 'metadata',
|
||||
'name', 'reservations', 'size', 'snapshot_id',
|
||||
'source_volid', 'volume_type_id',
|
||||
'encryption_key_id'])
|
||||
self.provides.update(['volume_properties', 'volume_id'])
|
||||
default_provides = set(['volume_properties', 'volume_id', 'volume'])
|
||||
|
||||
def __call__(self, context, **kwargs):
|
||||
def __init__(self, db):
|
||||
requires = ['availability_zone', 'description', 'metadata',
|
||||
'name', 'reservations', 'size', 'snapshot_id',
|
||||
'source_volid', 'volume_type_id', 'encryption_key_id']
|
||||
super(EntryCreateTask, self).__init__(addons=[ACTION],
|
||||
requires=requires)
|
||||
self.db = db
|
||||
self.provides.update()
|
||||
|
||||
def execute(self, context, **kwargs):
|
||||
"""Creates a database entry for the given inputs and returns details.
|
||||
|
||||
Accesses the database and creates a new entry for the to be created
|
||||
@ -569,9 +552,9 @@ class EntryCreateTask(base.CinderTask):
|
||||
'volume': volume,
|
||||
}
|
||||
|
||||
def revert(self, context, result, cause):
|
||||
def revert(self, context, result, **kwargs):
|
||||
# We never produced a result and therefore can't destroy anything.
|
||||
if not result:
|
||||
if isinstance(result, misc.Failure):
|
||||
return
|
||||
if context.quota_committed:
|
||||
# Committed quota doesn't rollback as the volume has already been
|
||||
@ -603,12 +586,12 @@ class QuotaReserveTask(base.CinderTask):
|
||||
an automated or manual process.
|
||||
"""
|
||||
|
||||
default_provides = set(['reservations'])
|
||||
|
||||
def __init__(self):
|
||||
super(QuotaReserveTask, self).__init__(addons=[ACTION])
|
||||
self.requires.update(['size', 'volume_type_id'])
|
||||
self.provides.update(['reservations'])
|
||||
|
||||
def __call__(self, context, size, volume_type_id):
|
||||
def execute(self, context, size, volume_type_id):
|
||||
try:
|
||||
reserve_opts = {'volumes': 1, 'gigabytes': size}
|
||||
QUOTAS.add_volume_type_opts(context, reserve_opts, volume_type_id)
|
||||
@ -648,15 +631,14 @@ class QuotaReserveTask(base.CinderTask):
|
||||
"already consumed)")
|
||||
LOG.warn(msg % {'s_pid': context.project_id,
|
||||
'd_consumed': _consumed('volumes')})
|
||||
allowed = quotas['volumes']
|
||||
raise exception.VolumeLimitExceeded(allowed=quotas['volumes'])
|
||||
else:
|
||||
# If nothing was reraised, ensure we reraise the initial error
|
||||
raise
|
||||
|
||||
def revert(self, context, result, cause):
|
||||
def revert(self, context, result, **kwargs):
|
||||
# We never produced a result and therefore can't destroy anything.
|
||||
if not result:
|
||||
if isinstance(result, misc.Failure):
|
||||
return
|
||||
if context.quota_committed:
|
||||
# The reservations have already been committed and can not be
|
||||
@ -691,16 +673,15 @@ class QuotaCommitTask(base.CinderTask):
|
||||
|
||||
def __init__(self):
|
||||
super(QuotaCommitTask, self).__init__(addons=[ACTION])
|
||||
self.requires.update(['reservations', 'volume_properties'])
|
||||
|
||||
def __call__(self, context, reservations, volume_properties):
|
||||
def execute(self, context, reservations, volume_properties):
|
||||
QUOTAS.commit(context, reservations)
|
||||
context.quota_committed = True
|
||||
return {'volume_properties': volume_properties}
|
||||
|
||||
def revert(self, context, result, cause):
|
||||
def revert(self, context, result, **kwargs):
|
||||
# We never produced a result and therefore can't destroy anything.
|
||||
if not result:
|
||||
if isinstance(result, misc.Failure):
|
||||
return
|
||||
volume = result['volume_properties']
|
||||
try:
|
||||
@ -729,13 +710,14 @@ class VolumeCastTask(base.CinderTask):
|
||||
"""
|
||||
|
||||
def __init__(self, scheduler_rpcapi, volume_rpcapi, db):
|
||||
super(VolumeCastTask, self).__init__(addons=[ACTION])
|
||||
requires = ['image_id', 'scheduler_hints', 'snapshot_id',
|
||||
'source_volid', 'volume_id', 'volume_type',
|
||||
'volume_properties']
|
||||
super(VolumeCastTask, self).__init__(addons=[ACTION],
|
||||
requires=requires)
|
||||
self.volume_rpcapi = volume_rpcapi
|
||||
self.scheduler_rpcapi = scheduler_rpcapi
|
||||
self.db = db
|
||||
self.requires.update(['image_id', 'scheduler_hints', 'snapshot_id',
|
||||
'source_volid', 'volume_id', 'volume_type',
|
||||
'volume_properties'])
|
||||
|
||||
def _cast_create_volume(self, context, request_spec, filter_properties):
|
||||
source_volid = request_spec['source_volid']
|
||||
@ -786,7 +768,7 @@ class VolumeCastTask(base.CinderTask):
|
||||
image_id=image_id,
|
||||
source_volid=source_volid)
|
||||
|
||||
def __call__(self, context, **kwargs):
|
||||
def execute(self, context, **kwargs):
|
||||
scheduler_hints = kwargs.pop('scheduler_hints', None)
|
||||
request_spec = kwargs.copy()
|
||||
filter_properties = {}
|
||||
@ -794,6 +776,20 @@ class VolumeCastTask(base.CinderTask):
|
||||
filter_properties['scheduler_hints'] = scheduler_hints
|
||||
self._cast_create_volume(context, request_spec, filter_properties)
|
||||
|
||||
def revert(self, context, result, flow_failures, **kwargs):
|
||||
if isinstance(result, misc.Failure):
|
||||
return
|
||||
|
||||
# Restore the source volume status and set the volume to error status.
|
||||
volume_id = kwargs['volume_id']
|
||||
_restore_source_status(context, self.db, kwargs)
|
||||
_error_out_volume(context, self.db, volume_id)
|
||||
LOG.error(_("Volume %s: create failed"), volume_id)
|
||||
exc_info = False
|
||||
if all(flow_failures[-1].exc_info):
|
||||
exc_info = flow_failures[-1].exc_info
|
||||
LOG.error(_('Unexpected build error:'), exc_info=exc_info)
|
||||
|
||||
|
||||
class OnFailureChangeStatusTask(base.CinderTask):
|
||||
"""Helper task that sets a volume id to status error.
|
||||
@ -808,33 +804,24 @@ class OnFailureChangeStatusTask(base.CinderTask):
|
||||
def __init__(self, db):
|
||||
super(OnFailureChangeStatusTask, self).__init__(addons=[ACTION])
|
||||
self.db = db
|
||||
self.requires.update(['volume_id'])
|
||||
self.optional.update(['volume_spec'])
|
||||
|
||||
def __call__(self, context, volume_id, volume_spec=None):
|
||||
def execute(self, context, volume_id, volume_spec):
|
||||
# Save these items since we only use them if a reversion is triggered.
|
||||
return {
|
||||
'volume_id': volume_id,
|
||||
'volume_spec': volume_spec,
|
||||
}
|
||||
|
||||
def revert(self, context, result, cause):
|
||||
def revert(self, context, result, flow_failures, **kwargs):
|
||||
if isinstance(result, misc.Failure):
|
||||
return
|
||||
volume_spec = result.get('volume_spec')
|
||||
if not volume_spec:
|
||||
# Attempt to use it from a later task that *should* have populated
|
||||
# this from the database. It is not needed to be found since
|
||||
# reverting will continue without it.
|
||||
volume_spec = _find_result_spec(cause.flow)
|
||||
|
||||
# Restore the source volume status and set the volume to error status.
|
||||
volume_id = result['volume_id']
|
||||
_restore_source_status(context, self.db, volume_spec)
|
||||
_error_out_volume(context, self.db, volume_id, reason=cause.exc)
|
||||
_error_out_volume(context, self.db, volume_id)
|
||||
LOG.error(_("Volume %s: create failed"), volume_id)
|
||||
exc_info = False
|
||||
if all(cause.exc_info):
|
||||
exc_info = cause.exc_info
|
||||
LOG.error(_('Unexpected build error:'), exc_info=exc_info)
|
||||
|
||||
|
||||
class OnFailureRescheduleTask(base.CinderTask):
|
||||
@ -846,10 +833,10 @@ class OnFailureRescheduleTask(base.CinderTask):
|
||||
"""
|
||||
|
||||
def __init__(self, reschedule_context, db, scheduler_rpcapi):
|
||||
super(OnFailureRescheduleTask, self).__init__(addons=[ACTION])
|
||||
self.requires.update(['filter_properties', 'image_id', 'request_spec',
|
||||
'snapshot_id', 'volume_id'])
|
||||
self.optional.update(['volume_spec'])
|
||||
requires = ['filter_properties', 'image_id', 'request_spec',
|
||||
'snapshot_id', 'volume_id', 'context']
|
||||
super(OnFailureRescheduleTask, self).__init__(addons=[ACTION],
|
||||
requires=requires)
|
||||
self.scheduler_rpcapi = scheduler_rpcapi
|
||||
self.db = db
|
||||
self.reschedule_context = reschedule_context
|
||||
@ -878,27 +865,8 @@ class OnFailureRescheduleTask(base.CinderTask):
|
||||
exception.ImageUnacceptable,
|
||||
]
|
||||
|
||||
def _is_reschedulable(self, cause):
|
||||
# Figure out the type of the causes exception and compare it against
|
||||
# our black-list of exception types that will not cause rescheduling.
|
||||
exc_type, value = cause.exc_info[:2]
|
||||
# If we don't have a type from exc_info but we do have a exception in
|
||||
# the cause, try to get the type from that instead.
|
||||
if not value:
|
||||
value = cause.exc
|
||||
if not exc_type and value:
|
||||
exc_type = type(value)
|
||||
if exc_type and exc_type in self.no_reschedule_types:
|
||||
return False
|
||||
# Couldn't figure it out, by default assume whatever the cause was can
|
||||
# be fixed by rescheduling.
|
||||
#
|
||||
# NOTE(harlowja): Crosses fingers.
|
||||
return True
|
||||
|
||||
def __call__(self, context, *args, **kwargs):
|
||||
# Save these items since we only use them if a reversion is triggered.
|
||||
return kwargs.copy()
|
||||
def execute(self, **kwargs):
|
||||
pass
|
||||
|
||||
def _reschedule(self, context, cause, request_spec, filter_properties,
|
||||
snapshot_id, image_id, volume_id, **kwargs):
|
||||
@ -919,7 +887,7 @@ class OnFailureRescheduleTask(base.CinderTask):
|
||||
{'volume_id': volume_id,
|
||||
'method': _make_pretty_name(create_volume),
|
||||
'num': num_attempts,
|
||||
'reason': _exception_to_unicode(cause.exc)})
|
||||
'reason': cause.exception_str})
|
||||
|
||||
if all(cause.exc_info):
|
||||
# Stringify to avoid circular ref problem in json serialization
|
||||
@ -958,83 +926,23 @@ class OnFailureRescheduleTask(base.CinderTask):
|
||||
LOG.exception(_("Volume %s: resetting 'creating' status failed"),
|
||||
volume_id)
|
||||
|
||||
def revert(self, context, result, cause):
|
||||
volume_spec = result.get('volume_spec')
|
||||
if not volume_spec:
|
||||
# Find it from a prior task that populated this from the database.
|
||||
volume_spec = _find_result_spec(cause.flow)
|
||||
volume_id = result['volume_id']
|
||||
def revert(self, context, result, flow_failures, **kwargs):
|
||||
# Check if we have a cause which can tell us not to reschedule.
|
||||
for failure in flow_failures.values():
|
||||
if failure.check(self.no_reschedule_types):
|
||||
return
|
||||
|
||||
volume_id = kwargs['volume_id']
|
||||
# Use a different context when rescheduling.
|
||||
if self.reschedule_context:
|
||||
context = self.reschedule_context
|
||||
|
||||
# If we are now supposed to reschedule (or unable to), then just
|
||||
# restore the source volume status and set the volume to error status.
|
||||
def do_error_revert():
|
||||
LOG.debug(_("Failing volume %s creation by altering volume status"
|
||||
" instead of rescheduling"), volume_id)
|
||||
_restore_source_status(context, self.db, volume_spec)
|
||||
_error_out_volume(context, self.db, volume_id, reason=cause.exc)
|
||||
LOG.error(_("Volume %s: create failed"), volume_id)
|
||||
|
||||
# Check if we have a cause which can tell us not to reschedule.
|
||||
if not self._is_reschedulable(cause):
|
||||
do_error_revert()
|
||||
else:
|
||||
try:
|
||||
cause = list(flow_failures.values())[0]
|
||||
self._pre_reschedule(context, volume_id)
|
||||
self._reschedule(context, cause, **result)
|
||||
self._reschedule(context, cause, **kwargs)
|
||||
self._post_reschedule(context, volume_id)
|
||||
except exception.CinderException:
|
||||
LOG.exception(_("Volume %s: rescheduling failed"), volume_id)
|
||||
# NOTE(harlowja): Do error volume status changing instead.
|
||||
do_error_revert()
|
||||
exc_info = False
|
||||
if all(cause.exc_info):
|
||||
exc_info = cause.exc_info
|
||||
LOG.error(_('Unexpected build error:'), exc_info=exc_info)
|
||||
|
||||
|
||||
class NotifySchedulerFailureTask(base.CinderTask):
|
||||
"""Helper task that notifies some external service on failure.
|
||||
|
||||
Reversion strategy: On failure of any flow that includes this task the
|
||||
request specification associated with that flow will be extracted and
|
||||
sent as a payload to the notification service under the given methods
|
||||
scheduler topic.
|
||||
"""
|
||||
|
||||
def __init__(self, method):
|
||||
super(NotifySchedulerFailureTask, self).__init__(addons=[ACTION])
|
||||
self.requires.update(['request_spec', 'volume_id'])
|
||||
self.method = method
|
||||
self.topic = 'scheduler.%s' % self.method
|
||||
self.publisher_id = notifier.publisher_id("scheduler")
|
||||
|
||||
def __call__(self, context, **kwargs):
|
||||
# Save these items since we only use them if a reversion is triggered.
|
||||
return kwargs.copy()
|
||||
|
||||
def revert(self, context, result, cause):
|
||||
request_spec = result['request_spec']
|
||||
volume_id = result['volume_id']
|
||||
volume_properties = request_spec['volume_properties']
|
||||
payload = {
|
||||
'request_spec': request_spec,
|
||||
'volume_properties': volume_properties,
|
||||
'volume_id': volume_id,
|
||||
'state': 'error',
|
||||
'method': self.method,
|
||||
'reason': unicode(cause.exc),
|
||||
}
|
||||
try:
|
||||
notifier.notify(context, self.publisher_id, self.topic,
|
||||
notifier.ERROR, payload)
|
||||
except exception.CinderException:
|
||||
LOG.exception(_("Failed notifying on %(topic)s "
|
||||
"payload %(payload)s") % {'topic': self.topic,
|
||||
'payload': payload})
|
||||
|
||||
|
||||
class ExtractSchedulerSpecTask(base.CinderTask):
|
||||
@ -1043,12 +951,12 @@ class ExtractSchedulerSpecTask(base.CinderTask):
|
||||
Reversion strategy: N/A
|
||||
"""
|
||||
|
||||
def __init__(self, db):
|
||||
super(ExtractSchedulerSpecTask, self).__init__(addons=[ACTION])
|
||||
default_provides = set(['request_spec'])
|
||||
|
||||
def __init__(self, db, **kwargs):
|
||||
super(ExtractSchedulerSpecTask, self).__init__(addons=[ACTION],
|
||||
**kwargs)
|
||||
self.db = db
|
||||
self.requires.update(['image_id', 'request_spec', 'snapshot_id',
|
||||
'volume_id'])
|
||||
self.provides.update(['request_spec'])
|
||||
|
||||
def _populate_request_spec(self, context, volume_id, snapshot_id,
|
||||
image_id):
|
||||
@ -1077,7 +985,7 @@ class ExtractSchedulerSpecTask(base.CinderTask):
|
||||
'volume_type': list(dict(vol_type).iteritems()),
|
||||
}
|
||||
|
||||
def __call__(self, context, request_spec, volume_id, snapshot_id,
|
||||
def execute(self, context, request_spec, volume_id, snapshot_id,
|
||||
image_id):
|
||||
# For RPC version < 1.2 backward compatibility
|
||||
if request_spec is None:
|
||||
@ -1088,6 +996,33 @@ class ExtractSchedulerSpecTask(base.CinderTask):
|
||||
}
|
||||
|
||||
|
||||
class ExtractVolumeRefTask(base.CinderTask):
|
||||
"""Extracts volume reference for given volume id. """
|
||||
|
||||
default_provides = 'volume_ref'
|
||||
|
||||
def __init__(self, db):
|
||||
super(ExtractVolumeRefTask, self).__init__(addons=[ACTION])
|
||||
self.db = db
|
||||
|
||||
def execute(self, context, volume_id):
|
||||
# NOTE(harlowja): this will fetch the volume from the database, if
|
||||
# the volume has been deleted before we got here then this should fail.
|
||||
#
|
||||
# In the future we might want to have a lock on the volume_id so that
|
||||
# the volume can not be deleted while its still being created?
|
||||
volume_ref = self.db.volume_get(context, volume_id)
|
||||
|
||||
return volume_ref
|
||||
|
||||
def revert(self, context, volume_id, result, **kwargs):
|
||||
if isinstance(result, misc.Failure):
|
||||
return
|
||||
|
||||
_error_out_volume(context, self.db, volume_id)
|
||||
LOG.error(_("Volume %s: create failed"), volume_id)
|
||||
|
||||
|
||||
class ExtractVolumeSpecTask(base.CinderTask):
|
||||
"""Extracts a spec of a volume to be created into a common structure.
|
||||
|
||||
@ -1099,22 +1034,17 @@ class ExtractVolumeSpecTask(base.CinderTask):
|
||||
Reversion strategy: N/A
|
||||
"""
|
||||
|
||||
def __init__(self, db):
|
||||
super(ExtractVolumeSpecTask, self).__init__(addons=[ACTION])
|
||||
self.db = db
|
||||
self.requires.update(['filter_properties', 'image_id', 'snapshot_id',
|
||||
'source_volid', 'volume_id'])
|
||||
self.provides.update(['volume_spec', 'volume_ref'])
|
||||
default_provides = 'volume_spec'
|
||||
|
||||
def __call__(self, context, volume_id, **kwargs):
|
||||
def __init__(self, db):
|
||||
requires = ['image_id', 'snapshot_id', 'source_volid']
|
||||
super(ExtractVolumeSpecTask, self).__init__(addons=[ACTION],
|
||||
requires=requires)
|
||||
self.db = db
|
||||
|
||||
def execute(self, context, volume_ref, **kwargs):
|
||||
get_remote_image_service = glance.get_remote_image_service
|
||||
|
||||
# NOTE(harlowja): this will fetch the volume from the database, if
|
||||
# the volume has been deleted before we got here then this should fail.
|
||||
#
|
||||
# In the future we might want to have a lock on the volume_id so that
|
||||
# the volume can not be deleted while its still being created?
|
||||
volume_ref = self.db.volume_get(context, volume_id)
|
||||
volume_name = volume_ref['name']
|
||||
volume_size = utils.as_int(volume_ref['size'], quiet=False)
|
||||
|
||||
@ -1173,21 +1103,14 @@ class ExtractVolumeSpecTask(base.CinderTask):
|
||||
'image_service': image_service,
|
||||
})
|
||||
|
||||
return {
|
||||
'volume_spec': specs,
|
||||
# NOTE(harlowja): it appears like further usage of this volume_ref
|
||||
# result actually depend on it being a sqlalchemy object and not
|
||||
# just a plain dictionary so thats why we are storing this here.
|
||||
#
|
||||
# It was attempted to refetch it when needed in subsequent tasks,
|
||||
# but that caused sqlalchemy errors to occur (volume already open
|
||||
# or similar).
|
||||
#
|
||||
# In the future where this task could fail and be recovered from we
|
||||
# will need to store the volume_spec and recreate the volume_ref
|
||||
# on demand.
|
||||
'volume_ref': volume_ref,
|
||||
}
|
||||
return specs
|
||||
|
||||
def revert(self, context, result, **kwargs):
|
||||
if isinstance(result, misc.Failure):
|
||||
return
|
||||
volume_spec = result.get('volume_spec')
|
||||
# Restore the source volume status and set the volume to error status.
|
||||
_restore_source_status(context, self.db, volume_spec)
|
||||
|
||||
|
||||
class NotifyVolumeActionTask(base.CinderTask):
|
||||
@ -1199,12 +1122,11 @@ class NotifyVolumeActionTask(base.CinderTask):
|
||||
def __init__(self, db, host, event_suffix):
|
||||
super(NotifyVolumeActionTask, self).__init__(addons=[ACTION,
|
||||
event_suffix])
|
||||
self.requires.update(['volume_ref'])
|
||||
self.db = db
|
||||
self.event_suffix = event_suffix
|
||||
self.host = host
|
||||
|
||||
def __call__(self, context, volume_ref):
|
||||
def execute(self, context, volume_ref):
|
||||
volume_id = volume_ref['id']
|
||||
try:
|
||||
volume_utils.notify_about_volume_usage(context, volume_ref,
|
||||
@ -1226,11 +1148,12 @@ class CreateVolumeFromSpecTask(base.CinderTask):
|
||||
Reversion strategy: N/A
|
||||
"""
|
||||
|
||||
default_provides = 'volume'
|
||||
|
||||
def __init__(self, db, host, driver):
|
||||
super(CreateVolumeFromSpecTask, self).__init__(addons=[ACTION])
|
||||
self.db = db
|
||||
self.driver = driver
|
||||
self.requires.update(['volume_spec', 'volume_ref'])
|
||||
# This maps the different volume specification types into the methods
|
||||
# that can create said volume type (aka this is a jump table).
|
||||
self._create_func_mapping = {
|
||||
@ -1472,7 +1395,7 @@ class CreateVolumeFromSpecTask(base.CinderTask):
|
||||
def _create_raw_volume(self, context, volume_ref, **kwargs):
|
||||
return self.driver.create_volume(volume_ref)
|
||||
|
||||
def __call__(self, context, volume_ref, volume_spec):
|
||||
def execute(self, context, volume_ref, volume_spec):
|
||||
# we can't do anything if the driver didn't init
|
||||
if not self.driver.initialized:
|
||||
LOG.error(_("Unable to create volume, driver not initialized"))
|
||||
@ -1538,6 +1461,8 @@ class CreateVolumeFromSpecTask(base.CinderTask):
|
||||
{'volume_id': volume_id, 'model': model_update})
|
||||
raise exception.ExportFailure(reason=ex)
|
||||
|
||||
return volume_ref
|
||||
|
||||
|
||||
class CreateVolumeOnFinishTask(NotifyVolumeActionTask):
|
||||
"""On successful volume creation this will perform final volume actions.
|
||||
@ -1553,13 +1478,12 @@ class CreateVolumeOnFinishTask(NotifyVolumeActionTask):
|
||||
|
||||
def __init__(self, db, host, event_suffix):
|
||||
super(CreateVolumeOnFinishTask, self).__init__(db, host, event_suffix)
|
||||
self.requires.update(['volume_spec'])
|
||||
self.status_translation = {
|
||||
'migration_target_creating': 'migration_target',
|
||||
}
|
||||
|
||||
def __call__(self, context, volume_ref, volume_spec):
|
||||
volume_id = volume_ref['id']
|
||||
def execute(self, context, volume, volume_spec):
|
||||
volume_id = volume['id']
|
||||
new_status = self.status_translation.get(volume_spec.get('status'),
|
||||
'available')
|
||||
update = {
|
||||
@ -1573,7 +1497,7 @@ class CreateVolumeOnFinishTask(NotifyVolumeActionTask):
|
||||
# 'building' if this fails)??
|
||||
volume_ref = self.db.volume_update(context, volume_id, update)
|
||||
# Now use the parent to notify.
|
||||
super(CreateVolumeOnFinishTask, self).__call__(context, volume_ref)
|
||||
super(CreateVolumeOnFinishTask, self).execute(context, volume_ref)
|
||||
except exception.CinderException:
|
||||
LOG.exception(_("Failed updating volume %(volume_id)s with "
|
||||
"%(update)s") % {'volume_id': volume_id,
|
||||
@ -1605,31 +1529,26 @@ def get_api_flow(scheduler_rpcapi, volume_rpcapi, db,
|
||||
flow_name = ACTION.replace(":", "_") + "_api"
|
||||
api_flow = linear_flow.Flow(flow_name)
|
||||
|
||||
# This injects the initial starting flow values into the workflow so that
|
||||
# the dependency order of the tasks provides/requires can be correctly
|
||||
# determined.
|
||||
api_flow.add(base.InjectTask(create_what, addons=[ACTION]))
|
||||
api_flow.add(ExtractVolumeRequestTask(image_service,
|
||||
az_check_functor))
|
||||
api_flow.add(QuotaReserveTask())
|
||||
v_uuid = api_flow.add(EntryCreateTask(db))
|
||||
api_flow.add(QuotaCommitTask())
|
||||
|
||||
# If after committing something fails, ensure we set the db to failure
|
||||
# before reverting any prior tasks.
|
||||
api_flow.add(OnFailureChangeStatusTask(db))
|
||||
api_flow.add(ExtractVolumeRequestTask(
|
||||
image_service,
|
||||
az_check_functor,
|
||||
rebind={'size': 'raw_size',
|
||||
'availability_zone': 'raw_availability_zone',
|
||||
'volume_type': 'raw_volume_type'}))
|
||||
api_flow.add(QuotaReserveTask(),
|
||||
EntryCreateTask(db),
|
||||
QuotaCommitTask())
|
||||
|
||||
# This will cast it out to either the scheduler or volume manager via
|
||||
# the rpc apis provided.
|
||||
api_flow.add(VolumeCastTask(scheduler_rpcapi, volume_rpcapi, db))
|
||||
|
||||
# Note(harlowja): this will return the flow as well as the uuid of the
|
||||
# task which will produce the 'volume' database reference (since said
|
||||
# reference is returned to other callers in the api for further usage).
|
||||
return (flow_utils.attach_debug_listeners(api_flow), v_uuid)
|
||||
# Now load (but do not run) the flow using the provided initial data.
|
||||
return taskflow.engines.load(api_flow, store=create_what)
|
||||
|
||||
|
||||
def get_scheduler_flow(db, driver, request_spec=None, filter_properties=None,
|
||||
def get_scheduler_flow(context, db, driver, request_spec=None,
|
||||
filter_properties=None,
|
||||
volume_id=None, snapshot_id=None, image_id=None):
|
||||
|
||||
"""Constructs and returns the scheduler entrypoint flow.
|
||||
@ -1643,29 +1562,23 @@ def get_scheduler_flow(db, driver, request_spec=None, filter_properties=None,
|
||||
4. Uses provided driver to to then select and continue processing of
|
||||
volume request.
|
||||
"""
|
||||
|
||||
flow_name = ACTION.replace(":", "_") + "_scheduler"
|
||||
scheduler_flow = linear_flow.Flow(flow_name)
|
||||
|
||||
# This injects the initial starting flow values into the workflow so that
|
||||
# the dependency order of the tasks provides/requires can be correctly
|
||||
# determined.
|
||||
scheduler_flow.add(base.InjectTask({
|
||||
'request_spec': request_spec,
|
||||
create_what = {
|
||||
'context': context,
|
||||
'raw_request_spec': request_spec,
|
||||
'filter_properties': filter_properties,
|
||||
'volume_id': volume_id,
|
||||
'snapshot_id': snapshot_id,
|
||||
'image_id': image_id,
|
||||
}, addons=[ACTION]))
|
||||
}
|
||||
|
||||
flow_name = ACTION.replace(":", "_") + "_scheduler"
|
||||
scheduler_flow = linear_flow.Flow(flow_name)
|
||||
|
||||
# This will extract and clean the spec from the starting values.
|
||||
scheduler_flow.add(ExtractSchedulerSpecTask(db))
|
||||
scheduler_flow.add(ExtractSchedulerSpecTask(
|
||||
db,
|
||||
rebind={'request_spec': 'raw_request_spec'}))
|
||||
|
||||
# The decorator application here ensures that the method gets the right
|
||||
# requires attributes automatically by examining the underlying functions
|
||||
# arguments.
|
||||
|
||||
@decorators.task
|
||||
def schedule_create_volume(context, request_spec, filter_properties):
|
||||
|
||||
def _log_failure(cause):
|
||||
@ -1711,16 +1624,16 @@ def get_scheduler_flow(db, driver, request_spec=None, filter_properties=None,
|
||||
_log_failure(e)
|
||||
_error_out_volume(context, db, volume_id, reason=e)
|
||||
|
||||
scheduler_flow.add(schedule_create_volume)
|
||||
scheduler_flow.add(task.FunctorTask(schedule_create_volume))
|
||||
|
||||
return flow_utils.attach_debug_listeners(scheduler_flow)
|
||||
# Now load (but do not run) the flow using the provided initial data.
|
||||
return taskflow.engines.load(scheduler_flow, store=create_what)
|
||||
|
||||
|
||||
def get_manager_flow(db, driver, scheduler_rpcapi, host, volume_id,
|
||||
request_spec=None, filter_properties=None,
|
||||
allow_reschedule=True,
|
||||
snapshot_id=None, image_id=None, source_volid=None,
|
||||
reschedule_context=None):
|
||||
def get_manager_flow(context, db, driver, scheduler_rpcapi, host, volume_id,
|
||||
allow_reschedule, reschedule_context, request_spec,
|
||||
filter_properties, snapshot_id=None, image_id=None,
|
||||
source_volid=None):
|
||||
"""Constructs and returns the manager entrypoint flow.
|
||||
|
||||
This flow will do the following:
|
||||
@ -1739,44 +1652,29 @@ def get_manager_flow(db, driver, scheduler_rpcapi, host, volume_id,
|
||||
flow_name = ACTION.replace(":", "_") + "_manager"
|
||||
volume_flow = linear_flow.Flow(flow_name)
|
||||
|
||||
# Determine if we are allowed to reschedule since this affects how
|
||||
# failures will be handled.
|
||||
if not filter_properties:
|
||||
filter_properties = {}
|
||||
if not request_spec and allow_reschedule:
|
||||
LOG.debug(_("No request spec, will not reschedule"))
|
||||
allow_reschedule = False
|
||||
if not filter_properties.get('retry', None) and allow_reschedule:
|
||||
LOG.debug(_("No retry filter property or associated "
|
||||
"retry info, will not reschedule"))
|
||||
allow_reschedule = False
|
||||
|
||||
# This injects the initial starting flow values into the workflow so that
|
||||
# the dependency order of the tasks provides/requires can be correctly
|
||||
# determined.
|
||||
volume_flow.add(base.InjectTask({
|
||||
create_what = {
|
||||
'context': context,
|
||||
'filter_properties': filter_properties,
|
||||
'image_id': image_id,
|
||||
'request_spec': request_spec,
|
||||
'snapshot_id': snapshot_id,
|
||||
'source_volid': source_volid,
|
||||
'volume_id': volume_id,
|
||||
}, addons=[ACTION]))
|
||||
}
|
||||
|
||||
# We can actually just check if we should reschedule on failure ahead of
|
||||
# time instead of trying to determine this later, certain values are needed
|
||||
# to reschedule and without them we should just avoid rescheduling.
|
||||
if not allow_reschedule:
|
||||
# On failure ensure that we just set the volume status to error.
|
||||
LOG.debug(_("Retry info not present, will not reschedule"))
|
||||
volume_flow.add(OnFailureChangeStatusTask(db))
|
||||
else:
|
||||
volume_flow.add(ExtractVolumeRefTask(db))
|
||||
|
||||
if allow_reschedule and request_spec:
|
||||
volume_flow.add(OnFailureRescheduleTask(reschedule_context,
|
||||
db, scheduler_rpcapi))
|
||||
|
||||
volume_flow.add(ExtractVolumeSpecTask(db))
|
||||
volume_flow.add(NotifyVolumeActionTask(db, host, "create.start"))
|
||||
volume_flow.add(CreateVolumeFromSpecTask(db, host, driver))
|
||||
volume_flow.add(CreateVolumeOnFinishTask(db, host, "create.end"))
|
||||
volume_flow.add(ExtractVolumeSpecTask(db),
|
||||
NotifyVolumeActionTask(db, host, "create.start"),
|
||||
CreateVolumeFromSpecTask(db, host, driver),
|
||||
CreateVolumeOnFinishTask(db, host, "create.end"))
|
||||
|
||||
return flow_utils.attach_debug_listeners(volume_flow)
|
||||
# Now load (but do not run) the flow using the provided initial data.
|
||||
return taskflow.engines.load(volume_flow, store=create_what)
|
||||
|
@ -1,55 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved.
|
||||
# Copyright (c) 2013 OpenStack Foundation
|
||||
# Copyright 2010 United States Government as represented by the
|
||||
# Administrator of the National Aeronautics and Space Administration.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from cinder.openstack.common import log as logging
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def attach_debug_listeners(flow):
|
||||
"""Sets up a nice set of debug listeners for the flow.
|
||||
|
||||
These listeners will log when tasks/flows are transitioning from state to
|
||||
state so that said states can be seen in the debug log output which is very
|
||||
useful for figuring out where problems are occurring.
|
||||
"""
|
||||
|
||||
def flow_log_change(state, details):
|
||||
# TODO(harlowja): the bug 1214083 is causing problems
|
||||
LOG.debug(_("%(flow)s has moved into state %(state)s from state"
|
||||
" %(old_state)s") % {'state': state,
|
||||
'old_state': details.get('old_state'),
|
||||
'flow': str(details['flow'])})
|
||||
|
||||
def task_log_change(state, details):
|
||||
# TODO(harlowja): the bug 1214083 is causing problems
|
||||
LOG.debug(_("%(flow)s has moved %(runner)s into state %(state)s with"
|
||||
" result: %(result)s") % {'state': state,
|
||||
'flow': str(details['flow']),
|
||||
'runner': str(details['runner']),
|
||||
'result': details.get('result')})
|
||||
|
||||
# Register * for all state changes (and not selective state changes to be
|
||||
# called upon) since all the changes is more useful.
|
||||
flow.notifier.register('*', flow_log_change)
|
||||
flow.task_notifier.register('*', task_log_change)
|
||||
return flow
|
@ -61,8 +61,6 @@ from cinder.volume import rpcapi as volume_rpcapi
|
||||
from cinder.volume import utils as volume_utils
|
||||
from cinder.volume import volume_types
|
||||
|
||||
from cinder.taskflow import states
|
||||
|
||||
from eventlet.greenpool import GreenPool
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
@ -289,23 +287,31 @@ class VolumeManager(manager.SchedulerDependentManager):
|
||||
def create_volume(self, context, volume_id, request_spec=None,
|
||||
filter_properties=None, allow_reschedule=True,
|
||||
snapshot_id=None, image_id=None, source_volid=None):
|
||||
"""Creates and exports the volume."""
|
||||
|
||||
flow = create_volume.get_manager_flow(
|
||||
"""Creates and exports the volume."""
|
||||
context_saved = context.deepcopy()
|
||||
context = context.elevated()
|
||||
if filter_properties is None:
|
||||
filter_properties = {}
|
||||
|
||||
try:
|
||||
flow_engine = create_volume.get_manager_flow(
|
||||
context,
|
||||
self.db,
|
||||
self.driver,
|
||||
self.scheduler_rpcapi,
|
||||
self.host,
|
||||
volume_id,
|
||||
request_spec=request_spec,
|
||||
filter_properties=filter_properties,
|
||||
allow_reschedule=allow_reschedule,
|
||||
snapshot_id=snapshot_id,
|
||||
image_id=image_id,
|
||||
source_volid=source_volid,
|
||||
reschedule_context=context.deepcopy())
|
||||
|
||||
assert flow, _('Manager volume flow not retrieved')
|
||||
allow_reschedule=allow_reschedule,
|
||||
reschedule_context=context_saved,
|
||||
request_spec=request_spec,
|
||||
filter_properties=filter_properties)
|
||||
except Exception:
|
||||
raise exception.CinderException(
|
||||
_("Failed to create manager volume flow"))
|
||||
|
||||
if snapshot_id is not None:
|
||||
# Make sure the snapshot is not deleted until we are done with it.
|
||||
@ -317,11 +323,11 @@ class VolumeManager(manager.SchedulerDependentManager):
|
||||
locked_action = None
|
||||
|
||||
def _run_flow():
|
||||
flow.run(context.elevated())
|
||||
if flow.state != states.SUCCESS:
|
||||
msg = _("Failed to successfully complete manager volume "
|
||||
"workflow")
|
||||
raise exception.CinderException(msg)
|
||||
# This code executes create volume flow. If something goes wrong,
|
||||
# flow reverts all job that was done and reraises an exception.
|
||||
# Otherwise, all data that was generated by flow becomes available
|
||||
# in flow engine's storage.
|
||||
flow_engine.run()
|
||||
|
||||
@utils.synchronized(locked_action, external=True)
|
||||
def _run_flow_locked():
|
||||
@ -332,8 +338,9 @@ class VolumeManager(manager.SchedulerDependentManager):
|
||||
else:
|
||||
_run_flow_locked()
|
||||
|
||||
self._reset_stats()
|
||||
return volume_id
|
||||
# Fetch created volume from storage
|
||||
volume_ref = flow_engine.storage.fetch('volume')
|
||||
return volume_ref['id']
|
||||
|
||||
@utils.require_driver_initialized
|
||||
@locked_volume_operation
|
||||
|
@ -18,6 +18,7 @@ python-keystoneclient>=0.4.1
|
||||
python-novaclient>=2.15.0
|
||||
python-swiftclient>=1.5
|
||||
Routes>=1.12.3
|
||||
taskflow>=0.1.1,<0.2
|
||||
rtslib-fb>=2.1.39
|
||||
six>=1.4.1
|
||||
SQLAlchemy>=0.7.8,<=0.7.99
|
||||
|
@ -1,7 +0,0 @@
|
||||
[DEFAULT]
|
||||
|
||||
# The list of primitives to copy from taskflow
|
||||
primitives=flow.linear_flow,task
|
||||
|
||||
# The base module to hold the copy of taskflow
|
||||
base=cinder
|
Loading…
Reference in New Issue
Block a user