diff --git a/mistral/engine/rpc_backend/kombu/kombu_client.py b/mistral/engine/rpc_backend/kombu/kombu_client.py index 10e0bf2e..c7cec54f 100644 --- a/mistral/engine/rpc_backend/kombu/kombu_client.py +++ b/mistral/engine/rpc_backend/kombu/kombu_client.py @@ -56,15 +56,21 @@ class KombuRPCClient(rpc_base.RPCClient, kombu_base.Base): self._timeout = CONF.rpc_response_timeout self.routing_key = self.topic - host = self._hosts.get_host() + hosts = self._hosts.get_hosts() - self.conn = self._make_connection( - host.hostname, - host.port, - host.username, - host.password, - self.virtual_host - ) + self._connections = [] + + for host in hosts: + conn = self._make_connection( + host.hostname, + host.port, + host.username, + host.password, + self.virtual_host + ) + self._connections.append(conn) + + self.conn = self._connections[0] # Create exchange. exchange = self._make_exchange( @@ -85,7 +91,7 @@ class KombuRPCClient(rpc_base.RPCClient, kombu_base.Base): ) self._listener = kombu_listener.KombuRPCListener( - connection=self.conn, + connections=self._connections, callback_queue=self.callback_queue ) diff --git a/mistral/engine/rpc_backend/kombu/kombu_hosts.py b/mistral/engine/rpc_backend/kombu/kombu_hosts.py index 5dde60bd..e7999159 100644 --- a/mistral/engine/rpc_backend/kombu/kombu_hosts.py +++ b/mistral/engine/rpc_backend/kombu/kombu_hosts.py @@ -52,3 +52,6 @@ class KombuHosts(object): def get_host(self): return self._hosts_cycle.next() + + def get_hosts(self): + return self._hosts diff --git a/mistral/engine/rpc_backend/kombu/kombu_listener.py b/mistral/engine/rpc_backend/kombu/kombu_listener.py index 661bb839..11cf9001 100644 --- a/mistral/engine/rpc_backend/kombu/kombu_listener.py +++ b/mistral/engine/rpc_backend/kombu/kombu_listener.py @@ -13,6 +13,7 @@ # License for the specific language governing permissions and limitations # under the License. +import itertools from kombu.mixins import ConsumerMixin from six import moves import threading @@ -26,11 +27,16 @@ LOG = logging.getLogger(__name__) class KombuRPCListener(ConsumerMixin): - def __init__(self, connection, callback_queue): + def __init__(self, connections, callback_queue): self._results = {} - self.connection = connection + self._connections = itertools.cycle(connections) self._callback_queue = callback_queue self._thread = None + self.connection = self._connections.next() + + # TODO(ddeja): Those 2 options should be gathered from config. + self._sleep_time = 1 + self._max_sleep_time = 512 def add_listener(self, correlation_id): self._results[correlation_id] = moves.queue.Queue() @@ -93,3 +99,11 @@ class KombuRPCListener(ConsumerMixin): def get_result(self, correlation_id, timeout): return self._results[correlation_id].get(block=True, timeout=timeout) + + def on_connection_error(self, exc, interval): + self.connection = self._connections.next() + + LOG.debug("Broker connection failed: %s" % exc) + LOG.debug("Sleeping for %s seconds, then retrying connection" % + interval + ) diff --git a/mistral/engine/rpc_backend/kombu/kombu_server.py b/mistral/engine/rpc_backend/kombu/kombu_server.py index c3197e9b..2016dc68 100644 --- a/mistral/engine/rpc_backend/kombu/kombu_server.py +++ b/mistral/engine/rpc_backend/kombu/kombu_server.py @@ -12,8 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. +import amqp import socket import threading +import time import kombu from oslo_config import cfg @@ -69,6 +71,10 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base): self.endpoints = [] self._worker = None + # TODO(ddeja): Those 2 options should be gathered from config. + self._sleep_time = 1 + self._max_sleep_time = 512 + @property def is_running(self): """Return whether server is running.""" @@ -77,57 +83,75 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base): def run(self, executor='blocking'): """Start the server.""" self._prepare_worker(executor) - host = self._hosts.get_host() - self.conn = self._make_connection( - host.hostname, - host.port, - host.username, - host.password, - self.virtual_host, - ) + while True: + try: + host = self._hosts.get_host() - LOG.info("Connected to AMQP at %s:%s" % (host.hostname, host.port)) + self.conn = self._make_connection( + host.hostname, + host.port, + host.username, + host.password, + self.virtual_host, + ) - try: - conn = kombu.connections[self.conn].acquire(block=True) - exchange = self._make_exchange( - self.exchange, - durable=self.durable_queue, - auto_delete=self.auto_delete - ) - queue = self._make_queue( - self.topic, - exchange, - routing_key=self.routing_key, - durable=self.durable_queue, - auto_delete=self.auto_delete - ) - with conn.Consumer( - queues=queue, - callbacks=[self._process_message], - ) as consumer: - consumer.qos(prefetch_count=1) + conn = kombu.connections[self.conn].acquire(block=True) - self._running.set() - self._stopped.clear() + exchange = self._make_exchange( + self.exchange, + durable=self.durable_queue, + auto_delete=self.auto_delete + ) - while self.is_running: - try: - conn.drain_events(timeout=1) - except socket.timeout: - pass - except KeyboardInterrupt: - self.stop() + queue = self._make_queue( + self.topic, + exchange, + routing_key=self.routing_key, + durable=self.durable_queue, + auto_delete=self.auto_delete + ) + with conn.Consumer( + queues=queue, + callbacks=[self._process_message], + ) as consumer: + consumer.qos(prefetch_count=1) - LOG.info("Server with id='{0}' stopped.".format( - self.server_id)) + self._running.set() + self._stopped.clear() - return - except socket.error as e: - raise exc.MistralException("Broker connection failed: %s" % e) - finally: - self._stopped.set() + LOG.info("Connected to AMQP at %s:%s" % ( + host.hostname, + host.port + )) + + while self.is_running: + try: + conn.drain_events(timeout=1) + except socket.timeout: + pass + except KeyboardInterrupt: + self.stop() + + LOG.info("Server with id='{0}' stopped.".format( + self.server_id)) + + return + except (socket.error, amqp.exceptions.ConnectionForced) as e: + LOG.debug("Broker connection failed: %s" % e) + finally: + self._stopped.set() + + LOG.debug("Sleeping for %s seconds, than retrying connection" % + self._sleep_time + ) + + time.sleep(self._sleep_time) + + self._sleep_time = min( + self._sleep_time * 2, + self._max_sleep_time + ) def stop(self, graceful=False): self._running.clear()