Add a LazyExecutorTTLCache to the OpenStack driver

See the docstring for an explanation of what a Lazy Executor TTL
Cache is.  By switching the caching of the server list method (and
also the volume and floating IP list methods) to the lazy cache, we
will make all of the methods called by the state machines
asynchronous.  This means that both the create and delete state
machine threads should be able to spin through all of their state
machines as quickly as Python and ZooKeeper overhead will allow.

Change-Id: Ibce6b4d82929e6a764fdbc025990f7e01060b509
James E. Blair 2023-03-15 15:07:17 -07:00
parent 939a39feda
commit 8fed208fb2
3 changed files with 130 additions and 6 deletions
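
For context, the behavioral difference is easiest to see side by side.
Below is a minimal sketch (not part of the commit; slow_api_call and
the TTL values are illustrative): with cachetools' ttl_cache, the first
caller after expiry blocks for the full duration of the API call, while
the lazy executor cache keeps returning the previous value instantly
and refreshes in the background.

import time
from concurrent.futures import ThreadPoolExecutor

import cachetools.func

from nodepool.driver.utils import LazyExecutorTTLCache  # added by this commit


def slow_api_call():
    time.sleep(2)  # stand-in for a slow cloud API round trip
    return time.monotonic()


# Old style: a plain TTL cache.  The first call after the TTL expires
# re-runs slow_api_call synchronously, stalling the calling thread.
blocking_cached = cachetools.func.ttl_cache(maxsize=1, ttl=10)(slow_api_call)

# New style: after the initial synchronous call, an expired entry only
# schedules a refresh on the executor; callers keep receiving the stale
# value with no delay until the refresh lands.
executor = ThreadPoolExecutor(max_workers=1)
lazy_cached = LazyExecutorTTLCache(10, executor)(slow_api_call)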

nodepool/driver/openstack/adapter.py

@@ -23,11 +23,10 @@ import math
import time
import operator
-import cachetools.func
import openstack
from keystoneauth1.exceptions.catalog import EndpointNotFound

-from nodepool.driver.utils import QuotaInformation
+from nodepool.driver.utils import QuotaInformation, LazyExecutorTTLCache
from nodepool.driver import statemachine
from nodepool import exceptions
from nodepool import stats
@@ -409,6 +408,20 @@ class OpenStackAdapter(statemachine.Adapter):
            thread_name_prefix=f'openstack-api-{provider_config.name}',
            max_workers=workers)

        # Use a lazy TTL cache for these. This uses the TPE to
        # asynchronously update the cached values, meanwhile returning
        # the previous cached data if available. This means every
        # call after the first one is instantaneous.
        self._listServers = LazyExecutorTTLCache(
            CACHE_TTL, self.api_executor)(
                self._listServers)
        self._listVolumes = LazyExecutorTTLCache(
            CACHE_TTL, self.api_executor)(
                self._listVolumes)
        self._listFloatingIps = LazyExecutorTTLCache(
            CACHE_TTL, self.api_executor)(
                self._listFloatingIps)

        self._last_image_check_failure = time.time()
        self._last_port_cleanup = None
        self._statsd = stats.get_client()
@@ -688,12 +701,10 @@ class OpenStackAdapter(statemachine.Adapter):
                name, self.provider.name))
        return network

-    @cachetools.func.ttl_cache(maxsize=1, ttl=CACHE_TTL)
    def _listServers(self):
        with Timer(self.log, 'API call list_servers'):
            return self._client.list_servers(bare=True)

-    @cachetools.func.ttl_cache(maxsize=1, ttl=CACHE_TTL)
    def _listVolumes(self):
        try:
            with Timer(self.log, 'API call list_volumes'):
@@ -701,7 +712,6 @@ class OpenStackAdapter(statemachine.Adapter):
        except EndpointNotFound:
            return []

-    @cachetools.func.ttl_cache(maxsize=1, ttl=CACHE_TTL)
    def _listFloatingIps(self):
        with Timer(self.log, 'API call list_floating_ips'):
            return self._client.list_floating_ips()
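
Note that the wiring in __init__ above is equivalent to decorating the
methods, but it is done per instance because the cache needs
instance-level state (its own executor, TTL, timestamp, and lock).
A minimal sketch of the same pattern, with hypothetical names
(ExampleAdapter, _listThings):

from concurrent.futures import ThreadPoolExecutor

from nodepool.driver.utils import LazyExecutorTTLCache


class ExampleAdapter:
    CACHE_TTL = 10  # illustrative TTL in seconds

    def __init__(self):
        self.api_executor = ThreadPoolExecutor(max_workers=2)
        # Rebind the bound method through the cache so each instance
        # gets its own cached value, timestamp, and lock.
        self._listThings = LazyExecutorTTLCache(
            self.CACHE_TTL, self.api_executor)(self._listThings)

    def _listThings(self):
        return []  # stand-in for a slow provider list call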

nodepool/driver/utils.py

@@ -457,3 +457,76 @@ class RateLimiter:
    def _exit(self, etype, value, tb):
        pass

class LazyExecutorTTLCache:
    """This is a lazy executor TTL cache.

    It's lazy because if it has cached data, it will always return it
    instantly.

    It's executor based, which means that if a cache miss occurs, it
    will submit a task to an executor to fetch new data.

    Finally, it's a TTL cache, which means it automatically expires data.

    Since it is only expected to be used when caching provider
    resource listing methods, it assumes there will only be one entry
    and ignores arguments -- it will return the same cached data no
    matter what arguments are supplied; but it will pass on those
    arguments to the underlying method in a cache miss.

    :param numeric ttl: The cache timeout in seconds.
    :param concurrent.futures.Executor executor: An executor to use to
        update data asynchronously in case of a cache miss.
    """

    def __init__(self, ttl, executor):
        self.ttl = ttl
        self.executor = executor
        # If we have an outstanding update being run by the executor,
        # this is the future.
        self.future = None
        # The last time the underlying method completed.
        self.last_time = None
        # The last value from the underlying method.
        self.last_value = None
        # A lock to make all of this thread safe (especially to ensure
        # we don't fire off multiple updates).
        self.lock = threading.Lock()

    def __call__(self, func):
        def decorator(*args, **kw):
            with self.lock:
                now = time.monotonic()
                if self.future and self.future.done():
                    # If a previous call spawned an update, resolve
                    # that now so we can use the data.
                    try:
                        self.last_time, self.last_value = self.future.result()
                    finally:
                        # Clear the future regardless so we don't loop.
                        self.future = None
                if self.last_time and now - self.last_time < self.ttl:
                    # A cache hit.
                    return self.last_value
                # The rest of the method is a cache miss.
                if self.last_time:
                    if not self.future:
                        # Fire off an asynchronous update request.
                        # This second wrapper ensures that we record
                        # the time that the update is complete along
                        # with the value.
                        def func_with_time():
                            ret = func(*args, **kw)
                            now = time.monotonic()
                            return (now, ret)
                        self.future = self.executor.submit(func_with_time)
                else:
                    # This is the first time this method has been
                    # called; since we don't have any cached data, we
                    # will synchronously update the data.
                    self.last_value = func(*args, **kw)
                    self.last_time = time.monotonic()
                return self.last_value
        return decorator
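
To make the control flow above concrete, here is a small usage sketch
(the 0.5 second TTL, fetch, and the sleep durations are illustrative;
the new unit test below exercises the same sequence):

import time
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=1)
calls = []


def fetch():
    calls.append(time.monotonic())
    return len(calls)


cached_fetch = LazyExecutorTTLCache(0.5, executor)(fetch)

assert cached_fetch() == 1  # first call: no cached data, runs synchronously
assert cached_fetch() == 1  # within the TTL: a cache hit, no API call
time.sleep(0.6)             # let the cached entry expire
assert cached_fetch() == 1  # miss: returns the stale value, schedules a refresh
time.sleep(0.2)             # give the executor a moment to finish
assert cached_fetch() == 2  # the refreshed value is picked up from the future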

nodepool/tests/unit/test_driver_utils.py

@@ -12,11 +12,14 @@
# License for the specific language governing permissions and limitations
# under the License.

+from concurrent.futures import ThreadPoolExecutor
import copy
import math
import time

from nodepool import tests
-from nodepool.driver.utils import QuotaInformation
+from nodepool.driver.utils import QuotaInformation, LazyExecutorTTLCache
+from nodepool.nodeutils import iterate_timeout


class TestQutoInformation(tests.BaseTestCase):
@@ -66,3 +69,41 @@ class TestQutoInformation(tests.BaseTestCase):
        remain.subtract(needed)
        self.assertEqual(expected.quota, remain.quota)


class FakeAdapter:
    CACHE_TTL = 0.5

    def __init__(self):
        self.api_executor = ThreadPoolExecutor(max_workers=4)
        self.get_time = LazyExecutorTTLCache(
            self.CACHE_TTL, self.api_executor)(
                self.get_time)

    def get_time(self):
        return time.monotonic()


class TestLazyExecutorTTLCache(tests.BaseTestCase):
    def test_lazy_cache(self):
        adapter = FakeAdapter()
        t0 = time.monotonic()
        ret1 = adapter.get_time()
        t1 = time.monotonic()
        self.assertTrue(t0 < ret1 < t1)
        # Assuming the computer isn't completely overloaded, this
        # should happen instantly and be a cache hit.
        ret2 = adapter.get_time()
        self.assertEqual(ret1, ret2)
        # Sleep longer than the ttl
        time.sleep(adapter.CACHE_TTL + 0.1)
        # This should be a cache miss that triggers an update and
        # returns the old value.
        ret3 = adapter.get_time()
        self.assertEqual(ret1, ret3)
        # Eventually the async update should return and we should get
        # a newer value.
        for _ in iterate_timeout(30, Exception, 'cache update'):
            ret4 = adapter.get_time()
            if ret4 > ret3:
                break