# Copyright (C) 2013 Nippon Telegraph and Telephone Corporation. # Copyright (C) 2013 YAMAMOTO Takashi # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or # implied. # See the License for the specific language governing permissions and # limitations under the License. import logging import os from os_ken.lib import ip # We don't bother to use cfg.py because monkey patch needs to be # called very early. Instead, we use an environment variable to # select the type of hub. HUB_TYPE = os.getenv('OSKEN_HUB_TYPE', 'eventlet') LOG = logging.getLogger('os_ken.lib.hub') if HUB_TYPE == 'eventlet': import eventlet # HACK: # sleep() is the workaround for the following issue. # https://github.com/eventlet/eventlet/issues/401 eventlet.sleep() import eventlet.event import eventlet.queue import eventlet.semaphore import eventlet.timeout import eventlet.wsgi from eventlet import websocket import greenlet import ssl import socket import traceback getcurrent = eventlet.getcurrent patch = eventlet.monkey_patch sleep = eventlet.sleep listen = eventlet.listen connect = eventlet.connect def spawn(*args, **kwargs): raise_error = kwargs.pop('raise_error', False) def _launch(func, *args, **kwargs): # Mimic gevent's default raise_error=False behaviour # by not propagating an exception to the joiner. try: return func(*args, **kwargs) except TaskExit: pass except BaseException as e: if raise_error: raise e # Log uncaught exception. # Note: this is an intentional divergence from gevent # behaviour; gevent silently ignores such exceptions. LOG.error('hub: uncaught exception: %s', traceback.format_exc()) return eventlet.spawn(_launch, *args, **kwargs) def spawn_after(seconds, *args, **kwargs): raise_error = kwargs.pop('raise_error', False) def _launch(func, *args, **kwargs): # Mimic gevent's default raise_error=False behaviour # by not propagating an exception to the joiner. try: return func(*args, **kwargs) except TaskExit: pass except BaseException as e: if raise_error: raise e # Log uncaught exception. # Note: this is an intentional divergence from gevent # behaviour; gevent silently ignores such exceptions. LOG.error('hub: uncaught exception: %s', traceback.format_exc()) return eventlet.spawn_after(seconds, _launch, *args, **kwargs) def kill(thread): thread.kill() def joinall(threads): for t in threads: # This try-except is necessary when killing an inactive # greenthread. try: t.wait() except TaskExit: pass Queue = eventlet.queue.LightQueue QueueEmpty = eventlet.queue.Empty Semaphore = eventlet.semaphore.Semaphore BoundedSemaphore = eventlet.semaphore.BoundedSemaphore TaskExit = greenlet.GreenletExit class StreamServer(object): def __init__(self, listen_info, handle=None, backlog=None, spawn='default', **ssl_args): assert backlog is None assert spawn == 'default' if ip.valid_ipv6(listen_info[0]): self.server = eventlet.listen(listen_info, family=socket.AF_INET6) elif os.path.isdir(os.path.dirname(listen_info[0])): # Case for Unix domain socket self.server = eventlet.listen(listen_info[0], family=socket.AF_UNIX) else: self.server = eventlet.listen(listen_info) if ssl_args: def wrap_and_handle(sock, addr): ssl_args.setdefault('server_side', True) handle(ssl.wrap_socket(sock, **ssl_args), addr) self.handle = wrap_and_handle else: self.handle = handle def serve_forever(self): while True: sock, addr = self.server.accept() spawn(self.handle, sock, addr) class StreamClient(object): def __init__(self, addr, timeout=None, **ssl_args): assert ip.valid_ipv4(addr[0]) or ip.valid_ipv6(addr[0]) self.addr = addr self.timeout = timeout self.ssl_args = ssl_args self._is_active = True def connect(self): try: if self.timeout is not None: client = socket.create_connection(self.addr, timeout=self.timeout) else: client = socket.create_connection(self.addr) except socket.error: return None if self.ssl_args: client = ssl.wrap_socket(client, **self.ssl_args) return client def connect_loop(self, handle, interval): while self._is_active: sock = self.connect() if sock: handle(sock, self.addr) sleep(interval) def stop(self): self._is_active = False class LoggingWrapper(object): def write(self, message): LOG.info(message.rstrip('\n')) class WSGIServer(StreamServer): def serve_forever(self): self.logger = LoggingWrapper() eventlet.wsgi.server(self.server, self.handle, self.logger) WebSocketWSGI = websocket.WebSocketWSGI Timeout = eventlet.timeout.Timeout class Event(object): def __init__(self): self._ev = eventlet.event.Event() self._cond = False def _wait(self, timeout=None): while not self._cond: self._ev.wait() def _broadcast(self): self._ev.send() # Since eventlet Event doesn't allow multiple send() operations # on an event, re-create the underlying event. # Note: _ev.reset() is obsolete. self._ev = eventlet.event.Event() def is_set(self): return self._cond def set(self): self._cond = True self._broadcast() def clear(self): self._cond = False def wait(self, timeout=None): if timeout is None: self._wait() else: try: with Timeout(timeout): self._wait() except Timeout: pass return self._cond