diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher.py index 85dc459a..fb7a8f3d 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/dealer/zmq_dealer_publisher.py @@ -98,7 +98,6 @@ class DealerPublisherLight(object): "addr": self.address}) def cleanup(self): - self.socket.setsockopt(zmq.LINGER, 0) self.socket.close() diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_pub_publisher.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_pub_publisher.py index 7dc4b239..7d1f6b95 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_pub_publisher.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_pub_publisher.py @@ -85,7 +85,6 @@ class PubPublisherProxy(object): def cleanup(self): self.matchmaker.unregister_publisher( (self.host, self.sync_channel.sync_host)) - self.socket.setsockopt(zmq.LINGER, 0) self.socket.close() diff --git a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py index e08905a6..a58bd5fc 100644 --- a/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py +++ b/oslo_messaging/_drivers/zmq_driver/client/publishers/zmq_publisher_base.py @@ -131,12 +131,14 @@ class SocketsManager(object): if str(target) in self.outbound_sockets: socket = self._check_for_new_hosts(target) else: - socket = zmq_socket.ZmqSocket(self.zmq_context, self.socket_type) + socket = zmq_socket.ZmqSocket(self.conf, self.zmq_context, + self.socket_type) self._get_hosts_and_connect(socket, target) return socket def get_socket_to_broker(self, target): - socket = zmq_socket.ZmqSocket(self.zmq_context, self.socket_type) + socket = zmq_socket.ZmqSocket(self.conf, self.zmq_context, + self.socket_type) self._track_socket(socket, target) address = zmq_address.get_broker_address(self.conf) socket.connect_to_address(address) @@ -144,5 +146,4 @@ class SocketsManager(object): def cleanup(self): for socket, tm in self.outbound_sockets.values(): - socket.setsockopt(zmq.LINGER, 0) socket.close() diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py index 07936d30..88a7dd40 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_consumer_base.py @@ -49,7 +49,6 @@ class ConsumerBase(object): def cleanup(self): for socket in self.sockets: if not socket.handle.closed: - socket.setsockopt(zmq.LINGER, 0) socket.close() self.sockets = [] diff --git a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py index 36bb99cc..d892c495 100644 --- a/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py +++ b/oslo_messaging/_drivers/zmq_driver/server/consumers/zmq_sub_consumer.py @@ -59,7 +59,7 @@ class SubConsumer(zmq_consumer_base.ConsumerBase): self.subscriptions = set() self.targets = [] self._socket_lock = threading.Lock() - self.socket = zmq_socket.ZmqSocket(self.context, zmq.SUB) + self.socket = zmq_socket.ZmqSocket(self.conf, self.context, zmq.SUB) self.sockets.append(self.socket) self.id = uuid.uuid4() self.publishers_poller = MatchmakerPoller( @@ -143,8 +143,8 @@ class MatchmakerPoller(object): class BackChatter(object): - def __init__(self, context): - self.socket = zmq_socket.ZmqSocket(context, zmq.PUSH) + def __init__(self, conf, context): + self.socket = zmq_socket.ZmqSocket(conf, context, zmq.PUSH) def connect(self, address): self.socket.connect(address) @@ -154,5 +154,4 @@ class BackChatter(object): self.socket.send(zmq_names.ACK_TYPE) def close(self): - self.socket.setsockopt(zmq.LINGER, 5) self.socket.close() diff --git a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py index c1665726..8556b734 100644 --- a/oslo_messaging/_drivers/zmq_driver/zmq_socket.py +++ b/oslo_messaging/_drivers/zmq_driver/zmq_socket.py @@ -31,7 +31,8 @@ zmq = zmq_async.import_zmq() class ZmqSocket(object): - def __init__(self, context, socket_type): + def __init__(self, conf, context, socket_type): + self.conf = conf self.context = context self.socket_type = socket_type self.handle = context.socket(socket_type) @@ -85,6 +86,7 @@ class ZmqSocket(object): return self.handle.recv_multipart(*args, **kwargs) def close(self, *args, **kwargs): + self.handle.setsockopt(zmq.LINGER, self.conf.rpc_cast_timeout * 1000) self.handle.close(*args, **kwargs) def connect_to_address(self, address): @@ -118,8 +120,7 @@ class ZmqPortRangeExceededException(exceptions.MessagingException): class ZmqRandomPortSocket(ZmqSocket): def __init__(self, conf, context, socket_type): - super(ZmqRandomPortSocket, self).__init__(context, socket_type) - self.conf = conf + super(ZmqRandomPortSocket, self).__init__(conf, context, socket_type) self.bind_address = zmq_address.get_tcp_random_address(self.conf) try: