Windows support for freezer

Backups for SQL Server, MySQL, MongoDB and filesystem on windows using
shadow copies (vssadmin), tar compression and encryption using openssl and tar..

Change-Id: I1f68e012f28891c19e4d94352511ec968382c8e2
Implements: blueprint add-windows-support-for-backup-and-restore
This commit is contained in:
Memo Garcia 2015-04-02 10:09:18 +01:00
parent e4238272c5
commit 2be468138f
19 changed files with 953 additions and 183 deletions

View File

@ -17,6 +17,7 @@ Contributors
- Duncan Thomas
- Coleman Corrigan
- Guillermo Ramirez Garcia
- Zahari Zahariev
Credits

View File

@ -57,7 +57,7 @@ MySQL backup::
Freezer installation from Python package repo::
$ sudo pip install freezer
$ sudo pip install freezer
OR::
@ -90,6 +90,33 @@ These are just use case example using Swift in the HP Cloud.
freezer will execute a backup on point-in-time data. This avoid risks of
data inconsistencies and corruption.*
Windows
-------
*Note* Windows currently does not support incremental backups
Install Tar, OpenSSL, Gzip GNU binaries from http://gnuwin32.sourceforge.net/packages.html and add
GnuWin32\bin to Path:
e.g. C:\Program Files (x86)\GnuWin32\bin
Swift client and Keystone client:
> pip install python-swiftclient
> pip install python-keystoneclient
> pip install freezer
The basic Swift account configuration is needed to use freezer. Make sure python-swiftclient is installed.
set OS_REGION_NAME=region-a.geo-1
set OS_TENANT_ID=<account tenant>
set OS_PASSWORD=<account password>
set OS_AUTH_URL=https://region-a.geo-1.identity.hpcloudsvc.com:35357/v2.0
set OS_USERNAME=automationbackup
set OS_TENANT_NAME=automationbackup
Usage Example
=============
@ -105,6 +132,11 @@ The most simple backup execution is a direct file system backup::
$ sudo freezerc --file-to-backup /data/dir/to/backup
--container freezer_new-data-backup --backup-name my-backup-name
* On windows (need admin rights)*
> freezerc --action backup --mode fs --backup-name testwindows
--path-to-backup "C:\path\to\backup" --volume C:\ --container freezer_windows
--log-file C:\path\to\log\freezer.log
By default --mode fs is set. The command would generate a compressed tar
gzip file of the directory /data/dir/to/backup. The generated file will
be segmented in stream and uploaded in the swift container called
@ -154,7 +186,7 @@ example of the config::
$ sudo cat /root/.freezer/db.conf
host = your.mysql.host.ip
user = backup
user = backup
password = userpassword
Every listed option is mandatory. There's no need to stop the mysql
@ -162,7 +194,7 @@ service before the backup execution.
Execute a MySQL backup using lvm snapshot::
$ sudo freezerc --lvm-srcvol /dev/mysqlvg/mysqlvol
$ sudo freezerc --lvm-srcvol /dev/mysqlvg/mysqlvol
--lvm-dirmount /var/snapshot-backup
--lvm-volgroup mysqlvg --file-to-backup /var/snapshot-backup
--mysql-conf /root/.freezer/freezer-mysql.conf--container
@ -413,8 +445,8 @@ Available options::
The backup name you want to use to identify your
backup on Swift
-m MODE, --mode MODE Set the technology to back from. Options are, fs
(filesystem), mongo (MongoDB), mysql (MySQL). Default
set to fs
(filesystem), mongo (MongoDB), mysql (MySQL) sqlserver (SQL Server).
Default set to fs
-C CONTAINER, --container CONTAINER
The Swift container used to upload files to
-L, --list-containers
@ -489,7 +521,7 @@ Available options::
password, host. Following is an example of config
file: # cat ~/.freezer/backup_mysql_conf host = <db-
host> user = <mysqluser> password = <mysqlpass>
--log-file LOG_FILE Set log file. By default logs to /var/log/freezer.log
--log-file LOG_FILE Set log file. By default logs to ~/freezer.log
--exclude EXCLUDE Exclude files, given as a PATTERN.Ex: --exclude
'*.log' will exclude any file with name ending with
.log. Default no exclude
@ -524,4 +556,9 @@ Available options::
on Linux) and real-time for I/O. The process priority
will be set only if nice and ionice are installed
Default disabled. Use with caution.
-V, --version Print the release version and exit
-V, --version Print the release version and exit.
--volume Create a snapshot of the selected volume
--sql-server-conf Set the SQL Server configuration file where freezer retrieve
the sql server instance.
Following is an example of config file:
instance = <db-instance>

View File

@ -30,95 +30,12 @@ Freezer offer the following features:
[*] Flexible Incremental backup policy
'''
from freezer.main import freezer_main
from freezer.arguments import backup_arguments
from freezer.utils import create_dir
import os
import subprocess
import logging
import sys
# Initialize backup options
(backup_args, arg_parse) = backup_arguments()
if backup_args.version:
print "freezer version {0}".format(backup_args.__version__)
sys.exit(1)
if len(sys.argv) < 2:
arg_parse.print_help()
sys.exit(1)
def configure_log_file_using_defaults():
dry_run_message = ''
if backup_args.dry_run:
dry_run_message = '[DRY_RUN] '
def configure_logging(file_name):
expanded_file_name = os.path.expanduser(file_name)
expanded_dir_name = os.path.dirname(expanded_file_name)
create_dir(expanded_dir_name, do_log=False)
logging.basicConfig(
filename=expanded_file_name,
level=logging.INFO,
format=('%(asctime)s %(name)s %(levelname)s {0}%(message)s'.
format(dry_run_message)))
return expanded_file_name
if backup_args.log_file:
return configure_logging(backup_args.log_file)
for file_name in ['/var/log/freezer.log', '~/.freezer/freezer.log']:
try:
return configure_logging(file_name)
except IOError:
pass
raise Exception("Unable to write to log file")
def set_max_process_priority():
# children processes inherit niceness from father
try:
logging.warning(
'[*] Setting freezer execution with high CPU and I/O priority')
PID = os.getpid()
# Set cpu priority
os.nice(-19)
# Set I/O Priority to Real Time class with level 0
subprocess.call(
[u'{0}'.format(backup_args.ionice),
u'-c', u'1', u'-n', u'0', u'-t', u'-p', u'{0}'.format(PID)])
except Exception as priority_error:
logging.warning('[*] Priority: {0}'.format(priority_error))
def fail(exit_code, e, do_log=True):
msg = '[*] Critical Error: {0}\n'.format(e)
if not backup_args.quiet:
sys.stderr.write(msg)
if do_log:
logging.critical(msg)
sys.exit(exit_code)
from freezer.main import freezer_main, fail
if __name__ == '__main__':
try:
log_file_name = configure_log_file_using_defaults()
except Exception as err:
fail(1, err, do_log=False)
if not backup_args.quiet:
print 'log file at {0}'.format(log_file_name)
if backup_args.max_priority:
set_max_process_priority()
try:
freezer_main(backup_args)
freezer_main()
except ValueError as err:
fail(1, err)
except ImportError as err:

View File

@ -27,6 +27,11 @@ import argparse
import logging
import distutils.spawn as distspawn
import utils
import socket
from freezer.winutils import is_windows
from os.path import expanduser
home = expanduser("~")
def alter_proxy(args_dict):
@ -80,7 +85,8 @@ def backup_arguments(args_dict={}):
arg_parser.add_argument(
'-m', '--mode', action='store',
help="Set the technology to back from. Options are, fs (filesystem),\
mongo (MongoDB), mysql (MySQL). Default set to fs", dest='mode',
mongo (MongoDB), mysql (MySQL), sqlserver (SQL Server)\
Default set to fs", dest='mode',
default='fs')
arg_parser.add_argument(
'-C', '--container', action='store',
@ -193,16 +199,24 @@ def backup_arguments(args_dict={}):
password = <mysqlpass>
port = <db-port>''',
dest='mysql_conf_file', default=False)
arg_parser.add_argument(
'--log-file', action='store',
help='Set log file. By default logs to /var/log/freezer.log'
'If that file is not writable, freezer tries to log'
'to ~/.freezer/freezer.log',
dest='log_file', default=None)
if is_windows():
arg_parser.add_argument(
'--log-file', action='store',
help='Set log file. By default logs to ~/freezer.log',
dest='log_file', default=os.path.join(home,
'.freezer',
'freezer.log'))
else:
arg_parser.add_argument(
'--log-file', action='store',
help='Set log file. By default logs to /var/log/freezer.log'
'If that file is not writable, freezer tries to log'
'to ~/.freezer/freezer.log',
dest='log_file', default=None)
arg_parser.add_argument(
'--exclude', action='store', help="Exclude files,\
given as a PATTERN.Ex: --exclude '*.log' will exclude any file with \
name ending with .log. Default no exclude", dest='exclude',
given as a PATTERN.Ex: --exclude '*.log' will exclude any file \
with name ending with .log. Default no exclude", dest='exclude',
default=False)
arg_parser.add_argument(
'--dereference-symlink', choices=['none', 'soft', 'hard', 'all'],
@ -288,6 +302,17 @@ def backup_arguments(args_dict={}):
dest='download_limit',
type=utils.human2bytes,
default=-1)
arg_parser.add_argument(
'--sql-server-conf', action='store',
help='''Set the SQL Server configuration file where freezer retrieve
the sql server instance.
Following is an example of config file:
instance = <db-instance>''',
dest='sql_server_config', default=False)
arg_parser.add_argument(
'--volume', action='store',
help='Create a snapshot of the selected volume',
dest='volume', default=False)
backup_args = arg_parser.parse_args()
# Intercept command line arguments if you are not using the CLI
@ -299,7 +324,7 @@ def backup_arguments(args_dict={}):
backup_args.__dict__['remote_obj_list'] = []
backup_args.__dict__['remote_newest_backup'] = u''
# Set default workdir to ~/.freezer
backup_args.__dict__['workdir'] = os.path.expanduser(u'~/.freezer')
backup_args.__dict__['workdir'] = os.path.join(home, '.freezer')
# Create a new namespace attribute for container_segments
backup_args.__dict__['container_segments'] = u'{0}_segments'.format(
backup_args.container)
@ -317,11 +342,14 @@ def backup_arguments(args_dict={}):
# If hostname is not set, hostname of the current node will be used
if not backup_args.hostname:
backup_args.__dict__['hostname'] = os.uname()[1]
backup_args.__dict__['hostname'] = socket.gethostname()
backup_args.__dict__['manifest_meta_dict'] = {}
backup_args.__dict__['curr_backup_level'] = ''
backup_args.__dict__['manifest_meta_dict'] = ''
backup_args.__dict__['tar_path'] = distspawn.find_executable('tar')
if is_windows():
backup_args.__dict__['tar_path'] = 'tar'
else:
backup_args.__dict__['tar_path'] = distspawn.find_executable('tar')
# If freezer is being used under OSX, please install gnutar and
# rename the executable as gnutar
if 'darwin' in sys.platform or 'bsd' in sys.platform:
@ -342,7 +370,11 @@ def backup_arguments(args_dict={}):
backup_args.__dict__['lvremove_path'] = distspawn.find_executable(
'lvremove')
backup_args.__dict__['bash_path'] = distspawn.find_executable('bash')
backup_args.__dict__['openssl_path'] = distspawn.find_executable('openssl')
if is_windows():
backup_args.__dict__['openssl_path'] = 'openssl'
else:
backup_args.__dict__['openssl_path'] = \
distspawn.find_executable('openssl')
backup_args.__dict__['file_path'] = distspawn.find_executable('file')
backup_args.__dict__['mount_path'] = distspawn.find_executable('mount')
backup_args.__dict__['umount_path'] = distspawn.find_executable('umount')
@ -351,7 +383,19 @@ def backup_arguments(args_dict={}):
# MySQLdb object
backup_args.__dict__['mysql_db_inst'] = ''
# SQL Server object
backup_args.__dict__['sql_server_instance'] = ''
# Windows volume
backup_args.__dict__['shadow'] = ''
backup_args.__dict__['shadow_path'] = ''
backup_args.__dict__['file_name'] = ''
backup_args.__dict__['meta_data'] = {}
backup_args.__dict__['meta_data_file'] = ''
backup_args.__dict__['absolute_path'] = ''
# Freezer version
backup_args.__dict__['__version__'] = '1.1.2'
backup_args.__dict__['__version__'] = '1.1.3'
return backup_args, arg_parser

View File

@ -25,11 +25,44 @@ from freezer.lvm import lvm_snap, lvm_snap_remove, get_lvm_info
from freezer.tar import tar_backup, gen_tar_command
from freezer.swift import add_object, manifest_upload, get_client
from freezer.utils import gen_manifest_meta, add_host_name_ts_level
from freezer.vss import vss_create_shadow_copy
from freezer.vss import vss_delete_shadow_copy
from freezer.vss import start_sql_server
from freezer.vss import stop_sql_server
from freezer.winutils import use_shadow
from freezer.winutils import is_windows
import multiprocessing
import logging
import os
from os.path import expanduser
home = expanduser("~")
def backup_mode_sql_server(backup_opt_dict, time_stamp, manifest_meta_dict):
"""
Execute a SQL Server DB backup. Currently only backups with shadow
copy are supported. This mean, as soon as the shadow copy is created
the db writes will be blocked and a checkpoint will be created, as soon
as the backup finish the db will be unlocked and the backup will be
uploaded. A sql_server.conf_file is required for this operation.
"""
with open(backup_opt_dict.sql_server_config, 'r') as sql_conf_file_fd:
for line in sql_conf_file_fd:
if 'instance' in line:
db_instance = line.split('=')[1].strip()
backup_opt_dict.sql_server_instance = db_instance
continue
else:
raise Exception('Please indicate a valid SQL Server instance')
try:
stop_sql_server(backup_opt_dict)
backup_mode_fs(backup_opt_dict, time_stamp, manifest_meta_dict)
finally:
start_sql_server(backup_opt_dict)
def backup_mode_mysql(backup_opt_dict, time_stamp, manifest_meta_dict):
"""
@ -116,75 +149,104 @@ def backup_mode_fs(backup_opt_dict, time_stamp, manifest_meta_dict):
logging.info('[*] File System backup is being executed...')
# If lvm_auto_snap is true, the volume group and volume name will be
# extracted automatically
if backup_opt_dict.lvm_auto_snap:
backup_opt_dict = get_lvm_info(backup_opt_dict)
try:
# Generate the lvm_snap if lvm arguments are available
lvm_snap(backup_opt_dict)
if is_windows():
# Create a shadow copy.
# Create a shadow copy.
backup_opt_dict.shadow_path, backup_opt_dict.shadow = \
vss_create_shadow_copy(backup_opt_dict.volume)
# Generate a string hostname, backup name, timestamp and backup level
file_name = add_host_name_ts_level(backup_opt_dict, time_stamp)
meta_data_backup_file = u'tar_metadata_{0}'.format(file_name)
else:
# If lvm_auto_snap is true, the volume group and volume name will
# be extracted automatically
if backup_opt_dict.lvm_auto_snap:
backup_opt_dict = get_lvm_info(backup_opt_dict)
# Execute a tar gzip of the specified directory and return
# small chunks (default 128MB), timestamp, backup, filename,
# file chunk index and the tar meta-data file
(backup_opt_dict, tar_command, manifest_meta_dict) = gen_tar_command(
opt_dict=backup_opt_dict, time_stamp=time_stamp,
remote_manifest_meta=manifest_meta_dict)
# Initialize a Queue for a maximum of 2 items
tar_backup_queue = multiprocessing.Queue(maxsize=2)
tar_backup_stream = multiprocessing.Process(
target=tar_backup, args=(
backup_opt_dict, tar_command, tar_backup_queue,))
tar_backup_stream.daemon = True
tar_backup_stream.start()
# Generate the lvm_snap if lvm arguments are available
lvm_snap(backup_opt_dict)
add_object_stream = multiprocessing.Process(
target=add_object, args=(
backup_opt_dict, tar_backup_queue, file_name, time_stamp))
add_object_stream.daemon = True
add_object_stream.start()
# Generate a string hostname, backup name, timestamp and backup level
file_name = add_host_name_ts_level(backup_opt_dict, time_stamp)
meta_data_backup_file = u'tar_metadata_{0}'.format(file_name)
backup_opt_dict.meta_data_file = meta_data_backup_file
tar_backup_stream.join()
tar_backup_queue.put(({False: False}))
tar_backup_queue.close()
add_object_stream.join()
# Initialize a Queue for a maximum of 2 items
tar_backup_queue = multiprocessing.Queue(maxsize=2)
if add_object_stream.exitcode:
raise Exception('failed to upload object to swift server')
if is_windows():
backup_opt_dict.absolute_path = backup_opt_dict.src_file
backup_opt_dict.src_file = use_shadow(backup_opt_dict.src_file,
backup_opt_dict.volume)
(backup_opt_dict, manifest_meta_dict, tar_meta_to_upload,
tar_meta_prev) = gen_manifest_meta(
backup_opt_dict, manifest_meta_dict, meta_data_backup_file)
# Execute a tar gzip of the specified directory and return
# small chunks (default 128MB), timestamp, backup, filename,
# file chunk index and the tar meta-data file
(backup_opt_dict, tar_command, manifest_meta_dict) = \
gen_tar_command(opt_dict=backup_opt_dict,
time_stamp=time_stamp,
remote_manifest_meta=manifest_meta_dict)
manifest_file = u''
meta_data_abs_path = '{0}/{1}'.format(
backup_opt_dict.workdir, tar_meta_prev)
tar_backup_stream = multiprocessing.Process(
target=tar_backup, args=(
backup_opt_dict, tar_command, tar_backup_queue,))
# Upload swift manifest for segments
if backup_opt_dict.upload:
# Request a new auth client in case the current token
# is expired before uploading tar meta data or the swift manifest
backup_opt_dict = get_client(backup_opt_dict)
tar_backup_stream.daemon = True
tar_backup_stream.start()
if not backup_opt_dict.no_incremental:
# Upload tar incremental meta data file and remove it
logging.info('[*] Uploading tar meta data file: {0}'.format(
tar_meta_to_upload))
with open(meta_data_abs_path, 'r') as meta_fd:
backup_opt_dict.sw_connector.put_object(
backup_opt_dict.container, tar_meta_to_upload, meta_fd)
# Removing tar meta data file, so we have only one authoritative
# version on swift
logging.info('[*] Removing tar meta data file: {0}'.format(
meta_data_abs_path))
os.remove(meta_data_abs_path)
# Upload manifest to swift
manifest_upload(
manifest_file, backup_opt_dict, file_name, manifest_meta_dict)
add_object_stream = multiprocessing.Process(
target=add_object, args=(
backup_opt_dict, tar_backup_queue, file_name, time_stamp))
add_object_stream.daemon = True
add_object_stream.start()
# Unmount and remove lvm snapshot volume
lvm_snap_remove(backup_opt_dict)
tar_backup_stream.join()
tar_backup_queue.put(({False: False}))
tar_backup_queue.close()
add_object_stream.join()
if add_object_stream.exitcode:
raise Exception('failed to upload object to swift server')
(backup_opt_dict, manifest_meta_dict, tar_meta_to_upload,
tar_meta_prev) = gen_manifest_meta(
backup_opt_dict, manifest_meta_dict, meta_data_backup_file)
manifest_file = u''
if is_windows():
meta_data_abs_path = os.path.join(backup_opt_dict.workdir,
backup_opt_dict.meta_data_file)
else:
meta_data_abs_path = os.path.join(backup_opt_dict.workdir,
tar_meta_prev)
# Upload swift manifest for segments
if backup_opt_dict.upload:
# Request a new auth client in case the current token
# is expired before uploading tar meta data or the swift manifest
backup_opt_dict = get_client(backup_opt_dict)
if not backup_opt_dict.no_incremental:
# Upload tar incremental meta data file and remove it
logging.info('[*] Uploading tar meta data file: {0}'.format(
tar_meta_to_upload))
with open(meta_data_abs_path, 'r') as meta_fd:
backup_opt_dict.sw_connector.put_object(
backup_opt_dict.container, tar_meta_to_upload, meta_fd)
# Removing tar meta data file, so we have only one
# authoritative version on swift
logging.info('[*] Removing tar meta data file: {0}'.format(
meta_data_abs_path))
os.remove(meta_data_abs_path)
# Upload manifest to swift
manifest_upload(
manifest_file, backup_opt_dict, file_name, manifest_meta_dict)
finally:
if is_windows():
# Delete the shadow copy after the backup
vss_delete_shadow_copy(backup_opt_dict.shadow,
backup_opt_dict.volume)
else:
# Unmount and remove lvm snapshot volume
lvm_snap_remove(backup_opt_dict)

View File

@ -115,6 +115,9 @@ class BackupJob(Job):
elif self.conf.mode == 'mysql':
backup.backup_mode_mysql(
self.conf, self.start_time.timestamp, manifest_meta_dict)
elif self.conf.mode == 'sqlserver':
backup.backup_mode_sql_server(
self.conf, self.time_stamp, manifest_meta_dict)
else:
raise ValueError('Please provide a valid backup mode')

View File

@ -22,9 +22,93 @@ Freezer main execution function
"""
from freezer import job
from freezer.arguments import backup_arguments
from freezer.utils import create_dir
import os
import subprocess
import logging
import sys
def freezer_main(backup_args):
# Initialize backup options
(backup_args, arg_parse) = backup_arguments()
def configure_log_file_using_defaults():
""" Configure log file for freezer """
dry_run_message = ''
if backup_args.dry_run:
dry_run_message = '[DRY_RUN] '
def configure_logging(file_name):
expanded_file_name = os.path.expanduser(file_name)
expanded_dir_name = os.path.dirname(expanded_file_name)
create_dir(expanded_dir_name, do_log=False)
logging.basicConfig(
filename=expanded_file_name,
level=logging.INFO,
format=('%(asctime)s %(name)s %(levelname)s {0}%(message)s'.
format(dry_run_message)))
return expanded_file_name
if backup_args.log_file:
return configure_logging(backup_args.log_file)
for file_name in ['/var/log/freezer.log', '~/.freezer/freezer.log']:
try:
return configure_logging(file_name)
except IOError:
pass
raise Exception("Unable to write to log file")
def set_max_process_priority():
""" Set freezer in max priority on the os """
# children processes inherit niceness from father
try:
logging.warning(
'[*] Setting freezer execution with high CPU and I/O priority')
PID = os.getpid()
# Set cpu priority
os.nice(-19)
# Set I/O Priority to Real Time class with level 0
subprocess.call(
[u'{0}'.format(backup_args.ionice),
u'-c', u'1', u'-n', u'0', u'-t', u'-p', u'{0}'.format(PID)])
except Exception as priority_error:
logging.warning('[*] Priority: {0}'.format(priority_error))
def fail(exit_code, e, do_log=True):
""" Catch the exceptions and write it to log """
msg = '[*] Critical Error: {0}\n'.format(e)
if not backup_args.quiet:
sys.stderr.write(msg)
if do_log:
logging.critical(msg)
sys.exit(exit_code)
def freezer_main():
"""Freezer main loop for job execution.
"""
if backup_args.version:
print "freezer version {0}".format(backup_args.__version__)
sys.exit(1)
try:
log_file_name = configure_log_file_using_defaults()
except Exception as err:
fail(1, err, do_log=False)
if not backup_args.quiet:
print 'log file at {0}'.format(log_file_name)
if backup_args.max_priority:
set_max_process_priority()
freezer_job = job.create_job(backup_args)
freezer_job.execute()

33
freezer/scripts/vss.ps1 Normal file
View File

@ -0,0 +1,33 @@
# Copyright 2014 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
param([String]$volume="")
$shadow = get-wmiobject win32_shadowcopy
# get static method
$class=[WMICLASS]"root\cimv2:win32_shadowcopy"
# create a new shadow copy
$s1 = $class.create($volume, "ClientAccessible")
# get shadow ID
$s2 = gwmi Win32_ShadowCopy | ? { $_.ID -eq $s1.ShadowID }
$d = $s2.DeviceObject + "\"
# create a symlink for the shadow path
cmd /c mklink /d $volume\shadowcopy "$d"
echo "shadow id:" $s2

View File

@ -24,6 +24,7 @@ Freezer Tar related functions
from freezer.utils import (
validate_all_args, add_host_name_ts_level, create_dir)
from freezer.swift import object_to_file
from freezer.winutils import clean_tar_command, is_windows, add_gzip_to_command
import os
import logging
@ -65,19 +66,21 @@ def tar_restore(backup_opt_dict, read_pipe):
--directory {1} '.format(
backup_opt_dict.tar_path, backup_opt_dict.restore_abs_path)
if is_windows():
os.chdir(backup_opt_dict.restore_abs_path)
tar_cmd = 'gzip -dc | tar -xf - --unlink-first --ignore-zeros'
# Check if encryption file is provided and set the openssl decrypt
# command accordingly
if backup_opt_dict.encrypt_pass_file:
openssl_cmd = " {0} enc -d -aes-256-cfb -pass file:{1}".format(
backup_opt_dict.openssl_path,
backup_opt_dict.encrypt_pass_file)
tar_cmd = ' {0} | {1} '.format(openssl_cmd, tar_cmd)
tar_cmd = '{0} | {1} '.format(openssl_cmd, tar_cmd)
tar_cmd_proc = subprocess.Popen(
tar_cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE, shell=True,
executable=backup_opt_dict.bash_path)
stderr=subprocess.PIPE, shell=True)
# Start loop reading the pipe and pass the data to the tar std input.
# If EOFError exception is raised, the loop end the std err will be
# checked for errors.
@ -86,7 +89,7 @@ def tar_restore(backup_opt_dict, read_pipe):
tar_cmd_proc.stdin.write(read_pipe.recv_bytes())
except EOFError:
logging.info(
'[*] Pipe closed as EOF reached. Data transmitted succesfully.')
'[*] Pipe closed as EOF reached. Data transmitted succesfully')
tar_err = tar_cmd_proc.communicate()[1]
@ -106,6 +109,10 @@ def tar_incremental(
options will be checked and updated respectively
"""
if is_windows():
raise NotImplementedError('[*] Tar incrementals are not supported'
' on windows currently.')
if not tar_cmd or not backup_opt_dict:
logging.error(('[*] Error: tar_incremental, please provide tar_cmd '
'and backup options'))
@ -206,6 +213,11 @@ def gen_tar_command(
opt_dict.exclude)
tar_command = ' {0} . '.format(tar_command)
if is_windows():
tar_command = clean_tar_command(tar_command)
tar_command = add_gzip_to_command(tar_command)
# Encrypt data if passfile is provided
if opt_dict.encrypt_pass_file:
openssl_cmd = "{0} enc -aes-256-cfb -pass file:{1}".format(
@ -220,9 +232,7 @@ def tar_backup(opt_dict, tar_command, backup_queue):
Execute an incremental backup using tar options, specified as
function arguments
"""
# Set counters, index, limits and bufsize for subprocess
buf_size = 1048576
file_read_limit = 0
file_chunk_index = 00000000
tar_chunk = b''
@ -232,8 +242,7 @@ def tar_backup(opt_dict, tar_command, backup_queue):
tar_process = subprocess.Popen(
tar_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
bufsize=buf_size, shell=True,
executable=opt_dict.bash_path, env=os.environ.copy())
shell=True)
# Iterate over tar process stdout
for file_block in tar_process.stdout:

View File

@ -591,3 +591,16 @@ def human2bytes(s):
for i, s in enumerate(sset[1:]):
prefix[s] = 1 << (i + 1) * 10
return int(num * prefix[letter])
def create_subprocess(cmd):
"""
Create a new subprocess in the OS
:param cmd: command to execute in the subprocess
:return: the output and errors of the subprocess
"""
process = subprocess.Popen(cmd,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
return process.communicate()

135
freezer/vss.py Normal file
View File

@ -0,0 +1,135 @@
# Copyright 2014 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from freezer.winutils import DisableFileSystemRedirection
from freezer.utils import create_subprocess
import logging
import os
def vss_create_shadow_copy(volume):
"""
Create a new shadow copy for the specified volume
Windows registry path for vss:
HKEY_LOCAL_MACHINE\System\CurrentControlSet\Services\VSS\Settings
MaxShadowCopies
Windows is limited in how many shadow copies can create per volume.
The default amount of shadow copies is 64, the minimum is 1 and the maxi-
mum is 512, if you want to change the default value you need to add/edit
the key MaxShadowCopies and set the amount of shadow copies per volume.
MinDiffAreaFileSize
The minimum size of the shadow copy storage area is a per-computer setting
that can be specified by using the MinDiffAreaFileSize registry value.
If the MinDiffAreaFileSize registry value is not set, the minimum size of
the shadow copy storage area is 32 MB for volumes that are smaller than
500 MB and 320 MB for volumes that are larger than 500 MB.
If you have not set a maximum size, there is no limit to the amount
of space that can be used.
If the MinDiffAreaFileSize registry value does not exist, the backup
application can create it under the following registry key:
HKEY_LOCAL_MACHINE\System\CurrentControlSet\Services\VolSnap
Freezer create a shadow copy for each time the client runs it's been
removed after the backup is complete.
:param volume: The letter of the windows volume e.g. c:\\
:return: shadow_id: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX
:return: shadow_path: shadow copy path
"""
shadow_path = None
shadow_id = None
with DisableFileSystemRedirection():
path = os.path.dirname(os.path.abspath(__file__))
script = '{0}\\scripts\\vss.ps1'.format(path)
(out, err) = create_subprocess(['powershell.exe',
'-executionpolicy', 'unrestricted',
'-command', script,
'-volume', volume])
if err != '':
raise Exception('[*] Error creating a new shadow copy on {0}'
', error {1}' .format(volume, err))
for line in out.split('\n'):
if 'symbolic' in line:
shadow_path = line.split('>>')[1].strip()
if '__RELPATH' in line:
shadow_id = line.split('=')[1].strip().lower() + '}'
shadow_id = shadow_id[1:]
logging.info('[*] Created shadow copy {0}'.
format(shadow_id))
return shadow_path, shadow_id
def vss_delete_shadow_copy(shadow_id, volume):
"""
Delete a shadow copy from the volume with the given shadow_id
:param shadow_id: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX
:return: bool
"""
with DisableFileSystemRedirection():
cmd = ['vssadmin', 'delete', 'shadows',
'/shadow={0}'.format(shadow_id), '/quiet']
(out, err) = create_subprocess(cmd)
if err != '':
raise Exception('[*] Error deleting shadow copy with id {0}'
', error {1}' .format(shadow_id, err))
try:
os.rmdir(os.path.join(volume, 'shadowcopy'))
except Exception:
logging.error('Failed to delete shadow copy symlink {0}'.
format(os.path.join(volume, 'shadowcopy')))
logging.info('[*] Deleting shadow copy {0}'.
format(shadow_id))
return True
def stop_sql_server(backup_opt_dict):
""" Stop a SQL Server instance to perform the backup of the db files """
logging.info('[*] Stopping SQL Server for backup')
with DisableFileSystemRedirection():
cmd = 'net stop "SQL Server ({0})"'\
.format(backup_opt_dict.sql_server_instance)
(out, err) = create_subprocess(cmd)
if err != '':
raise Exception('[*] Error while stopping SQL Server,'
', error {0}'.format(err))
def start_sql_server(backup_opt_dict):
""" Start the SQL Server instance after the backup is completed """
with DisableFileSystemRedirection():
cmd = 'net start "SQL Server ({0})"'\
.format(backup_opt_dict.sql_server_instance)
(out, err) = create_subprocess(cmd)
if err != '':
raise Exception('[*] Error while starting SQL Server'
', error {0}'.format(err))
logging.info('[*] SQL Server back to normal')

71
freezer/winutils.py Normal file
View File

@ -0,0 +1,71 @@
# Copyright 2014 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# This product includes cryptographic software written by Eric Young
# (eay@cryptsoft.com). This product includes software written by Tim
# Hudson (tjh@cryptsoft.com).
# ========================================================================
import sys
import ctypes
def is_windows():
"""
:return: True if the running platform is windows
"""
return True if sys.platform == 'win32' else False
class DisableFileSystemRedirection:
"""
When a 32 bit program runs on a 64 bit operating system the paths
to C:/Windows/System32 automatically get redirected to the 32 bit
version (C:/Windows/SysWow64), if you really do need to access the
contents of System32, you need to disable the file system redirector first.
"""
if is_windows():
_disable = ctypes.windll.kernel32.Wow64DisableWow64FsRedirection
_revert = ctypes.windll.kernel32.Wow64RevertWow64FsRedirection
else:
_disable = ''
_revert = ''
def __enter__(self):
self.old_value = ctypes.c_long()
self.success = self._disable(ctypes.byref(self.old_value))
def __exit__(self, type, value, traceback):
if self.success:
self._revert(self.old_value)
def use_shadow(to_backup, volume):
""" add the shadow path to the backup directory """
return to_backup.replace(volume, '{0}shadowcopy\\'.format(volume))
def clean_tar_command(tar_cmd):
""" Delete tar arguments that are not supported by GnuWin32 tar"""
tar_cmd = tar_cmd.replace('--hard-dereference', '')
tar_cmd = tar_cmd.replace('--no-check-device', '')
tar_cmd = tar_cmd.replace('--warning=none', '')
tar_cmd = tar_cmd.replace('--seek', '')
tar_cmd = tar_cmd.replace('-z', '')
return tar_cmd
def add_gzip_to_command(tar_cmd):
gzip_cmd = 'gzip -7'
return '{0} | {1}'.format(tar_cmd, gzip_cmd)

View File

@ -40,7 +40,7 @@ def read(*filenames, **kwargs):
setup(
name='freezer',
version='1.1.2',
version='1.1.3',
url='https://github.com/stackforge/freezer',
license='Apache Software License',
author='Fausto Marzi, Ryszard Chojnacki, Emil Dimitrov',
@ -86,5 +86,11 @@ setup(
'docutils>=0.8.1'],
extras_require={
'testing': ['pytest', 'flake8'],
}
},
entry_points={
'console_scripts': [
'freezerc=freezer.main:freezer_main'
]
},
data_files=[('freezer/scripts', ['freezer/scripts/vss.ps1'])]
)

View File

@ -347,6 +347,10 @@ class FakeSubProcess:
def communicate(cls):
return 'successfully removed', ''
@classmethod
def communicate_error(cls):
return '', 'error'
class stdin:
def __call__(self, *args, **kwargs):
return self
@ -472,6 +476,23 @@ class FakeSubProcess5:
return True
class FakeSubProcess6:
def __init__(self):
pass
@classmethod
def Popen(cls, cmd=None):
return cls
@classmethod
def communicate(cls):
return 'ok', ''
@classmethod
def communicate_error(cls):
return '', 'error'
class Lvm:
def __init__(self):
return None
@ -683,6 +704,8 @@ class BackupOpt1:
self.dry_run = False
self.upload_limit = -1
self.download_limit = -1
self.sql_server_instance = 'Sql Server'
class FakeMySQLdb:
@ -849,6 +872,18 @@ class Os:
def join(cls, directory1=True, directory2=True):
return '/tmp/testdir'
@classmethod
def rmdir(cls, directory1=True):
return True
@classmethod
def chdir(cls, directory1=True):
return True
@classmethod
def chdir2(cls, directory1=True):
raise Exception
class Os1(Os):
@classmethod
@ -920,6 +955,7 @@ class FakeSwift:
def remove_obj_older_than(self, backup_opt):
return backup_opt
class FakeRestore:
def __init__(self):
@ -980,5 +1016,55 @@ class FakeJob:
def execute(self):
return
def fake_create_job(conf):
return FakeJob(conf)
class FakeVss:
def __init__(self):
return None
@classmethod
def vss_create_shadow_copy(self, volume):
return 'ShadowID: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX'
@classmethod
def vss_create_shadow_copy_error(self, volume):
return 'ShadowID: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', 'error'
@classmethod
def vss_get_shadow_copy(self, shadow_id):
return 'Shadow Copy Volume: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX'
@classmethod
def vss_get_shadow_copy_error(self, shadow_id):
return 'Shadow Copy Volume: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', ''
@classmethod
def vss_delete_shadow_copy(self, shadow_id):
return True
@classmethod
def vss_delete_shadow_copy_error(self, shadow_id):
return '', 'error'
class FakeDisableFileSystemRedirection:
success = True
def __enter__(self):
return True
def __exit__(self, type, value, traceback):
if self.success:
return True
def fake_create_subprocess(cmd):
return True, ''
def fake_create_subprocess2(cmd):
return True, 'Error'

14
tests/scenario/swiftrc Normal file
View File

@ -0,0 +1,14 @@
unset OS_USERNAME
unset OS_PASSWORD
unset OS_TENANT_NAME
unset OS_AUTH_URL
unset OS_REGION_NAME
unset OS_TENANT_ID
unset OS_SERVICE_TOKEN
unset OS_SERVICE_ENDPOINT
export OS_USERNAME=admin
export OS_PASSWORD=admin
export OS_TENANT_NAME=admin
export OS_AUTH_URL=http://10.199.199.199:5000/v2.0

View File

@ -21,7 +21,7 @@ Hudson (tjh@cryptsoft.com).
"""
from commons import fake_create_job, BackupOpt1
from commons import fake_create_job
from freezer.main import freezer_main
from freezer import job
@ -29,5 +29,4 @@ from freezer import job
def test_freezer_main(monkeypatch):
monkeypatch.setattr(job, 'create_job', fake_create_job)
backup_opt = BackupOpt1()
assert freezer_main(backup_opt) is None
assert freezer_main() is None

View File

@ -24,6 +24,7 @@ Hudson (tjh@cryptsoft.com).
from commons import *
from freezer.tar import (tar_restore, tar_incremental, tar_backup,
gen_tar_command, tar_restore_args_valid)
from freezer import winutils
import os
import logging
@ -65,6 +66,14 @@ class TestTar:
subprocess, 'Popen', fakesubprocesspopen)
assert tar_restore(backup_opt, fakepipe) is None
# expected_tar_cmd = 'gzip -dc | tar -xf - --unlink-first --ignore-zeros'
monkeypatch.setattr(winutils, 'is_windows', True)
fake_os = Os()
monkeypatch.setattr(os, 'chdir', fake_os.chdir)
assert tar_restore(backup_opt, fakepipe) is None
monkeypatch.setattr(os, 'chdir', fake_os.chdir2)
pytest.raises(Exception, tar_restore(backup_opt, fakepipe))
def test_tar_incremental(self, monkeypatch):
@ -72,7 +81,7 @@ class TestTar:
fakelogging = FakeLogging()
(tar_cmd, curr_tar_meta,
remote_manifest_meta) = True, True, {}
(val1, val2, val3) = tar_incremental(
(val1, val2, val3) = tar_incremental(
tar_cmd, backup_opt, curr_tar_meta,
remote_manifest_meta)
assert val1 is not False
@ -126,6 +135,7 @@ class TestTar:
backup_opt, meta_data_backup_file, time_stamp,
remote_manifest_meta)
def test_tar_backup(self, monkeypatch):
backup_opt = BackupOpt1()

176
tests/test_vss.py Normal file
View File

@ -0,0 +1,176 @@
# Copyright 2014 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from commons import (FakeDisableFileSystemRedirection, FakeSubProcess,
FakeLogging, BackupOpt1, Os, FakeSubProcess3, FakeSubProcess6,
fake_create_subprocess, fake_create_subprocess2)
from freezer import vss
from freezer import winutils
from freezer import utils
import subprocess
import os
import logging
import pytest
class TestVss:
def test_start_sql_server(self, monkeypatch):
fake_disable_redirection = FakeDisableFileSystemRedirection()
backup_opt = BackupOpt1()
fakelogging = FakeLogging()
fakesubprocess = FakeSubProcess()
fakesubprocesspopen = fakesubprocess.Popen()
monkeypatch.setattr(
subprocess.Popen, 'communicate',
fakesubprocesspopen.communicate)
monkeypatch.setattr(
subprocess, 'Popen', fakesubprocesspopen)
monkeypatch.setattr(
winutils.DisableFileSystemRedirection, '__enter__',
fake_disable_redirection.__enter__)
monkeypatch.setattr(
winutils.DisableFileSystemRedirection, '__exit__',
fake_disable_redirection.__exit__)
monkeypatch.setattr(logging, 'info', fakelogging.info)
assert vss.start_sql_server(backup_opt) is not False
fakesubprocess = FakeSubProcess3()
fakesubprocesspopen = fakesubprocess.Popen()
monkeypatch.setattr(
subprocess.Popen, 'communicate',
fakesubprocesspopen.communicate)
monkeypatch.setattr(
subprocess, 'Popen', fakesubprocesspopen)
pytest.raises(Exception, vss.start_sql_server(backup_opt))
def test_stop_sql_server(self, monkeypatch):
fake_disable_redirection = FakeDisableFileSystemRedirection()
backup_opt = BackupOpt1()
fakelogging = FakeLogging()
fakesubprocess = FakeSubProcess()
fakesubprocesspopen = fakesubprocess.Popen()
monkeypatch.setattr(
subprocess.Popen, 'communicate',
fakesubprocesspopen.communicate)
monkeypatch.setattr(
subprocess, 'Popen', fakesubprocesspopen)
monkeypatch.setattr(
winutils.DisableFileSystemRedirection, '__enter__',
fake_disable_redirection.__enter__)
monkeypatch.setattr(
winutils.DisableFileSystemRedirection, '__exit__',
fake_disable_redirection.__exit__)
monkeypatch.setattr(logging, 'info', fakelogging.info)
assert vss.start_sql_server(backup_opt) is not False
fakesubprocess = FakeSubProcess3()
fakesubprocesspopen = fakesubprocess.Popen()
monkeypatch.setattr(
subprocess.Popen, 'communicate',
fakesubprocesspopen.communicate)
monkeypatch.setattr(
subprocess, 'Popen', fakesubprocesspopen)
pytest.raises(Exception, vss.stop_sql_server(backup_opt))
def test_vss_create_shadow_copy(self, monkeypatch):
fake_disable_redirection = FakeDisableFileSystemRedirection()
fakelogging = FakeLogging()
fakesubprocess = FakeSubProcess()
fakesubprocesspopen = fakesubprocess.Popen()
monkeypatch.setattr(
subprocess.Popen, 'communicate',
fakesubprocesspopen.communicate)
monkeypatch.setattr(
subprocess, 'Popen', fakesubprocesspopen)
monkeypatch.setattr(
winutils.DisableFileSystemRedirection, '__enter__',
fake_disable_redirection.__enter__)
monkeypatch.setattr(
winutils.DisableFileSystemRedirection, '__exit__',
fake_disable_redirection.__exit__)
monkeypatch.setattr(logging, 'info', fakelogging.info)
assert vss.vss_create_shadow_copy('C:\\') is not False
fakesubprocess = FakeSubProcess3()
fakesubprocesspopen = fakesubprocess.Popen()
monkeypatch.setattr(
subprocess.Popen, 'communicate',
fakesubprocesspopen.communicate)
monkeypatch.setattr(
subprocess, 'Popen', fakesubprocesspopen)
pytest.raises(Exception, vss.vss_create_shadow_copy('C:\\'))
def test_vss_delete_shadow_copy(self, monkeypatch):
fakelogging = FakeLogging()
monkeypatch.setattr(logging, 'info', fakelogging.info)
fake_disable_redirection = FakeDisableFileSystemRedirection()
monkeypatch.setattr(
winutils.DisableFileSystemRedirection, '__enter__',
fake_disable_redirection.__enter__)
monkeypatch.setattr(
winutils.DisableFileSystemRedirection, '__exit__',
fake_disable_redirection.__exit__)
fakesubprocess = FakeSubProcess6()
fakesubprocesspopen = fakesubprocess.Popen()
monkeypatch.setattr(subprocess, 'Popen', fakesubprocesspopen)
monkeypatch.setattr(subprocess.Popen, 'communicate',
fakesubprocesspopen.communicate)
pytest.raises(Exception, vss.vss_delete_shadow_copy('', ''))
fakesubprocess = FakeSubProcess3()
fakesubprocesspopen = fakesubprocess.Popen()
monkeypatch.setattr(
subprocess.Popen, 'communicate',
fakesubprocesspopen.communicate)
monkeypatch.setattr(
subprocess, 'Popen', fakesubprocesspopen)
pytest.raises(Exception, vss.vss_delete_shadow_copy('shadow_id',
'C:\\'))
fakesubprocess = FakeSubProcess()
fakesubprocesspopen = fakesubprocess.Popen()
monkeypatch.setattr(
subprocess.Popen, 'communicate',
fakesubprocesspopen.communicate)
monkeypatch.setattr(
subprocess, 'Popen', fakesubprocesspopen)
assert vss.vss_delete_shadow_copy('shadow_id', 'C:\\') is True

70
tests/test_winutils.py Normal file
View File

@ -0,0 +1,70 @@
# Copyright 2014 Hewlett-Packard
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from freezer.winutils import is_windows
from freezer.winutils import use_shadow
from freezer.winutils import clean_tar_command
from freezer.winutils import add_gzip_to_command
from freezer.winutils import DisableFileSystemRedirection
from commons import *
import pytest
class TestWinutils:
def test_is_windows(self, monkeypatch):
fake_os = Os()
monkeypatch.setattr(os, 'name', fake_os)
assert is_windows() is False
def test_use_shadow(self):
test_volume = 'C:'
test_volume2 = 'C:\\'
path = 'C:\\Users\\Test'
expected = 'C:\\shadowcopy\\Users\\Test'
assert use_shadow(path, test_volume2) == expected
# test if the volume format is incorrect
pytest.raises(Exception, use_shadow(path, test_volume))
def test_clean_tar_command(self):
test_tar_command = 'tar --create -z --warning=none ' \
'--no-check-device --one-file-system ' \
'--preserve-permissions --same-owner --seek ' \
'--ignore-failed-read '
expected = 'tar --create --one-file-system --preserve-permissions ' \
'--same-owner --ignore-failed-read '
assert clean_tar_command(test_tar_command) == expected
def test_add_gzip_to_command(self):
test_command = 'tar --create --one-file-system ' \
'--preserve-permissions --same-owner ' \
'--ignore-failed-read '
expected = 'tar --create --one-file-system ' \
'--preserve-permissions --same-owner ' \
'--ignore-failed-read | gzip -7'
assert add_gzip_to_command(test_command) == expected
def test_DisableFileSystemRedirection(self, monkeypatch):
fake_disable_redirection = DisableFileSystemRedirection()
fake_disable_redirection.success = True
assert fake_disable_redirection._revert == ''
assert fake_disable_redirection._disable == ''
pytest.raises(Exception, fake_disable_redirection.__enter__)
pytest.raises(Exception, fake_disable_redirection.__exit__)