Improve the segmentation ID validation logic

Segmentation ID is meant to be an Integer as per model,
and as such it is best to attempt a conversion in case
it happens to be sent as string as done in [1]. The
failure mode in this case is not immediately obvious
as the ID range check fails and yet the ID is indeed
in range.

This patch also dedups some logic, clean up and add a
test.

[1] https://review.openstack.org/#/c/340624/7/neutronclient/osc/v2/trunk/subport.py@36

Partially-implements: blueprint vlan-aware-vms

Change-Id: I7ca4c93fdf0ab83088194e9a343448ed511d4cdb
This commit is contained in:
Armando Migliaccio 2016-07-20 22:21:49 -07:00
parent 3150196c54
commit 5588f45a6d
3 changed files with 38 additions and 15 deletions

View File

@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from neutron_lib.api import converters
from neutron_lib import constants as n_const from neutron_lib import constants as n_const
from neutron_lib import exceptions as n_exc from neutron_lib import exceptions as n_exc
@ -102,20 +103,26 @@ class SubPortsValidator(object):
# figure out defaults when the time comes to support Ironic. # figure out defaults when the time comes to support Ironic.
# We can reasonably expect segmentation details to be provided # We can reasonably expect segmentation details to be provided
# in all other cases for now. # in all other cases for now.
segmentation_id = subport.get("segmentation_id") try:
segmentation_type = subport.get("segmentation_type") segmentation_type = subport["segmentation_type"]
if not segmentation_id or not segmentation_type: segmentation_id = (
converters.convert_to_int(subport["segmentation_id"]))
except KeyError:
msg = _("Invalid subport details '%s': missing segmentation " msg = _("Invalid subport details '%s': missing segmentation "
"information. Must specify both segmentation_id and " "information. Must specify both segmentation_id and "
"segmentation_type") % subport "segmentation_type") % subport
raise n_exc.InvalidInput(error_message=msg) raise n_exc.InvalidInput(error_message=msg)
except n_exc.InvalidInput:
msg = _("Invalid subport details: segmentation_id '%s' is "
"not an integer") % subport["segmentation_id"]
raise n_exc.InvalidInput(error_message=msg)
if segmentation_type not in self._segmentation_types: if segmentation_type not in self._segmentation_types:
msg = _("Invalid segmentation_type '%s'") % segmentation_type msg = _("Unknown segmentation_type '%s'") % segmentation_type
raise n_exc.InvalidInput(error_message=msg) raise n_exc.InvalidInput(error_message=msg)
if not self._segmentation_types[segmentation_type](segmentation_id): if not self._segmentation_types[segmentation_type](segmentation_id):
msg = _("Invalid segmentation id '%s'") % segmentation_id msg = _("Segmentation ID '%s' is not in range") % segmentation_id
raise n_exc.InvalidInput(error_message=msg) raise n_exc.InvalidInput(error_message=msg)
trunk_validator = TrunkPortValidator(subport['port_id']) trunk_validator = TrunkPortValidator(subport['port_id'])

View File

@ -16,7 +16,7 @@ from oslo_log import log as logging
from neutron.callbacks import events from neutron.callbacks import events
from neutron.callbacks import registry from neutron.callbacks import registry
from neutron.plugins.common import constants as common_consts from neutron.plugins.common import utils
from neutron.services.trunk import constants as trunk_consts from neutron.services.trunk import constants as trunk_consts
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -29,10 +29,6 @@ def register():
def handler(resource, event, trigger, **kwargs): def handler(resource, event, trigger, **kwargs):
trigger.add_segmentation_type(trunk_consts.VLAN, vlan_range) trigger.add_segmentation_type(
trunk_consts.VLAN, utils.is_valid_vlan_tag)
LOG.debug('Registration complete') LOG.debug('Registration complete')
def vlan_range(segmentation_id):
min_vid, max_vid = common_consts.MIN_VLAN_TAG, common_consts.MAX_VLAN_TAG
return min_vid <= segmentation_id <= max_vid

View File

@ -20,11 +20,11 @@ from neutron_lib import exceptions as n_exc
from oslo_utils import uuidutils from oslo_utils import uuidutils
from neutron import manager from neutron import manager
from neutron.plugins.common import utils
from neutron.services.trunk import constants from neutron.services.trunk import constants
from neutron.services.trunk import exceptions as trunk_exc from neutron.services.trunk import exceptions as trunk_exc
from neutron.services.trunk import plugin as trunk_plugin from neutron.services.trunk import plugin as trunk_plugin
from neutron.services.trunk import rules from neutron.services.trunk import rules
from neutron.services.trunk.validators import vlan as vlan_driver
from neutron.tests import base from neutron.tests import base
from neutron.tests.unit.plugins.ml2 import test_plugin from neutron.tests.unit.plugins.ml2 import test_plugin
@ -33,7 +33,7 @@ class SubPortsValidatorTestCase(base.BaseTestCase):
def setUp(self): def setUp(self):
super(SubPortsValidatorTestCase, self).setUp() super(SubPortsValidatorTestCase, self).setUp()
self.segmentation_types = {constants.VLAN: vlan_driver.vlan_range} self.segmentation_types = {constants.VLAN: utils.is_valid_vlan_tag}
self.context = mock.ANY self.context = mock.ANY
def test_validate_subport_subport_and_trunk_shared_port_id(self): def test_validate_subport_subport_and_trunk_shared_port_id(self):
@ -57,6 +57,26 @@ class SubPortsValidatorTestCase(base.BaseTestCase):
validator.validate, validator.validate,
self.context) self.context)
def test_validate_subport_vlan_id_not_an_int(self):
validator = rules.SubPortsValidator(
self.segmentation_types,
[{'port_id': uuidutils.generate_uuid(),
'segmentation_type': 'vlan',
'segmentation_id': 'IamNotAnumber'}])
self.assertRaises(n_exc.InvalidInput,
validator.validate,
self.context)
def test_validate_subport_valid_vlan_id_as_string(self):
validator = rules.SubPortsValidator(
self.segmentation_types,
[{'port_id': uuidutils.generate_uuid(),
'segmentation_type': 'vlan',
'segmentation_id': '2'}])
with mock.patch.object(rules.TrunkPortValidator, 'validate') as f:
validator.validate(self.context)
f.assert_called_once_with(self.context)
def test_validate_subport_subport_invalid_segmenation_type(self): def test_validate_subport_subport_invalid_segmenation_type(self):
validator = rules.SubPortsValidator( validator = rules.SubPortsValidator(
self.segmentation_types, self.segmentation_types,
@ -101,7 +121,7 @@ class TrunkPortValidatorTestCase(test_plugin.Ml2PluginV2TestCase):
super(TrunkPortValidatorTestCase, self).setUp() super(TrunkPortValidatorTestCase, self).setUp()
self.trunk_plugin = trunk_plugin.TrunkPlugin() self.trunk_plugin = trunk_plugin.TrunkPlugin()
self.trunk_plugin.add_segmentation_type(constants.VLAN, self.trunk_plugin.add_segmentation_type(constants.VLAN,
vlan_driver.vlan_range) utils.is_valid_vlan_tag)
def test_validate_port_parent_in_use_by_trunk(self): def test_validate_port_parent_in_use_by_trunk(self):
with self.port() as trunk_parent: with self.port() as trunk_parent: