From 4691966002a6a90cacf32cbab2197cc5af12ea4f Mon Sep 17 00:00:00 2001 From: Kenneth Giusti Date: Tue, 1 Apr 2014 09:55:55 -0400 Subject: [PATCH] Port to Proton 0.7 RC3. --- examples/python/recv.py | 8 ++ examples/python/rpc-server.py | 10 +- examples/python/server.py | 18 ++- examples/python/utils.py | 3 + python/dingus/connection.py | 198 +++++++++++++------------- python/dingus/endpoint.py | 131 +++++++++++++++++ python/dingus/link.py | 185 ++++++------------------ tests/python/unit_tests/connection.py | 4 + tests/python/unit_tests/link.py | 8 +- 9 files changed, 310 insertions(+), 255 deletions(-) create mode 100644 python/dingus/endpoint.py diff --git a/examples/python/recv.py b/examples/python/recv.py index 8f1571e..c28b1f9 100755 --- a/examples/python/recv.py +++ b/examples/python/recv.py @@ -19,6 +19,7 @@ # """ Minimal message receive example code.""" +import logging import optparse import sys import uuid @@ -28,6 +29,9 @@ from utils import connect_socket from utils import get_host_port from utils import process_connection +LOG = logging.getLogger() +LOG.addHandler(logging.StreamHandler()) + def main(argv=None): @@ -39,6 +43,8 @@ def main(argv=None): parser.add_option("--idle", dest="idle_timeout", type="int", default=0, help="Idle timeout for connection (seconds).") + parser.add_option("--debug", dest="debug", action="store_true", + help="enable debug logging") parser.add_option("--source", dest="source_addr", type="string", help="Address for link source.") parser.add_option("--target", dest="target_addr", type="string", @@ -49,6 +55,8 @@ def main(argv=None): help="Certificate Authority PEM file") opts, extra = parser.parse_args(args=argv) + if opts.debug: + LOG.setLevel(logging.DEBUG) host, port = get_host_port(opts.server) my_socket = connect_socket(host, port) diff --git a/examples/python/rpc-server.py b/examples/python/rpc-server.py index baede93..bbed503 100755 --- a/examples/python/rpc-server.py +++ b/examples/python/rpc-server.py @@ -42,7 +42,7 @@ import uuid # from guppy import hpy # hp = hpy() -from proton import Message +from proton import Message, Condition import dingus LOG = logging.getLogger() @@ -268,7 +268,9 @@ class MyReceiverLink(dingus.ReceiverEventHandler): if not reply_to or reply_to not in reply_senders: LOG.error("sender for reply-to not found, reply-to=%s", str(reply_to)) - self._link.message_rejected(handle, "Bad reply-to address") + info = Condition("not-found", + "Bad reply-to address: %s" % str(reply_to)) + self._link.message_rejected(handle, info) else: my_sender = reply_senders[reply_to] correlation_id = message.correlation_id @@ -276,7 +278,9 @@ class MyReceiverLink(dingus.ReceiverEventHandler): if (not isinstance(method_map, dict) or 'method' not in method_map): LOG.error("no method given, map=%s", str(method_map)) - self._link.message_rejected(handle, "Bad format") + info = Condition("invalid-field", + "no method given, map=%s" % str(method_map)) + self._link.message_rejected(handle, info) else: response = Message() response.address = reply_to diff --git a/examples/python/server.py b/examples/python/server.py index c96992b..a07d435 100755 --- a/examples/python/server.py +++ b/examples/python/server.py @@ -171,8 +171,8 @@ class MySenderLink(dingus.SenderEventHandler): def sender_active(self, sender_link): LOG.debug("Sender: Active") - # TODO(kgiusti) - need credit granted callback: - self.credit_granted(sender_link) + if sender_link.credit > 0: + self.send_message() def sender_remote_closed(self, sender_link, error): LOG.debug("Sender: Remote closed") @@ -186,7 +186,8 @@ class MySenderLink(dingus.SenderEventHandler): def credit_granted(self, sender_link): LOG.debug("Sender: credit granted") # Send a single message: - self.send_message() + if sender_link.credit > 0: + self.send_message() # 'message sent' callback: def __call__(self, sender, handle, status, error=None): @@ -234,6 +235,8 @@ class MyReceiverLink(dingus.ReceiverEventHandler): self.receiver_link.message_accepted(handle) print("Message received on Receiver link %s, message=%s" % (self.receiver_link.name, str(message))) + if receiver_link.capacity < 1: + receiver_link.add_capacity(1) def main(argv=None): @@ -291,7 +294,7 @@ def main(argv=None): [], timeout) LOG.debug("select() returned") - worked = [] + worked = set() for r in readable: if r is my_socket: # new inbound connection request received, @@ -316,8 +319,9 @@ def main(argv=None): LOG.debug("new connection created name=%s", name) else: + assert isinstance(r, SocketConnection) r.process_input() - worked.append(r) + worked.add(r) for t in timers: now = time.time() @@ -326,12 +330,12 @@ def main(argv=None): t.process(now) sc = t.user_context assert isinstance(sc, SocketConnection) - worked.append(sc) + worked.add(sc) for w in writable: assert isinstance(w, SocketConnection) w.send_output() - worked.append(w) + worked.add(w) # nuke any completed connections: closed = False diff --git a/examples/python/utils.py b/examples/python/utils.py index 9288b3c..67ba39d 100644 --- a/examples/python/utils.py +++ b/examples/python/utils.py @@ -75,6 +75,9 @@ def server_socket(host, port, backlog=10): def process_connection(connection, my_socket): """Handle I/O and Timers on a single Connection.""" + if connection.closed: + return False + work = False readfd = [] writefd = [] diff --git a/python/dingus/connection.py b/python/dingus/connection.py index 7115381..68834c7 100644 --- a/python/dingus/connection.py +++ b/python/dingus/connection.py @@ -22,13 +22,12 @@ __all__ = [ import heapq import logging +import proton import time from warnings import warn from dingus.link import _SessionProxy - -import proton - +from dingus.endpoint import Endpoint LOG = logging.getLogger(__name__) @@ -77,7 +76,7 @@ class ConnectionEventHandler(object): LOG.debug("sasl_done (ignored)") -class Connection(object): +class Connection(Endpoint): """A Connection to a peer.""" EOS = -1 # indicates 'I/O stream closed' @@ -130,6 +129,7 @@ class Connection(object): SSL (eg, plain TCP). Used by a server that will accept clients requesting either trusted or untrusted connections. """ + super(Connection, self).__init__() self._name = name self._container = container self._handler = event_handler @@ -138,6 +138,8 @@ class Connection(object): self._pn_connection.container = container.name self._pn_transport = proton.Transport() self._pn_transport.bind(self._pn_connection) + self._pn_collector = proton.Collector() + self._pn_connection.collect(self._pn_collector) if properties: if 'hostname' in properties: @@ -242,35 +244,27 @@ class Connection(object): @property def active(self): """Return True if both ends of the Connection are open.""" - state = self._pn_connection.state - return (state == (proton.Endpoint.LOCAL_ACTIVE - | proton.Endpoint.REMOTE_ACTIVE)) + return self._endpoint_state == self._ACTIVE @property def closed(self): - """Return True if the Connection has closed.""" - state = self._pn_connection.state - # if closed in error, state may not be correct: - return (state == (proton.Endpoint.LOCAL_CLOSED - | proton.Endpoint.REMOTE_CLOSED) - or (self._write_done and self._read_done)) + """Return True if the Connection has finished closing.""" + return (self._write_done and self._read_done) def destroy(self): self._sender_links.clear() self._receiver_links.clear() self._container._remove_connection(self._name) self._container = None + self._pn_collector = None self._pn_connection = None self._pn_transport = None self._user_context = None _REMOTE_REQ = (proton.Endpoint.LOCAL_UNINIT | proton.Endpoint.REMOTE_ACTIVE) - _REMOTE_CLOSE = (proton.Endpoint.LOCAL_ACTIVE - | proton.Endpoint.REMOTE_CLOSED) - _ACTIVE = (proton.Endpoint.LOCAL_ACTIVE | proton.Endpoint.REMOTE_ACTIVE) _CLOSED = (proton.Endpoint.LOCAL_CLOSED | proton.Endpoint.REMOTE_CLOSED) - _LOCAL_UNINIT = proton.Endpoint.LOCAL_UNINIT + _ACTIVE = (proton.Endpoint.LOCAL_ACTIVE | proton.Endpoint.REMOTE_ACTIVE) def process(self, now): """Perform connection state processing.""" @@ -278,7 +272,8 @@ class Connection(object): raise RuntimeError("Connection.process() is not re-entrant!") self._in_process = True try: - + # if the connection has hit an unrecoverable error, + # nag the application until connection is destroyed if self._error: if self._handler: self._handler.connection_failed(self, self._error) @@ -302,41 +297,6 @@ class Connection(object): self._pn_sasl.outcome) self._pn_sasl = None - # do endpoint up handling: - - if self._pn_connection.state == self._ACTIVE: - if not self._active: - self._active = True - if self._handler: - self._handler.connection_active(self) - - pn_session = self._pn_connection.session_head(self._LOCAL_UNINIT) - while pn_session: - LOG.debug("Opening remotely initiated session") - session = _SessionProxy(self, pn_session) - session.open() - pn_session = pn_session.next(self._LOCAL_UNINIT) - - pn_link = self._pn_connection.link_head(self._REMOTE_REQ) - while pn_link: - next_pn_link = pn_link.next(self._REMOTE_REQ) - session = pn_link.session.context - if (pn_link.is_sender and - pn_link.name not in self._sender_links): - LOG.debug("Remotely initiated Sender %s needs init", - pn_link.name) - link = session.request_sender(pn_link) - self._sender_links[pn_link.name] = link - link._process_endpoints() - elif (pn_link.is_receiver and - pn_link.name not in self._receiver_links): - LOG.debug("Remotely initiated Receiver %s needs init", - pn_link.name) - link = session.request_receiver(pn_link) - self._receiver_links[pn_link.name] = link - link._process_endpoints() - pn_link = next_pn_link - # process timer events: timer_deadline = self._expire_timers(now) transport_deadline = self._pn_transport.tick(now) @@ -345,62 +305,69 @@ class Connection(object): else: self._next_deadline = timer_deadline or transport_deadline - # TODO(kgiusti) won't scale - use new Engine event API - pn_link = self._pn_connection.link_head(self._ACTIVE) - while pn_link: - next_pn_link = pn_link.next(self._ACTIVE) - pn_link.context._process_endpoints() - pn_link.context._process_credit() - pn_link = next_pn_link + # process events from proton: + pn_event = self._pn_collector.peek() + while pn_event: + if pn_event.type == proton.Event.CONNECTION_REMOTE_STATE: + self.process_remote_state() - # process the delivery work queue - pn_delivery = self._pn_connection.work_head - while pn_delivery: - next_delivery = pn_delivery.work_next - pn_delivery.link.context._process_delivery(pn_delivery) - pn_delivery = next_delivery + elif pn_event.type == proton.Event.CONNECTION_LOCAL_STATE: + self.process_local_state() - # do endpoint down handling: + elif pn_event.type == proton.Event.SESSION_REMOTE_STATE: + pn_session = pn_event.session + # create a new session if requested by remote: + if (pn_session.state == self._REMOTE_REQ): + LOG.debug("Opening remotely initiated session") + session = _SessionProxy(self, pn_session) + pn_session.context.process_remote_state() - pn_link = self._pn_connection.link_head(self._REMOTE_CLOSE) - while pn_link: - next_pn_link = pn_link.next(self._REMOTE_CLOSE) - pn_link.context._process_endpoints() - pn_link = next_pn_link + elif pn_event.type == proton.Event.SESSION_LOCAL_STATE: + pn_session = pn_event.session + pn_session.context.process_local_state() - pn_link = self._pn_connection.link_head(self._CLOSED) - while pn_link: - next_pn_link = pn_link.next(self._CLOSED) - pn_link.context._process_endpoints() - pn_link = next_pn_link + elif pn_event.type == proton.Event.LINK_REMOTE_STATE: + pn_link = pn_event.link + # create a new link if requested by remote: + if (pn_link.state == self._REMOTE_REQ): + session = pn_link.session.context + if (pn_link.is_sender and + pn_link.name not in self._sender_links): + LOG.debug("Remotely initiated Sender needs init") + link = session.request_sender(pn_link) + self._sender_links[pn_link.name] = link + elif (pn_link.is_receiver and + pn_link.name not in self._receiver_links): + LOG.debug("Remotely initiated Receiver needs init") + link = session.request_receiver(pn_link) + self._receiver_links[pn_link.name] = link + pn_link.context.process_remote_state() - pn_session = self._pn_connection.session_head(self._REMOTE_CLOSE) - while pn_session: - LOG.debug("Session closed remotely") - next_session = pn_session.next(self._REMOTE_CLOSE) - session = pn_session.context - session.remote_closed() - pn_session = next_session + elif pn_event.type == proton.Event.LINK_LOCAL_STATE: + pn_link = pn_event.link + pn_link.context.process_local_state() - if self._pn_connection.state == self._REMOTE_CLOSE: - LOG.debug("Connection remotely closed") - if self._handler: - cond = self._pn_connection.remote_condition - self._handler.connection_remote_closed(self, cond) - elif self._pn_connection.state == self._CLOSED: - LOG.debug("Connection close complete") - self._next_deadline = 0 + elif pn_event.type == proton.Event.DELIVERY: + link = pn_event.link.context + pn_delivery = pn_event.delivery + link._process_delivery(pn_delivery) + + elif pn_event.type == proton.Event.LINK_FLOW: + link = pn_event.link.context + link._process_credit() + + self._pn_collector.pop() + pn_event = self._pn_collector.peek() + + # invoked closed callback after endpoint has fully closed and all + # pending I/O has completed: + if (self._active and + self._endpoint_state == self._CLOSED and + self._read_done and self._write_done): + self._active = False if self._handler: self._handler.connection_closed(self) - # DEBUG LINK "LEAK" - # count = 0 - # link = self._pn_connection.link_head(0) - # while link: - # count += 1 - # link = link.next(0) - # print "Link Count %d" % count - return self._next_deadline finally: @@ -441,6 +408,10 @@ class Connection(object): if rc: # error? self._read_done = True return self.EOS + # hack: check if this was the last input needed by the connection. + # If so, this will set the _read_done flag and the 'connection closed' + # callback can be issued on the next call to process() + self.needs_input return c def close_input(self, reason=None): @@ -481,7 +452,11 @@ class Connection(object): try: self._pn_transport.pop(count) except Exception as e: - self._connection_failed(str(e)) + return self._connection_failed(str(e)) + # hack: check if this was the last output from the connection. If so, + # this will set the _write_done flag and the 'connection closed' + # callback can be issued on the next call to process() + self.has_output def close_output(self, reason=None): if not self._write_done: @@ -562,6 +537,10 @@ class Connection(object): link.reject(pn_condition) link.destroy() + @property + def _endpoint_state(self): + return self._pn_connection.state + def _remove_sender(self, name): if name in self._sender_links: del self._sender_links[name] @@ -654,3 +633,20 @@ class Connection(object): for cb in callbacks: cb() return self._timers_heap[0] if self._timers_heap else 0 + + # endpoint state machine actions: + + def _ep_active(self): + """Both ends of the Endpoint have become active.""" + LOG.debug("Connection is up") + if not self._active: + self._active = True + if self._handler: + self._handler.connection_active(self) + + def _ep_need_close(self): + """The remote has closed its end of the endpoint.""" + LOG.debug("Connection remotely closed") + if self._handler: + cond = self._pn_connection.remote_condition + self._handler.connection_remote_closed(self, cond) diff --git a/python/dingus/endpoint.py b/python/dingus/endpoint.py new file mode 100644 index 0000000..9f346e0 --- /dev/null +++ b/python/dingus/endpoint.py @@ -0,0 +1,131 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import logging +import proton + +LOG = logging.getLogger(__name__) + + +class Endpoint(object): + """AMQP Endpoint state machine.""" + + # Endpoint States: (note: keep in sync with _fsm below!) + STATE_UNINIT = 0 # initial state + STATE_PENDING = 1 # waiting for remote to open + STATE_REQUESTED = 2 # waiting for local to open + STATE_CANCELLED = 3 # remote closed endpoint before local open + STATE_ACTIVE = 4 + STATE_NEED_CLOSE = 5 # remote initiated close + STATE_CLOSING = 6 # locally closed, pending remote + STATE_CLOSED = 7 # terminal state + STATE_REJECTED = 8 # terminal state, automatic cleanup + + # Events - state has transitioned to: + LOCAL_ACTIVE = 1 + LOCAL_CLOSED = 2 + REMOTE_ACTIVE = 3 + REMOTE_CLOSED = 4 + + def __init__(self): + self._state = Endpoint.STATE_UNINIT + self._fsm = [ # {event: (next-state, action), ...} + # STATE_UNINIT: + {Endpoint.LOCAL_ACTIVE: (Endpoint.STATE_PENDING, None), + Endpoint.REMOTE_ACTIVE: (Endpoint.STATE_REQUESTED, + self._ep_requested), + Endpoint.REMOTE_CLOSED: (Endpoint.STATE_NEED_CLOSE, + self._ep_need_close)}, + # STATE_PENDING: + {Endpoint.LOCAL_CLOSED: (Endpoint.STATE_CLOSING, None), + Endpoint.REMOTE_ACTIVE: (Endpoint.STATE_ACTIVE, + self._ep_active), + Endpoint.REMOTE_CLOSED: (Endpoint.STATE_NEED_CLOSE, + self._ep_need_close)}, + # STATE_REQUESTED: + {Endpoint.LOCAL_CLOSED: (Endpoint.STATE_REJECTED, None), + Endpoint.LOCAL_ACTIVE: (Endpoint.STATE_ACTIVE, + self._ep_active), + Endpoint.REMOTE_CLOSED: (Endpoint.STATE_CANCELLED, None)}, + # STATE_CANCELLED: + {Endpoint.LOCAL_CLOSED: (Endpoint.STATE_REJECTED, None), + Endpoint.LOCAL_ACTIVE: (Endpoint.STATE_NEED_CLOSE, + self._ep_need_close)}, + # STATE_ACTIVE: + {Endpoint.LOCAL_CLOSED: (Endpoint.STATE_CLOSING, None), + Endpoint.REMOTE_CLOSED: (Endpoint.STATE_NEED_CLOSE, + self._ep_need_close)}, + # STATE_NEED_CLOSE: + {Endpoint.LOCAL_CLOSED: (Endpoint.STATE_CLOSED, + self._ep_closed)}, + # STATE_CLOSING: + {Endpoint.REMOTE_CLOSED: (Endpoint.STATE_CLOSED, + self._ep_closed)}, + # STATE_CLOSED: + {}, + # STATE_REJECTED: + {}] + + def process_remote_state(self): + """Call when remote endpoint state changes.""" + state = self._endpoint_state + if (state & proton.Endpoint.REMOTE_ACTIVE): + self._dispatch_event(Endpoint.REMOTE_ACTIVE) + elif (state & proton.Endpoint.REMOTE_CLOSED): + self._dispatch_event(Endpoint.REMOTE_CLOSED) + + def process_local_state(self): + """Call when local endpoint state changes.""" + state = self._endpoint_state + if (state & proton.Endpoint.LOCAL_ACTIVE): + self._dispatch_event(Endpoint.LOCAL_ACTIVE) + elif (state & proton.Endpoint.LOCAL_CLOSED): + self._dispatch_event(Endpoint.LOCAL_CLOSED) + + @property + def _endpoint_state(self): + """Returns the current endpoint state.""" + raise NotImplementedError("Must Override") + + def _dispatch_event(self, event): + LOG.debug("Endpoint event %d", event) + fsm = self._fsm[self._state] + entry = fsm.get(event) + if entry: + LOG.debug("Old State: %d New State: %d", + self._state, entry[0]) + self._state = entry[0] + if entry[1]: + entry[1]() + + # state entry actions - overridden by endpoint subclass: + + def _ep_requested(self): + """Remote has activated a new endpoint.""" + LOG.debug("endpoint_requested - ignored") + + def _ep_active(self): + """Both ends of the Endpoint have become active.""" + LOG.debug("endpoint_active - ignored") + + def _ep_need_close(self): + """The remote has closed its end of the endpoint.""" + LOG.debug("endpoint_need_close - ignored") + + def _ep_closed(self): + """Both ends of the endpoint have closed.""" + LOG.debug("endpoint_closed - ignored") diff --git a/python/dingus/link.py b/python/dingus/link.py index dba517b..2fc0a53 100644 --- a/python/dingus/link.py +++ b/python/dingus/link.py @@ -26,17 +26,16 @@ import collections import logging import proton +from dingus.endpoint import Endpoint + LOG = logging.getLogger(__name__) -class _Link(object): +class _Link(Endpoint): """A generic Link base class.""" def __init__(self, connection, pn_link): - self._state = _Link._STATE_UNINIT - # last known endpoint state: - self._ep_state = (proton.Endpoint.LOCAL_UNINIT | - proton.Endpoint.REMOTE_UNINIT) + super(_Link, self).__init__() self._connection = connection self._name = pn_link.name self._handler = None @@ -154,89 +153,9 @@ class _Link(object): self._pn_link = None session.link_destroyed(self) # destroy session _after_ link - # Link State Machine - - # Link States: (note: keep in sync with _STATE_MAP below!) - _STATE_UNINIT = 0 # initial state - _STATE_PENDING = 1 # waiting for remote to open - _STATE_REQUESTED = 2 # waiting for local to open - _STATE_CANCELLED = 3 # remote closed requested link before accepted - _STATE_ACTIVE = 4 - _STATE_NEED_CLOSE = 5 # remote initiated close - _STATE_CLOSING = 6 # locally closed, pending remote - _STATE_CLOSED = 7 # terminal state - _STATE_REJECTED = 8 # terminal state, automatic link cleanup - - # Events: - _LOCAL_ACTIVE = 1 - _LOCAL_CLOSED = 2 - _REMOTE_ACTIVE = 3 - _REMOTE_CLOSED = 4 - - # state entry actions - link type specific: - - @staticmethod - def _fsm_active(link): - """Both ends of the link have become active.""" - link._do_active() - - @staticmethod - def _fsm_need_close(link): - """The remote has closed its end of the link.""" - link._do_need_close() - - @staticmethod - def _fsm_closed(link): - """Both ends of the link have closed.""" - link._do_closed() - - @staticmethod - def _fsm_requested(link): - """Remote has created a new link.""" - link._do_requested() - - def _process_endpoints(self): - """Process any changes in link endpoint state.""" - LOCAL_MASK = (proton.Endpoint.LOCAL_UNINIT | - proton.Endpoint.LOCAL_ACTIVE | - proton.Endpoint.LOCAL_CLOSED) - REMOTE_MASK = (proton.Endpoint.REMOTE_UNINIT | - proton.Endpoint.REMOTE_ACTIVE | - proton.Endpoint.REMOTE_CLOSED) - new_state = self._pn_link.state - old_state = self._ep_state - fsm = _Link._STATE_MAP[self._state] - if ((new_state & LOCAL_MASK) != (old_state & LOCAL_MASK)): - event = None - if new_state & proton.Endpoint.LOCAL_ACTIVE: - event = _Link._LOCAL_ACTIVE - elif new_state & proton.Endpoint.LOCAL_CLOSED: - event = _Link._LOCAL_CLOSED - if event: - entry = fsm.get(event) - if entry: - LOG.debug("Old State: %d Event %d New State: %d", - self._state, event, entry[0]) - self._state = entry[0] - if entry[1]: - entry[1](self) - - fsm = _Link._STATE_MAP[self._state] - if ((new_state & REMOTE_MASK) != (old_state & REMOTE_MASK)): - event = None - if new_state & proton.Endpoint.REMOTE_ACTIVE: - event = _Link._REMOTE_ACTIVE - elif new_state & proton.Endpoint.REMOTE_CLOSED: - event = _Link._REMOTE_CLOSED - if event: - entry = fsm.get(event) - if entry: - LOG.debug("Old State: %d Event %d New State: %d", - self._state, event, entry[0]) - self._state = entry[0] - if entry[1]: - entry[1](self) - self._ep_state = new_state + @property + def _endpoint_state(self): + return self._pn_link.state def _process_delivery(self, pn_delivery): raise NotImplementedError("Must Override") @@ -246,42 +165,8 @@ class _Link(object): def _session_closed(self): """Remote has closed the session used by this link.""" - fsm = _Link._STATE_MAP[self._state] - entry = fsm.get(_Link._REMOTE_CLOSED) - if entry: - LOG.debug("Old State: %d Event %d New State: %d", - self._state, _Link._REMOTE_CLOSED, entry[0]) - self._state = entry[0] - if entry[1]: - entry[1](self) - -_Link._STATE_MAP = [ # {event: (next-state, action), ...} - # _STATE_UNINIT: - {_Link._LOCAL_ACTIVE: (_Link._STATE_PENDING, None), - _Link._REMOTE_ACTIVE: (_Link._STATE_REQUESTED, _Link._fsm_requested), - _Link._REMOTE_CLOSED: (_Link._STATE_NEED_CLOSE, _Link._fsm_need_close)}, - # _STATE_PENDING: - {_Link._LOCAL_CLOSED: (_Link._STATE_CLOSING, None), - _Link._REMOTE_ACTIVE: (_Link._STATE_ACTIVE, _Link._fsm_active), - _Link._REMOTE_CLOSED: (_Link._STATE_NEED_CLOSE, _Link._fsm_need_close)}, - # _STATE_REQUESTED: - {_Link._LOCAL_CLOSED: (_Link._STATE_REJECTED, None), - _Link._LOCAL_ACTIVE: (_Link._STATE_ACTIVE, _Link._fsm_active), - _Link._REMOTE_CLOSED: (_Link._STATE_CANCELLED, None)}, - # _STATE_CANCELLED: - {_Link._LOCAL_CLOSED: (_Link._STATE_REJECTED, None), - _Link._LOCAL_ACTIVE: (_Link._STATE_NEED_CLOSE, _Link._fsm_need_close)}, - # _STATE_ACTIVE: - {_Link._LOCAL_CLOSED: (_Link._STATE_CLOSING, None), - _Link._REMOTE_CLOSED: (_Link._STATE_NEED_CLOSE, _Link._fsm_need_close)}, - # _STATE_NEED_CLOSE: - {_Link._LOCAL_CLOSED: (_Link._STATE_CLOSED, _Link._fsm_closed)}, - # _STATE_CLOSING: - {_Link._REMOTE_CLOSED: (_Link._STATE_CLOSED, _Link._fsm_closed)}, - # _STATE_CLOSED: - {}, - # _STATE_REJECTED: - {}] + # simulate a remote-closed event: + self._dispatch_event(Endpoint.REMOTE_CLOSED) class SenderEventHandler(object): @@ -387,6 +272,9 @@ class SenderLink(_Link): info = {"condition": pn_condition} if pn_condition else None while self._send_requests: key, send_req = self._send_requests.popitem() + info = None + if pn_condition: + info = {"condition": pn_condition} # TODO(kgiusti) fix - must be async! send_req.destroy(SenderLink.ABORTED, info) super(SenderLink, self).close(pn_condition) @@ -443,19 +331,27 @@ class SenderLink(_Link): pn_delivery.settle() def _process_credit(self): + # check if any pending deliveries are now writable: + pn_delivery = self._pn_link.current + while (self._pending_sends and + pn_delivery and pn_delivery.writable): + self._process_delivery(pn_delivery) + pn_delivery = self._pn_link.current + # Alert if credit has become available - if self._handler and self._state == _Link._STATE_ACTIVE: - new_credit = self._pn_link.credit + new_credit = self._pn_link.credit + if self._handler: if self._last_credit <= 0 and new_credit > 0: LOG.debug("Credit is available, link=%s", self.name) self._handler.credit_granted(self) - self._last_credit = new_credit + self._last_credit = new_credit def _write_msg(self, pn_delivery, send_req): # given a writable delivery, send a message LOG.debug("Sending message to engine, tag=%s", send_req.tag) self._pn_link.send(send_req.message.encode()) self._pn_link.advance() + self._last_credit = self._pn_link.credit if not send_req.callback: # no disposition callback, so we can discard the send request and # settle the delivery immediately @@ -470,26 +366,26 @@ class SenderLink(_Link): pass send_req.destroy(SenderLink.TIMED_OUT, None) - # state machine actions: + # endpoint state machine actions: - def _do_active(self): + def _ep_active(self): LOG.debug("Link is up") if self._handler: self._handler.sender_active(self) - def _do_need_close(self): + def _ep_need_close(self): # TODO(kgiusti) error reporting LOG.debug("Link remote closed") if self._handler: cond = self._pn_link.remote_condition self._handler.sender_remote_closed(self, cond) - def _do_closed(self): + def _ep_closed(self): LOG.debug("Link close completed") if self._handler: self._handler.sender_closed(self) - def _do_requested(self): + def _ep_requested(self): LOG.debug("Remote has initiated a link") handler = self._connection._handler if handler: @@ -612,25 +508,25 @@ class ReceiverLink(_Link): # Only used by SenderLink pass - # state machine actions: + # endpoint state machine actions: - def _do_active(self): + def _ep_active(self): LOG.debug("Link is up") if self._handler: self._handler.receiver_active(self) - def _do_need_close(self): + def _ep_need_close(self): LOG.debug("Link remote closed") if self._handler: cond = self._pn_link.remote_condition self._handler.receiver_remote_closed(self, cond) - def _do_closed(self): + def _ep_closed(self): LOG.debug("Link close completed") if self._handler: self._handler.receiver_closed(self) - def _do_requested(self): + def _ep_requested(self): LOG.debug("Remote has initiated a ReceiverLink") handler = self._connection._handler if handler: @@ -656,9 +552,10 @@ class ReceiverLink(_Link): props) -class _SessionProxy(object): +class _SessionProxy(Endpoint): """Corresponds to a Proton Session object.""" def __init__(self, connection, pn_session=None): + super(_SessionProxy, self).__init__() self._locally_initiated = not pn_session self._connection = connection if not pn_session: @@ -703,7 +600,17 @@ class _SessionProxy(object): self._pn_session = None self._connection = None - def remote_closed(self): + @property + def _endpoint_state(self): + return self._pn_session.state + + # endpoint state machine actions: + + def _ep_requested(self): + """Peer has requested a new session.""" + self.open() + + def _ep_need_close(self): """Peer has closed its end of the session.""" links = self._links.copy() # may modify _links for link in links: diff --git a/tests/python/unit_tests/connection.py b/tests/python/unit_tests/connection.py index cae0094..dbdb1f6 100644 --- a/tests/python/unit_tests/connection.py +++ b/tests/python/unit_tests/connection.py @@ -17,6 +17,7 @@ # under the License. # import common +# import logging import os import time @@ -27,6 +28,7 @@ import dingus class APITest(common.Test): def setup(self): + # logging.getLogger("dingus").setLevel(logging.DEBUG) self.container1 = dingus.Container("test-container-1") self.container2 = dingus.Container("test-container-2") @@ -256,6 +258,7 @@ class APITest(common.Test): assert cb1.failed_ct == 0 c1.process(time.time()) assert cb1.failed_ct > 0 + assert cb1.failed_error def test_io_output_close(self): """Premature output close should trigger failed callback.""" @@ -270,6 +273,7 @@ class APITest(common.Test): assert cb1.failed_ct == 0 c1.process(time.time()) assert cb1.failed_ct > 0 + assert cb1.failed_error def test_process_reentrancy(self): """Catch any attempt to re-enter Connection.process() from a diff --git a/tests/python/unit_tests/link.py b/tests/python/unit_tests/link.py index 878cebe..956f250 100644 --- a/tests/python/unit_tests/link.py +++ b/tests/python/unit_tests/link.py @@ -221,21 +221,19 @@ class APITest(common.Test): assert rl_handler.message_received_ct == 5 assert sender.credit == 0 assert sender.pending == 1 - # TODO(kgiusti) bug - probably shouldn't call back when pending > - # available credit - assert sl_handler.credit_granted_ct == 2 + assert sl_handler.credit_granted_ct == 1 receiver.add_capacity(1) self.process_connections() assert sender.credit == 0 assert sender.pending == 0 - assert sl_handler.credit_granted_ct == 3 + assert sl_handler.credit_granted_ct == 1 # verify new credit becomes available: receiver.add_capacity(1) self.process_connections() assert sender.credit == 1 - assert sl_handler.credit_granted_ct == 4 + assert sl_handler.credit_granted_ct == 2 def test_send_presettled(self): sender, receiver = self._setup_sender_sync()