Merge "Add SFC cli for OSC plugin"
This commit is contained in:
commit
c2783e9880
36
doc/source/cli/osc/v2/networking-sfc.rst
Normal file
36
doc/source/cli/osc/v2/networking-sfc.rst
Normal file
@ -0,0 +1,36 @@
|
||||
==============
|
||||
networking sfc
|
||||
==============
|
||||
|
||||
**Service Function Chaining** is a mechanism for overriding the basic destination based forwarding
|
||||
that is typical of IP networks. Service Function Chains consist of an ordered sequence of
|
||||
Service Functions (SFs). SFs are virtual machines (or potentially physical devices) that perform a
|
||||
network function such as firewall, content cache, packet inspection, or any other function that
|
||||
requires processing of packets in a flow from point A to point B even though the SFs are not
|
||||
literally between point A and B from a routing table perspective.
|
||||
|
||||
Network v2
|
||||
|
||||
.. autoprogram-cliff:: openstack.neutronclient.v2
|
||||
:command: sfc flow classifier *
|
||||
|
||||
.. autoprogram-cliff:: openstack.neutronclient.v2
|
||||
:command: sfc port chain *
|
||||
|
||||
.. autoprogram-cliff:: openstack.neutronclient.v2
|
||||
:command: sfc port pair create
|
||||
|
||||
.. autoprogram-cliff:: openstack.neutronclient.v2
|
||||
:command: sfc port pair delete
|
||||
|
||||
.. autoprogram-cliff:: openstack.neutronclient.v2
|
||||
:command: sfc port pair list
|
||||
|
||||
.. autoprogram-cliff:: openstack.neutronclient.v2
|
||||
:command: sfc port pair set
|
||||
|
||||
.. autoprogram-cliff:: openstack.neutronclient.v2
|
||||
:command: sfc port pair show
|
||||
|
||||
.. autoprogram-cliff:: openstack.neutronclient.v2
|
||||
:command: sfc port pair group *
|
0
neutronclient/osc/v2/sfc/__init__.py
Normal file
0
neutronclient/osc/v2/sfc/__init__.py
Normal file
320
neutronclient/osc/v2/sfc/sfc_flow_classifier.py
Executable file
320
neutronclient/osc/v2/sfc/sfc_flow_classifier.py
Executable file
@ -0,0 +1,320 @@
|
||||
# Copyright (c) 2017 Huawei Technologies India Pvt.Limited.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import argparse
|
||||
import logging
|
||||
|
||||
from osc_lib.command import command
|
||||
from osc_lib import exceptions
|
||||
from osc_lib import utils
|
||||
|
||||
from neutronclient._i18n import _
|
||||
from neutronclient.common import exceptions as nc_exc
|
||||
from neutronclient.osc import utils as nc_osc_utils
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
resource = 'flow_classifier'
|
||||
|
||||
_attr_map = (
|
||||
('id', 'ID', nc_osc_utils.LIST_BOTH),
|
||||
('name', 'Name', nc_osc_utils.LIST_BOTH),
|
||||
('summary', 'Summary', nc_osc_utils.LIST_SHORT_ONLY),
|
||||
('protocol', 'Protocol', nc_osc_utils.LIST_LONG_ONLY),
|
||||
('ethertype', 'Ethertype', nc_osc_utils.LIST_LONG_ONLY),
|
||||
('source_ip_prefix', 'Source IP',
|
||||
nc_osc_utils.LIST_LONG_ONLY),
|
||||
('destination_ip_prefix', 'Destination IP',
|
||||
nc_osc_utils.LIST_LONG_ONLY),
|
||||
('logical_source_port', 'Logical Source Port',
|
||||
nc_osc_utils.LIST_LONG_ONLY),
|
||||
('logical_destination_port', 'Logical Destination Port',
|
||||
nc_osc_utils.LIST_LONG_ONLY),
|
||||
('source_port_range_min', 'Source Port Range Min',
|
||||
nc_osc_utils.LIST_LONG_ONLY),
|
||||
('source_port_range_max', 'Source Port Range Max',
|
||||
nc_osc_utils.LIST_LONG_ONLY),
|
||||
('destination_port_range_min', 'Destination Port Range Min',
|
||||
nc_osc_utils.LIST_LONG_ONLY),
|
||||
('destination_port_range_max', 'Destination Port Range Max',
|
||||
nc_osc_utils.LIST_LONG_ONLY),
|
||||
('l7_parameters', 'L7 Parameters', nc_osc_utils.LIST_LONG_ONLY),
|
||||
('description', 'Description', nc_osc_utils.LIST_LONG_ONLY),
|
||||
('project_id', 'Project', nc_osc_utils.LIST_LONG_ONLY),
|
||||
)
|
||||
|
||||
|
||||
class CreateSfcFlowClassifier(command.ShowOne):
|
||||
_description = _("Create a flow classifier")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super(CreateSfcFlowClassifier, self).get_parser(prog_name)
|
||||
parser.add_argument(
|
||||
'name',
|
||||
metavar='<name>',
|
||||
help=_('Name of the flow classifier'))
|
||||
parser.add_argument(
|
||||
'--description',
|
||||
metavar='<description>',
|
||||
help=_('Description for the flow classifier'))
|
||||
parser.add_argument(
|
||||
'--protocol',
|
||||
metavar='<protocol>',
|
||||
help=_('IP protocol name. Protocol name should be as per '
|
||||
'IANA standard.'))
|
||||
parser.add_argument(
|
||||
'--ethertype',
|
||||
metavar='{IPv4,IPv6}',
|
||||
default='IPv4', choices=['IPv4', 'IPv6'],
|
||||
help=_('L2 ethertype, default is IPv4'))
|
||||
parser.add_argument(
|
||||
'--source-port',
|
||||
metavar='<min-port>:<max-port>',
|
||||
help=_('Source protocol port (allowed range [1,65535]. Must be '
|
||||
'specified as a:b, where a=min-port and b=max-port) '
|
||||
'in the allowed range.'))
|
||||
parser.add_argument(
|
||||
'--destination-port',
|
||||
metavar='<min-port>:<max-port>',
|
||||
help=_('Destination protocol port (allowed range [1,65535]. Must '
|
||||
'be specified as a:b, where a=min-port and b=max-port) '
|
||||
'in the allowed range.'))
|
||||
parser.add_argument(
|
||||
'--source-ip-prefix',
|
||||
metavar='<source-ip-prefix>',
|
||||
help=_('Source IP address in CIDR notation'))
|
||||
parser.add_argument(
|
||||
'--destination-ip-prefix',
|
||||
metavar='<destination-ip-prefix>',
|
||||
help=_('Destination IP address in CIDR notation'))
|
||||
parser.add_argument(
|
||||
'--logical-source-port',
|
||||
metavar='<logical-source-port>',
|
||||
help=_('Neutron source port (name or ID)'))
|
||||
parser.add_argument(
|
||||
'--logical-destination-port',
|
||||
metavar='<logical-destination-port>',
|
||||
help=_('Neutron destination port (name or ID)'))
|
||||
parser.add_argument(
|
||||
'--l7-parameters',
|
||||
help=_('Dictionary of L7 parameters. Currently, no value is '
|
||||
'supported for this option.'))
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.neutronclient
|
||||
attrs = _get_common_attrs(self.app.client_manager, parsed_args)
|
||||
body = {resource: attrs}
|
||||
obj = client.create_flow_classifier(body)[resource]
|
||||
columns, display_columns = nc_osc_utils.get_columns(obj, _attr_map)
|
||||
data = utils.get_dict_properties(obj, columns)
|
||||
return display_columns, data
|
||||
|
||||
|
||||
class DeleteSfcFlowClassifier(command.Command):
|
||||
_description = _("Delete a given flow classifier")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super(DeleteSfcFlowClassifier, self).get_parser(prog_name)
|
||||
parser.add_argument(
|
||||
'flow_classifier',
|
||||
metavar='<flow-classifier>',
|
||||
help=_("Flow classifier to delete (name or ID)")
|
||||
)
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
# TODO(mohan): Add support for deleting multiple resources.
|
||||
client = self.app.client_manager.neutronclient
|
||||
fc_id = _get_id(client, parsed_args.flow_classifier, resource)
|
||||
try:
|
||||
client.delete_flow_classifier(fc_id)
|
||||
except Exception as e:
|
||||
msg = (_("Failed to delete flow classifier with name "
|
||||
"or ID '%(fc)s': %(e)s")
|
||||
% {'fc': parsed_args.flow_classifier, 'e': e})
|
||||
raise exceptions.CommandError(msg)
|
||||
|
||||
|
||||
class ListSfcFlowClassifier(command.Lister):
|
||||
_description = _("List flow classifiers")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super(ListSfcFlowClassifier, self).get_parser(prog_name)
|
||||
parser.add_argument(
|
||||
'--long',
|
||||
action='store_true',
|
||||
help=_("List additional fields in output")
|
||||
)
|
||||
return parser
|
||||
|
||||
def extend_list(self, data, parsed_args):
|
||||
ext_data = data['flow_classifiers']
|
||||
for d in ext_data:
|
||||
val = []
|
||||
protocol = d['protocol'].upper() if d['protocol'] else 'any'
|
||||
val.append('protocol: ' + protocol)
|
||||
val.append(self._get_protocol_port_details(d, 'source'))
|
||||
val.append(self._get_protocol_port_details(d, 'destination'))
|
||||
if 'logical_source_port' in d:
|
||||
val.append('neutron_source_port: ' +
|
||||
str(d['logical_source_port']))
|
||||
|
||||
if 'logical_destination_port' in d:
|
||||
val.append('neutron_destination_port: ' +
|
||||
str(d['logical_destination_port']))
|
||||
|
||||
if 'l7_parameters' in d:
|
||||
l7_param = 'l7_parameters: {%s}' % ','.join(d['l7_parameters'])
|
||||
val.append(l7_param)
|
||||
d['summary'] = ',\n'.join(val)
|
||||
return ext_data
|
||||
|
||||
def _get_protocol_port_details(self, data, val):
|
||||
type_ip_prefix = val + '_ip_prefix'
|
||||
ip_prefix = data.get(type_ip_prefix)
|
||||
if not ip_prefix:
|
||||
ip_prefix = 'any'
|
||||
min_port = data.get(val + '_port_range_min')
|
||||
if min_port is None:
|
||||
min_port = 'any'
|
||||
max_port = data.get(val + '_port_range_max')
|
||||
if max_port is None:
|
||||
max_port = 'any'
|
||||
return '%s[port]: %s[%s:%s]' % (
|
||||
type, ip_prefix, min_port, max_port)
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.neutronclient
|
||||
obj = client.list_flow_classifier()
|
||||
obj_extend = self.extend_list(obj, parsed_args)
|
||||
headers, columns = nc_osc_utils.get_column_definitions(
|
||||
_attr_map, long_listing=parsed_args.long)
|
||||
return (headers, (utils.get_dict_properties(
|
||||
s, columns) for s in obj_extend))
|
||||
|
||||
|
||||
class SetSfcFlowClassifier(command.Command):
|
||||
_description = _("Set flow classifier properties")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super(SetSfcFlowClassifier, self).get_parser(prog_name)
|
||||
parser.add_argument(
|
||||
'--name',
|
||||
metavar='<name>',
|
||||
help=_('Name of the flow classifier'))
|
||||
parser.add_argument(
|
||||
'--description',
|
||||
metavar='<description>',
|
||||
help=_('Description for the flow classifier'))
|
||||
parser.add_argument(
|
||||
'flow_classifier',
|
||||
metavar='<flow-classifier>',
|
||||
help=_("Flow classifier to modify (name or ID)")
|
||||
)
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.neutronclient
|
||||
fc_id = _get_id(client, parsed_args.flow_classifier, resource)
|
||||
attrs = _get_common_attrs(self.app.client_manager, parsed_args,
|
||||
is_create=False)
|
||||
body = {resource: attrs}
|
||||
try:
|
||||
client.update_flow_classifier(fc_id, body)
|
||||
except Exception as e:
|
||||
msg = (_("Failed to update flow classifier '%(fc)s': %(e)s")
|
||||
% {'fc': parsed_args.flow_classifier, 'e': e})
|
||||
raise exceptions.CommandError(msg)
|
||||
|
||||
|
||||
class ShowSfcFlowClassifier(command.ShowOne):
|
||||
_description = _("Display flow classifier details")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super(ShowSfcFlowClassifier, self).get_parser(prog_name)
|
||||
parser.add_argument(
|
||||
'flow_classifier',
|
||||
metavar='<flow-classifier>',
|
||||
help=_("Flow classifier to display (name or ID)")
|
||||
)
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.neutronclient
|
||||
fc_id = _get_id(client, parsed_args.flow_classifier, resource)
|
||||
obj = client.show_flow_classifier(fc_id)[resource]
|
||||
columns, display_columns = nc_osc_utils.get_columns(obj, _attr_map)
|
||||
data = utils.get_dict_properties(obj, columns)
|
||||
return display_columns, data
|
||||
|
||||
|
||||
def _get_common_attrs(client_manager, parsed_args, is_create=True):
|
||||
attrs = {}
|
||||
if parsed_args.name is not None:
|
||||
attrs['name'] = parsed_args.name
|
||||
if parsed_args.description is not None:
|
||||
attrs['description'] = parsed_args.description
|
||||
if is_create:
|
||||
_get_attrs(client_manager, attrs, parsed_args)
|
||||
return attrs
|
||||
|
||||
|
||||
def _get_attrs(client_manager, attrs, parsed_args):
|
||||
if parsed_args.protocol is not None:
|
||||
attrs['protocol'] = parsed_args.protocol
|
||||
if parsed_args.ethertype:
|
||||
attrs['ethertype'] = parsed_args.ethertype
|
||||
if parsed_args.source_ip_prefix is not None:
|
||||
attrs['source_ip_prefix'] = parsed_args.source_ip_prefix
|
||||
if parsed_args.destination_ip_prefix is not None:
|
||||
attrs['destination_ip_prefix'] = parsed_args.destination_ip_prefix
|
||||
if parsed_args.logical_source_port is not None:
|
||||
attrs['logical_source_port'] = _get_id(
|
||||
client_manager.neutronclient, parsed_args.logical_source_port,
|
||||
'port')
|
||||
if parsed_args.logical_destination_port is not None:
|
||||
attrs['logical_destination_port'] = _get_id(
|
||||
client_manager.neutronclient, parsed_args.logical_destination_port,
|
||||
'port')
|
||||
if parsed_args.source_port is not None:
|
||||
_fill_protocol_port_info(attrs, 'source',
|
||||
parsed_args.source_port)
|
||||
if parsed_args.destination_port is not None:
|
||||
_fill_protocol_port_info(attrs, 'destination',
|
||||
parsed_args.destination_port)
|
||||
if parsed_args.l7_parameters is not None:
|
||||
attrs['l7_parameters'] = parsed_args.l7_parameters
|
||||
|
||||
|
||||
def _fill_protocol_port_info(attrs, port_type, port_val):
|
||||
min_port, sep, max_port = port_val.partition(":")
|
||||
if not min_port:
|
||||
msg = ("Invalid port value '%s', expected format is "
|
||||
"min-port:max-port or min-port.")
|
||||
raise argparse.ArgumentTypeError(msg % port_val)
|
||||
if not max_port:
|
||||
max_port = min_port
|
||||
try:
|
||||
attrs[port_type + '_port_range_min'] = int(min_port)
|
||||
attrs[port_type + '_port_range_max'] = int(max_port)
|
||||
except ValueError:
|
||||
message = (_("Protocol port value %s must be an integer "
|
||||
"or integer:integer.") % port_val)
|
||||
raise nc_exc.CommandError(message=message)
|
||||
|
||||
|
||||
def _get_id(client, id_or_name, resource):
|
||||
return client.find_resource(resource, id_or_name)['id']
|
347
neutronclient/osc/v2/sfc/sfc_port_chain.py
Executable file
347
neutronclient/osc/v2/sfc/sfc_port_chain.py
Executable file
@ -0,0 +1,347 @@
|
||||
# Copyright (c) 2017 Huawei Technologies India Pvt.Limited.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
|
||||
from osc_lib.cli import parseractions
|
||||
from osc_lib.command import command
|
||||
from osc_lib import exceptions
|
||||
from osc_lib import utils
|
||||
|
||||
from neutronclient._i18n import _
|
||||
from neutronclient.osc import utils as nc_osc_utils
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
resource = 'port_chain'
|
||||
|
||||
_attr_map = (
|
||||
('id', 'ID', nc_osc_utils.LIST_BOTH),
|
||||
('name', 'Name', nc_osc_utils.LIST_BOTH),
|
||||
('port_pair_groups', 'Port Pair Groups', nc_osc_utils.LIST_BOTH),
|
||||
('flow_classifiers', 'Flow Classifiers',
|
||||
nc_osc_utils.LIST_BOTH),
|
||||
('chain_parameters', 'Chain Parameters',
|
||||
nc_osc_utils.LIST_BOTH),
|
||||
('description', 'Description', nc_osc_utils.LIST_LONG_ONLY),
|
||||
('chain_id', 'Chain ID', nc_osc_utils.LIST_BOTH),
|
||||
('project_id', 'Project', nc_osc_utils.LIST_LONG_ONLY),
|
||||
)
|
||||
|
||||
|
||||
class CreateSfcPortChain(command.ShowOne):
|
||||
_description = _("Create a port chain")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super(CreateSfcPortChain, self).get_parser(prog_name)
|
||||
parser.add_argument(
|
||||
'name',
|
||||
metavar='<name>',
|
||||
help=_('Name of the port chain'))
|
||||
parser.add_argument(
|
||||
'--description',
|
||||
metavar='<description>',
|
||||
help=_('Description for the port chain'))
|
||||
parser.add_argument(
|
||||
'--flow-classifier',
|
||||
default=[],
|
||||
metavar='<flow-classifier>',
|
||||
dest='flow_classifiers',
|
||||
action='append',
|
||||
help=_('Add flow classifier (name or ID). '
|
||||
'This option can be repeated.'))
|
||||
parser.add_argument(
|
||||
'--chain-parameters',
|
||||
metavar='correlation=<correlation-type>,symmetric=<boolean>',
|
||||
action=parseractions.MultiKeyValueAction,
|
||||
optional_keys=['correlation', 'symmetric'],
|
||||
help=_('Dictionary of chain parameters. Supports '
|
||||
'correlation=mpls and symmetric=true|false.'))
|
||||
parser.add_argument(
|
||||
'--port-pair-group',
|
||||
metavar='<port-pair-group>',
|
||||
dest='port_pair_groups',
|
||||
required=True,
|
||||
action='append',
|
||||
help=_('Port pair group (name or ID). '
|
||||
'This option can be repeated.'))
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.neutronclient
|
||||
attrs = _get_common_attrs(self.app.client_manager, parsed_args)
|
||||
body = {resource: attrs}
|
||||
obj = client.create_port_chain(body)[resource]
|
||||
columns, display_columns = nc_osc_utils.get_columns(obj, _attr_map)
|
||||
data = utils.get_dict_properties(obj, columns)
|
||||
return display_columns, data
|
||||
|
||||
|
||||
class DeleteSfcPortChain(command.Command):
|
||||
_description = _("Delete a given port chain")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super(DeleteSfcPortChain, self).get_parser(prog_name)
|
||||
parser.add_argument(
|
||||
'port_chain',
|
||||
metavar="<port-chain>",
|
||||
help=_("Port chain to delete (name or ID)")
|
||||
)
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
# TODO(mohan): Add support for deleting multiple resources.
|
||||
client = self.app.client_manager.neutronclient
|
||||
pc_id = _get_id(client, parsed_args.port_chain, resource)
|
||||
try:
|
||||
client.delete_port_chain(pc_id)
|
||||
except Exception as e:
|
||||
msg = (_("Failed to delete port chain with name "
|
||||
"or ID '%(pc)s': %(e)s")
|
||||
% {'pc': parsed_args.port_chain, 'e': e})
|
||||
raise exceptions.CommandError(msg)
|
||||
|
||||
|
||||
class ListSfcPortChain(command.Lister):
|
||||
_description = _("List port chains")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super(ListSfcPortChain, self).get_parser(prog_name)
|
||||
parser.add_argument(
|
||||
'--long',
|
||||
action='store_true',
|
||||
default=False,
|
||||
help=_("List additional fields in output")
|
||||
)
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.neutronclient
|
||||
data = client.list_port_chain()
|
||||
headers, columns = nc_osc_utils.get_column_definitions(
|
||||
_attr_map, long_listing=parsed_args.long)
|
||||
return (headers,
|
||||
(utils.get_dict_properties(s, columns)
|
||||
for s in data['port_chains']))
|
||||
|
||||
|
||||
class SetSfcPortChain(command.Command):
|
||||
_description = _("Set port chain properties")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super(SetSfcPortChain, self).get_parser(prog_name)
|
||||
parser.add_argument(
|
||||
'--name',
|
||||
metavar='<name>',
|
||||
help=_('Name of the port chain'))
|
||||
parser.add_argument(
|
||||
'--description',
|
||||
metavar='<description>',
|
||||
help=_('Description for the port chain'))
|
||||
parser.add_argument(
|
||||
'--flow-classifier',
|
||||
metavar='<flow-classifier>',
|
||||
dest='flow_classifiers',
|
||||
action='append',
|
||||
help=_('Add flow classifier (name or ID). '
|
||||
'This option can be repeated.'))
|
||||
parser.add_argument(
|
||||
'--no-flow-classifier',
|
||||
action='store_true',
|
||||
help=_('Associate no flow classifier with the port chain'))
|
||||
parser.add_argument(
|
||||
'--port-pair-group',
|
||||
metavar='<port-pair-group>',
|
||||
dest='port_pair_groups',
|
||||
action='append',
|
||||
help=_('Add port pair group (name or ID). '
|
||||
'This option can be repeated.'))
|
||||
parser.add_argument(
|
||||
'--no-port-pair-group',
|
||||
action='store_true',
|
||||
help=_('Remove associated port pair group from the port chain.'
|
||||
'At least one --port-pair-group must be specified '
|
||||
'together.'))
|
||||
parser.add_argument(
|
||||
'port_chain',
|
||||
metavar='<port-chain>',
|
||||
help=_("Port chain to modify (name or ID)"))
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.neutronclient
|
||||
pc_id = _get_id(client, parsed_args.port_chain, resource)
|
||||
attrs = _get_common_attrs(self.app.client_manager, parsed_args,
|
||||
is_create=False)
|
||||
if parsed_args.no_flow_classifier:
|
||||
attrs['flow_classifiers'] = []
|
||||
if parsed_args.flow_classifiers:
|
||||
for fc in parsed_args.flow_classifiers:
|
||||
added = [client.find_resource(
|
||||
'flow_classifier', fc,
|
||||
cmd_resource='sfc_flow_classifier')['id']]
|
||||
if parsed_args.no_flow_classifier:
|
||||
existing = []
|
||||
else:
|
||||
existing = [client.find_resource(
|
||||
resource, parsed_args.port_chain,
|
||||
cmd_resource='sfc_port_chain')['flow_classifiers']]
|
||||
attrs['flow_classifiers'] = sorted(list(
|
||||
set(existing) | set(added)))
|
||||
if (parsed_args.no_port_pair_group and not
|
||||
parsed_args.port_pair_groups):
|
||||
message = _('At least one --port-pair-group must be specified.')
|
||||
raise exceptions.CommandError(message)
|
||||
if parsed_args.no_port_pair_group and parsed_args.port_pair_groups:
|
||||
for ppg in parsed_args.port_pair_groups:
|
||||
attrs['port_pair_groups'] = [client.find_resource(
|
||||
'port_pair_group', ppg,
|
||||
cmd_resource='sfc_port_pair_group')['id']]
|
||||
if (parsed_args.port_pair_groups and
|
||||
not parsed_args.no_port_pair_group):
|
||||
existing_ppg = [client.find_resource(
|
||||
resource, parsed_args.port_chain,
|
||||
cmd_resource='sfc_port_chain')['port_pair_groups']]
|
||||
for ppg in parsed_args.port_pair_groups:
|
||||
existing_ppg.append(client.find_resource(
|
||||
'port_pair_group', ppg,
|
||||
cmd_resource='sfc_port_pair_group')['id'])
|
||||
attrs['port_pair_groups'] = sorted(list(set(existing_ppg)))
|
||||
body = {resource: attrs}
|
||||
try:
|
||||
client.update_port_chain(pc_id, body)
|
||||
except Exception as e:
|
||||
msg = (_("Failed to update port chain '%(pc)s': %(e)s")
|
||||
% {'pc': parsed_args.port_chain, 'e': e})
|
||||
raise exceptions.CommandError(msg)
|
||||
|
||||
|
||||
class ShowSfcPortChain(command.ShowOne):
|
||||
_description = _("Display port chain details")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super(ShowSfcPortChain, self).get_parser(prog_name)
|
||||
parser.add_argument(
|
||||
'port_chain',
|
||||
metavar="<port-chain>",
|
||||
help=_("Port chain to display (name or ID)")
|
||||
)
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.neutronclient
|
||||
pc_id = _get_id(client, parsed_args.port_chain, resource)
|
||||
obj = client.show_port_chain(pc_id)[resource]
|
||||
columns, display_columns = nc_osc_utils.get_columns(obj, _attr_map)
|
||||
data = utils.get_dict_properties(obj, columns)
|
||||
return display_columns, data
|
||||
|
||||
|
||||
class UnsetSfcPortChain(command.Command):
|
||||
_description = _("Unset port chain properties")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super(UnsetSfcPortChain, self).get_parser(prog_name)
|
||||
parser.add_argument(
|
||||
'port_chain',
|
||||
metavar='<port-chain>',
|
||||
help=_("Port chain to unset (name or ID)"))
|
||||
port_chain = parser.add_mutually_exclusive_group()
|
||||
port_chain.add_argument(
|
||||
'--flow-classifier',
|
||||
action='append',
|
||||
metavar='<flow-classifier>',
|
||||
dest='flow_classifiers',
|
||||
help=_('Remove flow classifier(s) from the port chain '
|
||||
'(name or ID). This option can be repeated.'))
|
||||
port_chain.add_argument(
|
||||
'--all-flow-classifier',
|
||||
action='store_true',
|
||||
help=_('Remove all flow classifiers from the port chain'))
|
||||
parser.add_argument(
|
||||
'--port-pair-group',
|
||||
metavar='<port-pair-group>',
|
||||
dest='port_pair_groups',
|
||||
action='append',
|
||||
help=_('Remove port pair group(s) from the port chain '
|
||||
'(name or ID). This option can be repeated.'))
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.neutronclient
|
||||
pc_id = _get_id(client, parsed_args.port_chain, resource)
|
||||
attrs = {}
|
||||
if parsed_args.flow_classifiers:
|
||||
existing = [client.find_resource(
|
||||
resource, parsed_args.port_chain,
|
||||
cmd_resource='sfc_port_chain')['flow_classifiers']]
|
||||
for fc in parsed_args.flow_classifiers:
|
||||
removed = [client.find_resource(
|
||||
'flow_classifier', fc,
|
||||
cmd_resource='sfc_flow_classifier')['id']]
|
||||
attrs['flow_classifiers'] = list(set(existing) - set(removed))
|
||||
if parsed_args.all_flow_classifier:
|
||||
attrs['flow_classifiers'] = []
|
||||
if parsed_args.port_pair_groups:
|
||||
existing_ppg = [client.find_resource(
|
||||
resource, parsed_args.port_chain,
|
||||
cmd_resource='sfc_port_chain')['port_pair_groups']]
|
||||
for ppg in parsed_args.port_pair_groups:
|
||||
removed_ppg = [client.find_resource(
|
||||
'port_pair_group', ppg,
|
||||
cmd_resource='sfc_port_pair_group')['id']]
|
||||
attrs['port_pair_groups'] = list(set(existing_ppg) -
|
||||
set(removed_ppg))
|
||||
if attrs['port_pair_groups'] == []:
|
||||
message = _('At least one --port-pair-group must be'
|
||||
' specified.')
|
||||
raise exceptions.CommandError(message)
|
||||
body = {resource: attrs}
|
||||
try:
|
||||
client.update_port_chain(pc_id, body)
|
||||
except Exception as e:
|
||||
msg = (_("Failed to unset port chain '%(pc)s': %(e)s")
|
||||
% {'pc': parsed_args.port_chain, 'e': e})
|
||||
raise exceptions.CommandError(msg)
|
||||
|
||||
|
||||
def _get_common_attrs(client_manager, parsed_args, is_create=True):
|
||||
attrs = {}
|
||||
if parsed_args.name is not None:
|
||||
attrs['name'] = parsed_args.name
|
||||
if parsed_args.description is not None:
|
||||
attrs['description'] = parsed_args.description
|
||||
if ('port_pair_groups' in parsed_args and
|
||||
parsed_args.port_pair_groups is not None):
|
||||
attrs['port_pair_groups'] = [(_get_id(client_manager.neutronclient,
|
||||
ppg, 'port_pair_group'))
|
||||
for ppg in parsed_args.port_pair_groups]
|
||||
if ('flow_classifiers' in parsed_args and
|
||||
parsed_args.flow_classifiers is not None):
|
||||
attrs['flow_classifiers'] = [(_get_id(client_manager.neutronclient, fc,
|
||||
'flow_classifier'))
|
||||
for fc in parsed_args.flow_classifiers]
|
||||
if is_create is True:
|
||||
_get_attrs(attrs, parsed_args)
|
||||
return attrs
|
||||
|
||||
|
||||
def _get_attrs(attrs, parsed_args):
|
||||
if 'chain_parameters' in parsed_args:
|
||||
attrs['chain_parameters'] = parsed_args.chain_parameters
|
||||
|
||||
|
||||
def _get_id(client, id_or_name, resource):
|
||||
return client.find_resource(resource, id_or_name)['id']
|
215
neutronclient/osc/v2/sfc/sfc_port_pair.py
Executable file
215
neutronclient/osc/v2/sfc/sfc_port_pair.py
Executable file
@ -0,0 +1,215 @@
|
||||
# Copyright (c) 2017 Huawei Technologies India Pvt.Limited.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
|
||||
from osc_lib.cli import parseractions
|
||||
from osc_lib.command import command
|
||||
from osc_lib import exceptions
|
||||
from osc_lib import utils
|
||||
|
||||
from neutronclient._i18n import _
|
||||
from neutronclient.osc import utils as nc_osc_utils
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
resource = 'port_pair'
|
||||
|
||||
_attr_map = (
|
||||
('id', 'ID', nc_osc_utils.LIST_BOTH),
|
||||
('name', 'Name', nc_osc_utils.LIST_BOTH),
|
||||
('ingress', 'Ingress Logical Port', nc_osc_utils.LIST_BOTH),
|
||||
('egress', 'Egress Logical Port', nc_osc_utils.LIST_BOTH),
|
||||
('service_function_parameters', 'Service Function Parameters',
|
||||
nc_osc_utils.LIST_LONG_ONLY),
|
||||
('description', 'Description', nc_osc_utils.LIST_LONG_ONLY),
|
||||
('project_id', 'Project', nc_osc_utils.LIST_LONG_ONLY),
|
||||
)
|
||||
|
||||
|
||||
class CreateSfcPortPair(command.ShowOne):
|
||||
_description = _("Create a port pair")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super(CreateSfcPortPair, self).get_parser(prog_name)
|
||||
parser.add_argument(
|
||||
'name',
|
||||
metavar='<name>',
|
||||
help=_('Name of the port pair'))
|
||||
parser.add_argument(
|
||||
'--description',
|
||||
metavar='<description>',
|
||||
help=_('Description for the port pair'))
|
||||
parser.add_argument(
|
||||
'--service-function-parameters',
|
||||
metavar='correlation=<correlation-type>,weight=<weight>',
|
||||
action=parseractions.MultiKeyValueAction,
|
||||
optional_keys=['correlation', 'weight'],
|
||||
help=_('Dictionary of service function parameters. '
|
||||
'Currently, only correlation=None and weight '
|
||||
'is supported. Weight is an integer that influences '
|
||||
'the selection of a port pair within a port pair group '
|
||||
'for a flow. The higher the weight, the more flows will '
|
||||
'hash to the port pair. The default weight is 1.'))
|
||||
parser.add_argument(
|
||||
'--ingress',
|
||||
metavar='<ingress>',
|
||||
required=True,
|
||||
help=_('Ingress neutron port (name or ID)'))
|
||||
parser.add_argument(
|
||||
'--egress',
|
||||
metavar='<egress>',
|
||||
required=True,
|
||||
help=_('Egress neutron port (name or ID)'))
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.neutronclient
|
||||
attrs = _get_common_attrs(self.app.client_manager, parsed_args)
|
||||
body = {resource: attrs}
|
||||
obj = client.create_port_pair(body)[resource]
|
||||
columns, display_columns = nc_osc_utils.get_columns(obj, _attr_map)
|
||||
data = utils.get_dict_properties(obj, columns)
|
||||
return display_columns, data
|
||||
|
||||
|
||||
class DeleteSfcPortPair(command.Command):
|
||||
_description = _("Delete a given port pair")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super(DeleteSfcPortPair, self).get_parser(prog_name)
|
||||
parser.add_argument(
|
||||
'port_pair',
|
||||
metavar="<port-pair>",
|
||||
help=_("Port pair to delete (name or ID)")
|
||||
)
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
# TODO(mohan): Add support for deleting multiple resources.
|
||||
client = self.app.client_manager.neutronclient
|
||||
port_pair_id = _get_id(client, parsed_args.port_pair, resource)
|
||||
try:
|
||||
client.delete_port_pair(port_pair_id)
|
||||
except Exception as e:
|
||||
msg = (_("Failed to delete port pair with name "
|
||||
"or ID '%(port_pair)s': %(e)s")
|
||||
% {'port_pair': parsed_args.port_pair, 'e': e})
|
||||
raise exceptions.CommandError(msg)
|
||||
|
||||
|
||||
class ListSfcPortPair(command.Lister):
|
||||
_description = _("List port pairs")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super(ListSfcPortPair, self).get_parser(prog_name)
|
||||
parser.add_argument(
|
||||
'--long',
|
||||
action='store_true',
|
||||
help=_("List additional fields in output")
|
||||
)
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.neutronclient
|
||||
data = client.list_port_pair()
|
||||
headers, columns = nc_osc_utils.get_column_definitions(
|
||||
_attr_map, long_listing=parsed_args.long)
|
||||
return (headers,
|
||||
(utils.get_dict_properties(
|
||||
s, columns,
|
||||
) for s in data['port_pairs']))
|
||||
|
||||
|
||||
class SetSfcPortPair(command.Command):
|
||||
_description = _("Set port pair properties")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super(SetSfcPortPair, self).get_parser(prog_name)
|
||||
parser.add_argument(
|
||||
'--name',
|
||||
metavar='<name>',
|
||||
help=_('Name of the port pair'))
|
||||
parser.add_argument(
|
||||
'--description',
|
||||
metavar='<description>',
|
||||
help=_('Description for the port pair'))
|
||||
parser.add_argument(
|
||||
'port_pair',
|
||||
metavar='<port-pair>',
|
||||
help=_("Port pair to modify (name or ID)")
|
||||
)
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.neutronclient
|
||||
port_pair_id = _get_id(client, parsed_args.port_pair, resource)
|
||||
attrs = _get_common_attrs(self.app.client_manager, parsed_args,
|
||||
is_create=False)
|
||||
body = {resource: attrs}
|
||||
try:
|
||||
client.update_port_pair(port_pair_id, body)
|
||||
except Exception as e:
|
||||
msg = (_("Failed to update port pair '%(port_pair)s': %(e)s")
|
||||
% {'port_pair': parsed_args.port_pair, 'e': e})
|
||||
raise exceptions.CommandError(msg)
|
||||
|
||||
|
||||
class ShowSfcPortPair(command.ShowOne):
|
||||
_description = _("Display port pair details")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super(ShowSfcPortPair, self).get_parser(prog_name)
|
||||
parser.add_argument(
|
||||
'port_pair',
|
||||
metavar='<port-pair>',
|
||||
help=_("Port pair to display (name or ID)")
|
||||
)
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.neutronclient
|
||||
port_pair_id = _get_id(client, parsed_args.port_pair, resource)
|
||||
obj = client.show_port_pair(port_pair_id)[resource]
|
||||
columns, display_columns = nc_osc_utils.get_columns(obj, _attr_map)
|
||||
data = utils.get_dict_properties(obj, columns)
|
||||
return display_columns, data
|
||||
|
||||
|
||||
def _get_common_attrs(client_manager, parsed_args, is_create=True):
|
||||
attrs = {}
|
||||
if parsed_args.name is not None:
|
||||
attrs['name'] = parsed_args.name
|
||||
if parsed_args.description is not None:
|
||||
attrs['description'] = parsed_args.description
|
||||
if is_create:
|
||||
_get_attrs(client_manager, attrs, parsed_args)
|
||||
return attrs
|
||||
|
||||
|
||||
def _get_attrs(client_manager, attrs, parsed_args):
|
||||
if parsed_args.ingress is not None:
|
||||
attrs['ingress'] = _get_id(client_manager.neutronclient,
|
||||
parsed_args.ingress, 'port')
|
||||
if parsed_args.egress is not None:
|
||||
attrs['egress'] = _get_id(client_manager.neutronclient,
|
||||
parsed_args.egress, 'port')
|
||||
if 'service_function_parameters' in parsed_args:
|
||||
attrs['service_function_parameters'] = (
|
||||
parsed_args.service_function_parameters)
|
||||
|
||||
|
||||
def _get_id(client, id_or_name, resource):
|
||||
return client.find_resource(resource, id_or_name)['id']
|
291
neutronclient/osc/v2/sfc/sfc_port_pair_group.py
Executable file
291
neutronclient/osc/v2/sfc/sfc_port_pair_group.py
Executable file
@ -0,0 +1,291 @@
|
||||
# Copyright (c) 2017 Huawei Technologies India Pvt.Limited.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
|
||||
from osc_lib.cli import parseractions
|
||||
from osc_lib.command import command
|
||||
from osc_lib import exceptions
|
||||
from osc_lib import utils
|
||||
|
||||
from neutronclient._i18n import _
|
||||
from neutronclient.osc import utils as nc_osc_utils
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
resource = 'port_pair_group'
|
||||
|
||||
_attr_map = (
|
||||
('id', 'ID', nc_osc_utils.LIST_BOTH),
|
||||
('name', 'Name', nc_osc_utils.LIST_BOTH),
|
||||
('port_pairs', 'Port Pair', nc_osc_utils.LIST_BOTH),
|
||||
('port_pair_group_parameters', 'Port Pair Group Parameters',
|
||||
nc_osc_utils.LIST_BOTH),
|
||||
('description', 'Description', nc_osc_utils.LIST_LONG_ONLY),
|
||||
('group_id', 'Loadbalance ID', nc_osc_utils.LIST_LONG_ONLY),
|
||||
('project_id', 'Project', nc_osc_utils.LIST_LONG_ONLY),
|
||||
)
|
||||
|
||||
|
||||
class CreateSfcPortPairGroup(command.ShowOne):
|
||||
_description = _("Create a port pair group")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super(CreateSfcPortPairGroup, self).get_parser(prog_name)
|
||||
parser.add_argument(
|
||||
'name',
|
||||
metavar='<name>',
|
||||
help=_('Name of the port pair group'))
|
||||
parser.add_argument(
|
||||
'--description',
|
||||
metavar='<description>',
|
||||
help=_('Description for the port pair group'))
|
||||
parser.add_argument(
|
||||
'--port-pair',
|
||||
metavar='<port-pair>',
|
||||
dest='port_pairs',
|
||||
default=[],
|
||||
action='append',
|
||||
help=_('Port pair (name or ID). '
|
||||
'This option can be repeated.'))
|
||||
parser.add_argument(
|
||||
'--port-pair-group-parameters',
|
||||
metavar='lb-fields=<lb-fields>',
|
||||
action=parseractions.KeyValueAction,
|
||||
help=_('Dictionary of port pair group parameters. '
|
||||
'Currently only one parameter lb-fields is supported. '
|
||||
'<lb-fields> is a & separated list of load-balancing '
|
||||
'fields.'))
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.neutronclient
|
||||
attrs = _get_common_attrs(self.app.client_manager, parsed_args)
|
||||
body = {resource: attrs}
|
||||
obj = client.create_port_pair_group(body)[resource]
|
||||
columns, display_columns = nc_osc_utils.get_columns(obj, _attr_map)
|
||||
data = utils.get_dict_properties(obj, columns)
|
||||
return display_columns, data
|
||||
|
||||
|
||||
class DeleteSfcPortPairGroup(command.Command):
|
||||
_description = _("Delete a given port pair group")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super(DeleteSfcPortPairGroup, self).get_parser(prog_name)
|
||||
parser.add_argument(
|
||||
'port_pair_group',
|
||||
metavar='<port-pair-group>',
|
||||
help=_("Port pair group to delete (name or ID)")
|
||||
)
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
# TODO(mohan): Add support for deleting multiple resources.
|
||||
client = self.app.client_manager.neutronclient
|
||||
ppg_id = _get_id(client, parsed_args.port_pair_group, resource)
|
||||
try:
|
||||
client.delete_port_pair_group(ppg_id)
|
||||
except Exception as e:
|
||||
msg = (_("Failed to delete port pair group with name "
|
||||
"or ID '%(ppg)s': %(e)s")
|
||||
% {'ppg': parsed_args.port_pair_group, 'e': e})
|
||||
raise exceptions.CommandError(msg)
|
||||
|
||||
|
||||
class ListSfcPortPairGroup(command.Lister):
|
||||
_description = _("List port pair group")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super(ListSfcPortPairGroup, self).get_parser(prog_name)
|
||||
parser.add_argument(
|
||||
'--long',
|
||||
action='store_true',
|
||||
default=False,
|
||||
help=_("List additional fields in output")
|
||||
)
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.neutronclient
|
||||
data = client.list_port_pair_group()
|
||||
headers, columns = nc_osc_utils.get_column_definitions(
|
||||
_attr_map, long_listing=parsed_args.long)
|
||||
return (headers,
|
||||
(utils.get_dict_properties(
|
||||
s, columns,
|
||||
) for s in data['port_pair_groups']))
|
||||
|
||||
|
||||
class SetSfcPortPairGroup(command.Command):
|
||||
_description = _("Set port pair group properties")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super(SetSfcPortPairGroup, self).get_parser(prog_name)
|
||||
parser.add_argument(
|
||||
'port_pair_group',
|
||||
metavar='<port-pair-group>',
|
||||
help=_("Port pair group to modify (name or ID)"))
|
||||
parser.add_argument(
|
||||
'--name',
|
||||
metavar='<name>',
|
||||
help=_('Name of the port pair group'))
|
||||
parser.add_argument(
|
||||
'--description',
|
||||
metavar='<description>',
|
||||
help=_('Description for the port pair group'))
|
||||
parser.add_argument(
|
||||
'--port-pair',
|
||||
metavar='<port-pair>',
|
||||
dest='port_pairs',
|
||||
default=[],
|
||||
action='append',
|
||||
help=_('Port pair (name or ID). '
|
||||
'This option can be repeated.'))
|
||||
parser.add_argument(
|
||||
'--no-port-pair',
|
||||
action='store_true',
|
||||
help=_('Remove all port pair from port pair group'))
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.neutronclient
|
||||
ppg_id = _get_id(client, parsed_args.port_pair_group, resource)
|
||||
attrs = _get_common_attrs(self.app.client_manager, parsed_args,
|
||||
is_create=False)
|
||||
if parsed_args.no_port_pair:
|
||||
attrs['port_pairs'] = []
|
||||
if parsed_args.port_pairs:
|
||||
added = [client.find_resource('port_pair', pp,
|
||||
cmd_resource='sfc_port_pair')['id']
|
||||
for pp in parsed_args.port_pairs]
|
||||
if parsed_args.no_port_pair:
|
||||
existing = []
|
||||
else:
|
||||
existing = [client.find_resource(
|
||||
resource, parsed_args.port_pair_group,
|
||||
cmd_resource='sfc_port_pair_group')['port_pairs']]
|
||||
attrs['port_pairs'] = sorted(list(set(existing) | set(added)))
|
||||
body = {resource: attrs}
|
||||
try:
|
||||
client.update_port_pair_group(ppg_id, body)
|
||||
except Exception as e:
|
||||
msg = (_("Failed to update port pair group '%(ppg)s': %(e)s")
|
||||
% {'ppg': parsed_args.port_pair_group, 'e': e})
|
||||
raise exceptions.CommandError(msg)
|
||||
|
||||
|
||||
class ShowSfcPortPairGroup(command.ShowOne):
|
||||
_description = _("Display port pair group details")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super(ShowSfcPortPairGroup, self).get_parser(prog_name)
|
||||
parser.add_argument(
|
||||
'port_pair_group',
|
||||
metavar='<port-pair-group>',
|
||||
help=_("Port pair group to display (name or ID)")
|
||||
)
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.neutronclient
|
||||
ppg_id = _get_id(client, parsed_args.port_pair_group, resource)
|
||||
obj = client.show_port_pair_group(ppg_id)[resource]
|
||||
columns, display_columns = nc_osc_utils.get_columns(obj, _attr_map)
|
||||
data = utils.get_dict_properties(obj, columns)
|
||||
return display_columns, data
|
||||
|
||||
|
||||
class UnsetSfcPortPairGroup(command.Command):
|
||||
_description = _("Unset port pairs from port pair group")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super(UnsetSfcPortPairGroup, self).get_parser(prog_name)
|
||||
parser.add_argument(
|
||||
'port_pair_group',
|
||||
metavar='<port-pair-group>',
|
||||
help=_("Port pair group to unset (name or ID)"))
|
||||
port_pair_group = parser.add_mutually_exclusive_group()
|
||||
port_pair_group.add_argument(
|
||||
'--port-pair',
|
||||
action='append',
|
||||
metavar='<port-pair>',
|
||||
dest='port_pairs',
|
||||
help=_('Remove port pair(s) from the port pair group '
|
||||
'(name or ID). This option can be repeated.'))
|
||||
port_pair_group.add_argument(
|
||||
'--all-port-pair',
|
||||
action='store_true',
|
||||
help=_('Remove all port pairs from the port pair group'))
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.neutronclient
|
||||
ppg_id = _get_id(client, parsed_args.port_pair_group, resource)
|
||||
attrs = {}
|
||||
if parsed_args.port_pairs:
|
||||
existing = [client.find_resource(
|
||||
resource, parsed_args.port_pair_group,
|
||||
cmd_resource='sfc_port_pair_group')['port_pairs']]
|
||||
for pp in parsed_args.port_pairs:
|
||||
removed = [client.find_resource(
|
||||
'port_pair', pp, cmd_resource='sfc_port_pair')['id']]
|
||||
attrs['port_pairs'] = list(set(existing) - set(removed))
|
||||
if parsed_args.all_port_pair:
|
||||
attrs['port_pairs'] = []
|
||||
body = {resource: attrs}
|
||||
try:
|
||||
client.update_port_pair_group(ppg_id, body)
|
||||
except Exception as e:
|
||||
msg = (_("Failed to unset port pair group '%(ppg)s': %(e)s")
|
||||
% {'ppg': parsed_args.port_pair_group, 'e': e})
|
||||
raise exceptions.CommandError(msg)
|
||||
|
||||
|
||||
def _get_ppg_param(attrs, ppg):
|
||||
attrs['port_pair_group_parameters'] = {}
|
||||
for key, value in ppg.items():
|
||||
if key == 'lb_fields':
|
||||
attrs['port_pair_group_parameters'][key] = ([
|
||||
field for field in value.split('&') if field])
|
||||
else:
|
||||
attrs['port_pair_group_parameters'][key] = value
|
||||
return attrs['port_pair_group_parameters']
|
||||
|
||||
|
||||
def _get_common_attrs(client_manager, parsed_args, is_create=True):
|
||||
attrs = {}
|
||||
if parsed_args.name is not None:
|
||||
attrs['name'] = parsed_args.name
|
||||
if parsed_args.description is not None:
|
||||
attrs['description'] = parsed_args.description
|
||||
if parsed_args.port_pairs:
|
||||
attrs['port_pairs'] = [(_get_id(client_manager.neutronclient, pp,
|
||||
'port_pair'))
|
||||
for pp in parsed_args.port_pairs]
|
||||
if is_create:
|
||||
_get_attrs(attrs, parsed_args)
|
||||
return attrs
|
||||
|
||||
|
||||
def _get_attrs(attrs, parsed_args):
|
||||
if ('port_pair_group_parameters' in parsed_args and
|
||||
parsed_args.port_pair_group_parameters is not None):
|
||||
attrs['port_pair_group_parameters'] = (
|
||||
_get_ppg_param(attrs, parsed_args.port_pair_group_parameters))
|
||||
|
||||
|
||||
def _get_id(client, id_or_name, resource):
|
||||
return client.find_resource(resource, id_or_name)['id']
|
0
neutronclient/tests/unit/osc/v2/sfc/__init__.py
Normal file
0
neutronclient/tests/unit/osc/v2/sfc/__init__.py
Normal file
234
neutronclient/tests/unit/osc/v2/sfc/fakes.py
Executable file
234
neutronclient/tests/unit/osc/v2/sfc/fakes.py
Executable file
@ -0,0 +1,234 @@
|
||||
# Copyright (c) 2017 Huawei Technologies India Pvt.Limited.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import argparse
|
||||
import copy
|
||||
|
||||
import mock
|
||||
|
||||
from osc_lib.tests import utils
|
||||
from oslo_utils import uuidutils
|
||||
|
||||
|
||||
class TestNeutronClientOSCV2(utils.TestCommand):
|
||||
|
||||
def setUp(self):
|
||||
super(TestNeutronClientOSCV2, self).setUp()
|
||||
self.namespace = argparse.Namespace()
|
||||
self.app.client_manager.session = mock.Mock()
|
||||
self.app.client_manager.neutronclient = mock.Mock()
|
||||
self.neutronclient = self.app.client_manager.neutronclient
|
||||
self.neutronclient.find_resource = mock.Mock(
|
||||
side_effect=lambda resource, name_or_id, project_id=None,
|
||||
cmd_resource=None, parent_id=None, fields=None:
|
||||
{'id': name_or_id})
|
||||
|
||||
|
||||
class FakeSfcPortPair(object):
|
||||
"""Fake port pair attributes."""
|
||||
|
||||
@staticmethod
|
||||
def create_port_pair(attrs=None):
|
||||
"""Create a fake port pair.
|
||||
|
||||
:param Dictionary attrs:
|
||||
A dictionary with all attributes
|
||||
:return:
|
||||
A Dictionary with id, name, description, ingress, egress,
|
||||
service-function-parameter, project_id
|
||||
"""
|
||||
attrs = attrs or {}
|
||||
|
||||
# Set default attributes.
|
||||
port_pair_attrs = {
|
||||
'description': 'description',
|
||||
'egress': uuidutils.generate_uuid(),
|
||||
'id': uuidutils.generate_uuid(),
|
||||
'ingress': uuidutils.generate_uuid(),
|
||||
'name': 'port-pair-name',
|
||||
'service_function_parameters': 'correlation=None,weight=1',
|
||||
'project_id': uuidutils.generate_uuid(),
|
||||
}
|
||||
|
||||
# Overwrite default attributes.
|
||||
port_pair_attrs.update(attrs)
|
||||
return copy.deepcopy(port_pair_attrs)
|
||||
|
||||
@staticmethod
|
||||
def create_port_pairs(attrs=None, count=1):
|
||||
"""Create multiple port_pairs.
|
||||
|
||||
:param Dictionary attrs:
|
||||
A dictionary with all attributes
|
||||
:param int count:
|
||||
The number of port_pairs to fake
|
||||
:return:
|
||||
A list of dictionaries faking the port_pairs
|
||||
"""
|
||||
port_pairs = []
|
||||
for _ in range(count):
|
||||
port_pairs.append(FakeSfcPortPair.create_port_pair(attrs))
|
||||
|
||||
return port_pairs
|
||||
|
||||
|
||||
class FakeSfcPortPairGroup(object):
|
||||
"""Fake port pair group attributes."""
|
||||
|
||||
@staticmethod
|
||||
def create_port_pair_group(attrs=None):
|
||||
"""Create a fake port pair group.
|
||||
|
||||
:param Dictionary attrs:
|
||||
A dictionary with all attributes
|
||||
:return:
|
||||
A Dictionary with id, name, description, port_pairs, group_id
|
||||
port_pair_group_parameters, project_id
|
||||
"""
|
||||
attrs = attrs or {}
|
||||
|
||||
# Set default attributes.
|
||||
port_pair_group_attrs = {
|
||||
'id': uuidutils.generate_uuid(),
|
||||
'group_id': uuidutils.generate_uuid(),
|
||||
'name': 'port-pair-group-name',
|
||||
'description': 'description',
|
||||
'port_pairs': uuidutils.generate_uuid(),
|
||||
'port_pair_group_parameters': '{"lb_fields": []}',
|
||||
'project_id': uuidutils.generate_uuid()
|
||||
}
|
||||
|
||||
# port_pair_group_attrs default attributes.
|
||||
port_pair_group_attrs.update(attrs)
|
||||
return copy.deepcopy(port_pair_group_attrs)
|
||||
|
||||
@staticmethod
|
||||
def create_port_pair_groups(attrs=None, count=1):
|
||||
"""Create multiple port pair groups.
|
||||
|
||||
:param Dictionary attrs:
|
||||
A dictionary with all attributes
|
||||
:param int count:
|
||||
The number of port_pair_groups to fake
|
||||
:return:
|
||||
A list of dictionaries faking the port pair groups
|
||||
"""
|
||||
port_pair_groups = []
|
||||
for _ in range(count):
|
||||
port_pair_groups.append(
|
||||
FakeSfcPortPairGroup.create_port_pair_group(attrs))
|
||||
|
||||
return port_pair_groups
|
||||
|
||||
|
||||
class FakeSfcFlowClassifier(object):
|
||||
"""Fake flow classifier attributes."""
|
||||
|
||||
@staticmethod
|
||||
def create_flow_classifier(attrs=None):
|
||||
"""Create a fake flow classifier.
|
||||
|
||||
:param Dictionary attrs:
|
||||
A dictionary with all attributes
|
||||
:return:
|
||||
A Dictionary with faking port chain attributes
|
||||
"""
|
||||
attrs = attrs or {}
|
||||
|
||||
# Set default attributes.
|
||||
flow_classifier_attrs = {
|
||||
'id': uuidutils.generate_uuid(),
|
||||
'destination_ip_prefix': '2.2.2.2/32',
|
||||
'destination_port_range_max': '90',
|
||||
'destination_port_range_min': '80',
|
||||
'ethertype': 'IPv4',
|
||||
'logical_destination_port': uuidutils.generate_uuid(),
|
||||
'logical_source_port': uuidutils.generate_uuid(),
|
||||
'name': 'flow-classifier-name',
|
||||
'description': 'fc_description',
|
||||
'protocol': 'tcp',
|
||||
'source_ip_prefix': '1.1.1.1/32',
|
||||
'source_port_range_max': '20',
|
||||
'source_port_range_min': '10',
|
||||
'project_id': uuidutils.generate_uuid(),
|
||||
'l7_parameters': '{}'
|
||||
}
|
||||
flow_classifier_attrs.update(attrs)
|
||||
return copy.deepcopy(flow_classifier_attrs)
|
||||
|
||||
@staticmethod
|
||||
def create_flow_classifiers(attrs=None, count=1):
|
||||
"""Create multiple flow classifiers.
|
||||
|
||||
:param Dictionary attrs:
|
||||
A dictionary with all attributes
|
||||
:param int count:
|
||||
The number of flow classifiers to fake
|
||||
:return:
|
||||
A list of dictionaries faking the flow classifiers
|
||||
"""
|
||||
flow_classifiers = []
|
||||
for _ in range(count):
|
||||
flow_classifiers.append(
|
||||
FakeSfcFlowClassifier.create_flow_classifier(attrs))
|
||||
|
||||
return flow_classifiers
|
||||
|
||||
|
||||
class FakeSfcPortChain(object):
|
||||
"""Fake port chain attributes."""
|
||||
|
||||
@staticmethod
|
||||
def create_port_chain(attrs=None):
|
||||
"""Create a fake port chain.
|
||||
|
||||
:param Dictionary attrs:
|
||||
A dictionary with all attributes
|
||||
:return:
|
||||
A Dictionary with faking port chain attributes
|
||||
"""
|
||||
attrs = attrs or {}
|
||||
|
||||
# Set default attributes.
|
||||
port_chain_attrs = {
|
||||
'id': uuidutils.generate_uuid(),
|
||||
'chain_id': uuidutils.generate_uuid(),
|
||||
'name': 'port-chain-name',
|
||||
'description': 'description',
|
||||
'port_pair_groups': uuidutils.generate_uuid(),
|
||||
'flow_classifiers': uuidutils.generate_uuid(),
|
||||
'chain_parameters': '{"correlation": mpls}',
|
||||
'project_id': uuidutils.generate_uuid(),
|
||||
}
|
||||
|
||||
# port_pair_group_attrs default attributes.
|
||||
port_chain_attrs.update(attrs)
|
||||
return copy.deepcopy(port_chain_attrs)
|
||||
|
||||
@staticmethod
|
||||
def create_port_chains(attrs=None, count=1):
|
||||
"""Create multiple port chains.
|
||||
|
||||
:param Dictionary attrs:
|
||||
A dictionary with all attributes
|
||||
:param int count:
|
||||
The number of port chains to fake
|
||||
:return:
|
||||
A list of dictionaries faking the port chains.
|
||||
"""
|
||||
port_chains = []
|
||||
for _ in range(count):
|
||||
port_chains.append(FakeSfcPortChain.create_port_chain(attrs))
|
||||
return port_chains
|
378
neutronclient/tests/unit/osc/v2/sfc/test_flow_classifier.py
Executable file
378
neutronclient/tests/unit/osc/v2/sfc/test_flow_classifier.py
Executable file
@ -0,0 +1,378 @@
|
||||
# Copyright (c) 2017 Huawei Technologies India Pvt.Limited.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import mock
|
||||
|
||||
from neutronclient.osc.v2.sfc import sfc_flow_classifier
|
||||
from neutronclient.tests.unit.osc.v2.sfc import fakes
|
||||
|
||||
get_id = 'neutronclient.osc.v2.sfc.sfc_flow_classifier._get_id'
|
||||
|
||||
|
||||
def _get_id(client, id_or_name, resource):
|
||||
return id_or_name
|
||||
|
||||
|
||||
class TestCreateSfcFlowClassifier(fakes.TestNeutronClientOSCV2):
|
||||
|
||||
_fc = fakes.FakeSfcFlowClassifier.create_flow_classifier()
|
||||
|
||||
columns = ('Description',
|
||||
'Destination IP',
|
||||
'Destination Port Range Max',
|
||||
'Destination Port Range Min',
|
||||
'Ethertype',
|
||||
'ID',
|
||||
'L7 Parameters',
|
||||
'Logical Destination Port',
|
||||
'Logical Source Port',
|
||||
'Name',
|
||||
'Project',
|
||||
'Protocol',
|
||||
'Source IP',
|
||||
'Source Port Range Max',
|
||||
'Source Port Range Min')
|
||||
|
||||
def get_data(self):
|
||||
return (
|
||||
self._fc['description'],
|
||||
self._fc['destination_ip_prefix'],
|
||||
self._fc['destination_port_range_max'],
|
||||
self._fc['destination_port_range_min'],
|
||||
self._fc['ethertype'],
|
||||
self._fc['id'],
|
||||
self._fc['l7_parameters'],
|
||||
self._fc['logical_destination_port'],
|
||||
self._fc['logical_source_port'],
|
||||
self._fc['name'],
|
||||
self._fc['project_id'],
|
||||
self._fc['protocol'],
|
||||
self._fc['source_ip_prefix'],
|
||||
self._fc['source_port_range_max'],
|
||||
self._fc['source_port_range_min']
|
||||
)
|
||||
|
||||
def setUp(self):
|
||||
super(TestCreateSfcFlowClassifier, self).setUp()
|
||||
mock.patch(get_id, new=_get_id).start()
|
||||
self.neutronclient.create_flow_classifier = mock.Mock(
|
||||
return_value={'flow_classifier': self._fc})
|
||||
self.data = self.get_data()
|
||||
|
||||
# Get the command object to test
|
||||
self.cmd = sfc_flow_classifier.CreateSfcFlowClassifier(self.app,
|
||||
self.namespace)
|
||||
|
||||
def test_create_flow_classifier_default_options(self):
|
||||
arglist = [
|
||||
"--logical-source-port", self._fc['logical_source_port'],
|
||||
"--ethertype", self._fc['ethertype'],
|
||||
self._fc['name'],
|
||||
]
|
||||
verifylist = [
|
||||
('logical_source_port', self._fc['logical_source_port']),
|
||||
('ethertype', self._fc['ethertype']),
|
||||
('name', self._fc['name']),
|
||||
]
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
columns, data = (self.cmd.take_action(parsed_args))
|
||||
|
||||
self.neutronclient.create_flow_classifier.assert_called_once_with({
|
||||
'flow_classifier': {
|
||||
'name': self._fc['name'],
|
||||
'logical_source_port': self._fc['logical_source_port'],
|
||||
'ethertype': self._fc['ethertype']}
|
||||
})
|
||||
self.assertEqual(self.columns, columns)
|
||||
self.assertEqual(self.data, data)
|
||||
|
||||
def test_create_flow_classifier(self):
|
||||
arglist = [
|
||||
"--description", self._fc['description'],
|
||||
"--ethertype", self._fc['ethertype'],
|
||||
"--protocol", self._fc['protocol'],
|
||||
"--source-ip-prefix", self._fc['source_ip_prefix'],
|
||||
"--destination-ip-prefix", self._fc['destination_ip_prefix'],
|
||||
"--logical-source-port", self._fc['logical_source_port'],
|
||||
"--logical-destination-port", self._fc['logical_destination_port'],
|
||||
self._fc['name'],
|
||||
"--l7-parameters", 'url=path',
|
||||
]
|
||||
|
||||
param = 'url=path'
|
||||
|
||||
verifylist = [
|
||||
('description', self._fc['description']),
|
||||
('name', self._fc['name']),
|
||||
('ethertype', self._fc['ethertype']),
|
||||
('protocol', self._fc['protocol']),
|
||||
('source_ip_prefix', self._fc['source_ip_prefix']),
|
||||
('destination_ip_prefix', self._fc['destination_ip_prefix']),
|
||||
('logical_source_port', self._fc['logical_source_port']),
|
||||
('logical_destination_port',
|
||||
self._fc['logical_destination_port']),
|
||||
('l7_parameters', param)
|
||||
]
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
columns, data = (self.cmd.take_action(parsed_args))
|
||||
self.neutronclient.create_flow_classifier.assert_called_once_with({
|
||||
'flow_classifier': {
|
||||
'name': self._fc['name'],
|
||||
'description': self._fc['description'],
|
||||
'ethertype': self._fc['ethertype'],
|
||||
'protocol': self._fc['protocol'],
|
||||
'source_ip_prefix': self._fc['source_ip_prefix'],
|
||||
'destination_ip_prefix': self._fc['destination_ip_prefix'],
|
||||
'logical_source_port': self._fc['logical_source_port'],
|
||||
'logical_destination_port':
|
||||
self._fc['logical_destination_port'],
|
||||
'l7_parameters': param
|
||||
}
|
||||
})
|
||||
self.assertEqual(self.columns, columns)
|
||||
self.assertEqual(self.data, data)
|
||||
|
||||
|
||||
class TestDeleteSfcFlowClassifier(fakes.TestNeutronClientOSCV2):
|
||||
|
||||
_flow_classifier = \
|
||||
fakes.FakeSfcFlowClassifier.create_flow_classifiers(count=1)
|
||||
|
||||
def setUp(self):
|
||||
super(TestDeleteSfcFlowClassifier, self).setUp()
|
||||
mock.patch(get_id, new=_get_id).start()
|
||||
self.neutronclient.delete_flow_classifier = mock.Mock(
|
||||
return_value=None)
|
||||
self.cmd = sfc_flow_classifier.DeleteSfcFlowClassifier(self.app,
|
||||
self.namespace)
|
||||
|
||||
def test_delete_flow_classifier(self):
|
||||
client = self.app.client_manager.neutronclient
|
||||
mock_flow_classifier_delete = client.delete_flow_classifier
|
||||
arglist = [
|
||||
self._flow_classifier[0]['id'],
|
||||
]
|
||||
verifylist = [
|
||||
('flow_classifier', self._flow_classifier[0]['id']),
|
||||
]
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
result = self.cmd.take_action(parsed_args)
|
||||
mock_flow_classifier_delete.assert_called_once_with(
|
||||
self._flow_classifier[0]['id'])
|
||||
self.assertIsNone(result)
|
||||
|
||||
|
||||
class TestSetSfcFlowClassifier(fakes.TestNeutronClientOSCV2):
|
||||
_flow_classifier = fakes.FakeSfcFlowClassifier.create_flow_classifier()
|
||||
_flow_classifier_name = _flow_classifier['name']
|
||||
_flow_classifier_id = _flow_classifier['id']
|
||||
|
||||
def setUp(self):
|
||||
super(TestSetSfcFlowClassifier, self).setUp()
|
||||
mock.patch(get_id, new=_get_id).start()
|
||||
self.neutronclient.update_flow_classifier = mock.Mock(
|
||||
return_value=None)
|
||||
self.cmd = sfc_flow_classifier.SetSfcFlowClassifier(self.app,
|
||||
self.namespace)
|
||||
|
||||
def test_set_flow_classifier(self):
|
||||
client = self.app.client_manager.neutronclient
|
||||
mock_flow_classifier_update = client.update_flow_classifier
|
||||
arglist = [
|
||||
self._flow_classifier_name,
|
||||
'--name', 'name_updated',
|
||||
'--description', 'desc_updated'
|
||||
]
|
||||
verifylist = [
|
||||
('flow_classifier', self._flow_classifier_name),
|
||||
('name', 'name_updated'),
|
||||
('description', 'desc_updated'),
|
||||
]
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
result = self.cmd.take_action(parsed_args)
|
||||
|
||||
attrs = {'flow_classifier': {
|
||||
'name': 'name_updated',
|
||||
'description': 'desc_updated'}}
|
||||
mock_flow_classifier_update.assert_called_once_with(
|
||||
self._flow_classifier_name, attrs)
|
||||
self.assertIsNone(result)
|
||||
|
||||
|
||||
class TestShowSfcFlowClassifier(fakes.TestNeutronClientOSCV2):
|
||||
|
||||
_fc = fakes.FakeSfcFlowClassifier.create_flow_classifier()
|
||||
data = (
|
||||
_fc['description'],
|
||||
_fc['destination_ip_prefix'],
|
||||
_fc['destination_port_range_max'],
|
||||
_fc['destination_port_range_min'],
|
||||
_fc['ethertype'],
|
||||
_fc['id'],
|
||||
_fc['l7_parameters'],
|
||||
_fc['logical_destination_port'],
|
||||
_fc['logical_source_port'],
|
||||
_fc['name'],
|
||||
_fc['project_id'],
|
||||
_fc['protocol'],
|
||||
_fc['source_ip_prefix'],
|
||||
_fc['source_port_range_max'],
|
||||
_fc['source_port_range_min']
|
||||
)
|
||||
_flow_classifier = {'flow_classifier': _fc}
|
||||
_flow_classifier_id = _fc['id']
|
||||
columns = ('Description',
|
||||
'Destination IP',
|
||||
'Destination Port Range Max',
|
||||
'Destination Port Range Min',
|
||||
'Ethertype',
|
||||
'ID',
|
||||
'L7 Parameters',
|
||||
'Logical Destination Port',
|
||||
'Logical Source Port',
|
||||
'Name',
|
||||
'Project',
|
||||
'Protocol',
|
||||
'Source IP',
|
||||
'Source Port Range Max',
|
||||
'Source Port Range Min')
|
||||
|
||||
def setUp(self):
|
||||
super(TestShowSfcFlowClassifier, self).setUp()
|
||||
mock.patch(get_id, new=_get_id).start()
|
||||
self.neutronclient.show_flow_classifier = mock.Mock(
|
||||
return_value=self._flow_classifier
|
||||
)
|
||||
# Get the command object to test
|
||||
self.cmd = sfc_flow_classifier.ShowSfcFlowClassifier(self.app,
|
||||
self.namespace)
|
||||
|
||||
def test_show_flow_classifier(self):
|
||||
client = self.app.client_manager.neutronclient
|
||||
mock_flow_classifier_show = client.show_flow_classifier
|
||||
arglist = [
|
||||
self._flow_classifier_id,
|
||||
]
|
||||
verifylist = [
|
||||
('flow_classifier', self._flow_classifier_id),
|
||||
]
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
|
||||
columns, data = self.cmd.take_action(parsed_args)
|
||||
mock_flow_classifier_show.assert_called_once_with(
|
||||
self._flow_classifier_id)
|
||||
self.assertEqual(self.columns, columns)
|
||||
self.assertEqual(self.data, data)
|
||||
|
||||
|
||||
class TestListSfcFlowClassifier(fakes.TestNeutronClientOSCV2):
|
||||
|
||||
_fc = fakes.FakeSfcFlowClassifier.create_flow_classifiers(count=1)
|
||||
|
||||
columns = ('ID', 'Name', 'Summary')
|
||||
|
||||
columns_long = ('ID', 'Name', 'Protocol', 'Ethertype', 'Source IP',
|
||||
'Destination IP', 'Logical Source Port',
|
||||
'Logical Destination Port', 'Source Port Range Min',
|
||||
'Source Port Range Max', 'Destination Port Range Min',
|
||||
'Destination Port Range Max', 'L7 Parameters',
|
||||
'Description', 'Project')
|
||||
_flow_classifier = _fc[0]
|
||||
data = [
|
||||
_flow_classifier['id'],
|
||||
_flow_classifier['name'],
|
||||
_flow_classifier['protocol'],
|
||||
_flow_classifier['source_ip_prefix'],
|
||||
_flow_classifier['destination_ip_prefix'],
|
||||
_flow_classifier['logical_source_port'],
|
||||
_flow_classifier['logical_destination_port']
|
||||
]
|
||||
data_long = [
|
||||
_flow_classifier['id'],
|
||||
_flow_classifier['name'],
|
||||
_flow_classifier['protocol'],
|
||||
_flow_classifier['ethertype'],
|
||||
_flow_classifier['source_ip_prefix'],
|
||||
_flow_classifier['destination_ip_prefix'],
|
||||
_flow_classifier['logical_source_port'],
|
||||
_flow_classifier['logical_destination_port'],
|
||||
_flow_classifier['source_port_range_min'],
|
||||
_flow_classifier['source_port_range_max'],
|
||||
_flow_classifier['destination_port_range_min'],
|
||||
_flow_classifier['destination_port_range_max'],
|
||||
_flow_classifier['l7_parameters'],
|
||||
_flow_classifier['description']
|
||||
]
|
||||
|
||||
_flow_classifier1 = {'flow_classifiers': _flow_classifier}
|
||||
_flow_classifier_id = _flow_classifier['id']
|
||||
|
||||
def setUp(self):
|
||||
super(TestListSfcFlowClassifier, self).setUp()
|
||||
mock.patch(get_id, new=_get_id).start()
|
||||
self.neutronclient.list_flow_classifier = mock.Mock(
|
||||
return_value={'flow_classifiers': self._fc}
|
||||
)
|
||||
# Get the command object to test
|
||||
self.cmd = sfc_flow_classifier.ListSfcFlowClassifier(self.app,
|
||||
self.namespace)
|
||||
|
||||
def test_list_flow_classifier(self):
|
||||
arglist = []
|
||||
verifylist = []
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
columns = self.cmd.take_action(parsed_args)
|
||||
fcs = self.neutronclient.list_flow_classifier()['flow_classifiers']
|
||||
fc = fcs[0]
|
||||
data = [
|
||||
fc['id'],
|
||||
fc['name'],
|
||||
fc['protocol'],
|
||||
fc['source_ip_prefix'],
|
||||
fc['destination_ip_prefix'],
|
||||
fc['logical_source_port'],
|
||||
fc['logical_destination_port']
|
||||
]
|
||||
self.assertEqual(list(self.columns), columns[0])
|
||||
self.assertEqual(self.data, data)
|
||||
|
||||
def test_list_with_long_option(self):
|
||||
arglist = ['--long']
|
||||
verifylist = [('long', True)]
|
||||
fcs = self.neutronclient.list_flow_classifier()['flow_classifiers']
|
||||
fc = fcs[0]
|
||||
data = [
|
||||
fc['id'],
|
||||
fc['name'],
|
||||
fc['protocol'],
|
||||
fc['ethertype'],
|
||||
fc['source_ip_prefix'],
|
||||
fc['destination_ip_prefix'],
|
||||
fc['logical_source_port'],
|
||||
fc['logical_destination_port'],
|
||||
fc['source_port_range_min'],
|
||||
fc['source_port_range_max'],
|
||||
fc['destination_port_range_min'],
|
||||
fc['destination_port_range_max'],
|
||||
fc['l7_parameters'],
|
||||
fc['description']
|
||||
]
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
columns_long = self.cmd.take_action(parsed_args)[0]
|
||||
self.assertEqual(list(self.columns_long), columns_long)
|
||||
self.assertEqual(self.data_long, data)
|
556
neutronclient/tests/unit/osc/v2/sfc/test_port_chain.py
Executable file
556
neutronclient/tests/unit/osc/v2/sfc/test_port_chain.py
Executable file
@ -0,0 +1,556 @@
|
||||
# Copyright (c) 2017 Huawei Technologies India Pvt.Limited.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import mock
|
||||
|
||||
from osc_lib import exceptions
|
||||
|
||||
from neutronclient.osc.v2.sfc import sfc_port_chain
|
||||
from neutronclient.tests.unit.osc.v2.sfc import fakes
|
||||
|
||||
|
||||
def _get_id(client, id_or_name, resource):
|
||||
return id_or_name
|
||||
|
||||
|
||||
class TestCreateSfcPortChain(fakes.TestNeutronClientOSCV2):
|
||||
# The new port_chain created
|
||||
_port_chain = fakes.FakeSfcPortChain.create_port_chain()
|
||||
|
||||
columns = ('Chain ID',
|
||||
'Chain Parameters',
|
||||
'Description',
|
||||
'Flow Classifiers',
|
||||
'ID',
|
||||
'Name',
|
||||
'Port Pair Groups',
|
||||
'Project')
|
||||
|
||||
def get_data(self):
|
||||
return (
|
||||
self._port_chain['chain_id'],
|
||||
self._port_chain['chain_parameters'],
|
||||
self._port_chain['description'],
|
||||
self._port_chain['flow_classifiers'],
|
||||
self._port_chain['id'],
|
||||
self._port_chain['name'],
|
||||
self._port_chain['port_pair_groups'],
|
||||
self._port_chain['project_id'],
|
||||
)
|
||||
|
||||
def setUp(self):
|
||||
super(TestCreateSfcPortChain, self).setUp()
|
||||
mock.patch(
|
||||
'neutronclient.osc.v2.sfc.sfc_port_chain._get_id',
|
||||
new=_get_id).start()
|
||||
self.neutronclient.create_port_chain = mock.Mock(
|
||||
return_value={'port_chain': self._port_chain})
|
||||
self.data = self.get_data()
|
||||
|
||||
# Get the command object to test
|
||||
self.cmd = sfc_port_chain.CreateSfcPortChain(self.app, self.namespace)
|
||||
|
||||
def test_create_port_chain_dafault_options(self):
|
||||
arglist = [
|
||||
self._port_chain['name'],
|
||||
"--port-pair-group", self._port_chain['port_pair_groups']
|
||||
]
|
||||
verifylist = [
|
||||
('name', self._port_chain['name']),
|
||||
('port_pair_groups', [self._port_chain['port_pair_groups']]),
|
||||
('flow_classifiers', []),
|
||||
('chain_parameters', None),
|
||||
]
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
columns, data = (self.cmd.take_action(parsed_args))
|
||||
|
||||
self.neutronclient.create_port_chain.assert_called_once_with({
|
||||
'port_chain': {
|
||||
'name': self._port_chain['name'],
|
||||
'port_pair_groups': [self._port_chain['port_pair_groups']],
|
||||
'flow_classifiers': [],
|
||||
'chain_parameters': None}
|
||||
})
|
||||
self.assertEqual(self.columns, columns)
|
||||
self.assertEqual(self.data, data)
|
||||
|
||||
def test_create_port_chain_all_options(self):
|
||||
arglist = [
|
||||
"--description", self._port_chain['description'],
|
||||
"--port-pair-group", self._port_chain['port_pair_groups'],
|
||||
self._port_chain['name'],
|
||||
"--flow-classifier", self._port_chain['flow_classifiers'],
|
||||
"--chain-parameters", 'correlation=mpls,symmetric=true',
|
||||
]
|
||||
|
||||
cp = [{'correlation': 'mpls', 'symmetric': 'true'}]
|
||||
|
||||
verifylist = [
|
||||
('port_pair_groups', [self._port_chain['port_pair_groups']]),
|
||||
('name', self._port_chain['name']),
|
||||
('description', self._port_chain['description']),
|
||||
('flow_classifiers', [self._port_chain['flow_classifiers']]),
|
||||
('chain_parameters', cp)
|
||||
]
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
columns, data = (self.cmd.take_action(parsed_args))
|
||||
|
||||
self.neutronclient.create_port_chain.assert_called_once_with({
|
||||
'port_chain': {
|
||||
'name': self._port_chain['name'],
|
||||
'port_pair_groups': [self._port_chain['port_pair_groups']],
|
||||
'description': self._port_chain['description'],
|
||||
'flow_classifiers': [self._port_chain['flow_classifiers']],
|
||||
'chain_parameters': cp
|
||||
}
|
||||
})
|
||||
self.assertEqual(self.columns, columns)
|
||||
self.assertEqual(self.data, data)
|
||||
|
||||
|
||||
class TestDeleteSfcPortChain(fakes.TestNeutronClientOSCV2):
|
||||
|
||||
_port_chain = fakes.FakeSfcPortChain.create_port_chains(count=1)
|
||||
|
||||
def setUp(self):
|
||||
super(TestDeleteSfcPortChain, self).setUp()
|
||||
mock.patch(
|
||||
'neutronclient.osc.v2.sfc.sfc_port_chain._get_id',
|
||||
new=_get_id).start()
|
||||
self.neutronclient.delete_port_chain = mock.Mock(return_value=None)
|
||||
self.cmd = sfc_port_chain.DeleteSfcPortChain(self.app, self.namespace)
|
||||
|
||||
def test_delete_port_chain(self):
|
||||
client = self.app.client_manager.neutronclient
|
||||
mock_port_chain_delete = client.delete_port_chain
|
||||
arglist = [
|
||||
self._port_chain[0]['id'],
|
||||
]
|
||||
verifylist = [
|
||||
('port_chain', self._port_chain[0]['id']),
|
||||
]
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
result = self.cmd.take_action(parsed_args)
|
||||
mock_port_chain_delete.assert_called_once_with(
|
||||
self._port_chain[0]['id'])
|
||||
self.assertIsNone(result)
|
||||
|
||||
|
||||
class TestListSfcPortChain(fakes.TestNeutronClientOSCV2):
|
||||
_port_chains = fakes.FakeSfcPortChain.create_port_chains(count=1)
|
||||
columns = ('ID', 'Name', 'Port Pair Groups', 'Flow Classifiers',
|
||||
'Chain Parameters', 'Chain ID')
|
||||
columns_long = ('ID', 'Name', 'Port Pair Groups', 'Flow Classifiers',
|
||||
'Chain Parameters', 'Description', 'Chain ID', 'Project')
|
||||
_port_chain = _port_chains[0]
|
||||
data = [
|
||||
_port_chain['id'],
|
||||
_port_chain['name'],
|
||||
_port_chain['port_pair_groups'],
|
||||
_port_chain['flow_classifiers'],
|
||||
_port_chain['chain_parameters'],
|
||||
_port_chain['chain_id']
|
||||
]
|
||||
data_long = [
|
||||
_port_chain['id'],
|
||||
_port_chain['name'],
|
||||
_port_chain['project_id'],
|
||||
_port_chain['chain_id'],
|
||||
_port_chain['port_pair_groups'],
|
||||
_port_chain['flow_classifiers'],
|
||||
_port_chain['chain_parameters'],
|
||||
_port_chain['description']
|
||||
]
|
||||
_port_chain1 = {'port_chains': _port_chain}
|
||||
_port_chain_id = _port_chain['id']
|
||||
|
||||
def setUp(self):
|
||||
super(TestListSfcPortChain, self).setUp()
|
||||
mock.patch(
|
||||
'neutronclient.osc.v2.sfc.sfc_port_chain._get_id',
|
||||
new=_get_id).start()
|
||||
self.neutronclient.list_port_chain = mock.Mock(
|
||||
return_value={'port_chains': self._port_chains}
|
||||
)
|
||||
# Get the command object to test
|
||||
self.cmd = sfc_port_chain.ListSfcPortChain(self.app, self.namespace)
|
||||
|
||||
def test_list_port_chain(self):
|
||||
arglist = []
|
||||
verifylist = []
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
columns = self.cmd.take_action(parsed_args)[0]
|
||||
pcs = self.neutronclient.list_port_chain()['port_chains']
|
||||
pc = pcs[0]
|
||||
data = [
|
||||
pc['id'],
|
||||
pc['name'],
|
||||
pc['port_pair_groups'],
|
||||
pc['flow_classifiers'],
|
||||
pc['chain_parameters'],
|
||||
pc['chain_id']
|
||||
]
|
||||
self.assertEqual(list(self.columns), columns)
|
||||
self.assertEqual(self.data, data)
|
||||
|
||||
def test_list_port_chain_with_long_opion(self):
|
||||
arglist = ['--long']
|
||||
verifylist = [('long', True)]
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
columns = self.cmd.take_action(parsed_args)[0]
|
||||
pcs = self.neutronclient.list_port_chain()['port_chains']
|
||||
pc = pcs[0]
|
||||
data = [
|
||||
pc['id'],
|
||||
pc['name'],
|
||||
pc['project_id'],
|
||||
pc['chain_id'],
|
||||
pc['port_pair_groups'],
|
||||
pc['flow_classifiers'],
|
||||
pc['chain_parameters'],
|
||||
pc['description']
|
||||
]
|
||||
self.assertEqual(list(self.columns_long), columns)
|
||||
self.assertEqual(self.data_long, data)
|
||||
|
||||
|
||||
class TestSetSfcPortChain(fakes.TestNeutronClientOSCV2):
|
||||
_port_chain = fakes.FakeSfcPortChain.create_port_chain()
|
||||
resource = _port_chain
|
||||
res = 'port_chain'
|
||||
_port_chain_name = _port_chain['name']
|
||||
_port_chain_id = _port_chain['id']
|
||||
pc_ppg = _port_chain['port_pair_groups']
|
||||
pc_fc = _port_chain['flow_classifiers']
|
||||
|
||||
def setUp(self):
|
||||
super(TestSetSfcPortChain, self).setUp()
|
||||
mock.patch(
|
||||
'neutronclient.osc.v2.sfc.sfc_port_chain._get_id',
|
||||
new=_get_id).start()
|
||||
self.mocked = self.neutronclient.update_port_chain
|
||||
self.cmd = sfc_port_chain.SetSfcPortChain(self.app, self.namespace)
|
||||
|
||||
def test_set_port_chain(self):
|
||||
client = self.app.client_manager.neutronclient
|
||||
mock_port_chain_update = client.update_port_chain
|
||||
arglist = [
|
||||
self._port_chain_name,
|
||||
'--name', 'name_updated',
|
||||
'--description', 'desc_updated',
|
||||
]
|
||||
verifylist = [
|
||||
('port_chain', self._port_chain_name),
|
||||
('name', 'name_updated'),
|
||||
('description', 'desc_updated'),
|
||||
]
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
result = self.cmd.take_action(parsed_args)
|
||||
attrs = {'port_chain': {'name': 'name_updated',
|
||||
'description': 'desc_updated'}}
|
||||
mock_port_chain_update.assert_called_once_with(self._port_chain_name,
|
||||
attrs)
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_set_flow_classifier(self):
|
||||
target = self.resource['id']
|
||||
fc1 = 'flow_classifier1'
|
||||
|
||||
def _mock_flow_classifier(*args, **kwargs):
|
||||
|
||||
if self.neutronclient.find_resource.call_count == 1:
|
||||
self.neutronclient.find_resource.assert_called_with(
|
||||
'flow_classifier', fc1, cmd_resource='sfc_flow_classifier')
|
||||
return {'id': args[1]}
|
||||
|
||||
if self.neutronclient.find_resource.call_count == 2:
|
||||
self.neutronclient.find_resource.assert_called_with(
|
||||
self.res, target, cmd_resource='sfc_port_chain')
|
||||
return {'flow_classifiers': self.pc_fc}
|
||||
self.neutronclient.find_resource.side_effect = _mock_flow_classifier
|
||||
arglist = [
|
||||
target,
|
||||
'--flow-classifier', fc1,
|
||||
]
|
||||
verifylist = [
|
||||
(self.res, target),
|
||||
('flow_classifiers', [fc1])
|
||||
]
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
result = self.cmd.take_action(parsed_args)
|
||||
expect = {'flow_classifiers': sorted([self.pc_fc, fc1])}
|
||||
self.mocked.assert_called_once_with(target, {self.res: expect})
|
||||
self.assertEqual(2, self.neutronclient.find_resource.call_count)
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_set_no_flow_classifier(self):
|
||||
client = self.app.client_manager.neutronclient
|
||||
mock_port_chain_update = client.update_port_chain
|
||||
arglist = [
|
||||
self._port_chain_name,
|
||||
'--no-flow-classifier',
|
||||
]
|
||||
verifylist = [
|
||||
('port_chain', self._port_chain_name),
|
||||
('no_flow_classifier', True),
|
||||
]
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
result = self.cmd.take_action(parsed_args)
|
||||
attrs = {'port_chain': {'flow_classifiers': []}}
|
||||
mock_port_chain_update.assert_called_once_with(self._port_chain_name,
|
||||
attrs)
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_set_port_pair_groups(self):
|
||||
target = self.resource['id']
|
||||
existing_ppg = self.pc_ppg
|
||||
ppg1 = 'port_pair_group1'
|
||||
ppg2 = 'port_pair_group2'
|
||||
|
||||
def _mock_flow_classifier(*args, **kwargs):
|
||||
if self.neutronclient.find_resource.call_count == 1:
|
||||
self.neutronclient.find_resource.assert_called_with(
|
||||
self.res, target, cmd_resource='sfc_port_chain')
|
||||
return {'port_pair_groups': self.pc_ppg}
|
||||
|
||||
if self.neutronclient.find_resource.call_count == 2:
|
||||
self.neutronclient.find_resource.assert_called_with(
|
||||
'port_pair_group', ppg1,
|
||||
cmd_resource='sfc_port_pair_group')
|
||||
return {'id': args[1]}
|
||||
|
||||
if self.neutronclient.find_resource.call_count == 3:
|
||||
self.neutronclient.find_resource.assert_called_with(
|
||||
'port_pair_group', ppg2,
|
||||
cmd_resource='sfc_port_pair_group')
|
||||
return {'id': args[1]}
|
||||
|
||||
self.neutronclient.find_resource.side_effect = _mock_flow_classifier
|
||||
arglist = [
|
||||
target,
|
||||
'--port-pair-group', ppg1,
|
||||
'--port-pair-group', ppg2,
|
||||
]
|
||||
verifylist = [
|
||||
(self.res, target),
|
||||
('port_pair_groups', [ppg1, ppg2])
|
||||
]
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
result = self.cmd.take_action(parsed_args)
|
||||
expect = {'port_pair_groups': sorted([existing_ppg, ppg1, ppg2])}
|
||||
self.mocked.assert_called_once_with(target, {self.res: expect})
|
||||
self.assertEqual(3, self.neutronclient.find_resource.call_count)
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_set_no_port_pair_group(self):
|
||||
target = self.resource['id']
|
||||
ppg1 = 'port_pair_group1'
|
||||
|
||||
def _mock_port_pair_group(*args, **kwargs):
|
||||
|
||||
if self.neutronclient.find_resource.call_count == 1:
|
||||
self.neutronclient.find_resource.assert_called_with(
|
||||
'port_pair_group', ppg1,
|
||||
cmd_resource='sfc_port_pair_group')
|
||||
return {'id': args[1]}
|
||||
self.neutronclient.find_resource.side_effect = _mock_port_pair_group
|
||||
arglist = [
|
||||
target,
|
||||
'--no-port-pair-group',
|
||||
'--port-pair-group', ppg1,
|
||||
]
|
||||
verifylist = [
|
||||
(self.res, target),
|
||||
('no_port_pair_group', True),
|
||||
('port_pair_groups', [ppg1])
|
||||
]
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
result = self.cmd.take_action(parsed_args)
|
||||
expect = {'port_pair_groups': [ppg1]}
|
||||
self.mocked.assert_called_once_with(target, {self.res: expect})
|
||||
self.assertEqual(1, self.neutronclient.find_resource.call_count)
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_set_only_no_port_pair_group(self):
|
||||
target = self.resource['id']
|
||||
arglist = [
|
||||
target,
|
||||
'--no-port-pair-group',
|
||||
]
|
||||
verifylist = [
|
||||
(self.res, target),
|
||||
('no_port_pair_group', True),
|
||||
]
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
self.assertRaises(
|
||||
exceptions.CommandError, self.cmd.take_action, parsed_args)
|
||||
|
||||
|
||||
class TestShowSfcPortChain(fakes.TestNeutronClientOSCV2):
|
||||
|
||||
_pc = fakes.FakeSfcPortChain.create_port_chain()
|
||||
data = (
|
||||
_pc['chain_id'],
|
||||
_pc['chain_parameters'],
|
||||
_pc['description'],
|
||||
_pc['flow_classifiers'],
|
||||
_pc['id'],
|
||||
_pc['name'],
|
||||
_pc['port_pair_groups'],
|
||||
_pc['project_id']
|
||||
)
|
||||
_port_chain = {'port_chain': _pc}
|
||||
_port_chain_id = _pc['id']
|
||||
columns = ('Chain ID',
|
||||
'Chain Parameters',
|
||||
'Description',
|
||||
'Flow Classifiers',
|
||||
'ID',
|
||||
'Name',
|
||||
'Port Pair Groups',
|
||||
'Project')
|
||||
|
||||
def setUp(self):
|
||||
super(TestShowSfcPortChain, self).setUp()
|
||||
mock.patch(
|
||||
'neutronclient.osc.v2.sfc.sfc_port_chain._get_id',
|
||||
new=_get_id).start()
|
||||
self.neutronclient.show_port_chain = mock.Mock(
|
||||
return_value=self._port_chain
|
||||
)
|
||||
# Get the command object to test
|
||||
self.cmd = sfc_port_chain.ShowSfcPortChain(self.app, self.namespace)
|
||||
|
||||
def test_show_port_chain(self):
|
||||
client = self.app.client_manager.neutronclient
|
||||
mock_port_chain_show = client.show_port_chain
|
||||
arglist = [
|
||||
self._port_chain_id,
|
||||
]
|
||||
verifylist = [
|
||||
('port_chain', self._port_chain_id),
|
||||
]
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
|
||||
columns, data = self.cmd.take_action(parsed_args)
|
||||
mock_port_chain_show.assert_called_once_with(self._port_chain_id)
|
||||
self.assertEqual(self.columns, columns)
|
||||
self.assertEqual(self.data, data)
|
||||
|
||||
|
||||
class TestUnsetSfcPortChain(fakes.TestNeutronClientOSCV2):
|
||||
_port_chain = fakes.FakeSfcPortChain.create_port_chain()
|
||||
resource = _port_chain
|
||||
res = 'port_chain'
|
||||
_port_chain_name = _port_chain['name']
|
||||
_port_chain_id = _port_chain['id']
|
||||
pc_ppg = _port_chain['port_pair_groups']
|
||||
pc_fc = _port_chain['flow_classifiers']
|
||||
|
||||
def setUp(self):
|
||||
super(TestUnsetSfcPortChain, self).setUp()
|
||||
mock.patch(
|
||||
'neutronclient.osc.v2.sfc.sfc_port_chain._get_id',
|
||||
new=_get_id).start()
|
||||
self.neutronclient.update_port_chain = mock.Mock(
|
||||
return_value=None)
|
||||
self.mocked = self.neutronclient.update_port_chain
|
||||
self.cmd = sfc_port_chain.UnsetSfcPortChain(self.app, self.namespace)
|
||||
|
||||
def test_unset_port_pair_group(self):
|
||||
target = self.resource['id']
|
||||
ppg1 = 'port_pair_group1'
|
||||
|
||||
def _mock_port_pair_group(*args, **kwargs):
|
||||
if self.neutronclient.find_resource.call_count == 1:
|
||||
self.neutronclient.find_resource.assert_called_with(
|
||||
self.res, target, cmd_resource='sfc_port_chain')
|
||||
return {'port_pair_groups': self.pc_ppg}
|
||||
|
||||
if self.neutronclient.find_resource.call_count == 2:
|
||||
self.neutronclient.find_resource.assert_called_with(
|
||||
'port_pair_group', ppg1,
|
||||
cmd_resource='sfc_port_pair_group')
|
||||
return {'id': args[1]}
|
||||
if self.neutronclient.find_resource.call_count == 3:
|
||||
self.neutronclient.find_resource.assert_called_with(
|
||||
self.res, target, cmd_resource='sfc_port_chain')
|
||||
return {'id': args[1]}
|
||||
self.neutronclient.find_resource.side_effect = _mock_port_pair_group
|
||||
|
||||
arglist = [
|
||||
target,
|
||||
'--port-pair-group', ppg1,
|
||||
]
|
||||
verifylist = [
|
||||
(self.res, target),
|
||||
('port_pair_groups', [ppg1])
|
||||
]
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
result = self.cmd.take_action(parsed_args)
|
||||
expect = {'port_pair_groups': sorted([self.pc_ppg])}
|
||||
self.mocked.assert_called_once_with(target, {self.res: expect})
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_unset_flow_classifier(self):
|
||||
target = self.resource['id']
|
||||
fc1 = 'flow_classifier1'
|
||||
|
||||
def _mock_flow_classifier(*args, **kwargs):
|
||||
if self.neutronclient.find_resource.call_count == 1:
|
||||
self.neutronclient.find_resource.assert_called_with(
|
||||
self.res, target, cmd_resource='sfc_port_chain')
|
||||
return {'flow_classifiers': self.pc_fc}
|
||||
|
||||
if self.neutronclient.find_resource.call_count == 2:
|
||||
self.neutronclient.find_resource.assert_called_with(
|
||||
'flow_classifier', fc1, cmd_resource='sfc_flow_classifier')
|
||||
return {'id': args[1]}
|
||||
self.neutronclient.find_resource.side_effect = _mock_flow_classifier
|
||||
|
||||
arglist = [
|
||||
target,
|
||||
'--flow-classifier', fc1,
|
||||
]
|
||||
verifylist = [
|
||||
(self.res, target),
|
||||
('flow_classifiers', [fc1])
|
||||
]
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
result = self.cmd.take_action(parsed_args)
|
||||
expect = {'flow_classifiers': sorted([self.pc_fc])}
|
||||
self.mocked.assert_called_once_with(target, {self.res: expect})
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_unset_all_flow_classifier(self):
|
||||
client = self.app.client_manager.neutronclient
|
||||
target = self.resource['id']
|
||||
mock_port_chain_update = client.update_port_chain
|
||||
arglist = [
|
||||
target,
|
||||
'--all-flow-classifier',
|
||||
]
|
||||
verifylist = [
|
||||
(self.res, target),
|
||||
('all_flow_classifier', True)
|
||||
]
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
result = self.cmd.take_action(parsed_args)
|
||||
expect = {'flow_classifiers': []}
|
||||
mock_port_chain_update.assert_called_once_with(target,
|
||||
{self.res: expect})
|
||||
self.assertIsNone(result)
|
302
neutronclient/tests/unit/osc/v2/sfc/test_port_pair.py
Executable file
302
neutronclient/tests/unit/osc/v2/sfc/test_port_pair.py
Executable file
@ -0,0 +1,302 @@
|
||||
# Copyright (c) 2017 Huawei Technologies India Pvt.Limited.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import mock
|
||||
|
||||
from neutronclient.osc.v2.sfc import sfc_port_pair
|
||||
from neutronclient.tests.unit.osc.v2.sfc import fakes
|
||||
|
||||
|
||||
def _get_id(client, id_or_name, resource):
|
||||
return id_or_name
|
||||
|
||||
|
||||
class TestCreateSfcPortPair(fakes.TestNeutronClientOSCV2):
|
||||
# The new port_pair created
|
||||
_port_pair = fakes.FakeSfcPortPair.create_port_pair()
|
||||
|
||||
columns = ('Description',
|
||||
'Egress Logical Port',
|
||||
'ID',
|
||||
'Ingress Logical Port',
|
||||
'Name',
|
||||
'Project',
|
||||
'Service Function Parameters')
|
||||
|
||||
def get_data(self):
|
||||
return (
|
||||
self._port_pair['description'],
|
||||
self._port_pair['egress'],
|
||||
self._port_pair['id'],
|
||||
self._port_pair['ingress'],
|
||||
self._port_pair['name'],
|
||||
self._port_pair['project_id'],
|
||||
self._port_pair['service_function_parameters']
|
||||
)
|
||||
|
||||
def setUp(self):
|
||||
super(TestCreateSfcPortPair, self).setUp()
|
||||
mock.patch('neutronclient.osc.v2.sfc.sfc_port_pair._get_id',
|
||||
new=_get_id).start()
|
||||
self.neutronclient.create_port_pair = mock.Mock(
|
||||
return_value={'port_pair': self._port_pair})
|
||||
self.data = self.get_data()
|
||||
|
||||
# Get the command object to test
|
||||
self.cmd = sfc_port_pair.CreateSfcPortPair(self.app, self.namespace)
|
||||
|
||||
def test_create_port_pair_default_options(self):
|
||||
arglist = [
|
||||
"--ingress", self._port_pair['ingress'],
|
||||
"--egress", self._port_pair['egress'],
|
||||
self._port_pair['name'],
|
||||
]
|
||||
verifylist = [
|
||||
('ingress', self._port_pair['ingress']),
|
||||
('egress', self._port_pair['egress']),
|
||||
('name', self._port_pair['name'])
|
||||
]
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
columns, data = (self.cmd.take_action(parsed_args))
|
||||
|
||||
self.neutronclient.create_port_pair.assert_called_once_with({
|
||||
'port_pair': {'name': self._port_pair['name'],
|
||||
'ingress': self._port_pair['ingress'],
|
||||
'egress': self._port_pair['egress'],
|
||||
'service_function_parameters': None,
|
||||
}
|
||||
})
|
||||
self.assertEqual(self.columns, columns)
|
||||
self.assertEqual(self.data, data)
|
||||
|
||||
def test_create_port_pair_all_options(self):
|
||||
arglist = [
|
||||
"--description", self._port_pair['description'],
|
||||
"--egress", self._port_pair['egress'],
|
||||
"--ingress", self._port_pair['ingress'],
|
||||
self._port_pair['name'],
|
||||
"--service-function-parameters", 'correlation=None,weight=1',
|
||||
]
|
||||
|
||||
sfp = [{'correlation': 'None', 'weight': '1'}]
|
||||
|
||||
verifylist = [
|
||||
('ingress', self._port_pair['ingress']),
|
||||
('egress', self._port_pair['egress']),
|
||||
('name', self._port_pair['name']),
|
||||
('description', self._port_pair['description']),
|
||||
('service_function_parameters', sfp)
|
||||
]
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
columns, data = (self.cmd.take_action(parsed_args))
|
||||
|
||||
self.neutronclient.create_port_pair.assert_called_once_with({
|
||||
'port_pair': {'name': self._port_pair['name'],
|
||||
'ingress': self._port_pair['ingress'],
|
||||
'egress': self._port_pair['egress'],
|
||||
'description': self._port_pair['description'],
|
||||
'service_function_parameters':
|
||||
[{'correlation': 'None', 'weight': '1'}],
|
||||
}
|
||||
})
|
||||
self.assertEqual(self.columns, columns)
|
||||
self.assertEqual(self.data, data)
|
||||
|
||||
|
||||
class TestDeleteSfcPortPair(fakes.TestNeutronClientOSCV2):
|
||||
|
||||
_port_pair = fakes.FakeSfcPortPair.create_port_pairs(count=1)
|
||||
|
||||
def setUp(self):
|
||||
super(TestDeleteSfcPortPair, self).setUp()
|
||||
mock.patch('neutronclient.osc.v2.sfc.sfc_port_pair._get_id',
|
||||
new=_get_id).start()
|
||||
self.neutronclient.delete_port_pair = mock.Mock(return_value=None)
|
||||
self.cmd = sfc_port_pair.DeleteSfcPortPair(self.app, self.namespace)
|
||||
|
||||
def test_delete_port_pair(self):
|
||||
client = self.app.client_manager.neutronclient
|
||||
mock_port_pair_delete = client.delete_port_pair
|
||||
arglist = [
|
||||
self._port_pair[0]['id'],
|
||||
]
|
||||
verifylist = [
|
||||
('port_pair', self._port_pair[0]['id']),
|
||||
]
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
result = self.cmd.take_action(parsed_args)
|
||||
mock_port_pair_delete.assert_called_once_with(
|
||||
self._port_pair[0]['id'])
|
||||
self.assertIsNone(result)
|
||||
|
||||
|
||||
class TestListSfcPortPair(fakes.TestNeutronClientOSCV2):
|
||||
_port_pairs = fakes.FakeSfcPortPair.create_port_pairs()
|
||||
columns = ('ID', 'Name', 'Ingress Logical Port', 'Egress Logical Port')
|
||||
columns_long = ('ID', 'Name', 'Ingress Logical Port',
|
||||
'Egress Logical Port', 'Service Function Parameters',
|
||||
'Description', 'Project')
|
||||
_port_pair = _port_pairs[0]
|
||||
data = [
|
||||
_port_pair['id'],
|
||||
_port_pair['name'],
|
||||
_port_pair['ingress'],
|
||||
_port_pair['egress']
|
||||
]
|
||||
data_long = [
|
||||
_port_pair['id'],
|
||||
_port_pair['name'],
|
||||
_port_pair['ingress'],
|
||||
_port_pair['egress'],
|
||||
_port_pair['service_function_parameters'],
|
||||
_port_pair['description']
|
||||
]
|
||||
_port_pair1 = {'port_pairs': _port_pair}
|
||||
_port_pair_id = _port_pair['id'],
|
||||
|
||||
def setUp(self):
|
||||
super(TestListSfcPortPair, self).setUp()
|
||||
mock.patch('neutronclient.osc.v2.sfc.sfc_port_pair._get_id',
|
||||
new=_get_id).start()
|
||||
self.neutronclient.list_port_pair = mock.Mock(
|
||||
return_value={'port_pairs': self._port_pairs}
|
||||
)
|
||||
# Get the command object to test
|
||||
self.cmd = sfc_port_pair.ListSfcPortPair(self.app, self.namespace)
|
||||
|
||||
def test_list_port_pair(self):
|
||||
arglist = []
|
||||
verifylist = []
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
columns = self.cmd.take_action(parsed_args)[0]
|
||||
port_pairs = self.neutronclient.list_port_pair()['port_pairs']
|
||||
port_pair = port_pairs[0]
|
||||
data = [
|
||||
port_pair['id'],
|
||||
port_pair['name'],
|
||||
port_pair['ingress'],
|
||||
port_pair['egress']
|
||||
]
|
||||
self.assertEqual(list(self.columns), columns)
|
||||
self.assertEqual(self.data, data)
|
||||
|
||||
def test_list_with_long_option(self):
|
||||
arglist = ['--long']
|
||||
verifylist = [('long', True)]
|
||||
port_pairs = self.neutronclient.list_port_pair()['port_pairs']
|
||||
port_pair = port_pairs[0]
|
||||
data = [
|
||||
port_pair['id'],
|
||||
port_pair['name'],
|
||||
port_pair['ingress'],
|
||||
port_pair['egress'],
|
||||
port_pair['service_function_parameters'],
|
||||
port_pair['description']
|
||||
]
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
columns_long = self.cmd.take_action(parsed_args)[0]
|
||||
self.assertEqual(list(self.columns_long), columns_long)
|
||||
self.assertEqual(self.data_long, data)
|
||||
|
||||
|
||||
class TestSetSfcPortPair(fakes.TestNeutronClientOSCV2):
|
||||
_port_pair = fakes.FakeSfcPortPair.create_port_pair()
|
||||
_port_pair_name = _port_pair['name']
|
||||
_port_pair_id = _port_pair['id']
|
||||
|
||||
def setUp(self):
|
||||
super(TestSetSfcPortPair, self).setUp()
|
||||
mock.patch('neutronclient.osc.v2.sfc.sfc_port_pair._get_id',
|
||||
new=_get_id).start()
|
||||
self.neutronclient.update_port_pair = mock.Mock(return_value=None)
|
||||
self.cmd = sfc_port_pair.SetSfcPortPair(self.app, self.namespace)
|
||||
|
||||
def test_set_port_pair(self):
|
||||
client = self.app.client_manager.neutronclient
|
||||
mock_port_pair_update = client.update_port_pair
|
||||
arglist = [
|
||||
self._port_pair_name,
|
||||
'--name', 'name_updated',
|
||||
'--description', 'desc_updated'
|
||||
]
|
||||
verifylist = [
|
||||
('port_pair', self._port_pair_name),
|
||||
('name', 'name_updated'),
|
||||
('description', 'desc_updated'),
|
||||
]
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
result = self.cmd.take_action(parsed_args)
|
||||
attrs = {'port_pair': {
|
||||
'name': 'name_updated',
|
||||
'description': 'desc_updated'}
|
||||
}
|
||||
mock_port_pair_update.assert_called_once_with(self._port_pair_name,
|
||||
attrs)
|
||||
self.assertIsNone(result)
|
||||
|
||||
|
||||
class TestShowSfcPortPair(fakes.TestNeutronClientOSCV2):
|
||||
|
||||
_pp = fakes.FakeSfcPortPair.create_port_pair()
|
||||
|
||||
data = (
|
||||
_pp['description'],
|
||||
_pp['egress'],
|
||||
_pp['id'],
|
||||
_pp['ingress'],
|
||||
_pp['name'],
|
||||
_pp['project_id'],
|
||||
_pp['service_function_parameters'],
|
||||
)
|
||||
_port_pair = {'port_pair': _pp}
|
||||
_port_pair_id = _pp['id']
|
||||
columns = (
|
||||
'Description',
|
||||
'Egress Logical Port',
|
||||
'ID',
|
||||
'Ingress Logical Port',
|
||||
'Name',
|
||||
'Project',
|
||||
'Service Function Parameters'
|
||||
)
|
||||
|
||||
def setUp(self):
|
||||
super(TestShowSfcPortPair, self).setUp()
|
||||
mock.patch('neutronclient.osc.v2.sfc.sfc_port_pair._get_id',
|
||||
new=_get_id).start()
|
||||
|
||||
self.neutronclient.show_port_pair = mock.Mock(
|
||||
return_value=self._port_pair
|
||||
)
|
||||
|
||||
# Get the command object to test
|
||||
self.cmd = sfc_port_pair.ShowSfcPortPair(self.app, self.namespace)
|
||||
|
||||
def test_show_port_pair(self):
|
||||
client = self.app.client_manager.neutronclient
|
||||
mock_port_pair_show = client.show_port_pair
|
||||
arglist = [
|
||||
self._port_pair_id,
|
||||
]
|
||||
verifylist = [
|
||||
('port_pair', self._port_pair_id),
|
||||
]
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
|
||||
columns, data = self.cmd.take_action(parsed_args)
|
||||
mock_port_pair_show.assert_called_once_with(self._port_pair_id)
|
||||
self.assertEqual(self.columns, columns)
|
||||
self.assertEqual(self.data, data)
|
424
neutronclient/tests/unit/osc/v2/sfc/test_port_pair_group.py
Executable file
424
neutronclient/tests/unit/osc/v2/sfc/test_port_pair_group.py
Executable file
@ -0,0 +1,424 @@
|
||||
# Copyright (c) 2017 Huawei Technologies India Pvt.Limited.
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import mock
|
||||
|
||||
from neutronclient.osc.v2.sfc import sfc_port_pair_group
|
||||
from neutronclient.tests.unit.osc.v2.sfc import fakes
|
||||
|
||||
|
||||
def _get_id(client, id_or_name, resource):
|
||||
return id_or_name
|
||||
|
||||
|
||||
class TestCreateSfcPortPairGroup(fakes.TestNeutronClientOSCV2):
|
||||
|
||||
_port_pair_group = fakes.FakeSfcPortPairGroup.create_port_pair_group()
|
||||
|
||||
columns = ('Description',
|
||||
'ID',
|
||||
'Loadbalance ID',
|
||||
'Name',
|
||||
'Port Pair',
|
||||
'Port Pair Group Parameters',
|
||||
'Project')
|
||||
|
||||
def get_data(self):
|
||||
return (
|
||||
self._port_pair_group['description'],
|
||||
self._port_pair_group['id'],
|
||||
self._port_pair_group['group_id'],
|
||||
self._port_pair_group['name'],
|
||||
self._port_pair_group['port_pairs'],
|
||||
self._port_pair_group['port_pair_group_parameters'],
|
||||
self._port_pair_group['project_id']
|
||||
)
|
||||
|
||||
def setUp(self):
|
||||
super(TestCreateSfcPortPairGroup, self).setUp()
|
||||
mock.patch(
|
||||
'neutronclient.osc.v2.sfc.sfc_port_pair_group._get_id',
|
||||
new=_get_id).start()
|
||||
self.neutronclient.create_port_pair_group = mock.Mock(
|
||||
return_value={'port_pair_group': self._port_pair_group})
|
||||
self.data = self.get_data()
|
||||
# Get the command object to test
|
||||
self.cmd = sfc_port_pair_group.CreateSfcPortPairGroup(self.app,
|
||||
self.namespace)
|
||||
|
||||
def test_create_port_pair_group_default_options(self):
|
||||
arglist = [
|
||||
"--port-pair", self._port_pair_group['port_pairs'],
|
||||
self._port_pair_group['name'],
|
||||
]
|
||||
verifylist = [
|
||||
('port_pairs', [self._port_pair_group['port_pairs']]),
|
||||
('name', self._port_pair_group['name']),
|
||||
]
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
columns, data = (self.cmd.take_action(parsed_args))
|
||||
self.neutronclient.create_port_pair_group.assert_called_once_with({
|
||||
'port_pair_group': {
|
||||
'name': self._port_pair_group['name'],
|
||||
'port_pairs': [self._port_pair_group['port_pairs']]}
|
||||
})
|
||||
self.assertEqual(self.columns, columns)
|
||||
self.assertEqual(self.data, data)
|
||||
|
||||
def test_create_port_pair_group(self):
|
||||
arglist = [
|
||||
"--description", self._port_pair_group['description'],
|
||||
"--port-pair", self._port_pair_group['port_pairs'],
|
||||
self._port_pair_group['name'],
|
||||
]
|
||||
verifylist = [
|
||||
('port_pairs', [self._port_pair_group['port_pairs']]),
|
||||
('name', self._port_pair_group['name']),
|
||||
('description', self._port_pair_group['description']),
|
||||
]
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
columns, data = (self.cmd.take_action(parsed_args))
|
||||
|
||||
self.neutronclient.create_port_pair_group.assert_called_once_with({
|
||||
'port_pair_group': {
|
||||
'name': self._port_pair_group['name'],
|
||||
'port_pairs': [self._port_pair_group['port_pairs']],
|
||||
'description': self._port_pair_group['description'],
|
||||
}
|
||||
})
|
||||
self.assertEqual(self.columns, columns)
|
||||
self.assertEqual(self.data, data)
|
||||
|
||||
|
||||
class TestDeleteSfcPortPairGroup(fakes.TestNeutronClientOSCV2):
|
||||
|
||||
_port_pair_group = (fakes.FakeSfcPortPairGroup.create_port_pair_groups
|
||||
(count=1))
|
||||
|
||||
def setUp(self):
|
||||
super(TestDeleteSfcPortPairGroup, self).setUp()
|
||||
mock.patch(
|
||||
'neutronclient.osc.v2.sfc.sfc_port_pair_group._get_id',
|
||||
new=_get_id).start()
|
||||
self.neutronclient.delete_port_pair_group = mock.Mock(
|
||||
return_value=None)
|
||||
self.cmd = sfc_port_pair_group.DeleteSfcPortPairGroup(self.app,
|
||||
self.namespace)
|
||||
|
||||
def test_delete_port_pair_group(self):
|
||||
client = self.app.client_manager.neutronclient
|
||||
mock_port_pair_group_delete = client.delete_port_pair_group
|
||||
arglist = [
|
||||
self._port_pair_group[0]['id'],
|
||||
]
|
||||
verifylist = [
|
||||
('port_pair_group', self._port_pair_group[0]['id']),
|
||||
]
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
result = self.cmd.take_action(parsed_args)
|
||||
mock_port_pair_group_delete.assert_called_once_with(
|
||||
self._port_pair_group[0]['id'])
|
||||
self.assertIsNone(result)
|
||||
|
||||
|
||||
class TestListSfcPortPairGroup(fakes.TestNeutronClientOSCV2):
|
||||
_ppgs = fakes.FakeSfcPortPairGroup.create_port_pair_groups(count=1)
|
||||
columns = ('ID', 'Name', 'Port Pair', 'Port Pair Group Parameters')
|
||||
columns_long = ('ID', 'Name', 'Port Pair', 'Port Pair Group Parameters',
|
||||
'Description', 'Loadbalance ID', 'Project')
|
||||
_port_pair_group = _ppgs[0]
|
||||
data = [
|
||||
_port_pair_group['id'],
|
||||
_port_pair_group['name'],
|
||||
_port_pair_group['port_pairs'],
|
||||
_port_pair_group['port_pair_group_parameters']
|
||||
]
|
||||
data_long = [
|
||||
_port_pair_group['id'],
|
||||
_port_pair_group['name'],
|
||||
_port_pair_group['port_pairs'],
|
||||
_port_pair_group['port_pair_group_parameters'],
|
||||
_port_pair_group['description']
|
||||
]
|
||||
_port_pair_group1 = {'port_pair_groups': _port_pair_group}
|
||||
_port_pair_id = _port_pair_group['id']
|
||||
|
||||
def setUp(self):
|
||||
super(TestListSfcPortPairGroup, self).setUp()
|
||||
mock.patch(
|
||||
'neutronclient.osc.v2.sfc.sfc_port_pair_group._get_id',
|
||||
new=_get_id).start()
|
||||
|
||||
self.neutronclient.list_port_pair_group = mock.Mock(
|
||||
return_value={'port_pair_groups': self._ppgs}
|
||||
)
|
||||
# Get the command object to test
|
||||
self.cmd = sfc_port_pair_group.ListSfcPortPairGroup(self.app,
|
||||
self.namespace)
|
||||
|
||||
def test_list_port_pair_group(self):
|
||||
arglist = []
|
||||
verifylist = []
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
columns = self.cmd.take_action(parsed_args)[0]
|
||||
ppgs = self.neutronclient.list_port_pair_group()['port_pair_groups']
|
||||
ppg = ppgs[0]
|
||||
data = [
|
||||
ppg['id'],
|
||||
ppg['name'],
|
||||
ppg['port_pairs'],
|
||||
ppg['port_pair_group_parameters']
|
||||
]
|
||||
self.assertEqual(list(self.columns), columns)
|
||||
self.assertEqual(self.data, data)
|
||||
|
||||
def test_list_with_long_option(self):
|
||||
arglist = ['--long']
|
||||
verifylist = [('long', True)]
|
||||
ppgs = self.neutronclient.list_port_pair_group()['port_pair_groups']
|
||||
ppg = ppgs[0]
|
||||
data = [
|
||||
ppg['id'],
|
||||
ppg['name'],
|
||||
ppg['port_pairs'],
|
||||
ppg['port_pair_group_parameters'],
|
||||
ppg['description']
|
||||
]
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
columns_long = self.cmd.take_action(parsed_args)[0]
|
||||
self.assertEqual(list(self.columns_long), columns_long)
|
||||
self.assertEqual(self.data_long, data)
|
||||
|
||||
|
||||
class TestSetSfcPortPairGroup(fakes.TestNeutronClientOSCV2):
|
||||
_port_pair_group = fakes.FakeSfcPortPairGroup.create_port_pair_group()
|
||||
resource = _port_pair_group
|
||||
res = 'port_pair_group'
|
||||
_port_pair_group_name = _port_pair_group['name']
|
||||
ppg_pp = _port_pair_group['port_pairs']
|
||||
_port_pair_group_id = _port_pair_group['id']
|
||||
|
||||
def setUp(self):
|
||||
super(TestSetSfcPortPairGroup, self).setUp()
|
||||
|
||||
mock.patch(
|
||||
'neutronclient.osc.v2.sfc.sfc_port_pair_group._get_id',
|
||||
new=_get_id).start()
|
||||
self.neutronclient.update_port_pair_group = mock.Mock(
|
||||
return_value=None)
|
||||
self.mocked = self.neutronclient.update_port_pair_group
|
||||
self.cmd = sfc_port_pair_group.SetSfcPortPairGroup(self.app,
|
||||
self.namespace)
|
||||
|
||||
def test_set_port_pair_group(self):
|
||||
target = self.resource['id']
|
||||
port_pair1 = 'additional_port1'
|
||||
port_pair2 = 'additional_port2'
|
||||
|
||||
def _mock_port_pair_group(*args, **kwargs):
|
||||
|
||||
if self.neutronclient.find_resource.call_count == 1:
|
||||
self.neutronclient.find_resource.assert_called_with(
|
||||
'port_pair', port_pair1, cmd_resource='sfc_port_pair')
|
||||
return {'id': args[1]}
|
||||
|
||||
if self.neutronclient.find_resource.call_count == 2:
|
||||
self.neutronclient.find_resource.assert_called_with(
|
||||
'port_pair', port_pair2, cmd_resource='sfc_port_pair')
|
||||
return {'id': args[1]}
|
||||
|
||||
if self.neutronclient.find_resource.call_count == 3:
|
||||
self.neutronclient.find_resource.assert_called_with(
|
||||
self.res, target, cmd_resource='sfc_port_pair_group')
|
||||
return {'port_pairs': self.ppg_pp}
|
||||
|
||||
self.neutronclient.find_resource.side_effect = _mock_port_pair_group
|
||||
|
||||
arglist = [
|
||||
target,
|
||||
'--port-pair', port_pair1,
|
||||
'--port-pair', port_pair2,
|
||||
]
|
||||
verifylist = [
|
||||
(self.res, target),
|
||||
('port_pairs', [port_pair1, port_pair2])
|
||||
]
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
result = self.cmd.take_action(parsed_args)
|
||||
expect = {'port_pairs': sorted([self.ppg_pp, port_pair1, port_pair2])}
|
||||
self.mocked.assert_called_once_with(target, {self.res: expect})
|
||||
self.assertEqual(3, self.neutronclient.find_resource.call_count)
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_set_no_port_pair(self):
|
||||
client = self.app.client_manager.neutronclient
|
||||
mock_port_pair_group_update = client.update_port_pair_group
|
||||
arglist = [
|
||||
self._port_pair_group_name,
|
||||
'--name', 'name_updated',
|
||||
'--description', 'desc_updated',
|
||||
'--no-port-pair',
|
||||
]
|
||||
verifylist = [
|
||||
('port_pair_group', self._port_pair_group_name),
|
||||
('name', 'name_updated'),
|
||||
('description', 'desc_updated'),
|
||||
('no_port_pair', True),
|
||||
]
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
result = self.cmd.take_action(parsed_args)
|
||||
|
||||
attrs = {'port_pair_group': {'name': 'name_updated',
|
||||
'description': 'desc_updated',
|
||||
'port_pairs': []}}
|
||||
mock_port_pair_group_update.assert_called_once_with(
|
||||
self._port_pair_group_name, attrs)
|
||||
self.assertIsNone(result)
|
||||
|
||||
|
||||
class TestShowSfcPortPairGroup(fakes.TestNeutronClientOSCV2):
|
||||
|
||||
_ppg = fakes.FakeSfcPortPairGroup.create_port_pair_group()
|
||||
data = (
|
||||
_ppg['description'],
|
||||
_ppg['id'],
|
||||
_ppg['group_id'],
|
||||
_ppg['name'],
|
||||
_ppg['port_pairs'],
|
||||
_ppg['port_pair_group_parameters'],
|
||||
_ppg['project_id'])
|
||||
_port_pair_group = {'port_pair_group': _ppg}
|
||||
_port_pair_group_id = _ppg['id']
|
||||
columns = (
|
||||
'Description',
|
||||
'ID',
|
||||
'Loadbalance ID',
|
||||
'Name',
|
||||
'Port Pair',
|
||||
'Port Pair Group Parameters',
|
||||
'Project')
|
||||
|
||||
def setUp(self):
|
||||
super(TestShowSfcPortPairGroup, self).setUp()
|
||||
mock.patch(
|
||||
'neutronclient.osc.v2.sfc.sfc_port_pair_group._get_id',
|
||||
new=_get_id).start()
|
||||
|
||||
self.neutronclient.show_port_pair_group = mock.Mock(
|
||||
return_value=self._port_pair_group
|
||||
)
|
||||
self.cmd = sfc_port_pair_group.ShowSfcPortPairGroup(self.app,
|
||||
self.namespace)
|
||||
|
||||
def test_show_port_pair_group(self):
|
||||
client = self.app.client_manager.neutronclient
|
||||
mock_port_pair_group_show = client.show_port_pair_group
|
||||
arglist = [
|
||||
self._port_pair_group_id,
|
||||
]
|
||||
verifylist = [
|
||||
('port_pair_group', self._port_pair_group_id),
|
||||
]
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
|
||||
columns, data = self.cmd.take_action(parsed_args)
|
||||
mock_port_pair_group_show.assert_called_once_with(
|
||||
self._port_pair_group_id)
|
||||
self.assertEqual(self.columns, columns)
|
||||
self.assertEqual(self.data, data)
|
||||
|
||||
|
||||
class TestUnsetSfcPortPairGroup(fakes.TestNeutronClientOSCV2):
|
||||
_port_pair_group = fakes.FakeSfcPortPairGroup.create_port_pair_group()
|
||||
resource = _port_pair_group
|
||||
res = 'port_pair_group'
|
||||
_port_pair_group_name = _port_pair_group['name']
|
||||
_port_pair_group_id = _port_pair_group['id']
|
||||
ppg_pp = _port_pair_group['port_pairs']
|
||||
|
||||
def setUp(self):
|
||||
super(TestUnsetSfcPortPairGroup, self).setUp()
|
||||
mock.patch(
|
||||
'neutronclient.osc.v2.sfc.sfc_port_pair_group._get_id',
|
||||
new=_get_id).start()
|
||||
self.neutronclient.update_port_pair_group = mock.Mock(
|
||||
return_value=None)
|
||||
self.mocked = self.neutronclient.update_port_pair_group
|
||||
self.cmd = sfc_port_pair_group.UnsetSfcPortPairGroup(
|
||||
self.app, self.namespace)
|
||||
|
||||
def test_unset_port_pair(self):
|
||||
target = self.resource['id']
|
||||
port_pair1 = 'additional_port1'
|
||||
port_pair2 = 'additional_port2'
|
||||
|
||||
def _mock_port_pair(*args, **kwargs):
|
||||
|
||||
if self.neutronclient.find_resource.call_count == 1:
|
||||
self.neutronclient.find_resource.assert_called_with(
|
||||
self.res, target, cmd_resource='sfc_port_pair_group')
|
||||
return {'port_pairs': self.ppg_pp}
|
||||
|
||||
if self.neutronclient.find_resource.call_count == 2:
|
||||
self.neutronclient.find_resource.assert_called_with(
|
||||
'port_pair', port_pair1, cmd_resource='sfc_port_pair')
|
||||
return {'id': args[1]}
|
||||
|
||||
if self.neutronclient.find_resource.call_count == 3:
|
||||
self.neutronclient.find_resource.assert_called_with(
|
||||
'port_pair', port_pair2, cmd_resource='sfc_port_pair')
|
||||
return {'id': args[1]}
|
||||
|
||||
if self.neutronclient.find_resource.call_count == 4:
|
||||
self.neutronclient.find_resource.assert_called_with(
|
||||
self.res, target, cmd_resource='sfc_port_pair_group')
|
||||
return {'id': args[1]}
|
||||
|
||||
self.neutronclient.find_resource.side_effect = _mock_port_pair
|
||||
|
||||
arglist = [
|
||||
target,
|
||||
'--port-pair', port_pair1,
|
||||
'--port-pair', port_pair2,
|
||||
]
|
||||
verifylist = [
|
||||
(self.res, target),
|
||||
('port_pairs', [port_pair1, port_pair2])
|
||||
]
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
result = self.cmd.take_action(parsed_args)
|
||||
expect = {'port_pairs': sorted([self.ppg_pp])}
|
||||
self.mocked.assert_called_once_with(target, {self.res: expect})
|
||||
self.assertIsNone(result)
|
||||
|
||||
def test_unset_all_port_pair(self):
|
||||
client = self.app.client_manager.neutronclient
|
||||
mock_port_pair_group_update = client.update_port_pair_group
|
||||
arglist = [
|
||||
self._port_pair_group_name,
|
||||
'--all-port-pair',
|
||||
]
|
||||
verifylist = [
|
||||
('port_pair_group', self._port_pair_group_name),
|
||||
('all_port_pair', True),
|
||||
]
|
||||
parsed_args = self.check_parser(self.cmd, arglist, verifylist)
|
||||
result = self.cmd.take_action(parsed_args)
|
||||
|
||||
attrs = {'port_pair_group': {'port_pairs': []}}
|
||||
mock_port_pair_group_update.assert_called_once_with(
|
||||
self._port_pair_group_name, attrs)
|
||||
self.assertIsNone(result)
|
@ -509,6 +509,16 @@ class Client(ClientBase):
|
||||
security_group_path = "/security-groups/%s"
|
||||
security_group_rules_path = "/security-group-rules"
|
||||
security_group_rule_path = "/security-group-rules/%s"
|
||||
|
||||
sfc_flow_classifiers_path = "/sfc/flow_classifiers"
|
||||
sfc_flow_classifier_path = "/sfc/flow_classifiers/%s"
|
||||
sfc_port_pairs_path = "/sfc/port_pairs"
|
||||
sfc_port_pair_path = "/sfc/port_pairs/%s"
|
||||
sfc_port_pair_groups_path = "/sfc/port_pair_groups"
|
||||
sfc_port_pair_group_path = "/sfc/port_pair_groups/%s"
|
||||
sfc_port_chains_path = "/sfc/port_chains"
|
||||
sfc_port_chain_path = "/sfc/port_chains/%s"
|
||||
|
||||
endpoint_groups_path = "/vpn/endpoint-groups"
|
||||
endpoint_group_path = "/vpn/endpoint-groups/%s"
|
||||
vpnservices_path = "/vpn/vpnservices"
|
||||
@ -690,6 +700,10 @@ class Client(ClientBase):
|
||||
'bgpvpns': 'bgpvpn',
|
||||
'network_associations': 'network_association',
|
||||
'router_associations': 'router_association',
|
||||
'flow_classifiers': 'flow_classifier',
|
||||
'port_pairs': 'port_pair',
|
||||
'port_pair_groups': 'port_pair_group',
|
||||
'port_chains': 'port_chain',
|
||||
}
|
||||
|
||||
def list_ext(self, collection, path, retrieve_all, **_params):
|
||||
@ -2157,6 +2171,95 @@ class Client(ClientBase):
|
||||
return self.delete(
|
||||
self.bgpvpn_router_association_path % (bgpvpn, router_assoc))
|
||||
|
||||
def create_port_pair(self, body=None):
|
||||
"""Creates a new Port Pair."""
|
||||
return self.post(self.sfc_port_pairs_path, body=body)
|
||||
|
||||
def update_port_pair(self, port_pair, body=None):
|
||||
"""Update a Port Pair."""
|
||||
return self.put(self.sfc_port_pair_path % port_pair, body=body)
|
||||
|
||||
def delete_port_pair(self, port_pair):
|
||||
"""Deletes the specified Port Pair."""
|
||||
return self.delete(self.sfc_port_pair_path % (port_pair))
|
||||
|
||||
def list_port_pair(self, retrieve_all=True, **_params):
|
||||
"""Fetches a list of all Port Pairs."""
|
||||
return self.list('port_pairs', self.sfc_port_pairs_path, retrieve_all,
|
||||
**_params)
|
||||
|
||||
def show_port_pair(self, port_pair, **_params):
|
||||
"""Fetches information of a certain Port Pair."""
|
||||
return self.get(self.sfc_port_pair_path % (port_pair), params=_params)
|
||||
|
||||
def create_port_pair_group(self, body=None):
|
||||
"""Creates a new Port Pair Group."""
|
||||
return self.post(self.sfc_port_pair_groups_path, body=body)
|
||||
|
||||
def update_port_pair_group(self, port_pair_group, body=None):
|
||||
"""Update a Port Pair Group."""
|
||||
return self.put(self.sfc_port_pair_group_path % port_pair_group,
|
||||
body=body)
|
||||
|
||||
def delete_port_pair_group(self, port_pair_group):
|
||||
"""Deletes the specified Port Pair Group."""
|
||||
return self.delete(self.sfc_port_pair_group_path % (port_pair_group))
|
||||
|
||||
def list_port_pair_group(self, retrieve_all=True, **_params):
|
||||
"""Fetches a list of all Port Pair Groups."""
|
||||
return self.list('port_pair_groups', self.sfc_port_pair_groups_path,
|
||||
retrieve_all, **_params)
|
||||
|
||||
def show_port_pair_group(self, port_pair_group, **_params):
|
||||
"""Fetches information of a certain Port Pair Group."""
|
||||
return self.get(self.sfc_port_pair_group_path % (port_pair_group),
|
||||
params=_params)
|
||||
|
||||
def create_port_chain(self, body=None):
|
||||
"""Creates a new Port Chain."""
|
||||
return self.post(self.sfc_port_chains_path, body=body)
|
||||
|
||||
def update_port_chain(self, port_chain, body=None):
|
||||
"""Update a Port Chain."""
|
||||
return self.put(self.sfc_port_chain_path % port_chain, body=body)
|
||||
|
||||
def delete_port_chain(self, port_chain):
|
||||
"""Deletes the specified Port Chain."""
|
||||
return self.delete(self.sfc_port_chain_path % (port_chain))
|
||||
|
||||
def list_port_chain(self, retrieve_all=True, **_params):
|
||||
"""Fetches a list of all Port Chains."""
|
||||
return self.list('port_chains', self.sfc_port_chains_path,
|
||||
retrieve_all, **_params)
|
||||
|
||||
def show_port_chain(self, port_chain, **_params):
|
||||
"""Fetches information of a certain Port Chain."""
|
||||
return self.get(self.sfc_port_chain_path % (port_chain),
|
||||
params=_params)
|
||||
|
||||
def create_flow_classifier(self, body=None):
|
||||
"""Creates a new Flow Classifier."""
|
||||
return self.post(self.sfc_flow_classifiers_path, body=body)
|
||||
|
||||
def update_flow_classifier(self, flow_classifier, body=None):
|
||||
"""Update a Flow Classifier."""
|
||||
return self.put(self.sfc_flow_classifier_path % flow_classifier,
|
||||
body=body)
|
||||
|
||||
def delete_flow_classifier(self, flow_classifier):
|
||||
"""Deletes the specified Flow Classifier."""
|
||||
return self.delete(self.sfc_flow_classifier_path % (flow_classifier))
|
||||
|
||||
def list_flow_classifier(self, retrieve_all=True, **_params):
|
||||
"""Fetches a list of all Flow Classifiers."""
|
||||
return self.list('flow_classifiers', self.sfc_flow_classifiers_path,
|
||||
retrieve_all, **_params)
|
||||
|
||||
def show_flow_classifier(self, flow_classifier, **_params):
|
||||
"""Fetches information of a certain Flow Classifier."""
|
||||
return self.get(self.sfc_flow_classifier_path % (flow_classifier),
|
||||
params=_params)
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
"""Initialize a new client for the Neutron v2.0 API."""
|
||||
super(Client, self).__init__(**kwargs)
|
||||
|
5
releasenotes/notes/add-sfc-commands.yaml
Normal file
5
releasenotes/notes/add-sfc-commands.yaml
Normal file
@ -0,0 +1,5 @@
|
||||
---
|
||||
features:
|
||||
- |
|
||||
Add OSC plugin support for the “Networking Service Function Chaining” feature commands along with client bindings.
|
||||
[Blueprint `openstackclient-cli-porting <https://blueprints.launchpad.net/networking-sfc/+spec/openstackclient-cli-porting>`_]
|
22
setup.cfg
22
setup.cfg
@ -42,6 +42,28 @@ openstack.neutronclient.v2 =
|
||||
network_trunk_set = neutronclient.osc.v2.trunk.network_trunk:SetNetworkTrunk
|
||||
network_trunk_show = neutronclient.osc.v2.trunk.network_trunk:ShowNetworkTrunk
|
||||
network_trunk_unset = neutronclient.osc.v2.trunk.network_trunk:UnsetNetworkTrunk
|
||||
sfc_flow_classifier_create = neutronclient.osc.v2.sfc.sfc_flow_classifier:CreateSfcFlowClassifier
|
||||
sfc_flow_classifier_delete = neutronclient.osc.v2.sfc.sfc_flow_classifier:DeleteSfcFlowClassifier
|
||||
sfc_flow_classifier_list = neutronclient.osc.v2.sfc.sfc_flow_classifier:ListSfcFlowClassifier
|
||||
sfc_flow_classifier_set = neutronclient.osc.v2.sfc.sfc_flow_classifier:SetSfcFlowClassifier
|
||||
sfc_flow_classifier_show = neutronclient.osc.v2.sfc.sfc_flow_classifier:ShowSfcFlowClassifier
|
||||
sfc_port_chain_create = neutronclient.osc.v2.sfc.sfc_port_chain:CreateSfcPortChain
|
||||
sfc_port_chain_delete = neutronclient.osc.v2.sfc.sfc_port_chain:DeleteSfcPortChain
|
||||
sfc_port_chain_list = neutronclient.osc.v2.sfc.sfc_port_chain:ListSfcPortChain
|
||||
sfc_port_chain_set = neutronclient.osc.v2.sfc.sfc_port_chain:SetSfcPortChain
|
||||
sfc_port_chain_show = neutronclient.osc.v2.sfc.sfc_port_chain:ShowSfcPortChain
|
||||
sfc_port_chain_unset = neutronclient.osc.v2.sfc.sfc_port_chain:UnsetSfcPortChain
|
||||
sfc_port_pair_create = neutronclient.osc.v2.sfc.sfc_port_pair:CreateSfcPortPair
|
||||
sfc_port_pair_delete = neutronclient.osc.v2.sfc.sfc_port_pair:DeleteSfcPortPair
|
||||
sfc_port_pair_list = neutronclient.osc.v2.sfc.sfc_port_pair:ListSfcPortPair
|
||||
sfc_port_pair_set = neutronclient.osc.v2.sfc.sfc_port_pair:SetSfcPortPair
|
||||
sfc_port_pair_show = neutronclient.osc.v2.sfc.sfc_port_pair:ShowSfcPortPair
|
||||
sfc_port_pair_group_create = neutronclient.osc.v2.sfc.sfc_port_pair_group:CreateSfcPortPairGroup
|
||||
sfc_port_pair_group_delete = neutronclient.osc.v2.sfc.sfc_port_pair_group:DeleteSfcPortPairGroup
|
||||
sfc_port_pair_group_list = neutronclient.osc.v2.sfc.sfc_port_pair_group:ListSfcPortPairGroup
|
||||
sfc_port_pair_group_set = neutronclient.osc.v2.sfc.sfc_port_pair_group:SetSfcPortPairGroup
|
||||
sfc_port_pair_group_show = neutronclient.osc.v2.sfc.sfc_port_pair_group:ShowSfcPortPairGroup
|
||||
sfc_port_pair_group_unset = neutronclient.osc.v2.sfc.sfc_port_pair_group:UnsetSfcPortPairGroup
|
||||
|
||||
firewall_group_create = neutronclient.osc.v2.fwaas.firewallgroup:CreateFirewallGroup
|
||||
firewall_group_delete = neutronclient.osc.v2.fwaas.firewallgroup:DeleteFirewallGroup
|
||||
|
Loading…
x
Reference in New Issue
Block a user