diff --git a/tooz/coordination.py b/tooz/coordination.py index c903fd6e..480fbde0 100644 --- a/tooz/coordination.py +++ b/tooz/coordination.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # +# Copyright (C) 2016 Red Hat, Inc. # Copyright (C) 2013-2014 eNovance Inc. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may @@ -17,6 +18,8 @@ import abc import collections import enum +import logging +import threading from oslo_utils import excutils from oslo_utils import netutils @@ -26,6 +29,9 @@ from stevedore import driver import tooz +LOG = logging.getLogger(__name__) + + TOOZ_BACKENDS_NAMESPACE = "tooz.backends" @@ -130,6 +136,77 @@ class LeaderElected(Event): self.member_id = member_id +class Heart(object): + """Coordination drivers main liveness pump (its heart).""" + + def __init__(self, driver, thread_cls=threading.Thread, + event_cls=threading.Event): + self._thread_cls = thread_cls + self._dead = event_cls() + self._finished = event_cls() + self._finished.set() + self._runner = None + self._driver = driver + self._beats = 0 + + @property + def beats(self): + """How many times the heart has beaten.""" + return self._beats + + def is_alive(self): + """Returns if the heart is beating.""" + return not (self._runner is None + or not self._runner.is_alive() + or self._finished.is_set()) + + @excutils.forever_retry_uncaught_exceptions + def _beat_forever_until_stopped(self): + """Inner beating loop.""" + try: + while not self._dead.is_set(): + with timeutils.StopWatch() as w: + wait_until_next_beat = self._driver.heartbeat() + ran_for = w.elapsed() + if ran_for > wait_until_next_beat: + LOG.warning( + "Heartbeating took too long to execute (it ran for" + " %0.2f seconds which is %0.2f seconds longer than" + " the next heartbeat idle time). This may cause" + " timeouts (in locks, leadership, ...) to" + " happen (which will not end well).", ran_for, + ran_for - wait_until_next_beat) + self._beats += 1 + # NOTE(harlowja): use the event object for waiting and + # not a sleep function since doing that will allow this code + # to terminate early if stopped via the stop() method vs + # having to wait until the sleep function returns. + self._dead.wait(wait_until_next_beat) + finally: + self._finished.set() + + def start(self, thread_cls=None): + """Starts the heart beating thread (noop if already started).""" + if not self.is_alive(): + self._finished.clear() + self._dead.clear() + self._beats = 0 + if thread_cls is None: + thread_cls = self._thread_cls + self._runner = thread_cls(target=self._beat_forever_until_stopped) + self._runner.daemon = True + self._runner.start() + + def stop(self): + """Requests the heart beating thread to stop beating.""" + self._dead.set() + + def wait(self, timeout=None): + """Wait up to given timeout for the heart beating thread to stop.""" + self._finished.wait(timeout) + return self._finished.is_set() + + @six.add_metaclass(abc.ABCMeta) class CoordinationDriver(object): @@ -157,6 +234,7 @@ class CoordinationDriver(object): self.requires_beating = ( CoordinationDriver.heartbeat != self.__class__.heartbeat ) + self.heart = Heart(self) def _has_hooks_for_group(self, group_id): return (len(self._hooks_join_group[group_id]) + @@ -283,7 +361,7 @@ class CoordinationDriver(object): def is_started(self): return self._started - def start(self): + def start(self, start_heart=False): """Start the service engine. If needed, the establishment of a connection to the servers @@ -293,6 +371,8 @@ class CoordinationDriver(object): raise ToozError( "Can not start a driver which has not been stopped") self._start() + if self.requires_beating and start_heart: + self.heart.start() self._started = True def _start(self): @@ -306,6 +386,9 @@ class CoordinationDriver(object): """ if not self._started: raise ToozError("Can not stop a driver which has not been started") + if self.heart.is_alive(): + self.heart.stop() + self.heart.wait() self._stop() self._started = False @@ -445,6 +528,7 @@ class CoordinationDriver(object): Method to run once in a while to be sure that the member is not dead and is still an active member of a group. + :return: The number of seconds to wait before sending a new heartbeat. """ pass diff --git a/tooz/drivers/etcd.py b/tooz/drivers/etcd.py index 1bd94eab..39d12226 100644 --- a/tooz/drivers/etcd.py +++ b/tooz/drivers/etcd.py @@ -214,6 +214,7 @@ class EtcdDriver(coordination.CoordinationDriver): def heartbeat(self): for lock in self._acquired_locks: lock.heartbeat() + return self.lock_timeout @staticmethod def watch_join_group(group_id, callback): diff --git a/tooz/drivers/memcached.py b/tooz/drivers/memcached.py index 87a84f9e..a998ade0 100644 --- a/tooz/drivers/memcached.py +++ b/tooz/drivers/memcached.py @@ -486,6 +486,9 @@ class MemcachedDriver(coordination._RunWatchersMixin, # Reset the acquired locks for lock in self._acquired_locks: lock.heartbeat() + return min(self.membership_timeout, + self.leader_timeout, + self.lock_timeout) @_translate_failures def _init_watch_group(self, group_id): diff --git a/tooz/drivers/redis.py b/tooz/drivers/redis.py index e4a6e438..c86b204d 100644 --- a/tooz/drivers/redis.py +++ b/tooz/drivers/redis.py @@ -508,6 +508,7 @@ return 1 except coordination.ToozError: LOG.warning("Unable to heartbeat lock '%s'", lock, exc_info=True) + return min(self.lock_timeout, self.membership_timeout) def _stop(self): while self._acquired_locks: diff --git a/tooz/drivers/zookeeper.py b/tooz/drivers/zookeeper.py index 32ee9926..8675a6bc 100644 --- a/tooz/drivers/zookeeper.py +++ b/tooz/drivers/zookeeper.py @@ -232,6 +232,7 @@ class BaseZooKeeperDriver(coordination.CoordinationDriver): coordination.raise_with_cause(coordination.ToozError, encodeutils.exception_to_unicode(e), cause=e) + return self.timeout def leave_group(self, group_id): member_path = self._path_member(group_id, self._member_id) diff --git a/tooz/tests/test_coordination.py b/tooz/tests/test_coordination.py index 4ddbaafa..396a7bde 100644 --- a/tooz/tests/test_coordination.py +++ b/tooz/tests/test_coordination.py @@ -337,8 +337,28 @@ class TestAPI(testscenarios.TestWithScenarios, update_cap.get) def test_heartbeat(self): + if not self._coord.requires_beating: + raise testcase.TestSkipped("Test not applicable (heartbeating" + " not required)") self._coord.heartbeat() + def test_heartbeat_loop(self): + if not self._coord.requires_beating: + raise testcase.TestSkipped("Test not applicable (heartbeating" + " not required)") + + heart = self._coord.heart + self.assertFalse(heart.is_alive()) + heart.start() + + # This will timeout if nothing ever is done... + try: + while not heart.beats: + time.sleep(1) + finally: + heart.stop() + heart.wait() + def test_disconnect_leave_group(self): member_id_test2 = self._get_random_uuid() client2 = tooz.coordination.get_coordinator(self.url,