diff --git a/tobiko/openstack/openstackclient/__init__.py b/tobiko/openstack/openstackclient/__init__.py index 868cf7b81..57682ce88 100644 --- a/tobiko/openstack/openstackclient/__init__.py +++ b/tobiko/openstack/openstackclient/__init__.py @@ -22,6 +22,7 @@ from tobiko.openstack.openstackclient import _port from tobiko.openstack.openstackclient import _security_group from tobiko.openstack.openstackclient import _security_group_rule from tobiko.openstack.openstackclient import _subnet +from tobiko.openstack.openstackclient import _log OSPCliError = _exception.OSPCliAuthError @@ -61,3 +62,9 @@ subnet_create = _subnet.subnet_create subnet_delete = _subnet.subnet_delete subnet_set = _subnet.subnet_set subnet_unset = _subnet.subnet_unset + +network_loggable_resources_list = _log.network_loggable_resources_list +network_log_create = _log.network_log_create +network_log_show = _log.network_log_show +network_log_list = _log.network_log_list +network_log_delete = _log.network_log_delete diff --git a/tobiko/openstack/openstackclient/_log.py b/tobiko/openstack/openstackclient/_log.py new file mode 100644 index 000000000..ee9f3a30e --- /dev/null +++ b/tobiko/openstack/openstackclient/_log.py @@ -0,0 +1,47 @@ +# Copyright (c) 2021 Red Hat, Inc. +# +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +from __future__ import absolute_import + +from tobiko.openstack.openstackclient import _client + + +def network_loggable_resources_list(*args, **kwargs): + cmd = 'openstack network loggable resources list {params}' + kwargs['format'] = 'json' + return _client.execute(cmd, *args, **kwargs) + + +def network_log_create(log_name, *args, **kwargs): + cmd = f'openstack network log create {{params}} {log_name}' + kwargs['format'] = 'json' + return _client.execute(cmd, *args, **kwargs) + + +def network_log_show(log_name, *args, **kwargs): + cmd = f'openstack network log show {{params}} {log_name}' + kwargs['format'] = 'json' + return _client.execute(cmd, *args, **kwargs) + + +def network_log_list(*args, **kwargs): + cmd = 'openstack network log list {params}' + kwargs['format'] = 'json' + return _client.execute(cmd, *args, **kwargs) + + +def network_log_delete(log_names, *args, **kwargs): + cmd = f'openstack network log delete {{params}} {" ".join(log_names)}' + return _client.execute(cmd, *args, **kwargs) diff --git a/tobiko/tests/scenario/neutron/test_cli.py b/tobiko/tests/scenario/neutron/test_cli.py index 383e4b692..de60ce74b 100644 --- a/tobiko/tests/scenario/neutron/test_cli.py +++ b/tobiko/tests/scenario/neutron/test_cli.py @@ -19,8 +19,11 @@ import random import testtools from oslo_log import log +import tobiko from tobiko.openstack import neutron from tobiko.openstack import openstackclient +from tobiko.openstack import stacks +from tobiko.openstack import topology LOG = log.getLogger(__name__) @@ -62,6 +65,16 @@ class BaseCliTest(testtools.TestCase): self.api.delete_port(port_name) break + def api_network_log_delete(self, log_name): + logs = self.api.list_network_logs()['logs'] + for _log in logs: + if _log['name'] == log_name: + self.api.delete_network_log(_log['id']) + break + if _log['id'] == log_name: + self.api.delete_network_log(_log['id']) + break + def api_random_port_create(self): net_name = self.random_name() port_name = self.random_name() @@ -91,13 +104,114 @@ class BaseCliTest(testtools.TestCase): self.addCleanup(self.api_network_delete, name) return name + def api_random_network_log_create(self, sec_group_id=None): + name = self.random_name() + api_args = {'log': {'name': name, + 'resource_type': 'security_group'}} + if sec_group_id: + api_args['log']['resource_id'] = sec_group_id + self.api.create_network_log(api_args) + self.addCleanup(self.api_network_log_delete, name) + return name + def random_name(self, length=16): letters = 'abcdefghijklmnopqrstuvwxyz' random_string = ''.join(random.choice(letters) for i in range(length)) return f'{self.__class__.__name__}-{random_string}' -class NeutronCliNetwork(BaseCliTest): +@neutron.skip_unless_is_ovs() +@topology.skip_unless_osp_version('16.1', higher=True) +@neutron.skip_if_missing_networking_extensions('logging') +class NeutronOvsLogCliTest(BaseCliTest): + + LOGS_AMOUNT = 2 + + sec_groups_stack = tobiko.required_setup_fixture( + stacks.SecurityGroupsFixture) + + def _get_icmp_sec_group_id(self): + """Returns the uuid of a security group with ICMP allowed.""" + sec_group_obj = self.sec_groups_stack.icmp_security_group_id + if hasattr(sec_group_obj, '__iter__'): + return sec_group_obj + return sec_group_obj[0] + + def test_network_loggable_resources_list(self): + response = openstackclient.network_loggable_resources_list() + self.assertIn('security_group', response[0]['Supported types'], + "Security group logging isn't supported.") + + def test_network_log_create(self): + log_name = self.random_name() + test_sec_group_id = self._get_icmp_sec_group_id() + response = openstackclient.network_log_create( + log_name, **{'resource-type': 'security_group', + 'resource': test_sec_group_id, + 'event': 'ALL'}) + self.addCleanup(self.api_network_log_delete, log_name) + err_msg = 'Creation of log for security group using CLI failed.' + # pylint: disable=E1126 + self.assertEqual(response['Enabled'], True, err_msg) + self.assertEqual(response['Type'], 'security_group', err_msg) + self.assertEqual(response['Event'], 'ALL', err_msg) + + def test_network_log_show(self): + test_sec_group_id = self._get_icmp_sec_group_id() + log_name = self.api_random_network_log_create(test_sec_group_id) + _log = openstackclient.network_log_show(log_name) + err_msg = 'Details show of log for security group using CLI failed.' + # pylint: disable=E1126 + self.assertEqual(_log['Name'], log_name, err_msg) + + def test_network_log_create_all(self): + log_name = self.random_name() + response = openstackclient.network_log_create( + log_name, **{'resource-type': 'security_group', + 'event': 'ALL'}) + self.addCleanup(self.api_network_log_delete, log_name) + err_msg = 'Creation of log for security group using CLI failed.' + # pylint: disable=E1126 + self.assertEqual(response['Enabled'], True, err_msg) + self.assertEqual(response['Type'], 'security_group', err_msg) + self.assertEqual(response['Event'], 'ALL', err_msg) + + def test_network_log_list(self): + test_sec_group_id = self._get_icmp_sec_group_id() + log_names = [self.api_random_network_log_create(test_sec_group_id) + for i in range(self.LOGS_AMOUNT)] + for log_name in log_names: + self.addCleanup(self.api_network_log_delete, log_name) + logs = openstackclient.network_log_list() + found_count = 0 + for _log in logs: + if _log['Name'] in log_names: + found_count += 1 + err_msg = (f'Listing {self.LOGS_AMOUNT} logs for security groups ' + 'using CLI failed.') + # pylint: disable=E1126 + self.assertEqual(found_count, self.LOGS_AMOUNT, err_msg) + + def test_network_log_delete(self): + test_sec_group_id = self._get_icmp_sec_group_id() + log_names = [self.api_random_network_log_create(test_sec_group_id) + for i in range(self.LOGS_AMOUNT)] + openstackclient.network_log_delete(log_names) + logs = self.api.list_network_logs()['logs'] + err_msg = 'Deletion of logs for security group using CLI failed.' + for log_name in log_names: + for _log in logs: + self.assertNotEqual(_log['name'], log_name, err_msg) + + +@neutron.skip_unless_is_ovn() +@topology.skip_unless_osp_version('16.2', higher=True) +@neutron.skip_if_missing_networking_extensions('logging') +class NeutronOvnLogCliTest(NeutronOvsLogCliTest): + pass + + +class NeutronNetworkCliTest(BaseCliTest): def test_network_creation(self): net_name = self.random_name() @@ -131,7 +245,7 @@ class NeutronCliNetwork(BaseCliTest): self.assertEqual(net['name'], net_name) # pylint: disable=E1126 -class NeutronCliSubnet(BaseCliTest): +class NeutronSubnetCliTest(BaseCliTest): def test_subnet_creation(self): subnet_name = self.random_name() @@ -165,7 +279,7 @@ class NeutronCliSubnet(BaseCliTest): self.assertEqual(subnet['name'], subnet_name) # pylint: disable=E1126 -class NeutronCliPort(BaseCliTest): +class NeutronPortCliTest(BaseCliTest): def test_port_creation(self): port_name = self.random_name()