[zmq] Listeners management cleanup
In this change removed lazy items for servers as they were actually unused. Also removed unnecessary Fanout flag checking for notifications. Change-Id: Iec6647b26f32b4c5f07bb9d630e492ceda5b90b0
This commit is contained in:
parent
62fc6ab094
commit
3fd208ea2b
@ -174,12 +174,6 @@ class ZmqDriver(base.BaseDriver):
|
||||
self.conf.rpc_zmq_matchmaker,
|
||||
).driver(self.conf)
|
||||
|
||||
self.server = LazyDriverItem(
|
||||
zmq_server.ZmqServer, self, self.conf, self.matchmaker)
|
||||
|
||||
self.notify_server = LazyDriverItem(
|
||||
zmq_server.ZmqServer, self, self.conf, self.matchmaker)
|
||||
|
||||
self.client = LazyDriverItem(
|
||||
zmq_client.ZmqClient, self.conf, self.matchmaker,
|
||||
self.allowed_remote_exmods)
|
||||
@ -238,10 +232,7 @@ class ZmqDriver(base.BaseDriver):
|
||||
:type retry: int
|
||||
"""
|
||||
client = self.notifier.get()
|
||||
if target.fanout:
|
||||
client.send_notify_fanout(target, ctxt, message, version, retry)
|
||||
else:
|
||||
client.send_notify(target, ctxt, message, version, retry)
|
||||
client.send_notify(target, ctxt, message, version, retry)
|
||||
|
||||
def listen(self, target):
|
||||
"""Listen to a specified target on a server side
|
||||
@ -261,7 +252,7 @@ class ZmqDriver(base.BaseDriver):
|
||||
:param pool: Not used for zmq implementation
|
||||
:type pool: object
|
||||
"""
|
||||
server = self.notify_server.get()
|
||||
server = zmq_server.ZmqServer(self, self.conf, self.matchmaker)
|
||||
server.listen_notification(targets_and_priorities)
|
||||
return server
|
||||
|
||||
@ -269,6 +260,4 @@ class ZmqDriver(base.BaseDriver):
|
||||
"""Cleanup all driver's connections finally
|
||||
"""
|
||||
self.client.cleanup()
|
||||
self.server.cleanup()
|
||||
self.notify_server.cleanup()
|
||||
self.notifier.cleanup()
|
||||
|
@ -65,13 +65,6 @@ class ZmqClientBase(object):
|
||||
retry=retry)) as request:
|
||||
self.notify_publisher.send_request(request)
|
||||
|
||||
def send_notify_fanout(self, target, context, message, version,
|
||||
retry=None):
|
||||
with contextlib.closing(zmq_request.NotificationFanoutRequest(
|
||||
target, context, message, version=version,
|
||||
retry=retry)) as request:
|
||||
self.notify_publisher.send_request(request)
|
||||
|
||||
def cleanup(self):
|
||||
cleaned = set()
|
||||
for publisher in self.publishers.values():
|
||||
|
@ -54,7 +54,7 @@ class ZmqServer(base.Listener):
|
||||
return message
|
||||
|
||||
def stop(self):
|
||||
consumer = self.rpc_consumer
|
||||
consumer = self.router_consumer
|
||||
LOG.info(_LI("Stop server %(address)s:%(port)s"),
|
||||
{'address': consumer.address, 'port': consumer.port})
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user