From 0df317c94b6f8dc6a5ec029d5f2a1d838363d8bd Mon Sep 17 00:00:00 2001 From: Ruslan Aliev Date: Fri, 20 Jan 2017 17:46:23 +0400 Subject: [PATCH] Block based incremental support - rsync Added new engine type (rsync) to perform space/bandwidth efficient backups. Change-Id: I8390c9c85fc2478a4ad2fe7eb4e40f3e580da912 Signed-off-by: Ruslan Aliev --- README.rst | 11 +- doc/source/index.rst | 1 + freezer/common/config.py | 11 +- freezer/engine/engine.py | 15 +- freezer/engine/rsync/__init__.py | 0 freezer/engine/rsync/pyrsync.py | 159 +++ freezer/engine/rsync/rsync.py | 942 ++++++++++++++++++ freezer/main.py | 6 +- freezer/storage/base.py | 5 +- freezer/storage/fslike.py | 3 +- freezer/storage/physical.py | 1 + freezer/storage/ssh.py | 1 - freezer/storage/swift.py | 3 +- freezer/tests/integration/common.py | 40 +- freezer/tests/integration/test_agent.py | 5 +- .../tests/integration/test_rsync_backup.py | 160 +++ freezer/tests/unit/engines/rsync/__init__.py | 0 .../tests/unit/engines/rsync/test_pyrsync.py | 56 ++ freezer/utils/compress.py | 110 ++ freezer/utils/config.py | 18 +- freezer/utils/crypt.py | 92 ++ freezer/utils/streaming.py | 4 +- requirements.txt | 1 + 23 files changed, 1602 insertions(+), 42 deletions(-) create mode 100644 freezer/engine/rsync/__init__.py create mode 100644 freezer/engine/rsync/pyrsync.py create mode 100644 freezer/engine/rsync/rsync.py create mode 100644 freezer/tests/integration/test_rsync_backup.py create mode 100644 freezer/tests/unit/engines/rsync/__init__.py create mode 100644 freezer/tests/unit/engines/rsync/test_pyrsync.py create mode 100644 freezer/utils/compress.py create mode 100644 freezer/utils/crypt.py diff --git a/README.rst b/README.rst index 2e71a6cc..69f228e6 100644 --- a/README.rst +++ b/README.rst @@ -86,7 +86,7 @@ Freezer Components | | It can be executed standalone or by the Freezer Scheduler. | | | The Freezer Agent provides a flexible way to execute backup, restore and other actions on a running system. | | | In order to provide flexibility in terms of data integrity, speed, performance, resources usage, etc the freezer agent offers a | -| | wide range of options to execute optimized backup according to the available resources as: | +| | wide range of options to execute optimized backup according to the available resources as: | | | | | | - Segments size (the amount of memory used) | | | - Queues size (optimize backups where I/O, bandwidth, memory or CPU is a constraint) | @@ -124,6 +124,7 @@ Linux Requirements - libffi-dev - libssl-dev - python-dev +- pycrypto - At least 128 MB of memory reserved for Freezer Windows Requirements @@ -553,9 +554,9 @@ Freezer architectural components are the following: - OpenStack Swift (the storage) - freezer client running on the node where the backups and restores are to be executed -Freezer uses GNU Tar under the hood to execute incremental backup and -restore. When a key is provided, it uses OpenSSL to encrypt data. -(AES-256-CFB) +Freezer uses GNU Tar or Rsync algorithm under the hood to execute incremental backup and +restore. When a key is provided, it uses OpenSSL or pycrypto module (OpenSSL compatible) +to encrypt data. (AES-256-CFB) ============= @@ -582,7 +583,7 @@ The Freezer architecture is composed of the following components: | | It can be executed standalone or by the Freezer Scheduler. | | | The freezer-agent provides a flexible way to execute backup, restore and other actions on a running system. 
| | | In order to provide flexibility in terms of data integrity, speed, performance, resources usage, etc the freezer agent offers a | -| | wide range of options to execute optimized backup according to the available resources as: | +| | wide range of options to execute optimized backup according to the available resources as: | | | | | | - Segments size (the amount of memory used) | | | - Queues size (optimize backups where I/O, bandwidth, memory or CPU is a constraint) | diff --git a/doc/source/index.rst b/doc/source/index.rst index 902c2699..fa526759 100644 --- a/doc/source/index.rst +++ b/doc/source/index.rst @@ -20,6 +20,7 @@ Key Features - Low storage consumption as the backup are uploaded as a stream - Flexible backup policy (incremental and differential) - Data is archived in GNU Tar format for file based incremental +- Block based backup support (rsync) - Multiple compression algorithm support (zlib, bzip2, xz) - Remove old backup automatically according to the provided parameters - Multiple storage media support (Swift, local file system, ssh) diff --git a/freezer/common/config.py b/freezer/common/config.py index 20c8961a..8567f3aa 100644 --- a/freezer/common/config.py +++ b/freezer/common/config.py @@ -114,9 +114,18 @@ _COMMON = [ "(OpenStack Instance). Default set to fs"), cfg.StrOpt('engine', short='e', + choices=['tar', 'rsync'], dest='engine_name', default=DEFAULT_PARAMS['engine_name'], - help="Engine to be used for backup/restore."), + help="Engine to be used for backup/restore. " + "With tar, the file inode will be checked for changes " + "amid backup execution. If the file inode changed, the " + "whole file will be backed up. With rsync, the data " + "blocks changes will be verified and only the changed " + "blocks will be backed up. Tar is faster, but is uses " + "more space and bandwidth. Rsync is slower, but uses " + "less space and bandwidth." + ), cfg.StrOpt('container', short='C', default=DEFAULT_PARAMS['container'], diff --git a/freezer/engine/engine.py b/freezer/engine/engine.py index 9f3a1571..9da2fec9 100644 --- a/freezer/engine/engine.py +++ b/freezer/engine/engine.py @@ -230,7 +230,8 @@ class BackupEngine(object): if not overwrite and not utils.is_empty_dir(restore_path): raise Exception( "Restore dir is not empty. " - "Please use --overwrite or provide different path.") + "Please use --overwrite or provide different path " + "or remove the content of {}".format(restore_path)) LOG.info("Restore path creation completed") backups = self.storage.get_latest_level_zero_increments( @@ -244,6 +245,7 @@ class BackupEngine(object): read_except_queue = queues.SimpleQueue() LOG.info("Restoring backup {0}".format(hostname_backup_name)) for level in range(0, max_level + 1): + LOG.info("Restoring from level {0}".format(level)) backup = backups[level] read_pipe, write_pipe = multiprocessing.Pipe() process_stream = multiprocessing.Process( @@ -259,16 +261,17 @@ class BackupEngine(object): # Use SimpleQueue because Queue does not work on Mac OS X. write_except_queue = queues.SimpleQueue() - tar_stream = multiprocessing.Process( + engine_stream = multiprocessing.Process( target=self.restore_level, args=(restore_path, read_pipe, backup, write_except_queue)) - tar_stream.daemon = True - tar_stream.start() + engine_stream.daemon = True + engine_stream.start() + read_pipe.close() write_pipe.close() process_stream.join() - tar_stream.join() + engine_stream.join() # SimpleQueue handling is different from queue handling. 
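            # multiprocessing.queues.SimpleQueue is pipe-based and has no
            # task_done()/join(), so the helper below simply drains whatever
            # exceptions the read/write worker processes pushed onto their
            # queues after both processes have been joined, and reports them.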
def handle_except_SimpleQueue(except_queue): @@ -286,7 +289,7 @@ class BackupEngine(object): got_exception = (handle_except_SimpleQueue(write_except_queue) or got_exception) - if tar_stream.exitcode or got_exception: + if engine_stream.exitcode or got_exception: raise engine_exceptions.EngineException( "Engine error. Failed to restore.") diff --git a/freezer/engine/rsync/__init__.py b/freezer/engine/rsync/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/freezer/engine/rsync/pyrsync.py b/freezer/engine/rsync/pyrsync.py new file mode 100644 index 00000000..8402235c --- /dev/null +++ b/freezer/engine/rsync/pyrsync.py @@ -0,0 +1,159 @@ +""" +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. + +This is a pure Python implementation of the [rsync algorithm] [TM96]. +Updated to use SHA256 hashing (instead of the standard implementation +which uses outdated MD5 hashes), and packages for disutils +distribution by Isis Lovecruft, . The +majority of the code is blatantly stolen from Eric Pruitt's code +as posted on [ActiveState] [1]. +[1]: https://code.activestate.com/recipes/577518-rsync-algorithm/ +[TM96]: Andrew Tridgell and Paul Mackerras. The rsync algorithm. +Technical Report TR-CS-96-05, Canberra 0200 ACT, Australia, 1996. +http://samba.anu.edu.au/rsync/. + +""" + +import collections +import hashlib + +import six +from six.moves import range + + +if six.PY2: + # Python 2.x compatibility + def bytes(var, *args): + try: + return ''.join(map(chr, var)) + except TypeError: + return map(ord, var) + +__all__ = ["rollingchecksum", "weakchecksum", "rsyncdelta", + "blockchecksums"] + + +def rollingchecksum(removed, new, a, b, blocksize=4096): + """ + Generates a new weak checksum when supplied with the internal state + of the checksum calculation for the previous window, the removed + byte, and the added byte. + """ + a -= removed - new + b -= removed * blocksize - a + return (b << 16) | a, a, b + + +def weakchecksum(data): + """ + Generates a weak checksum from an iterable set of bytes. + """ + a = b = 0 + l = len(data) + for i in range(l): + a += data[i] + b += (l - i) * data[i] + + return (b << 16) | a, a, b + + +def blockchecksums(instream, blocksize=4096): + """ + Returns a list of weak and strong hashes for each block of the + defined size for the given data stream. + """ + + weakhashes = list() + stronghashes = list() + read = instream.read(blocksize) + + while read: + weakhashes.append(weakchecksum(bytes(read))[0]) + stronghashes.append(hashlib.sha1(read).hexdigest()) + read = instream.read(blocksize) + + return weakhashes, stronghashes + + +def rsyncdelta(datastream, remotesignatures, blocksize=4096): + """ + Generates a binary patch when supplied with the weak and strong + hashes from an unpatched target and a readable stream for the + up-to-date data. The blocksize must be the same as the value + used to generate remotesignatures. 
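    A minimal illustration (byte strings and block size are placeholders),
    along the lines of the unit test in
    freezer/tests/unit/engines/rsync/test_pyrsync.py:

        old_stream = six.BytesIO(old_bytes)
        weak, strong = blockchecksums(old_stream, 16)
        new_stream = six.BytesIO(new_bytes)
        for token in rsyncdelta(new_stream, (weak, strong), 16):
            if isinstance(token, int):
                pass   # index of an unchanged block in the old data
            else:
                pass   # literal bytes that have to be transmitted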
+ """ + + remote_weak, remote_strong = remotesignatures + + match = True + matchblock = -1 + last_byte = [] + while True: + if match and datastream is not None: + # Whenever there is a match or the loop is running for the first + # time, populate the window using weakchecksum instead of rolling + # through every single byte which takes at least twice as long. + window = collections.deque(bytes(datastream.read(blocksize))) + checksum, a, b = weakchecksum(window) + + try: + # If there are two identical weak checksums in a file, and the + # matching strong hash does not occur at the first match, it will + # be missed and the data sent over. May fix eventually, but this + # problem arises very rarely. + matchblock = remote_weak.index(checksum, matchblock + 1) + stronghash = hashlib.sha1(bytes(window)).hexdigest() + matchblock = remote_strong.index(stronghash, matchblock) + + match = True + # print "MATCHBLOCK: {}".format(matchblock) + # print "MATCHBLOCK TYPE: {}".format(type(matchblock)) + # print "LAST BYTE WHEN MATCH: {}".format(last_byte) + if last_byte: + yield bytes(last_byte) + last_byte = [] + yield matchblock + if datastream.closed: + break + continue + + except ValueError: + # The weakchecksum did not match + match = False + try: + if datastream: + # Get the next byte and affix to the window + newbyte = ord(datastream.read(1)) + window.append(newbyte) + except TypeError: + # No more data from the file; the window will slowly shrink. + # newbyte needs to be zero from here on to keep the checksum + # correct. + newbyte = 0 + tailsize = datastream.tell() % blocksize + datastream = None + + if datastream is None and len(window) <= tailsize: + # The likelihood that any blocks will match after this is + # nearly nil so call it quits. + yield bytes(window) + break + + # Yank off the extra byte and calculate the new window checksum + oldbyte = window.popleft() + checksum, a, b = rollingchecksum(oldbyte, newbyte, a, b, blocksize) + + last_byte.append(oldbyte) + if len(last_byte) == blocksize: + yield bytes(last_byte) + last_byte = [] diff --git a/freezer/engine/rsync/rsync.py b/freezer/engine/rsync/rsync.py new file mode 100644 index 00000000..6d2430d6 --- /dev/null +++ b/freezer/engine/rsync/rsync.py @@ -0,0 +1,942 @@ +"""Freezer rsync incremental engine + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+""" + +import getpass +import grp +import json +import os +import pwd +import Queue +import re +import stat +import sys +import threading + +from oslo_log import log +from six.moves import cStringIO + +from freezer.engine import engine +from freezer.engine.rsync import pyrsync +from freezer.utils import compress +from freezer.utils import crypt +from freezer.utils import winutils + +LOG = log.getLogger(__name__) + +# Data block size of used by rsync to generate signature +RSYNC_BLOCK_SIZE = 4096 +# Version of the meta data structure format +RSYNC_DATA_STRUCT_VERSION = 1 +# Rsync main block size for streams, 32MB (1024*1024*32) +RSYNC_BLOCK_BUFF_SIZE = 33554432 +# Files type where file data content can be backed up or restored +REG_FILE = ('r', 'u') + + +class RsyncEngine(engine.BackupEngine): + + def __init__( + self, compression, symlinks, exclude, storage, + max_segment_size, encrypt_key=None, + dry_run=False): + self.compression_algo = compression + self.encrypt_pass_file = encrypt_key + self.dereference_symlink = symlinks + self.exclude = exclude + self.storage = storage + self.is_windows = winutils.is_windows() + self.dry_run = dry_run + self.max_segment_size = max_segment_size + # Compression and encryption objects + self.compressor = None + self.cipher = None + super(RsyncEngine, self).__init__(storage=storage) + + @property + def name(self): + return "rsync" + + def metadata(self): + return { + "engine_name": self.name, + "compression": self.compression_algo, + # the encrypt_pass_file might be key content so we need to convert + # to boolean + "encryption": bool(self.encrypt_pass_file) + } + + def backup_data(self, backup_path, manifest_path): + """Execute backup using rsync algorithm. + + If an existing rsync meta data is available the backup + will be incremental, otherwise will be executed a level 0 backup + + :param backup_path: + :param manifest_path: + :return: + """ + + LOG.info("Starting RSYNC engine backup data stream") + + file_read_limit = 0 + data_chunk = b'' + LOG.info( + 'Recursively archiving and compressing files from {}'.format( + os.getcwd())) + + self.compressor = compress.Compressor(self.compression_algo) + + if self.encrypt_pass_file: + self.cipher = crypt.AESEncrypt(self.encrypt_pass_file) + data_chunk += self.cipher.generate_header() + + rsync_queue = Queue.Queue(maxsize=2) + + t_get_sign_delta = threading.Thread( + target=self.get_sign_delta, + args=( + backup_path, manifest_path, rsync_queue)) + t_get_sign_delta.daemon = True + + t_get_sign_delta.start() + + while True: + file_block = rsync_queue.get() + + if file_block is False: + break + if len(file_block) == 0: + continue + + data_chunk += file_block + file_read_limit += len(file_block) + if file_read_limit >= self.max_segment_size: + yield data_chunk + data_chunk = b'' + file_read_limit = 0 + + # Upload segments smaller then max_segment_size + if len(data_chunk) < self.max_segment_size: + yield data_chunk + + # Rejoining thread + t_get_sign_delta.join() + + def restore_level(self, restore_path, read_pipe, backup, except_queue): + """Restore the provided file into restore_abs_path. + + Decrypt the file if backup_opt_dict.encrypt_pass_file key is provided. 
+ Freezer rsync header data structure: + + header_len, RSYNC_DATA_STRUCT_VERSION, file_mode, + os_stat.st_uid, os_stat.st_gid, os_stat.st_size, + mtime, ctime, uname, gname, file_type, linkname + + :param restore_path: + :param read_pipe: + :param backup: + :param except_queue: + :return: + """ + try: + metadata = backup.metadata() + if (not self.encrypt_pass_file and + metadata.get("encryption", False)): + raise Exception("Cannot restore encrypted backup without key") + + self.compression_algo = metadata.get('compression', + self.compression_algo) + + if not os.path.exists(restore_path): + raise ValueError( + 'Provided restore path does not exist: {0}'.format( + restore_path)) + + if self.dry_run: + restore_path = '/dev/null' + + raw_data_chunk = read_pipe.recv_bytes() + + self.compressor = compress.Decompressor(self.compression_algo) + + if self.encrypt_pass_file: + self.cipher = crypt.AESDecrypt(self.encrypt_pass_file, + raw_data_chunk[:16]) + raw_data_chunk = raw_data_chunk[16:] + + data_chunk = self.process_restore_data(raw_data_chunk) + + header_str = r'^(\d{1,})\00' + flushed = False + while True: + header_match = re.search(header_str, data_chunk) + if not header_match and not flushed: + try: + data_chunk += self.process_restore_data( + read_pipe.recv_bytes()) + continue + except EOFError: + LOG.info("EOFError: Pipe closed. Flushing buffer...") + data_chunk += self.compressor.flush() + flushed = True + + if data_chunk and header_match: + header_len = int(header_match.group(1)) + if header_len > len(data_chunk) and not flushed: + try: + data_chunk += self.process_restore_data( + read_pipe.recv_bytes()) + except EOFError: + LOG.info("[*] End of File: Pipe closed. " + "Flushing the buffer.") + data_chunk += self.compressor.flush() + flushed = True + + header = data_chunk[:header_len] + header_list = header.split('\00') + data_chunk = data_chunk[header_len:] + data_chunk = self.make_files( + header_list, restore_path, read_pipe, + data_chunk, flushed, backup.level) + else: + LOG.info('No more data available...') + break + except Exception as e: + LOG.exception(e) + except_queue.put(e) + raise + + def process_backup_data(self, data, do_compress=True): + """Compresses and encrypts provided data according to args""" + + if do_compress: + data = self.compressor.compress(data) + + if self.encrypt_pass_file: + data = self.cipher.encrypt(data) + return data + + def process_restore_data(self, data): + """Decrypts and decompresses provided data according to args""" + + if self.encrypt_pass_file: + data = self.cipher.decrypt(data) + + data = self.compressor.decompress(data) + return data + + @staticmethod + def rsync_gen_delta(file_path_fd, old_file_meta): + """Get rsync delta for file descriptor provided as arg. 
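    The generator yields, in order: b'\00' plus the total length of the
    changed data, b'\00' plus the length of the index string plus b'\00',
    the b'\00'-separated list of changed block indexes, and finally the
    raw content of every changed block read at index * RSYNC_BLOCK_SIZE.
    write_changes_in_file() consumes exactly this layout on restore.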
+ + :param file_path_fd: + :param old_file_meta: + :return: + """ + + if not old_file_meta: + raise StopIteration + + # If the ctime or mtime has changed, the delta is computed + # data block is returned + + len_deltas = 0 + old_signature = old_file_meta['signature'] + # Get changed blocks index only + all_changed_indexes = cStringIO() + file_path_fd.seek(0) + previous_index = -1 + modified_blocks = [] + for block_index in pyrsync.rsyncdelta( + file_path_fd, + (old_signature[0], old_signature[1]), + RSYNC_BLOCK_SIZE): + + previous_index += 1 + + if not isinstance(block_index, int): + len_deltas += len(block_index) + all_changed_indexes.write(b'\00{}'.format(previous_index)) + modified_blocks.append(previous_index) + + # Yield the total length data changed blocks + + yield b'\00' + str(len_deltas) + previous_index_str = all_changed_indexes.getvalue() + b'\00' + len_previous_index_str = len(previous_index_str) + 1 + # Yield the length of the string that contain all the indexes + + yield b'\00' + str(len_previous_index_str) + b'\00' + # Yield string containing all the indexes separated by \00 + + yield previous_index_str + + # Get blocks of changed data + file_path_fd.seek(0) + for block_index in modified_blocks: + offset = block_index * RSYNC_BLOCK_SIZE + file_path_fd.seek(offset) + data_block = file_path_fd.read(RSYNC_BLOCK_SIZE) + + yield data_block + + @staticmethod + def is_file_modified(old_file_meta, file_meta): + """Check for changes on inode or file data + + :param old_file_meta: meta data of the previous backup execution + :param file_meta: meta data of the current backup execution + :return: True if the file changed, False otherwise + """ + + # Get mtime and ctime from previous backup execution + old_file_mtime = old_file_meta['inode']['mtime'] + old_file_ctime = old_file_meta['inode']['ctime'] + fs_index_file_mtime = file_meta['inode']['mtime'] + fs_index_file_ctime = file_meta['inode']['ctime'] + + # Check if new ctime/mtime is different from the current one + file_change_flag = None + if old_file_mtime != fs_index_file_mtime: + file_change_flag = True + elif old_file_ctime != fs_index_file_ctime: + file_change_flag = True + + # TODO(raliev): There is also need to add check size of files + return file_change_flag + + def write_file(self, file_fd, size, data_chunk, read_pipe, flushed): + while size: + written_data = min(len(data_chunk), size) + file_fd.write(data_chunk[:written_data]) + data_chunk = data_chunk[written_data:] + size -= written_data + if size > 0 and not flushed: + try: + data_chunk += self.process_restore_data( + read_pipe.recv_bytes()) + except EOFError: + LOG.info( + "[*] EOF from pipe. 
Flushing buffer.") + data_chunk += self.compressor.flush() + flushed = True + continue + elif flushed: + break + return data_chunk + + def write_changes_in_file(self, fd_curr_file, data_chunk, read_pipe): + # Searching for: + # - the block incremental header string + # - len of all the changed blocks + # - len of the index string + offset_match = re.search(r'^(\00(\d+?)\00(\d+?)\00)', data_chunk) + + if offset_match: + offset_size = len(offset_match.group(1)) + len_deltas = int(offset_match.group(2)) + len_offset_str = int(offset_match.group(3)) - 1 + + no_of_blocks, reminder = divmod(len_deltas, RSYNC_BLOCK_SIZE) + if len_deltas > 0: + block_indexes = ( + data_chunk[offset_size: offset_size + len_offset_str]) + data_chunk = data_chunk[offset_size + len(block_indexes):] + blocks_offsets = filter(None, block_indexes.split('\00')) + # Get all the block index offset from + for block_index in blocks_offsets: + while len(data_chunk) < RSYNC_BLOCK_SIZE: + try: + data_chunk += self.process_restore_data( + read_pipe.recv_bytes()) + except EOFError: + LOG.info( + "[*] EOF from pipe. Flushing buffer.") + data_chunk += self.compressor.flush() + break + + offset = int(block_index) * RSYNC_BLOCK_SIZE + fd_curr_file.seek(offset) + fd_curr_file.write(data_chunk[:RSYNC_BLOCK_SIZE]) + data_chunk = data_chunk[RSYNC_BLOCK_SIZE:] + + if reminder: + fd_curr_file.write(data_chunk[:reminder]) + data_chunk = data_chunk[reminder:] + + return data_chunk + + def make_reg_file( + self, size, file_path, read_pipe, data_chunk, + flushed, level_id): + """Create the regular file and write data on it. + + :param size: + :param file_path: + :param read_pipe: + :param data_chunk: + :param flushed: + :param level_id: + :return: + """ + + # File is created. If size is 0, no content is written and the + # function return + + if level_id == '0000': + fd_curr_file = open(file_path, 'wb') + data_chunk = self.write_file(fd_curr_file, size, data_chunk, + read_pipe, flushed) + elif level_id == '1111': + fd_curr_file = open(file_path, 'rb+') + data_chunk = self.write_changes_in_file(fd_curr_file, + data_chunk, read_pipe) + fd_curr_file.close() + return data_chunk + + def set_inode(self, uname, gname, mtime, name): + """Set the file inode fields according to the provided args. 
+ + :param uname: + :param gname: + :param mtime: + :param name: + :return: + """ + + try: + current_uid = pwd.getpwnam(uname).pw_uid + current_gid = grp.getgrnam(gname).gr_gid + os.chown(name, current_uid, current_gid) + os.utime(name, (mtime, mtime)) + except (IOError, OSError): + try: + current_uid = pwd.getpwnam(getpass.getuser()).pw_uid + current_gid = grp.getgrnam(getpass.getuser()).gr_gid + os.chown(name, current_uid, current_gid) + os.utime(name, (mtime, mtime)) + except (OSError, IOError) as err: + raise Exception(err) + + @staticmethod + def get_file_type(file_mode, fs_path): + """Extract file type from the the file mode retrieved + from file abs path + + :param file_mode: + :param fs_path: + :return: + """ + + if stat.S_ISREG(file_mode): + return 'r', '' + if stat.S_ISDIR(file_mode): + return 'd', '' + if stat.S_ISLNK(file_mode): + return 'l', os.readlink(fs_path) + if stat.S_ISCHR(file_mode): + return 'c', '' + if stat.S_ISBLK(file_mode): + return 'b', '' + if stat.S_ISFIFO(file_mode): + return 'p', '' + if stat.S_ISSOCK(file_mode): + return 's', '' + + return 'u', '' + + @staticmethod + def gen_file_header(file_path, inode_str_struct): + """Generate file header for rsync binary data file + + :param file_path: file path + :param inode_str_struct: file binary string including inode data + :return: chunk of binary data to be processed on the next iteration + """ + + start_of_block = b'\00{}\00'.format(file_path) + header_size = len(start_of_block) + len(inode_str_struct) + header_size += len(str(header_size)) + file_header = b'{}{}{}'.format( + header_size, start_of_block, inode_str_struct) + len_file_header = len(file_header) + + if header_size != len_file_header: + file_header_list = file_header.split('\00') + file_header_list = file_header_list[1:] + file_header_list.insert(0, str(len_file_header)) + file_header = '\00'.join(file_header_list) + return file_header + + def make_files( + self, header_list, restore_abs_path, read_pipe, + data_chunk, flushed, + current_backup_level): + """ + Header list binary structure: + + header_len, file_abs_path, RSYNC_DATA_STRUCT_VERSION, file_mode, + os_stat.st_uid, os_stat.st_gid, os_stat.st_size, + mtime, ctime, uname, gname, file_type, linkname, rsync_block_size, + + :param header_list: + :param restore_abs_path: + :param read_pipe: + :param data_chunk: + :param flushed: + :param current_backup_level: + :return: + """ + + # The following commented lines are important for development and + # troubleshooting, please let it go :) + # header_len = header_list[0] + file_path = header_list[1] + # data_ver = header_list[2] + file_mode = header_list[3] + # uid = header_list[4] + # gid = header_list[5] + size = header_list[6] + mtime = header_list[7] + # ctime = header_list[8] + uname = header_list[9] + gname = header_list[10] + file_type = header_list[11] + link_name = header_list[12] + # inumber = header_list[13] + # nlink = header_list[14] + devminor = header_list[15] + devmajor = header_list[16] + # rsync_block_size = header_list[17] + level_id = header_list[18] + rm = header_list[19] + + # Data format conversion + file_mode = int(file_mode) + size = int(size) + mtime = int(mtime) + file_abs_path = '{0}/{1}'.format(restore_abs_path, file_path) + + if not os.path.isdir(file_abs_path) and os.path.exists( + file_abs_path) and current_backup_level == 0: + os.unlink(file_abs_path) + + if not os.path.isdir(file_abs_path) and os.path.exists( + file_abs_path) and current_backup_level != 0 and rm == '1111': + os.unlink(file_abs_path) + return 
data_chunk + + if file_type in REG_FILE: + data_chunk = self.make_reg_file( + size, file_abs_path, read_pipe, data_chunk, + flushed, level_id) + + elif file_type == 'd': + try: + os.makedirs(file_abs_path, file_mode) + except (OSError, IOError) as error: + if error.errno != 17: # E_EXIST + LOG.warning( + 'Directory {0} creation error: {1}'.format( + file_abs_path, error)) + + elif file_type == 'b': + file_mode |= stat.S_IFBLK + try: + devmajor = int(devmajor) + devminor = int(devminor) + new_dev = os.makedev(devmajor, devminor) + os.mknod(file_abs_path, file_mode, new_dev) + except (OSError, IOError) as error: + LOG.warning( + 'Block file {0} creation error: {1}'.format( + file_abs_path, error)) + + elif file_type == 'c': + file_mode |= stat.S_IFCHR + try: + devmajor = int(devmajor) + devminor = int(devminor) + new_dev = os.makedev(devmajor, devminor) + os.mknod(file_abs_path, file_mode, new_dev) + except (OSError, IOError) as error: + LOG.warning( + 'Character file {0} creation error: {1}'.format( + file_abs_path, error)) + + elif file_type == 'p': + try: + os.mkfifo(file_abs_path) + except (OSError, IOError) as error: + LOG.warning( + 'FIFO or Pipe file {0} creation error: {1}'.format( + file_abs_path, error)) + + elif file_type == 'l': + try: + os.symlink(link_name, file_abs_path) + except (OSError, IOError) as error: + LOG.warning('Link file {0} creation error: {1}'.format( + file_abs_path, error)) + + if file_type != 'l': + self.set_inode(uname, gname, mtime, file_abs_path) + + return data_chunk + + def get_file_struct(self, fs_path, new_level=False): + """Generate file meta data from file abs path. + + Return the meta data as a dict structure and a binary string + + :param fs_path: file abs path + :param new_level + :return: file data structure + """ + + # Get file inode information, whether the file is a regular + # file or a symbolic link + try: + os_stat = os.lstat(fs_path) + except (OSError, IOError) as error: + raise Exception('[*] Error on file stat: {}'.format(error)) + + file_mode = os_stat.st_mode + # Get file type. 
If file type is a link it returns also the + # file pointed by the link + file_type, lname = self.get_file_type(file_mode, fs_path) + + # If file_type is a socket return False + if file_type == 's': + return False, False + + ctime = int(os_stat.st_ctime) + mtime = int(os_stat.st_mtime) + uname = pwd.getpwuid(os_stat.st_uid)[0] + gname = grp.getgrgid(os_stat.st_gid)[0] + + dev = os_stat.st_dev + inumber = os_stat.st_ino + nlink = os_stat.st_nlink + uid = os_stat.st_uid + gid = os_stat.st_gid + size = os_stat.st_size + devmajor = os.major(dev) + devminor = os.minor(dev) + + level_id = '0000' + if new_level: + level_id = '1111' + + # build file meta data as dictionary + inode_dict = { + 'inode': { + 'inumber': inumber, + 'nlink': nlink, + 'mode': file_mode, + 'uid': uid, + 'gid': gid, + 'size': size, + 'devmajor': devmajor, + 'devminor': devminor, + 'mtime': mtime, + 'ctime': ctime, + 'uname': uname, + 'gname': gname, + 'ftype': file_type, + 'lname': lname, + 'rsync_block_size': RSYNC_BLOCK_SIZE, + 'level_id': level_id, + 'deleted': '0000' + } + } + + # build file meta data as binary string + inode_bin_str = ( + b'{}\00{}\00{}\00{}\00{}' + b'\00{}\00{}\00{}\00{}\00{}' + b'\00{}\00{}\00{}\00{}\00{}\00{}\00{}\00{}').format( + RSYNC_DATA_STRUCT_VERSION, file_mode, + uid, gid, size, mtime, ctime, uname, gname, + file_type, lname, inumber, nlink, devminor, devmajor, + RSYNC_BLOCK_SIZE, level_id, '0000') + + return inode_dict, inode_bin_str + + def gen_struct_for_deleted_files(self, files_meta, old_fs_meta_struct, + rel_path, write_queue): + files_meta['files'][rel_path] = old_fs_meta_struct['files'][rel_path] + files_meta['files'][rel_path]['inode']['deleted'] = '1111' + file_mode = files_meta['files'][rel_path]['inode']['mode'] + uid = files_meta['files'][rel_path]['inode']['uid'] + gid = files_meta['files'][rel_path]['inode']['gid'] + size = files_meta['files'][rel_path]['inode']['size'] + mtime = files_meta['files'][rel_path]['inode']['mtime'] + ctime = files_meta['files'][rel_path]['inode']['ctime'] + uname = files_meta['files'][rel_path]['inode']['uname'] + gname = files_meta['files'][rel_path]['inode']['gname'] + file_type = files_meta['files'][rel_path]['inode']['ftype'] + lname = files_meta['files'][rel_path]['inode']['lname'] + inumber = files_meta['files'][rel_path]['inode']['inumber'] + nlink = files_meta['files'][rel_path]['inode']['nlink'] + devminor = files_meta['files'][rel_path]['inode']['devminor'] + devmajor = files_meta['files'][rel_path]['inode']['devmajor'] + level_id = files_meta['files'][rel_path]['inode']['level_id'] + + inode_bin_str = ( + b'{}\00{}\00{}\00{}\00{}' + b'\00{}\00{}\00{}\00{}\00{}' + b'\00{}\00{}\00{}\00{}\00{}\00{}\00{}\00{}').format( + RSYNC_DATA_STRUCT_VERSION, file_mode, + uid, gid, size, mtime, ctime, uname, gname, + file_type, lname, inumber, nlink, devminor, devmajor, + RSYNC_BLOCK_SIZE, level_id, '1111') + file_header = self.gen_file_header(rel_path, inode_bin_str) + compr_block = self.process_backup_data(file_header) + write_queue.put(compr_block) + + def process_file(self, file_path, fs_path, files_meta, + old_fs_meta_struct, write_queue): + rel_path = os.path.relpath(file_path, fs_path) + + new_level = True if self.get_old_file_meta( + old_fs_meta_struct, rel_path) else False + + inode_dict_struct, inode_str_struct = self.get_file_struct( + rel_path, new_level) + + if not inode_dict_struct: + return + + if os.path.isdir(file_path): + files_meta['directories'][file_path] = inode_dict_struct + files_meta['meta']['backup_size_on_disk'] += 
os.path.getsize( + rel_path) + file_header = self.gen_file_header(rel_path, inode_str_struct) + + compressed_block = self.process_backup_data(file_header) + files_meta['meta']['backup_size_compressed'] += len( + compressed_block) + write_queue.put(compressed_block) + else: + file_metadata = self.compute_incrementals( + rel_path, inode_str_struct, + inode_dict_struct, files_meta, + old_fs_meta_struct, write_queue) + + files_meta.update(file_metadata) + + def get_sign_delta(self, fs_path, manifest_path, write_queue): + """Compute the file or fs tree path signatures. + + :param fs_path: + :param manifest_path + :param write_queue: + :return: + """ + + files_meta = { + 'files': {}, + 'directories': {}, + 'meta': { + 'broken_links_tot': '', + 'total_files': '', + 'total_directories': '', + 'backup_size_on_disk': 0, + 'backup_size_uncompressed': 0, + 'backup_size_compressed': 0, + 'platform': sys.platform + }, + 'abs_backup_path': os.getcwd(), + 'broken_links': [], + 'rsync_struct_ver': RSYNC_DATA_STRUCT_VERSION, + 'rsync_block_size': RSYNC_BLOCK_SIZE} + + # Get old file meta structure or an empty dict if not available + old_fs_meta_struct = self.get_fs_meta_struct(manifest_path) + + if os.path.isdir(fs_path): + # If given path is a directory, change cwd to path to backup + os.chdir(fs_path) + for root, dirs, files in os.walk(fs_path): + self.process_file(root, fs_path, files_meta, + old_fs_meta_struct, write_queue) + + # Check if exclude is in filename. If it is, log the file + # exclusion and continue to the next iteration. + if self.exclude: + files = [name for name in files if + self.exclude not in name] + if files: + LOG.warning( + ('Excluding file names matching with: ' + '{}'.format(self.exclude))) + + for name in files: + file_path = os.path.join(root, name) + self.process_file(file_path, fs_path, + files_meta, old_fs_meta_struct, + write_queue) + else: + self.process_file(fs_path, os.getcwd(), files_meta, + old_fs_meta_struct, write_queue) + if old_fs_meta_struct: + for rel_path in old_fs_meta_struct['files']: + if not files_meta['files'].get(rel_path): + self.gen_struct_for_deleted_files( + files_meta, old_fs_meta_struct, rel_path, write_queue) + + # Flush any compressed buffered data + flushed_data = self.compressor.flush() + if flushed_data: + flushed_data = self.process_backup_data(flushed_data, + do_compress=False) + files_meta['meta']['backup_size_compressed'] += len(flushed_data) + write_queue.put(flushed_data) + + # General metrics to be uploaded to the API and/or media storage + files_meta['meta']['broken_links_tot'] = len( + files_meta['broken_links']) + files_meta['meta']['total_files'] = len(files_meta['files']) + files_meta['meta']['total_directories'] = len( + files_meta['directories']) + files_meta['meta']['rsync_data_struct_ver'] = RSYNC_DATA_STRUCT_VERSION + LOG.info("Backup session metrics: {0}".format( + files_meta['meta'])) + + # Compress meta data file + # Write meta data to disk as JSON + compressed_json_meta = compress.one_shot_compress( + self.compression_algo, json.dumps(files_meta)) + with open(manifest_path, 'wb') as manifest_file: + manifest_file.write(compressed_json_meta) + + # Put False on the queue so it will be terminated on the other side: + write_queue.put(False) + + def get_fs_meta_struct(self, fs_meta_path): + fs_meta_struct = {} + + if os.path.isfile(fs_meta_path): + with open(fs_meta_path) as meta_file: + fs_meta_struct = json.loads( + compress.one_shot_decompress(self.compression_algo, + meta_file.read())) + + return fs_meta_struct + + 
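    # Note: the manifest loaded above (and written at the end of
    # get_sign_delta) is the files_meta dictionary dumped as JSON and
    # compressed with the session compression algorithm; when the manifest
    # is missing an empty dict is returned and the backup is level 0.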
@staticmethod + def is_reg_file(file_type): + if file_type in REG_FILE: + return True + return False + + @staticmethod + def get_old_file_meta(old_fs_meta_struct, rel_path): + if old_fs_meta_struct: + return old_fs_meta_struct['files'].get(rel_path) + return None + + @staticmethod + def compute_checksums(rel_path, files_meta, reg_file=True): + # Files type where the file content can be backed up + if reg_file: + with open(rel_path, 'rb') as file_path_fd: + files_meta['files'][rel_path].update( + {'signature': pyrsync.blockchecksums( + file_path_fd, RSYNC_BLOCK_SIZE)}) + else: + # Stat the file to be sure it's not a broken link + if os.path.lexists(rel_path): + if not os.path.exists(rel_path): + raise IOError + + files_meta['files'][rel_path].update( + {'signature': [[], []]}) + + return files_meta + + def compute_incrementals( + self, rel_path, inode_str_struct, + inode_dict_struct, files_meta, + old_fs_meta_struct, write_queue, deleted=False): + + file_header = self.gen_file_header( + rel_path, inode_str_struct) + + # File size is header size + null bytes + file size + file_size = len(file_header) + reg_file_type = self.is_reg_file(inode_dict_struct['inode']['ftype']) + try: + files_meta['files'][rel_path] = inode_dict_struct + + # As pyrsync functions run thorough the file descriptor + # we need to set the set the pointer to the fd to 0. + # Backup file data content only if the file type is + # regular or unknown + old_file_meta = self.get_old_file_meta(old_fs_meta_struct, + rel_path) + if old_file_meta: + if self.is_file_modified(old_file_meta, + files_meta['files'][rel_path]): + # If old_fs_path is provided, it checks + # if old file mtime or ctime are different then + # the new ones + files_meta = self.compute_checksums(rel_path, files_meta, + reg_file=reg_file_type) + + compressed_block = self.process_backup_data(file_header) + write_queue.put(compressed_block) + + if reg_file_type: + file_path_fd = open(rel_path) + for data_block in self.rsync_gen_delta( + file_path_fd, old_file_meta): + + compressed_block = self.process_backup_data( + data_block) + write_queue.put(compressed_block) + file_path_fd.close() + else: + files_meta['files'][rel_path].update( + {'signature': old_file_meta['signature']}) + + else: + files_meta = self.compute_checksums( + rel_path, files_meta, + reg_file=reg_file_type) + + compressed_block = self.process_backup_data(file_header) + write_queue.put(compressed_block) + if reg_file_type: + file_path_fd = open(rel_path) + data_block = file_path_fd.read( + RSYNC_BLOCK_BUFF_SIZE) + while data_block: + compressed_block = self.process_backup_data(data_block) + write_queue.put(compressed_block) + data_block = file_path_fd.read( + RSYNC_BLOCK_BUFF_SIZE) + + file_path_fd.close() + files_meta['files'][rel_path]['file_data_len'] = file_size + except (IOError, OSError) as error: + LOG.warning('IO or OS Error: {}'.format(error)) + if os.path.lexists(rel_path): + LOG.warning( + 'Broken link at: {}'.format(rel_path)) + files_meta['broken_links'].append(rel_path) + + return files_meta diff --git a/freezer/main.py b/freezer/main.py index da4b31d1..584837d5 100644 --- a/freezer/main.py +++ b/freezer/main.py @@ -16,6 +16,7 @@ limitations under the License. 
Freezer main execution function """ + import json import os import prettytable @@ -78,6 +79,7 @@ def freezer_main(backup_args): encrypt_key=backup_args.encrypt_pass_file, dry_run=backup_args.dry_run ) + if hasattr(backup_args, 'trickle_command'): if "tricklecount" in os.environ: if int(os.environ.get("tricklecount")) > 1: @@ -183,6 +185,7 @@ def get_client_manager(backup_args): def storage_from_dict(backup_args, max_segment_size): storage_name = backup_args['storage'] container = backup_args['container'] + if storage_name == "swift": client_manager = backup_args['client_manager'] @@ -200,8 +203,9 @@ def storage_from_dict(backup_args, max_segment_size): int(backup_args.get('ssh_port', freezer_config.DEFAULT_SSH_PORT)), max_segment_size=max_segment_size) else: - raise Exception("Not storage found for name {0}".format( + raise Exception("No storage found for name {0}".format( backup_args['storage'])) + return storage diff --git a/freezer/storage/base.py b/freezer/storage/base.py index 4ea38ee3..a73f3935 100644 --- a/freezer/storage/base.py +++ b/freezer/storage/base.py @@ -16,10 +16,10 @@ import abc import json -import six import tempfile from oslo_log import log +import six from freezer.utils import utils @@ -214,9 +214,6 @@ class Backup(object): :type timestamp: int :param level: current incremental level of backup :type level: int - :param tar_meta: Is backup has or has not an attached meta - tar file in storage. Default = False - :type tar_meta: bool :return: """ self.hostname_backup_name = hostname_backup_name diff --git a/freezer/storage/fslike.py b/freezer/storage/fslike.py index 21241398..0e9c5abb 100644 --- a/freezer/storage/fslike.py +++ b/freezer/storage/fslike.py @@ -13,10 +13,11 @@ # limitations under the License. import abc +import json + import six from freezer.storage import physical -import json @six.add_metaclass(abc.ABCMeta) diff --git a/freezer/storage/physical.py b/freezer/storage/physical.py index 8a9a93d9..6219835a 100644 --- a/freezer/storage/physical.py +++ b/freezer/storage/physical.py @@ -17,6 +17,7 @@ limitations under the License. import abc import os + import six from freezer.storage import base diff --git a/freezer/storage/ssh.py b/freezer/storage/ssh.py index 0c412068..a3412b11 100644 --- a/freezer/storage/ssh.py +++ b/freezer/storage/ssh.py @@ -22,7 +22,6 @@ import stat import paramiko from freezer.storage import fslike - from freezer.utils import utils CHUNK_SIZE = 32768 diff --git a/freezer/storage/swift.py b/freezer/storage/swift.py index d20946b6..671010e0 100644 --- a/freezer/storage/swift.py +++ b/freezer/storage/swift.py @@ -17,11 +17,10 @@ limitations under the License. 
import json import os -import requests import time from oslo_log import log - +import requests from freezer.storage import physical diff --git a/freezer/tests/integration/common.py b/freezer/tests/integration/common.py index ed4601b4..13e27368 100644 --- a/freezer/tests/integration/common.py +++ b/freezer/tests/integration/common.py @@ -18,6 +18,7 @@ import hashlib import itertools import json import os +import random import shutil import subprocess import tempfile @@ -25,6 +26,7 @@ import unittest import paramiko import six +from six.moves import range FREEZERC = distutils.spawn.find_executable('freezer-agent') @@ -107,20 +109,26 @@ class Temp_Tree(object): def add_random_data(self, ndir=5, nfile=5, size=1024): """ - add some files containing randoma data + add some files containing random data :param ndir: number of dirs to create :param nfile: number of files to create in each dir :param size: size of files :return: None """ - for x in range(ndir): + def create_file(path): + abs_pathname = self.create_file_with_random_data( + dir_path=path, size=size) + rel_path_name = abs_pathname[len(self.path) + 1:] + self.files.append(rel_path_name) + + for _ in range(nfile): + create_file(self.path) + + for _ in range(ndir): subdir_path = tempfile.mkdtemp(dir=self.path) - for y in range(nfile): - abs_pathname = self.create_file_with_random_data( - dir_path=subdir_path, size=size) - rel_path_name = abs_pathname[len(self.path) + 1:] - self.files.append(rel_path_name) + for _ in range(nfile): + create_file(subdir_path) def create_file_with_random_data(self, dir_path, size=1024): handle, abs_pathname = tempfile.mkstemp(dir=dir_path) @@ -128,6 +136,24 @@ class Temp_Tree(object): fd.write(os.urandom(size)) return abs_pathname + def modify_random_files(self, count=1): + indexes = [] + for _ in range(count): + indexes.append(random.randint(0, len(self.files) - 1)) + for file_index in indexes: + file_name = self.files[file_index] + with open(os.path.join(self.path, file_name), 'ab') as fd: + size_to_add = int(fd.tell() * 0.5) + fd.write(os.urandom(size_to_add)) + + def delete_random_files(self, count=1): + indexes = [] + for _ in range(count): + indexes.append(random.randint(0, len(self.files) - 1)) + for file_index in indexes: + file_name = self.files[file_index] + os.unlink(os.path.join(self.path, file_name)) + def get_file_hash(self, rel_filepath): filepath = os.path.join(self.path, rel_filepath) if os.path.isfile(filepath): diff --git a/freezer/tests/integration/test_agent.py b/freezer/tests/integration/test_agent.py index efb2e251..ed560176 100644 --- a/freezer/tests/integration/test_agent.py +++ b/freezer/tests/integration/test_agent.py @@ -24,7 +24,7 @@ from freezer.tests.integration import common class TestSimpleExecution(common.TestFS): def test_freezerc_executes(self): - result = common.execute_freezerc({}) + result = common.execute_freezerc({'version': ''}) self.assertIsNotNone(result) def test_freezerc_fails_with_wrong_params(self): @@ -78,8 +78,7 @@ class TestBackupFSLocalstorage(common.TestFS): self.assertIsNotNone(result) result = common.execute_freezerc(restore_args) self.assertIsNotNone(result) - self.assertTreesMatch() - self.do_backup_and_restore_with_check(backup_args, restore_args) + self.assertTreesMatch() def test_backup_preexisting_dir(self): """ diff --git a/freezer/tests/integration/test_rsync_backup.py b/freezer/tests/integration/test_rsync_backup.py new file mode 100644 index 00000000..be43d8ae --- /dev/null +++ b/freezer/tests/integration/test_rsync_backup.py @@ -0,0 +1,160 @@ 
+# Copyright 2017 Mirantis, Inc +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ======================================================================== + +from copy import copy +import os +import uuid + +from freezer.tests.integration import common + + +class TestBackupFSLocalRsync(common.TestFS): + def test_backup_single_level(self): + """ + - use the default source and destination trees in /tmp + (see common.TestFS) + - use temporary directory for backup storage + - add some random data + - check that trees don't match anymore + - execute block based backup of source tree + - execute restore into destination tree + - check that source and destination trees match + + :return: non on success + """ + self.source_tree.add_random_data() + self.assertTreesMatchNot() + + with common.Temp_Tree() as storage_dir: + backup_args = { + 'action': 'backup', + 'mode': 'fs', + 'path_to_backup': self.source_tree.path, + 'container': storage_dir.path, + 'storage': 'local', + 'engine': 'rsync', + 'max_segment_size': '67108864', + 'backup_name': uuid.uuid4().hex + } + + restore_args = { + 'action': 'restore', + 'restore_abs_path': self.dest_tree.path, + 'backup_name': copy(backup_args['backup_name']), + 'storage': 'local', + 'engine': 'rsync', + 'container': storage_dir.path + } + result = common.execute_freezerc(backup_args) + self.assertIsNotNone(result) + result = common.execute_freezerc(restore_args) + self.assertIsNotNone(result) + self.assertTreesMatch() + + def test_backup_multiple_level(self): + """ + - use the default source and destination trees in /tmp + (see common.TestFS) + - use temporary directory for backup storage + - add some random data + - check that trees don't match anymore + - execute block based backup of source tree + - modify data + - execute backup again + - delete some files + - execute backup again + - execute restore into destination tree + - check that source and destination trees match + + :return: non on success + """ + self.source_tree.add_random_data() + self.assertTreesMatchNot() + backup_name = uuid.uuid4().hex + + with common.Temp_Tree() as storage_dir: + backup_args = { + 'action': 'backup', + 'mode': 'fs', + 'path_to_backup': self.source_tree.path, + 'container': storage_dir.path, + 'storage': 'local', + 'engine': 'rsync', + 'max_segment_size': '67108864', + 'backup_name': backup_name, + } + + restore_args = { + 'action': 'restore', + 'restore_abs_path': self.dest_tree.path, + 'backup_name': backup_name, + 'storage': 'local', + 'engine': 'rsync', + 'container': storage_dir.path + } + result = common.execute_freezerc(backup_args) + self.assertIsNotNone(result) + self.source_tree.modify_random_files(2) + result = common.execute_freezerc(backup_args) + self.assertIsNotNone(result) + self.source_tree.delete_random_files(1) + result = common.execute_freezerc(backup_args) + self.assertIsNotNone(result) + result = common.execute_freezerc(restore_args) + self.assertIsNotNone(result) + self.assertTreesMatch() + + def test_backup_single_file(self): + """ + - use the default 
source and destination trees in /tmp + (see common.TestFS) + - use temporary directory for backup storage + - add one file with random data + - check that trees don't match anymore + - execute block based backup of single file + - execute restore into destination tree + - check that source and destination trees match + + :return: non on success + """ + self.source_tree.add_random_data(ndir=0, nfile=1) + self.assertTreesMatchNot() + + with common.Temp_Tree() as storage_dir: + backup_args = { + 'action': 'backup', + 'mode': 'fs', + 'path_to_backup': os.path.join( + self.source_tree.path, self.source_tree.files[0]), + 'container': storage_dir.path, + 'storage': 'local', + 'engine': 'rsync', + 'max_segment_size': '67108864', + 'backup_name': uuid.uuid4().hex + } + + restore_args = { + 'action': 'restore', + 'restore_abs_path': self.dest_tree.path, + 'backup_name': copy(backup_args['backup_name']), + 'storage': 'local', + 'engine': 'rsync', + 'container': storage_dir.path + } + result = common.execute_freezerc(backup_args) + self.assertIsNotNone(result) + result = common.execute_freezerc(restore_args) + self.assertIsNotNone(result) + self.assertTreesMatch() diff --git a/freezer/tests/unit/engines/rsync/__init__.py b/freezer/tests/unit/engines/rsync/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/freezer/tests/unit/engines/rsync/test_pyrsync.py b/freezer/tests/unit/engines/rsync/test_pyrsync.py new file mode 100644 index 00000000..b8bbb8ed --- /dev/null +++ b/freezer/tests/unit/engines/rsync/test_pyrsync.py @@ -0,0 +1,56 @@ +# (C) Copyright 2016 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import unittest + +import six + +from freezer.engine.rsync import pyrsync + + +class TestPyrsync(unittest.TestCase): + def test_blockcheksum(self): + instream = six.BytesIO(b'aae9dd83aa45f906' + b'a4629f42e97eac99' + b'b9882284dc7030ca' + b'427ad365fedd2a55') + weak, strong = pyrsync.blockchecksums(instream, 16) + exp_weak = [736756931, 616825970, 577963056, 633341072] + exp_strong = ['0f923c37c14f648de4065d4666c2429231a923bc', + '9f043572d40922cc45545bd6ec8a650ca095ab84', + '3a0c39d59a6f49975c2be24bc6b37d80a6680dce', + '81487d7e87190cfbbf4f74acc40094c0a6f6ce8a'] + self.assertEqual((weak, strong), (exp_weak, exp_strong)) + + def test_rsyncdelta(self): + datastream = six.BytesIO(b'addc830058f917ae' + b'a1be5ab4d899b570' + b'85c9534c64d8d71c' + b'1f32cde9c71e5b6d') + + old_weak = [675087508, 698025105, 579470394, 667092162] + old_strong = ['e72251cb70a1b918ee43876896ebb4c8a7225f78', + '3bf6d2483425e8925df06c01ee490e386a9a707a', + '0ba97d95cc49b1ee2863b7dec3d49911502111c2', + '8b92d9f3f6679e1c8ce2f20e2a6217fd7f351f8f'] + + changed_indexes = [] + cur_index = 0 + for block_index in pyrsync.rsyncdelta(datastream, + (old_weak, old_strong), 16): + if not isinstance(block_index, int): + changed_indexes.append(cur_index) + cur_index += 1 + exp_changed_indexes = [0, 2] + self.assertEqual(changed_indexes[:-1], exp_changed_indexes) diff --git a/freezer/utils/compress.py b/freezer/utils/compress.py new file mode 100644 index 00000000..5d0e9654 --- /dev/null +++ b/freezer/utils/compress.py @@ -0,0 +1,110 @@ +# (C) Copyright 2016 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +GZIP = 'zlib' +BZIP2 = 'bz2' +XZ = 'lzma' + +COMPRESS_METHOD = 'compress' +DECOMPRESS_METHOD = 'decompress' + + +def get_compression_algo(compression_algo): + algo = { + 'gzip': GZIP, + 'bzip2': BZIP2, + 'xz': XZ, + } + return algo.get(compression_algo) + + +def one_shot_compress(compression_algo, data): + compression_module = __import__(get_compression_algo(compression_algo)) + return getattr(compression_module, COMPRESS_METHOD)(data) + + +def one_shot_decompress(compression_algo, data): + compression_module = __import__(get_compression_algo(compression_algo)) + return getattr(compression_module, DECOMPRESS_METHOD)(data) + + +class BaseCompressor(object): + """ + Base class for compress/decompress activities. + """ + + def __init__(self, compression_algo): + # TODO(raliev): lzma module exists in stdlib since Py3 only + if compression_algo == 'xz': + raise NotImplementedError('XZ compression not implemented yet') + self.algo = get_compression_algo(compression_algo) + self.module = __import__(self.algo) + + +class Compressor(BaseCompressor): + """ + Compress chucks of data. 
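    Streaming usage sketch (chunk names are placeholders; any algorithm
    accepted by get_compression_algo works, e.g. 'gzip' or 'bzip2', while
    'xz' is rejected until the Python 3 lzma module can be used):

        compressor = Compressor('gzip')
        payload = compressor.compress(first_chunk)
        payload += compressor.compress(second_chunk)
        payload += compressor.flush()
        restored = Decompressor('gzip').decompress(payload)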
+ """ + + MAX_COMPRESS_LEVEL = 9 + + def __init__(self, compression_algo): + super(Compressor, self).__init__(compression_algo) + self.compressobj = self.create_compressobj(compression_algo) + + def create_compressobj(self, compression_algo): + def get_obj_name(): + names = { + 'gzip': 'compressobj', + 'bzip2': 'BZ2Compressor', + 'xz': 'compressobj', + } + return names.get(compression_algo) + + obj_name = get_obj_name() + return getattr(self.module, obj_name)(self.MAX_COMPRESS_LEVEL) + + def compress(self, data): + return self.compressobj.compress(data) + + def flush(self): + return self.compressobj.flush() + + +class Decompressor(BaseCompressor): + """ + Decompress chucks of data. + """ + + def __init__(self, compression_algo): + super(Decompressor, self).__init__(compression_algo) + self.decompressobj = self.create_decompressobj(compression_algo) + + def create_decompressobj(self, compression_algo): + def get_obj_name(): + names = { + 'gzip': 'decompressobj', + 'bzip2': 'BZ2Decompressor', + 'xz': 'decompressobj', + } + return names.get(compression_algo) + + obj_name = get_obj_name() + return getattr(self.module, obj_name)() + + def decompress(self, data): + return self.decompressobj.decompress(data) + + def flush(self): + return self.decompressobj.flush() diff --git a/freezer/utils/config.py b/freezer/utils/config.py index 170de520..6b26898b 100644 --- a/freezer/utils/config.py +++ b/freezer/utils/config.py @@ -12,20 +12,24 @@ # See the License for the specific language governing permissions and # limitations under the License. - import os import re -import six - -from six.moves import configparser from oslo_log import log +import six +from six.moves import configparser from freezer.utils import utils LOG = log.getLogger(__name__) +EXPORT = re.compile(r"^\s*export\s+([^=^#^\s]+)\s*=\s*([^#^\n]*)\s*$", + re.MULTILINE) + +INI = re.compile(r"^\s*([^=#\s]+)\s*=[\t]*([^#\n]*)\s*$", re.MULTILINE) + + class Config(object): @staticmethod @@ -70,12 +74,6 @@ class Config(object): self.storages = storages -EXPORT = re.compile(r"^\s*export\s+([^=^#^\s]+)\s*=\s*([^#^\n]*)\s*$", - re.MULTILINE) - -INI = re.compile(r"^\s*([^=#\s]+)\s*=[\t]*([^#\n]*)\s*$", re.MULTILINE) - - def osrc_parse(lines): """ :param lines: diff --git a/freezer/utils/crypt.py b/freezer/utils/crypt.py new file mode 100644 index 00000000..6c54a230 --- /dev/null +++ b/freezer/utils/crypt.py @@ -0,0 +1,92 @@ +# (C) Copyright 2016 Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import hashlib + +from Crypto.Cipher import AES +from Crypto import Random + + +class AESCipher(object): + """ + Base class for encrypt/decrypt activities. 
+ """ + + SALT_HEADER = 'Salted__' + AES256_KEY_LENGTH = 32 + BS = AES.block_size + + def __init__(self, pass_file): + self._password = self._get_pass_from_file(pass_file) + self._salt = None + self.cipher = None + + @staticmethod + def _get_pass_from_file(pass_file): + with open(pass_file) as p_file: + password = p_file.readline() + return password + + @staticmethod + def _derive_key_and_iv(password, salt, key_length, iv_length): + d = d_i = b'' + while len(d) < key_length + iv_length: + md5_str = d_i + password + salt + d_i = hashlib.md5(md5_str).digest() + d += d_i + return d[:key_length], d[key_length:key_length + iv_length] + + +class AESEncrypt(AESCipher): + """ + Encrypts chucks of data using AES-256 algorithm. + OpenSSL compatible. + """ + + def __init__(self, pass_file): + super(AESEncrypt, self).__init__(pass_file) + self._salt = Random.new().read(self.BS - len(self.SALT_HEADER)) + key, iv = self._derive_key_and_iv(self._password, + self._salt, + self.AES256_KEY_LENGTH, + self.BS) + self.cipher = AES.new(key, AES.MODE_CFB, iv) + + def generate_header(self): + return self.SALT_HEADER + self._salt + + def encrypt(self, data): + return self.cipher.encrypt(data) + + +class AESDecrypt(AESCipher): + """ + Decrypts chucks of data using AES-256 algorithm. + OpenSSL compatible. + """ + + def __init__(self, pass_file, salt): + super(AESDecrypt, self).__init__(pass_file) + self._salt = self._prepare_salt(salt) + key, iv = self._derive_key_and_iv(self._password, + self._salt, + self.AES256_KEY_LENGTH, + self.BS) + self.cipher = AES.new(key, AES.MODE_CFB, iv) + + def _prepare_salt(self, salt): + return salt[len(self.SALT_HEADER):] + + def decrypt(self, data): + return self.cipher.decrypt(data) diff --git a/freezer/utils/streaming.py b/freezer/utils/streaming.py index 6e4cd4a9..4cc9139f 100644 --- a/freezer/utils/streaming.py +++ b/freezer/utils/streaming.py @@ -15,10 +15,12 @@ limitations under the License. Freezer general utils functions """ + +import threading + from oslo_log import log from six.moves import queue -import threading LOG = log.getLogger(__name__) diff --git a/requirements.txt b/requirements.txt index a329be04..09668cc7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -16,6 +16,7 @@ oslo.log>=3.11.0 # Apache-2.0 oslo.config!=3.18.0,>=3.14.0 # Apache-2.0 keystoneauth1>=2.17.0 # Apache-2.0 +pycrypto>=2.6 # Public Domain PyMySQL>=0.7.6 # MIT License pymongo!=3.1,>=3.0.2 # Apache-2.0 paramiko>=2.0 # LGPLv2.1+