diff --git a/nova/openstack/common/rpc/__init__.py b/nova/openstack/common/rpc/__init__.py index 06f2dfb2c14f..d2faff3debd1 100644 --- a/nova/openstack/common/rpc/__init__.py +++ b/nova/openstack/common/rpc/__init__.py @@ -47,7 +47,9 @@ rpc_opts = [ help='Seconds to wait before a cast expires (TTL). ' 'Only supported by impl_zmq.'), cfg.ListOpt('allowed_rpc_exception_modules', - default=['nova.openstack.common.exception', 'nova.exception'], + default=['nova.openstack.common.exception', + 'nova.exception', + ], help='Modules of exceptions that are permitted to be recreated' 'upon receiving exception data from an rpc call.'), cfg.StrOpt('control_exchange', diff --git a/nova/openstack/common/rpc/amqp.py b/nova/openstack/common/rpc/amqp.py index 13ff5665e553..09f20fe81f58 100644 --- a/nova/openstack/common/rpc/amqp.py +++ b/nova/openstack/common/rpc/amqp.py @@ -35,8 +35,8 @@ from eventlet import pools from eventlet import semaphore from nova.openstack.common import excutils -from nova.openstack.common import local from nova.openstack.common.gettextutils import _ +from nova.openstack.common import local from nova.openstack.common.rpc import common as rpc_common diff --git a/nova/openstack/common/rpc/common.py b/nova/openstack/common/rpc/common.py index 7bc65a6a66a5..01db756ebb60 100644 --- a/nova/openstack/common/rpc/common.py +++ b/nova/openstack/common/rpc/common.py @@ -108,7 +108,7 @@ class Connection(object): """ raise NotImplementedError() - def create_consumer(self, conf, topic, proxy, fanout=False): + def create_consumer(self, topic, proxy, fanout=False): """Create a consumer on this connection. A consumer is associated with a message queue on the backend message @@ -117,7 +117,6 @@ class Connection(object): off of the queue will determine which method gets called on the proxy object. - :param conf: An openstack.common.cfg configuration object. :param topic: This is a name associated with what to consume from. Multiple instances of a service may consume from the same topic. For example, all instances of nova-compute consume @@ -133,7 +132,7 @@ class Connection(object): """ raise NotImplementedError() - def create_worker(self, conf, topic, proxy, pool_name): + def create_worker(self, topic, proxy, pool_name): """Create a worker on this connection. A worker is like a regular consumer of messages directed to a @@ -143,7 +142,6 @@ class Connection(object): be asked to process it. Load is distributed across the members of the pool in round-robin fashion. - :param conf: An openstack.common.cfg configuration object. :param topic: This is a name associated with what to consume from. Multiple instances of a service may consume from the same topic. diff --git a/nova/openstack/common/rpc/impl_qpid.py b/nova/openstack/common/rpc/impl_qpid.py index 202988faf70c..4663f00461e3 100644 --- a/nova/openstack/common/rpc/impl_qpid.py +++ b/nova/openstack/common/rpc/impl_qpid.py @@ -329,7 +329,7 @@ class Connection(object): if self.conf.qpid_reconnect_interval: self.connection.reconnect_interval = ( self.conf.qpid_reconnect_interval) - self.connection.hearbeat = self.conf.qpid_heartbeat + self.connection.heartbeat = self.conf.qpid_heartbeat self.connection.protocol = self.conf.qpid_protocol self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay diff --git a/nova/openstack/common/rpc/impl_zmq.py b/nova/openstack/common/rpc/impl_zmq.py index fc342de50007..6f215891bb90 100644 --- a/nova/openstack/common/rpc/impl_zmq.py +++ b/nova/openstack/common/rpc/impl_zmq.py @@ -47,9 +47,12 @@ zmq_opts = [ 'address.'), # The module.Class to use for matchmaking. - cfg.StrOpt('rpc_zmq_matchmaker', - default='nova.openstack.common.rpc.matchmaker.MatchMakerLocalhost', - help='MatchMaker driver'), + cfg.StrOpt( + 'rpc_zmq_matchmaker', + default=('nova.openstack.common.rpc.' + 'matchmaker.MatchMakerLocalhost'), + help='MatchMaker driver', + ), # The following port is unassigned by IANA as of 2012-05-21 cfg.IntOpt('rpc_zmq_port', default=9501, @@ -60,15 +63,16 @@ zmq_opts = [ cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack', help='Directory for holding IPC sockets'), + cfg.StrOpt('rpc_zmq_host', default=socket.gethostname(), help='Name of this node. Must be a valid hostname, FQDN, or ' - 'IP address') + 'IP address. Must match "host" option, if running Nova.') ] # These globals are defined in register_opts(conf), # a mandatory initialization call -FLAGS = None +CONF = None ZMQ_CTX = None # ZeroMQ Context, must be global. matchmaker = None # memoized matchmaker object @@ -270,7 +274,7 @@ class InternalContext(object): ctx.replies) LOG.debug(_("Sending reply")) - cast(FLAGS, ctx, topic, { + cast(CONF, ctx, topic, { 'method': '-process_reply', 'args': { 'msg_id': msg_id, @@ -325,7 +329,6 @@ class ZmqBaseReactor(ConsumerBase): def __init__(self, conf): super(ZmqBaseReactor, self).__init__() - self.conf = conf self.mapping = {} self.proxies = {} self.threads = [] @@ -401,7 +404,7 @@ class ZmqProxy(ZmqBaseReactor): super(ZmqProxy, self).__init__(conf) self.topic_proxy = {} - ipc_dir = conf.rpc_zmq_ipc_dir + ipc_dir = CONF.rpc_zmq_ipc_dir self.topic_proxy['zmq_replies'] = \ ZmqSocket("ipc://%s/zmq_topic_zmq_replies" % (ipc_dir, ), @@ -409,7 +412,7 @@ class ZmqProxy(ZmqBaseReactor): self.sockets.append(self.topic_proxy['zmq_replies']) def consume(self, sock): - ipc_dir = self.conf.rpc_zmq_ipc_dir + ipc_dir = CONF.rpc_zmq_ipc_dir #TODO(ewindisch): use zero-copy (i.e. references, not copying) data = sock.recv() @@ -483,7 +486,6 @@ class Connection(rpc_common.Connection): """Manages connections and threads.""" def __init__(self, conf): - self.conf = conf self.reactor = ZmqReactor(conf) def create_consumer(self, topic, proxy, fanout=False): @@ -504,7 +506,7 @@ class Connection(rpc_common.Connection): # Receive messages from (local) proxy inaddr = "ipc://%s/zmq_topic_%s" % \ - (self.conf.rpc_zmq_ipc_dir, topic) + (CONF.rpc_zmq_ipc_dir, topic) LOG.debug(_("Consumer is a zmq.%s"), ['PULL', 'SUB'][sock_type == zmq.SUB]) @@ -523,7 +525,7 @@ class Connection(rpc_common.Connection): def _cast(addr, context, msg_id, topic, msg, timeout=None): - timeout_cast = timeout or FLAGS.rpc_cast_timeout + timeout_cast = timeout or CONF.rpc_cast_timeout payload = [RpcContext.marshal(context), msg] with Timeout(timeout_cast, exception=rpc_common.Timeout): @@ -541,13 +543,13 @@ def _cast(addr, context, msg_id, topic, msg, timeout=None): def _call(addr, context, msg_id, topic, msg, timeout=None): # timeout_response is how long we wait for a response - timeout = timeout or FLAGS.rpc_response_timeout + timeout = timeout or CONF.rpc_response_timeout # The msg_id is used to track replies. msg_id = str(uuid.uuid4().hex) # Replies always come into the reply service. - reply_topic = "zmq_replies.%s" % FLAGS.rpc_zmq_host + reply_topic = "zmq_replies.%s" % CONF.rpc_zmq_host LOG.debug(_("Creating payload")) # Curry the original request into a reply method. @@ -569,7 +571,7 @@ def _call(addr, context, msg_id, topic, msg, timeout=None): with Timeout(timeout, exception=rpc_common.Timeout): try: msg_waiter = ZmqSocket( - "ipc://%s/zmq_topic_zmq_replies" % FLAGS.rpc_zmq_ipc_dir, + "ipc://%s/zmq_topic_zmq_replies" % CONF.rpc_zmq_ipc_dir, zmq.SUB, subscribe=msg_id, bind=False ) @@ -595,7 +597,7 @@ def _call(addr, context, msg_id, topic, msg, timeout=None): # responses for Exceptions. for resp in responses: if isinstance(resp, types.DictType) and 'exc' in resp: - raise rpc_common.deserialize_remote_exception(FLAGS, resp['exc']) + raise rpc_common.deserialize_remote_exception(CONF, resp['exc']) return responses[-1] @@ -606,7 +608,7 @@ def _multi_send(method, context, topic, msg, timeout=None): dispatches to the matchmaker and sends message to all relevant hosts. """ - conf = FLAGS + conf = CONF LOG.debug(_("%(msg)s") % {'msg': ' '.join(map(pformat, (topic, msg)))}) queues = matchmaker.queues(topic) @@ -693,11 +695,11 @@ def register_opts(conf): # We memoize through these globals global ZMQ_CTX global matchmaker - global FLAGS + global CONF - if not FLAGS: + if not CONF: conf.register_opts(zmq_opts) - FLAGS = conf + CONF = conf # Don't re-set, if this method is called twice. if not ZMQ_CTX: ZMQ_CTX = zmq.Context(conf.rpc_zmq_contexts) @@ -716,3 +718,6 @@ def register_opts(conf): mm_impl = importutils.import_module(mm_module) mm_constructor = getattr(mm_impl, mm_class) matchmaker = mm_constructor() + + +register_opts(cfg.CONF) diff --git a/nova/openstack/common/rpc/proxy.py b/nova/openstack/common/rpc/proxy.py index fe28cdde517a..a0775528e706 100644 --- a/nova/openstack/common/rpc/proxy.py +++ b/nova/openstack/common/rpc/proxy.py @@ -112,11 +112,12 @@ class RpcProxy(object): self._set_version(msg, version) rpc.cast(context, self._get_topic(topic), msg) - def fanout_cast(self, context, msg, version=None): + def fanout_cast(self, context, msg, topic=None, version=None): """rpc.fanout_cast() a remote method. :param context: The request context :param msg: The message to send, including the method and args. + :param topic: Override the topic for this message. :param version: (Optional) Override the requested API version in this message. @@ -124,7 +125,7 @@ class RpcProxy(object): from the remote method. """ self._set_version(msg, version) - rpc.fanout_cast(context, self.topic, msg) + rpc.fanout_cast(context, self._get_topic(topic), msg) def cast_to_server(self, context, server_params, msg, topic=None, version=None): @@ -144,13 +145,15 @@ class RpcProxy(object): self._set_version(msg, version) rpc.cast_to_server(context, server_params, self._get_topic(topic), msg) - def fanout_cast_to_server(self, context, server_params, msg, version=None): + def fanout_cast_to_server(self, context, server_params, msg, topic=None, + version=None): """rpc.fanout_cast_to_server() a remote method. :param context: The request context :param server_params: Server parameters. See rpc.cast_to_server() for details. :param msg: The message to send, including the method and args. + :param topic: Override the topic for this message. :param version: (Optional) Override the requested API version in this message. @@ -158,4 +161,5 @@ class RpcProxy(object): return values. """ self._set_version(msg, version) - rpc.fanout_cast_to_server(context, server_params, self.topic, msg) + rpc.fanout_cast_to_server(context, server_params, + self._get_topic(topic), msg)