Merge "Switch Swift cloud layer to proxy" into feature/r1
This commit is contained in:
@@ -13,10 +13,7 @@
|
||||
# import types so that we can reference ListType in sphinx param declarations.
|
||||
# We can't just use list, because sphinx gets confused by
|
||||
# openstack.resource.Resource.list and openstack.resource2.Resource.list
|
||||
import collections
|
||||
import concurrent.futures
|
||||
import json
|
||||
import os
|
||||
import types # noqa
|
||||
import urllib.parse
|
||||
|
||||
@@ -26,8 +23,6 @@ from openstack.cloud import _normalize
|
||||
from openstack.cloud import _utils
|
||||
from openstack.cloud import exc
|
||||
from openstack import exceptions
|
||||
from openstack import proxy
|
||||
from openstack import utils
|
||||
|
||||
|
||||
DEFAULT_OBJECT_SEGMENT_SIZE = 1073741824 # 1GB
|
||||
@@ -149,12 +144,8 @@ class ObjectStoreCloudMixin(_normalize.Normalizer):
|
||||
:param dict headers:
|
||||
Key/Value headers to set on the container.
|
||||
"""
|
||||
# TODO(gtema): Decide on whether to deprecate this or change i/f to the
|
||||
# container metadata names
|
||||
exceptions.raise_from_response(
|
||||
self.object_store.post(
|
||||
self._get_object_endpoint(name), headers=headers)
|
||||
)
|
||||
self.object_store.set_container_metadata(
|
||||
name, refresh=False, **headers)
|
||||
|
||||
def set_container_access(self, name, access, refresh=False):
|
||||
"""Set the access control list on a container.
|
||||
@@ -202,40 +193,11 @@ class ObjectStoreCloudMixin(_normalize.Normalizer):
|
||||
The object-storage service publishes a set of capabilities that
|
||||
include metadata about maximum values and thresholds.
|
||||
"""
|
||||
# The endpoint in the catalog has version and project-id in it
|
||||
# To get capabilities, we have to disassemble and reassemble the URL
|
||||
# This logic is taken from swiftclient
|
||||
endpoint = urllib.parse.urlparse(self.object_store.get_endpoint())
|
||||
url = "{scheme}://{netloc}/info".format(
|
||||
scheme=endpoint.scheme, netloc=endpoint.netloc)
|
||||
|
||||
return proxy._json_response(self.object_store.get(url))
|
||||
return self.object_store.get_info()
|
||||
|
||||
def get_object_segment_size(self, segment_size):
|
||||
"""Get a segment size that will work given capabilities"""
|
||||
if segment_size is None:
|
||||
segment_size = DEFAULT_OBJECT_SEGMENT_SIZE
|
||||
min_segment_size = 0
|
||||
try:
|
||||
caps = self.get_object_capabilities()
|
||||
except exc.OpenStackCloudHTTPError as e:
|
||||
if e.response.status_code in (404, 412):
|
||||
server_max_file_size = DEFAULT_MAX_FILE_SIZE
|
||||
self.log.info(
|
||||
"Swift capabilities not supported. "
|
||||
"Using default max file size.")
|
||||
else:
|
||||
raise
|
||||
else:
|
||||
server_max_file_size = caps.get('swift', {}).get('max_file_size',
|
||||
0)
|
||||
min_segment_size = caps.get('slo', {}).get('min_segment_size', 0)
|
||||
|
||||
if segment_size > server_max_file_size:
|
||||
return server_max_file_size
|
||||
if segment_size < min_segment_size:
|
||||
return min_segment_size
|
||||
return segment_size
|
||||
return self.object_store.get_object_segment_size(segment_size)
|
||||
|
||||
def is_object_stale(
|
||||
self, container, name, filename, file_md5=None, file_sha256=None):
|
||||
@@ -251,35 +213,10 @@ class ObjectStoreCloudMixin(_normalize.Normalizer):
|
||||
Pre-calculated sha256 of the file contents. Defaults to None which
|
||||
means calculate locally.
|
||||
"""
|
||||
metadata = self.get_object_metadata(container, name)
|
||||
if not metadata:
|
||||
self.log.debug(
|
||||
"swift stale check, no object: {container}/{name}".format(
|
||||
container=container, name=name))
|
||||
return True
|
||||
|
||||
if not (file_md5 or file_sha256):
|
||||
(file_md5, file_sha256) = utils._get_file_hashes(filename)
|
||||
md5_key = metadata.get(
|
||||
self._OBJECT_MD5_KEY, metadata.get(self._SHADE_OBJECT_MD5_KEY, ''))
|
||||
sha256_key = metadata.get(
|
||||
self._OBJECT_SHA256_KEY, metadata.get(
|
||||
self._SHADE_OBJECT_SHA256_KEY, ''))
|
||||
up_to_date = utils._hashes_up_to_date(
|
||||
md5=file_md5, sha256=file_sha256,
|
||||
md5_key=md5_key, sha256_key=sha256_key)
|
||||
|
||||
if not up_to_date:
|
||||
self.log.debug(
|
||||
"swift checksum mismatch: "
|
||||
" %(filename)s!=%(container)s/%(name)s",
|
||||
{'filename': filename, 'container': container, 'name': name})
|
||||
return True
|
||||
|
||||
self.log.debug(
|
||||
"swift object up to date: %(container)s/%(name)s",
|
||||
{'container': container, 'name': name})
|
||||
return False
|
||||
return self.object_store.is_object_stale(
|
||||
container, name, filename,
|
||||
file_md5=file_md5, file_sha256=file_sha256
|
||||
)
|
||||
|
||||
def create_directory_marker_object(self, container, name, **headers):
|
||||
"""Create a zero-byte directory marker object
|
||||
@@ -349,217 +286,14 @@ class ObjectStoreCloudMixin(_normalize.Normalizer):
|
||||
|
||||
:raises: ``OpenStackCloudException`` on operation error.
|
||||
"""
|
||||
if data is not None and filename:
|
||||
raise ValueError(
|
||||
"Both filename and data given. Please choose one.")
|
||||
if data is not None and not name:
|
||||
raise ValueError(
|
||||
"name is a required parameter when data is given")
|
||||
if data is not None and generate_checksums:
|
||||
raise ValueError(
|
||||
"checksums cannot be generated with data parameter")
|
||||
if generate_checksums is None:
|
||||
if data is not None:
|
||||
generate_checksums = False
|
||||
else:
|
||||
generate_checksums = True
|
||||
|
||||
if not metadata:
|
||||
metadata = {}
|
||||
|
||||
if not filename and data is None:
|
||||
filename = name
|
||||
|
||||
if generate_checksums and (md5 is None or sha256 is None):
|
||||
(md5, sha256) = utils._get_file_hashes(filename)
|
||||
if md5:
|
||||
headers[self._OBJECT_MD5_KEY] = md5 or ''
|
||||
if sha256:
|
||||
headers[self._OBJECT_SHA256_KEY] = sha256 or ''
|
||||
for (k, v) in metadata.items():
|
||||
if not k.lower().startswith('x-object-meta-'):
|
||||
headers['x-object-meta-' + k] = v
|
||||
else:
|
||||
headers[k] = v
|
||||
|
||||
endpoint = self._get_object_endpoint(container, name)
|
||||
|
||||
if data is not None:
|
||||
self.log.debug(
|
||||
"swift uploading data to %(endpoint)s",
|
||||
{'endpoint': endpoint})
|
||||
|
||||
return self._upload_object_data(endpoint, data, headers)
|
||||
|
||||
# segment_size gets used as a step value in a range call, so needs
|
||||
# to be an int
|
||||
if segment_size:
|
||||
segment_size = int(segment_size)
|
||||
segment_size = self.get_object_segment_size(segment_size)
|
||||
file_size = os.path.getsize(filename)
|
||||
|
||||
if self.is_object_stale(container, name, filename, md5, sha256):
|
||||
|
||||
self.log.debug(
|
||||
"swift uploading %(filename)s to %(endpoint)s",
|
||||
{'filename': filename, 'endpoint': endpoint})
|
||||
|
||||
if file_size <= segment_size:
|
||||
self._upload_object(endpoint, filename, headers)
|
||||
else:
|
||||
self._upload_large_object(
|
||||
endpoint, filename, headers,
|
||||
file_size, segment_size, use_slo)
|
||||
|
||||
def _upload_object_data(self, endpoint, data, headers):
|
||||
return proxy._json_response(self.object_store.put(
|
||||
endpoint, headers=headers, data=data))
|
||||
|
||||
def _upload_object(self, endpoint, filename, headers):
|
||||
return proxy._json_response(self.object_store.put(
|
||||
endpoint, headers=headers, data=open(filename, 'rb')))
|
||||
|
||||
def _get_file_segments(self, endpoint, filename, file_size, segment_size):
|
||||
# Use an ordered dict here so that testing can replicate things
|
||||
segments = collections.OrderedDict()
|
||||
for (index, offset) in enumerate(range(0, file_size, segment_size)):
|
||||
remaining = file_size - (index * segment_size)
|
||||
segment = _utils.FileSegment(
|
||||
filename, offset,
|
||||
segment_size if segment_size < remaining else remaining)
|
||||
name = '{endpoint}/{index:0>6}'.format(
|
||||
endpoint=endpoint, index=index)
|
||||
segments[name] = segment
|
||||
return segments
|
||||
|
||||
def _object_name_from_url(self, url):
|
||||
'''Get container_name/object_name from the full URL called.
|
||||
|
||||
Remove the Swift endpoint from the front of the URL, and remove
|
||||
the leaving / that will leave behind.'''
|
||||
endpoint = self.object_store.get_endpoint()
|
||||
object_name = url.replace(endpoint, '')
|
||||
if object_name.startswith('/'):
|
||||
object_name = object_name[1:]
|
||||
return object_name
|
||||
|
||||
def _add_etag_to_manifest(self, segment_results, manifest):
|
||||
for result in segment_results:
|
||||
if 'Etag' not in result.headers:
|
||||
continue
|
||||
name = self._object_name_from_url(result.url)
|
||||
for entry in manifest:
|
||||
if entry['path'] == '/{name}'.format(name=name):
|
||||
entry['etag'] = result.headers['Etag']
|
||||
|
||||
def _upload_large_object(
|
||||
self, endpoint, filename,
|
||||
headers, file_size, segment_size, use_slo):
|
||||
# If the object is big, we need to break it up into segments that
|
||||
# are no larger than segment_size, upload each of them individually
|
||||
# and then upload a manifest object. The segments can be uploaded in
|
||||
# parallel, so we'll use the async feature of the TaskManager.
|
||||
|
||||
segment_futures = []
|
||||
segment_results = []
|
||||
retry_results = []
|
||||
retry_futures = []
|
||||
manifest = []
|
||||
|
||||
# Get an OrderedDict with keys being the swift location for the
|
||||
# segment, the value a FileSegment file-like object that is a
|
||||
# slice of the data for the segment.
|
||||
segments = self._get_file_segments(
|
||||
endpoint, filename, file_size, segment_size)
|
||||
|
||||
# Schedule the segments for upload
|
||||
for name, segment in segments.items():
|
||||
# Async call to put - schedules execution and returns a future
|
||||
segment_future = self._pool_executor.submit(
|
||||
self.object_store.put,
|
||||
name, headers=headers, data=segment,
|
||||
raise_exc=False)
|
||||
segment_futures.append(segment_future)
|
||||
# TODO(mordred) Collect etags from results to add to this manifest
|
||||
# dict. Then sort the list of dicts by path.
|
||||
manifest.append(dict(
|
||||
path='/{name}'.format(name=name),
|
||||
size_bytes=segment.length))
|
||||
|
||||
# Try once and collect failed results to retry
|
||||
segment_results, retry_results = self._wait_for_futures(
|
||||
segment_futures, raise_on_error=False)
|
||||
|
||||
self._add_etag_to_manifest(segment_results, manifest)
|
||||
|
||||
for result in retry_results:
|
||||
# Grab the FileSegment for the failed upload so we can retry
|
||||
name = self._object_name_from_url(result.url)
|
||||
segment = segments[name]
|
||||
segment.seek(0)
|
||||
# Async call to put - schedules execution and returns a future
|
||||
segment_future = self._pool_executor.submit(
|
||||
self.object_store.put,
|
||||
name, headers=headers, data=segment)
|
||||
# TODO(mordred) Collect etags from results to add to this manifest
|
||||
# dict. Then sort the list of dicts by path.
|
||||
retry_futures.append(segment_future)
|
||||
|
||||
# If any segments fail the second time, just throw the error
|
||||
segment_results, retry_results = self._wait_for_futures(
|
||||
retry_futures, raise_on_error=True)
|
||||
|
||||
self._add_etag_to_manifest(segment_results, manifest)
|
||||
|
||||
# If the final manifest upload fails, remove the segments we've
|
||||
# already uploaded.
|
||||
try:
|
||||
if use_slo:
|
||||
return self._finish_large_object_slo(endpoint, headers,
|
||||
manifest)
|
||||
else:
|
||||
return self._finish_large_object_dlo(endpoint, headers)
|
||||
except Exception:
|
||||
try:
|
||||
segment_prefix = endpoint.split('/')[-1]
|
||||
self.log.debug(
|
||||
"Failed to upload large object manifest for %s. "
|
||||
"Removing segment uploads.", segment_prefix)
|
||||
self.delete_autocreated_image_objects(
|
||||
segment_prefix=segment_prefix)
|
||||
except Exception:
|
||||
self.log.exception(
|
||||
"Failed to cleanup image objects for %s:",
|
||||
segment_prefix)
|
||||
raise
|
||||
|
||||
def _finish_large_object_slo(self, endpoint, headers, manifest):
|
||||
# TODO(mordred) send an etag of the manifest, which is the md5sum
|
||||
# of the concatenation of the etags of the results
|
||||
headers = headers.copy()
|
||||
retries = 3
|
||||
while True:
|
||||
try:
|
||||
return self._object_store_client.put(
|
||||
endpoint,
|
||||
params={'multipart-manifest': 'put'},
|
||||
headers=headers, data=json.dumps(manifest))
|
||||
except Exception:
|
||||
retries -= 1
|
||||
if retries == 0:
|
||||
raise
|
||||
|
||||
def _finish_large_object_dlo(self, endpoint, headers):
|
||||
headers = headers.copy()
|
||||
headers['X-Object-Manifest'] = endpoint
|
||||
retries = 3
|
||||
while True:
|
||||
try:
|
||||
return self._object_store_client.put(endpoint, headers=headers)
|
||||
except Exception:
|
||||
retries -= 1
|
||||
if retries == 0:
|
||||
raise
|
||||
return self.object_store.create_object(
|
||||
container, name,
|
||||
filename=filename, data=data,
|
||||
md5=md5, sha256=sha256, use_slo=use_slo,
|
||||
generate_checksums=generate_checksums,
|
||||
metadata=metadata,
|
||||
**headers
|
||||
)
|
||||
|
||||
def update_object(self, container, name, metadata=None, **headers):
|
||||
"""Update the metadata of an object
|
||||
@@ -573,19 +307,10 @@ class ObjectStoreCloudMixin(_normalize.Normalizer):
|
||||
|
||||
:raises: ``OpenStackCloudException`` on operation error.
|
||||
"""
|
||||
if not metadata:
|
||||
metadata = {}
|
||||
|
||||
metadata_headers = {}
|
||||
|
||||
for (k, v) in metadata.items():
|
||||
metadata_headers['x-object-meta-' + k] = v
|
||||
|
||||
headers = dict(headers, **metadata_headers)
|
||||
|
||||
return self._object_store_client.post(
|
||||
self._get_object_endpoint(container, name),
|
||||
headers=headers)
|
||||
meta = metadata.copy() or {}
|
||||
meta.update(**headers)
|
||||
self.object_store.set_object_metadata(
|
||||
name, container, **meta)
|
||||
|
||||
def list_objects(self, container, full_listing=True, prefix=None):
|
||||
"""List objects.
|
||||
@@ -634,27 +359,11 @@ class ObjectStoreCloudMixin(_normalize.Normalizer):
|
||||
|
||||
:raises: OpenStackCloudException on operation error.
|
||||
"""
|
||||
# TODO(mordred) DELETE for swift returns status in text/plain format
|
||||
# like so:
|
||||
# Number Deleted: 15
|
||||
# Number Not Found: 0
|
||||
# Response Body:
|
||||
# Response Status: 200 OK
|
||||
# Errors:
|
||||
# We should ultimately do something with that
|
||||
try:
|
||||
if not meta:
|
||||
meta = self.get_object_metadata(container, name)
|
||||
if not meta:
|
||||
return False
|
||||
params = {}
|
||||
if meta.get('X-Static-Large-Object', None) == 'True':
|
||||
params['multipart-manifest'] = 'delete'
|
||||
self._object_store_client.delete(
|
||||
self._get_object_endpoint(container, name),
|
||||
params=params)
|
||||
self.object_store.delete_object(
|
||||
name, ignore_missing=False, container=container)
|
||||
return True
|
||||
except exc.OpenStackCloudHTTPError:
|
||||
except exceptions.SDKException:
|
||||
return False
|
||||
|
||||
def delete_autocreated_image_objects(self, container=None,
|
||||
@@ -672,28 +381,14 @@ class ObjectStoreCloudMixin(_normalize.Normalizer):
|
||||
delete. If not given, all image upload segments present are
|
||||
deleted.
|
||||
"""
|
||||
if container is None:
|
||||
container = self._OBJECT_AUTOCREATE_CONTAINER
|
||||
# This method only makes sense on clouds that use tasks
|
||||
if not self.image_api_use_tasks:
|
||||
return False
|
||||
|
||||
deleted = False
|
||||
for obj in self.list_objects(container, prefix=segment_prefix):
|
||||
meta = self.get_object_metadata(container, obj['name'])
|
||||
if meta.get(
|
||||
self._OBJECT_AUTOCREATE_KEY, meta.get(
|
||||
self._SHADE_OBJECT_AUTOCREATE_KEY)) == 'true':
|
||||
if self.delete_object(container, obj['name'], meta):
|
||||
deleted = True
|
||||
return deleted
|
||||
return self.object_store._delete_autocreated_image_objects(
|
||||
container, segment_prefix=segment_prefix
|
||||
)
|
||||
|
||||
def get_object_metadata(self, container, name):
|
||||
try:
|
||||
return self._object_store_client.head(
|
||||
self._get_object_endpoint(container, name)).headers
|
||||
except exceptions.NotFoundException:
|
||||
return None
|
||||
return self.object_store.get_object_metadata(
|
||||
name, container
|
||||
).metadata
|
||||
|
||||
def get_object_raw(self, container, obj, query_string=None, stream=False):
|
||||
"""Get a raw response object for an object.
|
||||
@@ -739,14 +434,11 @@ class ObjectStoreCloudMixin(_normalize.Normalizer):
|
||||
:raises: OpenStackCloudException on operation error.
|
||||
"""
|
||||
try:
|
||||
with self.get_object_raw(
|
||||
container, obj, query_string=query_string) as response:
|
||||
for ret in response.iter_content(chunk_size=resp_chunk_size):
|
||||
yield ret
|
||||
except exc.OpenStackCloudHTTPError as e:
|
||||
if e.response.status_code == 404:
|
||||
return
|
||||
raise
|
||||
for ret in self.object_store.stream_object(
|
||||
obj, container, chunk_size=resp_chunk_size):
|
||||
yield ret
|
||||
except exceptions.ResourceNotFound:
|
||||
return
|
||||
|
||||
def get_object(self, container, obj, query_string=None,
|
||||
resp_chunk_size=1024, outfile=None, stream=False):
|
||||
@@ -770,33 +462,19 @@ class ObjectStoreCloudMixin(_normalize.Normalizer):
|
||||
is not found (404).
|
||||
:raises: OpenStackCloudException on operation error.
|
||||
"""
|
||||
# TODO(mordred) implement resp_chunk_size
|
||||
endpoint = self._get_object_endpoint(container, obj, query_string)
|
||||
try:
|
||||
get_stream = (outfile is not None)
|
||||
with self._object_store_client.get(
|
||||
endpoint, stream=get_stream) as response:
|
||||
response_headers = {
|
||||
k.lower(): v for k, v in response.headers.items()}
|
||||
if outfile:
|
||||
if isinstance(outfile, str):
|
||||
outfile_handle = open(outfile, 'wb')
|
||||
else:
|
||||
outfile_handle = outfile
|
||||
for chunk in response.iter_content(
|
||||
resp_chunk_size, decode_unicode=False):
|
||||
outfile_handle.write(chunk)
|
||||
if isinstance(outfile, str):
|
||||
outfile_handle.close()
|
||||
else:
|
||||
outfile_handle.flush()
|
||||
return (response_headers, None)
|
||||
else:
|
||||
return (response_headers, response.text)
|
||||
except exc.OpenStackCloudHTTPError as e:
|
||||
if e.response.status_code == 404:
|
||||
return None
|
||||
raise
|
||||
obj = self.object_store.get_object(
|
||||
obj, container=container,
|
||||
resp_chunk_size=resp_chunk_size,
|
||||
outfile=outfile,
|
||||
remember_content=(outfile is None)
|
||||
)
|
||||
headers = {
|
||||
k.lower(): v for k, v in obj._last_headers.items()}
|
||||
return (headers, obj.data)
|
||||
|
||||
except exceptions.ResourceNotFound:
|
||||
return None
|
||||
|
||||
def _wait_for_futures(self, futures, raise_on_error=True):
|
||||
'''Collect results or failures from a list of running future tasks.'''
|
||||
|
||||
6
openstack/cloud/openstackcloud.py
Executable file → Normal file
6
openstack/cloud/openstackcloud.py
Executable file → Normal file
@@ -60,9 +60,9 @@ class _OpenStackCloudMixin:
|
||||
:param bool strict: Only return documented attributes for each resource
|
||||
as per the Data Model contract. (Default False)
|
||||
"""
|
||||
_OBJECT_MD5_KEY = 'x-object-meta-x-sdk-md5'
|
||||
_OBJECT_SHA256_KEY = 'x-object-meta-x-sdk-sha256'
|
||||
_OBJECT_AUTOCREATE_KEY = 'x-object-meta-x-sdk-autocreated'
|
||||
_OBJECT_MD5_KEY = 'x-sdk-md5'
|
||||
_OBJECT_SHA256_KEY = 'x-sdk-sha256'
|
||||
_OBJECT_AUTOCREATE_KEY = 'x-sdk-autocreated'
|
||||
_OBJECT_AUTOCREATE_CONTAINER = 'images'
|
||||
|
||||
# NOTE(shade) shade keys were x-object-meta-x-shade-md5 - we need to check
|
||||
|
||||
@@ -25,6 +25,7 @@ class BaseResource(resource.Resource):
|
||||
|
||||
_custom_metadata_prefix = None
|
||||
_system_metadata = dict()
|
||||
_last_headers = dict()
|
||||
|
||||
def __init__(self, metadata=None, **attrs):
|
||||
"""Process and save metadata known at creation stage
|
||||
@@ -88,6 +89,11 @@ class BaseResource(resource.Resource):
|
||||
self.metadata[key] = headers[header]
|
||||
|
||||
def _translate_response(self, response, has_body=None, error_message=None):
|
||||
# Save headers of the last operation for potential use (get_object of
|
||||
# cloud layer).
|
||||
# This must happen before invoking parent _translate_response, cause it
|
||||
# pops known headers.
|
||||
self._last_headers = response.headers.copy()
|
||||
super(BaseResource, self)._translate_response(
|
||||
response, has_body=has_body, error_message=error_message)
|
||||
self._set_metadata(response.headers)
|
||||
|
||||
@@ -236,24 +236,70 @@ class Proxy(proxy.Proxy):
|
||||
|
||||
raise ValueError("container must be specified")
|
||||
|
||||
def get_object(self, obj, container=None):
|
||||
def get_object(
|
||||
self, obj, container=None, resp_chunk_size=1024,
|
||||
outfile=None, remember_content=False
|
||||
):
|
||||
"""Get the data associated with an object
|
||||
|
||||
:param obj: The value can be the name of an object or a
|
||||
:class:`~openstack.object_store.v1.obj.Object` instance.
|
||||
:class:`~openstack.object_store.v1.obj.Object` instance.
|
||||
:param container: The value can be the name of a container or a
|
||||
:class:`~openstack.object_store.v1.container.Container`
|
||||
instance.
|
||||
:class:`~openstack.object_store.v1.container.Container`
|
||||
instance.
|
||||
:param int resp_chunk_size:
|
||||
chunk size of data to read. Only used if the results are
|
||||
being written to a file or stream is True.
|
||||
(optional, defaults to 1k)
|
||||
:param outfile:
|
||||
Write the object to a file instead of returning the contents.
|
||||
If this option is given, body in the return tuple will be None.
|
||||
outfile can either be a file path given as a string, or a
|
||||
File like object.
|
||||
:param bool remember_content: Flag whether object data should be saved
|
||||
as `data` property of the Object. When left as `false` and
|
||||
`outfile` is not defined data will not be saved and need to be
|
||||
fetched separately.
|
||||
|
||||
:returns: The contents of the object. Use the
|
||||
:func:`~get_object_metadata`
|
||||
method if you want an object resource.
|
||||
:returns: Instance of the
|
||||
:class:`~openstack.object_store.v1.obj.Object` objects.
|
||||
:raises: :class:`~openstack.exceptions.ResourceNotFound`
|
||||
when no resource can be found.
|
||||
when no resource can be found.
|
||||
"""
|
||||
container_name = self._get_container_name(
|
||||
obj=obj, container=container)
|
||||
return self._get(_obj.Object, obj, container=container_name)
|
||||
|
||||
_object = self._get_resource(
|
||||
_obj.Object, obj,
|
||||
container=container_name)
|
||||
request = _object._prepare_request()
|
||||
|
||||
get_stream = (outfile is not None)
|
||||
|
||||
response = self.get(
|
||||
request.url,
|
||||
headers=request.headers,
|
||||
stream=get_stream
|
||||
)
|
||||
exceptions.raise_from_response(response)
|
||||
_object._translate_response(response, has_body=False)
|
||||
|
||||
if outfile:
|
||||
if isinstance(outfile, str):
|
||||
outfile_handle = open(outfile, 'wb')
|
||||
else:
|
||||
outfile_handle = outfile
|
||||
for chunk in response.iter_content(
|
||||
resp_chunk_size, decode_unicode=False):
|
||||
outfile_handle.write(chunk)
|
||||
if isinstance(outfile, str):
|
||||
outfile_handle.close()
|
||||
else:
|
||||
outfile_handle.flush()
|
||||
elif remember_content:
|
||||
_object.data = response.text
|
||||
|
||||
return _object
|
||||
|
||||
def download_object(self, obj, container=None, **attrs):
|
||||
"""Download the data contained inside an object.
|
||||
@@ -288,7 +334,6 @@ class Proxy(proxy.Proxy):
|
||||
"""
|
||||
container_name = self._get_container_name(
|
||||
obj=obj, container=container)
|
||||
container_name = self._get_container_name(container=container)
|
||||
obj = self._get_resource(
|
||||
_obj.Object, obj, container=container_name, **attrs)
|
||||
return obj.stream(self, chunk_size=chunk_size)
|
||||
@@ -356,9 +401,9 @@ class Proxy(proxy.Proxy):
|
||||
if generate_checksums and (md5 is None or sha256 is None):
|
||||
(md5, sha256) = utils._get_file_hashes(filename)
|
||||
if md5:
|
||||
headers[self._connection._OBJECT_MD5_KEY] = md5 or ''
|
||||
metadata[self._connection._OBJECT_MD5_KEY] = md5
|
||||
if sha256:
|
||||
headers[self._connection._OBJECT_SHA256_KEY] = sha256 or ''
|
||||
metadata[self._connection._OBJECT_SHA256_KEY] = sha256
|
||||
|
||||
container_name = self._get_container_name(container=container)
|
||||
endpoint = '{container}/{name}'.format(container=container_name,
|
||||
@@ -368,7 +413,6 @@ class Proxy(proxy.Proxy):
|
||||
self.log.debug(
|
||||
"swift uploading data to %(endpoint)s",
|
||||
{'endpoint': endpoint})
|
||||
# TODO(gtema): custom headers need to be somehow injected
|
||||
return self._create(
|
||||
_obj.Object, container=container_name,
|
||||
name=name, data=data, metadata=metadata,
|
||||
@@ -387,10 +431,14 @@ class Proxy(proxy.Proxy):
|
||||
"swift uploading %(filename)s to %(endpoint)s",
|
||||
{'filename': filename, 'endpoint': endpoint})
|
||||
|
||||
if metadata is not None:
|
||||
# Rely on the class headers calculation for requested metadata
|
||||
meta_headers = _obj.Object()._calculate_headers(metadata)
|
||||
headers.update(meta_headers)
|
||||
|
||||
if file_size <= segment_size:
|
||||
# TODO(gtema): replace with regular resource put, but
|
||||
# custom headers need to be somehow injected
|
||||
self._upload_object(endpoint, filename, headers)
|
||||
|
||||
else:
|
||||
self._upload_large_object(
|
||||
endpoint, filename, headers,
|
||||
@@ -501,8 +549,9 @@ class Proxy(proxy.Proxy):
|
||||
Pre-calculated sha256 of the file contents. Defaults to None which
|
||||
means calculate locally.
|
||||
"""
|
||||
metadata = self._connection.get_object_metadata(container, name)
|
||||
if not metadata:
|
||||
try:
|
||||
metadata = self.get_object_metadata(name, container).metadata
|
||||
except exceptions.NotFoundException:
|
||||
self._connection.log.debug(
|
||||
"swift stale check, no object: {container}/{name}".format(
|
||||
container=container, name=name))
|
||||
@@ -592,29 +641,61 @@ class Proxy(proxy.Proxy):
|
||||
|
||||
self._add_etag_to_manifest(segment_results, manifest)
|
||||
|
||||
if use_slo:
|
||||
return self._finish_large_object_slo(endpoint, headers, manifest)
|
||||
else:
|
||||
return self._finish_large_object_dlo(endpoint, headers)
|
||||
try:
|
||||
if use_slo:
|
||||
return self._finish_large_object_slo(
|
||||
endpoint, headers, manifest)
|
||||
else:
|
||||
return self._finish_large_object_dlo(
|
||||
endpoint, headers)
|
||||
except Exception:
|
||||
try:
|
||||
segment_prefix = endpoint.split('/')[-1]
|
||||
self.log.debug(
|
||||
"Failed to upload large object manifest for %s. "
|
||||
"Removing segment uploads.", segment_prefix)
|
||||
self._delete_autocreated_image_objects(
|
||||
segment_prefix=segment_prefix)
|
||||
except Exception:
|
||||
self.log.exception(
|
||||
"Failed to cleanup image objects for %s:",
|
||||
segment_prefix)
|
||||
raise
|
||||
|
||||
def _finish_large_object_slo(self, endpoint, headers, manifest):
|
||||
# TODO(mordred) send an etag of the manifest, which is the md5sum
|
||||
# of the concatenation of the etags of the results
|
||||
headers = headers.copy()
|
||||
return self.put(
|
||||
endpoint,
|
||||
params={'multipart-manifest': 'put'},
|
||||
headers=headers, data=json.dumps(manifest))
|
||||
retries = 3
|
||||
while True:
|
||||
try:
|
||||
return exceptions.raise_from_response(self.put(
|
||||
endpoint,
|
||||
params={'multipart-manifest': 'put'},
|
||||
headers=headers, data=json.dumps(manifest))
|
||||
)
|
||||
except Exception:
|
||||
retries -= 1
|
||||
if retries == 0:
|
||||
raise
|
||||
|
||||
def _finish_large_object_dlo(self, endpoint, headers):
|
||||
headers = headers.copy()
|
||||
headers['X-Object-Manifest'] = endpoint
|
||||
return self.put(endpoint, headers=headers)
|
||||
retries = 3
|
||||
while True:
|
||||
try:
|
||||
return exceptions.raise_from_response(
|
||||
self.put(endpoint, headers=headers))
|
||||
except Exception:
|
||||
retries -= 1
|
||||
if retries == 0:
|
||||
raise
|
||||
|
||||
def _upload_object(self, endpoint, filename, headers):
|
||||
with open(filename, 'rb') as dt:
|
||||
return proxy._json_response(self.put(
|
||||
endpoint, headers=headers, data=dt))
|
||||
return self.put(
|
||||
endpoint, headers=headers, data=dt)
|
||||
|
||||
def _get_file_segments(self, endpoint, filename, file_size, segment_size):
|
||||
# Use an ordered dict here so that testing can replicate things
|
||||
@@ -926,3 +1007,33 @@ class Proxy(proxy.Proxy):
|
||||
return temp_url.encode('utf-8')
|
||||
else:
|
||||
return temp_url
|
||||
|
||||
def _delete_autocreated_image_objects(
|
||||
self, container=None, segment_prefix=None
|
||||
):
|
||||
"""Delete all objects autocreated for image uploads.
|
||||
|
||||
This method should generally not be needed, as shade should clean up
|
||||
the objects it uses for object-based image creation. If something
|
||||
goes wrong and it is found that there are leaked objects, this method
|
||||
can be used to delete any objects that shade has created on the user's
|
||||
behalf in service of image uploads.
|
||||
|
||||
:param str container: Name of the container. Defaults to 'images'.
|
||||
:param str segment_prefix: Prefix for the image segment names to
|
||||
delete. If not given, all image upload segments present are
|
||||
deleted.
|
||||
"""
|
||||
if container is None:
|
||||
container = self._connection._OBJECT_AUTOCREATE_CONTAINER
|
||||
# This method only makes sense on clouds that use tasks
|
||||
if not self._connection.image_api_use_tasks:
|
||||
return False
|
||||
|
||||
deleted = False
|
||||
for obj in self.objects(container, prefix=segment_prefix):
|
||||
meta = self.get_object_metadata(obj).metadata
|
||||
if meta.get(self._connection._OBJECT_AUTOCREATE_KEY) == 'true':
|
||||
self.delete_object(obj, ignore_missing=True)
|
||||
deleted = True
|
||||
return deleted
|
||||
|
||||
@@ -71,13 +71,13 @@ class TestObject(base.BaseFunctionalTest):
|
||||
))
|
||||
self.assertEqual(
|
||||
'bar', self.user_cloud.get_object_metadata(
|
||||
container_name, name)['x-object-meta-foo']
|
||||
container_name, name)['foo']
|
||||
)
|
||||
self.user_cloud.update_object(container=container_name, name=name,
|
||||
metadata={'testk': 'testv'})
|
||||
self.assertEqual(
|
||||
'testv', self.user_cloud.get_object_metadata(
|
||||
container_name, name)['x-object-meta-testk']
|
||||
container_name, name)['testk']
|
||||
)
|
||||
try:
|
||||
self.assertIsNotNone(
|
||||
@@ -139,13 +139,13 @@ class TestObject(base.BaseFunctionalTest):
|
||||
))
|
||||
self.assertEqual(
|
||||
'bar', self.user_cloud.get_object_metadata(
|
||||
container_name, name)['x-object-meta-foo']
|
||||
container_name, name)['foo']
|
||||
)
|
||||
self.user_cloud.update_object(container=container_name, name=name,
|
||||
metadata={'testk': 'testv'})
|
||||
self.assertEqual(
|
||||
'testv', self.user_cloud.get_object_metadata(
|
||||
container_name, name)['x-object-meta-testk']
|
||||
container_name, name)['testk']
|
||||
)
|
||||
try:
|
||||
with tempfile.NamedTemporaryFile() as fake_file:
|
||||
|
||||
@@ -615,10 +615,10 @@ class TestImage(BaseTestImage):
|
||||
object=self.image_name),
|
||||
status_code=201,
|
||||
validate=dict(
|
||||
headers={'x-object-meta-x-sdk-md5':
|
||||
headers={'X-Object-Meta-x-sdk-md5':
|
||||
self.fake_image_dict[
|
||||
'owner_specified.openstack.md5'],
|
||||
'x-object-meta-x-sdk-sha256':
|
||||
'X-Object-Meta-x-sdk-sha256':
|
||||
self.fake_image_dict[
|
||||
'owner_specified.openstack.sha256']})
|
||||
),
|
||||
@@ -711,12 +711,12 @@ class TestImage(BaseTestImage):
|
||||
self.assert_calls()
|
||||
|
||||
def test_delete_autocreated_no_tasks(self):
|
||||
self.use_nothing()
|
||||
self.use_keystone_v3()
|
||||
self.cloud.image_api_use_tasks = False
|
||||
deleted = self.cloud.delete_autocreated_image_objects(
|
||||
container=self.container_name)
|
||||
self.assertFalse(deleted)
|
||||
self.assert_calls()
|
||||
self.assert_calls([])
|
||||
|
||||
def test_delete_image_task(self):
|
||||
self.cloud.image_api_use_tasks = True
|
||||
@@ -823,8 +823,10 @@ class TestImage(BaseTestImage):
|
||||
'Date': 'Thu, 16 Nov 2017 15:24:30 GMT',
|
||||
'Accept-Ranges': 'bytes',
|
||||
'Content-Type': 'application/octet-stream',
|
||||
self.cloud._OBJECT_AUTOCREATE_KEY: 'true',
|
||||
'Etag': fakes.NO_MD5}),
|
||||
('X-Object-Meta-'
|
||||
+ self.cloud._OBJECT_AUTOCREATE_KEY): 'true',
|
||||
'Etag': fakes.NO_MD5,
|
||||
'X-Static-Large-Object': 'false'}),
|
||||
dict(method='DELETE',
|
||||
uri='{endpoint}/{container}/{object}'.format(
|
||||
endpoint=endpoint, container=self.container_name,
|
||||
|
||||
@@ -1079,11 +1079,12 @@ class TestObjectUploads(BaseTestObject):
|
||||
'X-Trans-Id': 'txbbb825960a3243b49a36f-005a0dadaedfw1',
|
||||
'Content-Length': '1290170880',
|
||||
'Last-Modified': 'Tue, 14 Apr 2015 18:29:01 GMT',
|
||||
'x-object-meta-x-sdk-autocreated': 'true',
|
||||
'X-Object-Meta-x-sdk-autocreated': 'true',
|
||||
'X-Object-Meta-X-Shade-Sha256': 'does not matter',
|
||||
'X-Object-Meta-X-Shade-Md5': 'does not matter',
|
||||
'Date': 'Thu, 16 Nov 2017 15:24:30 GMT',
|
||||
'Accept-Ranges': 'bytes',
|
||||
'X-Static-Large-Object': 'false',
|
||||
'Content-Type': 'application/octet-stream',
|
||||
'Etag': '249219347276c331b87bf1ac2152d9af',
|
||||
}),
|
||||
|
||||
@@ -17,6 +17,7 @@ import tempfile
|
||||
import time
|
||||
from unittest import mock
|
||||
|
||||
import requests_mock
|
||||
from testscenarios import load_tests_apply_scenarios as load_tests # noqa
|
||||
|
||||
from openstack.object_store.v1 import account
|
||||
@@ -26,6 +27,16 @@ from openstack.tests.unit.cloud import test_object as base_test_object
|
||||
from openstack.tests.unit import test_proxy_base
|
||||
|
||||
|
||||
class FakeResponse:
|
||||
def __init__(self, response, status_code=200, headers=None):
|
||||
self.body = response
|
||||
self.status_code = status_code
|
||||
self.headers = headers if headers else {}
|
||||
|
||||
def json(self):
|
||||
return self.body
|
||||
|
||||
|
||||
class TestObjectStoreProxy(test_proxy_base.TestProxyBase):
|
||||
|
||||
kwargs_to_path_args = False
|
||||
@@ -111,12 +122,31 @@ class TestObjectStoreProxy(test_proxy_base.TestProxyBase):
|
||||
self.assertRaises(TypeError, self.proxy.upload_object)
|
||||
|
||||
def test_object_get(self):
|
||||
kwargs = dict(container="container")
|
||||
self.verify_get(
|
||||
self.proxy.get_object, obj.Object,
|
||||
method_args=["object"],
|
||||
method_kwargs=kwargs,
|
||||
expected_kwargs=kwargs)
|
||||
with requests_mock.Mocker() as m:
|
||||
m.get("%scontainer/object" % self.endpoint,
|
||||
text="data")
|
||||
res = self.proxy.get_object("object", container="container")
|
||||
self.assertIsNone(res.data)
|
||||
|
||||
def test_object_get_write_file(self):
|
||||
with requests_mock.Mocker() as m:
|
||||
m.get("%scontainer/object" % self.endpoint,
|
||||
text="data")
|
||||
with tempfile.NamedTemporaryFile() as f:
|
||||
self.proxy.get_object(
|
||||
"object", container="container",
|
||||
outfile=f.name)
|
||||
dt = open(f.name).read()
|
||||
self.assertEqual(dt, "data")
|
||||
|
||||
def test_object_get_remember_content(self):
|
||||
with requests_mock.Mocker() as m:
|
||||
m.get("%scontainer/object" % self.endpoint,
|
||||
text="data")
|
||||
res = self.proxy.get_object(
|
||||
"object", container="container",
|
||||
remember_content=True)
|
||||
self.assertEqual(res.data, "data")
|
||||
|
||||
def test_set_temp_url_key(self):
|
||||
|
||||
|
||||
Reference in New Issue
Block a user