Monty Taylor 09934718f7 Add support for client-side rate limiting
shade/openstacksdk has implemented client-side rate limiting on top of
keystoneauth for ages and uses it extensively in nodepool. As part of an
effort to refactor that code, a new approach was devised that is much
simpler and therefore suitable for inclusion in keystoneauth directly.

The underlying goal is two-fold, but fundamentally is about allowing a
user to add some settings so that they can avoid slamming their cloud.
First, allow a user to express that they never want to exceed a given
rate. Second, allow a user to limit the number of concurrent requests
allowed to be in flight.

The settings and logic are added to Adapter and not Session so that the
settings can easily be per-service. There is no need to block requests
to nova on a neutron rate limit, after all.

Co-Authored-By: Ian Wienand <>
Change-Id: Ic831e03a37d804f45b7ee58c87f92fa0f4411ad8
2019-02-28 22:14:24 +00:00


# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import threading
import time

from six.moves import queue


class FairSemaphore(object):
    """Semaphore class that notifies in order of request.

    We cannot use a normal Semaphore because it doesn't give any ordering,
    which could lead to a request starving. Instead, handle them in the
    order we receive them.

    :param int concurrency:
        How many concurrent threads can have the semaphore at once.
    :param float rate_delay:
        How long to wait between the start of each thread receiving the
        semaphore.
    """

    def __init__(self, concurrency, rate_delay):
        self._lock = threading.Lock()
        self._concurrency = concurrency
        if concurrency:
            self._count = 0
            self._queue = queue.Queue()

        self._rate_delay = rate_delay
        self._rate_last_ts = time.time()

    def __enter__(self):
        """Acquire a semaphore."""
        # If concurrency is None, everyone is free to immediately execute.
        if not self._concurrency:
            # NOTE: Rate limiting still applies. This will ultimately impact
            # concurrency a bit due to the mutex.
            with self._lock:
                execution_time = self._advance_timer()
        else:
            execution_time = self._get_ticket()
        return self._wait_for_execution(execution_time)

    def _wait_for_execution(self, execution_time):
        """Wait until the pre-calculated time to run."""
        wait_time = execution_time - time.time()
        if wait_time > 0:
            time.sleep(wait_time)
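
    def _get_ticket(self):
        ticket = threading.Event()
        with self._lock:
            # Strict comparison: admitting a caller while _count equals
            # _concurrency would allow concurrency + 1 holders.
            if self._count < self._concurrency:
                # We can execute, no need to wait. Take a ticket and
                # move on.
                self._count += 1
                return self._advance_timer()
            else:
                # We need to wait for a ticket before we can execute.
                # Put ourselves in the ticket queue to be woken up
                # when available.
                self._queue.put(ticket)
        # Block outside the lock until a releasing thread hands over its slot.
        ticket.wait()
        with self._lock:
            return self._advance_timer()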

    def _advance_timer(self):
        """Calculate the time when it's ok to run a command again.

        This runs inside of the mutex, serializing the calculation
        of when it's ok to run again and setting _rate_last_ts to that
        new time so that the next thread to calculate when it's safe to
        run starts from the time that the current thread calculated.
        """
        self._rate_last_ts = self._rate_last_ts + self._rate_delay
        return self._rate_last_ts

    def __exit__(self, exc_type, exc_value, traceback):
        """Release the semaphore."""
        # If concurrency is None, everyone is free to immediately execute.
        if not self._concurrency:
            return
        with self._lock:
            # If waiters, wake up the next item in the queue (note
            # we're under the queue lock so the queue won't change
            # under us).
            if self._queue.qsize() > 0:
                ticket = self._queue.get()
                ticket.set()
            else:
                # Nothing else to do, give our ticket back
                self._count -= 1
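
The timer arithmetic described in the _advance_timer docstring can be looked at in isolation. The sketch below is not part of keystoneauth: RateGate, reserve, and the injectable clock argument are hypothetical names introduced only for illustration. It keeps just the rate-delay bookkeeping and drops the concurrency queue, and it uses a frozen clock so the reserved slots are deterministic.

```python
import threading
import time


class RateGate(object):
    """Hypothetical, rate-only illustration of the timer bookkeeping."""

    def __init__(self, rate_delay, clock=time.time):
        # clock is an assumption added for testability; the file above
        # calls time.time() directly.
        self._lock = threading.Lock()
        self._rate_delay = rate_delay
        self._clock = clock
        self._rate_last_ts = clock()

    def reserve(self):
        """Reserve the next start slot and return its timestamp."""
        with self._lock:
            # Each caller starts from the slot the previous caller
            # reserved, so consecutive slots are rate_delay apart.
            self._rate_last_ts = self._rate_last_ts + self._rate_delay
            return self._rate_last_ts

    def wait(self):
        """Reserve a slot, then sleep until that slot arrives."""
        wait_time = self.reserve() - self._clock()
        if wait_time > 0:
            time.sleep(wait_time)


# With a clock frozen at t=100.0 and a delay of 0.5 seconds, three
# callers are assigned slots half a second apart.
gate = RateGate(rate_delay=0.5, clock=lambda: 100.0)
slots = [gate.reserve() for _ in range(3)]
print(slots)  # [100.5, 101.0, 101.5]
```

Because the slot is reserved under the lock but the sleep happens outside it, callers queue up for distinct start times without holding the mutex while they wait, which appears to be the same split the file makes between _advance_timer and _wait_for_execution.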