From fb1dab78575a3f67eb60e6675bbd1ffcb1b32228 Mon Sep 17 00:00:00 2001 From: Cao Xuan Hoang Date: Tue, 2 Jan 2018 15:52:47 +0700 Subject: [PATCH] [log]: Add rpc stuff for logging This patch adds rpc stuff for logging extension as below: - Reserves a rpc listener for listening callback from log drivers. - Also provides log_db_api: + get_logs_bound_port + get_logs_bound_fwg + get_fwg_log_info_for_port + get_fwg_log_info_for_log_resources This patch also introduces get_fwg_ports_in_tenant method in firewall_db_v2.py as helper function for further processing. Co-Authored-By: Nguyen Phuong An Co-Authored-By: Kim Bao Long Partial-Bug: #1720727 Change-Id: I6ad3fcec9a233286fdbd039d6d0e3a842c027f3f --- .../db/firewall/v2/firewall_db_v2.py | 15 +- .../services/logapi/common/__init__.py | 0 .../services/logapi/common/log_db_api.py | 217 +++++++++++++ neutron_fwaas/services/logapi/rpc/__init__.py | 0 .../services/logapi/rpc/log_server.py | 30 ++ .../services/logapi/common/test_log_db_api.py | 299 ++++++++++++++++++ .../unit/services/logapi/rpc/__init__.py | 0 .../services/logapi/rpc/test_log_server.py | 56 ++++ 8 files changed, 616 insertions(+), 1 deletion(-) create mode 100644 neutron_fwaas/services/logapi/common/__init__.py create mode 100644 neutron_fwaas/services/logapi/common/log_db_api.py create mode 100644 neutron_fwaas/services/logapi/rpc/__init__.py create mode 100644 neutron_fwaas/services/logapi/rpc/log_server.py create mode 100644 neutron_fwaas/tests/unit/services/logapi/common/test_log_db_api.py create mode 100644 neutron_fwaas/tests/unit/services/logapi/rpc/__init__.py create mode 100644 neutron_fwaas/tests/unit/services/logapi/rpc/test_log_server.py diff --git a/neutron_fwaas/db/firewall/v2/firewall_db_v2.py b/neutron_fwaas/db/firewall/v2/firewall_db_v2.py index 6f809ff24..ee157eec1 100644 --- a/neutron_fwaas/db/firewall/v2/firewall_db_v2.py +++ b/neutron_fwaas/db/firewall/v2/firewall_db_v2.py @@ -845,13 +845,26 @@ class FirewallPluginDb(common_db_mixin.CommonDbMixin): return None def get_fwg_attached_to_port(self, context, port_id): - """Return a firewall group ID is associated to a port""" + """Return a firewall group ID that is attached to a given port""" fwg_port = self._model_query(context, FirewallGroupPortAssociation).\ filter_by(port_id=port_id).first() if fwg_port: return fwg_port.firewall_group_id return None + def get_fwg_ports_in_tenant(self, context, tenant_id): + """Return a list of ports under a given tenant""" + try: + fwg_id = FirewallGroupPortAssociation.firewall_group_id + with context.session.begin(subtransactions=True): + port_qry = context.session.query( + FirewallGroupPortAssociation.port_id).join( + FirewallGroup, FirewallGroup.id == fwg_id).filter( + FirewallGroup.tenant_id == tenant_id).all() + return list({port for (port,) in port_qry}) + except exc.NoResultFound: + return [] + def _ensure_default_firewall_group(self, context, tenant_id): """Create a default firewall group if one doesn't exist for a tenant diff --git a/neutron_fwaas/services/logapi/common/__init__.py b/neutron_fwaas/services/logapi/common/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/neutron_fwaas/services/logapi/common/log_db_api.py b/neutron_fwaas/services/logapi/common/log_db_api.py new file mode 100644 index 000000000..5df87a274 --- /dev/null +++ b/neutron_fwaas/services/logapi/common/log_db_api.py @@ -0,0 +1,217 @@ +# Copyright (c) 2018 Fujitsu Limited +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from neutron.objects.logapi import logging_resource as log_object +from neutron.objects import ports as port_objects +from neutron_lib import constants as nl_const +from neutron_lib.plugins import directory + +from neutron_fwaas.common import fwaas_constants +from neutron_fwaas.services.logapi import constants + +fw_plugin_db = None + + +# TODO(longkb): We will support L2 port. Currently, this method returns only +# router ports. +def _get_ports_being_logged(context, log_obj): + """Return a list of ports being logged with a given log_obj""" + + target_id = log_obj['target_id'] + resource_id = log_obj['resource_id'] + + # If 'target_id' (port_id) is specified in a log_obj + if target_id: + fwg_id = fw_plugin_db.get_fwg_attached_to_port(context, target_id) + if fwg_id: + port_ids = [target_id] + else: + port_ids = [] + # If 'resource_id' (fwg_id) is specified in a log_obj + elif resource_id: + port_ids = \ + fw_plugin_db.get_ports_in_firewall_group(context, resource_id) + # Both 'resource_id' and 'target_id' aren't specified in a log_resource + else: + tenant_id = log_obj['project_id'] + port_ids = fw_plugin_db.get_fwg_ports_in_tenant(context, tenant_id) + + filtered_port_ids = [] + for port_id in port_ids: + port = port_objects.Port.get_object(context, id=port_id) + device_owner = port.get('device_owner', '') + # TODO(longkb): L2 ports will be supported in the future + # Check whether a port is router port or not. + if device_owner in nl_const.ROUTER_INTERFACE_OWNERS: + # Check whether a port is attached to firewall group or not + fwg = fw_plugin_db.get_fwg_attached_to_port(context, port_id) + if fwg: + filtered_port_ids.append(port_id) + return filtered_port_ids + + +def _make_log_info_dict(log_obj, port_ids): + log_info = { + 'id': log_obj['id'], + 'ports_log': port_ids, + 'event': log_obj['event'], + 'project_id': log_obj['project_id'] + } + return log_info + + +def get_logs_for_port(context, port_id): + """Return a list of log_resources bound to a given port_id""" + + logs_bounded = [] + port = port_objects.Port.get_object(context, id=port_id) + + if not port: + return logs_bounded + # Ignore if a given port_id is not belong to router port + device_owner = port.get('device_owner', '') + if device_owner not in nl_const.ROUTER_INTERFACE_OWNERS: + return logs_bounded + + # Ignore if a given port does not attach to any fwg + fwg_id = fw_plugin_db.get_fwg_attached_to_port(context, port_id) + if not fwg_id: + return logs_bounded + + project_id = port['project_id'] + log_objs = log_object.Log.get_objects( + context, project_id=project_id, + resource_type=constants.FIREWALL_GROUP, enabled=True) + + for log_obj in log_objs: + if log_obj.resource_id == fwg_id: + logs_bounded.append(log_obj) + elif log_obj.target_id == port['id']: + logs_bounded.append(log_obj) + elif not log_obj.target_id and not log_obj.resource_id: + logs_bounded.append(log_obj) + return logs_bounded + + +def get_logs_for_fwg(context, fwg_id, ports_delta): + """Return a list of log_resources bound to a firewall group""" + + global fw_plugin_db + if not fw_plugin_db: + fw_plugin = directory.get_plugin(fwaas_constants.FIREWALL_V2) + + # NOTE(longkb): check whether fw plugin was loaded or not. + if not fw_plugin: + return [] + fw_plugin_db = fw_plugin.driver.firewall_db + + project_id = context.tenant_id + log_objs = log_object.Log.get_objects( + context, project_id=project_id, + resource_type=constants.FIREWALL_GROUP, enabled=True) + + log_resources = [] + for log_obj in log_objs: + if log_obj.resource_id == fwg_id: + log_resources.append(log_obj) + elif log_obj.target_id in ports_delta: + log_resources.append(log_obj) + elif not log_obj.resource_id and not log_obj.target_id: + log_resources.append(log_obj) + return log_resources + + +def get_fwg_log_info_for_port(context, port_ids): + """Return a list of firewall group log info for a given port + The list has format as below: + + [ + { + 'event': u'ALL', + 'id': '733e0499-e69e-4106-a84a-635fbc5fbbc0', + 'project_id': u'46f70361-ba71-4bd0-9769-3573fd227c4b', + 'ports_log': + [ + port1_id, + port2_id, + ] + }, + ] + :param context: current running context information + :param port_ids: list of ports which needed to get firewall group log info + + """ + + global fw_plugin_db + if not fw_plugin_db: + fw_plugin = directory.get_plugin(fwaas_constants.FIREWALL_V2) + + # NOTE(longkb): check whether fw plugin was loaded or not. + if not fw_plugin: + return [] + fw_plugin_db = fw_plugin.driver.firewall_db + + logs_info = [] + log_bounds = set() + for port_id in port_ids: + log_objs = get_logs_for_port(context, port_id) + if log_objs: + log_bounds |= set(log_objs) + if log_bounds: + for log_resource in log_bounds: + port_ids = _get_ports_being_logged(context, log_resource) + log_info = _make_log_info_dict(log_resource, port_ids) + logs_info.append(log_info) + return logs_info + + +def get_fwg_log_info_for_log_resources(context, log_resources): + """Return a list of firewall group log info for list of log_resources + The list has format as below: + + [ + { + 'event': u'ALL', + 'id': '733e0499-e69e-4106-a84a-635fbc5fbbc0', + 'project_id': u'46f70361-ba71-4bd0-9769-3573fd227c4b', + 'ports_log': + [ + port1_id, + port2_id, + ] + }, + ] + :param context: current running context information + :param log_resources: list of log_resources, which needed to get firewall + groups log info + + """ + + global fw_plugin_db + if not fw_plugin_db: + fw_plugin = directory.get_plugin(fwaas_constants.FIREWALL_V2) + + # NOTE(longkb): check whether fw plugin was loaded or not. + if not fw_plugin: + return [] + fw_plugin_db = fw_plugin.driver.firewall_db + + logs_info = [] + for log_resource in log_resources: + ports_id = _get_ports_being_logged(context, log_resource) + log_info = _make_log_info_dict(log_resource, ports_id) + logs_info.append(log_info) + + return logs_info diff --git a/neutron_fwaas/services/logapi/rpc/__init__.py b/neutron_fwaas/services/logapi/rpc/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/neutron_fwaas/services/logapi/rpc/log_server.py b/neutron_fwaas/services/logapi/rpc/log_server.py new file mode 100644 index 000000000..ca5e2e244 --- /dev/null +++ b/neutron_fwaas/services/logapi/rpc/log_server.py @@ -0,0 +1,30 @@ +# Copyright (C) 2018 Fujitsu Limited +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from neutron_fwaas.services.logapi.common import log_db_api + + +# Use this when register log driver with +# "register_rpc_methods" function +def get_fwg_log_info_for_port(context, port_id): + return log_db_api.get_fwg_log_info_for_port(context, port_id) + + +# Use this when register log driver with +# "register_rpc_methods" function +def get_fwg_log_info_for_log_resources(context, log_resources): + return log_db_api.get_fwg_log_info_for_log_resources( + context, + log_resources) diff --git a/neutron_fwaas/tests/unit/services/logapi/common/test_log_db_api.py b/neutron_fwaas/tests/unit/services/logapi/common/test_log_db_api.py new file mode 100644 index 000000000..3f42fcdda --- /dev/null +++ b/neutron_fwaas/tests/unit/services/logapi/common/test_log_db_api.py @@ -0,0 +1,299 @@ +# Copyright (c) 2018 Fujitsu Limited +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock +from neutron.objects.logapi import logging_resource as log_object +from neutron.objects import ports as port_objects +from neutron.services.logapi.rpc import server as server_rpc +from neutron.tests import base +from neutron_lib import constants as nl_const +from oslo_utils import uuidutils + +from neutron_fwaas.services.logapi.common import log_db_api +from neutron_fwaas.services.logapi.rpc import log_server as fwg_rpc + +FWG = 'firewall_group' + + +def _create_log_object(tenant_id, resource_id=None, + target_id=None, event='ALL'): + + log_data = { + 'id': uuidutils.generate_uuid(), + 'name': 'fake_log_name', + 'resource_type': FWG, + 'project_id': tenant_id, + 'event': event, + 'enabled': True} + if resource_id: + log_data['resource_id'] = resource_id + if target_id: + log_data['target_id'] = target_id + return log_object.Log(**log_data) + + +def _fake_log_info(id, project_id, ports_id, event='ALL'): + expected = { + 'id': id, + 'project_id': project_id, + 'ports_log': ports_id, + 'event': event + } + return expected + + +def _fake_port_object(port_id, device_owner, + project_id=uuidutils.generate_uuid()): + port_data = { + 'id': port_id, + 'device_owner': device_owner, + 'project_id': project_id + } + return port_data + + +class LoggingRpcCallbackTestCase(base.BaseTestCase): + + def setUp(self): + super(LoggingRpcCallbackTestCase, self).setUp() + self.context = mock.Mock() + self.rpc_callback = server_rpc.LoggingApiSkeleton() + + log_db_api.fw_plugin_db = mock.Mock() + + self.vm_port = uuidutils.generate_uuid() + self.router_port = uuidutils.generate_uuid() + self.fake_vm_port = \ + _fake_port_object(self.vm_port, + nl_const.DEVICE_OWNER_COMPUTE_PREFIX) + + self.fake_router_port = \ + _fake_port_object(self.router_port, + nl_const.DEVICE_OWNER_ROUTER_INTF) + self.fake_router_ports = \ + [_fake_port_object(self.router_port, device) + for device in nl_const.ROUTER_INTERFACE_OWNERS] + + def test_get_fwg_log_info_for_log_resources(self): + fwg_id = uuidutils.generate_uuid() + tenant_id = uuidutils.generate_uuid() + log_obj = _create_log_object(tenant_id, resource_id=fwg_id) + + rpc_call = fwg_rpc.get_fwg_log_info_for_log_resources + with mock.patch.object(server_rpc, 'get_rpc_method', + return_value=rpc_call): + fake_ports = ['fake_port_1', 'fake_port_2'] + with mock.patch.object(log_db_api, '_get_ports_being_logged', + return_value=fake_ports): + expected_log_info = [ + _fake_log_info(log_obj['id'], tenant_id, fake_ports) + ] + + logs_info = self.rpc_callback.\ + get_sg_log_info_for_log_resources(self.context, + resource_type=FWG, + log_resources=[log_obj]) + self.assertEqual(expected_log_info, logs_info) + + def test_get_fwg_log_info_for_port(self): + fwg_id = uuidutils.generate_uuid() + port_id = uuidutils.generate_uuid() + tenant_id = uuidutils.generate_uuid() + + log_obj = _create_log_object(tenant_id, resource_id=fwg_id, + target_id=port_id) + + rpc_call = fwg_rpc.get_fwg_log_info_for_port + with mock.patch.object(server_rpc, 'get_rpc_method', + return_value=rpc_call): + with mock.patch.object(log_db_api, 'get_logs_for_port', + return_value=[log_obj]): + fake_ports = [port_id, 'fake_port2'] + with mock.patch.object(log_db_api, '_get_ports_being_logged', + return_value=fake_ports): + expected_log_info = [_fake_log_info(log_obj['id'], + tenant_id, + fake_ports)] + logs_info = self.rpc_callback.\ + get_sg_log_info_for_port(self.context, + resource_type=FWG, + port_id=port_id) + self.assertEqual(expected_log_info, logs_info) + + def test_get_ports_being_logged_with_target_id(self): + tenant_id = uuidutils.generate_uuid() + fwg_id = uuidutils.generate_uuid() + + # Test with VM port + log_obj = _create_log_object(tenant_id, resource_id=fwg_id, + target_id=self.vm_port) + with mock.patch.object(port_objects.Port, 'get_object', + return_value=self.fake_vm_port): + logged_port_ids = \ + log_db_api._get_ports_being_logged(self.context, log_obj) + self.assertEqual([], logged_port_ids) + + # Test with router ports + log_obj = _create_log_object(tenant_id, resource_id=fwg_id, + target_id=self.router_port) + + log_db_api.fw_plugin_db. \ + get_fwg_attached_to_port = mock.Mock(return_value='fwg_id') + with mock.patch.object(port_objects.Port, 'get_object', + side_effect=self.fake_router_ports): + + for port in self.fake_router_ports: + logged_port_ids = \ + log_db_api._get_ports_being_logged(self.context, log_obj) + self.assertEqual([self.router_port], logged_port_ids) + + def test_get_ports_being_logged_with_resource_id(self): + tenant_id = uuidutils.generate_uuid() + fwg_id = uuidutils.generate_uuid() + log_obj = _create_log_object(tenant_id, resource_id=fwg_id) + + log_db_api.fw_plugin_db.get_ports_in_firewall_group = \ + mock.Mock(return_value=[self.vm_port]) + # Test with VM port + with mock.patch.object(port_objects.Port, 'get_object', + return_value=self.fake_vm_port): + logged_port_ids = \ + log_db_api._get_ports_being_logged(self.context, log_obj) + self.assertEqual([], logged_port_ids) + + # Test with router ports + router_ports = [self.router_port, self.router_port, self.router_port] + log_db_api.fw_plugin_db. \ + get_ports_in_firewall_group = mock.Mock(return_value=router_ports) + log_db_api.fw_plugin_db. \ + get_fwg_attached_to_port = mock.Mock(return_value='fwg_id') + + with mock.patch.object(port_objects.Port, 'get_object', + side_effect=self.fake_router_ports): + logged_port_ids = \ + log_db_api._get_ports_being_logged(self.context, log_obj) + self.assertEqual(router_ports, logged_port_ids) + + # Test with both vm port and router ports + log_db_api.fw_plugin_db.get_ports_in_firewall_group = \ + mock.Mock(return_value=[self.vm_port, self.router_port]) + log_db_api.fw_plugin_db. \ + get_fwg_attached_to_port = mock.Mock(return_value='fwg_id') + + with mock.patch.object(port_objects.Port, 'get_object', + side_effect=[self.fake_vm_port, + self.fake_router_port]): + logged_port_ids = \ + log_db_api._get_ports_being_logged(self.context, log_obj) + self.assertEqual([self.router_port], logged_port_ids) + + def test_get_ports_being_logged_with_ports_in_tenant(self): + tenant_id = uuidutils.generate_uuid() + log_obj = _create_log_object(tenant_id) + + log_db_api.fw_plugin_db.get_fwg_ports_in_tenant = \ + mock.Mock(return_value=[self.router_port]) + log_db_api.fw_plugin_db. \ + get_fwg_attached_to_port = mock.Mock(return_value='fwg_id') + + with mock.patch.object(port_objects.Port, 'get_object', + return_value=self.fake_router_port): + log_db_api._get_ports_being_logged(self.context, log_obj) + log_db_api.fw_plugin_db.get_fwg_ports_in_tenant.\ + assert_called_with(self.context, tenant_id) + + def test_logs_for_port_with_vm_port(self): + with mock.patch.object(port_objects.Port, 'get_object', + return_value=self.fake_vm_port): + logs = log_db_api.get_logs_for_port(self.context, self.vm_port) + self.assertEqual([], logs) + + def test_logs_for_port_with_router_port(self): + tenant_id = uuidutils.generate_uuid() + resource_id = uuidutils.generate_uuid() + target_id = uuidutils.generate_uuid() + log_db_api.fw_plugin_db.get_fwg_attached_to_port = \ + mock.Mock(side_effect=[[], resource_id, resource_id]) + with mock.patch.object(port_objects.Port, 'get_object', + return_value=self.fake_router_port): + + # Test with router port that did not attach to fwg + logs = log_db_api.get_logs_for_port(self.context, self.router_port) + self.assertEqual([], logs) + + # Test with router port that attached to fwg + # Fake log objects that bounds a given port + log = _create_log_object(tenant_id) + resource_log = _create_log_object(tenant_id, resource_id) + target_log = _create_log_object(tenant_id, resource_id, target_id) + log_objs = [log, target_log, resource_log] + + with mock.patch.object(log_object.Log, 'get_objects', + return_value=log_objs): + self.fake_router_port = mock.Mock(return_value=target_id) + logs = log_db_api.get_logs_for_port(self.context, + self.router_port) + self.assertEqual(log_objs, logs) + + # Fake log objects that does not bound a given port + unbound_resource = uuidutils.generate_uuid() + resource_log = _create_log_object(tenant_id, unbound_resource) + target_log = _create_log_object(tenant_id, unbound_resource, + target_id) + log_objs = [log, target_log, resource_log] + + with mock.patch.object(log_object.Log, 'get_objects', + return_value=log_objs): + self.fake_router_port = mock.Mock(return_value=target_id) + logs = log_db_api.get_logs_for_port(self.context, + self.router_port) + self.assertEqual([log], logs) + + def test_logs_for_fwg(self): + tenant_id = uuidutils.generate_uuid() + resource_id = uuidutils.generate_uuid() + target_id = uuidutils.generate_uuid() + + # Fake log objects that bounds a given fwg + log = _create_log_object(tenant_id) + resource_log = _create_log_object(tenant_id, resource_id) + target_log = _create_log_object(tenant_id, target_id=target_id) + ports_delta = [target_id] + + # Test with port that in ports_delta + log_db_api.fw_plugin_db.get_fwg_attached_to_port = \ + mock.Mock(return_value=None) + with mock.patch.object(log_object.Log, 'get_objects', + return_value=[target_log]): + logs = log_db_api.get_logs_for_fwg(self.context, + resource_id, + ports_delta) + self.assertEqual([target_log], logs) + + # Test with log that bound to a give fwg + with mock.patch.object(log_object.Log, 'get_objects', + return_value=[resource_log]): + logs = log_db_api.get_logs_for_fwg(self.context, + resource_id, + ports_delta) + self.assertEqual([resource_log], logs) + + # Test with log that does not bound to any fwg or port + with mock.patch.object(log_object.Log, 'get_objects', + return_value=[log]): + logs = log_db_api.get_logs_for_fwg(self.context, + resource_id, + ports_delta) + self.assertEqual([log], logs) diff --git a/neutron_fwaas/tests/unit/services/logapi/rpc/__init__.py b/neutron_fwaas/tests/unit/services/logapi/rpc/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/neutron_fwaas/tests/unit/services/logapi/rpc/test_log_server.py b/neutron_fwaas/tests/unit/services/logapi/rpc/test_log_server.py new file mode 100644 index 000000000..092a148c6 --- /dev/null +++ b/neutron_fwaas/tests/unit/services/logapi/rpc/test_log_server.py @@ -0,0 +1,56 @@ +# Copyright (c) 2018 Fujitsu Limited +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import mock + +from neutron.services.logapi.rpc import server as server_rpc +from neutron.tests import base + +from neutron_fwaas.services.logapi.rpc import log_server as fw_server_rpc + + +class FWGLoggingApiSkeletonTestCase(base.BaseTestCase): + @mock.patch("neutron_fwaas.services.logapi.common.log_db_api." + "get_fwg_log_info_for_port") + def test_get_fwg_log_info_for_port(self, mock_callback): + with mock.patch.object( + server_rpc, + 'get_rpc_method', + return_value=fw_server_rpc.get_fwg_log_info_for_port + ): + test_obj = server_rpc.LoggingApiSkeleton() + m_context = mock.Mock() + port_id = '123' + test_obj.get_sg_log_info_for_port(m_context, + resource_type='firewall_v2', + port_id=port_id) + mock_callback.assert_called_with(m_context, port_id) + + @mock.patch("neutron_fwaas.services.logapi.common.log_db_api." + "get_fwg_log_info_for_log_resources") + def test_get_fwg_log_info_for_log_resources(self, mock_callback): + with mock.patch.object( + server_rpc, + 'get_rpc_method', + return_value=fw_server_rpc.get_fwg_log_info_for_log_resources + ): + test_obj = server_rpc.LoggingApiSkeleton() + m_context = mock.Mock() + log_resources = [mock.Mock()] + test_obj.get_sg_log_info_for_log_resources( + m_context, + resource_type='firewall_v2', + log_resources=log_resources) + mock_callback.assert_called_with(m_context, log_resources)