reusable ZmqProxy service initialization
Move near-identical proxy setup code from oslo-zmq-receiver and test_zmq into ZmqProxy's consume_in_thread method Provides code reuse and simplifies the tests and receiver binary. Change-Id: I1324eacfa2a456599d5fd462b6476ddf659e95c4
This commit is contained in:
parent
d74668fbed
commit
8a187ecc01
@ -47,28 +47,7 @@ CONF(sys.argv[1:], project='oslo')
|
||||
def main():
|
||||
logging.setup("oslo")
|
||||
|
||||
ipc_dir = CONF.rpc_zmq_ipc_dir
|
||||
|
||||
# Create the necessary directories/files for this service.
|
||||
if not os.path.isdir(ipc_dir):
|
||||
try:
|
||||
utils.execute('mkdir', '-p', ipc_dir, run_as_root=True)
|
||||
utils.execute('chown', "%s:%s" % (os.getuid(), os.getgid()),
|
||||
ipc_dir, run_as_root=True)
|
||||
utils.execute('chmod', '750', ipc_dir, run_as_root=True)
|
||||
except exception.ProcessExecutionError:
|
||||
logging.error(_("Could not create IPC socket directory."))
|
||||
return
|
||||
|
||||
with contextlib.closing(impl_zmq.ZmqProxy(CONF)) as reactor:
|
||||
consume_in = "tcp://%s:%s" % \
|
||||
(CONF.rpc_zmq_bind_address,
|
||||
CONF.rpc_zmq_port)
|
||||
consumption_proxy = impl_zmq.InternalContext(None)
|
||||
|
||||
reactor.register(consumption_proxy,
|
||||
consume_in, zmq.PULL, out_bind=True)
|
||||
|
||||
reactor.consume_in_thread()
|
||||
reactor.wait()
|
||||
|
||||
|
@ -15,6 +15,7 @@
|
||||
# under the License.
|
||||
|
||||
import pprint
|
||||
import os
|
||||
import socket
|
||||
import string
|
||||
import sys
|
||||
@ -29,6 +30,7 @@ from openstack.common import cfg
|
||||
from openstack.common.gettextutils import _
|
||||
from openstack.common import importutils
|
||||
from openstack.common import jsonutils
|
||||
from openstack.common import processutils as utils
|
||||
from openstack.common.rpc import common as rpc_common
|
||||
|
||||
|
||||
@ -487,6 +489,37 @@ class ZmqProxy(ZmqBaseReactor):
|
||||
LOG.error(_("Local per-topic backlog buffer full for topic "
|
||||
"%(topic)s. Dropping message.") % {'topic': topic})
|
||||
|
||||
def consume_in_thread(self):
|
||||
"""Runs the ZmqProxy service"""
|
||||
ipc_dir = CONF.rpc_zmq_ipc_dir
|
||||
consume_in = "tcp://%s:%s" % \
|
||||
(CONF.rpc_zmq_bind_address,
|
||||
CONF.rpc_zmq_port)
|
||||
consumption_proxy = InternalContext(None)
|
||||
|
||||
if not os.path.isdir(ipc_dir):
|
||||
try:
|
||||
utils.execute('mkdir', '-p', ipc_dir, run_as_root=True)
|
||||
utils.execute('chown', "%s:%s" % (os.getuid(), os.getgid()),
|
||||
ipc_dir, run_as_root=True)
|
||||
utils.execute('chmod', '750', ipc_dir, run_as_root=True)
|
||||
except utils.ProcessExecutionError:
|
||||
LOG.error(_("Could not create IPC directory %s") %
|
||||
(ipc_dir, ))
|
||||
raise
|
||||
|
||||
try:
|
||||
self.register(consumption_proxy,
|
||||
consume_in,
|
||||
zmq.PULL,
|
||||
out_bind=True)
|
||||
except zmq.ZMQError:
|
||||
LOG.error(_("Could not create ZeroMQ receiver daemon. "
|
||||
"Socket may already be in use."))
|
||||
raise
|
||||
|
||||
super(ZmqProxy, self).consume_in_thread()
|
||||
|
||||
|
||||
class ZmqReactor(ZmqBaseReactor):
|
||||
"""
|
||||
|
@ -75,7 +75,8 @@ class _RpcZmqBaseTestCase(common.BaseRpcTestCase):
|
||||
self.config(rpc_zmq_ipc_dir=internal_ipc_dir)
|
||||
|
||||
LOG.info(_("Running internal zmq receiver."))
|
||||
self.setup_receiver(internal_ipc_dir)
|
||||
reactor = impl_zmq.ZmqProxy(FLAGS)
|
||||
reactor.consume_in_thread()
|
||||
else:
|
||||
LOG.warning(_("Detected zmq-receiver socket."))
|
||||
LOG.warning(_("Assuming nova-rpc-zmq-receiver is running."))
|
||||
@ -84,32 +85,6 @@ class _RpcZmqBaseTestCase(common.BaseRpcTestCase):
|
||||
super(_RpcZmqBaseTestCase, self).setUp(
|
||||
topic=topic, topic_nested=topic_nested)
|
||||
|
||||
def setup_receiver(self, ipc_dir):
|
||||
# Only launch the receiver if it isn't running independently.
|
||||
# This is checked again, with the (possibly) new ipc_dir.
|
||||
if not os.path.isdir(ipc_dir):
|
||||
try:
|
||||
os.mkdir(ipc_dir)
|
||||
except OSError:
|
||||
LOG.error(_("Could not create IPC directory %s") % (ipc_dir, ))
|
||||
raise
|
||||
try:
|
||||
self.reactor = impl_zmq.ZmqProxy(FLAGS)
|
||||
consume_in = "tcp://%s:%s" % \
|
||||
(FLAGS.rpc_zmq_bind_address,
|
||||
FLAGS.rpc_zmq_port)
|
||||
consumption_proxy = impl_zmq.InternalContext(None)
|
||||
|
||||
self.reactor.register(consumption_proxy,
|
||||
consume_in,
|
||||
zmq.PULL,
|
||||
out_bind=True)
|
||||
self.reactor.consume_in_thread()
|
||||
except zmq.ZMQError:
|
||||
LOG.error(_("Could not create ZeroMQ receiver daemon. "
|
||||
"Socket may already be in use."))
|
||||
raise
|
||||
|
||||
@testutils.skip_if(not impl_zmq, "ZeroMQ library required")
|
||||
def tearDown(self):
|
||||
if self.reactor:
|
||||
|
Loading…
Reference in New Issue
Block a user