Change the swift file deletion to use the manifest.

* Adding a _parse_manifest helper to parse the x-object-manifest header
* Delete file using the prefix in the manifest.
* Fix error handling in the delete_backup method.

Fixes Bug: #1194653

Change-Id: I34268f768a5cdb8085607269e2c2cb95974a539d
This commit is contained in:
Robert Myers 2013-07-01 09:43:24 -05:00
parent 8c339954f0
commit 1815d5d566
4 changed files with 96 additions and 58 deletions

View File

@ -35,8 +35,9 @@ class BackupState(object):
SAVING = "SAVING"
COMPLETED = "COMPLETED"
FAILED = "FAILED"
DELETE_FAILED = "DELETE_FAILED"
RUNNING_STATES = [NEW, BUILDING, SAVING]
END_STATES = [COMPLETED, FAILED]
END_STATES = [COMPLETED, FAILED, DELETE_FAILED]
class Backup(object):

View File

@ -36,41 +36,33 @@ class SwiftStorage(base.Storage):
super(SwiftStorage, self).__init__()
self.connection = create_swift_client(context)
def set_container(self, ):
""" Set the container to store to. """
""" This creates the container if it doesn't exist. """
def save(self, save_location, stream):
""" Persist information from the stream """
# Create the container (save_location) if it doesn't already exist
self.container_name = save_location
self.segments_container_name = stream.manifest + "_segments"
self.connection.put_container(self.container_name)
self.connection.put_container(self.segments_container_name)
self.connection.put_container(save_location)
# Read from the stream and write to the container in swift
while not stream.end_of_file:
segment = stream.segment
etag = self.connection.put_object(self.segments_container_name,
segment,
etag = self.connection.put_object(save_location,
stream.segment,
stream)
# Check each segment MD5 hash against swift etag
# Raise an error and mark backup as failed
if etag != stream.schecksum.hexdigest():
print("%s %s" % (etag, stream.schecksum.hexdigest()))
LOG.error(
"Error saving data to swift. ETAG: %s File MD5: %s",
etag, stream.schecksum.hexdigest())
return (False, "Error saving data to Swift!", None, None)
checksum = stream.checksum.hexdigest()
url = self.connection.url
location = "%s/%s/%s" % (url, self.container_name, stream.manifest)
location = "%s/%s/%s" % (url, save_location, stream.manifest)
# Create the manifest file
headers = {
'X-Object-Manifest':
self.segments_container_name + "/" + stream.filename}
self.connection.put_object(self.container_name,
headers = {'X-Object-Manifest': stream.prefix}
self.connection.put_object(save_location,
stream.manifest,
contents='',
headers=headers)

View File

@ -550,21 +550,40 @@ class BuiltInstanceTasks(BuiltInstance, NotifyMixin, ConfigurationMixin):
class BackupTasks(object):
@classmethod
def _parse_manifest(cls, manifest):
# manifest is in the format 'container/prefix'
# where prefix can be 'path' or 'lots/of/paths'
try:
container_index = manifest.index('/')
prefix_index = container_index + 1
except ValueError:
return None, None
container = manifest[:container_index]
prefix = manifest[prefix_index:]
return container, prefix
@classmethod
def delete_files_from_swift(cls, context, filename):
container = CONF.backup_swift_container
client = remote.create_swift_client(context)
# Delete the manifest
if client.head_object(CONF.backup_swift_container, filename):
client.delete_object(CONF.backup_swift_container, filename)
# Delete the segments
if client.head_container(filename + "_segments"):
for obj in client.get_container(filename + "_segments")[1]:
client.delete_object(filename + "_segments", obj['name'])
# Delete the segments container
client.delete_container(filename + "_segments")
obj = client.head_object(container, filename)
manifest = obj.get('x-object-manifest', '')
cont, prefix = cls._parse_manifest(manifest)
if all([cont, prefix]):
# This is a manifest file, first delete all segments.
LOG.info("Deleting files with prefix: %s/%s", cont, prefix)
# list files from container/prefix specified by manifest
headers, segments = client.get_container(cont, prefix=prefix)
LOG.debug(headers)
for segment in segments:
name = segment.get('name')
if name:
LOG.info("Deleting file: %s/%s", cont, name)
client.delete_object(cont, name)
# Delete the manifest file
LOG.info("Deleting file: %s/%s", container, filename)
client.delete_object(container, filename)
@classmethod
def delete_backup(cls, context, backup_id):
@ -574,12 +593,17 @@ class BackupTasks(object):
filename = backup.filename
if filename:
BackupTasks.delete_files_from_swift(context, filename)
except (ClientException, ValueError) as e:
LOG.exception("Exception deleting from swift. Details: %s" % e)
LOG.error("Failed to delete swift objects")
backup.state = trove.backup.models.BackupState.FAILED
except ValueError:
backup.delete()
except ClientException as e:
if e.http_status == 404:
# Backup already deleted in swift
backup.delete()
else:
LOG.exception("Exception deleting from swift. Details: %s" % e)
backup.state = trove.backup.models.BackupState.DELETE_FAILED
backup.save()
raise TroveError("Failed to delete swift objects")
else:
backup.delete()

View File

@ -16,6 +16,7 @@ import trove.common.remote as remote
import testtools
import trove.taskmanager.models as taskmanager_models
import trove.backup.models as backup_models
from trove.common.exception import TroveError
from mockito import mock, when, unstub, any, verify, never
from swiftclient.client import ClientException
@ -39,6 +40,7 @@ class BackupTasksTest(testtools.TestCase):
when(backup_models.Backup).delete(any()).thenReturn(None)
when(backup_models.Backup).get_by_id(
any(), self.backup.id).thenReturn(self.backup)
when(backup_models.DBBackup).save(any()).thenReturn(self.backup)
when(self.backup).delete(any()).thenReturn(None)
self.swift_client = mock()
when(remote).create_swift_client(
@ -67,32 +69,51 @@ class BackupTasksTest(testtools.TestCase):
when(self.swift_client).delete_object(
any(),
filename).thenRaise(ClientException("foo"))
when(self.swift_client).head_object(any(), any()).thenReturn(None)
taskmanager_models.BackupTasks.delete_backup('dummy context',
self.backup.id)
when(self.swift_client).head_object(any(), any()).thenReturn({})
self.assertRaises(
TroveError,
taskmanager_models.BackupTasks.delete_backup,
'dummy context', self.backup.id)
verify(backup_models.Backup, never).delete(self.backup.id)
self.assertEqual(backup_models.BackupState.FAILED, self.backup.state,
"backup should be in FAILED status")
def test_delete_backup_fail_delete_container(self):
when(self.swift_client).delete_container(
any()).thenRaise(ClientException("foo"))
when(self.swift_client).head_container(any()).thenReturn(None)
taskmanager_models.BackupTasks.delete_backup('dummy context',
self.backup.id)
verify(backup_models.Backup, never).delete(self.backup.id)
self.assertEqual(backup_models.BackupState.FAILED, self.backup.state,
"backup should be in FAILED status")
self.assertEqual(
backup_models.BackupState.DELETE_FAILED,
self.backup.state,
"backup should be in DELETE_FAILED status")
def test_delete_backup_fail_delete_segment(self):
when(self.swift_client).delete_object(
any(),
'second').thenRaise(ClientException("foo"))
when(self.swift_client).delete_container(
any()).thenRaise(ClientException("foo"))
when(self.swift_client).head_container(any()).thenReturn(None)
taskmanager_models.BackupTasks.delete_backup('dummy context',
self.backup.id)
self.assertRaises(
TroveError,
taskmanager_models.BackupTasks.delete_backup,
'dummy context', self.backup.id)
verify(backup_models.Backup, never).delete(self.backup.id)
self.assertEqual(backup_models.BackupState.FAILED, self.backup.state,
"backup should be in FAILED status")
self.assertEqual(
backup_models.BackupState.DELETE_FAILED,
self.backup.state,
"backup should be in DELETE_FAILED status")
def test_parse_manifest(self):
manifest = 'container/prefix'
cont, prefix = taskmanager_models.BackupTasks._parse_manifest(manifest)
self.assertEqual(cont, 'container')
self.assertEqual(prefix, 'prefix')
def test_parse_manifest_bad(self):
manifest = 'bad_prefix'
cont, prefix = taskmanager_models.BackupTasks._parse_manifest(manifest)
self.assertEqual(cont, None)
self.assertEqual(prefix, None)
def test_parse_manifest_long(self):
manifest = 'container/long/path/to/prefix'
cont, prefix = taskmanager_models.BackupTasks._parse_manifest(manifest)
self.assertEqual(cont, 'container')
self.assertEqual(prefix, 'long/path/to/prefix')
def test_parse_manifest_short(self):
manifest = 'container/'
cont, prefix = taskmanager_models.BackupTasks._parse_manifest(manifest)
self.assertEqual(cont, 'container')
self.assertEqual(prefix, '')