Separate the call to open() from the constructor
This commit is contained in:
parent
aaa54fb148
commit
560de5b097
@ -44,6 +44,11 @@ class MyConnection(fusion.ConnectionEventHandler):
|
||||
properties)
|
||||
self.connection.user_context = self
|
||||
|
||||
self.connection.sasl.mechanisms("ANONYMOUS")
|
||||
self.connection.sasl.client()
|
||||
|
||||
self.connection.open()
|
||||
|
||||
def process(self):
|
||||
""" Do connection-based processing (I/O and timers) """
|
||||
readfd = []
|
||||
@ -132,6 +137,10 @@ class MyCaller(fusion.SenderEventHandler,
|
||||
self._send_completed = False
|
||||
self._response_received = False
|
||||
|
||||
self._sender.open()
|
||||
self._receiver.add_capacity(1)
|
||||
self._receiver.open()
|
||||
|
||||
def done(self):
|
||||
return self._send_completed and self._response_received
|
||||
|
||||
@ -257,10 +266,6 @@ def main(argv=None):
|
||||
my_connection = MyConnection( "to-server", my_socket,
|
||||
container, conn_properties)
|
||||
|
||||
# @todo: need better sasl + server
|
||||
my_connection.connection.sasl.mechanisms("ANONYMOUS")
|
||||
my_connection.connection.sasl.client()
|
||||
|
||||
# Create the RPC caller
|
||||
method = {'method': method_info[0],
|
||||
'args': dict([(method_info[i], method_info[i+1])
|
||||
|
@ -54,6 +54,7 @@ class SocketConnection(fusion.ConnectionEventHandler):
|
||||
self.connection.user_context = self
|
||||
self.connection.sasl.mechanisms("ANONYMOUS")
|
||||
self.connection.sasl.server()
|
||||
self.connection.open()
|
||||
|
||||
def fileno(self):
|
||||
"""Allows use of a SocketConnection in a select() call.
|
||||
@ -114,8 +115,7 @@ class SocketConnection(fusion.ConnectionEventHandler):
|
||||
assert requested_target not in receiver_links
|
||||
|
||||
receiver = MyReceiverLink(connection, link_handle,
|
||||
requested_target, name,
|
||||
{"capacity": 3})
|
||||
requested_target, name)
|
||||
receiver_links[requested_target] = receiver
|
||||
print("New Receiver link created, target=%s" % requested_target)
|
||||
|
||||
@ -140,6 +140,7 @@ class MySenderLink(fusion.SenderEventHandler):
|
||||
self,
|
||||
name,
|
||||
properties)
|
||||
self.sender_link.open()
|
||||
|
||||
# SenderEventHandler callbacks:
|
||||
|
||||
@ -159,16 +160,18 @@ class MyReceiverLink(fusion.ReceiverEventHandler):
|
||||
"""
|
||||
"""
|
||||
def __init__(self, connection, link_handle, target_address,
|
||||
name, properties):
|
||||
name, properties={}):
|
||||
self._link = connection.accept_receiver(link_handle,
|
||||
target_address,
|
||||
self,
|
||||
name,
|
||||
properties)
|
||||
self._link.open()
|
||||
|
||||
# ReceiverEventHandler callbacks:
|
||||
def receiver_active(self, receiver_link):
|
||||
LOG.debug("receiver active callback")
|
||||
self._link.add_capacity(5)
|
||||
|
||||
def receiver_closed(self, receiver_link, error):
|
||||
LOG.debug("receiver closed callback")
|
||||
|
@ -1,51 +0,0 @@
|
||||
#
|
||||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
# KIND, either express or implied. See the License for the
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
|
||||
# A simple address representation.
|
||||
# Syntax:
|
||||
# "kgiusti://<dns-name>[:port#]/<resource>; {properties}"
|
||||
#
|
||||
# dns-name and port are used to configure a port (socket.getaddrinfo)
|
||||
# resource identifies the resource on the host:
|
||||
# The 'Source.address' for a sending Link,
|
||||
# The 'Target.address' for a receiving Link
|
||||
# properties is a map of address properties (TBD)
|
||||
|
||||
## Not fully thought out... TBD
|
||||
|
||||
class LinkAddress(object):
|
||||
def __init__(self, address):
|
||||
self._address = address
|
||||
self._properties = properties
|
||||
|
||||
def hostname(self):
|
||||
pass
|
||||
|
||||
def port(self):
|
||||
pass
|
||||
|
||||
def resource(self):
|
||||
pass
|
||||
|
||||
def properties(self):
|
||||
pass
|
||||
|
||||
__all__ = [
|
||||
"LinkAddress"
|
||||
]
|
@ -69,6 +69,8 @@ class Connection(object):
|
||||
"""
|
||||
self._name = name
|
||||
self._container = container
|
||||
self._handler = eventHandler
|
||||
|
||||
self._pn_connection = proton.Connection()
|
||||
self._pn_connection.container = container.name
|
||||
if 'hostname' in properties:
|
||||
@ -78,7 +80,6 @@ class Connection(object):
|
||||
self._pn_transport.bind(self._pn_connection)
|
||||
if properties.get("trace"):
|
||||
self._pn_transport.trace(proton.Transport.TRACE_FRM)
|
||||
self._handler = eventHandler
|
||||
|
||||
self._sender_links = {}
|
||||
self._receiver_links = {}
|
||||
@ -94,10 +95,6 @@ class Connection(object):
|
||||
self._pn_sasl = None
|
||||
self._sasl_done = False
|
||||
|
||||
self._pn_connection.open()
|
||||
self._pn_session = self._pn_connection.session()
|
||||
self._pn_session.open()
|
||||
|
||||
@property
|
||||
def container(self):
|
||||
return self._container
|
||||
@ -402,6 +399,13 @@ Associate an arbitrary user object with this Connection.
|
||||
# @todo support reason for close
|
||||
pn_link.close()
|
||||
|
||||
def open(self):
|
||||
"""
|
||||
"""
|
||||
self._pn_connection.open()
|
||||
self._pn_session = self._pn_connection.session()
|
||||
self._pn_session.open()
|
||||
|
||||
def close(self, error=None):
|
||||
"""
|
||||
"""
|
||||
@ -425,6 +429,8 @@ Associate an arbitrary user object with this Connection.
|
||||
self._pending_links.clear()
|
||||
self._sender_links.clear()
|
||||
self._receiver_links.clear()
|
||||
self._container._remove_connection(self._name)
|
||||
self._container = None
|
||||
self._pn_connection = None
|
||||
self._pn_transport = None
|
||||
self._user_context = None
|
||||
|
@ -61,6 +61,11 @@ class _Link(object):
|
||||
self._user_context = None
|
||||
self._active = False
|
||||
|
||||
def open(self):
|
||||
"""
|
||||
"""
|
||||
self._pn_link.open()
|
||||
|
||||
def _get_user_context(self):
|
||||
return self._user_context
|
||||
|
||||
@ -141,8 +146,6 @@ class SenderLink(_Link):
|
||||
|
||||
# @todo - think about send-settle-mode configuration
|
||||
|
||||
self._pn_link.open()
|
||||
|
||||
def send(self, message, delivery_callback=None, handle=None, deadline=None):
|
||||
"""
|
||||
"""
|
||||
@ -257,10 +260,6 @@ class ReceiverLink(_Link):
|
||||
|
||||
# @todo - think about receiver-settle-mode configuration
|
||||
|
||||
credit = properties.get("capacity", 10)
|
||||
self._pn_link.flow(credit)
|
||||
self._pn_link.open()
|
||||
|
||||
def capacity(self):
|
||||
return self._pn_link.credit()
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user