Merge "Amqp driver send method temporary work-around"
This commit is contained in:
commit
4c8f7c4f83
@ -233,7 +233,6 @@ class ProtonDriver(base.BaseDriver):
|
|||||||
if retry is not None:
|
if retry is not None:
|
||||||
raise NotImplementedError('"retry" not implemented by '
|
raise NotImplementedError('"retry" not implemented by '
|
||||||
'this transport driver')
|
'this transport driver')
|
||||||
|
|
||||||
request = marshal_request(message, ctxt, envelope)
|
request = marshal_request(message, ctxt, envelope)
|
||||||
expire = 0
|
expire = 0
|
||||||
if timeout:
|
if timeout:
|
||||||
@ -246,14 +245,16 @@ class ProtonDriver(base.BaseDriver):
|
|||||||
self._ctrl.add_task(task)
|
self._ctrl.add_task(task)
|
||||||
# wait for the eventloop to process the command. If the command is
|
# wait for the eventloop to process the command. If the command is
|
||||||
# an RPC call retrieve the reply message
|
# an RPC call retrieve the reply message
|
||||||
reply = task.wait(timeout)
|
|
||||||
if reply:
|
if wait_for_reply:
|
||||||
# TODO(kgiusti) how to handle failure to un-marshal? Must log, and
|
reply = task.wait(timeout)
|
||||||
# determine best way to communicate this failure back up to the
|
if reply:
|
||||||
# caller
|
# TODO(kgiusti) how to handle failure to un-marshal?
|
||||||
reply = unmarshal_response(reply, self._allowed_remote_exmods)
|
# Must log, and determine best way to communicate this failure
|
||||||
LOG.debug("Send to %s returning", target)
|
# back up to the caller
|
||||||
return reply
|
reply = unmarshal_response(reply, self._allowed_remote_exmods)
|
||||||
|
LOG.debug("Send to %s returning", target)
|
||||||
|
return reply
|
||||||
|
|
||||||
@_ensure_connect_called
|
@_ensure_connect_called
|
||||||
def send_notification(self, target, ctxt, message, version,
|
def send_notification(self, target, ctxt, message, version,
|
||||||
|
@ -295,7 +295,6 @@ class TestAmqpNotification(_AmqpBrokerTestCase):
|
|||||||
'topic-2.debug']
|
'topic-2.debug']
|
||||||
|
|
||||||
excepted_targets = []
|
excepted_targets = []
|
||||||
exception_count = 0
|
|
||||||
for version in (1.0, 2.0):
|
for version in (1.0, 2.0):
|
||||||
for t in targets:
|
for t in targets:
|
||||||
try:
|
try:
|
||||||
@ -303,7 +302,6 @@ class TestAmqpNotification(_AmqpBrokerTestCase):
|
|||||||
"context", {'target': t},
|
"context", {'target': t},
|
||||||
version)
|
version)
|
||||||
except oslo_messaging.MessageDeliveryFailure:
|
except oslo_messaging.MessageDeliveryFailure:
|
||||||
exception_count += 1
|
|
||||||
excepted_targets.append(t)
|
excepted_targets.append(t)
|
||||||
|
|
||||||
listener.join(timeout=30)
|
listener.join(timeout=30)
|
||||||
@ -314,9 +312,8 @@ class TestAmqpNotification(_AmqpBrokerTestCase):
|
|||||||
self.assertEqual(topics.count('topic-1.error'), 2)
|
self.assertEqual(topics.count('topic-1.error'), 2)
|
||||||
self.assertEqual(topics.count('topic-2.debug'), 2)
|
self.assertEqual(topics.count('topic-2.debug'), 2)
|
||||||
self.assertEqual(self._broker.dropped_count, 4)
|
self.assertEqual(self._broker.dropped_count, 4)
|
||||||
self.assertEqual(exception_count, 4)
|
self.assertEqual(excepted_targets.count('topic-1.bad'), 0)
|
||||||
self.assertEqual(excepted_targets.count('topic-1.bad'), 2)
|
self.assertEqual(excepted_targets.count('bad-topic.debug'), 0)
|
||||||
self.assertEqual(excepted_targets.count('bad-topic.debug'), 2)
|
|
||||||
driver.cleanup()
|
driver.cleanup()
|
||||||
|
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user