Browse Source

Merge "Add support for client-side rate limiting"

Zuul 1 month ago
parent
commit
0828f7048e

+ 104
- 0
keystoneauth1/_fair_semaphore.py View File

@@ -0,0 +1,104 @@
1
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
2
+# not use this file except in compliance with the License. You may obtain
3
+# a copy of the License at
4
+#
5
+#      http://www.apache.org/licenses/LICENSE-2.0
6
+#
7
+# Unless required by applicable law or agreed to in writing, software
8
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10
+# License for the specific language governing permissions and limitations
11
+# under the License.
12
+
13
+import threading
14
+import time
15
+
16
+
17
+from six.moves import queue
18
+
19
+
20
+class FairSemaphore(object):
21
+    """Semaphore class that notifies in order of request.
22
+
23
+    We cannot use a normal Semaphore because it doesn't give any ordering,
24
+    which could lead to a request starving. Instead, handle them in the
25
+    order we receive them.
26
+
27
+    :param int concurrency:
28
+        How many concurrent threads can have the semaphore at once.
29
+    :param float rate_delay:
30
+        How long to wait between the start of each thread receiving the
31
+        semaphore.
32
+    """
33
+
34
+    def __init__(self, concurrency, rate_delay):
35
+        self._lock = threading.Lock()
36
+        self._concurrency = concurrency
37
+        if concurrency:
38
+            self._count = 0
39
+            self._queue = queue.Queue()
40
+
41
+        self._rate_delay = rate_delay
42
+        self._rate_last_ts = time.time()
43
+
44
+    def __enter__(self):
45
+        """Aquire a semaphore."""
46
+        # If concurrency is None, everyone is free to immediately execute.
47
+        if not self._concurrency:
48
+            # NOTE: Rate limiting still applies.This will ultimately impact
49
+            # concurrency a bit due to the mutex.
50
+            with self._lock:
51
+                execution_time = self._advance_timer()
52
+        else:
53
+            execution_time = self._get_ticket()
54
+        return self._wait_for_execution(execution_time)
55
+
56
+    def _wait_for_execution(self, execution_time):
57
+        """Wait until the pre-calculated time to run."""
58
+        wait_time = execution_time - time.time()
59
+        if wait_time > 0:
60
+            time.sleep(wait_time)
61
+
62
+    def _get_ticket(self):
63
+        ticket = threading.Event()
64
+        with self._lock:
65
+            if self._count <= self._concurrency:
66
+                # We can execute, no need to wait. Take a ticket and
67
+                # move on.
68
+                self._count += 1
69
+                return self._advance_timer()
70
+            else:
71
+                # We need to wait for a ticket before we can execute.
72
+                # Put ourselves in the ticket queue to be woken up
73
+                # when available.
74
+                self._queue.put(ticket)
75
+        ticket.wait()
76
+        with self._lock:
77
+            return self._advance_timer()
78
+
79
+    def _advance_timer(self):
80
+        """Calculate the time when it's ok to run a command again.
81
+
82
+        This runs inside of the mutex, serializing the calculation
83
+        of when it's ok to run again and setting _rate_last_ts to that
84
+        new time so that the next thread to calculate when it's safe to
85
+        run starts from the time that the current thread calculated.
86
+        """
87
+        self._rate_last_ts = self._rate_last_ts + self._rate_delay
88
+        return self._rate_last_ts
89
+
90
+    def __exit__(self, exc_type, exc_value, traceback):
91
+        """Release the semaphore."""
92
+        # If concurrency is None, everyone is free to immediately execute
93
+        if not self._concurrency:
94
+            return
95
+        with self._lock:
96
+            # If waiters, wake up the next item in the queue (note
97
+            # we're under the queue lock so the queue won't change
98
+            # under us).
99
+            if self._queue.qsize() > 0:
100
+                ticket = self._queue.get()
101
+                ticket.set()
102
+            else:
103
+                # Nothing else to do, give our ticket back
104
+                self._count -= 1

+ 25
- 1
keystoneauth1/adapter.py View File

@@ -13,6 +13,7 @@
13 13
 import os
14 14
 import warnings
15 15
 
16
+from keystoneauth1 import _fair_semaphore
16 17
 from keystoneauth1 import session
17 18
 
18 19
 
@@ -92,6 +93,16 @@ class Adapter(object):
92 93
         If True, requests returning failing HTTP responses will raise an
93 94
         exception; if False, the response is returned. This can be
94 95
         overridden on a per-request basis via the kwarg of the same name.
96
+    :param float rate_limit:
97
+        A client-side rate limit to impose on requests made through this
98
+        adapter in requests per second. For instance, a rate_limit of 2
99
+        means to allow no more than 2 requests per second, and a rate_limit
100
+        of 0.5 means to allow no more than 1 request every two seconds.
101
+        (optional, defaults to None, which means no rate limiting will be
102
+        applied).
103
+    :param int concurrency:
104
+        How many simultaneous http requests this Adapter can be used for.
105
+        (optional, defaults to None, which means no limit).
95 106
     """
96 107
 
97 108
     client_name = None
@@ -106,7 +117,9 @@ class Adapter(object):
106 117
                  global_request_id=None,
107 118
                  min_version=None, max_version=None,
108 119
                  default_microversion=None, status_code_retries=None,
109
-                 retriable_status_codes=None, raise_exc=None):
120
+                 retriable_status_codes=None, raise_exc=None,
121
+                 rate_limit=None, concurrency=None,
122
+                 ):
110 123
         if version and (min_version or max_version):
111 124
             raise TypeError(
112 125
                 "version is mutually exclusive with min_version and"
@@ -144,6 +157,15 @@ class Adapter(object):
144 157
         if client_version:
145 158
             self.client_version = client_version
146 159
 
160
+        rate_delay = 0.0
161
+        if rate_limit:
162
+            # 1 / rate converts from requests per second to delay
163
+            # between requests needed to achieve that rate.
164
+            rate_delay = 1.0 / rate_limit
165
+
166
+        self._rate_semaphore = _fair_semaphore.FairSemaphore(
167
+            concurrency, rate_delay)
168
+
147 169
     def _set_endpoint_filter_kwargs(self, kwargs):
148 170
         if self.service_type:
149 171
             kwargs.setdefault('service_type', self.service_type)
@@ -210,6 +232,8 @@ class Adapter(object):
210 232
         if self.raise_exc is not None:
211 233
             kwargs.setdefault('raise_exc', self.raise_exc)
212 234
 
235
+        kwargs.setdefault('rate_semaphore', self._rate_semaphore)
236
+
213 237
         return self.session.request(url, method, **kwargs)
214 238
 
215 239
     def get_token(self, auth=None):

+ 32
- 6
keystoneauth1/session.py View File

@@ -99,6 +99,18 @@ def _sanitize_headers(headers):
99 99
     return str_dict
100 100
 
101 101
 
102
+class NoOpSemaphore(object):
103
+    """Empty context manager for use as a default semaphore."""
104
+
105
+    def __enter__(self):
106
+        """Enter the context manager and do nothing."""
107
+        pass
108
+
109
+    def __exit__(self, exc_type, exc_value, traceback):
110
+        """Exit the context manager and do nothing."""
111
+        pass
112
+
113
+
102 114
 class _JSONEncoder(json.JSONEncoder):
103 115
 
104 116
     def default(self, o):
@@ -285,6 +297,9 @@ class Session(object):
285 297
     :param bool collect_timing: Whether or not to collect per-method timing
286 298
                                 information for each API call. (optional,
287 299
                                 defaults to False)
300
+    :param rate_semaphore: Semaphore to be used to control concurrency
301
+                           and rate limiting of requests. (optional,
302
+                           defaults to no concurrency or rate control)
288 303
     """
289 304
 
290 305
     user_agent = None
@@ -298,7 +313,7 @@ class Session(object):
298 313
                  redirect=_DEFAULT_REDIRECT_LIMIT, additional_headers=None,
299 314
                  app_name=None, app_version=None, additional_user_agent=None,
300 315
                  discovery_cache=None, split_loggers=None,
301
-                 collect_timing=False):
316
+                 collect_timing=False, rate_semaphore=None):
302 317
 
303 318
         self.auth = auth
304 319
         self.session = _construct_session(session)
@@ -320,6 +335,7 @@ class Session(object):
320 335
         self._split_loggers = split_loggers
321 336
         self._collect_timing = collect_timing
322 337
         self._api_times = []
338
+        self._rate_semaphore = rate_semaphore or NoOpSemaphore()
323 339
 
324 340
         if timeout is not None:
325 341
             self.timeout = float(timeout)
@@ -561,7 +577,7 @@ class Session(object):
561 577
                 allow=None, client_name=None, client_version=None,
562 578
                 microversion=None, microversion_service_type=None,
563 579
                 status_code_retries=0, retriable_status_codes=None,
564
-                **kwargs):
580
+                rate_semaphore=None, **kwargs):
565 581
         """Send an HTTP request with the specified characteristics.
566 582
 
567 583
         Wrapper around `requests.Session.request` to handle tasks such as
@@ -647,6 +663,9 @@ class Session(object):
647 663
                                             should be retried (optional,
648 664
                                             defaults to HTTP 503, has no effect
649 665
                                             when status_code_retries is 0).
666
+        :param rate_semaphore: Semaphore to be used to control concurrency
667
+                               and rate limiting of requests. (optional,
668
+                               defaults to no concurrency or rate control)
650 669
         :param kwargs: any other parameter that can be passed to
651 670
                        :meth:`requests.Session.request` (such as `headers`).
652 671
                        Except:
@@ -670,6 +689,7 @@ class Session(object):
670 689
         logger = logger or utils.get_logger(__name__)
671 690
         # HTTP 503 - Service Unavailable
672 691
         retriable_status_codes = retriable_status_codes or [503]
692
+        rate_semaphore = rate_semaphore or self._rate_semaphore
673 693
 
674 694
         headers = kwargs.setdefault('headers', dict())
675 695
         if microversion:
@@ -797,7 +817,8 @@ class Session(object):
797 817
         send = functools.partial(self._send_request,
798 818
                                  url, method, redirect, log, logger,
799 819
                                  split_loggers, connect_retries,
800
-                                 status_code_retries, retriable_status_codes)
820
+                                 status_code_retries, retriable_status_codes,
821
+                                 rate_semaphore)
801 822
 
802 823
         try:
803 824
             connection_params = self.get_auth_connection_params(auth=auth)
@@ -885,8 +906,9 @@ class Session(object):
885 906
 
886 907
     def _send_request(self, url, method, redirect, log, logger, split_loggers,
887 908
                       connect_retries, status_code_retries,
888
-                      retriable_status_codes, connect_retry_delay=0.5,
889
-                      status_code_retry_delay=0.5, **kwargs):
909
+                      retriable_status_codes, rate_semaphore,
910
+                      connect_retry_delay=0.5, status_code_retry_delay=0.5,
911
+                      **kwargs):
890 912
         # NOTE(jamielennox): We handle redirection manually because the
891 913
         # requests lib follows some browser patterns where it will redirect
892 914
         # POSTs as GETs for certain statuses which is not want we want for an
@@ -900,7 +922,8 @@ class Session(object):
900 922
 
901 923
         try:
902 924
             try:
903
-                resp = self.session.request(method, url, **kwargs)
925
+                with rate_semaphore:
926
+                    resp = self.session.request(method, url, **kwargs)
904 927
             except requests.exceptions.SSLError as e:
905 928
                 msg = 'SSL exception connecting to %(url)s: %(error)s' % {
906 929
                     'url': url, 'error': e}
@@ -934,6 +957,7 @@ class Session(object):
934 957
                 url, method, redirect, log, logger, split_loggers,
935 958
                 status_code_retries=status_code_retries,
936 959
                 retriable_status_codes=retriable_status_codes,
960
+                rate_semaphore=rate_semaphore,
937 961
                 connect_retries=connect_retries - 1,
938 962
                 connect_retry_delay=connect_retry_delay * 2,
939 963
                 **kwargs)
@@ -964,6 +988,7 @@ class Session(object):
964 988
                 # This request actually worked so we can reset the delay count.
965 989
                 new_resp = self._send_request(
966 990
                     location, method, redirect, log, logger, split_loggers,
991
+                    rate_semaphore=rate_semaphore,
967 992
                     connect_retries=connect_retries,
968 993
                     status_code_retries=status_code_retries,
969 994
                     retriable_status_codes=retriable_status_codes,
@@ -989,6 +1014,7 @@ class Session(object):
989 1014
                 connect_retries=connect_retries,
990 1015
                 status_code_retries=status_code_retries - 1,
991 1016
                 retriable_status_codes=retriable_status_codes,
1017
+                rate_semaphore=rate_semaphore,
992 1018
                 status_code_retry_delay=status_code_retry_delay * 2,
993 1019
                 **kwargs)
994 1020
 

+ 86
- 0
keystoneauth1/tests/unit/test_fair_sempahore.py View File

@@ -0,0 +1,86 @@
1
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
2
+# not use this file except in compliance with the License. You may obtain
3
+# a copy of the License at
4
+#
5
+#      http://www.apache.org/licenses/LICENSE-2.0
6
+#
7
+# Unless required by applicable law or agreed to in writing, software
8
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
9
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
10
+# License for the specific language governing permissions and limitations
11
+# under the License.
12
+
13
+from threading import Thread
14
+from timeit import default_timer as timer
15
+
16
+import mock
17
+from six.moves import queue
18
+import testtools
19
+
20
+from keystoneauth1 import _fair_semaphore
21
+
22
+
23
+class SemaphoreTests(testtools.TestCase):
24
+
25
+    def _thread_worker(self):
26
+        while True:
27
+            # get returns the Item, but we don't care about the value so we
28
+            # purposely don't assign it to anything.
29
+            self.q.get()
30
+            with self.s:
31
+                self.mock_payload.do_something()
32
+            self.q.task_done()
33
+
34
+    # Have 5 threads do 10 different "things" coordinated by the fair
35
+    # semaphore.
36
+    def _concurrency_core(self, concurrency, delay):
37
+        self.s = _fair_semaphore.FairSemaphore(concurrency, delay)
38
+
39
+        self.q = queue.Queue()
40
+        for i in range(5):
41
+            t = Thread(target=self._thread_worker)
42
+            t.daemon = True
43
+            t.start()
44
+
45
+        for item in range(0, 10):
46
+            self.q.put(item)
47
+
48
+        self.q.join()
49
+
50
+    def setUp(self):
51
+        super(SemaphoreTests, self).setUp()
52
+        self.mock_payload = mock.Mock()
53
+
54
+    # We should be waiting at least 0.1s between operations, so
55
+    # the 10 operations must take at *least* 1 second
56
+    def test_semaphore_no_concurrency(self):
57
+        start = timer()
58
+        self._concurrency_core(None, 0.1)
59
+        end = timer()
60
+        self.assertTrue((end - start) > 1.0)
61
+        self.assertEqual(self.mock_payload.do_something.call_count, 10)
62
+
63
+    def test_semaphore_single_concurrency(self):
64
+        start = timer()
65
+        self._concurrency_core(1, 0.1)
66
+        end = timer()
67
+        self.assertTrue((end - start) > 1.0)
68
+        self.assertEqual(self.mock_payload.do_something.call_count, 10)
69
+
70
+    def test_semaphore_multiple_concurrency(self):
71
+        start = timer()
72
+        self._concurrency_core(5, 0.1)
73
+        end = timer()
74
+        self.assertTrue((end - start) > 1.0)
75
+        self.assertEqual(self.mock_payload.do_something.call_count, 10)
76
+
77
+    # do some high speed tests; I don't think we can really assert
78
+    # much about these other than they don't deadlock...
79
+    def test_semaphore_fast_no_concurrency(self):
80
+        self._concurrency_core(None, 0.0)
81
+
82
+    def test_semaphore_fast_single_concurrency(self):
83
+        self._concurrency_core(1, 0.0)
84
+
85
+    def test_semaphore_fast_multiple_concurrency(self):
86
+        self._concurrency_core(5, 0.0)

+ 2
- 0
keystoneauth1/tests/unit/test_session.py View File

@@ -1565,6 +1565,7 @@ class AdapterTest(utils.TestCase):
1565 1565
             with mock.patch.object(sess, 'request') as m:
1566 1566
                 adapter.Adapter(sess, **adap_kwargs).get(url, **get_kwargs)
1567 1567
                 m.assert_called_once_with(url, 'GET', endpoint_filter={},
1568
+                                          rate_semaphore=mock.ANY,
1568 1569
                                           **exp_kwargs)
1569 1570
 
1570 1571
         # No default_microversion in Adapter, no microversion in get()
@@ -1588,6 +1589,7 @@ class AdapterTest(utils.TestCase):
1588 1589
             with mock.patch.object(sess, 'request') as m:
1589 1590
                 adapter.Adapter(sess, **adap_kwargs).get(url, **get_kwargs)
1590 1591
                 m.assert_called_once_with(url, 'GET', endpoint_filter={},
1592
+                                          rate_semaphore=mock.ANY,
1591 1593
                                           **exp_kwargs)
1592 1594
 
1593 1595
         # No raise_exc in Adapter or get()

+ 10
- 0
releasenotes/notes/client-side-rate-limiting-dec43fc9b54f5b70.yaml View File

@@ -0,0 +1,10 @@
1
+---
2
+features:
3
+  - |
4
+    Support added for client-side rate limiting. Two new parameters now
5
+    exist for ``keystoneauth1.adapter.Adapter``. ``rate`` expresses a
6
+    maximum rate at which to execute requests. ``parallel_limit`` allows
7
+    for the creation of a semaphore to control the maximum number of
8
+    requests that can be active at any one given point in time.
9
+    Both default to ``None`` which has the normal behavior or not limiting
10
+    requests in any manner.

Loading…
Cancel
Save