diff --git a/quantum/tests/unit/test_db_plugin.py b/quantum/tests/unit/test_db_plugin.py index 77a71322c9..52c4f3fe5b 100644 --- a/quantum/tests/unit/test_db_plugin.py +++ b/quantum/tests/unit/test_db_plugin.py @@ -33,6 +33,7 @@ from quantum import context from quantum.db import api as db from quantum.manager import QuantumManager from quantum.openstack.common import cfg +from quantum.tests.unit import test_extensions from quantum.tests.unit.testlib_api import create_request from quantum.wsgi import Serializer, JSONDeserializer @@ -49,7 +50,8 @@ def etcdir(*p): class QuantumDbPluginV2TestCase(unittest2.TestCase): - def setUp(self): + + def setUp(self, plugin=None): super(QuantumDbPluginV2TestCase, self).setUp() # NOTE(jkoelker) for a 'pluggable' framework, Quantum sure @@ -85,6 +87,10 @@ class QuantumDbPluginV2TestCase(unittest2.TestCase): self._skip_native_bulk = not _is_native_bulk_supported() + ext_mgr = test_config.get('extension_manager', None) + if ext_mgr: + self.ext_api = test_extensions.setup_extensions_middleware(ext_mgr) + def tearDown(self): super(QuantumDbPluginV2TestCase, self).tearDown() # NOTE(jkoelker) for a 'pluggable' framework, Quantum sure @@ -94,14 +100,17 @@ class QuantumDbPluginV2TestCase(unittest2.TestCase): cfg.CONF.reset() def _req(self, method, resource, data=None, fmt='json', - id=None, params=None): - if id: + id=None, params=None, action=None): + if id and action: + path = '/%(resource)s/%(id)s/%(action)s.%(fmt)s' % locals() + elif id: path = '/%(resource)s/%(id)s.%(fmt)s' % locals() else: path = '/%(resource)s.%(fmt)s' % locals() + content_type = 'application/%s' % fmt body = None - if data: + if data is not None: # empty dict is valid body = Serializer().serialize(data, content_type) return create_request(path, body, @@ -124,6 +133,9 @@ class QuantumDbPluginV2TestCase(unittest2.TestCase): def new_update_request(self, resource, data, id, fmt='json'): return self._req('PUT', resource, data, fmt, id=id) + def new_action_request(self, resource, data, id, action, fmt='json'): + return self._req('PUT', resource, data, fmt, id=id, action=action) + def deserialize(self, content_type, response): ctype = 'application/%s' % content_type data = self._deserializers[ctype].deserialize(response.body)['body'] @@ -284,9 +296,36 @@ class QuantumDbPluginV2TestCase(unittest2.TestCase): res = self._create_port(fmt, net_id, **kwargs) return self.deserialize(fmt, res) - def _delete(self, collection, id): + def _api_for_resource(self, resource): + if resource in ['networks', 'subnets', 'ports']: + return self.api + else: + return self.ext_api + + def _delete(self, collection, id, + expected_code=webob.exc.HTTPNoContent.code): req = self.new_delete_request(collection, id) - req.get_response(self.api) + res = req.get_response(self._api_for_resource(collection)) + self.assertEqual(res.status_int, expected_code) + + def _show(self, resource, id, expected_code=webob.exc.HTTPOk.code): + req = self.new_show_request(resource, id) + res = req.get_response(self._api_for_resource(resource)) + self.assertEqual(res.status_int, expected_code) + return self.deserialize('json', res) + + def _update(self, resource, id, new_data, + expected_code=webob.exc.HTTPOk.code): + req = self.new_update_request(resource, new_data, id) + res = req.get_response(self._api_for_resource(resource)) + self.assertEqual(res.status_int, expected_code) + return self.deserialize('json', res) + + def _list(self, resource): + req = self.new_list_request(resource) + res = req.get_response(self._api_for_resource(resource)) + self.assertEqual(res.status_int, webob.exc.HTTPOk.code) + return self.deserialize('json', res) def _do_side_effect(self, patched_plugin, orig, *args, **kwargs): """ Invoked by test cases for injecting failures in plugin """ @@ -526,6 +565,7 @@ class TestPortsV2(QuantumDbPluginV2TestCase): for k, v in keys: self.assertEquals(port['port'][k], v) self.assertTrue('mac_address' in port['port']) + self._delete('ports', port['port']['id']) def test_create_ports_bulk_native(self): if self._skip_native_bulk: @@ -534,6 +574,8 @@ class TestPortsV2(QuantumDbPluginV2TestCase): res = self._create_port_bulk('json', 2, net['network']['id'], 'test', True) self._validate_behavior_on_bulk_success(res, 'ports') + for p in self.deserialize('json', res)['ports']: + self._delete('ports', p['id']) def test_create_ports_bulk_emulated(self): real_has_attr = hasattr @@ -550,6 +592,8 @@ class TestPortsV2(QuantumDbPluginV2TestCase): res = self._create_port_bulk('json', 2, net['network']['id'], 'test', True) self._validate_behavior_on_bulk_success(res, 'ports') + for p in self.deserialize('json', res)['ports']: + self._delete('ports', p['id']) def test_create_ports_bulk_wrong_input(self): with self.network() as net: @@ -654,6 +698,8 @@ class TestPortsV2(QuantumDbPluginV2TestCase): _list_and_test_ports(1, [port1], tenant_id='tenant_1') # Tenant_2 request - must return single port _list_and_test_ports(1, [port2], tenant_id='tenant_2') + self._delete('ports', port1['port']['id']) + self._delete('ports', port2['port']['id']) def test_show_port(self): with self.port() as port: @@ -877,6 +923,7 @@ class TestPortsV2(QuantumDbPluginV2TestCase): self.assertEquals(len(ips), 1) self.assertEquals(ips[0]['ip_address'], '10.0.0.3') self.assertEquals(ips[0]['subnet_id'], subnet['subnet']['id']) + self._delete('ports', port2['port']['id']) def test_requested_subnet_id_not_on_network(self): fmt = 'json' @@ -921,14 +968,16 @@ class TestPortsV2(QuantumDbPluginV2TestCase): self.assertEquals(ips[1]['ip_address'], '2607:f0d0:1002:51::2') self.assertEquals(ips[1]['subnet_id'], subnet2['subnet']['id']) res = self._create_port(fmt, net_id=net_id) - port3 = self.deserialize(fmt, res) + port4 = self.deserialize(fmt, res) # Check that a v4 and a v6 address are allocated - ips = port3['port']['fixed_ips'] + ips = port4['port']['fixed_ips'] self.assertEquals(len(ips), 2) self.assertEquals(ips[0]['ip_address'], '10.0.0.3') self.assertEquals(ips[0]['subnet_id'], subnet['subnet']['id']) self.assertEquals(ips[1]['ip_address'], '2607:f0d0:1002:51::3') self.assertEquals(ips[1]['subnet_id'], subnet2['subnet']['id']) + self._delete('ports', port3['port']['id']) + self._delete('ports', port4['port']['id']) def test_range_allocation(self): fmt = 'json' @@ -951,6 +1000,8 @@ class TestPortsV2(QuantumDbPluginV2TestCase): self.assertEquals(ips[i]['ip_address'], alloc[i]) self.assertEquals(ips[i]['subnet_id'], subnet['subnet']['id']) + self._delete('ports', port['port']['id']) + with self.subnet(gateway_ip='11.0.0.6', cidr='11.0.0.0/29') as subnet: kwargs = {"fixed_ips": @@ -970,6 +1021,7 @@ class TestPortsV2(QuantumDbPluginV2TestCase): self.assertEquals(ips[i]['ip_address'], alloc[i]) self.assertEquals(ips[i]['subnet_id'], subnet['subnet']['id']) + self._delete('ports', port['port']['id']) def test_requested_invalid_fixed_ips(self): fmt = 'json' @@ -1025,6 +1077,7 @@ class TestPortsV2(QuantumDbPluginV2TestCase): fmt = 'json' with self.subnet() as subnet: with self.port(subnet=subnet) as port: + ports_to_delete = [] ips = port['port']['fixed_ips'] self.assertEquals(len(ips), 1) self.assertEquals(ips[0]['ip_address'], '10.0.0.2') @@ -1035,21 +1088,27 @@ class TestPortsV2(QuantumDbPluginV2TestCase): net_id = port['port']['network_id'] res = self._create_port(fmt, net_id=net_id, **kwargs) port2 = self.deserialize(fmt, res) + ports_to_delete.append(port2) ips = port2['port']['fixed_ips'] self.assertEquals(len(ips), 1) self.assertEquals(ips[0]['ip_address'], '10.0.0.5') self.assertEquals(ips[0]['subnet_id'], subnet['subnet']['id']) # Allocate specific IP's allocated = ['10.0.0.3', '10.0.0.4', '10.0.0.6'] + for a in allocated: res = self._create_port(fmt, net_id=net_id) port2 = self.deserialize(fmt, res) + ports_to_delete.append(port2) ips = port2['port']['fixed_ips'] self.assertEquals(len(ips), 1) self.assertEquals(ips[0]['ip_address'], a) self.assertEquals(ips[0]['subnet_id'], subnet['subnet']['id']) + for p in ports_to_delete: + self._delete('ports', p['port']['id']) + def test_requested_ips_only(self): fmt = 'json' with self.subnet() as subnet: @@ -1060,16 +1119,20 @@ class TestPortsV2(QuantumDbPluginV2TestCase): self.assertEquals(ips[0]['subnet_id'], subnet['subnet']['id']) ips_only = ['10.0.0.18', '10.0.0.20', '10.0.0.22', '10.0.0.21', '10.0.0.3', '10.0.0.17', '10.0.0.19'] + ports_to_delete = [] for i in ips_only: kwargs = {"fixed_ips": [{'ip_address': i}]} net_id = port['port']['network_id'] res = self._create_port(fmt, net_id=net_id, **kwargs) port = self.deserialize(fmt, res) + ports_to_delete.append(port) ips = port['port']['fixed_ips'] self.assertEquals(len(ips), 1) self.assertEquals(ips[0]['ip_address'], i) self.assertEquals(ips[0]['subnet_id'], subnet['subnet']['id']) + for p in ports_to_delete: + self._delete('ports', p['port']['id']) def test_recycling(self): fmt = 'json' @@ -1095,6 +1158,7 @@ class TestPortsV2(QuantumDbPluginV2TestCase): self.assertEquals(len(ips), 1) self.assertEquals(ips[0]['ip_address'], '10.0.1.3') self.assertEquals(ips[0]['subnet_id'], subnet['subnet']['id']) + self._delete('ports', port['port']['id']) def test_invalid_admin_state(self): with self.network() as network: @@ -1181,56 +1245,64 @@ class TestNetworksV2(QuantumDbPluginV2TestCase): def test_update_network_set_not_shared_single_tenant(self): with self.network(shared=True) as network: - self._create_port('json', - network['network']['id'], - 201, - tenant_id=network['network']['tenant_id'], - set_context=True) + res1 = self._create_port('json', + network['network']['id'], + 201, + tenant_id=network['network']['tenant_id'], + set_context=True) data = {'network': {'shared': False}} req = self.new_update_request('networks', data, network['network']['id']) res = self.deserialize('json', req.get_response(self.api)) self.assertFalse(res['network']['shared']) + port1 = self.deserialize('json', res1) + self._delete('ports', port1['port']['id']) def test_update_network_set_not_shared_other_tenant_returns_409(self): with self.network(shared=True) as network: - self._create_port('json', - network['network']['id'], - 201, - tenant_id='somebody_else', - set_context=True) + res1 = self._create_port('json', + network['network']['id'], + 201, + tenant_id='somebody_else', + set_context=True) data = {'network': {'shared': False}} req = self.new_update_request('networks', data, network['network']['id']) self.assertEqual(req.get_response(self.api).status_int, 409) + port1 = self.deserialize('json', res1) + self._delete('ports', port1['port']['id']) def test_update_network_set_not_shared_multi_tenants_returns_409(self): with self.network(shared=True) as network: - self._create_port('json', - network['network']['id'], - 201, - tenant_id='somebody_else', - set_context=True) - self._create_port('json', - network['network']['id'], - 201, - tenant_id=network['network']['tenant_id'], - set_context=True) + res1 = self._create_port('json', + network['network']['id'], + 201, + tenant_id='somebody_else', + set_context=True) + res2 = self._create_port('json', + network['network']['id'], + 201, + tenant_id=network['network']['tenant_id'], + set_context=True) data = {'network': {'shared': False}} req = self.new_update_request('networks', data, network['network']['id']) self.assertEqual(req.get_response(self.api).status_int, 409) + port1 = self.deserialize('json', res1) + port2 = self.deserialize('json', res2) + self._delete('ports', port1['port']['id']) + self._delete('ports', port2['port']['id']) def test_update_network_set_not_shared_multi_tenants2_returns_409(self): with self.network(shared=True) as network: - self._create_port('json', - network['network']['id'], - 201, - tenant_id='somebody_else', - set_context=True) + res1 = self._create_port('json', + network['network']['id'], + 201, + tenant_id='somebody_else', + set_context=True) self._create_subnet('json', network['network']['id'], '10.0.0.0/24', @@ -1243,6 +1315,9 @@ class TestNetworksV2(QuantumDbPluginV2TestCase): network['network']['id']) self.assertEqual(req.get_response(self.api).status_int, 409) + port1 = self.deserialize('json', res1) + self._delete('ports', port1['port']['id']) + def test_create_networks_bulk_native(self): if self._skip_native_bulk: self.skipTest("Plugin does not support native bulk network create")