# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

__all__ = [
    "SenderEventHandler",
    "SenderLink",
    "ReceiverEventHandler",
    "ReceiverLink"
]

import collections
import logging

import proton

LOG = logging.getLogger(__name__)

# link lifecycle state:
_STATE_UNINIT = 0  # initial state
_STATE_PENDING = 1  # waiting for remote to open
_STATE_REQUESTED = 2  # waiting for local to open
_STATE_ACTIVE = 3
_STATE_NEED_CLOSE = 4  # remote initiated close
_STATE_CLOSING = 5  # locally closed, pending remote
_STATE_CLOSED = 6  # terminal state

# proton endpoint states:
_LOCAL_UNINIT = proton.Endpoint.LOCAL_UNINIT
_LOCAL_ACTIVE = proton.Endpoint.LOCAL_ACTIVE
_LOCAL_CLOSED = proton.Endpoint.LOCAL_CLOSED
_REMOTE_UNINIT = proton.Endpoint.REMOTE_UNINIT
_REMOTE_ACTIVE = proton.Endpoint.REMOTE_ACTIVE
_REMOTE_CLOSED = proton.Endpoint.REMOTE_CLOSED


def _do_pending(link):
    """The application has opened a new link."""
    # nothing to do but wait for the remote:
    return _STATE_PENDING


def _do_requested(link):
    """The peer has requested a new link be created.

    Notifies the connection's handler (if any) via sender_requested() or
    receiver_requested().  The requested address passed to the handler is
    None when the peer asked for a dynamic node, the peer's address when one
    was supplied, and "" otherwise.
    """
    LOG.debug("Remote has initiated a link")
    pn_link = link._pn_link
    if isinstance(link, SenderLink):
        # has the remote requested a source address?
        req_source = ""
        if pn_link.remote_source.dynamic:
            req_source = None
        elif pn_link.remote_source.address:
            req_source = pn_link.remote_source.address
        handler = link._connection._handler
        if handler:
            handler.sender_requested(link._connection,
                                     pn_link.name,  # handle
                                     pn_link.name,
                                     req_source,
                                     {"target-address":
                                      pn_link.remote_target.address})
    else:
        # has the remote requested a target address?
        req_target = ""
        if pn_link.remote_target.dynamic:
            req_target = None
        elif pn_link.remote_target.address:
            req_target = pn_link.remote_target.address
        handler = link._connection._handler
        if handler:
            handler.receiver_requested(link._connection,
                                       pn_link.name,  # handle
                                       pn_link.name,
                                       req_target,
                                       {"source-address":
                                        pn_link.remote_source.address})
    return _STATE_REQUESTED


def _do_active(link):
    """Both ends of the link have become active."""
    LOG.debug("Link is up")
    if link._handler:
        if isinstance(link, SenderLink):
            link._handler.sender_active(link)
        else:
            link._handler.receiver_active(link)
    return _STATE_ACTIVE


def _do_need_close(link):
    """The remote has closed its end of the link."""
    # TODO(kgiusti) error reporting
    LOG.debug("Link remote closed")
    if link._handler:
        if isinstance(link, SenderLink):
            link._handler.sender_remote_closed(link, None)
        else:
            link._handler.receiver_remote_closed(link, None)
    return _STATE_NEED_CLOSE


def _do_closing(link):
    """Locally closed, remote end still active."""
    # nothing to do but wait for the remote
    return _STATE_CLOSING


def _do_closed(link):
    """Both ends of the link have closed."""
    LOG.debug("Link close completed")
    if link._handler:
        if isinstance(link, SenderLink):
            link._handler.sender_closed(link)
        else:
            link._handler.receiver_closed(link)
    return _STATE_CLOSED


# Given the current endpoint state for the link, move to the next state and do
# any state-specific processing.  The list is indexed by the link's current
# lifecycle state; each entry maps a proton endpoint state to the function
# that performs the transition and returns the new lifecycle state.  There is
# no entry for _STATE_CLOSED since it is terminal.
_EP_STATE_MACHINE = [
    # _STATE_UNINIT:
    {(_LOCAL_ACTIVE | _REMOTE_UNINIT): _do_pending,
     (_LOCAL_UNINIT | _REMOTE_ACTIVE): _do_requested},
    # _STATE_PENDING:
    {(_LOCAL_ACTIVE | _REMOTE_ACTIVE): _do_active,
     (_LOCAL_ACTIVE | _REMOTE_CLOSED): _do_need_close,
     (_LOCAL_CLOSED | _REMOTE_UNINIT): _do_closed,
     (_LOCAL_CLOSED | _REMOTE_CLOSED): _do_closed},
    # _STATE_REQUESTED:
    {(_LOCAL_ACTIVE | _REMOTE_ACTIVE): _do_active,
     (_LOCAL_CLOSED | _REMOTE_ACTIVE): _do_closing,
     (_LOCAL_ACTIVE | _REMOTE_CLOSED): _do_need_close},
    # _STATE_ACTIVE:
    {(_LOCAL_ACTIVE | _REMOTE_CLOSED): _do_need_close,
     (_LOCAL_CLOSED | _REMOTE_ACTIVE): _do_closing,
     (_LOCAL_CLOSED | _REMOTE_CLOSED): _do_closed},
    # _STATE_NEED_CLOSE:
    {(_LOCAL_CLOSED | _REMOTE_CLOSED): _do_closed},
    # _STATE_CLOSING:
    {(_LOCAL_CLOSED | _REMOTE_CLOSED): _do_closed}]


class _Link(object):
    """A generic Link base class."""

    def __init__(self, connection, pn_link):
        self._state = _STATE_UNINIT
        # last known endpoint state:
        self._ep_state = _LOCAL_UNINIT | _REMOTE_UNINIT
        self._connection = connection
        self._name = pn_link.name
        self._handler = None
        self._properties = None
        self._user_context = None
        # TODO(kgiusti): raise jira to add 'context' attr to api
        self._pn_link = pn_link
        # back-reference so the wrapper can be recovered from the proton link
        pn_link.context = self

    def configure(self, target_address, source_address, handler, properties):
        """Assign addresses, properties, etc.

        An address of None requests a dynamic node: a dynamic target is only
        accepted on a sender, a dynamic source only on a receiver.  An empty
        (falsy, non-None) address leaves the terminus address untouched.

        The only property interpreted here is "distribution-mode", which must
        be "copy" or "move" when present.

        Raises Exception for a disallowed dynamic address or an unknown
        distribution mode.
        """
        self._handler = handler
        self._properties = properties

        if target_address is None:
            if not self._pn_link.is_sender:
                raise Exception("Dynamic target not allowed")
            self._pn_link.target.dynamic = True
        elif target_address:
            self._pn_link.target.address = target_address

        if source_address is None:
            if not self._pn_link.is_receiver:
                raise Exception("Dynamic source not allowed")
            self._pn_link.source.dynamic = True
        elif source_address:
            self._pn_link.source.address = source_address

        if properties:
            desired_mode = properties.get("distribution-mode")
            if desired_mode:
                if desired_mode == "copy":
                    mode = proton.Terminus.DIST_MODE_COPY
                elif desired_mode == "move":
                    mode = proton.Terminus.DIST_MODE_MOVE
                else:
                    raise Exception("Unknown distribution mode: %s" %
                                    str(desired_mode))
                self._pn_link.source.distribution_mode = mode

    @property
    def name(self):
        """The link's name, captured from the proton link at construction."""
        return self._name

    def open(self):
        """Open the local end of the link and schedule it for processing."""
        LOG.debug("Opening the link.")
        self._pn_link.open()
        self._connection._add_work(self)

    def _get_user_context(self):
        return self._user_context

    def _set_user_context(self, ctxt):
        self._user_context = ctxt

    _uc_docstr = """Arbitrary application object associated with this link."""
    user_context = property(_get_user_context, _set_user_context,
                            doc=_uc_docstr)

    @property
    def source_address(self):
        """Return the authoritative source of the link."""
        # If link is a sender, source is determined by the local
        # value, else use the remote.
        if self._pn_link.is_sender:
            return self._pn_link.source.address
        else:
            return self._pn_link.remote_source.address

    @property
    def target_address(self):
        """Return the authoritative target of the link."""
        # If link is a receiver, target is determined by the local
        # value, else use the remote.
        if self._pn_link.is_receiver:
            return self._pn_link.target.address
        else:
            return self._pn_link.remote_target.address

    def close(self, error=None):
        """Close the local end of the link and schedule it for processing.

        The 'error' argument is accepted but not used by this method.
        """
        LOG.debug("Closing the link.")
        self._pn_link.close()
        self._connection._add_work(self)

    @property
    def closed(self):
        """True when both the local and remote ends of the link are closed."""
        state = self._pn_link.state
        return state == (proton.Endpoint.LOCAL_CLOSED
                         | proton.Endpoint.REMOTE_CLOSED)

    def destroy(self):
        """Drop the user context and free the underlying proton link.

        Calling this again after the proton link has been released is a no-op
        apart from the debug log.
        """
        LOG.debug("link destroyed %s", str(self._pn_link))
        self._user_context = None
        if self._pn_link:
            self._pn_link.context = None
            self._pn_link.free()
            self._pn_link = None

    def _process(self):
        """Link state machine processing.

        When the proton endpoint state differs from the last one seen, run
        the matching transition from _EP_STATE_MACHINE.  The table lookup
        raises KeyError when the current lifecycle state has no transition
        for the new endpoint state.
        """
        # check for transitions in Endpoint state:
        pn_link = self._pn_link
        if self._state != _STATE_CLOSED:
            ep_state = pn_link.state
            if ep_state != self._ep_state:
                LOG.debug("link state: %s old ep: %s new ep: %s",
                          self._state, hex(self._ep_state), hex(ep_state))
                self._ep_state = ep_state
                self._state = _EP_STATE_MACHINE[self._state][ep_state](self)

    def _process_delivery(self, pn_delivery):
        """Hook for subclasses; the base implementation does nothing."""
        pass


class SenderEventHandler(object):
    """Callbacks for sender link events; the defaults only log."""

    def sender_active(self, sender_link):
        LOG.debug("sender_active (ignored)")

    def sender_remote_closed(self, sender_link, error=None):
        LOG.debug("sender_remote_closed (ignored)")

    def sender_closed(self, sender_link):
        LOG.debug("sender_closed (ignored)")


class SenderLink(_Link):
    """The sending end of a link: queues messages and tracks their outcome."""

    # Status for message send callback
    #
    ABORTED = -2
    TIMED_OUT = -1
    UNKNOWN = 0
    ACCEPTED = 1
    REJECTED = 2
    RELEASED = 3
    MODIFIED = 4

    def __init__(self, connection, pn_link):
        super(SenderLink, self).__init__(connection, pn_link)
        # FIFO of (message, delivery_callback, handle, deadline) tuples that
        # have not been written to the link yet
        self._pending_sends = collections.deque()
        # written send requests awaiting remote settlement, keyed by
        # delivery tag
        self._pending_acks = {}
        self._next_deadline = 0
        self._next_tag = 0

        # TODO(kgiusti) - think about send-settle-mode configuration

    def send(self, message, delivery_callback=None, handle=None,
             deadline=None):
        """Queue a message for sending, writing it now if the link allows.

        message: object providing encode(); its output is written to the link.
        delivery_callback: optional callable invoked as
            cb(sender_link, handle, status, error) with one of the status
            constants defined on this class.
        handle: opaque value handed back to delivery_callback.
        deadline: not supported yet; any truthy value is rejected.

        Returns 0.
        Raises NotImplementedError when a deadline is given.
        """
        # TODO(kgiusti) deadline not supported yet.  Reject the request
        # *before* queuing it, otherwise the message would remain in
        # _pending_sends and be transmitted later even though the caller was
        # told the send failed.  When deadlines are implemented the earliest
        # one should be tracked in self._next_deadline.
        if deadline:
            raise NotImplementedError("send timeout not supported yet!")

        self._pending_sends.append((message, delivery_callback, handle,
                                    deadline))

        pn_delivery = self._pn_link.delivery("tag-%x" % self._next_tag)
        self._next_tag += 1

        if pn_delivery.writable:
            send_req = self._pending_sends.popleft()
            self._write_msg(pn_delivery, send_req)

        return 0

    def pending(self):
        """Count of sends that are queued or awaiting remote settlement."""
        return len(self._pending_sends) + len(self._pending_acks)

    def credit(self):
        # NOTE(review): this calls credit() on the proton link; some proton
        # Python bindings expose Link.credit as a property instead - confirm
        # against the binding version in use.
        return self._pn_link.credit()

    def close(self, error=None):
        """Abort all outstanding sends, then close the link.

        Every queued send that supplied a callback, and every send awaiting
        settlement, has its callback invoked with status ABORTED and the
        given error.
        """
        while self._pending_sends:
            i = self._pending_sends.popleft()
            cb = i[1]
            if cb:
                handle = i[2]
                cb(self, handle, self.ABORTED, error)
        # values() works on both Python 2 and 3 (itervalues() is Python 2
        # only); the list() snapshot keeps the iteration safe should a
        # callback touch _pending_acks.
        for i in list(self._pending_acks.values()):
            cb = i[1]
            handle = i[2]
            cb(self, handle, self.ABORTED, error)
        self._pending_acks.clear()
        super(SenderLink, self).close()

    def reject(self, reason):
        """See Link Reject, AMQP1.0 spec."""
        # TODO(kgiusti) support reason for close
        self._pn_link.source.type = proton.Terminus.UNSPECIFIED
        self._pn_link.open()
        self._pn_link.close()

    def destroy(self):
        """Unregister from the connection and release the link."""
        # guard so a repeated destroy() does not fail on a cleared
        # connection, matching the guard in the base class
        if self._connection is not None:
            self._connection._remove_sender(self._name)
            self._connection = None
        super(SenderLink, self).destroy()

    def _process_delivery(self, pn_delivery):
        """Check if the delivery can be processed."""
        _disposition_state_map = {
            proton.Disposition.ACCEPTED: SenderLink.ACCEPTED,
            proton.Disposition.REJECTED: SenderLink.REJECTED,
            proton.Disposition.RELEASED: SenderLink.RELEASED,
            proton.Disposition.MODIFIED: SenderLink.MODIFIED,
        }

        if pn_delivery.tag in self._pending_acks:
            if pn_delivery.settled:  # remote has finished
                LOG.debug("Remote has settled a sent msg")
                send_req = self._pending_acks.pop(pn_delivery.tag)
                # any remote state not in the map is reported as UNKNOWN
                state = _disposition_state_map.get(pn_delivery.remote_state,
                                                   self.UNKNOWN)
                cb = send_req[1]
                handle = send_req[2]

                cb(self, handle, state, None)
                pn_delivery.settle()
        else:
            # not for a sent msg, use it to send the next
            if pn_delivery.writable and self._pending_sends:
                send_req = self._pending_sends.popleft()
                self._write_msg(pn_delivery, send_req)
            else:
                # what else is there???
                pn_delivery.settle()

    def _write_msg(self, pn_delivery, send_req):
        """Write one queued send request to the link.

        Raises Exception if a callback was requested and the delivery's tag
        is already awaiting settlement.
        """
        # given a writable delivery, send a message
        # send_req = (msg, cb, handle, deadline)
        LOG.debug("Sending a pending message")
        msg = send_req[0]
        cb = send_req[1]
        self._pn_link.send(msg.encode())
        self._pn_link.advance()
        if cb:  # delivery callback given
            if pn_delivery.tag in self._pending_acks:
                raise Exception("Duplicate delivery tag?")
            self._pending_acks[pn_delivery.tag] = send_req
        else:
            # no status required, so settle it now.
            pn_delivery.settle()


class ReceiverEventHandler(object):
    """Callbacks for receiver link events; the defaults only log."""

    def receiver_active(self, receiver_link):
        LOG.debug("receiver_active (ignored)")

    def receiver_remote_closed(self, receiver_link, error=None):
        LOG.debug("receiver_remote_closed (ignored)")

    def receiver_closed(self, receiver_link):
        LOG.debug("receiver_closed (ignored)")

    def message_received(self, receiver_link, message, handle):
        LOG.debug("message_received (ignored)")


class ReceiverLink(_Link):
    """The receiving end of a link: hands messages to the handler and
    settles them once the application reports an outcome.
    """

    def __init__(self, connection, pn_link):
        super(ReceiverLink, self).__init__(connection, pn_link)
        self._next_handle = 0
        self._unsettled_deliveries = {}  # indexed by handle

        # TODO(kgiusti) - think about receiver-settle-mode configuration

    def capacity(self):
        # NOTE(review): this calls credit() on the proton link; some proton
        # Python bindings expose Link.credit as a property instead - confirm
        # against the binding version in use.
        return self._pn_link.credit()

    def add_capacity(self, amount):
        """Issue 'amount' more credit on the link via flow()."""
        self._pn_link.flow(amount)

    def message_accepted(self, handle):
        self._settle_delivery(handle, proton.Delivery.ACCEPTED)

    def message_rejected(self, handle, reason=None):
        # TODO(kgiusti): how to deal with 'reason'
        self._settle_delivery(handle, proton.Delivery.REJECTED)

    def message_released(self, handle):
        self._settle_delivery(handle, proton.Delivery.RELEASED)

    def message_modified(self, handle):
        self._settle_delivery(handle, proton.Delivery.MODIFIED)

    def reject(self, reason):
        """See Link Reject, AMQP1.0 spec."""
        # TODO(kgiusti) support reason for close
        self._pn_link.target.type = proton.Terminus.UNSPECIFIED
        self._pn_link.open()
        self._pn_link.close()

    def destroy(self):
        """Unregister from the connection and release the link."""
        # guard so a repeated destroy() does not fail on a cleared
        # connection, matching the guard in the base class
        if self._connection is not None:
            self._connection._remove_receiver(self._name)
            self._connection = None
        super(ReceiverLink, self).destroy()

    def _process_delivery(self, pn_delivery):
        """Check if the delivery can be processed."""
        # TODO(kgiusti): multi-frame message transfer
        if pn_delivery.readable:
            LOG.debug("Receive delivery readable")
            data = self._pn_link.recv(pn_delivery.pending)
            msg = proton.Message()
            msg.decode(data)
            self._pn_link.advance()

            if self._handler:
                # remember the delivery under a fresh handle so the
                # application can settle it later via message_*()
                handle = "rmsg-%s:%x" % (self._name, self._next_handle)
                self._next_handle += 1
                self._unsettled_deliveries[handle] = pn_delivery
                self._handler.message_received(self, msg, handle)
            else:
                # TODO(kgiusti): is it ok to assume Delivery.REJECTED?
                pn_delivery.settle()

    def _settle_delivery(self, handle, result):
        """Apply 'result' to the delivery stored under 'handle' and settle it.

        Raises Exception if the handle is not an unsettled delivery.
        """
        # settle delivery associated with a handle
        if handle not in self._unsettled_deliveries:
            raise Exception("Invalid message handle: %s" % str(handle))
        pn_delivery = self._unsettled_deliveries.pop(handle)
        pn_delivery.update(result)
        pn_delivery.settle()