ML2: Add tests to validate quota usage tracking
Ensure that event handlers are invoked upon completion of
ML2 operations which add or remove tracked resources.
Also validate that the event handlers are called for the
appropriate resources and that quota usage's dirty bit
is set and unset as expected.
These are not unit tests, but added in the unit test tree
as they leverage code both from the DB unit test and the ML2
unit test framework. This module has indeed been added to
the 'exclusion list' in check_unit_test_structure.sh, and
should be moved to the functional test tree together with
the other modules.
Conflicts:
neutron/quota/resource.py
Closes-Bug: #1499358
Change-Id: I78c432c35f3f3339607cd533019ae6d0fa2a5cd6
(cherry picked from commit cdd049e4c4
)
This commit is contained in:
parent
1e52b28cc9
commit
f175cd751b
@ -18,10 +18,11 @@ from oslo_db import exception as oslo_db_exception
|
||||
from oslo_log import log
|
||||
from oslo_utils import excutils
|
||||
from sqlalchemy import event
|
||||
from sqlalchemy import exc as sql_exc
|
||||
|
||||
from neutron.db import api as db_api
|
||||
from neutron.db.quota import api as quota_api
|
||||
from neutron.i18n import _LE
|
||||
from neutron.i18n import _LE, _LW
|
||||
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
@ -300,5 +301,11 @@ class TrackedResource(BaseResource):
|
||||
event.listen(self._model_class, 'after_delete', self._db_event_handler)
|
||||
|
||||
def unregister_events(self):
|
||||
event.remove(self._model_class, 'after_insert', self._db_event_handler)
|
||||
event.remove(self._model_class, 'after_delete', self._db_event_handler)
|
||||
try:
|
||||
event.remove(self._model_class, 'after_insert',
|
||||
self._db_event_handler)
|
||||
event.remove(self._model_class, 'after_delete',
|
||||
self._db_event_handler)
|
||||
except sql_exc.InvalidRequestError:
|
||||
LOG.warning(_LW("No sqlalchemy event for resource %s found"),
|
||||
self.name)
|
||||
|
292
neutron/tests/unit/plugins/ml2/test_tracked_resources.py
Normal file
292
neutron/tests/unit/plugins/ml2/test_tracked_resources.py
Normal file
@ -0,0 +1,292 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import mock
|
||||
from oslo_utils import uuidutils
|
||||
|
||||
from neutron import context
|
||||
from neutron.db.quota import api as quota_db_api
|
||||
from neutron.tests.unit.extensions import test_securitygroup
|
||||
from neutron.tests.unit.plugins.ml2 import test_plugin
|
||||
|
||||
|
||||
class SgTestCaseWrapper(test_securitygroup.SecurityGroupDBTestCase):
|
||||
# This wrapper class enables Ml2PluginV2TestCase to correctly call the
|
||||
# setup method in SecurityGroupDBTestCase which does not accept the
|
||||
# service_plugins keyword parameter.
|
||||
|
||||
def setUp(self, plugin, **kwargs):
|
||||
super(SgTestCaseWrapper, self).setUp(plugin)
|
||||
|
||||
|
||||
class BaseTestTrackedResources(test_plugin.Ml2PluginV2TestCase,
|
||||
SgTestCaseWrapper):
|
||||
|
||||
def setUp(self):
|
||||
self.ctx = context.get_admin_context()
|
||||
# Prevent noise from default security group operations
|
||||
def_sec_group_patch = mock.patch(
|
||||
'neutron.db.securitygroups_db.SecurityGroupDbMixin.'
|
||||
'_ensure_default_security_group')
|
||||
def_sec_group_patch.start()
|
||||
get_sec_group_port_patch = mock.patch(
|
||||
'neutron.db.securitygroups_db.SecurityGroupDbMixin.'
|
||||
'_get_security_groups_on_port')
|
||||
get_sec_group_port_patch.start()
|
||||
super(BaseTestTrackedResources, self).setUp()
|
||||
self._tenant_id = uuidutils.generate_uuid()
|
||||
|
||||
def _test_init(self, resource_name):
|
||||
quota_db_api.set_quota_usage(
|
||||
self.ctx, resource_name, self._tenant_id)
|
||||
|
||||
|
||||
class TestTrackedResourcesEventHandler(BaseTestTrackedResources):
|
||||
|
||||
def setUp(self):
|
||||
handler_patch = mock.patch(
|
||||
'neutron.quota.resource.TrackedResource._db_event_handler')
|
||||
self.handler_mock = handler_patch.start()
|
||||
super(TestTrackedResourcesEventHandler, self).setUp()
|
||||
|
||||
def _verify_event_handler_calls(self, data, expected_call_count=1):
|
||||
if not hasattr(data, '__iter__') or isinstance(data, dict):
|
||||
data = [data]
|
||||
self.assertEqual(expected_call_count, self.handler_mock.call_count)
|
||||
call_idx = -1
|
||||
for item in data:
|
||||
if item:
|
||||
model = self.handler_mock.call_args_list[call_idx][0][-1]
|
||||
self.assertEqual(model['id'], item['id'])
|
||||
self.assertEqual(model['tenant_id'], item['tenant_id'])
|
||||
call_idx = call_idx - 1
|
||||
|
||||
def test_create_delete_network_triggers_event(self):
|
||||
self._test_init('network')
|
||||
net = self._make_network('json', 'meh', True)['network']
|
||||
self._verify_event_handler_calls(net)
|
||||
self._delete('networks', net['id'])
|
||||
self._verify_event_handler_calls(net, expected_call_count=2)
|
||||
|
||||
def test_create_delete_port_triggers_event(self):
|
||||
self._test_init('port')
|
||||
net = self._make_network('json', 'meh', True)['network']
|
||||
port = self._make_port('json', net['id'])['port']
|
||||
# Expecting 2 calls - 1 for the network, 1 for the port
|
||||
self._verify_event_handler_calls(port, expected_call_count=2)
|
||||
self._delete('ports', port['id'])
|
||||
self._verify_event_handler_calls(port, expected_call_count=3)
|
||||
|
||||
def test_create_delete_subnet_triggers_event(self):
|
||||
self._test_init('subnet')
|
||||
net = self._make_network('json', 'meh', True)
|
||||
subnet = self._make_subnet('json', net, '10.0.0.1',
|
||||
'10.0.0.0/24')['subnet']
|
||||
# Expecting 2 calls - 1 for the network, 1 for the subnet
|
||||
self._verify_event_handler_calls([subnet, net['network']],
|
||||
expected_call_count=2)
|
||||
self._delete('subnets', subnet['id'])
|
||||
self._verify_event_handler_calls(subnet, expected_call_count=3)
|
||||
|
||||
def test_create_delete_network_with_subnet_triggers_event(self):
|
||||
self._test_init('network')
|
||||
self._test_init('subnet')
|
||||
net = self._make_network('json', 'meh', True)
|
||||
subnet = self._make_subnet('json', net, '10.0.0.1',
|
||||
'10.0.0.0/24')['subnet']
|
||||
# Expecting 2 calls - 1 for the network, 1 for the subnet
|
||||
self._verify_event_handler_calls([subnet, net['network']],
|
||||
expected_call_count=2)
|
||||
self._delete('networks', net['network']['id'])
|
||||
# Expecting 2 more calls - 1 for the network, 1 for the subnet
|
||||
self._verify_event_handler_calls([net['network'], subnet],
|
||||
expected_call_count=4)
|
||||
|
||||
def test_create_delete_subnetpool_triggers_event(self):
|
||||
self._test_init('subnetpool')
|
||||
pool = self._make_subnetpool('json', ['10.0.0.0/8'],
|
||||
name='meh',
|
||||
tenant_id=self._tenant_id)['subnetpool']
|
||||
self._verify_event_handler_calls(pool)
|
||||
self._delete('subnetpools', pool['id'])
|
||||
self._verify_event_handler_calls(pool, expected_call_count=2)
|
||||
|
||||
def test_create_delete_securitygroup_triggers_event(self):
|
||||
self._test_init('security_group')
|
||||
sec_group = self._make_security_group(
|
||||
'json', 'meh', 'meh', tenant_id=self._tenant_id)['security_group']
|
||||
# When a security group is created it also creates 2 rules, therefore
|
||||
# there will be three calls and we need to verify the first
|
||||
self._verify_event_handler_calls([None, None, sec_group],
|
||||
expected_call_count=3)
|
||||
self._delete('security-groups', sec_group['id'])
|
||||
# When a security group is deleted it also removes the 2 rules
|
||||
# generated upon creation
|
||||
self._verify_event_handler_calls(sec_group, expected_call_count=6)
|
||||
|
||||
def test_create_delete_securitygrouprule_triggers_event(self):
|
||||
self._test_init('security_group_rule')
|
||||
sec_group = self._make_security_group(
|
||||
'json', 'meh', 'meh', tenant_id=self._tenant_id)['security_group']
|
||||
rule_req = self._build_security_group_rule(
|
||||
sec_group['id'], 'ingress', 'TCP', tenant_id=self._tenant_id)
|
||||
sec_group_rule = self._make_security_group_rule(
|
||||
'json', rule_req)['security_group_rule']
|
||||
# When a security group is created it also creates 2 rules, therefore
|
||||
# there will be four calls in total to the event handler
|
||||
self._verify_event_handler_calls(sec_group_rule, expected_call_count=4)
|
||||
self._delete('security-group-rules', sec_group_rule['id'])
|
||||
self._verify_event_handler_calls(sec_group_rule, expected_call_count=5)
|
||||
|
||||
|
||||
class TestTrackedResources(BaseTestTrackedResources):
|
||||
|
||||
def _verify_dirty_bit(self, resource_name, expected_value=True):
|
||||
usage = quota_db_api.get_quota_usage_by_resource_and_tenant(
|
||||
self.ctx, resource_name, self._tenant_id)
|
||||
self.assertEqual(expected_value, usage.dirty)
|
||||
|
||||
def test_create_delete_network_marks_dirty(self):
|
||||
self._test_init('network')
|
||||
net = self._make_network('json', 'meh', True)['network']
|
||||
self._verify_dirty_bit('network')
|
||||
# Clear the dirty bit
|
||||
quota_db_api.set_quota_usage_dirty(
|
||||
self.ctx, 'network', self._tenant_id, dirty=False)
|
||||
self._delete('networks', net['id'])
|
||||
self._verify_dirty_bit('network')
|
||||
|
||||
def test_list_networks_clears_dirty(self):
|
||||
self._test_init('network')
|
||||
net = self._make_network('json', 'meh', True)['network']
|
||||
self.ctx.tenant_id = net['tenant_id']
|
||||
self._list('networks', neutron_context=self.ctx)
|
||||
self._verify_dirty_bit('network', expected_value=False)
|
||||
|
||||
def test_create_delete_port_marks_dirty(self):
|
||||
self._test_init('port')
|
||||
net = self._make_network('json', 'meh', True)['network']
|
||||
port = self._make_port('json', net['id'])['port']
|
||||
self._verify_dirty_bit('port')
|
||||
# Clear the dirty bit
|
||||
quota_db_api.set_quota_usage_dirty(
|
||||
self.ctx, 'port', self._tenant_id, dirty=False)
|
||||
self._delete('ports', port['id'])
|
||||
self._verify_dirty_bit('port')
|
||||
|
||||
def test_list_ports_clears_dirty(self):
|
||||
self._test_init('port')
|
||||
net = self._make_network('json', 'meh', True)['network']
|
||||
port = self._make_port('json', net['id'])['port']
|
||||
self.ctx.tenant_id = port['tenant_id']
|
||||
self._list('ports', neutron_context=self.ctx)
|
||||
self._verify_dirty_bit('port', expected_value=False)
|
||||
|
||||
def test_create_delete_subnet_marks_dirty(self):
|
||||
self._test_init('subnet')
|
||||
net = self._make_network('json', 'meh', True)
|
||||
subnet = self._make_subnet('json', net, '10.0.0.1',
|
||||
'10.0.0.0/24')['subnet']
|
||||
self._verify_dirty_bit('subnet')
|
||||
# Clear the dirty bit
|
||||
quota_db_api.set_quota_usage_dirty(
|
||||
self.ctx, 'subnet', self._tenant_id, dirty=False)
|
||||
self._delete('subnets', subnet['id'])
|
||||
self._verify_dirty_bit('subnet')
|
||||
|
||||
def test_create_delete_network_with_subnet_marks_dirty(self):
|
||||
self._test_init('network')
|
||||
self._test_init('subnet')
|
||||
net = self._make_network('json', 'meh', True)
|
||||
self._make_subnet('json', net, '10.0.0.1',
|
||||
'10.0.0.0/24')['subnet']
|
||||
self._verify_dirty_bit('subnet')
|
||||
# Clear the dirty bit
|
||||
quota_db_api.set_quota_usage_dirty(
|
||||
self.ctx, 'subnet', self._tenant_id, dirty=False)
|
||||
self._delete('networks', net['network']['id'])
|
||||
self._verify_dirty_bit('network')
|
||||
self._verify_dirty_bit('subnet')
|
||||
|
||||
def test_list_subnets_clears_dirty(self):
|
||||
self._test_init('subnet')
|
||||
net = self._make_network('json', 'meh', True)
|
||||
subnet = self._make_subnet('json', net, '10.0.0.1',
|
||||
'10.0.0.0/24')['subnet']
|
||||
self.ctx.tenant_id = subnet['tenant_id']
|
||||
self._list('subnets', neutron_context=self.ctx)
|
||||
self._verify_dirty_bit('subnet', expected_value=False)
|
||||
|
||||
def test_create_delete_subnetpool_marks_dirty(self):
|
||||
self._test_init('subnetpool')
|
||||
pool = self._make_subnetpool('json', ['10.0.0.0/8'],
|
||||
name='meh',
|
||||
tenant_id=self._tenant_id)['subnetpool']
|
||||
self._verify_dirty_bit('subnetpool')
|
||||
# Clear the dirty bit
|
||||
quota_db_api.set_quota_usage_dirty(
|
||||
self.ctx, 'subnetpool', self._tenant_id, dirty=False)
|
||||
self._delete('subnetpools', pool['id'])
|
||||
self._verify_dirty_bit('subnetpool')
|
||||
|
||||
def test_list_subnetpools_clears_dirty(self):
|
||||
self._test_init('subnetpool')
|
||||
pool = self._make_subnetpool('json', ['10.0.0.0/8'],
|
||||
name='meh',
|
||||
tenant_id=self._tenant_id)['subnetpool']
|
||||
self.ctx.tenant_id = pool['tenant_id']
|
||||
self._list('subnetpools', neutron_context=self.ctx)
|
||||
self._verify_dirty_bit('subnetpool', expected_value=False)
|
||||
|
||||
def test_create_delete_securitygroup_marks_dirty(self):
|
||||
self._test_init('security_group')
|
||||
sec_group = self._make_security_group(
|
||||
'json', 'meh', 'meh', tenant_id=self._tenant_id)['security_group']
|
||||
self._verify_dirty_bit('security_group')
|
||||
# Clear the dirty bit
|
||||
quota_db_api.set_quota_usage_dirty(
|
||||
self.ctx, 'security_group', self._tenant_id, dirty=False)
|
||||
self._delete('security-groups', sec_group['id'])
|
||||
self._verify_dirty_bit('security_group')
|
||||
|
||||
def test_list_securitygroups_clears_dirty(self):
|
||||
self._test_init('security_group')
|
||||
self._make_security_group(
|
||||
'json', 'meh', 'meh', tenant_id=self._tenant_id)['security_group']
|
||||
self.ctx.tenant_id = self._tenant_id
|
||||
self._list('security-groups', neutron_context=self.ctx)
|
||||
self._verify_dirty_bit('security_group', expected_value=False)
|
||||
|
||||
def test_create_delete_securitygrouprule_marks_dirty(self):
|
||||
self._test_init('security_group_rule')
|
||||
sec_group = self._make_security_group(
|
||||
'json', 'meh', 'meh', tenant_id=self._tenant_id)['security_group']
|
||||
rule_req = self._build_security_group_rule(
|
||||
sec_group['id'], 'ingress', 'TCP', tenant_id=self._tenant_id)
|
||||
sec_group_rule = self._make_security_group_rule(
|
||||
'json', rule_req)['security_group_rule']
|
||||
self._verify_dirty_bit('security_group_rule')
|
||||
# Clear the dirty bit
|
||||
quota_db_api.set_quota_usage_dirty(
|
||||
self.ctx, 'security_group_rule', self._tenant_id, dirty=False)
|
||||
self._delete('security-group-rules', sec_group_rule['id'])
|
||||
self._verify_dirty_bit('security_group_rule')
|
||||
|
||||
def test_list_securitygrouprules_clears_dirty(self):
|
||||
self._test_init('security_group_rule')
|
||||
self._make_security_group(
|
||||
'json', 'meh', 'meh', tenant_id=self._tenant_id)['security_group']
|
||||
# As the security group create operation also creates 2 security group
|
||||
# rules there is no need to explicitly create any rule
|
||||
self.ctx.tenant_id = self._tenant_id
|
||||
self._list('security-group-rules', neutron_context=self.ctx)
|
||||
self._verify_dirty_bit('security_group_rule', expected_value=False)
|
@ -26,6 +26,7 @@ ignore_regexes=(
|
||||
"^plugins/ml2/test_extension_driver_api.py$"
|
||||
"^plugins/ml2/test_ext_portsecurity.py$"
|
||||
"^plugins/ml2/test_agent_scheduler.py$"
|
||||
"^plugins/ml2/test_tracked_resources.py$"
|
||||
"^plugins/ml2/drivers/openvswitch/agent/test_agent_scheduler.py$"
|
||||
"^plugins/ml2/drivers/openvswitch/agent/test_ovs_tunnel.py$"
|
||||
"^plugins/openvswitch/test_agent_scheduler.py$"
|
||||
|
Loading…
Reference in New Issue
Block a user