coordination: factorize common async result futures code

Many drivers leverage concurrent.futures to return result, so let's just
factorize all of that in a same place.

Change-Id: Ibdec36912b553eb4b76c9f99619c171bc41766bb
This commit is contained in:
Julien Danjou 2017-05-08 17:48:42 -04:00
parent 17620203cb
commit cae8421fd9
5 changed files with 68 additions and 104 deletions

View File

@ -17,10 +17,12 @@
import abc
import collections
from concurrent import futures
import enum
import logging
import threading
from oslo_utils import encodeutils
from oslo_utils import excutils
from oslo_utils import netutils
from oslo_utils import timeutils
@ -647,6 +649,29 @@ class CoordAsyncResult(object):
"""Returns True if the task is done, False otherwise."""
class CoordinatorResult(CoordAsyncResult):
"""Asynchronous result that references a future."""
def __init__(self, fut, failure_translator=None):
self._fut = fut
self._failure_translator = failure_translator
def get(self, timeout=None):
try:
if self._failure_translator:
with self._failure_translator():
return self._fut.result(timeout=timeout)
else:
return self._fut.result(timeout=timeout)
except futures.TimeoutError as e:
utils.raise_with_cause(OperationTimedOut,
encodeutils.exception_to_unicode(e),
cause=e)
def done(self):
return self._fut.done()
class CoordinationDriverCachedRunWatchers(CoordinationDriver):
"""Coordination driver with a `run_watchers` implementation.

View File

@ -17,6 +17,7 @@
import contextlib
import datetime
import errno
import functools
import hashlib
import logging
import os
@ -27,8 +28,6 @@ import tempfile
import threading
import weakref
from concurrent import futures
import fasteners
from oslo_utils import encodeutils
from oslo_utils import fileutils
@ -533,23 +532,5 @@ class FileDriver(coordination.CoordinationDriverCachedRunWatchers):
raise tooz.NotImplemented
class FileFutureResult(coordination.CoordAsyncResult):
"""File asynchronous result that references a future."""
def __init__(self, fut):
self._fut = fut
def get(self, timeout=None):
try:
# Late translate the common failures since the file driver
# may throw things that we can not catch in the callbacks where
# it is used.
with _translate_failures():
return self._fut.result(timeout=timeout)
except futures.TimeoutError as e:
utils.raise_with_cause(coordination.OperationTimedOut,
encodeutils.exception_to_unicode(e),
cause=e)
def done(self):
return self._fut.done()
FileFutureResult = functools.partial(coordination.CoordinatorResult,
failure_translator=_translate_failures)

View File

@ -18,9 +18,7 @@ import hashlib
import struct
import time
from concurrent import futures
import msgpack
from oslo_utils import encodeutils
import six
import sysv_ipc
@ -212,7 +210,8 @@ class IPCDriver(coordination.CoordinationDriver):
group_list.add(group_id)
self._write_group_list(group_list)
return IPCFutureResult(self._executor.submit(_create_group))
return coordination.CoordinatorResult(
self._executor.submit(_create_group))
def delete_group(self, group_id):
def _delete_group():
@ -223,7 +222,8 @@ class IPCDriver(coordination.CoordinationDriver):
group_list.remove(group_id)
self._write_group_list(group_list)
return IPCFutureResult(self._executor.submit(_delete_group))
return coordination.CoordinatorResult(
self._executor.submit(_delete_group))
def watch_join_group(self, group_id, callback):
# Check the group exist
@ -240,26 +240,9 @@ class IPCDriver(coordination.CoordinationDriver):
return self._read_group_list()
def get_groups(self):
return IPCFutureResult(self._executor.submit(
return coordination.CoordinatorResult(self._executor.submit(
self._get_groups_handler))
@staticmethod
def get_lock(name):
return IPCLock(name)
class IPCFutureResult(coordination.CoordAsyncResult):
"""IPC asynchronous result that references a future."""
def __init__(self, fut):
self._fut = fut
def get(self, timeout=10):
try:
return self._fut.result(timeout=timeout)
except futures.TimeoutError as e:
utils.raise_with_cause(coordination.OperationTimedOut,
encodeutils.exception_to_unicode(e),
cause=e)
def done(self):
return self._fut.done()

View File

@ -14,11 +14,12 @@
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import errno
import functools
import logging
import socket
from concurrent import futures
from oslo_utils import encodeutils
from pymemcache import client as pymemcache_client
import six
@ -33,35 +34,41 @@ from tooz import utils
LOG = logging.getLogger(__name__)
def _translate_failures(func):
@contextlib.contextmanager
def _failure_translator():
"""Translates common pymemcache exceptions into tooz exceptions.
https://github.com/pinterest/pymemcache/blob/d995/pymemcache/client.py#L202
"""
try:
yield
except pymemcache_client.MemcacheUnexpectedCloseError as e:
utils.raise_with_cause(coordination.ToozConnectionError,
encodeutils.exception_to_unicode(e),
cause=e)
except (socket.timeout, socket.error,
socket.gaierror, socket.herror) as e:
# TODO(harlowja): get upstream pymemcache to produce a better
# exception for these, using socket (vs. a memcache specific
# error) seems sorta not right and/or the best approach...
msg = encodeutils.exception_to_unicode(e)
if e.errno is not None:
msg += " (with errno %s [%s])" % (errno.errorcode[e.errno],
e.errno)
utils.raise_with_cause(coordination.ToozConnectionError,
msg, cause=e)
except pymemcache_client.MemcacheError as e:
utils.raise_with_cause(tooz.ToozError,
encodeutils.exception_to_unicode(e),
cause=e)
def _translate_failures(func):
@six.wraps(func)
def wrapper(*args, **kwargs):
try:
with _failure_translator():
return func(*args, **kwargs)
except pymemcache_client.MemcacheUnexpectedCloseError as e:
utils.raise_with_cause(coordination.ToozConnectionError,
encodeutils.exception_to_unicode(e),
cause=e)
except (socket.timeout, socket.error,
socket.gaierror, socket.herror) as e:
# TODO(harlowja): get upstream pymemcache to produce a better
# exception for these, using socket (vs. a memcache specific
# error) seems sorta not right and/or the best approach...
msg = encodeutils.exception_to_unicode(e)
if e.errno is not None:
msg += " (with errno %s [%s])" % (errno.errorcode[e.errno],
e.errno)
utils.raise_with_cause(coordination.ToozConnectionError,
msg, cause=e)
except pymemcache_client.MemcacheError as e:
utils.raise_with_cause(tooz.ToozError,
encodeutils.exception_to_unicode(e),
cause=e)
return wrapper
@ -504,19 +511,6 @@ class MemcachedDriver(coordination.CoordinationDriverCachedRunWatchers):
return result
class MemcachedFutureResult(coordination.CoordAsyncResult):
"""Memcached asynchronous result that references a future."""
def __init__(self, fut):
self._fut = fut
def get(self, timeout=10):
try:
return self._fut.result(timeout=timeout)
except futures.TimeoutError as e:
utils.raise_with_cause(
coordination.OperationTimedOut,
encodeutils.exception_to_unicode(e),
cause=e)
def done(self):
return self._fut.done()
MemcachedFutureResult = functools.partial(
coordination.CoordinatorResult,
failure_translator=_failure_translator)

View File

@ -18,11 +18,11 @@ from __future__ import absolute_import
import contextlib
from distutils import version
import functools
import logging
import string
import threading
from concurrent import futures
from oslo_utils import encodeutils
from oslo_utils import strutils
import redis
@ -747,24 +747,5 @@ return 1
return result
class RedisFutureResult(coordination.CoordAsyncResult):
"""Redis asynchronous result that references a future."""
def __init__(self, fut):
self._fut = fut
def get(self, timeout=10):
try:
# Late translate the common failures since the redis client
# may throw things that we can not catch in the callbacks where
# it is used (especially one that uses the transaction
# method).
with _translate_failures():
return self._fut.result(timeout=timeout)
except futures.TimeoutError as e:
utils.raise_with_cause(coordination.OperationTimedOut,
encodeutils.exception_to_unicode(e),
cause=e)
def done(self):
return self._fut.done()
RedisFutureResult = functools.partial(coordination.CoordinatorResult,
failure_translator=_translate_failures)