Browse Source

Merge "Port zmq driver to Python 3"

changes/89/148889/1
Jenkins 6 years ago
committed by Gerrit Code Review
parent
commit
ae009a4ed9
1 changed files with 11 additions and 4 deletions
  1. +11
    -4
      oslo_messaging/_drivers/impl_zmq.py

+ 11
- 4
oslo_messaging/_drivers/impl_zmq.py View File

@ -243,14 +243,17 @@ class ZmqClient(object):
msg_id = msg_id or 0
if not envelope:
self.outq.send(map(bytes,
(msg_id, topic, 'cast', _serialize(data))))
data = _serialize(data)
if six.PY3:
data = data.encode('utf-8')
data = (msg_id, topic, b'cast', data)
self.outq.send([bytes(item) for item in data])
return
rpc_envelope = rpc_common.serialize_msg(data[1])
zmq_msg = moves.reduce(lambda x, y: x + y, rpc_envelope.items())
self.outq.send(map(bytes,
(msg_id, topic, 'impl_zmq_v2', data[0]) + zmq_msg))
data = (msg_id, topic, b'impl_zmq_v2', data[0]) + zmq_msg
self.outq.send([bytes(item) for item in data])
def close(self):
self.outq.close()
@ -453,6 +456,8 @@ class ZmqProxy(ZmqBaseReactor):
data = sock.recv(copy=False)
topic = data[1].bytes
if six.PY3:
topic = topic.decode('utf-8')
if topic.startswith('fanout~'):
sock_type = zmq.PUB
@ -658,6 +663,8 @@ def _cast(addr, context, topic, msg, timeout=None, envelope=False,
allowed_remote_exmods = allowed_remote_exmods or []
timeout_cast = timeout or CONF.rpc_cast_timeout
payload = [RpcContext.marshal(context), msg]
if six.PY3:
topic = topic.encode('utf-8')
with Timeout(timeout_cast, exception=rpc_common.Timeout):
conn = None


Loading…
Cancel
Save