Add S3 store support to glance_store
Revive the S3 driver that was supported up to Mitaka and add multiple store support to correspond with the features added to glance_store from Newton to U cycle. In addition, the use of the S3 SDK used by the Mitaka S3 driver has been deprecated, so this implementation will be changed to use the new version. Change-Id: I203134837319080ead96da69048baf90086d2117 Implements: blueprint re-support-s3-driver
This commit is contained in:
parent
ae73287c7b
commit
261bad3e43
@ -45,3 +45,8 @@ below the table of supported drivers and maintainers:
|
||||
- Sabari Murugesan
|
||||
- smurugesan@vmware.com
|
||||
- sabari
|
||||
* - S3
|
||||
- Supported
|
||||
- Naohiro Sameshima
|
||||
- naohiro.sameshima@global.ntt
|
||||
- nao-shark
|
||||
|
932
glance_store/_drivers/s3.py
Normal file
932
glance_store/_drivers/s3.py
Normal file
@ -0,0 +1,932 @@
|
||||
# Copyright 2010 OpenStack Foundation
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""Storage backend for S3 or Storage Servers that follow the S3 Protocol"""
|
||||
|
||||
import hashlib
|
||||
import logging
|
||||
import math
|
||||
import re
|
||||
|
||||
from boto3 import session as boto_session
|
||||
from botocore import client as boto_client
|
||||
from botocore import exceptions as boto_exceptions
|
||||
from botocore import utils as boto_utils
|
||||
import eventlet
|
||||
from oslo_config import cfg
|
||||
from oslo_utils import encodeutils
|
||||
from oslo_utils import units
|
||||
import six
|
||||
from six.moves import urllib
|
||||
|
||||
import glance_store
|
||||
from glance_store import capabilities
|
||||
from glance_store.common import utils
|
||||
import glance_store.driver
|
||||
from glance_store import exceptions
|
||||
from glance_store.i18n import _
|
||||
import glance_store.location
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
DEFAULT_LARGE_OBJECT_SIZE = 100 # 100M
|
||||
DEFAULT_LARGE_OBJECT_CHUNK_SIZE = 10 # 10M
|
||||
DEFAULT_LARGE_OBJECT_MIN_CHUNK_SIZE = 5 # 5M
|
||||
DEFAULT_THREAD_POOLS = 10 # 10 pools
|
||||
MAX_PART_NUM = 10000 # 10000 upload parts
|
||||
|
||||
_S3_OPTS = [
|
||||
cfg.StrOpt('s3_store_host',
|
||||
help="""
|
||||
The host where the S3 server is listening.
|
||||
|
||||
This configuration option sets the host of the S3 or S3 compatible storage
|
||||
Server. This option is required when using the S3 storage backend.
|
||||
The host can contain a DNS name (e.g. s3.amazonaws.com, my-object-storage.com)
|
||||
or an IP address (127.0.0.1).
|
||||
|
||||
Possible values:
|
||||
* A valid DNS name
|
||||
* A valid IPv4 address
|
||||
|
||||
Related Options:
|
||||
* s3_store_access_key
|
||||
* s3_store_secret_key
|
||||
|
||||
"""),
|
||||
cfg.StrOpt('s3_store_access_key',
|
||||
secret=True,
|
||||
help="""
|
||||
The S3 query token access key.
|
||||
|
||||
This configuration option takes the access key for authenticating with the
|
||||
Amazon S3 or S3 compatible storage server. This option is required when using
|
||||
the S3 storage backend.
|
||||
|
||||
Possible values:
|
||||
* Any string value that is the access key for a user with appropriate
|
||||
privileges
|
||||
|
||||
Related Options:
|
||||
* s3_store_host
|
||||
* s3_store_secret_key
|
||||
|
||||
"""),
|
||||
cfg.StrOpt('s3_store_secret_key',
|
||||
secret=True,
|
||||
help="""
|
||||
The S3 query token secret key.
|
||||
|
||||
This configuration option takes the secret key for authenticating with the
|
||||
Amazon S3 or S3 compatible storage server. This option is required when using
|
||||
the S3 storage backend.
|
||||
|
||||
Possible values:
|
||||
* Any string value that is a secret key corresponding to the access key
|
||||
specified using the ``s3_store_host`` option
|
||||
|
||||
Related Options:
|
||||
* s3_store_host
|
||||
* s3_store_access_key
|
||||
|
||||
"""),
|
||||
cfg.StrOpt('s3_store_bucket',
|
||||
help="""
|
||||
The S3 bucket to be used to store the Glance data.
|
||||
|
||||
This configuration option specifies where the glance images will be stored
|
||||
in the S3. If ``s3_store_create_bucket_on_put`` is set to true, it will be
|
||||
created automatically even if the bucket does not exist.
|
||||
|
||||
Possible values:
|
||||
* Any string value
|
||||
|
||||
Related Options:
|
||||
* s3_store_create_bucket_on_put
|
||||
* s3_store_bucket_url_format
|
||||
|
||||
"""),
|
||||
cfg.BoolOpt('s3_store_create_bucket_on_put',
|
||||
default=False,
|
||||
help="""
|
||||
Determine whether S3 should create a new bucket.
|
||||
|
||||
This configuration option takes boolean value to indicate whether Glance should
|
||||
create a new bucket to S3 if it does not exist.
|
||||
|
||||
Possible values:
|
||||
* Any Boolean value
|
||||
|
||||
Related Options:
|
||||
* None
|
||||
|
||||
"""),
|
||||
cfg.StrOpt('s3_store_bucket_url_format',
|
||||
default='auto',
|
||||
help="""
|
||||
The S3 calling format used to determine the object.
|
||||
|
||||
This configuration option takes access model that is used to specify the
|
||||
address of an object in an S3 bucket.
|
||||
|
||||
NOTE:
|
||||
In ``path``-style, the endpoint for the object looks like
|
||||
'https://s3.amazonaws.com/bucket/example.img'.
|
||||
And in ``virtual``-style, the endpoint for the object looks like
|
||||
'https://bucket.s3.amazonaws.com/example.img'.
|
||||
If you do not follow the DNS naming convention in the bucket name, you can
|
||||
get objects in the path style, but not in the virtual style.
|
||||
|
||||
Possible values:
|
||||
* Any string value of ``auto``, ``virtual``, or ``path``
|
||||
|
||||
Related Options:
|
||||
* s3_store_bucket
|
||||
|
||||
"""),
|
||||
cfg.IntOpt('s3_store_large_object_size',
|
||||
default=DEFAULT_LARGE_OBJECT_SIZE,
|
||||
help="""
|
||||
What size, in MB, should S3 start chunking image files and do a multipart
|
||||
upload in S3.
|
||||
|
||||
This configuration option takes a threshold in MB to determine whether to
|
||||
upload the image to S3 as is or to split it (Multipart Upload).
|
||||
|
||||
Note: You can only split up to 10,000 images.
|
||||
|
||||
Possible values:
|
||||
* Any positive integer value
|
||||
|
||||
Related Options:
|
||||
* s3_store_large_object_chunk_size
|
||||
* s3_store_thread_pools
|
||||
|
||||
"""),
|
||||
cfg.IntOpt('s3_store_large_object_chunk_size',
|
||||
default=DEFAULT_LARGE_OBJECT_CHUNK_SIZE,
|
||||
help="""
|
||||
What multipart upload part size, in MB, should S3 use when uploading parts.
|
||||
|
||||
This configuration option takes the image split size in MB for Multipart
|
||||
Upload.
|
||||
|
||||
Note: You can only split up to 10,000 images.
|
||||
|
||||
Possible values:
|
||||
* Any positive integer value (must be greater than or equal to 5M)
|
||||
|
||||
Related Options:
|
||||
* s3_store_large_object_size
|
||||
* s3_store_thread_pools
|
||||
|
||||
"""),
|
||||
cfg.IntOpt('s3_store_thread_pools',
|
||||
default=DEFAULT_THREAD_POOLS,
|
||||
help="""
|
||||
The number of thread pools to perform a multipart upload in S3.
|
||||
|
||||
This configuration option takes the number of thread pools when performing a
|
||||
Multipart Upload.
|
||||
|
||||
Possible values:
|
||||
* Any positive integer value
|
||||
|
||||
Related Options:
|
||||
* s3_store_large_object_size
|
||||
* s3_store_large_object_chunk_size
|
||||
|
||||
""")
|
||||
]
|
||||
|
||||
|
||||
class UploadPart(object):
|
||||
"""The class for the upload part."""
|
||||
def __init__(self, mpu, fp, partnum, chunks):
|
||||
self.mpu = mpu
|
||||
self.partnum = partnum
|
||||
self.fp = fp
|
||||
self.size = 0
|
||||
self.chunks = chunks
|
||||
self.etag = {}
|
||||
self.success = True
|
||||
|
||||
|
||||
def run_upload(s3_client, bucket, key, part):
|
||||
"""Upload the upload part into S3 and set returned etag and size to its
|
||||
part info.
|
||||
|
||||
:param s3_client: An object with credentials to connect to S3
|
||||
:param bucket: The S3 bucket name
|
||||
:param key: The object name to be stored (image identifier)
|
||||
:param part: UploadPart object which used during multipart upload
|
||||
"""
|
||||
pnum = part.partnum
|
||||
bsize = part.chunks
|
||||
upload_id = part.mpu['UploadId']
|
||||
LOG.debug("Uploading upload part in S3 partnum=%(pnum)d, "
|
||||
"size=%(bsize)d, key=%(key)s, UploadId=%(UploadId)s",
|
||||
{'pnum': pnum, 'bsize': bsize, 'key': key,
|
||||
'UploadId': upload_id})
|
||||
|
||||
try:
|
||||
key = s3_client.upload_part(Body=part.fp,
|
||||
Bucket=bucket,
|
||||
ContentLength=bsize,
|
||||
Key=key,
|
||||
PartNumber=pnum,
|
||||
UploadId=upload_id)
|
||||
part.etag[part.partnum] = key['ETag']
|
||||
part.size = bsize
|
||||
except boto_exceptions.ClientError as e:
|
||||
error_code = e.response['Error']['Code']
|
||||
error_message = e.response['Error']['Message']
|
||||
LOG.warning("Failed to upload part in S3 partnum=%(pnum)d, "
|
||||
"size=%(bsize)d, error code=%(error_code)d, "
|
||||
"error message=%(error_message)s",
|
||||
{'pnum': pnum, 'bsize': bsize, 'error_code': error_code,
|
||||
'error_message': error_message})
|
||||
part.success = False
|
||||
finally:
|
||||
part.fp.close()
|
||||
|
||||
|
||||
class StoreLocation(glance_store.location.StoreLocation):
|
||||
"""Class describing an S3 URI.
|
||||
|
||||
An S3 URI can look like any of the following:
|
||||
|
||||
s3://accesskey:secretkey@s3.amazonaws.com/bucket/key-id
|
||||
s3+https://accesskey:secretkey@s3.amazonaws.com/bucket/key-id
|
||||
|
||||
The s3+https:// URIs indicate there is an HTTPS s3service URL
|
||||
"""
|
||||
def process_specs(self):
|
||||
self.scheme = self.specs.get('scheme', 's3')
|
||||
self.accesskey = self.specs.get('accesskey')
|
||||
self.secretkey = self.specs.get('secretkey')
|
||||
s3_host = self.specs.get('s3serviceurl')
|
||||
self.bucket = self.specs.get('bucket')
|
||||
self.key = self.specs.get('key')
|
||||
|
||||
if s3_host.startswith('https://'):
|
||||
self.scheme = 's3+https'
|
||||
s3_host = s3_host[len('https://'):].strip('/')
|
||||
elif s3_host.startswith('http://'):
|
||||
s3_host = s3_host[len('http://'):].strip('/')
|
||||
self.s3serviceurl = s3_host.strip('/')
|
||||
|
||||
def _get_credstring(self):
|
||||
if self.accesskey:
|
||||
return '%s:%s@' % (self.accesskey, self.secretkey)
|
||||
return ''
|
||||
|
||||
def get_uri(self):
|
||||
return "%s://%s%s/%s/%s" % (self.scheme, self._get_credstring(),
|
||||
self.s3serviceurl, self.bucket, self.key)
|
||||
|
||||
def parse_uri(self, uri):
|
||||
"""Parse URLs.
|
||||
|
||||
Note that an Amazon AWS secret key can contain the forward slash,
|
||||
which is entirely retarded, and breaks urlparse miserably.
|
||||
This function works around that issue.
|
||||
"""
|
||||
# Make sure that URIs that contain multiple schemes, such as:
|
||||
# s3://accesskey:secretkey@https://s3.amazonaws.com/bucket/key-id
|
||||
# are immediately rejected.
|
||||
if uri.count('://') != 1:
|
||||
reason = ("URI cannot contain more than one occurrence "
|
||||
"of a scheme. If you have specified a URI like "
|
||||
"s3://accesskey:secretkey@"
|
||||
"https://s3.amazonaws.com/bucket/key-id"
|
||||
", you need to change it to use the "
|
||||
"s3+https:// scheme, like so: "
|
||||
"s3+https://accesskey:secretkey@"
|
||||
"s3.amazonaws.com/bucket/key-id")
|
||||
LOG.info("Invalid store uri: %s", reason)
|
||||
raise exceptions.BadStoreUri(uri=uri)
|
||||
|
||||
pieces = urllib.parse.urlparse(uri)
|
||||
self.validate_schemas(uri, valid_schemas=(
|
||||
's3://', 's3+http://', 's3+https://'))
|
||||
self.scheme = pieces.scheme
|
||||
path = pieces.path.strip('/')
|
||||
netloc = pieces.netloc.strip('/')
|
||||
entire_path = (netloc + '/' + path).strip('/')
|
||||
|
||||
if '@' in uri:
|
||||
creds, path = entire_path.split('@')
|
||||
cred_parts = creds.split(':')
|
||||
|
||||
try:
|
||||
self.accesskey = cred_parts[0]
|
||||
self.secretkey = cred_parts[1]
|
||||
except IndexError:
|
||||
LOG.error("Badly formed S3 credentials")
|
||||
raise exceptions.BadStoreUri(uri=uri)
|
||||
else:
|
||||
self.accesskey = None
|
||||
path = entire_path
|
||||
try:
|
||||
path_parts = path.split('/')
|
||||
self.key = path_parts.pop()
|
||||
self.bucket = path_parts.pop()
|
||||
if path_parts:
|
||||
self.s3serviceurl = '/'.join(path_parts).strip('/')
|
||||
else:
|
||||
LOG.error("Badly formed S3 URI. Missing s3 service URL.")
|
||||
raise exceptions.BadStoreUri(uri=uri)
|
||||
except IndexError:
|
||||
LOG.error("Badly formed S3 URI")
|
||||
raise exceptions.BadStoreUri(uri=uri)
|
||||
|
||||
|
||||
class Store(glance_store.driver.Store):
|
||||
"""An implementation of the s3 adapter."""
|
||||
|
||||
_CAPABILITIES = capabilities.BitMasks.RW_ACCESS
|
||||
OPTIONS = _S3_OPTS
|
||||
EXAMPLE_URL = "s3://<ACCESS_KEY>:<SECRET_KEY>@<S3_URL>/<BUCKET>/<OBJ>"
|
||||
|
||||
READ_CHUNKSIZE = 64 * units.Ki
|
||||
WRITE_CHUNKSIZE = READ_CHUNKSIZE
|
||||
|
||||
@staticmethod
|
||||
def get_schemes():
|
||||
return 's3', 's3+http', 's3+https'
|
||||
|
||||
def configure_add(self):
|
||||
"""
|
||||
Configure the Store to use the stored configuration options
|
||||
Any store that needs special configuration should implement
|
||||
this method. If the store was not able to successfully configure
|
||||
itself, it should raise `exceptions.BadStoreConfiguration`
|
||||
"""
|
||||
self.s3_host = self._option_get('s3_store_host')
|
||||
self.access_key = self._option_get('s3_store_access_key')
|
||||
self.secret_key = self._option_get('s3_store_secret_key')
|
||||
self.bucket = self._option_get('s3_store_bucket')
|
||||
|
||||
self.scheme = 's3'
|
||||
if self.s3_host.startswith('https://'):
|
||||
self.scheme = 's3+https'
|
||||
self.full_s3_host = self.s3_host
|
||||
elif self.s3_host.startswith('http://'):
|
||||
self.full_s3_host = self.s3_host
|
||||
else: # Defaults http
|
||||
self.full_s3_host = 'http://' + self.s3_host
|
||||
|
||||
_s3_obj_size = self._option_get('s3_store_large_object_size')
|
||||
self.s3_store_large_object_size = _s3_obj_size * units.Mi
|
||||
_s3_ck_size = self._option_get('s3_store_large_object_chunk_size')
|
||||
_s3_ck_min = DEFAULT_LARGE_OBJECT_MIN_CHUNK_SIZE
|
||||
if _s3_ck_size < _s3_ck_min:
|
||||
reason = _("s3_store_large_object_chunk_size must be at "
|
||||
"least %d MB.") % _s3_ck_min
|
||||
LOG.error(reason)
|
||||
raise exceptions.BadStoreConfiguration(store_name="s3",
|
||||
reason=reason)
|
||||
self.s3_store_large_object_chunk_size = _s3_ck_size * units.Mi
|
||||
|
||||
self.s3_store_thread_pools = self._option_get('s3_store_thread_pools')
|
||||
if self.s3_store_thread_pools <= 0:
|
||||
reason = _("s3_store_thread_pools must be a positive "
|
||||
"integer. %s") % self.s3_store_thread_pools
|
||||
LOG.error(reason)
|
||||
raise exceptions.BadStoreConfiguration(store_name="s3",
|
||||
reason=reason)
|
||||
|
||||
if self.backend_group:
|
||||
self._set_url_prefix()
|
||||
|
||||
def _set_url_prefix(self):
|
||||
s3_host = self.s3_host
|
||||
if s3_host.startswith('http://'):
|
||||
s3_host = s3_host[len('http://'):]
|
||||
elif s3_host.startswith('https://'):
|
||||
s3_host = s3_host[len('https://'):]
|
||||
|
||||
self._url_prefix = "%s://%s:%s@%s/%s" % (self.scheme, self.access_key,
|
||||
self.secret_key, s3_host,
|
||||
self.bucket)
|
||||
|
||||
def _option_get(self, param):
|
||||
if self.backend_group:
|
||||
store_conf = getattr(self.conf, self.backend_group)
|
||||
else:
|
||||
store_conf = self.conf.glance_store
|
||||
|
||||
result = getattr(store_conf, param)
|
||||
if not result:
|
||||
if param == 's3_store_create_bucket_on_put':
|
||||
return result
|
||||
reason = _("Could not find %s in configuration options.") % param
|
||||
LOG.error(reason)
|
||||
raise exceptions.BadStoreConfiguration(store_name="s3",
|
||||
reason=reason)
|
||||
return result
|
||||
|
||||
def _create_s3_client(self, loc):
|
||||
"""Create a client object to use when connecting to S3.
|
||||
|
||||
:param loc: `glance_store.location.Location` object, supplied
|
||||
from glance_store.location.get_location_from_uri()
|
||||
:returns: An object with credentials to connect to S3
|
||||
"""
|
||||
s3_host = self._option_get('s3_store_host')
|
||||
url_format = self._option_get('s3_store_bucket_url_format')
|
||||
calling_format = {'addressing_style': url_format}
|
||||
|
||||
session = boto_session.Session(aws_access_key_id=loc.accesskey,
|
||||
aws_secret_access_key=loc.secretkey)
|
||||
config = boto_client.Config(s3=calling_format)
|
||||
location = get_s3_location(s3_host)
|
||||
|
||||
bucket_name = loc.bucket
|
||||
if (url_format == 'virtual' and
|
||||
not boto_utils.check_dns_name(bucket_name)):
|
||||
raise boto_exceptions.InvalidDNSNameError(bucket_name=bucket_name)
|
||||
|
||||
region_name, endpoint_url = None, None
|
||||
if location:
|
||||
region_name = location
|
||||
else:
|
||||
endpoint_url = s3_host
|
||||
|
||||
return session.client(service_name='s3',
|
||||
endpoint_url=endpoint_url,
|
||||
region_name=region_name,
|
||||
use_ssl=(loc.scheme == 's3+https'),
|
||||
config=config)
|
||||
|
||||
def _operation_set(self, loc):
|
||||
"""Objects and variables frequently used when operating S3 are
|
||||
returned together.
|
||||
|
||||
:param loc: `glance_store.location.Location` object, supplied
|
||||
from glance_store.location.get_location_from_uri()
|
||||
"returns: tuple of: (1) S3 client object, (2) Bucket name,
|
||||
(3) Image Object name
|
||||
"""
|
||||
return self._create_s3_client(loc), loc.bucket, loc.key
|
||||
|
||||
@capabilities.check
|
||||
def get(self, location, offset=0, chunk_size=None, context=None):
|
||||
"""
|
||||
Takes a `glance_store.location.Location` object that indicates
|
||||
where to find the image file, and returns a tuple of generator
|
||||
(for reading the image file) and image_size
|
||||
|
||||
:param location: `glance_store.location.Location` object, supplied
|
||||
from glance_store.location.get_location_from_uri()
|
||||
:raises: `glance_store.exceptions.NotFound` if image does not exist
|
||||
"""
|
||||
loc = location.store_location
|
||||
s3_client, bucket, key = self._operation_set(loc)
|
||||
|
||||
if not self._object_exists(s3_client, bucket, key):
|
||||
LOG.warning("Could not find key %(key)s in "
|
||||
"bucket %(bucket)s", {'key': key, 'bucket': bucket})
|
||||
raise exceptions.NotFound(image=key)
|
||||
|
||||
key = s3_client.get_object(Bucket=bucket, Key=key)
|
||||
|
||||
LOG.debug("Retrieved image object from S3 using s3_host=%(s3_host)s, "
|
||||
"access_key=%(accesskey)s, bucket=%(bucket)s, "
|
||||
"key=%(key)s)",
|
||||
{'s3_host': loc.s3serviceurl, 'accesskey': loc.accesskey,
|
||||
'bucket': bucket, 'key': key})
|
||||
|
||||
cs = self.READ_CHUNKSIZE
|
||||
|
||||
class ResponseIndexable(glance_store.Indexable):
|
||||
def another(self):
|
||||
try:
|
||||
return next(self.wrapped)
|
||||
except StopIteration:
|
||||
return b''
|
||||
|
||||
return (ResponseIndexable(utils.chunkiter(key['Body'], cs),
|
||||
key['ContentLength']), key['ContentLength'])
|
||||
|
||||
def get_size(self, location, context=None):
|
||||
"""
|
||||
Takes a `glance_store.location.Location` object that indicates
|
||||
where to find the image file and returns the image size
|
||||
|
||||
:param location: `glance_store.location.Location` object, supplied
|
||||
from glance_store.location.get_location_from_uri()
|
||||
:raises: `glance_store.exceptions.NotFound` if image does not exist
|
||||
:rtype: int
|
||||
"""
|
||||
loc = location.store_location
|
||||
s3_client, bucket, key = self._operation_set(loc)
|
||||
|
||||
if not self._object_exists(s3_client, bucket, key):
|
||||
LOG.warning("Could not find key %(key)s in "
|
||||
"bucket %(bucket)s", {'key': key, 'bucket': bucket})
|
||||
raise exceptions.NotFound(image=key)
|
||||
|
||||
key = s3_client.head_object(Bucket=bucket, Key=key)
|
||||
return key['ContentLength']
|
||||
|
||||
@capabilities.check
|
||||
def add(self, image_id, image_file, image_size, hashing_algo, context=None,
|
||||
verifier=None):
|
||||
"""
|
||||
Stores an image file with supplied identifier to the backend
|
||||
storage system and returns a tuple containing information
|
||||
about the stored image.
|
||||
|
||||
:param image_id: The opaque image identifier
|
||||
:param image_file: The image data to write, as a file-like object
|
||||
:param image_size: The size of the image data to write, in bytes
|
||||
:param hashing_algo: A hashlib algorithm identifier (string)
|
||||
:param context: A context object
|
||||
:param verifier: An object used to verify signatures for images
|
||||
|
||||
:returns: tuple of: (1) URL in backing store, (2) bytes written,
|
||||
(3) checksum, (4) multihash value, and (5) a dictionary
|
||||
with storage system specific information
|
||||
:raises: `glance_store.exceptions.Duplicate` if the image already
|
||||
exists
|
||||
"""
|
||||
loc = StoreLocation(store_specs={'scheme': self.scheme,
|
||||
'bucket': self.bucket,
|
||||
'key': image_id,
|
||||
's3serviceurl': self.full_s3_host,
|
||||
'accesskey': self.access_key,
|
||||
'secretkey': self.secret_key},
|
||||
conf=self.conf,
|
||||
backend_group=self.backend_group)
|
||||
|
||||
s3_client, bucket, key = self._operation_set(loc)
|
||||
|
||||
if not self._bucket_exists(s3_client, bucket):
|
||||
if self._option_get('s3_store_create_bucket_on_put'):
|
||||
self._create_bucket(s3_client,
|
||||
self._option_get('s3_store_host'),
|
||||
bucket)
|
||||
else:
|
||||
msg = (_("The bucket %s does not exist in "
|
||||
"S3. Please set the "
|
||||
"s3_store_create_bucket_on_put option "
|
||||
"to add bucket to S3 automatically.") % bucket)
|
||||
raise glance_store.BackendException(msg)
|
||||
|
||||
LOG.debug("Adding image object to S3 using (s3_host=%(s3_host)s, "
|
||||
"access_key=%(access_key)s, bucket=%(bucket)s, "
|
||||
"key=%(key)s)",
|
||||
{'s3_host': self.s3_host, 'access_key': loc.accesskey,
|
||||
'bucket': bucket, 'key': key})
|
||||
|
||||
if not self._object_exists(s3_client, bucket, key):
|
||||
if image_size < self.s3_store_large_object_size:
|
||||
return self._add_singlepart(s3_client=s3_client,
|
||||
image_file=image_file,
|
||||
bucket=bucket,
|
||||
key=key,
|
||||
loc=loc,
|
||||
hashing_algo=hashing_algo,
|
||||
verifier=verifier)
|
||||
|
||||
return self._add_multipart(s3_client=s3_client,
|
||||
image_file=image_file,
|
||||
image_size=image_size,
|
||||
bucket=bucket,
|
||||
key=key,
|
||||
loc=loc,
|
||||
hashing_algo=hashing_algo,
|
||||
verifier=verifier)
|
||||
LOG.warning("S3 already has an image with bucket ID %(bucket)s, "
|
||||
"key %(key)s", {'bucket': bucket, 'key': key})
|
||||
raise exceptions.Duplicate(image=key)
|
||||
|
||||
def _add_singlepart(self, s3_client, image_file, bucket, key, loc,
|
||||
hashing_algo, verifier):
|
||||
"""Stores an image file with a single part upload to S3 backend.
|
||||
|
||||
:param s3_client: An object with credentials to connect to S3
|
||||
:param image_file: The image data to write, as a file-like object
|
||||
:param bucket: S3 bucket name
|
||||
:param key: The object name to be stored (image identifier)
|
||||
:param loc: `glance_store.location.Location` object, supplied
|
||||
from glance_store.location.get_location_from_uri()
|
||||
:param hashing_algo: A hashlib algorithm identifier (string)
|
||||
:param verifier: An object used to verify signatures for images
|
||||
:returns: tuple of: (1) URL in backing store, (2) bytes written,
|
||||
(3) checksum, (4) multihash value, and (5) a dictionary
|
||||
with storage system specific information
|
||||
"""
|
||||
os_hash_value = hashlib.new(str(hashing_algo))
|
||||
checksum = hashlib.md5()
|
||||
image_data = b''
|
||||
image_size = 0
|
||||
for chunk in utils.chunkreadable(image_file, self.WRITE_CHUNKSIZE):
|
||||
image_data += chunk
|
||||
image_size += len(chunk)
|
||||
os_hash_value.update(chunk)
|
||||
checksum.update(chunk)
|
||||
if verifier:
|
||||
verifier.update(chunk)
|
||||
|
||||
s3_client.put_object(Body=image_data,
|
||||
Bucket=bucket,
|
||||
Key=key)
|
||||
hash_hex = os_hash_value.hexdigest()
|
||||
checksum_hex = checksum.hexdigest()
|
||||
|
||||
# Add store backend information to location metadata
|
||||
metadata = {}
|
||||
if self.backend_group:
|
||||
metadata['store'] = self.backend_group
|
||||
|
||||
LOG.debug("Wrote %(size)d bytes to S3 key named %(key)s "
|
||||
"with checksum %(checksum)s",
|
||||
{'size': image_size, 'key': key, 'checksum': checksum_hex})
|
||||
|
||||
return loc.get_uri(), image_size, checksum_hex, hash_hex, metadata
|
||||
|
||||
def _add_multipart(self, s3_client, image_file, image_size, bucket,
|
||||
key, loc, hashing_algo, verifier):
|
||||
"""Stores an image file with a multi part upload to S3 backend.
|
||||
|
||||
:param s3_client: An object with credentials to connect to S3
|
||||
:param image_file: The image data to write, as a file-like object
|
||||
:param bucket: S3 bucket name
|
||||
:param key: The object name to be stored (image identifier)
|
||||
:param loc: `glance_store.location.Location` object, supplied
|
||||
from glance_store.location.get_location_from_uri()
|
||||
:param hashing_algo: A hashlib algorithm identifier (string)
|
||||
:param verifier: An object used to verify signatures for images
|
||||
:returns: tuple of: (1) URL in backing store, (2) bytes written,
|
||||
(3) checksum, (4) multihash value, and (5) a dictionary
|
||||
with storage system specific information
|
||||
"""
|
||||
os_hash_value = hashlib.new(str(hashing_algo))
|
||||
checksum = hashlib.md5()
|
||||
pool_size = self.s3_store_thread_pools
|
||||
pool = eventlet.greenpool.GreenPool(size=pool_size)
|
||||
mpu = s3_client.create_multipart_upload(Bucket=bucket, Key=key)
|
||||
upload_id = mpu['UploadId']
|
||||
LOG.debug("Multipart initiate key=%(key)s, UploadId=%(UploadId)s",
|
||||
{'key': key, 'UploadId': upload_id})
|
||||
cstart = 0
|
||||
plist = []
|
||||
|
||||
chunk_size = int(math.ceil(float(image_size) / MAX_PART_NUM))
|
||||
write_chunk_size = max(self.s3_store_large_object_chunk_size,
|
||||
chunk_size)
|
||||
it = utils.chunkreadable(image_file, self.WRITE_CHUNKSIZE)
|
||||
buffered_chunk = b''
|
||||
while True:
|
||||
try:
|
||||
buffered_clen = len(buffered_chunk)
|
||||
if buffered_clen < write_chunk_size:
|
||||
# keep reading data
|
||||
read_chunk = next(it)
|
||||
buffered_chunk += read_chunk
|
||||
continue
|
||||
else:
|
||||
write_chunk = buffered_chunk[:write_chunk_size]
|
||||
remained_data = buffered_chunk[write_chunk_size:]
|
||||
os_hash_value.update(write_chunk)
|
||||
checksum.update(write_chunk)
|
||||
if verifier:
|
||||
verifier.update(write_chunk)
|
||||
fp = six.BytesIO(write_chunk)
|
||||
fp.seek(0)
|
||||
part = UploadPart(mpu, fp, cstart + 1, len(write_chunk))
|
||||
pool.spawn_n(run_upload, s3_client, bucket, key, part)
|
||||
plist.append(part)
|
||||
cstart += 1
|
||||
buffered_chunk = remained_data
|
||||
except StopIteration:
|
||||
if len(buffered_chunk) > 0:
|
||||
# Write the last chunk data
|
||||
write_chunk = buffered_chunk
|
||||
os_hash_value.update(write_chunk)
|
||||
checksum.update(write_chunk)
|
||||
if verifier:
|
||||
verifier.update(write_chunk)
|
||||
fp = six.BytesIO(write_chunk)
|
||||
fp.seek(0)
|
||||
part = UploadPart(mpu, fp, cstart + 1, len(write_chunk))
|
||||
pool.spawn_n(run_upload, s3_client, bucket, key, part)
|
||||
plist.append(part)
|
||||
break
|
||||
|
||||
pedict = {}
|
||||
total_size = 0
|
||||
pool.waitall()
|
||||
|
||||
for part in plist:
|
||||
pedict.update(part.etag)
|
||||
total_size += part.size
|
||||
|
||||
success = True
|
||||
for part in plist:
|
||||
if not part.success:
|
||||
success = False
|
||||
|
||||
if success:
|
||||
# Complete
|
||||
mpu_list = self._get_mpu_list(pedict)
|
||||
s3_client.complete_multipart_upload(Bucket=bucket,
|
||||
Key=key,
|
||||
MultipartUpload=mpu_list,
|
||||
UploadId=upload_id)
|
||||
hash_hex = os_hash_value.hexdigest()
|
||||
checksum_hex = checksum.hexdigest()
|
||||
|
||||
# Add store backend information to location metadata
|
||||
metadata = {}
|
||||
if self.backend_group:
|
||||
metadata['store'] = self.backend_group
|
||||
|
||||
LOG.info("Multipart complete key=%(key)s "
|
||||
"UploadId=%(UploadId)s "
|
||||
"Wrote %(total_size)d bytes to S3 key "
|
||||
"named %(key)s "
|
||||
"with checksum %(checksum)s",
|
||||
{'key': key, 'UploadId': upload_id,
|
||||
'total_size': total_size, 'checksum': checksum_hex})
|
||||
return loc.get_uri(), total_size, checksum_hex, hash_hex, metadata
|
||||
|
||||
# Abort
|
||||
s3_client.abort_multipart_upload(Bucket=bucket, Key=key,
|
||||
UploadId=upload_id)
|
||||
LOG.error("Some parts failed to upload to S3. "
|
||||
"Aborted the key=%s", key)
|
||||
msg = _("Failed to add image object to S3. key=%s") % key
|
||||
raise glance_store.BackendException(msg)
|
||||
|
||||
@capabilities.check
|
||||
def delete(self, location, context=None):
|
||||
"""
|
||||
Takes a `glance_store.location.Location` object that indicates
|
||||
where to find the image file to delete.
|
||||
|
||||
:param location: `glance_store.location.Location` object, supplied
|
||||
from glance_store.location.get_location_from_uri()
|
||||
|
||||
:raises: NotFound if image does not exist;
|
||||
InUseByStore if image is in use or snapshot unprotect failed
|
||||
"""
|
||||
loc = location.store_location
|
||||
s3_client, bucket, key = self._operation_set(loc)
|
||||
|
||||
if not self._object_exists(s3_client, bucket, key):
|
||||
LOG.warning("Could not find key %(key)s in bucket %(bucket)s",
|
||||
{'key': key, 'bucket': bucket})
|
||||
raise exceptions.NotFound(image=key)
|
||||
|
||||
LOG.debug("Deleting image object from S3 using s3_host=%(s3_host)s, "
|
||||
"accesskey=%(accesskey)s, bucket=%(bucket)s, key=%(key)s)",
|
||||
{'s3_host': loc.s3serviceurl, 'accesskey': loc.accesskey,
|
||||
'bucket': bucket, 'key': key})
|
||||
|
||||
return s3_client.delete_object(Bucket=bucket, Key=key)
|
||||
|
||||
@staticmethod
|
||||
def _bucket_exists(s3_client, bucket):
|
||||
"""Check whether bucket exists in the S3.
|
||||
|
||||
:param s3_client: An object with credentials to connect to S3
|
||||
:param bucket: S3 bucket name
|
||||
:returns: boolean value; If the value is true, the bucket is exist
|
||||
if false, it is not.
|
||||
:raises: BadStoreConfiguration if cannot connect to S3 successfully
|
||||
"""
|
||||
try:
|
||||
s3_client.head_bucket(Bucket=bucket)
|
||||
except boto_exceptions.ClientError as e:
|
||||
error_code = e.response['Error']['Code']
|
||||
if error_code == '404':
|
||||
return False
|
||||
msg = ("Failed to get bucket info: %s" %
|
||||
encodeutils.exception_to_unicode(e))
|
||||
LOG.error(msg)
|
||||
raise glance_store.BadStoreConfiguration(store_name='s3',
|
||||
reason=msg)
|
||||
else:
|
||||
return True
|
||||
|
||||
@staticmethod
|
||||
def _object_exists(s3_client, bucket, key):
|
||||
"""Check whether object exists in the specific bucket of S3.
|
||||
|
||||
:param s3_client: An object with credentials to connect to S3
|
||||
:param bucket: S3 bucket name
|
||||
:param key: The image object name
|
||||
:returns: boolean value; If the value is true, the object is exist
|
||||
if false, it is not.
|
||||
:raises: BadStoreConfiguration if cannot connect to S3 successfully
|
||||
"""
|
||||
try:
|
||||
s3_client.head_object(Bucket=bucket, Key=key)
|
||||
except boto_exceptions.ClientError as e:
|
||||
error_code = e.response['Error']['Code']
|
||||
if error_code == '404':
|
||||
return False
|
||||
msg = ("Failed to get object info: %s" %
|
||||
encodeutils.exception_to_unicode(e))
|
||||
LOG.error(msg)
|
||||
raise glance_store.BadStoreConfiguration(store_name='s3',
|
||||
reason=msg)
|
||||
else:
|
||||
return True
|
||||
|
||||
@staticmethod
|
||||
def _create_bucket(s3_client, s3_host, bucket):
|
||||
"""Create bucket into the S3.
|
||||
|
||||
:param s3_client: An object with credentials to connect to S3
|
||||
:param s3_host: S3 endpoint url
|
||||
:param bucket: S3 bucket name
|
||||
:raises: BadStoreConfiguration if cannot connect to S3 successfully
|
||||
"""
|
||||
region = get_s3_location(s3_host)
|
||||
try:
|
||||
s3_client.create_bucket(
|
||||
Bucket=bucket,
|
||||
) if region == '' else s3_client.create_bucket(
|
||||
Bucket=bucket,
|
||||
CreateBucketConfiguration={
|
||||
'LocationConstraint': region
|
||||
}
|
||||
)
|
||||
except boto_exceptions.ClientError as e:
|
||||
msg = ("Failed to add bucket to S3: %s" %
|
||||
encodeutils.exception_to_unicode(e))
|
||||
LOG.error(msg)
|
||||
raise glance_store.BadStoreConfiguration(store_name='s3',
|
||||
reason=msg)
|
||||
|
||||
@staticmethod
|
||||
def _get_mpu_list(pedict):
|
||||
"""Convert an object type and struct for use in
|
||||
boto3.client('s3').complete_multipart_upload.
|
||||
|
||||
:param pedict: dict which containing UploadPart.etag
|
||||
:returns: list with pedict converted properly
|
||||
"""
|
||||
return {
|
||||
'Parts': [
|
||||
{
|
||||
'PartNumber': pnum,
|
||||
'ETag': etag
|
||||
} for pnum, etag in six.iteritems(pedict)
|
||||
]
|
||||
}
|
||||
|
||||
|
||||
def get_s3_location(s3_host):
|
||||
"""Get S3 region information from ``s3_store_host``.
|
||||
|
||||
:param s3_host: S3 endpoint url
|
||||
:returns: string value; region information which user wants to use on
|
||||
Amazon S3, and if user wants to use S3 compatible storage,
|
||||
returns ''
|
||||
"""
|
||||
locations = {
|
||||
's3.amazonaws.com': '',
|
||||
's3-us-east-1.amazonaws.com': 'us-east-1',
|
||||
's3-us-east-2.amazonaws.com': 'us-east-2',
|
||||
's3-us-west-1.amazonaws.com': 'us-west-1',
|
||||
's3-us-west-2.amazonaws.com': 'us-west-2',
|
||||
's3-ap-east-1.amazonaws.com': 'ap-east-1',
|
||||
's3-ap-south-1.amazonaws.com': 'ap-south-1',
|
||||
's3-ap-northeast-1.amazonaws.com': 'ap-northeast-1',
|
||||
's3-ap-northeast-2.amazonaws.com': 'ap-northeast-2',
|
||||
's3-ap-northeast-3.amazonaws.com': 'ap-northeast-3',
|
||||
's3-ap-southeast-1.amazonaws.com': 'ap-southeast-1',
|
||||
's3-ap-southeast-2.amazonaws.com': 'ap-southeast-2',
|
||||
's3-ca-central-1.amazonaws.com': 'ca-central-1',
|
||||
's3-cn-north-1.amazonaws.com.cn': 'cn-north-1',
|
||||
's3-cn-northwest-1.amazonaws.com.cn': 'cn-northwest-1',
|
||||
's3-eu-central-1.amazonaws.com': 'eu-central-1',
|
||||
's3-eu-west-1.amazonaws.com': 'eu-west-1',
|
||||
's3-eu-west-2.amazonaws.com': 'eu-west-2',
|
||||
's3-eu-west-3.amazonaws.com': 'eu-west-3',
|
||||
's3-eu-north-1.amazonaws.com': 'eu-north-1',
|
||||
's3-sa-east-1.amazonaws.com': 'sa-east-1'
|
||||
}
|
||||
# strip off scheme and port if present
|
||||
key = re.sub('^(https?://)?(?P<host>[^:]+[^/])(:[0-9]+)?/?$',
|
||||
'\g<host>',
|
||||
s3_host)
|
||||
return locations.get(key, '')
|
@ -59,6 +59,7 @@ Possible values:
|
||||
* rbd
|
||||
* cinder
|
||||
* vmware
|
||||
* s3
|
||||
|
||||
Related Options:
|
||||
* default_store
|
||||
@ -68,7 +69,7 @@ Related Options:
|
||||
default='file',
|
||||
choices=('file', 'filesystem', 'http', 'https', 'swift',
|
||||
'swift+http', 'swift+https', 'swift+config', 'rbd',
|
||||
'cinder', 'vsphere'),
|
||||
'cinder', 'vsphere', 's3'),
|
||||
deprecated_for_removal=True,
|
||||
deprecated_since='Rocky',
|
||||
deprecated_reason="""
|
||||
@ -102,6 +103,7 @@ Possible values:
|
||||
* rbd
|
||||
* cinder
|
||||
* vsphere
|
||||
* s3
|
||||
|
||||
Related Options:
|
||||
* stores
|
||||
|
@ -69,6 +69,8 @@ def get_location_from_uri(uri, conf=CONF):
|
||||
swift+http://user:account:pass@authurl.com/container/obj-id
|
||||
file:///var/lib/glance/images/1
|
||||
cinder://volume-id
|
||||
s3://accesskey:secretkey@s3.amazonaws.com/bucket/key-id
|
||||
s3+https://accesskey:secretkey@s3.amazonaws.com/bucket/key-id
|
||||
"""
|
||||
pieces = urllib.parse.urlparse(uri)
|
||||
if pieces.scheme not in SCHEME_TO_CLS_MAP.keys():
|
||||
@ -97,6 +99,8 @@ def get_location_from_uri_and_backend(uri, backend, conf=CONF):
|
||||
swift+http://user:account:pass@authurl.com/container/obj-id
|
||||
file:///var/lib/glance/images/1
|
||||
cinder://volume-id
|
||||
s3://accesskey:secretkey@s3.amazonaws.com/bucket/key-id
|
||||
s3+https://accesskey:secretkey@s3.amazonaws.com/bucket/key-id
|
||||
"""
|
||||
|
||||
pieces = urllib.parse.urlparse(uri)
|
||||
|
459
glance_store/tests/unit/test_multistore_s3.py
Normal file
459
glance_store/tests/unit/test_multistore_s3.py
Normal file
@ -0,0 +1,459 @@
|
||||
# Copyright 2011 OpenStack Foundation
|
||||
# All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""Tests the Multiple S3 backend store"""
|
||||
|
||||
import hashlib
|
||||
import uuid
|
||||
|
||||
import boto3
|
||||
import botocore
|
||||
from botocore import exceptions as boto_exceptions
|
||||
from botocore import stub
|
||||
import mock
|
||||
from oslo_config import cfg
|
||||
from oslo_utils import units
|
||||
import six
|
||||
|
||||
import glance_store as store
|
||||
from glance_store._drivers import s3
|
||||
from glance_store import exceptions
|
||||
from glance_store import location
|
||||
from glance_store.tests import base
|
||||
from glance_store.tests.unit import test_store_capabilities
|
||||
|
||||
|
||||
FAKE_UUID = str(uuid.uuid4())
|
||||
|
||||
FIVE_KB = 5 * units.Ki
|
||||
S3_CONF = {
|
||||
's3_store_access_key': 'user',
|
||||
's3_store_secret_key': 'key',
|
||||
's3_store_host': 'https://s3-region1.com',
|
||||
's3_store_bucket': 'glance',
|
||||
's3_store_large_object_size': 5, # over 5MB is large
|
||||
's3_store_large_object_chunk_size': 6, # part size is 6MB
|
||||
}
|
||||
|
||||
|
||||
def format_s3_location(user, key, authurl, bucket, obj):
|
||||
"""Helper method that returns a S3 store URI given the component pieces."""
|
||||
scheme = 's3'
|
||||
if authurl.startswith('https://'):
|
||||
scheme = 's3+https'
|
||||
authurl = authurl[8:]
|
||||
elif authurl.startswith('http://'):
|
||||
authurl = authurl[7:]
|
||||
authurl = authurl.strip('/')
|
||||
return "%s://%s:%s@%s/%s/%s" % (scheme, user, key, authurl,
|
||||
bucket, obj)
|
||||
|
||||
|
||||
class TestMultiS3Store(base.MultiStoreBaseTest,
|
||||
test_store_capabilities.TestStoreCapabilitiesChecking):
|
||||
|
||||
# NOTE(flaper87): temporary until we
|
||||
# can move to a fully-local lib.
|
||||
# (Swift store's fault)
|
||||
_CONF = cfg.ConfigOpts()
|
||||
|
||||
def setUp(self):
|
||||
"""Establish a clean test environment."""
|
||||
super(TestMultiS3Store, self).setUp()
|
||||
enabled_backends = {
|
||||
"s3_region1": "s3",
|
||||
"s3_region2": "s3"
|
||||
}
|
||||
self.hash_algo = 'sha256'
|
||||
self.conf = self._CONF
|
||||
self.conf(args=[])
|
||||
self.conf.register_opt(cfg.DictOpt('enabled_backends'))
|
||||
self.config(enabled_backends=enabled_backends)
|
||||
store.register_store_opts(self.conf)
|
||||
self.config(default_backend='s3_region1', group='glance_store')
|
||||
|
||||
# set s3 related config options
|
||||
self.config(group='s3_region1',
|
||||
s3_store_access_key='user',
|
||||
s3_store_secret_key='key',
|
||||
s3_store_host='https://s3-region1.com',
|
||||
s3_store_bucket='glance',
|
||||
s3_store_large_object_size=5,
|
||||
s3_store_large_object_chunk_size=6)
|
||||
|
||||
self.config(group='s3_region2',
|
||||
s3_store_access_key='user',
|
||||
s3_store_secret_key='key',
|
||||
s3_store_host='http://s3-region2.com',
|
||||
s3_store_bucket='glance',
|
||||
s3_store_large_object_size=5,
|
||||
s3_store_large_object_chunk_size=6)
|
||||
# Ensure stores + locations cleared
|
||||
location.SCHEME_TO_CLS_BACKEND_MAP = {}
|
||||
store.create_multi_stores(self.conf)
|
||||
|
||||
self.addCleanup(setattr, location, 'SCHEME_TO_CLS_BACKEND_MAP',
|
||||
dict())
|
||||
self.addCleanup(self.conf.reset)
|
||||
|
||||
self.store = s3.Store(self.conf, backend="s3_region1")
|
||||
self.store.configure()
|
||||
self.register_store_backend_schemes(self.store, 's3', 's3_region1')
|
||||
|
||||
def test_location_url_prefix_is_set(self):
|
||||
expected_url_prefix = "s3+https://user:key@s3-region1.com/glance"
|
||||
self.assertEqual(expected_url_prefix, self.store.url_prefix)
|
||||
|
||||
def test_get_invalid_bucket_name(self):
|
||||
self.config(s3_store_bucket_url_format='virtual', group='s3_region1')
|
||||
|
||||
invalid_buckets = ['not.dns.compliant', 'aa', 'bucket-']
|
||||
for bucket in invalid_buckets:
|
||||
loc = location.get_location_from_uri_and_backend(
|
||||
"s3+https://user:key@auth_address/%s/key" % bucket,
|
||||
's3_region1', conf=self.conf)
|
||||
self.assertRaises(boto_exceptions.InvalidDNSNameError,
|
||||
self.store.get, loc)
|
||||
|
||||
@mock.patch.object(boto3.session.Session, "client")
|
||||
def test_get(self, mock_client):
|
||||
"""Test a "normal" retrieval of an image in chunks."""
|
||||
bucket, key = 'glance', FAKE_UUID
|
||||
fixture_object = {
|
||||
'Body': six.BytesIO(b"*" * FIVE_KB),
|
||||
'ContentLength': FIVE_KB
|
||||
}
|
||||
fake_s3_client = botocore.session.get_session().create_client('s3')
|
||||
|
||||
with stub.Stubber(fake_s3_client) as stubber:
|
||||
stubber.add_response(method='head_object',
|
||||
service_response={},
|
||||
expected_params={
|
||||
'Bucket': bucket,
|
||||
'Key': key
|
||||
})
|
||||
stubber.add_response(method='get_object',
|
||||
service_response=fixture_object,
|
||||
expected_params={
|
||||
'Bucket': bucket,
|
||||
'Key': key
|
||||
})
|
||||
mock_client.return_value = fake_s3_client
|
||||
|
||||
loc = location.get_location_from_uri_and_backend(
|
||||
"s3+https://user:key@auth_address/%s/%s" % (bucket, key),
|
||||
's3_region1', conf=self.conf)
|
||||
(image_s3, image_size) = self.store.get(loc)
|
||||
|
||||
self.assertEqual(FIVE_KB, image_size)
|
||||
|
||||
expected_data = b"*" * FIVE_KB
|
||||
data = b""
|
||||
|
||||
for chunk in image_s3:
|
||||
data += chunk
|
||||
self.assertEqual(expected_data, data)
|
||||
|
||||
def test_partial_get(self):
|
||||
loc = location.get_location_from_uri_and_backend(
|
||||
"s3+https://user:key@auth_address/glance/%s" % FAKE_UUID,
|
||||
's3_region1', conf=self.conf)
|
||||
self.assertRaises(exceptions.StoreRandomGetNotSupported,
|
||||
self.store.get, loc, chunk_size=1)
|
||||
|
||||
@mock.patch.object(boto3.session.Session, "client")
|
||||
def test_get_non_existing(self, mock_client):
|
||||
"""Test that trying to retrieve a s3 that doesn't exist raises an
|
||||
error
|
||||
"""
|
||||
bucket, key = 'glance', 'no_exist'
|
||||
fake_s3_client = botocore.session.get_session().create_client('s3')
|
||||
|
||||
with stub.Stubber(fake_s3_client) as stubber:
|
||||
stubber.add_client_error(method='head_object',
|
||||
service_error_code='404',
|
||||
service_message='''
|
||||
The specified key does not exist.
|
||||
''',
|
||||
expected_params={
|
||||
'Bucket': bucket,
|
||||
'Key': key
|
||||
})
|
||||
mock_client.return_value = fake_s3_client
|
||||
|
||||
uri = "s3+https://user:key@auth_address/%s/%s" % (bucket, key)
|
||||
loc = location.get_location_from_uri_and_backend(uri,
|
||||
's3_region1',
|
||||
conf=self.conf)
|
||||
self.assertRaises(exceptions.NotFound, self.store.get, loc)
|
||||
|
||||
@mock.patch.object(boto3.session.Session, "client")
|
||||
def test_add_singlepart(self, mock_client):
|
||||
"""Test that we can add an image via the s3 backend."""
|
||||
expected_image_id = str(uuid.uuid4())
|
||||
expected_s3_size = FIVE_KB
|
||||
expected_s3_contents = b"*" * expected_s3_size
|
||||
expected_checksum = hashlib.md5(expected_s3_contents).hexdigest()
|
||||
expected_multihash = hashlib.sha256(expected_s3_contents).hexdigest()
|
||||
expected_location = format_s3_location(
|
||||
S3_CONF['s3_store_access_key'],
|
||||
S3_CONF['s3_store_secret_key'],
|
||||
S3_CONF['s3_store_host'],
|
||||
S3_CONF['s3_store_bucket'],
|
||||
expected_image_id)
|
||||
image_s3 = six.BytesIO(expected_s3_contents)
|
||||
|
||||
fake_s3_client = botocore.session.get_session().create_client('s3')
|
||||
|
||||
with stub.Stubber(fake_s3_client) as stubber:
|
||||
stubber.add_response(method='head_bucket',
|
||||
service_response={},
|
||||
expected_params={
|
||||
'Bucket': S3_CONF['s3_store_bucket']
|
||||
})
|
||||
stubber.add_client_error(method='head_object',
|
||||
service_error_code='404',
|
||||
service_message='',
|
||||
expected_params={
|
||||
'Bucket': S3_CONF['s3_store_bucket'],
|
||||
'Key': expected_image_id
|
||||
})
|
||||
stubber.add_response(method='put_object',
|
||||
service_response={},
|
||||
expected_params={
|
||||
'Bucket': S3_CONF['s3_store_bucket'],
|
||||
'Key': expected_image_id,
|
||||
'Body': botocore.stub.ANY
|
||||
})
|
||||
|
||||
mock_client.return_value = fake_s3_client
|
||||
loc, size, checksum, multihash, metadata = \
|
||||
self.store.add(expected_image_id, image_s3, expected_s3_size,
|
||||
self.hash_algo)
|
||||
self.assertEqual("s3_region1", metadata["store"])
|
||||
|
||||
self.assertEqual(expected_location, loc)
|
||||
self.assertEqual(expected_s3_size, size)
|
||||
self.assertEqual(expected_checksum, checksum)
|
||||
self.assertEqual(expected_multihash, multihash)
|
||||
|
||||
@mock.patch.object(boto3.session.Session, "client")
|
||||
def test_add_different_backend(self, mock_client):
|
||||
self.store = s3.Store(self.conf, backend="s3_region2")
|
||||
self.store.configure()
|
||||
self.register_store_backend_schemes(self.store, 's3', 's3_region2')
|
||||
|
||||
expected_image_id = str(uuid.uuid4())
|
||||
expected_s3_size = FIVE_KB
|
||||
expected_s3_contents = b"*" * expected_s3_size
|
||||
expected_checksum = hashlib.md5(expected_s3_contents).hexdigest()
|
||||
expected_multihash = hashlib.sha256(expected_s3_contents).hexdigest()
|
||||
expected_location = format_s3_location(
|
||||
S3_CONF['s3_store_access_key'],
|
||||
S3_CONF['s3_store_secret_key'],
|
||||
'http://s3-region2.com',
|
||||
S3_CONF['s3_store_bucket'],
|
||||
expected_image_id)
|
||||
image_s3 = six.BytesIO(expected_s3_contents)
|
||||
|
||||
fake_s3_client = botocore.session.get_session().create_client('s3')
|
||||
|
||||
with stub.Stubber(fake_s3_client) as stubber:
|
||||
stubber.add_response(method='head_bucket',
|
||||
service_response={},
|
||||
expected_params={
|
||||
'Bucket': S3_CONF['s3_store_bucket']
|
||||
})
|
||||
stubber.add_client_error(method='head_object',
|
||||
service_error_code='404',
|
||||
service_message='',
|
||||
expected_params={
|
||||
'Bucket': S3_CONF['s3_store_bucket'],
|
||||
'Key': expected_image_id
|
||||
})
|
||||
stubber.add_response(method='put_object',
|
||||
service_response={},
|
||||
expected_params={
|
||||
'Bucket': S3_CONF['s3_store_bucket'],
|
||||
'Key': expected_image_id,
|
||||
'Body': botocore.stub.ANY
|
||||
})
|
||||
|
||||
mock_client.return_value = fake_s3_client
|
||||
loc, size, checksum, multihash, metadata = \
|
||||
self.store.add(expected_image_id, image_s3, expected_s3_size,
|
||||
self.hash_algo)
|
||||
self.assertEqual("s3_region2", metadata["store"])
|
||||
|
||||
self.assertEqual(expected_location, loc)
|
||||
self.assertEqual(expected_s3_size, size)
|
||||
self.assertEqual(expected_checksum, checksum)
|
||||
self.assertEqual(expected_multihash, multihash)
|
||||
|
||||
@mock.patch.object(boto3.session.Session, "client")
|
||||
def test_add_with_verifier(self, mock_client):
|
||||
"""Assert 'verifier.update' is called when verifier is provided"""
|
||||
expected_image_id = str(uuid.uuid4())
|
||||
expected_s3_size = FIVE_KB
|
||||
expected_s3_contents = b"*" * expected_s3_size
|
||||
image_s3 = six.BytesIO(expected_s3_contents)
|
||||
|
||||
fake_s3_client = botocore.session.get_session().create_client('s3')
|
||||
verifier = mock.MagicMock(name='mock_verifier')
|
||||
|
||||
with stub.Stubber(fake_s3_client) as stubber:
|
||||
stubber.add_response(method='head_bucket', service_response={})
|
||||
stubber.add_client_error(method='head_object',
|
||||
service_error_code='404',
|
||||
service_message='')
|
||||
stubber.add_response(method='put_object', service_response={})
|
||||
|
||||
mock_client.return_value = fake_s3_client
|
||||
self.store.add(expected_image_id, image_s3, expected_s3_size,
|
||||
self.hash_algo, verifier=verifier)
|
||||
verifier.update.assert_called_with(expected_s3_contents)
|
||||
|
||||
@mock.patch.object(boto3.session.Session, "client")
|
||||
def test_add_multipart(self, mock_client):
|
||||
"""Test that we can add an image via the s3 backend."""
|
||||
expected_image_id = str(uuid.uuid4())
|
||||
expected_s3_size = 16 * units.Mi
|
||||
expected_s3_contents = b"*" * expected_s3_size
|
||||
expected_checksum = hashlib.md5(expected_s3_contents).hexdigest()
|
||||
expected_multihash = hashlib.sha256(expected_s3_contents).hexdigest()
|
||||
expected_location = format_s3_location(
|
||||
S3_CONF['s3_store_access_key'],
|
||||
S3_CONF['s3_store_secret_key'],
|
||||
S3_CONF['s3_store_host'],
|
||||
S3_CONF['s3_store_bucket'],
|
||||
expected_image_id)
|
||||
image_s3 = six.BytesIO(expected_s3_contents)
|
||||
|
||||
fake_s3_client = botocore.session.get_session().create_client('s3')
|
||||
|
||||
num_parts = 3 # image size = 16MB and chunk size is 6MB
|
||||
with stub.Stubber(fake_s3_client) as stubber:
|
||||
stubber.add_response(method='head_bucket',
|
||||
service_response={},
|
||||
expected_params={
|
||||
'Bucket': S3_CONF['s3_store_bucket']
|
||||
})
|
||||
stubber.add_client_error(method='head_object',
|
||||
service_error_code='404',
|
||||
service_message='',
|
||||
expected_params={
|
||||
'Bucket': S3_CONF['s3_store_bucket'],
|
||||
'Key': expected_image_id
|
||||
})
|
||||
stubber.add_response(method='create_multipart_upload',
|
||||
service_response={
|
||||
"Bucket": S3_CONF['s3_store_bucket'],
|
||||
"Key": expected_image_id,
|
||||
"UploadId": 'UploadId'
|
||||
},
|
||||
expected_params={
|
||||
"Bucket": S3_CONF['s3_store_bucket'],
|
||||
"Key": expected_image_id,
|
||||
})
|
||||
parts = []
|
||||
remaining_image_size = expected_s3_size
|
||||
chunk_size = S3_CONF['s3_store_large_object_chunk_size'] * units.Mi
|
||||
for i in range(num_parts):
|
||||
part_number = i + 1
|
||||
stubber.add_response(method='upload_part',
|
||||
service_response={
|
||||
'ETag': 'ETag'
|
||||
},
|
||||
expected_params={
|
||||
"Bucket": S3_CONF['s3_store_bucket'],
|
||||
"Key": expected_image_id,
|
||||
"Body": botocore.stub.ANY,
|
||||
'ContentLength': chunk_size,
|
||||
"PartNumber": part_number,
|
||||
"UploadId": 'UploadId'
|
||||
})
|
||||
parts.append({'ETag': 'ETag', 'PartNumber': part_number})
|
||||
|
||||
remaining_image_size -= chunk_size
|
||||
if remaining_image_size < chunk_size:
|
||||
chunk_size = remaining_image_size
|
||||
|
||||
stubber.add_response(method='complete_multipart_upload',
|
||||
service_response={
|
||||
"Bucket": S3_CONF['s3_store_bucket'],
|
||||
"Key": expected_image_id,
|
||||
'ETag': 'ETag'
|
||||
},
|
||||
expected_params={
|
||||
"Bucket": S3_CONF['s3_store_bucket'],
|
||||
"Key": expected_image_id,
|
||||
"MultipartUpload": {
|
||||
"Parts": parts
|
||||
},
|
||||
"UploadId": 'UploadId'
|
||||
})
|
||||
|
||||
mock_client.return_value = fake_s3_client
|
||||
loc, size, checksum, multihash, metadata = \
|
||||
self.store.add(expected_image_id, image_s3, expected_s3_size,
|
||||
self.hash_algo)
|
||||
self.assertEqual("s3_region1", metadata["store"])
|
||||
|
||||
self.assertEqual(expected_location, loc)
|
||||
self.assertEqual(expected_s3_size, size)
|
||||
self.assertEqual(expected_checksum, checksum)
|
||||
self.assertEqual(expected_multihash, multihash)
|
||||
|
||||
@mock.patch.object(boto3.session.Session, "client")
|
||||
def test_add_already_existing(self, mock_client):
|
||||
"""Tests that adding an image with an existing identifier raises an
|
||||
appropriate exception
|
||||
"""
|
||||
image_s3 = six.BytesIO(b"never_gonna_make_it")
|
||||
|
||||
fake_s3_client = botocore.session.get_session().create_client('s3')
|
||||
|
||||
with stub.Stubber(fake_s3_client) as stubber:
|
||||
stubber.add_response(method='head_bucket', service_response={})
|
||||
stubber.add_response(method='head_object', service_response={})
|
||||
mock_client.return_value = fake_s3_client
|
||||
self.assertRaises(exceptions.Duplicate, self.store.add,
|
||||