Merge "Expose 'run_elect_coordinator' and call it from 'run_watchers'"

This commit is contained in:
Jenkins 2015-08-05 06:13:53 +00:00 committed by Gerrit Code Review
commit 8610434a66
4 changed files with 23 additions and 16 deletions

View File

@ -78,7 +78,16 @@ class CoordinationDriver(object):
@staticmethod
def run_watchers(timeout=None):
"""Run the watchers callback."""
"""Run the watchers callback.
This may also activate :py:meth:`.run_elect_coordinator` (depending
on driver implementation).
"""
raise tooz.NotImplemented
@staticmethod
def run_elect_coordinator():
"""Try to leader elect this coordinator & activate hooks on success."""
raise tooz.NotImplemented
@abc.abstractmethod

View File

@ -458,7 +458,7 @@ class MemcachedDriver(coordination._RunWatchersMixin,
self.leader_timeout)
@_translate_failures
def _run_leadership(self):
def run_elect_coordinator(self):
for group_id, hooks in six.iteritems(self._hooks_elected_leader):
# Try to grab the lock, if that fails, that means someone has it
# already.
@ -471,7 +471,7 @@ class MemcachedDriver(coordination._RunWatchersMixin,
def run_watchers(self, timeout=None):
result = super(MemcachedDriver, self).run_watchers(timeout=timeout)
self._run_leadership()
self.run_elect_coordinator()
return result

View File

@ -23,7 +23,6 @@ import string
from concurrent import futures
from oslo_utils import strutils
from oslo_utils import timeutils
import redis
from redis import exceptions
from redis import lock as redis_locks
@ -743,10 +742,8 @@ return 1
name = self._encode_group_leader(group_id)
return self.get_lock(name)
def _run_leadership(self, watch):
def run_elect_coordinator(self):
for group_id, hooks in six.iteritems(self._hooks_elected_leader):
if watch.expired():
return
leader_lock = self._get_leader_lock(group_id)
if leader_lock.acquire(blocking=False):
# We got the lock
@ -754,10 +751,8 @@ return 1
self._member_id))
def run_watchers(self, timeout=None):
w = timeutils.StopWatch(duration=timeout)
w.start()
result = super(RedisDriver, self).run_watchers(timeout=timeout)
self._run_leadership(w)
self.run_elect_coordinator()
return result

View File

@ -460,11 +460,7 @@ class KazooDriver(BaseZooKeeperDriver):
self.paths_join(b"/", self._namespace, b"locks", name),
self._member_id.decode('ascii')))
def run_watchers(self, timeout=None):
ret = []
while self._watchers:
cb = self._watchers.popleft()
ret.extend(cb())
def run_elect_coordinator(self):
for group_id in six.iterkeys(self._hooks_elected_leader):
leader_lock = self._get_group_leader_lock(group_id)
if leader_lock.is_acquired:
@ -476,7 +472,14 @@ class KazooDriver(BaseZooKeeperDriver):
coordination.LeaderElected(
group_id,
self._member_id))
return ret
def run_watchers(self, timeout=None):
results = []
while self._watchers:
cb = self._watchers.popleft()
results.extend(cb())
self.run_elect_coordinator()
return results
class ZooAsyncResult(coordination.CoordAsyncResult):