[zmq] Refactoring of zmq client
Split ZmqClient in order to create different clients for different configurations (direct/proxy). Change-Id: Ib374f62f53f2c82278ce5bc555ea440e8eba6617
This commit is contained in:
parent
acb398e317
commit
8ee19159d2
@ -182,12 +182,18 @@ class ZmqDriver(base.BaseDriver):
|
|||||||
self.get_matchmaker_backend(url),
|
self.get_matchmaker_backend(url),
|
||||||
).driver(self.conf, url=url)
|
).driver(self.conf, url=url)
|
||||||
|
|
||||||
|
client_cls = zmq_client.ZmqClientProxy
|
||||||
|
if conf.use_pub_sub and not conf.use_router_proxy:
|
||||||
|
client_cls = zmq_client.ZmqClientMixDirectPubSub
|
||||||
|
elif not conf.use_pub_sub and not conf.use_router_proxy:
|
||||||
|
client_cls = zmq_client.ZmqClientDirect
|
||||||
|
|
||||||
self.client = LazyDriverItem(
|
self.client = LazyDriverItem(
|
||||||
zmq_client.ZmqClient, self.conf, self.matchmaker,
|
client_cls, self.conf, self.matchmaker,
|
||||||
self.allowed_remote_exmods)
|
self.allowed_remote_exmods)
|
||||||
|
|
||||||
self.notifier = LazyDriverItem(
|
self.notifier = LazyDriverItem(
|
||||||
zmq_client.ZmqClient, self.conf, self.matchmaker,
|
client_cls, self.conf, self.matchmaker,
|
||||||
self.allowed_remote_exmods)
|
self.allowed_remote_exmods)
|
||||||
|
|
||||||
super(ZmqDriver, self).__init__(conf, url, default_exchange,
|
super(ZmqDriver, self).__init__(conf, url, default_exchange,
|
||||||
|
@ -112,30 +112,37 @@ class SocketsManager(object):
|
|||||||
self.socket_to_publishers = None
|
self.socket_to_publishers = None
|
||||||
self.socket_to_routers = None
|
self.socket_to_routers = None
|
||||||
|
|
||||||
def _track_socket(self, socket, target):
|
|
||||||
self.outbound_sockets[str(target)] = (socket, time.time())
|
|
||||||
|
|
||||||
def get_hosts(self, target):
|
def get_hosts(self, target):
|
||||||
return self.matchmaker.get_hosts(
|
return self.matchmaker.get_hosts(
|
||||||
target, zmq_names.socket_type_str(self.listener_type))
|
target, zmq_names.socket_type_str(self.listener_type))
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _key_from_target(target):
|
||||||
|
return target.topic if target.fanout else str(target)
|
||||||
|
|
||||||
def _get_hosts_and_connect(self, socket, target):
|
def _get_hosts_and_connect(self, socket, target):
|
||||||
hosts = self.get_hosts(target)
|
hosts = self.get_hosts(target)
|
||||||
self._connect_to_hosts(socket, target, hosts)
|
self._connect_to_hosts(socket, target, hosts)
|
||||||
|
|
||||||
|
def _track_socket(self, socket, target):
|
||||||
|
key = self._key_from_target(target)
|
||||||
|
self.outbound_sockets[key] = (socket, time.time())
|
||||||
|
|
||||||
def _connect_to_hosts(self, socket, target, hosts):
|
def _connect_to_hosts(self, socket, target, hosts):
|
||||||
for host in hosts:
|
for host in hosts:
|
||||||
socket.connect_to_host(host)
|
socket.connect_to_host(host)
|
||||||
self._track_socket(socket, target)
|
self._track_socket(socket, target)
|
||||||
|
|
||||||
def _check_for_new_hosts(self, target):
|
def _check_for_new_hosts(self, target):
|
||||||
socket, tm = self.outbound_sockets[str(target)]
|
key = self._key_from_target(target)
|
||||||
|
socket, tm = self.outbound_sockets[key]
|
||||||
if 0 <= self.conf.zmq_target_expire <= time.time() - tm:
|
if 0 <= self.conf.zmq_target_expire <= time.time() - tm:
|
||||||
self._get_hosts_and_connect(socket, target)
|
self._get_hosts_and_connect(socket, target)
|
||||||
return socket
|
return socket
|
||||||
|
|
||||||
def get_socket(self, target):
|
def get_socket(self, target):
|
||||||
if str(target) in self.outbound_sockets:
|
key = self._key_from_target(target)
|
||||||
|
if key in self.outbound_sockets:
|
||||||
socket = self._check_for_new_hosts(target)
|
socket = self._check_for_new_hosts(target)
|
||||||
else:
|
else:
|
||||||
socket = zmq_socket.ZmqSocket(self.conf, self.zmq_context,
|
socket = zmq_socket.ZmqSocket(self.conf, self.zmq_context,
|
||||||
@ -143,16 +150,6 @@ class SocketsManager(object):
|
|||||||
self._get_hosts_and_connect(socket, target)
|
self._get_hosts_and_connect(socket, target)
|
||||||
return socket
|
return socket
|
||||||
|
|
||||||
def get_socket_to_hosts(self, target, hosts):
|
|
||||||
key = str(target)
|
|
||||||
if key in self.outbound_sockets:
|
|
||||||
socket, tm = self.outbound_sockets[key]
|
|
||||||
else:
|
|
||||||
socket = zmq_socket.ZmqSocket(self.conf, self.zmq_context,
|
|
||||||
self.socket_type)
|
|
||||||
self._connect_to_hosts(socket, target, hosts)
|
|
||||||
return socket
|
|
||||||
|
|
||||||
def get_socket_to_publishers(self):
|
def get_socket_to_publishers(self):
|
||||||
if self.socket_to_publishers is not None:
|
if self.socket_to_publishers is not None:
|
||||||
return self.socket_to_publishers
|
return self.socket_to_publishers
|
||||||
|
@ -13,6 +13,7 @@
|
|||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
|
|
||||||
|
from oslo_messaging._drivers import common
|
||||||
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
|
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
|
||||||
import zmq_dealer_call_publisher
|
import zmq_dealer_call_publisher
|
||||||
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
|
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
|
||||||
@ -28,45 +29,102 @@ from oslo_messaging._drivers.zmq_driver import zmq_names
|
|||||||
zmq = zmq_async.import_zmq()
|
zmq = zmq_async.import_zmq()
|
||||||
|
|
||||||
|
|
||||||
class ZmqClient(zmq_client_base.ZmqClientBase):
|
class WrongClientException(common.RPCException):
|
||||||
|
"""Raised if client type doesn't match configuration"""
|
||||||
|
|
||||||
|
|
||||||
|
class ZmqClientMixDirectPubSub(zmq_client_base.ZmqClientBase):
|
||||||
|
"""Client for using with direct connections and fanout over proxy:
|
||||||
|
|
||||||
|
use_pub_sub = true
|
||||||
|
use_router_proxy = false
|
||||||
|
|
||||||
|
"""
|
||||||
|
|
||||||
def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None):
|
def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None):
|
||||||
|
|
||||||
|
if conf.use_router_proxy or not conf.use_pub_sub:
|
||||||
|
raise WrongClientException()
|
||||||
|
|
||||||
self.sockets_manager = zmq_publisher_base.SocketsManager(
|
self.sockets_manager = zmq_publisher_base.SocketsManager(
|
||||||
conf, matchmaker, zmq.ROUTER, zmq.DEALER)
|
conf, matchmaker, zmq.ROUTER, zmq.DEALER)
|
||||||
|
|
||||||
default_publisher = zmq_dealer_publisher.DealerPublisher(
|
fanout_publisher = zmq_dealer_publisher_proxy.DealerPublisherProxy(
|
||||||
conf, matchmaker)
|
|
||||||
|
|
||||||
publisher_to_proxy = zmq_dealer_publisher_proxy.DealerPublisherProxy(
|
|
||||||
conf, matchmaker, self.sockets_manager.get_socket_to_publishers())
|
conf, matchmaker, self.sockets_manager.get_socket_to_publishers())
|
||||||
|
|
||||||
call_publisher = zmq_dealer_publisher_proxy.DealerCallPublisherProxy(
|
super(ZmqClientMixDirectPubSub, self).__init__(
|
||||||
conf, matchmaker, self.sockets_manager) if conf.use_router_proxy \
|
|
||||||
else zmq_dealer_call_publisher.DealerCallPublisher(
|
|
||||||
conf, matchmaker, self.sockets_manager)
|
|
||||||
|
|
||||||
cast_publisher = publisher_to_proxy if conf.use_router_proxy \
|
|
||||||
else zmq_dealer_publisher.DealerPublisherAsync(
|
|
||||||
conf, matchmaker)
|
|
||||||
|
|
||||||
fanout_publisher = publisher_to_proxy \
|
|
||||||
if conf.use_pub_sub else default_publisher
|
|
||||||
|
|
||||||
super(ZmqClient, self).__init__(
|
|
||||||
conf, matchmaker, allowed_remote_exmods,
|
conf, matchmaker, allowed_remote_exmods,
|
||||||
publishers={
|
publishers={
|
||||||
zmq_names.CALL_TYPE: call_publisher,
|
zmq_names.CALL_TYPE:
|
||||||
|
zmq_dealer_call_publisher.DealerCallPublisher(
|
||||||
|
conf, matchmaker, self.sockets_manager),
|
||||||
|
|
||||||
zmq_names.CAST_TYPE: cast_publisher,
|
|
||||||
|
|
||||||
# Here use DealerPublisherLight for sending request to proxy
|
|
||||||
# which finally uses PubPublisher to send fanout in case of
|
|
||||||
# 'use_pub_sub' option configured.
|
|
||||||
zmq_names.CAST_FANOUT_TYPE: fanout_publisher,
|
zmq_names.CAST_FANOUT_TYPE: fanout_publisher,
|
||||||
|
|
||||||
zmq_names.NOTIFY_TYPE: fanout_publisher,
|
zmq_names.NOTIFY_TYPE: fanout_publisher,
|
||||||
|
|
||||||
"default": default_publisher
|
"default": zmq_dealer_publisher.DealerPublisherAsync(
|
||||||
|
conf, matchmaker)
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class ZmqClientDirect(zmq_client_base.ZmqClientBase):
|
||||||
|
"""This kind of client (publishers combination) is to be used for
|
||||||
|
direct connections only:
|
||||||
|
|
||||||
|
use_pub_sub = false
|
||||||
|
use_router_proxy = false
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None):
|
||||||
|
|
||||||
|
if conf.use_pub_sub or conf.use_router_proxy:
|
||||||
|
raise WrongClientException()
|
||||||
|
|
||||||
|
self.sockets_manager = zmq_publisher_base.SocketsManager(
|
||||||
|
conf, matchmaker, zmq.ROUTER, zmq.DEALER)
|
||||||
|
|
||||||
|
super(ZmqClientDirect, self).__init__(
|
||||||
|
conf, matchmaker, allowed_remote_exmods,
|
||||||
|
publishers={
|
||||||
|
zmq_names.CALL_TYPE:
|
||||||
|
zmq_dealer_call_publisher.DealerCallPublisher(
|
||||||
|
conf, matchmaker, self.sockets_manager),
|
||||||
|
|
||||||
|
"default": zmq_dealer_publisher.DealerPublisher(
|
||||||
|
conf, matchmaker)
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class ZmqClientProxy(zmq_client_base.ZmqClientBase):
|
||||||
|
"""Client for using with proxy:
|
||||||
|
|
||||||
|
use_pub_sub = true
|
||||||
|
use_router_proxy = true
|
||||||
|
or
|
||||||
|
use_pub_sub = false
|
||||||
|
use_router_proxy = true
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, conf, matchmaker=None, allowed_remote_exmods=None):
|
||||||
|
|
||||||
|
if not conf.use_router_proxy:
|
||||||
|
raise WrongClientException()
|
||||||
|
|
||||||
|
self.sockets_manager = zmq_publisher_base.SocketsManager(
|
||||||
|
conf, matchmaker, zmq.ROUTER, zmq.DEALER)
|
||||||
|
|
||||||
|
super(ZmqClientProxy, self).__init__(
|
||||||
|
conf, matchmaker, allowed_remote_exmods,
|
||||||
|
publishers={
|
||||||
|
zmq_names.CALL_TYPE:
|
||||||
|
zmq_dealer_publisher_proxy.DealerCallPublisherProxy(
|
||||||
|
conf, matchmaker, self.sockets_manager),
|
||||||
|
|
||||||
|
"default": zmq_dealer_publisher_proxy.DealerPublisherProxy(
|
||||||
|
conf, matchmaker,
|
||||||
|
self.sockets_manager.get_socket_to_publishers())
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
@ -190,7 +190,7 @@ class RedisMatchMaker(base.MatchMakerBase):
|
|||||||
key = zmq_address.target_to_key(target, listener_type)
|
key = zmq_address.target_to_key(target, listener_type)
|
||||||
hosts.extend(self._get_hosts_by_key(key))
|
hosts.extend(self._get_hosts_by_key(key))
|
||||||
|
|
||||||
if not hosts and target.topic and target.server:
|
if (not hosts or target.fanout) and target.topic and target.server:
|
||||||
key = zmq_address.prefix_str(target.topic, listener_type)
|
key = zmq_address.prefix_str(target.topic, listener_type)
|
||||||
hosts.extend(self._get_hosts_by_key(key))
|
hosts.extend(self._get_hosts_by_key(key))
|
||||||
|
|
||||||
|
@ -16,6 +16,7 @@ cat > ${DATADIR}/zmq.conf <<EOF
|
|||||||
transport_url=${TRANSPORT_URL}
|
transport_url=${TRANSPORT_URL}
|
||||||
rpc_zmq_matchmaker=${ZMQ_MATCHMAKER}
|
rpc_zmq_matchmaker=${ZMQ_MATCHMAKER}
|
||||||
rpc_zmq_ipc_dir=${ZMQ_IPC_DIR}
|
rpc_zmq_ipc_dir=${ZMQ_IPC_DIR}
|
||||||
|
use_pub_sub=true
|
||||||
use_router_proxy=true
|
use_router_proxy=true
|
||||||
[matchmaker_redis]
|
[matchmaker_redis]
|
||||||
port=${ZMQ_REDIS_PORT}
|
port=${ZMQ_REDIS_PORT}
|
||||||
|
Loading…
Reference in New Issue
Block a user