2016-10-03 15:35:08 -04:00

889 lines
34 KiB
Python

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
__all__ = [
"SenderEventHandler",
"SenderLink",
"ReceiverEventHandler",
"ReceiverLink"
]
import collections
import logging
import proton
from pyngus.endpoint import Endpoint
LOG = logging.getLogger(__name__)
_PROTON_VERSION = (int(getattr(proton, "VERSION_MAJOR", 0)),
int(getattr(proton, "VERSION_MINOR", 0)))
# map property names to proton values:
_dist_modes = {"copy": proton.Terminus.DIST_MODE_COPY,
"move": proton.Terminus.DIST_MODE_MOVE}
_snd_settle_modes = {"settled": proton.Link.SND_SETTLED,
"unsettled": proton.Link.SND_UNSETTLED,
"mixed": proton.Link.SND_MIXED}
_rcv_settle_modes = {"first": proton.Link.RCV_FIRST,
"second": proton.Link.RCV_SECOND}
# TODO(kgiusti): this is duplicated in connection.py, put in common file
class _CallbackLock(object):
"""A utility class for detecting when a callback invokes a non-reentrant
Pyngus method.
"""
def __init__(self, link):
super(_CallbackLock, self).__init__()
self._link = link
self.in_callback = 0
def __enter__(self):
# manually lock parent - can't enter its non-reentrant methods
self._link._connection._callback_lock.__enter__()
self.in_callback += 1
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.in_callback -= 1
self._link._connection._callback_lock.__exit__(None, None, None)
# if a call is made to a non-reentrant method while this context is
# held, then the method will raise a RuntimeError(). Return false to
# propagate the exception to the caller
return False
def _not_reentrant(func):
"""Decorator that prevents callbacks from calling into link methods that
are not reentrant """
def wrap(*args, **kws):
link = args[0]
if link._callback_lock.in_callback:
m = "Link %s cannot be invoked from a callback!" % func
raise RuntimeError(m)
return func(*args, **kws)
return wrap
class _Link(Endpoint):
"""A generic Link base class."""
def __init__(self, connection, pn_link):
super(_Link, self).__init__(pn_link.name)
self._connection = connection
self._handler = None
self._properties = None
self._user_context = None
self._rejected = False # requested link was refused
self._failed = False # protocol error occurred
self._callback_lock = _CallbackLock(self)
# TODO(kgiusti): raise jira to add 'context' attr to api
self._pn_link = pn_link
pn_link.context = self
def configure(self, target_address, source_address, handler, properties):
"""Assign addresses, properties, etc."""
self._handler = handler
self._properties = properties
dynamic_props = None
if properties:
dynamic_props = properties.get("dynamic-node-properties")
mode = _dist_modes.get(properties.get("distribution-mode"))
if mode is not None:
self._pn_link.source.distribution_mode = mode
mode = _snd_settle_modes.get(properties.get("snd-settle-mode"))
if mode is not None:
self._pn_link.snd_settle_mode = mode
mode = _rcv_settle_modes.get(properties.get("rcv-settle-mode"))
if mode is not None:
self._pn_link.rcv_settle_mode = mode
if target_address is None:
if not self._pn_link.is_sender:
raise Exception("Dynamic target not allowed")
self._pn_link.target.dynamic = True
if dynamic_props:
self._pn_link.target.properties.clear()
self._pn_link.target.properties.put_dict(dynamic_props)
elif target_address:
self._pn_link.target.address = target_address
if source_address is None:
if not self._pn_link.is_receiver:
raise Exception("Dynamic source not allowed")
self._pn_link.source.dynamic = True
if dynamic_props:
self._pn_link.source.properties.clear()
self._pn_link.source.properties.put_dict(dynamic_props)
elif source_address:
self._pn_link.source.address = source_address
@property
def name(self):
return self._name
@property
def connection(self):
return self._connection
def open(self):
if self._pn_link.state & proton.Endpoint.LOCAL_UNINIT:
LOG.debug("Opening the link.")
self._pn_link.open()
def _get_user_context(self):
return self._user_context
def _set_user_context(self, ctxt):
self._user_context = ctxt
_uc_docstr = """Arbitrary application object associated with this link."""
user_context = property(_get_user_context, _set_user_context,
doc=_uc_docstr)
@property
def source_address(self):
"""Return the authorative source of the link."""
# If link is a sender, source is determined by the local
# value, else use the remote.
if self._pn_link.is_sender:
return self._pn_link.source.address
else:
return self._pn_link.remote_source.address
@property
def target_address(self):
"""Return the authorative target of the link."""
# If link is a receiver, target is determined by the local
# value, else use the remote.
if self._pn_link.is_receiver:
return self._pn_link.target.address
else:
return self._pn_link.remote_target.address
def close(self, pn_condition=None):
if self._pn_link.state & proton.Endpoint.LOCAL_ACTIVE:
LOG.debug("Closing the link.")
if pn_condition:
self._pn_link.condition = pn_condition
self._pn_link.close()
@property
def active(self):
state = self._pn_link.state
return (not self._failed and
state == (proton.Endpoint.LOCAL_ACTIVE |
proton.Endpoint.REMOTE_ACTIVE))
@property
def closed(self):
state = self._pn_link.state
return (self._failed or
state == (proton.Endpoint.LOCAL_CLOSED |
proton.Endpoint.REMOTE_CLOSED))
def reject(self, pn_condition):
self._rejected = True # prevent 'active' callback!
self._pn_link.open()
if pn_condition:
self._pn_link.condition = pn_condition
self._pn_link.close()
def destroy(self):
LOG.debug("link destroyed %s", str(self._pn_link))
self._user_context = None
self._connection = None
self._handler = None
self._callback_lock = None
if self._pn_link:
session = self._pn_link.session.context
self._pn_link.context = None
self._pn_link.free()
self._pn_link = None
session.link_destroyed(self) # destroy session _after_ link
def _process_delivery(self, pn_delivery):
raise NotImplementedError("Must Override")
def _process_credit(self):
raise NotImplementedError("Must Override")
def _link_failed(self, error):
raise NotImplementedError("Must Override")
def _session_closed(self):
"""Remote has closed the session used by this link."""
# if link not already closed:
if self._endpoint_state & proton.Endpoint.REMOTE_ACTIVE:
# simulate close received
self._process_remote_state()
elif self._endpoint_state & proton.Endpoint.REMOTE_UNINIT:
# locally created link, will never come up
self._failed = True
self._link_failed("Parent session closed.")
# Proton's event model was changed after 0.7
if (_PROTON_VERSION >= (0, 8)):
_endpoint_event_map = {
proton.Event.LINK_REMOTE_OPEN: Endpoint.REMOTE_OPENED,
proton.Event.LINK_REMOTE_CLOSE: Endpoint.REMOTE_CLOSED,
proton.Event.LINK_LOCAL_OPEN: Endpoint.LOCAL_OPENED,
proton.Event.LINK_LOCAL_CLOSE: Endpoint.LOCAL_CLOSED}
@staticmethod
def _handle_proton_event(pn_event, connection):
etype = pn_event.type
if etype == proton.Event.DELIVERY:
pn_link = pn_event.link
pn_link.context and \
pn_link.context._process_delivery(pn_event.delivery)
return True
if etype == proton.Event.LINK_FLOW:
pn_link = pn_event.link
pn_link.context and pn_link.context._process_credit()
return True
ep_event = _Link._endpoint_event_map.get(etype)
if ep_event is not None:
pn_link = pn_event.link
pn_link.context and \
pn_link.context._process_endpoint_event(ep_event)
return True
if etype == proton.Event.LINK_INIT:
pn_link = pn_event.link
# create a new link if requested by remote:
c = hasattr(pn_link, 'context') and pn_link.context
if not c:
session = pn_link.session.context
if (pn_link.is_sender and
pn_link.name not in connection._sender_links):
LOG.debug("Remotely initiated Sender needs init")
link = session.request_sender(pn_link)
connection._sender_links[pn_link.name] = link
elif (pn_link.is_receiver and
pn_link.name not in connection._receiver_links):
LOG.debug("Remotely initiated Receiver needs init")
link = session.request_receiver(pn_link)
connection._receiver_links[pn_link.name] = link
return True
if etype == proton.Event.LINK_FINAL:
LOG.debug("link finalized: %s", pn_event.context)
return True
return False # event not handled
elif hasattr(proton.Event, "LINK_REMOTE_STATE"):
# 0.7 proton event model
@staticmethod
def _handle_proton_event(pn_event, connection):
if pn_event.type == proton.Event.LINK_REMOTE_STATE:
pn_link = pn_event.link
# create a new link if requested by remote:
c = hasattr(pn_link, 'context') and pn_link.context
if ((not c) and
(pn_link.state & proton.Endpoint.LOCAL_UNINIT)):
session = pn_link.session.context
if (pn_link.is_sender and
pn_link.name not in connection._sender_links):
LOG.debug("Remotely initiated Sender needs init")
link = session.request_sender(pn_link)
connection._sender_links[pn_link.name] = link
elif (pn_link.is_receiver and
pn_link.name not in connection._receiver_links):
LOG.debug("Remotely initiated Receiver needs init")
link = session.request_receiver(pn_link)
connection._receiver_links[pn_link.name] = link
pn_link.context._process_remote_state()
return True
elif pn_event.type == proton.Event.LINK_LOCAL_STATE:
pn_link = pn_event.link
pn_link.context._process_local_state()
elif pn_event.type == proton.Event.LINK_FLOW:
pn_link = pn_event.link
pn_link.context._process_credit()
elif pn_event.type == proton.Event.DELIVERY:
pn_link = pn_event.link
pn_delivery = pn_event.delivery
pn_link.context._process_delivery(pn_delivery)
else:
return False # unknown
return True
# endpoint methods:
@property
def _endpoint_state(self):
return self._pn_link.state
def _ep_error(self, error):
super(_Link, self)._ep_error(error)
self._failed = True
self._link_failed("Endpoint protocol error: %s" % error)
def _get_remote_settle_modes(pn_link):
"""Return a map containing the settle modes as provided by the remote.
Skip any default value.
"""
modes = {}
snd = pn_link.remote_snd_settle_mode
if snd == proton.Link.SND_UNSETTLED:
modes['snd-settle-mode'] = 'unsettled'
elif snd == proton.Link.SND_SETTLED:
modes['snd-settle-mode'] = 'settled'
if pn_link.remote_rcv_settle_mode == proton.Link.RCV_SECOND:
modes['rcv-settle-mode'] = 'second'
return modes
class SenderEventHandler(object):
def sender_active(self, sender_link):
LOG.debug("sender_active (ignored)")
def sender_remote_closed(self, sender_link, pn_condition):
LOG.debug("sender_remote_closed condition=%s (ignored)",
pn_condition)
def sender_closed(self, sender_link):
LOG.debug("sender_closed (ignored)")
def credit_granted(self, sender_link):
LOG.debug("credit_granted (ignored)")
def sender_failed(self, sender_link, error):
"""Protocol error occurred."""
LOG.debug("sender_failed error=%s (ignored)", error)
class SenderLink(_Link):
# Status for message send callback
#
ABORTED = -2
TIMED_OUT = -1
UNKNOWN = 0
ACCEPTED = 1
REJECTED = 2
RELEASED = 3
MODIFIED = 4
_DISPOSITION_STATE_MAP = {
proton.Disposition.ACCEPTED: ACCEPTED,
proton.Disposition.REJECTED: REJECTED,
proton.Disposition.RELEASED: RELEASED,
proton.Disposition.MODIFIED: MODIFIED,
}
class _SendRequest(object):
"""Tracks sending a single message."""
def __init__(self, link, tag, message, callback, handle, deadline):
self.link = link
self.tag = tag
self.message = message
self.callback = callback
self.handle = handle
self.deadline = deadline
self.link._send_requests[self.tag] = self
if self.deadline:
self.link._connection._add_timer(self.deadline, self)
def __call__(self):
"""Invoked by Connection on timeout (now <= deadline)."""
self.link._send_expired(self)
def destroy(self, state, info):
"""Invoked on final completion of send."""
if self.deadline:
self.link._connection._cancel_timer(self.deadline, self)
if self.tag in self.link._send_requests:
del self.link._send_requests[self.tag]
if self.callback:
with self.link._callback_lock:
self.callback(self.link, self.handle, state, info)
def __init__(self, connection, pn_link):
super(SenderLink, self).__init__(connection, pn_link)
self._send_requests = {} # indexed by tag
self._pending_sends = collections.deque() # tags in order sent
self._next_deadline = 0
self._next_tag = 0
self._last_credit = 0
# TODO(kgiusti) - think about send-settle-mode configuration
def send(self, message, delivery_callback=None,
handle=None, deadline=None):
tag = "pyngus-tag-%s" % self._next_tag
self._next_tag += 1
send_req = SenderLink._SendRequest(self, tag, message,
delivery_callback, handle,
deadline)
self._pn_link.delivery(tag)
pn_delivery = self._pn_link.current
if pn_delivery and pn_delivery.writable:
# send oldest pending:
if self._pending_sends:
self._pending_sends.append(tag)
tag = self._pending_sends.popleft()
send_req = self._send_requests[tag]
self._write_msg(pn_delivery, send_req)
else:
LOG.debug("Send is pending for credit, tag=%s", tag)
self._pending_sends.append(tag)
return 0
@property
def pending(self):
return len(self._send_requests)
@property
def credit(self):
return self._pn_link.credit
def reject(self, pn_condition=None):
"""See Link Reject, AMQP1.0 spec."""
self._pn_link.source.type = proton.Terminus.UNSPECIFIED
super(SenderLink, self).reject(pn_condition)
@_not_reentrant
def destroy(self):
self._connection._remove_sender(self._name)
self._connection = None
super(SenderLink, self).destroy()
def _process_delivery(self, pn_delivery):
"""Check if the delivery can be processed."""
if pn_delivery.tag in self._send_requests:
if pn_delivery.settled or pn_delivery.remote_state:
# remote has reached a 'terminal state'
outcome = pn_delivery.remote_state
state = SenderLink._DISPOSITION_STATE_MAP.get(outcome,
self.UNKNOWN)
pn_disposition = pn_delivery.remote
info = {}
if state == SenderLink.REJECTED:
if pn_disposition.condition:
info["condition"] = pn_disposition.condition
elif state == SenderLink.MODIFIED:
info["delivery-failed"] = pn_disposition.failed
info["undeliverable-here"] = pn_disposition.undeliverable
annotations = pn_disposition.annotations
if annotations:
info["message-annotations"] = annotations
send_req = self._send_requests.pop(pn_delivery.tag)
send_req.destroy(state, info)
pn_delivery.settle()
elif pn_delivery.writable:
# we can now send on this delivery
if self._pending_sends:
tag = self._pending_sends.popleft()
send_req = self._send_requests[tag]
self._write_msg(pn_delivery, send_req)
else:
# tag no longer valid, expired or canceled send?
LOG.debug("Delivery ignored, tag=%s", str(pn_delivery.tag))
pn_delivery.settle()
def _process_credit(self):
# check if any pending deliveries are now writable:
pn_delivery = self._pn_link.current
while (self._pending_sends and
pn_delivery and pn_delivery.writable):
self._process_delivery(pn_delivery)
pn_delivery = self._pn_link.current
# Alert if credit has become available
if self._handler and not self._rejected:
if 0 < self._pn_link.credit > self._last_credit:
with self._callback_lock:
self._handler.credit_granted(self)
self._last_credit = self._pn_link.credit
def _write_msg(self, pn_delivery, send_req):
# given a writable delivery, send a message
self._pn_link.send(send_req.message.encode())
self._pn_link.advance()
self._last_credit = self._pn_link.credit
if not send_req.callback:
# no disposition callback, so we can discard the send request and
# settle the delivery immediately
send_req.destroy(SenderLink.UNKNOWN, {})
pn_delivery.settle()
def _send_expired(self, send_req):
LOG.debug("Send request timed-out, tag=%s", send_req.tag)
try:
self._pending_sends.remove(send_req.tag)
except ValueError:
pass
send_req.destroy(SenderLink.TIMED_OUT, None)
def _link_failed(self, error):
if self._handler and not self._rejected:
with self._callback_lock:
self._handler.sender_failed(self, error)
# endpoint state machine actions:
def _ep_active(self):
LOG.debug("SenderLink is up")
if self._handler and not self._rejected:
with self._callback_lock:
self._handler.sender_active(self)
def _ep_need_close(self):
LOG.debug("SenderLink remote closed")
if self._handler and not self._rejected:
cond = self._pn_link.remote_condition
with self._callback_lock:
self._handler.sender_remote_closed(self, cond)
def _ep_closed(self):
LOG.debug("SenderLink close completed")
# abort any pending sends
self._pending_sends.clear()
pn_condition = self._pn_link.condition
info = {"condition": pn_condition} if pn_condition else None
while self._send_requests:
key, send_req = self._send_requests.popitem()
send_req.destroy(SenderLink.ABORTED, info)
if self._handler and not self._rejected:
with self._callback_lock:
self._handler.sender_closed(self)
def _ep_requested(self):
LOG.debug("Remote has requested a SenderLink")
handler = self._connection._handler
if handler:
pn_link = self._pn_link
props = _get_remote_settle_modes(pn_link)
# has the remote requested a source address?
req_source = ""
if pn_link.remote_source.dynamic:
req_source = None
req_props = pn_link.remote_source.properties
if req_props and req_props.next() == proton.Data.MAP:
props["dynamic-node-properties"] = req_props.get_dict()
elif pn_link.remote_source.address:
req_source = pn_link.remote_source.address
props["target-address"] = pn_link.remote_target.address
dist_mode = pn_link.remote_source.distribution_mode
if (dist_mode == proton.Terminus.DIST_MODE_COPY):
props["distribution-mode"] = "copy"
elif (dist_mode == proton.Terminus.DIST_MODE_MOVE):
props["distribution-mode"] = "move"
with self._connection._callback_lock:
handler.sender_requested(self._connection,
pn_link.name, # handle
pn_link.name,
req_source,
props)
class ReceiverEventHandler(object):
def receiver_active(self, receiver_link):
LOG.debug("receiver_active (ignored)")
def receiver_remote_closed(self, receiver_link, pn_condition):
LOG.debug("receiver_remote_closed condition=%s (ignored)",
pn_condition)
def receiver_closed(self, receiver_link):
LOG.debug("receiver_closed (ignored)")
def receiver_failed(self, receiver_link, error):
"""Protocol error occurred."""
LOG.debug("receiver_failed error=%s (ignored)", error)
def message_received(self, receiver_link, message, handle):
LOG.debug("message_received (ignored)")
class ReceiverLink(_Link):
def __init__(self, connection, pn_link):
super(ReceiverLink, self).__init__(connection, pn_link)
self._next_handle = 0
self._unsettled_deliveries = {} # indexed by handle
# TODO(kgiusti) - think about receiver-settle-mode configuration
@property
def capacity(self):
return self._pn_link.credit
def add_capacity(self, amount):
self._pn_link.flow(amount)
def _settle_delivery(self, handle, state):
pn_delivery = self._unsettled_deliveries.pop(handle, None)
if pn_delivery is None:
raise Exception("Invalid message handle: %s" % str(handle))
pn_delivery.update(state)
pn_delivery.settle()
def message_accepted(self, handle):
self._settle_delivery(handle, proton.Delivery.ACCEPTED)
def message_released(self, handle):
self._settle_delivery(handle, proton.Delivery.RELEASED)
def message_rejected(self, handle, pn_condition=None):
pn_delivery = self._unsettled_deliveries.pop(handle, None)
if pn_delivery is None:
raise Exception("Invalid message handle: %s" % str(handle))
if pn_condition:
pn_delivery.local.condition = pn_condition
pn_delivery.update(proton.Delivery.REJECTED)
pn_delivery.settle()
def message_modified(self, handle, delivery_failed, undeliverable,
annotations):
pn_delivery = self._unsettled_deliveries.pop(handle, None)
if pn_delivery is None:
raise Exception("Invalid message handle: %s" % str(handle))
pn_delivery.local.failed = delivery_failed
pn_delivery.local.undeliverable = undeliverable
if annotations:
pn_delivery.local.annotations = annotations
pn_delivery.update(proton.Delivery.MODIFIED)
pn_delivery.settle()
def reject(self, pn_condition=None):
"""See Link Reject, AMQP1.0 spec."""
self._pn_link.target.type = proton.Terminus.UNSPECIFIED
super(ReceiverLink, self).reject(pn_condition)
@_not_reentrant
def destroy(self):
self._connection._remove_receiver(self._name)
self._connection = None
super(ReceiverLink, self).destroy()
def _process_delivery(self, pn_delivery):
"""Check if the delivery can be processed."""
if pn_delivery.readable and not pn_delivery.partial:
data = self._pn_link.recv(pn_delivery.pending)
msg = proton.Message()
msg.decode(data)
self._pn_link.advance()
if self._handler:
handle = "rmsg-%s:%x" % (self._name, self._next_handle)
self._next_handle += 1
self._unsettled_deliveries[handle] = pn_delivery
with self._callback_lock:
self._handler.message_received(self, msg, handle)
else:
# TODO(kgiusti): is it ok to assume Delivery.REJECTED?
pn_delivery.settle()
def _process_credit(self):
# Only used by SenderLink
pass
def _link_failed(self, error):
if self._handler and not self._rejected:
with self._callback_lock:
self._handler.receiver_failed(self, error)
# endpoint state machine actions:
def _ep_active(self):
LOG.debug("ReceiverLink is up")
if self._handler and not self._rejected:
with self._callback_lock:
self._handler.receiver_active(self)
def _ep_need_close(self):
LOG.debug("ReceiverLink remote closed")
if self._handler and not self._rejected:
cond = self._pn_link.remote_condition
with self._callback_lock:
self._handler.receiver_remote_closed(self, cond)
def _ep_closed(self):
LOG.debug("ReceiverLink close completed")
if self._handler and not self._rejected:
with self._callback_lock:
self._handler.receiver_closed(self)
def _ep_requested(self):
LOG.debug("Remote has initiated a ReceiverLink")
handler = self._connection._handler
if handler:
pn_link = self._pn_link
props = _get_remote_settle_modes(pn_link)
# has the remote requested a target address?
req_target = ""
if pn_link.remote_target.dynamic:
req_target = None
req_props = pn_link.remote_target.properties
if req_props and req_props.next() == proton.Data.MAP:
props["dynamic-node-properties"] = req_props.get_dict()
elif pn_link.remote_target.address:
req_target = pn_link.remote_target.address
props["source-address"] = pn_link.remote_source.address
dist_mode = pn_link.remote_source.distribution_mode
if (dist_mode == proton.Terminus.DIST_MODE_COPY):
props["distribution-mode"] = "copy"
elif (dist_mode == proton.Terminus.DIST_MODE_MOVE):
props["distribution-mode"] = "move"
with self._connection._callback_lock:
handler.receiver_requested(self._connection,
pn_link.name, # handle
pn_link.name,
req_target,
props)
class _SessionProxy(Endpoint):
"""Corresponds to a Proton Session object."""
def __init__(self, name, connection, pn_session=None):
super(_SessionProxy, self).__init__(name)
self._locally_initiated = not pn_session
self._connection = connection
if not pn_session:
pn_session = connection._pn_connection.session()
self._pn_session = pn_session
self._links = set()
pn_session.context = self
def open(self):
if self._pn_session.state & proton.Endpoint.LOCAL_UNINIT:
self._pn_session.open()
def new_sender(self, name):
"""Create a new sender link."""
pn_link = self._pn_session.sender(name)
return self.request_sender(pn_link)
def request_sender(self, pn_link):
"""Create link from request for a sender."""
sl = SenderLink(self._connection, pn_link)
self._links.add(sl)
return sl
def new_receiver(self, name):
"""Create a new receiver link."""
pn_link = self._pn_session.receiver(name)
return self.request_receiver(pn_link)
def request_receiver(self, pn_link):
"""Create link from request for a receiver."""
rl = ReceiverLink(self._connection, pn_link)
self._links.add(rl)
return rl
def link_destroyed(self, link):
"""Link has been destroyed."""
self._links.discard(link)
if not self._links:
# no more links
LOG.debug("destroying unneeded session")
self._pn_session.close()
self._pn_session.free()
self._pn_session = None
self._connection = None
# Proton's event model was changed after 0.7
if (_PROTON_VERSION >= (0, 8)):
_endpoint_event_map = {
proton.Event.SESSION_REMOTE_OPEN: Endpoint.REMOTE_OPENED,
proton.Event.SESSION_REMOTE_CLOSE: Endpoint.REMOTE_CLOSED,
proton.Event.SESSION_LOCAL_OPEN: Endpoint.LOCAL_OPENED,
proton.Event.SESSION_LOCAL_CLOSE: Endpoint.LOCAL_CLOSED}
@staticmethod
def _handle_proton_event(pn_event, connection):
ep_event = _SessionProxy._endpoint_event_map.get(pn_event.type)
if ep_event is not None:
pn_session = pn_event.context
pn_session.context._process_endpoint_event(ep_event)
elif pn_event.type == proton.Event.SESSION_INIT:
# create a new session if requested by remote:
pn_session = pn_event.context
c = hasattr(pn_session, 'context') and pn_session.context
if not c:
LOG.debug("Opening remotely initiated session")
name = "session-%d" % connection._remote_session_id
connection._remote_session_id += 1
_SessionProxy(name, connection, pn_session)
elif pn_event.type == proton.Event.SESSION_FINAL:
LOG.debug("Session finalized: %s", pn_event.context)
else:
return False # unknown
return True # handled
elif hasattr(proton.Event, "SESSION_REMOTE_STATE"):
# 0.7 proton event model
@staticmethod
def _handle_proton_event(pn_event, connection):
if pn_event.type == proton.Event.SESSION_REMOTE_STATE:
pn_session = pn_event.session
# create a new session if requested by remote:
c = hasattr(pn_session, 'context') and pn_session.context
if not c:
LOG.debug("Opening remotely initiated session")
name = "session-%d" % connection._remote_session_id
connection._remote_session_id += 1
_SessionProxy(name, connection, pn_session)
pn_session.context._process_remote_state()
elif pn_event.type == proton.Event.SESSION_LOCAL_STATE:
pn_session = pn_event.session
pn_session.context._process_local_state()
else:
return False # unknown
return True # handled
@property
def _endpoint_state(self):
return self._pn_session.state
# endpoint state machine actions:
def _ep_requested(self):
"""Peer has requested a new session."""
LOG.debug("Session %s requested - opening...",
self._name)
self.open()
def _ep_active(self):
"""Both ends of the Endpoint have become active."""
LOG.debug("Session %s active", self._name)
def _ep_need_close(self):
"""Peer has closed its end of the session."""
LOG.debug("Session %s close requested - closing...",
self._name)
links = self._links.copy() # may modify _links
for link in links:
link._session_closed()
def _ep_closed(self):
"""Both ends of the endpoint have closed."""
LOG.debug("Session %s closed", self._name)