Decouple read chunk size from write chunk size
When using the copy_from option, the reader and the writer can operate at different speeds. A reader timeout will occur if the writer is slow and is being asked to write a large amount of data. This is currently happening when using the VMware store and copying from an HTTP server: the reader reads 16MB chunks which take too long to upload to vCenter, causing a timeout from the HTTP server. The writer should be able to control the size of the chunks being read when using copy_from; this way the writer will write fast enough that the reader does not time out. This patch addresses the issue by introducing the notion of a read chunk size and a write chunk size. Each store can have its own value for read and write. The write chunk size of the destination store will be used as the read chunk size of the source store in the case of an image-create where the copy_from option is specified. Closes-Bug: #1336168 Signed-off-by: Arnaud Legendre <alegendre@vmware.com> Signed-off-by: Zhi Yan Liu <zhiyanl@cn.ibm.com> Change-Id: I4e0c563b8f3a5ced8f65fcca83d341a97729a5d4
This commit is contained in:
parent
323aba66cf
commit
5148c9648f
|
@ -447,9 +447,10 @@ class Controller(controller.BaseController):
|
|||
return Controller._validate_source(source, req)
|
||||
|
||||
@staticmethod
|
||||
def _get_from_store(context, where):
|
||||
def _get_from_store(context, where, dest=None):
|
||||
try:
|
||||
image_data, image_size = get_from_backend(context, where)
|
||||
image_data, image_size = get_from_backend(
|
||||
context, where, dest=dest)
|
||||
except exception.NotFound as e:
|
||||
raise HTTPNotFound(explanation=e.msg)
|
||||
image_size = int(image_size) if image_size else None
|
||||
|
@ -571,11 +572,15 @@ class Controller(controller.BaseController):
|
|||
:retval The location where the image was stored
|
||||
"""
|
||||
|
||||
scheme = req.headers.get('x-image-meta-store', CONF.default_store)
|
||||
store = self.get_store_or_400(req, scheme)
|
||||
|
||||
copy_from = self._copy_from(req)
|
||||
if copy_from:
|
||||
try:
|
||||
image_data, image_size = self._get_from_store(req.context,
|
||||
copy_from)
|
||||
copy_from,
|
||||
dest=store)
|
||||
except Exception as e:
|
||||
upload_utils.safe_kill(req, image_meta['id'])
|
||||
msg = ("Copy from external source failed: %s" %
|
||||
|
@ -594,10 +599,6 @@ class Controller(controller.BaseController):
|
|||
|
||||
image_data = req.body_file
|
||||
|
||||
scheme = req.headers.get('x-image-meta-store', CONF.default_store)
|
||||
|
||||
store = self.get_store_or_400(req, scheme)
|
||||
|
||||
image_id = image_meta['id']
|
||||
LOG.debug("Setting image %s to status 'saving'", image_id)
|
||||
registry.update_image_metadata(req.context, image_id,
|
||||
|
@ -1102,7 +1103,7 @@ class ImageDeserializer(wsgi.JSONRequestDeserializer):
|
|||
if image_size is None and data is not None:
|
||||
data = utils.LimitingReader(data, CONF.image_size_cap)
|
||||
|
||||
#NOTE(bcwaldon): this is a hack to make sure the downstream code
|
||||
# NOTE(bcwaldon): this is a hack to make sure the downstream code
|
||||
# gets the correct image data
|
||||
request.body_file = data
|
||||
|
||||
|
|
|
@ -258,10 +258,12 @@ def get_from_backend(context, uri, **kwargs):
|
|||
"""Yields chunks of data from backend specified by uri"""
|
||||
|
||||
loc = location.get_location_from_uri(uri)
|
||||
store = get_store_from_uri(context, uri, loc)
|
||||
|
||||
src_store = get_store_from_uri(context, uri, loc)
|
||||
dest_store = kwargs.get('dest')
|
||||
if dest_store is not None:
|
||||
src_store.READ_CHUNKSIZE = dest_store.WRITE_CHUNKSIZE
|
||||
try:
|
||||
return store.get(loc)
|
||||
return src_store.get(loc)
|
||||
except NotImplementedError:
|
||||
raise exception.StoreGetNotSupported
|
||||
|
||||
|
|
|
@ -27,7 +27,8 @@ LOG = logging.getLogger(__name__)
|
|||
|
||||
class Store(object):
|
||||
|
||||
CHUNKSIZE = 16 * units.Mi # 16M
|
||||
READ_CHUNKSIZE = 16 * units.Mi # 16M
|
||||
WRITE_CHUNKSIZE = READ_CHUNKSIZE
|
||||
|
||||
@staticmethod
|
||||
def _unconfigured(*args, **kwargs):
|
||||
|
|
|
@ -89,8 +89,6 @@ class ChunkedFile(object):
|
|||
something that can iterate over a large file
|
||||
"""
|
||||
|
||||
CHUNKSIZE = 65536
|
||||
|
||||
def __init__(self, filepath):
|
||||
self.filepath = filepath
|
||||
self.fp = open(self.filepath, 'rb')
|
||||
|
@ -100,7 +98,7 @@ class ChunkedFile(object):
|
|||
try:
|
||||
if self.fp:
|
||||
while True:
|
||||
chunk = self.fp.read(ChunkedFile.CHUNKSIZE)
|
||||
chunk = self.fp.read(Store.READ_CHUNKSIZE)
|
||||
if chunk:
|
||||
yield chunk
|
||||
else:
|
||||
|
@ -117,6 +115,9 @@ class ChunkedFile(object):
|
|||
|
||||
class Store(glance.store.base.Store):
|
||||
|
||||
READ_CHUNKSIZE = 64 * units.Ki
|
||||
WRITE_CHUNKSIZE = READ_CHUNKSIZE
|
||||
|
||||
def get_schemes(self):
|
||||
return ('file', 'filesystem')
|
||||
|
||||
|
@ -431,7 +432,7 @@ class Store(glance.store.base.Store):
|
|||
try:
|
||||
with open(filepath, 'wb') as f:
|
||||
for buf in utils.chunkreadable(image_file,
|
||||
ChunkedFile.CHUNKSIZE):
|
||||
self.WRITE_CHUNKSIZE):
|
||||
bytes_written += len(buf)
|
||||
checksum.update(buf)
|
||||
f.write(buf)
|
||||
|
|
|
@ -122,7 +122,7 @@ class Store(glance.store.base.Store):
|
|||
"""
|
||||
conn, resp, content_length = self._query(location, 'GET')
|
||||
|
||||
iterator = http_response_iterator(conn, resp, self.CHUNKSIZE)
|
||||
iterator = http_response_iterator(conn, resp, self.READ_CHUNKSIZE)
|
||||
|
||||
class ResponseIndexable(glance.store.Indexable):
|
||||
def another(self):
|
||||
|
|
|
@ -188,7 +188,8 @@ class Store(glance.store.base.Store):
|
|||
itself, it should raise `exception.BadStoreConfiguration`
|
||||
"""
|
||||
try:
|
||||
self.chunk_size = CONF.rbd_store_chunk_size * units.Mi
|
||||
self.READ_CHUNKSIZE = CONF.rbd_store_chunk_size * units.Mi
|
||||
self.WRITE_CHUNKSIZE = self.READ_CHUNKSIZE
|
||||
|
||||
# these must not be unicode since they will be passed to a
|
||||
# non-unicode-aware C library
|
||||
|
@ -323,7 +324,7 @@ class Store(glance.store.base.Store):
|
|||
if hasattr(conn, 'get_fsid'):
|
||||
fsid = conn.get_fsid()
|
||||
with conn.open_ioctx(self.pool) as ioctx:
|
||||
order = int(math.log(self.chunk_size, 2))
|
||||
order = int(math.log(self.WRITE_CHUNKSIZE, 2))
|
||||
LOG.debug('creating image %(name)s with order %(order)d and '
|
||||
'size %(size)d',
|
||||
{'name': text_type(image_name),
|
||||
|
@ -345,7 +346,7 @@ class Store(glance.store.base.Store):
|
|||
bytes_written = 0
|
||||
offset = 0
|
||||
chunks = utils.chunkreadable(image_file,
|
||||
self.chunk_size)
|
||||
self.WRITE_CHUNKSIZE)
|
||||
for chunk in chunks:
|
||||
# If the image size provided is zero we need to do
|
||||
# a resize for the amount we are writing. This will
|
||||
|
|
|
@ -257,8 +257,6 @@ class ChunkedFile(object):
|
|||
something that can iterate over a ``boto.s3.key.Key``
|
||||
"""
|
||||
|
||||
CHUNKSIZE = 65536
|
||||
|
||||
def __init__(self, fp):
|
||||
self.fp = fp
|
||||
|
||||
|
@ -267,7 +265,7 @@ class ChunkedFile(object):
|
|||
try:
|
||||
if self.fp:
|
||||
while True:
|
||||
chunk = self.fp.read(ChunkedFile.CHUNKSIZE)
|
||||
chunk = self.fp.read(Store.READ_CHUNKSIZE)
|
||||
if chunk:
|
||||
yield chunk
|
||||
else:
|
||||
|
@ -295,6 +293,9 @@ class ChunkedFile(object):
|
|||
class Store(glance.store.base.Store):
|
||||
"""An implementation of the s3 adapter."""
|
||||
|
||||
READ_CHUNKSIZE = 64 * units.Ki
|
||||
WRITE_CHUNKSIZE = READ_CHUNKSIZE
|
||||
|
||||
EXAMPLE_URL = "s3://<ACCESS_KEY>:<SECRET_KEY>@<S3_URL>/<BUCKET>/<OBJ>"
|
||||
|
||||
def get_schemes(self):
|
||||
|
@ -372,11 +373,11 @@ class Store(glance.store.base.Store):
|
|||
"""
|
||||
key = self._retrieve_key(location)
|
||||
|
||||
key.BufferSize = self.CHUNKSIZE
|
||||
key.BufferSize = self.READ_CHUNKSIZE
|
||||
|
||||
class ChunkedIndexable(glance.store.Indexable):
|
||||
def another(self):
|
||||
return (self.wrapped.fp.read(ChunkedFile.CHUNKSIZE)
|
||||
return (self.wrapped.fp.read(Store.READ_CHUNKSIZE)
|
||||
if self.wrapped.fp else None)
|
||||
|
||||
return (ChunkedIndexable(ChunkedFile(key), key.size), key.size)
|
||||
|
@ -504,7 +505,7 @@ class Store(glance.store.base.Store):
|
|||
tmpdir = self.s3_store_object_buffer_dir
|
||||
temp_file = tempfile.NamedTemporaryFile(dir=tmpdir)
|
||||
checksum = hashlib.md5()
|
||||
for chunk in utils.chunkreadable(image_file, self.CHUNKSIZE):
|
||||
for chunk in utils.chunkreadable(image_file, self.WRITE_CHUNKSIZE):
|
||||
checksum.update(chunk)
|
||||
temp_file.write(chunk)
|
||||
temp_file.flush()
|
||||
|
|
|
@ -191,7 +191,8 @@ class Store(glance.store.base.Store):
|
|||
"""
|
||||
|
||||
try:
|
||||
self.chunk_size = CONF.sheepdog_store_chunk_size * units.Mi
|
||||
self.READ_CHUNKSIZE = CONF.sheepdog_store_chunk_size * units.Mi
|
||||
self.WRITE_CHUNKSIZE = self.READ_CHUNKSIZE
|
||||
self.addr = CONF.sheepdog_store_address.strip()
|
||||
self.port = CONF.sheepdog_store_port
|
||||
except cfg.ConfigFileValueError as e:
|
||||
|
@ -231,7 +232,7 @@ class Store(glance.store.base.Store):
|
|||
|
||||
loc = location.store_location
|
||||
image = SheepdogImage(self.addr, self.port, loc.image,
|
||||
self.chunk_size)
|
||||
self.READ_CHUNKSIZE)
|
||||
if not image.exist():
|
||||
raise exception.NotFound(_("Sheepdog image %s does not exist")
|
||||
% image.name)
|
||||
|
@ -250,7 +251,7 @@ class Store(glance.store.base.Store):
|
|||
|
||||
loc = location.store_location
|
||||
image = SheepdogImage(self.addr, self.port, loc.image,
|
||||
self.chunk_size)
|
||||
self.READ_CHUNKSIZE)
|
||||
if not image.exist():
|
||||
raise exception.NotFound(_("Sheepdog image %s does not exist")
|
||||
% image.name)
|
||||
|
@ -272,7 +273,7 @@ class Store(glance.store.base.Store):
|
|||
"""
|
||||
|
||||
image = SheepdogImage(self.addr, self.port, image_id,
|
||||
self.chunk_size)
|
||||
self.WRITE_CHUNKSIZE)
|
||||
if image.exist():
|
||||
raise exception.Duplicate(_("Sheepdog image %s already exists")
|
||||
% image_id)
|
||||
|
@ -285,7 +286,7 @@ class Store(glance.store.base.Store):
|
|||
try:
|
||||
total = left = image_size
|
||||
while left > 0:
|
||||
length = min(self.chunk_size, left)
|
||||
length = min(self.WRITE_CHUNKSIZE, left)
|
||||
data = image_file.read(length)
|
||||
image.write(data, total - left, length)
|
||||
left -= length
|
||||
|
@ -311,7 +312,7 @@ class Store(glance.store.base.Store):
|
|||
|
||||
loc = location.store_location
|
||||
image = SheepdogImage(self.addr, self.port, loc.image,
|
||||
self.chunk_size)
|
||||
self.WRITE_CHUNKSIZE)
|
||||
if not image.exist():
|
||||
raise exception.NotFound(_("Sheepdog image %s does not exist") %
|
||||
loc.image)
|
||||
|
|
|
@ -31,6 +31,7 @@ from glance.common import swift_store_utils
|
|||
from glance import i18n
|
||||
from glance.openstack.common import excutils
|
||||
import glance.openstack.common.log as logging
|
||||
from glance.openstack.common import units
|
||||
import glance.store
|
||||
import glance.store.base
|
||||
import glance.store.location
|
||||
|
@ -350,7 +351,7 @@ def Store(context=None, loc=None, configure=True):
|
|||
|
||||
|
||||
class BaseStore(glance.store.base.Store):
|
||||
CHUNKSIZE = 65536
|
||||
READ_CHUNKSIZE = 64 * units.Ki
|
||||
|
||||
def get_schemes(self):
|
||||
return ('swift+https', 'swift', 'swift+http', 'swift+config')
|
||||
|
@ -379,7 +380,7 @@ class BaseStore(glance.store.base.Store):
|
|||
try:
|
||||
resp_headers, resp_body = connection.get_object(
|
||||
container=location.container, obj=location.obj,
|
||||
resp_chunk_size=self.CHUNKSIZE, headers=headers)
|
||||
resp_chunk_size=self.READ_CHUNKSIZE, headers=headers)
|
||||
except swiftclient.ClientException as e:
|
||||
if e.http_status == httplib.NOT_FOUND:
|
||||
msg = _("Swift could not find object %s.") % location.obj
|
||||
|
|
|
@ -30,6 +30,7 @@ from glance.common import exception
|
|||
from glance.openstack.common import excutils
|
||||
from glance.openstack.common import gettextutils
|
||||
import glance.openstack.common.log as logging
|
||||
from glance.openstack.common import units
|
||||
import glance.store
|
||||
import glance.store.base
|
||||
import glance.store.location
|
||||
|
@ -221,6 +222,8 @@ class StoreLocation(glance.store.location.StoreLocation):
|
|||
class Store(glance.store.base.Store):
|
||||
"""An implementation of the VMware datastore adapter."""
|
||||
|
||||
WRITE_CHUNKSIZE = units.Mi
|
||||
|
||||
def get_schemes(self):
|
||||
return (STORE_SCHEME,)
|
||||
|
||||
|
@ -366,7 +369,7 @@ class Store(glance.store.base.Store):
|
|||
from glance.store.location.get_location_from_uri()
|
||||
"""
|
||||
conn, resp, content_length = self._query(location, 'GET')
|
||||
iterator = http_response_iterator(conn, resp, self.CHUNKSIZE)
|
||||
iterator = http_response_iterator(conn, resp, self.READ_CHUNKSIZE)
|
||||
|
||||
class ResponseIndexable(glance.store.Indexable):
|
||||
|
||||
|
|
|
@ -30,7 +30,6 @@ import six.moves.builtins as __builtin__
|
|||
from glance.common import exception
|
||||
from glance.openstack.common import units
|
||||
|
||||
from glance.store.filesystem import ChunkedFile
|
||||
from glance.store.filesystem import Store
|
||||
from glance.store.location import get_location_from_uri
|
||||
from glance.tests.unit import base
|
||||
|
@ -43,14 +42,16 @@ class TestStore(base.IsolatedUnitTest):
|
|||
def setUp(self):
|
||||
"""Establish a clean test environment"""
|
||||
super(TestStore, self).setUp()
|
||||
self.orig_chunksize = ChunkedFile.CHUNKSIZE
|
||||
ChunkedFile.CHUNKSIZE = 10
|
||||
self.orig_read_chunksize = Store.READ_CHUNKSIZE
|
||||
self.orig_write_chunksize = Store.WRITE_CHUNKSIZE
|
||||
Store.READ_CHUNKSIZE = Store.WRITE_CHUNKSIZE = 10
|
||||
self.store = Store()
|
||||
|
||||
def tearDown(self):
|
||||
"""Clear the test environment"""
|
||||
super(TestStore, self).tearDown()
|
||||
ChunkedFile.CHUNKSIZE = self.orig_chunksize
|
||||
Store.READ_CHUNKSIZE = self.orig_read_chunksize
|
||||
Store.WRITE_CHUNKSIZE = self.orig_write_chunksize
|
||||
|
||||
def test_configure_add_single_datadir(self):
|
||||
"""
|
||||
|
@ -191,7 +192,7 @@ class TestStore(base.IsolatedUnitTest):
|
|||
|
||||
def test_add(self):
|
||||
"""Test that we can add an image via the filesystem backend"""
|
||||
ChunkedFile.CHUNKSIZE = 1024
|
||||
Store.WRITE_CHUNKSIZE = 1024
|
||||
expected_image_id = str(uuid.uuid4())
|
||||
expected_file_size = 5 * units.Ki # 5K
|
||||
expected_file_contents = "*" * expected_file_size
|
||||
|
@ -232,7 +233,7 @@ class TestStore(base.IsolatedUnitTest):
|
|||
self.store.configure_add()
|
||||
|
||||
"""Test that we can add an image via the filesystem backend"""
|
||||
ChunkedFile.CHUNKSIZE = 1024
|
||||
Store.WRITE_CHUNKSIZE = 1024
|
||||
expected_image_id = str(uuid.uuid4())
|
||||
expected_file_size = 5 * units.Ki # 5K
|
||||
expected_file_contents = "*" * expected_file_size
|
||||
|
@ -279,7 +280,7 @@ class TestStore(base.IsolatedUnitTest):
|
|||
|
||||
self.stubs.Set(self.store, '_get_capacity_info',
|
||||
fake_get_capacity_info)
|
||||
ChunkedFile.CHUNKSIZE = 1024
|
||||
Store.WRITE_CHUNKSIZE = 1024
|
||||
expected_image_id = str(uuid.uuid4())
|
||||
expected_file_size = 5 * units.Ki # 5K
|
||||
expected_file_contents = "*" * expected_file_size
|
||||
|
@ -347,7 +348,7 @@ class TestStore(base.IsolatedUnitTest):
|
|||
Tests that adding an image with an existing identifier
|
||||
raises an appropriate exception
|
||||
"""
|
||||
ChunkedFile.CHUNKSIZE = 1024
|
||||
Store.WRITE_CHUNKSIZE = 1024
|
||||
image_id = str(uuid.uuid4())
|
||||
file_size = 5 * units.Ki # 5K
|
||||
file_contents = "*" * file_size
|
||||
|
@ -362,7 +363,7 @@ class TestStore(base.IsolatedUnitTest):
|
|||
image_id, image_file, 0)
|
||||
|
||||
def _do_test_add_write_failure(self, errno, exception):
|
||||
ChunkedFile.CHUNKSIZE = 1024
|
||||
Store.WRITE_CHUNKSIZE = 1024
|
||||
image_id = str(uuid.uuid4())
|
||||
file_size = 5 * units.Ki # 5K
|
||||
file_contents = "*" * file_size
|
||||
|
@ -419,7 +420,7 @@ class TestStore(base.IsolatedUnitTest):
|
|||
Tests the partial image file is cleaned up after a read
|
||||
failure.
|
||||
"""
|
||||
ChunkedFile.CHUNKSIZE = 1024
|
||||
Store.WRITE_CHUNKSIZE = 1024
|
||||
image_id = str(uuid.uuid4())
|
||||
file_size = 5 * units.Ki # 5K
|
||||
file_contents = "*" * file_size
|
||||
|
|
|
@ -93,7 +93,7 @@ class TestHttpStore(base.StoreClearingUnitTest):
|
|||
super(TestHttpStore, self).setUp()
|
||||
self.stubs = stubout.StubOutForTesting()
|
||||
stub_out_http_backend(self.stubs)
|
||||
Store.CHUNKSIZE = 2
|
||||
Store.READ_CHUNKSIZE = 2
|
||||
self.store = Store()
|
||||
configure_registry_client()
|
||||
|
||||
|
|
|
@ -99,7 +99,7 @@ def stub_out_swiftclient(stubs, swift_store_auth_version):
|
|||
# Large object manifest...
|
||||
global SWIFT_PUT_OBJECT_CALLS
|
||||
SWIFT_PUT_OBJECT_CALLS += 1
|
||||
CHUNKSIZE = 64 * units.Ki
|
||||
CHUNKSIZE = swift.BaseStore.READ_CHUNKSIZE
|
||||
fixture_key = "%s/%s" % (container, name)
|
||||
if fixture_key not in fixture_headers:
|
||||
if kwargs.get('headers'):
|
||||
|
|
|
@ -92,7 +92,7 @@ class TestStore(base.StoreClearingUnitTest):
|
|||
|
||||
super(TestStore, self).setUp()
|
||||
|
||||
Store.CHUNKSIZE = 2
|
||||
Store.READ_CHUNKSIZE = 2
|
||||
self.store = Store()
|
||||
|
||||
class FakeSession:
|
||||
|
|
Loading…
Reference in New Issue