diff --git a/bin/oslo-zmq-receiver b/bin/oslo-zmq-receiver index 4ef24b6d0..00f4a85e0 100755 --- a/bin/oslo-zmq-receiver +++ b/bin/oslo-zmq-receiver @@ -47,28 +47,7 @@ CONF(sys.argv[1:], project='oslo') def main(): logging.setup("oslo") - ipc_dir = CONF.rpc_zmq_ipc_dir - - # Create the necessary directories/files for this service. - if not os.path.isdir(ipc_dir): - try: - utils.execute('mkdir', '-p', ipc_dir, run_as_root=True) - utils.execute('chown', "%s:%s" % (os.getuid(), os.getgid()), - ipc_dir, run_as_root=True) - utils.execute('chmod', '750', ipc_dir, run_as_root=True) - except exception.ProcessExecutionError: - logging.error(_("Could not create IPC socket directory.")) - return - with contextlib.closing(impl_zmq.ZmqProxy(CONF)) as reactor: - consume_in = "tcp://%s:%s" % \ - (CONF.rpc_zmq_bind_address, - CONF.rpc_zmq_port) - consumption_proxy = impl_zmq.InternalContext(None) - - reactor.register(consumption_proxy, - consume_in, zmq.PULL, out_bind=True) - reactor.consume_in_thread() reactor.wait() diff --git a/openstack/common/rpc/impl_zmq.py b/openstack/common/rpc/impl_zmq.py index d595212b1..4df0abab4 100644 --- a/openstack/common/rpc/impl_zmq.py +++ b/openstack/common/rpc/impl_zmq.py @@ -15,6 +15,7 @@ # under the License. import pprint +import os import socket import string import sys @@ -29,6 +30,7 @@ from openstack.common import cfg from openstack.common.gettextutils import _ from openstack.common import importutils from openstack.common import jsonutils +from openstack.common import processutils as utils from openstack.common.rpc import common as rpc_common @@ -487,6 +489,37 @@ class ZmqProxy(ZmqBaseReactor): LOG.error(_("Local per-topic backlog buffer full for topic " "%(topic)s. Dropping message.") % {'topic': topic}) + def consume_in_thread(self): + """Runs the ZmqProxy service""" + ipc_dir = CONF.rpc_zmq_ipc_dir + consume_in = "tcp://%s:%s" % \ + (CONF.rpc_zmq_bind_address, + CONF.rpc_zmq_port) + consumption_proxy = InternalContext(None) + + if not os.path.isdir(ipc_dir): + try: + utils.execute('mkdir', '-p', ipc_dir, run_as_root=True) + utils.execute('chown', "%s:%s" % (os.getuid(), os.getgid()), + ipc_dir, run_as_root=True) + utils.execute('chmod', '750', ipc_dir, run_as_root=True) + except utils.ProcessExecutionError: + LOG.error(_("Could not create IPC directory %s") % + (ipc_dir, )) + raise + + try: + self.register(consumption_proxy, + consume_in, + zmq.PULL, + out_bind=True) + except zmq.ZMQError: + LOG.error(_("Could not create ZeroMQ receiver daemon. " + "Socket may already be in use.")) + raise + + super(ZmqProxy, self).consume_in_thread() + class ZmqReactor(ZmqBaseReactor): """ diff --git a/tests/unit/rpc/test_zmq.py b/tests/unit/rpc/test_zmq.py index d43c3846a..d120bb20b 100644 --- a/tests/unit/rpc/test_zmq.py +++ b/tests/unit/rpc/test_zmq.py @@ -75,7 +75,8 @@ class _RpcZmqBaseTestCase(common.BaseRpcTestCase): self.config(rpc_zmq_ipc_dir=internal_ipc_dir) LOG.info(_("Running internal zmq receiver.")) - self.setup_receiver(internal_ipc_dir) + reactor = impl_zmq.ZmqProxy(FLAGS) + reactor.consume_in_thread() else: LOG.warning(_("Detected zmq-receiver socket.")) LOG.warning(_("Assuming nova-rpc-zmq-receiver is running.")) @@ -84,32 +85,6 @@ class _RpcZmqBaseTestCase(common.BaseRpcTestCase): super(_RpcZmqBaseTestCase, self).setUp( topic=topic, topic_nested=topic_nested) - def setup_receiver(self, ipc_dir): - # Only launch the receiver if it isn't running independently. - # This is checked again, with the (possibly) new ipc_dir. - if not os.path.isdir(ipc_dir): - try: - os.mkdir(ipc_dir) - except OSError: - LOG.error(_("Could not create IPC directory %s") % (ipc_dir, )) - raise - try: - self.reactor = impl_zmq.ZmqProxy(FLAGS) - consume_in = "tcp://%s:%s" % \ - (FLAGS.rpc_zmq_bind_address, - FLAGS.rpc_zmq_port) - consumption_proxy = impl_zmq.InternalContext(None) - - self.reactor.register(consumption_proxy, - consume_in, - zmq.PULL, - out_bind=True) - self.reactor.consume_in_thread() - except zmq.ZMQError: - LOG.error(_("Could not create ZeroMQ receiver daemon. " - "Socket may already be in use.")) - raise - @testutils.skip_if(not impl_zmq, "ZeroMQ library required") def tearDown(self): if self.reactor: