Add unittest for swift.py and bug fix

Fixed a bug when parsing backup names. The issue
arised after the freezer_ prefix was introduced.

Added unittests for freezer/swift.py

In tox.ini now pytest doesn't recurse on .venv dir.

Change-Id: Ie29481e525efafcb4420f442174932f058e71d3f
LAUNCHPAD: https://bugs.launchpad.net/freezer/+bug/1412016
LAUNCHPAD: https://blueprints.launchpad.net/freezer/+spec/unittest-swift
This commit is contained in:
Fausto Marzi 2015-01-17 17:30:53 +00:00
parent 47247f5702
commit 8266162ccc
7 changed files with 543 additions and 67 deletions

View File

@ -39,7 +39,7 @@ def backup_arguments():
help=( help=(
"Set the action to be taken. backup and restore are" "Set the action to be taken. backup and restore are"
" self explanatory, info is used to retrieve info from the" " self explanatory, info is used to retrieve info from the"
" storage media, while maintenance is used to delete old backups" " storage media, while admin is used to delete old backups"
" and other admin actions. Default backup."), " and other admin actions. Default backup."),
dest='action', default='backup') dest='action', default='backup')
arg_parser.add_argument( arg_parser.add_argument(

View File

@ -128,6 +128,7 @@ def freezer_main(backup_args):
# Admin tasks code should go here, before moving it on a dedicated module # Admin tasks code should go here, before moving it on a dedicated module
if backup_args.action == 'admin' or backup_args.remove_older_than: if backup_args.action == 'admin' or backup_args.remove_older_than:
# Remove backups older if set. # Remove backups older if set.
backup_args = get_container_content(backup_args)
remove_obj_older_than(backup_args) remove_obj_older_than(backup_args)
# Compute elapsed time # Compute elapsed time

View File

@ -95,8 +95,9 @@ def show_objects(backup_opt_dict):
backup_opt_dict.remote_obj_list] backup_opt_dict.remote_obj_list]
if not validate_all_args(required_list): if not validate_all_args(required_list):
logging.critical('[*] Error: Remote Object list not avaiblale') err_msg = '[*] Error: Remote Object list not avaiblale'
raise Exception logging.exception(err_msg)
raise Exception(err_msg)
ordered_objects = {} ordered_objects = {}
remote_obj = backup_opt_dict.remote_obj_list remote_obj = backup_opt_dict.remote_obj_list
@ -125,8 +126,8 @@ def remove_obj_older_than(backup_opt_dict):
float(backup_opt_dict.remove_older_than)) float(backup_opt_dict.remove_older_than))
logging.info('[*] Removing object older {0} day(s)'.format( logging.info('[*] Removing object older {0} day(s)'.format(
backup_opt_dict.remove_older_than)) backup_opt_dict.remove_older_than))
# Compute the amount of seconds from days to compare with # Compute the amount of seconds from the number of days provided by
# the remote backup timestamp # remove_older_than and compare it with the remote backup timestamp
max_time = backup_opt_dict.remove_older_than * 86400 max_time = backup_opt_dict.remove_older_than * 86400
current_timestamp = backup_opt_dict.time_stamp current_timestamp = backup_opt_dict.time_stamp
backup_name = backup_opt_dict.backup_name backup_name = backup_opt_dict.backup_name
@ -134,29 +135,73 @@ def remove_obj_older_than(backup_opt_dict):
backup_opt_dict = get_match_backup(backup_opt_dict) backup_opt_dict = get_match_backup(backup_opt_dict)
sorted_remote_list = sort_backup_list(backup_opt_dict) sorted_remote_list = sort_backup_list(backup_opt_dict)
sw_connector = backup_opt_dict.sw_connector sw_connector = backup_opt_dict.sw_connector
level_0_flag = None
tar_meta_0_flag = None
for match_object in sorted_remote_list: for match_object in sorted_remote_list:
obj_name_match = re.search(r'{0}_({1})_(\d+)_\d+?$'.format( obj_name_match = re.search(r'{0}_({1})_(\d+)_(\d+?)$'.format(
hostname, backup_name), match_object, re.I) hostname, backup_name), match_object, re.I)
if not obj_name_match:
continue if obj_name_match:
remote_obj_timestamp = int(obj_name_match.group(2)) remote_obj_timestamp = int(obj_name_match.group(2))
time_delta = current_timestamp - remote_obj_timestamp time_delta = current_timestamp - remote_obj_timestamp
if time_delta > max_time:
logging.info('[*] Removing backup object: {0}'.format( # If the difference between current_timestamp and the backup
match_object)) # timestamp is smaller then max_time, then the backup is valid
sw_connector.delete_object( if time_delta > max_time:
backup_opt_dict.container, match_object)
# Try to remove also the corresponding tar_meta # If the time_delta is bigger then max_time, then we verify
# NEED TO BE IMPROVED! # if the level of the backup is 0. In case is not 0,
try: # the backup is not removed as is part of a backup where the
tar_match_object = 'tar_metadata_{0}'.format(match_object) # levels cross the max_time. In this case we don't remove the
sw_connector.delete_object( # backup till its level 0.
backup_opt_dict.container, tar_match_object) # Both tar_meta data and backup objects names are handled
logging.info( if match_object.startswith('tar_meta'):
'[*] Object tar meta data removed: {0}'.format( if tar_meta_0_flag is None:
tar_match_object)) if obj_name_match.group(3) is '0':
except Exception: tar_meta_0_flag = True
pass else:
continue
elif level_0_flag is None:
if obj_name_match.group(3) is '0':
level_0_flag = True
else:
continue
logging.info('[*] Removing backup object: {0}'.format(
match_object))
sleep_time = 120
retry_max_count = 60
curr_count = 0
while True:
try:
sw_connector.delete_object(
backup_opt_dict.container, match_object)
logging.info(
'[*] Remote object {0} removed'.format(
match_object))
break
except Exception as error:
curr_count += 1
time.sleep(sleep_time)
if curr_count >= retry_max_count:
err_msg = (
'[*] Remote Object {0} failed to be removed.'
' Retrying intent '
'{1} out of {2} totals'.format(
match_object, curr_count,
retry_max_count))
error_message = '[*] Error: {0}: {1}'.format(
err_msg, error)
logging.exception(error_message)
raise Exception(error_message)
else:
logging.warning(
('[*] Remote object {0} failed to be removed'
' Retrying intent n. '
'{1} out of {2} totals'.format(
match_object, curr_count,
retry_max_count)))
def get_container_content(backup_opt_dict): def get_container_content(backup_opt_dict):
@ -166,10 +211,9 @@ def get_container_content(backup_opt_dict):
""" """
if not backup_opt_dict.container: if not backup_opt_dict.container:
print '[*] Error: please provide a valid container name' err_msg = '[*] Error: please provide a valid container name'
logging.critical( logging.exception(err_msg)
'[*] Error: please provide a valid container name') raise Exception(err_msg)
raise Exception
sw_connector = backup_opt_dict.sw_connector sw_connector = backup_opt_dict.sw_connector
try: try:
@ -177,8 +221,9 @@ def get_container_content(backup_opt_dict):
sw_connector.get_container(backup_opt_dict.container)[1] sw_connector.get_container(backup_opt_dict.container)[1]
return backup_opt_dict return backup_opt_dict
except Exception as error: except Exception as error:
logging.critical('[*] Error: get_object_list: {0}'.format(error)) err_msg = '[*] Error: get_object_list: {0}'.format(error)
raise Exception logging.exception(err_msg)
raise Exception(err_msg)
def check_container_existance(backup_opt_dict): def check_container_existance(backup_opt_dict):
@ -195,9 +240,11 @@ def check_container_existance(backup_opt_dict):
backup_opt_dict.container] backup_opt_dict.container]
if not validate_all_args(required_list): if not validate_all_args(required_list):
logging.critical("[*] Error: please provide ALL the following args \ err_msg = ('[*] Error: please provide the following arg: '
{0}".format(','.join(required_list))) '--container')
raise Exception logging.exception(err_msg)
raise Exception(err_msg)
logging.info( logging.info(
"[*] Retrieving container {0}".format(backup_opt_dict.container)) "[*] Retrieving container {0}".format(backup_opt_dict.container))
sw_connector = backup_opt_dict.sw_connector sw_connector = backup_opt_dict.sw_connector
@ -260,6 +307,7 @@ def get_client(backup_opt_dict):
authurl=options['auth_url'], authurl=options['auth_url'],
user=options['username'], key=options['password'], os_options=options, user=options['username'], key=options['password'], os_options=options,
tenant_name=options['tenant_name'], auth_version='2', retries=6) tenant_name=options['tenant_name'], auth_version='2', retries=6)
return backup_opt_dict return backup_opt_dict
@ -270,8 +318,9 @@ def manifest_upload(
""" """
if not manifest_meta_dict: if not manifest_meta_dict:
logging.critical('[*] Error Manifest Meta dictionary not available') err_msg = '[*] Error Manifest Meta dictionary not available'
raise Exception logging.exception(err_msg)
raise Exception(err_msg)
sw_connector = backup_opt_dict.sw_connector sw_connector = backup_opt_dict.sw_connector
tmp_manifest_meta = dict() tmp_manifest_meta = dict()
@ -296,14 +345,16 @@ def add_object(
""" """
if not backup_opt_dict.container: if not backup_opt_dict.container:
logging.critical('[*] Error: Please specify the container \ err_msg = ('[*] Error: Please specify the container '
name with -C option') 'name with -C or --container option')
raise Exception logging.exception(err_msg)
raise Exception(err_msg)
if absolute_file_path is None and backup_queue is None: if absolute_file_path is None and backup_queue is None:
logging.critical('[*] Error: Please specify the file you want to \ err_msg = ('[*] Error: Please specify the file or fs path '
upload on swift with -d option') 'you want to upload on swift with -d or --dst-file')
raise Exception logging.exception(err_msg)
raise Exception(err_msg)
sw_connector = backup_opt_dict.sw_connector sw_connector = backup_opt_dict.sw_connector
while True: while True:
@ -354,8 +405,9 @@ def get_containers_list(backup_opt_dict):
backup_opt_dict.containers_list = sw_connector.get_account()[1] backup_opt_dict.containers_list = sw_connector.get_account()[1]
return backup_opt_dict return backup_opt_dict
except Exception as error: except Exception as error:
logging.error('[*] Get containers list error: {0}').format(error) err_msg = '[*] Get containers list error: {0}'.format(error)
raise Exception logging.exception(err_msg)
raise Exception(err_msg)
def object_to_file(backup_opt_dict, file_name_abs_path): def object_to_file(backup_opt_dict, file_name_abs_path):
@ -369,9 +421,10 @@ def object_to_file(backup_opt_dict, file_name_abs_path):
file_name_abs_path] file_name_abs_path]
if not validate_all_args(required_list): if not validate_all_args(required_list):
logging.critical('[*] Error: Please provide ALL the following \ err_msg = ('[*] Error in object_to_file(): Please provide ALL the '
arguments: {0}'.format(','.join(required_list))) 'following arguments: --container file_name_abs_path')
raise ValueError logging.exception(err_msg)
raise ValueError(err_msg)
sw_connector = backup_opt_dict.sw_connector sw_connector = backup_opt_dict.sw_connector
file_name = file_name_abs_path.split('/')[-1] file_name = file_name_abs_path.split('/')[-1]
@ -403,9 +456,10 @@ def object_to_stream(backup_opt_dict, write_pipe, read_pipe, obj_name):
backup_opt_dict.container] backup_opt_dict.container]
if not validate_all_args(required_list): if not validate_all_args(required_list):
logging.critical('[*] Error: Please provide ALL the following \ err_msg = ('[*] Error in object_to_stream(): Please provide ALL the '
arguments: {0}'.format(','.join(required_list))) 'following argument: --container')
raise ValueError logging.exception(err_msg)
raise ValueError(err_msg)
backup_opt_dict = get_client(backup_opt_dict) backup_opt_dict = get_client(backup_opt_dict)
logging.info('[*] Downloading data stream...') logging.info('[*] Downloading data stream...')

View File

@ -161,7 +161,7 @@ def sort_backup_list(backup_opt_dict):
# Remove duplicates objects # Remove duplicates objects
sorted_backups_list = list(set(backup_opt_dict.remote_match_backup)) sorted_backups_list = list(set(backup_opt_dict.remote_match_backup))
sorted_backups_list.sort(key=lambda x: x.split('_')[2], reverse=True) sorted_backups_list.sort(key=lambda x: x.rsplit('_', 2)[1], reverse=True)
return sorted_backups_list return sorted_backups_list

View File

@ -10,7 +10,7 @@ import os
import MySQLdb import MySQLdb
import pymongo import pymongo
import re import re
from collections import OrderedDict
import __builtin__ import __builtin__
os.environ['OS_REGION_NAME'] = 'testregion' os.environ['OS_REGION_NAME'] = 'testregion'
@ -21,6 +21,15 @@ os.environ['OS_USERNAME'] = 'testusername'
os.environ['OS_TENANT_NAME'] = 'testtenantename' os.environ['OS_TENANT_NAME'] = 'testtenantename'
class FakeTime:
def __init__(self):
return None
def sleep(self, *args):
return True
class FakeValidate: class FakeValidate:
def __init__(self): def __init__(self):
@ -194,14 +203,15 @@ class FakeMultiProcessing:
return True return True
def get(self, opt1=dict()): def get(self, opt1=dict()):
return True
return {'item': 'test-item-value'}
def __call__(self, duplex=True): def __call__(self, duplex=True):
return [] return []
class Pipe: class Pipe:
#def __init__(self, duplex=True): def __init__(self, duplex=True):
# return None return None
def send_bytes(self, opt1=True): def send_bytes(self, opt1=True):
return True return True
@ -245,6 +255,70 @@ class FakeMultiProcessing:
return True return True
class FakeMultiProcessing1:
def __init__(self, duplex=True, maxsize=True):
return None
class Queue:
def __init__(self, duplex=True):
return None
def put(self, opt1=dict()):
return False
def get(self, opt1=dict()):
return {'item': 'test-item-value'}
def __call__(self, duplex=True):
return []
class Pipe:
def __init__(self, duplex=True):
return None
def send_bytes(self, opt1=True):
return False
def recv_bytes(self, opt1=True):
raise EOFError
def send(self, opt1=True):
return False
def recv(self, opt1=True):
raise EOFError
def poll(self):
return False
def close(self):
return False
def __call__(self, duplex=True):
return [self, self]
class Process:
def __init__(self, target=True, args=True):
return None
def start(self):
return True
def stop(self):
return True
def daemon(self):
return True
def join(self):
return True
@classmethod
def util(cls):
return True
class FakeSubProcess: class FakeSubProcess:
def __init__(self, opt1=True, stdin=True, stdout=True, def __init__(self, opt1=True, stdin=True, stdout=True,
stderr=True, shell=True, executable=True, env={}, stderr=True, shell=True, executable=True, env={},
@ -415,6 +489,53 @@ class FakeSwiftClient:
def head_object(self, opt1=True, opt2=True): def head_object(self, opt1=True, opt2=True):
return True return True
def put_container(self, container=True):
return True
def delete_object(self, *args, **kwargs):
return True
def get_container(self, *args, **kwargs):
return [True, True]
def get_account(self, *args, **kwargs):
return True, [{'name': 'test-container'}, {'name': 'test-container-segments'}]
def get_object(self, *args, **kwargs):
return ['abcdef', 'hijlmno']
class FakeSwiftClient1:
def __init__(self):
return None
class client:
def __init__(self):
return None
class Connection:
def __init__(self, key=True, os_options=True, auth_version=True, user=True, authurl=True, tenant_name=True, retries=True):
return None
def put_object(self, opt1=True, opt2=True, opt3=True, opt4=True, opt5=True, headers=True, content_length=True, content_type=True):
raise Exception
def head_object(self, opt1=True, opt2=True):
raise Exception
def put_container(self, container=True):
raise Exception
def delete_object(self):
raise Exception
def get_container(self, *args, **kwargs):
raise Exception
def get_account(self, *args, **kwargs):
raise Exception
class FakeRe: class FakeRe:
@ -486,7 +607,7 @@ class BackupOpt1:
self.remove_older_than = '0' self.remove_older_than = '0'
self.max_seg_size = '0' self.max_seg_size = '0'
self.time_stamp = 123456789 self.time_stamp = 123456789
self.container_segments = 'test-container-segements' self.container_segments = 'test-container-segments'
self.container = 'test-container' self.container = 'test-container'
self.workdir = '/tmp' self.workdir = '/tmp'
self.upload = 'true' self.upload = 'true'
@ -496,19 +617,33 @@ class BackupOpt1:
self.always_backup_level = '20' self.always_backup_level = '20'
self.remove_older_than = '20' self.remove_older_than = '20'
self.restart_always_backup = 100000 self.restart_always_backup = 100000
self.container_segments = 'testcontainerseg'
self.remote_match_backup = [ self.remote_match_backup = [
'test-hostname_test-backup-name_1234567_0', 'test-hostname_test-backup-name_1234567_0',
'test-hostname_test-backup-name_1234567_1', 'test-hostname_test-backup-name_aaaaa__a',
'test-hostname_test-backup-name_1234567_2'] 'test-hostname_test-backup-name_9999999999999999999999999999999_0',
'test-hostname_test-backup-name_1234568_1',
'test-hostname_test-backup-name_1234569_2',
'tar_meta_test-hostname_test-backup-name_1234569_2',
'tar_meta_test-hostname_test-backup-name_1234568_1',
'tar_meta_test-hostname_test-backup-name_1234567_0']
self.remote_obj_list = [ self.remote_obj_list = [
{'name' : 'test-hostname_test-backup-name_1234567_0'}, {'name': 'test-hostname_test-backup-name_1234567_0',
{'name' : 'test-hostname_test-backup-name_1234567_1'}, 'last_modified': 'testdate'},
{'name' : 'test-hostname_test-backup-name_1234567_2'}, {'name': 'test-hostname_test-backup-name_1234567_1',
{'fakename' : 'test-hostname_test-backup-name_1234567_2'}, 'last_modified': 'testdate'},
{'name' : 'test-hostname-test-backup-name-asdfa-asdfasdf'}] {'name': 'test-hostname_test-backup-name_1234567_2',
'last_modified': 'testdate'},
#{'name': 'test-hostname_test-backup-name_1234567_2',
# 'last_modified': 'testdate'},
{'name': 'test-hostname-test-backup-name-asdfa-asdfasdf',
'last_modified': 'testdate'}]
self.remote_objects = [] self.remote_objects = []
self.restore_abs_path = '/tmp' self.restore_abs_path = '/tmp'
self.containers_list = [
{'name' : 'testcontainer1', 'bytes' : 123423, 'count' : 10}
]
self.list_container = True
self.list_objects = True
class FakeMySQLdb: class FakeMySQLdb:
@ -682,7 +817,6 @@ class Fake_get_vol_fs_type:
def __init__(self): def __init__(self):
return None return None
@classmethod @classmethod
def get_vol_fs_type1(self, opt1=True): def get_vol_fs_type1(self, opt1=True):
return 'xfs' return 'xfs'

287
tests/test_swift.py Normal file
View File

@ -0,0 +1,287 @@
"""Freezer swift.py related tests
Copyright 2014 Hewlett-Packard
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
This product includes cryptographic software written by Eric Young
(eay@cryptsoft.com). This product includes software written by Tim
Hudson (tjh@cryptsoft.com).
========================================================================
"""
from commons import *
from freezer.swift import (create_containers, show_containers,
show_objects, remove_obj_older_than, get_container_content,
check_container_existance, get_swift_os_env,
get_client, manifest_upload, add_object, get_containers_list,
object_to_file, object_to_stream)
import os
import logging
import subprocess
import pytest
import time
class TestSwift:
def test_create_containers(self, monkeypatch):
backup_opt = BackupOpt1()
fakelogging = FakeLogging()
monkeypatch.setattr(logging, 'critical', fakelogging.critical)
monkeypatch.setattr(logging, 'warning', fakelogging.warning)
monkeypatch.setattr(logging, 'exception', fakelogging.exception)
monkeypatch.setattr(logging, 'error', fakelogging.error)
assert create_containers(backup_opt) is True
def test_show_containers(self, monkeypatch):
backup_opt = BackupOpt1()
fakelogging = FakeLogging()
monkeypatch.setattr(logging, 'critical', fakelogging.critical)
monkeypatch.setattr(logging, 'warning', fakelogging.warning)
monkeypatch.setattr(logging, 'exception', fakelogging.exception)
monkeypatch.setattr(logging, 'error', fakelogging.error)
assert show_containers(backup_opt) is True
backup_opt.__dict__['list_container'] = False
assert show_containers(backup_opt) is False
def test_show_objects(self, monkeypatch):
backup_opt = BackupOpt1()
fakelogging = FakeLogging()
monkeypatch.setattr(logging, 'critical', fakelogging.critical)
monkeypatch.setattr(logging, 'warning', fakelogging.warning)
monkeypatch.setattr(logging, 'exception', fakelogging.exception)
monkeypatch.setattr(logging, 'error', fakelogging.error)
assert show_objects(backup_opt) is True
backup_opt.__dict__['remote_obj_list'] = None
pytest.raises(Exception, show_objects, backup_opt)
backup_opt.__dict__['list_objects'] = False
assert show_objects(backup_opt) is False
def test_remove_obj_older_than(self, monkeypatch):
backup_opt = BackupOpt1()
fakelogging = FakeLogging()
fakeclient = FakeSwiftClient1()
fakeconnector = fakeclient.client()
fakeswclient = fakeconnector.Connection()
backup_opt.sw_connector = fakeswclient
faketime = FakeTime()
monkeypatch.setattr(logging, 'critical', fakelogging.critical)
monkeypatch.setattr(logging, 'warning', fakelogging.warning)
monkeypatch.setattr(logging, 'exception', fakelogging.exception)
monkeypatch.setattr(logging, 'error', fakelogging.error)
monkeypatch.setattr(time, 'sleep', faketime.sleep)
pytest.raises(Exception, remove_obj_older_than, backup_opt)
backup_opt.__dict__['remove_older_than'] = False
assert remove_obj_older_than(backup_opt) is False
backup_opt = BackupOpt1()
assert remove_obj_older_than(backup_opt) is None
def test_get_container_content(self, monkeypatch):
backup_opt = BackupOpt1()
fakelogging = FakeLogging()
monkeypatch.setattr(logging, 'critical', fakelogging.critical)
monkeypatch.setattr(logging, 'warning', fakelogging.warning)
monkeypatch.setattr(logging, 'exception', fakelogging.exception)
monkeypatch.setattr(logging, 'error', fakelogging.error)
assert get_container_content(backup_opt) is not False
assert get_container_content(backup_opt) is not None
backup_opt = BackupOpt1()
backup_opt.container = False
pytest.raises(Exception, get_container_content, backup_opt)
fakeclient = FakeSwiftClient1()
fakeconnector = fakeclient.client()
fakeswclient = fakeconnector.Connection()
backup_opt = BackupOpt1()
backup_opt.sw_connector = fakeswclient
pytest.raises(Exception, get_container_content, backup_opt)
def test_check_container_existance(self, monkeypatch):
backup_opt = BackupOpt1()
fakelogging = FakeLogging()
monkeypatch.setattr(logging, 'critical', fakelogging.critical)
monkeypatch.setattr(logging, 'warning', fakelogging.warning)
monkeypatch.setattr(logging, 'exception', fakelogging.exception)
monkeypatch.setattr(logging, 'error', fakelogging.error)
assert type(check_container_existance(backup_opt)) is dict
backup_opt = BackupOpt1()
backup_opt.container_segments = None
pytest.raises(Exception, check_container_existance, backup_opt)
backup_opt = BackupOpt1()
backup_opt.container = 'test-abcd'
backup_opt.container_segments = 'test-abcd-segments'
assert type(check_container_existance(backup_opt)) is dict
def test_get_client(self, monkeypatch):
backup_opt = BackupOpt1()
fakelogging = FakeLogging()
monkeypatch.setattr(logging, 'critical', fakelogging.critical)
monkeypatch.setattr(logging, 'warning', fakelogging.warning)
monkeypatch.setattr(logging, 'exception', fakelogging.exception)
monkeypatch.setattr(logging, 'error', fakelogging.error)
assert isinstance(get_client(backup_opt), BackupOpt1) is True
def test_manifest_upload(self, monkeypatch):
backup_opt = BackupOpt1()
fakelogging = FakeLogging()
monkeypatch.setattr(logging, 'critical', fakelogging.critical)
monkeypatch.setattr(logging, 'warning', fakelogging.warning)
monkeypatch.setattr(logging, 'exception', fakelogging.exception)
monkeypatch.setattr(logging, 'error', fakelogging.error)
manifest_file = 'test-manifest-file'
file_prefix = '000000'
manifest_meta_dict = {'x-object-manifest': 'test-x-object'}
assert manifest_upload(
manifest_file, backup_opt,
file_prefix, manifest_meta_dict) is None
manifest_meta_dict = {}
pytest.raises(
Exception, manifest_upload, manifest_file, backup_opt,
file_prefix, manifest_meta_dict)
def test_add_object(self, monkeypatch):
backup_opt = BackupOpt1()
fakelogging = FakeLogging()
monkeypatch.setattr(logging, 'critical', fakelogging.critical)
monkeypatch.setattr(logging, 'warning', fakelogging.warning)
monkeypatch.setattr(logging, 'exception', fakelogging.exception)
monkeypatch.setattr(logging, 'error', fakelogging.error)
fakemultiprocessing = FakeMultiProcessing()
backup_queue = fakemultiprocessing.Queue()
time_stamp = int(time.time())
faketime = FakeTime()
monkeypatch.setattr(time, 'sleep', faketime.sleep)
absolute_file_path = '/tmp/test-abs-file-path'
backup_opt = BackupOpt1()
backup_opt.container = None
pytest.raises(Exception, add_object, backup_opt, backup_queue,
absolute_file_path, time_stamp)
fakeclient = FakeSwiftClient1()
fakeconnector = fakeclient.client()
fakeswclient = fakeconnector.Connection()
backup_opt = BackupOpt1()
backup_opt.sw_connector = fakeswclient
pytest.raises(Exception, add_object, backup_opt, backup_queue,
absolute_file_path, time_stamp)
backup_opt = BackupOpt1()
absolute_file_path = None
backup_queue = None
pytest.raises(Exception, add_object, backup_opt, backup_queue,
absolute_file_path, time_stamp)
def test_get_containers_list(self, monkeypatch):
backup_opt = BackupOpt1()
fakelogging = FakeLogging()
monkeypatch.setattr(logging, 'critical', fakelogging.critical)
monkeypatch.setattr(logging, 'warning', fakelogging.warning)
monkeypatch.setattr(logging, 'exception', fakelogging.exception)
monkeypatch.setattr(logging, 'error', fakelogging.error)
assert isinstance(get_containers_list(backup_opt), BackupOpt1) is True
fakeclient = FakeSwiftClient1()
fakeconnector = fakeclient.client()
fakeswclient = fakeconnector.Connection()
backup_opt = BackupOpt1()
backup_opt.sw_connector = fakeswclient
pytest.raises(Exception, get_containers_list, backup_opt)
def test_object_to_file(self, monkeypatch):
backup_opt = BackupOpt1()
fakelogging = FakeLogging()
monkeypatch.setattr(logging, 'critical', fakelogging.critical)
monkeypatch.setattr(logging, 'warning', fakelogging.warning)
monkeypatch.setattr(logging, 'exception', fakelogging.exception)
monkeypatch.setattr(logging, 'error', fakelogging.error)
file_name_abs_path = '/tmp/test-abs-file-path'
assert object_to_file(backup_opt, file_name_abs_path) is True
backup_opt = BackupOpt1()
backup_opt.container = None
pytest.raises(Exception, object_to_file, backup_opt, file_name_abs_path)
os.unlink(file_name_abs_path)
def test_object_to_stream(self, monkeypatch):
backup_opt = BackupOpt1()
fakelogging = FakeLogging()
fakeclient = FakeSwiftClient()
fakeconnector = fakeclient.client
monkeypatch.setattr(logging, 'critical', fakelogging.critical)
monkeypatch.setattr(logging, 'warning', fakelogging.warning)
monkeypatch.setattr(logging, 'exception', fakelogging.exception)
monkeypatch.setattr(logging, 'error', fakelogging.error)
monkeypatch.setattr(swiftclient, 'client', fakeconnector)
obj_name = 'test-obj-name'
fakemultiprocessing = FakeMultiProcessing1()
backup_pipe_read = backup_pipe_write = fakemultiprocessing.Pipe()
backup_opt.container = None
pytest.raises(Exception, object_to_stream,
backup_opt, backup_pipe_write, backup_pipe_read, obj_name)
backup_opt = BackupOpt1()
assert object_to_stream(
backup_opt, backup_pipe_write, backup_pipe_read, obj_name) is None

View File

@ -17,7 +17,7 @@ commands = python runtests.py -v -n 2 --cov-report term-missing --cov freezer
[pytest] [pytest]
python_files = test_*.py python_files = test_*.py
norecursedirs = .tox norecursedirs = .tox .venv
[testenv:pep8] [testenv:pep8]
commands = flake8 freezer commands = flake8 freezer