Renamed consume_in_thread -> consume_in_threads
Now that we explicitly start multiple RPC servers, and each listener is served in a separate thread, renamed the method to reflect new behaviour. blueprint oslo-messaging Change-Id: I616f3a23e23e982e13f9b56ce417ca3623247f95
This commit is contained in:
parent
18dd65142f
commit
159bb1bf37
|
@ -51,7 +51,7 @@ def create_consumers(dispatcher, prefix, topic_details):
|
|||
connection.create_consumer(node_topic_name,
|
||||
dispatcher,
|
||||
fanout=False)
|
||||
connection.consume_in_thread()
|
||||
connection.consume_in_threads()
|
||||
return connection
|
||||
|
||||
|
||||
|
|
|
@ -123,8 +123,8 @@ class Service(service.Service):
|
|||
if callable(getattr(self.manager, 'initialize_service_hook', None)):
|
||||
self.manager.initialize_service_hook(self)
|
||||
|
||||
# Consume from all consumers in a thread
|
||||
self.conn.consume_in_thread()
|
||||
# Consume from all consumers in threads
|
||||
self.conn.consume_in_threads()
|
||||
|
||||
def stop(self):
|
||||
# Try to shut the connection down, but if we get any sort of
|
||||
|
@ -148,7 +148,7 @@ class Connection(object):
|
|||
server = n_rpc.get_server(target, proxy)
|
||||
self.servers.append(server)
|
||||
|
||||
def consume_in_thread(self):
|
||||
def consume_in_threads(self):
|
||||
for server in self.servers:
|
||||
server.start()
|
||||
return self.servers
|
||||
|
|
|
@ -509,8 +509,8 @@ class NeutronRestProxyV2(NeutronRestProxyV2Base,
|
|||
self.dispatcher = self.callbacks.create_rpc_dispatcher()
|
||||
self.conn.create_consumer(self.topic, self.dispatcher,
|
||||
fanout=False)
|
||||
# Consume from all consumers in a thread
|
||||
self.conn.consume_in_thread()
|
||||
# Consume from all consumers in threads
|
||||
self.conn.consume_in_threads()
|
||||
|
||||
def create_network(self, context, network):
|
||||
"""Create a network.
|
||||
|
|
|
@ -266,8 +266,8 @@ class BrocadePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
self.dispatcher = self.callbacks.create_rpc_dispatcher()
|
||||
for svc_topic in self.service_topics.values():
|
||||
self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
|
||||
# Consume from all consumers in a thread
|
||||
self.conn.consume_in_thread()
|
||||
# Consume from all consumers in threads
|
||||
self.conn.consume_in_threads()
|
||||
self.notifier = AgentNotifierApi(topics.AGENT)
|
||||
self.agent_notifiers[q_const.AGENT_TYPE_DHCP] = (
|
||||
dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
|
||||
|
|
|
@ -140,8 +140,8 @@ class N1kvNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
|
||||
self.dhcp_agent_notifier = dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
|
||||
self.l3_agent_notifier = l3_rpc_agent_api.L3AgentNotifyAPI()
|
||||
# Consume from all consumers in a thread
|
||||
self.conn.consume_in_thread()
|
||||
# Consume from all consumers in threads
|
||||
self.conn.consume_in_threads()
|
||||
|
||||
def _setup_vsm(self):
|
||||
"""
|
||||
|
|
|
@ -194,8 +194,8 @@ class HyperVNeutronPlugin(agents_db.AgentDbMixin,
|
|||
self.dispatcher = self.callbacks.create_rpc_dispatcher()
|
||||
for svc_topic in self.service_topics.values():
|
||||
self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
|
||||
# Consume from all consumers in a thread
|
||||
self.conn.consume_in_thread()
|
||||
# Consume from all consumers in threads
|
||||
self.conn.consume_in_threads()
|
||||
|
||||
def _parse_network_vlan_ranges(self):
|
||||
self._network_vlan_ranges = plugin_utils.parse_network_vlan_ranges(
|
||||
|
|
|
@ -144,8 +144,8 @@ class SdnvePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
self.dispatcher = self.callbacks.create_rpc_dispatcher()
|
||||
self.conn.create_consumer(self.topic, self.dispatcher,
|
||||
fanout=False)
|
||||
# Consume from all consumers in a thread
|
||||
self.conn.consume_in_thread()
|
||||
# Consume from all consumers in threads
|
||||
self.conn.consume_in_threads()
|
||||
|
||||
def _update_base_binding_dict(self, tenant_type):
|
||||
if tenant_type == constants.TENANT_TYPE_OVERLAY:
|
||||
|
|
|
@ -285,8 +285,8 @@ class LinuxBridgePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
self.dispatcher = self.callbacks.create_rpc_dispatcher()
|
||||
for svc_topic in self.service_topics.values():
|
||||
self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
|
||||
# Consume from all consumers in a thread
|
||||
self.conn.consume_in_thread()
|
||||
# Consume from all consumers in threads
|
||||
self.conn.consume_in_threads()
|
||||
self.notifier = AgentNotifierApi(topics.AGENT)
|
||||
self.agent_notifiers[q_const.AGENT_TYPE_DHCP] = (
|
||||
dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
|
||||
|
|
|
@ -386,8 +386,8 @@ class MidonetPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
self.dispatcher = self.callbacks.create_rpc_dispatcher()
|
||||
self.conn.create_consumer(self.topic, self.dispatcher,
|
||||
fanout=False)
|
||||
# Consume from all consumers in a thread
|
||||
self.conn.consume_in_thread()
|
||||
# Consume from all consumers in threads
|
||||
self.conn.consume_in_threads()
|
||||
|
||||
def create_subnet(self, context, subnet):
|
||||
"""Create Neutron subnet.
|
||||
|
|
|
@ -132,7 +132,7 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
self.dispatcher = self.callbacks.create_rpc_dispatcher()
|
||||
self.conn.create_consumer(self.topic, self.dispatcher,
|
||||
fanout=False)
|
||||
return self.conn.consume_in_thread()
|
||||
return self.conn.consume_in_threads()
|
||||
|
||||
def _process_provider_segment(self, segment):
|
||||
network_type = self._get_attribute(segment, provider.NETWORK_TYPE)
|
||||
|
|
|
@ -124,8 +124,8 @@ class MellanoxEswitchPlugin(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
self.dispatcher = self.callbacks.create_rpc_dispatcher()
|
||||
for svc_topic in self.service_topics.values():
|
||||
self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
|
||||
# Consume from all consumers in a thread
|
||||
self.conn.consume_in_thread()
|
||||
# Consume from all consumers in threads
|
||||
self.conn.consume_in_threads()
|
||||
self.notifier = agent_notify_api.AgentNotifierApi(topics.AGENT)
|
||||
self.agent_notifiers[q_const.AGENT_TYPE_DHCP] = (
|
||||
dhcp_rpc_agent_api.DhcpAgentNotifyAPI()
|
||||
|
|
|
@ -154,8 +154,8 @@ class NECPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
agents_db.AgentExtRpcCallback()]
|
||||
for svc_topic in self.service_topics.values():
|
||||
self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
|
||||
# Consume from all consumers in a thread
|
||||
self.conn.consume_in_thread()
|
||||
# Consume from all consumers in threads
|
||||
self.conn.consume_in_threads()
|
||||
|
||||
def _update_resource_status(self, context, resource, id, status):
|
||||
"""Update status of specified resource."""
|
||||
|
|
|
@ -170,8 +170,8 @@ class OneConvergencePluginV2(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
for svc_topic in self.service_topics.values():
|
||||
self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
|
||||
|
||||
# Consume from all consumers in a thread
|
||||
self.conn.consume_in_thread()
|
||||
# Consume from all consumers in threads
|
||||
self.conn.consume_in_threads()
|
||||
|
||||
def create_network(self, context, network):
|
||||
|
||||
|
|
|
@ -345,8 +345,8 @@ class OVSNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
self.dispatcher = self.callbacks.create_rpc_dispatcher()
|
||||
for svc_topic in self.service_topics.values():
|
||||
self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
|
||||
# Consume from all consumers in a thread
|
||||
self.conn.consume_in_thread()
|
||||
# Consume from all consumers in threads
|
||||
self.conn.consume_in_threads()
|
||||
|
||||
def _parse_network_vlan_ranges(self):
|
||||
try:
|
||||
|
|
|
@ -147,7 +147,7 @@ class RyuNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2,
|
|||
self.dispatcher = self.callbacks.create_rpc_dispatcher()
|
||||
for svc_topic in self.service_topics.values():
|
||||
self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False)
|
||||
self.conn.consume_in_thread()
|
||||
self.conn.consume_in_threads()
|
||||
|
||||
def _create_all_tenant_network(self):
|
||||
for net in db_api_v2.network_all_tenant_list():
|
||||
|
|
|
@ -74,7 +74,7 @@ class DhcpMetadataAccess(object):
|
|||
self.conn.create_consumer(self.topic, self.dispatcher, fanout=False)
|
||||
self.agent_notifiers[const.AGENT_TYPE_DHCP] = (
|
||||
notifier or dhcp_rpc_agent_api.DhcpAgentNotifyAPI())
|
||||
self.conn.consume_in_thread()
|
||||
self.conn.consume_in_threads()
|
||||
self.network_scheduler = importutils.import_object(
|
||||
cfg.CONF.network_scheduler_driver
|
||||
)
|
||||
|
|
|
@ -172,7 +172,7 @@ class FirewallPlugin(firewall_db.Firewall_db_mixin):
|
|||
topics.FIREWALL_PLUGIN,
|
||||
self.callbacks.create_rpc_dispatcher(),
|
||||
fanout=False)
|
||||
self.conn.consume_in_thread()
|
||||
self.conn.consume_in_threads()
|
||||
|
||||
self.agent_rpc = FirewallAgentApi(
|
||||
topics.L3_AGENT,
|
||||
|
|
|
@ -80,7 +80,7 @@ class L3RouterPlugin(db_base_plugin_v2.CommonDbMixin,
|
|||
self.dispatcher = self.callbacks.create_rpc_dispatcher()
|
||||
self.conn.create_consumer(self.topic, self.dispatcher,
|
||||
fanout=False)
|
||||
self.conn.consume_in_thread()
|
||||
self.conn.consume_in_threads()
|
||||
|
||||
def get_plugin_type(self):
|
||||
return constants.L3_ROUTER_NAT
|
||||
|
|
|
@ -348,7 +348,7 @@ class AgentDriverBase(abstract_driver.LoadBalancerAbstractDriver):
|
|||
topics.LOADBALANCER_PLUGIN,
|
||||
self.plugin.agent_callbacks.create_rpc_dispatcher(),
|
||||
fanout=False)
|
||||
self.plugin.conn.consume_in_thread()
|
||||
self.plugin.conn.consume_in_threads()
|
||||
|
||||
def get_pool_agent(self, context, pool_id):
|
||||
agent = self.plugin.get_lbaas_agent_hosting_pool(context, pool_id)
|
||||
|
|
|
@ -35,7 +35,7 @@ class MeteringPlugin(metering_db.MeteringDbMixin):
|
|||
topics.METERING_PLUGIN,
|
||||
self.callbacks.create_rpc_dispatcher(),
|
||||
fanout=False)
|
||||
self.conn.consume_in_thread()
|
||||
self.conn.consume_in_threads()
|
||||
|
||||
self.meter_rpc = metering_rpc_agent_api.MeteringAgentNotifyAPI()
|
||||
|
||||
|
|
|
@ -202,7 +202,7 @@ class CiscoCsrIPsecDriver(device_drivers.DeviceDriver):
|
|||
node_topic,
|
||||
self.create_rpc_dispatcher(),
|
||||
fanout=False)
|
||||
self.conn.consume_in_thread()
|
||||
self.conn.consume_in_threads()
|
||||
self.agent_rpc = (
|
||||
CiscoCsrIPsecVpnDriverApi(topics.CISCO_IPSEC_DRIVER_TOPIC, '1.0'))
|
||||
self.periodic_report = loopingcall.FixedIntervalLoopingCall(
|
||||
|
|
|
@ -508,7 +508,7 @@ class IPsecDriver(device_drivers.DeviceDriver):
|
|||
node_topic,
|
||||
self.create_rpc_dispatcher(),
|
||||
fanout=False)
|
||||
self.conn.consume_in_thread()
|
||||
self.conn.consume_in_threads()
|
||||
self.agent_rpc = IPsecVpnDriverApi(topics.IPSEC_DRIVER_TOPIC, '1.0')
|
||||
self.process_status_cache_check = loopingcall.FixedIntervalLoopingCall(
|
||||
self.report_status, self.context)
|
||||
|
|
|
@ -94,7 +94,7 @@ class CiscoCsrIPsecVPNDriver(service_drivers.VpnDriver):
|
|||
topics.CISCO_IPSEC_DRIVER_TOPIC,
|
||||
self.callbacks.create_rpc_dispatcher(),
|
||||
fanout=False)
|
||||
self.conn.consume_in_thread()
|
||||
self.conn.consume_in_threads()
|
||||
self.agent_rpc = CiscoCsrIPsecVpnAgentApi(
|
||||
topics.CISCO_IPSEC_AGENT_TOPIC, BASE_IPSEC_VERSION)
|
||||
|
||||
|
|
|
@ -79,7 +79,7 @@ class IPsecVPNDriver(service_drivers.VpnDriver):
|
|||
topics.IPSEC_DRIVER_TOPIC,
|
||||
self.callbacks.create_rpc_dispatcher(),
|
||||
fanout=False)
|
||||
self.conn.consume_in_thread()
|
||||
self.conn.consume_in_threads()
|
||||
self.agent_rpc = IPsecVpnAgentApi(
|
||||
topics.IPSEC_AGENT_TOPIC, BASE_IPSEC_VERSION)
|
||||
|
||||
|
|
|
@ -172,7 +172,7 @@ class BaseTestCase(testtools.TestCase):
|
|||
|
||||
# don't actually start RPC listeners when testing
|
||||
self.useFixture(fixtures.MonkeyPatch(
|
||||
'neutron.common.rpc_compat.Connection.consume_in_thread',
|
||||
'neutron.common.rpc_compat.Connection.consume_in_threads',
|
||||
fake_consume_in_threads))
|
||||
|
||||
self.useFixture(fixtures.MonkeyPatch(
|
||||
|
|
|
@ -88,7 +88,7 @@ class AgentRPCMethods(base.BaseTestCase):
|
|||
mock.call(new=True),
|
||||
mock.call().create_consumer('foo-topic-op', dispatcher,
|
||||
fanout=True),
|
||||
mock.call().consume_in_thread()
|
||||
mock.call().consume_in_threads()
|
||||
]
|
||||
|
||||
call_to_patch = 'neutron.common.rpc_compat.create_connection'
|
||||
|
@ -104,7 +104,7 @@ class AgentRPCMethods(base.BaseTestCase):
|
|||
fanout=True),
|
||||
mock.call().create_consumer('foo-topic-op.node1', dispatcher,
|
||||
fanout=False),
|
||||
mock.call().consume_in_thread()
|
||||
mock.call().consume_in_threads()
|
||||
]
|
||||
|
||||
call_to_patch = 'neutron.common.rpc_compat.create_connection'
|
||||
|
|
Loading…
Reference in New Issue