Scheduler API clean up and refactor
Unlike the Nova scheduler, which has to serve both compute and volume scheduling, the Cinder scheduler serves only volume scheduling, so there is no need to keep the generic 'schedule' interface. Instead, 'schedule_create_volume' is added (where missing) to the manager/driver classes and to the chance/simple scheduler driver implementations. This patch also changes the interface between the API service and the scheduler so that more information about the volume can be passed in for advanced scheduling, while maintaining backward compatibility, and bumps the scheduler RPC API to version 1.2.

Change-Id: I42be05675cd73f89a03c84105ec512d7ee4f3c3a
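For reference, the request_spec the API service now assembles and ships to the scheduler looks roughly like the sketch below; the field names come from the diff, but the values are illustrative only:

request_spec = {
    'volume_id': 'fake-volume-uuid',        # illustrative value
    'snapshot_id': None,
    'image_id': None,
    'volume_properties': {'size': 10,       # gigabytes
                          'availability_zone': 'nova',  # or 'zone:host'
                          'volume_type_id': None},
    'volume_type': {},                      # contents of the volume type row
}
filter_properties = {}                      # hints for advanced scheduling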
@@ -24,9 +24,13 @@ Chance (Random) Scheduler implementation
 import random
 
 from cinder import exception
+from cinder import flags
 from cinder.scheduler import driver
 
 
+FLAGS = flags.FLAGS
+
+
 class ChanceScheduler(driver.Scheduler):
     """Implements Scheduler as a random node selector."""
 
@@ -54,8 +58,16 @@ class ChanceScheduler(driver.Scheduler):
 
         return hosts[int(random.random() * len(hosts))]
 
-    def schedule(self, context, topic, method, *_args, **kwargs):
+    def schedule_create_volume(self, context, request_spec, filter_properties):
         """Picks a host that is up at random."""
-
-        host = self._schedule(context, topic, None, **kwargs)
-        driver.cast_to_host(context, topic, host, method, **kwargs)
+        topic = FLAGS.volume_topic
+        host = self._schedule(context, topic, request_spec,
+                              filter_properties=filter_properties)
+        volume_id = request_spec['volume_id']
+        snapshot_id = request_spec['snapshot_id']
+        image_id = request_spec['image_id']
+
+        driver.cast_to_host(context, topic, host, 'create_volume',
+                            volume_id=volume_id,
+                            snapshot_id=snapshot_id,
+                            image_id=image_id)
@@ -107,3 +107,7 @@ class Scheduler(object):
     def schedule(self, context, topic, method, *_args, **_kwargs):
         """Must override schedule method for scheduler to work."""
         raise NotImplementedError(_("Must implement a fallback schedule"))
+
+    def schedule_create_volume(self, context, request_spec, filter_properties):
+        """Must override schedule method for scheduler to work."""
+        raise NotImplementedError(_("Must implement schedule_create_volume"))
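With 'schedule' kept only as a fallback stub, a driver written against the new interface just overrides schedule_create_volume. A minimal sketch follows; the StaticScheduler class and its hard-coded host are hypothetical, not part of this patch:

from cinder.scheduler import driver


class StaticScheduler(driver.Scheduler):
    """Toy example: always casts create_volume to one fixed host."""

    def schedule_create_volume(self, context, request_spec, filter_properties):
        host = 'volume-host-1'  # a real driver would pick a host here
        driver.cast_to_volume_host(context, host, 'create_volume',
                                   volume_id=request_spec['volume_id'],
                                   snapshot_id=request_spec['snapshot_id'],
                                   image_id=request_spec['image_id'])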
@@ -24,12 +24,14 @@ Scheduler Service
 
-import functools
-
 from cinder import db
 from cinder import exception
 from cinder import flags
-from cinder.openstack.common import log as logging
 from cinder import manager
 from cinder.openstack.common import cfg
+from cinder.openstack.common import excutils
 from cinder.openstack.common import importutils
+from cinder.openstack.common import log as logging
+from cinder.openstack.common.notifier import api as notifier
 
 
 LOG = logging.getLogger(__name__)
@@ -45,7 +47,7 @@ FLAGS.register_opt(scheduler_driver_opt)
 class SchedulerManager(manager.Manager):
     """Chooses a host to create volumes"""
 
-    RPC_API_VERSION = '1.1'
+    RPC_API_VERSION = '1.2'
 
     def __init__(self, scheduler_driver=None, *args, **kwargs):
         if not scheduler_driver:
@@ -53,13 +55,6 @@ class SchedulerManager(manager.Manager):
         self.driver = importutils.import_object(scheduler_driver)
         super(SchedulerManager, self).__init__(*args, **kwargs)
 
-    def __getattr__(self, key):
-        """Converts all method calls to use the schedule method"""
-        # NOTE(russellb) Because of what this is doing, we must be careful
-        # when changing the API of the scheduler drivers, as that changes
-        # the rpc API as well, and the version should be updated accordingly.
-        return functools.partial(self._schedule, key)
-
     def get_host_list(self, context):
         """Get a list of hosts from the HostManager."""
         return self.driver.get_host_list()
@@ -76,23 +71,59 @@ class SchedulerManager(manager.Manager):
         self.driver.update_service_capabilities(service_name, host,
                                                 capabilities)
 
-    def _schedule(self, method, context, topic, *args, **kwargs):
-        """Tries to call schedule_* method on the driver to retrieve host.
-
-        Falls back to schedule(context, topic) if method doesn't exist.
-        """
-        driver_method_name = 'schedule_%s' % method
-        try:
-            driver_method = getattr(self.driver, driver_method_name)
-            args = (context,) + args
-        except AttributeError, e:
-            LOG.warning(_("Driver Method %(driver_method_name)s missing: "
-                          "%(e)s. Reverting to schedule()") % locals())
-            driver_method = self.driver.schedule
-            args = (context, topic, method) + args
-
-        try:
-            return driver_method(*args, **kwargs)
-        except Exception:
-            with excutils.save_and_reraise_exception():
-                volume_id = kwargs.get('volume_id')
-                db.volume_update(context, volume_id, {'status': 'error'})
+    def create_volume(self, context, topic, volume_id, snapshot_id=None,
+                      image_id=None, request_spec=None,
+                      filter_properties=None):
+        try:
+            if request_spec is None:
+                # For RPC version < 1.2 backward compatibility
+                request_spec = {}
+                volume_ref = db.volume_get(context, volume_id)
+                size = volume_ref.get('size')
+                availability_zone = volume_ref.get('availability_zone')
+                volume_type_id = volume_ref.get('volume_type_id')
+                vol_type = db.volume_type_get(context, volume_type_id)
+                volume_properties = {'size': size,
+                                     'availability_zone': availability_zone,
+                                     'volume_type_id': volume_type_id}
+                request_spec.update({'volume_id': volume_id,
+                                     'snapshot_id': snapshot_id,
+                                     'image_id': image_id,
+                                     'volume_properties': volume_properties,
+                                     'volume_type': dict(vol_type).iteritems()})
+
+            self.driver.schedule_create_volume(context, request_spec,
+                                               filter_properties)
+        except exception.NoValidHost as ex:
+            volume_state = {'volume_state': {'status': 'error'}}
+            self._set_volume_state_and_notify('create_volume',
+                                              volume_state,
+                                              context, ex, request_spec)
+        except Exception as ex:
+            with excutils.save_and_reraise_exception():
+                volume_state = {'volume_state': {'status': 'error'}}
+                self._set_volume_state_and_notify('create_volume',
+                                                  volume_state,
+                                                  context, ex, request_spec)
+
+    def _set_volume_state_and_notify(self, method, updates, context, ex,
+                                     request_spec):
+        LOG.warning(_("Failed to schedule_%(method)s: %(ex)s") % locals())
+
+        volume_state = updates['volume_state']
+        properties = request_spec.get('volume_properties', {})
+
+        volume_id = request_spec.get('volume_id', None)
+
+        if volume_id:
+            db.volume_update(context, volume_id, volume_state)
+
+        payload = dict(request_spec=request_spec,
+                       volume_properties=properties,
+                       volume_id=volume_id,
+                       state=volume_state,
+                       method=method,
+                       reason=ex)
+
+        notifier.notify(context, notifier.publisher_id("scheduler"),
+                        'scheduler.' + method, notifier.ERROR, payload)
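The request_spec-is-None branch above is what keeps RPC 1.1 callers working: an old client casts create_volume without a request_spec, and the manager rebuilds an equivalent one from the database before calling the driver. A sketch of the two call shapes, assuming `manager` is a SchedulerManager instance, `ctxt` is a request context, and the IDs are fake:

# Old 1.1-style call: no request_spec; it is rebuilt via db.volume_get().
manager.create_volume(ctxt, 'cinder-volume', 'vol-uuid',
                      snapshot_id=None, image_id=None)

# New 1.2-style call: the spec travels over RPC, so no extra DB lookup.
manager.create_volume(ctxt, 'cinder-volume', 'vol-uuid',
                      snapshot_id=None, image_id=None,
                      request_spec={'volume_id': 'vol-uuid',
                                    'snapshot_id': None,
                                    'image_id': None,
                                    'volume_properties': {'size': 1}},
                      filter_properties={})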
@@ -32,6 +32,8 @@ class SchedulerAPI(cinder.openstack.common.rpc.proxy.RpcProxy):
 
         1.0 - Initial version.
         1.1 - Add create_volume() method
+        1.2 - Add request_spec, filter_properties arguments
+              to create_volume()
     '''
 
     RPC_API_VERSION = '1.0'
@@ -41,14 +43,16 @@ class SchedulerAPI(cinder.openstack.common.rpc.proxy.RpcProxy):
                                            default_version=self.RPC_API_VERSION)
 
     def create_volume(self, ctxt, topic, volume_id, snapshot_id=None,
-                      image_id=None):
-        return self.cast(ctxt, self.make_msg('create_volume',
-                                             topic=topic,
-                                             volume_id=volume_id,
-                                             snapshot_id=snapshot_id,
-                                             image_id=image_id),
-                         topic=None,
-                         version='1.1')
+                      image_id=None, request_spec=None,
+                      filter_properties=None):
+        return self.cast(ctxt, self.make_msg('create_volume',
+                                             topic=topic,
+                                             volume_id=volume_id,
+                                             snapshot_id=snapshot_id,
+                                             image_id=image_id,
+                                             request_spec=request_spec,
+                                             filter_properties=filter_properties),
+                         version='1.2')
 
     def update_service_capabilities(self, ctxt, service_name, host,
                                     capabilities):
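Note that the class default stays at RPC_API_VERSION = '1.0'; only this cast is tagged '1.2'. From the caller's side the new signature is used like this (a sketch with fake values, assuming a request context `ctxt`):

from cinder.scheduler import rpcapi as scheduler_rpcapi

api = scheduler_rpcapi.SchedulerAPI()
api.create_volume(ctxt, 'cinder-volume', 'vol-uuid',
                  snapshot_id=None, image_id=None,
                  request_spec={'volume_id': 'vol-uuid'},
                  filter_properties={})  # cast goes out with version='1.2'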
@@ -43,22 +43,28 @@ FLAGS.register_opts(simple_scheduler_opts)
 class SimpleScheduler(chance.ChanceScheduler):
     """Implements Naive Scheduler that tries to find least loaded host."""
 
-    def schedule_create_volume(self, context, volume_id, **_kwargs):
+    def schedule_create_volume(self, context, request_spec, filter_properties):
         """Picks a host that is up and has the fewest volumes."""
         elevated = context.elevated()
 
-        volume_ref = db.volume_get(context, volume_id)
-        availability_zone = volume_ref.get('availability_zone')
+        volume_id = request_spec.get('volume_id')
+        snapshot_id = request_spec.get('snapshot_id')
+        image_id = request_spec.get('image_id')
+        volume_properties = request_spec.get('volume_properties')
+        volume_size = volume_properties.get('size')
+        availability_zone = volume_properties.get('availability_zone')
 
         zone, host = None, None
         if availability_zone:
             zone, _x, host = availability_zone.partition(':')
         if host and context.is_admin:
-            service = db.service_get_by_args(elevated, host, 'cinder-volume')
+            topic = FLAGS.volume_topic
+            service = db.service_get_by_args(elevated, host, topic)
             if not utils.service_is_up(service):
                 raise exception.WillNotSchedule(host=host)
             driver.cast_to_volume_host(context, host, 'create_volume',
-                                       volume_id=volume_id, **_kwargs)
+                                       volume_id=volume_id, snapshot_id=snapshot_id,
+                                       image_id=image_id)
             return None
 
         results = db.service_get_all_volume_sorted(elevated)
@@ -67,12 +73,13 @@ class SimpleScheduler(chance.ChanceScheduler):
                        if service['availability_zone'] == zone]
         for result in results:
             (service, volume_gigabytes) = result
-            if volume_gigabytes + volume_ref['size'] > FLAGS.max_gigabytes:
+            if volume_gigabytes + volume_size > FLAGS.max_gigabytes:
                 msg = _("Not enough allocatable volume gigabytes remaining")
                 raise exception.NoValidHost(reason=msg)
             if utils.service_is_up(service) and not service['disabled']:
                 driver.cast_to_volume_host(context, service['host'],
-                                           'create_volume', volume_id=volume_id, **_kwargs)
+                                           'create_volume', volume_id=volume_id,
+                                           snapshot_id=snapshot_id, image_id=image_id)
                 return None
         msg = _("Is the appropriate service running?")
         raise exception.NoValidHost(reason=msg)
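One detail worth noting in the zone handling kept above: str.partition splits on the first colon only, which is how an admin can pin a specific host. For example:

zone, _x, host = 'nova:block1'.partition(':')
# zone == 'nova', host == 'block1'  -> cast directly to that host

zone, _x, host = 'nova'.partition(':')
# zone == 'nova', host == ''        -> fall through to least-loaded search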
@@ -69,8 +69,8 @@ class SchedulerRpcAPITestCase(test.TestCase):
 
     def test_create_volume(self):
         self._test_scheduler_api('create_volume',
-                rpc_method='cast', topic='fake_topic',
-                volume_id='fake_volume_id',
-                snapshot_id='fake_snapshot_id',
-                image_id='fake_image_id',
-                version='1.1')
+                rpc_method='cast', topic='topic', volume_id='volume_id',
+                snapshot_id='snapshot_id', image_id='image_id',
+                request_spec='fake_request_spec',
+                filter_properties='filter_properties',
+                version='1.2')
@@ -22,6 +22,7 @@ Tests For Scheduler
 
 from cinder import context
 from cinder import db
+from cinder import exception
 from cinder import flags
 from cinder.openstack.common import rpc
 from cinder.openstack.common import timeutils
@@ -57,28 +58,6 @@ class SchedulerManagerTestCase(test.TestCase):
         manager = self.manager
         self.assertTrue(isinstance(manager.driver, self.driver_cls))
 
-    def test_get_host_list(self):
-        expected = 'fake_hosts'
-
-        self.mox.StubOutWithMock(self.manager.driver, 'get_host_list')
-        self.manager.driver.get_host_list().AndReturn(expected)
-
-        self.mox.ReplayAll()
-        result = self.manager.get_host_list(self.context)
-        self.assertEqual(result, expected)
-
-    def test_get_service_capabilities(self):
-        expected = 'fake_service_capabs'
-
-        self.mox.StubOutWithMock(self.manager.driver,
-                                 'get_service_capabilities')
-        self.manager.driver.get_service_capabilities().AndReturn(
-            expected)
-
-        self.mox.ReplayAll()
-        result = self.manager.get_service_capabilities(self.context)
-        self.assertEqual(result, expected)
-
     def test_update_service_capabilities(self):
         service_name = 'fake_service'
         host = 'fake_host'
@@ -104,28 +83,26 @@ class SchedulerManagerTestCase(test.TestCase):
                 service_name=service_name, host=host,
                 capabilities=capabilities)
 
-    def test_existing_method(self):
-        def stub_method(self, *args, **kwargs):
-            pass
-        setattr(self.manager.driver, 'schedule_stub_method', stub_method)
-
-        self.mox.StubOutWithMock(self.manager.driver,
-                                 'schedule_stub_method')
-        self.manager.driver.schedule_stub_method(self.context,
-                *self.fake_args, **self.fake_kwargs)
-
-        self.mox.ReplayAll()
-        self.manager.stub_method(self.context, self.topic,
-                *self.fake_args, **self.fake_kwargs)
-
-    def test_missing_method_fallback(self):
-        self.mox.StubOutWithMock(self.manager.driver, 'schedule')
-        self.manager.driver.schedule(self.context, self.topic,
-                'noexist', *self.fake_args, **self.fake_kwargs)
-
-        self.mox.ReplayAll()
-        self.manager.noexist(self.context, self.topic,
-                *self.fake_args, **self.fake_kwargs)
+    def test_create_volume_exception_puts_volume_in_error_state(self):
+        """Test that a NoValidHost exception for create_volume puts
+        the volume in 'error' state and eats the exception.
+        """
+        fake_volume_id = 1
+        self._mox_schedule_method_helper('schedule_create_volume')
+        self.mox.StubOutWithMock(db, 'volume_update')
+
+        topic = 'fake_topic'
+        volume_id = fake_volume_id
+        request_spec = {'volume_id': fake_volume_id}
+
+        self.manager.driver.schedule_create_volume(self.context,
+                request_spec, {}).AndRaise(exception.NoValidHost(reason=""))
+        db.volume_update(self.context, fake_volume_id, {'status': 'error'})
+
+        self.mox.ReplayAll()
+        self.manager.create_volume(self.context, topic, volume_id,
+                                   request_spec=request_spec,
+                                   filter_properties={})
 
     def _mox_schedule_method_helper(self, method_name):
         # Make sure the method exists that we're going to test call
@@ -150,28 +127,6 @@ class SchedulerTestCase(test.TestCase):
         self.context = context.RequestContext('fake_user', 'fake_project')
         self.topic = 'fake_topic'
 
-    def test_get_host_list(self):
-        expected = 'fake_hosts'
-
-        self.mox.StubOutWithMock(self.driver.host_manager, 'get_host_list')
-        self.driver.host_manager.get_host_list().AndReturn(expected)
-
-        self.mox.ReplayAll()
-        result = self.driver.get_host_list()
-        self.assertEqual(result, expected)
-
-    def test_get_service_capabilities(self):
-        expected = 'fake_service_capabs'
-
-        self.mox.StubOutWithMock(self.driver.host_manager,
-                                 'get_service_capabilities')
-        self.driver.host_manager.get_service_capabilities().AndReturn(
-            expected)
-
-        self.mox.ReplayAll()
-        result = self.driver.get_service_capabilities()
-        self.assertEqual(result, expected)
-
     def test_update_service_capabilities(self):
         service_name = 'fake_service'
         host = 'fake_host'
@@ -178,17 +178,30 @@ class API(base.Base):
         finally:
             QUOTAS.rollback(context, reservations)
 
-        self._cast_create_volume(context, volume['id'], snapshot_id,
-                                 image_id)
+        request_spec = {
+            'volume_properties': options,
+            'volume_type': volume_type,
+            'volume_id': volume['id'],
+            'snapshot_id': volume['snapshot_id'],
+            'image_id': image_id
+        }
+
+        filter_properties = {}
+
+        self._cast_create_volume(context, request_spec, filter_properties)
 
         return volume
 
-    def _cast_create_volume(self, context, volume_id, snapshot_id,
-                            image_id):
+    def _cast_create_volume(self, context, request_spec, filter_properties):
 
         # NOTE(Rongze Zhu): It is a simple solution for bug 1008866
         # If snapshot_id is set, make the call create volume directly to
         # the volume host where the snapshot resides instead of passing it
-        # through the scheduer. So snapshot can be copy to new volume.
+        # through the scheduler. So snapshot can be copy to new volume.
+
+        volume_id = request_spec['volume_id']
+        snapshot_id = request_spec['snapshot_id']
+        image_id = request_spec['image_id']
 
         if snapshot_id and FLAGS.snapshot_same_host:
             snapshot_ref = self.db.snapshot_get(context, snapshot_id)
@@ -208,8 +221,10 @@ class API(base.Base):
             self.scheduler_rpcapi.create_volume(context,
                                                 FLAGS.volume_topic,
                                                 volume_id,
-                                                snapshot_id=snapshot_id,
-                                                image_id=image_id)
+                                                snapshot_id,
+                                                image_id,
+                                                request_spec=request_spec,
+                                                filter_properties=filter_properties)
 
     @wrap_check_policy
     def delete(self, context, volume, force=False):