diff --git a/tacker/common/csar_utils.py b/tacker/common/csar_utils.py index cd10a477c..5af8e68f4 100644 --- a/tacker/common/csar_utils.py +++ b/tacker/common/csar_utils.py @@ -521,7 +521,7 @@ def _validate_hash(algorithm, hash_code, csar, artifact_path): def extract_csar_zip_file(file_path, extract_path): try: - with zipfile.ZipFile(file_path, 'r') as zf: + with PreserveZipFilePermissions(file_path, 'r') as zf: zf.extractall(extract_path) except (RuntimeError, zipfile.BadZipfile) as exp: with excutils.save_and_reraise_exception(): @@ -573,3 +573,25 @@ def delete_csar_data(package_uuid): msg = _('Failed to delete csar folder: ' '%(csar_path)s, Error: %(exc)s') LOG.warning(msg, {'csar_path': csar_path, 'exc': exc_message}) + + +class PreserveZipFilePermissions(zipfile.ZipFile): + """Patched _extract_member function of zipFile. + + zipfile.ZipFile.extractall function internally calls + _extract_member function. + Here _extract_member function is patched to retain the + file permissions using member.external_attr >> 16. + + Note: First 16 bits of external_attr store permission details. + """ + def _extract_member(self, member, targetpath, pwd): + if not isinstance(member, zipfile.ZipInfo): + member = self.getinfo(member) + + targetpath = super()._extract_member(member, targetpath, pwd) + + attr = member.external_attr >> 16 + if attr != 0: + os.chmod(targetpath, attr) + return targetpath diff --git a/tacker/tests/unit/common/test_csar_utils.py b/tacker/tests/unit/common/test_csar_utils.py index 8df931e62..7255a4a1b 100644 --- a/tacker/tests/unit/common/test_csar_utils.py +++ b/tacker/tests/unit/common/test_csar_utils.py @@ -464,3 +464,24 @@ class TestCSARUtils(testtools.TestCase): self.assertEqual(flavours[0]['sw_images'][1]['min_disk'], 2000000000) self.assertEqual(flavours[0]['sw_images'][1]['size'], 2000000000) self.assertEqual(flavours[0]['sw_images'][1]['min_ram'], 8590458880) + + def test_extract_csar_zip_file_for_file_permissions(self): + dir_location = tempfile.mkdtemp() + file_obj = tempfile.NamedTemporaryFile(dir=dir_location) + file_path = file_obj.name + file_name = file_path.split("/")[3] + os.chmod(file_path, 0o755) + self.addCleanup(shutil.rmtree, dir_location) + initial_cwd = os.getcwd() + os.chdir(dir_location) + zip_file_path = dir_location + '/' + "test_extract_csar_zip_file.zip" + with zipfile.ZipFile(zip_file_path, 'w') as zip: + zip.write(file_name) + self.assertEqual(True, os.path.exists(zip_file_path)) + zip_extract_path = dir_location + "/zip_extract" + extract_file_path = zip_extract_path + "/" + file_name + os.chdir(initial_cwd) + csar_utils.extract_csar_zip_file(zip_file_path, zip_extract_path) + status = os.stat(extract_file_path) + permission = oct(status.st_mode)[-3:] + self.assertEqual('755', permission)