Instead of the neutron commands in the form neutron l2-gateway-<command> and neutron l2-gateway-connection-<command> introduce the osc form: openstack l2gw <command> and openstack l2gw connection <command>. Change-Id: I3ad1c6fdde9c3cfc8732680f130fbd6420d98c53 Closes-Bug: #1755435stable/rocky
parent
5517dc26f1
commit
1125e0ad89
@ -0,0 +1,256 @@
|
||||
# All Rights Reserved 2018
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
|
||||
from osc_lib.cli import format_columns
|
||||
from osc_lib.command import command
|
||||
from osc_lib import exceptions
|
||||
from osc_lib import utils as osc_utils
|
||||
from osc_lib.utils import columns as column_util
|
||||
|
||||
from neutronclient._i18n import _
|
||||
from neutronclient.common import utils
|
||||
from neutronclient.osc import utils as nc_osc_utils
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
INTERFACE_DELIMITER = ";"
|
||||
SEGMENTATION_ID_DELIMITER = "#"
|
||||
INTERFACE_SEG_ID_DELIMITER = "|"
|
||||
|
||||
L2_GATEWAY = 'l2_gateway'
|
||||
L2_GATEWAYS = '%ss' % L2_GATEWAY
|
||||
|
||||
path = 'l2-gateways'
|
||||
object_path = '/%s' % path
|
||||
resource_path = '/%s/%%s' % path
|
||||
|
||||
_attr_map = (
|
||||
('id', 'ID', column_util.LIST_BOTH),
|
||||
('tenant_id', 'Tenant', column_util.LIST_LONG_ONLY),
|
||||
('name', 'Name', column_util.LIST_BOTH),
|
||||
('devices', 'Devices', column_util.LIST_BOTH),
|
||||
)
|
||||
_formatters = {
|
||||
'devices': format_columns.ListDictColumn,
|
||||
}
|
||||
|
||||
|
||||
def _get_common_parser(parser):
|
||||
"""Adds to parser arguments common to create and update commands.
|
||||
|
||||
:params ArgumentParser parser: argparse object contains all command's
|
||||
arguments
|
||||
"""
|
||||
parser.add_argument(
|
||||
'--device',
|
||||
metavar='name=name,interface_names=INTERFACE-DETAILS',
|
||||
action='append', dest='devices', type=utils.str2dict,
|
||||
help=_('Device name and Interface-names of l2gateway. '
|
||||
'INTERFACE-DETAILS is of form '
|
||||
'\"<interface_name1>;[<interface_name2>]'
|
||||
'[|<seg_id1>[#<seg_id2>]]\" '
|
||||
'(--device option can be repeated)'))
|
||||
|
||||
|
||||
def get_interface(interfaces):
|
||||
interface_dict = []
|
||||
for interface in interfaces:
|
||||
if INTERFACE_SEG_ID_DELIMITER in interface:
|
||||
int_name = interface.split(INTERFACE_SEG_ID_DELIMITER)[0]
|
||||
segid = interface.split(INTERFACE_SEG_ID_DELIMITER)[1]
|
||||
if SEGMENTATION_ID_DELIMITER in segid:
|
||||
segid = segid.split(SEGMENTATION_ID_DELIMITER)
|
||||
else:
|
||||
segid = [segid]
|
||||
interface_detail = {'name': int_name, 'segmentation_id': segid}
|
||||
else:
|
||||
interface_detail = {'name': interface}
|
||||
interface_dict.append(interface_detail)
|
||||
return interface_dict
|
||||
|
||||
|
||||
def _args2body(parsed_args, update=False):
|
||||
if parsed_args.devices:
|
||||
devices = parsed_args.devices
|
||||
interfaces = []
|
||||
else:
|
||||
devices = []
|
||||
device_dict = []
|
||||
for device in devices:
|
||||
if 'interface_names' in device.keys():
|
||||
interface = device['interface_names']
|
||||
if INTERFACE_DELIMITER in interface:
|
||||
interface_dict = interface.split(INTERFACE_DELIMITER)
|
||||
interfaces = get_interface(interface_dict)
|
||||
else:
|
||||
interfaces = get_interface([interface])
|
||||
if 'name' in device.keys():
|
||||
device = {'device_name': device['name'],
|
||||
'interfaces': interfaces}
|
||||
else:
|
||||
device = {'interfaces': interfaces}
|
||||
device_dict.append(device)
|
||||
if parsed_args.name:
|
||||
l2gw_name = parsed_args.name
|
||||
body = {L2_GATEWAY: {'name': l2gw_name,
|
||||
'devices': device_dict}, }
|
||||
else:
|
||||
body = {L2_GATEWAY: {'devices': device_dict}, }
|
||||
return body
|
||||
|
||||
|
||||
class CreateL2gw(command.ShowOne):
|
||||
_description = _("Create l2gateway resource")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super(CreateL2gw, self).get_parser(prog_name)
|
||||
nc_osc_utils.add_project_owner_option_to_parser(parser)
|
||||
parser.add_argument(
|
||||
'name', metavar='<GATEWAY-NAME>',
|
||||
help=_('Descriptive name for logical gateway.'))
|
||||
_get_common_parser(parser)
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.neutronclient
|
||||
attrs = {}
|
||||
if parsed_args.name is not None:
|
||||
attrs['name'] = str(parsed_args.name)
|
||||
if parsed_args.devices is not None:
|
||||
attrs['devices'] = str(parsed_args.devices)
|
||||
if 'project' in parsed_args and parsed_args.project is not None:
|
||||
project_id = nc_osc_utils.find_project(
|
||||
self.app.client_manager.identity,
|
||||
parsed_args.project,
|
||||
parsed_args.project_domain,
|
||||
).id
|
||||
attrs['tenant_id'] = project_id
|
||||
body = _args2body(parsed_args)
|
||||
obj = client.post(object_path, body)[L2_GATEWAY]
|
||||
columns, display_columns = column_util.get_columns(obj, _attr_map)
|
||||
data = osc_utils.get_dict_properties(obj, columns,
|
||||
formatters=_formatters)
|
||||
return display_columns, data
|
||||
|
||||
|
||||
class ListL2gw(command.Lister):
|
||||
_description = _("List l2gateway that belongs to a given tenant")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super(ListL2gw, self).get_parser(prog_name)
|
||||
nc_osc_utils.add_project_owner_option_to_parser(parser)
|
||||
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.neutronclient
|
||||
params = {}
|
||||
if parsed_args.project is not None:
|
||||
project_id = nc_osc_utils.find_project(
|
||||
self.app.client_manager.identity,
|
||||
parsed_args.project,
|
||||
parsed_args.project_domain,
|
||||
).id
|
||||
params['tenant_id'] = project_id
|
||||
objs = client.list(L2_GATEWAYS, object_path,
|
||||
retrieve_all=True, params=params)[L2_GATEWAYS]
|
||||
headers, columns = column_util.get_column_definitions(
|
||||
_attr_map, long_listing=True)
|
||||
return (headers, (osc_utils.get_dict_properties(
|
||||
s, columns, formatters=_formatters) for s in objs))
|
||||
|
||||
|
||||
class ShowL2gw(command.ShowOne):
|
||||
_description = _("Show information of a given l2gateway")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super(ShowL2gw, self).get_parser(prog_name)
|
||||
parser.add_argument(
|
||||
L2_GATEWAY,
|
||||
metavar="<L2_GATEWAY>",
|
||||
help=_("ID or name of l2_gateway to look up."),
|
||||
)
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.neutronclient
|
||||
id = client.find_resource(L2_GATEWAY, parsed_args.l2_gateway)['id']
|
||||
obj = client.get(resource_path % id)[L2_GATEWAY]
|
||||
columns, display_columns = column_util.get_columns(obj, _attr_map)
|
||||
data = osc_utils.get_dict_properties(obj, columns,
|
||||
formatters=_formatters)
|
||||
return display_columns, data
|
||||
|
||||
|
||||
class DeleteL2gw(command.Command):
|
||||
_description = _("Delete a given l2gateway")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super(DeleteL2gw, self).get_parser(prog_name)
|
||||
parser.add_argument(
|
||||
L2_GATEWAYS,
|
||||
metavar="<L2_GATEWAY>",
|
||||
nargs="+",
|
||||
help=_("ID(s) or name(s) of l2_gateway to delete."),
|
||||
)
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.neutronclient
|
||||
fails = 0
|
||||
for id_or_name in parsed_args.l2_gateways:
|
||||
try:
|
||||
id = client.find_resource(L2_GATEWAY, id_or_name)['id']
|
||||
client.delete(resource_path % id)
|
||||
LOG.warning("L2 Gateaway %(id)s deleted", {'id': id})
|
||||
except Exception as e:
|
||||
fails += 1
|
||||
LOG.error("Failed to delete L2 Gateway with name or ID "
|
||||
"'%(id_or_name)s': %(e)s",
|
||||
{'id_or_name': id_or_name, 'e': e})
|
||||
if fails > 0:
|
||||
msg = (_("Failed to delete %(fails)s of %(total)s L2 Gateway.") %
|
||||
{'fails': fails, 'total': len(parsed_args.l2_gateways)})
|
||||
raise exceptions.CommandError(msg)
|
||||
|
||||
|
||||
class UpdateL2gw(command.ShowOne):
|
||||
_description = _("Update a given l2gateway")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super(UpdateL2gw, self).get_parser(prog_name)
|
||||
parser.add_argument(
|
||||
L2_GATEWAY,
|
||||
metavar="<L2_GATEWAY>",
|
||||
help=_("ID or name of l2_gateway to update."),
|
||||
)
|
||||
parser.add_argument('--name', metavar='name',
|
||||
help=_('Descriptive name for logical gateway.'))
|
||||
_get_common_parser(parser)
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.neutronclient
|
||||
id = client.find_resource(L2_GATEWAY, parsed_args.l2_gateway)['id']
|
||||
if parsed_args.devices:
|
||||
body = _args2body(parsed_args)
|
||||
else:
|
||||
body = {L2_GATEWAY: {'name': parsed_args.name}}
|
||||
obj = client.put(resource_path % id, body)[L2_GATEWAY]
|
||||
columns, display_columns = column_util.get_columns(obj, _attr_map)
|
||||
data = osc_utils.get_dict_properties(obj, columns,
|
||||
formatters=_formatters)
|
||||
return display_columns, data
|
@ -0,0 +1,170 @@
|
||||
# All Rights Reserved 2018
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
|
||||
from osc_lib.command import command
|
||||
from osc_lib import exceptions
|
||||
from osc_lib import utils as osc_utils
|
||||
from osc_lib.utils import columns as column_util
|
||||
|
||||
from neutronclient._i18n import _
|
||||
from neutronclient.neutron import v2_0 as n_v20
|
||||
from neutronclient.osc import utils as nc_osc_utils
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
L2_GATEWAY_CONNECTION = 'l2_gateway_connection'
|
||||
L2_GATEWAY_CONNECTIONS = '%ss' % L2_GATEWAY_CONNECTION
|
||||
|
||||
path = 'l2-gateway-connections'
|
||||
object_path = '/%s' % path
|
||||
resource_path = '/%s/%%s' % path
|
||||
|
||||
_attr_map = (
|
||||
('id', 'ID', column_util.LIST_BOTH),
|
||||
('tenant_id', 'Tenant', column_util.LIST_LONG_ONLY),
|
||||
('l2_gateway_id', 'L2 GateWay ID', column_util.LIST_BOTH),
|
||||
('network_id', 'Network ID', column_util.LIST_BOTH),
|
||||
('segmentation_id', 'Segmentation ID', column_util.LIST_BOTH),
|
||||
)
|
||||
|
||||
|
||||
class CreateL2gwConnection(command.ShowOne):
|
||||
_description = _("Create l2gateway-connection")
|
||||
|
||||
def retrieve_ids(self, client, args):
|
||||
gateway_id = n_v20.find_resourceid_by_name_or_id(
|
||||
client, 'l2_gateway', args.gateway_name)
|
||||
network_id = n_v20.find_resourceid_by_name_or_id(
|
||||
client, 'network', args.network)
|
||||
return gateway_id, network_id
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super(CreateL2gwConnection, self).get_parser(prog_name)
|
||||
parser.add_argument('gateway_name', metavar='<GATEWAY-NAME/UUID>',
|
||||
help=_('Descriptive name for logical gateway.'))
|
||||
parser.add_argument(
|
||||
'network', metavar='<NETWORK-NAME/UUID>',
|
||||
help=_('Network name or uuid.'))
|
||||
parser.add_argument(
|
||||
'--default-segmentation-id',
|
||||
dest='seg_id',
|
||||
help=_('default segmentation-id that will '
|
||||
'be applied to the interfaces for which '
|
||||
'segmentation id was not specified '
|
||||
'in l2-gateway-create command.'))
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.neutronclient
|
||||
(gateway_id, network_id) = self.retrieve_ids(client, parsed_args)
|
||||
body = {
|
||||
L2_GATEWAY_CONNECTION: {
|
||||
'l2_gateway_id': gateway_id, 'network_id': network_id
|
||||
}
|
||||
}
|
||||
if parsed_args.seg_id:
|
||||
body[L2_GATEWAY_CONNECTION]['segmentation_id'] = \
|
||||
parsed_args.seg_id
|
||||
obj = client.post(object_path, body)[L2_GATEWAY_CONNECTION]
|
||||
columns, display_columns = column_util.get_columns(obj, _attr_map)
|
||||
data = osc_utils.get_dict_properties(obj, columns)
|
||||
return display_columns, data
|
||||
|
||||
|
||||
class ListL2gwConnection(command.Lister):
|
||||
_description = _("List l2gateway-connections")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super(ListL2gwConnection, self).get_parser(prog_name)
|
||||
nc_osc_utils.add_project_owner_option_to_parser(parser)
|
||||
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.neutronclient
|
||||
params = {}
|
||||
if parsed_args.project is not None:
|
||||
project_id = nc_osc_utils.find_project(
|
||||
self.app.client_manager.identity,
|
||||
parsed_args.project,
|
||||
parsed_args.project_domain,
|
||||
).id
|
||||
params['tenant_id'] = project_id
|
||||
objs = client.list(
|
||||
L2_GATEWAY_CONNECTIONS, object_path,
|
||||
retrieve_all=True, params=params)[L2_GATEWAY_CONNECTIONS]
|
||||
headers, columns = column_util.get_column_definitions(
|
||||
_attr_map, long_listing=True)
|
||||
return (headers, (osc_utils.get_dict_properties(
|
||||
s, columns) for s in objs))
|
||||
|
||||
|
||||
class ShowL2gwConnection(command.ShowOne):
|
||||
_description = _("Show information of a given l2gateway-connection")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super(ShowL2gwConnection, self).get_parser(prog_name)
|
||||
parser.add_argument(
|
||||
L2_GATEWAY_CONNECTION,
|
||||
metavar="<L2_GATEWAY_CONNECTION>",
|
||||
help=_("ID of l2_gateway_connection to look up."),
|
||||
)
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.neutronclient
|
||||
id = client.find_resource(L2_GATEWAY_CONNECTION,
|
||||
parsed_args.l2_gateway_connection)['id']
|
||||
obj = client.get(resource_path % id)[L2_GATEWAY_CONNECTION]
|
||||
columns, display_columns = column_util.get_columns(obj, _attr_map)
|
||||
data = osc_utils.get_dict_properties(obj, columns)
|
||||
return display_columns, data
|
||||
|
||||
|
||||
class DeleteL2gwConnection(command.Command):
|
||||
_description = _("Delete a given l2gateway-connection")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super(DeleteL2gwConnection, self).get_parser(prog_name)
|
||||
parser.add_argument(
|
||||
L2_GATEWAY_CONNECTIONS,
|
||||
metavar="<L2_GATEWAY_CONNECTIONS>",
|
||||
nargs="+",
|
||||
help=_("ID(s) of l2_gateway_connections(s) to delete."),
|
||||
)
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.neutronclient
|
||||
fails = 0
|
||||
for id_or_name in parsed_args.l2_gateway_connections:
|
||||
try:
|
||||
id = client.find_resource(
|
||||
L2_GATEWAY_CONNECTION, id_or_name)['id']
|
||||
client.delete(resource_path % id)
|
||||
LOG.warning("L2 Gateaway Connection %(id)s deleted",
|
||||
{'id': id})
|
||||
except Exception as e:
|
||||
fails += 1
|
||||
LOG.error("Failed to delete L2 Gateway Connection with name "
|
||||
"or ID '%(id_or_name)s': %(e)s",
|
||||
{'id_or_name': id_or_name, 'e': e})
|
||||
if fails > 0:
|
||||
msg = (_("Failed to delete %(fails)s of %(total)s L2 Gateway "
|
||||
"Connection.") %
|
||||
{'fails': fails, 'total': len(
|
||||
parsed_args.l2_gateway_connections)})
|
||||
raise exceptions.CommandError(msg)
|
@ -0,0 +1,91 @@
|
||||
# All Rights Reserved 2018
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import copy
|
||||
|
||||
from oslo_utils import uuidutils
|
||||
|
||||
from networking_l2gw.l2gatewayclient.osc import l2gw as osc_l2gw
|
||||
from networking_l2gw.l2gatewayclient.osc import l2gw_connection as \
|
||||
osc_l2gw_conn
|
||||
|
||||
|
||||
class FakeL2GW(object):
|
||||
|
||||
@staticmethod
|
||||
def create_l2gw(num_dev=1, num_if=1, attrs=None):
|
||||
"""Create one fake L2 Gateway."""
|
||||
attrs = attrs or {}
|
||||
interfaces = [{'name': 'interface' +
|
||||
uuidutils.generate_uuid(dashed=False)} for
|
||||
iface in range(num_if)]
|
||||
devices = [{'device_name': 'device' +
|
||||
uuidutils.generate_uuid(dashed=False),
|
||||
'interfaces': interfaces} for dev in range(num_dev)]
|
||||
l2gw_attrs = {
|
||||
'id': uuidutils.generate_uuid(),
|
||||
'name': 'test-l2gw' + uuidutils.generate_uuid(dashed=False),
|
||||
'tenant_id': uuidutils.generate_uuid(),
|
||||
'devices': devices
|
||||
}
|
||||
|
||||
l2gw_attrs.update(attrs)
|
||||
return copy.deepcopy(l2gw_attrs)
|
||||
|
||||
@staticmethod
|
||||
def create_l2gws(attrs=None, count=1):
|
||||
"""Create multiple fake L2 Gateways."""
|
||||
|
||||
l2gws = []
|
||||
for i in range(0, count):
|
||||
if attrs is None:
|
||||
attrs = {'id': 'fake_id%d' % i}
|
||||
elif getattr(attrs, 'id', None) is None:
|
||||
attrs['id'] = 'fake_id%d' % i
|
||||
l2gws.append(FakeL2GW.create_l2gw(attrs=attrs))
|
||||
|
||||
return {osc_l2gw.L2_GATEWAYS: l2gws}
|
||||
|
||||
|
||||
class FakeL2GWConnection(object):
|
||||
|
||||
@staticmethod
|
||||
def create_l2gw_connection(attrs=None):
|
||||
"""Create a fake l2gw connection."""
|
||||
|
||||
attrs = attrs or {}
|
||||
l2gw_connection_attrs = {
|
||||
'network_id': uuidutils.generate_uuid(),
|
||||
'l2_gateway_id': uuidutils.generate_uuid(),
|
||||
'segmentation_id': '42',
|
||||
'tenant_id': uuidutils.generate_uuid(),
|
||||
'id': uuidutils.generate_uuid()
|
||||
}
|
||||
|
||||
l2gw_connection_attrs.update(attrs)
|
||||
return copy.deepcopy(l2gw_connection_attrs)
|
||||
|
||||
@staticmethod
|
||||
def create_l2gw_connections(attrs=None, count=1):
|
||||
l2gw_connections = []
|
||||
|
||||
for i in range(0, count):
|
||||
if attrs is None:
|
||||
attrs = {'id': 'fake_id%d' % i}
|
||||
elif getattr(attrs, 'id', None) is None:
|
||||
attrs['id'] = 'fake_id%d' % i
|
||||
l2gw_connections.append(FakeL2GWConnection.create_l2gw_connection(
|
||||
attrs=attrs))
|
||||
|
||||
return {osc_l2gw_conn.L2_GATEWAY_CONNECTIONS: l2gw_connections}
|
@ -0,0 +1,502 @@
|
||||
# All Rights Reserved 2018
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import copy
|
||||
import operator
|
||||
|
||||
import mock
|
||||
from osc_lib import utils as osc_utils
|
||||
from osc_lib.utils import columns as column_util
|
||||
|
||||
from neutronclient.tests.unit.osc.v2 import fakes as test_fakes
|
||||
|
||||
from networking_l2gw.l2gatewayclient.osc import l2gw as osc_l2gw
|
||||
from networking_l2gw.tests.unit.l2gatewayclient.osc import fakes
|
||||
|
||||
|
||||
columns_long = tuple(col for col, _, listing_mode in osc_l2gw._attr_map
|
||||
if listing_mode in (column_util.LIST_BOTH,
|
||||
column_util.LIST_LONG_ONLY))
|
||||
headers_long = tuple(head for _, head, listing_mode in osc_l2gw._attr_map
|
||||
if listing_mode in (column_util.LIST_BOTH,
|
||||
column_util.LIST_LONG_ONLY))
|
||||
sorted_attr_map = sorted(osc_l2gw._attr_map, key=operator.itemgetter(1))
|
||||
sorted_columns = tuple(col for col, _, _ in sorted_attr_map)
|
||||
sorted_headers = tuple(head for _, head, _ in sorted_attr_map)
|
||||
|
||||
|
||||
def _get_data(attrs, columns=sorted_columns):
|
||||
return osc_utils.get_dict_properties(attrs, columns,
|
||||
formatters=osc_l2gw._formatters)
|
||||
|
||||
|
||||
class TestCreateL2gw(test_fakes.TestNeutronClientOSCV2):
|
||||
|
||||
columns = (
|
||||
'Devices',
|
||||
'ID',
|
||||
'Name',
|
||||
'Tenant'
|
||||
)
|
||||
|
||||
def setUp(self):
|
||||
super(TestCreateL2gw, self).setUp()
|
||||
self.cmd = osc_l2gw.CreateL2gw(self.app, self.namespace)
|
||||
|
||||
def _assert_create_succeeded(self, fake_l2gw, arg_list, verify_list):
|
||||
parsed_args = self.check_parser(self.cmd, arg_list, verify_list)
|
||||
columns, data = self.cmd.take_action(parsed_args)
|
||||
self.neutronclient.post.assert_called_once_with(
|
||||
osc_l2gw.object_path,
|
||||
{osc_l2gw.L2_GATEWAY:
|
||||
{'name': fake_l2gw['name'], 'devices': fake_l2gw['devices']}}
|
||||
)
|
||||
self.assertEqual(self.columns, columns)
|
||||
self.assertItemEqual(_get_data(fake_l2gw), data)
|
||||
|
||||
def test_create_l2gw(self):
|
||||
"""Test Create l2gateway."""
|
||||
|
||||
fake_l2gw = fakes.FakeL2GW.create_l2gw()
|
||||
|
||||
self.neutronclient.post = mock.Mock(
|
||||
return_value={osc_l2gw.L2_GATEWAY: fake_l2gw})
|
||||
l2gw_device = fake_l2gw['devices'][0]
|
||||
arg_list = [
|
||||
'--device', 'name=' + l2gw_device['device_name'] +
|
||||
',interface_names=' + l2gw_device['interfaces'][0]['name'],
|
||||
fake_l2gw['name']
|
||||
]
|
||||
|
||||
verify_list = [
|
||||
('devices', [
|
||||
{'interface_names': l2gw_device['interfaces'][0]['name'],
|
||||
'name': l2gw_device['device_name']}]),
|
||||
('name', fake_l2gw['name']),
|
||||
]
|
||||
|
||||
self._assert_create_succeeded(fake_l2gw, arg_list, verify_list)
|
||||
|
||||
def test_create_l2gateway_with_multiple_devices(self):
|
||||
"""Test Create l2gateway for multiple devices."""
|
||||
|
||||
fake_l2gw = fakes.FakeL2GW.create_l2gw(num_dev=2)
|
||||
|
||||
self.neutronclient.post = mock.Mock(
|
||||
return_value={osc_l2gw.L2_GATEWAY: fake_l2gw})
|
||||
l2gw_device_1 = fake_l2gw['devices'][0]
|
||||
l2gw_device_2 = fake_l2gw['devices'][1]
|
||||
arg_list = [
|
||||
'--device', 'name=' + l2gw_device_1['device_name'] +
|
||||
',interface_names=' +
|
||||
l2gw_device_1['interfaces'][0]['name'],
|
||||
'--device', 'name=' + l2gw_device_2['device_name'] +
|
||||
',interface_names=' +
|
||||
l2gw_device_2['interfaces'][0]['name'],
|
||||
fake_l2gw['name']
|
||||
]
|
||||
verify_list = [
|
||||
('devices', [
|
||||
{'interface_names': l2gw_device_1['interfaces'][0]['name'],
|
||||
'name': l2gw_device_1['device_name']},
|
||||
{'interface_names': l2gw_device_2['interfaces'][0]['name'],
|
||||
'name': l2gw_device_2['device_name']},
|
||||
]),
|
||||
('name', fake_l2gw['name']),
|
||||
]
|
||||
|
||||
self._assert_create_succeeded(fake_l2gw, arg_list, verify_list)
|
||||
|
||||
def test_create_l2gateway_with_multiple_interfaces(self):
|
||||
"""Test Create l2gateway with multiple interfaces."""
|
||||
|
||||
fake_l2gw = fakes.FakeL2GW.create_l2gw(num_if=2)
|
||||
|
||||
self.neutronclient.post = mock.Mock(
|
||||
return_value={osc_l2gw.L2_GATEWAY: fake_l2gw})
|
||||
l2gw_device = fake_l2gw['devices'][0]
|
||||
l2gw_interface_1 = l2gw_device['interfaces'][0]
|
||||
l2gw_interface_2 = l2gw_device['interfaces'][1]
|
||||
arg_list = [
|
||||
'--device', 'name=' + l2gw_device['device_name'] +
|
||||
',interface_names=' + l2gw_interface_1['name'] + ';' +
|
||||
l2gw_interface_2['name'],
|
||||
fake_l2gw['name']
|
||||
]
|
||||
verify_list = [
|
||||
('devices', [
|
||||
{
|
||||
'interface_names':
|
||||
l2gw_interface_1['name'] + ';' +
|
||||
l2gw_interface_2['name'],
|
||||
'name': l2gw_device['device_name']
|
||||
}
|
||||
]),
|
||||
('name', fake_l2gw['name']),
|
||||
]
|
||||
|
||||
self._assert_create_succeeded(fake_l2gw, arg_list, verify_list)
|
||||
|
||||
def test_create_l2gateway_with_segmentation_id(self):
|
||||
"""Test Create l2gateway with segmentation-id."""
|
||||
|
||||
fake_l2gw = fakes.FakeL2GW.create_l2gw()
|
||||
fake_l2gw['devices'][0]['interfaces'][0]['segmentation_id'] = ['42']
|
||||
|
||||
self.neutronclient.post = mock.Mock(
|
||||
return_value={osc_l2gw.L2_GATEWAY: fake_l2gw})
|
||||
l2gw_device = fake_l2gw['devices'][0]
|
||||
l2gw_interface = l2gw_device['interfaces'][0]
|
||||
arg_list = [
|
||||
'--device', 'name=' + l2gw_device['device_name'] +
|
||||
',interface_names=' + l2gw_interface['name'] + '|' +
|
||||
'#'.join(l2gw_interface['segmentation_id']),
|
||||
fake_l2gw['name']
|
||||
]
|
||||
|
||||
verify_list = [
|
||||
('devices', [
|
||||
{'interface_names': l2gw_device['interfaces'][0]['name'] +
|
||||
'|' + '#'.join(l2gw_interface['segmentation_id']),
|
||||
'name': l2gw_device['device_name']}]),
|
||||
('name', fake_l2gw['name']),
|
||||
]
|
||||
|
||||
self._assert_create_succeeded(fake_l2gw, arg_list, verify_list)
|
||||
|
||||
def test_create_l2gateway_with_mul_segmentation_id(self):
|
||||
"""Test Create l2gateway with multiple segmentation-ids."""
|
||||
|
||||
fake_l2gw = fakes.FakeL2GW.create_l2gw()
|
||||
fake_l2gw['devices'][0]['interfaces'][0]['segmentation_id'] = ['42',
|
||||
'43']
|
||||
|
||||
self.neutronclient.post = mock.Mock(
|
||||
return_value={osc_l2gw.L2_GATEWAY: fake_l2gw})
|
||||
l2gw_device = fake_l2gw['devices'][0]
|
||||
l2gw_interface = l2gw_device['interfaces'][0]
|
||||
arg_list = [
|
||||
'--device', 'name=' + l2gw_device['device_name'] +
|
||||
',interface_names=' + l2gw_interface['name'] + '|' +
|
||||
'#'.join(l2gw_interface['segmentation_id']),
|
||||
fake_l2gw['name'],
|
||||
]
|
||||
|
||||
verify_list = [
|
||||
('devices', [
|
||||
{'interface_names': l2gw_device['interfaces'][0]['name'] +
|
||||
'|' + '#'.join(l2gw_interface['segmentation_id']),
|
||||
'name': l2gw_device['device_name']}]),
|
||||
('name', fake_l2gw['name']),
|
||||
]
|
||||
|
||||
self._assert_create_succeeded(fake_l2gw, arg_list, verify_list)
|
||||
|
||||
|
||||
class TestListL2gw(test_fakes.TestNeutronClientOSCV2):
|
||||
def setUp(self):
|
||||
super(TestListL2gw, self).setUp()
|
||||
self.cmd = osc_l2gw.ListL2gw(self.app, self.namespace)
|
||||
|
||||
def test_list_l2gateway(self):
|
||||
"""Test List l2gateways."""
|
||||
|
||||
fake_l2gws = fakes.FakeL2GW.create_l2gws(count=4)
|
||||
self.neutronclient.list = mock.Mock(return_value=fake_l2gws)
|
||||
arg_list = []
|
||||
verify_list = []
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, arg_list, verify_list)
|
||||
|
||||
headers, data = self.cmd.take_action(parsed_args)
|
||||
|
||||
self.neutronclient.list.assert_called_once()
|
||||
self.assertEqual(headers, list(headers_long))
|
||||
self.assertListItemEqual(
|
||||
list(data),
|
||||
[_get_data(fake_l2gw, columns_long) for fake_l2gw
|
||||
in fake_l2gws[osc_l2gw.L2_GATEWAYS]]
|
||||
)
|
||||
|
||||
|
||||
class TestDeleteL2gw(test_fakes.TestNeutronClientOSCV2):
|
||||
def setUp(self):
|
||||
super(TestDeleteL2gw, self).setUp()
|
||||
self.neutronclient.find_resource = mock.Mock(
|
||||
side_effect=lambda _, name_or_id: {'id': name_or_id})
|
||||
self.cmd = osc_l2gw.DeleteL2gw(self.app, self.namespace)
|
||||
|
||||
def test_delete_l2gateway(self):
|
||||
"""Test Delete l2gateway."""
|
||||
|
||||
fake_l2gw = fakes.FakeL2GW.create_l2gw()
|
||||
self.neutronclient.delete = mock.Mock()
|
||||
|
||||
arg_list = [
|
||||
fake_l2gw['id'],
|
||||
]
|
||||
verify_list = [
|
||||
(osc_l2gw.L2_GATEWAYS, [fake_l2gw['id']]),
|
||||
]
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, arg_list, verify_list)
|
||||
|
||||
result = self.cmd.take_action(parsed_args)
|
||||
|
||||
self.neutronclient.delete.assert_called_once_with(
|
||||
osc_l2gw.resource_path % fake_l2gw['id'])
|
||||
self.assertIsNone(result)
|
||||
|
||||
|
||||
class TestShowL2gw(test_fakes.TestNeutronClientOSCV2):
|
||||
def setUp(self):
|
||||
super(TestShowL2gw, self).setUp()
|
||||
self.neutronclient.find_resource = mock.Mock(
|
||||
side_effect=lambda _, name_or_id: {'id': name_or_id})
|
||||
self.cmd = osc_l2gw.ShowL2gw(self.app, self.namespace)
|
||||
|
||||
def test_show_l2gateway(self):
|
||||
"""Test Show l2gateway: --fields id --fields name myid."""
|
||||
|
||||
fake_l2gw = fakes.FakeL2GW.create_l2gw()
|
||||
self.neutronclient.get = mock.Mock(
|
||||
return_value={osc_l2gw.L2_GATEWAY: fake_l2gw})
|
||||
arg_list = [
|
||||
fake_l2gw['id'],
|
||||
]
|
||||
verify_list = [
|
||||
(osc_l2gw.L2_GATEWAY, fake_l2gw['id']),
|
||||
]
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, arg_list, verify_list)
|
||||
|
||||
headers, data = self.cmd.take_action(parsed_args)
|
||||
|
||||
self.neutronclient.get.assert_called_once_with(
|
||||
osc_l2gw.resource_path % fake_l2gw['id'])
|
||||
self.assertEqual(sorted_headers, headers)
|
||||
self.assertItemEqual(_get_data(fake_l2gw), data)
|
||||
|
||||
|
||||
class TestUpdateL2gw(test_fakes.TestNeutronClientOSCV2):
|
||||
_new_device_name = 'new_device'
|
||||
_new_interface = 'new_interface'
|
||||
_new_name = 'new_name'
|
||||
|
||||
columns = (
|
||||
'Devices',
|
||||
'ID',
|
||||
'Name',
|
||||
'Tenant'
|
||||
)
|
||||
|
||||
def setUp(self):
|
||||
super(TestUpdateL2gw, self).setUp()
|
||||
self.cmd = osc_l2gw.UpdateL2gw(self.app, self.namespace)
|
||||
self.neutronclient.find_resource = mock.Mock(
|
||||
side_effect=lambda _, name_or_id: {'id': name_or_id})
|
||||
|
||||
def _assert_update_succeeded(self, new_l2gw, attrs, columns, data):
|
||||
self.neutronclient.put.assert_called_once_with(
|
||||
osc_l2gw.resource_path % new_l2gw['id'],
|
||||
{osc_l2gw.L2_GATEWAY: attrs})
|
||||
self.assertEqual(self.columns, columns)
|
||||
self.assertItemEqual(_get_data(new_l2gw), data)
|
||||
|
||||
def test_update_l2gateway(self):
|
||||
"""Test Update l2gateway."""
|
||||
|
||||
fake_l2gw = fakes.FakeL2GW.create_l2gw()
|
||||
new_l2gw = copy.deepcopy(fake_l2gw)
|
||||
new_l2gw['name'] = self._new_name
|
||||
new_l2gw['devices'][0]['device_name'] = self._new_device_name
|
||||
new_l2gw['devices'][0]['interfaces'][0]['name'] = self._new_interface
|
||||
|
||||
self.neutronclient.put = mock.Mock(
|
||||
return_value={osc_l2gw.L2_GATEWAY: new_l2gw})
|
||||
|
||||
arg_list = [
|
||||
fake_l2gw['id'],
|
||||
'--name', self._new_name,
|
||||
'--device', 'name=' + self._new_device_name +
|
||||
',interface_names=' + self._new_interface,
|
||||
]
|
||||
verify_list = [
|
||||
('name', self._new_name),
|
||||
('devices', [
|
||||
{'interface_names': self._new_interface,
|
||||
'name': self._new_device_name}]),
|
||||
]
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, arg_list, verify_list)
|
||||
columns, data = self.cmd.take_action(parsed_args)
|
||||
attrs = {
|
||||
'name': self._new_name,
|
||||
'devices': [
|
||||
{'interfaces': [
|
||||
{'name': self._new_interface}],
|
||||
'device_name': self._new_device_name}
|
||||
]
|
||||
}
|
||||
|
||||
self._assert_update_succeeded(new_l2gw, attrs, columns, data)
|
||||
|
||||
def test_update_l2gateway_name(self):
|
||||
"""Test Update l2gateway name."""
|
||||
|
||||
fake_l2gw = fakes.FakeL2GW.create_l2gw()
|
||||
new_l2gw = copy.deepcopy(fake_l2gw)
|
||||
new_l2gw['name'] = self._new_name
|
||||
|
||||
self.neutronclient.put = mock.Mock(
|
||||
return_value={osc_l2gw.L2_GATEWAY: new_l2gw})
|
||||
|
||||
arg_list = [
|
||||
fake_l2gw['id'],
|
||||
'--name', self._new_name,
|
||||
]
|
||||
verify_list = [('name', self._new_name)]
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, arg_list, verify_list)
|
||||
columns, data = self.cmd.take_action(parsed_args)
|
||||
attrs = {'name': self._new_name}
|
||||
|
||||
self._assert_update_succeeded(new_l2gw, attrs, columns, data)
|
||||
|
||||
def test_update_l2gateway_with_multiple_interfaces(self):
|
||||
"""Test Update l2gateway with multiple interfaces."""
|
||||
|
||||
fake_l2gw = fakes.FakeL2GW.create_l2gw()
|
||||
new_l2gw = copy.deepcopy(fake_l2gw)
|
||||
new_l2gw['devices'][0]['interfaces'].append(
|
||||
{'name': self._new_interface})
|
||||
|
||||
self.neutronclient.put = mock.Mock(
|
||||
return_value={osc_l2gw.L2_GATEWAY: new_l2gw})
|
||||
|
||||
l2gw_device = new_l2gw['devices'][0]
|
||||
l2gw_interface_1 = l2gw_device['interfaces'][0]
|
||||
l2gw_interface_2 = l2gw_device['interfaces'][1]
|
||||
arg_list = [
|
||||
fake_l2gw['id'],
|
||||
'--device', 'name=' + l2gw_device['device_name'] +
|
||||
',interface_names=' + l2gw_interface_1['name'] + ';' +
|
||||
l2gw_interface_2['name']
|
||||
]
|
||||
|
||||
verify_list = [
|
||||
('devices', [
|
||||
{
|
||||
'interface_names':
|
||||
l2gw_interface_1['name'] + ';' +
|
||||
l2gw_interface_2['name'],
|
||||
'name': l2gw_device['device_name']
|
||||
}
|
||||
]),
|
||||
]
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, arg_list, verify_list)
|
||||
columns, data = self.cmd.take_action(parsed_args)
|
||||
attrs = {
|
||||
'devices': [
|
||||
{'device_name': l2gw_device['device_name'],
|
||||
'interfaces': [
|
||||
{'name': l2gw_interface_1['name']},
|
||||
{'name': self._new_interface}]}
|
||||
]
|
||||
}
|
||||
|
||||
self._assert_update_succeeded(new_l2gw, attrs, columns, data)
|
||||
|
||||
def test_update_l2gateway_with_segmentation_id(self):
|
||||
"""Test Update l2gateway with segmentation-id."""
|
||||
|
||||
fake_l2gw = fakes.FakeL2GW.create_l2gw()
|
||||
|
||||
new_l2gw = copy.deepcopy(fake_l2gw)
|
||||
new_l2gw['devices'][0]['interfaces'][0]['segmentation_id'] = ['42']
|
||||
|
||||
self.neutronclient.put = mock.Mock(
|
||||
return_value={osc_l2gw.L2_GATEWAY: new_l2gw})
|
||||
|
||||
l2gw_device = new_l2gw['devices'][0]
|
||||
l2gw_interface = l2gw_device['interfaces'][0]
|
||||
arg_list = [
|
||||
fake_l2gw['id'],
|
||||
'--device', 'name=' + l2gw_device['device_name'] +
|
||||
',interface_names=' + l2gw_interface['name'] + '|' +
|
||||
'#'.join(l2gw_interface['segmentation_id']),
|
||||
]
|
||||
|
||||
verify_list = [
|
||||
('devices', [
|
||||
{'interface_names': l2gw_device['interfaces'][0]['name'] +
|
||||
'|' + '#'.join(l2gw_interface['segmentation_id']),
|
||||
'name': l2gw_device['device_name']}]),
|
||||
]
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, arg_list, verify_list)
|
||||
columns, data = self.cmd.take_action(parsed_args)
|
||||
attrs = {
|
||||
'devices': [
|
||||
{'device_name': l2gw_device['device_name'],
|
||||
'interfaces': [
|
||||
{'name': l2gw_interface['name'],
|
||||
'segmentation_id': l2gw_interface['segmentation_id']}]
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
self._assert_update_succeeded(new_l2gw, attrs, columns, data)
|
||||
|
||||
def test_update_l2gateway_with_mul_segmentation_ids(self):
|
||||
"""Test Update l2gateway with multiple segmentation-ids."""
|
||||
|
||||
fake_l2gw = fakes.FakeL2GW.create_l2gw()
|
||||
|
||||
new_l2gw = copy.deepcopy(fake_l2gw)
|
||||
new_l2gw['devices'][0]['interfaces'][0]['segmentation_id'] = ['42',
|
||||
'43']
|
||||
self.neutronclient.put = mock.Mock(
|
||||
return_value={osc_l2gw.L2_GATEWAY: new_l2gw})
|
||||
|
||||
l2gw_device = new_l2gw['devices'][0]
|
||||
l2gw_interface = l2gw_device['interfaces'][0]
|
||||
arg_list = [
|
||||
fake_l2gw['id'],
|
||||
'--device', 'name=' + l2gw_device['device_name'] +
|
||||
',interface_names=' + l2gw_interface['name'] + '|' +
|
||||
'#'.join(l2gw_interface['segmentation_id']),
|
||||
]
|
||||
|
||||
verify_list = [
|
||||
('devices', [
|
||||
{'interface_names': l2gw_device['interfaces'][0]['name'] +
|
||||
'|' + '#'.join(l2gw_interface['segmentation_id']),
|
||||
'name': l2gw_device['device_name']}]),
|
||||
]
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, arg_list, verify_list)
|
||||
columns, data = self.cmd.take_action(parsed_args)
|
||||
attrs = {
|
||||
'devices': [
|
||||
{'device_name': l2gw_device['device_name'],
|
||||
'interfaces': [
|
||||
{'name': l2gw_interface['name'],
|
||||
'segmentation_id': l2gw_interface['segmentation_id']}]
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
self._assert_update_succeeded(new_l2gw, attrs, columns, data)
|
@ -0,0 +1,173 @@
|
||||
# All Rights Reserved 2018
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import operator
|
||||
|
||||
import mock
|
||||
from osc_lib import utils as osc_utils
|
||||
from osc_lib.utils import columns as column_util
|
||||
|
||||
from neutronclient.tests.unit.osc.v2 import fakes as test_fakes
|
||||
|
||||
from networking_l2gw.l2gatewayclient.osc import l2gw_connection as \
|
||||
osc_l2gw_conn
|
||||
from networking_l2gw.tests.unit.l2gatewayclient.osc import fakes
|
||||
|
||||
columns_long = tuple(col for col, _, listing_mode in osc_l2gw_conn._attr_map
|
||||
if listing_mode in (column_util.LIST_BOTH,
|
||||
column_util.LIST_LONG_ONLY))
|
||||
headers_long = tuple(head for _, head, listing_mode in osc_l2gw_conn._attr_map
|
||||
if listing_mode in (column_util.LIST_BOTH,
|
||||
column_util.LIST_LONG_ONLY))
|
||||
sorted_attr_map = sorted(osc_l2gw_conn._attr_map, key=operator.itemgetter(1))
|
||||
sorted_columns = tuple(col for col, _, _ in sorted_attr_map)
|
||||
sorted_headers = tuple(head for _, head, _ in sorted_attr_map)
|
||||
|
||||
|
||||
def _get_data(attrs, columns=sorted_columns):
|
||||
return osc_utils.get_dict_properties(attrs, columns)
|
||||
|
||||
|
||||
class TestCreateL2gwConnection(test_fakes.TestNeutronClientOSCV2):
|
||||
columns = (
|
||||
'ID',
|
||||
'L2 GateWay ID',
|
||||
'Network ID',
|
||||
'Segmentation ID',
|
||||
'Tenant'
|
||||
)
|
||||
|
||||
def setUp(self):
|
||||
super(TestCreateL2gwConnection, self).setUp()
|
||||
self.cmd = osc_l2gw_conn.CreateL2gwConnection(
|
||||
self.app, self.namespace)
|
||||
self.neutronclient.find_resource = mock.Mock(
|
||||
side_effect=lambda _, name_or_id, *x: {'id': name_or_id})
|
||||
|
||||
def test_create_l2gateway_connection(self):
|
||||
"""Test Create l2gateway-connection."""
|
||||
|
||||
fake_connection = fakes.FakeL2GWConnection.create_l2gw_connection()
|
||||
self.neutronclient.post = mock.Mock(
|
||||
return_value={
|
||||
osc_l2gw_conn.L2_GATEWAY_CONNECTION: fake_connection
|
||||
})
|
||||
|
||||
arg_list = [
|
||||
fake_connection['l2_gateway_id'],
|
||||
fake_connection['network_id'],
|
||||
'--default-segmentation-id', fake_connection['segmentation_id']
|
||||
]
|
||||
verify_list = [
|
||||
('gateway_name', fake_connection['l2_gateway_id']),
|
||||
('network', fake_connection['network_id']),
|
||||
('seg_id', fake_connection['segmentation_id'])
|
||||
]
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, arg_list, verify_list)
|
||||
columns, data = self.cmd.take_action(parsed_args)
|
||||
self.neutronclient.post.assert_called_once_with(
|
||||
osc_l2gw_conn.object_path,
|
||||
{osc_l2gw_conn.L2_GATEWAY_CONNECTION:
|
||||
{'segmentation_id': fake_connection['segmentation_id'],
|
||||
'network_id': fake_connection['network_id'],
|
||||
'l2_gateway_id': fake_connection['l2_gateway_id']
|
||||
}
|
||||
}
|
||||
)
|
||||
self.assertEqual(self.columns, columns)
|
||||
self.assertItemEqual(_get_data(fake_connection), data)
|
||||
|
||||
|
||||
class TestListL2gwConnection(test_fakes.TestNeutronClientOSCV2):
|
||||
def setUp(self):
|
||||
super(TestListL2gwConnection, self).setUp()
|
||||
self.cmd = osc_l2gw_conn.ListL2gwConnection(self.app, self.namespace)
|
||||
|
||||
def test_list_l2gateway_connection(self):
|
||||
"""Test List l2gateway-connections."""
|
||||
|
||||
fake_connections = fakes.FakeL2GWConnection.create_l2gw_connections(
|
||||
count=3)
|
||||
self.neutronclient.list = mock.Mock(return_value=fake_connections)
|
||||
arg_list = []
|
||||
verify_list = []
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, arg_list, verify_list)
|
||||
|
||||
headers, data = self.cmd.take_action(parsed_args)
|
||||
|
||||
self.neutronclient.list.assert_called_once()
|
||||
self.assertEqual(headers, list(headers_long))
|
||||
self.assertListItemEqual(
|
||||
list(data),
|
||||
[_get_data(fake_connection, columns_long) for fake_connection
|
||||
in fake_connections[osc_l2gw_conn.L2_GATEWAY_CONNECTIONS]]
|
||||
)
|
||||
|
||||
|
||||
class TestShowL2gwConnection(test_fakes.TestNeutronClientOSCV2):
|
||||
def setUp(self):
|
||||
super(TestShowL2gwConnection, self).setUp()
|
||||
self.cmd = osc_l2gw_conn.ShowL2gwConnection(self.app, self.namespace)
|
||||
self.neutronclient.find_resource = mock.Mock(
|
||||
side_effect=lambda _, name_or_id, *x: {'id': name_or_id})
|
||||
|
||||
def test_show_l2gateway_connection(self):
|
||||
"""Test Show l2gateway-connection."""
|
||||
|
||||
fake_connection = fakes.FakeL2GWConnection.create_l2gw_connection()
|
||||
self.neutronclient.get = mock.Mock(
|
||||
return_value={
|
||||
osc_l2gw_conn.L2_GATEWAY_CONNECTION: fake_connection
|
||||
})
|
||||
arg_list = [fake_connection['id']]
|
||||
verify_list = [
|
||||
(osc_l2gw_conn.L2_GATEWAY_CONNECTION, fake_connection['id'])
|
||||
]
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, arg_list, verify_list)
|
||||
|
||||
headers, data = self.cmd.take_action(parsed_args)
|
||||
|
||||
self.neutronclient.get.assert_called_once_with(
|
||||
osc_l2gw_conn.resource_path % fake_connection['id'])
|
||||
self.assertEqual(sorted_headers, headers)
|
||||
self.assertItemEqual(_get_data(fake_connection), data)
|
||||
|
||||
|
||||
class TestDeleteL2gwConnection(test_fakes.TestNeutronClientOSCV2):
|
||||
def setUp(self):
|
||||
super(TestDeleteL2gwConnection, self).setUp()
|
||||
self.neutronclient.find_resource = mock.Mock(
|
||||
side_effect=lambda _, name_or_id: {'id': name_or_id})
|
||||
self.cmd = osc_l2gw_conn.DeleteL2gwConnection(self.app, self.namespace)
|
||||
|
||||
def test_delete_l2gateway_connection(self):
|
||||
"""Test Delete l2gateway-connection."""
|
||||
|
||||
fake_connection = fakes.FakeL2GWConnection.create_l2gw_connection()
|
||||
self.neutronclient.delete = mock.Mock(return_value=fake_connection)
|
||||
arg_list = [fake_connection['id']]
|
||||
verify_list = [
|
||||
(osc_l2gw_conn.L2_GATEWAY_CONNECTIONS, [fake_connection['id']])
|
||||
]
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, arg_list, verify_list)
|
||||
|
||||
result = self.cmd.take_action(parsed_args)
|
||||
|
||||
self.neutronclient.delete.assert_called_once_with(
|
||||
osc_l2gw_conn.resource_path % fake_connection['id'])
|
||||
self.assertIsNone(result)
|
Loading…
Reference in new issue