Renamed consume_in_thread -> consume_in_threads
Now that we explicitly start multiple RPC servers, and each listener is served in a separate thread, renamed the method to reflect new behaviour. blueprint oslo-messaging Change-Id: I616f3a23e23e982e13f9b56ce417ca3623247f95
This commit is contained in:
		@@ -51,7 +51,7 @@ def create_consumers(dispatcher, prefix, topic_details):
 | 
			
		||||
            connection.create_consumer(node_topic_name,
 | 
			
		||||
                                       dispatcher,
 | 
			
		||||
                                       fanout=False)
 | 
			
		||||
    connection.consume_in_thread()
 | 
			
		||||
    connection.consume_in_threads()
 | 
			
		||||
    return connection
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -123,8 +123,8 @@ class Service(service.Service):
 | 
			
		||||
        if callable(getattr(self.manager, 'initialize_service_hook', None)):
 | 
			
		||||
            self.manager.initialize_service_hook(self)
 | 
			
		||||
 | 
			
		||||
        # Consume from all consumers in a thread
 | 
			
		||||
        self.conn.consume_in_thread()
 | 
			
		||||
        # Consume from all consumers in threads
 | 
			
		||||
        self.conn.consume_in_threads()
 | 
			
		||||
 | 
			
		||||
    def stop(self):
 | 
			
		||||
        # Try to shut the connection down, but if we get any sort of
 | 
			
		||||
@@ -148,7 +148,7 @@ class Connection(object):
 | 
			
		||||
        server = n_rpc.get_server(target, proxy)
 | 
			
		||||
        self.servers.append(server)
 | 
			
		||||
 | 
			
		||||
    def consume_in_thread(self):
 | 
			
		||||
    def consume_in_threads(self):
 | 
			
		||||
        for server in self.servers:
 | 
			
		||||
            server.start()
 | 
			
		||||
        return self.servers
 | 
			
		||||
 
 | 
			
		||||
@@ -509,8 +509,8 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
 | 
			
		||||
        self.dispatcher = self.callbacks.create_rpc_dispatcher()
 | 
			
		||||
        self.conn.create_consumer(self.topic, self.dispatcher,
 | 
			
		||||
                                  fanout=False)
 | 
			
		||||
        # Consume from all consumers in a thread
 | 
			
		||||
        self.conn.consume_in_thread()
 | 
			
		||||
        # Consume from all consumers in threads
 | 
			
		||||
        self.conn.consume_in_threads()
 | 
			
		||||
 | 
			
		||||
    def create_network(self, context, network):
 | 
			
		||||
        """Create a network.
 | 
			
		||||
 
 | 
			
		||||
@@ -266,8 +266,8 @@ class BrocadePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
 | 
			
		||||
        self.dispatcher = self.callbacks.create_rpc_dispatcher()
 | 
			
		||||
        for svc_topic in self.service_topics.values():
 | 
			
		||||
            self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
 | 
			
		||||
        # Consume from all consumers in a thread
 | 
			
		||||
        self.conn.consume_in_thread()
 | 
			
		||||
        # Consume from all consumers in threads
 | 
			
		||||
        self.conn.consume_in_threads()
 | 
			
		||||
        self.notifier = AgentNotifierApi(topics.AGENT)
 | 
			
		||||
        self.agent_notifiers[q_const.AGENT_TYPE_DHCP] = (
 | 
			
		||||
            dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
 | 
			
		||||
 
 | 
			
		||||
@@ -140,8 +140,8 @@ class N1kvNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
 | 
			
		||||
            self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
 | 
			
		||||
        self.dhcp_agent_notifier = dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
 | 
			
		||||
        self.l3_agent_notifier = l3_rpc_agent_api.L3AgentNotifyAPI()
 | 
			
		||||
        # Consume from all consumers in a thread
 | 
			
		||||
        self.conn.consume_in_thread()
 | 
			
		||||
        # Consume from all consumers in threads
 | 
			
		||||
        self.conn.consume_in_threads()
 | 
			
		||||
 | 
			
		||||
    def _setup_vsm(self):
 | 
			
		||||
        """
 | 
			
		||||
 
 | 
			
		||||
@@ -194,8 +194,8 @@ class HyperVNeutronPlugin(agents_db.AgentDbMixin,
 | 
			
		||||
        self.dispatcher = self.callbacks.create_rpc_dispatcher()
 | 
			
		||||
        for svc_topic in self.service_topics.values():
 | 
			
		||||
            self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
 | 
			
		||||
        # Consume from all consumers in a thread
 | 
			
		||||
        self.conn.consume_in_thread()
 | 
			
		||||
        # Consume from all consumers in threads
 | 
			
		||||
        self.conn.consume_in_threads()
 | 
			
		||||
 | 
			
		||||
    def _parse_network_vlan_ranges(self):
 | 
			
		||||
        self._network_vlan_ranges = plugin_utils.parse_network_vlan_ranges(
 | 
			
		||||
 
 | 
			
		||||
@@ -144,8 +144,8 @@ class SdnvePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
 | 
			
		||||
        self.dispatcher = self.callbacks.create_rpc_dispatcher()
 | 
			
		||||
        self.conn.create_consumer(self.topic, self.dispatcher,
 | 
			
		||||
                                  fanout=False)
 | 
			
		||||
        # Consume from all consumers in a thread
 | 
			
		||||
        self.conn.consume_in_thread()
 | 
			
		||||
        # Consume from all consumers in threads
 | 
			
		||||
        self.conn.consume_in_threads()
 | 
			
		||||
 | 
			
		||||
    def _update_base_binding_dict(self, tenant_type):
 | 
			
		||||
        if tenant_type == constants.TENANT_TYPE_OVERLAY:
 | 
			
		||||
 
 | 
			
		||||
@@ -285,8 +285,8 @@ class LinuxBridgePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
 | 
			
		||||
        self.dispatcher = self.callbacks.create_rpc_dispatcher()
 | 
			
		||||
        for svc_topic in self.service_topics.values():
 | 
			
		||||
            self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
 | 
			
		||||
        # Consume from all consumers in a thread
 | 
			
		||||
        self.conn.consume_in_thread()
 | 
			
		||||
        # Consume from all consumers in threads
 | 
			
		||||
        self.conn.consume_in_threads()
 | 
			
		||||
        self.notifier = AgentNotifierApi(topics.AGENT)
 | 
			
		||||
        self.agent_notifiers[q_const.AGENT_TYPE_DHCP] = (
 | 
			
		||||
            dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
 | 
			
		||||
 
 | 
			
		||||
@@ -386,8 +386,8 @@ class MidonetPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
 | 
			
		||||
        self.dispatcher = self.callbacks.create_rpc_dispatcher()
 | 
			
		||||
        self.conn.create_consumer(self.topic, self.dispatcher,
 | 
			
		||||
                                  fanout=False)
 | 
			
		||||
        # Consume from all consumers in a thread
 | 
			
		||||
        self.conn.consume_in_thread()
 | 
			
		||||
        # Consume from all consumers in threads
 | 
			
		||||
        self.conn.consume_in_threads()
 | 
			
		||||
 | 
			
		||||
    def create_subnet(self, context, subnet):
 | 
			
		||||
        """Create Neutron subnet.
 | 
			
		||||
 
 | 
			
		||||
@@ -132,7 +132,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
 | 
			
		||||
        self.dispatcher = self.callbacks.create_rpc_dispatcher()
 | 
			
		||||
        self.conn.create_consumer(self.topic, self.dispatcher,
 | 
			
		||||
                                  fanout=False)
 | 
			
		||||
        return self.conn.consume_in_thread()
 | 
			
		||||
        return self.conn.consume_in_threads()
 | 
			
		||||
 | 
			
		||||
    def _process_provider_segment(self, segment):
 | 
			
		||||
        network_type = self._get_attribute(segment, provider.NETWORK_TYPE)
 | 
			
		||||
 
 | 
			
		||||
@@ -124,8 +124,8 @@ class MellanoxEswitchPlugin(db_base_plugin_v2.NeutronDbPluginV2,
 | 
			
		||||
        self.dispatcher = self.callbacks.create_rpc_dispatcher()
 | 
			
		||||
        for svc_topic in self.service_topics.values():
 | 
			
		||||
            self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
 | 
			
		||||
        # Consume from all consumers in a thread
 | 
			
		||||
        self.conn.consume_in_thread()
 | 
			
		||||
        # Consume from all consumers in threads
 | 
			
		||||
        self.conn.consume_in_threads()
 | 
			
		||||
        self.notifier = agent_notify_api.AgentNotifierApi(topics.AGENT)
 | 
			
		||||
        self.agent_notifiers[q_const.AGENT_TYPE_DHCP] = (
 | 
			
		||||
            dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
 | 
			
		||||
 
 | 
			
		||||
@@ -154,8 +154,8 @@ class NECPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
 | 
			
		||||
            agents_db.AgentExtRpcCallback()]
 | 
			
		||||
        for svc_topic in self.service_topics.values():
 | 
			
		||||
            self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
 | 
			
		||||
        # Consume from all consumers in a thread
 | 
			
		||||
        self.conn.consume_in_thread()
 | 
			
		||||
        # Consume from all consumers in threads
 | 
			
		||||
        self.conn.consume_in_threads()
 | 
			
		||||
 | 
			
		||||
    def _update_resource_status(self, context, resource, id, status):
 | 
			
		||||
        """Update status of specified resource."""
 | 
			
		||||
 
 | 
			
		||||
@@ -170,8 +170,8 @@ class OneConvergencePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
 | 
			
		||||
        for svc_topic in self.service_topics.values():
 | 
			
		||||
            self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
 | 
			
		||||
 | 
			
		||||
        # Consume from all consumers in a thread
 | 
			
		||||
        self.conn.consume_in_thread()
 | 
			
		||||
        # Consume from all consumers in threads
 | 
			
		||||
        self.conn.consume_in_threads()
 | 
			
		||||
 | 
			
		||||
    def create_network(self, context, network):
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -345,8 +345,8 @@ class OVSNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
 | 
			
		||||
        self.dispatcher = self.callbacks.create_rpc_dispatcher()
 | 
			
		||||
        for svc_topic in self.service_topics.values():
 | 
			
		||||
            self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
 | 
			
		||||
        # Consume from all consumers in a thread
 | 
			
		||||
        self.conn.consume_in_thread()
 | 
			
		||||
        # Consume from all consumers in threads
 | 
			
		||||
        self.conn.consume_in_threads()
 | 
			
		||||
 | 
			
		||||
    def _parse_network_vlan_ranges(self):
 | 
			
		||||
        try:
 | 
			
		||||
 
 | 
			
		||||
@@ -147,7 +147,7 @@ class RyuNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
 | 
			
		||||
        self.dispatcher = self.callbacks.create_rpc_dispatcher()
 | 
			
		||||
        for svc_topic in self.service_topics.values():
 | 
			
		||||
            self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
 | 
			
		||||
        self.conn.consume_in_thread()
 | 
			
		||||
        self.conn.consume_in_threads()
 | 
			
		||||
 | 
			
		||||
    def _create_all_tenant_network(self):
 | 
			
		||||
        for net in db_api_v2.network_all_tenant_list():
 | 
			
		||||
 
 | 
			
		||||
@@ -74,7 +74,7 @@ class DhcpMetadataAccess(object):
 | 
			
		||||
        self.conn.create_consumer(self.topic, self.dispatcher, fanout=False)
 | 
			
		||||
        self.agent_notifiers[const.AGENT_TYPE_DHCP] = (
 | 
			
		||||
            notifier or dhcp_rpc_agent_api.DhcpAgentNotifyAPI())
 | 
			
		||||
        self.conn.consume_in_thread()
 | 
			
		||||
        self.conn.consume_in_threads()
 | 
			
		||||
        self.network_scheduler = importutils.import_object(
 | 
			
		||||
            cfg.CONF.network_scheduler_driver
 | 
			
		||||
        )
 | 
			
		||||
 
 | 
			
		||||
@@ -172,7 +172,7 @@ class FirewallPlugin(firewall_db.Firewall_db_mixin):
 | 
			
		||||
            topics.FIREWALL_PLUGIN,
 | 
			
		||||
            self.callbacks.create_rpc_dispatcher(),
 | 
			
		||||
            fanout=False)
 | 
			
		||||
        self.conn.consume_in_thread()
 | 
			
		||||
        self.conn.consume_in_threads()
 | 
			
		||||
 | 
			
		||||
        self.agent_rpc = FirewallAgentApi(
 | 
			
		||||
            topics.L3_AGENT,
 | 
			
		||||
 
 | 
			
		||||
@@ -80,7 +80,7 @@ class L3RouterPlugin(db_base_plugin_v2.CommonDbMixin,
 | 
			
		||||
        self.dispatcher = self.callbacks.create_rpc_dispatcher()
 | 
			
		||||
        self.conn.create_consumer(self.topic, self.dispatcher,
 | 
			
		||||
                                  fanout=False)
 | 
			
		||||
        self.conn.consume_in_thread()
 | 
			
		||||
        self.conn.consume_in_threads()
 | 
			
		||||
 | 
			
		||||
    def get_plugin_type(self):
 | 
			
		||||
        return constants.L3_ROUTER_NAT
 | 
			
		||||
 
 | 
			
		||||
@@ -348,7 +348,7 @@ class AgentDriverBase(abstract_driver.LoadBalancerAbstractDriver):
 | 
			
		||||
            topics.LOADBALANCER_PLUGIN,
 | 
			
		||||
            self.plugin.agent_callbacks.create_rpc_dispatcher(),
 | 
			
		||||
            fanout=False)
 | 
			
		||||
        self.plugin.conn.consume_in_thread()
 | 
			
		||||
        self.plugin.conn.consume_in_threads()
 | 
			
		||||
 | 
			
		||||
    def get_pool_agent(self, context, pool_id):
 | 
			
		||||
        agent = self.plugin.get_lbaas_agent_hosting_pool(context, pool_id)
 | 
			
		||||
 
 | 
			
		||||
@@ -35,7 +35,7 @@ class MeteringPlugin(metering_db.MeteringDbMixin):
 | 
			
		||||
            topics.METERING_PLUGIN,
 | 
			
		||||
            self.callbacks.create_rpc_dispatcher(),
 | 
			
		||||
            fanout=False)
 | 
			
		||||
        self.conn.consume_in_thread()
 | 
			
		||||
        self.conn.consume_in_threads()
 | 
			
		||||
 | 
			
		||||
        self.meter_rpc = metering_rpc_agent_api.MeteringAgentNotifyAPI()
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -202,7 +202,7 @@ class CiscoCsrIPsecDriver(device_drivers.DeviceDriver):
 | 
			
		||||
            node_topic,
 | 
			
		||||
            self.create_rpc_dispatcher(),
 | 
			
		||||
            fanout=False)
 | 
			
		||||
        self.conn.consume_in_thread()
 | 
			
		||||
        self.conn.consume_in_threads()
 | 
			
		||||
        self.agent_rpc = (
 | 
			
		||||
            CiscoCsrIPsecVpnDriverApi(topics.CISCO_IPSEC_DRIVER_TOPIC, '1.0'))
 | 
			
		||||
        self.periodic_report = loopingcall.FixedIntervalLoopingCall(
 | 
			
		||||
 
 | 
			
		||||
@@ -508,7 +508,7 @@ class IPsecDriver(device_drivers.DeviceDriver):
 | 
			
		||||
            node_topic,
 | 
			
		||||
            self.create_rpc_dispatcher(),
 | 
			
		||||
            fanout=False)
 | 
			
		||||
        self.conn.consume_in_thread()
 | 
			
		||||
        self.conn.consume_in_threads()
 | 
			
		||||
        self.agent_rpc = IPsecVpnDriverApi(topics.IPSEC_DRIVER_TOPIC, '1.0')
 | 
			
		||||
        self.process_status_cache_check = loopingcall.FixedIntervalLoopingCall(
 | 
			
		||||
            self.report_status, self.context)
 | 
			
		||||
 
 | 
			
		||||
@@ -94,7 +94,7 @@ class CiscoCsrIPsecVPNDriver(service_drivers.VpnDriver):
 | 
			
		||||
            topics.CISCO_IPSEC_DRIVER_TOPIC,
 | 
			
		||||
            self.callbacks.create_rpc_dispatcher(),
 | 
			
		||||
            fanout=False)
 | 
			
		||||
        self.conn.consume_in_thread()
 | 
			
		||||
        self.conn.consume_in_threads()
 | 
			
		||||
        self.agent_rpc = CiscoCsrIPsecVpnAgentApi(
 | 
			
		||||
            topics.CISCO_IPSEC_AGENT_TOPIC, BASE_IPSEC_VERSION)
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -79,7 +79,7 @@ class IPsecVPNDriver(service_drivers.VpnDriver):
 | 
			
		||||
            topics.IPSEC_DRIVER_TOPIC,
 | 
			
		||||
            self.callbacks.create_rpc_dispatcher(),
 | 
			
		||||
            fanout=False)
 | 
			
		||||
        self.conn.consume_in_thread()
 | 
			
		||||
        self.conn.consume_in_threads()
 | 
			
		||||
        self.agent_rpc = IPsecVpnAgentApi(
 | 
			
		||||
            topics.IPSEC_AGENT_TOPIC, BASE_IPSEC_VERSION)
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -172,7 +172,7 @@ class BaseTestCase(testtools.TestCase):
 | 
			
		||||
 | 
			
		||||
        # don't actually start RPC listeners when testing
 | 
			
		||||
        self.useFixture(fixtures.MonkeyPatch(
 | 
			
		||||
            'neutron.common.rpc_compat.Connection.consume_in_thread',
 | 
			
		||||
            'neutron.common.rpc_compat.Connection.consume_in_threads',
 | 
			
		||||
            fake_consume_in_threads))
 | 
			
		||||
 | 
			
		||||
        self.useFixture(fixtures.MonkeyPatch(
 | 
			
		||||
 
 | 
			
		||||
@@ -88,7 +88,7 @@ class AgentRPCMethods(base.BaseTestCase):
 | 
			
		||||
            mock.call(new=True),
 | 
			
		||||
            mock.call().create_consumer('foo-topic-op', dispatcher,
 | 
			
		||||
                                        fanout=True),
 | 
			
		||||
            mock.call().consume_in_thread()
 | 
			
		||||
            mock.call().consume_in_threads()
 | 
			
		||||
        ]
 | 
			
		||||
 | 
			
		||||
        call_to_patch = 'neutron.common.rpc_compat.create_connection'
 | 
			
		||||
@@ -104,7 +104,7 @@ class AgentRPCMethods(base.BaseTestCase):
 | 
			
		||||
                                        fanout=True),
 | 
			
		||||
            mock.call().create_consumer('foo-topic-op.node1', dispatcher,
 | 
			
		||||
                                        fanout=False),
 | 
			
		||||
            mock.call().consume_in_thread()
 | 
			
		||||
            mock.call().consume_in_threads()
 | 
			
		||||
        ]
 | 
			
		||||
 | 
			
		||||
        call_to_patch = 'neutron.common.rpc_compat.create_connection'
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user