Fix SLO re-upload
Previously, if you uploaded a file as an SLO then re-uploaded it with the same segment size and mtime, the second upload would go delete the segments it just (re)uploaded. This was due to us tracking old_slo_manifest_paths and new_slo_manifest_paths in different formats; one would have a leading slash while the other would not. Now, normalize to the stripped-slash version so we stop deleting segments we just uploaded. Change-Id: Ibcbed3df4febe81cdf13855656e2daaca8d521b4
This commit is contained in:
parent
113eacf3b8
commit
9021a58c24
@ -17,6 +17,7 @@ import logging
|
|||||||
|
|
||||||
import os
|
import os
|
||||||
|
|
||||||
|
from collections import defaultdict
|
||||||
from concurrent.futures import as_completed, CancelledError, TimeoutError
|
from concurrent.futures import as_completed, CancelledError, TimeoutError
|
||||||
from copy import deepcopy
|
from copy import deepcopy
|
||||||
from errno import EEXIST, ENOENT
|
from errno import EEXIST, ENOENT
|
||||||
@ -44,7 +45,7 @@ from swiftclient.command_helpers import (
|
|||||||
from swiftclient.utils import (
|
from swiftclient.utils import (
|
||||||
config_true_value, ReadableToIterable, LengthWrapper, EMPTY_ETAG,
|
config_true_value, ReadableToIterable, LengthWrapper, EMPTY_ETAG,
|
||||||
parse_api_response, report_traceback, n_groups, split_request_headers,
|
parse_api_response, report_traceback, n_groups, split_request_headers,
|
||||||
n_at_a_time
|
n_at_a_time, normalize_manifest_path
|
||||||
)
|
)
|
||||||
from swiftclient.exceptions import ClientException
|
from swiftclient.exceptions import ClientException
|
||||||
from swiftclient.multithreading import MultiThreadingManager
|
from swiftclient.multithreading import MultiThreadingManager
|
||||||
@ -2071,11 +2072,9 @@ class SwiftService(object):
|
|||||||
if not options['leave_segments']:
|
if not options['leave_segments']:
|
||||||
old_manifest = headers.get('x-object-manifest')
|
old_manifest = headers.get('x-object-manifest')
|
||||||
if is_slo:
|
if is_slo:
|
||||||
for old_seg in chunk_data:
|
old_slo_manifest_paths.extend(
|
||||||
seg_path = old_seg['name'].lstrip('/')
|
normalize_manifest_path(old_seg['name'])
|
||||||
if isinstance(seg_path, text_type):
|
for old_seg in chunk_data)
|
||||||
seg_path = seg_path.encode('utf-8')
|
|
||||||
old_slo_manifest_paths.append(seg_path)
|
|
||||||
except ClientException as err:
|
except ClientException as err:
|
||||||
if err.http_status != 404:
|
if err.http_status != 404:
|
||||||
traceback, err_time = report_traceback()
|
traceback, err_time = report_traceback()
|
||||||
@ -2165,8 +2164,9 @@ class SwiftService(object):
|
|||||||
response = self._upload_slo_manifest(
|
response = self._upload_slo_manifest(
|
||||||
conn, segment_results, container, obj, put_headers)
|
conn, segment_results, container, obj, put_headers)
|
||||||
res['manifest_response_dict'] = response
|
res['manifest_response_dict'] = response
|
||||||
new_slo_manifest_paths = {
|
new_slo_manifest_paths.update(
|
||||||
seg['segment_location'] for seg in segment_results}
|
normalize_manifest_path(new_seg['segment_location'])
|
||||||
|
for new_seg in segment_results)
|
||||||
else:
|
else:
|
||||||
new_object_manifest = '%s/%s/%s/%s/%s/' % (
|
new_object_manifest = '%s/%s/%s/%s/%s/' % (
|
||||||
quote(seg_container.encode('utf8')),
|
quote(seg_container.encode('utf8')),
|
||||||
@ -2223,8 +2223,9 @@ class SwiftService(object):
|
|||||||
response = self._upload_slo_manifest(
|
response = self._upload_slo_manifest(
|
||||||
conn, results, container, obj, put_headers)
|
conn, results, container, obj, put_headers)
|
||||||
res['manifest_response_dict'] = response
|
res['manifest_response_dict'] = response
|
||||||
new_slo_manifest_paths = {
|
new_slo_manifest_paths.update(
|
||||||
r['segment_location'] for r in results}
|
normalize_manifest_path(new_seg['segment_location'])
|
||||||
|
for new_seg in results)
|
||||||
res['large_object'] = True
|
res['large_object'] = True
|
||||||
else:
|
else:
|
||||||
res['response_dict'] = ret
|
res['response_dict'] = ret
|
||||||
@ -2264,11 +2265,10 @@ class SwiftService(object):
|
|||||||
fp.close()
|
fp.close()
|
||||||
if old_manifest or old_slo_manifest_paths:
|
if old_manifest or old_slo_manifest_paths:
|
||||||
drs = []
|
drs = []
|
||||||
delobjsmap = {}
|
delobjsmap = defaultdict(list)
|
||||||
if old_manifest:
|
if old_manifest:
|
||||||
scontainer, sprefix = old_manifest.split('/', 1)
|
scontainer, sprefix = old_manifest.split('/', 1)
|
||||||
sprefix = sprefix.rstrip('/') + '/'
|
sprefix = sprefix.rstrip('/') + '/'
|
||||||
delobjsmap[scontainer] = []
|
|
||||||
for part in self.list(scontainer, {'prefix': sprefix}):
|
for part in self.list(scontainer, {'prefix': sprefix}):
|
||||||
if not part["success"]:
|
if not part["success"]:
|
||||||
raise part["error"]
|
raise part["error"]
|
||||||
@ -2280,10 +2280,8 @@ class SwiftService(object):
|
|||||||
if seg_to_delete in new_slo_manifest_paths:
|
if seg_to_delete in new_slo_manifest_paths:
|
||||||
continue
|
continue
|
||||||
scont, sobj = \
|
scont, sobj = \
|
||||||
seg_to_delete.split(b'/', 1)
|
seg_to_delete.split('/', 1)
|
||||||
delobjs_cont = delobjsmap.get(scont, [])
|
delobjsmap[scont].append(sobj)
|
||||||
delobjs_cont.append(sobj)
|
|
||||||
delobjsmap[scont] = delobjs_cont
|
|
||||||
|
|
||||||
del_segs = []
|
del_segs = []
|
||||||
for dscont, dsobjs in delobjsmap.items():
|
for dscont, dsobjs in delobjsmap.items():
|
||||||
|
@ -395,3 +395,11 @@ def n_at_a_time(seq, n):
|
|||||||
def n_groups(seq, n):
|
def n_groups(seq, n):
|
||||||
items_per_group = ((len(seq) - 1) // n) + 1
|
items_per_group = ((len(seq) - 1) // n) + 1
|
||||||
return n_at_a_time(seq, items_per_group)
|
return n_at_a_time(seq, items_per_group)
|
||||||
|
|
||||||
|
|
||||||
|
def normalize_manifest_path(path):
|
||||||
|
if six.PY2 and isinstance(path, six.text_type):
|
||||||
|
path = path.encode('utf-8')
|
||||||
|
if path.startswith('/'):
|
||||||
|
return path[1:]
|
||||||
|
return path
|
||||||
|
@ -799,11 +799,11 @@ class TestShell(unittest.TestCase):
|
|||||||
response_dict={})
|
response_dict={})
|
||||||
expected_delete_calls = [
|
expected_delete_calls = [
|
||||||
mock.call(
|
mock.call(
|
||||||
b'container1', b'old_seg1',
|
'container1', 'old_seg1',
|
||||||
response_dict={}
|
response_dict={}
|
||||||
),
|
),
|
||||||
mock.call(
|
mock.call(
|
||||||
b'container2', b'old_seg2',
|
'container2', 'old_seg2',
|
||||||
response_dict={}
|
response_dict={}
|
||||||
)
|
)
|
||||||
]
|
]
|
||||||
@ -834,6 +834,46 @@ class TestShell(unittest.TestCase):
|
|||||||
response_dict={})
|
response_dict={})
|
||||||
self.assertFalse(connection.return_value.delete_object.mock_calls)
|
self.assertFalse(connection.return_value.delete_object.mock_calls)
|
||||||
|
|
||||||
|
@mock.patch('swiftclient.service.Connection')
|
||||||
|
def test_reupload_leaves_slo_segments(self, connection):
|
||||||
|
with open(self.tmpfile, "wb") as fh:
|
||||||
|
fh.write(b'12345678901234567890')
|
||||||
|
mtime = '{:.6f}'.format(os.path.getmtime(self.tmpfile))
|
||||||
|
expected_segments = [
|
||||||
|
'container_segments/{}/slo/{}/20/10/{:08d}'.format(
|
||||||
|
self.tmpfile[1:], mtime, i)
|
||||||
|
for i in range(2)
|
||||||
|
]
|
||||||
|
|
||||||
|
# Test re-upload overwriting a manifest doesn't remove
|
||||||
|
# segments it just wrote
|
||||||
|
connection.return_value.head_container.return_value = {
|
||||||
|
'x-storage-policy': 'one'}
|
||||||
|
connection.return_value.attempts = 0
|
||||||
|
argv = ["", "upload", "container", self.tmpfile,
|
||||||
|
"--use-slo", "-S", "10"]
|
||||||
|
connection.return_value.head_object.side_effect = [
|
||||||
|
{'x-static-large-object': 'true', # For the upload call
|
||||||
|
'content-length': '20'}]
|
||||||
|
connection.return_value.get_object.return_value = (
|
||||||
|
{},
|
||||||
|
# we've already *got* the expected manifest!
|
||||||
|
json.dumps([
|
||||||
|
{'name': seg} for seg in expected_segments
|
||||||
|
]).encode('ascii')
|
||||||
|
)
|
||||||
|
connection.return_value.put_object.return_value = (
|
||||||
|
'd41d8cd98f00b204e9800998ecf8427e')
|
||||||
|
swiftclient.shell.main(argv)
|
||||||
|
connection.return_value.put_object.assert_called_with(
|
||||||
|
'container',
|
||||||
|
self.tmpfile[1:], # drop leading /
|
||||||
|
mock.ANY,
|
||||||
|
headers={'x-object-meta-mtime': mtime},
|
||||||
|
query_string='multipart-manifest=put',
|
||||||
|
response_dict={})
|
||||||
|
self.assertFalse(connection.return_value.delete_object.mock_calls)
|
||||||
|
|
||||||
@mock.patch('swiftclient.service.Connection')
|
@mock.patch('swiftclient.service.Connection')
|
||||||
def test_upload_delete_dlo_segments(self, connection):
|
def test_upload_delete_dlo_segments(self, connection):
|
||||||
# Upload delete existing segments
|
# Upload delete existing segments
|
||||||
|
Loading…
x
Reference in New Issue
Block a user