diff --git a/neutron/services/logapi/common/constants.py b/neutron/services/logapi/common/constants.py index bb50817b245..dc72c5dfcd1 100644 --- a/neutron/services/logapi/common/constants.py +++ b/neutron/services/logapi/common/constants.py @@ -24,6 +24,9 @@ SECURITY_GROUP = 'security_group' RPC_NAMESPACE_LOGGING = 'logging-plugin' +# Define for rpc_method_key +LOG_RESOURCE = 'log_resource' + # String literal for identifying log resource LOGGING = 'log' diff --git a/neutron/services/logapi/drivers/base.py b/neutron/services/logapi/drivers/base.py index d7e0b2cc3ab..3ddc297bce7 100644 --- a/neutron/services/logapi/drivers/base.py +++ b/neutron/services/logapi/drivers/base.py @@ -18,6 +18,7 @@ from neutron_lib.callbacks import registry from oslo_log import log as logging from neutron.services.logapi.common import constants as log_const +from neutron.services.logapi.rpc import server as server_rpc LOG = logging.getLogger(__name__) @@ -53,6 +54,9 @@ class DriverBase(object): # trigger is the LoggingServiceDriverManager trigger.register_driver(self) + def register_rpc_methods(self, resource_type, rpc_methods): + server_rpc.register_rpc_methods(resource_type, rpc_methods) + def is_loaded(self): """True if the driver is active for the Neutron Server. diff --git a/neutron/services/logapi/drivers/openvswitch/driver.py b/neutron/services/logapi/drivers/openvswitch/driver.py index 4855497a60f..ab7b2be9a2f 100644 --- a/neutron/services/logapi/drivers/openvswitch/driver.py +++ b/neutron/services/logapi/drivers/openvswitch/driver.py @@ -14,9 +14,12 @@ # under the License. from neutron_lib.api.definitions import portbindings +from neutron_lib.callbacks import resources from oslo_log import log as logging +from neutron.services.logapi.common import constants as log_const from neutron.services.logapi.drivers import base +from neutron.services.logapi.rpc import server as server_rpc LOG = logging.getLogger(__name__) @@ -43,4 +46,13 @@ def register(): global DRIVER if not DRIVER: DRIVER = OVSDriver.create() + + # Register RPC methods + if DRIVER.requires_rpc: + rpc_methods = [ + {resources.PORT: server_rpc.get_sg_log_info_for_port}, + {log_const.LOG_RESOURCE: + server_rpc.get_sg_log_info_for_log_resources} + ] + DRIVER.register_rpc_methods(log_const.SECURITY_GROUP, rpc_methods) LOG.debug('Open vSwitch logging driver registered') diff --git a/neutron/services/logapi/drivers/openvswitch/ovs_firewall_log.py b/neutron/services/logapi/drivers/openvswitch/ovs_firewall_log.py index aa2bef33f9f..b5f41ead905 100644 --- a/neutron/services/logapi/drivers/openvswitch/ovs_firewall_log.py +++ b/neutron/services/logapi/drivers/openvswitch/ovs_firewall_log.py @@ -274,10 +274,14 @@ class OVSFirewallLoggingDriver(log_ext.LoggingDriver): # try to clean port flows log for port updated/create event self._cleanup_port_flows_log(port_id) logs_info = self.resource_rpc.get_sg_log_info_for_port( - context, port_id=port_id) + context, + resource_type=log_const.SECURITY_GROUP, + port_id=port_id) elif log_resources: logs_info = self.resource_rpc.get_sg_log_info_for_log_resources( - context, log_resources=log_resources) + context, + resource_type=log_const.SECURITY_GROUP, + log_resources=log_resources) for log_info in logs_info: log_id = log_info['id'] diff --git a/neutron/services/logapi/rpc/agent.py b/neutron/services/logapi/rpc/agent.py index 5a4b8f6beb4..55ef76d911d 100644 --- a/neutron/services/logapi/rpc/agent.py +++ b/neutron/services/logapi/rpc/agent.py @@ -32,14 +32,18 @@ class LoggingApiStub(object): self.rpc_client = n_rpc.get_client(target) @log_helpers.log_method_call - def get_sg_log_info_for_port(self, context, port_id): + def get_sg_log_info_for_port(self, context, resource_type, port_id): """Return list of sg_log info for a port""" cctxt = self.rpc_client.prepare() - return cctxt.call(context, 'get_sg_log_info_for_port', port_id=port_id) + return cctxt.call(context, 'get_sg_log_info_for_port', + resource_type=resource_type, + port_id=port_id) @log_helpers.log_method_call - def get_sg_log_info_for_log_resources(self, context, log_resources): + def get_sg_log_info_for_log_resources(self, context, + resource_type, log_resources): """Return list of sg_log info for list of log_resources""" cctxt = self.rpc_client.prepare() return cctxt.call(context, 'get_sg_log_info_for_log_resources', + resource_type=resource_type, log_resources=log_resources) diff --git a/neutron/services/logapi/rpc/server.py b/neutron/services/logapi/rpc/server.py index c0805ab7e47..520a9b20173 100644 --- a/neutron/services/logapi/rpc/server.py +++ b/neutron/services/logapi/rpc/server.py @@ -13,7 +13,9 @@ # License for the specific language governing permissions and limitations # under the License. +from neutron_lib.callbacks import resources as r_const from oslo_log import helpers as log_helpers +from oslo_log import log as logging import oslo_messaging from neutron.api.rpc.callbacks import events @@ -22,15 +24,57 @@ from neutron.common import rpc as n_rpc from neutron.services.logapi.common import constants as log_const from neutron.services.logapi.common import db_api +LOG = logging.getLogger(__name__) + +# RPC methods mapping +RPC_RESOURCES_METHOD_MAP = {} + + +# This function must be called when a log_driver is registered. +def register_rpc_methods(resource_type, rpc_methods): + """Register RPC methods. + + :param resource_type: string and must be a valid resource type. + :param rpc_methods: list of RPC methods to be registered. + This param would look like: + [ + {'PORT': get_sg_log_info_for_port}, + {'LOG_RESOURCE': get_sg_log_info_for_log_resources} + ] + """ + if resource_type not in RPC_RESOURCES_METHOD_MAP: + RPC_RESOURCES_METHOD_MAP[resource_type] = rpc_methods + + +def get_rpc_method(resource_type, rpc_method_key): + if resource_type not in RPC_RESOURCES_METHOD_MAP: + raise NotImplementedError() + + for rpc_method in RPC_RESOURCES_METHOD_MAP[resource_type]: + if rpc_method_key in rpc_method.keys(): + return list(rpc_method.values())[0] + + raise NotImplementedError() + + +def get_sg_log_info_for_port(context, port_id): + return db_api.get_sg_log_info_for_port(context, port_id) + + +def get_sg_log_info_for_log_resources(context, log_resources): + return db_api.get_sg_log_info_for_log_resources(context, log_resources) + class LoggingApiSkeleton(object): """Skeleton proxy code for agent->server communication.""" # History # 1.0 Initial version + # 1.1 Introduce resource_type as a keyword in order to extend + # support for other resources target = oslo_messaging.Target( - version='1.0', namespace=log_const.RPC_NAMESPACE_LOGGING) + version='1.1', namespace=log_const.RPC_NAMESPACE_LOGGING) def __init__(self): self.conn = n_rpc.Connection() @@ -38,12 +82,21 @@ class LoggingApiSkeleton(object): fanout=False) @log_helpers.log_method_call - def get_sg_log_info_for_port(self, context, port_id): - return db_api.get_sg_log_info_for_port(context, port_id) + def get_sg_log_info_for_port(self, context, port_id, **kwargs): + resource_type = kwargs.get('resource_type', log_const.SECURITY_GROUP) + LOG.debug("Logging agent requests log info " + "for port with resource type %s", resource_type) + rpc_method = get_rpc_method(resource_type, r_const.PORT) + return rpc_method(context, port_id) @log_helpers.log_method_call - def get_sg_log_info_for_log_resources(self, context, log_resources): - return db_api.get_sg_log_info_for_log_resources(context, log_resources) + def get_sg_log_info_for_log_resources(self, context, + log_resources, **kwargs): + resource_type = kwargs.get('resource_type', log_const.SECURITY_GROUP) + LOG.debug("Logging agent requests log info " + "for log resources with resource type %s", resource_type) + rpc_method = get_rpc_method(resource_type, log_const.LOG_RESOURCE) + return rpc_method(context, log_resources) class LoggingApiNotification(object): diff --git a/neutron/tests/unit/services/logapi/common/test_db_api.py b/neutron/tests/unit/services/logapi/common/test_db_api.py index 06cd2f7d13f..7b1bf7255e1 100644 --- a/neutron/tests/unit/services/logapi/common/test_db_api.py +++ b/neutron/tests/unit/services/logapi/common/test_db_api.py @@ -20,6 +20,7 @@ from oslo_utils import uuidutils from neutron.common import utils from neutron.objects.logapi import logging_resource as log_object +from neutron.services.logapi.common import constants as log_const from neutron.services.logapi.common import db_api from neutron.services.logapi.common import validators from neutron.services.logapi.rpc import server as server_rpc @@ -162,41 +163,49 @@ class LoggingRpcCallbackTestCase(test_sg.SecurityGroupDBTestCase): ports_rest = self.deserialize(self.fmt, res) port_id = ports_rest['port']['id'] log = _create_log(resource_id=sg_id, tenant_id=tenant_id) - with mock.patch.object(validators, 'validate_log_type_for_port', - return_value=True): - ports_log = ( - self.rpc_callback.get_sg_log_info_for_log_resources( - self.context, log_resources=[log]) - ) - expected = [{ - 'event': log.event, - 'id': log.id, - 'ports_log': [{ - 'port_id': port_id, - 'security_group_rules': [ - {'direction': 'egress', - 'ethertype': u'IPv4', - 'security_group_id': sg_id}, - {'direction': 'egress', - 'ethertype': u'IPv6', - 'security_group_id': sg_id}, - {'direction': 'ingress', - 'ethertype': u'IPv4', - 'port_range_max': 22, - 'port_range_min': 22, - 'protocol': u'tcp', - 'security_group_id': sg_id}, - {'direction': 'egress', - 'ethertype': u'IPv4', - 'protocol': u'tcp', - 'dest_ip_prefix': - utils.AuthenticIPNetwork('10.0.0.1/32'), - 'security_group_id': sg_id}] - }], - 'project_id': tenant_id - }] - self.assertEqual(expected, ports_log) - self._delete('ports', port_id) + with mock.patch.object( + server_rpc, + 'get_rpc_method', + return_value=server_rpc.get_sg_log_info_for_log_resources + ): + with mock.patch.object(validators, + 'validate_log_type_for_port', + return_value=True): + ports_log = ( + self.rpc_callback.get_sg_log_info_for_log_resources( + self.context, + resource_type=log_const.SECURITY_GROUP, + log_resources=[log]) + ) + expected = [{ + 'event': log.event, + 'id': log.id, + 'ports_log': [{ + 'port_id': port_id, + 'security_group_rules': [ + {'direction': 'egress', + 'ethertype': u'IPv4', + 'security_group_id': sg_id}, + {'direction': 'egress', + 'ethertype': u'IPv6', + 'security_group_id': sg_id}, + {'direction': 'ingress', + 'ethertype': u'IPv4', + 'port_range_max': 22, + 'port_range_min': 22, + 'protocol': u'tcp', + 'security_group_id': sg_id}, + {'direction': 'egress', + 'ethertype': u'IPv4', + 'protocol': u'tcp', + 'dest_ip_prefix': + utils.AuthenticIPNetwork('10.0.0.1/32'), + 'security_group_id': sg_id}] + }], + 'project_id': tenant_id + }] + self.assertEqual(expected, ports_log) + self._delete('ports', port_id) def test_get_sg_log_info_for_port_added_event(self): with self.network() as network, \ @@ -228,39 +237,48 @@ class LoggingRpcCallbackTestCase(test_sg.SecurityGroupDBTestCase): with mock.patch.object( log_object.Log, 'get_objects', return_value=[log]): with mock.patch.object( - validators, 'validate_log_type_for_port', - return_value=True): - ports_log = ( - self.rpc_callback.get_sg_log_info_for_port( - self.context, port_id=port_id) - ) - expected = [{ - 'event': log.event, - 'id': log.id, - 'ports_log': [{ - 'port_id': port_id, - 'security_group_rules': [ - {'direction': 'egress', - 'ethertype': u'IPv4', - 'security_group_id': sg_id}, - {'direction': 'egress', - 'ethertype': u'IPv6', - 'security_group_id': sg_id}, - {'direction': 'ingress', - 'ethertype': u'IPv4', - 'port_range_max': 13, - 'port_range_min': 11, - 'protocol': u'tcp', - 'source_ip_prefix': - utils.AuthenticIPNetwork('10.0.0.1/32'), - 'security_group_id': sg_id}, - {'direction': 'egress', - 'ethertype': u'IPv4', - 'protocol': u'icmp', - 'security_group_id': sg_id}] - }], - 'project_id': tenant_id - }] + server_rpc, + 'get_rpc_method', + return_value=server_rpc.get_sg_log_info_for_port + ): + with mock.patch.object( + validators, + 'validate_log_type_for_port', + return_value=True): + ports_log = ( + self.rpc_callback.get_sg_log_info_for_port( + self.context, + resource_type=log_const.SECURITY_GROUP, + port_id=port_id) + ) + expected = [{ + 'event': log.event, + 'id': log.id, + 'ports_log': [{ + 'port_id': port_id, + 'security_group_rules': [ + {'direction': 'egress', + 'ethertype': u'IPv4', + 'security_group_id': sg_id}, + {'direction': 'egress', + 'ethertype': u'IPv6', + 'security_group_id': sg_id}, + {'direction': 'ingress', + 'ethertype': u'IPv4', + 'port_range_max': 13, + 'port_range_min': 11, + 'protocol': u'tcp', + 'source_ip_prefix': + utils.AuthenticIPNetwork( + '10.0.0.1/32'), + 'security_group_id': sg_id}, + {'direction': 'egress', + 'ethertype': u'IPv4', + 'protocol': u'icmp', + 'security_group_id': sg_id}] + }], + 'project_id': tenant_id + }] - self.assertEqual(expected, ports_log) - self._delete('ports', port_id) + self.assertEqual(expected, ports_log) + self._delete('ports', port_id) diff --git a/neutron/tests/unit/services/logapi/rpc/test_server.py b/neutron/tests/unit/services/logapi/rpc/test_server.py index 4d71a5d7115..1f7701df823 100644 --- a/neutron/tests/unit/services/logapi/rpc/test_server.py +++ b/neutron/tests/unit/services/logapi/rpc/test_server.py @@ -62,6 +62,26 @@ class LoggingApiNotificationTestCase(base.BaseTestCase): events.DELETED) +class TestRegisterValidateRPCMethods(base.BaseTestCase): + + def test_register_rpc_methods_method(self): + resource_type = 'security_group' + method = [{'fake_key1': 'fake_method1'}, + {'fake_key2': 'fake_method2'}] + expected = {resource_type: method} + server_rpc.RPC_RESOURCES_METHOD_MAP.clear() + server_rpc.register_rpc_methods(resource_type, method) + self.assertEqual(expected, server_rpc.RPC_RESOURCES_METHOD_MAP) + + def test_get_rpc_method(self): + resource_type = 'security_group' + method = [{'fake_key1': 'fake_method1'}, + {'fake_key2': 'fake_method2'}] + server_rpc.RPC_RESOURCES_METHOD_MAP = {resource_type: method} + actual = server_rpc.get_rpc_method('security_group', 'fake_key1') + self.assertEqual('fake_method1', actual) + + class LoggingApiSkeletonTestCase(base.BaseTestCase): @mock.patch("neutron.common.rpc.get_server") @@ -76,18 +96,33 @@ class LoggingApiSkeletonTestCase(base.BaseTestCase): @mock.patch("neutron.services.logapi.common.db_api." "get_sg_log_info_for_port") def test_get_sg_log_info_for_port(self, mock_callback): - test_obj = server_rpc.LoggingApiSkeleton() - m_context = mock.Mock() - port_id = '123' - test_obj.get_sg_log_info_for_port(m_context, port_id=port_id) - mock_callback.assert_called_with(m_context, port_id) + with mock.patch.object( + server_rpc, + 'get_rpc_method', + return_value=server_rpc.get_sg_log_info_for_port + ): + test_obj = server_rpc.LoggingApiSkeleton() + m_context = mock.Mock() + port_id = '123' + test_obj.get_sg_log_info_for_port( + m_context, + resource_type=log_const.SECURITY_GROUP, + port_id=port_id) + mock_callback.assert_called_with(m_context, port_id) @mock.patch("neutron.services.logapi.common.db_api." "get_sg_log_info_for_log_resources") def test_get_sg_log_info_for_log_resources(self, mock_callback): - test_obj = server_rpc.LoggingApiSkeleton() - m_context = mock.Mock() - log_resources = [mock.Mock()] - test_obj.get_sg_log_info_for_log_resources(m_context, - log_resources=log_resources) - mock_callback.assert_called_with(m_context, log_resources) + with mock.patch.object( + server_rpc, + 'get_rpc_method', + return_value=server_rpc.get_sg_log_info_for_log_resources + ): + test_obj = server_rpc.LoggingApiSkeleton() + m_context = mock.Mock() + log_resources = [mock.Mock()] + test_obj.get_sg_log_info_for_log_resources( + m_context, + resource_type=log_const.SECURITY_GROUP, + log_resources=log_resources) + mock_callback.assert_called_with(m_context, log_resources)