diff --git a/neutron/openstack/common/__init__.py b/neutron/openstack/common/__init__.py
index e69de29bb2d..d1223eaf765 100644
--- a/neutron/openstack/common/__init__.py
+++ b/neutron/openstack/common/__init__.py
@@ -0,0 +1,17 @@
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+import six
+
+
+six.add_move(six.MovedModule('mox', 'mox', 'mox3.mox'))
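[Editor's note] The new __init__.py registers mox under six.moves so test code can import one name whether the Python 2 mox or the Python 3 compatible mox3 is installed. A minimal sketch of how callers would consume the alias (the alias registration itself is exactly what the hunk above adds):

    import six

    # Register the alias as the new __init__.py does: the name 'mox'
    # under six.moves resolves to mox3.mox.
    six.add_move(six.MovedModule('mox', 'mox', 'mox3.mox'))

    # Tests can then import a single name on both Python 2 and 3:
    from six.moves import mox  # noqa: resolves to mox3.mox here

diff --git a/neutron/openstack/common/gettextutils.py b/neutron/openstack/common/gettextutils.py
index 618d8fbc515..1c33bfb83e4 100644
--- a/neutron/openstack/common/gettextutils.py
+++ b/neutron/openstack/common/gettextutils.py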
@@ -23,11 +23,11 @@ Usual usage in an openstack.common module:
 """
 
 import copy
+import functools
 import gettext
 import locale
 from logging import handlers
 import os
-import re
 
 from babel import localedata
 import six
@@ -35,6 +35,17 @@ import six
 _localedir = os.environ.get('neutron'.upper() + '_LOCALEDIR')
 _t = gettext.translation('neutron', localedir=_localedir, fallback=True)
 
+# We use separate translation catalogs for each log level, so set up a
+# mapping between the log level name and the translator. The domain
+# for the log level is project_name + "-log-" + log_level so messages
+# for each level end up in their own catalog.
+_t_log_levels = dict(
+    (level, gettext.translation('neutron' + '-log-' + level,
+                                localedir=_localedir,
+                                fallback=True))
+    for level in ['info', 'warning', 'error', 'critical']
+)
+
 _AVAILABLE_LANGUAGES = {}
 USE_LAZY = False
 
@@ -60,6 +71,28 @@ def _(msg):
         return _t.ugettext(msg)
 
 
+def _log_translation(msg, level):
+    """Build a single translation of a log message
+    """
+    if USE_LAZY:
+        return Message(msg, domain='neutron' + '-log-' + level)
+    else:
+        translator = _t_log_levels[level]
+        if six.PY3:
+            return translator.gettext(msg)
+        return translator.ugettext(msg)
+
+# Translators for log levels.
+#
+# The abbreviated names are meant to reflect the usual use of a short
+# name like '_'. The "L" is for "log" and the other letter comes from
+# the level.
+_LI = functools.partial(_log_translation, level='info')
+_LW = functools.partial(_log_translation, level='warning')
+_LE = functools.partial(_log_translation, level='error')
+_LC = functools.partial(_log_translation, level='critical')
+
+
 def install(domain, lazy=False):
     """Install a _() function using the given translation domain.
 
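[Editor's note] With these helpers in place, log calls pick the translator that matches their severity, so each message is looked up in its own "-log-<level>" catalog. A hedged usage sketch (the logger name and messages are illustrative):

    from neutron.openstack.common.gettextutils import _LE, _LI
    from neutron.openstack.common import log as logging

    LOG = logging.getLogger(__name__)

    # Routed through _log_translation with level='info', so the lookup
    # happens in the 'neutron-log-info' catalog...
    LOG.info(_LI('Connected to AMQP server on %s'), 'localhost:5672')
    # ...and this one in 'neutron-log-error'.
    LOG.error(_LE('Failed to process message'))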
@@ -118,7 +151,8 @@ class Message(six.text_type):
     and can be treated as such.
     """
 
-    def __new__(cls, msgid, msgtext=None, params=None, domain='neutron', *args):
+    def __new__(cls, msgid, msgtext=None, params=None,
+                domain='neutron', *args):
         """Create a new Message object.
 
         In order for translation to work gettext requires a message ID, this
@@ -213,47 +247,22 @@ class Message(six.text_type):
         if other is None:
             params = (other,)
         elif isinstance(other, dict):
-            params = self._trim_dictionary_parameters(other)
+            # Merge the dictionaries
+            # Copy each item in case one does not support deep copy.
+            params = {}
+            if isinstance(self.params, dict):
+                for key, val in self.params.items():
+                    params[key] = self._copy_param(val)
+            for key, val in other.items():
+                params[key] = self._copy_param(val)
         else:
             params = self._copy_param(other)
         return params
 
-    def _trim_dictionary_parameters(self, dict_param):
-        """Return a dict that only has matching entries in the msgid."""
-        # NOTE(luisg): Here we trim down the dictionary passed as parameters
-        # to avoid carrying a lot of unnecessary weight around in the message
-        # object, for example if someone passes in Message() % locals() but
-        # only some params are used, and additionally we prevent errors for
-        # non-deepcopyable objects by unicoding() them.
-
-        # Look for %(param) keys in msgid;
-        # Skip %% and deal with the case where % is first character on the line
-        keys = re.findall('(?:[^%]|^)?%\((\w*)\)[a-z]', self.msgid)
-
-        # If we don't find any %(param) keys but have a %s
-        if not keys and re.findall('(?:[^%]|^)%[a-z]', self.msgid):
-            # Apparently the full dictionary is the parameter
-            params = self._copy_param(dict_param)
-        else:
-            params = {}
-            # Save our existing parameters as defaults to protect
-            # ourselves from losing values if we are called through an
-            # (erroneous) chain that builds a valid Message with
-            # arguments, and then does something like "msg % kwds"
-            # where kwds is an empty dictionary.
-            src = {}
-            if isinstance(self.params, dict):
-                src.update(self.params)
-            src.update(dict_param)
-            for key in keys:
-                params[key] = self._copy_param(src[key])
-
-        return params
-
     def _copy_param(self, param):
         try:
             return copy.deepcopy(param)
-        except TypeError:
+        except Exception:
             # Fallback to casting to unicode this will handle the
             # python code-like objects that can't be deep-copied
             return six.text_type(param)
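[Editor's note] The new merge logic keeps every key from both the stored and the incoming parameter dictionaries instead of trimming to the keys named in the msgid, and _copy_param now falls back to six.text_type() on any deep-copy failure, not just TypeError. A self-contained sketch of the resulting behaviour (the sample values are invented):

    import copy
    import socket

    import six

    def _copy_param(param):
        # Same shape as Message._copy_param after this change: any
        # failure to deep-copy degrades to a unicode snapshot.
        try:
            return copy.deepcopy(param)
        except Exception:
            return six.text_type(param)

    stored = {'tenant': 'demo'}                         # already on the Message
    incoming = {'port': 8080, 'sock': socket.socket()}  # from msg % other

    merged = {}
    for src in (stored, incoming):
        for key, val in src.items():
            merged[key] = _copy_param(val)
    # merged keeps every key from both dicts; the socket, which cannot
    # be deep-copied, is stored as its unicode repr instead of raising.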
""" - if check_for_lock: - _check_for_lock() return _get_impl().multicall(CONF, context, topic, msg, timeout) diff --git a/neutron/openstack/common/rpc/amqp.py b/neutron/openstack/common/rpc/amqp.py index a0fa8cef22a..b0f52395a43 100644 --- a/neutron/openstack/common/rpc/amqp.py +++ b/neutron/openstack/common/rpc/amqp.py @@ -37,7 +37,7 @@ import six from neutron.openstack.common import excutils -from neutron.openstack.common.gettextutils import _ +from neutron.openstack.common.gettextutils import _, _LE from neutron.openstack.common import local from neutron.openstack.common import log as logging from neutron.openstack.common.rpc import common as rpc_common @@ -72,7 +72,7 @@ class Pool(pools.Pool): # TODO(comstud): Timeout connections not used in a while def create(self): - LOG.debug(_('Pool creating new connection')) + LOG.debug('Pool creating new connection') return self.connection_cls(self.conf) def empty(self): @@ -287,7 +287,7 @@ def unpack_context(conf, msg): context_dict['reply_q'] = msg.pop('_reply_q', None) context_dict['conf'] = conf ctx = RpcContext.from_dict(context_dict) - rpc_common._safe_log(LOG.debug, _('unpacked context: %s'), ctx.to_dict()) + rpc_common._safe_log(LOG.debug, 'unpacked context: %s', ctx.to_dict()) return ctx @@ -339,7 +339,7 @@ def _add_unique_id(msg): """Add unique_id for checking duplicate messages.""" unique_id = uuid.uuid4().hex msg.update({UNIQUE_ID: unique_id}) - LOG.debug(_('UNIQUE_ID is %s.') % (unique_id)) + LOG.debug('UNIQUE_ID is %s.' % (unique_id)) class _ThreadPoolWithWait(object): @@ -432,7 +432,7 @@ class ProxyCallback(_ThreadPoolWithWait): # the previous context is stored in local.store.context if hasattr(local.store, 'context'): del local.store.context - rpc_common._safe_log(LOG.debug, _('received %s'), message_data) + rpc_common._safe_log(LOG.debug, 'received %s', message_data) self.msg_id_cache.check_duplicate_message(message_data) ctxt = unpack_context(self.conf, message_data) method = message_data.get('method') @@ -469,7 +469,7 @@ class ProxyCallback(_ThreadPoolWithWait): # This final None tells multicall that it is done. ctxt.reply(ending=True, connection_pool=self.connection_pool) except rpc_common.ClientException as e: - LOG.debug(_('Expected exception during message handling (%s)') % + LOG.debug('Expected exception during message handling (%s)' % e._exc_info[1]) ctxt.reply(None, e._exc_info, connection_pool=self.connection_pool, @@ -477,7 +477,7 @@ class ProxyCallback(_ThreadPoolWithWait): except Exception: # sys.exc_info() is deleted by LOG.exception(). 
diff --git a/neutron/openstack/common/rpc/__init__.py b/neutron/openstack/common/rpc/__init__.py
index 046b35272b5..77578e64272 100644
--- a/neutron/openstack/common/rpc/__init__.py
+++ b/neutron/openstack/common/rpc/__init__.py
@@ -23,13 +23,9 @@ For some wrappers that add message versioning to rpc, see:
     rpc.proxy
 """
 
-import inspect
-
 from oslo.config import cfg
 
-from neutron.openstack.common.gettextutils import _
 from neutron.openstack.common import importutils
-from neutron.openstack.common import local
 from neutron.openstack.common import log as logging
 
 
@@ -93,24 +89,7 @@ def create_connection(new=True):
     return _get_impl().create_connection(CONF, new=new)
 
 
-def _check_for_lock():
-    if not CONF.debug:
-        return None
-
-    if ((hasattr(local.strong_store, 'locks_held')
-         and local.strong_store.locks_held)):
-        stack = ' :: '.join([frame[3] for frame in inspect.stack()])
-        LOG.warn(_('A RPC is being made while holding a lock. The locks '
-                   'currently held are %(locks)s. This is probably a bug. '
-                   'Please report it. Include the following: [%(stack)s].'),
-                 {'locks': local.strong_store.locks_held,
-                  'stack': stack})
-        return True
-
-    return False
-
-
-def call(context, topic, msg, timeout=None, check_for_lock=False):
+def call(context, topic, msg, timeout=None):
     """Invoke a remote method that returns something.
 
     :param context: Information that identifies the user that has made this
@@ -124,16 +103,12 @@
                 "args" : dict_of_kwargs }
     :param timeout: int, number of seconds to use for a response timeout.
                     If set, this overrides the rpc_response_timeout option.
-    :param check_for_lock: if True, a warning is emitted if a RPC call is made
-                    with a lock held.
 
     :returns: A dict from the remote method.
 
     :raises: openstack.common.rpc.common.Timeout if a complete response
              is not received before the timeout is reached.
     """
-    if check_for_lock:
-        _check_for_lock()
     return _get_impl().call(CONF, context, topic, msg, timeout)
 
 
@@ -176,7 +151,7 @@ def fanout_cast(context, topic, msg):
     return _get_impl().fanout_cast(CONF, context, topic, msg)
 
 
-def multicall(context, topic, msg, timeout=None, check_for_lock=False):
+def multicall(context, topic, msg, timeout=None):
     """Invoke a remote method and get back an iterator.
 
     In this case, the remote method will be returning multiple values in
@@ -194,8 +169,6 @@ def multicall(context, topic, msg, timeout=None, check_for_lock=False):
                 "args" : dict_of_kwargs }
     :param timeout: int, number of seconds to use for a response timeout.
                     If set, this overrides the rpc_response_timeout option.
-    :param check_for_lock: if True, a warning is emitted if a RPC call is made
-                    with a lock held.
 
     :returns: An iterator.  The iterator will yield a tuple (N, X) where N is
               an index that starts at 0 and increases by one for each value
@@ -205,8 +178,6 @@
 
     :raises: openstack.common.rpc.common.Timeout if a complete response
              is not received before the timeout is reached.
     """
-    if check_for_lock:
-        _check_for_lock()
     return _get_impl().multicall(CONF, context, topic, msg, timeout)
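[Editor's note] Callers that passed check_for_lock must simply drop the argument; the lock-detection debug aid is removed rather than relocated. A hedged before/after sketch (the topic and payload are invented):

    from neutron.openstack.common import rpc

    ctxt = ...  # an RpcContext from the calling service
    msg = {'method': 'ping', 'namespace': None, 'args': {}}

    # Before this sync the caller could opt in to lock detection:
    #     rpc.call(ctxt, 'network', msg, timeout=30, check_for_lock=True)
    # After it, the keyword is gone; passing it now raises TypeError.
    result = rpc.call(ctxt, 'network', msg, timeout=30)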
diff --git a/neutron/openstack/common/rpc/amqp.py b/neutron/openstack/common/rpc/amqp.py
index a0fa8cef22a..b0f52395a43 100644
--- a/neutron/openstack/common/rpc/amqp.py
+++ b/neutron/openstack/common/rpc/amqp.py
@@ -37,7 +37,7 @@ import six
 
 from neutron.openstack.common import excutils
-from neutron.openstack.common.gettextutils import _
+from neutron.openstack.common.gettextutils import _, _LE
 from neutron.openstack.common import local
 from neutron.openstack.common import log as logging
 from neutron.openstack.common.rpc import common as rpc_common
 
@@ -72,7 +72,7 @@ class Pool(pools.Pool):
 
     # TODO(comstud): Timeout connections not used in a while
     def create(self):
-        LOG.debug(_('Pool creating new connection'))
+        LOG.debug('Pool creating new connection')
        return self.connection_cls(self.conf)
 
     def empty(self):
@@ -287,7 +287,7 @@ def unpack_context(conf, msg):
     context_dict['reply_q'] = msg.pop('_reply_q', None)
     context_dict['conf'] = conf
     ctx = RpcContext.from_dict(context_dict)
-    rpc_common._safe_log(LOG.debug, _('unpacked context: %s'), ctx.to_dict())
+    rpc_common._safe_log(LOG.debug, 'unpacked context: %s', ctx.to_dict())
     return ctx
 
 
@@ -339,7 +339,7 @@ def _add_unique_id(msg):
     """Add unique_id for checking duplicate messages."""
     unique_id = uuid.uuid4().hex
     msg.update({UNIQUE_ID: unique_id})
-    LOG.debug(_('UNIQUE_ID is %s.') % (unique_id))
+    LOG.debug('UNIQUE_ID is %s.' % (unique_id))
 
 
 class _ThreadPoolWithWait(object):
@@ -432,7 +432,7 @@ class ProxyCallback(_ThreadPoolWithWait):
         # the previous context is stored in local.store.context
         if hasattr(local.store, 'context'):
             del local.store.context
-        rpc_common._safe_log(LOG.debug, _('received %s'), message_data)
+        rpc_common._safe_log(LOG.debug, 'received %s', message_data)
         self.msg_id_cache.check_duplicate_message(message_data)
         ctxt = unpack_context(self.conf, message_data)
         method = message_data.get('method')
@@ -469,7 +469,7 @@
             # This final None tells multicall that it is done.
             ctxt.reply(ending=True, connection_pool=self.connection_pool)
         except rpc_common.ClientException as e:
-            LOG.debug(_('Expected exception during message handling (%s)') %
+            LOG.debug('Expected exception during message handling (%s)' %
                       e._exc_info[1])
             ctxt.reply(None, e._exc_info,
                        connection_pool=self.connection_pool,
@@ -477,7 +477,7 @@
         except Exception:
             # sys.exc_info() is deleted by LOG.exception().
             exc_info = sys.exc_info()
-            LOG.error(_('Exception during message handling'),
+            LOG.error(_LE('Exception during message handling'),
                       exc_info=exc_info)
             ctxt.reply(None, exc_info, connection_pool=self.connection_pool)
 
@@ -551,10 +551,10 @@ _reply_proxy_create_sem = semaphore.Semaphore()
 
 def multicall(conf, context, topic, msg, timeout, connection_pool):
     """Make a call that returns multiple times."""
-    LOG.debug(_('Making synchronous call on %s ...'), topic)
+    LOG.debug('Making synchronous call on %s ...', topic)
     msg_id = uuid.uuid4().hex
     msg.update({'_msg_id': msg_id})
-    LOG.debug(_('MSG_ID is %s') % (msg_id))
+    LOG.debug('MSG_ID is %s' % (msg_id))
     _add_unique_id(msg)
     pack_context(msg, context)
 
@@ -580,7 +580,7 @@ def call(conf, context, topic, msg, timeout, connection_pool):
 
 def cast(conf, context, topic, msg, connection_pool):
     """Sends a message on a topic without waiting for a response."""
-    LOG.debug(_('Making asynchronous cast on %s...'), topic)
+    LOG.debug('Making asynchronous cast on %s...', topic)
     _add_unique_id(msg)
     pack_context(msg, context)
     with ConnectionContext(conf, connection_pool) as conn:
@@ -589,7 +589,7 @@
 
 def fanout_cast(conf, context, topic, msg, connection_pool):
     """Sends a message on a fanout exchange without waiting for a response."""
-    LOG.debug(_('Making asynchronous fanout cast...'))
+    LOG.debug('Making asynchronous fanout cast...')
     _add_unique_id(msg)
     pack_context(msg, context)
     with ConnectionContext(conf, connection_pool) as conn:
@@ -617,7 +617,7 @@ def fanout_cast_to_server(conf, context, server_params, topic, msg,
 
 def notify(conf, context, topic, msg, connection_pool, envelope):
     """Sends a notification event on a topic."""
-    LOG.debug(_('Sending %(event_type)s on %(topic)s'),
+    LOG.debug('Sending %(event_type)s on %(topic)s',
               dict(event_type=msg.get('event_type'),
                    topic=topic))
     _add_unique_id(msg)
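[Editor's note] The amqp changes follow one rule: debug messages lose their _() wrapper (debug output is not shipped to translators), while error messages move to _LE. The unique-id hunk is behaviourally unchanged; for reference, a sketch of what _add_unique_id stamps onto each outgoing message (UNIQUE_ID is the module constant '_unique_id', assumed from the oslo source):

    import uuid

    UNIQUE_ID = '_unique_id'  # same constant the amqp module uses

    def _add_unique_id(msg):
        # Mirrors the hunk above: every outgoing message carries a random
        # hex id that MsgIdCache later uses to drop duplicate deliveries.
        unique_id = uuid.uuid4().hex
        msg.update({UNIQUE_ID: unique_id})

    msg = {'method': 'ping', 'args': {}}
    _add_unique_id(msg)
    # msg now resembles {'method': 'ping', 'args': {}, '_unique_id': '9f0e...'}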
diff --git a/neutron/openstack/common/rpc/common.py b/neutron/openstack/common/rpc/common.py
index aa772fb95bd..34d484649e9 100644
--- a/neutron/openstack/common/rpc/common.py
+++ b/neutron/openstack/common/rpc/common.py
@@ -22,7 +22,7 @@ import traceback
 from oslo.config import cfg
 import six
 
-from neutron.openstack.common.gettextutils import _
+from neutron.openstack.common.gettextutils import _, _LE
 from neutron.openstack.common import importutils
 from neutron.openstack.common import jsonutils
 from neutron.openstack.common import local
@@ -85,7 +85,7 @@ class RPCException(Exception):
             except Exception:
                 # kwargs doesn't match a variable in the message
                 # log the issue and the kwargs
-                LOG.exception(_('Exception in string format operation'))
+                LOG.exception(_LE('Exception in string format operation'))
                 for name, value in six.iteritems(kwargs):
                     LOG.error("%s: %s" % (name, value))
                 # at least get the core message out if something happened
@@ -269,6 +269,10 @@ def _safe_log(log_func, msg, msg_data):
                 d[k] = '<SANITIZED>'
             elif k.lower() in SANITIZE:
                 d[k] = '<SANITIZED>'
+            elif isinstance(d[k], list):
+                for e in d[k]:
+                    if isinstance(e, dict):
+                        _fix_passwords(e)
             elif isinstance(d[k], dict):
                 _fix_passwords(d[k])
         return d
@@ -285,7 +289,7 @@ def serialize_remote_exception(failure_info, log_failure=True):
     tb = traceback.format_exception(*failure_info)
     failure = failure_info[1]
     if log_failure:
-        LOG.error(_("Returning exception %s to caller"),
+        LOG.error(_LE("Returning exception %s to caller"),
                   six.text_type(failure))
         LOG.error(tb)
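[Editor's note] The new elif arm closes a gap: password fields hidden inside a list of dicts previously escaped sanitization. A self-contained sketch of the recursion; the SANITIZE list mirrors the module's convention but is abridged here, and the payload is invented:

    import six

    SANITIZE = ['auth_token', 'new_pass']

    def _fix_passwords(d):
        for k in six.iterkeys(d):
            if k.lower().find('password') != -1:
                d[k] = '<SANITIZED>'
            elif k.lower() in SANITIZE:
                d[k] = '<SANITIZED>'
            elif isinstance(d[k], list):
                # New case: recurse into dicts nested inside lists.
                for e in d[k]:
                    if isinstance(e, dict):
                        _fix_passwords(e)
            elif isinstance(d[k], dict):
                _fix_passwords(d[k])
        return d

    msg = {'servers': [{'name': 'a', 'admin_password': 'secret'}]}
    print(_fix_passwords(msg))
    # {'servers': [{'name': 'a', 'admin_password': '<SANITIZED>'}]}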
diff --git a/neutron/openstack/common/rpc/impl_fake.py b/neutron/openstack/common/rpc/impl_fake.py
index f6eea936d24..0c8e119aa70 100644
--- a/neutron/openstack/common/rpc/impl_fake.py
+++ b/neutron/openstack/common/rpc/impl_fake.py
@@ -11,6 +11,7 @@
 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 #    License for the specific language governing permissions and limitations
 #    under the License.
+
 """Fake RPC implementation which calls proxy methods directly with no
 queues.  Casts will block, but this is very useful for tests.
 """
@@ -139,8 +140,8 @@ def multicall(conf, context, topic, msg, timeout=None):
     if not method:
         return
     args = msg.get('args', {})
-    version = msg.get('version', None)
-    namespace = msg.get('namespace', None)
+    version = msg.get('version')
+    namespace = msg.get('namespace')
 
     try:
         consumer = CONSUMERS[topic][0]
@@ -184,8 +185,8 @@ def fanout_cast(conf, context, topic, msg):
     if not method:
         return
     args = msg.get('args', {})
-    version = msg.get('version', None)
-    namespace = msg.get('namespace', None)
+    version = msg.get('version')
+    namespace = msg.get('namespace')
 
     for consumer in CONSUMERS.get(topic, []):
         try:
" + "Sleeping %(delay)s seconds") % msg_dict LOG.error(msg) time.sleep(delay) - delay = min(2 * delay, 60) + delay = min(delay + 1, 5) else: - LOG.info(_('Connected to AMQP server on %s'), broker) + LOG.info(_LI('Connected to AMQP server on %s'), broker) break self.session = self.connection.session() @@ -531,7 +533,7 @@ class Connection(object): consumer.reconnect(self.session) self._register_consumer(consumer) - LOG.debug(_("Re-established AMQP queues")) + LOG.debug("Re-established AMQP queues") def ensure(self, error_callback, method, *args, **kwargs): while True: @@ -570,7 +572,7 @@ class Connection(object): """ def _connect_error(exc): log_info = {'topic': topic, 'err_str': str(exc)} - LOG.error(_("Failed to declare consumer for topic '%(topic)s': " + LOG.error(_LE("Failed to declare consumer for topic '%(topic)s': " "%(err_str)s") % log_info) def _declare_consumer(): @@ -585,11 +587,11 @@ class Connection(object): def _error_callback(exc): if isinstance(exc, qpid_exceptions.Empty): - LOG.debug(_('Timed out waiting for RPC response: %s') % + LOG.debug('Timed out waiting for RPC response: %s' % str(exc)) raise rpc_common.Timeout() else: - LOG.exception(_('Failed to consume message from queue: %s') % + LOG.exception(_LE('Failed to consume message from queue: %s') % str(exc)) def _consume(): @@ -597,7 +599,7 @@ class Connection(object): try: self._lookup_consumer(nxt_receiver).consume() except Exception: - LOG.exception(_("Error processing message. Skipping it.")) + LOG.exception(_LE("Error processing message. Skipping it.")) for iteration in itertools.count(0): if limit and iteration >= limit: @@ -624,7 +626,7 @@ class Connection(object): def _connect_error(exc): log_info = {'topic': topic, 'err_str': str(exc)} - LOG.exception(_("Failed to publish message to topic " + LOG.exception(_LE("Failed to publish message to topic " "'%(topic)s': %(err_str)s") % log_info) def _publisher_send(): diff --git a/neutron/openstack/common/rpc/impl_zmq.py b/neutron/openstack/common/rpc/impl_zmq.py index 33fa95dd025..fc4b92f05a4 100644 --- a/neutron/openstack/common/rpc/impl_zmq.py +++ b/neutron/openstack/common/rpc/impl_zmq.py @@ -27,7 +27,7 @@ import six from six import moves from neutron.openstack.common import excutils -from neutron.openstack.common.gettextutils import _ +from neutron.openstack.common.gettextutils import _, _LE, _LI from neutron.openstack.common import importutils from neutron.openstack.common import jsonutils from neutron.openstack.common.rpc import common as rpc_common @@ -80,7 +80,7 @@ CONF = cfg.CONF CONF.register_opts(zmq_opts) ZMQ_CTX = None # ZeroMQ Context, must be global. 
diff --git a/neutron/openstack/common/rpc/impl_qpid.py b/neutron/openstack/common/rpc/impl_qpid.py
index 03b12e5d4d8..6f2a3dc6c39 100644
--- a/neutron/openstack/common/rpc/impl_qpid.py
+++ b/neutron/openstack/common/rpc/impl_qpid.py
@@ -23,7 +23,7 @@ from oslo.config import cfg
 import six
 
 from neutron.openstack.common import excutils
-from neutron.openstack.common.gettextutils import _
+from neutron.openstack.common.gettextutils import _, _LE, _LI
 from neutron.openstack.common import importutils
 from neutron.openstack.common import jsonutils
 from neutron.openstack.common import log as logging
@@ -188,7 +188,7 @@ class ConsumerBase(object):
             msg = rpc_common.deserialize_msg(message.content)
             self.callback(msg)
         except Exception:
-            LOG.exception(_("Failed to process message... skipping it."))
+            LOG.exception(_LE("Failed to process message... skipping it."))
         finally:
             # TODO(sandy): Need support for optional ack_on_error.
             self.session.acknowledge(message)
@@ -467,6 +467,10 @@ class Connection(object):
         self.brokers = params['qpid_hosts']
         self.username = params['username']
         self.password = params['password']
+
+        brokers_count = len(self.brokers)
+        self.next_broker_indices = itertools.cycle(range(brokers_count))
+
         self.connection_create(self.brokers[0])
         self.reconnect()
 
@@ -494,7 +498,6 @@ class Connection(object):
 
     def reconnect(self):
         """Handles reconnecting and re-establishing sessions and queues."""
-        attempt = 0
         delay = 1
         while True:
             # Close the session if necessary
@@ -504,21 +507,20 @@ class Connection(object):
             except qpid_exceptions.ConnectionError:
                 pass
 
-            broker = self.brokers[attempt % len(self.brokers)]
-            attempt += 1
+            broker = self.brokers[next(self.next_broker_indices)]
 
             try:
                 self.connection_create(broker)
                 self.connection.open()
             except qpid_exceptions.ConnectionError as e:
                 msg_dict = dict(e=e, delay=delay)
-                msg = _("Unable to connect to AMQP server: %(e)s. "
-                        "Sleeping %(delay)s seconds") % msg_dict
+                msg = _LE("Unable to connect to AMQP server: %(e)s. "
+                          "Sleeping %(delay)s seconds") % msg_dict
                 LOG.error(msg)
                 time.sleep(delay)
-                delay = min(2 * delay, 60)
+                delay = min(delay + 1, 5)
             else:
-                LOG.info(_('Connected to AMQP server on %s'), broker)
+                LOG.info(_LI('Connected to AMQP server on %s'), broker)
                 break
 
         self.session = self.connection.session()
@@ -531,7 +533,7 @@ class Connection(object):
                 consumer.reconnect(self.session)
                 self._register_consumer(consumer)
 
-        LOG.debug(_("Re-established AMQP queues"))
+        LOG.debug("Re-established AMQP queues")
 
     def ensure(self, error_callback, method, *args, **kwargs):
         while True:
@@ -570,7 +572,7 @@ class Connection(object):
         """
         def _connect_error(exc):
             log_info = {'topic': topic, 'err_str': str(exc)}
-            LOG.error(_("Failed to declare consumer for topic '%(topic)s': "
+            LOG.error(_LE("Failed to declare consumer for topic '%(topic)s': "
                       "%(err_str)s") % log_info)
 
         def _declare_consumer():
@@ -585,11 +587,11 @@ class Connection(object):
 
         def _error_callback(exc):
             if isinstance(exc, qpid_exceptions.Empty):
-                LOG.debug(_('Timed out waiting for RPC response: %s') %
+                LOG.debug('Timed out waiting for RPC response: %s' %
                           str(exc))
                 raise rpc_common.Timeout()
             else:
-                LOG.exception(_('Failed to consume message from queue: %s') %
+                LOG.exception(_LE('Failed to consume message from queue: %s') %
                               str(exc))
 
         def _consume():
@@ -597,7 +599,7 @@ class Connection(object):
             try:
                 self._lookup_consumer(nxt_receiver).consume()
             except Exception:
-                LOG.exception(_("Error processing message. Skipping it."))
+                LOG.exception(_LE("Error processing message. Skipping it."))
 
         for iteration in itertools.count(0):
             if limit and iteration >= limit:
@@ -624,7 +626,7 @@ class Connection(object):
 
         def _connect_error(exc):
             log_info = {'topic': topic, 'err_str': str(exc)}
-            LOG.exception(_("Failed to publish message to topic "
+            LOG.exception(_LE("Failed to publish message to topic "
                           "'%(topic)s': %(err_str)s") % log_info)
 
         def _publisher_send():
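[Editor's note] Note the retry pacing change buried in the reconnect hunk: the qpid driver now backs off linearly to a five second cap instead of doubling up to sixty, so a recovered broker is retried much sooner. A side-by-side sketch of the two schedules:

    # Old schedule: delay = min(2 * delay, 60) -> 1, 2, 4, 8, 16, 32, 60, ...
    # New schedule: delay = min(delay + 1, 5)  -> 1, 2, 3, 4, 5, 5, 5, ...
    delay = 1
    new_delays = []
    for _ in range(7):
        new_delays.append(delay)
        delay = min(delay + 1, 5)
    print(new_delays)  # [1, 2, 3, 4, 5, 5, 5]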
diff --git a/neutron/openstack/common/rpc/impl_zmq.py b/neutron/openstack/common/rpc/impl_zmq.py
index 33fa95dd025..fc4b92f05a4 100644
--- a/neutron/openstack/common/rpc/impl_zmq.py
+++ b/neutron/openstack/common/rpc/impl_zmq.py
@@ -27,7 +27,7 @@ import six
 from six import moves
 
 from neutron.openstack.common import excutils
-from neutron.openstack.common.gettextutils import _
+from neutron.openstack.common.gettextutils import _, _LE, _LI
 from neutron.openstack.common import importutils
 from neutron.openstack.common import jsonutils
 from neutron.openstack.common.rpc import common as rpc_common
@@ -80,7 +80,7 @@ CONF = cfg.CONF
 CONF.register_opts(zmq_opts)
 
 ZMQ_CTX = None  # ZeroMQ Context, must be global.
-matchmaker = None  # memoized matchmaker object
+matchmaker = None  # memorized matchmaker object
 
 
 def _serialize(data):
@@ -93,12 +93,12 @@ def _serialize(data):
         return jsonutils.dumps(data, ensure_ascii=True)
     except TypeError:
         with excutils.save_and_reraise_exception():
-            LOG.error(_("JSON serialization failed."))
+            LOG.error(_LE("JSON serialization failed."))
 
 
 def _deserialize(data):
     """Deserialization wrapper."""
-    LOG.debug(_("Deserializing: %s"), data)
+    LOG.debug("Deserializing: %s", data)
     return jsonutils.loads(data)
 
@@ -133,9 +133,9 @@ class ZmqSocket(object):
         str_data = {'addr': addr, 'type': self.socket_s(),
                     'subscribe': subscribe, 'bind': bind}
 
-        LOG.debug(_("Connecting to %(addr)s with %(type)s"), str_data)
-        LOG.debug(_("-> Subscribed to %(subscribe)s"), str_data)
-        LOG.debug(_("-> bind: %(bind)s"), str_data)
+        LOG.debug("Connecting to %(addr)s with %(type)s", str_data)
+        LOG.debug("-> Subscribed to %(subscribe)s", str_data)
+        LOG.debug("-> bind: %(bind)s", str_data)
 
         try:
             if bind:
@@ -155,7 +155,7 @@ class ZmqSocket(object):
         """Subscribe."""
         if not self.can_sub:
             raise RPCException("Cannot subscribe on this socket.")
-        LOG.debug(_("Subscribing to %s"), msg_filter)
+        LOG.debug("Subscribing to %s", msg_filter)
 
         try:
             self.sock.setsockopt(zmq.SUBSCRIBE, msg_filter)
@@ -192,7 +192,7 @@ class ZmqSocket(object):
             # it would be much worse if some of the code calling this
             # were to fail. For now, lets log, and later evaluate
             # if we can safely raise here.
-            LOG.error(_("ZeroMQ socket could not be closed."))
+            LOG.error(_LE("ZeroMQ socket could not be closed."))
         self.sock = None
 
     def recv(self, **kwargs):
@@ -264,7 +264,7 @@ class InternalContext(object):
 
     def _get_response(self, ctx, proxy, topic, data):
         """Process a curried message and cast the result to topic."""
-        LOG.debug(_("Running func with context: %s"), ctx.to_dict())
+        LOG.debug("Running func with context: %s", ctx.to_dict())
         data.setdefault('version', None)
         data.setdefault('args', {})
 
@@ -277,13 +277,13 @@ class InternalContext(object):
             # ignore these since they are just from shutdowns
             pass
         except rpc_common.ClientException as e:
-            LOG.debug(_("Expected exception during message handling (%s)") %
+            LOG.debug("Expected exception during message handling (%s)" %
                       e._exc_info[1])
             return {'exc':
                     rpc_common.serialize_remote_exception(e._exc_info,
                                                           log_failure=False)}
         except Exception:
-            LOG.error(_("Exception during message handling"))
+            LOG.error(_LE("Exception during message handling"))
             return {'exc':
                     rpc_common.serialize_remote_exception(sys.exc_info())}
 
@@ -302,7 +302,7 @@ class InternalContext(object):
                 self._get_response(ctx, proxy, topic, payload),
                 ctx.replies)
 
-            LOG.debug(_("Sending reply"))
+            LOG.debug("Sending reply")
             _multi_send(_cast, ctx, topic, {
                 'method': '-process_reply',
                 'args': {
@@ -336,7 +336,7 @@ class ConsumerBase(object):
         #   processed internally. (non-valid method name)
         method = data.get('method')
         if not method:
-            LOG.error(_("RPC message did not include method."))
+            LOG.error(_LE("RPC message did not include method."))
             return
 
         # Internal method
@@ -368,7 +368,7 @@ class ZmqBaseReactor(ConsumerBase):
     def register(self, proxy, in_addr, zmq_type_in,
                  in_bind=True, subscribe=None):
 
-        LOG.info(_("Registering reactor"))
+        LOG.info(_LI("Registering reactor"))
 
         if zmq_type_in not in (zmq.PULL, zmq.SUB):
             raise RPCException("Bad input socktype")
@@ -380,12 +380,12 @@ class ZmqBaseReactor(ConsumerBase):
         self.proxies[inq] = proxy
         self.sockets.append(inq)
 
-        LOG.info(_("In reactor registered"))
+        LOG.info(_LI("In reactor registered"))
 
     def consume_in_thread(self):
         @excutils.forever_retry_uncaught_exceptions
         def _consume(sock):
-            LOG.info(_("Consuming socket"))
+            LOG.info(_LI("Consuming socket"))
             while True:
                 self.consume(sock)
 
@@ -435,7 +435,7 @@ class ZmqProxy(ZmqBaseReactor):
 
         if topic not in self.topic_proxy:
             def publisher(waiter):
-                LOG.info(_("Creating proxy for topic: %s"), topic)
+                LOG.info(_LI("Creating proxy for topic: %s"), topic)
 
                 try:
                     # The topic is received over the network,
@@ -473,14 +473,14 @@ class ZmqProxy(ZmqBaseReactor):
             try:
                 wait_sock_creation.wait()
             except RPCException:
-                LOG.error(_("Topic socket file creation failed."))
+                LOG.error(_LE("Topic socket file creation failed."))
                 return
 
             try:
                 self.topic_proxy[topic].put_nowait(data)
             except eventlet.queue.Full:
-                LOG.error(_("Local per-topic backlog buffer full for topic "
-                            "%(topic)s. Dropping message.") % {'topic': topic})
+                LOG.error(_LE("Local per-topic backlog buffer full for topic "
+                              "%(topic)s. Dropping message.") % {'topic': topic})
 
     def consume_in_thread(self):
         """Runs the ZmqProxy service."""
@@ -495,8 +495,8 @@ class ZmqProxy(ZmqBaseReactor):
             except os.error:
                 if not os.path.isdir(ipc_dir):
                     with excutils.save_and_reraise_exception():
-                        LOG.error(_("Required IPC directory does not exist at"
-                                    " %s") % (ipc_dir, ))
+                        LOG.error(_LE("Required IPC directory does not exist at"
+                                      " %s") % (ipc_dir, ))
             try:
                 self.register(consumption_proxy,
                               consume_in,
@@ -504,11 +504,11 @@ class ZmqProxy(ZmqBaseReactor):
             except zmq.ZMQError:
                 if os.access(ipc_dir, os.X_OK):
                     with excutils.save_and_reraise_exception():
-                        LOG.error(_("Permission denied to IPC directory at"
-                                    " %s") % (ipc_dir, ))
+                        LOG.error(_LE("Permission denied to IPC directory at"
+                                      " %s") % (ipc_dir, ))
                 with excutils.save_and_reraise_exception():
-                    LOG.error(_("Could not create ZeroMQ receiver daemon. "
-                                "Socket may already be in use."))
+                    LOG.error(_LE("Could not create ZeroMQ receiver daemon. "
+                                  "Socket may already be in use."))
 
         super(ZmqProxy, self).consume_in_thread()
 
@@ -541,7 +541,7 @@ class ZmqReactor(ZmqBaseReactor):
     def consume(self, sock):
         #TODO(ewindisch): use zero-copy (i.e. references, not copying)
         data = sock.recv()
-        LOG.debug(_("CONSUMER RECEIVED DATA: %s"), data)
+        LOG.debug("CONSUMER RECEIVED DATA: %s", data)
 
         proxy = self.proxies[sock]
 
@@ -560,7 +560,7 @@ class ZmqReactor(ZmqBaseReactor):
             # Unmarshal only after verifying the message.
             ctx = RpcContext.unmarshal(data[3])
         else:
-            LOG.error(_("ZMQ Envelope version unsupported or unknown."))
+            LOG.error(_LE("ZMQ Envelope version unsupported or unknown."))
             return
 
         self.pool.spawn_n(self.process, proxy, ctx, request)
@@ -588,14 +588,14 @@ class Connection(rpc_common.Connection):
             topic = '.'.join((topic.split('.', 1)[0], CONF.rpc_zmq_host))
 
         if topic in self.topics:
-            LOG.info(_("Skipping topic registration. Already registered."))
+            LOG.info(_LI("Skipping topic registration. Already registered."))
             return
 
         # Receive messages from (local) proxy
         inaddr = "ipc://%s/zmq_topic_%s" % \
             (CONF.rpc_zmq_ipc_dir, topic)
 
-        LOG.debug(_("Consumer is a zmq.%s"),
+        LOG.debug("Consumer is a zmq.%s",
                   ['PULL', 'SUB'][sock_type == zmq.SUB])
 
         self.reactor.register(proxy, inaddr, sock_type,
@@ -647,7 +647,7 @@ def _call(addr, context, topic, msg, timeout=None,
     # Replies always come into the reply service.
     reply_topic = "zmq_replies.%s" % CONF.rpc_zmq_host
 
-    LOG.debug(_("Creating payload"))
+    LOG.debug("Creating payload")
     # Curry the original request into a reply method.
     mcontext = RpcContext.marshal(context)
     payload = {
@@ -660,7 +660,7 @@ def _call(addr, context, topic, msg, timeout=None,
         }
     }
 
-    LOG.debug(_("Creating queue socket for reply waiter"))
+    LOG.debug("Creating queue socket for reply waiter")
 
     # Messages arriving async.
    # TODO(ewindisch): have reply consumer with dynamic subscription mgmt
@@ -673,14 +673,14 @@ def _call(addr, context, topic, msg, timeout=None,
                           zmq.SUB, subscribe=msg_id, bind=False
                           )
 
-        LOG.debug(_("Sending cast"))
+        LOG.debug("Sending cast")
         _cast(addr, context, topic, payload, envelope)
 
-        LOG.debug(_("Cast sent; Waiting reply"))
+        LOG.debug("Cast sent; Waiting reply")
         # Blocks until receives reply
         msg = msg_waiter.recv()
-        LOG.debug(_("Received message: %s"), msg)
-        LOG.debug(_("Unpacking response"))
+        LOG.debug("Received message: %s", msg)
+        LOG.debug("Unpacking response")
 
         if msg[2] == 'cast':  # Legacy version
             raw_msg = _deserialize(msg[-1])[-1]
@@ -719,10 +719,10 @@ def _multi_send(method, context, topic, msg, timeout=None,
     Dispatches to the matchmaker and sends message to all relevant hosts.
     """
     conf = CONF
-    LOG.debug(_("%(msg)s") % {'msg': ' '.join(map(pformat, (topic, msg)))})
+    LOG.debug("%(msg)s" % {'msg': ' '.join(map(pformat, (topic, msg)))})
 
     queues = _get_matchmaker().queues(topic)
-    LOG.debug(_("Sending message(s) to: %s"), queues)
+    LOG.debug("Sending message(s) to: %s", queues)
 
     # Don't stack if we have no matchmaker results
     if not queues:
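[Editor's note] The pattern across this file: debug messages lose their _() wrapper entirely (debug output is never shipped to translators), while info and error messages switch to the level-specific _LI/_LE markers. Passing the value as a logging argument rather than %-formatting it up front also defers string interpolation until the record is actually emitted; a hedged sketch:

    from neutron.openstack.common import log as logging

    LOG = logging.getLogger(__name__)

    data = {'key': 'value'}
    # Interpolation happens inside the logging call, and only if the
    # debug level is enabled. No translation marker is needed.
    LOG.debug("CONSUMER RECEIVED DATA: %s", data)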
Already registered.")) + LOG.info(_LI("Skipping topic registration. Already registered.")) return # Receive messages from (local) proxy inaddr = "ipc://%s/zmq_topic_%s" % \ (CONF.rpc_zmq_ipc_dir, topic) - LOG.debug(_("Consumer is a zmq.%s"), + LOG.debug("Consumer is a zmq.%s", ['PULL', 'SUB'][sock_type == zmq.SUB]) self.reactor.register(proxy, inaddr, sock_type, @@ -647,7 +647,7 @@ def _call(addr, context, topic, msg, timeout=None, # Replies always come into the reply service. reply_topic = "zmq_replies.%s" % CONF.rpc_zmq_host - LOG.debug(_("Creating payload")) + LOG.debug("Creating payload") # Curry the original request into a reply method. mcontext = RpcContext.marshal(context) payload = { @@ -660,7 +660,7 @@ def _call(addr, context, topic, msg, timeout=None, } } - LOG.debug(_("Creating queue socket for reply waiter")) + LOG.debug("Creating queue socket for reply waiter") # Messages arriving async. # TODO(ewindisch): have reply consumer with dynamic subscription mgmt @@ -673,14 +673,14 @@ def _call(addr, context, topic, msg, timeout=None, zmq.SUB, subscribe=msg_id, bind=False ) - LOG.debug(_("Sending cast")) + LOG.debug("Sending cast") _cast(addr, context, topic, payload, envelope) - LOG.debug(_("Cast sent; Waiting reply")) + LOG.debug("Cast sent; Waiting reply") # Blocks until receives reply msg = msg_waiter.recv() - LOG.debug(_("Received message: %s"), msg) - LOG.debug(_("Unpacking response")) + LOG.debug("Received message: %s", msg) + LOG.debug("Unpacking response") if msg[2] == 'cast': # Legacy version raw_msg = _deserialize(msg[-1])[-1] @@ -719,10 +719,10 @@ def _multi_send(method, context, topic, msg, timeout=None, Dispatches to the matchmaker and sends message to all relevant hosts. """ conf = CONF - LOG.debug(_("%(msg)s") % {'msg': ' '.join(map(pformat, (topic, msg)))}) + LOG.debug("%(msg)s" % {'msg': ' '.join(map(pformat, (topic, msg)))}) queues = _get_matchmaker().queues(topic) - LOG.debug(_("Sending message(s) to: %s"), queues) + LOG.debug("Sending message(s) to: %s", queues) # Don't stack if we have no matchmaker results if not queues: diff --git a/neutron/openstack/common/rpc/matchmaker.py b/neutron/openstack/common/rpc/matchmaker.py index 26b6b9f76fc..21c60c90bab 100644 --- a/neutron/openstack/common/rpc/matchmaker.py +++ b/neutron/openstack/common/rpc/matchmaker.py @@ -11,6 +11,7 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. + """ The MatchMaker classes should except a Topic or Fanout exchange key and return keys for direct exchanges, per (approximate) AMQP parlance. 
diff --git a/neutron/openstack/common/rpc/matchmaker_redis.py b/neutron/openstack/common/rpc/matchmaker_redis.py
index 0458060bf3d..7b1fdc3c658 100644
--- a/neutron/openstack/common/rpc/matchmaker_redis.py
+++ b/neutron/openstack/common/rpc/matchmaker_redis.py
@@ -11,6 +11,7 @@
 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 #    License for the specific language governing permissions and limitations
 #    under the License.
+
 """
 The MatchMaker classes should accept a Topic or Fanout exchange key and
 return keys for direct exchanges, per (approximate) AMQP parlance.
diff --git a/neutron/openstack/common/rpc/matchmaker_ring.py b/neutron/openstack/common/rpc/matchmaker_ring.py
index 831741922ea..9d106edbd93 100644
--- a/neutron/openstack/common/rpc/matchmaker_ring.py
+++ b/neutron/openstack/common/rpc/matchmaker_ring.py
@@ -11,6 +11,7 @@
 #    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 #    License for the specific language governing permissions and limitations
 #    under the License.
+
 """
 The MatchMaker classes should except a Topic or Fanout exchange key and
 return keys for direct exchanges, per (approximate) AMQP parlance.
@@ -21,7 +22,7 @@
 import json
 
 from oslo.config import cfg
 
-from neutron.openstack.common.gettextutils import _
+from neutron.openstack.common.gettextutils import _LW
 from neutron.openstack.common import log as logging
 from neutron.openstack.common.rpc import matchmaker as mm
 
@@ -52,9 +53,8 @@ class RingExchange(mm.Exchange):
         if ring:
             self.ring = ring
         else:
-            fh = open(CONF.matchmaker_ring.ringfile, 'r')
-            self.ring = json.load(fh)
-            fh.close()
+            with open(CONF.matchmaker_ring.ringfile, 'r') as fh:
+                self.ring = json.load(fh)
 
         self.ring0 = {}
         for k in self.ring.keys():
@@ -72,8 +72,8 @@ class RoundRobinRingExchange(RingExchange):
     def run(self, key):
         if not self._ring_has(key):
             LOG.warn(
-                _("No key defining hosts for topic '%s', "
-                  "see ringfile") % (key, )
+                _LW("No key defining hosts for topic '%s', "
+                    "see ringfile") % (key, )
             )
             return []
         host = next(self.ring0[key])
@@ -90,8 +90,8 @@ class FanoutRingExchange(RingExchange):
         nkey = key.split('fanout~')[1:][0]
         if not self._ring_has(nkey):
             LOG.warn(
-                _("No key defining hosts for topic '%s', "
-                  "see ringfile") % (nkey, )
+                _LW("No key defining hosts for topic '%s', "
+                    "see ringfile") % (nkey, )
             )
             return []
         return map(lambda x: (key + '.' + x, x), self.ring[nkey])
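[Editor's note] For context on the with-statement cleanup: the ringfile is a small JSON map from topic to candidate hosts, and the context manager guarantees the file is closed even if json.load() raises, which the old open/load/close sequence did not. A sketch with an invented ring file:

    import json

    # An illustrative ringfile (path and contents are made up):
    #     {"scheduler": ["host1", "host2"], "network": ["host3"]}
    ringfile = '/etc/oslo/matchmaker_ring.json'

    with open(ringfile, 'r') as fh:  # closed even if json.load fails
        ring = json.load(fh)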
diff --git a/neutron/openstack/common/rpc/service.py b/neutron/openstack/common/rpc/service.py
index 4479dcc3ac9..ae8c56120f5 100644
--- a/neutron/openstack/common/rpc/service.py
+++ b/neutron/openstack/common/rpc/service.py
@@ -15,7 +15,6 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
-from neutron.openstack.common.gettextutils import _
 from neutron.openstack.common import log as logging
 from neutron.openstack.common import rpc
 from neutron.openstack.common.rpc import dispatcher as rpc_dispatcher
@@ -44,7 +43,7 @@ class Service(service.Service):
         super(Service, self).start()
 
         self.conn = rpc.create_connection(new=True)
-        LOG.debug(_("Creating Consumer connection for Service %s") %
+        LOG.debug("Creating Consumer connection for Service %s" %
                   self.topic)
 
         dispatcher = rpc_dispatcher.RpcDispatcher([self.manager],