From 707a65ab3c2150fad093a904c3a3d099d74fa236 Mon Sep 17 00:00:00 2001 From: Jianjian Huo Date: Mon, 18 Mar 2024 21:05:20 -0700 Subject: [PATCH] common: add memcached based cooperative token mechanism. Memcached based cooperative token is a improved version of ghetto lock, see the description of ghetto lock at here: https://github.com/memcached/memcached/wiki/ProgrammingTricks It's used to avoid the thundering herd situation which many caching users face: given a cache item that is popular and difficult to recreate, in the event of cache misses, users could end up with hundreds (or thousands) of processes slamming the backend database at the same time in an attempt to refill the same cache content. This thundering herd problem not only often leads to unresponsive backend; and also those writes into memcached cause premature cache eviction under memory pressure. With cooperative token, when lots of in-flight callers try to get the cached item specified by key from memcache and get cache misses, only the first few query requests (limited by by ``num_tokens``) will be able get the cooperative tokens by creating or incrementing an internal memcache key, and then those callers with tokens can send backend requests to fetch data from backend servers and be able to set data into memcache; all other cache miss requests without a token should wait for cache filling to finish, instead of all querying the backend servers at the same time. Co-Authored-By: Tim Burke Co-Authored-By: Clay Gerrard Co-Authored-By: Yan Xiao Co-Authored-By: Alistair Coles Signed-off-by: Jianjian Huo Change-Id: I50ff92441c2f2c49b3034644aba59930e8a99589 --- swift/common/utils/__init__.py | 249 ++++++++- test/debug_logger.py | 8 + test/unit/__init__.py | 70 +++ test/unit/common/test_utils.py | 905 ++++++++++++++++++++++++++++++++- 4 files changed, 1226 insertions(+), 6 deletions(-) diff --git a/swift/common/utils/__init__.py b/swift/common/utils/__init__.py index aa32548ad2..22a09c4499 100644 --- a/swift/common/utils/__init__.py +++ b/swift/common/utils/__init__.py @@ -100,7 +100,7 @@ from swift.common.utils.logs import ( # noqa LOG_LINE_DEFAULT_FORMAT, NOTICE, ) -from swift.common.utils.config import ( # noqa +from swift.common.utils.config import ( # noqa TRUE_VALUES, NicerInterpolation, config_true_value, @@ -1493,6 +1493,253 @@ def load_multikey_opts(conf, prefix, allow_none_key=False): return sorted(result) +class CooperativeCachePopulator(object): + """ + A cooperative token is used to avoid the thundering herd problem when + caching is used in front of slow backend(s). Here is it how it works: + + * When lots of in-flight requests try to get the cached item specified by + a key from memcache and get cache misses, only the first few (limited by + ``num_tokens``) of query requests will be able to get a cooperative + token by creating or incrementing an internal memcache key. + * Those callers with tokens can send backend requests to fetch data from + backend servers and set data into memcache. + * All other cache miss requests without a token should wait for cache + filling to finish, instead of all querying the backend servers at the + same time. + * After those requests with a token are done, they will release the token + by deleting the internal cache key, and finish this usage session. As + such, one token usage session starts when the first request gets a token + after cache misses and ends when all requests with a token are done. + + Multiple tokens are available in each usage session in order to increase + fault tolerance in the distributed environment. When one request with a + token hangs or exits, any other request with a token can still set new + fetched data into memcache and finish the current usage session. The + CooperativeCachePopulator class uses ``num_tokens`` to define the maximum + number of tokens during each usage session. The default is 3. + + In the rare case of all 3 requests with tokens failing, the existing usage + session ends after a ``token_ttl`` period is reached and the token key + expires. If this happens then other pending requests which have no token + will exit waiting in the same order as entering the token session, and fall + back to querying the backend and setting data in memcache. After any of + those requests without a token successfully sets data in the memcache, the + following requests in waiting can fetch that data from memcache as they + continue retrying. The ``token_ttl`` is designed to be a back-off time + window for retrying the backend; it will work for the same purpose in this + case as well. When one token session ends after ``token_ttl``, requests + which see a cache miss will start a new cooperative token session. + + :param app: the application instance containing app.logger, app.statsd + :param infocache: the infocache instance. + :param memcache: the memcache instance, must be a valid MemcacheRing. + :param cache_key: the cache key. + :param cache_ttl: time-to-live of the data fetched from backend to set into + memcached. + :param avg_backend_fetch_time: The average time in seconds expected for the + backend fetch operation ``do_fetch_backend`` to complete. This duration + serves as a base unit for calculating exponential backoff delays when + awaiting cache population by other requests, and for determining the + cooperative token's time-to-live (``token_ttl``) which is set to 10x + this value. Should be greater than 0. + :param num_tokens: the minimum limit of tokens per each usage session, + also the minimum limit of in-flight requests allowed to fetch data + from backend. The default is 3, which gives redundancy when any request + with token fails to fetch data from the backend or fails to set new + data into memcached; 0 means no cooperative token is used. + :param labels: the default labels for emitting labeled metrics, for example + resource or operation type, account, container, etc. + """ + + def __init__(self, app, infocache, memcache, + cache_key, cache_ttl, avg_backend_fetch_time, num_tokens=3, + labels=None): + self._logger = app.logger + self._statsd = app.statsd + self._labels = labels or {} + self._infocache = infocache + self._memcache = memcache + self._cache_key = cache_key + self._cache_ttl = cache_ttl + self._token_key = '_cache_token/%s' % cache_key + self._avg_backend_fetch_time = avg_backend_fetch_time + # Time-to-live of the cooperative token when set in memcached, this + # defines the typical worse time that a token request would need to + # fetch the data from the backend when it's busy, default to be 10 + # times of the average time spent on ``do_fetch_backend`` which is + # the ``avg_backend_fetch_time``. + self._token_ttl = avg_backend_fetch_time * 10 + self._num_tokens = num_tokens + # The status of cache operation which sets backend data into Memcached. + self.set_cache_state = None + # Indicates if this request has acquired one token. + self.token_acquired = False + # Indicates if the request has no token and doesn't get enough retries. + self.lack_retries = False + # The HttpResponse object returned by ``do_fetch_backend`` if called. + self.backend_resp = None + # Track if fetch_data has been called to enforce one-shot usage + self._fetch_called = False + + def do_fetch_backend(self): + """ + To fetch data from the backend, needs to be implemented by sub-class. + + :returns: a tuple of (data, response). + """ + raise NotImplementedError + + def cache_encoder(self, data): + """ + To encode data to be stored in Memcached, default to return the data + as is. + + :returns: encoded data. + """ + return data + + def cache_decoder(self, data): + """ + To decode data from Memcached, default to return the data as is. + + :returns: decoded data. + """ + return data + + def _query_backend_and_set_cache(self): + """ + Fetch data from the backend and set the value in the Memcached. + + :returns: value of the data fetched from backend; None if not exist. + """ + data, self.backend_resp = self.do_fetch_backend() + if not data: + return None + + if self._infocache is not None: + self._infocache[self._cache_key] = data + try: + encoded_data = self.cache_encoder(data) + self._memcache.set( + self._cache_key, encoded_data, + time=self._cache_ttl, raise_on_error=True) + except swift.common.exceptions.MemcacheConnectionError: + self.set_cache_state = 'set_error' + else: + self.set_cache_state = 'set' + return data + + def _sleep_and_retry_memcache(self): + """ + Wait for cache value to be set by other requests with intermittent and + limited number of sleeps. With ``token_ttl`` set as 10 times of + ``avg_backend_fetch_time`` and the exponential backoff doubling the + retry interval after each retry, this function will normally sleep and + retry 3 times maximum. + The first retry is 1.5 times of the ``avg_backend_fetch_time``, the + second is 3 times, and the third is 6 times of it, so total is 10.5 + times of the ``avg_backend_fetch_time``. This roughly equals to the + ``token_ttl`` which is 10 times of the ``avg_backend_fetch_time``. + + :returns: value of the data fetched from Memcached; None if not exist. + """ + cur_time = time.time() + cutoff_time = cur_time + self._token_ttl + retry_interval = self._avg_backend_fetch_time * 1.5 + num_waits = 0 + while cur_time < cutoff_time or num_waits < 3: + if cur_time < cutoff_time: + sleep(retry_interval) + num_waits += 1 + else: + # Request has no token and doesn't get enough retries. + self.lack_retries = True + + # To have one last check, when eventlet scheduling didn't give + # this greenthread enough cpu cycles and it didn't have enough + # times of retries. + num_waits = 3 + cache_data = self._memcache.get( + self._cache_key, raise_on_error=False) + if cache_data: + # cache hit. + decoded_data = self.cache_decoder(cache_data) + return decoded_data + # cache miss, retry again with exponential backoff + retry_interval *= 2 + cur_time = time.time() + return None + + def _fetch_data(self): + total_requests = 0 + try: + total_requests = self._memcache.incr( + self._token_key, time=self._token_ttl) + except swift.common.exceptions.MemcacheConnectionError: + self._labels['token'] = 'error' # nosec bandit B105 + + if not total_requests: + # Couldn't connect to the memcache to increment the token key + data = self._query_backend_and_set_cache() + elif total_requests <= self._num_tokens: + # Acquired a cooperative token, go fetching data from backend and + # set the data in memcache. + self.token_acquired = True + data = self._query_backend_and_set_cache() + + if self.set_cache_state == 'set': + # Since the successful finish of one whole cooperative token + # session only depends on a single successful request. So when + # any request with a token finishes both backend fetching and + # memcache set successful, it can remove all cooperative tokens + # of this token session. + self._memcache.delete(self._token_key) + else: + # No token acquired, it means that there are requests in-flight + # which will fetch data form the backend servers and update them in + # cache, let's wait for them to finish with limited retries. + data = self._sleep_and_retry_memcache() + self._labels['lack_retries'] = self.lack_retries + if data is None: + # Still no cache data fetched. + data = self._query_backend_and_set_cache() + return data + + def fetch_data(self): + """ + Coalescing all requests which are asking for the same data from the + backend into a few with cooperative token. + + :returns: value of the data fetched from backend or memcache; None if + not exist. + """ + if self._fetch_called: + raise RuntimeError("fetch_data() can only be called once per " + "CooperativeCachePopulator instance") + self._fetch_called = True + + if not self._num_tokens: + # Cooperative token disabled, fetch from backend. + data = self._query_backend_and_set_cache() + self._labels['token'] = 'disabled' # nosec bandit B105 + else: + data = self._fetch_data() + if 'token' not in self._labels: + self._labels['token'] = 'with_token' if self.token_acquired \ + else 'no_token' + + if self.backend_resp: + self._labels['event'] = 'backend_reqs' + self._labels['status'] = self.backend_resp.status_int + else: + self._labels['event'] = 'cache_served' + if self.set_cache_state: + self._labels['set_cache_state'] = self.set_cache_state + self._statsd.increment('swift_coop_cache', labels=self._labels) + return data + + def write_pickle(obj, dest, tmp=None, pickle_protocol=0): """ Ensure that a pickle file gets written to disk. The file diff --git a/test/debug_logger.py b/test/debug_logger.py index 44f377fab8..a632cfc36c 100644 --- a/test/debug_logger.py +++ b/test/debug_logger.py @@ -76,6 +76,7 @@ class BaseFakeStatsdClient: self.calls = defaultdict(list) self.recording_socket = RecordingSocket() self.counters = defaultdict(int) + self.labeled_stats_counters = defaultdict(int) @property def sendto_calls(self): @@ -94,8 +95,15 @@ class BaseFakeStatsdClient: Hook into base class primitive to track all "counter" metrics """ self.counters[metric] += value + labels = kwargs.get('labels', None) + if labels is not None: + hashable_labels = frozenset(labels.items()) + self.labeled_stats_counters[(metric, hashable_labels)] += value return super()._update_stats(metric, value, *args, **kwargs) + def get_labeled_stats_counts(self): + return self.labeled_stats_counters + # getter for backwards compat def get_stats_counts(self): return self.counters diff --git a/test/unit/__init__.py b/test/unit/__init__.py index d5cf333a7a..afa8388b22 100644 --- a/test/unit/__init__.py +++ b/test/unit/__init__.py @@ -41,6 +41,7 @@ from io import BytesIO from uuid import uuid4 from http.client import HTTPException +from swift.common import memcached from swift.common import storage_policy, swob, utils, exceptions from swift.common.memcached import MemcacheConnectionError from swift.common.storage_policy import (StoragePolicy, ECStoragePolicy, @@ -48,6 +49,8 @@ from swift.common.storage_policy import (StoragePolicy, ECStoragePolicy, from swift.common.utils import Timestamp, md5, close_if_possible, checksum from test import get_config from test.debug_logger import FakeLogger +from test.unit.common.test_memcached import MockedMemcachePool, \ + MockMemcached from swift.common.header_key_dict import HeaderKeyDict from swift.common.ring import Ring, RingData, RingBuilder from swift.obj import server @@ -395,6 +398,14 @@ def track(f): class FakeMemcache(object): + """ + Simple in-memory test helper for basic memcache GET/SET operations. + + This class provides a lightweight mock that stores data in a Python dict. + It does not implement TTL expiration or the full memcache protocol, and + bypasses MemcacheRing's internal handling (e.g., atomic incr operations). + Use this for simple tests that only need basic key-value storage. + """ def __init__(self, error_on_set=None, error_on_get=None): self.store = {} @@ -468,6 +479,65 @@ class FakeMemcache(object): del track +class TestableMemcacheRing(memcached.MemcacheRing): + """ + Real MemcacheRing with injectable errors for testing concurrent scenarios. + + This class wraps the actual MemcacheRing implementation while allowing + controlled injection of connection errors. It preserves MemcacheRing's + internal behavior including atomic operations and protocol compliance. + Use this for testing components that rely on memcache features like + atomic incr/decr or when testing concurrent access patterns. + """ + + def __init__(self, servers, inject_incr_error=None, inject_set_error=None, + inject_get_error=None, inject_del_error=None, **kwargs): + self.inject_incr_error = inject_incr_error + self.inject_set_error = inject_set_error + self.inject_get_error = inject_get_error + self.inject_del_error = inject_del_error + super().__init__(servers, **kwargs) + mock_cache = MockMemcached() + self._client_cache['1.2.3.4:11211'] = MockedMemcachePool( + [(mock_cache, mock_cache)] * 2) + self.set_calls = [] + self.incr_calls = [] + self.get_calls = [] + self.del_calls = [] + + def incr(self, key, delta=1, time=0): + self.incr_calls.append((key, delta, time)) + if self.inject_incr_error: + raise MemcacheConnectionError + return super().incr(key, delta, time) + + def set(self, key, value, serialize=True, time=0, + min_compress_len=0, raise_on_error=False): + self.set_calls.append((key, value, time)) + if self.inject_set_error: + if raise_on_error: + raise MemcacheConnectionError + else: + return None + super().set( + key, value, serialize, time, min_compress_len, raise_on_error) + + def get(self, key, raise_on_error=False): + self.get_calls.append(key) + if self.inject_get_error: + if raise_on_error: + raise MemcacheConnectionError + else: + return None + return super().get(key, raise_on_error) + + def delete(self, key, server_key=None): + self.del_calls.append(key) + if self.inject_del_error: + raise MemcacheConnectionError + super().delete(key, server_key) + + class FakeIterable(object): def __init__(self, values): self.next_call_count = 0 diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py index cb5bdfbcf7..af64c60236 100644 --- a/test/unit/common/test_utils.py +++ b/test/unit/common/test_utils.py @@ -21,9 +21,10 @@ import io import itertools from swift.common.statsd_client import StatsdClient -from test.debug_logger import debug_logger, FakeStatsdClient +from test.debug_logger import debug_logger, FakeStatsdClient, \ + debug_labeled_statsd_client from test.unit import temptree, make_timestamp_iter, with_tempdir, \ - mock_timestamp_now, FakeIterable + mock_timestamp_now, TestableMemcacheRing, FakeIterable import contextlib import errno @@ -64,7 +65,8 @@ from uuid import uuid4 from swift.common.exceptions import Timeout, LockTimeout, \ ReplicationLockTimeout, MimeInvalid from swift.common import utils -from swift.common.utils import set_swift_dir, md5, ShardRangeList +from swift.common.utils import set_swift_dir, md5, ShardRangeList, \ + CooperativeCachePopulator from swift.common.container_sync_realms import ContainerSyncRealms from swift.common.header_key_dict import HeaderKeyDict from swift.common.storage_policy import POLICIES, reload_storage_policies @@ -122,6 +124,13 @@ class MockOs(object): return getattr(os, name) +class MockApp(object): + + def __init__(self, logger, statsd): + self.logger = logger + self.statsd = statsd + + class MockSys(object): def __init__(self): @@ -2956,6 +2965,891 @@ cluster_dfw1 = http://dfw1.host/v1/ self.assertEqual(os.getppid(), utils.get_ppid(os.getpid())) +class TestCooperativeCachePopulator(unittest.TestCase): + backend_resp = Response(status=200, body=b'response') + + def setUp(self): + self.logger = debug_logger() + conf = { + 'log_statsd_host': 'host', + 'log_statsd_port': 8125, + 'statsd_label_mode': 'dogstatsd', + 'statsd_emit_legacy': True, + } + self.statsd = debug_labeled_statsd_client(conf) + self.memcache = TestableMemcacheRing( + ['1.2.3.4:11211'], logger=self.logger) + self.infocache = {} + self.cache_key = "test_key" + self.token_key = "_cache_token/%s" % self.cache_key + self.cache_ttl = 60 + self.avg_backend_fetch_time = 0.001 + + class MockCachePopulator(CooperativeCachePopulator): + def do_fetch_backend(self): + return "backend data", TestCooperativeCachePopulator.backend_resp + + class DataTransformCachePopulator(CooperativeCachePopulator): + def cache_encoder(self, data): + return data.upper() + + def cache_decoder(self, data): + return data.lower() + + def do_fetch_backend(self): + return "backend data", TestCooperativeCachePopulator.backend_resp + + class DelayedCachePopulator(CooperativeCachePopulator): + def __init__(self, app, infocache, memcache, + cache_key, cache_ttl, avg_backend_fetch_time, + backend_delay=0, + fetch_backend_failure=False, + num_tokens=3, labels=None): + super().__init__( + app, infocache, memcache, cache_key, cache_ttl, + avg_backend_fetch_time, num_tokens, labels=labels + ) + self._backend_delay = backend_delay + self._fetch_backend_failure = fetch_backend_failure + + def do_fetch_backend(self): + if self._backend_delay: + eventlet.sleep(self._backend_delay) + if self._fetch_backend_failure: + return None, FakeResponse(status_int=503) + else: + return "backend data", \ + TestCooperativeCachePopulator.backend_resp + + def test_populator_constructor(self): + obj = self.MockCachePopulator( + MockApp(self.logger, self.statsd), + self.infocache, self.memcache, + self.cache_key, self.cache_ttl, + self.avg_backend_fetch_time, + num_tokens=10, labels={ + 'resource': "test", + } + ) + self.assertEqual(obj._logger, self.logger) + self.assertEqual(obj._labels, { + 'resource': "test", + }) + self.assertEqual(obj._infocache, self.infocache) + self.assertEqual(obj._memcache, self.memcache) + self.assertEqual(obj._cache_key, self.cache_key) + self.assertEqual(obj._cache_ttl, self.cache_ttl) + self.assertEqual(obj._token_key, '_cache_token/%s' % self.cache_key) + self.assertEqual( + obj._avg_backend_fetch_time, self.avg_backend_fetch_time) + self.assertEqual(obj._token_ttl, self.avg_backend_fetch_time * 10) + self.assertEqual(obj._num_tokens, 10) + self.assertEqual(obj.cache_encoder(42), 42) + self.assertEqual(obj.cache_decoder(42), 42) + self.assertIsNone(obj.set_cache_state) + self.assertFalse(obj.token_acquired) + self.assertIsNone(obj.backend_resp) + + def test_populator_num_tokens_zero(self): + populator = self.MockCachePopulator( + MockApp(self.logger, self.statsd), + self.infocache, self.memcache, + self.cache_key, self.cache_ttl, + avg_backend_fetch_time=.1, + num_tokens=0, labels={ + 'resource': "test", + } + ) + data = populator.fetch_data() + self.assertEqual(data, "backend data") + self.assertEqual(populator.backend_resp, self.backend_resp) + self.assertEqual(populator.set_cache_state, "set") + self.assertFalse(populator.token_acquired) + self.assertEqual(self.memcache.get_calls, []) + self.assertEqual(self.infocache[self.cache_key], "backend data") + self.assertEqual(self.memcache.incr_calls, []) + self.assertEqual( + self.memcache.set_calls, + [(self.cache_key, "backend data", self.cache_ttl)] + ) + self.assertEqual(self.memcache.del_calls, []) + stats = self.statsd.get_labeled_stats_counts() + self.assertEqual({ + ('swift_coop_cache', frozenset(( + ('resource', 'test'), + ('event', 'backend_reqs'), + ('token', 'disabled'), + ('set_cache_state', 'set'), + ('status', 200)), + )): 1, + }, stats) + + def test_populator_fetch_data_called_twice(self): + populator = self.MockCachePopulator( + MockApp(self.logger, self.statsd), + self.infocache, self.memcache, + self.cache_key, self.cache_ttl, + self.avg_backend_fetch_time, + ) + data = populator.fetch_data() + self.assertEqual(data, "backend data") + self.assertTrue(populator._fetch_called) + self.assertRaises(RuntimeError, populator.fetch_data) + + def test_first_request_with_token(self): + # Test the first request will acquire the token, fetch data from + # the backend and set it into Memcached. + populator = self.MockCachePopulator( + MockApp(self.logger, self.statsd), + self.infocache, self.memcache, + self.cache_key, self.cache_ttl, + self.avg_backend_fetch_time, + labels={ + 'resource': "test", + } + ) + data = populator.fetch_data() + self.assertEqual(data, "backend data") + self.assertEqual(populator.backend_resp, self.backend_resp) + self.assertEqual(populator.set_cache_state, "set") + self.assertTrue(populator.token_acquired) + self.assertEqual(self.memcache.get_calls, []) + self.assertEqual(self.infocache[self.cache_key], "backend data") + self.assertEqual( + self.memcache.incr_calls, + [(self.token_key, 1, self.avg_backend_fetch_time * 10)] + ) + self.assertEqual( + self.memcache.set_calls, + [(self.cache_key, "backend data", self.cache_ttl)] + ) + self.assertEqual(self.memcache.del_calls, [self.token_key]) + stats = self.statsd.get_labeled_stats_counts() + self.assertEqual({ + ('swift_coop_cache', frozenset(( + ('resource', 'test'), + ('event', 'backend_reqs'), + ('token', 'with_token'), + ('set_cache_state', 'set'), + ('status', 200)), + )): 1, + }, stats) + + def test_first_request_with_token_with_encoder(self): + # Test the processing of first request with cache data encoder and + # decoder. + populator = self.DataTransformCachePopulator( + MockApp(self.logger, self.statsd), + self.infocache, self.memcache, + self.cache_key, self.cache_ttl, + self.avg_backend_fetch_time, + labels={ + 'resource': "test", + } + ) + data = populator.fetch_data() + self.assertEqual(data, "backend data") + self.assertEqual(populator.backend_resp, self.backend_resp) + self.assertEqual(populator.set_cache_state, "set") + self.assertTrue(populator.token_acquired) + self.assertEqual(self.memcache.get_calls, []) + self.assertEqual(self.infocache[self.cache_key], "backend data") + self.assertEqual( + self.memcache.incr_calls, + [(self.token_key, 1, self.avg_backend_fetch_time * 10)] + ) + self.assertEqual( + self.memcache.set_calls, + [(self.cache_key, "BACKEND DATA", self.cache_ttl)] + ) + self.assertEqual(self.memcache.del_calls, [self.token_key]) + stats = self.statsd.get_labeled_stats_counts() + self.assertEqual({ + ('swift_coop_cache', frozenset(( + ('resource', 'test'), + ('event', 'backend_reqs'), + ('token', 'with_token'), + ('set_cache_state', 'set'), + ('status', 200)), + )): 1, + }, stats) + + def test_following_request_with_token(self): + # Test the following request (not the first one) which also acquires + # the token, fetch data from the backend and set it into Memcached. + # Simulate that there are 1 or 2 requests who have acquired the tokens. + prior_reqs_with_token = random.randint(1, 2) + total_requests = self.memcache.incr( + self.token_key, delta=prior_reqs_with_token, time=10) + self.assertEqual(total_requests, prior_reqs_with_token) + # Test the following request to get the token. + self.memcache.incr_calls = [] + populator = self.MockCachePopulator( + MockApp(self.logger, self.statsd), + self.infocache, self.memcache, + self.cache_key, self.cache_ttl, + self.avg_backend_fetch_time, + labels={ + 'resource': "test", + } + ) + data = populator.fetch_data() + self.assertEqual(data, "backend data") + self.assertEqual(populator.backend_resp, self.backend_resp) + self.assertEqual(populator.set_cache_state, "set") + self.assertTrue(populator.token_acquired) + self.assertEqual(self.memcache.get_calls, []) + self.assertEqual(self.infocache[self.cache_key], "backend data") + self.assertEqual( + self.memcache.incr_calls, + [(self.token_key, 1, self.avg_backend_fetch_time * 10)] + ) + self.assertEqual( + self.memcache.set_calls, + [(self.cache_key, "backend data", self.cache_ttl)] + ) + self.assertEqual(self.memcache.del_calls, [self.token_key]) + stats = self.statsd.get_labeled_stats_counts() + self.assertEqual({ + ('swift_coop_cache', frozenset(( + ('resource', 'test'), + ('event', 'backend_reqs'), + ('token', 'with_token'), + ('set_cache_state', 'set'), + ('status', 200)), + )): 1, + }, stats) + + def test_fetch_data_cache_hit_without_token(self): + # Test the request which doesn't acquire the token, then keep sleeping + # and trying to fetch data from the Memcached until it succeeds. + + def test_fetch_data(avg_fetch_time, + total_miss_retries, + expected_sleep_calls): + self.logger.statsd_client.clear() + self.statsd.clear() + num_tokens_per_session = random.randint(1, 3) + retries = 0 + + class CustomizedCache(TestableMemcacheRing): + def get(self, key, raise_on_error=False): + nonlocal retries + retries += 1 + if retries <= total_miss_retries: + value = super().get( + "NOT_EXISTED_YET") + return value + else: + return super().get(key) + + self.memcache = CustomizedCache( + ['1.2.3.4:11211'], logger=self.logger) + # Simulate that there were 'num_tokens_per_session' requests who + # have acquired the tokens. + total_requests = self.memcache.incr( + self.token_key, delta=num_tokens_per_session, time=10) + self.assertEqual(total_requests, num_tokens_per_session) + self.memcache.set(self.cache_key, [1, 2, 3]) + # Test the request without a token + self.memcache.incr_calls = [] + self.memcache.set_calls = [] + populator = self.MockCachePopulator( + MockApp(self.logger, self.statsd), + self.infocache, self.memcache, + self.cache_key, self.cache_ttl, + avg_backend_fetch_time=avg_fetch_time, + num_tokens=num_tokens_per_session, + labels={ + 'resource': "test", + } + ) + populator._token_ttl = 1 + with mock.patch.object(utils, 'sleep') as mock_sleep: + data = populator.fetch_data() + self.assertEqual(expected_sleep_calls, mock_sleep.call_args_list) + self.assertEqual(data, [1, 2, 3]) + self.assertIsNone(populator.backend_resp) + self.assertIsNone(populator.set_cache_state) + self.assertEqual(self.infocache, {}) + self.assertEqual( + self.memcache.incr_calls, + [(self.token_key, 1, populator._token_ttl)] + ) + self.assertEqual(self.memcache.set_calls, []) + self.assertEqual(retries, total_miss_retries + 1) + self.assertEqual( + self.memcache.get_calls, + ['NOT_EXISTED_YET'] * (retries - 1) + [self.cache_key] + ) + self.assertFalse(populator.token_acquired) + stats = self.statsd.get_labeled_stats_counts() + self.assertEqual({ + ('swift_coop_cache', frozenset(( + ('resource', 'test'), + ('event', 'cache_served'), + ('token', 'no_token'), + ('lack_retries', False)), + )): 1, + }, stats) + + test_fetch_data(1.0, 1, [mock.call(1.5), mock.call(3.0)]) + test_fetch_data( + 1.0, 2, [mock.call(1.5), mock.call(3.0), mock.call(6.0)]) + test_fetch_data(2.0, 1, [mock.call(3.0), mock.call(6.0)]) + test_fetch_data( + 2.0, 2, [mock.call(3.0), mock.call(6.0), mock.call(12.0)]) + + def test_fetch_data_cache_miss_without_token(self): + # Test the request which doesn't acquire the token, then keep sleeping + # and trying to fetch data from the Memcached, but enventually all + # retries exhausted with cache misses. + num_tokens_per_session = random.randint(1, 3) + retries = 0 + + class CustomizedCache(TestableMemcacheRing): + def get(self, key, raise_on_error=False): + nonlocal retries + retries += 1 + return super().get("NOT_EXISTED_YET") + + self.memcache = CustomizedCache(['1.2.3.4:11211'], logger=self.logger) + # Simulate that there are 'num_tokens_per_session' requests who have + # acquired the tokens. + total_requests = self.memcache.incr( + self.token_key, delta=num_tokens_per_session, time=10) + self.assertEqual(total_requests, num_tokens_per_session) + self.memcache.set(self.cache_key, "cached data") + # Test the request without a token + self.memcache.incr_calls = [] + self.memcache.set_calls = [] + populator = self.MockCachePopulator( + MockApp(self.logger, self.statsd), + self.infocache, self.memcache, + self.cache_key, self.cache_ttl, + self.avg_backend_fetch_time, + num_tokens_per_session, + labels={ + 'resource': "test", + } + ) + data = populator.fetch_data() + self.assertEqual(data, "backend data") + self.assertEqual(populator.backend_resp, self.backend_resp) + self.assertEqual(populator.set_cache_state, "set") + self.assertFalse(populator.token_acquired) + self.assertEqual(self.infocache[self.cache_key], "backend data") + self.assertEqual( + self.memcache.incr_calls, + [(self.token_key, 1, self.avg_backend_fetch_time * 10)] + ) + self.assertEqual( + self.memcache.set_calls, + [(self.cache_key, "backend data", self.cache_ttl)] + ) + self.assertEqual(retries, 3) + self.assertEqual( + self.memcache.get_calls, ['NOT_EXISTED_YET'] * retries) + self.assertEqual(self.memcache.del_calls, []) + stats = self.statsd.get_labeled_stats_counts() + self.assertEqual({ + ('swift_coop_cache', frozenset(( + ('resource', 'test'), + ('event', 'backend_reqs'), + ('token', 'no_token'), + ('lack_retries', False), + ('set_cache_state', 'set'), + ('status', 200)), + )): 1, + }, stats) + + def test_fetch_data_req_lacks_enough_retries(self): + # Test the request which doesn't acquire the token, then keep sleeping + # and trying to fetch data from the Memcached, but doesn't get enough + # retries. + retries = 0 + + class CustomizedCache(TestableMemcacheRing): + def get(self, key, raise_on_error=False): + nonlocal retries + retries += 1 + return super().get("NOT_EXISTED_YET") + + self.memcache = CustomizedCache(['1.2.3.4:11211'], logger=self.logger) + # Simulate that there are three requests who have acquired the tokens. + total_requests = self.memcache.incr(self.token_key, delta=3, time=10) + self.assertEqual(total_requests, 3) + self.memcache.set(self.cache_key, [1, 2, 3]) + # Test the request without a token + self.memcache.incr_calls = [] + self.memcache.set_calls = [] + populator = self.MockCachePopulator( + MockApp(self.logger, self.statsd), + self.infocache, self.memcache, + self.cache_key, self.cache_ttl, + self.avg_backend_fetch_time, + labels={ + 'resource': "test", + } + ) + with patch('time.time', ) as mock_time: + mock_time.side_effect = itertools.count(4000.99, 1.0) + data = populator.fetch_data() + self.assertEqual(data, "backend data") + self.assertEqual(populator.backend_resp, self.backend_resp) + self.assertEqual(populator.set_cache_state, "set") + self.assertFalse(populator.token_acquired) + self.assertEqual(self.infocache[self.cache_key], "backend data") + self.assertEqual( + self.memcache.incr_calls, + [(self.token_key, 1, self.avg_backend_fetch_time * 10)] + ) + self.assertEqual( + self.memcache.set_calls, + [(self.cache_key, "backend data", self.cache_ttl)] + ) + self.assertEqual(retries, 2) + self.assertEqual( + self.memcache.get_calls, ['NOT_EXISTED_YET'] * 2) + self.assertEqual(self.memcache.del_calls, []) + stats = self.statsd.get_labeled_stats_counts() + self.assertEqual({ + ('swift_coop_cache', frozenset(( + ('resource', 'test'), + ('event', 'backend_reqs'), + ('token', 'no_token'), + ('lack_retries', True), + ('set_cache_state', 'set'), + ('status', 200)), + )): 1, + }, stats) + + def test_get_token_connection_error(self): + # Test the request which couldn't acquire the token due to memcached + # connection error. + self.memcache = TestableMemcacheRing( + ['1.2.3.4:11211'], logger=self.logger, inject_incr_error=True) + populator = self.MockCachePopulator( + MockApp(self.logger, self.statsd), + self.infocache, self.memcache, + self.cache_key, self.cache_ttl, + self.avg_backend_fetch_time, + labels={ + 'resource': "test", + } + ) + data = populator.fetch_data() + self.assertEqual(data, "backend data") + self.assertEqual(populator.backend_resp, self.backend_resp) + self.assertEqual(populator.set_cache_state, "set") + self.assertFalse(populator.token_acquired) + self.assertEqual(self.infocache[self.cache_key], "backend data") + self.assertEqual( + self.memcache.incr_calls, + [(self.token_key, 1, self.avg_backend_fetch_time * 10)] + ) + self.assertEqual( + self.memcache.set_calls, + [(self.cache_key, "backend data", self.cache_ttl)] + ) + self.assertEqual(self.memcache.del_calls, []) + stats = self.statsd.get_labeled_stats_counts() + self.assertEqual({ + ('swift_coop_cache', frozenset(( + ('resource', 'test'), + ('event', 'backend_reqs'), + ('token', 'error'), + ('set_cache_state', 'set'), + ('status', 200)), + )): 1, + }, stats) + + def test_set_data_connection_error(self): + # Test the request which is able to acquire the token, but fails to set + # the fetched backend data into memcached due to memcached connection + # error. + self.memcache = TestableMemcacheRing( + ['1.2.3.4:11211'], logger=self.logger, inject_set_error=True) + populator = self.MockCachePopulator( + MockApp(self.logger, self.statsd), + self.infocache, self.memcache, + self.cache_key, self.cache_ttl, + self.avg_backend_fetch_time, + labels={ + 'resource': "test", + } + ) + data = populator.fetch_data() + self.assertEqual(data, "backend data") + self.assertEqual(populator.backend_resp, self.backend_resp) + self.assertEqual(populator.set_cache_state, "set_error") + self.assertTrue(populator.token_acquired) + self.assertEqual(self.infocache[self.cache_key], "backend data") + self.assertEqual( + self.memcache.incr_calls, + [(self.token_key, 1, self.avg_backend_fetch_time * 10)] + ) + self.assertEqual( + self.memcache.set_calls, + [(self.cache_key, "backend data", self.cache_ttl)] + ) + self.assertEqual(self.memcache.del_calls, []) + stats = self.statsd.get_labeled_stats_counts() + self.assertEqual({ + ('swift_coop_cache', frozenset(( + ('resource', 'test'), + ('event', 'backend_reqs'), + ('token', 'with_token'), + ('set_cache_state', 'set_error'), + ('status', 200)), + )): 1, + }, stats) + + def test_connection_errors_on_gettoken_and_dataset(self): + # Test the request which couldn't acquire the token due to memcached + # connection error, and then couldn't set the backend data into cache + # due to memcached connection error too. + self.memcache = TestableMemcacheRing( + ['1.2.3.4:11211'], logger=self.logger, + inject_incr_error=True, inject_set_error=True) + populator = self.MockCachePopulator( + MockApp(self.logger, self.statsd), + self.infocache, self.memcache, + self.cache_key, self.cache_ttl, + self.avg_backend_fetch_time, + labels={ + 'resource': "test", + } + ) + data = populator.fetch_data() + self.assertEqual(data, "backend data") + self.assertEqual(populator.backend_resp, self.backend_resp) + self.assertEqual(populator.set_cache_state, "set_error") + self.assertFalse(populator.token_acquired) + self.assertEqual(self.infocache[self.cache_key], "backend data") + self.assertEqual( + self.memcache.incr_calls, + [(self.token_key, 1, self.avg_backend_fetch_time * 10)] + ) + self.assertEqual( + self.memcache.set_calls, + [(self.cache_key, "backend data", self.cache_ttl)] + ) + self.assertEqual(self.memcache.del_calls, []) + stats = self.statsd.get_labeled_stats_counts() + self.assertEqual({ + ('swift_coop_cache', frozenset(( + ('resource', 'test'), + ('event', 'backend_reqs'), + ('token', 'error'), + ('set_cache_state', 'set_error'), + ('status', 200)), + )): 1, + }, stats) + + def test_fetch_data_from_cache_connection_error(self): + # Test the request which doesn't acquire the token, then keep sleeping + # and trying to fetch data from the Memcached, but eventually all + # retries exhausted with memcached connection errors. + self.memcache = TestableMemcacheRing( + ['1.2.3.4:11211'], logger=self.logger, inject_get_error=True) + # Simulate that there are three requests who have acquired the tokens. + total_requests = self.memcache.incr(self.token_key, delta=3, time=10) + self.assertEqual(total_requests, 3) + self.memcache.set(self.cache_key, [1, 2, 3]) + # Test the request without a token + self.memcache.incr_calls = [] + self.memcache.set_calls = [] + populator = self.MockCachePopulator( + MockApp(self.logger, self.statsd), + self.infocache, self.memcache, + self.cache_key, self.cache_ttl, + self.avg_backend_fetch_time, + labels={ + 'resource': "test", + } + ) + data = populator.fetch_data() + self.assertEqual(data, "backend data") + self.assertEqual(populator.backend_resp, self.backend_resp) + self.assertEqual(populator.set_cache_state, "set") + self.assertFalse(populator.token_acquired) + self.assertEqual(self.infocache[self.cache_key], "backend data") + self.assertEqual( + self.memcache.incr_calls, + [(self.token_key, 1, self.avg_backend_fetch_time * 10)] + ) + self.assertEqual( + self.memcache.set_calls, + [(self.cache_key, "backend data", self.cache_ttl)] + ) + self.assertEqual( + self.memcache.get_calls[0], self.cache_key) + self.assertEqual(self.memcache.del_calls, []) + stats = self.statsd.get_labeled_stats_counts() + self.assertEqual({ + ('swift_coop_cache', frozenset(( + ('resource', 'test'), + ('event', 'backend_reqs'), + ('token', 'no_token'), + ('lack_retries', False), + ('set_cache_state', 'set'), + ('status', 200)), + )): 1, + }, stats) + + def test_concurrent_requests(self): + # Simulate multiple concurrent threads, each of them issues a + # "fetch_data" request cooperatively. + self.avg_backend_fetch_time = 0.01 + num_processes = 100 + exceptions = [] + + def worker_process(): + # Initialize new populator instance in each process. + populator = self.DelayedCachePopulator( + MockApp(self.logger, self.statsd), + {}, self.memcache, + self.cache_key, self.cache_ttl, + self.avg_backend_fetch_time, + backend_delay=self.avg_backend_fetch_time, + labels={ + 'resource': "test", + } + ) + data = populator.fetch_data() + + try: + # Data retrieved successfully + self.assertEqual(data, "backend data") + if populator.set_cache_state == 'set': + self.assertTrue(populator.token_acquired) + self.assertEqual( + populator._infocache[self.cache_key], "backend data") + self.assertEqual(populator.backend_resp, self.backend_resp) + else: + self.assertEqual(populator._infocache, {}) + self.assertIsNone(populator.backend_resp) + except Exception as e: + exceptions.append(e) + + # Issue those parallel requests "at the same time". + pool = eventlet.GreenPool() + for i in range(num_processes): + pool.spawn(worker_process) + + # Wait for all requests to complete + pool.waitall() + if exceptions: + self.fail(f"Greenthread assertions failed: {exceptions}") + stats = self.statsd.get_labeled_stats_counts() + self.assertEqual({ + ('swift_coop_cache', frozenset(( + ('resource', 'test'), + ('event', 'backend_reqs'), + ('token', 'with_token'), + ('set_cache_state', 'set'), + ('status', 200)), + )): 3, + ('swift_coop_cache', frozenset(( + ('resource', 'test'), + ('event', 'cache_served'), + ('token', 'no_token'), + ('lack_retries', False)), + )): 97 + }, stats) + self.assertEqual( + self.memcache.incr_calls, + [(self.token_key, 1, self.avg_backend_fetch_time * 10)] * 100 + ) + self.assertEqual( + self.memcache.set_calls, + [(self.cache_key, "backend data", self.cache_ttl)] * 3 + ) + self.assertEqual(self.memcache.del_calls, [self.token_key] * 3) + + def test_concurrent_requests_all_token_requests_fail(self): + # Simulate multiple concurrent threads issued into a cooperative token + # session, each thread will issue a "fetch_data" request cooperatively. + # And the first three requests will acquire the token, but fail to get + # data from the backend. This test also demonstrates that even though + # all token requests fail to go through, other requests who arrive at + # the late stage of same token session and won't get a token still + # could be served out of the memcached. + self.avg_backend_fetch_time = 0.1 + counts = { + 'num_backend_success': 0, + 'num_requests_served_from_cache': 0, + 'num_backend_failures': 0, + } + exceptions = [] + + def worker_process(exec_delay, backend_delay, + fetch_backend_failure=False): + # Initialize new populator instance in each process. + populator = self.DelayedCachePopulator( + MockApp(self.logger, self.statsd), + {}, self.memcache, + self.cache_key, self.cache_ttl, + self.avg_backend_fetch_time, + backend_delay, + fetch_backend_failure, + labels={ + 'resource': "test", + } + ) + if exec_delay: + eventlet.sleep(exec_delay) + data = populator.fetch_data() + + try: + if fetch_backend_failure: + self.assertIsNone(data) + else: + self.assertEqual(data, "backend data") + if populator.set_cache_state == 'set': + counts['num_backend_success'] += 1 + self.assertEqual( + populator._infocache[self.cache_key], "backend data") + self.assertEqual(populator.backend_resp, self.backend_resp) + elif not fetch_backend_failure: + counts['num_requests_served_from_cache'] += 1 + self.assertEqual(populator._infocache, {}) + self.assertIsNone(populator.backend_resp) + else: + counts['num_backend_failures'] += 1 + except Exception as e: + exceptions.append(e) + + # Issue those parallel requests at different time within this + # cooperative token session. + pool = eventlet.GreenPool() + # The first three requests will get the token but fails. + for i in range(3): + pool.spawn( + worker_process, 0, self.avg_backend_fetch_time * 15, True) + # The 4th request won't get a token, but after it exits its waiting + # cycles, it will fetch the data from the backend and set the data into + # the memcached. + pool.spawn(worker_process, 0, 0) + # The remaining 16 requests won't get a token, but will be served out + # of the memcached because the 4th request will finish within their + # waiting cycles. + for i in range(16): + pool.spawn( + worker_process, + random.uniform(self.avg_backend_fetch_time * 3, + self.avg_backend_fetch_time * 6), + self.avg_backend_fetch_time + ) + + # Wait for all requests to complete + pool.waitall() + if exceptions: + self.fail(f"Greenthread assertions failed: {exceptions}") + # The first three requests of the first token session failed to get + # data from the backend. + self.assertEqual(counts['num_backend_failures'], 3) + self.assertEqual(counts['num_backend_success'], 1) + self.assertEqual(counts['num_requests_served_from_cache'], 16) + self.assertEqual(len(self.memcache.incr_calls), 20) + self.assertEqual(len(self.memcache.del_calls), 0) + + def test_concurrent_requests_pass_token_ttl(self): + # Simulate multiple concurrent threads across two cooperative token + # sessions, each thread will issue a "fetch_data" request + # cooperatively. The very first three requests which will acquire the + # token, but fail to fetch data from backend. + self.avg_backend_fetch_time = 0.1 + counts = { + 'num_backend_success': 0, + 'num_requests_served_from_cache': 0, + 'num_backend_failures': 0, + } + exceptions = [] + + def worker_process(exec_delay, backend_delay, + fetch_backend_failure=False): + # Initialize new populator instance in each process. + populator = self.DelayedCachePopulator( + MockApp(self.logger, self.statsd), + {}, self.memcache, + self.cache_key, self.cache_ttl, + self.avg_backend_fetch_time, + backend_delay, + fetch_backend_failure, + labels={ + 'resource': "test", + } + ) + if exec_delay: + eventlet.sleep(exec_delay) + data = populator.fetch_data() + + try: + if fetch_backend_failure: + self.assertIsNone(data) + else: + self.assertEqual(data, "backend data") + if populator.set_cache_state == 'set': + counts['num_backend_success'] += 1 + self.assertEqual( + populator._infocache[self.cache_key], "backend data") + self.assertEqual(populator.backend_resp, self.backend_resp) + elif not fetch_backend_failure: + counts['num_requests_served_from_cache'] += 1 + self.assertEqual(populator._infocache, {}) + self.assertIsNone(populator.backend_resp) + else: + counts['num_backend_failures'] += 1 + except Exception as e: + exceptions.append(e) + + # Issue the parallel requests for the first token session. + pool = eventlet.GreenPool() + for i in range(3): + pool.spawn( + worker_process, 0, self.avg_backend_fetch_time * 15, True) + for i in range(17): + pool.spawn( + worker_process, + random.uniform(0, self.avg_backend_fetch_time * 10), + self.avg_backend_fetch_time + ) + + # Issue the parallel requests for the second token session. + for i in range(3): + pool.spawn( + worker_process, + self.avg_backend_fetch_time * 10, + self.avg_backend_fetch_time * 5 + ) + for i in range(17): + pool.spawn( + worker_process, + random.uniform(self.avg_backend_fetch_time * 10, + self.avg_backend_fetch_time * 11), + self.avg_backend_fetch_time + ) + # Wait for all requests to complete + pool.waitall() + if exceptions: + self.fail(f"Greenthread assertions failed: {exceptions}") + # The first three requests of the first token session failed to get + # data from the backend. + self.assertEqual(counts['num_backend_failures'], 3) + self.assertGreaterEqual(counts['num_backend_success'], 3) + self.assertEqual( + counts['num_requests_served_from_cache'], + 40 - counts['num_backend_failures'] - + counts['num_backend_success'] + ) + self.assertEqual(len(self.memcache.incr_calls), 40) + # The first three requests of the second token session will delete the + # token after fetching data from the backend and set it in cache. + self.assertEqual(len(self.memcache.del_calls), 3) + + class TestUnlinkOlder(unittest.TestCase): def setUp(self): @@ -4642,8 +5536,9 @@ This is the body class FakeResponse(object): - def __init__(self, status, headers, body): - self.status = status + def __init__(self, status_int=200, headers=None, body=b''): + self.status_int = status_int + self.status = status_int self.headers = HeaderKeyDict(headers) self.body = BytesIO(body)