Merge "Convert the rest of memcached driver functions to futures"

This commit is contained in:
Jenkins 2014-10-23 15:37:01 +00:00 committed by Gerrit Code Review
commit eb0c584949
1 changed files with 48 additions and 84 deletions

View File

@ -207,46 +207,44 @@ class MemcachedDriver(coordination.CoordinationDriver):
return self.client.get(self._GROUP_LIST_KEY) or []
return MemcachedFutureResult(self._executor.submit(_get_groups))
@retry
def join_group(self, group_id, capabilities=b""):
encoded_group = self._encode_group_id(group_id)
group_members, cas = self.client.gets(encoded_group)
if not cas:
return MemcachedAsyncError(
coordination.GroupNotCreated(group_id))
if self._member_id in group_members:
return MemcachedAsyncError(
coordination.MemberAlreadyExist(group_id, self._member_id))
group_members[self._member_id] = {
"capabilities": capabilities,
}
if not self.client.cas(encoded_group,
group_members,
cas):
# It changed, let's try again
raise Retry
self._groups.add(group_id)
return MemcachedAsyncResult(None)
@retry
@retry
def _join_group():
group_members, cas = self.client.gets(encoded_group)
if not cas:
raise coordination.GroupNotCreated(group_id)
if self._member_id in group_members:
raise coordination.MemberAlreadyExist(group_id,
self._member_id)
group_members[self._member_id] = {
"capabilities": capabilities,
}
if not self.client.cas(encoded_group, group_members, cas):
# It changed, let's try again
raise Retry
self._groups.add(group_id)
return MemcachedFutureResult(self._executor.submit(_join_group))
def leave_group(self, group_id):
encoded_group = self._encode_group_id(group_id)
group_members, cas = self.client.gets(encoded_group)
if not cas:
return MemcachedAsyncError(
coordination.GroupNotCreated(group_id))
if self._member_id not in group_members:
return MemcachedAsyncError(
coordination.MemberNotJoined(group_id,
self._member_id))
del group_members[self._member_id]
if not self.client.cas(encoded_group,
group_members,
cas):
# It changed, let's try again
raise Retry
self._groups.discard(group_id)
return MemcachedAsyncResult(None)
@retry
def _leave_group():
group_members, cas = self.client.gets(encoded_group)
if not cas:
raise coordination.GroupNotCreated(group_id)
if self._member_id not in group_members:
raise coordination.MemberNotJoined(group_id, self._member_id)
del group_members[self._member_id]
if not self.client.cas(encoded_group, group_members, cas):
# It changed, let's try again
raise Retry
self._groups.discard(group_id)
return MemcachedFutureResult(self._executor.submit(_leave_group))
def _get_members(self, group_id):
group_members = self.client.get(self._encode_group_id(group_id))
@ -269,21 +267,23 @@ class MemcachedDriver(coordination.CoordinationDriver):
return MemcachedFutureResult(
self._executor.submit(_get_member_capabilities))
@retry
def update_capabilities(self, group_id, capabilities):
encoded_group = self._encode_group_id(group_id)
group_members, cas = self.client.gets(encoded_group)
if cas is None:
return MemcachedAsyncError(
coordination.GroupNotCreated(group_id))
if self._member_id not in group_members:
return MemcachedAsyncError(
coordination.MemberNotJoined(group_id, self._member_id))
group_members[self._member_id][b'capabilities'] = capabilities
if not self.client.cas(encoded_group, group_members, cas):
# It changed, try again
raise Retry
return MemcachedAsyncResult(None)
@retry
def _update_capabilities():
group_members, cas = self.client.gets(encoded_group)
if cas is None:
raise coordination.GroupNotCreated(group_id)
if self._member_id not in group_members:
raise coordination.MemberNotJoined(group_id, self._member_id)
group_members[self._member_id][b'capabilities'] = capabilities
if not self.client.cas(encoded_group, group_members, cas):
# It changed, try again
raise Retry
return MemcachedFutureResult(
self._executor.submit(_update_capabilities))
def get_leader(self, group_id):
def _get_leader():
@ -385,39 +385,3 @@ class MemcachedFutureResult(coordination.CoordAsyncResult):
def done(self):
return self._fut.done()
class MemcachedAsyncResult(coordination.CoordAsyncResult):
"""Memcached asynchronous result.
Unfortunately, this is mostely a fake because our driver is not
asynchronous at all. :-(.
"""
def __init__(self, result):
self._result = result
def get(self, timeout=0):
return self._result
@staticmethod
def done():
return True
class MemcachedAsyncError(coordination.CoordAsyncResult):
"""Memcached asynchronous error.
Unfortunately, this is mostely a fake because our driver is not
asynchronous at all. :-(.
"""
def __init__(self, error):
self._error = error
def get(self, timeout=10):
raise self._error
@staticmethod
def done():
return True