Sync rpc from oslo-incubator
This includes: 76972e2 Support a new qpid topology 284b13a Raise timeout in fake RPC if no consumers found 9721129 exception: remove 7b0cb37 Don't eat callback exceptions 69abf38 requeue instead of reject 28395d9 Fixes files with wrong bitmode bec54ac Fix case error in qpid exchange type "direct" 61c4cde Ensure context type is handled when using to_dict 223f9e1 Clarify precedence of secret_key_file a035f95 Don't shadow cfg import in securemessage 0f88575 Remove redundant global keyword in securemessage 848c4d5 Some nitpicky securemessage cleanups 5c71c25 Allow non-use of cfg.CONF in securemessage 9157286 RPC: Add MessageSecurity implementation 2031e60 Refactors boolean returns a047a35 Make ZeroMQ based RPC consumer threads more robust 34a6842 On reconnecting a FanoutConsumer, don't grow the topic name f52446c Add serializer param to RPC service 5ff534d Add config for amqp durable/auto_delete queues 7bfd443 Avoid shadowing Exception 'message' attribute Closes-bug: #1178375 Change-Id: Ib5d4733743041eb2324020f9b1dc553260e79b21
This commit is contained in:
parent
90c842e124
commit
579109cab9
@ -1513,7 +1513,7 @@
|
||||
|
||||
# Modules of exceptions that are permitted to be recreatedupon
|
||||
# receiving exception data from an rpc call. (list value)
|
||||
#allowed_rpc_exception_modules=nova.openstack.common.exception,nova.exception,cinder.exception,exceptions
|
||||
#allowed_rpc_exception_modules=nova.exception,cinder.exception,exceptions
|
||||
|
||||
# If passed, use a fake RabbitMQ provider (boolean value)
|
||||
#fake_rabbit=false
|
||||
@ -1523,6 +1523,17 @@
|
||||
#control_exchange=openstack
|
||||
|
||||
|
||||
#
|
||||
# Options defined in nova.openstack.common.rpc.amqp
|
||||
#
|
||||
|
||||
# Use durable queues in amqp. (boolean value)
|
||||
#amqp_durable_queues=false
|
||||
|
||||
# Auto-delete queues in amqp. (boolean value)
|
||||
#amqp_auto_delete=false
|
||||
|
||||
|
||||
#
|
||||
# Options defined in nova.openstack.common.rpc.impl_kombu
|
||||
#
|
||||
@ -1578,9 +1589,6 @@
|
||||
# value)
|
||||
#rabbit_max_retries=0
|
||||
|
||||
# use durable queues in RabbitMQ (boolean value)
|
||||
#rabbit_durable_queues=false
|
||||
|
||||
# use H/A queues in RabbitMQ (x-ha-policy: all).You need to
|
||||
# wipe RabbitMQ database when changing this option. (boolean
|
||||
# value)
|
||||
@ -1620,6 +1628,14 @@
|
||||
# Disable Nagle algorithm (boolean value)
|
||||
#qpid_tcp_nodelay=true
|
||||
|
||||
# The qpid topology version to use. Version 1 is what was
|
||||
# originally used by impl_qpid. Version 2 includes some
|
||||
# backwards-incompatible changes that allow broker federation
|
||||
# to work. Users should update to version 2 when they are
|
||||
# able to take everything down, as it requires a clean break.
|
||||
# (integer value)
|
||||
#qpid_topology_version=1
|
||||
|
||||
|
||||
#
|
||||
# Options defined in nova.openstack.common.rpc.impl_zmq
|
||||
|
@ -56,8 +56,7 @@ rpc_opts = [
|
||||
help='Seconds to wait before a cast expires (TTL). '
|
||||
'Only supported by impl_zmq.'),
|
||||
cfg.ListOpt('allowed_rpc_exception_modules',
|
||||
default=['nova.openstack.common.exception',
|
||||
'nova.exception',
|
||||
default=['nova.exception',
|
||||
'cinder.exception',
|
||||
'exceptions',
|
||||
],
|
||||
|
@ -34,6 +34,7 @@ from eventlet import greenpool
|
||||
from eventlet import pools
|
||||
from eventlet import queue
|
||||
from eventlet import semaphore
|
||||
from oslo.config import cfg
|
||||
|
||||
from nova.openstack.common import excutils
|
||||
from nova.openstack.common.gettextutils import _ # noqa
|
||||
@ -42,6 +43,19 @@ from nova.openstack.common import log as logging
|
||||
from nova.openstack.common.rpc import common as rpc_common
|
||||
|
||||
|
||||
amqp_opts = [
|
||||
cfg.BoolOpt('amqp_durable_queues',
|
||||
default=False,
|
||||
deprecated_name='rabbit_durable_queues',
|
||||
deprecated_group='DEFAULT',
|
||||
help='Use durable queues in amqp.'),
|
||||
cfg.BoolOpt('amqp_auto_delete',
|
||||
default=False,
|
||||
help='Auto-delete queues in amqp.'),
|
||||
]
|
||||
|
||||
cfg.CONF.register_opts(amqp_opts)
|
||||
|
||||
UNIQUE_ID = '_unique_id'
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
@ -286,8 +300,13 @@ def pack_context(msg, context):
|
||||
for args at some point.
|
||||
|
||||
"""
|
||||
context_d = dict([('_context_%s' % key, value)
|
||||
for (key, value) in context.to_dict().iteritems()])
|
||||
if isinstance(context, dict):
|
||||
context_d = dict([('_context_%s' % key, value)
|
||||
for (key, value) in context.iteritems()])
|
||||
else:
|
||||
context_d = dict([('_context_%s' % key, value)
|
||||
for (key, value) in context.to_dict().iteritems()])
|
||||
|
||||
msg.update(context_d)
|
||||
|
||||
|
||||
@ -345,22 +364,43 @@ class CallbackWrapper(_ThreadPoolWithWait):
|
||||
Allows it to be invoked in a green thread.
|
||||
"""
|
||||
|
||||
def __init__(self, conf, callback, connection_pool):
|
||||
def __init__(self, conf, callback, connection_pool,
|
||||
wait_for_consumers=False):
|
||||
"""Initiates CallbackWrapper object.
|
||||
|
||||
:param conf: cfg.CONF instance
|
||||
:param callback: a callable (probably a function)
|
||||
:param connection_pool: connection pool as returned by
|
||||
get_connection_pool()
|
||||
:param wait_for_consumers: wait for all green threads to
|
||||
complete and raise the last
|
||||
caught exception, if any.
|
||||
|
||||
"""
|
||||
super(CallbackWrapper, self).__init__(
|
||||
conf=conf,
|
||||
connection_pool=connection_pool,
|
||||
)
|
||||
self.callback = callback
|
||||
self.wait_for_consumers = wait_for_consumers
|
||||
self.exc_info = None
|
||||
|
||||
def _wrap(self, message_data, **kwargs):
|
||||
"""Wrap the callback invocation to catch exceptions.
|
||||
"""
|
||||
try:
|
||||
self.callback(message_data, **kwargs)
|
||||
except Exception:
|
||||
self.exc_info = sys.exc_info()
|
||||
|
||||
def __call__(self, message_data):
|
||||
self.pool.spawn_n(self.callback, message_data)
|
||||
self.exc_info = None
|
||||
self.pool.spawn_n(self._wrap, message_data)
|
||||
|
||||
if self.wait_for_consumers:
|
||||
self.pool.waitall()
|
||||
if self.exc_info:
|
||||
raise self.exc_info[1], None, self.exc_info[2]
|
||||
|
||||
|
||||
class ProxyCallback(_ThreadPoolWithWait):
|
||||
|
@ -74,14 +74,14 @@ _REMOTE_POSTFIX = '_Remote'
|
||||
|
||||
|
||||
class RPCException(Exception):
|
||||
message = _("An unknown RPC related exception occurred.")
|
||||
msg_fmt = _("An unknown RPC related exception occurred.")
|
||||
|
||||
def __init__(self, message=None, **kwargs):
|
||||
self.kwargs = kwargs
|
||||
|
||||
if not message:
|
||||
try:
|
||||
message = self.message % kwargs
|
||||
message = self.msg_fmt % kwargs
|
||||
|
||||
except Exception:
|
||||
# kwargs doesn't match a variable in the message
|
||||
@ -90,7 +90,7 @@ class RPCException(Exception):
|
||||
for name, value in kwargs.iteritems():
|
||||
LOG.error("%s: %s" % (name, value))
|
||||
# at least get the core message out if something happened
|
||||
message = self.message
|
||||
message = self.msg_fmt
|
||||
|
||||
super(RPCException, self).__init__(message)
|
||||
|
||||
@ -104,7 +104,7 @@ class RemoteError(RPCException):
|
||||
contains all of the relevant info.
|
||||
|
||||
"""
|
||||
message = _("Remote error: %(exc_type)s %(value)s\n%(traceback)s.")
|
||||
msg_fmt = _("Remote error: %(exc_type)s %(value)s\n%(traceback)s.")
|
||||
|
||||
def __init__(self, exc_type=None, value=None, traceback=None):
|
||||
self.exc_type = exc_type
|
||||
@ -121,7 +121,7 @@ class Timeout(RPCException):
|
||||
This exception is raised if the rpc_response_timeout is reached while
|
||||
waiting for a response from the remote side.
|
||||
"""
|
||||
message = _('Timeout while waiting on RPC response - '
|
||||
msg_fmt = _('Timeout while waiting on RPC response - '
|
||||
'topic: "%(topic)s", RPC method: "%(method)s" '
|
||||
'info: "%(info)s"')
|
||||
|
||||
@ -144,25 +144,25 @@ class Timeout(RPCException):
|
||||
|
||||
|
||||
class DuplicateMessageError(RPCException):
|
||||
message = _("Found duplicate message(%(msg_id)s). Skipping it.")
|
||||
msg_fmt = _("Found duplicate message(%(msg_id)s). Skipping it.")
|
||||
|
||||
|
||||
class InvalidRPCConnectionReuse(RPCException):
|
||||
message = _("Invalid reuse of an RPC connection.")
|
||||
msg_fmt = _("Invalid reuse of an RPC connection.")
|
||||
|
||||
|
||||
class UnsupportedRpcVersion(RPCException):
|
||||
message = _("Specified RPC version, %(version)s, not supported by "
|
||||
msg_fmt = _("Specified RPC version, %(version)s, not supported by "
|
||||
"this endpoint.")
|
||||
|
||||
|
||||
class UnsupportedRpcEnvelopeVersion(RPCException):
|
||||
message = _("Specified RPC envelope version, %(version)s, "
|
||||
msg_fmt = _("Specified RPC envelope version, %(version)s, "
|
||||
"not supported by this endpoint.")
|
||||
|
||||
|
||||
class RpcVersionCapError(RPCException):
|
||||
message = _("Specified RPC version cap, %(version_cap)s, is too low")
|
||||
msg_fmt = _("Specified RPC version cap, %(version_cap)s, is too low")
|
||||
|
||||
|
||||
class Connection(object):
|
||||
|
@ -86,9 +86,6 @@ kombu_opts = [
|
||||
default=0,
|
||||
help='maximum retries with trying to connect to RabbitMQ '
|
||||
'(the default of 0 implies an infinite retry count)'),
|
||||
cfg.BoolOpt('rabbit_durable_queues',
|
||||
default=False,
|
||||
help='use durable queues in RabbitMQ'),
|
||||
cfg.BoolOpt('rabbit_ha_queues',
|
||||
default=False,
|
||||
help='use H/A queues in RabbitMQ (x-ha-policy: all).'
|
||||
@ -149,29 +146,23 @@ class ConsumerBase(object):
|
||||
Messages that are processed without exception are ack'ed.
|
||||
|
||||
If the message processing generates an exception, it will be
|
||||
ack'ed if ack_on_error=True. Otherwise it will be .reject()'ed.
|
||||
Rejection is better than waiting for the message to timeout.
|
||||
Rejected messages are immediately requeued.
|
||||
ack'ed if ack_on_error=True. Otherwise it will be .requeue()'ed.
|
||||
"""
|
||||
|
||||
ack_msg = False
|
||||
try:
|
||||
msg = rpc_common.deserialize_msg(message.payload)
|
||||
callback(msg)
|
||||
ack_msg = True
|
||||
except Exception:
|
||||
if self.ack_on_error:
|
||||
ack_msg = True
|
||||
LOG.exception(_("Failed to process message"
|
||||
" ... skipping it."))
|
||||
message.ack()
|
||||
else:
|
||||
LOG.exception(_("Failed to process message"
|
||||
" ... will requeue."))
|
||||
finally:
|
||||
if ack_msg:
|
||||
message.ack()
|
||||
else:
|
||||
message.reject()
|
||||
message.requeue()
|
||||
else:
|
||||
message.ack()
|
||||
|
||||
def consume(self, *args, **kwargs):
|
||||
"""Actually declare the consumer on the amqp channel. This will
|
||||
@ -260,9 +251,9 @@ class TopicConsumer(ConsumerBase):
|
||||
Other kombu options may be passed as keyword arguments
|
||||
"""
|
||||
# Default options
|
||||
options = {'durable': conf.rabbit_durable_queues,
|
||||
options = {'durable': conf.amqp_durable_queues,
|
||||
'queue_arguments': _get_queue_arguments(conf),
|
||||
'auto_delete': False,
|
||||
'auto_delete': conf.amqp_auto_delete,
|
||||
'exclusive': False}
|
||||
options.update(kwargs)
|
||||
exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
|
||||
@ -366,8 +357,8 @@ class TopicPublisher(Publisher):
|
||||
|
||||
Kombu options may be passed as keyword args to override defaults
|
||||
"""
|
||||
options = {'durable': conf.rabbit_durable_queues,
|
||||
'auto_delete': False,
|
||||
options = {'durable': conf.amqp_durable_queues,
|
||||
'auto_delete': conf.amqp_auto_delete,
|
||||
'exclusive': False}
|
||||
options.update(kwargs)
|
||||
exchange_name = rpc_amqp.get_control_exchange(conf)
|
||||
@ -397,7 +388,7 @@ class NotifyPublisher(TopicPublisher):
|
||||
"""Publisher class for 'notify'."""
|
||||
|
||||
def __init__(self, conf, channel, topic, **kwargs):
|
||||
self.durable = kwargs.pop('durable', conf.rabbit_durable_queues)
|
||||
self.durable = kwargs.pop('durable', conf.amqp_durable_queues)
|
||||
self.queue_arguments = _get_queue_arguments(conf)
|
||||
super(NotifyPublisher, self).__init__(conf, channel, topic, **kwargs)
|
||||
|
||||
@ -493,12 +484,8 @@ class Connection(object):
|
||||
# future with this?
|
||||
ssl_params['cert_reqs'] = ssl.CERT_REQUIRED
|
||||
|
||||
if not ssl_params:
|
||||
# Just have the default behavior
|
||||
return True
|
||||
else:
|
||||
# Return the extended behavior
|
||||
return ssl_params
|
||||
# Return the extended behavior or just have the default behavior
|
||||
return ssl_params or True
|
||||
|
||||
def _connect(self, params):
|
||||
"""Connect to rabbit. Re-establish any queues that may have
|
||||
@ -796,6 +783,7 @@ class Connection(object):
|
||||
callback=callback,
|
||||
connection_pool=rpc_amqp.get_connection_pool(self.conf,
|
||||
Connection),
|
||||
wait_for_consumers=not ack_on_error
|
||||
)
|
||||
self.proxy_callbacks.append(callback_wrapper)
|
||||
self.declare_topic_consumer(
|
||||
|
@ -67,6 +67,17 @@ qpid_opts = [
|
||||
cfg.BoolOpt('qpid_tcp_nodelay',
|
||||
default=True,
|
||||
help='Disable Nagle algorithm'),
|
||||
# NOTE(russellb) If any additional versions are added (beyond 1 and 2),
|
||||
# this file could probably use some additional refactoring so that the
|
||||
# differences between each version are split into different classes.
|
||||
cfg.IntOpt('qpid_topology_version',
|
||||
default=1,
|
||||
help="The qpid topology version to use. Version 1 is what "
|
||||
"was originally used by impl_qpid. Version 2 includes "
|
||||
"some backwards-incompatible changes that allow broker "
|
||||
"federation to work. Users should update to version 2 "
|
||||
"when they are able to take everything down, as it "
|
||||
"requires a clean break."),
|
||||
]
|
||||
|
||||
cfg.CONF.register_opts(qpid_opts)
|
||||
@ -74,10 +85,17 @@ cfg.CONF.register_opts(qpid_opts)
|
||||
JSON_CONTENT_TYPE = 'application/json; charset=utf8'
|
||||
|
||||
|
||||
def raise_invalid_topology_version(conf):
|
||||
msg = (_("Invalid value for qpid_topology_version: %d") %
|
||||
conf.qpid_topology_version)
|
||||
LOG.error(msg)
|
||||
raise Exception(msg)
|
||||
|
||||
|
||||
class ConsumerBase(object):
|
||||
"""Consumer base class."""
|
||||
|
||||
def __init__(self, session, callback, node_name, node_opts,
|
||||
def __init__(self, conf, session, callback, node_name, node_opts,
|
||||
link_name, link_opts):
|
||||
"""Declare a queue on an amqp session.
|
||||
|
||||
@ -95,26 +113,38 @@ class ConsumerBase(object):
|
||||
self.receiver = None
|
||||
self.session = None
|
||||
|
||||
addr_opts = {
|
||||
"create": "always",
|
||||
"node": {
|
||||
"type": "topic",
|
||||
"x-declare": {
|
||||
if conf.qpid_topology_version == 1:
|
||||
addr_opts = {
|
||||
"create": "always",
|
||||
"node": {
|
||||
"type": "topic",
|
||||
"x-declare": {
|
||||
"durable": True,
|
||||
"auto-delete": True,
|
||||
},
|
||||
},
|
||||
"link": {
|
||||
"name": link_name,
|
||||
"durable": True,
|
||||
"auto-delete": True,
|
||||
"x-declare": {
|
||||
"durable": False,
|
||||
"auto-delete": True,
|
||||
"exclusive": False,
|
||||
},
|
||||
},
|
||||
},
|
||||
"link": {
|
||||
"name": link_name,
|
||||
"durable": True,
|
||||
"x-declare": {
|
||||
"durable": False,
|
||||
"auto-delete": True,
|
||||
"exclusive": False,
|
||||
}
|
||||
addr_opts["node"]["x-declare"].update(node_opts)
|
||||
elif conf.qpid_topology_version == 2:
|
||||
addr_opts = {
|
||||
"link": {
|
||||
"x-declare": {
|
||||
"auto-delete": True,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
addr_opts["node"]["x-declare"].update(node_opts)
|
||||
}
|
||||
else:
|
||||
raise_invalid_topology_version()
|
||||
|
||||
addr_opts["link"]["x-declare"].update(link_opts)
|
||||
|
||||
self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
|
||||
@ -181,11 +211,24 @@ class DirectConsumer(ConsumerBase):
|
||||
'callback' is the callback to call when messages are received
|
||||
"""
|
||||
|
||||
super(DirectConsumer, self).__init__(session, callback,
|
||||
"%s/%s" % (msg_id, msg_id),
|
||||
{"type": "direct"},
|
||||
msg_id,
|
||||
{"exclusive": True})
|
||||
link_opts = {
|
||||
"auto-delete": conf.amqp_auto_delete,
|
||||
"exclusive": True,
|
||||
"durable": conf.amqp_durable_queues,
|
||||
}
|
||||
|
||||
if conf.qpid_topology_version == 1:
|
||||
node_name = "%s/%s" % (msg_id, msg_id)
|
||||
node_opts = {"type": "direct"}
|
||||
elif conf.qpid_topology_version == 2:
|
||||
node_name = "amq.direct/%s" % msg_id
|
||||
node_opts = {}
|
||||
else:
|
||||
raise_invalid_topology_version()
|
||||
|
||||
super(DirectConsumer, self).__init__(conf, session, callback,
|
||||
node_name, node_opts, msg_id,
|
||||
link_opts)
|
||||
|
||||
|
||||
class TopicConsumer(ConsumerBase):
|
||||
@ -203,9 +246,20 @@ class TopicConsumer(ConsumerBase):
|
||||
"""
|
||||
|
||||
exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
|
||||
super(TopicConsumer, self).__init__(session, callback,
|
||||
"%s/%s" % (exchange_name, topic),
|
||||
{}, name or topic, {})
|
||||
link_opts = {
|
||||
"auto-delete": conf.amqp_auto_delete,
|
||||
"durable": conf.amqp_durable_queues,
|
||||
}
|
||||
|
||||
if conf.qpid_topology_version == 1:
|
||||
node_name = "%s/%s" % (exchange_name, topic)
|
||||
elif conf.qpid_topology_version == 2:
|
||||
node_name = "amq.topic/topic/%s/%s" % (exchange_name, topic)
|
||||
else:
|
||||
raise_invalid_topology_version()
|
||||
|
||||
super(TopicConsumer, self).__init__(conf, session, callback, node_name,
|
||||
{}, name or topic, link_opts)
|
||||
|
||||
|
||||
class FanoutConsumer(ConsumerBase):
|
||||
@ -220,15 +274,25 @@ class FanoutConsumer(ConsumerBase):
|
||||
"""
|
||||
self.conf = conf
|
||||
|
||||
super(FanoutConsumer, self).__init__(
|
||||
session, callback,
|
||||
"%s_fanout" % topic,
|
||||
{"durable": False, "type": "fanout"},
|
||||
"%s_fanout_%s" % (topic, uuid.uuid4().hex),
|
||||
{"exclusive": True})
|
||||
link_opts = {"exclusive": True}
|
||||
|
||||
if conf.qpid_topology_version == 1:
|
||||
node_name = "%s_fanout" % topic
|
||||
node_opts = {"durable": False, "type": "fanout"}
|
||||
link_name = "%s_fanout_%s" % (topic, uuid.uuid4().hex)
|
||||
elif conf.qpid_topology_version == 2:
|
||||
node_name = "amq.topic/fanout/%s" % topic
|
||||
node_opts = {}
|
||||
link_name = ""
|
||||
else:
|
||||
raise_invalid_topology_version()
|
||||
|
||||
super(FanoutConsumer, self).__init__(conf, session, callback,
|
||||
node_name, node_opts, link_name,
|
||||
link_opts)
|
||||
|
||||
def reconnect(self, session):
|
||||
topic = self.get_node_name()
|
||||
topic = self.get_node_name().rpartition('_fanout')[0]
|
||||
params = {
|
||||
'session': session,
|
||||
'topic': topic,
|
||||
@ -243,29 +307,34 @@ class FanoutConsumer(ConsumerBase):
|
||||
class Publisher(object):
|
||||
"""Base Publisher class."""
|
||||
|
||||
def __init__(self, session, node_name, node_opts=None):
|
||||
def __init__(self, conf, session, node_name, node_opts=None):
|
||||
"""Init the Publisher class with the exchange_name, routing_key,
|
||||
and other options
|
||||
"""
|
||||
self.sender = None
|
||||
self.session = session
|
||||
|
||||
addr_opts = {
|
||||
"create": "always",
|
||||
"node": {
|
||||
"type": "topic",
|
||||
"x-declare": {
|
||||
"durable": False,
|
||||
# auto-delete isn't implemented for exchanges in qpid,
|
||||
# but put in here anyway
|
||||
"auto-delete": True,
|
||||
if conf.qpid_topology_version == 1:
|
||||
addr_opts = {
|
||||
"create": "always",
|
||||
"node": {
|
||||
"type": "topic",
|
||||
"x-declare": {
|
||||
"durable": False,
|
||||
# auto-delete isn't implemented for exchanges in qpid,
|
||||
# but put in here anyway
|
||||
"auto-delete": True,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
if node_opts:
|
||||
addr_opts["node"]["x-declare"].update(node_opts)
|
||||
}
|
||||
if node_opts:
|
||||
addr_opts["node"]["x-declare"].update(node_opts)
|
||||
|
||||
self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
|
||||
self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
|
||||
elif conf.qpid_topology_version == 2:
|
||||
self.address = node_name
|
||||
else:
|
||||
raise_invalid_topology_version()
|
||||
|
||||
self.reconnect(session)
|
||||
|
||||
@ -309,8 +378,18 @@ class DirectPublisher(Publisher):
|
||||
"""Publisher class for 'direct'."""
|
||||
def __init__(self, conf, session, msg_id):
|
||||
"""Init a 'direct' publisher."""
|
||||
super(DirectPublisher, self).__init__(session, msg_id,
|
||||
{"type": "Direct"})
|
||||
|
||||
if conf.qpid_topology_version == 1:
|
||||
node_name = msg_id
|
||||
node_opts = {"type": "direct"}
|
||||
elif conf.qpid_topology_version == 2:
|
||||
node_name = "amq.direct/%s" % msg_id
|
||||
node_opts = {}
|
||||
else:
|
||||
raise_invalid_topology_version()
|
||||
|
||||
super(DirectPublisher, self).__init__(conf, session, node_name,
|
||||
node_opts)
|
||||
|
||||
|
||||
class TopicPublisher(Publisher):
|
||||
@ -319,8 +398,15 @@ class TopicPublisher(Publisher):
|
||||
"""init a 'topic' publisher.
|
||||
"""
|
||||
exchange_name = rpc_amqp.get_control_exchange(conf)
|
||||
super(TopicPublisher, self).__init__(session,
|
||||
"%s/%s" % (exchange_name, topic))
|
||||
|
||||
if conf.qpid_topology_version == 1:
|
||||
node_name = "%s/%s" % (exchange_name, topic)
|
||||
elif conf.qpid_topology_version == 2:
|
||||
node_name = "amq.topic/topic/%s/%s" % (exchange_name, topic)
|
||||
else:
|
||||
raise_invalid_topology_version()
|
||||
|
||||
super(TopicPublisher, self).__init__(conf, session, node_name)
|
||||
|
||||
|
||||
class FanoutPublisher(Publisher):
|
||||
@ -328,9 +414,18 @@ class FanoutPublisher(Publisher):
|
||||
def __init__(self, conf, session, topic):
|
||||
"""init a 'fanout' publisher.
|
||||
"""
|
||||
super(FanoutPublisher, self).__init__(
|
||||
session,
|
||||
"%s_fanout" % topic, {"type": "fanout"})
|
||||
|
||||
if conf.qpid_topology_version == 1:
|
||||
node_name = "%s_fanout" % topic
|
||||
node_opts = {"type": "fanout"}
|
||||
elif conf.qpid_topology_version == 2:
|
||||
node_name = "amq.topic/fanout/%s" % topic
|
||||
node_opts = {}
|
||||
else:
|
||||
raise_invalid_topology_version()
|
||||
|
||||
super(FanoutPublisher, self).__init__(conf, session, node_name,
|
||||
node_opts)
|
||||
|
||||
|
||||
class NotifyPublisher(Publisher):
|
||||
@ -339,9 +434,17 @@ class NotifyPublisher(Publisher):
|
||||
"""init a 'topic' publisher.
|
||||
"""
|
||||
exchange_name = rpc_amqp.get_control_exchange(conf)
|
||||
super(NotifyPublisher, self).__init__(session,
|
||||
"%s/%s" % (exchange_name, topic),
|
||||
{"durable": True})
|
||||
node_opts = {"durable": True}
|
||||
|
||||
if conf.qpid_topology_version == 1:
|
||||
node_name = "%s/%s" % (exchange_name, topic)
|
||||
elif conf.qpid_topology_version == 2:
|
||||
node_name = "amq.topic/topic/%s/%s" % (exchange_name, topic)
|
||||
else:
|
||||
raise_invalid_topology_version()
|
||||
|
||||
super(NotifyPublisher, self).__init__(conf, session, node_name,
|
||||
node_opts)
|
||||
|
||||
|
||||
class Connection(object):
|
||||
@ -655,6 +758,7 @@ class Connection(object):
|
||||
callback=callback,
|
||||
connection_pool=rpc_amqp.get_connection_pool(self.conf,
|
||||
Connection),
|
||||
wait_for_consumers=not ack_on_error
|
||||
)
|
||||
self.proxy_callbacks.append(callback_wrapper)
|
||||
|
||||
|
@ -383,6 +383,7 @@ class ZmqBaseReactor(ConsumerBase):
|
||||
LOG.info(_("In reactor registered"))
|
||||
|
||||
def consume_in_thread(self):
|
||||
@excutils.forever_retry_uncaught_exceptions
|
||||
def _consume(sock):
|
||||
LOG.info(_("Consuming socket"))
|
||||
while True:
|
||||
|
@ -248,9 +248,7 @@ class DirectBinding(Binding):
|
||||
that it maps directly to a host, thus direct.
|
||||
"""
|
||||
def test(self, key):
|
||||
if '.' in key:
|
||||
return True
|
||||
return False
|
||||
return '.' in key
|
||||
|
||||
|
||||
class TopicBinding(Binding):
|
||||
@ -262,17 +260,13 @@ class TopicBinding(Binding):
|
||||
matches that of a direct exchange.
|
||||
"""
|
||||
def test(self, key):
|
||||
if '.' not in key:
|
||||
return True
|
||||
return False
|
||||
return '.' not in key
|
||||
|
||||
|
||||
class FanoutBinding(Binding):
|
||||
"""Match on fanout keys, where key starts with 'fanout.' string."""
|
||||
def test(self, key):
|
||||
if key.startswith('fanout~'):
|
||||
return True
|
||||
return False
|
||||
return key.startswith('fanout~')
|
||||
|
||||
|
||||
class StubExchange(Exchange):
|
||||
|
@ -63,9 +63,7 @@ class RingExchange(mm.Exchange):
|
||||
self.ring0[k] = itertools.cycle(self.ring[k])
|
||||
|
||||
def _ring_has(self, key):
|
||||
if key in self.ring0:
|
||||
return True
|
||||
return False
|
||||
return key in self.ring0
|
||||
|
||||
|
||||
class RoundRobinRingExchange(RingExchange):
|
||||
|
521
nova/openstack/common/rpc/securemessage.py
Normal file
521
nova/openstack/common/rpc/securemessage.py
Normal file
@ -0,0 +1,521 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2013 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import base64
|
||||
import collections
|
||||
import os
|
||||
import struct
|
||||
import time
|
||||
|
||||
import requests
|
||||
|
||||
from oslo.config import cfg
|
||||
|
||||
from nova.openstack.common.crypto import utils as cryptoutils
|
||||
from nova.openstack.common import jsonutils
|
||||
from nova.openstack.common import log as logging
|
||||
|
||||
secure_message_opts = [
|
||||
cfg.BoolOpt('enabled', default=True,
|
||||
help='Whether Secure Messaging (Signing) is enabled,'
|
||||
' defaults to enabled'),
|
||||
cfg.BoolOpt('enforced', default=False,
|
||||
help='Whether Secure Messaging (Signing) is enforced,'
|
||||
' defaults to not enforced'),
|
||||
cfg.BoolOpt('encrypt', default=False,
|
||||
help='Whether Secure Messaging (Encryption) is enabled,'
|
||||
' defaults to not enabled'),
|
||||
cfg.StrOpt('secret_keys_file',
|
||||
help='Path to the file containing the keys, takes precedence'
|
||||
' over secret_key'),
|
||||
cfg.MultiStrOpt('secret_key',
|
||||
help='A list of keys: (ex: name:<base64 encoded key>),'
|
||||
' ignored if secret_keys_file is set'),
|
||||
cfg.StrOpt('kds_endpoint',
|
||||
help='KDS endpoint (ex: http://kds.example.com:35357/v3)'),
|
||||
]
|
||||
secure_message_group = cfg.OptGroup('secure_messages',
|
||||
title='Secure Messaging options')
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class SecureMessageException(Exception):
|
||||
"""Generic Exception for Secure Messages."""
|
||||
|
||||
msg = "An unknown Secure Message related exception occurred."
|
||||
|
||||
def __init__(self, msg=None):
|
||||
if msg is None:
|
||||
msg = self.msg
|
||||
super(SecureMessageException, self).__init__(msg)
|
||||
|
||||
|
||||
class SharedKeyNotFound(SecureMessageException):
|
||||
"""No shared key was found and no other external authentication mechanism
|
||||
is available.
|
||||
"""
|
||||
|
||||
msg = "Shared Key for [%s] Not Found. (%s)"
|
||||
|
||||
def __init__(self, name, errmsg):
|
||||
super(SharedKeyNotFound, self).__init__(self.msg % (name, errmsg))
|
||||
|
||||
|
||||
class InvalidMetadata(SecureMessageException):
|
||||
"""The metadata is invalid."""
|
||||
|
||||
msg = "Invalid metadata: %s"
|
||||
|
||||
def __init__(self, err):
|
||||
super(InvalidMetadata, self).__init__(self.msg % err)
|
||||
|
||||
|
||||
class InvalidSignature(SecureMessageException):
|
||||
"""Signature validation failed."""
|
||||
|
||||
msg = "Failed to validate signature (source=%s, destination=%s)"
|
||||
|
||||
def __init__(self, src, dst):
|
||||
super(InvalidSignature, self).__init__(self.msg % (src, dst))
|
||||
|
||||
|
||||
class UnknownDestinationName(SecureMessageException):
|
||||
"""The Destination name is unknown to us."""
|
||||
|
||||
msg = "Invalid destination name (%s)"
|
||||
|
||||
def __init__(self, name):
|
||||
super(UnknownDestinationName, self).__init__(self.msg % name)
|
||||
|
||||
|
||||
class InvalidEncryptedTicket(SecureMessageException):
|
||||
"""The Encrypted Ticket could not be successfully handled."""
|
||||
|
||||
msg = "Invalid Ticket (source=%s, destination=%s)"
|
||||
|
||||
def __init__(self, src, dst):
|
||||
super(InvalidEncryptedTicket, self).__init__(self.msg % (src, dst))
|
||||
|
||||
|
||||
class InvalidExpiredTicket(SecureMessageException):
|
||||
"""The ticket received is already expired."""
|
||||
|
||||
msg = "Expired ticket (source=%s, destination=%s)"
|
||||
|
||||
def __init__(self, src, dst):
|
||||
super(InvalidExpiredTicket, self).__init__(self.msg % (src, dst))
|
||||
|
||||
|
||||
class CommunicationError(SecureMessageException):
|
||||
"""The Communication with the KDS failed."""
|
||||
|
||||
msg = "Communication Error (target=%s): %s"
|
||||
|
||||
def __init__(self, target, errmsg):
|
||||
super(CommunicationError, self).__init__(self.msg % (target, errmsg))
|
||||
|
||||
|
||||
class InvalidArgument(SecureMessageException):
|
||||
"""Bad initialization argument."""
|
||||
|
||||
msg = "Invalid argument: %s"
|
||||
|
||||
def __init__(self, errmsg):
|
||||
super(InvalidArgument, self).__init__(self.msg % errmsg)
|
||||
|
||||
|
||||
Ticket = collections.namedtuple('Ticket', ['skey', 'ekey', 'esek'])
|
||||
|
||||
|
||||
class KeyStore(object):
|
||||
"""A storage class for Signing and Encryption Keys.
|
||||
|
||||
This class creates an object that holds Generic Keys like Signing
|
||||
Keys, Encryption Keys, Encrypted SEK Tickets ...
|
||||
"""
|
||||
|
||||
def __init__(self):
|
||||
self._kvps = dict()
|
||||
|
||||
def _get_key_name(self, source, target, ktype):
|
||||
return (source, target, ktype)
|
||||
|
||||
def _put(self, src, dst, ktype, expiration, data):
|
||||
name = self._get_key_name(src, dst, ktype)
|
||||
self._kvps[name] = (expiration, data)
|
||||
|
||||
def _get(self, src, dst, ktype):
|
||||
name = self._get_key_name(src, dst, ktype)
|
||||
if name in self._kvps:
|
||||
expiration, data = self._kvps[name]
|
||||
if expiration > time.time():
|
||||
return data
|
||||
else:
|
||||
del self._kvps[name]
|
||||
|
||||
return None
|
||||
|
||||
def clear(self):
|
||||
"""Wipes the store clear of all data."""
|
||||
self._kvps.clear()
|
||||
|
||||
def put_ticket(self, source, target, skey, ekey, esek, expiration):
|
||||
"""Puts a sek pair in the cache.
|
||||
|
||||
:param source: Client name
|
||||
:param target: Target name
|
||||
:param skey: The Signing Key
|
||||
:param ekey: The Encription Key
|
||||
:param esek: The token encrypted with the target key
|
||||
:param expiration: Expiration time in seconds since Epoch
|
||||
"""
|
||||
keys = Ticket(skey, ekey, esek)
|
||||
self._put(source, target, 'ticket', expiration, keys)
|
||||
|
||||
def get_ticket(self, source, target):
|
||||
"""Returns a Ticket (skey, ekey, esek) namedtuple for the
|
||||
source/target pair.
|
||||
"""
|
||||
return self._get(source, target, 'ticket')
|
||||
|
||||
|
||||
_KEY_STORE = KeyStore()
|
||||
|
||||
|
||||
class _KDSClient(object):
|
||||
|
||||
USER_AGENT = 'oslo-incubator/rpc'
|
||||
|
||||
def __init__(self, endpoint=None, timeout=None):
|
||||
"""A KDS Client class."""
|
||||
|
||||
self._endpoint = endpoint
|
||||
if timeout is not None:
|
||||
self.timeout = float(timeout)
|
||||
else:
|
||||
self.timeout = None
|
||||
|
||||
def _do_get(self, url, request):
|
||||
req_kwargs = dict()
|
||||
req_kwargs['headers'] = dict()
|
||||
req_kwargs['headers']['User-Agent'] = self.USER_AGENT
|
||||
req_kwargs['headers']['Content-Type'] = 'application/json'
|
||||
req_kwargs['data'] = jsonutils.dumps({'request': request})
|
||||
if self.timeout is not None:
|
||||
req_kwargs['timeout'] = self.timeout
|
||||
|
||||
try:
|
||||
resp = requests.get(url, **req_kwargs)
|
||||
except requests.ConnectionError as e:
|
||||
err = "Unable to establish connection. %s" % e
|
||||
raise CommunicationError(url, err)
|
||||
|
||||
return resp
|
||||
|
||||
def _get_reply(self, url, resp):
|
||||
if resp.text:
|
||||
try:
|
||||
body = jsonutils.loads(resp.text)
|
||||
reply = body['reply']
|
||||
except (KeyError, TypeError, ValueError):
|
||||
msg = "Failed to decode reply: %s" % resp.text
|
||||
raise CommunicationError(url, msg)
|
||||
else:
|
||||
msg = "No reply data was returned."
|
||||
raise CommunicationError(url, msg)
|
||||
|
||||
return reply
|
||||
|
||||
def _get_ticket(self, request, url=None, redirects=10):
|
||||
"""Send an HTTP request.
|
||||
|
||||
Wraps around 'requests' to handle redirects and common errors.
|
||||
"""
|
||||
if url is None:
|
||||
if not self._endpoint:
|
||||
raise CommunicationError(url, 'Endpoint not configured')
|
||||
url = self._endpoint + '/kds/ticket'
|
||||
|
||||
while redirects:
|
||||
resp = self._do_get(url, request)
|
||||
if resp.status_code in (301, 302, 305):
|
||||
# Redirected. Reissue the request to the new location.
|
||||
url = resp.headers['location']
|
||||
redirects -= 1
|
||||
continue
|
||||
elif resp.status_code != 200:
|
||||
msg = "Request returned failure status: %s (%s)"
|
||||
err = msg % (resp.status_code, resp.text)
|
||||
raise CommunicationError(url, err)
|
||||
|
||||
return self._get_reply(url, resp)
|
||||
|
||||
raise CommunicationError(url, "Too many redirections, giving up!")
|
||||
|
||||
def get_ticket(self, source, target, crypto, key):
|
||||
|
||||
# prepare metadata
|
||||
md = {'requestor': source,
|
||||
'target': target,
|
||||
'timestamp': time.time(),
|
||||
'nonce': struct.unpack('Q', os.urandom(8))[0]}
|
||||
metadata = base64.b64encode(jsonutils.dumps(md))
|
||||
|
||||
# sign metadata
|
||||
signature = crypto.sign(key, metadata)
|
||||
|
||||
# HTTP request
|
||||
reply = self._get_ticket({'metadata': metadata,
|
||||
'signature': signature})
|
||||
|
||||
# verify reply
|
||||
signature = crypto.sign(key, (reply['metadata'] + reply['ticket']))
|
||||
if signature != reply['signature']:
|
||||
raise InvalidEncryptedTicket(md['source'], md['destination'])
|
||||
md = jsonutils.loads(base64.b64decode(reply['metadata']))
|
||||
if ((md['source'] != source or
|
||||
md['destination'] != target or
|
||||
md['expiration'] < time.time())):
|
||||
raise InvalidEncryptedTicket(md['source'], md['destination'])
|
||||
|
||||
# return ticket data
|
||||
tkt = jsonutils.loads(crypto.decrypt(key, reply['ticket']))
|
||||
|
||||
return tkt, md['expiration']
|
||||
|
||||
|
||||
# we need to keep a global nonce, as this value should never repeat non
|
||||
# matter how many SecureMessage objects we create
|
||||
_NONCE = None
|
||||
|
||||
|
||||
def _get_nonce():
|
||||
"""We keep a single counter per instance, as it is so huge we can't
|
||||
possibly cycle through within 1/100 of a second anyway.
|
||||
"""
|
||||
|
||||
global _NONCE
|
||||
# Lazy initialize, for now get a random value, multiply by 2^32 and
|
||||
# use it as the nonce base. The counter itself will rotate after
|
||||
# 2^32 increments.
|
||||
if _NONCE is None:
|
||||
_NONCE = [struct.unpack('I', os.urandom(4))[0], 0]
|
||||
|
||||
# Increment counter and wrap at 2^32
|
||||
_NONCE[1] += 1
|
||||
if _NONCE[1] > 0xffffffff:
|
||||
_NONCE[1] = 0
|
||||
|
||||
# Return base + counter
|
||||
return long((_NONCE[0] * 0xffffffff)) + _NONCE[1]
|
||||
|
||||
|
||||
class SecureMessage(object):
|
||||
"""A Secure Message object.
|
||||
|
||||
This class creates a signing/encryption facility for RPC messages.
|
||||
It encapsulates all the necessary crypto primitives to insulate
|
||||
regular code from the intricacies of message authentication, validation
|
||||
and optionally encryption.
|
||||
|
||||
:param topic: The topic name of the queue
|
||||
:param host: The server name, together with the topic it forms a unique
|
||||
name that is used to source signing keys, and verify
|
||||
incoming messages.
|
||||
:param conf: a ConfigOpts object
|
||||
:param key: (optional) explicitly pass in endpoint private key.
|
||||
If not provided it will be sourced from the service config
|
||||
:param key_store: (optional) Storage class for local caching
|
||||
:param encrypt: (defaults to False) Whether to encrypt messages
|
||||
:param enctype: (defaults to AES) Cipher to use
|
||||
:param hashtype: (defaults to SHA256) Hash function to use for signatures
|
||||
"""
|
||||
|
||||
def __init__(self, topic, host, conf, key=None, key_store=None,
|
||||
encrypt=None, enctype='AES', hashtype='SHA256'):
|
||||
|
||||
conf.register_group(secure_message_group)
|
||||
conf.register_opts(secure_message_opts, group='secure_messages')
|
||||
|
||||
self._name = '%s.%s' % (topic, host)
|
||||
self._key = key
|
||||
self._conf = conf.secure_messages
|
||||
self._encrypt = self._conf.encrypt if (encrypt is None) else encrypt
|
||||
self._crypto = cryptoutils.SymmetricCrypto(enctype, hashtype)
|
||||
self._hkdf = cryptoutils.HKDF(hashtype)
|
||||
self._kds = _KDSClient(self._conf.kds_endpoint)
|
||||
|
||||
if self._key is None:
|
||||
self._key = self._init_key(topic, self._name)
|
||||
if self._key is None:
|
||||
err = "Secret Key (or key file) is missing or malformed"
|
||||
raise SharedKeyNotFound(self._name, err)
|
||||
|
||||
self._key_store = key_store or _KEY_STORE
|
||||
|
||||
def _init_key(self, topic, name):
|
||||
keys = None
|
||||
if self._conf.secret_keys_file:
|
||||
with open(self._conf.secret_keys_file, 'r') as f:
|
||||
keys = f.readlines()
|
||||
elif self._conf.secret_key:
|
||||
keys = self._conf.secret_key
|
||||
|
||||
if keys is None:
|
||||
return None
|
||||
|
||||
for k in keys:
|
||||
if k[0] == '#':
|
||||
continue
|
||||
if ':' not in k:
|
||||
break
|
||||
svc, key = k.split(':', 1)
|
||||
if svc == topic or svc == name:
|
||||
return base64.b64decode(key)
|
||||
|
||||
return None
|
||||
|
||||
def _split_key(self, key, size):
|
||||
sig_key = key[:size]
|
||||
enc_key = key[size:]
|
||||
return sig_key, enc_key
|
||||
|
||||
def _decode_esek(self, key, source, target, timestamp, esek):
|
||||
"""This function decrypts the esek buffer passed in and returns a
|
||||
KeyStore to be used to check and decrypt the received message.
|
||||
|
||||
:param key: The key to use to decrypt the ticket (esek)
|
||||
:param source: The name of the source service
|
||||
:param traget: The name of the target service
|
||||
:param timestamp: The incoming message timestamp
|
||||
:param esek: a base64 encoded encrypted block containing a JSON string
|
||||
"""
|
||||
rkey = None
|
||||
|
||||
try:
|
||||
s = self._crypto.decrypt(key, esek)
|
||||
j = jsonutils.loads(s)
|
||||
|
||||
rkey = base64.b64decode(j['key'])
|
||||
expiration = j['timestamp'] + j['ttl']
|
||||
if j['timestamp'] > timestamp or timestamp > expiration:
|
||||
raise InvalidExpiredTicket(source, target)
|
||||
|
||||
except Exception:
|
||||
raise InvalidEncryptedTicket(source, target)
|
||||
|
||||
info = '%s,%s,%s' % (source, target, str(j['timestamp']))
|
||||
|
||||
sek = self._hkdf.expand(rkey, info, len(key) * 2)
|
||||
|
||||
return self._split_key(sek, len(key))
|
||||
|
||||
def _get_ticket(self, target):
|
||||
"""This function will check if we already have a SEK for the specified
|
||||
target in the cache, or will go and try to fetch a new SEK from the key
|
||||
server.
|
||||
|
||||
:param target: The name of the target service
|
||||
"""
|
||||
ticket = self._key_store.get_ticket(self._name, target)
|
||||
|
||||
if ticket is not None:
|
||||
return ticket
|
||||
|
||||
tkt, expiration = self._kds.get_ticket(self._name, target,
|
||||
self._crypto, self._key)
|
||||
|
||||
self._key_store.put_ticket(self._name, target,
|
||||
base64.b64decode(tkt['skey']),
|
||||
base64.b64decode(tkt['ekey']),
|
||||
tkt['esek'], expiration)
|
||||
return self._key_store.get_ticket(self._name, target)
|
||||
|
||||
def encode(self, version, target, json_msg):
|
||||
"""This is the main encoding function.
|
||||
|
||||
It takes a target and a message and returns a tuple consisting of a
|
||||
JSON serialized metadata object, a JSON serialized (and optionally
|
||||
encrypted) message, and a signature.
|
||||
|
||||
:param version: the current envelope version
|
||||
:param target: The name of the target service (usually with hostname)
|
||||
:param json_msg: a serialized json message object
|
||||
"""
|
||||
ticket = self._get_ticket(target)
|
||||
|
||||
metadata = jsonutils.dumps({'source': self._name,
|
||||
'destination': target,
|
||||
'timestamp': time.time(),
|
||||
'nonce': _get_nonce(),
|
||||
'esek': ticket.esek,
|
||||
'encryption': self._encrypt})
|
||||
|
||||
message = json_msg
|
||||
if self._encrypt:
|
||||
message = self._crypto.encrypt(ticket.ekey, message)
|
||||
|
||||
signature = self._crypto.sign(ticket.skey,
|
||||
version + metadata + message)
|
||||
|
||||
return (metadata, message, signature)
|
||||
|
||||
def decode(self, version, metadata, message, signature):
|
||||
"""This is the main decoding function.
|
||||
|
||||
It takes a version, metadata, message and signature strings and
|
||||
returns a tuple with a (decrypted) message and metadata or raises
|
||||
an exception in case of error.
|
||||
|
||||
:param version: the current envelope version
|
||||
:param metadata: a JSON serialized object with metadata for validation
|
||||
:param message: a JSON serialized (base64 encoded encrypted) message
|
||||
:param signature: a base64 encoded signature
|
||||
"""
|
||||
md = jsonutils.loads(metadata)
|
||||
|
||||
check_args = ('source', 'destination', 'timestamp',
|
||||
'nonce', 'esek', 'encryption')
|
||||
for arg in check_args:
|
||||
if arg not in md:
|
||||
raise InvalidMetadata('Missing metadata "%s"' % arg)
|
||||
|
||||
if md['destination'] != self._name:
|
||||
# TODO(simo) handle group keys by checking target
|
||||
raise UnknownDestinationName(md['destination'])
|
||||
|
||||
try:
|
||||
skey, ekey = self._decode_esek(self._key,
|
||||
md['source'], md['destination'],
|
||||
md['timestamp'], md['esek'])
|
||||
except InvalidExpiredTicket:
|
||||
raise
|
||||
except Exception:
|
||||
raise InvalidMetadata('Failed to decode ESEK for %s/%s' % (
|
||||
md['source'], md['destination']))
|
||||
|
||||
sig = self._crypto.sign(skey, version + metadata + message)
|
||||
|
||||
if sig != signature:
|
||||
raise InvalidSignature(md['source'], md['destination'])
|
||||
|
||||
if md['encryption'] is True:
|
||||
msg = self._crypto.decrypt(ekey, message)
|
||||
else:
|
||||
msg = message
|
||||
|
||||
return (md, msg)
|
@ -32,10 +32,11 @@ class Service(service.Service):
|
||||
|
||||
A service enables rpc by listening to queues based on topic and host.
|
||||
"""
|
||||
def __init__(self, host, topic, manager=None):
|
||||
def __init__(self, host, topic, manager=None, serializer=None):
|
||||
super(Service, self).__init__()
|
||||
self.host = host
|
||||
self.topic = topic
|
||||
self.serializer = serializer
|
||||
if manager is None:
|
||||
self.manager = self
|
||||
else:
|
||||
@ -48,7 +49,8 @@ class Service(service.Service):
|
||||
LOG.debug(_("Creating Consumer connection for Service %s") %
|
||||
self.topic)
|
||||
|
||||
dispatcher = rpc_dispatcher.RpcDispatcher([self.manager])
|
||||
dispatcher = rpc_dispatcher.RpcDispatcher([self.manager],
|
||||
self.serializer)
|
||||
|
||||
# Share this same connection for these Consumers
|
||||
self.conn.create_consumer(self.topic, dispatcher, fanout=False)
|
||||
|
1
nova/openstack/common/rpc/zmq_receiver.py
Executable file → Normal file
1
nova/openstack/common/rpc/zmq_receiver.py
Executable file → Normal file
@ -1,4 +1,3 @@
|
||||
#!/usr/bin/env python
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2011 OpenStack Foundation
|
||||
|
Loading…
x
Reference in New Issue
Block a user