Merge "Delete SG log entries when SG is deleted" into stable/train

This commit is contained in:
Zuul 2021-11-17 12:24:16 +00:00 committed by Gerrit Code Review
commit 596ab2cec6
5 changed files with 51 additions and 9 deletions

View File

@ -180,15 +180,17 @@ def get_logs_bound_port(context, port_id):
return [log for log in logs if is_bound(log)]
def get_logs_bound_sg(context, sg_id):
def get_logs_bound_sg(context, sg_id, project_id=None):
"""Return a list of log_resources bound to a security group"""
project_id = context.tenant_id
kwargs = {
'resource_type': constants.SECURITY_GROUP,
'enabled': True}
if project_id:
kwargs['project_id'] = project_id
log_objs = log_object.Log.get_objects(
context,
project_id=project_id,
resource_type=constants.SECURITY_GROUP,
enabled=True)
context, **kwargs)
log_resources = []
for log_obj in log_objs:

View File

@ -29,7 +29,8 @@ class SecurityGroupRuleCallBack(manager.ResourceCallBackBase):
else:
sg_id = kwargs.get('security_group_id')
log_resources = db_api.get_logs_bound_sg(context, sg_id)
log_resources = db_api.get_logs_bound_sg(
context, sg_id, project_id=context.project_id)
if log_resources:
self.resource_push_api(
log_const.RESOURCE_UPDATE, context, log_resources)

View File

@ -14,6 +14,9 @@
# under the License.
from neutron_lib.api.definitions import logging
from neutron_lib.callbacks import events
from neutron_lib.callbacks import registry
from neutron_lib.callbacks import resources
from neutron_lib.db import api as db_api
from neutron_lib.services.logapi import constants as log_const
@ -21,6 +24,7 @@ from neutron.db import db_base_plugin_common
from neutron.extensions import logging as log_ext
from neutron.objects import base as base_obj
from neutron.objects.logapi import logging_resource as log_object
from neutron.services.logapi.common import db_api as log_db_api
from neutron.services.logapi.common import exceptions as log_exc
from neutron.services.logapi.common import validators
from neutron.services.logapi.drivers import manager as driver_mgr
@ -39,12 +43,23 @@ class LoggingPlugin(log_ext.LoggingPluginBase):
super(LoggingPlugin, self).__init__()
self.driver_manager = driver_mgr.LoggingServiceDriverManager()
self.validator_mgr = validators.ResourceValidateRequest.get_instance()
registry.subscribe(
self._clean_security_group_logs,
resources.SECURITY_GROUP, events.AFTER_DELETE)
@property
def supported_logging_types(self):
# supported_logging_types are be dynamically loaded from log_drivers
return self.driver_manager.supported_logging_types
def _clean_security_group_logs(self, resource, event, trigger, payload):
context = payload.context.elevated()
sg_id = payload.resource_id
with db_api.CONTEXT_WRITER.using(context):
sg_logs = log_db_api.get_logs_bound_sg(context, sg_id)
for log in sg_logs:
self.delete_log(context, log['id'])
@db_base_plugin_common.filter_fields
@db_base_plugin_common.convert_result_to_dict
def get_logs(self, context, filters=None, fields=None, sorts=None,

View File

@ -98,7 +98,8 @@ class LoggingDBApiTestCase(test_sg.SecurityGroupDBTestCase):
with mock.patch.object(log_object.Log, 'get_objects',
return_value=[log]):
self.assertEqual(
[log], db_api.get_logs_bound_sg(self.context, self.sg_id))
[log], db_api.get_logs_bound_sg(
self.context, self.sg_id, project_id=self.tenant_id))
# Test get log objects with required resource type
calls = [mock.call(self.context, project_id=self.tenant_id,
@ -119,7 +120,8 @@ class LoggingDBApiTestCase(test_sg.SecurityGroupDBTestCase):
with mock.patch.object(log_object.Log, 'get_objects',
return_value=[log]):
self.assertEqual(
[], db_api.get_logs_bound_sg(self.context, self.sg_id))
[], db_api.get_logs_bound_sg(
self.context, self.sg_id, project_id=self.tenant_id))
# Test get log objects with required resource type
calls = [mock.call(self.context, project_id=self.tenant_id,

View File

@ -14,6 +14,8 @@
# under the License.
import mock
from neutron_lib.callbacks import events
from neutron_lib.callbacks import resources
from neutron_lib import context
from neutron_lib.plugins import constants as plugin_const
from neutron_lib.plugins import directory
@ -24,6 +26,7 @@ from neutron import manager
from neutron.objects.logapi import logging_resource as log_object
from neutron.objects import ports
from neutron.objects import securitygroup as sg_object
from neutron.services.logapi.common import db_api as log_db_api
from neutron.services.logapi.common import exceptions as log_exc
from neutron.services.logapi.common import sg_validate
from neutron.tests.unit.services.logapi import base
@ -67,6 +70,25 @@ class TestLoggingPlugin(base.BaseLogTestCase):
new_callable=log_types).start()
self.ctxt = context.Context('admin', 'fake_tenant')
def test__clean_security_group_logs(self):
logs = [
{'id': uuidutils.generate_uuid()},
{'id': uuidutils.generate_uuid()}]
sg_id = uuidutils.generate_uuid()
expected_delete_calls = [
mock.call(mock.ANY, log['id']) for log in logs]
with mock.patch.object(log_db_api, 'get_logs_bound_sg',
return_value=logs) as mock_get_logs, \
mock.patch.object(
self.log_plugin, 'delete_log') as mock_delete_log:
payload = mock.Mock(
context=self.ctxt, resource_id=sg_id)
self.log_plugin._clean_security_group_logs(
resources.SECURITY_GROUP, events.AFTER_DELETE, None, payload)
mock_get_logs.assert_called_once_with(
mock.ANY, sg_id)
mock_delete_log.assert_has_calls(expected_delete_calls)
def test_get_logs(self):
with mock.patch.object(log_object.Log, 'get_objects')\
as get_objects_mock: