[packetary] Infrastructure improvements.

* Added retry for 404 (HTTP Status).
* Added method for downloading the file to the ConnectionManager.
* AsynchronousSection collects all exceptions with tracebacks and
     logs them after all tasks have been completed.
* generator.gen_relation returns object instead of list
* GzipDecompress: fixed the case when there is not enough data to decompress the next chunk

Change-Id: Ie2014aeee0c4776ed411ef6f28a996ef2d029c95
This commit is contained in:
Bulat Gaifullin 2015-11-03 20:33:22 +03:00
parent 84d5e97218
commit 60ba0e6e62
9 changed files with 118 additions and 52 deletions

View File

@ -104,18 +104,24 @@ class RetryHandler(urllib.BaseHandler):
def http_error(self, req, fp, code, msg, hdrs):
    """Checks the error code and retries the request if it is allowed.

    :param req: the failed request (carries ``retries_left``)
    :param fp: the response file-like object
    :param code: the HTTP status code of the failure
    :param msg: the HTTP status message
    :param hdrs: the response headers
    :return: the result of re-opening the request, or None when the
             error is not retryable or no retries are left
    """
    if req.retries_left > 0 and is_retryable_http_error(code):
        # Consume one retry before re-issuing the request.
        req.retries_left -= 1
        logger.warning(
            "fail request: %s - %d(%s), retries left - %d.",
            req.get_full_url(), code, msg, req.retries_left
        )
        return self.parent.open(req)
https_request = http_request
https_response = http_response
def is_retryable_http_error(code):
    """Tell whether a request that failed with this HTTP code may be retried.

    Retryable codes are 404 (the resource may appear shortly, e.g. a
    mirror that is still syncing) and any server-side error (>= 500).
    """
    if code == http_client.NOT_FOUND:
        return True
    return code >= http_client.INTERNAL_SERVER_ERROR
class ConnectionsManager(object):
"""The connections manager."""
@ -180,19 +186,34 @@ class ConnectionsManager(object):
url, six.text_type(e), request.retries_left
)
def retrieve(self, url, filename, **attributes):
    """Downloads remote file.

    :param url: the remote file`s url
    :param filename: the target filename on local filesystem
    :param attributes: the file attributes, like size, hashsum, etc.
    :return: the count of actually copied bytes
    """
    offset = 0
    try:
        stats = os.stat(filename)
        expected_size = attributes.get('size', -1)
        if expected_size == stats.st_size:
            # The file is already fully downloaded.
            # TODO(check hashsum)
            return 0
        if stats.st_size < expected_size:
            # Resume a partial download from where it stopped.
            offset = stats.st_size
    except OSError as e:
        # Only "file does not exist" is expected here; anything else
        # (e.g. permission error) must propagate.
        if e.errno != errno.ENOENT:
            raise
    logger.info("download: %s from the offset: %d", url, offset)
    self._ensure_dir_exists(filename)
    fd = os.open(filename, os.O_CREAT | os.O_WRONLY)
    try:
        return self._copy_stream(fd, url, offset)
    except RangeError:
        if offset == 0:
            raise
        logger.warning(
            "Failed to resume download, starts from the beginning: %s",
            url
        )
        return self._copy_stream(fd, url, 0)
    finally:
        os.fsync(fd)
        os.close(fd)
def _copy_stream(self, fd, url, offset):
    """Copies the remote file`s content to the local file.

    :param fd: the local file`s descriptor
    :param url: the remote file`s url
    :param offset: the number of bytes from the beginning,
                   that will be skipped
    :return: the count of actually copied bytes
    """
    source = self.open_stream(url, offset)
    # Drop any stale tail beyond the resume point, then position the
    # descriptor so writes continue from the offset.
    os.ftruncate(fd, offset)
    os.lseek(fd, offset, os.SEEK_SET)
    chunk_size = 16 * 1024
    size = 0
    while 1:
        chunk = source.read(chunk_size)
        if not chunk:
            break
        os.write(fd, chunk)
        size += len(chunk)
    return size

View File

@ -17,6 +17,8 @@
from __future__ import with_statement
import logging
import sys
import six
from eventlet.greenpool import GreenPool
@ -40,11 +42,11 @@ class AsynchronousSection(object):
self.executor = GreenPool(max(size, self.MIN_POOL_SIZE))
self.ignore_errors_num = ignore_errors_num
self.errors = 0
self.errors = []
self.tasks = set()
def __enter__(self):
    """Resets collected errors in place and enters the section."""
    # In-place clear so any external references to the list stay valid.
    self.errors[:] = []
    return self
def __exit__(self, etype, *_):
@ -52,12 +54,13 @@ class AsynchronousSection(object):
def execute(self, func, *args, **kwargs):
    """Calls function asynchronously.

    :param func: the callable to run in the pool
    :return: the spawned green thread
    :raises RuntimeError: when the number of collected errors already
        exceeds the allowed ``ignore_errors_num``
    """
    if 0 <= self.ignore_errors_num < len(self.errors):
        raise RuntimeError("Too many errors.")
    gt = self.executor.spawn(func, *args, **kwargs)
    self.tasks.add(gt)
    gt.link(self.on_complete)
    return gt
def on_complete(self, gt):
    """Callback to handle task completion.

    Collects the full exc_info of a failed task so the traceback can
    be logged later by wait(); always removes the task from tracking.

    :param gt: the finished green thread
    """
    try:
        gt.wait()
    except Exception as e:
        logger.error("Task failed: %s", six.text_type(e))
        self.errors.append(sys.exc_info())
    finally:
        self.tasks.discard(gt)
def wait(self, ignore_errors=False):
    """Waits until all tasks have been completed.

    Do not use directly, will be called from context manager.

    :param ignore_errors: when True, errors are logged but not raised
    :raises RuntimeError: when errors occurred and ``ignore_errors``
        is False
    """
    self.executor.waitall()
    if len(self.errors) > 0:
        # Log every collected traceback, then clear in place so the
        # section can be reused.
        for exc_info in self.errors:
            logger.exception("error details.", exc_info=exc_info)
        self.errors[:] = []
        if not ignore_errors:
            raise RuntimeError(
                "Operations completed with errors.\n"
                "See log for more details."
            )

View File

@ -66,7 +66,7 @@ class StreamWrapper(object):
result = self._align_chunk(result, size)
size -= len(result)
while size > 0:
chunk = self.read_chunk(max(self.CHUNK_SIZE, size))
chunk = self.read_chunk(self.CHUNK_SIZE)
if not chunk:
break
if len(chunk) > size:
@ -115,11 +115,17 @@ class GzipDecompress(StreamWrapper):
def read_chunk(self, chunksize):
    """Reads and decompresses the next chunk of data.

    Keeps reading from the underlying stream until the decompressor
    produces output, because a single compressed chunk may not contain
    enough data to decompress anything.

    :param chunksize: the maximum number of bytes to read/produce
    :return: the decompressed bytes (empty at end of stream)
    """
    if self.decompress.unconsumed_tail:
        uncompressed = self.decompress.decompress(
            self.decompress.unconsumed_tail, chunksize
        )
        if uncompressed:
            return uncompressed
    while True:
        chunk = self.stream.read(chunksize)
        if not chunk:
            break
        uncompressed = self.decompress.decompress(chunk, chunksize)
        if uncompressed:
            return uncompressed
    return self.decompress.flush()

View File

@ -23,28 +23,28 @@ def gen_repository(name="test", url="file:///test",
return objects.Repository(name, url, architecture, origin)
def gen_relation(name="test", version=None, alternative=None):
    """Helper to create PackageRelation object with default attributes.

    :param name: the relation name
    :param version: optional version-range arguments, e.g. ["le", 1]
    :param alternative: optional alternative relation
    :return: a single PackageRelation object (not a list)
    """
    return objects.PackageRelation(
        name=name,
        version=objects.VersionRange(*(version or [])),
        alternative=alternative
    )
def gen_package(idx=1, **kwargs):
    """Helper to create Package object with default attributes.

    :param idx: the index used to derive default name/version values
    :param kwargs: explicit attribute overrides
    :return: a Package object
    """
    repository = gen_repository()
    name = kwargs.setdefault("name", "package{0}".format(idx))
    kwargs.setdefault("repository", repository)
    kwargs.setdefault("version", 1)
    kwargs.setdefault("checksum", objects.FileChecksum("1", "2", "3"))
    # Derive the filename from the package name by default.
    kwargs.setdefault("filename", "{0}.pkg".format(name))
    kwargs.setdefault("filesize", 1)
    for relation in ("requires", "provides", "obsoletes"):
        if relation not in kwargs:
            # gen_relation now returns a single object; wrap in a list.
            kwargs[relation] = [gen_relation(
                "{0}{1}".format(relation, idx), ["le", idx + 1]
            )]
    return objects.Package(**kwargs)

View File

@ -112,29 +112,48 @@ class TestConnectionManager(base.TestCase):
def test_retrieve_from_offset(self, os, *_):
    """Resumes download from the existing partial file's size."""
    manager = connections.ConnectionsManager()
    os.path.mkdirs.side_effect = OSError(17, "")
    # Local file already has 10 of the expected 20 bytes.
    os.stat.return_value = mock.MagicMock(st_size=10)
    os.open.return_value = 1
    response = mock.MagicMock()
    manager.opener.open.return_value = response
    response.read.side_effect = [b"test", b""]
    manager.retrieve("/file/src", "/file/dst", size=20)
    os.lseek.assert_called_once_with(1, 10, os.SEEK_SET)
    os.ftruncate.assert_called_once_with(1, 10)
    self.assertEqual(1, os.write.call_count)
    os.fsync.assert_called_once_with(1)
    os.close.assert_called_once_with(1)
@mock.patch("packetary.library.connections.urllib.build_opener")
@mock.patch("packetary.library.connections.os")
def test_retrieve_non_existence(self, os, *_):
    """Download starts from offset 0 when the target file is missing."""
    manager = connections.ConnectionsManager()
    os.path.mkdirs.side_effect = OSError(17, "")
    # ENOENT: the local file does not exist yet.
    os.stat.side_effect = OSError(2, "")
    os.open.return_value = 1
    response = mock.MagicMock()
    manager.opener.open.return_value = response
    response.read.side_effect = [b"test", b""]
    manager.retrieve("/file/src", "/file/dst", size=20)
    os.lseek.assert_called_once_with(1, 0, os.SEEK_SET)
    os.ftruncate.assert_called_once_with(1, 0)
    self.assertEqual(1, os.write.call_count)
    os.fsync.assert_called_once_with(1)
    os.close.assert_called_once_with(1)
@mock.patch("packetary.library.connections.urllib.build_opener")
@mock.patch("packetary.library.connections.os")
def test_retrieve_from_offset_fail(self, os, _, logger):
manager = connections.ConnectionsManager(retries_num=2)
os.path.mkdirs.side_effect = OSError(connections.errno.EACCES, "")
os.stat.return_value = mock.MagicMock(st_size=10)
os.open.return_value = 1
response = mock.MagicMock()
manager.opener.open.side_effect = [
connections.RangeError("error"), response
]
response.read.side_effect = [b"test", b""]
manager.retrieve("/file/src", "/file/dst", 10)
manager.retrieve("/file/src", "/file/dst", size=20)
logger.warning.assert_called_once_with(
"Failed to resume download, starts from the beginning: %s",
"/file/src"
@ -198,12 +217,12 @@ class TestRetryHandler(base.TestCase):
self.handler.http_error(
request, mock.MagicMock(), 500, "error", mock.MagicMock()
)
logger.warning.assert_called_with(
logger.error.assert_called_with(
"fail request: %s - %d(%s), retries left - %d.",
"/test", 500, "error", 0
"/test", 500, "error", 1
)
self.handler.http_error(
request, mock.MagicMock(), 500, "error", mock.MagicMock()
request, mock.MagicMock(), 404, "error", mock.MagicMock()
)
self.handler.parent.open.assert_called_once_with(request)

View File

@ -47,15 +47,14 @@ class TestAsynchronousSection(base.TestCase):
section.execute(_raise_value_error)
section.execute(time.sleep, 0)
section.wait(ignore_errors=True)
self.assertEqual(1, section.errors)
logger.exception.assert_called_with(
"Task failed: %s", "error"
"error details.", exc_info=mock.ANY
)
def test_fail_if_too_many_errors(self, _):
    """execute() refuses new tasks once the error budget is spent."""
    section = executor.AsynchronousSection(size=1, ignore_errors_num=0)
    section.execute(_raise_value_error)
    time.sleep(0)  # switch context, let the failing task complete
    with self.assertRaisesRegexp(RuntimeError, "Too many errors"):
        section.execute(time.sleep, 0)

View File

@ -83,7 +83,7 @@ class TestIndex(base.TestCase):
index = Index()
p1 = gen_package(idx=1, version=2)
p2 = gen_package(idx=2, version=2)
p2.obsoletes.extend(
p2.obsoletes.append(
gen_relation(p1.name, ["lt", p1.version])
)
index.add(p1)

View File

@ -105,8 +105,8 @@ class TestRelationObject(TestObjectBase):
def test_hashable(self):
    """gen_relation now returns the object directly, no [0] indexing."""
    self.check_hashable(
        generator.gen_relation(name="test1"),
        generator.gen_relation(name="test1", version=["le", 1])
    )
def test_from_args(self):
@ -135,9 +135,9 @@ class TestRelationObject(TestObjectBase):
class TestVersionRange(TestObjectBase):
def test_equal(self):
    """VersionRange equality: same op/edge equal, different op not."""
    self.check_equal(
        VersionRange("eq", 1),
        VersionRange("eq", 1),
        VersionRange("le", 1)
    )
def test_hashable(self):

View File

@ -91,3 +91,11 @@ class TestGzipDecompress(base.TestCase):
self.assertEqual(
[b"line1\n", b"line2\n", b"line3\n"],
lines)
def test_handle_case_if_not_enough_data_to_decompress(self):
    """Decompression still succeeds when chunks are too small.

    Forces single-byte reads so the decompressor must keep consuming
    input until it can produce output.
    """
    self.stream.CHUNK_SIZE = 1
    chunk = self.stream.read()
    self.assertEqual(
        b"line1\nline2\nline3\n",
        chunk
    )