Merge "return the SLO etag generated from the segment etags on PUT"
This commit is contained in:
@@ -138,12 +138,14 @@ from urllib import quote
|
|||||||
from cStringIO import StringIO
|
from cStringIO import StringIO
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
import mimetypes
|
import mimetypes
|
||||||
|
from hashlib import md5
|
||||||
from swift.common.swob import Request, HTTPBadRequest, HTTPServerError, \
|
from swift.common.swob import Request, HTTPBadRequest, HTTPServerError, \
|
||||||
HTTPMethodNotAllowed, HTTPRequestEntityTooLarge, HTTPLengthRequired, \
|
HTTPMethodNotAllowed, HTTPRequestEntityTooLarge, HTTPLengthRequired, \
|
||||||
HTTPOk, HTTPPreconditionFailed, wsgify
|
HTTPOk, HTTPPreconditionFailed, HTTPException
|
||||||
from swift.common.utils import json, get_logger, config_true_value
|
from swift.common.utils import json, get_logger, config_true_value
|
||||||
from swift.common.constraints import check_utf8, MAX_BUFFERED_SLO_SEGMENTS
|
from swift.common.constraints import check_utf8, MAX_BUFFERED_SLO_SEGMENTS
|
||||||
from swift.common.http import HTTP_NOT_FOUND
|
from swift.common.http import HTTP_NOT_FOUND
|
||||||
|
from swift.common.wsgi import WSGIContext
|
||||||
from swift.common.middleware.bulk import get_response_body, \
|
from swift.common.middleware.bulk import get_response_body, \
|
||||||
ACCEPTABLE_FORMATS, Bulk
|
ACCEPTABLE_FORMATS, Bulk
|
||||||
|
|
||||||
@@ -171,6 +173,26 @@ def parse_input(raw_data):
|
|||||||
return parsed_data
|
return parsed_data
|
||||||
|
|
||||||
|
|
||||||
|
class SloContext(WSGIContext):
|
||||||
|
|
||||||
|
def __init__(self, slo, slo_etag):
|
||||||
|
WSGIContext.__init__(self, slo.app)
|
||||||
|
self.slo_etag = '"' + slo_etag.hexdigest() + '"'
|
||||||
|
|
||||||
|
def handle_slo_put(self, req, start_response):
|
||||||
|
app_resp = self._app_call(req.environ)
|
||||||
|
|
||||||
|
for i in xrange(len(self._response_headers)):
|
||||||
|
if self._response_headers[i][0].lower() == 'etag':
|
||||||
|
self._response_headers[i] = ('Etag', self.slo_etag)
|
||||||
|
break
|
||||||
|
|
||||||
|
start_response(self._response_status,
|
||||||
|
self._response_headers,
|
||||||
|
self._response_exc_info)
|
||||||
|
return app_resp
|
||||||
|
|
||||||
|
|
||||||
class StaticLargeObject(object):
|
class StaticLargeObject(object):
|
||||||
"""
|
"""
|
||||||
StaticLargeObject Middleware
|
StaticLargeObject Middleware
|
||||||
@@ -197,11 +219,12 @@ class StaticLargeObject(object):
|
|||||||
self.bulk_deleter = Bulk(
|
self.bulk_deleter = Bulk(
|
||||||
app, {'max_deletes_per_request': self.max_manifest_segments})
|
app, {'max_deletes_per_request': self.max_manifest_segments})
|
||||||
|
|
||||||
def handle_multipart_put(self, req):
|
def handle_multipart_put(self, req, start_response):
|
||||||
"""
|
"""
|
||||||
Will handle the PUT of a SLO manifest.
|
Will handle the PUT of a SLO manifest.
|
||||||
Heads every object in manifest to check if is valid and if so will
|
Heads every object in manifest to check if is valid and if so will
|
||||||
save a manifest generated from the user input.
|
save a manifest generated from the user input. Uses WSGIContext to
|
||||||
|
call self.app and start_response and returns a WSGI iterator.
|
||||||
|
|
||||||
:params req: a swob.Request with an obj in path
|
:params req: a swob.Request with an obj in path
|
||||||
:raises: HttpException on errors
|
:raises: HttpException on errors
|
||||||
@@ -209,7 +232,7 @@ class StaticLargeObject(object):
|
|||||||
try:
|
try:
|
||||||
vrs, account, container, obj = req.split_path(1, 4, True)
|
vrs, account, container, obj = req.split_path(1, 4, True)
|
||||||
except ValueError:
|
except ValueError:
|
||||||
return self.app
|
return self.app(req.environ, start_response)
|
||||||
if req.content_length > self.max_manifest_size:
|
if req.content_length > self.max_manifest_size:
|
||||||
raise HTTPRequestEntityTooLarge(
|
raise HTTPRequestEntityTooLarge(
|
||||||
"Manifest File > %d bytes" % self.max_manifest_size)
|
"Manifest File > %d bytes" % self.max_manifest_size)
|
||||||
@@ -230,6 +253,7 @@ class StaticLargeObject(object):
|
|||||||
if not out_content_type:
|
if not out_content_type:
|
||||||
out_content_type = 'text/plain'
|
out_content_type = 'text/plain'
|
||||||
data_for_storage = []
|
data_for_storage = []
|
||||||
|
slo_etag = md5()
|
||||||
for index, seg_dict in enumerate(parsed_data):
|
for index, seg_dict in enumerate(parsed_data):
|
||||||
obj_path = '/'.join(
|
obj_path = '/'.join(
|
||||||
['', vrs, account, seg_dict['path'].lstrip('/')])
|
['', vrs, account, seg_dict['path'].lstrip('/')])
|
||||||
@@ -260,7 +284,9 @@ class StaticLargeObject(object):
|
|||||||
total_size += seg_size
|
total_size += seg_size
|
||||||
if seg_size != head_seg_resp.content_length:
|
if seg_size != head_seg_resp.content_length:
|
||||||
problem_segments.append([quote(obj_path), 'Size Mismatch'])
|
problem_segments.append([quote(obj_path), 'Size Mismatch'])
|
||||||
if seg_dict['etag'] != head_seg_resp.etag:
|
if seg_dict['etag'] == head_seg_resp.etag:
|
||||||
|
slo_etag.update(seg_dict['etag'])
|
||||||
|
else:
|
||||||
problem_segments.append([quote(obj_path), 'Etag Mismatch'])
|
problem_segments.append([quote(obj_path), 'Etag Mismatch'])
|
||||||
if head_seg_resp.last_modified:
|
if head_seg_resp.last_modified:
|
||||||
last_modified = head_seg_resp.last_modified
|
last_modified = head_seg_resp.last_modified
|
||||||
@@ -298,7 +324,9 @@ class StaticLargeObject(object):
|
|||||||
json_data = json.dumps(data_for_storage)
|
json_data = json.dumps(data_for_storage)
|
||||||
env['CONTENT_LENGTH'] = str(len(json_data))
|
env['CONTENT_LENGTH'] = str(len(json_data))
|
||||||
env['wsgi.input'] = StringIO(json_data)
|
env['wsgi.input'] = StringIO(json_data)
|
||||||
return self.app
|
|
||||||
|
slo_context = SloContext(self, slo_etag)
|
||||||
|
return slo_context.handle_slo_put(req, start_response)
|
||||||
|
|
||||||
def get_segments_to_delete_iter(self, req):
|
def get_segments_to_delete_iter(self, req):
|
||||||
"""
|
"""
|
||||||
@@ -376,30 +404,34 @@ class StaticLargeObject(object):
|
|||||||
out_content_type=out_content_type)
|
out_content_type=out_content_type)
|
||||||
return resp
|
return resp
|
||||||
|
|
||||||
@wsgify
|
def __call__(self, env, start_response):
|
||||||
def __call__(self, req):
|
|
||||||
"""
|
"""
|
||||||
WSGI entry point
|
WSGI entry point
|
||||||
"""
|
"""
|
||||||
|
req = Request(env)
|
||||||
try:
|
try:
|
||||||
vrs, account, container, obj = req.split_path(1, 4, True)
|
vrs, account, container, obj = req.split_path(1, 4, True)
|
||||||
except ValueError:
|
except ValueError:
|
||||||
return self.app
|
return self.app(env, start_response)
|
||||||
if obj:
|
try:
|
||||||
if req.method == 'PUT' and \
|
if obj:
|
||||||
req.params.get('multipart-manifest') == 'put':
|
if req.method == 'PUT' and \
|
||||||
return self.handle_multipart_put(req)
|
req.params.get('multipart-manifest') == 'put':
|
||||||
if req.method == 'DELETE' and \
|
return self.handle_multipart_put(req, start_response)
|
||||||
req.params.get('multipart-manifest') == 'delete':
|
if req.method == 'DELETE' and \
|
||||||
return self.handle_multipart_delete(req)
|
req.params.get('multipart-manifest') == 'delete':
|
||||||
if 'X-Static-Large-Object' in req.headers:
|
return self.handle_multipart_delete(req)(env,
|
||||||
raise HTTPBadRequest(
|
start_response)
|
||||||
request=req,
|
if 'X-Static-Large-Object' in req.headers:
|
||||||
body='X-Static-Large-Object is a reserved header. '
|
raise HTTPBadRequest(
|
||||||
'To create a static large object add query param '
|
request=req,
|
||||||
'multipart-manifest=put.')
|
body='X-Static-Large-Object is a reserved header. '
|
||||||
|
'To create a static large object add query param '
|
||||||
|
'multipart-manifest=put.')
|
||||||
|
except HTTPException, err_resp:
|
||||||
|
return err_resp(env, start_response)
|
||||||
|
|
||||||
return self.app
|
return self.app(env, start_response)
|
||||||
|
|
||||||
|
|
||||||
def filter_factory(global_conf, **local_conf):
|
def filter_factory(global_conf, **local_conf):
|
||||||
|
|||||||
@@ -15,6 +15,7 @@
|
|||||||
|
|
||||||
import unittest
|
import unittest
|
||||||
from mock import patch
|
from mock import patch
|
||||||
|
from hashlib import md5
|
||||||
from swift.common.middleware import slo
|
from swift.common.middleware import slo
|
||||||
from swift.common.utils import json
|
from swift.common.utils import json
|
||||||
from swift.common.swob import Request, Response, HTTPException
|
from swift.common.swob import Request, Response, HTTPException
|
||||||
@@ -194,7 +195,7 @@ class TestStaticLargeObject(unittest.TestCase):
|
|||||||
req = Request.blank('/v/a/c/o')
|
req = Request.blank('/v/a/c/o')
|
||||||
req.content_length = self.slo.max_manifest_size + 1
|
req.content_length = self.slo.max_manifest_size + 1
|
||||||
try:
|
try:
|
||||||
self.slo.handle_multipart_put(req)
|
self.slo.handle_multipart_put(req, fake_start_response)
|
||||||
except HTTPException, e:
|
except HTTPException, e:
|
||||||
pass
|
pass
|
||||||
self.assertEquals(e.status_int, 413)
|
self.assertEquals(e.status_int, 413)
|
||||||
@@ -203,7 +204,7 @@ class TestStaticLargeObject(unittest.TestCase):
|
|||||||
req = Request.blank('/v/a/c/o', body=test_json_data)
|
req = Request.blank('/v/a/c/o', body=test_json_data)
|
||||||
e = None
|
e = None
|
||||||
try:
|
try:
|
||||||
self.slo.handle_multipart_put(req)
|
self.slo.handle_multipart_put(req, fake_start_response)
|
||||||
except HTTPException, e:
|
except HTTPException, e:
|
||||||
pass
|
pass
|
||||||
self.assertEquals(e.status_int, 413)
|
self.assertEquals(e.status_int, 413)
|
||||||
@@ -211,14 +212,14 @@ class TestStaticLargeObject(unittest.TestCase):
|
|||||||
with patch.object(self.slo, 'min_segment_size', 1000):
|
with patch.object(self.slo, 'min_segment_size', 1000):
|
||||||
req = Request.blank('/v/a/c/o', body=test_json_data)
|
req = Request.blank('/v/a/c/o', body=test_json_data)
|
||||||
try:
|
try:
|
||||||
self.slo.handle_multipart_put(req)
|
self.slo.handle_multipart_put(req, fake_start_response)
|
||||||
except HTTPException, e:
|
except HTTPException, e:
|
||||||
pass
|
pass
|
||||||
self.assertEquals(e.status_int, 400)
|
self.assertEquals(e.status_int, 400)
|
||||||
|
|
||||||
req = Request.blank('/v/a/c/o', headers={'X-Copy-From': 'lala'})
|
req = Request.blank('/v/a/c/o', headers={'X-Copy-From': 'lala'})
|
||||||
try:
|
try:
|
||||||
self.slo.handle_multipart_put(req)
|
self.slo.handle_multipart_put(req, fake_start_response)
|
||||||
except HTTPException, e:
|
except HTTPException, e:
|
||||||
pass
|
pass
|
||||||
self.assertEquals(e.status_int, 405)
|
self.assertEquals(e.status_int, 405)
|
||||||
@@ -227,7 +228,9 @@ class TestStaticLargeObject(unittest.TestCase):
|
|||||||
req = Request.blank(
|
req = Request.blank(
|
||||||
'/?multipart-manifest=put',
|
'/?multipart-manifest=put',
|
||||||
environ={'REQUEST_METHOD': 'PUT'}, body=test_json_data)
|
environ={'REQUEST_METHOD': 'PUT'}, body=test_json_data)
|
||||||
self.assertEquals(self.slo.handle_multipart_put(req), self.app)
|
self.assertEquals(
|
||||||
|
self.slo.handle_multipart_put(req, fake_start_response),
|
||||||
|
['passed'])
|
||||||
|
|
||||||
def test_handle_multipart_put_success(self):
|
def test_handle_multipart_put_success(self):
|
||||||
req = Request.blank(
|
req = Request.blank(
|
||||||
@@ -235,7 +238,12 @@ class TestStaticLargeObject(unittest.TestCase):
|
|||||||
environ={'REQUEST_METHOD': 'PUT'}, headers={'Accept': 'test'},
|
environ={'REQUEST_METHOD': 'PUT'}, headers={'Accept': 'test'},
|
||||||
body=test_json_data)
|
body=test_json_data)
|
||||||
self.assertTrue('X-Static-Large-Object' not in req.headers)
|
self.assertTrue('X-Static-Large-Object' not in req.headers)
|
||||||
self.slo(req.environ, fake_start_response)
|
|
||||||
|
def my_fake_start_response(*args, **kwargs):
|
||||||
|
gen_etag = '"' + md5('etagoftheobjectsegment').hexdigest() + '"'
|
||||||
|
self.assertTrue(('Etag', gen_etag) in args[1])
|
||||||
|
|
||||||
|
self.slo(req.environ, my_fake_start_response)
|
||||||
self.assertTrue('X-Static-Large-Object' in req.headers)
|
self.assertTrue('X-Static-Large-Object' in req.headers)
|
||||||
|
|
||||||
def test_handle_multipart_put_success_allow_small_last_segment(self):
|
def test_handle_multipart_put_success_allow_small_last_segment(self):
|
||||||
@@ -282,7 +290,8 @@ class TestStaticLargeObject(unittest.TestCase):
|
|||||||
req = Request.blank(
|
req = Request.blank(
|
||||||
'/test_good/AUTH_test/c/man?multipart-manifest=put',
|
'/test_good/AUTH_test/c/man?multipart-manifest=put',
|
||||||
environ={'REQUEST_METHOD': 'PUT'}, body=bad_data)
|
environ={'REQUEST_METHOD': 'PUT'}, body=bad_data)
|
||||||
self.assertRaises(HTTPException, self.slo.handle_multipart_put, req)
|
self.assertRaises(HTTPException, self.slo.handle_multipart_put, req,
|
||||||
|
fake_start_response)
|
||||||
|
|
||||||
for bad_data in [
|
for bad_data in [
|
||||||
json.dumps([{'path': '/cont', 'etag': 'etagoftheobj',
|
json.dumps([{'path': '/cont', 'etag': 'etagoftheobj',
|
||||||
@@ -305,17 +314,17 @@ class TestStaticLargeObject(unittest.TestCase):
|
|||||||
'/test_good/AUTH_test/c/man?multipart-manifest=put',
|
'/test_good/AUTH_test/c/man?multipart-manifest=put',
|
||||||
environ={'REQUEST_METHOD': 'PUT'}, body=bad_data)
|
environ={'REQUEST_METHOD': 'PUT'}, body=bad_data)
|
||||||
self.assertRaises(HTTPException, self.slo.handle_multipart_put,
|
self.assertRaises(HTTPException, self.slo.handle_multipart_put,
|
||||||
req)
|
req, fake_start_response)
|
||||||
|
|
||||||
def test_handle_multipart_put_check_data(self):
|
def test_handle_multipart_put_check_data(self):
|
||||||
good_data = json.dumps(
|
good_data = json.dumps(
|
||||||
[{'path': '/c/a_1', 'etag': 'a', 'size_bytes': '1'},
|
[{'path': '/c/a_1', 'etag': 'a', 'size_bytes': '1'},
|
||||||
{'path': '/d/b_2', 'etag': 'b', 'size_bytes': '2'}])
|
{'path': '/d/b_2', 'etag': 'b', 'size_bytes': '2'}])
|
||||||
req = Request.blank(
|
req = Request.blank(
|
||||||
'/test_good_check/A/c/man?multipart-manifest=put',
|
'/test_good_check/A/c/man_3?multipart-manifest=put',
|
||||||
environ={'REQUEST_METHOD': 'PUT'}, body=good_data)
|
environ={'REQUEST_METHOD': 'PUT'}, body=good_data)
|
||||||
self.slo.handle_multipart_put(req)
|
self.slo.handle_multipart_put(req, fake_start_response)
|
||||||
self.assertEquals(self.app.calls, 2)
|
self.assertEquals(self.app.calls, 3)
|
||||||
self.assert_(req.environ['CONTENT_TYPE'].endswith(';swift_bytes=3'))
|
self.assert_(req.environ['CONTENT_TYPE'].endswith(';swift_bytes=3'))
|
||||||
manifest_data = json.loads(req.environ['wsgi.input'].read())
|
manifest_data = json.loads(req.environ['wsgi.input'].read())
|
||||||
self.assertEquals(len(manifest_data), 2)
|
self.assertEquals(len(manifest_data), 2)
|
||||||
@@ -336,7 +345,7 @@ class TestStaticLargeObject(unittest.TestCase):
|
|||||||
headers={'Accept': 'application/json'},
|
headers={'Accept': 'application/json'},
|
||||||
body=bad_data)
|
body=bad_data)
|
||||||
try:
|
try:
|
||||||
self.slo.handle_multipart_put(req)
|
self.slo.handle_multipart_put(req, fake_start_response)
|
||||||
except HTTPException, e:
|
except HTTPException, e:
|
||||||
self.assertEquals(self.app.calls, 4)
|
self.assertEquals(self.app.calls, 4)
|
||||||
data = json.loads(e.body)
|
data = json.loads(e.body)
|
||||||
|
|||||||
Reference in New Issue
Block a user