Add support for Proton 0.8

This commit is contained in:
Kenneth Giusti 2014-09-12 16:46:33 -04:00
parent a2b9a7814e
commit 373d46d0e5
6 changed files with 325 additions and 136 deletions

@ -27,10 +27,14 @@ import time
import warnings
from pyngus.endpoint import Endpoint
from pyngus.link import _Link
from pyngus.link import _SessionProxy
LOG = logging.getLogger(__name__)
_PROTON_VERSION = (int(getattr(proton, "VERSION_MAJOR", 0)),
int(getattr(proton, "VERSION_MINOR", 0)))
class ConnectionEventHandler(object):
"""An implementation of an AMQP 1.0 Connection."""
@ -283,9 +287,10 @@ class Connection(Endpoint):
self._pn_transport.unbind()
self._pn_transport = None
self._pn_connection = None
# memory leak: drain the collector before releasing it
while self._pn_collector.peek():
self._pn_collector.pop()
if _PROTON_VERSION < (0, 8):
# memory leak: drain the collector before releasing it
while self._pn_collector.peek():
self._pn_collector.pop()
self._pn_collector = None
_REMOTE_REQ = (proton.Endpoint.LOCAL_UNINIT
@ -339,71 +344,26 @@ class Connection(Endpoint):
# process events from proton:
pn_event = self._pn_collector.peek()
while pn_event:
if pn_event.type == proton.Event.CONNECTION_REMOTE_STATE:
self.process_remote_state()
elif pn_event.type == proton.Event.CONNECTION_LOCAL_STATE:
self.process_local_state()
elif pn_event.type == proton.Event.SESSION_REMOTE_STATE:
pn_session = pn_event.session
# create a new session if requested by remote:
c = hasattr(pn_session, 'context') and pn_session.context
if ((not c) and
(pn_session.state & proton.Endpoint.LOCAL_UNINIT)):
LOG.debug("Opening remotely initiated session")
name = "session-%d" % self._remote_session_id
self._remote_session_id += 1
session = _SessionProxy(name, self, pn_session)
pn_session.context.process_remote_state()
elif pn_event.type == proton.Event.SESSION_LOCAL_STATE:
pn_session = pn_event.session
session = pn_session.context
if session:
session.process_local_state()
elif pn_event.type == proton.Event.LINK_REMOTE_STATE:
pn_link = pn_event.link
# create a new link if requested by remote:
c = hasattr(pn_link, 'context') and pn_link.context
if ((not c) and
(pn_link.state & proton.Endpoint.LOCAL_UNINIT)):
session = pn_link.session.context
if (pn_link.is_sender and
pn_link.name not in self._sender_links):
LOG.debug("Remotely initiated Sender needs init")
link = session.request_sender(pn_link)
self._sender_links[pn_link.name] = link
elif (pn_link.is_receiver and
pn_link.name not in self._receiver_links):
LOG.debug("Remotely initiated Receiver needs init")
link = session.request_receiver(pn_link)
self._receiver_links[pn_link.name] = link
pn_link.context.process_remote_state()
elif pn_event.type == proton.Event.LINK_LOCAL_STATE:
pn_link = pn_event.link
link = pn_link.context
if link:
link.process_local_state()
elif pn_event.type == proton.Event.DELIVERY:
link = pn_event.link.context
pn_delivery = pn_event.delivery
link._process_delivery(pn_delivery)
elif pn_event.type == proton.Event.LINK_FLOW:
link = pn_event.link.context
link._process_credit()
if self._handle_proton_event(pn_event):
pass
elif _SessionProxy._handle_proton_event(pn_event, self):
pass
elif _Link._handle_proton_event(pn_event, self):
pass
self._pn_collector.pop()
pn_event = self._pn_collector.peek()
# invoked closed callback after endpoint has fully closed and all
# pending I/O has completed:
if (self._endpoint_state == self._CLOSED and
# re-check for connection failure after processing all pending
# engine events:
if self._error:
if self._handler:
self._handler.connection_failed(self, self._error)
# nag application until connection is destroyed
self._next_deadline = now
elif (self._endpoint_state == self._CLOSED and
self._read_done and self._write_done):
# invoke closed callback after endpoint has fully closed and
# all pending I/O has completed:
if self._handler:
self._handler.connection_closed(self)
@ -589,7 +549,7 @@ class Connection(Endpoint):
if name in self._receiver_links:
del self._receiver_links[name]
def _connection_failed(self, error):
def _connection_failed(self, error="Error not specified!"):
"""Clean up after connection failure detected."""
if not self._error:
LOG.error("Connection failed: %s", str(error))
@ -674,6 +634,40 @@ class Connection(Endpoint):
cb()
return self._timers_heap[0] if self._timers_heap else 0
# Proton's event model was changed after 0.7
if (_PROTON_VERSION >= (0, 8)):
_endpoint_event_map = {
proton.Event.CONNECTION_REMOTE_OPEN: Endpoint.REMOTE_OPENED,
proton.Event.CONNECTION_REMOTE_CLOSE: Endpoint.REMOTE_CLOSED,
proton.Event.CONNECTION_OPEN: Endpoint.LOCAL_OPENED,
proton.Event.CONNECTION_CLOSE: Endpoint.LOCAL_CLOSED}
def _handle_proton_event(self, pn_event):
ep_event = Connection._endpoint_event_map.get(pn_event.type)
if ep_event is not None:
self._process_endpoint_event(ep_event)
elif pn_event.type == proton.Event.CONNECTION_INIT:
LOG.debug("Connection created: %s", pn_event.context)
elif pn_event.type == proton.Event.CONNECTION_FINAL:
LOG.error("Connection finalized: %s", pn_event.context)
elif pn_event.type == proton.Event.TRANSPORT_ERROR:
self._connection_failed(str(self._pn_transport.condition))
else:
return False # unknown
return True # handled
elif hasattr(proton.Event, "CONNECTION_LOCAL_STATE"):
# 0.7 proton event model
def _handle_proton_event(self, pn_event):
if pn_event.type == proton.Event.CONNECTION_LOCAL_STATE:
self._process_local_state()
elif pn_event.type == proton.Event.CONNECTION_REMOTE_STATE:
self._process_remote_state()
else:
return False # unknown
return True # handled
else:
raise Exception("The installed version of Proton is not supported.")
# endpoint state machine actions:
def _ep_active(self):
@ -689,7 +683,7 @@ class Connection(Endpoint):
cond = self._pn_connection.remote_condition
self._handler.connection_remote_closed(self, cond)
def _ep_error(self):
def _ep_error(self, error):
"""The endpoint state machine failed due to protocol error."""
super(Connection, self)._ep_error()
super(Connection, self)._ep_error(error)
self._connection_failed("Protocol error occurred.")

@ -16,14 +16,18 @@
# under the License.
import logging
import proton
LOG = logging.getLogger(__name__)
_PROTON_VERSION = (int(getattr(proton, "VERSION_MAJOR", 0)),
int(getattr(proton, "VERSION_MINOR", 0)))
class Endpoint(object):
"""AMQP Endpoint state machine."""
# Endpoint States: (note: keep in sync with _fsm below!)
# Endpoint States:
STATE_UNINIT = 0 # initial state
STATE_PENDING = 1 # local opened, waiting for remote to open
STATE_REQUESTED = 2 # remote opened, waiting for local to open
@ -35,83 +39,131 @@ class Endpoint(object):
STATE_CLOSED = 8 # terminal state
STATE_ERROR = 9 # unexpected state transition
state_names = ["STATE_UNINIT", "STATE_PENDING", "STATE_REQUESTED",
STATE_NAMES = ["STATE_UNINIT", "STATE_PENDING", "STATE_REQUESTED",
"STATE_CANCELLED", "STATE_ABANDONED", "STATE_ACTIVE",
"STATE_NEED_CLOSE", "STATE_CLOSING", "STATE_CLOSED",
"STATE_ERROR"]
# Events - (which endpoint state has changed)
# Endpoint state transitions are fixed to the following sequence:
# UNINIT --> ACTIVE --> CLOSED
LOCAL_UPDATE = 0
REMOTE_UPDATE = 1
# Events:
# These correspond to endpoint events generated by the Proton Engine
event_names = ["LOCAL_UPDATE", "REMOTE_UPDATE"]
LOCAL_OPENED = 0
LOCAL_CLOSED = 1
REMOTE_OPENED = 2
REMOTE_CLOSED = 3
EVENT_NAMES = ["LOCAL_OPENED", "LOCAL_CLOSED",
"REMOTE_OPENED", "REMOTE_CLOSED"]
# Endpoint Finite State Machine:
# Indexed by current state, each entry is indexed by the event received and
# returns a tuple of (next-state, action). If there is no entry for a
# given event, _ep_error() is invoked and the endpoint moves to the
# terminal STATE_ERROR state.
_FSM = {}
_FSM[STATE_UNINIT] = {
LOCAL_OPENED: (STATE_PENDING, None),
REMOTE_OPENED: (STATE_REQUESTED, lambda s: s._ep_requested())
}
_FSM[STATE_PENDING] = {
LOCAL_CLOSED: (STATE_CANCELLED, None),
REMOTE_OPENED: (STATE_ACTIVE, lambda s: s._ep_active())
}
_FSM[STATE_REQUESTED] = {
LOCAL_OPENED: (STATE_ACTIVE, lambda s: s._ep_active()),
REMOTE_CLOSED: (STATE_ABANDONED, None)
}
_FSM[STATE_CANCELLED] = {
REMOTE_OPENED: (STATE_CLOSING, None)
}
_FSM[STATE_ABANDONED] = {
LOCAL_OPENED: (STATE_NEED_CLOSE, lambda s: s._ep_need_close()),
LOCAL_CLOSED: (STATE_CLOSED, lambda s: s._ep_closed())
}
_FSM[STATE_ACTIVE] = {
LOCAL_CLOSED: (STATE_CLOSING, None),
REMOTE_CLOSED: (STATE_NEED_CLOSE, lambda s: s._ep_need_close())
}
_FSM[STATE_NEED_CLOSE] = {
LOCAL_CLOSED: (STATE_CLOSED, lambda s: s._ep_closed())
}
_FSM[STATE_CLOSING] = {
REMOTE_CLOSED: (STATE_CLOSED, lambda s: s._ep_closed())
}
_FSM[STATE_CLOSED] = {
LOCAL_CLOSED: (STATE_CLOSED, None), # ignore redundant closes
REMOTE_CLOSED: (STATE_CLOSED, None)
}
_FSM[STATE_ERROR] = { # terminal state
LOCAL_OPENED: (STATE_ERROR, None),
LOCAL_CLOSED: (STATE_ERROR, None),
REMOTE_OPENED: (STATE_ERROR, None),
REMOTE_CLOSED: (STATE_ERROR, None)
}
def __init__(self, name):
self._name = name
self._state = Endpoint.STATE_UNINIT
# Finite State Machine:
# Indexed by current state, each entry is a list indexed by the event
# received. Contains a tuple of (next-state, action)
self._fsm = [
# STATE_UNINIT:
[(Endpoint.STATE_PENDING, None), # L(open)
(Endpoint.STATE_REQUESTED, self._ep_requested)], # R(open)
# STATE_PENDING:
[(Endpoint.STATE_CANCELLED, None), # L(close)
(Endpoint.STATE_ACTIVE, self._ep_active)], # R(open)
# STATE_REQUESTED:
[(Endpoint.STATE_ACTIVE, self._ep_active), # L(open)
(Endpoint.STATE_ABANDONED, None)], # R(close)
# STATE_CANCELLED:
[None,
(Endpoint.STATE_CLOSING, None)], # R(open)
# STATE_ABANDONED:
[(Endpoint.STATE_NEED_CLOSE, self._ep_need_close), # L(open)
(Endpoint.STATE_ERROR, self._ep_error)],
# STATE_ACTIVE:
[(Endpoint.STATE_CLOSING, None), # L(close)
(Endpoint.STATE_NEED_CLOSE, self._ep_need_close)], # R(close)
# STATE_NEED_CLOSE:
[(Endpoint.STATE_CLOSED, self._ep_closed), # L(close)
(Endpoint.STATE_ERROR, self._ep_error)],
# STATE_CLOSING:
[None,
(Endpoint.STATE_CLOSED, self._ep_closed)], # R(close)
# STATE_CLOSED:
[None, None],
# STATE_ERROR:
[None, None],
]
if (_PROTON_VERSION < (0, 8)):
# The old proton event model did not generate specific endpoint
# events. Rather it simply indicated local or remote state change
# occured without giving the value of the state (opened/closed).
# Map these events to open and close events, assuming the Proton
# endpoint state transitions are fixed to the following sequence:
# UNINIT --> ACTIVE --> CLOSED
self._remote_events = [Endpoint.REMOTE_OPENED,
Endpoint.REMOTE_CLOSED]
self._local_events = [Endpoint.LOCAL_OPENED,
Endpoint.LOCAL_CLOSED]
def process_remote_state(self):
"""Call when remote endpoint state changes."""
self._dispatch_event(Endpoint.REMOTE_UPDATE)
def _process_endpoint_event(self, event):
"""Called when the Proton Engine generates an endpoint state change
event.
"""
LOG.debug("Endpoint %s event: %s",
self._name, Endpoint.EVENT_NAMES[event])
state_fsm = Endpoint._FSM[self._state]
entry = state_fsm.get(event)
if not entry:
# protocol error: invalid event for current state
old_state = self._state
self._state = Endpoint.STATE_ERROR
self._ep_error("invalid event=%s in state=%s" %
(Endpoint.EVENT_NAMES[event],
Endpoint.STATE_NAMES[old_state]))
return
def process_local_state(self):
"""Call when local endpoint state changes."""
self._dispatch_event(Endpoint.LOCAL_UPDATE)
LOG.debug("Endpoint %s Old State: %s New State: %s",
self._name,
Endpoint.STATE_NAMES[self._state],
Endpoint.STATE_NAMES[entry[0]])
self._state = entry[0]
if entry[1]:
entry[1](self)
if (_PROTON_VERSION < (0, 8)):
def _process_remote_state(self):
"""Call when remote endpoint state changes."""
try:
event = self._remote_events.pop(0)
self._process_endpoint_event(event)
except IndexError:
LOG.debug("Endpoint %s: ignoring unexpected remote event",
self._name)
def _process_local_state(self):
"""Call when local endpoint state changes."""
try:
event = self._local_events.pop(0)
self._process_endpoint_event(event)
except IndexError:
LOG.debug("Endpoint %s: ignoring unexpected local event",
self._name)
@property
def _endpoint_state(self):
"""Returns the current endpoint state."""
raise NotImplementedError("Must Override")
def _dispatch_event(self, event):
LOG.debug("Endpoint %s event: %s",
self._name, Endpoint.event_names[event])
fsm = self._fsm[self._state]
entry = fsm[event]
if entry:
LOG.debug("Endpoint %s Old State: %s New State: %s",
self._name,
Endpoint.state_names[self._state],
Endpoint.state_names[entry[0]])
self._state = entry[0]
if entry[1]:
entry[1]()
# state entry actions - overridden by endpoint subclass:
def _ep_requested(self):
@ -130,7 +182,7 @@ class Endpoint(object):
"""Both ends of the endpoint have closed."""
LOG.debug("endpoint_closed - ignored")
def _ep_error(self):
def _ep_error(self, error):
"""Unanticipated/illegal state change."""
LOG.error("Endpoint state error: %s, %s",
self._name, Endpoint.state_names[self._state])
LOG.error("Endpoint state error: endpoint=%s, error=%s",
self._name, error)

@ -30,6 +30,9 @@ from pyngus.endpoint import Endpoint
LOG = logging.getLogger(__name__)
_PROTON_VERSION = (int(getattr(proton, "VERSION_MAJOR", 0)),
int(getattr(proton, "VERSION_MINOR", 0)))
# map property names to proton values:
_dist_modes = {"copy": proton.Terminus.DIST_MODE_COPY,
"move": proton.Terminus.DIST_MODE_MOVE}
@ -194,15 +197,95 @@ class _Link(Endpoint):
self._failed = True
self._link_failed("Parent session closed.")
# Proton's event model was changed after 0.7
if (_PROTON_VERSION >= (0, 8)):
_endpoint_event_map = {
proton.Event.LINK_REMOTE_OPEN: Endpoint.REMOTE_OPENED,
proton.Event.LINK_REMOTE_CLOSE: Endpoint.REMOTE_CLOSED,
proton.Event.LINK_OPEN: Endpoint.LOCAL_OPENED,
proton.Event.LINK_CLOSE: Endpoint.LOCAL_CLOSED}
@staticmethod
def _handle_proton_event(pn_event, connection):
ep_event = _Link._endpoint_event_map.get(pn_event.type)
if pn_event.type == proton.Event.DELIVERY:
pn_delivery = pn_event.context
pn_link = pn_delivery.link
pn_link.context._process_delivery(pn_delivery)
elif pn_event.type == proton.Event.LINK_FLOW:
pn_link = pn_event.context
pn_link.context._process_credit()
elif ep_event is not None:
pn_link = pn_event.context
if pn_link.context:
pn_link.context._process_endpoint_event(ep_event)
elif pn_event.type == proton.Event.LINK_INIT:
pn_link = pn_event.context
# create a new link if requested by remote:
c = hasattr(pn_link, 'context') and pn_link.context
if not c:
session = pn_link.session.context
if (pn_link.is_sender and
pn_link.name not in connection._sender_links):
LOG.debug("Remotely initiated Sender needs init")
link = session.request_sender(pn_link)
connection._sender_links[pn_link.name] = link
elif (pn_link.is_receiver and
pn_link.name not in connection._receiver_links):
LOG.debug("Remotely initiated Receiver needs init")
link = session.request_receiver(pn_link)
connection._receiver_links[pn_link.name] = link
elif pn_event.type == proton.Event.LINK_FINAL:
LOG.debug("link finalized: %s", pn_event.context)
else:
return False # unknown
return True # handled
elif hasattr(proton.Event, "LINK_REMOTE_STATE"):
# 0.7 proton event model
@staticmethod
def _handle_proton_event(pn_event, connection):
if pn_event.type == proton.Event.LINK_REMOTE_STATE:
pn_link = pn_event.link
# create a new link if requested by remote:
c = hasattr(pn_link, 'context') and pn_link.context
if ((not c) and
(pn_link.state & proton.Endpoint.LOCAL_UNINIT)):
session = pn_link.session.context
if (pn_link.is_sender and
pn_link.name not in connection._sender_links):
LOG.debug("Remotely initiated Sender needs init")
link = session.request_sender(pn_link)
connection._sender_links[pn_link.name] = link
elif (pn_link.is_receiver and
pn_link.name not in connection._receiver_links):
LOG.debug("Remotely initiated Receiver needs init")
link = session.request_receiver(pn_link)
connection._receiver_links[pn_link.name] = link
pn_link.context._process_remote_state()
return True
elif pn_event.type == proton.Event.LINK_LOCAL_STATE:
pn_link = pn_event.link
pn_link.context._process_local_state()
elif pn_event.type == proton.Event.LINK_FLOW:
pn_link = pn_event.link
pn_link.context._process_credit()
elif pn_event.type == proton.Event.DELIVERY:
pn_link = pn_event.link
pn_delivery = pn_event.delivery
pn_link.context._process_delivery(pn_delivery)
else:
return False # unknown
return True
# endpoint methods:
@property
def _endpoint_state(self):
return self._pn_link.state
def _ep_error(self):
super(_Link, self)._ep_error()
def _ep_error(self, error):
super(_Link, self)._ep_error(error)
self._failed = True
self._link_failed("Endpoint protocol error.")
self._link_failed("Endpoint protocol error: %s" % error)
def _get_remote_settle_modes(pn_link):
@ -341,7 +424,8 @@ class SenderLink(_Link):
proton.Disposition.MODIFIED: SenderLink.MODIFIED,
}
LOG.debug("Processing delivery, tag=%s", str(pn_delivery.tag))
LOG.debug("Processing send delivery, tag=%s",
str(pn_delivery.tag))
if pn_delivery.tag in self._send_requests:
if pn_delivery.settled: # remote has finished
LOG.debug("Remote has settled a sent msg")
@ -375,6 +459,7 @@ class SenderLink(_Link):
def _process_credit(self):
# check if any pending deliveries are now writable:
LOG.debug("credit event, link=%s", self.name)
pn_delivery = self._pn_link.current
while (self._pending_sends and
pn_delivery and pn_delivery.writable):
@ -548,6 +633,8 @@ class ReceiverLink(_Link):
def _process_delivery(self, pn_delivery):
"""Check if the delivery can be processed."""
LOG.debug("Processing receive delivery, tag=%s",
str(pn_delivery.tag))
if pn_delivery.readable and not pn_delivery.partial:
LOG.debug("Receive delivery readable")
data = self._pn_link.recv(pn_delivery.pending)
@ -668,6 +755,55 @@ class _SessionProxy(Endpoint):
self._pn_session = None
self._connection = None
# Proton's event model was changed after 0.7
if (_PROTON_VERSION >= (0, 8)):
_endpoint_event_map = {
proton.Event.SESSION_REMOTE_OPEN: Endpoint.REMOTE_OPENED,
proton.Event.SESSION_REMOTE_CLOSE: Endpoint.REMOTE_CLOSED,
proton.Event.SESSION_OPEN: Endpoint.LOCAL_OPENED,
proton.Event.SESSION_CLOSE: Endpoint.LOCAL_CLOSED}
@staticmethod
def _handle_proton_event(pn_event, connection):
ep_event = _SessionProxy._endpoint_event_map.get(pn_event.type)
if ep_event is not None:
pn_session = pn_event.context
pn_session.context._process_endpoint_event(ep_event)
elif pn_event.type == proton.Event.SESSION_INIT:
# create a new session if requested by remote:
pn_session = pn_event.context
c = hasattr(pn_session, 'context') and pn_session.context
if not c:
LOG.debug("Opening remotely initiated session")
name = "session-%d" % connection._remote_session_id
connection._remote_session_id += 1
_SessionProxy(name, connection, pn_session)
elif pn_event.type == proton.Event.SESSION_FINAL:
LOG.debug("Session finalized: %s", pn_event.context)
else:
return False # unknown
return True # handled
elif hasattr(proton.Event, "SESSION_REMOTE_STATE"):
# 0.7 proton event model
@staticmethod
def _handle_proton_event(pn_event, connection):
if pn_event.type == proton.Event.SESSION_REMOTE_STATE:
pn_session = pn_event.session
# create a new session if requested by remote:
c = hasattr(pn_session, 'context') and pn_session.context
if not c:
LOG.debug("Opening remotely initiated session")
name = "session-%d" % connection._remote_session_id
connection._remote_session_id += 1
_SessionProxy(name, connection, pn_session)
pn_session.context._process_remote_state()
elif pn_event.type == proton.Event.SESSION_LOCAL_STATE:
pn_session = pn_event.session
pn_session.context._process_local_state()
else:
return False # unknown
return True # handled
@property
def _endpoint_state(self):
return self._pn_session.state

@ -20,11 +20,15 @@
import gc
import time
import proton
import pyngus
class Test(object):
PROTON_VERSION = (int(getattr(proton, "VERSION_MAJOR", 0)),
int(getattr(proton, "VERSION_MINOR", 0)))
def __init__(self, name):
self.name = name

@ -401,6 +401,8 @@ class APITest(common.Test):
def test_io_output_close(self):
"""Premature output close should trigger failed callback."""
if self.PROTON_VERSION >= (0, 8):
raise common.Skipped("Skipping test - error deprecated?")
cb1 = common.ConnCallback()
c1 = self.container1.create_connection("c1", cb1)
c2 = self.container2.create_connection("c2")

@ -628,7 +628,8 @@ class APITest(common.Test):
def test_use_after_free(self):
"""Causes proton library to segfault!!!"""
raise common.Skipped("Skipping test - causes segfault in proton!")
if self.PROTON_VERSION < (0, 8):
raise common.Skipped("Skipping test - causes segfault in proton!")
sender = self.conn1.create_sender("src1", "tgt1")
sender.open()
self.process_connections()