Merge "Proxy: move '_get_update_shard' from base class to child class."
This commit is contained in:
@@ -40,13 +40,12 @@ from eventlet import sleep
|
|||||||
from eventlet.timeout import Timeout
|
from eventlet.timeout import Timeout
|
||||||
import six
|
import six
|
||||||
|
|
||||||
from swift.common.memcached import MemcacheConnectionError
|
|
||||||
from swift.common.wsgi import make_pre_authed_env, make_pre_authed_request
|
from swift.common.wsgi import make_pre_authed_env, make_pre_authed_request
|
||||||
from swift.common.utils import Timestamp, WatchdogTimeout, config_true_value, \
|
from swift.common.utils import Timestamp, WatchdogTimeout, config_true_value, \
|
||||||
public, split_path, list_from_csv, GreenthreadSafeIterator, \
|
public, split_path, list_from_csv, GreenthreadSafeIterator, \
|
||||||
GreenAsyncPile, quorum_size, parse_content_type, drain_and_close, \
|
GreenAsyncPile, quorum_size, parse_content_type, drain_and_close, \
|
||||||
document_iters_to_http_response_body, ShardRange, find_shard_range, \
|
document_iters_to_http_response_body, ShardRange, cache_from_env, \
|
||||||
cache_from_env, MetricsPrefixLoggerAdapter
|
MetricsPrefixLoggerAdapter
|
||||||
from swift.common.bufferedhttp import http_connect
|
from swift.common.bufferedhttp import http_connect
|
||||||
from swift.common import constraints
|
from swift.common import constraints
|
||||||
from swift.common.exceptions import ChunkReadTimeout, ChunkWriteTimeout, \
|
from swift.common.exceptions import ChunkReadTimeout, ChunkWriteTimeout, \
|
||||||
@@ -2399,86 +2398,3 @@ class Controller(object):
|
|||||||
listing, response = self._get_container_listing(
|
listing, response = self._get_container_listing(
|
||||||
req, account, container, headers=headers, params=params)
|
req, account, container, headers=headers, params=params)
|
||||||
return self._parse_shard_ranges(req, listing, response), response
|
return self._parse_shard_ranges(req, listing, response), response
|
||||||
|
|
||||||
def _get_cached_updating_shard_ranges(
|
|
||||||
self, infocache, memcache, cache_key):
|
|
||||||
"""
|
|
||||||
Fetch cached shard ranges from infocache and memcache.
|
|
||||||
|
|
||||||
:param infocache: the infocache instance.
|
|
||||||
:param memcache: an instance of a memcache client,
|
|
||||||
:class:`swift.common.memcached.MemcacheRing`.
|
|
||||||
:param cache_key: the cache key for both infocache and memcache.
|
|
||||||
:return: a tuple of (list of shard ranges in dict format, cache state)
|
|
||||||
"""
|
|
||||||
cached_ranges = infocache.get(cache_key)
|
|
||||||
if cached_ranges:
|
|
||||||
cache_state = 'infocache_hit'
|
|
||||||
else:
|
|
||||||
if memcache:
|
|
||||||
skip_chance = \
|
|
||||||
self.app.container_updating_shard_ranges_skip_cache
|
|
||||||
if skip_chance and random.random() < skip_chance:
|
|
||||||
cache_state = 'skip'
|
|
||||||
else:
|
|
||||||
try:
|
|
||||||
cached_ranges = memcache.get(
|
|
||||||
cache_key, raise_on_error=True)
|
|
||||||
cache_state = 'hit' if cached_ranges else 'miss'
|
|
||||||
except MemcacheConnectionError:
|
|
||||||
cache_state = 'error'
|
|
||||||
else:
|
|
||||||
cache_state = 'disabled'
|
|
||||||
cached_ranges = cached_ranges or []
|
|
||||||
return cached_ranges, cache_state
|
|
||||||
|
|
||||||
def _get_update_shard(self, req, account, container, obj):
|
|
||||||
"""
|
|
||||||
Find the appropriate shard range for an object update.
|
|
||||||
|
|
||||||
Note that this fetches and caches (in both the per-request infocache
|
|
||||||
and memcache, if available) all shard ranges for the given root
|
|
||||||
container so we won't have to contact the container DB for every write.
|
|
||||||
|
|
||||||
:param req: original Request instance.
|
|
||||||
:param account: account from which shard ranges should be fetched.
|
|
||||||
:param container: container from which shard ranges should be fetched.
|
|
||||||
:param obj: object getting updated.
|
|
||||||
:return: an instance of :class:`swift.common.utils.ShardRange`,
|
|
||||||
or None if the update should go back to the root
|
|
||||||
"""
|
|
||||||
if not self.app.recheck_updating_shard_ranges:
|
|
||||||
# caching is disabled
|
|
||||||
cache_state = 'disabled'
|
|
||||||
# legacy behavior requests container server for includes=obj
|
|
||||||
shard_ranges, response = self._get_shard_ranges(
|
|
||||||
req, account, container, states='updating', includes=obj)
|
|
||||||
else:
|
|
||||||
# try to get from cache
|
|
||||||
response = None
|
|
||||||
cache_key = get_cache_key(account, container, shard='updating')
|
|
||||||
infocache = req.environ.setdefault('swift.infocache', {})
|
|
||||||
memcache = cache_from_env(req.environ, True)
|
|
||||||
(cached_ranges, cache_state
|
|
||||||
) = self._get_cached_updating_shard_ranges(
|
|
||||||
infocache, memcache, cache_key)
|
|
||||||
if cached_ranges:
|
|
||||||
# found cached shard ranges in either infocache or memcache
|
|
||||||
infocache[cache_key] = tuple(cached_ranges)
|
|
||||||
shard_ranges = [ShardRange.from_dict(shard_range)
|
|
||||||
for shard_range in cached_ranges]
|
|
||||||
else:
|
|
||||||
# pull full set of updating shards from backend
|
|
||||||
shard_ranges, response = self._get_shard_ranges(
|
|
||||||
req, account, container, states='updating')
|
|
||||||
if shard_ranges:
|
|
||||||
cached_ranges = [dict(sr) for sr in shard_ranges]
|
|
||||||
infocache[cache_key] = tuple(cached_ranges)
|
|
||||||
if memcache:
|
|
||||||
memcache.set(
|
|
||||||
cache_key, cached_ranges,
|
|
||||||
time=self.app.recheck_updating_shard_ranges)
|
|
||||||
|
|
||||||
record_cache_op_metrics(
|
|
||||||
self.logger, 'shard_updating', cache_state, response)
|
|
||||||
return find_shard_range(obj, shard_ranges or [])
|
|
||||||
|
@@ -47,7 +47,8 @@ from swift.common.utils import (
|
|||||||
GreenAsyncPile, GreenthreadSafeIterator, Timestamp, WatchdogTimeout,
|
GreenAsyncPile, GreenthreadSafeIterator, Timestamp, WatchdogTimeout,
|
||||||
normalize_delete_at_timestamp, public, get_expirer_container,
|
normalize_delete_at_timestamp, public, get_expirer_container,
|
||||||
document_iters_to_http_response_body, parse_content_range,
|
document_iters_to_http_response_body, parse_content_range,
|
||||||
quorum_size, reiterate, close_if_possible, safe_json_loads, md5)
|
quorum_size, reiterate, close_if_possible, safe_json_loads, md5,
|
||||||
|
ShardRange, find_shard_range, cache_from_env)
|
||||||
from swift.common.bufferedhttp import http_connect
|
from swift.common.bufferedhttp import http_connect
|
||||||
from swift.common.constraints import check_metadata, check_object_creation
|
from swift.common.constraints import check_metadata, check_object_creation
|
||||||
from swift.common import constraints
|
from swift.common import constraints
|
||||||
@@ -62,11 +63,12 @@ from swift.common.http import (
|
|||||||
HTTP_SERVICE_UNAVAILABLE, HTTP_INSUFFICIENT_STORAGE,
|
HTTP_SERVICE_UNAVAILABLE, HTTP_INSUFFICIENT_STORAGE,
|
||||||
HTTP_PRECONDITION_FAILED, HTTP_CONFLICT, HTTP_UNPROCESSABLE_ENTITY,
|
HTTP_PRECONDITION_FAILED, HTTP_CONFLICT, HTTP_UNPROCESSABLE_ENTITY,
|
||||||
HTTP_REQUESTED_RANGE_NOT_SATISFIABLE, HTTP_NOT_FOUND)
|
HTTP_REQUESTED_RANGE_NOT_SATISFIABLE, HTTP_NOT_FOUND)
|
||||||
|
from swift.common.memcached import MemcacheConnectionError
|
||||||
from swift.common.storage_policy import (POLICIES, REPL_POLICY, EC_POLICY,
|
from swift.common.storage_policy import (POLICIES, REPL_POLICY, EC_POLICY,
|
||||||
ECDriverError, PolicyError)
|
ECDriverError, PolicyError)
|
||||||
from swift.proxy.controllers.base import Controller, delay_denial, \
|
from swift.proxy.controllers.base import Controller, delay_denial, \
|
||||||
cors_validation, update_headers, bytes_to_skip, close_swift_conn, \
|
cors_validation, update_headers, bytes_to_skip, close_swift_conn, \
|
||||||
ByteCountEnforcer
|
ByteCountEnforcer, record_cache_op_metrics, get_cache_key
|
||||||
from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPNotFound, \
|
from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPNotFound, \
|
||||||
HTTPPreconditionFailed, HTTPRequestEntityTooLarge, HTTPRequestTimeout, \
|
HTTPPreconditionFailed, HTTPRequestEntityTooLarge, HTTPRequestTimeout, \
|
||||||
HTTPServerError, HTTPServiceUnavailable, HTTPClientDisconnect, \
|
HTTPServerError, HTTPServiceUnavailable, HTTPClientDisconnect, \
|
||||||
@@ -274,6 +276,89 @@ class BaseObjectController(Controller):
|
|||||||
"""Handler for HTTP HEAD requests."""
|
"""Handler for HTTP HEAD requests."""
|
||||||
return self.GETorHEAD(req)
|
return self.GETorHEAD(req)
|
||||||
|
|
||||||
|
def _get_cached_updating_shard_ranges(
|
||||||
|
self, infocache, memcache, cache_key):
|
||||||
|
"""
|
||||||
|
Fetch cached shard ranges from infocache and memcache.
|
||||||
|
|
||||||
|
:param infocache: the infocache instance.
|
||||||
|
:param memcache: an instance of a memcache client,
|
||||||
|
:class:`swift.common.memcached.MemcacheRing`.
|
||||||
|
:param cache_key: the cache key for both infocache and memcache.
|
||||||
|
:return: a tuple of (list of shard ranges in dict format, cache state)
|
||||||
|
"""
|
||||||
|
cached_ranges = infocache.get(cache_key)
|
||||||
|
if cached_ranges:
|
||||||
|
cache_state = 'infocache_hit'
|
||||||
|
else:
|
||||||
|
if memcache:
|
||||||
|
skip_chance = \
|
||||||
|
self.app.container_updating_shard_ranges_skip_cache
|
||||||
|
if skip_chance and random.random() < skip_chance:
|
||||||
|
cache_state = 'skip'
|
||||||
|
else:
|
||||||
|
try:
|
||||||
|
cached_ranges = memcache.get(
|
||||||
|
cache_key, raise_on_error=True)
|
||||||
|
cache_state = 'hit' if cached_ranges else 'miss'
|
||||||
|
except MemcacheConnectionError:
|
||||||
|
cache_state = 'error'
|
||||||
|
else:
|
||||||
|
cache_state = 'disabled'
|
||||||
|
cached_ranges = cached_ranges or []
|
||||||
|
return cached_ranges, cache_state
|
||||||
|
|
||||||
|
def _get_update_shard(self, req, account, container, obj):
|
||||||
|
"""
|
||||||
|
Find the appropriate shard range for an object update.
|
||||||
|
|
||||||
|
Note that this fetches and caches (in both the per-request infocache
|
||||||
|
and memcache, if available) all shard ranges for the given root
|
||||||
|
container so we won't have to contact the container DB for every write.
|
||||||
|
|
||||||
|
:param req: original Request instance.
|
||||||
|
:param account: account from which shard ranges should be fetched.
|
||||||
|
:param container: container from which shard ranges should be fetched.
|
||||||
|
:param obj: object getting updated.
|
||||||
|
:return: an instance of :class:`swift.common.utils.ShardRange`,
|
||||||
|
or None if the update should go back to the root
|
||||||
|
"""
|
||||||
|
if not self.app.recheck_updating_shard_ranges:
|
||||||
|
# caching is disabled
|
||||||
|
cache_state = 'disabled'
|
||||||
|
# legacy behavior requests container server for includes=obj
|
||||||
|
shard_ranges, response = self._get_shard_ranges(
|
||||||
|
req, account, container, states='updating', includes=obj)
|
||||||
|
else:
|
||||||
|
# try to get from cache
|
||||||
|
response = None
|
||||||
|
cache_key = get_cache_key(account, container, shard='updating')
|
||||||
|
infocache = req.environ.setdefault('swift.infocache', {})
|
||||||
|
memcache = cache_from_env(req.environ, True)
|
||||||
|
(cached_ranges, cache_state
|
||||||
|
) = self._get_cached_updating_shard_ranges(
|
||||||
|
infocache, memcache, cache_key)
|
||||||
|
if cached_ranges:
|
||||||
|
# found cached shard ranges in either infocache or memcache
|
||||||
|
infocache[cache_key] = tuple(cached_ranges)
|
||||||
|
shard_ranges = [ShardRange.from_dict(shard_range)
|
||||||
|
for shard_range in cached_ranges]
|
||||||
|
else:
|
||||||
|
# pull full set of updating shards from backend
|
||||||
|
shard_ranges, response = self._get_shard_ranges(
|
||||||
|
req, account, container, states='updating')
|
||||||
|
if shard_ranges:
|
||||||
|
cached_ranges = [dict(sr) for sr in shard_ranges]
|
||||||
|
infocache[cache_key] = tuple(cached_ranges)
|
||||||
|
if memcache:
|
||||||
|
memcache.set(
|
||||||
|
cache_key, cached_ranges,
|
||||||
|
time=self.app.recheck_updating_shard_ranges)
|
||||||
|
|
||||||
|
record_cache_op_metrics(
|
||||||
|
self.logger, 'shard_updating', cache_state, response)
|
||||||
|
return find_shard_range(obj, shard_ranges or [])
|
||||||
|
|
||||||
def _get_update_target(self, req, container_info):
|
def _get_update_target(self, req, container_info):
|
||||||
# find the sharded container to which we'll send the update
|
# find the sharded container to which we'll send the update
|
||||||
db_state = container_info.get('sharding_state', 'unsharded')
|
db_state = container_info.get('sharding_state', 'unsharded')
|
||||||
|
Reference in New Issue
Block a user