object versioning
Object versioning in swift is implemented by setting a flag on the container to tell swift to version all objects in the container. The flag is the ``X-Versions-Location`` header on the container, and its value is the container where the versions are stored. When data is ``PUT`` into a versioned container (a container with the versioning flag turned on), the existing data in the file is redirected to a new object and the data in the ``PUT`` request is saved as the data for the versioned object. The new object name (for the previous version) is ``<versions_container>/<object_name>/<timestamp>``, where the timestamp is generated by converting the ``Last-Modified`` header value of the current version to a unix timestamp. A ``GET`` to a versioned object will return the current version of the object without having to do any request redirects or metadata lookups. Change-Id: I4fcd723145e02bbb2ec1d3ad356713f5dea43b8b
This commit is contained in:
@@ -59,7 +59,8 @@ from swift.common.constraints import check_metadata, check_object_creation, \
|
||||
check_utf8, CONTAINER_LISTING_LIMIT, MAX_ACCOUNT_NAME_LENGTH, \
|
||||
MAX_CONTAINER_NAME_LENGTH, MAX_FILE_SIZE
|
||||
from swift.common.exceptions import ChunkReadTimeout, \
|
||||
ChunkWriteTimeout, ConnectionTimeout
|
||||
ChunkWriteTimeout, ConnectionTimeout, ListingIterNotFound, \
|
||||
ListingIterNotAuthorized, ListingIterError
|
||||
|
||||
|
||||
def update_headers(response, headers):
|
||||
@@ -487,17 +488,20 @@ class Controller(object):
|
||||
read_acl = cache_value['read_acl']
|
||||
write_acl = cache_value['write_acl']
|
||||
sync_key = cache_value.get('sync_key')
|
||||
versions = cache_value.get('versions')
|
||||
if status == 200:
|
||||
return partition, nodes, read_acl, write_acl, sync_key
|
||||
return partition, nodes, read_acl, write_acl, sync_key, \
|
||||
versions
|
||||
elif status == 404:
|
||||
return None, None, None, None, None
|
||||
return None, None, None, None, None, None
|
||||
if not self.account_info(account, autocreate=account_autocreate)[1]:
|
||||
return None, None, None, None, None
|
||||
return None, None, None, None, None, None
|
||||
result_code = 0
|
||||
read_acl = None
|
||||
write_acl = None
|
||||
sync_key = None
|
||||
container_size = None
|
||||
versions = None
|
||||
attempts_left = self.app.container_ring.replica_count
|
||||
headers = {'x-trans-id': self.trans_id, 'Connection': 'close'}
|
||||
for node in self.iter_nodes(partition, nodes, self.app.container_ring):
|
||||
@@ -515,6 +519,7 @@ class Controller(object):
|
||||
sync_key = resp.getheader('x-container-sync-key')
|
||||
container_size = \
|
||||
resp.getheader('X-Container-Object-Count')
|
||||
versions = resp.getheader('x-versions-location')
|
||||
break
|
||||
elif resp.status == 404:
|
||||
if result_code == 0:
|
||||
@@ -542,11 +547,12 @@ class Controller(object):
|
||||
'read_acl': read_acl,
|
||||
'write_acl': write_acl,
|
||||
'sync_key': sync_key,
|
||||
'container_size': container_size},
|
||||
'container_size': container_size,
|
||||
'versions': versions},
|
||||
timeout=cache_timeout)
|
||||
if result_code == 200:
|
||||
return partition, nodes, read_acl, write_acl, sync_key
|
||||
return None, None, None, None, None
|
||||
return partition, nodes, read_acl, write_acl, sync_key, versions
|
||||
return None, None, None, None, None, None
|
||||
|
||||
def iter_nodes(self, partition, nodes, ring):
|
||||
"""
|
||||
@@ -788,9 +794,6 @@ class Controller(object):
|
||||
bodies.append('')
|
||||
possible_source.read()
|
||||
continue
|
||||
if (req.method == 'GET' and
|
||||
possible_source.status in (200, 206)) or \
|
||||
200 <= possible_source.status <= 399:
|
||||
if newest:
|
||||
if source:
|
||||
ts = float(source.getheader('x-put-timestamp') or
|
||||
@@ -860,11 +863,42 @@ class ObjectController(Controller):
|
||||
self.container_name = unquote(container_name)
|
||||
self.object_name = unquote(object_name)
|
||||
|
||||
def _listing_iter(self, lcontainer, lprefix, env):
|
||||
lpartition, lnodes = self.app.container_ring.get_nodes(
|
||||
self.account_name, lcontainer)
|
||||
marker = ''
|
||||
while True:
|
||||
path = '/%s/%s' % (quote(self.account_name), quote(lcontainer))
|
||||
lreq = Request.blank(
|
||||
'%s?prefix=%s&format=json&marker=%s' %
|
||||
(path, quote(lprefix), quote(marker)), environ=env)
|
||||
lreq.environ['REQUEST_METHOD'] = 'GET'
|
||||
lreq.path_info = path
|
||||
shuffle(lnodes)
|
||||
lresp = self.GETorHEAD_base(lreq, _('Container'),
|
||||
lpartition, lnodes, lreq.path_info,
|
||||
self.app.container_ring.replica_count)
|
||||
if lresp.status_int == 404:
|
||||
raise ListingIterNotFound()
|
||||
elif lresp.status_int // 100 != 2:
|
||||
raise ListingIterError()
|
||||
if 'swift.authorize' in env:
|
||||
lreq.acl = lresp.headers.get('x-container-read')
|
||||
aresp = env['swift.authorize'](lreq)
|
||||
if aresp:
|
||||
raise ListingIterNotAuthorized(aresp)
|
||||
sublisting = json.loads(lresp.body)
|
||||
if not sublisting:
|
||||
break
|
||||
marker = sublisting[-1]['name']
|
||||
for obj in sublisting:
|
||||
yield obj
|
||||
|
||||
def GETorHEAD(self, req):
|
||||
"""Handle HTTP GET or HEAD requests."""
|
||||
_junk, _junk, req.acl, _junk, _junk, object_versions = \
|
||||
self.container_info(self.account_name, self.container_name)
|
||||
if 'swift.authorize' in req.environ:
|
||||
req.acl = \
|
||||
self.container_info(self.account_name, self.container_name)[2]
|
||||
aresp = req.environ['swift.authorize'](req)
|
||||
if aresp:
|
||||
return aresp
|
||||
@@ -874,9 +908,10 @@ class ObjectController(Controller):
|
||||
resp = self.GETorHEAD_base(req, _('Object'), partition,
|
||||
self.iter_nodes(partition, nodes, self.app.object_ring),
|
||||
req.path_info, self.app.object_ring.replica_count)
|
||||
|
||||
# If we get a 416 Requested Range Not Satisfiable we have to check if
|
||||
# we were actually requesting a manifest object and then redo the range
|
||||
# request on the whole object.
|
||||
# we were actually requesting a manifest and then redo
|
||||
# the range request on the whole object.
|
||||
if resp.status_int == 416:
|
||||
req_range = req.range
|
||||
req.range = None
|
||||
@@ -891,68 +926,17 @@ class ObjectController(Controller):
|
||||
if 'x-object-manifest' in resp.headers:
|
||||
lcontainer, lprefix = \
|
||||
resp.headers['x-object-manifest'].split('/', 1)
|
||||
lpartition, lnodes = self.app.container_ring.get_nodes(
|
||||
self.account_name, lcontainer)
|
||||
marker = ''
|
||||
listing = []
|
||||
while True:
|
||||
lreq = Request.blank('/%s/%s?prefix=%s&format=json&marker=%s' %
|
||||
(quote(self.account_name), quote(lcontainer),
|
||||
quote(lprefix), quote(marker)))
|
||||
shuffle(lnodes)
|
||||
lresp = self.GETorHEAD_base(lreq, _('Container'), lpartition,
|
||||
lnodes, lreq.path_info,
|
||||
self.app.container_ring.replica_count)
|
||||
if lresp.status_int // 100 != 2:
|
||||
lresp = HTTPNotFound(request=req)
|
||||
lresp.headers['X-Object-Manifest'] = \
|
||||
resp.headers['x-object-manifest']
|
||||
return lresp
|
||||
if 'swift.authorize' in req.environ:
|
||||
req.acl = lresp.headers.get('x-container-read')
|
||||
aresp = req.environ['swift.authorize'](req)
|
||||
if aresp:
|
||||
return aresp
|
||||
sublisting = json.loads(lresp.body)
|
||||
if not sublisting:
|
||||
break
|
||||
listing.extend(sublisting)
|
||||
if len(listing) > CONTAINER_LISTING_LIMIT:
|
||||
break
|
||||
marker = sublisting[-1]['name']
|
||||
try:
|
||||
listing = list(self._listing_iter(lcontainer, lprefix,
|
||||
req.environ))
|
||||
except ListingIterNotFound:
|
||||
return HTTPNotFound(request=req)
|
||||
except ListingIterNotAuthorized, err:
|
||||
return err.aresp
|
||||
except ListingIterError:
|
||||
return HTTPServerError(request=req)
|
||||
|
||||
if len(listing) > CONTAINER_LISTING_LIMIT:
|
||||
# We will serve large objects with a ton of segments with
|
||||
# chunked transfer encoding.
|
||||
|
||||
def listing_iter():
|
||||
marker = ''
|
||||
while True:
|
||||
lreq = Request.blank(
|
||||
'/%s/%s?prefix=%s&format=json&marker=%s' %
|
||||
(quote(self.account_name), quote(lcontainer),
|
||||
quote(lprefix), quote(marker)))
|
||||
lresp = self.GETorHEAD_base(lreq, _('Container'),
|
||||
lpartition, lnodes, lreq.path_info,
|
||||
self.app.container_ring.replica_count)
|
||||
if lresp.status_int // 100 != 2:
|
||||
raise Exception(_('Object manifest GET could not '
|
||||
'continue listing: %s %s') %
|
||||
(req.path, lreq.path))
|
||||
if 'swift.authorize' in req.environ:
|
||||
req.acl = lresp.headers.get('x-container-read')
|
||||
aresp = req.environ['swift.authorize'](req)
|
||||
if aresp:
|
||||
raise Exception(_('Object manifest GET could '
|
||||
'not continue listing: %s %s') %
|
||||
(req.path, aresp))
|
||||
sublisting = json.loads(lresp.body)
|
||||
if not sublisting:
|
||||
break
|
||||
for obj in sublisting:
|
||||
yield obj
|
||||
marker = sublisting[-1]['name']
|
||||
|
||||
resp = Response(headers=resp.headers, request=req,
|
||||
conditional_response=True)
|
||||
if req.method == 'HEAD':
|
||||
@@ -971,7 +955,8 @@ class ObjectController(Controller):
|
||||
return head_response
|
||||
else:
|
||||
resp.app_iter = SegmentedIterable(self, lcontainer,
|
||||
listing_iter(), resp)
|
||||
self._listing_iter(lcontainer, lprefix, req.environ),
|
||||
resp)
|
||||
|
||||
else:
|
||||
# For objects with a reasonable number of segments, we'll serve
|
||||
@@ -1030,6 +1015,7 @@ class ObjectController(Controller):
|
||||
req.headers['X-Copy-From'] = quote('/%s/%s' % (self.container_name,
|
||||
self.object_name))
|
||||
req.headers['X-Fresh-Metadata'] = 'true'
|
||||
req.environ['swift_versioned_copy'] = True
|
||||
resp = self.PUT(req)
|
||||
# Older editions returned 202 Accepted on object POSTs, so we'll
|
||||
# convert any 201 Created responses to that for compatibility with
|
||||
@@ -1041,7 +1027,7 @@ class ObjectController(Controller):
|
||||
error_response = check_metadata(req, 'object')
|
||||
if error_response:
|
||||
return error_response
|
||||
container_partition, containers, _junk, req.acl, _junk = \
|
||||
container_partition, containers, _junk, req.acl, _junk, _junk = \
|
||||
self.container_info(self.account_name, self.container_name,
|
||||
account_autocreate=self.app.account_autocreate)
|
||||
if 'swift.authorize' in req.environ:
|
||||
@@ -1124,7 +1110,7 @@ class ObjectController(Controller):
|
||||
def PUT(self, req):
|
||||
"""HTTP PUT request handler."""
|
||||
(container_partition, containers, _junk, req.acl,
|
||||
req.environ['swift_sync_key']) = \
|
||||
req.environ['swift_sync_key'], object_versions) = \
|
||||
self.container_info(self.account_name, self.container_name,
|
||||
account_autocreate=self.app.account_autocreate)
|
||||
if 'swift.authorize' in req.environ:
|
||||
@@ -1160,19 +1146,20 @@ class ObjectController(Controller):
|
||||
delete_at_part = delete_at_nodes = None
|
||||
partition, nodes = self.app.object_ring.get_nodes(
|
||||
self.account_name, self.container_name, self.object_name)
|
||||
# do a HEAD request for container sync and checking object versions
|
||||
if 'x-timestamp' in req.headers or (object_versions and not
|
||||
req.environ.get('swift_versioned_copy')):
|
||||
hreq = Request.blank(req.path_info, headers={'X-Newest': 'True'},
|
||||
environ={'REQUEST_METHOD': 'HEAD'})
|
||||
hresp = self.GETorHEAD_base(hreq, _('Object'), partition, nodes,
|
||||
hreq.path_info, self.app.object_ring.replica_count)
|
||||
# Used by container sync feature
|
||||
if 'x-timestamp' in req.headers:
|
||||
try:
|
||||
req.headers['X-Timestamp'] = \
|
||||
normalize_timestamp(float(req.headers['x-timestamp']))
|
||||
# For container sync PUTs, do a HEAD to see if we can
|
||||
# shortcircuit
|
||||
hreq = Request.blank(req.path_info,
|
||||
environ={'REQUEST_METHOD': 'HEAD'})
|
||||
self.GETorHEAD_base(hreq, _('Object'), partition, nodes,
|
||||
hreq.path_info, self.app.object_ring.replica_count)
|
||||
if 'swift_x_timestamp' in hreq.environ and \
|
||||
float(hreq.environ['swift_x_timestamp']) >= \
|
||||
if 'swift_x_timestamp' in hresp.environ and \
|
||||
float(hresp.environ['swift_x_timestamp']) >= \
|
||||
float(req.headers['x-timestamp']):
|
||||
return HTTPAccepted(request=req)
|
||||
except ValueError:
|
||||
@@ -1191,6 +1178,39 @@ class ObjectController(Controller):
|
||||
error_response = check_object_creation(req, self.object_name)
|
||||
if error_response:
|
||||
return error_response
|
||||
if object_versions and not req.environ.get('swift_versioned_copy'):
|
||||
is_manifest = 'x-object-manifest' in req.headers or \
|
||||
'x-object-manifest' in hresp.headers
|
||||
if hresp.status_int != 404 and not is_manifest:
|
||||
# This is a version manifest and needs to be handled
|
||||
# differently. First copy the existing data to a new object,
|
||||
# then write the data from this request to the version manifest
|
||||
# object.
|
||||
lcontainer = object_versions.split('/')[0]
|
||||
prefix_len = '%03x' % len(self.object_name)
|
||||
lprefix = prefix_len + self.object_name + '/'
|
||||
ts_source = hresp.environ.get('swift_x_timestamp')
|
||||
if ts_source is None:
|
||||
ts_source = time.mktime(time.strptime(
|
||||
hresp.headers['last-modified'],
|
||||
'%a, %d %b %Y %H:%M:%S GMT'))
|
||||
new_ts = normalize_timestamp(ts_source)
|
||||
vers_obj_name = lprefix + new_ts
|
||||
copy_headers = {
|
||||
'Destination': '%s/%s' % (lcontainer, vers_obj_name)}
|
||||
copy_environ = {'REQUEST_METHOD': 'COPY',
|
||||
'swift_versioned_copy': True
|
||||
}
|
||||
copy_req = Request.blank(req.path_info, headers=copy_headers,
|
||||
environ=copy_environ)
|
||||
copy_resp = self.COPY(copy_req)
|
||||
if copy_resp.status_int // 100 == 4:
|
||||
# missing container or bad permissions
|
||||
return HTTPPreconditionFailed(request=req)
|
||||
elif copy_resp.status_int // 100 != 2:
|
||||
# could not copy the data, bail
|
||||
return HTTPServiceUnavailable(request=req)
|
||||
|
||||
reader = req.environ['wsgi.input'].read
|
||||
data_source = iter(lambda: reader(self.app.client_chunk_size), '')
|
||||
source_header = req.headers.get('X-Copy-From')
|
||||
@@ -1367,8 +1387,59 @@ class ObjectController(Controller):
|
||||
def DELETE(self, req):
|
||||
"""HTTP DELETE request handler."""
|
||||
(container_partition, containers, _junk, req.acl,
|
||||
req.environ['swift_sync_key']) = \
|
||||
req.environ['swift_sync_key'], object_versions) = \
|
||||
self.container_info(self.account_name, self.container_name)
|
||||
if object_versions:
|
||||
# this is a version manifest and needs to be handled differently
|
||||
lcontainer = object_versions.split('/')[0]
|
||||
prefix_len = '%03x' % len(self.object_name)
|
||||
lprefix = prefix_len + self.object_name + '/'
|
||||
try:
|
||||
raw_listing = self._listing_iter(lcontainer, lprefix,
|
||||
req.environ)
|
||||
except ListingIterNotFound:
|
||||
# set raw_listing so that the actual object is deleted
|
||||
raw_listing = []
|
||||
except ListingIterNotAuthorized, err:
|
||||
return err.aresp
|
||||
except ListingIterError:
|
||||
return HTTPServerError(request=req)
|
||||
last_item = None
|
||||
for item in raw_listing: # find the last item
|
||||
last_item = item
|
||||
if last_item:
|
||||
# there are older versions so copy the previous version to the
|
||||
# current object and delete the previous version
|
||||
orig_container = self.container_name
|
||||
orig_obj = self.object_name
|
||||
self.container_name = lcontainer
|
||||
self.object_name = last_item['name']
|
||||
copy_path = '/' + self.account_name + '/' + \
|
||||
self.container_name + '/' + self.object_name
|
||||
copy_headers = {'X-Newest': 'True',
|
||||
'Destination': orig_container + '/' + orig_obj
|
||||
}
|
||||
copy_environ = {'REQUEST_METHOD': 'COPY',
|
||||
'swift_versioned_copy': True
|
||||
}
|
||||
creq = Request.blank(copy_path, headers=copy_headers,
|
||||
environ=copy_environ)
|
||||
copy_resp = self.COPY(creq)
|
||||
if copy_resp.status_int // 100 == 4:
|
||||
# some user error, maybe permissions
|
||||
return HTTPPreconditionFailed(request=req)
|
||||
elif copy_resp.status_int // 100 != 2:
|
||||
# could not copy the data, bail
|
||||
return HTTPServiceUnavailable(request=req)
|
||||
# reset these because the COPY changed them
|
||||
self.container_name = lcontainer
|
||||
self.object_name = last_item['name']
|
||||
new_del_req = Request.blank(copy_path, environ=req.environ)
|
||||
(container_partition, containers,
|
||||
_junk, new_del_req.acl, _junk, _junk) = \
|
||||
self.container_info(self.account_name, self.container_name)
|
||||
new_del_req.path_info = copy_path
|
||||
req = new_del_req
|
||||
if 'swift.authorize' in req.environ:
|
||||
aresp = req.environ['swift.authorize'](req)
|
||||
if aresp:
|
||||
@@ -1435,7 +1506,8 @@ class ContainerController(Controller):
|
||||
|
||||
# Ensure these are all lowercase
|
||||
pass_through_headers = ['x-container-read', 'x-container-write',
|
||||
'x-container-sync-key', 'x-container-sync-to']
|
||||
'x-container-sync-key', 'x-container-sync-to',
|
||||
'x-versions-location']
|
||||
|
||||
def __init__(self, app, account_name, container_name, **kwargs):
|
||||
Controller.__init__(self, app)
|
||||
@@ -1473,7 +1545,8 @@ class ContainerController(Controller):
|
||||
'read_acl': resp.headers.get('x-container-read'),
|
||||
'write_acl': resp.headers.get('x-container-write'),
|
||||
'sync_key': resp.headers.get('x-container-sync-key'),
|
||||
'container_size': resp.headers.get('x-container-object-count')},
|
||||
'container_size': resp.headers.get('x-container-object-count'),
|
||||
'versions': resp.headers.get('x-versions-location')},
|
||||
timeout=self.app.recheck_container_existence)
|
||||
|
||||
if 'swift.authorize' in req.environ:
|
||||
|
||||
Reference in New Issue
Block a user