Update common.
policy.py and setup.py are not updated as they cause test failures. I will investigate them separately. Change-Id: Ie46626e67ce5c32baf0eda9f63afb33611d5015a
This commit is contained in:
parent
12ed618200
commit
3e0afd8d11
@ -54,15 +54,14 @@ log_opts = [
|
||||
'%(message)s',
|
||||
help='format string to use for log messages with context'),
|
||||
cfg.StrOpt('logging_default_format_string',
|
||||
default='%(asctime)s %(levelname)s %(name)s [-] %(instance)s'
|
||||
'%(message)s',
|
||||
default='%(asctime)s %(process)d %(levelname)s %(name)s [-]'
|
||||
' %(instance)s%(message)s',
|
||||
help='format string to use for log messages without context'),
|
||||
cfg.StrOpt('logging_debug_format_suffix',
|
||||
default='from (pid=%(process)d) %(funcName)s '
|
||||
'%(pathname)s:%(lineno)d',
|
||||
default='%(funcName)s %(pathname)s:%(lineno)d',
|
||||
help='data to append to log format when level is DEBUG'),
|
||||
cfg.StrOpt('logging_exception_prefix',
|
||||
default='%(asctime)s TRACE %(name)s %(instance)s',
|
||||
default='%(asctime)s %(process)d TRACE %(name)s %(instance)s',
|
||||
help='prefix each line of exception output with this format'),
|
||||
cfg.ListOpt('default_log_levels',
|
||||
default=[
|
||||
|
@ -58,9 +58,6 @@ zmq_opts = [
|
||||
cfg.IntOpt('rpc_zmq_port', default=9501,
|
||||
help='ZeroMQ receiver listening port'),
|
||||
|
||||
cfg.IntOpt('rpc_zmq_port_pub', default=9502,
|
||||
help='ZeroMQ fanout publisher port'),
|
||||
|
||||
cfg.IntOpt('rpc_zmq_contexts', default=1,
|
||||
help='Number of ZeroMQ contexts, defaults to 1'),
|
||||
|
||||
@ -209,7 +206,7 @@ class ZmqClient(object):
|
||||
self.outq = ZmqSocket(addr, socket_type, bind=bind)
|
||||
|
||||
def cast(self, msg_id, topic, data):
|
||||
self.outq.send([str(topic), str(msg_id), str('cast'),
|
||||
self.outq.send([str(msg_id), str(topic), str('cast'),
|
||||
_serialize(data)])
|
||||
|
||||
def close(self):
|
||||
@ -302,9 +299,6 @@ class ConsumerBase(object):
|
||||
else:
|
||||
return [result]
|
||||
|
||||
def consume(self, sock):
|
||||
raise NotImplementedError()
|
||||
|
||||
def process(self, style, target, proxy, ctx, data):
|
||||
# Method starting with - are
|
||||
# processed internally. (non-valid method name)
|
||||
@ -417,17 +411,12 @@ class ZmqProxy(ZmqBaseReactor):
|
||||
zmq.PUB, bind=True)
|
||||
self.sockets.append(self.topic_proxy['zmq_replies'])
|
||||
|
||||
self.topic_proxy['fanout~'] = \
|
||||
ZmqSocket("tcp://%s:%s" % (CONF.rpc_zmq_bind_address,
|
||||
CONF.rpc_zmq_port_pub), zmq.PUB, bind=True)
|
||||
self.sockets.append(self.topic_proxy['fanout~'])
|
||||
|
||||
def consume(self, sock):
|
||||
ipc_dir = CONF.rpc_zmq_ipc_dir
|
||||
|
||||
#TODO(ewindisch): use zero-copy (i.e. references, not copying)
|
||||
data = sock.recv()
|
||||
topic, msg_id, style, in_msg = data
|
||||
msg_id, topic, style, in_msg = data
|
||||
topic = topic.split('.', 1)[0]
|
||||
|
||||
LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data)))
|
||||
@ -435,11 +424,6 @@ class ZmqProxy(ZmqBaseReactor):
|
||||
# Handle zmq_replies magic
|
||||
if topic.startswith('fanout~'):
|
||||
sock_type = zmq.PUB
|
||||
|
||||
# This doesn't change what is in the message,
|
||||
# it only specifies that these messages go to
|
||||
# the generic fanout topic.
|
||||
topic = 'fanout~'
|
||||
elif topic.startswith('zmq_replies'):
|
||||
sock_type = zmq.PUB
|
||||
inside = _deserialize(in_msg)
|
||||
@ -450,32 +434,23 @@ class ZmqProxy(ZmqBaseReactor):
|
||||
else:
|
||||
sock_type = zmq.PUSH
|
||||
|
||||
if not topic in self.topic_proxy:
|
||||
outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic),
|
||||
sock_type, bind=True)
|
||||
self.topic_proxy[topic] = outq
|
||||
self.sockets.append(outq)
|
||||
LOG.info(_("Created topic proxy: %s"), topic)
|
||||
if not topic in self.topic_proxy:
|
||||
outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic),
|
||||
sock_type, bind=True)
|
||||
self.topic_proxy[topic] = outq
|
||||
self.sockets.append(outq)
|
||||
LOG.info(_("Created topic proxy: %s"), topic)
|
||||
|
||||
# It takes some time for a pub socket to open,
|
||||
# before we can have any faith in doing a send() to it.
|
||||
if sock_type == zmq.PUB:
|
||||
eventlet.sleep(.5)
|
||||
|
||||
LOG.debug(_("ROUTER RELAY-OUT START %(data)s") % {'data': data})
|
||||
self.topic_proxy[topic].send(data)
|
||||
LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") % {'data': data})
|
||||
|
||||
|
||||
class CallbackReactor(ZmqBaseReactor):
|
||||
"""
|
||||
A consumer class passing messages to a callback
|
||||
"""
|
||||
|
||||
def __init__(self, conf, callback):
|
||||
self._cb = callback
|
||||
super(CallbackReactor, self).__init__(conf)
|
||||
|
||||
def consume(self, sock):
|
||||
data = sock.recv()
|
||||
self._cb(data[3])
|
||||
|
||||
|
||||
class ZmqReactor(ZmqBaseReactor):
|
||||
"""
|
||||
A consumer class implementing a
|
||||
@ -496,7 +471,7 @@ class ZmqReactor(ZmqBaseReactor):
|
||||
self.mapping[sock].send(data)
|
||||
return
|
||||
|
||||
topic, msg_id, style, in_msg = data
|
||||
msg_id, topic, style, in_msg = data
|
||||
|
||||
ctx, request = _deserialize(in_msg)
|
||||
ctx = RpcContext.unmarshal(ctx)
|
||||
@ -513,26 +488,6 @@ class Connection(rpc_common.Connection):
|
||||
def __init__(self, conf):
|
||||
self.reactor = ZmqReactor(conf)
|
||||
|
||||
def _consume_fanout(self, reactor, topic, proxy, bind=False):
|
||||
for topic, host in matchmaker.queues("publishers~%s" % (topic, )):
|
||||
inaddr = "tcp://%s:%s" % (host, CONF.rpc_zmq_port)
|
||||
reactor.register(proxy, inaddr, zmq.SUB, in_bind=bind)
|
||||
|
||||
def declare_topic_consumer(self, topic, callback=None,
|
||||
queue_name=None):
|
||||
"""declare_topic_consumer is a private method, but
|
||||
it is being used by Quantum (Folsom).
|
||||
This has been added compatibility.
|
||||
"""
|
||||
# Only consume on the base topic name.
|
||||
topic = topic.split('.', 1)[0]
|
||||
|
||||
if CONF.rpc_zmq_host in matchmaker.queues("fanout~%s" % (topic, )):
|
||||
return
|
||||
|
||||
reactor = CallbackReactor(CONF, callback)
|
||||
self._consume_fanout(reactor, topic, None, bind=False)
|
||||
|
||||
def create_consumer(self, topic, proxy, fanout=False):
|
||||
# Only consume on the base topic name.
|
||||
topic = topic.split('.', 1)[0]
|
||||
@ -540,35 +495,22 @@ class Connection(rpc_common.Connection):
|
||||
LOG.info(_("Create Consumer for topic (%(topic)s)") %
|
||||
{'topic': topic})
|
||||
|
||||
# Consume direct-push fanout messages (relay to local consumers)
|
||||
# Subscription scenarios
|
||||
if fanout:
|
||||
# If we're not in here, we can't receive direct fanout messages
|
||||
if CONF.rpc_zmq_host in matchmaker.queues(topic):
|
||||
# Consume from all remote publishers.
|
||||
self._consume_fanout(self.reactor, topic, proxy)
|
||||
else:
|
||||
LOG.warn("This service cannot receive direct PUSH fanout "
|
||||
"messages without being known by the matchmaker.")
|
||||
return
|
||||
|
||||
# Configure consumer for direct pushes.
|
||||
subscribe = (topic, fanout)[type(fanout) == str]
|
||||
subscribe = ('', fanout)[type(fanout) == str]
|
||||
sock_type = zmq.SUB
|
||||
topic = 'fanout~' + topic
|
||||
|
||||
inaddr = "tcp://127.0.0.1:%s" % (CONF.rpc_zmq_port_pub, )
|
||||
else:
|
||||
sock_type = zmq.PULL
|
||||
subscribe = None
|
||||
|
||||
# Receive messages from (local) proxy
|
||||
inaddr = "ipc://%s/zmq_topic_%s" % \
|
||||
(CONF.rpc_zmq_ipc_dir, topic)
|
||||
# Receive messages from (local) proxy
|
||||
inaddr = "ipc://%s/zmq_topic_%s" % \
|
||||
(CONF.rpc_zmq_ipc_dir, topic)
|
||||
|
||||
LOG.debug(_("Consumer is a zmq.%s"),
|
||||
['PULL', 'SUB'][sock_type == zmq.SUB])
|
||||
|
||||
# Consume messages from local rpc-zmq-receiver daemon.
|
||||
self.reactor.register(proxy, inaddr, sock_type,
|
||||
subscribe=subscribe, in_bind=False)
|
||||
|
||||
|
@ -132,14 +132,6 @@ class FanoutBinding(Binding):
|
||||
return False
|
||||
|
||||
|
||||
class PublisherBinding(Binding):
|
||||
"""Match on publishers keys, where key starts with 'publishers.' string."""
|
||||
def test(self, key):
|
||||
if key.startswith('publishers~'):
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
class StubExchange(Exchange):
|
||||
"""Exchange that does nothing."""
|
||||
def run(self, key):
|
||||
@ -190,23 +182,6 @@ class RoundRobinRingExchange(RingExchange):
|
||||
return [(key + '.' + host, host)]
|
||||
|
||||
|
||||
class PublisherRingExchange(RingExchange):
|
||||
"""Fanout Exchange based on a hashmap."""
|
||||
def __init__(self, ring=None):
|
||||
super(PublisherRingExchange, self).__init__(ring)
|
||||
|
||||
def run(self, key):
|
||||
# Assume starts with "publishers~", strip it for lookup.
|
||||
nkey = key.split('publishers~')[1:][0]
|
||||
if not self._ring_has(nkey):
|
||||
LOG.warn(
|
||||
_("No key defining hosts for topic '%s', "
|
||||
"see ringfile") % (nkey, )
|
||||
)
|
||||
return []
|
||||
return map(lambda x: (key + '.' + x, x), self.ring[nkey])
|
||||
|
||||
|
||||
class FanoutRingExchange(RingExchange):
|
||||
"""Fanout Exchange based on a hashmap."""
|
||||
def __init__(self, ring=None):
|
||||
@ -221,8 +196,7 @@ class FanoutRingExchange(RingExchange):
|
||||
"see ringfile") % (nkey, )
|
||||
)
|
||||
return []
|
||||
return map(lambda x: (key + '.' + x, x), self.ring[nkey] +
|
||||
['localhost'])
|
||||
return map(lambda x: (key + '.' + x, x), self.ring[nkey])
|
||||
|
||||
|
||||
class LocalhostExchange(Exchange):
|
||||
@ -253,7 +227,6 @@ class MatchMakerRing(MatchMakerBase):
|
||||
"""
|
||||
def __init__(self, ring=None):
|
||||
super(MatchMakerRing, self).__init__()
|
||||
self.add_binding(PublisherBinding(), PublisherRingExchange(ring))
|
||||
self.add_binding(FanoutBinding(), FanoutRingExchange(ring))
|
||||
self.add_binding(DirectBinding(), DirectExchange())
|
||||
self.add_binding(TopicBinding(), RoundRobinRingExchange(ring))
|
||||
@ -266,7 +239,6 @@ class MatchMakerLocalhost(MatchMakerBase):
|
||||
"""
|
||||
def __init__(self):
|
||||
super(MatchMakerLocalhost, self).__init__()
|
||||
self.add_binding(PublisherBinding(), LocalhostExchange())
|
||||
self.add_binding(FanoutBinding(), LocalhostExchange())
|
||||
self.add_binding(DirectBinding(), DirectExchange())
|
||||
self.add_binding(TopicBinding(), LocalhostExchange())
|
||||
@ -281,7 +253,6 @@ class MatchMakerStub(MatchMakerBase):
|
||||
def __init__(self):
|
||||
super(MatchMakerLocalhost, self).__init__()
|
||||
|
||||
self.add_binding(PublisherBinding(), StubExchange())
|
||||
self.add_binding(FanoutBinding(), StubExchange())
|
||||
self.add_binding(DirectBinding(), StubExchange())
|
||||
self.add_binding(TopicBinding(), StubExchange())
|
||||
|
@ -20,6 +20,7 @@
|
||||
from quantum.openstack.common.gettextutils import _
|
||||
from quantum.openstack.common import log as logging
|
||||
from quantum.openstack.common import rpc
|
||||
from quantum.openstack.common.rpc import dispatcher as rpc_dispatcher
|
||||
from quantum.openstack.common import service
|
||||
|
||||
|
||||
@ -46,15 +47,15 @@ class Service(service.Service):
|
||||
LOG.debug(_("Creating Consumer connection for Service %s") %
|
||||
self.topic)
|
||||
|
||||
rpc_dispatcher = rpc.dispatcher.RpcDispatcher([self.manager])
|
||||
dispatcher = rpc_dispatcher.RpcDispatcher([self.manager])
|
||||
|
||||
# Share this same connection for these Consumers
|
||||
self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=False)
|
||||
self.conn.create_consumer(self.topic, dispatcher, fanout=False)
|
||||
|
||||
node_topic = '%s.%s' % (self.topic, self.host)
|
||||
self.conn.create_consumer(node_topic, rpc_dispatcher, fanout=False)
|
||||
self.conn.create_consumer(node_topic, dispatcher, fanout=False)
|
||||
|
||||
self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=True)
|
||||
self.conn.create_consumer(self.topic, dispatcher, fanout=True)
|
||||
|
||||
# Consume from all consumers in a thread
|
||||
self.conn.consume_in_thread()
|
||||
|
@ -62,9 +62,11 @@ def parse_strtime(timestr, fmt=PERFECT_TIME_FORMAT):
|
||||
|
||||
|
||||
def normalize_time(timestamp):
|
||||
"""Normalize time in arbitrary timezone to UTC"""
|
||||
"""Normalize time in arbitrary timezone to UTC naive object"""
|
||||
offset = timestamp.utcoffset()
|
||||
return timestamp.replace(tzinfo=None) - offset if offset else timestamp
|
||||
if offset is None:
|
||||
return timestamp
|
||||
return timestamp.replace(tzinfo=None) - offset
|
||||
|
||||
|
||||
def is_older_than(before, seconds):
|
||||
@ -72,6 +74,11 @@ def is_older_than(before, seconds):
|
||||
return utcnow() - before > datetime.timedelta(seconds=seconds)
|
||||
|
||||
|
||||
def is_newer_than(after, seconds):
|
||||
"""Return True if after is newer than seconds."""
|
||||
return after - utcnow() > datetime.timedelta(seconds=seconds)
|
||||
|
||||
|
||||
def utcnow_ts():
|
||||
"""Timestamp version of our utcnow function."""
|
||||
return calendar.timegm(utcnow().timetuple())
|
||||
@ -121,6 +128,10 @@ def marshall_now(now=None):
|
||||
|
||||
def unmarshall_time(tyme):
|
||||
"""Unmarshall a datetime dict."""
|
||||
return datetime.datetime(day=tyme['day'], month=tyme['month'],
|
||||
year=tyme['year'], hour=tyme['hour'], minute=tyme['minute'],
|
||||
second=tyme['second'], microsecond=tyme['microsecond'])
|
||||
return datetime.datetime(day=tyme['day'],
|
||||
month=tyme['month'],
|
||||
year=tyme['year'],
|
||||
hour=tyme['hour'],
|
||||
minute=tyme['minute'],
|
||||
second=tyme['second'],
|
||||
microsecond=tyme['microsecond'])
|
||||
|
Loading…
Reference in New Issue
Block a user