Merge "Nova Hyper-V driver refactoring"

This commit is contained in:
Jenkins
2013-01-30 00:49:44 +00:00
committed by Gerrit Code Review
264 changed files with 742 additions and 1060 deletions

View File

@@ -1,83 +0,0 @@
=====================================
OpenStack Hyper-V Nova Testing Architecture
=====================================
The Hyper-V Nova Compute plugin uses Windows Management Instrumentation (WMI)
as the main API for hypervisor related operations.
WMI has a database / procedural oriented nature that can become difficult to
test with a traditional static mock / stub based unit testing approach.
The included Hyper-V testing framework has been developed with the
following goals:
1) Dynamic mock generation.
2) Decoupling. No dependencies on WMI or any other module.
The tests are designed to work with mocked objects in all cases, including
OS-dependent (e.g. wmi, os, subprocess) and non-deterministic
(e.g. time, uuid) modules
3) Transparency. Mocks and real objects can be swapped via DI
or monkey patching.
4) Platform independence.
5) Tests need to be executed against the real object or against the mocks
with a simple configuration switch. Development efforts can highly
benefit from this feature.
6) It must be possible to change a mock's behavior without running the tests
against the hypervisor (e.g. by manually adding a value / return value).
The tests included in this package include dynamically generated mock objects,
based on the recording of the attribute values and invocations on the
real WMI objects and other OS dependent features.
The generated mock objects are serialized in the nova/tests/hyperv/stubs
directory as gzipped pickled objects.
An environment variable controls the execution mode of the tests.
Recording mode:
NOVA_GENERATE_TEST_MOCKS=True
Tests are executed on the hypervisor (without mocks), and mock objects are
generated.
Replay mode:
NOVA_GENERATE_TEST_MOCKS=
Tests are executed with the existing mock objects (default).
Mock generation is performed by nova.tests.hyperv.mockproxy.MockProxy.
Instances of this class wrap objects that need to be mocked and act as a
delegate on the wrapped object by leveraging Python's __getattr__ feature.
Attribute values and method call return values are recorded at each access.
Objects returned by attributes and method invocations are wrapped in a
MockProxy consistently.
From a caller perspective, the MockProxy is completely transparent,
with the exception of calls to the type(...) builtin function.
At the end of the test, a mock is generated by each MockProxy by calling
the get_mock() method. A mock is represented by an instance of the
nova.tests.hyperv.mockproxy.Mock class.
The Mock class task consists of replicating the behaviour of the mocked
objects / modules by returning the same values in the same order, for example:
def check_path(path):
if not os.path.exists(path):
os.makedirs(path)
check_path(path)
# The second time os.path.exists returns True
check_path(path)
The injection of MockProxy / Mock instances is performed by the
nova.tests.hyperv.basetestcase.BaseTestCase class in the setUp()
method via selective monkey patching.
Mocks are serialized in tearDown() during recording.
The actual Hyper-V test case inherits from BaseTestCase:
nova.tests.hyperv.test_hypervapi.HyperVAPITestCase
Future directions:
1) Replace the pickled files with a more generic serialization option (e.g. json)
2) Add methods to statically extend the mocks (e.g. method call return values)
3) Extend an existing framework, e.g. mox

View File

@@ -0,0 +1,16 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 Cloudbase Solutions Srl
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@@ -1,105 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 Cloudbase Solutions Srl
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
TestCase for MockProxy based tests and related classes.
"""
import gzip
import os
import pickle
import sys
from nova import test
from nova.tests.hyperv import mockproxy
gen_test_mocks_key = 'NOVA_GENERATE_TEST_MOCKS'
class BaseTestCase(test.TestCase):
"""TestCase for MockProxy based tests."""
def run(self, result=None):
self._currentResult = result
super(BaseTestCase, self).run(result)
def setUp(self):
super(BaseTestCase, self).setUp()
self._mps = {}
def tearDown(self):
super(BaseTestCase, self).tearDown()
# python-subunit will wrap test results with a decorator.
# Need to access the decorated member of results to get the
# actual test result when using python-subunit.
if hasattr(self._currentResult, 'decorated'):
result = self._currentResult.decorated
else:
result = self._currentResult
has_errors = len([test for (test, msgs) in result.errors
if test.id() == self.id()]) > 0
failed = len([test for (test, msgs) in result.failures
if test.id() == self.id()]) > 0
if not has_errors and not failed:
self._save_mock_proxies()
def _save_mock(self, name, mock):
path = self._get_stub_file_path(self.id(), name)
pickle.dump(mock, gzip.open(path, 'wb'))
def _get_stub_file_path(self, test_name, mock_name):
# test naming differs between platforms
prefix = 'nova.tests.'
if test_name.startswith(prefix):
test_name = test_name[len(prefix):]
file_name = '{0}_{1}.p.gz'.format(test_name, mock_name)
return os.path.join(os.path.dirname(mockproxy.__file__),
"stubs", file_name)
def _load_mock(self, name):
path = self._get_stub_file_path(self.id(), name)
if os.path.exists(path):
return pickle.load(gzip.open(path, 'rb'))
else:
return None
def _load_mock_or_create_proxy(self, module_name):
m = None
if not gen_test_mocks_key in os.environ or \
os.environ[gen_test_mocks_key].lower() \
not in ['true', 'yes', '1']:
m = self._load_mock(module_name)
else:
__import__(module_name)
module = sys.modules[module_name]
m = mockproxy.MockProxy(module)
self._mps[module_name] = m
return m
def _inject_mocks_in_modules(self, objects_to_mock, modules_to_test):
for module_name in objects_to_mock:
mp = self._load_mock_or_create_proxy(module_name)
for mt in modules_to_test:
module_local_name = module_name.split('.')[-1]
setattr(mt, module_local_name, mp)
def _save_mock_proxies(self):
for name, mp in self._mps.items():
m = mp.get_mock()
if m.has_values():
self._save_mock(name, m)

View File

@@ -29,35 +29,35 @@ from nova import utils
def get_fake_instance_data(name, project_id, user_id):
return {'name': name,
'id': 1,
'uuid': str(uuid.uuid4()),
'project_id': project_id,
'user_id': user_id,
'image_ref': "1",
'kernel_id': "1",
'ramdisk_id': "1",
'mac_address': "de:ad:be:ef:be:ef",
'instance_type':
{'name': 'm1.tiny',
'memory_mb': 512,
'vcpus': 1,
'root_gb': 0,
'flavorid': 1,
'rxtx_factor': 1}
}
'id': 1,
'uuid': str(uuid.uuid4()),
'project_id': project_id,
'user_id': user_id,
'image_ref': "1",
'kernel_id': "1",
'ramdisk_id': "1",
'mac_address': "de:ad:be:ef:be:ef",
'instance_type':
{'name': 'm1.tiny',
'memory_mb': 512,
'vcpus': 1,
'root_gb': 0,
'flavorid': 1,
'rxtx_factor': 1}
}
def get_fake_image_data(project_id, user_id):
return {'name': 'image1',
'id': 1,
'project_id': project_id,
'user_id': user_id,
'image_ref': "1",
'kernel_id': "1",
'ramdisk_id': "1",
'mac_address': "de:ad:be:ef:be:ef",
'instance_type': 'm1.tiny',
}
'id': 1,
'project_id': project_id,
'user_id': user_id,
'image_ref': "1",
'kernel_id': "1",
'ramdisk_id': "1",
'mac_address': "de:ad:be:ef:be:ef",
'instance_type': 'm1.tiny',
}
def get_fake_volume_info_data(target_portal, volume_id):
@@ -72,25 +72,25 @@ def get_fake_volume_info_data(target_portal, volume_id):
'auth_method': 'fake',
'auth_method': 'fake',
}
}
}
def get_fake_block_device_info(target_portal, volume_id):
return {
'block_device_mapping': [{'connection_info': {
'driver_volume_type': 'iscsi',
'data': {'target_lun': 1,
'volume_id': volume_id,
'target_iqn': 'iqn.2010-10.org.openstack:volume-' +
volume_id,
'target_portal': target_portal,
'target_discovered': False}},
'mount_device': 'vda',
'delete_on_termination': False}],
return {'block_device_mapping': [{'connection_info': {
'driver_volume_type': 'iscsi',
'data': {'target_lun': 1,
'volume_id': volume_id,
'target_iqn':
'iqn.2010-10.org.openstack:volume-' +
volume_id,
'target_portal': target_portal,
'target_discovered': False}},
'mount_device': 'vda',
'delete_on_termination': False}],
'root_device_name': None,
'ephemerals': [],
'swap': None
}
}
def stub_out_db_instance_api(stubs):
@@ -99,11 +99,9 @@ def stub_out_db_instance_api(stubs):
INSTANCE_TYPES = {
'm1.tiny': dict(memory_mb=512, vcpus=1, root_gb=0, flavorid=1),
'm1.small': dict(memory_mb=2048, vcpus=1, root_gb=20, flavorid=2),
'm1.medium':
dict(memory_mb=4096, vcpus=2, root_gb=40, flavorid=3),
'm1.medium': dict(memory_mb=4096, vcpus=2, root_gb=40, flavorid=3),
'm1.large': dict(memory_mb=8192, vcpus=4, root_gb=80, flavorid=4),
'm1.xlarge':
dict(memory_mb=16384, vcpus=8, root_gb=160, flavorid=5)}
'm1.xlarge': dict(memory_mb=16384, vcpus=8, root_gb=160, flavorid=5)}
class FakeModel(object):
"""Stubs out for model."""
@@ -152,7 +150,7 @@ def stub_out_db_instance_api(stubs):
'vcpus': instance_type['vcpus'],
'mac_addresses': [{'address': values['mac_address']}],
'root_gb': instance_type['root_gb'],
}
}
return FakeModel(base_options)
def fake_network_get_by_instance(context, instance_id):
@@ -181,4 +179,4 @@ def stub_out_db_instance_api(stubs):
stubs.Set(db, 'instance_type_get_all', fake_instance_type_get_all)
stubs.Set(db, 'instance_type_get_by_name', fake_instance_type_get_by_name)
stubs.Set(db, 'block_device_mapping_get_all_by_instance',
fake_block_device_mapping_get_all_by_instance)
fake_block_device_mapping_get_all_by_instance)

46
nova/tests/hyperv/fake.py Normal file
View File

@@ -0,0 +1,46 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 Cloudbase Solutions Srl
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import io
import os
class PathUtils(object):
def open(self, path, mode):
return io.BytesIO(b'fake content')
def get_instances_path(self):
return 'C:\\FakePath\\'
def get_instance_path(self, instance_name):
return os.path.join(self.get_instances_path(), instance_name)
def get_vhd_path(self, instance_name):
instance_path = self.get_instance_path(instance_name)
return os.path.join(instance_path, instance_name + ".vhd")
def get_base_vhd_path(self, image_name):
base_dir = os.path.join(self.get_instances_path(), '_base')
return os.path.join(base_dir, image_name + ".vhd")
def make_export_path(self, instance_name):
export_folder = os.path.join(self.get_instances_path(), "export",
instance_name)
return export_folder
def vhd_exists(self, path):
return False

View File

@@ -1,262 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 Cloudbase Solutions Srl
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Hyper-V classes to be used in testing.
"""
import sys
import time
from nova import exception
from nova.virt.hyperv import constants
from nova.virt.hyperv import volumeutilsV2
from xml.etree import ElementTree
# Check needed for unit testing on Unix
if sys.platform == 'win32':
import wmi
class HyperVUtils(object):
def __init__(self):
self.__conn = None
self.__conn_v2 = None
self.__conn_cimv2 = None
self.__conn_wmi = None
self.__conn_storage = None
self._volumeutils = volumeutilsV2.VolumeUtilsV2(
self._conn_storage, self._conn_wmi)
@property
def _conn(self):
if self.__conn is None:
self.__conn = wmi.WMI(moniker='//./root/virtualization')
return self.__conn
@property
def _conn_v2(self):
if self.__conn_v2 is None:
self.__conn_v2 = wmi.WMI(moniker='//./root/virtualization/v2')
return self.__conn_v2
@property
def _conn_cimv2(self):
if self.__conn_cimv2 is None:
self.__conn_cimv2 = wmi.WMI(moniker='//./root/cimv2')
return self.__conn_cimv2
@property
def _conn_wmi(self):
if self.__conn_wmi is None:
self.__conn_wmi = wmi.WMI(moniker='//./root/wmi')
return self.__conn_wmi
@property
def _conn_storage(self):
if self.__conn_storage is None:
storage_namespace = '//./Root/Microsoft/Windows/Storage'
self.__conn_storage = wmi.WMI(moniker=storage_namespace)
return self.__conn_storage
def create_vhd(self, path):
image_service = self._conn.query(
"Select * from Msvm_ImageManagementService")[0]
(job, ret_val) = image_service.CreateDynamicVirtualHardDisk(
Path=path, MaxInternalSize=3 * 1024 * 1024)
if ret_val == constants.WMI_JOB_STATUS_STARTED:
success = self._check_job_status(job)
else:
success = (ret_val == 0)
if not success:
raise Exception('Failed to create Dynamic disk %s with error %d'
% (path, ret_val))
def _check_job_status(self, jobpath):
"""Poll WMI job state for completion."""
job_wmi_path = jobpath.replace('\\', '/')
job = wmi.WMI(moniker=job_wmi_path)
while job.JobState == constants.WMI_JOB_STATE_RUNNING:
time.sleep(0.1)
job = wmi.WMI(moniker=job_wmi_path)
return job.JobState == constants.WMI_JOB_STATE_COMPLETED
def _get_vm(self, vm_name, conn=None):
if conn is None:
conn = self._conn
vml = conn.Msvm_ComputerSystem(ElementName=vm_name)
if not len(vml):
raise exception.InstanceNotFound(instance=vm_name)
return vml[0]
def remote_vm_exists(self, server, vm_name):
conn = wmi.WMI(moniker='//' + server + '/root/virtualization')
return self._vm_exists(conn, vm_name)
def vm_exists(self, vm_name):
return self._vm_exists(self._conn, vm_name)
def _vm_exists(self, conn, vm_name):
return len(conn.Msvm_ComputerSystem(ElementName=vm_name)) > 0
def _get_vm_summary(self, vm_name):
vm = self._get_vm(vm_name)
vs_man_svc = self._conn.Msvm_VirtualSystemManagementService()[0]
vmsettings = vm.associators(
wmi_association_class='Msvm_SettingsDefineState',
wmi_result_class='Msvm_VirtualSystemSettingData')
settings_paths = [v.path_() for v in vmsettings]
return vs_man_svc.GetSummaryInformation([100, 105],
settings_paths)[1][0]
def get_vm_uptime(self, vm_name):
return self._get_vm_summary(vm_name).UpTime
def get_vm_state(self, vm_name):
return self._get_vm_summary(vm_name).EnabledState
def set_vm_state(self, vm_name, req_state):
self._set_vm_state(self._conn, vm_name, req_state)
def _set_vm_state(self, conn, vm_name, req_state):
vm = self._get_vm(vm_name, conn)
(job, ret_val) = vm.RequestStateChange(req_state)
success = False
if ret_val == constants.WMI_JOB_STATUS_STARTED:
success = self._check_job_status(job)
elif ret_val == 0:
success = True
elif ret_val == 32775:
#Invalid state for current operation. Typically means it is
#already in the state requested
success = True
if not success:
raise Exception(_("Failed to change vm state of %(vm_name)s"
" to %(req_state)s") % locals())
def get_vm_disks(self, vm_name):
return self._get_vm_disks(self._conn, vm_name)
def _get_vm_disks(self, conn, vm_name):
vm = self._get_vm(vm_name, conn)
vmsettings = vm.associators(
wmi_result_class='Msvm_VirtualSystemSettingData')
rasds = vmsettings[0].associators(
wmi_result_class='MSVM_ResourceAllocationSettingData')
disks = [r for r in rasds
if r.ResourceSubType == 'Microsoft Virtual Hard Disk']
disk_files = []
for disk in disks:
disk_files.extend([c for c in disk.Connection])
volumes = [r for r in rasds
if r.ResourceSubType == 'Microsoft Physical Disk Drive']
volume_drives = []
for volume in volumes:
hostResources = volume.HostResource
drive_path = hostResources[0]
volume_drives.append(drive_path)
dvds = [r for r in rasds
if r.ResourceSubType == 'Microsoft Virtual CD/DVD Disk']
dvd_files = []
for dvd in dvds:
dvd_files.extend([c for c in dvd.Connection])
return (disk_files, volume_drives, dvd_files)
def remove_remote_vm(self, server, vm_name):
conn = wmi.WMI(moniker='//' + server + '/root/virtualization')
conn_cimv2 = wmi.WMI(moniker='//' + server + '/root/cimv2')
self._remove_vm(vm_name, conn, conn_cimv2)
def remove_vm(self, vm_name):
self._remove_vm(vm_name, self._conn, self._conn_cimv2)
def _remove_vm(self, vm_name, conn, conn_cimv2):
vm = self._get_vm(vm_name, conn)
vs_man_svc = conn.Msvm_VirtualSystemManagementService()[0]
#Stop the VM first.
self._set_vm_state(conn, vm_name, 3)
(disk_files, volume_drives, dvd_files) = self._get_vm_disks(conn,
vm_name)
(job, ret_val) = vs_man_svc.DestroyVirtualSystem(vm.path_())
if ret_val == constants.WMI_JOB_STATUS_STARTED:
success = self._check_job_status(job)
elif ret_val == 0:
success = True
if not success:
raise Exception(_('Failed to destroy vm %s') % vm_name)
#Delete associated vhd disk files.
for disk in disk_files + dvd_files:
vhd_file = conn_cimv2.query(
"Select * from CIM_DataFile where Name = '" +
disk.replace("'", "''") + "'")[0]
vhd_file.Delete()
def _get_target_iqn(self, volume_id):
return 'iqn.2010-10.org.openstack:volume-' + volume_id
def logout_iscsi_volume_sessions(self, volume_id):
target_iqn = self._get_target_iqn(volume_id)
if (self.iscsi_volume_sessions_exist(volume_id)):
self._volumeutils.logout_storage_target(target_iqn)
def iscsi_volume_sessions_exist(self, volume_id):
target_iqn = self._get_target_iqn(volume_id)
return len(self._conn_wmi.query(
"SELECT * FROM MSiSCSIInitiator_SessionClass \
WHERE TargetName='" + target_iqn + "'")) > 0
def get_vm_count(self):
return len(self._conn.query(
"Select * from Msvm_ComputerSystem where Description "
"<> 'Microsoft Hosting Computer System'"))
def get_vm_snapshots_count(self, vm_name):
return len(self._conn.query(
"Select * from Msvm_VirtualSystemSettingData where \
SettingType = 5 and SystemName = '" + vm_name + "'"))
def get_vhd_parent_path(self, vhd_path):
image_man_svc = self._conn.Msvm_ImageManagementService()[0]
(vhd_info, job_path, ret_val) = \
image_man_svc.GetVirtualHardDiskInfo(vhd_path)
if ret_val == constants.WMI_JOB_STATUS_STARTED:
success = self._check_job_status(job_path)
else:
success = (ret_val == 0)
if not success:
raise Exception(_("Failed to get info for disk %s") %
(vhd_path))
base_disk_path = None
et = ElementTree.fromstring(vhd_info)
for item in et.findall("PROPERTY"):
if item.attrib["NAME"] == "ParentPath":
base_disk_path = item.find("VALUE").text
break
return base_disk_path

View File

@@ -1,272 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 Cloudbase Solutions Srl
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
"""
Classes for dynamic generation of mock objects.
"""
import inspect
def serialize_obj(obj):
if isinstance(obj, float):
val = str(round(obj, 10))
elif isinstance(obj, dict):
d = {}
for k1, v1 in obj.items():
d[k1] = serialize_obj(v1)
val = str(d)
elif isinstance(obj, list):
l1 = []
for i1 in obj:
l1.append(serialize_obj(i1))
val = str(l1)
elif isinstance(obj, tuple):
l1 = ()
for i1 in obj:
l1 = l1 + (serialize_obj(i1),)
val = str(l1)
else:
if isinstance(obj, str) or isinstance(obj, unicode):
val = obj
elif hasattr(obj, '__str__') and inspect.ismethod(obj.__str__):
val = str(obj)
else:
val = str(type(obj))
return val
def serialize_args(*args, **kwargs):
"""Workaround for float string conversion issues in Python 2.6."""
return serialize_obj((args, kwargs))
class MockException(Exception):
def __init__(self, message):
super(MockException, self).__init__(message)
class Mock(object):
def _get_next_value(self, name):
c = self._access_count.get(name)
if c is None:
c = 0
else:
c = c + 1
self._access_count[name] = c
try:
value = self._values[name][c]
except IndexError as ex:
raise MockException(_('Couldn\'t find invocation num. %(c)d '
'of attribute "%(name)s"') % locals())
return value
def _get_next_ret_value(self, name, params):
d = self._access_count.get(name)
if d is None:
d = {}
self._access_count[name] = d
c = d.get(params)
if c is None:
c = 0
else:
c = c + 1
d[params] = c
try:
m = self._values[name]
except KeyError as ex:
raise MockException(_('Couldn\'t find attribute "%s"') % (name))
try:
value = m[params][c]
except KeyError as ex:
raise MockException(_('Couldn\'t find attribute "%(name)s" '
'with arguments "%(params)s"') % locals())
except IndexError as ex:
raise MockException(_('Couldn\'t find invocation num. %(c)d '
'of attribute "%(name)s" with arguments "%(params)s"')
% locals())
return value
def __init__(self, values):
self._values = values
self._access_count = {}
def has_values(self):
return len(self._values) > 0
def __getattr__(self, name):
if name.startswith('__') and name.endswith('__'):
return object.__getattribute__(self, name)
else:
try:
isdict = isinstance(self._values[name], dict)
except KeyError as ex:
raise MockException(_('Couldn\'t find attribute "%s"')
% (name))
if isdict:
def newfunc(*args, **kwargs):
params = serialize_args(args, kwargs)
return self._get_next_ret_value(name, params)
return newfunc
else:
return self._get_next_value(name)
def __str__(self):
return self._get_next_value('__str__')
def __iter__(self):
return getattr(self._get_next_value('__iter__'), '__iter__')()
def __len__(self):
return self._get_next_value('__len__')
def __getitem__(self, key):
return self._get_next_ret_value('__getitem__', str(key))
def __call__(self, *args, **kwargs):
params = serialize_args(args, kwargs)
return self._get_next_ret_value('__call__', params)
class MockProxy(object):
def __init__(self, wrapped):
self._wrapped = wrapped
self._recorded_values = {}
def _get_proxy_object(self, obj):
if hasattr(obj, '__dict__') or isinstance(obj, tuple) or \
isinstance(obj, list) or isinstance(obj, dict):
p = MockProxy(obj)
else:
p = obj
return p
def __getattr__(self, name):
if name in ['_wrapped']:
return object.__getattribute__(self, name)
else:
attr = getattr(self._wrapped, name)
if inspect.isfunction(attr) or inspect.ismethod(attr) or \
inspect.isbuiltin(attr):
def newfunc(*args, **kwargs):
result = attr(*args, **kwargs)
p = self._get_proxy_object(result)
params = serialize_args(args, kwargs)
self._add_recorded_ret_value(name, params, p)
return p
return newfunc
elif hasattr(attr, '__dict__') or (hasattr(attr, '__getitem__')
and not (isinstance(attr, str) or isinstance(attr, unicode))):
p = MockProxy(attr)
else:
p = attr
self._add_recorded_value(name, p)
return p
def __setattr__(self, name, value):
if name in ['_wrapped', '_recorded_values']:
object.__setattr__(self, name, value)
else:
setattr(self._wrapped, name, value)
def _add_recorded_ret_value(self, name, params, val):
d = self._recorded_values.get(name)
if d is None:
d = {}
self._recorded_values[name] = d
l = d.get(params)
if l is None:
l = []
d[params] = l
l.append(val)
def _add_recorded_value(self, name, val):
if not name in self._recorded_values:
self._recorded_values[name] = []
self._recorded_values[name].append(val)
def get_mock(self):
values = {}
for k, v in self._recorded_values.items():
if isinstance(v, dict):
d = {}
values[k] = d
for k1, v1 in v.items():
l = []
d[k1] = l
for i1 in v1:
if isinstance(i1, MockProxy):
l.append(i1.get_mock())
else:
l.append(i1)
else:
l = []
values[k] = l
for i in v:
if isinstance(i, MockProxy):
l.append(i.get_mock())
elif isinstance(i, dict):
d = {}
for k1, v1 in v.items():
if isinstance(v1, MockProxy):
d[k1] = v1.get_mock()
else:
d[k1] = v1
l.append(d)
elif isinstance(i, list):
l1 = []
for i1 in i:
if isinstance(i1, MockProxy):
l1.append(i1.get_mock())
else:
l1.append(i1)
l.append(l1)
else:
l.append(i)
return Mock(values)
def __str__(self):
s = str(self._wrapped)
self._add_recorded_value('__str__', s)
return s
def __len__(self):
l = len(self._wrapped)
self._add_recorded_value('__len__', l)
return l
def __iter__(self):
it = []
for i in self._wrapped:
it.append(self._get_proxy_object(i))
self._add_recorded_value('__iter__', it)
return iter(it)
def __getitem__(self, key):
p = self._get_proxy_object(self._wrapped[key])
self._add_recorded_ret_value('__getitem__', str(key), p)
return p
def __call__(self, *args, **kwargs):
c = self._wrapped(*args, **kwargs)
p = self._get_proxy_object(c)
params = serialize_args(args, kwargs)
self._add_recorded_ret_value('__call__', params, p)
return p

View File

@@ -1,2 +0,0 @@
Files with extension p.gz are compressed pickle files containing serialized
mocks used during unit testing

Some files were not shown because too many files have changed in this diff Show More