Add asynchronous API

The Tooz asynchronous API rely on zookeeper client asynchronous
API. Each asynchronous call will return a future like object.

Change-Id: Ibee9bf8ae0d1c1f62318f46b58ee25631e9e319a
This commit is contained in:
Yassine Lamgarchal 2014-01-17 15:12:40 +01:00
parent d39dedd0ef
commit 5a11223c46
4 changed files with 167 additions and 74 deletions

View File

@ -7,4 +7,4 @@ python-subunit
testrepository>=0.0.17
testtools>=0.9.32
testscenarios>=0.4
zake>=0.0.6
zake>=0.0.9

View File

@ -46,69 +46,98 @@ class CoordinationDriver(object):
@abc.abstractmethod
def create_group(self, group_id):
"""Request the creation of a group.
"""Request the creation of a group asynchronously.
:param group_id: the id of the group to create
:type group_id: str
:returns: None
:rtype: CoordAsyncResult
"""
@abc.abstractmethod
def get_groups(self):
"""Return the list composed by all groups ids.
"""Return the list composed by all groups ids asynchronously.
:returns: list of all created group ids
:rtype: list
:returns: the list of all created group ids
:rtype: CoordAsyncResult
"""
@abc.abstractmethod
def join_group(self, group_id, capabilities):
"""Join a group and establish group membership.
"""Join a group and establish group membership asynchronously.
:param group_id: the id of the group to join
:type group_id: str
:param capabilities: the capabilities of the joined member
:type capabilities: str
:returns: None
:rtype: CoordAsyncResult
"""
@abc.abstractmethod
def leave_group(self, group_id):
"""Leave a group.
"""Leave a group asynchronously.
:param group_id: the id of the group to leave
:type group_id: str
:returns: None
:rtype: CoordAsyncResult
"""
@abc.abstractmethod
def get_members(self, group_id):
"""Return the list of all members ids of the specified group.
"""Return the list of all members ids of the specified group
asynchronously.
:returns: list of all created group ids
:rtype: list
:rtype: CoordAsyncResult
"""
@abc.abstractmethod
def get_member_capabilities(self, group_id, member_id):
"""Return the capabilities of a member.
"""Return the capabilities of a member asynchronously.
:param group_id: the id of the group of the member
:type group_id: str
:param member_id: the id of the member
:type member_id: str
:returns: capabilities of a member
:rtype: str
:rtype: CoordAsyncResult
"""
@abc.abstractmethod
def update_capabilities(self, group_id, capabilities):
"""Update capabilities of the caller in the specified group.
"""Update capabilities of the caller in the specified group
asynchronously.
:param group_id: the id of the group of the current member
:type group_id: str
:param capabilities: the capabilities of the updated member
:type capabilities: str
:returns: None
:rtype: CoordAsyncResult
"""
@six.add_metaclass(abc.ABCMeta)
class CoordAsyncResult(object):
"""Representation of an asynchronous task, every call API
returns an CoordAsyncResult object on which the result or
the status of the task can be requested.
"""
@abc.abstractmethod
def get(self, timeout=10):
"""Retrieve the result of the corresponding asynchronous call.
:param timeout: block until the timeout expire.
:type timeout: float
"""
@abc.abstractmethod
def done(self):
"""Returns True if the task is done, False otherwise."""
#TODO(yassine)
#Replace kwargs by something more simple.
def get_coordinator(backend, member_id, **kwargs):

View File

@ -40,85 +40,131 @@ class BaseZooKeeperDriver(coordination.CoordinationDriver):
def stop(self):
self._coord.stop()
def create_group(self, group_id):
@staticmethod
def _create_group_handler(async_result, timeout, group_id):
try:
group_path = "/%s/%s" % (_TOOZ_NAMESPACE, group_id)
self._wrap_kazoo_call(self._coord.create, group_path)
async_result.get(block=True, timeout=timeout)
except exceptions.NodeExistsError:
raise coordination.GroupAlreadyExist("group_id=%s" % group_id)
except exceptions.NoNodeError:
raise coordination.ToozError("tooz namespace has not been created")
def join_group(self, group_id, capabilities=b""):
def create_group(self, group_id):
group_path = "/%s/%s" % (_TOOZ_NAMESPACE, group_id)
async_result = self._wrap_kazoo_call(self._coord.create_async,
group_path)
return ZooAsyncResult(async_result, self._create_group_handler,
group_id=group_id)
@staticmethod
def _join_group_handler(async_result, timeout, group_id, member_id):
try:
member_path = self._path_member(group_id, self._member_id)
self._wrap_kazoo_call(self._coord.create,
member_path,
value=capabilities,
ephemeral=True)
async_result.get(block=True, timeout=timeout)
except exceptions.NodeExistsError:
raise coordination.MemberAlreadyExist(str(self._member_id))
raise coordination.MemberAlreadyExist(str(member_id))
except exceptions.NoNodeError:
raise coordination.GroupNotCreated("group '%s' has not been "
"created" % _TOOZ_NAMESPACE)
"created" % group_id)
def leave_group(self, group_id):
def join_group(self, group_id, capabilities=b""):
member_path = self._path_member(group_id, self._member_id)
async_result = self._wrap_kazoo_call(self._coord.create_async,
member_path,
value=capabilities,
ephemeral=True)
return ZooAsyncResult(async_result, self._join_group_handler,
group_id=group_id, member_id=self._member_id)
@staticmethod
def _leave_group_handler(async_result, timeout, group_id, member_id):
try:
member_path = self._path_member(group_id, self._member_id)
self._wrap_kazoo_call(self._coord.delete, member_path)
async_result.get(block=True, timeout=timeout)
except exceptions.NoNodeError:
raise coordination.MemberNotJoined("member '%s' has not joined "
"the group '%s' or the group "
"has not been created" %
(self._member_id, group_id))
member_id, group_id)
except exceptions.ZookeeperError as e:
raise coordination.ToozError(str(e))
def get_members(self, group_id):
member_ids = []
def leave_group(self, group_id):
member_path = self._path_member(group_id, self._member_id)
async_result = self._wrap_kazoo_call(self._coord.delete_async,
member_path)
return ZooAsyncResult(async_result, self._leave_group_handler,
group_id=group_id, member_id=self._member_id)
@staticmethod
def _get_members_handler(async_result, timeout, group_id):
members_ids = None
try:
group_path = paths.join("/", _TOOZ_NAMESPACE, group_id)
member_ids = self._wrap_kazoo_call(self._coord.get_children,
group_path)
members_ids = async_result.get(block=True, timeout=timeout)
except exceptions.NoNodeError:
raise coordination.GroupNotCreated("group '%s' does not exist" %
group_id)
return members_ids
return member_ids
def get_members(self, group_id):
group_path = paths.join("/", _TOOZ_NAMESPACE, group_id)
async_result = self._wrap_kazoo_call(self._coord.get_children_async,
group_path)
return ZooAsyncResult(async_result, self._get_members_handler,
group_id=group_id)
@staticmethod
def _update_capabilities_handler(async_result, timeout, group_id,
member_id):
try:
async_result.get(block=True, timeout=timeout)
except exceptions.NoNodeError:
raise coordination.MemberNotJoined("member '%s' has not joined "
"the group '%s' or the group "
"has not been created" %
(member_id, group_id))
def update_capabilities(self, group_id, capabilities):
try:
member_path = self._path_member(group_id, self._member_id)
self._wrap_kazoo_call(self._coord.set, member_path, capabilities)
except exceptions.NoNodeError:
raise coordination.MemberNotJoined("member '%s' has not joined "
"the group '%s' or the group "
"has not been created" %
(self._member_id, group_id))
member_path = self._path_member(group_id, self._member_id)
async_result = self._wrap_kazoo_call(self._coord.set_async,
member_path, capabilities)
return ZooAsyncResult(async_result, self._update_capabilities_handler,
group_id=group_id, member_id=self._member_id)
def get_member_capabilities(self, group_id, member_id):
@staticmethod
def _get_member_capabilities_handler(async_result, timeout, group_id,
member_id):
capabilities = ""
try:
member_path = self._path_member(group_id, member_id)
capabilities = self._wrap_kazoo_call(self._coord.get,
member_path)[0]
capabilities = async_result.get(block=True, timeout=timeout)[0]
except exceptions.NoNodeError:
raise coordination.MemberNotJoined("member '%s' has not joined "
"the group '%s' or the group "
"has not been created" %
(self._member_id, group_id))
(member_id, group_id))
return capabilities
def get_groups(self):
def get_member_capabilities(self, group_id, member_id):
member_path = self._path_member(group_id, member_id)
async_result = self._wrap_kazoo_call(self._coord.get_async,
member_path)
return ZooAsyncResult(async_result,
self._get_member_capabilities_handler,
group_id=group_id, member_id=self._member_id)
@staticmethod
def _get_groups_handler(async_result, timeout):
group_ids = []
try:
group_ids = self._wrap_kazoo_call(self._coord.get_children,
paths.join("/", _TOOZ_NAMESPACE))
group_ids = async_result.get(block=True, timeout=timeout)
except exceptions.NoNodeError:
raise coordination.ToozError("tooz namespace has "
"not been created")
raise coordination.ToozError("tooz namespace has not been created")
return group_ids
def get_groups(self):
tooz_namespace = paths.join("/", _TOOZ_NAMESPACE)
async_result = self._wrap_kazoo_call(self._coord.get_children_async,
tooz_namespace)
return ZooAsyncResult(async_result, self._get_groups_handler)
@staticmethod
def _path_member(group_id, member_id):
return paths.join("/", _TOOZ_NAMESPACE, group_id, member_id)
@ -163,3 +209,17 @@ class ZakeDriver(BaseZooKeeperDriver):
self._member_id = member_id
self._coord = fake_client.FakeClient(storage=storage)
super(ZakeDriver, self).__init__()
class ZooAsyncResult(coordination.CoordAsyncResult):
def __init__(self, kazooAsyncResult, handler, **kwargs):
self.kazooAsyncResult = kazooAsyncResult
self.handler = handler
self.kwargs = kwargs
def get(self, timeout=15):
return self.handler(self.kazooAsyncResult, timeout, **self.kwargs)
def done(self):
return self.kazooAsyncResult.ready()

View File

@ -55,31 +55,35 @@ class TestAPI(testscenarios.TestWithScenarios, testcase.TestCase):
super(TestAPI, self).tearDown()
def test_create_group(self):
self._coord.create_group(self.group_id)
all_group_ids = self._coord.get_groups()
self._coord.create_group(self.group_id).get()
all_group_ids = self._coord.get_groups().get()
self.assertTrue(self.group_id in all_group_ids)
def test_get_groups(self):
groups_ids = [self._get_random_uuid() for _ in range(0, 5)]
for group_id in groups_ids:
self._coord.create_group(group_id)
created_groups = self._coord.get_groups()
self._coord.create_group(group_id).get()
created_groups = self._coord.get_groups().get()
for group_id in groups_ids:
self.assertTrue(group_id in created_groups)
def test_join_group(self):
self._coord.create_group(self.group_id)
self._coord.join_group(self.group_id)
member_list = self._coord.get_members(self.group_id)
self._coord.create_group(self.group_id).get()
self._coord.join_group(self.group_id).get()
member_list = self._coord.get_members(self.group_id).get()
self.assertTrue(self.member_id in member_list)
def test_leave_group(self):
self._coord.create_group(self.group_id)
self._coord.join_group(self.group_id)
member_ids = self._coord.get_members(self.group_id)
self._coord.create_group(self.group_id).get()
all_group_ids = self._coord.get_groups().get()
self.assertTrue(self.group_id in all_group_ids)
self._coord.join_group(self.group_id).get()
member_list = self._coord.get_members(self.group_id).get()
self.assertTrue(self.member_id in member_list)
member_ids = self._coord.get_members(self.group_id).get()
self.assertTrue(self.member_id in member_ids)
self._coord.leave_group(self.group_id)
new_member_objects = self._coord.get_members(self.group_id)
self._coord.leave_group(self.group_id).get()
new_member_objects = self._coord.get_members(self.group_id).get()
new_member_list = [member.member_id for member in new_member_objects]
self.assertTrue(self.member_id not in new_member_list)
@ -91,33 +95,33 @@ class TestAPI(testscenarios.TestWithScenarios, testcase.TestCase):
**self.kwargs)
client2.start()
self._coord.create_group(group_id_test2)
self._coord.join_group(group_id_test2)
client2.join_group(group_id_test2)
members_ids = self._coord.get_members(group_id_test2)
self._coord.create_group(group_id_test2).get()
self._coord.join_group(group_id_test2).get()
client2.join_group(group_id_test2).get()
members_ids = self._coord.get_members(group_id_test2).get()
self.assertTrue(self.member_id in members_ids)
self.assertTrue(member_id_test2 in members_ids)
def test_get_member_capabilities(self):
self._coord.create_group(self.group_id)
self._coord.create_group(self.group_id).get()
self._coord.join_group(self.group_id, b"test_capabilities")
capa = self._coord.get_member_capabilities(self.group_id,
self.member_id)
self.member_id).get()
self.assertEqual(capa, b"test_capabilities")
def test_update_capabilities(self):
self._coord.create_group(self.group_id)
self._coord.join_group(self.group_id, b"test_capabilities1")
self._coord.create_group(self.group_id).get()
self._coord.join_group(self.group_id, b"test_capabilities1").get()
capa = self._coord.get_member_capabilities(self.group_id,
self.member_id)
self.member_id).get()
self.assertEqual(capa, b"test_capabilities1")
self._coord.update_capabilities(self.group_id,
b"test_capabilities2")
b"test_capabilities2").get()
capa2 = self._coord.get_member_capabilities(self.group_id,
self.member_id)
self.member_id).get()
self.assertEqual(capa2, b"test_capabilities2")
def _get_random_uuid(self):