Implement the server side of ZmqDriver

This implements the server side of the driver without modifying the
existing code by allowing the driver to spawn off multiple greenthreads
as before, but queueing any dispatched messages so that the executor
can still do listener.poll() to dispatch messages itself.

This is a hack, but it's a starting point.

Change-Id: Ie299c2695d81d0473cea81d40114326b89de0011
This commit is contained in:
Mark McLoughlin 2013-08-28 13:48:19 +01:00
parent a2d113198c
commit b6d8087746

View File

@ -14,12 +14,15 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import collections
import logging import logging
import os import os
import pprint import pprint
import Queue
import re import re
import socket import socket
import sys import sys
import threading
import types import types
import uuid import uuid
@ -77,7 +80,12 @@ zmq_opts = [
cfg.StrOpt('rpc_zmq_host', default=socket.gethostname(), cfg.StrOpt('rpc_zmq_host', default=socket.gethostname(),
help='Name of this node. Must be a valid hostname, FQDN, or ' help='Name of this node. Must be a valid hostname, FQDN, or '
'IP address. Must match "host" option, if running Nova.') 'IP address. Must match "host" option, if running Nova.'),
cfg.IntOpt('rpc_cast_timeout',
default=30,
help='Seconds to wait before a cast expires (TTL). '
'Only supported by impl_zmq.'),
] ]
CONF = cfg.CONF CONF = cfg.CONF
@ -621,7 +629,7 @@ class Connection(rpc_common.Connection):
def _cast(addr, context, topic, msg, timeout=None, envelope=False, def _cast(addr, context, topic, msg, timeout=None, envelope=False,
_msg_id=None): _msg_id=None, allowed_remote_exmods=[]):
timeout_cast = timeout or CONF.rpc_cast_timeout timeout_cast = timeout or CONF.rpc_cast_timeout
payload = [RpcContext.marshal(context), msg] payload = [RpcContext.marshal(context), msg]
@ -639,7 +647,7 @@ def _cast(addr, context, topic, msg, timeout=None, envelope=False,
def _call(addr, context, topic, msg, timeout=None, def _call(addr, context, topic, msg, timeout=None,
envelope=False): envelope=False, allowed_remote_exmods=[]):
# timeout_response is how long we wait for a response # timeout_response is how long we wait for a response
timeout = timeout or CONF.rpc_response_timeout timeout = timeout or CONF.rpc_response_timeout
@ -709,13 +717,14 @@ def _call(addr, context, topic, msg, timeout=None,
# responses for Exceptions. # responses for Exceptions.
for resp in responses: for resp in responses:
if isinstance(resp, types.DictType) and 'exc' in resp: if isinstance(resp, types.DictType) and 'exc' in resp:
raise rpc_common.deserialize_remote_exception(CONF, resp['exc']) raise rpc_common.deserialize_remote_exception(
resp['exc'], allowed_remote_exmods)
return responses[-1] return responses[-1]
def _multi_send(method, context, topic, msg, timeout=None, def _multi_send(method, context, topic, msg, timeout=None,
envelope=False, _msg_id=None): envelope=False, _msg_id=None, allowed_remote_exmods=[]):
"""Wraps the sending of messages. """Wraps the sending of messages.
Dispatches to the matchmaker and sends message to all relevant hosts. Dispatches to the matchmaker and sends message to all relevant hosts.
@ -744,7 +753,7 @@ def _multi_send(method, context, topic, msg, timeout=None,
_msg_id) _msg_id)
return return
return method(_addr, context, _topic, msg, timeout, return method(_addr, context, _topic, msg, timeout,
envelope) envelope, allowed_remote_exmods)
def create_connection(conf, new=True): def create_connection(conf, new=True):
@ -820,6 +829,59 @@ def _get_matchmaker(*args, **kwargs):
return matchmaker return matchmaker
class ZmqIncomingMessage(base.IncomingMessage):
ReceivedReply = collections.namedtuple(
'ReceivedReply', ['reply', 'failure', 'log_failure'])
def __init__(self, listener, ctxt, message):
super(ZmqIncomingMessage, self).__init__(listener, ctxt, message)
self.condition = threading.Condition()
self.received = None
def reply(self, reply=None, failure=None, log_failure=True):
self.received = self.ReceivedReply(reply, failure, log_failure)
with self.condition:
self.condition.notify()
class ZmqListener(base.Listener):
def __init__(self, driver, target):
super(ZmqListener, self).__init__(driver, target)
self.incoming_queue = Queue.Queue()
def dispatch(self, ctxt, version, method, namespace, **kwargs):
message = {
'method': method,
'args': kwargs
}
if version:
message['version'] = version
if namespace:
message['namespace'] = namespace
incoming = ZmqIncomingMessage(self,
ctxt.to_dict(),
message)
self.incoming_queue.put(incoming)
with incoming.condition:
incoming.condition.wait()
assert incoming.received
if incoming.received.failure:
raise incoming.received.failure
else:
return incoming.received.reply
def poll(self):
while True:
return self.incoming_queue.get()
class ZmqDriver(base.BaseDriver): class ZmqDriver(base.BaseDriver):
# FIXME(markmc): allow this driver to be used without eventlet # FIXME(markmc): allow this driver to be used without eventlet
@ -869,7 +931,8 @@ class ZmqDriver(base.BaseDriver):
topic = 'fanout~' + topic topic = 'fanout~' + topic
reply = _multi_send(method, context, topic, message, reply = _multi_send(method, context, topic, message,
envelope=envelope) envelope=envelope,
allowed_remote_exmods=self._allowed_remote_exmods)
if wait_for_reply: if wait_for_reply:
return reply[-1] return reply[-1]
@ -884,7 +947,18 @@ class ZmqDriver(base.BaseDriver):
return self._send(target, ctxt, message, envelope=(version == 2.0)) return self._send(target, ctxt, message, envelope=(version == 2.0))
def listen(self, target): def listen(self, target):
raise NotImplementedError() conn = create_connection(self.conf)
listener = ZmqListener(self, target)
conn.create_consumer(target.topic, listener)
conn.create_consumer('%s.%s' % (target.topic, target.server),
listener)
conn.create_consumer(target.topic, listener, fanout=True)
conn.consume_in_thread()
return listener
def cleanup(self): def cleanup(self):
cleanup() cleanup()