Merge "[zmq] RPC timeout for CAST"
This commit is contained in:
@@ -98,7 +98,6 @@ class DealerPublisherLight(object):
|
||||
"addr": self.address})
|
||||
|
||||
def cleanup(self):
|
||||
self.socket.setsockopt(zmq.LINGER, 0)
|
||||
self.socket.close()
|
||||
|
||||
|
||||
|
||||
@@ -85,7 +85,6 @@ class PubPublisherProxy(object):
|
||||
def cleanup(self):
|
||||
self.matchmaker.unregister_publisher(
|
||||
(self.host, self.sync_channel.sync_host))
|
||||
self.socket.setsockopt(zmq.LINGER, 0)
|
||||
self.socket.close()
|
||||
|
||||
|
||||
|
||||
@@ -131,12 +131,14 @@ class SocketsManager(object):
|
||||
if str(target) in self.outbound_sockets:
|
||||
socket = self._check_for_new_hosts(target)
|
||||
else:
|
||||
socket = zmq_socket.ZmqSocket(self.zmq_context, self.socket_type)
|
||||
socket = zmq_socket.ZmqSocket(self.conf, self.zmq_context,
|
||||
self.socket_type)
|
||||
self._get_hosts_and_connect(socket, target)
|
||||
return socket
|
||||
|
||||
def get_socket_to_broker(self, target):
|
||||
socket = zmq_socket.ZmqSocket(self.zmq_context, self.socket_type)
|
||||
socket = zmq_socket.ZmqSocket(self.conf, self.zmq_context,
|
||||
self.socket_type)
|
||||
self._track_socket(socket, target)
|
||||
address = zmq_address.get_broker_address(self.conf)
|
||||
socket.connect_to_address(address)
|
||||
@@ -144,5 +146,4 @@ class SocketsManager(object):
|
||||
|
||||
def cleanup(self):
|
||||
for socket, tm in self.outbound_sockets.values():
|
||||
socket.setsockopt(zmq.LINGER, 0)
|
||||
socket.close()
|
||||
|
||||
@@ -49,7 +49,6 @@ class ConsumerBase(object):
|
||||
def cleanup(self):
|
||||
for socket in self.sockets:
|
||||
if not socket.handle.closed:
|
||||
socket.setsockopt(zmq.LINGER, 0)
|
||||
socket.close()
|
||||
self.sockets = []
|
||||
|
||||
|
||||
@@ -59,7 +59,7 @@ class SubConsumer(zmq_consumer_base.ConsumerBase):
|
||||
self.subscriptions = set()
|
||||
self.targets = []
|
||||
self._socket_lock = threading.Lock()
|
||||
self.socket = zmq_socket.ZmqSocket(self.context, zmq.SUB)
|
||||
self.socket = zmq_socket.ZmqSocket(self.conf, self.context, zmq.SUB)
|
||||
self.sockets.append(self.socket)
|
||||
self.id = uuid.uuid4()
|
||||
self.publishers_poller = MatchmakerPoller(
|
||||
@@ -143,8 +143,8 @@ class MatchmakerPoller(object):
|
||||
|
||||
class BackChatter(object):
|
||||
|
||||
def __init__(self, context):
|
||||
self.socket = zmq_socket.ZmqSocket(context, zmq.PUSH)
|
||||
def __init__(self, conf, context):
|
||||
self.socket = zmq_socket.ZmqSocket(conf, context, zmq.PUSH)
|
||||
|
||||
def connect(self, address):
|
||||
self.socket.connect(address)
|
||||
@@ -154,5 +154,4 @@ class BackChatter(object):
|
||||
self.socket.send(zmq_names.ACK_TYPE)
|
||||
|
||||
def close(self):
|
||||
self.socket.setsockopt(zmq.LINGER, 5)
|
||||
self.socket.close()
|
||||
|
||||
@@ -31,7 +31,8 @@ zmq = zmq_async.import_zmq()
|
||||
|
||||
class ZmqSocket(object):
|
||||
|
||||
def __init__(self, context, socket_type):
|
||||
def __init__(self, conf, context, socket_type):
|
||||
self.conf = conf
|
||||
self.context = context
|
||||
self.socket_type = socket_type
|
||||
self.handle = context.socket(socket_type)
|
||||
@@ -85,6 +86,7 @@ class ZmqSocket(object):
|
||||
return self.handle.recv_multipart(*args, **kwargs)
|
||||
|
||||
def close(self, *args, **kwargs):
|
||||
self.handle.setsockopt(zmq.LINGER, self.conf.rpc_cast_timeout * 1000)
|
||||
self.handle.close(*args, **kwargs)
|
||||
|
||||
def connect_to_address(self, address):
|
||||
@@ -118,8 +120,7 @@ class ZmqPortRangeExceededException(exceptions.MessagingException):
|
||||
class ZmqRandomPortSocket(ZmqSocket):
|
||||
|
||||
def __init__(self, conf, context, socket_type):
|
||||
super(ZmqRandomPortSocket, self).__init__(context, socket_type)
|
||||
self.conf = conf
|
||||
super(ZmqRandomPortSocket, self).__init__(conf, context, socket_type)
|
||||
self.bind_address = zmq_address.get_tcp_random_address(self.conf)
|
||||
|
||||
try:
|
||||
|
||||
Reference in New Issue
Block a user