General cleanup and refactor of a lot of the API/WSGI service code.
This commit is contained in:
18
bin/nova-api
18
bin/nova-api
@@ -24,6 +24,8 @@ import gettext
|
||||
import os
|
||||
import sys
|
||||
|
||||
import eventlet.pool
|
||||
|
||||
# If ../nova/__init__.py exists, add ../ to Python search path, so that
|
||||
# it will override what happens to be installed in /usr/(local/)lib/python...
|
||||
possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
|
||||
@@ -46,6 +48,13 @@ LOG = logging.getLogger('nova.api')
|
||||
|
||||
FLAGS = flags.FLAGS
|
||||
|
||||
|
||||
def launch(service_name):
|
||||
_service = service.WSGIService(service_name)
|
||||
_service.start()
|
||||
_service.wait()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
utils.default_flagfile()
|
||||
FLAGS(sys.argv)
|
||||
@@ -57,5 +66,10 @@ if __name__ == '__main__':
|
||||
flag_get = FLAGS.get(flag, None)
|
||||
LOG.debug("%(flag)s : %(flag_get)s" % locals())
|
||||
|
||||
service = service.serve_wsgi(service.ApiService)
|
||||
service.wait()
|
||||
|
||||
pool = eventlet.pool.Pool()
|
||||
pool.execute(launch, "ec2")
|
||||
pool.execute(launch, "osapi")
|
||||
pool.wait_all()
|
||||
|
||||
print >>sys.stderr, "Exiting..."
|
||||
|
||||
13
nova/log.py
13
nova/log.py
@@ -314,3 +314,16 @@ logging.setLoggerClass(NovaLogger)
|
||||
def audit(msg, *args, **kwargs):
|
||||
"""Shortcut for logging to root log with sevrity 'AUDIT'."""
|
||||
logging.root.log(AUDIT, msg, *args, **kwargs)
|
||||
|
||||
|
||||
class WritableLogger(object):
|
||||
"""A thin wrapper that responds to `write` and logs."""
|
||||
|
||||
def __init__(self, logger, level=logging.DEBUG):
|
||||
self.logger = logger
|
||||
self.level = level
|
||||
|
||||
def write(self, msg):
|
||||
self.logger.log(self.level, msg)
|
||||
|
||||
|
||||
|
||||
@@ -232,45 +232,45 @@ class Service(object):
|
||||
logging.exception(_('model server went away'))
|
||||
|
||||
|
||||
class WsgiService(object):
|
||||
"""Base class for WSGI based services.
|
||||
class WSGIService(object):
|
||||
"""Provides ability to launch API from a 'paste' configuration."""
|
||||
|
||||
For each api you define, you must also define these flags:
|
||||
:<api>_listen: The address on which to listen
|
||||
:<api>_listen_port: The port on which to listen
|
||||
def __init__(self, name, config_name=None):
|
||||
"""Initialize, but do not start, an API service."""
|
||||
self.name = name
|
||||
self._config_name = config_name or FLAGS.api_paste_config
|
||||
self._config_location = self._find_config()
|
||||
self._config = self._load_config()
|
||||
self.application = self._load_application()
|
||||
host = getattr(FLAGS, '%s_listen' % name, "0.0.0.0")
|
||||
port = getattr(FLAGS, '%s_listen_port' % name, 0)
|
||||
self.server = wsgi.Server(name, self.application, host, port)
|
||||
|
||||
"""
|
||||
def _find_config(self):
|
||||
"""Attempt to find 'paste' configuration file."""
|
||||
location = wsgi.paste_config_file(self._config_name)
|
||||
logging.debug(_("Using paste.deploy config at: %s"), location)
|
||||
return location
|
||||
|
||||
def __init__(self, conf, apis):
|
||||
self.conf = conf
|
||||
self.apis = apis
|
||||
self.wsgi_app = None
|
||||
def _load_config(self):
|
||||
"""Read and return the 'paste' configuration file."""
|
||||
return wsgi.load_paste_configuration(self._config_location, self.name)
|
||||
|
||||
def _load_application(self):
|
||||
"""Using the loaded configuration, return the WSGI application."""
|
||||
return wsgi.load_paste_app(self._config_location, self.name)
|
||||
|
||||
def start(self):
|
||||
self.wsgi_app = _run_wsgi(self.conf, self.apis)
|
||||
"""Start serving this API using loaded configuration."""
|
||||
self.server.start()
|
||||
|
||||
def stop(self):
|
||||
"""Stop serving this API."""
|
||||
self.server.stop()
|
||||
|
||||
def wait(self):
|
||||
self.wsgi_app.wait()
|
||||
|
||||
def get_socket_info(self, api_name):
|
||||
"""Returns the (host, port) that an API was started on."""
|
||||
return self.wsgi_app.socket_info[api_name]
|
||||
|
||||
|
||||
class ApiService(WsgiService):
|
||||
"""Class for our nova-api service."""
|
||||
|
||||
@classmethod
|
||||
def create(cls, conf=None):
|
||||
if not conf:
|
||||
conf = wsgi.paste_config_file(FLAGS.api_paste_config)
|
||||
if not conf:
|
||||
message = (_('No paste configuration found for: %s'),
|
||||
FLAGS.api_paste_config)
|
||||
raise exception.Error(message)
|
||||
api_endpoints = ['ec2', 'osapi']
|
||||
service = cls(conf, api_endpoints)
|
||||
return service
|
||||
"""Wait for the service to stop serving this API."""
|
||||
self.server.wait()
|
||||
|
||||
|
||||
def serve(*services):
|
||||
@@ -321,29 +321,3 @@ def serve_wsgi(cls, conf=None):
|
||||
service.start()
|
||||
|
||||
return service
|
||||
|
||||
|
||||
def _run_wsgi(paste_config_file, apis):
|
||||
logging.debug(_('Using paste.deploy config at: %s'), paste_config_file)
|
||||
apps = []
|
||||
for api in apis:
|
||||
config = wsgi.load_paste_configuration(paste_config_file, api)
|
||||
if config is None:
|
||||
logging.debug(_('No paste configuration for app: %s'), api)
|
||||
continue
|
||||
logging.debug(_('App Config: %(api)s\n%(config)r') % locals())
|
||||
logging.info(_('Running %s API'), api)
|
||||
app = wsgi.load_paste_app(paste_config_file, api)
|
||||
apps.append((app,
|
||||
getattr(FLAGS, '%s_listen_port' % api),
|
||||
getattr(FLAGS, '%s_listen' % api),
|
||||
api))
|
||||
if len(apps) == 0:
|
||||
logging.error(_('No known API applications configured in %s.'),
|
||||
paste_config_file)
|
||||
return
|
||||
|
||||
server = wsgi.Server()
|
||||
for app in apps:
|
||||
server.start(*app)
|
||||
return server
|
||||
|
||||
23
nova/test.py
23
nova/test.py
@@ -38,7 +38,6 @@ from nova import flags
|
||||
from nova import rpc
|
||||
from nova import utils
|
||||
from nova import service
|
||||
from nova import wsgi
|
||||
from nova.virt import fake
|
||||
|
||||
|
||||
@@ -81,7 +80,6 @@ class TestCase(unittest.TestCase):
|
||||
self.injected = []
|
||||
self._services = []
|
||||
self._monkey_patch_attach()
|
||||
self._monkey_patch_wsgi()
|
||||
self._original_flags = FLAGS.FlagValuesDict()
|
||||
rpc.ConnectionPool = rpc.Pool(max_size=FLAGS.rpc_conn_pool_size)
|
||||
|
||||
@@ -107,7 +105,6 @@ class TestCase(unittest.TestCase):
|
||||
|
||||
# Reset our monkey-patches
|
||||
rpc.Consumer.attach_to_eventlet = self.original_attach
|
||||
wsgi.Server.start = self.original_start
|
||||
|
||||
# Stop any timers
|
||||
for x in self.injected:
|
||||
@@ -163,26 +160,6 @@ class TestCase(unittest.TestCase):
|
||||
_wrapped.func_name = self.original_attach.func_name
|
||||
rpc.Consumer.attach_to_eventlet = _wrapped
|
||||
|
||||
def _monkey_patch_wsgi(self):
|
||||
"""Allow us to kill servers spawned by wsgi.Server."""
|
||||
self.original_start = wsgi.Server.start
|
||||
|
||||
@functools.wraps(self.original_start)
|
||||
def _wrapped_start(inner_self, *args, **kwargs):
|
||||
original_spawn_n = inner_self.pool.spawn_n
|
||||
|
||||
@functools.wraps(original_spawn_n)
|
||||
def _wrapped_spawn_n(*args, **kwargs):
|
||||
rv = greenthread.spawn(*args, **kwargs)
|
||||
self._services.append(rv)
|
||||
|
||||
inner_self.pool.spawn_n = _wrapped_spawn_n
|
||||
self.original_start(inner_self, *args, **kwargs)
|
||||
inner_self.pool.spawn_n = original_spawn_n
|
||||
|
||||
_wrapped_start.func_name = self.original_start.func_name
|
||||
wsgi.Server.start = _wrapped_start
|
||||
|
||||
# Useful assertions
|
||||
def assertDictMatch(self, d1, d2, approx_equal=False, tolerance=0.001):
|
||||
"""Assert two dicts are equivalent.
|
||||
|
||||
61
nova/wsgi.py
61
nova/wsgi.py
@@ -43,46 +43,45 @@ FLAGS = flags.FLAGS
|
||||
LOG = logging.getLogger('nova.wsgi')
|
||||
|
||||
|
||||
class WritableLogger(object):
|
||||
"""A thin wrapper that responds to `write` and logs."""
|
||||
|
||||
def __init__(self, logger, level=logging.DEBUG):
|
||||
self.logger = logger
|
||||
self.level = level
|
||||
|
||||
def write(self, msg):
|
||||
self.logger.log(self.level, msg)
|
||||
|
||||
|
||||
class Server(object):
|
||||
"""Server class to manage multiple WSGI sockets and applications."""
|
||||
|
||||
def __init__(self, threads=1000):
|
||||
self.pool = eventlet.GreenPool(threads)
|
||||
self.socket_info = {}
|
||||
default_pool_size = 1000
|
||||
logger_name = "eventlet.wsgi.server"
|
||||
|
||||
def start(self, application, port, host='0.0.0.0', key=None, backlog=128):
|
||||
"""Run a WSGI server with the given application."""
|
||||
arg0 = sys.argv[0]
|
||||
logging.audit(_('Starting %(arg0)s on %(host)s:%(port)s') % locals())
|
||||
socket = eventlet.listen((host, port), backlog=backlog)
|
||||
self.pool.spawn_n(self._run, application, socket)
|
||||
if key:
|
||||
self.socket_info[key] = socket.getsockname()
|
||||
def __init__(self, name, app, host, port, pool_size=None):
|
||||
self.name = name
|
||||
self.app = app
|
||||
self.host = host
|
||||
self.port = port
|
||||
self._pool = eventlet.GreenPool(pool_size or self.default_pool_size)
|
||||
self._log = logging.WritableLogger(logging.getLogger(self.logger_name))
|
||||
|
||||
def _start(self, socket):
|
||||
"""Blocking eventlet WSGI server launched from the real 'start'."""
|
||||
eventlet.wsgi.server(socket,
|
||||
self.app,
|
||||
custom_pool=self._pool,
|
||||
log=self._log)
|
||||
|
||||
def start(self, backlog=128):
|
||||
"""Serve given WSGI application using the given parameters."""
|
||||
socket = eventlet.listen((self.host, self.port), backlog=backlog)
|
||||
self._server = eventlet.spawn(self._start, socket)
|
||||
(self.host, self.port) = socket.getsockname()
|
||||
LOG.info(_('Starting %(app)s on %(host)s:%(port)s') % self.__dict__)
|
||||
|
||||
def stop(self):
|
||||
"""Stop this server by killing the greenthread running it."""
|
||||
self._server.kill()
|
||||
|
||||
def wait(self):
|
||||
"""Wait until all servers have completed running."""
|
||||
"""Wait until server has been stopped."""
|
||||
try:
|
||||
self.pool.waitall()
|
||||
self._server.wait()
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
|
||||
def _run(self, application, socket):
|
||||
"""Start a WSGI server in a new green thread."""
|
||||
logger = logging.getLogger('eventlet.wsgi.server')
|
||||
eventlet.wsgi.server(socket, application, custom_pool=self.pool,
|
||||
log=WritableLogger(logger))
|
||||
|
||||
|
||||
class Request(webob.Request):
|
||||
pass
|
||||
@@ -340,6 +339,8 @@ def paste_config_file(basename):
|
||||
if os.path.exists(configfile):
|
||||
return configfile
|
||||
|
||||
raise Exception(_("Unable to find paste.deploy config '%s'") % basename)
|
||||
|
||||
|
||||
def load_paste_configuration(filename, appname):
|
||||
"""Returns a paste configuration dict, or None."""
|
||||
|
||||
Reference in New Issue
Block a user