Merge "Backup to swift using wrong large object type"

commit 2f0a8610ca

releasenotes/notes/slo-backups-3c35135316f837e1.yaml | 9 +++++++++ (new file)

@@ -0,0 +1,9 @@
+---
+fixes:
+  - Backups to Swift will now use Static Large Objects
+    for larger backups. A new configuration option
+    'backup_segment_max_size' can be set to adjust the
+    segment size of the SLO. Backups that are smaller
+    than the segment size will be uploaded as regular
+    objects. This is an improvement over the old Dynamic
+    Large Object implementation. Bug 1489997.

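The release note above describes the switch from Dynamic Large Objects to Static Large Objects. As a rough illustration of what the new upload path does, here is a minimal sketch assuming a python-swiftclient `Connection` named `conn` and segments that have already been split out of the backup stream; the helper name and the segment-splitting are illustrative, not Trove's actual code:

```python
import hashlib
import json


def upload_as_slo(conn, container, filename, segments):
    """Upload pre-split segments and tie them together with an SLO manifest.

    `conn` is assumed to be a swiftclient.client.Connection; `segments` is
    an iterable of byte strings no larger than the configured segment size.
    """
    conn.put_container(container)
    segment_results = []
    swift_checksum = hashlib.md5()

    for index, data in enumerate(segments):
        segment_name = '%s_%08d' % (filename, index)
        # put_object returns the MD5 etag of the uploaded segment.
        etag = conn.put_object(container, segment_name, data)
        segment_results.append({
            'path': '%s/%s' % (container, segment_name),
            'etag': etag,
            'size_bytes': len(data),
        })
        swift_checksum.update(etag.encode('utf-8'))

    # The manifest is a JSON list of segment descriptors; the
    # 'multipart-manifest=put' query string is what makes the object an
    # SLO instead of a plain (or dynamic large) object.
    conn.put_object(container, filename, json.dumps(segment_results),
                    query_string='multipart-manifest=put')

    # Swift reports an SLO's etag as the MD5 of the concatenated segment
    # etags, so this value can later be compared against a HEAD request.
    return swift_checksum.hexdigest()
```

With a DLO, by contrast, the manifest is an empty object carrying an `X-Object-Manifest: <container>/<prefix>` header and Swift stitches the segments together by prefix listing at read time; that is the behaviour the rest of this diff removes.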
@@ -28,7 +28,7 @@ class Storage(Strategy):
         super(Storage, self).__init__()

     @abc.abstractmethod
-    def save(self, filename, stream):
+    def save(self, filename, stream, metadata=None):
         """Persist information from the stream."""

     @abc.abstractmethod

@@ -15,6 +15,7 @@
 #

 import hashlib
+import json

 from oslo_log import log as logging

@@ -64,8 +65,12 @@ class StreamReader(object):
         return '%s_%08d' % (self.base_filename, self.file_number)

     @property
-    def prefix(self):
-        return '%s/%s_' % (self.container, self.base_filename)
+    def first_segment(self):
+        return '%s_%08d' % (self.base_filename, 0)
+
+    @property
+    def segment_path(self):
+        return '%s/%s' % (self.container, self.segment)

     def read(self, chunk_size=CHUNK_SIZE):
         if self.end_of_segment:

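The StreamReader hunk above swaps the DLO-oriented `prefix` property for `first_segment` and `segment_path`, which the SLO manifest entries and the single-segment rename need; the underlying segmentation idea is unchanged. A simplified, self-contained sketch of that idea, assuming only a file-like backup stream (the names and the 2 MiB default are illustrative, not Trove's `backup_segment_max_size` handling):

```python
def iter_segments(stream, max_segment_size=2 * 1024 * 1024,
                  chunk_size=65536):
    """Yield successive byte strings of at most max_segment_size bytes."""
    buf = []
    buffered = 0
    while True:
        chunk = stream.read(min(chunk_size, max_segment_size - buffered))
        if not chunk:
            # End of the backup stream: flush whatever is still buffered.
            if buf:
                yield b''.join(buf)
            return
        buf.append(chunk)
        buffered += len(chunk)
        if buffered == max_segment_size:
            # Segment boundary: the caller uploads this as
            # '<filename>_<00000000-style index>' and starts a new segment.
            yield b''.join(buf)
            buf = []
            buffered = 0
```

In the real code the reader object is handed directly to `put_object` as the `contents` argument, so each call reads exactly one segment's worth of data instead of buffering it in memory.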
@@ -97,29 +102,40 @@ class SwiftStorage(base.Storage):
         super(SwiftStorage, self).__init__(*args, **kwargs)
         self.connection = create_swift_client(self.context)

-    def save(self, filename, stream):
+    def save(self, filename, stream, metadata=None):
         """Persist information from the stream to swift.

         The file is saved to the location <BACKUP_CONTAINER>/<filename>.
+        It will be a Swift Static Large Object (SLO).
         The filename is defined on the backup runner manifest property
         which is typically in the format '<backup_id>.<ext>.gz'
         """

         LOG.info(_('Saving %(filename)s to %(container)s in swift.')
                  % {'filename': filename, 'container': BACKUP_CONTAINER})

         # Create the container if it doesn't already exist
         LOG.debug('Creating container %s.' % BACKUP_CONTAINER)
         self.connection.put_container(BACKUP_CONTAINER)

         # Swift Checksum is the checksum of the concatenated segment checksums
         swift_checksum = hashlib.md5()

         # Wrap the output of the backup process to segment it for swift
-        stream_reader = StreamReader(stream, filename)
+        stream_reader = StreamReader(stream, filename, MAX_FILE_SIZE)
+        LOG.debug('Using segment size %s' % stream_reader.max_file_size)

         url = self.connection.url
         # Full location where the backup manifest is stored
         location = "%s/%s/%s" % (url, BACKUP_CONTAINER, filename)

+        # Information about each segment upload job
+        segment_results = []
+
         # Read from the stream and write to the container in swift
         while not stream_reader.end_of_file:
             LOG.debug('Saving segment %s.' % stream_reader.segment)
+            path = stream_reader.segment_path
             etag = self.connection.put_object(BACKUP_CONTAINER,
                                               stream_reader.segment,
                                               stream_reader)

@@ -134,30 +150,71 @@ class SwiftStorage(base.Storage):
                           {'tag': etag, 'checksum': segment_checksum})
                 return False, "Error saving data to Swift!", None, location

+            segment_results.append({
+                'path': path,
+                'etag': etag,
+                'size_bytes': stream_reader.segment_length
+            })
+
             swift_checksum.update(segment_checksum)

-        # Create the manifest file
-        # We create the manifest file after all the segments have been uploaded
-        # so a partial swift object file can't be downloaded; if the manifest
-        # file exists then all segments have been uploaded so the whole backup
-        # file can be downloaded.
-        headers = {'X-Object-Manifest': stream_reader.prefix}
-        # The etag returned from the manifest PUT is the checksum of the
-        # manifest object (which is empty); this is not the checksum we want
-        self.connection.put_object(BACKUP_CONTAINER,
-                                   filename,
-                                   contents='',
-                                   headers=headers)
+        # All segments uploaded.
+        num_segments = len(segment_results)
+        LOG.debug('File uploaded in %s segments.' % num_segments)
+
+        # An SLO will be generated if the backup was more than one segment in
+        # length.
+        large_object = num_segments > 1
+
+        # Meta data is stored as headers
+        if metadata is None:
+            metadata = {}
+        metadata.update(stream.metadata())
+        headers = {}
+        for key, value in metadata.iteritems():
+            headers[self._set_attr(key)] = value
+
+        LOG.debug('Metadata headers: %s' % str(headers))
+        if large_object:
+            LOG.info(_('Creating the manifest file.'))
+            manifest_data = json.dumps(segment_results)
+            LOG.debug('Manifest contents: %s' % manifest_data)
+            # The etag returned from the manifest PUT is the checksum of the
+            # manifest object (which is empty); this is not the checksum we
+            # want.
+            self.connection.put_object(BACKUP_CONTAINER,
+                                       filename,
+                                       manifest_data,
+                                       query_string='multipart-manifest=put')
+
+            # Validation checksum is the Swift Checksum
+            final_swift_checksum = swift_checksum.hexdigest()
+        else:
+            LOG.info(_('Backup fits in a single segment. Moving segment '
+                       '%(segment)s to %(filename)s.')
+                     % {'segment': stream_reader.first_segment,
+                        'filename': filename})
+            segment_result = segment_results[0]
+            # Just rename it via a special put copy.
+            headers['X-Copy-From'] = segment_result['path']
+            self.connection.put_object(BACKUP_CONTAINER,
+                                       filename, '',
+                                       headers=headers)
+            # Delete the old segment file that was copied
+            LOG.debug('Deleting the old segment file %s.'
+                      % stream_reader.first_segment)
+            self.connection.delete_object(BACKUP_CONTAINER,
+                                          stream_reader.first_segment)
+            final_swift_checksum = segment_result['etag']

         # Validate the object by comparing checksums
         # Get the checksum according to Swift
         resp = self.connection.head_object(BACKUP_CONTAINER, filename)
         # swift returns etag in double quotes
         # e.g. '"dc3b0827f276d8d78312992cc60c2c3f"'
         etag = resp['etag'].strip('"')

         # Check the checksum of the concatenated segment checksums against
         # swift manifest etag.
         # Raise an error and mark backup as failed
-        final_swift_checksum = swift_checksum.hexdigest()
         if etag != final_swift_checksum:
             LOG.error(
                 _("Error saving data to swift. Manifest "

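For the validation block at the end of `save()`: Swift computes a plain object's etag as the MD5 of its contents, but an SLO's etag as the MD5 of the concatenated segment etags (returned wrapped in double quotes). A minimal sketch of that comparison, assuming a swiftclient `Connection` named `conn` and the per-segment etags collected during upload (names are illustrative):

```python
import hashlib


def validate_backup(conn, container, filename, segment_etags):
    """Compare Swift's reported checksum with the locally computed one."""
    if len(segment_etags) > 1:
        # SLO: the etag is the MD5 of the concatenated segment etag strings.
        joined = ''.join(segment_etags).encode('utf-8')
        expected = hashlib.md5(joined).hexdigest()
    else:
        # Single regular object: its etag is simply the MD5 of its data,
        # i.e. the etag returned when the lone segment was uploaded.
        expected = segment_etags[0]

    resp = conn.head_object(container, filename)
    # Swift wraps the SLO etag in double quotes, e.g. '"dc3b0827..."'.
    return resp['etag'].strip('"') == expected
```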
@@ -229,8 +286,7 @@ class SwiftStorage(base.Storage):

         storage_url, container, filename = self._explodeLocation(location)

-        _headers = self.connection.head_object(container, filename)
-        headers = {'X-Object-Manifest': _headers.get('x-object-manifest')}
+        headers = {}
         for key, value in metadata.items():
             headers[self._set_attr(key)] = value

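The `save_metadata()` change above drops the DLO-specific `X-Object-Manifest` header and keeps only the user metadata. As a sketch of how that metadata reaches Swift, assuming `_set_attr` maps a key such as `lsn` to `X-Object-Meta-lsn` (the metadata test at the bottom of this diff uses exactly that header) and a swiftclient `Connection` named `conn`; the helper name is illustrative:

```python
def set_backup_metadata(conn, container, filename, metadata):
    """Attach backup metadata to an object as X-Object-Meta-* headers."""
    headers = {}
    for key, value in metadata.items():
        headers['X-Object-Meta-%s' % key] = value
    # A POST on an object replaces its user metadata with these headers.
    conn.post_object(container, filename, headers=headers)
```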
@@ -80,9 +80,13 @@ class BackupAgent(object):
                                        **parent_metadata) as bkup:
             try:
                 LOG.debug("Starting backup %s.", backup_id)
+                meta = {}
+                meta['datastore'] = backup_info['datastore']
+                meta['datastore_version'] = backup_info['datastore_version']
                 success, note, checksum, location = storage.save(
                     bkup.manifest,
-                    bkup)
+                    bkup,
+                    metadata=meta)

                 backup_state.update({
                     'checksum': checksum,

@@ -102,12 +106,6 @@ class BackupAgent(object):
                 if not success:
                     raise BackupError(note)

-                meta = bkup.metadata()
-                meta['datastore'] = backup_info['datastore']
-                meta['datastore_version'] = backup_info[
-                    'datastore_version']
-                storage.save_metadata(location, meta)
-
                 backup_state.update({'state': BackupState.COMPLETED})

                 return meta

@@ -1375,25 +1375,17 @@ class BackupTasks(object):
         container = CONF.backup_swift_container
         client = remote.create_swift_client(context)
         obj = client.head_object(container, filename)
-        manifest = obj.get('x-object-manifest', '')
-        cont, prefix = cls._parse_manifest(manifest)
-        if all([cont, prefix]):
-            # This is a manifest file, first delete all segments.
-            LOG.debug("Deleting files with prefix: %(cont)s/%(prefix)s" %
-                      {'cont': cont, 'prefix': prefix})
-            # list files from container/prefix specified by manifest
-            headers, segments = client.get_container(cont, prefix=prefix)
-            LOG.debug(headers)
-            for segment in segments:
-                name = segment.get('name')
-                if name:
-                    LOG.debug("Deleting file: %(cont)s/%(name)s" %
-                              {'cont': cont, 'name': name})
-                    client.delete_object(cont, name)
-            # Delete the manifest file
-            LOG.debug("Deleting file: %(cont)s/%(filename)s" %
-                      {'cont': cont, 'filename': filename})
-        client.delete_object(container, filename)
+        if 'x-static-large-object' in obj:
+            # Static large object
+            LOG.debug("Deleting large object file: %(cont)s/%(filename)s" %
+                      {'cont': container, 'filename': filename})
+            client.delete_object(container, filename,
+                                 query_string='multipart-manifest=delete')
+        else:
+            # Single object
+            LOG.debug("Deleting object file: %(cont)s/%(filename)s" %
+                      {'cont': container, 'filename': filename})
+            client.delete_object(container, filename)

     @classmethod
     def delete_backup(cls, context, backup_id):

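The deletion path gets much simpler with SLOs: instead of listing and deleting every segment under the DLO prefix, a single DELETE with `multipart-manifest=delete` asks Swift to remove the segments named in the manifest together with the manifest itself. A minimal sketch of the resulting logic, assuming a swiftclient `Connection` named `client` (the function name is illustrative):

```python
def delete_backup_object(client, container, filename):
    """Delete a backup object, including SLO segments when applicable."""
    obj = client.head_object(container, filename)
    if 'x-static-large-object' in obj:
        # Swift deletes every segment listed in the manifest, then the
        # manifest itself, in response to this single call.
        client.delete_object(container, filename,
                             query_string='multipart-manifest=delete')
    else:
        # Small backups are plain objects; a normal delete is enough.
        client.delete_object(container, filename)
```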
@@ -44,7 +44,9 @@ class FakeSwiftClient(object):

 class FakeSwiftConnection(object):
     """Logging calls instead of executing."""
-    MANIFEST_HEADER_KEY = 'X-Object-Manifest'
+    MANIFEST_QUERY_STRING_PUT = 'multipart-manifest=put'
+    MANIFEST_QUERY_STRING_DELETE = 'multipart-manifest=delete'
+    COPY_OBJECT_HEADER_KEY = 'X-Copy-From'
     url = 'http://mockswift/v1'

     def __init__(self, *args, **kwargs):

@@ -101,7 +103,7 @@ class FakeSwiftConnection(object):
         LOG.debug("fake put_container(%(container)s, %(name)s)" %
                   {'container': container, 'name': name})
         checksum = md5()
-        if self.manifest_prefix and self.manifest_name == name:
+        if self.manifest_name == name:
             for object_name in sorted(self.container_objects):
                 object_checksum = md5(self.container_objects[object_name])
                 # The manifest file etag for a HEAD or GET is the checksum of

@@ -163,14 +165,19 @@ class FakeSwiftConnection(object):
         if container == 'socket_error_on_put':
             raise socket.error(111, 'ECONNREFUSED')
         headers = kwargs.get('headers', {})
+        query_string = kwargs.get('query_string', '')
         object_checksum = md5()
-        if self.MANIFEST_HEADER_KEY in headers:
-            # the manifest prefix format is <container>/<prefix> where
-            # container is where the object segments are in and prefix is the
-            # common prefix for all segments.
-            self.manifest_prefix = headers.get(self.MANIFEST_HEADER_KEY)
+        if query_string == self.MANIFEST_QUERY_STRING_PUT:
             self.manifest_name = name
             object_checksum.update(contents)
+        elif self.COPY_OBJECT_HEADER_KEY in headers:
+            # this is a copy object operation
+            source_path = headers.get(self.COPY_OBJECT_HEADER_KEY)
+            source_name = source_path.split('/')[1]
+            self.container_objects[name] = self.container_objects[source_name]
         else:
             if hasattr(contents, 'read'):
                 chunk_size = 128

@@ -106,7 +106,7 @@ class MockSwift(object):
         self.etag.update(self.store)
         return self.etag.hexdigest()

-    def save(self, filename, stream):
+    def save(self, filename, stream, metadata=None):
         location = '%s/%s/%s' % (self.url, self.container, filename)
         return True, 'w00t', 'fake-checksum', location

@@ -128,7 +128,7 @@ class MockStorage(Storage):
     def load(self, location, backup_checksum):
         pass

-    def save(self, filename, stream):
+    def save(self, filename, stream, metadata=None):
         pass

     def load_metadata(self, location, checksum):

@@ -407,6 +407,8 @@ class BackupAgentTest(trove_testtools.TestCase):
             'location': 'fake-location',
             'type': 'InnoBackupEx',
             'checksum': 'fake-checksum',
+            'datastore': 'mysql',
+            'datastore_version': '5.5'
         }

         self.assertRaises(backupagent.BackupError, agent.execute_backup,

@@ -32,12 +32,47 @@ class SwiftStorageSaveChecksumTests(trove_testtools.TestCase):

     def setUp(self):
         super(SwiftStorageSaveChecksumTests, self).setUp()
+        self.max_file_size = swift.MAX_FILE_SIZE
+        swift.MAX_FILE_SIZE = 128
+
+    def tearDown(self):
+        swift.MAX_FILE_SIZE = self.max_file_size
+        super(SwiftStorageSaveChecksumTests, self).tearDown()
+
+    def test_swift_small_file_checksum_save(self):
+        """This tests that SwiftStorage.save returns the swift checksum
+        for small files.
+        """
+        context = trove_testtools.TroveTestContext(self)
+        backup_id = '123'
+        user = 'user'
+        password = 'password'
+
+        swift.MAX_FILE_SIZE = 2 * (1024 ** 3)
+
+        swift_client = FakeSwiftConnection()
+        with patch.object(swift, 'create_swift_client',
+                          return_value=swift_client):
+            storage_strategy = SwiftStorage(context)
+
+            with MockBackupRunner(filename=backup_id,
+                                  user=user,
+                                  password=password) as runner:
+                (success,
+                 note,
+                 checksum,
+                 location) = storage_strategy.save(runner.manifest, runner)
+
+        self.assertTrue(success, "The backup should have been successful.")
+        self.assertIsNotNone(note, "A note should have been returned.")
+        self.assertEqual('http://mockswift/v1/database_backups/123.gz.enc',
+                         location,
+                         "Incorrect swift location was returned.")

     def test_swift_checksum_save(self):
-        """This tests that SwiftStorage.save returns the swift checksum."""
+        """This tests that SwiftStorage.save returns the swift checksum for
+        large files.
+        """
         context = trove_testtools.TroveTestContext(self)
         backup_id = '123'
         user = 'user'

@@ -240,9 +275,6 @@ class StreamReaderTests(trove_testtools.TestCase):
         stream_reader = StreamReader(self.runner, 'foo')
         self.assertEqual('foo', stream_reader.base_filename)

-    def test_prefix(self):
-        self.assertEqual('database_backups/123_', self.stream.prefix)
-
     def test_segment(self):
         self.assertEqual('123_00000000', self.stream.segment)

@@ -326,8 +358,7 @@ class SwiftMetadataTests(trove_testtools.TestCase):
         self.swift.save_metadata(location, metadata=metadata)

         headers = {
-            'X-Object-Meta-lsn': '1234567',
-            'X-Object-Manifest': None
+            'X-Object-Meta-lsn': '1234567'
         }
         self.swift_client.post_object.assert_called_with(
             'backups', 'mybackup.tar', headers=headers)