Merge "common: add memcached based cooperative token mechanism."

This commit is contained in:
Zuul
2025-09-29 22:59:08 +00:00
committed by Gerrit Code Review
4 changed files with 1226 additions and 6 deletions

View File

@@ -100,7 +100,7 @@ from swift.common.utils.logs import ( # noqa
LOG_LINE_DEFAULT_FORMAT,
NOTICE,
)
from swift.common.utils.config import ( # noqa
from swift.common.utils.config import ( # noqa
TRUE_VALUES,
NicerInterpolation,
config_true_value,
@@ -1493,6 +1493,253 @@ def load_multikey_opts(conf, prefix, allow_none_key=False):
return sorted(result)
class CooperativeCachePopulator(object):
"""
A cooperative token is used to avoid the thundering herd problem when
caching is used in front of slow backend(s). Here is it how it works:
* When lots of in-flight requests try to get the cached item specified by
a key from memcache and get cache misses, only the first few (limited by
``num_tokens``) of query requests will be able to get a cooperative
token by creating or incrementing an internal memcache key.
* Those callers with tokens can send backend requests to fetch data from
backend servers and set data into memcache.
* All other cache miss requests without a token should wait for cache
filling to finish, instead of all querying the backend servers at the
same time.
* After those requests with a token are done, they will release the token
by deleting the internal cache key, and finish this usage session. As
such, one token usage session starts when the first request gets a token
after cache misses and ends when all requests with a token are done.
Multiple tokens are available in each usage session in order to increase
fault tolerance in the distributed environment. When one request with a
token hangs or exits, any other request with a token can still set new
fetched data into memcache and finish the current usage session. The
CooperativeCachePopulator class uses ``num_tokens`` to define the maximum
number of tokens during each usage session. The default is 3.
In the rare case of all 3 requests with tokens failing, the existing usage
session ends after a ``token_ttl`` period is reached and the token key
expires. If this happens then other pending requests which have no token
will exit waiting in the same order as entering the token session, and fall
back to querying the backend and setting data in memcache. After any of
those requests without a token successfully sets data in the memcache, the
following requests in waiting can fetch that data from memcache as they
continue retrying. The ``token_ttl`` is designed to be a back-off time
window for retrying the backend; it will work for the same purpose in this
case as well. When one token session ends after ``token_ttl``, requests
which see a cache miss will start a new cooperative token session.
:param app: the application instance containing app.logger, app.statsd
:param infocache: the infocache instance.
:param memcache: the memcache instance, must be a valid MemcacheRing.
:param cache_key: the cache key.
:param cache_ttl: time-to-live of the data fetched from backend to set into
memcached.
:param avg_backend_fetch_time: The average time in seconds expected for the
backend fetch operation ``do_fetch_backend`` to complete. This duration
serves as a base unit for calculating exponential backoff delays when
awaiting cache population by other requests, and for determining the
cooperative token's time-to-live (``token_ttl``) which is set to 10x
this value. Should be greater than 0.
:param num_tokens: the minimum limit of tokens per each usage session,
also the minimum limit of in-flight requests allowed to fetch data
from backend. The default is 3, which gives redundancy when any request
with token fails to fetch data from the backend or fails to set new
data into memcached; 0 means no cooperative token is used.
:param labels: the default labels for emitting labeled metrics, for example
resource or operation type, account, container, etc.
"""
def __init__(self, app, infocache, memcache,
cache_key, cache_ttl, avg_backend_fetch_time, num_tokens=3,
labels=None):
self._logger = app.logger
self._statsd = app.statsd
self._labels = labels or {}
self._infocache = infocache
self._memcache = memcache
self._cache_key = cache_key
self._cache_ttl = cache_ttl
self._token_key = '_cache_token/%s' % cache_key
self._avg_backend_fetch_time = avg_backend_fetch_time
# Time-to-live of the cooperative token when set in memcached, this
# defines the typical worse time that a token request would need to
# fetch the data from the backend when it's busy, default to be 10
# times of the average time spent on ``do_fetch_backend`` which is
# the ``avg_backend_fetch_time``.
self._token_ttl = avg_backend_fetch_time * 10
self._num_tokens = num_tokens
# The status of cache operation which sets backend data into Memcached.
self.set_cache_state = None
# Indicates if this request has acquired one token.
self.token_acquired = False
# Indicates if the request has no token and doesn't get enough retries.
self.lack_retries = False
# The HttpResponse object returned by ``do_fetch_backend`` if called.
self.backend_resp = None
# Track if fetch_data has been called to enforce one-shot usage
self._fetch_called = False
def do_fetch_backend(self):
"""
To fetch data from the backend, needs to be implemented by sub-class.
:returns: a tuple of (data, response).
"""
raise NotImplementedError
def cache_encoder(self, data):
"""
To encode data to be stored in Memcached, default to return the data
as is.
:returns: encoded data.
"""
return data
def cache_decoder(self, data):
"""
To decode data from Memcached, default to return the data as is.
:returns: decoded data.
"""
return data
def _query_backend_and_set_cache(self):
"""
Fetch data from the backend and set the value in the Memcached.
:returns: value of the data fetched from backend; None if not exist.
"""
data, self.backend_resp = self.do_fetch_backend()
if not data:
return None
if self._infocache is not None:
self._infocache[self._cache_key] = data
try:
encoded_data = self.cache_encoder(data)
self._memcache.set(
self._cache_key, encoded_data,
time=self._cache_ttl, raise_on_error=True)
except swift.common.exceptions.MemcacheConnectionError:
self.set_cache_state = 'set_error'
else:
self.set_cache_state = 'set'
return data
def _sleep_and_retry_memcache(self):
"""
Wait for cache value to be set by other requests with intermittent and
limited number of sleeps. With ``token_ttl`` set as 10 times of
``avg_backend_fetch_time`` and the exponential backoff doubling the
retry interval after each retry, this function will normally sleep and
retry 3 times maximum.
The first retry is 1.5 times of the ``avg_backend_fetch_time``, the
second is 3 times, and the third is 6 times of it, so total is 10.5
times of the ``avg_backend_fetch_time``. This roughly equals to the
``token_ttl`` which is 10 times of the ``avg_backend_fetch_time``.
:returns: value of the data fetched from Memcached; None if not exist.
"""
cur_time = time.time()
cutoff_time = cur_time + self._token_ttl
retry_interval = self._avg_backend_fetch_time * 1.5
num_waits = 0
while cur_time < cutoff_time or num_waits < 3:
if cur_time < cutoff_time:
sleep(retry_interval)
num_waits += 1
else:
# Request has no token and doesn't get enough retries.
self.lack_retries = True
# To have one last check, when eventlet scheduling didn't give
# this greenthread enough cpu cycles and it didn't have enough
# times of retries.
num_waits = 3
cache_data = self._memcache.get(
self._cache_key, raise_on_error=False)
if cache_data:
# cache hit.
decoded_data = self.cache_decoder(cache_data)
return decoded_data
# cache miss, retry again with exponential backoff
retry_interval *= 2
cur_time = time.time()
return None
def _fetch_data(self):
total_requests = 0
try:
total_requests = self._memcache.incr(
self._token_key, time=self._token_ttl)
except swift.common.exceptions.MemcacheConnectionError:
self._labels['token'] = 'error' # nosec bandit B105
if not total_requests:
# Couldn't connect to the memcache to increment the token key
data = self._query_backend_and_set_cache()
elif total_requests <= self._num_tokens:
# Acquired a cooperative token, go fetching data from backend and
# set the data in memcache.
self.token_acquired = True
data = self._query_backend_and_set_cache()
if self.set_cache_state == 'set':
# Since the successful finish of one whole cooperative token
# session only depends on a single successful request. So when
# any request with a token finishes both backend fetching and
# memcache set successful, it can remove all cooperative tokens
# of this token session.
self._memcache.delete(self._token_key)
else:
# No token acquired, it means that there are requests in-flight
# which will fetch data form the backend servers and update them in
# cache, let's wait for them to finish with limited retries.
data = self._sleep_and_retry_memcache()
self._labels['lack_retries'] = self.lack_retries
if data is None:
# Still no cache data fetched.
data = self._query_backend_and_set_cache()
return data
def fetch_data(self):
"""
Coalescing all requests which are asking for the same data from the
backend into a few with cooperative token.
:returns: value of the data fetched from backend or memcache; None if
not exist.
"""
if self._fetch_called:
raise RuntimeError("fetch_data() can only be called once per "
"CooperativeCachePopulator instance")
self._fetch_called = True
if not self._num_tokens:
# Cooperative token disabled, fetch from backend.
data = self._query_backend_and_set_cache()
self._labels['token'] = 'disabled' # nosec bandit B105
else:
data = self._fetch_data()
if 'token' not in self._labels:
self._labels['token'] = 'with_token' if self.token_acquired \
else 'no_token'
if self.backend_resp:
self._labels['event'] = 'backend_reqs'
self._labels['status'] = self.backend_resp.status_int
else:
self._labels['event'] = 'cache_served'
if self.set_cache_state:
self._labels['set_cache_state'] = self.set_cache_state
self._statsd.increment('swift_coop_cache', labels=self._labels)
return data
def write_pickle(obj, dest, tmp=None, pickle_protocol=0):
"""
Ensure that a pickle file gets written to disk. The file

View File

@@ -76,6 +76,7 @@ class BaseFakeStatsdClient:
self.calls = defaultdict(list)
self.recording_socket = RecordingSocket()
self.counters = defaultdict(int)
self.labeled_stats_counters = defaultdict(int)
@property
def sendto_calls(self):
@@ -94,8 +95,15 @@ class BaseFakeStatsdClient:
Hook into base class primitive to track all "counter" metrics
"""
self.counters[metric] += value
labels = kwargs.get('labels', None)
if labels is not None:
hashable_labels = frozenset(labels.items())
self.labeled_stats_counters[(metric, hashable_labels)] += value
return super()._update_stats(metric, value, *args, **kwargs)
def get_labeled_stats_counts(self):
return self.labeled_stats_counters
# getter for backwards compat
def get_stats_counts(self):
return self.counters

View File

@@ -41,6 +41,7 @@ from io import BytesIO
from uuid import uuid4
from http.client import HTTPException
from swift.common import memcached
from swift.common import storage_policy, swob, utils, exceptions
from swift.common.memcached import MemcacheConnectionError
from swift.common.storage_policy import (StoragePolicy, ECStoragePolicy,
@@ -48,6 +49,8 @@ from swift.common.storage_policy import (StoragePolicy, ECStoragePolicy,
from swift.common.utils import Timestamp, md5, close_if_possible, checksum
from test import get_config
from test.debug_logger import FakeLogger
from test.unit.common.test_memcached import MockedMemcachePool, \
MockMemcached
from swift.common.header_key_dict import HeaderKeyDict
from swift.common.ring import Ring, RingData, RingBuilder
from swift.obj import server
@@ -395,6 +398,14 @@ def track(f):
class FakeMemcache(object):
"""
Simple in-memory test helper for basic memcache GET/SET operations.
This class provides a lightweight mock that stores data in a Python dict.
It does not implement TTL expiration or the full memcache protocol, and
bypasses MemcacheRing's internal handling (e.g., atomic incr operations).
Use this for simple tests that only need basic key-value storage.
"""
def __init__(self, error_on_set=None, error_on_get=None):
self.store = {}
@@ -468,6 +479,65 @@ class FakeMemcache(object):
del track
class TestableMemcacheRing(memcached.MemcacheRing):
"""
Real MemcacheRing with injectable errors for testing concurrent scenarios.
This class wraps the actual MemcacheRing implementation while allowing
controlled injection of connection errors. It preserves MemcacheRing's
internal behavior including atomic operations and protocol compliance.
Use this for testing components that rely on memcache features like
atomic incr/decr or when testing concurrent access patterns.
"""
def __init__(self, servers, inject_incr_error=None, inject_set_error=None,
inject_get_error=None, inject_del_error=None, **kwargs):
self.inject_incr_error = inject_incr_error
self.inject_set_error = inject_set_error
self.inject_get_error = inject_get_error
self.inject_del_error = inject_del_error
super().__init__(servers, **kwargs)
mock_cache = MockMemcached()
self._client_cache['1.2.3.4:11211'] = MockedMemcachePool(
[(mock_cache, mock_cache)] * 2)
self.set_calls = []
self.incr_calls = []
self.get_calls = []
self.del_calls = []
def incr(self, key, delta=1, time=0):
self.incr_calls.append((key, delta, time))
if self.inject_incr_error:
raise MemcacheConnectionError
return super().incr(key, delta, time)
def set(self, key, value, serialize=True, time=0,
min_compress_len=0, raise_on_error=False):
self.set_calls.append((key, value, time))
if self.inject_set_error:
if raise_on_error:
raise MemcacheConnectionError
else:
return None
super().set(
key, value, serialize, time, min_compress_len, raise_on_error)
def get(self, key, raise_on_error=False):
self.get_calls.append(key)
if self.inject_get_error:
if raise_on_error:
raise MemcacheConnectionError
else:
return None
return super().get(key, raise_on_error)
def delete(self, key, server_key=None):
self.del_calls.append(key)
if self.inject_del_error:
raise MemcacheConnectionError
super().delete(key, server_key)
class FakeIterable(object):
def __init__(self, values):
self.next_call_count = 0

View File

@@ -21,9 +21,10 @@ import io
import itertools
from swift.common.statsd_client import StatsdClient
from test.debug_logger import debug_logger, FakeStatsdClient
from test.debug_logger import debug_logger, FakeStatsdClient, \
debug_labeled_statsd_client
from test.unit import temptree, make_timestamp_iter, with_tempdir, \
mock_timestamp_now, FakeIterable
mock_timestamp_now, TestableMemcacheRing, FakeIterable
import contextlib
import errno
@@ -64,7 +65,8 @@ from uuid import uuid4
from swift.common.exceptions import Timeout, LockTimeout, \
ReplicationLockTimeout, MimeInvalid
from swift.common import utils
from swift.common.utils import set_swift_dir, md5, ShardRangeList
from swift.common.utils import set_swift_dir, md5, ShardRangeList, \
CooperativeCachePopulator
from swift.common.container_sync_realms import ContainerSyncRealms
from swift.common.header_key_dict import HeaderKeyDict
from swift.common.storage_policy import POLICIES, reload_storage_policies
@@ -122,6 +124,13 @@ class MockOs(object):
return getattr(os, name)
class MockApp(object):
def __init__(self, logger, statsd):
self.logger = logger
self.statsd = statsd
class MockSys(object):
def __init__(self):
@@ -2956,6 +2965,891 @@ cluster_dfw1 = http://dfw1.host/v1/
self.assertEqual(os.getppid(), utils.get_ppid(os.getpid()))
class TestCooperativeCachePopulator(unittest.TestCase):
backend_resp = Response(status=200, body=b'response')
def setUp(self):
self.logger = debug_logger()
conf = {
'log_statsd_host': 'host',
'log_statsd_port': 8125,
'statsd_label_mode': 'dogstatsd',
'statsd_emit_legacy': True,
}
self.statsd = debug_labeled_statsd_client(conf)
self.memcache = TestableMemcacheRing(
['1.2.3.4:11211'], logger=self.logger)
self.infocache = {}
self.cache_key = "test_key"
self.token_key = "_cache_token/%s" % self.cache_key
self.cache_ttl = 60
self.avg_backend_fetch_time = 0.001
class MockCachePopulator(CooperativeCachePopulator):
def do_fetch_backend(self):
return "backend data", TestCooperativeCachePopulator.backend_resp
class DataTransformCachePopulator(CooperativeCachePopulator):
def cache_encoder(self, data):
return data.upper()
def cache_decoder(self, data):
return data.lower()
def do_fetch_backend(self):
return "backend data", TestCooperativeCachePopulator.backend_resp
class DelayedCachePopulator(CooperativeCachePopulator):
def __init__(self, app, infocache, memcache,
cache_key, cache_ttl, avg_backend_fetch_time,
backend_delay=0,
fetch_backend_failure=False,
num_tokens=3, labels=None):
super().__init__(
app, infocache, memcache, cache_key, cache_ttl,
avg_backend_fetch_time, num_tokens, labels=labels
)
self._backend_delay = backend_delay
self._fetch_backend_failure = fetch_backend_failure
def do_fetch_backend(self):
if self._backend_delay:
eventlet.sleep(self._backend_delay)
if self._fetch_backend_failure:
return None, FakeResponse(status_int=503)
else:
return "backend data", \
TestCooperativeCachePopulator.backend_resp
def test_populator_constructor(self):
obj = self.MockCachePopulator(
MockApp(self.logger, self.statsd),
self.infocache, self.memcache,
self.cache_key, self.cache_ttl,
self.avg_backend_fetch_time,
num_tokens=10, labels={
'resource': "test",
}
)
self.assertEqual(obj._logger, self.logger)
self.assertEqual(obj._labels, {
'resource': "test",
})
self.assertEqual(obj._infocache, self.infocache)
self.assertEqual(obj._memcache, self.memcache)
self.assertEqual(obj._cache_key, self.cache_key)
self.assertEqual(obj._cache_ttl, self.cache_ttl)
self.assertEqual(obj._token_key, '_cache_token/%s' % self.cache_key)
self.assertEqual(
obj._avg_backend_fetch_time, self.avg_backend_fetch_time)
self.assertEqual(obj._token_ttl, self.avg_backend_fetch_time * 10)
self.assertEqual(obj._num_tokens, 10)
self.assertEqual(obj.cache_encoder(42), 42)
self.assertEqual(obj.cache_decoder(42), 42)
self.assertIsNone(obj.set_cache_state)
self.assertFalse(obj.token_acquired)
self.assertIsNone(obj.backend_resp)
def test_populator_num_tokens_zero(self):
populator = self.MockCachePopulator(
MockApp(self.logger, self.statsd),
self.infocache, self.memcache,
self.cache_key, self.cache_ttl,
avg_backend_fetch_time=.1,
num_tokens=0, labels={
'resource': "test",
}
)
data = populator.fetch_data()
self.assertEqual(data, "backend data")
self.assertEqual(populator.backend_resp, self.backend_resp)
self.assertEqual(populator.set_cache_state, "set")
self.assertFalse(populator.token_acquired)
self.assertEqual(self.memcache.get_calls, [])
self.assertEqual(self.infocache[self.cache_key], "backend data")
self.assertEqual(self.memcache.incr_calls, [])
self.assertEqual(
self.memcache.set_calls,
[(self.cache_key, "backend data", self.cache_ttl)]
)
self.assertEqual(self.memcache.del_calls, [])
stats = self.statsd.get_labeled_stats_counts()
self.assertEqual({
('swift_coop_cache', frozenset((
('resource', 'test'),
('event', 'backend_reqs'),
('token', 'disabled'),
('set_cache_state', 'set'),
('status', 200)),
)): 1,
}, stats)
def test_populator_fetch_data_called_twice(self):
populator = self.MockCachePopulator(
MockApp(self.logger, self.statsd),
self.infocache, self.memcache,
self.cache_key, self.cache_ttl,
self.avg_backend_fetch_time,
)
data = populator.fetch_data()
self.assertEqual(data, "backend data")
self.assertTrue(populator._fetch_called)
self.assertRaises(RuntimeError, populator.fetch_data)
def test_first_request_with_token(self):
# Test the first request will acquire the token, fetch data from
# the backend and set it into Memcached.
populator = self.MockCachePopulator(
MockApp(self.logger, self.statsd),
self.infocache, self.memcache,
self.cache_key, self.cache_ttl,
self.avg_backend_fetch_time,
labels={
'resource': "test",
}
)
data = populator.fetch_data()
self.assertEqual(data, "backend data")
self.assertEqual(populator.backend_resp, self.backend_resp)
self.assertEqual(populator.set_cache_state, "set")
self.assertTrue(populator.token_acquired)
self.assertEqual(self.memcache.get_calls, [])
self.assertEqual(self.infocache[self.cache_key], "backend data")
self.assertEqual(
self.memcache.incr_calls,
[(self.token_key, 1, self.avg_backend_fetch_time * 10)]
)
self.assertEqual(
self.memcache.set_calls,
[(self.cache_key, "backend data", self.cache_ttl)]
)
self.assertEqual(self.memcache.del_calls, [self.token_key])
stats = self.statsd.get_labeled_stats_counts()
self.assertEqual({
('swift_coop_cache', frozenset((
('resource', 'test'),
('event', 'backend_reqs'),
('token', 'with_token'),
('set_cache_state', 'set'),
('status', 200)),
)): 1,
}, stats)
def test_first_request_with_token_with_encoder(self):
# Test the processing of first request with cache data encoder and
# decoder.
populator = self.DataTransformCachePopulator(
MockApp(self.logger, self.statsd),
self.infocache, self.memcache,
self.cache_key, self.cache_ttl,
self.avg_backend_fetch_time,
labels={
'resource': "test",
}
)
data = populator.fetch_data()
self.assertEqual(data, "backend data")
self.assertEqual(populator.backend_resp, self.backend_resp)
self.assertEqual(populator.set_cache_state, "set")
self.assertTrue(populator.token_acquired)
self.assertEqual(self.memcache.get_calls, [])
self.assertEqual(self.infocache[self.cache_key], "backend data")
self.assertEqual(
self.memcache.incr_calls,
[(self.token_key, 1, self.avg_backend_fetch_time * 10)]
)
self.assertEqual(
self.memcache.set_calls,
[(self.cache_key, "BACKEND DATA", self.cache_ttl)]
)
self.assertEqual(self.memcache.del_calls, [self.token_key])
stats = self.statsd.get_labeled_stats_counts()
self.assertEqual({
('swift_coop_cache', frozenset((
('resource', 'test'),
('event', 'backend_reqs'),
('token', 'with_token'),
('set_cache_state', 'set'),
('status', 200)),
)): 1,
}, stats)
def test_following_request_with_token(self):
# Test the following request (not the first one) which also acquires
# the token, fetch data from the backend and set it into Memcached.
# Simulate that there are 1 or 2 requests who have acquired the tokens.
prior_reqs_with_token = random.randint(1, 2)
total_requests = self.memcache.incr(
self.token_key, delta=prior_reqs_with_token, time=10)
self.assertEqual(total_requests, prior_reqs_with_token)
# Test the following request to get the token.
self.memcache.incr_calls = []
populator = self.MockCachePopulator(
MockApp(self.logger, self.statsd),
self.infocache, self.memcache,
self.cache_key, self.cache_ttl,
self.avg_backend_fetch_time,
labels={
'resource': "test",
}
)
data = populator.fetch_data()
self.assertEqual(data, "backend data")
self.assertEqual(populator.backend_resp, self.backend_resp)
self.assertEqual(populator.set_cache_state, "set")
self.assertTrue(populator.token_acquired)
self.assertEqual(self.memcache.get_calls, [])
self.assertEqual(self.infocache[self.cache_key], "backend data")
self.assertEqual(
self.memcache.incr_calls,
[(self.token_key, 1, self.avg_backend_fetch_time * 10)]
)
self.assertEqual(
self.memcache.set_calls,
[(self.cache_key, "backend data", self.cache_ttl)]
)
self.assertEqual(self.memcache.del_calls, [self.token_key])
stats = self.statsd.get_labeled_stats_counts()
self.assertEqual({
('swift_coop_cache', frozenset((
('resource', 'test'),
('event', 'backend_reqs'),
('token', 'with_token'),
('set_cache_state', 'set'),
('status', 200)),
)): 1,
}, stats)
def test_fetch_data_cache_hit_without_token(self):
# Test the request which doesn't acquire the token, then keep sleeping
# and trying to fetch data from the Memcached until it succeeds.
def test_fetch_data(avg_fetch_time,
total_miss_retries,
expected_sleep_calls):
self.logger.statsd_client.clear()
self.statsd.clear()
num_tokens_per_session = random.randint(1, 3)
retries = 0
class CustomizedCache(TestableMemcacheRing):
def get(self, key, raise_on_error=False):
nonlocal retries
retries += 1
if retries <= total_miss_retries:
value = super().get(
"NOT_EXISTED_YET")
return value
else:
return super().get(key)
self.memcache = CustomizedCache(
['1.2.3.4:11211'], logger=self.logger)
# Simulate that there were 'num_tokens_per_session' requests who
# have acquired the tokens.
total_requests = self.memcache.incr(
self.token_key, delta=num_tokens_per_session, time=10)
self.assertEqual(total_requests, num_tokens_per_session)
self.memcache.set(self.cache_key, [1, 2, 3])
# Test the request without a token
self.memcache.incr_calls = []
self.memcache.set_calls = []
populator = self.MockCachePopulator(
MockApp(self.logger, self.statsd),
self.infocache, self.memcache,
self.cache_key, self.cache_ttl,
avg_backend_fetch_time=avg_fetch_time,
num_tokens=num_tokens_per_session,
labels={
'resource': "test",
}
)
populator._token_ttl = 1
with mock.patch.object(utils, 'sleep') as mock_sleep:
data = populator.fetch_data()
self.assertEqual(expected_sleep_calls, mock_sleep.call_args_list)
self.assertEqual(data, [1, 2, 3])
self.assertIsNone(populator.backend_resp)
self.assertIsNone(populator.set_cache_state)
self.assertEqual(self.infocache, {})
self.assertEqual(
self.memcache.incr_calls,
[(self.token_key, 1, populator._token_ttl)]
)
self.assertEqual(self.memcache.set_calls, [])
self.assertEqual(retries, total_miss_retries + 1)
self.assertEqual(
self.memcache.get_calls,
['NOT_EXISTED_YET'] * (retries - 1) + [self.cache_key]
)
self.assertFalse(populator.token_acquired)
stats = self.statsd.get_labeled_stats_counts()
self.assertEqual({
('swift_coop_cache', frozenset((
('resource', 'test'),
('event', 'cache_served'),
('token', 'no_token'),
('lack_retries', False)),
)): 1,
}, stats)
test_fetch_data(1.0, 1, [mock.call(1.5), mock.call(3.0)])
test_fetch_data(
1.0, 2, [mock.call(1.5), mock.call(3.0), mock.call(6.0)])
test_fetch_data(2.0, 1, [mock.call(3.0), mock.call(6.0)])
test_fetch_data(
2.0, 2, [mock.call(3.0), mock.call(6.0), mock.call(12.0)])
def test_fetch_data_cache_miss_without_token(self):
# Test the request which doesn't acquire the token, then keep sleeping
# and trying to fetch data from the Memcached, but enventually all
# retries exhausted with cache misses.
num_tokens_per_session = random.randint(1, 3)
retries = 0
class CustomizedCache(TestableMemcacheRing):
def get(self, key, raise_on_error=False):
nonlocal retries
retries += 1
return super().get("NOT_EXISTED_YET")
self.memcache = CustomizedCache(['1.2.3.4:11211'], logger=self.logger)
# Simulate that there are 'num_tokens_per_session' requests who have
# acquired the tokens.
total_requests = self.memcache.incr(
self.token_key, delta=num_tokens_per_session, time=10)
self.assertEqual(total_requests, num_tokens_per_session)
self.memcache.set(self.cache_key, "cached data")
# Test the request without a token
self.memcache.incr_calls = []
self.memcache.set_calls = []
populator = self.MockCachePopulator(
MockApp(self.logger, self.statsd),
self.infocache, self.memcache,
self.cache_key, self.cache_ttl,
self.avg_backend_fetch_time,
num_tokens_per_session,
labels={
'resource': "test",
}
)
data = populator.fetch_data()
self.assertEqual(data, "backend data")
self.assertEqual(populator.backend_resp, self.backend_resp)
self.assertEqual(populator.set_cache_state, "set")
self.assertFalse(populator.token_acquired)
self.assertEqual(self.infocache[self.cache_key], "backend data")
self.assertEqual(
self.memcache.incr_calls,
[(self.token_key, 1, self.avg_backend_fetch_time * 10)]
)
self.assertEqual(
self.memcache.set_calls,
[(self.cache_key, "backend data", self.cache_ttl)]
)
self.assertEqual(retries, 3)
self.assertEqual(
self.memcache.get_calls, ['NOT_EXISTED_YET'] * retries)
self.assertEqual(self.memcache.del_calls, [])
stats = self.statsd.get_labeled_stats_counts()
self.assertEqual({
('swift_coop_cache', frozenset((
('resource', 'test'),
('event', 'backend_reqs'),
('token', 'no_token'),
('lack_retries', False),
('set_cache_state', 'set'),
('status', 200)),
)): 1,
}, stats)
def test_fetch_data_req_lacks_enough_retries(self):
# Test the request which doesn't acquire the token, then keep sleeping
# and trying to fetch data from the Memcached, but doesn't get enough
# retries.
retries = 0
class CustomizedCache(TestableMemcacheRing):
def get(self, key, raise_on_error=False):
nonlocal retries
retries += 1
return super().get("NOT_EXISTED_YET")
self.memcache = CustomizedCache(['1.2.3.4:11211'], logger=self.logger)
# Simulate that there are three requests who have acquired the tokens.
total_requests = self.memcache.incr(self.token_key, delta=3, time=10)
self.assertEqual(total_requests, 3)
self.memcache.set(self.cache_key, [1, 2, 3])
# Test the request without a token
self.memcache.incr_calls = []
self.memcache.set_calls = []
populator = self.MockCachePopulator(
MockApp(self.logger, self.statsd),
self.infocache, self.memcache,
self.cache_key, self.cache_ttl,
self.avg_backend_fetch_time,
labels={
'resource': "test",
}
)
with patch('time.time', ) as mock_time:
mock_time.side_effect = itertools.count(4000.99, 1.0)
data = populator.fetch_data()
self.assertEqual(data, "backend data")
self.assertEqual(populator.backend_resp, self.backend_resp)
self.assertEqual(populator.set_cache_state, "set")
self.assertFalse(populator.token_acquired)
self.assertEqual(self.infocache[self.cache_key], "backend data")
self.assertEqual(
self.memcache.incr_calls,
[(self.token_key, 1, self.avg_backend_fetch_time * 10)]
)
self.assertEqual(
self.memcache.set_calls,
[(self.cache_key, "backend data", self.cache_ttl)]
)
self.assertEqual(retries, 2)
self.assertEqual(
self.memcache.get_calls, ['NOT_EXISTED_YET'] * 2)
self.assertEqual(self.memcache.del_calls, [])
stats = self.statsd.get_labeled_stats_counts()
self.assertEqual({
('swift_coop_cache', frozenset((
('resource', 'test'),
('event', 'backend_reqs'),
('token', 'no_token'),
('lack_retries', True),
('set_cache_state', 'set'),
('status', 200)),
)): 1,
}, stats)
def test_get_token_connection_error(self):
# Test the request which couldn't acquire the token due to memcached
# connection error.
self.memcache = TestableMemcacheRing(
['1.2.3.4:11211'], logger=self.logger, inject_incr_error=True)
populator = self.MockCachePopulator(
MockApp(self.logger, self.statsd),
self.infocache, self.memcache,
self.cache_key, self.cache_ttl,
self.avg_backend_fetch_time,
labels={
'resource': "test",
}
)
data = populator.fetch_data()
self.assertEqual(data, "backend data")
self.assertEqual(populator.backend_resp, self.backend_resp)
self.assertEqual(populator.set_cache_state, "set")
self.assertFalse(populator.token_acquired)
self.assertEqual(self.infocache[self.cache_key], "backend data")
self.assertEqual(
self.memcache.incr_calls,
[(self.token_key, 1, self.avg_backend_fetch_time * 10)]
)
self.assertEqual(
self.memcache.set_calls,
[(self.cache_key, "backend data", self.cache_ttl)]
)
self.assertEqual(self.memcache.del_calls, [])
stats = self.statsd.get_labeled_stats_counts()
self.assertEqual({
('swift_coop_cache', frozenset((
('resource', 'test'),
('event', 'backend_reqs'),
('token', 'error'),
('set_cache_state', 'set'),
('status', 200)),
)): 1,
}, stats)
def test_set_data_connection_error(self):
# Test the request which is able to acquire the token, but fails to set
# the fetched backend data into memcached due to memcached connection
# error.
self.memcache = TestableMemcacheRing(
['1.2.3.4:11211'], logger=self.logger, inject_set_error=True)
populator = self.MockCachePopulator(
MockApp(self.logger, self.statsd),
self.infocache, self.memcache,
self.cache_key, self.cache_ttl,
self.avg_backend_fetch_time,
labels={
'resource': "test",
}
)
data = populator.fetch_data()
self.assertEqual(data, "backend data")
self.assertEqual(populator.backend_resp, self.backend_resp)
self.assertEqual(populator.set_cache_state, "set_error")
self.assertTrue(populator.token_acquired)
self.assertEqual(self.infocache[self.cache_key], "backend data")
self.assertEqual(
self.memcache.incr_calls,
[(self.token_key, 1, self.avg_backend_fetch_time * 10)]
)
self.assertEqual(
self.memcache.set_calls,
[(self.cache_key, "backend data", self.cache_ttl)]
)
self.assertEqual(self.memcache.del_calls, [])
stats = self.statsd.get_labeled_stats_counts()
self.assertEqual({
('swift_coop_cache', frozenset((
('resource', 'test'),
('event', 'backend_reqs'),
('token', 'with_token'),
('set_cache_state', 'set_error'),
('status', 200)),
)): 1,
}, stats)
def test_connection_errors_on_gettoken_and_dataset(self):
# Test the request which couldn't acquire the token due to memcached
# connection error, and then couldn't set the backend data into cache
# due to memcached connection error too.
self.memcache = TestableMemcacheRing(
['1.2.3.4:11211'], logger=self.logger,
inject_incr_error=True, inject_set_error=True)
populator = self.MockCachePopulator(
MockApp(self.logger, self.statsd),
self.infocache, self.memcache,
self.cache_key, self.cache_ttl,
self.avg_backend_fetch_time,
labels={
'resource': "test",
}
)
data = populator.fetch_data()
self.assertEqual(data, "backend data")
self.assertEqual(populator.backend_resp, self.backend_resp)
self.assertEqual(populator.set_cache_state, "set_error")
self.assertFalse(populator.token_acquired)
self.assertEqual(self.infocache[self.cache_key], "backend data")
self.assertEqual(
self.memcache.incr_calls,
[(self.token_key, 1, self.avg_backend_fetch_time * 10)]
)
self.assertEqual(
self.memcache.set_calls,
[(self.cache_key, "backend data", self.cache_ttl)]
)
self.assertEqual(self.memcache.del_calls, [])
stats = self.statsd.get_labeled_stats_counts()
self.assertEqual({
('swift_coop_cache', frozenset((
('resource', 'test'),
('event', 'backend_reqs'),
('token', 'error'),
('set_cache_state', 'set_error'),
('status', 200)),
)): 1,
}, stats)
def test_fetch_data_from_cache_connection_error(self):
# Test the request which doesn't acquire the token, then keep sleeping
# and trying to fetch data from the Memcached, but eventually all
# retries exhausted with memcached connection errors.
self.memcache = TestableMemcacheRing(
['1.2.3.4:11211'], logger=self.logger, inject_get_error=True)
# Simulate that there are three requests who have acquired the tokens.
total_requests = self.memcache.incr(self.token_key, delta=3, time=10)
self.assertEqual(total_requests, 3)
self.memcache.set(self.cache_key, [1, 2, 3])
# Test the request without a token
self.memcache.incr_calls = []
self.memcache.set_calls = []
populator = self.MockCachePopulator(
MockApp(self.logger, self.statsd),
self.infocache, self.memcache,
self.cache_key, self.cache_ttl,
self.avg_backend_fetch_time,
labels={
'resource': "test",
}
)
data = populator.fetch_data()
self.assertEqual(data, "backend data")
self.assertEqual(populator.backend_resp, self.backend_resp)
self.assertEqual(populator.set_cache_state, "set")
self.assertFalse(populator.token_acquired)
self.assertEqual(self.infocache[self.cache_key], "backend data")
self.assertEqual(
self.memcache.incr_calls,
[(self.token_key, 1, self.avg_backend_fetch_time * 10)]
)
self.assertEqual(
self.memcache.set_calls,
[(self.cache_key, "backend data", self.cache_ttl)]
)
self.assertEqual(
self.memcache.get_calls[0], self.cache_key)
self.assertEqual(self.memcache.del_calls, [])
stats = self.statsd.get_labeled_stats_counts()
self.assertEqual({
('swift_coop_cache', frozenset((
('resource', 'test'),
('event', 'backend_reqs'),
('token', 'no_token'),
('lack_retries', False),
('set_cache_state', 'set'),
('status', 200)),
)): 1,
}, stats)
def test_concurrent_requests(self):
# Simulate multiple concurrent threads, each of them issues a
# "fetch_data" request cooperatively.
self.avg_backend_fetch_time = 0.01
num_processes = 100
exceptions = []
def worker_process():
# Initialize new populator instance in each process.
populator = self.DelayedCachePopulator(
MockApp(self.logger, self.statsd),
{}, self.memcache,
self.cache_key, self.cache_ttl,
self.avg_backend_fetch_time,
backend_delay=self.avg_backend_fetch_time,
labels={
'resource': "test",
}
)
data = populator.fetch_data()
try:
# Data retrieved successfully
self.assertEqual(data, "backend data")
if populator.set_cache_state == 'set':
self.assertTrue(populator.token_acquired)
self.assertEqual(
populator._infocache[self.cache_key], "backend data")
self.assertEqual(populator.backend_resp, self.backend_resp)
else:
self.assertEqual(populator._infocache, {})
self.assertIsNone(populator.backend_resp)
except Exception as e:
exceptions.append(e)
# Issue those parallel requests "at the same time".
pool = eventlet.GreenPool()
for i in range(num_processes):
pool.spawn(worker_process)
# Wait for all requests to complete
pool.waitall()
if exceptions:
self.fail(f"Greenthread assertions failed: {exceptions}")
stats = self.statsd.get_labeled_stats_counts()
self.assertEqual({
('swift_coop_cache', frozenset((
('resource', 'test'),
('event', 'backend_reqs'),
('token', 'with_token'),
('set_cache_state', 'set'),
('status', 200)),
)): 3,
('swift_coop_cache', frozenset((
('resource', 'test'),
('event', 'cache_served'),
('token', 'no_token'),
('lack_retries', False)),
)): 97
}, stats)
self.assertEqual(
self.memcache.incr_calls,
[(self.token_key, 1, self.avg_backend_fetch_time * 10)] * 100
)
self.assertEqual(
self.memcache.set_calls,
[(self.cache_key, "backend data", self.cache_ttl)] * 3
)
self.assertEqual(self.memcache.del_calls, [self.token_key] * 3)
def test_concurrent_requests_all_token_requests_fail(self):
# Simulate multiple concurrent threads issued into a cooperative token
# session, each thread will issue a "fetch_data" request cooperatively.
# And the first three requests will acquire the token, but fail to get
# data from the backend. This test also demonstrates that even though
# all token requests fail to go through, other requests who arrive at
# the late stage of same token session and won't get a token still
# could be served out of the memcached.
self.avg_backend_fetch_time = 0.1
counts = {
'num_backend_success': 0,
'num_requests_served_from_cache': 0,
'num_backend_failures': 0,
}
exceptions = []
def worker_process(exec_delay, backend_delay,
fetch_backend_failure=False):
# Initialize new populator instance in each process.
populator = self.DelayedCachePopulator(
MockApp(self.logger, self.statsd),
{}, self.memcache,
self.cache_key, self.cache_ttl,
self.avg_backend_fetch_time,
backend_delay,
fetch_backend_failure,
labels={
'resource': "test",
}
)
if exec_delay:
eventlet.sleep(exec_delay)
data = populator.fetch_data()
try:
if fetch_backend_failure:
self.assertIsNone(data)
else:
self.assertEqual(data, "backend data")
if populator.set_cache_state == 'set':
counts['num_backend_success'] += 1
self.assertEqual(
populator._infocache[self.cache_key], "backend data")
self.assertEqual(populator.backend_resp, self.backend_resp)
elif not fetch_backend_failure:
counts['num_requests_served_from_cache'] += 1
self.assertEqual(populator._infocache, {})
self.assertIsNone(populator.backend_resp)
else:
counts['num_backend_failures'] += 1
except Exception as e:
exceptions.append(e)
# Issue those parallel requests at different time within this
# cooperative token session.
pool = eventlet.GreenPool()
# The first three requests will get the token but fails.
for i in range(3):
pool.spawn(
worker_process, 0, self.avg_backend_fetch_time * 15, True)
# The 4th request won't get a token, but after it exits its waiting
# cycles, it will fetch the data from the backend and set the data into
# the memcached.
pool.spawn(worker_process, 0, 0)
# The remaining 16 requests won't get a token, but will be served out
# of the memcached because the 4th request will finish within their
# waiting cycles.
for i in range(16):
pool.spawn(
worker_process,
random.uniform(self.avg_backend_fetch_time * 3,
self.avg_backend_fetch_time * 6),
self.avg_backend_fetch_time
)
# Wait for all requests to complete
pool.waitall()
if exceptions:
self.fail(f"Greenthread assertions failed: {exceptions}")
# The first three requests of the first token session failed to get
# data from the backend.
self.assertEqual(counts['num_backend_failures'], 3)
self.assertEqual(counts['num_backend_success'], 1)
self.assertEqual(counts['num_requests_served_from_cache'], 16)
self.assertEqual(len(self.memcache.incr_calls), 20)
self.assertEqual(len(self.memcache.del_calls), 0)
def test_concurrent_requests_pass_token_ttl(self):
# Simulate multiple concurrent threads across two cooperative token
# sessions, each thread will issue a "fetch_data" request
# cooperatively. The very first three requests which will acquire the
# token, but fail to fetch data from backend.
self.avg_backend_fetch_time = 0.1
counts = {
'num_backend_success': 0,
'num_requests_served_from_cache': 0,
'num_backend_failures': 0,
}
exceptions = []
def worker_process(exec_delay, backend_delay,
fetch_backend_failure=False):
# Initialize new populator instance in each process.
populator = self.DelayedCachePopulator(
MockApp(self.logger, self.statsd),
{}, self.memcache,
self.cache_key, self.cache_ttl,
self.avg_backend_fetch_time,
backend_delay,
fetch_backend_failure,
labels={
'resource': "test",
}
)
if exec_delay:
eventlet.sleep(exec_delay)
data = populator.fetch_data()
try:
if fetch_backend_failure:
self.assertIsNone(data)
else:
self.assertEqual(data, "backend data")
if populator.set_cache_state == 'set':
counts['num_backend_success'] += 1
self.assertEqual(
populator._infocache[self.cache_key], "backend data")
self.assertEqual(populator.backend_resp, self.backend_resp)
elif not fetch_backend_failure:
counts['num_requests_served_from_cache'] += 1
self.assertEqual(populator._infocache, {})
self.assertIsNone(populator.backend_resp)
else:
counts['num_backend_failures'] += 1
except Exception as e:
exceptions.append(e)
# Issue the parallel requests for the first token session.
pool = eventlet.GreenPool()
for i in range(3):
pool.spawn(
worker_process, 0, self.avg_backend_fetch_time * 15, True)
for i in range(17):
pool.spawn(
worker_process,
random.uniform(0, self.avg_backend_fetch_time * 10),
self.avg_backend_fetch_time
)
# Issue the parallel requests for the second token session.
for i in range(3):
pool.spawn(
worker_process,
self.avg_backend_fetch_time * 10,
self.avg_backend_fetch_time * 5
)
for i in range(17):
pool.spawn(
worker_process,
random.uniform(self.avg_backend_fetch_time * 10,
self.avg_backend_fetch_time * 11),
self.avg_backend_fetch_time
)
# Wait for all requests to complete
pool.waitall()
if exceptions:
self.fail(f"Greenthread assertions failed: {exceptions}")
# The first three requests of the first token session failed to get
# data from the backend.
self.assertEqual(counts['num_backend_failures'], 3)
self.assertGreaterEqual(counts['num_backend_success'], 3)
self.assertEqual(
counts['num_requests_served_from_cache'],
40 - counts['num_backend_failures'] -
counts['num_backend_success']
)
self.assertEqual(len(self.memcache.incr_calls), 40)
# The first three requests of the second token session will delete the
# token after fetching data from the backend and set it in cache.
self.assertEqual(len(self.memcache.del_calls), 3)
class TestUnlinkOlder(unittest.TestCase):
def setUp(self):
@@ -4642,8 +5536,9 @@ This is the body
class FakeResponse(object):
def __init__(self, status, headers, body):
self.status = status
def __init__(self, status_int=200, headers=None, body=b''):
self.status_int = status_int
self.status = status_int
self.headers = HeaderKeyDict(headers)
self.body = BytesIO(body)