Fixing glance-api hangs in the qpid notifier
Glance-api was able to hang in qpid notifier under heavy image creation load. The ``thread`` and ``select`` modules used by the python-qpid for managing the AMQP connection. When the eventlet was not able to switch between threads because leaded to hang and/or pipe(2) leaking issues. * Monkey patching the ``select`` and ``thread`` modules to be eventlet friendly in order to avoid hanging issues. * The reference to the connection object in the QpidStrategy was replaceable by a concurrent thread, which could cause various issues. Using just local variables for storing connection object in order to avoid concurrent unsafe manipulation. Fixing bug 1229042 Change-Id: I8fa8c4f36892b96d406216cb3c64854a94ca9df7
This commit is contained in:
parent
8c31de4d9a
commit
2e7aa761b6
@ -26,8 +26,9 @@ import eventlet
|
|||||||
import os
|
import os
|
||||||
import sys
|
import sys
|
||||||
|
|
||||||
# Monkey patch socket and time
|
# Monkey patch socket, time, select, threads
|
||||||
eventlet.patcher.monkey_patch(all=False, socket=True, time=True)
|
eventlet.patcher.monkey_patch(all=False, socket=True, time=True,
|
||||||
|
select=True, thread=True)
|
||||||
|
|
||||||
# If ../glance/__init__.py exists, add ../ to Python search path, so that
|
# If ../glance/__init__.py exists, add ../ to Python search path, so that
|
||||||
# it will override what happens to be installed in /usr/(local/)lib/python...
|
# it will override what happens to be installed in /usr/(local/)lib/python...
|
||||||
|
@ -83,30 +83,31 @@ class QpidStrategy(strategy.Strategy):
|
|||||||
def _open_connection(self):
|
def _open_connection(self):
|
||||||
"""Initialize the Qpid notification strategy."""
|
"""Initialize the Qpid notification strategy."""
|
||||||
broker = CONF.qpid_hostname + ":" + CONF.qpid_port
|
broker = CONF.qpid_hostname + ":" + CONF.qpid_port
|
||||||
self.connection = qpid.messaging.Connection(broker)
|
connection = qpid.messaging.Connection(broker)
|
||||||
self.connection.username = CONF.qpid_username
|
connection.username = CONF.qpid_username
|
||||||
self.connection.password = CONF.qpid_password
|
connection.password = CONF.qpid_password
|
||||||
self.connection.sasl_mechanisms = CONF.qpid_sasl_mechanisms
|
connection.sasl_mechanisms = CONF.qpid_sasl_mechanisms
|
||||||
# Hard code this option as enabled so that reconnect logic isn't needed
|
# Hard code this option as enabled so that reconnect logic isn't needed
|
||||||
# in this file at all.
|
# in this file at all.
|
||||||
self.connection.reconnect = True
|
connection.reconnect = True
|
||||||
if CONF.qpid_reconnect_timeout:
|
if CONF.qpid_reconnect_timeout:
|
||||||
self.connection.reconnect_timeout = CONF.qpid_reconnect_timeout
|
connection.reconnect_timeout = CONF.qpid_reconnect_timeout
|
||||||
if CONF.qpid_reconnect_limit:
|
if CONF.qpid_reconnect_limit:
|
||||||
self.connection.reconnect_limit = CONF.qpid_reconnect_limit
|
connection.reconnect_limit = CONF.qpid_reconnect_limit
|
||||||
if CONF.qpid_reconnect_interval_max:
|
if CONF.qpid_reconnect_interval_max:
|
||||||
self.connection.reconnect_interval_max = (
|
connection.reconnect_interval_max = (
|
||||||
CONF.qpid_reconnect_interval_max)
|
CONF.qpid_reconnect_interval_max)
|
||||||
if CONF.qpid_reconnect_interval_min:
|
if CONF.qpid_reconnect_interval_min:
|
||||||
self.connection.reconnect_interval_min = (
|
connection.reconnect_interval_min = (
|
||||||
CONF.qpid_reconnect_interval_min)
|
CONF.qpid_reconnect_interval_min)
|
||||||
if CONF.qpid_reconnect_interval:
|
if CONF.qpid_reconnect_interval:
|
||||||
self.connection.reconnect_interval = CONF.qpid_reconnect_interval
|
connection.reconnect_interval = CONF.qpid_reconnect_interval
|
||||||
self.connection.heartbeat = CONF.qpid_heartbeat
|
connection.heartbeat = CONF.qpid_heartbeat
|
||||||
self.connection.protocol = CONF.qpid_protocol
|
connection.protocol = CONF.qpid_protocol
|
||||||
self.connection.tcp_nodelay = CONF.qpid_tcp_nodelay
|
connection.tcp_nodelay = CONF.qpid_tcp_nodelay
|
||||||
self.connection.open()
|
connection.open()
|
||||||
LOG.info(_('Connected to AMQP server on %s') % broker)
|
LOG.info(_('Connected to AMQP server on %s') % broker)
|
||||||
|
return connection
|
||||||
|
|
||||||
def _send(self, priority, msg):
|
def _send(self, priority, msg):
|
||||||
addr_opts = {
|
addr_opts = {
|
||||||
@ -124,11 +125,10 @@ class QpidStrategy(strategy.Strategy):
|
|||||||
topic = "%s.%s" % (CONF.qpid_notification_topic, priority)
|
topic = "%s.%s" % (CONF.qpid_notification_topic, priority)
|
||||||
address = "%s/%s ; %s" % (CONF.qpid_notification_exchange, topic,
|
address = "%s/%s ; %s" % (CONF.qpid_notification_exchange, topic,
|
||||||
json.dumps(addr_opts))
|
json.dumps(addr_opts))
|
||||||
|
connection = None
|
||||||
try:
|
try:
|
||||||
self.connection = None
|
connection = self._open_connection()
|
||||||
self._open_connection()
|
session = connection.session()
|
||||||
session = self.connection.session()
|
|
||||||
sender = session.sender(address)
|
sender = session.sender(address)
|
||||||
qpid_msg = qpid.messaging.Message(content=msg)
|
qpid_msg = qpid.messaging.Message(content=msg)
|
||||||
sender.send(qpid_msg)
|
sender.send(qpid_msg)
|
||||||
@ -138,8 +138,8 @@ class QpidStrategy(strategy.Strategy):
|
|||||||
'Message: %(msg)s') % details)
|
'Message: %(msg)s') % details)
|
||||||
raise
|
raise
|
||||||
finally:
|
finally:
|
||||||
if self.connection and self.connection.opened():
|
if connection and connection.opened():
|
||||||
self.connection.close()
|
connection.close()
|
||||||
|
|
||||||
def warn(self, msg):
|
def warn(self, msg):
|
||||||
self._send('warn', msg)
|
self._send('warn', msg)
|
||||||
|
@ -348,7 +348,7 @@ class TestQpidNotifier(utils.BaseTestCase):
|
|||||||
qpid.messaging.Sender = self.orig_sender
|
qpid.messaging.Sender = self.orig_sender
|
||||||
qpid.messaging.Receiver = self.orig_receiver
|
qpid.messaging.Receiver = self.orig_receiver
|
||||||
|
|
||||||
def _test_notify(self, priority, exception=False, opened=True):
|
def _test_notify(self, priority, exception=False, exception_send=False):
|
||||||
test_msg = {'a': 'b'}
|
test_msg = {'a': 'b'}
|
||||||
|
|
||||||
self.mock_connection = self.mocker.CreateMock(self.orig_connection)
|
self.mock_connection = self.mocker.CreateMock(self.orig_connection)
|
||||||
@ -358,7 +358,7 @@ class TestQpidNotifier(utils.BaseTestCase):
|
|||||||
self.mock_connection.username = ""
|
self.mock_connection.username = ""
|
||||||
if exception:
|
if exception:
|
||||||
self.mock_connection.open().AndRaise(
|
self.mock_connection.open().AndRaise(
|
||||||
Exception('Test Exception'))
|
Exception('Test open Exception'))
|
||||||
else:
|
else:
|
||||||
self.mock_connection.open()
|
self.mock_connection.open()
|
||||||
self.mock_connection.session().AndReturn(self.mock_session)
|
self.mock_connection.session().AndReturn(self.mock_session)
|
||||||
@ -368,9 +368,14 @@ class TestQpidNotifier(utils.BaseTestCase):
|
|||||||
'"create": "always"}' % priority)
|
'"create": "always"}' % priority)
|
||||||
self.mock_session.sender(expected_address).AndReturn(
|
self.mock_session.sender(expected_address).AndReturn(
|
||||||
self.mock_sender)
|
self.mock_sender)
|
||||||
|
if exception_send:
|
||||||
|
self.mock_sender.send(mox.IgnoreArg()).AndRaise(
|
||||||
|
Exception('Test send Exception'))
|
||||||
|
# NOTE(afazekas): the opened and close call is expected
|
||||||
|
# in this case, but not expected if the open fails
|
||||||
|
else:
|
||||||
self.mock_sender.send(mox.IgnoreArg())
|
self.mock_sender.send(mox.IgnoreArg())
|
||||||
self.mock_connection.opened().AndReturn(opened)
|
self.mock_connection.opened().AndReturn(True)
|
||||||
if opened:
|
|
||||||
self.mock_connection.close()
|
self.mock_connection.close()
|
||||||
|
|
||||||
self.mocker.ReplayAll()
|
self.mocker.ReplayAll()
|
||||||
@ -378,17 +383,17 @@ class TestQpidNotifier(utils.BaseTestCase):
|
|||||||
self.config(notifier_strategy="qpid")
|
self.config(notifier_strategy="qpid")
|
||||||
notifier = self.notify_qpid.QpidStrategy()
|
notifier = self.notify_qpid.QpidStrategy()
|
||||||
if priority == 'info':
|
if priority == 'info':
|
||||||
if exception:
|
if exception or exception_send:
|
||||||
self.assertRaises(Exception, notifier.info, test_msg)
|
self.assertRaises(Exception, notifier.info, test_msg)
|
||||||
else:
|
else:
|
||||||
notifier.info(test_msg)
|
notifier.info(test_msg)
|
||||||
elif priority == 'warn':
|
elif priority == 'warn':
|
||||||
if exception:
|
if exception or exception_send:
|
||||||
self.assertRaises(Exception, notifier.warn, test_msg)
|
self.assertRaises(Exception, notifier.warn, test_msg)
|
||||||
else:
|
else:
|
||||||
notifier.warn(test_msg)
|
notifier.warn(test_msg)
|
||||||
elif priority == 'error':
|
elif priority == 'error':
|
||||||
if exception:
|
if exception or exception_send:
|
||||||
self.assertRaises(Exception, notifier.error, test_msg)
|
self.assertRaises(Exception, notifier.error, test_msg)
|
||||||
else:
|
else:
|
||||||
notifier.error(test_msg)
|
notifier.error(test_msg)
|
||||||
@ -407,8 +412,14 @@ class TestQpidNotifier(utils.BaseTestCase):
|
|||||||
def test_exception_open_successful(self):
|
def test_exception_open_successful(self):
|
||||||
self._test_notify('info', exception=True)
|
self._test_notify('info', exception=True)
|
||||||
|
|
||||||
def test_exception_open_failed(self):
|
def test_info_fail(self):
|
||||||
self._test_notify('info', exception=True, opened=False)
|
self._test_notify('info', exception_send=True)
|
||||||
|
|
||||||
|
def test_warn_fail(self):
|
||||||
|
self._test_notify('warn', exception_send=True)
|
||||||
|
|
||||||
|
def test_error_fail(self):
|
||||||
|
self._test_notify('error', exception_send=True)
|
||||||
|
|
||||||
|
|
||||||
class TestRabbitContentType(utils.BaseTestCase):
|
class TestRabbitContentType(utils.BaseTestCase):
|
||||||
|
Loading…
Reference in New Issue
Block a user