Merge "[zmq] Host name and target in socket identity"
This commit is contained in:
commit
4f8fcb332d
@ -13,6 +13,9 @@
|
|||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
import uuid
|
||||||
|
|
||||||
|
import six
|
||||||
|
|
||||||
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
|
from oslo_messaging._drivers.zmq_driver.client.publishers.dealer \
|
||||||
import zmq_dealer_publisher_base
|
import zmq_dealer_publisher_base
|
||||||
@ -41,12 +44,17 @@ class DealerPublisherProxy(zmq_dealer_publisher_base.DealerPublisherBase):
|
|||||||
receiver = zmq_receivers.ReplyReceiverProxy(conf)
|
receiver = zmq_receivers.ReplyReceiverProxy(conf)
|
||||||
super(DealerPublisherProxy, self).__init__(conf, matchmaker, sender,
|
super(DealerPublisherProxy, self).__init__(conf, matchmaker, sender,
|
||||||
receiver)
|
receiver)
|
||||||
self.socket = self.sockets_manager.get_socket_to_publishers()
|
self.socket = self.sockets_manager.get_socket_to_publishers(
|
||||||
|
self._generate_identity())
|
||||||
self.routing_table = zmq_routing_table.RoutingTable(self.conf,
|
self.routing_table = zmq_routing_table.RoutingTable(self.conf,
|
||||||
self.matchmaker)
|
self.matchmaker)
|
||||||
self.connection_updater = \
|
self.connection_updater = \
|
||||||
PublisherConnectionUpdater(self.conf, self.matchmaker, self.socket)
|
PublisherConnectionUpdater(self.conf, self.matchmaker, self.socket)
|
||||||
|
|
||||||
|
def _generate_identity(self):
|
||||||
|
return six.b(self.conf.oslo_messaging_zmq.rpc_zmq_host + "/" +
|
||||||
|
str(uuid.uuid4()))
|
||||||
|
|
||||||
def connect_socket(self, request):
|
def connect_socket(self, request):
|
||||||
return self.socket
|
return self.socket
|
||||||
|
|
||||||
|
@ -72,23 +72,25 @@ class SocketsManager(object):
|
|||||||
self._get_hosts_and_connect(socket, target)
|
self._get_hosts_and_connect(socket, target)
|
||||||
return socket
|
return socket
|
||||||
|
|
||||||
def get_socket_to_publishers(self):
|
def get_socket_to_publishers(self, identity=None):
|
||||||
if self.socket_to_publishers is not None:
|
if self.socket_to_publishers is not None:
|
||||||
return self.socket_to_publishers
|
return self.socket_to_publishers
|
||||||
self.socket_to_publishers = zmq_socket.ZmqSocket(
|
self.socket_to_publishers = zmq_socket.ZmqSocket(
|
||||||
self.conf, self.zmq_context, self.socket_type,
|
self.conf, self.zmq_context, self.socket_type,
|
||||||
immediate=self.conf.oslo_messaging_zmq.zmq_immediate)
|
immediate=self.conf.oslo_messaging_zmq.zmq_immediate,
|
||||||
|
identity=identity)
|
||||||
publishers = self.matchmaker.get_publishers()
|
publishers = self.matchmaker.get_publishers()
|
||||||
for pub_address, router_address in publishers:
|
for pub_address, router_address in publishers:
|
||||||
self.socket_to_publishers.connect_to_host(router_address)
|
self.socket_to_publishers.connect_to_host(router_address)
|
||||||
return self.socket_to_publishers
|
return self.socket_to_publishers
|
||||||
|
|
||||||
def get_socket_to_routers(self):
|
def get_socket_to_routers(self, identity=None):
|
||||||
if self.socket_to_routers is not None:
|
if self.socket_to_routers is not None:
|
||||||
return self.socket_to_routers
|
return self.socket_to_routers
|
||||||
self.socket_to_routers = zmq_socket.ZmqSocket(
|
self.socket_to_routers = zmq_socket.ZmqSocket(
|
||||||
self.conf, self.zmq_context, self.socket_type,
|
self.conf, self.zmq_context, self.socket_type,
|
||||||
immediate=self.conf.oslo_messaging_zmq.zmq_immediate)
|
immediate=self.conf.oslo_messaging_zmq.zmq_immediate,
|
||||||
|
identity=identity)
|
||||||
routers = self.matchmaker.get_routers()
|
routers = self.matchmaker.get_routers()
|
||||||
for router_address in routers:
|
for router_address in routers:
|
||||||
self.socket_to_routers.connect_to_host(router_address)
|
self.socket_to_routers.connect_to_host(router_address)
|
||||||
|
@ -105,10 +105,12 @@ class UniversalQueueProxy(object):
|
|||||||
socket.send(b'', zmq.SNDMORE)
|
socket.send(b'', zmq.SNDMORE)
|
||||||
socket.send(reply_id, zmq.SNDMORE)
|
socket.send(reply_id, zmq.SNDMORE)
|
||||||
socket.send(six.b(str(message_type)), zmq.SNDMORE)
|
socket.send(six.b(str(message_type)), zmq.SNDMORE)
|
||||||
LOG.debug("Dispatching %(msg_type)s message %(msg_id)s - to %(rkey)s" %
|
LOG.debug("Dispatching %(msg_type)s message %(msg_id)s - from %(rid)s "
|
||||||
|
"to -> %(rkey)s" %
|
||||||
{"msg_type": zmq_names.message_type_str(message_type),
|
{"msg_type": zmq_names.message_type_str(message_type),
|
||||||
"msg_id": message_id,
|
"msg_id": message_id,
|
||||||
"rkey": routing_key})
|
"rkey": routing_key,
|
||||||
|
"rid": reply_id})
|
||||||
socket.send_multipart(multipart_message)
|
socket.send_multipart(multipart_message)
|
||||||
|
|
||||||
def cleanup(self):
|
def cleanup(self):
|
||||||
|
@ -13,6 +13,9 @@
|
|||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
import uuid
|
||||||
|
|
||||||
|
import six
|
||||||
|
|
||||||
from oslo_messaging._drivers import common as rpc_common
|
from oslo_messaging._drivers import common as rpc_common
|
||||||
from oslo_messaging._drivers.zmq_driver.client import zmq_senders
|
from oslo_messaging._drivers.zmq_driver.client import zmq_senders
|
||||||
@ -21,6 +24,7 @@ from oslo_messaging._drivers.zmq_driver.server.consumers \
|
|||||||
import zmq_consumer_base
|
import zmq_consumer_base
|
||||||
from oslo_messaging._drivers.zmq_driver.server import zmq_incoming_message
|
from oslo_messaging._drivers.zmq_driver.server import zmq_incoming_message
|
||||||
from oslo_messaging._drivers.zmq_driver.server import zmq_ttl_cache
|
from oslo_messaging._drivers.zmq_driver.server import zmq_ttl_cache
|
||||||
|
from oslo_messaging._drivers.zmq_driver import zmq_address
|
||||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||||
from oslo_messaging._drivers.zmq_driver import zmq_names
|
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||||
from oslo_messaging._drivers.zmq_driver import zmq_updater
|
from oslo_messaging._drivers.zmq_driver import zmq_updater
|
||||||
@ -47,9 +51,15 @@ class DealerConsumer(zmq_consumer_base.SingleSocketConsumer):
|
|||||||
conf, self.matchmaker, self.socket)
|
conf, self.matchmaker, self.socket)
|
||||||
LOG.info(_LI("[%s] Run DEALER consumer"), self.host)
|
LOG.info(_LI("[%s] Run DEALER consumer"), self.host)
|
||||||
|
|
||||||
|
def _generate_identity(self):
|
||||||
|
return six.b(self.conf.oslo_messaging_zmq.rpc_zmq_host + "/" +
|
||||||
|
zmq_address.target_to_key(self.target) + "/" +
|
||||||
|
str(uuid.uuid4()))
|
||||||
|
|
||||||
def subscribe_socket(self, socket_type):
|
def subscribe_socket(self, socket_type):
|
||||||
try:
|
try:
|
||||||
socket = self.sockets_manager.get_socket_to_routers()
|
socket = self.sockets_manager.get_socket_to_routers(
|
||||||
|
self._generate_identity())
|
||||||
self.sockets.append(socket)
|
self.sockets.append(socket)
|
||||||
self.host = socket.handle.identity
|
self.host = socket.handle.identity
|
||||||
self.poller.register(socket, self.receive_message)
|
self.poller.register(socket, self.receive_message)
|
||||||
|
@ -35,10 +35,11 @@ def prefix_str(key, listener_type):
|
|||||||
return listener_type + "_" + key
|
return listener_type + "_" + key
|
||||||
|
|
||||||
|
|
||||||
def target_to_key(target, listener_type):
|
def target_to_key(target, listener_type=None):
|
||||||
|
|
||||||
def prefix(key):
|
def prefix(key):
|
||||||
return prefix_str(key, listener_type)
|
return prefix_str(key, listener_type) if listener_type is not None \
|
||||||
|
else key
|
||||||
|
|
||||||
if target.topic and target.server:
|
if target.topic and target.server:
|
||||||
attributes = ['topic', 'server']
|
attributes = ['topic', 'server']
|
||||||
|
@ -39,7 +39,7 @@ class ZmqSocket(object):
|
|||||||
}
|
}
|
||||||
|
|
||||||
def __init__(self, conf, context, socket_type, immediate,
|
def __init__(self, conf, context, socket_type, immediate,
|
||||||
high_watermark=0):
|
high_watermark=0, identity=None):
|
||||||
self.conf = conf
|
self.conf = conf
|
||||||
self.context = context
|
self.context = context
|
||||||
self.socket_type = socket_type
|
self.socket_type = socket_type
|
||||||
@ -53,7 +53,8 @@ class ZmqSocket(object):
|
|||||||
self.handle.setsockopt(zmq.LINGER, self.close_linger)
|
self.handle.setsockopt(zmq.LINGER, self.close_linger)
|
||||||
# Put messages to only connected queues
|
# Put messages to only connected queues
|
||||||
self.handle.setsockopt(zmq.IMMEDIATE, 1 if immediate else 0)
|
self.handle.setsockopt(zmq.IMMEDIATE, 1 if immediate else 0)
|
||||||
self.handle.identity = six.b(str(uuid.uuid4()))
|
self.handle.identity = six.b(str(uuid.uuid4())) if identity is None \
|
||||||
|
else identity
|
||||||
self.connections = set()
|
self.connections = set()
|
||||||
|
|
||||||
def _get_serializer(self, serialization):
|
def _get_serializer(self, serialization):
|
||||||
|
Loading…
Reference in New Issue
Block a user