diff --git a/keystoneauth1/_fair_semaphore.py b/keystoneauth1/_fair_semaphore.py new file mode 100644 index 00000000..e0ca0e44 --- /dev/null +++ b/keystoneauth1/_fair_semaphore.py @@ -0,0 +1,104 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import threading +import time + + +from six.moves import queue + + +class FairSemaphore(object): + """Semaphore class that notifies in order of request. + + We cannot use a normal Semaphore because it doesn't give any ordering, + which could lead to a request starving. Instead, handle them in the + order we receive them. + + :param int concurrency: + How many concurrent threads can have the semaphore at once. + :param float rate_delay: + How long to wait between the start of each thread receiving the + semaphore. + """ + + def __init__(self, concurrency, rate_delay): + self._lock = threading.Lock() + self._concurrency = concurrency + if concurrency: + self._count = 0 + self._queue = queue.Queue() + + self._rate_delay = rate_delay + self._rate_last_ts = time.time() + + def __enter__(self): + """Aquire a semaphore.""" + # If concurrency is None, everyone is free to immediately execute. + if not self._concurrency: + # NOTE: Rate limiting still applies.This will ultimately impact + # concurrency a bit due to the mutex. + with self._lock: + execution_time = self._advance_timer() + else: + execution_time = self._get_ticket() + return self._wait_for_execution(execution_time) + + def _wait_for_execution(self, execution_time): + """Wait until the pre-calculated time to run.""" + wait_time = execution_time - time.time() + if wait_time > 0: + time.sleep(wait_time) + + def _get_ticket(self): + ticket = threading.Event() + with self._lock: + if self._count <= self._concurrency: + # We can execute, no need to wait. Take a ticket and + # move on. + self._count += 1 + return self._advance_timer() + else: + # We need to wait for a ticket before we can execute. + # Put ourselves in the ticket queue to be woken up + # when available. + self._queue.put(ticket) + ticket.wait() + with self._lock: + return self._advance_timer() + + def _advance_timer(self): + """Calculate the time when it's ok to run a command again. + + This runs inside of the mutex, serializing the calculation + of when it's ok to run again and setting _rate_last_ts to that + new time so that the next thread to calculate when it's safe to + run starts from the time that the current thread calculated. + """ + self._rate_last_ts = self._rate_last_ts + self._rate_delay + return self._rate_last_ts + + def __exit__(self, exc_type, exc_value, traceback): + """Release the semaphore.""" + # If concurrency is None, everyone is free to immediately execute + if not self._concurrency: + return + with self._lock: + # If waiters, wake up the next item in the queue (note + # we're under the queue lock so the queue won't change + # under us). + if self._queue.qsize() > 0: + ticket = self._queue.get() + ticket.set() + else: + # Nothing else to do, give our ticket back + self._count -= 1 diff --git a/keystoneauth1/adapter.py b/keystoneauth1/adapter.py index 7360049a..a15fd541 100644 --- a/keystoneauth1/adapter.py +++ b/keystoneauth1/adapter.py @@ -13,6 +13,7 @@ import os import warnings +from keystoneauth1 import _fair_semaphore from keystoneauth1 import session @@ -92,6 +93,16 @@ class Adapter(object): If True, requests returning failing HTTP responses will raise an exception; if False, the response is returned. This can be overridden on a per-request basis via the kwarg of the same name. + :param float rate_limit: + A client-side rate limit to impose on requests made through this + adapter in requests per second. For instance, a rate_limit of 2 + means to allow no more than 2 requests per second, and a rate_limit + of 0.5 means to allow no more than 1 request every two seconds. + (optional, defaults to None, which means no rate limiting will be + applied). + :param int concurrency: + How many simultaneous http requests this Adapter can be used for. + (optional, defaults to None, which means no limit). """ client_name = None @@ -106,7 +117,9 @@ class Adapter(object): global_request_id=None, min_version=None, max_version=None, default_microversion=None, status_code_retries=None, - retriable_status_codes=None, raise_exc=None): + retriable_status_codes=None, raise_exc=None, + rate_limit=None, concurrency=None, + ): if version and (min_version or max_version): raise TypeError( "version is mutually exclusive with min_version and" @@ -144,6 +157,15 @@ class Adapter(object): if client_version: self.client_version = client_version + rate_delay = 0.0 + if rate_limit: + # 1 / rate converts from requests per second to delay + # between requests needed to achieve that rate. + rate_delay = 1.0 / rate_limit + + self._rate_semaphore = _fair_semaphore.FairSemaphore( + concurrency, rate_delay) + def _set_endpoint_filter_kwargs(self, kwargs): if self.service_type: kwargs.setdefault('service_type', self.service_type) @@ -210,6 +232,8 @@ class Adapter(object): if self.raise_exc is not None: kwargs.setdefault('raise_exc', self.raise_exc) + kwargs.setdefault('rate_semaphore', self._rate_semaphore) + return self.session.request(url, method, **kwargs) def get_token(self, auth=None): diff --git a/keystoneauth1/session.py b/keystoneauth1/session.py index 2a5d4d05..2842f5d9 100644 --- a/keystoneauth1/session.py +++ b/keystoneauth1/session.py @@ -99,6 +99,18 @@ def _sanitize_headers(headers): return str_dict +class NoOpSemaphore(object): + """Empty context manager for use as a default semaphore.""" + + def __enter__(self): + """Enter the context manager and do nothing.""" + pass + + def __exit__(self, exc_type, exc_value, traceback): + """Exit the context manager and do nothing.""" + pass + + class _JSONEncoder(json.JSONEncoder): def default(self, o): @@ -285,6 +297,9 @@ class Session(object): :param bool collect_timing: Whether or not to collect per-method timing information for each API call. (optional, defaults to False) + :param rate_semaphore: Semaphore to be used to control concurrency + and rate limiting of requests. (optional, + defaults to no concurrency or rate control) """ user_agent = None @@ -298,7 +313,7 @@ class Session(object): redirect=_DEFAULT_REDIRECT_LIMIT, additional_headers=None, app_name=None, app_version=None, additional_user_agent=None, discovery_cache=None, split_loggers=None, - collect_timing=False): + collect_timing=False, rate_semaphore=None): self.auth = auth self.session = _construct_session(session) @@ -320,6 +335,7 @@ class Session(object): self._split_loggers = split_loggers self._collect_timing = collect_timing self._api_times = [] + self._rate_semaphore = rate_semaphore or NoOpSemaphore() if timeout is not None: self.timeout = float(timeout) @@ -561,7 +577,7 @@ class Session(object): allow=None, client_name=None, client_version=None, microversion=None, microversion_service_type=None, status_code_retries=0, retriable_status_codes=None, - **kwargs): + rate_semaphore=None, **kwargs): """Send an HTTP request with the specified characteristics. Wrapper around `requests.Session.request` to handle tasks such as @@ -647,6 +663,9 @@ class Session(object): should be retried (optional, defaults to HTTP 503, has no effect when status_code_retries is 0). + :param rate_semaphore: Semaphore to be used to control concurrency + and rate limiting of requests. (optional, + defaults to no concurrency or rate control) :param kwargs: any other parameter that can be passed to :meth:`requests.Session.request` (such as `headers`). Except: @@ -670,6 +689,7 @@ class Session(object): logger = logger or utils.get_logger(__name__) # HTTP 503 - Service Unavailable retriable_status_codes = retriable_status_codes or [503] + rate_semaphore = rate_semaphore or self._rate_semaphore headers = kwargs.setdefault('headers', dict()) if microversion: @@ -797,7 +817,8 @@ class Session(object): send = functools.partial(self._send_request, url, method, redirect, log, logger, split_loggers, connect_retries, - status_code_retries, retriable_status_codes) + status_code_retries, retriable_status_codes, + rate_semaphore) try: connection_params = self.get_auth_connection_params(auth=auth) @@ -885,8 +906,9 @@ class Session(object): def _send_request(self, url, method, redirect, log, logger, split_loggers, connect_retries, status_code_retries, - retriable_status_codes, connect_retry_delay=0.5, - status_code_retry_delay=0.5, **kwargs): + retriable_status_codes, rate_semaphore, + connect_retry_delay=0.5, status_code_retry_delay=0.5, + **kwargs): # NOTE(jamielennox): We handle redirection manually because the # requests lib follows some browser patterns where it will redirect # POSTs as GETs for certain statuses which is not want we want for an @@ -900,7 +922,8 @@ class Session(object): try: try: - resp = self.session.request(method, url, **kwargs) + with rate_semaphore: + resp = self.session.request(method, url, **kwargs) except requests.exceptions.SSLError as e: msg = 'SSL exception connecting to %(url)s: %(error)s' % { 'url': url, 'error': e} @@ -934,6 +957,7 @@ class Session(object): url, method, redirect, log, logger, split_loggers, status_code_retries=status_code_retries, retriable_status_codes=retriable_status_codes, + rate_semaphore=rate_semaphore, connect_retries=connect_retries - 1, connect_retry_delay=connect_retry_delay * 2, **kwargs) @@ -964,6 +988,7 @@ class Session(object): # This request actually worked so we can reset the delay count. new_resp = self._send_request( location, method, redirect, log, logger, split_loggers, + rate_semaphore=rate_semaphore, connect_retries=connect_retries, status_code_retries=status_code_retries, retriable_status_codes=retriable_status_codes, @@ -989,6 +1014,7 @@ class Session(object): connect_retries=connect_retries, status_code_retries=status_code_retries - 1, retriable_status_codes=retriable_status_codes, + rate_semaphore=rate_semaphore, status_code_retry_delay=status_code_retry_delay * 2, **kwargs) diff --git a/keystoneauth1/tests/unit/test_fair_sempahore.py b/keystoneauth1/tests/unit/test_fair_sempahore.py new file mode 100644 index 00000000..709968ae --- /dev/null +++ b/keystoneauth1/tests/unit/test_fair_sempahore.py @@ -0,0 +1,86 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from threading import Thread +from timeit import default_timer as timer + +import mock +from six.moves import queue +import testtools + +from keystoneauth1 import _fair_semaphore + + +class SemaphoreTests(testtools.TestCase): + + def _thread_worker(self): + while True: + # get returns the Item, but we don't care about the value so we + # purposely don't assign it to anything. + self.q.get() + with self.s: + self.mock_payload.do_something() + self.q.task_done() + + # Have 5 threads do 10 different "things" coordinated by the fair + # semaphore. + def _concurrency_core(self, concurrency, delay): + self.s = _fair_semaphore.FairSemaphore(concurrency, delay) + + self.q = queue.Queue() + for i in range(5): + t = Thread(target=self._thread_worker) + t.daemon = True + t.start() + + for item in range(0, 10): + self.q.put(item) + + self.q.join() + + def setUp(self): + super(SemaphoreTests, self).setUp() + self.mock_payload = mock.Mock() + + # We should be waiting at least 0.1s between operations, so + # the 10 operations must take at *least* 1 second + def test_semaphore_no_concurrency(self): + start = timer() + self._concurrency_core(None, 0.1) + end = timer() + self.assertTrue((end - start) > 1.0) + self.assertEqual(self.mock_payload.do_something.call_count, 10) + + def test_semaphore_single_concurrency(self): + start = timer() + self._concurrency_core(1, 0.1) + end = timer() + self.assertTrue((end - start) > 1.0) + self.assertEqual(self.mock_payload.do_something.call_count, 10) + + def test_semaphore_multiple_concurrency(self): + start = timer() + self._concurrency_core(5, 0.1) + end = timer() + self.assertTrue((end - start) > 1.0) + self.assertEqual(self.mock_payload.do_something.call_count, 10) + + # do some high speed tests; I don't think we can really assert + # much about these other than they don't deadlock... + def test_semaphore_fast_no_concurrency(self): + self._concurrency_core(None, 0.0) + + def test_semaphore_fast_single_concurrency(self): + self._concurrency_core(1, 0.0) + + def test_semaphore_fast_multiple_concurrency(self): + self._concurrency_core(5, 0.0) diff --git a/keystoneauth1/tests/unit/test_session.py b/keystoneauth1/tests/unit/test_session.py index 2927740349..7dc8e55b 100644 --- a/keystoneauth1/tests/unit/test_session.py +++ b/keystoneauth1/tests/unit/test_session.py @@ -1565,6 +1565,7 @@ class AdapterTest(utils.TestCase): with mock.patch.object(sess, 'request') as m: adapter.Adapter(sess, **adap_kwargs).get(url, **get_kwargs) m.assert_called_once_with(url, 'GET', endpoint_filter={}, + rate_semaphore=mock.ANY, **exp_kwargs) # No default_microversion in Adapter, no microversion in get() @@ -1588,6 +1589,7 @@ class AdapterTest(utils.TestCase): with mock.patch.object(sess, 'request') as m: adapter.Adapter(sess, **adap_kwargs).get(url, **get_kwargs) m.assert_called_once_with(url, 'GET', endpoint_filter={}, + rate_semaphore=mock.ANY, **exp_kwargs) # No raise_exc in Adapter or get() diff --git a/releasenotes/notes/client-side-rate-limiting-dec43fc9b54f5b70.yaml b/releasenotes/notes/client-side-rate-limiting-dec43fc9b54f5b70.yaml new file mode 100644 index 00000000..eb3e658e --- /dev/null +++ b/releasenotes/notes/client-side-rate-limiting-dec43fc9b54f5b70.yaml @@ -0,0 +1,10 @@ +--- +features: + - | + Support added for client-side rate limiting. Two new parameters now + exist for ``keystoneauth1.adapter.Adapter``. ``rate`` expresses a + maximum rate at which to execute requests. ``parallel_limit`` allows + for the creation of a semaphore to control the maximum number of + requests that can be active at any one given point in time. + Both default to ``None`` which has the normal behavior or not limiting + requests in any manner.