Kenneth Giusti 623b1af3ab checkpoint
2013-12-23 10:35:17 -05:00

307 lines
10 KiB
Python

#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
import collections
import proton
class _Link(object):
"""Generic Link base class"""
def __init__(self, connection, pn_link, name,
target_address, source_address,
handler, properties):
self._connection = connection
self._name = name
self._handler = handler
self._properties = properties
self._pn_link = pn_link
# @todo: raise jira to add 'context' to api
pn_link.context = self
if target_address is None:
assert pn_link.is_sender, "Dynamic target not allowed"
self._pn_link.target.dynamic = True
elif target_address:
self._pn_link.target.address = target_address
if source_address is None:
assert pn_link.is_receiver, "Dynamic source not allowed"
self._pn_link.source.dynamic = True
elif source_address:
self._pn_link.source.address = source_address
desired_mode = properties.get("distribution-mode")
if desired_mode:
if desired_mode == "copy":
self._pn_link.source.distribution_mode = \
proton.Terminus.DIST_MODE_COPY
elif desired_mode == "move":
self._pn_link.source.distribution_mode = \
proton.Terminus.DIST_MODE_MOVE
else:
raise Exception("Unknown distribution mode: %s" %
str(desired_mode))
self._user_context = None
def _get_user_context(self):
return self._user_context
def _set_user_context(self, ctxt):
self._user_context = ctxt
user_context = property(_get_user_context, _set_user_context,
doc="""
Associate an arbitrary application object with this link.
""")
@property
def source_address(self):
"""If link is a sender, source is determined by the local value, else
use the remote
"""
if self._pn_link.is_sender:
return self._pn_link.source.address
else:
return self._pn_link.remote_source.address
@property
def target_address(self):
"""If link is a receiver, target is determined by the local value, else
use the remote
"""
if self._pn_link.is_receiver:
return self._pn_link.target.address
else:
return self._pn_link.remote_target.address
def _destroy(self):
self._user_context = None
self._pn_link.context = None
self._pn_link = None
class SenderEventHandler(object):
"""
"""
def sender_active(self, sender_link):
pass
def sender_closed(self, sender_link, error):
pass
class SenderLink(_Link):
# Status for message send callback
#
ABORTED = -2
TIMED_OUT = -1
UNKNOWN = 0
ACCEPTED = 1
REJECTED = 2
RELEASED = 3
MODIFIED = 4
def __init__(self, connection, pn_link, name, source_address,
target_address, eventHandler, properties):
super(SenderLink, self).__init__(connection, pn_link, name,
target_address, source_address,
eventHandler, properties)
self._pending_sends = collections.deque()
self._pending_acks = {}
self._next_deadline = 0
self._next_tag = 0
# @todo - think about send-settle-mode configuration
self._pn_link.open()
def send(self, message, delivery_callback=None, handle=None, deadline=None):
"""
"""
self._pending_sends.append( (message, delivery_callback, handle,
deadline) )
if deadline and (self._next_deadline == 0 or
self._next_deadline > deadline):
self._next_deadline = deadline
delivery = self._pn_link.delivery( "tag-%x" % self._next_tag )
self._next_tag += 1
if delivery.writable:
send_req = self._pending_sends.popleft()
self._write_msg( delivery, send_req )
return 0
def pending(self):
return len(self._pending_sends) + len(self._pending_acks)
def credit(self):
return self._pn_link.credit()
def destroy(self, error=None):
self._pn_link.close()
while self._pending_sends:
i = self._pending_sends.popleft()
cb = i[1]
if cb:
handle = i[2]
cb(self, handle, self.ABORTED, error)
for i in self._pending_acks.itervalues():
cb = i[1]
handle = i[2]
cb(self, handle, self.ABORTED, error)
self._pending_acks = {}
self._connection._remove_sender(self._name)
self._connection = None
super(SenderLink, self)._destroy()
def _delivery_updated(self, delivery):
# A delivery has changed state.
# Do we need to know?
_disposition_state_map = {
proton.Disposition.ACCEPTED: SenderLink.ACCEPTED,
proton.Disposition.REJECTED: SenderLink.REJECTED,
proton.Disposition.RELEASED: SenderLink.RELEASED,
proton.Disposition.MODIFIED: SenderLink.MODIFIED,
}
if delivery.tag in self._pending_acks:
if delivery.settled: # remote has finished
print "deliver updated, remote state=%s" % str(delivery.remote_state)
send_req = self._pending_acks.pop(delivery.tag)
state = _disposition_state_map.get(delivery.remote_state,
self.UNKNOWN)
cb = send_req[1]
handle = send_req[2]
cb(self, handle, state, None)
delivery.settle()
else:
# not for a sent msg, use it to send the next
if delivery.writable and self._pending_sends:
send_req = self._pending_sends.popleft()
self._write_msg(delivery, send_req)
else:
# what else is there???
delivery.settle()
def _write_msg(self, delivery, send_req):
# given a writable delivery, send a message
# send_req = (msg, cb, handle, deadline)
msg = send_req[0]
cb = send_req[1]
self._pn_link.send( msg.encode() )
self._pn_link.advance()
if cb: # delivery callback given
assert delivery.tag not in self._pending_acks
self._pending_acks[delivery.tag] = send_req
else:
# no status required, so settle it now.
delivery.settle()
class ReceiverEventHandler(object):
def receiver_active(self, receiver_link):
pass
def receiver_closed(self, receiver_link, error):
pass
def message_received(self, receiver_link, message, handle):
pass
class ReceiverLink(_Link):
def __init__(self, connection, pn_link, name, target_address,
source_address, eventHandler, properties):
super(ReceiverLink, self).__init__(connection, pn_link, name,
target_address, source_address,
eventHandler, properties)
self._next_handle = 0
self._unsettled_deliveries = {} # indexed by handle
# @todo - think about receiver-settle-mode configuration
credit = properties.get("capacity", 10)
self._pn_link.flow(credit)
self._pn_link.open()
def capacity(self):
return self._pn_link.credit()
def add_capacity(self, amount):
self._pn_link.flow(amount)
def message_accepted(self, handle):
self._settle_delivery(handle, proton.Delivery.ACCEPTED)
def message_rejected(self, handle, reason=None):
# @todo: how to deal with 'reason'
self._settle_delivery(handle, proton.Delivery.REJECTED)
def message_released(self, handle):
self._settle_delivery(handle, proton.Delivery.RELEASED)
def message_modified(self, handle):
self._settle_delivery(handle, proton.Delivery.MODIFIED)
# @todo - add 'received', 'released', etc
def destroy(self, error=None):
self._pn_link.close()
self._connection._remove_receiver(self._name)
self._connection = None
super(ReceiverLink, self)._destroy()
def _delivery_updated(self, delivery):
# a receive delivery changed state
# @todo: multi-frame message transfer
if delivery.readable:
data = self._pn_link.recv(delivery.pending)
msg = proton.Message()
msg.decode(data)
self._pn_link.advance()
if self._handler:
handle = "rmsg-%s:%x" % (self._name, self._next_handle)
self._next_handle += 1
self._unsettled_deliveries[handle] = delivery
self._handler.message_received(self, msg, handle)
else:
# @todo: is it ok to assume Delivery.REJECTED?
delivery.settle()
def _settle_delivery(self, handle, result):
# settle delivery associated with a handle
if handle not in self._unsettled_deliveries:
raise Exception("Invalid message handle: %s" % str(handle))
delivery = self._unsettled_deliveries.pop(handle)
delivery.update(result)
delivery.settle()
__all__ = [
"SenderEventHandler",
"SenderLink",
"ReceiverEventHandler",
"ReceiverLink"
]