Merge "Start to add a catch and reraise of timed out exceptions"
This commit is contained in:
commit
f504e44538
|
@ -358,6 +358,10 @@ class ToozConnectionError(ToozError):
|
|||
"""
|
||||
|
||||
|
||||
class OperationTimedOut(ToozError):
|
||||
"""Exception raised when an operation times out."""
|
||||
|
||||
|
||||
class GroupNotCreated(ToozError):
|
||||
"""Exception raised when the caller request a group which does
|
||||
not exist.
|
||||
|
|
|
@ -73,9 +73,12 @@ class BaseZooKeeperDriver(coordination.CoordinationDriver):
|
|||
self._coord.stop()
|
||||
|
||||
@staticmethod
|
||||
def _create_group_handler(async_result, timeout, group_id):
|
||||
def _create_group_handler(async_result, timeout,
|
||||
timeout_exception, group_id):
|
||||
try:
|
||||
async_result.get(block=True, timeout=timeout)
|
||||
except timeout_exception as e:
|
||||
raise coordination.OperationTimedOut(utils.exception_message(e))
|
||||
except exceptions.NodeExistsError:
|
||||
raise coordination.GroupAlreadyExist(group_id)
|
||||
except exceptions.NoNodeError:
|
||||
|
@ -87,12 +90,16 @@ class BaseZooKeeperDriver(coordination.CoordinationDriver):
|
|||
group_path = self._path_group(group_id)
|
||||
async_result = self._coord.create_async(group_path)
|
||||
return ZooAsyncResult(async_result, self._create_group_handler,
|
||||
timeout_exception=self._timeout_exception,
|
||||
group_id=group_id)
|
||||
|
||||
@staticmethod
|
||||
def _join_group_handler(async_result, timeout, group_id, member_id):
|
||||
def _join_group_handler(async_result, timeout,
|
||||
timeout_exception, group_id, member_id):
|
||||
try:
|
||||
async_result.get(block=True, timeout=timeout)
|
||||
except timeout_exception as e:
|
||||
raise coordination.OperationTimedOut(utils.exception_message(e))
|
||||
except exceptions.NodeExistsError:
|
||||
raise coordination.MemberAlreadyExist(group_id, member_id)
|
||||
except exceptions.NoNodeError:
|
||||
|
@ -106,12 +113,16 @@ class BaseZooKeeperDriver(coordination.CoordinationDriver):
|
|||
value=capabilities,
|
||||
ephemeral=True)
|
||||
return ZooAsyncResult(async_result, self._join_group_handler,
|
||||
timeout_exception=self._timeout_exception,
|
||||
group_id=group_id, member_id=self._member_id)
|
||||
|
||||
@staticmethod
|
||||
def _leave_group_handler(async_result, timeout, group_id, member_id):
|
||||
def _leave_group_handler(async_result, timeout,
|
||||
timeout_exception, group_id, member_id):
|
||||
try:
|
||||
async_result.get(block=True, timeout=timeout)
|
||||
except timeout_exception as e:
|
||||
raise coordination.OperationTimedOut(utils.exception_message(e))
|
||||
except exceptions.NoNodeError:
|
||||
raise coordination.MemberNotJoined(group_id, member_id)
|
||||
except exceptions.ZookeeperError as e:
|
||||
|
@ -121,12 +132,16 @@ class BaseZooKeeperDriver(coordination.CoordinationDriver):
|
|||
member_path = self._path_member(group_id, self._member_id)
|
||||
async_result = self._coord.delete_async(member_path)
|
||||
return ZooAsyncResult(async_result, self._leave_group_handler,
|
||||
timeout_exception=self._timeout_exception,
|
||||
group_id=group_id, member_id=self._member_id)
|
||||
|
||||
@staticmethod
|
||||
def _get_members_handler(async_result, timeout, group_id):
|
||||
def _get_members_handler(async_result, timeout,
|
||||
timeout_exception, group_id):
|
||||
try:
|
||||
members_ids = async_result.get(block=True, timeout=timeout)
|
||||
except timeout_exception as e:
|
||||
raise coordination.OperationTimedOut(utils.exception_message(e))
|
||||
except exceptions.NoNodeError:
|
||||
raise coordination.GroupNotCreated(group_id)
|
||||
except exceptions.ZookeeperError as e:
|
||||
|
@ -138,13 +153,16 @@ class BaseZooKeeperDriver(coordination.CoordinationDriver):
|
|||
group_path = self.paths_join("/", self._TOOZ_NAMESPACE, group_id)
|
||||
async_result = self._coord.get_children_async(group_path)
|
||||
return ZooAsyncResult(async_result, self._get_members_handler,
|
||||
timeout_exception=self._timeout_exception,
|
||||
group_id=group_id)
|
||||
|
||||
@staticmethod
|
||||
def _update_capabilities_handler(async_result, timeout, group_id,
|
||||
member_id):
|
||||
def _update_capabilities_handler(async_result, timeout,
|
||||
timeout_exception, group_id, member_id):
|
||||
try:
|
||||
async_result.get(block=True, timeout=timeout)
|
||||
except timeout_exception as e:
|
||||
raise coordination.OperationTimedOut(utils.exception_message(e))
|
||||
except exceptions.NoNodeError:
|
||||
raise coordination.MemberNotJoined(group_id, member_id)
|
||||
except exceptions.ZookeeperError as e:
|
||||
|
@ -154,13 +172,17 @@ class BaseZooKeeperDriver(coordination.CoordinationDriver):
|
|||
member_path = self._path_member(group_id, self._member_id)
|
||||
async_result = self._coord.set_async(member_path, capabilities)
|
||||
return ZooAsyncResult(async_result, self._update_capabilities_handler,
|
||||
timeout_exception=self._timeout_exception,
|
||||
group_id=group_id, member_id=self._member_id)
|
||||
|
||||
@staticmethod
|
||||
def _get_member_capabilities_handler(async_result, timeout, group_id,
|
||||
def _get_member_capabilities_handler(async_result, timeout,
|
||||
timeout_exception, group_id,
|
||||
member_id):
|
||||
try:
|
||||
capabilities = async_result.get(block=True, timeout=timeout)[0]
|
||||
except timeout_exception as e:
|
||||
raise coordination.OperationTimedOut(utils.exception_message(e))
|
||||
except exceptions.NoNodeError:
|
||||
raise coordination.MemberNotJoined(group_id, member_id)
|
||||
except exceptions.ZookeeperError as e:
|
||||
|
@ -173,12 +195,15 @@ class BaseZooKeeperDriver(coordination.CoordinationDriver):
|
|||
async_result = self._coord.get_async(member_path)
|
||||
return ZooAsyncResult(async_result,
|
||||
self._get_member_capabilities_handler,
|
||||
timeout_exception=self._timeout_exception,
|
||||
group_id=group_id, member_id=self._member_id)
|
||||
|
||||
@staticmethod
|
||||
def _get_groups_handler(async_result, timeout):
|
||||
def _get_groups_handler(async_result, timeout, timeout_exception):
|
||||
try:
|
||||
group_ids = async_result.get(block=True, timeout=timeout)
|
||||
except timeout_exception as e:
|
||||
raise coordination.OperationTimedOut(utils.exception_message(e))
|
||||
except exceptions.NoNodeError:
|
||||
raise coordination.ToozError("tooz namespace has not been created")
|
||||
except exceptions.ZookeeperError as e:
|
||||
|
@ -189,7 +214,8 @@ class BaseZooKeeperDriver(coordination.CoordinationDriver):
|
|||
def get_groups(self):
|
||||
tooz_namespace = self.paths_join("/", self._TOOZ_NAMESPACE)
|
||||
async_result = self._coord.get_children_async(tooz_namespace)
|
||||
return ZooAsyncResult(async_result, self._get_groups_handler)
|
||||
return ZooAsyncResult(async_result, self._get_groups_handler,
|
||||
timeout_exception=self._timeout_exception)
|
||||
|
||||
def _path_group(self, group_id):
|
||||
return self.paths_join("/", self._TOOZ_NAMESPACE, group_id)
|
||||
|
@ -216,6 +242,7 @@ class KazooDriver(BaseZooKeeperDriver):
|
|||
super(KazooDriver, self).__init__(member_id, parsed_url, options)
|
||||
self._coord = self._make_client(parsed_url, options)
|
||||
self._member_id = member_id
|
||||
self._timeout_exception = self._coord.handler.timeout_exception
|
||||
|
||||
@classmethod
|
||||
def _make_client(cls, parsed_url, options):
|
||||
|
|
Loading…
Reference in New Issue