improving tests

This commit is contained in:
Eldar Nugaev
2011-04-04 18:33:50 +04:00
parent 432d46f0eb
commit a476dbba65

View File

@@ -44,6 +44,22 @@ def _concurrency(wait, done, target):
done.send()
def _create_network_info(count=1):
fake = 'fake'
fake_ip = '0.0.0.0/0'
fake_ip_2 = '0.0.0.1/0'
fake_ip_3 = '0.0.0.1/0'
network = {'gateway': fake,
'gateway_v6': fake,
'bridge': fake,
'cidr': fake_ip,
'cidr_v6': fake_ip}
mapping = {'mac': fake,
'ips': [{'ip': fake_ip}, {'ip': fake_ip}],
'ip6s': [{'ip': fake_ip}, {'ip': fake_ip_2}, {'ip': fake_ip_3}]}
return [(network, mapping) for x in xrange(0, count)]
class CacheConcurrencyTestCase(test.TestCase):
def setUp(self):
super(CacheConcurrencyTestCase, self).setUp()
@@ -192,19 +208,6 @@ class LibvirtConnTestCase(test.TestCase):
return db.service_create(context.get_admin_context(), service_ref)
def _create_network_info(self, count=1):
fake = 'fake'
fake_ip = '0.0.0.0/0'
network = {'gateway': fake,
'gateway_v6': fake,
'bridge': fake,
'cidr': fake_ip,
'cidr_v6': fake_ip}
mapping = {'mac': fake,
'ips': [{'ip': fake_ip}]}
return [(network, mapping) for x in xrange(0, count)]
def test_preparing_xml_info(self):
conn = libvirt_conn.LibvirtConnection(True)
instance_ref = db.instance_create(self.context, self.test_instance)
@@ -213,16 +216,16 @@ class LibvirtConnTestCase(test.TestCase):
self.assertFalse(result['nics'])
result = conn._prepare_xml_info(instance_ref, False,
self._create_network_info())
_create_network_info())
self.assertTrue(len(result['nics']) == 1)
result = conn._prepare_xml_info(instance_ref, False,
self._create_network_info(2))
_create_network_info(2))
self.assertTrue(len(result['nics']) == 2)
def test_get_nic_for_xml(self):
conn = libvirt_conn.LibvirtConnection(True)
network, mapping = self._create_network_info()[0]
network, mapping = _create_network_info()[0]
backup = FLAGS.use_ipv6
FLAGS.use_ipv6 = False
params_1 = conn._get_nic_for_xml(network, mapping)['extra_params']
@@ -271,12 +274,19 @@ class LibvirtConnTestCase(test.TestCase):
def test_multi_nic(self):
instance_data = dict(self.test_instance)
network_info = self._create_network_info(2)
network_info = _create_network_info(2)
conn = libvirt_conn.LibvirtConnection(True)
instance_ref = db.instance_create(self.context, instance_data)
xml = conn.to_xml(instance_ref, False, network_info)
tree = xml_to_tree(xml)
self.assertEquals(len(tree.findall("./devices/interface")), 2)
interfaces = tree.findall("./devices/interface")
self.assertEquals(len(interfaces), 2)
parameters = interfaces[0].findall('./filterref/parameter')
self.assertEquals(interfaces[0].get('type'), 'bridge')
self.assertEquals(parameters[0].get('name'), 'IP')
self.assertEquals(parameters[0].get('value'), '0.0.0.0/0')
self.assertEquals(parameters[1].get('name'), 'DHCPSERVER')
self.assertEquals(parameters[1].get('value'), 'fake')
def _check_xml_and_container(self, instance):
user_context = context.RequestContext(project=self.project,
@@ -656,11 +666,14 @@ class IptablesFirewallTestCase(test.TestCase):
'# Completed on Tue Jan 18 23:47:56 2011',
]
def _create_instance_ref(self):
return db.instance_create(self.context,
{'user_id': 'fake',
'project_id': 'fake',
'mac_address': '56:12:12:12:12:12'})
def test_static_filters(self):
instance_ref = db.instance_create(self.context,
{'user_id': 'fake',
'project_id': 'fake',
'mac_address': '56:12:12:12:12:12'})
instance_ref = self._create_instance_ref()
ip = '10.11.12.13'
network_ref = db.project_get_network(self.context,
@@ -771,6 +784,25 @@ class IptablesFirewallTestCase(test.TestCase):
"TCP port 80/81 acceptance rule wasn't added")
db.instance_destroy(admin_ctxt, instance_ref['id'])
def test_filters_for_instance(self):
network_info = _create_network_info()
rulesv4, rulesv6 = self.fw._filters_for_instance("fake", network_info)
self.assertEquals(len(rulesv4), 2)
self.assertEquals(len(rulesv6), 3)
def multinic_iptables_test(self):
instance_ref = self._create_instance_ref()
network_info = _create_network_info()
ipv4_len = len(self.fw.iptables.ipv4['filter'].rules)
ipv6_len = len(self.fw.iptables.ipv6['filter'].rules)
inst_ipv4, inst_ipv6 = self.fw.instance_rules(instance_ref,
network_info)
self.fw.add_filters_for_instance(instance_ref, network_info)
ipv4 = self.fw.iptables.ipv4['filter'].rules
ipv6 = self.fw.iptables.ipv6['filter'].rules
self.assertEquals(len(ipv4) - len(inst_ipv4) - ipv4_len, 2)
self.assertEquals(len(ipv6) - len(inst_ipv6) - ipv6_len, 3)
class NWFilterTestCase(test.TestCase):
def setUp(self):