rework close/destroy interfaces
This commit is contained in:
parent
81c3b3a75c
commit
8972c797b3
@ -112,6 +112,13 @@ class MyConnection(fusion.ConnectionEventHandler):
|
||||
|
||||
self.connection.process(time.time())
|
||||
|
||||
def close(self, error=None):
|
||||
self.connection.close(error)
|
||||
|
||||
@property
|
||||
def closed(self):
|
||||
return self.connection.closed
|
||||
|
||||
def destroy(self, error=None):
|
||||
self.connection.user_context = None
|
||||
self.connection.destroy()
|
||||
@ -170,6 +177,10 @@ class MyCaller(fusion.SenderEventHandler,
|
||||
def done(self):
|
||||
return self._send_completed and self._response_received
|
||||
|
||||
def close(self):
|
||||
self._sender.close(None)
|
||||
self._receiver.close(None)
|
||||
|
||||
def destroy(self):
|
||||
if self._sender:
|
||||
self._sender.destroy()
|
||||
@ -300,8 +311,14 @@ def main(argv=None):
|
||||
|
||||
print "DONE"
|
||||
|
||||
my_caller.close()
|
||||
my_connection.close()
|
||||
while not my_connection.closed:
|
||||
my_connection.process()
|
||||
|
||||
print "CLOSED"
|
||||
my_caller.destroy()
|
||||
my_connection.process()
|
||||
my_connection.destroy()
|
||||
|
||||
return 0
|
||||
|
||||
|
@ -319,10 +319,6 @@ Associate an arbitrary user object with this Connection.
|
||||
self._pn_transport.close_head()
|
||||
self._write_done = True
|
||||
|
||||
@property
|
||||
def closed(self):
|
||||
return self._write_done and self._read_done
|
||||
|
||||
def create_sender(self, source_address, target_address=None,
|
||||
eventHandler=None, name=None, properties={}):
|
||||
"""Factory for Sender links"""
|
||||
@ -403,18 +399,29 @@ Associate an arbitrary user object with this Connection.
|
||||
# @todo support reason for close
|
||||
pn_link.close()
|
||||
|
||||
def destroy(self, error=None):
|
||||
def close(self, error=None):
|
||||
"""
|
||||
"""
|
||||
for l in self._sender_links.itervalues():
|
||||
l.destroy(error)
|
||||
self._sender_links = {}
|
||||
l.close(error)
|
||||
for l in self._receiver_links.itervalues():
|
||||
l.destroy(error)
|
||||
self._receiver_links = {}
|
||||
l.close(error)
|
||||
self._pn_session.close()
|
||||
self._pn_session = None
|
||||
self._pn_connection.close()
|
||||
|
||||
@property
|
||||
def closed(self):
|
||||
#return self._write_done and self._read_done
|
||||
state = self._pn_connection.state
|
||||
return state == (proton.Endpoint.LOCAL_CLOSED
|
||||
| proton.Endpoint.REMOTE_CLOSED)
|
||||
|
||||
def destroy(self):
|
||||
"""
|
||||
"""
|
||||
self._pending_links.clear()
|
||||
self._sender_links.clear()
|
||||
self._receiver_links.clear()
|
||||
self._pn_connection = None
|
||||
self._pn_transport = None
|
||||
self._user_context = None
|
||||
|
@ -91,7 +91,16 @@ Associate an arbitrary application object with this link.
|
||||
else:
|
||||
return self._pn_link.remote_target.address
|
||||
|
||||
def _destroy(self):
|
||||
def close(self, error=None):
|
||||
self._pn_link.close()
|
||||
|
||||
@property
|
||||
def closed(self):
|
||||
state = self._pn_link.state
|
||||
return state == (proton.Endpoint.LOCAL_CLOSED
|
||||
| proton.Endpoint.REMOTE_CLOSED)
|
||||
|
||||
def destroy(self):
|
||||
self._user_context = None
|
||||
self._pn_link.context = None
|
||||
self._pn_link = None
|
||||
@ -156,8 +165,7 @@ class SenderLink(_Link):
|
||||
def credit(self):
|
||||
return self._pn_link.credit()
|
||||
|
||||
def destroy(self, error=None):
|
||||
self._pn_link.close()
|
||||
def close(self, error=None):
|
||||
while self._pending_sends:
|
||||
i = self._pending_sends.popleft()
|
||||
cb = i[1]
|
||||
@ -168,10 +176,13 @@ class SenderLink(_Link):
|
||||
cb = i[1]
|
||||
handle = i[2]
|
||||
cb(self, handle, self.ABORTED, error)
|
||||
self._pending_acks = {}
|
||||
self._pending_acks.clear()
|
||||
super(SenderLink, self).close()
|
||||
|
||||
def destroy(self):
|
||||
self._connection._remove_sender(self._name)
|
||||
self._connection = None
|
||||
super(SenderLink, self)._destroy()
|
||||
super(SenderLink, self).destroy()
|
||||
|
||||
def _delivery_updated(self, delivery):
|
||||
# A delivery has changed state.
|
||||
@ -267,10 +278,9 @@ class ReceiverLink(_Link):
|
||||
self._settle_delivery(handle, proton.Delivery.MODIFIED)
|
||||
|
||||
def destroy(self, error=None):
|
||||
self._pn_link.close()
|
||||
self._connection._remove_receiver(self._name)
|
||||
self._connection = None
|
||||
super(ReceiverLink, self)._destroy()
|
||||
super(ReceiverLink, self).destroy()
|
||||
|
||||
def _delivery_updated(self, delivery):
|
||||
# a receive delivery changed state
|
||||
|
Loading…
x
Reference in New Issue
Block a user