From b2825740488e7afc60479726d305b4fe8ea85361 Mon Sep 17 00:00:00 2001 From: Ihar Hrachyshka Date: Mon, 9 Jun 2014 15:10:04 +0200 Subject: [PATCH] Removed create_rpc_dispatcher methods Now that we don't have a special dispatcher class and we pass a list of endpoints to corresponding functions instead, those methods are unneeded. blueprint oslo-messaging Change-Id: If2b187fd8e553594212264f34b51b5b99c4630b2 --- neutron/common/rpc_compat.py | 8 ++++---- neutron/db/metering/metering_rpc.py | 3 --- .../plugins/bigswitch/agent/restproxy_agent.py | 4 ++-- neutron/plugins/bigswitch/plugin.py | 9 +++------ neutron/plugins/brocade/NeutronPlugin.py | 14 +++----------- .../plugins/cisco/n1kv/n1kv_neutron_plugin.py | 12 ++---------- .../plugins/hyperv/agent/hyperv_neutron_agent.py | 14 ++++---------- neutron/plugins/hyperv/hyperv_neutron_plugin.py | 6 +++--- neutron/plugins/hyperv/rpc_callbacks.py | 9 --------- neutron/plugins/ibm/agent/sdnve_neutron_agent.py | 7 ++----- neutron/plugins/ibm/sdnve_neutron_plugin.py | 13 +++---------- .../agent/linuxbridge_neutron_agent.py | 14 ++------------ neutron/plugins/linuxbridge/lb_neutron_plugin.py | 14 +++----------- neutron/plugins/midonet/plugin.py | 16 +++------------- neutron/plugins/ml2/plugin.py | 7 ++++--- neutron/plugins/ml2/rpc.py | 9 --------- .../plugins/mlnx/agent/eswitch_neutron_agent.py | 15 ++------------- neutron/plugins/mlnx/mlnx_plugin.py | 7 ++++--- neutron/plugins/mlnx/rpc_callbacks.py | 10 ---------- neutron/plugins/nec/agent/nec_neutron_agent.py | 4 ++-- neutron/plugins/nec/nec_plugin.py | 12 ++---------- .../plugins/ofagent/agent/ofa_neutron_agent.py | 12 ++---------- .../oneconvergence/agent/nvsd_neutron_agent.py | 4 ++-- neutron/plugins/oneconvergence/plugin.py | 10 +++------- .../openvswitch/agent/ovs_neutron_agent.py | 12 ++---------- .../plugins/openvswitch/ovs_neutron_plugin.py | 14 +++----------- neutron/plugins/ryu/agent/ryu_neutron_agent.py | 7 ++----- neutron/plugins/ryu/ryu_neutron_plugin.py | 8 ++------ neutron/plugins/vmware/dhcp_meta/rpc.py | 9 --------- neutron/plugins/vmware/dhcpmeta_modes.py | 6 ++++-- neutron/services/firewall/fwaas_plugin.py | 9 ++------- neutron/services/l3_router/l3_router_plugin.py | 13 ++----------- .../drivers/common/agent_driver_base.py | 10 +++++----- neutron/services/metering/metering_plugin.py | 6 ++---- .../services/vpn/device_drivers/cisco_ipsec.py | 9 ++------- neutron/services/vpn/device_drivers/ipsec.py | 9 ++------- .../services/vpn/service_drivers/cisco_ipsec.py | 9 ++------- neutron/services/vpn/service_drivers/ipsec.py | 9 ++------- neutron/tests/unit/bigswitch/test_base.py | 6 ------ .../tests/unit/bigswitch/test_security_groups.py | 2 +- neutron/tests/unit/ml2/test_port_binding.py | 2 +- neutron/tests/unit/ml2/test_security_group.py | 5 +++-- .../unit/oneconvergence/test_security_group.py | 5 +++-- .../unit/openvswitch/test_ovs_security_group.py | 5 +++-- .../tests/unit/ryu/test_ryu_security_group.py | 4 ++-- .../unit/services/firewall/test_fwaas_plugin.py | 6 +++--- neutron/tests/unit/test_agent_rpc.py | 14 +++++++------- 47 files changed, 110 insertions(+), 302 deletions(-) diff --git a/neutron/common/rpc_compat.py b/neutron/common/rpc_compat.py index 697c72a5a88..8c16c2c5bfa 100644 --- a/neutron/common/rpc_compat.py +++ b/neutron/common/rpc_compat.py @@ -108,15 +108,15 @@ class Service(service.Service): LOG.debug("Creating Consumer connection for Service %s" % self.topic) - dispatcher = [self.manager] + endpoints = [self.manager] # Share this same connection for these Consumers - self.conn.create_consumer(self.topic, dispatcher, fanout=False) + self.conn.create_consumer(self.topic, endpoints, fanout=False) node_topic = '%s.%s' % (self.topic, self.host) - self.conn.create_consumer(node_topic, dispatcher, fanout=False) + self.conn.create_consumer(node_topic, endpoints, fanout=False) - self.conn.create_consumer(self.topic, dispatcher, fanout=True) + self.conn.create_consumer(self.topic, endpoints, fanout=True) # Hook to allow the manager to do other initializations after # the rpc connection is created. diff --git a/neutron/db/metering/metering_rpc.py b/neutron/db/metering/metering_rpc.py index b55a0cf4c7e..c0bbd51ad30 100644 --- a/neutron/db/metering/metering_rpc.py +++ b/neutron/db/metering/metering_rpc.py @@ -30,9 +30,6 @@ class MeteringRpcCallbacks(object): def __init__(self, meter_plugin): self.meter_plugin = meter_plugin - def create_rpc_dispatcher(self): - return [self] - def get_sync_data_metering(self, context, **kwargs): l3_plugin = manager.NeutronManager.get_service_plugins().get( service_constants.L3_ROUTER_NAT) diff --git a/neutron/plugins/bigswitch/agent/restproxy_agent.py b/neutron/plugins/bigswitch/agent/restproxy_agent.py index 6cdf5913b07..97aa7d0e3f4 100644 --- a/neutron/plugins/bigswitch/agent/restproxy_agent.py +++ b/neutron/plugins/bigswitch/agent/restproxy_agent.py @@ -105,10 +105,10 @@ class RestProxyAgent(rpc_compat.RpcCallback, self.topic = topics.AGENT self.plugin_rpc = PluginApi(topics.PLUGIN) self.context = q_context.get_admin_context_without_session() - self.dispatcher = [self] + self.endpoints = [self] consumers = [[topics.PORT, topics.UPDATE], [topics.SECURITY_GROUP, topics.UPDATE]] - self.connection = agent_rpc.create_consumers(self.dispatcher, + self.connection = agent_rpc.create_consumers(self.endpoints, self.topic, consumers) diff --git a/neutron/plugins/bigswitch/plugin.py b/neutron/plugins/bigswitch/plugin.py index bdac0cf1a5f..c13c45b656a 100644 --- a/neutron/plugins/bigswitch/plugin.py +++ b/neutron/plugins/bigswitch/plugin.py @@ -119,9 +119,6 @@ class RestProxyCallbacks(rpc_compat.RpcCallback, RPC_API_VERSION = '1.1' - def create_rpc_dispatcher(self): - return [self, agents_db.AgentExtRpcCallback()] - def get_port_from_device(self, device): port_id = re.sub(r"^tap", "", device) port = self.get_port_and_sgs(port_id) @@ -505,9 +502,9 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base, self.agent_notifiers[const.AGENT_TYPE_DHCP] = ( self._dhcp_agent_notifier ) - self.callbacks = RestProxyCallbacks() - self.dispatcher = self.callbacks.create_rpc_dispatcher() - self.conn.create_consumer(self.topic, self.dispatcher, + self.endpoints = [RestProxyCallbacks(), + agents_db.AgentExtRpcCallback()] + self.conn.create_consumer(self.topic, self.endpoints, fanout=False) # Consume from all consumers in threads self.conn.consume_in_threads() diff --git a/neutron/plugins/brocade/NeutronPlugin.py b/neutron/plugins/brocade/NeutronPlugin.py index 5e0ee782708..c633085d074 100644 --- a/neutron/plugins/brocade/NeutronPlugin.py +++ b/neutron/plugins/brocade/NeutronPlugin.py @@ -91,14 +91,6 @@ class BridgeRpcCallbacks(rpc_compat.RpcCallback, # 1.1 Support Security Group RPC TAP_PREFIX_LEN = 3 - def create_rpc_dispatcher(self): - """Get the rpc dispatcher for this manager. - - If a manager would like to set an rpc API version, or support more than - one class as the target of rpc messages, override this method. - """ - return [self, agents_db.AgentExtRpcCallback()] - @classmethod def get_port_from_device(cls, device): """Get port from the brocade specific db.""" @@ -262,10 +254,10 @@ class BrocadePluginV2(db_base_plugin_v2.NeutronDbPluginV2, self.rpc_context = context.RequestContext('neutron', 'neutron', is_admin=False) self.conn = rpc_compat.create_connection(new=True) - self.callbacks = BridgeRpcCallbacks() - self.dispatcher = self.callbacks.create_rpc_dispatcher() + self.endpoints = [BridgeRpcCallbacks(), + agents_db.AgentExtRpcCallback()] for svc_topic in self.service_topics.values(): - self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False) + self.conn.create_consumer(svc_topic, self.endpoints, fanout=False) # Consume from all consumers in threads self.conn.consume_in_threads() self.notifier = AgentNotifierApi(topics.AGENT) diff --git a/neutron/plugins/cisco/n1kv/n1kv_neutron_plugin.py b/neutron/plugins/cisco/n1kv/n1kv_neutron_plugin.py index 83defcf7cde..a31e570f0f0 100644 --- a/neutron/plugins/cisco/n1kv/n1kv_neutron_plugin.py +++ b/neutron/plugins/cisco/n1kv/n1kv_neutron_plugin.py @@ -68,14 +68,6 @@ class N1kvRpcCallbacks(rpc_compat.RpcCallback, # Set RPC API version to 1.1 by default. RPC_API_VERSION = '1.1' - def create_rpc_dispatcher(self): - """Get the rpc dispatcher for this rpc manager. - - If a manager would like to set an rpc API version, or support more than - one class as the target of rpc messages, override this method. - """ - return [self, agents_db.AgentExtRpcCallback()] - class N1kvNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2, external_net_db.External_net_db_mixin, @@ -135,9 +127,9 @@ class N1kvNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2, self.service_topics = {svc_constants.CORE: topics.PLUGIN, svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN} self.conn = rpc_compat.create_connection(new=True) - self.dispatcher = N1kvRpcCallbacks().create_rpc_dispatcher() + self.endpoints = [N1kvRpcCallbacks(), agents_db.AgentExtRpcCallback()] for svc_topic in self.service_topics.values(): - self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False) + self.conn.create_consumer(svc_topic, self.endpoints, fanout=False) self.dhcp_agent_notifier = dhcp_rpc_agent_api.DhcpAgentNotifyAPI() self.l3_agent_notifier = l3_rpc_agent_api.L3AgentNotifyAPI() # Consume from all consumers in threads diff --git a/neutron/plugins/hyperv/agent/hyperv_neutron_agent.py b/neutron/plugins/hyperv/agent/hyperv_neutron_agent.py index f76f751f843..07a5ed7764a 100644 --- a/neutron/plugins/hyperv/agent/hyperv_neutron_agent.py +++ b/neutron/plugins/hyperv/agent/hyperv_neutron_agent.py @@ -97,16 +97,13 @@ class HyperVSecurityAgent(rpc_compat.RpcCallback, def _setup_rpc(self): self.topic = topics.AGENT - self.dispatcher = self._create_rpc_dispatcher() + self.endpoints = [HyperVSecurityCallbackMixin(self)] consumers = [[topics.SECURITY_GROUP, topics.UPDATE]] - self.connection = agent_rpc.create_consumers(self.dispatcher, + self.connection = agent_rpc.create_consumers(self.endpoints, self.topic, consumers) - def _create_rpc_dispatcher(self): - return [HyperVSecurityCallbackMixin(self)] - class HyperVSecurityCallbackMixin(rpc_compat.RpcCallback, sg_rpc.SecurityGroupAgentRpcCallbackMixin): @@ -165,13 +162,13 @@ class HyperVNeutronAgent(rpc_compat.RpcCallback): # RPC network init self.context = context.get_admin_context_without_session() # Handle updates from service - self.dispatcher = self._create_rpc_dispatcher() + self.endpoints = [self] # Define the listening consumers for the agent consumers = [[topics.PORT, topics.UPDATE], [topics.NETWORK, topics.DELETE], [topics.PORT, topics.DELETE], [constants.TUNNEL, topics.UPDATE]] - self.connection = agent_rpc.create_consumers(self.dispatcher, + self.connection = agent_rpc.create_consumers(self.endpoints, self.topic, consumers) @@ -233,9 +230,6 @@ class HyperVNeutronAgent(rpc_compat.RpcCallback): network_type, physical_network, segmentation_id, port['admin_state_up']) - def _create_rpc_dispatcher(self): - return [self] - def _get_vswitch_name(self, network_type, physical_network): if network_type != p_const.TYPE_LOCAL: vswitch_name = self._get_vswitch_for_physical_network( diff --git a/neutron/plugins/hyperv/hyperv_neutron_plugin.py b/neutron/plugins/hyperv/hyperv_neutron_plugin.py index 2b2414845c4..4307e513357 100644 --- a/neutron/plugins/hyperv/hyperv_neutron_plugin.py +++ b/neutron/plugins/hyperv/hyperv_neutron_plugin.py @@ -190,10 +190,10 @@ class HyperVNeutronPlugin(agents_db.AgentDbMixin, self.conn = rpc_compat.create_connection(new=True) self.notifier = agent_notifier_api.AgentNotifierApi( topics.AGENT) - self.callbacks = rpc_callbacks.HyperVRpcCallbacks(self.notifier) - self.dispatcher = self.callbacks.create_rpc_dispatcher() + self.endpoints = [rpc_callbacks.HyperVRpcCallbacks(self.notifier), + agents_db.AgentExtRpcCallback()] for svc_topic in self.service_topics.values(): - self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False) + self.conn.create_consumer(svc_topic, self.endpoints, fanout=False) # Consume from all consumers in threads self.conn.consume_in_threads() diff --git a/neutron/plugins/hyperv/rpc_callbacks.py b/neutron/plugins/hyperv/rpc_callbacks.py index e967286d585..874059a5836 100644 --- a/neutron/plugins/hyperv/rpc_callbacks.py +++ b/neutron/plugins/hyperv/rpc_callbacks.py @@ -18,7 +18,6 @@ from neutron.common import constants as q_const from neutron.common import rpc_compat -from neutron.db import agents_db from neutron.db import dhcp_rpc_base from neutron.db import l3_rpc_base from neutron.openstack.common import log as logging @@ -41,14 +40,6 @@ class HyperVRpcCallbacks( self.notifier = notifier self._db = hyperv_db.HyperVPluginDB() - def create_rpc_dispatcher(self): - '''Get the rpc dispatcher for this manager. - - If a manager would like to set an rpc API version, or support more than - one class as the target of rpc messages, override this method. - ''' - return [self, agents_db.AgentExtRpcCallback()] - def get_device_details(self, rpc_context, **kwargs): """Agent requests device details.""" agent_id = kwargs.get('agent_id') diff --git a/neutron/plugins/ibm/agent/sdnve_neutron_agent.py b/neutron/plugins/ibm/agent/sdnve_neutron_agent.py index b1fa1e8b65a..e1c8d3ed71c 100644 --- a/neutron/plugins/ibm/agent/sdnve_neutron_agent.py +++ b/neutron/plugins/ibm/agent/sdnve_neutron_agent.py @@ -123,10 +123,10 @@ class SdnveNeutronAgent(rpc_compat.RpcCallback): self.state_rpc = agent_rpc.PluginReportStateAPI(topics.PLUGIN) self.context = context.get_admin_context_without_session() - self.dispatcher = self.create_rpc_dispatcher() + self.endpoints = [self] consumers = [[constants.INFO, topics.UPDATE]] - self.connection = agent_rpc.create_consumers(self.dispatcher, + self.connection = agent_rpc.create_consumers(self.endpoints, self.topic, consumers) if self.polling_interval: @@ -154,9 +154,6 @@ class SdnveNeutronAgent(rpc_compat.RpcCallback): "connection-mode", "out-of-band") - def create_rpc_dispatcher(self): - return [self] - def setup_integration_br(self, bridge_name, reset_br, out_of_band, controller_ip=None): '''Sets up the integration bridge. diff --git a/neutron/plugins/ibm/sdnve_neutron_plugin.py b/neutron/plugins/ibm/sdnve_neutron_plugin.py index 80ddf4f70bb..cf127f00146 100644 --- a/neutron/plugins/ibm/sdnve_neutron_plugin.py +++ b/neutron/plugins/ibm/sdnve_neutron_plugin.py @@ -48,13 +48,6 @@ class SdnveRpcCallbacks(): def __init__(self, notifier): self.notifier = notifier # used to notify the agent - def create_rpc_dispatcher(self): - '''Get the rpc dispatcher for this manager. - If a manager would like to set an rpc API version, or support more than - one class as the target of rpc messages, override this method. - ''' - return [self, agents_db.AgentExtRpcCallback()] - def sdnve_info(self, rpc_context, **kwargs): '''Update new information.''' info = kwargs.get('info') @@ -140,9 +133,9 @@ class SdnvePluginV2(db_base_plugin_v2.NeutronDbPluginV2, self.topic = topics.PLUGIN self.conn = rpc_compat.create_connection(new=True) self.notifier = AgentNotifierApi(topics.AGENT) - self.callbacks = SdnveRpcCallbacks(self.notifier) - self.dispatcher = self.callbacks.create_rpc_dispatcher() - self.conn.create_consumer(self.topic, self.dispatcher, + self.endpoints = [SdnveRpcCallbacks(self.notifier), + agents_db.AgentExtRpcCallback()] + self.conn.create_consumer(self.topic, self.endpoints, fanout=False) # Consume from all consumers in threads self.conn.consume_in_threads() diff --git a/neutron/plugins/linuxbridge/agent/linuxbridge_neutron_agent.py b/neutron/plugins/linuxbridge/agent/linuxbridge_neutron_agent.py index 5af3f674a37..65fddfa1fcc 100755 --- a/neutron/plugins/linuxbridge/agent/linuxbridge_neutron_agent.py +++ b/neutron/plugins/linuxbridge/agent/linuxbridge_neutron_agent.py @@ -809,14 +809,6 @@ class LinuxBridgeRpcCallbacks(rpc_compat.RpcCallback, getattr(self, method)(context, values) - def create_rpc_dispatcher(self): - '''Get the rpc dispatcher for this manager. - - If a manager would like to set an rpc API version, or support more than - one class as the target of rpc messages, override this method. - ''' - return [self] - class LinuxBridgePluginApi(agent_rpc.PluginApi, sg_rpc.SecurityGroupServerRpcApiMixin): @@ -876,9 +868,7 @@ class LinuxBridgeNeutronAgentRPC(sg_rpc.SecurityGroupAgentRpcMixin): # RPC network init self.context = context.get_admin_context_without_session() # Handle updates from service - self.callbacks = LinuxBridgeRpcCallbacks(self.context, - self) - self.dispatcher = self.callbacks.create_rpc_dispatcher() + self.endpoints = [LinuxBridgeRpcCallbacks(self.context, self)] # Define the listening consumers for the agent consumers = [[topics.PORT, topics.UPDATE], [topics.NETWORK, topics.DELETE], @@ -886,7 +876,7 @@ class LinuxBridgeNeutronAgentRPC(sg_rpc.SecurityGroupAgentRpcMixin): if cfg.CONF.VXLAN.l2_population: consumers.append([topics.L2POPULATION, topics.UPDATE, cfg.CONF.host]) - self.connection = agent_rpc.create_consumers(self.dispatcher, + self.connection = agent_rpc.create_consumers(self.endpoints, self.topic, consumers) report_interval = cfg.CONF.AGENT.report_interval diff --git a/neutron/plugins/linuxbridge/lb_neutron_plugin.py b/neutron/plugins/linuxbridge/lb_neutron_plugin.py index 025048e0aad..412275d24eb 100644 --- a/neutron/plugins/linuxbridge/lb_neutron_plugin.py +++ b/neutron/plugins/linuxbridge/lb_neutron_plugin.py @@ -65,14 +65,6 @@ class LinuxBridgeRpcCallbacks(rpc_compat.RpcCallback, # Device names start with "tap" TAP_PREFIX_LEN = 3 - def create_rpc_dispatcher(self): - '''Get the rpc dispatcher for this manager. - - If a manager would like to set an rpc API version, or support more than - one class as the target of rpc messages, override this method. - ''' - return [self, agents_db.AgentExtRpcCallback()] - @classmethod def get_port_from_device(cls, device): port = db.get_port_from_device(device[cls.TAP_PREFIX_LEN:]) @@ -281,10 +273,10 @@ class LinuxBridgePluginV2(db_base_plugin_v2.NeutronDbPluginV2, self.service_topics = {svc_constants.CORE: topics.PLUGIN, svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN} self.conn = rpc_compat.create_connection(new=True) - self.callbacks = LinuxBridgeRpcCallbacks() - self.dispatcher = self.callbacks.create_rpc_dispatcher() + self.endpoints = [LinuxBridgeRpcCallbacks(), + agents_db.AgentExtRpcCallback()] for svc_topic in self.service_topics.values(): - self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False) + self.conn.create_consumer(svc_topic, self.endpoints, fanout=False) # Consume from all consumers in threads self.conn.consume_in_threads() self.notifier = AgentNotifierApi(topics.AGENT) diff --git a/neutron/plugins/midonet/plugin.py b/neutron/plugins/midonet/plugin.py index 7f2dcdfd135..9a706d4a523 100644 --- a/neutron/plugins/midonet/plugin.py +++ b/neutron/plugins/midonet/plugin.py @@ -180,16 +180,6 @@ class MidoRpcCallbacks(rpc_compat.RpcCallback, dhcp_rpc_base.DhcpRpcCallbackMixin): RPC_API_VERSION = '1.1' - def create_rpc_dispatcher(self): - """Get the rpc dispatcher for this manager. - - This a basic implementation that will call the plugin like get_ports - and handle basic events - If a manager would like to set an rpc API version, or support more than - one class as the target of rpc messages, override this method. - """ - return [self, agents_db.AgentExtRpcCallback()] - class MidonetPluginException(n_exc.NeutronException): message = _("%(msg)s") @@ -382,9 +372,9 @@ class MidonetPluginV2(db_base_plugin_v2.NeutronDbPluginV2, # RPC support self.topic = topics.PLUGIN self.conn = rpc_compat.create_connection(new=True) - self.callbacks = MidoRpcCallbacks() - self.dispatcher = self.callbacks.create_rpc_dispatcher() - self.conn.create_consumer(self.topic, self.dispatcher, + self.endpoints = [MidoRpcCallbacks(), + agents_db.AgentExtRpcCallback()] + self.conn.create_consumer(self.topic, self.endpoints, fanout=False) # Consume from all consumers in threads self.conn.consume_in_threads() diff --git a/neutron/plugins/ml2/plugin.py b/neutron/plugins/ml2/plugin.py index e839538e3e4..a324637c5f8 100644 --- a/neutron/plugins/ml2/plugin.py +++ b/neutron/plugins/ml2/plugin.py @@ -25,6 +25,7 @@ from neutron.common import constants as const from neutron.common import exceptions as exc from neutron.common import rpc_compat from neutron.common import topics +from neutron.db import agents_db from neutron.db import agentschedulers_db from neutron.db import allowedaddresspairs_db as addr_pair_db from neutron.db import db_base_plugin_v2 @@ -126,11 +127,11 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2, ) def start_rpc_listeners(self): - self.callbacks = rpc.RpcCallbacks(self.notifier, self.type_manager) + self.endpoints = [rpc.RpcCallbacks(self.notifier, self.type_manager), + agents_db.AgentExtRpcCallback()] self.topic = topics.PLUGIN self.conn = rpc_compat.create_connection(new=True) - self.dispatcher = self.callbacks.create_rpc_dispatcher() - self.conn.create_consumer(self.topic, self.dispatcher, + self.conn.create_consumer(self.topic, self.endpoints, fanout=False) return self.conn.consume_in_threads() diff --git a/neutron/plugins/ml2/rpc.py b/neutron/plugins/ml2/rpc.py index e5068afb4c2..c744147c687 100644 --- a/neutron/plugins/ml2/rpc.py +++ b/neutron/plugins/ml2/rpc.py @@ -19,7 +19,6 @@ from neutron.agent import securitygroups_rpc as sg_rpc from neutron.common import constants as q_const from neutron.common import rpc_compat from neutron.common import topics -from neutron.db import agents_db from neutron.db import api as db_api from neutron.db import dhcp_rpc_base from neutron.db import securitygroups_rpc_base as sg_db_rpc @@ -58,14 +57,6 @@ class RpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin, # test in H3. super(RpcCallbacks, self).__init__(notifier, type_manager) - def create_rpc_dispatcher(self): - '''Get the rpc dispatcher for this manager. - - If a manager would like to set an rpc API version, or support more than - one class as the target of rpc messages, override this method. - ''' - return [self, agents_db.AgentExtRpcCallback()] - @classmethod def _device_to_port_id(cls, device): # REVISIT(rkukura): Consider calling into MechanismDrivers to diff --git a/neutron/plugins/mlnx/agent/eswitch_neutron_agent.py b/neutron/plugins/mlnx/agent/eswitch_neutron_agent.py index 94fd2b89a42..f60f02bb774 100644 --- a/neutron/plugins/mlnx/agent/eswitch_neutron_agent.py +++ b/neutron/plugins/mlnx/agent/eswitch_neutron_agent.py @@ -210,15 +210,6 @@ class MlnxEswitchRpcCallbacks(rpc_compat.RpcCallback, else: LOG.debug(_("No port %s defined on agent."), port['id']) - def create_rpc_dispatcher(self): - """Get the rpc dispatcher for this manager. - - If a manager would like to set an rpc API version, - or support more than one class as the target of rpc messages, - override this method. - """ - return [self] - class MlnxEswitchPluginApi(agent_rpc.PluginApi, sg_rpc.SecurityGroupServerRpcApiMixin): @@ -268,14 +259,12 @@ class MlnxEswitchNeutronAgent(sg_rpc.SecurityGroupAgentRpcMixin): # RPC network init self.context = context.get_admin_context_without_session() # Handle updates from service - self.callbacks = MlnxEswitchRpcCallbacks(self.context, - self) - self.dispatcher = self.callbacks.create_rpc_dispatcher() + self.endpoints = [MlnxEswitchRpcCallbacks(self.context, self)] # Define the listening consumers for the agent consumers = [[topics.PORT, topics.UPDATE], [topics.NETWORK, topics.DELETE], [topics.SECURITY_GROUP, topics.UPDATE]] - self.connection = agent_rpc.create_consumers(self.dispatcher, + self.connection = agent_rpc.create_consumers(self.endpoints, self.topic, consumers) diff --git a/neutron/plugins/mlnx/mlnx_plugin.py b/neutron/plugins/mlnx/mlnx_plugin.py index 05d639a497e..16d72df5534 100644 --- a/neutron/plugins/mlnx/mlnx_plugin.py +++ b/neutron/plugins/mlnx/mlnx_plugin.py @@ -28,6 +28,7 @@ from neutron.common import exceptions as n_exc from neutron.common import rpc_compat from neutron.common import topics from neutron.common import utils +from neutron.db import agents_db from neutron.db import agentschedulers_db from neutron.db import db_base_plugin_v2 from neutron.db import external_net_db @@ -120,10 +121,10 @@ class MellanoxEswitchPlugin(db_base_plugin_v2.NeutronDbPluginV2, self.service_topics = {svc_constants.CORE: topics.PLUGIN, svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN} self.conn = rpc_compat.create_connection(new=True) - self.callbacks = rpc_callbacks.MlnxRpcCallbacks() - self.dispatcher = self.callbacks.create_rpc_dispatcher() + self.endpoints = [rpc_callbacks.MlnxRpcCallbacks(), + agents_db.AgentExtRpcCallback()] for svc_topic in self.service_topics.values(): - self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False) + self.conn.create_consumer(svc_topic, self.endpoints, fanout=False) # Consume from all consumers in threads self.conn.consume_in_threads() self.notifier = agent_notify_api.AgentNotifierApi(topics.AGENT) diff --git a/neutron/plugins/mlnx/rpc_callbacks.py b/neutron/plugins/mlnx/rpc_callbacks.py index 0eda5143687..346d35822d1 100644 --- a/neutron/plugins/mlnx/rpc_callbacks.py +++ b/neutron/plugins/mlnx/rpc_callbacks.py @@ -18,7 +18,6 @@ from oslo.config import cfg from neutron.common import constants as q_const from neutron.common import rpc_compat -from neutron.db import agents_db from neutron.db import api as db_api from neutron.db import dhcp_rpc_base from neutron.db import l3_rpc_base @@ -40,15 +39,6 @@ class MlnxRpcCallbacks(rpc_compat.RpcCallback, #to be compatible with Linux Bridge Agent on Network Node TAP_PREFIX_LEN = 3 - def create_rpc_dispatcher(self): - """Get the rpc dispatcher for this manager. - - If a manager would like to set an RPC API version, - or support more than one class as the target of RPC messages, - override this method. - """ - return [self, agents_db.AgentExtRpcCallback()] - @classmethod def get_port_from_device(cls, device): """Get port according to device. diff --git a/neutron/plugins/nec/agent/nec_neutron_agent.py b/neutron/plugins/nec/agent/nec_neutron_agent.py index c1f580ac2e0..6ab5f82b4ee 100755 --- a/neutron/plugins/nec/agent/nec_neutron_agent.py +++ b/neutron/plugins/nec/agent/nec_neutron_agent.py @@ -156,11 +156,11 @@ class NECNeutronAgent(object): self, self.sg_agent) self.callback_sg = SecurityGroupAgentRpcCallback(self.context, self.sg_agent) - self.dispatcher = [self.callback_nec, self.callback_sg] + self.endpoints = [self.callback_nec, self.callback_sg] # Define the listening consumer for the agent consumers = [[topics.PORT, topics.UPDATE], [topics.SECURITY_GROUP, topics.UPDATE]] - self.connection = agent_rpc.create_consumers(self.dispatcher, + self.connection = agent_rpc.create_consumers(self.endpoints, self.topic, consumers) diff --git a/neutron/plugins/nec/nec_plugin.py b/neutron/plugins/nec/nec_plugin.py index 266dab7468f..f2225e733bc 100644 --- a/neutron/plugins/nec/nec_plugin.py +++ b/neutron/plugins/nec/nec_plugin.py @@ -146,14 +146,14 @@ class NECPluginV2(db_base_plugin_v2.NeutronDbPluginV2, # NOTE: callback_sg is referred to from the sg unit test. self.callback_sg = SecurityGroupServerRpcCallback() - self.dispatcher = [ + self.endpoints = [ NECPluginV2RPCCallbacks(self.safe_reference), DhcpRpcCallback(), L3RpcCallback(), self.callback_sg, agents_db.AgentExtRpcCallback()] for svc_topic in self.service_topics.values(): - self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False) + self.conn.create_consumer(svc_topic, self.endpoints, fanout=False) # Consume from all consumers in threads self.conn.consume_in_threads() @@ -715,14 +715,6 @@ class NECPluginV2RPCCallbacks(rpc_compat.RpcCallback): super(NECPluginV2RPCCallbacks, self).__init__() self.plugin = plugin - def create_rpc_dispatcher(self): - '''Get the rpc dispatcher for this manager. - - If a manager would like to set an rpc API version, or support more than - one class as the target of rpc messages, override this method. - ''' - return [self] - def update_ports(self, rpc_context, **kwargs): """Update ports' information and activate/deavtivate them. diff --git a/neutron/plugins/ofagent/agent/ofa_neutron_agent.py b/neutron/plugins/ofagent/agent/ofa_neutron_agent.py index c79d77a9154..6e6cd84d7f7 100644 --- a/neutron/plugins/ofagent/agent/ofa_neutron_agent.py +++ b/neutron/plugins/ofagent/agent/ofa_neutron_agent.py @@ -286,13 +286,13 @@ class OFANeutronAgent(rpc_compat.RpcCallback, # RPC network init self.context = context.get_admin_context_without_session() # Handle updates from service - self.dispatcher = self.create_rpc_dispatcher() + self.endpoints = [self] # Define the listening consumers for the agent consumers = [[topics.PORT, topics.UPDATE], [topics.NETWORK, topics.DELETE], [constants.TUNNEL, topics.UPDATE], [topics.SECURITY_GROUP, topics.UPDATE]] - self.connection = agent_rpc.create_consumers(self.dispatcher, + self.connection = agent_rpc.create_consumers(self.endpoints, self.topic, consumers) report_interval = cfg.CONF.AGENT.report_interval @@ -344,14 +344,6 @@ class OFANeutronAgent(rpc_compat.RpcCallback, return self.setup_tunnel_port(tun_name, tunnel_ip, tunnel_type) - def create_rpc_dispatcher(self): - """Get the rpc dispatcher for this manager. - - If a manager would like to set an rpc API version, or support more than - one class as the target of rpc messages, override this method. - """ - return [self] - def _provision_local_vlan_outbound_for_tunnel(self, lvid, segmentation_id, ofports): br = self.tun_br diff --git a/neutron/plugins/oneconvergence/agent/nvsd_neutron_agent.py b/neutron/plugins/oneconvergence/agent/nvsd_neutron_agent.py index 0ef6348dfb4..377cdda1e99 100644 --- a/neutron/plugins/oneconvergence/agent/nvsd_neutron_agent.py +++ b/neutron/plugins/oneconvergence/agent/nvsd_neutron_agent.py @@ -119,11 +119,11 @@ class NVSDNeutronAgent(rpc_compat.RpcCallback): self, self.sg_agent) self.callback_sg = SecurityGroupAgentRpcCallback(self.context, self.sg_agent) - self.dispatcher = [self.callback_oc, self.callback_sg] + self.endpoints = [self.callback_oc, self.callback_sg] # Define the listening consumer for the agent consumers = [[topics.PORT, topics.UPDATE], [topics.SECURITY_GROUP, topics.UPDATE]] - self.connection = agent_rpc.create_consumers(self.dispatcher, + self.connection = agent_rpc.create_consumers(self.endpoints, self.topic, consumers) diff --git a/neutron/plugins/oneconvergence/plugin.py b/neutron/plugins/oneconvergence/plugin.py index 4411bd20d60..257ab5494e2 100644 --- a/neutron/plugins/oneconvergence/plugin.py +++ b/neutron/plugins/oneconvergence/plugin.py @@ -58,10 +58,6 @@ class NVSDPluginRpcCallbacks(rpc_compat.RpcCallback, RPC_API_VERSION = '1.1' - def create_rpc_dispatcher(self): - """Get the rpc dispatcher for this manager.""" - return [self, agents_db.AgentExtRpcCallback()] - @staticmethod def get_port_from_device(device): port = nvsd_db.get_port_from_device(device) @@ -165,10 +161,10 @@ class OneConvergencePluginV2(db_base_plugin_v2.NeutronDbPluginV2, self.agent_notifiers[q_const.AGENT_TYPE_L3] = ( l3_rpc_agent_api.L3AgentNotifyAPI() ) - self.callbacks = NVSDPluginRpcCallbacks() - self.dispatcher = self.callbacks.create_rpc_dispatcher() + self.endpoints = [NVSDPluginRpcCallbacks(), + agents_db.AgentExtRpcCallback()] for svc_topic in self.service_topics.values(): - self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False) + self.conn.create_consumer(svc_topic, self.endpoints, fanout=False) # Consume from all consumers in threads self.conn.consume_in_threads() diff --git a/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py b/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py index a66d773c82c..c5b136b068f 100644 --- a/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py +++ b/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py @@ -249,7 +249,7 @@ class OVSNeutronAgent(rpc_compat.RpcCallback, # RPC network init self.context = context.get_admin_context_without_session() # Handle updates from service - self.dispatcher = self.create_rpc_dispatcher() + self.endpoints = [self] # Define the listening consumers for the agent consumers = [[topics.PORT, topics.UPDATE], [topics.NETWORK, topics.DELETE], @@ -258,7 +258,7 @@ class OVSNeutronAgent(rpc_compat.RpcCallback, if self.l2_pop: consumers.append([topics.L2POPULATION, topics.UPDATE, cfg.CONF.host]) - self.connection = agent_rpc.create_consumers(self.dispatcher, + self.connection = agent_rpc.create_consumers(self.endpoints, self.topic, consumers) report_interval = cfg.CONF.AGENT.report_interval @@ -493,14 +493,6 @@ class OVSNeutronAgent(rpc_compat.RpcCallback, else: LOG.warning(_('Action %s not supported'), action) - def create_rpc_dispatcher(self): - '''Get the rpc dispatcher for this manager. - - If a manager would like to set an rpc API version, or support more than - one class as the target of rpc messages, override this method. - ''' - return [self] - def provision_local_vlan(self, net_uuid, network_type, physical_network, segmentation_id): '''Provisions a local VLAN. diff --git a/neutron/plugins/openvswitch/ovs_neutron_plugin.py b/neutron/plugins/openvswitch/ovs_neutron_plugin.py index bf35bb7a4da..31698a3df54 100644 --- a/neutron/plugins/openvswitch/ovs_neutron_plugin.py +++ b/neutron/plugins/openvswitch/ovs_neutron_plugin.py @@ -73,14 +73,6 @@ class OVSRpcCallbacks(rpc_compat.RpcCallback, self.notifier = notifier self.tunnel_type = tunnel_type - def create_rpc_dispatcher(self): - '''Get the rpc dispatcher for this manager. - - If a manager would like to set an rpc API version, or support more than - one class as the target of rpc messages, override this method. - ''' - return [self, agents_db.AgentExtRpcCallback()] - @classmethod def get_port_from_device(cls, device): port = ovs_db_v2.get_port_from_device(device) @@ -341,10 +333,10 @@ class OVSNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2, self.agent_notifiers[q_const.AGENT_TYPE_L3] = ( l3_rpc_agent_api.L3AgentNotifyAPI() ) - self.callbacks = OVSRpcCallbacks(self.notifier, self.tunnel_type) - self.dispatcher = self.callbacks.create_rpc_dispatcher() + self.endpoints = [OVSRpcCallbacks(self.notifier, self.tunnel_type), + agents_db.AgentExtRpcCallback()] for svc_topic in self.service_topics.values(): - self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False) + self.conn.create_consumer(svc_topic, self.endpoints, fanout=False) # Consume from all consumers in threads self.conn.consume_in_threads() diff --git a/neutron/plugins/ryu/agent/ryu_neutron_agent.py b/neutron/plugins/ryu/agent/ryu_neutron_agent.py index 6086113c7f5..d1fac318525 100755 --- a/neutron/plugins/ryu/agent/ryu_neutron_agent.py +++ b/neutron/plugins/ryu/agent/ryu_neutron_agent.py @@ -200,16 +200,13 @@ class OVSNeutronOFPRyuAgent(rpc_compat.RpcCallback, self.topic = topics.AGENT self.plugin_rpc = RyuPluginApi(topics.PLUGIN) self.context = q_context.get_admin_context_without_session() - self.dispatcher = self._create_rpc_dispatcher() + self.endpoints = [self] consumers = [[topics.PORT, topics.UPDATE], [topics.SECURITY_GROUP, topics.UPDATE]] - self.connection = agent_rpc.create_consumers(self.dispatcher, + self.connection = agent_rpc.create_consumers(self.endpoints, self.topic, consumers) - def _create_rpc_dispatcher(self): - return [self] - def _setup_integration_br(self, root_helper, integ_br, tunnel_ip, ovsdb_port, ovsdb_ip): self.int_br = OVSBridge(integ_br, root_helper) diff --git a/neutron/plugins/ryu/ryu_neutron_plugin.py b/neutron/plugins/ryu/ryu_neutron_plugin.py index 0e9405a985f..9fd6bf9893a 100644 --- a/neutron/plugins/ryu/ryu_neutron_plugin.py +++ b/neutron/plugins/ryu/ryu_neutron_plugin.py @@ -57,9 +57,6 @@ class RyuRpcCallbacks(rpc_compat.RpcCallback, super(RyuRpcCallbacks, self).__init__() self.ofp_rest_api_addr = ofp_rest_api_addr - def create_rpc_dispatcher(self): - return [self] - def get_ofp_rest_api(self, context, **kwargs): LOG.debug(_("get_ofp_rest_api: %s"), self.ofp_rest_api_addr) return self.ofp_rest_api_addr @@ -143,10 +140,9 @@ class RyuNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2, svc_constants.L3_ROUTER_NAT: topics.L3PLUGIN} self.conn = rpc_compat.create_connection(new=True) self.notifier = AgentNotifierApi(topics.AGENT) - self.callbacks = RyuRpcCallbacks(self.ofp_api_host) - self.dispatcher = self.callbacks.create_rpc_dispatcher() + self.endpoints = [RyuRpcCallbacks(self.ofp_api_host)] for svc_topic in self.service_topics.values(): - self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False) + self.conn.create_consumer(svc_topic, self.endpoints, fanout=False) self.conn.consume_in_threads() def _create_all_tenant_network(self): diff --git a/neutron/plugins/vmware/dhcp_meta/rpc.py b/neutron/plugins/vmware/dhcp_meta/rpc.py index c32a39b3724..9d409d01a79 100644 --- a/neutron/plugins/vmware/dhcp_meta/rpc.py +++ b/neutron/plugins/vmware/dhcp_meta/rpc.py @@ -25,7 +25,6 @@ from neutron.api.v2 import attributes from neutron.common import constants as const from neutron.common import exceptions as ntn_exc from neutron.common import rpc_compat -from neutron.db import agents_db from neutron.db import db_base_plugin_v2 from neutron.db import dhcp_rpc_base from neutron.db import l3_db @@ -48,14 +47,6 @@ class NSXRpcCallbacks(rpc_compat.RpcCallback, RPC_API_VERSION = '1.1' - def create_rpc_dispatcher(self): - '''Get the rpc dispatcher for this manager. - - If a manager would like to set an rpc API version, or support more than - one class as the target of rpc messages, override this method. - ''' - return [self, agents_db.AgentExtRpcCallback()] - def handle_network_dhcp_access(plugin, context, network, action): pass diff --git a/neutron/plugins/vmware/dhcpmeta_modes.py b/neutron/plugins/vmware/dhcpmeta_modes.py index b878503a277..0ce2112f60a 100644 --- a/neutron/plugins/vmware/dhcpmeta_modes.py +++ b/neutron/plugins/vmware/dhcpmeta_modes.py @@ -21,6 +21,7 @@ from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api from neutron.common import constants as const from neutron.common import rpc_compat from neutron.common import topics +from neutron.db import agents_db from neutron.openstack.common import importutils from neutron.openstack.common import log as logging from neutron.plugins.vmware.common import config @@ -70,8 +71,9 @@ class DhcpMetadataAccess(object): def _setup_rpc_dhcp_metadata(self, notifier=None): self.topic = topics.PLUGIN self.conn = rpc_compat.create_connection(new=True) - self.dispatcher = nsx_rpc.NSXRpcCallbacks().create_rpc_dispatcher() - self.conn.create_consumer(self.topic, self.dispatcher, fanout=False) + self.endpoints = [nsx_rpc.NSXRpcCallbacks(), + agents_db.AgentExtRpcCallback()] + self.conn.create_consumer(self.topic, self.endpoints, fanout=False) self.agent_notifiers[const.AGENT_TYPE_DHCP] = ( notifier or dhcp_rpc_agent_api.DhcpAgentNotifyAPI()) self.conn.consume_in_threads() diff --git a/neutron/services/firewall/fwaas_plugin.py b/neutron/services/firewall/fwaas_plugin.py index dbda0c5bfaa..f0eaf1b2c60 100644 --- a/neutron/services/firewall/fwaas_plugin.py +++ b/neutron/services/firewall/fwaas_plugin.py @@ -40,9 +40,6 @@ class FirewallCallbacks(rpc_compat.RpcCallback): super(FirewallCallbacks, self).__init__() self.plugin = plugin - def create_rpc_dispatcher(self): - return [self] - def set_firewall_status(self, context, firewall_id, status, **kwargs): """Agent uses this to set a firewall's status.""" LOG.debug(_("set_firewall_status() called")) @@ -165,13 +162,11 @@ class FirewallPlugin(firewall_db.Firewall_db_mixin): """Do the initialization for the firewall service plugin here.""" qdbapi.register_models() - self.callbacks = FirewallCallbacks(self) + self.endpoints = [FirewallCallbacks(self)] self.conn = rpc_compat.create_connection(new=True) self.conn.create_consumer( - topics.FIREWALL_PLUGIN, - self.callbacks.create_rpc_dispatcher(), - fanout=False) + topics.FIREWALL_PLUGIN, self.endpoints, fanout=False) self.conn.consume_in_threads() self.agent_rpc = FirewallAgentApi( diff --git a/neutron/services/l3_router/l3_router_plugin.py b/neutron/services/l3_router/l3_router_plugin.py index b13074193f1..bd1378bc8db 100644 --- a/neutron/services/l3_router/l3_router_plugin.py +++ b/neutron/services/l3_router/l3_router_plugin.py @@ -39,14 +39,6 @@ class L3RouterPluginRpcCallbacks(rpc_compat.RpcCallback, RPC_API_VERSION = '1.1' - def create_rpc_dispatcher(self): - """Get the rpc dispatcher for this manager. - - If a manager would like to set an rpc API version, or support more than - one class as the target of rpc messages, override this method. - """ - return [self] - class L3RouterPlugin(db_base_plugin_v2.CommonDbMixin, extraroute_db.ExtraRoute_db_mixin, @@ -76,9 +68,8 @@ class L3RouterPlugin(db_base_plugin_v2.CommonDbMixin, self.conn = rpc_compat.create_connection(new=True) self.agent_notifiers.update( {q_const.AGENT_TYPE_L3: l3_rpc_agent_api.L3AgentNotifyAPI()}) - self.callbacks = L3RouterPluginRpcCallbacks() - self.dispatcher = self.callbacks.create_rpc_dispatcher() - self.conn.create_consumer(self.topic, self.dispatcher, + self.endpoints = [L3RouterPluginRpcCallbacks()] + self.conn.create_consumer(self.topic, self.endpoints, fanout=False) self.conn.consume_in_threads() diff --git a/neutron/services/loadbalancer/drivers/common/agent_driver_base.py b/neutron/services/loadbalancer/drivers/common/agent_driver_base.py index da9c438f14f..5849c959753 100644 --- a/neutron/services/loadbalancer/drivers/common/agent_driver_base.py +++ b/neutron/services/loadbalancer/drivers/common/agent_driver_base.py @@ -64,9 +64,6 @@ class LoadBalancerCallbacks(rpc_compat.RpcCallback): super(LoadBalancerCallbacks, self).__init__() self.plugin = plugin - def create_rpc_dispatcher(self): - return [self, agents_db.AgentExtRpcCallback(self.plugin)] - def get_ready_devices(self, context, host=None): with context.session.begin(subtransactions=True): agents = self.plugin.get_lbaas_agents(context, @@ -342,11 +339,14 @@ class AgentDriverBase(abstract_driver.LoadBalancerAbstractDriver): if hasattr(self.plugin, 'agent_callbacks'): return - self.plugin.agent_callbacks = LoadBalancerCallbacks(self.plugin) + self.plugin.agent_endpoints = [ + LoadBalancerCallbacks(self.plugin), + agents_db.AgentExtRpcCallback(self.plugin) + ] self.plugin.conn = rpc_compat.create_connection(new=True) self.plugin.conn.create_consumer( topics.LOADBALANCER_PLUGIN, - self.plugin.agent_callbacks.create_rpc_dispatcher(), + self.plugin.agent_endpoints, fanout=False) self.plugin.conn.consume_in_threads() diff --git a/neutron/services/metering/metering_plugin.py b/neutron/services/metering/metering_plugin.py index dd111493559..e67dbab60d4 100644 --- a/neutron/services/metering/metering_plugin.py +++ b/neutron/services/metering/metering_plugin.py @@ -28,13 +28,11 @@ class MeteringPlugin(metering_db.MeteringDbMixin): def __init__(self): super(MeteringPlugin, self).__init__() - self.callbacks = metering_rpc.MeteringRpcCallbacks(self) + self.endpoints = [metering_rpc.MeteringRpcCallbacks(self)] self.conn = rpc_compat.create_connection(new=True) self.conn.create_consumer( - topics.METERING_PLUGIN, - self.callbacks.create_rpc_dispatcher(), - fanout=False) + topics.METERING_PLUGIN, self.endpoints, fanout=False) self.conn.consume_in_threads() self.meter_rpc = metering_rpc_agent_api.MeteringAgentNotifyAPI() diff --git a/neutron/services/vpn/device_drivers/cisco_ipsec.py b/neutron/services/vpn/device_drivers/cisco_ipsec.py index 22fe1558941..7d73735c1cc 100644 --- a/neutron/services/vpn/device_drivers/cisco_ipsec.py +++ b/neutron/services/vpn/device_drivers/cisco_ipsec.py @@ -198,10 +198,8 @@ class CiscoCsrIPsecDriver(device_drivers.DeviceDriver): self.service_state = {} - self.conn.create_consumer( - node_topic, - self.create_rpc_dispatcher(), - fanout=False) + self.endpoints = [self] + self.conn.create_consumer(node_topic, self.endpoints, fanout=False) self.conn.consume_in_threads() self.agent_rpc = ( CiscoCsrIPsecVpnDriverApi(topics.CISCO_IPSEC_DRIVER_TOPIC, '1.0')) @@ -225,9 +223,6 @@ class CiscoCsrIPsecDriver(device_drivers.DeviceDriver): v['timeout'])) for k, v in csrs_found.items()]) - def create_rpc_dispatcher(self): - return [self] - def vpnservice_updated(self, context, **kwargs): """Handle VPNaaS service driver change notifications.""" LOG.debug(_("Handling VPN service update notification '%s'"), diff --git a/neutron/services/vpn/device_drivers/ipsec.py b/neutron/services/vpn/device_drivers/ipsec.py index 2ed7c08a6e5..aef47919c0d 100644 --- a/neutron/services/vpn/device_drivers/ipsec.py +++ b/neutron/services/vpn/device_drivers/ipsec.py @@ -504,10 +504,8 @@ class IPsecDriver(device_drivers.DeviceDriver): self.processes = {} self.process_status_cache = {} - self.conn.create_consumer( - node_topic, - self.create_rpc_dispatcher(), - fanout=False) + self.endpoints = [self] + self.conn.create_consumer(node_topic, self.endpoints, fanout=False) self.conn.consume_in_threads() self.agent_rpc = IPsecVpnDriverApi(topics.IPSEC_DRIVER_TOPIC, '1.0') self.process_status_cache_check = loopingcall.FixedIntervalLoopingCall( @@ -515,9 +513,6 @@ class IPsecDriver(device_drivers.DeviceDriver): self.process_status_cache_check.start( interval=self.conf.ipsec.ipsec_status_check_interval) - def create_rpc_dispatcher(self): - return [self] - def _update_nat(self, vpnservice, func): """Setting up nat rule in iptables. diff --git a/neutron/services/vpn/service_drivers/cisco_ipsec.py b/neutron/services/vpn/service_drivers/cisco_ipsec.py index 4c4bd7a059b..ed34f41ff74 100644 --- a/neutron/services/vpn/service_drivers/cisco_ipsec.py +++ b/neutron/services/vpn/service_drivers/cisco_ipsec.py @@ -53,9 +53,6 @@ class CiscoCsrIPsecVpnDriverCallBack(rpc_compat.RpcCallback): super(CiscoCsrIPsecVpnDriverCallBack, self).__init__() self.driver = driver - def create_rpc_dispatcher(self): - return [self] - def get_vpn_services_on_host(self, context, host=None): """Retuns info on the vpnservices on the host.""" plugin = self.driver.service_plugin @@ -88,12 +85,10 @@ class CiscoCsrIPsecVPNDriver(service_drivers.VpnDriver): def __init__(self, service_plugin): super(CiscoCsrIPsecVPNDriver, self).__init__(service_plugin) - self.callbacks = CiscoCsrIPsecVpnDriverCallBack(self) + self.endpoints = [CiscoCsrIPsecVpnDriverCallBack(self)] self.conn = rpc_compat.create_connection(new=True) self.conn.create_consumer( - topics.CISCO_IPSEC_DRIVER_TOPIC, - self.callbacks.create_rpc_dispatcher(), - fanout=False) + topics.CISCO_IPSEC_DRIVER_TOPIC, self.endpoints, fanout=False) self.conn.consume_in_threads() self.agent_rpc = CiscoCsrIPsecVpnAgentApi( topics.CISCO_IPSEC_AGENT_TOPIC, BASE_IPSEC_VERSION) diff --git a/neutron/services/vpn/service_drivers/ipsec.py b/neutron/services/vpn/service_drivers/ipsec.py index 66266204483..be6aa9e26d0 100644 --- a/neutron/services/vpn/service_drivers/ipsec.py +++ b/neutron/services/vpn/service_drivers/ipsec.py @@ -40,9 +40,6 @@ class IPsecVpnDriverCallBack(rpc_compat.RpcCallback): super(IPsecVpnDriverCallBack, self).__init__() self.driver = driver - def create_rpc_dispatcher(self): - return [self] - def get_vpn_services_on_host(self, context, host=None): """Returns the vpnservices on the host.""" plugin = self.driver.service_plugin @@ -73,12 +70,10 @@ class IPsecVPNDriver(service_drivers.VpnDriver): def __init__(self, service_plugin): super(IPsecVPNDriver, self).__init__(service_plugin) - self.callbacks = IPsecVpnDriverCallBack(self) + self.endpoints = [IPsecVpnDriverCallBack(self)] self.conn = rpc_compat.create_connection(new=True) self.conn.create_consumer( - topics.IPSEC_DRIVER_TOPIC, - self.callbacks.create_rpc_dispatcher(), - fanout=False) + topics.IPSEC_DRIVER_TOPIC, self.endpoints, fanout=False) self.conn.consume_in_threads() self.agent_rpc = IPsecVpnAgentApi( topics.IPSEC_AGENT_TOPIC, BASE_IPSEC_VERSION) diff --git a/neutron/tests/unit/bigswitch/test_base.py b/neutron/tests/unit/bigswitch/test_base.py index 6e5a0f7fe78..6fc5580eb53 100644 --- a/neutron/tests/unit/bigswitch/test_base.py +++ b/neutron/tests/unit/bigswitch/test_base.py @@ -33,8 +33,6 @@ from neutron.db import portbindings_db # noqa RESTPROXY_PKG_PATH = 'neutron.plugins.bigswitch.plugin' NOTIFIER = 'neutron.plugins.bigswitch.plugin.AgentNotifierApi' -CALLBACKS = 'neutron.plugins.bigswitch.plugin.RestProxyCallbacks' -DISPATCHER = CALLBACKS + '.create_rpc_dispatcher' CERTFETCH = 'neutron.plugins.bigswitch.servermanager.ServerPool._fetch_cert' SERVER_MANAGER = 'neutron.plugins.bigswitch.servermanager' HTTPCON = 'neutron.plugins.bigswitch.servermanager.httplib.HTTPConnection' @@ -61,15 +59,11 @@ class BigSwitchTestBase(object): def setup_patches(self): self.plugin_notifier_p = mock.patch(NOTIFIER) - # prevent rpc callback dispatcher from being created - self.callbacks_p = mock.patch(DISPATCHER, - new=lambda *args, **kwargs: None) # prevent any greenthreads from spawning self.spawn_p = mock.patch(SPAWN, new=lambda *args, **kwargs: None) # prevent the consistency watchdog from starting self.watch_p = mock.patch(CWATCH, new=lambda *args, **kwargs: None) self.addCleanup(db.clear_db) - self.callbacks_p.start() self.plugin_notifier_p.start() self.spawn_p.start() self.watch_p.start() diff --git a/neutron/tests/unit/bigswitch/test_security_groups.py b/neutron/tests/unit/bigswitch/test_security_groups.py index f08623a729a..1e3a7aa56f7 100644 --- a/neutron/tests/unit/bigswitch/test_security_groups.py +++ b/neutron/tests/unit/bigswitch/test_security_groups.py @@ -32,7 +32,7 @@ class RestProxySecurityGroupsTestCase(test_sg.SecurityGroupDBTestCase, super(RestProxySecurityGroupsTestCase, self).setUp(self.plugin_str) plugin = manager.NeutronManager.get_plugin() self.notifier = plugin.notifier - self.rpc = plugin.callbacks + self.rpc = plugin.endpoints[0] self.startHttpPatch() diff --git a/neutron/tests/unit/ml2/test_port_binding.py b/neutron/tests/unit/ml2/test_port_binding.py index 19523474a2b..b4aa19a9ca6 100644 --- a/neutron/tests/unit/ml2/test_port_binding.py +++ b/neutron/tests/unit/ml2/test_port_binding.py @@ -62,7 +62,7 @@ class PortBindingTestCase(test_plugin.NeutronDbPluginV2TestCase): bound, status) port_id = port['port']['id'] neutron_context = context.get_admin_context() - details = self.plugin.callbacks.get_device_details( + details = self.plugin.endpoints[0].get_device_details( neutron_context, agent_id="theAgentId", device=port_id) if bound: self.assertEqual(details['network_type'], 'local') diff --git a/neutron/tests/unit/ml2/test_security_group.py b/neutron/tests/unit/ml2/test_security_group.py index 46aba96126d..3e82c91e4fd 100644 --- a/neutron/tests/unit/ml2/test_security_group.py +++ b/neutron/tests/unit/ml2/test_security_group.py @@ -74,7 +74,8 @@ class TestMl2SecurityGroups(Ml2SecurityGroupsTestCase, req.get_response(self.api)) port_id = res['port']['id'] plugin = manager.NeutronManager.get_plugin() - port_dict = plugin.callbacks.get_port_from_device(port_id) + callbacks = plugin.endpoints[0] + port_dict = callbacks.get_port_from_device(port_id) self.assertEqual(port_id, port_dict['id']) self.assertEqual([security_group_id], port_dict[ext_sg.SECURITYGROUPS]) @@ -85,7 +86,7 @@ class TestMl2SecurityGroups(Ml2SecurityGroupsTestCase, def test_security_group_get_port_from_device_with_no_port(self): plugin = manager.NeutronManager.get_plugin() - port_dict = plugin.callbacks.get_port_from_device('bad_device_id') + port_dict = plugin.endpoints[0].get_port_from_device('bad_device_id') self.assertIsNone(port_dict) diff --git a/neutron/tests/unit/oneconvergence/test_security_group.py b/neutron/tests/unit/oneconvergence/test_security_group.py index 051d5825b07..af08132c58d 100644 --- a/neutron/tests/unit/oneconvergence/test_security_group.py +++ b/neutron/tests/unit/oneconvergence/test_security_group.py @@ -136,7 +136,8 @@ class TestOneConvergenceSecurityGroups(OneConvergenceSecurityGroupsTestCase, req.get_response(self.api)) port_id = res['port']['id'] plugin = manager.NeutronManager.get_plugin() - port_dict = plugin.callbacks.get_port_from_device(port_id) + callbacks = plugin.endpoints[0] + port_dict = callbacks.get_port_from_device(port_id) self.assertEqual(port_id, port_dict['id']) self.assertEqual([security_group_id], port_dict[ext_sg.SECURITYGROUPS]) @@ -148,7 +149,7 @@ class TestOneConvergenceSecurityGroups(OneConvergenceSecurityGroupsTestCase, def test_security_group_get_port_from_device_with_no_port(self): plugin = manager.NeutronManager.get_plugin() - port_dict = plugin.callbacks.get_port_from_device('bad_device_id') + port_dict = plugin.endpoints[0].get_port_from_device('bad_device_id') self.assertIsNone(port_dict) diff --git a/neutron/tests/unit/openvswitch/test_ovs_security_group.py b/neutron/tests/unit/openvswitch/test_ovs_security_group.py index c681af82713..50e2caf27c9 100644 --- a/neutron/tests/unit/openvswitch/test_ovs_security_group.py +++ b/neutron/tests/unit/openvswitch/test_ovs_security_group.py @@ -84,7 +84,8 @@ class TestOpenvswitchSecurityGroups(OpenvswitchSecurityGroupsTestCase, req.get_response(self.api)) port_id = res['port']['id'] plugin = manager.NeutronManager.get_plugin() - port_dict = plugin.callbacks.get_port_from_device(port_id) + callbacks = plugin.endpoints[0] + port_dict = callbacks.get_port_from_device(port_id) self.assertEqual(port_id, port_dict['id']) self.assertEqual([security_group_id], port_dict[ext_sg.SECURITYGROUPS]) @@ -95,7 +96,7 @@ class TestOpenvswitchSecurityGroups(OpenvswitchSecurityGroupsTestCase, def test_security_group_get_port_from_device_with_no_port(self): plugin = manager.NeutronManager.get_plugin() - port_dict = plugin.callbacks.get_port_from_device('bad_device_id') + port_dict = plugin.endpoints[0].get_port_from_device('bad_device_id') self.assertIsNone(port_dict) diff --git a/neutron/tests/unit/ryu/test_ryu_security_group.py b/neutron/tests/unit/ryu/test_ryu_security_group.py index aadbe8266b6..a023136efc7 100644 --- a/neutron/tests/unit/ryu/test_ryu_security_group.py +++ b/neutron/tests/unit/ryu/test_ryu_security_group.py @@ -73,7 +73,7 @@ class TestRyuSecurityGroups(RyuSecurityGroupsTestCase, req.get_response(self.api)) port_id = res['port']['id'] plugin = manager.NeutronManager.get_plugin() - port_dict = plugin.callbacks.get_port_from_device(port_id) + port_dict = plugin.endpoints[0].get_port_from_device(port_id) self.assertEqual(port_id, port_dict['id']) self.assertEqual([security_group_id], port_dict[ext_sg.SECURITYGROUPS]) @@ -84,7 +84,7 @@ class TestRyuSecurityGroups(RyuSecurityGroupsTestCase, def test_security_group_get_port_from_device_with_no_port(self): plugin = manager.NeutronManager.get_plugin() - port_dict = plugin.callbacks.get_port_from_device('bad_device_id') + port_dict = plugin.endpoints[0].get_port_from_device('bad_device_id') self.assertIsNone(port_dict) diff --git a/neutron/tests/unit/services/firewall/test_fwaas_plugin.py b/neutron/tests/unit/services/firewall/test_fwaas_plugin.py index 73e94698b0e..2430d69c6da 100644 --- a/neutron/tests/unit/services/firewall/test_fwaas_plugin.py +++ b/neutron/tests/unit/services/firewall/test_fwaas_plugin.py @@ -41,7 +41,7 @@ class TestFirewallCallbacks(test_db_firewall.FirewallPluginDbTestCase): def setUp(self): super(TestFirewallCallbacks, self).setUp(fw_plugin=FW_PLUGIN_KLASS) - self.callbacks = self.plugin.callbacks + self.callbacks = self.plugin.endpoints[0] def test_set_firewall_status(self): ctx = context.get_admin_context() @@ -210,7 +210,7 @@ class TestFirewallPluginBase(test_db_firewall.TestFirewallDBPlugin): def setUp(self): super(TestFirewallPluginBase, self).setUp(fw_plugin=FW_PLUGIN_KLASS) - self.callbacks = self.plugin.callbacks + self.callbacks = self.plugin.endpoints[0] def test_create_second_firewall_not_permitted(self): with self.firewall(): @@ -342,7 +342,7 @@ class TestFirewallPluginBase(test_db_firewall.TestFirewallDBPlugin): for k, v in attrs.iteritems(): self.assertEqual(fw_db[k], v) # cleanup the pending firewall - self.plugin.callbacks.firewall_deleted(ctx, fw_id) + self.plugin.endpoints[0].firewall_deleted(ctx, fw_id) def test_delete_firewall_after_agent_delete(self): ctx = context.get_admin_context() diff --git a/neutron/tests/unit/test_agent_rpc.py b/neutron/tests/unit/test_agent_rpc.py index db553a9faf5..ed571376d65 100644 --- a/neutron/tests/unit/test_agent_rpc.py +++ b/neutron/tests/unit/test_agent_rpc.py @@ -83,31 +83,31 @@ class AgentPluginReportState(base.BaseTestCase): class AgentRPCMethods(base.BaseTestCase): def test_create_consumers(self): - dispatcher = mock.Mock() + endpoints = [mock.Mock()] expected = [ mock.call(new=True), - mock.call().create_consumer('foo-topic-op', dispatcher, + mock.call().create_consumer('foo-topic-op', endpoints, fanout=True), mock.call().consume_in_threads() ] call_to_patch = 'neutron.common.rpc_compat.create_connection' with mock.patch(call_to_patch) as create_connection: - rpc.create_consumers(dispatcher, 'foo', [('topic', 'op')]) + rpc.create_consumers(endpoints, 'foo', [('topic', 'op')]) create_connection.assert_has_calls(expected) def test_create_consumers_with_node_name(self): - dispatcher = mock.Mock() + endpoints = [mock.Mock()] expected = [ mock.call(new=True), - mock.call().create_consumer('foo-topic-op', dispatcher, + mock.call().create_consumer('foo-topic-op', endpoints, fanout=True), - mock.call().create_consumer('foo-topic-op.node1', dispatcher, + mock.call().create_consumer('foo-topic-op.node1', endpoints, fanout=False), mock.call().consume_in_threads() ] call_to_patch = 'neutron.common.rpc_compat.create_connection' with mock.patch(call_to_patch) as create_connection: - rpc.create_consumers(dispatcher, 'foo', [('topic', 'op', 'node1')]) + rpc.create_consumers(endpoints, 'foo', [('topic', 'op', 'node1')]) create_connection.assert_has_calls(expected)