oslo: update the rpc module

- It now requires the versionutils module.
- We need to deal with the "allow_rpc_exception_modules" not
  including heat.common.exception

Change-Id: I1ea1479d7bc023649b2af0f68683e9a81f868df9
This commit is contained in:
Angus Salkeld 2013-12-04 10:10:22 +11:00
parent 4ac0c8f02f
commit e2e711fc1c
20 changed files with 97 additions and 68 deletions

View File

@ -299,9 +299,9 @@
# by impl_zmq. (integer value) # by impl_zmq. (integer value)
#rpc_cast_timeout=30 #rpc_cast_timeout=30
# Modules of exceptions that are permitted to be recreatedupon # Modules of exceptions that are permitted to be recreated
# receiving exception data from an rpc call. (list value) # upon receiving exception data from an rpc call. (list value)
#allowed_rpc_exception_modules=heat.openstack.common.exception,heat.common.exception,nova.exception,cinder.exception,exceptions #allowed_rpc_exception_modules=nova.exception,cinder.exception,exceptions
# If passed, use a fake RabbitMQ provider (boolean value) # If passed, use a fake RabbitMQ provider (boolean value)
#fake_rabbit=false #fake_rabbit=false

View File

@ -170,6 +170,17 @@ cfg.CONF.register_group(revision_group)
cfg.CONF.register_opts(revision_opts, group=revision_group) cfg.CONF.register_opts(revision_opts, group=revision_group)
register_clients_opts() register_clients_opts()
# A bit of history:
# This was added initially by jianingy, then it got added
# to oslo by Luis. Then it was receintly removed from the
# default list again.
# I am not sure we can (or should) rely on oslo to keep
# our exceptions class in the defaults list.
allowed_rpc_exception_modules = cfg.CONF.allowed_rpc_exception_modules
allowed_rpc_exception_modules.append('heat.common.exception')
cfg.CONF.set_default(name='allowed_rpc_exception_modules',
default=allowed_rpc_exception_modules)
def rpc_set_default(): def rpc_set_default():
rpc.set_defaults(control_exchange='heat') rpc.set_defaults(control_exchange='heat')

View File

@ -1,5 +1,3 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the # Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration. # Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved. # All Rights Reserved.
@ -56,9 +54,7 @@ rpc_opts = [
help='Seconds to wait before a cast expires (TTL). ' help='Seconds to wait before a cast expires (TTL). '
'Only supported by impl_zmq.'), 'Only supported by impl_zmq.'),
cfg.ListOpt('allowed_rpc_exception_modules', cfg.ListOpt('allowed_rpc_exception_modules',
default=['heat.openstack.common.exception', default=['nova.exception',
'heat.common.exception',
'nova.exception',
'cinder.exception', 'cinder.exception',
'exceptions', 'exceptions',
], ],
@ -229,7 +225,7 @@ def notify(context, topic, msg, envelope=False):
def cleanup(): def cleanup():
"""Clean up resoruces in use by implementation. """Clean up resources in use by implementation.
Clean up any resources that have been allocated by the RPC implementation. Clean up any resources that have been allocated by the RPC implementation.
This is typically open connections to a messaging service. This function This is typically open connections to a messaging service. This function

View File

@ -1,5 +1,3 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the # Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration. # Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved. # All Rights Reserved.
@ -20,9 +18,9 @@
""" """
Shared code between AMQP based openstack.common.rpc implementations. Shared code between AMQP based openstack.common.rpc implementations.
The code in this module is shared between the rpc implemenations based on AMQP. The code in this module is shared between the rpc implementations based on
Specifically, this includes impl_kombu and impl_qpid. impl_carrot also uses AMQP. Specifically, this includes impl_kombu and impl_qpid. impl_carrot also
AMQP, but is deprecated and predates this code. uses AMQP, but is deprecated and predates this code.
""" """
import collections import collections
@ -189,7 +187,7 @@ class ReplyProxy(ConnectionContext):
def __init__(self, conf, connection_pool): def __init__(self, conf, connection_pool):
self._call_waiters = {} self._call_waiters = {}
self._num_call_waiters = 0 self._num_call_waiters = 0
self._num_call_waiters_wrn_threshhold = 10 self._num_call_waiters_wrn_threshold = 10
self._reply_q = 'reply_' + uuid.uuid4().hex self._reply_q = 'reply_' + uuid.uuid4().hex
super(ReplyProxy, self).__init__(conf, connection_pool, pooled=False) super(ReplyProxy, self).__init__(conf, connection_pool, pooled=False)
self.declare_direct_consumer(self._reply_q, self._process_data) self.declare_direct_consumer(self._reply_q, self._process_data)
@ -208,11 +206,11 @@ class ReplyProxy(ConnectionContext):
def add_call_waiter(self, waiter, msg_id): def add_call_waiter(self, waiter, msg_id):
self._num_call_waiters += 1 self._num_call_waiters += 1
if self._num_call_waiters > self._num_call_waiters_wrn_threshhold: if self._num_call_waiters > self._num_call_waiters_wrn_threshold:
LOG.warn(_('Number of call waiters is greater than warning ' LOG.warn(_('Number of call waiters is greater than warning '
'threshhold: %d. There could be a MulticallProxyWaiter ' 'threshold: %d. There could be a MulticallProxyWaiter '
'leak.') % self._num_call_waiters_wrn_threshhold) 'leak.') % self._num_call_waiters_wrn_threshold)
self._num_call_waiters_wrn_threshhold *= 2 self._num_call_waiters_wrn_threshold *= 2
self._call_waiters[msg_id] = waiter self._call_waiters[msg_id] = waiter
def del_call_waiter(self, msg_id): def del_call_waiter(self, msg_id):
@ -241,7 +239,7 @@ def msg_reply(conf, msg_id, reply_q, connection_pool, reply=None,
_add_unique_id(msg) _add_unique_id(msg)
# If a reply_q exists, add the msg_id to the reply and pass the # If a reply_q exists, add the msg_id to the reply and pass the
# reply_q to direct_send() to use it as the response queue. # reply_q to direct_send() to use it as the response queue.
# Otherwise use the msg_id for backward compatibilty. # Otherwise use the msg_id for backward compatibility.
if reply_q: if reply_q:
msg['_msg_id'] = msg_id msg['_msg_id'] = msg_id
conn.direct_send(reply_q, rpc_common.serialize_msg(msg)) conn.direct_send(reply_q, rpc_common.serialize_msg(msg))

View File

@ -1,5 +1,3 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the # Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration. # Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved. # All Rights Reserved.
@ -29,6 +27,7 @@ from heat.openstack.common import importutils
from heat.openstack.common import jsonutils from heat.openstack.common import jsonutils
from heat.openstack.common import local from heat.openstack.common import local
from heat.openstack.common import log as logging from heat.openstack.common import log as logging
from heat.openstack.common import versionutils
CONF = cfg.CONF CONF = cfg.CONF
@ -265,7 +264,7 @@ def _safe_log(log_func, msg, msg_data):
def _fix_passwords(d): def _fix_passwords(d):
"""Sanitizes the password fields in the dictionary.""" """Sanitizes the password fields in the dictionary."""
for k in d.iterkeys(): for k in six.iterkeys(d):
if k.lower().find('password') != -1: if k.lower().find('password') != -1:
d[k] = '<SANITIZED>' d[k] = '<SANITIZED>'
elif k.lower() in SANITIZE: elif k.lower() in SANITIZE:
@ -441,19 +440,15 @@ def client_exceptions(*exceptions):
return outer return outer
# TODO(sirp): we should deprecate this in favor of
# using `versionutils.is_compatible` directly
def version_is_compatible(imp_version, version): def version_is_compatible(imp_version, version):
"""Determine whether versions are compatible. """Determine whether versions are compatible.
:param imp_version: The version implemented :param imp_version: The version implemented
:param version: The version requested by an incoming message. :param version: The version requested by an incoming message.
""" """
version_parts = version.split('.') return versionutils.is_compatible(version, imp_version)
imp_version_parts = imp_version.split('.')
if int(version_parts[0]) != int(imp_version_parts[0]): # Major
return False
if int(version_parts[1]) > int(imp_version_parts[1]): # Minor
return False
return True
def serialize_msg(raw_msg): def serialize_msg(raw_msg):

View File

@ -1,5 +1,3 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 Red Hat, Inc. # Copyright 2012 Red Hat, Inc.
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may # Licensed under the Apache License, Version 2.0 (the "License"); you may

View File

@ -1,5 +1,3 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack Foundation # Copyright 2011 OpenStack Foundation
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may # Licensed under the Apache License, Version 2.0 (the "License"); you may

View File

@ -1,5 +1,3 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack Foundation # Copyright 2011 OpenStack Foundation
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may # Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -28,6 +26,7 @@ import kombu.connection
import kombu.entity import kombu.entity
import kombu.messaging import kombu.messaging
from oslo.config import cfg from oslo.config import cfg
import six
from heat.openstack.common import excutils from heat.openstack.common import excutils
from heat.openstack.common.gettextutils import _ # noqa from heat.openstack.common.gettextutils import _ # noqa
@ -625,7 +624,7 @@ class Connection(object):
def _declare_consumer(): def _declare_consumer():
consumer = consumer_cls(self.conf, self.channel, topic, callback, consumer = consumer_cls(self.conf, self.channel, topic, callback,
self.consumer_num.next()) six.next(self.consumer_num))
self.consumers.append(consumer) self.consumers.append(consumer)
return consumer return consumer
@ -732,7 +731,7 @@ class Connection(object):
it = self.iterconsume(limit=limit) it = self.iterconsume(limit=limit)
while True: while True:
try: try:
it.next() six.next(it)
except StopIteration: except StopIteration:
return return

View File

@ -1,5 +1,3 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack Foundation # Copyright 2011 OpenStack Foundation
# Copyright 2011 - 2012, Red Hat, Inc. # Copyright 2011 - 2012, Red Hat, Inc.
# #
@ -23,6 +21,7 @@ import uuid
import eventlet import eventlet
import greenlet import greenlet
from oslo.config import cfg from oslo.config import cfg
import six
from heat.openstack.common import excutils from heat.openstack.common import excutils
from heat.openstack.common.gettextutils import _ # noqa from heat.openstack.common.gettextutils import _ # noqa
@ -152,7 +151,7 @@ class ConsumerBase(object):
self.connect(session) self.connect(session)
def connect(self, session): def connect(self, session):
"""Declare the reciever on connect.""" """Declare the receiver on connect."""
self._declare_receiver(session) self._declare_receiver(session)
def reconnect(self, session): def reconnect(self, session):
@ -395,7 +394,7 @@ class DirectPublisher(Publisher):
class TopicPublisher(Publisher): class TopicPublisher(Publisher):
"""Publisher class for 'topic'.""" """Publisher class for 'topic'."""
def __init__(self, conf, session, topic): def __init__(self, conf, session, topic):
"""init a 'topic' publisher. """Init a 'topic' publisher.
""" """
exchange_name = rpc_amqp.get_control_exchange(conf) exchange_name = rpc_amqp.get_control_exchange(conf)
@ -412,7 +411,7 @@ class TopicPublisher(Publisher):
class FanoutPublisher(Publisher): class FanoutPublisher(Publisher):
"""Publisher class for 'fanout'.""" """Publisher class for 'fanout'."""
def __init__(self, conf, session, topic): def __init__(self, conf, session, topic):
"""init a 'fanout' publisher. """Init a 'fanout' publisher.
""" """
if conf.qpid_topology_version == 1: if conf.qpid_topology_version == 1:
@ -431,7 +430,7 @@ class FanoutPublisher(Publisher):
class NotifyPublisher(Publisher): class NotifyPublisher(Publisher):
"""Publisher class for notifications.""" """Publisher class for notifications."""
def __init__(self, conf, session, topic): def __init__(self, conf, session, topic):
"""init a 'topic' publisher. """Init a 'topic' publisher.
""" """
exchange_name = rpc_amqp.get_control_exchange(conf) exchange_name = rpc_amqp.get_control_exchange(conf)
node_opts = {"durable": True} node_opts = {"durable": True}
@ -539,7 +538,7 @@ class Connection(object):
consumers = self.consumers consumers = self.consumers
self.consumers = {} self.consumers = {}
for consumer in consumers.itervalues(): for consumer in six.itervalues(consumers):
consumer.reconnect(self.session) consumer.reconnect(self.session)
self._register_consumer(consumer) self._register_consumer(consumer)
@ -697,7 +696,7 @@ class Connection(object):
it = self.iterconsume(limit=limit) it = self.iterconsume(limit=limit)
while True: while True:
try: try:
it.next() six.next(it)
except StopIteration: except StopIteration:
return return

View File

@ -1,5 +1,3 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 Cloudscaling Group, Inc # Copyright 2011 Cloudscaling Group, Inc
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may # Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -25,6 +23,8 @@ import uuid
import eventlet import eventlet
import greenlet import greenlet
from oslo.config import cfg from oslo.config import cfg
import six
from six import moves
from heat.openstack.common import excutils from heat.openstack.common import excutils
from heat.openstack.common.gettextutils import _ # noqa from heat.openstack.common.gettextutils import _ # noqa
@ -192,7 +192,7 @@ class ZmqSocket(object):
# it would be much worse if some of the code calling this # it would be much worse if some of the code calling this
# were to fail. For now, lets log, and later evaluate # were to fail. For now, lets log, and later evaluate
# if we can safely raise here. # if we can safely raise here.
LOG.error("ZeroMQ socket could not be closed.") LOG.error(_("ZeroMQ socket could not be closed."))
self.sock = None self.sock = None
def recv(self, **kwargs): def recv(self, **kwargs):
@ -221,7 +221,7 @@ class ZmqClient(object):
return return
rpc_envelope = rpc_common.serialize_msg(data[1], envelope) rpc_envelope = rpc_common.serialize_msg(data[1], envelope)
zmq_msg = reduce(lambda x, y: x + y, rpc_envelope.items()) zmq_msg = moves.reduce(lambda x, y: x + y, rpc_envelope.items())
self.outq.send(map(bytes, self.outq.send(map(bytes,
(msg_id, topic, 'impl_zmq_v2', data[0]) + zmq_msg)) (msg_id, topic, 'impl_zmq_v2', data[0]) + zmq_msg))
@ -523,8 +523,8 @@ def unflatten_envelope(packenv):
h = {} h = {}
try: try:
while True: while True:
k = i.next() k = six.next(i)
h[k] = i.next() h[k] = six.next(i)
except StopIteration: except StopIteration:
return h return h

View File

@ -1,5 +1,3 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 Cloudscaling Group, Inc # Copyright 2011 Cloudscaling Group, Inc
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may # Licensed under the Apache License, Version 2.0 (the "License"); you may

View File

@ -1,5 +1,3 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 Cloudscaling Group, Inc # Copyright 2013 Cloudscaling Group, Inc
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may # Licensed under the Apache License, Version 2.0 (the "License"); you may

View File

@ -1,5 +1,3 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011-2013 Cloudscaling Group, Inc # Copyright 2011-2013 Cloudscaling Group, Inc
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may # Licensed under the Apache License, Version 2.0 (the "License"); you may

View File

@ -1,5 +1,3 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012-2013 Red Hat, Inc. # Copyright 2012-2013 Red Hat, Inc.
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may # Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -21,7 +19,6 @@ For more information about rpc API version numbers, see:
rpc/dispatcher.py rpc/dispatcher.py
""" """
from heat.openstack.common import rpc from heat.openstack.common import rpc
from heat.openstack.common.rpc import common as rpc_common from heat.openstack.common.rpc import common as rpc_common
from heat.openstack.common.rpc import serializer as rpc_serializer from heat.openstack.common.rpc import serializer as rpc_serializer
@ -36,7 +33,7 @@ class RpcProxy(object):
rpc API. rpc API.
""" """
# The default namespace, which can be overriden in a subclass. # The default namespace, which can be overridden in a subclass.
RPC_API_NAMESPACE = None RPC_API_NAMESPACE = None
def __init__(self, topic, default_version, version_cap=None, def __init__(self, topic, default_version, version_cap=None,

View File

@ -16,10 +16,12 @@
import abc import abc
import six
@six.add_metaclass(abc.ABCMeta)
class Serializer(object): class Serializer(object):
"""Generic (de-)serialization definition base class.""" """Generic (de-)serialization definition base class."""
__metaclass__ = abc.ABCMeta
@abc.abstractmethod @abc.abstractmethod
def serialize_entity(self, context, entity): def serialize_entity(self, context, entity):

View File

@ -1,5 +1,3 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the # Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration. # Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved. # All Rights Reserved.

View File

@ -1,5 +1,3 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack Foundation # Copyright 2011 OpenStack Foundation
# #
# Licensed under the Apache License, Version 2.0 (the "License"); you may # Licensed under the Apache License, Version 2.0 (the "License"); you may

View File

@ -0,0 +1,43 @@
# Copyright (c) 2013 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Helpers for comparing version strings.
"""
import pkg_resources
def is_compatible(requested_version, current_version, same_major=True):
"""Determine whether `requested_version` is satisfied by
`current_version`; in other words, `current_version` is >=
`requested_version`.
:param requested_version: version to check for compatibility
:param current_version: version to check against
:param same_major: if True, the major version must be identical between
`requested_version` and `current_version`. This is used when a
major-version difference indicates incompatibility between the two
versions. Since this is the common-case in practice, the default is
True.
:returns: True if compatible, False if not
"""
requested_parts = pkg_resources.parse_version(requested_version)
current_parts = pkg_resources.parse_version(current_version)
if same_major and (requested_parts[0] != current_parts[0]):
return False
return current_parts >= requested_parts

View File

@ -53,6 +53,8 @@ class HeatTestCase(testtools.TestCase):
'environment.d') 'environment.d')
cfg.CONF.set_default('environment_dir', env_dir) cfg.CONF.set_default('environment_dir', env_dir)
cfg.CONF.set_override('allowed_rpc_exception_modules',
['heat.common.exception', 'exceptions'])
self.addCleanup(cfg.CONF.reset) self.addCleanup(cfg.CONF.reset)
tri = resources.global_env().get_resource_info( tri = resources.global_env().get_resource_info(

View File

@ -24,6 +24,7 @@ module=uuidutils
module=config module=config
module=strutils module=strutils
module=py3kcompat module=py3kcompat
module=versionutils
# The base module to hold the copy of openstack.common # The base module to hold the copy of openstack.common
base=heat base=heat