Merge "[zmq] Use PUSH/PULL for direct CAST"
This commit is contained in:
commit
ad925e4671
@ -28,15 +28,16 @@ zmq = zmq_async.import_zmq()
|
||||
class PushPublisher(zmq_publisher_base.PublisherBase):
|
||||
|
||||
def __init__(self, conf, matchmaker):
|
||||
super(PushPublisher, self).__init__(conf, matchmaker, zmq.PUSH)
|
||||
sockets_manager = zmq_publisher_base.SocketsManager(
|
||||
conf, matchmaker, zmq.PULL, zmq.PUSH)
|
||||
super(PushPublisher, self).__init__(sockets_manager)
|
||||
|
||||
def send_request(self, request):
|
||||
|
||||
if request.msg_type == zmq_names.CALL_TYPE:
|
||||
raise zmq_publisher_base.UnsupportedSendPattern(request.msg_type)
|
||||
|
||||
push_socket = self._check_hosts_connections(
|
||||
request.target, zmq_names.socket_type_str(zmq.PULL))
|
||||
push_socket = self.outbound_sockets.get_socket(request.target)
|
||||
|
||||
if not push_socket.connections:
|
||||
LOG.warning(_LW("Request %s was dropped because no connection"),
|
||||
@ -48,10 +49,3 @@ class PushPublisher(zmq_publisher_base.PublisherBase):
|
||||
self._send_request(push_socket, request)
|
||||
else:
|
||||
self._send_request(push_socket, request)
|
||||
|
||||
def _send_request(self, socket, request):
|
||||
|
||||
super(PushPublisher, self)._send_request(socket, request)
|
||||
|
||||
LOG.debug("Publishing message %(message)s to a target %(target)s",
|
||||
{"message": request.message, "target": request.target})
|
||||
|
@ -17,6 +17,8 @@ from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
|
||||
import zmq_dealer_call_publisher
|
||||
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
|
||||
import zmq_dealer_publisher
|
||||
from oslo_messaging._drivers.zmq_driver.client.publishers \
|
||||
import zmq_push_publisher
|
||||
from oslo_messaging._drivers.zmq_driver.client import zmq_client_base
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_address
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
@ -43,6 +45,9 @@ class ZmqClient(zmq_client_base.ZmqClientBase):
|
||||
zmq_dealer_call_publisher.DealerCallPublisher(
|
||||
conf, matchmaker),
|
||||
|
||||
zmq_names.CAST_TYPE:
|
||||
zmq_push_publisher.PushPublisher(conf, matchmaker),
|
||||
|
||||
# Here use DealerPublisherLight for sending request to proxy
|
||||
# which finally uses PubPublisher to send fanout in case of
|
||||
# 'use_pub_sub' option configured.
|
||||
|
@ -14,6 +14,8 @@
|
||||
|
||||
import abc
|
||||
import logging
|
||||
import threading
|
||||
import time
|
||||
|
||||
import six
|
||||
|
||||
@ -84,3 +86,43 @@ class SingleSocketConsumer(ConsumerBase):
|
||||
@property
|
||||
def port(self):
|
||||
return self.socket.port
|
||||
|
||||
|
||||
class TargetsManager(object):
|
||||
|
||||
def __init__(self, conf, matchmaker, host, socket_type):
|
||||
self.targets = []
|
||||
self.conf = conf
|
||||
self.matchmaker = matchmaker
|
||||
self.host = host
|
||||
self.socket_type = socket_type
|
||||
self.targets_lock = threading.Lock()
|
||||
self.updater = zmq_async.get_executor(method=self._update_targets) \
|
||||
if conf.zmq_target_expire > 0 else None
|
||||
if self.updater:
|
||||
self.updater.execute()
|
||||
|
||||
def _update_targets(self):
|
||||
with self.targets_lock:
|
||||
for target in self.targets:
|
||||
self.matchmaker.register(
|
||||
target, self.host,
|
||||
zmq_names.socket_type_str(self.socket_type))
|
||||
|
||||
# Update target-records once per half expiration time
|
||||
time.sleep(self.conf.zmq_target_expire / 2)
|
||||
|
||||
def listen(self, target):
|
||||
with self.targets_lock:
|
||||
self.targets.append(target)
|
||||
self.matchmaker.register(
|
||||
target, self.host,
|
||||
zmq_names.socket_type_str(self.socket_type))
|
||||
|
||||
def cleanup(self):
|
||||
if self.updater:
|
||||
self.updater.stop()
|
||||
for target in self.targets:
|
||||
self.matchmaker.unregister(
|
||||
target, self.host,
|
||||
zmq_names.socket_type_str(self.socket_type))
|
||||
|
@ -17,6 +17,7 @@ import logging
|
||||
from oslo_messaging._drivers import base
|
||||
from oslo_messaging._drivers.zmq_driver.server.consumers\
|
||||
import zmq_consumer_base
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_address
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||
from oslo_messaging._i18n import _LE, _LI
|
||||
@ -45,19 +46,33 @@ class PullConsumer(zmq_consumer_base.SingleSocketConsumer):
|
||||
|
||||
def __init__(self, conf, poller, server):
|
||||
super(PullConsumer, self).__init__(conf, poller, server, zmq.PULL)
|
||||
self.matchmaker = server.matchmaker
|
||||
self.host = zmq_address.combine_address(self.conf.rpc_zmq_host,
|
||||
self.port)
|
||||
self.targets = zmq_consumer_base.TargetsManager(
|
||||
conf, self.matchmaker, self.host, zmq.PULL)
|
||||
LOG.info(_LI("[%s] Run PULL consumer"), self.host)
|
||||
|
||||
def listen(self, target):
|
||||
LOG.info(_LI("Listen to target %s"), str(target))
|
||||
# Do nothing here because we have a single socket
|
||||
self.targets.listen(target)
|
||||
|
||||
def cleanup(self):
|
||||
super(PullConsumer, self).cleanup()
|
||||
self.targets.cleanup()
|
||||
|
||||
def receive_message(self, socket):
|
||||
try:
|
||||
msg_type = socket.recv_string()
|
||||
request = socket.recv_pyobj()
|
||||
msg_type = request.msg_type
|
||||
assert msg_type is not None, 'Bad format: msg type expected'
|
||||
context = socket.recv_pyobj()
|
||||
message = socket.recv_pyobj()
|
||||
LOG.debug("Received %(msg_type)s message %(msg)s",
|
||||
{"msg_type": msg_type, "msg": str(message)})
|
||||
context = request.context
|
||||
message = request.message
|
||||
LOG.debug("[%(host)s] Received %(type)s, %(id)s, %(target)s",
|
||||
{"host": self.host,
|
||||
"type": request.msg_type,
|
||||
"id": request.message_id,
|
||||
"target": request.target})
|
||||
|
||||
if msg_type in (zmq_names.CAST_TYPES + zmq_names.NOTIFY_TYPES):
|
||||
return PullIncomingMessage(self.server, context, message)
|
||||
|
@ -13,8 +13,6 @@
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
import threading
|
||||
import time
|
||||
|
||||
from oslo_messaging._drivers import base
|
||||
from oslo_messaging._drivers.zmq_driver.server.consumers\
|
||||
@ -58,7 +56,8 @@ class RouterConsumer(zmq_consumer_base.SingleSocketConsumer):
|
||||
self.matchmaker = server.matchmaker
|
||||
self.host = zmq_address.combine_address(self.conf.rpc_zmq_host,
|
||||
self.port)
|
||||
self.targets = TargetsManager(conf, self.matchmaker, self.host)
|
||||
self.targets = zmq_consumer_base.TargetsManager(
|
||||
conf, self.matchmaker, self.host, zmq.ROUTER)
|
||||
LOG.info(_LI("[%s] Run ROUTER consumer"), self.host)
|
||||
|
||||
def listen(self, target):
|
||||
@ -98,39 +97,3 @@ class RouterConsumer(zmq_consumer_base.SingleSocketConsumer):
|
||||
|
||||
except zmq.ZMQError as e:
|
||||
LOG.error(_LE("Receiving message failed: %s"), str(e))
|
||||
|
||||
|
||||
class TargetsManager(object):
|
||||
|
||||
def __init__(self, conf, matchmaker, host):
|
||||
self.targets = []
|
||||
self.conf = conf
|
||||
self.matchmaker = matchmaker
|
||||
self.host = host
|
||||
self.targets_lock = threading.Lock()
|
||||
self.updater = zmq_async.get_executor(method=self._update_targets) \
|
||||
if conf.zmq_target_expire > 0 else None
|
||||
if self.updater:
|
||||
self.updater.execute()
|
||||
|
||||
def _update_targets(self):
|
||||
with self.targets_lock:
|
||||
for target in self.targets:
|
||||
self.matchmaker.register(
|
||||
target, self.host, zmq_names.socket_type_str(zmq.ROUTER))
|
||||
|
||||
# Update target-records once per half expiration time
|
||||
time.sleep(self.conf.zmq_target_expire / 2)
|
||||
|
||||
def listen(self, target):
|
||||
with self.targets_lock:
|
||||
self.targets.append(target)
|
||||
self.matchmaker.register(target, self.host,
|
||||
zmq_names.socket_type_str(zmq.ROUTER))
|
||||
|
||||
def cleanup(self):
|
||||
if self.updater:
|
||||
self.updater.stop()
|
||||
for target in self.targets:
|
||||
self.matchmaker.unregister(target, self.host,
|
||||
zmq_names.socket_type_str(zmq.ROUTER))
|
||||
|
@ -16,6 +16,8 @@ import copy
|
||||
import logging
|
||||
|
||||
from oslo_messaging._drivers import base
|
||||
from oslo_messaging._drivers.zmq_driver.server.consumers\
|
||||
import zmq_pull_consumer
|
||||
from oslo_messaging._drivers.zmq_driver.server.consumers\
|
||||
import zmq_router_consumer
|
||||
from oslo_messaging._drivers.zmq_driver.server.consumers\
|
||||
@ -36,12 +38,14 @@ class ZmqServer(base.Listener):
|
||||
self.poller = zmq_async.get_poller()
|
||||
self.router_consumer = zmq_router_consumer.RouterConsumer(
|
||||
conf, self.poller, self)
|
||||
self.pull_consumer = zmq_pull_consumer.PullConsumer(
|
||||
conf, self.poller, self)
|
||||
self.sub_consumer = zmq_sub_consumer.SubConsumer(
|
||||
conf, self.poller, self) if conf.use_pub_sub else None
|
||||
self.notify_consumer = self.sub_consumer if conf.use_pub_sub \
|
||||
else self.router_consumer
|
||||
|
||||
self.consumers = [self.router_consumer]
|
||||
self.consumers = [self.router_consumer, self.pull_consumer]
|
||||
if self.sub_consumer:
|
||||
self.consumers.append(self.sub_consumer)
|
||||
|
||||
@ -62,9 +66,8 @@ class ZmqServer(base.Listener):
|
||||
consumer.cleanup()
|
||||
|
||||
def listen(self, target):
|
||||
consumer = self.router_consumer
|
||||
consumer.listen(target)
|
||||
|
||||
self.router_consumer.listen(target)
|
||||
self.pull_consumer.listen(target)
|
||||
if self.sub_consumer:
|
||||
self.sub_consumer.listen(target)
|
||||
|
||||
|
@ -157,8 +157,7 @@ class Server(object):
|
||||
LOG.debug("Waiting for the stop signal ...")
|
||||
time.sleep(1)
|
||||
self.rpc_server.stop()
|
||||
LOG.debug("Leaving process T:%s Pid:%d", (str(target),
|
||||
os.getpid()))
|
||||
LOG.debug("Leaving process T:%s Pid:%d", str(target), os.getpid())
|
||||
|
||||
def cleanup(self):
|
||||
LOG.debug("Stopping server")
|
||||
|
Loading…
x
Reference in New Issue
Block a user