diff --git a/packetary/library/connections.py b/packetary/library/connections.py
index 97e6c6c..f10db69 100644
--- a/packetary/library/connections.py
+++ b/packetary/library/connections.py
@@ -104,18 +104,24 @@ class RetryHandler(urllib.BaseHandler):
 
     def http_error(self, req, fp, code, msg, hdrs):
         """Checks error code and retries request if it is allowed."""
-        if code >= 500 and req.retries_left > 0:
+        logger.error(
+            "fail request: %s - %d(%s), retries left - %d.",
+            req.get_full_url(), code, msg, req.retries_left
+        )
+        if req.retries_left > 0 and is_retryable_http_error(code):
             req.retries_left -= 1
-            logger.warning(
-                "fail request: %s - %d(%s), retries left - %d.",
-                req.get_full_url(), code, msg, req.retries_left
-            )
             return self.parent.open(req)
 
     https_request = http_request
     https_response = http_response
 
 
+def is_retryable_http_error(code):
+    """Checks that http error can be retried."""
+    return code == http_client.NOT_FOUND or \
+        code >= http_client.INTERNAL_SERVER_ERROR
+
+
 class ConnectionsManager(object):
     """The connections manager."""
 
@@ -180,19 +186,34 @@ class ConnectionsManager(object):
                     url, six.text_type(e), request.retries_left
                 )
 
-    def retrieve(self, url, filename, offset=0):
+    def retrieve(self, url, filename, **attributes):
         """Downloads remote file.
 
         :param url: the remote file`s url
-        :param filename: the file`s name, that includes path on local fs
-        :param offset: the number of bytes from the beginning,
-                       that will be skipped
+        :param filename: the target filename on local filesystem
+        :param attributes: the file attributes, like size, hashsum, etc.
+        :return: the count of actually copied bytes
         """
+        offset = 0
+        try:
+            stats = os.stat(filename)
+            expected_size = attributes.get('size', -1)
+            if expected_size == stats.st_size:
+                # TODO(check hashsum)
+                return 0
+
+            if stats.st_size < expected_size:
+                offset = stats.st_size
+        except OSError as e:
+            if e.errno != errno.ENOENT:
+                raise
+            self._ensure_dir_exists(filename)
+
+        logger.info("download: %s from the offset: %d", url, offset)
 
-        self._ensure_dir_exists(filename)
         fd = os.open(filename, os.O_CREAT | os.O_WRONLY)
         try:
-            self._copy_stream(fd, url, offset)
+            return self._copy_stream(fd, url, offset)
         except RangeError:
             if offset == 0:
                 raise
@@ -200,7 +221,7 @@ class ConnectionsManager(object):
                 "Failed to resume download, starts from the beginning: %s",
                 url
             )
-            self._copy_stream(fd, url, 0)
+            return self._copy_stream(fd, url, 0)
         finally:
             os.fsync(fd)
             os.close(fd)
@@ -222,14 +243,18 @@ class ConnectionsManager(object):
         :param url: the remote file`s url
         :param offset: the number of bytes from the beginning,
                        that will be skipped
+        :return: the count of actually copied bytes
         """
 
         source = self.open_stream(url, offset)
         os.ftruncate(fd, offset)
         os.lseek(fd, offset, os.SEEK_SET)
         chunk_size = 16 * 1024
+        size = 0
         while 1:
             chunk = source.read(chunk_size)
             if not chunk:
                 break
             os.write(fd, chunk)
+            size += len(chunk)
+        return size
diff --git a/packetary/library/executor.py b/packetary/library/executor.py
index 7e708d1..d9d0260 100644
--- a/packetary/library/executor.py
+++ b/packetary/library/executor.py
@@ -17,6 +17,8 @@
 from __future__ import with_statement
 
 import logging
+import sys
+
 import six
 
 from eventlet.greenpool import GreenPool
@@ -40,11 +42,11 @@ class AsynchronousSection(object):
 
         self.executor = GreenPool(max(size, self.MIN_POOL_SIZE))
         self.ignore_errors_num = ignore_errors_num
-        self.errors = 0
+        self.errors = []
         self.tasks = set()
 
     def __enter__(self):
-        self.errors = 0
+        self.errors[:] = []
         return self
 
     def __exit__(self, etype, *_):
@@ -52,12 +54,13 @@ class AsynchronousSection(object):
 
     def execute(self, func, *args, **kwargs):
         """Calls function asynchronously."""
-        if 0 <= self.ignore_errors_num < self.errors:
+        if 0 <= self.ignore_errors_num < len(self.errors):
             raise RuntimeError("Too many errors.")
 
         gt = self.executor.spawn(func, *args, **kwargs)
         self.tasks.add(gt)
         gt.link(self.on_complete)
+        return gt
 
     def on_complete(self, gt):
         """Callback to handle task completion."""
@@ -65,8 +68,8 @@ class AsynchronousSection(object):
         try:
             gt.wait()
         except Exception as e:
-            self.errors += 1
-            logger.exception("Task failed: %s", six.text_type(e))
+            logger.error("Task failed: %s", six.text_type(e))
+            self.errors.append(sys.exc_info())
         finally:
             self.tasks.discard(gt)
 
@@ -76,7 +79,13 @@ class AsynchronousSection(object):
         Do not use directly, will be called from context manager.
         """
         self.executor.waitall()
-        if not ignore_errors and self.errors > 0:
-            raise RuntimeError(
-                "Operations completed with errors. See log for more details."
-            )
+        if len(self.errors) > 0:
+            for exc_info in self.errors:
+                logger.exception("error details.", exc_info=exc_info)
+
+            self.errors[:] = []
+            if not ignore_errors:
+                raise RuntimeError(
+                    "Operations completed with errors.\n"
+                    "See log for more details."
+                )
diff --git a/packetary/library/streams.py b/packetary/library/streams.py
index 558a27b..9ab6cf8 100644
--- a/packetary/library/streams.py
+++ b/packetary/library/streams.py
@@ -66,7 +66,7 @@ class StreamWrapper(object):
             result = self._align_chunk(result, size)
         size -= len(result)
         while size > 0:
-            chunk = self.read_chunk(max(self.CHUNK_SIZE, size))
+            chunk = self.read_chunk(self.CHUNK_SIZE)
             if not chunk:
                 break
             if len(chunk) > size:
@@ -115,11 +115,17 @@ class GzipDecompress(StreamWrapper):
 
     def read_chunk(self, chunksize):
         if self.decompress.unconsumed_tail:
-            return self.decompress.decompress(
+            uncompressed = self.decompress.decompress(
                 self.decompress.unconsumed_tail, chunksize
             )
+            if uncompressed:
+                return uncompressed
 
-        chunk = self.stream.read(chunksize)
-        if not chunk:
-            return self.decompress.flush()
-        return self.decompress.decompress(chunk, chunksize)
+        while True:
+            chunk = self.stream.read(chunksize)
+            if not chunk:
+                break
+            uncompressed = self.decompress.decompress(chunk, chunksize)
+            if uncompressed:
+                return uncompressed
+        return self.decompress.flush()
diff --git a/packetary/tests/stubs/generator.py b/packetary/tests/stubs/generator.py
index 6253313..2f9f65c 100644
--- a/packetary/tests/stubs/generator.py
+++ b/packetary/tests/stubs/generator.py
@@ -23,28 +23,28 @@ def gen_repository(name="test", url="file:///test",
     return objects.Repository(name, url, architecture, origin)
 
 
-def gen_relation(name="test", version=None):
+def gen_relation(name="test", version=None, alternative=None):
     """Helper to create PackageRelation object with default attributes."""
-    return [
-        objects.PackageRelation(
-            name=name, version=objects.VersionRange(*(version or []))
-        )
-    ]
+    return objects.PackageRelation(
+        name=name,
+        version=objects.VersionRange(*(version or [])),
+        alternative=alternative
+    )
 
 
 def gen_package(idx=1, **kwargs):
     """Helper to create Package object with default attributes."""
     repository = gen_repository()
-    kwargs.setdefault("name", "package{0}".format(idx))
+    name = kwargs.setdefault("name", "package{0}".format(idx))
     kwargs.setdefault("repository", repository)
     kwargs.setdefault("version", 1)
     kwargs.setdefault("checksum", objects.FileChecksum("1", "2", "3"))
-    kwargs.setdefault("filename", "test.pkg")
+    kwargs.setdefault("filename", "{0}.pkg".format(name))
     kwargs.setdefault("filesize", 1)
     for relation in ("requires", "provides", "obsoletes"):
         if relation not in kwargs:
-            kwargs[relation] = gen_relation(
+            kwargs[relation] = [gen_relation(
                 "{0}{1}".format(relation, idx), ["le", idx + 1]
-            )
+            )]
 
     return objects.Package(**kwargs)
diff --git a/packetary/tests/test_connections.py b/packetary/tests/test_connections.py
index fc19b79..8afcd45 100644
--- a/packetary/tests/test_connections.py
+++ b/packetary/tests/test_connections.py
@@ -112,29 +112,48 @@ class TestConnectionManager(base.TestCase):
     def test_retrieve_from_offset(self, os, *_):
         manager = connections.ConnectionsManager()
         os.path.mkdirs.side_effect = OSError(17, "")
+        os.stat.return_value = mock.MagicMock(st_size=10)
         os.open.return_value = 1
         response = mock.MagicMock()
         manager.opener.open.return_value = response
         response.read.side_effect = [b"test", b""]
-        manager.retrieve("/file/src", "/file/dst", 10)
+        manager.retrieve("/file/src", "/file/dst", size=20)
         os.lseek.assert_called_once_with(1, 10, os.SEEK_SET)
         os.ftruncate.assert_called_once_with(1, 10)
         self.assertEqual(1, os.write.call_count)
         os.fsync.assert_called_once_with(1)
         os.close.assert_called_once_with(1)
 
+    @mock.patch("packetary.library.connections.urllib.build_opener")
+    @mock.patch("packetary.library.connections.os")
+    def test_retrieve_non_existence(self, os, *_):
+        manager = connections.ConnectionsManager()
+        os.path.mkdirs.side_effect = OSError(17, "")
+        os.stat.side_effect = OSError(2, "")
+        os.open.return_value = 1
+        response = mock.MagicMock()
+        manager.opener.open.return_value = response
+        response.read.side_effect = [b"test", b""]
+        manager.retrieve("/file/src", "/file/dst", size=20)
+        os.lseek.assert_called_once_with(1, 0, os.SEEK_SET)
+        os.ftruncate.assert_called_once_with(1, 0)
+        self.assertEqual(1, os.write.call_count)
+        os.fsync.assert_called_once_with(1)
+        os.close.assert_called_once_with(1)
+
     @mock.patch("packetary.library.connections.urllib.build_opener")
     @mock.patch("packetary.library.connections.os")
     def test_retrieve_from_offset_fail(self, os, _, logger):
         manager = connections.ConnectionsManager(retries_num=2)
         os.path.mkdirs.side_effect = OSError(connections.errno.EACCES, "")
+        os.stat.return_value = mock.MagicMock(st_size=10)
         os.open.return_value = 1
         response = mock.MagicMock()
         manager.opener.open.side_effect = [
             connections.RangeError("error"), response
         ]
         response.read.side_effect = [b"test", b""]
-        manager.retrieve("/file/src", "/file/dst", 10)
+        manager.retrieve("/file/src", "/file/dst", size=20)
         logger.warning.assert_called_once_with(
             "Failed to resume download, starts from the beginning: %s",
             "/file/src"
@@ -198,12 +217,12 @@ class TestRetryHandler(base.TestCase):
         self.handler.http_error(
             request, mock.MagicMock(), 500, "error", mock.MagicMock()
         )
-        logger.warning.assert_called_with(
+        logger.error.assert_called_with(
             "fail request: %s - %d(%s), retries left - %d.",
-            "/test", 500, "error", 0
+            "/test", 500, "error", 1
         )
         self.handler.http_error(
-            request, mock.MagicMock(), 500, "error", mock.MagicMock()
+            request, mock.MagicMock(), 404, "error", mock.MagicMock()
         )
         self.handler.parent.open.assert_called_once_with(request)
 
diff --git a/packetary/tests/test_executor.py b/packetary/tests/test_executor.py
index 59d036e..ed0b13e 100644
--- a/packetary/tests/test_executor.py
+++ b/packetary/tests/test_executor.py
@@ -47,15 +47,14 @@ class TestAsynchronousSection(base.TestCase):
         section.execute(_raise_value_error)
         section.execute(time.sleep, 0)
         section.wait(ignore_errors=True)
-        self.assertEqual(1, section.errors)
         logger.exception.assert_called_with(
-            "Task failed: %s", "error"
+            "error details.", exc_info=mock.ANY
         )
 
     def test_fail_if_too_many_errors(self, _):
-        section = executor.AsynchronousSection(ignore_errors_num=0)
+        section = executor.AsynchronousSection(size=1, ignore_errors_num=0)
         section.execute(_raise_value_error)
-        section.wait(ignore_errors=True)
+        time.sleep(0)  # switch context
         with self.assertRaisesRegexp(RuntimeError, "Too many errors"):
             section.execute(time.sleep, 0)
 
diff --git a/packetary/tests/test_index.py b/packetary/tests/test_index.py
index 4278679..e5baa18 100644
--- a/packetary/tests/test_index.py
+++ b/packetary/tests/test_index.py
@@ -83,7 +83,7 @@ class TestIndex(base.TestCase):
         index = Index()
         p1 = gen_package(idx=1, version=2)
         p2 = gen_package(idx=2, version=2)
-        p2.obsoletes.extend(
+        p2.obsoletes.append(
             gen_relation(p1.name, ["lt", p1.version])
         )
         index.add(p1)
diff --git a/packetary/tests/test_objects.py b/packetary/tests/test_objects.py
index a582346..fa6a45b 100644
--- a/packetary/tests/test_objects.py
+++ b/packetary/tests/test_objects.py
@@ -105,8 +105,8 @@ class TestRelationObject(TestObjectBase):
 
     def test_hashable(self):
         self.check_hashable(
-            generator.gen_relation(name="test1")[0],
-            generator.gen_relation(name="test1", version=["le", 1])[0]
+            generator.gen_relation(name="test1"),
+            generator.gen_relation(name="test1", version=["le", 1])
         )
 
     def test_from_args(self):
@@ -135,9 +135,9 @@ class TestRelationObject(TestObjectBase):
 class TestVersionRange(TestObjectBase):
     def test_equal(self):
         self.check_equal(
-            generator.gen_relation(name="test1"),
-            generator.gen_relation(name="test1"),
-            generator.gen_relation(name="test2")
+            VersionRange("eq", 1),
+            VersionRange("eq", 1),
+            VersionRange("le", 1)
         )
 
     def test_hashable(self):
diff --git a/packetary/tests/test_streams.py b/packetary/tests/test_streams.py
index 77fc956..0678d61 100644
--- a/packetary/tests/test_streams.py
+++ b/packetary/tests/test_streams.py
@@ -91,3 +91,11 @@ class TestGzipDecompress(base.TestCase):
         self.assertEqual(
             [b"line1\n", b"line2\n", b"line3\n"],
             lines)
+
+    def test_handle_case_if_not_enough_data_to_decompress(self):
+        self.stream.CHUNK_SIZE = 1
+        chunk = self.stream.read()
+        self.assertEqual(
+            b"line1\nline2\nline3\n",
+            chunk
+        )