Synced rpc and gettextutils modules from oslo-incubator

The main reason for sync is to get the following oslo-rpc fixes in Neutron:
* I537015f452eb770acba41fdedfe221628f52a920 (reduces delays when reconnecting
  to Qpid in HA deployments)
* Ia148baa6e1ec632789ac3621c85173c2c16f3918 (fixed HA failover, Qpid part)
* I67923cb024bbd143edc8edccf35b9b400df31eb3 (fixed HA failover, RabbitMQ part)

Latest oslo-incubator commit at the moment of sync:
* 2eab986ef3c43f8d1e25065e3cbc1307860c25c7

Change-Id: I2f5bb0d195e050f755ecdbf06a6bbed587a04fbe
Closes-Bug: 1281148
Closes-Bug: 1261631
This commit is contained in:
Ihar Hrachyshka 2014-03-17 14:18:28 +01:00
parent e75f4854a6
commit 665222b38b
13 changed files with 207 additions and 181 deletions

View File

@ -0,0 +1,17 @@
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import six
six.add_move(six.MovedModule('mox', 'mox', 'mox3.mox'))

View File

@ -23,11 +23,11 @@ Usual usage in an openstack.common module:
"""
import copy
import functools
import gettext
import locale
from logging import handlers
import os
import re
from babel import localedata
import six
@ -35,6 +35,17 @@ import six
_localedir = os.environ.get('neutron'.upper() + '_LOCALEDIR')
_t = gettext.translation('neutron', localedir=_localedir, fallback=True)
# We use separate translation catalogs for each log level, so set up a
# mapping between the log level name and the translator. The domain
# for the log level is project_name + "-log-" + log_level so messages
# for each level end up in their own catalog.
_t_log_levels = dict(
(level, gettext.translation('neutron' + '-log-' + level,
localedir=_localedir,
fallback=True))
for level in ['info', 'warning', 'error', 'critical']
)
_AVAILABLE_LANGUAGES = {}
USE_LAZY = False
@ -60,6 +71,28 @@ def _(msg):
return _t.ugettext(msg)
def _log_translation(msg, level):
"""Build a single translation of a log message
"""
if USE_LAZY:
return Message(msg, domain='neutron' + '-log-' + level)
else:
translator = _t_log_levels[level]
if six.PY3:
return translator.gettext(msg)
return translator.ugettext(msg)
# Translators for log levels.
#
# The abbreviated names are meant to reflect the usual use of a short
# name like '_'. The "L" is for "log" and the other letter comes from
# the level.
_LI = functools.partial(_log_translation, level='info')
_LW = functools.partial(_log_translation, level='warning')
_LE = functools.partial(_log_translation, level='error')
_LC = functools.partial(_log_translation, level='critical')
def install(domain, lazy=False):
"""Install a _() function using the given translation domain.
@ -118,7 +151,8 @@ class Message(six.text_type):
and can be treated as such.
"""
def __new__(cls, msgid, msgtext=None, params=None, domain='neutron', *args):
def __new__(cls, msgid, msgtext=None, params=None,
domain='neutron', *args):
"""Create a new Message object.
In order for translation to work gettext requires a message ID, this
@ -213,47 +247,22 @@ class Message(six.text_type):
if other is None:
params = (other,)
elif isinstance(other, dict):
params = self._trim_dictionary_parameters(other)
# Merge the dictionaries
# Copy each item in case one does not support deep copy.
params = {}
if isinstance(self.params, dict):
for key, val in self.params.items():
params[key] = self._copy_param(val)
for key, val in other.items():
params[key] = self._copy_param(val)
else:
params = self._copy_param(other)
return params
def _trim_dictionary_parameters(self, dict_param):
"""Return a dict that only has matching entries in the msgid."""
# NOTE(luisg): Here we trim down the dictionary passed as parameters
# to avoid carrying a lot of unnecessary weight around in the message
# object, for example if someone passes in Message() % locals() but
# only some params are used, and additionally we prevent errors for
# non-deepcopyable objects by unicoding() them.
# Look for %(param) keys in msgid;
# Skip %% and deal with the case where % is first character on the line
keys = re.findall('(?:[^%]|^)?%\((\w*)\)[a-z]', self.msgid)
# If we don't find any %(param) keys but have a %s
if not keys and re.findall('(?:[^%]|^)%[a-z]', self.msgid):
# Apparently the full dictionary is the parameter
params = self._copy_param(dict_param)
else:
params = {}
# Save our existing parameters as defaults to protect
# ourselves from losing values if we are called through an
# (erroneous) chain that builds a valid Message with
# arguments, and then does something like "msg % kwds"
# where kwds is an empty dictionary.
src = {}
if isinstance(self.params, dict):
src.update(self.params)
src.update(dict_param)
for key in keys:
params[key] = self._copy_param(src[key])
return params
def _copy_param(self, param):
try:
return copy.deepcopy(param)
except TypeError:
except Exception:
# Fallback to casting to unicode this will handle the
# python code-like objects that can't be deep-copied
return six.text_type(param)
@ -297,9 +306,27 @@ def get_available_languages(domain):
list_identifiers = (getattr(localedata, 'list', None) or
getattr(localedata, 'locale_identifiers'))
locale_identifiers = list_identifiers()
for i in locale_identifiers:
if find(i) is not None:
language_list.append(i)
# NOTE(luisg): Babel>=1.0,<1.3 has a bug where some OpenStack supported
# locales (e.g. 'zh_CN', and 'zh_TW') aren't supported even though they
# are perfectly legitimate locales:
# https://github.com/mitsuhiko/babel/issues/37
# In Babel 1.3 they fixed the bug and they support these locales, but
# they are still not explicitly "listed" by locale_identifiers().
# That is why we add the locales here explicitly if necessary so that
# they are listed as supported.
aliases = {'zh': 'zh_CN',
'zh_Hant_HK': 'zh_HK',
'zh_Hant': 'zh_TW',
'fil': 'tl_PH'}
for (locale, alias) in six.iteritems(aliases):
if locale in language_list and alias not in language_list:
language_list.append(alias)
_AVAILABLE_LANGUAGES[domain] = language_list
return copy.copy(language_list)

View File

@ -23,13 +23,9 @@ For some wrappers that add message versioning to rpc, see:
rpc.proxy
"""
import inspect
from oslo.config import cfg
from neutron.openstack.common.gettextutils import _
from neutron.openstack.common import importutils
from neutron.openstack.common import local
from neutron.openstack.common import log as logging
@ -93,24 +89,7 @@ def create_connection(new=True):
return _get_impl().create_connection(CONF, new=new)
def _check_for_lock():
if not CONF.debug:
return None
if ((hasattr(local.strong_store, 'locks_held')
and local.strong_store.locks_held)):
stack = ' :: '.join([frame[3] for frame in inspect.stack()])
LOG.warn(_('A RPC is being made while holding a lock. The locks '
'currently held are %(locks)s. This is probably a bug. '
'Please report it. Include the following: [%(stack)s].'),
{'locks': local.strong_store.locks_held,
'stack': stack})
return True
return False
def call(context, topic, msg, timeout=None, check_for_lock=False):
def call(context, topic, msg, timeout=None):
"""Invoke a remote method that returns something.
:param context: Information that identifies the user that has made this
@ -124,16 +103,12 @@ def call(context, topic, msg, timeout=None, check_for_lock=False):
"args" : dict_of_kwargs }
:param timeout: int, number of seconds to use for a response timeout.
If set, this overrides the rpc_response_timeout option.
:param check_for_lock: if True, a warning is emitted if a RPC call is made
with a lock held.
:returns: A dict from the remote method.
:raises: openstack.common.rpc.common.Timeout if a complete response
is not received before the timeout is reached.
"""
if check_for_lock:
_check_for_lock()
return _get_impl().call(CONF, context, topic, msg, timeout)
@ -176,7 +151,7 @@ def fanout_cast(context, topic, msg):
return _get_impl().fanout_cast(CONF, context, topic, msg)
def multicall(context, topic, msg, timeout=None, check_for_lock=False):
def multicall(context, topic, msg, timeout=None):
"""Invoke a remote method and get back an iterator.
In this case, the remote method will be returning multiple values in
@ -194,8 +169,6 @@ def multicall(context, topic, msg, timeout=None, check_for_lock=False):
"args" : dict_of_kwargs }
:param timeout: int, number of seconds to use for a response timeout.
If set, this overrides the rpc_response_timeout option.
:param check_for_lock: if True, a warning is emitted if a RPC call is made
with a lock held.
:returns: An iterator. The iterator will yield a tuple (N, X) where N is
an index that starts at 0 and increases by one for each value
@ -205,8 +178,6 @@ def multicall(context, topic, msg, timeout=None, check_for_lock=False):
:raises: openstack.common.rpc.common.Timeout if a complete response
is not received before the timeout is reached.
"""
if check_for_lock:
_check_for_lock()
return _get_impl().multicall(CONF, context, topic, msg, timeout)

View File

@ -37,7 +37,7 @@ import six
from neutron.openstack.common import excutils
from neutron.openstack.common.gettextutils import _
from neutron.openstack.common.gettextutils import _, _LE
from neutron.openstack.common import local
from neutron.openstack.common import log as logging
from neutron.openstack.common.rpc import common as rpc_common
@ -72,7 +72,7 @@ class Pool(pools.Pool):
# TODO(comstud): Timeout connections not used in a while
def create(self):
LOG.debug(_('Pool creating new connection'))
LOG.debug('Pool creating new connection')
return self.connection_cls(self.conf)
def empty(self):
@ -287,7 +287,7 @@ def unpack_context(conf, msg):
context_dict['reply_q'] = msg.pop('_reply_q', None)
context_dict['conf'] = conf
ctx = RpcContext.from_dict(context_dict)
rpc_common._safe_log(LOG.debug, _('unpacked context: %s'), ctx.to_dict())
rpc_common._safe_log(LOG.debug, 'unpacked context: %s', ctx.to_dict())
return ctx
@ -339,7 +339,7 @@ def _add_unique_id(msg):
"""Add unique_id for checking duplicate messages."""
unique_id = uuid.uuid4().hex
msg.update({UNIQUE_ID: unique_id})
LOG.debug(_('UNIQUE_ID is %s.') % (unique_id))
LOG.debug('UNIQUE_ID is %s.' % (unique_id))
class _ThreadPoolWithWait(object):
@ -432,7 +432,7 @@ class ProxyCallback(_ThreadPoolWithWait):
# the previous context is stored in local.store.context
if hasattr(local.store, 'context'):
del local.store.context
rpc_common._safe_log(LOG.debug, _('received %s'), message_data)
rpc_common._safe_log(LOG.debug, 'received %s', message_data)
self.msg_id_cache.check_duplicate_message(message_data)
ctxt = unpack_context(self.conf, message_data)
method = message_data.get('method')
@ -469,7 +469,7 @@ class ProxyCallback(_ThreadPoolWithWait):
# This final None tells multicall that it is done.
ctxt.reply(ending=True, connection_pool=self.connection_pool)
except rpc_common.ClientException as e:
LOG.debug(_('Expected exception during message handling (%s)') %
LOG.debug('Expected exception during message handling (%s)' %
e._exc_info[1])
ctxt.reply(None, e._exc_info,
connection_pool=self.connection_pool,
@ -477,7 +477,7 @@ class ProxyCallback(_ThreadPoolWithWait):
except Exception:
# sys.exc_info() is deleted by LOG.exception().
exc_info = sys.exc_info()
LOG.error(_('Exception during message handling'),
LOG.error(_LE('Exception during message handling'),
exc_info=exc_info)
ctxt.reply(None, exc_info, connection_pool=self.connection_pool)
@ -551,10 +551,10 @@ _reply_proxy_create_sem = semaphore.Semaphore()
def multicall(conf, context, topic, msg, timeout, connection_pool):
"""Make a call that returns multiple times."""
LOG.debug(_('Making synchronous call on %s ...'), topic)
LOG.debug('Making synchronous call on %s ...', topic)
msg_id = uuid.uuid4().hex
msg.update({'_msg_id': msg_id})
LOG.debug(_('MSG_ID is %s') % (msg_id))
LOG.debug('MSG_ID is %s' % (msg_id))
_add_unique_id(msg)
pack_context(msg, context)
@ -580,7 +580,7 @@ def call(conf, context, topic, msg, timeout, connection_pool):
def cast(conf, context, topic, msg, connection_pool):
"""Sends a message on a topic without waiting for a response."""
LOG.debug(_('Making asynchronous cast on %s...'), topic)
LOG.debug('Making asynchronous cast on %s...', topic)
_add_unique_id(msg)
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:
@ -589,7 +589,7 @@ def cast(conf, context, topic, msg, connection_pool):
def fanout_cast(conf, context, topic, msg, connection_pool):
"""Sends a message on a fanout exchange without waiting for a response."""
LOG.debug(_('Making asynchronous fanout cast...'))
LOG.debug('Making asynchronous fanout cast...')
_add_unique_id(msg)
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:
@ -617,7 +617,7 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg,
def notify(conf, context, topic, msg, connection_pool, envelope):
"""Sends a notification event on a topic."""
LOG.debug(_('Sending %(event_type)s on %(topic)s'),
LOG.debug('Sending %(event_type)s on %(topic)s',
dict(event_type=msg.get('event_type'),
topic=topic))
_add_unique_id(msg)

View File

@ -22,7 +22,7 @@ import traceback
from oslo.config import cfg
import six
from neutron.openstack.common.gettextutils import _
from neutron.openstack.common.gettextutils import _, _LE
from neutron.openstack.common import importutils
from neutron.openstack.common import jsonutils
from neutron.openstack.common import local
@ -85,7 +85,7 @@ class RPCException(Exception):
except Exception:
# kwargs doesn't match a variable in the message
# log the issue and the kwargs
LOG.exception(_('Exception in string format operation'))
LOG.exception(_LE('Exception in string format operation'))
for name, value in six.iteritems(kwargs):
LOG.error("%s: %s" % (name, value))
# at least get the core message out if something happened
@ -269,6 +269,10 @@ def _safe_log(log_func, msg, msg_data):
d[k] = '<SANITIZED>'
elif k.lower() in SANITIZE:
d[k] = '<SANITIZED>'
elif isinstance(d[k], list):
for e in d[k]:
if isinstance(e, dict):
_fix_passwords(e)
elif isinstance(d[k], dict):
_fix_passwords(d[k])
return d
@ -285,7 +289,7 @@ def serialize_remote_exception(failure_info, log_failure=True):
tb = traceback.format_exception(*failure_info)
failure = failure_info[1]
if log_failure:
LOG.error(_("Returning exception %s to caller"),
LOG.error(_LE("Returning exception %s to caller"),
six.text_type(failure))
LOG.error(tb)

View File

@ -11,6 +11,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Fake RPC implementation which calls proxy methods directly with no
queues. Casts will block, but this is very useful for tests.
"""
@ -139,8 +140,8 @@ def multicall(conf, context, topic, msg, timeout=None):
if not method:
return
args = msg.get('args', {})
version = msg.get('version', None)
namespace = msg.get('namespace', None)
version = msg.get('version')
namespace = msg.get('namespace')
try:
consumer = CONSUMERS[topic][0]
@ -184,8 +185,8 @@ def fanout_cast(conf, context, topic, msg):
if not method:
return
args = msg.get('args', {})
version = msg.get('version', None)
namespace = msg.get('namespace', None)
version = msg.get('version')
namespace = msg.get('namespace')
for consumer in CONSUMERS.get(topic, []):
try:

View File

@ -29,7 +29,7 @@ from oslo.config import cfg
import six
from neutron.openstack.common import excutils
from neutron.openstack.common.gettextutils import _
from neutron.openstack.common.gettextutils import _, _LE, _LI
from neutron.openstack.common import network_utils
from neutron.openstack.common.rpc import amqp as rpc_amqp
from neutron.openstack.common.rpc import common as rpc_common
@ -38,9 +38,9 @@ from neutron.openstack.common import sslutils
kombu_opts = [
cfg.StrOpt('kombu_ssl_version',
default='',
help='SSL version to use (valid only if SSL enabled). '
'valid values are TLSv1, SSLv23 and SSLv3. SSLv2 may '
'be available on some distributions'
help='If SSL is enabled, the SSL version to use. Valid '
'values are TLSv1, SSLv23 and SSLv3. SSLv2 might '
'be available on some distributions.'
),
cfg.StrOpt('kombu_ssl_keyfile',
default='',
@ -63,33 +63,33 @@ kombu_opts = [
help='RabbitMQ HA cluster host:port pairs'),
cfg.BoolOpt('rabbit_use_ssl',
default=False,
help='connect over SSL for RabbitMQ'),
help='Connect over SSL for RabbitMQ'),
cfg.StrOpt('rabbit_userid',
default='guest',
help='the RabbitMQ userid'),
help='The RabbitMQ userid'),
cfg.StrOpt('rabbit_password',
default='guest',
help='the RabbitMQ password',
help='The RabbitMQ password',
secret=True),
cfg.StrOpt('rabbit_virtual_host',
default='/',
help='the RabbitMQ virtual host'),
help='The RabbitMQ virtual host'),
cfg.IntOpt('rabbit_retry_interval',
default=1,
help='how frequently to retry connecting with RabbitMQ'),
help='How frequently to retry connecting with RabbitMQ'),
cfg.IntOpt('rabbit_retry_backoff',
default=2,
help='how long to backoff for between retries when connecting '
help='How long to backoff for between retries when connecting '
'to RabbitMQ'),
cfg.IntOpt('rabbit_max_retries',
default=0,
help='maximum retries with trying to connect to RabbitMQ '
'(the default of 0 implies an infinite retry count)'),
help='Maximum number of RabbitMQ connection retries. '
'Default is 0 (infinite retry count)'),
cfg.BoolOpt('rabbit_ha_queues',
default=False,
help='use H/A queues in RabbitMQ (x-ha-policy: all).'
'You need to wipe RabbitMQ database when '
'changing this option.'),
help='Use HA queues in RabbitMQ (x-ha-policy: all). '
'If you change this option, you must wipe the '
'RabbitMQ database.'),
]
@ -153,12 +153,12 @@ class ConsumerBase(object):
callback(msg)
except Exception:
if self.ack_on_error:
LOG.exception(_("Failed to process message"
" ... skipping it."))
LOG.exception(_LE("Failed to process message"
" ... skipping it."))
message.ack()
else:
LOG.exception(_("Failed to process message"
" ... will requeue."))
LOG.exception(_LE("Failed to process message"
" ... will requeue."))
message.requeue()
else:
message.ack()
@ -458,6 +458,9 @@ class Connection(object):
self.params_list = params_list
brokers_count = len(self.params_list)
self.next_broker_indices = itertools.cycle(range(brokers_count))
self.memory_transport = self.conf.fake_rabbit
self.connection = None
@ -492,7 +495,7 @@ class Connection(object):
be handled by the caller.
"""
if self.connection:
LOG.info(_("Reconnecting to AMQP server on "
LOG.info(_LI("Reconnecting to AMQP server on "
"%(hostname)s:%(port)d") % params)
try:
self.connection.release()
@ -514,7 +517,7 @@ class Connection(object):
self.channel._new_queue('ae.undeliver')
for consumer in self.consumers:
consumer.reconnect(self.channel)
LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d') %
LOG.info(_LI('Connected to AMQP server on %(hostname)s:%(port)d') %
params)
def reconnect(self):
@ -528,7 +531,7 @@ class Connection(object):
attempt = 0
while True:
params = self.params_list[attempt % len(self.params_list)]
params = self.params_list[next(self.next_broker_indices)]
attempt += 1
try:
self._connect(params)
@ -565,9 +568,9 @@ class Connection(object):
sleep_time = min(sleep_time, self.interval_max)
log_info['sleep_time'] = sleep_time
LOG.error(_('AMQP server on %(hostname)s:%(port)d is '
'unreachable: %(err_str)s. Trying again in '
'%(sleep_time)d seconds.') % log_info)
LOG.error(_LE('AMQP server on %(hostname)s:%(port)d is '
'unreachable: %(err_str)s. Trying again in '
'%(sleep_time)d seconds.') % log_info)
time.sleep(sleep_time)
def ensure(self, error_callback, method, *args, **kwargs):
@ -619,7 +622,7 @@ class Connection(object):
def _connect_error(exc):
log_info = {'topic': topic, 'err_str': str(exc)}
LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
LOG.error(_LE("Failed to declare consumer for topic '%(topic)s': "
"%(err_str)s") % log_info)
def _declare_consumer():
@ -637,11 +640,11 @@ class Connection(object):
def _error_callback(exc):
if isinstance(exc, socket.timeout):
LOG.debug(_('Timed out waiting for RPC response: %s') %
LOG.debug('Timed out waiting for RPC response: %s' %
str(exc))
raise rpc_common.Timeout()
else:
LOG.exception(_('Failed to consume message from queue: %s') %
LOG.exception(_LE('Failed to consume message from queue: %s') %
str(exc))
info['do_consume'] = True
@ -680,7 +683,7 @@ class Connection(object):
def _error_callback(exc):
log_info = {'topic': topic, 'err_str': str(exc)}
LOG.exception(_("Failed to publish message to topic "
LOG.exception(_LE("Failed to publish message to topic "
"'%(topic)s': %(err_str)s") % log_info)
def _publish():

View File

@ -23,7 +23,7 @@ from oslo.config import cfg
import six
from neutron.openstack.common import excutils
from neutron.openstack.common.gettextutils import _
from neutron.openstack.common.gettextutils import _, _LE, _LI
from neutron.openstack.common import importutils
from neutron.openstack.common import jsonutils
from neutron.openstack.common import log as logging
@ -188,7 +188,7 @@ class ConsumerBase(object):
msg = rpc_common.deserialize_msg(message.content)
self.callback(msg)
except Exception:
LOG.exception(_("Failed to process message... skipping it."))
LOG.exception(_LE("Failed to process message... skipping it."))
finally:
# TODO(sandy): Need support for optional ack_on_error.
self.session.acknowledge(message)
@ -467,6 +467,10 @@ class Connection(object):
self.brokers = params['qpid_hosts']
self.username = params['username']
self.password = params['password']
brokers_count = len(self.brokers)
self.next_broker_indices = itertools.cycle(range(brokers_count))
self.connection_create(self.brokers[0])
self.reconnect()
@ -494,7 +498,6 @@ class Connection(object):
def reconnect(self):
"""Handles reconnecting and re-establishing sessions and queues."""
attempt = 0
delay = 1
while True:
# Close the session if necessary
@ -504,21 +507,20 @@ class Connection(object):
except qpid_exceptions.ConnectionError:
pass
broker = self.brokers[attempt % len(self.brokers)]
attempt += 1
broker = self.brokers[next(self.next_broker_indices)]
try:
self.connection_create(broker)
self.connection.open()
except qpid_exceptions.ConnectionError as e:
msg_dict = dict(e=e, delay=delay)
msg = _("Unable to connect to AMQP server: %(e)s. "
"Sleeping %(delay)s seconds") % msg_dict
msg = _LE("Unable to connect to AMQP server: %(e)s. "
"Sleeping %(delay)s seconds") % msg_dict
LOG.error(msg)
time.sleep(delay)
delay = min(2 * delay, 60)
delay = min(delay + 1, 5)
else:
LOG.info(_('Connected to AMQP server on %s'), broker)
LOG.info(_LI('Connected to AMQP server on %s'), broker)
break
self.session = self.connection.session()
@ -531,7 +533,7 @@ class Connection(object):
consumer.reconnect(self.session)
self._register_consumer(consumer)
LOG.debug(_("Re-established AMQP queues"))
LOG.debug("Re-established AMQP queues")
def ensure(self, error_callback, method, *args, **kwargs):
while True:
@ -570,7 +572,7 @@ class Connection(object):
"""
def _connect_error(exc):
log_info = {'topic': topic, 'err_str': str(exc)}
LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
LOG.error(_LE("Failed to declare consumer for topic '%(topic)s': "
"%(err_str)s") % log_info)
def _declare_consumer():
@ -585,11 +587,11 @@ class Connection(object):
def _error_callback(exc):
if isinstance(exc, qpid_exceptions.Empty):
LOG.debug(_('Timed out waiting for RPC response: %s') %
LOG.debug('Timed out waiting for RPC response: %s' %
str(exc))
raise rpc_common.Timeout()
else:
LOG.exception(_('Failed to consume message from queue: %s') %
LOG.exception(_LE('Failed to consume message from queue: %s') %
str(exc))
def _consume():
@ -597,7 +599,7 @@ class Connection(object):
try:
self._lookup_consumer(nxt_receiver).consume()
except Exception:
LOG.exception(_("Error processing message. Skipping it."))
LOG.exception(_LE("Error processing message. Skipping it."))
for iteration in itertools.count(0):
if limit and iteration >= limit:
@ -624,7 +626,7 @@ class Connection(object):
def _connect_error(exc):
log_info = {'topic': topic, 'err_str': str(exc)}
LOG.exception(_("Failed to publish message to topic "
LOG.exception(_LE("Failed to publish message to topic "
"'%(topic)s': %(err_str)s") % log_info)
def _publisher_send():

View File

@ -27,7 +27,7 @@ import six
from six import moves
from neutron.openstack.common import excutils
from neutron.openstack.common.gettextutils import _
from neutron.openstack.common.gettextutils import _, _LE, _LI
from neutron.openstack.common import importutils
from neutron.openstack.common import jsonutils
from neutron.openstack.common.rpc import common as rpc_common
@ -80,7 +80,7 @@ CONF = cfg.CONF
CONF.register_opts(zmq_opts)
ZMQ_CTX = None # ZeroMQ Context, must be global.
matchmaker = None # memoized matchmaker object
matchmaker = None # memorized matchmaker object
def _serialize(data):
@ -93,12 +93,12 @@ def _serialize(data):
return jsonutils.dumps(data, ensure_ascii=True)
except TypeError:
with excutils.save_and_reraise_exception():
LOG.error(_("JSON serialization failed."))
LOG.error(_LE("JSON serialization failed."))
def _deserialize(data):
"""Deserialization wrapper."""
LOG.debug(_("Deserializing: %s"), data)
LOG.debug("Deserializing: %s", data)
return jsonutils.loads(data)
@ -133,9 +133,9 @@ class ZmqSocket(object):
str_data = {'addr': addr, 'type': self.socket_s(),
'subscribe': subscribe, 'bind': bind}
LOG.debug(_("Connecting to %(addr)s with %(type)s"), str_data)
LOG.debug(_("-> Subscribed to %(subscribe)s"), str_data)
LOG.debug(_("-> bind: %(bind)s"), str_data)
LOG.debug("Connecting to %(addr)s with %(type)s", str_data)
LOG.debug("-> Subscribed to %(subscribe)s", str_data)
LOG.debug("-> bind: %(bind)s", str_data)
try:
if bind:
@ -155,7 +155,7 @@ class ZmqSocket(object):
"""Subscribe."""
if not self.can_sub:
raise RPCException("Cannot subscribe on this socket.")
LOG.debug(_("Subscribing to %s"), msg_filter)
LOG.debug("Subscribing to %s", msg_filter)
try:
self.sock.setsockopt(zmq.SUBSCRIBE, msg_filter)
@ -192,7 +192,7 @@ class ZmqSocket(object):
# it would be much worse if some of the code calling this
# were to fail. For now, lets log, and later evaluate
# if we can safely raise here.
LOG.error(_("ZeroMQ socket could not be closed."))
LOG.error(_LE("ZeroMQ socket could not be closed."))
self.sock = None
def recv(self, **kwargs):
@ -264,7 +264,7 @@ class InternalContext(object):
def _get_response(self, ctx, proxy, topic, data):
"""Process a curried message and cast the result to topic."""
LOG.debug(_("Running func with context: %s"), ctx.to_dict())
LOG.debug("Running func with context: %s", ctx.to_dict())
data.setdefault('version', None)
data.setdefault('args', {})
@ -277,13 +277,13 @@ class InternalContext(object):
# ignore these since they are just from shutdowns
pass
except rpc_common.ClientException as e:
LOG.debug(_("Expected exception during message handling (%s)") %
LOG.debug("Expected exception during message handling (%s)" %
e._exc_info[1])
return {'exc':
rpc_common.serialize_remote_exception(e._exc_info,
log_failure=False)}
except Exception:
LOG.error(_("Exception during message handling"))
LOG.error(_LE("Exception during message handling"))
return {'exc':
rpc_common.serialize_remote_exception(sys.exc_info())}
@ -302,7 +302,7 @@ class InternalContext(object):
self._get_response(ctx, proxy, topic, payload),
ctx.replies)
LOG.debug(_("Sending reply"))
LOG.debug("Sending reply")
_multi_send(_cast, ctx, topic, {
'method': '-process_reply',
'args': {
@ -336,7 +336,7 @@ class ConsumerBase(object):
# processed internally. (non-valid method name)
method = data.get('method')
if not method:
LOG.error(_("RPC message did not include method."))
LOG.error(_LE("RPC message did not include method."))
return
# Internal method
@ -368,7 +368,7 @@ class ZmqBaseReactor(ConsumerBase):
def register(self, proxy, in_addr, zmq_type_in,
in_bind=True, subscribe=None):
LOG.info(_("Registering reactor"))
LOG.info(_LI("Registering reactor"))
if zmq_type_in not in (zmq.PULL, zmq.SUB):
raise RPCException("Bad input socktype")
@ -380,12 +380,12 @@ class ZmqBaseReactor(ConsumerBase):
self.proxies[inq] = proxy
self.sockets.append(inq)
LOG.info(_("In reactor registered"))
LOG.info(_LI("In reactor registered"))
def consume_in_thread(self):
@excutils.forever_retry_uncaught_exceptions
def _consume(sock):
LOG.info(_("Consuming socket"))
LOG.info(_LI("Consuming socket"))
while True:
self.consume(sock)
@ -435,7 +435,7 @@ class ZmqProxy(ZmqBaseReactor):
if topic not in self.topic_proxy:
def publisher(waiter):
LOG.info(_("Creating proxy for topic: %s"), topic)
LOG.info(_LI("Creating proxy for topic: %s"), topic)
try:
# The topic is received over the network,
@ -473,14 +473,14 @@ class ZmqProxy(ZmqBaseReactor):
try:
wait_sock_creation.wait()
except RPCException:
LOG.error(_("Topic socket file creation failed."))
LOG.error(_LE("Topic socket file creation failed."))
return
try:
self.topic_proxy[topic].put_nowait(data)
except eventlet.queue.Full:
LOG.error(_("Local per-topic backlog buffer full for topic "
"%(topic)s. Dropping message.") % {'topic': topic})
LOG.error(_LE("Local per-topic backlog buffer full for topic "
"%(topic)s. Dropping message.") % {'topic': topic})
def consume_in_thread(self):
"""Runs the ZmqProxy service."""
@ -495,8 +495,8 @@ class ZmqProxy(ZmqBaseReactor):
except os.error:
if not os.path.isdir(ipc_dir):
with excutils.save_and_reraise_exception():
LOG.error(_("Required IPC directory does not exist at"
" %s") % (ipc_dir, ))
LOG.error(_LE("Required IPC directory does not exist at"
" %s") % (ipc_dir, ))
try:
self.register(consumption_proxy,
consume_in,
@ -504,11 +504,11 @@ class ZmqProxy(ZmqBaseReactor):
except zmq.ZMQError:
if os.access(ipc_dir, os.X_OK):
with excutils.save_and_reraise_exception():
LOG.error(_("Permission denied to IPC directory at"
" %s") % (ipc_dir, ))
LOG.error(_LE("Permission denied to IPC directory at"
" %s") % (ipc_dir, ))
with excutils.save_and_reraise_exception():
LOG.error(_("Could not create ZeroMQ receiver daemon. "
"Socket may already be in use."))
LOG.error(_LE("Could not create ZeroMQ receiver daemon. "
"Socket may already be in use."))
super(ZmqProxy, self).consume_in_thread()
@ -541,7 +541,7 @@ class ZmqReactor(ZmqBaseReactor):
def consume(self, sock):
#TODO(ewindisch): use zero-copy (i.e. references, not copying)
data = sock.recv()
LOG.debug(_("CONSUMER RECEIVED DATA: %s"), data)
LOG.debug("CONSUMER RECEIVED DATA: %s", data)
proxy = self.proxies[sock]
@ -560,7 +560,7 @@ class ZmqReactor(ZmqBaseReactor):
# Unmarshal only after verifying the message.
ctx = RpcContext.unmarshal(data[3])
else:
LOG.error(_("ZMQ Envelope version unsupported or unknown."))
LOG.error(_LE("ZMQ Envelope version unsupported or unknown."))
return
self.pool.spawn_n(self.process, proxy, ctx, request)
@ -588,14 +588,14 @@ class Connection(rpc_common.Connection):
topic = '.'.join((topic.split('.', 1)[0], CONF.rpc_zmq_host))
if topic in self.topics:
LOG.info(_("Skipping topic registration. Already registered."))
LOG.info(_LI("Skipping topic registration. Already registered."))
return
# Receive messages from (local) proxy
inaddr = "ipc://%s/zmq_topic_%s" % \
(CONF.rpc_zmq_ipc_dir, topic)
LOG.debug(_("Consumer is a zmq.%s"),
LOG.debug("Consumer is a zmq.%s",
['PULL', 'SUB'][sock_type == zmq.SUB])
self.reactor.register(proxy, inaddr, sock_type,
@ -647,7 +647,7 @@ def _call(addr, context, topic, msg, timeout=None,
# Replies always come into the reply service.
reply_topic = "zmq_replies.%s" % CONF.rpc_zmq_host
LOG.debug(_("Creating payload"))
LOG.debug("Creating payload")
# Curry the original request into a reply method.
mcontext = RpcContext.marshal(context)
payload = {
@ -660,7 +660,7 @@ def _call(addr, context, topic, msg, timeout=None,
}
}
LOG.debug(_("Creating queue socket for reply waiter"))
LOG.debug("Creating queue socket for reply waiter")
# Messages arriving async.
# TODO(ewindisch): have reply consumer with dynamic subscription mgmt
@ -673,14 +673,14 @@ def _call(addr, context, topic, msg, timeout=None,
zmq.SUB, subscribe=msg_id, bind=False
)
LOG.debug(_("Sending cast"))
LOG.debug("Sending cast")
_cast(addr, context, topic, payload, envelope)
LOG.debug(_("Cast sent; Waiting reply"))
LOG.debug("Cast sent; Waiting reply")
# Blocks until receives reply
msg = msg_waiter.recv()
LOG.debug(_("Received message: %s"), msg)
LOG.debug(_("Unpacking response"))
LOG.debug("Received message: %s", msg)
LOG.debug("Unpacking response")
if msg[2] == 'cast': # Legacy version
raw_msg = _deserialize(msg[-1])[-1]
@ -719,10 +719,10 @@ def _multi_send(method, context, topic, msg, timeout=None,
Dispatches to the matchmaker and sends message to all relevant hosts.
"""
conf = CONF
LOG.debug(_("%(msg)s") % {'msg': ' '.join(map(pformat, (topic, msg)))})
LOG.debug("%(msg)s" % {'msg': ' '.join(map(pformat, (topic, msg)))})
queues = _get_matchmaker().queues(topic)
LOG.debug(_("Sending message(s) to: %s"), queues)
LOG.debug("Sending message(s) to: %s", queues)
# Don't stack if we have no matchmaker results
if not queues:

View File

@ -11,6 +11,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
The MatchMaker classes should except a Topic or Fanout exchange key and
return keys for direct exchanges, per (approximate) AMQP parlance.
@ -21,7 +22,7 @@ import contextlib
import eventlet
from oslo.config import cfg
from neutron.openstack.common.gettextutils import _
from neutron.openstack.common.gettextutils import _, _LI
from neutron.openstack.common import log as logging
@ -212,7 +213,7 @@ class HeartbeatMatchMakerBase(MatchMakerBase):
self.hosts.discard(host)
self.backend_unregister(key, '.'.join((key, host)))
LOG.info(_("Matchmaker unregistered: %(key)s, %(host)s"),
LOG.info(_LI("Matchmaker unregistered: %(key)s, %(host)s"),
{'key': key, 'host': host})
def start_heartbeat(self):

View File

@ -11,6 +11,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
The MatchMaker classes should accept a Topic or Fanout exchange key and
return keys for direct exchanges, per (approximate) AMQP parlance.

View File

@ -11,6 +11,7 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
The MatchMaker classes should except a Topic or Fanout exchange key and
return keys for direct exchanges, per (approximate) AMQP parlance.
@ -21,7 +22,7 @@ import json
from oslo.config import cfg
from neutron.openstack.common.gettextutils import _
from neutron.openstack.common.gettextutils import _LW
from neutron.openstack.common import log as logging
from neutron.openstack.common.rpc import matchmaker as mm
@ -52,9 +53,8 @@ class RingExchange(mm.Exchange):
if ring:
self.ring = ring
else:
fh = open(CONF.matchmaker_ring.ringfile, 'r')
self.ring = json.load(fh)
fh.close()
with open(CONF.matchmaker_ring.ringfile, 'r') as fh:
self.ring = json.load(fh)
self.ring0 = {}
for k in self.ring.keys():
@ -72,8 +72,8 @@ class RoundRobinRingExchange(RingExchange):
def run(self, key):
if not self._ring_has(key):
LOG.warn(
_("No key defining hosts for topic '%s', "
"see ringfile") % (key, )
_LW("No key defining hosts for topic '%s', "
"see ringfile") % (key, )
)
return []
host = next(self.ring0[key])
@ -90,8 +90,8 @@ class FanoutRingExchange(RingExchange):
nkey = key.split('fanout~')[1:][0]
if not self._ring_has(nkey):
LOG.warn(
_("No key defining hosts for topic '%s', "
"see ringfile") % (nkey, )
_LW("No key defining hosts for topic '%s', "
"see ringfile") % (nkey, )
)
return []
return map(lambda x: (key + '.' + x, x), self.ring[nkey])

View File

@ -15,7 +15,6 @@
# License for the specific language governing permissions and limitations
# under the License.
from neutron.openstack.common.gettextutils import _
from neutron.openstack.common import log as logging
from neutron.openstack.common import rpc
from neutron.openstack.common.rpc import dispatcher as rpc_dispatcher
@ -44,7 +43,7 @@ class Service(service.Service):
super(Service, self).start()
self.conn = rpc.create_connection(new=True)
LOG.debug(_("Creating Consumer connection for Service %s") %
LOG.debug("Creating Consumer connection for Service %s" %
self.topic)
dispatcher = rpc_dispatcher.RpcDispatcher([self.manager],