Close file handle after upload job
The opened file for upload is not closed. This fix prevents possible file handle leak. Closes-Bug: #1559079 Change-Id: Ibc58667789e8f54c74ae2bbd32717a45f7b30550
This commit is contained in:
@@ -1714,6 +1714,7 @@ class SwiftService(object):
|
|||||||
segment_name),
|
segment_name),
|
||||||
'log_line': '%s segment %s' % (obj_name, segment_index),
|
'log_line': '%s segment %s' % (obj_name, segment_index),
|
||||||
}
|
}
|
||||||
|
fp = None
|
||||||
try:
|
try:
|
||||||
fp = open(path, 'rb')
|
fp = open(path, 'rb')
|
||||||
fp.seek(segment_start)
|
fp.seek(segment_start)
|
||||||
@@ -1761,6 +1762,9 @@ class SwiftService(object):
|
|||||||
if results_queue is not None:
|
if results_queue is not None:
|
||||||
results_queue.put(res)
|
results_queue.put(res)
|
||||||
return res
|
return res
|
||||||
|
finally:
|
||||||
|
if fp is not None:
|
||||||
|
fp.close()
|
||||||
|
|
||||||
def _get_chunk_data(self, conn, container, obj, headers, manifest=None):
|
def _get_chunk_data(self, conn, container, obj, headers, manifest=None):
|
||||||
chunks = []
|
chunks = []
|
||||||
@@ -2008,29 +2012,36 @@ class SwiftService(object):
|
|||||||
else:
|
else:
|
||||||
res['large_object'] = False
|
res['large_object'] = False
|
||||||
obr = {}
|
obr = {}
|
||||||
if path is not None:
|
fp = None
|
||||||
content_length = getsize(path)
|
try:
|
||||||
contents = LengthWrapper(open(path, 'rb'),
|
if path is not None:
|
||||||
content_length,
|
content_length = getsize(path)
|
||||||
md5=options['checksum'])
|
fp = open(path, 'rb')
|
||||||
else:
|
contents = LengthWrapper(fp,
|
||||||
content_length = None
|
content_length,
|
||||||
contents = ReadableToIterable(stream,
|
md5=options['checksum'])
|
||||||
md5=options['checksum'])
|
else:
|
||||||
|
content_length = None
|
||||||
|
contents = ReadableToIterable(stream,
|
||||||
|
md5=options['checksum'])
|
||||||
|
|
||||||
etag = conn.put_object(
|
etag = conn.put_object(
|
||||||
container, obj, contents,
|
container, obj, contents,
|
||||||
content_length=content_length, headers=put_headers,
|
content_length=content_length, headers=put_headers,
|
||||||
response_dict=obr
|
response_dict=obr
|
||||||
)
|
)
|
||||||
res['response_dict'] = obr
|
res['response_dict'] = obr
|
||||||
|
|
||||||
if (options['checksum'] and
|
if (options['checksum'] and
|
||||||
etag and etag != contents.get_md5sum()):
|
etag and etag != contents.get_md5sum()):
|
||||||
raise SwiftError('Object upload verification failed: '
|
raise SwiftError(
|
||||||
'md5 mismatch, local {0} != remote {1} '
|
'Object upload verification failed: '
|
||||||
'(remote object has not been removed)'
|
'md5 mismatch, local {0} != remote {1} '
|
||||||
.format(contents.get_md5sum(), etag))
|
'(remote object has not been removed)'
|
||||||
|
.format(contents.get_md5sum(), etag))
|
||||||
|
finally:
|
||||||
|
if fp is not None:
|
||||||
|
fp.close()
|
||||||
|
|
||||||
if old_manifest or old_slo_manifest_paths:
|
if old_manifest or old_slo_manifest_paths:
|
||||||
drs = []
|
drs = []
|
||||||
|
@@ -14,6 +14,7 @@
|
|||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
from __future__ import unicode_literals
|
from __future__ import unicode_literals
|
||||||
|
import contextlib
|
||||||
import mock
|
import mock
|
||||||
import os
|
import os
|
||||||
import six
|
import six
|
||||||
@@ -1002,7 +1003,6 @@ class TestService(unittest.TestCase):
|
|||||||
@mock.patch('swiftclient.service.stat')
|
@mock.patch('swiftclient.service.stat')
|
||||||
@mock.patch('swiftclient.service.getmtime', return_value=1.0)
|
@mock.patch('swiftclient.service.getmtime', return_value=1.0)
|
||||||
@mock.patch('swiftclient.service.getsize', return_value=4)
|
@mock.patch('swiftclient.service.getsize', return_value=4)
|
||||||
@mock.patch.object(builtins, 'open', return_value=six.StringIO('asdf'))
|
|
||||||
def test_upload_with_relative_path(self, *args, **kwargs):
|
def test_upload_with_relative_path(self, *args, **kwargs):
|
||||||
service = SwiftService({})
|
service = SwiftService({})
|
||||||
objects = [{'path': "./test",
|
objects = [{'path': "./test",
|
||||||
@@ -1012,7 +1012,9 @@ class TestService(unittest.TestCase):
|
|||||||
{'path': ".\\test",
|
{'path': ".\\test",
|
||||||
'strt_indx': 2}]
|
'strt_indx': 2}]
|
||||||
for obj in objects:
|
for obj in objects:
|
||||||
with mock.patch('swiftclient.service.Connection') as mock_conn:
|
with mock.patch('swiftclient.service.Connection') as mock_conn, \
|
||||||
|
mock.patch.object(builtins, 'open') as mock_open:
|
||||||
|
mock_open.return_value = six.StringIO('asdf')
|
||||||
mock_conn.return_value.head_object.side_effect = \
|
mock_conn.return_value.head_object.side_effect = \
|
||||||
ClientException('Not Found', http_status=404)
|
ClientException('Not Found', http_status=404)
|
||||||
mock_conn.return_value.put_object.return_value =\
|
mock_conn.return_value.put_object.return_value =\
|
||||||
@@ -1032,10 +1034,29 @@ class TestService(unittest.TestCase):
|
|||||||
self.assertEqual(upload_obj_resp['object'],
|
self.assertEqual(upload_obj_resp['object'],
|
||||||
obj['path'][obj['strt_indx']:])
|
obj['path'][obj['strt_indx']:])
|
||||||
self.assertEqual(upload_obj_resp['path'], obj['path'])
|
self.assertEqual(upload_obj_resp['path'], obj['path'])
|
||||||
|
self.assertTrue(mock_open.return_value.closed)
|
||||||
|
|
||||||
|
|
||||||
class TestServiceUpload(_TestServiceBase):
|
class TestServiceUpload(_TestServiceBase):
|
||||||
|
|
||||||
|
@contextlib.contextmanager
|
||||||
|
def assert_open_results_are_closed(self):
|
||||||
|
opened_files = []
|
||||||
|
builtin_open = builtins.open
|
||||||
|
|
||||||
|
def open_wrapper(*a, **kw):
|
||||||
|
opened_files.append((builtin_open(*a, **kw), a, kw))
|
||||||
|
return opened_files[-1][0]
|
||||||
|
|
||||||
|
with mock.patch.object(builtins, 'open', open_wrapper):
|
||||||
|
yield
|
||||||
|
for fp, args, kwargs in opened_files:
|
||||||
|
formatted_args = [repr(a) for a in args]
|
||||||
|
formatted_args.extend('%s=%r' % kv for kv in kwargs.items())
|
||||||
|
formatted_args = ', '.join(formatted_args)
|
||||||
|
self.assertTrue(fp.closed,
|
||||||
|
'Failed to close open(%s)' % formatted_args)
|
||||||
|
|
||||||
def test_upload_object_job_file_with_unicode_path(self):
|
def test_upload_object_job_file_with_unicode_path(self):
|
||||||
# Uploading a file results in the file object being wrapped in a
|
# Uploading a file results in the file object being wrapped in a
|
||||||
# LengthWrapper. This test sets the options in such a way that much
|
# LengthWrapper. This test sets the options in such a way that much
|
||||||
@@ -1109,11 +1130,14 @@ class TestServiceUpload(_TestServiceBase):
|
|||||||
f.write(b'c' * 10)
|
f.write(b'c' * 10)
|
||||||
f.flush()
|
f.flush()
|
||||||
|
|
||||||
# Mock the connection to return an empty etag. This
|
# run read() when put_object is called to calculate md5sum
|
||||||
# skips etag validation which would fail as the LengthWrapper
|
def _consuming_conn(*a, **kw):
|
||||||
# isn't read from.
|
contents = a[2]
|
||||||
|
contents.read() # Force md5 calculation
|
||||||
|
return contents.get_md5sum()
|
||||||
|
|
||||||
mock_conn = mock.Mock()
|
mock_conn = mock.Mock()
|
||||||
mock_conn.put_object.return_value = ''
|
mock_conn.put_object.side_effect = _consuming_conn
|
||||||
type(mock_conn).attempts = mock.PropertyMock(return_value=2)
|
type(mock_conn).attempts = mock.PropertyMock(return_value=2)
|
||||||
expected_r = {
|
expected_r = {
|
||||||
'action': 'upload_segment',
|
'action': 'upload_segment',
|
||||||
@@ -1125,21 +1149,22 @@ class TestServiceUpload(_TestServiceBase):
|
|||||||
'log_line': 'test_o segment 2',
|
'log_line': 'test_o segment 2',
|
||||||
'success': True,
|
'success': True,
|
||||||
'response_dict': {},
|
'response_dict': {},
|
||||||
'segment_etag': '',
|
'segment_etag': md5(b'b' * 10).hexdigest(),
|
||||||
'attempts': 2,
|
'attempts': 2,
|
||||||
}
|
}
|
||||||
|
|
||||||
s = SwiftService()
|
s = SwiftService()
|
||||||
r = s._upload_segment_job(conn=mock_conn,
|
with self.assert_open_results_are_closed():
|
||||||
path=f.name,
|
r = s._upload_segment_job(conn=mock_conn,
|
||||||
container='test_c',
|
path=f.name,
|
||||||
segment_name='test_s_1',
|
container='test_c',
|
||||||
segment_start=10,
|
segment_name='test_s_1',
|
||||||
segment_size=10,
|
segment_start=10,
|
||||||
segment_index=2,
|
segment_size=10,
|
||||||
obj_name='test_o',
|
segment_index=2,
|
||||||
options={'segment_container': None,
|
obj_name='test_o',
|
||||||
'checksum': True})
|
options={'segment_container': None,
|
||||||
|
'checksum': True})
|
||||||
|
|
||||||
self.assertEqual(r, expected_r)
|
self.assertEqual(r, expected_r)
|
||||||
|
|
||||||
@@ -1153,10 +1178,6 @@ class TestServiceUpload(_TestServiceBase):
|
|||||||
contents = mock_conn.put_object.call_args[0][2]
|
contents = mock_conn.put_object.call_args[0][2]
|
||||||
self.assertIsInstance(contents, utils.LengthWrapper)
|
self.assertIsInstance(contents, utils.LengthWrapper)
|
||||||
self.assertEqual(len(contents), 10)
|
self.assertEqual(len(contents), 10)
|
||||||
# This read forces the LengthWrapper to calculate the md5
|
|
||||||
# for the read content.
|
|
||||||
self.assertEqual(contents.read(), b'b' * 10)
|
|
||||||
self.assertEqual(contents.get_md5sum(), md5(b'b' * 10).hexdigest())
|
|
||||||
|
|
||||||
def test_etag_mismatch_with_ignore_checksum(self):
|
def test_etag_mismatch_with_ignore_checksum(self):
|
||||||
def _consuming_conn(*a, **kw):
|
def _consuming_conn(*a, **kw):
|
||||||
@@ -1215,16 +1236,17 @@ class TestServiceUpload(_TestServiceBase):
|
|||||||
type(mock_conn).attempts = mock.PropertyMock(return_value=2)
|
type(mock_conn).attempts = mock.PropertyMock(return_value=2)
|
||||||
|
|
||||||
s = SwiftService()
|
s = SwiftService()
|
||||||
r = s._upload_segment_job(conn=mock_conn,
|
with self.assert_open_results_are_closed():
|
||||||
path=f.name,
|
r = s._upload_segment_job(conn=mock_conn,
|
||||||
container='test_c',
|
path=f.name,
|
||||||
segment_name='test_s_1',
|
container='test_c',
|
||||||
segment_start=10,
|
segment_name='test_s_1',
|
||||||
segment_size=10,
|
segment_start=10,
|
||||||
segment_index=2,
|
segment_size=10,
|
||||||
obj_name='test_o',
|
segment_index=2,
|
||||||
options={'segment_container': None,
|
obj_name='test_o',
|
||||||
'checksum': True})
|
options={'segment_container': None,
|
||||||
|
'checksum': True})
|
||||||
|
|
||||||
self.assertIn('md5 mismatch', str(r.get('error')))
|
self.assertIn('md5 mismatch', str(r.get('error')))
|
||||||
|
|
||||||
@@ -1259,21 +1281,29 @@ class TestServiceUpload(_TestServiceBase):
|
|||||||
}
|
}
|
||||||
expected_mtime = '%f' % os.path.getmtime(f.name)
|
expected_mtime = '%f' % os.path.getmtime(f.name)
|
||||||
|
|
||||||
|
# run read() when put_object is called to calculate md5sum
|
||||||
|
# md5sum is verified in _upload_object_job.
|
||||||
|
def _consuming_conn(*a, **kw):
|
||||||
|
contents = a[2]
|
||||||
|
contents.read() # Force md5 calculation
|
||||||
|
return contents.get_md5sum()
|
||||||
|
|
||||||
mock_conn = mock.Mock()
|
mock_conn = mock.Mock()
|
||||||
mock_conn.put_object.return_value = ''
|
mock_conn.put_object.side_effect = _consuming_conn
|
||||||
type(mock_conn).attempts = mock.PropertyMock(return_value=2)
|
type(mock_conn).attempts = mock.PropertyMock(return_value=2)
|
||||||
|
|
||||||
s = SwiftService()
|
s = SwiftService()
|
||||||
r = s._upload_object_job(conn=mock_conn,
|
with self.assert_open_results_are_closed():
|
||||||
container='test_c',
|
r = s._upload_object_job(conn=mock_conn,
|
||||||
source=f.name,
|
container='test_c',
|
||||||
obj='test_o',
|
source=f.name,
|
||||||
options={'changed': False,
|
obj='test_o',
|
||||||
'skip_identical': False,
|
options={'changed': False,
|
||||||
'leave_segments': True,
|
'skip_identical': False,
|
||||||
'header': '',
|
'leave_segments': True,
|
||||||
'segment_size': 0,
|
'header': '',
|
||||||
'checksum': True})
|
'segment_size': 0,
|
||||||
|
'checksum': True})
|
||||||
|
|
||||||
mtime = r['headers']['x-object-meta-mtime']
|
mtime = r['headers']['x-object-meta-mtime']
|
||||||
self.assertEqual(expected_mtime, mtime)
|
self.assertEqual(expected_mtime, mtime)
|
||||||
@@ -1292,11 +1322,6 @@ class TestServiceUpload(_TestServiceBase):
|
|||||||
contents = mock_conn.put_object.call_args[0][2]
|
contents = mock_conn.put_object.call_args[0][2]
|
||||||
self.assertIsInstance(contents, utils.LengthWrapper)
|
self.assertIsInstance(contents, utils.LengthWrapper)
|
||||||
self.assertEqual(len(contents), 30)
|
self.assertEqual(len(contents), 30)
|
||||||
# This read forces the LengthWrapper to calculate the md5
|
|
||||||
# for the read content. This also checks that LengthWrapper was
|
|
||||||
# initialized with md5=True
|
|
||||||
self.assertEqual(contents.read(), b'a' * 30)
|
|
||||||
self.assertEqual(contents.get_md5sum(), md5(b'a' * 30).hexdigest())
|
|
||||||
|
|
||||||
@mock.patch('swiftclient.service.time', return_value=1400000000)
|
@mock.patch('swiftclient.service.time', return_value=1400000000)
|
||||||
def test_upload_object_job_stream(self, time_mock):
|
def test_upload_object_job_stream(self, time_mock):
|
||||||
|
Reference in New Issue
Block a user