diff --git a/neutronclient/osc/v2/trunk/__init__.py b/neutronclient/osc/v2/trunk/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/neutronclient/osc/v2/trunk/network_trunk.py b/neutronclient/osc/v2/trunk/network_trunk.py new file mode 100644 index 000000000..fa8fb3f7a --- /dev/null +++ b/neutronclient/osc/v2/trunk/network_trunk.py @@ -0,0 +1,347 @@ +# Copyright 2016 ZTE Corporation. +# All Rights Reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +"""Network trunk and subports action implementations""" +import logging + +from osc_lib.cli import parseractions +from osc_lib.command import command +from osc_lib import exceptions +from osc_lib import utils + +# TODO(abhiraut): Switch to neutronclients identity utils +from openstackclient.identity import common as identity_common + +from neutronclient._i18n import _ +# TODO(abhiraut): Switch to client methods +from neutronclient.neutron import v2_0 as neutronV20 + +LOG = logging.getLogger(__name__) + +TRUNK = 'trunk' +TRUNKS = 'trunks' +SUB_PORTS = 'sub_ports' + + +class CreateNetworkTrunk(command.ShowOne): + """Create a network trunk for a given project""" + + def get_parser(self, prog_name): + parser = super(CreateNetworkTrunk, self).get_parser(prog_name) + parser.add_argument( + 'name', + metavar='', + help=_("Name of the trunk to create") + ) + parser.add_argument( + '--parent-port', + metavar='', + required=True, + help=_("Parent port belonging to this trunk (name or ID)") + ) + parser.add_argument( + '--subport', + metavar='', + action=parseractions.MultiKeyValueAction, dest='add_subports', + help=_("Subport to add. Subport is of form " + "\'port=,segmentation-type=,segmentation-ID=\' " + "(--subport) option can be repeated)") + ) + admin_group = parser.add_mutually_exclusive_group() + admin_group.add_argument( + '--enable', + action='store_true', + default=True, + help=_("Enable trunk (default)") + ) + admin_group.add_argument( + '--disable', + action='store_true', + help=_("Disable trunk") + ) + parser.add_argument( + '--project', + metavar='', + help=_("Owner's project (name or ID)") + ) + identity_common.add_project_domain_option_to_parser(parser) + return parser + + def take_action(self, parsed_args): + client = self.app.client_manager.neutronclient + attrs = _get_attrs_for_trunk(self.app.client_manager, + parsed_args) + body = {TRUNK: attrs} + obj = client.create_trunk(body) + columns = _get_columns(obj[TRUNK]) + data = utils.get_dict_properties(obj[TRUNK], columns, + formatters=_formatters) + return columns, data + + +class DeleteNetworkTrunk(command.Command): + """Delete a given network trunk""" + + def get_parser(self, prog_name): + parser = super(DeleteNetworkTrunk, self).get_parser(prog_name) + parser.add_argument( + 'trunk', + metavar="", + nargs="+", + help=_("Trunk(s) to delete (name or ID)") + ) + return parser + + def take_action(self, parsed_args): + client = self.app.client_manager.neutronclient + result = 0 + for trunk in parsed_args.trunk: + try: + trunk_id = _get_id(client, trunk, TRUNK) + client.delete_trunk(trunk_id) + except Exception as e: + result += 1 + LOG.error(_("Failed to delete trunk with name " + "or ID '%(trunk)s': %(e)s") + % {'trunk': trunk, 'e': e}) + if result > 0: + total = len(parsed_args.trunk) + msg = (_("%(result)s of %(total)s trunks failed " + "to delete.") % {'result': result, 'total': total}) + raise exceptions.CommandError(msg) + + +class ListNetworkTrunk(command.Lister): + """List all network trunks""" + + def take_action(self, parsed_args): + client = self.app.client_manager.neutronclient + data = client.list_trunks() + # TODO(abhiraut): List more columns using --long + headers = ('ID', 'Name', 'Parent Port') + columns = ('id', 'name', 'port_id') + return (headers, + (utils.get_dict_properties( + s, columns, + formatters=_formatters, + ) for s in data[TRUNKS])) + + +class SetNetworkTrunk(command.Command): + """Set network trunk properties""" + + def get_parser(self, prog_name): + parser = super(SetNetworkTrunk, self).get_parser(prog_name) + parser.add_argument( + 'trunk', + metavar="", + help=_("Trunk to modify (name or ID)") + ) + parser.add_argument( + '--name', + metavar="", + help=_("Set trunk name") + ) + parser.add_argument( + '--subport', + metavar='', + action=parseractions.MultiKeyValueAction, dest='set_subports', + help=_("Subport to add. Subport is of form " + "\'port=,segmentation-type=,segmentation-ID=\'" + "(--subport) option can be repeated)") + ) + admin_group = parser.add_mutually_exclusive_group() + admin_group.add_argument( + '--enable', + action='store_true', + help=_("Enable trunk") + ) + admin_group.add_argument( + '--disable', + action='store_true', + help=_("Disable trunk") + ) + return parser + + def take_action(self, parsed_args): + client = self.app.client_manager.neutronclient + trunk_id = _get_id(client, parsed_args.trunk, TRUNK) + attrs = _get_attrs_for_trunk(self.app.client_manager, parsed_args) + body = {TRUNK: attrs} + client.update_trunk(trunk_id, body) + if parsed_args.set_subports: + subport_attrs = _get_attrs_for_subports(self.app.client_manager, + parsed_args) + client.trunk_add_subports(trunk_id, subport_attrs) + + +class ShowNetworkTrunk(command.ShowOne): + """Show information of a given network trunk""" + def get_parser(self, prog_name): + parser = super(ShowNetworkTrunk, self).get_parser(prog_name) + parser.add_argument( + 'trunk', + metavar="", + help=_("Trunk to display (name or ID)") + ) + return parser + + def take_action(self, parsed_args): + client = self.app.client_manager.neutronclient + trunk_id = _get_id(client, parsed_args.trunk, TRUNK) + obj = client.show_trunk(trunk_id) + columns = _get_columns(obj[TRUNK]) + data = utils.get_dict_properties(obj[TRUNK], columns, + formatters=_formatters) + return columns, data + + +class ListNetworkSubport(command.Lister): + """List all subports for a given network trunk""" + + def get_parser(self, prog_name): + parser = super(ListNetworkSubport, self).get_parser(prog_name) + parser.add_argument( + '--trunk', + required=True, + metavar="", + help=_("List subports belonging to this trunk (name or ID)") + ) + return parser + + def take_action(self, parsed_args): + client = self.app.client_manager.neutronclient + trunk_id = _get_id(client, parsed_args.trunk, TRUNK) + data = client.trunk_get_subports(trunk_id) + headers = ('Port', 'Segmentation Type', 'Segmentation ID') + columns = ('port_id', 'segmentation_type', 'segmentation_id') + return (headers, + (utils.get_dict_properties( + s, columns, + ) for s in data[SUB_PORTS])) + + +class UnsetNetworkTrunk(command.Command): + """Unset subports from a given network trunk""" + + def get_parser(self, prog_name): + parser = super(UnsetNetworkTrunk, self).get_parser(prog_name) + parser.add_argument( + 'trunk', + metavar="", + help=_("Unset subports from this trunk (name or ID)") + ) + parser.add_argument( + '--subport', + metavar="", + required=True, + action='append', dest='unset_subports', + help=_("Subport to delete (name or ID of the port) " + "(--subport) option can be repeated") + ) + return parser + + def take_action(self, parsed_args): + client = self.app.client_manager.neutronclient + attrs = _get_attrs_for_subports(self.app.client_manager, parsed_args) + trunk_id = _get_id(client, parsed_args.trunk, TRUNK) + client.trunk_remove_subports(trunk_id, attrs) + + +def _format_admin_state(item): + return 'UP' if item else 'DOWN' + + +_formatters = { + 'admin_state_up': _format_admin_state, + 'sub_ports': utils.format_list_of_dicts, +} + + +def _get_columns(item): + return tuple(sorted(list(item.keys()))) + + +def _get_attrs_for_trunk(client_manager, parsed_args): + attrs = {} + if parsed_args.name is not None: + attrs['name'] = str(parsed_args.name) + if parsed_args.enable: + attrs['admin_state_up'] = True + if parsed_args.disable: + attrs['admin_state_up'] = False + if 'parent_port' in parsed_args and parsed_args.parent_port is not None: + port_id = _get_id(client_manager.neutronclient, + parsed_args.parent_port, 'port') + attrs['port_id'] = port_id + if 'add_subports' in parsed_args and parsed_args.add_subports is not None: + attrs[SUB_PORTS] = _format_subports(client_manager, + parsed_args.add_subports) + + # "trunk set" command doesn't support setting project. + if 'project' in parsed_args and parsed_args.project is not None: + identity_client = client_manager.identity + project_id = identity_common.find_project( + identity_client, + parsed_args.project, + parsed_args.project_domain, + ).id + attrs['tenant_id'] = project_id + + return attrs + + +def _format_subports(client_manager, subports): + attrs = [] + for subport in subports: + subport_attrs = {} + if subport.get('port'): + port_id = _get_id(client_manager.neutronclient, + subport['port'], 'port') + subport_attrs['port_id'] = port_id + if subport.get('segmentation-id'): + try: + subport_attrs['segmentation_id'] = int( + subport['segmentation-id']) + except ValueError: + msg = (_("Segmentation-id '%s' is not an integer") % + subport['segmentation-id']) + raise exceptions.CommandError(msg) + if subport.get('segmentation-type'): + subport_attrs['segmentation_type'] = subport['segmentation-type'] + attrs.append(subport_attrs) + return attrs + + +def _get_attrs_for_subports(client_manager, parsed_args): + attrs = {} + if 'set_subports' in parsed_args and parsed_args.set_subports is not None: + attrs[SUB_PORTS] = _format_subports(client_manager, + parsed_args.set_subports) + if ('unset_subports' in parsed_args and + parsed_args.unset_subports is not None): + subports_list = [] + for subport in parsed_args.unset_subports: + port_id = _get_id(client_manager.neutronclient, + subport, 'port') + subports_list.append({'port_id': port_id}) + attrs[SUB_PORTS] = subports_list + return attrs + + +def _get_id(client, id_or_name, resource): + return neutronV20.find_resourceid_by_name_or_id( + client, resource, str(id_or_name)) diff --git a/neutronclient/tests/unit/osc/v2/trunk/__init__.py b/neutronclient/tests/unit/osc/v2/trunk/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/neutronclient/tests/unit/osc/v2/trunk/fakes.py b/neutronclient/tests/unit/osc/v2/trunk/fakes.py new file mode 100644 index 000000000..0eb6f9786 --- /dev/null +++ b/neutronclient/tests/unit/osc/v2/trunk/fakes.py @@ -0,0 +1,83 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import copy +import mock +import uuid + + +class FakeTrunk(object): + """Fake one or more trunks.""" + @staticmethod + def create_one_trunk(attrs=None): + """Create a fake trunk. + + :param Dictionary attrs: + A dictionary with all attributes + :return: + A Dictionary with id, name, admin_state_up, + port_id, sub_ports, status and project_id + """ + attrs = attrs or {} + + # Set default attributes. + trunk_attrs = { + 'id': 'trunk-id-' + uuid.uuid4().hex, + 'name': 'trunk-name-' + uuid.uuid4().hex, + 'port_id': 'port-' + uuid.uuid4().hex, + 'admin_state_up': True, + 'project_id': 'project-id-' + uuid.uuid4().hex, + 'status': 'ACTIVE', + 'sub_ports': [{'port_id': 'subport-' + uuid.uuid4().hex, + 'segmentation_type': 'vlan', + 'segmentation_id': 100}], + } + + # Overwrite default attributes. + trunk_attrs.update(attrs) + return copy.deepcopy(trunk_attrs) + + @staticmethod + def create_trunks(attrs=None, count=2): + """Create multiple fake trunks. + + :param Dictionary attrs: + A dictionary with all attributes + :param int count: + The number of routers to fake + :return: + A list of dictionaries faking the trunks + """ + trunks = [] + for i in range(0, count): + trunks.append(FakeTrunk.create_one_trunk(attrs)) + + return trunks + + @staticmethod + def get_trunks(trunks=None, count=2): + """Get an iterable MagicMock object with a list of faked trunks. + + If trunks list is provided, then initialize the Mock object with the + list. Otherwise create one. + + :param List trunks: + A list of FakeResource objects faking trunks + :param int count: + The number of trunks to fake + :return: + An iterable Mock object with side_effect set to a list of faked + trunks + """ + if trunks is None: + trunks = FakeTrunk.create_trunks(count) + return mock.MagicMock(side_effect=trunks) diff --git a/neutronclient/tests/unit/osc/v2/trunk/test_network_trunk.py b/neutronclient/tests/unit/osc/v2/trunk/test_network_trunk.py new file mode 100644 index 000000000..ff9baa073 --- /dev/null +++ b/neutronclient/tests/unit/osc/v2/trunk/test_network_trunk.py @@ -0,0 +1,570 @@ +# Copyright 2016 ZTE Corporation. +# All Rights Reserved +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock +from mock import call + +from osc_lib import exceptions +from osc_lib import utils + +# TODO(abhiraut): Switch to osc-lib test utils +from openstackclient.tests import utils as tests_utils + +from neutronclient.osc.v2.trunk import network_trunk as trunk +from neutronclient.tests.unit.osc.v2 import fakes as test_fakes +from neutronclient.tests.unit.osc.v2.trunk import fakes + + +def _get_id(client, id_or_name, resource): + return id_or_name + + +class TestCreateNetworkTrunk(test_fakes.TestNeutronClientOSCV2): + # The new trunk created + _trunk = fakes.FakeTrunk.create_one_trunk() + + columns = ( + 'admin_state_up', + 'id', + 'name', + 'port_id', + 'project_id', + 'status', + 'sub_ports', + ) + data = ( + trunk._format_admin_state(_trunk['admin_state_up']), + _trunk['id'], + _trunk['name'], + _trunk['port_id'], + _trunk['project_id'], + _trunk['status'], + utils.format_list_of_dicts(_trunk['sub_ports']), + ) + + def setUp(self): + super(TestCreateNetworkTrunk, self).setUp() + mock.patch('neutronclient.osc.v2.trunk.network_trunk._get_id', + new=_get_id).start() + self.neutronclient.create_trunk = mock.Mock( + return_value={trunk.TRUNK: self._trunk}) + + # Get the command object to test + self.cmd = trunk.CreateNetworkTrunk(self.app, self.namespace) + + def test_create_no_options(self): + arglist = [] + verifylist = [] + + self.assertRaises(tests_utils.ParserException, self.check_parser, + self.cmd, arglist, verifylist) + + def test_create_default_options(self): + arglist = [ + "--parent-port", self._trunk['port_id'], + self._trunk['name'], + ] + verifylist = [ + ('parent_port', self._trunk['port_id']), + ('name', self._trunk['name']), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + columns, data = (self.cmd.take_action(parsed_args)) + + self.neutronclient.create_trunk.assert_called_once_with({ + trunk.TRUNK: {'name': self._trunk['name'], + 'admin_state_up': self._trunk['admin_state_up'], + 'port_id': self._trunk['port_id']} + }) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) + + def test_create_full_options(self): + subport = self._trunk['sub_ports'][0] + arglist = [ + "--disable", + "--parent-port", self._trunk['port_id'], + "--subport", 'port=%(port)s,segmentation-type=%(seg_type)s,' + 'segmentation-id=%(seg_id)s' % { + 'seg_id': subport['segmentation_id'], + 'seg_type': subport['segmentation_type'], + 'port': subport['port_id']}, + self._trunk['name'], + ] + verifylist = [ + ('name', self._trunk['name']), + ('parent_port', self._trunk['port_id']), + ('add_subports', [{ + 'port': subport['port_id'], + 'segmentation-id': str(subport['segmentation_id']), + 'segmentation-type': subport['segmentation_type']}]), + ('disable', True), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = (self.cmd.take_action(parsed_args)) + + self.neutronclient.create_trunk.assert_called_once_with({ + trunk.TRUNK: {'name': self._trunk['name'], + 'admin_state_up': False, + 'sub_ports': [subport], + 'port_id': self._trunk['port_id']} + }) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) + + def test_create_trunk_with_subport_invalid_segmentation_id_fail(self): + subport = self._trunk['sub_ports'][0] + arglist = [ + "--parent-port", self._trunk['port_id'], + "--subport", "port=%(port)s,segmentation-type=%(seg_type)s," + "segmentation-id=boom" % { + 'seg_type': subport['segmentation_type'], + 'port': subport['port_id']}, + self._trunk['name'], + ] + verifylist = [ + ('name', self._trunk['name']), + ('parent_port', self._trunk['port_id']), + ('add_subports', [{ + 'port': subport['port_id'], + 'segmentation-id': 'boom', + 'segmentation-type': subport['segmentation_type']}]), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + try: + self.cmd.take_action(parsed_args) + self.fail('CommandError should be raised.') + except exceptions.CommandError as e: + self.assertEqual("Segmentation-id 'boom' is not an integer", + str(e)) + + +class TestDeleteNetworkTrunk(test_fakes.TestNeutronClientOSCV2): + # The trunk to be deleted. + _trunks = fakes.FakeTrunk.create_trunks(count=2) + + def setUp(self): + super(TestDeleteNetworkTrunk, self).setUp() + + mock.patch('neutronclient.osc.v2.trunk.network_trunk._get_id', + new=_get_id).start() + self.neutronclient.delete_trunk = mock.Mock(return_value=None) + + # Get the command object to test + self.cmd = trunk.DeleteNetworkTrunk(self.app, self.namespace) + + def test_delete_trunk(self): + arglist = [ + self._trunks[0]['name'], + ] + verifylist = [ + ('trunk', [self._trunks[0]['name']]), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + self.neutronclient.delete_trunk.assert_called_once_with( + self._trunks[0]['name']) + self.assertIsNone(result) + + def test_delete_trunk_multiple(self): + arglist = [] + verifylist = [] + + for t in self._trunks: + arglist.append(t['name']) + verifylist = [ + ('trunk', arglist), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + + calls = [] + for t in self._trunks: + calls.append(call(t['name'])) + self.neutronclient.delete_trunk.assert_has_calls(calls) + self.assertIsNone(result) + + def test_delete_trunk_multiple_with_exception(self): + arglist = [ + self._trunks[0]['name'], + 'unexist_trunk', + ] + verifylist = [ + ('trunk', + [self._trunks[0]['name'], 'unexist_trunk']), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + get_mock_result = [self._trunks[0], exceptions.CommandError] + trunk._get_id = ( + mock.MagicMock(side_effect=get_mock_result) + ) + try: + self.cmd.take_action(parsed_args) + self.fail('CommandError should be raised.') + except exceptions.CommandError as e: + self.assertEqual('1 of 2 trunks failed to delete.', str(e)) + self.neutronclient.delete_trunk.assert_called_once_with( + self._trunks[0] + ) + + +class TestShowNetworkTrunk(test_fakes.TestNeutronClientOSCV2): + + # The trunk to set. + _trunk = fakes.FakeTrunk.create_one_trunk() + + columns = ( + 'admin_state_up', + 'id', + 'name', + 'port_id', + 'project_id', + 'status', + 'sub_ports', + ) + data = ( + trunk._format_admin_state(_trunk['admin_state_up']), + _trunk['id'], + _trunk['name'], + _trunk['port_id'], + _trunk['project_id'], + _trunk['status'], + utils.format_list_of_dicts(_trunk['sub_ports']), + ) + + def setUp(self): + super(TestShowNetworkTrunk, self).setUp() + + mock.patch('neutronclient.osc.v2.trunk.network_trunk._get_id', + new=_get_id).start() + self.neutronclient.show_trunk = mock.Mock( + return_value={trunk.TRUNK: self._trunk}) + + # Get the command object to test + self.cmd = trunk.ShowNetworkTrunk(self.app, self.namespace) + + def test_show_no_options(self): + arglist = [] + verifylist = [] + + self.assertRaises(tests_utils.ParserException, self.check_parser, + self.cmd, arglist, verifylist) + + def test_show_all_options(self): + arglist = [ + self._trunk['id'], + ] + verifylist = [ + ('trunk', self._trunk['id']), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.neutronclient.show_trunk.assert_called_once_with( + self._trunk['id']) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, data) + + +class TestListNetworkTrunk(test_fakes.TestNeutronClientOSCV2): + # Create trunks to be listed. + _trunks = fakes.FakeTrunk.create_trunks(count=3) + + columns = ( + 'ID', + 'Name', + 'Parent Port', + ) + data = [] + for t in _trunks: + data.append(( + t['id'], + t['name'], + t['port_id'], + )) + + def setUp(self): + super(TestListNetworkTrunk, self).setUp() + mock.patch('neutronclient.osc.v2.trunk.network_trunk._get_id', + new=_get_id).start() + self.neutronclient.list_trunks = mock.Mock( + return_value={trunk.TRUNKS: self._trunks}) + + # Get the command object to test + self.cmd = trunk.ListNetworkTrunk(self.app, self.namespace) + + def test_trunk_list_no_option(self): + arglist = [] + verifylist = [] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.neutronclient.list_trunks.assert_called_once_with() + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, list(data)) + + +class TestSetNetworkTrunk(test_fakes.TestNeutronClientOSCV2): + # Create trunks to be listed. + _trunk = fakes.FakeTrunk.create_one_trunk() + + columns = ( + 'admin_state_up', + 'id', + 'name', + 'port_id', + 'project_id', + 'status', + 'sub_ports', + ) + data = ( + trunk._format_admin_state(_trunk['admin_state_up']), + _trunk['id'], + _trunk['name'], + _trunk['port_id'], + _trunk['project_id'], + _trunk['status'], + utils.format_list_of_dicts(_trunk['sub_ports']), + ) + + def setUp(self): + super(TestSetNetworkTrunk, self).setUp() + mock.patch('neutronclient.osc.v2.trunk.network_trunk._get_id', + new=_get_id).start() + self.neutronclient.update_trunk = mock.Mock( + return_value={trunk.TRUNK: self._trunk}) + self.neutronclient.trunk_add_subports = mock.Mock( + return_value=self._trunk) + + # Get the command object to test + self.cmd = trunk.SetNetworkTrunk(self.app, self.namespace) + + def test_set_network_trunk_name(self): + arglist = [ + '--name', 'trunky', + self._trunk['name'], + ] + verifylist = [ + ('name', 'trunky'), + ('trunk', self._trunk['name']), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + + attrs = { + 'name': 'trunky', + } + self.neutronclient.update_trunk.assert_called_once_with( + self._trunk['name'], {trunk.TRUNK: attrs}) + self.assertIsNone(result) + + def test_set_network_trunk_admin_state_up_disable(self): + arglist = [ + '--disable', + self._trunk['name'], + ] + verifylist = [ + ('disable', True), + ('trunk', self._trunk['name']), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + + attrs = { + 'admin_state_up': False, + } + self.neutronclient.update_trunk.assert_called_once_with( + self._trunk['name'], {trunk.TRUNK: attrs}) + self.assertIsNone(result) + + def test_set_network_trunk_admin_state_up_enable(self): + arglist = [ + '--enable', + self._trunk['name'], + ] + verifylist = [ + ('enable', True), + ('trunk', self._trunk['name']), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + + attrs = { + 'admin_state_up': True, + } + self.neutronclient.update_trunk.assert_called_once_with( + self._trunk['name'], {trunk.TRUNK: attrs}) + self.assertIsNone(result) + + def test_set_network_trunk_nothing(self): + arglist = [self._trunk['name'], ] + verifylist = [('trunk', self._trunk['name']), ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + attrs = {} + self.neutronclient.update_trunk.assert_called_once_with( + self._trunk['name'], {trunk.TRUNK: attrs}) + self.assertIsNone(result) + + def test_set_network_trunk_subports(self): + subport = self._trunk['sub_ports'][0] + arglist = [ + "--subport", 'port=%(port)s,segmentation-type=%(seg_type)s,' + 'segmentation-id=%(seg_id)s' % { + 'seg_id': subport['segmentation_id'], + 'seg_type': subport['segmentation_type'], + 'port': subport['port_id']}, + self._trunk['name'], + ] + verifylist = [ + ('trunk', self._trunk['name']), + ('set_subports', [{ + 'port': subport['port_id'], + 'segmentation-id': str(subport['segmentation_id']), + 'segmentation-type': subport['segmentation_type']}]), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + result = self.cmd.take_action(parsed_args) + + self.neutronclient.trunk_add_subports.assert_called_once_with( + self._trunk['name'], {'sub_ports': [subport]} + ) + self.assertIsNone(result) + + +class TestListNetworkSubport(test_fakes.TestNeutronClientOSCV2): + + _trunk = fakes.FakeTrunk.create_one_trunk() + _subports = _trunk['sub_ports'] + + columns = ( + 'Port', + 'Segmentation Type', + 'Segmentation ID', + ) + data = [] + for s in _subports: + data.append(( + s['port_id'], + s['segmentation_type'], + s['segmentation_id'], + )) + + def setUp(self): + super(TestListNetworkSubport, self).setUp() + mock.patch('neutronclient.osc.v2.trunk.network_trunk._get_id', + new=_get_id).start() + self.neutronclient.trunk_get_subports = mock.Mock( + return_value={trunk.SUB_PORTS: self._subports}) + + # Get the command object to test + self.cmd = trunk.ListNetworkSubport(self.app, self.namespace) + + def test_subport_list(self): + arglist = [ + '--trunk', self._trunk['name'], + ] + verifylist = [ + ('trunk', self._trunk['name']), + ] + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + columns, data = self.cmd.take_action(parsed_args) + + self.neutronclient.trunk_get_subports.assert_called_once_with( + self._trunk['name']) + self.assertEqual(self.columns, columns) + self.assertEqual(self.data, list(data)) + + +class TestUnsetNetworkTrunk(test_fakes.TestNeutronClientOSCV2): + + _trunk = fakes.FakeTrunk.create_one_trunk() + + columns = ( + 'admin_state_up', + 'id', + 'name', + 'port_id', + 'project_id', + 'status', + 'sub_ports', + ) + data = ( + trunk._format_admin_state(_trunk['admin_state_up']), + _trunk['id'], + _trunk['name'], + _trunk['port_id'], + _trunk['project_id'], + _trunk['status'], + utils.format_list_of_dicts(_trunk['sub_ports']), + ) + + def setUp(self): + super(TestUnsetNetworkTrunk, self).setUp() + + mock.patch('neutronclient.osc.v2.trunk.network_trunk._get_id', + new=_get_id).start() + self.neutronclient.trunk_remove_subports = mock.Mock( + return_value=None) + + # Get the command object to test + self.cmd = trunk.UnsetNetworkTrunk(self.app, self.namespace) + + def test_unset_network_trunk_subport(self): + subport = self._trunk['sub_ports'][0] + arglist = [ + "--subport", subport['port_id'], + self._trunk['name'], + ] + verifylist = [ + ('trunk', self._trunk['name']), + ('unset_subports', [subport['port_id']]), + ] + + parsed_args = self.check_parser(self.cmd, arglist, verifylist) + + result = self.cmd.take_action(parsed_args) + + self.neutronclient.trunk_remove_subports.assert_called_once_with( + self._trunk['name'], + {trunk.SUB_PORTS: [{'port_id': subport['port_id']}]} + ) + self.assertIsNone(result) + + def test_unset_subport_no_arguments_fail(self): + arglist = [ + self._trunk['name'], + ] + verifylist = [ + ('trunk', self._trunk['name']), + ] + self.assertRaises(tests_utils.ParserException, + self.check_parser, self.cmd, arglist, verifylist) diff --git a/neutronclient/v2_0/client.py b/neutronclient/v2_0/client.py index 5504fd1dd..4dc55e3ef 100644 --- a/neutronclient/v2_0/client.py +++ b/neutronclient/v2_0/client.py @@ -521,6 +521,11 @@ class Client(ClientBase): network_ip_availability_path = '/network-ip-availabilities/%s' tags_path = "/%s/%s/tags" tag_path = "/%s/%s/tags/%s" + trunks_path = "/trunks" + trunk_path = "/trunks/%s" + subports_path = "/trunks/%s/get_subports" + subports_add_path = "/trunks/%s/add_subports" + subports_remove_path = "/trunks/%s/remove_subports" # API has no way to report plurals, so we have to hard code them EXTED_PLURALS = {'routers': 'router', @@ -566,6 +571,7 @@ class Client(ClientBase): 'bgp_speakers': 'bgp_speaker', 'bgp_peers': 'bgp_peer', 'network_ip_availabilities': 'network_ip_availability', + 'trunks': 'trunk', } def list_ext(self, collection, path, retrieve_all, **_params): @@ -1800,6 +1806,39 @@ class Client(ClientBase): """Remove all tags on the resource.""" return self.delete(self.tags_path % (resource_type, resource_id)) + def create_trunk(self, body=None): + """Create a trunk port.""" + return self.post(self.trunks_path, body=body) + + def update_trunk(self, trunk, body=None): + """Update a trunk port.""" + return self.put(self.trunk_path % trunk, body=body) + + def delete_trunk(self, trunk): + """Delete a trunk port.""" + return self.delete(self.trunk_path % (trunk)) + + def list_trunks(self, retrieve_all=True, **_params): + """Fetch a list of all trunk ports.""" + return self.list('trunks', self.trunks_path, retrieve_all, + **_params) + + def show_trunk(self, trunk, **_params): + """Fetch information for a certain trunk port.""" + return self.get(self.trunk_path % (trunk), params=_params) + + def trunk_add_subports(self, trunk, body=None): + """Add specified subports to the trunk.""" + return self.put(self.subports_add_path % (trunk), body=body) + + def trunk_remove_subports(self, trunk, body=None): + """Removes specified subports from the trunk.""" + return self.put(self.subports_remove_path % (trunk), body=body) + + def trunk_get_subports(self, trunk, **_params): + """Fetch a list of all subports attached to given trunk.""" + return self.get(self.subports_path % (trunk), params=_params) + def __init__(self, **kwargs): """Initialize a new client for the Neutron v2.0 API.""" super(Client, self).__init__(**kwargs) diff --git a/releasenotes/notes/add-osc-trunk-commands-7e77283a369729c5.yaml b/releasenotes/notes/add-osc-trunk-commands-7e77283a369729c5.yaml new file mode 100644 index 000000000..684ae3102 --- /dev/null +++ b/releasenotes/notes/add-osc-trunk-commands-7e77283a369729c5.yaml @@ -0,0 +1,8 @@ +--- +features: + - | + Add ``network trunk create``, ``network trunk list``, + ``network trunk set``, ``network trunk unset``, ``network trunk delete`` + and ``network subport list`` OSC commands for trunk resource along with + client bindings. + [Blueprint `vlan-aware-vms `_] diff --git a/setup.cfg b/setup.cfg index 89d31ec54..bb780713e 100644 --- a/setup.cfg +++ b/setup.cfg @@ -35,6 +35,13 @@ openstack.cli.extension = neutronclient = neutronclient.osc.plugin openstack.neutronclient.v2 = + network_subport_list = neutronclient.osc.v2.trunk.network_trunk:ListNetworkSubport + network_trunk_create = neutronclient.osc.v2.trunk.network_trunk:CreateNetworkTrunk + network_trunk_delete = neutronclient.osc.v2.trunk.network_trunk:DeleteNetworkTrunk + network_trunk_list = neutronclient.osc.v2.trunk.network_trunk:ListNetworkTrunk + network_trunk_set = neutronclient.osc.v2.trunk.network_trunk:SetNetworkTrunk + network_trunk_show = neutronclient.osc.v2.trunk.network_trunk:ShowNetworkTrunk + network_trunk_unset = neutronclient.osc.v2.trunk.network_trunk:UnsetNetworkTrunk [build_sphinx] all_files = 1