Add api test for logging extension

Depends-On: I66234f45ee74c070119d84830790df296ba8d5f7
Partially-implements: blueprint security-group-logging
Related-Bug: #1468366
Change-Id: I19102b2b2c5cff78bccf05f19f30971d9b563b78
This commit is contained in:
Nguyen Phuong An 2017-11-24 11:30:25 +07:00 committed by Jakub Libosvar
parent cd32b7dcda
commit 67993fc044
4 changed files with 151 additions and 0 deletions

View File

@ -0,0 +1,74 @@
# Copyright 2017 Fujitsu Limited.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
from tempest.lib import exceptions
import testscenarios
from neutron_tempest_plugin.api import base
load_tests = testscenarios.load_tests_apply_scenarios
class LoggingTestJSON(base.BaseAdminNetworkTest):
required_extensions = ['logging', 'standard-attr-description']
@decorators.idempotent_id('8d2e1ba5-455b-4519-a88e-e587002faba6')
def test_log_lifecycle(self):
name = data_utils.rand_name('test-log')
description = data_utils.rand_name('test-log-desc')
log = self.create_log(name=name, description=description,
resource_type='security_group', enabled=True)
# Test 'show log'
retrieved_log = self.admin_client.show_log(log['id'])['log']
self.assertEqual(name, retrieved_log['name'])
self.assertEqual(description, retrieved_log['description'])
self.assertEqual('security_group', retrieved_log['resource_type'])
self.assertTrue(retrieved_log['enabled'])
# Test 'list logs'
logs = self.admin_client.list_logs()['logs']
logs_ids = [log_object['id'] for log_object in logs]
self.assertIn(log['id'], logs_ids)
# Test 'update log'
update_description = data_utils.rand_name('test-log')
self.admin_client.update_log(log['id'],
description=update_description,
enabled=False)
retrieved_log = self.admin_client.show_log(log['id'])['log']
self.assertEqual(update_description, retrieved_log['description'])
self.assertFalse(retrieved_log['enabled'])
# Test 'delete log'
self.admin_client.delete_log(log['id'])
self.assertRaises(exceptions.NotFound,
self.admin_client.show_log, log['id'])
@decorators.idempotent_id('1af6cdab-0eb0-4e13-8027-d89cf1c7a87a')
def test_list_supported_logging_types(self):
# List supported logging types
# Since returned logging types depends on loaded backend drivers
# this test is checking only if returned keys are same as expected keys
expected_log_keys = ['type']
log_types = self.admin_client.list_loggable_resources()
actual_list_log_types = log_types['loggable_resources']
# Verify that only required fields present in logging types
for log_type in actual_list_log_types:
self.assertEqual(tuple(expected_log_keys), tuple(log_type.keys()))

View File

@ -0,0 +1,52 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_utils import uuidutils
from tempest.lib.common.utils import data_utils
from tempest.lib import decorators
from tempest.lib import exceptions as lib_exc
from neutron_tempest_plugin.api import base
class LoggingNegativeTestJSON(base.BaseAdminNetworkTest):
required_extensions = ['logging', 'standard-attr-description']
@decorators.attr(type='negative')
@decorators.idempotent_id('5fc61e24-cad5-4d86-a2d4-f40c0fa0a54c')
def test_create_log_with_invalid_resource_type(self):
log_args = {'name': data_utils.rand_name('test-log'),
'description': data_utils.rand_name('test-log-desc'),
'resource_type': 'fake_resource'}
self.assertRaises(lib_exc.BadRequest,
self.admin_client.create_log, **log_args)
@decorators.attr(type='negative')
@decorators.idempotent_id('7ed63170-0748-44b7-b0a0-64bfd9390dac')
def test_create_log_with_nonexistent_port(self):
log_args = {'name': data_utils.rand_name('test-log'),
'description': data_utils.rand_name('test-log-desc'),
'resource_type': 'security_group',
'target_id': uuidutils.generate_uuid()}
self.assertRaises(lib_exc.NotFound,
self.admin_client.create_log, **log_args)
@decorators.attr(type='negative')
@decorators.idempotent_id('89194c6b-8f47-400b-979b-072b1c1f767b')
def test_create_log_with_nonexistent_sg(self):
log_args = {'name': data_utils.rand_name('test-log'),
'description': data_utils.rand_name('test-log-desc'),
'resource_type': 'security_group',
'resource_id': uuidutils.generate_uuid()}
self.assertRaises(lib_exc.NotFound,
self.admin_client.create_log, **log_args)

View File

@ -121,6 +121,7 @@ class BaseNetworkTest(test.BaseTestCase):
cls.admin_subnetpools = []
cls.security_groups = []
cls.projects = []
cls.log_objects = []
@classmethod
def resource_cleanup(cls):
@ -213,6 +214,11 @@ class BaseNetworkTest(test.BaseTestCase):
cls._try_delete_resource(cls.admin_client.delete_qos_policy,
qos_policy['id'])
# Clean up log_objects
for log_object in cls.log_objects:
cls._try_delete_resource(cls.admin_client.delete_log,
log_object['id'])
super(BaseNetworkTest, cls).resource_cleanup()
@classmethod
@ -515,6 +521,23 @@ class BaseAdminNetworkTest(BaseNetworkTest):
cls.service_profiles.append(service_profile)
return service_profile
@classmethod
def create_log(cls, name, description=None,
resource_type='security_group', resource_id=None,
target_id=None, event='ALL', enabled=True):
"""Wrapper utility that returns a test log object."""
log_args = {'name': name,
'description': description,
'resource_type': resource_type,
'resource_id': resource_id,
'target_id': target_id,
'event': event,
'enabled': enabled}
body = cls.admin_client.create_log(**log_args)
log_object = body['log']
cls.log_objects.append(log_object)
return log_object
@classmethod
def get_unused_ip(cls, net_id, ip_version=None):
"""Get an unused ip address in a allocation pool of net"""

View File

@ -58,6 +58,8 @@ class NetworkClientJSON(service_client.RestClient):
'minimum_bandwidth_rules': 'qos',
'rule_types': 'qos',
'rbac-policies': '',
'logs': 'log',
'loggable_resources': 'log',
}
service_prefix = service_resource_prefix_map.get(
plural_name)