Merge "Add swift upload retry mechanism"
This commit is contained in:
@@ -13,13 +13,16 @@
|
||||
# limitations under the License.
|
||||
|
||||
import hashlib
|
||||
import io
|
||||
import json
|
||||
import tempfile
|
||||
|
||||
from keystoneauth1.identity import v3
|
||||
from keystoneauth1 import session
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
import swiftclient
|
||||
from swiftclient import exceptions as swift_exc
|
||||
|
||||
from backup.storage import base
|
||||
|
||||
@@ -62,19 +65,51 @@ def _get_attr(original):
|
||||
|
||||
|
||||
class StreamReader(object):
|
||||
"""Wrap the stream from the backup process and chunk it into segements."""
|
||||
"""Wrap the stream from the backup process and chunk it into segments.
|
||||
This class now buffers each segment to a temporary file, making it seekable
|
||||
for retry mechanisms, like the one in swiftclient.
|
||||
"""
|
||||
|
||||
def __init__(self, stream, container, filename, max_file_size):
|
||||
self.stream = stream
|
||||
self.container = container
|
||||
self.filename = filename
|
||||
self.max_file_size = max_file_size
|
||||
self.segment_length = 0
|
||||
self.process = None
|
||||
self.file_number = 0
|
||||
# Will be incremented to 0 in _start_new_segment
|
||||
self.file_number = -1
|
||||
# True if the entire original stream is exhausted
|
||||
self.end_of_file = False
|
||||
self.end_of_segment = False
|
||||
self.segment_checksum = hashlib.md5()
|
||||
# tempfile.TemporaryFile for current segment
|
||||
self._current_segment_buffer = None
|
||||
self._current_segment_checksum = hashlib.md5()
|
||||
self._current_segment_length = 0
|
||||
# Current read position within _current_segment_buffer
|
||||
self._buffer_read_offset = 0
|
||||
# True when the current segment is fully read from original stream
|
||||
self._segment_fully_buffered = False
|
||||
# These properties store the final checksum and length of the segment
|
||||
# after it has been fully read before starting a new segment
|
||||
self._segment_final_checksum = None
|
||||
self._segment_final_length = 0
|
||||
# Initialize the first segment
|
||||
self._start_new_segment()
|
||||
|
||||
def _start_new_segment(self):
    """Switch to a fresh segment: new temp-file buffer, state reset.

    Closes the buffer of the previous segment (if any), advances
    ``file_number`` and resets the per-segment checksum, length, read
    offset and "fully buffered" flag.
    """
    previous_buffer = self._current_segment_buffer
    if previous_buffer:
        # The previous segment's buffered data is discarded here.
        previous_buffer.close()

    self.file_number = self.file_number + 1
    # A temporary file is used for buffering because segments can be
    # large (2GB).
    self._current_segment_buffer = tempfile.TemporaryFile()
    self._current_segment_checksum = hashlib.md5()
    # Nothing has been buffered or served yet for the new segment.
    self._current_segment_length = self._buffer_read_offset = 0
    self._segment_fully_buffered = False
|
||||
|
||||
@property
|
||||
def base_filename(self):
|
||||
@@ -94,25 +129,112 @@ class StreamReader(object):
|
||||
return '%s/%s' % (self.container, self.segment)
|
||||
|
||||
def read(self, chunk_size=2 ** 16):
|
||||
if self.end_of_segment:
|
||||
self.segment_length = 0
|
||||
self.segment_checksum = hashlib.md5()
|
||||
self.end_of_segment = False
|
||||
"""Read data from the stream. This method buffers the current segment
|
||||
from the underlying stream to a temporary file, then serves chunks
|
||||
from that buffer. This makes the stream seekable.
|
||||
"""
|
||||
# Phase 1: Ensure the current segment is fully buffered from the
|
||||
# original stream. This loop will run until the current segment's
|
||||
# data is entirely in _current_segment_buffer or the original
|
||||
# stream is exhausted.
|
||||
if not self._segment_fully_buffered:
|
||||
while True:
|
||||
if (self._current_segment_length + chunk_size) > \
|
||||
self.max_file_size:
|
||||
self._segment_fully_buffered = True
|
||||
LOG.info("StreamReader: Current segment %s reached max "
|
||||
"size. Fully buffered.", self.segment)
|
||||
break
|
||||
|
||||
# Upload to a new file if we are starting or too large
|
||||
if self.segment_length > (self.max_file_size - chunk_size):
|
||||
self.file_number += 1
|
||||
self.end_of_segment = True
|
||||
return ''
|
||||
# Read from the original, unseekable stream
|
||||
chunk = self.stream.read(chunk_size)
|
||||
|
||||
chunk = self.stream.read(chunk_size)
|
||||
if not chunk:
|
||||
self.end_of_file = True
|
||||
return ''
|
||||
if not chunk:
|
||||
# Original stream exhausted. Mark overall end of file.
|
||||
self.end_of_file = True
|
||||
self._segment_fully_buffered = True
|
||||
LOG.info("StreamReader: Original stream exhausted. "
|
||||
"Current segment %s fully buffered.",
|
||||
self.segment)
|
||||
break
|
||||
|
||||
self.segment_checksum.update(chunk)
|
||||
self.segment_length += len(chunk)
|
||||
return chunk
|
||||
self._current_segment_buffer.write(chunk)
|
||||
self._current_segment_checksum.update(chunk)
|
||||
self._current_segment_length += len(chunk)
|
||||
|
||||
# After buffering is complete for this segment, rewind the buffer
|
||||
# so that subsequent reads (by swiftclient,
|
||||
# potentially after seek) start from the beginning of the
|
||||
# buffered data.
|
||||
self._current_segment_buffer.seek(0)
|
||||
self._buffer_read_offset = 0
|
||||
# Phase 2: Serve data from the buffered temporary file
|
||||
data = self._current_segment_buffer.read(chunk_size)
|
||||
self._buffer_read_offset += len(data)
|
||||
# Start a new segment if the original stream is not exhausted
|
||||
if not data and not self.end_of_file:
|
||||
LOG.info("StreamReader: Finished serving data for segment %s. "
|
||||
"Preparing for next.", self.segment)
|
||||
# before we start a new_segment, we need to save the
|
||||
# current checksum
|
||||
self._segment_final_checksum = \
|
||||
self._current_segment_checksum.hexdigest()
|
||||
self._segment_final_length = self._current_segment_length
|
||||
self._start_new_segment()
|
||||
return b''
|
||||
|
||||
# If we've reached the end of the file, just return empty
|
||||
if not data and self.end_of_file:
|
||||
# update final_checksum after all data is read
|
||||
self._segment_final_checksum = \
|
||||
self._current_segment_checksum.hexdigest()
|
||||
self._segment_final_length = self._current_segment_length
|
||||
# Close the buffer after all data is read
|
||||
self._current_segment_buffer.close()
|
||||
return b''
|
||||
|
||||
return data
|
||||
|
||||
def seek(self, offset, whence=io.SEEK_SET):
    """Seek within the current segment's buffered data.

    :param offset: byte offset, interpreted relative to ``whence``.
    :param whence: ``io.SEEK_SET`` (default), ``io.SEEK_CUR`` or
        ``io.SEEK_END``; passed through to the underlying buffer.
    :returns: the new absolute position within the segment buffer.
    :raises io.UnsupportedOperation: if there is no open segment buffer.
    :raises IOError: if the resulting position lies beyond the buffered
        data. The previous position is restored before raising.
    """
    segment_buffer = self._current_segment_buffer
    # A closed file object is still truthy, so check ``closed`` as well.
    if not segment_buffer or getattr(segment_buffer, 'closed', False):
        raise io.UnsupportedOperation("StreamReader: No segment buffer "
                                      "available for seeking.")

    previous_pos = segment_buffer.tell()
    new_pos = segment_buffer.seek(offset, whence)

    if new_pos > self._current_segment_length:
        # Undo the move so a rejected seek leaves both the buffer and the
        # tracked read offset exactly where they were.
        segment_buffer.seek(previous_pos)
        raise IOError(f"StreamReader: Cannot seek beyond buffered data. "
                      f"Requested position: {new_pos}, Buffered data "
                      f"length: {self._current_segment_length}")

    self._buffer_read_offset = new_pos
    return new_pos
|
||||
|
||||
def tell(self):
    """Report the read position inside the current segment's buffer.

    Returns 0 when there is no segment buffer, otherwise the tracked
    read offset.
    """
    return self._buffer_read_offset if self._current_segment_buffer else 0
|
||||
|
||||
@property
def segment_checksum(self):
    """MD5 hex digest for the *current* segment.

    The digest saved when a segment was finalized takes precedence; if
    none has been saved, the running digest is computed on demand.
    """
    finalized = self._segment_final_checksum
    if finalized:
        return finalized
    return self._current_segment_checksum.hexdigest()
|
||||
|
||||
@property
def segment_length(self):
    """Return the length in bytes of the *current* segment.

    The size recorded in ``_segment_final_length`` is preferred. While
    that value is still 0, the running byte count of the segment being
    buffered is returned instead.

    :returns: the segment size in bytes, always an ``int``.
    """
    # The fallback must be the byte counter, not the checksum digest:
    # callers use this value as a size.
    return self._segment_final_length or self._current_segment_length
|
||||
|
||||
def release_buffer(self):
    """Close and drop the current segment buffer, if there is one.

    Does nothing when no buffer is set.
    """
    segment_buffer = self._current_segment_buffer
    if not segment_buffer:
        return
    segment_buffer.close()
    self._current_segment_buffer = None
|
||||
|
||||
|
||||
class SwiftStorage(base.Storage):
|
||||
@@ -136,7 +258,14 @@ class SwiftStorage(base.Storage):
|
||||
|
||||
# Create the container if it doesn't already exist
|
||||
LOG.debug('Ensuring container %s', container)
|
||||
self.client.put_container(container)
|
||||
try:
|
||||
self.client.put_container(container)
|
||||
except swift_exc.ClientException as e:
|
||||
# 409 Conflict means container already exists
|
||||
if e.http_status != 409:
|
||||
LOG.error('Failed to create container %s: %s',
|
||||
container, str(e))
|
||||
raise
|
||||
|
||||
# Swift Checksum is the checksum of the concatenated segment checksums
|
||||
swift_checksum = hashlib.md5()
|
||||
@@ -154,28 +283,42 @@ class SwiftStorage(base.Storage):
|
||||
|
||||
# Read from the stream and write to the container in swift
|
||||
while not stream_reader.end_of_file:
|
||||
LOG.debug('Uploading segment %s.', stream_reader.segment)
|
||||
segment_name = stream_reader.segment
|
||||
# This path includes the correct segment number
|
||||
path = stream_reader.segment_path
|
||||
etag = self.client.put_object(container,
|
||||
stream_reader.segment,
|
||||
stream_reader)
|
||||
|
||||
segment_checksum = stream_reader.segment_checksum.hexdigest()
|
||||
LOG.info('Uploading segment %s.', segment_name)
|
||||
# self.client.put_object now receives a seekable stream_reader,
|
||||
# allowing swiftclient's internal retries to work on the current
|
||||
# segment.
|
||||
try:
|
||||
etag = self.client.put_object(container,
|
||||
segment_name,
|
||||
stream_reader)
|
||||
except swift_exc.ClientException as e:
|
||||
LOG.error('Swift client error uploading segment %s: %s',
|
||||
segment_name, str(e))
|
||||
raise
|
||||
# After put_object returns, the segment_checksum and segment_length
|
||||
# properties of stream_reader refer to the segment that was just
|
||||
# uploaded.
|
||||
segment_md5 = stream_reader.segment_checksum
|
||||
current_segment_bytes = stream_reader.segment_length
|
||||
|
||||
# Check each segment MD5 hash against swift etag
|
||||
if etag != segment_checksum:
|
||||
if etag != segment_md5:
|
||||
msg = ('Failed to upload data segment to swift. ETAG: %(tag)s '
|
||||
'Segment MD5: %(checksum)s.' %
|
||||
{'tag': etag, 'checksum': segment_checksum})
|
||||
{'tag': etag, 'checksum': segment_md5})
|
||||
raise Exception(msg)
|
||||
|
||||
segment_results.append({
|
||||
'path': path,
|
||||
'etag': etag,
|
||||
'size_bytes': stream_reader.segment_length
|
||||
'size_bytes': current_segment_bytes
|
||||
})
|
||||
|
||||
swift_checksum.update(segment_checksum.encode())
|
||||
swift_checksum.update(segment_md5.encode())
|
||||
|
||||
# All segments uploaded.
|
||||
num_segments = len(segment_results)
|
||||
@@ -204,6 +347,7 @@ class SwiftStorage(base.Storage):
|
||||
self.client.put_object(container,
|
||||
filename,
|
||||
manifest_data,
|
||||
headers=headers,
|
||||
query_string='multipart-manifest=put')
|
||||
|
||||
# Validation checksum is the Swift Checksum
|
||||
@@ -225,7 +369,7 @@ class SwiftStorage(base.Storage):
|
||||
try:
|
||||
self.client.delete_object(container,
|
||||
stream_reader.first_segment)
|
||||
except swiftclient.exceptions.ClientException as e:
|
||||
except swift_exc.ClientException as e:
|
||||
if e.http_status != 404:
|
||||
raise
|
||||
|
||||
|
||||
Reference in New Issue
Block a user