Add periodic_fuzzy_delay option.

Fixes bug 962665

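The new option delays the first run of the periodic task scheduler by a
random number of seconds between 0 and periodic_fuzzy_delay (setting it
to 0 disables the delay). This random delay is intended to reduce the
stampeding behavior associated with periodic tasks when compute workers
are restarted in unison across a cluster.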

Change-Id: Ie3771d94af29049061c129b8ea562ee447a61771
This commit is contained in:
Rick Harris 2012-03-22 23:50:36 +00:00
parent 0b10c3f1a0
commit 8b3f327df0
7 changed files with 48 additions and 29 deletions

View File

@ -21,6 +21,7 @@
import inspect
import os
import random
import signal
import eventlet
@ -47,6 +48,11 @@ service_opts = [
cfg.IntOpt('periodic_interval',
default=60,
help='seconds between running periodic tasks'),
cfg.IntOpt('periodic_fuzzy_delay',
default=60,
help='range of seconds to randomly delay when starting the'
' periodic task scheduler to reduce stampeding.'
' (Disable by setting to 0)'),
cfg.StrOpt('ec2_listen',
default="0.0.0.0",
help='IP address for EC2 API to listen'),
@ -151,7 +157,8 @@ class Service(object):
its state to the database services table."""
def __init__(self, host, binary, topic, manager, report_interval=None,
periodic_interval=None, *args, **kwargs):
periodic_interval=None, periodic_fuzzy_delay=None,
*args, **kwargs):
self.host = host
self.binary = binary
self.topic = topic
@ -160,6 +167,7 @@ class Service(object):
self.manager = manager_class(host=self.host, *args, **kwargs)
self.report_interval = report_interval
self.periodic_interval = periodic_interval
self.periodic_fuzzy_delay = periodic_fuzzy_delay
super(Service, self).__init__(*args, **kwargs)
self.saved_args, self.saved_kwargs = args, kwargs
self.timers = []
@ -200,12 +208,19 @@ class Service(object):
if self.report_interval:
pulse = utils.LoopingCall(self.report_state)
pulse.start(interval=self.report_interval, now=False)
pulse.start(interval=self.report_interval,
initial_delay=self.report_interval)
self.timers.append(pulse)
if self.periodic_interval:
if self.periodic_fuzzy_delay:
initial_delay = random.randint(0, self.periodic_fuzzy_delay)
else:
initial_delay = None
periodic = utils.LoopingCall(self.periodic_tasks)
periodic.start(interval=self.periodic_interval, now=False)
periodic.start(interval=self.periodic_interval,
initial_delay=initial_delay)
self.timers.append(periodic)
def _create_service_ref(self, context):
@ -224,7 +239,8 @@ class Service(object):
@classmethod
def create(cls, host=None, binary=None, topic=None, manager=None,
report_interval=None, periodic_interval=None):
report_interval=None, periodic_interval=None,
periodic_fuzzy_delay=None):
"""Instantiates class and passes back application object.
:param host: defaults to FLAGS.host
@ -233,6 +249,7 @@ class Service(object):
:param manager: defaults to FLAGS.<topic>_manager
:param report_interval: defaults to FLAGS.report_interval
:param periodic_interval: defaults to FLAGS.periodic_interval
:param periodic_fuzzy_delay: defaults to FLAGS.periodic_fuzzy_delay
"""
if not host:
@ -243,12 +260,16 @@ class Service(object):
topic = binary.rpartition('nova-')[2]
if not manager:
manager = FLAGS.get('%s_manager' % topic, None)
if not report_interval:
if report_interval is None:
report_interval = FLAGS.report_interval
if not periodic_interval:
if periodic_interval is None:
periodic_interval = FLAGS.periodic_interval
if periodic_fuzzy_delay is None:
periodic_fuzzy_delay = FLAGS.periodic_fuzzy_delay
service_obj = cls(host, binary, topic, manager,
report_interval, periodic_interval)
report_interval=report_interval,
periodic_interval=periodic_interval,
periodic_fuzzy_delay=periodic_fuzzy_delay)
return service_obj

View File

@ -38,14 +38,10 @@ class ServersTest(integrated_helpers._IntegratedTestBase):
return server
def _restart_compute_service(self, periodic_interval=None):
def _restart_compute_service(self, *args, **kwargs):
"""restart compute service. NOTE: fake driver forgets all instances."""
self.compute.kill()
if periodic_interval:
self.compute = self.start_service(
'compute', periodic_interval=periodic_interval)
else:
self.compute = self.start_service('compute')
self.compute = self.start_service('compute', *args, **kwargs)
def test_get_servers(self):
"""Simple check that listing servers works."""
@ -144,7 +140,8 @@ class ServersTest(integrated_helpers._IntegratedTestBase):
self.flags(stub_network=True, reclaim_instance_interval=1)
# enforce periodic tasks run in short time to avoid wait for 60s.
self._restart_compute_service(periodic_interval=0.3)
self._restart_compute_service(
periodic_interval=0.3, periodic_fuzzy_delay=0)
# Create server
server = self._build_minimal_create_server_request()

View File

@ -166,7 +166,7 @@ def stubout_create_vm(stubs):
def stubout_loopingcall_start(stubs):
def fake_start(self, interval, now=True):
def fake_start(self, interval):
self.f(*self.args, **self.kw)
stubs.Set(utils.LoopingCall, 'start', fake_start)

View File

@ -687,13 +687,14 @@ class LoopingCall(object):
self.f = f
self._running = False
def start(self, interval, now=True):
def start(self, interval, initial_delay=None):
self._running = True
done = event.Event()
def _inner():
if not now:
greenthread.sleep(interval)
if initial_delay:
greenthread.sleep(initial_delay)
try:
while self._running:
self.f(*self.args, **self.kw)

View File

@ -192,7 +192,7 @@ class ProxyConnection(driver.ComputeDriver):
LOG.exception(_('_wait_for_reboot failed'))
timer.stop()
timer.f = _wait_for_reboot
return timer.start(interval=0.5, now=True)
return timer.start(interval=0.5)
@exception.wrap_exception
def rescue(self, context, instance, network_info):
@ -225,7 +225,7 @@ class ProxyConnection(driver.ComputeDriver):
LOG.exception(_('_wait_for_rescue failed'))
timer.stop()
timer.f = _wait_for_reboot
return timer.start(interval=0.5, now=True)
return timer.start(interval=0.5)
@exception.wrap_exception
def unrescue(self, instance, network_info):
@ -278,7 +278,7 @@ class ProxyConnection(driver.ComputeDriver):
timer.stop()
timer.f = _wait_for_boot
return timer.start(interval=0.5, now=True)
return timer.start(interval=0.5)
def get_console_output(self, instance):
console_log = os.path.join(FLAGS.instances_path, instance['name'],

View File

@ -429,7 +429,7 @@ class LibvirtConnection(driver.ComputeDriver):
raise utils.LoopingCallDone
timer = utils.LoopingCall(_wait_for_destroy)
timer.start(interval=0.5, now=True)
timer.start(interval=0.5)
try:
self.firewall_driver.unfilter_instance(instance,
@ -726,7 +726,7 @@ class LibvirtConnection(driver.ComputeDriver):
instance=instance)
dom.create()
timer = utils.LoopingCall(self._wait_for_running, instance)
return timer.start(interval=0.5, now=True)
return timer.start(interval=0.5)
greenthread.sleep(1)
return False
@ -768,7 +768,7 @@ class LibvirtConnection(driver.ComputeDriver):
raise utils.LoopingCallDone
timer = utils.LoopingCall(_wait_for_reboot)
return timer.start(interval=0.5, now=True)
return timer.start(interval=0.5)
@exception.wrap_exception()
def pause(self, instance):
@ -912,7 +912,7 @@ class LibvirtConnection(driver.ComputeDriver):
raise utils.LoopingCallDone
timer = utils.LoopingCall(_wait_for_boot)
return timer.start(interval=0.5, now=True)
return timer.start(interval=0.5)
def _flush_libvirt_console(self, pty):
out, err = utils.execute('dd',
@ -2069,7 +2069,7 @@ class LibvirtConnection(driver.ComputeDriver):
post_method(ctxt, instance_ref, dest, block_migration)
timer.f = wait_for_live_migration
timer.start(interval=0.5, now=True)
timer.start(interval=0.5)
def pre_live_migration(self, block_device_info):
"""Preparation live migration.
@ -2415,7 +2415,7 @@ class LibvirtConnection(driver.ComputeDriver):
self.firewall_driver.apply_instance_filter(instance, network_info)
timer = utils.LoopingCall(self._wait_for_running, instance)
return timer.start(interval=0.5, now=True)
return timer.start(interval=0.5)
@exception.wrap_exception()
def finish_revert_migration(self, instance, network_info):
@ -2437,7 +2437,7 @@ class LibvirtConnection(driver.ComputeDriver):
self.firewall_driver.apply_instance_filter(instance, network_info)
timer = utils.LoopingCall(self._wait_for_running, instance)
return timer.start(interval=0.5, now=True)
return timer.start(interval=0.5)
def confirm_migration(self, migration, instance, network_info):
"""Confirms a resize, destroying the source VM"""

View File

@ -374,7 +374,7 @@ class VMWareAPISession(object):
done = event.Event()
loop = utils.LoopingCall(self._poll_task, instance_uuid, task_ref,
done)
loop.start(FLAGS.vmwareapi_task_poll_interval, now=True)
loop.start(FLAGS.vmwareapi_task_poll_interval)
ret_val = done.wait()
loop.stop()
return ret_val