Be more restrictive on the executors users can provide
Instead of allowing executor types to be provided for now just allow only strings to be provided, and restrict that set of strings to known working executors that will not have thread locking issues (until we have a way to use compatible thread locks that match the executor we should likely not allow to many types to be passed in). This will likely get better when the following merges: https://review.openstack.org/#/c/203885/ Change-Id: I2b5c2bdc00e59aca8ba150ef57a4b72c8c671227
This commit is contained in:
parent
7accc78843
commit
3023aeddb3
|
@ -31,7 +31,6 @@ class TestProxyExecutor(testcase.TestCase):
|
|||
try_options = [
|
||||
({'executor': 'sync'}, futurist.SynchronousExecutor),
|
||||
({'executor': 'thread'}, futurist.ThreadPoolExecutor),
|
||||
({'executor': 'greenthread'}, futurist.GreenThreadPoolExecutor),
|
||||
]
|
||||
for options, expected_cls in try_options:
|
||||
executor = utils.ProxyExecutor.build("test", options)
|
||||
|
@ -53,18 +52,6 @@ class TestProxyExecutor(testcase.TestCase):
|
|||
finally:
|
||||
executor.stop()
|
||||
|
||||
def test_given_executor(self):
|
||||
backing_executor = futurist.SynchronousExecutor()
|
||||
options = {'executor': backing_executor}
|
||||
executor = utils.ProxyExecutor.build("test", options)
|
||||
executor.start()
|
||||
try:
|
||||
self.assertIs(backing_executor, executor.executor)
|
||||
finally:
|
||||
executor.stop()
|
||||
# The backing executor should not be shutoff...
|
||||
self.assertTrue(backing_executor.alive)
|
||||
|
||||
def test_fetch_unknown_executor(self):
|
||||
options = {'executor': 'huh'}
|
||||
self.assertRaises(coordination.ToozError,
|
||||
|
|
|
@ -18,7 +18,6 @@ import errno
|
|||
import os
|
||||
|
||||
import futurist
|
||||
from futurist import waiters
|
||||
import msgpack
|
||||
from oslo_serialization import msgpackutils
|
||||
from oslo_utils import encodeutils
|
||||
|
@ -31,8 +30,6 @@ class ProxyExecutor(object):
|
|||
KIND_TO_FACTORY = {
|
||||
'threaded': (lambda:
|
||||
futurist.ThreadPoolExecutor(max_workers=1)),
|
||||
'greenthreaded': (lambda:
|
||||
futurist.GreenThreadPoolExecutor(max_workers=1)),
|
||||
'synchronous': lambda: futurist.SynchronousExecutor(),
|
||||
}
|
||||
|
||||
|
@ -40,61 +37,43 @@ class ProxyExecutor(object):
|
|||
KIND_TO_FACTORY['thread'] = KIND_TO_FACTORY['threaded']
|
||||
KIND_TO_FACTORY['threading'] = KIND_TO_FACTORY['threaded']
|
||||
KIND_TO_FACTORY['sync'] = KIND_TO_FACTORY['synchronous']
|
||||
KIND_TO_FACTORY['greenthread'] = KIND_TO_FACTORY['greenthreaded']
|
||||
KIND_TO_FACTORY['greenthreading'] = KIND_TO_FACTORY['greenthreaded']
|
||||
|
||||
DEFAULT_KIND = 'threaded'
|
||||
|
||||
def __init__(self, driver_name, default_executor_factory, executor=None):
|
||||
def __init__(self, driver_name, default_executor_factory):
|
||||
self.default_executor_factory = default_executor_factory
|
||||
self.driver_name = driver_name
|
||||
self.dispatched = set()
|
||||
self.started = False
|
||||
if executor is None:
|
||||
self.executor = None
|
||||
self.internally_owned = True
|
||||
else:
|
||||
self.executor = executor
|
||||
self.internally_owned = False
|
||||
self.executor = None
|
||||
self.internally_owned = True
|
||||
|
||||
@classmethod
|
||||
def build(cls, driver_name, options):
|
||||
default_executor_fact = cls.KIND_TO_FACTORY[cls.DEFAULT_KIND]
|
||||
executor = None
|
||||
if 'executor' in options:
|
||||
executor = options['executor']
|
||||
if isinstance(executor, six.string_types):
|
||||
try:
|
||||
default_executor_fact = cls.KIND_TO_FACTORY[executor]
|
||||
executor = None
|
||||
except KeyError:
|
||||
executors_known = sorted(list(cls.KIND_TO_FACTORY))
|
||||
raise coordination.ToozError("Unknown executor string"
|
||||
" '%s' accepted values"
|
||||
" are %s" % (executor,
|
||||
executors_known))
|
||||
return cls(driver_name, default_executor_fact, executor=executor)
|
||||
executor_kind = options['executor']
|
||||
try:
|
||||
default_executor_fact = cls.KIND_TO_FACTORY[executor_kind]
|
||||
except KeyError:
|
||||
executors_known = sorted(list(cls.KIND_TO_FACTORY))
|
||||
raise coordination.ToozError("Unknown executor"
|
||||
" '%s' provided, accepted values"
|
||||
" are %s" % (executor_kind,
|
||||
executors_known))
|
||||
return cls(driver_name, default_executor_fact)
|
||||
|
||||
def start(self):
|
||||
if self.started:
|
||||
return
|
||||
if self.internally_owned:
|
||||
self.executor = self.default_executor_factory()
|
||||
self.executor = self.default_executor_factory()
|
||||
self.started = True
|
||||
|
||||
def stop(self):
|
||||
executor = self.executor
|
||||
self.executor = None
|
||||
if executor is not None:
|
||||
executor.shutdown()
|
||||
self.started = False
|
||||
not_done = self.dispatched.copy()
|
||||
if not_done:
|
||||
waiters.wait_for_all(not_done)
|
||||
if self.internally_owned:
|
||||
executor = self.executor
|
||||
self.executor = None
|
||||
if executor is not None:
|
||||
executor.shutdown()
|
||||
|
||||
def _on_done(self, fut):
|
||||
self.dispatched.discard(fut)
|
||||
|
||||
def submit(self, cb, *args, **kwargs):
|
||||
if not self.started:
|
||||
|
@ -102,14 +81,10 @@ class ProxyExecutor(object):
|
|||
" has not been started"
|
||||
% self.driver_name)
|
||||
try:
|
||||
fut = self.executor.submit(cb, *args, **kwargs)
|
||||
return self.executor.submit(cb, *args, **kwargs)
|
||||
except RuntimeError:
|
||||
raise coordination.ToozError("%s driver asynchronous executor has"
|
||||
" been shutdown" % self.driver_name)
|
||||
else:
|
||||
self.dispatched.add(fut)
|
||||
fut.add_done_callback(self._on_done)
|
||||
return fut
|
||||
|
||||
|
||||
def safe_abs_path(rooted_at, *pieces):
|
||||
|
|
Loading…
Reference in New Issue