Backup to Swift uses the wrong large object type
Trove backups are stored in Swift as Dynamic Large Objects. This is not ideal, and could cause restore to fail as DLOs may not return all of their parts. Static Large Objects should be used instead. SLOs perform checksum and object size validation and can reduce object download times significantly. Changes are needed to the Swift storage strategy and taskmanager's backup delete model to perform actions on SLOs. The corresponding unittests and fakes are also updated. A special case occurs when the backup object fits in a single Swift object segment, in which case it will not be made an SLO. Change-Id: Ide30a1e7f909a3b7fecb16268367b56468306d42 Closes-bug: 1489997
This commit is contained in:
parent
79f350852e
commit
6672fe868a
9
releasenotes/notes/slo-backups-3c35135316f837e1.yaml
Normal file
9
releasenotes/notes/slo-backups-3c35135316f837e1.yaml
Normal file
@ -0,0 +1,9 @@
|
||||
---
|
||||
fixes:
|
||||
- Backups to Swift will now use Static Large Objects
|
||||
for larger backups. A new configuration option
|
||||
'backup_segment_max_size' can be set to adjust the
|
||||
segment size of the SLO. Backups that are smaller
|
||||
than the segment size will be uploaded as regular
|
||||
objects. This is an improvement over the old Dynamic
|
||||
Large Object implementation. Bug 1489997.
|
@ -28,7 +28,7 @@ class Storage(Strategy):
|
||||
super(Storage, self).__init__()
|
||||
|
||||
@abc.abstractmethod
|
||||
def save(self, filename, stream):
|
||||
def save(self, filename, stream, metadata=None):
|
||||
"""Persist information from the stream."""
|
||||
|
||||
@abc.abstractmethod
|
||||
|
@ -15,6 +15,7 @@
|
||||
#
|
||||
|
||||
import hashlib
|
||||
import json
|
||||
|
||||
from oslo_log import log as logging
|
||||
|
||||
@ -64,8 +65,12 @@ class StreamReader(object):
|
||||
return '%s_%08d' % (self.base_filename, self.file_number)
|
||||
|
||||
@property
|
||||
def prefix(self):
|
||||
return '%s/%s_' % (self.container, self.base_filename)
|
||||
def first_segment(self):
|
||||
return '%s_%08d' % (self.base_filename, 0)
|
||||
|
||||
@property
|
||||
def segment_path(self):
|
||||
return '%s/%s' % (self.container, self.segment)
|
||||
|
||||
def read(self, chunk_size=CHUNK_SIZE):
|
||||
if self.end_of_segment:
|
||||
@ -97,29 +102,40 @@ class SwiftStorage(base.Storage):
|
||||
super(SwiftStorage, self).__init__(*args, **kwargs)
|
||||
self.connection = create_swift_client(self.context)
|
||||
|
||||
def save(self, filename, stream):
|
||||
def save(self, filename, stream, metadata=None):
|
||||
"""Persist information from the stream to swift.
|
||||
|
||||
The file is saved to the location <BACKUP_CONTAINER>/<filename>.
|
||||
It will be a Swift Static Large Object (SLO).
|
||||
The filename is defined on the backup runner manifest property
|
||||
which is typically in the format '<backup_id>.<ext>.gz'
|
||||
"""
|
||||
|
||||
LOG.info(_('Saving %(filename)s to %(container)s in swift.')
|
||||
% {'filename': filename, 'container': BACKUP_CONTAINER})
|
||||
|
||||
# Create the container if it doesn't already exist
|
||||
LOG.debug('Creating container %s.' % BACKUP_CONTAINER)
|
||||
self.connection.put_container(BACKUP_CONTAINER)
|
||||
|
||||
# Swift Checksum is the checksum of the concatenated segment checksums
|
||||
swift_checksum = hashlib.md5()
|
||||
|
||||
# Wrap the output of the backup process to segment it for swift
|
||||
stream_reader = StreamReader(stream, filename)
|
||||
stream_reader = StreamReader(stream, filename, MAX_FILE_SIZE)
|
||||
LOG.debug('Using segment size %s' % stream_reader.max_file_size)
|
||||
|
||||
url = self.connection.url
|
||||
# Full location where the backup manifest is stored
|
||||
location = "%s/%s/%s" % (url, BACKUP_CONTAINER, filename)
|
||||
|
||||
# Information about each segment upload job
|
||||
segment_results = []
|
||||
|
||||
# Read from the stream and write to the container in swift
|
||||
while not stream_reader.end_of_file:
|
||||
LOG.debug('Saving segment %s.' % stream_reader.segment)
|
||||
path = stream_reader.segment_path
|
||||
etag = self.connection.put_object(BACKUP_CONTAINER,
|
||||
stream_reader.segment,
|
||||
stream_reader)
|
||||
@ -134,30 +150,71 @@ class SwiftStorage(base.Storage):
|
||||
{'tag': etag, 'checksum': segment_checksum})
|
||||
return False, "Error saving data to Swift!", None, location
|
||||
|
||||
segment_results.append({
|
||||
'path': path,
|
||||
'etag': etag,
|
||||
'size_bytes': stream_reader.segment_length
|
||||
})
|
||||
|
||||
swift_checksum.update(segment_checksum)
|
||||
|
||||
# Create the manifest file
|
||||
# We create the manifest file after all the segments have been uploaded
|
||||
# so a partial swift object file can't be downloaded; if the manifest
|
||||
# file exists then all segments have been uploaded so the whole backup
|
||||
# file can be downloaded.
|
||||
headers = {'X-Object-Manifest': stream_reader.prefix}
|
||||
# All segments uploaded.
|
||||
num_segments = len(segment_results)
|
||||
LOG.debug('File uploaded in %s segments.' % num_segments)
|
||||
|
||||
# An SLO will be generated if the backup was more than one segment in
|
||||
# length.
|
||||
large_object = num_segments > 1
|
||||
|
||||
# Meta data is stored as headers
|
||||
if metadata is None:
|
||||
metadata = {}
|
||||
metadata.update(stream.metadata())
|
||||
headers = {}
|
||||
for key, value in metadata.iteritems():
|
||||
headers[self._set_attr(key)] = value
|
||||
|
||||
LOG.debug('Metadata headers: %s' % str(headers))
|
||||
if large_object:
|
||||
LOG.info(_('Creating the manifest file.'))
|
||||
manifest_data = json.dumps(segment_results)
|
||||
LOG.debug('Manifest contents: %s' % manifest_data)
|
||||
# The etag returned from the manifest PUT is the checksum of the
|
||||
# manifest object (which is empty); this is not the checksum we want
|
||||
# manifest object (which is empty); this is not the checksum we
|
||||
# want.
|
||||
self.connection.put_object(BACKUP_CONTAINER,
|
||||
filename,
|
||||
contents='',
|
||||
headers=headers)
|
||||
manifest_data,
|
||||
query_string='multipart-manifest=put')
|
||||
|
||||
# Validation checksum is the Swift Checksum
|
||||
final_swift_checksum = swift_checksum.hexdigest()
|
||||
else:
|
||||
LOG.info(_('Backup fits in a single segment. Moving segment '
|
||||
'%(segment)s to %(filename)s.')
|
||||
% {'segment': stream_reader.first_segment,
|
||||
'filename': filename})
|
||||
segment_result = segment_results[0]
|
||||
# Just rename it via a special put copy.
|
||||
headers['X-Copy-From'] = segment_result['path']
|
||||
self.connection.put_object(BACKUP_CONTAINER,
|
||||
filename, '',
|
||||
headers=headers)
|
||||
# Delete the old segment file that was copied
|
||||
LOG.debug('Deleting the old segment file %s.'
|
||||
% stream_reader.first_segment)
|
||||
self.connection.delete_object(BACKUP_CONTAINER,
|
||||
stream_reader.first_segment)
|
||||
final_swift_checksum = segment_result['etag']
|
||||
|
||||
# Validate the object by comparing checksums
|
||||
# Get the checksum according to Swift
|
||||
resp = self.connection.head_object(BACKUP_CONTAINER, filename)
|
||||
# swift returns etag in double quotes
|
||||
# e.g. '"dc3b0827f276d8d78312992cc60c2c3f"'
|
||||
etag = resp['etag'].strip('"')
|
||||
|
||||
# Check the checksum of the concatenated segment checksums against
|
||||
# swift manifest etag.
|
||||
# Raise an error and mark backup as failed
|
||||
final_swift_checksum = swift_checksum.hexdigest()
|
||||
if etag != final_swift_checksum:
|
||||
LOG.error(
|
||||
_("Error saving data to swift. Manifest "
|
||||
@ -229,8 +286,7 @@ class SwiftStorage(base.Storage):
|
||||
|
||||
storage_url, container, filename = self._explodeLocation(location)
|
||||
|
||||
_headers = self.connection.head_object(container, filename)
|
||||
headers = {'X-Object-Manifest': _headers.get('x-object-manifest')}
|
||||
headers = {}
|
||||
for key, value in metadata.items():
|
||||
headers[self._set_attr(key)] = value
|
||||
|
||||
|
@ -80,9 +80,13 @@ class BackupAgent(object):
|
||||
**parent_metadata) as bkup:
|
||||
try:
|
||||
LOG.debug("Starting backup %s.", backup_id)
|
||||
meta = {}
|
||||
meta['datastore'] = backup_info['datastore']
|
||||
meta['datastore_version'] = backup_info['datastore_version']
|
||||
success, note, checksum, location = storage.save(
|
||||
bkup.manifest,
|
||||
bkup)
|
||||
bkup,
|
||||
metadata=meta)
|
||||
|
||||
backup_state.update({
|
||||
'checksum': checksum,
|
||||
@ -102,12 +106,6 @@ class BackupAgent(object):
|
||||
if not success:
|
||||
raise BackupError(note)
|
||||
|
||||
meta = bkup.metadata()
|
||||
meta['datastore'] = backup_info['datastore']
|
||||
meta['datastore_version'] = backup_info[
|
||||
'datastore_version']
|
||||
storage.save_metadata(location, meta)
|
||||
|
||||
backup_state.update({'state': BackupState.COMPLETED})
|
||||
|
||||
return meta
|
||||
|
@ -1372,24 +1372,16 @@ class BackupTasks(object):
|
||||
container = CONF.backup_swift_container
|
||||
client = remote.create_swift_client(context)
|
||||
obj = client.head_object(container, filename)
|
||||
manifest = obj.get('x-object-manifest', '')
|
||||
cont, prefix = cls._parse_manifest(manifest)
|
||||
if all([cont, prefix]):
|
||||
# This is a manifest file, first delete all segments.
|
||||
LOG.debug("Deleting files with prefix: %(cont)s/%(prefix)s" %
|
||||
{'cont': cont, 'prefix': prefix})
|
||||
# list files from container/prefix specified by manifest
|
||||
headers, segments = client.get_container(cont, prefix=prefix)
|
||||
LOG.debug(headers)
|
||||
for segment in segments:
|
||||
name = segment.get('name')
|
||||
if name:
|
||||
LOG.debug("Deleting file: %(cont)s/%(name)s" %
|
||||
{'cont': cont, 'name': name})
|
||||
client.delete_object(cont, name)
|
||||
# Delete the manifest file
|
||||
LOG.debug("Deleting file: %(cont)s/%(filename)s" %
|
||||
{'cont': cont, 'filename': filename})
|
||||
if 'x-static-large-object' in obj:
|
||||
# Static large object
|
||||
LOG.debug("Deleting large object file: %(cont)s/%(filename)s" %
|
||||
{'cont': container, 'filename': filename})
|
||||
client.delete_object(container, filename,
|
||||
query_string='multipart-manifest=delete')
|
||||
else:
|
||||
# Single object
|
||||
LOG.debug("Deleting object file: %(cont)s/%(filename)s" %
|
||||
{'cont': container, 'filename': filename})
|
||||
client.delete_object(container, filename)
|
||||
|
||||
@classmethod
|
||||
|
@ -44,7 +44,9 @@ class FakeSwiftClient(object):
|
||||
|
||||
class FakeSwiftConnection(object):
|
||||
"""Logging calls instead of executing."""
|
||||
MANIFEST_HEADER_KEY = 'X-Object-Manifest'
|
||||
MANIFEST_QUERY_STRING_PUT = 'multipart-manifest=put'
|
||||
MANIFEST_QUERY_STRING_DELETE = 'multipart-manifest=delete'
|
||||
COPY_OBJECT_HEADER_KEY = 'X-Copy-From'
|
||||
url = 'http://mockswift/v1'
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
@ -101,7 +103,7 @@ class FakeSwiftConnection(object):
|
||||
LOG.debug("fake put_container(%(container)s, %(name)s)" %
|
||||
{'container': container, 'name': name})
|
||||
checksum = md5()
|
||||
if self.manifest_prefix and self.manifest_name == name:
|
||||
if self.manifest_name == name:
|
||||
for object_name in sorted(self.container_objects):
|
||||
object_checksum = md5(self.container_objects[object_name])
|
||||
# The manifest file etag for a HEAD or GET is the checksum of
|
||||
@ -163,14 +165,19 @@ class FakeSwiftConnection(object):
|
||||
if container == 'socket_error_on_put':
|
||||
raise socket.error(111, 'ECONNREFUSED')
|
||||
headers = kwargs.get('headers', {})
|
||||
query_string = kwargs.get('query_string', '')
|
||||
object_checksum = md5()
|
||||
if self.MANIFEST_HEADER_KEY in headers:
|
||||
if query_string == self.MANIFEST_QUERY_STRING_PUT:
|
||||
# the manifest prefix format is <container>/<prefix> where
|
||||
# container is where the object segments are in and prefix is the
|
||||
# common prefix for all segments.
|
||||
self.manifest_prefix = headers.get(self.MANIFEST_HEADER_KEY)
|
||||
self.manifest_name = name
|
||||
object_checksum.update(contents)
|
||||
elif self.COPY_OBJECT_HEADER_KEY in headers:
|
||||
# this is a copy object operation
|
||||
source_path = headers.get(self.COPY_OBJECT_HEADER_KEY)
|
||||
source_name = source_path.split('/')[1]
|
||||
self.container_objects[name] = self.container_objects[source_name]
|
||||
else:
|
||||
if hasattr(contents, 'read'):
|
||||
chunk_size = 128
|
||||
|
@ -106,7 +106,7 @@ class MockSwift(object):
|
||||
self.etag.update(self.store)
|
||||
return self.etag.hexdigest()
|
||||
|
||||
def save(self, filename, stream):
|
||||
def save(self, filename, stream, metadata=None):
|
||||
location = '%s/%s/%s' % (self.url, self.container, filename)
|
||||
return True, 'w00t', 'fake-checksum', location
|
||||
|
||||
@ -128,7 +128,7 @@ class MockStorage(Storage):
|
||||
def load(self, location, backup_checksum):
|
||||
pass
|
||||
|
||||
def save(self, filename, stream):
|
||||
def save(self, filename, stream, metadata=None):
|
||||
pass
|
||||
|
||||
def load_metadata(self, location, checksum):
|
||||
@ -407,6 +407,8 @@ class BackupAgentTest(trove_testtools.TestCase):
|
||||
'location': 'fake-location',
|
||||
'type': 'InnoBackupEx',
|
||||
'checksum': 'fake-checksum',
|
||||
'datastore': 'mysql',
|
||||
'datastore_version': '5.5'
|
||||
}
|
||||
|
||||
self.assertRaises(backupagent.BackupError, agent.execute_backup,
|
||||
|
@ -32,12 +32,47 @@ class SwiftStorageSaveChecksumTests(trove_testtools.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(SwiftStorageSaveChecksumTests, self).setUp()
|
||||
self.max_file_size = swift.MAX_FILE_SIZE
|
||||
swift.MAX_FILE_SIZE = 128
|
||||
|
||||
def tearDown(self):
|
||||
swift.MAX_FILE_SIZE = self.max_file_size
|
||||
super(SwiftStorageSaveChecksumTests, self).tearDown()
|
||||
|
||||
def test_swift_small_file_checksum_save(self):
|
||||
"""This tests that SwiftStorage.save returns the swift checksum
|
||||
for small files.
|
||||
"""
|
||||
context = trove_testtools.TroveTestContext(self)
|
||||
backup_id = '123'
|
||||
user = 'user'
|
||||
password = 'password'
|
||||
|
||||
swift.MAX_FILE_SIZE = 2 * (1024 ** 3)
|
||||
|
||||
swift_client = FakeSwiftConnection()
|
||||
with patch.object(swift, 'create_swift_client',
|
||||
return_value=swift_client):
|
||||
storage_strategy = SwiftStorage(context)
|
||||
|
||||
with MockBackupRunner(filename=backup_id,
|
||||
user=user,
|
||||
password=password) as runner:
|
||||
(success,
|
||||
note,
|
||||
checksum,
|
||||
location) = storage_strategy.save(runner.manifest, runner)
|
||||
|
||||
self.assertTrue(success, "The backup should have been successful.")
|
||||
self.assertIsNotNone(note, "A note should have been returned.")
|
||||
self.assertEqual('http://mockswift/v1/database_backups/123.gz.enc',
|
||||
location,
|
||||
"Incorrect swift location was returned.")
|
||||
|
||||
def test_swift_checksum_save(self):
|
||||
"""This tests that SwiftStorage.save returns the swift checksum."""
|
||||
"""This tests that SwiftStorage.save returns the swift checksum for
|
||||
large files.
|
||||
"""
|
||||
context = trove_testtools.TroveTestContext(self)
|
||||
backup_id = '123'
|
||||
user = 'user'
|
||||
@ -240,9 +275,6 @@ class StreamReaderTests(trove_testtools.TestCase):
|
||||
stream_reader = StreamReader(self.runner, 'foo')
|
||||
self.assertEqual('foo', stream_reader.base_filename)
|
||||
|
||||
def test_prefix(self):
|
||||
self.assertEqual('database_backups/123_', self.stream.prefix)
|
||||
|
||||
def test_segment(self):
|
||||
self.assertEqual('123_00000000', self.stream.segment)
|
||||
|
||||
@ -326,8 +358,7 @@ class SwiftMetadataTests(trove_testtools.TestCase):
|
||||
self.swift.save_metadata(location, metadata=metadata)
|
||||
|
||||
headers = {
|
||||
'X-Object-Meta-lsn': '1234567',
|
||||
'X-Object-Manifest': None
|
||||
'X-Object-Meta-lsn': '1234567'
|
||||
}
|
||||
self.swift_client.post_object.assert_called_with(
|
||||
'backups', 'mybackup.tar', headers=headers)
|
||||
|
Loading…
Reference in New Issue
Block a user