# Copyright 2020 Catalyst Cloud # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import hashlib import json from keystoneauth1 import session from keystoneauth1.identity import v3 from oslo_config import cfg from oslo_log import log as logging import swiftclient from backup.storage import base LOG = logging.getLogger(__name__) CONF = cfg.CONF def _get_user_keystone_session(auth_url, token, tenant_id): auth = v3.Token( auth_url=auth_url, token=token, project_domain_name="Default", project_id=tenant_id ) return session.Session(auth=auth, verify=False) def _get_service_client(auth_url, token, tenant_id): sess = _get_user_keystone_session(auth_url, token, tenant_id) return swiftclient.Connection(session=sess) def _set_attr(original): """Return a swift friendly header key.""" key = original.replace('_', '-') return 'X-Object-Meta-%s' % key def _get_attr(original): """Get a friendly name from an object header key.""" key = original.replace('-', '_') key = key.replace('x_object_meta_', '') return key class StreamReader(object): """Wrap the stream from the backup process and chunk it into segements.""" def __init__(self, stream, container, filename, max_file_size): self.stream = stream self.container = container self.filename = filename self.max_file_size = max_file_size self.segment_length = 0 self.process = None self.file_number = 0 self.end_of_file = False self.end_of_segment = False self.segment_checksum = hashlib.md5() @property def base_filename(self): """Filename with extensions removed.""" return self.filename.split('.')[0] @property def segment(self): return '%s_%08d' % (self.base_filename, self.file_number) @property def first_segment(self): return '%s_%08d' % (self.base_filename, 0) @property def segment_path(self): return '%s/%s' % (self.container, self.segment) def read(self, chunk_size=2 ** 16): if self.end_of_segment: self.segment_length = 0 self.segment_checksum = hashlib.md5() self.end_of_segment = False # Upload to a new file if we are starting or too large if self.segment_length > (self.max_file_size - chunk_size): self.file_number += 1 self.end_of_segment = True return '' chunk = self.stream.read(chunk_size) if not chunk: self.end_of_file = True return '' self.segment_checksum.update(chunk) self.segment_length += len(chunk) return chunk class SwiftStorage(base.Storage): def __init__(self): self.client = _get_service_client(CONF.os_auth_url, CONF.os_token, CONF.os_tenant_id) def save(self, stream, metadata=None, container='database_backups'): """Persist data from the stream to swift. * Read data from stream, upload to swift * Update the new object metadata, stream provides method to get metadata. :returns the new object checkshum and swift full URL. """ filename = stream.manifest LOG.info('Saving %(filename)s to %(container)s in swift.', {'filename': filename, 'container': container}) # Create the container if it doesn't already exist LOG.debug('Ensuring container %s', container) self.client.put_container(container) # Swift Checksum is the checksum of the concatenated segment checksums swift_checksum = hashlib.md5() # Wrap the output of the backup process to segment it for swift stream_reader = StreamReader(stream, container, filename, 2 * (1024 ** 3)) url = self.client.url # Full location where the backup manifest is stored location = "%s/%s/%s" % (url, container, filename) LOG.info('Uploading to %s', location) # Information about each segment upload job segment_results = [] # Read from the stream and write to the container in swift while not stream_reader.end_of_file: LOG.debug('Uploading segment %s.', stream_reader.segment) path = stream_reader.segment_path etag = self.client.put_object(container, stream_reader.segment, stream_reader) segment_checksum = stream_reader.segment_checksum.hexdigest() # Check each segment MD5 hash against swift etag if etag != segment_checksum: msg = ('Failed to upload data segment to swift. ETAG: %(tag)s ' 'Segment MD5: %(checksum)s.' % {'tag': etag, 'checksum': segment_checksum}) raise Exception(msg) segment_results.append({ 'path': path, 'etag': etag, 'size_bytes': stream_reader.segment_length }) swift_checksum.update(segment_checksum.encode()) # All segments uploaded. num_segments = len(segment_results) LOG.debug('File uploaded in %s segments.', num_segments) # An SLO will be generated if the backup was more than one segment in # length. large_object = num_segments > 1 # Meta data is stored as headers if metadata is None: metadata = {} metadata.update(stream.get_metadata()) headers = {} for key, value in metadata.items(): headers[_set_attr(key)] = value LOG.info('Metadata headers: %s', headers) if large_object: manifest_data = json.dumps(segment_results) LOG.info('Creating the SLO manifest file, manifest content: %s', manifest_data) # The etag returned from the manifest PUT is the checksum of the # manifest object (which is empty); this is not the checksum we # want. self.client.put_object(container, filename, manifest_data, query_string='multipart-manifest=put') # Validation checksum is the Swift Checksum final_swift_checksum = swift_checksum.hexdigest() else: LOG.info('Moving segment %(segment)s to %(filename)s.', {'segment': stream_reader.first_segment, 'filename': filename}) segment_result = segment_results[0] # Just rename it via a special put copy. headers['X-Copy-From'] = segment_result['path'] self.client.put_object(container, filename, '', headers=headers) # Delete the old segment file that was copied LOG.info('Deleting the old segment file %s.', stream_reader.first_segment) try: self.client.delete_object(container, stream_reader.first_segment) except swiftclient.exceptions.ClientException as e: if e.http_status != 404: raise final_swift_checksum = segment_result['etag'] # Validate the object by comparing checksums resp = self.client.head_object(container, filename) # swift returns etag in double quotes # e.g. '"dc3b0827f276d8d78312992cc60c2c3f"' etag = resp['etag'].strip('"') # Raise an error and mark backup as failed if etag != final_swift_checksum: msg = ('Failed to upload data to swift. Manifest ETAG: %(tag)s ' 'Swift MD5: %(checksum)s' % {'tag': etag, 'checksum': final_swift_checksum}) raise Exception(msg) return (final_swift_checksum, location) def _explodeLocation(self, location): storage_url = "/".join(location.split('/')[:-2]) container = location.split('/')[-2] filename = location.split('/')[-1] return storage_url, container, filename def _verify_checksum(self, etag, checksum): etag_checksum = etag.strip('"') if etag_checksum != checksum: msg = ('Checksum validation failure, actual: %s, expected: %s' % (etag_checksum, checksum)) raise Exception(msg) def load(self, location, backup_checksum): """Get object from the location.""" storage_url, container, filename = self._explodeLocation(location) headers, contents = self.client.get_object(container, filename, resp_chunk_size=2 ** 16) if backup_checksum: self._verify_checksum(headers.get('etag', ''), backup_checksum) return contents def load_metadata(self, parent_location, parent_checksum): """Load metadata from swift.""" if not parent_location: return {} _, container, filename = self._explodeLocation(parent_location) headers = self.client.head_object(container, filename) if parent_checksum: self._verify_checksum(headers.get('etag', ''), parent_checksum) _meta = {} for key, value in headers.items(): if key.startswith('x-object-meta'): _meta[_get_attr(key)] = value return _meta def is_incremental_backup(self, location): """Check if the location is an incremental backup.""" _, container, filename = self._explodeLocation(location) headers = self.client.head_object(container, filename) if 'x-object-meta-parent-location' in headers: return True return False def get_backup_lsn(self, location): """Get the backup LSN if exists.""" _, container, filename = self._explodeLocation(location) headers = self.client.head_object(container, filename) return headers.get('x-object-meta-lsn')