diff --git a/mistral/engine/rpc_backend/kombu/kombu_hosts.py b/mistral/engine/rpc_backend/kombu/kombu_hosts.py index e7999159..65fa494d 100644 --- a/mistral/engine/rpc_backend/kombu/kombu_hosts.py +++ b/mistral/engine/rpc_backend/kombu/kombu_hosts.py @@ -15,6 +15,7 @@ import itertools import random +import six import oslo_messaging as messaging @@ -51,7 +52,7 @@ class KombuHosts(object): self._hosts_cycle = itertools.cycle(self._hosts) def get_host(self): - return self._hosts_cycle.next() + return six.next(self._hosts_cycle) def get_hosts(self): return self._hosts diff --git a/mistral/engine/rpc_backend/kombu/kombu_listener.py b/mistral/engine/rpc_backend/kombu/kombu_listener.py index 11cf9001..54908992 100644 --- a/mistral/engine/rpc_backend/kombu/kombu_listener.py +++ b/mistral/engine/rpc_backend/kombu/kombu_listener.py @@ -15,7 +15,7 @@ import itertools from kombu.mixins import ConsumerMixin -from six import moves +import six import threading from oslo_log import log as logging @@ -39,7 +39,7 @@ class KombuRPCListener(ConsumerMixin): self._max_sleep_time = 512 def add_listener(self, correlation_id): - self._results[correlation_id] = moves.queue.Queue() + self._results[correlation_id] = six.moves.queue.Queue() def remove_listener(self, correlation_id): if correlation_id in self._results: @@ -101,7 +101,7 @@ class KombuRPCListener(ConsumerMixin): return self._results[correlation_id].get(block=True, timeout=timeout) def on_connection_error(self, exc, interval): - self.connection = self._connections.next() + self.connection = six.next(self._connections) LOG.debug("Broker connection failed: %s" % exc) LOG.debug("Sleeping for %s seconds, then retrying connection" %