Simulate remote link close on remote session close
This commit is contained in:
parent
9d1f78eca1
commit
8415834f35
python
@ -23,8 +23,6 @@ __all__ = [
|
||||
import logging
|
||||
import time
|
||||
|
||||
from link import ReceiverLink
|
||||
from link import SenderLink
|
||||
from link import _SessionProxy
|
||||
|
||||
import proton
|
||||
@ -309,7 +307,7 @@ class Connection(object):
|
||||
pn_link.name)
|
||||
pn_link = pn_link.next(self._REMOTE_REQ)
|
||||
|
||||
# TODO(kgiusti) won't scale - use new Enging event API
|
||||
# TODO(kgiusti) won't scale - use new Engine event API
|
||||
pn_link = self._pn_connection.link_head(self._ACTIVE)
|
||||
while pn_link:
|
||||
self._add_work(pn_link.context)
|
||||
@ -330,8 +328,10 @@ class Connection(object):
|
||||
pn_session = self._pn_connection.session_head(self._REMOTE_CLOSE)
|
||||
while pn_session:
|
||||
LOG.debug("Session closed remotely")
|
||||
pn_session.close()
|
||||
pn_session = pn_session.next(self._REMOTE_CLOSE)
|
||||
next_session = pn_session.next(self._REMOTE_CLOSE)
|
||||
session = pn_session.context
|
||||
session.remote_closed()
|
||||
pn_session = next_session
|
||||
|
||||
if self._pn_connection.state == self._REMOTE_CLOSE:
|
||||
LOG.debug("Connection remotely closed")
|
||||
|
@ -129,10 +129,13 @@ class _Link(object):
|
||||
def destroy(self):
|
||||
LOG.debug("link destroyed %s", str(self._pn_link))
|
||||
self._user_context = None
|
||||
self._connection = None
|
||||
if self._pn_link:
|
||||
session = self._pn_link.session.context
|
||||
self._pn_link.context = None
|
||||
self._pn_link.free()
|
||||
self._pn_link = None
|
||||
session.link_destroyed(self) # destroy session _after_ link
|
||||
|
||||
## Link State Machine
|
||||
|
||||
@ -228,9 +231,11 @@ class _Link(object):
|
||||
fsm = _Link._STATE_MAP[self._state]
|
||||
entry = fsm.get(_Link._REMOTE_CLOSED)
|
||||
if entry:
|
||||
self._state = entry[1]
|
||||
if entry[0]:
|
||||
entry[0](self)
|
||||
LOG.debug("Old State: %d Event %d New State: %d",
|
||||
self._state, _Link._REMOTE_CLOSED, entry[0])
|
||||
self._state = entry[0]
|
||||
if entry[1]:
|
||||
entry[1](self)
|
||||
|
||||
_Link._STATE_MAP = [ # {event: (next-state, action), ...}
|
||||
# _STATE_UNINIT:
|
||||
@ -594,6 +599,19 @@ class _SessionProxy(object):
|
||||
self._links.add(rl)
|
||||
return rl
|
||||
|
||||
@property
|
||||
def link_count(self):
|
||||
return len(self._links)
|
||||
def link_destroyed(self, link):
|
||||
"""Link has been destroyed"""
|
||||
self._links.discard(link)
|
||||
if not self._links:
|
||||
# no more links
|
||||
LOG.debug("destroying unneeded session")
|
||||
self._pn_session.close()
|
||||
self._pn_session.free()
|
||||
self._pn_session = None
|
||||
self._connection = None
|
||||
|
||||
def remote_closed(self):
|
||||
"""Peer has closed its end of the session."""
|
||||
links = self._links.copy() # may modify _links
|
||||
for link in links:
|
||||
link._session_closed()
|
||||
|
Loading…
x
Reference in New Issue
Block a user