diff --git a/neutron/api/v2/attributes.py b/neutron/api/v2/attributes.py index 694d07cfcb3..7a757977497 100644 --- a/neutron/api/v2/attributes.py +++ b/neutron/api/v2/attributes.py @@ -893,6 +893,7 @@ RESOURCE_FOREIGN_KEYS = { NETWORKS: 'network_id' } +# Store plural/singular mappings PLURALS = {NETWORKS: NETWORK, PORTS: PORT, SUBNETS: SUBNET, @@ -902,6 +903,31 @@ PLURALS = {NETWORKS: NETWORK, 'allocation_pools': 'allocation_pool', 'fixed_ips': 'fixed_ip', 'extensions': 'extension'} +# Store singular/plural mappings. This dictionary is populated by +# get_resource_info +REVERSED_PLURALS = {} + + +def get_collection_info(collection): + """Helper function to retrieve attribute info. + + :param collection: Collection or plural name of the resource + """ + return RESOURCE_ATTRIBUTE_MAP.get(collection) + + +def get_resource_info(resource): + """Helper function to retrive attribute info + + :param resource: resource name + """ + plural_name = REVERSED_PLURALS.get(resource) + if not plural_name: + for (plural, singular) in PLURALS.items(): + if singular == resource: + plural_name = plural + REVERSED_PLURALS[resource] = plural_name + return RESOURCE_ATTRIBUTE_MAP.get(plural_name) def fill_default_value(attr_info, res_dict, diff --git a/neutron/pecan_wsgi/app.py b/neutron/pecan_wsgi/app.py index 7860da5a7a5..67721f4425c 100644 --- a/neutron/pecan_wsgi/app.py +++ b/neutron/pecan_wsgi/app.py @@ -46,7 +46,7 @@ def setup_app(*args, **kwargs): hooks.ExceptionTranslationHook(), # priority 100 hooks.ContextHook(), # priority 95 hooks.MemberActionHook(), # piority 95 - hooks.AttributePopulationHook(), # priority 120 + hooks.BodyValidationHook(), # priority 120 hooks.OwnershipValidationHook(), # priority 125 hooks.QuotaEnforcementHook(), # priority 130 hooks.PolicyHook(), # priority 135 diff --git a/neutron/pecan_wsgi/controllers/root.py b/neutron/pecan_wsgi/controllers/root.py index 15b509d5e1a..977a7e58a7a 100644 --- a/neutron/pecan_wsgi/controllers/root.py +++ b/neutron/pecan_wsgi/controllers/root.py @@ -58,7 +58,9 @@ class RootController(object): versions = [builder.build(version) for version in _get_version_info()] return dict(versions=versions) + @when(index, method='HEAD') @when(index, method='POST') + @when(index, method='PATCH') @when(index, method='PUT') @when(index, method='DELETE') def not_supported(self): @@ -96,7 +98,9 @@ class V2Controller(object): builder = versions_view.get_view_builder(pecan.request) return dict(version=builder.build(self.version_info)) + @when(index, method='HEAD') @when(index, method='POST') + @when(index, method='PATCH') @when(index, method='PUT') @when(index, method='DELETE') def not_supported(self): @@ -110,9 +114,10 @@ class V2Controller(object): LOG.warn(_LW("No controller found for: %s - returning response " "code 404"), collection) pecan.abort(404) - # Store resource name in pecan request context so that hooks can - # leverage it if necessary + # Store resource and collection names in pecan request context so that + # hooks can leverage them if necessary request.context['resource'] = controller.resource + request.context['collection'] = collection return controller, remainder @@ -150,6 +155,8 @@ class CollectionsController(NeutronPecanController): @expose() def _lookup(self, item, *remainder): + # Store resource identifier in request context + request.context['resource_id'] = item return ItemController(self.resource, item), remainder @expose(generic=True) @@ -165,21 +172,36 @@ class CollectionsController(NeutronPecanController): filters = {k: _listify(v) for k, v in kwargs.items()} # TODO(kevinbenton): convert these using api_common.get_filters lister = getattr(self.plugin, 'get_%s' % self.collection) - neutron_context = request.context.get('neutron_context') + neutron_context = request.context['neutron_context'] return {self.collection: lister(neutron_context, filters=filters)} + @when(index, method='HEAD') + @when(index, method='PATCH') + @when(index, method='PUT') + @when(index, method='DELETE') + def not_supported(self): + pecan.abort(405) + @when(index, method='POST') def post(self, *args, **kwargs): # TODO(kevinbenton): emulated bulk! + resources = request.context['resources'] pecan.response.status = 201 - if request.bulk: + return self.create(resources) + + def create(self, resources): + if len(resources) > 1: + # Bulk! method = 'create_%s_bulk' % self.resource + key = self.collection + data = {key: [{self.resource: res} for res in resources]} else: method = 'create_%s' % self.resource + key = self.resource + data = {key: resources[0]} creator = getattr(self.plugin, method) - key = self.collection if request.bulk else self.resource - neutron_context = request.context.get('neutron_context') - return {key: creator(neutron_context, request.prepared_data)} + neutron_context = request.context['neutron_context'] + return {key: creator(neutron_context, data)} class ItemController(NeutronPecanController): @@ -194,25 +216,35 @@ class ItemController(NeutronPecanController): def get(self, *args, **kwargs): getter = getattr(self.plugin, 'get_%s' % self.resource) - neutron_context = request.context.get('neutron_context') + neutron_context = request.context['neutron_context'] return {self.resource: getter(neutron_context, self.item)} + @when(index, method='HEAD') + @when(index, method='POST') + @when(index, method='PATCH') + def not_supported(self): + pecan.abort(405) + @when(index, method='PUT') def put(self, *args, **kwargs): - neutron_context = request.context.get('neutron_context') + neutron_context = request.context['neutron_context'] + resources = request.context['resources'] if request.member_action: member_action_method = getattr(self.plugin, request.member_action) return member_action_method(neutron_context, self.item, - request.prepared_data) + resources[0]) # TODO(kevinbenton): bulk? updater = getattr(self.plugin, 'update_%s' % self.resource) - return updater(neutron_context, self.item, request.prepared_data) + # Bulk update is not supported, 'resources' always contains a single + # elemenet + data = {self.resource: resources[0]} + return updater(neutron_context, self.item, data) @when(index, method='DELETE') def delete(self): # TODO(kevinbenton): setting code could be in a decorator pecan.response.status = 204 - neutron_context = request.context.get('neutron_context') + neutron_context = request.context['neutron_context'] deleter = getattr(self.plugin, 'delete_%s' % self.resource) return deleter(neutron_context, self.item) diff --git a/neutron/pecan_wsgi/hooks/__init__.py b/neutron/pecan_wsgi/hooks/__init__.py index 7ac38da781f..a14a810e892 100644 --- a/neutron/pecan_wsgi/hooks/__init__.py +++ b/neutron/pecan_wsgi/hooks/__init__.py @@ -13,7 +13,7 @@ # License for the specific language governing permissions and limitations # under the License. -from neutron.pecan_wsgi.hooks import attribute_population +from neutron.pecan_wsgi.hooks import body_validation from neutron.pecan_wsgi.hooks import context from neutron.pecan_wsgi.hooks import member_action from neutron.pecan_wsgi.hooks import notifier @@ -26,7 +26,7 @@ from neutron.pecan_wsgi.hooks import translation ExceptionTranslationHook = translation.ExceptionTranslationHook ContextHook = context.ContextHook MemberActionHook = member_action.MemberActionHook -AttributePopulationHook = attribute_population.AttributePopulationHook +BodyValidationHook = body_validation.BodyValidationHook OwnershipValidationHook = ownership_validation.OwnershipValidationHook PolicyHook = policy_enforcement.PolicyHook QuotaEnforcementHook = quota_enforcement.QuotaEnforcementHook diff --git a/neutron/pecan_wsgi/hooks/attribute_population.py b/neutron/pecan_wsgi/hooks/attribute_population.py deleted file mode 100644 index 311a1580fb4..00000000000 --- a/neutron/pecan_wsgi/hooks/attribute_population.py +++ /dev/null @@ -1,104 +0,0 @@ -# Copyright (c) 2015 Mirantis, Inc. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -from pecan import hooks - -from neutron.api.v2 import attributes -from neutron.api.v2 import base as v2base -from neutron import manager - - -class AttributePopulationHook(hooks.PecanHook): - - priority = 120 - - def before(self, state): - state.request.prepared_data = {} - state.request.resources = [] - if state.request.method not in ('POST', 'PUT'): - return - is_create = state.request.method == 'POST' - resource = state.request.context.get('resource') - neutron_context = state.request.context['neutron_context'] - if not resource: - return - if state.request.member_action: - # Neutron currently does not describe request bodies for member - # actions in meh. prepare_request_body should not be called for - # member actions, and the body should be passed as it is. The - # plugin will do the validation (yuck). - state.request.prepared_data = state.request.json - else: - state.request.prepared_data = ( - v2base.Controller.prepare_request_body( - neutron_context, state.request.json, is_create, - resource, _attributes_for_resource(resource), - allow_bulk=True)) - # TODO(kevinbenton): conditional allow_bulk - - state.request.resources = _extract_resources_from_state(state) - # make the original object available: - if not is_create and not state.request.member_action: - obj_id = _pull_id_from_request(state.request, resource) - attrs = _attributes_for_resource(resource) - field_list = [name for (name, value) in attrs.items() - if (value.get('required_by_policy') or - value.get('primary_key') or - 'default' not in value)] - plugin = manager.NeutronManager.get_plugin_for_resource(resource) - getter = getattr(plugin, 'get_%s' % resource) - # TODO(kevinbenton): the parent_id logic currently in base.py - obj = getter(neutron_context, obj_id, fields=field_list) - state.request.original_object = obj - - -def _attributes_for_resource(resource): - if resource not in attributes.PLURALS.values(): - return {} - return attributes.RESOURCE_ATTRIBUTE_MAP.get( - _plural(resource), {}) - - -def _pull_id_from_request(request, resource): - # NOTE(kevinbenton): this sucks - # Converting /v2.0/ports/dbbdae29-82f6-49cf-b05e-3365bcc95b7a.json - # into dbbdae29-82f6-49cf-b05e-3365bcc95b7a - resources = _plural(resource) - jsontrail = request.path_info.replace('/v2.0/%s/' % resources, '') - obj_id = jsontrail.replace('.json', '') - return obj_id - - -def _plural(rtype): - for plural, single in attributes.PLURALS.items(): - if rtype == single: - return plural - - -def _extract_resources_from_state(state): - resource = state.request.context['resource'] - if not resource: - return [] - data = state.request.prepared_data - # single item - if resource in data: - state.request.bulk = False - return [data[resource]] - # multiple items - if _plural(resource) in data: - state.request.bulk = True - return data[_plural(resource)] - - return [] diff --git a/neutron/pecan_wsgi/hooks/body_validation.py b/neutron/pecan_wsgi/hooks/body_validation.py new file mode 100644 index 00000000000..6e957301a0b --- /dev/null +++ b/neutron/pecan_wsgi/hooks/body_validation.py @@ -0,0 +1,47 @@ +# Copyright (c) 2015 Mirantis, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from pecan import hooks + +from neutron.api.v2 import attributes as v2_attributes +from neutron.api.v2 import base as v2_base + + +class BodyValidationHook(hooks.PecanHook): + + priority = 120 + + def before(self, state): + if state.request.method not in ('POST', 'PUT'): + return + resource = state.request.context.get('resource') + collection = state.request.context.get('collection') + neutron_context = state.request.context['neutron_context'] + is_create = state.request.method == 'POST' + if not resource: + return + # Prepare data to be passed to the plugin from request body + data = v2_base.Controller.prepare_request_body( + neutron_context, + state.request.json, + is_create, + resource, + v2_attributes.get_collection_info(collection), + allow_bulk=is_create) + if collection in data: + state.request.context['resources'] = [item[resource] for item in + data[collection]] + else: + state.request.context['resources'] = [data[resource]] diff --git a/neutron/pecan_wsgi/hooks/ownership_validation.py b/neutron/pecan_wsgi/hooks/ownership_validation.py index 469ffe228f5..e3c4af38168 100644 --- a/neutron/pecan_wsgi/hooks/ownership_validation.py +++ b/neutron/pecan_wsgi/hooks/ownership_validation.py @@ -27,8 +27,7 @@ class OwnershipValidationHook(hooks.PecanHook): def before(self, state): if state.request.method != 'POST': return - items = state.request.resources - for item in items: + for item in state.request.context.get('resources', []): self._validate_network_tenant_ownership(state, item) def _validate_network_tenant_ownership(self, state, resource_item): diff --git a/neutron/pecan_wsgi/hooks/policy_enforcement.py b/neutron/pecan_wsgi/hooks/policy_enforcement.py index cc9ed316230..cfc70d5aa34 100644 --- a/neutron/pecan_wsgi/hooks/policy_enforcement.py +++ b/neutron/pecan_wsgi/hooks/policy_enforcement.py @@ -18,14 +18,13 @@ import simplejson from oslo_policy import policy as oslo_policy from oslo_utils import excutils -import pecan from pecan import hooks import webob from neutron._i18n import _ +from neutron.api.v2 import attributes as v2_attributes from neutron.common import constants as const from neutron import manager -from neutron.pecan_wsgi.hooks import attribute_population from neutron import policy @@ -34,25 +33,47 @@ class PolicyHook(hooks.PecanHook): ACTION_MAP = {'POST': 'create', 'PUT': 'update', 'GET': 'get', 'DELETE': 'delete'} + def _fetch_resource(self, neutron_context, resource, resource_id): + attrs = v2_attributes.get_resource_info(resource) + field_list = [name for (name, value) in attrs.items() + if (value.get('required_by_policy') or + value.get('primary_key') or 'default' not in value)] + plugin = manager.NeutronManager.get_plugin_for_resource(resource) + getter = getattr(plugin, 'get_%s' % resource) + # TODO(kevinbenton): the parent_id logic currently in base.py + return getter(neutron_context, resource_id, fields=field_list) + def before(self, state): - if state.request.method not in self.ACTION_MAP: - pecan.abort(405) + # This hook should be run only for PUT,POST and DELETE methods and for + # requests targeting a neutron resource + # FIXME(salv-orlando): DELETE support. It does not work with the + # current logic. See LP Bug #1520180 + items = state.request.context.get('resources') + if state.request.method not in ('POST', 'PUT') or not items: + return neutron_context = state.request.context.get('neutron_context') resource = state.request.context.get('resource') + collection = state.request.context.get('collection') is_update = (state.request.method == 'PUT') - items = state.request.resources policy.init() action = '%s_%s' % (self.ACTION_MAP[state.request.method], resource) + + # NOTE(salv-orlando): As bulk updates are not supported, in case of PUT + # requests there will be only a single item to process, and its + # identifier would have been already retrieved by the lookup process for item in items: if is_update: - obj = copy.copy(state.request.original_object) + resource_id = state.request.context.get('resource_id') + obj = copy.copy(self._fetch_resource(neutron_context, + resource, + resource_id)) obj.update(item) obj[const.ATTRIBUTES_TO_UPDATE] = item.keys() item = obj try: policy.enforce( neutron_context, action, item, - pluralized=attribute_population._plural(resource)) + pluralized=collection) except oslo_policy.PolicyNotAuthorized: with excutils.save_and_reraise_exception() as ctxt: # If a tenant is modifying it's own object, it's safe to @@ -67,6 +88,7 @@ class PolicyHook(hooks.PecanHook): def after(self, state): neutron_context = state.request.context.get('neutron_context') resource = state.request.context.get('resource') + collection = state.request.context.get('collection') if not resource: # can't filter a resource we don't recognize return @@ -79,31 +101,31 @@ class PolicyHook(hooks.PecanHook): return action = '%s_%s' % (self.ACTION_MAP[state.request.method], resource) - plural = attribute_population._plural(resource) - if not data or (resource not in data and plural not in data): + if not data or (resource not in data and collection not in data): return is_single = resource in data - key = resource if is_single else plural - to_process = [data[resource]] if is_single else data[plural] + key = resource if is_single else collection + to_process = [data[resource]] if is_single else data[collection] # in the single case, we enforce which raises on violation # in the plural case, we just check so violating items are hidden policy_method = policy.enforce if is_single else policy.check plugin = manager.NeutronManager.get_plugin_for_resource(resource) - resp = [self._get_filtered_item(state.request, resource, item) + resp = [self._get_filtered_item(state.request, resource, + collection, item) for item in to_process if (state.request.method != 'GET' or policy_method(neutron_context, action, item, plugin=plugin, - pluralized=plural))] + pluralized=collection))] if is_single: resp = resp[0] data[key] = resp state.response.json = data - def _get_filtered_item(self, request, resource, data): + def _get_filtered_item(self, request, resource, collection, data): neutron_context = request.context.get('neutron_context') to_exclude = self._exclude_attributes_by_policy( - neutron_context, resource, data) + neutron_context, resource, collection, data) return self._filter_attributes(request, data, to_exclude) def _filter_attributes(self, request, data, fields_to_strip): @@ -115,7 +137,8 @@ class PolicyHook(hooks.PecanHook): if (item[0] not in fields_to_strip and (not user_fields or item[0] in user_fields))) - def _exclude_attributes_by_policy(self, context, resource, data): + def _exclude_attributes_by_policy(self, context, resource, + collection, data): """Identifies attributes to exclude according to authZ policies. Return a list of attribute names which should be stripped from the @@ -124,7 +147,7 @@ class PolicyHook(hooks.PecanHook): """ attributes_to_exclude = [] for attr_name in data.keys(): - attr_data = attribute_population._attributes_for_resource( + attr_data = v2_attributes.get_resource_info( resource).get(attr_name) if attr_data and attr_data['is_visible']: if policy.check( @@ -134,7 +157,7 @@ class PolicyHook(hooks.PecanHook): 'get_%s:%s' % (resource, attr_name), data, might_not_exist=True, - pluralized=attribute_population._plural(resource)): + pluralized=collection): # this attribute is visible, check next one continue # if the code reaches this point then either the policy check diff --git a/neutron/pecan_wsgi/hooks/quota_enforcement.py b/neutron/pecan_wsgi/hooks/quota_enforcement.py index b6b04e3b5fa..6cede44aef9 100644 --- a/neutron/pecan_wsgi/hooks/quota_enforcement.py +++ b/neutron/pecan_wsgi/hooks/quota_enforcement.py @@ -31,11 +31,11 @@ class QuotaEnforcementHook(hooks.PecanHook): def before(self, state): # TODO(salv-orlando): This hook must go when adapting the pecan code to # use reservations. - if state.request.method != 'POST': - return resource = state.request.context.get('resource') + if state.request.method != 'POST' or not resource: + return plugin = manager.NeutronManager.get_plugin_for_resource(resource) - items = state.request.resources + items = state.request.context.get('resources') deltas = {} for item in items: tenant_id = item['tenant_id'] diff --git a/neutron/tests/functional/pecan_wsgi/test_functional.py b/neutron/tests/functional/pecan_wsgi/test_functional.py index 96c4207494b..f84901a0dac 100644 --- a/neutron/tests/functional/pecan_wsgi/test_functional.py +++ b/neutron/tests/functional/pecan_wsgi/test_functional.py @@ -172,35 +172,101 @@ class TestExceptionTranslationHook(PecanFunctionalTest): self.assertEqual(response.status_int, 500) -class TestRequestPopulatingHooks(PecanFunctionalTest): +class TestRequestProcessing(PecanFunctionalTest): def setUp(self): - super(TestRequestPopulatingHooks, self).setUp() + super(TestRequestProcessing, self).setUp() # request.context is thread-local storage so it has to be accessed by # the controller. We can capture it into a list here to assert on after # the request finishes. def capture_request_details(*args, **kwargs): - self.req_stash = { - 'context': request.context['neutron_context'], - 'resource_type': request.context['resource'], - } - mock.patch( - 'neutron.pecan_wsgi.controllers.root.CollectionsController.get', - side_effect=capture_request_details - ).start() + self.captured_context = request.context + mock.patch('neutron.pecan_wsgi.controllers.root.' + 'CollectionsController.get', + side_effect=capture_request_details).start() + mock.patch('neutron.pecan_wsgi.controllers.root.' + 'CollectionsController.create', + side_effect=capture_request_details).start() + mock.patch('neutron.pecan_wsgi.controllers.root.ItemController.get', + side_effect=capture_request_details).start() # TODO(kevinbenton): add context tests for X-Roles etc def test_context_set_in_request(self): self.app.get('/v2.0/ports.json', headers={'X-Project-Id': 'tenant_id'}) - self.assertEqual('tenant_id', self.req_stash['context'].tenant_id) + self.assertEqual('tenant_id', + self.captured_context['neutron_context'].tenant_id) def test_core_resource_identified(self): self.app.get('/v2.0/ports.json') - self.assertEqual('port', self.req_stash['resource_type']) + self.assertEqual('port', self.captured_context['resource']) + self.assertEqual('ports', self.captured_context['collection']) + + def test_lookup_identifies_resource_id(self): + # We now this will return a 404 but that's not the point as it is + # mocked + self.app.get('/v2.0/ports/reina.json') + self.assertEqual('port', self.captured_context['resource']) + self.assertEqual('ports', self.captured_context['collection']) + self.assertEqual('reina', self.captured_context['resource_id']) + + def test_resource_processing_post(self): + self.app.post_json( + '/v2.0/ports.json', + params={'port': {'network_id': self.port['network_id'], + 'name': 'the_port', + 'admin_state_up': True}}, + headers={'X-Project-Id': 'tenid'}) + self.assertEqual('port', self.captured_context['resource']) + self.assertEqual('ports', self.captured_context['collection']) + resources = self.captured_context['resources'] + self.assertEqual(1, len(resources)) + self.assertEqual(self.port['network_id'], + resources[0]['network_id']) + self.assertEqual('the_port', resources[0]['name']) + + def test_resource_processing_post_bulk(self): + self.app.post_json( + '/v2.0/ports.json', + params={'ports': [{'network_id': self.port['network_id'], + 'name': 'the_port_1', + 'admin_state_up': True}, + {'network_id': self.port['network_id'], + 'name': 'the_port_2', + 'admin_state_up': True}]}, + headers={'X-Project-Id': 'tenid'}) + resources = self.captured_context['resources'] + self.assertEqual(2, len(resources)) + self.assertEqual(self.port['network_id'], + resources[0]['network_id']) + self.assertEqual('the_port_1', resources[0]['name']) + self.assertEqual(self.port['network_id'], + resources[1]['network_id']) + self.assertEqual('the_port_2', resources[1]['name']) + + def test_resource_processing_post_unknown_attribute_returns_400(self): + response = self.app.post_json( + '/v2.0/ports.json', + params={'port': {'network_id': self.port['network_id'], + 'name': 'the_port', + 'alien': 'E.T.', + 'admin_state_up': True}}, + headers={'X-Project-Id': 'tenid'}, + expect_errors=True) + self.assertEqual(400, response.status_int) + + def test_resource_processing_post_validation_errori_returns_400(self): + response = self.app.post_json( + '/v2.0/ports.json', + params={'port': {'network_id': self.port['network_id'], + 'name': 'the_port', + 'admin_state_up': 'invalid_value'}}, + headers={'X-Project-Id': 'tenid'}, + expect_errors=True) + self.assertEqual(400, response.status_int) def test_service_plugin_identified(self): # TODO(kevinbenton): fix the unit test setup to include an l3 plugin @@ -216,14 +282,12 @@ class TestRequestPopulatingHooks(PecanFunctionalTest): class TestEnforcementHooks(PecanFunctionalTest): def test_network_ownership_check(self): - # TODO(kevinbenton): get a scenario that passes attribute population - self.skipTest("Attribute population blocks this test as-is") - response = self.app.post_json('/v2.0/ports.json', + response = self.app.post_json( + '/v2.0/ports.json', params={'port': {'network_id': self.port['network_id'], - 'admin_state_up': True, - 'tenant_id': 'tenid2'}}, + 'admin_state_up': True}}, headers={'X-Project-Id': 'tenid'}) - self.assertEqual(response.status_int, 200) + self.assertEqual(201, response.status_int) def test_quota_enforcement(self): # TODO(kevinbenton): this test should do something diff --git a/neutron/tests/unit/api/v2/test_attributes.py b/neutron/tests/unit/api/v2/test_attributes.py index c3324793c80..198247a990f 100644 --- a/neutron/tests/unit/api/v2/test_attributes.py +++ b/neutron/tests/unit/api/v2/test_attributes.py @@ -1042,3 +1042,35 @@ class TestResDict(base.BaseTestCase): self.assertRaises(webob.exc.HTTPBadRequest, attributes.populate_tenant_id, ctx, res_dict3, attr_info, True) + + +class TestHelpers(base.DietTestCase): + + def _verify_port_attributes(self, attrs): + for test_attribute in ('id', 'name', 'mac_address', 'network_id', + 'tenant_id', 'fixed_ips', 'status'): + self.assertIn(test_attribute, attrs) + + def test_get_collection_info(self): + attrs = attributes.get_collection_info('ports') + self._verify_port_attributes(attrs) + + def test_get_collection_info_missing(self): + self.assertFalse(attributes.get_collection_info('meh')) + + def test_get_resource_info(self): + attributes.REVERSED_PLURALS.pop('port', None) + attrs = attributes.get_resource_info('port') + self._verify_port_attributes(attrs) + # verify side effect + self.assertIn('port', attributes.REVERSED_PLURALS) + + def test_get_resource_info_missing(self): + self.assertFalse(attributes.get_resource_info('meh')) + + def test_get_resource_info_cached(self): + with mock.patch('neutron.api.v2.attributes.PLURALS') as mock_plurals: + attributes.REVERSED_PLURALS['port'] = 'ports' + attrs = attributes.get_resource_info('port') + self._verify_port_attributes(attrs) + self.assertEqual(0, mock_plurals.items.call_count)