Add waiting for the driver to SchedulerManager
This patch adds _wait_for_scheduler method before serving any request. Method waits till scheduler.is_ready() returns true or CONF.periodic_interval seconds passed from service startup. Change-Id: I9fab9fb076a955a24c1c157229baf027359d9771 Closes-Bug: 1409012
This commit is contained in:
parent
9cf6e694a3
commit
89106c5272
@ -19,6 +19,7 @@
|
||||
Scheduler Service
|
||||
"""
|
||||
|
||||
import eventlet
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
import oslo_messaging as messaging
|
||||
@ -75,11 +76,15 @@ class SchedulerManager(manager.Manager):
|
||||
'combination of filters and weighers.'))
|
||||
self.driver = importutils.import_object(scheduler_driver)
|
||||
super(SchedulerManager, self).__init__(*args, **kwargs)
|
||||
self._startup_delay = True
|
||||
|
||||
def init_host_with_rpc(self):
|
||||
ctxt = context.get_admin_context()
|
||||
self.request_service_capabilities(ctxt)
|
||||
|
||||
eventlet.sleep(CONF.periodic_interval)
|
||||
self._startup_delay = False
|
||||
|
||||
def update_service_capabilities(self, context, service_name=None,
|
||||
host=None, capabilities=None, **kwargs):
|
||||
"""Process a capability update from a service node."""
|
||||
@ -89,10 +94,18 @@ class SchedulerManager(manager.Manager):
|
||||
host,
|
||||
capabilities)
|
||||
|
||||
def _wait_for_scheduler(self):
|
||||
# NOTE(dulek): We're waiting for scheduler to announce that it's ready
|
||||
# or CONF.periodic_interval seconds from service startup has passed.
|
||||
while self._startup_delay and not self.driver.is_ready():
|
||||
eventlet.sleep(1)
|
||||
|
||||
def create_consistencygroup(self, context, topic,
|
||||
group_id,
|
||||
request_spec_list=None,
|
||||
filter_properties_list=None):
|
||||
|
||||
self._wait_for_scheduler()
|
||||
try:
|
||||
self.driver.schedule_create_consistencygroup(
|
||||
context, group_id,
|
||||
@ -117,6 +130,7 @@ class SchedulerManager(manager.Manager):
|
||||
image_id=None, request_spec=None,
|
||||
filter_properties=None):
|
||||
|
||||
self._wait_for_scheduler()
|
||||
try:
|
||||
flow_engine = create_volume.get_flow(context,
|
||||
db, self.driver,
|
||||
@ -142,6 +156,8 @@ class SchedulerManager(manager.Manager):
|
||||
filter_properties=None):
|
||||
"""Ensure that the host exists and can accept the volume."""
|
||||
|
||||
self._wait_for_scheduler()
|
||||
|
||||
def _migrate_volume_set_error(self, context, ex, request_spec):
|
||||
volume_state = {'volume_state': {'migration_status': None}}
|
||||
self._set_volume_state_and_notify('migrate_volume_to_host',
|
||||
@ -173,6 +189,9 @@ class SchedulerManager(manager.Manager):
|
||||
:param request_spec: parameters for this retype request
|
||||
:param filter_properties: parameters to filter by
|
||||
"""
|
||||
|
||||
self._wait_for_scheduler()
|
||||
|
||||
def _retype_volume_set_error(self, context, ex, request_spec,
|
||||
volume_ref, msg, reservations):
|
||||
if reservations:
|
||||
@ -223,6 +242,8 @@ class SchedulerManager(manager.Manager):
|
||||
request_spec, filter_properties=None):
|
||||
"""Ensure that the host exists and can accept the volume."""
|
||||
|
||||
self._wait_for_scheduler()
|
||||
|
||||
def _manage_existing_set_error(self, context, ex, request_spec):
|
||||
volume_state = {'volume_state': {'status': 'error'}}
|
||||
self._set_volume_state_and_notify('manage_existing', volume_state,
|
||||
@ -244,7 +265,12 @@ class SchedulerManager(manager.Manager):
|
||||
request_spec.get('ref'))
|
||||
|
||||
def get_pools(self, context, filters=None):
|
||||
"""Get active pools from scheduler's cache."""
|
||||
"""Get active pools from scheduler's cache.
|
||||
|
||||
NOTE(dulek): There's no self._wait_for_scheduler() because get_pools is
|
||||
an RPC call (is blocking for the c-api). Also this is admin-only API
|
||||
extension so it won't hurt the user much to retry the request manually.
|
||||
"""
|
||||
return self.driver.get_pools(context, filters)
|
||||
|
||||
def _set_volume_state_and_notify(self, method, updates, context, ex,
|
||||
|
@ -22,6 +22,7 @@ import string
|
||||
import uuid
|
||||
|
||||
import fixtures
|
||||
import mock
|
||||
from oslo_log import log as logging
|
||||
|
||||
from cinder import service
|
||||
@ -69,7 +70,10 @@ class _IntegratedTestBase(test.TestCase):
|
||||
|
||||
# set up services
|
||||
self.volume = self.start_service('volume')
|
||||
self.scheduler = self.start_service('scheduler')
|
||||
# NOTE(dulek): Mocking eventlet.sleep so test won't time out on
|
||||
# scheduler service start.
|
||||
with mock.patch('eventlet.sleep'):
|
||||
self.scheduler = self.start_service('scheduler')
|
||||
self._start_api_service()
|
||||
self.addCleanup(self.osapi.stop)
|
||||
|
||||
|
@ -49,6 +49,7 @@ class SchedulerManagerTestCase(test.TestCase):
|
||||
super(SchedulerManagerTestCase, self).setUp()
|
||||
self.flags(scheduler_driver=self.driver_cls_name)
|
||||
self.manager = self.manager_cls()
|
||||
self.manager._startup_delay = False
|
||||
self.context = context.RequestContext('fake_user', 'fake_project')
|
||||
self.topic = 'fake_topic'
|
||||
self.fake_args = (1, 2, 3)
|
||||
@ -59,6 +60,15 @@ class SchedulerManagerTestCase(test.TestCase):
|
||||
manager = self.manager
|
||||
self.assertIsInstance(manager.driver, self.driver_cls)
|
||||
|
||||
@mock.patch('eventlet.sleep')
|
||||
@mock.patch('cinder.volume.rpcapi.VolumeAPI.publish_service_capabilities')
|
||||
def test_init_host_with_rpc(self, publish_capabilities_mock, sleep_mock):
|
||||
self.manager._startup_delay = True
|
||||
self.manager.init_host_with_rpc()
|
||||
publish_capabilities_mock.assert_called_once_with(mock.ANY)
|
||||
sleep_mock.assert_called_once_with(CONF.periodic_interval)
|
||||
self.assertFalse(self.manager._startup_delay)
|
||||
|
||||
@mock.patch('cinder.scheduler.driver.Scheduler.'
|
||||
'update_service_capabilities')
|
||||
def test_update_service_capabilities_empty_dict(self, _mock_update_cap):
|
||||
@ -105,6 +115,65 @@ class SchedulerManagerTestCase(test.TestCase):
|
||||
_mock_sched_create.assert_called_once_with(self.context, request_spec,
|
||||
{})
|
||||
|
||||
@mock.patch('cinder.scheduler.driver.Scheduler.schedule_create_volume')
|
||||
@mock.patch('eventlet.sleep')
|
||||
def test_create_volume_no_delay(self, _mock_sleep, _mock_sched_create):
|
||||
fake_volume_id = 1
|
||||
topic = 'fake_topic'
|
||||
|
||||
request_spec = {'volume_id': fake_volume_id}
|
||||
|
||||
self.manager.create_volume(self.context, topic, fake_volume_id,
|
||||
request_spec=request_spec,
|
||||
filter_properties={})
|
||||
_mock_sched_create.assert_called_once_with(self.context, request_spec,
|
||||
{})
|
||||
self.assertFalse(_mock_sleep.called)
|
||||
|
||||
@mock.patch('cinder.scheduler.driver.Scheduler.schedule_create_volume')
|
||||
@mock.patch('cinder.scheduler.driver.Scheduler.is_ready')
|
||||
@mock.patch('eventlet.sleep')
|
||||
def test_create_volume_delay_scheduled_after_3_tries(self, _mock_sleep,
|
||||
_mock_is_ready,
|
||||
_mock_sched_create):
|
||||
self.manager._startup_delay = True
|
||||
fake_volume_id = 1
|
||||
topic = 'fake_topic'
|
||||
|
||||
request_spec = {'volume_id': fake_volume_id}
|
||||
|
||||
_mock_is_ready.side_effect = [False, False, True]
|
||||
|
||||
self.manager.create_volume(self.context, topic, fake_volume_id,
|
||||
request_spec=request_spec,
|
||||
filter_properties={})
|
||||
_mock_sched_create.assert_called_once_with(self.context, request_spec,
|
||||
{})
|
||||
calls = [mock.call(1)] * 2
|
||||
_mock_sleep.assert_has_calls(calls)
|
||||
self.assertEqual(2, _mock_sleep.call_count)
|
||||
|
||||
@mock.patch('cinder.scheduler.driver.Scheduler.schedule_create_volume')
|
||||
@mock.patch('cinder.scheduler.driver.Scheduler.is_ready')
|
||||
@mock.patch('eventlet.sleep')
|
||||
def test_create_volume_delay_scheduled_in_1_try(self, _mock_sleep,
|
||||
_mock_is_ready,
|
||||
_mock_sched_create):
|
||||
self.manager._startup_delay = True
|
||||
fake_volume_id = 1
|
||||
topic = 'fake_topic'
|
||||
|
||||
request_spec = {'volume_id': fake_volume_id}
|
||||
|
||||
_mock_is_ready.return_value = True
|
||||
|
||||
self.manager.create_volume(self.context, topic, fake_volume_id,
|
||||
request_spec=request_spec,
|
||||
filter_properties={})
|
||||
_mock_sched_create.assert_called_once_with(self.context, request_spec,
|
||||
{})
|
||||
self.assertFalse(_mock_sleep.called)
|
||||
|
||||
@mock.patch('cinder.scheduler.driver.Scheduler.host_passes_filters')
|
||||
@mock.patch('cinder.db.volume_update')
|
||||
def test_migrate_volume_exception_returns_volume_state(
|
||||
|
Loading…
Reference in New Issue
Block a user