[zmq] Second router proxy doesn't dispatch messages properly
Fixed consumers identity generation, so that single identity to be generated per socket object. Change-Id: Ibbc004fed87ac47430fcad0a3782cdaf27e4c926 Closes-Bug: #1578867
This commit is contained in:
parent
90e7b26016
commit
e65539bb70
@ -78,7 +78,7 @@ class UniversalQueueProxy(object):
|
|||||||
|
|
||||||
def _redirect_message(self, multipart_message):
|
def _redirect_message(self, multipart_message):
|
||||||
envelope = multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE]
|
envelope = multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE]
|
||||||
LOG.debug("<-> Route message: %s", envelope)
|
LOG.debug("<-> Dispatch message: %s", envelope)
|
||||||
response_binary = multipart_message[zmq_names.MULTIPART_IDX_BODY]
|
response_binary = multipart_message[zmq_names.MULTIPART_IDX_BODY]
|
||||||
|
|
||||||
self.router_socket.send(envelope.routing_key, zmq.SNDMORE)
|
self.router_socket.send(envelope.routing_key, zmq.SNDMORE)
|
||||||
|
@ -42,7 +42,7 @@ class ZmqSocket(object):
|
|||||||
if self.conf.rpc_cast_timeout > 0:
|
if self.conf.rpc_cast_timeout > 0:
|
||||||
self.close_linger = self.conf.rpc_cast_timeout * 1000
|
self.close_linger = self.conf.rpc_cast_timeout * 1000
|
||||||
self.handle.setsockopt(zmq.LINGER, self.close_linger)
|
self.handle.setsockopt(zmq.LINGER, self.close_linger)
|
||||||
|
self.handle.identity = six.b(str(uuid.uuid4()))
|
||||||
self.connections = set()
|
self.connections = set()
|
||||||
|
|
||||||
def type_name(self):
|
def type_name(self):
|
||||||
@ -98,14 +98,10 @@ class ZmqSocket(object):
|
|||||||
def connect_to_address(self, address):
|
def connect_to_address(self, address):
|
||||||
stype = zmq_names.socket_type_str(self.socket_type)
|
stype = zmq_names.socket_type_str(self.socket_type)
|
||||||
try:
|
try:
|
||||||
LOG.info(_LI("Connecting %(stype)s to %(address)s"),
|
LOG.info(_LI("Connecting %(stype)s id %(id)s to %(address)s"),
|
||||||
{"stype": stype, "address": address})
|
{"stype": stype,
|
||||||
|
"id": self.handle.identity,
|
||||||
if six.PY3:
|
"address": address})
|
||||||
self.setsockopt_string(zmq.IDENTITY, str(uuid.uuid1()))
|
|
||||||
else:
|
|
||||||
self.handle.identity = str(uuid.uuid1())
|
|
||||||
|
|
||||||
self.connect(address)
|
self.connect(address)
|
||||||
except zmq.ZMQError as e:
|
except zmq.ZMQError as e:
|
||||||
errmsg = _LE("Failed connecting %(stype) to %(address)s: %(e)s")\
|
errmsg = _LE("Failed connecting %(stype) to %(address)s: %(e)s")\
|
||||||
|
Loading…
Reference in New Issue
Block a user