11 changed files with 1251 additions and 10 deletions
@ -0,0 +1,256 @@
|
||||
# All Rights Reserved 2018 |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may |
||||
# not use this file except in compliance with the License. You may obtain |
||||
# a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
||||
# License for the specific language governing permissions and limitations |
||||
# under the License. |
||||
|
||||
import logging |
||||
|
||||
from osc_lib.cli import format_columns |
||||
from osc_lib.command import command |
||||
from osc_lib import exceptions |
||||
from osc_lib import utils as osc_utils |
||||
from osc_lib.utils import columns as column_util |
||||
|
||||
from neutronclient._i18n import _ |
||||
from neutronclient.common import utils |
||||
from neutronclient.osc import utils as nc_osc_utils |
||||
|
||||
LOG = logging.getLogger(__name__) |
||||
|
||||
INTERFACE_DELIMITER = ";" |
||||
SEGMENTATION_ID_DELIMITER = "#" |
||||
INTERFACE_SEG_ID_DELIMITER = "|" |
||||
|
||||
L2_GATEWAY = 'l2_gateway' |
||||
L2_GATEWAYS = '%ss' % L2_GATEWAY |
||||
|
||||
path = 'l2-gateways' |
||||
object_path = '/%s' % path |
||||
resource_path = '/%s/%%s' % path |
||||
|
||||
_attr_map = ( |
||||
('id', 'ID', column_util.LIST_BOTH), |
||||
('tenant_id', 'Tenant', column_util.LIST_LONG_ONLY), |
||||
('name', 'Name', column_util.LIST_BOTH), |
||||
('devices', 'Devices', column_util.LIST_BOTH), |
||||
) |
||||
_formatters = { |
||||
'devices': format_columns.ListDictColumn, |
||||
} |
||||
|
||||
|
||||
def _get_common_parser(parser): |
||||
"""Adds to parser arguments common to create and update commands. |
||||
|
||||
:params ArgumentParser parser: argparse object contains all command's |
||||
arguments |
||||
""" |
||||
parser.add_argument( |
||||
'--device', |
||||
metavar='name=name,interface_names=INTERFACE-DETAILS', |
||||
action='append', dest='devices', type=utils.str2dict, |
||||
help=_('Device name and Interface-names of l2gateway. ' |
||||
'INTERFACE-DETAILS is of form ' |
||||
'\"<interface_name1>;[<interface_name2>]' |
||||
'[|<seg_id1>[#<seg_id2>]]\" ' |
||||
'(--device option can be repeated)')) |
||||
|
||||
|
||||
def get_interface(interfaces): |
||||
interface_dict = [] |
||||
for interface in interfaces: |
||||
if INTERFACE_SEG_ID_DELIMITER in interface: |
||||
int_name = interface.split(INTERFACE_SEG_ID_DELIMITER)[0] |
||||
segid = interface.split(INTERFACE_SEG_ID_DELIMITER)[1] |
||||
if SEGMENTATION_ID_DELIMITER in segid: |
||||
segid = segid.split(SEGMENTATION_ID_DELIMITER) |
||||
else: |
||||
segid = [segid] |
||||
interface_detail = {'name': int_name, 'segmentation_id': segid} |
||||
else: |
||||
interface_detail = {'name': interface} |
||||
interface_dict.append(interface_detail) |
||||
return interface_dict |
||||
|
||||
|
||||
def _args2body(parsed_args, update=False): |
||||
if parsed_args.devices: |
||||
devices = parsed_args.devices |
||||
interfaces = [] |
||||
else: |
||||
devices = [] |
||||
device_dict = [] |
||||
for device in devices: |
||||
if 'interface_names' in device.keys(): |
||||
interface = device['interface_names'] |
||||
if INTERFACE_DELIMITER in interface: |
||||
interface_dict = interface.split(INTERFACE_DELIMITER) |
||||
interfaces = get_interface(interface_dict) |
||||
else: |
||||
interfaces = get_interface([interface]) |
||||
if 'name' in device.keys(): |
||||
device = {'device_name': device['name'], |
||||
'interfaces': interfaces} |
||||
else: |
||||
device = {'interfaces': interfaces} |
||||
device_dict.append(device) |
||||
if parsed_args.name: |
||||
l2gw_name = parsed_args.name |
||||
body = {L2_GATEWAY: {'name': l2gw_name, |
||||
'devices': device_dict}, } |
||||
else: |
||||
body = {L2_GATEWAY: {'devices': device_dict}, } |
||||
return body |
||||
|
||||
|
||||
class CreateL2gw(command.ShowOne): |
||||
_description = _("Create l2gateway resource") |
||||
|
||||
def get_parser(self, prog_name): |
||||
parser = super(CreateL2gw, self).get_parser(prog_name) |
||||
nc_osc_utils.add_project_owner_option_to_parser(parser) |
||||
parser.add_argument( |
||||
'name', metavar='<GATEWAY-NAME>', |
||||
help=_('Descriptive name for logical gateway.')) |
||||
_get_common_parser(parser) |
||||
return parser |
||||
|
||||
def take_action(self, parsed_args): |
||||
client = self.app.client_manager.neutronclient |
||||
attrs = {} |
||||
if parsed_args.name is not None: |
||||
attrs['name'] = str(parsed_args.name) |
||||
if parsed_args.devices is not None: |
||||
attrs['devices'] = str(parsed_args.devices) |
||||
if 'project' in parsed_args and parsed_args.project is not None: |
||||
project_id = nc_osc_utils.find_project( |
||||
self.app.client_manager.identity, |
||||
parsed_args.project, |
||||
parsed_args.project_domain, |
||||
).id |
||||
attrs['tenant_id'] = project_id |
||||
body = _args2body(parsed_args) |
||||
obj = client.post(object_path, body)[L2_GATEWAY] |
||||
columns, display_columns = column_util.get_columns(obj, _attr_map) |
||||
data = osc_utils.get_dict_properties(obj, columns, |
||||
formatters=_formatters) |
||||
return display_columns, data |
||||
|
||||
|
||||
class ListL2gw(command.Lister): |
||||
_description = _("List l2gateway that belongs to a given tenant") |
||||
|
||||
def get_parser(self, prog_name): |
||||
parser = super(ListL2gw, self).get_parser(prog_name) |
||||
nc_osc_utils.add_project_owner_option_to_parser(parser) |
||||
|
||||
return parser |
||||
|
||||
def take_action(self, parsed_args): |
||||
client = self.app.client_manager.neutronclient |
||||
params = {} |
||||
if parsed_args.project is not None: |
||||
project_id = nc_osc_utils.find_project( |
||||
self.app.client_manager.identity, |
||||
parsed_args.project, |
||||
parsed_args.project_domain, |
||||
).id |
||||
params['tenant_id'] = project_id |
||||
objs = client.list(L2_GATEWAYS, object_path, |
||||
retrieve_all=True, params=params)[L2_GATEWAYS] |
||||
headers, columns = column_util.get_column_definitions( |
||||
_attr_map, long_listing=True) |
||||
return (headers, (osc_utils.get_dict_properties( |
||||
s, columns, formatters=_formatters) for s in objs)) |
||||
|
||||
|
||||
class ShowL2gw(command.ShowOne): |
||||
_description = _("Show information of a given l2gateway") |
||||
|
||||
def get_parser(self, prog_name): |
||||
parser = super(ShowL2gw, self).get_parser(prog_name) |
||||
parser.add_argument( |
||||
L2_GATEWAY, |
||||
metavar="<L2_GATEWAY>", |
||||
help=_("ID or name of l2_gateway to look up."), |
||||
) |
||||
return parser |
||||
|
||||
def take_action(self, parsed_args): |
||||
client = self.app.client_manager.neutronclient |
||||
id = client.find_resource(L2_GATEWAY, parsed_args.l2_gateway)['id'] |
||||
obj = client.get(resource_path % id)[L2_GATEWAY] |
||||
columns, display_columns = column_util.get_columns(obj, _attr_map) |
||||
data = osc_utils.get_dict_properties(obj, columns, |
||||
formatters=_formatters) |
||||
return display_columns, data |
||||
|
||||
|
||||
class DeleteL2gw(command.Command): |
||||
_description = _("Delete a given l2gateway") |
||||
|
||||
def get_parser(self, prog_name): |
||||
parser = super(DeleteL2gw, self).get_parser(prog_name) |
||||
parser.add_argument( |
||||
L2_GATEWAYS, |
||||
metavar="<L2_GATEWAY>", |
||||
nargs="+", |
||||
help=_("ID(s) or name(s) of l2_gateway to delete."), |
||||
) |
||||
return parser |
||||
|
||||
def take_action(self, parsed_args): |
||||
client = self.app.client_manager.neutronclient |
||||
fails = 0 |
||||
for id_or_name in parsed_args.l2_gateways: |
||||
try: |
||||
id = client.find_resource(L2_GATEWAY, id_or_name)['id'] |
||||
client.delete(resource_path % id) |
||||
LOG.warning("L2 Gateaway %(id)s deleted", {'id': id}) |
||||
except Exception as e: |
||||
fails += 1 |
||||
LOG.error("Failed to delete L2 Gateway with name or ID " |
||||
"'%(id_or_name)s': %(e)s", |
||||
{'id_or_name': id_or_name, 'e': e}) |
||||
if fails > 0: |
||||
msg = (_("Failed to delete %(fails)s of %(total)s L2 Gateway.") % |
||||
{'fails': fails, 'total': len(parsed_args.l2_gateways)}) |
||||
raise exceptions.CommandError(msg) |
||||
|
||||
|
||||
class UpdateL2gw(command.ShowOne): |
||||
_description = _("Update a given l2gateway") |
||||
|
||||
def get_parser(self, prog_name): |
||||
parser = super(UpdateL2gw, self).get_parser(prog_name) |
||||
parser.add_argument( |
||||
L2_GATEWAY, |
||||
metavar="<L2_GATEWAY>", |
||||
help=_("ID or name of l2_gateway to update."), |
||||
) |
||||
parser.add_argument('--name', metavar='name', |
||||
help=_('Descriptive name for logical gateway.')) |
||||
_get_common_parser(parser) |
||||
return parser |
||||
|
||||
def take_action(self, parsed_args): |
||||
client = self.app.client_manager.neutronclient |
||||
id = client.find_resource(L2_GATEWAY, parsed_args.l2_gateway)['id'] |
||||
if parsed_args.devices: |
||||
body = _args2body(parsed_args) |
||||
else: |
||||
body = {L2_GATEWAY: {'name': parsed_args.name}} |
||||
obj = client.put(resource_path % id, body)[L2_GATEWAY] |
||||
columns, display_columns = column_util.get_columns(obj, _attr_map) |
||||
data = osc_utils.get_dict_properties(obj, columns, |
||||
formatters=_formatters) |
||||
return display_columns, data |
@ -0,0 +1,170 @@
|
||||
# All Rights Reserved 2018 |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may |
||||
# not use this file except in compliance with the License. You may obtain |
||||
# a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
||||
# License for the specific language governing permissions and limitations |
||||
# under the License. |
||||
|
||||
import logging |
||||
|
||||
from osc_lib.command import command |
||||
from osc_lib import exceptions |
||||
from osc_lib import utils as osc_utils |
||||
from osc_lib.utils import columns as column_util |
||||
|
||||
from neutronclient._i18n import _ |
||||
from neutronclient.neutron import v2_0 as n_v20 |
||||
from neutronclient.osc import utils as nc_osc_utils |
||||
|
||||
LOG = logging.getLogger(__name__) |
||||
|
||||
L2_GATEWAY_CONNECTION = 'l2_gateway_connection' |
||||
L2_GATEWAY_CONNECTIONS = '%ss' % L2_GATEWAY_CONNECTION |
||||
|
||||
path = 'l2-gateway-connections' |
||||
object_path = '/%s' % path |
||||
resource_path = '/%s/%%s' % path |
||||
|
||||
_attr_map = ( |
||||
('id', 'ID', column_util.LIST_BOTH), |
||||
('tenant_id', 'Tenant', column_util.LIST_LONG_ONLY), |
||||
('l2_gateway_id', 'L2 GateWay ID', column_util.LIST_BOTH), |
||||
('network_id', 'Network ID', column_util.LIST_BOTH), |
||||
('segmentation_id', 'Segmentation ID', column_util.LIST_BOTH), |
||||
) |
||||
|
||||
|
||||
class CreateL2gwConnection(command.ShowOne): |
||||
_description = _("Create l2gateway-connection") |
||||
|
||||
def retrieve_ids(self, client, args): |
||||
gateway_id = n_v20.find_resourceid_by_name_or_id( |
||||
client, 'l2_gateway', args.gateway_name) |
||||
network_id = n_v20.find_resourceid_by_name_or_id( |
||||
client, 'network', args.network) |
||||
return gateway_id, network_id |
||||
|
||||
def get_parser(self, prog_name): |
||||
parser = super(CreateL2gwConnection, self).get_parser(prog_name) |
||||
parser.add_argument('gateway_name', metavar='<GATEWAY-NAME/UUID>', |
||||
help=_('Descriptive name for logical gateway.')) |
||||
parser.add_argument( |
||||
'network', metavar='<NETWORK-NAME/UUID>', |
||||
help=_('Network name or uuid.')) |
||||
parser.add_argument( |
||||
'--default-segmentation-id', |
||||
dest='seg_id', |
||||
help=_('default segmentation-id that will ' |
||||
'be applied to the interfaces for which ' |
||||
'segmentation id was not specified ' |
||||
'in l2-gateway-create command.')) |
||||
return parser |
||||
|
||||
def take_action(self, parsed_args): |
||||
client = self.app.client_manager.neutronclient |
||||
(gateway_id, network_id) = self.retrieve_ids(client, parsed_args) |
||||
body = { |
||||
L2_GATEWAY_CONNECTION: { |
||||
'l2_gateway_id': gateway_id, 'network_id': network_id |
||||
} |
||||
} |
||||
if parsed_args.seg_id: |
||||
body[L2_GATEWAY_CONNECTION]['segmentation_id'] = \ |
||||
parsed_args.seg_id |
||||
obj = client.post(object_path, body)[L2_GATEWAY_CONNECTION] |
||||
columns, display_columns = column_util.get_columns(obj, _attr_map) |
||||
data = osc_utils.get_dict_properties(obj, columns) |
||||
return display_columns, data |
||||
|
||||
|
||||
class ListL2gwConnection(command.Lister): |
||||
_description = _("List l2gateway-connections") |
||||
|
||||
def get_parser(self, prog_name): |
||||
parser = super(ListL2gwConnection, self).get_parser(prog_name) |
||||
nc_osc_utils.add_project_owner_option_to_parser(parser) |
||||
|
||||
return parser |
||||
|
||||
def take_action(self, parsed_args): |
||||
client = self.app.client_manager.neutronclient |
||||
params = {} |
||||
if parsed_args.project is not None: |
||||
project_id = nc_osc_utils.find_project( |
||||
self.app.client_manager.identity, |
||||
parsed_args.project, |
||||
parsed_args.project_domain, |
||||
).id |
||||
params['tenant_id'] = project_id |
||||
objs = client.list( |
||||
L2_GATEWAY_CONNECTIONS, object_path, |
||||
retrieve_all=True, params=params)[L2_GATEWAY_CONNECTIONS] |
||||
headers, columns = column_util.get_column_definitions( |
||||
_attr_map, long_listing=True) |
||||
return (headers, (osc_utils.get_dict_properties( |
||||
s, columns) for s in objs)) |
||||
|
||||
|
||||
class ShowL2gwConnection(command.ShowOne): |
||||
_description = _("Show information of a given l2gateway-connection") |
||||
|
||||
def get_parser(self, prog_name): |
||||
parser = super(ShowL2gwConnection, self).get_parser(prog_name) |
||||
parser.add_argument( |
||||
L2_GATEWAY_CONNECTION, |
||||
metavar="<L2_GATEWAY_CONNECTION>", |
||||
help=_("ID of l2_gateway_connection to look up."), |
||||
) |
||||
return parser |
||||
|
||||
def take_action(self, parsed_args): |
||||
client = self.app.client_manager.neutronclient |
||||
id = client.find_resource(L2_GATEWAY_CONNECTION, |
||||
parsed_args.l2_gateway_connection)['id'] |
||||
obj = client.get(resource_path % id)[L2_GATEWAY_CONNECTION] |
||||
columns, display_columns = column_util.get_columns(obj, _attr_map) |
||||
data = osc_utils.get_dict_properties(obj, columns) |
||||
return display_columns, data |
||||
|
||||
|
||||
class DeleteL2gwConnection(command.Command): |
||||
_description = _("Delete a given l2gateway-connection") |
||||
|
||||
def get_parser(self, prog_name): |
||||
parser = super(DeleteL2gwConnection, self).get_parser(prog_name) |
||||
parser.add_argument( |
||||
L2_GATEWAY_CONNECTIONS, |
||||
metavar="<L2_GATEWAY_CONNECTIONS>", |
||||
nargs="+", |
||||
help=_("ID(s) of l2_gateway_connections(s) to delete."), |
||||
) |
||||
return parser |
||||
|
||||
def take_action(self, parsed_args): |
||||
client = self.app.client_manager.neutronclient |
||||
fails = 0 |
||||
for id_or_name in parsed_args.l2_gateway_connections: |
||||
try: |
||||
id = client.find_resource( |
||||
L2_GATEWAY_CONNECTION, id_or_name)['id'] |
||||
client.delete(resource_path % id) |
||||
LOG.warning("L2 Gateaway Connection %(id)s deleted", |
||||
{'id': id}) |
||||
except Exception as e: |
||||
fails += 1 |
||||
LOG.error("Failed to delete L2 Gateway Connection with name " |
||||
"or ID '%(id_or_name)s': %(e)s", |
||||
{'id_or_name': id_or_name, 'e': e}) |
||||
if fails > 0: |
||||
msg = (_("Failed to delete %(fails)s of %(total)s L2 Gateway " |
||||
"Connection.") % |
||||
{'fails': fails, 'total': len( |
||||
parsed_args.l2_gateway_connections)}) |
||||
raise exceptions.CommandError(msg) |
@ -0,0 +1,91 @@
|
||||
# All Rights Reserved 2018 |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may |
||||
# not use this file except in compliance with the License. You may obtain |
||||
# a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
||||
# License for the specific language governing permissions and limitations |
||||
# under the License. |
||||
|
||||
import copy |
||||
|
||||
from oslo_utils import uuidutils |
||||
|
||||
from networking_l2gw.l2gatewayclient.osc import l2gw as osc_l2gw |
||||
from networking_l2gw.l2gatewayclient.osc import l2gw_connection as \ |
||||
osc_l2gw_conn |
||||
|
||||
|
||||
class FakeL2GW(object): |
||||
|
||||
@staticmethod |
||||
def create_l2gw(num_dev=1, num_if=1, attrs=None): |
||||
"""Create one fake L2 Gateway.""" |
||||
attrs = attrs or {} |
||||
interfaces = [{'name': 'interface' + |
||||
uuidutils.generate_uuid(dashed=False)} for |
||||
iface in range(num_if)] |
||||
devices = [{'device_name': 'device' + |
||||
uuidutils.generate_uuid(dashed=False), |
||||
'interfaces': interfaces} for dev in range(num_dev)] |
||||
l2gw_attrs = { |
||||
'id': uuidutils.generate_uuid(), |
||||
'name': 'test-l2gw' + uuidutils.generate_uuid(dashed=False), |
||||
'tenant_id': uuidutils.generate_uuid(), |
||||
'devices': devices |
||||
} |
||||
|
||||
l2gw_attrs.update(attrs) |
||||
return copy.deepcopy(l2gw_attrs) |
||||
|
||||
@staticmethod |
||||
def create_l2gws(attrs=None, count=1): |
||||
"""Create multiple fake L2 Gateways.""" |
||||
|
||||
l2gws = [] |
||||
for i in range(0, count): |
||||
if attrs is None: |
||||
attrs = {'id': 'fake_id%d' % i} |
||||
elif getattr(attrs, 'id', None) is None: |
||||
attrs['id'] = 'fake_id%d' % i |
||||
l2gws.append(FakeL2GW.create_l2gw(attrs=attrs)) |
||||
|
||||
return {osc_l2gw.L2_GATEWAYS: l2gws} |
||||
|
||||
|
||||
class FakeL2GWConnection(object): |
||||
|
||||
@staticmethod |
||||
def create_l2gw_connection(attrs=None): |
||||
"""Create a fake l2gw connection.""" |
||||
|
||||
attrs = attrs or {} |
||||
l2gw_connection_attrs = { |
||||
'network_id': uuidutils.generate_uuid(), |
||||
'l2_gateway_id': uuidutils.generate_uuid(), |
||||
'segmentation_id': '42', |
||||
'tenant_id': uuidutils.generate_uuid(), |
||||
'id': uuidutils.generate_uuid() |
||||
} |
||||
|
||||
l2gw_connection_attrs.update(attrs) |
||||
return copy.deepcopy(l2gw_connection_attrs) |
||||
|
||||
@staticmethod |
||||
def create_l2gw_connections(attrs=None, count=1): |
||||
l2gw_connections = [] |
||||
|
||||
for i in range(0, count): |
||||
if attrs is None: |
||||
attrs = {'id': 'fake_id%d' % i} |
||||
elif getattr(attrs, 'id', None) is None: |
||||
attrs['id'] = 'fake_id%d' % i |
||||
l2gw_connections.append(FakeL2GWConnection.create_l2gw_connection( |
||||
attrs=attrs)) |
||||
|
||||
return {osc_l2gw_conn.L2_GATEWAY_CONNECTIONS: l2gw_connections} |
@ -0,0 +1,502 @@
|
||||
# All Rights Reserved 2018 |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may |
||||
# not use this file except in compliance with the License. You may obtain |
||||
# a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
||||
# License for the specific language governing permissions and limitations |
||||
# under the License. |
||||
|
||||
import copy |
||||
import operator |
||||
|
||||
import mock |
||||
from osc_lib import utils as osc_utils |
||||
from osc_lib.utils import columns as column_util |
||||
|
||||
from neutronclient.tests.unit.osc.v2 import fakes as test_fakes |
||||
|
||||
from networking_l2gw.l2gatewayclient.osc import l2gw as osc_l2gw |
||||
from networking_l2gw.tests.unit.l2gatewayclient.osc import fakes |
||||
|
||||
|
||||
columns_long = tuple(col for col, _, listing_mode in osc_l2gw._attr_map |
||||
if listing_mode in (column_util.LIST_BOTH, |
||||
column_util.LIST_LONG_ONLY)) |
||||
headers_long = tuple(head for _, head, listing_mode in osc_l2gw._attr_map |
||||
if listing_mode in (column_util.LIST_BOTH, |
||||
column_util.LIST_LONG_ONLY)) |
||||
sorted_attr_map = sorted(osc_l2gw._attr_map, key=operator.itemgetter(1)) |
||||
sorted_columns = tuple(col for col, _, _ in sorted_attr_map) |
||||
sorted_headers = tuple(head for _, head, _ in sorted_attr_map) |
||||
|
||||
|
||||
def _get_data(attrs, columns=sorted_columns): |
||||
return osc_utils.get_dict_properties(attrs, columns, |
||||
formatters=osc_l2gw._formatters) |
||||
|
||||
|
||||
class TestCreateL2gw(test_fakes.TestNeutronClientOSCV2): |
||||
|
||||
columns = ( |
||||
'Devices', |
||||
'ID', |
||||
'Name', |
||||
'Tenant' |
||||
) |
||||
|
||||
def setUp(self): |
||||
super(TestCreateL2gw, self).setUp() |
||||
self.cmd = osc_l2gw.CreateL2gw(self.app, self.namespace) |
||||
|
||||
def _assert_create_succeeded(self, fake_l2gw, arg_list, verify_list): |
||||
parsed_args = self.check_parser(self.cmd, arg_list, verify_list) |
||||
columns, data = self.cmd.take_action(parsed_args) |
||||
self.neutronclient.post.assert_called_once_with( |
||||
osc_l2gw.object_path, |
||||
{osc_l2gw.L2_GATEWAY: |
||||
{'name': fake_l2gw['name'], 'devices': fake_l2gw['devices']}} |
||||
) |
||||
self.assertEqual(self.columns, columns) |
||||
self.assertItemEqual(_get_data(fake_l2gw), data) |
||||
|
||||
def test_create_l2gw(self): |
||||
"""Test Create l2gateway.""" |
||||
|
||||
fake_l2gw = fakes.FakeL2GW.create_l2gw() |
||||
|
||||
self.neutronclient.post = mock.Mock( |
||||
return_value={osc_l2gw.L2_GATEWAY: fake_l2gw}) |
||||
l2gw_device = fake_l2gw['devices'][0] |
||||
arg_list = [ |
||||
'--device', 'name=' + l2gw_device['device_name'] + |
||||
',interface_names=' + l2gw_device['interfaces'][0]['name'], |
||||
fake_l2gw['name'] |
||||
] |
||||
|
||||
verify_list = [ |
||||
('devices', [ |
||||
{'interface_names': l2gw_device['interfaces'][0]['name'], |
||||
'name': l2gw_device['device_name']}]), |
||||
('name', fake_l2gw['name']), |
||||
] |
||||
|
||||
self._assert_create_succeeded(fake_l2gw, arg_list, verify_list) |
||||
|
||||
def test_create_l2gateway_with_multiple_devices(self): |
||||
"""Test Create l2gateway for multiple devices.""" |
||||
|
||||
fake_l2gw = fakes.FakeL2GW.create_l2gw(num_dev=2) |
||||
|
||||
self.neutronclient.post = mock.Mock( |
||||
return_value={osc_l2gw.L2_GATEWAY: fake_l2gw}) |
||||
l2gw_device_1 = fake_l2gw['devices'][0] |
||||
l2gw_device_2 = fake_l2gw['devices'][1] |
||||
arg_list = [ |
||||
'--device', 'name=' + l2gw_device_1['device_name'] + |
||||
',interface_names=' + |
||||
l2gw_device_1['interfaces'][0]['name'], |
||||
'--device', 'name=' + l2gw_device_2['device_name'] + |
||||
',interface_names=' + |
||||
l2gw_device_2['interfaces'][0]['name'], |
||||
fake_l2gw['name'] |
||||
] |
||||
verify_list = [ |
||||
('devices', [ |
||||
{'interface_names': l2gw_device_1['interfaces'][0]['name'], |
||||
'name': l2gw_device_1['device_name']}, |
||||
{'interface_names': l2gw_device_2['interfaces'][0]['name'], |
||||
'name': l2gw_device_2['device_name']}, |
||||
]), |
||||
('name', fake_l2gw['name']), |
||||
] |
||||
|
||||
self._assert_create_succeeded(fake_l2gw, arg_list, verify_list) |
||||
|
||||
def test_create_l2gateway_with_multiple_interfaces(self): |
||||
"""Test Create l2gateway with multiple interfaces.""" |
||||
|
||||
fake_l2gw = fakes.FakeL2GW.create_l2gw(num_if=2) |
||||
|
||||
self.neutronclient.post = mock.Mock( |
||||
return_value={osc_l2gw.L2_GATEWAY: fake_l2gw}) |
||||
l2gw_device = fake_l2gw['devices'][0] |
||||
l2gw_interface_1 = l2gw_device['interfaces'][0] |
||||
l2gw_interface_2 = l2gw_device['interfaces'][1] |
||||
arg_list = [ |
||||
'--device', 'name=' + l2gw_device['device_name'] + |
||||
',interface_names=' + l2gw_interface_1['name'] + ';' + |
||||
l2gw_interface_2['name'], |
||||
fake_l2gw['name'] |
||||
] |
||||
verify_list = [ |
||||
('devices', [ |
||||
{ |
||||
'interface_names': |
||||
l2gw_interface_1['name'] + ';' + |
||||
l2gw_interface_2['name'], |
||||
'name': l2gw_device['device_name'] |
||||
} |
||||
]), |
||||
('name', fake_l2gw['name']), |
||||
] |
||||
|
||||
self._assert_create_succeeded(fake_l2gw, arg_list, verify_list) |
||||
|
||||
def test_create_l2gateway_with_segmentation_id(self): |
||||
"""Test Create l2gateway with segmentation-id.""" |
||||
|
||||
fake_l2gw = fakes.FakeL2GW.create_l2gw() |
||||
fake_l2gw['devices'][0]['interfaces'][0]['segmentation_id'] = ['42'] |
||||
|
||||
self.neutronclient.post = mock.Mock( |
||||
return_value={osc_l2gw.L2_GATEWAY: fake_l2gw}) |
||||
l2gw_device = fake_l2gw['devices'][0] |
||||
l2gw_interface = l2gw_device['interfaces'][0] |
||||
arg_list = [ |
||||
'--device', 'name=' + l2gw_device['device_name'] + |
||||
',interface_names=' + l2gw_interface['name'] + '|' + |
||||
'#'.join(l2gw_interface['segmentation_id']), |
||||
fake_l2gw['name'] |
||||
] |
||||
|
||||
verify_list = [ |
||||
('devices', [ |
||||
{'interface_names': l2gw_device['interfaces'][0]['name'] + |
||||
'|' + '#'.join(l2gw_interface['segmentation_id']), |
||||
'name': l2gw_device['device_name']}]), |
||||
('name', fake_l2gw['name']), |
||||
] |
||||
|
||||
self._assert_create_succeeded(fake_l2gw, arg_list, verify_list) |
||||
|
||||
def test_create_l2gateway_with_mul_segmentation_id(self): |
||||
"""Test Create l2gateway with multiple segmentation-ids.""" |
||||
|
||||
fake_l2gw = fakes.FakeL2GW.create_l2gw() |
||||
fake_l2gw['devices'][0]['interfaces'][0]['segmentation_id'] = ['42', |
||||
'43'] |
||||
|
||||
self.neutronclient.post = mock.Mock( |
||||
return_value={osc_l2gw.L2_GATEWAY: fake_l2gw}) |
||||
l2gw_device = fake_l2gw['devices'][0] |
||||
l2gw_interface = l2gw_device['interfaces'][0] |
||||
arg_list = [ |
||||
'--device', 'name=' + l2gw_device['device_name'] + |
||||
',interface_names=' + l2gw_interface['name'] + '|' + |
||||
'#'.join(l2gw_interface['segmentation_id']), |
||||
fake_l2gw['name'], |
||||
] |
||||
|
||||
verify_list = [ |
||||
('devices', [ |
||||
{'interface_names': l2gw_device['interfaces'][0]['name'] + |
||||
'|' + '#'.join(l2gw_interface['segmentation_id']), |
||||
'name': l2gw_device['device_name']}]), |
||||
('name', fake_l2gw['name']), |
||||
] |
||||
|
||||
self._assert_create_succeeded(fake_l2gw, arg_list, verify_list) |
||||
|
||||
|
||||
class TestListL2gw(test_fakes.TestNeutronClientOSCV2): |
||||
def setUp(self): |
||||
super(TestListL2gw, self).setUp() |
||||
self.cmd = osc_l2gw.ListL2gw(self.app, self.namespace) |
||||
|
||||
def test_list_l2gateway(self): |
||||
"""Test List l2gateways.""" |
||||
|
||||
fake_l2gws = fakes.FakeL2GW.create_l2gws(count=4) |
||||
self.neutronclient.list = mock.Mock(return_value=fake_l2gws) |
||||
arg_list = [] |
||||
verify_list = [] |
||||
|
||||
parsed_args = self.check_parser(self.cmd, arg_list, verify_list) |
||||
|
||||
headers, data = self.cmd.take_action(parsed_args) |
||||
|
||||
self.neutronclient.list.assert_called_once() |
||||
self.assertEqual(headers, list(headers_long)) |
||||
self.assertListItemEqual( |
||||
list(data), |
||||
[_get_data(fake_l2gw, columns_long) for fake_l2gw |
||||
in fake_l2gws[osc_l2gw.L2_GATEWAYS]] |
||||
) |
||||
|
||||
|
||||
class TestDeleteL2gw(test_fakes.TestNeutronClientOSCV2): |
||||
def setUp(self): |
||||
super(TestDeleteL2gw, self).setUp() |
||||
self.neutronclient.find_resource = mock.Mock( |
||||
side_effect=lambda _, name_or_id: {'id': name_or_id}) |
||||
self.cmd = osc_l2gw.DeleteL2gw(self.app, self.namespace) |
||||
|
||||
def test_delete_l2gateway(self): |
||||
"""Test Delete l2gateway.""" |
||||
|
||||
fake_l2gw = fakes.FakeL2GW.create_l2gw() |
||||
self.neutronclient.delete = mock.Mock() |
||||
|
||||
arg_list = [ |
||||
fake_l2gw['id'], |
||||
] |
||||
verify_list = [ |
||||
(osc_l2gw.L2_GATEWAYS, [fake_l2gw['id']]), |
||||
] |
||||
|
||||
parsed_args = self.check_parser(self.cmd, arg_list, verify_list) |
||||
|
||||
result = self.cmd.take_action(parsed_args) |
||||
|
||||
self.neutronclient.delete.assert_called_once_with( |
||||
osc_l2gw.resource_path % fake_l2gw['id']) |
||||
self.assertIsNone(result) |
||||
|
||||
|
||||
class TestShowL2gw(test_fakes.TestNeutronClientOSCV2): |
||||
def setUp(self): |
||||
super(TestShowL2gw, self).setUp() |
||||
self.neutronclient.find_resource = mock.Mock( |
||||
side_effect=lambda _, name_or_id: {'id': name_or_id}) |
||||
self.cmd = osc_l2gw.ShowL2gw(self.app, self.namespace) |
||||
|
||||
def test_show_l2gateway(self): |
||||
"""Test Show l2gateway: --fields id --fields name myid.""" |
||||
|
||||
fake_l2gw = fakes.FakeL2GW.create_l2gw() |
||||
self.neutronclient.get = mock.Mock( |
||||
return_value={osc_l2gw.L2_GATEWAY: fake_l2gw}) |
||||
arg_list = [ |
||||
fake_l2gw['id'], |
||||
] |
||||
verify_list = [ |
||||
(osc_l2gw.L2_GATEWAY, fake_l2gw['id']), |
||||
] |
||||
|
||||
parsed_args = self.check_parser(self.cmd, arg_list, verify_list) |
||||
|
||||
headers, data = self.cmd.take_action(parsed_args) |
||||
|
||||
self.neutronclient.get.assert_called_once_with( |
||||
osc_l2gw.resource_path % fake_l2gw['id']) |
||||
self.assertEqual(sorted_headers, headers) |
||||
self.assertItemEqual(_get_data(fake_l2gw), data) |
||||
|
||||
|
||||
class TestUpdateL2gw(test_fakes.TestNeutronClientOSCV2): |
||||
_new_device_name = 'new_device' |
||||
_new_interface = 'new_interface' |
||||
_new_name = 'new_name' |
||||
|
||||
columns = ( |
||||
'Devices', |
||||
'ID', |
||||
'Name', |
||||
'Tenant' |
||||
) |
||||
|
||||
def setUp(self): |
||||
super(TestUpdateL2gw, self).setUp() |
||||
self.cmd = osc_l2gw.UpdateL2gw(self.app, self.namespace) |
||||
self.neutronclient.find_resource = mock.Mock( |
||||
side_effect=lambda _, name_or_id: {'id': name_or_id}) |
||||
|
||||
def _assert_update_succeeded(self, new_l2gw, attrs, columns, data): |
||||
self.neutronclient.put.assert_called_once_with( |
||||
osc_l2gw.resource_path % new_l2gw['id'], |
||||
{osc_l2gw.L2_GATEWAY: attrs}) |
||||
self.assertEqual(self.columns, columns) |
||||
self.assertItemEqual(_get_data(new_l2gw), data) |
||||
|
||||
def test_update_l2gateway(self): |
||||
"""Test Update l2gateway.""" |
||||
|
||||
fake_l2gw = fakes.FakeL2GW.create_l2gw() |
||||
new_l2gw = copy.deepcopy(fake_l2gw) |
||||
new_l2gw['name'] = self._new_name |
||||
new_l2gw['devices'][0]['device_name'] = self._new_device_name |
||||
new_l2gw['devices'][0]['interfaces'][0]['name'] = self._new_interface |
||||
|
||||
self.neutronclient.put = mock.Mock( |
||||
return_value={osc_l2gw.L2_GATEWAY: new_l2gw}) |
||||
|
||||
arg_list = [ |
||||
fake_l2gw['id'], |
||||
'--name', self._new_name, |
||||
'--device', 'name=' + self._new_device_name + |
||||
',interface_names=' + self._new_interface, |
||||
] |
||||
verify_list = [ |
||||
('name', self._new_name), |
||||
('devices', [ |
||||
{'interface_names': self._new_interface, |
||||
'name': self._new_device_name}]), |
||||
] |
||||
|
||||
parsed_args = self.check_parser(self.cmd, arg_list, verify_list) |
||||
columns, data = self.cmd.take_action(parsed_args) |
||||
attrs = { |
||||
'name': self._new_name, |
||||
'devices': [ |
||||
{'interfaces': [ |
||||
{'name': self._new_interface}], |
||||
'device_name': self._new_device_name} |
||||
] |
||||
} |
||||
|
||||
self._assert_update_succeeded(new_l2gw, attrs, columns, data) |
||||
|
||||
def test_update_l2gateway_name(self): |
||||
"""Test Update l2gateway name.""" |
||||
|
||||
fake_l2gw = fakes.FakeL2GW.create_l2gw() |
||||
new_l2gw = copy.deepcopy(fake_l2gw) |
||||
new_l2gw['name'] = self._new_name |
||||
|
||||
self.neutronclient.put = mock.Mock( |
||||
return_value={osc_l2gw.L2_GATEWAY: new_l2gw}) |
||||
|
||||
arg_list = [ |
||||
fake_l2gw['id'], |
||||
'--name', self._new_name, |
||||
] |
||||
verify_list = [('name', self._new_name)] |
||||
|
||||
parsed_args = self.check_parser(self.cmd, arg_list, verify_list) |
||||
columns, data = self.cmd.take_action(parsed_args) |
||||
attrs = {'name': self._new_name} |
||||
|
||||
self._assert_update_succeeded(new_l2gw, attrs, columns, data) |
||||
|
||||
def test_update_l2gateway_with_multiple_interfaces(self): |
||||
"""Test Update l2gateway with multiple interfaces.""" |
||||
|
||||
fake_l2gw = fakes.FakeL2GW.create_l2gw() |
||||
new_l2gw = copy.deepcopy(fake_l2gw) |
||||
new_l2gw['devices'][0]['interfaces'].append( |
||||
{'name': self._new_interface}) |
||||
|
||||
self.neutronclient.put = mock.Mock( |
||||
return_value={osc_l2gw.L2_GATEWAY: new_l2gw}) |
||||
|
||||
l2gw_device = new_l2gw['devices'][0] |
||||
l2gw_interface_1 = l2gw_device['interfaces'][0] |
||||
l2gw_interface_2 = l2gw_device['interfaces'][1] |
||||
arg_list = [ |
||||
fake_l2gw['id'], |
||||
'--device', 'name=' + l2gw_device['device_name'] + |
||||
',interface_names=' + l2gw_interface_1['name'] + ';' + |
||||
l2gw_interface_2['name'] |
||||
] |
||||
|
||||
verify_list = [ |
||||
('devices', [ |
||||
{ |
||||
'interface_names': |
||||
l2gw_interface_1['name'] + ';' + |
||||
l2gw_interface_2['name'], |
||||
'name': l2gw_device['device_name'] |
||||
} |
||||
]), |
||||
] |
||||
|
||||
parsed_args = self.check_parser(self.cmd, arg_list, verify_list) |
||||
columns, data = self.cmd.take_action(parsed_args) |
||||
attrs = { |
||||
'devices': [ |
||||
{'device_name': l2gw_device['device_name'], |
||||
'interfaces': [ |
||||
{'name': l2gw_interface_1['name']}, |
||||
{'name': self._new_interface}]} |
||||
] |
||||
} |
||||
|
||||
self._assert_update_succeeded(new_l2gw, attrs, columns, data) |
||||
|
||||
def test_update_l2gateway_with_segmentation_id(self): |
||||
"""Test Update l2gateway with segmentation-id.""" |
||||
|
||||
fake_l2gw = fakes.FakeL2GW.create_l2gw() |
||||
|
||||
new_l2gw = copy.deepcopy(fake_l2gw) |
||||
new_l2gw['devices'][0]['interfaces'][0]['segmentation_id'] = ['42'] |
||||
|
||||
self.neutronclient.put = mock.Mock( |
||||
return_value={osc_l2gw.L2_GATEWAY: new_l2gw}) |
||||
|
||||
l2gw_device = new_l2gw['devices'][0] |
||||
l2gw_interface = l2gw_device['interfaces'][0] |
||||
arg_list = [ |
||||
fake_l2gw['id'], |
||||
'--device', 'name=' + l2gw_device['device_name'] + |
||||
',interface_names=' + l2gw_interface['name'] + '|' + |
||||
'#'.join(l2gw_interface['segmentation_id']), |
||||
] |
||||
|
||||
verify_list = [ |
||||
('devices', [ |
||||
{'interface_names': l2gw_device['interfaces'][0]['name'] + |
||||
'|' + '#'.join(l2gw_interface['segmentation_id']), |
||||
'name': l2gw_device['device_name']}]), |
||||
] |
||||
|
||||
parsed_args = self.check_parser(self.cmd, arg_list, verify_list) |
||||
columns, data = self.cmd.take_action(parsed_args) |
||||
attrs = { |
||||
'devices': [ |
||||
{'device_name': l2gw_device['device_name'], |
||||
'interfaces': [ |
||||
{'name': l2gw_interface['name'], |
||||
'segmentation_id': l2gw_interface['segmentation_id']}] |
||||
} |
||||
] |
||||
} |
||||
|
||||
self._assert_update_succeeded(new_l2gw, attrs, columns, data) |
||||
|
||||
def test_update_l2gateway_with_mul_segmentation_ids(self): |
||||
"""Test Update l2gateway with multiple segmentation-ids.""" |
||||
|
||||
fake_l2gw = fakes.FakeL2GW.create_l2gw() |
||||
|
||||
new_l2gw = copy.deepcopy(fake_l2gw) |
||||
new_l2gw['devices'][0]['interfaces'][0]['segmentation_id'] = ['42', |
||||
'43'] |
||||
self.neutronclient.put = mock.Mock( |
||||
return_value={osc_l2gw.L2_GATEWAY: new_l2gw}) |
||||
|
||||
l2gw_device = new_l2gw['devices'][0] |
||||
l2gw_interface = l2gw_device['interfaces'][0] |
||||
arg_list = [ |
||||
fake_l2gw['id'], |
||||
'--device', 'name=' + l2gw_device['device_name'] + |
||||
',interface_names=' + l2gw_interface['name'] + '|' + |
||||
'#'.join(l2gw_interface['segmentation_id']), |
||||
] |
||||
|
||||
verify_list = [ |
||||
('devices', [ |
||||
{'interface_names': l2gw_device['interfaces'][0]['name'] + |
||||
'|' + '#'.join(l2gw_interface['segmentation_id']), |
||||
'name': l2gw_device['device_name']}]), |
||||
] |
||||
|
||||
parsed_args = self.check_parser(self.cmd, arg_list, verify_list) |
||||
columns, data = self.cmd.take_action(parsed_args) |
||||
attrs = { |
||||
'devices': [ |
||||
{'device_name': l2gw_device['device_name'], |
||||
'interfaces': [ |
||||
{'name': l2gw_interface['name'], |
||||
'segmentation_id': l2gw_interface['segmentation_id']}] |
||||
} |
||||
] |
||||
} |
||||
|
||||
self._assert_update_succeeded(new_l2gw, attrs, columns, data) |
@ -0,0 +1,173 @@
|
||||
# All Rights Reserved 2018 |
||||
# |
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may |
||||
# not use this file except in compliance with the License. You may obtain |
||||
# a copy of the License at |
||||
# |
||||
# http://www.apache.org/licenses/LICENSE-2.0 |
||||
# |
||||
# Unless required by applicable law or agreed to in writing, software |
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
||||
# License for the specific language governing permissions and limitations |
||||
# under the License. |
||||
|
||||
import operator |
||||
|
||||
import mock |
||||
from osc_lib import utils as osc_utils |
||||
from osc_lib.utils import columns as column_util |
||||
|
||||
from neutronclient.tests.unit.osc.v2 import fakes as test_fakes |
||||
|
||||
from networking_l2gw.l2gatewayclient.osc import l2gw_connection as \ |
||||
osc_l2gw_conn |
||||
from networking_l2gw.tests.unit.l2gatewayclient.osc import fakes |
||||
|
||||
columns_long = tuple(col for col, _, listing_mode in osc_l2gw_conn._attr_map |
||||
if listing_mode in (column_util.LIST_BOTH, |
||||
column_util.LIST_LONG_ONLY)) |
||||
headers_long = tuple(head for _, head, listing_mode in osc_l2gw_conn._attr_map |
||||
if listing_mode in (column_util.LIST_BOTH, |
||||
column_util.LIST_LONG_ONLY)) |
||||
sorted_attr_map = sorted(osc_l2gw_conn._attr_map, key=operator.itemgetter(1)) |
||||
sorted_columns = tuple(col for col, _, _ in sorted_attr_map) |
||||
sorted_headers = tuple(head for _, head, _ in sorted_attr_map) |
||||
|
||||
|
||||
def _get_data(attrs, columns=sorted_columns): |
||||
return osc_utils.get_dict_properties(attrs, columns) |
||||
|
||||
|
||||
class TestCreateL2gwConnection(test_fakes.TestNeutronClientOSCV2): |
||||
columns = ( |
||||
'ID', |
||||
'L2 GateWay ID', |
||||
'Network ID', |
||||
'Segmentation ID', |
||||
'Tenant' |
||||
) |
||||
|
||||
def setUp(self): |
||||
super(TestCreateL2gwConnection, self).setUp() |
||||
self.cmd = osc_l2gw_conn.CreateL2gwConnection( |
||||
self.app, self.namespace) |
||||
self.neutronclient.find_resource = mock.Mock( |
||||
side_effect=lambda _, name_or_id, *x: {'id': name_or_id}) |
||||
|
||||
def test_create_l2gateway_connection(self): |
||||
"""Test Create l2gateway-connection.""" |
||||
|
||||
fake_connection = fakes.FakeL2GWConnection.create_l2gw_connection() |
||||
self.neutronclient.post = mock.Mock( |
||||
return_value={ |
||||
osc_l2gw_conn.L2_GATEWAY_CONNECTION: fake_connection |
||||
}) |
||||
|
||||
arg_list = [ |
||||
fake_connection['l2_gateway_id'], |
||||
fake_connection['network_id'], |
||||
'--default-segmentation-id', fake_connection['segmentation_id'] |
||||
] |
||||
verify_list = [ |
||||
('gateway_name', fake_connection['l2_gateway_id']), |
||||
('network', fake_connection['network_id']), |
||||
('seg_id', fake_connection['segmentation_id']) |
||||
] |
||||
|
||||
parsed_args = self.check_parser(self.cmd, arg_list, verify_list) |
||||
columns, data = self.cmd.take_action(parsed_args) |
||||
self.neutronclient.post.assert_called_once_with( |
||||
osc_l2gw_conn.object_path, |
||||
{osc_l2gw_conn.L2_GATEWAY_CONNECTION: |
||||
{'segmentation_id': fake_connection['segmentation_id'], |
||||
'network_id': fake_connection['network_id'], |
||||
'l2_gateway_id': fake_connection['l2_gateway_id'] |
||||
} |
||||
} |
||||
) |
||||
self.assertEqual(self.columns, columns) |
||||
self.assertItemEqual(_get_data(fake_connection), data) |
||||
|
||||
|
||||
class TestListL2gwConnection(test_fakes.TestNeutronClientOSCV2): |
||||
def setUp(self): |
||||
super(TestListL2gwConnection, self).setUp() |
||||
self.cmd = osc_l2gw_conn.ListL2gwConnection(self.app, self.namespace) |
||||
|
||||
def test_list_l2gateway_connection(self): |
||||
"""Test List l2gateway-connections.""" |
||||
|
||||
fake_connections = fakes.FakeL2GWConnection.create_l2gw_connections( |
||||
count=3) |
||||
self.neutronclient.list = mock.Mock(return_value=fake_connections) |
||||
arg_list = [] |
||||
verify_list = [] |
||||
|
||||
parsed_args = self.check_parser(self.cmd, arg_list, verify_list) |
||||
|
||||
headers, data = self.cmd.take_action(parsed_args) |
||||
|
||||
self.neutronclient.list.assert_called_once() |
||||
self.assertEqual(headers, list(headers_long)) |
||||
self.assertListItemEqual( |
||||
list(data), |
||||
[_get_data(fake_connection, columns_long) for fake_connection |
||||
in fake_connections[osc_l2gw_conn.L2_GATEWAY_CONNECTIONS]] |
||||
) |
||||
|
||||
|
||||
class TestShowL2gwConnection(test_fakes.TestNeutronClientOSCV2): |
||||
def setUp(self): |
||||
super(TestShowL2gwConnection, self).setUp() |
||||
self.cmd = osc_l2gw_conn.ShowL2gwConnection(self.app, self.namespace) |
||||
self.neutronclient.find_resource = mock.Mock( |
||||
side_effect=lambda _, name_or_id, *x: {'id': name_or_id}) |
||||
|
||||
def test_show_l2gateway_connection(self): |
||||
"""Test Show l2gateway-connection.""" |
||||
|
||||
fake_connection = fakes.FakeL2GWConnection.create_l2gw_connection() |
||||
self.neutronclient.get = mock.Mock( |
||||
return_value={ |
||||
osc_l2gw_conn.L2_GATEWAY_CONNECTION: fake_connection |
||||
}) |
||||
arg_list = [fake_connection['id']] |
||||
verify_list = [ |
||||
(osc_l2gw_conn.L2_GATEWAY_CONNECTION, fake_connection['id']) |
||||
] |
||||
|
||||
parsed_args = self.check_parser(self.cmd, arg_list, verify_list) |
||||
|
||||
headers, data = self.cmd.take_action(parsed_args) |
||||
|
||||
self.neutronclient.get.assert_called_once_with( |
||||
osc_l2gw_conn.resource_path % fake_connection['id']) |
||||
self.assertEqual(sorted_headers, headers) |
||||
self.assertItemEqual(_get_data(fake_connection), data) |
||||
|
||||
|
||||
class TestDeleteL2gwConnection(test_fakes.TestNeutronClientOSCV2): |
||||
def setUp(self): |
||||
super(TestDeleteL2gwConnection, self).setUp() |
||||
self.neutronclient.find_resource = mock.Mock( |
||||
side_effect=lambda _, name_or_id: {'id': name_or_id}) |
||||
self.cmd = osc_l2gw_conn.DeleteL2gwConnection(self.app, self.namespace) |
||||
|
||||
def test_delete_l2gateway_connection(self): |
||||
"""Test Delete l2gateway-connection.""" |
||||
|
||||
fake_connection = fakes.FakeL2GWConnection.create_l2gw_connection() |
||||
self.neutronclient.delete = mock.Mock(return_value=fake_connection) |
||||
arg_list = [fake_connection['id']] |
||||
verify_list = [ |
||||
(osc_l2gw_conn.L2_GATEWAY_CONNECTIONS, [fake_connection['id']]) |
||||
] |
||||
|
||||
parsed_args = self.check_parser(self.cmd, arg_list, verify_list) |
||||
|
||||
result = self.cmd.take_action(parsed_args) |
||||
|
||||
self.neutronclient.delete.assert_called_once_with( |
||||
osc_l2gw_conn.resource_path % fake_connection['id']) |
||||
self.assertIsNone(result) |
Loading…
Reference in new issue