Allow sharded container to be deleted (when empty)
This is a *simple* delete strategy - if the root container thinks it is empty then it is OK to delete it. The root will not consider itself empty until (a) all objects have been deleted and (b) all shards have updated their usage (i.e. the sharders have passed over them), so there may be some delay between object deletes and the root being deletable. That is analogous to a normal container being undeletable because some delete updates dropped into an async pending queue. We can do better, but this allows us to delete at some point. Change-Id: I942b3b5a8295bb9957eddb15a149263a9037dd9a
This commit is contained in:
parent
c0d29c1219
commit
708c9b3dae
|
@ -1789,11 +1789,15 @@ class ContainerBroker(DatabaseBroker):
|
|||
if next_shard_upper is None:
|
||||
# We reached the end of the container
|
||||
next_shard_upper = cont_upper
|
||||
shard_size = object_count - progress
|
||||
last_found = True
|
||||
|
||||
# NB set non-zero object count to that container is non-deletable
|
||||
# if shards found but not yet cleaved
|
||||
found_ranges.append(
|
||||
ShardRange.create(self.root_account, self.root_container,
|
||||
last_shard_upper, next_shard_upper)
|
||||
last_shard_upper, next_shard_upper,
|
||||
object_count=shard_size)
|
||||
)
|
||||
|
||||
if last_found:
|
||||
|
|
|
@ -25,8 +25,7 @@ from eventlet import Timeout
|
|||
import swift.common.db
|
||||
from swift.container.sync_store import ContainerSyncStore
|
||||
from swift.container.backend import ContainerBroker, DATADIR, \
|
||||
RECORD_TYPE_SHARD_NODE, DB_STATE_SHARDING, DB_STATE_SHARDED, \
|
||||
DB_STATE_UNSHARDED
|
||||
RECORD_TYPE_SHARD_NODE, DB_STATE_SHARDING, DB_STATE_UNSHARDED
|
||||
from swift.container.replicator import ContainerReplicatorRpc
|
||||
from swift.common.db import DatabaseAlreadyExists
|
||||
from swift.common.container_sync_realms import ContainerSyncRealms
|
||||
|
@ -342,16 +341,6 @@ class ContainerController(BaseStorageServer):
|
|||
# delete container
|
||||
if not broker.empty():
|
||||
return HTTPConflict(request=req)
|
||||
db_state = broker.get_db_state()
|
||||
if db_state in (DB_STATE_SHARDED, DB_STATE_SHARDING):
|
||||
resp = HTTPPreconditionFailed(request=req)
|
||||
resp.headers['X-Backend-Sharding-State'] = db_state
|
||||
resp.headers.update(
|
||||
(key, value)
|
||||
for key, (value, timestamp) in broker.metadata.items()
|
||||
if value != '' and (key.lower() in self.save_headers or
|
||||
is_sys_or_user_meta('container', key)))
|
||||
return resp
|
||||
existed = Timestamp(broker.get_info()['put_timestamp']) and \
|
||||
not broker.is_deleted()
|
||||
broker.delete_db(req_timestamp.internal)
|
||||
|
|
|
@ -21,8 +21,7 @@ from swift.common.utils import public, csv_append, Timestamp, \
|
|||
config_true_value
|
||||
from swift.common.constraints import check_metadata, CONTAINER_LISTING_LIMIT
|
||||
from swift.common import constraints
|
||||
from swift.common.http import HTTP_ACCEPTED, is_success, \
|
||||
HTTP_PRECONDITION_FAILED
|
||||
from swift.common.http import HTTP_ACCEPTED, is_success
|
||||
from swift.common.request_helpers import get_sys_meta_prefix
|
||||
from swift.proxy.controllers.base import Controller, delay_denial, \
|
||||
cors_validation, set_info_cache, clear_info_cache
|
||||
|
@ -416,14 +415,6 @@ class ContainerController(Controller):
|
|||
req.swift_entity_path, [headers] * len(containers))
|
||||
return resp
|
||||
|
||||
def _delete_sharded(self, req, sharding_state):
|
||||
# TODO propergate the DELETE to all shards. If one returns a 409 then
|
||||
# we back off (what do we do with the containers that were deleted).
|
||||
# Also we need some kind of force delete when sending to the root
|
||||
# container while in the sharding state, as there will be a readonly
|
||||
# (non-empty) container.
|
||||
return HTTPBadRequest(req)
|
||||
|
||||
@public
|
||||
@cors_validation
|
||||
def DELETE(self, req):
|
||||
|
@ -444,13 +435,6 @@ class ContainerController(Controller):
|
|||
# Indicates no server had the container
|
||||
if resp.status_int == HTTP_ACCEPTED:
|
||||
return HTTPNotFound(request=req)
|
||||
|
||||
sharding_state = resp.headers.get('X-Backend-Sharding-State')
|
||||
if resp.status_int == HTTP_PRECONDITION_FAILED and sharding_state:
|
||||
if sharding_state in (DB_STATE_SHARDING, DB_STATE_SHARDED):
|
||||
# We need to first attempt to delete the container shards then
|
||||
# the container
|
||||
resp = self._delete_sharded(req, )
|
||||
return resp
|
||||
|
||||
def _backend_requests(self, req, n_outgoing, account_partition, accounts,
|
||||
|
|
|
@ -18,10 +18,13 @@ import uuid
|
|||
|
||||
from nose import SkipTest
|
||||
|
||||
from swift.common import direct_client
|
||||
from swift.common.direct_client import DirectClientException
|
||||
from swift.common.utils import ShardRange
|
||||
from swift.container.backend import ContainerBroker, DB_STATE
|
||||
from swift.common import utils
|
||||
from swift.common.manager import Manager
|
||||
from swiftclient import client, get_auth
|
||||
from swiftclient import client, get_auth, ClientException
|
||||
|
||||
from test.probe.brain import BrainSplitter
|
||||
from test.probe.common import ReplProbeTest, get_server_number
|
||||
|
@ -49,7 +52,7 @@ class TestContainerSharding(ReplProbeTest):
|
|||
|
||||
if self.max_shard_size > MAX_SHARD_CONTAINER_SIZE:
|
||||
raise SkipTest('shard_container_size is too big! %d > %d' %
|
||||
self.max_shard_size, MAX_SHARD_CONTAINER_SIZE)
|
||||
(self.max_shard_size, MAX_SHARD_CONTAINER_SIZE))
|
||||
|
||||
_, self.admin_token = get_auth(
|
||||
'http://127.0.0.1:8080/auth/v1.0', 'admin:admin', 'admin')
|
||||
|
@ -60,6 +63,25 @@ class TestContainerSharding(ReplProbeTest):
|
|||
|
||||
self.sharders = Manager(['container-sharder'])
|
||||
|
||||
def direct_delete_container(self, account=None, container=None,
|
||||
expect_failure=False):
|
||||
account = account if account else self.account
|
||||
container = container if container else self.container_name
|
||||
cpart, cnodes = self.container_ring.get_nodes(account, container)
|
||||
unexpected_responses = []
|
||||
for cnode in cnodes:
|
||||
try:
|
||||
direct_client.direct_delete_container(
|
||||
cnode, cpart, account, container)
|
||||
except DirectClientException as err:
|
||||
if not expect_failure:
|
||||
unexpected_responses.append((cnode, err))
|
||||
else:
|
||||
if expect_failure:
|
||||
unexpected_responses.append((cnode, 'success'))
|
||||
if unexpected_responses:
|
||||
self.fail('Unexpected responses: %s' % unexpected_responses)
|
||||
|
||||
def categorize_container_dir_content(self, container=None):
|
||||
container = container or self.container_name
|
||||
part, nodes = self.brain.ring.get_nodes(self.brain.account, container)
|
||||
|
@ -106,6 +128,19 @@ class TestContainerSharding(ReplProbeTest):
|
|||
self.assertEqual(obj_len, length, 'len(%r) == %d, not %d' % (
|
||||
obj, obj_len, length))
|
||||
|
||||
def assert_shard_ranges_contiguous(self, expected_number, shard_ranges):
|
||||
actual_shard_ranges = sorted([ShardRange.from_dict(d)
|
||||
for d in shard_ranges])
|
||||
self.assertLengthEqual(actual_shard_ranges, expected_number)
|
||||
self.assertEqual('', actual_shard_ranges[0].lower)
|
||||
for x, y in zip(actual_shard_ranges, actual_shard_ranges[1:]):
|
||||
self.assertEqual(x.upper, y.lower)
|
||||
self.assertEqual('', actual_shard_ranges[-1].upper)
|
||||
|
||||
def assert_total_object_count(self, expected_object_count, shard_ranges):
|
||||
actual = sum([sr['object_count'] for sr in shard_ranges])
|
||||
self.assertEqual(expected_object_count, actual)
|
||||
|
||||
def _test_sharded_listing(self, run_replicators=False):
|
||||
obj_names = ['obj%03d' % x for x in range(self.max_shard_size)]
|
||||
|
||||
|
@ -148,6 +183,9 @@ class TestContainerSharding(ReplProbeTest):
|
|||
self.assertEqual('sharded', DB_STATE[broker.get_db_state()])
|
||||
expected_shard_ranges = [dict(sr) for sr in broker.get_shard_ranges()]
|
||||
self.assertLengthEqual(expected_shard_ranges, 2)
|
||||
self.assert_total_object_count(len(obj_names), expected_shard_ranges)
|
||||
self.assert_shard_ranges_contiguous(2, expected_shard_ranges)
|
||||
self.direct_delete_container(expect_failure=True)
|
||||
|
||||
self.assertLengthEqual(found['normal_dbs'], 2)
|
||||
for db_file in found['normal_dbs']:
|
||||
|
@ -224,6 +262,22 @@ class TestContainerSharding(ReplProbeTest):
|
|||
self.assertEqual(headers['x-container-object-count'],
|
||||
str(len(obj_names) + 1))
|
||||
|
||||
with self.assertRaises(ClientException) as cm:
|
||||
client.delete_container(self.url, self.token, self.container_name)
|
||||
self.assertEqual(409, cm.exception.http_status)
|
||||
|
||||
for obj in listing:
|
||||
client.delete_object(
|
||||
self.url, self.token, self.container_name, obj['name'])
|
||||
|
||||
# root container will not yet be aware of the deletions
|
||||
with self.assertRaises(ClientException) as cm:
|
||||
client.delete_container(self.url, self.token, self.container_name)
|
||||
self.assertEqual(409, cm.exception.http_status)
|
||||
# but once the sharders run and shards update the root...
|
||||
self.sharders.once()
|
||||
client.delete_container(self.url, self.token, self.container_name)
|
||||
|
||||
def test_sharded_listing_no_replicators(self):
|
||||
self._test_sharded_listing()
|
||||
|
||||
|
|
|
@ -2655,15 +2655,16 @@ class TestContainerBroker(unittest.TestCase):
|
|||
container_name = 'test_container'
|
||||
|
||||
def do_test(expected_bounds, expected_last_found, shard_size, limit):
|
||||
# expected_bounds is a list of tuples (lower, upper, object_count)
|
||||
# build expected shard range dicts
|
||||
expected_range_dicts = []
|
||||
for lower, upper in expected_bounds:
|
||||
for lower, upper, object_count in expected_bounds:
|
||||
name = '.sharded_a/%s-%s' % (
|
||||
container_name,
|
||||
hashlib.md5('%s-%s' % (upper, ts_now.internal)).hexdigest()
|
||||
)
|
||||
d = dict(name=name, created_at=ts_now.internal, lower=lower,
|
||||
upper=upper, object_count=0, bytes_used=0,
|
||||
upper=upper, object_count=object_count, bytes_used=0,
|
||||
meta_timestamp=ts_now.internal, deleted=0)
|
||||
expected_range_dicts.append(d)
|
||||
# call the method under test
|
||||
|
@ -2697,12 +2698,12 @@ class TestContainerBroker(unittest.TestCase):
|
|||
broker.put_object(
|
||||
'obj%02d' % i, next(ts_iter).internal, 0, 'text/plain', 'etag')
|
||||
|
||||
expected_bounds = [(c_lower, 'obj04'), ('obj04', c_upper)]
|
||||
expected_bounds = [(c_lower, 'obj04', 5), ('obj04', c_upper, 5)]
|
||||
do_test(expected_bounds, True, shard_size=5, limit=None)
|
||||
|
||||
expected = [(c_lower, 'obj06'), ('obj06', c_upper)]
|
||||
expected = [(c_lower, 'obj06', 7), ('obj06', c_upper, 3)]
|
||||
do_test(expected, True, shard_size=7, limit=None)
|
||||
expected = [(c_lower, 'obj08'), ('obj08', c_upper)]
|
||||
expected = [(c_lower, 'obj08', 9), ('obj08', c_upper, 1)]
|
||||
do_test(expected, True, shard_size=9, limit=None)
|
||||
# shard size >= object count
|
||||
do_test([], False, shard_size=10, limit=None)
|
||||
|
@ -2710,11 +2711,12 @@ class TestContainerBroker(unittest.TestCase):
|
|||
|
||||
# check use of limit
|
||||
do_test([], False, shard_size=4, limit=0)
|
||||
expected = [(c_lower, 'obj03')]
|
||||
expected = [(c_lower, 'obj03', 4)]
|
||||
do_test(expected, False, shard_size=4, limit=1)
|
||||
expected = [(c_lower, 'obj03'), ('obj03', 'obj07')]
|
||||
expected = [(c_lower, 'obj03', 4), ('obj03', 'obj07', 4)]
|
||||
do_test(expected, False, shard_size=4, limit=2)
|
||||
expected = [(c_lower, 'obj03'), ('obj03', 'obj07'), ('obj07', c_upper)]
|
||||
expected = [(c_lower, 'obj03', 4), ('obj03', 'obj07', 4),
|
||||
('obj07', c_upper, 2)]
|
||||
do_test(expected, True, shard_size=4, limit=3)
|
||||
do_test(expected, True, shard_size=4, limit=4)
|
||||
do_test(expected, True, shard_size=4, limit=-1)
|
||||
|
@ -2722,10 +2724,11 @@ class TestContainerBroker(unittest.TestCase):
|
|||
# increase object count to 11
|
||||
broker.put_object(
|
||||
'obj10', next(ts_iter).internal, 0, 'text/plain', 'etag')
|
||||
expected = [(c_lower, 'obj03'), ('obj03', 'obj07'), ('obj07', c_upper)]
|
||||
expected = [(c_lower, 'obj03', 4), ('obj03', 'obj07', 4),
|
||||
('obj07', c_upper, 3)]
|
||||
do_test(expected, True, shard_size=4, limit=None)
|
||||
|
||||
expected = [(c_lower, 'obj09'), ('obj09', c_upper)]
|
||||
expected = [(c_lower, 'obj09', 10), ('obj09', c_upper, 1)]
|
||||
do_test(expected, True, shard_size=10, limit=None)
|
||||
do_test([], False, shard_size=11, limit=None)
|
||||
|
||||
|
@ -2734,16 +2737,16 @@ class TestContainerBroker(unittest.TestCase):
|
|||
'.sharded_a/srange-0', Timestamp.now(), '', 'obj03')
|
||||
broker.merge_shard_ranges([dict(shard_range)])
|
||||
|
||||
expected = [('obj03', 'obj07'), ('obj07', c_upper)]
|
||||
expected = [('obj03', 'obj07', 4), ('obj07', c_upper, 3)]
|
||||
do_test(expected, True, shard_size=4, limit=None)
|
||||
expected = [('obj03', 'obj07')]
|
||||
expected = [('obj03', 'obj07', 4)]
|
||||
do_test(expected, False, shard_size=4, limit=1)
|
||||
|
||||
# add another...
|
||||
shard_range = ShardRange(
|
||||
'.sharded_a/srange-1', Timestamp.now(), '', 'obj07')
|
||||
broker.merge_shard_ranges([dict(shard_range)])
|
||||
expected = [('obj07', c_upper)]
|
||||
expected = [('obj07', c_upper, 3)]
|
||||
do_test(expected, True, shard_size=4, limit=None)
|
||||
|
||||
# add last shard range...
|
||||
|
|
Loading…
Reference in New Issue