Refactor congress eventlet server code

This patch removes the current wsgi-server in place of the one that keystone
uses which is basically identical to the one that was already in congress.
The only reason I switched it out for the keystone one is just so copying
the keystone integration out of keystone to congress would be easier.
In addition, this patch adds mutiworker support so can use multiple processes
to run our api server (though currently we just default that to 1).
This patch also includes test cases for the eventlet server.

Change-Id: I25ab083d359a4bd85c93356e5261d31061a7e9aa
Closes-bug: 1339880
This commit is contained in:
Aaron Rosen
2014-07-09 13:40:23 -07:00
parent 2501734d07
commit 23304a92bf
6 changed files with 340 additions and 134 deletions

View File

@@ -1,122 +0,0 @@
# Copyright (c) 2013 VMware, Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import errno
import socket
import sys
import time
import eventlet.wsgi
eventlet.patcher.monkey_patch(all=False, socket=True)
from congress.openstack.common import log as logging
LOG = logging.getLogger(__name__)
# Number of seconds to keep retrying to listen
RETRY_UNTIL_WINDOW = 30
# Sets the value of TCP_KEEPIDLE in seconds for each server socket.
TCP_KEEPIDLE = 600
# Number of backlog requests to configure the socket with
BACKLOG = 4096
class Server(object):
"""Server class to manage multiple WSGI sockets and applications."""
def __init__(self, name, threads=1000, pool=None):
self.pool = pool or eventlet.GreenPool(threads)
self.name = name
def _get_socket(self, host, port, backlog):
bind_addr = (host, port)
# TODO(dims): eventlet's green dns/socket module does not actually
# support IPv6 in getaddrinfo(). We need to get around this in the
# future or monitor upstream for a fix
try:
info = socket.getaddrinfo(bind_addr[0],
bind_addr[1],
socket.AF_UNSPEC,
socket.SOCK_STREAM)[0]
family = info[0]
bind_addr = info[-1]
except Exception:
LOG.error(("Unable to listen on %(host)s:%(port)s") %
{'host': host, 'port': port})
sys.exit(1)
sock = None
retry_until = time.time() + RETRY_UNTIL_WINDOW
while not sock and time.time() < retry_until:
try:
sock = eventlet.listen(bind_addr,
backlog=backlog,
family=family)
except socket.error as err:
if err.errno != errno.EADDRINUSE:
raise
eventlet.sleep(0.1)
if not sock:
raise RuntimeError(("Could not bind to %(host)s:%(port)s "
"after trying for %(time)d seconds") %
{'host': host,
'port': port,
'time': RETRY_UNTIL_WINDOW})
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
# sockets can hang around forever without keepalive
sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
if hasattr(socket, 'TCP_KEEPIDLE'):
sock.setsockopt(socket.IPPROTO_TCP,
socket.TCP_KEEPIDLE,
TCP_KEEPIDLE)
return sock
def start(self, application, port, host='0.0.0.0'):
"""Run a WSGI server with the given application."""
self._host = host
self._port = port
backlog = BACKLOG
self._socket = self._get_socket(self._host,
self._port,
backlog=backlog)
self._server = self.pool.spawn(self._run, application, self._socket)
@property
def host(self):
return self._socket.getsockname()[0] if self._socket else self._host
@property
def port(self):
return self._socket.getsockname()[1] if self._socket else self._port
def stop(self):
self._server.kill()
def wait(self):
"""Wait until all servers have completed running."""
try:
self.pool.waitall()
except KeyboardInterrupt:
pass
def _run(self, application, socket):
"""Start a WSGI server in a new green thread."""
eventlet.wsgi.server(socket, application, custom_pool=self.pool)

View File

@@ -24,7 +24,17 @@ core_opts = [
help="The port to bind to"),
cfg.IntOpt('max_simultaneous_requests', default=1024,
help="Thread pool size for eventlet."),
cfg.BoolOpt('tcp_keepalive', default=False,
help='Set this to true to enable TCP_KEEALIVE socket option '
'on connections received by the API server.'),
cfg.IntOpt('tcp_keepidle',
default=600,
help='Sets the value of TCP_KEEPIDLE in seconds for each '
'server socket. Only applies if tcp_keepalive is '
'true. Not supported on OS X.'),
cfg.IntOpt('api_workers', default=1,
help='The number of worker processes to serve the congress '
'API application.'),
]
# Register the configuration options

View File

@@ -0,0 +1,172 @@
# Copyright 2012 OpenStack Foundation
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# Copyright 2010 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import errno
import re
import socket
import ssl
import sys
import eventlet
import eventlet.wsgi
import greenlet
from congress.openstack.common.gettextutils import _
from congress.openstack.common import log
LOG = log.getLogger(__name__)
class EventletFilteringLogger(log.WritableLogger):
# NOTE(morganfainberg): This logger is designed to filter out specific
# Tracebacks to limit the amount of data that eventlet can log. In the
# case of broken sockets (EPIPE and ECONNRESET), we are seeing a huge
# volume of data being written to the logs due to ~14 lines+ per traceback.
# The traceback in these cases are, at best, useful for limited debugging
# cases.
def __init__(self, *args, **kwargs):
super(EventletFilteringLogger, self).__init__(*args, **kwargs)
self.regex = re.compile(r'errno (%d|%d)' %
(errno.EPIPE, errno.ECONNRESET), re.IGNORECASE)
def write(self, msg):
m = self.regex.search(msg)
if m:
self.logger.log(log.logging.DEBUG, 'Error(%s) writing to socket.',
m.group(1))
else:
self.logger.log(self.level, msg.rstrip())
class Server(object):
"""Server class to manage multiple WSGI sockets and applications."""
def __init__(self, application, host=None, port=None, threads=1000,
keepalive=False, keepidle=None):
self.application = application
self.host = host or '0.0.0.0'
self.port = port or 0
self.pool = eventlet.GreenPool(threads)
self.socket_info = {}
self.greenthread = None
self.do_ssl = False
self.cert_required = False
self.keepalive = keepalive
self.keepidle = keepidle
self.socket = None
def start(self, key=None, backlog=128):
"""Run a WSGI server with the given application."""
if self.socket is None:
self.listen(key=key, backlog=backlog)
self.greenthread = self.pool.spawn(self._run,
self.application,
self.socket)
def listen(self, key=None, backlog=128):
"""Create and start listening on socket.
Call before forking worker processes.
Raises Exception if this has already been called.
"""
if self.socket is not None:
raise Exception(_('Server can only listen once.'))
LOG.info(_('Starting %(arg0)s on %(host)s:%(port)s'),
{'arg0': sys.argv[0],
'host': self.host,
'port': self.port})
# TODO(dims): eventlet's green dns/socket module does not actually
# support IPv6 in getaddrinfo(). We need to get around this in the
# future or monitor upstream for a fix
info = socket.getaddrinfo(self.host,
self.port,
socket.AF_UNSPEC,
socket.SOCK_STREAM)[0]
_socket = eventlet.listen(info[-1],
family=info[0],
backlog=backlog)
if key:
self.socket_info[key] = _socket.getsockname()
# SSL is enabled
if self.do_ssl:
if self.cert_required:
cert_reqs = ssl.CERT_REQUIRED
else:
cert_reqs = ssl.CERT_NONE
sslsocket = eventlet.wrap_ssl(_socket, certfile=self.certfile,
keyfile=self.keyfile,
server_side=True,
cert_reqs=cert_reqs,
ca_certs=self.ca_certs)
_socket = sslsocket
# Optionally enable keepalive on the wsgi socket.
if self.keepalive:
_socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
# This option isn't available in the OS X version of eventlet
if hasattr(socket, 'TCP_KEEPIDLE') and self.keepidle is not None:
_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_KEEPIDLE,
self.keepidle)
self.socket = _socket
def set_ssl(self, certfile, keyfile=None, ca_certs=None,
cert_required=True):
self.certfile = certfile
self.keyfile = keyfile
self.ca_certs = ca_certs
self.cert_required = cert_required
self.do_ssl = True
def kill(self):
if self.greenthread is not None:
self.greenthread.kill()
def stop(self):
self.kill()
def wait(self):
"""Wait until all servers have completed running."""
try:
self.pool.waitall()
except KeyboardInterrupt:
pass
except greenlet.GreenletExit:
pass
def _run(self, application, socket):
"""Start a WSGI server in a new green thread."""
logger = log.getLogger('eventlet.wsgi.server')
try:
eventlet.wsgi.server(socket, application, custom_pool=self.pool,
log=EventletFilteringLogger(logger),
debug=False)
except greenlet.GreenletExit:
# Wait until all servers have completed running
pass
except Exception:
LOG.exception(_('Server error'))
raise

View File

@@ -17,21 +17,77 @@
import eventlet
eventlet.monkey_patch()
import socket
import os.path
from oslo.config import cfg
import sys
from congress.api import application
from congress.api import wsgi
from congress.common import config
from congress.common import eventlet_server
import congress.dse.d6cage
from congress.openstack.common.gettextutils import _
from congress.openstack.common import log as logging
from congress.openstack.common import service
from congress.openstack.common import systemd
LOG = logging.getLogger(__name__)
class ServerWrapper(object):
"""Wraps an eventlet_server with some launching info & capabilities."""
def __init__(self, server, workers):
self.server = server
self.workers = workers
def launch_with(self, launcher):
self.server.listen()
if self.workers > 1:
# Use multi-process launcher
launcher.launch_service(self.server, self.workers)
else:
# Use single process launcher
launcher.launch_service(self.server)
def create_api_server(conf, name, host, port, workers):
api_resource_mgr = application.ResourceManager()
application.initialize_resources(api_resource_mgr)
api_webapp = application.ApiApplication(api_resource_mgr)
congress_api_server = eventlet_server.Server(
api_webapp, host=host, port=port,
keepalive=cfg.CONF.tcp_keepalive,
keepidle=cfg.CONF.tcp_keepidle)
#TODO(arosen) - add ssl support here.
return name, ServerWrapper(congress_api_server, workers)
def serve(*servers):
if max([server[1].workers for server in servers]) > 1:
#TODO(arosen) - need to provide way to communicate with DSE services
launcher = service.ProcessLauncher()
else:
launcher = service.ServiceLauncher()
for name, server in servers:
try:
server.launch_with(launcher)
except socket.error:
LOG.exception(_('Failed to start the %(name)s server') % {
'name': name})
raise
# notify calling process we are ready to serve
systemd.notify_once()
for name, server in servers:
launcher.wait()
class EventLoop(object):
"""Wrapper for eventlet pool and DSE constructs used by services.
@@ -83,7 +139,6 @@ def main():
config.setup_logging()
LOG.info("Starting congress server")
loop = EventLoop(cfg.CONF.max_simultaneous_requests)
# TODO(pballand): Fix the policy enginge registration to work with the
# latest policy changes.
# engine = loop.register_service(
@@ -94,15 +149,15 @@ def main():
# API resource runtime encapsulation:
# event loop -> wsgi server -> webapp -> resource manager
wsgi_server = wsgi.Server("Congress API Broker", pool=loop.pool)
api_resource_mgr = application.ResourceManager()
application.initialize_resources(api_resource_mgr)
api_webapp = application.ApiApplication(api_resource_mgr)
# TODO(pballand): start this inside d6cage(?)
wsgi_server.start(api_webapp, cfg.CONF.bind_port,
cfg.CONF.bind_host)
loop.wait()
#TODO(arosen): find api-paste.conf for keystonemiddleware
paste_config = None
servers = []
servers.append(create_api_server(paste_config,
"congress-api-server",
cfg.CONF.bind_host,
cfg.CONF.bind_port,
cfg.CONF.api_workers))
serve(*servers)
if __name__ == '__main__':

View File

@@ -28,3 +28,6 @@ class ConfigurationTest(testtools.TestCase):
def test_defaults(self):
self.assertEqual('0.0.0.0', cfg.CONF.bind_host)
self.assertEqual(8080, cfg.CONF.bind_port)
self.assertEqual(False, cfg.CONF.tcp_keepalive)
self.assertEqual(600, cfg.CONF.tcp_keepidle)
self.assertEqual(1, cfg.CONF.api_workers)

View File

@@ -0,0 +1,88 @@
# Copyright (c) 2014 VMware
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import socket
import testtools
import mock
from congress.common import config
from congress.common import eventlet_server
class ServerTest(testtools.TestCase):
def setUp(self):
super(ServerTest, self).setUp()
self.host = '127.0.0.1'
self.port = '1234'
#FIXME(arosen) - we need to inherit from a base class that does this.
config.setup_logging()
@mock.patch('eventlet.listen')
@mock.patch('socket.getaddrinfo')
def test_keepalive_unset(self, mock_getaddrinfo, mock_listen):
mock_getaddrinfo.return_value = [(1, 2, 3, 4, 5)]
mock_sock = mock.Mock()
mock_sock.setsockopt = mock.Mock()
mock_listen.return_value = mock_sock
server = eventlet_server.Server(mock.MagicMock(), host=self.host,
port=self.port)
server.start()
self.assertTrue(mock_listen.called)
self.assertFalse(mock_sock.setsockopt.called)
@mock.patch('eventlet.listen')
@mock.patch('socket.getaddrinfo')
def test_keepalive_set(self, mock_getaddrinfo, mock_listen):
mock_getaddrinfo.return_value = [(1, 2, 3, 4, 5)]
mock_sock = mock.Mock()
mock_sock.setsockopt = mock.Mock()
mock_listen.return_value = mock_sock
server = eventlet_server.Server(mock.MagicMock(), host=self.host,
port=self.port, keepalive=True)
server.start()
mock_sock.setsockopt.assert_called_once_with(socket.SOL_SOCKET,
socket.SO_KEEPALIVE,
1)
self.assertTrue(mock_listen.called)
@mock.patch('eventlet.listen')
@mock.patch('socket.getaddrinfo')
def test_keepalive_and_keepidle_set(self, mock_getaddrinfo, mock_listen):
mock_getaddrinfo.return_value = [(1, 2, 3, 4, 5)]
mock_sock = mock.Mock()
mock_sock.setsockopt = mock.Mock()
mock_listen.return_value = mock_sock
server = eventlet_server.Server(mock.MagicMock(), host=self.host,
port=self.port, keepalive=True,
keepidle=1)
server.start()
# keepidle isn't available in the OS X version of eventlet
if hasattr(socket, 'TCP_KEEPIDLE'):
self.assertEqual(mock_sock.setsockopt.call_count, 2)
# Test the last set of call args i.e. for the keepidle
mock_sock.setsockopt.assert_called_with(socket.IPPROTO_TCP,
socket.TCP_KEEPIDLE,
1)
else:
self.assertEqual(mock_sock.setsockopt.call_count, 1)
self.assertTrue(mock_listen.called)