From eb3013f51215d470bd4c8423665bcb37b629707c Mon Sep 17 00:00:00 2001 From: Kenneth Giusti <kgiusti@redhat.com> Date: Thu, 9 Jan 2014 15:55:34 -0500 Subject: [PATCH] Various updates and fixes: Separate endpoint initialization from opening. Allow remote to name the link. Fix examples to index using container+link name. Clean up connection processing state. --- examples/python/rpc-client.py | 166 ++++++++++++---- examples/python/rpc-server.py | 153 ++++++++++---- python/connection.py | 361 +++++++++++++++++++--------------- python/link.py | 34 ++-- python/sockets.py | 8 +- 5 files changed, 472 insertions(+), 250 deletions(-) diff --git a/examples/python/rpc-client.py b/examples/python/rpc-client.py index a2b6c22..7af0718 100644 --- a/examples/python/rpc-client.py +++ b/examples/python/rpc-client.py @@ -37,16 +37,34 @@ to a server, and waits for a response. The method call is a map of the form: class MyConnection(fusion.ConnectionEventHandler): - def __init__(self, name, socket, container, properties): + def __init__(self, name, container, properties): self.name = name - self.socket = socket - self.connection = container.create_connection(name, self, - properties) - self.connection.user_context = self + self.container = container + self.properties = properties + self.socket = None + self.caller = None + self.connection = None + def reset(self): + if self.caller: + self.caller.reset() + if self.connection: + self.connection.user_context = None + self.connection.destroy() + self.connection = None + if self.socket: + self.socket.close() + self.socket = None + + def connect(self, socket): + self.reset() + self.socket = socket + self.connection = self.container.create_connection(self.name, + self, + self.properties) + self.connection.user_context = self self.connection.sasl.mechanisms("ANONYMOUS") self.connection.sasl.client() - self.connection.open() def process(self): @@ -84,11 +102,20 @@ class MyConnection(fusion.ConnectionEventHandler): return self.connection.closed def destroy(self, error=None): - self.connection.user_context = None - self.connection.destroy() - self.connection = None - self.socket.close() - self.socket = None + self.reset() + self.caller.destroy() + self.caller = None + self.container = None + + def create_caller(self, method_map, source_addr, target_addr, + receiver_properties, sender_properties): + """ Caller factory + """ + if self.caller: + self.caller.destroy() + self.caller = MyCaller(method_map, self, source_addr, target_addr, + receiver_properties, sender_properties) + return self.caller # Connection callbacks: @@ -96,7 +123,10 @@ class MyConnection(fusion.ConnectionEventHandler): """connection handshake completed""" LOG.debug("Connection active callback") - def connection_closed(self, connection, reason): + def connection_remote_closed(self, connection, reason): + LOG.debug("Connection remote closed callback") + + def connection_closed(self, connection): LOG.debug("Connection closed callback") def sender_requested(self, connection, link_handle, @@ -123,20 +153,48 @@ class MyCaller(fusion.SenderEventHandler, def __init__(self, method_map, my_connection, my_source, my_target, receiver_properties, sender_properties): - conn = my_connection.connection - self._sender = conn.create_sender(my_source, target_address=None, - eventHandler=self, name=my_source, - properties=sender_properties) - self._receiver = conn.create_receiver(my_target, source_address=None, - eventHandler=self, name=my_target, - properties=receiver_properties) + #self._name = uuid.uuid4().hex + self._my_connection = my_connection + self._source_addr = my_source + self._target_addr = my_target + self._receiver_properties = receiver_properties + self._sender_properties = sender_properties self._method = method_map + self._sender = None + self._receiver = None + self.reset() + + def reset(self): + LOG.debug("Resetting my-caller") + # @todo: for now, use new name as engine isn't cleaning up link state properly... + self._name = uuid.uuid4().hex self._reply_to = None self._to = None - self._send_completed = False self._response_received = False + if self._sender: + self._sender.destroy() + self._sender = None + + if self._receiver: + self._receiver.destroy() + self._receiver = None + + def connect(self): + self.reset() + LOG.debug("Connecting my-caller") + conn = self._my_connection.connection + self._sender = conn.create_sender(self._source_addr, target_address=None, + eventHandler=self, + #name=self._source_addr, + name=self._name, + properties=self._sender_properties) + self._receiver = conn.create_receiver(self._target_addr, source_address=None, + eventHandler=self, + #name=self._target_addr, + name=self._name, + properties=self._receiver_properties) self._sender.open() self._receiver.add_capacity(1) self._receiver.open() @@ -145,14 +203,17 @@ class MyCaller(fusion.SenderEventHandler, return self._send_completed and self._response_received def close(self): + LOG.debug("Closing my-caller") self._sender.close(None) self._receiver.close(None) + def closed(self): + return self._sender.closed and self._receiver.closed + def destroy(self): - if self._sender: - self._sender.destroy() - if self._receiver: - self._receiver.destroy() + LOG.debug("Destroying my-caller") + self.reset() + self._my_connection = None def _send_request(self): """Send a message containing the RPC method call @@ -179,7 +240,12 @@ class MyCaller(fusion.SenderEventHandler, if self._reply_to: self._send_request() - def sender_closed(self, sender_link, error): + def sender_remote_closed(self, sender_link, error): + LOG.debug("Sender remote closed callback") + assert sender_link is self._sender + self._sender.close() + + def sender_closed(self, sender_link): LOG.debug("Sender closed callback") assert sender_link is self._sender self._send_completed = True @@ -200,7 +266,12 @@ class MyCaller(fusion.SenderEventHandler, if self._to: self._send_request() - def receiver_closed(self, receiver_link, error): + def receiver_remote_closed(self, receiver_link, error): + LOG.debug("receiver remote closed callback") + assert receiver_link is self._receiver + self._receiver.close() + + def receiver_closed(self, receiver_link): LOG.debug("Receiver closed callback") assert receiver_link is self._receiver self._response_received = True @@ -222,6 +293,9 @@ def main(argv=None): help="The address of the server [amqp://0.0.0.0:5672]") parser.add_option("-t", "--timeout", dest="timeout", type="int", help="timeout used when waiting for reply, in seconds") + parser.add_option("--repeat", dest="repeat", type="int", + default=1, + help="Repeat the RPC call REPEAT times (0 == forever)") parser.add_option("--trace", dest="trace", action="store_true", help="enable protocol tracing") parser.add_option("--debug", dest="debug", action="store_true", @@ -263,26 +337,42 @@ def main(argv=None): conn_properties = {} if opts.trace: conn_properties["trace"] = True - my_connection = MyConnection( "to-server", my_socket, - container, conn_properties) + + my_connection = MyConnection( "to-server", container, conn_properties) + # Create the RPC caller method = {'method': method_info[0], 'args': dict([(method_info[i], method_info[i+1]) for i in range(1, len(method_info), 2)])} - my_caller = MyCaller( method, - my_connection, - "my-source-address", - "my-target-address", - receiver_properties={"capacity": 1}, - sender_properties={}) + my_caller = my_connection.create_caller( method, + "my-source-address", + "my-target-address", + receiver_properties={}, + sender_properties={}) + my_connection.connect(my_socket) - while not my_caller.done(): - my_connection.process() + repeat = 0 + while opts.repeat == 0 or repeat < opts.repeat: - LOG.debug("RPC completed, closing connections") + LOG.debug("Requesting RPC...") - my_caller.close() + my_caller.connect() + while not my_caller.done(): + my_connection.process() + + LOG.debug("RPC completed! Closing caller...") + + my_caller.close() + + while not my_caller.closed(): + my_connection.process() + + LOG.debug("Caller closed cleanly!") + + repeat += 1 + + print("Closing connections") my_connection.close() while not my_connection.closed: my_connection.process() diff --git a/examples/python/rpc-server.py b/examples/python/rpc-server.py index 478ac37..4188bde 100644 --- a/examples/python/rpc-server.py +++ b/examples/python/rpc-server.py @@ -38,9 +38,16 @@ map sent in the request. """ -# Maps of outgoing and incoming links -sender_links = {} # indexed by Source address -receiver_links = {} # indexed by Target address +# Maps of outgoing and incoming links. These are indexed by +# (remote-container-name, link-name) +sender_links = {} +receiver_links = {} + +# Map reply-to address to the proper sending link (indexed by address) +reply_senders = {} + +# database of all active SocketConnections +socket_connections = {} # indexed by name class SocketConnection(fusion.ConnectionEventHandler): @@ -55,6 +62,16 @@ class SocketConnection(fusion.ConnectionEventHandler): self.connection.sasl.mechanisms("ANONYMOUS") self.connection.sasl.server() self.connection.open() + self.done = False + + def destroy(self): + self.done = True + if self.connection: + self.connection.destroy() + self.connection = None + if self.socket: + self.socket.close() + self.socket = None def fileno(self): """Allows use of a SocketConnection in a select() call. @@ -80,43 +97,58 @@ class SocketConnection(fusion.ConnectionEventHandler): def connection_active(self, connection): LOG.debug("Connection active callback") - def connection_closed(self, connection, reason): - LOG.debug("Connection closed callback") + def connection_remote_closed(self, connection, reason): + LOG.debug("Connection remote closed callback") + assert self.connection is connection + self.connection.close() + + def connection_closed(self, connection): + LOG.debug("connection closed.") + # main loop will destroy + self.done = True def sender_requested(self, connection, link_handle, - requested_source, properties): + name, requested_source, properties): LOG.debug("sender requested callback") global sender_links + global reply_senders + + # reject if name conflict + remote_container = connection.remote_container + ident = (remote_container, name) + if ident in sender_links: + connection.reject_sender(link_handle, "link name in use") + return - name = uuid.uuid4().hex # allow for requested_source address if it doesn't conflict with an - # existing address - if not requested_source or requested_source in sender_links: - requested_source = "/%s/%s" % (connection.container.name, - name) - assert requested_source not in sender_links + # existing address, otherwise override + if not requested_source or requested_source in reply_senders: + requested_source = uuid.uuid4().hex + assert requested_source not in reply_senders - sender = MySenderLink(connection, link_handle, requested_source, - name, {}) - sender_links[requested_source] = sender + sender = MySenderLink(ident, connection, link_handle, requested_source) + sender_links[ident] = sender + reply_senders[requested_source] = sender print("New Sender link created, source=%s" % requested_source) def receiver_requested(self, connection, link_handle, - requested_target, properties): + name, requested_target, properties): LOG.debug("receiver requested callback") - # allow for requested_source address if it doesn't conflict with an - # existing address global receiver_links - name = uuid.uuid4().hex - if not requested_target or requested_target in receiver_links: - requested_target = "/%s/%s" % (connection.container.name, - name) - assert requested_target not in receiver_links + # reject if name conflict + remote_container = connection.remote_container + ident = (remote_container, name) + if ident in receiver_links: + connection.reject_sender(link_handle, "link name in use") + return - receiver = MyReceiverLink(connection, link_handle, - requested_target, name) - receiver_links[requested_target] = receiver + # I don't use the target address, but supply one if necessary + if not requested_target: + requested_target = uuid.uuid4().hex + + receiver = MyReceiverLink(ident, connection, link_handle, requested_target) + receiver_links[ident] = receiver print("New Receiver link created, target=%s" % requested_target) # SASL callbacks: @@ -132,13 +164,14 @@ class SocketConnection(fusion.ConnectionEventHandler): class MySenderLink(fusion.SenderEventHandler): """ """ - def __init__(self, connection, link_handle, source_address, - name, properties): + def __init__(self, ident, connection, link_handle, + source_address, properties={}): + self._ident = ident + self._source_address = source_address self.sender_link = connection.accept_sender(link_handle, source_address, self, - name, properties) self.sender_link.open() @@ -147,8 +180,21 @@ class MySenderLink(fusion.SenderEventHandler): def sender_active(self, sender_link): LOG.debug("sender active callback") - def sender_closed(self, sender_link, error): + def sender_remote_closed(self, sender_link, error): + LOG.debug("sender remote closed callback") + self.sender_link.close() + + def sender_closed(self, sender_link): LOG.debug("sender closed callback") + global sender_links + global reply_senders + + if self._ident in sender_links: + del sender_links[self._ident] + if self._source_address in reply_senders: + del reply_senders[self._source_address] + self.sender_link.destroy() + self.sender_link = None # 'message sent' callback: @@ -159,12 +205,13 @@ class MySenderLink(fusion.SenderEventHandler): class MyReceiverLink(fusion.ReceiverEventHandler): """ """ - def __init__(self, connection, link_handle, target_address, - name, properties={}): + def __init__(self, ident, connection, link_handle, target_address, + properties={}): + self._ident = ident + self._target_address = target_address self._link = connection.accept_receiver(link_handle, target_address, self, - name, properties) self._link.open() @@ -173,22 +220,32 @@ class MyReceiverLink(fusion.ReceiverEventHandler): LOG.debug("receiver active callback") self._link.add_capacity(5) - def receiver_closed(self, receiver_link, error): + def receiver_remote_closed(self, receiver_link, error): + LOG.debug("receiver remote closed callback") + self._link.close() + + def receiver_closed(self, receiver_link): LOG.debug("receiver closed callback") + global receiver_links + + if self._ident in receiver_links: + del receiver_links[self._ident] + self._link.destroy() + self._link = None def message_received(self, receiver_link, message, handle): LOG.debug("message received callback") - global sender_links + global reply_senders # extract to reply-to, correlation id reply_to = message.reply_to - if not reply_to or reply_to not in sender_links: + if not reply_to or reply_to not in reply_senders: LOG.error("sender for reply-to not found, reply-to=%s", str(reply_to)) self._link.message_rejected(handle, "Bad reply-to address") else: - my_sender = sender_links[reply_to] + my_sender = reply_senders[reply_to] correlation_id = message.correlation_id method_map = message.body if (not isinstance(method_map, dict) or @@ -260,7 +317,7 @@ def main(argv=None): # create an AMQP container that will 'provide' the RPC service # container = fusion.Container("example RPC service") - socket_connections = {} # indexed by name (uuid) + global socket_connections while True: @@ -292,6 +349,7 @@ def main(argv=None): readable,writable,ignore = select.select(readfd,writefd,[],timeout) LOG.debug("select() returned") + worked = [] for r in readable: if r is my_socket: # new inbound connection request @@ -305,19 +363,38 @@ def main(argv=None): client_socket, container, conn_properties) + LOG.debug("new connection created name=%s" % name) + else: assert isinstance(r, SocketConnection) rc = r.process_input() + worked.append(r) for t in timers: now = time.time() if t.next_tick > now: break t.process(now) + sc = t.user_context + assert isinstance(sc, SocketConnection) + worked.append(sc) for w in writable: assert isinstance(w, SocketConnection) rc = w.send_output() + worked.append(w) + + # nuke any completed connections: + closed = False + while worked: + sc = worked.pop() + if sc.done: + if sc.name in socket_connections: + del socket_connections[sc.name] + sc.destroy() + closed = True + if closed: + LOG.debug("%d active connections present" % len(socket_connections)) return 0 diff --git a/python/connection.py b/python/connection.py index d92f050..d689e71 100644 --- a/python/connection.py +++ b/python/connection.py @@ -35,17 +35,22 @@ class ConnectionEventHandler(object): """connection handshake completed""" LOG.debug("connection_active (ignored)") - def connection_closed(self, connection, reason): + def connection_remote_closed(self, connection, error=None): + LOG.debug("connection_remote_closed (ignored)") + + def connection_closed(self, connection): LOG.debug("connection_closed (ignored)") def sender_requested(self, connection, link_handle, - requested_source, properties={}): + name, requested_source, + properties={}): # call accept_sender to accept new link, # reject_sender to reject it. LOG.debug("sender_requested (ignored)") def receiver_requested(self, connection, link_handle, - requested_target, properties={}): + name, requested_target, + properties={}): # call accept_sender to accept new link, # reject_sender to reject it. LOG.debug("receiver_requested (ignored)") @@ -81,10 +86,10 @@ class Connection(object): if properties.get("trace"): self._pn_transport.trace(proton.Transport.TRACE_FRM) - self._sender_links = {} - self._receiver_links = {} - self._pending_links = {} - self._pending_link_id = 0 + # indexed by link-name + self._sender_links = {} # SenderLink or pn_link if pending + self._receiver_links = {} # ReceiverLink or pn_link if pending + self._read_done = False self._write_done = False self._next_tick = 0 @@ -101,18 +106,25 @@ class Connection(object): @property # @todo - hopefully remove - def transport(self): + def pn_transport(self): return self._pn_transport @property # @todo - hopefully remove - def connection(self): + def pn_connection(self): return self._pn_connection @property def name(self): return self._name + @property + def remote_container(self): + """Return the name of the remote container. Should be present once the + connection is active. + """ + return self._pn_connection.remote_container + @property # @todo - think about server side use of sasl! def sasl(self): @@ -131,9 +143,73 @@ class Connection(object): Associate an arbitrary user object with this Connection. """) - _NEED_INIT = proton.Endpoint.LOCAL_UNINIT - _NEED_CLOSE = (proton.Endpoint.LOCAL_ACTIVE|proton.Endpoint.REMOTE_CLOSED) + def open(self): + """ + """ + self._pn_connection.open() + self._pn_session = self._pn_connection.session() + self._pn_session.open() + + def close(self, error=None): + """ + """ + for l in self._sender_links.itervalues(): + l.close(error) + for l in self._receiver_links.itervalues(): + l.close(error) + self._pn_session.close() + self._pn_connection.close() + + @property + def closed(self): + #return self._write_done and self._read_done + state = self._pn_connection.state + return state == (proton.Endpoint.LOCAL_CLOSED + | proton.Endpoint.REMOTE_CLOSED) + + def destroy(self): + """ + """ + self._sender_links.clear() + self._receiver_links.clear() + self._container._remove_connection(self._name) + self._container = None + self._pn_connection = None + self._pn_transport = None + self._user_context = None + + def _link_requested(self, pn_link): + if pn_link.is_sender and pn_link.name not in self._sender_links: + LOG.debug("Remotely initiated Sender needs init") + self._sender_links[pn_link.name] = pn_link + pn_link.context = None # @todo: update proton.py + req_source = "" + if pn_link.remote_source.dynamic: + req_source = None + elif pn_link.remote_source.address: + req_source = pn_link.remote_source.address + self._handler.sender_requested(self, pn_link.name, + pn_link.name, req_source, + {"target-address": + pn_link.remote_target.address}) + elif pn_link.is_receiver and pn_link.name not in self._receiver_links: + LOG.debug("Remotely initiated Receiver needs init") + self._receiver_links[pn_link.name] = pn_link + pn_link.context = None # @todo: update proton.py + req_target = "" + if pn_link.remote_target.dynamic: + req_target = None + elif pn_link.remote_target.address: + req_target = pn_link.remote_target.address + self._handler.receiver_requested(self, pn_link.name, + pn_link.name, req_target, + {"source-address": + pn_link.remote_source.address}) + + _REMOTE_REQ = (proton.Endpoint.LOCAL_UNINIT|proton.Endpoint.REMOTE_ACTIVE) + _REMOTE_CLOSE = (proton.Endpoint.LOCAL_ACTIVE|proton.Endpoint.REMOTE_CLOSED) _ACTIVE = (proton.Endpoint.LOCAL_ACTIVE|proton.Endpoint.REMOTE_ACTIVE) + _CLOSED = (proton.Endpoint.LOCAL_CLOSED|proton.Endpoint.REMOTE_CLOSED) def process(self, now): """ @@ -152,103 +228,112 @@ Associate an arbitrary user object with this Connection. self._handler.sasl_done(self, self._pn_sasl.outcome) self._pn_sasl = None - if self._pn_connection.state & self._NEED_INIT: - assert False, "Connection always opened() on create" + # do endpoint up handling: - if not self._active: - if self._pn_connection.state == self._ACTIVE: + if self._pn_connection.state == self._ACTIVE: + if not self._active: self._active = True self._handler.connection_active(self) - ssn = self._pn_connection.session_head(self._NEED_INIT) - while ssn: + pn_session = self._pn_connection.session_head(proton.Endpoint.LOCAL_UNINIT) + while pn_session: LOG.debug("Opening remotely initiated session") - ssn.open() - ssn = ssn.next(self._NEED_INIT) + pn_session.open() + pn_session = pn_session.next(proton.Endpoint.LOCAL_UNINIT) - link = self._pn_connection.link_head(self._NEED_INIT) - while link: - LOG.debug("Remotely initiated Link needs init") - index = self._pending_link_id - self._pending_link_id += 1 - assert index not in self._pending_links - self._pending_links[index] = link - if link.is_sender: - req_source = "" - if link.remote_source.dynamic: - req_source = None - elif link.remote_source.address: - req_source = link.remote_source.address - self._handler.sender_requested(self, index, req_source, - {"target-address": - link.remote_target.address}) - else: - req_target = "" - if link.remote_target.dynamic: - req_target = None - elif link.remote_target.address: - req_target = link.remote_target.address - self._handler.receiver_requested(self, index, req_target, - {"source-address": - link.remote_source.address}) - link = link.next(self._NEED_INIT) + pn_link = self._pn_connection.link_head(self._REMOTE_REQ) + while pn_link: + next_link = pn_link.next(proton.Endpoint.LOCAL_UNINIT) - # @todo: won't scale - link = self._pn_connection.link_head(self._ACTIVE) - while link: - if link.context and not link.context._active: - if link.is_sender: - sender_link = link.context + if pn_link.state == self._REMOTE_REQ: + self._link_requested(pn_link) + + pn_link = next_link + + # @todo: won't scale? + pn_link = self._pn_connection.link_head(self._ACTIVE) + while pn_link: + next_link = pn_link.next(self._ACTIVE) + + if pn_link.context and not pn_link.context._active: + LOG.debug("Link is up") + pn_link.context._active = True + if pn_link.is_sender: + sender_link = pn_link.context + assert isinstance(sender_link, SenderLink) sender_link._handler.sender_active(sender_link) else: - receiver_link = link.context + receiver_link = pn_link.context + assert isinstance(receiver_link, ReceiverLink) receiver_link._handler.receiver_active(receiver_link) - link.context._active = True - link = link.next(self._ACTIVE) + pn_link = next_link # process the work queue - delivery = self._pn_connection.work_head - while delivery: + pn_delivery = self._pn_connection.work_head + while pn_delivery: LOG.debug("Delivery updated!") - if delivery.link.context: - if delivery.link.is_sender: - sender_link = delivery.link.context - sender_link._delivery_updated(delivery) + next_delivery = pn_delivery.work_next + if pn_delivery.link.context: + if pn_delivery.link.is_sender: + sender_link = pn_delivery.link.context + sender_link._delivery_updated(pn_delivery) else: - receiver_link = delivery.link.context - receiver_link._delivery_updated(delivery) - delivery = delivery.work_next + receiver_link = pn_delivery.link.context + receiver_link._delivery_updated(pn_delivery) + pn_delivery = next_delivery - # close all endpoints closed by remotes + # do endpoint down handling: - link = self._pn_connection.link_head(self._NEED_CLOSE) - while link: + pn_link = self._pn_connection.link_head(self._REMOTE_CLOSE) + while pn_link: LOG.debug("Link closed remotely") - link.close() + next_link = pn_link.next(self._REMOTE_CLOSE) # @todo: error reporting - if link.context: - if link.is_sender: - sender_link = link.context - sender_link._handler.sender_closed(sender_link, None) + if pn_link.context: + if pn_link.is_sender: + sender_link = pn_link.context + sender_link._handler.sender_remote_closed(sender_link, None) else: - receiver_link = link.context - receiver_link._handler.receiver_closed(receiver_link, - None) - link = link.next(self._NEED_CLOSE) + receiver_link = pn_link.context + receiver_link._handler.receiver_remote_closed(receiver_link, + None) + pn_link = next_link - ssn = self._pn_connection.session_head(self._NEED_CLOSE) - while ssn: + pn_link = self._pn_connection.link_head(self._CLOSED) + while pn_link: + next_link = pn_link.next(self._CLOSED) + if pn_link.context and pn_link.context._active: + LOG.debug("Link close completed") + pn_link.context._active = False + if pn_link.is_sender: + sender_link = pn_link.context + sender_link._handler.sender_closed(sender_link) + else: + receiver_link = pn_link.context + receiver_link._handler.receiver_closed(receiver_link) + pn_link = next_link + + pn_session = self._pn_connection.session_head(self._REMOTE_CLOSE) + while pn_session: LOG.debug("Session closed remotely") - ssn.close() - ssn = ssn.next(self._NEED_CLOSE) + pn_session.close() + pn_session = pn_session.next(self._REMOTE_CLOSE) - if self._pn_connection.state == (self._NEED_CLOSE): + if self._pn_connection.state == self._REMOTE_CLOSE: LOG.debug("Connection remotely closed") - # @todo - think about handling this wrt links! - cond = self._pn_connection.remote_condition - self._pn_connection.close() - self._handler.connection_closed(self, cond) + self._handler.connection_remote_closed(self, None) + elif self._pn_connection.state == self._CLOSED: + LOG.debug("Connection close complete") + self._handler.connection_closed(self) + + # DEBUG LINK "LEAK" + # count = 0 + # link = self._pn_connection.link_head(0) + # while link: + # count += 1 + # link = link.next(0) + # print "Link Count %d" % count return self._next_tick @@ -328,7 +413,7 @@ Associate an arbitrary user object with this Connection. pn_link = self._pn_session.sender(ident) if pn_link: - s = SenderLink(self, pn_link, ident, + s = SenderLink(self, pn_link, source_address, target_address, eventHandler, properties) self._sender_links[ident] = s @@ -336,30 +421,29 @@ Associate an arbitrary user object with this Connection. return None def accept_sender(self, link_handle, source_override=None, - event_handler=None, name=None, properties={}): - pn_link = self._pending_links.pop(link_handle) - if not pn_link: + event_handler=None, properties={}): + pn_link = self._sender_links.get(link_handle) + if not pn_link or not isinstance(pn_link, proton.Sender): raise Exception("Invalid link_handle: %s" % link_handle) if pn_link.remote_source.dynamic and not source_override: raise Exception("A source address must be supplied!") source_addr = source_override or pn_link.remote_source.address - name = name or source_addr - if name in self._sender_links: - raise KeyError("Sender %s already exists!" % name) - self._sender_links[name] = SenderLink(self, pn_link, name, - source_addr, - pn_link.remote_target.address, - event_handler, properties) - return self._sender_links[name] + self._sender_links[link_handle] = SenderLink(self, pn_link, + source_addr, + pn_link.remote_target.address, + event_handler, properties) + return self._sender_links[link_handle] def reject_sender(self, link_handle, reason): - pn_link = self._pending_links.pop(link_handle) - if pn_link: - # @todo support reason for close - pn_link.close() + pn_link = self._sender_links.get(link_handle) + if not pn_link or not isinstance(pn_link, proton.Sender): + raise Exception("Invalid link_handle: %s" % link_handle) + del self._sender_links[link_handle] + # @todo support reason for close + pn_link.close() - def create_receiver(self, target_address, source_address, - eventHandler, name=None, properties={}): + def create_receiver(self, target_address, source_address=None, + eventHandler=None, name=None, properties={}): """Factory for Receive links""" ident = name or str(target_address) if ident in self._receiver_links: @@ -367,73 +451,34 @@ Associate an arbitrary user object with this Connection. pn_link = self._pn_session.receiver(ident) if pn_link: - r = ReceiverLink(self, pn_link, ident, target_address, + r = ReceiverLink(self, pn_link, target_address, source_address, eventHandler, properties) - if r: - self._receiver_links[ident] = r - return r + self._receiver_links[ident] = r + return r return None def accept_receiver(self, link_handle, target_override=None, - event_handler=None, name=None, properties={}): - pn_link = self._pending_links.pop(link_handle) - if not pn_link: + event_handler=None, properties={}): + pn_link = self._receiver_links.get(link_handle) + if not pn_link or not isinstance(pn_link, proton.Receiver): raise Exception("Invalid link_handle: %s" % link_handle) if pn_link.remote_target.dynamic and not target_override: raise Exception("A target address must be supplied!") target_addr = target_override or pn_link.remote_target.address - name = name or target_addr - if name in self._receiver_links: - raise KeyError("Receiver %s already exists!" % name) - self._receiver_links[name] = ReceiverLink(self, pn_link, name, - target_addr, - pn_link.remote_source.address, - event_handler, properties) - return self._receiver_links[name] - - pass + self._receiver_links[link_handle] = ReceiverLink(self, pn_link, + target_addr, + pn_link.remote_source.address, + event_handler, properties) + return self._receiver_links[link_handle] def reject_receiver(self, link_handle, reason): - pn_link = self._pending_links.pop(link_handle) - if pn_link: - # @todo support reason for close - pn_link.close() + pn_link = self._receiver_links.get(link_handle) + if not pn_link or not isinstance(pn_link, proton.Receiver): + raise Exception("Invalid link_handle: %s" % link_handle) + del self._receiver_links[link_handle] + # @todo support reason for close + pn_link.close() - def open(self): - """ - """ - self._pn_connection.open() - self._pn_session = self._pn_connection.session() - self._pn_session.open() - - def close(self, error=None): - """ - """ - for l in self._sender_links.itervalues(): - l.close(error) - for l in self._receiver_links.itervalues(): - l.close(error) - self._pn_session.close() - self._pn_connection.close() - - @property - def closed(self): - #return self._write_done and self._read_done - state = self._pn_connection.state - return state == (proton.Endpoint.LOCAL_CLOSED - | proton.Endpoint.REMOTE_CLOSED) - - def destroy(self): - """ - """ - self._pending_links.clear() - self._sender_links.clear() - self._receiver_links.clear() - self._container._remove_connection(self._name) - self._container = None - self._pn_connection = None - self._pn_transport = None - self._user_context = None def _remove_sender(self, name): if name in self._sender_links: diff --git a/python/link.py b/python/link.py index 998677a..3a0225a 100644 --- a/python/link.py +++ b/python/link.py @@ -24,15 +24,17 @@ LOG = logging.getLogger(__name__) class _Link(object): """Generic Link base class""" - def __init__(self, connection, pn_link, name, + def __init__(self, connection, pn_link, target_address, source_address, handler, properties): self._connection = connection - self._name = name + self._name = pn_link.name self._handler = handler self._properties = properties + self._user_context = None + self._active = False + # @todo: raise jira to add 'context' attr to api self._pn_link = pn_link - # @todo: raise jira to add 'context' to api pn_link.context = self if target_address is None: @@ -58,8 +60,10 @@ class _Link(object): else: raise Exception("Unknown distribution mode: %s" % str(desired_mode)) - self._user_context = None - self._active = False + + @property + def name(self): + return self._name def open(self): """ @@ -107,18 +111,21 @@ Associate an arbitrary application object with this link. | proton.Endpoint.REMOTE_CLOSED) def destroy(self): + LOG.debug("link destroyed %s" % str(self._pn_link)) self._user_context = None self._pn_link.context = None self._pn_link = None - class SenderEventHandler(object): """ """ def sender_active(self, sender_link): LOG.debug("sender_active (ignored)") - def sender_closed(self, sender_link, error=None): + def sender_remote_closed(self, sender_link, error=None): + LOG.debug("sender_remote_closed (ignored)") + + def sender_closed(self, sender_link): LOG.debug("sender_closed (ignored)") @@ -134,9 +141,9 @@ class SenderLink(_Link): RELEASED = 3 MODIFIED = 4 - def __init__(self, connection, pn_link, name, source_address, + def __init__(self, connection, pn_link, source_address, target_address, eventHandler, properties): - super(SenderLink, self).__init__(connection, pn_link, name, + super(SenderLink, self).__init__(connection, pn_link, target_address, source_address, eventHandler, properties) self._pending_sends = collections.deque() @@ -242,7 +249,10 @@ class ReceiverEventHandler(object): def receiver_active(self, receiver_link): LOG.debug("receiver_active (ignored)") - def receiver_closed(self, receiver_link, error=None): + def receiver_remote_closed(self, receiver_link, error=None): + LOG.debug("receiver_remote_closed (ignored)") + + def receiver_closed(self, receiver_link): LOG.debug("receiver_closed (ignored)") def message_received(self, receiver_link, message, handle): @@ -250,9 +260,9 @@ class ReceiverEventHandler(object): class ReceiverLink(_Link): - def __init__(self, connection, pn_link, name, target_address, + def __init__(self, connection, pn_link, target_address, source_address, eventHandler, properties): - super(ReceiverLink, self).__init__(connection, pn_link, name, + super(ReceiverLink, self).__init__(connection, pn_link, target_address, source_address, eventHandler, properties) self._next_handle = 0 diff --git a/python/sockets.py b/python/sockets.py index 49131c3..932ba0f 100644 --- a/python/sockets.py +++ b/python/sockets.py @@ -26,7 +26,7 @@ LOG = logging.getLogger(__name__) processing. """ -def read_socket_input(connection, socket): +def read_socket_input(connection, socket_obj): """Read from the network layer and processes all data read. Can support both blocking and non-blocking sockets. Returns the number of input bytes processed, or EOS if input processing @@ -37,7 +37,7 @@ def read_socket_input(connection, socket): return count # 0 or EOS try: - sock_data = socket.recv(count) + sock_data = socket_obj.recv(count) except socket.timeout, e: LOG.debug("Socket timeout exception %s", str(e)) raise # caller must handle @@ -64,7 +64,7 @@ def read_socket_input(connection, socket): connection.close_input() return count -def write_socket_output(connection, socket): +def write_socket_output(connection, socket_obj): """Write data to the network layer. Can support both blocking and non-blocking sockets. """ @@ -74,7 +74,7 @@ def write_socket_output(connection, socket): data = connection.output_data() try: - count = socket.send(data) + count = socket_obj.send(data) except socket.timeout, e: LOG.debug("Socket timeout exception %s", str(e)) raise # caller must handle