Switch to futurist for concurrency

As we're being used inside of services more and those services
frequently use things like eventlet, allow people to pass in
an Executor object from futurist to override the default
ThreadPoolExecutor object.

Change-Id: I6c04defc28998d49199383a6cc6d5f5611a99e25
This commit is contained in:
Monty Taylor 2020-03-04 12:54:57 -06:00
parent f34d399f52
commit b6a22e3749
5 changed files with 25 additions and 15 deletions

View File

@ -9,6 +9,7 @@ extras==1.0.0
fixtures==3.0.0 fixtures==3.0.0
future==0.16.0 future==0.16.0
futures==3.0.0 futures==3.0.0
futurist==2.1.0
ipaddress==1.0.17 ipaddress==1.0.17
iso8601==0.1.11 iso8601==0.1.11
jmespath==0.9.0 jmespath==0.9.0

View File

@ -45,9 +45,6 @@ OBJECT_CONTAINER_ACLS = {
class ObjectStoreCloudMixin(_normalize.Normalizer): class ObjectStoreCloudMixin(_normalize.Normalizer):
def __init__(self):
self.__pool_executor = None
@property @property
def _object_store_client(self): def _object_store_client(self):
if 'object-store' not in self._raw_clients: if 'object-store' not in self._raw_clients:
@ -55,16 +52,6 @@ class ObjectStoreCloudMixin(_normalize.Normalizer):
self._raw_clients['object-store'] = raw_client self._raw_clients['object-store'] = raw_client
return self._raw_clients['object-store'] return self._raw_clients['object-store']
@property
def _pool_executor(self):
if not self.__pool_executor:
# TODO(mordred) Make this configurable - and probably use Futurist
# instead of concurrent.futures so that people using Eventlet will
# be happier.
self.__pool_executor = concurrent.futures.ThreadPoolExecutor(
max_workers=5)
return self.__pool_executor
def list_containers(self, full_listing=True, prefix=None): def list_containers(self, full_listing=True, prefix=None):
"""List containers. """List containers.

View File

@ -179,6 +179,7 @@ Additional information about the services can be found in the
import warnings import warnings
import weakref import weakref
import futurist
import keystoneauth1.exceptions import keystoneauth1.exceptions
import requestsexceptions import requestsexceptions
import six import six
@ -276,6 +277,7 @@ class Connection(
service_types=None, service_types=None,
global_request_id=None, global_request_id=None,
strict_proxies=False, strict_proxies=False,
pool_executor=None,
**kwargs): **kwargs):
"""Create a connection to a cloud. """Create a connection to a cloud.
@ -343,6 +345,11 @@ class Connection(
where it can be expected that the deployer config is correct and where it can be expected that the deployer config is correct and
errors should be reported immediately. errors should be reported immediately.
Default false. Default false.
:param pool_executor:
:type pool_executor: :class:`~futurist.Executor`
A futurist ``Executor`` object to be used for concurrent background
activities. Defaults to None in which case a ThreadPoolExecutor
will be created if needed.
:param kwargs: If a config is not provided, the rest of the parameters :param kwargs: If a config is not provided, the rest of the parameters
provided are assumed to be arguments to be passed to the provided are assumed to be arguments to be passed to the
CloudRegion constructor. CloudRegion constructor.
@ -378,7 +385,7 @@ class Connection(
self._session = None self._session = None
self._proxies = {} self._proxies = {}
self.__pool_executor = None self.__pool_executor = pool_executor
self._global_request_id = global_request_id self._global_request_id = global_request_id
self.use_direct_get = use_direct_get self.use_direct_get = use_direct_get
self.strict_mode = strict self.strict_mode = strict
@ -491,6 +498,13 @@ class Connection(
except keystoneauth1.exceptions.ClientException as e: except keystoneauth1.exceptions.ClientException as e:
raise exceptions.raise_from_response(e.response) raise exceptions.raise_from_response(e.response)
@property
def _pool_executor(self):
if not self.__pool_executor:
self.__pool_executor = futurist.ThreadPoolExecutor(
max_workers=5)
return self.__pool_executor
def close(self): def close(self):
"""Release any resources held open.""" """Release any resources held open."""
if self.__pool_executor: if self.__pool_executor:

View File

@ -0,0 +1,8 @@
---
features:
- |
Switched to the ``futurist`` library for managing background
concurrent tasks. Introduced a new ``pool_executor`` parameter
to `Connection` that allows passing any any futurist Executor
for cases where the default ``ThreadPoolExecutor`` would not
be appropriate.

View File

@ -14,7 +14,7 @@ munch>=2.1.0 # MIT
decorator>=4.4.1 # BSD decorator>=4.4.1 # BSD
jmespath>=0.9.0 # MIT jmespath>=0.9.0 # MIT
ipaddress>=1.0.17;python_version<'3.3' # PSF ipaddress>=1.0.17;python_version<'3.3' # PSF
futures>=3.0.0;python_version=='2.7' or python_version=='2.6' # BSD futurist>=2.1.0 # Apache-2.0
iso8601>=0.1.11 # MIT iso8601>=0.1.11 # MIT
netifaces>=0.10.4 # MIT netifaces>=0.10.4 # MIT