From 1fa0e6a8fe21faea5a4ef15d6f4736f935b5be76 Mon Sep 17 00:00:00 2001 From: Victor Stinner Date: Mon, 5 Jan 2015 18:01:13 +0100 Subject: [PATCH] Port zmq driver to Python 3 With eventlet 0.16, it becomes possible to run Oslo Messaging tests on Python 3 with eventlet. This change ports the zmq driver to Python 3: * encode the topic explicitly to UTF-8 * use a list comprehension instead of map() to also get a list on Python 3 (not a generator) The following eventlet change is needed to run tests: https://github.com/eventlet/eventlet/pull/187 Related eventlet issue: https://github.com/eventlet/eventlet/issues/185 I will propose a different change to enable tests with eventlet enabled when a release of eventlet including this fix will be available. Change-Id: Ic8fec515cfa757e08ffb9604e3bfb2e87d08f3d8 --- oslo_messaging/_drivers/impl_zmq.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/oslo_messaging/_drivers/impl_zmq.py b/oslo_messaging/_drivers/impl_zmq.py index 83668d33f..678b3f5dd 100644 --- a/oslo_messaging/_drivers/impl_zmq.py +++ b/oslo_messaging/_drivers/impl_zmq.py @@ -243,14 +243,17 @@ class ZmqClient(object): msg_id = msg_id or 0 if not envelope: - self.outq.send(map(bytes, - (msg_id, topic, 'cast', _serialize(data)))) + data = _serialize(data) + if six.PY3: + data = data.encode('utf-8') + data = (msg_id, topic, b'cast', data) + self.outq.send([bytes(item) for item in data]) return rpc_envelope = rpc_common.serialize_msg(data[1]) zmq_msg = moves.reduce(lambda x, y: x + y, rpc_envelope.items()) - self.outq.send(map(bytes, - (msg_id, topic, 'impl_zmq_v2', data[0]) + zmq_msg)) + data = (msg_id, topic, b'impl_zmq_v2', data[0]) + zmq_msg + self.outq.send([bytes(item) for item in data]) def close(self): self.outq.close() @@ -453,6 +456,8 @@ class ZmqProxy(ZmqBaseReactor): data = sock.recv(copy=False) topic = data[1].bytes + if six.PY3: + topic = topic.decode('utf-8') if topic.startswith('fanout~'): sock_type = zmq.PUB @@ -658,6 +663,8 @@ def _cast(addr, context, topic, msg, timeout=None, envelope=False, allowed_remote_exmods = allowed_remote_exmods or [] timeout_cast = timeout or CONF.rpc_cast_timeout payload = [RpcContext.marshal(context), msg] + if six.PY3: + topic = topic.encode('utf-8') with Timeout(timeout_cast, exception=rpc_common.Timeout): conn = None