Port to Proton 0.7 RC3.

This commit is contained in:
Kenneth Giusti
2014-04-01 09:55:55 -04:00
parent cc8d51ef5f
commit 4691966002
9 changed files with 310 additions and 255 deletions

View File

@@ -19,6 +19,7 @@
#
""" Minimal message receive example code."""
import logging
import optparse
import sys
import uuid
@@ -28,6 +29,9 @@ from utils import connect_socket
from utils import get_host_port
from utils import process_connection
LOG = logging.getLogger()
LOG.addHandler(logging.StreamHandler())
def main(argv=None):
@@ -39,6 +43,8 @@ def main(argv=None):
parser.add_option("--idle", dest="idle_timeout", type="int",
default=0,
help="Idle timeout for connection (seconds).")
parser.add_option("--debug", dest="debug", action="store_true",
help="enable debug logging")
parser.add_option("--source", dest="source_addr", type="string",
help="Address for link source.")
parser.add_option("--target", dest="target_addr", type="string",
@@ -49,6 +55,8 @@ def main(argv=None):
help="Certificate Authority PEM file")
opts, extra = parser.parse_args(args=argv)
if opts.debug:
LOG.setLevel(logging.DEBUG)
host, port = get_host_port(opts.server)
my_socket = connect_socket(host, port)

View File

@@ -42,7 +42,7 @@ import uuid
# from guppy import hpy
# hp = hpy()
from proton import Message
from proton import Message, Condition
import dingus
LOG = logging.getLogger()
@@ -268,7 +268,9 @@ class MyReceiverLink(dingus.ReceiverEventHandler):
if not reply_to or reply_to not in reply_senders:
LOG.error("sender for reply-to not found, reply-to=%s",
str(reply_to))
self._link.message_rejected(handle, "Bad reply-to address")
info = Condition("not-found",
"Bad reply-to address: %s" % str(reply_to))
self._link.message_rejected(handle, info)
else:
my_sender = reply_senders[reply_to]
correlation_id = message.correlation_id
@@ -276,7 +278,9 @@ class MyReceiverLink(dingus.ReceiverEventHandler):
if (not isinstance(method_map, dict) or
'method' not in method_map):
LOG.error("no method given, map=%s", str(method_map))
self._link.message_rejected(handle, "Bad format")
info = Condition("invalid-field",
"no method given, map=%s" % str(method_map))
self._link.message_rejected(handle, info)
else:
response = Message()
response.address = reply_to

View File

@@ -171,8 +171,8 @@ class MySenderLink(dingus.SenderEventHandler):
def sender_active(self, sender_link):
LOG.debug("Sender: Active")
# TODO(kgiusti) - need credit granted callback:
self.credit_granted(sender_link)
if sender_link.credit > 0:
self.send_message()
def sender_remote_closed(self, sender_link, error):
LOG.debug("Sender: Remote closed")
@@ -186,7 +186,8 @@ class MySenderLink(dingus.SenderEventHandler):
def credit_granted(self, sender_link):
LOG.debug("Sender: credit granted")
# Send a single message:
self.send_message()
if sender_link.credit > 0:
self.send_message()
# 'message sent' callback:
def __call__(self, sender, handle, status, error=None):
@@ -234,6 +235,8 @@ class MyReceiverLink(dingus.ReceiverEventHandler):
self.receiver_link.message_accepted(handle)
print("Message received on Receiver link %s, message=%s"
% (self.receiver_link.name, str(message)))
if receiver_link.capacity < 1:
receiver_link.add_capacity(1)
def main(argv=None):
@@ -291,7 +294,7 @@ def main(argv=None):
[], timeout)
LOG.debug("select() returned")
worked = []
worked = set()
for r in readable:
if r is my_socket:
# new inbound connection request received,
@@ -316,8 +319,9 @@ def main(argv=None):
LOG.debug("new connection created name=%s", name)
else:
assert isinstance(r, SocketConnection)
r.process_input()
worked.append(r)
worked.add(r)
for t in timers:
now = time.time()
@@ -326,12 +330,12 @@ def main(argv=None):
t.process(now)
sc = t.user_context
assert isinstance(sc, SocketConnection)
worked.append(sc)
worked.add(sc)
for w in writable:
assert isinstance(w, SocketConnection)
w.send_output()
worked.append(w)
worked.add(w)
# nuke any completed connections:
closed = False

View File

@@ -75,6 +75,9 @@ def server_socket(host, port, backlog=10):
def process_connection(connection, my_socket):
"""Handle I/O and Timers on a single Connection."""
if connection.closed:
return False
work = False
readfd = []
writefd = []

View File

@@ -22,13 +22,12 @@ __all__ = [
import heapq
import logging
import proton
import time
from warnings import warn
from dingus.link import _SessionProxy
import proton
from dingus.endpoint import Endpoint
LOG = logging.getLogger(__name__)
@@ -77,7 +76,7 @@ class ConnectionEventHandler(object):
LOG.debug("sasl_done (ignored)")
class Connection(object):
class Connection(Endpoint):
"""A Connection to a peer."""
EOS = -1 # indicates 'I/O stream closed'
@@ -130,6 +129,7 @@ class Connection(object):
SSL (eg, plain TCP). Used by a server that will accept clients
requesting either trusted or untrusted connections.
"""
super(Connection, self).__init__()
self._name = name
self._container = container
self._handler = event_handler
@@ -138,6 +138,8 @@ class Connection(object):
self._pn_connection.container = container.name
self._pn_transport = proton.Transport()
self._pn_transport.bind(self._pn_connection)
self._pn_collector = proton.Collector()
self._pn_connection.collect(self._pn_collector)
if properties:
if 'hostname' in properties:
@@ -242,35 +244,27 @@ class Connection(object):
@property
def active(self):
"""Return True if both ends of the Connection are open."""
state = self._pn_connection.state
return (state == (proton.Endpoint.LOCAL_ACTIVE
| proton.Endpoint.REMOTE_ACTIVE))
return self._endpoint_state == self._ACTIVE
@property
def closed(self):
"""Return True if the Connection has closed."""
state = self._pn_connection.state
# if closed in error, state may not be correct:
return (state == (proton.Endpoint.LOCAL_CLOSED
| proton.Endpoint.REMOTE_CLOSED)
or (self._write_done and self._read_done))
"""Return True if the Connection has finished closing."""
return (self._write_done and self._read_done)
def destroy(self):
self._sender_links.clear()
self._receiver_links.clear()
self._container._remove_connection(self._name)
self._container = None
self._pn_collector = None
self._pn_connection = None
self._pn_transport = None
self._user_context = None
_REMOTE_REQ = (proton.Endpoint.LOCAL_UNINIT
| proton.Endpoint.REMOTE_ACTIVE)
_REMOTE_CLOSE = (proton.Endpoint.LOCAL_ACTIVE
| proton.Endpoint.REMOTE_CLOSED)
_ACTIVE = (proton.Endpoint.LOCAL_ACTIVE | proton.Endpoint.REMOTE_ACTIVE)
_CLOSED = (proton.Endpoint.LOCAL_CLOSED | proton.Endpoint.REMOTE_CLOSED)
_LOCAL_UNINIT = proton.Endpoint.LOCAL_UNINIT
_ACTIVE = (proton.Endpoint.LOCAL_ACTIVE | proton.Endpoint.REMOTE_ACTIVE)
def process(self, now):
"""Perform connection state processing."""
@@ -278,7 +272,8 @@ class Connection(object):
raise RuntimeError("Connection.process() is not re-entrant!")
self._in_process = True
try:
# if the connection has hit an unrecoverable error,
# nag the application until connection is destroyed
if self._error:
if self._handler:
self._handler.connection_failed(self, self._error)
@@ -302,41 +297,6 @@ class Connection(object):
self._pn_sasl.outcome)
self._pn_sasl = None
# do endpoint up handling:
if self._pn_connection.state == self._ACTIVE:
if not self._active:
self._active = True
if self._handler:
self._handler.connection_active(self)
pn_session = self._pn_connection.session_head(self._LOCAL_UNINIT)
while pn_session:
LOG.debug("Opening remotely initiated session")
session = _SessionProxy(self, pn_session)
session.open()
pn_session = pn_session.next(self._LOCAL_UNINIT)
pn_link = self._pn_connection.link_head(self._REMOTE_REQ)
while pn_link:
next_pn_link = pn_link.next(self._REMOTE_REQ)
session = pn_link.session.context
if (pn_link.is_sender and
pn_link.name not in self._sender_links):
LOG.debug("Remotely initiated Sender %s needs init",
pn_link.name)
link = session.request_sender(pn_link)
self._sender_links[pn_link.name] = link
link._process_endpoints()
elif (pn_link.is_receiver and
pn_link.name not in self._receiver_links):
LOG.debug("Remotely initiated Receiver %s needs init",
pn_link.name)
link = session.request_receiver(pn_link)
self._receiver_links[pn_link.name] = link
link._process_endpoints()
pn_link = next_pn_link
# process timer events:
timer_deadline = self._expire_timers(now)
transport_deadline = self._pn_transport.tick(now)
@@ -345,62 +305,69 @@ class Connection(object):
else:
self._next_deadline = timer_deadline or transport_deadline
# TODO(kgiusti) won't scale - use new Engine event API
pn_link = self._pn_connection.link_head(self._ACTIVE)
while pn_link:
next_pn_link = pn_link.next(self._ACTIVE)
pn_link.context._process_endpoints()
pn_link.context._process_credit()
pn_link = next_pn_link
# process events from proton:
pn_event = self._pn_collector.peek()
while pn_event:
if pn_event.type == proton.Event.CONNECTION_REMOTE_STATE:
self.process_remote_state()
# process the delivery work queue
pn_delivery = self._pn_connection.work_head
while pn_delivery:
next_delivery = pn_delivery.work_next
pn_delivery.link.context._process_delivery(pn_delivery)
pn_delivery = next_delivery
elif pn_event.type == proton.Event.CONNECTION_LOCAL_STATE:
self.process_local_state()
# do endpoint down handling:
elif pn_event.type == proton.Event.SESSION_REMOTE_STATE:
pn_session = pn_event.session
# create a new session if requested by remote:
if (pn_session.state == self._REMOTE_REQ):
LOG.debug("Opening remotely initiated session")
session = _SessionProxy(self, pn_session)
pn_session.context.process_remote_state()
pn_link = self._pn_connection.link_head(self._REMOTE_CLOSE)
while pn_link:
next_pn_link = pn_link.next(self._REMOTE_CLOSE)
pn_link.context._process_endpoints()
pn_link = next_pn_link
elif pn_event.type == proton.Event.SESSION_LOCAL_STATE:
pn_session = pn_event.session
pn_session.context.process_local_state()
pn_link = self._pn_connection.link_head(self._CLOSED)
while pn_link:
next_pn_link = pn_link.next(self._CLOSED)
pn_link.context._process_endpoints()
pn_link = next_pn_link
elif pn_event.type == proton.Event.LINK_REMOTE_STATE:
pn_link = pn_event.link
# create a new link if requested by remote:
if (pn_link.state == self._REMOTE_REQ):
session = pn_link.session.context
if (pn_link.is_sender and
pn_link.name not in self._sender_links):
LOG.debug("Remotely initiated Sender needs init")
link = session.request_sender(pn_link)
self._sender_links[pn_link.name] = link
elif (pn_link.is_receiver and
pn_link.name not in self._receiver_links):
LOG.debug("Remotely initiated Receiver needs init")
link = session.request_receiver(pn_link)
self._receiver_links[pn_link.name] = link
pn_link.context.process_remote_state()
pn_session = self._pn_connection.session_head(self._REMOTE_CLOSE)
while pn_session:
LOG.debug("Session closed remotely")
next_session = pn_session.next(self._REMOTE_CLOSE)
session = pn_session.context
session.remote_closed()
pn_session = next_session
elif pn_event.type == proton.Event.LINK_LOCAL_STATE:
pn_link = pn_event.link
pn_link.context.process_local_state()
if self._pn_connection.state == self._REMOTE_CLOSE:
LOG.debug("Connection remotely closed")
if self._handler:
cond = self._pn_connection.remote_condition
self._handler.connection_remote_closed(self, cond)
elif self._pn_connection.state == self._CLOSED:
LOG.debug("Connection close complete")
self._next_deadline = 0
elif pn_event.type == proton.Event.DELIVERY:
link = pn_event.link.context
pn_delivery = pn_event.delivery
link._process_delivery(pn_delivery)
elif pn_event.type == proton.Event.LINK_FLOW:
link = pn_event.link.context
link._process_credit()
self._pn_collector.pop()
pn_event = self._pn_collector.peek()
# invoked closed callback after endpoint has fully closed and all
# pending I/O has completed:
if (self._active and
self._endpoint_state == self._CLOSED and
self._read_done and self._write_done):
self._active = False
if self._handler:
self._handler.connection_closed(self)
# DEBUG LINK "LEAK"
# count = 0
# link = self._pn_connection.link_head(0)
# while link:
# count += 1
# link = link.next(0)
# print "Link Count %d" % count
return self._next_deadline
finally:
@@ -441,6 +408,10 @@ class Connection(object):
if rc: # error?
self._read_done = True
return self.EOS
# hack: check if this was the last input needed by the connection.
# If so, this will set the _read_done flag and the 'connection closed'
# callback can be issued on the next call to process()
self.needs_input
return c
def close_input(self, reason=None):
@@ -481,7 +452,11 @@ class Connection(object):
try:
self._pn_transport.pop(count)
except Exception as e:
self._connection_failed(str(e))
return self._connection_failed(str(e))
# hack: check if this was the last output from the connection. If so,
# this will set the _write_done flag and the 'connection closed'
# callback can be issued on the next call to process()
self.has_output
def close_output(self, reason=None):
if not self._write_done:
@@ -562,6 +537,10 @@ class Connection(object):
link.reject(pn_condition)
link.destroy()
@property
def _endpoint_state(self):
return self._pn_connection.state
def _remove_sender(self, name):
if name in self._sender_links:
del self._sender_links[name]
@@ -654,3 +633,20 @@ class Connection(object):
for cb in callbacks:
cb()
return self._timers_heap[0] if self._timers_heap else 0
# endpoint state machine actions:
def _ep_active(self):
"""Both ends of the Endpoint have become active."""
LOG.debug("Connection is up")
if not self._active:
self._active = True
if self._handler:
self._handler.connection_active(self)
def _ep_need_close(self):
"""The remote has closed its end of the endpoint."""
LOG.debug("Connection remotely closed")
if self._handler:
cond = self._pn_connection.remote_condition
self._handler.connection_remote_closed(self, cond)

131
python/dingus/endpoint.py Normal file
View File

@@ -0,0 +1,131 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import proton
LOG = logging.getLogger(__name__)
class Endpoint(object):
"""AMQP Endpoint state machine."""
# Endpoint States: (note: keep in sync with _fsm below!)
STATE_UNINIT = 0 # initial state
STATE_PENDING = 1 # waiting for remote to open
STATE_REQUESTED = 2 # waiting for local to open
STATE_CANCELLED = 3 # remote closed endpoint before local open
STATE_ACTIVE = 4
STATE_NEED_CLOSE = 5 # remote initiated close
STATE_CLOSING = 6 # locally closed, pending remote
STATE_CLOSED = 7 # terminal state
STATE_REJECTED = 8 # terminal state, automatic cleanup
# Events - state has transitioned to:
LOCAL_ACTIVE = 1
LOCAL_CLOSED = 2
REMOTE_ACTIVE = 3
REMOTE_CLOSED = 4
def __init__(self):
self._state = Endpoint.STATE_UNINIT
self._fsm = [ # {event: (next-state, action), ...}
# STATE_UNINIT:
{Endpoint.LOCAL_ACTIVE: (Endpoint.STATE_PENDING, None),
Endpoint.REMOTE_ACTIVE: (Endpoint.STATE_REQUESTED,
self._ep_requested),
Endpoint.REMOTE_CLOSED: (Endpoint.STATE_NEED_CLOSE,
self._ep_need_close)},
# STATE_PENDING:
{Endpoint.LOCAL_CLOSED: (Endpoint.STATE_CLOSING, None),
Endpoint.REMOTE_ACTIVE: (Endpoint.STATE_ACTIVE,
self._ep_active),
Endpoint.REMOTE_CLOSED: (Endpoint.STATE_NEED_CLOSE,
self._ep_need_close)},
# STATE_REQUESTED:
{Endpoint.LOCAL_CLOSED: (Endpoint.STATE_REJECTED, None),
Endpoint.LOCAL_ACTIVE: (Endpoint.STATE_ACTIVE,
self._ep_active),
Endpoint.REMOTE_CLOSED: (Endpoint.STATE_CANCELLED, None)},
# STATE_CANCELLED:
{Endpoint.LOCAL_CLOSED: (Endpoint.STATE_REJECTED, None),
Endpoint.LOCAL_ACTIVE: (Endpoint.STATE_NEED_CLOSE,
self._ep_need_close)},
# STATE_ACTIVE:
{Endpoint.LOCAL_CLOSED: (Endpoint.STATE_CLOSING, None),
Endpoint.REMOTE_CLOSED: (Endpoint.STATE_NEED_CLOSE,
self._ep_need_close)},
# STATE_NEED_CLOSE:
{Endpoint.LOCAL_CLOSED: (Endpoint.STATE_CLOSED,
self._ep_closed)},
# STATE_CLOSING:
{Endpoint.REMOTE_CLOSED: (Endpoint.STATE_CLOSED,
self._ep_closed)},
# STATE_CLOSED:
{},
# STATE_REJECTED:
{}]
def process_remote_state(self):
"""Call when remote endpoint state changes."""
state = self._endpoint_state
if (state & proton.Endpoint.REMOTE_ACTIVE):
self._dispatch_event(Endpoint.REMOTE_ACTIVE)
elif (state & proton.Endpoint.REMOTE_CLOSED):
self._dispatch_event(Endpoint.REMOTE_CLOSED)
def process_local_state(self):
"""Call when local endpoint state changes."""
state = self._endpoint_state
if (state & proton.Endpoint.LOCAL_ACTIVE):
self._dispatch_event(Endpoint.LOCAL_ACTIVE)
elif (state & proton.Endpoint.LOCAL_CLOSED):
self._dispatch_event(Endpoint.LOCAL_CLOSED)
@property
def _endpoint_state(self):
"""Returns the current endpoint state."""
raise NotImplementedError("Must Override")
def _dispatch_event(self, event):
LOG.debug("Endpoint event %d", event)
fsm = self._fsm[self._state]
entry = fsm.get(event)
if entry:
LOG.debug("Old State: %d New State: %d",
self._state, entry[0])
self._state = entry[0]
if entry[1]:
entry[1]()
# state entry actions - overridden by endpoint subclass:
def _ep_requested(self):
"""Remote has activated a new endpoint."""
LOG.debug("endpoint_requested - ignored")
def _ep_active(self):
"""Both ends of the Endpoint have become active."""
LOG.debug("endpoint_active - ignored")
def _ep_need_close(self):
"""The remote has closed its end of the endpoint."""
LOG.debug("endpoint_need_close - ignored")
def _ep_closed(self):
"""Both ends of the endpoint have closed."""
LOG.debug("endpoint_closed - ignored")

View File

@@ -26,17 +26,16 @@ import collections
import logging
import proton
from dingus.endpoint import Endpoint
LOG = logging.getLogger(__name__)
class _Link(object):
class _Link(Endpoint):
"""A generic Link base class."""
def __init__(self, connection, pn_link):
self._state = _Link._STATE_UNINIT
# last known endpoint state:
self._ep_state = (proton.Endpoint.LOCAL_UNINIT |
proton.Endpoint.REMOTE_UNINIT)
super(_Link, self).__init__()
self._connection = connection
self._name = pn_link.name
self._handler = None
@@ -154,89 +153,9 @@ class _Link(object):
self._pn_link = None
session.link_destroyed(self) # destroy session _after_ link
# Link State Machine
# Link States: (note: keep in sync with _STATE_MAP below!)
_STATE_UNINIT = 0 # initial state
_STATE_PENDING = 1 # waiting for remote to open
_STATE_REQUESTED = 2 # waiting for local to open
_STATE_CANCELLED = 3 # remote closed requested link before accepted
_STATE_ACTIVE = 4
_STATE_NEED_CLOSE = 5 # remote initiated close
_STATE_CLOSING = 6 # locally closed, pending remote
_STATE_CLOSED = 7 # terminal state
_STATE_REJECTED = 8 # terminal state, automatic link cleanup
# Events:
_LOCAL_ACTIVE = 1
_LOCAL_CLOSED = 2
_REMOTE_ACTIVE = 3
_REMOTE_CLOSED = 4
# state entry actions - link type specific:
@staticmethod
def _fsm_active(link):
"""Both ends of the link have become active."""
link._do_active()
@staticmethod
def _fsm_need_close(link):
"""The remote has closed its end of the link."""
link._do_need_close()
@staticmethod
def _fsm_closed(link):
"""Both ends of the link have closed."""
link._do_closed()
@staticmethod
def _fsm_requested(link):
"""Remote has created a new link."""
link._do_requested()
def _process_endpoints(self):
"""Process any changes in link endpoint state."""
LOCAL_MASK = (proton.Endpoint.LOCAL_UNINIT |
proton.Endpoint.LOCAL_ACTIVE |
proton.Endpoint.LOCAL_CLOSED)
REMOTE_MASK = (proton.Endpoint.REMOTE_UNINIT |
proton.Endpoint.REMOTE_ACTIVE |
proton.Endpoint.REMOTE_CLOSED)
new_state = self._pn_link.state
old_state = self._ep_state
fsm = _Link._STATE_MAP[self._state]
if ((new_state & LOCAL_MASK) != (old_state & LOCAL_MASK)):
event = None
if new_state & proton.Endpoint.LOCAL_ACTIVE:
event = _Link._LOCAL_ACTIVE
elif new_state & proton.Endpoint.LOCAL_CLOSED:
event = _Link._LOCAL_CLOSED
if event:
entry = fsm.get(event)
if entry:
LOG.debug("Old State: %d Event %d New State: %d",
self._state, event, entry[0])
self._state = entry[0]
if entry[1]:
entry[1](self)
fsm = _Link._STATE_MAP[self._state]
if ((new_state & REMOTE_MASK) != (old_state & REMOTE_MASK)):
event = None
if new_state & proton.Endpoint.REMOTE_ACTIVE:
event = _Link._REMOTE_ACTIVE
elif new_state & proton.Endpoint.REMOTE_CLOSED:
event = _Link._REMOTE_CLOSED
if event:
entry = fsm.get(event)
if entry:
LOG.debug("Old State: %d Event %d New State: %d",
self._state, event, entry[0])
self._state = entry[0]
if entry[1]:
entry[1](self)
self._ep_state = new_state
@property
def _endpoint_state(self):
return self._pn_link.state
def _process_delivery(self, pn_delivery):
raise NotImplementedError("Must Override")
@@ -246,42 +165,8 @@ class _Link(object):
def _session_closed(self):
"""Remote has closed the session used by this link."""
fsm = _Link._STATE_MAP[self._state]
entry = fsm.get(_Link._REMOTE_CLOSED)
if entry:
LOG.debug("Old State: %d Event %d New State: %d",
self._state, _Link._REMOTE_CLOSED, entry[0])
self._state = entry[0]
if entry[1]:
entry[1](self)
_Link._STATE_MAP = [ # {event: (next-state, action), ...}
# _STATE_UNINIT:
{_Link._LOCAL_ACTIVE: (_Link._STATE_PENDING, None),
_Link._REMOTE_ACTIVE: (_Link._STATE_REQUESTED, _Link._fsm_requested),
_Link._REMOTE_CLOSED: (_Link._STATE_NEED_CLOSE, _Link._fsm_need_close)},
# _STATE_PENDING:
{_Link._LOCAL_CLOSED: (_Link._STATE_CLOSING, None),
_Link._REMOTE_ACTIVE: (_Link._STATE_ACTIVE, _Link._fsm_active),
_Link._REMOTE_CLOSED: (_Link._STATE_NEED_CLOSE, _Link._fsm_need_close)},
# _STATE_REQUESTED:
{_Link._LOCAL_CLOSED: (_Link._STATE_REJECTED, None),
_Link._LOCAL_ACTIVE: (_Link._STATE_ACTIVE, _Link._fsm_active),
_Link._REMOTE_CLOSED: (_Link._STATE_CANCELLED, None)},
# _STATE_CANCELLED:
{_Link._LOCAL_CLOSED: (_Link._STATE_REJECTED, None),
_Link._LOCAL_ACTIVE: (_Link._STATE_NEED_CLOSE, _Link._fsm_need_close)},
# _STATE_ACTIVE:
{_Link._LOCAL_CLOSED: (_Link._STATE_CLOSING, None),
_Link._REMOTE_CLOSED: (_Link._STATE_NEED_CLOSE, _Link._fsm_need_close)},
# _STATE_NEED_CLOSE:
{_Link._LOCAL_CLOSED: (_Link._STATE_CLOSED, _Link._fsm_closed)},
# _STATE_CLOSING:
{_Link._REMOTE_CLOSED: (_Link._STATE_CLOSED, _Link._fsm_closed)},
# _STATE_CLOSED:
{},
# _STATE_REJECTED:
{}]
# simulate a remote-closed event:
self._dispatch_event(Endpoint.REMOTE_CLOSED)
class SenderEventHandler(object):
@@ -387,6 +272,9 @@ class SenderLink(_Link):
info = {"condition": pn_condition} if pn_condition else None
while self._send_requests:
key, send_req = self._send_requests.popitem()
info = None
if pn_condition:
info = {"condition": pn_condition}
# TODO(kgiusti) fix - must be async!
send_req.destroy(SenderLink.ABORTED, info)
super(SenderLink, self).close(pn_condition)
@@ -443,19 +331,27 @@ class SenderLink(_Link):
pn_delivery.settle()
def _process_credit(self):
# check if any pending deliveries are now writable:
pn_delivery = self._pn_link.current
while (self._pending_sends and
pn_delivery and pn_delivery.writable):
self._process_delivery(pn_delivery)
pn_delivery = self._pn_link.current
# Alert if credit has become available
if self._handler and self._state == _Link._STATE_ACTIVE:
new_credit = self._pn_link.credit
new_credit = self._pn_link.credit
if self._handler:
if self._last_credit <= 0 and new_credit > 0:
LOG.debug("Credit is available, link=%s", self.name)
self._handler.credit_granted(self)
self._last_credit = new_credit
self._last_credit = new_credit
def _write_msg(self, pn_delivery, send_req):
# given a writable delivery, send a message
LOG.debug("Sending message to engine, tag=%s", send_req.tag)
self._pn_link.send(send_req.message.encode())
self._pn_link.advance()
self._last_credit = self._pn_link.credit
if not send_req.callback:
# no disposition callback, so we can discard the send request and
# settle the delivery immediately
@@ -470,26 +366,26 @@ class SenderLink(_Link):
pass
send_req.destroy(SenderLink.TIMED_OUT, None)
# state machine actions:
# endpoint state machine actions:
def _do_active(self):
def _ep_active(self):
LOG.debug("Link is up")
if self._handler:
self._handler.sender_active(self)
def _do_need_close(self):
def _ep_need_close(self):
# TODO(kgiusti) error reporting
LOG.debug("Link remote closed")
if self._handler:
cond = self._pn_link.remote_condition
self._handler.sender_remote_closed(self, cond)
def _do_closed(self):
def _ep_closed(self):
LOG.debug("Link close completed")
if self._handler:
self._handler.sender_closed(self)
def _do_requested(self):
def _ep_requested(self):
LOG.debug("Remote has initiated a link")
handler = self._connection._handler
if handler:
@@ -612,25 +508,25 @@ class ReceiverLink(_Link):
# Only used by SenderLink
pass
# state machine actions:
# endpoint state machine actions:
def _do_active(self):
def _ep_active(self):
LOG.debug("Link is up")
if self._handler:
self._handler.receiver_active(self)
def _do_need_close(self):
def _ep_need_close(self):
LOG.debug("Link remote closed")
if self._handler:
cond = self._pn_link.remote_condition
self._handler.receiver_remote_closed(self, cond)
def _do_closed(self):
def _ep_closed(self):
LOG.debug("Link close completed")
if self._handler:
self._handler.receiver_closed(self)
def _do_requested(self):
def _ep_requested(self):
LOG.debug("Remote has initiated a ReceiverLink")
handler = self._connection._handler
if handler:
@@ -656,9 +552,10 @@ class ReceiverLink(_Link):
props)
class _SessionProxy(object):
class _SessionProxy(Endpoint):
"""Corresponds to a Proton Session object."""
def __init__(self, connection, pn_session=None):
super(_SessionProxy, self).__init__()
self._locally_initiated = not pn_session
self._connection = connection
if not pn_session:
@@ -703,7 +600,17 @@ class _SessionProxy(object):
self._pn_session = None
self._connection = None
def remote_closed(self):
@property
def _endpoint_state(self):
return self._pn_session.state
# endpoint state machine actions:
def _ep_requested(self):
"""Peer has requested a new session."""
self.open()
def _ep_need_close(self):
"""Peer has closed its end of the session."""
links = self._links.copy() # may modify _links
for link in links:

View File

@@ -17,6 +17,7 @@
# under the License.
#
import common
# import logging
import os
import time
@@ -27,6 +28,7 @@ import dingus
class APITest(common.Test):
def setup(self):
# logging.getLogger("dingus").setLevel(logging.DEBUG)
self.container1 = dingus.Container("test-container-1")
self.container2 = dingus.Container("test-container-2")
@@ -256,6 +258,7 @@ class APITest(common.Test):
assert cb1.failed_ct == 0
c1.process(time.time())
assert cb1.failed_ct > 0
assert cb1.failed_error
def test_io_output_close(self):
"""Premature output close should trigger failed callback."""
@@ -270,6 +273,7 @@ class APITest(common.Test):
assert cb1.failed_ct == 0
c1.process(time.time())
assert cb1.failed_ct > 0
assert cb1.failed_error
def test_process_reentrancy(self):
"""Catch any attempt to re-enter Connection.process() from a

View File

@@ -221,21 +221,19 @@ class APITest(common.Test):
assert rl_handler.message_received_ct == 5
assert sender.credit == 0
assert sender.pending == 1
# TODO(kgiusti) bug - probably shouldn't call back when pending >
# available credit
assert sl_handler.credit_granted_ct == 2
assert sl_handler.credit_granted_ct == 1
receiver.add_capacity(1)
self.process_connections()
assert sender.credit == 0
assert sender.pending == 0
assert sl_handler.credit_granted_ct == 3
assert sl_handler.credit_granted_ct == 1
# verify new credit becomes available:
receiver.add_capacity(1)
self.process_connections()
assert sender.credit == 1
assert sl_handler.credit_granted_ct == 4
assert sl_handler.credit_granted_ct == 2
def test_send_presettled(self):
sender, receiver = self._setup_sender_sync()