Latest common updates

Fixes bug 1132908

Change-Id: I726bc5b3199d2fa606832418db9b6e20ffbc3879
This commit is contained in:
Gary Kotton 2013-02-25 17:55:58 +00:00
parent bec2549657
commit 56e6378219
7 changed files with 154 additions and 53 deletions

View File

@ -51,12 +51,20 @@ def _print_greenthreads():
print
def _print_nativethreads():
for threadId, stack in sys._current_frames().items():
print threadId
traceback.print_stack(stack)
print
def initialize_if_enabled():
backdoor_locals = {
'exit': _dont_use_this, # So we don't exit the entire process
'quit': _dont_use_this, # So we don't exit the entire process
'fo': _find_objects,
'pgt': _print_greenthreads,
'pnt': _print_nativethreads,
}
if CONF.backdoor_port is None:

View File

@ -325,16 +325,11 @@ def _create_logging_excepthook(product_name):
def setup(product_name):
"""Setup logging."""
sys.excepthook = _create_logging_excepthook(product_name)
if CONF.log_config:
try:
logging.config.fileConfig(CONF.log_config)
except Exception:
traceback.print_exc()
raise
logging.config.fileConfig(CONF.log_config)
else:
_setup_logging_from_conf(product_name)
sys.excepthook = _create_logging_excepthook(product_name)
def set_defaults(logging_context_format_string):

View File

@ -25,6 +25,7 @@ Specifically, this includes impl_kombu and impl_qpid. impl_carrot also uses
AMQP, but is deprecated and predates this code.
"""
import collections
import inspect
import sys
import uuid
@ -54,6 +55,7 @@ amqp_opts = [
cfg.CONF.register_opts(amqp_opts)
UNIQUE_ID = '_unique_id'
LOG = logging.getLogger(__name__)
@ -236,6 +238,7 @@ def msg_reply(conf, msg_id, reply_q, connection_pool, reply=None,
'failure': failure}
if ending:
msg['ending'] = True
_add_unique_id(msg)
# If a reply_q exists, add the msg_id to the reply and pass the
# reply_q to direct_send() to use it as the response queue.
# Otherwise use the msg_id for backward compatibilty.
@ -302,6 +305,37 @@ def pack_context(msg, context):
msg.update(context_d)
class _MsgIdCache(object):
"""This class checks any duplicate messages."""
# NOTE: This value is considered can be a configuration item, but
# it is not necessary to change its value in most cases,
# so let this value as static for now.
DUP_MSG_CHECK_SIZE = 16
def __init__(self, **kwargs):
self.prev_msgids = collections.deque([],
maxlen=self.DUP_MSG_CHECK_SIZE)
def check_duplicate_message(self, message_data):
"""AMQP consumers may read same message twice when exceptions occur
before ack is returned. This method prevents doing it.
"""
if UNIQUE_ID in message_data:
msg_id = message_data[UNIQUE_ID]
if msg_id not in self.prev_msgids:
self.prev_msgids.append(msg_id)
else:
raise rpc_common.DuplicateMessageError(msg_id=msg_id)
def _add_unique_id(msg):
"""Add unique_id for checking duplicate messages."""
unique_id = uuid.uuid4().hex
msg.update({UNIQUE_ID: unique_id})
LOG.debug(_('UNIQUE_ID is %s.') % (unique_id))
class _ThreadPoolWithWait(object):
"""Base class for a delayed invocation manager used by
the Connection class to start up green threads
@ -349,6 +383,7 @@ class ProxyCallback(_ThreadPoolWithWait):
connection_pool=connection_pool,
)
self.proxy = proxy
self.msg_id_cache = _MsgIdCache()
def __call__(self, message_data):
"""Consumer callback to call a method on a proxy object.
@ -368,6 +403,7 @@ class ProxyCallback(_ThreadPoolWithWait):
if hasattr(local.store, 'context'):
del local.store.context
rpc_common._safe_log(LOG.debug, _('received %s'), message_data)
self.msg_id_cache.check_duplicate_message(message_data)
ctxt = unpack_context(self.conf, message_data)
method = message_data.get('method')
args = message_data.get('args', {})
@ -422,6 +458,7 @@ class MulticallProxyWaiter(object):
self._dataqueue = queue.LightQueue()
# Add this caller to the reply proxy's call_waiters
self._reply_proxy.add_call_waiter(self, self._msg_id)
self.msg_id_cache = _MsgIdCache()
def put(self, data):
self._dataqueue.put(data)
@ -435,6 +472,7 @@ class MulticallProxyWaiter(object):
def _process_data(self, data):
result = None
self.msg_id_cache.check_duplicate_message(data)
if data['failure']:
failure = data['failure']
result = rpc_common.deserialize_remote_exception(self._conf,
@ -479,6 +517,7 @@ class MulticallWaiter(object):
self._done = False
self._got_ending = False
self._conf = conf
self.msg_id_cache = _MsgIdCache()
def done(self):
if self._done:
@ -490,6 +529,7 @@ class MulticallWaiter(object):
def __call__(self, data):
"""The consume() callback will call this. Store the result."""
self.msg_id_cache.check_duplicate_message(data)
if data['failure']:
failure = data['failure']
self._result = rpc_common.deserialize_remote_exception(self._conf,
@ -542,6 +582,7 @@ def multicall(conf, context, topic, msg, timeout, connection_pool):
msg_id = uuid.uuid4().hex
msg.update({'_msg_id': msg_id})
LOG.debug(_('MSG_ID is %s') % (msg_id))
_add_unique_id(msg)
pack_context(msg, context)
# TODO(pekowski): Remove this flag and the code under the if clause
@ -575,6 +616,7 @@ def call(conf, context, topic, msg, timeout, connection_pool):
def cast(conf, context, topic, msg, connection_pool):
"""Sends a message on a topic without waiting for a response."""
LOG.debug(_('Making asynchronous cast on %s...'), topic)
_add_unique_id(msg)
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:
conn.topic_send(topic, rpc_common.serialize_msg(msg))
@ -583,6 +625,7 @@ def cast(conf, context, topic, msg, connection_pool):
def fanout_cast(conf, context, topic, msg, connection_pool):
"""Sends a message on a fanout exchange without waiting for a response."""
LOG.debug(_('Making asynchronous fanout cast...'))
_add_unique_id(msg)
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:
conn.fanout_send(topic, rpc_common.serialize_msg(msg))
@ -590,6 +633,7 @@ def fanout_cast(conf, context, topic, msg, connection_pool):
def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
"""Sends a message on a topic to a specific server."""
_add_unique_id(msg)
pack_context(msg, context)
with ConnectionContext(conf, connection_pool, pooled=False,
server_params=server_params) as conn:
@ -599,6 +643,7 @@ def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
def fanout_cast_to_server(conf, context, server_params, topic, msg,
connection_pool):
"""Sends a message on a fanout exchange to a specific server."""
_add_unique_id(msg)
pack_context(msg, context)
with ConnectionContext(conf, connection_pool, pooled=False,
server_params=server_params) as conn:
@ -610,6 +655,7 @@ def notify(conf, context, topic, msg, connection_pool, envelope):
LOG.debug(_('Sending %(event_type)s on %(topic)s'),
dict(event_type=msg.get('event_type'),
topic=topic))
_add_unique_id(msg)
pack_context(msg, context)
with ConnectionContext(conf, connection_pool) as conn:
if envelope:

View File

@ -49,8 +49,8 @@ deserialize_msg().
The current message format (version 2.0) is very simple. It is:
{
'quantum.version': <RPC Envelope Version as a String>,
'quantum.message': <Application Message Payload, JSON encoded>
'oslo.version': <RPC Envelope Version as a String>,
'oslo.message': <Application Message Payload, JSON encoded>
}
Message format version '1.0' is just considered to be the messages we sent
@ -66,8 +66,8 @@ to the messaging libraries as a dict.
'''
_RPC_ENVELOPE_VERSION = '2.0'
_VERSION_KEY = 'quantum.version'
_MESSAGE_KEY = 'quantum.message'
_VERSION_KEY = 'oslo.version'
_MESSAGE_KEY = 'oslo.message'
# TODO(russellb) Turn this on after Grizzly.
@ -125,6 +125,10 @@ class Timeout(RPCException):
message = _("Timeout while waiting on RPC response.")
class DuplicateMessageError(RPCException):
message = _("Found duplicate message(%(msg_id)s). Skipping it.")
class InvalidRPCConnectionReuse(RPCException):
message = _("Invalid reuse of an RPC connection.")

View File

@ -198,6 +198,7 @@ class DirectConsumer(ConsumerBase):
"""
# Default options
options = {'durable': False,
'queue_arguments': _get_queue_arguments(conf),
'auto_delete': True,
'exclusive': False}
options.update(kwargs)

View File

@ -216,12 +216,18 @@ class ZmqClient(object):
socket_type = zmq.PUSH
self.outq = ZmqSocket(addr, socket_type, bind=bind)
def cast(self, msg_id, topic, data, serialize=True, force_envelope=False):
def cast(self, msg_id, topic, data, envelope=False):
msg_id = msg_id or 0
if serialize:
data = rpc_common.serialize_msg(data, force_envelope)
self.outq.send(map(bytes, (msg_id, topic, 'cast', _serialize(data))))
if not (envelope or rpc_common._SEND_RPC_ENVELOPE):
self.outq.send(map(bytes,
(msg_id, topic, 'cast', _serialize(data))))
return
rpc_envelope = rpc_common.serialize_msg(data[1], envelope)
zmq_msg = reduce(lambda x, y: x + y, rpc_envelope.items())
self.outq.send(map(bytes,
(msg_id, topic, 'impl_zmq_v2', data[0]) + zmq_msg))
def close(self):
self.outq.close()
@ -320,7 +326,7 @@ class ConsumerBase(object):
else:
return [result]
def process(self, style, target, proxy, ctx, data):
def process(self, proxy, ctx, data):
data.setdefault('version', None)
data.setdefault('args', {})
@ -432,12 +438,14 @@ class ZmqProxy(ZmqBaseReactor):
#TODO(ewindisch): use zero-copy (i.e. references, not copying)
data = sock.recv()
msg_id, topic, style, in_msg = data
topic = topic.split('.', 1)[0]
topic = data[1]
LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data)))
if topic.startswith('fanout~') or topic.startswith('zmq_replies'):
if topic.startswith('fanout~'):
sock_type = zmq.PUB
topic = topic.split('.', 1)[0]
elif topic.startswith('zmq_replies'):
sock_type = zmq.PUB
else:
sock_type = zmq.PUSH
@ -520,6 +528,21 @@ class ZmqProxy(ZmqBaseReactor):
super(ZmqProxy, self).consume_in_thread()
def unflatten_envelope(packenv):
"""Unflattens the RPC envelope.
Takes a list and returns a dictionary.
i.e. [1,2,3,4] => {1: 2, 3: 4}
"""
i = iter(packenv)
h = {}
try:
while True:
k = i.next()
h[k] = i.next()
except StopIteration:
return h
class ZmqReactor(ZmqBaseReactor):
"""
A consumer class implementing a
@ -540,38 +563,50 @@ class ZmqReactor(ZmqBaseReactor):
self.mapping[sock].send(data)
return
msg_id, topic, style, in_msg = data
ctx, request = rpc_common.deserialize_msg(_deserialize(in_msg))
ctx = RpcContext.unmarshal(ctx)
proxy = self.proxies[sock]
self.pool.spawn_n(self.process, style, topic,
proxy, ctx, request)
if data[2] == 'cast': # Legacy protocol
packenv = data[3]
ctx, msg = _deserialize(packenv)
request = rpc_common.deserialize_msg(msg)
ctx = RpcContext.unmarshal(ctx)
elif data[2] == 'impl_zmq_v2':
packenv = data[4:]
msg = unflatten_envelope(packenv)
request = rpc_common.deserialize_msg(msg)
# Unmarshal only after verifying the message.
ctx = RpcContext.unmarshal(data[3])
else:
LOG.error(_("ZMQ Envelope version unsupported or unknown."))
return
self.pool.spawn_n(self.process, proxy, ctx, request)
class Connection(rpc_common.Connection):
"""Manages connections and threads."""
def __init__(self, conf):
self.topics = []
self.reactor = ZmqReactor(conf)
def create_consumer(self, topic, proxy, fanout=False):
# Only consume on the base topic name.
topic = topic.split('.', 1)[0]
LOG.info(_("Create Consumer for topic (%(topic)s)") %
{'topic': topic})
# Subscription scenarios
if fanout:
subscribe = ('', fanout)[type(fanout) == str]
sock_type = zmq.SUB
topic = 'fanout~' + topic
subscribe = ('', fanout)[type(fanout) == str]
topic = 'fanout~' + topic.split('.', 1)[0]
else:
sock_type = zmq.PULL
subscribe = None
topic = '.'.join((topic.split('.', 1)[0], CONF.rpc_zmq_host))
if topic in self.topics:
LOG.info(_("Skipping topic registration. Already registered."))
return
# Receive messages from (local) proxy
inaddr = "ipc://%s/zmq_topic_%s" % \
@ -582,9 +617,11 @@ class Connection(rpc_common.Connection):
self.reactor.register(proxy, inaddr, sock_type,
subscribe=subscribe, in_bind=False)
self.topics.append(topic)
def close(self):
self.reactor.close()
self.topics = []
def wait(self):
self.reactor.wait()
@ -593,8 +630,8 @@ class Connection(rpc_common.Connection):
self.reactor.consume_in_thread()
def _cast(addr, context, topic, msg, timeout=None, serialize=True,
force_envelope=False, _msg_id=None):
def _cast(addr, context, topic, msg, timeout=None, envelope=False,
_msg_id=None):
timeout_cast = timeout or CONF.rpc_cast_timeout
payload = [RpcContext.marshal(context), msg]
@ -603,7 +640,7 @@ def _cast(addr, context, topic, msg, timeout=None, serialize=True,
conn = ZmqClient(addr)
# assumes cast can't return an exception
conn.cast(_msg_id, topic, payload, serialize, force_envelope)
conn.cast(_msg_id, topic, payload, envelope)
except zmq.ZMQError:
raise RPCException("Cast failed. ZMQ Socket Exception")
finally:
@ -612,7 +649,7 @@ def _cast(addr, context, topic, msg, timeout=None, serialize=True,
def _call(addr, context, topic, msg, timeout=None,
serialize=True, force_envelope=False):
envelope=False):
# timeout_response is how long we wait for a response
timeout = timeout or CONF.rpc_response_timeout
@ -642,20 +679,31 @@ def _call(addr, context, topic, msg, timeout=None,
with Timeout(timeout, exception=rpc_common.Timeout):
try:
msg_waiter = ZmqSocket(
"ipc://%s/zmq_topic_zmq_replies" % CONF.rpc_zmq_ipc_dir,
"ipc://%s/zmq_topic_zmq_replies.%s" %
(CONF.rpc_zmq_ipc_dir,
CONF.rpc_zmq_host),
zmq.SUB, subscribe=msg_id, bind=False
)
LOG.debug(_("Sending cast"))
_cast(addr, context, topic, payload,
serialize=serialize, force_envelope=force_envelope)
_cast(addr, context, topic, payload, envelope)
LOG.debug(_("Cast sent; Waiting reply"))
# Blocks until receives reply
msg = msg_waiter.recv()
LOG.debug(_("Received message: %s"), msg)
LOG.debug(_("Unpacking response"))
responses = _deserialize(msg[-1])[-1]['args']['response']
if msg[2] == 'cast': # Legacy version
raw_msg = _deserialize(msg[-1])[-1]
elif msg[2] == 'impl_zmq_v2':
rpc_envelope = unflatten_envelope(msg[4:])
raw_msg = rpc_common.deserialize_msg(rpc_envelope)
else:
raise rpc_common.UnsupportedRpcEnvelopeVersion(
_("Unsupported or unknown ZMQ envelope returned."))
responses = raw_msg['args']['response']
# ZMQError trumps the Timeout error.
except zmq.ZMQError:
raise RPCException("ZMQ Socket Error")
@ -676,8 +724,8 @@ def _call(addr, context, topic, msg, timeout=None,
return responses[-1]
def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
force_envelope=False, _msg_id=None):
def _multi_send(method, context, topic, msg, timeout=None,
envelope=False, _msg_id=None):
"""
Wraps the sending of messages,
dispatches to the matchmaker and sends
@ -703,11 +751,11 @@ def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
if method.__name__ == '_cast':
eventlet.spawn_n(method, _addr, context,
_topic, msg, timeout, serialize,
force_envelope, _msg_id)
_topic, msg, timeout, envelope,
_msg_id)
return
return method(_addr, context, _topic, msg, timeout,
serialize, force_envelope)
envelope)
def create_connection(conf, new=True):
@ -746,8 +794,7 @@ def notify(conf, context, topic, msg, **kwargs):
# NOTE(ewindisch): dot-priority in rpc notifier does not
# work with our assumptions.
topic.replace('.', '-')
kwargs['serialize'] = kwargs.pop('envelope')
kwargs['force_envelope'] = True
kwargs['envelope'] = kwargs.get('envelope', True)
cast(conf, context, topic, msg, **kwargs)

View File

@ -117,9 +117,9 @@ def _run_shell_command(cmd, throw_on_error=False):
output = subprocess.Popen(["/bin/sh", "-c", cmd],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
out = output.communicate()
if output.returncode and throw_on_error:
raise Exception("%s returned %d" % cmd, output.returncode)
out = output.communicate()
if len(out) == 0:
return None
if len(out[0].strip()) == 0:
@ -131,7 +131,7 @@ def write_git_changelog():
"""Write a changelog based on the git changelog."""
new_changelog = 'ChangeLog'
if not os.getenv('SKIP_WRITE_GIT_CHANGELOG'):
if os.path.isdir('.git'):
if os.path.exists('.git'):
git_log_cmd = 'git log --stat'
changelog = _run_shell_command(git_log_cmd)
mailmap = parse_mailmap()
@ -147,7 +147,7 @@ def generate_authors():
old_authors = 'AUTHORS.in'
new_authors = 'AUTHORS'
if not os.getenv('SKIP_GENERATE_AUTHORS'):
if os.path.isdir('.git'):
if os.path.exists('.git'):
# don't include jenkins email address in AUTHORS file
git_log_cmd = ("git log --format='%aN <%aE>' | sort -u | "
"egrep -v '" + jenkins_email + "'")
@ -279,7 +279,7 @@ def _get_version_from_git(pre_version):
revision if there is one, or tag plus number of additional revisions
if the current revision has no tag."""
if os.path.isdir('.git'):
if os.path.exists('.git'):
if pre_version:
try:
return _run_shell_command(