coordination: expose a heartbeat loop method

This allows delegating the heartbeat() loop entirely to Tooz. Tooz has
various pieces of information, such as the time required between
heartbeats, that the consumer application does not possess; without them
it is hard for the consumer to build a reliable loop on its own.

Change-Id: Ib87aae318285cceb91c6df1946e3cddababedf1b
Closes-Bug: #1557593
This commit is contained in:
Julien Danjou 2016-03-16 18:09:56 +01:00 committed by Joshua Harlow
parent 6d6fa0db30
commit 10b9711500
6 changed files with 111 additions and 1 deletions

View File

@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2016 Red Hat, Inc.
# Copyright (C) 2013-2014 eNovance Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -17,6 +18,8 @@
import abc
import collections
import enum
import logging
import threading
from oslo_utils import excutils
from oslo_utils import netutils
@ -26,6 +29,9 @@ from stevedore import driver
import tooz
LOG = logging.getLogger(__name__)
TOOZ_BACKENDS_NAMESPACE = "tooz.backends"
@ -130,6 +136,77 @@ class LeaderElected(Event):
self.member_id = member_id
class Heart(object):
    """Coordination drivers main liveness pump (its heart).

    Calls ``driver.heartbeat()`` over and over from a background daemon
    thread, idling between two beats for the number of seconds the driver
    returned.

    :param driver: object whose ``heartbeat()`` method is called on each
                   beat and returns the number of seconds to idle before
                   the next one
    :param thread_cls: callable used to build the beating thread; it is
                       invoked with a ``target`` keyword argument
    :param event_cls: callable used to build the two internal events
    """

    def __init__(self, driver, thread_cls=threading.Thread,
                 event_cls=threading.Event):
        self._thread_cls = thread_cls
        # Set by stop() to request that the beating loop exits.
        self._dead = event_cls()
        # Set whenever no beating loop is running; a heart that was never
        # started therefore counts as finished.
        self._finished = event_cls()
        self._finished.set()
        self._runner = None
        self._driver = driver
        self._beats = 0

    @property
    def beats(self):
        """How many times the heart has beaten since it was last started."""
        return self._beats

    def is_alive(self):
        """Returns if the heart is beating."""
        return (self._runner is not None
                and self._runner.is_alive()
                and not self._finished.is_set())

    def _beat_forever_until_stopped(self):
        """Thread target: beat until stopped, then flag the heart finished.

        This method is deliberately *not* wrapped by the retry decorator:
        the ``finally`` clause must run only once the retried loop has
        really ended. Were it inside the retried function, a failing
        ``heartbeat()`` would set ``_finished`` and then be retried, leaving
        a thread that still beats while ``is_alive()`` reports False.
        """
        try:
            self._beat_until_dead()
        finally:
            self._finished.set()

    @excutils.forever_retry_uncaught_exceptions
    def _beat_until_dead(self):
        """Inner beating loop (re-entered by the decorator on failure)."""
        while not self._dead.is_set():
            with timeutils.StopWatch() as w:
                wait_until_next_beat = self._driver.heartbeat()
            ran_for = w.elapsed()
            if ran_for > wait_until_next_beat:
                LOG.warning(
                    "Heartbeating took too long to execute (it ran for"
                    " %0.2f seconds which is %0.2f seconds longer than"
                    " the next heartbeat idle time). This may cause"
                    " timeouts (in locks, leadership, ...) to"
                    " happen (which will not end well).", ran_for,
                    ran_for - wait_until_next_beat)
            self._beats += 1
            # NOTE(harlowja): use the event object for waiting and
            # not a sleep function since doing that will allow this code
            # to terminate early if stopped via the stop() method vs
            # having to wait until the sleep function returns.
            self._dead.wait(wait_until_next_beat)

    def start(self, thread_cls=None):
        """Starts the heart beating thread (noop if already started).

        :param thread_cls: optional thread factory used for this start
                           instead of the one given at construction
        """
        if not self.is_alive():
            self._finished.clear()
            self._dead.clear()
            self._beats = 0
            if thread_cls is None:
                thread_cls = self._thread_cls
            self._runner = thread_cls(target=self._beat_forever_until_stopped)
            # Daemon thread: it must never keep the interpreter alive.
            self._runner.daemon = True
            self._runner.start()

    def stop(self):
        """Requests the heart beating thread to stop beating."""
        self._dead.set()

    def wait(self, timeout=None):
        """Wait up to given timeout for the heart beating thread to stop.

        :param timeout: maximum number of seconds to wait (``None`` waits
                        without a limit)
        :return: True if the beating loop has finished, False otherwise
        """
        self._finished.wait(timeout)
        return self._finished.is_set()
@six.add_metaclass(abc.ABCMeta)
class CoordinationDriver(object):
@ -157,6 +234,7 @@ class CoordinationDriver(object):
self.requires_beating = (
CoordinationDriver.heartbeat != self.__class__.heartbeat
)
self.heart = Heart(self)
def _has_hooks_for_group(self, group_id):
return (len(self._hooks_join_group[group_id]) +
@ -283,7 +361,7 @@ class CoordinationDriver(object):
def is_started(self):
return self._started
def start(self):
def start(self, start_heart=False):
"""Start the service engine.
If needed, the establishment of a connection to the servers
@ -293,6 +371,8 @@ class CoordinationDriver(object):
raise ToozError(
"Can not start a driver which has not been stopped")
self._start()
if self.requires_beating and start_heart:
self.heart.start()
self._started = True
def _start(self):
@ -306,6 +386,9 @@ class CoordinationDriver(object):
"""
if not self._started:
raise ToozError("Can not stop a driver which has not been started")
if self.heart.is_alive():
self.heart.stop()
self.heart.wait()
self._stop()
self._started = False
@ -445,6 +528,7 @@ class CoordinationDriver(object):
Method to run once in a while to be sure that the member is not dead
and is still an active member of a group.
:return: The number of seconds to wait before sending a new heartbeat.
"""
pass

View File

@ -214,6 +214,7 @@ class EtcdDriver(coordination.CoordinationDriver):
def heartbeat(self):
    """Heartbeat every lock currently held by this driver.

    :return: the value of ``self.lock_timeout``
    """
    for held_lock in self._acquired_locks:
        held_lock.heartbeat()
    return self.lock_timeout
@staticmethod
def watch_join_group(group_id, callback):

View File

@ -486,6 +486,9 @@ class MemcachedDriver(coordination._RunWatchersMixin,
# Reset the acquired locks
for lock in self._acquired_locks:
lock.heartbeat()
return min(self.membership_timeout,
self.leader_timeout,
self.lock_timeout)
@_translate_failures
def _init_watch_group(self, group_id):

View File

@ -508,6 +508,7 @@ return 1
except coordination.ToozError:
LOG.warning("Unable to heartbeat lock '%s'", lock,
exc_info=True)
return min(self.lock_timeout, self.membership_timeout)
def _stop(self):
while self._acquired_locks:

View File

@ -232,6 +232,7 @@ class BaseZooKeeperDriver(coordination.CoordinationDriver):
coordination.raise_with_cause(coordination.ToozError,
encodeutils.exception_to_unicode(e),
cause=e)
return self.timeout
def leave_group(self, group_id):
member_path = self._path_member(group_id, self._member_id)

View File

@ -337,8 +337,28 @@ class TestAPI(testscenarios.TestWithScenarios,
update_cap.get)
def test_heartbeat(self):
    """Send a single heartbeat, skipping drivers that do not need one."""
    coordinator = self._coord
    if coordinator.requires_beating:
        coordinator.heartbeat()
        return
    raise testcase.TestSkipped(
        "Test not applicable (heartbeating not required)")
def test_heartbeat_loop(self):
    """Start the driver's heart and wait until it has beaten at least once.

    Skipped for drivers that do not require heartbeating. The heart is
    always stopped and waited for, even when the wait is interrupted.
    """
    coordinator = self._coord
    if not coordinator.requires_beating:
        raise testcase.TestSkipped(
            "Test not applicable (heartbeating not required)")
    pump = coordinator.heart
    self.assertFalse(pump.is_alive())
    pump.start()
    try:
        # There is no deadline here: if the heart never beats, this loop
        # only ends when something outside of it times the test out.
        while not pump.beats:
            time.sleep(1)
    finally:
        pump.stop()
        pump.wait()
def test_disconnect_leave_group(self):
member_id_test2 = self._get_random_uuid()
client2 = tooz.coordination.get_coordinator(self.url,