Container-server: add container namespaces GET
The proxy-server makes GET requests to the container server to fetch full lists of shard ranges when handling object PUT/POST/DELETE and container GETs, then it only stores the Namespace attributes (lower and name) of the shard ranges into Memcache and reconstructs the list of Namespaces based on those attributes. Thus, a namespaces GET interface can be added into the backend container-server to only return a list of those Namespace attributes. On a container server setup which serves a container with ~12000 shard ranges, benchmarking results show that the request rate of the HTTP GET all namespaces (states=updating) is ~12 op/s, while the HTTP GET all shard ranges (states=updating) is ~3.2 op/s. The new namespace GET interface supports most of headers and parameters supported by shard range GET interface. For example, the support of marker, end_marker, include, reverse and etc. Two exceptions are: 'x-backend-include-deleted' cannot be supported because there is no way for a Namespace to indicate the deleted state; the 'auditing' state query parameter is not supported because it is specific to the sharder which only requests full shard ranges. Co-Authored-By: Matthew Oliver <matt@oliver.net.au> Co-Authored-By: Alistair Coles <alistairncoles@gmail.com> Co-Authored-By: Clay Gerrard <clay.gerrard@gmail.com> Change-Id: If152942c168d127de13e11e8da00a5760de5ae0d
This commit is contained in:
parent
c1c41a145e
commit
c073933387
@ -280,7 +280,7 @@ def backward(f, blocksize=4096):
|
||||
|
||||
|
||||
# Used when reading config values
|
||||
TRUE_VALUES = set(('true', '1', 'yes', 'on', 't', 'y'))
|
||||
TRUE_VALUES = {'true', '1', 'yes', 'on', 't', 'y'}
|
||||
|
||||
|
||||
def non_negative_float(value):
|
||||
@ -5096,11 +5096,18 @@ class ShardRange(Namespace):
|
||||
|
||||
@classmethod
|
||||
def sort_key(cls, sr):
|
||||
return cls.sort_key_order(sr.name, sr.lower, sr.upper, sr.state)
|
||||
|
||||
@staticmethod
|
||||
def sort_key_order(name, lower, upper, state):
|
||||
# Use Namespace.MaxBound() for upper bound '', this will allow this
|
||||
# record to be sorted correctly by upper.
|
||||
upper = upper if upper else Namespace.MaxBound()
|
||||
# defines the sort order for shard ranges
|
||||
# note if this ever changes to *not* sort by upper first then it breaks
|
||||
# a key assumption for bisect, which is used by utils.find_namespace
|
||||
# with shard ranges.
|
||||
return sr.upper, sr.state, sr.lower, sr.name
|
||||
return upper, state, lower, name
|
||||
|
||||
def is_child_of(self, parent):
|
||||
"""
|
||||
|
@ -1685,6 +1685,125 @@ class ContainerBroker(DatabaseBroker):
|
||||
if ('no such table: %s' % SHARD_RANGE_TABLE) not in str(err):
|
||||
raise
|
||||
|
||||
def _make_filler_shard_range(self, namespaces, marker, end_marker):
|
||||
if namespaces and namespaces[-1].upper == Namespace.MAX:
|
||||
return None
|
||||
|
||||
# Insert a modified copy of own shard range to fill any gap between the
|
||||
# end of any found and the upper bound of own shard range. Gaps
|
||||
# enclosed within the found shard ranges are not filled.
|
||||
own_shard_range = self.get_own_shard_range()
|
||||
if namespaces:
|
||||
last_upper = namespaces[-1].upper
|
||||
else:
|
||||
last_upper = max(marker or own_shard_range.lower,
|
||||
own_shard_range.lower)
|
||||
required_upper = min(end_marker or own_shard_range.upper,
|
||||
own_shard_range.upper)
|
||||
if required_upper > last_upper:
|
||||
filler_sr = own_shard_range
|
||||
filler_sr.lower = last_upper
|
||||
filler_sr.upper = required_upper
|
||||
return filler_sr
|
||||
else:
|
||||
return None
|
||||
|
||||
def get_namespaces(self, marker=None, end_marker=None, includes=None,
|
||||
reverse=False, states=None, fill_gaps=False):
|
||||
"""
|
||||
Returns a list of persisted namespaces per input parameters.
|
||||
|
||||
:param marker: restricts the returned list to shard ranges whose
|
||||
namespace includes or is greater than the marker value. If
|
||||
``reverse=True`` then ``marker`` is treated as ``end_marker``.
|
||||
``marker`` is ignored if ``includes`` is specified.
|
||||
:param end_marker: restricts the returned list to shard ranges whose
|
||||
namespace includes or is less than the end_marker value. If
|
||||
``reverse=True`` then ``end_marker`` is treated as ``marker``.
|
||||
``end_marker`` is ignored if ``includes`` is specified.
|
||||
:param includes: restricts the returned list to the shard range that
|
||||
includes the given value; if ``includes`` is specified then
|
||||
``fill_gaps``, ``marker`` and ``end_marker`` are ignored.
|
||||
:param reverse: reverse the result order.
|
||||
:param states: if specified, restricts the returned list to namespaces
|
||||
that have one of the given states; should be a list of ints.
|
||||
:param fill_gaps: if True, insert a modified copy of own shard range to
|
||||
fill any gap between the end of any found shard ranges and the
|
||||
upper bound of own shard range. Gaps enclosed within the found
|
||||
shard ranges are not filled.
|
||||
:return: a list of Namespace objects.
|
||||
"""
|
||||
if includes is None and (marker == Namespace.MAX
|
||||
or end_marker == Namespace.MIN):
|
||||
return []
|
||||
|
||||
if reverse:
|
||||
marker, end_marker = end_marker, marker
|
||||
if marker and end_marker and marker >= end_marker:
|
||||
return []
|
||||
|
||||
included_states = set(states) if states else None
|
||||
with self.get() as conn:
|
||||
# Namespace only needs 'name', 'lower' and 'upper', but the query
|
||||
# also need to include 'state' to be used when subesequently
|
||||
# sorting the rows. And the sorting can't be done within SQLite
|
||||
# since the value for maximum upper bound is an empty string.
|
||||
|
||||
conditions = ['deleted = 0', 'name != ?']
|
||||
params = [self.path]
|
||||
if included_states:
|
||||
conditions.append('state in (%s)' % ','.join(
|
||||
'?' * len(included_states)))
|
||||
params.extend(included_states)
|
||||
if includes is None:
|
||||
if end_marker:
|
||||
conditions.append('lower < ?')
|
||||
params.append(end_marker)
|
||||
if marker:
|
||||
conditions.append("(upper = '' OR upper > ?)")
|
||||
params.append(marker)
|
||||
else:
|
||||
conditions.extend(('lower < ?', "(upper = '' OR upper >= ?)"))
|
||||
params.extend((includes, includes))
|
||||
condition = ' WHERE ' + ' AND '.join(conditions)
|
||||
sql = '''
|
||||
SELECT name, lower, upper, state FROM %s%s
|
||||
''' % (SHARD_RANGE_TABLE, condition)
|
||||
try:
|
||||
data = conn.execute(sql, params)
|
||||
data.row_factory = None
|
||||
namespaces = [row for row in data]
|
||||
except sqlite3.OperationalError as err:
|
||||
if ('no such table: %s' % SHARD_RANGE_TABLE) in str(err):
|
||||
return []
|
||||
else:
|
||||
raise
|
||||
|
||||
# Sort those namespaces in order, note that each namespace record also
|
||||
# include additional attribute 'state'.
|
||||
def sort_key(namespace):
|
||||
return ShardRange.sort_key_order(name=namespace[0],
|
||||
lower=namespace[1],
|
||||
upper=namespace[2],
|
||||
state=namespace[3])
|
||||
namespaces.sort(key=sort_key)
|
||||
# Convert the record tuples to Namespace objects.
|
||||
namespaces = [Namespace(row[0], row[1], row[2]) for row in namespaces]
|
||||
if includes:
|
||||
return namespaces[:1] if namespaces else []
|
||||
|
||||
if fill_gaps:
|
||||
filler_sr = self._make_filler_shard_range(
|
||||
namespaces, marker, end_marker)
|
||||
if filler_sr:
|
||||
namespaces.append(Namespace(filler_sr.name,
|
||||
filler_sr.lower,
|
||||
filler_sr.upper))
|
||||
if reverse:
|
||||
namespaces.reverse()
|
||||
|
||||
return namespaces
|
||||
|
||||
def _get_shard_range_rows(self, connection=None, marker=None,
|
||||
end_marker=None, includes=None,
|
||||
include_deleted=False, states=None,
|
||||
@ -1901,18 +2020,9 @@ class ContainerBroker(DatabaseBroker):
|
||||
return shard_ranges[:1] if shard_ranges else []
|
||||
|
||||
if fill_gaps:
|
||||
own_shard_range = self.get_own_shard_range()
|
||||
if shard_ranges:
|
||||
last_upper = shard_ranges[-1].upper
|
||||
else:
|
||||
last_upper = max(marker or own_shard_range.lower,
|
||||
own_shard_range.lower)
|
||||
required_upper = min(end_marker or own_shard_range.upper,
|
||||
own_shard_range.upper)
|
||||
if required_upper > last_upper:
|
||||
filler_sr = own_shard_range
|
||||
filler_sr.lower = last_upper
|
||||
filler_sr.upper = required_upper
|
||||
filler_sr = self._make_filler_shard_range(
|
||||
shard_ranges, marker, end_marker)
|
||||
if filler_sr:
|
||||
shard_ranges.append(filler_sr)
|
||||
|
||||
if reverse:
|
||||
|
@ -612,15 +612,18 @@ class ContainerController(BaseStorageServer):
|
||||
resp.last_modified = Timestamp(headers['X-PUT-Timestamp']).ceil()
|
||||
return resp
|
||||
|
||||
def update_shard_record(self, record):
|
||||
def update_shard_record(self, record, shard_record_full=True):
|
||||
"""
|
||||
Return the shard_range database record as a dict, the keys will depend
|
||||
on the database fields provided in the record.
|
||||
|
||||
:param record: shard entry record.
|
||||
:param record: shard entry record, either ShardRange or Namespace.
|
||||
:param shard_record_full: boolean, when true the timestamp field is
|
||||
added as "last_modified" in iso format.
|
||||
:returns: dict suitable for listing responses
|
||||
"""
|
||||
response = dict(record)
|
||||
if shard_record_full:
|
||||
created = record.timestamp
|
||||
response['last_modified'] = Timestamp(created).isoformat
|
||||
return response
|
||||
@ -707,11 +710,11 @@ class ContainerController(BaseStorageServer):
|
||||
either the string or integer representation of
|
||||
:data:`~swift.common.utils.ShardRange.STATES`.
|
||||
|
||||
Two alias values may be used in a ``states`` parameter value:
|
||||
``listing`` will cause the listing to include all shard ranges in a
|
||||
state suitable for contributing to an object listing; ``updating``
|
||||
will cause the listing to include all shard ranges in a state
|
||||
suitable to accept an object update.
|
||||
Alias values may be used in a ``states`` parameter value. The
|
||||
``listing`` alias will cause the listing to include all shard ranges
|
||||
in a state suitable for contributing to an object listing. The
|
||||
``updating`` alias will cause the listing to include all shard ranges
|
||||
in a state suitable to accept an object update.
|
||||
|
||||
If either of these aliases is used then the shard range listing will
|
||||
if necessary be extended with a synthesised 'filler' range in order
|
||||
@ -720,6 +723,23 @@ class ContainerController(BaseStorageServer):
|
||||
uncovered tail of the requested name range and will point back to the
|
||||
same container.
|
||||
|
||||
The ``auditing`` alias will cause the listing to include all shard
|
||||
ranges in a state useful to the sharder while auditing a shard
|
||||
container. This alias will not cause a 'filler' range to be added,
|
||||
but will cause the container's own shard range to be included in the
|
||||
listing. For now, ``auditing`` is only supported when
|
||||
'X-Backend-Record-Shard-Format' is 'full'.
|
||||
|
||||
* Shard range listings can be simplified to include only Namespace
|
||||
only attributes (name, lower and upper) if the caller send the header
|
||||
``X-Backend-Record-Shard-Format`` with value 'namespace' as a hint
|
||||
that it would prefer namespaces. If this header doesn't exist or the
|
||||
value is 'full', the listings will default to include all attributes
|
||||
of shard ranges. But if params has includes/marker/end_marker then
|
||||
the response will be full shard ranges, regardless the header of
|
||||
``X-Backend-Record-Shard-Format``. The response header
|
||||
``X-Backend-Record-Type`` will tell the user what type it gets back.
|
||||
|
||||
* Listings are not normally returned from a deleted container. However,
|
||||
the ``X-Backend-Override-Deleted`` header may be used with a value in
|
||||
:attr:`swift.common.utils.TRUE_VALUES` to force a shard range
|
||||
@ -755,7 +775,7 @@ class ContainerController(BaseStorageServer):
|
||||
def GET_shard(self, req, broker, container, params, info,
|
||||
is_deleted, out_content_type):
|
||||
"""
|
||||
Returns a list of persisted shard ranges in response.
|
||||
Returns a list of persisted shard ranges or namespaces in response.
|
||||
|
||||
:param req: swob.Request object
|
||||
:param broker: container DB broker object
|
||||
@ -766,6 +786,14 @@ class ContainerController(BaseStorageServer):
|
||||
:param out_content_type: content type as a string.
|
||||
:returns: an instance of :class:`swift.common.swob.Response`
|
||||
"""
|
||||
override_deleted = info and config_true_value(
|
||||
req.headers.get('x-backend-override-deleted', False))
|
||||
resp_headers = gen_resp_headers(
|
||||
info, is_deleted=is_deleted and not override_deleted)
|
||||
|
||||
if is_deleted and not override_deleted:
|
||||
return HTTPNotFound(request=req, headers=resp_headers)
|
||||
|
||||
marker = params.get('marker', '')
|
||||
end_marker = params.get('end_marker')
|
||||
reverse = config_true_value(params.get('reverse'))
|
||||
@ -773,13 +801,6 @@ class ContainerController(BaseStorageServer):
|
||||
includes = params.get('includes')
|
||||
include_deleted = config_true_value(
|
||||
req.headers.get('x-backend-include-deleted', False))
|
||||
override_deleted = info and config_true_value(
|
||||
req.headers.get('x-backend-override-deleted', False))
|
||||
|
||||
resp_headers = gen_resp_headers(
|
||||
info, is_deleted=is_deleted and not override_deleted)
|
||||
if is_deleted and not override_deleted:
|
||||
return HTTPNotFound(request=req, headers=resp_headers)
|
||||
|
||||
resp_headers['X-Backend-Record-Type'] = 'shard'
|
||||
override_filter_hdr = req.headers.get(
|
||||
@ -804,11 +825,37 @@ class ContainerController(BaseStorageServer):
|
||||
except ValueError:
|
||||
return HTTPBadRequest(request=req, body='Bad state')
|
||||
|
||||
# For record type of 'shard', user can specify an additional header
|
||||
# to ask for list of Namespaces instead of full ShardRanges.
|
||||
# This will allow proxy server who is going to retrieve Namespace
|
||||
# to talk to older version of container servers who don't support
|
||||
# Namespace yet during upgrade.
|
||||
shard_format = req.headers.get(
|
||||
'x-backend-record-shard-format', 'full').lower()
|
||||
if shard_format == 'namespace':
|
||||
resp_headers['X-Backend-Record-Shard-Format'] = 'namespace'
|
||||
# Namespace GET does not support all the options of Shard Range
|
||||
# GET: 'x-backend-include-deleted' cannot be supported because
|
||||
# there is no way for a Namespace to indicate the deleted state;
|
||||
# the 'auditing' state query parameter is not supported because it
|
||||
# is specific to the sharder which only requests full shard ranges.
|
||||
if include_deleted:
|
||||
return HTTPBadRequest(
|
||||
request=req, body='No include_deleted for namespace GET')
|
||||
if include_own:
|
||||
return HTTPBadRequest(
|
||||
request=req, body='No auditing state for namespace GET')
|
||||
shard_format_full = False
|
||||
container_list = broker.get_namespaces(
|
||||
marker, end_marker, includes, reverse, states, fill_gaps)
|
||||
else:
|
||||
resp_headers['X-Backend-Record-Shard-Format'] = 'full'
|
||||
shard_format_full = True
|
||||
container_list = broker.get_shard_ranges(
|
||||
marker, end_marker, includes, reverse, states=states,
|
||||
include_deleted=include_deleted, fill_gaps=fill_gaps,
|
||||
include_own=include_own)
|
||||
listing = [self.update_shard_record(record)
|
||||
listing = [self.update_shard_record(record, shard_format_full)
|
||||
for record in container_list]
|
||||
return self._create_GET_response(req, out_content_type, info,
|
||||
resp_headers, broker.metadata,
|
||||
|
@ -203,8 +203,22 @@ class BaseTestContainerSharding(ReplProbeTest):
|
||||
resp = self.get_container_listing(account, container, headers,
|
||||
params=params)
|
||||
self.assertEqual('shard', resp.headers.get('X-Backend-Record-Type'))
|
||||
self.assertEqual('full',
|
||||
resp.headers.get('X-Backend-Record-Shard-Format'))
|
||||
return [ShardRange.from_dict(sr) for sr in json.loads(resp.body)]
|
||||
|
||||
def get_container_namespaces(self, account=None, container=None,
|
||||
headers=None, params=None):
|
||||
headers = dict(headers) if headers else {}
|
||||
headers.update({'X-Backend-Record-Type': 'shard',
|
||||
'X-Backend-Record-Shard-Format': 'namespace'})
|
||||
resp = self.get_container_listing(account, container, headers,
|
||||
params=params)
|
||||
self.assertEqual('shard', resp.headers.get('X-Backend-Record-Type'))
|
||||
self.assertEqual('namespace',
|
||||
resp.headers.get('X-Backend-Record-Shard-Format'))
|
||||
return [Namespace(**ns) for ns in json.loads(resp.body)]
|
||||
|
||||
def direct_get_container_shard_ranges(self, account=None, container=None,
|
||||
expect_failure=False):
|
||||
collector = ShardCollector()
|
||||
@ -2974,6 +2988,8 @@ class TestShardedAPI(BaseTestContainerSharding):
|
||||
|
||||
orig_shard_ranges = self.get_container_shard_ranges()
|
||||
self.assertEqual(2, len(orig_shard_ranges))
|
||||
namespaces = self.get_container_namespaces()
|
||||
self._assert_namespace_equivalence(orig_shard_ranges, namespaces)
|
||||
|
||||
# the container is sharded so *all* shard ranges should satisfy
|
||||
# updating and listing state aliases
|
||||
@ -3005,33 +3021,57 @@ class TestShardedAPI(BaseTestContainerSharding):
|
||||
shard_ranges = self.get_container_shard_ranges(
|
||||
params={'includes': all_obj_names[1]})
|
||||
self._assert_namespace_equivalence(orig_shard_ranges[:1], shard_ranges)
|
||||
namespaces = self.get_container_namespaces(
|
||||
params={'includes': all_obj_names[1]})
|
||||
self._assert_namespace_equivalence(shard_ranges, namespaces)
|
||||
|
||||
shard_ranges = self.get_container_shard_ranges(
|
||||
# override 'includes'
|
||||
headers={'X-Backend-Override-Shard-Name-Filter': 'sharded'},
|
||||
params={'includes': all_obj_names[1]})
|
||||
self._assert_namespace_equivalence(orig_shard_ranges, shard_ranges)
|
||||
namespaces = self.get_container_namespaces(
|
||||
# override 'includes'
|
||||
headers={'X-Backend-Override-Shard-Name-Filter': 'sharded'},
|
||||
params={'includes': all_obj_names[1]})
|
||||
self._assert_namespace_equivalence(shard_ranges, namespaces)
|
||||
|
||||
shard_ranges = self.get_container_shard_ranges(
|
||||
params={'end_marker': all_obj_names[1]})
|
||||
self._assert_namespace_equivalence(orig_shard_ranges[:1], shard_ranges)
|
||||
namespaces = self.get_container_namespaces(
|
||||
params={'end_marker': all_obj_names[1]})
|
||||
self._assert_namespace_equivalence(shard_ranges, namespaces)
|
||||
|
||||
shard_ranges = self.get_container_shard_ranges(
|
||||
# override 'end_marker'
|
||||
headers={'X-Backend-Override-Shard-Name-Filter': 'sharded'},
|
||||
params={'end_marker': all_obj_names[1]})
|
||||
self._assert_namespace_equivalence(orig_shard_ranges, shard_ranges)
|
||||
namespaces = self.get_container_namespaces(
|
||||
# override 'end_marker'
|
||||
headers={'X-Backend-Override-Shard-Name-Filter': 'sharded'},
|
||||
params={'end_marker': all_obj_names[1]})
|
||||
self._assert_namespace_equivalence(shard_ranges, namespaces)
|
||||
|
||||
shard_ranges = self.get_container_shard_ranges(
|
||||
params={'reverse': 'true'})
|
||||
self._assert_namespace_equivalence(list(reversed(orig_shard_ranges)),
|
||||
shard_ranges)
|
||||
namespaces = self.get_container_namespaces(
|
||||
params={'reverse': 'true'})
|
||||
self._assert_namespace_equivalence(shard_ranges, namespaces)
|
||||
|
||||
shard_ranges = self.get_container_shard_ranges(
|
||||
# override 'reverse'
|
||||
headers={'X-Backend-Override-Shard-Name-Filter': 'sharded'},
|
||||
params={'reverse': 'true'})
|
||||
self._assert_namespace_equivalence(orig_shard_ranges, shard_ranges)
|
||||
namespaces = self.get_container_namespaces(
|
||||
# override 'reverse'
|
||||
headers={'X-Backend-Override-Shard-Name-Filter': 'sharded'},
|
||||
params={'reverse': 'true'})
|
||||
self._assert_namespace_equivalence(shard_ranges, namespaces)
|
||||
|
||||
objs = self.get_container_objects()
|
||||
self.assertEqual(all_obj_names, [obj['name'] for obj in objs])
|
||||
|
@ -8600,6 +8600,42 @@ class TestShardRange(unittest.TestCase, BaseNamespaceShardRange):
|
||||
'a', 'root', 'parent', ts.internal, '3')
|
||||
self.assertEqual('a/root-%s-%s-3' % (parent_hash, ts.internal), actual)
|
||||
|
||||
def test_sort_key_order(self):
|
||||
self.assertEqual(
|
||||
utils.ShardRange.sort_key_order(
|
||||
name="a/c",
|
||||
lower='lower',
|
||||
upper='upper',
|
||||
state=utils.ShardRange.ACTIVE),
|
||||
('upper', utils.ShardRange.ACTIVE, 'lower', "a/c"))
|
||||
|
||||
def test_sort_key(self):
|
||||
orig_shard_ranges = [
|
||||
utils.ShardRange('a/c', next(self.ts_iter), '', '',
|
||||
state=utils.ShardRange.SHARDED),
|
||||
utils.ShardRange('.a/c1', next(self.ts_iter), 'a', 'd',
|
||||
state=utils.ShardRange.CREATED),
|
||||
utils.ShardRange('.a/c0', next(self.ts_iter), '', 'a',
|
||||
state=utils.ShardRange.CREATED),
|
||||
utils.ShardRange('.a/c2b', next(self.ts_iter), 'd', 'f',
|
||||
state=utils.ShardRange.SHARDING),
|
||||
utils.ShardRange('.a/c2', next(self.ts_iter), 'c', 'f',
|
||||
state=utils.ShardRange.SHARDING),
|
||||
utils.ShardRange('.a/c2a', next(self.ts_iter), 'd', 'f',
|
||||
state=utils.ShardRange.SHARDING),
|
||||
utils.ShardRange('.a/c4', next(self.ts_iter), 'f', '',
|
||||
state=utils.ShardRange.ACTIVE)
|
||||
]
|
||||
shard_ranges = list(orig_shard_ranges)
|
||||
shard_ranges.sort(key=utils.ShardRange.sort_key)
|
||||
self.assertEqual(shard_ranges[0], orig_shard_ranges[2])
|
||||
self.assertEqual(shard_ranges[1], orig_shard_ranges[1])
|
||||
self.assertEqual(shard_ranges[2], orig_shard_ranges[4])
|
||||
self.assertEqual(shard_ranges[3], orig_shard_ranges[5])
|
||||
self.assertEqual(shard_ranges[4], orig_shard_ranges[3])
|
||||
self.assertEqual(shard_ranges[5], orig_shard_ranges[6])
|
||||
self.assertEqual(shard_ranges[6], orig_shard_ranges[0])
|
||||
|
||||
def test_is_child_of(self):
|
||||
# Set up some shard ranges in relational hierarchy:
|
||||
# account -> root -> grandparent -> parent -> child
|
||||
|
@ -1747,9 +1747,8 @@ class TestContainerBroker(test_db.TestDbBase):
|
||||
# Add the 'shard_range' table back to the database, but it doesn't
|
||||
# have any shard range row in it yet.
|
||||
self._add_shard_range_table(broker)
|
||||
shard_ranges = broker.get_shard_ranges(
|
||||
include_deleted=True, states=None, include_own=True)
|
||||
self.assertEqual(shard_ranges, [])
|
||||
self.assertFalse(broker.get_shard_ranges(
|
||||
include_deleted=True, states=None, include_own=True))
|
||||
self.assertFalse(broker.has_other_shard_ranges())
|
||||
|
||||
# Insert its 'own_shard_range' into this test database.
|
||||
@ -4236,6 +4235,29 @@ class TestContainerBroker(test_db.TestDbBase):
|
||||
check_bad_value([''])
|
||||
check_bad_value('active')
|
||||
|
||||
def _check_get_sr(self, broker, expected_sr, **kwargs):
|
||||
"""
|
||||
Get shard ranges from ``broker`` per parameters ``kwargs``, and check
|
||||
returned shard ranges against expected shard ranges.
|
||||
"""
|
||||
actual_sr = broker.get_shard_ranges(**kwargs)
|
||||
self.assertEqual([dict(sr) for sr in expected_sr],
|
||||
[dict(sr) for sr in actual_sr])
|
||||
|
||||
def _check_get_ns(self, broker, expected_ns, **kwargs):
|
||||
actual_ns = broker.get_namespaces(**kwargs)
|
||||
self.assertEqual(expected_ns, actual_ns)
|
||||
|
||||
def _check_get_sr_and_ns(self, broker, expected_sr, **kwargs):
|
||||
"""
|
||||
For those 'get_shard_ranges' calls who's params are compatible with
|
||||
'get_namespaces', call both of them to cross-check each other.
|
||||
"""
|
||||
self._check_get_sr(broker, expected_sr, **kwargs)
|
||||
expected_ns = [Namespace(sr.name, sr.lower, sr.upper)
|
||||
for sr in expected_sr]
|
||||
self._check_get_ns(broker, expected_ns, **kwargs)
|
||||
|
||||
@with_tempdir
|
||||
def test_get_shard_ranges(self, tempdir):
|
||||
db_path = os.path.join(tempdir, 'container.db')
|
||||
@ -4243,20 +4265,18 @@ class TestContainerBroker(test_db.TestDbBase):
|
||||
broker.initialize(next(self.ts).internal, 0)
|
||||
|
||||
# no rows
|
||||
self.assertFalse(broker.get_shard_ranges())
|
||||
self._check_get_sr_and_ns(broker, expected_sr=[])
|
||||
# check that a default own shard range is not generated
|
||||
self.assertFalse(broker.get_shard_ranges(include_own=True))
|
||||
self._check_get_sr(broker, expected_sr=[], include_own=True)
|
||||
|
||||
# merge row for own shard range
|
||||
own_shard_range = ShardRange(broker.path, next(self.ts), 'l', 'u',
|
||||
state=ShardRange.SHARDING)
|
||||
broker.merge_shard_ranges([own_shard_range])
|
||||
self.assertFalse(broker.get_shard_ranges())
|
||||
self.assertFalse(broker.get_shard_ranges(include_own=False))
|
||||
|
||||
actual = broker.get_shard_ranges(include_own=True)
|
||||
self.assertEqual([dict(sr) for sr in [own_shard_range]],
|
||||
[dict(sr) for sr in actual])
|
||||
self._check_get_sr(broker, expected_sr=[])
|
||||
self._check_get_sr(broker, expected_sr=[], include_own=False)
|
||||
self._check_get_sr(
|
||||
broker, expected_sr=[own_shard_range], include_own=True)
|
||||
|
||||
# merge rows for other shard ranges
|
||||
shard_ranges = [
|
||||
@ -4271,105 +4291,85 @@ class TestContainerBroker(test_db.TestDbBase):
|
||||
ShardRange('.a/c5', next(self.ts), 'h', 'j', deleted=1)
|
||||
]
|
||||
broker.merge_shard_ranges(shard_ranges)
|
||||
actual = broker.get_shard_ranges()
|
||||
undeleted = shard_ranges[:3] + shard_ranges[4:5]
|
||||
self.assertEqual([dict(sr) for sr in undeleted],
|
||||
[dict(sr) for sr in actual])
|
||||
self._check_get_sr_and_ns(broker, expected_sr=undeleted)
|
||||
|
||||
actual = broker.get_shard_ranges(include_deleted=True)
|
||||
self.assertEqual([dict(sr) for sr in shard_ranges],
|
||||
[dict(sr) for sr in actual])
|
||||
self._check_get_sr(
|
||||
broker, expected_sr=shard_ranges, include_deleted=True)
|
||||
|
||||
actual = broker.get_shard_ranges(reverse=True)
|
||||
self.assertEqual([dict(sr) for sr in reversed(undeleted)],
|
||||
[dict(sr) for sr in actual])
|
||||
|
||||
actual = broker.get_shard_ranges(marker='c', end_marker='e')
|
||||
self.assertEqual([dict(sr) for sr in shard_ranges[1:3]],
|
||||
[dict(sr) for sr in actual])
|
||||
|
||||
actual = broker.get_shard_ranges(marker='c', end_marker='e',
|
||||
states=[ShardRange.ACTIVE])
|
||||
self.assertEqual([dict(sr) for sr in shard_ranges[2:3]],
|
||||
[dict(sr) for sr in actual])
|
||||
|
||||
actual = broker.get_shard_ranges(marker='e', end_marker='e')
|
||||
self.assertFalse([dict(sr) for sr in actual])
|
||||
self._check_get_sr_and_ns(
|
||||
broker, reverse=True, expected_sr=list(reversed(undeleted)))
|
||||
self._check_get_sr_and_ns(
|
||||
broker, marker='c', end_marker='e', expected_sr=shard_ranges[1:3])
|
||||
self._check_get_sr_and_ns(
|
||||
broker, marker='c', end_marker='e', states=[ShardRange.ACTIVE],
|
||||
expected_sr=shard_ranges[2:3])
|
||||
self._check_get_sr_and_ns(broker, marker='e', end_marker='e',
|
||||
expected_sr=[])
|
||||
|
||||
# check state filtering...
|
||||
actual = broker.get_shard_ranges(states=[ShardRange.FOUND])
|
||||
self.assertEqual([dict(sr) for sr in shard_ranges[:2]],
|
||||
[dict(sr) for sr in actual])
|
||||
self._check_get_sr_and_ns(
|
||||
broker, states=[ShardRange.FOUND], expected_sr=shard_ranges[:2])
|
||||
|
||||
# includes overrides include_own
|
||||
actual = broker.get_shard_ranges(includes='b', include_own=True)
|
||||
self.assertEqual([dict(shard_ranges[0])], [dict(sr) for sr in actual])
|
||||
self._check_get_sr(broker, expected_sr=[shard_ranges[0]],
|
||||
includes='b', include_own=True)
|
||||
# ... unless they coincide
|
||||
actual = broker.get_shard_ranges(includes='t', include_own=True)
|
||||
self.assertEqual([dict(own_shard_range)], [dict(sr) for sr in actual])
|
||||
self._check_get_sr(broker, expected_sr=[own_shard_range],
|
||||
includes='t', include_own=True)
|
||||
|
||||
# exclude_others overrides includes
|
||||
actual = broker.get_shard_ranges(includes='b', exclude_others=True)
|
||||
self.assertFalse(actual)
|
||||
self._check_get_sr(broker, expected_sr=[],
|
||||
includes='b', exclude_others=True)
|
||||
|
||||
# include_deleted overrides includes
|
||||
actual = broker.get_shard_ranges(includes='i', include_deleted=True)
|
||||
self.assertEqual([dict(shard_ranges[-1])], [dict(sr) for sr in actual])
|
||||
actual = broker.get_shard_ranges(includes='i', include_deleted=False)
|
||||
self.assertFalse(actual)
|
||||
self._check_get_sr(broker, expected_sr=[shard_ranges[-1]],
|
||||
includes='i', include_deleted=True)
|
||||
self._check_get_sr(broker, expected_sr=[],
|
||||
includes='i', include_deleted=False)
|
||||
|
||||
# includes overrides marker/end_marker
|
||||
actual = broker.get_shard_ranges(includes='b', marker='e',
|
||||
end_marker='')
|
||||
self.assertEqual([dict(shard_ranges[0])], [dict(sr) for sr in actual])
|
||||
self._check_get_sr_and_ns(
|
||||
broker, marker='e', end_marker='', includes='b',
|
||||
expected_sr=[shard_ranges[0]])
|
||||
|
||||
actual = broker.get_shard_ranges(includes='b', marker=Namespace.MAX)
|
||||
self.assertEqual([dict(shard_ranges[0])], [dict(sr) for sr in actual])
|
||||
self._check_get_sr_and_ns(
|
||||
broker, includes='b', marker=Namespace.MAX,
|
||||
expected_sr=[shard_ranges[0]])
|
||||
|
||||
# end_marker is Namespace.MAX
|
||||
actual = broker.get_shard_ranges(marker='e', end_marker='')
|
||||
self.assertEqual([dict(sr) for sr in undeleted[2:]],
|
||||
[dict(sr) for sr in actual])
|
||||
|
||||
actual = broker.get_shard_ranges(marker='e', end_marker='',
|
||||
reverse=True)
|
||||
self.assertEqual([dict(sr) for sr in reversed(undeleted[:3])],
|
||||
[dict(sr) for sr in actual])
|
||||
self._check_get_sr_and_ns(
|
||||
broker, marker='e', end_marker='', expected_sr=undeleted[2:])
|
||||
self._check_get_sr_and_ns(
|
||||
broker, marker='e', end_marker='', reverse=True,
|
||||
expected_sr=list(reversed(undeleted[:3])))
|
||||
|
||||
# marker is Namespace.MIN
|
||||
actual = broker.get_shard_ranges(marker='', end_marker='d')
|
||||
self.assertEqual([dict(sr) for sr in shard_ranges[:2]],
|
||||
[dict(sr) for sr in actual])
|
||||
|
||||
actual = broker.get_shard_ranges(marker='', end_marker='d',
|
||||
self._check_get_sr_and_ns(
|
||||
broker, marker='', end_marker='d', expected_sr=shard_ranges[:2])
|
||||
self._check_get_sr(broker,
|
||||
expected_sr=list(reversed(shard_ranges[2:])),
|
||||
marker='', end_marker='d',
|
||||
reverse=True, include_deleted=True)
|
||||
self.assertEqual([dict(sr) for sr in reversed(shard_ranges[2:])],
|
||||
[dict(sr) for sr in actual])
|
||||
|
||||
# marker, end_marker span entire namespace
|
||||
actual = broker.get_shard_ranges(marker='', end_marker='')
|
||||
self.assertEqual([dict(sr) for sr in undeleted],
|
||||
[dict(sr) for sr in actual])
|
||||
self._check_get_sr_and_ns(
|
||||
broker, marker='', end_marker='', expected_sr=undeleted)
|
||||
|
||||
# marker, end_marker override include_own
|
||||
actual = broker.get_shard_ranges(marker='', end_marker='k',
|
||||
include_own=True)
|
||||
self.assertEqual([dict(sr) for sr in undeleted],
|
||||
[dict(sr) for sr in actual])
|
||||
actual = broker.get_shard_ranges(marker='u', end_marker='',
|
||||
include_own=True)
|
||||
self.assertFalse(actual)
|
||||
self._check_get_sr(broker, expected_sr=undeleted,
|
||||
marker='', end_marker='k', include_own=True)
|
||||
self._check_get_sr(broker, expected_sr=[],
|
||||
marker='u', end_marker='', include_own=True)
|
||||
# ...unless they coincide
|
||||
actual = broker.get_shard_ranges(marker='t', end_marker='',
|
||||
include_own=True)
|
||||
self.assertEqual([dict(own_shard_range)], [dict(sr) for sr in actual])
|
||||
self._check_get_sr(broker, expected_sr=[own_shard_range],
|
||||
marker='t', end_marker='', include_own=True)
|
||||
|
||||
# null namespace cases
|
||||
actual = broker.get_shard_ranges(end_marker=Namespace.MIN)
|
||||
self.assertFalse(actual)
|
||||
|
||||
actual = broker.get_shard_ranges(marker=Namespace.MAX)
|
||||
self.assertFalse(actual)
|
||||
self._check_get_sr_and_ns(
|
||||
broker, end_marker=Namespace.MIN, expected_sr=[])
|
||||
self._check_get_sr_and_ns(
|
||||
broker, marker=Namespace.MAX, expected_sr=[])
|
||||
|
||||
orig_execute = GreenDBConnection.execute
|
||||
mock_call_args = []
|
||||
@ -4380,9 +4380,7 @@ class TestContainerBroker(test_db.TestDbBase):
|
||||
|
||||
with mock.patch('swift.common.db.GreenDBConnection.execute',
|
||||
mock_execute):
|
||||
actual = broker.get_shard_ranges(includes='f')
|
||||
self.assertEqual([dict(sr) for sr in shard_ranges[2:3]],
|
||||
[dict(sr) for sr in actual])
|
||||
self._check_get_sr(broker, shard_ranges[2:3], includes='f')
|
||||
self.assertEqual(1, len(mock_call_args))
|
||||
# verify that includes keyword plumbs through to an SQL condition
|
||||
self.assertIn("WHERE deleted=0 AND name != ? AND lower < ? AND "
|
||||
@ -4392,65 +4390,57 @@ class TestContainerBroker(test_db.TestDbBase):
|
||||
mock_call_args = []
|
||||
with mock.patch('swift.common.db.GreenDBConnection.execute',
|
||||
mock_execute):
|
||||
actual = broker.get_shard_ranges(marker='c', end_marker='d')
|
||||
self.assertEqual([dict(sr) for sr in shard_ranges[1:2]],
|
||||
[dict(sr) for sr in actual])
|
||||
self._check_get_sr(broker, shard_ranges[1:2],
|
||||
marker='c', end_marker='d')
|
||||
self.assertEqual(1, len(mock_call_args))
|
||||
# verify that marker & end_marker plumb through to an SQL condition
|
||||
self.assertIn("WHERE deleted=0 AND name != ? AND lower < ? AND "
|
||||
"(upper = '' OR upper > ?)", mock_call_args[0][1])
|
||||
self.assertEqual(['a/c', 'd', 'c'], mock_call_args[0][2])
|
||||
|
||||
actual = broker.get_shard_ranges(includes='i')
|
||||
self.assertFalse(actual)
|
||||
|
||||
actual = broker.get_shard_ranges(
|
||||
states=[ShardRange.CREATED, ShardRange.ACTIVE])
|
||||
self.assertEqual(
|
||||
[dict(sr) for sr in [shard_ranges[2], shard_ranges[4]]],
|
||||
[dict(sr) for sr in actual])
|
||||
self._check_get_sr_and_ns(broker, includes='i', expected_sr=[])
|
||||
self._check_get_sr_and_ns(
|
||||
broker, states=[ShardRange.CREATED, ShardRange.ACTIVE],
|
||||
expected_sr=[shard_ranges[2], shard_ranges[4]])
|
||||
|
||||
# fill gaps
|
||||
filler = own_shard_range.copy()
|
||||
filler.lower = 'h'
|
||||
actual = broker.get_shard_ranges(fill_gaps=True)
|
||||
self.assertEqual([dict(sr) for sr in undeleted + [filler]],
|
||||
[dict(sr) for sr in actual])
|
||||
actual = broker.get_shard_ranges(fill_gaps=True, marker='a')
|
||||
self.assertEqual([dict(sr) for sr in undeleted + [filler]],
|
||||
[dict(sr) for sr in actual])
|
||||
actual = broker.get_shard_ranges(fill_gaps=True, end_marker='z')
|
||||
self.assertEqual([dict(sr) for sr in undeleted + [filler]],
|
||||
[dict(sr) for sr in actual])
|
||||
actual = broker.get_shard_ranges(fill_gaps=True, end_marker='k')
|
||||
self._check_get_sr_and_ns(
|
||||
broker, fill_gaps=True, expected_sr=undeleted + [filler])
|
||||
self._check_get_sr_and_ns(
|
||||
broker, fill_gaps=True, marker='a',
|
||||
expected_sr=undeleted + [filler])
|
||||
self._check_get_sr_and_ns(
|
||||
broker, fill_gaps=True, end_marker='z',
|
||||
expected_sr=undeleted + [filler])
|
||||
filler.upper = 'k'
|
||||
self.assertEqual([dict(sr) for sr in undeleted + [filler]],
|
||||
[dict(sr) for sr in actual])
|
||||
self._check_get_sr_and_ns(
|
||||
broker, fill_gaps=True, end_marker='k',
|
||||
expected_sr=undeleted + [filler])
|
||||
|
||||
# includes overrides fill_gaps
|
||||
actual = broker.get_shard_ranges(includes='b', fill_gaps=True)
|
||||
self.assertEqual([dict(shard_ranges[0])], [dict(sr) for sr in actual])
|
||||
self._check_get_sr_and_ns(
|
||||
broker, includes='b', fill_gaps=True,
|
||||
expected_sr=[shard_ranges[0]])
|
||||
|
||||
# no filler needed...
|
||||
actual = broker.get_shard_ranges(fill_gaps=True, end_marker='h')
|
||||
self.assertEqual([dict(sr) for sr in undeleted],
|
||||
[dict(sr) for sr in actual])
|
||||
actual = broker.get_shard_ranges(fill_gaps=True, end_marker='a')
|
||||
self.assertEqual([], [dict(sr) for sr in actual])
|
||||
self._check_get_sr_and_ns(
|
||||
broker, fill_gaps=True, end_marker='h', expected_sr=undeleted)
|
||||
self._check_get_sr_and_ns(
|
||||
broker, fill_gaps=True, end_marker='a', expected_sr=[])
|
||||
|
||||
# get everything
|
||||
actual = broker.get_shard_ranges(include_own=True)
|
||||
self.assertEqual([dict(sr) for sr in undeleted + [own_shard_range]],
|
||||
[dict(sr) for sr in actual])
|
||||
self._check_get_sr(broker, expected_sr=undeleted + [own_shard_range],
|
||||
include_own=True)
|
||||
|
||||
# get just own range
|
||||
actual = broker.get_shard_ranges(include_own=True, exclude_others=True)
|
||||
self.assertEqual([dict(sr) for sr in [own_shard_range]],
|
||||
[dict(sr) for sr in actual])
|
||||
self._check_get_sr(broker, expected_sr=[own_shard_range],
|
||||
include_own=True, exclude_others=True)
|
||||
|
||||
# if you ask for nothing you'll get nothing
|
||||
actual = broker.get_shard_ranges(
|
||||
self._check_get_sr(broker, expected_sr=[],
|
||||
include_own=False, exclude_others=True)
|
||||
self.assertFalse(actual)
|
||||
|
||||
@with_tempdir
|
||||
def test_get_shard_ranges_includes(self, tempdir):
|
||||
@ -4467,45 +4457,32 @@ class TestContainerBroker(test_db.TestDbBase):
|
||||
broker.initialize(next(self.ts).internal, 0)
|
||||
broker.merge_shard_ranges(ranges)
|
||||
|
||||
actual = broker.get_shard_ranges(includes='')
|
||||
self.assertEqual(actual, [])
|
||||
actual = broker.get_shard_ranges(includes=' ')
|
||||
self.assertEqual(actual, [start])
|
||||
actual = broker.get_shard_ranges(includes='b')
|
||||
self.assertEqual(actual, [atof])
|
||||
actual = broker.get_shard_ranges(includes='f')
|
||||
self.assertEqual(actual, [atof])
|
||||
actual = broker.get_shard_ranges(includes='f\x00')
|
||||
self.assertEqual(actual, [ftol])
|
||||
actual = broker.get_shard_ranges(includes='x')
|
||||
self.assertEqual(actual, [rtoz])
|
||||
actual = broker.get_shard_ranges(includes='r')
|
||||
self.assertEqual(actual, [ltor])
|
||||
actual = broker.get_shard_ranges(includes='}')
|
||||
self.assertEqual(actual, [end])
|
||||
self._check_get_sr_and_ns(broker, includes='', expected_sr=[])
|
||||
self._check_get_sr_and_ns(broker, includes=' ', expected_sr=[start])
|
||||
self._check_get_sr_and_ns(broker, includes='b', expected_sr=[atof])
|
||||
self._check_get_sr_and_ns(broker, includes='f', expected_sr=[atof])
|
||||
self._check_get_sr_and_ns(
|
||||
broker, includes='f\x00', expected_sr=[ftol])
|
||||
self._check_get_sr_and_ns(broker, includes='x', expected_sr=[rtoz])
|
||||
self._check_get_sr_and_ns(broker, includes='r', expected_sr=[ltor])
|
||||
self._check_get_sr_and_ns(broker, includes='}', expected_sr=[end])
|
||||
|
||||
# add some overlapping sub-shards
|
||||
ftoh = ShardRange('a/f-h', ts, 'f', 'h')
|
||||
htok = ShardRange('a/h-k', ts, 'h', 'k')
|
||||
|
||||
broker.merge_shard_ranges([ftoh, htok])
|
||||
actual = broker.get_shard_ranges(includes='g')
|
||||
self.assertEqual(actual, [ftoh])
|
||||
actual = broker.get_shard_ranges(includes='h')
|
||||
self.assertEqual(actual, [ftoh])
|
||||
actual = broker.get_shard_ranges(includes='k')
|
||||
self.assertEqual(actual, [htok])
|
||||
actual = broker.get_shard_ranges(includes='l')
|
||||
self.assertEqual(actual, [ftol])
|
||||
actual = broker.get_shard_ranges(includes='m')
|
||||
self.assertEqual(actual, [ltor])
|
||||
self._check_get_sr_and_ns(broker, includes='g', expected_sr=[ftoh])
|
||||
self._check_get_sr_and_ns(broker, includes='h', expected_sr=[ftoh])
|
||||
self._check_get_sr_and_ns(broker, includes='k', expected_sr=[htok])
|
||||
self._check_get_sr_and_ns(broker, includes='l', expected_sr=[ftol])
|
||||
self._check_get_sr_and_ns(broker, includes='m', expected_sr=[ltor])
|
||||
|
||||
# remove l-r from shard ranges and try and find a shard range for an
|
||||
# item in that range.
|
||||
ltor.set_deleted(next(self.ts))
|
||||
broker.merge_shard_ranges([ltor])
|
||||
actual = broker.get_shard_ranges(includes='p')
|
||||
self.assertEqual(actual, [])
|
||||
self._check_get_sr_and_ns(broker, includes='p', expected_sr=[])
|
||||
|
||||
@with_tempdir
|
||||
def test_overlap_shard_range_order(self, tempdir):
|
||||
@ -4522,29 +4499,13 @@ class TestContainerBroker(test_db.TestDbBase):
|
||||
string.ascii_letters[1:]))
|
||||
for e, epoch in enumerate((epoch0, epoch1))
|
||||
]
|
||||
expected_sr = [sr for sr in shard_ranges]
|
||||
|
||||
random.shuffle(shard_ranges)
|
||||
for sr in shard_ranges:
|
||||
broker.merge_shard_ranges([sr])
|
||||
|
||||
expected = [
|
||||
'.shard_a/shard_0-0',
|
||||
'.shard_a/shard_1-0',
|
||||
'.shard_a/shard_0-1',
|
||||
'.shard_a/shard_1-1',
|
||||
'.shard_a/shard_0-2',
|
||||
'.shard_a/shard_1-2',
|
||||
'.shard_a/shard_0-3',
|
||||
'.shard_a/shard_1-3',
|
||||
'.shard_a/shard_0-4',
|
||||
'.shard_a/shard_1-4',
|
||||
'.shard_a/shard_0-5',
|
||||
'.shard_a/shard_1-5',
|
||||
'.shard_a/shard_0-6',
|
||||
'.shard_a/shard_1-6',
|
||||
]
|
||||
self.assertEqual(expected, [
|
||||
sr.name for sr in broker.get_shard_ranges()])
|
||||
self._check_get_sr_and_ns(broker, expected_sr)
|
||||
|
||||
@with_tempdir
|
||||
def test_get_shard_ranges_with_sharding_overlaps(self, tempdir):
|
||||
@ -4567,14 +4528,11 @@ class TestContainerBroker(test_db.TestDbBase):
|
||||
]
|
||||
broker.merge_shard_ranges(
|
||||
random.sample(shard_ranges, len(shard_ranges)))
|
||||
actual = broker.get_shard_ranges()
|
||||
self.assertEqual([dict(sr) for sr in shard_ranges],
|
||||
[dict(sr) for sr in actual])
|
||||
self._check_get_sr_and_ns(broker, expected_sr=shard_ranges)
|
||||
|
||||
actual = broker.get_shard_ranges(states=SHARD_LISTING_STATES)
|
||||
self.assertEqual(
|
||||
[dict(sr) for sr in shard_ranges[:3] + shard_ranges[4:]],
|
||||
[dict(sr) for sr in actual])
|
||||
self._check_get_sr_and_ns(
|
||||
broker, states=SHARD_LISTING_STATES,
|
||||
expected_sr=shard_ranges[:3] + shard_ranges[4:])
|
||||
|
||||
orig_execute = GreenDBConnection.execute
|
||||
mock_call_args = []
|
||||
@ -4585,22 +4543,17 @@ class TestContainerBroker(test_db.TestDbBase):
|
||||
|
||||
with mock.patch('swift.common.db.GreenDBConnection.execute',
|
||||
mock_execute):
|
||||
actual = broker.get_shard_ranges(states=SHARD_UPDATE_STATES,
|
||||
includes='e')
|
||||
self.assertEqual([dict(shard_ranges[1])],
|
||||
[dict(sr) for sr in actual])
|
||||
self._check_get_sr(broker, shard_ranges[1:2],
|
||||
states=SHARD_UPDATE_STATES, includes='e')
|
||||
self.assertEqual(1, len(mock_call_args))
|
||||
# verify that includes keyword plumbs through to an SQL condition
|
||||
self.assertIn("WHERE deleted=0 AND state in (?,?,?,?) AND name != ? "
|
||||
"AND lower < ? AND (upper = '' OR upper >= ?)",
|
||||
mock_call_args[0][1])
|
||||
|
||||
actual = broker.get_shard_ranges(states=SHARD_UPDATE_STATES,
|
||||
includes='j')
|
||||
self.assertEqual([shard_ranges[2]], actual)
|
||||
actual = broker.get_shard_ranges(states=SHARD_UPDATE_STATES,
|
||||
includes='k')
|
||||
self.assertEqual([shard_ranges[3]], actual)
|
||||
self._check_get_sr(broker, shard_ranges[2:3],
|
||||
states=SHARD_UPDATE_STATES, includes='j')
|
||||
self._check_get_sr(broker, shard_ranges[3:4],
|
||||
states=SHARD_UPDATE_STATES, includes='k')
|
||||
|
||||
@with_tempdir
|
||||
def test_get_shard_ranges_with_shrinking_overlaps(self, tempdir):
|
||||
@ -4619,13 +4572,10 @@ class TestContainerBroker(test_db.TestDbBase):
|
||||
]
|
||||
broker.merge_shard_ranges(
|
||||
random.sample(shard_ranges, len(shard_ranges)))
|
||||
actual = broker.get_shard_ranges()
|
||||
self.assertEqual([dict(sr) for sr in shard_ranges],
|
||||
[dict(sr) for sr in actual])
|
||||
|
||||
actual = broker.get_shard_ranges(states=SHARD_UPDATE_STATES,
|
||||
includes='l')
|
||||
self.assertEqual([shard_ranges[2]], actual)
|
||||
self._check_get_sr_and_ns(broker, expected_sr=shard_ranges)
|
||||
self._check_get_sr_and_ns(
|
||||
broker, states=SHARD_UPDATE_STATES, includes='l',
|
||||
expected_sr=[shard_ranges[2]])
|
||||
|
||||
@with_tempdir
|
||||
def test_get_shard_range_rows_with_limit(self, tempdir):
|
||||
@ -4679,6 +4629,171 @@ class TestContainerBroker(test_db.TestDbBase):
|
||||
self.assertEqual(1, len(actual))
|
||||
self.assertEqual(shard_ranges[0], ShardRange(*actual[0]))
|
||||
|
||||
def _setup_broker_with_shard_ranges(self, tempdir,
|
||||
own_shard_range, shard_ranges):
|
||||
db_path = os.path.join(tempdir, 'container.db')
|
||||
broker = ContainerBroker(db_path, account='a', container='c')
|
||||
broker.initialize(next(self.ts).internal, 0)
|
||||
|
||||
# no rows
|
||||
self.assertFalse(broker.get_shard_ranges())
|
||||
self.assertFalse(broker.get_namespaces())
|
||||
|
||||
# merge row for own shard range
|
||||
broker.merge_shard_ranges([own_shard_range])
|
||||
self._check_get_sr(broker, [own_shard_range], include_own=True)
|
||||
self.assertFalse(broker.get_namespaces())
|
||||
|
||||
# merge rows for other shard ranges
|
||||
broker.merge_shard_ranges(shard_ranges)
|
||||
return broker
|
||||
|
||||
@with_tempdir
|
||||
def test_get_namespaces(self, tempdir):
|
||||
own_shard_range = ShardRange('a/c', next(
|
||||
self.ts), 'a', 'z', state=ShardRange.SHARDING)
|
||||
shard_ranges = [
|
||||
ShardRange('.a/c0', next(self.ts), 'a',
|
||||
'c', state=ShardRange.CREATED),
|
||||
ShardRange('.a/c1', next(self.ts), 'c',
|
||||
'd', state=ShardRange.CREATED),
|
||||
ShardRange('.a/c2', next(self.ts), 'd', 'f',
|
||||
state=ShardRange.ACTIVE),
|
||||
ShardRange('.a/c3', next(self.ts), 'e', 'f', deleted=1,
|
||||
state=ShardRange.SHARDING,),
|
||||
ShardRange('.a/c4', next(self.ts), 'f', 'h',
|
||||
state=ShardRange.SHARDING),
|
||||
ShardRange('.a/c5', next(self.ts), 'h', 'j', deleted=1)
|
||||
]
|
||||
broker = self._setup_broker_with_shard_ranges(
|
||||
tempdir, own_shard_range, shard_ranges)
|
||||
undeleted = [sr for sr in shard_ranges if not sr.deleted]
|
||||
self._check_get_sr(broker, undeleted)
|
||||
|
||||
# test get all undeleted namespaces with gap filled.
|
||||
expected_ns = [Namespace(sr.name, sr.lower, sr.upper)
|
||||
for sr in undeleted]
|
||||
filler = [Namespace('a/c', 'h', 'z')]
|
||||
self._check_get_ns(broker, expected_ns + filler, fill_gaps=True)
|
||||
# test get all undeleted namespaces w/o gap filled.
|
||||
self._check_get_sr_and_ns(broker, undeleted)
|
||||
|
||||
orig_execute = GreenDBConnection.execute
|
||||
mock_call_args = []
|
||||
|
||||
def mock_execute(*args, **kwargs):
|
||||
mock_call_args.append(args)
|
||||
return orig_execute(*args, **kwargs)
|
||||
|
||||
with mock.patch('swift.common.db.GreenDBConnection.execute',
|
||||
mock_execute):
|
||||
self._check_get_ns(broker, expected_ns,
|
||||
states=[ShardRange.CREATED, ShardRange.ACTIVE,
|
||||
ShardRange.SHARDING])
|
||||
self.assertEqual(1, len(mock_call_args))
|
||||
# verify that includes keyword plumbs through to an SQL condition
|
||||
self.assertIn(
|
||||
"WHERE deleted = 0 AND name != ? AND state in (?,?,?)",
|
||||
mock_call_args[0][1])
|
||||
self.assertEqual(set(['a/c', ShardRange.ACTIVE, ShardRange.CREATED,
|
||||
ShardRange.SHARDING]), set(mock_call_args[0][2]))
|
||||
|
||||
mock_call_args = []
|
||||
with mock.patch('swift.common.db.GreenDBConnection.execute',
|
||||
mock_execute):
|
||||
self._check_get_ns(broker, expected_ns[2:3], includes='f')
|
||||
self.assertEqual(1, len(mock_call_args))
|
||||
# verify that includes keyword plumbs through to an SQL condition
|
||||
self.assertIn("WHERE deleted = 0 AND name != ? AND lower < ? AND "
|
||||
"(upper = '' OR upper >= ?)", mock_call_args[0][1])
|
||||
self.assertEqual(['a/c', 'f', 'f'], mock_call_args[0][2])
|
||||
|
||||
mock_call_args = []
|
||||
with mock.patch('swift.common.db.GreenDBConnection.execute',
|
||||
mock_execute):
|
||||
self._check_get_ns(broker, expected_ns[1:2],
|
||||
marker='c', end_marker='d')
|
||||
self.assertEqual(1, len(mock_call_args))
|
||||
# verify that marker & end_marker plumb through to an SQL condition
|
||||
self.assertIn("WHERE deleted = 0 AND name != ? AND lower < ? AND "
|
||||
"(upper = '' OR upper > ?)", mock_call_args[0][1])
|
||||
self.assertEqual(['a/c', 'd', 'c'], mock_call_args[0][2])
|
||||
|
||||
@with_tempdir
|
||||
def test_get_namespaces_state_filtering(self, tempdir):
|
||||
own_shard_range = ShardRange('a/c', next(
|
||||
self.ts), 'a', 'z', state=ShardRange.SHARDING)
|
||||
shard_ranges = [
|
||||
ShardRange('.a/c0', next(self.ts), 'a', 'c',
|
||||
state=ShardRange.CREATED),
|
||||
ShardRange('.a/c1', next(self.ts), 'c', 'd',
|
||||
state=ShardRange.CREATED),
|
||||
ShardRange('.a/c2', next(self.ts), 'd', 'f',
|
||||
state=ShardRange.SHARDING),
|
||||
ShardRange('.a/c2a', next(self.ts), 'd', 'e',
|
||||
state=ShardRange.ACTIVE),
|
||||
ShardRange('.a/c2b', next(self.ts), 'e', 'f',
|
||||
state=ShardRange.ACTIVE, ),
|
||||
ShardRange('.a/c3', next(self.ts), 'f', 'h',
|
||||
state=ShardRange.SHARDING),
|
||||
ShardRange('.a/c4', next(self.ts), 'h', 'j', deleted=1,
|
||||
state=ShardRange.SHARDED)
|
||||
]
|
||||
broker = self._setup_broker_with_shard_ranges(
|
||||
tempdir, own_shard_range, shard_ranges)
|
||||
|
||||
def do_test(states, expected_sr):
|
||||
self._check_get_sr_and_ns(broker, expected_sr, states=states)
|
||||
expected_ns = [Namespace(sr.name, sr.lower, sr.upper)
|
||||
for sr in expected_sr]
|
||||
filler_lower = expected_sr[-1].upper if expected_sr else 'a'
|
||||
filler = [Namespace('a/c', filler_lower, 'z')]
|
||||
self._check_get_ns(broker, expected_ns + filler,
|
||||
states=states, fill_gaps=True)
|
||||
|
||||
do_test([ShardRange.CREATED], shard_ranges[:2])
|
||||
do_test([ShardRange.CREATED, ShardRange.ACTIVE],
|
||||
shard_ranges[:2] + shard_ranges[3:5])
|
||||
# this case verifies that state trumps lower for ordering...
|
||||
do_test([ShardRange.ACTIVE, ShardRange.SHARDING],
|
||||
shard_ranges[3:5] + shard_ranges[2:3] + shard_ranges[5:6])
|
||||
do_test([ShardRange.CREATED, ShardRange.ACTIVE, ShardRange.SHARDING],
|
||||
shard_ranges[:2] + shard_ranges[3:5] + shard_ranges[2:3] +
|
||||
shard_ranges[5:6])
|
||||
do_test([ShardRange.SHARDED], [])
|
||||
|
||||
@with_tempdir
|
||||
def test_get_namespaces_root_container_fill_gap(self, tempdir):
|
||||
# Test GET namespaces from a root container with full namespace.
|
||||
own_shard_range = ShardRange('a/c', next(
|
||||
self.ts), '', '', state=ShardRange.SHARDED)
|
||||
shard_ranges = [
|
||||
ShardRange('.a/c0', next(self.ts), '',
|
||||
'a', state=ShardRange.CREATED),
|
||||
ShardRange('.a/c1', next(self.ts), 'a',
|
||||
'c', state=ShardRange.CREATED),
|
||||
ShardRange('.a/c2', next(self.ts), 'c',
|
||||
'd', state=ShardRange.CREATED),
|
||||
ShardRange('.a/c3', next(self.ts), 'd', 'f',
|
||||
state=ShardRange.ACTIVE),
|
||||
ShardRange('.a/c4', next(self.ts), 'f', 'h',
|
||||
state=ShardRange.SHARDING),
|
||||
ShardRange('.a/c5', next(self.ts), 'h', '',
|
||||
state=ShardRange.SHARDING),
|
||||
]
|
||||
broker = self._setup_broker_with_shard_ranges(
|
||||
tempdir, own_shard_range, shard_ranges)
|
||||
undeleted = [sr for sr in shard_ranges if not sr.deleted]
|
||||
self._check_get_sr_and_ns(broker, undeleted, fill_gaps=True)
|
||||
|
||||
# test optimization will skip ``get_own_shard_range`` call.
|
||||
with mock.patch.object(
|
||||
broker, 'get_own_shard_range') as mock_get_own_sr:
|
||||
self._check_get_ns(broker, undeleted, fill_gaps=True)
|
||||
mock_get_own_sr.assert_not_called()
|
||||
# test get all undeleted namespaces w/o gap filled.
|
||||
self._check_get_sr_and_ns(broker, undeleted)
|
||||
|
||||
@with_tempdir
|
||||
def test_get_own_shard_range(self, tempdir):
|
||||
db_path = os.path.join(tempdir, 'container.db')
|
||||
|
@ -43,7 +43,7 @@ from swift.container import server as container_server
|
||||
from swift.common import constraints
|
||||
from swift.common.utils import (Timestamp, mkdirs, public, replication,
|
||||
storage_directory, lock_parent_directory,
|
||||
ShardRange, RESERVED_STR)
|
||||
ShardRange, RESERVED_STR, Namespace)
|
||||
from test.debug_logger import debug_logger
|
||||
from test.unit import fake_http_connect, mock_check_drive
|
||||
from swift.common.storage_policy import (POLICIES, StoragePolicy)
|
||||
@ -1722,64 +1722,89 @@ class TestContainerController(unittest.TestCase):
|
||||
'content_type': 'foo/bar', 'last_modified': obj_ts.isoformat},
|
||||
])
|
||||
|
||||
def test_DELETE(self):
|
||||
ts_iter = make_timestamp_iter()
|
||||
def _populate_container(self, path):
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c',
|
||||
path,
|
||||
environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'X-Timestamp': next(ts_iter).internal})
|
||||
headers={'X-Timestamp': next(self.ts).internal})
|
||||
resp = req.get_response(self.controller)
|
||||
self.assertEqual(resp.status_int, 201)
|
||||
|
||||
# PUT an *empty* shard range
|
||||
sr = ShardRange('.shards_a/c', next(ts_iter), 'l', 'u', 0, 0,
|
||||
sr = ShardRange('.shards_a/c', next(self.ts), 'l', 'u', 0, 0,
|
||||
state=ShardRange.ACTIVE)
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c',
|
||||
path,
|
||||
environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers={'X-Timestamp': next(ts_iter).internal,
|
||||
headers={'X-Timestamp': next(self.ts).internal,
|
||||
'X-Backend-Record-Type': 'shard'},
|
||||
body=json.dumps([dict(sr)]))
|
||||
resp = req.get_response(self.controller)
|
||||
self.assertEqual(resp.status_int, 202)
|
||||
return sr
|
||||
|
||||
def _populate_and_delete_container(self, path):
|
||||
sr = self._populate_container(path)
|
||||
req = Request.blank(
|
||||
path,
|
||||
environ={'REQUEST_METHOD': 'DELETE'},
|
||||
headers={'X-Timestamp': next(self.ts).internal})
|
||||
resp = req.get_response(self.controller)
|
||||
self.assertEqual(resp.status_int, 204)
|
||||
return sr
|
||||
|
||||
def test_DELETE(self):
|
||||
self._populate_container('/sda1/p/a/c')
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c',
|
||||
environ={'REQUEST_METHOD': 'DELETE'},
|
||||
headers={'X-Timestamp': next(ts_iter).internal})
|
||||
headers={'X-Timestamp': next(self.ts).internal})
|
||||
resp = req.get_response(self.controller)
|
||||
self.assertEqual(resp.status_int, 204)
|
||||
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c',
|
||||
environ={'REQUEST_METHOD': 'GET'},
|
||||
headers={'X-Timestamp': next(ts_iter).internal})
|
||||
headers={'X-Timestamp': next(self.ts).internal})
|
||||
resp = req.get_response(self.controller)
|
||||
self.assertEqual(resp.status_int, 404)
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c',
|
||||
environ={'REQUEST_METHOD': 'GET'},
|
||||
headers={'X-Timestamp': next(ts_iter).internal,
|
||||
headers={'X-Timestamp': next(self.ts).internal,
|
||||
'X-Backend-Record-Type': 'shard'},
|
||||
params={'format': 'json'})
|
||||
resp = req.get_response(self.controller)
|
||||
self.assertEqual(resp.status_int, 404)
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c',
|
||||
environ={'REQUEST_METHOD': 'GET'},
|
||||
headers={'X-Timestamp': next(self.ts).internal,
|
||||
'X-Backend-Record-Type': 'namespace'},
|
||||
params={'format': 'json'})
|
||||
resp = req.get_response(self.controller)
|
||||
self.assertEqual(resp.status_int, 404)
|
||||
|
||||
def test_GET_with_override_deleted_ignored_for_objects(self):
|
||||
self._populate_and_delete_container('/sda1/p/a/c')
|
||||
# the override-deleted header is ignored for object records
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c',
|
||||
environ={'REQUEST_METHOD': 'GET'},
|
||||
headers={'X-Timestamp': next(ts_iter).internal,
|
||||
headers={'X-Timestamp': next(self.ts).internal,
|
||||
'X-Backend-Override-Deleted': 'true'},
|
||||
params={'format': 'json'})
|
||||
resp = req.get_response(self.controller)
|
||||
self.assertEqual(resp.status_int, 404)
|
||||
|
||||
# but override-deleted header makes shard ranges available after DELETE
|
||||
def test_GET_with_override_deleted_for_shard_ranges(self):
|
||||
sr = self._populate_and_delete_container('/sda1/p/a/c')
|
||||
|
||||
# override-deleted header makes shard ranges available after DELETE
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c',
|
||||
environ={'REQUEST_METHOD': 'GET'},
|
||||
headers={'X-Timestamp': next(ts_iter).internal,
|
||||
headers={'X-Timestamp': next(self.ts).internal,
|
||||
'X-Backend-Record-Type': 'shard',
|
||||
'X-Backend-Override-Deleted': 'true'},
|
||||
params={'format': 'json'})
|
||||
@ -1789,12 +1814,33 @@ class TestContainerController(unittest.TestCase):
|
||||
json.loads(resp.body))
|
||||
self.assertIn('X-Backend-Record-Type', resp.headers)
|
||||
self.assertEqual('shard', resp.headers['X-Backend-Record-Type'])
|
||||
self.assertIn('X-Backend-Record-Shard-Format', resp.headers)
|
||||
self.assertEqual(
|
||||
'full', resp.headers['X-Backend-Record-Shard-Format'])
|
||||
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c',
|
||||
environ={'REQUEST_METHOD': 'GET'},
|
||||
headers={'X-Timestamp': next(self.ts).internal,
|
||||
'X-Backend-Record-Type': 'shard',
|
||||
'X-Backend-Record-Shard-Format': 'full',
|
||||
'X-Backend-Override-Deleted': 'true'},
|
||||
params={'format': 'json'})
|
||||
resp = req.get_response(self.controller)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.assertEqual([dict(sr, last_modified=sr.timestamp.isoformat)],
|
||||
json.loads(resp.body))
|
||||
self.assertIn('X-Backend-Record-Type', resp.headers)
|
||||
self.assertEqual('shard', resp.headers['X-Backend-Record-Type'])
|
||||
self.assertIn('X-Backend-Record-Shard-Format', resp.headers)
|
||||
self.assertEqual(
|
||||
'full', resp.headers['X-Backend-Record-Shard-Format'])
|
||||
|
||||
# ... unless the override header equates to False
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c',
|
||||
environ={'REQUEST_METHOD': 'GET'},
|
||||
headers={'X-Timestamp': next(ts_iter).internal,
|
||||
headers={'X-Timestamp': next(self.ts).internal,
|
||||
'X-Backend-Record-Type': 'shard',
|
||||
'X-Backend-Override-Deleted': 'no'},
|
||||
params={'format': 'json'})
|
||||
@ -1802,7 +1848,42 @@ class TestContainerController(unittest.TestCase):
|
||||
self.assertEqual(resp.status_int, 404)
|
||||
self.assertNotIn('X-Backend-Record-Type', resp.headers)
|
||||
|
||||
# ...or the db file is unlinked
|
||||
def test_GET_with_override_deleted_for_namespaces(self):
|
||||
sr = self._populate_and_delete_container('/sda1/p/a/c')
|
||||
# override-deleted header makes shard ranges available after DELETE
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c',
|
||||
environ={'REQUEST_METHOD': 'GET'},
|
||||
headers={'X-Timestamp': next(self.ts).internal,
|
||||
'X-Backend-Record-Type': 'shard',
|
||||
'X-Backend-Record-Shard-Format': 'namespace',
|
||||
'X-Backend-Override-Deleted': 'true'},
|
||||
params={'format': 'json'})
|
||||
resp = req.get_response(self.controller)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.assertEqual([dict(Namespace(sr.name, sr.lower, sr.upper))],
|
||||
json.loads(resp.body))
|
||||
self.assertIn('X-Backend-Record-Type', resp.headers)
|
||||
self.assertEqual('shard', resp.headers['X-Backend-Record-Type'])
|
||||
self.assertIn('X-Backend-Record-Shard-Format', resp.headers)
|
||||
self.assertEqual(
|
||||
'namespace', resp.headers['X-Backend-Record-Shard-Format'])
|
||||
|
||||
# ... unless the override header equates to False
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c',
|
||||
environ={'REQUEST_METHOD': 'GET'},
|
||||
headers={'X-Timestamp': next(self.ts).internal,
|
||||
'X-Backend-Record-Type': 'shard',
|
||||
'X-Backend-Record-Shard-Format': 'namespace',
|
||||
'X-Backend-Override-Deleted': 'no'},
|
||||
params={'format': 'json'})
|
||||
resp = req.get_response(self.controller)
|
||||
self.assertEqual(resp.status_int, 404)
|
||||
self.assertNotIn('X-Backend-Record-Type', resp.headers)
|
||||
|
||||
def test_GET_with_override_deleted_for_shard_ranges_db_unlinked(self):
|
||||
self._populate_and_delete_container('/sda1/p/a/c')
|
||||
broker = self.controller._get_container_broker('sda1', 'p', 'a', 'c')
|
||||
self.assertTrue(os.path.exists(broker.db_file))
|
||||
os.unlink(broker.db_file)
|
||||
@ -1810,7 +1891,7 @@ class TestContainerController(unittest.TestCase):
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c',
|
||||
environ={'REQUEST_METHOD': 'GET'},
|
||||
headers={'X-Timestamp': next(ts_iter).internal,
|
||||
headers={'X-Timestamp': next(self.ts).internal,
|
||||
'X-Backend-Record-Type': 'shard',
|
||||
'X-Backend-Override-Deleted': 'true'},
|
||||
params={'format': 'json'})
|
||||
@ -3079,6 +3160,31 @@ class TestContainerController(unittest.TestCase):
|
||||
self.assertFalse(self.controller.logger.get_lines_for_level('warning'))
|
||||
self.assertFalse(self.controller.logger.get_lines_for_level('error'))
|
||||
|
||||
def test_GET_shard_ranges_with_format_header(self):
|
||||
# verify that shard range GET defaults to the 'full' format
|
||||
sr = self._populate_container('/sda1/p/a/c')
|
||||
|
||||
def do_test(headers):
|
||||
headers.update({'X-Timestamp': next(self.ts).internal,
|
||||
'X-Backend-Record-Type': 'shard'})
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c', environ={'REQUEST_METHOD': 'GET'},
|
||||
headers=headers, params={'format': 'json'})
|
||||
resp = req.get_response(self.controller)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.assertEqual([dict(sr, last_modified=sr.timestamp.isoformat)],
|
||||
json.loads(resp.body))
|
||||
self.assertIn('X-Backend-Record-Type', resp.headers)
|
||||
self.assertEqual('shard', resp.headers['X-Backend-Record-Type'])
|
||||
self.assertIn('X-Backend-Record-Shard-Format', resp.headers)
|
||||
self.assertEqual(
|
||||
'full', resp.headers['X-Backend-Record-Shard-Format'])
|
||||
|
||||
do_test({})
|
||||
do_test({'X-Backend-Record-Shard-Format': ''})
|
||||
do_test({'X-Backend-Record-Shard-Format': 'full'})
|
||||
do_test({'X-Backend-Record-Shard-Format': 'nonsense'})
|
||||
|
||||
def test_GET_shard_ranges_from_compacted_shard(self):
|
||||
# make a shrunk shard container with two acceptors that overlap with
|
||||
# the shard's namespace
|
||||
@ -3472,6 +3578,391 @@ class TestContainerController(unittest.TestCase):
|
||||
'states': 'auditing'})
|
||||
self.assertEqual(expected, json.loads(resp.body))
|
||||
|
||||
def _do_get_namespaces_unsharded(self, root_path, path,
|
||||
params, expected_states):
|
||||
# make a shard container
|
||||
shard_ranges = []
|
||||
lower = ''
|
||||
for state in sorted(ShardRange.STATES.keys()):
|
||||
upper = str(state)
|
||||
shard_ranges.append(
|
||||
ShardRange('.shards_a/c_%s' % upper, next(self.ts),
|
||||
lower, upper, state * 100, state * 1000,
|
||||
meta_timestamp=next(self.ts),
|
||||
state=state, state_timestamp=next(self.ts)))
|
||||
lower = upper
|
||||
expected_sr = [
|
||||
sr for sr in shard_ranges if sr.state in expected_states]
|
||||
own_shard_range = ShardRange(path, next(self.ts), '', '',
|
||||
state=ShardRange.ACTIVE)
|
||||
filler_sr = own_shard_range.copy(lower=expected_sr[-1].upper)
|
||||
expected_sr.append(filler_sr)
|
||||
expected_ns = [{'name': sr.name, 'lower': sr.lower_str,
|
||||
'upper': sr.upper_str} for sr in expected_sr]
|
||||
headers = {'X-Timestamp': next(self.ts).normal}
|
||||
|
||||
# create container
|
||||
req = Request.blank(
|
||||
'/sda1/p/%s' % path, method='PUT', headers=headers)
|
||||
self.assertIn(
|
||||
req.get_response(self.controller).status_int, (201, 202))
|
||||
# PUT some shard ranges
|
||||
headers = {'X-Timestamp': next(self.ts).normal,
|
||||
'X-Container-Sysmeta-Shard-Root': root_path,
|
||||
'X-Backend-Record-Type': 'shard'}
|
||||
body = json.dumps(
|
||||
[dict(sr) for sr in shard_ranges + [own_shard_range]])
|
||||
req = Request.blank(
|
||||
'/sda1/p/%s' % path, method='PUT', headers=headers, body=body)
|
||||
self.assertEqual(202, req.get_response(self.controller).status_int)
|
||||
# GET namespaces.
|
||||
req = Request.blank(
|
||||
"/sda1/p/%s?format=json%s" % (path, params),
|
||||
method="GET",
|
||||
headers={
|
||||
"X-Backend-Record-Type": "shard",
|
||||
"X-Backend-Record-shard-format": "namespace",
|
||||
"X-Backend-Override-Shard-Name-Filter": "sharded",
|
||||
},
|
||||
)
|
||||
resp = req.get_response(self.controller)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.assertEqual(resp.content_type, 'application/json')
|
||||
self.assertEqual(expected_ns, json.loads(resp.body))
|
||||
self.assertIn('X-Backend-Record-Type', resp.headers)
|
||||
self.assertEqual(
|
||||
'shard', resp.headers['X-Backend-Record-Type'])
|
||||
self.assertEqual(
|
||||
'namespace', resp.headers['X-Backend-Record-Shard-Format'])
|
||||
self.assertNotIn(
|
||||
'X-Backend-Override-Shard-Name-Filter', resp.headers)
|
||||
# GET shard ranges to cross-check.
|
||||
req = Request.blank('/sda1/p/%s?format=json%s' %
|
||||
(path, params), method='GET',
|
||||
headers={'X-Backend-Record-Type': 'shard'})
|
||||
resp = req.get_response(self.controller)
|
||||
self.assertIn('X-Backend-Record-Type', resp.headers)
|
||||
self.assertEqual(
|
||||
'shard', resp.headers['X-Backend-Record-Type'])
|
||||
self.assertIn('X-Backend-Record-Shard-Format', resp.headers)
|
||||
self.assertEqual(
|
||||
'full', resp.headers['X-Backend-Record-Shard-Format'])
|
||||
raw_sr = json.loads(resp.body)
|
||||
expected_sr = [{'name': sr['name'], 'lower': sr['lower'],
|
||||
'upper': sr['upper']}
|
||||
for sr in raw_sr]
|
||||
self.assertEqual(expected_ns, expected_sr)
|
||||
# GET shard ranges with explicit 'full' shard format.
|
||||
req = Request.blank(
|
||||
"/sda1/p/%s?format=json%s" % (path, params),
|
||||
method="GET",
|
||||
headers={
|
||||
"X-Backend-Record-Type": "shard",
|
||||
"X-Backend-Record-shard-format": "full",
|
||||
},
|
||||
)
|
||||
resp = req.get_response(self.controller)
|
||||
self.assertIn('X-Backend-Record-Type', resp.headers)
|
||||
self.assertEqual(
|
||||
'shard', resp.headers['X-Backend-Record-Type'])
|
||||
self.assertIn('X-Backend-Record-Shard-Format', resp.headers)
|
||||
self.assertEqual(
|
||||
'full', resp.headers['X-Backend-Record-Shard-Format'])
|
||||
self.assertEqual(raw_sr, json.loads(resp.body))
|
||||
|
||||
def test_GET_namespaces_unsharded_root_state_listing(self):
|
||||
# root's namespaces for listing
|
||||
root_path = container_path = 'a/c'
|
||||
params = '&states=listing'
|
||||
expected_states = [
|
||||
ShardRange.CLEAVED, ShardRange.ACTIVE, ShardRange.SHARDING,
|
||||
ShardRange.SHRINKING]
|
||||
self._do_get_namespaces_unsharded(
|
||||
root_path, container_path, params, expected_states)
|
||||
|
||||
def test_GET_namespaces_unsharded_subshard_state_listing(self):
|
||||
# shard's namespaces for listing
|
||||
root_path = 'a/c'
|
||||
container_path = '.shards_a/c'
|
||||
params = '&states=listing'
|
||||
expected_states = [
|
||||
ShardRange.CLEAVED, ShardRange.ACTIVE, ShardRange.SHARDING,
|
||||
ShardRange.SHRINKING]
|
||||
self._do_get_namespaces_unsharded(
|
||||
root_path, container_path, params, expected_states)
|
||||
|
||||
def test_GET_namespaces_unsharded_root_state_updating(self):
|
||||
# root's namespaces for updating
|
||||
root_path = container_path = 'a/c'
|
||||
params = '&states=updating'
|
||||
expected_states = [
|
||||
ShardRange.CREATED, ShardRange.CLEAVED, ShardRange.ACTIVE,
|
||||
ShardRange.SHARDING]
|
||||
container_path = root_path
|
||||
self._do_get_namespaces_unsharded(
|
||||
root_path, container_path, params, expected_states)
|
||||
|
||||
def test_GET_namespaces_unsharded_subshard_state_updating(self):
|
||||
# shard's namespaces for updating
|
||||
root_path = 'a/c'
|
||||
container_path = '.shards_a/c'
|
||||
params = '&states=updating'
|
||||
expected_states = [
|
||||
ShardRange.CREATED, ShardRange.CLEAVED, ShardRange.ACTIVE,
|
||||
ShardRange.SHARDING]
|
||||
self._do_get_namespaces_unsharded(
|
||||
root_path, container_path, params, expected_states)
|
||||
|
||||
def _do_get_namespaces_sharded(self, root_path, path,
|
||||
params, expected_states):
|
||||
# make a shard container
|
||||
shard_ranges = []
|
||||
lower = ''
|
||||
for state in sorted(ShardRange.STATES.keys()):
|
||||
upper = str(state)
|
||||
shard_ranges.append(
|
||||
ShardRange('.shards_a/c_%s' % upper, next(self.ts),
|
||||
lower, upper, state * 100, state * 1000,
|
||||
meta_timestamp=next(self.ts),
|
||||
state=state, state_timestamp=next(self.ts)))
|
||||
lower = upper
|
||||
expected_sr = [
|
||||
sr for sr in shard_ranges if sr.state in expected_states]
|
||||
own_shard_range = ShardRange(path, next(self.ts), '', '',
|
||||
100, 1000,
|
||||
meta_timestamp=next(self.ts),
|
||||
state=ShardRange.ACTIVE,
|
||||
state_timestamp=next(self.ts),
|
||||
epoch=next(self.ts))
|
||||
filler_sr = own_shard_range.copy(lower=expected_sr[-1].upper)
|
||||
expected_sr.append(filler_sr)
|
||||
expected_ns = [{'name': sr.name, 'lower': sr.lower_str,
|
||||
'upper': sr.upper_str} for sr in expected_sr]
|
||||
headers = {'X-Timestamp': next(self.ts).normal}
|
||||
|
||||
# create container
|
||||
req = Request.blank(
|
||||
'/sda1/p/%s' % path, method='PUT', headers=headers)
|
||||
self.assertIn(
|
||||
req.get_response(self.controller).status_int, (201, 202))
|
||||
# PUT some shard ranges
|
||||
headers = {'X-Timestamp': next(self.ts).normal,
|
||||
'X-Container-Sysmeta-Shard-Root': root_path,
|
||||
'X-Backend-Record-Type': 'shard'}
|
||||
body = json.dumps(
|
||||
[dict(sr) for sr in shard_ranges + [own_shard_range]])
|
||||
req = Request.blank(
|
||||
'/sda1/p/%s' % path, method='PUT', headers=headers, body=body)
|
||||
self.assertEqual(202, req.get_response(self.controller).status_int)
|
||||
|
||||
# set broker to sharded state so
|
||||
# X-Backend-Override-Shard-Name-Filter does have effect
|
||||
shard_broker = self.controller._get_container_broker(
|
||||
'sda1', 'p', '.shards_a', 'c')
|
||||
self.assertTrue(shard_broker.set_sharding_state())
|
||||
self.assertTrue(shard_broker.set_sharded_state())
|
||||
|
||||
# GET namespaces.
|
||||
req = Request.blank(
|
||||
"/sda1/p/%s?format=json%s" % (path, params),
|
||||
method="GET",
|
||||
headers={
|
||||
"X-Backend-Record-Type": "shard",
|
||||
"X-Backend-Record-shard-format": "namespace",
|
||||
"X-Backend-Override-Shard-Name-Filter": "sharded",
|
||||
},
|
||||
)
|
||||
resp = req.get_response(self.controller)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.assertEqual(resp.content_type, 'application/json')
|
||||
self.assertEqual(expected_ns, json.loads(resp.body))
|
||||
self.assertIn('X-Backend-Record-Type', resp.headers)
|
||||
self.assertEqual(
|
||||
'shard', resp.headers['X-Backend-Record-Type'])
|
||||
self.assertEqual(
|
||||
'namespace', resp.headers['X-Backend-Record-Shard-Format'])
|
||||
self.assertIn('X-Backend-Override-Shard-Name-Filter', resp.headers)
|
||||
self.assertTrue(
|
||||
resp.headers['X-Backend-Override-Shard-Name-Filter'])
|
||||
# GET shard ranges to cross-check.
|
||||
req = Request.blank(
|
||||
"/sda1/p/%s?format=json%s" % (path, params),
|
||||
method="GET",
|
||||
headers={
|
||||
"X-Backend-Record-Type": "shard",
|
||||
"X-Backend-Record-shard-format": "full",
|
||||
"X-Backend-Override-Shard-Name-Filter": "sharded",
|
||||
},
|
||||
)
|
||||
resp = req.get_response(self.controller)
|
||||
self.assertIn('X-Backend-Record-Type', resp.headers)
|
||||
self.assertEqual(
|
||||
'shard', resp.headers['X-Backend-Record-Type'])
|
||||
self.assertIn('X-Backend-Record-Shard-Format', resp.headers)
|
||||
self.assertEqual(
|
||||
'full', resp.headers['X-Backend-Record-Shard-Format'])
|
||||
raw_sr = json.loads(resp.body)
|
||||
expected_sr = [{'name': sr['name'], 'lower': sr['lower'],
|
||||
'upper': sr['upper']}
|
||||
for sr in raw_sr]
|
||||
self.assertEqual(expected_ns, expected_sr)
|
||||
|
||||
def test_GET_namespaces_sharded_subshard_state_listing(self):
|
||||
# shard's namespaces for listing
|
||||
root_path = 'a/c'
|
||||
container_path = '.shards_a/c'
|
||||
params = '&states=listing'
|
||||
expected_states = [
|
||||
ShardRange.CLEAVED, ShardRange.ACTIVE, ShardRange.SHARDING,
|
||||
ShardRange.SHRINKING]
|
||||
self._do_get_namespaces_sharded(
|
||||
root_path, container_path, params, expected_states)
|
||||
|
||||
def test_GET_namespaces_sharded_subshard_state_updating(self):
|
||||
# shard's namespaces for updating
|
||||
root_path = 'a/c'
|
||||
container_path = '.shards_a/c'
|
||||
params = '&states=updating'
|
||||
expected_states = [
|
||||
ShardRange.CREATED, ShardRange.CLEAVED, ShardRange.ACTIVE,
|
||||
ShardRange.SHARDING]
|
||||
self._do_get_namespaces_sharded(
|
||||
root_path, container_path, params, expected_states)
|
||||
|
||||
def _do_create_container_for_GET_namespaces(self):
|
||||
# make a container
|
||||
ts_put = next(self.ts)
|
||||
headers = {'X-Timestamp': ts_put.normal}
|
||||
req = Request.blank('/sda1/p/a/c', method='PUT', headers=headers)
|
||||
self.assertEqual(201, req.get_response(self.controller).status_int)
|
||||
# PUT some shard ranges
|
||||
shard_bounds = [('', 'apple', ShardRange.SHRINKING),
|
||||
('apple', 'ham', ShardRange.CLEAVED),
|
||||
('ham', 'salami', ShardRange.ACTIVE),
|
||||
('salami', 'yoghurt', ShardRange.CREATED),
|
||||
('yoghurt', '', ShardRange.FOUND),
|
||||
]
|
||||
shard_ranges = [
|
||||
ShardRange('.sharded_a/_%s' % upper, next(self.ts),
|
||||
lower, upper,
|
||||
i * 100, i * 1000, meta_timestamp=next(self.ts),
|
||||
state=state, state_timestamp=next(self.ts))
|
||||
for i, (lower, upper, state) in enumerate(shard_bounds)]
|
||||
for shard_range in shard_ranges:
|
||||
self._put_shard_range(shard_range)
|
||||
|
||||
broker = self.controller._get_container_broker('sda1', 'p', 'a', 'c')
|
||||
self.assertTrue(broker.is_root_container()) # sanity
|
||||
self._assert_shard_ranges_equal(shard_ranges,
|
||||
broker.get_shard_ranges())
|
||||
|
||||
return shard_ranges
|
||||
|
||||
def test_GET_namespaces_other_params(self):
|
||||
shard_ranges = self._do_create_container_for_GET_namespaces()
|
||||
ts_now = Timestamp.now() # used when mocking Timestamp.now()
|
||||
|
||||
# Test namespace GET with 'include' or 'marker/end_marker' or 'reverse'
|
||||
# parameters which are not supported.
|
||||
def check_namespace_GET(expected_namespaces, path, params=''):
|
||||
req = Request.blank(
|
||||
'/sda1/p/%s?format=json%s' % (path, params), method='GET',
|
||||
headers={
|
||||
"X-Backend-Record-Type": "shard",
|
||||
"X-Backend-Record-shard-format": "namespace",
|
||||
})
|
||||
with mock_timestamp_now(ts_now):
|
||||
resp = req.get_response(self.controller)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.assertEqual(resp.content_type, 'application/json')
|
||||
expected_ns = [dict(ns) for ns in expected_namespaces]
|
||||
self.assertEqual(expected_ns, json.loads(resp.body))
|
||||
self.assertIn('X-Backend-Record-Type', resp.headers)
|
||||
self.assertEqual('shard', resp.headers['X-Backend-Record-Type'])
|
||||
self.assertIn('X-Backend-Record-Shard-Format', resp.headers)
|
||||
self.assertEqual(
|
||||
'namespace', resp.headers['X-Backend-Record-Shard-Format'])
|
||||
|
||||
namespaces = [Namespace(sr.name, sr.lower, sr.upper)
|
||||
for sr in shard_ranges]
|
||||
check_namespace_GET(
|
||||
namespaces[:3], 'a/c',
|
||||
params='&states=listing&end_marker=pickle')
|
||||
check_namespace_GET(
|
||||
reversed(namespaces[:3]), 'a/c',
|
||||
params='&states=listing&reverse=true&marker=pickle')
|
||||
check_namespace_GET(namespaces[1:4], 'a/c',
|
||||
params='&states=updating&end_marker=treacle')
|
||||
check_namespace_GET(
|
||||
reversed(namespaces[1:4]), 'a/c',
|
||||
params='&states=updating&reverse=true&marker=treacle')
|
||||
check_namespace_GET(namespaces[1:2],
|
||||
'a/c', params='&includes=cheese')
|
||||
check_namespace_GET(namespaces[1:2], 'a/c', params='&includes=ham')
|
||||
check_namespace_GET(reversed(namespaces),
|
||||
'a/c', params='&reverse=true')
|
||||
|
||||
def test_GET_namespaces_not_supported(self):
|
||||
self._do_create_container_for_GET_namespaces()
|
||||
|
||||
# Test namespace GET with 'X-Backend-Include-Deleted' header.
|
||||
req = Request.blank(
|
||||
'/sda1/p/%s?format=json%s' % ('a/c', '&states=listing'),
|
||||
method='GET',
|
||||
headers={
|
||||
"X-Backend-Record-Type": "shard",
|
||||
"X-Backend-Record-shard-format": "namespace",
|
||||
'X-Backend-Include-Deleted': 'True'
|
||||
})
|
||||
resp = req.get_response(self.controller)
|
||||
self.assertEqual(resp.status, '400 Bad Request')
|
||||
self.assertEqual(resp.body, b'No include_deleted for namespace GET')
|
||||
|
||||
# Test namespace GET with 'auditing' state in query params.
|
||||
req = Request.blank(
|
||||
'/sda1/p/%s?format=json%s' % ('a/c', '&states=auditing'),
|
||||
method='GET',
|
||||
headers={
|
||||
"X-Backend-Record-Type": "shard",
|
||||
"X-Backend-Record-shard-format": "namespace",
|
||||
})
|
||||
resp = req.get_response(self.controller)
|
||||
self.assertEqual(resp.status, '400 Bad Request')
|
||||
self.assertEqual(resp.body, b'No auditing state for namespace GET')
|
||||
|
||||
def test_GET_namespaces_errors(self):
|
||||
self._do_create_container_for_GET_namespaces()
|
||||
|
||||
def do_test(params, expected_status):
|
||||
params['format'] = 'json'
|
||||
headers = {'X-Backend-Record-Type': 'shard',
|
||||
"X-Backend-Record-shard-format": "namespace"}
|
||||
req = Request.blank('/sda1/p/a/c', method='GET',
|
||||
headers=headers, params=params)
|
||||
resp = req.get_response(self.controller)
|
||||
self.assertEqual(resp.status_int, expected_status)
|
||||
self.assertEqual(resp.content_type, 'text/html')
|
||||
self.assertNotIn('X-Backend-Record-Type', resp.headers)
|
||||
self.assertNotIn('X-Backend-Record-shard-format', resp.headers)
|
||||
self.assertNotIn('X-Backend-Sharding-State', resp.headers)
|
||||
self.assertNotIn('X-Container-Object-Count', resp.headers)
|
||||
self.assertNotIn('X-Container-Bytes-Used', resp.headers)
|
||||
self.assertNotIn('X-Timestamp', resp.headers)
|
||||
self.assertNotIn('X-PUT-Timestamp', resp.headers)
|
||||
|
||||
do_test({'states': 'bad'}, 400)
|
||||
|
||||
with mock.patch('swift.container.server.check_drive',
|
||||
side_effect=ValueError('sda1 is not mounted')):
|
||||
do_test({}, 507)
|
||||
|
||||
# delete the container
|
||||
req = Request.blank('/sda1/p/a/c', method='DELETE',
|
||||
headers={'X-Timestamp': Timestamp.now().normal})
|
||||
self.assertEqual(204, req.get_response(self.controller).status_int)
|
||||
|
||||
do_test({'states': 'bad'}, 404)
|
||||
|
||||
def test_GET_auto_record_type(self):
|
||||
# make a container
|
||||
ts_now = Timestamp.now() # used when mocking Timestamp.now()
|
||||
@ -3539,6 +4030,9 @@ class TestContainerController(unittest.TestCase):
|
||||
self.assertIn('X-Backend-Record-Type', resp.headers)
|
||||
self.assertEqual(
|
||||
'shard', resp.headers.pop('X-Backend-Record-Type'))
|
||||
self.assertIn('X-Backend-Record-Shard-Format', resp.headers)
|
||||
self.assertEqual(
|
||||
'full', resp.headers.pop('X-Backend-Record-Shard-Format'))
|
||||
self.assertEqual(
|
||||
str(POLICIES.default.idx),
|
||||
resp.headers.pop('X-Backend-Storage-Policy-Index'))
|
||||
|
Loading…
Reference in New Issue
Block a user