Merge "test: Don't test message's reply timeout"
This commit is contained in:
commit
5972b2310f
@ -403,11 +403,6 @@ class TestSendReceive(test_utils.BaseTestCase):
|
||||
('zero', dict(rx_id=False, reply=0)),
|
||||
]
|
||||
|
||||
_reply_fail = [
|
||||
('reply_success', dict(reply_failure_404=False)),
|
||||
('reply_failure', dict(reply_failure_404=True)),
|
||||
]
|
||||
|
||||
_failure = [
|
||||
('success', dict(failure=False)),
|
||||
('failure', dict(failure=True, expected=False)),
|
||||
@ -424,7 +419,6 @@ class TestSendReceive(test_utils.BaseTestCase):
|
||||
cls.scenarios = testscenarios.multiply_scenarios(cls._n_senders,
|
||||
cls._context,
|
||||
cls._reply,
|
||||
cls._reply_fail,
|
||||
cls._failure,
|
||||
cls._timeout)
|
||||
|
||||
@ -457,10 +451,8 @@ class TestSendReceive(test_utils.BaseTestCase):
|
||||
|
||||
def send_and_wait_for_reply(i):
|
||||
try:
|
||||
if self.reply_failure_404:
|
||||
timeout = 0.01
|
||||
else:
|
||||
timeout = self.timeout
|
||||
|
||||
timeout = self.timeout
|
||||
replies.append(driver.send(target,
|
||||
self.ctxt,
|
||||
{'tx_id': i},
|
||||
@ -470,8 +462,7 @@ class TestSendReceive(test_utils.BaseTestCase):
|
||||
self.assertIsNone(self.timeout)
|
||||
except (ZeroDivisionError, oslo_messaging.MessagingTimeout) as e:
|
||||
replies.append(e)
|
||||
self.assertTrue(self.failure or self.timeout is not None
|
||||
or self.reply_failure_404)
|
||||
self.assertTrue(self.failure or self.timeout is not None)
|
||||
|
||||
while len(senders) < self.n_senders:
|
||||
senders.append(threading.Thread(target=send_and_wait_for_reply,
|
||||
@ -491,18 +482,6 @@ class TestSendReceive(test_utils.BaseTestCase):
|
||||
if len(order) > 1:
|
||||
order[-1], order[-2] = order[-2], order[-1]
|
||||
|
||||
if self.reply_failure_404:
|
||||
start = time.time()
|
||||
# NOTE(sileht): Simulate a rpc client restart
|
||||
# By returning a ExchangeNotFound when we try to
|
||||
# send reply
|
||||
exc = (driver._reply_q_conn.connection.
|
||||
connection.channel_errors[0]())
|
||||
exc.code = 404
|
||||
self.useFixture(mockpatch.Patch(
|
||||
'kombu.messaging.Producer.publish',
|
||||
side_effect=exc))
|
||||
|
||||
for i in order:
|
||||
if self.timeout is None:
|
||||
if self.failure:
|
||||
@ -516,22 +495,11 @@ class TestSendReceive(test_utils.BaseTestCase):
|
||||
msgs[i].reply({'rx_id': i})
|
||||
else:
|
||||
msgs[i].reply(self.reply)
|
||||
elif self.reply_failure_404:
|
||||
msgs[i].reply({})
|
||||
senders[i].join()
|
||||
|
||||
if self.reply_failure_404:
|
||||
# NOTE(sileht) all reply fail, first take
|
||||
# kombu_missing_consumer_retry_timeout seconds to fail
|
||||
# next immediately fail
|
||||
dt = time.time() - start
|
||||
rabbit_conf = self.conf.oslo_messaging_rabbit
|
||||
timeout = rabbit_conf.kombu_missing_consumer_retry_timeout
|
||||
self.assertTrue(timeout <= dt < (timeout + 0.100), dt)
|
||||
|
||||
self.assertEqual(len(senders), len(replies))
|
||||
for i, reply in enumerate(replies):
|
||||
if self.timeout is not None or self.reply_failure_404:
|
||||
if self.timeout is not None:
|
||||
self.assertIsInstance(reply, oslo_messaging.MessagingTimeout)
|
||||
elif self.failure:
|
||||
self.assertIsInstance(reply, ZeroDivisionError)
|
||||
|
Loading…
Reference in New Issue
Block a user