diff --git a/python/connection.py b/python/connection.py index 0810163..f595d6e 100644 --- a/python/connection.py +++ b/python/connection.py @@ -1,43 +1,38 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. # -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. +# http://www.apache.org/licenses/LICENSE-2.0 # +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. __all__ = [ "ConnectionEventHandler", "Connection" - ] +] import logging +from link import ReceiverLink +from link import SenderLink import proton -from link import SenderLink, ReceiverLink LOG = logging.getLogger(__name__) -# -# An implementation of an AMQP 1.0 Connection -# class ConnectionEventHandler(object): - """ - """ + """An implementation of an AMQP 1.0 Connection.""" def connection_active(self, connection): - """connection handshake completed""" + """Connection handshake has completed.""" LOG.debug("connection_active (ignored)") def connection_remote_closed(self, connection, error=None): @@ -69,14 +64,9 @@ class ConnectionEventHandler(object): class Connection(object): - """ - """ - EOS = -1 # indicates 'I/O stream closed' def __init__(self, container, name, eventHandler=None, properties={}): - """ - """ self._name = name self._container = container self._handler = eventHandler @@ -149,15 +139,11 @@ Associate an arbitrary user object with this Connection. """) def open(self): - """ - """ self._pn_connection.open() self._pn_session = self._pn_connection.session() self._pn_session.open() def close(self, error=None): - """ - """ for l in self._sender_links.itervalues(): l.close(error) for l in self._receiver_links.itervalues(): @@ -173,8 +159,6 @@ Associate an arbitrary user object with this Connection. | proton.Endpoint.REMOTE_CLOSED) def destroy(self): - """ - """ self._sender_links.clear() self._receiver_links.clear() self._container._remove_connection(self._name) @@ -187,7 +171,7 @@ Associate an arbitrary user object with this Connection. if pn_link.is_sender and pn_link.name not in self._sender_links: LOG.debug("Remotely initiated Sender needs init") self._sender_links[pn_link.name] = pn_link - pn_link.context = None # @todo: update proton.py + pn_link.context = None # @todo: update proton.py req_source = "" if pn_link.remote_source.dynamic: req_source = None @@ -200,7 +184,7 @@ Associate an arbitrary user object with this Connection. elif pn_link.is_receiver and pn_link.name not in self._receiver_links: LOG.debug("Remotely initiated Receiver needs init") self._receiver_links[pn_link.name] = pn_link - pn_link.context = None # @todo: update proton.py + pn_link.context = None # @todo: update proton.py req_target = "" if pn_link.remote_target.dynamic: req_target = None @@ -211,22 +195,24 @@ Associate an arbitrary user object with this Connection. {"source-address": pn_link.remote_source.address}) - _REMOTE_REQ = (proton.Endpoint.LOCAL_UNINIT|proton.Endpoint.REMOTE_ACTIVE) - _REMOTE_CLOSE = (proton.Endpoint.LOCAL_ACTIVE|proton.Endpoint.REMOTE_CLOSED) - _ACTIVE = (proton.Endpoint.LOCAL_ACTIVE|proton.Endpoint.REMOTE_ACTIVE) - _CLOSED = (proton.Endpoint.LOCAL_CLOSED|proton.Endpoint.REMOTE_CLOSED) + _REMOTE_REQ = (proton.Endpoint.LOCAL_UNINIT + | proton.Endpoint.REMOTE_ACTIVE) + _REMOTE_CLOSE = (proton.Endpoint.LOCAL_ACTIVE + | proton.Endpoint.REMOTE_CLOSED) + _ACTIVE = (proton.Endpoint.LOCAL_ACTIVE | proton.Endpoint.REMOTE_ACTIVE) + _CLOSED = (proton.Endpoint.LOCAL_CLOSED | proton.Endpoint.REMOTE_CLOSED) + _LOCAL_UNINIT = proton.Endpoint.LOCAL_UNINIT def process(self, now): - """ - """ self._next_tick = self._pn_transport.tick(now) # wait until SASL has authenticated # @todo Server-side SASL if self._pn_sasl: if self._pn_sasl.state not in (proton.SASL.STATE_PASS, - proton.SASL.STATE_FAIL): - LOG.debug("SASL in progress. State=%s", str(self._pn_sasl.state)) + proton.SASL.STATE_FAIL): + LOG.debug("SASL in progress. State=%s", + str(self._pn_sasl.state)) self._handler.sasl_step(self, self._pn_sasl) return @@ -240,15 +226,15 @@ Associate an arbitrary user object with this Connection. self._active = True self._handler.connection_active(self) - pn_session = self._pn_connection.session_head(proton.Endpoint.LOCAL_UNINIT) + pn_session = self._pn_connection.session_head(self._LOCAL_UNINIT) while pn_session: LOG.debug("Opening remotely initiated session") pn_session.open() - pn_session = pn_session.next(proton.Endpoint.LOCAL_UNINIT) + pn_session = pn_session.next(self._LOCAL_UNINIT) pn_link = self._pn_connection.link_head(self._REMOTE_REQ) while pn_link: - next_link = pn_link.next(proton.Endpoint.LOCAL_UNINIT) + next_link = pn_link.next(self._LOCAL_UNINIT) if pn_link.state == self._REMOTE_REQ: self._link_requested(pn_link) @@ -266,11 +252,13 @@ Associate an arbitrary user object with this Connection. if pn_link.is_sender: sender_link = pn_link.context assert isinstance(sender_link, SenderLink) - sender_link._handler.sender_active(sender_link) + if sender_link._handler: + sender_link._handler.sender_active(sender_link) else: receiver_link = pn_link.context assert isinstance(receiver_link, ReceiverLink) - receiver_link._handler.receiver_active(receiver_link) + if receiver_link._handler: + receiver_link._handler.receiver_active(receiver_link) pn_link = next_link # process the work queue @@ -298,11 +286,12 @@ Associate an arbitrary user object with this Connection. if pn_link.context: if pn_link.is_sender: sender_link = pn_link.context - sender_link._handler.sender_remote_closed(sender_link, None) + handler = pn_link.context._handler + handler.sender_remote_closed(sender_link, None) else: receiver_link = pn_link.context - receiver_link._handler.receiver_remote_closed(receiver_link, - None) + handler = pn_link.context._handler + handler.receiver_remote_closed(receiver_link, None) pn_link = next_link pn_link = self._pn_connection.link_head(self._CLOSED) @@ -350,8 +339,6 @@ Associate an arbitrary user object with this Connection. @property def needs_input(self): - """ - """ if self._read_done: return self.EOS capacity = self._pn_transport.capacity() @@ -361,8 +348,6 @@ Associate an arbitrary user object with this Connection. return self.EOS def process_input(self, in_data): - """ - """ c = self.needs_input if c <= 0: return c @@ -380,8 +365,6 @@ Associate an arbitrary user object with this Connection. @property def has_output(self): - """ - """ if self._write_done: return self.EOS @@ -392,8 +375,6 @@ Associate an arbitrary user object with this Connection. return self.EOS def output_data(self): - """ - """ c = self.has_output if c <= 0: return None @@ -411,7 +392,7 @@ Associate an arbitrary user object with this Connection. def create_sender(self, source_address, target_address=None, eventHandler=None, name=None, properties={}): - """Factory for Sender links""" + """Factory method for Sender links.""" ident = name or str(source_address) if ident in self._sender_links: raise KeyError("Sender %s already exists!" % ident) @@ -433,11 +414,12 @@ Associate an arbitrary user object with this Connection. if pn_link.remote_source.dynamic and not source_override: raise Exception("A source address must be supplied!") source_addr = source_override or pn_link.remote_source.address - self._sender_links[link_handle] = SenderLink(self, pn_link, - source_addr, - pn_link.remote_target.address, - event_handler, properties) - return self._sender_links[link_handle] + link = SenderLink(self, pn_link, + source_addr, + pn_link.remote_target.address, + event_handler, properties) + self._sender_links[link_handle] = link + return link def reject_sender(self, link_handle, reason): pn_link = self._sender_links.get(link_handle) @@ -449,7 +431,7 @@ Associate an arbitrary user object with this Connection. def create_receiver(self, target_address, source_address=None, eventHandler=None, name=None, properties={}): - """Factory for Receive links""" + """Factory method for creating Receive links.""" ident = name or str(target_address) if ident in self._receiver_links: raise KeyError("Receiver %s already exists!" % ident) @@ -470,11 +452,12 @@ Associate an arbitrary user object with this Connection. if pn_link.remote_target.dynamic and not target_override: raise Exception("A target address must be supplied!") target_addr = target_override or pn_link.remote_target.address - self._receiver_links[link_handle] = ReceiverLink(self, pn_link, - target_addr, - pn_link.remote_source.address, - event_handler, properties) - return self._receiver_links[link_handle] + link = ReceiverLink(self, pn_link, + target_addr, + pn_link.remote_source.address, + event_handler, properties) + self._receiver_links[link_handle] = link + return link def reject_receiver(self, link_handle, reason): pn_link = self._receiver_links.get(link_handle) @@ -484,7 +467,6 @@ Associate an arbitrary user object with this Connection. # @todo support reason for close pn_link.close() - def _remove_sender(self, name): if name in self._sender_links: del self._sender_links[name] @@ -492,4 +474,3 @@ Associate an arbitrary user object with this Connection. def _remove_receiver(self, name): if name in self._receiver_links: del self._receiver_links[name] - diff --git a/python/container.py b/python/container.py index ea0cd90..bee12a3 100644 --- a/python/container.py +++ b/python/container.py @@ -1,41 +1,38 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. # -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 +# http://www.apache.org/licenses/LICENSE-2.0 # -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. __all__ = [ "ContainerEventHandler", "Container" - ] +] + +import heapq +import logging -import heapq, logging from connection import Connection LOG = logging.getLogger(__name__) -# -# An implementation of an AMQP 1.0 container -# class ContainerEventHandler(object): # @todo - ContainerEventHandler pass +# An implementation of an AMQP 1.0 container class Container(object): def __init__(self, name, eventHandler=None, properties={}): self._name = name @@ -93,4 +90,3 @@ class Container(object): def _remove_connection(self, name): if name in self._connections: del self._connections[name] - diff --git a/python/link.py b/python/link.py index 42524ae..fb12725 100644 --- a/python/link.py +++ b/python/link.py @@ -1,36 +1,36 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. # -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. +# http://www.apache.org/licenses/LICENSE-2.0 # +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. __all__ = [ "SenderEventHandler", "SenderLink", "ReceiverEventHandler", "ReceiverLink" - ] +] -import collections, logging +import collections +import logging import proton LOG = logging.getLogger(__name__) + class _Link(object): - """Generic Link base class""" + """A generic Link base class.""" def __init__(self, connection, pn_link, target_address, source_address, handler, properties): @@ -73,8 +73,6 @@ class _Link(object): return self._name def open(self): - """ - """ self._pn_link.open() def _get_user_context(self): @@ -91,7 +89,7 @@ Associate an arbitrary application object with this link. @property def source_address(self): """If link is a sender, source is determined by the local value, else - use the remote + use the remote. """ if self._pn_link.is_sender: return self._pn_link.source.address @@ -101,7 +99,7 @@ Associate an arbitrary application object with this link. @property def target_address(self): """If link is a receiver, target is determined by the local value, else - use the remote + use the remote. """ if self._pn_link.is_receiver: return self._pn_link.target.address @@ -123,9 +121,8 @@ Associate an arbitrary application object with this link. self._pn_link.context = None self._pn_link = None + class SenderEventHandler(object): - """ - """ def sender_active(self, sender_link): LOG.debug("sender_active (ignored)") @@ -160,23 +157,22 @@ class SenderLink(_Link): # @todo - think about send-settle-mode configuration - def send(self, message, delivery_callback=None, handle=None, deadline=None): - """ - """ - self._pending_sends.append( (message, delivery_callback, handle, - deadline) ) + def send(self, message, delivery_callback=None, + handle=None, deadline=None): + self._pending_sends.append((message, delivery_callback, handle, + deadline)) # @todo deadline not supported yet assert not deadline, "send timeout not supported yet!" if deadline and (self._next_deadline == 0 or self._next_deadline > deadline): self._next_deadline = deadline - delivery = self._pn_link.delivery( "tag-%x" % self._next_tag ) + delivery = self._pn_link.delivery("tag-%x" % self._next_tag) self._next_tag += 1 if delivery.writable: send_req = self._pending_sends.popleft() - self._write_msg( delivery, send_req ) + self._write_msg(delivery, send_req) return 0 @@ -213,11 +209,12 @@ class SenderLink(_Link): proton.Disposition.REJECTED: SenderLink.REJECTED, proton.Disposition.RELEASED: SenderLink.RELEASED, proton.Disposition.MODIFIED: SenderLink.MODIFIED, - } + } if delivery.tag in self._pending_acks: if delivery.settled: # remote has finished - LOG.debug("delivery updated, remote state=%s", str(delivery.remote_state)) + LOG.debug("delivery updated, remote state=%s", + str(delivery.remote_state)) send_req = self._pending_acks.pop(delivery.tag) state = _disposition_state_map.get(delivery.remote_state, @@ -241,7 +238,7 @@ class SenderLink(_Link): # send_req = (msg, cb, handle, deadline) msg = send_req[0] cb = send_req[1] - self._pn_link.send( msg.encode() ) + self._pn_link.send(msg.encode()) self._pn_link.advance() if cb: # delivery callback given assert delivery.tag not in self._pending_acks @@ -273,7 +270,7 @@ class ReceiverLink(_Link): target_address, source_address, eventHandler, properties) self._next_handle = 0 - self._unsettled_deliveries = {} # indexed by handle + self._unsettled_deliveries = {} # indexed by handle # @todo - think about receiver-settle-mode configuration @@ -327,4 +324,3 @@ class ReceiverLink(_Link): delivery = self._unsettled_deliveries.pop(handle) delivery.update(result) delivery.settle() - diff --git a/python/sockets.py b/python/sockets.py index e4393bf..efcbc74 100644 --- a/python/sockets.py +++ b/python/sockets.py @@ -1,28 +1,28 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. # -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at # -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. +# http://www.apache.org/licenses/LICENSE-2.0 # +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. __all__ = [ "read_socket_input", "write_socket_output" - ] +] -import socket, errno, logging +import errno +import logging +import socket from connection import Connection @@ -32,6 +32,7 @@ LOG = logging.getLogger(__name__) processing. """ + def read_socket_input(connection, socket_obj): """Read from the network layer and processes all data read. Can support both blocking and non-blocking sockets. @@ -44,57 +45,58 @@ def read_socket_input(connection, socket_obj): try: sock_data = socket_obj.recv(count) - except socket.timeout, e: + except socket.timeout as e: LOG.debug("Socket timeout exception %s", str(e)) raise # caller must handle - except socket.error, e: + except socket.error as e: LOG.debug("Socket error exception %s", str(e)) err = e.args[0] # ignore non-fatal errors if (err != errno.EAGAIN and - err != errno.EWOULDBLOCK and - err != errno.EINTR): + err != errno.EWOULDBLOCK and + err != errno.EINTR): # otherwise, unrecoverable: connection.close_input() raise # caller must handle - except Exception, e: # beats me... assume fatal + except Exception as e: # beats me... assume fatal LOG.debug("unknown socket exception %s", str(e)) connection.close_input() raise # caller must handle if sock_data: - count = connection.process_input( sock_data ) + count = connection.process_input(sock_data) else: LOG.debug("Socket closed") count = Connection.EOS connection.close_input() return count + def write_socket_output(connection, socket_obj): """Write data to the network layer. Can support both blocking and non-blocking sockets. """ count = connection.has_output if count <= 0: - return count # 0 or EOS + return count # 0 or EOS data = connection.output_data() try: count = socket_obj.send(data) - except socket.timeout, e: + except socket.timeout as e: LOG.debug("Socket timeout exception %s", str(e)) - raise # caller must handle - except socket.error, e: + raise # caller must handle + except socket.error as e: LOG.debug("Socket error exception %s", str(e)) err = e.args[0] # ignore non-fatal errors if (err != errno.EAGAIN and - err != errno.EWOULDBLOCK and - err != errno.EINTR): + err != errno.EWOULDBLOCK and + err != errno.EINTR): # otherwise, unrecoverable connection.close_output() raise - except Exception, e: # beats me... assume fatal + except Exception as e: # beats me... assume fatal LOG.debug("unknown socket exception %s", str(e)) connection.close_output() raise @@ -106,4 +108,3 @@ def write_socket_output(connection, socket_obj): count = Connection.EOS connection.close_output() return count -