Add support for per-service rate limits

Rate limits on the server-side are per-service, but the rate limit in
the TaskManager is a single rate. Add support for a dict of rate limits,
keyed by service-type.

The primary user interface should be passing rate to the Connection
constructor. That takes calls-per-second for both scalar and dict
versions of its rate parameter.

Depends-On: https://review.openstack.org/612168/
Change-Id: If0ff77b43adc1f6f0bb1e7c08908930a95508b31
This commit is contained in:
Monty Taylor 2018-09-22 07:59:04 -05:00
parent 448cda91e3
commit 09b10cfecb
No known key found for this signature in database
GPG Key ID: 7BAE94BC7141A594
8 changed files with 118 additions and 15 deletions

View File

@ -116,11 +116,18 @@ class OpenStackSDKAdapter(adapter.Adapter):
This allows using the nodepool MultiThreaded Rate Limiting TaskManager. This allows using the nodepool MultiThreaded Rate Limiting TaskManager.
""" """
def __init__(self, session=None, task_manager=None, *args, **kwargs): def __init__(
self, session=None,
task_manager=None,
rate_limit=None, concurrency=None,
*args, **kwargs):
super(OpenStackSDKAdapter, self).__init__( super(OpenStackSDKAdapter, self).__init__(
session=session, *args, **kwargs) session=session, *args, **kwargs)
if not task_manager: if not task_manager:
task_manager = _task_manager.TaskManager(name=self.service_type) task_manager = _task_manager.TaskManager(
name=self.service_type,
rate=rate_limit,
workers=concurrency)
task_manager.start() task_manager.start()
self.task_manager = task_manager self.task_manager = task_manager
@ -143,6 +150,7 @@ class OpenStackSDKAdapter(adapter.Adapter):
ret = self.task_manager.submit_function( ret = self.task_manager.submit_function(
request_method, run_async=True, name=name, request_method, run_async=True, name=name,
connect_retries=connect_retries, raise_exc=raise_exc, connect_retries=connect_retries, raise_exc=raise_exc,
tag=self.service_type,
**kwargs) **kwargs)
if run_async: if run_async:
return ret return ret

View File

@ -15,7 +15,6 @@
import copy import copy
import warnings import warnings
from keystoneauth1 import adapter
from keystoneauth1 import discover from keystoneauth1 import discover
import keystoneauth1.exceptions.catalog import keystoneauth1.exceptions.catalog
from keystoneauth1 import session as ks_session from keystoneauth1 import session as ks_session
@ -23,6 +22,7 @@ import os_service_types
import requestsexceptions import requestsexceptions
from six.moves import urllib from six.moves import urllib
from openstack import _adapter
from openstack import version as openstack_version from openstack import version as openstack_version
from openstack import _log from openstack import _log
from openstack.config import _util from openstack.config import _util
@ -247,6 +247,17 @@ class CloudRegion(object):
value = converter(value) value = converter(value)
return value return value
def _get_service_config(self, key, service_type):
config_dict = self.config.get(key)
if not config_dict:
return None
if not isinstance(config_dict, dict):
return config_dict
for st in self._service_type_manager.get_all_types(service_type):
if st in config_dict:
return config_dict[st]
def get_interface(self, service_type=None): def get_interface(self, service_type=None):
return self._get_config( return self._get_config(
'interface', service_type, fallback_to_unprefixed=True) 'interface', service_type, fallback_to_unprefixed=True)
@ -438,7 +449,8 @@ class CloudRegion(object):
return interface_versions.get(service_type, []) return interface_versions.get(service_type, [])
def get_session_client( def get_session_client(
self, service_type, version=None, constructor=adapter.Adapter, self, service_type, version=None,
constructor=_adapter.OpenStackSDKAdapter,
**kwargs): **kwargs):
"""Return a prepped keystoneauth Adapter for a given service. """Return a prepped keystoneauth Adapter for a given service.
@ -498,6 +510,8 @@ class CloudRegion(object):
max_version=max_api_version, max_version=max_api_version,
endpoint_override=endpoint_override, endpoint_override=endpoint_override,
default_microversion=version_request.default_microversion, default_microversion=version_request.default_microversion,
rate_limit=self.get_rate_limit(service_type),
concurrency=self.get_concurrency(service_type),
**kwargs) **kwargs)
if version_request.default_microversion: if version_request.default_microversion:
default_microversion = version_request.default_microversion default_microversion = version_request.default_microversion
@ -724,3 +738,11 @@ class CloudRegion(object):
def get_password_callback(self): def get_password_callback(self):
return self._password_callback return self._password_callback
def get_rate_limit(self, service_type=None):
return self._get_service_config(
'rate_limit', service_type=service_type)
def get_concurrency(self, service_type=None):
return self._get_service_config(
'concurrency', service_type=service_type)

View File

@ -220,6 +220,7 @@ class Connection(six.with_metaclass(_meta.ConnectionMeta,
strict=False, strict=False,
use_direct_get=False, use_direct_get=False,
task_manager=None, task_manager=None,
rate_limit=None,
**kwargs): **kwargs):
"""Create a connection to a cloud. """Create a connection to a cloud.
@ -262,6 +263,12 @@ class Connection(six.with_metaclass(_meta.ConnectionMeta,
Defaults to None which causes a direct-action Task Manager to be Defaults to None which causes a direct-action Task Manager to be
used. used.
:type manager: :class:`~openstack.task_manager.TaskManager` :type manager: :class:`~openstack.task_manager.TaskManager`
:param rate_limit:
Client-side rate limit, expressed in calls per second. The
parameter can either be a single float, or it can be a dict with
keys as service-type and values as floats expressing the calls
per second for that service. Defaults to None, which means no
rate-limiting is performed.
:param kwargs: If a config is not provided, the rest of the parameters :param kwargs: If a config is not provided, the rest of the parameters
provided are assumed to be arguments to be passed to the provided are assumed to be arguments to be passed to the
CloudRegion contructor. CloudRegion contructor.
@ -294,7 +301,8 @@ class Connection(six.with_metaclass(_meta.ConnectionMeta,
self.task_manager = task_manager self.task_manager = task_manager
else: else:
self.task_manager = _task_manager.TaskManager( self.task_manager = _task_manager.TaskManager(
self.config.full_name) self.config.full_name,
rate=rate_limit)
self.task_manager.start() self.task_manager.start()
self._session = None self._session = None

View File

@ -45,7 +45,9 @@ class Task(object):
the main payload at execution time. the main payload at execution time.
""" """
def __init__(self, main=None, name=None, run_async=False, *args, **kwargs): def __init__(
self, main=None, name=None, run_async=False,
tag=None, *args, **kwargs):
self._exception = None self._exception = None
self._traceback = None self._traceback = None
self._result = None self._result = None
@ -56,6 +58,7 @@ class Task(object):
self.args = args self.args = args
self.kwargs = kwargs self.kwargs = kwargs
self.name = name or type(self).__name__ self.name = name or type(self).__name__
self.tag = tag
def main(self): def main(self):
return self._main(*self.args, **self.kwargs) return self._main(*self.args, **self.kwargs)
@ -103,12 +106,22 @@ class TaskManager(object):
self.daemon = True self.daemon = True
self.queue = queue.Queue() self.queue = queue.Queue()
self._running = True self._running = True
if rate is not None: if isinstance(rate, dict):
rate = float(rate) self._waits = {}
self.rate = rate for (k, v) in rate.items():
if v:
self._waits[k] = 1.0 / v
else:
if rate:
self._waits = {None: 1.0 / rate}
else:
self._waits = {}
self._thread = threading.Thread(name=name, target=self.run) self._thread = threading.Thread(name=name, target=self.run)
self._thread.daemon = True self._thread.daemon = True
def _get_wait(self, tag):
return self._waits.get(tag, self._waits.get(None))
@property @property
def executor(self): def executor(self):
if not self._executor: if not self._executor:
@ -129,7 +142,7 @@ class TaskManager(object):
self._thread.join() self._thread.join()
def run(self): def run(self):
last_ts = 0 last_ts_dict = {}
try: try:
while True: while True:
task = self.queue.get() task = self.queue.get()
@ -137,12 +150,15 @@ class TaskManager(object):
if not self._running: if not self._running:
break break
continue continue
if self.rate: wait = self._get_wait(task.tag)
if wait:
last_ts = last_ts_dict.get(task.tag, 0)
while True: while True:
delta = time.time() - last_ts delta = time.time() - last_ts
if delta >= self.rate: if delta >= wait:
break break
time.sleep(self.rate - delta) time.sleep(wait - delta)
last_ts_dict[task.tag] = time.time()
self._log.debug( self._log.debug(
"TaskManager {name} queue size: {size})".format( "TaskManager {name} queue size: {size})".format(
name=self.name, name=self.name,
@ -171,12 +187,14 @@ class TaskManager(object):
return task.wait() return task.wait()
def submit_function( def submit_function(
self, method, name=None, run_async=False, *args, **kwargs): self, method, name=None, run_async=False, tag=None,
*args, **kwargs):
""" Allows submitting an arbitrary method for work. """ Allows submitting an arbitrary method for work.
:param method: Callable to run in the TaskManager. :param method: Callable to run in the TaskManager.
:param str name: Name to use for the generated Task object. :param str name: Name to use for the generated Task object.
:param bool run_async: Whether to run this task async or not. :param bool run_async: Whether to run this task async or not.
:param str tag: Named rate-limiting context for the task.
:param args: positional arguments to pass to the method when it runs. :param args: positional arguments to pass to the method when it runs.
:param kwargs: keyword arguments to pass to the method when it runs. :param kwargs: keyword arguments to pass to the method when it runs.
""" """
@ -185,10 +203,12 @@ class TaskManager(object):
self.executor.submit, method, *args, **kwargs) self.executor.submit, method, *args, **kwargs)
task = Task( task = Task(
main=payload, name=name, main=payload, name=name,
run_async=run_async) run_async=run_async,
tag=tag)
else: else:
task = Task( task = Task(
main=method, name=name, main=method, name=name,
tag=tag,
*args, **kwargs) *args, **kwargs)
return self.submit_task(task) return self.submit_task(task)

View File

@ -124,6 +124,8 @@ class TestCase(base.TestCase):
self.strict_cloud = openstack.connection.Connection( self.strict_cloud = openstack.connection.Connection(
config=self.cloud_config, config=self.cloud_config,
strict=True) strict=True)
self.addCleanup(self.cloud.task_manager.stop)
self.addCleanup(self.strict_cloud.task_manager.stop)
# FIXME(notmorgan): Convert the uri_registry, discovery.json, and # FIXME(notmorgan): Convert the uri_registry, discovery.json, and
# use of keystone_v3/v2 to a proper fixtures.Fixture. For now this # use of keystone_v3/v2 to a proper fixtures.Fixture. For now this

View File

@ -63,6 +63,25 @@ class TaskTestSet(task_manager.Task):
return set([1, 2]) return set([1, 2])
class TestRateTransforms(base.TestCase):
def test_rate_parameter_scalar(self):
manager = task_manager.TaskManager(name='test', rate=0.1234)
self.assertEqual(1 / 0.1234, manager._get_wait('compute'))
self.assertEqual(1 / 0.1234, manager._get_wait(None))
def test_rate_parameter_dict(self):
manager = task_manager.TaskManager(
name='test',
rate={
'compute': 20,
'network': 10,
})
self.assertEqual(1 / 20, manager._get_wait('compute'))
self.assertEqual(1 / 10, manager._get_wait('network'))
self.assertIsNone(manager._get_wait('object-store'))
class TestTaskManager(base.TestCase): class TestTaskManager(base.TestCase):
def setUp(self): def setUp(self):

View File

@ -84,6 +84,22 @@ class TestConnection(base.TestCase):
self.assertEqual(mock_session, conn.session) self.assertEqual(mock_session, conn.session)
self.assertEqual('auth.example.com', conn.config.name) self.assertEqual('auth.example.com', conn.config.name)
def test_task_manager_rate_scalar(self):
conn = connection.Connection(cloud='sample', rate_limit=20)
self.assertEqual(1 / 20, conn.task_manager._get_wait('object-store'))
self.assertEqual(1 / 20, conn.task_manager._get_wait(None))
def test_task_manager_rate_dict(self):
conn = connection.Connection(
cloud='sample',
rate_limit={
'compute': 20,
'network': 10,
})
self.assertEqual(1 / 20, conn.task_manager._get_wait('compute'))
self.assertEqual(1 / 10, conn.task_manager._get_wait('network'))
self.assertIsNone(conn.task_manager._get_wait('object-store'))
def test_create_session(self): def test_create_session(self):
conn = connection.Connection(cloud='sample') conn = connection.Connection(cloud='sample')
self.assertIsNotNone(conn) self.assertIsNotNone(conn)

View File

@ -0,0 +1,8 @@
---
features:
- |
Client-side rate limiting is now directly exposed via ``rate_limit``
and ``concurrency`` parameters. A single value can be given that applies
to all services, or a dict of service-type and value if different
client-side rate or concurrency limits should be used for different
services.