Fix problems during unstable network

In this patch start and stop method is not raised exceptions
if connectivity problem is detected. Rasing exceptions there
exceptions are not expected by MessageHandlerServer.
It hangs server on start

Closes-Bug: #1553168

Change-Id: I891abab2a1184fa65b496ea2f7fc54894bc0b421
This commit is contained in:
dukhlov 2016-03-03 11:00:50 -05:00
parent 41851e9c8a
commit 0aca222d96
7 changed files with 120 additions and 80 deletions

View File

@ -195,7 +195,7 @@ class PikaDriver(base.BaseDriver):
self._declare_rpc_exchange(exchange, self._declare_rpc_exchange(exchange,
expiration_time - time.time()) expiration_time - time.time())
except pika_drv_exc.ConnectionException as e: except pika_drv_exc.ConnectionException as e:
LOG.warning("Problem during declaring exchange. %", e) LOG.warning("Problem during declaring exchange. %s", e)
return True return True
elif isinstance(ex, (pika_drv_exc.ConnectionException, elif isinstance(ex, (pika_drv_exc.ConnectionException,
exceptions.MessageDeliveryFailure)): exceptions.MessageDeliveryFailure)):
@ -240,7 +240,7 @@ class PikaDriver(base.BaseDriver):
self._declare_rpc_exchange(exchange, self._declare_rpc_exchange(exchange,
expiration_time - time.time()) expiration_time - time.time())
except pika_drv_exc.ConnectionException as e: except pika_drv_exc.ConnectionException as e:
LOG.warning("Problem during declaring exchange. %", e) LOG.warning("Problem during declaring exchange. %s", e)
raise ex raise ex
if reply is not None: if reply is not None:
@ -269,7 +269,7 @@ class PikaDriver(base.BaseDriver):
exchange, expiration_time - time.time() exchange, expiration_time - time.time()
) )
except pika_drv_exc.ConnectionException as e: except pika_drv_exc.ConnectionException as e:
LOG.warning("Problem during declaring exchange. %", e) LOG.warning("Problem during declaring exchange. %s", e)
def _declare_notification_queue_binding(self, target, timeout=None): def _declare_notification_queue_binding(self, target, timeout=None):
if timeout is not None and timeout < 0: if timeout is not None and timeout < 0:
@ -303,16 +303,16 @@ class PikaDriver(base.BaseDriver):
def on_exception(ex): def on_exception(ex):
if isinstance(ex, (pika_drv_exc.ExchangeNotFoundException, if isinstance(ex, (pika_drv_exc.ExchangeNotFoundException,
pika_drv_exc.RoutingException)): pika_drv_exc.RoutingException)):
LOG.warning("Problem during sending notification. %", ex) LOG.warning("Problem during sending notification. %s", ex)
try: try:
self._declare_notification_queue_binding(target) self._declare_notification_queue_binding(target)
except pika_drv_exc.ConnectionException as e: except pika_drv_exc.ConnectionException as e:
LOG.warning("Problem during declaring notification queue " LOG.warning("Problem during declaring notification queue "
"binding. %", e) "binding. %s", e)
return True return True
elif isinstance(ex, (pika_drv_exc.ConnectionException, elif isinstance(ex, (pika_drv_exc.ConnectionException,
pika_drv_exc.MessageRejectedException)): pika_drv_exc.MessageRejectedException)):
LOG.warning("Problem during sending notification. %", ex) LOG.warning("Problem during sending notification. %s", ex)
return True return True
else: else:
return False return False

View File

@ -0,0 +1,44 @@
# Copyright 2015 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import select
import socket
import sys
from pika import exceptions as pika_exceptions
import six
PIKA_CONNECTIVITY_ERRORS = (
pika_exceptions.AMQPConnectionError,
pika_exceptions.ConnectionClosed,
pika_exceptions.ChannelClosed,
socket.timeout,
select.error
)
EXCEPTIONS_MODULE = 'exceptions' if six.PY2 else 'builtins'
def is_eventlet_monkey_patched(module):
"""Determines safely is eventlet patching for module enabled or not
:param module: String, module name
:return Bool, True if module is pathed, False otherwise
"""
if 'eventlet.patcher' not in sys.modules:
return False
import eventlet.patcher
return eventlet.patcher.is_monkey_patched(module)

View File

@ -14,7 +14,6 @@
import random import random
import socket import socket
import sys
import threading import threading
import time import time
@ -22,30 +21,16 @@ from oslo_log import log as logging
import pika import pika
from pika.adapters import select_connection from pika.adapters import select_connection
from pika import credentials as pika_credentials from pika import credentials as pika_credentials
import pika_pool import pika_pool
import six
import uuid import uuid
from oslo_messaging._drivers.pika_driver import pika_commons as pika_drv_cmns
from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
_EXCEPTIONS_MODULE = 'exceptions' if six.PY2 else 'builtins'
def _is_eventlet_monkey_patched(module):
"""Determines safely is eventlet patching for module enabled or not
:param module: String, module name
:return Bool, True if module is pathed, False otherwise
"""
if 'eventlet.patcher' not in sys.modules:
return False
import eventlet.patcher
return eventlet.patcher.is_monkey_patched(module)
def _create_select_poller_connection_impl( def _create_select_poller_connection_impl(
parameters, on_open_callback, on_open_error_callback, parameters, on_open_callback, on_open_error_callback,
@ -99,7 +84,9 @@ class PikaEngine(object):
allowed_remote_exmods=None): allowed_remote_exmods=None):
self.conf = conf self.conf = conf
self._force_select_poller_use = _is_eventlet_monkey_patched('select') self._force_select_poller_use = (
pika_drv_cmns.is_eventlet_monkey_patched('select')
)
# processing rpc options # processing rpc options
self.default_rpc_exchange = ( self.default_rpc_exchange = (
@ -109,7 +96,7 @@ class PikaEngine(object):
conf.oslo_messaging_pika.rpc_reply_exchange conf.oslo_messaging_pika.rpc_reply_exchange
) )
self.allowed_remote_exmods = [_EXCEPTIONS_MODULE] self.allowed_remote_exmods = [pika_drv_cmns.EXCEPTIONS_MODULE]
if allowed_remote_exmods: if allowed_remote_exmods:
self.allowed_remote_exmods.extend(allowed_remote_exmods) self.allowed_remote_exmods.extend(allowed_remote_exmods)
@ -359,8 +346,8 @@ class PikaEngine(object):
self.HOST_CONNECTION_LAST_TRY_TIME self.HOST_CONNECTION_LAST_TRY_TIME
] = cur_time ] = cur_time
@staticmethod def declare_exchange_by_channel(self, channel, exchange, exchange_type,
def declare_exchange_by_channel(channel, exchange, exchange_type, durable): durable):
"""Declare exchange using already created channel, if they don't exist """Declare exchange using already created channel, if they don't exist
:param channel: Channel for communication with RabbitMQ :param channel: Channel for communication with RabbitMQ
@ -373,7 +360,7 @@ class PikaEngine(object):
channel.exchange_declare( channel.exchange_declare(
exchange, exchange_type, auto_delete=True, durable=durable exchange, exchange_type, auto_delete=True, durable=durable
) )
except pika_pool.Connection.connectivity_errors as e: except pika_drv_cmns.PIKA_CONNECTIVITY_ERRORS as e:
raise pika_drv_exc.ConnectionException( raise pika_drv_exc.ConnectionException(
"Connectivity problem detected during declaring exchange: " "Connectivity problem detected during declaring exchange: "
"exchange:{}, exchange_type: {}, durable: {}. {}".format( "exchange:{}, exchange_type: {}, durable: {}. {}".format(
@ -381,10 +368,9 @@ class PikaEngine(object):
) )
) )
@staticmethod def declare_queue_binding_by_channel(self, channel, exchange, queue,
def declare_queue_binding_by_channel(channel, exchange, queue, routing_key, routing_key, exchange_type,
exchange_type, queue_expiration, queue_expiration, durable):
durable):
"""Declare exchange, queue and bind them using already created """Declare exchange, queue and bind them using already created
channel, if they don't exist channel, if they don't exist
@ -410,7 +396,7 @@ class PikaEngine(object):
channel.queue_declare(queue, durable=durable, arguments=arguments) channel.queue_declare(queue, durable=durable, arguments=arguments)
channel.queue_bind(queue, exchange, routing_key) channel.queue_bind(queue, exchange, routing_key)
except pika_pool.Connection.connectivity_errors as e: except pika_drv_cmns.PIKA_CONNECTIVITY_ERRORS as e:
raise pika_drv_exc.ConnectionException( raise pika_drv_exc.ConnectionException(
"Connectivity problem detected during declaring queue " "Connectivity problem detected during declaring queue "
"binding: exchange:{}, queue: {}, routing_key: {}, " "binding: exchange:{}, queue: {}, routing_key: {}, "

View File

@ -13,13 +13,11 @@
# under the License. # under the License.
import threading import threading
import time
import uuid import uuid
from concurrent import futures from concurrent import futures
from oslo_log import log as logging from oslo_log import log as logging
from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc
from oslo_messaging._drivers.pika_driver import pika_poller as pika_drv_poller from oslo_messaging._drivers.pika_driver import pika_poller as pika_drv_poller
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -97,15 +95,7 @@ class RpcReplyPikaListener(object):
""" """
while self._reply_poller: while self._reply_poller:
try: try:
try: messages = self._reply_poller.poll()
messages = self._reply_poller.poll()
except pika_drv_exc.EstablishConnectionException:
LOG.exception("Problem during establishing connection for "
"reply polling")
time.sleep(
self._pika_engine.host_connection_reconnect_delay
)
continue
for message in messages: for message in messages:
try: try:

View File

@ -13,13 +13,14 @@
# under the License. # under the License.
import threading import threading
import time
from oslo_log import log as logging from oslo_log import log as logging
from oslo_utils import timeutils from oslo_utils import timeutils
import pika_pool
import six import six
from oslo_messaging._drivers import base from oslo_messaging._drivers import base
from oslo_messaging._drivers.pika_driver import pika_commons as pika_drv_cmns
from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc
from oslo_messaging._drivers.pika_driver import pika_message as pika_drv_msg from oslo_messaging._drivers.pika_driver import pika_message as pika_drv_msg
@ -143,21 +144,17 @@ class PikaPoller(base.Listener):
"""Cleanup allocated resources (channel, connection, etc). It is unsafe """Cleanup allocated resources (channel, connection, etc). It is unsafe
method for internal use only method for internal use only
""" """
if self._channel:
try:
self._channel.close()
except Exception as ex:
if not pika_pool.Connection.is_connection_invalidated(ex):
LOG.exception("Unexpected error during closing channel")
self._channel = None
if self._connection: if self._connection:
try: try:
self._connection.close() self._connection.close()
except Exception as ex: except pika_drv_cmns.PIKA_CONNECTIVITY_ERRORS:
if not pika_pool.Connection.is_connection_invalidated(ex): # expected errors
LOG.exception("Unexpected error during closing connection") pass
self._connection = None except Exception:
LOG.exception("Unexpected error during closing connection")
finally:
self._channel = None
self._connection = None
for i in six.moves.range(len(self._message_queue) - 1, -1, -1): for i in six.moves.range(len(self._message_queue) - 1, -1, -1):
message = self._message_queue[i] message = self._message_queue[i]
@ -201,15 +198,25 @@ class PikaPoller(base.Listener):
else: else:
# consumer is stopped so we don't expect new # consumer is stopped so we don't expect new
# messages, just process already sent events # messages, just process already sent events
self._connection.process_data_events( if self._channel is not None:
time_limit=0 self._connection.process_data_events(
) time_limit=0
)
# and return result if we don't see new messages # and return result if we don't see new messages
if last_queue_size == len(self._message_queue): if last_queue_size == len(self._message_queue):
result = self._message_queue[:prefetch_size] result = self._message_queue[:prefetch_size]
del self._message_queue[:prefetch_size] del self._message_queue[:prefetch_size]
return result return result
except pika_pool.Connection.connectivity_errors: except pika_drv_exc.EstablishConnectionException as e:
LOG.warn("Problem during establishing connection for"
"pika poller %s", e, exc_info=True)
time.sleep(
self._pika_engine.host_connection_reconnect_delay
)
except pika_drv_exc.ConnectionException:
self._cleanup()
raise
except pika_drv_cmns.PIKA_CONNECTIVITY_ERRORS:
self._cleanup() self._cleanup()
raise raise
@ -220,19 +227,24 @@ class PikaPoller(base.Listener):
with self._lock: with self._lock:
if self._started: if self._started:
return return
self._started = True
self._cleanup()
try: try:
self._reconnect() self._reconnect()
except Exception as exc: except pika_drv_exc.EstablishConnectionException as exc:
LOG.warn("Can not establishing connection during pika "
"Conecting required during first poll() call. %s",
exc, exc_info=True)
except pika_drv_exc.ConnectionException as exc:
self._cleanup() self._cleanup()
if isinstance(exc, pika_pool.Connection.connectivity_errors): LOG.warn("Connectivity problem during pika poller's start(). "
raise pika_drv_exc.ConnectionException( "Reconnecting required during first poll() call. %s",
"Connectivity problem detected during establishing " exc, exc_info=True)
"poller's connection. " + str(exc)) except pika_drv_cmns.PIKA_CONNECTIVITY_ERRORS as exc:
else: self._cleanup()
raise exc LOG.warn("Connectivity problem during pika poller's start(). "
"Reconnecting required during first poll() call. %s",
exc, exc_info=True)
self._started = True
def stop(self): def stop(self):
"""Stops poller. Should be called when polling is not needed anymore to """Stops poller. Should be called when polling is not needed anymore to
@ -246,15 +258,10 @@ class PikaPoller(base.Listener):
if self._queues_to_consume and self._channel: if self._queues_to_consume and self._channel:
try: try:
self._stop_consuming() self._stop_consuming()
except Exception as exc: except pika_drv_cmns.PIKA_CONNECTIVITY_ERRORS as exc:
self._cleanup() self._cleanup()
if isinstance(exc, LOG.warn("Connectivity problem detected during consumer "
pika_pool.Connection.connectivity_errors): "cancellation. %s", exc, exc_info=True)
raise pika_drv_exc.ConnectionException(
"Connectivity problem detected during "
"consumer canceling. " + str(exc))
else:
raise exc
self._started = False self._started = False
def cleanup(self): def cleanup(self):

View File

@ -22,7 +22,7 @@ from oslo_serialization import jsonutils
import pika import pika
import oslo_messaging import oslo_messaging
from oslo_messaging._drivers.pika_driver import pika_engine from oslo_messaging._drivers.pika_driver import pika_commons as pika_drv_cmns
from oslo_messaging._drivers.pika_driver import pika_message as pika_drv_msg from oslo_messaging._drivers.pika_driver import pika_message as pika_drv_msg
@ -252,7 +252,7 @@ class RpcReplyPikaIncomingMessageTestCase(unittest.TestCase):
def setUp(self): def setUp(self):
self._pika_engine = mock.Mock() self._pika_engine = mock.Mock()
self._pika_engine.allowed_remote_exmods = [ self._pika_engine.allowed_remote_exmods = [
pika_engine._EXCEPTIONS_MODULE, "oslo_messaging.exceptions" pika_drv_cmns.EXCEPTIONS_MODULE, "oslo_messaging.exceptions"
] ]
self._channel = mock.Mock() self._channel = mock.Mock()

View File

@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import socket
import time import time
import unittest import unittest
@ -33,6 +34,18 @@ class PikaPollerTestCase(unittest.TestCase):
) )
self._prefetch_count = 123 self._prefetch_count = 123
def test_start_when_connection_unavailable(self):
incoming_message_class_mock = mock.Mock()
poller = pika_poller.PikaPoller(
self._pika_engine, self._prefetch_count,
incoming_message_class=incoming_message_class_mock
)
self._pika_engine.create_connection.side_effect = socket.timeout()
# start() should not raise socket.timeout exception
poller.start()
@mock.patch("oslo_messaging._drivers.pika_driver.pika_poller.PikaPoller." @mock.patch("oslo_messaging._drivers.pika_driver.pika_poller.PikaPoller."
"_declare_queue_binding") "_declare_queue_binding")
def test_poll(self, declare_queue_binding_mock): def test_poll(self, declare_queue_binding_mock):