Freezer metadata part2

Change-Id: I9a6febb0ce427630c3e6602047613d6804b360dd
Implements-blueprint: freezer-metadata

parent 819a5c14e8
commit 9c94d30334
@@ -17,7 +17,6 @@ from distutils import spawn as distspawn
 import os
 from oslo_config import cfg
 from oslo_log import log
 from oslo_utils import encodeutils
 import socket
 import sys
 from tempfile import NamedTemporaryFile

@@ -26,7 +25,7 @@ from freezer import __version__ as FREEZER_VERSION
 from freezer.utils import config as freezer_config
 from freezer.utils import utils
 from freezer.utils import winutils

 from oslo_utils import encodeutils

 CONF = cfg.CONF
 LOG = log.getLogger(__name__)

@@ -177,18 +176,16 @@ _COMMON = [
                 help="Restart the backup from level 0 after n days. Valid "
                      "only if --always-level option if set. If "
                      "--always-level is used together with "
-                     "--remove-older-then, there might be "
+                     "--remove-older-than, there might be "
                      "the chance where the initial level 0 will be removed. "
                      "Default False (Disabled)"),
    cfg.FloatOpt('remove-older-than',
                 short='R',
                 dest='remove_older_than',
-                help="Checks in the specified container for object older "
+                help="Checks in the specified container for objects older "
                      "than the specified days. If i.e. 30 is specified, it "
                      "will remove the remote object older than 30 days. "
-                     "Default False (Disabled) The option "
-                     "--remove-older-then is deprecated and will be removed "
-                     "soon", deprecated_for_removal=True),
+                     "Default False (Disabled)"),
    cfg.StrOpt('remove-from-date',
               dest='remove_from_date',
               help="Checks the specified container and removes objects older "

@@ -462,11 +459,29 @@ def get_backup_args():
    if not CONF.get('log_file'):
        log_file = None
-        for file_name in ['/var/log/freezer.log', '~/.freezer/freezer.log']:
+        for file_name in ['/var/log/freezer-agent/freezer-agent.log',
+                          '/var/log/freezer.log']:
            try:
                log_file = prepare_logging(file_name)
            except IOError:
                pass

+        if not log_file:
+            # Set default working directory to ~/.freezer. If the directory
+            # does not exist it is created
+            work_dir = os.path.join(home, '.freezer')
+            if not os.path.exists(work_dir):
+                try:
+                    os.makedirs(work_dir)
+                    log_file = prepare_logging(os.path.join(work_dir,
+                                                            'freezer.log'))
+                except (OSError, IOError) as err_msg:
+                    # This avoids freezer-agent to crash if it can't write to
+                    # ~/.freezer, which may happen on some env (for me,
+                    # it happens in Jenkins, as freezer-agent can't write to
+                    # /var/lib/jenkins).
+                    print(encodeutils.safe_decode('{}'.format(err_msg)),
+                          file=sys.stderr)
        if log_file:
            CONF.set_default('log_file', log_file)
        else:
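Note: the hunk above changes the log-file fallback order: the agent now probes /var/log/freezer-agent/freezer-agent.log, then /var/log/freezer.log, swallowing IOError at each step, and only then falls back to a per-user ~/.freezer/freezer.log. A minimal standalone sketch of the same probe-and-fall-back pattern; prepare_logging here is a simplified stand-in for freezer's helper of the same name:

import os
import sys


def prepare_logging(path):
    # Stand-in: verify the log location is writable by opening it in
    # append mode; raises IOError/OSError when it is not.
    expanded = os.path.expanduser(path)
    with open(expanded, 'a'):
        pass
    return expanded


def pick_log_file(candidates, fallback_dir='~/.freezer'):
    # Probe the system-wide locations first; any IOError means
    # "try the next candidate", mirroring the loop in get_backup_args().
    for name in candidates:
        try:
            return prepare_logging(name)
        except (IOError, OSError):
            pass
    # Last resort: a per-user directory, created on demand. A failure
    # here is reported on stderr instead of crashing the agent.
    work_dir = os.path.expanduser(fallback_dir)
    try:
        if not os.path.exists(work_dir):
            os.makedirs(work_dir)
        return prepare_logging(os.path.join(work_dir, 'freezer.log'))
    except (OSError, IOError) as err:
        print(str(err), file=sys.stderr)
        return None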
@@ -17,16 +17,20 @@ Freezer general utils functions
 """

 import abc
+import json
 import multiprocessing
 from multiprocessing.queues import SimpleQueue
+import shutil
 import six
 # PyCharm will not recognize queue. Puts red squiggle line under it. That's OK.
 from six.moves import queue
+import tempfile
 import time

 from oslo_log import log

 from freezer.engine.exceptions import EngineException
+from freezer.storage import base
 from freezer.utils import streaming
 from freezer.utils import utils

@@ -65,78 +69,123 @@ class BackupEngine(object):
    Restore stream is a consumer that actually does the restore (for
    tar it is a thread that creates gnutar subprocess and feeds chunks
    to stdin of this thread.

    :type storage: freezer.storage.base.Storage
    """

    def __init__(self, storage):
        """
        :type storage: freezer.storage.base.Storage
        :param storage:
        :return:
        """
        self.storage = storage

+    @abc.abstractproperty
+    def name(self):
+        """
+        :rtype: str
+        :return: Engine name
+        """
+        pass
+
    def backup_stream(self, backup_path, rich_queue, manifest_path):
        """
        :param rich_queue:
        :type rich_queue: freezer.streaming.RichQueue
        :type manifest_path: list[str]
        :param manifest_path:
        :return:
        """
        rich_queue.put_messages(self.backup_data(backup_path, manifest_path))

-    def backup(self, backup_path, backup, queue_size=2):
+    def backup(self, backup_path, hostname_backup_name, no_incremental,
+               max_level, always_level, restart_always_level, queue_size=2):
        """
        Here we know location of all interesting artifacts like metadata
        Should return stream for storing data.
        :return: stream
        """
-        manifest = backup.storage.download_meta_file(backup)
-        input_queue = streaming.RichQueue(queue_size)
-        read_except_queue = queue.Queue()
-        write_except_queue = queue.Queue()
+        prev_backup = self.storage.previous_backup(
+            engine=self,
+            hostname_backup_name=hostname_backup_name,
+            no_incremental=no_incremental,
+            max_level=max_level,
+            always_level=always_level,
+            restart_always_level=restart_always_level
+        )

-        read_stream = streaming.QueuedThread(
-            self.backup_stream,
-            input_queue,
-            read_except_queue,
-            kwargs={"backup_path": backup_path,
-                    "manifest_path": manifest})
+        try:
+            tmpdir = tempfile.mkdtemp()
+        except Exception:
+            LOG.error("Unable to create a tmp directory")
+            raise

-        write_stream = streaming.QueuedThread(
-            backup.storage.write_backup,
-            input_queue,
-            write_except_queue,
-            kwargs={"backup": backup})
+        try:
+            engine_meta = utils.path_join(tmpdir, "engine_meta")
+            freezer_meta = utils.path_join(tmpdir, "freezer_meta")
+            if prev_backup:
+                prev_backup.storage.get_file(prev_backup.engine_metadata_path,
+                                             engine_meta)
+            timestamp = utils.DateTime.now().timestamp
+            level_zero_timestamp = (prev_backup.level_zero_timestamp
+                                    if prev_backup else timestamp)
+            backup = base.Backup(
+                engine=self,
+                hostname_backup_name=hostname_backup_name,
+                level_zero_timestamp=level_zero_timestamp,
+                timestamp=timestamp,
+                level=(prev_backup.level + 1 if prev_backup else 0)
+            )

-        read_stream.daemon = True
-        write_stream.daemon = True
-
-        read_stream.start()
-        write_stream.start()
-
-        read_stream.join()
-        write_stream.join()
+            input_queue = streaming.RichQueue(queue_size)
+            read_except_queue = queue.Queue()
+            write_except_queue = queue.Queue()
+
+            read_stream = streaming.QueuedThread(
+                self.backup_stream,
+                input_queue,
+                read_except_queue,
+                kwargs={"backup_path": backup_path,
+                        "manifest_path": engine_meta})
+
+            write_stream = streaming.QueuedThread(
+                backup.storage.write_backup,
+                input_queue,
+                write_except_queue,
+                kwargs={"backup": backup})
+
+            read_stream.daemon = True
+            write_stream.daemon = True
+            read_stream.start()
+            write_stream.start()
+            read_stream.join()
+            write_stream.join()

-        # queue handling is different from SimpleQueue handling.
-        def handle_except_queue(except_queue):
-            if not except_queue.empty():
-                while not except_queue.empty():
-                    e = except_queue.get_nowait()
-                    LOG.exception('Engine error: {0}'.format(e))
-                return True
-            else:
-                return False
+            # queue handling is different from SimpleQueue handling.
+            def handle_except_queue(except_queue):
+                if not except_queue.empty():
+                    while not except_queue.empty():
+                        e = except_queue.get_nowait()
+                        LOG.critical('Engine error: {0}'.format(e))
+                    return True
+                else:
+                    return False

-        got_exception = None
-        got_exception = (handle_except_queue(read_except_queue) or
-                         got_exception)
-        got_exception = (handle_except_queue(write_except_queue) or
-                         got_exception)
+            got_exception = None
+            got_exception = (handle_except_queue(read_except_queue) or
+                             got_exception)
+            got_exception = (handle_except_queue(write_except_queue) or
+                             got_exception)

-        if (got_exception):
-            raise EngineException("Engine error. Failed to backup.")
+            if (got_exception):
+                raise EngineException("Engine error. Failed to backup.")

-        self.post_backup(backup, manifest)
-
-    @abc.abstractmethod
-    def post_backup(self, backup, manifest_file):
-        """
-        Uploading manifest, cleaning temporary files
-        :return:
-        """
-        pass
+            with open(freezer_meta, mode='wb') as b_file:
+                b_file.write(json.dumps(self.metadata()))
+            self.storage.put_metadata(engine_meta, freezer_meta, backup)
+        finally:
+            shutil.rmtree(tmpdir)

    def read_blocks(self, backup, write_pipe, read_pipe, except_queue):
        # Close the read pipe in this child as it is unneeded
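Note: backup() above wires one producer (backup_stream, which feeds engine output into the queue) and one consumer (the storage writer) through a bounded RichQueue, joins both threads, then drains the per-thread exception queues. A reduced sketch of that shape using only the Python 3 standard library; RichQueue, QueuedThread and the freezer call signatures are replaced by hypothetical stand-ins:

import queue
import threading

STOP = object()  # sentinel that closes the stream


def produce_chunks(data_queue, error_queue, chunks):
    # Producer: push data chunks, then the sentinel; park exceptions in a
    # side queue the way QueuedThread does instead of dying silently.
    try:
        for chunk in chunks:
            data_queue.put(chunk)
    except Exception as e:
        error_queue.put(e)
    finally:
        data_queue.put(STOP)


def consume_chunks(data_queue, error_queue, sink):
    try:
        while True:
            chunk = data_queue.get()
            if chunk is STOP:
                break
            sink.append(chunk)
    except Exception as e:
        error_queue.put(e)


data_q = queue.Queue(maxsize=2)          # bounded, like RichQueue(queue_size)
read_errs, write_errs = queue.Queue(), queue.Queue()
written = []

reader = threading.Thread(target=produce_chunks,
                          args=(data_q, read_errs, [b'a', b'b', b'c']))
writer = threading.Thread(target=consume_chunks,
                          args=(data_q, write_errs, written))
for t in (reader, writer):
    t.daemon = True
    t.start()
for t in (reader, writer):
    t.join()

# Drain both exception queues afterwards, as handle_except_queue() does.
failed = not read_errs.empty() or not write_errs.empty()
assert written == [b'a', b'b', b'c'] and not failed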
@@ -165,28 +214,42 @@ class BackupEngine(object):
            except_queue.put(e)
            raise

-    def restore(self, backup, restore_path, overwrite):
+    def restore(self, hostname_backup_name, restore_path,
+                overwrite,
+                recent_to_date):
        """
-        :type backup: freezer.storage.Backup
-
+        :param hostname_backup_name:
+        :param restore_path:
+        :param overwrite:
+        :param recent_to_date:
        """
-        LOG.info("Creation restore path: {0}".format(restore_path))
+        LOG.info("Creating restore path: {0}".format(restore_path))
        # if restore path can't be created this function will raise exception
        utils.create_dir_tree(restore_path)
        if not overwrite and not utils.is_empty_dir(restore_path):
            raise Exception(
                "Restore dir is not empty. "
                "Please use --overwrite or provide different path.")
-        LOG.info("Creation restore path completed")
-        for level in range(0, backup.level + 1):
-            b = backup.full_backup.increments[level]
-            LOG.info("Restore backup {0}".format(b))
-
-            # Use SimpleQueue because Queue does not work on Mac OS X.
-            read_except_queue = SimpleQueue()
+        LOG.info("Restore path creation completed")
+        backups = self.storage.get_latest_level_zero_increments(
+            engine=self,
+            hostname_backup_name=hostname_backup_name,
+            recent_to_date=recent_to_date)
+
+        max_level = max(backups.keys())
+
+        # Use SimpleQueue because Queue does not work on Mac OS X.
+        read_except_queue = SimpleQueue()
+
+        for level in range(0, max_level + 1):
+            backup = backups[level]
+            LOG.info("Restoring backup {0}".format(backup))
            read_pipe, write_pipe = multiprocessing.Pipe()
            process_stream = multiprocessing.Process(
                target=self.read_blocks,
-                args=(b, write_pipe, read_pipe, read_except_queue))
+                args=(backup, write_pipe, read_pipe, read_except_queue))

            process_stream.daemon = True
            process_stream.start()
@@ -228,8 +291,8 @@ class BackupEngine(object):
            raise EngineException("Engine error. Failed to restore.")

        LOG.info(
-            'Restore execution successfully executed \
-             for backup name {0}'.format(backup))
+            'Restore completed successfully for backup name '
+            '{0}'.format(backups[max_level]))

    @abc.abstractmethod
    def restore_level(self, restore_path, read_pipe, backup, except_queue):

@@ -243,3 +306,7 @@ class BackupEngine(object):
        :return:
        """
        pass
+
+    @abc.abstractmethod
+    def metadata(self):
+        pass
@@ -31,7 +31,8 @@ class TarBackupEngine(engine.BackupEngine):

    def __init__(
            self, compression_algo, dereference_symlink, exclude, storage,
-            is_windows, chunk_size, encrypt_pass_file=None, dry_run=False):
+            is_windows, max_segment_size, encrypt_pass_file=None,
+            dry_run=False):
        """
        :type storage: freezer.storage.base.Storage
        :return:

@@ -43,20 +44,24 @@ class TarBackupEngine(engine.BackupEngine):
        self.storage = storage
        self.is_windows = is_windows
        self.dry_run = dry_run
-        self.chunk_size = chunk_size
+        self.max_segment_size = max_segment_size
        super(TarBackupEngine, self).__init__(storage=storage)

-    def post_backup(self, backup, manifest):
-        self.storage.upload_meta_file(backup, manifest)
-        metadata = {
-            "engine": "tar",
+    @property
+    def name(self):
+        return "tar"
+
+    def metadata(self):
+        return {
+            "engine_name": self.name,
            "compression": self.compression_algo,
-            "encryption": self.encrypt_pass_file is not None
+            # the encrypt_pass_file might be key content so we need to convert
+            # to boolean
+            "encryption": bool(self.encrypt_pass_file)
        }
-
-        self.storage.upload_freezer_meta_data(backup, metadata)

    def backup_data(self, backup_path, manifest_path):
-        LOG.info("Tar engine backup stream enter")
+        LOG.info("Starting Tar engine backup stream")
        tar_command = tar_builders.TarCommandBuilder(
            backup_path, self.compression_algo, self.is_windows)
        if self.encrypt_pass_file:
@@ -73,19 +78,24 @@ class TarBackupEngine(engine.BackupEngine):
        tar_process = subprocess.Popen(command, stdout=subprocess.PIPE,
                                       stderr=subprocess.PIPE, shell=True)
        read_pipe = tar_process.stdout
-        tar_chunk = read_pipe.read(self.chunk_size)
+        tar_chunk = read_pipe.read(self.max_segment_size)
        while tar_chunk:
            yield tar_chunk
-            tar_chunk = read_pipe.read(self.chunk_size)
+            tar_chunk = read_pipe.read(self.max_segment_size)

        self.check_process_output(tar_process, 'Backup')

-        LOG.info("Tar engine streaming end")
+        LOG.info("Tar engine stream completed")

    def restore_level(self, restore_path, read_pipe, backup, except_queue):
        """
        Restore the provided file into backup_opt_dict.restore_abs_path
        Decrypt the file if backup_opt_dict.encrypt_pass_file key is provided

        :param restore_path:
        :param read_pipe:
        :type backup: freezer.storage.base.Backup
        :param backup:
        """

        try:
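Note: backup_data() now reads the tar subprocess's stdout in max_segment_size chunks and yields each one, so the whole archive is never held in memory and the chunk size lines up with the storage segment size. A standalone sketch of the same generator pattern; the command is illustrative:

import subprocess


def stream_command_output(command, segment_size):
    # Spawn the pipeline and hand chunks to the caller as they arrive,
    # mirroring TarBackupEngine.backup_data().
    proc = subprocess.Popen(command, stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE, shell=True)
    chunk = proc.stdout.read(segment_size)
    while chunk:
        yield chunk
        chunk = proc.stdout.read(segment_size)
    # A non-zero exit corresponds to check_process_output() raising.
    if proc.wait() != 0:
        raise RuntimeError(proc.stderr.read().decode(errors='replace'))


# Re-assemble a small file from 4-byte chunks just to show the flow.
data = b''.join(stream_command_output('cat /etc/hostname', 4))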
@@ -49,27 +49,18 @@ class Job(object):
        self.storage = storage
        self.engine = conf_dict.engine

-    def execute(self):
-        start_time = utils.DateTime.now()
-        LOG.info('Job execution Started at: {0}'.format(start_time))
-        retval = self.execute_method()
-        end_time = utils.DateTime.now()
-        LOG.info('Job execution Finished, at: {0}'.format(end_time))
-        LOG.info('Job time Elapsed: {0}'.format(end_time - start_time))
-        return retval
-
    @abc.abstractmethod
-    def execute_method(self):
+    def execute(self):
        pass


class InfoJob(Job):
-    def execute_method(self):
+    def execute(self):
        self.storage.info()


class BackupJob(Job):
-    def execute_method(self):
+    def execute(self):
        try:
            (out, err) = utils.create_subprocess('sync')
            if err:

@@ -81,9 +72,9 @@ class BackupJob(Job):
        mod_name = 'freezer.mode.{0}.{1}'.format(
            self.conf.mode, self.conf.mode.capitalize() + 'Mode')
        app_mode = importutils.import_object(mod_name, self.conf)
-        backup_instance = self.backup(app_mode)
+        backup_level = self.backup(app_mode)

-        level = backup_instance.level if backup_instance else 0
+        level = backup_level or 0

        metadata = {
            'curr_backup_level': level,

@@ -162,16 +153,14 @@ class BackupJob(Job):
                format(consistency_checksum))
            self.conf.consistency_checksum = consistency_checksum

-            hostname_backup_name = self.conf.hostname_backup_name
-            backup_instance = self.storage.create_backup(
-                hostname_backup_name,
-                self.conf.no_incremental,
-                self.conf.max_level,
-                self.conf.always_level,
-                self.conf.restart_always_level,
-                time_stamp=time_stamp)
-            self.engine.backup(filepath, backup_instance)
-            return backup_instance
+            return self.engine.backup(
+                backup_path=filepath,
+                hostname_backup_name=self.conf.hostname_backup_name,
+                no_incremental=self.conf.no_incremental,
+                max_level=self.conf.max_level,
+                always_level=self.conf.always_level,
+                restart_always_level=self.conf.restart_always_level)

        finally:
            # whether an error occurred or not, remove the snapshot anyway
            app_mode.release()

@@ -201,7 +190,7 @@ class BackupJob(Job):

class RestoreJob(Job):

-    def execute_method(self):
+    def execute(self):
        conf = self.conf
        LOG.info('Executing FS restore...')
        restore_timestamp = None

@@ -210,9 +199,11 @@ class RestoreJob(Job):
        if conf.restore_from_date:
            restore_timestamp = utils.date_to_timestamp(conf.restore_from_date)
        if conf.backup_media == 'fs':
-            backup = self.storage.find_one(conf.hostname_backup_name,
-                                           restore_timestamp)
-            self.engine.restore(backup, restore_abs_path, conf.overwrite)
+            self.engine.restore(
+                hostname_backup_name=self.conf.hostname_backup_name,
+                restore_path=restore_abs_path,
+                overwrite=conf.overwrite,
+                recent_to_date=conf.restore_from_date)

        try:
            if conf.consistency_checksum:

@@ -246,13 +237,9 @@ class RestoreJob(Job):
        return {}


-class ConsistencyCheckException(Exception):
-    pass
-
-
class AdminJob(Job):

-    def execute_method(self):
+    def execute(self):
        if self.conf.remove_from_date:
            timestamp = utils.date_to_timestamp(self.conf.remove_from_date)
        else:

@@ -260,14 +247,15 @@ class AdminJob(Job):
            datetime.timedelta(days=self.conf.remove_older_than)
        timestamp = int(time.mktime(timestamp.timetuple()))

-        self.storage.remove_older_than(timestamp,
+        self.storage.remove_older_than(self.engine,
+                                       timestamp,
                                       self.conf.hostname_backup_name)
        return {}


class ExecJob(Job):

-    def execute_method(self):
-        LOG.info('exec job....')
+    def execute(self):
        if self.conf.command:
+            LOG.info('Executing exec job....')
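Note: AdminJob converts --remove-older-than (a number of days) into an absolute cutoff by subtracting a timedelta from now and flattening the result to epoch seconds with time.mktime; it now also passes the engine so the storage can locate that engine's level-zero backups. A small worked example of the date arithmetic:

import datetime
import time


def cutoff_timestamp(remove_older_than_days, now=None):
    # Epoch timestamp for "N days ago"; backups at or before this moment
    # become removal candidates.
    now = now or datetime.datetime.now()
    cutoff = now - datetime.timedelta(days=remove_older_than_days)
    return int(time.mktime(cutoff.timetuple()))


# With now = 2016-06-11 00:00:00 and 30 days retention, the cutoff is
# 2016-05-12 00:00:00, i.e. 30 * 86400 = 2,592,000 seconds earlier.
ts = cutoff_timestamp(30, now=datetime.datetime(2016, 6, 11))
assert ts == int(time.mktime(datetime.datetime(2016, 5, 12).timetuple()))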
@@ -276,3 +264,7 @@ class ExecJob(Job):
        LOG.warning(
            'No command info options were set. Exiting.')
        return {}
+
+
+class ConsistencyCheckException(Exception):
+    pass
@@ -57,7 +57,6 @@ def freezer_main(backup_args):

    validator.validate(backup_args)

-    work_dir = backup_args.work_dir
    max_segment_size = backup_args.max_segment_size
    if (backup_args.storage ==
            'swift' or

@@ -67,12 +66,10 @@ def freezer_main(backup_args):

    if backup_args.storages:
        storage = multiple.MultipleStorage(
-            work_dir,
-            [storage_from_dict(x, work_dir, max_segment_size)
+            [storage_from_dict(x, max_segment_size)
             for x in backup_args.storages])
    else:
-        storage = storage_from_dict(backup_args.__dict__, work_dir,
-                                    max_segment_size)
+        storage = storage_from_dict(backup_args.__dict__, max_segment_size)

    backup_args.engine = tar_engine.TarBackupEngine(
        backup_args.compression,

@@ -127,7 +124,14 @@ def run_job(conf, storage):
        'info': job.InfoJob,
        'admin': job.AdminJob,
        'exec': job.ExecJob}[conf.action](conf, storage)

+    start_time = utils.DateTime.now()
+    LOG.info('Job execution Started at: {0}'.format(start_time))
    response = freezer_job.execute()
+    end_time = utils.DateTime.now()
+    LOG.info('Job execution Finished, at: {0}'.format(end_time))
+    LOG.info('Job time Elapsed: {0}'.format(end_time - start_time))
    LOG.info('Backup metadata received: {0}'.format(json.dumps(response)))

    if conf.metadata_out and response:
        if conf.metadata_out == '-':
@@ -168,22 +172,25 @@ def get_client_manager(backup_args):
    return client_manager


-def storage_from_dict(backup_args, work_dir, max_segment_size):
+def storage_from_dict(backup_args, max_segment_size):
    storage_name = backup_args['storage']
    container = backup_args['container']
    if storage_name == "swift":
        client_manager = backup_args['client_manager']

        storage = swift.SwiftStorage(
-            client_manager, container, work_dir, max_segment_size)
+            client_manager, container, max_segment_size)
    elif storage_name == "local":
-        storage = local.LocalStorage(container, work_dir)
+        storage = local.LocalStorage(
+            storage_path=container,
+            max_segment_size=max_segment_size)
    elif storage_name == "ssh":
        storage = ssh.SshStorage(
-            container, work_dir,
+            container,
            backup_args['ssh_key'], backup_args['ssh_username'],
            backup_args['ssh_host'],
-            int(backup_args.get('ssh_port', freezer_config.DEFAULT_SSH_PORT)))
+            int(backup_args.get('ssh_port', freezer_config.DEFAULT_SSH_PORT)),
+            max_segment_size=max_segment_size)
    else:
        raise Exception("Not storage found for name {0}".format(
            backup_args['storage']))
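Note: storage_from_dict() now builds every backend from the flag dictionary plus max_segment_size alone; work_dir is gone because backups no longer stage metadata locally. A hedged usage sketch — the argument values and the import path are illustrative, not taken from the commit:

# Assuming storage_from_dict is importable from the module shown above:
# from freezer.main import storage_from_dict

local_args = {'storage': 'local', 'container': '/var/backups/freezer'}
ssh_args = {'storage': 'ssh',
            'container': '/srv/freezer',
            'ssh_key': '/root/.ssh/id_rsa',
            'ssh_username': 'backup',
            'ssh_host': 'backup.example.com',
            'ssh_port': 22}

# Every backend takes only the flag dict plus the segment size;
# 'container' doubles as storage_path for local/ssh and as the Swift
# container name.
local_storage = storage_from_dict(local_args, max_segment_size=67108864)
ssh_storage = storage_from_dict(ssh_args, max_segment_size=67108864)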
@@ -22,10 +22,12 @@ from glanceclient.client import Client as glance_client
 from keystoneauth1 import loading
 from keystoneauth1 import session
 from novaclient.client import Client as nova_client
+from oslo_config import cfg
 from oslo_log import log

 from freezer.utils import utils

+CONF = cfg.CONF
 LOG = log.getLogger(__name__)
@@ -38,7 +38,6 @@ def lvm_snap_remove(backup_opt_dict):
    :param backup_opt_dict.lvm_snapname: name of the snapshot lv
    :return: None, raises on error
    """
-    os.chdir(backup_opt_dict.work_dir)
    try:
        _umount(backup_opt_dict.lvm_dirmount)
    except Exception as e:
@@ -1,4 +1,5 @@
-# (c) Copyright 2014,2015 Hewlett-Packard Development Company, L.P.
+# (c) Copyright 2015 Hewlett-Packard Development Company, L.P.
+# (c) Copyright 2016 Hewlett-Packard Enterprise Development Company, L.P.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.

@@ -14,9 +15,9 @@

 import abc
-import os
-import re
+import json
 import six
+import tempfile

 from oslo_log import log
@@ -32,68 +33,14 @@ class Storage(object):
    class.
    """

-    def __init__(self, work_dir, skip_prepare=False):
-        self.work_dir = work_dir
+    def __init__(self, skip_prepare=False):
        if not skip_prepare:
            self.prepare()

-    def download_meta_file(self, backup):
-        """
-        Downloads meta_data to work_dir of previous backup.
-        :type backup: freezer.storage.base.Backup
-        :param backup: A backup or increment. Current backup is incremental,
-        that means we should download tar_meta for detection new files and
-        changes. If backup.tar_meta is false, raise Exception
-        :return:
-        """
-        utils.create_dir(self.work_dir)
-        if backup.level == 0:
-            return utils.path_join(self.work_dir, backup.tar())
-        meta_backup = backup.full_backup.increments[backup.level - 1]
-        if not meta_backup.tar_meta:
-            raise ValueError('Latest update have no tar_meta')
-        to_path = utils.path_join(self.work_dir, meta_backup.tar())
-        if os.path.exists(to_path):
-            os.remove(to_path)
-        meta_backup.storage.get_file(
-            meta_backup.storage.meta_file_abs_path(meta_backup), to_path)
-        return to_path
-
-    @abc.abstractmethod
-    def meta_file_abs_path(self, backup):
-        pass
-
    @abc.abstractmethod
    def get_file(self, from_path, to_path):
        pass

-    @abc.abstractmethod
-    def upload_meta_file(self, backup, meta_file):
-        """
-        :param backup:
-        :type backup: freezer.storage.base.Backup
-        :param meta_file:
-        :return:
-        """
-        pass
-
-    @abc.abstractmethod
-    def upload_freezer_meta_data(self, backup, meta_dict):
-        pass
-
-    @abc.abstractmethod
-    def download_freezer_meta_data(self, backup):
-        pass
-
    @abc.abstractmethod
    def backup_blocks(self, backup):
        """
        :param backup:
        :type backup: freezer.storage.base.Backup
        :return:
        """
        pass

    @abc.abstractmethod
    def write_backup(self, rich_queue, backup):
        """
@@ -112,111 +59,131 @@ class Storage(object):
        """
        pass

-    def find_one(self, hostname_backup_name, recent_to_date=None):
-        """
-        :param hostname_backup_name:
-        :type hostname_backup_name: str
-        :param recent_to_date:
-        :type recent_to_date: int
-        :rtype: Backup
-        :return:
-        """
-        backups = self.find_all(hostname_backup_name)
-        if recent_to_date:
-            backups = [b for b in backups
-                       if b.timestamp <= recent_to_date]
-        err_msg = 'No matching backup name "{0}" found'\
-            .format(hostname_backup_name)
-        if not backups:
-            raise IndexError(err_msg)
-        backup = max(backups, key=lambda b: b.timestamp)
-        last_increments = backup.increments.values()
-        if recent_to_date:
-            last_increments = [x for x in last_increments
-                               if x.timestamp <= recent_to_date]
-        return max(last_increments, key=lambda x: x.timestamp)
-
    @abc.abstractmethod
-    def find_all(self, hostname_backup_name):
+    def get_level_zero(self,
+                       engine,
+                       hostname_backup_name,
+                       recent_to_date=None):
        """
        Gets backups by backup_name and hostname
+        :type engine: freezer.engine.engine.BackupEngine
        :param hostname_backup_name:
        :type hostname_backup_name: str
-        :rtype: list[freezer.storage.base.Backup]
-        :return: List of matched backups
+        :type recent_to_date: int
+        :param recent_to_date:
+        :rtype: collections.Iterable[freezer.storage.base.Backup]
+        :return: dictionary of level zero timestamps with attached storage
        """
        pass

+    def get_latest_level_zero_increments(self, engine, hostname_backup_name,
+                                         recent_to_date=None):
+        """
+        Returns the latest zero level backup with increments
+        :param engine:
+        :param hostname_backup_name:
+        :param recent_to_date:
+        :rtype: dict[int, freezer.storage.base.Backup]
+        :return: Dictionary[backup_level, backup]
+        """
+        zeros = self.get_level_zero(engine=engine,
+                                    hostname_backup_name=hostname_backup_name,
+                                    recent_to_date=recent_to_date)
+        if not zeros:
+            err_msg = 'No matching backup name "{0}" found'.format(
+                hostname_backup_name
+            )
+            raise IndexError(err_msg)
+
+        backup = max(zeros, key=lambda backup: backup.timestamp)
+        """:type : freezer.storage.base.Backup"""
+
+        increments = backup.get_increments()
+
+        return {level: backup for level, backup in increments.iteritems()
+                if not recent_to_date or backup.timestamp >= recent_to_date}
+
+    def remove_older_than(self, engine, remove_older_timestamp,
+                          hostname_backup_name):
+        """
+        Removes backups which are older than or equal to the specified
+        timestamp
+        :type engine: freezer.engine.engine.BackupEngine
+        :type remove_older_timestamp: int
+        :type hostname_backup_name: str
+        """
+        backups = self.get_level_zero(engine, hostname_backup_name,
+                                      remove_older_timestamp)
+        for backup in backups:
+            backup.remove()
+
+    @abc.abstractmethod
+    def info(self):
+        pass
+
+    def previous_backup(self, engine, hostname_backup_name, no_incremental,
+                        max_level, always_level, restart_always_level):
+        """
+        :type engine: freezer.engine.engine.BackupEngine
+        :param engine: engine instance
+        :param hostname_backup_name: name of backup
+        :param no_incremental:
+        :param max_level:
+        :param always_level:
+        :param restart_always_level:
+        :return:
+        """
+        if no_incremental:
+            return None
+        try:
+            increments = self.get_latest_level_zero_increments(
+                engine=engine,
+                hostname_backup_name=hostname_backup_name)
+            highest_level = max(increments.keys())
+            highest_level_backup = increments[highest_level]
+            if max_level and max_level <= highest_level:
+                return None
+            if always_level and highest_level > always_level:
+                return None
+            expire_time = (highest_level_backup.timestamp +
+                           restart_always_level * 86400)
+            if restart_always_level and utils.DateTime.now().timestamp > \
+                    expire_time:
+                return None
+            if always_level and highest_level == always_level:
+                return increments[highest_level - 1]
+            return highest_level_backup
+        except IndexError:
+            return None
+
    @abc.abstractmethod
    def put_file(self, from_path, to_path):
        """
        :type from_path: str
        :param from_path:
        :type to_path: str
        :param to_path:
        """
        pass

    @abc.abstractmethod
-    def remove_backup(self, backup):
+    def put_metadata(self,
+                     engine_metadata_path,
+                     freezer_metadata_path,
+                     backup):
        """
+        :param engine_metadata_path:
+        :param freezer_metadata_path:
        :type backup: freezer.storage.base.Backup
        :param backup:
        :return:
        """
        pass

-    def remove_older_than(self, remove_older_timestamp, hostname_backup_name):
-        """
-        Removes backups which are older than the specified timestamp
-        :type remove_older_timestamp: int
-        :type hostname_backup_name: str
-        """
-        backups = self.find_all(hostname_backup_name)
-        backups = [b for b in backups
-                   if b.latest_update.timestamp < remove_older_timestamp]
-        for b in backups:
-            b.storage.remove_backup(b)
-
    @abc.abstractmethod
-    def info(self):
+    def create_dirs(self, path):
        pass

-    def create_backup(self, hostname_backup_name, no_incremental,
-                      max_level, always_level, restart_always_level,
-                      time_stamp=None):
-        backups = self.find_all(hostname_backup_name)
-        prev_backup = self._find_previous_backup(
-            backups, no_incremental, max_level, always_level,
-            restart_always_level)
-        time_stamp = time_stamp or utils.DateTime.now().timestamp
-        if prev_backup and prev_backup.tar_meta:
-            return Backup(
-                self, hostname_backup_name, time_stamp,
-                prev_backup.level + 1, prev_backup.full_backup)
-        else:
-            return Backup(
-                self, hostname_backup_name, time_stamp)
-
-    @staticmethod
-    def _find_previous_backup(backups, no_incremental, max_level, always_level,
-                              restart_always_level):
-        """
-
-        :param backups:
-        :type backups: list[freezer.storage.base.Backup]
-        :param no_incremental:
-        :param max_level:
-        :param always_level:
-        :param restart_always_level:
-        :rtype: freezer.storage.base.Backup
-        :return:
-        """
-        if no_incremental or not backups:
-            return None
-        incremental_backup = max(backups, key=lambda x: x.timestamp)
-        """:type : freezer.storage.base.Backup"""
-        latest_update = incremental_backup.latest_update
-        if max_level and max_level <= latest_update.level:
-            return None
-        elif always_level and latest_update.level >= always_level:
-            latest_update = \
-                incremental_backup.increments[latest_update.level - 1]
-        elif restart_always_level and utils.DateTime.now().timestamp > \
-                latest_update.timestamp + restart_always_level * 86400:
-            return None
-        return latest_update


class Backup(object):
    """
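Note: previous_backup() above centralises the incremental policy: no previous chain means start at level 0; hitting max_level or overshooting always_level restarts the chain; restart_always_level expires a chain by age; and a chain parked exactly at always_level reuses the previous increment's metadata. A pure-function sketch of just the decision, with made-up names:

def pick_parent(levels, now, max_level=None, always_level=None,
                restart_always_level=None, no_incremental=False):
    """Return the level to use as incremental parent, or None for level 0.

    levels: dict mapping level -> timestamp of the newest chain.
    """
    if no_incremental or not levels:
        return None                      # full backup
    highest = max(levels)
    if max_level and max_level <= highest:
        return None                      # chain reached its cap: restart
    if always_level and highest > always_level:
        return None                      # inconsistent chain: restart
    if restart_always_level and now > (
            levels[highest] + restart_always_level * 86400):
        return None                      # chain too old: restart
    if always_level and highest == always_level:
        return highest - 1               # stay parked at always_level
    return highest                       # normal increment


# A chain capped with always_level=3 re-uses level 2 metadata, so the
# chain never grows past level 3.
assert pick_parent({0: 100, 1: 200, 2: 300, 3: 400}, now=500,
                   always_level=3) == 2
# A week-old chain with restart_always_level=7 is restarted from scratch.
assert pick_parent({0: 100}, now=100 + 8 * 86400,
                   restart_always_level=7) is None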
@@ -230,23 +197,16 @@ class Backup(object):
    Levels 1, 2, ... means that our backup is incremental and contains
    only smart portion of information (that was actually changed
    since the last backup)
-    tar_meta - boolean value, that is true when we have available meta
-    information.
-    Please check for additional information about tar_meta
-    http://www.gnu.org/software/tar/manual/html_node/Incremental-Dumps.html
    """
-    PATTERN = r'(.*)_(\d+)_(\d+?)$'

-    def __init__(self, storage, hostname_backup_name, timestamp, level=0,
-                 full_backup=None, tar_meta=False):
+    def __init__(self, engine, hostname_backup_name,
+                 level_zero_timestamp, timestamp, level, storage=None):
        """
-        :type storage: freezer.storage.base.Storage
+        :type storage: freezer.storage.physical.PhysicalStorage
        :param hostname_backup_name: name (hostname_backup_name) of backup
        :type hostname_backup_name: str
        :param timestamp: timestamp of backup (when it was executed)
        :type timestamp: int
-        :param full_backup: Previous full_backup - not increment
-        :type full_backup: Backup
        :param level: current incremental level of backup
        :type level: int
-        :param tar_meta: Is backup has or has not an attached meta
@@ -254,172 +214,74 @@ class Backup(object):
-        :type tar_meta: bool
        :return:
        """
-        if level == 0 and full_backup:
-            raise ValueError("Detected incremental backup without level")
        self.hostname_backup_name = hostname_backup_name
-        self._timestamp = timestamp
-        self.tar_meta = tar_meta
-        self._increments = {0: self}
-        self._latest_update = self
-        self._level = level
-        self.storage = storage
-        if not full_backup:
-            self._full_backup = self
-        else:
-            self._full_backup = full_backup
-
-    @property
-    def full_backup(self):
-        return self._full_backup
-
-    @property
-    def timestamp(self):
-        return self._timestamp
-
-    @property
-    def level(self):
-        return self._level
-
-    @property
-    def increments(self):
-        return self._increments
-
-    @property
-    def latest_update(self):
-        return self._latest_update
-
-    def tar(self):
-        return "tar_metadata_{0}".format(self)
-
-    def metadata(self):
-        return self.storage.download_freezer_meta_data(self)
-
-    def add_increment(self, increment):
-        """
-
-        :param increment:
-        :type increment: Backup
-        :return:
-        """
-        if self.level != 0:
-            raise ValueError("Can not add increment to increment")
-        if increment.level == 0:
-            raise ValueError("Can not add increment with level 0")
-        if (increment.level not in self._increments or
-                increment.timestamp >
-                self._increments[increment.level].timestamp):
-            self._increments[increment.level] = increment
-            if self.latest_update.level <= increment.level:
-                self._latest_update = increment
-
-    def __repr__(self):
-        return '_'.join([self.hostname_backup_name,
-                         repr(self._timestamp), repr(self._level)])
-
-    def __str__(self):
-        return self.__repr__()
-
-    @staticmethod
-    def parse_backups(names, storage):
-        """
-        :param names:
-        :type names: list[str] - file names of backups.
-        :type storage: freezer.storage.base.Storage
-        File name should be something like that host_backup_timestamp_level
-        :rtype: list[freezer.storage.base.Backup]
-        :return: list of zero level backups
-        """
-        prefix = 'tar_metadata_'
-        tar_names = set([x[len(prefix):]
-                         for x in names if x.startswith(prefix)])
-        backup_names = [x for x in names if not x.startswith(prefix)]
-        backups = []
-        """:type: list[freezer.storage.base.BackupRepr]"""
-        for name in backup_names:
-            try:
-                backup = Backup._parse(name)
-                backup.tar_meta = name in tar_names
-                backups.append(backup)
-            except Exception as e:
-                LOG.exception(e)
-                LOG.error("cannot parse backup name: {0}".format(name))
-        backups.sort(
-            key=lambda x: (x.hostname_backup_name, x.timestamp, x.level))
-        zero_backups = []
-        """:type: list[freezer.storage.base.Backup]"""
-        last_backup = None
-
-        """:type last_backup: freezer.storage.base.Backup"""
-        for backup in backups:
-            if backup.level == 0:
-                last_backup = backup.backup(storage)
-                zero_backups.append(last_backup)
-            else:
-                if last_backup:
-                    last_backup.add_increment(backup.backup(storage,
-                                                            last_backup))
-                else:
-                    LOG.error("Incremental backup without parent: {0}"
-                              .format(backup))
-
-        return zero_backups
-
-    @staticmethod
-    def _parse(value):
-        """
-        :param value: String representation of backup
-        :type value: str
-        :return:
-        """
-        match = re.search(Backup.PATTERN, value, re.I)
-        if not match:
-            raise ValueError("Cannot parse backup from string: " + value)
-        return BackupRepr(match.group(1), int(match.group(2)),
-                          int(match.group(3)))
-
-    def __eq__(self, other):
-        if self is other:
-            return True
-        return type(other) is type(self) and \
-            self.hostname_backup_name == other.hostname_backup_name and \
-            self._timestamp == other.timestamp and \
-            self.tar_meta == other.tar_meta and \
-            self._level == other.level and \
-            len(self.increments) == len(other.increments)
-
-
-class BackupRepr(object):
-    """
-    Intermediate for parsing purposes - it parsed backup name.
-    Difference between Backup and BackupRepr - backupRepr can be parsed from
-    str and doesn't require information about full_backup
-    """
-    def __init__(self, hostname_backup_name, timestamp, level, tar_meta=False):
-        """
-
-        :param hostname_backup_name:
-        :type hostname_backup_name: str
-        :param timestamp:
-        :type timestamp: int
-        :param level:
-        :type level: int
-        :param tar_meta:
-        :type tar_meta: bool
-        :return:
-        """
-        self.hostname_backup_name = hostname_backup_name
-        self.timestamp = timestamp
-        self.level = level
-        self.tar_meta = tar_meta
+        self.timestamp = timestamp
+        self.level = level
+        self.engine = engine
+        self.storage = storage
+        self.level_zero_timestamp = level_zero_timestamp
+        if storage:
+            self.increments_data_path = utils.path_join(
+                self.storage.storage_path, "data", self.engine.name,
+                self.hostname_backup_name, self.level_zero_timestamp)
+            self.increments_metadata_path = utils.path_join(
+                self.storage.storage_path, "metadata", self.engine.name,
+                self.hostname_backup_name, self.level_zero_timestamp)
+            self.data_prefix_path = utils.path_join(
+                self.increments_data_path,
+                "{0}_{1}".format(self.level, self.timestamp))
+            self.engine_metadata_path = utils.path_join(
+                self.data_prefix_path, "engine_metadata")
+            self.metadata_path = utils.path_join(
+                self.increments_metadata_path,
+                "{0}_{1}".format(self.level, self.timestamp), "metadata")
+            self.data_path = utils.path_join(self.data_prefix_path, "data")
+            self.segments_path = utils.path_join(self.data_prefix_path,
+                                                 "segments")

-    def backup(self, storage, full_backup=None):
+    def copy(self, storage):
        """
        :param storage:
-        :type storage: freezer.storage.base.Storage
-        :param full_backup: freezer.storage.base.Backup
+        :type storage: freezer.storage.physical.PhysicalStorage
        :return:
        """
-        return Backup(storage, self.hostname_backup_name, self.timestamp,
-                      level=self.level, full_backup=full_backup,
-                      tar_meta=self.tar_meta)
+        return Backup(
+            engine=self.engine,
+            hostname_backup_name=self.hostname_backup_name,
+            level_zero_timestamp=self.level_zero_timestamp,
+            timestamp=self.timestamp,
+            level=self.level,
+            storage=storage)
+
+    def remove(self):
+        self.storage.rmtree(self.increments_metadata_path)
+        self.storage.rmtree(self.increments_data_path)
+
+    def get_increments(self):
+        """
+        Gets all incremental backups based on a level-zero backup with
+        timestamp
+        :rtype: dict[int, freezer.storage.base.Backup]
+        :return: Dictionary[backup_level, backup]
+        """
+        increments = self.storage.listdir(self.increments_metadata_path)
+        sorted(increments)
+        increments = [name.split('_') for name in increments]
+
+        return {int(increment[0]): Backup(
+            storage=self.storage,
+            engine=self.engine,
+            hostname_backup_name=self.hostname_backup_name,
+            timestamp=int(increment[1]),
+            level_zero_timestamp=self.level_zero_timestamp,
+            level=int(increment[0])
+        ) for increment in increments}
+
+    def metadata(self):
+        metadata_file = tempfile.NamedTemporaryFile('wb', delete=True)
+        self.storage.get_file(self.metadata_path, metadata_file.name)
+        with open(metadata_file.name) as f:
+            metadata_content = f.readlines()
+        LOG.info("metadata content download {0}".format(metadata_content))
+        metadata_file.close()
+        utils.delete_file(metadata_file.name)
+        return json.loads(metadata_content[0])
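Note: the reworked Backup object derives every location from a handful of fields — storage path, a data/metadata split, engine name, backup name, level-zero timestamp, then a level_timestamp component per increment. Laying the joins out flat makes the on-storage tree visible; the values below are illustrative:

def backup_paths(storage_path, engine_name, backup_name,
                 level_zero_timestamp, level, timestamp):
    # Mirrors the utils.path_join chains in Backup.__init__.
    data_root = "/".join([storage_path, "data", engine_name,
                          backup_name, str(level_zero_timestamp)])
    meta_root = "/".join([storage_path, "metadata", engine_name,
                          backup_name, str(level_zero_timestamp)])
    prefix = "{0}/{1}_{2}".format(data_root, level, timestamp)
    return {
        "data": prefix + "/data",                        # backup payload
        "segments": prefix + "/segments",                # swift segments
        "engine_metadata": prefix + "/engine_metadata",  # e.g. tar meta
        "metadata": "{0}/{1}_{2}/metadata".format(meta_root, level,
                                                  timestamp),
    }


paths = backup_paths("freezer_backups", "tar", "web01_etc",
                     level_zero_timestamp=1465600000,
                     level=1, timestamp=1465686400)
assert paths["data"] == ("freezer_backups/data/tar/web01_etc/"
                         "1465600000/1_1465686400/data")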
@@ -17,94 +17,38 @@ import six

 from oslo_log import log

-from freezer.storage import base
-from freezer.utils import utils
+from freezer.storage import physical

 LOG = log.getLogger(__name__)


 @six.add_metaclass(abc.ABCMeta)
-class FsLikeStorage(base.Storage):
-    DEFAULT_CHUNK_SIZE = 10000000
-
-    def __init__(self, storage_directory, work_dir,
-                 chunk_size=DEFAULT_CHUNK_SIZE, skip_prepare=False):
-        self.storage_directory = storage_directory
-        self.chunk_size = chunk_size
-        super(FsLikeStorage, self).__init__(work_dir,
-                                            skip_prepare=skip_prepare)
-
-    def backup_to_file_path(self, backup):
-        """
-
-        :param backup:
-        :type backup: freezer.storage.base.Backup
-        :return:
-        """
-        return utils.path_join(self._zero_backup_dir(backup), backup)
-
-    def _zero_backup_dir(self, backup):
-        """
-        :param backup:
-        :type backup: freezer.storage.base.Backup
-        :return:
-        """
-        return utils.path_join(self.storage_directory,
-                               backup.hostname_backup_name,
-                               backup.full_backup.timestamp)
+class FsLikeStorage(physical.PhysicalStorage):
+    def __init__(self, storage_path,
+                 max_segment_size, skip_prepare=False):
+        super(FsLikeStorage, self).__init__(
+            storage_path=storage_path,
+            max_segment_size=max_segment_size,
+            skip_prepare=skip_prepare)

    def prepare(self):
-        self.create_dirs(self.storage_directory)
+        self.create_dirs(self.storage_path)

    def info(self):
        pass

-    def meta_file_abs_path(self, backup):
-        zero_backup = self._zero_backup_dir(backup)
-        return utils.path_join(zero_backup, backup.tar())
-
-    def upload_meta_file(self, backup, meta_file):
-        """
-
-        :param backup:
-        :type backup: freezer.storage.base.Backup
-        :param meta_file:
-        :return:
-        """
-        zero_backup = self._zero_backup_dir(backup)
-        to_path = utils.path_join(zero_backup, backup.tar())
-        self.put_file(meta_file, to_path)
-
-    def find_all(self, hostname_backup_name):
-        backups = []
-        backup_dir = utils.path_join(self.storage_directory,
-                                     hostname_backup_name)
-        self.create_dirs(backup_dir)
-        timestamps = self.listdir(backup_dir)
-        for timestamp in timestamps:
-            increments = \
-                self.listdir(utils.path_join(backup_dir, timestamp))
-            backups.extend(base.Backup.parse_backups(increments, self))
-        return backups
-
-    def remove_backup(self, backup):
-        """
-        :type backup: freezer.storage.base.Backup
-        :return:
-        """
-        self.rmtree(self._zero_backup_dir(backup))
-
    def write_backup(self, rich_queue, backup):
        """
        Stores backup in storage
        :type rich_queue: freezer.streaming.RichQueue
        :type backup: freezer.storage.base.Backup
        """
-        filename = self.backup_to_file_path(backup)
-        if backup.level == 0:
-            self.create_dirs(self._zero_backup_dir(backup))
+        backup = backup.copy(storage=self)
+        path = backup.data_path
+        self.create_dirs(path.rsplit('/', 1)[0])

-        with self.open(filename, mode='wb') as b_file:
+        with self.open(path, mode='wb') as \
+                b_file:
            for message in rich_queue.get_messages():
                b_file.write(message)
@@ -115,37 +59,21 @@ class FsLikeStorage(physical.PhysicalStorage):
        :type backup: freezer.storage.base.Backup
        :return:
        """
-        filename = self.backup_to_file_path(backup)
-        with self.open(filename, 'rb') as backup_file:
+        with self.open(backup.data_path, 'rb') as backup_file:
            while True:
-                chunk = backup_file.read(self.chunk_size)
+                chunk = backup_file.read(self.max_segment_size)
                if chunk == '':
                    break
                if len(chunk):
                    yield chunk

    @abc.abstractmethod
    def listdir(self, directory):
        pass

    @abc.abstractmethod
    def put_file(self, from_path, to_path):
        pass

    @abc.abstractmethod
    def create_dirs(self, path):
        pass

    @abc.abstractmethod
    def rmtree(self, path):
        pass

    @abc.abstractmethod
    def open(self, filename, mode):
-        pass
-
-    def download_freezer_meta_data(self, backup):
-        return {}
-
-    def upload_freezer_meta_data(self, backup, meta_dict):
        """
+        :type filename: str
+        :param filename:
+        :type mode: str
+        :param mode:
        :return:
        """
        pass
@@ -35,7 +35,10 @@ class LocalStorage(fslike.FsLikeStorage):
        shutil.copyfile(from_path, to_path)

    def listdir(self, directory):
-        return os.listdir(directory)
+        try:
+            return os.listdir(directory)
+        except OSError:
+            return list()

    def create_dirs(self, path):
        utils.create_dir_tree(path)
@@ -26,12 +26,6 @@ LOG = log.getLogger(__name__)

class MultipleStorage(base.Storage):

-    def remove_backup(self, backup):
-        raise Exception()
-
-    def backup_blocks(self, backup):
-        raise Exception()
-
    def info(self):
        for s in self.storages:
            s.info()

@@ -70,25 +64,42 @@ class MultipleStorage(base.Storage):
        if (got_exception):
            raise StorageException("Storage error. Failed to backup.")

-    def find_all(self, hostname_backup_name):
-        backups = [b.find_all(hostname_backup_name) for b in self.storages]
+    def get_level_zero(self,
+                       engine,
+                       hostname_backup_name,
+                       recent_to_date=None):
+        backups = (
+            [s.get_level_zero(engine, hostname_backup_name, recent_to_date)
+             for s in self.storages]
+        )
        # flatten the list
        return [item for sublist in backups for item in sublist]

    def prepare(self):
        pass

-    def upload_meta_file(self, backup, meta_file):
+    def put_file(self, from_path, to_path):
        for storage in self.storages:
-            storage.upload_meta_file(backup, meta_file)
+            storage.put_file(from_path, to_path)

+    def put_engine_metadata(self, from_path, backup):
+        """
+        :param from_path:
+        :type backup: freezer.storage.base.Backup
+        :param backup:
+        :return:
+        """
+        for storage in self.storages:
+            storage.put_engine_metadata(from_path, backup)
+
-    def __init__(self, work_dir, storages):
+    def __init__(self, storages):
        """
        :param storages:
        :type storages: list[freezer.storage.base.Storage]
        :return:
        """
-        super(MultipleStorage, self).__init__(work_dir)
+        super(MultipleStorage, self).__init__()
        self.storages = storages

    def download_freezer_meta_data(self, backup):
@@ -0,0 +1,112 @@
"""
(c) Copyright 2016 Hewlett-Packard Enterprise Development Company, L.P.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

"""

import abc
import os
import six

from freezer.storage import base
from freezer.utils import utils


@six.add_metaclass(abc.ABCMeta)
class PhysicalStorage(base.Storage):
    """
    Backup like Swift, SSH or Local. Something that represents real storage.
    For example MultipleStorage is not physical.
    """

    def __init__(self, storage_path, max_segment_size,
                 skip_prepare=False):
        self.storage_path = storage_path
        self.max_segment_size = max_segment_size
        super(PhysicalStorage, self).__init__(skip_prepare=skip_prepare)

    def metadata_path(self, engine, hostname_backup_name):
        return utils.path_join(self.storage_path, "metadata", engine.name,
                               hostname_backup_name)

    def get_level_zero(self,
                       engine,
                       hostname_backup_name,
                       recent_to_date=None):
        """
        Gets backups by backup_name and hostname

        :type engine: freezer.engine.engine.BackupEngine
        :param engine: Search for backups made by specified engine
        :type hostname_backup_name: str
        :param hostname_backup_name: Search for backup with specified name
        :type recent_to_date: int
        :param recent_to_date:
        :rtype: list[freezer.storage.base.Backup]
        :return: dictionary of level zero timestamps with attached storage
        """

        path = self.metadata_path(
            engine=engine,
            hostname_backup_name=hostname_backup_name)

        zeros = [base.Backup(
            storage=self,
            engine=engine,
            hostname_backup_name=hostname_backup_name,
            level_zero_timestamp=int(t),
            timestamp=int(t),
            level=0) for t in self.listdir(path)]
        if recent_to_date:
            zeros = [zero for zero in zeros
                     if zero.timestamp >= recent_to_date]
        return zeros

    @abc.abstractmethod
    def backup_blocks(self, backup):
        """
        :param backup:
        :type backup: freezer.storage.base.Backup
        :return:
        """
        pass

    @abc.abstractmethod
    def listdir(self, path):
        """
        :type path: str
        :param path:
        :rtype: collections.Iterable[str]
        """
        pass

    def put_metadata(self,
                     engine_metadata_path,
                     freezer_metadata_path,
                     backup):
        """
        :param engine_metadata_path:
        :param freezer_metadata_path:
        :type backup: freezer.storage.base.Backup
        :param backup:
        :return:
        """
        backup = backup.copy(self)
        self.put_file(engine_metadata_path, backup.engine_metadata_path)
        self.create_dirs(os.path.dirname(backup.metadata_path))
        self.put_file(freezer_metadata_path, backup.metadata_path)

    @abc.abstractmethod
    def rmtree(self, path):
        pass
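Note: PhysicalStorage.get_level_zero() turns a directory listing into Backup objects — each entry under metadata/<engine>/<name>/ is a level-zero timestamp, and each increment inside a chain is a directory named level_timestamp, which get_increments() splits back apart. A sketch of that naming round-trip using fake listings:

def parse_increments(names):
    # "level_timestamp" directory names -> {level: timestamp}, the same
    # split that Backup.get_increments() performs with name.split('_').
    out = {}
    for name in names:
        level, _, timestamp = name.partition('_')
        out[int(level)] = int(timestamp)
    return out


# Listing inside one chain (metadata/tar/web01_etc/1465600000/):
# level 0 plus two increments.
increment_dirs = ['0_1465600000', '1_1465686400', '2_1465772800']

chain = parse_increments(increment_dirs)
assert max(chain) == 2 and chain[1] == 1465686400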
@@ -33,14 +33,12 @@ class SshStorage(fslike.FsLikeStorage):
    """
    :type ftp: paramiko.SFTPClient
    """
-    DEFAULT_CHUNK_SIZE = 10000000

-    def __init__(self, storage_directory, work_dir, ssh_key_path,
-                 remote_username, remote_ip, port,
-                 chunk_size=DEFAULT_CHUNK_SIZE):
+    def __init__(self, storage_path, ssh_key_path,
+                 remote_username, remote_ip, port, max_segment_size):
        """
-        :param storage_directory: directory of storage
-        :type storage_directory: str
+        :param storage_path: directory of storage
+        :type storage_path: str
        :return:
        """
        self.ssh_key_path = ssh_key_path

@@ -50,8 +48,9 @@ class SshStorage(fslike.FsLikeStorage):
        self.ssh = None
        self.ftp = None
        self.init()
-        super(SshStorage, self).__init__(storage_directory, work_dir,
-                                         chunk_size)
+        super(SshStorage, self).__init__(
+            storage_path=storage_path,
+            max_segment_size=max_segment_size)

    def init(self):
        ssh = paramiko.SSHClient()
@@ -20,35 +20,42 @@ from oslo_log import log
 import requests.exceptions
 import time

-from freezer.storage import base
+from freezer.storage import physical

 LOG = log.getLogger(__name__)


-class SwiftStorage(base.Storage):
+class SwiftStorage(physical.PhysicalStorage):
    """
    :type client_manager: freezer.osclients.ClientManager
    """

-    def __init__(self, client_manager, container, work_dir, max_segment_size,
+    def rmtree(self, path):
+        split = path.rsplit('/', 1)
+        for file in self.swift().get_container(split[0],
+                                               prefix=split[1])[1]:
+            try:
+                self.swift().delete_object(split[0], file['name'])
+            except Exception as e:
+                raise e
+
+    def put_file(self, from_path, to_path):
+        self.client_manager.create_swift()
+        split = to_path.rsplit('/', 1)
+        with open(from_path, 'r') as meta_fd:
+            self.swift().put_object(split[0], split[1], meta_fd)
+
+    def __init__(self, client_manager, container, max_segment_size,
                 skip_prepare=False):
        """
-        :type client_manager: freezer.osclients.ClientManager
+        :type client_manager: freezer.osclients.OSClientManager
        :type container: str
        """
        self.client_manager = client_manager
        # The containers used by freezer to executed backups needs to have
        # freezer_ prefix in the name. If the user provider container doesn't
        # have the prefix, it is automatically added also to the container
        # segments name. This is done to quickly identify the containers
        # that contain freezer generated backups
        if not container.startswith('freezer_'):
            self.container = 'freezer_{0}'.format(container)
        else:
            self.container = container
-        self.segments = u'{0}_segments'.format(container)
-        self.max_segment_size = max_segment_size
-        super(SwiftStorage, self).__init__(work_dir, skip_prepare)
+        super(SwiftStorage, self).__init__(
+            storage_path=container,
+            max_segment_size=max_segment_size,
+            skip_prepare=skip_prepare)

    def swift(self):
        """
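Note: with the new layout every Swift "path" carries the container and the object prefix in one string, and the backend recovers the pair with two different slices — split('/', 1) on reads takes the first component as the container, while rsplit('/', 1) on writes and deletes takes only the final component as the object or prefix. A tiny illustration of both splits:

path = "freezer_backups/data/tar/web01_etc/1465600000/0_1465600000/data"

# get_file(): first component is the container, the rest is the object.
container, obj = path.split('/', 1)
assert container == "freezer_backups"
assert obj == "data/tar/web01_etc/1465600000/0_1465600000/data"

# put_file()/rmtree(): everything up to the last slash stays together,
# so only the final component becomes the object name / prefix.
head, tail = path.rsplit('/', 1)
assert tail == "data"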
@@ -67,12 +74,13 @@ class SwiftStorage(physical.PhysicalStorage):

        count = 0
        success = False
+        split = path.rsplit('/', 1)
        while not success:
            try:
                LOG.info(
                    'Uploading file chunk index: {0}'.format(path))
                self.swift().put_object(
-                    self.segments, path, content,
+                    split[0], split[1], content,
                    content_type='application/octet-stream',
                    content_length=len(content))
                LOG.info('Data successfully uploaded!')
@ -96,27 +104,15 @@ class SwiftStorage(base.Storage):
|
|||
:param backup: Backup
|
||||
:type backup: freezer.storage.base.Backup
|
||||
"""
|
||||
backup = backup.copy(storage=self)
|
||||
self.client_manager.create_swift()
|
||||
headers = {'x-object-manifest':
|
||||
u'{0}/{1}'.format(self.segments, backup)}
|
||||
LOG.info('Uploading Swift Manifest: {0}'.format(backup))
|
||||
self.swift().put_object(container=self.container, obj=str(backup),
|
||||
headers = {'x-object-manifest': backup.segments_path}
|
||||
LOG.info('[*] Uploading Swift Manifest: {0}'.format(backup))
|
||||
split = backup.data_path.rsplit('/', 1)
|
||||
self.swift().put_object(container=split[0], obj=split[1],
|
||||
contents=u'', headers=headers)
|
||||
LOG.info('Manifest successfully uploaded!')
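
    # What the manifest upload produces, conceptually: an empty object
    # whose 'x-object-manifest' header points at
    # <segments_container>/<prefix>, which Swift serves as a Dynamic
    # Large Object (a GET streams all matching segments concatenated).
    # A hedged standalone sketch with python-swiftclient; every
    # connection value below is a placeholder, not part of this change:
    #
    #     from swiftclient import client as swift_client
    #     conn = swift_client.Connection(
    #         authurl='http://keystone:5000/v2.0',
    #         user='demo', key='secret',
    #         tenant_name='demo', auth_version='2')
    #     conn.put_object(
    #         container='freezer_backups',
    #         obj='host_backup_1000_0',
    #         contents=u'',
    #         headers={'x-object-manifest':
    #                  'freezer_backups_segments/host_backup_1000_0'})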

    def upload_meta_file(self, backup, meta_file):
        # Upload swift manifest for segments
        # Request a new auth client in case the current token
        # is expired before uploading tar meta data or the swift manifest
        self.client_manager.create_swift()

        # Upload tar incremental meta data file and remove it
        LOG.info('Uploading tar meta data file: {0}'.format(
            backup.tar()))
        with open(meta_file, 'r') as meta_fd:
            self.swift().put_object(
                self.container, backup.tar(), meta_fd)

    def prepare(self):
        """
        Check if the provided container is already available on Swift.

@@ -125,10 +121,8 @@ class SwiftStorage(base.Storage):
        account.
        """
        containers_list = [c['name'] for c in self.swift().get_account()[1]]
        if self.container not in containers_list:
            self.swift().put_container(self.container)
        if self.segments not in containers_list:
            self.swift().put_container(self.segments)
        if self.storage_path not in containers_list:
            self.swift().put_container(self.storage_path)

    def info(self):
        ordered_container = {}

@@ -145,37 +139,15 @@ class SwiftStorage(base.Storage):
            ordered_container, indent=4,
            separators=(',', ': '), sort_keys=True))

    def meta_file_abs_path(self, backup):
        return backup.tar()

    def get_file(self, from_path, to_path):
        split = from_path.split('/', 1)
        with open(to_path, 'ab') as obj_fd:
            iterator = self.swift().get_object(
                self.container, from_path,
                split[0], split[1],
                resp_chunk_size=self.max_segment_size)[1]
            for obj_chunk in iterator:
                obj_fd.write(obj_chunk)

    def remove(self, container, prefix):
        for segment in self.swift().get_container(container, prefix=prefix)[1]:
            self.swift().delete_object(container, segment['name'])

    def remove_backup(self, backup):
        """
        Removes backup, all increments, tar_meta and segments
        :param backup:
        :type backup: freezer.storage.base.Backup
        :return:
        """
        for i in range(backup.latest_update.level, -1, -1):
            if i in backup.increments:
                # remove segment
                self.remove(self.segments, backup.increments[i])
                # remove tar
                self.remove(self.container, backup.increments[i].tar())
                # remove manifest
                self.remove(self.container, backup.increments[i])

    def add_stream(self, stream, package_name, headers=None):
        i = 0
        for el in stream:

@@ -190,19 +162,6 @@ class SwiftStorage(base.Storage):
        self.swift().put_object(self.container, package_name, "",
                                headers=headers)

    def find_all(self, hostname_backup_name):
        """
        :rtype: list[freezer.storage.base.Backup]
        :return: list of zero level backups
        """
        try:
            files = self.swift().get_container(self.container)[1]
            names = [x['name'] for x in files if 'name' in x]
            return [b for b in base.Backup.parse_backups(names, self)
                    if b.hostname_backup_name == hostname_backup_name]
        except Exception as error:
            raise Exception('Error: get_object_list: {0}'.format(error))

    def backup_blocks(self, backup):
        """

@@ -210,14 +169,15 @@ class SwiftStorage(base.Storage):
        :type backup: freezer.storage.base.Backup
        :return:
        """
        split = backup.data_path.split('/', 1)
        try:
            chunks = self.swift().get_object(
                self.container, str(backup),
            chunks = self.client_manager.create_swift().get_object(
                split[0], split[1],
                resp_chunk_size=self.max_segment_size)[1]
        except requests.exceptions.SSLError as e:
            LOG.warning(e)
            chunks = self.client_manager.create_swift().get_object(
                self.container, str(backup),
                split[0], split[1],
                resp_chunk_size=self.max_segment_size)[1]

        for chunk in chunks:

@@ -229,15 +189,36 @@ class SwiftStorage(base.Storage):
        :type rich_queue: freezer.streaming.RichQueue
        :type backup: freezer.storage.base.Backup
        """
        backup = backup.copy(storage=self)
        for block_index, message in enumerate(rich_queue.get_messages()):
            segment_package_name = u'{0}/{1}/{2}/{3}'.format(
                backup, backup.timestamp,
                self.max_segment_size, "%08d" % block_index)
            segment_package_name = u'{0}/{1}'.format(
                backup.segments_path, "%08d" % block_index)
            self.upload_chunk(message, segment_package_name)
        self.upload_manifest(backup)
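
    # Segment objects are named <segments_path>/<zero-padded index>, so a
    # lexicographic listing matches upload order, which is what the DLO
    # manifest relies on to reassemble the stream. For example:
    # >>> u'{0}/{1}'.format('freezer_backups_segments/host_backup_1000_0',
    # ...                   "%08d" % 12)
    # u'freezer_backups_segments/host_backup_1000_0/00000012'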

    def download_freezer_meta_data(self, backup):
        return {}
    def listdir(self, path):
        """
        :type path: str
        :param path:
        :rtype: collections.Iterable[str]
        """
        try:
            # split[0] will have the container name and split[1] will have
            # the rest of the path. If the path is
            # freezer_backups/tar/server1.cloud.com_testest/
            # split[0] = freezer_backups, which is the container name
            # split[1] = tar/server1.cloud.com_testest/
            split = path.split('/', 1)
            files = self.swift().get_container(container=split[0],
                                               full_listing=True,
                                               prefix=split[1])[1]
            # @todo normalize into a plain for loop to be easily
            # understandable (szaher)
            return set(f['name'][len(split[1]):].split('/', 2)[1] for f in
                       files)
        except Exception as e:
            LOG.info(e)
            return []
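
    # A worked example of the splitting in listdir(), assuming
    # path = 'freezer_backups/tar/server1.cloud.com_testest'
    # (no trailing slash):
    #   path.split('/', 1) -> ['freezer_backups',
    #                          'tar/server1.cloud.com_testest']
    # then, for an object named
    # 'tar/server1.cloud.com_testest/1469786264/data':
    #   f['name'][len(split[1]):]   -> '/1469786264/data'
    #   ... .split('/', 2)[1]       -> '1469786264'
    # i.e. listdir() yields the immediate children of the given path.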

    def upload_freezer_meta_data(self, backup, meta_dict):
    def create_dirs(self, folder_list):
        pass


@@ -297,7 +297,6 @@ class BackupOpt1:
        self.max_segment_size = '0'
        self.time_stamp = 123456789
        self.container = 'test-container'
        self.work_dir = '/tmp'
        self.max_level = '20'
        self.encrypt_pass_file = '/dev/random'
        self.always_level = '20'

@@ -333,7 +332,6 @@ class BackupOpt1:
        self.client_manager.create_swift = self.client_manager.get_swift
        self.storage = swift.SwiftStorage(self.client_manager,
                                          self.container,
                                          self.work_dir,
                                          self.max_segment_size)
        self.engine = tar_engine.TarBackupEngine(
            self.compression, self.dereference_symlink,

@@ -92,15 +92,26 @@ class TestFreezerCompressGzip(base.BaseFreezerTest):
        :param metadata: the parsed json file metadata
        :return: the mimetype
        """
        hostname = metadata['hostname']
        backup_name = metadata['backup_name']
        time_stamp = metadata['time_stamp']
        dir_name = '%s_%s' % (hostname, backup_name)
        file_name = '%s_%s_%s_0' % (hostname, backup_name, time_stamp)
        """
        Data is stored like data/tar/localhost_False/1469786264/0_1469786264
        so we need to build the same directory structure.
        data: the directory that holds the backup data
        tar: the engine used to create the backup
        localhost: the hostname of the machine where the backup was taken
        False: the backup name, or False if no backup name was provided
        1469786264: timestamp
        0_1469786264: the backup level (0) and the timestamp
        """
        data_file_path = 'data{0}{1}{0}{2}_{3}{0}{4}{0}{5}_{4}{0}data'.format(
            os.path.sep,
            "tar",  # currently we support only tar
            metadata['hostname'],
            metadata['backup_name'],
            metadata['time_stamp'],
            metadata['curr_backup_level']
        )
        data_file_path = os.path.join(self.storage_tree.path,
                                      dir_name,
                                      str(time_stamp),
                                      file_name)
                                      data_file_path)
        self.assertEqual(True, os.path.exists(data_file_path))
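
        # A worked expansion of the format string above, assuming hostname
        # 'localhost', backup name False, timestamp 1469786264 and level 0:
        #   data/tar/localhost_False/1469786264/0_1469786264/data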

        # run 'file' in brief mode to only output the values we want


@@ -15,10 +15,10 @@ limitations under the License.

Freezer general utils functions
"""
from six.moves import queue
import threading

from oslo_log import log
from six.moves import queue

import threading

LOG = log.getLogger(__name__)


@@ -112,4 +112,5 @@ class QueuedThread(threading.Thread):
            self._exception_queue.put_nowait(e)
            self.rich_queue.force_stop()
            # Thread will exit at this point.
            raise
            # @todo print the error using traceback.print_exc(file=sys.stdout)
            raise e


@@ -110,7 +110,9 @@ class DateTime(object):


def path_join(*args):
    """Should work for windows and linux"""
    """Should work for windows and linux

    :rtype: str
    """
    return "/".join([str(x) for x in args])


@@ -1,203 +0,0 @@
# (c) Copyright 2014,2015 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import unittest

from freezer.storage import base
from freezer.storage import local

import mock


class TestBackup(unittest.TestCase):
    def test_backup_parse(self):
        self.assertRaises(ValueError, base.Backup._parse, "asdfasdfasdf")
        backup = base.Backup._parse("test_name_host_1234_0")
        self.assertEqual(backup.level, 0)
        self.assertEqual(backup.timestamp, 1234)
        self.assertEqual(backup.hostname_backup_name, "test_name_host")

    def test_backup_creation(self):
        backup = base.Backup(None, "name", 1234)
        self.assertEqual(backup.hostname_backup_name, "name")
        self.assertEqual(backup.timestamp, 1234)
        self.assertEqual(backup.level, 0)
        self.assertEqual(backup.latest_update.level, 0)
        self.assertEqual(backup.latest_update.timestamp, 1234)
        self.assertEqual(backup.latest_update.hostname_backup_name, "name")
        self.assertEqual(len(backup.increments), 1)

    def test_backup_full_backup(self):
        ok = False
        try:
            base.Backup(None, "name", 1324, 0, "full_backup")
        except ValueError:
            ok = True
        if not ok:
            raise Exception("Should throw ValueError")

    def test_backup_increment(self):
        backup = base.Backup(None, "name", 1234)
        self.assertRaises(ValueError, backup.add_increment, backup)
        increment = base.Backup(None, "name", 4567, 1, backup)
        backup.add_increment(increment)
        self.assertEqual(len(backup.increments), 2)

    def test__find_previous_backup(self):
        backup = base.Backup(None, "name", 1234)
        b = base.Storage._find_previous_backup([backup], False, 2, False, 0)
        assert b == backup

    def test__find_previous_backup_with_max_level(self):
        backup = base.Backup(None, "name", 1234)
        i1 = base.Backup(None, "name", 1234, 1, backup)
        i2 = base.Backup(None, "name", 1234, 2, backup)
        backup.add_increment(i1)
        backup.add_increment(i2)
        b = base.Storage._find_previous_backup([backup], False, 2, False, 0)
        assert not b

    def test__find_previous_backup_with_max_level_not_reached(self):
        backup = base.Backup(None, "name", 1234)
        i1 = base.Backup(None, "name", 1234, 1, backup)
        backup.add_increment(i1)
        b = base.Storage._find_previous_backup([backup], False, 2, False, 0)
        assert b == i1

    def test__find_previous_backup_with_always_level_reached(self):
        backup = base.Backup(None, "name", 1234)
        i1 = base.Backup(None, "name", 1234, 1, backup)
        i2 = base.Backup(None, "name", 1234, 2, backup)
        backup.add_increment(i1)
        backup.add_increment(i2)
        b = base.Storage._find_previous_backup([backup], False, False, 2, 0)
        assert b == i1

    def test__find_previous_backup_with_always_level_reached_2(self):
        backup = base.Backup(None, "name", 1234)
        i1 = base.Backup(None, "name", 1234, 1, backup)
        i2 = base.Backup(None, "name", 1234, 2, backup)
        backup.add_increment(i1)
        backup.add_increment(i2)
        b = base.Storage._find_previous_backup([backup], False, False, 3, 0)
        assert b == i2

    def test_add_increment_raises(self):
        backup = base.Backup(None, "name", 1234, level=3)
        self.assertRaises(ValueError, backup.add_increment, None)

    def test_restore_latest_backup(self):
        t = local.LocalStorage("", None, skip_prepare=True)
        t.find_all = mock.Mock()
        last = base.Backup(t, "host_backup", 5000)
        t.find_all.return_value = [
            base.Backup(t, "host_backup", 1000),
            base.Backup(t, "host_backup", 2000),
            base.Backup(t, "host_backup", 3000),
            base.Backup(t, "host_backup", 4000),
            base.Backup(t, "host_backup_f", 1000),
            last
        ]
        assert t.find_one("host_backup") == last

    def test_find_latest_backup_respects_increments_timestamp(self):
        test_backup = base.Backup(None, "host_backup", 5500)
        increment = base.Backup(None, "host_backup", 6000, 1, test_backup)
        test_backup.add_increment(increment)
        t = local.LocalStorage(None, None, skip_prepare=True)
        t.find_all = mock.Mock()
        t.find_all.return_value = [
            test_backup,
            base.Backup(None, "host_backup", 2000),
            base.Backup(None, "host_backup", 3000),
            base.Backup(None, "host_backup", 4000),
            base.Backup(None, "host_backup_f", 1000),
            base.Backup(None, "host_backup", 5000),
        ]
        assert t.find_one("host_backup") == increment

    def test_restore_from_date(self):
        t = local.LocalStorage(None, None, skip_prepare=True)
        t.find_all = mock.Mock()
        backup_restore = base.Backup(None, "host_backup", 3000)
        t.find_all.return_value = [
            base.Backup(None, "host_backup", 1000),
            base.Backup(None, "host_backup", 2000),
            backup_restore,
            base.Backup(None, "host_backup", 4000),
            base.Backup(None, "host_backup_f", 1000),
            base.Backup(None, "host_backup", 5000),
        ]
        assert t.find_one("host_backup", 3234) == backup_restore

    def test_restore_from_date_increment(self):
        t = local.LocalStorage(None, None, skip_prepare=True)
        t.find_all = mock.Mock()
        test_backup = base.Backup(None, "host_backup", 1000)
        increment = base.Backup(None, "host_backup", 3200, 1, test_backup)
        test_backup.add_increment(increment)
        t.find_all.return_value = [
            test_backup,
            base.Backup(None, "host_backup", 4000),
            base.Backup(None, "host_backup_f", 1000),
            base.Backup(None, "host_backup", 5000),
        ]
        assert t.find_one("host_backup", 3234) == increment

    def test__get_backups_wrong_name(self):
        result = base.Backup.parse_backups(["hostname"], None)
        assert len(result) == 0
        result = base.Backup.parse_backups(["hostname_100_2"], None)
        assert len(result) == 0

    def test__get_backups_good_name(self):
        result = base.Backup.parse_backups(["host_backup_100_0"], None)
        assert len(result) == 1
        result = result[0]
        assert result.hostname_backup_name == "host_backup"
        assert result.timestamp == 100
        assert result.level == 0

    def test_remove_older_than(self):
        t = local.LocalStorage(None, None, skip_prepare=True)
        t.find_all = mock.Mock()
        r1 = base.Backup(t, "host_backup", 1000)
        r2 = base.Backup(t, "host_backup", 2000)
        t.find_all.return_value = [
            r1,
            r2,
            base.Backup(t, "host_backup", 3000),
            base.Backup(t, "host_backup", 4000),
            base.Backup(t, "host_backup", 5000),
        ]
        t.remove_backup = mock.Mock()
        t.remove_older_than(3000, "host_backup")
        t.remove_backup.assert_any_call(r1)
        t.remove_backup.assert_any_call(r2)
        assert t.remove_backup.call_count == 2
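
    # remove_older_than(3000, ...) deletes every backup whose timestamp
    # is strictly below 3000: here r1 (1000) and r2 (2000), hence exactly
    # two remove_backup() calls, while the 3000 backup is kept.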

    def test_create_backup(self):
        t = local.LocalStorage(None, None, skip_prepare=True)
        t.find_all = mock.Mock()
        t.find_all.return_value = []
        t._find_previous_backup = mock.Mock()
        t._find_previous_backup.return_value = \
            base.Backup(None, "host_backup", 3000, tar_meta=True)
        t.create_backup("", True, 12, False, False)

    def test_restart_always_level(self):
        t = local.LocalStorage(None, None, skip_prepare=True)
        t.find_all = mock.Mock()
        t.find_all.return_value = []
        backup = base.Backup(None, "host_backup", 3000, tar_meta=True)
        t._find_previous_backup([backup], False, None, None, 10)

@@ -63,10 +63,12 @@ class TestLocalStorage(unittest.TestCase):

    def test_prepare(self):
        backup_dir, files_dir, work_dir = self.create_dirs()
        storage = local.LocalStorage(backup_dir, work_dir)
        storage = local.LocalStorage(backup_dir,
                                     work_dir,
                                     10000)
        storage.prepare()

    def test_info(self):
        backup_dir, files_dir, work_dir = self.create_dirs()
        storage = local.LocalStorage(backup_dir, work_dir)
        storage = local.LocalStorage(backup_dir, work_dir, 10000)
        storage.info()

@@ -1,109 +0,0 @@
# (c) Copyright 2014,2015 Hewlett-Packard Development Company, L.P.
# (c) Copyright 2016 Hewlett-Packard Enterprise Development Company, L.P
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import unittest

from freezer.openstack import osclients
from freezer.storage import swift
from freezer.storage import base


class TestSwiftStorage(unittest.TestCase):

    def setUp(self):
        opts = osclients.OpenstackOpts.create_from_env().get_opts_dicts()
        self.storage = swift.SwiftStorage(
            osclients.OSClientManager(opts.pop('auth_url'),
                                      opts.pop('auth_method', 'password'),
                                      **opts
                                      ),
            "freezer_ops-aw1ops1-gerrit0001.aw1.hpcloud.net",
            "/tmp/",
            100, skip_prepare=True
        )

        self.files = [
            "tar_metadata_hostname_backup_1000_0",
            "hostname_backup_1000_0",
        ]

        self.increments = [
            "tar_metadata_hostname_backup_1000_0",
            "hostname_backup_1000_0",
            "tar_metadata_hostname_backup_2000_1",
            "hostname_backup_2000_1",
        ]

        self.cycles_increments = [
            "tar_metadata_hostname_backup_1000_0",
            "hostname_backup_1000_0",
            "tar_metadata_hostname_backup_2000_1",
            "hostname_backup_2000_1",
            "tar_metadata_hostname_backup_3000_0",
            "hostname_backup_3000_0",
            "tar_metadata_hostname_backup_4000_1",
            "hostname_backup_4000_1",
        ]

        self.backup = base.Backup(self.storage,
                                  "hostname_backup", 1000, tar_meta=True)
        self.backup_2 = base.Backup(self.storage,
                                    "hostname_backup", 3000, tar_meta=True)
        self.increment = base.Backup(self.storage,
                                     "hostname_backup", 2000,
                                     full_backup=self.backup,
                                     level=1,
                                     tar_meta=True)
        self.increment_2 = base.Backup(self.storage,
                                       "hostname_backup", 4000,
                                       full_backup=self.backup_2,
                                       level=1,
                                       tar_meta=True)

    def test__get_backups(self):
        backups = base.Backup.parse_backups(self.files, self.storage)
        self.assertEqual(1, len(backups))
        backup = backups[0]
        self.assertEqual(self.backup, backup)

    def test__get_backups_with_tar_only(self):
        backups = base.Backup.parse_backups(
            ["tar_metadata_hostname_backup_1000_0"], self.storage)
        self.assertEqual(0, len(backups))

    def test__get_backups_without_tar(self):
        backups = base.Backup.parse_backups(["hostname_backup_1000_0"],
                                            self.storage)
        self.assertEqual(1, len(backups))
        self.backup.tar_meta = False
        backup = backups[0]
        self.assertEqual(self.backup, backup)

    def test__get_backups_increment(self):
        backups = base.Backup.parse_backups(self.increments, self.storage)
        self.assertEqual(1, len(backups))
        self.backup.add_increment(self.increment)
        backup = backups[0]
        self.assertEqual(self.backup, backup)

    def test__get_backups_increments(self):
        backups = base.Backup.parse_backups(self.cycles_increments,
                                            self.storage)
        self.assertEqual(2, len(backups))
        self.backup.add_increment(self.increment)
        self.backup_2.add_increment(self.increment_2)
        self.assertEqual(self.backup, backups[0])
        self.assertEqual(self.backup_2, backups[1])