Use futurist to allow for executor providing and unifying

Depends-On: Ifd7b6e11863cae3d34fd5475a364ebb7c28811fc

Change-Id: Ie5f800c1620451e605e8d28673631b7436019279
This commit is contained in:
Joshua Harlow 2015-07-20 16:12:47 -07:00
parent e0aff95336
commit 8511d4c33a
8 changed files with 166 additions and 42 deletions

View File

@@ -11,5 +11,6 @@ msgpack-python>=0.4.0
fasteners>=0.7 # Apache-2.0
retrying!=1.3.0,>=1.2.3 # Apache-2.0
futures>=3.0;python_version=='2.7' or python_version=='2.6'
futurist>=0.1.2 # Apache-2.0
oslo.utils>=1.9.0 # Apache-2.0
oslo.serialization>=1.4.0 # Apache-2.0

View File

@@ -25,3 +25,6 @@ sysv-ipc>=0.6.8 # BSD License
kazoo>=2.2
pymemcache>=1.2.9 # Apache 2.0 License
redis>=2.10.0
# Ensure that the eventlet executor continues to operate...
eventlet>=0.17.4

View File

@@ -14,8 +14,6 @@
# License for the specific language governing permissions and limitations
# under the License.
from concurrent import futures
import contextlib
import datetime
import errno
@@ -25,6 +23,8 @@ import os
import shutil
import threading
from concurrent import futures
import fasteners
from oslo_utils import timeutils
import six
@@ -133,6 +133,7 @@ class FileDriver(coordination._RunWatchersMixin,
super(FileDriver, self).__init__()
self._member_id = member_id
self._dir = parsed_url.path
self._executor = utils.ProxyExecutor.build("File", options)
self._group_dir = os.path.join(self._dir, 'groups')
self._driver_lock_path = os.path.join(self._dir, '.driver_lock')
self._driver_lock = locking.SharedWeakLockHelper(
@@ -140,7 +141,6 @@ class FileDriver(coordination._RunWatchersMixin,
self._reserved_dirs = [self._dir, self._group_dir]
self._reserved_paths = list(self._reserved_dirs)
self._reserved_paths.append(self._driver_lock_path)
self._executor = None
self._joined_groups = set()
self._safe_member_id = self._make_filesystem_safe(member_id)
@@ -159,25 +159,12 @@ class FileDriver(coordination._RunWatchersMixin,
def _start(self):
for a_dir in self._reserved_dirs:
utils.ensure_tree(a_dir)
self._executor = futures.ThreadPoolExecutor(max_workers=1)
self._executor.start()
def _stop(self):
while self._joined_groups:
self.leave_group(self._joined_groups.pop())
if self._executor is not None:
self._executor.shutdown(wait=True)
self._executor = None
def _submit(self, cb, *args, **kwargs):
executor = self._executor
if executor is None:
raise coordination.ToozError("File driver has not been started")
else:
try:
return executor.submit(cb, *args, **kwargs)
except RuntimeError:
raise coordination.ToozError("File driver asynchronous"
" executor has been shutdown")
self._executor.stop()
def create_group(self, group_id):
safe_group_id = self._make_filesystem_safe(group_id)
@@ -197,7 +184,7 @@ class FileDriver(coordination._RunWatchersMixin,
with open(group_meta_path, "wb") as fh:
fh.write(details_blob)
fut = self._submit(_do_create_group)
fut = self._executor.submit(_do_create_group)
return FileFutureResult(fut)
def join_group(self, group_id, capabilities=b""):
@@ -222,7 +209,7 @@ class FileDriver(coordination._RunWatchersMixin,
fh.write(details_blob)
self._joined_groups.add(group_id)
fut = self._submit(_do_join_group)
fut = self._executor.submit(_do_join_group)
return FileFutureResult(fut)
def leave_group(self, group_id):
@@ -245,7 +232,7 @@ class FileDriver(coordination._RunWatchersMixin,
else:
self._joined_groups.discard(group_id)
fut = self._submit(_do_leave_group)
fut = self._executor.submit(_do_leave_group)
return FileFutureResult(fut)
def get_members(self, group_id):
@@ -288,7 +275,7 @@ class FileDriver(coordination._RunWatchersMixin,
members.append(member_id)
return members
fut = self._submit(_do_get_members)
fut = self._executor.submit(_do_get_members)
return FileFutureResult(fut)
def get_member_capabilities(self, group_id, member_id):
@@ -319,7 +306,7 @@ class FileDriver(coordination._RunWatchersMixin,
type(details)))
return details["capabilities"]
fut = self._submit(_do_get_member_capabilities)
fut = self._executor.submit(_do_get_member_capabilities)
return FileFutureResult(fut)
def delete_group(self, group_id):
@@ -345,7 +332,7 @@ class FileDriver(coordination._RunWatchersMixin,
if e.errno != errno.ENOENT:
raise
fut = self._submit(_do_delete_group)
fut = self._executor.submit(_do_delete_group)
return FileFutureResult(fut)
def get_groups(self):
@@ -372,7 +359,7 @@ class FileDriver(coordination._RunWatchersMixin,
raise
return groups
fut = self._submit(_do_get_groups)
fut = self._executor.submit(_do_get_groups)
return FileFutureResult(fut)
def _init_watch_group(self, group_id):

View File

@@ -139,6 +139,7 @@ class IPCDriver(coordination.CoordinationDriver):
def __init__(self, member_id, parsed_url, options):
"""Initialize the IPC driver."""
super(IPCDriver, self).__init__()
self._executor = utils.ProxyExecutor.build("IPC", options)
def _start(self):
self._group_list = sysv_ipc.SharedMemory(
@@ -146,10 +147,10 @@ class IPCDriver(coordination.CoordinationDriver):
sysv_ipc.IPC_CREAT,
size=self._SEGMENT_SIZE)
self._lock = self.get_lock(self._INTERNAL_LOCK_NAME)
self._executor = futures.ThreadPoolExecutor(max_workers=1)
self._executor.start()
def _stop(self):
self._executor.shutdown(wait=True)
self._executor.stop()
try:
self._group_list.detach()
self._group_list.remove()

View File

@@ -151,7 +151,7 @@ class MemcachedDriver(coordination._RunWatchersMixin,
self._options = options
self._member_id = member_id
self._joined_groups = set()
self._executor = None
self._executor = utils.ProxyExecutor.build("Memcached", options)
self.host = (parsed_url.hostname or "localhost",
parsed_url.port or 11211)
default_timeout = options.get('timeout', self.DEFAULT_TIMEOUT)
@@ -197,7 +197,7 @@ class MemcachedDriver(coordination._RunWatchersMixin,
# method and only connect once you do an operation.
self.heartbeat()
self._group_members = collections.defaultdict(set)
self._executor = futures.ThreadPoolExecutor(max_workers=1)
self._executor.start()
@_translate_failures
def _stop(self):
@@ -213,9 +213,7 @@ class MemcachedDriver(coordination._RunWatchersMixin,
pass
except coordination.ToozError:
LOG.warning("Unable to leave group '%s'", g, exc_info=True)
if self._executor is not None:
self._executor.shutdown(wait=True)
self._executor = None
self._executor.stop()
self.client.close()
def _encode_group_id(self, group_id):

View File

@@ -252,7 +252,7 @@ class RedisDriver(coordination._RunWatchersMixin,
self._member_id = self._to_binary(member_id)
self._acquired_locks = set()
self._joined_groups = set()
self._executor = None
self._executor = utils.ProxyExecutor.build("Redis", options)
self._started = False
self._server_info = {}
@@ -343,7 +343,7 @@ class RedisDriver(coordination._RunWatchersMixin,
return redis.StrictRedis(**kwargs)
def _start(self):
self._executor = futures.ThreadPoolExecutor(max_workers=1)
self._executor.start()
try:
self._client = self._make_client(self._parsed_url, self._options,
self.timeout)
@@ -428,9 +428,7 @@ class RedisDriver(coordination._RunWatchersMixin,
except coordination.ToozError:
LOG.warning("Unable to leave group '%s'", group_id,
exc_info=True)
if self._executor is not None:
self._executor.shutdown(wait=True)
self._executor = None
self._executor.stop()
if self._client is not None:
# Make sure we no longer exist...
beat_id = self._encode_beat_id(self._member_id)
@@ -449,11 +447,7 @@ class RedisDriver(coordination._RunWatchersMixin,
def _submit(self, cb, *args, **kwargs):
if not self._started:
raise coordination.ToozError("Redis driver has not been started")
try:
return self._executor.submit(cb, *args, **kwargs)
except RuntimeError:
raise coordination.ToozError("Redis driver asynchronous executor"
" has been shutdown")
return self._executor.submit(cb, *args, **kwargs)
def create_group(self, group_id):
encoded_group = self._encode_group_id(group_id)

View File

@@ -18,12 +18,65 @@
import os
import tempfile
import futurist
import six
from testtools import testcase
from tooz import coordination
from tooz import utils
class TestProxyExecutor(testcase.TestCase):
    """Exercises construction and lifecycle of ``utils.ProxyExecutor``."""

    def test_fetch_check_executor(self):
        # Each accepted 'executor' option string must map to the matching
        # futurist executor class once the proxy is started.
        cases = [
            ('sync', futurist.SynchronousExecutor),
            ('thread', futurist.ThreadPoolExecutor),
            ('greenthread', futurist.GreenThreadPoolExecutor),
        ]
        for kind, expected_cls in cases:
            proxy = utils.ProxyExecutor.build("test", {'executor': kind})
            self.assertTrue(proxy.internally_owned)
            proxy.start()
            self.assertTrue(proxy.started)
            self.assertIsInstance(proxy.executor, expected_cls)
            proxy.stop()
            self.assertFalse(proxy.started)

    def test_fetch_default_executor(self):
        # With no 'executor' option at all, a thread pool is created.
        proxy = utils.ProxyExecutor.build("test", {})
        proxy.start()
        try:
            self.assertIsInstance(proxy.executor,
                                  futurist.ThreadPoolExecutor)
        finally:
            proxy.stop()

    def test_given_executor(self):
        # A caller-provided executor object is used as-is...
        backing_executor = futurist.SynchronousExecutor()
        proxy = utils.ProxyExecutor.build(
            "test", {'executor': backing_executor})
        proxy.start()
        try:
            self.assertIs(backing_executor, proxy.executor)
        finally:
            proxy.stop()
        # ...and must not be shut down by the proxy, since the proxy
        # does not own it.
        self.assertTrue(backing_executor.alive)

    def test_fetch_unknown_executor(self):
        # An unrecognized executor name is rejected at build time.
        self.assertRaises(coordination.ToozError,
                          utils.ProxyExecutor.build, 'test',
                          {'executor': 'huh'})

    def test_no_submit_stopped(self):
        # Submitting before start() must fail loudly.
        proxy = utils.ProxyExecutor.build("test", {})
        self.assertRaises(coordination.ToozError,
                          proxy.submit, lambda: None)
class TestUtilsSafePath(testcase.TestCase):
base = tempfile.gettempdir()

View File

@@ -17,6 +17,8 @@
import errno
import os
import futurist
from futurist import waiters
import msgpack
from oslo_serialization import msgpackutils
from oslo_utils import encodeutils
@@ -25,6 +27,91 @@ import six
from tooz import coordination
class ProxyExecutor(object):
    """Proxy that owns (or wraps) a futurist executor for a driver.

    A driver asks for an executor either by name (one of the keys in
    ``KIND_TO_FACTORY``) or by handing over an already-built executor via
    the ``executor`` option. Internally-created executors are shut down on
    :meth:`stop`; externally-provided ones are left alive for their owner.
    """

    KIND_TO_FACTORY = {
        'threaded': (lambda:
                     futurist.ThreadPoolExecutor(max_workers=1)),
        'greenthreaded': (lambda:
                          futurist.GreenThreadPoolExecutor(max_workers=1)),
        'synchronous': lambda: futurist.SynchronousExecutor(),
    }

    # Provide a few common aliases...
    KIND_TO_FACTORY['thread'] = KIND_TO_FACTORY['threaded']
    KIND_TO_FACTORY['threading'] = KIND_TO_FACTORY['threaded']
    KIND_TO_FACTORY['sync'] = KIND_TO_FACTORY['synchronous']
    KIND_TO_FACTORY['greenthread'] = KIND_TO_FACTORY['greenthreaded']
    KIND_TO_FACTORY['greenthreading'] = KIND_TO_FACTORY['greenthreaded']

    DEFAULT_KIND = 'threaded'

    def __init__(self, driver_name, default_executor_factory, executor=None):
        self.default_executor_factory = default_executor_factory
        self.driver_name = driver_name
        # Futures handed out but not yet completed; drained on stop().
        self.dispatched = set()
        self.started = False
        # Only executors we create ourselves get shut down by stop().
        self.internally_owned = executor is None
        self.executor = executor

    @classmethod
    def build(cls, driver_name, options):
        """Create a proxy from a driver's options dictionary.

        Raises ``coordination.ToozError`` when the 'executor' option names
        an unknown executor kind.
        """
        default_executor_fact = cls.KIND_TO_FACTORY[cls.DEFAULT_KIND]
        executor = options.get('executor')
        if isinstance(executor, six.string_types):
            if executor not in cls.KIND_TO_FACTORY:
                executors_known = sorted(cls.KIND_TO_FACTORY)
                raise coordination.ToozError("Unknown executor string"
                                             " '%s' accepted values"
                                             " are %s" % (executor,
                                                          executors_known))
            default_executor_fact = cls.KIND_TO_FACTORY[executor]
            executor = None
        return cls(driver_name, default_executor_fact, executor=executor)

    def start(self):
        """Activate the proxy, creating a backing executor if we own it."""
        if not self.started:
            if self.internally_owned:
                self.executor = self.default_executor_factory()
            self.started = True

    def stop(self):
        """Deactivate the proxy, draining in-flight work first."""
        self.started = False
        pending = self.dispatched.copy()
        if pending:
            waiters.wait_for_all(pending)
        if not self.internally_owned:
            # Never shut down an executor somebody else handed us.
            return
        backing, self.executor = self.executor, None
        if backing is not None:
            backing.shutdown()

    def _on_done(self, fut):
        # Completed futures no longer need to be waited on at stop().
        self.dispatched.discard(fut)

    def submit(self, cb, *args, **kwargs):
        """Submit ``cb`` to the backing executor, returning its future.

        Raises ``coordination.ToozError`` when the proxy has not been
        started or the backing executor has already been shut down.
        """
        if not self.started:
            raise coordination.ToozError("%s driver asynchronous executor"
                                         " has not been started"
                                         % self.driver_name)
        try:
            fut = self.executor.submit(cb, *args, **kwargs)
        except RuntimeError:
            raise coordination.ToozError("%s driver asynchronous executor has"
                                         " been shutdown" % self.driver_name)
        # Track before attaching the callback so an already-done future
        # (e.g. from a synchronous executor) is discarded, not leaked.
        self.dispatched.add(fut)
        fut.add_done_callback(self._on_done)
        return fut
def safe_abs_path(rooted_at, *pieces):
# Avoids the following junk...
#