CLI for Tap Mirrors

Depends-On: https://review.opendev.org/c/886084
Of course the release of this patch is the depends-on
Change-Id: Ibd3e956d5104538c06193344f9daf2cdf562236e
Related-Bug: #2015471
This commit is contained in:
elajkat
2023-06-14 11:34:54 +02:00
committed by Michal Arbet
parent a2523e1e29
commit e6f997f688
6 changed files with 599 additions and 0 deletions

View File

@ -81,6 +81,59 @@ TapFlow Represents the port from which the traffic needs to be mirrored.
Multiple TapFlow instances can be associated with a single TapService
instance.
Tap Mirror
----------
A ``tapmirror`` mirrors the traffic of a Neutron port using ``gre`` or
``erspan v1`` tunnels.
.. code-block:: python
'tap_mirrors': {
'id': {
'allow_post': False, 'allow_put': False,
'validate': {'type:uuid': None}, 'is_visible': True,
'primary_key': True},
'project_id': {
'allow_post': True, 'allow_put': False,
'validate': {'type:string': db_const.PROJECT_ID_FIELD_SIZE},
'required_by_policy': True, 'is_filter': True,
'is_sort_key': True, 'is_visible': True},
'name': {
'allow_post': True, 'allow_put': True,
'validate': {'type:string': None},
'is_visible': True, 'default': ''},
'description': {
'allow_post': True, 'allow_put': True,
'validate': {'type:string': None},
'is_visible': True, 'default': ''},
'port_id': {
'allow_post': True, 'allow_put': False,
'validate': {'type:uuid': None},
'enforce_policy': True, 'is_visible': True},
'directions': {
'allow_post': True, 'allow_put': False,
'validate': DIRECTION_SPEC,
'is_visible': True},
'remote_ip': {
'allow_post': True, 'allow_put': False,
'validate': {'type:ip_address': None},
'is_visible': True},
'mirror_type': {
'allow_post': True, 'allow_put': False,
'validate': {'type:values': mirror_types_list},
'is_visible': True},
}
mirror_types_list = ['erspanv1', 'gre']
DIRECTION_SPEC = {
'type:dict': {
'IN': {'type:integer': None, 'default': None, 'required': False},
'OUT': {'type:integer': None, 'default': None, 'required': False}
}
}
API REFERENCE
=============
@ -116,3 +169,15 @@ and tap-as-a-service has an extension for taas related commands.
* Update tap flow **openstack tap flow update** <tap flow id/tap flow name> --name <new name of the tap flow> --description <new description of the tap flow>
Tap Mirror CLI
^^^^^^^^^^^^^^
* Create tap mirror: **openstack tap mirror create** --port <name or ID of the port to mirror> --name <name of the tap mirror> --directions <direction of the mirroring, a direction and mirror id pair, direction can be IN and OUT can be repeated to represent both IN and OUT> --remote-ip <The IP of the destination of the mirroring> --mirror-type <the type of the mirroring can be ger and erspanv1>
* List tap mirror: **openstack tap mirror list**
* Show tap mirror: **openstack tap mirror show** <tap mirror id/tap mirror name>
* Delete tap mirror: **openstack tap mirror delete** <tap mirror id/tap mirror name>
* Update tap mirror: **openstack tap mirror update** <tap mirror id or name> --name <new name of the tap mirror> --description <new description of the tap mirror>

View File

@ -0,0 +1,222 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from osc_lib.cli import identity as identity_utils
from osc_lib.command import command
from osc_lib import exceptions
from osc_lib import utils as osc_utils
from osc_lib.utils import columns as column_util
from openstackclient.network.v2 import port as osc_port
from neutron_taas._i18n import _
from neutron_taas.taas_client.osc import tap_service
LOG = logging.getLogger(__name__)
TAP_MIRROR = 'tap_mirror'
TAP_MIRRORS = '%ss' % TAP_MIRROR
_attr_map = (
('id', 'ID', column_util.LIST_BOTH),
('tenant_id', 'Tenant', column_util.LIST_LONG_ONLY),
('name', 'Name', column_util.LIST_BOTH),
('port_id', 'Port', column_util.LIST_BOTH),
('directions', 'Directions', column_util.LIST_LONG_ONLY),
('remote_ip', 'Remote IP', column_util.LIST_BOTH),
('mirror_type', 'Mirror Type', column_util.LIST_LONG_ONLY),
)
def _get_columns(item):
column_map = {}
hidden_columns = ['location', 'tenant_id']
return osc_utils.get_osc_show_columns_for_sdk_resource(
item,
column_map,
hidden_columns
)
class CreateTapMirror(command.ShowOne):
_description = _("Create a Tap Mirror")
def get_parser(self, prog_name):
parser = super().get_parser(prog_name)
identity_utils.add_project_owner_option_to_parser(parser)
tap_service._add_updatable_args(parser)
parser.add_argument(
'--port',
dest='port_id',
required=True,
metavar="PORT",
help=_('Port to which the Tap Mirror is connected.'))
parser.add_argument(
'--directions',
dest='directions',
action=osc_port.JSONKeyValueAction,
required=True,
help=_('A dictionary of direction and tunnel_id. Direction can '
'be IN and OUT.'))
parser.add_argument(
'--remote-ip',
dest='remote_ip',
required=True,
help=_('The remote IP of the Tap Mirror, this will be the '
'remote end of the GRE or ERSPAN v1 tunnel'))
parser.add_argument(
'--mirror-type',
dest='mirror_type',
required=True,
help=_('The type of the mirroring, it can be gre or erspanv1'))
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.network
attrs = {}
if parsed_args.name is not None:
attrs['name'] = str(parsed_args.name)
if parsed_args.description is not None:
attrs['description'] = str(parsed_args.description)
if parsed_args.port_id is not None:
port_id = client.find_port(parsed_args.port_id)['id']
attrs['port_id'] = port_id
if parsed_args.directions is not None:
attrs['directions'] = parsed_args.directions
if parsed_args.remote_ip is not None:
attrs['remote_ip'] = parsed_args.remote_ip
if parsed_args.mirror_type is not None:
attrs['mirror_type'] = parsed_args.mirror_type
if 'project' in parsed_args and parsed_args.project is not None:
project_id = identity_utils.find_project(
self.app.client_manager.identity,
parsed_args.project,
parsed_args.project_domain,
).id
attrs['tenant_id'] = project_id
obj = client.create_tap_mirror(**attrs)
display_columns, columns = tap_service._get_columns(obj)
data = osc_utils.get_dict_properties(obj, columns)
return display_columns, data
class ListTapMirror(command.Lister):
_description = _("List Tap Mirrors that belong to a given tenant")
def get_parser(self, prog_name):
parser = super().get_parser(prog_name)
identity_utils.add_project_owner_option_to_parser(parser)
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.network
params = {}
if parsed_args.project is not None:
project_id = identity_utils.find_project(
self.app.client_manager.identity,
parsed_args.project,
parsed_args.project_domain,
).id
params['tenant_id'] = project_id
objs = client.tap_mirrors(retrieve_all=True, params=params)
headers, columns = column_util.get_column_definitions(
_attr_map, long_listing=True)
return (headers, (osc_utils.get_dict_properties(
s, columns) for s in objs))
class ShowTapMirror(command.ShowOne):
_description = _("Show information of a given Tap Mirror")
def get_parser(self, prog_name):
parser = super().get_parser(prog_name)
parser.add_argument(
TAP_MIRROR,
metavar="<%s>" % TAP_MIRROR,
help=_("ID or name of Tap Mirror to look up."),
)
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.network
id = client.find_tap_mirror(parsed_args.tap_mirror,
ignore_missing=False).id
obj = client.get_tap_mirror(id)
display_columns, columns = tap_service._get_columns(obj)
data = osc_utils.get_dict_properties(obj, columns)
return display_columns, data
class DeleteTapMirror(command.Command):
_description = _("Delete a Tap Mirror")
def get_parser(self, prog_name):
parser = super().get_parser(prog_name)
parser.add_argument(
TAP_MIRROR,
metavar="<%s>" % TAP_MIRROR,
nargs="+",
help=_("ID(s) or name(s) of the Tap Mirror to delete."),
)
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.network
fails = 0
for id_or_name in parsed_args.tap_mirror:
try:
id = client.find_tap_mirror(id_or_name,
ignore_missing=False).id
client.delete_tap_mirror(id)
LOG.warning("Tap Mirror %(id)s deleted", {'id': id})
except Exception as e:
fails += 1
LOG.error("Failed to delete Tap Mirror with name or ID "
"'%(id_or_name)s': %(e)s",
{'id_or_name': id_or_name, 'e': e})
if fails > 0:
msg = (_("Failed to delete %(fails)s of %(total)s Tap Mirror.") %
{'fails': fails, 'total': len(parsed_args.tap_mirror)})
raise exceptions.CommandError(msg)
class UpdateTapMirror(command.ShowOne):
_description = _("Update a Tap Mirror.")
def get_parser(self, prog_name):
parser = super().get_parser(prog_name)
parser.add_argument(
TAP_MIRROR,
metavar="<%s>" % TAP_MIRROR,
help=_("ID or name of the Tap Mirror to update."),
)
tap_service._add_updatable_args(parser)
return parser
def take_action(self, parsed_args):
client = self.app.client_manager.network
original_t_s = client.find_tap_mirror(parsed_args.tap_mirror,
ignore_missing=False).id
attrs = {}
if parsed_args.name is not None:
attrs['name'] = str(parsed_args.name)
if parsed_args.description is not None:
attrs['description'] = str(parsed_args.description)
obj = client.update_tap_mirror(original_t_s, **attrs)
display_columns, columns = tap_service._get_columns(obj)
data = osc_utils.get_dict_properties(obj, columns)
return display_columns, data

View File

@ -81,3 +81,42 @@ class FakeTapFlow:
tap_flows.append(FakeTapFlow.create_tap_flow(attrs=attrs))
return tap_flows
class FakeTapMirror(object):
@staticmethod
def create_tap_mirror(attrs=None):
"""Create a fake tap mirror."""
attrs = attrs or {}
tap_mirror_attrs = {
'id': uuidutils.generate_uuid(),
'tenant_id': uuidutils.generate_uuid(),
'name': 'test_tap_mirror' + uuidutils.generate_uuid(),
'port_id': uuidutils.generate_uuid(),
'directions': 'IN=99',
'remote_ip': '192.10.10.2',
'mirror_type': 'gre',
}
tap_mirror_attrs.update(attrs)
return copy.deepcopy(tap_mirror_attrs)
@staticmethod
def create_tap_mirrors(attrs=None, count=1):
"""Create multiple fake tap mirrors."""
tap_mirrors = []
for i in range(0, count):
if attrs is None:
attrs = {
'id': 'fake_id%d' % i,
'port_id': uuidutils.generate_uuid(),
'name': 'test_tap_mirror_%d' % i,
'directions': 'IN=%d' % 99 + i,
'remote_ip': '192.10.10.%d' % (i + 3),
}
elif getattr(attrs, 'id', None) is None:
attrs['id'] = 'fake_id%d' % i
tap_mirrors.append(FakeTapMirror.create_tap_mirror(attrs=attrs))
return tap_mirrors

View File

@ -0,0 +1,267 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import operator
from unittest import mock
from neutronclient.tests.unit.osc.v2 import fakes as test_fakes
from openstack.network.v2 import tap_mirror
from osc_lib import utils as osc_utils
from osc_lib.utils import columns as column_util
from oslo_utils import uuidutils
from neutron_taas.taas_client.osc import tap_mirror as osc_tap_mirror
from neutron_taas.tests.unit.taas_client.osc import fakes
columns_long = tuple(col for col, _, listing_mode in osc_tap_mirror._attr_map
if listing_mode in (column_util.LIST_BOTH,
column_util.LIST_LONG_ONLY))
headers_long = tuple(head for _, head, listing_mode in
osc_tap_mirror._attr_map if listing_mode in
(column_util.LIST_BOTH, column_util.LIST_LONG_ONLY))
sorted_attr_map = sorted(osc_tap_mirror._attr_map, key=operator.itemgetter(1))
sorted_columns = tuple(col for col, _, _ in sorted_attr_map)
sorted_headers = tuple(head for _, head, _ in sorted_attr_map)
def _get_data(attrs, columns=sorted_columns):
return osc_utils.get_dict_properties(attrs, columns)
class TestCreateTapMirror(test_fakes.TestNeutronClientOSCV2):
columns = (
'directions',
'id',
'mirror_type',
'name',
'port_id',
'remote_ip',
)
def setUp(self):
super().setUp()
self.cmd = osc_tap_mirror.CreateTapMirror(self.app, self.namespace)
def test_create_tap_mirror(self):
port_id = uuidutils.generate_uuid()
fake_tap_mirror = fakes.FakeTapMirror.create_tap_mirror(
attrs={'port_id': port_id}
)
self.app.client_manager.network = mock.Mock()
self.app.client_manager.network.create_tap_mirror = mock.Mock(
return_value=fake_tap_mirror)
self.app.client_manager.network.find_port = mock.Mock(
return_value={'id': port_id})
self.app.client_manager.network.find_tap_mirror = mock.Mock(
side_effect=lambda _, name_or_id: {'id': name_or_id})
arg_list = [
'--name', fake_tap_mirror['name'],
'--port', fake_tap_mirror['port_id'],
'--directions', fake_tap_mirror['directions'],
'--remote-ip', fake_tap_mirror['remote_ip'],
'--mirror-type', fake_tap_mirror['mirror_type'],
]
verify_directions = fake_tap_mirror['directions'].split('=')
verify_directions_dict = {verify_directions[0]: verify_directions[1]}
verify_list = [
('name', fake_tap_mirror['name']),
('port_id', fake_tap_mirror['port_id']),
('directions', verify_directions_dict),
('remote_ip', fake_tap_mirror['remote_ip']),
('mirror_type', fake_tap_mirror['mirror_type']),
]
parsed_args = self.check_parser(self.cmd, arg_list, verify_list)
self.app.client_manager.network.find_tap_mirror = mock.Mock(
return_value=fake_tap_mirror)
columns, data = self.cmd.take_action(parsed_args)
create_tap_m_mock = self.app.client_manager.network.create_tap_mirror
create_tap_m_mock.assert_called_once_with(
**{'name': fake_tap_mirror['name'],
'port_id': fake_tap_mirror['port_id'],
'directions': verify_directions_dict,
'remote_ip': fake_tap_mirror['remote_ip'],
'mirror_type': fake_tap_mirror['mirror_type']})
self.assertEqual(self.columns, columns)
fake_data = _get_data(
fake_tap_mirror,
osc_tap_mirror._get_columns(fake_tap_mirror)[1])
self.assertEqual(fake_data, data)
class TestListTapMirror(test_fakes.TestNeutronClientOSCV2):
def setUp(self):
super().setUp()
self.cmd = osc_tap_mirror.ListTapMirror(self.app, self.namespace)
def test_list_tap_mirror(self):
"""Test List Tap Mirror."""
fake_tap_mirrors = fakes.FakeTapMirror.create_tap_mirrors(
attrs={'port_id': uuidutils.generate_uuid()},
count=4)
self.app.client_manager.network = mock.Mock()
self.app.client_manager.network.tap_mirrors = mock.Mock(
return_value=fake_tap_mirrors)
arg_list = []
verify_list = []
parsed_args = self.check_parser(self.cmd, arg_list, verify_list)
headers, data = self.cmd.take_action(parsed_args)
self.app.client_manager.network.tap_mirrors.assert_called_once()
self.assertEqual(headers, list(headers_long))
self.assertListItemEqual(
list(data),
[_get_data(fake_tap_mirror, columns_long) for fake_tap_mirror
in fake_tap_mirrors]
)
class TestDeleteTapMirror(test_fakes.TestNeutronClientOSCV2):
def setUp(self):
super().setUp()
self.app.client_manager.network = mock.Mock()
self.app.client_manager.network.find_tap_mirror = mock.Mock(
side_effect=lambda name_or_id, ignore_missing:
tap_mirror.TapMirror(id=name_or_id))
self.cmd = osc_tap_mirror.DeleteTapMirror(self.app, self.namespace)
def test_delete_tap_mirror(self):
"""Test Delete Tap Mirror."""
fake_tap_mirror = fakes.FakeTapMirror.create_tap_mirror(
attrs={'port_id': uuidutils.generate_uuid()}
)
self.app.client_manager.network.delete_tap_mirror = mock.Mock()
arg_list = [
fake_tap_mirror['id'],
]
verify_list = [
(osc_tap_mirror.TAP_MIRROR, [fake_tap_mirror['id']]),
]
parsed_args = self.check_parser(self.cmd, arg_list, verify_list)
result = self.cmd.take_action(parsed_args)
mock_delete_tap_m = self.app.client_manager.network.delete_tap_mirror
mock_delete_tap_m.assert_called_once_with(fake_tap_mirror['id'])
self.assertIsNone(result)
class TestShowTapMirror(test_fakes.TestNeutronClientOSCV2):
columns = (
'directions',
'id',
'mirror_type',
'name',
'port_id',
'remote_ip',
)
def setUp(self):
super().setUp()
self.app.client_manager.network = mock.Mock()
self.app.client_manager.network.find_tap_mirror = mock.Mock(
side_effect=lambda name_or_id, ignore_missing:
tap_mirror.TapMirror(id=name_or_id))
self.cmd = osc_tap_mirror.ShowTapMirror(self.app, self.namespace)
def test_show_tap_mirror(self):
"""Test Show Tap Mirror."""
fake_tap_mirror = fakes.FakeTapMirror.create_tap_mirror(
attrs={'port_id': uuidutils.generate_uuid()}
)
self.app.client_manager.network.get_tap_mirror = mock.Mock(
return_value=fake_tap_mirror)
arg_list = [
fake_tap_mirror['id'],
]
verify_list = [
(osc_tap_mirror.TAP_MIRROR, fake_tap_mirror['id']),
]
parsed_args = self.check_parser(self.cmd, arg_list, verify_list)
headers, data = self.cmd.take_action(parsed_args)
mock_get_tap_m = self.app.client_manager.network.get_tap_mirror
mock_get_tap_m.assert_called_once_with(
fake_tap_mirror['id'])
self.assertEqual(self.columns, headers)
fake_data = _get_data(
fake_tap_mirror,
osc_tap_mirror._get_columns(fake_tap_mirror)[1])
self.assertItemEqual(fake_data, data)
class TestUpdateTapMirror(test_fakes.TestNeutronClientOSCV2):
_new_name = 'new_name'
columns = (
'directions',
'id',
'mirror_type',
'name',
'port_id',
'remote_ip',
)
def setUp(self):
super().setUp()
self.cmd = osc_tap_mirror.UpdateTapMirror(self.app, self.namespace)
self.app.client_manager.network = mock.Mock()
self.app.client_manager.network.find_tap_mirror = mock.Mock(
side_effect=lambda name_or_id, ignore_missing:
tap_mirror.TapMirror(id=name_or_id))
def test_update_tap_mirror(self):
"""Test update Tap Mirror"""
fake_tap_mirror = fakes.FakeTapMirror.create_tap_mirror(
attrs={'port_id': uuidutils.generate_uuid()}
)
new_tap_mirror = copy.deepcopy(fake_tap_mirror)
new_tap_mirror['name'] = self._new_name
self.app.client_manager.network.update_tap_mirror = mock.Mock(
return_value=new_tap_mirror)
arg_list = [
fake_tap_mirror['id'],
'--name', self._new_name,
]
verify_list = [('name', self._new_name)]
parsed_args = self.check_parser(self.cmd, arg_list, verify_list)
columns, data = self.cmd.take_action(parsed_args)
attrs = {'name': self._new_name}
mock_update_tap_m = self.app.client_manager.network.update_tap_mirror
mock_update_tap_m.assert_called_once_with(
fake_tap_mirror['id'], **attrs)
self.assertEqual(self.columns, columns)
fake_data = _get_data(
new_tap_mirror,
osc_tap_mirror._get_columns(new_tap_mirror)[1])
self.assertItemEqual(fake_data, data)

View File

@ -2,6 +2,7 @@ pbr>=5.5.0 # Apache-2.0
neutron>=16.0.0.0b1 # Apache-2.0
neutron-lib>=2.11.0 # Apache-2.0
openstacksdk>=0.102.0 # Apache-2.0
python-openstackclient>=3.12.0 # Apache-2.0
osc-lib>=2.3.0 # Apache-2.0
# Opt-in for neutron-lib consumption patches

View File

@ -59,6 +59,11 @@ openstack.neutronclient.v2 =
tap_flow_show = neutron_taas.taas_client.osc.tap_flow:ShowTapFlow
tap_flow_delete = neutron_taas.taas_client.osc.tap_flow:DeleteTapFlow
tap_flow_update = neutron_taas.taas_client.osc.tap_flow:UpdateTapFlow
tap_mirror_create = neutron_taas.taas_client.osc.tap_mirror:CreateTapMirror
tap_mirror_list = neutron_taas.taas_client.osc.tap_mirror:ListTapMirror
tap_mirror_show = neutron_taas.taas_client.osc.tap_mirror:ShowTapMirror
tap_mirror_delete = neutron_taas.taas_client.osc.tap_mirror:DeleteTapMirror
tap_mirror_update = neutron_taas.taas_client.osc.tap_mirror:UpdateTapMirror
[pbr]