Merge "Proxy: restructure cached updating shard ranges"

This commit is contained in:
Zuul 2023-03-15 16:51:24 +00:00 committed by Gerrit Code Review
commit d2153f5d5a
6 changed files with 517 additions and 271 deletions

View File

@ -96,7 +96,7 @@ from swift.common.header_key_dict import HeaderKeyDict
from swift.common.linkat import linkat
# For backwards compatibility with 3rd party middlewares
from swift.common.registry import register_swift_info, get_swift_info # noqa
from swift.common.registry import register_swift_info, get_swift_info # noqa
# logging doesn't import patched as cleanly as one would like
from logging.handlers import SysLogHandler
@ -585,6 +585,7 @@ class _UTC(datetime.tzinfo):
"""
A tzinfo class for datetime objects that returns a 0 timedelta (UTC time)
"""
def dst(self, dt):
return datetime.timedelta(0)
utcoffset = dst
@ -934,6 +935,7 @@ class _LibcWrapper(object):
has the function of that name. If false, then calls will fail with a
NotImplementedError.
"""
def __init__(self, func_name):
self._func_name = func_name
self._func_handle = None
@ -1715,6 +1717,7 @@ class RateLimitedIterator(object):
this many elements; default is 0 (rate limit
immediately)
"""
def __init__(self, iterable, elements_per_second, limit_after=0,
ratelimit_if=lambda _junk: True):
self.iterator = iter(iterable)
@ -1749,6 +1752,7 @@ class GreenthreadSafeIterator(object):
an error like "ValueError: generator already executing". By wrapping calls
to next() with a mutex, we avoid that error.
"""
def __init__(self, unsafe_iterable):
self.unsafe_iter = iter(unsafe_iterable)
self.semaphore = eventlet.semaphore.Semaphore(value=1)
@ -2068,6 +2072,7 @@ class SwiftLoggerAdapter(logging.LoggerAdapter):
Like logging.LoggerAdapter, you have to subclass this and override the
process() method to accomplish anything useful.
"""
def get_metric_name(self, metric):
# subclasses may override this method to annotate the metric name
return metric
@ -2110,6 +2115,7 @@ class PrefixLoggerAdapter(SwiftLoggerAdapter):
Adds an optional prefix to all its log messages. When the prefix has not
been set, messages are unchanged.
"""
def set_prefix(self, prefix):
self.extra['prefix'] = prefix
@ -2129,6 +2135,7 @@ class MetricsPrefixLoggerAdapter(SwiftLoggerAdapter):
"""
Adds a prefix to all Statsd metrics' names.
"""
def __init__(self, logger, extra, metric_prefix):
"""
:param logger: an instance of logging.Logger
@ -2382,6 +2389,7 @@ class LogLevelFilter(object):
(DEBUG < INFO < WARN < ERROR < CRITICAL|FATAL)
Default: DEBUG
"""
def __init__(self, level=logging.DEBUG):
self.level = level
@ -3682,6 +3690,7 @@ class GreenAsyncPile(object):
Correlating results with jobs (if necessary) is left to the caller.
"""
def __init__(self, size_or_pool):
"""
:param size_or_pool: thread pool size or a pool to use
@ -3775,6 +3784,7 @@ class StreamingPile(GreenAsyncPile):
When used as a context manager, has the same worker-killing properties as
:class:`ContextPool`.
"""
def __init__(self, size):
""":param size: number of worker threads to use"""
self.pool = ContextPool(size)
@ -4266,6 +4276,7 @@ class Everything(object):
A container that contains everything. If "e" is an instance of
Everything, then "x in e" is true for all x.
"""
def __contains__(self, element):
return True
@ -4297,6 +4308,7 @@ class CloseableChain(object):
Like itertools.chain, but with a close method that will attempt to invoke
its sub-iterators' close methods, if any.
"""
def __init__(self, *iterables):
self.iterables = iterables
self.chained_iter = itertools.chain(*self.iterables)
@ -4340,6 +4352,7 @@ class InputProxy(object):
File-like object that counts bytes read.
To be swapped in for wsgi.input for accounting purposes.
"""
def __init__(self, wsgi_input):
"""
:param wsgi_input: file-like object to wrap the functionality of
@ -4481,6 +4494,7 @@ class Spliterator(object):
"l" # shorter than requested; this can happen with the last iterator
"""
def __init__(self, source_iterable):
self.input_iterator = iter(source_iterable)
self.leftovers = None
@ -5238,6 +5252,7 @@ class ShardName(object):
root container's own shard range will have a name format of
<account>/<root_container> which will raise ValueError if passed to parse.
"""
def __init__(self, account, root_container,
parent_container_hash,
timestamp,
@ -5329,7 +5344,277 @@ class ShardName(object):
raise ValueError('invalid name: %s' % name)
class ShardRange(object):
@functools.total_ordering
class Namespace(object):
    """
    A named range of the object namespace, delimited by a ``lower`` and an
    ``upper`` bound.

    An item is within the namespace when ``lower < item <= upper``. A bound
    given as ``None`` or as an empty string is replaced by the ``MIN``
    (for lower) or ``MAX`` (for upper) singleton. Equality only considers
    the two bounds; the name is ignored.
    """

    __slots__ = ('_lower', '_upper', 'name')

    @functools.total_ordering
    class MaxBound(ShardRangeOuterBound):
        # singleton for maximum bound
        def __ge__(self, other):
            return True

    @functools.total_ordering
    class MinBound(ShardRangeOuterBound):
        # singleton for minimum bound
        def __le__(self, other):
            return True

    MIN = MinBound()
    MAX = MaxBound()

    def __init__(self, name, lower, upper):
        # Seed both private bounds with the widest possible range first: the
        # property setters used below validate each new bound against the
        # opposite one.
        self._lower = Namespace.MIN
        self._upper = Namespace.MAX
        self.lower = lower
        self.upper = upper
        self.name = name

    def __iter__(self):
        # yields (key, value) pairs so that dict(namespace) works
        yield 'name', str(self.name)
        yield 'lower', self.lower_str
        yield 'upper', self.upper_str

    def __repr__(self):
        described = ', '.join('%s=%r' % pair for pair in self)
        return '%s(%s)' % (self.__class__.__name__, described)

    def __lt__(self, other):
        # This Namespace is less than other when the whole of its range lies
        # below other. When other is also a Namespace that means this upper
        # bound is less than or equal to the other's lower bound.
        if self.upper == Namespace.MAX:
            # nothing lies above a range that extends to MAX
            return False
        if isinstance(other, Namespace):
            return self.upper <= other.lower
        if other is None:
            return True
        return self.upper < self._encode(other)

    def __gt__(self, other):
        # This Namespace is greater than other when the whole of its range
        # lies above other. When other is also a Namespace that means this
        # lower bound is greater than or equal to the other's upper bound.
        if self.lower == Namespace.MIN:
            # nothing lies below a range that starts at MIN
            return False
        if isinstance(other, Namespace):
            return self.lower >= other.upper
        if other is None:
            return False
        return self.lower >= self._encode(other)

    def __eq__(self, other):
        # only the range bounds take part in the comparison
        if isinstance(other, Namespace):
            return self.lower == other.lower and self.upper == other.upper
        return False

    def __ne__(self, other):
        is_equal = (self == other)
        return not is_equal

    def __contains__(self, item):
        # an item is inside the namespace when lower < item <= upper; the
        # empty string is never inside
        if item == '':
            return False
        encoded = self._encode_bound(item)
        return self.lower < encoded <= self.upper

    @classmethod
    def _encode(cls, value):
        # Normalise to the native str type of the running interpreter; any
        # other kind of value is handed back untouched.
        if six.PY2:
            if isinstance(value, six.text_type):
                return value.encode('utf-8')
        elif six.PY3 and isinstance(value, six.binary_type):
            # This should never fail -- the value should always be coming from
            # valid swift paths, which means UTF-8
            return value.decode('utf-8')
        return value

    def _encode_bound(self, bound):
        # outer bound singletons pass straight through; anything else has to
        # be a text or binary string
        if isinstance(bound, ShardRangeOuterBound):
            return bound
        if not isinstance(bound, (six.text_type, six.binary_type)):
            raise TypeError('must be a string type')
        return self._encode(bound)

    @property
    def lower(self):
        return self._lower

    @lower.setter
    def lower(self, value):
        if value is None:
            value = Namespace.MIN
        else:
            # compare against an empty string of the same type as value
            empty = b"" if isinstance(value, bytes) else u""
            if value == empty:
                value = Namespace.MIN
        try:
            value = self._encode_bound(value)
        except TypeError as err:
            raise TypeError('lower %s' % err)
        if value > self._upper:
            raise ValueError(
                'lower (%r) must be less than or equal to upper (%r)' %
                (value, self.upper))
        self._lower = value

    @property
    def lower_str(self):
        return str(self.lower)

    @property
    def upper(self):
        return self._upper

    @upper.setter
    def upper(self, value):
        if value is None:
            value = Namespace.MAX
        else:
            # compare against an empty string of the same type as value
            empty = b"" if isinstance(value, bytes) else u""
            if value == empty:
                value = Namespace.MAX
        try:
            value = self._encode_bound(value)
        except TypeError as err:
            raise TypeError('upper %s' % err)
        if value < self._lower:
            raise ValueError(
                'upper (%r) must be greater than or equal to lower (%r)' %
                (value, self.lower))
        self._upper = value

    @property
    def upper_str(self):
        return str(self.upper)

    def entire_namespace(self):
        """
        Test whether this namespace spans everything from MIN to MAX.

        :return: True if the lower bound is MIN and the upper bound is MAX,
            False otherwise.
        """
        if self.lower == Namespace.MIN:
            return self.upper == Namespace.MAX
        return False

    def overlaps(self, other):
        """
        Test whether this namespace and the other namespace have any part of
        their ranges in common.

        :param other: an instance of :class:`~swift.common.utils.Namespace`
        :return: True if the ranges overlap, False if they do not or if
            ``other`` is not a Namespace.
        """
        if not isinstance(other, Namespace):
            return False
        highest_lower = max(self.lower, other.lower)
        lowest_upper = min(self.upper, other.upper)
        return highest_lower < lowest_upper

    def includes(self, other):
        """
        Test whether the whole of the other namespace lies within this
        namespace.

        :param other: an instance of :class:`~swift.common.utils.Namespace`
        :return: True if other is entirely included, False otherwise.
        """
        return (self.lower <= other.lower) and (other.upper <= self.upper)

    def expand(self, donors):
        """
        Widen the bounds, if necessary, so that they reach the minimum lower
        bound and the maximum upper bound found among the given donors.

        :param donors: A list of :class:`~swift.common.utils.Namespace`
        :return: True if the bounds have been modified, False otherwise.
        """
        lowest = self.lower
        highest = self.upper
        for donor in donors:
            lowest = min(lowest, donor.lower)
            highest = max(highest, donor.upper)
        if not (self.lower > lowest or self.upper < highest):
            # the donors are already covered; leave the bounds alone
            return False
        self.lower = lowest
        self.upper = highest
        return True
class NamespaceBoundList(object):
    def __init__(self, bounds):
        """
        Hold a compact representation of a list of namespaces in which each
        namespace is reduced to a two item list ``[lower bound, name]``.

        :param bounds: a list of lists ``[lower bound, name]``. The list
            should be ordered by ``lower bound``. ``None`` is treated as an
            empty list.
        """
        if bounds is None:
            bounds = []
        self.bounds = bounds

    @classmethod
    def parse(cls, namespaces):
        """
        Build a NamespaceBoundList from a list of Namespaces or shard ranges,
        keeping only the compact ``[lower bound, name]`` form of each.

        The given ``namespaces`` should be contiguous because only lower
        bounds are stored: if ``namespaces`` has overlaps then at least one
        of the overlapping namespaces may be ignored; similarly, gaps between
        namespaces are not represented in the NamespaceBoundList.

        :param namespaces: A list of Namespace instances. The list should be
            ordered by namespace bounds.
        :return: a NamespaceBoundList, or None if ``namespaces`` is empty.
        """
        if not namespaces:
            return None
        compact = []
        # upper bound of the most recently retained namespace
        covered_to = namespaces[0].lower
        for namespace in namespaces:
            # A namespace that starts below ``covered_to`` overlaps one that
            # has already been retained and is discarded.
            #
            # Overlapping namespaces are expected in lists of shard ranges
            # fetched from the backend. For example, while a parent container
            # is in the process of sharding, the parent shard range and its
            # children shard ranges may be returned in the list of shard
            # ranges. However, the backend sorts the list by
            # (upper, state, lower, name) such that the children precede the
            # parent, and it is the children that we prefer to retain in the
            # NamespaceBoundList. For example, these namespaces:
            #   (a-b, "child1"), (b-c, "child2"), (a-c, "parent")
            # would result in a NamespaceBoundList:
            #   (a, "child1"), (b, "child2")
            #
            # Unexpected overlaps or gaps may result in namespaces being
            # 'extended' because only lower bounds are stored. For example,
            # these namespaces:
            #   (a-b, "ns1"), (d-e, "ns2")
            # would result in a NamespaceBoundList:
            #   (a, "ns1"), (d, "ns2")
            # When used to find a target namespace for an object update that
            # lies in a gap, the NamespaceBoundList will map the object name
            # to the preceding namespace. In the example, an object named "c"
            # would be mapped to "ns1". (In previous versions, an object
            # update lying in a gap would have been mapped to the root
            # container.)
            if not (namespace.lower < covered_to):
                compact.append([namespace.lower_str, str(namespace.name)])
                covered_to = namespace.upper
        return cls(compact)

    def get_namespace(self, item):
        """
        Get a Namespace instance that contains ``item``.

        The namespace is located by bisecting the stored lower bounds; its
        upper bound is the lower bound of the following entry, or unbounded
        for the final entry.

        :param item: The item for which a Namespace is to be found.
        :return: the Namespace that contains ``item``.
        """
        # [item] sorts before any [item, name] entry, so an item equal to a
        # lower bound is placed in the preceding namespace
        index = bisect.bisect_right(self.bounds, [item]) - 1
        lower, name = self.bounds[index]
        following = index + 1
        if following == len(self.bounds):
            upper = ''
        else:
            upper = self.bounds[following][0]
        return Namespace(name, lower, upper)
class ShardRange(Namespace):
"""
A ShardRange encapsulates sharding state related to a container including
lower and upper bounds that define the object namespace for which the
@ -5398,41 +5683,25 @@ class ShardRange(object):
SHARDING_STATES = (SHARDING, SHARDED)
CLEAVING_STATES = SHRINKING_STATES + SHARDING_STATES
@functools.total_ordering
class MaxBound(ShardRangeOuterBound):
# singleton for maximum bound
def __ge__(self, other):
return True
@functools.total_ordering
class MinBound(ShardRangeOuterBound):
# singleton for minimum bound
def __le__(self, other):
return True
MIN = MinBound()
MAX = MaxBound()
__slots__ = (
'account', 'container',
'_timestamp', '_meta_timestamp', '_state_timestamp', '_epoch',
'_lower', '_upper', '_deleted', '_state', '_count', '_bytes',
'_deleted', '_state', '_count', '_bytes',
'_tombstones', '_reported')
def __init__(self, name, timestamp, lower=MIN, upper=MAX,
def __init__(self, name, timestamp,
lower=Namespace.MIN, upper=Namespace.MAX,
object_count=0, bytes_used=0, meta_timestamp=None,
deleted=False, state=None, state_timestamp=None, epoch=None,
reported=False, tombstones=-1):
super(ShardRange, self).__init__(name=name, lower=lower, upper=upper)
self.account = self.container = self._timestamp = \
self._meta_timestamp = self._state_timestamp = self._epoch = None
self._lower = ShardRange.MIN
self._upper = ShardRange.MAX
self._deleted = False
self._state = None
self.name = name
self.timestamp = timestamp
self.lower = lower
self.upper = upper
self.deleted = deleted
self.object_count = object_count
self.bytes_used = bytes_used
@ -5450,24 +5719,6 @@ class ShardRange(object):
# a key assumption for bisect, which is used by utils.find_shard_range
return sr.upper, sr.state, sr.lower, sr.name
@classmethod
def _encode(cls, value):
if six.PY2 and isinstance(value, six.text_type):
return value.encode('utf-8')
if six.PY3 and isinstance(value, six.binary_type):
# This should never fail -- the value should always be coming from
# valid swift paths, which means UTF-8
return value.decode('utf-8')
return value
def _encode_bound(self, bound):
if isinstance(bound, ShardRangeOuterBound):
return bound
if not (isinstance(bound, six.text_type) or
isinstance(bound, six.binary_type)):
raise TypeError('must be a string type')
return self._encode(bound)
def is_child_of(self, parent):
"""
Test if this shard range is a child of another shard range. The
@ -5638,56 +5889,10 @@ class ShardRange(object):
def meta_timestamp(self, ts):
self._meta_timestamp = self._to_timestamp(ts)
@property
def lower(self):
return self._lower
@property
def lower_str(self):
return str(self.lower)
@lower.setter
def lower(self, value):
if value is None or (value == b"" if isinstance(value, bytes) else
value == u""):
value = ShardRange.MIN
try:
value = self._encode_bound(value)
except TypeError as err:
raise TypeError('lower %s' % err)
if value > self._upper:
raise ValueError(
'lower (%r) must be less than or equal to upper (%r)' %
(value, self.upper))
self._lower = value
@property
def end_marker(self):
return self.upper_str + '\x00' if self.upper else ''
@property
def upper(self):
return self._upper
@property
def upper_str(self):
return str(self.upper)
@upper.setter
def upper(self, value):
if value is None or (value == b"" if isinstance(value, bytes) else
value == u""):
value = ShardRange.MAX
try:
value = self._encode_bound(value)
except TypeError as err:
raise TypeError('upper %s' % err)
if value < self._lower:
raise ValueError(
'upper (%r) must be greater than or equal to lower (%r)' %
(value, self.lower))
self._upper = value
@property
def object_count(self):
return self._count
@ -5895,56 +6100,12 @@ class ShardRange(object):
self.timestamp = timestamp or Timestamp.now()
return True
def __contains__(self, item):
# test if the given item is within the namespace
if item == '':
return False
item = self._encode_bound(item)
return self.lower < item <= self.upper
def __lt__(self, other):
# a ShardRange is less than other if its entire namespace is less than
# other; if other is another ShardRange that implies that this
# ShardRange's upper must be less than or equal to the other
# ShardRange's lower
if self.upper == ShardRange.MAX:
return False
if isinstance(other, ShardRange):
return self.upper <= other.lower
elif other is None:
return True
else:
return self.upper < self._encode(other)
def __gt__(self, other):
# a ShardRange is greater than other if its entire namespace is greater
# than other; if other is another ShardRange that implies that this
# ShardRange's lower must be greater than or equal to the other
# ShardRange's upper
if self.lower == ShardRange.MIN:
return False
if isinstance(other, ShardRange):
return self.lower >= other.upper
elif other is None:
return False
else:
return self.lower >= self._encode(other)
def __eq__(self, other):
# test for equality of range bounds only
if not isinstance(other, ShardRange):
return False
return self.lower == other.lower and self.upper == other.upper
# A by-the-book implementation should probably hash the value, which
# in our case would be account+container+lower+upper (+timestamp ?).
# But we seem to be okay with just the identity.
def __hash__(self):
return id(self)
def __ne__(self, other):
return not (self == other)
def __repr__(self):
return '%s<%r to %r as of %s, (%d, %d) as of %s, %s as of %s>' % (
self.__class__.__name__, self.lower, self.upper,
@ -5952,34 +6113,6 @@ class ShardRange(object):
self.meta_timestamp.internal, self.state_text,
self.state_timestamp.internal)
def entire_namespace(self):
"""
Returns True if the ShardRange includes the entire namespace, False
otherwise.
"""
return (self.lower == ShardRange.MIN and
self.upper == ShardRange.MAX)
def overlaps(self, other):
"""
Returns True if the ShardRange namespace overlaps with the other
ShardRange's namespace.
:param other: an instance of :class:`~swift.common.utils.ShardRange`
"""
if not isinstance(other, ShardRange):
return False
return max(self.lower, other.lower) < min(self.upper, other.upper)
def includes(self, other):
"""
Returns True if this namespace includes the whole of the other
namespace, False otherwise.
:param other: an instance of :class:`~swift.common.utils.ShardRange`
"""
return (self.lower <= other.lower) and (other.upper <= self.upper)
def __iter__(self):
yield 'name', self.name
yield 'timestamp', self.timestamp.internal
@ -6028,26 +6161,6 @@ class ShardRange(object):
params['state_timestamp'], params['epoch'],
params.get('reported', 0), params.get('tombstones', -1))
def expand(self, donors):
"""
Expands the bounds as necessary to match the minimum and maximum bounds
of the given donors.
:param donors: A list of :class:`~swift.common.utils.ShardRange`
:return: True if the bounds have been modified, False otherwise.
"""
modified = False
new_lower = self.lower
new_upper = self.upper
for donor in donors:
new_lower = min(new_lower, donor.lower)
new_upper = max(new_upper, donor.upper)
if self.lower > new_lower or self.upper < new_upper:
self.lower = new_lower
self.upper = new_upper
modified = True
return modified
class ShardRangeList(UserList):
"""
@ -6057,6 +6170,7 @@ class ShardRangeList(UserList):
This class does not enforce ordering or continuity of the list items:
callers should ensure that items are added in order as appropriate.
"""
def __getitem__(self, index):
# workaround for py3 - not needed for py2.7,py3.8
result = self.data[index]
@ -6069,27 +6183,27 @@ class ShardRangeList(UserList):
only be equal to the lowest bound of all items in the list if the list
contents has been sorted.
:return: lower bound of first item in the list, or ShardRange.MIN
:return: lower bound of first item in the list, or Namespace.MIN
if the list is empty.
"""
if not self:
# empty list has range MIN->MIN
return ShardRange.MIN
return Namespace.MIN
return self[0].lower
@property
def upper(self):
"""
Returns the upper bound of the first item in the list. Note: this will
Returns the upper bound of the last item in the list. Note: this will
only be equal to the uppermost bound of all items in the list if the
list has previously been sorted.
:return: upper bound of first item in the list, or ShardRange.MIN
:return: upper bound of last item in the list, or Namespace.MIN
if the list is empty.
"""
if not self:
# empty list has range MIN->MIN
return ShardRange.MIN
return Namespace.MIN
return self[-1].upper
@property
@ -6231,7 +6345,7 @@ def filter_shard_ranges(shard_ranges, includes, marker, end_marker):
if marker or end_marker:
return list(filter(shard_range_filter, shard_ranges))
if marker == ShardRange.MAX or end_marker == ShardRange.MIN:
if marker == Namespace.MAX or end_marker == Namespace.MIN:
# MIN and MAX are both Falsy so not handled by shard_range_filter
return []
@ -6590,6 +6704,7 @@ class NoopMutex(object):
of which have the message-interleaving trouble you'd expect from TCP or
file handlers.
"""
def __init__(self):
# Usually, it's an error to have multiple greenthreads all waiting
# to write to the same file descriptor. It's often a sign of inadequate
@ -6857,6 +6972,7 @@ class Watchdog(object):
=> the exception is raised, then the greenlet watchdog sleep(3) to
wake up for the 1st timeout expiration
"""
def __init__(self):
# key => (timeout, timeout_at, caller_greenthread, exception)
self._timeouts = dict()
@ -6946,6 +7062,7 @@ class WatchdogTimeout(object):
"""
Context manager to schedule a timeout in a Watchdog instance
"""
def __init__(self, watchdog, timeout, exc, timeout_at=None):
"""
Schedule a timeout in a Watchdog instance

View File

@ -615,7 +615,10 @@ def get_cache_key(account, container=None, obj=None, shard=None):
raise ValueError('Shard cache key requires account and container')
if obj:
raise ValueError('Shard cache key cannot have obj')
cache_key = 'shard-%s/%s/%s' % (shard, account, container)
if shard == 'updating':
cache_key = 'shard-%s-v2/%s/%s' % (shard, account, container)
else:
cache_key = 'shard-%s/%s/%s' % (shard, account, container)
elif obj:
if not (account and container):
raise ValueError('Object cache key requires account and container')

View File

@ -48,7 +48,7 @@ from swift.common.utils import (
normalize_delete_at_timestamp, public, get_expirer_container,
document_iters_to_http_response_body, parse_content_range,
quorum_size, reiterate, close_if_possible, safe_json_loads, md5,
ShardRange, find_shard_range, cache_from_env)
ShardRange, find_shard_range, cache_from_env, NamespaceBoundList)
from swift.common.bufferedhttp import http_connect
from swift.common.constraints import check_metadata, check_object_creation
from swift.common import constraints
@ -278,37 +278,67 @@ class BaseObjectController(Controller):
"""Handler for HTTP HEAD requests."""
return self.GETorHEAD(req)
def _get_cached_updating_shard_ranges(
def _get_cached_updating_namespaces(
self, infocache, memcache, cache_key):
"""
Fetch cached shard ranges from infocache and memcache.
Fetch cached updating namespaces of updating shard ranges from
infocache and memcache.
:param infocache: the infocache instance.
:param memcache: an instance of a memcache client,
:class:`swift.common.memcached.MemcacheRing`.
:param cache_key: the cache key for both infocache and memcache.
:return: a tuple of (list of shard ranges in dict format, cache state)
:return: a tuple of (an instance of NamespaceBoundList, cache state)
"""
cached_ranges = infocache.get(cache_key)
if cached_ranges:
cache_state = 'infocache_hit'
# try get namespaces from infocache first
namespace_list = infocache.get(cache_key)
if namespace_list:
return namespace_list, 'infocache_hit'
# then try get them from memcache
if not memcache:
return None, 'disabled'
skip_chance = self.app.container_updating_shard_ranges_skip_cache
if skip_chance and random.random() < skip_chance:
return None, 'skip'
try:
namespaces = memcache.get(cache_key, raise_on_error=True)
cache_state = 'hit' if namespaces else 'miss'
except MemcacheConnectionError:
namespaces = None
cache_state = 'error'
if namespaces:
if six.PY2:
# json.loads() in memcache.get will convert json 'string' to
# 'unicode' with python2, here we cast 'unicode' back to 'str'
namespaces = [
[lower.encode('utf-8'), name.encode('utf-8')]
for lower, name in namespaces]
namespace_list = NamespaceBoundList(namespaces)
else:
if memcache:
skip_chance = \
self.app.container_updating_shard_ranges_skip_cache
if skip_chance and random.random() < skip_chance:
cache_state = 'skip'
else:
try:
cached_ranges = memcache.get(
cache_key, raise_on_error=True)
cache_state = 'hit' if cached_ranges else 'miss'
except MemcacheConnectionError:
cache_state = 'error'
else:
cache_state = 'disabled'
cached_ranges = cached_ranges or []
return cached_ranges, cache_state
namespace_list = None
return namespace_list, cache_state
def _get_update_shard_caching_disabled(self, req, account, container, obj):
    """
    Look up the updating shard range for ``obj`` by asking the container
    server directly; used for the given root container when all caching
    is disabled.

    :param req: original Request instance.
    :param account: account from which shard ranges should be fetched.
    :param container: container from which shard ranges should be fetched.
    :param obj: object getting updated.
    :return: an instance of :class:`swift.common.utils.ShardRange`,
        or None if the update should go back to the root
    """
    # legacy behavior: the container server is asked with includes=obj
    found, backend_resp = self._get_shard_ranges(
        req, account, container, states='updating', includes=obj)
    record_cache_op_metrics(
        self.logger, 'shard_updating', 'disabled', backend_resp)
    if not found:
        return None
    # there will be only one shard range in the list if any
    return found[0]
def _get_update_shard(self, req, account, container, obj):
"""
@ -327,39 +357,41 @@ class BaseObjectController(Controller):
"""
if not self.app.recheck_updating_shard_ranges:
# caching is disabled
cache_state = 'disabled'
# legacy behavior requests container server for includes=obj
shard_ranges, response = self._get_shard_ranges(
req, account, container, states='updating', includes=obj)
else:
# try to get from cache
response = None
cache_key = get_cache_key(account, container, shard='updating')
infocache = req.environ.setdefault('swift.infocache', {})
memcache = cache_from_env(req.environ, True)
(cached_ranges, cache_state
) = self._get_cached_updating_shard_ranges(
infocache, memcache, cache_key)
if cached_ranges:
# found cached shard ranges in either infocache or memcache
infocache[cache_key] = tuple(cached_ranges)
shard_ranges = [ShardRange.from_dict(shard_range)
for shard_range in cached_ranges]
else:
# pull full set of updating shards from backend
shard_ranges, response = self._get_shard_ranges(
req, account, container, states='updating')
if shard_ranges:
cached_ranges = [dict(sr) for sr in shard_ranges]
infocache[cache_key] = tuple(cached_ranges)
if memcache:
memcache.set(
cache_key, cached_ranges,
time=self.app.recheck_updating_shard_ranges)
return self._get_update_shard_caching_disabled(
req, account, container, obj)
# caching is enabled, try to get from caches
response = None
cache_key = get_cache_key(account, container, shard='updating')
infocache = req.environ.setdefault('swift.infocache', {})
memcache = cache_from_env(req.environ, True)
cached_namespaces, cache_state = self._get_cached_updating_namespaces(
infocache, memcache, cache_key)
if cached_namespaces:
# found cached namespaces in either infocache or memcache
infocache[cache_key] = cached_namespaces
namespace = cached_namespaces.get_namespace(obj)
update_shard = ShardRange(
name=namespace.name, timestamp=0, lower=namespace.lower,
upper=namespace.upper)
else:
# pull full set of updating shard ranges from backend
shard_ranges, response = self._get_shard_ranges(
req, account, container, states='updating')
if shard_ranges:
# only store the list of namespace lower bounds and names into
# infocache and memcache.
cached_namespaces = NamespaceBoundList.parse(
shard_ranges)
infocache[cache_key] = cached_namespaces
if memcache:
memcache.set(
cache_key, cached_namespaces.bounds,
time=self.app.recheck_updating_shard_ranges)
update_shard = find_shard_range(obj, shard_ranges or [])
record_cache_op_metrics(
self.logger, 'shard_updating', cache_state, response)
return find_shard_range(obj, shard_ranges or [])
return update_shard
def _get_update_target(self, req, container_info):
# find the sharded container to which we'll send the update

View File

@ -5910,6 +5910,7 @@ class UnsafeXrange(object):
"""
Like range(limit), but with extra context switching to screw things up.
"""
def __init__(self, upper_bound):
self.current = 0
self.concurrent_calls = 0
@ -8211,6 +8212,83 @@ class TestShardName(unittest.TestCase):
utils.ShardName.create('a', 'root', None, '1235678', 'bad')
class TestNamespace(unittest.TestCase):
    def test_total_ordering(self):
        # namespaces of account 'a', listed in namespace order
        ns_start = utils.Namespace('a/-a', '', 'a')
        ns_a_b = utils.Namespace('a/a-b', 'a', 'b')
        ns_a_f = utils.Namespace('a/a-f', 'a', 'f')
        ns_f_l = utils.Namespace('a/f-l', 'f', 'l')
        ns_l_r = utils.Namespace('a/l-r', 'l', 'r')
        ns_r_z = utils.Namespace('a/r-z', 'r', 'z')
        ns_end = utils.Namespace('a/z-', 'z', '')
        # same bounds as ns_start but a different name
        other_start = utils.Namespace('b/-a', '', 'a')

        # equality
        self.assertEqual(ns_start, other_start)
        self.assertNotEqual(ns_start, ns_a_b)

        # less than / less than or equal
        strictly_less = (
            (ns_start, ns_a_b),
            (ns_a_f, ns_f_l),
            (ns_f_l, ns_l_r),
            (ns_l_r, ns_r_z),
            (ns_r_z, ns_end),
        )
        for smaller, larger in strictly_less:
            self.assertLess(smaller, larger)
        for smaller, larger in ((ns_start, ns_a_f), (ns_a_f, ns_r_z)):
            self.assertLessEqual(smaller, larger)

        # greater than / greater than or equal
        strictly_greater = (
            (ns_end, ns_a_f),
            (ns_r_z, ns_f_l),
            (ns_end, ns_start),
        )
        for larger, smaller in strictly_greater:
            self.assertGreater(larger, smaller)
        for larger, smaller in ((ns_end, ns_a_f), (ns_r_z, ns_start)):
            self.assertGreaterEqual(larger, smaller)
class TestNamespaceBoundList(unittest.TestCase):
    def test_functions(self):
        # the expected namespaces, in namespace order ...
        ns_start = utils.Namespace('a/-a', '', 'a')
        ns_a_f = utils.Namespace('a/a-f', 'a', 'f')
        ns_f_l = utils.Namespace('a/f-l', 'f', 'l')
        ns_l_r = utils.Namespace('a/l-r', 'l', 'r')
        ns_r_z = utils.Namespace('a/r-z', 'r', 'z')
        ns_end = utils.Namespace('a/z-', 'z', '')
        # ... and their compact [lower bound, name] equivalents
        expected_bounds = [
            ['', 'a/-a'],
            ['a', 'a/a-f'],
            ['f', 'a/f-l'],
            ['l', 'a/l-r'],
            ['r', 'a/r-z'],
            ['z', 'a/z-'],
        ]
        bound_list = utils.NamespaceBoundList(expected_bounds)

        # test 'get_namespace'
        lookups = (
            ('1', ns_start),
            ('a', ns_start),
            ('b', ns_a_f),
            ('f', ns_a_f),
            ('f\x00', ns_f_l),
            ('l', ns_f_l),
            ('x', ns_r_z),
            ('r', ns_l_r),
            ('}', ns_end),
        )
        for item, expected in lookups:
            self.assertEqual(bound_list.get_namespace(item), expected)

        # test 'parse'
        self.assertEqual(utils.NamespaceBoundList.parse(None), None)

        contiguous = [ns_start, ns_a_f, ns_f_l, ns_l_r, ns_r_z, ns_end]
        bound_list = utils.NamespaceBoundList.parse(contiguous)
        parsed_lookups = (
            ('1', ns_start),
            ('l', ns_f_l),
            ('x', ns_r_z),
            ('r', ns_l_r),
            ('}', ns_end),
        )
        for item, expected in parsed_lookups:
            self.assertEqual(bound_list.get_namespace(item), expected)
        self.assertEqual(bound_list.bounds, expected_bounds)

        # an overlapping namespace is dropped, leaving the bounds unchanged
        overlap_to_f = utils.Namespace('a/-f', '', 'f')
        with_overlap = [ns_start, ns_a_f, overlap_to_f,
                        ns_f_l, ns_l_r, ns_r_z, ns_end]
        bound_list = utils.NamespaceBoundList.parse(with_overlap)
        self.assertEqual(bound_list.bounds, expected_bounds)

        overlap_to_l = utils.Namespace('a/a-l', 'a', 'l')
        with_overlap = [ns_start, ns_a_f, ns_f_l,
                        overlap_to_l, ns_l_r, ns_r_z, ns_end]
        bound_list = utils.NamespaceBoundList.parse(with_overlap)
        self.assertEqual(bound_list.bounds, expected_bounds)
class TestShardRange(unittest.TestCase):
def setUp(self):
self.ts_iter = make_timestamp_iter()

View File

@ -501,7 +501,7 @@ class TestFuncs(BaseTest):
self.assertEqual(get_cache_key("account", "cont", shard="listing"),
'shard-listing/account/cont')
self.assertEqual(get_cache_key("account", "cont", shard="updating"),
'shard-updating/account/cont')
'shard-updating-v2/account/cont')
self.assertRaises(ValueError,
get_cache_key, "account", shard="listing")
self.assertRaises(ValueError,

View File

@ -71,7 +71,7 @@ from swift.common import utils, constraints, registry
from swift.common.utils import hash_path, storage_directory, \
parse_content_type, parse_mime_headers, StatsdClient, \
iter_multipart_mime_documents, public, mkdirs, NullLogger, md5, \
node_to_string
node_to_string, NamespaceBoundList
from swift.common.wsgi import loadapp, ConfigString
from swift.common.http_protocol import SwiftHttpProtocol
from swift.proxy.controllers import base as proxy_base
@ -4370,13 +4370,16 @@ class TestReplicatedObjectController(
params={'states': 'updating'},
headers={'X-Backend-Record-Type': 'shard'})
cache_key = 'shard-updating/a/c'
cache_key = 'shard-updating-v2/a/c'
self.assertIn(cache_key, req.environ['swift.cache'].store)
self.assertEqual(req.environ['swift.cache'].store[cache_key],
[dict(sr) for sr in shard_ranges])
cached_namespaces = NamespaceBoundList.parse(shard_ranges)
self.assertEqual(
req.environ['swift.cache'].store[cache_key],
cached_namespaces.bounds)
self.assertIn(cache_key, req.environ.get('swift.infocache'))
self.assertEqual(req.environ['swift.infocache'][cache_key],
tuple(dict(sr) for sr in shard_ranges))
self.assertEqual(
req.environ['swift.infocache'][cache_key].bounds,
cached_namespaces.bounds)
# make sure backend requests included expected container headers
container_headers = {}
@ -4433,8 +4436,11 @@ class TestReplicatedObjectController(
'.shards_a/c_nope', utils.Timestamp.now(), 'u', ''),
]
cache = FakeMemcache()
cache.set('shard-updating/a/c', tuple(
dict(shard_range) for shard_range in shard_ranges))
cache.set(
'shard-updating-v2/a/c',
tuple(
[shard_range.lower_str, str(shard_range.name)]
for shard_range in shard_ranges))
req = Request.blank('/v1/a/c/o', {'swift.cache': cache},
method=method, body='',
headers={'Content-Type': 'text/plain'})
@ -4467,10 +4473,11 @@ class TestReplicatedObjectController(
container_request, method='HEAD', path='/sda/0/a/c')
# infocache gets populated from memcache
cache_key = 'shard-updating/a/c'
cache_key = 'shard-updating-v2/a/c'
self.assertIn(cache_key, req.environ.get('swift.infocache'))
self.assertEqual(req.environ['swift.infocache'][cache_key],
tuple(dict(sr) for sr in shard_ranges))
self.assertEqual(
req.environ['swift.infocache'][cache_key].bounds,
NamespaceBoundList.parse(shard_ranges).bounds)
# make sure backend requests included expected container headers
container_headers = {}
@ -4527,8 +4534,8 @@ class TestReplicatedObjectController(
'.shards_a/c_nope', utils.Timestamp.now(), 'u', ''),
]
infocache = {
'shard-updating/a/c':
tuple(dict(shard_range) for shard_range in shard_ranges)}
'shard-updating-v2/a/c':
NamespaceBoundList.parse(shard_ranges)}
req = Request.blank('/v1/a/c/o', {'swift.infocache': infocache},
method=method, body='',
headers={'Content-Type': 'text/plain'})
@ -4560,10 +4567,11 @@ class TestReplicatedObjectController(
container_request, method='HEAD', path='/sda/0/a/c')
# verify content in infocache.
cache_key = 'shard-updating/a/c'
cache_key = 'shard-updating-v2/a/c'
self.assertIn(cache_key, req.environ.get('swift.infocache'))
self.assertEqual(req.environ['swift.infocache'][cache_key],
tuple(dict(sr) for sr in shard_ranges))
self.assertEqual(
req.environ['swift.infocache'][cache_key].bounds,
NamespaceBoundList.parse(shard_ranges).bounds)
# make sure backend requests included expected container headers
container_headers = {}
@ -4621,8 +4629,10 @@ class TestReplicatedObjectController(
'.shards_a/c_no_way', utils.Timestamp.now(), 'u', ''),
]
cache = FakeMemcache()
cache.set('shard-updating/a/c', tuple(
dict(shard_range) for shard_range in cached_shard_ranges))
cache.set('shard-updating-v2/a/c',
tuple(
[sr.lower_str, str(sr.name)]
for sr in cached_shard_ranges))
# sanity check: we can get the old shard from cache
req = Request.blank(
@ -4636,7 +4646,7 @@ class TestReplicatedObjectController(
'x-backend-sharding-state': sharding_state,
'X-Backend-Record-Type': 'shard'}
with mock.patch('random.random', return_value=1), \
mocked_http_conn(*status_codes, headers=resp_headers):
mocked_http_conn(*status_codes, headers=resp_headers):
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 202)
@ -4646,13 +4656,16 @@ class TestReplicatedObjectController(
'object.shard_updating.cache.hit': 1}, stats)
# cached shard ranges are still there
cache_key = 'shard-updating/a/c'
cache_key = 'shard-updating-v2/a/c'
self.assertIn(cache_key, req.environ['swift.cache'].store)
self.assertEqual(req.environ['swift.cache'].store[cache_key],
[dict(sr) for sr in cached_shard_ranges])
cached_namespaces = NamespaceBoundList.parse(cached_shard_ranges)
self.assertEqual(
req.environ['swift.cache'].store[cache_key],
cached_namespaces.bounds)
self.assertIn(cache_key, req.environ.get('swift.infocache'))
self.assertEqual(req.environ['swift.infocache'][cache_key],
tuple(dict(sr) for sr in cached_shard_ranges))
self.assertEqual(
req.environ['swift.infocache'][cache_key].bounds,
cached_namespaces.bounds)
# ...but we have some chance to skip cache
req = Request.blank(
@ -4675,8 +4688,8 @@ class TestReplicatedObjectController(
dict(shard_range)
for shard_range in shard_ranges]).encode('ascii')
with mock.patch('random.random', return_value=0), \
mocked_http_conn(*status_codes, headers=resp_headers,
body=body) as fake_conn:
mocked_http_conn(*status_codes, headers=resp_headers,
body=body) as fake_conn:
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 202)
@ -4698,13 +4711,16 @@ class TestReplicatedObjectController(
headers={'X-Backend-Record-Type': 'shard'})
# and skipping cache will refresh it
cache_key = 'shard-updating/a/c'
cache_key = 'shard-updating-v2/a/c'
self.assertIn(cache_key, req.environ['swift.cache'].store)
self.assertEqual(req.environ['swift.cache'].store[cache_key],
[dict(sr) for sr in shard_ranges])
cached_namespaces = NamespaceBoundList.parse(shard_ranges)
self.assertEqual(
req.environ['swift.cache'].store[cache_key],
cached_namespaces.bounds)
self.assertIn(cache_key, req.environ.get('swift.infocache'))
self.assertEqual(req.environ['swift.infocache'][cache_key],
tuple(dict(sr) for sr in shard_ranges))
self.assertEqual(
req.environ['swift.infocache'][cache_key].bounds,
cached_namespaces.bounds)
# make sure backend requests included expected container headers
container_headers = {}
@ -4805,7 +4821,7 @@ class TestReplicatedObjectController(
headers={'X-Backend-Record-Type': 'shard'})
# infocache does not get populated from memcache
cache_key = 'shard-updating/a/c'
cache_key = 'shard-updating-v2/a/c'
self.assertNotIn(cache_key, req.environ.get('swift.infocache'))
# make sure backend requests included expected container headers