Add etcd3 group support

Sem-Ver: feature

Change-Id: I1ce3a61c8e28b874df3583e08aa6a70f9506dd11
This commit is contained in:
Julien Danjou 2017-06-06 17:48:07 +02:00
parent f947d1f377
commit e1af969a68
2 changed files with 221 additions and 22 deletions

View File

@ -0,0 +1,4 @@
---
features:
- |
The etcd3 driver now supports the group membership API.

View File

@ -13,6 +13,8 @@
# under the License.
from __future__ import absolute_import
import contextlib
import functools
import threading
import etcd3
@ -21,18 +23,17 @@ from oslo_utils import encodeutils
import six
import tooz
from tooz import _retry
from tooz import coordination
from tooz import locking
from tooz import utils
def _translate_failures(func):
@contextlib.contextmanager
def _failure_translator():
"""Translates common requests exceptions into tooz exceptions."""
@six.wraps(func)
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
yield
except etcd3_exc.ConnectionFailedError as e:
utils.raise_with_cause(coordination.ToozConnectionError,
encodeutils.exception_to_unicode(e),
@ -46,6 +47,14 @@ def _translate_failures(func):
encodeutils.exception_to_unicode(e),
cause=e)
def _translate_failures(func):
@six.wraps(func)
def wrapper(*args, **kwargs):
with _failure_translator():
return func(*args, **kwargs)
return wrapper
@ -100,7 +109,8 @@ class Etcd3Lock(locking.Lock):
return False
class Etcd3Driver(coordination.CoordinationDriver):
class Etcd3Driver(coordination.CoordinationDriverCachedRunWatchers,
coordination.CoordinationDriverWithExecutor):
"""An etcd based driver.
This driver uses etcd provide the coordination driver semantics and
@ -124,8 +134,18 @@ class Etcd3Driver(coordination.CoordinationDriver):
timeout = int(options.get('timeout', self.DEFAULT_TIMEOUT))
self.client = etcd3.client(host=host, port=port, timeout=timeout)
self.lock_timeout = int(options.get('lock_timeout', timeout))
self.membership_timeout = int(options.get(
'membership_timeout', timeout))
self._acquired_locks = set()
def _start(self):
super(Etcd3Driver, self)._start()
self._membership_lease = self.client.lease(self.membership_timeout)
def _stop(self):
super(Etcd3Driver, self)._stop()
self._membership_lease.revoke()
def get_lock(self, name):
return Etcd3Lock(self, name, self.lock_timeout)
@ -133,16 +153,191 @@ class Etcd3Driver(coordination.CoordinationDriver):
# NOTE(jaypipes): Copying because set can mutate during iteration
for lock in self._acquired_locks.copy():
lock.heartbeat()
return self.lock_timeout
# TODO(jd) use the same lease for locks?
self._membership_lease.refresh()
return min(self.lock_timeout, self.membership_timeout)
def watch_join_group(self, group_id, callback):
GROUP_PREFIX = b"tooz/groups/"
def _encode_group_id(self, group_id):
return self.GROUP_PREFIX + group_id + b"/"
def _encode_group_member_id(self, group_id, member_id):
return self._encode_group_id(group_id) + member_id
def create_group(self, group_id):
encoded_group = self._encode_group_id(group_id)
@_translate_failures
def _create_group():
status, results = self.client.transaction(
compare=[
self.client.transactions.version(
encoded_group) == 0
],
success=[
self.client.transactions.put(encoded_group, b"")
],
failure=[],
)
if not status:
raise coordination.GroupAlreadyExist(group_id)
return EtcdFutureResult(self._executor.submit(_create_group))
def _destroy_group(self, group_id):
self.client.delete(self._encode_group_id(group_id))
def delete_group(self, group_id):
encoded_group = self._encode_group_id(group_id)
@_translate_failures
def _delete_group():
members = list(self.client.get_prefix(encoded_group))
if len(members) > 1:
raise coordination.GroupNotEmpty(group_id)
# Warning: as of this writing python-etcd3 does not support the
# NOT_EQUAL operator so we use the EQUAL operator and will retry on
# success, hihi
status, results = self.client.transaction(
compare=[
self.client.transactions.version(encoded_group) == 0
],
success=[],
failure=[
self.client.transactions.delete(encoded_group)
],
)
if status:
raise coordination.GroupNotCreated(group_id)
return EtcdFutureResult(self._executor.submit(_delete_group))
def join_group(self, group_id, capabilities=b""):
encoded_group = self._encode_group_id(group_id)
@_retry.retry()
@_translate_failures
def _join_group():
members = list(self.client.get_prefix(encoded_group))
encoded_member = self._encode_group_member_id(
group_id, self._member_id)
group_metadata = None
for cap, metadata in members:
if metadata.key == encoded_member:
raise coordination.MemberAlreadyExist(group_id,
self._member_id)
if metadata.key == encoded_group:
group_metadata = metadata
if group_metadata is None:
raise coordination.GroupNotCreated(group_id)
status, results = self.client.transaction(
# This comparison makes sure the group has not been deleted in
# the mean time
compare=[
self.client.transactions.version(encoded_group) ==
group_metadata.version
],
success=[
self.client.transactions.put(encoded_member,
utils.dumps(capabilities),
lease=self._membership_lease)
],
failure=[],
)
if not status:
# TODO(jd) There's a small optimization doable by getting the
# current range on failure and passing it to this function as
# the first arg when retrying to avoid redoing a get_prefix()
raise _retry.TryAgain
return EtcdFutureResult(self._executor.submit(_join_group))
def leave_group(self, group_id):
encoded_group = self._encode_group_id(group_id)
@_translate_failures
def _leave_group():
members = list(self.client.get_prefix(encoded_group))
encoded_member = self._encode_group_member_id(
group_id, self._member_id)
for capabilities, metadata in members:
if metadata.key == encoded_member:
break
else:
raise coordination.MemberNotJoined(group_id,
self._member_id)
self.client.delete(encoded_member)
return EtcdFutureResult(self._executor.submit(_leave_group))
def get_members(self, group_id):
encoded_group = self._encode_group_id(group_id)
@_translate_failures
def _get_members():
members = set()
group_found = False
for cap, metadata in self.client.get_prefix(encoded_group):
if metadata.key == encoded_group:
group_found = True
else:
members.add(metadata.key[len(encoded_group):])
if not group_found:
raise coordination.GroupNotCreated(group_id)
return members
return EtcdFutureResult(self._executor.submit(_get_members))
def get_member_capabilities(self, group_id, member_id):
encoded_member = self._encode_group_member_id(
group_id, member_id)
@_translate_failures
def _get_member_capabilities():
capabilities, metadata = self.client.get(encoded_member)
if capabilities is None:
raise coordination.MemberNotJoined(group_id, member_id)
return utils.loads(capabilities)
return EtcdFutureResult(
self._executor.submit(_get_member_capabilities))
def update_capabilities(self, group_id, capabilities):
encoded_member = self._encode_group_member_id(
group_id, self._member_id)
@_translate_failures
def _update_capabilities():
cap, metadata = self.client.get(encoded_member)
if cap is None:
raise coordination.MemberNotJoined(group_id, self._member_id)
self.client.put(encoded_member, utils.dumps(capabilities),
lease=self._membership_lease)
return EtcdFutureResult(
self._executor.submit(_update_capabilities))
@staticmethod
def watch_elected_as_leader(group_id, callback):
raise tooz.NotImplemented
def unwatch_join_group(self, group_id, callback):
@staticmethod
def unwatch_elected_as_leader(group_id, callback):
raise tooz.NotImplemented
def watch_leave_group(self, group_id, callback):
raise tooz.NotImplemented
def unwatch_leave_group(self, group_id, callback):
raise tooz.NotImplemented
EtcdFutureResult = functools.partial(coordination.CoordinatorResult,
failure_translator=_failure_translator)