# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. __all__ = [ "SenderEventHandler", "SenderLink", "ReceiverEventHandler", "ReceiverLink" ] import collections import logging import proton LOG = logging.getLogger(__name__) class _Link(object): """A generic Link base class.""" def __init__(self, connection, pn_link): self._state = _Link._STATE_UNINIT # last known endpoint state: self._ep_state = (proton.Endpoint.LOCAL_UNINIT | proton.Endpoint.REMOTE_UNINIT) self._connection = connection self._name = pn_link.name self._handler = None self._properties = None self._user_context = None # TODO(kgiusti): raise jira to add 'context' attr to api self._pn_link = pn_link pn_link.context = self def configure(self, target_address, source_address, handler, properties): """Assign addresses, properties, etc.""" self._handler = handler self._properties = properties if target_address is None: if not self._pn_link.is_sender: raise Exception("Dynamic target not allowed") self._pn_link.target.dynamic = True elif target_address: self._pn_link.target.address = target_address if source_address is None: if not self._pn_link.is_receiver: raise Exception("Dynamic source not allowed") self._pn_link.source.dynamic = True elif source_address: self._pn_link.source.address = source_address if properties: desired_mode = properties.get("distribution-mode") if desired_mode: if desired_mode == "copy": mode = proton.Terminus.DIST_MODE_COPY elif desired_mode == "move": mode = proton.Terminus.DIST_MODE_MOVE else: raise Exception("Unknown distribution mode: %s" % str(desired_mode)) self._pn_link.source.distribution_mode = mode @property def name(self): return self._name @property def connection(self): return self._connection def open(self): LOG.debug("Opening the link.") self._pn_link.open() def _get_user_context(self): return self._user_context def _set_user_context(self, ctxt): self._user_context = ctxt _uc_docstr = """Arbitrary application object associated with this link.""" user_context = property(_get_user_context, _set_user_context, doc=_uc_docstr) @property def source_address(self): """Return the authorative source of the link.""" # If link is a sender, source is determined by the local # value, else use the remote. if self._pn_link.is_sender: return self._pn_link.source.address else: return self._pn_link.remote_source.address @property def target_address(self): """Return the authorative target of the link.""" # If link is a receiver, target is determined by the local # value, else use the remote. if self._pn_link.is_receiver: return self._pn_link.target.address else: return self._pn_link.remote_target.address def close(self, pn_condition=None): LOG.debug("Closing the link.") if pn_condition: self._pn_link.condition = pn_condition self._pn_link.close() @property def active(self): state = self._pn_link.state return state == (proton.Endpoint.LOCAL_ACTIVE | proton.Endpoint.REMOTE_ACTIVE) @property def closed(self): state = self._pn_link.state return state == (proton.Endpoint.LOCAL_CLOSED | proton.Endpoint.REMOTE_CLOSED) def reject(self, pn_condition): self._pn_link.open() if pn_condition: self._pn_link.condition = pn_condition self._pn_link.close() def destroy(self): LOG.debug("link destroyed %s", str(self._pn_link)) self._user_context = None self._connection = None if self._pn_link: session = self._pn_link.session.context self._pn_link.context = None self._pn_link.free() self._pn_link = None session.link_destroyed(self) # destroy session _after_ link # Link State Machine # Link States: (note: keep in sync with _STATE_MAP below!) _STATE_UNINIT = 0 # initial state _STATE_PENDING = 1 # waiting for remote to open _STATE_REQUESTED = 2 # waiting for local to open _STATE_CANCELLED = 3 # remote closed requested link before accepted _STATE_ACTIVE = 4 _STATE_NEED_CLOSE = 5 # remote initiated close _STATE_CLOSING = 6 # locally closed, pending remote _STATE_CLOSED = 7 # terminal state _STATE_REJECTED = 8 # terminal state, automatic link cleanup # Events: _LOCAL_ACTIVE = 1 _LOCAL_CLOSED = 2 _REMOTE_ACTIVE = 3 _REMOTE_CLOSED = 4 # state entry actions - link type specific: @staticmethod def _fsm_active(link): """Both ends of the link have become active.""" link._do_active() @staticmethod def _fsm_need_close(link): """The remote has closed its end of the link.""" link._do_need_close() @staticmethod def _fsm_closed(link): """Both ends of the link have closed.""" link._do_closed() @staticmethod def _fsm_requested(link): """Remote has created a new link.""" link._do_requested() def _process_endpoints(self): """Process any changes in link endpoint state.""" LOCAL_MASK = (proton.Endpoint.LOCAL_UNINIT | proton.Endpoint.LOCAL_ACTIVE | proton.Endpoint.LOCAL_CLOSED) REMOTE_MASK = (proton.Endpoint.REMOTE_UNINIT | proton.Endpoint.REMOTE_ACTIVE | proton.Endpoint.REMOTE_CLOSED) new_state = self._pn_link.state old_state = self._ep_state fsm = _Link._STATE_MAP[self._state] if ((new_state & LOCAL_MASK) != (old_state & LOCAL_MASK)): event = None if new_state & proton.Endpoint.LOCAL_ACTIVE: event = _Link._LOCAL_ACTIVE elif new_state & proton.Endpoint.LOCAL_CLOSED: event = _Link._LOCAL_CLOSED if event: entry = fsm.get(event) if entry: LOG.debug("Old State: %d Event %d New State: %d", self._state, event, entry[0]) self._state = entry[0] if entry[1]: entry[1](self) fsm = _Link._STATE_MAP[self._state] if ((new_state & REMOTE_MASK) != (old_state & REMOTE_MASK)): event = None if new_state & proton.Endpoint.REMOTE_ACTIVE: event = _Link._REMOTE_ACTIVE elif new_state & proton.Endpoint.REMOTE_CLOSED: event = _Link._REMOTE_CLOSED if event: entry = fsm.get(event) if entry: LOG.debug("Old State: %d Event %d New State: %d", self._state, event, entry[0]) self._state = entry[0] if entry[1]: entry[1](self) self._ep_state = new_state def _process_delivery(self, pn_delivery): raise NotImplementedError("Must Override") def _process_credit(self): raise NotImplementedError("Must Override") def _session_closed(self): """Remote has closed the session used by this link.""" fsm = _Link._STATE_MAP[self._state] entry = fsm.get(_Link._REMOTE_CLOSED) if entry: LOG.debug("Old State: %d Event %d New State: %d", self._state, _Link._REMOTE_CLOSED, entry[0]) self._state = entry[0] if entry[1]: entry[1](self) _Link._STATE_MAP = [ # {event: (next-state, action), ...} # _STATE_UNINIT: {_Link._LOCAL_ACTIVE: (_Link._STATE_PENDING, None), _Link._REMOTE_ACTIVE: (_Link._STATE_REQUESTED, _Link._fsm_requested), _Link._REMOTE_CLOSED: (_Link._STATE_NEED_CLOSE, _Link._fsm_need_close)}, # _STATE_PENDING: {_Link._LOCAL_CLOSED: (_Link._STATE_CLOSING, None), _Link._REMOTE_ACTIVE: (_Link._STATE_ACTIVE, _Link._fsm_active), _Link._REMOTE_CLOSED: (_Link._STATE_NEED_CLOSE, _Link._fsm_need_close)}, # _STATE_REQUESTED: {_Link._LOCAL_CLOSED: (_Link._STATE_REJECTED, None), _Link._LOCAL_ACTIVE: (_Link._STATE_ACTIVE, _Link._fsm_active), _Link._REMOTE_CLOSED: (_Link._STATE_CANCELLED, None)}, # _STATE_CANCELLED: {_Link._LOCAL_CLOSED: (_Link._STATE_REJECTED, None), _Link._LOCAL_ACTIVE: (_Link._STATE_NEED_CLOSE, _Link._fsm_need_close)}, # _STATE_ACTIVE: {_Link._LOCAL_CLOSED: (_Link._STATE_CLOSING, None), _Link._REMOTE_CLOSED: (_Link._STATE_NEED_CLOSE, _Link._fsm_need_close)}, # _STATE_NEED_CLOSE: {_Link._LOCAL_CLOSED: (_Link._STATE_CLOSED, _Link._fsm_closed)}, # _STATE_CLOSING: {_Link._REMOTE_CLOSED: (_Link._STATE_CLOSED, _Link._fsm_closed)}, # _STATE_CLOSED: {}, # _STATE_REJECTED: {}] class SenderEventHandler(object): def sender_active(self, sender_link): LOG.debug("sender_active (ignored)") def sender_remote_closed(self, sender_link, pn_condition): LOG.debug("sender_remote_closed (ignored)") def sender_closed(self, sender_link): LOG.debug("sender_closed (ignored)") def credit_granted(self, sender_link): LOG.debug("credit_granted (ignored)") class SenderLink(_Link): # Status for message send callback # ABORTED = -2 TIMED_OUT = -1 UNKNOWN = 0 ACCEPTED = 1 REJECTED = 2 RELEASED = 3 MODIFIED = 4 class _SendRequest(object): """Tracks sending a single message.""" def __init__(self, link, tag, message, callback, handle, deadline): self.link = link self.tag = tag self.message = message self.callback = callback self.handle = handle self.deadline = deadline self.link._send_requests[self.tag] = self if deadline: self.link._connection._add_timer(deadline, self) def __call__(self): """Invoked by Connection on timeout (now <= deadline).""" self.link._send_expired(self) def destroy(self, state, info): """Invoked on final completion of send.""" if self.deadline and state != SenderLink.TIMED_OUT: self.link._connection._cancel_timer(self.deadline, self) if self.tag in self.link._send_requests: del self.link._send_requests[self.tag] if self.callback: self.callback(self.link, self.handle, state, info) def __init__(self, connection, pn_link): super(SenderLink, self).__init__(connection, pn_link) self._send_requests = {} # indexed by tag self._pending_sends = collections.deque() # tags in order sent self._next_deadline = 0 self._next_tag = 0 self._last_credit = 0 # TODO(kgiusti) - think about send-settle-mode configuration def send(self, message, delivery_callback=None, handle=None, deadline=None): tag = "dingus-tag-%s" % self._next_tag self._next_tag += 1 send_req = SenderLink._SendRequest(self, tag, message, delivery_callback, handle, deadline) self._pn_link.delivery(tag) LOG.debug("Sending a message, tag=%s", tag) if deadline: self._connection._add_timer(deadline, send_req) pn_delivery = self._pn_link.current if pn_delivery and pn_delivery.writable: # send oldest pending: if self._pending_sends: self._pending_sends.append(tag) tag = self._pending_sends.popleft() send_req = self._send_requests[tag] LOG.debug("Sending previous pending message, tag=%s", tag) self._write_msg(pn_delivery, send_req) else: LOG.debug("Send is pending for credit, tag=%s", tag) self._pending_sends.append(tag) return 0 @property def pending(self): return len(self._send_requests) @property def credit(self): return self._pn_link.credit def close(self, pn_condition=None): self._pending_sends.clear() while self._send_requests: key, send_req = self._send_requests.popitem() # TODO(kgiusti) fix - must be async! send_req.destroy(self, SenderLink.ABORTED, {"condition": pn_condition}) super(SenderLink, self).close(pn_condition) def reject(self, pn_condition=None): """See Link Reject, AMQP1.0 spec.""" self._pn_link.source.type = proton.Terminus.UNSPECIFIED super(SenderLink, self).reject(pn_condition) def destroy(self): self._connection._remove_sender(self._name) self._connection = None super(SenderLink, self).destroy() def _process_delivery(self, pn_delivery): """Check if the delivery can be processed.""" _disposition_state_map = { proton.Disposition.ACCEPTED: SenderLink.ACCEPTED, proton.Disposition.REJECTED: SenderLink.REJECTED, proton.Disposition.RELEASED: SenderLink.RELEASED, proton.Disposition.MODIFIED: SenderLink.MODIFIED, } LOG.debug("Processing delivery, tag=%s", str(pn_delivery.tag)) if pn_delivery.tag in self._send_requests: if pn_delivery.settled: # remote has finished LOG.debug("Remote has settled a sent msg") state = _disposition_state_map.get(pn_delivery.remote_state, self.UNKNOWN) pn_disposition = pn_delivery.remote info = {} if state == SenderLink.REJECTED: if pn_disposition.condition: info["condition"] = pn_disposition.condition elif state == SenderLink.MODIFIED: info["delivery-failed"] = pn_disposition.failed info["undeliverable-here"] = pn_disposition.undeliverable annotations = pn_disposition.annotations if annotations: info["message-annotations"] = annotations send_req = self._send_requests.pop(pn_delivery.tag) send_req.destroy(state, info) pn_delivery.settle() elif pn_delivery.writable: # we can now send on this delivery LOG.debug("Delivery has become writable") if self._pending_sends: tag = self._pending_sends.popleft() send_req = self._send_requests[tag] self._write_msg(pn_delivery, send_req) else: # tag no longer valid, expired or canceled send? LOG.debug("Delivery ignored, tag=%s", str(pn_delivery.tag)) pn_delivery.settle() def _process_credit(self): # Alert if credit has become available if self._handler and self._state == _Link._STATE_ACTIVE: new_credit = self._pn_link.credit if self._last_credit <= 0 and new_credit > 0: LOG.debug("Credit is available, link=%s", self.name) self._handler.credit_granted(self) self._last_credit = new_credit def _write_msg(self, pn_delivery, send_req): # given a writable delivery, send a message LOG.debug("Sending message to engine, tag=%s", send_req.tag) self._pn_link.send(send_req.message.encode()) self._pn_link.advance() if not send_req.callback: # no disposition callback, so we can discard the send request and # settle the delivery immediately send_req.destroy(SenderLink.UNKNOWN, {}) pn_delivery.settle() def _send_expired(self, send_req): LOG.debug("Send request timed-out, tag=%s", send_req.tag) try: self._pending_sends.remove(send_req.tag) except ValueError: pass send_req.destroy(SenderLink.TIMED_OUT, None) # state machine actions: def _do_active(self): LOG.debug("Link is up") if self._handler: self._handler.sender_active(self) def _do_need_close(self): # TODO(kgiusti) error reporting LOG.debug("Link remote closed") if self._handler: cond = self._pn_link.remote_condition self._handler.sender_remote_closed(self, cond) def _do_closed(self): LOG.debug("Link close completed") if self._handler: self._handler.sender_closed(self) def _do_requested(self): LOG.debug("Remote has initiated a link") handler = self._connection._handler if handler: pn_link = self._pn_link # has the remote requested a source address? req_source = "" if pn_link.remote_source.dynamic: req_source = None elif pn_link.remote_source.address: req_source = pn_link.remote_source.address props = {"target-address": pn_link.remote_target.address} dist_mode = pn_link.remote_source.distribution_mode if (dist_mode == proton.Terminus.DIST_MODE_COPY): props["distribution-mode"] = "copy" elif (dist_mode == proton.Terminus.DIST_MODE_MOVE): props["distribution-mode"] = "move" handler.sender_requested(self._connection, pn_link.name, # handle pn_link.name, req_source, props) class ReceiverEventHandler(object): def receiver_active(self, receiver_link): LOG.debug("receiver_active (ignored)") def receiver_remote_closed(self, receiver_link, pn_condition): LOG.debug("receiver_remote_closed (ignored)") def receiver_closed(self, receiver_link): LOG.debug("receiver_closed (ignored)") def message_received(self, receiver_link, message, handle): LOG.debug("message_received (ignored)") class ReceiverLink(_Link): def __init__(self, connection, pn_link): super(ReceiverLink, self).__init__(connection, pn_link) self._next_handle = 0 self._unsettled_deliveries = {} # indexed by handle # TODO(kgiusti) - think about receiver-settle-mode configuration @property def capacity(self): return self._pn_link.credit def add_capacity(self, amount): self._pn_link.flow(amount) def _settle_delivery(self, handle, state): pn_delivery = self._unsettled_deliveries.pop(handle, None) if pn_delivery is None: raise Exception("Invalid message handle: %s" % str(handle)) pn_delivery.update(state) pn_delivery.settle() def message_accepted(self, handle): self._settle_delivery(handle, proton.Delivery.ACCEPTED) def message_released(self, handle): self._settle_delivery(handle, proton.Delivery.RELEASED) def message_rejected(self, handle, pn_condition=None): pn_delivery = self._unsettled_deliveries.pop(handle, None) if pn_delivery is None: raise Exception("Invalid message handle: %s" % str(handle)) if pn_condition: pn_delivery.local.condition = pn_condition pn_delivery.update(proton.Delivery.REJECTED) pn_delivery.settle() def message_modified(self, handle, delivery_failed, undeliverable, annotations): pn_delivery = self._unsettled_deliveries.pop(handle, None) if pn_delivery is None: raise Exception("Invalid message handle: %s" % str(handle)) pn_delivery.local.failed = delivery_failed pn_delivery.local.undeliverable = undeliverable if annotations: pn_delivery.local.annotations = annotations pn_delivery.update(proton.Delivery.MODIFIED) pn_delivery.settle() def reject(self, pn_condition=None): """See Link Reject, AMQP1.0 spec.""" self._pn_link.target.type = proton.Terminus.UNSPECIFIED super(ReceiverLink, self).reject(pn_condition) def destroy(self): self._connection._remove_receiver(self._name) self._connection = None super(ReceiverLink, self).destroy() def _process_delivery(self, pn_delivery): """Check if the delivery can be processed.""" # TODO(kgiusti): multi-frame message transfer if pn_delivery.readable: LOG.debug("Receive delivery readable") data = self._pn_link.recv(pn_delivery.pending) msg = proton.Message() msg.decode(data) self._pn_link.advance() if self._handler: handle = "rmsg-%s:%x" % (self._name, self._next_handle) self._next_handle += 1 self._unsettled_deliveries[handle] = pn_delivery self._handler.message_received(self, msg, handle) else: # TODO(kgiusti): is it ok to assume Delivery.REJECTED? pn_delivery.settle() def _process_credit(self): # Only used by SenderLink pass # state machine actions: def _do_active(self): LOG.debug("Link is up") if self._handler: self._handler.receiver_active(self) def _do_need_close(self): LOG.debug("Link remote closed") if self._handler: cond = self._pn_link.remote_condition self._handler.receiver_remote_closed(self, cond) def _do_closed(self): LOG.debug("Link close completed") if self._handler: self._handler.receiver_closed(self) def _do_requested(self): LOG.debug("Remote has initiated a ReceiverLink") handler = self._connection._handler if handler: pn_link = self._pn_link # has the remote requested a target address? req_target = "" if pn_link.remote_target.dynamic: req_target = None elif pn_link.remote_target.address: req_target = pn_link.remote_target.address props = {"source-address": pn_link.remote_source.address} dist_mode = pn_link.remote_source.distribution_mode if (dist_mode == proton.Terminus.DIST_MODE_COPY): props["distribution-mode"] = "copy" elif (dist_mode == proton.Terminus.DIST_MODE_MOVE): props["distribution-mode"] = "move" handler.receiver_requested(self._connection, pn_link.name, # handle pn_link.name, req_target, props) class _SessionProxy(object): """Corresponds to a Proton Session object.""" def __init__(self, connection, pn_session=None): self._locally_initiated = not pn_session self._connection = connection if not pn_session: pn_session = connection._pn_connection.session() self._pn_session = pn_session self._links = set() pn_session.context = self def open(self): self._pn_session.open() def new_sender(self, name): """Create a new sender link.""" pn_link = self._pn_session.sender(name) return self.request_sender(pn_link) def request_sender(self, pn_link): """Create link from request for a sender.""" sl = SenderLink(self._connection, pn_link) self._links.add(sl) return sl def new_receiver(self, name): """Create a new receiver link.""" pn_link = self._pn_session.receiver(name) return self.request_receiver(pn_link) def request_receiver(self, pn_link): """Create link from request for a receiver.""" rl = ReceiverLink(self._connection, pn_link) self._links.add(rl) return rl def link_destroyed(self, link): """Link has been destroyed""" self._links.discard(link) if not self._links: # no more links LOG.debug("destroying unneeded session") self._pn_session.close() self._pn_session.free() self._pn_session = None self._connection = None def remote_closed(self): """Peer has closed its end of the session.""" links = self._links.copy() # may modify _links for link in links: link._session_closed()