Update common.

policy.py is not updated as it causes test failures. I will investigate
that separately.

Change-Id: I9936137c1ab5b368ba6c3c35c423568b38bf913f
This commit is contained in:
Michael Still 2012-10-18 11:43:12 -07:00
parent 1fa8442a52
commit 1ea9588c4d
5 changed files with 43 additions and 122 deletions

View File

@ -58,9 +58,6 @@ zmq_opts = [
cfg.IntOpt('rpc_zmq_port', default=9501,
help='ZeroMQ receiver listening port'),
cfg.IntOpt('rpc_zmq_port_pub', default=9502,
help='ZeroMQ fanout publisher port'),
cfg.IntOpt('rpc_zmq_contexts', default=1,
help='Number of ZeroMQ contexts, defaults to 1'),
@ -209,7 +206,7 @@ class ZmqClient(object):
self.outq = ZmqSocket(addr, socket_type, bind=bind)
def cast(self, msg_id, topic, data):
self.outq.send([str(topic), str(msg_id), str('cast'),
self.outq.send([str(msg_id), str(topic), str('cast'),
_serialize(data)])
def close(self):
@ -302,9 +299,6 @@ class ConsumerBase(object):
else:
return [result]
def consume(self, sock):
raise NotImplementedError()
def process(self, style, target, proxy, ctx, data):
# Method starting with - are
# processed internally. (non-valid method name)
@ -417,17 +411,12 @@ class ZmqProxy(ZmqBaseReactor):
zmq.PUB, bind=True)
self.sockets.append(self.topic_proxy['zmq_replies'])
self.topic_proxy['fanout~'] = \
ZmqSocket("tcp://%s:%s" % (CONF.rpc_zmq_bind_address,
CONF.rpc_zmq_port_pub), zmq.PUB, bind=True)
self.sockets.append(self.topic_proxy['fanout~'])
def consume(self, sock):
ipc_dir = CONF.rpc_zmq_ipc_dir
#TODO(ewindisch): use zero-copy (i.e. references, not copying)
data = sock.recv()
topic, msg_id, style, in_msg = data
msg_id, topic, style, in_msg = data
topic = topic.split('.', 1)[0]
LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data)))
@ -435,11 +424,6 @@ class ZmqProxy(ZmqBaseReactor):
# Handle zmq_replies magic
if topic.startswith('fanout~'):
sock_type = zmq.PUB
# This doesn't change what is in the message,
# it only specifies that these messages go to
# the generic fanout topic.
topic = 'fanout~'
elif topic.startswith('zmq_replies'):
sock_type = zmq.PUB
inside = _deserialize(in_msg)
@ -450,32 +434,23 @@ class ZmqProxy(ZmqBaseReactor):
else:
sock_type = zmq.PUSH
if not topic in self.topic_proxy:
outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic),
sock_type, bind=True)
self.topic_proxy[topic] = outq
self.sockets.append(outq)
LOG.info(_("Created topic proxy: %s"), topic)
if not topic in self.topic_proxy:
outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic),
sock_type, bind=True)
self.topic_proxy[topic] = outq
self.sockets.append(outq)
LOG.info(_("Created topic proxy: %s"), topic)
# It takes some time for a pub socket to open,
# before we can have any faith in doing a send() to it.
if sock_type == zmq.PUB:
eventlet.sleep(.5)
LOG.debug(_("ROUTER RELAY-OUT START %(data)s") % {'data': data})
self.topic_proxy[topic].send(data)
LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") % {'data': data})
class CallbackReactor(ZmqBaseReactor):
"""
A consumer class passing messages to a callback
"""
def __init__(self, conf, callback):
self._cb = callback
super(CallbackReactor, self).__init__(conf)
def consume(self, sock):
data = sock.recv()
self._cb(data[3])
class ZmqReactor(ZmqBaseReactor):
"""
A consumer class implementing a
@ -496,7 +471,7 @@ class ZmqReactor(ZmqBaseReactor):
self.mapping[sock].send(data)
return
topic, msg_id, style, in_msg = data
msg_id, topic, style, in_msg = data
ctx, request = _deserialize(in_msg)
ctx = RpcContext.unmarshal(ctx)
@ -513,26 +488,6 @@ class Connection(rpc_common.Connection):
def __init__(self, conf):
self.reactor = ZmqReactor(conf)
def _consume_fanout(self, reactor, topic, proxy, bind=False):
for topic, host in matchmaker.queues("publishers~%s" % (topic, )):
inaddr = "tcp://%s:%s" % (host, CONF.rpc_zmq_port)
reactor.register(proxy, inaddr, zmq.SUB, in_bind=bind)
def declare_topic_consumer(self, topic, callback=None,
queue_name=None):
"""declare_topic_consumer is a private method, but
it is being used by Quantum (Folsom).
This has been added compatibility.
"""
# Only consume on the base topic name.
topic = topic.split('.', 1)[0]
if CONF.rpc_zmq_host in matchmaker.queues("fanout~%s" % (topic, )):
return
reactor = CallbackReactor(CONF, callback)
self._consume_fanout(reactor, topic, None, bind=False)
def create_consumer(self, topic, proxy, fanout=False):
# Only consume on the base topic name.
topic = topic.split('.', 1)[0]
@ -540,35 +495,22 @@ class Connection(rpc_common.Connection):
LOG.info(_("Create Consumer for topic (%(topic)s)") %
{'topic': topic})
# Consume direct-push fanout messages (relay to local consumers)
# Subscription scenarios
if fanout:
# If we're not in here, we can't receive direct fanout messages
if CONF.rpc_zmq_host in matchmaker.queues(topic):
# Consume from all remote publishers.
self._consume_fanout(self.reactor, topic, proxy)
else:
LOG.warn("This service cannot receive direct PUSH fanout "
"messages without being known by the matchmaker.")
return
# Configure consumer for direct pushes.
subscribe = (topic, fanout)[type(fanout) == str]
subscribe = ('', fanout)[type(fanout) == str]
sock_type = zmq.SUB
topic = 'fanout~' + topic
inaddr = "tcp://127.0.0.1:%s" % (CONF.rpc_zmq_port_pub, )
else:
sock_type = zmq.PULL
subscribe = None
# Receive messages from (local) proxy
inaddr = "ipc://%s/zmq_topic_%s" % \
(CONF.rpc_zmq_ipc_dir, topic)
# Receive messages from (local) proxy
inaddr = "ipc://%s/zmq_topic_%s" % \
(CONF.rpc_zmq_ipc_dir, topic)
LOG.debug(_("Consumer is a zmq.%s"),
['PULL', 'SUB'][sock_type == zmq.SUB])
# Consume messages from local rpc-zmq-receiver daemon.
self.reactor.register(proxy, inaddr, sock_type,
subscribe=subscribe, in_bind=False)

View File

@ -132,14 +132,6 @@ class FanoutBinding(Binding):
return False
class PublisherBinding(Binding):
"""Match on publishers keys, where key starts with 'publishers.' string."""
def test(self, key):
if key.startswith('publishers~'):
return True
return False
class StubExchange(Exchange):
"""Exchange that does nothing."""
def run(self, key):
@ -190,23 +182,6 @@ class RoundRobinRingExchange(RingExchange):
return [(key + '.' + host, host)]
class PublisherRingExchange(RingExchange):
"""Fanout Exchange based on a hashmap."""
def __init__(self, ring=None):
super(PublisherRingExchange, self).__init__(ring)
def run(self, key):
# Assume starts with "publishers~", strip it for lookup.
nkey = key.split('publishers~')[1:][0]
if not self._ring_has(nkey):
LOG.warn(
_("No key defining hosts for topic '%s', "
"see ringfile") % (nkey, )
)
return []
return map(lambda x: (key + '.' + x, x), self.ring[nkey])
class FanoutRingExchange(RingExchange):
"""Fanout Exchange based on a hashmap."""
def __init__(self, ring=None):
@ -221,8 +196,7 @@ class FanoutRingExchange(RingExchange):
"see ringfile") % (nkey, )
)
return []
return map(lambda x: (key + '.' + x, x), self.ring[nkey] +
['localhost'])
return map(lambda x: (key + '.' + x, x), self.ring[nkey])
class LocalhostExchange(Exchange):
@ -253,7 +227,6 @@ class MatchMakerRing(MatchMakerBase):
"""
def __init__(self, ring=None):
super(MatchMakerRing, self).__init__()
self.add_binding(PublisherBinding(), PublisherRingExchange(ring))
self.add_binding(FanoutBinding(), FanoutRingExchange(ring))
self.add_binding(DirectBinding(), DirectExchange())
self.add_binding(TopicBinding(), RoundRobinRingExchange(ring))
@ -266,7 +239,6 @@ class MatchMakerLocalhost(MatchMakerBase):
"""
def __init__(self):
super(MatchMakerLocalhost, self).__init__()
self.add_binding(PublisherBinding(), LocalhostExchange())
self.add_binding(FanoutBinding(), LocalhostExchange())
self.add_binding(DirectBinding(), DirectExchange())
self.add_binding(TopicBinding(), LocalhostExchange())
@ -281,7 +253,6 @@ class MatchMakerStub(MatchMakerBase):
def __init__(self):
super(MatchMakerLocalhost, self).__init__()
self.add_binding(PublisherBinding(), StubExchange())
self.add_binding(FanoutBinding(), StubExchange())
self.add_binding(DirectBinding(), StubExchange())
self.add_binding(TopicBinding(), StubExchange())

View File

@ -20,6 +20,7 @@
from cinder.openstack.common.gettextutils import _
from cinder.openstack.common import log as logging
from cinder.openstack.common import rpc
from cinder.openstack.common.rpc import dispatcher as rpc_dispatcher
from cinder.openstack.common import service
@ -46,15 +47,15 @@ class Service(service.Service):
LOG.debug(_("Creating Consumer connection for Service %s") %
self.topic)
rpc_dispatcher = rpc.dispatcher.RpcDispatcher([self.manager])
dispatcher = rpc_dispatcher.RpcDispatcher([self.manager])
# Share this same connection for these Consumers
self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=False)
self.conn.create_consumer(self.topic, dispatcher, fanout=False)
node_topic = '%s.%s' % (self.topic, self.host)
self.conn.create_consumer(node_topic, rpc_dispatcher, fanout=False)
self.conn.create_consumer(node_topic, dispatcher, fanout=False)
self.conn.create_consumer(self.topic, rpc_dispatcher, fanout=True)
self.conn.create_consumer(self.topic, dispatcher, fanout=True)
# Consume from all consumers in a thread
self.conn.consume_in_thread()

View File

@ -31,13 +31,13 @@ from setuptools.command import sdist
def parse_mailmap(mailmap='.mailmap'):
mapping = {}
if os.path.exists(mailmap):
fp = open(mailmap, 'r')
for l in fp:
l = l.strip()
if not l.startswith('#') and ' ' in l:
canonical_email, alias = [x for x in l.split(' ')
if x.startswith('<')]
mapping[alias] = canonical_email
with open(mailmap, 'r') as fp:
for l in fp:
l = l.strip()
if not l.startswith('#') and ' ' in l:
canonical_email, alias = [x for x in l.split(' ')
if x.startswith('<')]
mapping[alias] = canonical_email
return mapping
@ -54,7 +54,8 @@ def canonicalize_emails(changelog, mapping):
def get_reqs_from_files(requirements_files):
for requirements_file in requirements_files:
if os.path.exists(requirements_file):
return open(requirements_file, 'r').read().split('\n')
with open(requirements_file, 'r') as fil:
return fil.read().split('\n')
return []
@ -191,14 +192,14 @@ def write_git_changelog():
def generate_authors():
"""Create AUTHORS file using git commits."""
jenkins_email = 'jenkins@review.openstack.org'
jenkins_email = 'jenkins@review.(openstack|stackforge).org'
old_authors = 'AUTHORS.in'
new_authors = 'AUTHORS'
if not os.getenv('SKIP_GENERATE_AUTHORS'):
if os.path.isdir('.git'):
# don't include jenkins email address in AUTHORS file
git_log_cmd = ("git log --format='%aN <%aE>' | sort -u | "
"grep -v " + jenkins_email)
"egrep -v '" + jenkins_email + "'")
changelog = _run_shell_command(git_log_cmd)
mailmap = parse_mailmap()
with open(new_authors, 'w') as new_authors_fh:
@ -236,7 +237,8 @@ def read_versioninfo(project):
def write_versioninfo(project, version):
"""Write a simple file containing the version of the package."""
open(os.path.join(project, 'versioninfo'), 'w').write("%s\n" % version)
with open(os.path.join(project, 'versioninfo'), 'w') as fil:
fil.write("%s\n" % version)
def get_cmdclass():

View File

@ -74,6 +74,11 @@ def is_older_than(before, seconds):
return utcnow() - before > datetime.timedelta(seconds=seconds)
def is_newer_than(after, seconds):
"""Return True if after is newer than seconds."""
return after - utcnow() > datetime.timedelta(seconds=seconds)
def utcnow_ts():
"""Timestamp version of our utcnow function."""
return calendar.timegm(utcnow().timetuple())