Merge "Use a task subclass instead of a functor + task wrapper"

Jenkins authored 2014-07-01 05:03:09 +00:00, committed by Gerrit Code Review
commit e8f0cf51fb
1 changed file with 78 additions and 51 deletions

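For readers less familiar with TaskFlow, the title refers to two ways of adding work to a flow: wrapping a plain function in task.FunctorTask versus subclassing a task type and implementing execute(). The sketch below is illustrative only (it uses TaskFlow's generic task.Task rather than Cinder's flow_utils.CinderTask, and the names schedule_it, ScheduleTask and driver_api are made up):

    from taskflow import task


    def schedule_it(context, request_spec):
        # A plain function: anything it needs beyond its arguments has to be
        # closed over or reached through module globals.
        print('scheduling %s' % (request_spec,))

    # Before: the function is wrapped into a task at flow-build time.
    functor_style = task.FunctorTask(schedule_it)


    # After: a reusable task subclass; collaborators are injected through
    # __init__ and execute() names the inputs it needs as arguments.
    class ScheduleTask(task.Task):
        def __init__(self, driver_api, **kwargs):
            super(ScheduleTask, self).__init__(**kwargs)
            self.driver_api = driver_api

        def execute(self, context, request_spec):
            self.driver_api.schedule_create_volume(context, request_spec)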

@@ -12,7 +12,6 @@
 import taskflow.engines
 from taskflow.patterns import linear_flow
-from taskflow import task
 
 from cinder import exception
 from cinder import flow_utils
@@ -35,10 +34,10 @@ class ExtractSchedulerSpecTask(flow_utils.CinderTask):
     default_provides = set(['request_spec'])
 
-    def __init__(self, db, **kwargs):
+    def __init__(self, db_api, **kwargs):
         super(ExtractSchedulerSpecTask, self).__init__(addons=[ACTION],
                                                        **kwargs)
-        self.db = db
+        self.db_api = db_api
 
     def _populate_request_spec(self, context, volume_id, snapshot_id,
                                image_id):
@@ -52,9 +51,9 @@ class ExtractSchedulerSpecTask(flow_utils.CinderTask):
         if not volume_id:
             msg = _("No volume_id provided to populate a request_spec from")
             raise exception.InvalidInput(reason=msg)
-        volume_ref = self.db.volume_get(context, volume_id)
+        volume_ref = self.db_api.volume_get(context, volume_id)
         volume_type_id = volume_ref.get('volume_type_id')
-        vol_type = self.db.volume_type_get(context, volume_type_id)
+        vol_type = self.db_api.volume_type_get(context, volume_type_id)
         return {
             'volume_id': volume_id,
             'snapshot_id': snapshot_id,
@@ -78,7 +77,76 @@ class ExtractSchedulerSpecTask(flow_utils.CinderTask):
         }
 
 
-def get_flow(context, db, driver, request_spec=None,
+class ScheduleCreateVolumeTask(flow_utils.CinderTask):
+    """Activates a scheduler driver and handles any subsequent failures.
+
+    Notification strategy: on failure the scheduler rpc notifier will be
+    activated and a notification will be emitted indicating what errored,
+    the reason, and the request (and misc. other data) that caused the error
+    to be triggered.
+
+    Reversion strategy: N/A
+    """
+    FAILURE_TOPIC = "scheduler.create_volume"
+
+    def __init__(self, db_api, driver_api, **kwargs):
+        super(ScheduleCreateVolumeTask, self).__init__(addons=[ACTION],
+                                                       **kwargs)
+        self.db_api = db_api
+        self.driver_api = driver_api
+
+    def _handle_failure(self, context, request_spec, cause):
+        try:
+            self._notify_failure(context, request_spec, cause)
+        finally:
+            LOG.error(_("Failed to run task %(name)s: %(cause)s") %
+                      {'cause': cause, 'name': self.name})
+
+    def _notify_failure(self, context, request_spec, cause):
+        """When scheduling fails send out an event that it failed."""
+        payload = {
+            'request_spec': request_spec,
+            'volume_properties': request_spec.get('volume_properties', {}),
+            'volume_id': request_spec['volume_id'],
+            'state': 'error',
+            'method': 'create_volume',
+            'reason': cause,
+        }
+        try:
+            rpc.get_notifier('scheduler').error(context, self.FAILURE_TOPIC,
+                                                payload)
+        except exception.CinderException:
+            LOG.exception(_("Failed notifying on %(topic)s "
+                            "payload %(payload)s") %
+                          {'topic': self.FAILURE_TOPIC, 'payload': payload})
+
+    def execute(self, context, request_spec, filter_properties):
+        try:
+            self.driver_api.schedule_create_volume(context, request_spec,
+                                                   filter_properties)
+        except exception.NoValidHost as e:
+            # No valid host was found: notify on the scheduler queue, log
+            # that this happened, set the volume to errored out and
+            # *do not* reraise the error (since what's the point).
+            try:
+                self._handle_failure(context, request_spec, e)
+            finally:
+                common.error_out_volume(context, self.db_api,
+                                        request_spec['volume_id'], reason=e)
+        except Exception as e:
+            # Some other error happened: notify on the scheduler queue, log
+            # that this happened, set the volume to errored out and
+            # *do* reraise the error.
+            with excutils.save_and_reraise_exception():
+                try:
+                    self._handle_failure(context, request_spec, e)
+                finally:
+                    common.error_out_volume(context, self.db_api,
+                                            request_spec['volume_id'],
+                                            reason=e)
+
+
+def get_flow(context, db_api, driver_api, request_spec=None,
              filter_properties=None,
             volume_id=None, snapshot_id=None, image_id=None):
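The execute() added above splits failures into two paths: an expected NoValidHost is notified and swallowed (the volume is simply marked as errored), while any other exception is notified and then re-raised. Stripped of the Cinder-specific pieces, the shape of that strategy looks roughly like the sketch below; notify and mark_errored stand in for _handle_failure and common.error_out_volume, NoValidHostError is a stand-in exception, and save_and_reraise_exception is taken here from oslo_utils, assumed to behave like the excutils the diff imports:

    from oslo_utils import excutils


    class NoValidHostError(Exception):
        pass


    def run_schedule(schedule, notify, mark_errored, request):
        try:
            schedule(request)
        except NoValidHostError as e:
            # Expected outcome: record it and mark the resource as failed,
            # but do not re-raise.
            try:
                notify(request, e)
            finally:
                mark_errored(request, e)
        except Exception as e:
            # Unexpected failure: record it, mark the resource as failed,
            # then let the original exception propagate.
            with excutils.save_and_reraise_exception():
                try:
                    notify(request, e)
                finally:
                    mark_errored(request, e)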
@@ -107,53 +175,12 @@ def get_flow(context, db, driver, request_spec=None,
     # This will extract and clean the spec from the starting values.
     scheduler_flow.add(ExtractSchedulerSpecTask(
-        db,
+        db_api,
         rebind={'request_spec': 'raw_request_spec'}))
 
-    def schedule_create_volume(context, request_spec, filter_properties):
-
-        def _log_failure(cause):
-            LOG.error(_("Failed to schedule_create_volume: %(cause)s") %
-                      {'cause': cause})
-
-        def _notify_failure(cause):
-            """When scheduling fails send out a event that it failed."""
-            topic = "scheduler.create_volume"
-            payload = {
-                'request_spec': request_spec,
-                'volume_properties': request_spec.get('volume_properties', {}),
-                'volume_id': volume_id,
-                'state': 'error',
-                'method': 'create_volume',
-                'reason': cause,
-            }
-            try:
-                rpc.get_notifier('scheduler').error(context, topic, payload)
-            except exception.CinderException:
-                LOG.exception(_("Failed notifying on %(topic)s "
-                                "payload %(payload)s") % {'topic': topic,
-                                                          'payload': payload})
-
-        try:
-            driver.schedule_create_volume(context, request_spec,
-                                          filter_properties)
-        except exception.NoValidHost as e:
-            # Not host found happened, notify on the scheduler queue and log
-            # that this happened and set the volume to errored out and
-            # *do not* reraise the error (since whats the point).
-            _notify_failure(e)
-            _log_failure(e)
-            common.error_out_volume(context, db, volume_id, reason=e)
-        except Exception as e:
-            # Some other error happened, notify on the scheduler queue and log
-            # that this happened and set the volume to errored out and
-            # *do* reraise the error.
-            with excutils.save_and_reraise_exception():
-                _notify_failure(e)
-                _log_failure(e)
-                common.error_out_volume(context, db, volume_id, reason=e)
-
-    scheduler_flow.add(task.FunctorTask(schedule_create_volume))
+    # This will activate the desired scheduler driver (and handle any
+    # driver related failures appropriately).
+    scheduler_flow.add(ScheduleCreateVolumeTask(db_api, driver_api))
 
     # Now load (but do not run) the flow using the provided initial data.
     return taskflow.engines.load(scheduler_flow, store=create_what)
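The rewritten get_flow() assembles a linear flow, adds the two task objects and loads (but does not run) the flow with the initial data in store=create_what. The engine then matches store keys and task-provided values against each execute() signature, with rebind renaming a store key and default_provides naming the published result. A self-contained sketch of that wiring, using illustrative task and key names rather than Cinder's:

    import taskflow.engines
    from taskflow.patterns import linear_flow
    from taskflow import task


    class ExtractSpec(task.Task):
        default_provides = 'request_spec'   # result is stored under this name

        def execute(self, request_spec):
            # Receives the raw spec because of the rebind mapping below.
            return dict(request_spec, validated=True)


    class Schedule(task.Task):
        def execute(self, context, request_spec):
            print('scheduling %s with %s' % (request_spec, context))


    flow = linear_flow.Flow('demo')
    flow.add(ExtractSpec(rebind={'request_spec': 'raw_request_spec'}),
             Schedule())
    engine = taskflow.engines.load(flow, store={
        'context': 'fake-context',
        'raw_request_spec': {'volume_id': 'vol-1'},
    })
    engine.run()

Running it executes ExtractSpec first (it consumes raw_request_spec and publishes request_spec) and then Schedule, which receives the cleaned spec by argument name, the same ordering and injection get_flow() relies on.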