Remove the last remaining vendor code
Partial-Bug: #1520062 Change-Id: Ibbb03d52d9b2f6af7151aaf7bfdbb4734cea9d16
This commit is contained in:
@@ -1,265 +0,0 @@
|
||||
# Copyright 2013 OpenStack Foundation.
|
||||
# All Rights Reserved
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
|
||||
from __future__ import print_function
|
||||
|
||||
from neutronclient._i18n import _
|
||||
from neutronclient.common import utils
|
||||
from neutronclient.neutron import v2_0 as neutronV20
|
||||
|
||||
GW_RESOURCE = 'network_gateway'
|
||||
DEV_RESOURCE = 'gateway_device'
|
||||
CONNECTOR_TYPE_HELP = _("Type of the transport zone connector to use for this "
|
||||
"device. Valid values are gre, stt, ipsec_gre, "
|
||||
"ipsec_stt, and bridge. Defaults to stt. ipsecgre and "
|
||||
"ipsecstt are also accepted as alternative names")
|
||||
CONNECTOR_IP_HELP = _("IP address for this device's transport connector. "
|
||||
"It must correspond to the IP address of the interface "
|
||||
"used for tenant traffic on the NSX gateway node.")
|
||||
CLIENT_CERT_HELP = _("PEM certificate used by the NSX gateway transport node "
|
||||
"to authenticate with the NSX controller.")
|
||||
CLIENT_CERT_FILE_HELP = _("File containing the PEM certificate used by the "
|
||||
"NSX gateway transport node to authenticate with "
|
||||
"the NSX controller.")
|
||||
|
||||
|
||||
class ListGatewayDevice(neutronV20.ListCommand):
|
||||
"""List network gateway devices for a given tenant."""
|
||||
|
||||
resource = DEV_RESOURCE
|
||||
list_columns = ['id', 'name']
|
||||
|
||||
|
||||
class ShowGatewayDevice(neutronV20.ShowCommand):
|
||||
"""Show information for a given network gateway device."""
|
||||
|
||||
resource = DEV_RESOURCE
|
||||
|
||||
|
||||
def read_cert_file(cert_file):
|
||||
return open(cert_file, 'rb').read()
|
||||
|
||||
|
||||
def gateway_device_args2body(parsed_args):
|
||||
body = {}
|
||||
if parsed_args.name:
|
||||
body['name'] = parsed_args.name
|
||||
if parsed_args.connector_type:
|
||||
body['connector_type'] = parsed_args.connector_type
|
||||
if parsed_args.connector_ip:
|
||||
body['connector_ip'] = parsed_args.connector_ip
|
||||
cert_data = None
|
||||
if parsed_args.cert_file:
|
||||
cert_data = read_cert_file(parsed_args.cert_file)
|
||||
elif parsed_args.cert_data:
|
||||
cert_data = parsed_args.cert_data
|
||||
if cert_data:
|
||||
body['client_certificate'] = cert_data
|
||||
if getattr(parsed_args, 'tenant_id', None):
|
||||
body['tenant_id'] = parsed_args.tenant_id
|
||||
return {DEV_RESOURCE: body}
|
||||
|
||||
|
||||
class CreateGatewayDevice(neutronV20.CreateCommand):
|
||||
"""Create a network gateway device."""
|
||||
|
||||
resource = DEV_RESOURCE
|
||||
|
||||
def add_known_arguments(self, parser):
|
||||
parser.add_argument(
|
||||
'name', metavar='NAME',
|
||||
help='Name of network gateway device to create.')
|
||||
parser.add_argument(
|
||||
'--connector-type',
|
||||
default='stt',
|
||||
choices=['stt', 'gre', 'ipsecgre', 'ipsecstt', 'bridge',
|
||||
'ipsec_gre', 'ipsec_stt'],
|
||||
help=CONNECTOR_TYPE_HELP)
|
||||
parser.add_argument(
|
||||
'--connector-ip',
|
||||
required=True,
|
||||
help=CONNECTOR_IP_HELP)
|
||||
client_cert_group = parser.add_mutually_exclusive_group(
|
||||
required=True)
|
||||
client_cert_group.add_argument(
|
||||
'--client-certificate',
|
||||
dest='cert_data',
|
||||
help=CLIENT_CERT_HELP)
|
||||
client_cert_group.add_argument(
|
||||
'--client-certificate-file',
|
||||
dest='cert_file',
|
||||
help=CLIENT_CERT_FILE_HELP)
|
||||
|
||||
def args2body(self, parsed_args):
|
||||
return gateway_device_args2body(parsed_args)
|
||||
|
||||
|
||||
class UpdateGatewayDevice(neutronV20.UpdateCommand):
|
||||
"""Update a network gateway device."""
|
||||
|
||||
resource = DEV_RESOURCE
|
||||
|
||||
def add_known_arguments(self, parser):
|
||||
parser.add_argument(
|
||||
'--name', metavar='NAME',
|
||||
help='New name for network gateway device.')
|
||||
parser.add_argument(
|
||||
'--connector-type',
|
||||
required=False,
|
||||
choices=['stt', 'gre', 'ipsecgre', 'ipsecstt', 'bridge',
|
||||
'ipsec_gre', 'ipsec_stt'],
|
||||
help=CONNECTOR_TYPE_HELP)
|
||||
parser.add_argument(
|
||||
'--connector-ip',
|
||||
required=False,
|
||||
help=CONNECTOR_IP_HELP)
|
||||
client_cert_group = parser.add_mutually_exclusive_group()
|
||||
client_cert_group.add_argument(
|
||||
'--client-certificate',
|
||||
dest='cert_data',
|
||||
help=CLIENT_CERT_HELP)
|
||||
client_cert_group.add_argument(
|
||||
'--client-certificate-file',
|
||||
dest='cert_file',
|
||||
help=CLIENT_CERT_FILE_HELP)
|
||||
|
||||
def args2body(self, parsed_args):
|
||||
return gateway_device_args2body(parsed_args)
|
||||
|
||||
|
||||
class DeleteGatewayDevice(neutronV20.DeleteCommand):
|
||||
"""Delete a given network gateway device."""
|
||||
|
||||
resource = DEV_RESOURCE
|
||||
|
||||
|
||||
class ListNetworkGateway(neutronV20.ListCommand):
|
||||
"""List network gateways for a given tenant."""
|
||||
|
||||
resource = GW_RESOURCE
|
||||
list_columns = ['id', 'name']
|
||||
|
||||
|
||||
class ShowNetworkGateway(neutronV20.ShowCommand):
|
||||
"""Show information of a given network gateway."""
|
||||
|
||||
resource = GW_RESOURCE
|
||||
|
||||
|
||||
class CreateNetworkGateway(neutronV20.CreateCommand):
|
||||
"""Create a network gateway."""
|
||||
|
||||
resource = GW_RESOURCE
|
||||
|
||||
def add_known_arguments(self, parser):
|
||||
parser.add_argument(
|
||||
'name', metavar='NAME',
|
||||
help=_('Name of network gateway to create.'))
|
||||
parser.add_argument(
|
||||
'--device', metavar='id=ID,interface_name=NAME_OR_ID',
|
||||
action='append',
|
||||
help=_('Device info for this gateway. You can repeat this '
|
||||
'option for multiple devices for HA gateways.'))
|
||||
|
||||
def args2body(self, parsed_args):
|
||||
body = {'name': parsed_args.name}
|
||||
devices = []
|
||||
if parsed_args.device:
|
||||
for device in parsed_args.device:
|
||||
devices.append(utils.str2dict(device))
|
||||
if devices:
|
||||
body['devices'] = devices
|
||||
if parsed_args.tenant_id:
|
||||
body['tenant_id'] = parsed_args.tenant_id
|
||||
return {self.resource: body}
|
||||
|
||||
|
||||
class DeleteNetworkGateway(neutronV20.DeleteCommand):
|
||||
"""Delete a given network gateway."""
|
||||
|
||||
resource = GW_RESOURCE
|
||||
|
||||
|
||||
class UpdateNetworkGateway(neutronV20.UpdateCommand):
|
||||
"""Update the name for a network gateway."""
|
||||
|
||||
resource = GW_RESOURCE
|
||||
|
||||
|
||||
class NetworkGatewayInterfaceCommand(neutronV20.NeutronCommand):
|
||||
"""Base class for connecting/disconnecting networks to/from a gateway."""
|
||||
|
||||
resource = GW_RESOURCE
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super(NetworkGatewayInterfaceCommand,
|
||||
self).get_parser(prog_name)
|
||||
parser.add_argument(
|
||||
'net_gateway_id', metavar='NET-GATEWAY-ID',
|
||||
help=_('ID of the network gateway.'))
|
||||
parser.add_argument(
|
||||
'network_id', metavar='NETWORK-ID',
|
||||
help=_('ID of the internal network to connect on the gateway.'))
|
||||
parser.add_argument(
|
||||
'--segmentation-type',
|
||||
help=_('L2 segmentation strategy on the external side of '
|
||||
'the gateway (e.g.: VLAN, FLAT).'))
|
||||
parser.add_argument(
|
||||
'--segmentation-id',
|
||||
help=_('Identifier for the L2 segment on the external side '
|
||||
'of the gateway.'))
|
||||
return parser
|
||||
|
||||
def retrieve_ids(self, client, args):
|
||||
gateway_id = neutronV20.find_resourceid_by_name_or_id(
|
||||
client, self.resource, args.net_gateway_id)
|
||||
network_id = neutronV20.find_resourceid_by_name_or_id(
|
||||
client, 'network', args.network_id)
|
||||
return (gateway_id, network_id)
|
||||
|
||||
|
||||
class ConnectNetworkGateway(NetworkGatewayInterfaceCommand):
|
||||
"""Add an internal network interface to a router."""
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
neutron_client = self.get_client()
|
||||
(gateway_id, network_id) = self.retrieve_ids(neutron_client,
|
||||
parsed_args)
|
||||
neutron_client.connect_network_gateway(
|
||||
gateway_id, {'network_id': network_id,
|
||||
'segmentation_type': parsed_args.segmentation_type,
|
||||
'segmentation_id': parsed_args.segmentation_id})
|
||||
# TODO(Salvatore-Orlando): Do output formatting as
|
||||
# any other command
|
||||
print(_('Connected network to gateway %s') % gateway_id,
|
||||
file=self.app.stdout)
|
||||
|
||||
|
||||
class DisconnectNetworkGateway(NetworkGatewayInterfaceCommand):
|
||||
"""Remove a network from a network gateway."""
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
neutron_client = self.get_client()
|
||||
(gateway_id, network_id) = self.retrieve_ids(neutron_client,
|
||||
parsed_args)
|
||||
neutron_client.disconnect_network_gateway(
|
||||
gateway_id, {'network_id': network_id,
|
||||
'segmentation_type': parsed_args.segmentation_type,
|
||||
'segmentation_id': parsed_args.segmentation_id})
|
||||
# TODO(Salvatore-Orlando): Do output formatting as
|
||||
# any other command
|
||||
print(_('Disconnected network from gateway %s') % gateway_id,
|
||||
file=self.app.stdout)
|
||||
@@ -1,82 +0,0 @@
|
||||
# Copyright 2013 VMware Inc.
|
||||
# All Rights Reserved
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from neutronclient._i18n import _
|
||||
from neutronclient.neutron import v2_0 as neutronV20
|
||||
|
||||
|
||||
class ListQoSQueue(neutronV20.ListCommand):
|
||||
"""List queues that belong to a given tenant."""
|
||||
|
||||
resource = 'qos_queue'
|
||||
list_columns = ['id', 'name', 'min', 'max',
|
||||
'qos_marking', 'dscp', 'default']
|
||||
|
||||
|
||||
class ShowQoSQueue(neutronV20.ShowCommand):
|
||||
"""Show information of a given queue."""
|
||||
|
||||
resource = 'qos_queue'
|
||||
allow_names = True
|
||||
|
||||
|
||||
class CreateQoSQueue(neutronV20.CreateCommand):
|
||||
"""Create a queue."""
|
||||
|
||||
resource = 'qos_queue'
|
||||
|
||||
def add_known_arguments(self, parser):
|
||||
parser.add_argument(
|
||||
'name', metavar='NAME',
|
||||
help=_('Name of queue.'))
|
||||
parser.add_argument(
|
||||
'--min',
|
||||
help=_('Minimum rate.')),
|
||||
parser.add_argument(
|
||||
'--max',
|
||||
help=_('Maximum rate.')),
|
||||
parser.add_argument(
|
||||
'--qos-marking',
|
||||
help=_('QOS marking as untrusted or trusted.')),
|
||||
parser.add_argument(
|
||||
'--default',
|
||||
default=False,
|
||||
help=_('If true all created ports will be the size of this queue, '
|
||||
'if queue is not specified')),
|
||||
parser.add_argument(
|
||||
'--dscp',
|
||||
help=_('Differentiated Services Code Point.')),
|
||||
|
||||
def args2body(self, parsed_args):
|
||||
params = {'name': parsed_args.name,
|
||||
'default': parsed_args.default}
|
||||
if parsed_args.min:
|
||||
params['min'] = parsed_args.min
|
||||
if parsed_args.max:
|
||||
params['max'] = parsed_args.max
|
||||
if parsed_args.qos_marking:
|
||||
params['qos_marking'] = parsed_args.qos_marking
|
||||
if parsed_args.dscp:
|
||||
params['dscp'] = parsed_args.dscp
|
||||
if parsed_args.tenant_id:
|
||||
params['tenant_id'] = parsed_args.tenant_id
|
||||
return {'qos_queue': params}
|
||||
|
||||
|
||||
class DeleteQoSQueue(neutronV20.DeleteCommand):
|
||||
"""Delete a given queue."""
|
||||
|
||||
resource = 'qos_queue'
|
||||
allow_names = True
|
||||
@@ -69,8 +69,6 @@ from neutronclient.neutron.v2_0.lb import vip as lb_vip
|
||||
from neutronclient.neutron.v2_0 import metering
|
||||
from neutronclient.neutron.v2_0 import network
|
||||
from neutronclient.neutron.v2_0 import network_ip_availability
|
||||
from neutronclient.neutron.v2_0.nsx import networkgateway
|
||||
from neutronclient.neutron.v2_0.nsx import qos_queue
|
||||
from neutronclient.neutron.v2_0 import port
|
||||
from neutronclient.neutron.v2_0 import purge
|
||||
from neutronclient.neutron.v2_0.qos import bandwidth_limit_rule
|
||||
@@ -274,26 +272,10 @@ COMMAND_V2 = {
|
||||
'lb-healthmonitor-disassociate': (
|
||||
lb_healthmonitor.DisassociateHealthMonitor
|
||||
),
|
||||
'queue-create': qos_queue.CreateQoSQueue,
|
||||
'queue-delete': qos_queue.DeleteQoSQueue,
|
||||
'queue-show': qos_queue.ShowQoSQueue,
|
||||
'queue-list': qos_queue.ListQoSQueue,
|
||||
'agent-list': agent.ListAgent,
|
||||
'agent-show': agent.ShowAgent,
|
||||
'agent-delete': agent.DeleteAgent,
|
||||
'agent-update': agent.UpdateAgent,
|
||||
'net-gateway-create': networkgateway.CreateNetworkGateway,
|
||||
'net-gateway-update': networkgateway.UpdateNetworkGateway,
|
||||
'net-gateway-delete': networkgateway.DeleteNetworkGateway,
|
||||
'net-gateway-show': networkgateway.ShowNetworkGateway,
|
||||
'net-gateway-list': networkgateway.ListNetworkGateway,
|
||||
'net-gateway-connect': networkgateway.ConnectNetworkGateway,
|
||||
'net-gateway-disconnect': networkgateway.DisconnectNetworkGateway,
|
||||
'gateway-device-create': networkgateway.CreateGatewayDevice,
|
||||
'gateway-device-update': networkgateway.UpdateGatewayDevice,
|
||||
'gateway-device-delete': networkgateway.DeleteGatewayDevice,
|
||||
'gateway-device-show': networkgateway.ShowGatewayDevice,
|
||||
'gateway-device-list': networkgateway.ListGatewayDevice,
|
||||
'dhcp-agent-network-add': agentscheduler.AddNetworkToDhcpAgent,
|
||||
'dhcp-agent-network-remove': agentscheduler.RemoveNetworkFromDhcpAgent,
|
||||
'net-list-on-dhcp-agent': agentscheduler.ListNetworksOnDhcpAgent,
|
||||
|
||||
@@ -1,263 +0,0 @@
|
||||
# Copyright 2012 VMware, Inc
|
||||
# All Rights Reserved
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
|
||||
import sys
|
||||
|
||||
from mox3 import mox
|
||||
import six
|
||||
|
||||
from neutronclient.neutron.v2_0.nsx import networkgateway as nwgw
|
||||
from neutronclient.tests.unit import test_cli20
|
||||
|
||||
|
||||
class CLITestV20NetworkGatewayJSON(test_cli20.CLITestV20Base):
|
||||
|
||||
gw_resource = "network_gateway"
|
||||
dev_resource = "gateway_device"
|
||||
|
||||
non_admin_status_resources = ['network_gateway', 'gateway_device']
|
||||
|
||||
def setUp(self):
|
||||
super(CLITestV20NetworkGatewayJSON, self).setUp(
|
||||
plurals={'devices': 'device',
|
||||
'network_gateways': 'network_gateway'})
|
||||
|
||||
def test_create_gateway(self):
|
||||
cmd = nwgw.CreateNetworkGateway(test_cli20.MyApp(sys.stdout), None)
|
||||
name = 'gw-test'
|
||||
myid = 'myid'
|
||||
args = [name, ]
|
||||
position_names = ['name', ]
|
||||
position_values = [name, ]
|
||||
self._test_create_resource(self.gw_resource, cmd, name, myid, args,
|
||||
position_names, position_values)
|
||||
|
||||
def test_create_gateway_with_tenant(self):
|
||||
cmd = nwgw.CreateNetworkGateway(test_cli20.MyApp(sys.stdout), None)
|
||||
name = 'gw-test'
|
||||
myid = 'myid'
|
||||
args = ['--tenant_id', 'tenantid', name]
|
||||
position_names = ['name', ]
|
||||
position_values = [name, ]
|
||||
self._test_create_resource(self.gw_resource, cmd, name, myid, args,
|
||||
position_names, position_values,
|
||||
tenant_id='tenantid')
|
||||
|
||||
def test_create_gateway_with_device(self):
|
||||
cmd = nwgw.CreateNetworkGateway(test_cli20.MyApp(sys.stdout), None)
|
||||
name = 'gw-test'
|
||||
myid = 'myid'
|
||||
args = ['--device', 'device_id=test', name, ]
|
||||
position_names = ['name', ]
|
||||
position_values = [name, ]
|
||||
self._test_create_resource(self.gw_resource, cmd, name, myid, args,
|
||||
position_names, position_values,
|
||||
devices=[{'device_id': 'test'}])
|
||||
|
||||
def test_list_gateways(self):
|
||||
resources = '%ss' % self.gw_resource
|
||||
cmd = nwgw.ListNetworkGateway(test_cli20.MyApp(sys.stdout), None)
|
||||
self._test_list_resources(resources, cmd, True)
|
||||
|
||||
def test_update_gateway(self):
|
||||
cmd = nwgw.UpdateNetworkGateway(test_cli20.MyApp(sys.stdout), None)
|
||||
self._test_update_resource(self.gw_resource, cmd, 'myid',
|
||||
['myid', '--name', 'higuain'],
|
||||
{'name': 'higuain'})
|
||||
|
||||
def test_delete_gateway(self):
|
||||
cmd = nwgw.DeleteNetworkGateway(test_cli20.MyApp(sys.stdout), None)
|
||||
myid = 'myid'
|
||||
args = [myid]
|
||||
self._test_delete_resource(self.gw_resource, cmd, myid, args)
|
||||
|
||||
def test_show_gateway(self):
|
||||
cmd = nwgw.ShowNetworkGateway(test_cli20.MyApp(sys.stdout), None)
|
||||
args = ['--fields', 'id', '--fields', 'name', self.test_id]
|
||||
self._test_show_resource(self.gw_resource, cmd, self.test_id, args,
|
||||
['id', 'name'])
|
||||
|
||||
def test_connect_network_to_gateway(self):
|
||||
cmd = nwgw.ConnectNetworkGateway(test_cli20.MyApp(sys.stdout), None)
|
||||
args = ['gw_id', 'net_id',
|
||||
'--segmentation-type', 'edi',
|
||||
'--segmentation-id', '7']
|
||||
self._test_update_resource_action(self.gw_resource, cmd, 'gw_id',
|
||||
'connect_network',
|
||||
args,
|
||||
{'network_id': 'net_id',
|
||||
'segmentation_type': 'edi',
|
||||
'segmentation_id': '7'})
|
||||
|
||||
def test_disconnect_network_from_gateway(self):
|
||||
cmd = nwgw.DisconnectNetworkGateway(test_cli20.MyApp(sys.stdout), None)
|
||||
args = ['gw_id', 'net_id',
|
||||
'--segmentation-type', 'edi',
|
||||
'--segmentation-id', '7']
|
||||
self._test_update_resource_action(self.gw_resource, cmd, 'gw_id',
|
||||
'disconnect_network',
|
||||
args,
|
||||
{'network_id': 'net_id',
|
||||
'segmentation_type': 'edi',
|
||||
'segmentation_id': '7'})
|
||||
|
||||
def _test_create_gateway_device(self,
|
||||
name,
|
||||
connector_type,
|
||||
connector_ip,
|
||||
client_certificate=None,
|
||||
client_certificate_file=None,
|
||||
must_raise=False):
|
||||
cmd = nwgw.CreateGatewayDevice(test_cli20.MyApp(sys.stdout), None)
|
||||
myid = 'myid'
|
||||
extra_body = {'connector_type': connector_type,
|
||||
'connector_ip': connector_ip,
|
||||
'client_certificate': client_certificate}
|
||||
self.mox.StubOutWithMock(nwgw, 'read_cert_file')
|
||||
if client_certificate_file:
|
||||
nwgw.read_cert_file(mox.IgnoreArg()).AndReturn('xyz')
|
||||
extra_body['client_certificate'] = 'xyz'
|
||||
self.mox.ReplayAll()
|
||||
position_names = ['name', ]
|
||||
position_values = [name, ]
|
||||
args = []
|
||||
for (k, v) in six.iteritems(extra_body):
|
||||
if (k == 'client_certificate' and client_certificate_file):
|
||||
v = client_certificate_file
|
||||
k = 'client_certificate_file'
|
||||
# Append argument only if value for it was specified
|
||||
if v:
|
||||
args.extend(['--%s' % k.replace('_', '-'), v])
|
||||
# The following is just for verifying the call fails as expected when
|
||||
# both certificate and certificate file are specified. The extra
|
||||
# argument added is client-certificate since the loop above added
|
||||
# client-certificate-file
|
||||
if client_certificate_file and client_certificate:
|
||||
args.extend(['--client-certificate', client_certificate_file])
|
||||
args.append(name)
|
||||
if must_raise:
|
||||
with test_cli20.capture_std_streams():
|
||||
self.assertRaises(
|
||||
SystemExit, self._test_create_resource,
|
||||
self.dev_resource, cmd, name, myid, args,
|
||||
position_names, position_values, extra_body=extra_body)
|
||||
else:
|
||||
self._test_create_resource(
|
||||
self.dev_resource, cmd, name, myid, args,
|
||||
position_names, position_values, extra_body=extra_body)
|
||||
self.mox.UnsetStubs()
|
||||
|
||||
def test_create_gateway_device(self):
|
||||
self._test_create_gateway_device('dev_test', 'stt', '1.1.1.1', 'xyz')
|
||||
|
||||
def test_create_gateway_device_with_certfile(self):
|
||||
self._test_create_gateway_device('dev_test', 'stt', '1.1.1.1',
|
||||
client_certificate_file='some_file')
|
||||
|
||||
def test_create_gateway_device_invalid_connector_type_fails(self):
|
||||
self._test_create_gateway_device('dev_test', 'ciccio',
|
||||
'1.1.1.1', client_certificate='xyz',
|
||||
must_raise=True)
|
||||
|
||||
def test_create_gateway_device_missing_connector_ip_fails(self):
|
||||
self._test_create_gateway_device('dev_test', 'stt',
|
||||
None, client_certificate='xyz',
|
||||
must_raise=True)
|
||||
|
||||
def test_create_gateway_device_missing_certificates_fails(self):
|
||||
self._test_create_gateway_device('dev_test', 'stt', '1.1.1.1',
|
||||
must_raise=True)
|
||||
|
||||
def test_create_gateway_device_with_cert_and_cert_file_fails(self):
|
||||
self._test_create_gateway_device('dev_test', 'stt', '1.1.1.1',
|
||||
client_certificate='xyz',
|
||||
client_certificate_file='some_file',
|
||||
must_raise=True)
|
||||
|
||||
def _test_update_gateway_device(self,
|
||||
name=None,
|
||||
connector_type=None,
|
||||
connector_ip=None,
|
||||
client_certificate=None,
|
||||
client_certificate_file=None,
|
||||
must_raise=False):
|
||||
cmd = nwgw.UpdateGatewayDevice(test_cli20.MyApp(sys.stdout), None)
|
||||
myid = 'myid'
|
||||
extra_body = {}
|
||||
self.mox.StubOutWithMock(nwgw, 'read_cert_file')
|
||||
if client_certificate_file:
|
||||
nwgw.read_cert_file(mox.IgnoreArg()).AndReturn('xyz')
|
||||
self.mox.ReplayAll()
|
||||
args = [myid]
|
||||
|
||||
def process_arg(argname, arg):
|
||||
if arg:
|
||||
extra_body[argname] = arg
|
||||
args.extend(['--%s' % argname.replace('_', '-'), arg])
|
||||
|
||||
process_arg('name', name)
|
||||
process_arg('connector_type', connector_type)
|
||||
process_arg('connector_ip', connector_ip)
|
||||
process_arg('client_certificate', client_certificate)
|
||||
if client_certificate_file:
|
||||
extra_body['client_certificate'] = 'xyz'
|
||||
args.extend(['--client-certificate-file',
|
||||
client_certificate_file])
|
||||
if must_raise:
|
||||
with test_cli20.capture_std_streams():
|
||||
self.assertRaises(
|
||||
SystemExit, self._test_update_resource,
|
||||
self.dev_resource, cmd, myid, args,
|
||||
extrafields=extra_body)
|
||||
else:
|
||||
self._test_update_resource(
|
||||
self.dev_resource, cmd, myid, args,
|
||||
extrafields=extra_body)
|
||||
self.mox.UnsetStubs()
|
||||
|
||||
def test_update_gateway_device(self):
|
||||
self._test_update_gateway_device('dev_test', 'stt', '1.1.1.1', 'xyz')
|
||||
|
||||
def test_update_gateway_device_partial_body(self):
|
||||
self._test_update_gateway_device(name='dev_test',
|
||||
connector_type='stt')
|
||||
|
||||
def test_update_gateway_device_with_certfile(self):
|
||||
self._test_update_gateway_device('dev_test', 'stt', '1.1.1.1',
|
||||
client_certificate_file='some_file')
|
||||
|
||||
def test_update_gateway_device_invalid_connector_type_fails(self):
|
||||
self._test_update_gateway_device('dev_test', 'ciccio',
|
||||
'1.1.1.1', client_certificate='xyz',
|
||||
must_raise=True)
|
||||
|
||||
def test_update_gateway_device_with_cert_and_cert_file_fails(self):
|
||||
self._test_update_gateway_device('dev_test', 'stt', '1.1.1.1',
|
||||
client_certificate='xyz',
|
||||
client_certificate_file='some_file',
|
||||
must_raise=True)
|
||||
|
||||
def test_delete_gateway_device(self):
|
||||
cmd = nwgw.DeleteGatewayDevice(test_cli20.MyApp(sys.stdout), None)
|
||||
myid = 'myid'
|
||||
args = [myid]
|
||||
self._test_delete_resource(self.dev_resource, cmd, myid, args)
|
||||
|
||||
def test_show_gateway_device(self):
|
||||
cmd = nwgw.ShowGatewayDevice(test_cli20.MyApp(sys.stdout), None)
|
||||
args = ['--fields', 'id', '--fields', 'name', self.test_id]
|
||||
self._test_show_resource(self.dev_resource, cmd, self.test_id, args,
|
||||
['id', 'name'])
|
||||
@@ -1,85 +0,0 @@
|
||||
#!/usr/bin/env python
|
||||
# Copyright 2013 VMware Inc.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import sys
|
||||
|
||||
from neutronclient.neutron.v2_0.nsx import qos_queue as qos
|
||||
from neutronclient.tests.unit import test_cli20
|
||||
|
||||
|
||||
class CLITestV20QosQueueJSON(test_cli20.CLITestV20Base):
|
||||
|
||||
non_admin_status_resources = ['qos_queue']
|
||||
|
||||
def setUp(self):
|
||||
super(CLITestV20QosQueueJSON, self).setUp(
|
||||
plurals={'qos_queues': 'qos_queue'})
|
||||
|
||||
def test_create_qos_queue(self):
|
||||
# Create a qos queue.
|
||||
resource = 'qos_queue'
|
||||
cmd = qos.CreateQoSQueue(
|
||||
test_cli20.MyApp(sys.stdout), None)
|
||||
myid = 'myid'
|
||||
name = 'my_queue'
|
||||
default = False
|
||||
args = ['--default', default, name]
|
||||
position_names = ['name', 'default']
|
||||
position_values = [name, default]
|
||||
self._test_create_resource(resource, cmd, name, myid, args,
|
||||
position_names, position_values)
|
||||
|
||||
def test_create_qos_queue_all_values(self):
|
||||
# Create a qos queue.
|
||||
resource = 'qos_queue'
|
||||
cmd = qos.CreateQoSQueue(
|
||||
test_cli20.MyApp(sys.stdout), None)
|
||||
myid = 'myid'
|
||||
name = 'my_queue'
|
||||
default = False
|
||||
min = '10'
|
||||
max = '40'
|
||||
qos_marking = 'untrusted'
|
||||
dscp = '0'
|
||||
args = ['--default', default, '--min', min, '--max', max,
|
||||
'--qos-marking', qos_marking, '--dscp', dscp, name]
|
||||
position_names = ['name', 'default', 'min', 'max', 'qos_marking',
|
||||
'dscp']
|
||||
position_values = [name, default, min, max, qos_marking, dscp]
|
||||
self._test_create_resource(resource, cmd, name, myid, args,
|
||||
position_names, position_values)
|
||||
|
||||
def test_list_qos_queue(self):
|
||||
resources = "qos_queues"
|
||||
cmd = qos.ListQoSQueue(
|
||||
test_cli20.MyApp(sys.stdout), None)
|
||||
self._test_list_resources(resources, cmd, True)
|
||||
|
||||
def test_show_qos_queue_id(self):
|
||||
resource = 'qos_queue'
|
||||
cmd = qos.ShowQoSQueue(
|
||||
test_cli20.MyApp(sys.stdout), None)
|
||||
args = ['--fields', 'id', self.test_id]
|
||||
self._test_show_resource(resource, cmd, self.test_id,
|
||||
args, ['id'])
|
||||
|
||||
def test_delete_qos_queue(self):
|
||||
resource = 'qos_queue'
|
||||
cmd = qos.DeleteQoSQueue(
|
||||
test_cli20.MyApp(sys.stdout), None)
|
||||
myid = 'myid'
|
||||
args = [myid]
|
||||
self._test_delete_resource(resource, cmd, myid, args)
|
||||
@@ -135,9 +135,7 @@ class ShellTest(testtools.TestCase):
|
||||
# just check we have some output
|
||||
required = [
|
||||
'.*--tenant_id',
|
||||
'.*--client-certificate',
|
||||
'.*help',
|
||||
'.*gateway-device-create',
|
||||
'.*--dns-nameserver']
|
||||
help_text, stderr = self.shell('neutron bash-completion')
|
||||
self.assertFalse(stderr)
|
||||
|
||||
Reference in New Issue
Block a user