Merge "Fix some comments in a backporting review session"

This commit is contained in:
Jenkins 2015-01-08 19:23:39 +00:00 committed by Gerrit Code Review
commit 8102f25dd2
4 changed files with 10 additions and 7 deletions

View File

@ -25,6 +25,7 @@ from oslo import messaging
from oslo.messaging._drivers import amqp as rpc_amqp from oslo.messaging._drivers import amqp as rpc_amqp
from oslo.messaging._drivers import base from oslo.messaging._drivers import base
from oslo.messaging._drivers import common as rpc_common from oslo.messaging._drivers import common as rpc_common
from oslo.messaging._i18n import _
from oslo.messaging._i18n import _LI from oslo.messaging._i18n import _LI
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -207,7 +208,7 @@ class ReplyWaiter(object):
@staticmethod @staticmethod
def _raise_timeout_exception(msg_id): def _raise_timeout_exception(msg_id):
raise messaging.MessagingTimeout( raise messaging.MessagingTimeout(
'Timed out waiting for a reply to message ID %s' % msg_id) _('Timed out waiting for a reply to message ID %s.') % msg_id)
def _process_reply(self, data): def _process_reply(self, data):
result = None result = None
@ -271,7 +272,8 @@ class ReplyWaiter(object):
# have the first thread take responsibility for passing replies not # have the first thread take responsibility for passing replies not
# intended for itself to the appropriate thread. # intended for itself to the appropriate thread.
# #
timer = rpc_common.DecayingTimer(duration=timeout).start() timer = rpc_common.DecayingTimer(duration=timeout)
timer.start()
final_reply = None final_reply = None
while True: while True:
if self.conn_lock.acquire(False): if self.conn_lock.acquire(False):

View File

@ -340,15 +340,14 @@ class DecayingTimer(object):
def start(self): def start(self):
if self._duration is not None: if self._duration is not None:
self._ends_at = time.time() + max(0, self._duration) self._ends_at = time.time() + max(0, self._duration)
return self
def check_return(self, timeout_callback, *args, **kwargs): def check_return(self, timeout_callback, *args, **kwargs):
maximum = kwargs.pop('maximum', None) maximum = kwargs.pop('maximum', None)
if self._duration is None: if self._duration is None:
return None if maximum is None else maximum return None if maximum is None else maximum
if self._ends_at is None: if self._ends_at is None:
raise RuntimeError("Can not check/return a timeout from a timer" raise RuntimeError(_("Can not check/return a timeout from a timer"
" that has not been started") " that has not been started."))
left = self._ends_at - time.time() left = self._ends_at - time.time()
if left <= 0: if left <= 0:

View File

@ -642,7 +642,8 @@ class Connection(object):
def iterconsume(self, limit=None, timeout=None): def iterconsume(self, limit=None, timeout=None):
"""Return an iterator that will consume from all queues/consumers.""" """Return an iterator that will consume from all queues/consumers."""
timer = rpc_common.DecayingTimer(duration=timeout).start() timer = rpc_common.DecayingTimer(duration=timeout)
timer.start()
def _raise_timeout(exc): def _raise_timeout(exc):
LOG.debug('Timed out waiting for RPC response: %s', exc) LOG.debug('Timed out waiting for RPC response: %s', exc)

View File

@ -701,7 +701,8 @@ class Connection(object):
def iterconsume(self, limit=None, timeout=None): def iterconsume(self, limit=None, timeout=None):
"""Return an iterator that will consume from all queues/consumers.""" """Return an iterator that will consume from all queues/consumers."""
timer = rpc_common.DecayingTimer(duration=timeout).start() timer = rpc_common.DecayingTimer(duration=timeout)
timer.start()
def _raise_timeout(exc): def _raise_timeout(exc):
LOG.debug('Timed out waiting for RPC response: %s', exc) LOG.debug('Timed out waiting for RPC response: %s', exc)