From 56e63782197d8b2512456e5a9ec6e96c7b6033dc Mon Sep 17 00:00:00 2001 From: Gary Kotton Date: Mon, 25 Feb 2013 17:55:58 +0000 Subject: [PATCH] Latest common updates Fixes bug 1132908 Change-Id: I726bc5b3199d2fa606832418db9b6e20ffbc3879 --- quantum/openstack/common/eventlet_backdoor.py | 8 ++ quantum/openstack/common/log.py | 9 +- quantum/openstack/common/rpc/amqp.py | 46 +++++++ quantum/openstack/common/rpc/common.py | 12 +- quantum/openstack/common/rpc/impl_kombu.py | 1 + quantum/openstack/common/rpc/impl_zmq.py | 123 ++++++++++++------ quantum/openstack/common/setup.py | 8 +- 7 files changed, 154 insertions(+), 53 deletions(-) diff --git a/quantum/openstack/common/eventlet_backdoor.py b/quantum/openstack/common/eventlet_backdoor.py index 61ceded43..8b81ebf8e 100644 --- a/quantum/openstack/common/eventlet_backdoor.py +++ b/quantum/openstack/common/eventlet_backdoor.py @@ -51,12 +51,20 @@ def _print_greenthreads(): print +def _print_nativethreads(): + for threadId, stack in sys._current_frames().items(): + print threadId + traceback.print_stack(stack) + print + + def initialize_if_enabled(): backdoor_locals = { 'exit': _dont_use_this, # So we don't exit the entire process 'quit': _dont_use_this, # So we don't exit the entire process 'fo': _find_objects, 'pgt': _print_greenthreads, + 'pnt': _print_nativethreads, } if CONF.backdoor_port is None: diff --git a/quantum/openstack/common/log.py b/quantum/openstack/common/log.py index ef7e27e8a..37245d4da 100644 --- a/quantum/openstack/common/log.py +++ b/quantum/openstack/common/log.py @@ -325,16 +325,11 @@ def _create_logging_excepthook(product_name): def setup(product_name): """Setup logging.""" - sys.excepthook = _create_logging_excepthook(product_name) - if CONF.log_config: - try: - logging.config.fileConfig(CONF.log_config) - except Exception: - traceback.print_exc() - raise + logging.config.fileConfig(CONF.log_config) else: _setup_logging_from_conf(product_name) + sys.excepthook = _create_logging_excepthook(product_name) def set_defaults(logging_context_format_string): diff --git a/quantum/openstack/common/rpc/amqp.py b/quantum/openstack/common/rpc/amqp.py index 4b7b99dc2..a6d5002d5 100644 --- a/quantum/openstack/common/rpc/amqp.py +++ b/quantum/openstack/common/rpc/amqp.py @@ -25,6 +25,7 @@ Specifically, this includes impl_kombu and impl_qpid. impl_carrot also uses AMQP, but is deprecated and predates this code. """ +import collections import inspect import sys import uuid @@ -54,6 +55,7 @@ amqp_opts = [ cfg.CONF.register_opts(amqp_opts) +UNIQUE_ID = '_unique_id' LOG = logging.getLogger(__name__) @@ -236,6 +238,7 @@ def msg_reply(conf, msg_id, reply_q, connection_pool, reply=None, 'failure': failure} if ending: msg['ending'] = True + _add_unique_id(msg) # If a reply_q exists, add the msg_id to the reply and pass the # reply_q to direct_send() to use it as the response queue. # Otherwise use the msg_id for backward compatibilty. @@ -302,6 +305,37 @@ def pack_context(msg, context): msg.update(context_d) +class _MsgIdCache(object): + """This class checks any duplicate messages.""" + + # NOTE: This value is considered can be a configuration item, but + # it is not necessary to change its value in most cases, + # so let this value as static for now. + DUP_MSG_CHECK_SIZE = 16 + + def __init__(self, **kwargs): + self.prev_msgids = collections.deque([], + maxlen=self.DUP_MSG_CHECK_SIZE) + + def check_duplicate_message(self, message_data): + """AMQP consumers may read same message twice when exceptions occur + before ack is returned. This method prevents doing it. + """ + if UNIQUE_ID in message_data: + msg_id = message_data[UNIQUE_ID] + if msg_id not in self.prev_msgids: + self.prev_msgids.append(msg_id) + else: + raise rpc_common.DuplicateMessageError(msg_id=msg_id) + + +def _add_unique_id(msg): + """Add unique_id for checking duplicate messages.""" + unique_id = uuid.uuid4().hex + msg.update({UNIQUE_ID: unique_id}) + LOG.debug(_('UNIQUE_ID is %s.') % (unique_id)) + + class _ThreadPoolWithWait(object): """Base class for a delayed invocation manager used by the Connection class to start up green threads @@ -349,6 +383,7 @@ class ProxyCallback(_ThreadPoolWithWait): connection_pool=connection_pool, ) self.proxy = proxy + self.msg_id_cache = _MsgIdCache() def __call__(self, message_data): """Consumer callback to call a method on a proxy object. @@ -368,6 +403,7 @@ class ProxyCallback(_ThreadPoolWithWait): if hasattr(local.store, 'context'): del local.store.context rpc_common._safe_log(LOG.debug, _('received %s'), message_data) + self.msg_id_cache.check_duplicate_message(message_data) ctxt = unpack_context(self.conf, message_data) method = message_data.get('method') args = message_data.get('args', {}) @@ -422,6 +458,7 @@ class MulticallProxyWaiter(object): self._dataqueue = queue.LightQueue() # Add this caller to the reply proxy's call_waiters self._reply_proxy.add_call_waiter(self, self._msg_id) + self.msg_id_cache = _MsgIdCache() def put(self, data): self._dataqueue.put(data) @@ -435,6 +472,7 @@ class MulticallProxyWaiter(object): def _process_data(self, data): result = None + self.msg_id_cache.check_duplicate_message(data) if data['failure']: failure = data['failure'] result = rpc_common.deserialize_remote_exception(self._conf, @@ -479,6 +517,7 @@ class MulticallWaiter(object): self._done = False self._got_ending = False self._conf = conf + self.msg_id_cache = _MsgIdCache() def done(self): if self._done: @@ -490,6 +529,7 @@ class MulticallWaiter(object): def __call__(self, data): """The consume() callback will call this. Store the result.""" + self.msg_id_cache.check_duplicate_message(data) if data['failure']: failure = data['failure'] self._result = rpc_common.deserialize_remote_exception(self._conf, @@ -542,6 +582,7 @@ def multicall(conf, context, topic, msg, timeout, connection_pool): msg_id = uuid.uuid4().hex msg.update({'_msg_id': msg_id}) LOG.debug(_('MSG_ID is %s') % (msg_id)) + _add_unique_id(msg) pack_context(msg, context) # TODO(pekowski): Remove this flag and the code under the if clause @@ -575,6 +616,7 @@ def call(conf, context, topic, msg, timeout, connection_pool): def cast(conf, context, topic, msg, connection_pool): """Sends a message on a topic without waiting for a response.""" LOG.debug(_('Making asynchronous cast on %s...'), topic) + _add_unique_id(msg) pack_context(msg, context) with ConnectionContext(conf, connection_pool) as conn: conn.topic_send(topic, rpc_common.serialize_msg(msg)) @@ -583,6 +625,7 @@ def cast(conf, context, topic, msg, connection_pool): def fanout_cast(conf, context, topic, msg, connection_pool): """Sends a message on a fanout exchange without waiting for a response.""" LOG.debug(_('Making asynchronous fanout cast...')) + _add_unique_id(msg) pack_context(msg, context) with ConnectionContext(conf, connection_pool) as conn: conn.fanout_send(topic, rpc_common.serialize_msg(msg)) @@ -590,6 +633,7 @@ def fanout_cast(conf, context, topic, msg, connection_pool): def cast_to_server(conf, context, server_params, topic, msg, connection_pool): """Sends a message on a topic to a specific server.""" + _add_unique_id(msg) pack_context(msg, context) with ConnectionContext(conf, connection_pool, pooled=False, server_params=server_params) as conn: @@ -599,6 +643,7 @@ def cast_to_server(conf, context, server_params, topic, msg, connection_pool): def fanout_cast_to_server(conf, context, server_params, topic, msg, connection_pool): """Sends a message on a fanout exchange to a specific server.""" + _add_unique_id(msg) pack_context(msg, context) with ConnectionContext(conf, connection_pool, pooled=False, server_params=server_params) as conn: @@ -610,6 +655,7 @@ def notify(conf, context, topic, msg, connection_pool, envelope): LOG.debug(_('Sending %(event_type)s on %(topic)s'), dict(event_type=msg.get('event_type'), topic=topic)) + _add_unique_id(msg) pack_context(msg, context) with ConnectionContext(conf, connection_pool) as conn: if envelope: diff --git a/quantum/openstack/common/rpc/common.py b/quantum/openstack/common/rpc/common.py index 4421e7d95..67a9ebc3b 100644 --- a/quantum/openstack/common/rpc/common.py +++ b/quantum/openstack/common/rpc/common.py @@ -49,8 +49,8 @@ deserialize_msg(). The current message format (version 2.0) is very simple. It is: { - 'quantum.version': , - 'quantum.message': + 'oslo.version': , + 'oslo.message': } Message format version '1.0' is just considered to be the messages we sent @@ -66,8 +66,8 @@ to the messaging libraries as a dict. ''' _RPC_ENVELOPE_VERSION = '2.0' -_VERSION_KEY = 'quantum.version' -_MESSAGE_KEY = 'quantum.message' +_VERSION_KEY = 'oslo.version' +_MESSAGE_KEY = 'oslo.message' # TODO(russellb) Turn this on after Grizzly. @@ -125,6 +125,10 @@ class Timeout(RPCException): message = _("Timeout while waiting on RPC response.") +class DuplicateMessageError(RPCException): + message = _("Found duplicate message(%(msg_id)s). Skipping it.") + + class InvalidRPCConnectionReuse(RPCException): message = _("Invalid reuse of an RPC connection.") diff --git a/quantum/openstack/common/rpc/impl_kombu.py b/quantum/openstack/common/rpc/impl_kombu.py index dd4d47e21..2a7f2dd8b 100644 --- a/quantum/openstack/common/rpc/impl_kombu.py +++ b/quantum/openstack/common/rpc/impl_kombu.py @@ -198,6 +198,7 @@ class DirectConsumer(ConsumerBase): """ # Default options options = {'durable': False, + 'queue_arguments': _get_queue_arguments(conf), 'auto_delete': True, 'exclusive': False} options.update(kwargs) diff --git a/quantum/openstack/common/rpc/impl_zmq.py b/quantum/openstack/common/rpc/impl_zmq.py index caa89c88e..421457a42 100644 --- a/quantum/openstack/common/rpc/impl_zmq.py +++ b/quantum/openstack/common/rpc/impl_zmq.py @@ -216,12 +216,18 @@ class ZmqClient(object): socket_type = zmq.PUSH self.outq = ZmqSocket(addr, socket_type, bind=bind) - def cast(self, msg_id, topic, data, serialize=True, force_envelope=False): + def cast(self, msg_id, topic, data, envelope=False): msg_id = msg_id or 0 - if serialize: - data = rpc_common.serialize_msg(data, force_envelope) - self.outq.send(map(bytes, (msg_id, topic, 'cast', _serialize(data)))) + if not (envelope or rpc_common._SEND_RPC_ENVELOPE): + self.outq.send(map(bytes, + (msg_id, topic, 'cast', _serialize(data)))) + return + + rpc_envelope = rpc_common.serialize_msg(data[1], envelope) + zmq_msg = reduce(lambda x, y: x + y, rpc_envelope.items()) + self.outq.send(map(bytes, + (msg_id, topic, 'impl_zmq_v2', data[0]) + zmq_msg)) def close(self): self.outq.close() @@ -320,7 +326,7 @@ class ConsumerBase(object): else: return [result] - def process(self, style, target, proxy, ctx, data): + def process(self, proxy, ctx, data): data.setdefault('version', None) data.setdefault('args', {}) @@ -432,12 +438,14 @@ class ZmqProxy(ZmqBaseReactor): #TODO(ewindisch): use zero-copy (i.e. references, not copying) data = sock.recv() - msg_id, topic, style, in_msg = data - topic = topic.split('.', 1)[0] + topic = data[1] LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data))) - if topic.startswith('fanout~') or topic.startswith('zmq_replies'): + if topic.startswith('fanout~'): + sock_type = zmq.PUB + topic = topic.split('.', 1)[0] + elif topic.startswith('zmq_replies'): sock_type = zmq.PUB else: sock_type = zmq.PUSH @@ -520,6 +528,21 @@ class ZmqProxy(ZmqBaseReactor): super(ZmqProxy, self).consume_in_thread() +def unflatten_envelope(packenv): + """Unflattens the RPC envelope. + Takes a list and returns a dictionary. + i.e. [1,2,3,4] => {1: 2, 3: 4} + """ + i = iter(packenv) + h = {} + try: + while True: + k = i.next() + h[k] = i.next() + except StopIteration: + return h + + class ZmqReactor(ZmqBaseReactor): """ A consumer class implementing a @@ -540,38 +563,50 @@ class ZmqReactor(ZmqBaseReactor): self.mapping[sock].send(data) return - msg_id, topic, style, in_msg = data - - ctx, request = rpc_common.deserialize_msg(_deserialize(in_msg)) - ctx = RpcContext.unmarshal(ctx) - proxy = self.proxies[sock] - self.pool.spawn_n(self.process, style, topic, - proxy, ctx, request) + if data[2] == 'cast': # Legacy protocol + packenv = data[3] + + ctx, msg = _deserialize(packenv) + request = rpc_common.deserialize_msg(msg) + ctx = RpcContext.unmarshal(ctx) + elif data[2] == 'impl_zmq_v2': + packenv = data[4:] + + msg = unflatten_envelope(packenv) + request = rpc_common.deserialize_msg(msg) + + # Unmarshal only after verifying the message. + ctx = RpcContext.unmarshal(data[3]) + else: + LOG.error(_("ZMQ Envelope version unsupported or unknown.")) + return + + self.pool.spawn_n(self.process, proxy, ctx, request) class Connection(rpc_common.Connection): """Manages connections and threads.""" def __init__(self, conf): + self.topics = [] self.reactor = ZmqReactor(conf) def create_consumer(self, topic, proxy, fanout=False): - # Only consume on the base topic name. - topic = topic.split('.', 1)[0] - - LOG.info(_("Create Consumer for topic (%(topic)s)") % - {'topic': topic}) - # Subscription scenarios if fanout: - subscribe = ('', fanout)[type(fanout) == str] sock_type = zmq.SUB - topic = 'fanout~' + topic + subscribe = ('', fanout)[type(fanout) == str] + topic = 'fanout~' + topic.split('.', 1)[0] else: sock_type = zmq.PULL subscribe = None + topic = '.'.join((topic.split('.', 1)[0], CONF.rpc_zmq_host)) + + if topic in self.topics: + LOG.info(_("Skipping topic registration. Already registered.")) + return # Receive messages from (local) proxy inaddr = "ipc://%s/zmq_topic_%s" % \ @@ -582,9 +617,11 @@ class Connection(rpc_common.Connection): self.reactor.register(proxy, inaddr, sock_type, subscribe=subscribe, in_bind=False) + self.topics.append(topic) def close(self): self.reactor.close() + self.topics = [] def wait(self): self.reactor.wait() @@ -593,8 +630,8 @@ class Connection(rpc_common.Connection): self.reactor.consume_in_thread() -def _cast(addr, context, topic, msg, timeout=None, serialize=True, - force_envelope=False, _msg_id=None): +def _cast(addr, context, topic, msg, timeout=None, envelope=False, + _msg_id=None): timeout_cast = timeout or CONF.rpc_cast_timeout payload = [RpcContext.marshal(context), msg] @@ -603,7 +640,7 @@ def _cast(addr, context, topic, msg, timeout=None, serialize=True, conn = ZmqClient(addr) # assumes cast can't return an exception - conn.cast(_msg_id, topic, payload, serialize, force_envelope) + conn.cast(_msg_id, topic, payload, envelope) except zmq.ZMQError: raise RPCException("Cast failed. ZMQ Socket Exception") finally: @@ -612,7 +649,7 @@ def _cast(addr, context, topic, msg, timeout=None, serialize=True, def _call(addr, context, topic, msg, timeout=None, - serialize=True, force_envelope=False): + envelope=False): # timeout_response is how long we wait for a response timeout = timeout or CONF.rpc_response_timeout @@ -642,20 +679,31 @@ def _call(addr, context, topic, msg, timeout=None, with Timeout(timeout, exception=rpc_common.Timeout): try: msg_waiter = ZmqSocket( - "ipc://%s/zmq_topic_zmq_replies" % CONF.rpc_zmq_ipc_dir, + "ipc://%s/zmq_topic_zmq_replies.%s" % + (CONF.rpc_zmq_ipc_dir, + CONF.rpc_zmq_host), zmq.SUB, subscribe=msg_id, bind=False ) LOG.debug(_("Sending cast")) - _cast(addr, context, topic, payload, - serialize=serialize, force_envelope=force_envelope) + _cast(addr, context, topic, payload, envelope) LOG.debug(_("Cast sent; Waiting reply")) # Blocks until receives reply msg = msg_waiter.recv() LOG.debug(_("Received message: %s"), msg) LOG.debug(_("Unpacking response")) - responses = _deserialize(msg[-1])[-1]['args']['response'] + + if msg[2] == 'cast': # Legacy version + raw_msg = _deserialize(msg[-1])[-1] + elif msg[2] == 'impl_zmq_v2': + rpc_envelope = unflatten_envelope(msg[4:]) + raw_msg = rpc_common.deserialize_msg(rpc_envelope) + else: + raise rpc_common.UnsupportedRpcEnvelopeVersion( + _("Unsupported or unknown ZMQ envelope returned.")) + + responses = raw_msg['args']['response'] # ZMQError trumps the Timeout error. except zmq.ZMQError: raise RPCException("ZMQ Socket Error") @@ -676,8 +724,8 @@ def _call(addr, context, topic, msg, timeout=None, return responses[-1] -def _multi_send(method, context, topic, msg, timeout=None, serialize=True, - force_envelope=False, _msg_id=None): +def _multi_send(method, context, topic, msg, timeout=None, + envelope=False, _msg_id=None): """ Wraps the sending of messages, dispatches to the matchmaker and sends @@ -703,11 +751,11 @@ def _multi_send(method, context, topic, msg, timeout=None, serialize=True, if method.__name__ == '_cast': eventlet.spawn_n(method, _addr, context, - _topic, msg, timeout, serialize, - force_envelope, _msg_id) + _topic, msg, timeout, envelope, + _msg_id) return return method(_addr, context, _topic, msg, timeout, - serialize, force_envelope) + envelope) def create_connection(conf, new=True): @@ -746,8 +794,7 @@ def notify(conf, context, topic, msg, **kwargs): # NOTE(ewindisch): dot-priority in rpc notifier does not # work with our assumptions. topic.replace('.', '-') - kwargs['serialize'] = kwargs.pop('envelope') - kwargs['force_envelope'] = True + kwargs['envelope'] = kwargs.get('envelope', True) cast(conf, context, topic, msg, **kwargs) diff --git a/quantum/openstack/common/setup.py b/quantum/openstack/common/setup.py index 35680b304..22f864d50 100644 --- a/quantum/openstack/common/setup.py +++ b/quantum/openstack/common/setup.py @@ -117,9 +117,9 @@ def _run_shell_command(cmd, throw_on_error=False): output = subprocess.Popen(["/bin/sh", "-c", cmd], stdout=subprocess.PIPE, stderr=subprocess.PIPE) + out = output.communicate() if output.returncode and throw_on_error: raise Exception("%s returned %d" % cmd, output.returncode) - out = output.communicate() if len(out) == 0: return None if len(out[0].strip()) == 0: @@ -131,7 +131,7 @@ def write_git_changelog(): """Write a changelog based on the git changelog.""" new_changelog = 'ChangeLog' if not os.getenv('SKIP_WRITE_GIT_CHANGELOG'): - if os.path.isdir('.git'): + if os.path.exists('.git'): git_log_cmd = 'git log --stat' changelog = _run_shell_command(git_log_cmd) mailmap = parse_mailmap() @@ -147,7 +147,7 @@ def generate_authors(): old_authors = 'AUTHORS.in' new_authors = 'AUTHORS' if not os.getenv('SKIP_GENERATE_AUTHORS'): - if os.path.isdir('.git'): + if os.path.exists('.git'): # don't include jenkins email address in AUTHORS file git_log_cmd = ("git log --format='%aN <%aE>' | sort -u | " "egrep -v '" + jenkins_email + "'") @@ -279,7 +279,7 @@ def _get_version_from_git(pre_version): revision if there is one, or tag plus number of additional revisions if the current revision has no tag.""" - if os.path.isdir('.git'): + if os.path.exists('.git'): if pre_version: try: return _run_shell_command(