Get rid of pipeline_property

Instead, ensure every middleware gets a reference to the final WSGI
application. Note that this reimplements much of paste.deploy's pipeline
handling, but that code hasn't changed in years, anyway.

Change-Id: I2fbb21cabf72849ce84760a6d2607aa2af67f286
This commit is contained in:
Tim Burke 2022-01-06 12:12:06 -08:00
parent 8ac63b7609
commit 9bc1c008a5
9 changed files with 98 additions and 170 deletions

View File

@ -25,14 +25,13 @@ import zlib
from time import gmtime, strftime, time
from zlib import compressobj
from swift.common.constraints import AUTO_CREATE_ACCOUNT_PREFIX
from swift.common.exceptions import ClientException
from swift.common.http import (HTTP_NOT_FOUND, HTTP_MULTIPLE_CHOICES,
is_client_error, is_server_error)
from swift.common.request_helpers import USE_REPLICATION_NETWORK_HEADER
from swift.common.swob import Request, bytes_to_wsgi
from swift.common.utils import quote, close_if_possible, drain_and_close
from swift.common.wsgi import loadapp, pipeline_property
from swift.common.wsgi import loadapp
if six.PY3:
from eventlet.green.urllib import request as urllib2
@ -148,24 +147,24 @@ class InternalClient(object):
:param global_conf: a dict of options to update the loaded proxy config.
Options in ``global_conf`` will override those in ``conf_path`` except
where the ``conf_path`` option is preceded by ``set``.
:param app: Optionally provide a WSGI app for the internal client to use.
"""
def __init__(self, conf_path, user_agent, request_tries,
allow_modify_pipeline=False, use_replication_network=False,
global_conf=None):
global_conf=None, app=None):
if request_tries < 1:
raise ValueError('request_tries must be positive')
self.app = loadapp(conf_path, global_conf=global_conf,
allow_modify_pipeline=allow_modify_pipeline,)
self.app = app or loadapp(conf_path, global_conf=global_conf,
allow_modify_pipeline=allow_modify_pipeline,)
self.user_agent = user_agent
self.request_tries = request_tries
self.use_replication_network = use_replication_network
get_object_ring = pipeline_property('get_object_ring')
container_ring = pipeline_property('container_ring')
account_ring = pipeline_property('account_ring')
auto_create_account_prefix = pipeline_property(
'auto_create_account_prefix', default=AUTO_CREATE_ACCOUNT_PREFIX)
self.get_object_ring = self.app._pipeline_final_app.get_object_ring
self.container_ring = self.app._pipeline_final_app.container_ring
self.account_ring = self.app._pipeline_final_app.account_ring
self.auto_create_account_prefix = \
self.app._pipeline_final_app.auto_create_account_prefix
def make_request(
self, method, path, headers, acceptable_statuses, body_file=None,

View File

@ -237,47 +237,6 @@ class RestrictedGreenPool(GreenPool):
self.waitall()
def pipeline_property(name, **kwargs):
"""
Create a property accessor for the given name. The property will
dig through the bound instance on which it was accessed for an
attribute "app" and check that object for an attribute of the given
name. If the "app" object does not have such an attribute, it will
look for an attribute "app" on THAT object and continue it's search
from there. If the named attribute cannot be found accessing the
property will raise AttributeError.
If a default kwarg is provided you get that instead of the
AttributeError. When found the attribute will be cached on instance
with the property accessor using the same name as the attribute
prefixed with a leading underscore.
"""
cache_attr_name = '_%s' % name
def getter(self):
cached_value = getattr(self, cache_attr_name, None)
if cached_value:
return cached_value
app = self # first app is on self
while True:
app = getattr(app, 'app', None)
if not app:
break
try:
value = getattr(app, name)
except AttributeError:
continue
setattr(self, cache_attr_name, value)
return value
if 'default' in kwargs:
return kwargs['default']
raise AttributeError('No apps in pipeline have a '
'%s attribute' % name)
return property(getter)
class PipelineWrapper(object):
"""
This class provides a number of utility methods for
@ -375,13 +334,6 @@ def loadcontext(object_type, uri, name=None, relative_to=None,
global_conf=global_conf)
def _add_pipeline_properties(app, *names):
for property_name in names:
if not hasattr(app, property_name):
setattr(app.__class__, property_name,
pipeline_property(property_name))
def loadapp(conf_file, global_conf=None, allow_modify_pipeline=True):
"""
Loads a context from a config file, and if the context is a pipeline
@ -400,13 +352,17 @@ def loadapp(conf_file, global_conf=None, allow_modify_pipeline=True):
ctx = loadcontext(loadwsgi.APP, conf_file, global_conf=global_conf)
if ctx.object_type.name == 'pipeline':
# give app the opportunity to modify the pipeline context
app = ctx.app_context.create()
func = getattr(app, 'modify_wsgi_pipeline', None)
ultimate_app = ctx.app_context.create()
func = getattr(ultimate_app, 'modify_wsgi_pipeline', None)
if func and allow_modify_pipeline:
func(PipelineWrapper(ctx))
# cache the freshly created app so we don't have to redo
# initialization checks and log startup messages again
ctx.app_context.create = lambda: app
filters = [c.create() for c in reversed(ctx.filter_contexts)]
app = ultimate_app
app._pipeline_final_app = ultimate_app
for filter_app in filters:
app = filter_app(app)
app._pipeline_final_app = ultimate_app
return app
return ctx.create()

View File

@ -890,7 +890,7 @@ class TestContainerSyncAndVersioning(BaseTestContainerSync):
int_client = self.make_internal_client()
# TODO: what a terrible hack, maybe we need to extend internal
# client to allow caller to become a swift_owner??
int_client.app.app.app.app.swift_owner_headers = []
int_client.app._pipeline_final_app.swift_owner_headers = []
int_client.set_container_metadata(self.account, container_name,
metadata=sync_headers)

View File

@ -93,6 +93,8 @@ class FakeSwift(object):
self.account_ring = FakeRing()
self.container_ring = FakeRing()
self.get_object_ring = lambda policy_index: FakeRing()
self.auto_create_account_prefix = '.'
self._pipeline_final_app = self
def _find_response(self, method, path):
path = normalize_path(path)

View File

@ -86,9 +86,7 @@ def make_path_info(account, container=None, obj=None):
def get_client_app():
app = FakeSwift()
with mock.patch('swift.common.internal_client.loadapp',
new=lambda *args, **kwargs: app):
client = internal_client.InternalClient({}, 'test', 1)
client = internal_client.InternalClient({}, 'test', 1, app=app)
return client, app
@ -380,7 +378,7 @@ class TestInternalClient(unittest.TestCase):
def test_init(self):
conf_path = 'some_path'
app = object()
app = FakeSwift()
user_agent = 'some_user_agent'
request_tries = 123
@ -405,28 +403,18 @@ class TestInternalClient(unittest.TestCase):
self.assertEqual(request_tries, client.request_tries)
self.assertFalse(client.use_replication_network)
with mock.patch.object(
internal_client, 'loadapp', return_value=app) as mock_loadapp:
client = internal_client.InternalClient(
conf_path, user_agent, request_tries,
use_replication_network=True)
mock_loadapp.assert_called_once_with(
conf_path, global_conf=None, allow_modify_pipeline=False)
client = internal_client.InternalClient(
conf_path, user_agent, request_tries, app=app,
use_replication_network=True)
self.assertEqual(app, client.app)
self.assertEqual(user_agent, client.user_agent)
self.assertEqual(request_tries, client.request_tries)
self.assertTrue(client.use_replication_network)
global_conf = {'log_name': 'custom'}
with mock.patch.object(
internal_client, 'loadapp', return_value=app) as mock_loadapp:
client = internal_client.InternalClient(
conf_path, user_agent, request_tries,
use_replication_network=True, global_conf=global_conf)
mock_loadapp.assert_called_once_with(
conf_path, global_conf=global_conf, allow_modify_pipeline=False)
client = internal_client.InternalClient(
conf_path, user_agent, request_tries, app=app,
use_replication_network=True, global_conf=global_conf)
self.assertEqual(app, client.app)
self.assertEqual(user_agent, client.user_agent)
self.assertEqual(request_tries, client.request_tries)

View File

@ -1942,6 +1942,10 @@ class TestPipelineModification(unittest.TestCase):
self.assertTrue(isinstance(app.app, exp), app.app)
exp = swift.proxy.server.Application
self.assertTrue(isinstance(app.app.app, exp), app.app.app)
# Everybody gets a reference to the final app, too
self.assertIs(app.app.app, app._pipeline_final_app)
self.assertIs(app.app.app, app.app._pipeline_final_app)
self.assertIs(app.app.app, app.app.app._pipeline_final_app)
# make sure you can turn off the pipeline modification if you want
def blow_up(*_, **__):
@ -2399,7 +2403,7 @@ class TestPipelineModification(unittest.TestCase):
tempdir, policy.ring_name + '.ring.gz')
app = wsgi.loadapp(conf_path)
proxy_app = app.app.app.app.app.app.app.app
proxy_app = app._pipeline_final_app
self.assertEqual(proxy_app.account_ring.serialized_path,
account_ring_path)
self.assertEqual(proxy_app.container_ring.serialized_path,
@ -2431,36 +2435,6 @@ class TestPipelineModification(unittest.TestCase):
app = wsgi.loadapp(conf_path)
self.assertTrue(isinstance(app, controller))
def test_pipeline_property(self):
depth = 3
class FakeApp(object):
pass
class AppFilter(object):
def __init__(self, app):
self.app = app
# make a pipeline
app = FakeApp()
filtered_app = app
for i in range(depth):
filtered_app = AppFilter(filtered_app)
# AttributeError if no apps in the pipeline have attribute
wsgi._add_pipeline_properties(filtered_app, 'foo')
self.assertRaises(AttributeError, getattr, filtered_app, 'foo')
# set the attribute
self.assertTrue(isinstance(app, FakeApp))
app.foo = 'bar'
self.assertEqual(filtered_app.foo, 'bar')
# attribute is cached
app.foo = 'baz'
self.assertEqual(filtered_app.foo, 'bar')
if __name__ == '__main__':
unittest.main()

View File

@ -105,6 +105,7 @@ class FakeInternalClient(reconciler.InternalClient):
self.request_tries = 1
self.use_replication_network = True
self.parse(listings)
self.container_ring = FakeRing()
def parse(self, listings):
listings = listings or {}

View File

@ -101,10 +101,8 @@ class TestObjectExpirer(TestCase):
def setUp(self):
global not_sleep
self.old_loadapp = internal_client.loadapp
self.old_sleep = internal_client.sleep
internal_client.loadapp = lambda *a, **kw: None
internal_client.sleep = not_sleep
self.rcache = mkdtemp()
@ -149,17 +147,28 @@ class TestObjectExpirer(TestCase):
)
]
def make_fake_ic(self, app):
app._pipeline_final_app = mock.MagicMock()
return internal_client.InternalClient(None, 'fake-ic', 1, app=app)
def tearDown(self):
rmtree(self.rcache)
internal_client.sleep = self.old_sleep
internal_client.loadapp = self.old_loadapp
def test_init(self):
x = expirer.ObjectExpirer({}, logger=self.logger)
with mock.patch.object(expirer, 'InternalClient',
return_value=self.fake_swift) as mock_ic:
x = expirer.ObjectExpirer({}, logger=self.logger)
self.assertEqual(mock_ic.mock_calls, [mock.call(
'/etc/swift/object-expirer.conf', 'Swift Object Expirer', 3,
use_replication_network=True,
global_conf={'log_name': 'object-expirer-ic'})])
self.assertEqual(self.logger.get_lines_for_level('warning'), [])
self.assertEqual(x.expiring_objects_account, '.expiring_objects')
self.assertIs(x.swift, self.fake_swift)
x = expirer.ObjectExpirer({'auto_create_account_prefix': '-'},
logger=self.logger)
logger=self.logger, swift=self.fake_swift)
self.assertEqual(self.logger.get_lines_for_level('warning'), [
'Option auto_create_account_prefix is deprecated. '
'Configure auto_create_account_prefix under the '
@ -185,7 +194,7 @@ class TestObjectExpirer(TestCase):
'my-object-expirer-ic')
def test_get_process_values_from_kwargs(self):
x = expirer.ObjectExpirer({})
x = expirer.ObjectExpirer({}, swift=self.fake_swift)
vals = {
'processes': 5,
'process': 1,
@ -199,7 +208,7 @@ class TestObjectExpirer(TestCase):
'processes': 5,
'process': 1,
}
x = expirer.ObjectExpirer(vals)
x = expirer.ObjectExpirer(vals, swift=self.fake_swift)
x.get_process_values({})
self.assertEqual(x.processes, 5)
self.assertEqual(x.process, 1)
@ -210,14 +219,14 @@ class TestObjectExpirer(TestCase):
'process': -1,
}
# from config
x = expirer.ObjectExpirer(vals)
x = expirer.ObjectExpirer(vals, swift=self.fake_swift)
expected_msg = 'process must be an integer greater' \
' than or equal to 0'
with self.assertRaises(ValueError) as ctx:
x.get_process_values({})
self.assertEqual(str(ctx.exception), expected_msg)
# from kwargs
x = expirer.ObjectExpirer({})
x = expirer.ObjectExpirer({}, swift=self.fake_swift)
with self.assertRaises(ValueError) as ctx:
x.get_process_values(vals)
self.assertEqual(str(ctx.exception), expected_msg)
@ -228,14 +237,14 @@ class TestObjectExpirer(TestCase):
'process': 1,
}
# from config
x = expirer.ObjectExpirer(vals)
x = expirer.ObjectExpirer(vals, swift=self.fake_swift)
expected_msg = 'processes must be an integer greater' \
' than or equal to 0'
with self.assertRaises(ValueError) as ctx:
x.get_process_values({})
self.assertEqual(str(ctx.exception), expected_msg)
# from kwargs
x = expirer.ObjectExpirer({})
x = expirer.ObjectExpirer({}, swift=self.fake_swift)
with self.assertRaises(ValueError) as ctx:
x.get_process_values(vals)
self.assertEqual(str(ctx.exception), expected_msg)
@ -246,13 +255,13 @@ class TestObjectExpirer(TestCase):
'process': 7,
}
# from config
x = expirer.ObjectExpirer(vals)
x = expirer.ObjectExpirer(vals, swift=self.fake_swift)
expected_msg = 'process must be less than processes'
with self.assertRaises(ValueError) as ctx:
x.get_process_values({})
self.assertEqual(str(ctx.exception), expected_msg)
# from kwargs
x = expirer.ObjectExpirer({})
x = expirer.ObjectExpirer({}, swift=self.fake_swift)
with self.assertRaises(ValueError) as ctx:
x.get_process_values(vals)
self.assertEqual(str(ctx.exception), expected_msg)
@ -263,13 +272,13 @@ class TestObjectExpirer(TestCase):
'process': 5,
}
# from config
x = expirer.ObjectExpirer(vals)
x = expirer.ObjectExpirer(vals, swift=self.fake_swift)
expected_msg = 'process must be less than processes'
with self.assertRaises(ValueError) as ctx:
x.get_process_values({})
self.assertEqual(str(ctx.exception), expected_msg)
# from kwargs
x = expirer.ObjectExpirer({})
x = expirer.ObjectExpirer({}, swift=self.fake_swift)
with self.assertRaises(ValueError) as ctx:
x.get_process_values(vals)
self.assertEqual(str(ctx.exception), expected_msg)
@ -278,11 +287,13 @@ class TestObjectExpirer(TestCase):
conf = {
'concurrency': 0,
}
self.assertRaises(ValueError, expirer.ObjectExpirer, conf)
with self.assertRaises(ValueError):
expirer.ObjectExpirer(conf, swift=self.fake_swift)
conf = {
'concurrency': -1,
}
self.assertRaises(ValueError, expirer.ObjectExpirer, conf)
with self.assertRaises(ValueError):
expirer.ObjectExpirer(conf, swift=self.fake_swift)
def test_process_based_concurrency(self):
@ -322,7 +333,8 @@ class TestObjectExpirer(TestCase):
self.assertEqual(deleted_objects, expected)
def test_delete_object(self):
x = expirer.ObjectExpirer({}, logger=self.logger)
x = expirer.ObjectExpirer({}, logger=self.logger,
swift=self.fake_swift)
actual_obj = 'actual_obj'
timestamp = int(time())
reclaim_ts = timestamp - x.reclaim_age
@ -382,7 +394,8 @@ class TestObjectExpirer(TestCase):
self.fail("Failed on %r at %f: %s" % (exc, ts, err))
def test_report(self):
x = expirer.ObjectExpirer({}, logger=self.logger)
x = expirer.ObjectExpirer({}, logger=self.logger,
swift=self.fake_swift)
x.report()
self.assertEqual(x.logger.get_lines_for_level('info'), [])
@ -403,7 +416,8 @@ class TestObjectExpirer(TestCase):
'so far' in str(x.logger.get_lines_for_level('info')))
def test_parse_task_obj(self):
x = expirer.ObjectExpirer(self.conf, logger=self.logger)
x = expirer.ObjectExpirer(self.conf, logger=self.logger,
swift=self.fake_swift)
def assert_parse_task_obj(task_obj, expected_delete_at,
expected_account, expected_container,
@ -430,7 +444,8 @@ class TestObjectExpirer(TestCase):
}
def test_round_robin_order(self):
x = expirer.ObjectExpirer(self.conf, logger=self.logger)
x = expirer.ObjectExpirer(self.conf, logger=self.logger,
swift=self.fake_swift)
task_con_obj_list = [
# objects in 0000 timestamp container
self.make_task('0000', 'a/c0/o0'),
@ -539,7 +554,8 @@ class TestObjectExpirer(TestCase):
self.assertEqual(task_con_obj_list, result)
def test_hash_mod(self):
x = expirer.ObjectExpirer(self.conf, logger=self.logger)
x = expirer.ObjectExpirer(self.conf, logger=self.logger,
swift=self.fake_swift)
mod_count = [0, 0, 0]
for i in range(1000):
name = 'obj%d' % i
@ -552,24 +568,28 @@ class TestObjectExpirer(TestCase):
self.assertGreater(mod_count[2], 300)
def test_iter_task_accounts_to_expire(self):
x = expirer.ObjectExpirer(self.conf, logger=self.logger)
x = expirer.ObjectExpirer(self.conf, logger=self.logger,
swift=self.fake_swift)
results = [_ for _ in x.iter_task_accounts_to_expire()]
self.assertEqual(results, [('.expiring_objects', 0, 1)])
self.conf['processes'] = '2'
self.conf['process'] = '1'
x = expirer.ObjectExpirer(self.conf, logger=self.logger)
x = expirer.ObjectExpirer(self.conf, logger=self.logger,
swift=self.fake_swift)
results = [_ for _ in x.iter_task_accounts_to_expire()]
self.assertEqual(results, [('.expiring_objects', 1, 2)])
def test_delete_at_time_of_task_container(self):
x = expirer.ObjectExpirer(self.conf, logger=self.logger)
x = expirer.ObjectExpirer(self.conf, logger=self.logger,
swift=self.fake_swift)
self.assertEqual(x.delete_at_time_of_task_container('0000'), 0)
self.assertEqual(x.delete_at_time_of_task_container('0001'), 1)
self.assertEqual(x.delete_at_time_of_task_container('1000'), 1000)
def test_run_once_nothing_to_do(self):
x = expirer.ObjectExpirer(self.conf, logger=self.logger)
x = expirer.ObjectExpirer(self.conf, logger=self.logger,
swift=self.fake_swift)
x.swift = 'throw error because a string does not have needed methods'
x.run_once()
self.assertEqual(x.logger.get_lines_for_level('error'),
@ -824,8 +844,9 @@ class TestObjectExpirer(TestCase):
raise SystemExit('test_run_forever')
interval = 1234
x = expirer.ObjectExpirer({'__file__': 'unit_test',
'interval': interval})
x = expirer.ObjectExpirer(
{'__file__': 'unit_test', 'interval': interval},
swift=self.fake_swift)
with mock.patch.object(expirer, 'random', not_random), \
mock.patch.object(expirer, 'sleep', not_sleep), \
self.assertRaises(SystemExit) as caught:
@ -843,7 +864,8 @@ class TestObjectExpirer(TestCase):
raise Exception('exception %d' % raises[0])
raise SystemExit('exiting exception %d' % raises[0])
x = expirer.ObjectExpirer({}, logger=self.logger)
x = expirer.ObjectExpirer({}, logger=self.logger,
swift=self.fake_swift)
orig_sleep = expirer.sleep
try:
expirer.sleep = not_sleep
@ -867,9 +889,7 @@ class TestObjectExpirer(TestCase):
start_response('204 No Content', [('Content-Length', '0')])
return []
internal_client.loadapp = lambda *a, **kw: fake_app
x = expirer.ObjectExpirer({})
x = expirer.ObjectExpirer({}, swift=self.make_fake_ic(fake_app))
ts = Timestamp('1234')
x.delete_actual_object('path/to/object', ts, False)
self.assertEqual(got_env[0]['HTTP_X_IF_DELETE_AT'], ts)
@ -886,9 +906,7 @@ class TestObjectExpirer(TestCase):
start_response('204 No Content', [('Content-Length', '0')])
return []
internal_client.loadapp = lambda *a, **kw: fake_app
x = expirer.ObjectExpirer({})
x = expirer.ObjectExpirer({}, swift=self.make_fake_ic(fake_app))
ts = Timestamp('1234')
x.delete_actual_object('path/to/object', ts, True)
self.assertNotIn('HTTP_X_IF_DELETE_AT', got_env[0])
@ -906,9 +924,7 @@ class TestObjectExpirer(TestCase):
start_response('204 No Content', [('Content-Length', '0')])
return []
internal_client.loadapp = lambda *a, **kw: fake_app
x = expirer.ObjectExpirer({})
x = expirer.ObjectExpirer({}, swift=self.make_fake_ic(fake_app))
ts = Timestamp('1234')
x.delete_actual_object('path/to/object name', ts, False)
self.assertEqual(got_env[0]['HTTP_X_IF_DELETE_AT'], ts)
@ -926,9 +942,7 @@ class TestObjectExpirer(TestCase):
start_response(test_status, [('Content-Length', '0')])
return []
internal_client.loadapp = lambda *a, **kw: fake_app
x = expirer.ObjectExpirer({})
x = expirer.ObjectExpirer({}, swift=self.make_fake_ic(fake_app))
ts = Timestamp('1234')
if should_raise:
with self.assertRaises(internal_client.UnexpectedResponse):
@ -954,9 +968,7 @@ class TestObjectExpirer(TestCase):
start_response(test_status, [('Content-Length', '0')])
return []
internal_client.loadapp = lambda *a, **kw: fake_app
x = expirer.ObjectExpirer({})
x = expirer.ObjectExpirer({}, swift=self.make_fake_ic(fake_app))
ts = Timestamp('1234')
if should_raise:
with self.assertRaises(internal_client.UnexpectedResponse):
@ -982,9 +994,7 @@ class TestObjectExpirer(TestCase):
[('Content-Length', '0')])
return []
internal_client.loadapp = lambda *a, **kw: fake_app
x = expirer.ObjectExpirer({})
x = expirer.ObjectExpirer({}, swift=self.make_fake_ic(fake_app))
exc = None
try:
x.delete_actual_object('path/to/object', Timestamp('1234'), False)
@ -997,7 +1007,7 @@ class TestObjectExpirer(TestCase):
def test_delete_actual_object_quotes(self):
name = 'this name/should get/quoted'
timestamp = Timestamp('1366063156.863045')
x = expirer.ObjectExpirer({})
x = expirer.ObjectExpirer({}, swift=self.make_fake_ic(self.fake_swift))
x.swift.make_request = mock.Mock()
x.swift.make_request.return_value.status_int = 204
x.swift.make_request.return_value.app_iter = []
@ -1009,8 +1019,9 @@ class TestObjectExpirer(TestCase):
def test_delete_actual_object_queue_cleaning(self):
name = 'acc/cont/something'
timestamp = Timestamp('1515544858.80602')
x = expirer.ObjectExpirer({})
x.swift.make_request = mock.MagicMock()
x = expirer.ObjectExpirer({}, swift=self.make_fake_ic(self.fake_swift))
x.swift.make_request = mock.MagicMock(
return_value=swob.HTTPNoContent())
x.delete_actual_object(name, timestamp, False)
self.assertEqual(x.swift.make_request.call_count, 1)
header = 'X-Backend-Clean-Expiring-Object-Queue'

View File

@ -1421,10 +1421,7 @@ class TestProxyServerLoading(unittest.TestCase):
object_ring_path = os.path.join(self.tempdir,
policy.ring_name + '.ring.gz')
write_fake_ring(object_ring_path)
app = loadapp(conf_path)
# find the end of the pipeline
while hasattr(app, 'app'):
app = app.app
app = loadapp(conf_path)._pipeline_final_app
# validate loaded rings
self.assertEqual(app.account_ring.serialized_path,