From a2671d356ccaa0f413fb9ea9ba0e594da75835c0 Mon Sep 17 00:00:00 2001
From: eldar nugaev
Date: Wed, 19 Aug 2015 13:53:34 +0100
Subject: [PATCH] First iteration of multiple storages. Unifies the storages
 and extracts the backup engine.

Implements: blueprint multiple-storage-backup-restore

Change-Id: I0cc4edacabebbc7503401e7865ed8a48a5b3350d
---
 README.rst                      |   8 +-
 freezer/arguments.py            |   4 +-
 freezer/backup.py               |  32 ++--
 freezer/engine/__init__.py      |   0
 freezer/engine/engine.py        | 139 +++++++++++++++
 freezer/engine/tar_engine.py    | 181 +++++++++++++++++++
 freezer/job.py                  |  24 +--
 freezer/local.py                | 153 ++++++++--------
 freezer/main.py                 |  55 +++---
 freezer/ssh.py                  | 139 ++++++++-------
 freezer/storage.py              | 298 +++++++++++++++++---------------
 freezer/streaming.py            | 176 +++++++++++++++++++
 freezer/swift.py                | 266 +++++++---------------------
 freezer/tar.py                  | 195 +++------------------
 freezer/utils.py                |  24 ++-
 freezer/validator.py            |   5 +
 tests/commons.py                |   7 +-
 tests/integration/__init__.py   |   0
 tests/integration/common.py     |  67 ++++---
 tests/integration/test_agent.py |  56 ++++--
 tests/test_backup.py            | 166 ------------------
 tests/test_local.py             |  98 ++---------
 tests/test_storage.py           | 138 +++++++--------
 tests/test_swift.py             |  24 ++-
 tests/test_tar.py               |  63 +------
 tests/test_tar_builders.py      |   8 +-
 26 files changed, 1143 insertions(+), 1183 deletions(-)
 create mode 100644 freezer/engine/__init__.py
 create mode 100644 freezer/engine/engine.py
 create mode 100644 freezer/engine/tar_engine.py
 create mode 100644 freezer/streaming.py
 create mode 100644 tests/integration/__init__.py
 delete mode 100644 tests/test_backup.py

diff --git a/README.rst b/README.rst
index 8198a4a8..66eeaa9b 100644
--- a/README.rst
+++ b/README.rst
@@ -292,9 +292,9 @@ Freezer can use:

     To use ssh storage specify "--storage ssh"
     And use "--container %path-to-folder-with-backups-on-remote-machine%"
-    Also you should specify ssh-user, ssh-key and ssh-host parameters.
+    Also you should specify ssh-username, ssh-key and ssh-host parameters.
- ssh-user for user ubuntu should be "--ssh-user ubuntu" + ssh-username for user ubuntu should be "--ssh-username ubuntu" ssh-key should be path to your secret ssh key "--ssh-key %path-to-secret-key%" ssh-host can be ip of remote machine or resolvable dns name "--ssh-host 8.8.8.8" @@ -302,14 +302,14 @@ Freezer can use: $ sudo freezerc --file-to-backup /data/dir/to/backup --container /remote-machine-path/ --backup-name my-backup-name - --storage ssh --ssh-user ubuntu --ssh-key ~/.ssh/id_rsa + --storage ssh --ssh-username ubuntu --ssh-key ~/.ssh/id_rsa --ssh-host 8.8.8.8 Restore example:: $ sudo freezerc --action restore --restore-abs-pat /data/dir/to/backup --container /remote-machine-path/ --backup-name my-backup-name - --storage ssh --ssh-user ubuntu --ssh-key ~/.ssh/id_rsa + --storage ssh --ssh-username ubuntu --ssh-key ~/.ssh/id_rsa --ssh-host 8.8.8.8 Restore diff --git a/freezer/arguments.py b/freezer/arguments.py index 8cc1d6ef..f6ba2b75 100644 --- a/freezer/arguments.py +++ b/freezer/arguments.py @@ -49,7 +49,7 @@ DEFAULT_PARAMS = { 'max_segment_size': 67108864, 'lvm_srcvol': False, 'download_limit': -1, 'hostname': False, 'remove_from_date': False, 'restart_always_level': False, 'lvm_dirmount': False, - 'dst_file': False, 'dereference_symlink': 'none', + 'dst_file': False, 'dereference_symlink': '', 'restore_from_host': False, 'config': False, 'mysql_conf': False, 'insecure': False, 'lvm_snapname': False, 'lvm_snapperm': 'ro', 'max_priority': False, 'max_level': False, 'path_to_backup': False, @@ -299,7 +299,7 @@ def backup_arguments(args_dict={}): help=( "Follow hard and soft links and archive and dump the files they " " refer to. Default False."), - dest='dereference_symlink', default='none') + dest='dereference_symlink', default='') arg_parser.add_argument( '-U', '--upload', action='store_true', help="Upload to Swift the destination file passed to the -d option.\ diff --git a/freezer/backup.py b/freezer/backup.py index d53cb107..9190a722 100644 --- a/freezer/backup.py +++ b/freezer/backup.py @@ -27,7 +27,6 @@ import time from freezer import utils from freezer.lvm import lvm_snap, lvm_snap_remove, get_lvm_info -from freezer.tar import TarCommandBuilder from freezer.vss import vss_create_shadow_copy from freezer.vss import vss_delete_shadow_copy from freezer.winutils import start_sql_server @@ -66,7 +65,8 @@ def backup_mode_sql_server(backup_opt_dict): try: stop_sql_server(backup_opt_dict) - backup(backup_opt_dict, backup_opt_dict.storage) + backup(backup_opt_dict, backup_opt_dict.storage, + backup_opt_dict.engine) finally: if not backup_opt_dict.vssadmin: # if vssadmin is false, wait until the backup is complete @@ -137,7 +137,7 @@ def backup_mode_mysql(backup_opt_dict): raise Exception('[*] MySQL: {0}'.format(error)) # Execute backup - backup(backup_opt_dict, backup_opt_dict.storage) + backup(backup_opt_dict, backup_opt_dict.storage, backup_opt_dict.engine) def backup_mode_mongo(backup_opt_dict): @@ -161,7 +161,8 @@ def backup_mode_mongo(backup_opt_dict): mongo_primary = master_dict['primary'] if mongo_me == mongo_primary: - backup(backup_opt_dict, backup_opt_dict.storage) + backup(backup_opt_dict, backup_opt_dict.storage, + backup_opt_dict.engine) else: logging.warning('[*] localhost {0} is not Master/Primary,\ exiting...'.format(local_hostname)) @@ -290,12 +291,14 @@ def snapshot_remove(backup_opt_dict, shadow, windows_volume): lvm_snap_remove(backup_opt_dict) -def backup(backup_opt_dict, storage): +def backup(backup_opt_dict, storage, engine): """ :param backup_opt_dict: 
:param storage:
     :type storage: freezer.storage.Storage
+    :param engine: Backup Engine
+    :type engine: freezer.engine.engine.BackupEngine
     :return:
     """
     backup_media = backup_opt_dict.backup_media
@@ -314,30 +317,17 @@ def backup(backup_opt_dict, storage):
         filepath = os.path.basename(chdir_path)
         chdir_path = os.path.dirname(chdir_path)
         os.chdir(chdir_path)
-        builder = TarCommandBuilder(backup_opt_dict.tar_path,
-                                    filepath,
-                                    backup_opt_dict.compression)
-        builder.set_dereference(backup_opt_dict.dereference_symlink)
-        if backup_opt_dict.exclude:
-            builder.set_exclude(backup_opt_dict.exclude)
-        if backup_opt_dict.encrypt_pass_file:
-            builder.set_encryption(
-                backup_opt_dict.openssl_path,
-                backup_opt_dict.encrypt_pass_file)
         hostname_backup_name = backup_opt_dict.hostname_backup_name
         if not storage.is_ready():
             storage.prepare()
-        incremental_backup = storage.find_previous_backup(
+        backup_instance = storage.create_backup(
             hostname_backup_name,
             backup_opt_dict.no_incremental,
             backup_opt_dict.max_level,
             backup_opt_dict.always_level,
-            backup_opt_dict.restart_always_level)
-        storage.backup(
-            backup_opt_dict.path_to_backup, hostname_backup_name,
-            builder, incremental_backup,
+            backup_opt_dict.restart_always_level,
             time_stamp=time_stamp)
-
+        engine.backup(filepath, backup_instance)
     finally:
         snapshot_remove(backup_opt_dict, backup_opt_dict.shadow,
                         backup_opt_dict.windows_volume)
diff --git a/freezer/engine/__init__.py b/freezer/engine/__init__.py
new file mode 100644
index 00000000..e69de29b
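The rewritten backup() above splits responsibilities: the storage names the new backup and decides its incremental level via create_backup(), while the engine produces and streams the data. A minimal sketch of the resulting call flow, assuming a local storage and hypothetical paths and option values (a real run takes all of these from backup_args)::

    from freezer import local
    from freezer.engine import tar_engine

    # assumed example values, for illustration only
    storage = local.LocalStorage("/tmp/backups", "/tmp/freezer_work")
    engine = tar_engine.TarBackupEngine(
        "tar", "gzip", dereference_symlink="", exclude="",
        main_storage=storage, is_windows=False)

    if not storage.is_ready():
        storage.prepare()
    # level 0 on the first run, level n+1 on later runs (no level flags set)
    backup = storage.create_backup(
        "myhost_mybackup", no_incremental=False, max_level=False,
        always_level=False, restart_always_level=False)
    engine.backup("/data/dir/to/backup", backup)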
diff --git a/freezer/engine/engine.py b/freezer/engine/engine.py
new file mode 100644
index 00000000..9a853f23
--- /dev/null
+++ b/freezer/engine/engine.py
@@ -0,0 +1,139 @@
+"""
+(c) Copyright 2014,2015 Hewlett-Packard Development Company, L.P.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+This product includes cryptographic software written by Eric Young
+(eay@cryptsoft.com). This product includes software written by Tim
+Hudson (tjh@cryptsoft.com).
+========================================================================
+
+Freezer backup engine abstraction
+"""
+import logging
+
+from freezer import streaming
+from freezer import utils
+
+
+class BackupEngine(object):
+    """
+    The key part of making a backup or a restore is the mechanism that
+    implements it. For a long time Freezer had only one such mechanism -
+    invoking gnutar - and it was heavily hardcoded.
+
+    We are now going to support several different approaches, one of them
+    being rsync. Having many different implementations requires an
+    abstraction layer, and this class is that abstraction above all
+    implementations.
+
+    Workflow:
+    1) invoke backup
+        1.1) try to download metadata for incremental backup
+        1.2) create a dataflow between backup_stream and storage.write_backup
+            backup_stream is the data producer; for a tar backup it spawns
+            a gnutar subprocess and starts reading data from its stdout
+            storage.write_backup is the data consumer; it creates a thread
+            that stores the data in the storage
+            Both streams communicate in non-blocking mode
+        1.3) invoke post_backup - currently it uploads the metadata file
+    2) restore backup
+        2.1) define all incremental backups
+        2.2) for each incremental backup create a dataflow between
+            storage.read_backup and restore_stream
+            read_backup is the data producer; it reads data chunk by chunk
+            from the specified storage and pushes the chunks into a queue
+            restore_stream is the consumer that actually does the restore
+            (for tar it is a thread that spawns a gnutar subprocess and
+            feeds the chunks to its stdin)
+
+    author: Eldar Nugaev
+    """
+    @property
+    def main_storage(self):
+        """
+        Currently this is the storage used for restore. We can have multiple
+        storages and run a parallel backup on all of them, but when we are
+        doing a restore, we need one specific storage.
+
+        PS. Should be changed to select the most up-to-date storage among
+        the existing ones
+        :rtype: freezer.storage.Storage
+        :return:
+        """
+        raise NotImplementedError("Should have implemented this")
+
+    def backup_stream(self, backup_path, rich_queue, manifest_path):
+        """
+        :param rich_queue:
+        :type rich_queue: freezer.streaming.RichQueue
+        :param manifest_path:
+        :return:
+        """
+        rich_queue.put_messages(self.backup_data(backup_path, manifest_path))
+
+    def backup(self, backup_path, backup):
+        """
+        Here we know the location of all interesting artifacts, like
+        metadata. Streams the backup data to the main storage and then
+        uploads the manifest.
+        :return:
+        """
+        manifest = self.main_storage.download_meta_file(backup)
+        streaming.stream(
+            self.backup_stream,
+            {"backup_path": backup_path, "manifest_path": manifest},
+            self.main_storage.write_backup, {"backup": backup})
+        self.post_backup(backup, manifest)
+
+    def post_backup(self, backup, manifest_file):
+        """
+        Uploads the manifest and cleans up temporary files
+        :return:
+        """
+        raise NotImplementedError("Should have implemented this")
+
+    def restore(self, backup, restore_path):
+        """
+        :type backup: freezer.storage.Backup
+        """
+        logging.info("Creating restore path: {0}".format(restore_path))
+        utils.create_dir_tree(restore_path)
+        logging.info("Restore path creation completed")
+        for level in range(0, backup.level + 1):
+            b = backup.full_backup.increments[level]
+            logging.info("Restore backup {0}".format(b))
+            streaming.stream(
+                self.main_storage.read_backup, {"backup": b},
+                self.restore_stream, {"restore_path": restore_path})
+        logging.info(
+            '[*] Restore successfully executed for backup name {0}'
+            .format(backup))
+
+    def backup_data(self, backup_path, manifest_path):
+        """
+        :param backup_path:
+        :param manifest_path:
+        :return:
+        """
+        raise NotImplementedError("Should have implemented this")
+
+    def restore_stream(self, restore_path, rich_queue):
+        """
+        :param restore_path:
+        :type restore_path: str
+        :param rich_queue:
+        :type rich_queue: freezer.streaming.RichQueue
+        :return:
+        """
+        raise NotImplementedError("Should have implemented this")
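A concrete engine only has to provide the four members left abstract above: main_storage, backup_data() (the producer generator), restore_stream() (the consumer), and post_backup(). A deliberately tiny, hypothetical engine that streams a single file verbatim - no tar, no compression - sketches the contract (here restore_path is treated as a single target file for simplicity)::

    from freezer.engine import engine

    class PlainFileEngine(engine.BackupEngine):
        """Hypothetical sketch of the BackupEngine contract."""

        def __init__(self, storage):
            self._storage = storage

        @property
        def main_storage(self):
            return self._storage

        def backup_data(self, backup_path, manifest_path):
            # producer: yield the payload chunk by chunk
            with open(backup_path, 'rb') as f:
                for chunk in iter(lambda: f.read(1024 * 1024), ''):
                    yield chunk

        def post_backup(self, backup, manifest_file):
            self.main_storage.upload_meta_file(backup, manifest_file)

        def restore_stream(self, restore_path, rich_queue):
            # consumer: drain the queue and write the data back
            with open(restore_path, 'wb') as f:
                for message in rich_queue.get_messages():
                    f.write(message)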
diff --git a/freezer/engine/tar_engine.py b/freezer/engine/tar_engine.py
new file mode 100644
index 00000000..e6e2a0c4
--- /dev/null
+++ b/freezer/engine/tar_engine.py
@@ -0,0 +1,181 @@
+"""
+(c) Copyright 2014,2015 Hewlett-Packard Development Company, L.P.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+This product includes cryptographic software written by Eric Young
+(eay@cryptsoft.com). This product includes software written by Tim
+Hudson (tjh@cryptsoft.com).
+========================================================================
+
+Freezer tar backup engine
+"""
+import logging
+import os
+import subprocess
+import threading
+import time
+
+from freezer.engine import engine
+from freezer import tar
+from freezer import streaming
+
+
+class TarBackupEngine(engine.BackupEngine):
+    DEFAULT_CHUNK_SIZE = 20000000
+
+    def __init__(
+            self, gnutar_path, compression_algo, dereference_symlink,
+            exclude, main_storage, is_windows, open_ssl_path=None,
+            encrypt_pass_file=None, dry_run=False,
+            chunk_size=DEFAULT_CHUNK_SIZE):
+        self.gnutar_path = gnutar_path
+        self.compression_algo = compression_algo
+        self.open_ssl_path = open_ssl_path
+        self.encrypt_pass_file = encrypt_pass_file
+        self.dereference_symlink = dereference_symlink
+        self.exclude = exclude
+        self._main_storage = main_storage
+        self.is_windows = is_windows
+        self.dry_run = dry_run
+        self.chunk_size = chunk_size
+
+    @property
+    def main_storage(self):
+        """
+        :rtype: freezer.storage.Storage
+        :return:
+        """
+        return self._main_storage
+
+    @staticmethod
+    def reader(rich_queue, read_pipe, size=DEFAULT_CHUNK_SIZE):
+        """
+        :param rich_queue:
+        :type rich_queue: freezer.streaming.RichQueue
+        :param read_pipe: pipe to read the tar output from
+        :return:
+        """
+        while True:
+            tar_chunk = read_pipe.read(size)
+            if tar_chunk == '':
+                break
+            if tar_chunk:
+                rich_queue.put(tar_chunk)
+        logging.info("reader finished")
+        rich_queue.finish()
+
+    @staticmethod
+    def writer(rich_queue, write_pipe):
+        """
+        :param rich_queue:
+        :type rich_queue: freezer.streaming.RichQueue
+        :param write_pipe: pipe to write the queued chunks to
+        :return:
+        """
+        for message in rich_queue.get_messages():
+            logging.debug("Write next chunk to tar stdin")
+            write_pipe.write(message)
+
+    def post_backup(self, backup, manifest):
+        self.main_storage.upload_meta_file(backup, manifest)
+
+    def backup_data(self, backup_path, manifest_path):
+        logging.info("Tar engine backup stream enter")
+        tar_command = tar.TarCommandBuilder(
+            self.gnutar_path, backup_path, self.compression_algo,
+            self.is_windows)
+        if self.open_ssl_path:
+            tar_command.set_encryption(self.open_ssl_path,
+                                       self.encrypt_pass_file)
+        if self.dereference_symlink:
+            tar_command.set_dereference(self.dereference_symlink)
+        tar_command.set_exclude(self.exclude)
+        tar_command.set_listed_incremental(manifest_path)
+
+        command = tar_command.build()
+
+        logging.info("Executing command: \n{}".format(command))
+
+        tar_queue = streaming.RichQueue()
+        tar_process = subprocess.Popen(command, stdout=subprocess.PIPE,
+                                       stderr=subprocess.PIPE, shell=True)
+
+        reader = threading.Thread(target=self.reader,
+                                  args=(tar_queue, tar_process.stdout))
+        reader.daemon = True
+        reader.start()
+        while tar_queue.has_more():
+            try:
+                yield tar_queue.get()
+            except streaming.Wait:
+                pass
+        logging.info("Tar engine streaming end")
+
+    def restore_stream(self, restore_path, rich_queue):
+        """
+        :param restore_path:
+        :type restore_path: str
+        :param rich_queue:
+        :type rich_queue: freezer.streaming.RichQueue
+        :return:
+        """
+
+        tar_command = tar.TarCommandRestoreBuilder(
+            self.gnutar_path, restore_path, self.compression_algo,
+            self.is_windows)
+
+        if self.open_ssl_path:
+            tar_command.set_encryption(self.open_ssl_path,
+                                       self.encrypt_pass_file)
+
+        if self.dry_run:
+            tar_command.set_dry_run()
+
+        command = tar_command.build()
+        logging.info("Executing restore command: \n{}".format(command))
+
+        tar_process = subprocess.Popen(
+            command, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
+            stderr=subprocess.PIPE, shell=True, close_fds=True)
+        if self.is_windows:
+            # on windows, chdir to restore path.
+            os.chdir(restore_path)
+
+        writer = threading.Thread(target=self.writer,
+                                  args=(rich_queue, tar_process.stdin))
+        writer.daemon = True
+        writer.start()
+        error_queue = streaming.RichQueue(size=2000)
+        # stderr is read in small chunks so that errors are detected early
+        reader = threading.Thread(target=self.reader,
+                                  args=(error_queue, tar_process.stderr, 10))
+        reader.daemon = True
+        reader.start()
+
+        while writer.is_alive() and not tar_process.poll() \
+                and error_queue.empty():
+            # here we know that tar_process is still running and no
+            # exceptions have occurred so far, so we just need to wait.
+            # sleeping blocks the main thread and may make it unresponsive,
+            # so the timeout is kept small
+            time.sleep(1)
+
+        res = []
+        while not error_queue.empty():
+            res.append(error_queue.get())
+        if res:
+            tar_err = "".join(res)
+            logging.exception('[*] Restore error: {0}'.format(tar_err))
+            rich_queue.force_stop()
+            raise Exception('[*] Restore error: {0}'.format(tar_err))
diff --git a/freezer/job.py b/freezer/job.py
index 0e5f561b..ebd26a05 100644
--- a/freezer/job.py
+++ b/freezer/job.py
@@ -37,11 +37,13 @@ import logging
 class Job:
     """
     :type storage: freezer.storage.Storage
+    :type engine: freezer.engine.engine.BackupEngine
     """

     def __init__(self, conf_dict):
         self.conf = conf_dict
         self.storage = conf_dict.storage
+        self.engine = conf_dict.engine

     def execute(self):
         logging.info('[*] Action not implemented')
@@ -85,7 +87,7 @@ class BackupJob(Job):
             self.conf.storage.prepare()

         if self.conf.mode == 'fs':
-            backup.backup(self.conf, self.storage)
+            backup.backup(self.conf, self.storage, self.engine)
         elif self.conf.mode == 'mongo':
             backup.backup_mode_mongo(self.conf)
         elif self.conf.mode == 'mysql':
@@ -128,29 +130,21 @@ class RestoreJob(Job):
         if conf.restore_from_date:
             restore_timestamp = utils.date_to_timestamp(conf.restore_from_date)
         if conf.backup_media == 'fs':
-            storage = conf.storage
-            builder = tar.TarCommandRestoreBuilder(conf.tar_path,
-                                                   restore_abs_path,
-                                                   conf.compression)
+            builder = tar.TarCommandRestoreBuilder(
+                conf.tar_path, restore_abs_path, conf.compression,
+                winutils.is_windows())
             if conf.dry_run:
                 builder.set_dry_run()
             if winutils.is_windows():
-                builder.set_windows()
                 os.chdir(conf.restore_abs_path)
             if conf.encrypt_pass_file:
                 builder.set_encryption(conf.openssl_path,
                                        conf.encrypt_pass_file)

-            if restore_timestamp:
-                storage.restore_from_date(conf.hostname_backup_name,
-                                          restore_abs_path,
-                                          builder,
-                                          restore_timestamp)
-            else:
-                storage.restore_latest(conf.hostname_backup_name,
-                                       restore_abs_path,
-                                       builder)
+            backup = self.storage.find_one(conf.hostname_backup_name,
+                                           restore_timestamp)
+            self.engine.restore(backup, restore_abs_path)
             return

         res = restore.RestoreOs(conf.client_manager, conf.container)
diff --git a/freezer/local.py b/freezer/local.py
index a45448c3..56e667b7 100644
--- a/freezer/local.py
+++ b/freezer/local.py
@@ -18,50 +18,75 @@ This product includes cryptographic software written
by Eric Young Hudson (tjh@cryptsoft.com). """ -import subprocess -import logging import os import shutil +import io +import logging from freezer import storage from freezer import utils class LocalStorage(storage.Storage): + DEFAULT_CHUNK_SIZE = 20000000 - def prepare(self): - utils.create_dir(self.storage_directory) - - def get_backups(self): - backup_names = os.listdir(self.storage_directory) - backups = [] - for backup_name in backup_names: - backup_dir = self.storage_directory + "/" + backup_name - timestamps = os.listdir(backup_dir) - for timestamp in timestamps: - increments = os.listdir(backup_dir + "/" + timestamp) - backups.extend(self._get_backups(increments)) - return backups - - def __init__(self, storage_directory): + def __init__(self, storage_directory, work_dir, + chunk_size=DEFAULT_CHUNK_SIZE): """ :param storage_directory: directory of storage :type storage_directory: str :return: """ self.storage_directory = storage_directory + self.work_dir = work_dir + self.chunk_size = chunk_size + + def download_meta_file(self, backup): + """ + :type backup: freezer.storage.Backup + :param backup: + :return: + """ + utils.create_dir(self.work_dir) + if backup.level == 0: + return utils.joined_path(self.work_dir, backup.tar()) + meta_backup = backup.full_backup.increments[backup.level - 1] + zero_backup = self._zero_backup_dir(backup) + to_path = utils.joined_path(self.work_dir, meta_backup.tar()) + if os.path.exists(to_path): + os.remove(to_path) + from_path = utils.joined_path(zero_backup, meta_backup.tar()) + shutil.copyfile(from_path, to_path) + return to_path + + def upload_meta_file(self, backup, meta_file): + zero_backup = self._zero_backup_dir(backup) + to_path = utils.joined_path(zero_backup, backup.tar()) + shutil.copyfile(meta_file, to_path) + + def prepare(self): + utils.create_dir(self.storage_directory) + + def get_backups(self): + backup_names = os.listdir(self.storage_directory) + logging.info(backup_names) + backups = [] + for backup_name in backup_names: + backup_dir = utils.joined_path(self.storage_directory, backup_name) + timestamps = os.listdir(backup_dir) + for timestamp in timestamps: + increments = \ + os.listdir(utils.joined_path(backup_dir, timestamp)) + backups.extend(storage.Backup.parse_backups(increments)) + logging.info(backups) + return backups def info(self): pass - def _backup_dir(self, backup): - """ - :param backup: - :type backup: freezer.storage.Backup - :return: - """ - return "{0}/{1}".format(self.storage_directory, - backup.hostname_backup_name) + def backup_to_file_path(self, backup): + return utils.joined_path(self._zero_backup_dir(backup), + backup) def _zero_backup_dir(self, backup): """ @@ -69,47 +94,9 @@ class LocalStorage(storage.Storage): :type backup: freezer.storage.Backup :return: """ - return "{0}/{1}".format(self._backup_dir(backup), backup.timestamp) - - def backup(self, path, hostname_backup_name, tar_builder, - parent_backup=None, time_stamp=None): - """ - Backup path - storage_dir/backup_name/timestamp/backup_name_timestamps_level - :param path: - :param hostname_backup_name: - :param tar_builder: - :type tar_builder: freezer.tar.TarCommandBuilder - :param parent_backup: - :type parent_backup: freezer.storage.Backup - :return: - """ - new_backup = self._create_backup(hostname_backup_name, parent_backup, - time_stamp) - - host_backups = self._backup_dir(new_backup) - utils.create_dir(host_backups) - - if parent_backup: - zero_backup = self._zero_backup_dir(parent_backup.parent) - else: - zero_backup = 
self._zero_backup_dir(new_backup)
-        utils.create_dir(zero_backup)
-        tar_builder.set_output_file("{0}/{1}".format(zero_backup,
-                                                     new_backup.repr()))
-
-        tar_incremental = "{0}/{1}".format(zero_backup, new_backup.tar())
-        if parent_backup:
-            shutil.copyfile("{0}/{1}".format(
-                zero_backup, parent_backup.tar()), tar_incremental)
-
-        tar_builder.set_listed_incremental(tar_incremental)
-
-        logging.info('[*] Changing current working directory to: {0}'
-                     .format(path))
-        logging.info('[*] Backup started for: {0}'.format(path))
-
-        subprocess.check_output(tar_builder.build(), shell=True)
+        backup_dir = utils.joined_path(
+            self.storage_directory, backup.hostname_backup_name)
+        return utils.joined_path(backup_dir, backup.full_backup.timestamp)

     def is_ready(self):
         return os.path.isdir(self.storage_directory)
@@ -121,17 +108,27 @@ class LocalStorage(storage.Storage):
         """
         shutil.rmtree(self._zero_backup_dir(backup))

-    def restore(self, backup, path, tar_builder):
+    def backup_blocks(self, backup):
+        filename = self.backup_to_file_path(backup)
+        with io.open(filename, 'rb') as backup_file:
+            while True:
+                chunk = backup_file.read(self.chunk_size)
+                if chunk == '':
+                    break
+                if len(chunk):
+                    yield chunk
+
+    def write_backup(self, rich_queue, backup):
         """
-        :param backup:
-        :type backup: freezer.storage.Backup
-        :param path:
-        :param tar_builder:
-        :type tar_builder: freezer.tar.TarCommandRestoreBuilder
-        :return:
+        Write the backup data coming from the queue to a file in the
+        local storage directory
+        :type rich_queue: freezer.streaming.RichQueue
+        :type backup: freezer.storage.Backup
         """
-        zero_dir = self._zero_backup_dir(backup.parent)
-        for level in range(0, backup.level + 1):
-            c_backup = backup.parent.increments[level]
-            tar_builder.set_archive(zero_dir + "/" + c_backup.repr())
-            subprocess.check_output(tar_builder.build(), shell=True)
+        filename = self.backup_to_file_path(backup)
+        logging.info("Local storage write backup enter")
+        if backup.level == 0:
+            os.makedirs(os.path.dirname(filename))
+        with io.open(filename, 'wb', buffering=self.chunk_size) as b_file:
+            for message in rich_queue.get_messages():
+                b_file.write(message)
+        logging.info("Local storage write backup successfully finished")
diff --git a/freezer/main.py b/freezer/main.py
index 29465231..bc3dcca1 100644
--- a/freezer/main.py
+++ b/freezer/main.py
@@ -20,6 +20,12 @@ Hudson (tjh@cryptsoft.com).
Freezer main execution function """ +import os +import subprocess +import logging +import sys +import json + from freezer.bandwidth import monkeypatch_socket_bandwidth from freezer import job from freezer.arguments import backup_arguments @@ -28,12 +34,9 @@ from freezer import swift from freezer import local from freezer import ssh from freezer import utils -from freezer.utils import create_dir -import os -import subprocess -import logging -import sys -import json +from freezer.engine import tar_engine +from freezer import winutils + # Initialize backup options from freezer.validator import Validator @@ -55,7 +58,7 @@ def freezer_main(args={}): def configure_logging(file_name): expanded_file_name = os.path.expanduser(file_name) expanded_dir_name = os.path.dirname(expanded_file_name) - create_dir(expanded_dir_name, do_log=False) + utils.create_dir(expanded_dir_name, do_log=False) logging.basicConfig( filename=expanded_file_name, level=logging.INFO, @@ -88,7 +91,7 @@ def freezer_main(args={}): u'{0}'.format(backup_args.ionice), u'-c', u'1', u'-n', u'0', u'-t', u'-p', u'{0}'.format(PID) - ]) + ]) except Exception as priority_error: logging.warning('[*] Priority: {0}'.format(priority_error)) @@ -133,29 +136,33 @@ def freezer_main(args={}): swift_auth_version=identity_api_version, dry_run=backup_args.dry_run) - backup_args.__dict__['storage'] = swift.SwiftStorage( - client_manager, - backup_args.container, - backup_args.work_dir, + storage = swift.SwiftStorage( + client_manager, backup_args.container, backup_args.work_dir, backup_args.max_segment_size) backup_args.__dict__['client_manager'] = client_manager elif backup_args.storage == "local": - backup_args.__dict__['storage'] = \ - local.LocalStorage(backup_args.container) + storage = local.LocalStorage(backup_args.container, + backup_args.work_dir) elif backup_args.storage == "ssh": - if not (backup_args.ssh_key and backup_args.ssh_username and - backup_args.ssh_host): - raise Exception("Please provide ssh_key, " - "ssh_username and ssh_host") - backup_args.__dict__['storage'] = \ - ssh.SshStorage(backup_args.container, - backup_args.work_dir, - backup_args.ssh_key, - backup_args.ssh_username, - backup_args.ssh_host) + storage = ssh.SshStorage( + backup_args.container, backup_args.work_dir, + backup_args.ssh_key, backup_args.ssh_username, + backup_args.ssh_host) else: raise Exception("Not storage found for name " + backup_args.storage) + backup_args.__dict__['storage'] = storage + backup_args.__dict__['engine'] = tar_engine.TarBackupEngine( + backup_args.tar_path, + backup_args.compression, + backup_args.dereference_symlink, + backup_args.exclude, + storage, + winutils.is_windows(), + backup_args.openssl_path, + backup_args.encrypt_pass_file, + backup_args.dry_run) + freezer_job = job.create_job(backup_args) freezer_job.execute() diff --git a/freezer/ssh.py b/freezer/ssh.py index 303b3564..52031a81 100644 --- a/freezer/ssh.py +++ b/freezer/ssh.py @@ -18,23 +18,25 @@ This product includes cryptographic software written by Eric Young Hudson (tjh@cryptsoft.com). 
""" -import subprocess -import logging import os import stat +import logging + import paramiko - from freezer import storage +from freezer import utils class SshStorage(storage.Storage): """ - :type ssh: paramiko.SSHClient :type ftp: paramiko.SFTPClient """ + DEFAULT_CHUNK_SIZE = 10000000 + def __init__(self, storage_directory, work_dir, - ssh_key_path, remote_username, remote_ip): + ssh_key_path, remote_username, remote_ip, + chunk_size=DEFAULT_CHUNK_SIZE): """ :param storage_directory: directory of storage :type storage_directory: str @@ -45,14 +47,16 @@ class SshStorage(storage.Storage): self.remote_ip = remote_ip self.storage_directory = storage_directory self.work_dir = work_dir + self.chunk_size = chunk_size ssh = paramiko.SSHClient() # automatically add keys without requiring human intervention ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy()) ssh.connect(remote_ip, username=remote_username, key_filename=ssh_key_path) + # we should keep link to ssh to prevent garbage collection self.ssh = ssh - self.ftp = ssh.open_sftp() + self.ftp = self.ssh.open_sftp() def prepare(self): if not self.is_ready(): @@ -62,24 +66,30 @@ class SshStorage(storage.Storage): backup_names = self.ftp.listdir(self.storage_directory) backups = [] for backup_name in backup_names: - backup_dir = self.storage_directory + "/" + backup_name + backup_dir = utils.joined_path(self.storage_directory, backup_name) timestamps = self.ftp.listdir(backup_dir) for timestamp in timestamps: - increments = self.ftp.listdir(backup_dir + "/" + timestamp) - backups.extend(self._get_backups(increments)) + increments = self.ftp.listdir( + utils.joined_path(backup_dir, timestamp)) + backups.extend(storage.Backup.parse_backups(increments)) return backups def info(self): pass + def backup_dir(self, backup): + return "{0}{1}{2}".format(self._zero_backup_dir(backup), + os.sep, backup) + def _backup_dir(self, backup): """ :param backup: :type backup: freezer.storage.Backup :return: """ - return "{0}/{1}".format(self.storage_directory, - backup.hostname_backup_name) + return "{0}{1}{2}".format(self.storage_directory, + os.sep, + backup.hostname_backup_name) def _zero_backup_dir(self, backup): """ @@ -87,46 +97,9 @@ class SshStorage(storage.Storage): :type backup: freezer.storage.Backup :return: """ - return "{0}/{1}".format(self._backup_dir(backup), backup.timestamp) - - def backup(self, path, hostname_backup_name, tar_builder, - parent_backup=None, time_stamp=None): - """ - Backup path - storage_dir/backup_name/timestamp/backup_name_timestamps_level - :param path: - :param hostname_backup_name: - :param tar_builder: - :type tar_builder: freezer.tar.TarCommandBuilder - :param parent_backup: - :type parent_backup: freezer.storage.Backup - :return: - """ - new_backup = self._create_backup(hostname_backup_name, parent_backup, - time_stamp) - - if parent_backup: - zero_backup = self._zero_backup_dir(parent_backup.parent) - else: - zero_backup = self._zero_backup_dir(new_backup) - self.mkdir_p(zero_backup) - output_file = "{0}/{1}".format(zero_backup, new_backup.repr()) - tar_builder.set_output_file(output_file) - tar_builder.set_ssh(self.ssh_key_path, self.remote_username, - self.remote_ip) - tar_incremental = "{0}/{1}".format(self.work_dir, new_backup.tar()) - if parent_backup: - self.ftp.get(zero_backup + "/" + parent_backup.tar(), - tar_incremental) - - tar_builder.set_listed_incremental(tar_incremental) - - logging.info('[*] Changing current working directory to: {0}' - .format(path)) - logging.info('[*] Backup started for: 
{0}'.format(path)) - - subprocess.check_output(tar_builder.build(), shell=True) - self.ftp.put(tar_incremental, zero_backup + "/" + new_backup.tar()) + return "{0}{1}{2}".format(self._backup_dir(backup.full_backup), + os.sep, + backup.full_backup.timestamp) def _is_dir(self, check_dir): return stat.S_IFMT(self.ftp.stat(check_dir).st_mode) == stat.S_IFDIR @@ -166,6 +139,32 @@ class SshStorage(storage.Storage): self.ftp.chdir(basename) return True + def download_meta_file(self, backup): + """ + :type backup: freezer.storage.Backup + :param backup: + :return: + """ + utils.create_dir(self.work_dir) + if backup.level == 0: + return "{0}{1}{2}".format(self.work_dir, os.sep, backup.tar()) + meta_backup = backup.full_backup.increments[backup.level - 1] + zero_backup = self._zero_backup_dir(backup) + from_path = "{0}{1}{2}".format(zero_backup, os.sep, meta_backup.tar()) + to_path = "{0}{1}{2}".format(self.work_dir, os.sep, meta_backup.tar()) + if backup.level != 0: + if os.path.exists(to_path): + os.remove(to_path) + self.ftp.get(from_path, to_path) + return to_path + + def upload_meta_file(self, backup, meta_file): + zero_backup = self._zero_backup_dir(backup) + to_path = "{0}{1}{2}".format(zero_backup, os.sep, backup.tar()) + logging.info("Ssh storage uploading {0} to {1}".format(meta_file, + to_path)) + self.ftp.put(meta_file, to_path) + def remove_backup(self, backup): """ :type backup: freezer.storage.Backup @@ -173,19 +172,29 @@ class SshStorage(storage.Storage): """ self.rm(self._zero_backup_dir(backup)) - def restore(self, backup, path, tar_builder): + def backup_blocks(self, backup): + filename = self.backup_dir(backup) + with self.ftp.open(filename, mode='rb', + bufsize=self.chunk_size) as backup_file: + while True: + chunk = backup_file.read(self.chunk_size) + if chunk == '': + break + if len(chunk): + yield chunk + + def write_backup(self, rich_queue, backup): """ - :param backup: - :param path: - :param tar_builder: - :type tar_builder: freezer.tar.TarCommandRestoreBuilder - :return: + Upload object on the remote swift server + :type rich_queue: freezer.streaming.RichQueue + :type backup: SwiftBackup """ - zero_dir = self._zero_backup_dir(backup.parent) - for level in range(0, backup.level + 1): - c_backup = backup.parent.increments[level] - tar_builder.set_archive(zero_dir + "/" + c_backup.repr()) - tar_builder.set_ssh(self.ssh_key_path, - self.remote_username, - self.remote_ip) - subprocess.check_output(tar_builder.build(), shell=True) + filename = self.backup_dir(backup) + self.mkdir_p(self._zero_backup_dir(backup)) + logging.info("SSH write backup enter") + + with self.ftp.open(filename, mode='wb', + bufsize=self.chunk_size) as b_file: + logging.debug("SSH write backup getting chunk") + for message in rich_queue.get_messages(): + b_file.write(message) diff --git a/freezer/storage.py b/freezer/storage.py index 2349bb34..606f19c3 100644 --- a/freezer/storage.py +++ b/freezer/storage.py @@ -19,9 +19,10 @@ Hudson (tjh@cryptsoft.com). """ import re -import utils import logging +from freezer import utils + class Storage(object): """ @@ -29,9 +30,30 @@ class Storage(object): class. 
""" + def download_meta_file(self, backup): + raise NotImplementedError("Should have implemented this") + + def upload_meta_file(self, backup, meta_file): + raise NotImplementedError("Should have implemented this") + + def backup_blocks(self, backup): + raise NotImplementedError("Should have implemented this") + + def read_backup(self, rich_queue, backup): + """ + :param rich_queue: + :type rich_queue: freezer.streaming.RichQueue + :param backup: + :type backup: freezer.storage.Backup + :return: + """ + rich_queue.put_messages(self.backup_blocks(backup)) + + def write_backup(self, rich_queue, backup): + raise NotImplementedError("Should have implemented this") + def is_ready(self): """ - :rtype: bool :return: """ @@ -44,7 +66,7 @@ class Storage(object): """ raise NotImplementedError("Should have implemented this") - def find(self, hostname_backup_name): + def find_all(self, hostname_backup_name): """ Gets backups by backup_name and hostname :param hostname_backup_name: @@ -55,71 +77,32 @@ class Storage(object): return [b for b in self.get_backups() if b.hostname_backup_name == hostname_backup_name] - def get_backups(self): - raise NotImplementedError("Should have implemented this") - - def backup(self, path, hostname_backup_name, tar_builder, - parent_backup=None, time_stamp=None): - """ - Implements backup of path directory. - - :type path: str - :type hostname_backup_name: str - :type tar_builder: freezer.tar.TarCommandBuilder - :param parent_backup: Can be None. - Previous backup for incremental update. - :type parent_backup: Backup - """ - raise NotImplementedError("Should have implemented this") - - def restore(self, backup, path, tar_builder): - """ - - :param backup: - :param path: - :param tar_builder: - :type tar_builder: freezer.tar.TarCommandRestoreBuilder - :return: - """ - raise NotImplementedError("Should have implemented this") - - def restore_from_date(self, hostname_backup_name, path, tar_builder, - restore_timestamp): + def find_one(self, hostname_backup_name, recent_to_date=None): """ :param hostname_backup_name: :type hostname_backup_name: str - :param restore_timestamp: - :type restore_timestamp: int - :param path: - :type path: str - :param tar_builder: - :type tar_builder: freezer.tar.TarCommandRestoreBuilder + :param recent_to_date: + :type recent_to_date: int + :rtype: Backup :return: """ - - backups = self.find(hostname_backup_name) + backups = self.find_all(hostname_backup_name) + if recent_to_date: + backups = [b for b in backups + if b.timestamp <= recent_to_date] + err_msg = '[*] No matching backup name "{0}" found'\ + .format(hostname_backup_name) if not backups: - raise Exception("[*] No backups found") - backups = [b for b in backups - if b.timestamp <= restore_timestamp] - if not backups: - raise ValueError('No matching backup name {0} found' - .format(hostname_backup_name)) + raise IndexError(err_msg) backup = max(backups, key=lambda b: b.timestamp) last_increments = backup.increments.values() - last_increments = [x for x in last_increments - if x.timestamp <= restore_timestamp] - last_increment = max(last_increments, key=lambda x: x.timestamp) - self.restore(last_increment, path, tar_builder) + if recent_to_date: + last_increments = [x for x in last_increments + if x.timestamp <= recent_to_date] + return max(last_increments, key=lambda x: x.timestamp) - def restore_latest(self, hostname_backup_name, path, tar_builder): - backups = self.find(hostname_backup_name) - if not backups: - raise ValueError('No matching backup name {0} found' - 
.format(hostname_backup_name)) - backup = max(backups, key=lambda b: b.latest_update.timestamp)\ - .latest_update - self.restore(backup, path, tar_builder) + def get_backups(self): + raise NotImplementedError("Should have implemented this") def remove_backup(self, backup): """ @@ -135,7 +118,7 @@ class Storage(object): :type remove_older_timestamp: int :type hostname_backup_name: str """ - backups = self.find(hostname_backup_name) + backups = self.find_all(hostname_backup_name) backups = [b for b in backups if b.latest_update.timestamp < remove_older_timestamp] for b in backups: @@ -144,67 +127,21 @@ class Storage(object): def info(self): raise NotImplementedError("Should have implemented this") - @staticmethod - def _create_backup(name, backup=None, time_stamp=None): - """ - :param name: - :type name: str - :param backup: - :type backup: Backup - :rtype: Backup - :return: - """ - return Backup(name, time_stamp or utils.DateTime.now().timestamp, - backup.latest_update.level + 1 if backup else 0) - - @staticmethod - def _get_backups(names): - """ - No side effect version of get_backups - :param names: - :type names: list[str] - file names of backups. - File name should be something like that host_backup_timestamp_level - :rtype: list[Backup] - :return: list of zero level backups - """ - prefix = 'tar_metadata_' - tar_names = set([x[len(prefix):] - for x in names if x.startswith(prefix)]) - backup_names = [x for x in names if not x.startswith(prefix)] - backups = [] - """:type: list[Backup]""" - for name in backup_names: - try: - backup = Backup.parse(name) - backup.tar_meta = name in tar_names - backups.append(backup) - except Exception as e: - logging.exception(e) - logging.error("cannot parse swift backup name: {0}" - .format(name)) - backups.sort(key=lambda x: (x.timestamp, x.level)) - zero_backups = [] - last_backup = None - - """:type last_backup: freezer.storage.Backup""" - for backup in backups: - if backup.level == 0: - zero_backups.append(backup) - last_backup = backup - else: - if last_backup: - last_backup.add_increment(backup) - else: - logging.error("Incremental backup without parent: {0}" - .format(backup.repr())) - - return zero_backups - - def find_previous_backup(self, hostname_backup_name, no_incremental, - max_level, always_level, restart_always_level): - backups = self.find(hostname_backup_name) - return self._find_previous_backup(backups, no_incremental, max_level, - always_level, restart_always_level) + def create_backup(self, hostname_backup_name, no_incremental, + max_level, always_level, restart_always_level, + time_stamp=None): + backups = self.find_all(hostname_backup_name) + prev_backup = self._find_previous_backup( + backups, no_incremental, max_level, always_level, + restart_always_level) + if prev_backup: + return Backup( + hostname_backup_name, + time_stamp or utils.DateTime.now().timestamp, + prev_backup.level + 1, prev_backup.full_backup) + else: + return Backup(hostname_backup_name, + time_stamp or utils.DateTime.now().timestamp) @staticmethod def _find_previous_backup(backups, no_incremental, max_level, always_level, @@ -257,38 +194,38 @@ class Backup: """ PATTERN = r'(.*)_(\d+)_(\d+?)$' - def __init__(self, hostname_backup_name, timestamp, level, tar_meta=False): + def __init__(self, hostname_backup_name, timestamp, level=0, + full_backup=None, tar_meta=False): """ - :param hostname_backup_name: name (hostname_backup_name) of backup :type hostname_backup_name: str :param timestamp: timestamp of backup (when it was executed) :type timestamp: int - :param 
level: level of backup (freezer supports incremental backup)
-            Completed full backup has level 0 and can be restored without any
-            additional information.
-            Levels 1, 2, ... means that our backup is incremental and contains
-            only smart portion of information (that was actually changed
-            since the last backup)
+        :param full_backup: Previous full backup - not an increment
+        :type full_backup: Backup
+        :param level: current incremental level of backup
         :type level: int
         :param tar_meta: Is backup has or has not an attached meta
         tar file in storage. Default = False
         :type tar_meta: bool
         :return:
         """
-        if not isinstance(level, int):
-            raise ValueError("Level should have type int")
+        if level == 0 and full_backup:
+            raise ValueError("A level 0 backup can not have a parent "
+                             "full backup")
         self.hostname_backup_name = hostname_backup_name
         self._timestamp = timestamp
         self.tar_meta = tar_meta
-        self._level = level
         self._increments = {0: self}
         self._latest_update = self
-        self._parent = self
+        self._level = level
+        if not full_backup:
+            self._full_backup = self
+        else:
+            self._full_backup = full_backup

     @property
-    def parent(self):
-        return self._parent
+    def full_backup(self):
+        return self._full_backup

     @property
     def timestamp(self):
@@ -307,7 +244,7 @@
         return self._latest_update

     def tar(self):
-        return "tar_metadata_{0}".format(self.repr())
+        return "tar_metadata_{0}".format(self)

     def add_increment(self, increment):
         """
@@ -320,7 +257,6 @@
             raise ValueError("Can not add increment to increment")
         if increment.level == 0:
             raise ValueError("Can not add increment with level 0")
-        increment._parent = self
         if (increment.level not in self._increments or
                 increment.timestamp >
                 self._increments[increment.level].timestamp):
@@ -328,12 +264,59 @@
             if self.latest_update.level <= increment.level:
                 self._latest_update = increment

-    def repr(self):
+    def __repr__(self):
         return '_'.join([self.hostname_backup_name,
                          repr(self._timestamp),
                          repr(self._level)])

+    def __str__(self):
+        return self.__repr__()
+
     @staticmethod
-    def parse(value):
+    def parse_backups(names):
+        """
+        Side-effect free version of get_backups
+        :param names:
+        :type names: list[str] - file names of backups.
+        File names should look like host_backup_timestamp_level
+        :rtype: list[Backup]
+        :return: list of zero level backups
+        """
+        prefix = 'tar_metadata_'
+        tar_names = set([x[len(prefix):]
+                         for x in names if x.startswith(prefix)])
+        backup_names = [x for x in names if not x.startswith(prefix)]
+        backups = []
+        """:type: list[freezer.storage.BackupRepr]"""
+        for name in backup_names:
+            try:
+                backup = Backup._parse(name)
+                backup.tar_meta = name in tar_names
+                backups.append(backup)
+            except Exception as e:
+                logging.exception(e)
+                logging.error("cannot parse backup name: {0}"
+                              .format(name))
+        backups.sort(key=lambda x: (x.timestamp, x.level))
+        zero_backups = []
+        """:type: list[freezer.storage.Backup]"""
+        last_backup = None
+
+        """:type last_backup: freezer.storage.Backup"""
+        for backup in backups:
+            if backup.level == 0:
+                last_backup = backup.backup()
+                zero_backups.append(last_backup)
+            else:
+                if last_backup:
+                    last_backup.add_increment(backup.backup(last_backup))
+                else:
+                    logging.error("Incremental backup without parent: {0}"
+                                  .format(backup))
+
+        return zero_backups
+
+    @staticmethod
+    def _parse(value):
         """
         :param value: String representation of backup
         :type value: str
@@ -342,7 +325,8 @@
         match = re.search(Backup.PATTERN, value, re.I)
         if not match:
             raise ValueError("Cannot parse backup from string: " + value)
-        return Backup(match.group(1), int(match.group(2)), int(match.group(3)))
+        return BackupRepr(match.group(1), int(match.group(2)),
+                          int(match.group(3)))

     def __eq__(self, other):
         if self is other:
@@ -353,3 +337,37 @@
             self.tar_meta == other.tar_meta and \
             self._level == other.level and \
             len(self.increments) == len(other.increments)
+
+
+class BackupRepr:
+    """
+    Intermediate object used during parsing - it represents a parsed backup
+    name. The difference between Backup and BackupRepr: a BackupRepr can be
+    parsed from a string and doesn't require information about full_backup
+    """
+    def __init__(self, hostname_backup_name, timestamp, level, tar_meta=False):
+        """
+
+        :param hostname_backup_name:
+        :type hostname_backup_name: str
+        :param timestamp:
+        :type timestamp: int
+        :param level:
+        :type level: int
+        :param tar_meta:
+        :type tar_meta: bool
+        :return:
+        """
+        self.hostname_backup_name = hostname_backup_name
+        self.timestamp = timestamp
+        self.level = level
+        self.tar_meta = tar_meta
+
+    def repr(self):
+        return "_".join([self.hostname_backup_name, str(self.timestamp),
+                         str(self.level)])
+
+    def backup(self, full_backup=None):
+        return Backup(self.hostname_backup_name, self.timestamp,
+                      level=self.level, full_backup=full_backup,
+                      tar_meta=self.tar_meta)
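Backup object names encode the hostname/backup-name pair, the timestamp and the level, and parse_backups() groups increments under their level 0 parent while flagging the ones that have a tar_metadata_ companion. A short illustration with hypothetical object names::

    from freezer import storage

    names = ["myhost_data_1417624466_0",
             "myhost_data_1417624477_1",
             "tar_metadata_myhost_data_1417624466_0"]
    backups = storage.Backup.parse_backups(names)
    # one zero-level backup; its increments dict holds both levels
    full = backups[0]
    assert full.level == 0 and full.tar_meta
    assert full.latest_update.level == 1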
diff --git a/freezer/streaming.py b/freezer/streaming.py
new file mode 100644
index 00000000..159ddf85
--- /dev/null
+++ b/freezer/streaming.py
@@ -0,0 +1,176 @@
+"""
+(c) Copyright 2014,2015 Hewlett-Packard Development Company, L.P.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+This product includes cryptographic software written by Eric Young
+(eay@cryptsoft.com).
This product includes software written by Tim +Hudson (tjh@cryptsoft.com). +======================================================================== + +Freezer general utils functions +""" +import threading +import logging +import Queue + + +class Wait(Exception): + pass + + +class StorageManager: + + def __init__(self, input_queue, output_queues): + """ + :type input_queue: streaming.RichQueue + :param input_queue: + :type output_queues: collections.Iterable[streaming.RichQueue] + :param output_queues: + :return: + """ + self.input_queue = input_queue + self.output_queues = output_queues + self.broken_output_queues = set() + + def send_message(self, message, finish=False): + for output_queue in self.output_queues: + if output_queue not in self.broken_output_queues: + try: + if finish: + output_queue.finish() + else: + output_queue.put(message) + except Exception as e: + logging.exception(e) + StorageManager.one_fails_all_fail( + self.input_queue, self.output_queues) + self.broken_output_queues.add(output_queue) + + def transmit(self): + for message in self.input_queue.get_messages(): + self.send_message(message) + self.send_message("", True) + + @staticmethod + def one_fails_all_fail(input_queue, output_queues): + input_queue.force_stop() + for output_queue in output_queues: + output_queue.force_stop() + raise Exception("All fail") + + +class RichQueue: + """ + :type data_queue: Queue.Queue + """ + def __init__(self, size=2): + """ + :type size: int + :return: + """ + self.data_queue = Queue.Queue(maxsize=size) + # transmission changes in atomic way so no synchronization needed + self.finish_transmission = False + self.is_force_stop = False + + def finish(self): + self.finish_transmission = True + + def force_stop(self): + self.is_force_stop = True + + def empty(self): + return self.data_queue.empty() + + def get(self): + try: + res = self.data_queue.get(timeout=1) + self.data_queue.task_done() + return res + except Queue.Empty: + raise Wait() + + def check_stop(self): + if self.is_force_stop: + raise Exception("Forced stop") + + def put_messages(self, messages): + for message in messages: + self.put(message) + self.finish() + + def has_more(self): + self.check_stop() + return not self.finish_transmission or not self.data_queue.empty() + + def put(self, message): + while True: + try: + self.data_queue.put(message, timeout=1) + break + except Queue.Full: + self.check_stop() + + def get_messages(self): + while self.has_more(): + try: + yield self.get() + except Wait: + self.check_stop() + + +class QueuedThread(threading.Thread): + def __init__(self, target, rich_queue, args=(), kwargs=None): + """ + :type args: collections.Iterable + :type kwargs: dict + :type target: () -> () + :type rich_queue: RichQueue + """ + self.args = args + kwargs = kwargs or {} + self.rich_queue = rich_queue + kwargs["rich_queue"] = rich_queue + super(QueuedThread, self).__init__(target=target, args=args, + kwargs=kwargs) + + def run(self): + try: + super(QueuedThread, self).run() + except Exception as e: + logging.exception(e) + self.rich_queue.force_stop() + + +def stream(read_function, read_function_kwargs, + write_function, write_function_kwargs, queue_size=10): + """ + :param queue_size: + :type queue_size: int + :return: + """ + input_queue = RichQueue(queue_size) + read_stream = QueuedThread(read_function, input_queue, + kwargs=read_function_kwargs) + output_queue = RichQueue(queue_size) + write_stream = QueuedThread(write_function, output_queue, + kwargs=write_function_kwargs) + read_stream.daemon = True + 
write_stream.daemon = True + read_stream.start() + write_stream.start() + manager = StorageManager(input_queue, [output_queue]) + manager.transmit() + read_stream.join() + write_stream.join() diff --git a/freezer/swift.py b/freezer/swift.py index af2598d9..b46cf0af 100644 --- a/freezer/swift.py +++ b/freezer/swift.py @@ -18,15 +18,12 @@ This product includes cryptographic software written by Eric Young Hudson (tjh@cryptsoft.com). """ -from copy import deepcopy -import multiprocessing - -from freezer import utils -from freezer import tar import json import time import logging import os + +from freezer import utils from freezer import storage @@ -35,7 +32,10 @@ class SwiftStorage(storage.Storage): :type client_manager: freezer.osclients.ClientManager """ - def __init__(self, client_manager, container, work_dir, max_segment_size): + RESP_CHUNK_SIZE = 10000000 + + def __init__(self, client_manager, container, work_dir, max_segment_size, + chunk_size=RESP_CHUNK_SIZE): """ :type client_manager: freezer.osclients.ClientManager :type container: str @@ -45,6 +45,7 @@ class SwiftStorage(storage.Storage): self.segments = u'{0}_segments'.format(container) self.work_dir = work_dir self.max_segment_size = max_segment_size + self.chunk_size = chunk_size def swift(self): """ @@ -85,87 +86,37 @@ class SwiftStorage(storage.Storage): .format(error)) raise Exception("cannot add object to storage") - def upload_manifest(self, name, headers=None): + def upload_manifest(self, backup): """ Upload Manifest to manage segments in Swift - :param name: Name of manifest file - :type name: str + :param backup: Backup + :type backup: freezer.storage.Backup """ - self.client_manager.create_swift() - headers = deepcopy(headers) or dict() - headers['x-object-manifest'] = u'{0}/{1}'.format(self.segments, - name.strip()) - logging.info('[*] Uploading Swift Manifest: {0}'.format(name)) - self.swift().put_object(container=self.container, obj=name, + headers = {'x-object-manifest': + u'{0}/{1}'.format(self.segments, backup)} + logging.info('[*] Uploading Swift Manifest: {0}'.format(backup)) + self.swift().put_object(container=self.container, obj=str(backup), contents=u'', headers=headers) logging.info('[*] Manifest successfully uploaded!') + def upload_meta_file(self, backup, meta_file): + # Upload swift manifest for segments + # Request a new auth client in case the current token + # is expired before uploading tar meta data or the swift manifest + self.client_manager.create_swift() + + # Upload tar incremental meta data file and remove it + logging.info('[*] Uploading tar meta data file: {0}'.format( + backup.tar())) + with open(meta_file, 'r') as meta_fd: + self.swift().put_object( + self.container, backup.tar(), meta_fd) + def is_ready(self): return self.check_container_existence()[0] - def restore(self, backup, path, tar_builder): - """ - Restore data from swift server to your local node. Data will be - restored in the directory specified in - backup_opt_dict.restore_abs_path. The - object specified with the --get-object option will be downloaded from - the Swift server and will be downloaded inside the parent directory of - backup_opt_dict.restore_abs_path. If the object was compressed during - backup time, then it is decrypted, decompressed and de-archived to - backup_opt_dict.restore_abs_path. Before download the file, the size of - the local volume/disk/partition will be computed. If there is enough - space - the full restore will be executed. 
Please remember to stop any service - that require access to the data before to start the restore execution - and to start the service at the end of the restore execution - - Take options dict as argument and sort/remove duplicate elements from - backup_opt_dict.remote_match_backup and find the closes backup to the - provided from backup_opt_dict.restore_from_date. Once the objects are - looped backwards and the level 0 backup is found, along with the other - level 1,2,n, is download the object from swift and untar them locally - starting from level 0 to level N. - :type tar_builder: freezer.tar.TarCommandRestoreBuilder - """ - - for level in range(0, backup.level + 1): - self._restore(backup.parent.increments[level], path, tar_builder) - - def _restore(self, backup, path, tar_builder): - """ - :type backup: freezer.storage.Backup - :param backup: - :type path: str - :type tar_builder: freezer.tar.TarCommandRestoreBuilder - :return: - """ - write_pipe, read_pipe = multiprocessing.Pipe() - process_stream = multiprocessing.Process( - target=self.object_to_stream, - args=(write_pipe, read_pipe, backup.repr(),)) - process_stream.daemon = True - process_stream.start() - - write_pipe.close() - # Start the tar pipe consumer process - tar_stream = multiprocessing.Process( - target=tar.tar_restore, args=(path, tar_builder.build(), - read_pipe)) - tar_stream.daemon = True - tar_stream.start() - read_pipe.close() - process_stream.join() - tar_stream.join() - - if tar_stream.exitcode: - raise Exception('failed to restore file') - - logging.info( - '[*] Restore execution successfully executed \ - for backup name {0}'.format(backup.repr())) - def prepare(self): containers = self.check_container_existence() if not containers[0]: @@ -184,21 +135,6 @@ class SwiftStorage(storage.Storage): return (self.container in containers_list, self.segments in containers_list) - def add_object(self, backup_queue, current_backup): - """ - Upload object on the remote swift server - :type current_backup: SwiftBackup - """ - file_chunk_index, file_chunk = backup_queue.get().popitem() - while file_chunk_index or file_chunk: - segment_package_name = u'{0}/{1}/{2}/{3}'.format( - current_backup.repr(), current_backup.timestamp, - self.max_segment_size, file_chunk_index) - self.upload_chunk(file_chunk, segment_package_name) - file_chunk_index, file_chunk = backup_queue.get().popitem() - - RESP_CHUNK_SIZE = 65536 - def info(self): ordered_container = {} containers = self.swift().get_account()[1] @@ -225,7 +161,7 @@ class SwiftStorage(storage.Storage): # remove segment for segment in self.swift().get_container( self.segments, - prefix=backup.increments[i].repr())[1]: + prefix=backup.increments[i])[1]: self.swift().delete_object(self.segments, segment['name']) # remove tar @@ -237,7 +173,7 @@ class SwiftStorage(storage.Storage): # remove manifest for segment in self.swift().get_container( self.container, - prefix=backup.increments[i].repr())[1]: + prefix=backup.increments[i])[1]: self.swift().delete_object(self.container, segment['name']) def add_stream(self, stream, package_name, headers=None): @@ -255,31 +191,6 @@ class SwiftStorage(storage.Storage): self.swift().put_object(self.container, package_name, "", headers=headers) - def object_to_stream(self, write_pipe, read_pipe, obj_name): - """ - Take a payload downloaded from Swift - and generate a stream to be consumed from other processes - """ - logging.info('[*] Downloading data stream...') - - # Close the read pipe in this child as it is unneeded - # and download the objects 
from swift in chunks. The - # Chunk size is set by RESP_CHUNK_SIZE and sent to che write - # pipe - read_pipe.close() - for obj_chunk in self.swift().get_object( - self.container, obj_name, - resp_chunk_size=self.RESP_CHUNK_SIZE)[1]: - write_pipe.send_bytes(obj_chunk) - - # Closing the pipe after checking no data - # is still available in the pipe. - while True: - if not write_pipe.poll(): - write_pipe.close() - break - time.sleep(1) - def get_backups(self): """ :rtype: list[SwiftBackup] @@ -288,35 +199,31 @@ class SwiftStorage(storage.Storage): try: files = self.swift().get_container(self.container)[1] names = [x['name'] for x in files if 'name' in x] - return self._get_backups(names) + return storage.Backup.parse_backups(names) except Exception as error: raise Exception('[*] Error: get_object_list: {0}'.format(error)) - def get_last_backup(self, hostname_backup_name): - """ - - :param hostname_backup_name: - :return: last backup or throws exception - :rtype: freezer.swift.backup.SwiftBackup - """ - return max(self.find(hostname_backup_name), key=lambda b: b.timestamp) - - def _download_tar_meta(self, backup): + def download_meta_file(self, backup): """ Downloads meta_data to work_dir of previous backup. :param backup: A backup or increment. Current backup is incremental, that means we should download tar_meta for detection new files and changes. If backup.tar_meta is false, raise Exception - :type backup: SwiftBackup + :type backup: freezer.storage.Backup :return: """ - if not backup.tar_meta: + utils.create_dir(self.work_dir) + if backup.level == 0: + return "{0}{1}{2}".format(self.work_dir, os.sep, backup.tar()) + + meta_backup = backup.full_backup.increments[backup.level - 1] + + if not meta_backup.tar_meta: raise ValueError('Latest update have no tar_meta') - utils.create_dir(self.work_dir) - tar_meta = backup.tar() - tar_meta_abs = "{0}/{1}".format(self.work_dir, tar_meta) + tar_meta = meta_backup.tar() + tar_meta_abs = "{0}{1}{2}".format(self.work_dir, os.sep, tar_meta) logging.info('[*] Downloading object {0} {1}'.format( tar_meta, tar_meta_abs)) @@ -326,82 +233,25 @@ class SwiftStorage(storage.Storage): with open(tar_meta_abs, 'ab') as obj_fd: iterator = self.swift().get_object( - self.container, tar_meta, resp_chunk_size=16000000)[1] + self.container, tar_meta, resp_chunk_size=self.chunk_size)[1] for obj_chunk in iterator: obj_fd.write(obj_chunk) + return tar_meta_abs - def _execute_tar_and_upload(self, path_to_backup, current_backup, - tar_command): + def backup_blocks(self, backup): + for chunk in self.swift().get_object( + self.container, backup, resp_chunk_size=self.chunk_size)[1]: + yield chunk + + def write_backup(self, rich_queue, backup): """ - - :param path_to_backup: - :type path_to_backup: str - :param current_backup: - :type current_backup: freezer.storage.Backup - :param tar_command: - :type tar_command: str - :return: + Upload object on the remote swift server + :type rich_queue: freezer.streaming.RichQueue + :type backup: SwiftBackup """ - # Initialize a Queue for a maximum of 2 items - tar_backup_queue = multiprocessing.Queue(maxsize=2) - - logging.info('[*] Changing current working directory to: {0} \ - '.format(path_to_backup)) - logging.info('[*] Backup started for: {0}'.format(path_to_backup)) - - tar_backup_stream = multiprocessing.Process( - target=tar.tar_backup, args=(path_to_backup, - self.max_segment_size, - tar_command, - tar_backup_queue)) - - tar_backup_stream.daemon = True - tar_backup_stream.start() - - add_object_stream = multiprocessing.Process( 
- target=self.add_object, args=(tar_backup_queue, current_backup)) - add_object_stream.daemon = True - add_object_stream.start() - - tar_backup_stream.join() - tar_backup_queue.put(({False: False})) - tar_backup_queue.close() - add_object_stream.join() - - if add_object_stream.exitcode: - raise Exception('failed to upload object to swift server') - - def _upload_tar_meta(self, new_backup, old_backup): - meta_data_abs_path = os.path.join(self.work_dir, old_backup.tar()) - - # Upload swift manifest for segments - # Request a new auth client in case the current token - # is expired before uploading tar meta data or the swift manifest - self.client_manager.create_swift() - - # Upload tar incremental meta data file and remove it - logging.info('[*] Uploading tar meta data file: {0}'.format( - new_backup.tar())) - with open(meta_data_abs_path, 'r') as meta_fd: - self.swift().put_object( - self.container, new_backup.tar(), meta_fd) - # Removing tar meta data file, so we have only one - # authoritative version on swift - logging.info('[*] Removing tar meta data file: {0}'.format( - meta_data_abs_path)) - os.remove(meta_data_abs_path) - - def backup(self, path, hostname_backup_name, tar_builder, - parent_backup=None, time_stamp=None): - new_backup = self._create_backup(hostname_backup_name, parent_backup, - time_stamp) - - if parent_backup: - self._download_tar_meta(parent_backup) - tar_builder.set_listed_incremental( - "{0}/{1}".format(self.work_dir, - (parent_backup or new_backup).tar())) - - self._execute_tar_and_upload(path, new_backup, tar_builder.build()) - self._upload_tar_meta(new_backup, parent_backup or new_backup) - self.upload_manifest(new_backup.repr()) + for block_index, message in enumerate(rich_queue.get_messages()): + segment_package_name = u'{0}/{1}/{2}/{3}'.format( + backup, backup.timestamp, + self.max_segment_size, "%08d" % block_index) + self.upload_chunk(message, segment_package_name) + self.upload_manifest(backup) diff --git a/freezer/tar.py b/freezer/tar.py index ccce2866..4bd173c8 100644 --- a/freezer/tar.py +++ b/freezer/tar.py @@ -21,57 +21,30 @@ Hudson (tjh@cryptsoft.com). Freezer Tar related functions """ -from freezer import winutils -import os -import logging -import subprocess -import sys - - -class SshCommandBuilder(object): - - @staticmethod - def ssh_command(ssh_key, ssh_user, ssh_ip, command): - """ - Use no compression because the data is already compressed. - To prevent asking to add a key, two more options are provided: - UserKnownHostsFile and StrictHostKeyChecking - - returns something like - ssh -o Compression=no -o StrictHostKeyChecking=no -o - UserKnownHostsFile=/dev/null -i mytestpair.pem ubuntu@15.126.199.52 - "cat > file.tar.gz" - """ - devnull = "/dev/null" - if winutils.is_windows(): - devnull = "NUL" - - return ('ssh -o Compression=no -o StrictHostKeyChecking=no -o ' - 'UserKnownHostsFile={0} -i {1} {2}@{3} "{4}"'.format( - devnull, ssh_key, ssh_user, ssh_ip, command)) - class TarCommandBuilder: """ Building a tar cmd command. To build command invoke method build. 
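
    For illustration, a minimal usage sketch (the tar path, meta path and
    exclude pattern here are hypothetical, not part of this patch; the
    trailing '.' is the filepath argument given to the constructor):

        builder = TarCommandBuilder('tar', '.', 'gzip', is_windows=False)
        builder.set_listed_incremental('/tmp/backup_meta.tar')
        builder.set_exclude('*.tmp')
        builder.build()
        # -> 'tar --create -z --warning=none --no-check-device
        #     --one-file-system --preserve-permissions --same-owner
        #     --seek --ignore-failed-read
        #     --listed-incremental=/tmp/backup_meta.tar
        #     --exclude="*.tmp" .'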
""" - COMMAND_TEMPLATE = ( + UNIX_TEMPLATE = ( "{gnutar_path} --create {algo} --warning=none --no-check-device " "--one-file-system --preserve-permissions --same-owner " "--seek --ignore-failed-read") + # local windows template + WINDOWS_TEMPLATE = ( + "{gnutar_path} -c {algo} --incremental --unlink-first " + "--ignore-zeros --force-local") + LISTED_TEMPLATE = "{tar_command} --listed-incremental={listed_incremental}" DEREFERENCE_MODE = {'soft': '--dereference', 'hard': '--hard-dereference', - 'all': '--hard-dereference --dereference', - 'none': ''} + 'all': '--hard-dereference --dereference'} - def __init__(self, gnutar_path, filepath, compression_algo): - self.dereference = 'none' + def __init__(self, gnutar_path, filepath, compression_algo, is_windows): self.gnutar_path = gnutar_path - self.exclude = None self.dereference = '' self.listed_incremental = None self.exclude = '' @@ -79,13 +52,8 @@ class TarCommandBuilder: self.encrypt_pass_file = None self.output_file = None self.filepath = filepath - self.ssh_key = None - self.ssh_user = None - self.ssh_ip = None self.compression_algo = get_tar_flag_from_algo(compression_algo) - - def set_output_file(self, output_file): - self.output_file = output_file + self.is_windows = is_windows def set_listed_incremental(self, absolute_path): self.listed_incremental = absolute_path @@ -93,11 +61,6 @@ class TarCommandBuilder: def set_exclude(self, exclude): self.exclude = exclude - def set_ssh(self, key_path, remote_user, remote_ip): - self.ssh_key = key_path - self.ssh_ip = remote_ip - self.ssh_user = remote_user - def set_dereference(self, mode): """ Dereference hard and soft links according option choices. @@ -112,34 +75,22 @@ class TarCommandBuilder: self.openssl_path = openssl_path self.encrypt_pass_file = encrypt_pass_file - def _create_ssh_command(self): - """ - returns something like that: - ssh -o Compression=no -i mytestpair.pem ubuntu@15.126.199.52 - "cat > file.tar.gz" - """ - return SshCommandBuilder.ssh_command( - self.ssh_key, - self.ssh_user, - self.ssh_ip, - "cat > {0}".format(self.output_file)) - def build(self): - tar_command = self.COMMAND_TEMPLATE.format( - gnutar_path=self.gnutar_path, algo=self.compression_algo, - dereference=self.dereference) + if self.is_windows: + tar_command = self.WINDOWS_TEMPLATE.format( + gnutar_path=self.gnutar_path, algo=self.compression_algo) + else: + tar_command = self.UNIX_TEMPLATE.format( + gnutar_path=self.gnutar_path, algo=self.compression_algo) + if self.dereference: - "{0} {1}".format(tar_command, self.dereference) + tar_command = "{0} {1}".format(tar_command, self.dereference) if self.listed_incremental: tar_command = self.LISTED_TEMPLATE.format( tar_command=tar_command, listed_incremental=self.listed_incremental) - if self.output_file and not self.ssh_key: - tar_command = "{0} --file={1}".format(tar_command, - self.output_file) - if self.exclude: tar_command = '{tar_command} --exclude="{exclude}"'.format( tar_command=tar_command, exclude=self.exclude) @@ -152,66 +103,46 @@ class TarCommandBuilder: file=self.encrypt_pass_file) tar_command = '{0} | {1}'.format(tar_command, openssl_cmd) - if self.ssh_key: - ssh_command = self._create_ssh_command() - tar_command = '{0} | {1}'.format(tar_command, ssh_command) - return tar_command class TarCommandRestoreBuilder: WINDOWS_TEMPLATE = '{0} -x {1} --incremental --unlink-first ' \ - '--ignore-zeros -f - ' + '--ignore-zeros -force-local' DRY_RUN_TEMPLATE = '{0} {1} --incremental --list ' \ '--ignore-zeros --warning=none' - NORMAL_TEMPLATE = '{0} {1} 
--incremental --extract --unlink-first ' \ + UNIX_TEMPLATE = '{0} {1} --incremental --extract --unlink-first ' \ '--ignore-zeros --warning=none --overwrite --directory {2}' - def __init__(self, tar_path, restore_path, compression_algo): + def __init__(self, tar_path, restore_path, compression_algo, is_windows): self.dry_run = False self.is_windows = False self.openssl_path = None self.encrypt_pass_file = None self.tar_path = tar_path self.restore_path = restore_path - self.archive = None - self.ssh_key = None - self.ssh_user = None - self.ssh_ip = None self.compression_algo = get_tar_flag_from_algo(compression_algo) + self.is_windows = is_windows def set_dry_run(self): self.dry_run = True - def set_windows(self): - self.is_windows = True - def set_encryption(self, openssl_path, encrypt_pass_file): self.openssl_path = openssl_path self.encrypt_pass_file = encrypt_pass_file - def set_archive(self, archive): - self.archive = archive - - def set_ssh(self, key_path, remote_user, remote_ip): - self.ssh_key = key_path - self.ssh_ip = remote_ip - self.ssh_user = remote_user - def build(self): - if self.is_windows: - tar_command = self.NORMAL_TEMPLATE.format(self.tar_path, - self.compression_algo) - elif self.dry_run: + if self.dry_run: tar_command = self.DRY_RUN_TEMPLATE.format(self.tar_path, self.compression_algo) + elif self.is_windows: + tar_command = self.WINDOWS_TEMPLATE.format(self.tar_path, + self.compression_algo) else: - tar_command = self.NORMAL_TEMPLATE.format(self.tar_path, - self.compression_algo, - self.restore_path) + tar_command = self.UNIX_TEMPLATE.format(self.tar_path, + self.compression_algo, + self.restore_path) - if self.archive and not self.ssh_key: - tar_command = tar_command + " --file " + self.archive # Check if encryption file is provided and set the openssl decrypt # command accordingly if self.encrypt_pass_file: @@ -219,79 +150,9 @@ class TarCommandRestoreBuilder: .format(openssl_path=self.openssl_path, file=self.encrypt_pass_file) tar_command = '{0} | {1} '.format(openssl_cmd, tar_command) - if self.ssh_key: - ssh_command = SshCommandBuilder.ssh_command( - self.ssh_key, - self.ssh_user, - self.ssh_ip, - "cat < {0}".format(self.archive)) - tar_command = '{0} | {1} '.format(ssh_command, tar_command) return tar_command -def tar_restore(restore_abs_path, tar_command, read_pipe): - """ - Restore the provided file into backup_opt_dict.restore_abs_path - Decrypt the file if backup_opt_dict.encrypt_pass_file key is provided - """ - - if winutils.is_windows(): - # on windows, chdir to restore path. - os.chdir(restore_abs_path) - - tar_cmd_proc = subprocess.Popen( - tar_command, stdin=subprocess.PIPE, stdout=subprocess.PIPE, - stderr=subprocess.PIPE, shell=True) - # Start loop reading the pipe and pass the data to the tar std input. - # If EOFError exception is raised, the loop end the std err will be - # checked for errors. - try: - while True: - tar_cmd_proc.stdin.write(read_pipe.recv_bytes()) - except EOFError: - logging.info( - '[*] Pipe closed as EOF reached. 
Data transmitted succesfully') - - tar_err = tar_cmd_proc.communicate()[1] - - if 'error' in tar_err.lower(): - logging.exception('[*] Restore error: {0}'.format(tar_err)) - sys.exit(1) - - -def tar_backup(path_to_backup, max_segment_size, tar_command, backup_queue): - """ - Execute an incremental backup using tar options, specified as - function arguments - """ - # Set counters, index, limits and bufsize for subprocess - file_read_limit = 0 - file_chunk_index = 00000000 - tar_chunk = b'' - logging.info( - '[*] Archiving and compressing files from {0}'.format(path_to_backup)) - - tar_process = subprocess.Popen( - tar_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, - shell=True) - - # Iterate over tar process stdout - for file_block in tar_process.stdout: - tar_chunk += file_block - file_read_limit += len(file_block) - if file_read_limit >= max_segment_size: - backup_queue.put( - ({("%08d" % file_chunk_index): tar_chunk})) - file_chunk_index += 1 - tar_chunk = b'' - file_read_limit = 0 - - # Upload segments smaller then opt_dict.max_segment_size - if len(tar_chunk) < max_segment_size: - backup_queue.put( - ({("%08d" % file_chunk_index): tar_chunk})) - - def get_tar_flag_from_algo(compression): algo = { 'gzip': '-z', diff --git a/freezer/utils.py b/freezer/utils.py index b3294376..86b70d54 100644 --- a/freezer/utils.py +++ b/freezer/utils.py @@ -27,6 +27,7 @@ import time import datetime import re import subprocess +import errno class OpenstackOptions: @@ -88,11 +89,25 @@ class OpenstackOptions: .format(e)) +def create_dir_tree(dir): + try: + os.makedirs(dir) + except OSError as exc: + if exc.errno == errno.EEXIST and os.path.isdir(dir): + pass + else: + raise exc + + +def joined_path(prefix, suffix): + return "{0}{1}{2}".format(prefix, os.sep, suffix) + + def create_dir(directory, do_log=True): - ''' + """ Creates a directory if it doesn't exists and write the execution in the logs - ''' + """ expanded_dir_name = os.path.expanduser(directory) try: if not os.path.isdir(expanded_dir_name): @@ -209,6 +224,11 @@ def human2bytes(s): set and return the corresponding bytes as an integer. When unable to recognize the format ValueError is raised. 
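
    A few illustrative conversions, assuming the customary 1024-based
    symbol table this helper parses (sketches, not test fixtures):

        human2bytes('128')     # plain digit strings pass through -> 128
        human2bytes('-1')      # sentinel meaning "no limit" -> -1
        human2bytes('1 MB')    # -> 1048576
        human2bytes('1.6 GB')  # -> 1717986918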
""" + # if isinstance(s, (int, long)): + # return s + + if s.isdigit(): + return long(s) if s in (False, None, '-1'): return -1 diff --git a/freezer/validator.py b/freezer/validator.py index 00eb020c..2e36dca9 100644 --- a/freezer/validator.py +++ b/freezer/validator.py @@ -22,3 +22,8 @@ class Validator: if conf.restore_abs_path and not conf.action == "restore": raise Exception('Restore abs path with {0} action' .format(conf.action)) + + if conf.storage == "ssh" and \ + not (conf.ssh_key and conf.ssh_username and conf.ssh_host): + raise Exception("Please provide ssh_key, " + "ssh_username and ssh_host") diff --git a/tests/commons.py b/tests/commons.py index dfb52fd3..b260a61d 100644 --- a/tests/commons.py +++ b/tests/commons.py @@ -13,6 +13,7 @@ import re from glanceclient.common.utils import IterableWithLength from freezer import swift from freezer.utils import OpenstackOptions +from freezer.engine import tar_engine os.environ['OS_REGION_NAME'] = 'testregion' os.environ['OS_TENANT_ID'] = '0123456789' @@ -789,13 +790,17 @@ class BackupOpt1: self.container, self.work_dir, self.max_segment_size) + self.compression = 'gzip' + + self.engine = tar_engine.TarBackupEngine( + tar_path(), self.compression, self.dereference_symlink, + self.exclude, self.storage, False) self.client_manager.get_glance = Mock(return_value=FakeGlanceClient()) self.client_manager.get_cinder = Mock(return_value=FakeCinderClient()) nova_client = MagicMock() self.client_manager.get_nova = Mock(return_value=nova_client) self.command = None - self.compression = 'gzip' class FakeMySQLdb: diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/integration/common.py b/tests/integration/common.py index 8a5a006a..97f2106a 100644 --- a/tests/integration/common.py +++ b/tests/integration/common.py @@ -182,43 +182,42 @@ class TestFS(unittest.TestCase): - FREEZER_TEST_NO_LVM is set """ + ssh_key = os.environ.get('FREEZER_TEST_SSH_KEY') + ssh_username = os.environ.get('FREEZER_TEST_SSH_USERNAME') + ssh_host = os.environ.get('FREEZER_TEST_SSH_HOST') + container = os.environ.get('FREEZER_TEST_CONTAINER') + use_ssh = ssh_key and ssh_username and ssh_host and container + + os_tenant_name = os.environ.get('FREEZER_TEST_OS_TENANT_NAME') + os_user_name = os.environ.get('FREEZER_TEST_OS_USERNAME') + os_region = os.environ.get('FREEZER_TEST_OS_REGION_NAME') + os_password = os.environ.get('FREEZER_TEST_OS_PASSWORD') + os_auth_url = os.environ.get('FREEZER_TEST_OS_AUTH_URL') + use_os = (os_tenant_name and os_user_name and os_region + and os_password and os_auth_url) + if use_os: + os.environ['OS_USERNAME'] = os_user_name + os.environ['OS_TENANT_NAME'] = os_tenant_name + os.environ['OS_AUTH_URL'] = os_auth_url + os.environ['OS_PASSWORD'] = os_password + os.environ['OS_REGION_NAME'] = os_region + os.environ['OS_TENANT_ID'] = '' + + openstack_executable = distutils.spawn.find_executable('openstack') + swift_executable = distutils.spawn.find_executable('swift') + + use_lvm = (os.getuid() == 0 and 'FREEZER_TEST_NO_LVM' not in os.environ) + ssh_executable = distutils.spawn.find_executable('ssh') + def setUp(self): - self.ssh_key = os.environ.get('FREEZER_TEST_SSH_KEY') - self.ssh_username = os.environ.get('FREEZER_TEST_SSH_USERNAME') - self.ssh_host = os.environ.get('FREEZER_TEST_SSH_HOST') - self.container = os.environ.get('FREEZER_TEST_CONTAINER') - self.use_ssh = self.ssh_key and self.ssh_username and self.ssh_host and self.container - if self.use_ssh: - 
self.ssh_client = paramiko.SSHClient() - self.ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) - self.ssh_client.connect(self.ssh_host, - username=self.ssh_username, - key_filename=self.ssh_key) - - self.os_tenant_name = os.environ.get('FREEZER_TEST_OS_TENANT_NAME') - self.os_user_name = os.environ.get('FREEZER_TEST_OS_USERNAME') - self.os_region = os.environ.get('FREEZER_TEST_OS_REGION_NAME') - self.os_password = os.environ.get('FREEZER_TEST_OS_PASSWORD') - self.os_auth_url = os.environ.get('FREEZER_TEST_OS_AUTH_URL') - self.use_os = (self.os_tenant_name and self.os_user_name and self.os_region - and self.os_password and self.os_auth_url) - if self.use_os: - os.environ['OS_USERNAME'] = self.os_user_name - os.environ['OS_TENANT_NAME'] = self.os_tenant_name - os.environ['OS_AUTH_URL'] = self.os_auth_url - os.environ['OS_PASSWORD'] = self.os_password - os.environ['OS_REGION_NAME'] = self.os_region - os.environ['OS_TENANT_ID'] = '' - - self.openstack_executable = distutils.spawn.find_executable('openstack') - self.swift_executable = distutils.spawn.find_executable('swift') - - self.use_lvm = (os.getuid() == 0 and 'FREEZER_TEST_NO_LVM' not in os.environ) - self.source_tree = Temp_Tree() self.dest_tree = Temp_Tree() - - self.ssh_executable = distutils.spawn.find_executable('ssh') + if TestFS.use_ssh: + self.ssh_client = paramiko.SSHClient() + self.ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + self.ssh_client.connect(TestFS.ssh_host, + username=TestFS.ssh_username, + key_filename=TestFS.ssh_key) def tearDown(self): self.source_tree.cleanup() diff --git a/tests/integration/test_agent.py b/tests/integration/test_agent.py index dea83842..b56f00c6 100644 --- a/tests/integration/test_agent.py +++ b/tests/integration/test_agent.py @@ -16,6 +16,7 @@ from copy import copy import json import os +import unittest import common import uuid @@ -177,9 +178,13 @@ class TestBackupSSH(common.TestFS): - FREEZER_TEST_CONTAINER (directory on the remote machine used to store backups) """ + @unittest.skipIf(not common.TestFS.use_ssh, + "Cannot test with ssh, please provide" + "'FREEZER_TEST_SSH_KEY,'" + "'FREEZER_TEST_SSH_USERNAME'," + "'FREEZER_TEST_SSH_HOST'," + "'FREEZER_TEST_CONTAINER'") def test_backup_ssh(self): - if not self.use_ssh: - return self.source_tree.add_random_data() self.assertTreesMatchNot() @@ -210,7 +215,6 @@ class TestBackupSSH(common.TestFS): result = common.execute(FREEZERC + self.dict_to_args(backup_args)) self.assertIsNotNone(result) - result = json.loads(result) sub_path = '_'.join([result['hostname'], result['backup_name']]) # It may be reasonable to insert a check of the files in the @@ -228,9 +232,13 @@ class TestBackupSSH(common.TestFS): self.remove_ssh_directory(sub_path) + @unittest.skipIf(not common.TestFS.use_ssh, + "Cannot test with ssh, please provide" + "'FREEZER_TEST_SSH_KEY,'" + "'FREEZER_TEST_SSH_USERNAME'," + "'FREEZER_TEST_SSH_HOST'," + "'FREEZER_TEST_CONTAINER'") def test_backup_ssh_incremental(self): - if not self.use_ssh: - return self.source_tree.add_random_data() self.assertTreesMatchNot() @@ -291,12 +299,14 @@ class TestBackupSSH(common.TestFS): self.remove_ssh_directory(sub_path) + @unittest.skipIf(not common.TestFS.use_ssh, + "Cannot test with ssh, please provide" + "'FREEZER_TEST_SSH_KEY,'" + "'FREEZER_TEST_SSH_USERNAME'," + "'FREEZER_TEST_SSH_HOST'," + "'FREEZER_TEST_CONTAINER'") + @unittest.skipIf(not common.TestFS.use_lvm, "No LVM support") def test_backup_ssh_incremental_with_lvm(self): - if not self.use_ssh: - return - if 
not self.use_lvm: - return - self.source_tree.add_random_data() self.assertTreesMatchNot() @@ -369,10 +379,14 @@ class TestBackupUsingSwiftStorage(common.TestFS): - FREEZER_TEST_OS_PASSWORD - FREEZER_TEST_OS_AUTH_URL """ - + @unittest.skipIf(not common.TestFS.use_os, + "Cannot test with swift, please provide" + "'FREEZER_TEST_OS_TENANT_NAME'," + "'FREEZER_TEST_OS_USERNAME'," + "'FREEZER_TEST_OS_REGION_NAME'," + "'FREEZER_TEST_OS_PASSWORD'," + "'FREEZER_TEST_OS_AUTH_URL'") def test_backup_os_simple(self): - if not self.use_os: - return self.source_tree.add_random_data() self.assertTreesMatchNot() @@ -426,13 +440,17 @@ class TestBackupUsingSwiftStorage(common.TestFS): result = self.remove_swift_container(backup_args['container']) self.assertIsNotNone(result) + @unittest.skipIf(not common.TestFS.use_os, + "Cannot test with swift, please provide" + "'FREEZER_TEST_OS_TENANT_NAME'," + "'FREEZER_TEST_OS_USERNAME'," + "'FREEZER_TEST_OS_REGION_NAME'," + "'FREEZER_TEST_OS_PASSWORD'," + "'FREEZER_TEST_OS_AUTH_URL'") + @unittest.skipIf(not common.TestFS.use_lvm, "No LVM support") + @unittest.skipIf(not os.path.isdir('/var/lib/mysql'), + "No path /var/lib/mysql") def test_backup_swift_mysql(self): - if not self.use_os: - return - if not self.use_lvm: - return - if not os.path.isdir('/var/lib/mysql'): - return self.source_tree = common.Temp_Tree(dir='/var/lib/mysql', create=False) backup_name = uuid.uuid4().hex diff --git a/tests/test_backup.py b/tests/test_backup.py deleted file mode 100644 index 95d663e7..00000000 --- a/tests/test_backup.py +++ /dev/null @@ -1,166 +0,0 @@ -#!/usr/bin/env python - -from freezer.backup import backup_mode_mysql, backup_mode_mongo -from freezer.backup import BackupOs -from freezer import tar -from freezer import local -import freezer -import swiftclient -import multiprocessing -import subprocess -import time -import os -import pymysql as MySQLdb -import pymongo -import re -import pytest -from commons import * - - -class TestBackUP: - - def test_backup_mode_mysql(self, monkeypatch, tmpdir): - - backup_opt = BackupOpt1() - backup_opt.__dict__['storage'] = local.LocalStorage(tmpdir.strpath) - fakemysql = FakeMySQLdb() - expanduser = Os() - fakere = FakeRe() - fakeswiftclient = FakeSwiftClient() - fakelvm = Lvm() - fakesubprocess = FakeSubProcess() - fakesubprocesspopen = fakesubprocess.Popen() - fakemultiprocessing = FakeMultiProcessing() - fakemultiprocessingqueue = fakemultiprocessing.Queue() - fakemultiprocessingpipe = fakemultiprocessing.Pipe() - fakemultiprocessinginit = fakemultiprocessing.__init__() - - monkeypatch.setattr( - multiprocessing, 'Queue', fakemultiprocessingqueue) - monkeypatch.setattr( - multiprocessing, 'Pipe', fakemultiprocessingpipe) - monkeypatch.setattr( - multiprocessing, 'Process', fakemultiprocessing.Process) - monkeypatch.setattr( - multiprocessing, '__init__', fakemultiprocessinginit) - #monkeypatch.setattr(__builtin__, 'open', fakeopen.open) - monkeypatch.setattr( - subprocess.Popen, 'communicate', fakesubprocesspopen.communicate) - monkeypatch.setattr( - freezer.lvm, 'lvm_snap_remove', fakelvm.lvm_snap_remove) - monkeypatch.setattr(freezer.lvm, 'lvm_eval', fakelvm.lvm_eval) - monkeypatch.setattr(re, 'search', fakere.search) - monkeypatch.setattr(MySQLdb, 'connect', fakemysql.connect) - monkeypatch.setattr(os.path, 'expanduser', expanduser.expanduser) - monkeypatch.setattr(os.path, 'isdir', expanduser.isdir) - monkeypatch.setattr(os, 'makedirs', expanduser.makedirs) - monkeypatch.setattr(os.path, 'exists', expanduser.exists) - 
monkeypatch.setattr(os, 'chdir', lambda x: x) - monkeypatch.setattr(swiftclient, 'client', fakeswiftclient.client) - - mysql_conf = backup_opt.mysql_conf - backup_opt.__dict__['mysql_conf'] = None - pytest.raises(Exception, backup_mode_mysql, backup_opt) - - # Generate mysql conf test file - backup_opt.__dict__['mysql_conf'] = mysql_conf - with open(backup_opt.mysql_conf, 'w') as mysql_conf_fd: - mysql_conf_fd.write('host=abcd\nport=1234\nuser=abcd\npassword=abcd\n') - assert backup_mode_mysql(backup_opt) is None - - fakemysql2 = FakeMySQLdb2() - monkeypatch.setattr(MySQLdb, 'connect', fakemysql2.connect) - pytest.raises(Exception, backup_mode_mysql) - os.unlink(backup_opt.mysql_conf) - - def test_backup_mode_fs(self, monkeypatch, tmpdir): - - # Class and other settings initialization - backup_opt = BackupOpt1() - backup_opt.mode = 'fs' - expanduser = Os() - fakere = FakeRe() - fakeswiftclient = FakeSwiftClient() - fakelvm = Lvm() - fakemultiprocessing = FakeMultiProcessing() - fakemultiprocessingqueue = fakemultiprocessing.Queue() - fakemultiprocessingpipe = fakemultiprocessing.Pipe() - fakemultiprocessinginit = fakemultiprocessing.__init__() - - # Monkey patch - monkeypatch.setattr( - multiprocessing, 'Queue', fakemultiprocessingqueue) - monkeypatch.setattr(multiprocessing, 'Pipe', fakemultiprocessingpipe) - monkeypatch.setattr( - multiprocessing, 'Process', fakemultiprocessing.Process) - monkeypatch.setattr( - multiprocessing, '__init__', fakemultiprocessinginit) - monkeypatch.setattr(freezer.lvm, 'lvm_eval', fakelvm.lvm_eval) - monkeypatch.setattr(swiftclient, 'client', fakeswiftclient.client) - monkeypatch.setattr(re, 'search', fakere.search) - monkeypatch.setattr(os.path, 'exists', expanduser.exists) - - storage = local.LocalStorage(tmpdir.strpath) - - assert storage.backup( - "/tmp/", "hostname_backup_name", - tar.TarCommandBuilder(tar_path(), ".", "gzip")) is None - - backup_opt.__dict__['no_incremental'] = False - with open( - '/tmp/tar_metadata_test-hostname_test-backup-name_123456789_0', 'w') as fd: - fd.write('testcontent\n') - assert storage.backup( - "/tmp/", "hostname_backup_name", - tar.TarCommandBuilder(tar_path(), ".", "gzip")) is None - - def test_backup_mode_mongo(self, monkeypatch, tmpdir): - - # Class and other settings initialization - test_meta = dict() - backup_opt = BackupOpt1() - backup_opt.__dict__['storage'] = local.LocalStorage(tmpdir.strpath) - - fakemongo = FakeMongoDB() - backup_opt.mode = 'mongo' - fakeos = Os() - fakere = FakeRe() - fakeswiftclient = FakeSwiftClient() - #fakeopen = FakeOpen() - fakelvm = Lvm() - fakemultiprocessing = FakeMultiProcessing() - fakemultiprocessingqueue = fakemultiprocessing.Queue() - fakemultiprocessingpipe = fakemultiprocessing.Pipe() - fakemultiprocessinginit = fakemultiprocessing.__init__() - - monkeypatch.setattr( - multiprocessing, 'Queue', fakemultiprocessingqueue) - monkeypatch.setattr( - multiprocessing, 'Pipe', fakemultiprocessingpipe) - monkeypatch.setattr( - multiprocessing, 'Process', fakemultiprocessing.Process) - monkeypatch.setattr( - multiprocessing, '__init__', fakemultiprocessinginit) - monkeypatch.setattr(freezer.lvm, 'lvm_eval', fakelvm.lvm_eval) - monkeypatch.setattr(pymongo, 'MongoClient', fakemongo) - monkeypatch.setattr(os.path, 'exists', fakeos.exists) - monkeypatch.setattr(re, 'search', fakere.search) - monkeypatch.setattr(swiftclient, 'client', fakeswiftclient.client) - #monkeypatch.setattr(__builtin__, 'open', fakeopen.open) - - assert backup_mode_mongo(backup_opt) is None - - fakemongo2 = 
FakeMongoDB2() - monkeypatch.setattr(pymongo, 'MongoClient', fakemongo2) - assert backup_mode_mongo(backup_opt) is True - - def test_backup_cinder_by_glance(self): - backup_opt = BackupOpt1() - BackupOs(backup_opt.client_manager, - backup_opt.container, - backup_opt.storage).backup_cinder_by_glance(34) - - def test_backup_cinder(self): - backup_opt = BackupOpt1() - BackupOs(backup_opt.client_manager, - backup_opt.container, backup_opt.storage).backup_cinder(34) diff --git a/tests/test_local.py b/tests/test_local.py index 5acb65bc..713b9281 100644 --- a/tests/test_local.py +++ b/tests/test_local.py @@ -12,6 +12,7 @@ import os class TestLocalStorage(object): BACKUP_DIR_PREFIX = "freezer_test_backup_dir" FILES_DIR_PREFIX = "freezer_test_files_dir" + WORK_DIR_PREFIX = "freezer_work_dir" HELLO = "Hello World!\n" temp = True @@ -27,13 +28,17 @@ class TestLocalStorage(object): dir=tmpdir, prefix=self.BACKUP_DIR_PREFIX) files_dir = tempfile.mkdtemp( dir=tmpdir, prefix=self.FILES_DIR_PREFIX) + work_dir = tempfile.mkdtemp( + dir=tmpdir, prefix=self.WORK_DIR_PREFIX) else: backup_dir = tmpdir + self.BACKUP_DIR_PREFIX files_dir = tmpdir + self.FILES_DIR_PREFIX + work_dir = tmpdir + self.WORK_DIR_PREFIX utils.create_dir(backup_dir) + utils.create_dir(work_dir) utils.create_dir(files_dir) self.create_content(files_dir) - return backup_dir, files_dir + return backup_dir, files_dir, work_dir def remove_dirs(self, work_dir, files_dir, backup_dir): if self.temp: @@ -44,99 +49,16 @@ class TestLocalStorage(object): def remove_storage(self, backup_dir): shutil.rmtree(backup_dir) - def test(self, tmpdir): - backup_dir, files_dir = self.create_dirs(tmpdir) - storage = local.LocalStorage(backup_dir) - builder = tar.TarCommandBuilder(commons.tar_path(), ".", "gzip") - storage.backup(files_dir, "file_backup", builder) - storage.get_backups() - def test_is_ready(self, tmpdir): - backup_dir, files_dir = self.create_dirs(tmpdir) - storage = local.LocalStorage(backup_dir) + backup_dir, files_dir, work_dir = self.create_dirs(tmpdir) + storage = local.LocalStorage(backup_dir, work_dir) assert storage.is_ready() def test_prepare(self, tmpdir): - backup_dir, files_dir = self.create_dirs(tmpdir) - storage = local.LocalStorage(backup_dir) + backup_dir, files_dir, work_dir = self.create_dirs(tmpdir) + storage = local.LocalStorage(backup_dir, work_dir) assert storage.is_ready() self.remove_storage(backup_dir) assert not storage.is_ready() storage.prepare() assert storage.is_ready() - - def test_get_backups(self, tmpdir): - backup_dir, files_dir = self.create_dirs(tmpdir) - storage = local.LocalStorage(backup_dir) - builder = tar.TarCommandBuilder(commons.tar_path(), ".", "gzip") - os.chdir(files_dir) - storage.backup(files_dir, "file_backup", builder) - backups = storage.get_backups() - assert len(backups) == 1 - - def test_incremental_backup(self, tmpdir): - backup_dir, files_dir = self.create_dirs(tmpdir) - storage = local.LocalStorage(backup_dir) - builder = tar.TarCommandBuilder(commons.tar_path(), ".", "gzip") - os.chdir(files_dir) - storage.backup(files_dir, "file_backup", builder) - backups = storage.get_backups() - assert len(backups) == 1 - backup = backups[0] - self.create_content(files_dir, "file_2", "foo\n") - storage.backup(files_dir, "file_backup", builder, backup) - - def test_incremental_restore(self, tmpdir): - backup_dir, files_dir = self.create_dirs(tmpdir) - storage = local.LocalStorage(backup_dir) - builder = tar.TarCommandBuilder(commons.tar_path(), ".", "gzip") - os.chdir(files_dir) - 
storage.backup(files_dir, "file_backup", builder) - backups = storage.get_backups() - assert len(backups) == 1 - backup = backups[0] - self.create_content(files_dir, "file_2", "foo\n") - storage.backup(files_dir, "file_backup", builder, backup) - for path in os.listdir(files_dir): - os.remove(files_dir + "/" + path) - assert not os.listdir(files_dir) - utils.create_dir(files_dir) - backup = storage.get_backups()[0] - builder = tar.TarCommandRestoreBuilder(commons.tar_path(), files_dir, - "gzip") - storage.restore(backup.latest_update, files_dir, builder) - files = os.listdir(files_dir) - assert len(files) == 2 - with open(files_dir + "/file_1", "r") as file_1: - assert self.HELLO == file_1.read() - with open(files_dir + "/file_2", "r") as file_2: - assert "foo\n" == file_2.read() - - def test_backup_file(self, tmpdir): - backup_dir, files_dir = self.create_dirs(tmpdir) - storage = local.LocalStorage(backup_dir) - builder = tar.TarCommandBuilder(commons.tar_path(), "file_1", "gzip") - os.chdir(files_dir) - storage.backup(files_dir + "/file_1", "file_backup", builder) - for path in os.listdir(files_dir): - os.remove(files_dir + "/" + path) - assert not os.listdir(files_dir) - utils.create_dir(files_dir) - backup = storage.get_backups()[0] - builder = tar.TarCommandRestoreBuilder(commons.tar_path(), files_dir, - "gzip") - storage.restore(backup, files_dir, builder) - files = os.listdir(files_dir) - assert len(files) == 1 - - def test_remove_backup(self, tmpdir): - backup_dir, files_dir = self.create_dirs(tmpdir) - storage = local.LocalStorage(backup_dir) - builder = tar.TarCommandBuilder(commons.tar_path(), ".", "gzip") - os.chdir(files_dir) - storage.backup(files_dir, "file_backup", builder) - backups = storage.get_backups() - assert len(backups) == 1 - storage.remove_backup(backups[0]) - backups = storage.get_backups() - assert len(backups) == 0 diff --git a/tests/test_storage.py b/tests/test_storage.py index b931a1cc..f3c8df45 100644 --- a/tests/test_storage.py +++ b/tests/test_storage.py @@ -6,14 +6,14 @@ import mock class TestBackup(unittest.TestCase): def test_backup_parse(self): - self.assertRaises(ValueError, storage.Backup.parse, "asdfasdfasdf") - backup = storage.Backup.parse("test_name_host_1234_0") + self.assertRaises(ValueError, storage.Backup._parse, "asdfasdfasdf") + backup = storage.Backup._parse("test_name_host_1234_0") self.assertEqual(backup.level, 0) self.assertEqual(backup.timestamp, 1234) self.assertEqual(backup.hostname_backup_name, "test_name_host") def test_backup_creation(self): - backup = storage.Backup("name", 1234, 0) + backup = storage.Backup("name", 1234) self.assertEqual(backup.hostname_backup_name, "name") self.assertEqual(backup.timestamp, 1234) self.assertEqual(backup.level, 0) @@ -23,63 +23,63 @@ class TestBackup(unittest.TestCase): self.assertEqual(len(backup.increments), 1) def test_backup_increment(self): - backup = storage.Backup("name", 1234, 0) + backup = storage.Backup("name", 1234) self.assertRaises(ValueError, backup.add_increment, backup) - increment = storage.Backup("name", 4567, 1) + increment = storage.Backup("name", 4567, 1, backup) backup.add_increment(increment) self.assertEqual(len(backup.increments), 2) def test__find_previous_backup(self): - backup = storage.Backup("name", 1234, 0) + backup = storage.Backup("name", 1234) b = storage.Storage._find_previous_backup([backup], False, 2, False, 0) assert b == backup def test__find_previous_backup_with_max_level(self): - backup = storage.Backup("name", 1234, 0) - i1 = storage.Backup("name", 
1234, 1) - i2 = storage.Backup("name", 1234, 2) + backup = storage.Backup("name", 1234) + i1 = storage.Backup("name", 1234, 1, backup) + i2 = storage.Backup("name", 1234, 2, backup) backup.add_increment(i1) backup.add_increment(i2) b = storage.Storage._find_previous_backup([backup], False, 2, False, 0) assert not b def test__find_previous_backup_with_max_level_not_reached(self): - backup = storage.Backup("name", 1234, 0) - i1 = storage.Backup("name", 1234, 1) + backup = storage.Backup("name", 1234) + i1 = storage.Backup("name", 1234, 1, backup) backup.add_increment(i1) b = storage.Storage._find_previous_backup([backup], False, 2, False, 0) assert b == i1 def test__find_previous_backup_with_always_level_reached(self): - backup = storage.Backup("name", 1234, 0) - i1 = storage.Backup("name", 1234, 1) - i2 = storage.Backup("name", 1234, 2) + backup = storage.Backup("name", 1234) + i1 = storage.Backup("name", 1234, 1, backup) + i2 = storage.Backup("name", 1234, 2, backup) backup.add_increment(i1) backup.add_increment(i2) b = storage.Storage._find_previous_backup([backup], False, False, 2, 0) assert b == i1 def test__find_previous_backup_with_always_level_reached_2(self): - backup = storage.Backup("name", 1234, 0) - i1 = storage.Backup("name", 1234, 1) - i2 = storage.Backup("name", 1234, 2) + backup = storage.Backup("name", 1234) + i1 = storage.Backup("name", 1234, 1, backup) + i2 = storage.Backup("name", 1234, 2, backup) backup.add_increment(i1) backup.add_increment(i2) b = storage.Storage._find_previous_backup([backup], False, False, 3, 0) assert b == i2 - def test_find(self): + def test_find_all(self): t = storage.Storage() t.get_backups = mock.Mock() t.get_backups.return_value = [ - storage.Backup("host_backup", 1000, 0), - storage.Backup("host_backup", 1000, 0), - storage.Backup("host_backup", 1000, 0), - storage.Backup("host_backup", 1000, 0), - storage.Backup("host_backup_f", 1000, 0), - storage.Backup("host_backup", 1000, 0), + storage.Backup("host_backup", 1000), + storage.Backup("host_backup", 1000), + storage.Backup("host_backup", 1000), + storage.Backup("host_backup", 1000), + storage.Backup("host_backup_f", 1000), + storage.Backup("host_backup", 1000), ] - result = t.find("host_backup") + result = t.find_all("host_backup") assert len(result) == 5 for r in result: assert r.hostname_backup_name != "host_backup_f" @@ -87,82 +87,70 @@ class TestBackup(unittest.TestCase): def test_restore_latest_backup(self): t = storage.Storage() t.get_backups = mock.Mock() - last = storage.Backup("host_backup", 5000, 0) + last = storage.Backup("host_backup", 5000) t.get_backups.return_value = [ - storage.Backup("host_backup", 1000, 0), - storage.Backup("host_backup", 2000, 0), - storage.Backup("host_backup", 3000, 0), - storage.Backup("host_backup", 4000, 0), - storage.Backup("host_backup_f", 1000, 0), + storage.Backup("host_backup", 1000), + storage.Backup("host_backup", 2000), + storage.Backup("host_backup", 3000), + storage.Backup("host_backup", 4000), + storage.Backup("host_backup_f", 1000), last ] - builder = tar.TarCommandRestoreBuilder("", "", "gzip") - self.assertRaises(ValueError, t.restore_latest, "test", ".", builder) - t.restore = mock.Mock() - t.restore_latest("host_backup", ".", builder) - t.restore.assert_called_with(last, ".", builder) + self.assertRaises(IndexError, t.find_one, "") + assert t.find_one("host_backup") == last def test_find_latest_backup_respects_increments_timestamp(self): - test_backup = storage.Backup("host_backup", 1000, 0) - increment = storage.Backup("host_backup", 
6000, 1) + test_backup = storage.Backup("host_backup", 5500) + increment = storage.Backup("host_backup", 6000, 1, test_backup) test_backup.add_increment(increment) t = storage.Storage() t.get_backups = mock.Mock() t.get_backups.return_value = [ test_backup, - storage.Backup("host_backup", 2000, 0), - storage.Backup("host_backup", 3000, 0), - storage.Backup("host_backup", 4000, 0), - storage.Backup("host_backup_f", 1000, 0), - storage.Backup("host_backup", 5000, 0), + storage.Backup("host_backup", 2000), + storage.Backup("host_backup", 3000), + storage.Backup("host_backup", 4000), + storage.Backup("host_backup_f", 1000), + storage.Backup("host_backup", 5000), ] - builder = tar.TarCommandRestoreBuilder("", "", "gzip") - t.restore = mock.Mock() - t.restore_latest("host_backup", ".", builder) - t.restore.assert_called_with(increment, ".", builder) + assert t.find_one("host_backup") == increment def test_restore_from_date(self): t = storage.Storage() t.get_backups = mock.Mock() - backup_restore = storage.Backup("host_backup", 3000, 0) + backup_restore = storage.Backup("host_backup", 3000) t.get_backups.return_value = [ - storage.Backup("host_backup", 1000, 0), - storage.Backup("host_backup", 2000, 0), + storage.Backup("host_backup", 1000), + storage.Backup("host_backup", 2000), backup_restore, - storage.Backup("host_backup", 4000, 0), - storage.Backup("host_backup_f", 1000, 0), - storage.Backup("host_backup", 5000, 0), + storage.Backup("host_backup", 4000), + storage.Backup("host_backup_f", 1000), + storage.Backup("host_backup", 5000), ] - t.restore = mock.Mock() - builder = tar.TarCommandRestoreBuilder("", "", "gzip") - t.restore_from_date("host_backup", ".", builder, 3234) - t.restore.assert_called_with(backup_restore, ".", builder) + assert t.find_one("host_backup", 3234) == backup_restore def test_restore_from_date_increment(self): t = storage.Storage() t.get_backups = mock.Mock() - test_backup = storage.Backup("host_backup", 1000, 0) - increment = storage.Backup("host_backup", 3200, 1) + test_backup = storage.Backup("host_backup", 1000) + increment = storage.Backup("host_backup", 3200, 1, test_backup) test_backup.add_increment(increment) t.get_backups.return_value = [ test_backup, - storage.Backup("host_backup", 4000, 0), - storage.Backup("host_backup_f", 1000, 0), - storage.Backup("host_backup", 5000, 0), + storage.Backup("host_backup", 4000), + storage.Backup("host_backup_f", 1000), + storage.Backup("host_backup", 5000), ] - t.restore = mock.Mock() - builder = tar.TarCommandRestoreBuilder("", "", "gzip") - t.restore_from_date("host_backup", ".", builder, 3234) - t.restore.assert_called_with(increment, ".", builder) + assert t.find_one("host_backup", 3234) == increment def test__get_backups_wrong_name(self): - result = storage.Storage._get_backups(["hostname"]) + result = storage.Backup.parse_backups(["hostname"]) assert len(result) == 0 - result = storage.Storage._get_backups(["hostname_100_2"]) + result = storage.Backup.parse_backups(["hostname_100_2"]) assert len(result) == 0 def test__get_backups_good_name(self): - result = storage.Storage._get_backups(["host_backup_100_0"]) + result = storage.Backup.parse_backups(["host_backup_100_0"]) assert len(result) == 1 result = result[0] assert result.hostname_backup_name == "host_backup" @@ -172,15 +160,15 @@ class TestBackup(unittest.TestCase): def test_remove_older_than(self): t = storage.Storage() t.get_backups = mock.Mock() - r1 = storage.Backup("host_backup", 1000, 0) - r2 = storage.Backup("host_backup", 2000, 0) + r1 = 
storage.Backup("host_backup", 1000) + r2 = storage.Backup("host_backup", 2000) t.get_backups.return_value = [ r1, r2, - storage.Backup("host_backup", 3000, 0), - storage.Backup("host_backup", 4000, 0), - storage.Backup("host_backup_f", 1000, 0), - storage.Backup("host_backup", 5000, 0), + storage.Backup("host_backup", 3000), + storage.Backup("host_backup", 4000), + storage.Backup("host_backup_f", 1000), + storage.Backup("host_backup", 5000), ] t.remove_backup = mock.Mock() t.remove_older_than(3000, "host_backup") diff --git a/tests/test_swift.py b/tests/test_swift.py index 0b00a8a1..79101f0f 100644 --- a/tests/test_swift.py +++ b/tests/test_swift.py @@ -41,38 +41,44 @@ class TestSwiftStorage(unittest.TestCase): "hostname_backup_4000_1", ] - self.backup = storage.Backup("hostname_backup", 1000, 0, True) - self.backup_2 = storage.Backup("hostname_backup", 3000, 0, True) - self.increment = storage.Backup("hostname_backup", 2000, 1, True) - self.increment_2 = storage.Backup("hostname_backup", 4000, 1, True) + self.backup = storage.Backup("hostname_backup", 1000, tar_meta=True) + self.backup_2 = storage.Backup("hostname_backup", 3000, tar_meta=True) + self.increment = storage.Backup("hostname_backup", 2000, + full_backup=self.backup, + level=1, + tar_meta=True) + self.increment_2 = storage.Backup("hostname_backup", 4000, + full_backup=self.backup_2, + level=1, + tar_meta=True) def test__get_backups(self): - backups = swift.SwiftStorage._get_backups(self.files) + backups = storage.Backup.parse_backups(self.files) self.assertEqual(len(backups), 1) backup = backups[0] self.assertEqual(backup, self.backup) def test__get_backups_with_tar_only(self): - backups = swift.SwiftStorage._get_backups( + backups = storage.Backup.parse_backups( ["tar_metadata_hostname_backup_1000_0"]) self.assertEqual(len(backups), 0) def test__get_backups_without_tar(self): - backups = swift.SwiftStorage._get_backups(["hostname_backup_1000_0"]) + backups = storage.Backup.parse_backups(["hostname_backup_1000_0"]) self.assertEqual(len(backups), 1) self.backup.tar_meta = False backup = backups[0] self.assertEqual(backup, self.backup) def test__get_backups_increment(self): - backups = swift.SwiftStorage._get_backups(self.increments) + backups = storage.Backup.parse_backups(self.increments) self.assertEqual(len(backups), 1) self.backup.add_increment(self.increment) backup = backups[0] self.assertEqual(backup, self.backup) def test__get_backups_increments(self): - backups = swift.SwiftStorage._get_backups(self.cycles_increments) + backups = storage.Backup.parse_backups(self.cycles_increments) self.assertEqual(len(backups), 2) self.backup.add_increment(self.increment) self.backup_2.add_increment(self.increment_2) diff --git a/tests/test_tar.py b/tests/test_tar.py index c50f02e2..78bd8a32 100644 --- a/tests/test_tar.py +++ b/tests/test_tar.py @@ -22,75 +22,14 @@ Hudson (tjh@cryptsoft.com). 
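
As context for the builder tests below: get_tar_flag_from_algo resolves a
compression algorithm name to the matching GNU tar short option. Only the
'gzip' entry is visible in this patch; the 'bzip2' and 'xz' values are
assumed from standard GNU tar flags:

    get_tar_flag_from_algo('gzip')   # -> '-z'
    get_tar_flag_from_algo('bzip2')  # -> '-j'
    get_tar_flag_from_algo('xz')     # -> '-J'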
""" from commons import * -from freezer.tar import (tar_restore, tar_backup, get_tar_flag_from_algo) -from freezer import winutils +from freezer.tar import get_tar_flag_from_algo import os import logging -import subprocess -import pytest class TestTar: - def test_tar_restore(self, monkeypatch): - - backup_opt = BackupOpt1() - fakelogging = FakeLogging() - fakesubprocess = FakeSubProcess5() - fakesubprocesspopen = fakesubprocess.Popen() - fakemultiprocessing = FakeMultiProcessing() - fakepipe = fakemultiprocessing.Pipe() - fakeos = Os() - - monkeypatch.setattr(os, 'path', fakeos) - monkeypatch.setattr(os, 'remove', fakeos.remove) - monkeypatch.setattr( - subprocess.Popen, 'communicate', fakesubprocesspopen.communicate) - monkeypatch.setattr(subprocess, 'Popen', fakesubprocesspopen) - monkeypatch.setattr(logging, 'critical', fakelogging.critical) - monkeypatch.setattr(logging, 'warning', fakelogging.warning) - monkeypatch.setattr(logging, 'exception', fakelogging.exception) - monkeypatch.setattr(logging, 'error', fakelogging.error) - - pytest.raises(SystemExit, tar_restore, "", "", fakepipe) - - fakesubprocess = FakeSubProcess() - fakesubprocesspopen = fakesubprocess.Popen() - monkeypatch.setattr( - subprocess.Popen, 'communicate', fakesubprocesspopen.communicate) - monkeypatch.setattr( - subprocess, 'Popen', fakesubprocesspopen) - assert tar_restore("", "", fakepipe) is None - - # expected_tar_cmd = 'gzip -dc | tar -xf - --unlink-first --ignore-zeros' - monkeypatch.setattr(winutils, 'is_windows', ReturnBool.return_true) - fake_os = Os() - monkeypatch.setattr(os, 'chdir', fake_os.chdir) - assert tar_restore("", "", fakepipe) is None - - monkeypatch.setattr(os, 'chdir', fake_os.chdir2) - pytest.raises(Exception, tar_restore, backup_opt, "", fakepipe) - - def test_tar_backup(self, monkeypatch): - - fakelogging = FakeLogging() - fakesubprocess = FakeSubProcess() - fakesubprocesspopen = fakesubprocess.Popen() - fakemultiprocessing = FakeMultiProcessing() - fakebackup_queue = fakemultiprocessing.Queue() - - monkeypatch.setattr( - subprocess.Popen, 'communicate', fakesubprocesspopen.communicate) - monkeypatch.setattr( - subprocess, 'Popen', fakesubprocesspopen) - monkeypatch.setattr(logging, 'critical', fakelogging.critical) - monkeypatch.setattr(logging, 'warning', fakelogging.warning) - monkeypatch.setattr(logging, 'exception', fakelogging.exception) - monkeypatch.setattr(logging, 'error', fakelogging.error) - - assert tar_backup(".", 100, 'tar_command', fakebackup_queue) is not False - def test_tar_restore_args_valid(self, monkeypatch): backup_opt = BackupOpt1() diff --git a/tests/test_tar_builders.py b/tests/test_tar_builders.py index 433469a6..6230f1b1 100644 --- a/tests/test_tar_builders.py +++ b/tests/test_tar_builders.py @@ -5,7 +5,7 @@ from freezer import tar class TestTarCommandBuilder(unittest.TestCase): def setUp(self): - self.builder = tar.TarCommandBuilder("gnutar", ".", "gzip") + self.builder = tar.TarCommandBuilder("gnutar", ".", "gzip", False) def test_build(self): self.assertEquals( @@ -31,13 +31,15 @@ class TestTarCommandBuilder(unittest.TestCase): self.builder.build(), "gnutar --create -z --warning=none --no-check-device " "--one-file-system --preserve-permissions --same-owner " - "--seek --ignore-failed-read --listed-incremental=listed-file.tar " + "--seek --ignore-failed-read --hard-dereference " + "--listed-incremental=listed-file.tar " "--exclude=\"excluded_files\" . 
| openssl enc -aes-256-cfb -pass " "file:encrypt_pass_file") class TestTarCommandRestoreBuilder(unittest.TestCase): def setUp(self): - self.builder = tar.TarCommandRestoreBuilder("gnutar", "restore_path", "gzip") + self.builder = tar.TarCommandRestoreBuilder( + "gnutar", "restore_path", "gzip", False) def test(self): self.assertEquals(