Merge "Moving tapas osc client code from neutronclient"
This commit is contained in:
0
openstackclient/network/v2/taas/__init__.py
Normal file
0
openstackclient/network/v2/taas/__init__.py
Normal file
245
openstackclient/network/v2/taas/tap_flow.py
Normal file
245
openstackclient/network/v2/taas/tap_flow.py
Normal file
@@ -0,0 +1,245 @@
|
||||
# All Rights Reserved 2020
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
|
||||
from osc_lib.cli import format_columns
|
||||
from osc_lib.cli import identity as identity_utils
|
||||
from osc_lib.command import command
|
||||
from osc_lib import exceptions
|
||||
from osc_lib import utils as osc_utils
|
||||
from osc_lib.utils import columns as column_util
|
||||
|
||||
from openstackclient.i18n import _
|
||||
from openstackclient.identity import common
|
||||
from openstackclient.network.v2.taas import tap_service
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
TAP_FLOW = 'tap_flow'
|
||||
TAP_FLOWS = f'{TAP_FLOW}s'
|
||||
|
||||
_attr_map = (
|
||||
('id', 'ID', column_util.LIST_BOTH),
|
||||
('tenant_id', 'Tenant', column_util.LIST_LONG_ONLY),
|
||||
('name', 'Name', column_util.LIST_BOTH),
|
||||
('status', 'Status', column_util.LIST_BOTH),
|
||||
('source_port', 'source_port', column_util.LIST_BOTH),
|
||||
('tap_service_id', 'tap_service_id', column_util.LIST_BOTH),
|
||||
('direction', 'Direction', column_util.LIST_BOTH),
|
||||
)
|
||||
|
||||
_formatters = {
|
||||
'vlan_filter': format_columns.ListColumn,
|
||||
}
|
||||
|
||||
|
||||
def _add_updatable_args(parser):
|
||||
parser.add_argument('--name', help=_('Name of this Tap service.'))
|
||||
parser.add_argument(
|
||||
'--description', help=_('Description for this Tap service.')
|
||||
)
|
||||
|
||||
|
||||
class CreateTapFlow(command.ShowOne):
|
||||
_description = _("Create a tap flow")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super().get_parser(prog_name)
|
||||
identity_utils.add_project_owner_option_to_parser(parser)
|
||||
_add_updatable_args(parser)
|
||||
parser.add_argument(
|
||||
'--port',
|
||||
required=True,
|
||||
metavar="SOURCE_PORT",
|
||||
help=_('Source port to which the Tap Flow is connected.'),
|
||||
)
|
||||
parser.add_argument(
|
||||
'--tap-service',
|
||||
required=True,
|
||||
metavar="TAP_SERVICE",
|
||||
help=_('Tap Service to which the Tap Flow belongs.'),
|
||||
)
|
||||
parser.add_argument(
|
||||
'--direction',
|
||||
required=True,
|
||||
metavar="DIRECTION",
|
||||
choices=['IN', 'OUT', 'BOTH'],
|
||||
type=lambda s: s.upper(),
|
||||
help=_(
|
||||
'Direction of the Tap flow. Possible options are: '
|
||||
'IN, OUT, BOTH'
|
||||
),
|
||||
)
|
||||
parser.add_argument(
|
||||
'--vlan-filter',
|
||||
required=False,
|
||||
metavar="VLAN_FILTER",
|
||||
help=_('VLAN Ids to be mirrored in the form of range string.'),
|
||||
)
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.network
|
||||
attrs = {}
|
||||
if parsed_args.name is not None:
|
||||
attrs['name'] = parsed_args.name
|
||||
if parsed_args.description is not None:
|
||||
attrs['description'] = parsed_args.description
|
||||
if parsed_args.port is not None:
|
||||
source_port = client.find_port(
|
||||
parsed_args.port, ignore_missing=False
|
||||
)['id']
|
||||
attrs['source_port'] = source_port
|
||||
if parsed_args.tap_service is not None:
|
||||
tap_service_id = client.find_tap_service(
|
||||
parsed_args.tap_service, ignore_missing=False
|
||||
)['id']
|
||||
attrs['tap_service_id'] = tap_service_id
|
||||
if parsed_args.direction is not None:
|
||||
attrs['direction'] = parsed_args.direction
|
||||
if parsed_args.vlan_filter is not None:
|
||||
attrs['vlan_filter'] = parsed_args.vlan_filter
|
||||
if 'project' in parsed_args and parsed_args.project is not None:
|
||||
attrs['project_id'] = common.find_project(
|
||||
self.app.client_manager.identity,
|
||||
parsed_args.project,
|
||||
parsed_args.project_domain,
|
||||
).id
|
||||
obj = client.create_tap_flow(**attrs)
|
||||
display_columns, columns = tap_service._get_columns(obj)
|
||||
data = osc_utils.get_dict_properties(obj, columns)
|
||||
return display_columns, data
|
||||
|
||||
|
||||
class ListTapFlow(command.Lister):
|
||||
_description = _("List tap flows that belong to a given tenant")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super().get_parser(prog_name)
|
||||
identity_utils.add_project_owner_option_to_parser(parser)
|
||||
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.network
|
||||
params = {}
|
||||
if parsed_args.project is not None:
|
||||
params['project_id'] = common.find_project(
|
||||
self.app.client_manager.identity,
|
||||
parsed_args.project,
|
||||
parsed_args.project_domain,
|
||||
).id
|
||||
objs = client.tap_flows(retrieve_all=True, params=params)
|
||||
headers, columns = column_util.get_column_definitions(
|
||||
_attr_map, long_listing=True
|
||||
)
|
||||
return (
|
||||
headers,
|
||||
(
|
||||
osc_utils.get_dict_properties(
|
||||
s, columns, formatters=_formatters
|
||||
)
|
||||
for s in objs
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
class ShowTapFlow(command.ShowOne):
|
||||
_description = _("Show information of a given tap flow")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super().get_parser(prog_name)
|
||||
parser.add_argument(
|
||||
TAP_FLOW,
|
||||
metavar=f"<{TAP_FLOW}>",
|
||||
help=_("ID or name of tap flow to look up."),
|
||||
)
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.network
|
||||
id = client.find_tap_flow(
|
||||
parsed_args.tap_flow, ignore_missing=False
|
||||
).id
|
||||
obj = client.get_tap_flow(id)
|
||||
display_columns, columns = tap_service._get_columns(obj)
|
||||
data = osc_utils.get_dict_properties(obj, columns)
|
||||
return display_columns, data
|
||||
|
||||
|
||||
class DeleteTapFlow(command.Command):
|
||||
_description = _("Delete a tap flow")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super().get_parser(prog_name)
|
||||
parser.add_argument(
|
||||
TAP_FLOW,
|
||||
metavar=f"<{TAP_FLOW}>",
|
||||
nargs="+",
|
||||
help=_("ID(s) or name(s) of tap flow to delete."),
|
||||
)
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.network
|
||||
fails = 0
|
||||
for id_or_name in parsed_args.tap_flow:
|
||||
try:
|
||||
id = client.find_tap_flow(id_or_name, ignore_missing=False).id
|
||||
client.delete_tap_flow(id)
|
||||
LOG.warning("Tap flow %(id)s deleted", {'id': id})
|
||||
except Exception as e:
|
||||
fails += 1
|
||||
LOG.error(
|
||||
"Failed to delete tap flow with name or ID "
|
||||
"'%(id_or_name)s': %(e)s",
|
||||
{'id_or_name': id_or_name, 'e': e},
|
||||
)
|
||||
if fails > 0:
|
||||
msg = _("Failed to delete %(fails)s of %(total)s tap flow.") % {
|
||||
'fails': fails,
|
||||
'total': len(parsed_args.tap_flow),
|
||||
}
|
||||
raise exceptions.CommandError(msg)
|
||||
|
||||
|
||||
class UpdateTapFlow(command.ShowOne):
|
||||
_description = _("Update a tap flow.")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super().get_parser(prog_name)
|
||||
parser.add_argument(
|
||||
TAP_FLOW,
|
||||
metavar=f"<{TAP_FLOW}>",
|
||||
help=_("ID or name of tap flow to update."),
|
||||
)
|
||||
_add_updatable_args(parser)
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.network
|
||||
original_t_f = client.find_tap_flow(
|
||||
parsed_args.tap_flow, ignore_missing=False
|
||||
).id
|
||||
attrs = {}
|
||||
if parsed_args.name is not None:
|
||||
attrs['name'] = parsed_args.name
|
||||
if parsed_args.description is not None:
|
||||
attrs['description'] = parsed_args.description
|
||||
obj = client.update_tap_flow(original_t_f, **attrs)
|
||||
columns, display_columns = column_util.get_columns(obj, _attr_map)
|
||||
data = osc_utils.get_dict_properties(obj, columns)
|
||||
return display_columns, data
|
||||
238
openstackclient/network/v2/taas/tap_mirror.py
Normal file
238
openstackclient/network/v2/taas/tap_mirror.py
Normal file
@@ -0,0 +1,238 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
|
||||
from osc_lib.cli import identity as identity_utils
|
||||
from osc_lib.command import command
|
||||
from osc_lib import exceptions
|
||||
from osc_lib import utils as osc_utils
|
||||
from osc_lib.utils import columns as column_util
|
||||
|
||||
from openstackclient.i18n import _
|
||||
from openstackclient.identity import common
|
||||
from openstackclient.network.v2 import port as osc_port
|
||||
from openstackclient.network.v2.taas import tap_service
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
TAP_MIRROR = 'tap_mirror'
|
||||
TAP_MIRRORS = f'{TAP_MIRROR}s'
|
||||
|
||||
_attr_map = (
|
||||
('id', 'ID', column_util.LIST_BOTH),
|
||||
('tenant_id', 'Tenant', column_util.LIST_LONG_ONLY),
|
||||
('name', 'Name', column_util.LIST_BOTH),
|
||||
('port_id', 'Port', column_util.LIST_BOTH),
|
||||
('directions', 'Directions', column_util.LIST_LONG_ONLY),
|
||||
('remote_ip', 'Remote IP', column_util.LIST_BOTH),
|
||||
('mirror_type', 'Mirror Type', column_util.LIST_LONG_ONLY),
|
||||
)
|
||||
|
||||
|
||||
def _get_columns(item):
|
||||
column_map: dict[str, str] = {}
|
||||
hidden_columns = ['location', 'tenant_id']
|
||||
return osc_utils.get_osc_show_columns_for_sdk_resource(
|
||||
item, column_map, hidden_columns
|
||||
)
|
||||
|
||||
|
||||
class CreateTapMirror(command.ShowOne):
|
||||
_description = _("Create a Tap Mirror")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super().get_parser(prog_name)
|
||||
identity_utils.add_project_owner_option_to_parser(parser)
|
||||
tap_service._add_updatable_args(parser)
|
||||
parser.add_argument(
|
||||
'--port',
|
||||
dest='port_id',
|
||||
required=True,
|
||||
metavar="PORT",
|
||||
help=_('Port to which the Tap Mirror is connected.'),
|
||||
)
|
||||
parser.add_argument(
|
||||
'--directions',
|
||||
dest='directions',
|
||||
action=osc_port.JSONKeyValueAction,
|
||||
required=True,
|
||||
help=_(
|
||||
'A dictionary of direction and tunnel_id. Direction can '
|
||||
'be IN and OUT.'
|
||||
),
|
||||
)
|
||||
parser.add_argument(
|
||||
'--remote-ip',
|
||||
dest='remote_ip',
|
||||
required=True,
|
||||
help=_(
|
||||
'The remote IP of the Tap Mirror, this will be the '
|
||||
'remote end of the GRE or ERSPAN v1 tunnel'
|
||||
),
|
||||
)
|
||||
parser.add_argument(
|
||||
'--mirror-type',
|
||||
dest='mirror_type',
|
||||
required=True,
|
||||
help=_('The type of the mirroring, it can be gre or erspanv1'),
|
||||
)
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.network
|
||||
attrs = {}
|
||||
if parsed_args.name is not None:
|
||||
attrs['name'] = parsed_args.name
|
||||
if parsed_args.description is not None:
|
||||
attrs['description'] = parsed_args.description
|
||||
if parsed_args.port_id is not None:
|
||||
port_id = client.find_port(
|
||||
parsed_args.port_id, ignore_missing=False
|
||||
)['id']
|
||||
attrs['port_id'] = port_id
|
||||
if parsed_args.directions is not None:
|
||||
attrs['directions'] = parsed_args.directions
|
||||
if parsed_args.remote_ip is not None:
|
||||
attrs['remote_ip'] = parsed_args.remote_ip
|
||||
if parsed_args.mirror_type is not None:
|
||||
attrs['mirror_type'] = parsed_args.mirror_type
|
||||
if 'project' in parsed_args and parsed_args.project is not None:
|
||||
attrs['project_id'] = common.find_project(
|
||||
self.app.client_manager.identity,
|
||||
parsed_args.project,
|
||||
parsed_args.project_domain,
|
||||
).id
|
||||
obj = client.create_tap_mirror(**attrs)
|
||||
display_columns, columns = tap_service._get_columns(obj)
|
||||
data = osc_utils.get_dict_properties(obj, columns)
|
||||
return display_columns, data
|
||||
|
||||
|
||||
class ListTapMirror(command.Lister):
|
||||
_description = _("List Tap Mirrors that belong to a given tenant")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super().get_parser(prog_name)
|
||||
identity_utils.add_project_owner_option_to_parser(parser)
|
||||
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.network
|
||||
params = {}
|
||||
if parsed_args.project is not None:
|
||||
params['project_id'] = common.find_project(
|
||||
self.app.client_manager.identity,
|
||||
parsed_args.project,
|
||||
parsed_args.project_domain,
|
||||
).id
|
||||
objs = client.tap_mirrors(retrieve_all=True, params=params)
|
||||
headers, columns = column_util.get_column_definitions(
|
||||
_attr_map, long_listing=True
|
||||
)
|
||||
return (
|
||||
headers,
|
||||
(osc_utils.get_dict_properties(s, columns) for s in objs),
|
||||
)
|
||||
|
||||
|
||||
class ShowTapMirror(command.ShowOne):
|
||||
_description = _("Show information of a given Tap Mirror")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super().get_parser(prog_name)
|
||||
parser.add_argument(
|
||||
TAP_MIRROR,
|
||||
metavar=f"<{TAP_MIRROR}>",
|
||||
help=_("ID or name of Tap Mirror to look up."),
|
||||
)
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.network
|
||||
id = client.find_tap_mirror(
|
||||
parsed_args.tap_mirror, ignore_missing=False
|
||||
).id
|
||||
obj = client.get_tap_mirror(id)
|
||||
display_columns, columns = tap_service._get_columns(obj)
|
||||
data = osc_utils.get_dict_properties(obj, columns)
|
||||
return display_columns, data
|
||||
|
||||
|
||||
class DeleteTapMirror(command.Command):
|
||||
_description = _("Delete a Tap Mirror")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super().get_parser(prog_name)
|
||||
parser.add_argument(
|
||||
TAP_MIRROR,
|
||||
metavar=f"<{TAP_MIRROR}>",
|
||||
nargs="+",
|
||||
help=_("ID(s) or name(s) of the Tap Mirror to delete."),
|
||||
)
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.network
|
||||
fails = 0
|
||||
for id_or_name in parsed_args.tap_mirror:
|
||||
try:
|
||||
id = client.find_tap_mirror(
|
||||
id_or_name, ignore_missing=False
|
||||
).id
|
||||
|
||||
client.delete_tap_mirror(id)
|
||||
LOG.warning("Tap Mirror %(id)s deleted", {'id': id})
|
||||
except Exception as e:
|
||||
fails += 1
|
||||
LOG.error(
|
||||
"Failed to delete Tap Mirror with name or ID "
|
||||
"'%(id_or_name)s': %(e)s",
|
||||
{'id_or_name': id_or_name, 'e': e},
|
||||
)
|
||||
if fails > 0:
|
||||
msg = _("Failed to delete %(fails)s of %(total)s Tap Mirror.") % {
|
||||
'fails': fails,
|
||||
'total': len(parsed_args.tap_mirror),
|
||||
}
|
||||
raise exceptions.CommandError(msg)
|
||||
|
||||
|
||||
class UpdateTapMirror(command.ShowOne):
|
||||
_description = _("Update a Tap Mirror.")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super().get_parser(prog_name)
|
||||
parser.add_argument(
|
||||
TAP_MIRROR,
|
||||
metavar=f"<{TAP_MIRROR}>",
|
||||
help=_("ID or name of the Tap Mirror to update."),
|
||||
)
|
||||
tap_service._add_updatable_args(parser)
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.network
|
||||
original_t_s = client.find_tap_mirror(
|
||||
parsed_args.tap_mirror, ignore_missing=False
|
||||
).id
|
||||
attrs = {}
|
||||
if parsed_args.name is not None:
|
||||
attrs['name'] = parsed_args.name
|
||||
if parsed_args.description is not None:
|
||||
attrs['description'] = parsed_args.description
|
||||
obj = client.update_tap_mirror(original_t_s, **attrs)
|
||||
display_columns, columns = tap_service._get_columns(obj)
|
||||
data = osc_utils.get_dict_properties(obj, columns)
|
||||
return display_columns, data
|
||||
212
openstackclient/network/v2/taas/tap_service.py
Normal file
212
openstackclient/network/v2/taas/tap_service.py
Normal file
@@ -0,0 +1,212 @@
|
||||
# All Rights Reserved 2020
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
|
||||
from osc_lib.cli import identity as identity_utils
|
||||
from osc_lib.command import command
|
||||
from osc_lib import exceptions
|
||||
from osc_lib import utils as osc_utils
|
||||
from osc_lib.utils import columns as column_util
|
||||
|
||||
from openstackclient.i18n import _
|
||||
from openstackclient.identity import common
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
TAP_SERVICE = 'tap_service'
|
||||
TAP_SERVICES = f'{TAP_SERVICE}s'
|
||||
|
||||
_attr_map = (
|
||||
('id', 'ID', column_util.LIST_BOTH),
|
||||
('tenant_id', 'Tenant', column_util.LIST_LONG_ONLY),
|
||||
('name', 'Name', column_util.LIST_BOTH),
|
||||
('port_id', 'Port', column_util.LIST_BOTH),
|
||||
('status', 'Status', column_util.LIST_BOTH),
|
||||
)
|
||||
|
||||
|
||||
def _add_updatable_args(parser):
|
||||
parser.add_argument('--name', help=_('Name of this Tap service.'))
|
||||
parser.add_argument(
|
||||
'--description', help=_('Description for this Tap service.')
|
||||
)
|
||||
|
||||
|
||||
def _get_columns(item):
|
||||
column_map: dict[str, str] = {}
|
||||
hidden_columns = ['location', 'tenant_id']
|
||||
return osc_utils.get_osc_show_columns_for_sdk_resource(
|
||||
item, column_map, hidden_columns
|
||||
)
|
||||
|
||||
|
||||
class CreateTapService(command.ShowOne):
|
||||
_description = _("Create a tap service")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super().get_parser(prog_name)
|
||||
identity_utils.add_project_owner_option_to_parser(parser)
|
||||
_add_updatable_args(parser)
|
||||
parser.add_argument(
|
||||
'--port',
|
||||
dest='port_id',
|
||||
required=True,
|
||||
metavar="PORT",
|
||||
help=_('Port to which the Tap service is connected.'),
|
||||
)
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.network
|
||||
attrs = {}
|
||||
if parsed_args.name is not None:
|
||||
attrs['name'] = parsed_args.name
|
||||
if parsed_args.description is not None:
|
||||
attrs['description'] = parsed_args.description
|
||||
if parsed_args.port_id is not None:
|
||||
port_id = client.find_port(
|
||||
parsed_args.port_id, ignore_missing=False
|
||||
)['id']
|
||||
attrs['port_id'] = port_id
|
||||
if 'project' in parsed_args and parsed_args.project is not None:
|
||||
attrs['project_id'] = common.find_project(
|
||||
self.app.client_manager.identity,
|
||||
parsed_args.project,
|
||||
parsed_args.project_domain,
|
||||
).id
|
||||
obj = client.create_tap_service(**attrs)
|
||||
display_columns, columns = _get_columns(obj)
|
||||
data = osc_utils.get_dict_properties(obj, columns)
|
||||
return display_columns, data
|
||||
|
||||
|
||||
class ListTapService(command.Lister):
|
||||
_description = _("List tap services that belong to a given project")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super().get_parser(prog_name)
|
||||
identity_utils.add_project_owner_option_to_parser(parser)
|
||||
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.network
|
||||
params = {}
|
||||
if parsed_args.project is not None:
|
||||
params['project_id'] = common.find_project(
|
||||
self.app.client_manager.identity,
|
||||
parsed_args.project,
|
||||
parsed_args.project_domain,
|
||||
).id
|
||||
objs = client.tap_services(retrieve_all=True, params=params)
|
||||
headers, columns = column_util.get_column_definitions(
|
||||
_attr_map, long_listing=True
|
||||
)
|
||||
return (
|
||||
headers,
|
||||
(osc_utils.get_dict_properties(s, columns) for s in objs),
|
||||
)
|
||||
|
||||
|
||||
class ShowTapService(command.ShowOne):
|
||||
_description = _("Show information of a given tap service")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super().get_parser(prog_name)
|
||||
parser.add_argument(
|
||||
TAP_SERVICE,
|
||||
metavar=f"<{TAP_SERVICE}>",
|
||||
help=_("ID or name of tap service to look up."),
|
||||
)
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.network
|
||||
id = client.find_tap_service(
|
||||
parsed_args.tap_service, ignore_missing=False
|
||||
).id
|
||||
obj = client.get_tap_service(id)
|
||||
display_columns, columns = _get_columns(obj)
|
||||
data = osc_utils.get_dict_properties(obj, columns)
|
||||
return display_columns, data
|
||||
|
||||
|
||||
class DeleteTapService(command.Command):
|
||||
_description = _("Delete a tap service")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super().get_parser(prog_name)
|
||||
parser.add_argument(
|
||||
TAP_SERVICE,
|
||||
metavar=f"<{TAP_SERVICE}>",
|
||||
nargs="+",
|
||||
help=_("ID(s) or name(s) of tap service to delete."),
|
||||
)
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.network
|
||||
fails = 0
|
||||
for id_or_name in parsed_args.tap_service:
|
||||
try:
|
||||
id = client.find_tap_service(
|
||||
id_or_name, ignore_missing=False
|
||||
).id
|
||||
|
||||
client.delete_tap_service(id)
|
||||
LOG.warning("Tap service %(id)s deleted", {'id': id})
|
||||
except Exception as e:
|
||||
fails += 1
|
||||
LOG.error(
|
||||
"Failed to delete tap service with name or ID "
|
||||
"'%(id_or_name)s': %(e)s",
|
||||
{'id_or_name': id_or_name, 'e': e},
|
||||
)
|
||||
if fails > 0:
|
||||
msg = _("Failed to delete %(fails)s of %(total)s tap service.") % {
|
||||
'fails': fails,
|
||||
'total': len(parsed_args.tap_service),
|
||||
}
|
||||
raise exceptions.CommandError(msg)
|
||||
|
||||
|
||||
class UpdateTapService(command.ShowOne):
|
||||
_description = _("Update a tap service.")
|
||||
|
||||
def get_parser(self, prog_name):
|
||||
parser = super().get_parser(prog_name)
|
||||
parser.add_argument(
|
||||
TAP_SERVICE,
|
||||
metavar=f"<{TAP_SERVICE}>",
|
||||
help=_("ID or name of tap service to update."),
|
||||
)
|
||||
_add_updatable_args(parser)
|
||||
return parser
|
||||
|
||||
def take_action(self, parsed_args):
|
||||
client = self.app.client_manager.network
|
||||
original_t_s = client.find_tap_service(
|
||||
parsed_args.tap_service, ignore_missing=False
|
||||
).id
|
||||
attrs = {}
|
||||
if parsed_args.name is not None:
|
||||
attrs['name'] = parsed_args.name
|
||||
if parsed_args.description is not None:
|
||||
attrs['description'] = parsed_args.description
|
||||
obj = client.update_tap_service(original_t_s, **attrs)
|
||||
display_columns, columns = _get_columns(obj)
|
||||
data = osc_utils.get_dict_properties(obj, columns)
|
||||
return display_columns, data
|
||||
118
openstackclient/tests/unit/network/v2/taas/fakes.py
Normal file
118
openstackclient/tests/unit/network/v2/taas/fakes.py
Normal file
@@ -0,0 +1,118 @@
|
||||
# All Rights Reserved 2020
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import copy
|
||||
|
||||
from oslo_utils import uuidutils
|
||||
|
||||
|
||||
class FakeTapService:
|
||||
@staticmethod
|
||||
def create_tap_service(attrs=None):
|
||||
"""Create a fake tap service."""
|
||||
attrs = attrs or {}
|
||||
tap_service_attrs = {
|
||||
'id': uuidutils.generate_uuid(),
|
||||
'tenant_id': uuidutils.generate_uuid(),
|
||||
'name': 'test_tap_service' + uuidutils.generate_uuid(),
|
||||
'status': 'ACTIVE',
|
||||
}
|
||||
tap_service_attrs.update(attrs)
|
||||
return copy.deepcopy(tap_service_attrs)
|
||||
|
||||
@staticmethod
|
||||
def create_tap_services(attrs=None, count=1):
|
||||
"""Create multiple fake tap services."""
|
||||
|
||||
tap_services = []
|
||||
for i in range(0, count):
|
||||
if attrs is None:
|
||||
attrs = {'id': f'fake_id{i}'}
|
||||
elif getattr(attrs, 'id', None) is None:
|
||||
attrs['id'] = f'fake_id{i}'
|
||||
tap_services.append(FakeTapService.create_tap_service(attrs=attrs))
|
||||
|
||||
return tap_services
|
||||
|
||||
|
||||
class FakeTapFlow:
|
||||
@staticmethod
|
||||
def create_tap_flow(attrs=None):
|
||||
"""Create a fake tap service."""
|
||||
attrs = attrs or {}
|
||||
tap_flow_attrs = {
|
||||
'id': uuidutils.generate_uuid(),
|
||||
'tenant_id': uuidutils.generate_uuid(),
|
||||
'name': 'test_tap_flow' + uuidutils.generate_uuid(),
|
||||
'status': 'ACTIVE',
|
||||
'direction': 'BOTH',
|
||||
}
|
||||
tap_flow_attrs.update(attrs)
|
||||
return copy.deepcopy(tap_flow_attrs)
|
||||
|
||||
@staticmethod
|
||||
def create_tap_flows(attrs=None, count=1):
|
||||
"""Create multiple fake tap flows."""
|
||||
|
||||
tap_flows = []
|
||||
for i in range(0, count):
|
||||
if attrs is None:
|
||||
attrs = {
|
||||
'id': f'fake_id{i}',
|
||||
'source_port': uuidutils.generate_uuid(),
|
||||
'tap_service_id': uuidutils.generate_uuid(),
|
||||
}
|
||||
elif getattr(attrs, 'id', None) is None:
|
||||
attrs['id'] = f'fake_id{i}'
|
||||
tap_flows.append(FakeTapFlow.create_tap_flow(attrs=attrs))
|
||||
|
||||
return tap_flows
|
||||
|
||||
|
||||
class FakeTapMirror:
|
||||
@staticmethod
|
||||
def create_tap_mirror(attrs=None):
|
||||
"""Create a fake tap mirror."""
|
||||
attrs = attrs or {}
|
||||
tap_mirror_attrs = {
|
||||
'id': uuidutils.generate_uuid(),
|
||||
'tenant_id': uuidutils.generate_uuid(),
|
||||
'name': 'test_tap_mirror' + uuidutils.generate_uuid(),
|
||||
'port_id': uuidutils.generate_uuid(),
|
||||
'directions': 'IN=99',
|
||||
'remote_ip': '192.10.10.2',
|
||||
'mirror_type': 'gre',
|
||||
}
|
||||
tap_mirror_attrs.update(attrs)
|
||||
return copy.deepcopy(tap_mirror_attrs)
|
||||
|
||||
@staticmethod
|
||||
def create_tap_mirrors(attrs=None, count=1):
|
||||
"""Create multiple fake tap mirrors."""
|
||||
|
||||
tap_mirrors = []
|
||||
for i in range(0, count):
|
||||
if attrs is None:
|
||||
attrs = {
|
||||
'id': f'fake_id{i}',
|
||||
'port_id': uuidutils.generate_uuid(),
|
||||
'name': f'test_tap_mirror_{i}',
|
||||
'directions': f'IN={99 + i}',
|
||||
'remote_ip': f'192.10.10.{i + 3}',
|
||||
}
|
||||
elif getattr(attrs, 'id', None) is None:
|
||||
attrs['id'] = f'fake_id{i}'
|
||||
tap_mirrors.append(FakeTapMirror.create_tap_mirror(attrs=attrs))
|
||||
|
||||
return tap_mirrors
|
||||
283
openstackclient/tests/unit/network/v2/taas/test_osc_tap_flow.py
Normal file
283
openstackclient/tests/unit/network/v2/taas/test_osc_tap_flow.py
Normal file
@@ -0,0 +1,283 @@
|
||||
# All Rights Reserved 2020
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import copy
|
||||
import operator
|
||||
import uuid
|
||||
|
||||
from openstack.network.v2 import tap_flow as _tap_flow
|
||||
from osc_lib import utils as osc_utils
|
||||
from osc_lib.utils import columns as column_util
|
||||
|
||||
from openstackclient.network.v2.taas import tap_flow as osc_tap_flow
|
||||
from openstackclient.network.v2.taas import tap_service as osc_tap_service
|
||||
from openstackclient.tests.unit.network.v2 import fakes as network_fakes
|
||||
from openstackclient.tests.unit.network.v2.taas import fakes
|
||||
|
||||
|
||||
columns_long = tuple(
|
||||
col
|
||||
for col, _, listing_mode in osc_tap_flow._attr_map
|
||||
if listing_mode in (column_util.LIST_BOTH, column_util.LIST_LONG_ONLY)
|
||||
)
|
||||
headers_long = tuple(
|
||||
head
|
||||
for _, head, listing_mode in osc_tap_flow._attr_map
|
||||
if listing_mode in (column_util.LIST_BOTH, column_util.LIST_LONG_ONLY)
|
||||
)
|
||||
sorted_attr_map = sorted(osc_tap_flow._attr_map, key=operator.itemgetter(1))
|
||||
sorted_columns = tuple(col for col, _, _ in sorted_attr_map)
|
||||
sorted_headers = tuple(head for _, head, _ in sorted_attr_map)
|
||||
|
||||
|
||||
def _get_data(attrs, columns=sorted_columns):
|
||||
return osc_utils.get_dict_properties(attrs, columns)
|
||||
|
||||
|
||||
class TestCreateTapFlow(network_fakes.TestNetworkV2):
|
||||
columns = (
|
||||
'direction',
|
||||
'id',
|
||||
'name',
|
||||
'source_port',
|
||||
'status',
|
||||
'tap_service_id',
|
||||
)
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.cmd = osc_tap_flow.CreateTapFlow(self.app, None)
|
||||
|
||||
def test_create_tap_flow(self):
|
||||
"""Test Create Tap Flow."""
|
||||
fake_tap_service = fakes.FakeTapService.create_tap_service(
|
||||
attrs={'port_id': str(uuid.uuid4())}
|
||||
)
|
||||
port_id = str(uuid.uuid4())
|
||||
fake_tap_flow = fakes.FakeTapFlow.create_tap_flow(
|
||||
attrs={
|
||||
'source_port': port_id,
|
||||
'tap_service_id': fake_tap_service['id'],
|
||||
}
|
||||
)
|
||||
self.app.client_manager.network.create_tap_flow.return_value = (
|
||||
fake_tap_flow
|
||||
)
|
||||
self.app.client_manager.network.find_port.return_value = {
|
||||
'id': port_id
|
||||
}
|
||||
self.app.client_manager.network.find_tap_service.return_value = (
|
||||
fake_tap_service
|
||||
)
|
||||
arg_list = [
|
||||
'--name',
|
||||
fake_tap_flow['name'],
|
||||
'--port',
|
||||
fake_tap_flow['source_port'],
|
||||
'--tap-service',
|
||||
fake_tap_flow['tap_service_id'],
|
||||
'--direction',
|
||||
fake_tap_flow['direction'],
|
||||
]
|
||||
|
||||
verify_list = [
|
||||
('name', fake_tap_flow['name']),
|
||||
('port', fake_tap_flow['source_port']),
|
||||
('tap_service', fake_tap_flow['tap_service_id']),
|
||||
]
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, arg_list, verify_list)
|
||||
columns, data = self.cmd.take_action(parsed_args)
|
||||
mock_create_t_f = self.app.client_manager.network.create_tap_flow
|
||||
mock_create_t_f.assert_called_once_with(
|
||||
**{
|
||||
'name': fake_tap_flow['name'],
|
||||
'source_port': fake_tap_flow['source_port'],
|
||||
'tap_service_id': fake_tap_flow['tap_service_id'],
|
||||
'direction': fake_tap_flow['direction'],
|
||||
}
|
||||
)
|
||||
self.assertEqual(self.columns, columns)
|
||||
fake_data = _get_data(
|
||||
fake_tap_flow, osc_tap_service._get_columns(fake_tap_flow)[1]
|
||||
)
|
||||
self.assertEqual(fake_data, data)
|
||||
|
||||
|
||||
class TestListTapFlow(network_fakes.TestNetworkV2):
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.cmd = osc_tap_flow.ListTapFlow(self.app, None)
|
||||
|
||||
def test_list_tap_flows(self):
|
||||
"""Test List Tap Flow."""
|
||||
fake_tap_flows = fakes.FakeTapFlow.create_tap_flows(
|
||||
attrs={
|
||||
'source_port': str(uuid.uuid4()),
|
||||
'tap_service_id': str(uuid.uuid4()),
|
||||
},
|
||||
count=2,
|
||||
)
|
||||
self.app.client_manager.network.tap_flows.return_value = fake_tap_flows
|
||||
arg_list = []
|
||||
verify_list = []
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, arg_list, verify_list)
|
||||
|
||||
headers, data = self.cmd.take_action(parsed_args)
|
||||
|
||||
self.app.client_manager.network.tap_flows.assert_called_once()
|
||||
self.assertEqual(headers, list(headers_long))
|
||||
self.assertCountEqual(
|
||||
list(data),
|
||||
[
|
||||
_get_data(fake_tap_flow, columns_long)
|
||||
for fake_tap_flow in fake_tap_flows
|
||||
],
|
||||
)
|
||||
|
||||
|
||||
class TestDeleteTapFlow(network_fakes.TestNetworkV2):
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.app.client_manager.network.find_tap_flow.side_effect = (
|
||||
lambda name_or_id, ignore_missing: _tap_flow.TapFlow(id=name_or_id)
|
||||
)
|
||||
self.cmd = osc_tap_flow.DeleteTapFlow(self.app, None)
|
||||
|
||||
def test_delete_tap_flow(self):
|
||||
"""Test Delete tap flow."""
|
||||
|
||||
fake_tap_flow = fakes.FakeTapFlow.create_tap_flow(
|
||||
attrs={
|
||||
'source_port': str(uuid.uuid4()),
|
||||
'tap_service_id': str(uuid.uuid4()),
|
||||
}
|
||||
)
|
||||
|
||||
arg_list = [
|
||||
fake_tap_flow['id'],
|
||||
]
|
||||
verify_list = [
|
||||
(osc_tap_flow.TAP_FLOW, [fake_tap_flow['id']]),
|
||||
]
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, arg_list, verify_list)
|
||||
|
||||
result = self.cmd.take_action(parsed_args)
|
||||
|
||||
mock_delete_tap_flow = self.app.client_manager.network.delete_tap_flow
|
||||
mock_delete_tap_flow.assert_called_once_with(fake_tap_flow['id'])
|
||||
self.assertIsNone(result)
|
||||
|
||||
|
||||
class TestShowTapFlow(network_fakes.TestNetworkV2):
|
||||
columns = (
|
||||
'direction',
|
||||
'id',
|
||||
'name',
|
||||
'source_port',
|
||||
'status',
|
||||
'tap_service_id',
|
||||
)
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.app.client_manager.network.find_tap_flow.side_effect = (
|
||||
lambda name_or_id, ignore_missing: _tap_flow.TapFlow(id=name_or_id)
|
||||
)
|
||||
self.cmd = osc_tap_flow.ShowTapFlow(self.app, None)
|
||||
|
||||
def test_show_tap_flow(self):
|
||||
"""Test Show tap flow."""
|
||||
fake_tap_flow = fakes.FakeTapFlow.create_tap_flow(
|
||||
attrs={
|
||||
'source_port': str(uuid.uuid4()),
|
||||
'tap_service_id': str(uuid.uuid4()),
|
||||
}
|
||||
)
|
||||
self.app.client_manager.network.get_tap_flow.return_value = (
|
||||
fake_tap_flow
|
||||
)
|
||||
arg_list = [
|
||||
fake_tap_flow['id'],
|
||||
]
|
||||
verify_list = [
|
||||
(osc_tap_flow.TAP_FLOW, fake_tap_flow['id']),
|
||||
]
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, arg_list, verify_list)
|
||||
|
||||
headers, data = self.cmd.take_action(parsed_args)
|
||||
|
||||
self.app.client_manager.network.get_tap_flow.assert_called_once_with(
|
||||
fake_tap_flow['id']
|
||||
)
|
||||
self.assertEqual(self.columns, headers)
|
||||
fake_data = _get_data(
|
||||
fake_tap_flow, osc_tap_service._get_columns(fake_tap_flow)[1]
|
||||
)
|
||||
self.assertEqual(fake_data, data)
|
||||
|
||||
|
||||
class TestUpdateTapFlow(network_fakes.TestNetworkV2):
|
||||
_new_name = 'new_name'
|
||||
|
||||
columns = (
|
||||
'Direction',
|
||||
'ID',
|
||||
'Name',
|
||||
'Status',
|
||||
'Tenant',
|
||||
'source_port',
|
||||
'tap_service_id',
|
||||
)
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.cmd = osc_tap_flow.UpdateTapFlow(self.app, None)
|
||||
self.app.client_manager.network.find_tap_flow.side_effect = (
|
||||
lambda name_or_id, ignore_missing: _tap_flow.TapFlow(id=name_or_id)
|
||||
)
|
||||
|
||||
def test_update_tap_flow(self):
|
||||
"""Test update tap service"""
|
||||
fake_tap_flow = fakes.FakeTapFlow.create_tap_flow(
|
||||
attrs={
|
||||
'source_port': str(uuid.uuid4()),
|
||||
'tap_service_id': str(uuid.uuid4()),
|
||||
}
|
||||
)
|
||||
new_tap_flow = copy.deepcopy(fake_tap_flow)
|
||||
new_tap_flow['name'] = self._new_name
|
||||
|
||||
self.app.client_manager.network.update_tap_flow.return_value = (
|
||||
new_tap_flow
|
||||
)
|
||||
|
||||
arg_list = [
|
||||
fake_tap_flow['id'],
|
||||
'--name',
|
||||
self._new_name,
|
||||
]
|
||||
verify_list = [('name', self._new_name)]
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, arg_list, verify_list)
|
||||
columns, data = self.cmd.take_action(parsed_args)
|
||||
attrs = {'name': self._new_name}
|
||||
|
||||
mock_update_t_f = self.app.client_manager.network.update_tap_flow
|
||||
mock_update_t_f.assert_called_once_with(new_tap_flow['id'], **attrs)
|
||||
self.assertEqual(self.columns, columns)
|
||||
self.assertEqual(_get_data(new_tap_flow), data)
|
||||
@@ -0,0 +1,283 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import copy
|
||||
import operator
|
||||
import uuid
|
||||
|
||||
from openstack.network.v2 import tap_mirror
|
||||
from osc_lib import utils as osc_utils
|
||||
from osc_lib.utils import columns as column_util
|
||||
|
||||
from openstackclient.network.v2.taas import tap_mirror as osc_tap_mirror
|
||||
from openstackclient.tests.unit.network.v2 import fakes as network_fakes
|
||||
from openstackclient.tests.unit.network.v2.taas import fakes
|
||||
|
||||
|
||||
columns_long = tuple(
|
||||
col
|
||||
for col, _, listing_mode in osc_tap_mirror._attr_map
|
||||
if listing_mode in (column_util.LIST_BOTH, column_util.LIST_LONG_ONLY)
|
||||
)
|
||||
headers_long = tuple(
|
||||
head
|
||||
for _, head, listing_mode in osc_tap_mirror._attr_map
|
||||
if listing_mode in (column_util.LIST_BOTH, column_util.LIST_LONG_ONLY)
|
||||
)
|
||||
sorted_attr_map = sorted(osc_tap_mirror._attr_map, key=operator.itemgetter(1))
|
||||
sorted_columns = tuple(col for col, _, _ in sorted_attr_map)
|
||||
sorted_headers = tuple(head for _, head, _ in sorted_attr_map)
|
||||
|
||||
|
||||
def _get_data(attrs, columns=sorted_columns):
|
||||
return osc_utils.get_dict_properties(attrs, columns)
|
||||
|
||||
|
||||
class TestCreateTapMirror(network_fakes.TestNetworkV2):
|
||||
columns = (
|
||||
'directions',
|
||||
'id',
|
||||
'mirror_type',
|
||||
'name',
|
||||
'port_id',
|
||||
'remote_ip',
|
||||
)
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.cmd = osc_tap_mirror.CreateTapMirror(self.app, None)
|
||||
|
||||
def test_create_tap_mirror(self):
|
||||
port_id = str(uuid.uuid4())
|
||||
fake_tap_mirror = fakes.FakeTapMirror.create_tap_mirror(
|
||||
attrs={'port_id': port_id}
|
||||
)
|
||||
self.app.client_manager.network.create_tap_mirror.return_value = (
|
||||
fake_tap_mirror
|
||||
)
|
||||
self.app.client_manager.network.find_port.return_value = {
|
||||
'id': port_id
|
||||
}
|
||||
self.app.client_manager.network.find_tap_mirror.side_effect = (
|
||||
lambda _, name_or_id: {'id': name_or_id}
|
||||
)
|
||||
arg_list = [
|
||||
'--name',
|
||||
fake_tap_mirror['name'],
|
||||
'--port',
|
||||
fake_tap_mirror['port_id'],
|
||||
'--directions',
|
||||
fake_tap_mirror['directions'],
|
||||
'--remote-ip',
|
||||
fake_tap_mirror['remote_ip'],
|
||||
'--mirror-type',
|
||||
fake_tap_mirror['mirror_type'],
|
||||
]
|
||||
|
||||
verify_directions = fake_tap_mirror['directions'].split('=')
|
||||
verify_directions_dict = {verify_directions[0]: verify_directions[1]}
|
||||
|
||||
verify_list = [
|
||||
('name', fake_tap_mirror['name']),
|
||||
('port_id', fake_tap_mirror['port_id']),
|
||||
('directions', verify_directions_dict),
|
||||
('remote_ip', fake_tap_mirror['remote_ip']),
|
||||
('mirror_type', fake_tap_mirror['mirror_type']),
|
||||
]
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, arg_list, verify_list)
|
||||
self.app.client_manager.network.find_tap_mirror.return_value = (
|
||||
fake_tap_mirror
|
||||
)
|
||||
|
||||
columns, data = self.cmd.take_action(parsed_args)
|
||||
create_tap_m_mock = self.app.client_manager.network.create_tap_mirror
|
||||
create_tap_m_mock.assert_called_once_with(
|
||||
**{
|
||||
'name': fake_tap_mirror['name'],
|
||||
'port_id': fake_tap_mirror['port_id'],
|
||||
'directions': verify_directions_dict,
|
||||
'remote_ip': fake_tap_mirror['remote_ip'],
|
||||
'mirror_type': fake_tap_mirror['mirror_type'],
|
||||
}
|
||||
)
|
||||
self.assertEqual(self.columns, columns)
|
||||
fake_data = _get_data(
|
||||
fake_tap_mirror, osc_tap_mirror._get_columns(fake_tap_mirror)[1]
|
||||
)
|
||||
self.assertEqual(fake_data, data)
|
||||
|
||||
|
||||
class TestListTapMirror(network_fakes.TestNetworkV2):
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.cmd = osc_tap_mirror.ListTapMirror(self.app, None)
|
||||
|
||||
def test_list_tap_mirror(self):
|
||||
"""Test List Tap Mirror."""
|
||||
fake_tap_mirrors = fakes.FakeTapMirror.create_tap_mirrors(
|
||||
attrs={'port_id': str(uuid.uuid4())}, count=4
|
||||
)
|
||||
self.app.client_manager.network.tap_mirrors.return_value = (
|
||||
fake_tap_mirrors
|
||||
)
|
||||
|
||||
arg_list = []
|
||||
verify_list = []
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, arg_list, verify_list)
|
||||
|
||||
headers, data = self.cmd.take_action(parsed_args)
|
||||
|
||||
self.app.client_manager.network.tap_mirrors.assert_called_once()
|
||||
self.assertEqual(headers, list(headers_long))
|
||||
self.assertCountEqual(
|
||||
list(data),
|
||||
[
|
||||
_get_data(fake_tap_mirror, columns_long)
|
||||
for fake_tap_mirror in fake_tap_mirrors
|
||||
],
|
||||
)
|
||||
|
||||
|
||||
class TestDeleteTapMirror(network_fakes.TestNetworkV2):
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.app.client_manager.network.find_tap_mirror.side_effect = (
|
||||
lambda name_or_id, ignore_missing: tap_mirror.TapMirror(
|
||||
id=name_or_id
|
||||
)
|
||||
)
|
||||
self.cmd = osc_tap_mirror.DeleteTapMirror(self.app, None)
|
||||
|
||||
def test_delete_tap_mirror(self):
|
||||
"""Test Delete Tap Mirror."""
|
||||
|
||||
fake_tap_mirror = fakes.FakeTapMirror.create_tap_mirror(
|
||||
attrs={'port_id': str(uuid.uuid4())}
|
||||
)
|
||||
|
||||
arg_list = [
|
||||
fake_tap_mirror['id'],
|
||||
]
|
||||
verify_list = [
|
||||
(osc_tap_mirror.TAP_MIRROR, [fake_tap_mirror['id']]),
|
||||
]
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, arg_list, verify_list)
|
||||
result = self.cmd.take_action(parsed_args)
|
||||
|
||||
mock_delete_tap_m = self.app.client_manager.network.delete_tap_mirror
|
||||
mock_delete_tap_m.assert_called_once_with(fake_tap_mirror['id'])
|
||||
self.assertIsNone(result)
|
||||
|
||||
|
||||
class TestShowTapMirror(network_fakes.TestNetworkV2):
|
||||
columns = (
|
||||
'directions',
|
||||
'id',
|
||||
'mirror_type',
|
||||
'name',
|
||||
'port_id',
|
||||
'remote_ip',
|
||||
)
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.app.client_manager.network.find_tap_mirror.side_effect = (
|
||||
lambda name_or_id, ignore_missing: tap_mirror.TapMirror(
|
||||
id=name_or_id
|
||||
)
|
||||
)
|
||||
self.cmd = osc_tap_mirror.ShowTapMirror(self.app, None)
|
||||
|
||||
def test_show_tap_mirror(self):
|
||||
"""Test Show Tap Mirror."""
|
||||
|
||||
fake_tap_mirror = fakes.FakeTapMirror.create_tap_mirror(
|
||||
attrs={'port_id': str(uuid.uuid4())}
|
||||
)
|
||||
self.app.client_manager.network.get_tap_mirror.return_value = (
|
||||
fake_tap_mirror
|
||||
)
|
||||
arg_list = [
|
||||
fake_tap_mirror['id'],
|
||||
]
|
||||
verify_list = [
|
||||
(osc_tap_mirror.TAP_MIRROR, fake_tap_mirror['id']),
|
||||
]
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, arg_list, verify_list)
|
||||
|
||||
headers, data = self.cmd.take_action(parsed_args)
|
||||
|
||||
mock_get_tap_m = self.app.client_manager.network.get_tap_mirror
|
||||
mock_get_tap_m.assert_called_once_with(fake_tap_mirror['id'])
|
||||
self.assertEqual(self.columns, headers)
|
||||
fake_data = _get_data(
|
||||
fake_tap_mirror, osc_tap_mirror._get_columns(fake_tap_mirror)[1]
|
||||
)
|
||||
self.assertEqual(fake_data, data)
|
||||
|
||||
|
||||
class TestUpdateTapMirror(network_fakes.TestNetworkV2):
|
||||
_new_name = 'new_name'
|
||||
columns = (
|
||||
'directions',
|
||||
'id',
|
||||
'mirror_type',
|
||||
'name',
|
||||
'port_id',
|
||||
'remote_ip',
|
||||
)
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.cmd = osc_tap_mirror.UpdateTapMirror(self.app, None)
|
||||
self.app.client_manager.network.find_tap_mirror.side_effect = (
|
||||
lambda name_or_id, ignore_missing: tap_mirror.TapMirror(
|
||||
id=name_or_id
|
||||
)
|
||||
)
|
||||
|
||||
def test_update_tap_mirror(self):
|
||||
"""Test update Tap Mirror"""
|
||||
fake_tap_mirror = fakes.FakeTapMirror.create_tap_mirror(
|
||||
attrs={'port_id': str(uuid.uuid4())}
|
||||
)
|
||||
new_tap_mirror = copy.deepcopy(fake_tap_mirror)
|
||||
new_tap_mirror['name'] = self._new_name
|
||||
|
||||
self.app.client_manager.network.update_tap_mirror.return_value = (
|
||||
new_tap_mirror
|
||||
)
|
||||
|
||||
arg_list = [
|
||||
fake_tap_mirror['id'],
|
||||
'--name',
|
||||
self._new_name,
|
||||
]
|
||||
verify_list = [('name', self._new_name)]
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, arg_list, verify_list)
|
||||
columns, data = self.cmd.take_action(parsed_args)
|
||||
attrs = {'name': self._new_name}
|
||||
|
||||
mock_update_tap_m = self.app.client_manager.network.update_tap_mirror
|
||||
mock_update_tap_m.assert_called_once_with(
|
||||
fake_tap_mirror['id'], **attrs
|
||||
)
|
||||
self.assertEqual(self.columns, columns)
|
||||
fake_data = _get_data(
|
||||
new_tap_mirror, osc_tap_mirror._get_columns(new_tap_mirror)[1]
|
||||
)
|
||||
self.assertEqual(fake_data, data)
|
||||
@@ -0,0 +1,266 @@
|
||||
# All Rights Reserved 2020
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import copy
|
||||
import operator
|
||||
import uuid
|
||||
|
||||
from openstack.network.v2 import tap_service
|
||||
from osc_lib import utils as osc_utils
|
||||
from osc_lib.utils import columns as column_util
|
||||
|
||||
from openstackclient.network.v2.taas import tap_service as osc_tap_service
|
||||
from openstackclient.tests.unit.network.v2 import fakes as network_fakes
|
||||
from openstackclient.tests.unit.network.v2.taas import fakes
|
||||
|
||||
|
||||
columns_long = tuple(
|
||||
col
|
||||
for col, _, listing_mode in osc_tap_service._attr_map
|
||||
if listing_mode in (column_util.LIST_BOTH, column_util.LIST_LONG_ONLY)
|
||||
)
|
||||
headers_long = tuple(
|
||||
head
|
||||
for _, head, listing_mode in osc_tap_service._attr_map
|
||||
if listing_mode in (column_util.LIST_BOTH, column_util.LIST_LONG_ONLY)
|
||||
)
|
||||
sorted_attr_map = sorted(osc_tap_service._attr_map, key=operator.itemgetter(1))
|
||||
sorted_columns = tuple(col for col, _, _ in sorted_attr_map)
|
||||
sorted_headers = tuple(head for _, head, _ in sorted_attr_map)
|
||||
|
||||
|
||||
def _get_data(attrs, columns=sorted_columns):
|
||||
return osc_utils.get_dict_properties(attrs, columns)
|
||||
|
||||
|
||||
class TestCreateTapService(network_fakes.TestNetworkV2):
|
||||
columns = (
|
||||
'id',
|
||||
'name',
|
||||
'port_id',
|
||||
'status',
|
||||
)
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.cmd = osc_tap_service.CreateTapService(self.app, None)
|
||||
|
||||
def test_create_tap_service(self):
|
||||
"""Test Create Tap Service."""
|
||||
port_id = str(uuid.uuid4())
|
||||
fake_tap_service = fakes.FakeTapService.create_tap_service(
|
||||
attrs={'port_id': port_id}
|
||||
)
|
||||
self.app.client_manager.network.create_tap_service.return_value = (
|
||||
fake_tap_service
|
||||
)
|
||||
self.app.client_manager.network.find_port.return_value = {
|
||||
'id': port_id
|
||||
}
|
||||
self.app.client_manager.network.find_tap_service.side_effect = (
|
||||
lambda _, name_or_id: {'id': name_or_id}
|
||||
)
|
||||
arg_list = [
|
||||
'--name',
|
||||
fake_tap_service['name'],
|
||||
'--port',
|
||||
fake_tap_service['port_id'],
|
||||
]
|
||||
|
||||
verify_list = [
|
||||
('name', fake_tap_service['name']),
|
||||
('port_id', fake_tap_service['port_id']),
|
||||
]
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, arg_list, verify_list)
|
||||
self.app.client_manager.network.find_tap_service.return_value = (
|
||||
fake_tap_service
|
||||
)
|
||||
|
||||
columns, data = self.cmd.take_action(parsed_args)
|
||||
create_tap_s_mock = self.app.client_manager.network.create_tap_service
|
||||
create_tap_s_mock.assert_called_once_with(
|
||||
**{
|
||||
'name': fake_tap_service['name'],
|
||||
'port_id': fake_tap_service['port_id'],
|
||||
}
|
||||
)
|
||||
self.assertEqual(self.columns, columns)
|
||||
fake_data = _get_data(
|
||||
fake_tap_service, osc_tap_service._get_columns(fake_tap_service)[1]
|
||||
)
|
||||
self.assertEqual(fake_data, data)
|
||||
|
||||
|
||||
class TestListTapService(network_fakes.TestNetworkV2):
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.cmd = osc_tap_service.ListTapService(self.app, None)
|
||||
|
||||
def test_list_tap_service(self):
|
||||
"""Test List Tap Service."""
|
||||
fake_tap_services = fakes.FakeTapService.create_tap_services(
|
||||
attrs={'port_id': str(uuid.uuid4())}, count=4
|
||||
)
|
||||
self.app.client_manager.network.tap_services.return_value = (
|
||||
fake_tap_services
|
||||
)
|
||||
|
||||
arg_list = []
|
||||
verify_list = []
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, arg_list, verify_list)
|
||||
|
||||
headers, data = self.cmd.take_action(parsed_args)
|
||||
|
||||
self.app.client_manager.network.tap_services.assert_called_once()
|
||||
self.assertEqual(headers, list(headers_long))
|
||||
self.assertCountEqual(
|
||||
list(data),
|
||||
[
|
||||
_get_data(fake_tap_service, columns_long)
|
||||
for fake_tap_service in fake_tap_services
|
||||
],
|
||||
)
|
||||
|
||||
|
||||
class TestDeleteTapService(network_fakes.TestNetworkV2):
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.app.client_manager.network.find_tap_service.side_effect = (
|
||||
lambda name_or_id, ignore_missing: tap_service.TapService(
|
||||
id=name_or_id
|
||||
)
|
||||
)
|
||||
self.cmd = osc_tap_service.DeleteTapService(self.app, None)
|
||||
|
||||
def test_delete_tap_service(self):
|
||||
"""Test Delete tap service."""
|
||||
|
||||
fake_tap_service = fakes.FakeTapService.create_tap_service(
|
||||
attrs={'port_id': str(uuid.uuid4())}
|
||||
)
|
||||
|
||||
arg_list = [
|
||||
fake_tap_service['id'],
|
||||
]
|
||||
verify_list = [
|
||||
(osc_tap_service.TAP_SERVICE, [fake_tap_service['id']]),
|
||||
]
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, arg_list, verify_list)
|
||||
result = self.cmd.take_action(parsed_args)
|
||||
|
||||
mock_delete_tap_s = self.app.client_manager.network.delete_tap_service
|
||||
mock_delete_tap_s.assert_called_once_with(fake_tap_service['id'])
|
||||
self.assertIsNone(result)
|
||||
|
||||
|
||||
class TestShowTapService(network_fakes.TestNetworkV2):
|
||||
columns = (
|
||||
'id',
|
||||
'name',
|
||||
'port_id',
|
||||
'status',
|
||||
)
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.app.client_manager.network.find_tap_service.side_effect = (
|
||||
lambda name_or_id, ignore_missing: tap_service.TapService(
|
||||
id=name_or_id
|
||||
)
|
||||
)
|
||||
self.cmd = osc_tap_service.ShowTapService(self.app, None)
|
||||
|
||||
def test_show_tap_service(self):
|
||||
"""Test Show tap service."""
|
||||
|
||||
fake_tap_service = fakes.FakeTapService.create_tap_service(
|
||||
attrs={'port_id': str(uuid.uuid4())}
|
||||
)
|
||||
self.app.client_manager.network.get_tap_service.return_value = (
|
||||
fake_tap_service
|
||||
)
|
||||
arg_list = [
|
||||
fake_tap_service['id'],
|
||||
]
|
||||
verify_list = [
|
||||
(osc_tap_service.TAP_SERVICE, fake_tap_service['id']),
|
||||
]
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, arg_list, verify_list)
|
||||
|
||||
headers, data = self.cmd.take_action(parsed_args)
|
||||
|
||||
mock_get_tap_s = self.app.client_manager.network.get_tap_service
|
||||
mock_get_tap_s.assert_called_once_with(fake_tap_service['id'])
|
||||
self.assertEqual(self.columns, headers)
|
||||
fake_data = _get_data(
|
||||
fake_tap_service, osc_tap_service._get_columns(fake_tap_service)[1]
|
||||
)
|
||||
self.assertEqual(fake_data, data)
|
||||
|
||||
|
||||
class TestUpdateTapService(network_fakes.TestNetworkV2):
|
||||
_new_name = 'new_name'
|
||||
|
||||
columns = (
|
||||
'id',
|
||||
'name',
|
||||
'port_id',
|
||||
'status',
|
||||
)
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.cmd = osc_tap_service.UpdateTapService(self.app, None)
|
||||
self.app.client_manager.network.find_tap_service.side_effect = (
|
||||
lambda name_or_id, ignore_missing: tap_service.TapService(
|
||||
id=name_or_id
|
||||
)
|
||||
)
|
||||
|
||||
def test_update_tap_service(self):
|
||||
"""Test update tap service"""
|
||||
fake_tap_service = fakes.FakeTapService.create_tap_service(
|
||||
attrs={'port_id': str(uuid.uuid4())}
|
||||
)
|
||||
new_tap_service = copy.deepcopy(fake_tap_service)
|
||||
new_tap_service['name'] = self._new_name
|
||||
|
||||
self.app.client_manager.network.update_tap_service.return_value = (
|
||||
new_tap_service
|
||||
)
|
||||
|
||||
arg_list = [
|
||||
fake_tap_service['id'],
|
||||
'--name',
|
||||
self._new_name,
|
||||
]
|
||||
verify_list = [('name', self._new_name)]
|
||||
|
||||
parsed_args = self.check_parser(self.cmd, arg_list, verify_list)
|
||||
columns, data = self.cmd.take_action(parsed_args)
|
||||
attrs = {'name': self._new_name}
|
||||
|
||||
mock_update_tap_s = self.app.client_manager.network.update_tap_service
|
||||
mock_update_tap_s.assert_called_once_with(
|
||||
fake_tap_service['id'], **attrs
|
||||
)
|
||||
self.assertEqual(self.columns, columns)
|
||||
fake_data = _get_data(
|
||||
new_tap_service, osc_tap_service._get_columns(new_tap_service)[1]
|
||||
)
|
||||
self.assertEqual(fake_data, data)
|
||||
@@ -540,6 +540,23 @@ subnet_pool_set = "openstackclient.network.v2.subnet_pool:SetSubnetPool"
|
||||
subnet_pool_show = "openstackclient.network.v2.subnet_pool:ShowSubnetPool"
|
||||
subnet_pool_unset = "openstackclient.network.v2.subnet_pool:UnsetSubnetPool"
|
||||
|
||||
# Tap-as-a-Service
|
||||
tap_flow_create = "openstackclient.network.v2.taas.tap_flow:CreateTapFlow"
|
||||
tap_flow_delete = "openstackclient.network.v2.taas.tap_flow:DeleteTapFlow"
|
||||
tap_flow_list = "openstackclient.network.v2.taas.tap_flow:ListTapFlow"
|
||||
tap_flow_show = "openstackclient.network.v2.taas.tap_flow:ShowTapFlow"
|
||||
tap_flow_update = "openstackclient.network.v2.taas.tap_flow:UpdateTapFlow"
|
||||
tap_mirror_create = "openstackclient.network.v2.taas.tap_mirror:CreateTapMirror"
|
||||
tap_mirror_delete = "openstackclient.network.v2.taas.tap_mirror:DeleteTapMirror"
|
||||
tap_mirror_list = "openstackclient.network.v2.taas.tap_mirror:ListTapMirror"
|
||||
tap_mirror_show = "openstackclient.network.v2.taas.tap_mirror:ShowTapMirror"
|
||||
tap_mirror_update = "openstackclient.network.v2.taas.tap_mirror:UpdateTapMirror"
|
||||
tap_service_create = "openstackclient.network.v2.taas.tap_service:CreateTapService"
|
||||
tap_service_delete = "openstackclient.network.v2.taas.tap_service:DeleteTapService"
|
||||
tap_service_list = "openstackclient.network.v2.taas.tap_service:ListTapService"
|
||||
tap_service_show = "openstackclient.network.v2.taas.tap_service:ShowTapService"
|
||||
tap_service_update = "openstackclient.network.v2.taas.tap_service:UpdateTapService"
|
||||
|
||||
[project.entry-points."openstack.object_store.v1"]
|
||||
object_store_account_set = "openstackclient.object.v1.account:SetAccount"
|
||||
object_store_account_show = "openstackclient.object.v1.account:ShowAccount"
|
||||
|
||||
Reference in New Issue
Block a user