Merge "Mistral fails on RabbitMQ restart"

This commit is contained in:
Jenkins 2017-10-06 11:49:57 +00:00 committed by Gerrit Code Review
commit ea0c07cac1
5 changed files with 74 additions and 19 deletions

View File

@ -68,7 +68,8 @@ class Base(object):
userid=amqp_user, userid=amqp_user,
password=amqp_password, password=amqp_password,
virtual_host=amqp_vhost, virtual_host=amqp_vhost,
port=amqp_port port=amqp_port,
transport_options={'confirm_publish': True}
) )
@staticmethod @staticmethod

View File

@ -11,7 +11,12 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import socket
import itertools
import errno
import six
from six import moves from six import moves
import kombu import kombu
@ -25,6 +30,11 @@ from mistral.rpc.kombu import kombu_hosts
from mistral.rpc.kombu import kombu_listener from mistral.rpc.kombu import kombu_listener
from mistral import utils from mistral import utils
#: When connection to the RabbitMQ server breaks, the
#: client will receive EPIPE socket errors. These indicate
#: an error that may be fixed by retrying. This constant
#: is a guess for how many times the retry may be reasonable
EPIPE_RETRIES = 4
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -55,7 +65,7 @@ class KombuRPCClient(rpc_base.RPCClient, kombu_base.Base):
hosts = self._hosts.get_hosts() hosts = self._hosts.get_hosts()
self._connections = [] connections = []
for host in hosts: for host in hosts:
conn = self._make_connection( conn = self._make_connection(
@ -65,9 +75,9 @@ class KombuRPCClient(rpc_base.RPCClient, kombu_base.Base):
host.password, host.password,
self.virtual_host self.virtual_host
) )
self._connections.append(conn) connections.append(conn)
self.conn = self._connections[0] self._connections = itertools.cycle(connections)
# Create exchange. # Create exchange.
exchange = self._make_exchange( exchange = self._make_exchange(
@ -135,15 +145,9 @@ class KombuRPCClient(rpc_base.RPCClient, kombu_base.Base):
self._listener.add_listener(correlation_id) self._listener.add_listener(correlation_id)
# Publish request. # Publish request.
with kombu.producers[self.conn].acquire(block=True) as producer: for retry_round in six.moves.range(EPIPE_RETRIES):
producer.publish( if self._publish_request(body, correlation_id):
body=body, break
exchange=self.exchange,
routing_key=self.topic,
reply_to=self.queue_name,
correlation_id=correlation_id,
delivery_mode=2
)
# Start waiting for response. # Start waiting for response.
if async_: if async_:
@ -169,6 +173,39 @@ class KombuRPCClient(rpc_base.RPCClient, kombu_base.Base):
return res_object return res_object
def _publish_request(self, body, correlation_id):
"""Publishes the request message
.. note::
The :const:`errno.EPIPE` socket errors are suppressed
and result in False being returned. This is because
this type of error can usually be fixed by retrying.
:param body: message body
:param correlation_id: correlation id
:return: True if publish succeeded, False otherwise
:rtype: bool
"""
try:
conn = self._listener.wait_ready()
if conn:
with kombu.producers[conn].acquire(block=True) as producer:
producer.publish(
body=body,
exchange=self.exchange,
routing_key=self.topic,
reply_to=self.queue_name,
correlation_id=correlation_id,
delivery_mode=2
)
return True
except socket.error as e:
if e.errno != errno.EPIPE:
raise
else:
LOG.debug('Retrying publish due to broker connection failure')
return False
def sync_call(self, ctx, method, target=None, **kwargs): def sync_call(self, ctx, method, target=None, **kwargs):
return self._call(ctx, method, async_=False, target=target, **kwargs) return self._call(ctx, method, async_=False, target=target, **kwargs)

View File

@ -34,9 +34,7 @@ class KombuRPCListener(ConsumerMixin):
self._thread = None self._thread = None
self.connection = six.next(self._connections) self.connection = six.next(self._connections)
# TODO(ddeja): Those 2 options should be gathered from config. self.ready = threading.Event()
self._sleep_time = 1
self._max_sleep_time = 512
def add_listener(self, correlation_id): def add_listener(self, correlation_id):
self._results[correlation_id] = six.moves.queue.Queue() self._results[correlation_id] = six.moves.queue.Queue()
@ -46,11 +44,14 @@ class KombuRPCListener(ConsumerMixin):
del self._results[correlation_id] del self._results[correlation_id]
def get_consumers(self, Consumer, channel): def get_consumers(self, Consumer, channel):
return [Consumer( consumers = [Consumer(
self._callback_queue, self._callback_queue,
callbacks=[self.on_message], callbacks=[self.on_message],
accept=['pickle', 'json'] accept=['pickle', 'json']
)] )]
self.ready.set()
return consumers
def start(self): def start(self):
if self._thread is None: if self._thread is None:
@ -102,6 +103,8 @@ class KombuRPCListener(ConsumerMixin):
return self._results[correlation_id].get(block=True, timeout=timeout) return self._results[correlation_id].get(block=True, timeout=timeout)
def on_connection_error(self, exc, interval): def on_connection_error(self, exc, interval):
self.ready.clear()
self.connection = six.next(self._connections) self.connection = six.next(self._connections)
LOG.debug("Broker connection failed: %s", exc) LOG.debug("Broker connection failed: %s", exc)
@ -109,3 +112,16 @@ class KombuRPCListener(ConsumerMixin):
"Sleeping for %s seconds, then retrying connection", "Sleeping for %s seconds, then retrying connection",
interval interval
) )
def wait_ready(self, timeout=10.0):
"""Waits for the listener to successfully declare the consumer
:param timeout: timeout for waiting in seconds
:return: same as :func:`~threading.Event.wait`
:rtype: bool
"""
if self.ready.wait(timeout=timeout):
return self.connection
else:
return False

View File

@ -77,7 +77,7 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base):
# TODO(ddeja): Those 2 options should be gathered from config. # TODO(ddeja): Those 2 options should be gathered from config.
self._sleep_time = 1 self._sleep_time = 1
self._max_sleep_time = 512 self._max_sleep_time = 10
@property @property
def is_running(self): def is_running(self):
@ -136,6 +136,7 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base):
host.hostname, host.hostname,
host.port host.port
) )
self._sleep_time = 1
while self.is_running: while self.is_running:
try: try:

View File

@ -107,7 +107,7 @@ class KombuServerTestCase(base.KombuTestCase):
fake_kombu.connection.acquire.return_value = acquire_mock fake_kombu.connection.acquire.return_value = acquire_mock
self.assertRaises(TestException, self.server._run, 'blocking') self.assertRaises(TestException, self.server._run, 'blocking')
self.assertEqual(self.server._sleep_time, 2) self.assertEqual(self.server._sleep_time, 1)
def test_run_socket_timeout_still_running(self): def test_run_socket_timeout_still_running(self):