From 02e5b6ccfb301624a04ced0c4361788ab38ccfe4 Mon Sep 17 00:00:00 2001 From: Michael Still Date: Thu, 18 Oct 2012 11:43:37 -0700 Subject: [PATCH] Update common. policy.py and setup.py are not updated as they cause test failures. I will investigate them separately. Change-Id: Ie46626e67ce5c32baf0eda9f63afb33611d5015a --- quantum/openstack/common/log.py | 9 +- quantum/openstack/common/rpc/impl_zmq.py | 96 +++++----------------- quantum/openstack/common/rpc/matchmaker.py | 31 +------ quantum/openstack/common/rpc/service.py | 9 +- quantum/openstack/common/timeutils.py | 21 +++-- 5 files changed, 45 insertions(+), 121 deletions(-) diff --git a/quantum/openstack/common/log.py b/quantum/openstack/common/log.py index 00340384f5..be29bf8ad9 100644 --- a/quantum/openstack/common/log.py +++ b/quantum/openstack/common/log.py @@ -54,15 +54,14 @@ log_opts = [ '%(message)s', help='format string to use for log messages with context'), cfg.StrOpt('logging_default_format_string', - default='%(asctime)s %(levelname)s %(name)s [-] %(instance)s' - '%(message)s', + default='%(asctime)s %(process)d %(levelname)s %(name)s [-]' + ' %(instance)s%(message)s', help='format string to use for log messages without context'), cfg.StrOpt('logging_debug_format_suffix', - default='from (pid=%(process)d) %(funcName)s ' - '%(pathname)s:%(lineno)d', + default='%(funcName)s %(pathname)s:%(lineno)d', help='data to append to log format when level is DEBUG'), cfg.StrOpt('logging_exception_prefix', - default='%(asctime)s TRACE %(name)s %(instance)s', + default='%(asctime)s %(process)d TRACE %(name)s %(instance)s', help='prefix each line of exception output with this format'), cfg.ListOpt('default_log_levels', default=[ diff --git a/quantum/openstack/common/rpc/impl_zmq.py b/quantum/openstack/common/rpc/impl_zmq.py index bee1487fc2..f4fcce0389 100644 --- a/quantum/openstack/common/rpc/impl_zmq.py +++ b/quantum/openstack/common/rpc/impl_zmq.py @@ -58,9 +58,6 @@ zmq_opts = [ cfg.IntOpt('rpc_zmq_port', default=9501, help='ZeroMQ receiver listening port'), - cfg.IntOpt('rpc_zmq_port_pub', default=9502, - help='ZeroMQ fanout publisher port'), - cfg.IntOpt('rpc_zmq_contexts', default=1, help='Number of ZeroMQ contexts, defaults to 1'), @@ -209,7 +206,7 @@ class ZmqClient(object): self.outq = ZmqSocket(addr, socket_type, bind=bind) def cast(self, msg_id, topic, data): - self.outq.send([str(topic), str(msg_id), str('cast'), + self.outq.send([str(msg_id), str(topic), str('cast'), _serialize(data)]) def close(self): @@ -302,9 +299,6 @@ class ConsumerBase(object): else: return [result] - def consume(self, sock): - raise NotImplementedError() - def process(self, style, target, proxy, ctx, data): # Method starting with - are # processed internally. (non-valid method name) @@ -417,17 +411,12 @@ class ZmqProxy(ZmqBaseReactor): zmq.PUB, bind=True) self.sockets.append(self.topic_proxy['zmq_replies']) - self.topic_proxy['fanout~'] = \ - ZmqSocket("tcp://%s:%s" % (CONF.rpc_zmq_bind_address, - CONF.rpc_zmq_port_pub), zmq.PUB, bind=True) - self.sockets.append(self.topic_proxy['fanout~']) - def consume(self, sock): ipc_dir = CONF.rpc_zmq_ipc_dir #TODO(ewindisch): use zero-copy (i.e. references, not copying) data = sock.recv() - topic, msg_id, style, in_msg = data + msg_id, topic, style, in_msg = data topic = topic.split('.', 1)[0] LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data))) @@ -435,11 +424,6 @@ class ZmqProxy(ZmqBaseReactor): # Handle zmq_replies magic if topic.startswith('fanout~'): sock_type = zmq.PUB - - # This doesn't change what is in the message, - # it only specifies that these messages go to - # the generic fanout topic. - topic = 'fanout~' elif topic.startswith('zmq_replies'): sock_type = zmq.PUB inside = _deserialize(in_msg) @@ -450,32 +434,23 @@ class ZmqProxy(ZmqBaseReactor): else: sock_type = zmq.PUSH - if not topic in self.topic_proxy: - outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic), - sock_type, bind=True) - self.topic_proxy[topic] = outq - self.sockets.append(outq) - LOG.info(_("Created topic proxy: %s"), topic) + if not topic in self.topic_proxy: + outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic), + sock_type, bind=True) + self.topic_proxy[topic] = outq + self.sockets.append(outq) + LOG.info(_("Created topic proxy: %s"), topic) + + # It takes some time for a pub socket to open, + # before we can have any faith in doing a send() to it. + if sock_type == zmq.PUB: + eventlet.sleep(.5) LOG.debug(_("ROUTER RELAY-OUT START %(data)s") % {'data': data}) self.topic_proxy[topic].send(data) LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") % {'data': data}) -class CallbackReactor(ZmqBaseReactor): - """ - A consumer class passing messages to a callback - """ - - def __init__(self, conf, callback): - self._cb = callback - super(CallbackReactor, self).__init__(conf) - - def consume(self, sock): - data = sock.recv() - self._cb(data[3]) - - class ZmqReactor(ZmqBaseReactor): """ A consumer class implementing a @@ -496,7 +471,7 @@ class ZmqReactor(ZmqBaseReactor): self.mapping[sock].send(data) return - topic, msg_id, style, in_msg = data + msg_id, topic, style, in_msg = data ctx, request = _deserialize(in_msg) ctx = RpcContext.unmarshal(ctx) @@ -513,26 +488,6 @@ class Connection(rpc_common.Connection): def __init__(self, conf): self.reactor = ZmqReactor(conf) - def _consume_fanout(self, reactor, topic, proxy, bind=False): - for topic, host in matchmaker.queues("publishers~%s" % (topic, )): - inaddr = "tcp://%s:%s" % (host, CONF.rpc_zmq_port) - reactor.register(proxy, inaddr, zmq.SUB, in_bind=bind) - - def declare_topic_consumer(self, topic, callback=None, - queue_name=None): - """declare_topic_consumer is a private method, but - it is being used by Quantum (Folsom). - This has been added compatibility. - """ - # Only consume on the base topic name. - topic = topic.split('.', 1)[0] - - if CONF.rpc_zmq_host in matchmaker.queues("fanout~%s" % (topic, )): - return - - reactor = CallbackReactor(CONF, callback) - self._consume_fanout(reactor, topic, None, bind=False) - def create_consumer(self, topic, proxy, fanout=False): # Only consume on the base topic name. topic = topic.split('.', 1)[0] @@ -540,35 +495,22 @@ class Connection(rpc_common.Connection): LOG.info(_("Create Consumer for topic (%(topic)s)") % {'topic': topic}) - # Consume direct-push fanout messages (relay to local consumers) + # Subscription scenarios if fanout: - # If we're not in here, we can't receive direct fanout messages - if CONF.rpc_zmq_host in matchmaker.queues(topic): - # Consume from all remote publishers. - self._consume_fanout(self.reactor, topic, proxy) - else: - LOG.warn("This service cannot receive direct PUSH fanout " - "messages without being known by the matchmaker.") - return - - # Configure consumer for direct pushes. - subscribe = (topic, fanout)[type(fanout) == str] + subscribe = ('', fanout)[type(fanout) == str] sock_type = zmq.SUB topic = 'fanout~' + topic - - inaddr = "tcp://127.0.0.1:%s" % (CONF.rpc_zmq_port_pub, ) else: sock_type = zmq.PULL subscribe = None - # Receive messages from (local) proxy - inaddr = "ipc://%s/zmq_topic_%s" % \ - (CONF.rpc_zmq_ipc_dir, topic) + # Receive messages from (local) proxy + inaddr = "ipc://%s/zmq_topic_%s" % \ + (CONF.rpc_zmq_ipc_dir, topic) LOG.debug(_("Consumer is a zmq.%s"), ['PULL', 'SUB'][sock_type == zmq.SUB]) - # Consume messages from local rpc-zmq-receiver daemon. self.reactor.register(proxy, inaddr, sock_type, subscribe=subscribe, in_bind=False) diff --git a/quantum/openstack/common/rpc/matchmaker.py b/quantum/openstack/common/rpc/matchmaker.py index d5006c5a0e..ecb54eb8ce 100644 --- a/quantum/openstack/common/rpc/matchmaker.py +++ b/quantum/openstack/common/rpc/matchmaker.py @@ -132,14 +132,6 @@ class FanoutBinding(Binding): return False -class PublisherBinding(Binding): - """Match on publishers keys, where key starts with 'publishers.' string.""" - def test(self, key): - if key.startswith('publishers~'): - return True - return False - - class StubExchange(Exchange): """Exchange that does nothing.""" def run(self, key): @@ -190,23 +182,6 @@ class RoundRobinRingExchange(RingExchange): return [(key + '.' + host, host)] -class PublisherRingExchange(RingExchange): - """Fanout Exchange based on a hashmap.""" - def __init__(self, ring=None): - super(PublisherRingExchange, self).__init__(ring) - - def run(self, key): - # Assume starts with "publishers~", strip it for lookup. - nkey = key.split('publishers~')[1:][0] - if not self._ring_has(nkey): - LOG.warn( - _("No key defining hosts for topic '%s', " - "see ringfile") % (nkey, ) - ) - return [] - return map(lambda x: (key + '.' + x, x), self.ring[nkey]) - - class FanoutRingExchange(RingExchange): """Fanout Exchange based on a hashmap.""" def __init__(self, ring=None): @@ -221,8 +196,7 @@ class FanoutRingExchange(RingExchange): "see ringfile") % (nkey, ) ) return [] - return map(lambda x: (key + '.' + x, x), self.ring[nkey] + - ['localhost']) + return map(lambda x: (key + '.' + x, x), self.ring[nkey]) class LocalhostExchange(Exchange): @@ -253,7 +227,6 @@ class MatchMakerRing(MatchMakerBase): """ def __init__(self, ring=None): super(MatchMakerRing, self).__init__() - self.add_binding(PublisherBinding(), PublisherRingExchange(ring)) self.add_binding(FanoutBinding(), FanoutRingExchange(ring)) self.add_binding(DirectBinding(), DirectExchange()) self.add_binding(TopicBinding(), RoundRobinRingExchange(ring)) @@ -266,7 +239,6 @@ class MatchMakerLocalhost(MatchMakerBase): """ def __init__(self): super(MatchMakerLocalhost, self).__init__() - self.add_binding(PublisherBinding(), LocalhostExchange()) self.add_binding(FanoutBinding(), LocalhostExchange()) self.add_binding(DirectBinding(), DirectExchange()) self.add_binding(TopicBinding(), LocalhostExchange()) @@ -281,7 +253,6 @@ class MatchMakerStub(MatchMakerBase): def __init__(self): super(MatchMakerLocalhost, self).__init__() - self.add_binding(PublisherBinding(), StubExchange()) self.add_binding(FanoutBinding(), StubExchange()) self.add_binding(DirectBinding(), StubExchange()) self.add_binding(TopicBinding(), StubExchange()) diff --git a/quantum/openstack/common/rpc/service.py b/quantum/openstack/common/rpc/service.py index 0b2b474766..30ffaaec4d 100644 --- a/quantum/openstack/common/rpc/service.py +++ b/quantum/openstack/common/rpc/service.py @@ -20,6 +20,7 @@ from quantum.openstack.common.gettextutils import _ from quantum.openstack.common import log as logging from quantum.openstack.common import rpc +from quantum.openstack.common.rpc import dispatcher as rpc_dispatcher from quantum.openstack.common import service @@ -46,15 +47,15 @@ class Service(service.Service): LOG.debug(_("Creating Consumer connection for Service %s") % self.topic) - rpc_dispatcher = rpc.dispatcher.RpcDispatcher([self.manager]) + dispatcher = rpc_dispatcher.RpcDispatcher([self.manager]) # Share this same connection for these Consumers - self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=False) + self.conn.create_consumer(self.topic, dispatcher, fanout=False) node_topic = '%s.%s' % (self.topic, self.host) - self.conn.create_consumer(node_topic, rpc_dispatcher, fanout=False) + self.conn.create_consumer(node_topic, dispatcher, fanout=False) - self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=True) + self.conn.create_consumer(self.topic, dispatcher, fanout=True) # Consume from all consumers in a thread self.conn.consume_in_thread() diff --git a/quantum/openstack/common/timeutils.py b/quantum/openstack/common/timeutils.py index c4f6cf0497..86004391de 100644 --- a/quantum/openstack/common/timeutils.py +++ b/quantum/openstack/common/timeutils.py @@ -62,9 +62,11 @@ def parse_strtime(timestr, fmt=PERFECT_TIME_FORMAT): def normalize_time(timestamp): - """Normalize time in arbitrary timezone to UTC""" + """Normalize time in arbitrary timezone to UTC naive object""" offset = timestamp.utcoffset() - return timestamp.replace(tzinfo=None) - offset if offset else timestamp + if offset is None: + return timestamp + return timestamp.replace(tzinfo=None) - offset def is_older_than(before, seconds): @@ -72,6 +74,11 @@ def is_older_than(before, seconds): return utcnow() - before > datetime.timedelta(seconds=seconds) +def is_newer_than(after, seconds): + """Return True if after is newer than seconds.""" + return after - utcnow() > datetime.timedelta(seconds=seconds) + + def utcnow_ts(): """Timestamp version of our utcnow function.""" return calendar.timegm(utcnow().timetuple()) @@ -121,6 +128,10 @@ def marshall_now(now=None): def unmarshall_time(tyme): """Unmarshall a datetime dict.""" - return datetime.datetime(day=tyme['day'], month=tyme['month'], - year=tyme['year'], hour=tyme['hour'], minute=tyme['minute'], - second=tyme['second'], microsecond=tyme['microsecond']) + return datetime.datetime(day=tyme['day'], + month=tyme['month'], + year=tyme['year'], + hour=tyme['hour'], + minute=tyme['minute'], + second=tyme['second'], + microsecond=tyme['microsecond'])