diff --git a/nova/openstack/common/rpc/amqp.py b/nova/openstack/common/rpc/amqp.py index 3324e3758..60bff59fe 100644 --- a/nova/openstack/common/rpc/amqp.py +++ b/nova/openstack/common/rpc/amqp.py @@ -26,7 +26,6 @@ AMQP, but is deprecated and predates this code. """ import inspect -import logging import sys import uuid @@ -38,6 +37,7 @@ from nova.openstack.common import cfg from nova.openstack.common import excutils from nova.openstack.common.gettextutils import _ from nova.openstack.common import local +from nova.openstack.common import log as logging from nova.openstack.common.rpc import common as rpc_common @@ -55,7 +55,7 @@ class Pool(pools.Pool): # TODO(comstud): Timeout connections not used in a while def create(self): - LOG.debug('Pool creating new connection') + LOG.debug(_('Pool creating new connection')) return self.connection_cls(self.conf) def empty(self): @@ -282,7 +282,7 @@ class ProxyCallback(object): ctxt.reply(rval, None, connection_pool=self.connection_pool) # This final None tells multicall that it is done. ctxt.reply(ending=True, connection_pool=self.connection_pool) - except Exception as e: + except Exception: LOG.exception(_('Exception during message handling')) ctxt.reply(None, sys.exc_info(), connection_pool=self.connection_pool) @@ -407,8 +407,9 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg, def notify(conf, context, topic, msg, connection_pool): """Sends a notification event on a topic.""" - event_type = msg.get('event_type') - LOG.debug(_('Sending %(event_type)s on %(topic)s'), locals()) + LOG.debug(_('Sending %(event_type)s on %(topic)s'), + dict(event_type=msg.get('event_type'), + topic=topic)) pack_context(msg, context) with ConnectionContext(conf, connection_pool) as conn: conn.notify_send(topic, msg) diff --git a/nova/openstack/common/rpc/common.py b/nova/openstack/common/rpc/common.py index eb3416804..73a18012c 100644 --- a/nova/openstack/common/rpc/common.py +++ b/nova/openstack/common/rpc/common.py @@ -18,13 +18,13 @@ # under the License. import copy -import logging import traceback from nova.openstack.common.gettextutils import _ from nova.openstack.common import importutils from nova.openstack.common import jsonutils from nova.openstack.common import local +from nova.openstack.common import log as logging LOG = logging.getLogger(__name__) @@ -40,7 +40,7 @@ class RPCException(Exception): try: message = self.message % kwargs - except Exception as e: + except Exception: # kwargs doesn't match a variable in the message # log the issue and the kwargs LOG.exception(_('Exception in string format operation')) @@ -258,7 +258,7 @@ def deserialize_remote_exception(conf, data): # we cannot necessarily change an exception message so we must override # the __str__ method. failure.__class__ = new_ex_type - except TypeError as e: + except TypeError: # NOTE(ameade): If a core exception then just add the traceback to the # first exception argument. failure.args = (message,) + failure.args[1:] diff --git a/nova/openstack/common/rpc/dispatcher.py b/nova/openstack/common/rpc/dispatcher.py index 34c2954db..4dee5d509 100644 --- a/nova/openstack/common/rpc/dispatcher.py +++ b/nova/openstack/common/rpc/dispatcher.py @@ -41,8 +41,8 @@ server side of the API at the same time. However, as the code stands today, there can be both versioned and unversioned APIs implemented in the same code base. - -EXAMPLES: +EXAMPLES +======== Nova was the first project to use versioned rpc APIs. Consider the compute rpc API as an example. The client side is in nova/compute/rpcapi.py and the server @@ -50,12 +50,13 @@ side is in nova/compute/manager.py. Example 1) Adding a new method. +------------------------------- Adding a new method is a backwards compatible change. It should be added to nova/compute/manager.py, and RPC_API_VERSION should be bumped from X.Y to X.Y+1. On the client side, the new method in nova/compute/rpcapi.py should have a specific version specified to indicate the minimum API version that must -be implemented for the method to be supported. For example: +be implemented for the method to be supported. For example:: def get_host_uptime(self, ctxt, host): topic = _compute_topic(self.topic, ctxt, host, None) @@ -67,10 +68,11 @@ get_host_uptime() method. Example 2) Adding a new parameter. +---------------------------------- Adding a new parameter to an rpc method can be made backwards compatible. The RPC_API_VERSION on the server side (nova/compute/manager.py) should be bumped. -The implementation of the method must not expect the parameter to be present. +The implementation of the method must not expect the parameter to be present.:: def some_remote_method(self, arg1, arg2, newarg=None): # The code needs to deal with newarg=None for cases diff --git a/nova/openstack/common/rpc/impl_fake.py b/nova/openstack/common/rpc/impl_fake.py index 8db0da015..0e60da3f7 100644 --- a/nova/openstack/common/rpc/impl_fake.py +++ b/nova/openstack/common/rpc/impl_fake.py @@ -18,11 +18,15 @@ queues. Casts will block, but this is very useful for tests. """ import inspect +# NOTE(russellb): We specifically want to use json, not our own jsonutils. +# jsonutils has some extra logic to automatically convert objects to primitive +# types so that they can be serialized. We want to catch all cases where +# non-primitive types make it into this code and treat it as an error. +import json import time import eventlet -from nova.openstack.common import jsonutils from nova.openstack.common.rpc import common as rpc_common CONSUMERS = {} @@ -121,7 +125,7 @@ def create_connection(conf, new=True): def check_serialize(msg): """Make sure a message intended for rpc can be serialized.""" - jsonutils.dumps(msg) + json.dumps(msg) def multicall(conf, context, topic, msg, timeout=None): @@ -154,6 +158,7 @@ def call(conf, context, topic, msg, timeout=None): def cast(conf, context, topic, msg): + check_serialize(msg) try: call(conf, context, topic, msg) except Exception: diff --git a/nova/openstack/common/rpc/impl_qpid.py b/nova/openstack/common/rpc/impl_qpid.py index b87050753..5570ea867 100644 --- a/nova/openstack/common/rpc/impl_qpid.py +++ b/nova/openstack/common/rpc/impl_qpid.py @@ -17,7 +17,6 @@ import functools import itertools -import logging import time import uuid @@ -29,6 +28,7 @@ import qpid.messaging.exceptions from nova.openstack.common import cfg from nova.openstack.common.gettextutils import _ from nova.openstack.common import jsonutils +from nova.openstack.common import log as logging from nova.openstack.common.rpc import amqp as rpc_amqp from nova.openstack.common.rpc import common as rpc_common @@ -41,6 +41,9 @@ qpid_opts = [ cfg.StrOpt('qpid_port', default='5672', help='Qpid broker port'), + cfg.ListOpt('qpid_hosts', + default=['$qpid_hostname:$qpid_port'], + help='Qpid HA cluster host:port pairs'), cfg.StrOpt('qpid_username', default='', help='Username for qpid connection'), @@ -277,22 +280,21 @@ class Connection(object): self.conf = conf params = { - 'hostname': self.conf.qpid_hostname, - 'port': self.conf.qpid_port, + 'qpid_hosts': self.conf.qpid_hosts, 'username': self.conf.qpid_username, 'password': self.conf.qpid_password, } params.update(server_params or {}) - self.broker = params['hostname'] + ":" + str(params['port']) + self.brokers = params['qpid_hosts'] self.username = params['username'] self.password = params['password'] - self.connection_create() + self.connection_create(self.brokers[0]) self.reconnect() - def connection_create(self): + def connection_create(self, broker): # Create the connection - this does not open the connection - self.connection = qpid.messaging.Connection(self.broker) + self.connection = qpid.messaging.Connection(broker) # Check if flags are set and if so set them for the connection # before we call open @@ -320,10 +322,14 @@ class Connection(object): except qpid.messaging.exceptions.ConnectionError: pass + attempt = 0 delay = 1 while True: + broker = self.brokers[attempt % len(self.brokers)] + attempt += 1 + try: - self.connection_create() + self.connection_create(broker) self.connection.open() except qpid.messaging.exceptions.ConnectionError, e: msg_dict = dict(e=e, delay=delay) @@ -333,10 +339,9 @@ class Connection(object): time.sleep(delay) delay = min(2 * delay, 60) else: + LOG.info(_('Connected to AMQP server on %s'), broker) break - LOG.info(_('Connected to AMQP server on %s'), self.broker) - self.session = self.connection.session() if self.consumers: diff --git a/nova/openstack/common/rpc/matchmaker.py b/nova/openstack/common/rpc/matchmaker.py index 783e3713c..8b2c67a44 100644 --- a/nova/openstack/common/rpc/matchmaker.py +++ b/nova/openstack/common/rpc/matchmaker.py @@ -21,10 +21,10 @@ return keys for direct exchanges, per (approximate) AMQP parlance. import contextlib import itertools import json -import logging from nova.openstack.common import cfg from nova.openstack.common.gettextutils import _ +from nova.openstack.common import log as logging matchmaker_opts = [ diff --git a/nova/openstack/common/rpc/service.py b/nova/openstack/common/rpc/service.py index 15508e432..94dc7960e 100644 --- a/nova/openstack/common/rpc/service.py +++ b/nova/openstack/common/rpc/service.py @@ -57,6 +57,11 @@ class Service(service.Service): self.conn.create_consumer(self.topic, dispatcher, fanout=True) + # Hook to allow the manager to do other initializations after + # the rpc connection is created. + if callable(getattr(self.manager, 'initialize_service_hook', None)): + self.manager.initialize_service_hook(self) + # Consume from all consumers in a thread self.conn.consume_in_thread()