Merge "Rewrite heartbeat runner with event"

This commit is contained in:
Jenkins 2017-03-02 15:04:07 +00:00 committed by Gerrit Code Review
commit 1009c620f4
1 changed files with 21 additions and 28 deletions

View File

@ -163,8 +163,6 @@ class Heart(object):
event_cls=threading.Event): event_cls=threading.Event):
self._thread_cls = thread_cls self._thread_cls = thread_cls
self._dead = event_cls() self._dead = event_cls()
self._finished = event_cls()
self._finished.set()
self._runner = None self._runner = None
self._driver = driver self._driver = driver
self._beats = 0 self._beats = 0
@ -177,38 +175,33 @@ class Heart(object):
def is_alive(self): def is_alive(self):
"""Returns if the heart is beating.""" """Returns if the heart is beating."""
return not (self._runner is None return not (self._runner is None
or not self._runner.is_alive() or not self._runner.is_alive())
or self._finished.is_set())
@excutils.forever_retry_uncaught_exceptions @excutils.forever_retry_uncaught_exceptions
def _beat_forever_until_stopped(self): def _beat_forever_until_stopped(self):
"""Inner beating loop.""" """Inner beating loop."""
try: while not self._dead.is_set():
while not self._dead.is_set(): with timeutils.StopWatch() as w:
with timeutils.StopWatch() as w: wait_until_next_beat = self._driver.heartbeat()
wait_until_next_beat = self._driver.heartbeat() ran_for = w.elapsed()
ran_for = w.elapsed() if ran_for > wait_until_next_beat:
if ran_for > wait_until_next_beat: LOG.warning(
LOG.warning( "Heartbeating took too long to execute (it ran for"
"Heartbeating took too long to execute (it ran for" " %0.2f seconds which is %0.2f seconds longer than"
" %0.2f seconds which is %0.2f seconds longer than" " the next heartbeat idle time). This may cause"
" the next heartbeat idle time). This may cause" " timeouts (in locks, leadership, ...) to"
" timeouts (in locks, leadership, ...) to" " happen (which will not end well).", ran_for,
" happen (which will not end well).", ran_for, ran_for - wait_until_next_beat)
ran_for - wait_until_next_beat) self._beats += 1
self._beats += 1 # NOTE(harlowja): use the event object for waiting and
# NOTE(harlowja): use the event object for waiting and # not a sleep function since doing that will allow this code
# not a sleep function since doing that will allow this code # to terminate early if stopped via the stop() method vs
# to terminate early if stopped via the stop() method vs # having to wait until the sleep function returns.
# having to wait until the sleep function returns. self._dead.wait(wait_until_next_beat)
self._dead.wait(wait_until_next_beat)
finally:
self._finished.set()
def start(self, thread_cls=None): def start(self, thread_cls=None):
"""Starts the heart beating thread (noop if already started).""" """Starts the heart beating thread (noop if already started)."""
if not self.is_alive(): if not self.is_alive():
self._finished.clear()
self._dead.clear() self._dead.clear()
self._beats = 0 self._beats = 0
if thread_cls is None: if thread_cls is None:
@ -223,8 +216,8 @@ class Heart(object):
def wait(self, timeout=None): def wait(self, timeout=None):
"""Wait up to given timeout for the heart beating thread to stop.""" """Wait up to given timeout for the heart beating thread to stop."""
self._finished.wait(timeout) self._runner.join(timeout)
return self._finished.is_set() return self._runner.is_alive()
class CoordinationDriver(object): class CoordinationDriver(object):