fixes and cleanups

This commit is contained in:
Kenneth Giusti 2013-12-23 15:29:19 -05:00
parent 623b1af3ab
commit 81c3b3a75c
6 changed files with 110 additions and 61 deletions

@ -20,11 +20,7 @@ import optparse, sys, time, uuid
import re, socket, select, errno
from proton import Message
# @todo stop the madness:
import container as fusion_container
import connection as fusion_connection
import link as fusion_link
import fusion
"""
@ -36,7 +32,7 @@ to a server, and waits for a response. The method call is a map of the form:
"""
class MyConnection(fusion_connection.ConnectionEventHandler):
class MyConnection(fusion.ConnectionEventHandler):
def __init__(self, name, socket, container, properties):
self.name = name
@ -149,8 +145,8 @@ class MyConnection(fusion_connection.ConnectionEventHandler):
print result
class MyCaller(fusion_link.SenderEventHandler,
fusion_link.ReceiverEventHandler):
class MyCaller(fusion.SenderEventHandler,
fusion.ReceiverEventHandler):
"""
"""
@ -167,11 +163,18 @@ class MyCaller(fusion_link.SenderEventHandler,
self._method = method_map
self._reply_to = None
self._to = None
self._receiver_closed = False
self._sender_closed = False
self._send_completed = False
self._response_received = False
def done(self):
return self._receiver_closed and self._sender_closed
return self._send_completed and self._response_received
def destroy(self):
if self._sender:
self._sender.destroy()
if self._receiver:
self._receiver.destroy()
def _send_request(self):
"""Send a message containing the RPC method call
@ -197,12 +200,14 @@ class MyCaller(fusion_link.SenderEventHandler,
def sender_closed(self, sender_link, error):
print "APP: SENDER CLOSED"
self._sender_closed = True
assert sender_link is self._sender
self._send_completed = True
# send complete callback:
def __call__(self, sender_link, handle, status, error):
print "APP: MESSAGE SENT CALLBACK %s" % status
self._send_completed = True
# ReceiverEventHandler callbacks:
@ -216,16 +221,20 @@ class MyCaller(fusion_link.SenderEventHandler,
def receiver_closed(self, receiver_link, error):
print "APP: RECEIVER CLOSED"
self._receiver_closed = True
assert receiver_link is self._receiver
self._response_received = True
def message_received(self, receiver_link, message, handle):
print "APP: MESSAGE RECEIVED"
assert receiver_link is self._receiver
print "Response received: %s" % str(message)
receiver_link.message_accepted(handle)
self._response_received = True
self._sender.destroy()
del self._sender
self._receiver.destroy()
del self._receiver
#self._sender.destroy()
#del self._sender
#self._receiver.destroy()
#del self._receiver
def main(argv=None):
@ -267,7 +276,7 @@ def main(argv=None):
# create AMQP container, connection, sender and receiver
#
container = fusion_container.Container(uuid.uuid4().hex)
container = fusion.Container(uuid.uuid4().hex)
my_connection = MyConnection( "to-server", my_socket,
container, {})
@ -283,13 +292,17 @@ def main(argv=None):
my_connection,
"my-source-address",
"my-target-address",
receiver_properties = {"capacity": 1})
receiver_properties={"capacity": 1},
sender_properties={})
while not my_caller.done():
my_connection.process()
print "DONE"
my_caller.destroy()
my_connection.process()
return 0

@ -20,11 +20,7 @@ import optparse, sys, time, uuid
import re, socket, select, errno
from proton import Message
# @todo stop the madness:
import container as fusion_container
import connection as fusion_connection
import link as fusion_link
import fusion
"""
@ -44,7 +40,7 @@ sender_links = {} # indexed by Source address
receiver_links = {} # indexed by Target address
class SocketConnection(fusion_connection.ConnectionEventHandler):
class SocketConnection(fusion.ConnectionEventHandler):
"""Associates a fusion Connection with a python network socket"""
def __init__(self, name, socket, container, conn_properties):
@ -53,6 +49,8 @@ class SocketConnection(fusion_connection.ConnectionEventHandler):
self.connection = container.create_connection(name, self,
conn_properties)
self.connection.user_context = self
self.connection.sasl.mechanisms("ANONYMOUS")
self.connection.sasl.server()
def fileno(self):
"""Allows use of a SocketConnection in a select() call.
@ -103,13 +101,18 @@ class SocketConnection(fusion_connection.ConnectionEventHandler):
receiver_links[requested_target] = receiver
print "APP: NEW RECEIVER LINK CREATED, target=%s" % requested_target
# SASL callbacks:
def sasl_step(self, connection, pn_sasl):
print "SASL STEP"
pn_sasl.done(pn_sasl.OK)
def sasl_done(self, connection, result):
print "APP: SASL DONE"
print result
class MySenderLink(fusion_link.SenderEventHandler):
class MySenderLink(fusion.SenderEventHandler):
"""
"""
def __init__(self, connection, link_handle, source_address,
@ -135,7 +138,7 @@ class MySenderLink(fusion_link.SenderEventHandler):
print "APP: MESSAGE SENT CALLBACK %s" % status
class MyReceiverLink(fusion_link.ReceiverEventHandler):
class MyReceiverLink(fusion.ReceiverEventHandler):
"""
"""
def __init__(self, connection, link_handle, target_address,
@ -181,6 +184,8 @@ class MyReceiverLink(fusion_link.ReceiverEventHandler):
link.send( response, my_sender,
message, time.time() + 5.0)
self._link.message_accepted(handle)
if self._link.capacity == 0:
self._link.add_capacity( 5 )
@ -220,7 +225,7 @@ def main(argv=None):
# create an AMQP container that will 'provide' the RPC service
#
container = fusion_container.Container("example RPC service")
container = fusion.Container("example RPC service")
socket_connections = {} # indexed by name (uuid)
while True:

@ -16,3 +16,6 @@
# specific language governing permissions and limitations
# under the License.
#
from fusion.container import *
from fusion.connection import *
from fusion.link import *

@ -47,6 +47,9 @@ class ConnectionEventHandler(object):
pass
# @todo cleaner sasl support, esp. server side
def sasl_step(self, connection, pn_sasl):
pass
def sasl_done(self, connection, result):
pass
@ -84,7 +87,7 @@ class Connection(object):
self._active = False
# @todo sasl configuration and handling
self._sasl = None
self._pn_sasl = None
self._sasl_done = False
self._pn_connection.open()
@ -112,9 +115,9 @@ class Connection(object):
@property
# @todo - think about server side use of sasl!
def sasl(self):
if not self._sasl:
self._sasl = self._pn_transport.sasl()
return self._sasl
if not self._pn_sasl:
self._pn_sasl = self._pn_transport.sasl()
return self._pn_sasl
def _get_user_context(self):
return self._user_context
@ -138,14 +141,15 @@ Associate an arbitrary user object with this Connection.
# wait until SASL has authenticated
# @todo Server-side SASL
if self._sasl:
if self._sasl.state not in (proton.SASL.STATE_PASS,
if self._pn_sasl:
if self._pn_sasl.state not in (proton.SASL.STATE_PASS,
proton.SASL.STATE_FAIL):
print("SASL wait.")
print("SASL in progress. State=%s" % self._pn_sasl.state)
self._handler.sasl_step(self, self._pn_sasl)
return
self._handler.sasl_done(self, self.sasl.outcome)
self._sasl = None
self._handler.sasl_done(self, self._pn_sasl.outcome)
self._pn_sasl = None
if self._pn_connection.state & self._NEED_INIT:
assert False, "Connection always opened() on create"
@ -170,45 +174,67 @@ Associate an arbitrary user object with this Connection.
self._pending_links[index] = link
if link.is_sender:
req_source = ""
if link.remote_source.is_dynamic:
if link.remote_source.dynamic:
req_source = None
elif link.remote_source.address:
req_source = link.remote_source.address
self._handler.sender_pending(self, index, req_source,
{"target-address":
link.remote_target.address})
self._handler.sender_requested(self, index, req_source,
{"target-address":
link.remote_target.address})
else:
req_target = ""
if link.remote_target.is_dynamic:
if link.remote_target.dynamic:
req_target = None
elif link.remote_target.address:
req_target = link.remote_target.address
self._handler.receiver_pending(self, index, req_target,
{"source-address":
link.remote_source.address})
self._handler.receiver_requested(self, index, req_target,
{"source-address":
link.remote_source.address})
link = link.next(self._NEED_INIT)
# ?any need for ACTIVE callback?
# @todo: won't scale
link = self._pn_connection.link_head(self._ACTIVE)
while link:
if link.context and not link.context._active:
if link.is_sender:
sender_link = link.context
sender_link._handler.sender_active(sender_link)
else:
receiver_link = link.context
receiver_link._handler.receiver_active(receiver_link)
link.context._active = True
link = link.next(self._ACTIVE)
# process the work queue
delivery = self._pn_connection.work_head
while delivery:
print "Delivery updated!"
if delivery.link.is_sender:
sender_link = delivery.link.context
sender_link._delivery_updated(delivery)
else:
receiver_link = delivery.link.context
receiver_link._delivery_updated(delivery)
if delivery.link.context:
print "Delivery context set!"
if delivery.link.is_sender:
sender_link = delivery.link.context
sender_link._delivery_updated(delivery)
else:
receiver_link = delivery.link.context
receiver_link._delivery_updated(delivery)
delivery = delivery.work_next
# close all endpoints closed by remotes
link = self._pn_connection.link_head(self._NEED_CLOSE)
while link:
print "Link needs close"
# @todo - find link, invoke callback
print "Link closed remotely"
link.close()
# @todo: error reporting
if link.context:
if link.is_sender:
sender_link = link.context
sender_link._handler.sender_closed(sender_link, None)
else:
receiver_link = link.context
receiver_link._handler.receiver_closed(receiver_link,
None)
link = link.next(self._NEED_CLOSE)
ssn = self._pn_connection.session_head(self._NEED_CLOSE)
@ -318,7 +344,7 @@ Associate an arbitrary user object with this Connection.
pn_link = self._pending_links.pop(link_handle)
if not pn_link:
raise Exception("Invalid link_handle: %s" % link_handle)
if pn_link.remote_source.is_dynamic and not source_override:
if pn_link.remote_source.dynamic and not source_override:
raise Exception("A source address must be supplied!")
source_addr = source_override or pn_link.remote_source.address
name = name or source_addr
@ -357,7 +383,7 @@ Associate an arbitrary user object with this Connection.
pn_link = self._pending_links.pop(link_handle)
if not pn_link:
raise Exception("Invalid link_handle: %s" % link_handle)
if pn_link.remote_target.is_dynamic and not target_override:
if pn_link.remote_target.dynamic and not target_override:
raise Exception("A target address must be supplied!")
target_addr = target_override or pn_link.remote_target.address
name = name or target_addr

@ -60,7 +60,7 @@ class Container(object):
writers = []
timer_heap = []
for c in self._connections.itervalues():
if c.need_input > 0:
if c.needs_input > 0:
readers.append(c)
if c.has_output > 0:
writers.append(c)

@ -32,6 +32,7 @@ class _Link(object):
self._pn_link = pn_link
# @todo: raise jira to add 'context' to api
pn_link.context = self
print "Setting context in link %s" % str(pn_link)
if target_address is None:
assert pn_link.is_sender, "Dynamic target not allowed"
@ -57,6 +58,7 @@ class _Link(object):
raise Exception("Unknown distribution mode: %s" %
str(desired_mode))
self._user_context = None
self._active = False
def _get_user_context(self):
return self._user_context
@ -100,7 +102,7 @@ class SenderEventHandler(object):
"""
def sender_active(self, sender_link):
pass
def sender_closed(self, sender_link, error):
def sender_closed(self, sender_link, error=None):
pass
@ -180,6 +182,7 @@ class SenderLink(_Link):
proton.Disposition.RELEASED: SenderLink.RELEASED,
proton.Disposition.MODIFIED: SenderLink.MODIFIED,
}
print "Sender delivery updated"
if delivery.tag in self._pending_acks:
if delivery.settled: # remote has finished
@ -222,7 +225,7 @@ class ReceiverEventHandler(object):
def receiver_active(self, receiver_link):
pass
def receiver_closed(self, receiver_link, error):
def receiver_closed(self, receiver_link, error=None):
pass
def message_received(self, receiver_link, message, handle):
@ -263,8 +266,6 @@ class ReceiverLink(_Link):
def message_modified(self, handle):
self._settle_delivery(handle, proton.Delivery.MODIFIED)
# @todo - add 'received', 'released', etc
def destroy(self, error=None):
self._pn_link.close()
self._connection._remove_receiver(self._name)
@ -274,6 +275,7 @@ class ReceiverLink(_Link):
def _delivery_updated(self, delivery):
# a receive delivery changed state
# @todo: multi-frame message transfer
print "Receive delivery updated"
if delivery.readable:
data = self._pn_link.recv(delivery.pending)
msg = proton.Message()