diff --git a/examples/python/rpc-client.py b/examples/python/rpc-client.py index 8935dd8..1ae7296 100644 --- a/examples/python/rpc-client.py +++ b/examples/python/rpc-client.py @@ -64,56 +64,12 @@ class MyConnection(fusion.ConnectionEventHandler): LOG.debug("select() returned") if readable: - count = self.connection.needs_input - if count > 0: - try: - sock_data = self.socket.recv(count) - if sock_data: - self.connection.process_input( sock_data ) - else: - # closed? - self.connection.close_input() - except socket.timeout, e: - raise # I don't expect this - except socket.error, e: - err = e.args[0] - # ignore non-fatal errors - if (err != errno.EAGAIN and - err != errno.EWOULDBLOCK and - err != errno.EINTR): - # otherwise, unrecoverable: - self.connection.close_input() - raise - except: # beats me... - self.connection.close_input() - raise - - if writable: - data = self.connection.output_data() - if data: - try: - rc = self.socket.send(data) - if rc > 0: - self.connection.output_written(rc) - else: - # else socket closed - self.connection.close_output() - except socket.timeout, e: - raise # I don't expect this - except socket.error, e: - err = e.args[0] - # ignore non-fatal errors - if (err != errno.EAGAIN and - err != errno.EWOULDBLOCK and - err != errno.EINTR): - # otherwise, unrecoverable - self.connection.close_output() - raise - except: # beats me... - self.connection.close_output() - raise - + rc = fusion.read_socket_input(self.connection, + self.socket) self.connection.process(time.time()) + if writable: + rc = fusion.write_socket_output(self.connection, + self.socket) def close(self, error=None): self.connection.close(error) diff --git a/examples/python/rpc-server.py b/examples/python/rpc-server.py index fc4adda..59c26ab 100644 --- a/examples/python/rpc-server.py +++ b/examples/python/rpc-server.py @@ -60,7 +60,22 @@ class SocketConnection(fusion.ConnectionEventHandler): """ return self.socket.fileno() + def process_input(self): + """Called when socket is read-ready""" + rc = fusion.read_socket_input(self.connection, + self.socket) + self.connection.process(time.time()) + return rc + + def send_output(self): + """Called when socket is write-ready""" + rc = fusion.write_socket_output(self.connection, + self.socket) + self.connection.process(time.time()) + return rc + # ConnectionEventHandler callbacks: + def connection_active(self, connection): LOG.debug("Connection active callback") @@ -289,30 +304,7 @@ def main(argv=None): conn_properties) else: assert isinstance(r, SocketConnection) - count = r.connection.needs_input - if count > 0: - try: - sock_data = r.socket.recv(count) - if sock_data: - r.connection.process_input( sock_data ) - else: - # closed? - r.connection.close_input() - except socket.timeout, e: - raise # I don't expect this - except socket.error, e: - err = e.args[0] - # ignore non-fatal errors - if (err != errno.EAGAIN and - err != errno.EWOULDBLOCK and - err != errno.EINTR): - # otherwise, unrecoverable: - r.connection.close_input() - raise - except: # beats me... - r.connection.close_input() - raise - r.connection.process(time.time()) + rc = r.process_input() for t in timers: now = time.time() @@ -322,30 +314,7 @@ def main(argv=None): for w in writable: assert isinstance(w, SocketConnection) - data = w.connection.output_data() - if data: - try: - rc = w.socket.send(data) - if rc > 0: - w.connection.output_written(rc) - else: - # else socket closed - w.connection.close_output() - except socket.timeout, e: - raise # I don't expect this - except socket.error, e: - err = e.args[0] - # ignore non-fatal errors - if (err != errno.EAGAIN and - err != errno.EWOULDBLOCK and - err != errno.EINTR): - # otherwise, unrecoverable - w.connection.close_output() - raise - except: # beats me... - w.connection.close_output() - raise - w.connection.process(time.time()) + rc = w.send_output() return 0 diff --git a/python/__init__.py b/python/__init__.py index 1ac019d..4f46673 100644 --- a/python/__init__.py +++ b/python/__init__.py @@ -19,3 +19,4 @@ from fusion.container import * from fusion.connection import * from fusion.link import * +from fusion.sockets import * diff --git a/python/sockets.py b/python/sockets.py new file mode 100644 index 0000000..49131c3 --- /dev/null +++ b/python/sockets.py @@ -0,0 +1,108 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +import socket, errno, logging + +from connection import Connection + +LOG = logging.getLogger(__name__) + +"""helper methods that provide boilerplate socket I/O and Connection + processing. +""" + +def read_socket_input(connection, socket): + """Read from the network layer and processes all data read. Can + support both blocking and non-blocking sockets. + Returns the number of input bytes processed, or EOS if input processing + is done. Any exceptions raised by the socket are re-raised. + """ + count = connection.needs_input + if count <= 0: + return count # 0 or EOS + + try: + sock_data = socket.recv(count) + except socket.timeout, e: + LOG.debug("Socket timeout exception %s", str(e)) + raise # caller must handle + except socket.error, e: + LOG.debug("Socket error exception %s", str(e)) + err = e.args[0] + # ignore non-fatal errors + if (err != errno.EAGAIN and + err != errno.EWOULDBLOCK and + err != errno.EINTR): + # otherwise, unrecoverable: + connection.close_input() + raise # caller must handle + except Exception, e: # beats me... assume fatal + LOG.debug("unknown socket exception %s", str(e)) + connection.close_input() + raise # caller must handle + + if sock_data: + count = connection.process_input( sock_data ) + else: + LOG.debug("Socket closed") + count = Connection.EOS + connection.close_input() + return count + +def write_socket_output(connection, socket): + """Write data to the network layer. Can support both blocking and + non-blocking sockets. + """ + count = connection.has_output + if count <= 0: + return count # 0 or EOS + + data = connection.output_data() + try: + count = socket.send(data) + except socket.timeout, e: + LOG.debug("Socket timeout exception %s", str(e)) + raise # caller must handle + except socket.error, e: + LOG.debug("Socket error exception %s", str(e)) + err = e.args[0] + # ignore non-fatal errors + if (err != errno.EAGAIN and + err != errno.EWOULDBLOCK and + err != errno.EINTR): + # otherwise, unrecoverable + connection.close_output() + raise + except Exception, e: # beats me... assume fatal + LOG.debug("unknown socket exception %s", str(e)) + connection.close_output() + raise + + if count > 0: + connection.output_written(count) + elif data: + LOG.debug("Socket closed") + count = Connection.EOS + connection.close_output() + return count + + +__all__ = [ + "read_socket_input", + "write_socket_output" + ]