Merge "Sync oslo rpc"
This commit is contained in:
commit
c8217a39a9
@ -33,6 +33,8 @@ from eventlet import pools
|
||||
from eventlet import queue
|
||||
from eventlet import semaphore
|
||||
from oslo.config import cfg
|
||||
import six
|
||||
|
||||
|
||||
from heat.openstack.common import excutils
|
||||
from heat.openstack.common.gettextutils import _ # noqa
|
||||
@ -300,10 +302,11 @@ def pack_context(msg, context):
|
||||
"""
|
||||
if isinstance(context, dict):
|
||||
context_d = dict([('_context_%s' % key, value)
|
||||
for (key, value) in context.iteritems()])
|
||||
for (key, value) in six.iteritems(context)])
|
||||
else:
|
||||
context_d = dict([('_context_%s' % key, value)
|
||||
for (key, value) in context.to_dict().iteritems()])
|
||||
for (key, value) in
|
||||
six.iteritems(context.to_dict())])
|
||||
|
||||
msg.update(context_d)
|
||||
|
||||
|
@ -86,7 +86,7 @@ class RPCException(Exception):
|
||||
# kwargs doesn't match a variable in the message
|
||||
# log the issue and the kwargs
|
||||
LOG.exception(_('Exception in string format operation'))
|
||||
for name, value in kwargs.iteritems():
|
||||
for name, value in six.iteritems(kwargs):
|
||||
LOG.error("%s: %s" % (name, value))
|
||||
# at least get the core message out if something happened
|
||||
message = self.msg_fmt
|
||||
|
@ -81,6 +81,8 @@ On the client side, the same changes should be made as in example 1. The
|
||||
minimum version that supports the new parameter should be specified.
|
||||
"""
|
||||
|
||||
import six
|
||||
|
||||
from heat.openstack.common.rpc import common as rpc_common
|
||||
from heat.openstack.common.rpc import serializer as rpc_serializer
|
||||
|
||||
@ -119,7 +121,7 @@ class RpcDispatcher(object):
|
||||
:returns: A new set of deserialized args
|
||||
"""
|
||||
new_kwargs = dict()
|
||||
for argname, arg in kwargs.iteritems():
|
||||
for argname, arg in six.iteritems(kwargs):
|
||||
new_kwargs[argname] = self.serializer.deserialize_entity(context,
|
||||
arg)
|
||||
return new_kwargs
|
||||
|
@ -445,7 +445,7 @@ class Connection(object):
|
||||
'virtual_host': self.conf.rabbit_virtual_host,
|
||||
}
|
||||
|
||||
for sp_key, value in server_params.iteritems():
|
||||
for sp_key, value in six.iteritems(server_params):
|
||||
p_key = server_params_to_kombu_params.get(sp_key, sp_key)
|
||||
params[p_key] = value
|
||||
|
||||
|
@ -130,14 +130,13 @@ class ConsumerBase(object):
|
||||
},
|
||||
},
|
||||
}
|
||||
if link_name:
|
||||
addr_opts["link"]["name"] = link_name
|
||||
addr_opts["node"]["x-declare"].update(node_opts)
|
||||
elif conf.qpid_topology_version == 2:
|
||||
addr_opts = {
|
||||
"link": {
|
||||
"x-declare": {
|
||||
"auto-delete": True,
|
||||
"exclusive": False,
|
||||
},
|
||||
},
|
||||
}
|
||||
@ -145,6 +144,8 @@ class ConsumerBase(object):
|
||||
raise_invalid_topology_version()
|
||||
|
||||
addr_opts["link"]["x-declare"].update(link_opts)
|
||||
if link_name:
|
||||
addr_opts["link"]["name"] = link_name
|
||||
|
||||
self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
|
||||
|
||||
@ -219,14 +220,16 @@ class DirectConsumer(ConsumerBase):
|
||||
if conf.qpid_topology_version == 1:
|
||||
node_name = "%s/%s" % (msg_id, msg_id)
|
||||
node_opts = {"type": "direct"}
|
||||
link_name = msg_id
|
||||
elif conf.qpid_topology_version == 2:
|
||||
node_name = "amq.direct/%s" % msg_id
|
||||
node_opts = {}
|
||||
link_name = None
|
||||
else:
|
||||
raise_invalid_topology_version()
|
||||
|
||||
super(DirectConsumer, self).__init__(conf, session, callback,
|
||||
node_name, node_opts, msg_id,
|
||||
node_name, node_opts, link_name,
|
||||
link_opts)
|
||||
|
||||
|
||||
|
@ -93,7 +93,7 @@ class MatchMakerRedis(mm_common.HeartbeatMatchMakerBase):
|
||||
if not redis:
|
||||
raise ImportError("Failed to import module redis.")
|
||||
|
||||
self.redis = redis.StrictRedis(
|
||||
self.redis = redis.Redis(
|
||||
host=CONF.matchmaker_redis.host,
|
||||
port=CONF.matchmaker_redis.port,
|
||||
password=CONF.matchmaker_redis.password)
|
||||
|
@ -19,6 +19,8 @@ For more information about rpc API version numbers, see:
|
||||
rpc/dispatcher.py
|
||||
"""
|
||||
|
||||
import six
|
||||
|
||||
from heat.openstack.common import rpc
|
||||
from heat.openstack.common.rpc import common as rpc_common
|
||||
from heat.openstack.common.rpc import serializer as rpc_serializer
|
||||
@ -97,7 +99,7 @@ class RpcProxy(object):
|
||||
:returns: A new set of serialized arguments
|
||||
"""
|
||||
new_kwargs = dict()
|
||||
for argname, arg in kwargs.iteritems():
|
||||
for argname, arg in six.iteritems(kwargs):
|
||||
new_kwargs[argname] = self.serializer.serialize_entity(context,
|
||||
arg)
|
||||
return new_kwargs
|
||||
|
Loading…
x
Reference in New Issue
Block a user