UT: do not hide an original error in test resource ctxtmgr

In unit tests, resource contextmanagers such as network(), subnet()
try to delete themselves after returning from yield even if an
exception occurs. However when an exception occurs, there is a case
where deletion fails. In this case original exception will be hidden
and it makes difficult to debug test failures.

Before each test starts, resources like database entries will be
recreated, so there is no need to try to delete resources even
when an exception occurs. This commit removes try-finally clause
from resource contextmanagers to make original errors visible.

Closes-Bug: #1295887
Change-Id: Ia844d2aa2c9fc036e643068c5284f64798963ee3
This commit is contained in:
Akihiro Motoki 2014-03-22 08:17:20 +09:00 committed by Gerrit Code Review
parent 721830c541
commit 9730c18584
14 changed files with 210 additions and 256 deletions

View File

@ -303,11 +303,9 @@ class TestCiscoPortsV2(CiscoNetworkPluginV2TestCase,
res = self._create_port(self.fmt, net_id, arg_list=args,
context=ctx, **port_dict)
port = self.deserialize(self.fmt, res)
try:
yield res
finally:
if do_delete:
self._delete('ports', port['port']['id'])
yield res
if do_delete:
self._delete('ports', port['port']['id'])
def test_create_ports_bulk_emulated_plugin_failure(self):
real_has_attr = hasattr
@ -961,11 +959,9 @@ class TestCiscoNetworksV2(CiscoNetworkPluginV2TestCase,
res = self._create_network(self.fmt, net_name, True,
arg_list=arg_list, **provider_attrs)
network = self.deserialize(self.fmt, res)['network']
try:
yield network
finally:
req = self.new_delete_request('networks', network['id'])
req.get_response(self.api)
yield network
req = self.new_delete_request('networks', network['id'])
req.get_response(self.api)
def test_create_provider_vlan_network(self):
with self._provider_vlan_network(PHYS_NET, '1234',
@ -1073,10 +1069,8 @@ class TestCiscoRouterInterfacesV2(CiscoNetworkPluginV2TestCase):
request = self.new_create_request('routers', data, self.fmt)
response = request.get_response(self.ext_api)
router = self.deserialize(self.fmt, response)
try:
yield network, subnet, router
finally:
self._delete('routers', router['router']['id'])
yield network, subnet, router
self._delete('routers', router['router']['id'])
@contextlib.contextmanager
def _router_interface(self, router, subnet, **kwargs):
@ -1089,15 +1083,15 @@ class TestCiscoRouterInterfacesV2(CiscoNetworkPluginV2TestCase):
router['router']['id'],
'add_router_interface')
response = request.get_response(self.ext_api)
try:
yield response
finally:
# If router interface was created successfully, delete it now.
if response.status_int == wexc.HTTPOk.code:
request = self.new_action_request('routers', interface_data,
router['router']['id'],
'remove_router_interface')
request.get_response(self.ext_api)
yield response
# If router interface was created successfully, delete it now.
if response.status_int == wexc.HTTPOk.code:
request = self.new_action_request('routers', interface_data,
router['router']['id'],
'remove_router_interface')
request.get_response(self.ext_api)
@contextlib.contextmanager
def _network_subnet_router_interface(self, **kwargs):

View File

@ -188,12 +188,10 @@ class FirewallPluginDbTestCase(test_db_plugin.NeutronDbPluginV2TestCase):
if res.status_int >= 400:
raise webob.exc.HTTPClientError(code=res.status_int)
firewall_policy = self.deserialize(fmt or self.fmt, res)
try:
yield firewall_policy
finally:
if not no_delete:
self._delete('firewall_policies',
firewall_policy['firewall_policy']['id'])
yield firewall_policy
if not no_delete:
self._delete('firewall_policies',
firewall_policy['firewall_policy']['id'])
def _create_firewall_rule(self, fmt, name, shared, protocol,
ip_version, source_ip_address,
@ -240,12 +238,10 @@ class FirewallPluginDbTestCase(test_db_plugin.NeutronDbPluginV2TestCase):
if res.status_int >= 400:
raise webob.exc.HTTPClientError(code=res.status_int)
firewall_rule = self.deserialize(fmt or self.fmt, res)
try:
yield firewall_rule
finally:
if not no_delete:
self._delete('firewall_rules',
firewall_rule['firewall_rule']['id'])
yield firewall_rule
if not no_delete:
self._delete('firewall_rules',
firewall_rule['firewall_rule']['id'])
def _create_firewall(self, fmt, name, description, firewall_policy_id,
admin_state_up=True, expected_res_status=None,
@ -275,11 +271,9 @@ class FirewallPluginDbTestCase(test_db_plugin.NeutronDbPluginV2TestCase):
if res.status_int >= 400:
raise webob.exc.HTTPClientError(code=res.status_int)
firewall = self.deserialize(fmt or self.fmt, res)
try:
yield firewall
finally:
if not no_delete:
self._delete('firewalls', firewall['firewall']['id'])
yield firewall
if not no_delete:
self._delete('firewalls', firewall['firewall']['id'])
def _rule_action(self, action, id, firewall_rule_id, insert_before=None,
insert_after=None, expected_code=webob.exc.HTTPOk.code,

View File

@ -227,12 +227,10 @@ class LoadBalancerTestMixin(object):
explanation=_("Unexpected error code: %s") %
res.status_int
)
try:
vip = self.deserialize(fmt or self.fmt, res)
yield vip
finally:
if not no_delete:
self._delete('vips', vip['vip']['id'])
vip = self.deserialize(fmt or self.fmt, res)
yield vip
if not no_delete:
self._delete('vips', vip['vip']['id'])
@contextlib.contextmanager
def pool(self, fmt=None, name='pool1', lb_method='ROUND_ROBIN',
@ -250,12 +248,10 @@ class LoadBalancerTestMixin(object):
raise webob.exc.HTTPClientError(
explanation=_("Unexpected error code: %s") % res.status_int
)
try:
pool = self.deserialize(fmt or self.fmt, res)
yield pool
finally:
if not no_delete:
self._delete('pools', pool['pool']['id'])
pool = self.deserialize(fmt or self.fmt, res)
yield pool
if not no_delete:
self._delete('pools', pool['pool']['id'])
@contextlib.contextmanager
def member(self, fmt=None, address='192.168.1.100', protocol_port=80,
@ -271,12 +267,10 @@ class LoadBalancerTestMixin(object):
raise webob.exc.HTTPClientError(
explanation=_("Unexpected error code: %s") % res.status_int
)
try:
member = self.deserialize(fmt or self.fmt, res)
yield member
finally:
if not no_delete:
self._delete('members', member['member']['id'])
member = self.deserialize(fmt or self.fmt, res)
yield member
if not no_delete:
self._delete('members', member['member']['id'])
@contextlib.contextmanager
def health_monitor(self, fmt=None, type='TCP',
@ -310,11 +304,9 @@ class LoadBalancerTestMixin(object):
else:
for arg in http_related_attributes:
self.assertIsNone(the_health_monitor.get(arg))
try:
yield health_monitor
finally:
if not no_delete:
self._delete('health_monitors', the_health_monitor['id'])
yield health_monitor
if not no_delete:
self._delete('health_monitors', the_health_monitor['id'])
class LoadBalancerPluginDbTestCase(LoadBalancerTestMixin,

View File

@ -96,12 +96,10 @@ class MeteringPluginDbTestCaseMixin(object):
fmt = self.fmt
metering_label = self._make_metering_label(fmt, name,
description, **kwargs)
try:
yield metering_label
finally:
if not no_delete:
self._delete('metering-labels',
metering_label['metering_label']['id'])
yield metering_label
if not no_delete:
self._delete('metering-labels',
metering_label['metering_label']['id'])
@contextlib.contextmanager
def metering_label_rule(self, metering_label_id=None, direction='ingress',
@ -114,12 +112,10 @@ class MeteringPluginDbTestCaseMixin(object):
direction,
remote_ip_prefix,
excluded)
try:
yield metering_label_rule
finally:
if not no_delete:
self._delete('metering-label-rules',
metering_label_rule['metering_label_rule']['id'])
yield metering_label_rule
if not no_delete:
self._delete('metering-label-rules',
metering_label_rule['metering_label_rule']['id'])
class MeteringPluginDbTestCase(test_db_plugin.NeutronDbPluginV2TestCase,

View File

@ -125,12 +125,10 @@ class VPNTestMixin(object):
**kwargs)
if res.status_int >= 400:
raise webob.exc.HTTPClientError(code=res.status_int)
try:
ikepolicy = self.deserialize(fmt or self.fmt, res)
yield ikepolicy
finally:
if not no_delete:
self._delete('ikepolicies', ikepolicy['ikepolicy']['id'])
ikepolicy = self.deserialize(fmt or self.fmt, res)
yield ikepolicy
if not no_delete:
self._delete('ikepolicies', ikepolicy['ikepolicy']['id'])
def _create_ipsecpolicy(self, fmt,
name='ipsecpolicy1',
@ -188,12 +186,10 @@ class VPNTestMixin(object):
**kwargs)
if res.status_int >= 400:
raise webob.exc.HTTPClientError(code=res.status_int)
try:
ipsecpolicy = self.deserialize(fmt or self.fmt, res)
yield ipsecpolicy
finally:
if not no_delete:
self._delete('ipsecpolicies', ipsecpolicy['ipsecpolicy']['id'])
ipsecpolicy = self.deserialize(fmt or self.fmt, res)
yield ipsecpolicy
if not no_delete:
self._delete('ipsecpolicies', ipsecpolicy['ipsecpolicy']['id'])
def _create_vpnservice(self, fmt, name,
admin_state_up,
@ -250,36 +246,37 @@ class VPNTestMixin(object):
'add',
tmp_router['router']['id'],
tmp_subnet['subnet']['id'], None)
try:
res = self._create_vpnservice(fmt,
name,
admin_state_up,
router_id=(tmp_router['router']
['id']),
subnet_id=(tmp_subnet['subnet']
['id']),
**kwargs)
vpnservice = self.deserialize(fmt or self.fmt, res)
if res.status_int >= 400:
raise webob.exc.HTTPClientError(
code=res.status_int, detail=vpnservice)
res = self._create_vpnservice(fmt,
name,
admin_state_up,
router_id=(tmp_router['router']
['id']),
subnet_id=(tmp_subnet['subnet']
['id']),
**kwargs)
vpnservice = self.deserialize(fmt or self.fmt, res)
if res.status_int < 400:
yield vpnservice
finally:
if not no_delete and vpnservice.get('vpnservice'):
self._delete('vpnservices',
vpnservice['vpnservice']['id'])
if plug_subnet:
self._router_interface_action(
'remove',
tmp_router['router']['id'],
tmp_subnet['subnet']['id'], None)
if external_router:
external_gateway = tmp_router['router'].get(
'external_gateway_info')
if external_gateway:
network_id = external_gateway['network_id']
self._remove_external_gateway_from_router(
tmp_router['router']['id'], network_id)
if not no_delete and vpnservice.get('vpnservice'):
self._delete('vpnservices',
vpnservice['vpnservice']['id'])
if plug_subnet:
self._router_interface_action(
'remove',
tmp_router['router']['id'],
tmp_subnet['subnet']['id'], None)
if external_router:
external_gateway = tmp_router['router'].get(
'external_gateway_info')
if external_gateway:
network_id = external_gateway['network_id']
self._remove_external_gateway_from_router(
tmp_router['router']['id'], network_id)
if res.status_int >= 400:
raise webob.exc.HTTPClientError(
code=res.status_int, detail=vpnservice)
def _create_ipsec_site_connection(self, fmt, name='test',
peer_address='192.168.1.10',
@ -379,18 +376,18 @@ class VPNTestMixin(object):
**kwargs)
if res.status_int >= 400:
raise webob.exc.HTTPClientError(code=res.status_int)
try:
ipsec_site_connection = self.deserialize(
fmt or self.fmt, res
ipsec_site_connection = self.deserialize(
fmt or self.fmt, res
)
yield ipsec_site_connection
if not no_delete:
self._delete(
'ipsec-site-connections',
ipsec_site_connection[
'ipsec_site_connection']['id']
)
yield ipsec_site_connection
finally:
if not no_delete:
self._delete(
'ipsec-site-connections',
ipsec_site_connection[
'ipsec_site_connection']['id']
)
def _check_ipsec_site_connection(self, ipsec_site_connection, keys, dpd):
self.assertEqual(

View File

@ -97,11 +97,9 @@ class TestNecPluginPacketFilterBase(test_nec_plugin.NecPluginV2TestCase):
with test_plugin.optional_ctx(network, self.network) as network_to_use:
net_id = network_to_use['network']['id']
pf = self._make_packet_filter(fmt or self.fmt, net_id, **kwargs)
try:
yield pf
finally:
if do_delete:
self._delete('packet_filters', pf['packet_filter']['id'])
yield pf
if do_delete:
self._delete('packet_filters', pf['packet_filter']['id'])
@contextlib.contextmanager
def packet_filter_on_port(self, port=None, fmt=None, do_delete=True,
@ -121,11 +119,9 @@ class TestNecPluginPacketFilterBase(test_nec_plugin.NecPluginV2TestCase):
kwargs['in_port'] = port_id
pf = self._make_packet_filter(fmt or self.fmt, net_id, **kwargs)
self.assertEqual(port_id, pf['packet_filter']['in_port'])
try:
yield pf
finally:
if do_delete:
self._delete('packet_filters', pf['packet_filter']['id'])
yield pf
if do_delete:
self._delete('packet_filters', pf['packet_filter']['id'])
class TestNecPluginPacketFilter(TestNecPluginPacketFilterBase):

View File

@ -63,11 +63,9 @@ class NetPartitionTestCase(test_nuage_plugin.NuagePluginV2TestCase):
**kwargs):
netpart = self._make_netpartition(fmt or self.fmt, name)
try:
yield netpart
finally:
if do_delete:
self._del_netpartition(netpart['net_partition']['id'])
yield netpart
if do_delete:
self._del_netpartition(netpart['net_partition']['id'])
def test_create_netpartition(self):
name = 'netpart1'

View File

@ -523,15 +523,13 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
**kwargs):
network = self._make_network(fmt or self.fmt, name,
admin_state_up, **kwargs)
try:
yield network
finally:
if do_delete:
# The do_delete parameter allows you to control whether the
# created network is immediately deleted again. Therefore, this
# function is also usable in tests, which require the creation
# of many networks.
self._delete('networks', network['network']['id'])
yield network
if do_delete:
# The do_delete parameter allows you to control whether the
# created network is immediately deleted again. Therefore, this
# function is also usable in tests, which require the creation
# of many networks.
self._delete('networks', network['network']['id'])
@contextlib.contextmanager
def subnet(self, network=None,
@ -560,11 +558,9 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
shared=shared,
ipv6_ra_mode=ipv6_ra_mode,
ipv6_address_mode=ipv6_address_mode)
try:
yield subnet
finally:
if do_delete:
self._delete('subnets', subnet['subnet']['id'])
yield subnet
if do_delete:
self._delete('subnets', subnet['subnet']['id'])
@contextlib.contextmanager
def port(self, subnet=None, fmt=None, no_delete=False,
@ -572,11 +568,9 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
with optional_ctx(subnet, self.subnet) as subnet_to_use:
net_id = subnet_to_use['subnet']['network_id']
port = self._make_port(fmt or self.fmt, net_id, **kwargs)
try:
yield port
finally:
if not no_delete:
self._delete('ports', port['port']['id'])
yield port
if not no_delete:
self._delete('ports', port['port']['id'])
def _test_list_with_sort(self, resource,
items, sorts, resources=None, query_params=''):

View File

@ -126,12 +126,10 @@ class SecurityGroupsTestCase(test_db_plugin.NeutronDbPluginV2TestCase):
if not fmt:
fmt = self.fmt
security_group = self._make_security_group(fmt, name, description)
try:
yield security_group
finally:
if not no_delete:
self._delete('security-groups',
security_group['security_group']['id'])
yield security_group
if not no_delete:
self._delete('security-groups',
security_group['security_group']['id'])
@contextlib.contextmanager
def security_group_rule(self, security_group_id='4cd70774-cc67-4a87-9b39-7'
@ -150,12 +148,10 @@ class SecurityGroupsTestCase(test_db_plugin.NeutronDbPluginV2TestCase):
remote_group_id,
ethertype=ethertype)
security_group_rule = self._make_security_group_rule(self.fmt, rule)
try:
yield security_group_rule
finally:
if not no_delete:
self._delete('security-group-rules',
security_group_rule['security_group_rule']['id'])
yield security_group_rule
if not no_delete:
self._delete('security-group-rules',
security_group_rule['security_group_rule']['id'])
def _delete_default_security_group_egress_rules(self, security_group_id):
"""Deletes default egress rules given a security group ID."""

View File

@ -358,10 +358,8 @@ class L3NatTestCaseMixin(object):
router = self._make_router(fmt or self.fmt, tenant_id, name,
admin_state_up, external_gateway_info,
set_context, **kwargs)
try:
yield router
finally:
self._delete('routers', router['router']['id'])
yield router
self._delete('routers', router['router']['id'])
def _set_net_external(self, net_id):
self._update('networks', net_id,
@ -413,31 +411,31 @@ class L3NatTestCaseMixin(object):
sid = private_port['port']['fixed_ips'][0]['subnet_id']
private_sub = {'subnet': {'id': sid}}
floatingip = None
try:
self._add_external_gateway_to_router(
r['router']['id'],
public_sub['subnet']['network_id'])
self._router_interface_action(
'add', r['router']['id'],
private_sub['subnet']['id'], None)
floatingip = self._make_floatingip(
fmt or self.fmt,
public_sub['subnet']['network_id'],
port_id=private_port['port']['id'],
fixed_ip=fixed_ip,
set_context=False)
yield floatingip
finally:
if floatingip:
self._delete('floatingips',
floatingip['floatingip']['id'])
self._router_interface_action(
'remove', r['router']['id'],
private_sub['subnet']['id'], None)
self._remove_external_gateway_from_router(
r['router']['id'],
public_sub['subnet']['network_id'])
self._add_external_gateway_to_router(
r['router']['id'],
public_sub['subnet']['network_id'])
self._router_interface_action(
'add', r['router']['id'],
private_sub['subnet']['id'], None)
floatingip = self._make_floatingip(
fmt or self.fmt,
public_sub['subnet']['network_id'],
port_id=private_port['port']['id'],
fixed_ip=fixed_ip,
set_context=False)
yield floatingip
if floatingip:
self._delete('floatingips',
floatingip['floatingip']['id'])
self._router_interface_action(
'remove', r['router']['id'],
private_sub['subnet']['id'], None)
self._remove_external_gateway_from_router(
r['router']['id'],
public_sub['subnet']['network_id'])
@contextlib.contextmanager
def floatingip_no_assoc_with_public_sub(
@ -445,29 +443,29 @@ class L3NatTestCaseMixin(object):
self._set_net_external(public_sub['subnet']['network_id'])
with self.router() as r:
floatingip = None
try:
self._add_external_gateway_to_router(
r['router']['id'],
public_sub['subnet']['network_id'])
self._router_interface_action('add', r['router']['id'],
private_sub['subnet']['id'],
None)
floatingip = self._make_floatingip(
fmt or self.fmt,
public_sub['subnet']['network_id'],
set_context=set_context)
yield floatingip, r
finally:
if floatingip:
self._delete('floatingips',
floatingip['floatingip']['id'])
self._router_interface_action('remove', r['router']['id'],
private_sub['subnet']['id'],
None)
self._remove_external_gateway_from_router(
r['router']['id'],
public_sub['subnet']['network_id'])
self._add_external_gateway_to_router(
r['router']['id'],
public_sub['subnet']['network_id'])
self._router_interface_action('add', r['router']['id'],
private_sub['subnet']['id'],
None)
floatingip = self._make_floatingip(
fmt or self.fmt,
public_sub['subnet']['network_id'],
set_context=set_context)
yield floatingip, r
if floatingip:
self._delete('floatingips',
floatingip['floatingip']['id'])
self._router_interface_action('remove', r['router']['id'],
private_sub['subnet']['id'],
None)
self._remove_external_gateway_from_router(
r['router']['id'],
public_sub['subnet']['network_id'])
@contextlib.contextmanager
def floatingip_no_assoc(self, private_sub, fmt=None, set_context=False):

View File

@ -117,12 +117,12 @@ class L3SchedulerTestCase(l3_agentschedulers_db.L3AgentSchedulerDbMixin,
self._add_external_gateway_to_router(
router['router']['id'],
subnet['subnet']['network_id'])
try:
yield router
finally:
self._remove_external_gateway_from_router(
router['router']['id'], subnet['subnet']['network_id'])
self._delete('routers', router['router']['id'])
yield router
self._remove_external_gateway_from_router(
router['router']['id'], subnet['subnet']['network_id'])
self._delete('routers', router['router']['id'])
class L3AgentChanceSchedulerTestCase(L3SchedulerTestCase):

View File

@ -76,12 +76,12 @@ class TestQoSQueue(NsxPluginV2TestCase):
qos_queue = self.deserialize('json', res)
if res.status_int >= 400:
raise webob.exc.HTTPClientError(code=res.status_int)
try:
yield qos_queue
finally:
if not no_delete:
self._delete('qos-queues',
qos_queue['qos_queue']['id'])
yield qos_queue
if not no_delete:
self._delete('qos-queues',
qos_queue['qos_queue']['id'])
def test_create_qos_queue(self):
with self.qos_queue(name='fake_lqueue', min=34, max=44,

View File

@ -355,17 +355,16 @@ class SyncTestCase(base.BaseTestCase):
routers.append(self._plugin.create_router(ctx, router(i)))
# Do not return anything as the user does need the actual
# data created
try:
yield
finally:
# Remove everything
for router in routers:
self._plugin.delete_router(ctx, router['id'])
for port in ports:
self._plugin.delete_port(ctx, port['id'])
# This will remove networks and subnets
for network in networks:
self._plugin.delete_network(ctx, network['id'])
yield
# Remove everything
for router in routers:
self._plugin.delete_router(ctx, router['id'])
for port in ports:
self._plugin.delete_port(ctx, port['id'])
# This will remove networks and subnets
for network in networks:
self._plugin.delete_network(ctx, network['id'])
def _get_tag_dict(self, tags):
return dict((tag['scope'], tag['tag']) for tag in tags)

View File

@ -85,16 +85,16 @@ class TestVpnPlugin(test_db_vpnaas.VPNTestMixin,
data = {'router': {'tenant_id': self._tenant_id}}
data['router']['service_router'] = True
router_req = self.new_create_request('routers', data, self.fmt)
try:
res = router_req.get_response(self.ext_api)
router = self.deserialize(self.fmt, res)
self._add_external_gateway_to_router(
router['router']['id'],
s['subnet']['network_id'])
router = self._show('routers', router['router']['id'])
yield router
finally:
self._delete('routers', router['router']['id'])
res = router_req.get_response(self.ext_api)
router = self.deserialize(self.fmt, res)
self._add_external_gateway_to_router(
router['router']['id'],
s['subnet']['network_id'])
router = self._show('routers', router['router']['id'])
yield router
self._delete('routers', router['router']['id'])
def test_create_vpnservice(self, **extras):
"""Test case to create a vpnservice."""