Extend interface on InternalClient

* add get_object
 * allow extra headers passthrough on HEAD/metadata reqeusts
 * expose (account|container|get_object)_ring properties

Pipeline propety access to the auto_create_account_prefix also allows us to
bypass the early exit on a container HEAD for auto_create_accounts if the
container-updater hasn't cycled yet.

Allow overriding of storage policy index.

This is something the reconciler will need so that it can GET from one
policy, PUT in another, and then DELETE from the first one again.

DocImpact
Implements: blueprint storage-policies
Change-Id: I9b287d15f2426022d669d1186c9e22dd8ca13fb9
This commit is contained in:
Clay Gerrard 2014-04-16 17:16:57 -07:00
parent 0015019ccd
commit 8bec50838c
11 changed files with 741 additions and 161 deletions

View File

@ -27,7 +27,7 @@ from zlib import compressobj
from swift.common.utils import quote
from swift.common.http import HTTP_NOT_FOUND
from swift.common.swob import Request
from swift.common.wsgi import loadapp
from swift.common.wsgi import loadapp, pipeline_property
class UnexpectedResponse(Exception):
@ -142,6 +142,12 @@ class InternalClient(object):
self.user_agent = user_agent
self.request_tries = request_tries
get_object_ring = pipeline_property('get_object_ring')
container_ring = pipeline_property('container_ring')
account_ring = pipeline_property('account_ring')
auto_create_account_prefix = pipeline_property(
'auto_create_account_prefix', default='.')
def make_request(
self, method, path, headers, acceptable_statuses, body_file=None):
"""
@ -190,7 +196,8 @@ class InternalClient(object):
raise exc_type(*exc_value.args), None, exc_traceback
def _get_metadata(
self, path, metadata_prefix='', acceptable_statuses=(2,)):
self, path, metadata_prefix='', acceptable_statuses=(2,),
headers=None):
"""
Gets metadata by doing a HEAD on a path and using the metadata_prefix
to get values from the headers returned.
@ -201,6 +208,7 @@ class InternalClient(object):
keys in the dict returned. Defaults to ''.
:param acceptable_statuses: List of status for valid responses,
defaults to (2,).
:param headers: extra headers to send
:returns : A dict of metadata with metadata_prefix stripped from keys.
Keys will be lowercase.
@ -211,9 +219,8 @@ class InternalClient(object):
unexpected way.
"""
resp = self.make_request('HEAD', path, {}, acceptable_statuses)
if not resp.status_int // 100 == 2:
return {}
headers = headers or {}
resp = self.make_request('HEAD', path, headers, acceptable_statuses)
metadata_prefix = metadata_prefix.lower()
metadata = {}
for k, v in resp.headers.iteritems():
@ -544,7 +551,8 @@ class InternalClient(object):
def delete_object(
self, account, container, obj,
acceptable_statuses=(2, HTTP_NOT_FOUND)):
acceptable_statuses=(2, HTTP_NOT_FOUND),
headers=None):
"""
Deletes an object.
@ -553,6 +561,7 @@ class InternalClient(object):
:param obj: The object.
:param acceptable_statuses: List of status for valid responses,
defaults to (2, HTTP_NOT_FOUND).
:param headers: extra headers to send with request
:raises UnexpectedResponse: Exception raised when requests fail
to get a response with an acceptable status
@ -561,11 +570,11 @@ class InternalClient(object):
"""
path = self.make_path(account, container, obj)
self.make_request('DELETE', path, {}, acceptable_statuses)
self.make_request('DELETE', path, (headers or {}), acceptable_statuses)
def get_object_metadata(
self, account, container, obj, metadata_prefix='',
acceptable_statuses=(2,)):
acceptable_statuses=(2,), headers=None):
"""
Gets object metadata.
@ -577,6 +586,7 @@ class InternalClient(object):
keys in the dict returned. Defaults to ''.
:param acceptable_statuses: List of status for valid responses,
defaults to (2,).
:param headers: extra headers to send with request
:returns : Dict of object metadata.
@ -587,7 +597,19 @@ class InternalClient(object):
"""
path = self.make_path(account, container, obj)
return self._get_metadata(path, metadata_prefix, acceptable_statuses)
return self._get_metadata(path, metadata_prefix, acceptable_statuses,
headers=headers)
def get_object(self, account, container, obj, headers,
acceptable_statuses=(2,)):
"""
Returns a 3-tuple (status, headers, iterator of object body)
"""
headers = headers or {}
path = self.make_path(account, container, obj)
resp = self.make_request('GET', path, headers, acceptable_statuses)
return (resp.status_int, resp.headers, resp.app_iter)
def iter_object_lines(
self, account, container, obj, headers=None,

View File

@ -201,6 +201,47 @@ class RestrictedGreenPool(GreenPool):
self.waitall()
def pipeline_property(name, **kwargs):
"""
Create a property accessor for the given name. The property will
dig through the bound instance on which it was accessed for an
attribute "app" and check that object for an attribute of the given
name. If the "app" object does not have such an attribute, it will
look for an attribute "app" on THAT object and continue it's search
from there. If the named attribute cannot be found accessing the
property will raise AttributeError.
If a default kwarg is provided you get that instead of the
AttributeError. When found the attribute will be cached on instance
with the property accessor using the same name as the attribute
prefixed with a leading underscore.
"""
cache_attr_name = '_%s' % name
def getter(self):
cached_value = getattr(self, cache_attr_name, None)
if cached_value:
return cached_value
app = self # first app is on self
while True:
app = getattr(app, 'app', None)
if not app:
break
try:
value = getattr(app, name)
except AttributeError:
continue
setattr(self, cache_attr_name, value)
return value
if 'default' in kwargs:
return kwargs['default']
raise AttributeError('No apps in pipeline have a '
'%s attribute' % name)
return property(getter)
class PipelineWrapper(object):
"""
This class provides a number of utility methods for
@ -292,6 +333,13 @@ def loadcontext(object_type, uri, name=None, relative_to=None,
global_conf=global_conf)
def _add_pipeline_properties(app, *names):
for property_name in names:
if not hasattr(app, property_name):
setattr(app.__class__, property_name,
pipeline_property(property_name))
def loadapp(conf_file, global_conf=None, allow_modify_pipeline=True):
"""
Loads a context from a config file, and if the context is a pipeline

View File

@ -91,8 +91,9 @@ class ObjectController(object):
for header in extra_allowed_headers:
if header not in DATAFILE_SYSTEM_META:
self.allowed_headers.add(header)
self.expiring_objects_account = \
(conf.get('auto_create_account_prefix') or '.') + \
self.auto_create_account_prefix = \
conf.get('auto_create_account_prefix') or '.'
self.expiring_objects_account = self.auto_create_account_prefix + \
(conf.get('expiring_objects_account_name') or 'expiring_objects')
self.expiring_objects_container_divisor = \
int(conf.get('expiring_objects_container_divisor') or 86400)

View File

@ -509,7 +509,8 @@ def get_info(app, env, account, container=None, ret_not_found=False,
path = '/v1/%s' % account
if container:
# Stop and check if we have an account?
if not get_info(app, env, account):
if not get_info(app, env, account) and not account.startswith(
getattr(app, 'auto_create_account_prefix', '.')):
return None
path += '/' + container

View File

@ -196,10 +196,11 @@ class ObjectController(Controller):
container_info = self.container_info(
self.account_name, self.container_name, req)
req.acl = container_info['read_acl']
policy_idx = container_info['storage_policy']
obj_ring = self.app.get_object_ring(policy_idx)
# pass the policy index to storage nodes via req header
req.headers[POLICY_INDEX] = policy_idx
policy_index = req.headers.get(POLICY_INDEX,
container_info['storage_policy'])
obj_ring = self.app.get_object_ring(policy_index)
req.headers[POLICY_INDEX] = policy_index
if 'swift.authorize' in req.environ:
aresp = req.environ['swift.authorize'](req)
if aresp:
@ -301,10 +302,11 @@ class ObjectController(Controller):
self.app.expiring_objects_account, delete_at_container)
else:
delete_at_container = delete_at_part = delete_at_nodes = None
policy_idx = container_info['storage_policy']
obj_ring = self.app.get_object_ring(policy_idx)
# pass the policy index to storage nodes via req header
req.headers[POLICY_INDEX] = policy_idx
policy_index = req.headers.get(POLICY_INDEX,
container_info['storage_policy'])
obj_ring = self.app.get_object_ring(policy_index)
req.headers[POLICY_INDEX] = policy_index
partition, nodes = obj_ring.get_nodes(
self.account_name, self.container_name, self.object_name)
req.headers['X-Timestamp'] = normalize_timestamp(time.time())
@ -456,10 +458,11 @@ class ObjectController(Controller):
body='If-None-Match only supports *')
container_info = self.container_info(
self.account_name, self.container_name, req)
policy_idx = container_info['storage_policy']
obj_ring = self.app.get_object_ring(policy_idx)
policy_index = req.headers.get(POLICY_INDEX,
container_info['storage_policy'])
obj_ring = self.app.get_object_ring(policy_index)
# pass the policy index to storage nodes via req header
req.headers[POLICY_INDEX] = policy_idx
req.headers[POLICY_INDEX] = policy_index
container_partition = container_info['partition']
containers = container_info['nodes']
req.acl = container_info['write_acl']
@ -583,6 +586,8 @@ class ObjectController(Controller):
source_header = '/%s/%s/%s/%s' % (ver, acct,
src_container_name, src_obj_name)
source_req = req.copy_get()
# make sure the source request uses it's container_info
source_req.headers.pop(POLICY_INDEX, None)
source_req.path_info = source_header
source_req.headers['X-Newest'] = 'true'
orig_obj_name = self.object_name
@ -771,10 +776,12 @@ class ObjectController(Controller):
"""HTTP DELETE request handler."""
container_info = self.container_info(
self.account_name, self.container_name, req)
policy_idx = container_info['storage_policy']
obj_ring = self.app.get_object_ring(policy_idx)
# pass the policy index to storage nodes via req header
req.headers[POLICY_INDEX] = policy_idx
policy_index = req.headers.get(POLICY_INDEX,
container_info['storage_policy'])
obj_ring = self.app.get_object_ring(policy_index)
# pass the policy index to storage nodes via req header
req.headers[POLICY_INDEX] = policy_index
container_partition = container_info['partition']
containers = container_info['nodes']
req.acl = container_info['write_acl']

View File

@ -112,8 +112,9 @@ class Application(object):
[os.path.join(swift_dir, 'mime.types')])
self.account_autocreate = \
config_true_value(conf.get('account_autocreate', 'no'))
self.expiring_objects_account = \
(conf.get('auto_create_account_prefix') or '.') + \
self.auto_create_account_prefix = (
conf.get('auto_create_account_prefix') or '.')
self.expiring_objects_account = self.auto_create_account_prefix + \
(conf.get('expiring_objects_account_name') or 'expiring_objects')
self.expiring_objects_container_divisor = \
int(conf.get('expiring_objects_container_divisor') or 86400)

View File

@ -101,5 +101,5 @@ class FakeSwift(object):
def call_count(self):
return len(self._calls)
def register(self, method, path, response_class, headers, body):
def register(self, method, path, response_class, headers, body=''):
self._responses[(method, path)] = (response_class, headers, body)

View File

@ -19,10 +19,16 @@ from StringIO import StringIO
import unittest
from urllib import quote
import zlib
from textwrap import dedent
import os
from test.unit import FakeLogger
from eventlet.green import urllib2
from swift.common import internal_client
from swift.common import swob
from test.unit import with_tempdir, write_fake_ring, patch_policies
from test.unit.common.middleware.helpers import FakeSwift
def not_sleep(seconds):
@ -49,6 +55,21 @@ def make_path(account, container=None, obj=None):
return path
def make_path_info(account, container=None, obj=None):
# FakeSwift keys on PATH_INFO - which is *encoded* but unquoted
path = '/v1/%s' % '/'.join(
p for p in (account, container, obj) if p)
return path.encode('utf-8')
def get_client_app():
app = FakeSwift()
with mock.patch('swift.common.internal_client.loadapp',
new=lambda *args, **kwargs: app):
client = internal_client.InternalClient({}, 'test', 1)
return client, app
class InternalClient(internal_client.InternalClient):
def __init__(self):
pass
@ -63,7 +84,8 @@ class GetMetadataInternalClient(internal_client.InternalClient):
self.get_metadata_called = 0
self.metadata = 'some_metadata'
def _get_metadata(self, path, metadata_prefix, acceptable_statuses=None):
def _get_metadata(self, path, metadata_prefix, acceptable_statuses=None,
headers=None):
self.get_metadata_called += 1
self.test.assertEquals(self.path, path)
self.test.assertEquals(self.metadata_prefix, metadata_prefix)
@ -179,6 +201,52 @@ class TestCompressingfileReader(unittest.TestCase):
class TestInternalClient(unittest.TestCase):
@patch_policies(legacy_only=True)
@mock.patch('swift.common.utils.HASH_PATH_SUFFIX', new='endcap')
@with_tempdir
def test_load_from_config(self, tempdir):
conf_path = os.path.join(tempdir, 'interal_client.conf')
conf_body = """
[DEFAULT]
swift_dir = %s
[pipeline:main]
pipeline = catch_errors cache proxy-server
[app:proxy-server]
use = egg:swift#proxy
auto_create_account_prefix = -
[filter:cache]
use = egg:swift#memcache
[filter:catch_errors]
use = egg:swift#catch_errors
""" % tempdir
with open(conf_path, 'w') as f:
f.write(dedent(conf_body))
account_ring_path = os.path.join(tempdir, 'account.ring.gz')
write_fake_ring(account_ring_path)
container_ring_path = os.path.join(tempdir, 'container.ring.gz')
write_fake_ring(container_ring_path)
object_ring_path = os.path.join(tempdir, 'object.ring.gz')
write_fake_ring(object_ring_path)
client = internal_client.InternalClient(conf_path, 'test', 1)
self.assertEqual(client.account_ring, client.app.app.app.account_ring)
self.assertEqual(client.account_ring.serialized_path,
account_ring_path)
self.assertEqual(client.container_ring,
client.app.app.app.container_ring)
self.assertEqual(client.container_ring.serialized_path,
container_ring_path)
object_ring = client.app.app.app.get_object_ring(0)
self.assertEqual(client.get_object_ring(0),
object_ring)
self.assertEqual(object_ring.serialized_path,
object_ring_path)
self.assertEquals(client.auto_create_account_prefix, '-')
def test_init(self):
class App(object):
def __init__(self, test, conf_path):
@ -428,21 +496,24 @@ class TestInternalClient(unittest.TestCase):
self.assertEquals(1, client.make_request_called)
def test_get_metadata_invalid_status(self):
class Response(object):
def __init__(self):
self.status_int = 404
self.headers = {'some_key': 'some_value'}
class FakeApp(object):
def __call__(self, environ, start_response):
start_response('404 Not Found', [('x-foo', 'bar')])
return ['nope']
class InternalClient(internal_client.InternalClient):
def __init__(self):
pass
def make_request(self, *a, **kw):
return Response()
self.user_agent = 'test'
self.request_tries = 1
self.app = FakeApp()
client = InternalClient()
metadata = client._get_metadata('path')
self.assertEquals({}, metadata)
self.assertRaises(internal_client.UnexpectedResponse,
client._get_metadata, 'path')
metadata = client._get_metadata('path', metadata_prefix='x-',
acceptable_statuses=(4,))
self.assertEqual(metadata, {'foo': 'bar'})
def test_make_path(self):
account, container, obj = path_parts()
@ -653,6 +724,26 @@ class TestInternalClient(unittest.TestCase):
self.assertEquals(client.metadata, metadata)
self.assertEquals(1, client.get_metadata_called)
def test_get_metadadata_with_acceptable_status(self):
account, container, obj = path_parts()
path = make_path_info(account)
client, app = get_client_app()
resp_headers = {'some-important-header': 'some value'}
app.register('GET', path, swob.HTTPOk, resp_headers)
metadata = client.get_account_metadata(
account, acceptable_statuses=(2, 4))
self.assertEqual(metadata['some-important-header'],
'some value')
app.register('GET', path, swob.HTTPNotFound, resp_headers)
metadata = client.get_account_metadata(
account, acceptable_statuses=(2, 4))
self.assertEqual(metadata['some-important-header'],
'some value')
app.register('GET', path, swob.HTTPServerError, resp_headers)
self.assertRaises(internal_client.UnexpectedResponse,
client.get_account_metadata, account,
acceptable_statuses=(2, 4))
def test_set_account_metadata(self):
account, container, obj = path_parts()
path = make_path(account)
@ -823,6 +914,47 @@ class TestInternalClient(unittest.TestCase):
self.assertEquals(client.metadata, metadata)
self.assertEquals(1, client.get_metadata_called)
def test_get_metadata_extra_headers(self):
class InternalClient(internal_client.InternalClient):
def __init__(self):
self.app = self.fake_app
self.user_agent = 'some_agent'
self.request_tries = 3
def fake_app(self, env, start_response):
self.req_env = env
start_response('200 Ok', [('Content-Length', '0')])
return []
client = InternalClient()
headers = {'X-Foo': 'bar'}
client.get_object_metadata('account', 'container', 'obj',
headers=headers)
self.assertEqual(client.req_env['HTTP_X_FOO'], 'bar')
def test_get_object(self):
account, container, obj = path_parts()
path_info = make_path_info(account, container, obj)
client, app = get_client_app()
headers = {'foo': 'bar'}
body = 'some_object_body'
app.register('GET', path_info, swob.HTTPOk, headers, body)
req_headers = {'x-important-header': 'some_important_value'}
status_int, resp_headers, obj_iter = client.get_object(
account, container, obj, req_headers)
self.assertEqual(status_int // 100, 2)
for k, v in headers.items():
self.assertEqual(v, resp_headers[k])
self.assertEqual(''.join(obj_iter), body)
self.assertEqual(resp_headers['content-length'], str(len(body)))
self.assertEqual(app.call_count, 1)
req_headers.update({
'host': 'localhost:80', # from swob.Request.blank
'user-agent': 'test', # from InternalClient.make_request
})
self.assertEqual(app.calls_with_headers, [(
'GET', path_info, swob.HeaderKeyDict(req_headers))])
def test_iter_object_lines(self):
class InternalClient(internal_client.InternalClient):
def __init__(self, lines):

View File

@ -35,12 +35,15 @@ import swift.common.middleware.catch_errors
import swift.common.middleware.gatekeeper
import swift.proxy.server
import swift.obj.server as obj_server
import swift.container.server as container_server
import swift.account.server as account_server
from swift.common.swob import Request
from swift.common import wsgi, utils
from swift.common.storage_policy import StoragePolicy, \
StoragePolicyCollection
from test.unit import temptree, write_fake_ring
from test.unit import temptree, with_tempdir, write_fake_ring, patch_policies
from paste.deploy import loadwsgi
@ -754,6 +757,7 @@ class TestPipelineWrapper(unittest.TestCase):
"<unknown> catch_errors tempurl proxy-server")
@mock.patch('swift.common.utils.HASH_PATH_SUFFIX', new='endcap')
class TestPipelineModification(unittest.TestCase):
def pipeline_modules(self, app):
# This is rather brittle; it'll break if a middleware stores its app
@ -1008,5 +1012,100 @@ class TestPipelineModification(unittest.TestCase):
'swift.common.middleware.dlo',
'swift.proxy.server'])
@patch_policies
@with_tempdir
def test_loadapp_proxy(self, tempdir):
conf_path = os.path.join(tempdir, 'proxy-server.conf')
conf_body = """
[DEFAULT]
swift_dir = %s
[pipeline:main]
pipeline = catch_errors cache proxy-server
[app:proxy-server]
use = egg:swift#proxy
[filter:cache]
use = egg:swift#memcache
[filter:catch_errors]
use = egg:swift#catch_errors
""" % tempdir
with open(conf_path, 'w') as f:
f.write(dedent(conf_body))
account_ring_path = os.path.join(tempdir, 'account.ring.gz')
write_fake_ring(account_ring_path)
container_ring_path = os.path.join(tempdir, 'container.ring.gz')
write_fake_ring(container_ring_path)
object_ring_path = os.path.join(tempdir, 'object.ring.gz')
write_fake_ring(object_ring_path)
object_1_ring_path = os.path.join(tempdir, 'object-1.ring.gz')
write_fake_ring(object_1_ring_path)
app = wsgi.loadapp(conf_path)
proxy_app = app.app.app.app.app
self.assertEqual(proxy_app.account_ring.serialized_path,
account_ring_path)
self.assertEqual(proxy_app.container_ring.serialized_path,
container_ring_path)
self.assertEqual(proxy_app.get_object_ring(0).serialized_path,
object_ring_path)
self.assertEqual(proxy_app.get_object_ring(1).serialized_path,
object_1_ring_path)
@with_tempdir
def test_loadapp_storage(self, tempdir):
expectations = {
'object': obj_server.ObjectController,
'container': container_server.ContainerController,
'account': account_server.AccountController,
}
for server_type, controller in expectations.items():
conf_path = os.path.join(
tempdir, '%s-server.conf' % server_type)
conf_body = """
[DEFAULT]
swift_dir = %s
[app:main]
use = egg:swift#%s
""" % (tempdir, server_type)
with open(conf_path, 'w') as f:
f.write(dedent(conf_body))
app = wsgi.loadapp(conf_path)
self.assertTrue(isinstance(app, controller))
def test_pipeline_property(self):
depth = 3
class FakeApp(object):
pass
class AppFilter(object):
def __init__(self, app):
self.app = app
# make a pipeline
app = FakeApp()
filtered_app = app
for i in range(depth):
filtered_app = AppFilter(filtered_app)
# AttributeError if no apps in the pipeline have attribute
wsgi._add_pipeline_properties(filtered_app, 'foo')
self.assertRaises(AttributeError, getattr, filtered_app, 'foo')
# set the attribute
self.assert_(isinstance(app, FakeApp))
app.foo = 'bar'
self.assertEqual(filtered_app.foo, 'bar')
# attribute is cached
app.foo = 'baz'
self.assertEqual(filtered_app.foo, 'bar')
if __name__ == '__main__':
unittest.main()

View File

@ -13,15 +13,19 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import itertools
from collections import defaultdict
import unittest
from mock import patch
from swift.proxy.controllers.base import headers_to_container_info, \
headers_to_account_info, headers_to_object_info, get_container_info, \
get_container_memcache_key, get_account_info, get_account_memcache_key, \
get_object_env_key, _get_cache_key, get_info, get_object_info, \
Controller, GetOrHeadHandler
from swift.common.swob import Request, HTTPException, HeaderKeyDict
get_object_env_key, get_info, get_object_info, \
Controller, GetOrHeadHandler, _set_info_cache, _set_object_info_cache
from swift.common.swob import Request, HTTPException, HeaderKeyDict, \
RESPONSE_REASONS
from swift.common.utils import split_path
from swift.common.http import is_success
from swift.common.storage_policy import StoragePolicy
from test.unit import fake_http_connect, FakeRing, FakeMemcache
from swift.proxy import server as proxy_server
@ -30,58 +34,119 @@ from swift.common.request_helpers import get_sys_meta_prefix
from test.unit import patch_policies
FakeResponse_status_int = 201
class FakeResponse(object):
def __init__(self, headers, env, account, container, obj):
self.headers = headers
self.status_int = FakeResponse_status_int
self.environ = env
base_headers = {}
def __init__(self, status_int=200, headers=None, body=''):
self.status_int = status_int
self._headers = headers or {}
self.body = body
@property
def headers(self):
if is_success(self.status_int):
self._headers.update(self.base_headers)
return self._headers
class AccountResponse(FakeResponse):
base_headers = {
'x-account-container-count': 333,
'x-account-object-count': 1000,
'x-account-bytes-used': 6666,
}
class ContainerResponse(FakeResponse):
base_headers = {
'x-container-object-count': 1000,
'x-container-bytes-used': 6666,
}
class ObjectResponse(FakeResponse):
base_headers = {
'content-length': 5555,
'content-type': 'text/plain'
}
class DynamicResponseFactory(object):
def __init__(self, *statuses):
if statuses:
self.statuses = iter(statuses)
else:
self.statuses = itertools.repeat(200)
self.stats = defaultdict(int)
response_type = {
'obj': ObjectResponse,
'container': ContainerResponse,
'account': AccountResponse,
}
def _get_response(self, type_):
self.stats[type_] += 1
class_ = self.response_type[type_]
return class_(self.statuses.next())
def get_response(self, environ):
(version, account, container, obj) = split_path(
environ['PATH_INFO'], 2, 4, True)
if obj:
env_key = get_object_env_key(account, container, obj)
resp = self._get_response('obj')
elif container:
resp = self._get_response('container')
else:
cache_key, env_key = _get_cache_key(account, container)
if account and container and obj:
info = headers_to_object_info(headers, FakeResponse_status_int)
elif account and container:
info = headers_to_container_info(headers, FakeResponse_status_int)
else:
info = headers_to_account_info(headers, FakeResponse_status_int)
env[env_key] = info
resp = self._get_response('account')
resp.account = account
resp.container = container
resp.obj = obj
return resp
class FakeRequest(object):
def __init__(self, env, path, swift_source=None):
self.environ = env
(version, account, container, obj) = split_path(path, 2, 4, True)
self.account = account
self.container = container
self.obj = obj
if obj:
stype = 'object'
self.headers = {'content-length': 5555,
'content-type': 'text/plain'}
else:
stype = container and 'container' or 'account'
self.headers = {'x-%s-object-count' % (stype): 1000,
'x-%s-bytes-used' % (stype): 6666}
if swift_source:
meta = 'x-%s-meta-fakerequest-swift-source' % stype
self.headers[meta] = swift_source
class FakeApp(object):
def get_response(self, app):
return FakeResponse(self.headers, self.environ, self.account,
self.container, self.obj)
recheck_container_existence = 30
recheck_account_existence = 30
def __init__(self, response_factory=None, statuses=None):
self.responses = response_factory or \
DynamicResponseFactory(*statuses or [])
self.sources = []
def __call__(self, environ, start_response):
self.sources.append(environ.get('swift.source'))
response = self.responses.get_response(environ)
reason = RESPONSE_REASONS[response.status_int][0]
start_response('%d %s' % (response.status_int, reason),
[(k, v) for k, v in response.headers.items()])
# It's a bit strnage, but the get_info cache stuff relies on the
# app setting some keys in the environment as it makes requests
# (in particular GETorHEAD_base) - so our fake does the same
_set_info_cache(self, environ, response.account,
response.container, response)
if response.obj:
_set_object_info_cache(self, environ, response.account,
response.container, response.obj,
response)
return iter(response.body)
class FakeCache(object):
def __init__(self, val):
self.val = val
class FakeCache(FakeMemcache):
def __init__(self, stub=None, **pre_cached):
super(FakeCache, self).__init__()
if pre_cached:
self.store.update(pre_cached)
self.stub = stub
def get(self, *args):
return self.val
def get(self, key):
return self.stub or self.store.get(key)
@patch_policies([StoragePolicy(0, 'zero', True, object_ring=FakeRing())])
@ -125,144 +190,158 @@ class TestFuncs(unittest.TestCase):
self.assertEqual(resp.environ['swift.account/a']['status'], 200)
def test_get_info(self):
global FakeResponse_status_int
app = FakeApp()
# Do a non cached call to account
env = {}
with patch('swift.proxy.controllers.base.'
'_prepare_pre_auth_info_request', FakeRequest):
info_a = get_info(None, env, 'a')
info_a = get_info(app, env, 'a')
# Check that you got proper info
self.assertEquals(info_a['status'], 201)
self.assertEquals(info_a['status'], 200)
self.assertEquals(info_a['bytes'], 6666)
self.assertEquals(info_a['total_object_count'], 1000)
# Make sure the env cache is set
self.assertEquals(env.get('swift.account/a'), info_a)
# Make sure the app was called
self.assertEqual(app.responses.stats['account'], 1)
# Do an env cached call to account
info_a = get_info(None, env, 'a')
info_a = get_info(app, env, 'a')
# Check that you got proper info
self.assertEquals(info_a['status'], 201)
self.assertEquals(info_a['status'], 200)
self.assertEquals(info_a['bytes'], 6666)
self.assertEquals(info_a['total_object_count'], 1000)
# Make sure the env cache is set
self.assertEquals(env.get('swift.account/a'), info_a)
# Make sure the app was NOT called AGAIN
self.assertEqual(app.responses.stats['account'], 1)
# This time do env cached call to account and non cached to container
with patch('swift.proxy.controllers.base.'
'_prepare_pre_auth_info_request', FakeRequest):
info_c = get_info(None, env, 'a', 'c')
info_c = get_info(app, env, 'a', 'c')
# Check that you got proper info
self.assertEquals(info_a['status'], 201)
self.assertEquals(info_c['status'], 200)
self.assertEquals(info_c['bytes'], 6666)
self.assertEquals(info_c['object_count'], 1000)
# Make sure the env cache is set
self.assertEquals(env.get('swift.account/a'), info_a)
self.assertEquals(env.get('swift.container/a/c'), info_c)
# Make sure the app was called for container
self.assertEqual(app.responses.stats['container'], 1)
# This time do a non cached call to account than non cached to
# container
app = FakeApp()
env = {} # abandon previous call to env
with patch('swift.proxy.controllers.base.'
'_prepare_pre_auth_info_request', FakeRequest):
info_c = get_info(None, env, 'a', 'c')
info_c = get_info(app, env, 'a', 'c')
# Check that you got proper info
self.assertEquals(info_a['status'], 201)
self.assertEquals(info_c['status'], 200)
self.assertEquals(info_c['bytes'], 6666)
self.assertEquals(info_c['object_count'], 1000)
# Make sure the env cache is set
self.assertEquals(env.get('swift.account/a'), info_a)
self.assertEquals(env.get('swift.container/a/c'), info_c)
# check app calls both account and container
self.assertEqual(app.responses.stats['account'], 1)
self.assertEqual(app.responses.stats['container'], 1)
# This time do an env cached call to container while account is not
# cached
del(env['swift.account/a'])
info_c = get_info(None, env, 'a', 'c')
info_c = get_info(app, env, 'a', 'c')
# Check that you got proper info
self.assertEquals(info_a['status'], 201)
self.assertEquals(info_a['status'], 200)
self.assertEquals(info_c['bytes'], 6666)
self.assertEquals(info_c['object_count'], 1000)
# Make sure the env cache is set and account still not cached
self.assertEquals(env.get('swift.container/a/c'), info_c)
# no additional calls were made
self.assertEqual(app.responses.stats['account'], 1)
self.assertEqual(app.responses.stats['container'], 1)
# Do a non cached call to account not found with ret_not_found
app = FakeApp(statuses=(404,))
env = {}
with patch('swift.proxy.controllers.base.'
'_prepare_pre_auth_info_request', FakeRequest):
try:
FakeResponse_status_int = 404
info_a = get_info(None, env, 'a', ret_not_found=True)
finally:
FakeResponse_status_int = 201
info_a = get_info(app, env, 'a', ret_not_found=True)
# Check that you got proper info
self.assertEquals(info_a['status'], 404)
self.assertEquals(info_a['bytes'], 6666)
self.assertEquals(info_a['total_object_count'], 1000)
self.assertEquals(info_a['bytes'], None)
self.assertEquals(info_a['total_object_count'], None)
# Make sure the env cache is set
self.assertEquals(env.get('swift.account/a'), info_a)
# and account was called
self.assertEqual(app.responses.stats['account'], 1)
# Do a cached call to account not found with ret_not_found
info_a = get_info(None, env, 'a', ret_not_found=True)
info_a = get_info(app, env, 'a', ret_not_found=True)
# Check that you got proper info
self.assertEquals(info_a['status'], 404)
self.assertEquals(info_a['bytes'], 6666)
self.assertEquals(info_a['total_object_count'], 1000)
self.assertEquals(info_a['bytes'], None)
self.assertEquals(info_a['total_object_count'], None)
# Make sure the env cache is set
self.assertEquals(env.get('swift.account/a'), info_a)
# add account was NOT called AGAIN
self.assertEqual(app.responses.stats['account'], 1)
# Do a non cached call to account not found without ret_not_found
app = FakeApp(statuses=(404,))
env = {}
with patch('swift.proxy.controllers.base.'
'_prepare_pre_auth_info_request', FakeRequest):
try:
FakeResponse_status_int = 404
info_a = get_info(None, env, 'a')
finally:
FakeResponse_status_int = 201
info_a = get_info(app, env, 'a')
# Check that you got proper info
self.assertEquals(info_a, None)
self.assertEquals(env['swift.account/a']['status'], 404)
# and account was called
self.assertEqual(app.responses.stats['account'], 1)
# Do a cached call to account not found without ret_not_found
info_a = get_info(None, env, 'a')
# Check that you got proper info
self.assertEquals(info_a, None)
self.assertEquals(env['swift.account/a']['status'], 404)
# add account was NOT called AGAIN
self.assertEqual(app.responses.stats['account'], 1)
def test_get_container_info_swift_source(self):
req = Request.blank("/v1/a/c", environ={'swift.cache': FakeCache({})})
with patch('swift.proxy.controllers.base.'
'_prepare_pre_auth_info_request', FakeRequest):
resp = get_container_info(req.environ, 'app', swift_source='MC')
self.assertEquals(resp['meta']['fakerequest-swift-source'], 'MC')
app = FakeApp()
req = Request.blank("/v1/a/c", environ={'swift.cache': FakeCache()})
get_container_info(req.environ, app, swift_source='MC')
self.assertEqual(app.sources, ['GET_INFO', 'MC'])
def test_get_object_info_swift_source(self):
app = FakeApp()
req = Request.blank("/v1/a/c/o",
environ={'swift.cache': FakeCache({})})
with patch('swift.proxy.controllers.base.'
'_prepare_pre_auth_info_request', FakeRequest):
resp = get_object_info(req.environ, 'app', swift_source='LU')
self.assertEquals(resp['meta']['fakerequest-swift-source'], 'LU')
environ={'swift.cache': FakeCache()})
get_object_info(req.environ, app, swift_source='LU')
self.assertEqual(app.sources, ['LU'])
def test_get_container_info_no_cache(self):
req = Request.blank("/v1/AUTH_account/cont",
environ={'swift.cache': FakeCache({})})
with patch('swift.proxy.controllers.base.'
'_prepare_pre_auth_info_request', FakeRequest):
resp = get_container_info(req.environ, 'xxx')
resp = get_container_info(req.environ, FakeApp())
self.assertEquals(resp['bytes'], 6666)
self.assertEquals(resp['object_count'], 1000)
def test_get_container_info_no_account(self):
responses = DynamicResponseFactory(404, 200)
app = FakeApp(responses)
req = Request.blank("/v1/AUTH_does_not_exist/cont")
info = get_container_info(req.environ, app)
self.assertEqual(info['status'], 0)
def test_get_container_info_no_auto_account(self):
responses = DynamicResponseFactory(404, 200)
app = FakeApp(responses)
req = Request.blank("/v1/.system_account/cont")
info = get_container_info(req.environ, app)
self.assertEqual(info['status'], 200)
self.assertEquals(info['bytes'], 6666)
self.assertEquals(info['object_count'], 1000)
def test_get_container_info_cache(self):
cached = {'status': 404,
'bytes': 3333,
'object_count': 10,
# simplejson sometimes hands back strings, sometimes unicodes
'versions': u"\u1F4A9"}
cache_stub = {
'status': 404, 'bytes': 3333, 'object_count': 10,
# simplejson sometimes hands back strings, sometimes unicodes
'versions': u"\u1F4A9"}
req = Request.blank("/v1/account/cont",
environ={'swift.cache': FakeCache(cached)})
with patch('swift.proxy.controllers.base.'
'_prepare_pre_auth_info_request', FakeRequest):
resp = get_container_info(req.environ, 'xxx')
environ={'swift.cache': FakeCache(cache_stub)})
resp = get_container_info(req.environ, FakeApp())
self.assertEquals(resp['bytes'], 3333)
self.assertEquals(resp['object_count'], 10)
self.assertEquals(resp['status'], 404)
@ -278,18 +357,16 @@ class TestFuncs(unittest.TestCase):
self.assertEquals(resp['bytes'], 3867)
def test_get_account_info_swift_source(self):
req = Request.blank("/v1/a", environ={'swift.cache': FakeCache({})})
with patch('swift.proxy.controllers.base.'
'_prepare_pre_auth_info_request', FakeRequest):
resp = get_account_info(req.environ, 'a', swift_source='MC')
self.assertEquals(resp['meta']['fakerequest-swift-source'], 'MC')
app = FakeApp()
req = Request.blank("/v1/a", environ={'swift.cache': FakeCache()})
get_account_info(req.environ, app, swift_source='MC')
self.assertEqual(app.sources, ['MC'])
def test_get_account_info_no_cache(self):
app = FakeApp()
req = Request.blank("/v1/AUTH_account",
environ={'swift.cache': FakeCache({})})
with patch('swift.proxy.controllers.base.'
'_prepare_pre_auth_info_request', FakeRequest):
resp = get_account_info(req.environ, 'xxx')
resp = get_account_info(req.environ, app)
self.assertEquals(resp['bytes'], 6666)
self.assertEquals(resp['total_object_count'], 1000)
@ -300,9 +377,7 @@ class TestFuncs(unittest.TestCase):
'total_object_count': 10}
req = Request.blank("/v1/account/cont",
environ={'swift.cache': FakeCache(cached)})
with patch('swift.proxy.controllers.base.'
'_prepare_pre_auth_info_request', FakeRequest):
resp = get_account_info(req.environ, 'xxx')
resp = get_account_info(req.environ, FakeApp())
self.assertEquals(resp['bytes'], 3333)
self.assertEquals(resp['total_object_count'], 10)
self.assertEquals(resp['status'], 404)
@ -315,9 +390,7 @@ class TestFuncs(unittest.TestCase):
'meta': {}}
req = Request.blank("/v1/account/cont",
environ={'swift.cache': FakeCache(cached)})
with patch('swift.proxy.controllers.base.'
'_prepare_pre_auth_info_request', FakeRequest):
resp = get_account_info(req.environ, 'xxx')
resp = get_account_info(req.environ, FakeApp())
self.assertEquals(resp['status'], 404)
self.assertEquals(resp['bytes'], '3333')
self.assertEquals(resp['container_count'], 234)
@ -347,11 +420,13 @@ class TestFuncs(unittest.TestCase):
self.assertEquals(resp['type'], 'application/json')
def test_get_object_info_no_env(self):
app = FakeApp()
req = Request.blank("/v1/account/cont/obj",
environ={'swift.cache': FakeCache({})})
with patch('swift.proxy.controllers.base.'
'_prepare_pre_auth_info_request', FakeRequest):
resp = get_object_info(req.environ, 'xxx')
resp = get_object_info(req.environ, app)
self.assertEqual(app.responses.stats['account'], 0)
self.assertEqual(app.responses.stats['container'], 0)
self.assertEqual(app.responses.stats['obj'], 1)
self.assertEquals(resp['length'], 5555)
self.assertEquals(resp['type'], 'text/plain')

View File

@ -19,6 +19,7 @@ import sys
import unittest
from contextlib import contextmanager, nested
from shutil import rmtree
from StringIO import StringIO
import gc
import time
from textwrap import dedent
@ -46,7 +47,7 @@ from swift.container import server as container_server
from swift.obj import server as object_server
from swift.common.middleware import proxy_logging
from swift.common.middleware.acl import parse_acl, format_acl
from swift.common.exceptions import ChunkReadTimeout
from swift.common.exceptions import ChunkReadTimeout, DiskFileNotExist
from swift.common import utils, constraints
from swift.common.utils import mkdirs, normalize_timestamp, NullLogger
from swift.common.wsgi import monkey_patch_mimetools, loadapp
@ -1022,6 +1023,80 @@ class TestObjectController(unittest.TestCase):
check_file(2, 'c2', ['sde1', 'sdf1'], True)
check_file(2, 'c2', ['sda1', 'sdb1', 'sdc1', 'sdd1'], False)
@unpatch_policies
def test_policy_IO_override(self):
if hasattr(_test_servers[-1], '_filesystem'):
# ironically, the _filesystem attribute on the object server means
# the in-memory diskfile is in use, so this test does not apply
return
prosrv = _test_servers[0]
# validate container policy is 1
req = Request.blank('/v1/a/c1', method='HEAD')
res = req.get_response(prosrv)
self.assertEqual(res.status_int, 204) # sanity check
self.assertEqual(POLICIES[1].name, res.headers['x-storage-policy'])
# check overrides: put it in policy 2 (not where the container says)
req = Request.blank(
'/v1/a/c1/wrong-o',
environ={'REQUEST_METHOD': 'PUT',
'wsgi.input': StringIO("hello")},
headers={'Content-Type': 'text/plain',
'Content-Length': '5',
'X-Backend-Storage-Policy-Index': '2'})
res = req.get_response(prosrv)
self.assertEqual(res.status_int, 201) # sanity check
# go to disk to make sure it's there
partition, nodes = prosrv.get_object_ring(2).get_nodes(
'a', 'c1', 'wrong-o')
node = nodes[0]
conf = {'devices': _testdir, 'mount_check': 'false'}
df_mgr = diskfile.DiskFileManager(conf, FakeLogger())
df = df_mgr.get_diskfile(node['device'], partition, 'a',
'c1', 'wrong-o', policy_idx=2)
with df.open():
contents = ''.join(df.reader())
self.assertEqual(contents, "hello")
# can't get it from the normal place
req = Request.blank('/v1/a/c1/wrong-o',
environ={'REQUEST_METHOD': 'GET'},
headers={'Content-Type': 'text/plain'})
res = req.get_response(prosrv)
self.assertEqual(res.status_int, 404) # sanity check
# but we can get it from policy 2
req = Request.blank('/v1/a/c1/wrong-o',
environ={'REQUEST_METHOD': 'GET'},
headers={'Content-Type': 'text/plain',
'X-Backend-Storage-Policy-Index': '2'})
res = req.get_response(prosrv)
self.assertEqual(res.status_int, 200)
self.assertEqual(res.body, 'hello')
# and we can delete it the same way
req = Request.blank('/v1/a/c1/wrong-o',
environ={'REQUEST_METHOD': 'DELETE'},
headers={'Content-Type': 'text/plain',
'X-Backend-Storage-Policy-Index': '2'})
res = req.get_response(prosrv)
self.assertEqual(res.status_int, 204)
df = df_mgr.get_diskfile(node['device'], partition, 'a',
'c1', 'wrong-o', policy_idx=2)
try:
df.open()
except DiskFileNotExist as e:
now = time.time()
self.assert_(now - 1 < float(e.timestamp) < now + 1)
else:
self.fail('did not raise DiskFileNotExist')
@unpatch_policies
def test_GET_newest_large_file(self):
prolis = _test_sockets[0]
@ -1599,6 +1674,125 @@ class TestObjectController(unittest.TestCase):
test_status_map((200, 200, 404, 500, 500), 503)
test_status_map((200, 200, 404, 404, 404), 404)
@patch_policies([
StoragePolicy(0, 'zero', is_default=True, object_ring=FakeRing()),
StoragePolicy(1, 'one', object_ring=FakeRing()),
])
def test_POST_backend_headers(self):
self.app.object_post_as_copy = False
self.app.sort_nodes = lambda nodes: nodes
backend_requests = []
def capture_requests(ip, port, method, path, headers, *args,
**kwargs):
backend_requests.append((method, path, headers))
req = Request.blank('/v1/a/c/o', {}, method='POST',
headers={'X-Object-Meta-Color': 'Blue'})
# we want the container_info response to says a policy index of 1
resp_headers = {'X-Backend-Storage-Policy-Index': 1}
with mocked_http_conn(
200, 200, 202, 202, 202,
headers=resp_headers, give_connect=capture_requests
) as fake_conn:
resp = req.get_response(self.app)
self.assertRaises(StopIteration, fake_conn.code_iter.next)
self.assertEqual(resp.status_int, 202)
self.assertEqual(len(backend_requests), 5)
def check_request(req, method, path, headers=None):
req_method, req_path, req_headers = req
self.assertEqual(method, req_method)
# caller can ignore leading path parts
self.assertTrue(req_path.endswith(path))
headers = headers or {}
# caller can ignore some headers
for k, v in headers.items():
self.assertEqual(req_headers[k], v)
account_request = backend_requests.pop(0)
check_request(account_request, method='HEAD', path='/sda/1/a')
container_request = backend_requests.pop(0)
check_request(container_request, method='HEAD', path='/sda/1/a/c')
for i, (device, request) in enumerate(zip(('sda', 'sdb', 'sdc'),
backend_requests)):
expectations = {
'method': 'POST',
'path': '/%s/1/a/c/o' % device,
'headers': {
'X-Container-Host': '10.0.0.%d:100%d' % (i, i),
'X-Container-Partition': '1',
'Connection': 'close',
'User-Agent': 'proxy-server %s' % os.getpid(),
'Host': 'localhost:80',
'X-Container-Device': device,
'Referer': 'POST http://localhost/v1/a/c/o',
'X-Object-Meta-Color': 'Blue',
POLICY_INDEX: '1'
},
}
check_request(request, **expectations)
# and again with policy override
self.app.memcache.store = {}
backend_requests = []
req = Request.blank('/v1/a/c/o', {}, method='POST',
headers={'X-Object-Meta-Color': 'Blue',
POLICY_INDEX: 0})
with mocked_http_conn(
200, 200, 202, 202, 202,
headers=resp_headers, give_connect=capture_requests
) as fake_conn:
resp = req.get_response(self.app)
self.assertRaises(StopIteration, fake_conn.code_iter.next)
self.assertEqual(resp.status_int, 202)
self.assertEqual(len(backend_requests), 5)
for request in backend_requests[2:]:
expectations = {
'method': 'POST',
'path': '/1/a/c/o', # ignore device bit
'headers': {
'X-Object-Meta-Color': 'Blue',
POLICY_INDEX: '0',
}
}
check_request(request, **expectations)
# and this time with post as copy
self.app.object_post_as_copy = True
self.app.memcache.store = {}
backend_requests = []
req = Request.blank('/v1/a/c/o', {}, method='POST',
headers={'X-Object-Meta-Color': 'Blue',
POLICY_INDEX: 0})
with mocked_http_conn(
200, 200, 200, 200, 200, 201, 201, 201,
headers=resp_headers, give_connect=capture_requests
) as fake_conn:
resp = req.get_response(self.app)
self.assertRaises(StopIteration, fake_conn.code_iter.next)
self.assertEqual(resp.status_int, 202)
self.assertEqual(len(backend_requests), 8)
policy0 = {POLICY_INDEX: '0'}
policy1 = {POLICY_INDEX: '1'}
expected = [
# account info
{'method': 'HEAD', 'path': '/1/a'},
# container info
{'method': 'HEAD', 'path': '/1/a/c'},
# x-newests
{'method': 'GET', 'path': '/1/a/c/o', 'headers': policy1},
{'method': 'GET', 'path': '/1/a/c/o', 'headers': policy1},
{'method': 'GET', 'path': '/1/a/c/o', 'headers': policy1},
# new writes
{'method': 'PUT', 'path': '/1/a/c/o', 'headers': policy0},
{'method': 'PUT', 'path': '/1/a/c/o', 'headers': policy0},
{'method': 'PUT', 'path': '/1/a/c/o', 'headers': policy0},
]
for request, expectations in zip(backend_requests, expected):
check_request(request, **expectations)
def test_POST_as_copy(self):
with save_globals():
def test_status_map(statuses, expected):