Merge "Improvement of logging acorrding to oslo.i18n guideline"
This commit is contained in:
commit
6e95e8b891
@ -28,6 +28,7 @@ from oslo_messaging._drivers import amqp as rpc_amqp
|
|||||||
from oslo_messaging._drivers import base
|
from oslo_messaging._drivers import base
|
||||||
from oslo_messaging._drivers import common as rpc_common
|
from oslo_messaging._drivers import common as rpc_common
|
||||||
from oslo_messaging._i18n import _
|
from oslo_messaging._i18n import _
|
||||||
|
from oslo_messaging._i18n import _LE
|
||||||
from oslo_messaging._i18n import _LI
|
from oslo_messaging._i18n import _LI
|
||||||
from oslo_messaging._i18n import _LW
|
from oslo_messaging._i18n import _LW
|
||||||
|
|
||||||
@ -64,7 +65,7 @@ class AMQPIncomingMessage(base.IncomingMessage):
|
|||||||
unique_id = msg[rpc_amqp.UNIQUE_ID]
|
unique_id = msg[rpc_amqp.UNIQUE_ID]
|
||||||
|
|
||||||
LOG.debug("sending reply msg_id: %(msg_id)s "
|
LOG.debug("sending reply msg_id: %(msg_id)s "
|
||||||
"reply queue: %(reply_q)s" % {
|
"reply queue: %(reply_q)s", {
|
||||||
'msg_id': self.msg_id,
|
'msg_id': self.msg_id,
|
||||||
'unique_id': unique_id,
|
'unique_id': unique_id,
|
||||||
'reply_q': self.reply_q})
|
'reply_q': self.reply_q})
|
||||||
@ -99,7 +100,7 @@ class AMQPIncomingMessage(base.IncomingMessage):
|
|||||||
if timer.check_return() > 0:
|
if timer.check_return() > 0:
|
||||||
LOG.debug(("The reply %(msg_id)s cannot be sent "
|
LOG.debug(("The reply %(msg_id)s cannot be sent "
|
||||||
"%(reply_q)s reply queue don't exist, "
|
"%(reply_q)s reply queue don't exist, "
|
||||||
"retrying...") % {
|
"retrying..."), {
|
||||||
'msg_id': self.msg_id,
|
'msg_id': self.msg_id,
|
||||||
'reply_q': self.reply_q})
|
'reply_q': self.reply_q})
|
||||||
time.sleep(0.25)
|
time.sleep(0.25)
|
||||||
@ -107,7 +108,7 @@ class AMQPIncomingMessage(base.IncomingMessage):
|
|||||||
self._obsolete_reply_queues.add(self.reply_q, self.msg_id)
|
self._obsolete_reply_queues.add(self.reply_q, self.msg_id)
|
||||||
LOG.info(_LI("The reply %(msg_id)s cannot be sent "
|
LOG.info(_LI("The reply %(msg_id)s cannot be sent "
|
||||||
"%(reply_q)s reply queue don't exist after "
|
"%(reply_q)s reply queue don't exist after "
|
||||||
"%(duration)s sec abandoning...") % {
|
"%(duration)s sec abandoning..."), {
|
||||||
'msg_id': self.msg_id,
|
'msg_id': self.msg_id,
|
||||||
'reply_q': self.reply_q,
|
'reply_q': self.reply_q,
|
||||||
'duration': duration})
|
'duration': duration})
|
||||||
@ -192,7 +193,7 @@ class AMQPListener(base.Listener):
|
|||||||
|
|
||||||
unique_id = self.msg_id_cache.check_duplicate_message(message)
|
unique_id = self.msg_id_cache.check_duplicate_message(message)
|
||||||
|
|
||||||
LOG.debug("received message msg_id: %(msg_id)s reply to %(queue)s" % {
|
LOG.debug("received message msg_id: %(msg_id)s reply to %(queue)s", {
|
||||||
'queue': ctxt.reply_q, 'msg_id': ctxt.msg_id})
|
'queue': ctxt.reply_q, 'msg_id': ctxt.msg_id})
|
||||||
|
|
||||||
self.incoming.append(AMQPIncomingMessage(self,
|
self.incoming.append(AMQPIncomingMessage(self,
|
||||||
@ -250,10 +251,11 @@ class ReplyWaiters(object):
|
|||||||
def add(self, msg_id):
|
def add(self, msg_id):
|
||||||
self._queues[msg_id] = moves.queue.Queue()
|
self._queues[msg_id] = moves.queue.Queue()
|
||||||
if len(self._queues) > self._wrn_threshold:
|
if len(self._queues) > self._wrn_threshold:
|
||||||
LOG.warn('Number of call queues is greater than warning '
|
LOG.warn(_LW('Number of call queues is greater than warning '
|
||||||
'threshold: %d. There could be a leak. Increasing'
|
'threshold: %(old_threshold)s. There could be a '
|
||||||
' threshold to: %d', self._wrn_threshold,
|
'leak. Increasing threshold to: %(threshold)s'),
|
||||||
self._wrn_threshold * 2)
|
{'old_threshold': self._wrn_threshold,
|
||||||
|
'threshold': self._wrn_threshold * 2})
|
||||||
self._wrn_threshold *= 2
|
self._wrn_threshold *= 2
|
||||||
|
|
||||||
def remove(self, msg_id):
|
def remove(self, msg_id):
|
||||||
@ -286,14 +288,14 @@ class ReplyWaiter(object):
|
|||||||
try:
|
try:
|
||||||
self.conn.consume()
|
self.conn.consume()
|
||||||
except Exception:
|
except Exception:
|
||||||
LOG.exception("Failed to process incoming message, "
|
LOG.exception(_LE("Failed to process incoming message, "
|
||||||
"retrying...")
|
"retrying..."))
|
||||||
|
|
||||||
def __call__(self, message):
|
def __call__(self, message):
|
||||||
message.acknowledge()
|
message.acknowledge()
|
||||||
incoming_msg_id = message.pop('_msg_id', None)
|
incoming_msg_id = message.pop('_msg_id', None)
|
||||||
if message.get('ending'):
|
if message.get('ending'):
|
||||||
LOG.debug("received reply msg_id: %s" % incoming_msg_id)
|
LOG.debug("received reply msg_id: %s", incoming_msg_id)
|
||||||
self.waiters.put(incoming_msg_id, message)
|
self.waiters.put(incoming_msg_id, message)
|
||||||
|
|
||||||
def listen(self, msg_id):
|
def listen(self, msg_id):
|
||||||
|
@ -85,7 +85,8 @@ class RPCException(Exception):
|
|||||||
except Exception:
|
except Exception:
|
||||||
# kwargs doesn't match a variable in the message
|
# kwargs doesn't match a variable in the message
|
||||||
# log the issue and the kwargs
|
# log the issue and the kwargs
|
||||||
LOG.exception(_LE('Exception in string format operation'))
|
LOG.exception(_LE('Exception in string format operation, '
|
||||||
|
'kwargs are:'))
|
||||||
for name, value in six.iteritems(kwargs):
|
for name, value in six.iteritems(kwargs):
|
||||||
LOG.error("%s: %s", name, value)
|
LOG.error("%s: %s", name, value)
|
||||||
# at least get the core message out if something happened
|
# at least get the core message out if something happened
|
||||||
@ -411,7 +412,7 @@ class ConnectionContext(Connection):
|
|||||||
try:
|
try:
|
||||||
self.connection.reset()
|
self.connection.reset()
|
||||||
except Exception:
|
except Exception:
|
||||||
LOG.exception("Fail to reset the connection, drop it")
|
LOG.exception(_LE("Fail to reset the connection, drop it"))
|
||||||
try:
|
try:
|
||||||
self.connection.close()
|
self.connection.close()
|
||||||
except Exception:
|
except Exception:
|
||||||
|
@ -402,15 +402,15 @@ class Connection(object):
|
|||||||
|
|
||||||
self._url = ''
|
self._url = ''
|
||||||
if self.fake_rabbit:
|
if self.fake_rabbit:
|
||||||
LOG.warn("Deprecated: fake_rabbit option is deprecated, set "
|
LOG.warn(_LW("Deprecated: fake_rabbit option is deprecated, set "
|
||||||
"rpc_backend to kombu+memory or use the fake "
|
"rpc_backend to kombu+memory or use the fake "
|
||||||
"driver instead.")
|
"driver instead."))
|
||||||
self._url = 'memory://%s/' % virtual_host
|
self._url = 'memory://%s/' % virtual_host
|
||||||
elif url.hosts:
|
elif url.hosts:
|
||||||
if url.transport.startswith('kombu+'):
|
if url.transport.startswith('kombu+'):
|
||||||
LOG.warn(_LW('Selecting the kombu transport through the '
|
LOG.warn(_LW('Selecting the kombu transport through the '
|
||||||
'transport url (%s) is a experimental feature '
|
'transport url (%s) is a experimental feature '
|
||||||
'and this is not yet supported.') % url.transport)
|
'and this is not yet supported.'), url.transport)
|
||||||
if len(url.hosts) > 1:
|
if len(url.hosts) > 1:
|
||||||
random.shuffle(url.hosts)
|
random.shuffle(url.hosts)
|
||||||
for host in url.hosts:
|
for host in url.hosts:
|
||||||
@ -600,10 +600,10 @@ class Connection(object):
|
|||||||
|
|
||||||
current_pid = os.getpid()
|
current_pid = os.getpid()
|
||||||
if self._initial_pid != current_pid:
|
if self._initial_pid != current_pid:
|
||||||
LOG.warn("Process forked after connection established! "
|
LOG.warn(_LW("Process forked after connection established! "
|
||||||
"This can result in unpredictable behavior. "
|
"This can result in unpredictable behavior. "
|
||||||
"See: http://docs.openstack.org/developer/"
|
"See: http://docs.openstack.org/developer/"
|
||||||
"oslo_messaging/transport.html")
|
"oslo_messaging/transport.html"))
|
||||||
self._initial_pid = current_pid
|
self._initial_pid = current_pid
|
||||||
|
|
||||||
if retry is None:
|
if retry is None:
|
||||||
|
@ -36,6 +36,7 @@ from six import moves
|
|||||||
|
|
||||||
from oslo_messaging._drivers.protocols.amqp import eventloop
|
from oslo_messaging._drivers.protocols.amqp import eventloop
|
||||||
from oslo_messaging._drivers.protocols.amqp import opts
|
from oslo_messaging._drivers.protocols.amqp import opts
|
||||||
|
from oslo_messaging._i18n import _LE, _LI, _LW
|
||||||
from oslo_messaging import exceptions
|
from oslo_messaging import exceptions
|
||||||
from oslo_messaging import transport
|
from oslo_messaging import transport
|
||||||
|
|
||||||
@ -90,8 +91,8 @@ class Replies(pyngus.ReceiverEventHandler):
|
|||||||
# reply is placed on reply_queue
|
# reply is placed on reply_queue
|
||||||
self._correlation[request.id] = reply_queue
|
self._correlation[request.id] = reply_queue
|
||||||
request.reply_to = self._receiver.source_address
|
request.reply_to = self._receiver.source_address
|
||||||
LOG.debug("Reply for msg id=%s expected on link %s",
|
LOG.debug("Reply for msg id=%(id)s expected on link %(reply_to)s",
|
||||||
request.id, request.reply_to)
|
{'id': request.id, 'reply_to': request.reply_to})
|
||||||
return request.id
|
return request.id
|
||||||
|
|
||||||
def cancel_response(self, msg_id):
|
def cancel_response(self, msg_id):
|
||||||
@ -121,7 +122,7 @@ class Replies(pyngus.ReceiverEventHandler):
|
|||||||
# TODO(kgiusti) Unclear if this error will ever occur (as opposed to
|
# TODO(kgiusti) Unclear if this error will ever occur (as opposed to
|
||||||
# the Connection failing instead). Log for now, possibly implement a
|
# the Connection failing instead). Log for now, possibly implement a
|
||||||
# recovery strategy if necessary.
|
# recovery strategy if necessary.
|
||||||
LOG.error("Reply subscription closed by peer: %s",
|
LOG.error(_LE("Reply subscription closed by peer: %s"),
|
||||||
(pn_condition or "no error given"))
|
(pn_condition or "no error given"))
|
||||||
|
|
||||||
def message_received(self, receiver, message, handle):
|
def message_received(self, receiver, message, handle):
|
||||||
@ -141,8 +142,8 @@ class Replies(pyngus.ReceiverEventHandler):
|
|||||||
del self._correlation[key]
|
del self._correlation[key]
|
||||||
receiver.message_accepted(handle)
|
receiver.message_accepted(handle)
|
||||||
else:
|
else:
|
||||||
LOG.warn("Can't find receiver for response msg id=%s, dropping!",
|
LOG.warn(_LW("Can't find receiver for response msg id=%s, "
|
||||||
key)
|
"dropping!"), key)
|
||||||
receiver.message_modified(handle, True, True, None)
|
receiver.message_modified(handle, True, True, None)
|
||||||
|
|
||||||
def _update_credit(self):
|
def _update_credit(self):
|
||||||
@ -194,12 +195,12 @@ class Server(pyngus.ReceiverEventHandler):
|
|||||||
"""This is a Pyngus callback, invoked by Pyngus when the peer of this
|
"""This is a Pyngus callback, invoked by Pyngus when the peer of this
|
||||||
receiver link has initiated closing the connection.
|
receiver link has initiated closing the connection.
|
||||||
"""
|
"""
|
||||||
text = "Server subscription %(addr)s closed by peer: %(err_msg)s"
|
|
||||||
vals = {
|
vals = {
|
||||||
"addr": receiver.source_address or receiver.target_address,
|
"addr": receiver.source_address or receiver.target_address,
|
||||||
"err_msg": pn_condition or "no error given"
|
"err_msg": pn_condition or "no error given"
|
||||||
}
|
}
|
||||||
LOG.error(text % vals)
|
LOG.error(_LE("Server subscription %(addr)s closed "
|
||||||
|
"by peer: %(err_msg)s"), vals)
|
||||||
|
|
||||||
def message_received(self, receiver, message, handle):
|
def message_received(self, receiver, message, handle):
|
||||||
"""This is a Pyngus callback, invoked by Pyngus when a new message
|
"""This is a Pyngus callback, invoked by Pyngus when a new message
|
||||||
@ -348,7 +349,8 @@ class Controller(pyngus.ConnectionEventHandler):
|
|||||||
will include the reply message (if successful).
|
will include the reply message (if successful).
|
||||||
"""
|
"""
|
||||||
address = self._resolve(target)
|
address = self._resolve(target)
|
||||||
LOG.debug("Sending request for %s to %s", target, address)
|
LOG.debug("Sending request for %(target)s to %(address)s",
|
||||||
|
{'target': target, 'address': address})
|
||||||
if reply_expected:
|
if reply_expected:
|
||||||
msg_id = self._replies.prepare_for_response(request, result_queue)
|
msg_id = self._replies.prepare_for_response(request, result_queue)
|
||||||
|
|
||||||
@ -399,7 +401,8 @@ class Controller(pyngus.ConnectionEventHandler):
|
|||||||
self._subscribe(target, addresses, in_queue)
|
self._subscribe(target, addresses, in_queue)
|
||||||
|
|
||||||
def _subscribe(self, target, addresses, in_queue):
|
def _subscribe(self, target, addresses, in_queue):
|
||||||
LOG.debug("Subscribing to %s (%s)", target, addresses)
|
LOG.debug("Subscribing to %(target)s (%(addresses)s)",
|
||||||
|
{'target': target, 'addresses': addresses})
|
||||||
self._servers[target] = Server(addresses, in_queue)
|
self._servers[target] = Server(addresses, in_queue)
|
||||||
self._servers[target].attach(self._socket_connection.connection)
|
self._servers[target].attach(self._socket_connection.connection)
|
||||||
|
|
||||||
@ -500,7 +503,7 @@ class Controller(pyngus.ConnectionEventHandler):
|
|||||||
try:
|
try:
|
||||||
self._tasks.get(False).execute(self)
|
self._tasks.get(False).execute(self)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
LOG.exception("Error processing task: %s", e)
|
LOG.exception(_LE("Error processing task: %s"), e)
|
||||||
count += 1
|
count += 1
|
||||||
|
|
||||||
# if we hit _max_task_batch, resume task processing later:
|
# if we hit _max_task_batch, resume task processing later:
|
||||||
@ -532,7 +535,7 @@ class Controller(pyngus.ConnectionEventHandler):
|
|||||||
"""Called when the driver destroys the controller, this method attempts
|
"""Called when the driver destroys the controller, this method attempts
|
||||||
to cleanly close the AMQP connection to the peer.
|
to cleanly close the AMQP connection to the peer.
|
||||||
"""
|
"""
|
||||||
LOG.info("Shutting down AMQP connection")
|
LOG.info(_LI("Shutting down AMQP connection"))
|
||||||
self._closing = True
|
self._closing = True
|
||||||
if self._socket_connection.connection.active:
|
if self._socket_connection.connection.active:
|
||||||
# try a clean shutdown
|
# try a clean shutdown
|
||||||
@ -547,8 +550,9 @@ class Controller(pyngus.ConnectionEventHandler):
|
|||||||
"""Invoked when the Replies reply link has become active. At this
|
"""Invoked when the Replies reply link has become active. At this
|
||||||
point, we are ready to send/receive messages (via Task processing).
|
point, we are ready to send/receive messages (via Task processing).
|
||||||
"""
|
"""
|
||||||
LOG.info("Messaging is active (%s:%i)", self.hosts.current.hostname,
|
LOG.info(_LI("Messaging is active (%(hostname)s:%(port)s)"),
|
||||||
self.hosts.current.port)
|
{'hostname': self.hosts.current.hostname,
|
||||||
|
'port': self.hosts.current.port})
|
||||||
self._schedule_task_processing()
|
self._schedule_task_processing()
|
||||||
|
|
||||||
# callback from eventloop on socket error
|
# callback from eventloop on socket error
|
||||||
@ -576,8 +580,9 @@ class Controller(pyngus.ConnectionEventHandler):
|
|||||||
the peer is up. At this point, the driver will activate all subscriber
|
the peer is up. At this point, the driver will activate all subscriber
|
||||||
links (server) and the reply link.
|
links (server) and the reply link.
|
||||||
"""
|
"""
|
||||||
LOG.debug("Connection active (%s:%i), subscribing...",
|
LOG.debug("Connection active (%(hostname)s:%(port)s), subscribing...",
|
||||||
self.hosts.current.hostname, self.hosts.current.port)
|
{'hostname': self.hosts.current.hostname,
|
||||||
|
'port': self.hosts.current.port})
|
||||||
for s in self._servers.values():
|
for s in self._servers.values():
|
||||||
s.attach(self._socket_connection.connection)
|
s.attach(self._socket_connection.connection)
|
||||||
self._replies = Replies(self._socket_connection.connection,
|
self._replies = Replies(self._socket_connection.connection,
|
||||||
@ -603,7 +608,7 @@ class Controller(pyngus.ConnectionEventHandler):
|
|||||||
# connection. Acknowledge the close, and try to reconnect/failover
|
# connection. Acknowledge the close, and try to reconnect/failover
|
||||||
# later once the connection has closed (connection_closed is
|
# later once the connection has closed (connection_closed is
|
||||||
# called).
|
# called).
|
||||||
LOG.info("Connection closed by peer: %s",
|
LOG.info(_LI("Connection closed by peer: %s"),
|
||||||
reason or "no reason given")
|
reason or "no reason given")
|
||||||
self._socket_connection.connection.close()
|
self._socket_connection.connection.close()
|
||||||
|
|
||||||
@ -614,9 +619,11 @@ class Controller(pyngus.ConnectionEventHandler):
|
|||||||
"""
|
"""
|
||||||
if outcome == proton.SASL.OK:
|
if outcome == proton.SASL.OK:
|
||||||
return
|
return
|
||||||
LOG.error("AUTHENTICATION FAILURE: Cannot connect to %s:%s as user %s",
|
LOG.error(_LE("AUTHENTICATION FAILURE: Cannot connect to "
|
||||||
self.hosts.current.hostname, self.hosts.current.port,
|
"%(hostname)s:%(port)s as user %(username)s"),
|
||||||
self.hosts.current.username)
|
{'hostname': self.hosts.current.hostname,
|
||||||
|
'port': self.hosts.current.port,
|
||||||
|
'username': self.hosts.current.username})
|
||||||
# connection failure will be handled later
|
# connection failure will be handled later
|
||||||
|
|
||||||
def _complete_shutdown(self):
|
def _complete_shutdown(self):
|
||||||
@ -625,7 +632,7 @@ class Controller(pyngus.ConnectionEventHandler):
|
|||||||
"""
|
"""
|
||||||
self._socket_connection.close()
|
self._socket_connection.close()
|
||||||
self.processor.shutdown()
|
self.processor.shutdown()
|
||||||
LOG.info("Messaging has shutdown")
|
LOG.info(_LI("Messaging has shutdown"))
|
||||||
|
|
||||||
def _handle_connection_loss(self):
|
def _handle_connection_loss(self):
|
||||||
"""The connection to the messaging service has been lost. Try to
|
"""The connection to the messaging service has been lost. Try to
|
||||||
@ -641,7 +648,7 @@ class Controller(pyngus.ConnectionEventHandler):
|
|||||||
if not self._reconnecting:
|
if not self._reconnecting:
|
||||||
self._reconnecting = True
|
self._reconnecting = True
|
||||||
self._replies = None
|
self._replies = None
|
||||||
LOG.info("delaying reconnect attempt for %d seconds",
|
LOG.info(_LI("delaying reconnect attempt for %d seconds"),
|
||||||
self._delay)
|
self._delay)
|
||||||
self.processor.schedule(lambda: self._do_reconnect(),
|
self.processor.schedule(lambda: self._do_reconnect(),
|
||||||
self._delay)
|
self._delay)
|
||||||
@ -660,5 +667,6 @@ class Controller(pyngus.ConnectionEventHandler):
|
|||||||
self._senders = {}
|
self._senders = {}
|
||||||
self._socket_connection.reset()
|
self._socket_connection.reset()
|
||||||
host = self.hosts.next()
|
host = self.hosts.next()
|
||||||
LOG.info("Reconnecting to: %s:%i", host.hostname, host.port)
|
LOG.info(_LI("Reconnecting to: %(hostname):%(port)"),
|
||||||
|
{'hostname': host.hostname, 'port': host.port})
|
||||||
self._socket_connection.connect(host)
|
self._socket_connection.connect(host)
|
||||||
|
@ -31,6 +31,7 @@ from six import moves
|
|||||||
|
|
||||||
from oslo_messaging._drivers import base
|
from oslo_messaging._drivers import base
|
||||||
from oslo_messaging._drivers import common
|
from oslo_messaging._drivers import common
|
||||||
|
from oslo_messaging._i18n import _LI, _LW
|
||||||
from oslo_messaging import target as messaging_target
|
from oslo_messaging import target as messaging_target
|
||||||
|
|
||||||
|
|
||||||
@ -137,7 +138,7 @@ class ProtonDriver(base.BaseDriver):
|
|||||||
def __init__(self, conf, url,
|
def __init__(self, conf, url,
|
||||||
default_exchange=None, allowed_remote_exmods=[]):
|
default_exchange=None, allowed_remote_exmods=[]):
|
||||||
# TODO(kgiusti) Remove once driver fully stabilizes:
|
# TODO(kgiusti) Remove once driver fully stabilizes:
|
||||||
LOG.warning("Support for the 'amqp' transport is EXPERIMENTAL.")
|
LOG.warning(_LW("Support for the 'amqp' transport is EXPERIMENTAL."))
|
||||||
if proton is None or hasattr(controller, "fake_controller"):
|
if proton is None or hasattr(controller, "fake_controller"):
|
||||||
raise NotImplementedError("Proton AMQP C libraries not installed")
|
raise NotImplementedError("Proton AMQP C libraries not installed")
|
||||||
|
|
||||||
@ -167,7 +168,8 @@ class ProtonDriver(base.BaseDriver):
|
|||||||
|
|
||||||
if old_pid != self._pid:
|
if old_pid != self._pid:
|
||||||
if self._ctrl is not None:
|
if self._ctrl is not None:
|
||||||
LOG.warning("Process forked after connection established!")
|
LOG.warning(_LW("Process forked after connection "
|
||||||
|
"established!"))
|
||||||
self._ctrl.shutdown(wait=False)
|
self._ctrl.shutdown(wait=False)
|
||||||
# Create a Controller that connects to the messaging service:
|
# Create a Controller that connects to the messaging service:
|
||||||
self._ctrl = controller.Controller(self._hosts,
|
self._ctrl = controller.Controller(self._hosts,
|
||||||
@ -244,4 +246,4 @@ class ProtonDriver(base.BaseDriver):
|
|||||||
if self._ctrl:
|
if self._ctrl:
|
||||||
self._ctrl.shutdown()
|
self._ctrl.shutdown()
|
||||||
self._ctrl = None
|
self._ctrl = None
|
||||||
LOG.info("AMQP 1.0 messaging driver shutdown")
|
LOG.info(_LI("AMQP 1.0 messaging driver shutdown"))
|
||||||
|
@ -17,6 +17,7 @@ import threading
|
|||||||
import time
|
import time
|
||||||
|
|
||||||
from oslo_messaging._drivers.protocols.amqp import controller
|
from oslo_messaging._drivers.protocols.amqp import controller
|
||||||
|
from oslo_messaging._i18n import _LW
|
||||||
from oslo_messaging import exceptions
|
from oslo_messaging import exceptions
|
||||||
|
|
||||||
from six import moves
|
from six import moves
|
||||||
@ -61,7 +62,8 @@ class SendTask(controller.Task):
|
|||||||
controller.request(self._target, self._request,
|
controller.request(self._target, self._request,
|
||||||
self._results_queue, self._wait_for_reply)
|
self._results_queue, self._wait_for_reply)
|
||||||
else:
|
else:
|
||||||
LOG.warn("Send request to %s aborted: TTL expired.", self._target)
|
LOG.warn(_LW("Send request to %s aborted: TTL expired."),
|
||||||
|
self._target)
|
||||||
|
|
||||||
|
|
||||||
class ListenTask(controller.Task):
|
class ListenTask(controller.Task):
|
||||||
|
@ -36,6 +36,7 @@ import uuid
|
|||||||
import pyngus
|
import pyngus
|
||||||
from six import moves
|
from six import moves
|
||||||
|
|
||||||
|
from oslo_messaging._i18n import _LE, _LI, _LW
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
@ -100,7 +101,7 @@ class _SocketConnection(object):
|
|||||||
if not addr:
|
if not addr:
|
||||||
key = "%s:%i" % (host.hostname, host.port)
|
key = "%s:%i" % (host.hostname, host.port)
|
||||||
error = "Invalid peer address '%s'" % key
|
error = "Invalid peer address '%s'" % key
|
||||||
LOG.error(error)
|
LOG.error(_LE("Invalid peer address '%s'"), key)
|
||||||
self._handler.socket_error(error)
|
self._handler.socket_error(error)
|
||||||
return
|
return
|
||||||
my_socket = socket.socket(addr[0][0], addr[0][1], addr[0][2])
|
my_socket = socket.socket(addr[0][0], addr[0][1], addr[0][2])
|
||||||
@ -111,7 +112,7 @@ class _SocketConnection(object):
|
|||||||
except socket.error as e:
|
except socket.error as e:
|
||||||
if e.errno != errno.EINPROGRESS:
|
if e.errno != errno.EINPROGRESS:
|
||||||
error = "Socket connect failure '%s'" % str(e)
|
error = "Socket connect failure '%s'" % str(e)
|
||||||
LOG.error(error)
|
LOG.error(_LE("Socket connect failure '%s'"), str(e))
|
||||||
self._handler.socket_error(error)
|
self._handler.socket_error(error)
|
||||||
return
|
return
|
||||||
self.socket = my_socket
|
self.socket = my_socket
|
||||||
@ -316,7 +317,7 @@ class Thread(threading.Thread):
|
|||||||
results = select.select(readfds, writefds, [], timeout)
|
results = select.select(readfds, writefds, [], timeout)
|
||||||
except select.error as serror:
|
except select.error as serror:
|
||||||
if serror[0] == errno.EINTR:
|
if serror[0] == errno.EINTR:
|
||||||
LOG.warning("ignoring interrupt from select(): %s",
|
LOG.warning(_LW("ignoring interrupt from select(): %s"),
|
||||||
str(serror))
|
str(serror))
|
||||||
continue
|
continue
|
||||||
raise # assuming fatal...
|
raise # assuming fatal...
|
||||||
@ -342,6 +343,6 @@ class Thread(threading.Thread):
|
|||||||
|
|
||||||
self._schedule.process() # run any deferred requests
|
self._schedule.process() # run any deferred requests
|
||||||
|
|
||||||
LOG.info("eventloop thread exiting, container=%s",
|
LOG.info(_LI("eventloop thread exiting, container=%s"),
|
||||||
self._container.name)
|
self._container.name)
|
||||||
self._container.destroy()
|
self._container.destroy()
|
||||||
|
@ -58,8 +58,8 @@ class UniversalQueueProxy(zmq_base_proxy.BaseProxy):
|
|||||||
self._redirect_reply(message)
|
self._redirect_reply(message)
|
||||||
|
|
||||||
def _redirect_in_request(self, multipart_message):
|
def _redirect_in_request(self, multipart_message):
|
||||||
LOG.debug("-> Redirecting request %s to TCP publisher"
|
LOG.debug("-> Redirecting request %s to TCP publisher",
|
||||||
% multipart_message)
|
multipart_message)
|
||||||
envelope = multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE]
|
envelope = multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE]
|
||||||
if self.conf.use_pub_sub and \
|
if self.conf.use_pub_sub and \
|
||||||
envelope[zmq_names.FIELD_MSG_TYPE] \
|
envelope[zmq_names.FIELD_MSG_TYPE] \
|
||||||
@ -69,13 +69,13 @@ class UniversalQueueProxy(zmq_base_proxy.BaseProxy):
|
|||||||
self.direct_publisher.send_request(multipart_message)
|
self.direct_publisher.send_request(multipart_message)
|
||||||
|
|
||||||
def _redirect_reply(self, reply):
|
def _redirect_reply(self, reply):
|
||||||
LOG.debug("Reply proxy %s" % reply)
|
LOG.debug("Reply proxy %s", reply)
|
||||||
if reply[zmq_names.IDX_REPLY_TYPE] == zmq_names.ACK_TYPE:
|
if reply[zmq_names.IDX_REPLY_TYPE] == zmq_names.ACK_TYPE:
|
||||||
LOG.debug("Acknowledge dropped %s" % reply)
|
LOG.debug("Acknowledge dropped %s", reply)
|
||||||
return
|
return
|
||||||
|
|
||||||
LOG.debug("<- Redirecting reply to ROUTER: reply: %s"
|
LOG.debug("<- Redirecting reply to ROUTER: reply: %s",
|
||||||
% reply[zmq_names.IDX_REPLY_BODY:])
|
reply[zmq_names.IDX_REPLY_BODY:])
|
||||||
|
|
||||||
self.router_socket.send_multipart(reply[zmq_names.IDX_REPLY_BODY:])
|
self.router_socket.send_multipart(reply[zmq_names.IDX_REPLY_BODY:])
|
||||||
|
|
||||||
|
@ -58,7 +58,7 @@ class DealerCallPublisher(zmq_publisher_base.PublisherBase):
|
|||||||
finally:
|
finally:
|
||||||
self.reply_waiter.untrack_id(request.message_id)
|
self.reply_waiter.untrack_id(request.message_id)
|
||||||
|
|
||||||
LOG.debug("Received reply %s" % reply)
|
LOG.debug("Received reply %s", reply)
|
||||||
if reply[zmq_names.FIELD_FAILURE]:
|
if reply[zmq_names.FIELD_FAILURE]:
|
||||||
raise rpc_common.deserialize_remote_exception(
|
raise rpc_common.deserialize_remote_exception(
|
||||||
reply[zmq_names.FIELD_FAILURE],
|
reply[zmq_names.FIELD_FAILURE],
|
||||||
@ -86,9 +86,8 @@ class RequestSender(zmq_publisher_base.PublisherMultisend):
|
|||||||
socket.send(b'', zmq.SNDMORE)
|
socket.send(b'', zmq.SNDMORE)
|
||||||
socket.send_pyobj(request)
|
socket.send_pyobj(request)
|
||||||
|
|
||||||
LOG.debug("Sending message_id %(message)s to a target %(target)s"
|
LOG.debug("Sending message_id %(message)s to a target %(target)s",
|
||||||
% {"message": request.message_id,
|
{"message": request.message_id, "target": request.target})
|
||||||
"target": request.target})
|
|
||||||
|
|
||||||
def _check_hosts_connections(self, target, listener_type):
|
def _check_hosts_connections(self, target, listener_type):
|
||||||
if str(target) in self.outbound_sockets:
|
if str(target) in self.outbound_sockets:
|
||||||
@ -144,10 +143,10 @@ class RequestSenderLight(RequestSender):
|
|||||||
|
|
||||||
def _do_send_request(self, socket, request):
|
def _do_send_request(self, socket, request):
|
||||||
LOG.debug("Sending %(type)s message_id %(message)s"
|
LOG.debug("Sending %(type)s message_id %(message)s"
|
||||||
" to a target %(target)s"
|
" to a target %(target)s",
|
||||||
% {"type": request.msg_type,
|
{"type": request.msg_type,
|
||||||
"message": request.message_id,
|
"message": request.message_id,
|
||||||
"target": request.target})
|
"target": request.target})
|
||||||
|
|
||||||
envelope = request.create_envelope()
|
envelope = request.create_envelope()
|
||||||
|
|
||||||
@ -182,7 +181,7 @@ class ReplyWaiter(object):
|
|||||||
empty = socket.recv()
|
empty = socket.recv()
|
||||||
assert empty == b'', "Empty expected!"
|
assert empty == b'', "Empty expected!"
|
||||||
reply = socket.recv_pyobj()
|
reply = socket.recv_pyobj()
|
||||||
LOG.debug("Received reply %s" % reply)
|
LOG.debug("Received reply %s", reply)
|
||||||
return reply
|
return reply
|
||||||
|
|
||||||
self.poller.register(socket, recv_method=_receive_method)
|
self.poller.register(socket, recv_method=_receive_method)
|
||||||
@ -196,4 +195,4 @@ class ReplyWaiter(object):
|
|||||||
if call_future:
|
if call_future:
|
||||||
call_future.set_result(reply)
|
call_future.set_result(reply)
|
||||||
else:
|
else:
|
||||||
LOG.warning(_LW("Received timed out reply: %s") % reply_id)
|
LOG.warning(_LW("Received timed out reply: %s"), reply_id)
|
||||||
|
@ -42,8 +42,8 @@ class DealerPublisher(zmq_publisher_base.PublisherMultisend):
|
|||||||
# a queue for keeping messages to send them later
|
# a queue for keeping messages to send them later
|
||||||
# when some listener appears. However such approach
|
# when some listener appears. However such approach
|
||||||
# being more reliable will consume additional memory.
|
# being more reliable will consume additional memory.
|
||||||
LOG.warning(_LW("Request %s was dropped because no connection")
|
LOG.warning(_LW("Request %s was dropped because no connection"),
|
||||||
% request.msg_type)
|
request.msg_type)
|
||||||
return
|
return
|
||||||
|
|
||||||
if request.msg_type in zmq_names.MULTISEND_TYPES:
|
if request.msg_type in zmq_names.MULTISEND_TYPES:
|
||||||
@ -61,9 +61,8 @@ class DealerPublisher(zmq_publisher_base.PublisherMultisend):
|
|||||||
socket.send(b'', zmq.SNDMORE)
|
socket.send(b'', zmq.SNDMORE)
|
||||||
socket.send_pyobj(request)
|
socket.send_pyobj(request)
|
||||||
|
|
||||||
LOG.debug("Sending message_id %(message)s to a target %(target)s"
|
LOG.debug("Sending message_id %(message)s to a target %(target)s",
|
||||||
% {"message": request.message_id,
|
{"message": request.message_id, "target": request.target})
|
||||||
"target": request.target})
|
|
||||||
|
|
||||||
def cleanup(self):
|
def cleanup(self):
|
||||||
super(DealerPublisher, self).cleanup()
|
super(DealerPublisher, self).cleanup()
|
||||||
@ -90,10 +89,10 @@ class DealerPublisherLight(zmq_publisher_base.PublisherBase):
|
|||||||
self.socket.send_pyobj(request)
|
self.socket.send_pyobj(request)
|
||||||
|
|
||||||
LOG.debug("->[proxy:%(addr)s] Sending message_id %(message)s to "
|
LOG.debug("->[proxy:%(addr)s] Sending message_id %(message)s to "
|
||||||
"a target %(target)s"
|
"a target %(target)s",
|
||||||
% {"message": request.message_id,
|
{"message": request.message_id,
|
||||||
"target": request.target,
|
"target": request.target,
|
||||||
"addr": self.address})
|
"addr": self.address})
|
||||||
|
|
||||||
def cleanup(self):
|
def cleanup(self):
|
||||||
self.socket.setsockopt(zmq.LINGER, 0)
|
self.socket.setsockopt(zmq.LINGER, 0)
|
||||||
@ -118,7 +117,7 @@ class AcknowledgementReceiver(object):
|
|||||||
|
|
||||||
def poll_for_acknowledgements(self):
|
def poll_for_acknowledgements(self):
|
||||||
ack_message, socket = self.poller.poll()
|
ack_message, socket = self.poller.poll()
|
||||||
LOG.debug("Message %s acknowledged" % ack_message[zmq_names.FIELD_ID])
|
LOG.debug("Message %s acknowledged", ack_message[zmq_names.FIELD_ID])
|
||||||
|
|
||||||
def cleanup(self):
|
def cleanup(self):
|
||||||
self.thread.stop()
|
self.thread.stop()
|
||||||
|
@ -35,7 +35,7 @@ class DealerPublisherProxy(zmq_dealer_publisher.DealerPublisher):
|
|||||||
|
|
||||||
envelope = multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE]
|
envelope = multipart_message[zmq_names.MULTIPART_IDX_ENVELOPE]
|
||||||
|
|
||||||
LOG.debug("Envelope: %s" % envelope)
|
LOG.debug("Envelope: %s", envelope)
|
||||||
|
|
||||||
target = envelope[zmq_names.FIELD_TARGET]
|
target = envelope[zmq_names.FIELD_TARGET]
|
||||||
dealer_socket = self._check_hosts_connections(
|
dealer_socket = self._check_hosts_connections(
|
||||||
@ -46,8 +46,8 @@ class DealerPublisherProxy(zmq_dealer_publisher.DealerPublisher):
|
|||||||
# a queue for keeping messages to send them later
|
# a queue for keeping messages to send them later
|
||||||
# when some listener appears. However such approach
|
# when some listener appears. However such approach
|
||||||
# being more reliable will consume additional memory.
|
# being more reliable will consume additional memory.
|
||||||
LOG.warning(_LW("Request %s was dropped because no connection")
|
LOG.warning(_LW("Request %s was dropped because no connection"),
|
||||||
% envelope[zmq_names.FIELD_MSG_TYPE])
|
envelope[zmq_names.FIELD_MSG_TYPE])
|
||||||
return
|
return
|
||||||
|
|
||||||
self.reply_receiver.track_socket(dealer_socket.handle)
|
self.reply_receiver.track_socket(dealer_socket.handle)
|
||||||
|
@ -53,7 +53,7 @@ class PubPublisherProxy(zmq_publisher_base.PublisherBase):
|
|||||||
|
|
||||||
self.sync_channel = SyncChannel(conf, matchmaker, self.zmq_context)
|
self.sync_channel = SyncChannel(conf, matchmaker, self.zmq_context)
|
||||||
|
|
||||||
LOG.info(_LI("[PUB:%(pub)s, PULL:%(pull)s] Run PUB publisher") %
|
LOG.info(_LI("[PUB:%(pub)s, PULL:%(pull)s] Run PUB publisher"),
|
||||||
{"pub": self.host,
|
{"pub": self.host,
|
||||||
"pull": self.sync_channel.sync_host})
|
"pull": self.sync_channel.sync_host})
|
||||||
|
|
||||||
@ -75,10 +75,10 @@ class PubPublisherProxy(zmq_publisher_base.PublisherBase):
|
|||||||
self.socket.send(multipart_message[zmq_names.MULTIPART_IDX_BODY])
|
self.socket.send(multipart_message[zmq_names.MULTIPART_IDX_BODY])
|
||||||
|
|
||||||
LOG.debug("Publishing message [%(topic)s] %(message_id)s to "
|
LOG.debug("Publishing message [%(topic)s] %(message_id)s to "
|
||||||
"a target %(target)s "
|
"a target %(target)s ",
|
||||||
% {"message_id": message_id,
|
{"message_id": message_id,
|
||||||
"target": target,
|
"target": target,
|
||||||
"topic": topic_filter})
|
"topic": topic_filter})
|
||||||
|
|
||||||
def cleanup(self):
|
def cleanup(self):
|
||||||
self.matchmaker.unregister_publisher(
|
self.matchmaker.unregister_publisher(
|
||||||
@ -114,10 +114,10 @@ class SyncChannel(object):
|
|||||||
self.sync_socket.port)
|
self.sync_socket.port)
|
||||||
|
|
||||||
def is_ready(self):
|
def is_ready(self):
|
||||||
LOG.debug("[%s] Waiting for ready from first subscriber" %
|
LOG.debug("[%s] Waiting for ready from first subscriber",
|
||||||
self.sync_host)
|
self.sync_host)
|
||||||
if self._ready is None:
|
if self._ready is None:
|
||||||
self._ready = self.poller.poll()
|
self._ready = self.poller.poll()
|
||||||
LOG.debug("[%s] Received ready from first subscriber" %
|
LOG.debug("[%s] Received ready from first subscriber",
|
||||||
self.sync_host)
|
self.sync_host)
|
||||||
return self._ready is not None
|
return self._ready is not None
|
||||||
|
@ -91,10 +91,10 @@ class PublisherBase(object):
|
|||||||
:type request: zmq_request.Request
|
:type request: zmq_request.Request
|
||||||
"""
|
"""
|
||||||
LOG.debug("Sending %(type)s message_id %(message)s to a target "
|
LOG.debug("Sending %(type)s message_id %(message)s to a target "
|
||||||
"%(target)s"
|
"%(target)s",
|
||||||
% {"type": request.msg_type,
|
{"type": request.msg_type,
|
||||||
"message": request.message_id,
|
"message": request.message_id,
|
||||||
"target": request.target})
|
"target": request.target})
|
||||||
socket.send_pyobj(request)
|
socket.send_pyobj(request)
|
||||||
|
|
||||||
def cleanup(self):
|
def cleanup(self):
|
||||||
@ -137,10 +137,8 @@ class PublisherMultisend(PublisherBase):
|
|||||||
def _connect_to_address(self, socket, address, target):
|
def _connect_to_address(self, socket, address, target):
|
||||||
stype = zmq_names.socket_type_str(self.socket_type)
|
stype = zmq_names.socket_type_str(self.socket_type)
|
||||||
try:
|
try:
|
||||||
LOG.info(_LI("Connecting %(stype)s to %(address)s for %(target)s")
|
LOG.info(_LI("Connecting %(stype)s to %(address)s for %(target)s"),
|
||||||
% {"stype": stype,
|
{"stype": stype, "address": address, "target": target})
|
||||||
"address": address,
|
|
||||||
"target": target})
|
|
||||||
|
|
||||||
if six.PY3:
|
if six.PY3:
|
||||||
socket.setsockopt_string(zmq.IDENTITY, str(uuid.uuid1()))
|
socket.setsockopt_string(zmq.IDENTITY, str(uuid.uuid1()))
|
||||||
@ -151,8 +149,8 @@ class PublisherMultisend(PublisherBase):
|
|||||||
except zmq.ZMQError as e:
|
except zmq.ZMQError as e:
|
||||||
errmsg = _LE("Failed connecting %(stype) to %(address)s: %(e)s")\
|
errmsg = _LE("Failed connecting %(stype) to %(address)s: %(e)s")\
|
||||||
% (stype, address, e)
|
% (stype, address, e)
|
||||||
LOG.error(_LE("Failed connecting %(stype) to %(address)s: %(e)s")
|
LOG.error(_LE("Failed connecting %(stype) to %(address)s: %(e)s"),
|
||||||
% (stype, address, e))
|
(stype, address, e))
|
||||||
raise rpc_common.RPCException(errmsg)
|
raise rpc_common.RPCException(errmsg)
|
||||||
|
|
||||||
def _connect_to_host(self, socket, host, target):
|
def _connect_to_host(self, socket, host, target):
|
||||||
|
@ -39,8 +39,8 @@ class PushPublisher(zmq_publisher_base.PublisherMultisend):
|
|||||||
request.target, zmq_names.socket_type_str(zmq.PULL))
|
request.target, zmq_names.socket_type_str(zmq.PULL))
|
||||||
|
|
||||||
if not push_socket.connections:
|
if not push_socket.connections:
|
||||||
LOG.warning(_LW("Request %s was dropped because no connection")
|
LOG.warning(_LW("Request %s was dropped because no connection"),
|
||||||
% request.msg_type)
|
request.msg_type)
|
||||||
return
|
return
|
||||||
|
|
||||||
if request.msg_type in zmq_names.MULTISEND_TYPES:
|
if request.msg_type in zmq_names.MULTISEND_TYPES:
|
||||||
@ -53,6 +53,5 @@ class PushPublisher(zmq_publisher_base.PublisherMultisend):
|
|||||||
|
|
||||||
super(PushPublisher, self)._send_request(socket, request)
|
super(PushPublisher, self)._send_request(socket, request)
|
||||||
|
|
||||||
LOG.debug("Publishing message %(message)s to a target %(target)s"
|
LOG.debug("Publishing message %(message)s to a target %(target)s",
|
||||||
% {"message": request.message,
|
{"message": request.message, "target": request.target})
|
||||||
"target": request.target})
|
|
||||||
|
@ -55,7 +55,7 @@ class ThreadingPoller(zmq_poller.ZmqPoller):
|
|||||||
try:
|
try:
|
||||||
sockets = dict(self.poller.poll(timeout=timeout))
|
sockets = dict(self.poller.poll(timeout=timeout))
|
||||||
except zmq.ZMQError as e:
|
except zmq.ZMQError as e:
|
||||||
LOG.debug("Polling terminated with error: %s" % e)
|
LOG.debug("Polling terminated with error: %s", e)
|
||||||
|
|
||||||
if not sockets:
|
if not sockets:
|
||||||
return None, None
|
return None, None
|
||||||
|
@ -74,8 +74,8 @@ class SingleSocketConsumer(ConsumerBase):
|
|||||||
except zmq.ZMQError as e:
|
except zmq.ZMQError as e:
|
||||||
errmsg = _LE("Failed binding to port %(port)d: %(e)s")\
|
errmsg = _LE("Failed binding to port %(port)d: %(e)s")\
|
||||||
% (self.port, e)
|
% (self.port, e)
|
||||||
LOG.error(_LE("Failed binding to port %(port)d: %(e)s")
|
LOG.error(_LE("Failed binding to port %(port)d: %(e)s"),
|
||||||
% (self.port, e))
|
(self.port, e))
|
||||||
raise rpc_common.RPCException(errmsg)
|
raise rpc_common.RPCException(errmsg)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
|
@ -47,7 +47,7 @@ class PullConsumer(zmq_consumer_base.SingleSocketConsumer):
|
|||||||
super(PullConsumer, self).__init__(conf, poller, server, zmq.PULL)
|
super(PullConsumer, self).__init__(conf, poller, server, zmq.PULL)
|
||||||
|
|
||||||
def listen(self, target):
|
def listen(self, target):
|
||||||
LOG.info(_LI("Listen to target %s") % str(target))
|
LOG.info(_LI("Listen to target %s"), str(target))
|
||||||
# Do nothing here because we have a single socket
|
# Do nothing here because we have a single socket
|
||||||
|
|
||||||
def receive_message(self, socket):
|
def receive_message(self, socket):
|
||||||
@ -56,14 +56,13 @@ class PullConsumer(zmq_consumer_base.SingleSocketConsumer):
|
|||||||
assert msg_type is not None, 'Bad format: msg type expected'
|
assert msg_type is not None, 'Bad format: msg type expected'
|
||||||
context = socket.recv_pyobj()
|
context = socket.recv_pyobj()
|
||||||
message = socket.recv_pyobj()
|
message = socket.recv_pyobj()
|
||||||
LOG.debug("Received %(msg_type)s message %(msg)s"
|
LOG.debug("Received %(msg_type)s message %(msg)s",
|
||||||
% {"msg_type": msg_type,
|
{"msg_type": msg_type, "msg": str(message)})
|
||||||
"msg": str(message)})
|
|
||||||
|
|
||||||
if msg_type in (zmq_names.CAST_TYPES + zmq_names.NOTIFY_TYPES):
|
if msg_type in (zmq_names.CAST_TYPES + zmq_names.NOTIFY_TYPES):
|
||||||
return PullIncomingMessage(self.server, context, message)
|
return PullIncomingMessage(self.server, context, message)
|
||||||
else:
|
else:
|
||||||
LOG.error(_LE("Unknown message type: %s") % msg_type)
|
LOG.error(_LE("Unknown message type: %s"), msg_type)
|
||||||
|
|
||||||
except zmq.ZMQError as e:
|
except zmq.ZMQError as e:
|
||||||
LOG.error(_LE("Receiving message failed: %s") % str(e))
|
LOG.error(_LE("Receiving message failed: %s"), str(e))
|
||||||
|
@ -21,7 +21,7 @@ from oslo_messaging._drivers.zmq_driver.server import zmq_incoming_message
|
|||||||
from oslo_messaging._drivers.zmq_driver import zmq_address
|
from oslo_messaging._drivers.zmq_driver import zmq_address
|
||||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||||
from oslo_messaging._drivers.zmq_driver import zmq_names
|
from oslo_messaging._drivers.zmq_driver import zmq_names
|
||||||
from oslo_messaging._i18n import _LE
|
from oslo_messaging._i18n import _LE, _LI
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
@ -57,11 +57,12 @@ class RouterConsumer(zmq_consumer_base.SingleSocketConsumer):
|
|||||||
self.targets = []
|
self.targets = []
|
||||||
self.host = zmq_address.combine_address(self.conf.rpc_zmq_host,
|
self.host = zmq_address.combine_address(self.conf.rpc_zmq_host,
|
||||||
self.port)
|
self.port)
|
||||||
LOG.info("[%s] Run ROUTER consumer" % self.host)
|
LOG.info(_LI("[%s] Run ROUTER consumer"), self.host)
|
||||||
|
|
||||||
def listen(self, target):
|
def listen(self, target):
|
||||||
|
|
||||||
LOG.info("[%s] Listen to target %s" % (self.host, target))
|
LOG.info(_LI("[%(host)s] Listen to target %(target)s"),
|
||||||
|
{'host': self.host, 'target': target})
|
||||||
|
|
||||||
self.targets.append(target)
|
self.targets.append(target)
|
||||||
self.matchmaker.register(target, self.host,
|
self.matchmaker.register(target, self.host,
|
||||||
@ -83,11 +84,11 @@ class RouterConsumer(zmq_consumer_base.SingleSocketConsumer):
|
|||||||
def receive_message(self, socket):
|
def receive_message(self, socket):
|
||||||
try:
|
try:
|
||||||
request, reply_id = self._receive_request(socket)
|
request, reply_id = self._receive_request(socket)
|
||||||
LOG.debug("[%(host)s] Received %(type)s, %(id)s, %(target)s"
|
LOG.debug("[%(host)s] Received %(type)s, %(id)s, %(target)s",
|
||||||
% {"host": self.host,
|
{"host": self.host,
|
||||||
"type": request.msg_type,
|
"type": request.msg_type,
|
||||||
"id": request.message_id,
|
"id": request.message_id,
|
||||||
"target": request.target})
|
"target": request.target})
|
||||||
|
|
||||||
if request.msg_type == zmq_names.CALL_TYPE:
|
if request.msg_type == zmq_names.CALL_TYPE:
|
||||||
return zmq_incoming_message.ZmqIncomingRequest(
|
return zmq_incoming_message.ZmqIncomingRequest(
|
||||||
@ -97,10 +98,10 @@ class RouterConsumer(zmq_consumer_base.SingleSocketConsumer):
|
|||||||
self.server, request.context, request.message, socket,
|
self.server, request.context, request.message, socket,
|
||||||
reply_id, request.message_id, self.poller)
|
reply_id, request.message_id, self.poller)
|
||||||
else:
|
else:
|
||||||
LOG.error(_LE("Unknown message type: %s") % request.msg_type)
|
LOG.error(_LE("Unknown message type: %s"), request.msg_type)
|
||||||
|
|
||||||
except zmq.ZMQError as e:
|
except zmq.ZMQError as e:
|
||||||
LOG.error(_LE("Receiving message failed: %s") % str(e))
|
LOG.error(_LE("Receiving message failed: %s"), str(e))
|
||||||
|
|
||||||
|
|
||||||
class RouterConsumerBroker(RouterConsumer):
|
class RouterConsumerBroker(RouterConsumer):
|
||||||
|
@ -77,9 +77,8 @@ class SubConsumer(zmq_consumer_base.ConsumerBase):
|
|||||||
self.socket.setsockopt(zmq.SUBSCRIBE, topic_filter)
|
self.socket.setsockopt(zmq.SUBSCRIBE, topic_filter)
|
||||||
self.subscriptions.add(topic_filter)
|
self.subscriptions.add(topic_filter)
|
||||||
|
|
||||||
LOG.debug("[%(host)s] Subscribing to topic %(filter)s"
|
LOG.debug("[%(host)s] Subscribing to topic %(filter)s",
|
||||||
% {"host": self.id,
|
{"host": self.id, "filter": topic_filter})
|
||||||
"filter": topic_filter})
|
|
||||||
|
|
||||||
def on_publishers(self, publishers):
|
def on_publishers(self, publishers):
|
||||||
with self._socket_lock:
|
with self._socket_lock:
|
||||||
@ -87,17 +86,18 @@ class SubConsumer(zmq_consumer_base.ConsumerBase):
|
|||||||
self.socket.connect(zmq_address.get_tcp_direct_address(host))
|
self.socket.connect(zmq_address.get_tcp_direct_address(host))
|
||||||
|
|
||||||
self.poller.register(self.socket, self.receive_message)
|
self.poller.register(self.socket, self.receive_message)
|
||||||
LOG.debug("[%s] SUB consumer connected to publishers %s"
|
LOG.debug("[%s] SUB consumer connected to publishers %s",
|
||||||
% (self.id, publishers))
|
(self.id, publishers))
|
||||||
|
|
||||||
def listen(self, target):
|
def listen(self, target):
|
||||||
LOG.debug("Listen to target %s" % target)
|
LOG.debug("Listen to target %s", target)
|
||||||
with self._socket_lock:
|
with self._socket_lock:
|
||||||
self._subscribe_on_target(target)
|
self._subscribe_on_target(target)
|
||||||
|
|
||||||
def _receive_request(self, socket):
|
def _receive_request(self, socket):
|
||||||
topic_filter = socket.recv()
|
topic_filter = socket.recv()
|
||||||
LOG.debug("[%s] Received %s topic" % (self.id, topic_filter))
|
LOG.debug("[%(id)s] Received %(topict_filter)s topic",
|
||||||
|
{'id': self.id, 'topic_filter': topic_filter})
|
||||||
assert topic_filter in self.subscriptions
|
assert topic_filter in self.subscriptions
|
||||||
request = socket.recv_pyobj()
|
request = socket.recv_pyobj()
|
||||||
return request
|
return request
|
||||||
@ -107,18 +107,18 @@ class SubConsumer(zmq_consumer_base.ConsumerBase):
|
|||||||
request = self._receive_request(socket)
|
request = self._receive_request(socket)
|
||||||
if not request:
|
if not request:
|
||||||
return None
|
return None
|
||||||
LOG.debug("Received %(type)s, %(id)s, %(target)s"
|
LOG.debug("Received %(type)s, %(id)s, %(target)s",
|
||||||
% {"type": request.msg_type,
|
{"type": request.msg_type,
|
||||||
"id": request.message_id,
|
"id": request.message_id,
|
||||||
"target": request.target})
|
"target": request.target})
|
||||||
|
|
||||||
if request.msg_type not in zmq_names.MULTISEND_TYPES:
|
if request.msg_type not in zmq_names.MULTISEND_TYPES:
|
||||||
LOG.error(_LE("Unknown message type: %s") % request.msg_type)
|
LOG.error(_LE("Unknown message type: %s"), request.msg_type)
|
||||||
else:
|
else:
|
||||||
return SubIncomingMessage(self.server, request, socket,
|
return SubIncomingMessage(self.server, request, socket,
|
||||||
self.poller)
|
self.poller)
|
||||||
except zmq.ZMQError as e:
|
except zmq.ZMQError as e:
|
||||||
LOG.error(_LE("Receiving message failed: %s") % str(e))
|
LOG.error(_LE("Receiving message failed: %s"), str(e))
|
||||||
|
|
||||||
|
|
||||||
class MatchmakerPoller(object):
|
class MatchmakerPoller(object):
|
||||||
|
@ -21,6 +21,7 @@ from oslo_messaging._drivers.zmq_driver.server.consumers\
|
|||||||
from oslo_messaging._drivers.zmq_driver.server.consumers\
|
from oslo_messaging._drivers.zmq_driver.server.consumers\
|
||||||
import zmq_sub_consumer
|
import zmq_sub_consumer
|
||||||
from oslo_messaging._drivers.zmq_driver import zmq_async
|
from oslo_messaging._drivers.zmq_driver import zmq_async
|
||||||
|
from oslo_messaging._i18n import _LI
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
@ -53,8 +54,9 @@ class ZmqServer(base.Listener):
|
|||||||
return message
|
return message
|
||||||
|
|
||||||
def stop(self):
|
def stop(self):
|
||||||
consumer = self.router_consumer
|
consumer = self.rpc_consumer
|
||||||
LOG.info("Stop server %s:%d" % (consumer.address, consumer.port))
|
LOG.info(_LI("Stop server %(address)s:%(port)s"),
|
||||||
|
{'address': consumer.address, 'port': consumer.port})
|
||||||
|
|
||||||
def cleanup(self):
|
def cleanup(self):
|
||||||
self.poller.close()
|
self.poller.close()
|
||||||
|
@ -16,6 +16,9 @@ import logging
|
|||||||
|
|
||||||
import six
|
import six
|
||||||
|
|
||||||
|
from oslo_messaging._i18n import _
|
||||||
|
|
||||||
|
|
||||||
__all__ = [
|
__all__ = [
|
||||||
"DispatcherBase",
|
"DispatcherBase",
|
||||||
"DispatcherExecutorContext"
|
"DispatcherExecutorContext"
|
||||||
@ -58,7 +61,7 @@ class DispatcherExecutorContext(object):
|
|||||||
self._result = self._dispatch(self._incoming,
|
self._result = self._dispatch(self._incoming,
|
||||||
self._executor_callback)
|
self._executor_callback)
|
||||||
except Exception:
|
except Exception:
|
||||||
msg = 'The dispatcher method must catches all exceptions'
|
msg = _('The dispatcher method must catches all exceptions')
|
||||||
LOG.exception(msg)
|
LOG.exception(msg)
|
||||||
raise RuntimeError(msg)
|
raise RuntimeError(msg)
|
||||||
|
|
||||||
|
@ -19,6 +19,7 @@ import logging
|
|||||||
|
|
||||||
import six
|
import six
|
||||||
|
|
||||||
|
from oslo_messaging._i18n import _LE, _LW
|
||||||
from oslo_messaging import dispatcher
|
from oslo_messaging import dispatcher
|
||||||
from oslo_messaging import localcontext
|
from oslo_messaging import localcontext
|
||||||
from oslo_messaging import serializer as msg_serializer
|
from oslo_messaging import serializer as msg_serializer
|
||||||
@ -74,7 +75,7 @@ class _NotificationDispatcherBase(dispatcher.DispatcherBase):
|
|||||||
else:
|
else:
|
||||||
m.acknowledge()
|
m.acknowledge()
|
||||||
except Exception:
|
except Exception:
|
||||||
LOG.error("Fail to ack/requeue message", exc_info=True)
|
LOG.error(_LE("Fail to ack/requeue message"), exc_info=True)
|
||||||
|
|
||||||
def _dispatch_and_handle_error(self, incoming, executor_callback):
|
def _dispatch_and_handle_error(self, incoming, executor_callback):
|
||||||
"""Dispatch a notification message to the appropriate endpoint method.
|
"""Dispatch a notification message to the appropriate endpoint method.
|
||||||
@ -85,7 +86,7 @@ class _NotificationDispatcherBase(dispatcher.DispatcherBase):
|
|||||||
try:
|
try:
|
||||||
return self._dispatch(incoming, executor_callback)
|
return self._dispatch(incoming, executor_callback)
|
||||||
except Exception:
|
except Exception:
|
||||||
LOG.error('Exception during message handling', exc_info=True)
|
LOG.error(_LE('Exception during message handling'), exc_info=True)
|
||||||
|
|
||||||
def _dispatch(self, incoming, executor_callback=None):
|
def _dispatch(self, incoming, executor_callback=None):
|
||||||
"""Dispatch notification messages to the appropriate endpoint method.
|
"""Dispatch notification messages to the appropriate endpoint method.
|
||||||
@ -101,7 +102,7 @@ class _NotificationDispatcherBase(dispatcher.DispatcherBase):
|
|||||||
raw_messages = list(raw_messages)
|
raw_messages = list(raw_messages)
|
||||||
messages = list(messages)
|
messages = list(messages)
|
||||||
if priority not in PRIORITIES:
|
if priority not in PRIORITIES:
|
||||||
LOG.warning('Unknown priority "%s"', priority)
|
LOG.warning(_LW('Unknown priority "%s"'), priority)
|
||||||
continue
|
continue
|
||||||
for screen, callback in self._callbacks_by_priority.get(priority,
|
for screen, callback in self._callbacks_by_priority.get(priority,
|
||||||
[]):
|
[]):
|
||||||
|
@ -18,6 +18,7 @@
|
|||||||
import logging
|
import logging
|
||||||
|
|
||||||
import oslo_messaging
|
import oslo_messaging
|
||||||
|
from oslo_messaging._i18n import _LE
|
||||||
from oslo_messaging.notify import notifier
|
from oslo_messaging.notify import notifier
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
@ -47,8 +48,8 @@ class MessagingDriver(notifier.Driver):
|
|||||||
version=self.version,
|
version=self.version,
|
||||||
retry=retry)
|
retry=retry)
|
||||||
except Exception:
|
except Exception:
|
||||||
LOG.exception("Could not send notification to %(topic)s. "
|
LOG.exception(_LE("Could not send notification to %(topic)s. "
|
||||||
"Payload=%(message)s",
|
"Payload=%(message)s"),
|
||||||
dict(topic=topic, message=message))
|
dict(topic=topic, message=message))
|
||||||
|
|
||||||
|
|
||||||
|
@ -24,6 +24,7 @@ from oslo_utils import timeutils
|
|||||||
import six
|
import six
|
||||||
from stevedore import named
|
from stevedore import named
|
||||||
|
|
||||||
|
from oslo_messaging._i18n import _LE
|
||||||
from oslo_messaging import serializer as msg_serializer
|
from oslo_messaging import serializer as msg_serializer
|
||||||
from oslo_messaging import transport as msg_transport
|
from oslo_messaging import transport as msg_transport
|
||||||
|
|
||||||
@ -225,8 +226,8 @@ class Notifier(object):
|
|||||||
try:
|
try:
|
||||||
ext.obj.notify(ctxt, msg, priority, retry or self.retry)
|
ext.obj.notify(ctxt, msg, priority, retry or self.retry)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
_LOG.exception("Problem '%(e)s' attempting to send to "
|
_LOG.exception(_LE("Problem '%(e)s' attempting to send to "
|
||||||
"notification system. Payload=%(payload)s",
|
"notification system. Payload=%(payload)s"),
|
||||||
dict(e=e, payload=payload))
|
dict(e=e, payload=payload))
|
||||||
|
|
||||||
if self._driver_mgr.extensions:
|
if self._driver_mgr.extensions:
|
||||||
|
@ -34,6 +34,7 @@ from oslo_utils import timeutils
|
|||||||
from stevedore import driver
|
from stevedore import driver
|
||||||
|
|
||||||
from oslo_messaging._drivers import base as driver_base
|
from oslo_messaging._drivers import base as driver_base
|
||||||
|
from oslo_messaging._i18n import _LW
|
||||||
from oslo_messaging import exceptions
|
from oslo_messaging import exceptions
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
@ -111,7 +112,7 @@ class _OrderedTask(object):
|
|||||||
|
|
||||||
while condition():
|
while condition():
|
||||||
if log_timer is not None and log_timer.expired():
|
if log_timer is not None and log_timer.expired():
|
||||||
LOG.warn('Possible hang: %s' % msg)
|
LOG.warn(_LW('Possible hang: %s'), msg)
|
||||||
LOG.debug(''.join(traceback.format_stack()))
|
LOG.debug(''.join(traceback.format_stack()))
|
||||||
# Only log once. After than we wait indefinitely without
|
# Only log once. After than we wait indefinitely without
|
||||||
# logging.
|
# logging.
|
||||||
@ -345,11 +346,11 @@ class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner):
|
|||||||
"""
|
"""
|
||||||
# Warn that restarting will be deprecated
|
# Warn that restarting will be deprecated
|
||||||
if self._started:
|
if self._started:
|
||||||
LOG.warn('Restarting a MessageHandlingServer is inherently racy. '
|
LOG.warn(_LW('Restarting a MessageHandlingServer is inherently '
|
||||||
'It is deprecated, and will become a noop in a future '
|
'racy. It is deprecated, and will become a noop in '
|
||||||
'release of oslo.messaging. If you need to restart '
|
'a future release of oslo.messaging. If you need to '
|
||||||
'MessageHandlingServer you should instantiate a new '
|
'restart MessageHandlingServer you should '
|
||||||
'object.')
|
'instantiate a new object.'))
|
||||||
self._started = True
|
self._started = True
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
@ -744,7 +744,7 @@ class TestServerLocking(test_utils.BaseTestCase):
|
|||||||
# DEFAULT_LOG_AFTER
|
# DEFAULT_LOG_AFTER
|
||||||
|
|
||||||
log_event = threading.Event()
|
log_event = threading.Event()
|
||||||
mock_log.warn.side_effect = lambda _: log_event.set()
|
mock_log.warn.side_effect = lambda _, __: log_event.set()
|
||||||
|
|
||||||
# Call stop without calling start. We should log a wait after 1 second
|
# Call stop without calling start. We should log a wait after 1 second
|
||||||
thread = eventlet.spawn(self.server.stop)
|
thread = eventlet.spawn(self.server.stop)
|
||||||
@ -760,7 +760,7 @@ class TestServerLocking(test_utils.BaseTestCase):
|
|||||||
# the number of seconds passed to log_after
|
# the number of seconds passed to log_after
|
||||||
|
|
||||||
log_event = threading.Event()
|
log_event = threading.Event()
|
||||||
mock_log.warn.side_effect = lambda _: log_event.set()
|
mock_log.warn.side_effect = lambda _, __: log_event.set()
|
||||||
|
|
||||||
# Call stop without calling start. We should log a wait after 1 second
|
# Call stop without calling start. We should log a wait after 1 second
|
||||||
thread = eventlet.spawn(self.server.stop, log_after=1)
|
thread = eventlet.spawn(self.server.stop, log_after=1)
|
||||||
@ -776,7 +776,7 @@ class TestServerLocking(test_utils.BaseTestCase):
|
|||||||
# specified an absolute timeout
|
# specified an absolute timeout
|
||||||
|
|
||||||
log_event = threading.Event()
|
log_event = threading.Event()
|
||||||
mock_log.warn.side_effect = lambda _: log_event.set()
|
mock_log.warn.side_effect = lambda _, __: log_event.set()
|
||||||
|
|
||||||
# Call stop without calling start. We should log a wait after 1 second
|
# Call stop without calling start. We should log a wait after 1 second
|
||||||
thread = eventlet.spawn(self.server.stop, log_after=1, timeout=2)
|
thread = eventlet.spawn(self.server.stop, log_after=1, timeout=2)
|
||||||
|
Loading…
Reference in New Issue
Block a user