Merge "use attribute functions/operations from neutron-lib"

This commit is contained in:
Jenkins 2017-06-30 11:42:17 +00:00 committed by Gerrit Code Review
commit d76c45d546
6 changed files with 18 additions and 251 deletions

View File

@ -14,12 +14,8 @@
# under the License.
from neutron_lib.api import converters as lib_converters
from neutron_lib.api import validators as lib_validators
from neutron_lib import constants
from neutron_lib.db import constants as db_const
import webob.exc
from neutron._i18n import _
# Defining a constant to avoid repeating string literal in several modules
@ -289,9 +285,6 @@ RESOURCE_FOREIGN_KEYS = {
NETWORKS: 'network_id'
}
# Removing PLURALS breaks subprojects, but they don't need it so they
# need to be patched.
def get_collection_info(collection):
"""Helper function to retrieve attribute info.
@ -299,106 +292,3 @@ def get_collection_info(collection):
:param collection: Collection or plural name of the resource
"""
return RESOURCE_ATTRIBUTE_MAP.get(collection)
def fill_default_value(attr_info, res_dict,
exc_cls=ValueError,
check_allow_post=True):
for attr, attr_vals in attr_info.items():
if attr_vals['allow_post']:
if 'default' not in attr_vals and attr not in res_dict:
msg = _("Failed to parse request. Required "
"attribute '%s' not specified") % attr
raise exc_cls(msg)
res_dict[attr] = res_dict.get(attr,
attr_vals.get('default'))
elif check_allow_post:
if attr in res_dict:
msg = _("Attribute '%s' not allowed in POST") % attr
raise exc_cls(msg)
def convert_value(attr_info, res_dict, exc_cls=ValueError):
for attr, attr_vals in attr_info.items():
if (attr not in res_dict or
res_dict[attr] is constants.ATTR_NOT_SPECIFIED):
continue
# Convert values if necessary
if 'convert_to' in attr_vals:
res_dict[attr] = attr_vals['convert_to'](res_dict[attr])
# Check that configured values are correct
if 'validate' not in attr_vals:
continue
for rule in attr_vals['validate']:
validator = lib_validators.get_validator(rule)
res = validator(res_dict[attr], attr_vals['validate'][rule])
if res:
msg_dict = dict(attr=attr, reason=res)
msg = _("Invalid input for %(attr)s. "
"Reason: %(reason)s.") % msg_dict
raise exc_cls(msg)
def populate_project_info(attributes):
"""
Ensure that both project_id and tenant_id attributes are present.
If either project_id or tenant_id is present in attributes then ensure
that both are present.
If neither are present then attributes is not updated.
:param attributes: a dictionary of resource/API attributes
:type attributes: dict
:return: the updated attributes dictionary
:rtype: dict
"""
if 'tenant_id' in attributes and 'project_id' not in attributes:
# TODO(HenryG): emit a deprecation warning here
attributes['project_id'] = attributes['tenant_id']
elif 'project_id' in attributes and 'tenant_id' not in attributes:
# Backward compatibility for code still using tenant_id
attributes['tenant_id'] = attributes['project_id']
if attributes.get('project_id') != attributes.get('tenant_id'):
msg = _("'project_id' and 'tenant_id' do not match")
raise webob.exc.HTTPBadRequest(msg)
return attributes
def _validate_privileges(context, res_dict):
if ('project_id' in res_dict and
res_dict['project_id'] != context.project_id and
not context.is_admin):
msg = _("Specifying 'project_id' or 'tenant_id' other than "
"authenticated project in request requires admin privileges")
raise webob.exc.HTTPBadRequest(msg)
def populate_tenant_id(context, res_dict, attr_info, is_create):
populate_project_info(res_dict)
_validate_privileges(context, res_dict)
if is_create and 'project_id' not in res_dict:
if context.project_id:
res_dict['project_id'] = context.project_id
# For backward compatibility
res_dict['tenant_id'] = context.project_id
elif 'tenant_id' in attr_info:
msg = _("Running without keystone AuthN requires "
"that tenant_id is specified")
raise webob.exc.HTTPBadRequest(msg)
def verify_attributes(res_dict, attr_info):
populate_project_info(attr_info)
extra_keys = set(res_dict.keys()) - set(attr_info.keys())
if extra_keys:
msg = _("Unrecognized attribute(s) '%s'") % ', '.join(extra_keys)
raise webob.exc.HTTPBadRequest(msg)

View File

@ -17,18 +17,17 @@ import collections
import copy
import netaddr
from neutron_lib.api import attributes
from neutron_lib.callbacks import events
from neutron_lib.callbacks import registry
from neutron_lib import exceptions
from oslo_log import log as logging
from oslo_policy import policy as oslo_policy
from oslo_utils import excutils
from oslo_utils import strutils
import webob.exc
from neutron._i18n import _, _LE, _LI
from neutron.api import api_common
from neutron.api.v2 import attributes
from neutron.api.v2 import resource as wsgi_resource
from neutron.common import constants as n_const
from neutron.common import exceptions as n_exc
@ -693,8 +692,7 @@ class Controller(object):
if not body:
raise webob.exc.HTTPBadRequest(_("Resource body required"))
LOG.debug("Request body: %(body)s",
{'body': strutils.mask_password(body)})
LOG.debug("Request body: %(body)s", {'body': body})
try:
if collection in body:
if not allow_bulk:
@ -717,19 +715,21 @@ class Controller(object):
msg = _("Unable to find '%s' in request body") % resource
raise webob.exc.HTTPBadRequest(msg)
attributes.populate_tenant_id(context, res_dict, attr_info, is_create)
attributes.verify_attributes(res_dict, attr_info)
attr_ops = attributes.AttributeInfo(attr_info)
attr_ops.populate_project_id(context, res_dict, is_create)
attributes.populate_project_info(attr_info)
attr_ops.verify_attributes(res_dict)
if is_create: # POST
attributes.fill_default_value(attr_info, res_dict,
webob.exc.HTTPBadRequest)
attr_ops.fill_post_defaults(
res_dict, exc_cls=webob.exc.HTTPBadRequest)
else: # PUT
for attr, attr_vals in attr_info.items():
if attr in res_dict and not attr_vals['allow_put']:
msg = _("Cannot update read-only attribute %s") % attr
raise webob.exc.HTTPBadRequest(msg)
attributes.convert_value(attr_info, res_dict, webob.exc.HTTPBadRequest)
attr_ops.convert_values(res_dict, exc_cls=webob.exc.HTTPBadRequest)
return body
def _validate_network_tenant_ownership(self, request, resource_item):

View File

@ -16,12 +16,12 @@ NOTE: This module shall not be used by external projects. It will be moved
to neutron-lib in due course, and then it can be used from there.
"""
from neutron_lib.api import attributes
from neutron_lib.db import utils as db_utils
from oslo_db.sqlalchemy import utils as sa_utils
from sqlalchemy import sql, or_, and_
from sqlalchemy.ext import associationproxy
from neutron.api.v2 import attributes
from neutron.common import utils
from neutron.db import _utils as ndb_utils

View File

@ -17,12 +17,12 @@ NOTE: This module shall not be used by external projects. It will be moved
import contextlib
from neutron_lib.api import attributes
from oslo_log import log as logging
from oslo_utils import excutils
from sqlalchemy.ext import associationproxy
from neutron._i18n import _LE
from neutron.api.v2 import attributes
LOG = logging.getLogger(__name__)

View File

@ -20,6 +20,7 @@ import collections
import contextlib
import hashlib
from neutron_lib.api import attributes as lib_attrs
from neutron_lib import constants as n_const
from neutron_lib import exceptions
from oslo_config import cfg
@ -150,16 +151,17 @@ def in_pending_status(status):
def _fixup_res_dict(context, attr_name, res_dict, check_allow_post=True):
attr_info = attributes.RESOURCE_ATTRIBUTE_MAP[attr_name]
attr_ops = lib_attrs.AttributeInfo(attr_info)
try:
attributes.populate_tenant_id(context, res_dict, attr_info, True)
attributes.verify_attributes(res_dict, attr_info)
attr_ops.populate_project_id(context, res_dict, True)
lib_attrs.populate_project_info(attr_info)
attr_ops.verify_attributes(res_dict)
except webob.exc.HTTPBadRequest as e:
# convert webob exception into ValueError as these functions are
# for internal use. webob exception doesn't make sense.
raise ValueError(e.detail)
attributes.fill_default_value(attr_info, res_dict,
check_allow_post=check_allow_post)
attributes.convert_value(attr_info, res_dict)
attr_ops.fill_post_defaults(res_dict, check_allow_post=check_allow_post)
attr_ops.convert_values(res_dict)
return res_dict

View File

@ -13,135 +13,10 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron_lib.api import converters
from neutron_lib import constants
from neutron_lib import context
from neutron_lib import exceptions as n_exc
from oslo_utils import uuidutils
import webob.exc
from neutron.api.v2 import attributes
from neutron.tests import base
class TestResDict(base.BaseTestCase):
class _MyException(Exception):
pass
_EXC_CLS = _MyException
def _test_fill_default_value(self, attr_info, expected, res_dict):
attributes.fill_default_value(attr_info, res_dict)
self.assertEqual(expected, res_dict)
def test_fill_default_value(self):
attr_info = {
'key': {
'allow_post': True,
'default': constants.ATTR_NOT_SPECIFIED,
},
}
self._test_fill_default_value(attr_info, {'key': 'X'}, {'key': 'X'})
self._test_fill_default_value(
attr_info, {'key': constants.ATTR_NOT_SPECIFIED}, {})
attr_info = {
'key': {
'allow_post': True,
},
}
self._test_fill_default_value(attr_info, {'key': 'X'}, {'key': 'X'})
self.assertRaises(ValueError, self._test_fill_default_value,
attr_info, {'key': 'X'}, {})
self.assertRaises(self._EXC_CLS, attributes.fill_default_value,
attr_info, {}, self._EXC_CLS)
attr_info = {
'key': {
'allow_post': False,
},
}
self.assertRaises(ValueError, self._test_fill_default_value,
attr_info, {'key': 'X'}, {'key': 'X'})
self._test_fill_default_value(attr_info, {}, {})
self.assertRaises(self._EXC_CLS, attributes.fill_default_value,
attr_info, {'key': 'X'}, self._EXC_CLS)
def _test_convert_value(self, attr_info, expected, res_dict):
attributes.convert_value(attr_info, res_dict)
self.assertEqual(expected, res_dict)
def test_convert_value(self):
attr_info = {
'key': {
},
}
self._test_convert_value(attr_info,
{'key': constants.ATTR_NOT_SPECIFIED},
{'key': constants.ATTR_NOT_SPECIFIED})
self._test_convert_value(attr_info, {'key': 'X'}, {'key': 'X'})
self._test_convert_value(attr_info,
{'other_key': 'X'}, {'other_key': 'X'})
attr_info = {
'key': {
'convert_to': converters.convert_to_int,
},
}
self._test_convert_value(attr_info,
{'key': constants.ATTR_NOT_SPECIFIED},
{'key': constants.ATTR_NOT_SPECIFIED})
self._test_convert_value(attr_info, {'key': 1}, {'key': '1'})
self._test_convert_value(attr_info, {'key': 1}, {'key': 1})
self.assertRaises(n_exc.InvalidInput, self._test_convert_value,
attr_info, {'key': 1}, {'key': 'a'})
attr_info = {
'key': {
'validate': {'type:uuid': None},
},
}
self._test_convert_value(attr_info,
{'key': constants.ATTR_NOT_SPECIFIED},
{'key': constants.ATTR_NOT_SPECIFIED})
uuid_str = '01234567-1234-1234-1234-1234567890ab'
self._test_convert_value(attr_info,
{'key': uuid_str}, {'key': uuid_str})
self.assertRaises(ValueError, self._test_convert_value,
attr_info, {'key': 1}, {'key': 1})
self.assertRaises(self._EXC_CLS, attributes.convert_value,
attr_info, {'key': 1}, self._EXC_CLS)
def test_populate_tenant_id(self):
tenant_id_1 = uuidutils.generate_uuid()
tenant_id_2 = uuidutils.generate_uuid()
# apart from the admin, nobody can create a res on behalf of another
# tenant
ctx = context.Context(user_id=None, tenant_id=tenant_id_1)
res_dict = {'tenant_id': tenant_id_2}
self.assertRaises(webob.exc.HTTPBadRequest,
attributes.populate_tenant_id,
ctx, res_dict, None, None)
ctx.is_admin = True
self.assertIsNone(attributes.populate_tenant_id(ctx, res_dict,
None, None))
# for each create request, the tenant_id should be added to the
# req body
res_dict2 = {}
attributes.populate_tenant_id(ctx, res_dict2, None, True)
self.assertEqual(
{'tenant_id': ctx.tenant_id, 'project_id': ctx.tenant_id},
res_dict2)
# if the tenant_id is mandatory for the resource and not specified
# in the request nor in the context, an exception should be raised
res_dict3 = {}
attr_info = {'tenant_id': {'allow_post': True}, }
ctx.tenant_id = None
self.assertRaises(webob.exc.HTTPBadRequest,
attributes.populate_tenant_id,
ctx, res_dict3, attr_info, True)
class TestHelpers(base.DietTestCase):
def _verify_port_attributes(self, attrs):