Sync latest openstack.common.rpc

Changes since last sync:

 202f568 Use json instead of jsonutils in rpc.impl_fake.
 6d102bc Provide i18n to those messages without _()
 8695285 Qpid H/A cluster support
 faeafe1 Fixes import order
 cf705c5 Make project pyflakes clean.
 05f8ec7 Fix common rpc to use common logging instead of python logging
 b6d24bb updating sphinx documentation
 33fbd87 Added initialize_service_hook for rpc.Service.
 cf849e0 Clean up dictionary use in RPC drivers

Change-Id: I4fbade51390e159bd9cccd2afc918a4f07740993
This commit is contained in:
Mark McLoughlin
2012-12-05 15:12:15 +00:00
committed by Russell Bryant
parent c33de0099e
commit a3a9d544ec
7 changed files with 43 additions and 25 deletions

View File

@@ -26,7 +26,6 @@ AMQP, but is deprecated and predates this code.
""" """
import inspect import inspect
import logging
import sys import sys
import uuid import uuid
@@ -38,6 +37,7 @@ from nova.openstack.common import cfg
from nova.openstack.common import excutils from nova.openstack.common import excutils
from nova.openstack.common.gettextutils import _ from nova.openstack.common.gettextutils import _
from nova.openstack.common import local from nova.openstack.common import local
from nova.openstack.common import log as logging
from nova.openstack.common.rpc import common as rpc_common from nova.openstack.common.rpc import common as rpc_common
@@ -55,7 +55,7 @@ class Pool(pools.Pool):
# TODO(comstud): Timeout connections not used in a while # TODO(comstud): Timeout connections not used in a while
def create(self): def create(self):
LOG.debug('Pool creating new connection') LOG.debug(_('Pool creating new connection'))
return self.connection_cls(self.conf) return self.connection_cls(self.conf)
def empty(self): def empty(self):
@@ -282,7 +282,7 @@ class ProxyCallback(object):
ctxt.reply(rval, None, connection_pool=self.connection_pool) ctxt.reply(rval, None, connection_pool=self.connection_pool)
# This final None tells multicall that it is done. # This final None tells multicall that it is done.
ctxt.reply(ending=True, connection_pool=self.connection_pool) ctxt.reply(ending=True, connection_pool=self.connection_pool)
except Exception as e: except Exception:
LOG.exception(_('Exception during message handling')) LOG.exception(_('Exception during message handling'))
ctxt.reply(None, sys.exc_info(), ctxt.reply(None, sys.exc_info(),
connection_pool=self.connection_pool) connection_pool=self.connection_pool)
@@ -407,8 +407,9 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg,
def notify(conf, context, topic, msg, connection_pool): def notify(conf, context, topic, msg, connection_pool):
"""Sends a notification event on a topic.""" """Sends a notification event on a topic."""
event_type = msg.get('event_type') LOG.debug(_('Sending %(event_type)s on %(topic)s'),
LOG.debug(_('Sending %(event_type)s on %(topic)s'), locals()) dict(event_type=msg.get('event_type'),
topic=topic))
pack_context(msg, context) pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn: with ConnectionContext(conf, connection_pool) as conn:
conn.notify_send(topic, msg) conn.notify_send(topic, msg)

View File

@@ -18,13 +18,13 @@
# under the License. # under the License.
import copy import copy
import logging
import traceback import traceback
from nova.openstack.common.gettextutils import _ from nova.openstack.common.gettextutils import _
from nova.openstack.common import importutils from nova.openstack.common import importutils
from nova.openstack.common import jsonutils from nova.openstack.common import jsonutils
from nova.openstack.common import local from nova.openstack.common import local
from nova.openstack.common import log as logging
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@@ -40,7 +40,7 @@ class RPCException(Exception):
try: try:
message = self.message % kwargs message = self.message % kwargs
except Exception as e: except Exception:
# kwargs doesn't match a variable in the message # kwargs doesn't match a variable in the message
# log the issue and the kwargs # log the issue and the kwargs
LOG.exception(_('Exception in string format operation')) LOG.exception(_('Exception in string format operation'))
@@ -258,7 +258,7 @@ def deserialize_remote_exception(conf, data):
# we cannot necessarily change an exception message so we must override # we cannot necessarily change an exception message so we must override
# the __str__ method. # the __str__ method.
failure.__class__ = new_ex_type failure.__class__ = new_ex_type
except TypeError as e: except TypeError:
# NOTE(ameade): If a core exception then just add the traceback to the # NOTE(ameade): If a core exception then just add the traceback to the
# first exception argument. # first exception argument.
failure.args = (message,) + failure.args[1:] failure.args = (message,) + failure.args[1:]

View File

@@ -41,8 +41,8 @@ server side of the API at the same time. However, as the code stands today,
there can be both versioned and unversioned APIs implemented in the same code there can be both versioned and unversioned APIs implemented in the same code
base. base.
EXAMPLES
EXAMPLES: ========
Nova was the first project to use versioned rpc APIs. Consider the compute rpc Nova was the first project to use versioned rpc APIs. Consider the compute rpc
API as an example. The client side is in nova/compute/rpcapi.py and the server API as an example. The client side is in nova/compute/rpcapi.py and the server
@@ -50,12 +50,13 @@ side is in nova/compute/manager.py.
Example 1) Adding a new method. Example 1) Adding a new method.
-------------------------------
Adding a new method is a backwards compatible change. It should be added to Adding a new method is a backwards compatible change. It should be added to
nova/compute/manager.py, and RPC_API_VERSION should be bumped from X.Y to nova/compute/manager.py, and RPC_API_VERSION should be bumped from X.Y to
X.Y+1. On the client side, the new method in nova/compute/rpcapi.py should X.Y+1. On the client side, the new method in nova/compute/rpcapi.py should
have a specific version specified to indicate the minimum API version that must have a specific version specified to indicate the minimum API version that must
be implemented for the method to be supported. For example: be implemented for the method to be supported. For example::
def get_host_uptime(self, ctxt, host): def get_host_uptime(self, ctxt, host):
topic = _compute_topic(self.topic, ctxt, host, None) topic = _compute_topic(self.topic, ctxt, host, None)
@@ -67,10 +68,11 @@ get_host_uptime() method.
Example 2) Adding a new parameter. Example 2) Adding a new parameter.
----------------------------------
Adding a new parameter to an rpc method can be made backwards compatible. The Adding a new parameter to an rpc method can be made backwards compatible. The
RPC_API_VERSION on the server side (nova/compute/manager.py) should be bumped. RPC_API_VERSION on the server side (nova/compute/manager.py) should be bumped.
The implementation of the method must not expect the parameter to be present. The implementation of the method must not expect the parameter to be present.::
def some_remote_method(self, arg1, arg2, newarg=None): def some_remote_method(self, arg1, arg2, newarg=None):
# The code needs to deal with newarg=None for cases # The code needs to deal with newarg=None for cases

View File

@@ -18,11 +18,15 @@ queues. Casts will block, but this is very useful for tests.
""" """
import inspect import inspect
# NOTE(russellb): We specifically want to use json, not our own jsonutils.
# jsonutils has some extra logic to automatically convert objects to primitive
# types so that they can be serialized. We want to catch all cases where
# non-primitive types make it into this code and treat it as an error.
import json
import time import time
import eventlet import eventlet
from nova.openstack.common import jsonutils
from nova.openstack.common.rpc import common as rpc_common from nova.openstack.common.rpc import common as rpc_common
CONSUMERS = {} CONSUMERS = {}
@@ -121,7 +125,7 @@ def create_connection(conf, new=True):
def check_serialize(msg): def check_serialize(msg):
"""Make sure a message intended for rpc can be serialized.""" """Make sure a message intended for rpc can be serialized."""
jsonutils.dumps(msg) json.dumps(msg)
def multicall(conf, context, topic, msg, timeout=None): def multicall(conf, context, topic, msg, timeout=None):
@@ -154,6 +158,7 @@ def call(conf, context, topic, msg, timeout=None):
def cast(conf, context, topic, msg): def cast(conf, context, topic, msg):
check_serialize(msg)
try: try:
call(conf, context, topic, msg) call(conf, context, topic, msg)
except Exception: except Exception:

View File

@@ -17,7 +17,6 @@
import functools import functools
import itertools import itertools
import logging
import time import time
import uuid import uuid
@@ -29,6 +28,7 @@ import qpid.messaging.exceptions
from nova.openstack.common import cfg from nova.openstack.common import cfg
from nova.openstack.common.gettextutils import _ from nova.openstack.common.gettextutils import _
from nova.openstack.common import jsonutils from nova.openstack.common import jsonutils
from nova.openstack.common import log as logging
from nova.openstack.common.rpc import amqp as rpc_amqp from nova.openstack.common.rpc import amqp as rpc_amqp
from nova.openstack.common.rpc import common as rpc_common from nova.openstack.common.rpc import common as rpc_common
@@ -41,6 +41,9 @@ qpid_opts = [
cfg.StrOpt('qpid_port', cfg.StrOpt('qpid_port',
default='5672', default='5672',
help='Qpid broker port'), help='Qpid broker port'),
cfg.ListOpt('qpid_hosts',
default=['$qpid_hostname:$qpid_port'],
help='Qpid HA cluster host:port pairs'),
cfg.StrOpt('qpid_username', cfg.StrOpt('qpid_username',
default='', default='',
help='Username for qpid connection'), help='Username for qpid connection'),
@@ -277,22 +280,21 @@ class Connection(object):
self.conf = conf self.conf = conf
params = { params = {
'hostname': self.conf.qpid_hostname, 'qpid_hosts': self.conf.qpid_hosts,
'port': self.conf.qpid_port,
'username': self.conf.qpid_username, 'username': self.conf.qpid_username,
'password': self.conf.qpid_password, 'password': self.conf.qpid_password,
} }
params.update(server_params or {}) params.update(server_params or {})
self.broker = params['hostname'] + ":" + str(params['port']) self.brokers = params['qpid_hosts']
self.username = params['username'] self.username = params['username']
self.password = params['password'] self.password = params['password']
self.connection_create() self.connection_create(self.brokers[0])
self.reconnect() self.reconnect()
def connection_create(self): def connection_create(self, broker):
# Create the connection - this does not open the connection # Create the connection - this does not open the connection
self.connection = qpid.messaging.Connection(self.broker) self.connection = qpid.messaging.Connection(broker)
# Check if flags are set and if so set them for the connection # Check if flags are set and if so set them for the connection
# before we call open # before we call open
@@ -320,10 +322,14 @@ class Connection(object):
except qpid.messaging.exceptions.ConnectionError: except qpid.messaging.exceptions.ConnectionError:
pass pass
attempt = 0
delay = 1 delay = 1
while True: while True:
broker = self.brokers[attempt % len(self.brokers)]
attempt += 1
try: try:
self.connection_create() self.connection_create(broker)
self.connection.open() self.connection.open()
except qpid.messaging.exceptions.ConnectionError, e: except qpid.messaging.exceptions.ConnectionError, e:
msg_dict = dict(e=e, delay=delay) msg_dict = dict(e=e, delay=delay)
@@ -333,10 +339,9 @@ class Connection(object):
time.sleep(delay) time.sleep(delay)
delay = min(2 * delay, 60) delay = min(2 * delay, 60)
else: else:
LOG.info(_('Connected to AMQP server on %s'), broker)
break break
LOG.info(_('Connected to AMQP server on %s'), self.broker)
self.session = self.connection.session() self.session = self.connection.session()
if self.consumers: if self.consumers:

View File

@@ -21,10 +21,10 @@ return keys for direct exchanges, per (approximate) AMQP parlance.
import contextlib import contextlib
import itertools import itertools
import json import json
import logging
from nova.openstack.common import cfg from nova.openstack.common import cfg
from nova.openstack.common.gettextutils import _ from nova.openstack.common.gettextutils import _
from nova.openstack.common import log as logging
matchmaker_opts = [ matchmaker_opts = [

View File

@@ -57,6 +57,11 @@ class Service(service.Service):
self.conn.create_consumer(self.topic, dispatcher, fanout=True) self.conn.create_consumer(self.topic, dispatcher, fanout=True)
# Hook to allow the manager to do other initializations after
# the rpc connection is created.
if callable(getattr(self.manager, 'initialize_service_hook', None)):
self.manager.initialize_service_hook(self)
# Consume from all consumers in a thread # Consume from all consumers in a thread
self.conn.consume_in_thread() self.conn.consume_in_thread()