Refactoring of create_volume to use taskflow.

Move the create_volume workflow to using taskflow and
split that workflow into three major pieces (each with
there own workflow) and create tasks that perform the
individual required actions to accomplish the pieces
desired outcome.

1. An api workflow composed of the following tasks:
  - Extracting volume request (which checks types, values) and creates a
    standard output for other tasks to work on (allowing further tasks to be
    plugged in the chain without having to worry about other tasks output
    formats).
  - Quota reservation (rolled back on failure).
  - Database entry creation.
  - Quota committing.
  - Volume RPC casting to volume scheduler or to targeted volume manager.
2. A scheduler workflow composed of the following tasks:
  - Extracting scheduler request specification for further tasks to use.
  - Change status & notify (activated only on failure).
  - Create volume scheduler driver call (which will itself RPC cast to a
    targeted volume manager).
3. A manager workflow composed of the following tasks:
  - Extract volume request specification from incoming request for
    further tasks to use. This also breaks up the incoming request into the 4
    volume types that can be created later.
  - Change status & notify on failure or reschedule on failure, this is
    dependent on if rescheduling is enabled *and* which exception types are
    thrown from the volume creation code.
  - Create volume from specification
    - This contains the code to create from image, create raw volume, create
      from source volume, create from snapshot using the extracted volume
      specification.
  - Change status & notify success.

Key benefits:
  - Handled exceptions in a easier to understand, easier to review and more
    reliable way than they are currently being handled.
  - Rescheduling is now easier to understand.
  - Easier to understand structure with tasks that consume inputs, take some
    action on them and produce outputs and revert on subsequent failure using
    whatever they produced to know how to revert.
  - Ability to add new unit tests that can test individual task actions by
    providing mock task inputs and validating expected task outputs.

Future additions:
  - Eventual addition of resumption logic to recover from operations stopped
    halfway through.
  - Ability to centrally orchestrate the tasks and pick and choice how
    reconciliation of failures based on code or policies.

Part of bp: cinder-state-machine

Change-Id: I96b688511b35014a8c006e4d30b875dcaf409d93
This commit is contained in:
Joshua Harlow 2013-08-01 12:08:04 -07:00
parent 568e75c511
commit e78ba96949
19 changed files with 3319 additions and 597 deletions

View File

@ -580,6 +580,22 @@ class GlanceMetadataExists(Invalid):
" exists for volume id %(volume_id)s")
class ExportFailure(Invalid):
message = _("Failed to export for volume: %(reason)s")
class MetadataCreateFailure(Invalid):
message = _("Failed to create metadata for volume: %(reason)s")
class MetadataUpdateFailure(Invalid):
message = _("Failed to update metadata for volume: %(reason)s")
class MetadataCopyFailure(Invalid):
message = _("Failed to copy metadata to volume: %(reason)s")
class ImageCopyFailure(Invalid):
message = _("Failed to copy image to volume: %(reason)s")

View File

@ -62,6 +62,20 @@ def _set_brain(data):
policy.set_brain(policy.Brain.load_json(data, default_rule))
def enforce_action(context, action):
"""Checks that the action can be done by the given context.
Applies a check to ensure the context's project_id and user_id can be
applied to the given action using the policy enforcement api.
"""
target = {
'project_id': context.project_id,
'user_id': context.user_id,
}
enforce(context, action, target)
def enforce(context, action, target):
"""Verifies that the action is valid on the target in this context.

View File

@ -31,8 +31,10 @@ from cinder.openstack.common import excutils
from cinder.openstack.common import importutils
from cinder.openstack.common import log as logging
from cinder.openstack.common.notifier import api as notifier
from cinder.volume.flows import create_volume
from cinder.volume import rpcapi as volume_rpcapi
from cinder.taskflow import states
scheduler_driver_opt = cfg.StrOpt('scheduler_driver',
default='cinder.scheduler.filter_scheduler.'
@ -81,61 +83,18 @@ class SchedulerManager(manager.Manager):
def create_volume(self, context, topic, volume_id, snapshot_id=None,
image_id=None, request_spec=None,
filter_properties=None):
try:
if request_spec is None:
# For RPC version < 1.2 backward compatibility
request_spec = {}
volume_ref = db.volume_get(context, volume_id)
size = volume_ref.get('size')
availability_zone = volume_ref.get('availability_zone')
volume_type_id = volume_ref.get('volume_type_id')
vol_type = db.volume_type_get(context, volume_type_id)
volume_properties = {'size': size,
'availability_zone': availability_zone,
'volume_type_id': volume_type_id}
request_spec.update(
{'volume_id': volume_id,
'snapshot_id': snapshot_id,
'image_id': image_id,
'volume_properties': volume_properties,
'volume_type': dict(vol_type).iteritems()})
self.driver.schedule_create_volume(context, request_spec,
filter_properties)
except exception.NoValidHost as ex:
volume_state = {'volume_state': {'status': 'error'}}
self._set_volume_state_and_notify('create_volume',
volume_state,
context, ex, request_spec)
except Exception as ex:
with excutils.save_and_reraise_exception():
volume_state = {'volume_state': {'status': 'error'}}
self._set_volume_state_and_notify('create_volume',
volume_state,
context, ex, request_spec)
flow = create_volume.get_scheduler_flow(db, self.driver,
request_spec,
filter_properties,
volume_id, snapshot_id,
image_id)
assert flow, _('Schedule volume flow not retrieved')
def _set_volume_state_and_notify(self, method, updates, context, ex,
request_spec):
LOG.error(_("Failed to schedule_%(method)s: %(ex)s") %
{'method': method, 'ex': ex})
volume_state = updates['volume_state']
properties = request_spec.get('volume_properties', {})
volume_id = request_spec.get('volume_id', None)
if volume_id:
db.volume_update(context, volume_id, volume_state)
payload = dict(request_spec=request_spec,
volume_properties=properties,
volume_id=volume_id,
state=volume_state,
method=method,
reason=ex)
notifier.notify(context, notifier.publisher_id("scheduler"),
'scheduler.' + method, notifier.ERROR, payload)
flow.run(context)
if flow.state != states.SUCCESS:
LOG.warn(_("Failed to successfully complete"
" schedule volume using flow: %s"), flow)
def request_service_capabilities(self, context):
volume_rpcapi.VolumeAPI().publish_service_capabilities(context)
@ -164,3 +123,28 @@ class SchedulerManager(manager.Manager):
volume_rpcapi.VolumeAPI().migrate_volume(context, volume_ref,
tgt_host,
force_host_copy)
def _set_volume_state_and_notify(self, method, updates, context, ex,
request_spec):
# TODO(harlowja): move into a task that just does this later.
LOG.error(_("Failed to schedule_%(method)s: %(ex)s") %
{'method': method, 'ex': ex})
volume_state = updates['volume_state']
properties = request_spec.get('volume_properties', {})
volume_id = request_spec.get('volume_id', None)
if volume_id:
db.volume_update(context, volume_id, volume_state)
payload = dict(request_spec=request_spec,
volume_properties=properties,
volume_id=volume_id,
state=volume_state,
method=method,
reason=ex)
notifier.notify(context, notifier.publisher_id("scheduler"),
'scheduler.' + method, notifier.ERROR, payload)

View File

@ -0,0 +1 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4

View File

@ -0,0 +1,276 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import functools
import inspect
import types
# These arguments are ones that we will skip when parsing for requirements
# for a function to operate (when used as a task).
AUTO_ARGS = ('self', 'context', 'cls')
def is_decorated(functor):
if not isinstance(functor, (types.MethodType, types.FunctionType)):
return False
return getattr(extract(functor), '__task__', False)
def extract(functor):
# Extract the underlying functor if its a method since we can not set
# attributes on instance methods, this is supposedly fixed in python 3
# and later.
#
# TODO(harlowja): add link to this fix.
assert isinstance(functor, (types.MethodType, types.FunctionType))
if isinstance(functor, types.MethodType):
return functor.__func__
else:
return functor
def _mark_as_task(functor):
setattr(functor, '__task__', True)
def _get_wrapped(function):
"""Get the method at the bottom of a stack of decorators."""
if hasattr(function, '__wrapped__'):
return getattr(function, '__wrapped__')
if not hasattr(function, 'func_closure') or not function.func_closure:
return function
def _get_wrapped_function(function):
if not hasattr(function, 'func_closure') or not function.func_closure:
return None
for closure in function.func_closure:
func = closure.cell_contents
deeper_func = _get_wrapped_function(func)
if deeper_func:
return deeper_func
elif hasattr(closure.cell_contents, '__call__'):
return closure.cell_contents
return _get_wrapped_function(function)
def _take_arg(a):
if a in AUTO_ARGS:
return False
# In certain decorator cases it seems like we get the function to be
# decorated as an argument, we don't want to take that as a real argument.
if isinstance(a, collections.Callable):
return False
return True
def wraps(fn):
"""This will not be needed in python 3.2 or greater which already has this
built-in to its functools.wraps method.
"""
def wrapper(f):
f = functools.wraps(fn)(f)
f.__wrapped__ = getattr(fn, '__wrapped__', fn)
return f
return wrapper
def locked(f):
@wraps(f)
def wrapper(self, *args, **kwargs):
with self._lock:
return f(self, *args, **kwargs)
return wrapper
def task(*args, **kwargs):
"""Decorates a given function and ensures that all needed attributes of
that function are set so that the function can be used as a task.
"""
def decorator(f):
w_f = extract(f)
def noop(*args, **kwargs):
pass
# Mark as being a task
_mark_as_task(w_f)
# By default don't revert this.
w_f.revert = kwargs.pop('revert_with', noop)
# Associate a name of this task that is the module + function name.
w_f.name = "%s.%s" % (f.__module__, f.__name__)
# Sets the version of the task.
version = kwargs.pop('version', (1, 0))
f = _versionize(*version)(f)
# Attach any requirements this function needs for running.
requires_what = kwargs.pop('requires', [])
f = _requires(*requires_what, **kwargs)(f)
# Attach any optional requirements this function needs for running.
optional_what = kwargs.pop('optional', [])
f = _optional(*optional_what, **kwargs)(f)
# Attach any items this function provides as output
provides_what = kwargs.pop('provides', [])
f = _provides(*provides_what, **kwargs)(f)
@wraps(f)
def wrapper(*args, **kwargs):
return f(*args, **kwargs)
return wrapper
# This is needed to handle when the decorator has args or the decorator
# doesn't have args, python is rather weird here...
if kwargs or not args:
return decorator
else:
if isinstance(args[0], collections.Callable):
return decorator(args[0])
else:
return decorator
def _versionize(major, minor=None):
"""A decorator that marks the wrapped function with a major & minor version
number.
"""
if minor is None:
minor = 0
def decorator(f):
w_f = extract(f)
w_f.version = (major, minor)
@wraps(f)
def wrapper(*args, **kwargs):
return f(*args, **kwargs)
return wrapper
return decorator
def _optional(*args, **kwargs):
"""Attaches a set of items that the decorated function would like as input
to the functions underlying dictionary.
"""
def decorator(f):
w_f = extract(f)
if not hasattr(w_f, 'optional'):
w_f.optional = set()
w_f.optional.update([a for a in args if _take_arg(a)])
@wraps(f)
def wrapper(*args, **kwargs):
return f(*args, **kwargs)
return wrapper
# This is needed to handle when the decorator has args or the decorator
# doesn't have args, python is rather weird here...
if kwargs or not args:
return decorator
else:
if isinstance(args[0], collections.Callable):
return decorator(args[0])
else:
return decorator
def _requires(*args, **kwargs):
"""Attaches a set of items that the decorated function requires as input
to the functions underlying dictionary.
"""
def decorator(f):
w_f = extract(f)
if not hasattr(w_f, 'requires'):
w_f.requires = set()
if kwargs.pop('auto_extract', True):
inspect_what = _get_wrapped(f)
f_args = inspect.getargspec(inspect_what).args
w_f.requires.update([a for a in f_args if _take_arg(a)])
w_f.requires.update([a for a in args if _take_arg(a)])
@wraps(f)
def wrapper(*args, **kwargs):
return f(*args, **kwargs)
return wrapper
# This is needed to handle when the decorator has args or the decorator
# doesn't have args, python is rather weird here...
if kwargs or not args:
return decorator
else:
if isinstance(args[0], collections.Callable):
return decorator(args[0])
else:
return decorator
def _provides(*args, **kwargs):
"""Attaches a set of items that the decorated function provides as output
to the functions underlying dictionary.
"""
def decorator(f):
w_f = extract(f)
if not hasattr(f, 'provides'):
w_f.provides = set()
w_f.provides.update([a for a in args if _take_arg(a)])
@wraps(f)
def wrapper(*args, **kwargs):
return f(*args, **kwargs)
return wrapper
# This is needed to handle when the decorator has args or the decorator
# doesn't have args, python is rather weird here...
if kwargs or not args:
return decorator
else:
if isinstance(args[0], collections.Callable):
return decorator(args[0])
else:
return decorator

View File

@ -0,0 +1,69 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class TaskFlowException(Exception):
"""Base class for exceptions emitted from this library."""
pass
class Duplicate(TaskFlowException):
"""Raised when a duplicate entry is found."""
pass
class NotFound(TaskFlowException):
"""Raised when some entry in some object doesn't exist."""
pass
class AlreadyExists(TaskFlowException):
"""Raised when some entry in some object already exists."""
pass
class ClosedException(TaskFlowException):
"""Raised when an access on a closed object occurs."""
pass
class InvalidStateException(TaskFlowException):
"""Raised when a task/job/workflow is in an invalid state when an
operation is attempting to apply to said task/job/workflow.
"""
pass
class UnclaimableJobException(TaskFlowException):
"""Raised when a job can not be claimed."""
pass
class JobNotFound(TaskFlowException):
"""Raised when a job entry can not be found."""
pass
class MissingDependencies(InvalidStateException):
"""Raised when a task has dependencies that can not be satisified."""
message = ("%(task)s requires %(requirements)s but no other task produces"
" said requirements")
def __init__(self, task, requirements):
message = self.message % {'task': task, 'requirements': requirements}
super(MissingDependencies, self).__init__(message)

View File

@ -0,0 +1 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4

View File

@ -0,0 +1,214 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import threading
from cinder.openstack.common import uuidutils
from cinder.taskflow import decorators
from cinder.taskflow import exceptions as exc
from cinder.taskflow import states
from cinder.taskflow import utils
class Flow(object):
"""The base abstract class of all flow implementations.
It provides a set of parents to flows that have a concept of parent flows
as well as a state and state utility functions to the deriving classes. It
also provides a name and an identifier (uuid or other) to the flow so that
it can be uniquely identifed among many flows.
Flows are expected to provide (if desired) the following methods:
- add
- add_many
- interrupt
- reset
- rollback
- run
- soft_reset
"""
__metaclass__ = abc.ABCMeta
# Common states that certain actions can be performed in. If the flow
# is not in these sets of states then it is likely that the flow operation
# can not succeed.
RESETTABLE_STATES = set([
states.INTERRUPTED,
states.SUCCESS,
states.PENDING,
states.FAILURE,
])
SOFT_RESETTABLE_STATES = set([
states.INTERRUPTED,
])
UNINTERRUPTIBLE_STATES = set([
states.FAILURE,
states.SUCCESS,
states.PENDING,
])
RUNNABLE_STATES = set([
states.PENDING,
])
def __init__(self, name, parents=None, uuid=None):
self._name = str(name)
# The state of this flow.
self._state = states.PENDING
# If this flow has a parent flow/s which need to be reverted if
# this flow fails then please include them here to allow this child
# to call the parents...
if parents:
self.parents = tuple(parents)
else:
self.parents = ()
# Any objects that want to listen when a wf/task starts/stops/completes
# or errors should be registered here. This can be used to monitor
# progress and record tasks finishing (so that it becomes possible to
# store the result of a task in some persistent or semi-persistent
# storage backend).
self.notifier = utils.TransitionNotifier()
self.task_notifier = utils.TransitionNotifier()
# Ensure that modifications and/or multiple runs aren't happening
# at the same time in the same flow at the same time.
self._lock = threading.RLock()
# Assign this flow a unique identifer.
if uuid:
self._id = str(uuid)
else:
self._id = uuidutils.generate_uuid()
@property
def name(self):
"""A non-unique name for this flow (human readable)"""
return self._name
@property
def uuid(self):
"""Uniquely identifies this flow"""
return "f-%s" % (self._id)
@property
def state(self):
"""Provides a read-only view of the flow state."""
return self._state
def _change_state(self, context, new_state):
was_changed = False
old_state = self.state
with self._lock:
if self.state != new_state:
old_state = self.state
self._state = new_state
was_changed = True
if was_changed:
# Don't notify while holding the lock.
self.notifier.notify(self.state, details={
'context': context,
'flow': self,
'old_state': old_state,
})
def __str__(self):
lines = ["Flow: %s" % (self.name)]
lines.append("%s" % (self.uuid))
lines.append("%s" % (len(self.parents)))
lines.append("%s" % (self.state))
return "; ".join(lines)
@abc.abstractmethod
def add(self, task):
"""Adds a given task to this flow.
Returns the uuid that is associated with the task for later operations
before and after it is ran.
"""
raise NotImplementedError()
@decorators.locked
def add_many(self, tasks):
"""Adds many tasks to this flow.
Returns a list of uuids (one for each task added).
"""
uuids = []
for t in tasks:
uuids.append(self.add(t))
return uuids
def interrupt(self):
"""Attempts to interrupt the current flow and any tasks that are
currently not running in the flow.
Returns how many tasks were interrupted (if any).
"""
if self.state in self.UNINTERRUPTIBLE_STATES:
raise exc.InvalidStateException(("Can not interrupt when"
" in state %s") % (self.state))
# Note(harlowja): Do *not* acquire the lock here so that the flow may
# be interrupted while running. This does mean the the above check may
# not be valid but we can worry about that if it becomes an issue.
old_state = self.state
if old_state != states.INTERRUPTED:
self._state = states.INTERRUPTED
self.notifier.notify(self.state, details={
'context': None,
'flow': self,
'old_state': old_state,
})
return 0
@decorators.locked
def reset(self):
"""Fully resets the internal state of this flow, allowing for the flow
to be ran again.
Note: Listeners are also reset.
"""
if self.state not in self.RESETTABLE_STATES:
raise exc.InvalidStateException(("Can not reset when"
" in state %s") % (self.state))
self.notifier.reset()
self.task_notifier.reset()
self._change_state(None, states.PENDING)
@decorators.locked
def soft_reset(self):
"""Partially resets the internal state of this flow, allowing for the
flow to be ran again from an interrupted state only.
"""
if self.state not in self.SOFT_RESETTABLE_STATES:
raise exc.InvalidStateException(("Can not soft reset when"
" in state %s") % (self.state))
self._change_state(None, states.PENDING)
@decorators.locked
def run(self, context, *args, **kwargs):
"""Executes the workflow."""
if self.state not in self.RUNNABLE_STATES:
raise exc.InvalidStateException("Unable to run flow when "
"in state %s" % (self.state))
@decorators.locked
def rollback(self, context, cause):
"""Performs rollback of this workflow and any attached parent workflows
if present.
"""
pass

View File

@ -0,0 +1,271 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import logging
from cinder.openstack.common import excutils
from cinder.taskflow import decorators
from cinder.taskflow import exceptions as exc
from cinder.taskflow import states
from cinder.taskflow import utils
from cinder.taskflow.patterns import base
LOG = logging.getLogger(__name__)
class Flow(base.Flow):
""""A linear chain of tasks that can be applied in order as one unit and
rolled back as one unit using the reverse order that the tasks have
been applied in.
Note(harlowja): Each task in the chain must have requirements
which are satisfied by the previous task/s in the chain.
"""
def __init__(self, name, parents=None, uuid=None):
super(Flow, self).__init__(name, parents, uuid)
# The tasks which have been applied will be collected here so that they
# can be reverted in the correct order on failure.
self._accumulator = utils.RollbackAccumulator()
# Tasks results are stored here. Lookup is by the uuid that was
# returned from the add function.
self.results = {}
# The previously left off iterator that can be used to resume from
# the last task (if interrupted and soft-reset).
self._leftoff_at = None
# All runners to run are collected here.
self._runners = []
self._connected = False
# The resumption strategy to use.
self.resumer = None
@decorators.locked
def add(self, task):
"""Adds a given task to this flow."""
assert isinstance(task, collections.Callable)
r = utils.Runner(task)
r.runs_before = list(reversed(self._runners))
self._runners.append(r)
self._reset_internals()
return r.uuid
def _reset_internals(self):
self._connected = False
self._leftoff_at = None
def _associate_providers(self, runner):
# Ensure that some previous task provides this input.
who_provides = {}
task_requires = runner.requires
for r in task_requires:
provider = None
for before_me in runner.runs_before:
if r in before_me.provides:
provider = before_me
break
if provider:
who_provides[r] = provider
# Ensure that the last task provides all the needed input for this
# task to run correctly.
missing_requires = task_requires - set(who_provides.keys())
if missing_requires:
raise exc.MissingDependencies(runner, sorted(missing_requires))
runner.providers.update(who_provides)
def __str__(self):
lines = ["LinearFlow: %s" % (self.name)]
lines.append("%s" % (self.uuid))
lines.append("%s" % (len(self._runners)))
lines.append("%s" % (len(self.parents)))
lines.append("%s" % (self.state))
return "; ".join(lines)
@decorators.locked
def remove(self, uuid):
index_removed = -1
for (i, r) in enumerate(self._runners):
if r.uuid == uuid:
index_removed = i
break
if index_removed == -1:
raise ValueError("No runner found with uuid %s" % (uuid))
else:
removed = self._runners.pop(index_removed)
self._reset_internals()
# Go and remove it from any runner after the removed runner since
# those runners may have had an attachment to it.
for r in self._runners[index_removed:]:
try:
r.runs_before.remove(removed)
except (IndexError, ValueError):
pass
def __len__(self):
return len(self._runners)
def _connect(self):
if self._connected:
return self._runners
for r in self._runners:
r.providers = {}
for r in reversed(self._runners):
self._associate_providers(r)
self._connected = True
return self._runners
def _ordering(self):
return iter(self._connect())
@decorators.locked
def run(self, context, *args, **kwargs):
super(Flow, self).run(context, *args, **kwargs)
def resume_it():
if self._leftoff_at is not None:
return ([], self._leftoff_at)
if self.resumer:
(finished, leftover) = self.resumer.resume(self,
self._ordering())
else:
finished = []
leftover = self._ordering()
return (finished, leftover)
self._change_state(context, states.STARTED)
try:
those_finished, leftover = resume_it()
except Exception:
with excutils.save_and_reraise_exception():
self._change_state(context, states.FAILURE)
def run_it(runner, failed=False, result=None, simulate_run=False):
try:
# Add the task to be rolled back *immediately* so that even if
# the task fails while producing results it will be given a
# chance to rollback.
rb = utils.RollbackTask(context, runner.task, result=None)
self._accumulator.add(rb)
self.task_notifier.notify(states.STARTED, details={
'context': context,
'flow': self,
'runner': runner,
})
if not simulate_run:
result = runner(context, *args, **kwargs)
else:
if failed:
# TODO(harlowja): make this configurable??
# If we previously failed, we want to fail again at
# the same place.
if not result:
# If no exception or exception message was provided
# or captured from the previous run then we need to
# form one for this task.
result = "%s failed running." % (runner.task)
if isinstance(result, basestring):
result = exc.InvalidStateException(result)
if not isinstance(result, Exception):
LOG.warn("Can not raise a non-exception"
" object: %s", result)
result = exc.InvalidStateException()
raise result
# Adjust the task result in the accumulator before
# notifying others that the task has finished to
# avoid the case where a listener might throw an
# exception.
rb.result = result
runner.result = result
self.results[runner.uuid] = result
self.task_notifier.notify(states.SUCCESS, details={
'context': context,
'flow': self,
'runner': runner,
})
except Exception as e:
runner.result = e
cause = utils.FlowFailure(runner, self, e)
with excutils.save_and_reraise_exception():
# Notify any listeners that the task has errored.
self.task_notifier.notify(states.FAILURE, details={
'context': context,
'flow': self,
'runner': runner,
})
self.rollback(context, cause)
if len(those_finished):
self._change_state(context, states.RESUMING)
for (r, details) in those_finished:
# Fake running the task so that we trigger the same
# notifications and state changes (and rollback that
# would have happened in a normal flow).
failed = states.FAILURE in details.get('states', [])
result = details.get('result')
run_it(r, failed=failed, result=result, simulate_run=True)
self._leftoff_at = leftover
self._change_state(context, states.RUNNING)
if self.state == states.INTERRUPTED:
return
was_interrupted = False
for r in leftover:
r.reset()
run_it(r)
if self.state == states.INTERRUPTED:
was_interrupted = True
break
if not was_interrupted:
# Only gets here if everything went successfully.
self._change_state(context, states.SUCCESS)
self._leftoff_at = None
@decorators.locked
def reset(self):
super(Flow, self).reset()
self.results = {}
self.resumer = None
self._accumulator.reset()
self._reset_internals()
@decorators.locked
def rollback(self, context, cause):
# Performs basic task by task rollback by going through the reverse
# order that tasks have finished and asking said task to undo whatever
# it has done. If this flow has any parent flows then they will
# also be called to rollback any tasks said parents contain.
#
# Note(harlowja): if a flow can more simply revert a whole set of
# tasks via a simpler command then it can override this method to
# accomplish that.
#
# For example, if each task was creating a file in a directory, then
# it's easier to just remove the directory than to ask each task to
# delete its file individually.
self._change_state(context, states.REVERTING)
try:
self._accumulator.rollback(cause)
finally:
self._change_state(context, states.FAILURE)
# Rollback any parents flows if they exist...
for p in self.parents:
p.rollback(context, cause)

40
cinder/taskflow/states.py Normal file
View File

@ -0,0 +1,40 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# Job states.
CLAIMED = 'CLAIMED'
FAILURE = 'FAILURE'
PENDING = 'PENDING'
RUNNING = 'RUNNING'
SUCCESS = 'SUCCESS'
UNCLAIMED = 'UNCLAIMED'
# Flow states.
FAILURE = FAILURE
INTERRUPTED = 'INTERRUPTED'
PENDING = 'PENDING'
RESUMING = 'RESUMING'
REVERTING = 'REVERTING'
RUNNING = RUNNING
STARTED = 'STARTED'
SUCCESS = SUCCESS
# Task states.
FAILURE = FAILURE
STARTED = STARTED
SUCCESS = SUCCESS

66
cinder/taskflow/task.py Normal file
View File

@ -0,0 +1,66 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
from cinder.taskflow import utils
class Task(object):
"""An abstraction that defines a potential piece of work that can be
applied and can be reverted to undo the work as a single unit.
"""
__metaclass__ = abc.ABCMeta
def __init__(self, name):
self.name = name
# An *immutable* input 'resource' name set this task depends
# on existing before this task can be applied.
self.requires = set()
# An *immutable* input 'resource' name set this task would like to
# depends on existing before this task can be applied (but does not
# strongly depend on existing).
self.optional = set()
# An *immutable* output 'resource' name set this task
# produces that other tasks may depend on this task providing.
self.provides = set()
# This identifies the version of the task to be ran which
# can be useful in resuming older versions of tasks. Standard
# major, minor version semantics apply.
self.version = (1, 0)
def __str__(self):
return "%s==%s" % (self.name, utils.join(self.version, with_what="."))
@abc.abstractmethod
def __call__(self, context, *args, **kwargs):
"""Activate a given task which will perform some operation and return.
This method can be used to apply some given context and given set
of args and kwargs to accomplish some goal. Note that the result
that is returned needs to be serializable so that it can be passed
back into this task if reverting is triggered.
"""
raise NotImplementedError()
def revert(self, context, result, cause):
"""Revert this task using the given context, result that the apply
provided as well as any information which may have caused
said reversion.
"""
pass

464
cinder/taskflow/utils.py Normal file
View File

@ -0,0 +1,464 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import contextlib
import copy
import logging
import re
import sys
import threading
import time
import types
from cinder.openstack.common import uuidutils
from cinder.taskflow import decorators
LOG = logging.getLogger(__name__)
def get_attr(task, field, default=None):
if decorators.is_decorated(task):
# If its a decorated functor then the attributes will be either
# in the underlying function of the instancemethod or the function
# itself.
task = decorators.extract(task)
return getattr(task, field, default)
def join(itr, with_what=","):
pieces = [str(i) for i in itr]
return with_what.join(pieces)
def get_many_attr(obj, *attrs):
many = []
for a in attrs:
many.append(get_attr(obj, a, None))
return many
def get_task_version(task):
"""Gets a tasks *string* version, whether it is a task object/function."""
task_version = get_attr(task, 'version')
if isinstance(task_version, (list, tuple)):
task_version = join(task_version, with_what=".")
if task_version is not None and not isinstance(task_version, basestring):
task_version = str(task_version)
return task_version
def get_task_name(task):
"""Gets a tasks *string* name, whether it is a task object/function."""
task_name = ""
if isinstance(task, (types.MethodType, types.FunctionType)):
# If its a function look for the attributes that should have been
# set using the task() decorator provided in the decorators file. If
# those have not been set, then we should at least have enough basic
# information (not a version) to form a useful task name.
task_name = get_attr(task, 'name')
if not task_name:
name_pieces = [a for a in get_many_attr(task,
'__module__',
'__name__')
if a is not None]
task_name = join(name_pieces, ".")
else:
task_name = str(task)
return task_name
def is_version_compatible(version_1, version_2):
"""Checks for major version compatibility of two *string" versions."""
if version_1 == version_2:
# Equivalent exactly, so skip the rest.
return True
def _convert_to_pieces(version):
try:
pieces = []
for p in version.split("."):
p = p.strip()
if not len(p):
pieces.append(0)
continue
# Clean off things like 1alpha, or 2b and just select the
# digit that starts that entry instead.
p_match = re.match(r"(\d+)([A-Za-z]*)(.*)", p)
if p_match:
p = p_match.group(1)
pieces.append(int(p))
except (AttributeError, TypeError, ValueError):
pieces = []
return pieces
version_1_pieces = _convert_to_pieces(version_1)
version_2_pieces = _convert_to_pieces(version_2)
if len(version_1_pieces) == 0 or len(version_2_pieces) == 0:
return False
# Ensure major version compatibility to start.
major1 = version_1_pieces[0]
major2 = version_2_pieces[0]
if major1 != major2:
return False
return True
def await(check_functor, timeout=None):
if timeout is not None:
end_time = time.time() + max(0, timeout)
else:
end_time = None
# Use the same/similar scheme that the python condition class uses.
delay = 0.0005
while not check_functor():
time.sleep(delay)
if end_time is not None:
remaining = end_time - time.time()
if remaining <= 0:
return False
delay = min(delay * 2, remaining, 0.05)
else:
delay = min(delay * 2, 0.05)
return True
class LastFedIter(object):
"""An iterator which yields back the first item and then yields back
results from the provided iterator.
"""
def __init__(self, first, rest_itr):
self.first = first
self.rest_itr = rest_itr
def __iter__(self):
yield self.first
for i in self.rest_itr:
yield i
class FlowFailure(object):
"""When a task failure occurs the following object will be given to revert
and can be used to interrogate what caused the failure.
"""
def __init__(self, runner, flow, exception):
self.runner = runner
self.flow = flow
self.exc = exception
self.exc_info = sys.exc_info()
class RollbackTask(object):
"""A helper task that on being called will call the underlying callable
tasks revert method (if said method exists).
"""
def __init__(self, context, task, result):
self.task = task
self.result = result
self.context = context
def __str__(self):
return str(self.task)
def __call__(self, cause):
if ((hasattr(self.task, "revert") and
isinstance(self.task.revert, collections.Callable))):
self.task.revert(self.context, self.result, cause)
class Runner(object):
"""A helper class that wraps a task and can find the needed inputs for
the task to run, as well as providing a uuid and other useful functionality
for users of the task.
TODO(harlowja): replace with the task details object or a subclass of
that???
"""
def __init__(self, task, uuid=None):
assert isinstance(task, collections.Callable)
self.task = task
self.providers = {}
self.runs_before = []
self.result = None
if not uuid:
self._id = uuidutils.generate_uuid()
else:
self._id = str(uuid)
@property
def uuid(self):
return "r-%s" % (self._id)
@property
def requires(self):
return set(get_attr(self.task, 'requires', []))
@property
def provides(self):
return set(get_attr(self.task, 'provides', []))
@property
def optional(self):
return set(get_attr(self.task, 'optional', []))
@property
def version(self):
return get_task_version(self.task)
@property
def name(self):
return get_task_name(self.task)
def reset(self):
self.result = None
def __str__(self):
lines = ["Runner: %s" % (self.name)]
lines.append("%s" % (self.uuid))
lines.append("%s" % (self.version))
return "; ".join(lines)
def __call__(self, *args, **kwargs):
# Find all of our inputs first.
kwargs = dict(kwargs)
for (k, who_made) in self.providers.iteritems():
if who_made.result and k in who_made.result:
kwargs[k] = who_made.result[k]
else:
kwargs[k] = None
optional_keys = self.optional
optional_missing_keys = optional_keys - set(kwargs.keys())
if optional_missing_keys:
for k in optional_missing_keys:
for r in self.runs_before:
r_provides = r.provides
if k in r_provides and r.result and k in r.result:
kwargs[k] = r.result[k]
break
# And now finally run.
self.result = self.task(*args, **kwargs)
return self.result
class TransitionNotifier(object):
"""A utility helper class that can be used to subscribe to
notifications of events occuring as well as allow a entity to post said
notifications to subscribers.
"""
RESERVED_KEYS = ('details',)
ANY = '*'
def __init__(self):
self._listeners = collections.defaultdict(list)
def reset(self):
self._listeners = collections.defaultdict(list)
def notify(self, state, details):
listeners = list(self._listeners.get(self.ANY, []))
for i in self._listeners[state]:
if i not in listeners:
listeners.append(i)
if not listeners:
return
for (callback, args, kwargs) in listeners:
if args is None:
args = []
if kwargs is None:
kwargs = {}
kwargs['details'] = details
try:
callback(state, *args, **kwargs)
except Exception:
LOG.exception(("Failure calling callback %s to notify about"
" state transition %s"), callback, state)
def register(self, state, callback, args=None, kwargs=None):
assert isinstance(callback, collections.Callable)
for i, (cb, args, kwargs) in enumerate(self._listeners.get(state, [])):
if cb is callback:
raise ValueError("Callback %s already registered" % (callback))
if kwargs:
for k in self.RESERVED_KEYS:
if k in kwargs:
raise KeyError(("Reserved key '%s' not allowed in "
"kwargs") % k)
kwargs = copy.copy(kwargs)
if args:
args = copy.copy(args)
self._listeners[state].append((callback, args, kwargs))
def deregister(self, state, callback):
if state not in self._listeners:
return
for i, (cb, args, kwargs) in enumerate(self._listeners[state]):
if cb is callback:
self._listeners[state].pop(i)
break
class RollbackAccumulator(object):
"""A utility class that can help in organizing 'undo' like code
so that said code be rolled back on failure (automatically or manually)
by activating rollback callables that were inserted during said codes
progression.
"""
def __init__(self):
self._rollbacks = []
def add(self, *callables):
self._rollbacks.extend(callables)
def reset(self):
self._rollbacks = []
def __len__(self):
return len(self._rollbacks)
def __iter__(self):
# Rollbacks happen in the reverse order that they were added.
return reversed(self._rollbacks)
def __enter__(self):
return self
def rollback(self, cause):
LOG.warn("Activating %s rollbacks due to %s.", len(self), cause)
for (i, f) in enumerate(self):
LOG.debug("Calling rollback %s: %s", i + 1, f)
try:
f(cause)
except Exception:
LOG.exception(("Failed rolling back %s: %s due "
"to inner exception."), i + 1, f)
def __exit__(self, type, value, tb):
if any((value, type, tb)):
self.rollback(value)
class ReaderWriterLock(object):
"""A simple reader-writer lock.
Several readers can hold the lock simultaneously, and only one writer.
Write locks have priority over reads to prevent write starvation.
Public domain @ http://majid.info/blog/a-reader-writer-lock-for-python/
"""
def __init__(self):
self.rwlock = 0
self.writers_waiting = 0
self.monitor = threading.Lock()
self.readers_ok = threading.Condition(self.monitor)
self.writers_ok = threading.Condition(self.monitor)
@contextlib.contextmanager
def acquire(self, read=True):
"""Acquire a read or write lock in a context manager."""
try:
if read:
self.acquire_read()
else:
self.acquire_write()
yield self
finally:
self.release()
def acquire_read(self):
"""Acquire a read lock.
Several threads can hold this typeof lock.
It is exclusive with write locks.
"""
self.monitor.acquire()
while self.rwlock < 0 or self.writers_waiting:
self.readers_ok.wait()
self.rwlock += 1
self.monitor.release()
def acquire_write(self):
"""Acquire a write lock.
Only one thread can hold this lock, and only when no read locks
are also held.
"""
self.monitor.acquire()
while self.rwlock != 0:
self.writers_waiting += 1
self.writers_ok.wait()
self.writers_waiting -= 1
self.rwlock = -1
self.monitor.release()
def release(self):
"""Release a lock, whether read or write."""
self.monitor.acquire()
if self.rwlock < 0:
self.rwlock = 0
else:
self.rwlock -= 1
wake_writers = self.writers_waiting and self.rwlock == 0
wake_readers = self.writers_waiting == 0
self.monitor.release()
if wake_writers:
self.writers_ok.acquire()
self.writers_ok.notify()
self.writers_ok.release()
elif wake_readers:
self.readers_ok.acquire()
self.readers_ok.notifyAll()
self.readers_ok.release()
class LazyPluggable(object):
"""A pluggable backend loaded lazily based on some value."""
def __init__(self, pivot, **backends):
self.__backends = backends
self.__pivot = pivot
self.__backend = None
def __get_backend(self):
if not self.__backend:
backend_name = 'sqlalchemy'
backend = self.__backends[backend_name]
if isinstance(backend, tuple):
name = backend[0]
fromlist = backend[1]
else:
name = backend
fromlist = backend
self.__backend = __import__(name, None, None, fromlist)
return self.__backend
def __getattr__(self, key):
backend = self.__get_backend()
return getattr(backend, key)

View File

@ -49,6 +49,7 @@ from cinder.tests.image import fake as fake_image
from cinder.volume import configuration as conf
from cinder.volume import driver
from cinder.volume.drivers import lvm
from cinder.volume.flows import create_volume
from cinder.volume import rpcapi as volume_rpcapi
from cinder.volume import utils as volutils
@ -151,7 +152,7 @@ class VolumeTestCase(test.TestCase):
'volume_type': None,
'snapshot_id': None,
'user_id': 'fake',
'launched_at': '',
'launched_at': 'DONTCARE',
'size': 0,
}
self.assertDictMatch(msg['payload'], expected)
@ -167,7 +168,7 @@ class VolumeTestCase(test.TestCase):
'volume_type': None,
'snapshot_id': None,
'user_id': 'fake',
'launched_at': '',
'launched_at': 'DONTCARE',
'size': 0,
}
self.assertDictMatch(msg['payload'], expected)
@ -1376,14 +1377,10 @@ class VolumeTestCase(test.TestCase):
def test_create_volume_from_unelevated_context(self):
"""Test context does't change after volume creation failure."""
def fake_create_volume(context, volume_ref, snapshot_ref,
sourcevol_ref, image_service, image_id,
image_location):
def fake_create_volume(*args, **kwargs):
raise exception.CinderException('fake exception')
def fake_reschedule_or_error(context, volume_id, exc_info,
snapshot_id, image_id, request_spec,
filter_properties):
def fake_reschedule_or_error(self, context, *args, **kwargs):
self.assertFalse(context.is_admin)
self.assertFalse('admin' in context.roles)
#compare context passed in with the context we saved
@ -1398,10 +1395,9 @@ class VolumeTestCase(test.TestCase):
#create one copy of context for future comparison
self.saved_ctxt = ctxt.deepcopy()
self.stubs.Set(self.volume, '_reschedule_or_error',
self.stubs.Set(create_volume.OnFailureRescheduleTask, '_reschedule',
fake_reschedule_or_error)
self.stubs.Set(self.volume, '_create_volume',
fake_create_volume)
self.stubs.Set(self.volume.driver, 'create_volume', fake_create_volume)
volume_src = self._create_volume()
self.assertRaises(exception.CinderException,
@ -1490,13 +1486,6 @@ class VolumeTestCase(test.TestCase):
db.volume_update(self.context, src_vref['id'], {'status': 'error'})
raise exception.CinderException('fake exception')
def fake_reschedule_or_error(context, volume_id, exc_info,
snapshot_id, image_id, request_spec,
filter_properties):
pass
self.stubs.Set(self.volume, '_reschedule_or_error',
fake_reschedule_or_error)
self.stubs.Set(self.volume.driver, 'create_cloned_volume',
fake_error_create_cloned_volume)
volume_src = self._create_volume()

View File

@ -86,6 +86,54 @@ def find_config(config_path):
raise exception.ConfigNotFound(path=os.path.abspath(config_path))
def as_int(obj, quiet=True):
# Try "2" -> 2
try:
return int(obj)
except (ValueError, TypeError):
pass
# Try "2.5" -> 2
try:
return int(float(obj))
except (ValueError, TypeError):
pass
# Eck, not sure what this is then.
if not quiet:
raise TypeError(_("Can not translate %s to integer.") % (obj))
return obj
def check_exclusive_options(**kwargs):
"""Checks that only one of the provided options is actually not-none.
Iterates over all the kwargs passed in and checks that only one of said
arguments is not-none, if more than one is not-none then an exception will
be raised with the names of those arguments who were not-none.
"""
if not kwargs:
return
pretty_keys = kwargs.pop("pretty_keys", True)
exclusive_options = {}
for (k, v) in kwargs.iteritems():
if v is not None:
exclusive_options[k] = True
if len(exclusive_options) > 1:
# Change the format of the names from pythonic to
# something that is more readable.
#
# Ex: 'the_key' -> 'the key'
if pretty_keys:
names = [k.replace('_', ' ') for k in kwargs.keys()]
else:
names = kwargs.keys()
names = ", ".join(sorted(names))
msg = (_("May specify only one of %s") % (names))
raise exception.InvalidInput(reason=msg)
def fetchfile(url, target):
LOG.debug(_('Fetching %s') % url)
execute('curl', '--fail', url, '-o', target)

View File

@ -37,9 +37,12 @@ from cinder import quota
from cinder.scheduler import rpcapi as scheduler_rpcapi
from cinder import units
from cinder import utils
from cinder.volume.flows import create_volume
from cinder.volume import rpcapi as volume_rpcapi
from cinder.volume import volume_types
from cinder.taskflow import states
volume_host_opt = cfg.BoolOpt('snapshot_same_host',
default=True,
@ -56,7 +59,6 @@ CONF.register_opt(volume_same_az_opt)
CONF.import_opt('storage_availability_zone', 'cinder.volume.manager')
LOG = logging.getLogger(__name__)
GB = units.GiB
QUOTAS = quota.QUOTAS
@ -95,248 +97,17 @@ class API(base.Base):
self.availability_zone_names = ()
super(API, self).__init__(db_driver)
def create(self, context, size, name, description, snapshot=None,
image_id=None, volume_type=None, metadata=None,
availability_zone=None, source_volume=None,
scheduler_hints=None):
exclusive_options = (snapshot, image_id, source_volume)
exclusive_options_set = sum(1 for option in
exclusive_options if option is not None)
if exclusive_options_set > 1:
msg = (_("May specify only one of snapshot, imageRef "
"or source volume"))
raise exception.InvalidInput(reason=msg)
check_policy(context, 'create')
if snapshot is not None:
if snapshot['status'] != "available":
msg = _("status must be available")
raise exception.InvalidSnapshot(reason=msg)
if not size:
size = snapshot['volume_size']
elif size < snapshot['volume_size']:
msg = _("Volume size cannot be lesser than"
" the Snapshot size")
raise exception.InvalidInput(reason=msg)
snapshot_id = snapshot['id']
else:
snapshot_id = None
if source_volume is not None:
if source_volume['status'] == "error":
msg = _("Unable to clone volumes that are in an error state")
raise exception.InvalidSourceVolume(reason=msg)
if not size:
size = source_volume['size']
else:
if size < source_volume['size']:
msg = _("Clones currently must be "
">= original volume size.")
raise exception.InvalidInput(reason=msg)
source_volid = source_volume['id']
else:
source_volid = None
def as_int(s):
try:
return int(s)
except (ValueError, TypeError):
return s
# tolerate size as stringified int
size = as_int(size)
if not isinstance(size, int) or size <= 0:
msg = (_("Volume size '%s' must be an integer and greater than 0")
% size)
raise exception.InvalidInput(reason=msg)
if (image_id and not (source_volume or snapshot)):
# check image existence
image_meta = self.image_service.show(context, image_id)
image_size_in_gb = (int(image_meta['size']) + GB - 1) / GB
#check image size is not larger than volume size.
if image_size_in_gb > size:
msg = _('Size of specified image is larger than volume size.')
raise exception.InvalidInput(reason=msg)
# Check image minDisk requirement is met for the particular volume
if size < image_meta.get('min_disk', 0):
msg = _('Image minDisk size is larger than the volume size.')
raise exception.InvalidInput(reason=msg)
if availability_zone is None:
if snapshot is not None:
availability_zone = snapshot['volume']['availability_zone']
elif source_volume is not None:
availability_zone = source_volume['availability_zone']
else:
availability_zone = CONF.storage_availability_zone
else:
self._check_availabilty_zone(availability_zone)
if CONF.cloned_volume_same_az:
if (snapshot and
snapshot['volume']['availability_zone'] !=
availability_zone):
msg = _("Volume must be in the same "
"availability zone as the snapshot")
raise exception.InvalidInput(reason=msg)
elif source_volume and \
source_volume['availability_zone'] != availability_zone:
msg = _("Volume must be in the same "
"availability zone as the source volume")
raise exception.InvalidInput(reason=msg)
if not volume_type and not source_volume:
volume_type = volume_types.get_default_volume_type()
if not volume_type and source_volume:
volume_type_id = source_volume['volume_type_id']
else:
volume_type_id = volume_type.get('id')
try:
reserve_opts = {'volumes': 1, 'gigabytes': size}
QUOTAS.add_volume_type_opts(context, reserve_opts, volume_type_id)
reservations = QUOTAS.reserve(context, **reserve_opts)
except exception.OverQuota as e:
overs = e.kwargs['overs']
usages = e.kwargs['usages']
quotas = e.kwargs['quotas']
def _consumed(name):
return (usages[name]['reserved'] + usages[name]['in_use'])
for over in overs:
if 'gigabytes' in over:
msg = _("Quota exceeded for %(s_pid)s, tried to create "
"%(s_size)sG volume (%(d_consumed)dG of "
"%(d_quota)dG already consumed)")
LOG.warn(msg % {'s_pid': context.project_id,
's_size': size,
'd_consumed': _consumed(over),
'd_quota': quotas[over]})
raise exception.VolumeSizeExceedsAvailableQuota()
elif 'volumes' in over:
msg = _("Quota exceeded for %(s_pid)s, tried to create "
"volume (%(d_consumed)d volumes"
"already consumed)")
LOG.warn(msg % {'s_pid': context.project_id,
'd_consumed': _consumed(over)})
raise exception.VolumeLimitExceeded(allowed=quotas[over])
self._check_metadata_properties(context, metadata)
options = {'size': size,
'user_id': context.user_id,
'project_id': context.project_id,
'snapshot_id': snapshot_id,
'availability_zone': availability_zone,
'status': "creating",
'attach_status': "detached",
'display_name': name,
'display_description': description,
'volume_type_id': volume_type_id,
'metadata': metadata,
'source_volid': source_volid}
try:
volume = self.db.volume_create(context, options)
QUOTAS.commit(context, reservations)
except Exception:
with excutils.save_and_reraise_exception():
try:
self.db.volume_destroy(context, volume['id'])
finally:
QUOTAS.rollback(context, reservations)
request_spec = {'volume_properties': options,
'volume_type': volume_type,
'volume_id': volume['id'],
'snapshot_id': volume['snapshot_id'],
'image_id': image_id,
'source_volid': volume['source_volid']}
if scheduler_hints:
filter_properties = {'scheduler_hints': scheduler_hints}
else:
filter_properties = {}
self._cast_create_volume(context, request_spec, filter_properties)
return volume
def _cast_create_volume(self, context, request_spec, filter_properties):
# NOTE(Rongze Zhu): It is a simple solution for bug 1008866
# If snapshot_id is set, make the call create volume directly to
# the volume host where the snapshot resides instead of passing it
# through the scheduler. So snapshot can be copy to new volume.
source_volid = request_spec['source_volid']
volume_id = request_spec['volume_id']
snapshot_id = request_spec['snapshot_id']
image_id = request_spec['image_id']
if snapshot_id and CONF.snapshot_same_host:
snapshot_ref = self.db.snapshot_get(context, snapshot_id)
source_volume_ref = self.db.volume_get(context,
snapshot_ref['volume_id'])
now = timeutils.utcnow()
values = {'host': source_volume_ref['host'], 'scheduled_at': now}
volume_ref = self.db.volume_update(context, volume_id, values)
# bypass scheduler and send request directly to volume
self.volume_rpcapi.create_volume(
context,
volume_ref,
volume_ref['host'],
request_spec=request_spec,
filter_properties=filter_properties,
allow_reschedule=False,
snapshot_id=snapshot_id,
image_id=image_id)
elif source_volid:
source_volume_ref = self.db.volume_get(context,
source_volid)
now = timeutils.utcnow()
values = {'host': source_volume_ref['host'], 'scheduled_at': now}
volume_ref = self.db.volume_update(context, volume_id, values)
# bypass scheduler and send request directly to volume
self.volume_rpcapi.create_volume(
context,
volume_ref,
volume_ref['host'],
request_spec=request_spec,
filter_properties=filter_properties,
allow_reschedule=False,
snapshot_id=snapshot_id,
image_id=image_id,
source_volid=source_volid)
else:
self.scheduler_rpcapi.create_volume(
context,
CONF.volume_topic,
volume_id,
snapshot_id,
image_id,
request_spec=request_spec,
filter_properties=filter_properties)
def _check_availabilty_zone(self, availability_zone):
def _valid_availabilty_zone(self, availability_zone):
#NOTE(bcwaldon): This approach to caching fails to handle the case
# that an availability zone is disabled/removed.
if availability_zone in self.availability_zone_names:
return
return True
if CONF.storage_availability_zone == availability_zone:
return True
azs = self.list_availability_zones()
self.availability_zone_names = [az['name'] for az in azs]
if availability_zone not in self.availability_zone_names:
msg = _("Availability zone is invalid")
LOG.warn(msg)
raise exception.InvalidInput(reason=msg)
return availability_zone in self.availability_zone_names
def list_availability_zones(self):
"""Describe the known availability zones
@ -358,6 +129,56 @@ class API(base.Base):
return tuple(azs)
def create(self, context, size, name, description, snapshot=None,
image_id=None, volume_type=None, metadata=None,
availability_zone=None, source_volume=None,
scheduler_hints=None):
def check_volume_az_zone(availability_zone):
try:
return self._valid_availabilty_zone(availability_zone)
except exception.CinderException:
LOG.exception(_("Unable to query if %s is in the "
"availability zone set"), availability_zone)
return False
create_what = {
'size': size,
'name': name,
'description': description,
'snapshot': snapshot,
'image_id': image_id,
'volume_type': volume_type,
'metadata': metadata,
'availability_zone': availability_zone,
'source_volume': source_volume,
'scheduler_hints': scheduler_hints,
}
(flow, uuid) = create_volume.get_api_flow(self.scheduler_rpcapi,
self.volume_rpcapi,
self.db,
self.image_service,
check_volume_az_zone,
create_what)
assert flow, _('Create volume flow not retrieved')
flow.run(context)
if flow.state != states.SUCCESS:
raise exception.CinderException(_("Failed to successfully complete"
" create volume workflow"))
# Extract the volume information from the task uuid that was specified
# to produce said information.
volume = None
try:
volume = flow.results[uuid]['volume']
except KeyError:
pass
# Raise an error, nobody provided it??
assert volume, _('Expected volume result not found')
return volume
@wrap_check_policy
def delete(self, context, volume, force=False):
if context.is_admin and context.project_id != volume['project_id']:

View File

@ -0,0 +1,17 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

File diff suppressed because it is too large Load Diff

View File

@ -58,9 +58,12 @@ from cinder.openstack.common import uuidutils
from cinder import quota
from cinder import utils
from cinder.volume.configuration import Configuration
from cinder.volume.flows import create_volume
from cinder.volume import rpcapi as volume_rpcapi
from cinder.volume import utils as volume_utils
from cinder.taskflow import states
LOG = logging.getLogger(__name__)
QUOTAS = quota.QUOTAS
@ -167,282 +170,34 @@ class VolumeManager(manager.SchedulerDependentManager):
# collect and publish service capabilities
self.publish_service_capabilities(ctxt)
def _create_volume(self, context, volume_ref, snapshot_ref,
srcvol_ref, image_service, image_id, image_location):
cloned = None
model_update = False
if all(x is None for x in(snapshot_ref, image_id, srcvol_ref)):
model_update = self.driver.create_volume(volume_ref)
elif snapshot_ref is not None:
model_update = self.driver.create_volume_from_snapshot(
volume_ref,
snapshot_ref)
originating_vref = self.db.volume_get(context,
snapshot_ref['volume_id'])
if originating_vref.bootable:
self.db.volume_update(context,
volume_ref['id'],
{'bootable': True})
elif srcvol_ref is not None:
model_update = self.driver.create_cloned_volume(volume_ref,
srcvol_ref)
if srcvol_ref.bootable:
self.db.volume_update(context,
volume_ref['id'],
{'bootable': True})
else:
# create the volume from an image
# NOTE (singn): two params need to be returned
# dict containing provider_location for cloned volume
# and clone status
model_update, cloned = self.driver.clone_image(
volume_ref, image_location)
if not cloned:
model_update = self.driver.create_volume(volume_ref)
updates = dict(model_update or dict(), status='downloading')
volume_ref = self.db.volume_update(context,
volume_ref['id'],
updates)
# TODO(jdg): Wrap this in a try block and update status
# appropriately if the download image fails
self._copy_image_to_volume(context,
volume_ref,
image_service,
image_id)
self.db.volume_update(context,
volume_ref['id'],
{'bootable': True})
return model_update, cloned
def create_volume(self, context, volume_id, request_spec=None,
filter_properties=None, allow_reschedule=True,
snapshot_id=None, image_id=None, source_volid=None):
"""Creates and exports the volume."""
context_saved = context.deepcopy()
context = context.elevated()
if filter_properties is None:
filter_properties = {}
volume_ref = self.db.volume_get(context, volume_id)
self._notify_about_volume_usage(context, volume_ref, "create.start")
# NOTE(vish): so we don't have to get volume from db again
# before passing it to the driver.
volume_ref['host'] = self.host
flow = create_volume.get_manager_flow(
self.db,
self.driver,
self.scheduler_rpcapi,
self.host,
volume_id,
request_spec=request_spec,
filter_properties=filter_properties,
allow_reschedule=allow_reschedule,
snapshot_id=snapshot_id,
image_id=image_id,
source_volid=source_volid,
reschedule_context=context.deepcopy())
if volume_ref['status'] == 'migration_target_creating':
status = 'migration_target'
else:
status = 'available'
model_update = False
image_meta = None
cloned = False
assert flow, _('Manager volume flow not retrieved')
try:
LOG.debug(_("volume %(vol_name)s: creating lv of"
" size %(vol_size)sG"),
{'vol_name': volume_ref['name'],
'vol_size': volume_ref['size']})
snapshot_ref = None
sourcevol_ref = None
image_service = None
image_location = None
image_meta = None
flow.run(context.elevated())
if flow.state != states.SUCCESS:
raise exception.CinderException(_("Failed to successfully complete"
" manager volume workflow"))
if snapshot_id is not None:
LOG.info(_("volume %s: creating from snapshot"),
volume_ref['name'])
snapshot_ref = self.db.snapshot_get(context, snapshot_id)
elif source_volid is not None:
LOG.info(_("volume %s: creating from existing volume"),
volume_ref['name'])
sourcevol_ref = self.db.volume_get(context, source_volid)
elif image_id is not None:
LOG.info(_("volume %s: creating from image"),
volume_ref['name'])
# create the volume from an image
image_service, image_id = \
glance.get_remote_image_service(context,
image_id)
image_location = image_service.get_location(context, image_id)
image_meta = image_service.show(context, image_id)
else:
LOG.info(_("volume %s: creating"), volume_ref['name'])
try:
model_update, cloned = self._create_volume(context,
volume_ref,
snapshot_ref,
sourcevol_ref,
image_service,
image_id,
image_location)
except exception.ImageCopyFailure as ex:
LOG.error(_('Setting volume: %s status to error '
'after failed image copy.'), volume_ref['id'])
self.db.volume_update(context,
volume_ref['id'],
{'status': 'error'})
return
except Exception:
exc_info = sys.exc_info()
# restore source volume status before reschedule
# FIXME(zhiteng) do all the clean-up before reschedule
if sourcevol_ref is not None:
self.db.volume_update(context, sourcevol_ref['id'],
{'status': sourcevol_ref['status']})
rescheduled = False
# try to re-schedule volume:
if allow_reschedule:
rescheduled = self._reschedule_or_error(context_saved,
volume_id,
exc_info,
snapshot_id,
image_id,
request_spec,
filter_properties)
if rescheduled:
LOG.error(_('Unexpected Error: '), exc_info=exc_info)
msg = (_('Creating %(volume_id)s %(snapshot_id)s '
'%(image_id)s was rescheduled due to '
'%(reason)s')
% {'volume_id': volume_id,
'snapshot_id': snapshot_id,
'image_id': image_id,
'reason': unicode(exc_info[1])})
raise exception.CinderException(msg)
else:
# not re-scheduling
raise exc_info[0], exc_info[1], exc_info[2]
if model_update:
volume_ref = self.db.volume_update(
context, volume_ref['id'], model_update)
if sourcevol_ref is not None:
self.db.volume_glance_metadata_copy_from_volume_to_volume(
context,
source_volid,
volume_id)
LOG.debug(_("volume %s: creating export"), volume_ref['name'])
model_update = self.driver.create_export(context, volume_ref)
if model_update:
self.db.volume_update(context, volume_ref['id'], model_update)
except Exception:
with excutils.save_and_reraise_exception():
volume_ref['status'] = 'error'
self.db.volume_update(context,
volume_ref['id'],
{'status': volume_ref['status']})
LOG.error(_("volume %s: create failed"), volume_ref['name'])
self._notify_about_volume_usage(context, volume_ref,
"create.end")
if snapshot_id:
# Copy any Glance metadata from the original volume
self.db.volume_glance_metadata_copy_to_volume(context,
volume_ref['id'],
snapshot_id)
if image_id and image_meta:
# Copy all of the Glance image properties to the
# volume_glance_metadata table for future reference.
self.db.volume_glance_metadata_create(context,
volume_ref['id'],
'image_id', image_id)
name = image_meta.get('name', None)
if name:
self.db.volume_glance_metadata_create(context,
volume_ref['id'],
'image_name', name)
# Save some more attributes into the volume metadata
IMAGE_ATTRIBUTES = ['size', 'disk_format',
'container_format', 'checksum',
'min_disk', 'min_ram']
for key in IMAGE_ATTRIBUTES:
value = image_meta.get(key, None)
if value is not None:
self.db.volume_glance_metadata_create(context,
volume_ref['id'],
key, value)
image_properties = image_meta.get('properties', {})
for key, value in image_properties.items():
self.db.volume_glance_metadata_create(context,
volume_ref['id'],
key, value)
now = timeutils.utcnow()
volume_ref['status'] = status
self.db.volume_update(context,
volume_ref['id'],
{'status': volume_ref['status'],
'launched_at': now})
LOG.info(_("volume %s: created successfully"), volume_ref['name'])
self._reset_stats()
self._notify_about_volume_usage(context, volume_ref, "create.end")
return volume_ref['id']
def _reschedule_or_error(self, context, volume_id, exc_info,
snapshot_id, image_id, request_spec,
filter_properties):
"""Try to re-schedule the request."""
rescheduled = False
try:
method_args = (CONF.volume_topic, volume_id, snapshot_id,
image_id, request_spec, filter_properties)
rescheduled = self._reschedule(context, request_spec,
filter_properties, volume_id,
self.scheduler_rpcapi.create_volume,
method_args,
exc_info)
except Exception:
rescheduled = False
LOG.exception(_("volume %s: Error trying to reschedule create"),
volume_id)
return rescheduled
def _reschedule(self, context, request_spec, filter_properties,
volume_id, scheduler_method, method_args,
exc_info=None):
"""Attempt to re-schedule a volume operation."""
retry = filter_properties.get('retry', None)
if not retry:
# no retry information, do not reschedule.
LOG.debug(_("Retry info not present, will not reschedule"))
return
if not request_spec:
LOG.debug(_("No request spec, will not reschedule"))
return
request_spec['volume_id'] = volume_id
LOG.debug(_("volume %(volume_id)s: re-scheduling %(method)s "
"attempt %(num)d") %
{'volume_id': volume_id,
'method': scheduler_method.func_name,
'num': retry['num_attempts']})
# reset the volume state:
now = timeutils.utcnow()
self.db.volume_update(context, volume_id,
{'status': 'creating',
'scheduled_at': now})
if exc_info:
# stringify to avoid circular ref problem in json serialization:
retry['exc'] = traceback.format_exception(*exc_info)
scheduler_method(context, *method_args)
return True
return volume_id
def delete_volume(self, context, volume_id):
"""Deletes and unexports volume."""
@ -680,28 +435,6 @@ class VolumeManager(manager.SchedulerDependentManager):
volume['name'] not in volume['provider_location']):
self.driver.ensure_export(context, volume)
def _copy_image_to_volume(self, context, volume, image_service, image_id):
"""Downloads Glance image to the specified volume."""
volume_id = volume['id']
try:
self.driver.copy_image_to_volume(context, volume,
image_service,
image_id)
except exception.ProcessExecutionError as ex:
LOG.error(_("Failed to copy image to volume: %(volume_id)s, "
"error: %(error)s") % {'volume_id': volume_id,
'error': ex.stderr})
raise exception.ImageCopyFailure(reason=ex.stderr)
except Exception as ex:
LOG.error(_("Failed to copy image to volume: %(volume_id)s, "
"error: %(error)s") % {'volume_id': volume_id,
'error': ex})
raise exception.ImageCopyFailure(reason=ex)
LOG.info(_("Downloaded image %(image_id)s to %(volume_id)s "
"successfully.") % {'image_id': image_id,
'volume_id': volume_id})
def copy_volume_to_image(self, context, volume_id, image_meta):
"""Uploads the specified volume to Glance.

7
taskflow.conf Normal file
View File

@ -0,0 +1,7 @@
[DEFAULT]
# The list of primitives to copy from taskflow
primitives=flow.linear_flow,task
# The base module to hold the copy of taskflow
base=cinder