Switch create volume commands to Taskflow 0.1.1

- Old TaskFlow code was removed from Cinder.

- TaskFlow 0.1.1 was added to Cinder requirements.

- Create volume flows for volume.api, volume.manager and
  scheduler.manager were updated to use taskFlow 0.1.1

Partially implements: blueprint create-volume-flow
Change-Id: Idbac8d001436f02978b366fbb3205ce84c847267
This commit is contained in:
anastasia-karpinska 2013-12-10 23:12:31 +02:00 committed by Anastasia Karpinska
parent 5cb91b70b9
commit ebd52f2e6d
18 changed files with 247 additions and 1849 deletions

View File

@ -34,7 +34,6 @@ from cinder.openstack.common.notifier import api as notifier
from cinder.volume.flows import create_volume
from cinder.volume import rpcapi as volume_rpcapi
from cinder.taskflow import states
scheduler_driver_opt = cfg.StrOpt('scheduler_driver',
default='cinder.scheduler.filter_scheduler.'
@ -76,17 +75,18 @@ class SchedulerManager(manager.Manager):
image_id=None, request_spec=None,
filter_properties=None):
flow = create_volume.get_scheduler_flow(db, self.driver,
request_spec,
filter_properties,
volume_id, snapshot_id,
image_id)
assert flow, _('Schedule volume flow not retrieved')
flow.run(context)
if flow.state != states.SUCCESS:
LOG.warn(_("Failed to successfully complete"
" schedule volume using flow: %s"), flow)
try:
flow_engine = create_volume.get_scheduler_flow(context,
db, self.driver,
request_spec,
filter_properties,
volume_id,
snapshot_id,
image_id)
except Exception:
raise exception.CinderException(
_("Failed to create scheduler manager volume flow"))
flow_engine.run()
def request_service_capabilities(self, context):
volume_rpcapi.VolumeAPI().publish_service_capabilities(context)

View File

@ -1 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4

View File

@ -1,276 +0,0 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import functools
import inspect
import types
# These arguments are ones that we will skip when parsing for requirements
# for a function to operate (when used as a task).
AUTO_ARGS = ('self', 'context', 'cls')
def is_decorated(functor):
if not isinstance(functor, (types.MethodType, types.FunctionType)):
return False
return getattr(extract(functor), '__task__', False)
def extract(functor):
# Extract the underlying functor if its a method since we can not set
# attributes on instance methods, this is supposedly fixed in python 3
# and later.
#
# TODO(harlowja): add link to this fix.
assert isinstance(functor, (types.MethodType, types.FunctionType))
if isinstance(functor, types.MethodType):
return functor.__func__
else:
return functor
def _mark_as_task(functor):
setattr(functor, '__task__', True)
def _get_wrapped(function):
"""Get the method at the bottom of a stack of decorators."""
if hasattr(function, '__wrapped__'):
return getattr(function, '__wrapped__')
if not hasattr(function, 'func_closure') or not function.func_closure:
return function
def _get_wrapped_function(function):
if not hasattr(function, 'func_closure') or not function.func_closure:
return None
for closure in function.func_closure:
func = closure.cell_contents
deeper_func = _get_wrapped_function(func)
if deeper_func:
return deeper_func
elif hasattr(closure.cell_contents, '__call__'):
return closure.cell_contents
return _get_wrapped_function(function)
def _take_arg(a):
if a in AUTO_ARGS:
return False
# In certain decorator cases it seems like we get the function to be
# decorated as an argument, we don't want to take that as a real argument.
if isinstance(a, collections.Callable):
return False
return True
def wraps(fn):
"""This will not be needed in python 3.2 or greater which already has this
built-in to its functools.wraps method.
"""
def wrapper(f):
f = functools.wraps(fn)(f)
f.__wrapped__ = getattr(fn, '__wrapped__', fn)
return f
return wrapper
def locked(f):
@wraps(f)
def wrapper(self, *args, **kwargs):
with self._lock:
return f(self, *args, **kwargs)
return wrapper
def task(*args, **kwargs):
"""Decorates a given function and ensures that all needed attributes of
that function are set so that the function can be used as a task.
"""
def decorator(f):
w_f = extract(f)
def noop(*args, **kwargs):
pass
# Mark as being a task
_mark_as_task(w_f)
# By default don't revert this.
w_f.revert = kwargs.pop('revert_with', noop)
# Associate a name of this task that is the module + function name.
w_f.name = "%s.%s" % (f.__module__, f.__name__)
# Sets the version of the task.
version = kwargs.pop('version', (1, 0))
f = _versionize(*version)(f)
# Attach any requirements this function needs for running.
requires_what = kwargs.pop('requires', [])
f = _requires(*requires_what, **kwargs)(f)
# Attach any optional requirements this function needs for running.
optional_what = kwargs.pop('optional', [])
f = _optional(*optional_what, **kwargs)(f)
# Attach any items this function provides as output
provides_what = kwargs.pop('provides', [])
f = _provides(*provides_what, **kwargs)(f)
@wraps(f)
def wrapper(*args, **kwargs):
return f(*args, **kwargs)
return wrapper
# This is needed to handle when the decorator has args or the decorator
# doesn't have args, python is rather weird here...
if kwargs or not args:
return decorator
else:
if isinstance(args[0], collections.Callable):
return decorator(args[0])
else:
return decorator
def _versionize(major, minor=None):
"""A decorator that marks the wrapped function with a major & minor version
number.
"""
if minor is None:
minor = 0
def decorator(f):
w_f = extract(f)
w_f.version = (major, minor)
@wraps(f)
def wrapper(*args, **kwargs):
return f(*args, **kwargs)
return wrapper
return decorator
def _optional(*args, **kwargs):
"""Attaches a set of items that the decorated function would like as input
to the functions underlying dictionary.
"""
def decorator(f):
w_f = extract(f)
if not hasattr(w_f, 'optional'):
w_f.optional = set()
w_f.optional.update([a for a in args if _take_arg(a)])
@wraps(f)
def wrapper(*args, **kwargs):
return f(*args, **kwargs)
return wrapper
# This is needed to handle when the decorator has args or the decorator
# doesn't have args, python is rather weird here...
if kwargs or not args:
return decorator
else:
if isinstance(args[0], collections.Callable):
return decorator(args[0])
else:
return decorator
def _requires(*args, **kwargs):
"""Attaches a set of items that the decorated function requires as input
to the functions underlying dictionary.
"""
def decorator(f):
w_f = extract(f)
if not hasattr(w_f, 'requires'):
w_f.requires = set()
if kwargs.pop('auto_extract', True):
inspect_what = _get_wrapped(f)
f_args = inspect.getargspec(inspect_what).args
w_f.requires.update([a for a in f_args if _take_arg(a)])
w_f.requires.update([a for a in args if _take_arg(a)])
@wraps(f)
def wrapper(*args, **kwargs):
return f(*args, **kwargs)
return wrapper
# This is needed to handle when the decorator has args or the decorator
# doesn't have args, python is rather weird here...
if kwargs or not args:
return decorator
else:
if isinstance(args[0], collections.Callable):
return decorator(args[0])
else:
return decorator
def _provides(*args, **kwargs):
"""Attaches a set of items that the decorated function provides as output
to the functions underlying dictionary.
"""
def decorator(f):
w_f = extract(f)
if not hasattr(f, 'provides'):
w_f.provides = set()
w_f.provides.update([a for a in args if _take_arg(a)])
@wraps(f)
def wrapper(*args, **kwargs):
return f(*args, **kwargs)
return wrapper
# This is needed to handle when the decorator has args or the decorator
# doesn't have args, python is rather weird here...
if kwargs or not args:
return decorator
else:
if isinstance(args[0], collections.Callable):
return decorator(args[0])
else:
return decorator

View File

@ -1,69 +0,0 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class TaskFlowException(Exception):
"""Base class for exceptions emitted from this library."""
pass
class Duplicate(TaskFlowException):
"""Raised when a duplicate entry is found."""
pass
class NotFound(TaskFlowException):
"""Raised when some entry in some object doesn't exist."""
pass
class AlreadyExists(TaskFlowException):
"""Raised when some entry in some object already exists."""
pass
class ClosedException(TaskFlowException):
"""Raised when an access on a closed object occurs."""
pass
class InvalidStateException(TaskFlowException):
"""Raised when a task/job/workflow is in an invalid state when an
operation is attempting to apply to said task/job/workflow.
"""
pass
class UnclaimableJobException(TaskFlowException):
"""Raised when a job can not be claimed."""
pass
class JobNotFound(TaskFlowException):
"""Raised when a job entry can not be found."""
pass
class MissingDependencies(InvalidStateException):
"""Raised when a task has dependencies that can not be satisfied."""
message = ("%(task)s requires %(requirements)s but no other task produces"
" said requirements")
def __init__(self, task, requirements):
message = self.message % {'task': task, 'requirements': requirements}
super(MissingDependencies, self).__init__(message)

View File

@ -1 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4

View File

@ -1,215 +0,0 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import threading
import uuid as uuidlib
import six
from cinder.taskflow import decorators
from cinder.taskflow import exceptions as exc
from cinder.taskflow import states
from cinder.taskflow import utils
@six.add_metaclass(abc.ABCMeta)
class Flow(object):
"""The base abstract class of all flow implementations.
It provides a set of parents to flows that have a concept of parent flows
as well as a state and state utility functions to the deriving classes. It
also provides a name and an identifier (uuid or other) to the flow so that
it can be uniquely identifed among many flows.
Flows are expected to provide (if desired) the following methods:
- add
- add_many
- interrupt
- reset
- rollback
- run
- soft_reset
"""
# Common states that certain actions can be performed in. If the flow
# is not in these sets of states then it is likely that the flow operation
# can not succeed.
RESETTABLE_STATES = set([
states.INTERRUPTED,
states.SUCCESS,
states.PENDING,
states.FAILURE,
])
SOFT_RESETTABLE_STATES = set([
states.INTERRUPTED,
])
UNINTERRUPTIBLE_STATES = set([
states.FAILURE,
states.SUCCESS,
states.PENDING,
])
RUNNABLE_STATES = set([
states.PENDING,
])
def __init__(self, name, parents=None, uuid=None):
self._name = str(name)
# The state of this flow.
self._state = states.PENDING
# If this flow has a parent flow/s which need to be reverted if
# this flow fails then please include them here to allow this child
# to call the parents...
if parents:
self.parents = tuple(parents)
else:
self.parents = ()
# Any objects that want to listen when a wf/task starts/stops/completes
# or errors should be registered here. This can be used to monitor
# progress and record tasks finishing (so that it becomes possible to
# store the result of a task in some persistent or semi-persistent
# storage backend).
self.notifier = utils.TransitionNotifier()
self.task_notifier = utils.TransitionNotifier()
# Ensure that modifications and/or multiple runs aren't happening
# at the same time in the same flow at the same time.
self._lock = threading.RLock()
# Assign this flow a unique identifer.
if uuid:
self._id = str(uuid)
else:
self._id = str(uuidlib.uuid4())
@property
def name(self):
"""A non-unique name for this flow (human readable)"""
return self._name
@property
def uuid(self):
"""Uniquely identifies this flow"""
return "f-%s" % (self._id)
@property
def state(self):
"""Provides a read-only view of the flow state."""
return self._state
def _change_state(self, context, new_state):
was_changed = False
old_state = self.state
with self._lock:
if self.state != new_state:
old_state = self.state
self._state = new_state
was_changed = True
if was_changed:
# Don't notify while holding the lock.
self.notifier.notify(self.state, details={
'context': context,
'flow': self,
'old_state': old_state,
})
def __str__(self):
lines = ["Flow: %s" % (self.name)]
lines.append("%s" % (self.uuid))
lines.append("%s" % (len(self.parents)))
lines.append("%s" % (self.state))
return "; ".join(lines)
@abc.abstractmethod
def add(self, task):
"""Adds a given task to this flow.
Returns the uuid that is associated with the task for later operations
before and after it is ran.
"""
raise NotImplementedError()
@decorators.locked
def add_many(self, tasks):
"""Adds many tasks to this flow.
Returns a list of uuids (one for each task added).
"""
uuids = []
for t in tasks:
uuids.append(self.add(t))
return uuids
def interrupt(self):
"""Attempts to interrupt the current flow and any tasks that are
currently not running in the flow.
Returns how many tasks were interrupted (if any).
"""
if self.state in self.UNINTERRUPTIBLE_STATES:
raise exc.InvalidStateException(("Can not interrupt when"
" in state %s") % (self.state))
# Note(harlowja): Do *not* acquire the lock here so that the flow may
# be interrupted while running. This does mean the the above check may
# not be valid but we can worry about that if it becomes an issue.
old_state = self.state
if old_state != states.INTERRUPTED:
self._state = states.INTERRUPTED
self.notifier.notify(self.state, details={
'context': None,
'flow': self,
'old_state': old_state,
})
return 0
@decorators.locked
def reset(self):
"""Fully resets the internal state of this flow, allowing for the flow
to be ran again.
Note: Listeners are also reset.
"""
if self.state not in self.RESETTABLE_STATES:
raise exc.InvalidStateException(("Can not reset when"
" in state %s") % (self.state))
self.notifier.reset()
self.task_notifier.reset()
self._change_state(None, states.PENDING)
@decorators.locked
def soft_reset(self):
"""Partially resets the internal state of this flow, allowing for the
flow to be ran again from an interrupted state only.
"""
if self.state not in self.SOFT_RESETTABLE_STATES:
raise exc.InvalidStateException(("Can not soft reset when"
" in state %s") % (self.state))
self._change_state(None, states.PENDING)
@decorators.locked
def run(self, context, *args, **kwargs):
"""Executes the workflow."""
if self.state not in self.RUNNABLE_STATES:
raise exc.InvalidStateException("Unable to run flow when "
"in state %s" % (self.state))
@decorators.locked
def rollback(self, context, cause):
"""Performs rollback of this workflow and any attached parent workflows
if present.
"""
pass

View File

@ -1,271 +0,0 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import logging
from cinder.openstack.common import excutils
from cinder.taskflow import decorators
from cinder.taskflow import exceptions as exc
from cinder.taskflow import states
from cinder.taskflow import utils
from cinder.taskflow.patterns import base
LOG = logging.getLogger(__name__)
class Flow(base.Flow):
""""A linear chain of tasks that can be applied in order as one unit and
rolled back as one unit using the reverse order that the tasks have
been applied in.
Note(harlowja): Each task in the chain must have requirements
which are satisfied by the previous task/s in the chain.
"""
def __init__(self, name, parents=None, uuid=None):
super(Flow, self).__init__(name, parents, uuid)
# The tasks which have been applied will be collected here so that they
# can be reverted in the correct order on failure.
self._accumulator = utils.RollbackAccumulator()
# Tasks results are stored here. Lookup is by the uuid that was
# returned from the add function.
self.results = {}
# The previously left off iterator that can be used to resume from
# the last task (if interrupted and soft-reset).
self._leftoff_at = None
# All runners to run are collected here.
self._runners = []
self._connected = False
# The resumption strategy to use.
self.resumer = None
@decorators.locked
def add(self, task):
"""Adds a given task to this flow."""
assert isinstance(task, collections.Callable)
r = utils.Runner(task)
r.runs_before = list(reversed(self._runners))
self._runners.append(r)
self._reset_internals()
return r.uuid
def _reset_internals(self):
self._connected = False
self._leftoff_at = None
def _associate_providers(self, runner):
# Ensure that some previous task provides this input.
who_provides = {}
task_requires = runner.requires
for r in task_requires:
provider = None
for before_me in runner.runs_before:
if r in before_me.provides:
provider = before_me
break
if provider:
who_provides[r] = provider
# Ensure that the last task provides all the needed input for this
# task to run correctly.
missing_requires = task_requires - set(who_provides.keys())
if missing_requires:
raise exc.MissingDependencies(runner, sorted(missing_requires))
runner.providers.update(who_provides)
def __str__(self):
lines = ["LinearFlow: %s" % (self.name)]
lines.append("%s" % (self.uuid))
lines.append("%s" % (len(self._runners)))
lines.append("%s" % (len(self.parents)))
lines.append("%s" % (self.state))
return "; ".join(lines)
@decorators.locked
def remove(self, uuid):
index_removed = -1
for (i, r) in enumerate(self._runners):
if r.uuid == uuid:
index_removed = i
break
if index_removed == -1:
raise ValueError("No runner found with uuid %s" % (uuid))
else:
removed = self._runners.pop(index_removed)
self._reset_internals()
# Go and remove it from any runner after the removed runner since
# those runners may have had an attachment to it.
for r in self._runners[index_removed:]:
try:
r.runs_before.remove(removed)
except (IndexError, ValueError):
pass
def __len__(self):
return len(self._runners)
def _connect(self):
if self._connected:
return self._runners
for r in self._runners:
r.providers = {}
for r in reversed(self._runners):
self._associate_providers(r)
self._connected = True
return self._runners
def _ordering(self):
return iter(self._connect())
@decorators.locked
def run(self, context, *args, **kwargs):
super(Flow, self).run(context, *args, **kwargs)
def resume_it():
if self._leftoff_at is not None:
return ([], self._leftoff_at)
if self.resumer:
(finished, leftover) = self.resumer.resume(self,
self._ordering())
else:
finished = []
leftover = self._ordering()
return (finished, leftover)
self._change_state(context, states.STARTED)
try:
those_finished, leftover = resume_it()
except Exception:
with excutils.save_and_reraise_exception():
self._change_state(context, states.FAILURE)
def run_it(runner, failed=False, result=None, simulate_run=False):
try:
# Add the task to be rolled back *immediately* so that even if
# the task fails while producing results it will be given a
# chance to rollback.
rb = utils.RollbackTask(context, runner.task, result=None)
self._accumulator.add(rb)
self.task_notifier.notify(states.STARTED, details={
'context': context,
'flow': self,
'runner': runner,
})
if not simulate_run:
result = runner(context, *args, **kwargs)
else:
if failed:
# TODO(harlowja): make this configurable??
# If we previously failed, we want to fail again at
# the same place.
if not result:
# If no exception or exception message was provided
# or captured from the previous run then we need to
# form one for this task.
result = "%s failed running." % (runner.task)
if isinstance(result, basestring):
result = exc.InvalidStateException(result)
if not isinstance(result, Exception):
LOG.warn("Can not raise a non-exception"
" object: %s", result)
result = exc.InvalidStateException()
raise result
# Adjust the task result in the accumulator before
# notifying others that the task has finished to
# avoid the case where a listener might throw an
# exception.
rb.result = result
runner.result = result
self.results[runner.uuid] = result
self.task_notifier.notify(states.SUCCESS, details={
'context': context,
'flow': self,
'runner': runner,
})
except Exception as e:
runner.result = e
cause = utils.FlowFailure(runner, self, e)
with excutils.save_and_reraise_exception():
# Notify any listeners that the task has errored.
self.task_notifier.notify(states.FAILURE, details={
'context': context,
'flow': self,
'runner': runner,
})
self.rollback(context, cause)
if len(those_finished):
self._change_state(context, states.RESUMING)
for (r, details) in those_finished:
# Fake running the task so that we trigger the same
# notifications and state changes (and rollback that
# would have happened in a normal flow).
failed = states.FAILURE in details.get('states', [])
result = details.get('result')
run_it(r, failed=failed, result=result, simulate_run=True)
self._leftoff_at = leftover
self._change_state(context, states.RUNNING)
if self.state == states.INTERRUPTED:
return
was_interrupted = False
for r in leftover:
r.reset()
run_it(r)
if self.state == states.INTERRUPTED:
was_interrupted = True
break
if not was_interrupted:
# Only gets here if everything went successfully.
self._change_state(context, states.SUCCESS)
self._leftoff_at = None
@decorators.locked
def reset(self):
super(Flow, self).reset()
self.results = {}
self.resumer = None
self._accumulator.reset()
self._reset_internals()
@decorators.locked
def rollback(self, context, cause):
# Performs basic task by task rollback by going through the reverse
# order that tasks have finished and asking said task to undo whatever
# it has done. If this flow has any parent flows then they will
# also be called to rollback any tasks said parents contain.
#
# Note(harlowja): if a flow can more simply revert a whole set of
# tasks via a simpler command then it can override this method to
# accomplish that.
#
# For example, if each task was creating a file in a directory, then
# it's easier to just remove the directory than to ask each task to
# delete its file individually.
self._change_state(context, states.REVERTING)
try:
self._accumulator.rollback(cause)
finally:
self._change_state(context, states.FAILURE)
# Rollback any parents flows if they exist...
for p in self.parents:
p.rollback(context, cause)

View File

@ -1,40 +0,0 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# Job states.
CLAIMED = 'CLAIMED'
FAILURE = 'FAILURE'
PENDING = 'PENDING'
RUNNING = 'RUNNING'
SUCCESS = 'SUCCESS'
UNCLAIMED = 'UNCLAIMED'
# Flow states.
FAILURE = FAILURE
INTERRUPTED = 'INTERRUPTED'
PENDING = 'PENDING'
RESUMING = 'RESUMING'
REVERTING = 'REVERTING'
RUNNING = RUNNING
STARTED = 'STARTED'
SUCCESS = SUCCESS
# Task states.
FAILURE = FAILURE
STARTED = STARTED
SUCCESS = SUCCESS

View File

@ -1,67 +0,0 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import six
from cinder.taskflow import utils
@six.add_metaclass(abc.ABCMeta)
class Task(object):
"""An abstraction that defines a potential piece of work that can be
applied and can be reverted to undo the work as a single unit.
"""
def __init__(self, name):
self.name = name
# An *immutable* input 'resource' name set this task depends
# on existing before this task can be applied.
self.requires = set()
# An *immutable* input 'resource' name set this task would like to
# depends on existing before this task can be applied (but does not
# strongly depend on existing).
self.optional = set()
# An *immutable* output 'resource' name set this task
# produces that other tasks may depend on this task providing.
self.provides = set()
# This identifies the version of the task to be ran which
# can be useful in resuming older versions of tasks. Standard
# major, minor version semantics apply.
self.version = (1, 0)
def __str__(self):
return "%s==%s" % (self.name, utils.join(self.version, with_what="."))
@abc.abstractmethod
def __call__(self, context, *args, **kwargs):
"""Activate a given task which will perform some operation and return.
This method can be used to apply some given context and given set
of args and kwargs to accomplish some goal. Note that the result
that is returned needs to be serializable so that it can be passed
back into this task if reverting is triggered.
"""
raise NotImplementedError()
def revert(self, context, result, cause):
"""Revert this task using the given context, result that the apply
provided as well as any information which may have caused
said reversion.
"""
pass

View File

@ -1,464 +0,0 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import contextlib
import copy
import logging
import re
import sys
import threading
import time
import types
import uuid as uuidlib
from cinder.taskflow import decorators
LOG = logging.getLogger(__name__)
def get_attr(task, field, default=None):
if decorators.is_decorated(task):
# If its a decorated functor then the attributes will be either
# in the underlying function of the instancemethod or the function
# itself.
task = decorators.extract(task)
return getattr(task, field, default)
def join(itr, with_what=","):
pieces = [str(i) for i in itr]
return with_what.join(pieces)
def get_many_attr(obj, *attrs):
many = []
for a in attrs:
many.append(get_attr(obj, a, None))
return many
def get_task_version(task):
"""Gets a tasks *string* version, whether it is a task object/function."""
task_version = get_attr(task, 'version')
if isinstance(task_version, (list, tuple)):
task_version = join(task_version, with_what=".")
if task_version is not None and not isinstance(task_version, basestring):
task_version = str(task_version)
return task_version
def get_task_name(task):
"""Gets a tasks *string* name, whether it is a task object/function."""
task_name = ""
if isinstance(task, (types.MethodType, types.FunctionType)):
# If its a function look for the attributes that should have been
# set using the task() decorator provided in the decorators file. If
# those have not been set, then we should at least have enough basic
# information (not a version) to form a useful task name.
task_name = get_attr(task, 'name')
if not task_name:
name_pieces = [a for a in get_many_attr(task,
'__module__',
'__name__')
if a is not None]
task_name = join(name_pieces, ".")
else:
task_name = str(task)
return task_name
def is_version_compatible(version_1, version_2):
"""Checks for major version compatibility of two *string" versions."""
if version_1 == version_2:
# Equivalent exactly, so skip the rest.
return True
def _convert_to_pieces(version):
try:
pieces = []
for p in version.split("."):
p = p.strip()
if not len(p):
pieces.append(0)
continue
# Clean off things like 1alpha, or 2b and just select the
# digit that starts that entry instead.
p_match = re.match(r"(\d+)([A-Za-z]*)(.*)", p)
if p_match:
p = p_match.group(1)
pieces.append(int(p))
except (AttributeError, TypeError, ValueError):
pieces = []
return pieces
version_1_pieces = _convert_to_pieces(version_1)
version_2_pieces = _convert_to_pieces(version_2)
if len(version_1_pieces) == 0 or len(version_2_pieces) == 0:
return False
# Ensure major version compatibility to start.
major1 = version_1_pieces[0]
major2 = version_2_pieces[0]
if major1 != major2:
return False
return True
def await(check_functor, timeout=None):
if timeout is not None:
end_time = time.time() + max(0, timeout)
else:
end_time = None
# Use the same/similar scheme that the python condition class uses.
delay = 0.0005
while not check_functor():
time.sleep(delay)
if end_time is not None:
remaining = end_time - time.time()
if remaining <= 0:
return False
delay = min(delay * 2, remaining, 0.05)
else:
delay = min(delay * 2, 0.05)
return True
class LastFedIter(object):
"""An iterator which yields back the first item and then yields back
results from the provided iterator.
"""
def __init__(self, first, rest_itr):
self.first = first
self.rest_itr = rest_itr
def __iter__(self):
yield self.first
for i in self.rest_itr:
yield i
class FlowFailure(object):
"""When a task failure occurs the following object will be given to revert
and can be used to interrogate what caused the failure.
"""
def __init__(self, runner, flow, exception):
self.runner = runner
self.flow = flow
self.exc = exception
self.exc_info = sys.exc_info()
class RollbackTask(object):
"""A helper task that on being called will call the underlying callable
tasks revert method (if said method exists).
"""
def __init__(self, context, task, result):
self.task = task
self.result = result
self.context = context
def __str__(self):
return str(self.task)
def __call__(self, cause):
if ((hasattr(self.task, "revert") and
isinstance(self.task.revert, collections.Callable))):
self.task.revert(self.context, self.result, cause)
class Runner(object):
"""A helper class that wraps a task and can find the needed inputs for
the task to run, as well as providing a uuid and other useful functionality
for users of the task.
TODO(harlowja): replace with the task details object or a subclass of
that???
"""
def __init__(self, task, uuid=None):
assert isinstance(task, collections.Callable)
self.task = task
self.providers = {}
self.runs_before = []
self.result = None
if not uuid:
self._id = str(uuidlib.uuid4())
else:
self._id = str(uuid)
@property
def uuid(self):
return "r-%s" % (self._id)
@property
def requires(self):
return set(get_attr(self.task, 'requires', []))
@property
def provides(self):
return set(get_attr(self.task, 'provides', []))
@property
def optional(self):
return set(get_attr(self.task, 'optional', []))
@property
def version(self):
return get_task_version(self.task)
@property
def name(self):
return get_task_name(self.task)
def reset(self):
self.result = None
def __str__(self):
lines = ["Runner: %s" % (self.name)]
lines.append("%s" % (self.uuid))
lines.append("%s" % (self.version))
return "; ".join(lines)
def __call__(self, *args, **kwargs):
# Find all of our inputs first.
kwargs = dict(kwargs)
for (k, who_made) in self.providers.iteritems():
if who_made.result and k in who_made.result:
kwargs[k] = who_made.result[k]
else:
kwargs[k] = None
optional_keys = self.optional
optional_missing_keys = optional_keys - set(kwargs.keys())
if optional_missing_keys:
for k in optional_missing_keys:
for r in self.runs_before:
r_provides = r.provides
if k in r_provides and r.result and k in r.result:
kwargs[k] = r.result[k]
break
# And now finally run.
self.result = self.task(*args, **kwargs)
return self.result
class TransitionNotifier(object):
"""A utility helper class that can be used to subscribe to
notifications of events occurring as well as allow a entity to post said
notifications to subscribers.
"""
RESERVED_KEYS = ('details',)
ANY = '*'
def __init__(self):
self._listeners = collections.defaultdict(list)
def reset(self):
self._listeners = collections.defaultdict(list)
def notify(self, state, details):
listeners = list(self._listeners.get(self.ANY, []))
for i in self._listeners[state]:
if i not in listeners:
listeners.append(i)
if not listeners:
return
for (callback, args, kwargs) in listeners:
if args is None:
args = []
if kwargs is None:
kwargs = {}
kwargs['details'] = details
try:
callback(state, *args, **kwargs)
except Exception:
LOG.exception(("Failure calling callback %s to notify about"
" state transition %s"), callback, state)
def register(self, state, callback, args=None, kwargs=None):
assert isinstance(callback, collections.Callable)
for i, (cb, args, kwargs) in enumerate(self._listeners.get(state, [])):
if cb is callback:
raise ValueError("Callback %s already registered" % (callback))
if kwargs:
for k in self.RESERVED_KEYS:
if k in kwargs:
raise KeyError(("Reserved key '%s' not allowed in "
"kwargs") % k)
kwargs = copy.copy(kwargs)
if args:
args = copy.copy(args)
self._listeners[state].append((callback, args, kwargs))
def deregister(self, state, callback):
if state not in self._listeners:
return
for i, (cb, args, kwargs) in enumerate(self._listeners[state]):
if cb is callback:
self._listeners[state].pop(i)
break
class RollbackAccumulator(object):
"""A utility class that can help in organizing 'undo' like code
so that said code be rolled back on failure (automatically or manually)
by activating rollback callables that were inserted during said codes
progression.
"""
def __init__(self):
self._rollbacks = []
def add(self, *callables):
self._rollbacks.extend(callables)
def reset(self):
self._rollbacks = []
def __len__(self):
return len(self._rollbacks)
def __iter__(self):
# Rollbacks happen in the reverse order that they were added.
return reversed(self._rollbacks)
def __enter__(self):
return self
def rollback(self, cause):
LOG.warn("Activating %s rollbacks due to %s.", len(self), cause)
for (i, f) in enumerate(self):
LOG.debug("Calling rollback %s: %s", i + 1, f)
try:
f(cause)
except Exception:
LOG.exception(("Failed rolling back %s: %s due "
"to inner exception."), i + 1, f)
def __exit__(self, type, value, tb):
if any((value, type, tb)):
self.rollback(value)
class ReaderWriterLock(object):
"""A simple reader-writer lock.
Several readers can hold the lock simultaneously, and only one writer.
Write locks have priority over reads to prevent write starvation.
Public domain @ http://majid.info/blog/a-reader-writer-lock-for-python/
"""
def __init__(self):
self.rwlock = 0
self.writers_waiting = 0
self.monitor = threading.Lock()
self.readers_ok = threading.Condition(self.monitor)
self.writers_ok = threading.Condition(self.monitor)
@contextlib.contextmanager
def acquire(self, read=True):
"""Acquire a read or write lock in a context manager."""
try:
if read:
self.acquire_read()
else:
self.acquire_write()
yield self
finally:
self.release()
def acquire_read(self):
"""Acquire a read lock.
Several threads can hold this typeof lock.
It is exclusive with write locks.
"""
self.monitor.acquire()
while self.rwlock < 0 or self.writers_waiting:
self.readers_ok.wait()
self.rwlock += 1
self.monitor.release()
def acquire_write(self):
"""Acquire a write lock.
Only one thread can hold this lock, and only when no read locks
are also held.
"""
self.monitor.acquire()
while self.rwlock != 0:
self.writers_waiting += 1
self.writers_ok.wait()
self.writers_waiting -= 1
self.rwlock = -1
self.monitor.release()
def release(self):
"""Release a lock, whether read or write."""
self.monitor.acquire()
if self.rwlock < 0:
self.rwlock = 0
else:
self.rwlock -= 1
wake_writers = self.writers_waiting and self.rwlock == 0
wake_readers = self.writers_waiting == 0
self.monitor.release()
if wake_writers:
self.writers_ok.acquire()
self.writers_ok.notify()
self.writers_ok.release()
elif wake_readers:
self.readers_ok.acquire()
self.readers_ok.notifyAll()
self.readers_ok.release()
class LazyPluggable(object):
"""A pluggable backend loaded lazily based on some value."""
def __init__(self, pivot, **backends):
self.__backends = backends
self.__pivot = pivot
self.__backend = None
def __get_backend(self):
if not self.__backend:
backend_name = 'sqlalchemy'
backend = self.__backends[backend_name]
if isinstance(backend, tuple):
name = backend[0]
fromlist = backend[1]
else:
name = backend
fromlist = backend
self.__backend = __import__(name, None, None, fromlist)
return self.__backend
def __getattr__(self, key):
backend = self.__get_backend()
return getattr(backend, key)

View File

@ -30,6 +30,7 @@ import tempfile
import mox
from oslo.config import cfg
from taskflow.engines.action_engine import engine
from cinder.backup import driver as backup_driver
from cinder.brick.iscsi import iscsi
@ -46,7 +47,6 @@ from cinder.openstack.common.notifier import test_notifier
from cinder.openstack.common import rpc
import cinder.policy
from cinder import quota
from cinder.taskflow.patterns import linear_flow
from cinder import test
from cinder.tests.brick.fake_lvm import FakeBrickLVM
from cinder.tests import conf_fixture
@ -59,7 +59,6 @@ import cinder.volume
from cinder.volume import configuration as conf
from cinder.volume import driver
from cinder.volume.drivers import lvm
from cinder.volume.flows import create_volume
from cinder.volume import rpcapi as volume_rpcapi
from cinder.volume import utils as volutils
@ -465,7 +464,7 @@ class VolumeTestCase(BaseVolumeTestCase):
self.stubs.Set(self.volume.driver, 'create_volume_from_snapshot',
lambda *args, **kwargs: None)
orig_flow = linear_flow.Flow.run
orig_flow = engine.ActionEngine.run
def mock_flow_run(*args, **kwargs):
# ensure the lock has been taken
@ -492,7 +491,7 @@ class VolumeTestCase(BaseVolumeTestCase):
admin_ctxt = context.get_admin_context()
# mock the flow runner so we can do some checks
self.stubs.Set(linear_flow.Flow, 'run', mock_flow_run)
self.stubs.Set(engine.ActionEngine, 'run', mock_flow_run)
# locked
self.volume.create_volume(self.context, volume_id=dst_vol_id,
@ -528,7 +527,7 @@ class VolumeTestCase(BaseVolumeTestCase):
# mock the synchroniser so we can record events
self.stubs.Set(utils, 'synchronized', self._mock_synchronized)
orig_flow = linear_flow.Flow.run
orig_flow = engine.ActionEngine.run
def mock_flow_run(*args, **kwargs):
# ensure the lock has been taken
@ -551,7 +550,7 @@ class VolumeTestCase(BaseVolumeTestCase):
admin_ctxt = context.get_admin_context()
# mock the flow runner so we can do some checks
self.stubs.Set(linear_flow.Flow, 'run', mock_flow_run)
self.stubs.Set(engine.ActionEngine, 'run', mock_flow_run)
# locked
self.volume.create_volume(self.context, volume_id=dst_vol_id,
@ -1816,13 +1815,6 @@ class VolumeTestCase(BaseVolumeTestCase):
def fake_create_volume(*args, **kwargs):
raise exception.CinderException('fake exception')
def fake_reschedule_or_error(self, context, *args, **kwargs):
self.assertFalse(context.is_admin)
self.assertNotIn('admin', context.roles)
#compare context passed in with the context we saved
self.assertDictMatch(self.saved_ctxt.__dict__,
context.__dict__)
#create context for testing
ctxt = self.context.deepcopy()
if 'admin' in ctxt.roles:
@ -1831,8 +1823,6 @@ class VolumeTestCase(BaseVolumeTestCase):
#create one copy of context for future comparison
self.saved_ctxt = ctxt.deepcopy()
self.stubs.Set(create_volume.OnFailureRescheduleTask, '_reschedule',
fake_reschedule_or_error)
self.stubs.Set(self.volume.driver, 'create_volume', fake_create_volume)
volume_src = tests_utils.create_volume(self.context,

View File

@ -37,15 +37,12 @@ from cinder.openstack.common import timeutils
import cinder.policy
from cinder import quota
from cinder.scheduler import rpcapi as scheduler_rpcapi
from cinder import units
from cinder import utils
from cinder.volume.flows import create_volume
from cinder.volume import rpcapi as volume_rpcapi
from cinder.volume import utils as volume_utils
from cinder.volume import volume_types
from cinder.taskflow import states
volume_host_opt = cfg.BoolOpt('snapshot_same_host',
default=True,
@ -147,42 +144,34 @@ class API(base.Base):
return False
create_what = {
'size': size,
'context': context,
'raw_size': size,
'name': name,
'description': description,
'snapshot': snapshot,
'image_id': image_id,
'volume_type': volume_type,
'raw_volume_type': volume_type,
'metadata': metadata,
'availability_zone': availability_zone,
'raw_availability_zone': availability_zone,
'source_volume': source_volume,
'scheduler_hints': scheduler_hints,
'key_manager': self.key_manager,
'backup_source_volume': backup_source_volume,
}
(flow, uuid) = create_volume.get_api_flow(self.scheduler_rpcapi,
self.volume_rpcapi,
self.db,
self.image_service,
check_volume_az_zone,
create_what)
assert flow, _('Create volume flow not retrieved')
flow.run(context)
if flow.state != states.SUCCESS:
raise exception.CinderException(_("Failed to successfully complete"
" create volume workflow"))
# Extract the volume information from the task uuid that was specified
# to produce said information.
volume = None
try:
volume = flow.results[uuid]['volume']
except KeyError:
pass
flow_engine = create_volume.get_api_flow(self.scheduler_rpcapi,
self.volume_rpcapi,
self.db,
self.image_service,
check_volume_az_zone,
create_what)
except Exception:
raise exception.CinderException(
_("Failed to create api volume flow"))
# Raise an error, nobody provided it??
assert volume, _('Expected volume result not found')
flow_engine.run()
volume = flow_engine.storage.fetch('volume')
return volume
@wrap_check_policy

View File

@ -15,7 +15,7 @@
# under the License.
# For more information please visit: https://wiki.openstack.org/wiki/TaskFlow
from cinder.taskflow import task
from taskflow import task
def _make_task_name(cls, addons=None):
@ -34,28 +34,7 @@ class CinderTask(task.Task):
implement the given task as the task name.
"""
def __init__(self, addons=None):
def __init__(self, addons=None, **kwargs):
super(CinderTask, self).__init__(_make_task_name(self.__class__,
addons))
class InjectTask(CinderTask):
"""This injects a dict into the flow.
This injection is done so that the keys (and values) provided can be
dependended on by tasks further down the line. Since taskflow is dependency
based this can be considered the bootstrapping task that provides an
initial set of values for other tasks to get started with. If this did not
exist then tasks would fail locating there dependent tasks and the values
said dependent tasks produce.
Reversion strategy: N/A
"""
def __init__(self, inject_what, addons=None):
super(InjectTask, self).__init__(addons=addons)
self.provides.update(inject_what.keys())
self._inject = inject_what
def __call__(self, context):
return dict(self._inject)
addons),
**kwargs)

View File

@ -23,6 +23,10 @@
import traceback
from oslo.config import cfg
import taskflow.engines
from taskflow.patterns import linear_flow
from taskflow import task
from taskflow.utils import misc
from cinder import exception
from cinder.image import glance
@ -34,13 +38,9 @@ from cinder.openstack.common import strutils
from cinder.openstack.common import timeutils
from cinder import policy
from cinder import quota
from cinder.taskflow import decorators
from cinder.taskflow.patterns import linear_flow
from cinder.taskflow import task
from cinder import units
from cinder import utils
from cinder.volume.flows import base
from cinder.volume.flows import utils as flow_utils
from cinder.volume import utils as volume_utils
from cinder.volume import volume_types
@ -86,16 +86,6 @@ def _make_pretty_name(method):
return ".".join(meth_pieces)
def _find_result_spec(flow):
"""Find the last task that produced a valid volume_spec and returns it."""
for there_result in flow.results.values():
if not there_result or not 'volume_spec' in there_result:
continue
if there_result['volume_spec']:
return there_result['volume_spec']
return None
def _restore_source_status(context, db, volume_spec):
# NOTE(harlowja): Only if the type of the volume that was being created is
# the source volume type should we try to reset the source volume status
@ -171,25 +161,16 @@ class ExtractVolumeRequestTask(base.CinderTask):
Reversion strategy: N/A
"""
def __init__(self, image_service, az_check_functor=None):
super(ExtractVolumeRequestTask, self).__init__(addons=[ACTION])
# This task will produce the following outputs (said outputs can be
# saved to durable storage in the future so that the flow can be
# reconstructed elsewhere and continued).
self.provides.update(['availability_zone', 'size', 'snapshot_id',
'source_volid', 'volume_type', 'volume_type_id',
'encryption_key_id'])
# This task requires the following inputs to operate (provided
# automatically to __call__(). This is done so that the flow can
# be reconstructed elsewhere and continue running (in the future).
#
# It also is used to be able to link tasks that produce items to tasks
# that consume items (thus allowing the linking of the flow to be
# mostly automatic).
self.requires.update(['availability_zone', 'image_id', 'metadata',
'size', 'snapshot', 'source_volume',
'volume_type', 'key_manager',
'backup_source_volume'])
# This task will produce the following outputs (said outputs can be
# saved to durable storage in the future so that the flow can be
# reconstructed elsewhere and continued).
default_provides = set(['availability_zone', 'size', 'snapshot_id',
'source_volid', 'volume_type', 'volume_type_id',
'encryption_key_id'])
def __init__(self, image_service, az_check_functor=None, **kwargs):
super(ExtractVolumeRequestTask, self).__init__(addons=[ACTION],
**kwargs)
self.image_service = image_service
self.az_check_functor = az_check_functor
if not self.az_check_functor:
@ -451,9 +432,9 @@ class ExtractVolumeRequestTask(base.CinderTask):
return volume_type_id
def __call__(self, context, size, snapshot, image_id, source_volume,
availability_zone, volume_type, metadata,
key_manager, backup_source_volume):
def execute(self, context, size, snapshot, image_id, source_volume,
availability_zone, volume_type, metadata,
key_manager, backup_source_volume):
utils.check_exclusive_options(snapshot=snapshot,
imageRef=image_id,
@ -519,16 +500,18 @@ class EntryCreateTask(base.CinderTask):
Reversion strategy: remove the volume_id created from the database.
"""
def __init__(self, db):
super(EntryCreateTask, self).__init__(addons=[ACTION])
self.db = db
self.requires.update(['availability_zone', 'description', 'metadata',
'name', 'reservations', 'size', 'snapshot_id',
'source_volid', 'volume_type_id',
'encryption_key_id'])
self.provides.update(['volume_properties', 'volume_id'])
default_provides = set(['volume_properties', 'volume_id', 'volume'])
def __call__(self, context, **kwargs):
def __init__(self, db):
requires = ['availability_zone', 'description', 'metadata',
'name', 'reservations', 'size', 'snapshot_id',
'source_volid', 'volume_type_id', 'encryption_key_id']
super(EntryCreateTask, self).__init__(addons=[ACTION],
requires=requires)
self.db = db
self.provides.update()
def execute(self, context, **kwargs):
"""Creates a database entry for the given inputs and returns details.
Accesses the database and creates a new entry for the to be created
@ -569,9 +552,9 @@ class EntryCreateTask(base.CinderTask):
'volume': volume,
}
def revert(self, context, result, cause):
def revert(self, context, result, **kwargs):
# We never produced a result and therefore can't destroy anything.
if not result:
if isinstance(result, misc.Failure):
return
if context.quota_committed:
# Committed quota doesn't rollback as the volume has already been
@ -603,12 +586,12 @@ class QuotaReserveTask(base.CinderTask):
an automated or manual process.
"""
default_provides = set(['reservations'])
def __init__(self):
super(QuotaReserveTask, self).__init__(addons=[ACTION])
self.requires.update(['size', 'volume_type_id'])
self.provides.update(['reservations'])
def __call__(self, context, size, volume_type_id):
def execute(self, context, size, volume_type_id):
try:
reserve_opts = {'volumes': 1, 'gigabytes': size}
QUOTAS.add_volume_type_opts(context, reserve_opts, volume_type_id)
@ -648,15 +631,14 @@ class QuotaReserveTask(base.CinderTask):
"already consumed)")
LOG.warn(msg % {'s_pid': context.project_id,
'd_consumed': _consumed('volumes')})
allowed = quotas['volumes']
raise exception.VolumeLimitExceeded(allowed=quotas['volumes'])
else:
# If nothing was reraised, ensure we reraise the initial error
raise
def revert(self, context, result, cause):
def revert(self, context, result, **kwargs):
# We never produced a result and therefore can't destroy anything.
if not result:
if isinstance(result, misc.Failure):
return
if context.quota_committed:
# The reservations have already been committed and can not be
@ -691,16 +673,15 @@ class QuotaCommitTask(base.CinderTask):
def __init__(self):
super(QuotaCommitTask, self).__init__(addons=[ACTION])
self.requires.update(['reservations', 'volume_properties'])
def __call__(self, context, reservations, volume_properties):
def execute(self, context, reservations, volume_properties):
QUOTAS.commit(context, reservations)
context.quota_committed = True
return {'volume_properties': volume_properties}
def revert(self, context, result, cause):
def revert(self, context, result, **kwargs):
# We never produced a result and therefore can't destroy anything.
if not result:
if isinstance(result, misc.Failure):
return
volume = result['volume_properties']
try:
@ -729,13 +710,14 @@ class VolumeCastTask(base.CinderTask):
"""
def __init__(self, scheduler_rpcapi, volume_rpcapi, db):
super(VolumeCastTask, self).__init__(addons=[ACTION])
requires = ['image_id', 'scheduler_hints', 'snapshot_id',
'source_volid', 'volume_id', 'volume_type',
'volume_properties']
super(VolumeCastTask, self).__init__(addons=[ACTION],
requires=requires)
self.volume_rpcapi = volume_rpcapi
self.scheduler_rpcapi = scheduler_rpcapi
self.db = db
self.requires.update(['image_id', 'scheduler_hints', 'snapshot_id',
'source_volid', 'volume_id', 'volume_type',
'volume_properties'])
def _cast_create_volume(self, context, request_spec, filter_properties):
source_volid = request_spec['source_volid']
@ -786,7 +768,7 @@ class VolumeCastTask(base.CinderTask):
image_id=image_id,
source_volid=source_volid)
def __call__(self, context, **kwargs):
def execute(self, context, **kwargs):
scheduler_hints = kwargs.pop('scheduler_hints', None)
request_spec = kwargs.copy()
filter_properties = {}
@ -794,6 +776,20 @@ class VolumeCastTask(base.CinderTask):
filter_properties['scheduler_hints'] = scheduler_hints
self._cast_create_volume(context, request_spec, filter_properties)
def revert(self, context, result, flow_failures, **kwargs):
if isinstance(result, misc.Failure):
return
# Restore the source volume status and set the volume to error status.
volume_id = kwargs['volume_id']
_restore_source_status(context, self.db, kwargs)
_error_out_volume(context, self.db, volume_id)
LOG.error(_("Volume %s: create failed"), volume_id)
exc_info = False
if all(flow_failures[-1].exc_info):
exc_info = flow_failures[-1].exc_info
LOG.error(_('Unexpected build error:'), exc_info=exc_info)
class OnFailureChangeStatusTask(base.CinderTask):
"""Helper task that sets a volume id to status error.
@ -808,33 +804,24 @@ class OnFailureChangeStatusTask(base.CinderTask):
def __init__(self, db):
super(OnFailureChangeStatusTask, self).__init__(addons=[ACTION])
self.db = db
self.requires.update(['volume_id'])
self.optional.update(['volume_spec'])
def __call__(self, context, volume_id, volume_spec=None):
def execute(self, context, volume_id, volume_spec):
# Save these items since we only use them if a reversion is triggered.
return {
'volume_id': volume_id,
'volume_spec': volume_spec,
}
def revert(self, context, result, cause):
def revert(self, context, result, flow_failures, **kwargs):
if isinstance(result, misc.Failure):
return
volume_spec = result.get('volume_spec')
if not volume_spec:
# Attempt to use it from a later task that *should* have populated
# this from the database. It is not needed to be found since
# reverting will continue without it.
volume_spec = _find_result_spec(cause.flow)
# Restore the source volume status and set the volume to error status.
volume_id = result['volume_id']
_restore_source_status(context, self.db, volume_spec)
_error_out_volume(context, self.db, volume_id, reason=cause.exc)
_error_out_volume(context, self.db, volume_id)
LOG.error(_("Volume %s: create failed"), volume_id)
exc_info = False
if all(cause.exc_info):
exc_info = cause.exc_info
LOG.error(_('Unexpected build error:'), exc_info=exc_info)
class OnFailureRescheduleTask(base.CinderTask):
@ -846,10 +833,10 @@ class OnFailureRescheduleTask(base.CinderTask):
"""
def __init__(self, reschedule_context, db, scheduler_rpcapi):
super(OnFailureRescheduleTask, self).__init__(addons=[ACTION])
self.requires.update(['filter_properties', 'image_id', 'request_spec',
'snapshot_id', 'volume_id'])
self.optional.update(['volume_spec'])
requires = ['filter_properties', 'image_id', 'request_spec',
'snapshot_id', 'volume_id', 'context']
super(OnFailureRescheduleTask, self).__init__(addons=[ACTION],
requires=requires)
self.scheduler_rpcapi = scheduler_rpcapi
self.db = db
self.reschedule_context = reschedule_context
@ -878,27 +865,8 @@ class OnFailureRescheduleTask(base.CinderTask):
exception.ImageUnacceptable,
]
def _is_reschedulable(self, cause):
# Figure out the type of the causes exception and compare it against
# our black-list of exception types that will not cause rescheduling.
exc_type, value = cause.exc_info[:2]
# If we don't have a type from exc_info but we do have a exception in
# the cause, try to get the type from that instead.
if not value:
value = cause.exc
if not exc_type and value:
exc_type = type(value)
if exc_type and exc_type in self.no_reschedule_types:
return False
# Couldn't figure it out, by default assume whatever the cause was can
# be fixed by rescheduling.
#
# NOTE(harlowja): Crosses fingers.
return True
def __call__(self, context, *args, **kwargs):
# Save these items since we only use them if a reversion is triggered.
return kwargs.copy()
def execute(self, **kwargs):
pass
def _reschedule(self, context, cause, request_spec, filter_properties,
snapshot_id, image_id, volume_id, **kwargs):
@ -919,7 +887,7 @@ class OnFailureRescheduleTask(base.CinderTask):
{'volume_id': volume_id,
'method': _make_pretty_name(create_volume),
'num': num_attempts,
'reason': _exception_to_unicode(cause.exc)})
'reason': cause.exception_str})
if all(cause.exc_info):
# Stringify to avoid circular ref problem in json serialization
@ -958,83 +926,23 @@ class OnFailureRescheduleTask(base.CinderTask):
LOG.exception(_("Volume %s: resetting 'creating' status failed"),
volume_id)
def revert(self, context, result, cause):
volume_spec = result.get('volume_spec')
if not volume_spec:
# Find it from a prior task that populated this from the database.
volume_spec = _find_result_spec(cause.flow)
volume_id = result['volume_id']
def revert(self, context, result, flow_failures, **kwargs):
# Check if we have a cause which can tell us not to reschedule.
for failure in flow_failures.values():
if failure.check(self.no_reschedule_types):
return
volume_id = kwargs['volume_id']
# Use a different context when rescheduling.
if self.reschedule_context:
context = self.reschedule_context
# If we are now supposed to reschedule (or unable to), then just
# restore the source volume status and set the volume to error status.
def do_error_revert():
LOG.debug(_("Failing volume %s creation by altering volume status"
" instead of rescheduling"), volume_id)
_restore_source_status(context, self.db, volume_spec)
_error_out_volume(context, self.db, volume_id, reason=cause.exc)
LOG.error(_("Volume %s: create failed"), volume_id)
# Check if we have a cause which can tell us not to reschedule.
if not self._is_reschedulable(cause):
do_error_revert()
else:
try:
cause = list(flow_failures.values())[0]
self._pre_reschedule(context, volume_id)
self._reschedule(context, cause, **result)
self._reschedule(context, cause, **kwargs)
self._post_reschedule(context, volume_id)
except exception.CinderException:
LOG.exception(_("Volume %s: rescheduling failed"), volume_id)
# NOTE(harlowja): Do error volume status changing instead.
do_error_revert()
exc_info = False
if all(cause.exc_info):
exc_info = cause.exc_info
LOG.error(_('Unexpected build error:'), exc_info=exc_info)
class NotifySchedulerFailureTask(base.CinderTask):
"""Helper task that notifies some external service on failure.
Reversion strategy: On failure of any flow that includes this task the
request specification associated with that flow will be extracted and
sent as a payload to the notification service under the given methods
scheduler topic.
"""
def __init__(self, method):
super(NotifySchedulerFailureTask, self).__init__(addons=[ACTION])
self.requires.update(['request_spec', 'volume_id'])
self.method = method
self.topic = 'scheduler.%s' % self.method
self.publisher_id = notifier.publisher_id("scheduler")
def __call__(self, context, **kwargs):
# Save these items since we only use them if a reversion is triggered.
return kwargs.copy()
def revert(self, context, result, cause):
request_spec = result['request_spec']
volume_id = result['volume_id']
volume_properties = request_spec['volume_properties']
payload = {
'request_spec': request_spec,
'volume_properties': volume_properties,
'volume_id': volume_id,
'state': 'error',
'method': self.method,
'reason': unicode(cause.exc),
}
try:
notifier.notify(context, self.publisher_id, self.topic,
notifier.ERROR, payload)
except exception.CinderException:
LOG.exception(_("Failed notifying on %(topic)s "
"payload %(payload)s") % {'topic': self.topic,
'payload': payload})
class ExtractSchedulerSpecTask(base.CinderTask):
@ -1043,12 +951,12 @@ class ExtractSchedulerSpecTask(base.CinderTask):
Reversion strategy: N/A
"""
def __init__(self, db):
super(ExtractSchedulerSpecTask, self).__init__(addons=[ACTION])
default_provides = set(['request_spec'])
def __init__(self, db, **kwargs):
super(ExtractSchedulerSpecTask, self).__init__(addons=[ACTION],
**kwargs)
self.db = db
self.requires.update(['image_id', 'request_spec', 'snapshot_id',
'volume_id'])
self.provides.update(['request_spec'])
def _populate_request_spec(self, context, volume_id, snapshot_id,
image_id):
@ -1077,8 +985,8 @@ class ExtractSchedulerSpecTask(base.CinderTask):
'volume_type': list(dict(vol_type).iteritems()),
}
def __call__(self, context, request_spec, volume_id, snapshot_id,
image_id):
def execute(self, context, request_spec, volume_id, snapshot_id,
image_id):
# For RPC version < 1.2 backward compatibility
if request_spec is None:
request_spec = self._populate_request_spec(context, volume_id,
@ -1088,6 +996,33 @@ class ExtractSchedulerSpecTask(base.CinderTask):
}
class ExtractVolumeRefTask(base.CinderTask):
"""Extracts volume reference for given volume id. """
default_provides = 'volume_ref'
def __init__(self, db):
super(ExtractVolumeRefTask, self).__init__(addons=[ACTION])
self.db = db
def execute(self, context, volume_id):
# NOTE(harlowja): this will fetch the volume from the database, if
# the volume has been deleted before we got here then this should fail.
#
# In the future we might want to have a lock on the volume_id so that
# the volume can not be deleted while its still being created?
volume_ref = self.db.volume_get(context, volume_id)
return volume_ref
def revert(self, context, volume_id, result, **kwargs):
if isinstance(result, misc.Failure):
return
_error_out_volume(context, self.db, volume_id)
LOG.error(_("Volume %s: create failed"), volume_id)
class ExtractVolumeSpecTask(base.CinderTask):
"""Extracts a spec of a volume to be created into a common structure.
@ -1099,22 +1034,17 @@ class ExtractVolumeSpecTask(base.CinderTask):
Reversion strategy: N/A
"""
def __init__(self, db):
super(ExtractVolumeSpecTask, self).__init__(addons=[ACTION])
self.db = db
self.requires.update(['filter_properties', 'image_id', 'snapshot_id',
'source_volid', 'volume_id'])
self.provides.update(['volume_spec', 'volume_ref'])
default_provides = 'volume_spec'
def __call__(self, context, volume_id, **kwargs):
def __init__(self, db):
requires = ['image_id', 'snapshot_id', 'source_volid']
super(ExtractVolumeSpecTask, self).__init__(addons=[ACTION],
requires=requires)
self.db = db
def execute(self, context, volume_ref, **kwargs):
get_remote_image_service = glance.get_remote_image_service
# NOTE(harlowja): this will fetch the volume from the database, if
# the volume has been deleted before we got here then this should fail.
#
# In the future we might want to have a lock on the volume_id so that
# the volume can not be deleted while its still being created?
volume_ref = self.db.volume_get(context, volume_id)
volume_name = volume_ref['name']
volume_size = utils.as_int(volume_ref['size'], quiet=False)
@ -1173,21 +1103,14 @@ class ExtractVolumeSpecTask(base.CinderTask):
'image_service': image_service,
})
return {
'volume_spec': specs,
# NOTE(harlowja): it appears like further usage of this volume_ref
# result actually depend on it being a sqlalchemy object and not
# just a plain dictionary so thats why we are storing this here.
#
# It was attempted to refetch it when needed in subsequent tasks,
# but that caused sqlalchemy errors to occur (volume already open
# or similar).
#
# In the future where this task could fail and be recovered from we
# will need to store the volume_spec and recreate the volume_ref
# on demand.
'volume_ref': volume_ref,
}
return specs
def revert(self, context, result, **kwargs):
if isinstance(result, misc.Failure):
return
volume_spec = result.get('volume_spec')
# Restore the source volume status and set the volume to error status.
_restore_source_status(context, self.db, volume_spec)
class NotifyVolumeActionTask(base.CinderTask):
@ -1199,12 +1122,11 @@ class NotifyVolumeActionTask(base.CinderTask):
def __init__(self, db, host, event_suffix):
super(NotifyVolumeActionTask, self).__init__(addons=[ACTION,
event_suffix])
self.requires.update(['volume_ref'])
self.db = db
self.event_suffix = event_suffix
self.host = host
def __call__(self, context, volume_ref):
def execute(self, context, volume_ref):
volume_id = volume_ref['id']
try:
volume_utils.notify_about_volume_usage(context, volume_ref,
@ -1226,11 +1148,12 @@ class CreateVolumeFromSpecTask(base.CinderTask):
Reversion strategy: N/A
"""
default_provides = 'volume'
def __init__(self, db, host, driver):
super(CreateVolumeFromSpecTask, self).__init__(addons=[ACTION])
self.db = db
self.driver = driver
self.requires.update(['volume_spec', 'volume_ref'])
# This maps the different volume specification types into the methods
# that can create said volume type (aka this is a jump table).
self._create_func_mapping = {
@ -1472,7 +1395,7 @@ class CreateVolumeFromSpecTask(base.CinderTask):
def _create_raw_volume(self, context, volume_ref, **kwargs):
return self.driver.create_volume(volume_ref)
def __call__(self, context, volume_ref, volume_spec):
def execute(self, context, volume_ref, volume_spec):
# we can't do anything if the driver didn't init
if not self.driver.initialized:
LOG.error(_("Unable to create volume, driver not initialized"))
@ -1538,6 +1461,8 @@ class CreateVolumeFromSpecTask(base.CinderTask):
{'volume_id': volume_id, 'model': model_update})
raise exception.ExportFailure(reason=ex)
return volume_ref
class CreateVolumeOnFinishTask(NotifyVolumeActionTask):
"""On successful volume creation this will perform final volume actions.
@ -1553,13 +1478,12 @@ class CreateVolumeOnFinishTask(NotifyVolumeActionTask):
def __init__(self, db, host, event_suffix):
super(CreateVolumeOnFinishTask, self).__init__(db, host, event_suffix)
self.requires.update(['volume_spec'])
self.status_translation = {
'migration_target_creating': 'migration_target',
}
def __call__(self, context, volume_ref, volume_spec):
volume_id = volume_ref['id']
def execute(self, context, volume, volume_spec):
volume_id = volume['id']
new_status = self.status_translation.get(volume_spec.get('status'),
'available')
update = {
@ -1573,7 +1497,7 @@ class CreateVolumeOnFinishTask(NotifyVolumeActionTask):
# 'building' if this fails)??
volume_ref = self.db.volume_update(context, volume_id, update)
# Now use the parent to notify.
super(CreateVolumeOnFinishTask, self).__call__(context, volume_ref)
super(CreateVolumeOnFinishTask, self).execute(context, volume_ref)
except exception.CinderException:
LOG.exception(_("Failed updating volume %(volume_id)s with "
"%(update)s") % {'volume_id': volume_id,
@ -1605,31 +1529,26 @@ def get_api_flow(scheduler_rpcapi, volume_rpcapi, db,
flow_name = ACTION.replace(":", "_") + "_api"
api_flow = linear_flow.Flow(flow_name)
# This injects the initial starting flow values into the workflow so that
# the dependency order of the tasks provides/requires can be correctly
# determined.
api_flow.add(base.InjectTask(create_what, addons=[ACTION]))
api_flow.add(ExtractVolumeRequestTask(image_service,
az_check_functor))
api_flow.add(QuotaReserveTask())
v_uuid = api_flow.add(EntryCreateTask(db))
api_flow.add(QuotaCommitTask())
# If after committing something fails, ensure we set the db to failure
# before reverting any prior tasks.
api_flow.add(OnFailureChangeStatusTask(db))
api_flow.add(ExtractVolumeRequestTask(
image_service,
az_check_functor,
rebind={'size': 'raw_size',
'availability_zone': 'raw_availability_zone',
'volume_type': 'raw_volume_type'}))
api_flow.add(QuotaReserveTask(),
EntryCreateTask(db),
QuotaCommitTask())
# This will cast it out to either the scheduler or volume manager via
# the rpc apis provided.
api_flow.add(VolumeCastTask(scheduler_rpcapi, volume_rpcapi, db))
# Note(harlowja): this will return the flow as well as the uuid of the
# task which will produce the 'volume' database reference (since said
# reference is returned to other callers in the api for further usage).
return (flow_utils.attach_debug_listeners(api_flow), v_uuid)
# Now load (but do not run) the flow using the provided initial data.
return taskflow.engines.load(api_flow, store=create_what)
def get_scheduler_flow(db, driver, request_spec=None, filter_properties=None,
def get_scheduler_flow(context, db, driver, request_spec=None,
filter_properties=None,
volume_id=None, snapshot_id=None, image_id=None):
"""Constructs and returns the scheduler entrypoint flow.
@ -1643,29 +1562,23 @@ def get_scheduler_flow(db, driver, request_spec=None, filter_properties=None,
4. Uses provided driver to to then select and continue processing of
volume request.
"""
flow_name = ACTION.replace(":", "_") + "_scheduler"
scheduler_flow = linear_flow.Flow(flow_name)
# This injects the initial starting flow values into the workflow so that
# the dependency order of the tasks provides/requires can be correctly
# determined.
scheduler_flow.add(base.InjectTask({
'request_spec': request_spec,
create_what = {
'context': context,
'raw_request_spec': request_spec,
'filter_properties': filter_properties,
'volume_id': volume_id,
'snapshot_id': snapshot_id,
'image_id': image_id,
}, addons=[ACTION]))
}
flow_name = ACTION.replace(":", "_") + "_scheduler"
scheduler_flow = linear_flow.Flow(flow_name)
# This will extract and clean the spec from the starting values.
scheduler_flow.add(ExtractSchedulerSpecTask(db))
scheduler_flow.add(ExtractSchedulerSpecTask(
db,
rebind={'request_spec': 'raw_request_spec'}))
# The decorator application here ensures that the method gets the right
# requires attributes automatically by examining the underlying functions
# arguments.
@decorators.task
def schedule_create_volume(context, request_spec, filter_properties):
def _log_failure(cause):
@ -1711,16 +1624,16 @@ def get_scheduler_flow(db, driver, request_spec=None, filter_properties=None,
_log_failure(e)
_error_out_volume(context, db, volume_id, reason=e)
scheduler_flow.add(schedule_create_volume)
scheduler_flow.add(task.FunctorTask(schedule_create_volume))
return flow_utils.attach_debug_listeners(scheduler_flow)
# Now load (but do not run) the flow using the provided initial data.
return taskflow.engines.load(scheduler_flow, store=create_what)
def get_manager_flow(db, driver, scheduler_rpcapi, host, volume_id,
request_spec=None, filter_properties=None,
allow_reschedule=True,
snapshot_id=None, image_id=None, source_volid=None,
reschedule_context=None):
def get_manager_flow(context, db, driver, scheduler_rpcapi, host, volume_id,
allow_reschedule, reschedule_context, request_spec,
filter_properties, snapshot_id=None, image_id=None,
source_volid=None):
"""Constructs and returns the manager entrypoint flow.
This flow will do the following:
@ -1739,44 +1652,29 @@ def get_manager_flow(db, driver, scheduler_rpcapi, host, volume_id,
flow_name = ACTION.replace(":", "_") + "_manager"
volume_flow = linear_flow.Flow(flow_name)
# Determine if we are allowed to reschedule since this affects how
# failures will be handled.
if not filter_properties:
filter_properties = {}
if not request_spec and allow_reschedule:
LOG.debug(_("No request spec, will not reschedule"))
allow_reschedule = False
if not filter_properties.get('retry', None) and allow_reschedule:
LOG.debug(_("No retry filter property or associated "
"retry info, will not reschedule"))
allow_reschedule = False
# This injects the initial starting flow values into the workflow so that
# the dependency order of the tasks provides/requires can be correctly
# determined.
volume_flow.add(base.InjectTask({
create_what = {
'context': context,
'filter_properties': filter_properties,
'image_id': image_id,
'request_spec': request_spec,
'snapshot_id': snapshot_id,
'source_volid': source_volid,
'volume_id': volume_id,
}, addons=[ACTION]))
}
# We can actually just check if we should reschedule on failure ahead of
# time instead of trying to determine this later, certain values are needed
# to reschedule and without them we should just avoid rescheduling.
if not allow_reschedule:
# On failure ensure that we just set the volume status to error.
LOG.debug(_("Retry info not present, will not reschedule"))
volume_flow.add(OnFailureChangeStatusTask(db))
else:
volume_flow.add(ExtractVolumeRefTask(db))
if allow_reschedule and request_spec:
volume_flow.add(OnFailureRescheduleTask(reschedule_context,
db, scheduler_rpcapi))
volume_flow.add(ExtractVolumeSpecTask(db))
volume_flow.add(NotifyVolumeActionTask(db, host, "create.start"))
volume_flow.add(CreateVolumeFromSpecTask(db, host, driver))
volume_flow.add(CreateVolumeOnFinishTask(db, host, "create.end"))
volume_flow.add(ExtractVolumeSpecTask(db),
NotifyVolumeActionTask(db, host, "create.start"),
CreateVolumeFromSpecTask(db, host, driver),
CreateVolumeOnFinishTask(db, host, "create.end"))
return flow_utils.attach_debug_listeners(volume_flow)
# Now load (but do not run) the flow using the provided initial data.
return taskflow.engines.load(volume_flow, store=create_what)

View File

@ -1,55 +0,0 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved.
# Copyright (c) 2013 OpenStack Foundation
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from cinder.openstack.common import log as logging
LOG = logging.getLogger(__name__)
def attach_debug_listeners(flow):
"""Sets up a nice set of debug listeners for the flow.
These listeners will log when tasks/flows are transitioning from state to
state so that said states can be seen in the debug log output which is very
useful for figuring out where problems are occurring.
"""
def flow_log_change(state, details):
# TODO(harlowja): the bug 1214083 is causing problems
LOG.debug(_("%(flow)s has moved into state %(state)s from state"
" %(old_state)s") % {'state': state,
'old_state': details.get('old_state'),
'flow': str(details['flow'])})
def task_log_change(state, details):
# TODO(harlowja): the bug 1214083 is causing problems
LOG.debug(_("%(flow)s has moved %(runner)s into state %(state)s with"
" result: %(result)s") % {'state': state,
'flow': str(details['flow']),
'runner': str(details['runner']),
'result': details.get('result')})
# Register * for all state changes (and not selective state changes to be
# called upon) since all the changes is more useful.
flow.notifier.register('*', flow_log_change)
flow.task_notifier.register('*', task_log_change)
return flow

View File

@ -61,8 +61,6 @@ from cinder.volume import rpcapi as volume_rpcapi
from cinder.volume import utils as volume_utils
from cinder.volume import volume_types
from cinder.taskflow import states
from eventlet.greenpool import GreenPool
LOG = logging.getLogger(__name__)
@ -289,23 +287,31 @@ class VolumeManager(manager.SchedulerDependentManager):
def create_volume(self, context, volume_id, request_spec=None,
filter_properties=None, allow_reschedule=True,
snapshot_id=None, image_id=None, source_volid=None):
"""Creates and exports the volume."""
context_saved = context.deepcopy()
context = context.elevated()
if filter_properties is None:
filter_properties = {}
flow = create_volume.get_manager_flow(
self.db,
self.driver,
self.scheduler_rpcapi,
self.host,
volume_id,
request_spec=request_spec,
filter_properties=filter_properties,
allow_reschedule=allow_reschedule,
snapshot_id=snapshot_id,
image_id=image_id,
source_volid=source_volid,
reschedule_context=context.deepcopy())
assert flow, _('Manager volume flow not retrieved')
try:
flow_engine = create_volume.get_manager_flow(
context,
self.db,
self.driver,
self.scheduler_rpcapi,
self.host,
volume_id,
snapshot_id=snapshot_id,
image_id=image_id,
source_volid=source_volid,
allow_reschedule=allow_reschedule,
reschedule_context=context_saved,
request_spec=request_spec,
filter_properties=filter_properties)
except Exception:
raise exception.CinderException(
_("Failed to create manager volume flow"))
if snapshot_id is not None:
# Make sure the snapshot is not deleted until we are done with it.
@ -317,11 +323,11 @@ class VolumeManager(manager.SchedulerDependentManager):
locked_action = None
def _run_flow():
flow.run(context.elevated())
if flow.state != states.SUCCESS:
msg = _("Failed to successfully complete manager volume "
"workflow")
raise exception.CinderException(msg)
# This code executes create volume flow. If something goes wrong,
# flow reverts all job that was done and reraises an exception.
# Otherwise, all data that was generated by flow becomes available
# in flow engine's storage.
flow_engine.run()
@utils.synchronized(locked_action, external=True)
def _run_flow_locked():
@ -332,8 +338,9 @@ class VolumeManager(manager.SchedulerDependentManager):
else:
_run_flow_locked()
self._reset_stats()
return volume_id
# Fetch created volume from storage
volume_ref = flow_engine.storage.fetch('volume')
return volume_ref['id']
@utils.require_driver_initialized
@locked_volume_operation

View File

@ -18,6 +18,7 @@ python-keystoneclient>=0.4.1
python-novaclient>=2.15.0
python-swiftclient>=1.5
Routes>=1.12.3
taskflow>=0.1.1,<0.2
rtslib-fb>=2.1.39
six>=1.4.1
SQLAlchemy>=0.7.8,<=0.7.99

View File

@ -1,7 +0,0 @@
[DEFAULT]
# The list of primitives to copy from taskflow
primitives=flow.linear_flow,task
# The base module to hold the copy of taskflow
base=cinder