Add trunk commands to openstackclient

This patch introduces the client bindings for the trunk
extension. It uses the openstackclient framework, and
thus requires the openstack client to work.

New commands introduced are as follows:
"openstack network trunk create" to create a trunk.
"openstack network trunk set" to update/add subports to a trunk.
"openstack network trunk unset" to remove subports from trunk.
"openstack network trunk list" to list all trunks.
"openstack network trunk delete" to delete trunks.
"openstack subport list --trunk <name>" to list all subports
belonging to a trunk.

DocImpact: Openstackclient now supports CLIs to configure
           trunk resource via OSC plugin for neutronclient

Change-Id: I6fe1dbd81813fae234801a61c0e3d89f9e7c791e
Co-authored-by: SongmingYan <yan.songming@zte.com.cn>
Co-authored-by: Abhishek Raut <rauta@vmware.com>
Partial-implements: blueprint vlan-aware-vms
This commit is contained in:
Armando Migliaccio 2016-07-11 17:23:12 -07:00 committed by Abhishek Raut
parent 7f103c395b
commit 449a1f7d13
8 changed files with 1054 additions and 0 deletions

View File

View File

@ -0,0 +1,347 @@
# Copyright 2016 ZTE Corporation.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
"""Network trunk and subports action implementations"""
import logging
from osc_lib.cli import parseractions
from osc_lib.command import command
from osc_lib import exceptions
from osc_lib import utils
# TODO(abhiraut): Switch to neutronclients identity utils
from openstackclient.identity import common as identity_common
from neutronclient._i18n import _
# TODO(abhiraut): Switch to client methods
from neutronclient.neutron import v2_0 as neutronV20
LOG = logging.getLogger(__name__)
TRUNK = 'trunk'
TRUNKS = 'trunks'
SUB_PORTS = 'sub_ports'
class CreateNetworkTrunk(command.ShowOne):
"""Create a network trunk for a given project"""
def get_parser(self, prog_name):
parser = super(CreateNetworkTrunk, self).get_parser(prog_name)
parser.add_argument(
'name',
metavar='<name>',
help=_("Name of the trunk to create")
)
parser.add_argument(
'--parent-port',
metavar='<parent-port>',
required=True,
help=_("Parent port belonging to this trunk (name or ID)")
)
parser.add_argument(
'--subport',
metavar='<port=,segmentation-type=,segmentation-id=>',
action=parseractions.MultiKeyValueAction, dest='add_subports',
help=_("Subport to add. Subport is of form "
"\'port=<name or ID>,segmentation-type=,segmentation-ID=\' "
"(--subport) option can be repeated)")
)
admin_group = parser.add_mutually_exclusive_group()
admin_group.add_argument(
'--enable',
action='store_true',
default=True,
help=_("Enable trunk (default)")
)
admin_group.add_argument(
'--disable',
action='store_true',
help=_("Disable trunk")
)
parser.add_argument(
'--project',
metavar='<project>',
help=_("Owner's project (name or ID)")
)
identity_common.add_project_domain_option_to_parser(parser)
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
attrs = _get_attrs_for_trunk(self.app.client_manager,
parsed_args)
body = {TRUNK: attrs}
obj = client.create_trunk(body)
columns = _get_columns(obj[TRUNK])
data = utils.get_dict_properties(obj[TRUNK], columns,
formatters=_formatters)
return columns, data
class DeleteNetworkTrunk(command.Command):
"""Delete a given network trunk"""
def get_parser(self, prog_name):
parser = super(DeleteNetworkTrunk, self).get_parser(prog_name)
parser.add_argument(
'trunk',
metavar="<trunk>",
nargs="+",
help=_("Trunk(s) to delete (name or ID)")
)
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
result = 0
for trunk in parsed_args.trunk:
try:
trunk_id = _get_id(client, trunk, TRUNK)
client.delete_trunk(trunk_id)
except Exception as e:
result += 1
LOG.error(_("Failed to delete trunk with name "
"or ID '%(trunk)s': %(e)s")
% {'trunk': trunk, 'e': e})
if result > 0:
total = len(parsed_args.trunk)
msg = (_("%(result)s of %(total)s trunks failed "
"to delete.") % {'result': result, 'total': total})
raise exceptions.CommandError(msg)
class ListNetworkTrunk(command.Lister):
"""List all network trunks"""
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
data = client.list_trunks()
# TODO(abhiraut): List more columns using --long
headers = ('ID', 'Name', 'Parent Port')
columns = ('id', 'name', 'port_id')
return (headers,
(utils.get_dict_properties(
s, columns,
formatters=_formatters,
) for s in data[TRUNKS]))
class SetNetworkTrunk(command.Command):
"""Set network trunk properties"""
def get_parser(self, prog_name):
parser = super(SetNetworkTrunk, self).get_parser(prog_name)
parser.add_argument(
'trunk',
metavar="<trunk>",
help=_("Trunk to modify (name or ID)")
)
parser.add_argument(
'--name',
metavar="<name>",
help=_("Set trunk name")
)
parser.add_argument(
'--subport',
metavar='<port=,segmentation-type=,segmentation-id=>',
action=parseractions.MultiKeyValueAction, dest='set_subports',
help=_("Subport to add. Subport is of form "
"\'port=<name or ID>,segmentation-type=,segmentation-ID=\'"
"(--subport) option can be repeated)")
)
admin_group = parser.add_mutually_exclusive_group()
admin_group.add_argument(
'--enable',
action='store_true',
help=_("Enable trunk")
)
admin_group.add_argument(
'--disable',
action='store_true',
help=_("Disable trunk")
)
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
trunk_id = _get_id(client, parsed_args.trunk, TRUNK)
attrs = _get_attrs_for_trunk(self.app.client_manager, parsed_args)
body = {TRUNK: attrs}
client.update_trunk(trunk_id, body)
if parsed_args.set_subports:
subport_attrs = _get_attrs_for_subports(self.app.client_manager,
parsed_args)
client.trunk_add_subports(trunk_id, subport_attrs)
class ShowNetworkTrunk(command.ShowOne):
"""Show information of a given network trunk"""
def get_parser(self, prog_name):
parser = super(ShowNetworkTrunk, self).get_parser(prog_name)
parser.add_argument(
'trunk',
metavar="<trunk>",
help=_("Trunk to display (name or ID)")
)
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
trunk_id = _get_id(client, parsed_args.trunk, TRUNK)
obj = client.show_trunk(trunk_id)
columns = _get_columns(obj[TRUNK])
data = utils.get_dict_properties(obj[TRUNK], columns,
formatters=_formatters)
return columns, data
class ListNetworkSubport(command.Lister):
"""List all subports for a given network trunk"""
def get_parser(self, prog_name):
parser = super(ListNetworkSubport, self).get_parser(prog_name)
parser.add_argument(
'--trunk',
required=True,
metavar="<trunk>",
help=_("List subports belonging to this trunk (name or ID)")
)
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
trunk_id = _get_id(client, parsed_args.trunk, TRUNK)
data = client.trunk_get_subports(trunk_id)
headers = ('Port', 'Segmentation Type', 'Segmentation ID')
columns = ('port_id', 'segmentation_type', 'segmentation_id')
return (headers,
(utils.get_dict_properties(
s, columns,
) for s in data[SUB_PORTS]))
class UnsetNetworkTrunk(command.Command):
"""Unset subports from a given network trunk"""
def get_parser(self, prog_name):
parser = super(UnsetNetworkTrunk, self).get_parser(prog_name)
parser.add_argument(
'trunk',
metavar="<trunk>",
help=_("Unset subports from this trunk (name or ID)")
)
parser.add_argument(
'--subport',
metavar="<subport>",
required=True,
action='append', dest='unset_subports',
help=_("Subport to delete (name or ID of the port) "
"(--subport) option can be repeated")
)
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.neutronclient
attrs = _get_attrs_for_subports(self.app.client_manager, parsed_args)
trunk_id = _get_id(client, parsed_args.trunk, TRUNK)
client.trunk_remove_subports(trunk_id, attrs)
def _format_admin_state(item):
return 'UP' if item else 'DOWN'
_formatters = {
'admin_state_up': _format_admin_state,
'sub_ports': utils.format_list_of_dicts,
}
def _get_columns(item):
return tuple(sorted(list(item.keys())))
def _get_attrs_for_trunk(client_manager, parsed_args):
attrs = {}
if parsed_args.name is not None:
attrs['name'] = str(parsed_args.name)
if parsed_args.enable:
attrs['admin_state_up'] = True
if parsed_args.disable:
attrs['admin_state_up'] = False
if 'parent_port' in parsed_args and parsed_args.parent_port is not None:
port_id = _get_id(client_manager.neutronclient,
parsed_args.parent_port, 'port')
attrs['port_id'] = port_id
if 'add_subports' in parsed_args and parsed_args.add_subports is not None:
attrs[SUB_PORTS] = _format_subports(client_manager,
parsed_args.add_subports)
# "trunk set" command doesn't support setting project.
if 'project' in parsed_args and parsed_args.project is not None:
identity_client = client_manager.identity
project_id = identity_common.find_project(
identity_client,
parsed_args.project,
parsed_args.project_domain,
).id
attrs['tenant_id'] = project_id
return attrs
def _format_subports(client_manager, subports):
attrs = []
for subport in subports:
subport_attrs = {}
if subport.get('port'):
port_id = _get_id(client_manager.neutronclient,
subport['port'], 'port')
subport_attrs['port_id'] = port_id
if subport.get('segmentation-id'):
try:
subport_attrs['segmentation_id'] = int(
subport['segmentation-id'])
except ValueError:
msg = (_("Segmentation-id '%s' is not an integer") %
subport['segmentation-id'])
raise exceptions.CommandError(msg)
if subport.get('segmentation-type'):
subport_attrs['segmentation_type'] = subport['segmentation-type']
attrs.append(subport_attrs)
return attrs
def _get_attrs_for_subports(client_manager, parsed_args):
attrs = {}
if 'set_subports' in parsed_args and parsed_args.set_subports is not None:
attrs[SUB_PORTS] = _format_subports(client_manager,
parsed_args.set_subports)
if ('unset_subports' in parsed_args and
parsed_args.unset_subports is not None):
subports_list = []
for subport in parsed_args.unset_subports:
port_id = _get_id(client_manager.neutronclient,
subport, 'port')
subports_list.append({'port_id': port_id})
attrs[SUB_PORTS] = subports_list
return attrs
def _get_id(client, id_or_name, resource):
return neutronV20.find_resourceid_by_name_or_id(
client, resource, str(id_or_name))

View File

@ -0,0 +1,83 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import mock
import uuid
class FakeTrunk(object):
"""Fake one or more trunks."""
@staticmethod
def create_one_trunk(attrs=None):
"""Create a fake trunk.
:param Dictionary attrs:
A dictionary with all attributes
:return:
A Dictionary with id, name, admin_state_up,
port_id, sub_ports, status and project_id
"""
attrs = attrs or {}
# Set default attributes.
trunk_attrs = {
'id': 'trunk-id-' + uuid.uuid4().hex,
'name': 'trunk-name-' + uuid.uuid4().hex,
'port_id': 'port-' + uuid.uuid4().hex,
'admin_state_up': True,
'project_id': 'project-id-' + uuid.uuid4().hex,
'status': 'ACTIVE',
'sub_ports': [{'port_id': 'subport-' + uuid.uuid4().hex,
'segmentation_type': 'vlan',
'segmentation_id': 100}],
}
# Overwrite default attributes.
trunk_attrs.update(attrs)
return copy.deepcopy(trunk_attrs)
@staticmethod
def create_trunks(attrs=None, count=2):
"""Create multiple fake trunks.
:param Dictionary attrs:
A dictionary with all attributes
:param int count:
The number of routers to fake
:return:
A list of dictionaries faking the trunks
"""
trunks = []
for i in range(0, count):
trunks.append(FakeTrunk.create_one_trunk(attrs))
return trunks
@staticmethod
def get_trunks(trunks=None, count=2):
"""Get an iterable MagicMock object with a list of faked trunks.
If trunks list is provided, then initialize the Mock object with the
list. Otherwise create one.
:param List trunks:
A list of FakeResource objects faking trunks
:param int count:
The number of trunks to fake
:return:
An iterable Mock object with side_effect set to a list of faked
trunks
"""
if trunks is None:
trunks = FakeTrunk.create_trunks(count)
return mock.MagicMock(side_effect=trunks)

View File

@ -0,0 +1,570 @@
# Copyright 2016 ZTE Corporation.
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from mock import call
from osc_lib import exceptions
from osc_lib import utils
# TODO(abhiraut): Switch to osc-lib test utils
from openstackclient.tests import utils as tests_utils
from neutronclient.osc.v2.trunk import network_trunk as trunk
from neutronclient.tests.unit.osc.v2 import fakes as test_fakes
from neutronclient.tests.unit.osc.v2.trunk import fakes
def _get_id(client, id_or_name, resource):
return id_or_name
class TestCreateNetworkTrunk(test_fakes.TestNeutronClientOSCV2):
# The new trunk created
_trunk = fakes.FakeTrunk.create_one_trunk()
columns = (
'admin_state_up',
'id',
'name',
'port_id',
'project_id',
'status',
'sub_ports',
)
data = (
trunk._format_admin_state(_trunk['admin_state_up']),
_trunk['id'],
_trunk['name'],
_trunk['port_id'],
_trunk['project_id'],
_trunk['status'],
utils.format_list_of_dicts(_trunk['sub_ports']),
)
def setUp(self):
super(TestCreateNetworkTrunk, self).setUp()
mock.patch('neutronclient.osc.v2.trunk.network_trunk._get_id',
new=_get_id).start()
self.neutronclient.create_trunk = mock.Mock(
return_value={trunk.TRUNK: self._trunk})
# Get the command object to test
self.cmd = trunk.CreateNetworkTrunk(self.app, self.namespace)
def test_create_no_options(self):
arglist = []
verifylist = []
self.assertRaises(tests_utils.ParserException, self.check_parser,
self.cmd, arglist, verifylist)
def test_create_default_options(self):
arglist = [
"--parent-port", self._trunk['port_id'],
self._trunk['name'],
]
verifylist = [
('parent_port', self._trunk['port_id']),
('name', self._trunk['name']),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = (self.cmd.take_action(parsed_args))
self.neutronclient.create_trunk.assert_called_once_with({
trunk.TRUNK: {'name': self._trunk['name'],
'admin_state_up': self._trunk['admin_state_up'],
'port_id': self._trunk['port_id']}
})
self.assertEqual(self.columns, columns)
self.assertEqual(self.data, data)
def test_create_full_options(self):
subport = self._trunk['sub_ports'][0]
arglist = [
"--disable",
"--parent-port", self._trunk['port_id'],
"--subport", 'port=%(port)s,segmentation-type=%(seg_type)s,'
'segmentation-id=%(seg_id)s' % {
'seg_id': subport['segmentation_id'],
'seg_type': subport['segmentation_type'],
'port': subport['port_id']},
self._trunk['name'],
]
verifylist = [
('name', self._trunk['name']),
('parent_port', self._trunk['port_id']),
('add_subports', [{
'port': subport['port_id'],
'segmentation-id': str(subport['segmentation_id']),
'segmentation-type': subport['segmentation_type']}]),
('disable', True),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = (self.cmd.take_action(parsed_args))
self.neutronclient.create_trunk.assert_called_once_with({
trunk.TRUNK: {'name': self._trunk['name'],
'admin_state_up': False,
'sub_ports': [subport],
'port_id': self._trunk['port_id']}
})
self.assertEqual(self.columns, columns)
self.assertEqual(self.data, data)
def test_create_trunk_with_subport_invalid_segmentation_id_fail(self):
subport = self._trunk['sub_ports'][0]
arglist = [
"--parent-port", self._trunk['port_id'],
"--subport", "port=%(port)s,segmentation-type=%(seg_type)s,"
"segmentation-id=boom" % {
'seg_type': subport['segmentation_type'],
'port': subport['port_id']},
self._trunk['name'],
]
verifylist = [
('name', self._trunk['name']),
('parent_port', self._trunk['port_id']),
('add_subports', [{
'port': subport['port_id'],
'segmentation-id': 'boom',
'segmentation-type': subport['segmentation_type']}]),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
try:
self.cmd.take_action(parsed_args)
self.fail('CommandError should be raised.')
except exceptions.CommandError as e:
self.assertEqual("Segmentation-id 'boom' is not an integer",
str(e))
class TestDeleteNetworkTrunk(test_fakes.TestNeutronClientOSCV2):
# The trunk to be deleted.
_trunks = fakes.FakeTrunk.create_trunks(count=2)
def setUp(self):
super(TestDeleteNetworkTrunk, self).setUp()
mock.patch('neutronclient.osc.v2.trunk.network_trunk._get_id',
new=_get_id).start()
self.neutronclient.delete_trunk = mock.Mock(return_value=None)
# Get the command object to test
self.cmd = trunk.DeleteNetworkTrunk(self.app, self.namespace)
def test_delete_trunk(self):
arglist = [
self._trunks[0]['name'],
]
verifylist = [
('trunk', [self._trunks[0]['name']]),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.neutronclient.delete_trunk.assert_called_once_with(
self._trunks[0]['name'])
self.assertIsNone(result)
def test_delete_trunk_multiple(self):
arglist = []
verifylist = []
for t in self._trunks:
arglist.append(t['name'])
verifylist = [
('trunk', arglist),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
calls = []
for t in self._trunks:
calls.append(call(t['name']))
self.neutronclient.delete_trunk.assert_has_calls(calls)
self.assertIsNone(result)
def test_delete_trunk_multiple_with_exception(self):
arglist = [
self._trunks[0]['name'],
'unexist_trunk',
]
verifylist = [
('trunk',
[self._trunks[0]['name'], 'unexist_trunk']),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
get_mock_result = [self._trunks[0], exceptions.CommandError]
trunk._get_id = (
mock.MagicMock(side_effect=get_mock_result)
)
try:
self.cmd.take_action(parsed_args)
self.fail('CommandError should be raised.')
except exceptions.CommandError as e:
self.assertEqual('1 of 2 trunks failed to delete.', str(e))
self.neutronclient.delete_trunk.assert_called_once_with(
self._trunks[0]
)
class TestShowNetworkTrunk(test_fakes.TestNeutronClientOSCV2):
# The trunk to set.
_trunk = fakes.FakeTrunk.create_one_trunk()
columns = (
'admin_state_up',
'id',
'name',
'port_id',
'project_id',
'status',
'sub_ports',
)
data = (
trunk._format_admin_state(_trunk['admin_state_up']),
_trunk['id'],
_trunk['name'],
_trunk['port_id'],
_trunk['project_id'],
_trunk['status'],
utils.format_list_of_dicts(_trunk['sub_ports']),
)
def setUp(self):
super(TestShowNetworkTrunk, self).setUp()
mock.patch('neutronclient.osc.v2.trunk.network_trunk._get_id',
new=_get_id).start()
self.neutronclient.show_trunk = mock.Mock(
return_value={trunk.TRUNK: self._trunk})
# Get the command object to test
self.cmd = trunk.ShowNetworkTrunk(self.app, self.namespace)
def test_show_no_options(self):
arglist = []
verifylist = []
self.assertRaises(tests_utils.ParserException, self.check_parser,
self.cmd, arglist, verifylist)
def test_show_all_options(self):
arglist = [
self._trunk['id'],
]
verifylist = [
('trunk', self._trunk['id']),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
self.neutronclient.show_trunk.assert_called_once_with(
self._trunk['id'])
self.assertEqual(self.columns, columns)
self.assertEqual(self.data, data)
class TestListNetworkTrunk(test_fakes.TestNeutronClientOSCV2):
# Create trunks to be listed.
_trunks = fakes.FakeTrunk.create_trunks(count=3)
columns = (
'ID',
'Name',
'Parent Port',
)
data = []
for t in _trunks:
data.append((
t['id'],
t['name'],
t['port_id'],
))
def setUp(self):
super(TestListNetworkTrunk, self).setUp()
mock.patch('neutronclient.osc.v2.trunk.network_trunk._get_id',
new=_get_id).start()
self.neutronclient.list_trunks = mock.Mock(
return_value={trunk.TRUNKS: self._trunks})
# Get the command object to test
self.cmd = trunk.ListNetworkTrunk(self.app, self.namespace)
def test_trunk_list_no_option(self):
arglist = []
verifylist = []
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
self.neutronclient.list_trunks.assert_called_once_with()
self.assertEqual(self.columns, columns)
self.assertEqual(self.data, list(data))
class TestSetNetworkTrunk(test_fakes.TestNeutronClientOSCV2):
# Create trunks to be listed.
_trunk = fakes.FakeTrunk.create_one_trunk()
columns = (
'admin_state_up',
'id',
'name',
'port_id',
'project_id',
'status',
'sub_ports',
)
data = (
trunk._format_admin_state(_trunk['admin_state_up']),
_trunk['id'],
_trunk['name'],
_trunk['port_id'],
_trunk['project_id'],
_trunk['status'],
utils.format_list_of_dicts(_trunk['sub_ports']),
)
def setUp(self):
super(TestSetNetworkTrunk, self).setUp()
mock.patch('neutronclient.osc.v2.trunk.network_trunk._get_id',
new=_get_id).start()
self.neutronclient.update_trunk = mock.Mock(
return_value={trunk.TRUNK: self._trunk})
self.neutronclient.trunk_add_subports = mock.Mock(
return_value=self._trunk)
# Get the command object to test
self.cmd = trunk.SetNetworkTrunk(self.app, self.namespace)
def test_set_network_trunk_name(self):
arglist = [
'--name', 'trunky',
self._trunk['name'],
]
verifylist = [
('name', 'trunky'),
('trunk', self._trunk['name']),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
attrs = {
'name': 'trunky',
}
self.neutronclient.update_trunk.assert_called_once_with(
self._trunk['name'], {trunk.TRUNK: attrs})
self.assertIsNone(result)
def test_set_network_trunk_admin_state_up_disable(self):
arglist = [
'--disable',
self._trunk['name'],
]
verifylist = [
('disable', True),
('trunk', self._trunk['name']),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
attrs = {
'admin_state_up': False,
}
self.neutronclient.update_trunk.assert_called_once_with(
self._trunk['name'], {trunk.TRUNK: attrs})
self.assertIsNone(result)
def test_set_network_trunk_admin_state_up_enable(self):
arglist = [
'--enable',
self._trunk['name'],
]
verifylist = [
('enable', True),
('trunk', self._trunk['name']),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
attrs = {
'admin_state_up': True,
}
self.neutronclient.update_trunk.assert_called_once_with(
self._trunk['name'], {trunk.TRUNK: attrs})
self.assertIsNone(result)
def test_set_network_trunk_nothing(self):
arglist = [self._trunk['name'], ]
verifylist = [('trunk', self._trunk['name']), ]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
attrs = {}
self.neutronclient.update_trunk.assert_called_once_with(
self._trunk['name'], {trunk.TRUNK: attrs})
self.assertIsNone(result)
def test_set_network_trunk_subports(self):
subport = self._trunk['sub_ports'][0]
arglist = [
"--subport", 'port=%(port)s,segmentation-type=%(seg_type)s,'
'segmentation-id=%(seg_id)s' % {
'seg_id': subport['segmentation_id'],
'seg_type': subport['segmentation_type'],
'port': subport['port_id']},
self._trunk['name'],
]
verifylist = [
('trunk', self._trunk['name']),
('set_subports', [{
'port': subport['port_id'],
'segmentation-id': str(subport['segmentation_id']),
'segmentation-type': subport['segmentation_type']}]),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.neutronclient.trunk_add_subports.assert_called_once_with(
self._trunk['name'], {'sub_ports': [subport]}
)
self.assertIsNone(result)
class TestListNetworkSubport(test_fakes.TestNeutronClientOSCV2):
_trunk = fakes.FakeTrunk.create_one_trunk()
_subports = _trunk['sub_ports']
columns = (
'Port',
'Segmentation Type',
'Segmentation ID',
)
data = []
for s in _subports:
data.append((
s['port_id'],
s['segmentation_type'],
s['segmentation_id'],
))
def setUp(self):
super(TestListNetworkSubport, self).setUp()
mock.patch('neutronclient.osc.v2.trunk.network_trunk._get_id',
new=_get_id).start()
self.neutronclient.trunk_get_subports = mock.Mock(
return_value={trunk.SUB_PORTS: self._subports})
# Get the command object to test
self.cmd = trunk.ListNetworkSubport(self.app, self.namespace)
def test_subport_list(self):
arglist = [
'--trunk', self._trunk['name'],
]
verifylist = [
('trunk', self._trunk['name']),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
columns, data = self.cmd.take_action(parsed_args)
self.neutronclient.trunk_get_subports.assert_called_once_with(
self._trunk['name'])
self.assertEqual(self.columns, columns)
self.assertEqual(self.data, list(data))
class TestUnsetNetworkTrunk(test_fakes.TestNeutronClientOSCV2):
_trunk = fakes.FakeTrunk.create_one_trunk()
columns = (
'admin_state_up',
'id',
'name',
'port_id',
'project_id',
'status',
'sub_ports',
)
data = (
trunk._format_admin_state(_trunk['admin_state_up']),
_trunk['id'],
_trunk['name'],
_trunk['port_id'],
_trunk['project_id'],
_trunk['status'],
utils.format_list_of_dicts(_trunk['sub_ports']),
)
def setUp(self):
super(TestUnsetNetworkTrunk, self).setUp()
mock.patch('neutronclient.osc.v2.trunk.network_trunk._get_id',
new=_get_id).start()
self.neutronclient.trunk_remove_subports = mock.Mock(
return_value=None)
# Get the command object to test
self.cmd = trunk.UnsetNetworkTrunk(self.app, self.namespace)
def test_unset_network_trunk_subport(self):
subport = self._trunk['sub_ports'][0]
arglist = [
"--subport", subport['port_id'],
self._trunk['name'],
]
verifylist = [
('trunk', self._trunk['name']),
('unset_subports', [subport['port_id']]),
]
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
result = self.cmd.take_action(parsed_args)
self.neutronclient.trunk_remove_subports.assert_called_once_with(
self._trunk['name'],
{trunk.SUB_PORTS: [{'port_id': subport['port_id']}]}
)
self.assertIsNone(result)
def test_unset_subport_no_arguments_fail(self):
arglist = [
self._trunk['name'],
]
verifylist = [
('trunk', self._trunk['name']),
]
self.assertRaises(tests_utils.ParserException,
self.check_parser, self.cmd, arglist, verifylist)

View File

@ -521,6 +521,11 @@ class Client(ClientBase):
network_ip_availability_path = '/network-ip-availabilities/%s'
tags_path = "/%s/%s/tags"
tag_path = "/%s/%s/tags/%s"
trunks_path = "/trunks"
trunk_path = "/trunks/%s"
subports_path = "/trunks/%s/get_subports"
subports_add_path = "/trunks/%s/add_subports"
subports_remove_path = "/trunks/%s/remove_subports"
# API has no way to report plurals, so we have to hard code them
EXTED_PLURALS = {'routers': 'router',
@ -566,6 +571,7 @@ class Client(ClientBase):
'bgp_speakers': 'bgp_speaker',
'bgp_peers': 'bgp_peer',
'network_ip_availabilities': 'network_ip_availability',
'trunks': 'trunk',
}
def list_ext(self, collection, path, retrieve_all, **_params):
@ -1800,6 +1806,39 @@ class Client(ClientBase):
"""Remove all tags on the resource."""
return self.delete(self.tags_path % (resource_type, resource_id))
def create_trunk(self, body=None):
"""Create a trunk port."""
return self.post(self.trunks_path, body=body)
def update_trunk(self, trunk, body=None):
"""Update a trunk port."""
return self.put(self.trunk_path % trunk, body=body)
def delete_trunk(self, trunk):
"""Delete a trunk port."""
return self.delete(self.trunk_path % (trunk))
def list_trunks(self, retrieve_all=True, **_params):
"""Fetch a list of all trunk ports."""
return self.list('trunks', self.trunks_path, retrieve_all,
**_params)
def show_trunk(self, trunk, **_params):
"""Fetch information for a certain trunk port."""
return self.get(self.trunk_path % (trunk), params=_params)
def trunk_add_subports(self, trunk, body=None):
"""Add specified subports to the trunk."""
return self.put(self.subports_add_path % (trunk), body=body)
def trunk_remove_subports(self, trunk, body=None):
"""Removes specified subports from the trunk."""
return self.put(self.subports_remove_path % (trunk), body=body)
def trunk_get_subports(self, trunk, **_params):
"""Fetch a list of all subports attached to given trunk."""
return self.get(self.subports_path % (trunk), params=_params)
def __init__(self, **kwargs):
"""Initialize a new client for the Neutron v2.0 API."""
super(Client, self).__init__(**kwargs)

View File

@ -0,0 +1,8 @@
---
features:
- |
Add ``network trunk create``, ``network trunk list``,
``network trunk set``, ``network trunk unset``, ``network trunk delete``
and ``network subport list`` OSC commands for trunk resource along with
client bindings.
[Blueprint `vlan-aware-vms <https://blueprints.launchpad.net/neutron/+spec/vlan-aware-vms>`_]

View File

@ -35,6 +35,13 @@ openstack.cli.extension =
neutronclient = neutronclient.osc.plugin
openstack.neutronclient.v2 =
network_subport_list = neutronclient.osc.v2.trunk.network_trunk:ListNetworkSubport
network_trunk_create = neutronclient.osc.v2.trunk.network_trunk:CreateNetworkTrunk
network_trunk_delete = neutronclient.osc.v2.trunk.network_trunk:DeleteNetworkTrunk
network_trunk_list = neutronclient.osc.v2.trunk.network_trunk:ListNetworkTrunk
network_trunk_set = neutronclient.osc.v2.trunk.network_trunk:SetNetworkTrunk
network_trunk_show = neutronclient.osc.v2.trunk.network_trunk:ShowNetworkTrunk
network_trunk_unset = neutronclient.osc.v2.trunk.network_trunk:UnsetNetworkTrunk
[build_sphinx]
all_files = 1