Adds generic retries for build failures.

Add a generic scheduler retry for build failures.  Failed
build requests get casted back to scheduler for retry until
success or the maximum number of attempts is reached. The
number of attempts to make is configurable or can be
simply set to 1 to disable retries altogether.

Partially implements blueprint: scheduler-resource-race

DocImpact:

Adds a new capability to filter scheduler to enable retries of
scheduling requests.

1) New flag: scheduler_max_attempts (int) - Number of attempts to make
to schedule an instance before giving up and settting the instance to
error.
2) New RetryFilter.  Avoids re-scheduling to the same host multiple
times. (nova.scheduler.filters.RetryFilter)

Change-Id: I1127caeed4418c75372a42ca7fafacb4f061ffe3
This commit is contained in:
Brian Elliott 2012-06-21 00:44:24 +00:00
parent acb158714c
commit 83fece1aa7
8 changed files with 409 additions and 20 deletions

View File

@ -70,6 +70,7 @@ from nova.openstack.common.notifier import api as notifier
from nova.openstack.common import rpc
from nova.openstack.common.rpc import common as rpc_common
from nova.openstack.common import timeutils
from nova.scheduler import rpcapi as scheduler_rpcapi
from nova import utils
from nova.virt import driver
from nova import volume
@ -259,6 +260,7 @@ class ComputeManager(manager.SchedulerDependentManager):
self._last_info_cache_heal = 0
self.compute_api = compute.API()
self.compute_rpcapi = compute_rpcapi.ComputeAPI()
self.scheduler_rpcapi = scheduler_rpcapi.SchedulerAPI()
super(ComputeManager, self).__init__(service_name="compute",
*args, **kwargs)
@ -470,22 +472,90 @@ class ComputeManager(manager.SchedulerDependentManager):
instance = self._spawn(context, instance, image_meta,
network_info, block_device_info,
injected_files, admin_password)
except exception.InstanceNotFound:
raise # the instance got deleted during the spawn
except Exception:
with excutils.save_and_reraise_exception():
self._deallocate_network(context, instance)
# try to re-schedule instance:
self._reschedule_or_reraise(context, instance,
requested_networks, admin_password, injected_files,
is_first_time, **kwargs)
else:
# Spawn success:
if (is_first_time and not instance['access_ip_v4']
and not instance['access_ip_v6']):
self._update_access_ip(context, instance, network_info)
if (is_first_time and not instance['access_ip_v4']
and not instance['access_ip_v6']):
self._update_access_ip(context, instance, network_info)
self._notify_about_instance_usage(context, instance,
"create.end", network_info=network_info)
self._notify_about_instance_usage(
context, instance, "create.end", network_info=network_info)
except exception.InstanceNotFound:
LOG.warn(_("Instance not found."), instance_uuid=instance_uuid)
except Exception as e:
with excutils.save_and_reraise_exception():
self._set_instance_error_state(context, instance_uuid)
def _reschedule_or_reraise(self, context, instance, *args, **kwargs):
"""Try to re-schedule the build or re-raise the original build error to
error out the instance.
"""
type_, value, tb = sys.exc_info() # save original exception
rescheduled = False
instance_uuid = instance['uuid']
def _log_original_error():
LOG.error(_('Build error: %s') %
traceback.format_exception(type_, value, tb),
instance_uuid=instance_uuid)
try:
self._deallocate_network(context, instance)
except Exception:
# do not attempt retry if network de-allocation occurs:
_log_original_error()
raise
try:
rescheduled = self._reschedule(context, instance_uuid, *args,
**kwargs)
except Exception:
rescheduled = False
LOG.exception(_("Error trying to reschedule"),
instance_uuid=instance_uuid)
if rescheduled:
# log the original build error
_log_original_error()
else:
# not re-scheduling
raise type_, value, tb
def _reschedule(self, context, instance_uuid, requested_networks,
admin_password, injected_files, is_first_time, **kwargs):
filter_properties = kwargs.get('filter_properties', {})
retry = filter_properties.get('retry', None)
if not retry:
# no retry information, do not reschedule.
LOG.debug(_("Retry info not present, will not reschedule"),
instance_uuid=instance_uuid)
return
request_spec = kwargs.get('request_spec', None)
if not request_spec:
LOG.debug(_("No request spec, will not reschedule"),
instance_uuid=instance_uuid)
return
request_spec['num_instances'] = 1
LOG.debug(_("Re-scheduling instance: attempt %d"),
retry['num_attempts'], instance_uuid=instance_uuid)
self.scheduler_rpcapi.run_instance(context, FLAGS.compute_topic,
request_spec, admin_password, injected_files,
requested_networks, is_first_time, filter_properties,
reservations=None, call=False)
return True
@manager.periodic_task
def _check_instance_build_time(self, context):
"""Ensure that instances are not stuck in build."""

View File

@ -44,6 +44,9 @@ scheduler_driver_opts = [
cfg.StrOpt('scheduler_host_manager',
default='nova.scheduler.host_manager.HostManager',
help='The scheduler host manager class to use'),
cfg.IntOpt('scheduler_max_attempts',
default=3,
help='Maximum number of attempts to schedule an instance'),
]
FLAGS = flags.FLAGS

View File

@ -59,7 +59,6 @@ class FilterScheduler(driver.Scheduler):
Returns a list of the instances created.
"""
elevated = context.elevated()
num_instances = request_spec.get('num_instances', 1)
LOG.debug(_("Attempting to build %(num_instances)d instance(s)") %
@ -69,15 +68,16 @@ class FilterScheduler(driver.Scheduler):
notifier.notify(context, notifier.publisher_id("scheduler"),
'scheduler.run_instance.start', notifier.INFO, payload)
filter_properties = kwargs.pop('filter_properties', {})
weighted_hosts = self._schedule(context, "compute", request_spec,
*args, **kwargs)
filter_properties, *args, **kwargs)
if not weighted_hosts:
raise exception.NoValidHost(reason="")
# NOTE(comstud): Make sure we do not pass this through. It
# contains an instance of RpcContext that cannot be serialized.
kwargs.pop('filter_properties', None)
filter_properties.pop('context', None)
instances = []
for num in xrange(num_instances):
@ -86,9 +86,14 @@ class FilterScheduler(driver.Scheduler):
weighted_host = weighted_hosts.pop(0)
request_spec['instance_properties']['launch_index'] = num
instance = self._provision_resource(elevated, weighted_host,
request_spec, reservations,
kwargs)
filter_properties, kwargs)
# scrub retry host list in case we're scheduling multiple
# instances:
retry = filter_properties.get('retry', {})
retry['hosts'] = []
if instance:
instances.append(instance)
@ -120,11 +125,14 @@ class FilterScheduler(driver.Scheduler):
'prep_resize', **kwargs)
def _provision_resource(self, context, weighted_host, request_spec,
reservations, kwargs):
reservations, filter_properties, kwargs):
"""Create the requested resource in this Zone."""
instance = self.create_instance_db_entry(context, request_spec,
reservations)
# Add a retry entry for the selected compute host:
self._add_retry_host(filter_properties, weighted_host.host_state.host)
payload = dict(request_spec=request_spec,
weighted_host=weighted_host.to_dict(),
instance_id=instance['uuid'])
@ -133,14 +141,29 @@ class FilterScheduler(driver.Scheduler):
payload)
driver.cast_to_compute_host(context, weighted_host.host_state.host,
'run_instance', instance_uuid=instance['uuid'], **kwargs)
'run_instance', instance_uuid=instance['uuid'],
request_spec=request_spec, filter_properties=filter_properties,
**kwargs)
inst = driver.encode_instance(instance, local=True)
# So if another instance is created, create_instance_db_entry will
# actually create a new entry, instead of assume it's been created
# already
del request_spec['instance_properties']['uuid']
return inst
def _add_retry_host(self, filter_properties, host):
"""Add a retry entry for the selected computep host. In the event that
the request gets re-scheduled, this entry will signal that the given
host has already been tried.
"""
retry = filter_properties.get('retry', None)
if not retry:
return
hosts = retry['hosts']
hosts.append(host)
def _get_configuration_options(self):
"""Fetch options dictionary. Broken out for testing."""
return self.options.get_configuration()
@ -151,7 +174,41 @@ class FilterScheduler(driver.Scheduler):
"""
pass
def _schedule(self, context, topic, request_spec, *args, **kwargs):
def _max_attempts(self):
max_attempts = FLAGS.scheduler_max_attempts
if max_attempts < 1:
raise exception.NovaException(_("Invalid value for "
"'scheduler_max_attempts', must be >= 1"))
return max_attempts
def _populate_retry(self, filter_properties, instance_properties):
"""Populate filter properties with history of retries for this
request. If maximum retries is exceeded, raise NoValidHost.
"""
max_attempts = self._max_attempts()
retry = filter_properties.pop('retry', {})
if max_attempts == 1:
# re-scheduling is disabled.
return
# retry is enabled, update attempt count:
if retry:
retry['num_attempts'] += 1
else:
retry = {
'num_attempts': 1,
'hosts': [] # list of compute hosts tried
}
filter_properties['retry'] = retry
if retry['num_attempts'] > max_attempts:
uuid = instance_properties.get('uuid', None)
msg = _("Exceeded max scheduling attempts %d ") % max_attempts
raise exception.NoValidHost(msg, instance_uuid=uuid)
def _schedule(self, context, topic, request_spec, filter_properties, *args,
**kwargs):
"""Returns a list of hosts that meet the required specs,
ordered by their fitness.
"""
@ -166,7 +223,9 @@ class FilterScheduler(driver.Scheduler):
cost_functions = self.get_cost_functions()
config_options = self._get_configuration_options()
filter_properties = kwargs.get('filter_properties', {})
# check retry policy:
self._populate_retry(filter_properties, instance_properties)
filter_properties.update({'context': context,
'request_spec': request_spec,
'config_options': config_options,

View File

@ -0,0 +1,42 @@
# Copyright (c) 2012 OpenStack, LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from nova.openstack.common import log as logging
from nova.scheduler import filters
LOG = logging.getLogger(__name__)
class RetryFilter(filters.BaseHostFilter):
"""Filter out hosts that have already been attempted for scheduling
purposes
"""
def host_passes(self, host_state, filter_properties):
"""Skip hosts that have already been attempted"""
retry = filter_properties.get('retry', None)
if not retry:
# Re-scheduling is disabled
LOG.debug("Re-scheduling is disabled")
return True
hosts = retry.get('hosts', [])
host = host_state.host
LOG.debug(_("Previously tried hosts: %(hosts)s. (host=%(host)s)") %
locals())
# Host passes if it's not in the list of previously attempted hosts:
return host not in hosts

View File

@ -44,6 +44,7 @@ host_manager_opts = [
'maps to all filters included with nova.'),
cfg.ListOpt('scheduler_default_filters',
default=[
'RetryFilter',
'AvailabilityZoneFilter',
'RamFilter',
'ComputeFilter'

View File

@ -20,6 +20,7 @@
import copy
import datetime
import functools
import sys
import time
@ -102,6 +103,11 @@ def nop_report_driver_status(self):
pass
class FakeSchedulerAPI(object):
def run_instance(self, *args, **kwargs):
pass
class BaseTestCase(test.TestCase):
def setUp(self):
@ -129,6 +135,9 @@ class BaseTestCase(test.TestCase):
self.stubs.Set(rpc, 'call', rpc_call_wrapper)
self.stubs.Set(rpc, 'cast', rpc_cast_wrapper)
fake_rpcapi = FakeSchedulerAPI()
self.stubs.Set(self.compute, 'scheduler_rpcapi', fake_rpcapi)
def tearDown(self):
fake_image.FakeImageService_reset()
instances = db.instance_get_all(self.context.elevated())
@ -4290,3 +4299,100 @@ class DisabledInstanceTypesTestCase(BaseTestCase):
self.assertNotRaises(exception.FlavorNotFound,
self.compute_api.resize, self.context, instance, None,
exc_msg="Disabled flavors can be migrated to")
class ComputeReschedulingTestCase(BaseTestCase):
"""Tests related to re-scheduling build requests"""
def setUp(self):
super(ComputeReschedulingTestCase, self).setUp()
self._reschedule = self._reschedule_partial()
def _reschedule_partial(self):
uuid = "12-34-56-78-90"
requested_networks = None
admin_password = None
injected_files = None
is_first_time = False
return functools.partial(self.compute._reschedule, self.context, uuid,
requested_networks, admin_password, injected_files,
is_first_time)
def test_reschedule_no_filter_properties(self):
"""no filter_properties will disable re-scheduling"""
self.assertFalse(self._reschedule())
def test_reschedule_no_retry_info(self):
"""no retry info will also disable re-scheduling"""
filter_properties = {}
self.assertFalse(self._reschedule(filter_properties=filter_properties))
def test_reschedule_no_request_spec(self):
"""no request spec will also disable re-scheduling"""
retry = dict(num_attempts=1)
filter_properties = dict(retry=retry)
self.assertFalse(self._reschedule(filter_properties=filter_properties))
def test_reschedule_success(self):
retry = dict(num_attempts=1)
filter_properties = dict(retry=retry)
request_spec = {'num_instances': 42}
self.assertTrue(self._reschedule(filter_properties=filter_properties,
request_spec=request_spec))
self.assertEqual(1, request_spec['num_instances'])
class ThatsNoOrdinaryRabbitException(Exception):
pass
class ComputeReschedulingExceptionTestCase(BaseTestCase):
"""Tests for re-scheduling exception handling logic"""
def setUp(self):
super(ComputeReschedulingExceptionTestCase, self).setUp()
# cause _spawn to raise an exception to test the exception logic:
def exploding_spawn(*args, **kwargs):
raise ThatsNoOrdinaryRabbitException()
self.stubs.Set(self.compute, '_spawn',
exploding_spawn)
self.instance_uuid = self._create_fake_instance()['uuid']
def test_exception_with_rescheduling_disabled(self):
"""Spawn fails and re-scheduling is disabled."""
# this won't be re-scheduled:
self.assertRaises(ThatsNoOrdinaryRabbitException,
self.compute._run_instance, self.context, self.instance_uuid)
def test_exception_with_rescheduling_enabled(self):
"""Spawn fails and re-scheduling is enabled. Original exception
should *not* be re-raised.
"""
# provide the expected status so that this one will be re-scheduled:
retry = dict(num_attempts=1)
filter_properties = dict(retry=retry)
request_spec = dict(num_attempts=1)
self.assertNotRaises(ThatsNoOrdinaryRabbitException,
self.compute._run_instance, self.context, self.instance_uuid,
filter_properties=filter_properties, request_spec=request_spec)
def test_exception_context_cleared(self):
"""Test with no rescheduling and an additional exception occurs
clearing the original build error's exception context.
"""
# clears the original exception context:
class FleshWoundException(Exception):
pass
def reschedule_explode(*args, **kwargs):
raise FleshWoundException()
self.stubs.Set(self.compute, '_reschedule', reschedule_explode)
# the original exception should now be raised:
self.assertRaises(ThatsNoOrdinaryRabbitException,
self.compute._run_instance, self.context, self.instance_uuid)

View File

@ -79,7 +79,7 @@ class FilterSchedulerTestCase(test_scheduler.SchedulerTestCase):
sched = fakes.FakeFilterScheduler()
fake_context = context.RequestContext('user', 'project')
self.assertRaises(NotImplementedError, sched._schedule, fake_context,
"foo", {})
"foo", {}, {})
def test_scheduler_includes_launch_index(self):
ctxt = "fake-context"
@ -111,18 +111,18 @@ class FilterSchedulerTestCase(test_scheduler.SchedulerTestCase):
self.mox.StubOutWithMock(self.driver, '_provision_resource')
self.driver._schedule(context_fake, 'compute',
request_spec, **fake_kwargs
request_spec, {}, **fake_kwargs
).AndReturn(['host1', 'host2'])
# instance 1
self.driver._provision_resource(
ctxt, 'host1',
mox.Func(_has_launch_index(0)), None,
fake_kwargs).AndReturn(instance1)
{}, fake_kwargs).AndReturn(instance1)
# instance 2
self.driver._provision_resource(
ctxt, 'host2',
mox.Func(_has_launch_index(1)), None,
fake_kwargs).AndReturn(instance2)
{}, fake_kwargs).AndReturn(instance2)
self.mox.ReplayAll()
self.driver.schedule_run_instance(context_fake, request_spec, None,
@ -160,7 +160,7 @@ class FilterSchedulerTestCase(test_scheduler.SchedulerTestCase):
'vcpus': 1}}
self.mox.ReplayAll()
weighted_hosts = sched._schedule(fake_context, 'compute',
request_spec)
request_spec, {})
self.assertEquals(len(weighted_hosts), 10)
for weighted_host in weighted_hosts:
self.assertTrue(weighted_host.host_state is not None)
@ -176,3 +176,88 @@ class FilterSchedulerTestCase(test_scheduler.SchedulerTestCase):
hostinfo.update_from_compute_node(dict(memory_mb=1000,
local_gb=0, vcpus=1))
self.assertEquals(1000 - 128, fn(hostinfo, {}))
def test_max_attempts(self):
self.flags(scheduler_max_attempts=4)
sched = fakes.FakeFilterScheduler()
self.assertEqual(4, sched._max_attempts())
def test_invalid_max_attempts(self):
self.flags(scheduler_max_attempts=0)
sched = fakes.FakeFilterScheduler()
self.assertRaises(exception.NovaException, sched._max_attempts)
def test_retry_disabled(self):
"""Retry info should not get populated when re-scheduling is off"""
self.flags(scheduler_max_attempts=1)
sched = fakes.FakeFilterScheduler()
instance_properties = {}
request_spec = dict(instance_properties=instance_properties)
filter_properties = {}
sched._schedule(self.context, 'compute', request_spec,
filter_properties=filter_properties)
# should not have retry info in the populated filter properties:
self.assertFalse("retry" in filter_properties)
def test_retry_attempt_one(self):
"""Test retry logic on initial scheduling attempt"""
self.flags(scheduler_max_attempts=2)
sched = fakes.FakeFilterScheduler()
instance_properties = {}
request_spec = dict(instance_properties=instance_properties)
filter_properties = {}
sched._schedule(self.context, 'compute', request_spec,
filter_properties=filter_properties)
num_attempts = filter_properties['retry']['num_attempts']
self.assertEqual(1, num_attempts)
def test_retry_attempt_two(self):
"""Test retry logic when re-scheduling"""
self.flags(scheduler_max_attempts=2)
sched = fakes.FakeFilterScheduler()
instance_properties = {}
request_spec = dict(instance_properties=instance_properties)
retry = dict(num_attempts=1)
filter_properties = dict(retry=retry)
sched._schedule(self.context, 'compute', request_spec,
filter_properties=filter_properties)
num_attempts = filter_properties['retry']['num_attempts']
self.assertEqual(2, num_attempts)
def test_retry_exceeded_max_attempts(self):
"""Test for necessary explosion when max retries is exceeded"""
self.flags(scheduler_max_attempts=2)
sched = fakes.FakeFilterScheduler()
instance_properties = {}
request_spec = dict(instance_properties=instance_properties)
retry = dict(num_attempts=2)
filter_properties = dict(retry=retry)
self.assertRaises(exception.NoValidHost, sched._schedule, self.context,
'compute', request_spec, filter_properties=filter_properties)
def test_add_retry_host(self):
retry = dict(num_attempts=1, hosts=[])
filter_properties = dict(retry=retry)
host = "fakehost"
sched = fakes.FakeFilterScheduler()
sched._add_retry_host(filter_properties, host)
hosts = filter_properties['retry']['hosts']
self.assertEqual(1, len(hosts))
self.assertEqual(host, hosts[0])

View File

@ -885,3 +885,26 @@ class HostFiltersTestCase(test.TestCase):
host = fakes.FakeHostState('host1', 'compute',
{'capabilities': capabilities, 'service': service})
self.assertFalse(filt_cls.host_passes(host, filter_properties))
def test_retry_filter_disabled(self):
"""Test case where retry/re-scheduling is disabled"""
filt_cls = self.class_map['RetryFilter']()
host = fakes.FakeHostState('host1', 'compute', {})
filter_properties = {}
self.assertTrue(filt_cls.host_passes(host, filter_properties))
def test_retry_filter_pass(self):
"""Host not previously tried"""
filt_cls = self.class_map['RetryFilter']()
host = fakes.FakeHostState('host1', 'compute', {})
retry = dict(num_attempts=1, hosts=['host2', 'host3'])
filter_properties = dict(retry=retry)
self.assertTrue(filt_cls.host_passes(host, filter_properties))
def test_retry_filter_fail(self):
"""Host was already tried"""
filt_cls = self.class_map['RetryFilter']()
host = fakes.FakeHostState('host1', 'compute', {})
retry = dict(num_attempts=1, hosts=['host3', 'host1'])
filter_properties = dict(retry=retry)
self.assertFalse(filt_cls.host_passes(host, filter_properties))