Add pep8 check to freezer tests

This patch adds pep8 check on freezer/tests and fixes all
code style problems.

Change-Id: I178e4cbdfcbe610fbee1e6257b0d2b2b922d5411
Closes-Bug: #1643952
Signed-off-by: Ruslan Aliev <raliev@mirantis.com>
This commit is contained in:
Ruslan Aliev 2016-11-22 19:52:52 +03:00
parent ae2b17b1ca
commit da29fee615
28 changed files with 554 additions and 448 deletions

View File

@ -15,29 +15,19 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import swiftclient
import multiprocessing
import subprocess
import time
import pymongo
import re
import os
from glanceclient.common import utils as glance_utils
import mock
from oslo_config import cfg
from oslo_config import fixture as cfg_fixture
import six
import testtools
from mock import MagicMock
from mock import Mock
from oslo_config import cfg
from oslo_config import fixture as cfg_fixture
from glanceclient.common.utils import IterableWithLength
from freezer.common import config
from freezer.storage import swift
from freezer.utils import utils
from freezer.engine.tar import tar as tar_engine
from freezer.openstack.osclients import OpenstackOpts
from freezer.openstack.osclients import OSClientManager
from freezer.openstack import osclients
from freezer.storage import swift
CONF = cfg.CONF
os.environ['OS_REGION_NAME'] = 'testregion'
@ -50,8 +40,8 @@ os.environ['OS_TENANT_NAME'] = 'testtenantename'
class FakeSubProcess(object):
def __init__(self, opt1=True, stdin=True, stdout=True,
stderr=True, shell=True, executable=True, env={},
bufsize=4096):
stderr=True, shell=True, executable=True, env={},
bufsize=4096):
return None
stdout = ['abcd', 'ehfg']
@ -68,7 +58,7 @@ class FakeSubProcess(object):
def communicate_error(cls):
return '', 'error'
class stdin:
class stdin(object):
def __call__(self, *args, **kwargs):
return self
@ -79,19 +69,19 @@ class FakeSubProcess(object):
class FakeSubProcess3(object):
def __init__(self, opt1=True, stdin=True, stdout=True,
stderr=True, shell=True, executable=True):
stderr=True, shell=True, executable=True):
return None
@classmethod
def Popen(cls, opt1=True, stdin=True, stdout=True,
stderr=True, shell=True, executable=True):
stderr=True, shell=True, executable=True):
return cls
@classmethod
def communicate(cls):
return False, False
class stdin:
class stdin(object):
def __call__(self, *args, **kwargs):
return self
@ -195,7 +185,7 @@ class FakeGlanceClient(object):
@staticmethod
def data(image):
return IterableWithLength(iter("abc"), 3)
return glance_utils.IterableWithLength(iter("abc"), 3)
@staticmethod
def delete(image):
@ -207,7 +197,6 @@ class FakeGlanceClient(object):
class FakeSwiftClient(object):
def __init__(self):
pass
@ -215,18 +204,23 @@ class FakeSwiftClient(object):
def __init__(self):
pass
class Connection:
def __init__(self, key=True, os_options=True, auth_version=True, user=True, authurl=True, tenant_name=True, retries=True, insecure=True):
class Connection(object):
def __init__(self, key=True, os_options=True, auth_version=True,
user=True, authurl=True, tenant_name=True,
retries=True, insecure=True):
self.num_try = 0
def put_object(self, container, obj, contents, content_length=None,
etag=None, chunk_size=None, content_type=None,
headers=None, query_string=None, response_dict=None):
etag=None, chunk_size=None, content_type=None,
headers=None, query_string=None,
response_dict=None):
return True
def head_object(self, container_name='', object_name=''):
if object_name == 'has_segments':
return {'x-object-manifest': 'freezer_segments/hostname_backup_name_1234567890_0'}
return {
'x-object-manifest': ('freezer_segments/hostname_'
'backup_name_1234567890_0')}
else:
return {}
@ -243,25 +237,46 @@ class FakeSwiftClient(object):
def get_container(self, container, *args, **kwargs):
if container == 'freezer_segments':
return ({'container_metadata': True}, [
{'bytes': 251, 'last_modified': '2015-03-09T10:37:01.701170', 'hash': '9a8cbdb30c226d11bf7849f3d48831b9', 'name': 'hostname_backup_name_1234567890_0/1234567890/67108864/00000000', 'content_type': 'application/octet-stream'},
{'bytes': 632, 'last_modified': '2015-03-09T11:54:27.860730', 'hash': 'd657a4035d0dcc18deaf9bfd2a3d0ebf', 'name': 'hostname_backup_name_1234567891_1/1234567891/67108864/00000000', 'content_type': 'application/octet-stream'}
{'bytes': 251,
'last_modified': '2015-03-09T10:37:01.701170',
'hash': '9a8cbdb30c226d11bf7849f3d48831b9',
'name': ('hostname_backup_name_1234567890_0/'
'1234567890/67108864/00000000'),
'content_type': 'application/octet-stream'},
{'bytes': 632,
'last_modified': '2015-03-09T11:54:27.860730',
'hash': 'd657a4035d0dcc18deaf9bfd2a3d0ebf',
'name': ('hostname_backup_name_1234567891_1/'
'1234567891/67108864/00000000'),
'content_type': 'application/octet-stream'}
])
elif container == "test-container" and 'path' in kwargs:
return ({'container_metadata': True}, [
{'bytes': 251, 'last_modified': '2015-03-09T10:37:01.701170', 'hash': '9a8cbdb30c226d11bf7849f3d48831b9', 'name': 'hostname_backup_name_1234567890_0/11417649003', 'content_type': 'application/octet-stream'},
{'bytes': 632, 'last_modified': '2015-03-09T11:54:27.860730', 'hash': 'd657a4035d0dcc18deaf9bfd2a3d0ebf', 'name': 'hostname_backup_name_1234567891_1/1417649003', 'content_type': 'application/octet-stream'}
{'bytes': 251,
'last_modified': '2015-03-09T10:37:01.701170',
'hash': '9a8cbdb30c226d11bf7849f3d48831b9',
'name': ('hostname_backup_name_1234567890_0/'
'11417649003'),
'content_type': 'application/octet-stream'},
{'bytes': 632,
'last_modified': '2015-03-09T11:54:27.860730',
'hash': 'd657a4035d0dcc18deaf9bfd2a3d0ebf',
'name': ('hostname_backup_name_1234567891_1/'
'1417649003'),
'content_type': 'application/octet-stream'}
])
else:
return [{}, []]
def get_account(self, *args, **kwargs):
return [{'count': 0, 'bytes': 0, 'name': '1234'}, {'count': 4, 'bytes': 156095, 'name': 'a1'}], \
return [{'count': 0, 'bytes': 0, 'name': '1234'},
{'count': 4, 'bytes': 156095, 'name': 'a1'}], \
[{'name': 'test-container',
'bytes': 200000,
'count': 1000},
{'name': 'test-container-segments',
'bytes': 300000,
'count': 656}]
'bytes': 200000,
'count': 1000},
{'name': 'test-container-segments',
'bytes': 300000,
'count': 656}]
def get_object(self, *args, **kwargs):
return [{'x-object-meta-length': "123",
@ -270,7 +285,6 @@ class FakeSwiftClient(object):
class BackupOpt1(object):
def __init__(self):
self.dereference_symlink = 'none'
self.mysql_conf = '/tmp/freezer-test-conf-file'
@ -278,7 +292,7 @@ class BackupOpt1(object):
self.lvm_auto_snap = '/dev/null'
self.lvm_volgroup = 'testgroup'
self.lvm_srcvol = 'testvol'
self.lvm_dirmount= '/tmp/testdir'
self.lvm_dirmount = '/tmp/testdir'
self.lvm_snapsize = '1G'
self.lvm_snapname = 'testsnapname'
self.lvcreate_path = 'true'
@ -328,13 +342,14 @@ class BackupOpt1(object):
self.lvm_snapperm = 'ro'
self.compression = 'gzip'
self.storage = MagicMock()
self.engine = MagicMock()
opts = OpenstackOpts.create_from_env().get_opts_dicts()
self.client_manager = OSClientManager(opts.pop('auth_url'),
opts.pop('auth_method'),
**opts)
self.client_manager.get_swift = Mock(
self.storage = mock.MagicMock()
self.engine = mock.MagicMock()
opts = osclients.OpenstackOpts.create_from_env().get_opts_dicts()
self.client_manager = osclients.OSClientManager(opts.pop('auth_url'),
opts.pop(
'auth_method'),
**opts)
self.client_manager.get_swift = mock.Mock(
return_value=FakeSwiftClient().client.Connection())
self.client_manager.create_swift = self.client_manager.get_swift
self.storage = swift.SwiftStorage(self.client_manager,
@ -343,16 +358,18 @@ class BackupOpt1(object):
self.engine = tar_engine.TarEngine(
self.compression, self.dereference_symlink,
self.exclude, self.storage, 1000, False)
self.client_manager.get_glance = Mock(return_value=FakeGlanceClient())
self.client_manager.get_cinder = Mock(return_value=FakeCinderClient())
nova_client = MagicMock()
self.client_manager.get_glance = mock.Mock(
return_value=FakeGlanceClient())
self.client_manager.get_cinder = mock.Mock(
return_value=FakeCinderClient())
nova_client = mock.MagicMock()
self.client_manager.get_nova = Mock(return_value=nova_client)
self.client_manager.get_nova = mock.Mock(return_value=nova_client)
self.command = ''
class Os:
class Os(object):
def __init__(self, directory=True):
return None
@ -465,7 +482,6 @@ class FakeDisableFileSystemRedirection(object):
class FreezerBaseTestCase(testtools.TestCase):
def setUp(self):
if six.PY34:
super().setUp()

View File

@ -19,4 +19,3 @@ service_option = cfg.BoolOpt('freezer',
default=True,
help="Whether or not freezer is expected to be "
"available")

View File

@ -13,13 +13,13 @@
# License for the specific language governing permissions and limitations
# under the License.
import os
from tempest.test_discover import plugins
from freezer.tests.freezer_tempest_plugin import config as freezer_config
class FreezerTempestPlugin(plugins.TempestPlugin):
def load_tests(self):
base_path = os.path.split(os.path.dirname(

View File

@ -11,14 +11,16 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from datetime import datetime
from datetime import timedelta
import json
import os
import subprocess
from datetime import datetime, timedelta
from time import mktime
import tempest.test
from tempest import test
from freezer.tests.integration.common import Temp_Tree
@ -106,8 +108,7 @@ def save_metadata(metadata, path):
json.dump(metadata, f)
class BaseFreezerTest(tempest.test.BaseTestCase):
class BaseFreezerTest(test.BaseTestCase):
credentials = ['primary']
def __init__(self, *args, **kwargs):
@ -136,7 +137,8 @@ class BaseFreezerTest(tempest.test.BaseTestCase):
@classmethod
def get_auth_url(cls):
return cls.os_primary.auth_provider.auth_client.auth_url[:-len('/tokens')]
return cls.os_primary.auth_provider.auth_client.auth_url[:-len(
'/tokens')]
@classmethod
def setup_clients(cls):
@ -152,8 +154,8 @@ class BaseFreezerTest(tempest.test.BaseTestCase):
# Allow developers to set OS_AUTH_URL when developing so that
# Keystone may be on a host other than localhost.
if not 'OS_AUTH_URL' in os.environ:
os.environ['OS_AUTH_URL'] = cls.get_auth_url()
if 'OS_AUTH_URL' not in os.environ:
os.environ['OS_AUTH_URL'] = cls.get_auth_url()
# Mac OS X uses gtar located in /usr/local/bin
os.environ['PATH'] = '/usr/local/bin:' + os.environ['PATH']

View File

@ -11,6 +11,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import os
import subprocess
@ -18,7 +19,7 @@ import subprocess
from tempest import test
from freezer.tests.freezer_tempest_plugin.tests.api import base
from freezer.tests.integration.common import Temp_Tree
from freezer.tests.integration import common
class TestFreezerCompressGzip(base.BaseFreezerTest):
@ -31,11 +32,11 @@ class TestFreezerCompressGzip(base.BaseFreezerTest):
# create a source tree to backup with a few empty files
# (files must be empty to avoid encoding errors with pure random data)
self.source_tree = Temp_Tree()
self.source_tree = common.Temp_Tree()
self.source_tree.add_random_data(size=0)
self.storage_tree = Temp_Tree()
self.dest_tree = Temp_Tree()
self.storage_tree = common.Temp_Tree()
self.dest_tree = common.Temp_Tree()
self.environ = super(TestFreezerCompressGzip, self).get_environ()

View File

@ -11,40 +11,38 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import shutil
import subprocess
import uuid
from freezer.tests.freezer_tempest_plugin.tests.api import base
from tempest import test
from freezer.tests.freezer_tempest_plugin.tests.api import base
class TestFreezerFSBackup(base.BaseFreezerTest):
def __init__(self, *args, **kwargs):
super(TestFreezerFSBackup, self).__init__(*args, **kwargs)
def setUp(self):
super(TestFreezerFSBackup, self).setUp()
test_id = uuid.uuid4().hex
self.backup_source_dir = ("/tmp/freezer-test-backup-source/"
+ test_id)
self.backup_source_dir = (
"/tmp/freezer-test-backup-source/" + test_id
)
self.backup_source_sub_dir = self.backup_source_dir + "/subdir"
self.restore_target_dir = (
"/tmp/freezer-test-backup-restore/"
+ test_id)
"/tmp/freezer-test-backup-restore/" + test_id
)
self.backup_local_storage_dir = (
"/tmp/freezer-test-backup-local-storage/"
+ test_id)
"/tmp/freezer-test-backup-local-storage/" + test_id
)
self.freezer_backup_name = 'freezer-test-backup-fs-0'
@ -68,7 +66,6 @@ class TestFreezerFSBackup(base.BaseFreezerTest):
self.environ = super(TestFreezerFSBackup, self).get_environ()
def tearDown(self):
super(TestFreezerFSBackup, self).tearDown()
shutil.rmtree(self.backup_source_dir, True)
shutil.rmtree(self.restore_target_dir, True)
@ -76,7 +73,6 @@ class TestFreezerFSBackup(base.BaseFreezerTest):
@test.attr(type="gate")
def test_freezer_fs_backup(self):
backup_args = ['freezer-agent',
'--path-to-backup',
self.backup_source_dir,
@ -109,5 +105,6 @@ class TestFreezerFSBackup(base.BaseFreezerTest):
self.backup_source_dir,
self.restore_target_dir]
self.run_subprocess(diff_args, "Test backup restore from local storage "
self.run_subprocess(diff_args,
"Test backup restore from local storage "
"diff.")

View File

@ -13,13 +13,13 @@
# under the License.
import subprocess
from freezer.tests.freezer_tempest_plugin.tests.api import base
from freezer.tests.integration.common import Temp_Tree
from tempest import test
from freezer.tests.freezer_tempest_plugin.tests.api import base
from freezer.tests.integration import common
class TestFreezerMetadataChecksum(base.BaseFreezerTest):
def __init__(self, *args, **kwargs):
super(TestFreezerMetadataChecksum, self).__init__(*args, **kwargs)
@ -28,7 +28,7 @@ class TestFreezerMetadataChecksum(base.BaseFreezerTest):
super(TestFreezerMetadataChecksum, self).setUp()
self.environ = super(TestFreezerMetadataChecksum, self).get_environ()
self.dest_tree = Temp_Tree()
self.dest_tree = common.Temp_Tree()
self.backup_name = 'backup_checksum_test'
def tearDown(self):

View File

@ -11,35 +11,33 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import shutil
import subprocess
import uuid
from freezer.tests.freezer_tempest_plugin.tests.api import base
from tempest import test
from freezer.tests.freezer_tempest_plugin.tests.api import base
class TestFreezerSwiftBackup(base.BaseFreezerTest):
def __init__(self, *args, **kwargs):
super(TestFreezerSwiftBackup, self).__init__(*args, **kwargs)
def setUp(self):
super(TestFreezerSwiftBackup, self).setUp()
test_id = uuid.uuid4().hex
self.backup_source_dir = ("/tmp/freezer-test-backup-source/"
+ test_id)
self.backup_source_dir = (
"/tmp/freezer-test-backup-source/" + test_id
)
self.backup_source_sub_dir = self.backup_source_dir + "/subdir"
self.restore_target_dir = (
"/tmp/freezer-test-backup-restore/"
+ test_id)
"/tmp/freezer-test-backup-restore/" + test_id
)
self.freezer_container_name = 'freezer-test-container-0'
self.freezer_backup_name = 'freezer-test-backup-swift-0'
@ -61,7 +59,6 @@ class TestFreezerSwiftBackup(base.BaseFreezerTest):
self.environ = super(TestFreezerSwiftBackup, self).get_environ()
def tearDown(self):
super(TestFreezerSwiftBackup, self).tearDown()
shutil.rmtree(self.backup_source_dir, True)
@ -69,7 +66,6 @@ class TestFreezerSwiftBackup(base.BaseFreezerTest):
@test.attr(type="gate")
def test_freezer_swift_backup(self):
backup_args = ['freezer-agent',
'--path-to-backup',
self.backup_source_dir,
@ -100,5 +96,5 @@ class TestFreezerSwiftBackup(base.BaseFreezerTest):
self.backup_source_dir,
self.restore_target_dir]
self.run_subprocess(diff_args, "Test backup to swift and restore diff.")
self.run_subprocess(diff_args,
"Test backup to swift and restore diff.")

View File

@ -12,12 +12,14 @@
# License for the specific language governing permissions and limitations
# under the License.
from freezer.tests.freezer_tempest_plugin.tests.api import base
from tempest import test
from freezer.tests.freezer_tempest_plugin.tests.api import base
class TestFreezerTestsRunning(base.BaseFreezerTest):
@test.attr(type="gate")
def test_tests_running(self):
# See if tempest plugin tests run.
self.assertEqual(1, 1, 'Tests are running')
self.assertEqual(1, 1, 'Tests are running')

View File

@ -14,10 +14,11 @@
import subprocess
from freezer.tests.freezer_tempest_plugin.tests.api import base
from freezer import __version__ as FREEZER_VERSION
from tempest import test
from freezer import __version__ as freezer_version
from freezer.tests.freezer_tempest_plugin.tests.api import base
class TestFreezerVersion(base.BaseFreezerTest):
@ -26,4 +27,4 @@ class TestFreezerVersion(base.BaseFreezerTest):
version = subprocess.check_output(['freezer-agent', '--version'],
stderr=subprocess.STDOUT)
self.assertEqual(FREEZER_VERSION, version.strip())
self.assertEqual(freezer_version, version.strip())

View File

@ -11,17 +11,14 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import hashlib
import json
import os
import shutil
import tempfile
import re
import time
import subprocess
from tempest import test
from tempest.lib.cli import base as cli_base
from tempest.lib.cli import output_parser
@ -41,17 +38,18 @@ class BaseFreezerCliTest(base.BaseFreezerTest):
super(BaseFreezerCliTest, cls).setup_clients()
cls.cli = CLIClientWithFreezer(
username = cls.os_primary.credentials.username,
username=cls.os_primary.credentials.username,
# fails if the password contains an unescaped $ sign
password = cls.os_primary.credentials.password.replace('$', '$$'),
tenant_name = cls.os_primary.credentials.tenant_name,
uri = cls.get_auth_url(),
cli_dir = '/usr/local/bin' # devstack default
)
password=cls.os_primary.credentials.password.replace('$', '$$'),
tenant_name=cls.os_primary.credentials.tenant_name,
uri=cls.get_auth_url(),
cli_dir='/usr/local/bin' # devstack default
)
cls.cli.cli_dir = ''
def delete_job(self, job_id):
self.cli.freezer_scheduler(action='job-delete', flags='-c test_node -j {}'.format(job_id))
self.cli.freezer_scheduler(action='job-delete',
flags='-c test_node -j {}'.format(job_id))
def create_job(self, job_json):
@ -59,7 +57,10 @@ class BaseFreezerCliTest(base.BaseFreezerTest):
job_file.write(json.dumps(job_json))
job_file.flush()
output = self.cli.freezer_scheduler(action='job-create', flags='-c test_node --file {}'.format(job_file.name))
output = self.cli.freezer_scheduler(
action='job-create',
flags='-c test_node --file {}'.format(job_file.name)
)
self.assertTrue(output.startswith('Created job'))
job_id = output[len('Created job '):]
@ -68,13 +69,15 @@ class BaseFreezerCliTest(base.BaseFreezerTest):
return job_id
def find_job_in_job_list(self, job_id):
job_list = output_parser.table(self.cli.freezer_scheduler(action='job-list', flags='-c test_node'))
job_list = output_parser.table(
self.cli.freezer_scheduler(action='job-list',
flags='-c test_node'))
for row in job_list['values']:
if row[0].strip() == job_id.strip():
return row
self.fail('Could not find job: {}'.format(job_id))
self.fail('Could not find job: {}'.format(job_id))
def wait_for_job_status(self, job_id, status, timeout=720):
start = time.time()
@ -85,7 +88,12 @@ class BaseFreezerCliTest(base.BaseFreezerTest):
if row[JOB_TABLE_STATUS_COLUMN] == status:
return
elif time.time() - start > timeout:
self.fail("Status of job '{}' is '{}'. Expected '{}'".format(job_id, row[JOB_TABLE_STATUS_COLUMN], status))
self.fail(
("Status of job '{}' is '{}'. "
"Expected '{}'").format(job_id,
row[JOB_TABLE_STATUS_COLUMN],
status)
)
else:
time.sleep(1)
@ -94,11 +102,9 @@ class BaseFreezerCliTest(base.BaseFreezerTest):
self.assertEqual(expected, row[column])
class CLIClientWithFreezer(cli_base.CLIClient):
def freezer_scheduler(self, action, flags='', params='', fail_ok=False,
endpoint_type='publicURL', merge_stderr=False):
endpoint_type='publicURL', merge_stderr=False):
"""Executes freezer-scheduler command for the given action.
:param action: the cli command to run using freezer-scheduler
@ -119,10 +125,10 @@ class CLIClientWithFreezer(cli_base.CLIClient):
return self.cmd_with_auth(
'freezer-scheduler', action, flags, params, fail_ok, merge_stderr)
# This class is just copied from the freezer repo. Depending on where the
# scenario tests end up we may need to refactore this.
class Temp_Tree(object):
def __init__(self, suffix='', dir=None, create=True):
self.create = create
if create:
@ -156,7 +162,7 @@ class Temp_Tree(object):
for y in range(nfile):
abs_pathname = self.create_file_with_random_data(
dir_path=subdir_path, size=size)
rel_path_name = abs_pathname[len(self.path)+1:]
rel_path_name = abs_pathname[len(self.path) + 1:]
self.files.append(rel_path_name)
def create_file_with_random_data(self, dir_path, size=1024):
@ -193,7 +199,7 @@ class Temp_Tree(object):
"""
self.files = []
for root, dirs, files in os.walk(self.path):
rel_base = root[len(self.path)+1:]
rel_base = root[len(self.path) + 1:]
self.files.extend([os.path.join(rel_base, x) for x in files])
return self.files
@ -222,7 +228,6 @@ class Temp_Tree(object):
class TestFreezerScenario(BaseFreezerCliTest):
def setUp(self):
super(TestFreezerScenario, self).setUp()
self.source_tree = Temp_Tree()
@ -240,46 +245,50 @@ class TestFreezerScenario(BaseFreezerCliTest):
def test_simple_backup(self):
backup_job = {
"job_actions": [
{
"freezer_action": {
"action": "backup",
"mode": "fs",
"storage": "local",
"backup_name": "backup1",
"path_to_backup": self.source_tree.path,
"container": "/tmp/freezer_test/",
},
"max_retries": 3,
"max_retries_interval": 60
}
],
"description": "a test backup"
"job_actions": [
{
"freezer_action": {
"action": "backup",
"mode": "fs",
"storage": "local",
"backup_name": "backup1",
"path_to_backup": self.source_tree.path,
"container": "/tmp/freezer_test/",
},
"max_retries": 3,
"max_retries_interval": 60
}
],
"description": "a test backup"
}
restore_job = {
"job_actions": [
{
"freezer_action": {
"action": "restore",
"storage": "local",
"restore_abs_path": self.dest_tree.path,
"backup_name": "backup1",
"container": "/tmp/freezer_test/",
},
"max_retries": 3,
"max_retries_interval": 60
}
],
"description": "a test restore"
"job_actions": [
{
"freezer_action": {
"action": "restore",
"storage": "local",
"restore_abs_path": self.dest_tree.path,
"backup_name": "backup1",
"container": "/tmp/freezer_test/",
},
"max_retries": 3,
"max_retries_interval": 60
}
],
"description": "a test restore"
}
backup_job_id = self.create_job(backup_job)
self.cli.freezer_scheduler(action='job-start', flags='-c test_node -j {}'.format(backup_job_id))
self.cli.freezer_scheduler(action='job-start',
flags='-c test_node -j {}'.format(
backup_job_id))
self.wait_for_job_status(backup_job_id, 'completed')
self.assertJobColumnEqual(backup_job_id, JOB_TABLE_RESULT_COLUMN, 'success')
self.assertJobColumnEqual(backup_job_id, JOB_TABLE_RESULT_COLUMN,
'success')
restore_job_id = self.create_job(restore_job)
self.wait_for_job_status(restore_job_id, 'completed')
self.assertJobColumnEqual(restore_job_id, JOB_TABLE_RESULT_COLUMN, 'success')
self.assertJobColumnEqual(restore_job_id, JOB_TABLE_RESULT_COLUMN,
'success')
self.assertTrue(self.source_tree.is_equal(self.dest_tree))

View File

@ -15,21 +15,21 @@
import distutils.spawn
import hashlib
import json
import itertools
import json
import os
import shutil
import six
import subprocess
import tempfile
import unittest
import paramiko
import six
FREEZERC = distutils.spawn.find_executable('freezer-agent')
class CommandFailed(Exception):
def __init__(self, returncode, cmd, output, stderr):
super(CommandFailed, self).__init__()
self.returncode = returncode
@ -77,14 +77,15 @@ def execute(args, must_fail=False, merge_stderr=False):
result, result_err = proc.communicate()
if not must_fail and proc.returncode != 0:
raise CommandFailed(proc.returncode, ' '.join(args), result, result_err)
raise CommandFailed(proc.returncode, ' '.join(args), result,
result_err)
if must_fail and proc.returncode == 0:
raise CommandFailed(proc.returncode, ' '.join(args), result, result_err)
raise CommandFailed(proc.returncode, ' '.join(args), result,
result_err)
return result
class Temp_Tree(object):
def __init__(self, suffix='', dir=None, create=True):
self.create = create
if create:
@ -118,7 +119,7 @@ class Temp_Tree(object):
for y in range(nfile):
abs_pathname = self.create_file_with_random_data(
dir_path=subdir_path, size=size)
rel_path_name = abs_pathname[len(self.path)+1:]
rel_path_name = abs_pathname[len(self.path) + 1:]
self.files.append(rel_path_name)
def create_file_with_random_data(self, dir_path, size=1024):
@ -155,7 +156,7 @@ class Temp_Tree(object):
"""
self.files = []
for root, dirs, files in os.walk(self.path):
rel_base = root[len(self.path)+1:]
rel_base = root[len(self.path) + 1:]
self.files.extend([os.path.join(rel_base, x) for x in files])
return self.files
@ -220,8 +221,8 @@ class TestFS(unittest.TestCase):
os_region = os.environ.get('FREEZER_TEST_OS_REGION_NAME')
os_password = os.environ.get('FREEZER_TEST_OS_PASSWORD')
os_auth_url = os.environ.get('FREEZER_TEST_OS_AUTH_URL')
use_os = (os_tenant_name and os_user_name and os_region
and os_password and os_auth_url)
use_os = (os_tenant_name and os_user_name and os_region and
os_password and os_auth_url)
if use_os:
os.environ['OS_USERNAME'] = os_user_name
os.environ['OS_TENANT_NAME'] = os_tenant_name

View File

@ -23,7 +23,6 @@ from freezer.tests.integration import common
class TestSimpleExecution(common.TestFS):
def test_freezerc_executes(self):
result = common.execute_freezerc({})
self.assertIsNotNone(result)
@ -35,7 +34,6 @@ class TestSimpleExecution(common.TestFS):
class TestBackupFSLocalstorage(common.TestFS):
def test_trees(self):
self.assertTreesMatch()
self.source_tree.add_random_data()
@ -43,7 +41,8 @@ class TestBackupFSLocalstorage(common.TestFS):
def test_backup_single_level(self):
"""
- use the default source and destination trees in /tmp (see common.TestFS)
- use the default source and destination trees in /tmp
(see common.TestFS)
- use temporary directory for backup storage
- add some random data
- check that trees don't match anymore
@ -126,9 +125,9 @@ class TestBackupFSLocalstorage(common.TestFS):
self.assertTreesMatchNot()
backup_name = uuid.uuid4().hex
path_to_backup= self.source_tree.path
lvm_snapsize= '50M'
lvm_snapname= 'freezer-snap_{0}'.format(backup_name)
path_to_backup = self.source_tree.path
lvm_snapsize = '50M'
lvm_snapname = 'freezer-snap_{0}'.format(backup_name)
lvm_dirmount = '/var/freezer/freezer-{0}'.format(backup_name)
with common.Temp_Tree() as storage_dir:
@ -167,7 +166,8 @@ class TestBackupSSH(common.TestFS):
- FREEZER_TEST_SSH_KEY
- FREEZER_TEST_SSH_USERNAME
- FREEZER_TEST_SSH_HOST
- FREEZER_TEST_CONTAINER (directory on the remote machine used to store backups)
- FREEZER_TEST_CONTAINER
(directory on the remote machine used to store backups)
"""
@unittest.skipIf(not common.TestFS.use_ssh,
@ -294,9 +294,9 @@ class TestBackupSSH(common.TestFS):
self.assertTreesMatchNot()
backup_name = uuid.uuid4().hex
path_to_backup= self.source_tree.path
lvm_snapsize= '1G'
lvm_snapname= 'freezer-snap_{0}'.format(backup_name)
path_to_backup = self.source_tree.path
lvm_snapsize = '1G'
lvm_snapname = 'freezer-snap_{0}'.format(backup_name)
lvm_dirmount = '/var/freezer/freezer-{0}'.format(backup_name)
backup_args = {
@ -360,6 +360,7 @@ class TestBackupUsingSwiftStorage(common.TestFS):
- FREEZER_TEST_OS_PASSWORD
- FREEZER_TEST_OS_AUTH_URL
"""
@unittest.skipIf(not common.TestFS.use_os,
"Cannot test with swift, please provide"
"'FREEZER_TEST_OS_TENANT_NAME',"
@ -529,7 +530,8 @@ class TestBackupUsingSwiftStorage(common.TestFS):
self.assertIsNotNone(result)
result = common.execute_freezerc(restore_args)
self.assertIsNotNone(result)
# we cannot test if trees as a running mysql instance will modify the files
# we cannot test if trees as a running mysql instance
# will modify the files
@unittest.skipIf(not common.TestFS.use_os,
"Cannot test with swift, please provide"

View File

@ -12,7 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
from freezer.engine.tar import tar_builders
@ -20,10 +19,9 @@ from freezer.utils import utils
class TestTarCommandBuilder(unittest.TestCase):
def setUp(self):
self.builder = tar_builders\
.TarCommandBuilder(".", "gzip", False, "gnutar")
self.builder = tar_builders.TarCommandBuilder(".", "gzip", False,
"gnutar")
def test_build(self):
self.assertEqual(
@ -55,8 +53,8 @@ class TestTarCommandBuilder(unittest.TestCase):
"file:encrypt_pass_file")
def test_build_every_arg_windows(self):
self.builder = tar_builders\
.TarCommandBuilder(".", "gzip", True, "gnutar")
self.builder = tar_builders.TarCommandBuilder(".", "gzip", True,
"gnutar")
self.builder.set_listed_incremental("listed-file.tar")
self.builder.set_encryption("encrypt_pass_file", "openssl")
self.builder.set_dereference("hard")
@ -90,8 +88,8 @@ class TestTarCommandRestoreBuilder(unittest.TestCase):
self.builder.set_encryption("encrypt_pass_file", "openssl")
self.assertEqual(
self.builder.build(),
"openssl enc -d -aes-256-cfb -pass file:encrypt_pass_file | gnutar "
"-z --incremental --extract --unlink-first --ignore-zeros"
"openssl enc -d -aes-256-cfb -pass file:encrypt_pass_file | "
"gnutar -z --incremental --extract --unlink-first --ignore-zeros"
" --warning=none --directory restore_path")
def test_all_args_windows(self):

View File

@ -14,6 +14,7 @@
# limitations under the License.
import unittest
import mock
from freezer.openstack import osclients
@ -26,9 +27,10 @@ class TestOsClients(unittest.TestCase):
auth_url="url/v3", password="password", identity_api_version="3",
insecure=False, cacert='cert', user_domain_name='Default',
project_domain_name='Default').get_opts_dicts()
self.client_manager = osclients.OSClientManager(auth_method=self.opts.pop('auth_method'),
auth_url=self.opts.pop('auth_url'),
**self.opts)
self.client_manager = osclients.OSClientManager(
auth_method=self.opts.pop('auth_method'),
auth_url=self.opts.pop('auth_url'),
**self.opts)
def test_init(self):
self.client_manager.get_cinder()

View File

@ -15,14 +15,12 @@ See the License for the specific language governing permissions and
limitations under the License.
"""
import unittest
from freezer.openstack import restore
from freezer.tests import commons
class TestRestore(commons.FreezerBaseTestCase):
def setUp(self):
super(TestRestore, self).setUp()
@ -32,12 +30,14 @@ class TestRestore(commons.FreezerBaseTestCase):
def test_restore_cinder_with_backup_id(self):
backup_opt = commons.BackupOpt1()
ros = restore.RestoreOs(backup_opt.client_manager, backup_opt.container)
ros = restore.RestoreOs(backup_opt.client_manager,
backup_opt.container)
ros.restore_cinder(35, 34, 33)
def test_restore_cinder_without_backup_id(self):
backup_opt = commons.BackupOpt1()
ros = restore.RestoreOs(backup_opt.client_manager, backup_opt.container)
ros = restore.RestoreOs(backup_opt.client_manager,
backup_opt.container)
ros.restore_cinder(35, 34)
def test_restore_nova(self):

View File

@ -12,20 +12,19 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
from mock import Mock, patch, mock_open
import signal
import unittest
import mock
from mock import patch
from freezer.scheduler import daemon
class TestNoDaemon(unittest.TestCase):
@patch('freezer.scheduler.daemon.signal')
def setUp(self, mock_signal):
self.daemonizable = Mock()
self.daemonizable = mock.Mock()
self.daemon = daemon.NoDaemon(daemonizable=self.daemonizable)
def test_create(self):
@ -53,7 +52,8 @@ class TestNoDaemon(unittest.TestCase):
# @patch('freezer.scheduler.daemon.logging')
# def test_start_restarts_daemonizable_on_Exception(self, mock_logging):
# daemon.NoDaemon.exit_flag = False
# self.daemonizable.start.side_effect = [Exception('test'), lambda: DEFAULT]
# self.daemonizable.start.side_effect = [Exception('test'),
# lambda: DEFAULT]
#
# res = self.daemon.start(log_file=None, dump_stack_trace=True)
#
@ -72,25 +72,25 @@ class TestNoDaemon(unittest.TestCase):
class TestDaemon(unittest.TestCase):
def setUp(self):
self.daemonizable = Mock()
self.daemonizable = mock.Mock()
self.daemon = daemon.Daemon(daemonizable=self.daemonizable)
def test_create(self):
self.assertIsInstance(self.daemon, daemon.Daemon)
def test_handle_program_exit_calls_scheduler_stop(self):
self.daemon.handle_program_exit(Mock(), Mock())
self.daemon.handle_program_exit(mock.Mock(), mock.Mock())
self.daemonizable.stop.assert_called_with()
def test_handle_program_reload_calls_scheduler_reload(self):
self.daemon.handle_reload(Mock(), Mock())
self.daemon.handle_reload(mock.Mock(), mock.Mock())
self.daemonizable.reload.assert_called_with()
def test_signal_map_handlers(self):
signal_map = self.daemon.signal_map
self.assertEqual(self.daemon.handle_program_exit, signal_map[signal.SIGTERM])
self.assertEqual(self.daemon.handle_program_exit,
signal_map[signal.SIGTERM])
self.assertEqual(self.daemon.handle_reload, signal_map[signal.SIGHUP])
@patch('freezer.scheduler.daemon.gettempdir')
@ -126,13 +126,15 @@ class TestDaemon(unittest.TestCase):
self.assertIsNone(res)
self.assertEqual(daemon.Daemon.exit_flag, True)
self.assertTrue(self.daemonizable.start.called)
#
# @patch('freezer.scheduler.daemon.logging')
# @patch('freezer.scheduler.daemon.PidFile')
# @patch('freezer.scheduler.daemon.DaemonContext')
# def test_start_restarts_daemonizable_on_Exception(self, mock_DaemonContext, mock_PidFile, mock_logging):
# def test_start_restarts_daemonizable_on_Exception(
# self, mock_DaemonContext, mock_PidFile, mock_logging):
# daemon.Daemon.exit_flag = False
# self.daemonizable.start.side_effect = [Exception('test'), lambda: DEFAULT]
# self.daemonizable.start.side_effect = [Exception('test'),
# lambda: DEFAULT]
#
# res = self.daemon.start(log_file=None, dump_stack_trace=True)
#
@ -140,7 +142,7 @@ class TestDaemon(unittest.TestCase):
# self.assertEqual(daemon.Daemon.exit_flag, True)
# self.assertEqual(self.daemonizable.start.call_count, 2)
# self.assertTrue(mock_logging.error.called)
#
# @patch('freezer.scheduler.daemon.os')
# def test_stop_not_existing(self, mock_os):
# self.daemon.pid = None

View File

@ -12,11 +12,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
from freezer.scheduler import scheduler_job
class TestSchedulerJob(unittest.TestCase):
def setUp(self):
self.job = scheduler_job.Job(None, None, {"job_schedule": {}})

View File

@ -16,40 +16,41 @@ limitations under the License.
"""
import unittest
from mock import Mock, patch
import mock
from mock import patch
from freezer.snapshot import lvm
class Test_lvm_snap_remove(unittest.TestCase):
@patch('freezer.snapshot.lvm.os')
@patch('freezer.snapshot.lvm._umount')
@patch('freezer.snapshot.lvm._lvremove')
def test_return_none_on_success(self, mock_lvremove, mock_umount, mock_os):
backup_opt = Mock()
backup_opt = mock.Mock()
backup_opt.lvm_volgroup = 'one'
backup_opt.lvm_snapname = 'two'
self.assertIsNone(lvm.lvm_snap_remove(backup_opt))
class Test_lvm_snap(unittest.TestCase):
@patch('freezer.snapshot.lvm.get_lvm_info')
@patch('freezer.snapshot.lvm.utils.create_dir')
def test_with_snapshot_opt_simple_sets_correct_path_and_raises_on_perm(self, mock_create_dir, mock_get_lvm_info):
def test_with_snapshot_opt_simple_sets_correct_path_and_raises_on_perm(
self, mock_create_dir, mock_get_lvm_info):
mock_get_lvm_info.return_value = {
'volgroup': 'lvm_volgroup',
'srcvol': 'lvm_device',
'snap_path': 'snap_path'}
backup_opt = Mock()
backup_opt = mock.Mock()
backup_opt.snapshot = True
backup_opt.path_to_backup = '/just/a/path'
backup_opt.lvm_dirmount = '/var/mountpoint'
backup_opt.lvm_snapperm = 'invalid_value'
with self.assertRaises(Exception) as cm:
with self.assertRaises(Exception) as cm: # noqa
lvm.lvm_snap(backup_opt)
the_exception = cm.exception
self.assertIn('Invalid value for option lvm-snap-perm',
@ -60,17 +61,18 @@ class Test_lvm_snap(unittest.TestCase):
@patch('freezer.snapshot.lvm.get_vol_fs_type')
@patch('freezer.snapshot.lvm.get_lvm_info')
@patch('freezer.snapshot.lvm.utils.create_dir')
def test_ok(self, mock_create_dir, mock_get_lvm_info, mock_get_vol_fs_type, mock_popen, mock_validate_lvm_params):
def test_ok(self, mock_create_dir, mock_get_lvm_info, mock_get_vol_fs_type,
mock_popen, mock_validate_lvm_params):
mock_get_lvm_info.return_value = {
'volgroup': 'lvm_volgroup',
'srcvol': 'lvm_device',
'snap_path': 'snap_path'}
mock_process = Mock()
mock_process = mock.Mock()
mock_process.communicate.return_value = '', ''
mock_process.returncode = 0
mock_popen.return_value = mock_process
backup_opt = Mock()
backup_opt = mock.Mock()
backup_opt.snapshot = True
backup_opt.path_to_backup = '/just/a/path'
backup_opt.lvm_dirmount = '/var/mountpoint'
@ -86,24 +88,26 @@ class Test_lvm_snap(unittest.TestCase):
@patch('freezer.snapshot.lvm.get_vol_fs_type')
@patch('freezer.snapshot.lvm.get_lvm_info')
@patch('freezer.utils.utils.create_dir')
def test_snapshot_fails(self, mock_create_dir, mock_get_lvm_info, mock_get_vol_fs_type, mock_popen, mock_validate_lvm_params):
def test_snapshot_fails(self, mock_create_dir, mock_get_lvm_info,
mock_get_vol_fs_type, mock_popen,
mock_validate_lvm_params):
mock_get_lvm_info.return_value = {
'volgroup': 'lvm_volgroup',
'srcvol': 'lvm_device',
'snap_path': 'snap_path'}
mock_process = Mock()
mock_process = mock.Mock()
mock_process.communicate.return_value = '', ''
mock_process.returncode = 1
mock_popen.return_value = mock_process
backup_opt = Mock()
backup_opt = mock.Mock()
backup_opt.snapshot = True
backup_opt.path_to_backup = '/just/a/path'
backup_opt.lvm_dirmount = '/var/mountpoint'
backup_opt.lvm_snapperm = 'ro'
backup_opt.lvm_snapsize = '1G'
with self.assertRaises(Exception) as cm:
with self.assertRaises(Exception) as cm: # noqa
lvm.lvm_snap(backup_opt)
the_exception = cm.exception
self.assertIn('lvm snapshot creation error', str(the_exception))
@ -114,19 +118,21 @@ class Test_lvm_snap(unittest.TestCase):
@patch('freezer.snapshot.lvm.get_vol_fs_type')
@patch('freezer.snapshot.lvm.get_lvm_info')
@patch('freezer.utils.utils.create_dir')
def test_already_mounted(self, mock_create_dir, mock_get_lvm_info, mock_get_vol_fs_type,
mock_popen, mock_validate_lvm_params, lvm_snap_remove):
def test_already_mounted(self, mock_create_dir, mock_get_lvm_info,
mock_get_vol_fs_type,
mock_popen, mock_validate_lvm_params,
lvm_snap_remove):
mock_get_vol_fs_type.return_value = 'xfs'
mock_get_lvm_info.return_value = {
'volgroup': 'lvm_volgroup',
'srcvol': 'lvm_device',
'snap_path': 'snap_path'}
mock_process = Mock()
mock_process = mock.Mock()
mock_process.communicate.return_value = '', 'already mounted'
mock_process.returncode = 0
mock_popen.return_value = mock_process
backup_opt = Mock()
backup_opt = mock.Mock()
backup_opt.snapshot = True
backup_opt.path_to_backup = '/just/a/path'
backup_opt.lvm_dirmount = '/var/mountpoint'
@ -151,7 +157,7 @@ class Test_lvm_snap(unittest.TestCase):
'volgroup': 'lvm_volgroup',
'srcvol': 'lvm_device',
'snap_path': 'snap_path'}
mock_lvcreate_process, mock_mount_process = Mock(), Mock()
mock_lvcreate_process, mock_mount_process = mock.Mock(), mock.Mock()
mock_lvcreate_process.communicate.return_value = '', ''
mock_lvcreate_process.returncode = 0
@ -161,14 +167,14 @@ class Test_lvm_snap(unittest.TestCase):
mock_popen.side_effect = [mock_lvcreate_process, mock_mount_process]
backup_opt = Mock()
backup_opt = mock.Mock()
backup_opt.snapshot = True
backup_opt.path_to_backup = '/just/a/path'
backup_opt.lvm_dirmount = '/var/mountpoint'
backup_opt.lvm_snapperm = 'ro'
backup_opt.lvm_snapsize = '1G'
with self.assertRaises(Exception) as cm:
with self.assertRaises(Exception) as cm: # noqa
lvm.lvm_snap(backup_opt)
the_exception = cm.exception
self.assertIn('lvm snapshot mounting error', str(the_exception))
@ -177,63 +183,77 @@ class Test_lvm_snap(unittest.TestCase):
# class Test_get_lvm_info(unittest.TestCase):
# @patch('freezer.snapshot.lvm.lvm_guess')
# @patch('freezer.snapshot.lvm.utils.get_mount_from_path')
# def test_using_guess(self, mock_get_mount_from_path, mock_lvm_guess):
# mock_get_mount_from_path.return_value = '/home/somedir', 'some-snap-path'
# mock_lvm_guess.return_value = 'vg_test', 'lv_test', 'lvm_device'
# mounts = ('/dev/mapper/vg_prova-lv_prova_vol1 /home/pippo ext4 rw,relatime,data=ordered 0 0')
# mocked_open_function = mock_open(read_data=mounts)
#
# with patch("__builtin__.open", mocked_open_function):
# res = lvm.get_lvm_info('lvm_auto_snap_value')
#
# expected_result = {'volgroup': 'vg_test',
# 'snap_path': 'some-snap-path',
# 'srcvol': 'lvm_device'}
# self.assertEqual(res, expected_result)
# @patch('freezer.snapshot.lvm.subprocess.Popen')
# @patch('freezer.snapshot.lvm.lvm_guess')
# @patch('freezer.snapshot.lvm.utils.get_mount_from_path')
# def test_using_mount(self, mock_get_mount_from_path, mock_lvm_guess, mock_popen):
# mock_get_mount_from_path.return_value = '/home/somedir', 'some-snap-path'
# mock_lvm_guess.side_effect = [(None, None, None), ('vg_test', 'lv_test', 'lvm_device')]
# mounts = ('/dev/mapper/vg_prova-lv_prova_vol1 /home/pippo ext4 rw,relatime,data=ordered 0 0')
# mocked_open_function = mock_open(read_data=mounts)
# mock_process = Mock()
# mock_process.returncode = 0
# mock_popen.return_value = mock_process
# mock_process.communicate.return_value = '', ''
#
# with patch("__builtin__.open", mocked_open_function):
# res = lvm.get_lvm_info('lvm_auto_snap_value')
#
# expected_result = {'volgroup': 'vg_test',
# 'snap_path': 'some-snap-path',
# 'srcvol': 'lvm_device'}
# self.assertEqual(res, expected_result)
#
# @patch('freezer.snapshot.lvm.subprocess.Popen')
# @patch('freezer.snapshot.lvm.lvm_guess')
# @patch('freezer.snapshot.lvm.utils.get_mount_from_path')
# def test_raises_Exception_when_info_not_found(self, mock_get_mount_from_path, mock_lvm_guess, mock_popen):
# mock_get_mount_from_path.return_value = '/home/somedir', 'some-snap-path'
# mock_lvm_guess.return_value = None, None, None
# mounts = ('/dev/mapper/vg_prova-lv_prova_vol1 /home/pippo ext4 rw,relatime,data=ordered 0 0')
# mocked_open_function = mock_open(read_data=mounts)
# mock_process = Mock()
# mock_lvm_guess.return_value = None, None, None
# mock_process.communicate.return_value = '', ''
# mock_popen.return_value = mock_process
#
# with patch("__builtin__.open", mocked_open_function):
# self.assertRaises(Exception, lvm.get_lvm_info, 'lvm_auto_snap_value')
# @patch('freezer.snapshot.lvm.lvm_guess')
# @patch('freezer.snapshot.lvm.utils.get_mount_from_path')
# def test_using_guess(self, mock_get_mount_from_path, mock_lvm_guess):
# mock_get_mount_from_path.return_value = ('/home/somedir',
# 'some-snap-path')
# mock_lvm_guess.return_value = 'vg_test', 'lv_test', 'lvm_device'
# mounts = (
# '/dev/mapper/vg_prova-lv_prova_vol1 /home/pippo ext4 rw,'
# 'relatime,data=ordered 0 0'
# )
# mocked_open_function = mock.mock_open(read_data=mounts)
#
# with patch("__builtin__.open", mocked_open_function):
# res = lvm.get_lvm_info('lvm_auto_snap_value')
#
# expected_result = {'volgroup': 'vg_test',
# 'snap_path': 'some-snap-path',
# 'srcvol': 'lvm_device'}
# self.assertEqual(res, expected_result)
#
# @patch('freezer.snapshot.lvm.subprocess.Popen')
# @patch('freezer.snapshot.lvm.lvm_guess')
# @patch('freezer.snapshot.lvm.utils.get_mount_from_path')
# def test_using_mount(self, mock_get_mount_from_path, mock_lvm_guess,
# mock_popen):
# mock_get_mount_from_path.return_value = ('/home/somedir',
# 'some-snap-path')
# mock_lvm_guess.side_effect = [(None, None, None),
# ('vg_test', 'lv_test', 'lvm_device')]
# mounts = (
# '/dev/mapper/vg_prova-lv_prova_vol1 /home/pippo ext4 rw,'
# 'relatime,data=ordered 0 0'
# )
# mocked_open_function = mock.mock_open(read_data=mounts)
# mock_process = mock.Mock()
# mock_process.returncode = 0
# mock_popen.return_value = mock_process
# mock_process.communicate.return_value = '', ''
#
# with patch("__builtin__.open", mocked_open_function):
# res = lvm.get_lvm_info('lvm_auto_snap_value')
#
# expected_result = {'volgroup': 'vg_test',
# 'snap_path': 'some-snap-path',
# 'srcvol': 'lvm_device'}
# self.assertEqual(res, expected_result)
#
# @patch('freezer.snapshot.lvm.subprocess.Popen')
# @patch('freezer.snapshot.lvm.lvm_guess')
# @patch('freezer.snapshot.lvm.utils.get_mount_from_path')
# def test_raises_Exception_when_info_not_found(
# self, mock_get_mount_from_path, mock_lvm_guess, mock_popen):
# mock_get_mount_from_path.return_value = ('/home/somedir',
# 'some-snap-path')
# mock_lvm_guess.return_value = None, None, None
# mounts = (
# '/dev/mapper/vg_prova-lv_prova_vol1 /home/pippo ext4 rw,'
# 'relatime,data=ordered 0 0'
# )
# mocked_open_function = mock.mock_open(read_data=mounts)
# mock_process = mock.Mock()
# mock_lvm_guess.return_value = None, None, None
# mock_process.communicate.return_value = '', ''
# mock_popen.return_value = mock_process
#
# with patch("__builtin__.open", mocked_open_function):
# self.assertRaises(Exception, lvm.get_lvm_info,
# 'lvm_auto_snap_value')
class Test_lvm_guess(unittest.TestCase):
def test_no_match(self):
mount_points = []
mount_point_path = '/home/pippo'
@ -245,56 +265,105 @@ class Test_lvm_guess(unittest.TestCase):
self.assertEqual(res, expected_result)
def test_unsing_proc_mounts(self):
mount_points = ['rootfs / rootfs rw 0 0\n', 'sysfs /sys sysfs rw,nosuid,nodev,noexec,relatime 0 0\n', 'proc /proc proc rw,nosuid,nodev,noexec,relatime 0 0\n', 'udev /dev devtmpfs rw,relatime,size=2010616k,nr_inodes=502654,mode=755 0 0\n', 'devpts /dev/pts devpts rw,nosuid,noexec,relatime,gid=5,mode=620,ptmxmode=000 0 0\n', 'tmpfs /run tmpfs rw,nosuid,noexec,relatime,size=404836k,mode=755 0 0\n', '/dev/mapper/fabuntu--vg-root / ext4 rw,relatime,errors=remount-ro,data=ordered 0 0\n', 'none /sys/fs/cgroup tmpfs rw,relatime,size=4k,mode=755 0 0\n', 'none /sys/fs/fuse/connections fusectl rw,relatime 0 0\n', 'none /sys/kernel/debug debugfs rw,relatime 0 0\n', 'none /sys/kernel/security securityfs rw,relatime 0 0\n', 'cgroup /sys/fs/cgroup/cpuset cgroup rw,relatime,cpuset 0 0\n', 'cgroup /sys/fs/cgroup/cpu cgroup rw,relatime,cpu 0 0\n', 'cgroup /sys/fs/cgroup/cpuacct cgroup rw,relatime,cpuacct 0 0\n', 'cgroup /sys/fs/cgroup/memory cgroup rw,relatime,memory 0 0\n', 'none /run/lock tmpfs rw,nosuid,nodev,noexec,relatime,size=5120k 0 0\n', 'none /run/shm tmpfs rw,nosuid,nodev,relatime 0 0\n', 'none /run/user tmpfs rw,nosuid,nodev,noexec,relatime,size=102400k,mode=755 0 0\n', 'cgroup /sys/fs/cgroup/devices cgroup rw,relatime,devices 0 0\n', 'none /sys/fs/pstore pstore rw,relatime 0 0\n', 'cgroup /sys/fs/cgroup/freezer cgroup rw,relatime,freezer 0 0\n', 'cgroup /sys/fs/cgroup/blkio cgroup rw,relatime,blkio 0 0\n', 'cgroup /sys/fs/cgroup/perf_event cgroup rw,relatime,perf_event 0 0\n', 'cgroup /sys/fs/cgroup/hugetlb cgroup rw,relatime,hugetlb 0 0\n', '/dev/sda1 /boot ext2 rw,relatime 0 0\n', 'systemd /sys/fs/cgroup/systemd cgroup rw,nosuid,nodev,noexec,relatime,name=systemd 0 0\n', '/dev/mapper/vg_prova-lv_prova_vol1 /home/pippo ext4 rw,relatime,data=ordered 0 0\n']
mount_points = [
'rootfs / rootfs rw 0 0\n',
'sysfs /sys sysfs rw,nosuid,nodev,noexec,relatime 0 0\n',
'proc /proc proc rw,nosuid,nodev,noexec,relatime 0 0\n',
'udev /dev devtmpfs rw,relatime,size=2010616k,nr_inodes=502654,'
'mode=755 0 0\n',
'devpts /dev/pts devpts rw,nosuid,noexec,relatime,gid=5,mode=620,'
'ptmxmode=000 0 0\n',
'tmpfs /run tmpfs rw,nosuid,noexec,relatime,size=404836k,mode=755 '
'0 0\n',
'/dev/mapper/fabuntu--vg-root / ext4 rw,relatime,'
'errors=remount-ro,data=ordered 0 0\n',
'none /sys/fs/cgroup tmpfs rw,relatime,size=4k,mode=755 0 0\n',
'none /sys/fs/fuse/connections fusectl rw,relatime 0 0\n',
'none /sys/kernel/debug debugfs rw,relatime 0 0\n',
'none /sys/kernel/security securityfs rw,relatime 0 0\n',
'cgroup /sys/fs/cgroup/cpuset cgroup rw,relatime,cpuset 0 0\n',
'cgroup /sys/fs/cgroup/cpu cgroup rw,relatime,cpu 0 0\n',
'cgroup /sys/fs/cgroup/cpuacct cgroup rw,relatime,cpuacct 0 0\n',
'cgroup /sys/fs/cgroup/memory cgroup rw,relatime,memory 0 0\n',
'none /run/lock tmpfs rw,nosuid,nodev,noexec,relatime,size=5120k '
'0 0\n',
'none /run/shm tmpfs rw,nosuid,nodev,relatime 0 0\n',
'none /run/user tmpfs rw,nosuid,nodev,noexec,relatime,'
'size=102400k,mode=755 0 0\n',
'cgroup /sys/fs/cgroup/devices cgroup rw,relatime,devices 0 0\n',
'none /sys/fs/pstore pstore rw,relatime 0 0\n',
'cgroup /sys/fs/cgroup/freezer cgroup rw,relatime,freezer 0 0\n',
'cgroup /sys/fs/cgroup/blkio cgroup rw,relatime,blkio 0 0\n',
'cgroup /sys/fs/cgroup/perf_event cgroup rw,relatime,perf_event '
'0 0\n',
'cgroup /sys/fs/cgroup/hugetlb cgroup rw,relatime,hugetlb 0 0\n',
'/dev/sda1 /boot ext2 rw,relatime 0 0\n',
'systemd /sys/fs/cgroup/systemd cgroup rw,nosuid,nodev,noexec,'
'relatime,name=systemd 0 0\n',
'/dev/mapper/vg_prova-lv_prova_vol1 /home/pippo ext4 rw,relatime,'
'data=ordered 0 0\n'
]
mount_point_path = '/home/pippo'
source = '/proc/mounts'
res = lvm.lvm_guess(mount_point_path, mount_points, source)
expected_result = ('vg_prova', 'lv_prova_vol1', '/dev/vg_prova/lv_prova_vol1')
expected_result = (
'vg_prova', 'lv_prova_vol1', '/dev/vg_prova/lv_prova_vol1')
self.assertEqual(res, expected_result)
def test_unsing_mount(self):
mount_points = ['/dev/mapper/fabuntu--vg-root on / type ext4 (rw,errors=remount-ro)',
'proc on /proc type proc (rw,noexec,nosuid,nodev)',
'sysfs on /sys type sysfs (rw,noexec,nosuid,nodev)',
'none on /sys/fs/cgroup type tmpfs (rw)',
'none on /sys/fs/fuse/connections type fusectl (rw)',
'none on /sys/kernel/debug type debugfs (rw)',
'none on /sys/kernel/security type securityfs (rw)',
'udev on /dev type devtmpfs (rw,mode=0755)',
'devpts on /dev/pts type devpts (rw,noexec,nosuid,gid=5,mode=0620)',
'tmpfs on /run type tmpfs (rw,noexec,nosuid,size=10%,mode=0755)',
'none on /run/lock type tmpfs (rw,noexec,nosuid,nodev,size=5242880)',
'none on /run/shm type tmpfs (rw,nosuid,nodev)',
'none on /run/user type tmpfs (rw,noexec,nosuid,nodev,size=104857600,mode=0755)',
'none on /sys/fs/pstore type pstore (rw)',
'cgroup on /sys/fs/cgroup/cpuset type cgroup (rw,relatime,cpuset)',
'cgroup on /sys/fs/cgroup/cpu type cgroup (rw,relatime,cpu)',
'cgroup on /sys/fs/cgroup/cpuacct type cgroup (rw,relatime,cpuacct)',
'cgroup on /sys/fs/cgroup/memory type cgroup (rw,relatime,memory)',
'cgroup on /sys/fs/cgroup/devices type cgroup (rw,relatime,devices)',
'cgroup on /sys/fs/cgroup/freezer type cgroup (rw,relatime,freezer)',
'cgroup on /sys/fs/cgroup/blkio type cgroup (rw,relatime,blkio)',
'cgroup on /sys/fs/cgroup/perf_event type cgroup (rw,relatime,perf_event)',
'cgroup on /sys/fs/cgroup/hugetlb type cgroup (rw,relatime,hugetlb)',
'/dev/sda1 on /boot type ext2 (rw)',
'systemd on /sys/fs/cgroup/systemd type cgroup (rw,noexec,nosuid,nodev,none,name=systemd)',
'/dev/mapper/vg_prova-lv_prova_vol1 on /home/pippo type ext4 (rw)',
'']
mount_points = [
'/dev/mapper/fabuntu--vg-root on / type ext4 (rw,errors='
'remount-ro)',
'proc on /proc type proc (rw,noexec,nosuid,nodev)',
'sysfs on /sys type sysfs (rw,noexec,nosuid,nodev)',
'none on /sys/fs/cgroup type tmpfs (rw)',
'none on /sys/fs/fuse/connections type fusectl (rw)',
'none on /sys/kernel/debug type debugfs (rw)',
'none on /sys/kernel/security type securityfs (rw)',
'udev on /dev type devtmpfs (rw,mode=0755)',
'devpts on /dev/pts type devpts (rw,noexec,nosuid,gid=5,'
'mode=0620)',
'tmpfs on /run type tmpfs (rw,noexec,nosuid,size=10%,mode=0755)',
'none on /run/lock type tmpfs (rw,noexec,nosuid,nodev,'
'size=5242880)',
'none on /run/shm type tmpfs (rw,nosuid,nodev)',
'none on /run/user type tmpfs (rw,noexec,nosuid,nodev,'
'size=104857600,mode=0755)',
'none on /sys/fs/pstore type pstore (rw)',
'cgroup on /sys/fs/cgroup/cpuset type cgroup (rw,relatime,cpuset)',
'cgroup on /sys/fs/cgroup/cpu type cgroup (rw,relatime,cpu)',
'cgroup on /sys/fs/cgroup/cpuacct type cgroup (rw,relatime,'
'cpuacct)',
'cgroup on /sys/fs/cgroup/memory type cgroup (rw,relatime,memory)',
'cgroup on /sys/fs/cgroup/devices type cgroup (rw,relatime,'
'devices)',
'cgroup on /sys/fs/cgroup/freezer type cgroup (rw,relatime,'
'freezer)',
'cgroup on /sys/fs/cgroup/blkio type cgroup (rw,relatime,blkio)',
'cgroup on /sys/fs/cgroup/perf_event type cgroup (rw,relatime,'
'perf_event)',
'cgroup on /sys/fs/cgroup/hugetlb type cgroup (rw,relatime,'
'hugetlb)',
'/dev/sda1 on /boot type ext2 (rw)',
'systemd on /sys/fs/cgroup/systemd type cgroup (rw,noexec,nosuid,'
'nodev,none,name=systemd)',
'/dev/mapper/vg_prova-lv_prova_vol1 on /home/pippo type ext4 (rw)',
'']
mount_point_path = '/home/pippo'
source = 'mount'
res = lvm.lvm_guess(mount_point_path, mount_points, source)
expected_result = ('vg_prova', 'lv_prova_vol1', '/dev/vg_prova/lv_prova_vol1')
expected_result = ('vg_prova', 'lv_prova_vol1',
'/dev/vg_prova/lv_prova_vol1')
self.assertEqual(res, expected_result)
class Test_validate_lvm_params(unittest.TestCase):
def setUp(self):
self.backup_opt = Mock()
self.backup_opt = mock.Mock()
self.backup_opt.lvm_snapperm = 'ro'
self.backup_opt.path_to_backup = '/path/to/backup'
self.backup_opt.lvm_srcvol = '/lvm/srcvol'
@ -310,27 +379,30 @@ class Test_validate_lvm_params(unittest.TestCase):
def test_raises_Exception_on_snapperm_invalid(self):
self.backup_opt.lvm_snapperm = 'squeezeme'
self.assertRaises(Exception, lvm.validate_lvm_params, self.backup_opt)
self.assertRaises(Exception, lvm.validate_lvm_params,
self.backup_opt) # noqa
def test_raises_Exception_on_no_pathtobackup(self):
self.backup_opt.path_to_backup = ''
self.assertRaises(Exception, lvm.validate_lvm_params, self.backup_opt)
self.assertRaises(Exception, lvm.validate_lvm_params,
self.backup_opt) # noqa
def test_raises_Exception_on_no_lvmsrcvol(self):
self.backup_opt.lvm_srcvol = ''
self.assertRaises(Exception, lvm.validate_lvm_params, self.backup_opt)
self.assertRaises(Exception, lvm.validate_lvm_params,
self.backup_opt) # noqa
def test_raises_Exception_on_no_lvmvolgrp(self):
self.backup_opt.lvm_volgroup = ''
self.assertRaises(Exception, lvm.validate_lvm_params, self.backup_opt)
self.assertRaises(Exception, lvm.validate_lvm_params,
self.backup_opt) # noqa
class Test_umount(unittest.TestCase):
@patch('freezer.snapshot.lvm.subprocess.Popen')
@patch('freezer.snapshot.lvm.os')
def test_return_none_on_success(self, mock_os, mock_popen):
mock_process = Mock()
mock_process = mock.Mock()
mock_process.communicate.return_value = '', ''
mock_process.returncode = 0
mock_popen.return_value = mock_process
@ -340,19 +412,18 @@ class Test_umount(unittest.TestCase):
@patch('freezer.snapshot.lvm.subprocess.Popen')
@patch('freezer.snapshot.lvm.os')
def test_raises_on_popen_returncode_not_0(self, mock_os, mock_popen):
mock_process = Mock()
mock_process = mock.Mock()
mock_process.communicate.return_value = '', ''
mock_process.returncode = 1
mock_popen.return_value = mock_process
mock_os.rmdir.return_value = None
self.assertRaises(Exception, lvm._umount, 'path')
self.assertRaises(Exception, lvm._umount, 'path') # noqa
class Test_lvremove(unittest.TestCase):
@patch('freezer.snapshot.lvm.subprocess.Popen')
def test_return_none_on_success(self, mock_popen):
mock_process = Mock()
mock_process = mock.Mock()
mock_process.communicate.return_value = '', ''
mock_process.returncode = 0
mock_popen.return_value = mock_process
@ -360,8 +431,8 @@ class Test_lvremove(unittest.TestCase):
@patch('freezer.snapshot.lvm.subprocess.Popen')
def test_raises_on_popen_returncode_not_0(self, mock_popen):
mock_process = Mock()
mock_process = mock.Mock()
mock_process.communicate.return_value = '', ''
mock_process.returncode = 1
mock_popen.return_value = mock_process
self.assertRaises(Exception, lvm._lvremove, 'path')
self.assertRaises(Exception, lvm._lvremove, 'path') # noqa

View File

@ -12,21 +12,20 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
import unittest
from freezer.snapshot import vss
from freezer.tests.commons import (FakeDisableFileSystemRedirection, FakeSubProcess,
FakeSubProcess3, FakeSubProcess6)
import mock
from freezer.tests.commons import FakeDisableFileSystemRedirection
class TestVss(unittest.TestCase):
def mock_process(self, process):
fakesubprocesspopen = process.Popen()
mock.patch('subprocess.Popen.communicate',
new_callable=fakesubprocesspopen.communicate).start()
mock.patch('subprocess.Popen', new_callable=fakesubprocesspopen)\
.start()
mock.patch('subprocess.Popen',
new_callable=fakesubprocesspopen.start())
def mock_winutils(self):
fake_disable_redirection = FakeDisableFileSystemRedirection()

View File

@ -12,9 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import tempfile
import shutil
import tempfile
import unittest
from freezer.storage import local

View File

@ -16,80 +16,76 @@ limitations under the License.
"""
import mock
from mock import patch
from freezer.tests.commons import *
from freezer import job as jobs
from freezer.tests import commons
class TestJob(FreezerBaseTestCase):
class TestJob(commons.FreezerBaseTestCase):
def setUp(self):
super(TestJob, self).setUp()
def test_execute(self):
opt = BackupOpt1()
opt = commons.BackupOpt1()
job = jobs.InfoJob(opt, opt.storage)
assert job.execute() is None
class TestInfoJob(TestJob):
def setUp(self):
super(TestInfoJob, self).setUp()
def test_execute_nothing_to_do(self):
backup_opt = BackupOpt1()
backup_opt = commons.BackupOpt1()
job = jobs.InfoJob(backup_opt, backup_opt.storage)
job.execute()
def test_execute_list_containers(self):
backup_opt = BackupOpt1()
backup_opt = commons.BackupOpt1()
job = jobs.InfoJob(backup_opt, backup_opt.storage)
job.execute()
class TestBackupJob(TestJob):
def setUp(self):
super(TestBackupJob, self).setUp()
def test_execute_backup_fs_no_incremental_and_backup_level_raise(self):
backup_opt = BackupOpt1()
backup_opt = commons.BackupOpt1()
backup_opt.mode = 'default'
backup_opt.no_incremental = True
backup_opt.max_level = None
backup_opt.always_level = None
job = jobs.BackupJob(backup_opt, backup_opt.storage)
self.assertRaises(Exception, job.execute)
self.assertRaises(Exception, job.execute) # noqa
def test_execute_raise(self):
backup_opt = BackupOpt1()
backup_opt = commons.BackupOpt1()
backup_opt.no_incremental = False
backup_opt.mode = None
job = jobs.BackupJob(backup_opt, backup_opt.storage)
self.assertRaises(ValueError, job.execute)
self.assertRaises(ValueError, job.execute) # noqa
class TestAdminJob(TestJob):
def setUp(self):
super(TestAdminJob, self).setUp()
def test_execute(self):
backup_opt = BackupOpt1()
backup_opt = commons.BackupOpt1()
jobs.AdminJob(backup_opt, backup_opt.storage).execute()
class TestExecJob(TestJob):
def setUp(self):
super(TestExecJob, self).setUp()
#init mock_popen
# init mock_popen
self.popen = patch('freezer.utils.exec_cmd.subprocess.Popen')
self.mock_popen = self.popen.start()
self.mock_popen.return_value = Mock()
self.mock_popen.return_value.communicate = Mock()
self.mock_popen.return_value = mock.Mock()
self.mock_popen.return_value.communicate = mock.Mock()
self.mock_popen.return_value.communicate.return_value = ['some stderr']
def tearDown(self):
@ -98,21 +94,21 @@ class TestExecJob(TestJob):
def test_execute_nothing_to_do(self):
self.mock_popen.return_value.returncode = 0
backup_opt = BackupOpt1()
backup_opt = commons.BackupOpt1()
backup_opt.command = 'ls '
jobs.ExecJob(backup_opt, backup_opt.storage).execute()
def test_execute_script(self):
self.mock_popen.return_value.returncode = 0
backup_opt = BackupOpt1()
backup_opt.command='echo test'
backup_opt = commons.BackupOpt1()
backup_opt.command = 'echo test'
jobs.ExecJob(backup_opt, backup_opt.storage).execute()
def test_execute_raise(self):
self.popen=patch('freezer.utils.exec_cmd.subprocess.Popen')
self.mock_popen=self.popen.start()
self.popen = patch('freezer.utils.exec_cmd.subprocess.Popen')
self.mock_popen = self.popen.start()
self.mock_popen.return_value.returncode = 1
backup_opt = BackupOpt1()
backup_opt = commons.BackupOpt1()
backup_opt.command = 'echo test'
job = jobs.ExecJob(backup_opt, backup_opt.storage)
self.assertRaises(Exception, job.execute)
self.assertRaises(Exception, job.execute) # noqa

View File

@ -12,30 +12,31 @@
# License for the specific language governing permissions and limitations
# under the License.
from mock import Mock, patch, mock_open
import sys
from six.moves import StringIO
import unittest
import mock
from mock import patch
from six import moves
from freezer.utils.checksum import CheckSum
class TestChecksum(unittest.TestCase):
def setUp(self):
self.file = Mock()
self.dir = Mock()
self.file = mock.Mock()
self.dir = mock.Mock()
self.hello_world_md5sum = 'f36b2652200f5e88edd57963a1109146'
self.hello_world_sha256sum = '17b949eb67acf16bbf2605d57a01f7af4ff4b5' \
'7e200259de63fcebf20e75bbf5'
self.hello_world_sha256sum = ('17b949eb67acf16bbf2605d57a01f7af4ff4b5'
'7e200259de63fcebf20e75bbf5')
self.fake_file = StringIO(u"hello world\n")
self.fake_file = moves.StringIO(u"hello world\n")
self.increment_hash_one = self.hello_world_sha256sum
self.increment_hash_multi = '1b4bc4ff41172a5f29eaeffb7e9fc24c683c693' \
'9ab30132ad5d93a1e4a6b16e8'
self.increment_hash_emptydir = "6b6c6a3d7548cc4396b3dacc6c2750c3"\
"da53f379d20996cbdd2c18be00c3742c"
self.increment_hash_multi = ('1b4bc4ff41172a5f29eaeffb7e9fc24c683c693'
'9ab30132ad5d93a1e4a6b16e8')
self.increment_hash_emptydir = ("6b6c6a3d7548cc4396b3dacc6c2750c3"
"da53f379d20996cbdd2c18be00c3742c")
self.fake_dir = [('root', ['d1, .git'], ['a', 'b']), ]
self.dir_files = ['root/a', 'root/b']
self.exclude = "ro*b"
@ -69,7 +70,8 @@ class TestChecksum(unittest.TestCase):
with self.assertRaises(ValueError):
CheckSum('nope', 'bulshit')
@unittest.skipIf(sys.version_info.major == 2, 'Not supported on python v 2.7')
@unittest.skipIf(sys.version_info.major == 2,
'Not supported on python v 2.7')
@patch('builtins.open')
@patch('freezer.utils.checksum.os.path.isfile')
def test_get_hash_files(self, mock_isfile, mock_open):

View File

@ -12,9 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from six.moves import cStringIO
import unittest
from six import moves
from freezer.utils import config
@ -46,7 +47,7 @@ password = 'aNiceQuotedPassword'
password2 = "aNiceQuotedPassword"
spaced = value"""
fd = cStringIO(string)
fd = moves.cStringIO(string)
res = config.ini_parse(fd)
self.assertEqual('127.0.0.1', res['host'])
self.assertEqual('openstack', res['user'])

View File

@ -11,37 +11,39 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import subprocess
import unittest
from mock import patch, Mock
import mock
from mock import patch
from freezer.utils import exec_cmd
class TestExec(unittest.TestCase):
def test_exec_cmd(self):
cmd="echo test > test.txt"
popen=patch('freezer.utils.exec_cmd.subprocess.Popen')
mock_popen=popen.start()
mock_popen.return_value = Mock()
mock_popen.return_value.communicate = Mock()
cmd = "echo test > test.txt"
popen = patch('freezer.utils.exec_cmd.subprocess.Popen')
mock_popen = popen.start()
mock_popen.return_value = mock.Mock()
mock_popen.return_value.communicate = mock.Mock()
mock_popen.return_value.communicate.return_value = ['some stderr']
mock_popen.return_value.returncode = 0
exec_cmd.execute(cmd)
assert (mock_popen.call_count == 1)
mock_popen.assert_called_with(['echo', 'test', '>', 'test.txt'],
shell=False,
stderr=subprocess.PIPE,
stdout=subprocess.PIPE)
shell=False,
stderr=subprocess.PIPE,
stdout=subprocess.PIPE)
popen.stop()
def test__exec_cmd_with_pipe(self):
cmd="echo test|wc -l"
popen=patch('freezer.utils.exec_cmd.subprocess.Popen')
mock_popen=popen.start()
mock_popen.return_value = Mock()
mock_popen.return_value.communicate = Mock()
cmd = "echo test|wc -l"
popen = patch('freezer.utils.exec_cmd.subprocess.Popen')
mock_popen = popen.start()
mock_popen.return_value = mock.Mock()
mock_popen.return_value.communicate = mock.Mock()
mock_popen.return_value.communicate.return_value = ['some stderr']
mock_popen.return_value.returncode = 0
exec_cmd.execute(cmd)

View File

@ -13,23 +13,22 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import unittest
import os
import time
from mock import patch
from freezer.tests.commons import *
from freezer.openstack import osclients
from freezer.tests import commons
from freezer.utils import utils
class TestUtils(FreezerBaseTestCase):
class TestUtils(commons.FreezerBaseTestCase):
def setUp(self):
super(TestUtils, self).setUp()
def test_create_dir(self):
dir1 = '/tmp'
dir2 = '/tmp/testnoexistent1234'
dir3 = '~'
@ -48,15 +47,15 @@ class TestUtils(FreezerBaseTestCase):
# fakere = FakeRe()
# re.search = fakere.search
# assert type(utils.get_vol_fs_type("test")) is str
#def test_get_mount_from_path(self):
# dir1 = '/tmp'
# dir2 = '/tmp/nonexistentpathasdf'
# assert type(utils.get_mount_from_path(dir1)[0]) is str
# assert type(utils.get_mount_from_path(dir1)[1]) is str
# self.assertRaises(Exception, utils.get_mount_from_path, dir2)
# pytest.raises(Exception, utils.get_mount_from_path, dir2)
#
# def test_get_mount_from_path(self):
# dir1 = '/tmp'
# dir2 = '/tmp/nonexistentpathasdf'
# assert type(utils.get_mount_from_path(dir1)[0]) is str
# assert type(utils.get_mount_from_path(dir1)[1]) is str
# self.assertRaises(Exception, utils.get_mount_from_path, dir2)
#
# pytest.raises(Exception, utils.get_mount_from_path, dir2)
def test_human2bytes(self):
assert utils.human2bytes('0 B') == 0
@ -70,9 +69,10 @@ class TestUtils(FreezerBaseTestCase):
self.assertRaises(ValueError, utils.human2bytes, '12 foo')
def test_OpenstackOptions_creation_success(self):
class FreezerOpts:
class FreezerOpts(object):
def __init__(self, opts):
self.__dict__.update(opts)
env_dict = dict(OS_USERNAME='testusername',
OS_TENANT_NAME='testtenantename',
OS_AUTH_URL='testauthurl',
@ -80,7 +80,8 @@ class TestUtils(FreezerBaseTestCase):
OS_REGION_NAME='testregion',
OS_TENANT_ID='0123456789',
OS_AUTH_VERSION='2.0')
options = OpenstackOpts.create_from_dict(env_dict).get_opts_dicts()
options = osclients.OpenstackOpts.create_from_dict(
env_dict).get_opts_dicts()
options = FreezerOpts(options)
assert options.username == env_dict['OS_USERNAME']
assert options.tenant_name == env_dict['OS_TENANT_NAME']
@ -94,7 +95,8 @@ class TestUtils(FreezerBaseTestCase):
OS_AUTH_URL='testauthurl',
OS_PASSWORD='testpassword',
OS_AUTH_VERSION='2.0')
options = OpenstackOpts.create_from_dict(env_dict).get_opts_dicts()
options = osclients.OpenstackOpts.create_from_dict(
env_dict).get_opts_dicts()
options = FreezerOpts(options)
assert options.username == env_dict['OS_USERNAME']
assert options.tenant_name == env_dict['OS_TENANT_NAME']
@ -103,8 +105,8 @@ class TestUtils(FreezerBaseTestCase):
def test_date_to_timestamp(self):
# ensure that timestamp is check with appropriate timezone offset
assert (1417649003+time.timezone) == \
utils.date_to_timestamp("2014-12-03T23:23:23")
assert (1417649003 + time.timezone) == utils.date_to_timestamp(
"2014-12-03T23:23:23")
def prepare_env(self):
os.environ["HTTP_PROXY"] = 'http://proxy.original.domain:8080'
@ -116,7 +118,7 @@ class TestUtils(FreezerBaseTestCase):
HTTP_PROXY and HTTPS_PROXY when 'proxy' key in its dictionary
"""
# Test wrong proxy value
self.assertRaises(Exception, utils.alter_proxy, 'boohoo')
self.assertRaises(Exception, utils.alter_proxy, 'boohoo') # noqa
# Test when there is proxy value passed
self.prepare_env()
@ -126,43 +128,43 @@ class TestUtils(FreezerBaseTestCase):
assert os.environ["HTTPS_PROXY"] == test_proxy
def test_exclude_path(self):
assert utils.exclude_path('./dir/file','file') is True
assert utils.exclude_path('./dir/file','*le') is True
assert utils.exclude_path('./dir/file','fi*') is True
assert utils.exclude_path('./dir/file','*fi*') is True
assert utils.exclude_path('./dir/file','dir') is True
assert utils.exclude_path('./dir/file','di*') is True
assert utils.exclude_path('./aaa/bbb/ccc','*bb') is True
assert utils.exclude_path('./aaa/bbb/ccc','bb') is False
assert utils.exclude_path('./a/b','c') is False
assert utils.exclude_path('./a/b/c','') is False
assert utils.exclude_path('./dir/file', 'file') is True
assert utils.exclude_path('./dir/file', '*le') is True
assert utils.exclude_path('./dir/file', 'fi*') is True
assert utils.exclude_path('./dir/file', '*fi*') is True
assert utils.exclude_path('./dir/file', 'dir') is True
assert utils.exclude_path('./dir/file', 'di*') is True
assert utils.exclude_path('./aaa/bbb/ccc', '*bb') is True
assert utils.exclude_path('./aaa/bbb/ccc', 'bb') is False
assert utils.exclude_path('./a/b', 'c') is False
assert utils.exclude_path('./a/b/c', '') is False
@patch('freezer.utils.utils.os.walk')
@patch('freezer.utils.utils.os.chdir')
@patch('freezer.utils.utils.os.path.isfile')
def test_walk_path_dir(self,mock_isfile,mock_chdir,mock_walk):
def test_walk_path_dir(self, mock_isfile, mock_chdir, mock_walk):
mock_isfile.return_value = False
mock_chdir.return_value = None
mock_walk.return_value = [('.', ['d1','d2'],['f1','f2']),
('./d1',[],['f3']),('./d2',[],[]),]
mock_walk.return_value = [('.', ['d1', 'd2'], ['f1', 'f2']),
('./d1', [], ['f3']), ('./d2', [], []), ]
expected = ['.', './f1', './f2', './d1', './d1/f3', './d2']
files = []
count = utils.walk_path('root','',False,self.callback, files=files)
count = utils.walk_path('root', '', False, self.callback, files=files)
for i in range(len(files)):
assert expected[i] == files[i]
assert count is len(files)
@patch('freezer.utils.utils.os.path.isfile')
def test_walk_path_file(self,mock_isfile):
def test_walk_path_file(self, mock_isfile):
mock_isfile.return_value = True
count = utils.walk_path('root','',False,self.callback)
count = utils.walk_path('root', '', False, self.callback)
assert count is 1
def callback(self,filepath='', files=[]):
def callback(self, filepath='', files=[]):
files.append(filepath)
class TestDateTime:
class TestDateTime(object):
def setup(self):
d = datetime.datetime(2015, 3, 7, 17, 47, 44, 716799)
self.datetime = utils.DateTime(d)
@ -172,8 +174,8 @@ class TestDateTime:
assert isinstance(new_time, utils.DateTime)
def test_timestamp(self):
#ensure that timestamp is check with appropriate timezone offset
assert (1425750464+time.timezone) == self.datetime.timestamp
# ensure that timestamp is check with appropriate timezone offset
assert (1425750464 + time.timezone) == self.datetime.timestamp
def test_repr(self):
assert '2015-03-07 17:47:44' == '{}'.format(self.datetime)
@ -181,20 +183,22 @@ class TestDateTime:
def test_initialize_int(self):
d = utils.DateTime(1425750464)
assert 1425750464 == d.timestamp
#ensure that time is check with appropriate timezone offset
t = time.strftime("%Y-%m-%d %H:%M:%S",
time.localtime((time.mktime(time.strptime("2015-03-07 17:47:44",
"%Y-%m-%d %H:%M:%S")))-time.timezone))
# ensure that time is check with appropriate timezone offset
t = time.strftime(
"%Y-%m-%d %H:%M:%S",
time.localtime(
(time.mktime(
time.strptime("2015-03-07 17:47:44",
"%Y-%m-%d %H:%M:%S"))) - time.timezone))
assert t == '{}'.format(d)
def test_initialize_string(self):
d = utils.DateTime('2015-03-07T17:47:44')
# ensure that timestamp is check with appropriate timezone offset
assert (1425750464+time.timezone) == d.timestamp
assert (1425750464 + time.timezone) == d.timestamp
assert '2015-03-07 17:47:44' == '{}'.format(d)
def test_sub(self):
d2 = datetime.datetime(2015, 3, 7, 18, 18, 38, 508411)
ts2 = utils.DateTime(d2)
assert '0:30:53.791612' == '{}'.format(ts2 - self.datetime)

View File

@ -12,10 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import unittest
import mock
from freezer.tests.commons import *
from freezer.tests import commons
from freezer.utils import winutils
@ -29,14 +31,14 @@ class TestWinutils(unittest.TestCase):
.start()
def mock_winutils(self):
fake_disable_redirection = FakeDisableFileSystemRedirection()
fake_disable_redirection = commons.FakeDisableFileSystemRedirection()
mock.patch('winutils.DisableFileSystemRedirection.__enter__',
new_callable=fake_disable_redirection.__enter__)
mock.patch('winutils.DisableFileSystemRedirection.__exit__',
new_callable=fake_disable_redirection.__exit__)
def test_is_windows(self):
fake_os = Os()
fake_os = commons.Os()
os.name = fake_os
assert winutils.is_windows() is False
@ -48,14 +50,16 @@ class TestWinutils(unittest.TestCase):
assert winutils.use_shadow(path, test_volume2) == expected
# test if the volume format is incorrect
self.assertRaises(Exception, winutils.use_shadow(path, test_volume))
self.assertRaises(Exception,
winutils.use_shadow(path, test_volume)) # noqa
# def test_start_sql_server(self):
# backup_opt = BackupOpt1()
# self.mock_process(FakeSubProcess())
# self.mock_winutils()
#
# assert winutils.start_sql_server(backup_opt.sql_server_instance) is not False
# assert winutils.start_sql_server(
# backup_opt.sql_server_instance) is not False
#
# self.mock_process(FakeSubProcess3())
# self.assertRaises(

View File

@ -81,7 +81,7 @@ commands = pylint --rcfile .pylintrc freezer
[flake8]
ignore = H405,H404,H403,H401
show-source = True
exclude = .venv,.tox,dist,doc,test,*egg,tests,freezer/tests/unit,releasenotes
exclude = .venv,.tox,dist,doc,test,*egg,releasenotes
[testenv:releasenotes]
commands = sphinx-build -a -E -d releasenotes/build/doctrees -b html releasenotes/source releasenotes/build/html