diff --git a/swift/proxy/controllers/base.py b/swift/proxy/controllers/base.py index a6a37e1388..8c40072e7b 100644 --- a/swift/proxy/controllers/base.py +++ b/swift/proxy/controllers/base.py @@ -40,13 +40,12 @@ from eventlet import sleep from eventlet.timeout import Timeout import six -from swift.common.memcached import MemcacheConnectionError from swift.common.wsgi import make_pre_authed_env, make_pre_authed_request from swift.common.utils import Timestamp, WatchdogTimeout, config_true_value, \ public, split_path, list_from_csv, GreenthreadSafeIterator, \ GreenAsyncPile, quorum_size, parse_content_type, drain_and_close, \ - document_iters_to_http_response_body, ShardRange, find_shard_range, \ - cache_from_env, MetricsPrefixLoggerAdapter + document_iters_to_http_response_body, ShardRange, cache_from_env, \ + MetricsPrefixLoggerAdapter from swift.common.bufferedhttp import http_connect from swift.common import constraints from swift.common.exceptions import ChunkReadTimeout, ChunkWriteTimeout, \ @@ -2399,86 +2398,3 @@ class Controller(object): listing, response = self._get_container_listing( req, account, container, headers=headers, params=params) return self._parse_shard_ranges(req, listing, response), response - - def _get_cached_updating_shard_ranges( - self, infocache, memcache, cache_key): - """ - Fetch cached shard ranges from infocache and memcache. - - :param infocache: the infocache instance. - :param memcache: an instance of a memcache client, - :class:`swift.common.memcached.MemcacheRing`. - :param cache_key: the cache key for both infocache and memcache. - :return: a tuple of (list of shard ranges in dict format, cache state) - """ - cached_ranges = infocache.get(cache_key) - if cached_ranges: - cache_state = 'infocache_hit' - else: - if memcache: - skip_chance = \ - self.app.container_updating_shard_ranges_skip_cache - if skip_chance and random.random() < skip_chance: - cache_state = 'skip' - else: - try: - cached_ranges = memcache.get( - cache_key, raise_on_error=True) - cache_state = 'hit' if cached_ranges else 'miss' - except MemcacheConnectionError: - cache_state = 'error' - else: - cache_state = 'disabled' - cached_ranges = cached_ranges or [] - return cached_ranges, cache_state - - def _get_update_shard(self, req, account, container, obj): - """ - Find the appropriate shard range for an object update. - - Note that this fetches and caches (in both the per-request infocache - and memcache, if available) all shard ranges for the given root - container so we won't have to contact the container DB for every write. - - :param req: original Request instance. - :param account: account from which shard ranges should be fetched. - :param container: container from which shard ranges should be fetched. - :param obj: object getting updated. - :return: an instance of :class:`swift.common.utils.ShardRange`, - or None if the update should go back to the root - """ - if not self.app.recheck_updating_shard_ranges: - # caching is disabled - cache_state = 'disabled' - # legacy behavior requests container server for includes=obj - shard_ranges, response = self._get_shard_ranges( - req, account, container, states='updating', includes=obj) - else: - # try to get from cache - response = None - cache_key = get_cache_key(account, container, shard='updating') - infocache = req.environ.setdefault('swift.infocache', {}) - memcache = cache_from_env(req.environ, True) - (cached_ranges, cache_state - ) = self._get_cached_updating_shard_ranges( - infocache, memcache, cache_key) - if cached_ranges: - # found cached shard ranges in either infocache or memcache - infocache[cache_key] = tuple(cached_ranges) - shard_ranges = [ShardRange.from_dict(shard_range) - for shard_range in cached_ranges] - else: - # pull full set of updating shards from backend - shard_ranges, response = self._get_shard_ranges( - req, account, container, states='updating') - if shard_ranges: - cached_ranges = [dict(sr) for sr in shard_ranges] - infocache[cache_key] = tuple(cached_ranges) - if memcache: - memcache.set( - cache_key, cached_ranges, - time=self.app.recheck_updating_shard_ranges) - - record_cache_op_metrics( - self.logger, 'shard_updating', cache_state, response) - return find_shard_range(obj, shard_ranges or []) diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py index 4d83b2b26a..c95e51b2b1 100644 --- a/swift/proxy/controllers/obj.py +++ b/swift/proxy/controllers/obj.py @@ -47,7 +47,8 @@ from swift.common.utils import ( GreenAsyncPile, GreenthreadSafeIterator, Timestamp, WatchdogTimeout, normalize_delete_at_timestamp, public, get_expirer_container, document_iters_to_http_response_body, parse_content_range, - quorum_size, reiterate, close_if_possible, safe_json_loads, md5) + quorum_size, reiterate, close_if_possible, safe_json_loads, md5, + ShardRange, find_shard_range, cache_from_env) from swift.common.bufferedhttp import http_connect from swift.common.constraints import check_metadata, check_object_creation from swift.common import constraints @@ -62,11 +63,12 @@ from swift.common.http import ( HTTP_SERVICE_UNAVAILABLE, HTTP_INSUFFICIENT_STORAGE, HTTP_PRECONDITION_FAILED, HTTP_CONFLICT, HTTP_UNPROCESSABLE_ENTITY, HTTP_REQUESTED_RANGE_NOT_SATISFIABLE, HTTP_NOT_FOUND) +from swift.common.memcached import MemcacheConnectionError from swift.common.storage_policy import (POLICIES, REPL_POLICY, EC_POLICY, ECDriverError, PolicyError) from swift.proxy.controllers.base import Controller, delay_denial, \ cors_validation, update_headers, bytes_to_skip, close_swift_conn, \ - ByteCountEnforcer + ByteCountEnforcer, record_cache_op_metrics, get_cache_key from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPNotFound, \ HTTPPreconditionFailed, HTTPRequestEntityTooLarge, HTTPRequestTimeout, \ HTTPServerError, HTTPServiceUnavailable, HTTPClientDisconnect, \ @@ -274,6 +276,89 @@ class BaseObjectController(Controller): """Handler for HTTP HEAD requests.""" return self.GETorHEAD(req) + def _get_cached_updating_shard_ranges( + self, infocache, memcache, cache_key): + """ + Fetch cached shard ranges from infocache and memcache. + + :param infocache: the infocache instance. + :param memcache: an instance of a memcache client, + :class:`swift.common.memcached.MemcacheRing`. + :param cache_key: the cache key for both infocache and memcache. + :return: a tuple of (list of shard ranges in dict format, cache state) + """ + cached_ranges = infocache.get(cache_key) + if cached_ranges: + cache_state = 'infocache_hit' + else: + if memcache: + skip_chance = \ + self.app.container_updating_shard_ranges_skip_cache + if skip_chance and random.random() < skip_chance: + cache_state = 'skip' + else: + try: + cached_ranges = memcache.get( + cache_key, raise_on_error=True) + cache_state = 'hit' if cached_ranges else 'miss' + except MemcacheConnectionError: + cache_state = 'error' + else: + cache_state = 'disabled' + cached_ranges = cached_ranges or [] + return cached_ranges, cache_state + + def _get_update_shard(self, req, account, container, obj): + """ + Find the appropriate shard range for an object update. + + Note that this fetches and caches (in both the per-request infocache + and memcache, if available) all shard ranges for the given root + container so we won't have to contact the container DB for every write. + + :param req: original Request instance. + :param account: account from which shard ranges should be fetched. + :param container: container from which shard ranges should be fetched. + :param obj: object getting updated. + :return: an instance of :class:`swift.common.utils.ShardRange`, + or None if the update should go back to the root + """ + if not self.app.recheck_updating_shard_ranges: + # caching is disabled + cache_state = 'disabled' + # legacy behavior requests container server for includes=obj + shard_ranges, response = self._get_shard_ranges( + req, account, container, states='updating', includes=obj) + else: + # try to get from cache + response = None + cache_key = get_cache_key(account, container, shard='updating') + infocache = req.environ.setdefault('swift.infocache', {}) + memcache = cache_from_env(req.environ, True) + (cached_ranges, cache_state + ) = self._get_cached_updating_shard_ranges( + infocache, memcache, cache_key) + if cached_ranges: + # found cached shard ranges in either infocache or memcache + infocache[cache_key] = tuple(cached_ranges) + shard_ranges = [ShardRange.from_dict(shard_range) + for shard_range in cached_ranges] + else: + # pull full set of updating shards from backend + shard_ranges, response = self._get_shard_ranges( + req, account, container, states='updating') + if shard_ranges: + cached_ranges = [dict(sr) for sr in shard_ranges] + infocache[cache_key] = tuple(cached_ranges) + if memcache: + memcache.set( + cache_key, cached_ranges, + time=self.app.recheck_updating_shard_ranges) + + record_cache_op_metrics( + self.logger, 'shard_updating', cache_state, response) + return find_shard_range(obj, shard_ranges or []) + def _get_update_target(self, req, container_info): # find the sharded container to which we'll send the update db_state = container_info.get('sharding_state', 'unsharded')