Merge "Adds generic retries for build failures."
This commit is contained in:
@@ -44,6 +44,9 @@ scheduler_driver_opts = [
|
|||||||
cfg.StrOpt('scheduler_host_manager',
|
cfg.StrOpt('scheduler_host_manager',
|
||||||
default='nova.scheduler.host_manager.HostManager',
|
default='nova.scheduler.host_manager.HostManager',
|
||||||
help='The scheduler host manager class to use'),
|
help='The scheduler host manager class to use'),
|
||||||
|
cfg.IntOpt('scheduler_max_attempts',
|
||||||
|
default=3,
|
||||||
|
help='Maximum number of attempts to schedule an instance'),
|
||||||
]
|
]
|
||||||
|
|
||||||
FLAGS = flags.FLAGS
|
FLAGS = flags.FLAGS
|
||||||
|
|||||||
@@ -59,7 +59,6 @@ class FilterScheduler(driver.Scheduler):
|
|||||||
|
|
||||||
Returns a list of the instances created.
|
Returns a list of the instances created.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
elevated = context.elevated()
|
elevated = context.elevated()
|
||||||
num_instances = request_spec.get('num_instances', 1)
|
num_instances = request_spec.get('num_instances', 1)
|
||||||
LOG.debug(_("Attempting to build %(num_instances)d instance(s)") %
|
LOG.debug(_("Attempting to build %(num_instances)d instance(s)") %
|
||||||
@@ -69,15 +68,16 @@ class FilterScheduler(driver.Scheduler):
|
|||||||
notifier.notify(context, notifier.publisher_id("scheduler"),
|
notifier.notify(context, notifier.publisher_id("scheduler"),
|
||||||
'scheduler.run_instance.start', notifier.INFO, payload)
|
'scheduler.run_instance.start', notifier.INFO, payload)
|
||||||
|
|
||||||
|
filter_properties = kwargs.pop('filter_properties', {})
|
||||||
weighted_hosts = self._schedule(context, "compute", request_spec,
|
weighted_hosts = self._schedule(context, "compute", request_spec,
|
||||||
*args, **kwargs)
|
filter_properties, *args, **kwargs)
|
||||||
|
|
||||||
if not weighted_hosts:
|
if not weighted_hosts:
|
||||||
raise exception.NoValidHost(reason="")
|
raise exception.NoValidHost(reason="")
|
||||||
|
|
||||||
# NOTE(comstud): Make sure we do not pass this through. It
|
# NOTE(comstud): Make sure we do not pass this through. It
|
||||||
# contains an instance of RpcContext that cannot be serialized.
|
# contains an instance of RpcContext that cannot be serialized.
|
||||||
kwargs.pop('filter_properties', None)
|
filter_properties.pop('context', None)
|
||||||
|
|
||||||
instances = []
|
instances = []
|
||||||
for num in xrange(num_instances):
|
for num in xrange(num_instances):
|
||||||
@@ -86,9 +86,14 @@ class FilterScheduler(driver.Scheduler):
|
|||||||
weighted_host = weighted_hosts.pop(0)
|
weighted_host = weighted_hosts.pop(0)
|
||||||
|
|
||||||
request_spec['instance_properties']['launch_index'] = num
|
request_spec['instance_properties']['launch_index'] = num
|
||||||
|
|
||||||
instance = self._provision_resource(elevated, weighted_host,
|
instance = self._provision_resource(elevated, weighted_host,
|
||||||
request_spec, reservations,
|
request_spec, reservations,
|
||||||
kwargs)
|
filter_properties, kwargs)
|
||||||
|
# scrub retry host list in case we're scheduling multiple
|
||||||
|
# instances:
|
||||||
|
retry = filter_properties.get('retry', {})
|
||||||
|
retry['hosts'] = []
|
||||||
|
|
||||||
if instance:
|
if instance:
|
||||||
instances.append(instance)
|
instances.append(instance)
|
||||||
@@ -120,11 +125,14 @@ class FilterScheduler(driver.Scheduler):
|
|||||||
'prep_resize', **kwargs)
|
'prep_resize', **kwargs)
|
||||||
|
|
||||||
def _provision_resource(self, context, weighted_host, request_spec,
|
def _provision_resource(self, context, weighted_host, request_spec,
|
||||||
reservations, kwargs):
|
reservations, filter_properties, kwargs):
|
||||||
"""Create the requested resource in this Zone."""
|
"""Create the requested resource in this Zone."""
|
||||||
instance = self.create_instance_db_entry(context, request_spec,
|
instance = self.create_instance_db_entry(context, request_spec,
|
||||||
reservations)
|
reservations)
|
||||||
|
|
||||||
|
# Add a retry entry for the selected compute host:
|
||||||
|
self._add_retry_host(filter_properties, weighted_host.host_state.host)
|
||||||
|
|
||||||
payload = dict(request_spec=request_spec,
|
payload = dict(request_spec=request_spec,
|
||||||
weighted_host=weighted_host.to_dict(),
|
weighted_host=weighted_host.to_dict(),
|
||||||
instance_id=instance['uuid'])
|
instance_id=instance['uuid'])
|
||||||
@@ -133,14 +141,29 @@ class FilterScheduler(driver.Scheduler):
|
|||||||
payload)
|
payload)
|
||||||
|
|
||||||
driver.cast_to_compute_host(context, weighted_host.host_state.host,
|
driver.cast_to_compute_host(context, weighted_host.host_state.host,
|
||||||
'run_instance', instance_uuid=instance['uuid'], **kwargs)
|
'run_instance', instance_uuid=instance['uuid'],
|
||||||
|
request_spec=request_spec, filter_properties=filter_properties,
|
||||||
|
**kwargs)
|
||||||
inst = driver.encode_instance(instance, local=True)
|
inst = driver.encode_instance(instance, local=True)
|
||||||
|
|
||||||
# So if another instance is created, create_instance_db_entry will
|
# So if another instance is created, create_instance_db_entry will
|
||||||
# actually create a new entry, instead of assuming it's been created
|
# actually create a new entry, instead of assuming it's been created
|
||||||
# already
|
# already
|
||||||
del request_spec['instance_properties']['uuid']
|
del request_spec['instance_properties']['uuid']
|
||||||
|
|
||||||
return inst
|
return inst
|
||||||
|
|
||||||
|
def _add_retry_host(self, filter_properties, host):
|
||||||
|
"""Add a retry entry for the selected compute host. In the event that
|
||||||
|
the request gets re-scheduled, this entry will signal that the given
|
||||||
|
host has already been tried.
|
||||||
|
"""
|
||||||
|
retry = filter_properties.get('retry', None)
|
||||||
|
if not retry:
|
||||||
|
return
|
||||||
|
hosts = retry['hosts']
|
||||||
|
hosts.append(host)
|
||||||
|
|
||||||
def _get_configuration_options(self):
|
def _get_configuration_options(self):
|
||||||
"""Fetch options dictionary. Broken out for testing."""
|
"""Fetch options dictionary. Broken out for testing."""
|
||||||
return self.options.get_configuration()
|
return self.options.get_configuration()
|
||||||
@@ -151,7 +174,41 @@ class FilterScheduler(driver.Scheduler):
|
|||||||
"""
|
"""
|
||||||
pass
|
pass
|
||||||
|
|
||||||
def _schedule(self, context, topic, request_spec, *args, **kwargs):
|
def _max_attempts(self):
|
||||||
|
max_attempts = FLAGS.scheduler_max_attempts
|
||||||
|
if max_attempts < 1:
|
||||||
|
raise exception.NovaException(_("Invalid value for "
|
||||||
|
"'scheduler_max_attempts', must be >= 1"))
|
||||||
|
return max_attempts
|
||||||
|
|
||||||
|
def _populate_retry(self, filter_properties, instance_properties):
|
||||||
|
"""Populate filter properties with history of retries for this
|
||||||
|
request. If maximum retries is exceeded, raise NoValidHost.
|
||||||
|
"""
|
||||||
|
max_attempts = self._max_attempts()
|
||||||
|
retry = filter_properties.pop('retry', {})
|
||||||
|
|
||||||
|
if max_attempts == 1:
|
||||||
|
# re-scheduling is disabled.
|
||||||
|
return
|
||||||
|
|
||||||
|
# retry is enabled, update attempt count:
|
||||||
|
if retry:
|
||||||
|
retry['num_attempts'] += 1
|
||||||
|
else:
|
||||||
|
retry = {
|
||||||
|
'num_attempts': 1,
|
||||||
|
'hosts': [] # list of compute hosts tried
|
||||||
|
}
|
||||||
|
filter_properties['retry'] = retry
|
||||||
|
|
||||||
|
if retry['num_attempts'] > max_attempts:
|
||||||
|
uuid = instance_properties.get('uuid', None)
|
||||||
|
msg = _("Exceeded max scheduling attempts %d ") % max_attempts
|
||||||
|
raise exception.NoValidHost(msg, instance_uuid=uuid)
|
||||||
|
|
||||||
|
def _schedule(self, context, topic, request_spec, filter_properties, *args,
|
||||||
|
**kwargs):
|
||||||
"""Returns a list of hosts that meet the required specs,
|
"""Returns a list of hosts that meet the required specs,
|
||||||
ordered by their fitness.
|
ordered by their fitness.
|
||||||
"""
|
"""
|
||||||
@@ -166,7 +223,9 @@ class FilterScheduler(driver.Scheduler):
|
|||||||
cost_functions = self.get_cost_functions()
|
cost_functions = self.get_cost_functions()
|
||||||
config_options = self._get_configuration_options()
|
config_options = self._get_configuration_options()
|
||||||
|
|
||||||
filter_properties = kwargs.get('filter_properties', {})
|
# check retry policy:
|
||||||
|
self._populate_retry(filter_properties, instance_properties)
|
||||||
|
|
||||||
filter_properties.update({'context': context,
|
filter_properties.update({'context': context,
|
||||||
'request_spec': request_spec,
|
'request_spec': request_spec,
|
||||||
'config_options': config_options,
|
'config_options': config_options,
|
||||||
|
|||||||
42
nova/scheduler/filters/retry_filter.py
Normal file
42
nova/scheduler/filters/retry_filter.py
Normal file
@@ -0,0 +1,42 @@
|
|||||||
|
# Copyright (c) 2012 OpenStack, LLC.
|
||||||
|
# All Rights Reserved.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
from nova.openstack.common import log as logging
|
||||||
|
from nova.scheduler import filters
|
||||||
|
|
||||||
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
class RetryFilter(filters.BaseHostFilter):
|
||||||
|
"""Filter out hosts that have already been attempted for scheduling
|
||||||
|
purposes
|
||||||
|
"""
|
||||||
|
|
||||||
|
def host_passes(self, host_state, filter_properties):
|
||||||
|
"""Skip hosts that have already been attempted"""
|
||||||
|
retry = filter_properties.get('retry', None)
|
||||||
|
if not retry:
|
||||||
|
# Re-scheduling is disabled
|
||||||
|
LOG.debug("Re-scheduling is disabled")
|
||||||
|
return True
|
||||||
|
|
||||||
|
hosts = retry.get('hosts', [])
|
||||||
|
host = host_state.host
|
||||||
|
|
||||||
|
LOG.debug(_("Previously tried hosts: %(hosts)s. (host=%(host)s)") %
|
||||||
|
locals())
|
||||||
|
|
||||||
|
# Host passes if it's not in the list of previously attempted hosts:
|
||||||
|
return host not in hosts
|
||||||
@@ -44,6 +44,7 @@ host_manager_opts = [
|
|||||||
'maps to all filters included with nova.'),
|
'maps to all filters included with nova.'),
|
||||||
cfg.ListOpt('scheduler_default_filters',
|
cfg.ListOpt('scheduler_default_filters',
|
||||||
default=[
|
default=[
|
||||||
|
'RetryFilter',
|
||||||
'AvailabilityZoneFilter',
|
'AvailabilityZoneFilter',
|
||||||
'RamFilter',
|
'RamFilter',
|
||||||
'ComputeFilter'
|
'ComputeFilter'
|
||||||
|
|||||||
@@ -79,7 +79,7 @@ class FilterSchedulerTestCase(test_scheduler.SchedulerTestCase):
|
|||||||
sched = fakes.FakeFilterScheduler()
|
sched = fakes.FakeFilterScheduler()
|
||||||
fake_context = context.RequestContext('user', 'project')
|
fake_context = context.RequestContext('user', 'project')
|
||||||
self.assertRaises(NotImplementedError, sched._schedule, fake_context,
|
self.assertRaises(NotImplementedError, sched._schedule, fake_context,
|
||||||
"foo", {})
|
"foo", {}, {})
|
||||||
|
|
||||||
def test_scheduler_includes_launch_index(self):
|
def test_scheduler_includes_launch_index(self):
|
||||||
ctxt = "fake-context"
|
ctxt = "fake-context"
|
||||||
@@ -111,18 +111,18 @@ class FilterSchedulerTestCase(test_scheduler.SchedulerTestCase):
|
|||||||
self.mox.StubOutWithMock(self.driver, '_provision_resource')
|
self.mox.StubOutWithMock(self.driver, '_provision_resource')
|
||||||
|
|
||||||
self.driver._schedule(context_fake, 'compute',
|
self.driver._schedule(context_fake, 'compute',
|
||||||
request_spec, **fake_kwargs
|
request_spec, {}, **fake_kwargs
|
||||||
).AndReturn(['host1', 'host2'])
|
).AndReturn(['host1', 'host2'])
|
||||||
# instance 1
|
# instance 1
|
||||||
self.driver._provision_resource(
|
self.driver._provision_resource(
|
||||||
ctxt, 'host1',
|
ctxt, 'host1',
|
||||||
mox.Func(_has_launch_index(0)), None,
|
mox.Func(_has_launch_index(0)), None,
|
||||||
fake_kwargs).AndReturn(instance1)
|
{}, fake_kwargs).AndReturn(instance1)
|
||||||
# instance 2
|
# instance 2
|
||||||
self.driver._provision_resource(
|
self.driver._provision_resource(
|
||||||
ctxt, 'host2',
|
ctxt, 'host2',
|
||||||
mox.Func(_has_launch_index(1)), None,
|
mox.Func(_has_launch_index(1)), None,
|
||||||
fake_kwargs).AndReturn(instance2)
|
{}, fake_kwargs).AndReturn(instance2)
|
||||||
self.mox.ReplayAll()
|
self.mox.ReplayAll()
|
||||||
|
|
||||||
self.driver.schedule_run_instance(context_fake, request_spec, None,
|
self.driver.schedule_run_instance(context_fake, request_spec, None,
|
||||||
@@ -160,7 +160,7 @@ class FilterSchedulerTestCase(test_scheduler.SchedulerTestCase):
|
|||||||
'vcpus': 1}}
|
'vcpus': 1}}
|
||||||
self.mox.ReplayAll()
|
self.mox.ReplayAll()
|
||||||
weighted_hosts = sched._schedule(fake_context, 'compute',
|
weighted_hosts = sched._schedule(fake_context, 'compute',
|
||||||
request_spec)
|
request_spec, {})
|
||||||
self.assertEquals(len(weighted_hosts), 10)
|
self.assertEquals(len(weighted_hosts), 10)
|
||||||
for weighted_host in weighted_hosts:
|
for weighted_host in weighted_hosts:
|
||||||
self.assertTrue(weighted_host.host_state is not None)
|
self.assertTrue(weighted_host.host_state is not None)
|
||||||
@@ -176,3 +176,88 @@ class FilterSchedulerTestCase(test_scheduler.SchedulerTestCase):
|
|||||||
hostinfo.update_from_compute_node(dict(memory_mb=1000,
|
hostinfo.update_from_compute_node(dict(memory_mb=1000,
|
||||||
local_gb=0, vcpus=1))
|
local_gb=0, vcpus=1))
|
||||||
self.assertEquals(1000 - 128, fn(hostinfo, {}))
|
self.assertEquals(1000 - 128, fn(hostinfo, {}))
|
||||||
|
|
||||||
|
def test_max_attempts(self):
|
||||||
|
self.flags(scheduler_max_attempts=4)
|
||||||
|
|
||||||
|
sched = fakes.FakeFilterScheduler()
|
||||||
|
self.assertEqual(4, sched._max_attempts())
|
||||||
|
|
||||||
|
def test_invalid_max_attempts(self):
|
||||||
|
self.flags(scheduler_max_attempts=0)
|
||||||
|
|
||||||
|
sched = fakes.FakeFilterScheduler()
|
||||||
|
self.assertRaises(exception.NovaException, sched._max_attempts)
|
||||||
|
|
||||||
|
def test_retry_disabled(self):
|
||||||
|
"""Retry info should not get populated when re-scheduling is off"""
|
||||||
|
self.flags(scheduler_max_attempts=1)
|
||||||
|
sched = fakes.FakeFilterScheduler()
|
||||||
|
|
||||||
|
instance_properties = {}
|
||||||
|
request_spec = dict(instance_properties=instance_properties)
|
||||||
|
filter_properties = {}
|
||||||
|
|
||||||
|
sched._schedule(self.context, 'compute', request_spec,
|
||||||
|
filter_properties=filter_properties)
|
||||||
|
|
||||||
|
# should not have retry info in the populated filter properties:
|
||||||
|
self.assertFalse("retry" in filter_properties)
|
||||||
|
|
||||||
|
def test_retry_attempt_one(self):
|
||||||
|
"""Test retry logic on initial scheduling attempt"""
|
||||||
|
self.flags(scheduler_max_attempts=2)
|
||||||
|
sched = fakes.FakeFilterScheduler()
|
||||||
|
|
||||||
|
instance_properties = {}
|
||||||
|
request_spec = dict(instance_properties=instance_properties)
|
||||||
|
filter_properties = {}
|
||||||
|
|
||||||
|
sched._schedule(self.context, 'compute', request_spec,
|
||||||
|
filter_properties=filter_properties)
|
||||||
|
|
||||||
|
num_attempts = filter_properties['retry']['num_attempts']
|
||||||
|
self.assertEqual(1, num_attempts)
|
||||||
|
|
||||||
|
def test_retry_attempt_two(self):
|
||||||
|
"""Test retry logic when re-scheduling"""
|
||||||
|
self.flags(scheduler_max_attempts=2)
|
||||||
|
sched = fakes.FakeFilterScheduler()
|
||||||
|
|
||||||
|
instance_properties = {}
|
||||||
|
request_spec = dict(instance_properties=instance_properties)
|
||||||
|
|
||||||
|
retry = dict(num_attempts=1)
|
||||||
|
filter_properties = dict(retry=retry)
|
||||||
|
|
||||||
|
sched._schedule(self.context, 'compute', request_spec,
|
||||||
|
filter_properties=filter_properties)
|
||||||
|
|
||||||
|
num_attempts = filter_properties['retry']['num_attempts']
|
||||||
|
self.assertEqual(2, num_attempts)
|
||||||
|
|
||||||
|
def test_retry_exceeded_max_attempts(self):
|
||||||
|
"""Test for necessary explosion when max retries is exceeded"""
|
||||||
|
self.flags(scheduler_max_attempts=2)
|
||||||
|
sched = fakes.FakeFilterScheduler()
|
||||||
|
|
||||||
|
instance_properties = {}
|
||||||
|
request_spec = dict(instance_properties=instance_properties)
|
||||||
|
|
||||||
|
retry = dict(num_attempts=2)
|
||||||
|
filter_properties = dict(retry=retry)
|
||||||
|
|
||||||
|
self.assertRaises(exception.NoValidHost, sched._schedule, self.context,
|
||||||
|
'compute', request_spec, filter_properties=filter_properties)
|
||||||
|
|
||||||
|
def test_add_retry_host(self):
|
||||||
|
retry = dict(num_attempts=1, hosts=[])
|
||||||
|
filter_properties = dict(retry=retry)
|
||||||
|
host = "fakehost"
|
||||||
|
|
||||||
|
sched = fakes.FakeFilterScheduler()
|
||||||
|
sched._add_retry_host(filter_properties, host)
|
||||||
|
|
||||||
|
hosts = filter_properties['retry']['hosts']
|
||||||
|
self.assertEqual(1, len(hosts))
|
||||||
|
self.assertEqual(host, hosts[0])
|
||||||
|
|||||||
@@ -885,3 +885,26 @@ class HostFiltersTestCase(test.TestCase):
|
|||||||
host = fakes.FakeHostState('host1', 'compute',
|
host = fakes.FakeHostState('host1', 'compute',
|
||||||
{'capabilities': capabilities, 'service': service})
|
{'capabilities': capabilities, 'service': service})
|
||||||
self.assertFalse(filt_cls.host_passes(host, filter_properties))
|
self.assertFalse(filt_cls.host_passes(host, filter_properties))
|
||||||
|
|
||||||
|
def test_retry_filter_disabled(self):
|
||||||
|
"""Test case where retry/re-scheduling is disabled"""
|
||||||
|
filt_cls = self.class_map['RetryFilter']()
|
||||||
|
host = fakes.FakeHostState('host1', 'compute', {})
|
||||||
|
filter_properties = {}
|
||||||
|
self.assertTrue(filt_cls.host_passes(host, filter_properties))
|
||||||
|
|
||||||
|
def test_retry_filter_pass(self):
|
||||||
|
"""Host not previously tried"""
|
||||||
|
filt_cls = self.class_map['RetryFilter']()
|
||||||
|
host = fakes.FakeHostState('host1', 'compute', {})
|
||||||
|
retry = dict(num_attempts=1, hosts=['host2', 'host3'])
|
||||||
|
filter_properties = dict(retry=retry)
|
||||||
|
self.assertTrue(filt_cls.host_passes(host, filter_properties))
|
||||||
|
|
||||||
|
def test_retry_filter_fail(self):
|
||||||
|
"""Host was already tried"""
|
||||||
|
filt_cls = self.class_map['RetryFilter']()
|
||||||
|
host = fakes.FakeHostState('host1', 'compute', {})
|
||||||
|
retry = dict(num_attempts=1, hosts=['host3', 'host1'])
|
||||||
|
filter_properties = dict(retry=retry)
|
||||||
|
self.assertFalse(filt_cls.host_passes(host, filter_properties))
|
||||||
|
|||||||
Reference in New Issue
Block a user