## # Copyright (c) 2013 Yury Selivanov # License: Apache 2.0 ## """Greensocket (non-blocking) for asyncio. Use ``greenio.socket`` in the same way as you would use stdlib's ``socket.socket`` in ``greenio.task`` tasks or coroutines invoked from them. """ from __future__ import absolute_import from greenio import asyncio from socket import error, SOCK_STREAM from socket import socket as std_socket from . import yield_from from . import _GreenLoopMixin class socket: def __init__(self, *args, **kwargs): _from_sock = kwargs.pop('_from_sock', None) if _from_sock: own_sock = None self._sock = _from_sock else: own_sock = std_socket(*args, **kwargs) self._sock = own_sock try: self._sock.setblocking(False) self._loop = asyncio.get_event_loop() assert isinstance(self._loop, _GreenLoopMixin), \ 'greenio event loop is required' except: if own_sock is not None: # An unexpected error has occurred. Close the # socket object if it was created in __init__. own_sock.close() raise @classmethod def from_socket(cls, sock): return cls(_from_sock=sock) @property def family(self): return self._sock.family @property def type(self): return self._sock.type @property def proto(self): return self._sock.proto def _proxy(attr): def proxy(self, *args, **kwargs): meth = getattr(self._sock, attr) return meth(*args, **kwargs) proxy.__name__ = attr proxy.__qualname__ = attr proxy.__doc__ = getattr(getattr(std_socket, attr), '__doc__', None) return proxy def _copydoc(func): func.__doc__ = getattr( getattr(std_socket, func.__name__), '__doc__', None) return func @_copydoc def setblocking(self, flag): if flag: raise error('greenio.socket does not support blocking mode') @_copydoc def recv(self, nbytes): fut = self._loop.sock_recv(self._sock, nbytes) yield_from(fut) return fut.result() @_copydoc def connect(self, addr): fut = self._loop.sock_connect(self._sock, addr) yield_from(fut) return fut.result() @_copydoc def sendall(self, data, flags=0): assert not flags fut = self._loop.sock_sendall(self._sock, data) yield_from(fut) return fut.result() @_copydoc def send(self, data, flags=0): self.sendall(data, flags) return len(data) @_copydoc def accept(self): fut = self._loop.sock_accept(self._sock) yield_from(fut) sock, addr = fut.result() return self.__class__.from_socket(sock), addr @_copydoc def makefile(self, mode, *args, **kwargs): if mode == 'rb': return ReadFile(self._loop, self._sock) elif mode == 'wb': return WriteFile(self._loop, self._sock) raise NotImplementedError bind = _proxy('bind') listen = _proxy('listen') getsockname = _proxy('getsockname') getpeername = _proxy('getpeername') gettimeout = _proxy('gettimeout') getsockopt = _proxy('getsockopt') setsockopt = _proxy('setsockopt') fileno = _proxy('fileno') # socket.detach() was added in Python 3.2 if hasattr(std_socket, 'detach'): detach = _proxy('detach') close = _proxy('close') shutdown = _proxy('shutdown') del _copydoc, _proxy class ReadFile: def __init__(self, loop, sock): self._loop = loop self._sock = sock self._buf = bytearray() def read(self, size): while 1: if size <= len(self._buf): data = self._buf[:size] del self._buf[:size] return data fut = self._loop.sock_recv(self._sock, size - len(self._buf)) yield_from(fut) res = fut.result() self._buf.extend(res) if size <= len(self._buf): data = self._buf[:size] del self._buf[:size] return data else: data = self._buf[:] del self._buf[:] return data def close(self): pass class WriteFile: def __init__(self, loop, sock): self._loop = loop self._sock = sock def write(self, data): fut = self._loop.sock_sendall(self._sock, data) yield_from(fut) return fut.result() def flush(self): pass def close(self): pass def create_connection(address, timeout=None): loop = asyncio.get_event_loop() host, port = address rslt = yield_from( loop.getaddrinfo(host, port, family=0, type=SOCK_STREAM)) for res in rslt: af, socktype, proto, canonname, sa = res sock = None try: sock = socket(af, socktype, proto) sock.connect(sa) return sock except ConnectionError: if sock: sock.close() raise error('unable to connect to {!r}'.format(address))