class CooperativeCachePopulator(object):
    """
    A cooperative token is used to avoid the thundering herd problem when
    caching is used in front of slow backend(s). Here is how it works:

    * When lots of in-flight requests try to get the cached item specified by
      a key from memcache and get cache misses, only the first few (limited by
      ``num_tokens``) of query requests will be able to get a cooperative
      token by creating or incrementing an internal memcache key.
    * Those callers with tokens can send backend requests to fetch data from
      backend servers and set data into memcache.
    * All other cache miss requests without a token should wait for cache
      filling to finish, instead of all querying the backend servers at the
      same time.
    * After those requests with a token are done, they will release the token
      by deleting the internal cache key, and finish this usage session. As
      such, one token usage session starts when the first request gets a token
      after cache misses and ends when all requests with a token are done.

    Multiple tokens are available in each usage session in order to increase
    fault tolerance in the distributed environment. When one request with a
    token hangs or exits, any other request with a token can still set new
    fetched data into memcache and finish the current usage session. The
    CooperativeCachePopulator class uses ``num_tokens`` to define the maximum
    number of tokens during each usage session. The default is 3.

    In the rare case of all requests with tokens failing, the existing usage
    session ends after a ``token_ttl`` period is reached and the token key
    expires. If this happens then other pending requests which have no token
    will exit waiting in the same order as entering the token session, and fall
    back to querying the backend and setting data in memcache. After any of
    those requests without a token successfully sets data in the memcache, the
    following requests in waiting can fetch that data from memcache as they
    continue retrying. The ``token_ttl`` is designed to be a back-off time
    window for retrying the backend; it will work for the same purpose in this
    case as well. When one token session ends after ``token_ttl``, requests
    which see a cache miss will start a new cooperative token session.

    An instance is one-shot: ``fetch_data`` may be called only once.

    :param app: the application instance containing app.logger, app.statsd
    :param infocache: the infocache instance; may be None, in which case the
        fetched data is not stored in it.
    :param memcache: the memcache instance, must be a valid MemcacheRing.
    :param cache_key: the cache key.
    :param cache_ttl: time-to-live of the data fetched from backend to set into
        memcached.
    :param avg_backend_fetch_time: The average time in seconds expected for the
        backend fetch operation ``do_fetch_backend`` to complete. This duration
        serves as a base unit for calculating exponential backoff delays when
        awaiting cache population by other requests, and for determining the
        cooperative token's time-to-live (``token_ttl``) which is set to 10x
        this value. Should be greater than 0.
    :param num_tokens: the maximum limit of tokens per each usage session,
        also the maximum limit of in-flight requests allowed to fetch data
        from backend. The default is 3, which gives redundancy when any request
        with token fails to fetch data from the backend or fails to set new
        data into memcached; 0 means no cooperative token is used.
    :param labels: the default labels for emitting labeled metrics, for example
        resource or operation type, account, container, etc. The mapping is
        copied, so the caller's dict is never modified.
    """

    def __init__(self, app, infocache, memcache,
                 cache_key, cache_ttl, avg_backend_fetch_time, num_tokens=3,
                 labels=None):
        self._logger = app.logger
        self._statsd = app.statsd
        # Work on a private copy: fetch_data() adds per-request keys such as
        # 'token', 'event' and 'status', which must not leak into a dict the
        # caller may share between several populators or metrics.
        self._labels = dict(labels) if labels else {}
        self._infocache = infocache
        self._memcache = memcache
        self._cache_key = cache_key
        self._cache_ttl = cache_ttl
        self._token_key = '_cache_token/%s' % cache_key
        self._avg_backend_fetch_time = avg_backend_fetch_time
        # Time-to-live of the cooperative token when set in memcached, this
        # defines the typical worst-case time that a token request would need
        # to fetch the data from the backend when it's busy, default to be 10
        # times of the average time spent on ``do_fetch_backend`` which is
        # the ``avg_backend_fetch_time``.
        self._token_ttl = avg_backend_fetch_time * 10
        self._num_tokens = num_tokens
        # The status of cache operation which sets backend data into Memcached:
        # None (no set attempted), 'set' or 'set_error'.
        self.set_cache_state = None
        # Indicates if this request has acquired one token.
        self.token_acquired = False
        # Indicates if the request has no token and doesn't get enough retries.
        self.lack_retries = False
        # The HttpResponse object returned by ``do_fetch_backend`` if called.
        self.backend_resp = None
        # Track if fetch_data has been called to enforce one-shot usage
        self._fetch_called = False

    def do_fetch_backend(self):
        """
        To fetch data from the backend, needs to be implemented by sub-class.

        :returns: a tuple of (data, response).
        :raises NotImplementedError: always, unless overridden.
        """
        raise NotImplementedError

    def cache_encoder(self, data):
        """
        To encode data to be stored in Memcached, default to return the data
        as is.

        :param data: the data fetched from the backend.
        :returns: encoded data.
        """
        return data

    def cache_decoder(self, data):
        """
        To decode data from Memcached, default to return the data as is.

        :param data: the value read from Memcached.
        :returns: decoded data.
        """
        return data

    def _query_backend_and_set_cache(self):
        """
        Fetch data from the backend and set the value in the Memcached.

        The backend response is recorded in ``backend_resp`` and the outcome
        of the memcache write in ``set_cache_state``.

        :returns: value of the data fetched from backend; None if not exist.
        """
        data, self.backend_resp = self.do_fetch_backend()
        if not data:
            return None

        if self._infocache is not None:
            self._infocache[self._cache_key] = data
        try:
            encoded_data = self.cache_encoder(data)
            self._memcache.set(
                self._cache_key, encoded_data,
                time=self._cache_ttl, raise_on_error=True)
        except swift.common.exceptions.MemcacheConnectionError:
            # The data is still returned to the caller; only the cache
            # population failed.
            self.set_cache_state = 'set_error'
        else:
            self.set_cache_state = 'set'
        return data

    def _sleep_and_retry_memcache(self):
        """
        Wait for cache value to be set by other requests with intermittent and
        limited number of sleeps. With ``token_ttl`` set as 10 times of
        ``avg_backend_fetch_time`` and the exponential backoff doubling the
        retry interval after each retry, this function will normally sleep and
        retry 3 times maximum.
        The first retry is 1.5 times of the ``avg_backend_fetch_time``, the
        second is 3 times, and the third is 6 times of it, so total is 10.5
        times of the ``avg_backend_fetch_time``. This roughly equals to the
        ``token_ttl`` which is 10 times of the ``avg_backend_fetch_time``.

        :returns: value of the data fetched from Memcached; None if not exist.
        """
        cur_time = time.time()
        cutoff_time = cur_time + self._token_ttl
        retry_interval = self._avg_backend_fetch_time * 1.5
        num_waits = 0
        while cur_time < cutoff_time or num_waits < 3:
            if cur_time < cutoff_time:
                sleep(retry_interval)
                num_waits += 1
            else:
                # Request has no token and doesn't get enough retries.
                self.lack_retries = True

                # To have one last check, when eventlet scheduling didn't give
                # this greenthread enough cpu cycles and it didn't have enough
                # times of retries. Forcing the counter to 3 makes this the
                # final iteration.
                num_waits = 3
            cache_data = self._memcache.get(
                self._cache_key, raise_on_error=False)
            if cache_data:
                # cache hit.
                return self.cache_decoder(cache_data)
            # cache miss, retry again with exponential backoff
            retry_interval *= 2
            cur_time = time.time()
        return None

    def _fetch_data(self):
        """
        Try to acquire a cooperative token, then either fetch from the backend
        (token acquired, or memcache unreachable) or wait for another request
        to populate the cache, falling back to the backend on a final miss.

        :returns: value of the data fetched from backend or memcache; None if
            not exist.
        """
        total_requests = 0
        try:
            total_requests = self._memcache.incr(
                self._token_key, time=self._token_ttl)
        except swift.common.exceptions.MemcacheConnectionError:
            self._labels['token'] = 'error'  # nosec bandit B105

        if not total_requests:
            # Couldn't connect to the memcache to increment the token key
            data = self._query_backend_and_set_cache()
        elif total_requests <= self._num_tokens:
            # Acquired a cooperative token, go fetching data from backend and
            # set the data in memcache.
            self.token_acquired = True
            data = self._query_backend_and_set_cache()

            if self.set_cache_state == 'set':
                # Since the successful finish of one whole cooperative token
                # session only depends on a single successful request. So when
                # any request with a token finishes both backend fetching and
                # memcache set successful, it can remove all cooperative tokens
                # of this token session.
                self._memcache.delete(self._token_key)
        else:
            # No token acquired, it means that there are requests in-flight
            # which will fetch data from the backend servers and update them in
            # cache, let's wait for them to finish with limited retries.
            data = self._sleep_and_retry_memcache()
            self._labels['lack_retries'] = self.lack_retries
            if data is None:
                # Still no cache data fetched.
                data = self._query_backend_and_set_cache()
        return data

    def fetch_data(self):
        """
        Coalescing all requests which are asking for the same data from the
        backend into a few with cooperative token.

        Emits one ``swift_coop_cache`` labeled counter describing how the
        request was served.

        :returns: value of the data fetched from backend or memcache; None if
            not exist.
        :raises RuntimeError: if called more than once on the same instance.
        """
        if self._fetch_called:
            raise RuntimeError("fetch_data() can only be called once per "
                               "CooperativeCachePopulator instance")
        self._fetch_called = True

        if not self._num_tokens:
            # Cooperative token disabled, fetch from backend.
            data = self._query_backend_and_set_cache()
            self._labels['token'] = 'disabled'  # nosec bandit B105
        else:
            data = self._fetch_data()
            # _fetch_data() may already have recorded 'error'.
            if 'token' not in self._labels:
                self._labels['token'] = (
                    'with_token' if self.token_acquired else 'no_token')

        if self.backend_resp:
            self._labels['event'] = 'backend_reqs'
            self._labels['status'] = self.backend_resp.status_int
        else:
            self._labels['event'] = 'cache_served'
        if self.set_cache_state:
            self._labels['set_cache_state'] = self.set_cache_state
        self._statsd.increment('swift_coop_cache', labels=self._labels)
        return data
The file diff --git a/test/debug_logger.py b/test/debug_logger.py index 44f377fab8..a632cfc36c 100644 --- a/test/debug_logger.py +++ b/test/debug_logger.py @@ -76,6 +76,7 @@ class BaseFakeStatsdClient: self.calls = defaultdict(list) self.recording_socket = RecordingSocket() self.counters = defaultdict(int) + self.labeled_stats_counters = defaultdict(int) @property def sendto_calls(self): @@ -94,8 +95,15 @@ class BaseFakeStatsdClient: Hook into base class primitive to track all "counter" metrics """ self.counters[metric] += value + labels = kwargs.get('labels', None) + if labels is not None: + hashable_labels = frozenset(labels.items()) + self.labeled_stats_counters[(metric, hashable_labels)] += value return super()._update_stats(metric, value, *args, **kwargs) + def get_labeled_stats_counts(self): + return self.labeled_stats_counters + # getter for backwards compat def get_stats_counts(self): return self.counters diff --git a/test/unit/__init__.py b/test/unit/__init__.py index d5cf333a7a..afa8388b22 100644 --- a/test/unit/__init__.py +++ b/test/unit/__init__.py @@ -41,6 +41,7 @@ from io import BytesIO from uuid import uuid4 from http.client import HTTPException +from swift.common import memcached from swift.common import storage_policy, swob, utils, exceptions from swift.common.memcached import MemcacheConnectionError from swift.common.storage_policy import (StoragePolicy, ECStoragePolicy, @@ -48,6 +49,8 @@ from swift.common.storage_policy import (StoragePolicy, ECStoragePolicy, from swift.common.utils import Timestamp, md5, close_if_possible, checksum from test import get_config from test.debug_logger import FakeLogger +from test.unit.common.test_memcached import MockedMemcachePool, \ + MockMemcached from swift.common.header_key_dict import HeaderKeyDict from swift.common.ring import Ring, RingData, RingBuilder from swift.obj import server @@ -395,6 +398,14 @@ def track(f): class FakeMemcache(object): + """ + Simple in-memory test helper for basic memcache GET/SET 
class TestableMemcacheRing(memcached.MemcacheRing):
    """
    A genuine MemcacheRing whose connections are replaced by an in-memory
    mock, with optional fault injection per operation.

    Every call to ``incr``, ``set``, ``get`` and ``delete`` is recorded (in
    ``incr_calls``, ``set_calls``, ``get_calls`` and ``del_calls``) before
    either an injected MemcacheConnectionError is produced or the call is
    delegated to the real MemcacheRing implementation.

    :param servers: list of memcache servers, passed to MemcacheRing.
    :param inject_incr_error: if true, ``incr`` always raises.
    :param inject_set_error: if true, ``set`` fails (raises only when the
        caller asked for ``raise_on_error``).
    :param inject_get_error: if true, ``get`` fails (raises only when the
        caller asked for ``raise_on_error``).
    :param inject_del_error: if true, ``delete`` always raises.
    """

    def __init__(self, servers, inject_incr_error=None, inject_set_error=None,
                 inject_get_error=None, inject_del_error=None, **kwargs):
        # Fault-injection switches are stored before the parent initializer
        # runs, exactly as in the original ordering.
        self.inject_incr_error = inject_incr_error
        self.inject_set_error = inject_set_error
        self.inject_get_error = inject_get_error
        self.inject_del_error = inject_del_error
        super().__init__(servers, **kwargs)
        # Swap the connection pool for one backed by an in-memory server.
        fake_server = MockMemcached()
        connections = [(fake_server, fake_server), (fake_server, fake_server)]
        self._client_cache['1.2.3.4:11211'] = MockedMemcachePool(connections)
        # Call recorders, inspected by the tests.
        self.set_calls = []
        self.incr_calls = []
        self.get_calls = []
        self.del_calls = []

    def incr(self, key, delta=1, time=0):
        """Record the call, then fail if injected or delegate to the ring."""
        self.incr_calls.append((key, delta, time))
        if not self.inject_incr_error:
            return super().incr(key, delta, time)
        raise MemcacheConnectionError

    def set(self, key, value, serialize=True, time=0,
            min_compress_len=0, raise_on_error=False):
        """Record the call, then fail if injected or delegate to the ring."""
        self.set_calls.append((key, value, time))
        if self.inject_set_error and raise_on_error:
            raise MemcacheConnectionError
        if self.inject_set_error:
            # Silent failure: the caller did not ask for errors to be raised.
            return None
        super().set(
            key, value, serialize, time, min_compress_len, raise_on_error)

    def get(self, key, raise_on_error=False):
        """Record the call, then fail if injected or delegate to the ring."""
        self.get_calls.append(key)
        if self.inject_get_error and raise_on_error:
            raise MemcacheConnectionError
        if self.inject_get_error:
            # Silent failure looks like a cache miss.
            return None
        return super().get(key, raise_on_error)

    def delete(self, key, server_key=None):
        """Record the call, then fail if injected or delegate to the ring."""
        self.del_calls.append(key)
        if self.inject_del_error:
            raise MemcacheConnectionError
        super().delete(key, server_key)


class MockApp(object):
    """Minimal application stand-in exposing ``logger`` and ``statsd``."""

    def __init__(self, logger, statsd):
        self.statsd = statsd
        self.logger = logger
utils.get_ppid(os.getpid())) +class TestCooperativeCachePopulator(unittest.TestCase): + backend_resp = Response(status=200, body=b'response') + + def setUp(self): + self.logger = debug_logger() + conf = { + 'log_statsd_host': 'host', + 'log_statsd_port': 8125, + 'statsd_label_mode': 'dogstatsd', + 'statsd_emit_legacy': True, + } + self.statsd = debug_labeled_statsd_client(conf) + self.memcache = TestableMemcacheRing( + ['1.2.3.4:11211'], logger=self.logger) + self.infocache = {} + self.cache_key = "test_key" + self.token_key = "_cache_token/%s" % self.cache_key + self.cache_ttl = 60 + self.avg_backend_fetch_time = 0.001 + + class MockCachePopulator(CooperativeCachePopulator): + def do_fetch_backend(self): + return "backend data", TestCooperativeCachePopulator.backend_resp + + class DataTransformCachePopulator(CooperativeCachePopulator): + def cache_encoder(self, data): + return data.upper() + + def cache_decoder(self, data): + return data.lower() + + def do_fetch_backend(self): + return "backend data", TestCooperativeCachePopulator.backend_resp + + class DelayedCachePopulator(CooperativeCachePopulator): + def __init__(self, app, infocache, memcache, + cache_key, cache_ttl, avg_backend_fetch_time, + backend_delay=0, + fetch_backend_failure=False, + num_tokens=3, labels=None): + super().__init__( + app, infocache, memcache, cache_key, cache_ttl, + avg_backend_fetch_time, num_tokens, labels=labels + ) + self._backend_delay = backend_delay + self._fetch_backend_failure = fetch_backend_failure + + def do_fetch_backend(self): + if self._backend_delay: + eventlet.sleep(self._backend_delay) + if self._fetch_backend_failure: + return None, FakeResponse(status_int=503) + else: + return "backend data", \ + TestCooperativeCachePopulator.backend_resp + + def test_populator_constructor(self): + obj = self.MockCachePopulator( + MockApp(self.logger, self.statsd), + self.infocache, self.memcache, + self.cache_key, self.cache_ttl, + self.avg_backend_fetch_time, + num_tokens=10, 
labels={ + 'resource': "test", + } + ) + self.assertEqual(obj._logger, self.logger) + self.assertEqual(obj._labels, { + 'resource': "test", + }) + self.assertEqual(obj._infocache, self.infocache) + self.assertEqual(obj._memcache, self.memcache) + self.assertEqual(obj._cache_key, self.cache_key) + self.assertEqual(obj._cache_ttl, self.cache_ttl) + self.assertEqual(obj._token_key, '_cache_token/%s' % self.cache_key) + self.assertEqual( + obj._avg_backend_fetch_time, self.avg_backend_fetch_time) + self.assertEqual(obj._token_ttl, self.avg_backend_fetch_time * 10) + self.assertEqual(obj._num_tokens, 10) + self.assertEqual(obj.cache_encoder(42), 42) + self.assertEqual(obj.cache_decoder(42), 42) + self.assertIsNone(obj.set_cache_state) + self.assertFalse(obj.token_acquired) + self.assertIsNone(obj.backend_resp) + + def test_populator_num_tokens_zero(self): + populator = self.MockCachePopulator( + MockApp(self.logger, self.statsd), + self.infocache, self.memcache, + self.cache_key, self.cache_ttl, + avg_backend_fetch_time=.1, + num_tokens=0, labels={ + 'resource': "test", + } + ) + data = populator.fetch_data() + self.assertEqual(data, "backend data") + self.assertEqual(populator.backend_resp, self.backend_resp) + self.assertEqual(populator.set_cache_state, "set") + self.assertFalse(populator.token_acquired) + self.assertEqual(self.memcache.get_calls, []) + self.assertEqual(self.infocache[self.cache_key], "backend data") + self.assertEqual(self.memcache.incr_calls, []) + self.assertEqual( + self.memcache.set_calls, + [(self.cache_key, "backend data", self.cache_ttl)] + ) + self.assertEqual(self.memcache.del_calls, []) + stats = self.statsd.get_labeled_stats_counts() + self.assertEqual({ + ('swift_coop_cache', frozenset(( + ('resource', 'test'), + ('event', 'backend_reqs'), + ('token', 'disabled'), + ('set_cache_state', 'set'), + ('status', 200)), + )): 1, + }, stats) + + def test_populator_fetch_data_called_twice(self): + populator = self.MockCachePopulator( + 
MockApp(self.logger, self.statsd), + self.infocache, self.memcache, + self.cache_key, self.cache_ttl, + self.avg_backend_fetch_time, + ) + data = populator.fetch_data() + self.assertEqual(data, "backend data") + self.assertTrue(populator._fetch_called) + self.assertRaises(RuntimeError, populator.fetch_data) + + def test_first_request_with_token(self): + # Test the first request will acquire the token, fetch data from + # the backend and set it into Memcached. + populator = self.MockCachePopulator( + MockApp(self.logger, self.statsd), + self.infocache, self.memcache, + self.cache_key, self.cache_ttl, + self.avg_backend_fetch_time, + labels={ + 'resource': "test", + } + ) + data = populator.fetch_data() + self.assertEqual(data, "backend data") + self.assertEqual(populator.backend_resp, self.backend_resp) + self.assertEqual(populator.set_cache_state, "set") + self.assertTrue(populator.token_acquired) + self.assertEqual(self.memcache.get_calls, []) + self.assertEqual(self.infocache[self.cache_key], "backend data") + self.assertEqual( + self.memcache.incr_calls, + [(self.token_key, 1, self.avg_backend_fetch_time * 10)] + ) + self.assertEqual( + self.memcache.set_calls, + [(self.cache_key, "backend data", self.cache_ttl)] + ) + self.assertEqual(self.memcache.del_calls, [self.token_key]) + stats = self.statsd.get_labeled_stats_counts() + self.assertEqual({ + ('swift_coop_cache', frozenset(( + ('resource', 'test'), + ('event', 'backend_reqs'), + ('token', 'with_token'), + ('set_cache_state', 'set'), + ('status', 200)), + )): 1, + }, stats) + + def test_first_request_with_token_with_encoder(self): + # Test the processing of first request with cache data encoder and + # decoder. 
+ populator = self.DataTransformCachePopulator( + MockApp(self.logger, self.statsd), + self.infocache, self.memcache, + self.cache_key, self.cache_ttl, + self.avg_backend_fetch_time, + labels={ + 'resource': "test", + } + ) + data = populator.fetch_data() + self.assertEqual(data, "backend data") + self.assertEqual(populator.backend_resp, self.backend_resp) + self.assertEqual(populator.set_cache_state, "set") + self.assertTrue(populator.token_acquired) + self.assertEqual(self.memcache.get_calls, []) + self.assertEqual(self.infocache[self.cache_key], "backend data") + self.assertEqual( + self.memcache.incr_calls, + [(self.token_key, 1, self.avg_backend_fetch_time * 10)] + ) + self.assertEqual( + self.memcache.set_calls, + [(self.cache_key, "BACKEND DATA", self.cache_ttl)] + ) + self.assertEqual(self.memcache.del_calls, [self.token_key]) + stats = self.statsd.get_labeled_stats_counts() + self.assertEqual({ + ('swift_coop_cache', frozenset(( + ('resource', 'test'), + ('event', 'backend_reqs'), + ('token', 'with_token'), + ('set_cache_state', 'set'), + ('status', 200)), + )): 1, + }, stats) + + def test_following_request_with_token(self): + # Test the following request (not the first one) which also acquires + # the token, fetch data from the backend and set it into Memcached. + # Simulate that there are 1 or 2 requests who have acquired the tokens. + prior_reqs_with_token = random.randint(1, 2) + total_requests = self.memcache.incr( + self.token_key, delta=prior_reqs_with_token, time=10) + self.assertEqual(total_requests, prior_reqs_with_token) + # Test the following request to get the token. 
+ self.memcache.incr_calls = [] + populator = self.MockCachePopulator( + MockApp(self.logger, self.statsd), + self.infocache, self.memcache, + self.cache_key, self.cache_ttl, + self.avg_backend_fetch_time, + labels={ + 'resource': "test", + } + ) + data = populator.fetch_data() + self.assertEqual(data, "backend data") + self.assertEqual(populator.backend_resp, self.backend_resp) + self.assertEqual(populator.set_cache_state, "set") + self.assertTrue(populator.token_acquired) + self.assertEqual(self.memcache.get_calls, []) + self.assertEqual(self.infocache[self.cache_key], "backend data") + self.assertEqual( + self.memcache.incr_calls, + [(self.token_key, 1, self.avg_backend_fetch_time * 10)] + ) + self.assertEqual( + self.memcache.set_calls, + [(self.cache_key, "backend data", self.cache_ttl)] + ) + self.assertEqual(self.memcache.del_calls, [self.token_key]) + stats = self.statsd.get_labeled_stats_counts() + self.assertEqual({ + ('swift_coop_cache', frozenset(( + ('resource', 'test'), + ('event', 'backend_reqs'), + ('token', 'with_token'), + ('set_cache_state', 'set'), + ('status', 200)), + )): 1, + }, stats) + + def test_fetch_data_cache_hit_without_token(self): + # Test the request which doesn't acquire the token, then keep sleeping + # and trying to fetch data from the Memcached until it succeeds. + + def test_fetch_data(avg_fetch_time, + total_miss_retries, + expected_sleep_calls): + self.logger.statsd_client.clear() + self.statsd.clear() + num_tokens_per_session = random.randint(1, 3) + retries = 0 + + class CustomizedCache(TestableMemcacheRing): + def get(self, key, raise_on_error=False): + nonlocal retries + retries += 1 + if retries <= total_miss_retries: + value = super().get( + "NOT_EXISTED_YET") + return value + else: + return super().get(key) + + self.memcache = CustomizedCache( + ['1.2.3.4:11211'], logger=self.logger) + # Simulate that there were 'num_tokens_per_session' requests who + # have acquired the tokens. 
+ total_requests = self.memcache.incr( + self.token_key, delta=num_tokens_per_session, time=10) + self.assertEqual(total_requests, num_tokens_per_session) + self.memcache.set(self.cache_key, [1, 2, 3]) + # Test the request without a token + self.memcache.incr_calls = [] + self.memcache.set_calls = [] + populator = self.MockCachePopulator( + MockApp(self.logger, self.statsd), + self.infocache, self.memcache, + self.cache_key, self.cache_ttl, + avg_backend_fetch_time=avg_fetch_time, + num_tokens=num_tokens_per_session, + labels={ + 'resource': "test", + } + ) + populator._token_ttl = 1 + with mock.patch.object(utils, 'sleep') as mock_sleep: + data = populator.fetch_data() + self.assertEqual(expected_sleep_calls, mock_sleep.call_args_list) + self.assertEqual(data, [1, 2, 3]) + self.assertIsNone(populator.backend_resp) + self.assertIsNone(populator.set_cache_state) + self.assertEqual(self.infocache, {}) + self.assertEqual( + self.memcache.incr_calls, + [(self.token_key, 1, populator._token_ttl)] + ) + self.assertEqual(self.memcache.set_calls, []) + self.assertEqual(retries, total_miss_retries + 1) + self.assertEqual( + self.memcache.get_calls, + ['NOT_EXISTED_YET'] * (retries - 1) + [self.cache_key] + ) + self.assertFalse(populator.token_acquired) + stats = self.statsd.get_labeled_stats_counts() + self.assertEqual({ + ('swift_coop_cache', frozenset(( + ('resource', 'test'), + ('event', 'cache_served'), + ('token', 'no_token'), + ('lack_retries', False)), + )): 1, + }, stats) + + test_fetch_data(1.0, 1, [mock.call(1.5), mock.call(3.0)]) + test_fetch_data( + 1.0, 2, [mock.call(1.5), mock.call(3.0), mock.call(6.0)]) + test_fetch_data(2.0, 1, [mock.call(3.0), mock.call(6.0)]) + test_fetch_data( + 2.0, 2, [mock.call(3.0), mock.call(6.0), mock.call(12.0)]) + + def test_fetch_data_cache_miss_without_token(self): + # Test the request which doesn't acquire the token, then keep sleeping + # and trying to fetch data from the Memcached, but enventually all + # retries exhausted 
with cache misses. + num_tokens_per_session = random.randint(1, 3) + retries = 0 + + class CustomizedCache(TestableMemcacheRing): + def get(self, key, raise_on_error=False): + nonlocal retries + retries += 1 + return super().get("NOT_EXISTED_YET") + + self.memcache = CustomizedCache(['1.2.3.4:11211'], logger=self.logger) + # Simulate that there are 'num_tokens_per_session' requests who have + # acquired the tokens. + total_requests = self.memcache.incr( + self.token_key, delta=num_tokens_per_session, time=10) + self.assertEqual(total_requests, num_tokens_per_session) + self.memcache.set(self.cache_key, "cached data") + # Test the request without a token + self.memcache.incr_calls = [] + self.memcache.set_calls = [] + populator = self.MockCachePopulator( + MockApp(self.logger, self.statsd), + self.infocache, self.memcache, + self.cache_key, self.cache_ttl, + self.avg_backend_fetch_time, + num_tokens_per_session, + labels={ + 'resource': "test", + } + ) + data = populator.fetch_data() + self.assertEqual(data, "backend data") + self.assertEqual(populator.backend_resp, self.backend_resp) + self.assertEqual(populator.set_cache_state, "set") + self.assertFalse(populator.token_acquired) + self.assertEqual(self.infocache[self.cache_key], "backend data") + self.assertEqual( + self.memcache.incr_calls, + [(self.token_key, 1, self.avg_backend_fetch_time * 10)] + ) + self.assertEqual( + self.memcache.set_calls, + [(self.cache_key, "backend data", self.cache_ttl)] + ) + self.assertEqual(retries, 3) + self.assertEqual( + self.memcache.get_calls, ['NOT_EXISTED_YET'] * retries) + self.assertEqual(self.memcache.del_calls, []) + stats = self.statsd.get_labeled_stats_counts() + self.assertEqual({ + ('swift_coop_cache', frozenset(( + ('resource', 'test'), + ('event', 'backend_reqs'), + ('token', 'no_token'), + ('lack_retries', False), + ('set_cache_state', 'set'), + ('status', 200)), + )): 1, + }, stats) + + def test_fetch_data_req_lacks_enough_retries(self): + # Test the request 
which doesn't acquire the token, then keep sleeping + # and trying to fetch data from the Memcached, but doesn't get enough + # retries. + retries = 0 + + class CustomizedCache(TestableMemcacheRing): + def get(self, key, raise_on_error=False): + nonlocal retries + retries += 1 + return super().get("NOT_EXISTED_YET") + + self.memcache = CustomizedCache(['1.2.3.4:11211'], logger=self.logger) + # Simulate that there are three requests who have acquired the tokens. + total_requests = self.memcache.incr(self.token_key, delta=3, time=10) + self.assertEqual(total_requests, 3) + self.memcache.set(self.cache_key, [1, 2, 3]) + # Test the request without a token + self.memcache.incr_calls = [] + self.memcache.set_calls = [] + populator = self.MockCachePopulator( + MockApp(self.logger, self.statsd), + self.infocache, self.memcache, + self.cache_key, self.cache_ttl, + self.avg_backend_fetch_time, + labels={ + 'resource': "test", + } + ) + with patch('time.time', ) as mock_time: + mock_time.side_effect = itertools.count(4000.99, 1.0) + data = populator.fetch_data() + self.assertEqual(data, "backend data") + self.assertEqual(populator.backend_resp, self.backend_resp) + self.assertEqual(populator.set_cache_state, "set") + self.assertFalse(populator.token_acquired) + self.assertEqual(self.infocache[self.cache_key], "backend data") + self.assertEqual( + self.memcache.incr_calls, + [(self.token_key, 1, self.avg_backend_fetch_time * 10)] + ) + self.assertEqual( + self.memcache.set_calls, + [(self.cache_key, "backend data", self.cache_ttl)] + ) + self.assertEqual(retries, 2) + self.assertEqual( + self.memcache.get_calls, ['NOT_EXISTED_YET'] * 2) + self.assertEqual(self.memcache.del_calls, []) + stats = self.statsd.get_labeled_stats_counts() + self.assertEqual({ + ('swift_coop_cache', frozenset(( + ('resource', 'test'), + ('event', 'backend_reqs'), + ('token', 'no_token'), + ('lack_retries', True), + ('set_cache_state', 'set'), + ('status', 200)), + )): 1, + }, stats) + + def 
test_get_token_connection_error(self): + # Test the request which couldn't acquire the token due to memcached + # connection error. + self.memcache = TestableMemcacheRing( + ['1.2.3.4:11211'], logger=self.logger, inject_incr_error=True) + populator = self.MockCachePopulator( + MockApp(self.logger, self.statsd), + self.infocache, self.memcache, + self.cache_key, self.cache_ttl, + self.avg_backend_fetch_time, + labels={ + 'resource': "test", + } + ) + data = populator.fetch_data() + self.assertEqual(data, "backend data") + self.assertEqual(populator.backend_resp, self.backend_resp) + self.assertEqual(populator.set_cache_state, "set") + self.assertFalse(populator.token_acquired) + self.assertEqual(self.infocache[self.cache_key], "backend data") + self.assertEqual( + self.memcache.incr_calls, + [(self.token_key, 1, self.avg_backend_fetch_time * 10)] + ) + self.assertEqual( + self.memcache.set_calls, + [(self.cache_key, "backend data", self.cache_ttl)] + ) + self.assertEqual(self.memcache.del_calls, []) + stats = self.statsd.get_labeled_stats_counts() + self.assertEqual({ + ('swift_coop_cache', frozenset(( + ('resource', 'test'), + ('event', 'backend_reqs'), + ('token', 'error'), + ('set_cache_state', 'set'), + ('status', 200)), + )): 1, + }, stats) + + def test_set_data_connection_error(self): + # Test the request which is able to acquire the token, but fails to set + # the fetched backend data into memcached due to memcached connection + # error. 
+ self.memcache = TestableMemcacheRing( + ['1.2.3.4:11211'], logger=self.logger, inject_set_error=True) + populator = self.MockCachePopulator( + MockApp(self.logger, self.statsd), + self.infocache, self.memcache, + self.cache_key, self.cache_ttl, + self.avg_backend_fetch_time, + labels={ + 'resource': "test", + } + ) + data = populator.fetch_data() + self.assertEqual(data, "backend data") + self.assertEqual(populator.backend_resp, self.backend_resp) + self.assertEqual(populator.set_cache_state, "set_error") + self.assertTrue(populator.token_acquired) + self.assertEqual(self.infocache[self.cache_key], "backend data") + self.assertEqual( + self.memcache.incr_calls, + [(self.token_key, 1, self.avg_backend_fetch_time * 10)] + ) + self.assertEqual( + self.memcache.set_calls, + [(self.cache_key, "backend data", self.cache_ttl)] + ) + self.assertEqual(self.memcache.del_calls, []) + stats = self.statsd.get_labeled_stats_counts() + self.assertEqual({ + ('swift_coop_cache', frozenset(( + ('resource', 'test'), + ('event', 'backend_reqs'), + ('token', 'with_token'), + ('set_cache_state', 'set_error'), + ('status', 200)), + )): 1, + }, stats) + + def test_connection_errors_on_gettoken_and_dataset(self): + # Test the request which couldn't acquire the token due to a memcached + # connection error, and then couldn't set the backend data into the + # cache due to a memcached connection error either. 
+ self.memcache = TestableMemcacheRing( + ['1.2.3.4:11211'], logger=self.logger, + inject_incr_error=True, inject_set_error=True) + populator = self.MockCachePopulator( + MockApp(self.logger, self.statsd), + self.infocache, self.memcache, + self.cache_key, self.cache_ttl, + self.avg_backend_fetch_time, + labels={ + 'resource': "test", + } + ) + data = populator.fetch_data() + self.assertEqual(data, "backend data") + self.assertEqual(populator.backend_resp, self.backend_resp) + self.assertEqual(populator.set_cache_state, "set_error") + self.assertFalse(populator.token_acquired) + self.assertEqual(self.infocache[self.cache_key], "backend data") + self.assertEqual( + self.memcache.incr_calls, + [(self.token_key, 1, self.avg_backend_fetch_time * 10)] + ) + self.assertEqual( + self.memcache.set_calls, + [(self.cache_key, "backend data", self.cache_ttl)] + ) + self.assertEqual(self.memcache.del_calls, []) + stats = self.statsd.get_labeled_stats_counts() + self.assertEqual({ + ('swift_coop_cache', frozenset(( + ('resource', 'test'), + ('event', 'backend_reqs'), + ('token', 'error'), + ('set_cache_state', 'set_error'), + ('status', 200)), + )): 1, + }, stats) + + def test_fetch_data_from_cache_connection_error(self): + # Test the request which doesn't acquire the token, then keeps sleeping + # and retrying to fetch data from memcached, but eventually exhausts + # all retries because of memcached connection errors. + self.memcache = TestableMemcacheRing( + ['1.2.3.4:11211'], logger=self.logger, inject_get_error=True) + # Simulate that three other requests have already acquired the tokens. 
+ total_requests = self.memcache.incr(self.token_key, delta=3, time=10) + self.assertEqual(total_requests, 3) + self.memcache.set(self.cache_key, [1, 2, 3]) + # Test the request without a token; reset the recorded calls so that + # only this request's memcache operations are asserted below. + self.memcache.incr_calls = [] + self.memcache.set_calls = [] + populator = self.MockCachePopulator( + MockApp(self.logger, self.statsd), + self.infocache, self.memcache, + self.cache_key, self.cache_ttl, + self.avg_backend_fetch_time, + labels={ + 'resource': "test", + } + ) + data = populator.fetch_data() + self.assertEqual(data, "backend data") + self.assertEqual(populator.backend_resp, self.backend_resp) + self.assertEqual(populator.set_cache_state, "set") + self.assertFalse(populator.token_acquired) + self.assertEqual(self.infocache[self.cache_key], "backend data") + self.assertEqual( + self.memcache.incr_calls, + [(self.token_key, 1, self.avg_backend_fetch_time * 10)] + ) + self.assertEqual( + self.memcache.set_calls, + [(self.cache_key, "backend data", self.cache_ttl)] + ) + # The first cache lookup was made against the data cache key. + self.assertEqual( + self.memcache.get_calls[0], self.cache_key) + self.assertEqual(self.memcache.del_calls, []) + stats = self.statsd.get_labeled_stats_counts() + self.assertEqual({ + ('swift_coop_cache', frozenset(( + ('resource', 'test'), + ('event', 'backend_reqs'), + ('token', 'no_token'), + ('lack_retries', False), + ('set_cache_state', 'set'), + ('status', 200)), + )): 1, + }, stats) + + def test_concurrent_requests(self): + # Simulate multiple concurrent greenthreads, each of which issues a + # "fetch_data" request cooperatively. + self.avg_backend_fetch_time = 0.01 + num_processes = 100 + exceptions = [] + + def worker_process(): + # Initialize a new populator instance in each greenthread. 
+ populator = self.DelayedCachePopulator( + MockApp(self.logger, self.statsd), + {}, self.memcache, + self.cache_key, self.cache_ttl, + self.avg_backend_fetch_time, + backend_delay=self.avg_backend_fetch_time, + labels={ + 'resource': "test", + } + ) + data = populator.fetch_data() + + try: + # Data is retrieved successfully by every request. + self.assertEqual(data, "backend data") + if populator.set_cache_state == 'set': + # Requests which set the cache held a token. + self.assertTrue(populator.token_acquired) + self.assertEqual( + populator._infocache[self.cache_key], "backend data") + self.assertEqual(populator.backend_resp, self.backend_resp) + else: + # Served from memcache without a backend request. + self.assertEqual(populator._infocache, {}) + self.assertIsNone(populator.backend_resp) + except Exception as e: + exceptions.append(e) + + # Issue those parallel requests "at the same time". + pool = eventlet.GreenPool() + for i in range(num_processes): + pool.spawn(worker_process) + + # Wait for all requests to complete + pool.waitall() + # Surface the assertion failures collected in the greenthreads. + if exceptions: + self.fail(f"Greenthread assertions failed: {exceptions}") + stats = self.statsd.get_labeled_stats_counts() + # Only 3 requests got a token and hit the backend; the other 97 were + # served from memcache. + self.assertEqual({ + ('swift_coop_cache', frozenset(( + ('resource', 'test'), + ('event', 'backend_reqs'), + ('token', 'with_token'), + ('set_cache_state', 'set'), + ('status', 200)), + )): 3, + ('swift_coop_cache', frozenset(( + ('resource', 'test'), + ('event', 'cache_served'), + ('token', 'no_token'), + ('lack_retries', False)), + )): 97 + }, stats) + self.assertEqual( + self.memcache.incr_calls, + [(self.token_key, 1, self.avg_backend_fetch_time * 10)] * 100 + ) + self.assertEqual( + self.memcache.set_calls, + [(self.cache_key, "backend data", self.cache_ttl)] * 3 + ) + # Each of the 3 token holders released the token by deleting its key. + self.assertEqual(self.memcache.del_calls, [self.token_key] * 3) + + def test_concurrent_requests_all_token_requests_fail(self): + # Simulate multiple concurrent threads issued into a cooperative token + # session, each thread will issue a "fetch_data" request cooperatively. 
+ # The first three requests will acquire the token, but fail to get + # data from the backend. This test also demonstrates that even though + # all token requests fail to go through, other requests which arrive at + # a late stage of the same token session and won't get a token can + # still be served out of memcached. + self.avg_backend_fetch_time = 0.1 + counts = { + 'num_backend_success': 0, + 'num_requests_served_from_cache': 0, + 'num_backend_failures': 0, + } + exceptions = [] + + def worker_process(exec_delay, backend_delay, + fetch_backend_failure=False): + # Initialize a new populator instance in each greenthread. + populator = self.DelayedCachePopulator( + MockApp(self.logger, self.statsd), + {}, self.memcache, + self.cache_key, self.cache_ttl, + self.avg_backend_fetch_time, + backend_delay, + fetch_backend_failure, + labels={ + 'resource': "test", + } + ) + # Stagger the arrival of this request, if asked to. + if exec_delay: + eventlet.sleep(exec_delay) + data = populator.fetch_data() + + try: + if fetch_backend_failure: + self.assertIsNone(data) + else: + self.assertEqual(data, "backend data") + # Classify the outcome of this request for the final counts. + if populator.set_cache_state == 'set': + counts['num_backend_success'] += 1 + self.assertEqual( + populator._infocache[self.cache_key], "backend data") + self.assertEqual(populator.backend_resp, self.backend_resp) + elif not fetch_backend_failure: + counts['num_requests_served_from_cache'] += 1 + self.assertEqual(populator._infocache, {}) + self.assertIsNone(populator.backend_resp) + else: + counts['num_backend_failures'] += 1 + except Exception as e: + exceptions.append(e) + + # Issue those parallel requests at different times within this + # cooperative token session. + pool = eventlet.GreenPool() + # The first three requests will get the tokens but fail. + for i in range(3): + pool.spawn( + worker_process, 0, self.avg_backend_fetch_time * 15, True) + # The 4th request won't get a token, but after it exits its waiting + # cycles, it will fetch the data from the backend and set the data into + # memcached. 
+ pool.spawn(worker_process, 0, 0) + # The remaining 16 requests won't get a token, but will be served out + # of memcached because the 4th request will finish within their + # waiting cycles. + for i in range(16): + pool.spawn( + worker_process, + random.uniform(self.avg_backend_fetch_time * 3, + self.avg_backend_fetch_time * 6), + self.avg_backend_fetch_time + ) + + # Wait for all requests to complete + pool.waitall() + if exceptions: + self.fail(f"Greenthread assertions failed: {exceptions}") + # The first three requests of the first token session failed to get + # data from the backend. + self.assertEqual(counts['num_backend_failures'], 3) + self.assertEqual(counts['num_backend_success'], 1) + self.assertEqual(counts['num_requests_served_from_cache'], 16) + # All 20 requests asked for a token, but no token was ever released. + self.assertEqual(len(self.memcache.incr_calls), 20) + self.assertEqual(len(self.memcache.del_calls), 0) + + def test_concurrent_requests_pass_token_ttl(self): + # Simulate multiple concurrent threads across two cooperative token + # sessions, each thread will issue a "fetch_data" request + # cooperatively. The very first three requests will acquire the + # tokens, but fail to fetch data from the backend. + self.avg_backend_fetch_time = 0.1 + counts = { + 'num_backend_success': 0, + 'num_requests_served_from_cache': 0, + 'num_backend_failures': 0, + } + exceptions = [] + + def worker_process(exec_delay, backend_delay, + fetch_backend_failure=False): + # Initialize a new populator instance in each greenthread. 
+ populator = self.DelayedCachePopulator( + MockApp(self.logger, self.statsd), + {}, self.memcache, + self.cache_key, self.cache_ttl, + self.avg_backend_fetch_time, + backend_delay, + fetch_backend_failure, + labels={ + 'resource': "test", + } + ) + if exec_delay: + eventlet.sleep(exec_delay) + data = populator.fetch_data() + + try: + if fetch_backend_failure: + self.assertIsNone(data) + else: + self.assertEqual(data, "backend data") + if populator.set_cache_state == 'set': + counts['num_backend_success'] += 1 + self.assertEqual( + populator._infocache[self.cache_key], "backend data") + self.assertEqual(populator.backend_resp, self.backend_resp) + elif not fetch_backend_failure: + counts['num_requests_served_from_cache'] += 1 + self.assertEqual(populator._infocache, {}) + self.assertIsNone(populator.backend_resp) + else: + counts['num_backend_failures'] += 1 + except Exception as e: + exceptions.append(e) + + # Issue the parallel requests for the first token session: three + # failing requests at once, then 17 more spread over 10x the average + # backend fetch time (the documented token_ttl). + pool = eventlet.GreenPool() + for i in range(3): + pool.spawn( + worker_process, 0, self.avg_backend_fetch_time * 15, True) + for i in range(17): + pool.spawn( + worker_process, + random.uniform(0, self.avg_backend_fetch_time * 10), + self.avg_backend_fetch_time + ) + + # Issue the parallel requests for the second token session; they are + # all delayed by at least 10x the average backend fetch time. + for i in range(3): + pool.spawn( + worker_process, + self.avg_backend_fetch_time * 10, + self.avg_backend_fetch_time * 5 + ) + for i in range(17): + pool.spawn( + worker_process, + random.uniform(self.avg_backend_fetch_time * 10, + self.avg_backend_fetch_time * 11), + self.avg_backend_fetch_time + ) + # Wait for all requests to complete + pool.waitall() + if exceptions: + self.fail(f"Greenthread assertions failed: {exceptions}") + # The first three requests of the first token session failed to get + # data from the backend. 
+ self.assertEqual(counts['num_backend_failures'], 3) + self.assertGreaterEqual(counts['num_backend_success'], 3) + self.assertEqual( + counts['num_requests_served_from_cache'], + 40 - counts['num_backend_failures'] - + counts['num_backend_success'] + ) + self.assertEqual(len(self.memcache.incr_calls), 40) + # The first three requests of the second token session will delete the + # token after fetching data from the backend and setting it in cache. + self.assertEqual(len(self.memcache.del_calls), 3) + + class TestUnlinkOlder(unittest.TestCase): def setUp(self): @@ -4642,8 +5536,9 @@ This is the body class FakeResponse(object): - def __init__(self, status, headers, body): - self.status = status + def __init__(self, status_int=200, headers=None, body=b''): + self.status_int = status_int + self.status = status_int self.headers = HeaderKeyDict(headers) self.body = BytesIO(body)