Update RPC code from common
* Fix impl_zmq following common migration. A number of problems have been discovered with impl_zmq following the move to common. These problems have been resolved in openstack-common. This patch squashes those changes and brings them into Nova. * rpc/common - Removes conf argument where it was no longer necessary. * AMQP - Fix import order * Qpid - Fix heartbeat * __init__ - Fix line length, pep8 * proxy - Enable fanout_cast and fanout_cast_to_server to provide topic Change-Id: If9c4616a50d7ec6c7a7e4adad58dc2fdcfc3ffdf
This commit is contained in:
parent
91f2ee518a
commit
23c63e08e8
@ -47,7 +47,9 @@ rpc_opts = [
|
||||
help='Seconds to wait before a cast expires (TTL). '
|
||||
'Only supported by impl_zmq.'),
|
||||
cfg.ListOpt('allowed_rpc_exception_modules',
|
||||
default=['nova.openstack.common.exception', 'nova.exception'],
|
||||
default=['nova.openstack.common.exception',
|
||||
'nova.exception',
|
||||
],
|
||||
help='Modules of exceptions that are permitted to be recreated'
|
||||
'upon receiving exception data from an rpc call.'),
|
||||
cfg.StrOpt('control_exchange',
|
||||
|
@ -35,8 +35,8 @@ from eventlet import pools
|
||||
from eventlet import semaphore
|
||||
|
||||
from nova.openstack.common import excutils
|
||||
from nova.openstack.common import local
|
||||
from nova.openstack.common.gettextutils import _
|
||||
from nova.openstack.common import local
|
||||
from nova.openstack.common.rpc import common as rpc_common
|
||||
|
||||
|
||||
|
@ -108,7 +108,7 @@ class Connection(object):
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def create_consumer(self, conf, topic, proxy, fanout=False):
|
||||
def create_consumer(self, topic, proxy, fanout=False):
|
||||
"""Create a consumer on this connection.
|
||||
|
||||
A consumer is associated with a message queue on the backend message
|
||||
@ -117,7 +117,6 @@ class Connection(object):
|
||||
off of the queue will determine which method gets called on the proxy
|
||||
object.
|
||||
|
||||
:param conf: An openstack.common.cfg configuration object.
|
||||
:param topic: This is a name associated with what to consume from.
|
||||
Multiple instances of a service may consume from the same
|
||||
topic. For example, all instances of nova-compute consume
|
||||
@ -133,7 +132,7 @@ class Connection(object):
|
||||
"""
|
||||
raise NotImplementedError()
|
||||
|
||||
def create_worker(self, conf, topic, proxy, pool_name):
|
||||
def create_worker(self, topic, proxy, pool_name):
|
||||
"""Create a worker on this connection.
|
||||
|
||||
A worker is like a regular consumer of messages directed to a
|
||||
@ -143,7 +142,6 @@ class Connection(object):
|
||||
be asked to process it. Load is distributed across the members
|
||||
of the pool in round-robin fashion.
|
||||
|
||||
:param conf: An openstack.common.cfg configuration object.
|
||||
:param topic: This is a name associated with what to consume from.
|
||||
Multiple instances of a service may consume from the same
|
||||
topic.
|
||||
|
@ -329,7 +329,7 @@ class Connection(object):
|
||||
if self.conf.qpid_reconnect_interval:
|
||||
self.connection.reconnect_interval = (
|
||||
self.conf.qpid_reconnect_interval)
|
||||
self.connection.hearbeat = self.conf.qpid_heartbeat
|
||||
self.connection.heartbeat = self.conf.qpid_heartbeat
|
||||
self.connection.protocol = self.conf.qpid_protocol
|
||||
self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay
|
||||
|
||||
|
@ -47,9 +47,12 @@ zmq_opts = [
|
||||
'address.'),
|
||||
|
||||
# The module.Class to use for matchmaking.
|
||||
cfg.StrOpt('rpc_zmq_matchmaker',
|
||||
default='nova.openstack.common.rpc.matchmaker.MatchMakerLocalhost',
|
||||
help='MatchMaker driver'),
|
||||
cfg.StrOpt(
|
||||
'rpc_zmq_matchmaker',
|
||||
default=('nova.openstack.common.rpc.'
|
||||
'matchmaker.MatchMakerLocalhost'),
|
||||
help='MatchMaker driver',
|
||||
),
|
||||
|
||||
# The following port is unassigned by IANA as of 2012-05-21
|
||||
cfg.IntOpt('rpc_zmq_port', default=9501,
|
||||
@ -60,15 +63,16 @@ zmq_opts = [
|
||||
|
||||
cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack',
|
||||
help='Directory for holding IPC sockets'),
|
||||
|
||||
cfg.StrOpt('rpc_zmq_host', default=socket.gethostname(),
|
||||
help='Name of this node. Must be a valid hostname, FQDN, or '
|
||||
'IP address')
|
||||
'IP address. Must match "host" option, if running Nova.')
|
||||
]
|
||||
|
||||
|
||||
# These globals are defined in register_opts(conf),
|
||||
# a mandatory initialization call
|
||||
FLAGS = None
|
||||
CONF = None
|
||||
ZMQ_CTX = None # ZeroMQ Context, must be global.
|
||||
matchmaker = None # memoized matchmaker object
|
||||
|
||||
@ -270,7 +274,7 @@ class InternalContext(object):
|
||||
ctx.replies)
|
||||
|
||||
LOG.debug(_("Sending reply"))
|
||||
cast(FLAGS, ctx, topic, {
|
||||
cast(CONF, ctx, topic, {
|
||||
'method': '-process_reply',
|
||||
'args': {
|
||||
'msg_id': msg_id,
|
||||
@ -325,7 +329,6 @@ class ZmqBaseReactor(ConsumerBase):
|
||||
def __init__(self, conf):
|
||||
super(ZmqBaseReactor, self).__init__()
|
||||
|
||||
self.conf = conf
|
||||
self.mapping = {}
|
||||
self.proxies = {}
|
||||
self.threads = []
|
||||
@ -401,7 +404,7 @@ class ZmqProxy(ZmqBaseReactor):
|
||||
super(ZmqProxy, self).__init__(conf)
|
||||
|
||||
self.topic_proxy = {}
|
||||
ipc_dir = conf.rpc_zmq_ipc_dir
|
||||
ipc_dir = CONF.rpc_zmq_ipc_dir
|
||||
|
||||
self.topic_proxy['zmq_replies'] = \
|
||||
ZmqSocket("ipc://%s/zmq_topic_zmq_replies" % (ipc_dir, ),
|
||||
@ -409,7 +412,7 @@ class ZmqProxy(ZmqBaseReactor):
|
||||
self.sockets.append(self.topic_proxy['zmq_replies'])
|
||||
|
||||
def consume(self, sock):
|
||||
ipc_dir = self.conf.rpc_zmq_ipc_dir
|
||||
ipc_dir = CONF.rpc_zmq_ipc_dir
|
||||
|
||||
#TODO(ewindisch): use zero-copy (i.e. references, not copying)
|
||||
data = sock.recv()
|
||||
@ -483,7 +486,6 @@ class Connection(rpc_common.Connection):
|
||||
"""Manages connections and threads."""
|
||||
|
||||
def __init__(self, conf):
|
||||
self.conf = conf
|
||||
self.reactor = ZmqReactor(conf)
|
||||
|
||||
def create_consumer(self, topic, proxy, fanout=False):
|
||||
@ -504,7 +506,7 @@ class Connection(rpc_common.Connection):
|
||||
|
||||
# Receive messages from (local) proxy
|
||||
inaddr = "ipc://%s/zmq_topic_%s" % \
|
||||
(self.conf.rpc_zmq_ipc_dir, topic)
|
||||
(CONF.rpc_zmq_ipc_dir, topic)
|
||||
|
||||
LOG.debug(_("Consumer is a zmq.%s"),
|
||||
['PULL', 'SUB'][sock_type == zmq.SUB])
|
||||
@ -523,7 +525,7 @@ class Connection(rpc_common.Connection):
|
||||
|
||||
|
||||
def _cast(addr, context, msg_id, topic, msg, timeout=None):
|
||||
timeout_cast = timeout or FLAGS.rpc_cast_timeout
|
||||
timeout_cast = timeout or CONF.rpc_cast_timeout
|
||||
payload = [RpcContext.marshal(context), msg]
|
||||
|
||||
with Timeout(timeout_cast, exception=rpc_common.Timeout):
|
||||
@ -541,13 +543,13 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None):
|
||||
|
||||
def _call(addr, context, msg_id, topic, msg, timeout=None):
|
||||
# timeout_response is how long we wait for a response
|
||||
timeout = timeout or FLAGS.rpc_response_timeout
|
||||
timeout = timeout or CONF.rpc_response_timeout
|
||||
|
||||
# The msg_id is used to track replies.
|
||||
msg_id = str(uuid.uuid4().hex)
|
||||
|
||||
# Replies always come into the reply service.
|
||||
reply_topic = "zmq_replies.%s" % FLAGS.rpc_zmq_host
|
||||
reply_topic = "zmq_replies.%s" % CONF.rpc_zmq_host
|
||||
|
||||
LOG.debug(_("Creating payload"))
|
||||
# Curry the original request into a reply method.
|
||||
@ -569,7 +571,7 @@ def _call(addr, context, msg_id, topic, msg, timeout=None):
|
||||
with Timeout(timeout, exception=rpc_common.Timeout):
|
||||
try:
|
||||
msg_waiter = ZmqSocket(
|
||||
"ipc://%s/zmq_topic_zmq_replies" % FLAGS.rpc_zmq_ipc_dir,
|
||||
"ipc://%s/zmq_topic_zmq_replies" % CONF.rpc_zmq_ipc_dir,
|
||||
zmq.SUB, subscribe=msg_id, bind=False
|
||||
)
|
||||
|
||||
@ -595,7 +597,7 @@ def _call(addr, context, msg_id, topic, msg, timeout=None):
|
||||
# responses for Exceptions.
|
||||
for resp in responses:
|
||||
if isinstance(resp, types.DictType) and 'exc' in resp:
|
||||
raise rpc_common.deserialize_remote_exception(FLAGS, resp['exc'])
|
||||
raise rpc_common.deserialize_remote_exception(CONF, resp['exc'])
|
||||
|
||||
return responses[-1]
|
||||
|
||||
@ -606,7 +608,7 @@ def _multi_send(method, context, topic, msg, timeout=None):
|
||||
dispatches to the matchmaker and sends
|
||||
message to all relevant hosts.
|
||||
"""
|
||||
conf = FLAGS
|
||||
conf = CONF
|
||||
LOG.debug(_("%(msg)s") % {'msg': ' '.join(map(pformat, (topic, msg)))})
|
||||
|
||||
queues = matchmaker.queues(topic)
|
||||
@ -693,11 +695,11 @@ def register_opts(conf):
|
||||
# We memoize through these globals
|
||||
global ZMQ_CTX
|
||||
global matchmaker
|
||||
global FLAGS
|
||||
global CONF
|
||||
|
||||
if not FLAGS:
|
||||
if not CONF:
|
||||
conf.register_opts(zmq_opts)
|
||||
FLAGS = conf
|
||||
CONF = conf
|
||||
# Don't re-set, if this method is called twice.
|
||||
if not ZMQ_CTX:
|
||||
ZMQ_CTX = zmq.Context(conf.rpc_zmq_contexts)
|
||||
@ -716,3 +718,6 @@ def register_opts(conf):
|
||||
mm_impl = importutils.import_module(mm_module)
|
||||
mm_constructor = getattr(mm_impl, mm_class)
|
||||
matchmaker = mm_constructor()
|
||||
|
||||
|
||||
register_opts(cfg.CONF)
|
||||
|
@ -112,11 +112,12 @@ class RpcProxy(object):
|
||||
self._set_version(msg, version)
|
||||
rpc.cast(context, self._get_topic(topic), msg)
|
||||
|
||||
def fanout_cast(self, context, msg, version=None):
|
||||
def fanout_cast(self, context, msg, topic=None, version=None):
|
||||
"""rpc.fanout_cast() a remote method.
|
||||
|
||||
:param context: The request context
|
||||
:param msg: The message to send, including the method and args.
|
||||
:param topic: Override the topic for this message.
|
||||
:param version: (Optional) Override the requested API version in this
|
||||
message.
|
||||
|
||||
@ -124,7 +125,7 @@ class RpcProxy(object):
|
||||
from the remote method.
|
||||
"""
|
||||
self._set_version(msg, version)
|
||||
rpc.fanout_cast(context, self.topic, msg)
|
||||
rpc.fanout_cast(context, self._get_topic(topic), msg)
|
||||
|
||||
def cast_to_server(self, context, server_params, msg, topic=None,
|
||||
version=None):
|
||||
@ -144,13 +145,15 @@ class RpcProxy(object):
|
||||
self._set_version(msg, version)
|
||||
rpc.cast_to_server(context, server_params, self._get_topic(topic), msg)
|
||||
|
||||
def fanout_cast_to_server(self, context, server_params, msg, version=None):
|
||||
def fanout_cast_to_server(self, context, server_params, msg, topic=None,
|
||||
version=None):
|
||||
"""rpc.fanout_cast_to_server() a remote method.
|
||||
|
||||
:param context: The request context
|
||||
:param server_params: Server parameters. See rpc.cast_to_server() for
|
||||
details.
|
||||
:param msg: The message to send, including the method and args.
|
||||
:param topic: Override the topic for this message.
|
||||
:param version: (Optional) Override the requested API version in this
|
||||
message.
|
||||
|
||||
@ -158,4 +161,5 @@ class RpcProxy(object):
|
||||
return values.
|
||||
"""
|
||||
self._set_version(msg, version)
|
||||
rpc.fanout_cast_to_server(context, server_params, self.topic, msg)
|
||||
rpc.fanout_cast_to_server(context, server_params,
|
||||
self._get_topic(topic), msg)
|
||||
|
Loading…
Reference in New Issue
Block a user