Fix credit stall issue by invoking the credit_granted callback
whenever credit increases rather than only after credit has been exhausted.
This commit is contained in:
parent
dd77107d5e
commit
f922b7842d
@ -521,13 +521,12 @@ class SenderLink(_Link):
|
||||
pn_delivery = self._pn_link.current
|
||||
|
||||
# Alert if credit has become available
|
||||
new_credit = self._pn_link.credit
|
||||
if self._handler and not self._rejected:
|
||||
if self._last_credit <= 0 and new_credit > 0:
|
||||
if 0 < self._pn_link.credit > self._last_credit:
|
||||
LOG.debug("Credit is available, link=%s", self.name)
|
||||
with self._callback_lock:
|
||||
self._handler.credit_granted(self)
|
||||
self._last_credit = new_credit
|
||||
self._last_credit = self._pn_link.credit
|
||||
|
||||
def _write_msg(self, pn_delivery, send_req):
|
||||
# given a writable delivery, send a message
|
||||
|
@ -148,7 +148,7 @@ class PerfReceiver(pyngus.ReceiverEventHandler):
|
||||
self.perf_conn.latency += latency
|
||||
self.perf_conn.latency_min = min(latency, self.perf_conn.latency_min)
|
||||
self.perf_conn.latency_max = max(latency, self.perf_conn.latency_max)
|
||||
if self.link.capacity < self.credit_low and \
|
||||
if self.link.capacity <= self.credit_low and \
|
||||
self.received < self.msg_count:
|
||||
self.link.add_capacity(self.credit_window - self.link.capacity)
|
||||
elif self.received == self.msg_count:
|
||||
|
@ -254,8 +254,8 @@ class APITest(common.Test):
|
||||
self.process_connections()
|
||||
assert receiver.capacity == 4
|
||||
assert sender.credit == 4
|
||||
# callback only occurs when credit is no longer zero:
|
||||
assert sl_handler.credit_granted_ct == 1
|
||||
# callback occurs when credit is increased:
|
||||
assert sl_handler.credit_granted_ct == 2
|
||||
assert sender.pending == 0
|
||||
msg = Message()
|
||||
msg.body = "Hi"
|
||||
@ -279,26 +279,27 @@ class APITest(common.Test):
|
||||
sender.send(msg)
|
||||
self.process_connections()
|
||||
assert sender.pending == 2
|
||||
assert sl_handler.credit_granted_ct == 1
|
||||
assert sl_handler.credit_granted_ct == 2
|
||||
receiver.add_capacity(1)
|
||||
self.process_connections()
|
||||
assert receiver.capacity == 0
|
||||
assert rl_handler.message_received_ct == 5
|
||||
assert sender.credit == 0
|
||||
assert sender.pending == 1
|
||||
assert sl_handler.credit_granted_ct == 1
|
||||
assert sl_handler.credit_granted_ct == 2
|
||||
|
||||
receiver.add_capacity(1)
|
||||
self.process_connections()
|
||||
assert sender.credit == 0
|
||||
assert sender.pending == 0
|
||||
assert sl_handler.credit_granted_ct == 1
|
||||
# credit callback not called since pending ate it
|
||||
assert sl_handler.credit_granted_ct == 2
|
||||
|
||||
# verify new credit becomes available:
|
||||
receiver.add_capacity(1)
|
||||
self.process_connections()
|
||||
assert sender.credit == 1
|
||||
assert sl_handler.credit_granted_ct == 2
|
||||
assert sl_handler.credit_granted_ct == 3
|
||||
|
||||
def test_send_presettled(self):
|
||||
sender, receiver = self._setup_sender_sync()
|
||||
|
Loading…
x
Reference in New Issue
Block a user