From a49fd3d8def23bfb49e605fea372c82d1ec6cdff Mon Sep 17 00:00:00 2001 From: Samuel Merritt Date: Fri, 13 Dec 2013 13:11:01 -0800 Subject: [PATCH] Force catch_errors into pipeline This commit adds a hook for WSGI applications (e.g. proxy.server.Application) to modify their WSGI pipelines. This is currently used by the proxy server to ensure that catch_errors is present; if it is missing, it is inserted as the first middleware in the pipeline. This lets us write new, mandatory middlewares for Swift without breaking existing deployments on upgrade. Change-Id: Ibed0f2edb6f80c25be182b3d4544e6a67c5050ad --- swift/common/wsgi.py | 107 ++++++++++++- swift/proxy/server.py | 47 ++++++ test/unit/common/test_wsgi.py | 282 ++++++++++++++++++++++++++++++---- 3 files changed, 407 insertions(+), 29 deletions(-) diff --git a/swift/common/wsgi.py b/swift/common/wsgi.py index c25425348f..e61e1545c5 100644 --- a/swift/common/wsgi.py +++ b/swift/common/wsgi.py @@ -16,6 +16,7 @@ """WSGI tools for use with swift.""" import errno +import inspect import os import signal import time @@ -109,7 +110,6 @@ def wrap_conf_type(f): appconfig = wrap_conf_type(loadwsgi.appconfig) -loadapp = wrap_conf_type(loadwsgi.loadapp) def monkey_patch_mimetools(): @@ -197,6 +197,111 @@ class RestrictedGreenPool(GreenPool): self.waitall() +class PipelineWrapper(object): + """ + This class provides a number of utility methods for + modifying the composition of a wsgi pipeline. + """ + + def __init__(self, context): + self.context = context + + def __contains__(self, entry_point_name): + try: + self.index(entry_point_name) + return True + except ValueError: + return False + + def _format_for_display(self, ctx): + if ctx.entry_point_name: + return ctx.entry_point_name + elif inspect.isfunction(ctx.object): + # ctx.object is a reference to the actual filter_factory + # function, so we pretty-print that. It's not the nice short + # entry point, but it beats "". + # + # These happen when, instead of something like + # + # use = egg:swift#healthcheck + # + # you have something like this: + # + # paste.filter_factory = \ + # swift.common.middleware.healthcheck:filter_factory + return "%s:%s" % (inspect.getmodule(ctx.object).__name__, + ctx.object.__name__) + else: + # No idea what this is + return "" + + def __str__(self): + parts = [self._format_for_display(ctx) + for ctx in self.context.filter_contexts] + parts.append(self._format_for_display(self.context.app_context)) + return " ".join(parts) + + def create_filter(self, entry_point_name): + """ + Creates a context for a filter that can subsequently be added + to a pipeline context. + + :param entry_point_name: entry point of the middleware (Swift only) + + :returns: a filter context + """ + spec = 'egg:swift#' + entry_point_name + ctx = loadwsgi.loadcontext(loadwsgi.FILTER, spec, + global_conf=self.context.global_conf) + ctx.protocol = 'paste.filter_factory' + return ctx + + def index(self, entry_point_name): + """ + Returns the first index of the given entry point name in the pipeline. + + Raises ValueError if the given module is not in the pipeline. + """ + for i, ctx in enumerate(self.context.filter_contexts): + if ctx.entry_point_name == entry_point_name: + return i + raise ValueError("%s is not in pipeline" % (entry_point_name,)) + + def insert_filter(self, ctx, index=0): + """ + Inserts a filter module into the pipeline context. + + :param ctx: the context to be inserted + :param index: (optional) index at which filter should be + inserted in the list of pipeline filters. Default + is 0, which means the start of the pipeline. + """ + self.context.filter_contexts.insert(index, ctx) + + +def loadcontext(object_type, uri, name=None, relative_to=None, + global_conf=None): + add_conf_type = wrap_conf_type(lambda x: x) + return loadwsgi.loadcontext(object_type, add_conf_type(uri), name=name, + relative_to=relative_to, + global_conf=global_conf) + + +def loadapp(conf_file, global_conf): + """ + Loads a context from a config file, and if the context is a pipeline + then presents the app with the opportunity to modify the pipeline. + """ + ctx = loadcontext(loadwsgi.APP, conf_file, global_conf=global_conf) + if ctx.object_type.name == 'pipeline': + # give app the opportunity to modify the pipeline context + app = ctx.app_context.create() + func = getattr(app, 'modify_wsgi_pipeline', None) + if func: + func(PipelineWrapper(ctx)) + return ctx.create() + + def run_server(conf, logger, sock, global_conf=None): # Ensure TZ environment variable exists to avoid stat('/etc/localtime') on # some platforms. This locks in reported times to the timezone in which diff --git a/swift/proxy/server.py b/swift/proxy/server.py index ee6890e7e4..1d5f968d3c 100644 --- a/swift/proxy/server.py +++ b/swift/proxy/server.py @@ -38,6 +38,22 @@ from swift.common.swob import HTTPBadRequest, HTTPForbidden, \ HTTPServerError, HTTPException, Request +# List of entry points for mandatory middlewares. +# +# Fields: +# +# "name" (required) is the entry point name from setup.py. +# +# "after" (optional) is a list of middlewares that this middleware should come +# after. Default is for the middleware to go at the start of the pipeline. Any +# middlewares in the "after" list that are not present in the pipeline will be +# ignored, so you can safely name optional middlewares to come after. For +# example, 'after: ["catch_errors", "bulk"]' would install this middleware +# after catch_errors and bulk if both were present, but if bulk were absent, +# would just install it after catch_errors. +required_filters = [{'name': 'catch_errors'}] + + class Application(object): """WSGI application for the proxy server.""" @@ -478,6 +494,37 @@ class Application(object): {'type': typ, 'ip': node['ip'], 'port': node['port'], 'device': node['device'], 'info': additional_info}) + def modify_wsgi_pipeline(self, pipe): + """ + Called during WSGI pipeline creation. Modifies the WSGI pipeline + context to ensure that mandatory middleware is present in the pipeline. + + :param pipe: A PipelineWrapper object + """ + pipeline_was_modified = False + for filter_spec in reversed(required_filters): + filter_name = filter_spec['name'] + if filter_name not in pipe: + afters = filter_spec.get('after', []) + insert_at = 0 + for after in afters: + try: + insert_at = max(insert_at, pipe.index(after) + 1) + except ValueError: # not in pipeline; ignore it + pass + self.logger.info( + 'Adding required filter %s to pipeline at position %d' % + (filter_name, insert_at)) + ctx = pipe.create_filter(filter_name) + pipe.insert_filter(ctx, index=insert_at) + pipeline_was_modified = True + + if pipeline_was_modified: + self.logger.info("Pipeline was modified. New pipeline is \"%s\".", + pipe) + else: + self.logger.debug("Pipeline is \"%s\"", pipe) + def app_factory(global_conf, **local_conf): """paste.deploy app factory for creating WSGI proxy apps.""" diff --git a/test/unit/common/test_wsgi.py b/test/unit/common/test_wsgi.py index 1d92193fb9..d1e8f5130c 100644 --- a/test/unit/common/test_wsgi.py +++ b/test/unit/common/test_wsgi.py @@ -42,7 +42,7 @@ from swift.common import wsgi, utils, ring from test.unit import temptree -from mock import patch +from paste.deploy import loadwsgi def _fake_rings(tmpdir): @@ -126,14 +126,11 @@ class TestWSGI(unittest.TestCase): swift_dir = TEMPDIR [pipeline:main] - pipeline = catch_errors proxy-server + pipeline = proxy-server [app:proxy-server] use = egg:swift#proxy conn_timeout = 0.2 - - [filter:catch_errors] - use = egg:swift#catch_errors """ contents = dedent(config) with temptree(['proxy-server.conf']) as t: @@ -143,7 +140,7 @@ class TestWSGI(unittest.TestCase): _fake_rings(t) app, conf, logger, log_name = wsgi.init_request_processor( conf_file, 'proxy-server') - # verify pipeline is catch_errors -> proxy-servery + # verify pipeline is catch_errors -> proxy-server expected = swift.common.middleware.catch_errors.CatchErrorMiddleware self.assert_(isinstance(app, expected)) self.assert_(isinstance(app.app, swift.proxy.server.Application)) @@ -179,14 +176,15 @@ class TestWSGI(unittest.TestCase): } # strip indent from test config contents config_dir = dict((f, dedent(c)) for (f, c) in config_dir.items()) - with temptree(*zip(*config_dir.items())) as conf_root: - conf_dir = os.path.join(conf_root, 'proxy-server.conf.d') - with open(os.path.join(conf_dir, 'swift.conf'), 'w') as f: - f.write('[DEFAULT]\nswift_dir = %s' % conf_root) - _fake_rings(conf_root) - app, conf, logger, log_name = wsgi.init_request_processor( - conf_dir, 'proxy-server') - # verify pipeline is catch_errors -> proxy-servery + with mock.patch('swift.proxy.server.Application.modify_wsgi_pipeline'): + with temptree(*zip(*config_dir.items())) as conf_root: + conf_dir = os.path.join(conf_root, 'proxy-server.conf.d') + with open(os.path.join(conf_dir, 'swift.conf'), 'w') as f: + f.write('[DEFAULT]\nswift_dir = %s' % conf_root) + _fake_rings(conf_root) + app, conf, logger, log_name = wsgi.init_request_processor( + conf_dir, 'proxy-server') + # verify pipeline is catch_errors -> proxy-server expected = swift.common.middleware.catch_errors.CatchErrorMiddleware self.assert_(isinstance(app, expected)) self.assert_(isinstance(app.app, swift.proxy.server.Application)) @@ -333,12 +331,14 @@ class TestWSGI(unittest.TestCase): with open(conf_file, 'w') as f: f.write(contents.replace('TEMPDIR', t)) _fake_rings(t) - with patch('swift.common.wsgi.wsgi') as _wsgi: - with patch('swift.common.wsgi.eventlet') as _eventlet: - conf = wsgi.appconfig(conf_file) - logger = logging.getLogger('test') - sock = listen(('localhost', 0)) - wsgi.run_server(conf, logger, sock) + with mock.patch('swift.proxy.server.Application.' + 'modify_wsgi_pipeline'): + with mock.patch('swift.common.wsgi.wsgi') as _wsgi: + with mock.patch('swift.common.wsgi.eventlet') as _eventlet: + conf = wsgi.appconfig(conf_file) + logger = logging.getLogger('test') + sock = listen(('localhost', 0)) + wsgi.run_server(conf, logger, sock) self.assertEquals('HTTP/1.0', _wsgi.HttpProtocol.default_request_version) self.assertEquals(30, _wsgi.WRITE_TIMEOUT) @@ -379,14 +379,16 @@ class TestWSGI(unittest.TestCase): with open(os.path.join(conf_dir, 'swift.conf'), 'w') as f: f.write('[DEFAULT]\nswift_dir = %s' % conf_root) _fake_rings(conf_root) - with patch('swift.common.wsgi.wsgi') as _wsgi: - with patch('swift.common.wsgi.eventlet') as _eventlet: - with patch.dict('os.environ', {'TZ': ''}): - conf = wsgi.appconfig(conf_dir) - logger = logging.getLogger('test') - sock = listen(('localhost', 0)) - wsgi.run_server(conf, logger, sock) - self.assert_(os.environ['TZ'] is not '') + with mock.patch('swift.proxy.server.Application.' + 'modify_wsgi_pipeline'): + with mock.patch('swift.common.wsgi.wsgi') as _wsgi: + with mock.patch('swift.common.wsgi.eventlet') as _eventlet: + with mock.patch.dict('os.environ', {'TZ': ''}): + conf = wsgi.appconfig(conf_dir) + logger = logging.getLogger('test') + sock = listen(('localhost', 0)) + wsgi.run_server(conf, logger, sock) + self.assert_(os.environ['TZ'] is not '') self.assertEquals('HTTP/1.0', _wsgi.HttpProtocol.default_request_version) @@ -667,5 +669,229 @@ class TestWSGIContext(unittest.TestCase): self.assertRaises(StopIteration, iterator.next) +class TestPipelineWrapper(unittest.TestCase): + + def setUp(self): + config = """ + [DEFAULT] + swift_dir = TEMPDIR + + [pipeline:main] + pipeline = healthcheck catch_errors tempurl proxy-server + + [app:proxy-server] + use = egg:swift#proxy + conn_timeout = 0.2 + + [filter:catch_errors] + use = egg:swift#catch_errors + + [filter:healthcheck] + use = egg:swift#healthcheck + + [filter:tempurl] + paste.filter_factory = swift.common.middleware.tempurl:filter_factory + """ + + contents = dedent(config) + with temptree(['proxy-server.conf']) as t: + conf_file = os.path.join(t, 'proxy-server.conf') + with open(conf_file, 'w') as f: + f.write(contents.replace('TEMPDIR', t)) + ctx = wsgi.loadcontext(loadwsgi.APP, conf_file, global_conf={}) + self.pipe = wsgi.PipelineWrapper(ctx) + + def _entry_point_names(self): + # Helper method to return a list of the entry point names for the + # filters in the pipeline. + return [c.entry_point_name for c in self.pipe.context.filter_contexts] + + def test_insert_filter(self): + original_modules = ['healthcheck', 'catch_errors', None] + self.assertEqual(self._entry_point_names(), original_modules) + + self.pipe.insert_filter(self.pipe.create_filter('catch_errors')) + expected_modules = ['catch_errors', 'healthcheck', + 'catch_errors', None] + self.assertEqual(self._entry_point_names(), expected_modules) + + def test_str(self): + self.assertEqual( + str(self.pipe), + "healthcheck catch_errors " + + "swift.common.middleware.tempurl:filter_factory proxy") + + def test_str_unknown_filter(self): + self.pipe.context.filter_contexts[0].entry_point_name = None + self.pipe.context.filter_contexts[0].object = 'mysterious' + self.assertEqual( + str(self.pipe), + " catch_errors " + + "swift.common.middleware.tempurl:filter_factory proxy") + + +class TestPipelineModification(unittest.TestCase): + def pipeline_modules(self, app): + # This is rather brittle; it'll break if a middleware stores its app + # anywhere other than an attribute named "app", but it works for now. + pipe = [] + for _ in xrange(1000): + pipe.append(app.__class__.__module__) + if not hasattr(app, 'app'): + break + app = app.app + return pipe + + def test_load_app(self): + config = """ + [DEFAULT] + swift_dir = TEMPDIR + + [pipeline:main] + pipeline = healthcheck proxy-server + + [app:proxy-server] + use = egg:swift#proxy + conn_timeout = 0.2 + + [filter:catch_errors] + use = egg:swift#catch_errors + + [filter:healthcheck] + use = egg:swift#healthcheck + """ + + def modify_func(app, pipe): + new = pipe.create_filter('catch_errors') + pipe.insert_filter(new) + + contents = dedent(config) + with temptree(['proxy-server.conf']) as t: + conf_file = os.path.join(t, 'proxy-server.conf') + with open(conf_file, 'w') as f: + f.write(contents.replace('TEMPDIR', t)) + _fake_rings(t) + with mock.patch( + 'swift.proxy.server.Application.modify_wsgi_pipeline', + modify_func): + app = wsgi.loadapp(conf_file, global_conf={}) + exp = swift.common.middleware.catch_errors.CatchErrorMiddleware + self.assertTrue(isinstance(app, exp), app) + exp = swift.common.middleware.healthcheck.HealthCheckMiddleware + self.assertTrue(isinstance(app.app, exp), app.app) + exp = swift.proxy.server.Application + self.assertTrue(isinstance(app.app.app, exp), app.app.app) + + def test_proxy_unmodified_wsgi_pipeline(self): + # Make sure things are sane even when we modify nothing + config = """ + [DEFAULT] + swift_dir = TEMPDIR + + [pipeline:main] + pipeline = catch_errors proxy-server + + [app:proxy-server] + use = egg:swift#proxy + conn_timeout = 0.2 + + [filter:catch_errors] + use = egg:swift#catch_errors + """ + + contents = dedent(config) + with temptree(['proxy-server.conf']) as t: + conf_file = os.path.join(t, 'proxy-server.conf') + with open(conf_file, 'w') as f: + f.write(contents.replace('TEMPDIR', t)) + _fake_rings(t) + app = wsgi.loadapp(conf_file, global_conf={}) + + self.assertEqual(self.pipeline_modules(app), + ['swift.common.middleware.catch_errors', + 'swift.proxy.server']) + + def test_proxy_modify_wsgi_pipeline(self): + config = """ + [DEFAULT] + swift_dir = TEMPDIR + + [pipeline:main] + pipeline = healthcheck proxy-server + + [app:proxy-server] + use = egg:swift#proxy + conn_timeout = 0.2 + + [filter:healthcheck] + use = egg:swift#healthcheck + """ + + contents = dedent(config) + with temptree(['proxy-server.conf']) as t: + conf_file = os.path.join(t, 'proxy-server.conf') + with open(conf_file, 'w') as f: + f.write(contents.replace('TEMPDIR', t)) + _fake_rings(t) + app = wsgi.loadapp(conf_file, global_conf={}) + + self.assertEqual(self.pipeline_modules(app)[0], + 'swift.common.middleware.catch_errors') + + def test_proxy_modify_wsgi_pipeline_ordering(self): + config = """ + [DEFAULT] + swift_dir = TEMPDIR + + [pipeline:main] + pipeline = healthcheck proxy-logging bulk tempurl proxy-server + + [app:proxy-server] + use = egg:swift#proxy + conn_timeout = 0.2 + + [filter:healthcheck] + use = egg:swift#healthcheck + + [filter:proxy-logging] + use = egg:swift#proxy_logging + + [filter:bulk] + use = egg:swift#bulk + + [filter:tempurl] + use = egg:swift#tempurl + """ + + new_req_filters = [ + # not in pipeline, no afters + {'name': 'catch_errors'}, + # already in pipeline + {'name': 'proxy_logging', + 'after': ['catch_errors']}, + # not in pipeline, comes after more than one thing + {'name': 'container_quotas', + 'after': ['catch_errors', 'bulk']}] + + contents = dedent(config) + with temptree(['proxy-server.conf']) as t: + conf_file = os.path.join(t, 'proxy-server.conf') + with open(conf_file, 'w') as f: + f.write(contents.replace('TEMPDIR', t)) + _fake_rings(t) + with mock.patch.object(swift.proxy.server, 'required_filters', + new_req_filters): + app = wsgi.loadapp(conf_file, global_conf={}) + + self.assertEqual(self.pipeline_modules(app), [ + 'swift.common.middleware.catch_errors', + 'swift.common.middleware.healthcheck', + 'swift.common.middleware.proxy_logging', + 'swift.common.middleware.bulk', + 'swift.common.middleware.container_quotas', + 'swift.common.middleware.tempurl', + 'swift.proxy.server']) + + if __name__ == '__main__': unittest.main()