mpu: refactor object-versioning part 2

Give ObjectContext some variables to avoid passing them around between
methods.

Break out/rename some methods for clarity.

Add an is_versioning_enabled helper function.

Change-Id: If901c4e8fe9e23a9f6515e4626c6aac04194c98b
Signed-off-by: Alistair Coles <alistairncoles@gmail.com>
This commit is contained in:
Alistair Coles
2025-08-13 15:48:09 +01:00
parent 13689f2d11
commit c5773a10c9
2 changed files with 176 additions and 170 deletions

View File

@@ -182,6 +182,11 @@ SYSMETA_PARENT_CONT = get_sys_meta_prefix('container') + 'parent-container'
SYSMETA_VERSIONS_SYMLINK = get_sys_meta_prefix('object') + 'versions-symlink'
def is_versioning_enabled(container_info):
    """
    Report whether versioning is flagged as enabled in ``container_info``.

    :param container_info: a dict that may have a ``sysmeta`` dict; a
        missing ``sysmeta`` entry is treated as empty.
    :returns: the result of ``config_true_value`` applied to the
        ``versions-enabled`` sysmeta value (``None`` if it is absent).
    """
    sysmeta = container_info.get('sysmeta', {})
    return config_true_value(sysmeta.get('versions-enabled'))
def build_versions_container_name(container_name):
"""
Get the name of the versions container for given ``container_name``.
@@ -312,6 +317,29 @@ class ObjectVersioningContext(WSGIContext):
class ObjectContext(ObjectVersioningContext):
def __init__(self, wsgi_app, logger, api_version, account,
             container, obj, versions_cont, is_enabled):
    """
    Set up a context bound to a single object path.

    The caller is responsible for unquoting ``account``, ``container``
    and ``obj`` when the url path is url-encoded (e.g. %FF).

    :param wsgi_app: WSGI application
    :param logger: logger object
    :param api_version: should be v1 unless swift bumps api version
    :param account: account name string
    :param container: container name string
    :param obj: object name string
    :param versions_cont: container holding versions of the requested obj
    :param is_enabled: is versioning currently enabled
    """
    super().__init__(wsgi_app, logger)
    # components of the requested object's path
    self.api_version, self.account = api_version, account
    self.container, self.obj = container, obj
    # versioning state of the requested object's container
    self.versions_cont, self.is_enabled = versions_cont, is_enabled
def get_version(self, req):
"""
Get version to use for a client request.
@@ -361,14 +389,13 @@ class ObjectContext(ObjectVersioningContext):
return source_resp
def _put_versioned_obj(self, req, put_path_info, source_resp):
# Create a new Request object to PUT to the versions container, copying
# all headers from the source object apart from x-timestamp.
# Create a new Request object to PUT to the versions container.
headers = {'X-Backend-Allow-Reserved-Names': 'true'}
headers.update(source_resp.headers)
put_req = make_pre_authed_request(
req.environ, path=wsgi_quote(put_path_info), method='PUT',
headers={'X-Backend-Allow-Reserved-Names': 'true'},
swift_source='OV')
copy_header_subset(source_resp, put_req,
lambda k: k.lower() != 'x-timestamp')
headers=headers, swift_source='OV')
put_req.environ['wsgi.input'] = FileLikeIter(source_resp.app_iter)
slo_size = put_req.headers.get('X-Object-Sysmeta-Slo-Size')
if slo_size:
@@ -380,12 +407,10 @@ class ObjectContext(ObjectVersioningContext):
close_if_possible(source_resp.app_iter)
return put_resp
def _put_versioned_obj_from_client(self, req, versions_cont, api_version,
account_name, object_name):
version = self.get_version(req)
vers_obj_name = build_versions_object_name(object_name, version)
def _put_versioned_obj_from_client(self, req, version):
vers_obj_name = build_versions_object_name(self.obj, version)
put_path_info = "/%s/%s/%s/%s" % (
api_version, account_name, versions_cont, vers_obj_name)
self.api_version, self.account, self.versions_cont, vers_obj_name)
# Consciously *do not* set swift_source here -- this req is in charge
# of reading bytes from the client, don't let it look like that data
# movement is due to some internal-to-swift thing
@@ -435,10 +460,8 @@ class ObjectContext(ObjectVersioningContext):
return (put_resp, vers_obj_name, put_bytes, put_content_type)
def _put_symlink_to_version(self, req, versions_cont, put_vers_obj_name,
api_version, account_name, object_name,
put_etag, put_bytes, put_content_type):
def _put_symlink_to_version(self, req, put_vers_obj_name, put_etag,
put_bytes, put_content_type):
req.method = 'PUT'
# inch x-timestamp forward, just in case
req.ensure_x_timestamp()
@@ -449,7 +472,7 @@ class ObjectContext(ObjectVersioningContext):
# N.B. in stack mode DELETE we use content_type from listing
req.headers['Content-Type'] = put_content_type
req.headers[TGT_OBJ_SYMLINK_HDR] = wsgi_quote('%s/%s' % (
versions_cont, put_vers_obj_name))
self.versions_cont, put_vers_obj_name))
req.headers[SYSMETA_VERSIONS_SYMLINK] = 'true'
req.headers[SYMLOOP_EXTEND] = 'true'
req.headers[ALLOW_RESERVED_NAMES] = 'true'
@@ -488,21 +511,15 @@ class ObjectContext(ObjectVersioningContext):
# could not version the data, bail
raise HTTPServiceUnavailable(request=req)
def _copy_current(self, req, versions_cont, api_version, account_name,
object_name):
'''
def _copy_current(self, req):
"""
Check if the current version of the object is a versions-symlink
if not, it's because this object was added to the container when
versioning was not enabled. We'll need to copy it into the versions
containers now.
:param req: original request.
:param versions_cont: container where previous versions of the object
are stored.
:param api_version: api version.
:param account_name: account name.
:param object_name: name of object of original request
'''
"""
# validate the write access to the versioned container before
# making any backend requests
if 'swift.authorize' in req.environ:
@@ -518,7 +535,7 @@ class ObjectContext(ObjectVersioningContext):
if get_resp.status_int == HTTP_NOT_FOUND:
# nothing to version, proceed with original request
drain_and_close(get_resp)
return get_resp
return
# check for any other errors
self._check_response_error(req, get_resp)
@@ -526,15 +543,15 @@ class ObjectContext(ObjectVersioningContext):
if get_resp.headers.get(SYSMETA_VERSIONS_SYMLINK) == 'true':
# existing object is a VW symlink; no action required
drain_and_close(get_resp)
return get_resp
return
# if there's an existing object, then copy it to
# X-Versions-Location
version = self.get_null_version(get_resp)
vers_obj_name = build_versions_object_name(object_name, version)
get_resp.headers.pop('x-timestamp', None)
vers_obj_name = build_versions_object_name(self.obj, version)
put_path_info = "/%s/%s/%s/%s" % (
api_version, account_name, versions_cont, vers_obj_name)
self.api_version, self.account, self.versions_cont, vers_obj_name)
put_resp = self._put_versioned_obj(req, put_path_info, get_resp)
if put_resp.status_int == HTTP_NOT_FOUND:
@@ -545,8 +562,7 @@ class ObjectContext(ObjectVersioningContext):
self._check_response_error(req, put_resp)
def handle_put(self, req, versions_cont, api_version,
account_name, object_name, is_enabled):
def handle_put(self, req):
"""
Check if the current version of the object is a versions-symlink
if not, it's because this object was added to the container when
@@ -557,38 +573,26 @@ class ObjectContext(ObjectVersioningContext):
and add a static symlink in the versioned container.
:param req: original request.
:param versions_cont: container where previous versions of the object
are stored.
:param api_version: api version.
:param account_name: account name.
:param object_name: name of object of original request
"""
# handle object request for a disabled versioned container.
if not is_enabled:
if not self.is_enabled:
return req.get_response(self.app)
# attempt to copy current object to versions container
self._copy_current(req, versions_cont, api_version, account_name,
object_name)
self._copy_current(req)
# write client's put directly to versioned container
req.ensure_x_timestamp()
version = self.get_version(req)
put_resp, put_vers_obj_name, put_bytes, put_content_type = \
self._put_versioned_obj_from_client(req, versions_cont,
api_version, account_name,
object_name)
self._put_versioned_obj_from_client(req, version)
# and add a static symlink to original container
target_etag = put_resp.headers['Etag']
return self._put_symlink_to_version(req, versions_cont,
put_vers_obj_name, api_version,
account_name, object_name,
target_etag, put_bytes,
put_content_type)
return self._put_symlink_to_version(
req, put_vers_obj_name, target_etag, put_bytes, put_content_type)
def handle_delete(self, req, versions_cont, api_version,
account_name, container_name,
object_name, is_enabled):
def handle_delete(self, req):
"""
Handle DELETE requests.
@@ -596,24 +600,18 @@ class ObjectContext(ObjectVersioningContext):
delete marker before proceeding with original request.
:param req: original request.
:param versions_cont: container where previous versions of the object
are stored.
:param api_version: api version.
:param account_name: account name.
:param object_name: name of object of original request
"""
# handle object request for a disabled versioned container.
if not is_enabled:
if not self.is_enabled:
return req.get_response(self.app)
self._copy_current(req, versions_cont, api_version,
account_name, object_name)
self._copy_current(req)
req.ensure_x_timestamp()
version = self.get_version(req)
marker_name = build_versions_object_name(object_name, version)
marker_name = build_versions_object_name(self.obj, version)
marker_path = "/%s/%s/%s/%s" % (
api_version, account_name, versions_cont, marker_name)
self.api_version, self.account, self.versions_cont, marker_name)
marker_headers = {
# Definitive source of truth is Content-Type, and since we add
# a swift_* param, we know users haven't set it themselves.
@@ -641,18 +639,15 @@ class ObjectContext(ObjectVersioningContext):
drain_and_close(resp)
return resp
def handle_post(self, req, versions_cont, account):
'''
def handle_post(self, req):
"""
Handle a POST request to an object in a versioned container.
If the response is a 307 because the POST went to a symlink,
follow the symlink and send the request to the versioned object
:param req: original request.
:param versions_cont: container where previous versions of the object
are stored.
:param account: account name.
'''
"""
# create eventual post request before
# encryption middleware changes the request headers
post_req = make_pre_authed_request(
@@ -671,7 +666,7 @@ class ObjectContext(ObjectVersioningContext):
# Only follow if the version container matches
if split_path(loc, 4, 4, True)[1:3] == [
account, versions_cont]:
self.account, self.versions_cont]:
drain_and_close(resp)
post_req.path_info = loc
resp = post_req.get_response(self.app)
@@ -700,32 +695,36 @@ class ObjectContext(ObjectVersioningContext):
drain_and_close(hresp)
return head_is_tombstone, symlink_target
def handle_delete_version(self, req, versions_cont, api_version,
account_name, container_name,
object_name, is_enabled, version):
def handle_delete_with_version_id(self, req, version):
"""
Handle a DELETE?version_id request.
:param req: original request.
:param version: version to delete.
"""
if version == 'null':
# let the request go directly through to the is_latest link
return
return req.get_response(self.app)
auth_token_header = {'X-Auth-Token': req.headers.get('X-Auth-Token')}
head_is_tombstone, symlink_target = self._check_head(
req, auth_token_header)
versions_obj = build_versions_object_name(object_name, version)
req_obj_path = '%s/%s' % (versions_cont, versions_obj)
versions_obj = build_versions_object_name(self.obj, version)
req_obj_path = '%s/%s' % (self.versions_cont, versions_obj)
if head_is_tombstone or not symlink_target or (
wsgi_unquote(symlink_target) != wsgi_unquote(req_obj_path)):
# If there's no current version (i.e., tombstone or unversioned
# object) or if current version links to another version, then
# just delete the version requested to be deleted
req.path_info = "/%s/%s/%s/%s" % (
api_version, account_name, versions_cont, versions_obj)
self.api_version, self.account, self.versions_cont,
versions_obj)
req.headers['X-Backend-Allow-Reserved-Names'] = 'true'
if head_is_tombstone or not symlink_target:
resp_version_id = 'null'
else:
_, vers_obj_name = wsgi_unquote(symlink_target).split('/', 1)
resp_version_id = parse_versions_object_name(
vers_obj_name)[1]
resp_version_id = parse_versions_object_name(vers_obj_name)[1]
else:
# if version-id is the latest version, delete the link too
# First, kill the link...
@@ -736,7 +735,8 @@ class ObjectContext(ObjectVersioningContext):
# *then* the backing data
req.path_info = "/%s/%s/%s/%s" % (
api_version, account_name, versions_cont, versions_obj)
self.api_version, self.account, self.versions_cont,
versions_obj)
req.headers['X-Backend-Allow-Reserved-Names'] = 'true'
resp_version_id = 'null'
resp = req.get_response(self.app)
@@ -744,11 +744,13 @@ class ObjectContext(ObjectVersioningContext):
resp.headers['X-Object-Current-Version-Id'] = resp_version_id
return resp
def handle_put_version(self, req, versions_cont, api_version, account_name,
container, object_name, is_enabled, version):
def handle_put_with_version_id(self, req, version):
"""
Handle a PUT?version-id request and create/update the is_latest link to
point to the specific version. Expects a valid 'version' id.
:param req: original request.
:param version: version to make the latest.
"""
if req.is_chunked:
has_body = (req.body_file.read(1) != b'')
@@ -761,9 +763,10 @@ class ObjectContext(ObjectVersioningContext):
body='PUT version-id requests require a zero byte body',
request=req,
content_type='text/plain')
versions_obj_name = build_versions_object_name(object_name, version)
versions_obj_name = build_versions_object_name(self.obj, version)
versioned_obj_path = "/%s/%s/%s/%s" % (
api_version, account_name, versions_cont, versions_obj_name)
self.api_version, self.account, self.versions_cont,
versions_obj_name)
obj_head_headers = {'X-Backend-Allow-Reserved-Names': 'true'}
head_req = make_pre_authed_request(
req.environ, path=wsgi_quote(versioned_obj_path) + '?symlink=get',
@@ -789,12 +792,52 @@ class ObjectContext(ObjectVersioningContext):
put_bytes = head_resp.content_length
put_content_type = head_resp.headers['Content-Type']
resp = self._put_symlink_to_version(
req, versions_cont, versions_obj_name, api_version, account_name,
object_name, put_etag, put_bytes, put_content_type)
req, versions_obj_name, put_etag, put_bytes, put_content_type)
return resp
def handle_versioned_request(self, req, versions_cont, api_version,
account, container, obj, is_enabled, version):
def handle_get_head_with_version_id(self, req, version):
    """
    Send a request for a specific version to the versions container.

    The request path is re-written in place to name the versioned object
    and reserved names are allowed on the backend request.

    :param req: original request; its path_info and headers are modified.
    :param version: version of the object to act on.
    :returns: the response from the wrapped app, with
        ``X-Object-Version-Id`` set when the response is successful.
    :raises HTTPNotFound: if the response content type is the delete
        marker content type.
    """
    # Re-write the path; most everything else goes through normally
    versions_obj = build_versions_object_name(self.obj, version)
    req.path_info = "/%s/%s/%s/%s" % (
        self.api_version, self.account, self.versions_cont, versions_obj)
    req.headers['X-Backend-Allow-Reserved-Names'] = 'true'

    resp = req.get_response(self.app)
    if resp.is_success:
        resp.headers['X-Object-Version-Id'] = version

    # Well, except for some delete marker business...
    # N.B. the Content-Type header is always looked up, as the fallback
    # for a missing X-Backend-Content-Type.
    fallback_type = resp.headers['Content-Type']
    backend_type = resp.headers.get('X-Backend-Content-Type', fallback_type)
    is_del_marker = DELETE_MARKER_CONTENT_TYPE == backend_type

    if req.method == 'HEAD':
        drain_and_close(resp)

    if not is_del_marker:
        return resp

    raise HTTPNotFound(
        request=req,
        headers={'X-Object-Version-Id': version,
                 'Content-Type': DELETE_MARKER_CONTENT_TYPE})
def handle_get_head_with_null_version_id(self, req):
    """
    Handle a GET/HEAD request for the ``null`` version of the object.

    The request is sent through unmodified. If the response's
    Content-Location refers to the reserved versions namespace then the
    latest version has a real version-id, so the ``null`` version that
    the client asked for does not exist.

    :param req: original request.
    :returns: the response from the wrapped app, with
        ``X-Object-Version-Id: null`` set when the response is successful.
    :raises HTTPNotFound: if the Content-Location refers to the reserved
        versions namespace.
    """
    resp = req.get_response(self.app)
    location = wsgi_unquote(resp.headers.get('Content-Location', ''))
    if get_reserved_name('versions', '') in location:
        # Have a latest version, but it's got a real version-id.
        # Since the user specifically asked for null, return 404
        close_if_possible(resp.app_iter)
        raise HTTPNotFound(request=req)

    if resp.is_success:
        # this is the de-facto null version
        resp.headers['X-Object-Version-Id'] = 'null'
        if req.method == 'HEAD':
            drain_and_close(resp)
    return resp

# Misspelled name retained so that existing callers keep working.
hande_get_head_with_null_version_id = handle_get_head_with_null_version_id
def handle_request_with_version_id(self, req, version):
"""
Handle 'version-id' request for object resource. When a request
contains a ``version-id=<id>`` parameter, the request is acted upon
@@ -808,12 +851,6 @@ class ObjectContext(ObjectVersioningContext):
the contents of the versioned object.
:param req: The original request
:param versions_cont: container holding versions of the requested obj
:param api_version: should be v1 unless swift bumps api version
:param account: account name string
:param container: container name string
:param object: object name string
:param is_enabled: is versioning currently enabled
:param version: version of the object to act on
"""
# ?version-id requests are allowed for GET, HEAD, DELETE reqs
@@ -821,7 +858,7 @@ class ObjectContext(ObjectVersioningContext):
raise HTTPBadRequest(
'%s to a specific version is not allowed' % req.method,
request=req)
elif not versions_cont and version != 'null':
elif not self.versions_cont and version != 'null':
raise HTTPBadRequest(
'version-aware operations require that the container is '
'versioned', request=req)
@@ -829,65 +866,33 @@ class ObjectContext(ObjectVersioningContext):
try:
validate_version(version)
except ValueError:
raise HTTPBadRequest('Invalid version parameter', request=req)
raise HTTPBadRequest('Invalid version parameter',
request=req)
if req.method == 'DELETE':
return self.handle_delete_version(
req, versions_cont, api_version, account,
container, obj, is_enabled, version)
return self.handle_delete_with_version_id(req, version)
elif req.method == 'PUT':
return self.handle_put_version(
req, versions_cont, api_version, account,
container, obj, is_enabled, version)
return self.handle_put_with_version_id(req, version)
if version == 'null':
resp = req.get_response(self.app)
if resp.is_success:
if get_reserved_name('versions', '') in wsgi_unquote(
resp.headers.get('Content-Location', '')):
# Have a latest version, but it's got a real version-id.
# Since the user specifically asked for null, return 404
close_if_possible(resp.app_iter)
raise HTTPNotFound(request=req)
resp.headers['X-Object-Version-Id'] = 'null'
if req.method == 'HEAD':
drain_and_close(resp)
return resp
else:
# Re-write the path; most everything else goes through normally
req.path_info = "/%s/%s/%s/%s" % (
api_version, account, versions_cont,
build_versions_object_name(obj, version))
req.headers['X-Backend-Allow-Reserved-Names'] = 'true'
# try the user namespace container first...
resp = self.hande_get_head_with_null_version_id(req)
if resp:
return resp
return self.handle_get_head_with_version_id(req, version)
resp = req.get_response(self.app)
if resp.is_success:
resp.headers['X-Object-Version-Id'] = version
def handle_request_without_version_id(self, req):
"""
Handle request for an object resource that may require a new version to
be created.
# Well, except for some delete marker business...
is_del_marker = DELETE_MARKER_CONTENT_TYPE == resp.headers.get(
'X-Backend-Content-Type', resp.headers['Content-Type'])
if req.method == 'HEAD':
drain_and_close(resp)
if is_del_marker:
hdrs = {'X-Object-Version-Id': version,
'Content-Type': DELETE_MARKER_CONTENT_TYPE}
raise HTTPNotFound(request=req, headers=hdrs)
return resp
def handle_request(self, req, versions_cont, api_version, account,
container, obj, is_enabled):
:param req: original request.
"""
if req.method == 'PUT':
return self.handle_put(
req, versions_cont, api_version, account, obj,
is_enabled)
return self.handle_put(req)
elif req.method == 'POST':
return self.handle_post(req, versions_cont, account)
return self.handle_post(req)
elif req.method == 'DELETE':
return self.handle_delete(
req, versions_cont, api_version, account,
container, obj, is_enabled)
return self.handle_delete(req)
# GET/HEAD/OPTIONS
resp = req.get_response(self.app)
@@ -897,26 +902,42 @@ class ObjectContext(ObjectVersioningContext):
loc = wsgi_unquote(resp.headers.get('Content-Location', ''))
if loc:
_, acct, cont, version_obj = split_path(loc, 4, 4, True)
if acct == account and cont == versions_cont:
if acct == self.account and cont == self.versions_cont:
_, version = parse_versions_object_name(version_obj)
if version is not None:
resp.headers['X-Object-Version-Id'] = version
content_loc = wsgi_quote('/%s/%s/%s/%s' % (
api_version, account, container, obj,
)) + '?version-id=%s' % (version,)
self.api_version, self.account, self.container,
self.obj)) + '?version-id=%s' % (version,)
resp.headers['Content-Location'] = content_loc
symlink_target = wsgi_unquote(resp.headers.get('X-Symlink-Target', ''))
if symlink_target:
cont, version_obj = split_path('/%s' % symlink_target, 2, 2, True)
if cont == versions_cont:
if cont == self.versions_cont:
_, version = parse_versions_object_name(version_obj)
if version is not None:
resp.headers['X-Object-Version-Id'] = version
symlink_target = wsgi_quote('%s/%s' % (container, obj)) + \
symlink_target = wsgi_quote(
'%s/%s' % (self.container, self.obj)) + \
'?version-id=%s' % (version,)
resp.headers['X-Symlink-Target'] = symlink_target
return resp
def handle_request(self, req):
    """
    Dispatch a request for an object resource.

    :param req: swift.common.swob.Request instance
    :returns: the result of the version-id handler if the request has a
        non-empty ``version-id`` parameter; otherwise the result of the
        handler for versioned containers if a versions container is set;
        otherwise the wrapped app itself.
    """
    version_id = req.params.get('version-id')
    if version_id:
        return self.handle_request_with_version_id(req, version_id)
    if not self.versions_cont:
        # no versions container: hand back the wrapped app
        return self.app
    # handle object request for a versioned container
    return self.handle_request_without_version_id(req)
class ContainerContext(ObjectVersioningContext):
def handle_request(self, req, start_response):
@@ -1503,35 +1524,19 @@ class ObjectVersioningMiddleware(object):
:param container: container name string
:param object: object name string
"""
resp = None
container_info = get_container_info(
req.environ, self.app, swift_source='OV')
versions_cont = container_info.get(
'sysmeta', {}).get('versions-container', '')
is_enabled = config_true_value(container_info.get(
'sysmeta', {}).get('versions-enabled'))
if versions_cont:
versions_cont = wsgi_unquote(str_to_wsgi(
versions_cont)).split('/')[0]
if req.params.get('version-id'):
vw_ctx = ObjectContext(self.app, self.logger)
resp = vw_ctx.handle_versioned_request(
req, versions_cont, api_version, account, container, obj,
is_enabled, req.params['version-id'])
elif versions_cont:
# handle object request for a enabled versioned container
vw_ctx = ObjectContext(self.app, self.logger)
resp = vw_ctx.handle_request(
req, versions_cont, api_version, account, container, obj,
is_enabled)
if resp:
return resp
else:
return self.app
is_enabled = is_versioning_enabled(container_info)
object_ctx = ObjectContext(
self.app, self.logger, api_version, account, container, obj,
versions_cont, is_enabled)
return object_ctx.handle_request(req)
def __call__(self, env, start_response):
req = Request(env)

View File

@@ -3621,7 +3621,8 @@ class TestModuleFunctions(unittest.TestCase):
class TestObjectContext(unittest.TestCase):
def setUp(self):
    # Build an ObjectContext for object 'o' in container 'c' of account
    # 'a', with no versions container and versioning disabled. Keyword
    # arguments keep account and container from being transposed.
    app = FakeSwift()
    self.obj_context = object_versioning.ObjectContext(
        app, app.logger, api_version='v1', account='a', container='c',
        obj='o', versions_cont=None, is_enabled=False)
    self.ts_iter = make_timestamp_iter()
def test_get_version(self):