Merge "Add Tap-as-a-Service client code"
This commit is contained in:
0
neutronclient/osc/v2/taas/__init__.py
Normal file
0
neutronclient/osc/v2/taas/__init__.py
Normal file
225
neutronclient/osc/v2/taas/tap_flow.py
Normal file
225
neutronclient/osc/v2/taas/tap_flow.py
Normal file
@@ -0,0 +1,225 @@
|
||||
# All Rights Reserved 2020
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
|
||||
from osc_lib.cli import format_columns
|
||||
from osc_lib.cli import identity as identity_utils
|
||||
from osc_lib.command import command
|
||||
from osc_lib import exceptions
|
||||
from osc_lib import utils as osc_utils
|
||||
from osc_lib.utils import columns as column_util
|
||||
|
||||
from neutronclient._i18n import _
|
||||
from neutronclient.osc.v2.taas import tap_service
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
TAP_FLOW = 'tap_flow'
|
||||
TAP_FLOWS = '%ss' % TAP_FLOW
|
||||
|
||||
_attr_map = (
|
||||
('id', 'ID', column_util.LIST_BOTH),
|
||||
('tenant_id', 'Tenant', column_util.LIST_LONG_ONLY),
|
||||
('name', 'Name', column_util.LIST_BOTH),
|
||||
('status', 'Status', column_util.LIST_BOTH),
|
||||
('source_port', 'source_port', column_util.LIST_BOTH),
|
||||
('tap_service_id', 'tap_service_id', column_util.LIST_BOTH),
|
||||
('direction', 'Direction', column_util.LIST_BOTH),
|
||||
)
|
||||
|
||||
_formatters = {
|
||||
'vlan_filter': format_columns.ListColumn,
|
||||
}
|
||||
|
||||
|
||||
def _add_updatable_args(parser):
|
||||
parser.add_argument(
|
||||
'--name',
|
||||
help=_('Name of this Tap service.'))
|
||||
parser.add_argument(
|
||||
'--description',
|
||||
help=_('Description for this Tap service.'))
|
||||
|
||||
|
||||
class CreateTapFlow(command.ShowOne):
|
||||
_description = _("Create a tap flow")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super().get_parser(prog_name)
|
||||
identity_utils.add_project_owner_option_to_parser(parser)
|
||||
_add_updatable_args(parser)
|
||||
parser.add_argument(
|
||||
'--port',
|
||||
required=True,
|
||||
metavar="SOURCE_PORT",
|
||||
help=_('Source port to which the Tap Flow is connected.'))
|
||||
parser.add_argument(
|
||||
'--tap-service',
|
||||
required=True,
|
||||
metavar="TAP_SERVICE",
|
||||
help=_('Tap Service to which the Tap Flow belongs.'))
|
||||
parser.add_argument(
|
||||
'--direction',
|
||||
required=True,
|
||||
metavar="DIRECTION",
|
||||
choices=['IN', 'OUT', 'BOTH'],
|
||||
type=lambda s: s.upper(),
|
||||
help=_('Direction of the Tap flow. Possible options are: '
|
||||
'IN, OUT, BOTH'))
|
||||
parser.add_argument(
|
||||
'--vlan-filter',
|
||||
required=False,
|
||||
metavar="VLAN_FILTER",
|
||||
help=_('VLAN Ids to be mirrored in the form of range string.'))
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.network
|
||||
attrs = {}
|
||||
if parsed_args.name is not None:
|
||||
attrs['name'] = str(parsed_args.name)
|
||||
if parsed_args.description is not None:
|
||||
attrs['description'] = str(parsed_args.description)
|
||||
if parsed_args.port is not None:
|
||||
source_port = client.find_port(parsed_args.port)['id']
|
||||
attrs['source_port'] = source_port
|
||||
if parsed_args.tap_service is not None:
|
||||
tap_service_id = client.find_tap_service(
|
||||
parsed_args.tap_service)['id']
|
||||
attrs['tap_service_id'] = tap_service_id
|
||||
if parsed_args.direction is not None:
|
||||
attrs['direction'] = parsed_args.direction
|
||||
if parsed_args.vlan_filter is not None:
|
||||
attrs['vlan_filter'] = parsed_args.vlan_filter
|
||||
if 'project' in parsed_args and parsed_args.project is not None:
|
||||
project_id = identity_utils.find_project(
|
||||
self.app.client_manager.identity,
|
||||
parsed_args.project,
|
||||
parsed_args.project_domain,
|
||||
).id
|
||||
attrs['tenant_id'] = project_id
|
||||
obj = client.create_tap_flow(**attrs)
|
||||
display_columns, columns = tap_service._get_columns(obj)
|
||||
data = osc_utils.get_dict_properties(obj, columns)
|
||||
return display_columns, data
|
||||
|
||||
|
||||
class ListTapFlow(command.Lister):
|
||||
_description = _("List tap flows that belong to a given tenant")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super().get_parser(prog_name)
|
||||
identity_utils.add_project_owner_option_to_parser(parser)
|
||||
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.network
|
||||
params = {}
|
||||
if parsed_args.project is not None:
|
||||
project_id = identity_utils.find_project(
|
||||
self.app.client_manager.identity,
|
||||
parsed_args.project,
|
||||
parsed_args.project_domain,
|
||||
).id
|
||||
params['tenant_id'] = project_id
|
||||
objs = client.tap_flows(retrieve_all=True, params=params)
|
||||
headers, columns = column_util.get_column_definitions(
|
||||
_attr_map, long_listing=True)
|
||||
return (headers, (osc_utils.get_dict_properties(
|
||||
s, columns, formatters=_formatters) for s in objs))
|
||||
|
||||
|
||||
class ShowTapFlow(command.ShowOne):
|
||||
_description = _("Show information of a given tap flow")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super().get_parser(prog_name)
|
||||
parser.add_argument(
|
||||
TAP_FLOW,
|
||||
metavar="<%s>" % TAP_FLOW,
|
||||
help=_("ID or name of tap flow to look up."),
|
||||
)
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.network
|
||||
id = client.find_tap_flow(parsed_args.tap_flow,
|
||||
ignore_missing=False).id
|
||||
obj = client.get_tap_flow(id)
|
||||
display_columns, columns = tap_service._get_columns(obj)
|
||||
data = osc_utils.get_dict_properties(obj, columns)
|
||||
return display_columns, data
|
||||
|
||||
|
||||
class DeleteTapFlow(command.Command):
|
||||
_description = _("Delete a tap flow")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super().get_parser(prog_name)
|
||||
parser.add_argument(
|
||||
TAP_FLOW,
|
||||
metavar="<%s>" % TAP_FLOW,
|
||||
nargs="+",
|
||||
help=_("ID(s) or name(s) of tap flow to delete."),
|
||||
)
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.network
|
||||
fails = 0
|
||||
for id_or_name in parsed_args.tap_flow:
|
||||
try:
|
||||
id = client.find_tap_flow(id_or_name,
|
||||
ignore_missing=False).id
|
||||
client.delete_tap_flow(id)
|
||||
LOG.warning("Tap flow %(id)s deleted", {'id': id})
|
||||
except Exception as e:
|
||||
fails += 1
|
||||
LOG.error("Failed to delete tap flow with name or ID "
|
||||
"'%(id_or_name)s': %(e)s",
|
||||
{'id_or_name': id_or_name, 'e': e})
|
||||
if fails > 0:
|
||||
msg = (_("Failed to delete %(fails)s of %(total)s tap flow.") %
|
||||
{'fails': fails, 'total': len(parsed_args.tap_flow)})
|
||||
raise exceptions.CommandError(msg)
|
||||
|
||||
|
||||
class UpdateTapFlow(command.ShowOne):
|
||||
_description = _("Update a tap flow.")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super().get_parser(prog_name)
|
||||
parser.add_argument(
|
||||
TAP_FLOW,
|
||||
metavar="<%s>" % TAP_FLOW,
|
||||
help=_("ID or name of tap flow to update."),
|
||||
)
|
||||
_add_updatable_args(parser)
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.network
|
||||
original_t_f = client.find_tap_flow(parsed_args.tap_flow,
|
||||
ignore_missing=False).id
|
||||
attrs = {}
|
||||
if parsed_args.name is not None:
|
||||
attrs['name'] = str(parsed_args.name)
|
||||
if parsed_args.description is not None:
|
||||
attrs['description'] = str(parsed_args.description)
|
||||
obj = client.update_tap_flow(original_t_f, **attrs)
|
||||
columns, display_columns = column_util.get_columns(obj, _attr_map)
|
||||
data = osc_utils.get_dict_properties(obj, columns)
|
||||
return display_columns, data
|
222
neutronclient/osc/v2/taas/tap_mirror.py
Normal file
222
neutronclient/osc/v2/taas/tap_mirror.py
Normal file
@@ -0,0 +1,222 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
|
||||
from osc_lib.cli import identity as identity_utils
|
||||
from osc_lib.command import command
|
||||
from osc_lib import exceptions
|
||||
from osc_lib import utils as osc_utils
|
||||
from osc_lib.utils import columns as column_util
|
||||
|
||||
from openstackclient.network.v2 import port as osc_port
|
||||
|
||||
from neutronclient._i18n import _
|
||||
from neutronclient.osc.v2.taas import tap_service
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
TAP_MIRROR = 'tap_mirror'
|
||||
TAP_MIRRORS = '%ss' % TAP_MIRROR
|
||||
|
||||
_attr_map = (
|
||||
('id', 'ID', column_util.LIST_BOTH),
|
||||
('tenant_id', 'Tenant', column_util.LIST_LONG_ONLY),
|
||||
('name', 'Name', column_util.LIST_BOTH),
|
||||
('port_id', 'Port', column_util.LIST_BOTH),
|
||||
('directions', 'Directions', column_util.LIST_LONG_ONLY),
|
||||
('remote_ip', 'Remote IP', column_util.LIST_BOTH),
|
||||
('mirror_type', 'Mirror Type', column_util.LIST_LONG_ONLY),
|
||||
)
|
||||
|
||||
|
||||
def _get_columns(item):
|
||||
column_map = {}
|
||||
hidden_columns = ['location', 'tenant_id']
|
||||
return osc_utils.get_osc_show_columns_for_sdk_resource(
|
||||
item,
|
||||
column_map,
|
||||
hidden_columns
|
||||
)
|
||||
|
||||
|
||||
class CreateTapMirror(command.ShowOne):
|
||||
_description = _("Create a Tap Mirror")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super().get_parser(prog_name)
|
||||
identity_utils.add_project_owner_option_to_parser(parser)
|
||||
tap_service._add_updatable_args(parser)
|
||||
parser.add_argument(
|
||||
'--port',
|
||||
dest='port_id',
|
||||
required=True,
|
||||
metavar="PORT",
|
||||
help=_('Port to which the Tap Mirror is connected.'))
|
||||
parser.add_argument(
|
||||
'--directions',
|
||||
dest='directions',
|
||||
action=osc_port.JSONKeyValueAction,
|
||||
required=True,
|
||||
help=_('A dictionary of direction and tunnel_id. Direction can '
|
||||
'be IN and OUT.'))
|
||||
parser.add_argument(
|
||||
'--remote-ip',
|
||||
dest='remote_ip',
|
||||
required=True,
|
||||
help=_('The remote IP of the Tap Mirror, this will be the '
|
||||
'remote end of the GRE or ERSPAN v1 tunnel'))
|
||||
parser.add_argument(
|
||||
'--mirror-type',
|
||||
dest='mirror_type',
|
||||
required=True,
|
||||
help=_('The type of the mirroring, it can be gre or erspanv1'))
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.network
|
||||
attrs = {}
|
||||
if parsed_args.name is not None:
|
||||
attrs['name'] = str(parsed_args.name)
|
||||
if parsed_args.description is not None:
|
||||
attrs['description'] = str(parsed_args.description)
|
||||
if parsed_args.port_id is not None:
|
||||
port_id = client.find_port(parsed_args.port_id)['id']
|
||||
attrs['port_id'] = port_id
|
||||
if parsed_args.directions is not None:
|
||||
attrs['directions'] = parsed_args.directions
|
||||
if parsed_args.remote_ip is not None:
|
||||
attrs['remote_ip'] = parsed_args.remote_ip
|
||||
if parsed_args.mirror_type is not None:
|
||||
attrs['mirror_type'] = parsed_args.mirror_type
|
||||
if 'project' in parsed_args and parsed_args.project is not None:
|
||||
project_id = identity_utils.find_project(
|
||||
self.app.client_manager.identity,
|
||||
parsed_args.project,
|
||||
parsed_args.project_domain,
|
||||
).id
|
||||
attrs['tenant_id'] = project_id
|
||||
obj = client.create_tap_mirror(**attrs)
|
||||
display_columns, columns = tap_service._get_columns(obj)
|
||||
data = osc_utils.get_dict_properties(obj, columns)
|
||||
return display_columns, data
|
||||
|
||||
|
||||
class ListTapMirror(command.Lister):
|
||||
_description = _("List Tap Mirrors that belong to a given tenant")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super().get_parser(prog_name)
|
||||
identity_utils.add_project_owner_option_to_parser(parser)
|
||||
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.network
|
||||
params = {}
|
||||
if parsed_args.project is not None:
|
||||
project_id = identity_utils.find_project(
|
||||
self.app.client_manager.identity,
|
||||
parsed_args.project,
|
||||
parsed_args.project_domain,
|
||||
).id
|
||||
params['tenant_id'] = project_id
|
||||
objs = client.tap_mirrors(retrieve_all=True, params=params)
|
||||
headers, columns = column_util.get_column_definitions(
|
||||
_attr_map, long_listing=True)
|
||||
return (headers, (osc_utils.get_dict_properties(
|
||||
s, columns) for s in objs))
|
||||
|
||||
|
||||
class ShowTapMirror(command.ShowOne):
|
||||
_description = _("Show information of a given Tap Mirror")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super().get_parser(prog_name)
|
||||
parser.add_argument(
|
||||
TAP_MIRROR,
|
||||
metavar="<%s>" % TAP_MIRROR,
|
||||
help=_("ID or name of Tap Mirror to look up."),
|
||||
)
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.network
|
||||
id = client.find_tap_mirror(parsed_args.tap_mirror,
|
||||
ignore_missing=False).id
|
||||
obj = client.get_tap_mirror(id)
|
||||
display_columns, columns = tap_service._get_columns(obj)
|
||||
data = osc_utils.get_dict_properties(obj, columns)
|
||||
return display_columns, data
|
||||
|
||||
|
||||
class DeleteTapMirror(command.Command):
|
||||
_description = _("Delete a Tap Mirror")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super().get_parser(prog_name)
|
||||
parser.add_argument(
|
||||
TAP_MIRROR,
|
||||
metavar="<%s>" % TAP_MIRROR,
|
||||
nargs="+",
|
||||
help=_("ID(s) or name(s) of the Tap Mirror to delete."),
|
||||
)
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.network
|
||||
fails = 0
|
||||
for id_or_name in parsed_args.tap_mirror:
|
||||
try:
|
||||
id = client.find_tap_mirror(id_or_name,
|
||||
ignore_missing=False).id
|
||||
|
||||
client.delete_tap_mirror(id)
|
||||
LOG.warning("Tap Mirror %(id)s deleted", {'id': id})
|
||||
except Exception as e:
|
||||
fails += 1
|
||||
LOG.error("Failed to delete Tap Mirror with name or ID "
|
||||
"'%(id_or_name)s': %(e)s",
|
||||
{'id_or_name': id_or_name, 'e': e})
|
||||
if fails > 0:
|
||||
msg = (_("Failed to delete %(fails)s of %(total)s Tap Mirror.") %
|
||||
{'fails': fails, 'total': len(parsed_args.tap_mirror)})
|
||||
raise exceptions.CommandError(msg)
|
||||
|
||||
|
||||
class UpdateTapMirror(command.ShowOne):
|
||||
_description = _("Update a Tap Mirror.")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super().get_parser(prog_name)
|
||||
parser.add_argument(
|
||||
TAP_MIRROR,
|
||||
metavar="<%s>" % TAP_MIRROR,
|
||||
help=_("ID or name of the Tap Mirror to update."),
|
||||
)
|
||||
tap_service._add_updatable_args(parser)
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.network
|
||||
original_t_s = client.find_tap_mirror(parsed_args.tap_mirror,
|
||||
ignore_missing=False).id
|
||||
attrs = {}
|
||||
if parsed_args.name is not None:
|
||||
attrs['name'] = str(parsed_args.name)
|
||||
if parsed_args.description is not None:
|
||||
attrs['description'] = str(parsed_args.description)
|
||||
obj = client.update_tap_mirror(original_t_s, **attrs)
|
||||
display_columns, columns = tap_service._get_columns(obj)
|
||||
data = osc_utils.get_dict_properties(obj, columns)
|
||||
return display_columns, data
|
211
neutronclient/osc/v2/taas/tap_service.py
Normal file
211
neutronclient/osc/v2/taas/tap_service.py
Normal file
@@ -0,0 +1,211 @@
|
||||
# All Rights Reserved 2020
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
|
||||
from osc_lib.cli import identity as identity_utils
|
||||
from osc_lib.command import command
|
||||
from osc_lib import exceptions
|
||||
from osc_lib import utils as osc_utils
|
||||
from osc_lib.utils import columns as column_util
|
||||
|
||||
from neutronclient._i18n import _
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
TAP_SERVICE = 'tap_service'
|
||||
TAP_SERVICES = '%ss' % TAP_SERVICE
|
||||
|
||||
_attr_map = (
|
||||
('id', 'ID', column_util.LIST_BOTH),
|
||||
('tenant_id', 'Tenant', column_util.LIST_LONG_ONLY),
|
||||
('name', 'Name', column_util.LIST_BOTH),
|
||||
('port_id', 'Port', column_util.LIST_BOTH),
|
||||
('status', 'Status', column_util.LIST_BOTH),
|
||||
)
|
||||
|
||||
|
||||
def _add_updatable_args(parser):
|
||||
parser.add_argument(
|
||||
'--name',
|
||||
help=_('Name of this Tap service.'))
|
||||
parser.add_argument(
|
||||
'--description',
|
||||
help=_('Description for this Tap service.'))
|
||||
|
||||
|
||||
def _updatable_args2body(parsed_args, body):
|
||||
for attribute in ['name', 'description']:
|
||||
if (hasattr(parsed_args, attribute) and
|
||||
getattr(parsed_args, attribute) is not None):
|
||||
body[attribute] = getattr(parsed_args, attribute)
|
||||
|
||||
|
||||
def _get_columns(item):
|
||||
column_map = {}
|
||||
hidden_columns = ['location', 'tenant_id']
|
||||
return osc_utils.get_osc_show_columns_for_sdk_resource(
|
||||
item,
|
||||
column_map,
|
||||
hidden_columns
|
||||
)
|
||||
|
||||
|
||||
class CreateTapService(command.ShowOne):
|
||||
_description = _("Create a tap service")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super().get_parser(prog_name)
|
||||
identity_utils.add_project_owner_option_to_parser(parser)
|
||||
_add_updatable_args(parser)
|
||||
parser.add_argument(
|
||||
'--port',
|
||||
dest='port_id',
|
||||
required=True,
|
||||
metavar="PORT",
|
||||
help=_('Port to which the Tap service is connected.'))
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.network
|
||||
attrs = {}
|
||||
if parsed_args.name is not None:
|
||||
attrs['name'] = str(parsed_args.name)
|
||||
if parsed_args.description is not None:
|
||||
attrs['description'] = str(parsed_args.description)
|
||||
if parsed_args.port_id is not None:
|
||||
port_id = client.find_port(parsed_args.port_id)['id']
|
||||
attrs['port_id'] = port_id
|
||||
if 'project' in parsed_args and parsed_args.project is not None:
|
||||
project_id = identity_utils.find_project(
|
||||
self.app.client_manager.identity,
|
||||
parsed_args.project,
|
||||
parsed_args.project_domain,
|
||||
).id
|
||||
attrs['tenant_id'] = project_id
|
||||
obj = client.create_tap_service(**attrs)
|
||||
display_columns, columns = _get_columns(obj)
|
||||
data = osc_utils.get_dict_properties(obj, columns)
|
||||
return display_columns, data
|
||||
|
||||
|
||||
class ListTapService(command.Lister):
|
||||
_description = _("List tap services that belong to a given tenant")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super().get_parser(prog_name)
|
||||
identity_utils.add_project_owner_option_to_parser(parser)
|
||||
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.network
|
||||
params = {}
|
||||
if parsed_args.project is not None:
|
||||
project_id = identity_utils.find_project(
|
||||
self.app.client_manager.identity,
|
||||
parsed_args.project,
|
||||
parsed_args.project_domain,
|
||||
).id
|
||||
params['tenant_id'] = project_id
|
||||
objs = client.tap_services(retrieve_all=True, params=params)
|
||||
headers, columns = column_util.get_column_definitions(
|
||||
_attr_map, long_listing=True)
|
||||
return (headers, (osc_utils.get_dict_properties(
|
||||
s, columns) for s in objs))
|
||||
|
||||
|
||||
class ShowTapService(command.ShowOne):
|
||||
_description = _("Show information of a given tap service")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super().get_parser(prog_name)
|
||||
parser.add_argument(
|
||||
TAP_SERVICE,
|
||||
metavar="<%s>" % TAP_SERVICE,
|
||||
help=_("ID or name of tap service to look up."),
|
||||
)
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.network
|
||||
id = client.find_tap_service(parsed_args.tap_service,
|
||||
ignore_missing=False).id
|
||||
obj = client.get_tap_service(id)
|
||||
display_columns, columns = _get_columns(obj)
|
||||
data = osc_utils.get_dict_properties(obj, columns)
|
||||
return display_columns, data
|
||||
|
||||
|
||||
class DeleteTapService(command.Command):
|
||||
_description = _("Delete a tap service")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super().get_parser(prog_name)
|
||||
parser.add_argument(
|
||||
TAP_SERVICE,
|
||||
metavar="<%s>" % TAP_SERVICE,
|
||||
nargs="+",
|
||||
help=_("ID(s) or name(s) of tap service to delete."),
|
||||
)
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.network
|
||||
fails = 0
|
||||
for id_or_name in parsed_args.tap_service:
|
||||
try:
|
||||
id = client.find_tap_service(id_or_name,
|
||||
ignore_missing=False).id
|
||||
|
||||
client.delete_tap_service(id)
|
||||
LOG.warning("Tap service %(id)s deleted", {'id': id})
|
||||
except Exception as e:
|
||||
fails += 1
|
||||
LOG.error("Failed to delete tap service with name or ID "
|
||||
"'%(id_or_name)s': %(e)s",
|
||||
{'id_or_name': id_or_name, 'e': e})
|
||||
if fails > 0:
|
||||
msg = (_("Failed to delete %(fails)s of %(total)s tap service.") %
|
||||
{'fails': fails, 'total': len(parsed_args.tap_service)})
|
||||
raise exceptions.CommandError(msg)
|
||||
|
||||
|
||||
class UpdateTapService(command.ShowOne):
|
||||
_description = _("Update a tap service.")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super().get_parser(prog_name)
|
||||
parser.add_argument(
|
||||
TAP_SERVICE,
|
||||
metavar="<%s>" % TAP_SERVICE,
|
||||
help=_("ID or name of tap service to update."),
|
||||
)
|
||||
_add_updatable_args(parser)
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.network
|
||||
original_t_s = client.find_tap_service(parsed_args.tap_service,
|
||||
ignore_missing=False).id
|
||||
attrs = {}
|
||||
if parsed_args.name is not None:
|
||||
attrs['name'] = str(parsed_args.name)
|
||||
if parsed_args.description is not None:
|
||||
attrs['description'] = str(parsed_args.description)
|
||||
obj = client.update_tap_service(original_t_s, **attrs)
|
||||
display_columns, columns = _get_columns(obj)
|
||||
data = osc_utils.get_dict_properties(obj, columns)
|
||||
return display_columns, data
|
0
neutronclient/tests/unit/osc/v2/taas/__init__.py
Normal file
0
neutronclient/tests/unit/osc/v2/taas/__init__.py
Normal file
122
neutronclient/tests/unit/osc/v2/taas/fakes.py
Normal file
122
neutronclient/tests/unit/osc/v2/taas/fakes.py
Normal file
@@ -0,0 +1,122 @@
|
||||
# All Rights Reserved 2020
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import copy
|
||||
|
||||
from oslo_utils import uuidutils
|
||||
|
||||
|
||||
class FakeTapService:
|
||||
|
||||
@staticmethod
|
||||
def create_tap_service(attrs=None):
|
||||
"""Create a fake tap service."""
|
||||
attrs = attrs or {}
|
||||
tap_service_attrs = {
|
||||
'id': uuidutils.generate_uuid(),
|
||||
'tenant_id': uuidutils.generate_uuid(),
|
||||
'name': 'test_tap_service' + uuidutils.generate_uuid(),
|
||||
'status': 'ACTIVE',
|
||||
}
|
||||
tap_service_attrs.update(attrs)
|
||||
return copy.deepcopy(tap_service_attrs)
|
||||
|
||||
@staticmethod
|
||||
def create_tap_services(attrs=None, count=1):
|
||||
"""Create multiple fake tap services."""
|
||||
|
||||
tap_services = []
|
||||
for i in range(0, count):
|
||||
if attrs is None:
|
||||
attrs = {'id': 'fake_id%d' % i}
|
||||
elif getattr(attrs, 'id', None) is None:
|
||||
attrs['id'] = 'fake_id%d' % i
|
||||
tap_services.append(FakeTapService.create_tap_service(
|
||||
attrs=attrs))
|
||||
|
||||
return tap_services
|
||||
|
||||
|
||||
class FakeTapFlow:
|
||||
|
||||
@staticmethod
|
||||
def create_tap_flow(attrs=None):
|
||||
"""Create a fake tap service."""
|
||||
attrs = attrs or {}
|
||||
tap_flow_attrs = {
|
||||
'id': uuidutils.generate_uuid(),
|
||||
'tenant_id': uuidutils.generate_uuid(),
|
||||
'name': 'test_tap_flow' + uuidutils.generate_uuid(),
|
||||
'status': 'ACTIVE',
|
||||
'direction': 'BOTH',
|
||||
}
|
||||
tap_flow_attrs.update(attrs)
|
||||
return copy.deepcopy(tap_flow_attrs)
|
||||
|
||||
@staticmethod
|
||||
def create_tap_flows(attrs=None, count=1):
|
||||
"""Create multiple fake tap flows."""
|
||||
|
||||
tap_flows = []
|
||||
for i in range(0, count):
|
||||
if attrs is None:
|
||||
attrs = {
|
||||
'id': 'fake_id%d' % i,
|
||||
'source_port': uuidutils.generate_uuid(),
|
||||
'tap_service_id': uuidutils.generate_uuid()
|
||||
}
|
||||
elif getattr(attrs, 'id', None) is None:
|
||||
attrs['id'] = 'fake_id%d' % i
|
||||
tap_flows.append(FakeTapFlow.create_tap_flow(attrs=attrs))
|
||||
|
||||
return tap_flows
|
||||
|
||||
|
||||
class FakeTapMirror(object):
|
||||
|
||||
@staticmethod
|
||||
def create_tap_mirror(attrs=None):
|
||||
"""Create a fake tap mirror."""
|
||||
attrs = attrs or {}
|
||||
tap_mirror_attrs = {
|
||||
'id': uuidutils.generate_uuid(),
|
||||
'tenant_id': uuidutils.generate_uuid(),
|
||||
'name': 'test_tap_mirror' + uuidutils.generate_uuid(),
|
||||
'port_id': uuidutils.generate_uuid(),
|
||||
'directions': 'IN=99',
|
||||
'remote_ip': '192.10.10.2',
|
||||
'mirror_type': 'gre',
|
||||
}
|
||||
tap_mirror_attrs.update(attrs)
|
||||
return copy.deepcopy(tap_mirror_attrs)
|
||||
|
||||
@staticmethod
|
||||
def create_tap_mirrors(attrs=None, count=1):
|
||||
"""Create multiple fake tap mirrors."""
|
||||
|
||||
tap_mirrors = []
|
||||
for i in range(0, count):
|
||||
if attrs is None:
|
||||
attrs = {
|
||||
'id': 'fake_id%d' % i,
|
||||
'port_id': uuidutils.generate_uuid(),
|
||||
'name': 'test_tap_mirror_%d' % i,
|
||||
'directions': 'IN=%d' % 99 + i,
|
||||
'remote_ip': '192.10.10.%d' % (i + 3),
|
||||
}
|
||||
elif getattr(attrs, 'id', None) is None:
|
||||
attrs['id'] = 'fake_id%d' % i
|
||||
tap_mirrors.append(FakeTapMirror.create_tap_mirror(attrs=attrs))
|
||||
|
||||
return tap_mirrors
|
283
neutronclient/tests/unit/osc/v2/taas/test_osc_tap_flow.py
Normal file
283
neutronclient/tests/unit/osc/v2/taas/test_osc_tap_flow.py
Normal file
@@ -0,0 +1,283 @@
|
||||
# All Rights Reserved 2020
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import copy
|
||||
import operator
|
||||
from unittest import mock
|
||||
|
||||
from neutronclient.tests.unit.osc.v2 import fakes as test_fakes
|
||||
from openstack.network.v2 import tap_flow as _tap_flow
|
||||
from osc_lib import utils as osc_utils
|
||||
from osc_lib.utils import columns as column_util
|
||||
from oslo_utils import uuidutils
|
||||
|
||||
from neutronclient.osc.v2.taas import tap_flow as osc_tap_flow
|
||||
from neutronclient.osc.v2.taas import tap_service as osc_tap_service
|
||||
from neutronclient.tests.unit.osc.v2.taas import fakes
|
||||
|
||||
|
||||
columns_long = tuple(col for col, _, listing_mode in osc_tap_flow._attr_map
|
||||
if listing_mode in (column_util.LIST_BOTH,
|
||||
column_util.LIST_LONG_ONLY))
|
||||
headers_long = tuple(head for _, head, listing_mode in
|
||||
osc_tap_flow._attr_map if listing_mode in
|
||||
(column_util.LIST_BOTH, column_util.LIST_LONG_ONLY))
|
||||
sorted_attr_map = sorted(osc_tap_flow._attr_map, key=operator.itemgetter(1))
|
||||
sorted_columns = tuple(col for col, _, _ in sorted_attr_map)
|
||||
sorted_headers = tuple(head for _, head, _ in sorted_attr_map)
|
||||
|
||||
|
||||
def _get_data(attrs, columns=sorted_columns):
|
||||
return osc_utils.get_dict_properties(attrs, columns)
|
||||
|
||||
|
||||
class TestCreateTapFlow(test_fakes.TestNeutronClientOSCV2):
|
||||
|
||||
columns = (
|
||||
'direction',
|
||||
'id',
|
||||
'name',
|
||||
'source_port',
|
||||
'status',
|
||||
'tap_service_id',
|
||||
)
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.cmd = osc_tap_flow.CreateTapFlow(self.app, self.namespace)
|
||||
|
||||
def test_create_tap_flow(self):
|
||||
"""Test Create Tap Flow."""
|
||||
port_id = uuidutils.generate_uuid()
|
||||
fake_tap_service = fakes.FakeTapService.create_tap_service(
|
||||
attrs={'port_id': port_id}
|
||||
)
|
||||
port_id = uuidutils.generate_uuid()
|
||||
fake_tap_flow = fakes.FakeTapFlow.create_tap_flow(
|
||||
attrs={
|
||||
'source_port': port_id,
|
||||
'tap_service_id': fake_tap_service['id']
|
||||
}
|
||||
)
|
||||
self.app.client_manager.network = mock.Mock()
|
||||
self.app.client_manager.network.create_tap_flow = mock.Mock(
|
||||
return_value=fake_tap_flow)
|
||||
self.app.client_manager.network.find_port = mock.Mock(
|
||||
return_value={'id': port_id})
|
||||
self.app.client_manager.network.find_tap_service = mock.Mock(
|
||||
return_value=fake_tap_service)
|
||||
arg_list = [
|
||||
'--name', fake_tap_flow['name'],
|
||||
'--port', fake_tap_flow['source_port'],
|
||||
'--tap-service', fake_tap_flow['tap_service_id'],
|
||||
'--direction', fake_tap_flow['direction'],
|
||||
]
|
||||
|
||||
verify_list = [
|
||||
('name', fake_tap_flow['name']),
|
||||
('port', fake_tap_flow['source_port']),
|
||||
('tap_service', fake_tap_flow['tap_service_id']),
|
||||
]
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, arg_list, verify_list)
|
||||
with mock.patch.object(
|
||||
self.app.client_manager.network,
|
||||
'_find') as nc_find:
|
||||
nc_find.side_effect = [
|
||||
{'id': fake_tap_flow['tap_service_id']}
|
||||
]
|
||||
|
||||
columns, data = self.cmd.take_action(parsed_args)
|
||||
mock_create_t_f = self.app.client_manager.network.create_tap_flow
|
||||
mock_create_t_f.assert_called_once_with(
|
||||
**{
|
||||
'name': fake_tap_flow['name'],
|
||||
'source_port': fake_tap_flow['source_port'],
|
||||
'tap_service_id': fake_tap_flow['tap_service_id'],
|
||||
'direction': fake_tap_flow['direction']
|
||||
}
|
||||
)
|
||||
self.assertEqual(self.columns, columns)
|
||||
fake_data = _get_data(
|
||||
fake_tap_flow,
|
||||
osc_tap_service._get_columns(fake_tap_flow)[1])
|
||||
self.assertItemEqual(fake_data, data)
|
||||
|
||||
|
||||
class TestListTapFlow(test_fakes.TestNeutronClientOSCV2):
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.cmd = osc_tap_flow.ListTapFlow(self.app, self.namespace)
|
||||
|
||||
def test_list_tap_flows(self):
|
||||
"""Test List Tap Flow."""
|
||||
fake_tap_flows = fakes.FakeTapFlow.create_tap_flows(
|
||||
attrs={
|
||||
'source_port': uuidutils.generate_uuid(),
|
||||
'tap_service_id': uuidutils.generate_uuid(),
|
||||
},
|
||||
count=2)
|
||||
self.app.client_manager.network = mock.Mock()
|
||||
self.app.client_manager.network.tap_flows = mock.Mock(
|
||||
return_value=fake_tap_flows)
|
||||
arg_list = []
|
||||
verify_list = []
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, arg_list, verify_list)
|
||||
|
||||
headers, data = self.cmd.take_action(parsed_args)
|
||||
|
||||
self.app.client_manager.network.tap_flows.assert_called_once()
|
||||
self.assertEqual(headers, list(headers_long))
|
||||
self.assertListItemEqual(
|
||||
list(data),
|
||||
[_get_data(fake_tap_flow, columns_long) for fake_tap_flow
|
||||
in fake_tap_flows]
|
||||
)
|
||||
|
||||
|
||||
class TestDeleteTapFlow(test_fakes.TestNeutronClientOSCV2):
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.app.client_manager.network = mock.Mock()
|
||||
self.app.client_manager.network.find_tap_flow = mock.Mock(
|
||||
side_effect=lambda name_or_id, ignore_missing:
|
||||
_tap_flow.TapFlow(id=name_or_id))
|
||||
self.cmd = osc_tap_flow.DeleteTapFlow(self.app, self.namespace)
|
||||
|
||||
def test_delete_tap_flow(self):
|
||||
"""Test Delete tap flow."""
|
||||
|
||||
fake_tap_flow = fakes.FakeTapFlow.create_tap_flow(
|
||||
attrs={
|
||||
'source_port': uuidutils.generate_uuid(),
|
||||
'tap_service_id': uuidutils.generate_uuid(),
|
||||
}
|
||||
)
|
||||
self.app.client_manager.network.delete_tap_flow = mock.Mock()
|
||||
|
||||
arg_list = [
|
||||
fake_tap_flow['id'],
|
||||
]
|
||||
verify_list = [
|
||||
(osc_tap_flow.TAP_FLOW, [fake_tap_flow['id']]),
|
||||
]
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, arg_list, verify_list)
|
||||
|
||||
result = self.cmd.take_action(parsed_args)
|
||||
|
||||
mock_delete_tap_flow = self.app.client_manager.network.delete_tap_flow
|
||||
mock_delete_tap_flow.assert_called_once_with(fake_tap_flow['id'])
|
||||
self.assertIsNone(result)
|
||||
|
||||
|
||||
class TestShowTapFlow(test_fakes.TestNeutronClientOSCV2):
|
||||
columns = (
|
||||
'direction',
|
||||
'id',
|
||||
'name',
|
||||
'source_port',
|
||||
'status',
|
||||
'tap_service_id'
|
||||
)
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.app.client_manager.network = mock.Mock()
|
||||
self.app.client_manager.network.find_tap_flow = mock.Mock(
|
||||
side_effect=lambda name_or_id, ignore_missing:
|
||||
_tap_flow.TapFlow(id=name_or_id))
|
||||
self.cmd = osc_tap_flow.ShowTapFlow(self.app, self.namespace)
|
||||
|
||||
def test_show_tap_flow(self):
|
||||
"""Test Show tap flow."""
|
||||
fake_tap_flow = fakes.FakeTapFlow.create_tap_flow(
|
||||
attrs={
|
||||
'source_port': uuidutils.generate_uuid(),
|
||||
'tap_service_id': uuidutils.generate_uuid(),
|
||||
}
|
||||
)
|
||||
self.app.client_manager.network.get_tap_flow = mock.Mock(
|
||||
return_value=fake_tap_flow)
|
||||
arg_list = [
|
||||
fake_tap_flow['id'],
|
||||
]
|
||||
verify_list = [
|
||||
(osc_tap_flow.TAP_FLOW, fake_tap_flow['id']),
|
||||
]
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, arg_list, verify_list)
|
||||
|
||||
headers, data = self.cmd.take_action(parsed_args)
|
||||
|
||||
self.app.client_manager.network.get_tap_flow.assert_called_once_with(
|
||||
fake_tap_flow['id'])
|
||||
self.assertEqual(self.columns, headers)
|
||||
fake_data = _get_data(
|
||||
fake_tap_flow,
|
||||
osc_tap_service._get_columns(fake_tap_flow)[1])
|
||||
self.assertItemEqual(fake_data, data)
|
||||
|
||||
|
||||
class TestUpdateTapFlow(test_fakes.TestNeutronClientOSCV2):
|
||||
|
||||
_new_name = 'new_name'
|
||||
|
||||
columns = (
|
||||
'Direction',
|
||||
'ID',
|
||||
'Name',
|
||||
'Status',
|
||||
'Tenant',
|
||||
'source_port',
|
||||
'tap_service_id',
|
||||
)
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.cmd = osc_tap_flow.UpdateTapFlow(self.app, self.namespace)
|
||||
self.app.client_manager.network = mock.Mock()
|
||||
self.app.client_manager.network.find_tap_flow = mock.Mock(
|
||||
side_effect=lambda name_or_id, ignore_missing:
|
||||
_tap_flow.TapFlow(id=name_or_id))
|
||||
|
||||
def test_update_tap_flow(self):
|
||||
"""Test update tap service"""
|
||||
fake_tap_flow = fakes.FakeTapFlow.create_tap_flow(
|
||||
attrs={
|
||||
'source_port': uuidutils.generate_uuid(),
|
||||
'tap_service_id': uuidutils.generate_uuid(),
|
||||
}
|
||||
)
|
||||
new_tap_flow = copy.deepcopy(fake_tap_flow)
|
||||
new_tap_flow['name'] = self._new_name
|
||||
|
||||
self.app.client_manager.network.update_tap_flow = mock.Mock(
|
||||
return_value=new_tap_flow)
|
||||
|
||||
arg_list = [
|
||||
fake_tap_flow['id'],
|
||||
'--name', self._new_name,
|
||||
]
|
||||
verify_list = [('name', self._new_name)]
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, arg_list, verify_list)
|
||||
columns, data = self.cmd.take_action(parsed_args)
|
||||
attrs = {'name': self._new_name}
|
||||
|
||||
mock_update_t_f = self.app.client_manager.network.update_tap_flow
|
||||
mock_update_t_f.assert_called_once_with(new_tap_flow['id'], **attrs)
|
||||
self.assertEqual(self.columns, columns)
|
||||
self.assertItemEqual(_get_data(new_tap_flow), data)
|
267
neutronclient/tests/unit/osc/v2/taas/test_osc_tap_mirror.py
Normal file
267
neutronclient/tests/unit/osc/v2/taas/test_osc_tap_mirror.py
Normal file
@@ -0,0 +1,267 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import copy
|
||||
import operator
|
||||
from unittest import mock
|
||||
|
||||
from neutronclient.tests.unit.osc.v2 import fakes as test_fakes
|
||||
from openstack.network.v2 import tap_mirror
|
||||
from osc_lib import utils as osc_utils
|
||||
from osc_lib.utils import columns as column_util
|
||||
from oslo_utils import uuidutils
|
||||
|
||||
from neutronclient.osc.v2.taas import tap_mirror as osc_tap_mirror
|
||||
from neutronclient.tests.unit.osc.v2.taas import fakes
|
||||
|
||||
|
||||
columns_long = tuple(col for col, _, listing_mode in osc_tap_mirror._attr_map
|
||||
if listing_mode in (column_util.LIST_BOTH,
|
||||
column_util.LIST_LONG_ONLY))
|
||||
headers_long = tuple(head for _, head, listing_mode in
|
||||
osc_tap_mirror._attr_map if listing_mode in
|
||||
(column_util.LIST_BOTH, column_util.LIST_LONG_ONLY))
|
||||
sorted_attr_map = sorted(osc_tap_mirror._attr_map, key=operator.itemgetter(1))
|
||||
sorted_columns = tuple(col for col, _, _ in sorted_attr_map)
|
||||
sorted_headers = tuple(head for _, head, _ in sorted_attr_map)
|
||||
|
||||
|
||||
def _get_data(attrs, columns=sorted_columns):
|
||||
return osc_utils.get_dict_properties(attrs, columns)
|
||||
|
||||
|
||||
class TestCreateTapMirror(test_fakes.TestNeutronClientOSCV2):
|
||||
|
||||
columns = (
|
||||
'directions',
|
||||
'id',
|
||||
'mirror_type',
|
||||
'name',
|
||||
'port_id',
|
||||
'remote_ip',
|
||||
)
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.cmd = osc_tap_mirror.CreateTapMirror(self.app, self.namespace)
|
||||
|
||||
def test_create_tap_mirror(self):
|
||||
port_id = uuidutils.generate_uuid()
|
||||
fake_tap_mirror = fakes.FakeTapMirror.create_tap_mirror(
|
||||
attrs={'port_id': port_id}
|
||||
)
|
||||
self.app.client_manager.network = mock.Mock()
|
||||
self.app.client_manager.network.create_tap_mirror = mock.Mock(
|
||||
return_value=fake_tap_mirror)
|
||||
self.app.client_manager.network.find_port = mock.Mock(
|
||||
return_value={'id': port_id})
|
||||
self.app.client_manager.network.find_tap_mirror = mock.Mock(
|
||||
side_effect=lambda _, name_or_id: {'id': name_or_id})
|
||||
arg_list = [
|
||||
'--name', fake_tap_mirror['name'],
|
||||
'--port', fake_tap_mirror['port_id'],
|
||||
'--directions', fake_tap_mirror['directions'],
|
||||
'--remote-ip', fake_tap_mirror['remote_ip'],
|
||||
'--mirror-type', fake_tap_mirror['mirror_type'],
|
||||
]
|
||||
|
||||
verify_directions = fake_tap_mirror['directions'].split('=')
|
||||
verify_directions_dict = {verify_directions[0]: verify_directions[1]}
|
||||
|
||||
verify_list = [
|
||||
('name', fake_tap_mirror['name']),
|
||||
('port_id', fake_tap_mirror['port_id']),
|
||||
('directions', verify_directions_dict),
|
||||
('remote_ip', fake_tap_mirror['remote_ip']),
|
||||
('mirror_type', fake_tap_mirror['mirror_type']),
|
||||
]
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, arg_list, verify_list)
|
||||
self.app.client_manager.network.find_tap_mirror = mock.Mock(
|
||||
return_value=fake_tap_mirror)
|
||||
|
||||
columns, data = self.cmd.take_action(parsed_args)
|
||||
create_tap_m_mock = self.app.client_manager.network.create_tap_mirror
|
||||
create_tap_m_mock.assert_called_once_with(
|
||||
**{'name': fake_tap_mirror['name'],
|
||||
'port_id': fake_tap_mirror['port_id'],
|
||||
'directions': verify_directions_dict,
|
||||
'remote_ip': fake_tap_mirror['remote_ip'],
|
||||
'mirror_type': fake_tap_mirror['mirror_type']})
|
||||
self.assertEqual(self.columns, columns)
|
||||
fake_data = _get_data(
|
||||
fake_tap_mirror,
|
||||
osc_tap_mirror._get_columns(fake_tap_mirror)[1])
|
||||
self.assertEqual(fake_data, data)
|
||||
|
||||
|
||||
class TestListTapMirror(test_fakes.TestNeutronClientOSCV2):
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.cmd = osc_tap_mirror.ListTapMirror(self.app, self.namespace)
|
||||
|
||||
def test_list_tap_mirror(self):
|
||||
"""Test List Tap Mirror."""
|
||||
fake_tap_mirrors = fakes.FakeTapMirror.create_tap_mirrors(
|
||||
attrs={'port_id': uuidutils.generate_uuid()},
|
||||
count=4)
|
||||
self.app.client_manager.network = mock.Mock()
|
||||
self.app.client_manager.network.tap_mirrors = mock.Mock(
|
||||
return_value=fake_tap_mirrors)
|
||||
|
||||
arg_list = []
|
||||
verify_list = []
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, arg_list, verify_list)
|
||||
|
||||
headers, data = self.cmd.take_action(parsed_args)
|
||||
|
||||
self.app.client_manager.network.tap_mirrors.assert_called_once()
|
||||
self.assertEqual(headers, list(headers_long))
|
||||
self.assertListItemEqual(
|
||||
list(data),
|
||||
[_get_data(fake_tap_mirror, columns_long) for fake_tap_mirror
|
||||
in fake_tap_mirrors]
|
||||
)
|
||||
|
||||
|
||||
class TestDeleteTapMirror(test_fakes.TestNeutronClientOSCV2):
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.app.client_manager.network = mock.Mock()
|
||||
self.app.client_manager.network.find_tap_mirror = mock.Mock(
|
||||
side_effect=lambda name_or_id, ignore_missing:
|
||||
tap_mirror.TapMirror(id=name_or_id))
|
||||
self.cmd = osc_tap_mirror.DeleteTapMirror(self.app, self.namespace)
|
||||
|
||||
def test_delete_tap_mirror(self):
|
||||
"""Test Delete Tap Mirror."""
|
||||
|
||||
fake_tap_mirror = fakes.FakeTapMirror.create_tap_mirror(
|
||||
attrs={'port_id': uuidutils.generate_uuid()}
|
||||
)
|
||||
self.app.client_manager.network.delete_tap_mirror = mock.Mock()
|
||||
|
||||
arg_list = [
|
||||
fake_tap_mirror['id'],
|
||||
]
|
||||
verify_list = [
|
||||
(osc_tap_mirror.TAP_MIRROR, [fake_tap_mirror['id']]),
|
||||
]
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, arg_list, verify_list)
|
||||
result = self.cmd.take_action(parsed_args)
|
||||
|
||||
mock_delete_tap_m = self.app.client_manager.network.delete_tap_mirror
|
||||
mock_delete_tap_m.assert_called_once_with(fake_tap_mirror['id'])
|
||||
self.assertIsNone(result)
|
||||
|
||||
|
||||
class TestShowTapMirror(test_fakes.TestNeutronClientOSCV2):
|
||||
|
||||
columns = (
|
||||
'directions',
|
||||
'id',
|
||||
'mirror_type',
|
||||
'name',
|
||||
'port_id',
|
||||
'remote_ip',
|
||||
)
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.app.client_manager.network = mock.Mock()
|
||||
self.app.client_manager.network.find_tap_mirror = mock.Mock(
|
||||
side_effect=lambda name_or_id, ignore_missing:
|
||||
tap_mirror.TapMirror(id=name_or_id))
|
||||
self.cmd = osc_tap_mirror.ShowTapMirror(self.app, self.namespace)
|
||||
|
||||
def test_show_tap_mirror(self):
|
||||
"""Test Show Tap Mirror."""
|
||||
|
||||
fake_tap_mirror = fakes.FakeTapMirror.create_tap_mirror(
|
||||
attrs={'port_id': uuidutils.generate_uuid()}
|
||||
)
|
||||
self.app.client_manager.network.get_tap_mirror = mock.Mock(
|
||||
return_value=fake_tap_mirror)
|
||||
arg_list = [
|
||||
fake_tap_mirror['id'],
|
||||
]
|
||||
verify_list = [
|
||||
(osc_tap_mirror.TAP_MIRROR, fake_tap_mirror['id']),
|
||||
]
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, arg_list, verify_list)
|
||||
|
||||
headers, data = self.cmd.take_action(parsed_args)
|
||||
|
||||
mock_get_tap_m = self.app.client_manager.network.get_tap_mirror
|
||||
mock_get_tap_m.assert_called_once_with(
|
||||
fake_tap_mirror['id'])
|
||||
self.assertEqual(self.columns, headers)
|
||||
fake_data = _get_data(
|
||||
fake_tap_mirror,
|
||||
osc_tap_mirror._get_columns(fake_tap_mirror)[1])
|
||||
self.assertItemEqual(fake_data, data)
|
||||
|
||||
|
||||
class TestUpdateTapMirror(test_fakes.TestNeutronClientOSCV2):
|
||||
|
||||
_new_name = 'new_name'
|
||||
columns = (
|
||||
'directions',
|
||||
'id',
|
||||
'mirror_type',
|
||||
'name',
|
||||
'port_id',
|
||||
'remote_ip',
|
||||
)
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.cmd = osc_tap_mirror.UpdateTapMirror(self.app, self.namespace)
|
||||
self.app.client_manager.network = mock.Mock()
|
||||
self.app.client_manager.network.find_tap_mirror = mock.Mock(
|
||||
side_effect=lambda name_or_id, ignore_missing:
|
||||
tap_mirror.TapMirror(id=name_or_id))
|
||||
|
||||
def test_update_tap_mirror(self):
|
||||
"""Test update Tap Mirror"""
|
||||
fake_tap_mirror = fakes.FakeTapMirror.create_tap_mirror(
|
||||
attrs={'port_id': uuidutils.generate_uuid()}
|
||||
)
|
||||
new_tap_mirror = copy.deepcopy(fake_tap_mirror)
|
||||
new_tap_mirror['name'] = self._new_name
|
||||
|
||||
self.app.client_manager.network.update_tap_mirror = mock.Mock(
|
||||
return_value=new_tap_mirror)
|
||||
|
||||
arg_list = [
|
||||
fake_tap_mirror['id'],
|
||||
'--name', self._new_name,
|
||||
]
|
||||
verify_list = [('name', self._new_name)]
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, arg_list, verify_list)
|
||||
columns, data = self.cmd.take_action(parsed_args)
|
||||
attrs = {'name': self._new_name}
|
||||
|
||||
mock_update_tap_m = self.app.client_manager.network.update_tap_mirror
|
||||
mock_update_tap_m.assert_called_once_with(
|
||||
fake_tap_mirror['id'], **attrs)
|
||||
self.assertEqual(self.columns, columns)
|
||||
fake_data = _get_data(
|
||||
new_tap_mirror,
|
||||
osc_tap_mirror._get_columns(new_tap_mirror)[1])
|
||||
self.assertItemEqual(fake_data, data)
|
250
neutronclient/tests/unit/osc/v2/taas/test_osc_tap_service.py
Normal file
250
neutronclient/tests/unit/osc/v2/taas/test_osc_tap_service.py
Normal file
@@ -0,0 +1,250 @@
|
||||
# All Rights Reserved 2020
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import copy
|
||||
import operator
|
||||
from unittest import mock
|
||||
|
||||
from neutronclient.tests.unit.osc.v2 import fakes as test_fakes
|
||||
from openstack.network.v2 import tap_service
|
||||
from osc_lib import utils as osc_utils
|
||||
from osc_lib.utils import columns as column_util
|
||||
from oslo_utils import uuidutils
|
||||
|
||||
from neutronclient.osc.v2.taas import tap_service as osc_tap_service
|
||||
from neutronclient.tests.unit.osc.v2.taas import fakes
|
||||
|
||||
|
||||
columns_long = tuple(col for col, _, listing_mode in osc_tap_service._attr_map
|
||||
if listing_mode in (column_util.LIST_BOTH,
|
||||
column_util.LIST_LONG_ONLY))
|
||||
headers_long = tuple(head for _, head, listing_mode in
|
||||
osc_tap_service._attr_map if listing_mode in
|
||||
(column_util.LIST_BOTH, column_util.LIST_LONG_ONLY))
|
||||
sorted_attr_map = sorted(osc_tap_service._attr_map, key=operator.itemgetter(1))
|
||||
sorted_columns = tuple(col for col, _, _ in sorted_attr_map)
|
||||
sorted_headers = tuple(head for _, head, _ in sorted_attr_map)
|
||||
|
||||
|
||||
def _get_data(attrs, columns=sorted_columns):
|
||||
return osc_utils.get_dict_properties(attrs, columns)
|
||||
|
||||
|
||||
class TestCreateTapService(test_fakes.TestNeutronClientOSCV2):
|
||||
|
||||
columns = (
|
||||
'id',
|
||||
'name',
|
||||
'port_id',
|
||||
'status',
|
||||
)
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.cmd = osc_tap_service.CreateTapService(self.app, self.namespace)
|
||||
|
||||
def test_create_tap_service(self):
|
||||
"""Test Create Tap Service."""
|
||||
port_id = uuidutils.generate_uuid()
|
||||
fake_tap_service = fakes.FakeTapService.create_tap_service(
|
||||
attrs={'port_id': port_id}
|
||||
)
|
||||
self.app.client_manager.network = mock.Mock()
|
||||
self.app.client_manager.network.create_tap_service = mock.Mock(
|
||||
return_value=fake_tap_service)
|
||||
self.app.client_manager.network.find_port = mock.Mock(
|
||||
return_value={'id': port_id})
|
||||
self.app.client_manager.network.find_tap_service = mock.Mock(
|
||||
side_effect=lambda _, name_or_id: {'id': name_or_id})
|
||||
arg_list = [
|
||||
'--name', fake_tap_service['name'],
|
||||
'--port', fake_tap_service['port_id'],
|
||||
]
|
||||
|
||||
verify_list = [
|
||||
('name', fake_tap_service['name']),
|
||||
('port_id', fake_tap_service['port_id']),
|
||||
]
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, arg_list, verify_list)
|
||||
self.app.client_manager.network.find_tap_service = mock.Mock(
|
||||
return_value=fake_tap_service)
|
||||
|
||||
columns, data = self.cmd.take_action(parsed_args)
|
||||
create_tap_s_mock = self.app.client_manager.network.create_tap_service
|
||||
create_tap_s_mock.assert_called_once_with(
|
||||
**{'name': fake_tap_service['name'],
|
||||
'port_id': fake_tap_service['port_id']})
|
||||
self.assertEqual(self.columns, columns)
|
||||
fake_data = _get_data(
|
||||
fake_tap_service,
|
||||
osc_tap_service._get_columns(fake_tap_service)[1])
|
||||
self.assertEqual(fake_data, data)
|
||||
|
||||
|
||||
class TestListTapService(test_fakes.TestNeutronClientOSCV2):
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.cmd = osc_tap_service.ListTapService(self.app, self.namespace)
|
||||
|
||||
def test_list_tap_service(self):
|
||||
"""Test List Tap Service."""
|
||||
fake_tap_services = fakes.FakeTapService.create_tap_services(
|
||||
attrs={'port_id': uuidutils.generate_uuid()},
|
||||
count=4)
|
||||
self.app.client_manager.network = mock.Mock()
|
||||
self.app.client_manager.network.tap_services = mock.Mock(
|
||||
return_value=fake_tap_services)
|
||||
|
||||
arg_list = []
|
||||
verify_list = []
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, arg_list, verify_list)
|
||||
|
||||
headers, data = self.cmd.take_action(parsed_args)
|
||||
|
||||
self.app.client_manager.network.tap_services.assert_called_once()
|
||||
self.assertEqual(headers, list(headers_long))
|
||||
self.assertListItemEqual(
|
||||
list(data),
|
||||
[_get_data(fake_tap_service, columns_long) for fake_tap_service
|
||||
in fake_tap_services]
|
||||
)
|
||||
|
||||
|
||||
class TestDeleteTapService(test_fakes.TestNeutronClientOSCV2):
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.app.client_manager.network = mock.Mock()
|
||||
self.app.client_manager.network.find_tap_service = mock.Mock(
|
||||
side_effect=lambda name_or_id, ignore_missing:
|
||||
tap_service.TapService(id=name_or_id))
|
||||
self.cmd = osc_tap_service.DeleteTapService(self.app, self.namespace)
|
||||
|
||||
def test_delete_tap_service(self):
|
||||
"""Test Delete tap service."""
|
||||
|
||||
fake_tap_service = fakes.FakeTapService.create_tap_service(
|
||||
attrs={'port_id': uuidutils.generate_uuid()}
|
||||
)
|
||||
self.app.client_manager.network.delete_tap_service = mock.Mock()
|
||||
|
||||
arg_list = [
|
||||
fake_tap_service['id'],
|
||||
]
|
||||
verify_list = [
|
||||
(osc_tap_service.TAP_SERVICE, [fake_tap_service['id']]),
|
||||
]
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, arg_list, verify_list)
|
||||
result = self.cmd.take_action(parsed_args)
|
||||
|
||||
mock_delete_tap_s = self.app.client_manager.network.delete_tap_service
|
||||
mock_delete_tap_s.assert_called_once_with(fake_tap_service['id'])
|
||||
self.assertIsNone(result)
|
||||
|
||||
|
||||
class TestShowTapService(test_fakes.TestNeutronClientOSCV2):
|
||||
columns = (
|
||||
'id',
|
||||
'name',
|
||||
'port_id',
|
||||
'status',
|
||||
)
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.app.client_manager.network = mock.Mock()
|
||||
self.app.client_manager.network.find_tap_service = mock.Mock(
|
||||
side_effect=lambda name_or_id, ignore_missing:
|
||||
tap_service.TapService(id=name_or_id))
|
||||
self.cmd = osc_tap_service.ShowTapService(self.app, self.namespace)
|
||||
|
||||
def test_show_tap_service(self):
|
||||
"""Test Show tap service."""
|
||||
|
||||
fake_tap_service = fakes.FakeTapService.create_tap_service(
|
||||
attrs={'port_id': uuidutils.generate_uuid()}
|
||||
)
|
||||
self.app.client_manager.network.get_tap_service = mock.Mock(
|
||||
return_value=fake_tap_service)
|
||||
arg_list = [
|
||||
fake_tap_service['id'],
|
||||
]
|
||||
verify_list = [
|
||||
(osc_tap_service.TAP_SERVICE, fake_tap_service['id']),
|
||||
]
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, arg_list, verify_list)
|
||||
|
||||
headers, data = self.cmd.take_action(parsed_args)
|
||||
|
||||
mock_get_tap_s = self.app.client_manager.network.get_tap_service
|
||||
mock_get_tap_s.assert_called_once_with(
|
||||
fake_tap_service['id'])
|
||||
self.assertEqual(self.columns, headers)
|
||||
fake_data = _get_data(
|
||||
fake_tap_service,
|
||||
osc_tap_service._get_columns(fake_tap_service)[1])
|
||||
self.assertItemEqual(fake_data, data)
|
||||
|
||||
|
||||
class TestUpdateTapService(test_fakes.TestNeutronClientOSCV2):
|
||||
|
||||
_new_name = 'new_name'
|
||||
|
||||
columns = (
|
||||
'id',
|
||||
'name',
|
||||
'port_id',
|
||||
'status',
|
||||
)
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.cmd = osc_tap_service.UpdateTapService(self.app, self.namespace)
|
||||
self.app.client_manager.network = mock.Mock()
|
||||
self.app.client_manager.network.find_tap_service = mock.Mock(
|
||||
side_effect=lambda name_or_id, ignore_missing:
|
||||
tap_service.TapService(id=name_or_id))
|
||||
|
||||
def test_update_tap_service(self):
|
||||
"""Test update tap service"""
|
||||
fake_tap_service = fakes.FakeTapService.create_tap_service(
|
||||
attrs={'port_id': uuidutils.generate_uuid()}
|
||||
)
|
||||
new_tap_service = copy.deepcopy(fake_tap_service)
|
||||
new_tap_service['name'] = self._new_name
|
||||
|
||||
self.app.client_manager.network.update_tap_service = mock.Mock(
|
||||
return_value=new_tap_service)
|
||||
|
||||
arg_list = [
|
||||
fake_tap_service['id'],
|
||||
'--name', self._new_name,
|
||||
]
|
||||
verify_list = [('name', self._new_name)]
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, arg_list, verify_list)
|
||||
columns, data = self.cmd.take_action(parsed_args)
|
||||
attrs = {'name': self._new_name}
|
||||
|
||||
mock_update_tap_s = self.app.client_manager.network.update_tap_service
|
||||
mock_update_tap_s.assert_called_once_with(
|
||||
fake_tap_service['id'], **attrs)
|
||||
self.assertEqual(self.columns, columns)
|
||||
fake_data = _get_data(
|
||||
new_tap_service,
|
||||
osc_tap_service._get_columns(new_tap_service)[1])
|
||||
self.assertItemEqual(fake_data, data)
|
16
setup.cfg
16
setup.cfg
@@ -161,3 +161,19 @@ openstack.neutronclient.v2 =
|
||||
vpn_ipsec_site_connection_show = neutronclient.osc.v2.vpnaas.ipsec_site_connection:ShowIPsecSiteConnection
|
||||
|
||||
network_onboard_subnets = neutronclient.osc.v2.subnet_onboard.subnet_onboard:NetworkOnboardSubnets
|
||||
|
||||
tap_flow_create = neutronclient.osc.v2.taas.tap_flow:CreateTapFlow
|
||||
tap_flow_delete = neutronclient.osc.v2.taas.tap_flow:DeleteTapFlow
|
||||
tap_flow_list = neutronclient.osc.v2.taas.tap_flow:ListTapFlow
|
||||
tap_flow_show = neutronclient.osc.v2.taas.tap_flow:ShowTapFlow
|
||||
tap_flow_update = neutronclient.osc.v2.taas.tap_flow:UpdateTapFlow
|
||||
tap_mirror_create = neutronclient.osc.v2.taas.tap_mirror:CreateTapMirror
|
||||
tap_mirror_delete = neutronclient.osc.v2.taas.tap_mirror:DeleteTapMirror
|
||||
tap_mirror_list = neutronclient.osc.v2.taas.tap_mirror:ListTapMirror
|
||||
tap_mirror_show = neutronclient.osc.v2.taas.tap_mirror:ShowTapMirror
|
||||
tap_mirror_update = neutronclient.osc.v2.taas.tap_mirror:UpdateTapMirror
|
||||
tap_service_create = neutronclient.osc.v2.taas.tap_service:CreateTapService
|
||||
tap_service_delete = neutronclient.osc.v2.taas.tap_service:DeleteTapService
|
||||
tap_service_list = neutronclient.osc.v2.taas.tap_service:ListTapService
|
||||
tap_service_show = neutronclient.osc.v2.taas.tap_service:ShowTapService
|
||||
tap_service_update = neutronclient.osc.v2.taas.tap_service:UpdateTapService
|
||||
|
Reference in New Issue
Block a user