Implement group support for etcd3gw
This adds the various methods to support groups operations on the etc3gw driver. Change-Id: I8acb2f617f6044449fb3a91a9c3de73ef0061d36
This commit is contained in:
parent
824ff65433
commit
6ab8c380c8
@ -0,0 +1,4 @@
|
||||
---
|
||||
features:
|
||||
- |
|
||||
The etcd3gw driver now supports the group membership API.
|
@ -29,6 +29,11 @@ from tooz import locking
|
||||
from tooz import utils
|
||||
|
||||
|
||||
def _encode(data):
|
||||
"""Safely encode data for consumption of the gateway."""
|
||||
return base64.b64encode(data).decode("ascii")
|
||||
|
||||
|
||||
def _translate_failures(func):
|
||||
"""Translates common requests exceptions into tooz exceptions."""
|
||||
|
||||
@ -66,8 +71,8 @@ class Etcd3Lock(locking.Lock):
|
||||
self._timeout = timeout
|
||||
self._coord = coord
|
||||
self._key = self.LOCK_PREFIX + name
|
||||
self._key_b64 = base64.b64encode(self._key).decode("ascii")
|
||||
self._uuid = base64.b64encode(uuid.uuid4().bytes).decode("ascii")
|
||||
self._key_b64 = _encode(self._key)
|
||||
self._uuid = _encode(uuid.uuid4().bytes)
|
||||
self._exclusive_access = threading.Lock()
|
||||
|
||||
@_translate_failures
|
||||
@ -156,7 +161,7 @@ class Etcd3Lock(locking.Lock):
|
||||
return False
|
||||
|
||||
|
||||
class Etcd3Driver(coordination.CoordinationDriver):
|
||||
class Etcd3Driver(coordination.CoordinationDriverWithExecutor):
|
||||
"""An etcd based driver.
|
||||
|
||||
This driver uses etcd provide the coordination driver semantics and
|
||||
@ -172,6 +177,8 @@ class Etcd3Driver(coordination.CoordinationDriver):
|
||||
#: Default port used if none provided (4001 or 2379 are the common ones).
|
||||
DEFAULT_PORT = 2379
|
||||
|
||||
GROUP_PREFIX = b"tooz/groups/"
|
||||
|
||||
def __init__(self, member_id, parsed_url, options):
|
||||
super(Etcd3Driver, self).__init__(member_id, parsed_url, options)
|
||||
host = parsed_url.hostname or self.DEFAULT_HOST
|
||||
@ -180,8 +187,14 @@ class Etcd3Driver(coordination.CoordinationDriver):
|
||||
timeout = int(options.get('timeout', self.DEFAULT_TIMEOUT))
|
||||
self.client = etcd3gw.client(host=host, port=port, timeout=timeout)
|
||||
self.lock_timeout = int(options.get('lock_timeout', timeout))
|
||||
self.membership_timeout = int(options.get(
|
||||
'membership_timeout', timeout))
|
||||
self._acquired_locks = set()
|
||||
|
||||
def _start(self):
|
||||
super(Etcd3Driver, self)._start()
|
||||
self._membership_lease = self.client.lease(self.membership_timeout)
|
||||
|
||||
def get_lock(self, name):
|
||||
return Etcd3Lock(self, name, self.lock_timeout)
|
||||
|
||||
@ -202,3 +215,193 @@ class Etcd3Driver(coordination.CoordinationDriver):
|
||||
|
||||
def unwatch_leave_group(self, group_id, callback):
|
||||
raise tooz.NotImplemented
|
||||
|
||||
def _encode_group_id(self, group_id):
|
||||
return _encode(self._prefix_group(group_id))
|
||||
|
||||
def _prefix_group(self, group_id):
|
||||
return b"%s%s/" % (self.GROUP_PREFIX, group_id)
|
||||
|
||||
def create_group(self, group_id):
|
||||
@_translate_failures
|
||||
def _create_group():
|
||||
encoded_group = self._encode_group_id(group_id)
|
||||
txn = {
|
||||
'compare': [{
|
||||
'key': encoded_group,
|
||||
'result': 'EQUAL',
|
||||
'target': 'VERSION',
|
||||
'version': 0
|
||||
}],
|
||||
'success': [{
|
||||
'request_put': {
|
||||
'key': encoded_group,
|
||||
# We shouldn't need a value, but etcd3gw needs it for
|
||||
# now
|
||||
'value': encoded_group
|
||||
}
|
||||
}],
|
||||
'failure': []
|
||||
}
|
||||
result = self.client.transaction(txn)
|
||||
if not result.get("succeeded"):
|
||||
raise coordination.GroupAlreadyExist(group_id)
|
||||
|
||||
return coordination.CoordinatorResult(
|
||||
self._executor.submit(_create_group))
|
||||
|
||||
def _destroy_group(self, group_id):
|
||||
self.client.delete(group_id)
|
||||
|
||||
def delete_group(self, group_id):
|
||||
@_translate_failures
|
||||
def _delete_group():
|
||||
prefix_group = self._prefix_group(group_id)
|
||||
members = self.client.get_prefix(prefix_group)
|
||||
if len(members) > 1:
|
||||
raise coordination.GroupNotEmpty(group_id)
|
||||
|
||||
encoded_group = self._encode_group_id(group_id)
|
||||
txn = {
|
||||
'compare': [{
|
||||
'key': encoded_group,
|
||||
'result': 'NOT_EQUAL',
|
||||
'target': 'VERSION',
|
||||
'version': 0
|
||||
}],
|
||||
'success': [{
|
||||
'request_delete_range': {
|
||||
'key': encoded_group,
|
||||
}
|
||||
}],
|
||||
'failure': []
|
||||
}
|
||||
result = self.client.transaction(txn)
|
||||
|
||||
if not result.get("succeeded"):
|
||||
raise coordination.GroupNotCreated(group_id)
|
||||
|
||||
return coordination.CoordinatorResult(
|
||||
self._executor.submit(_delete_group))
|
||||
|
||||
def join_group(self, group_id, capabilities=b""):
|
||||
@_retry.retry()
|
||||
@_translate_failures
|
||||
def _join_group():
|
||||
prefix_group = self._prefix_group(group_id)
|
||||
prefix_member = prefix_group + self._member_id
|
||||
members = self.client.get_prefix(prefix_group)
|
||||
|
||||
encoded_member = _encode(prefix_member)
|
||||
|
||||
group_metadata = None
|
||||
for cap, metadata in members:
|
||||
if metadata['key'] == prefix_member:
|
||||
raise coordination.MemberAlreadyExist(group_id,
|
||||
self._member_id)
|
||||
if metadata['key'] == prefix_group:
|
||||
group_metadata = metadata
|
||||
|
||||
if group_metadata is None:
|
||||
raise coordination.GroupNotCreated(group_id)
|
||||
|
||||
encoded_group = self._encode_group_id(group_id)
|
||||
txn = {
|
||||
'compare': [{
|
||||
'key': encoded_group,
|
||||
'result': 'EQUAL',
|
||||
'target': 'VERSION',
|
||||
'version': int(group_metadata['version'])
|
||||
}],
|
||||
'success': [{
|
||||
'request_put': {
|
||||
'key': encoded_member,
|
||||
'value': _encode(utils.dumps(capabilities)),
|
||||
'lease': self._membership_lease.id
|
||||
}
|
||||
}],
|
||||
'failure': []
|
||||
}
|
||||
result = self.client.transaction(txn)
|
||||
if not result.get('succeeded'):
|
||||
raise _retry.TryAgain
|
||||
else:
|
||||
self._joined_groups.add(group_id)
|
||||
|
||||
return coordination.CoordinatorResult(
|
||||
self._executor.submit(_join_group))
|
||||
|
||||
def leave_group(self, group_id):
|
||||
@_translate_failures
|
||||
def _leave_group():
|
||||
prefix_group = self._prefix_group(group_id)
|
||||
prefix_member = prefix_group + self._member_id
|
||||
members = self.client.get_prefix(prefix_group)
|
||||
for capabilities, metadata in members:
|
||||
if metadata['key'] == prefix_member:
|
||||
break
|
||||
else:
|
||||
raise coordination.MemberNotJoined(group_id,
|
||||
self._member_id)
|
||||
|
||||
self.client.delete(prefix_member)
|
||||
self._joined_groups.discard(group_id)
|
||||
|
||||
return coordination.CoordinatorResult(
|
||||
self._executor.submit(_leave_group))
|
||||
|
||||
def get_members(self, group_id):
|
||||
@_translate_failures
|
||||
def _get_members():
|
||||
prefix_group = self._prefix_group(group_id)
|
||||
members = set()
|
||||
group_found = False
|
||||
|
||||
for cap, metadata in self.client.get_prefix(prefix_group):
|
||||
if metadata['key'] == prefix_group:
|
||||
group_found = True
|
||||
else:
|
||||
members.add(metadata['key'][len(prefix_group):])
|
||||
|
||||
if not group_found:
|
||||
raise coordination.GroupNotCreated(group_id)
|
||||
|
||||
return members
|
||||
|
||||
return coordination.CoordinatorResult(
|
||||
self._executor.submit(_get_members))
|
||||
|
||||
def get_member_capabilities(self, group_id, member_id):
|
||||
@_translate_failures
|
||||
def _get_member_capabilities():
|
||||
prefix_member = self._prefix_group(group_id) + member_id
|
||||
result = self.client.get(prefix_member)
|
||||
if not result:
|
||||
raise coordination.MemberNotJoined(group_id, member_id)
|
||||
return utils.loads(result[0])
|
||||
|
||||
return coordination.CoordinatorResult(
|
||||
self._executor.submit(_get_member_capabilities))
|
||||
|
||||
def update_capabilities(self, group_id, capabilities):
|
||||
@_translate_failures
|
||||
def _update_capabilities():
|
||||
prefix_member = self._prefix_group(group_id) + self._member_id
|
||||
result = self.client.get(prefix_member)
|
||||
if not result:
|
||||
raise coordination.MemberNotJoined(group_id, self._member_id)
|
||||
|
||||
self.client.put(prefix_member, utils.dumps(capabilities),
|
||||
lease=self._membership_lease)
|
||||
|
||||
return coordination.CoordinatorResult(
|
||||
self._executor.submit(_update_capabilities))
|
||||
|
||||
def get_groups(self):
|
||||
@_translate_failures
|
||||
def _get_groups():
|
||||
groups = self.client.get_prefix(self.GROUP_PREFIX)
|
||||
return [
|
||||
group[1]['key'][len(self.GROUP_PREFIX):-1] for group in groups]
|
||||
return coordination.CoordinatorResult(
|
||||
self._executor.submit(_get_groups))
|
||||
|
Loading…
Reference in New Issue
Block a user