Merge "Add cinder backup driver for Google Cloud Storage"

This commit is contained in:
Jenkins 2016-01-19 18:22:04 +00:00 committed by Gerrit Code Review
commit 77cec880a4
8 changed files with 1227 additions and 0 deletions

View File

@ -0,0 +1,356 @@
# Copyright (C) 2012 Hewlett-Packard Development Company, L.P.
# Copyright (c) 2014 TrilioData, Inc
# Copyright (c) 2015 EMC Corporation
# Copyright (C) 2015 Kevin Fox <kevin@efox.cc>
# Copyright (C) 2015 Tom Barron <tpb@dyncloud.net>
# Copyright (C) 2016 Vedams Inc.
# Copyright (C) 2016 Google Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Implementation of a backup service using Google Cloud Storage(GCS)
Google Cloud Storage json apis are used for backup operations.
Authentication and authorization are based on OAuth2.0.
Server-centric flow is used for authentication.
"""
import base64
import hashlib
import httplib2
from apiclient import discovery
from apiclient import errors
from apiclient import http
from oauth2client import client
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import timeutils
import six
from cinder.backup import chunkeddriver
from cinder import exception
from cinder.i18n import _
LOG = logging.getLogger(__name__)
gcsbackup_service_opts = [
cfg.StrOpt('backup_gcs_bucket',
help='The GCS bucket to use.'),
cfg.IntOpt('backup_gcs_object_size',
default=52428800,
help='The size in bytes of GCS backup objects.'),
cfg.IntOpt('backup_gcs_block_size',
default=32768,
help='The size in bytes that changes are tracked '
'for incremental backups. backup_gcs_object_size '
'has to be multiple of backup_gcs_block_size.'),
cfg.IntOpt('backup_gcs_reader_chunk_size',
default=2097152,
help='GCS object will be downloaded in chunks of bytes.'),
cfg.IntOpt('backup_gcs_writer_chunk_size',
default=2097152,
help='GCS object will be uploaded in chunks of bytes. '
'Pass in a value of -1 if the file '
'is to be uploaded as a single chunk.'),
cfg.IntOpt('backup_gcs_num_retries',
default=3,
help='Number of times to retry.'),
cfg.ListOpt('backup_gcs_retry_error_codes',
default=['429'],
help='List of GCS error codes.'),
cfg.StrOpt('backup_gcs_bucket_location',
default='US',
help='Location of GCS bucket.'),
cfg.StrOpt('backup_gcs_storage_class',
default='NEARLINE',
help='Storage class of GCS bucket.'),
cfg.StrOpt('backup_gcs_credential_file',
help='Absolute path of GCS service account credential file.'),
cfg.StrOpt('backup_gcs_project_id',
help='Owner project id for GCS bucket.'),
cfg.StrOpt('backup_gcs_user_agent',
default='gcscinder',
help='Http user-agent string for gcs api.'),
cfg.BoolOpt('backup_gcs_enable_progress_timer',
default=True,
help='Enable or Disable the timer to send the periodic '
'progress notifications to Ceilometer when backing '
'up the volume to the GCS backend storage. The '
'default value is True to enable the timer.'),
]
CONF = cfg.CONF
CONF.register_opts(gcsbackup_service_opts)
def gcs_logger(func):
def func_wrapper(self, *args, **kwargs):
try:
return func(self, *args, **kwargs)
except errors.Error as err:
raise exception.GCSApiFailure(reason=err)
except client.Error as err:
raise exception.GCSOAuth2Failure(reason=err)
except Exception as err:
raise exception.GCSConnectionFailure(reason=err)
return func_wrapper
class GoogleBackupDriver(chunkeddriver.ChunkedBackupDriver):
"""Provides backup, restore and delete of backup objects within GCS."""
def __init__(self, context, db_driver=None):
self.check_gcs_options()
backup_bucket = CONF.backup_gcs_bucket
backup_credential = CONF.backup_gcs_credential_file
self.gcs_project_id = CONF.backup_gcs_project_id
chunk_size_bytes = CONF.backup_gcs_object_size
sha_block_size_bytes = CONF.backup_gcs_block_size
enable_progress_timer = CONF.backup_gcs_enable_progress_timer
super(GoogleBackupDriver, self).__init__(context, chunk_size_bytes,
sha_block_size_bytes,
backup_bucket,
enable_progress_timer,
db_driver)
credentials = client.GoogleCredentials.from_stream(backup_credential)
self.reader_chunk_size = CONF.backup_gcs_reader_chunk_size
self.writer_chunk_size = CONF.backup_gcs_writer_chunk_size
self.bucket_location = CONF.backup_gcs_bucket_location
self.storage_class = CONF.backup_gcs_storage_class
self.num_retries = CONF.backup_gcs_num_retries
http_user_agent = http.set_user_agent(httplib2.Http(),
CONF.backup_gcs_user_agent)
self.conn = discovery.build('storage',
'v1',
http=http_user_agent,
credentials=credentials)
self.resumable = self.writer_chunk_size != -1
def check_gcs_options(self):
required_options = ('backup_gcs_bucket', 'backup_gcs_credential_file',
'backup_gcs_project_id')
unset_options = [opt for opt in required_options
if not getattr(CONF, opt, None)]
if unset_options:
msg = _('Unset gcs options: %s') % unset_options
LOG.error(msg)
raise exception.InvalidInput(reason=msg)
@gcs_logger
def put_container(self, bucket):
"""Create the bucket if not exists."""
buckets = self.conn.buckets().list(
project=self.gcs_project_id,
prefix=bucket,
fields="items(name)").execute(
num_retries=self.num_retries).get('items', [])
if not any(b.get('name') == bucket for b in buckets):
self.conn.buckets().insert(
project=self.gcs_project_id,
body={'name': bucket,
'location': self.bucket_location,
'storageClass': self.storage_class}).execute(
num_retries=self.num_retries)
@gcs_logger
def get_container_entries(self, bucket, prefix):
"""Get bucket entry names."""
obj_list_dict = self.conn.objects().list(
bucket=bucket,
fields="items(name)",
prefix=prefix).execute(num_retries=self.num_retries).get(
'items', [])
return [obj_dict.get('name') for obj_dict in obj_list_dict]
def get_object_writer(self, bucket, object_name, extra_metadata=None):
"""Return a writer object.
Returns a writer object that stores a chunk of volume data in a
GCS object store.
"""
return GoogleObjectWriter(bucket, object_name, self.conn,
self.writer_chunk_size,
self.num_retries,
self.resumable)
def get_object_reader(self, bucket, object_name, extra_metadata=None):
"""Return reader object.
Returns a reader object that retrieves a chunk of backed-up volume data
from a GCS object store.
"""
return GoogleObjectReader(bucket, object_name, self.conn,
self.reader_chunk_size,
self.num_retries)
@gcs_logger
def delete_object(self, bucket, object_name):
"""Deletes a backup object from a GCS object store."""
self.conn.objects().delete(
bucket=bucket,
object=object_name).execute(num_retries=self.num_retries)
def _generate_object_name_prefix(self, backup):
"""Generates a GCS backup object name prefix.
prefix = volume_volid/timestamp/az_saz_backup_bakid
volid is volume id.
timestamp is time in UTC with format of YearMonthDateHourMinuteSecond.
saz is storage_availability_zone.
bakid is backup id for volid.
"""
az = 'az_%s' % self.az
backup_name = '%s_backup_%s' % (az, backup.id)
volume = 'volume_%s' % (backup.volume_id)
timestamp = timeutils.utcnow().strftime("%Y%m%d%H%M%S")
prefix = volume + '/' + timestamp + '/' + backup_name
LOG.debug('generate_object_name_prefix: %s', prefix)
return prefix
def update_container_name(self, backup, bucket):
"""Use the bucket name as provided - don't update."""
return
def get_extra_metadata(self, backup, volume):
"""GCS driver does not use any extra metadata."""
return
class GoogleObjectWriter(object):
def __init__(self, bucket, object_name, conn, writer_chunk_size,
num_retries, resumable):
self.bucket = bucket
self.object_name = object_name
self.conn = conn
self.data = bytearray()
self.chunk_size = writer_chunk_size
self.num_retries = num_retries
self.resumable = resumable
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
self.close()
def write(self, data):
self.data += data
@gcs_logger
def close(self):
media = http.MediaIoBaseUpload(six.BytesIO(self.data),
'application/octet-stream',
chunksize=self.chunk_size,
resumable=self.resumable)
resp = self.conn.objects().insert(
bucket=self.bucket,
name=self.object_name,
body={},
media_body=media).execute(num_retries=self.num_retries)
etag = resp['md5Hash']
md5 = hashlib.md5(self.data).digest()
if six.PY3:
md5 = md5.encode('utf-8')
etag = etag.encode('utf-8')
md5 = base64.b64encode(md5)
if etag != md5:
err = _('MD5 of object: %(object_name)s before: '
'%(md5)s and after: %(etag)s is not same.') % {
'object_name': self.object_name,
'md5': md5, 'etag': etag, }
raise exception.InvalidBackup(reason=err)
else:
LOG.debug('MD5 before: %(md5)s and after: %(etag)s '
'writing object: %(object_name)s in GCS.',
{'etag': etag, 'md5': md5,
'object_name': self.object_name, })
return md5
class GoogleObjectReader(object):
def __init__(self, bucket, object_name, conn, reader_chunk_size,
num_retries):
self.bucket = bucket
self.object_name = object_name
self.conn = conn
self.chunk_size = reader_chunk_size
self.num_retries = num_retries
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
pass
@gcs_logger
def read(self):
req = self.conn.objects().get_media(
bucket=self.bucket,
object=self.object_name)
fh = six.BytesIO()
downloader = GoogleMediaIoBaseDownload(
fh, req, chunksize=self.chunk_size)
done = False
while not done:
status, done = downloader.next_chunk(num_retries=self.num_retries)
LOG.debug('GCS Object download Complete.')
return fh.getvalue()
class GoogleMediaIoBaseDownload(http.MediaIoBaseDownload):
@http.util.positional(1)
def next_chunk(self, num_retries=None):
error_codes = CONF.backup_gcs_retry_error_codes
headers = {'range': 'bytes=%d-%d' %
(self._progress, self._progress + self._chunksize)}
gcs_http = self._request.http
for retry_num in range(num_retries + 1):
if retry_num > 0:
self._sleep(self._rand() * 2 ** retry_num)
resp, content = gcs_http.request(self._uri, headers=headers)
if resp.status < 500 and (six.text_type(resp.status)
not in error_codes):
break
if resp.status in [200, 206]:
if 'content-location' in resp and (
resp['content-location'] != self._uri):
self._uri = resp['content-location']
self._progress += len(content)
self._fd.write(content)
if 'content-range' in resp:
content_range = resp['content-range']
length = content_range.rsplit('/', 1)[1]
self._total_size = int(length)
elif 'content-length' in resp:
self._total_size = int(resp['content-length'])
if self._progress == self._total_size:
self._done = True
return (http.MediaDownloadProgress(self._progress,
self._total_size), self._done)
else:
raise http.HttpError(resp, content, uri=self._uri)
def get_backup_driver(context):
return GoogleBackupDriver(context)

View File

@ -1044,3 +1044,16 @@ class TegileAPIException(VolumeBackendAPIException):
# NexentaStor driver exception
class NexentaException(VolumeDriverException):
message = _("%(message)s")
# Google Cloud Storage(GCS) backup driver
class GCSConnectionFailure(BackupDriverException):
message = _("Google Cloud Storage connection failure: %(reason)s")
class GCSApiFailure(BackupDriverException):
message = _("Google Cloud Storage api failure: %(reason)s")
class GCSOAuth2Failure(BackupDriverException):
message = _("Google Cloud Storage oauth2 failure: %(reason)s")

View File

@ -23,6 +23,7 @@ from cinder.backup import chunkeddriver as cinder_backup_chunkeddriver
from cinder.backup import driver as cinder_backup_driver
from cinder.backup.drivers import ceph as cinder_backup_drivers_ceph
from cinder.backup.drivers import glusterfs as cinder_backup_drivers_glusterfs
from cinder.backup.drivers import google as cinder_backup_drivers_google
from cinder.backup.drivers import nfs as cinder_backup_drivers_nfs
from cinder.backup.drivers import posix as cinder_backup_drivers_posix
from cinder.backup.drivers import swift as cinder_backup_drivers_swift
@ -283,6 +284,7 @@ def list_opts():
cinder_volume_drivers_remotefs.nas_opts,
cinder_volume_drivers_remotefs.volume_opts,
cinder_volume_drivers_emc_xtremio.XTREMIO_OPTS,
cinder_backup_drivers_google.gcsbackup_service_opts,
[cinder_api_middleware_auth.use_forwarded_for_opt],
cinder_volume_drivers_hitachi_hbsdcommon.volume_opts,
cinder_volume_drivers_infortrend_eonstor_ds_cli_commoncli.

View File

@ -0,0 +1,154 @@
# Copyright (C) 2012 Hewlett-Packard Development Company, L.P.
# Copyright (C) 2016 Vedams Inc.
# Copyright (C) 2016 Google Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import os
import zlib
from apiclient import errors
from oauth2client import client
from oslo_utils import units
import six
class FakeGoogleObjectInsertExecute(object):
def __init__(self, *args, **kwargs):
self.container_name = kwargs['bucket']
def execute(self, *args, **kwargs):
if self.container_name == 'gcs_api_failure':
raise errors.Error
return {u'md5Hash': u'Z2NzY2luZGVybWQ1'}
class FakeGoogleObjectListExecute(object):
def __init__(self, *args, **kwargs):
self.container_name = kwargs['bucket']
def execute(self, *args, **kwargs):
if self.container_name == 'gcs_connection_failure':
raise Exception
return {'items': [{'name': 'backup_001'},
{'name': 'backup_002'},
{'name': 'backup_003'}]}
class FakeGoogleBucketListExecute(object):
def __init__(self, *args, **kwargs):
self.container_name = kwargs['prefix']
def execute(self, *args, **kwargs):
if self.container_name == 'gcs_oauth2_failure':
raise client.Error
return {u'items': [{u'name': u'gcscinderbucket'},
{u'name': u'gcsbucket'}]}
class FakeGoogleBucketInsertExecute(object):
def execute(self, *args, **kwargs):
pass
class FakeMediaObject(object):
def __init__(self, *args, **kwargs):
self.bucket_name = kwargs['bucket']
self.object_name = kwargs['object']
class FakeGoogleObject(object):
def insert(self, *args, **kwargs):
return FakeGoogleObjectInsertExecute(*args, **kwargs)
def get_media(self, *args, **kwargs):
return FakeMediaObject(*args, **kwargs)
def list(self, *args, **kwargs):
return FakeGoogleObjectListExecute(*args, **kwargs)
class FakeGoogleBucket(object):
def list(self, *args, **kwargs):
return FakeGoogleBucketListExecute(*args, **kwargs)
def insert(self, *args, **kwargs):
return FakeGoogleBucketInsertExecute()
class FakeGoogleDiscovery(object):
"""Logs calls instead of executing."""
def __init__(self, *args, **kwargs):
pass
@classmethod
def Build(self, *args, **kargs):
return FakeDiscoveryBuild()
class FakeDiscoveryBuild(object):
"""Logging calls instead of executing."""
def __init__(self, *args, **kwargs):
pass
def objects(self):
return FakeGoogleObject()
def buckets(self):
return FakeGoogleBucket()
class FakeGoogleCredentials(object):
def __init__(self, *args, **kwargs):
pass
@classmethod
def from_stream(self, *args, **kwargs):
pass
class FakeGoogleMediaIoBaseDownload(object):
def __init__(self, fh, req, chunksize=None):
if 'metadata' in req.object_name:
metadata = {}
metadata['version'] = '1.0.0'
metadata['backup_id'] = 123
metadata['volume_id'] = 123
metadata['backup_name'] = 'fake backup'
metadata['backup_description'] = 'fake backup description'
metadata['created_at'] = '2016-01-09 11:20:54,805'
metadata['objects'] = [{
'backup_001': {'compression': 'zlib', 'length': 10,
'offset': 0},
'backup_002': {'compression': 'zlib', 'length': 10,
'offset': 10},
'backup_003': {'compression': 'zlib', 'length': 10,
'offset': 20}
}]
metadata_json = json.dumps(metadata, sort_keys=True, indent=2)
if six.PY3:
metadata_json = metadata_json.encode('utf-8')
fh.write(metadata_json)
else:
fh.write(zlib.compress(os.urandom(units.Mi)))
def next_chunk(self, **kwargs):
return (100, True)

View File

@ -0,0 +1,131 @@
# Copyright (C) 2012 Hewlett-Packard Development Company, L.P.
# Copyright (C) 2016 Vedams Inc.
# Copyright (C) 2016 Google Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import tempfile
class FakeGoogleObjectInsertExecute(object):
def execute(self, *args, **kwargs):
return {u'md5Hash': u'Z2NzY2luZGVybWQ1'}
class FakeGoogleObjectListExecute(object):
def __init__(self, *args, **kwargs):
self.bucket_name = kwargs['bucket']
self.prefix = kwargs['prefix']
def execute(self, *args, **kwargs):
bucket_dir = tempfile.gettempdir() + '/' + self.bucket_name
fake_body = []
for f in os.listdir(bucket_dir):
try:
f.index(self.prefix)
fake_body.append({'name': f})
except Exception:
pass
return {'items': fake_body}
class FakeGoogleBucketListExecute(object):
def execute(self, *args, **kwargs):
return {u'items': [{u'name': u'gcscinderbucket'},
{u'name': u'gcsbucket'}]}
class FakeGoogleBucketInsertExecute(object):
def execute(self, *args, **kwargs):
pass
class FakeMediaObject(object):
def __init__(self, *args, **kwargs):
self.bucket_name = kwargs['bucket']
self.object_name = kwargs['object']
class FakeGoogleObject(object):
def insert(self, *args, **kwargs):
object_path = (tempfile.gettempdir() + '/' + kwargs['bucket'] + '/' +
kwargs['name'])
kwargs['media_body']._fd.getvalue()
with open(object_path, 'wb') as object_file:
kwargs['media_body']._fd.seek(0)
object_file.write(kwargs['media_body']._fd.read())
return FakeGoogleObjectInsertExecute()
def get_media(self, *args, **kwargs):
return FakeMediaObject(*args, **kwargs)
def list(self, *args, **kwargs):
return FakeGoogleObjectListExecute(*args, **kwargs)
class FakeGoogleBucket(object):
def list(self, *args, **kwargs):
return FakeGoogleBucketListExecute()
def insert(self, *args, **kwargs):
return FakeGoogleBucketInsertExecute()
class FakeGoogleDiscovery(object):
"""Logs calls instead of executing."""
def __init__(self, *args, **kwargs):
pass
@classmethod
def Build(self, *args, **kargs):
return FakeDiscoveryBuild()
class FakeDiscoveryBuild(object):
"""Logging calls instead of executing."""
def __init__(self, *args, **kwargs):
pass
def objects(self):
return FakeGoogleObject()
def buckets(self):
return FakeGoogleBucket()
class FakeGoogleCredentials(object):
def __init__(self, *args, **kwargs):
pass
@classmethod
def from_stream(self, *args, **kwargs):
pass
class FakeGoogleMediaIoBaseDownload(object):
def __init__(self, fh, req, chunksize=None):
object_path = (tempfile.gettempdir() + '/' + req.bucket_name + '/' +
req.object_name)
with open(object_path, 'rb') as object_file:
fh.write(object_file.read())
def next_chunk(self, **kwargs):
return (100, True)

View File

@ -0,0 +1,567 @@
# Copyright (C) 2012 Hewlett-Packard Development Company, L.P.
# Copyright (C) 2016 Vedams Inc.
# Copyright (C) 2016 Google Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Tests for Google Backup code.
"""
import bz2
import filecmp
import hashlib
import os
import shutil
import tempfile
import zlib
import mock
from oslo_utils import units
from cinder.backup.drivers import google as google_dr
from cinder import context
from cinder import db
from cinder import exception
from cinder.i18n import _
from cinder import objects
from cinder import test
from cinder.tests.unit.backup import fake_google_client
from cinder.tests.unit.backup import fake_google_client2
class FakeMD5(object):
def __init__(self, *args, **kwargs):
pass
@classmethod
def digest(self):
return 'gcscindermd5'
@classmethod
def hexdigest(self):
return 'gcscindermd5'
class FakeObjectName(object):
@classmethod
def _fake_generate_object_name_prefix(self, backup):
az = 'az_fake'
backup_name = '%s_backup_%s' % (az, backup.id)
volume = 'volume_%s' % (backup.volume_id)
prefix = volume + '_' + backup_name
return prefix
def gcs_client(func):
@mock.patch.object(google_dr.client, 'GoogleCredentials',
fake_google_client.FakeGoogleCredentials)
@mock.patch.object(google_dr.discovery, 'build',
fake_google_client.FakeGoogleDiscovery.Build)
@mock.patch.object(google_dr, 'GoogleMediaIoBaseDownload',
fake_google_client.FakeGoogleMediaIoBaseDownload)
@mock.patch.object(hashlib, 'md5', FakeMD5)
def func_wrapper(self, *args, **kwargs):
return func(self, *args, **kwargs)
return func_wrapper
def gcs_client2(func):
@mock.patch.object(google_dr.client, 'GoogleCredentials',
fake_google_client2.FakeGoogleCredentials)
@mock.patch.object(google_dr.discovery, 'build',
fake_google_client2.FakeGoogleDiscovery.Build)
@mock.patch.object(google_dr, 'GoogleMediaIoBaseDownload',
fake_google_client2.FakeGoogleMediaIoBaseDownload)
@mock.patch.object(google_dr.GoogleBackupDriver,
'_generate_object_name_prefix',
FakeObjectName._fake_generate_object_name_prefix)
@mock.patch.object(hashlib, 'md5', FakeMD5)
def func_wrapper(self, *args, **kwargs):
return func(self, *args, **kwargs)
return func_wrapper
def fake_backup_metadata(self, backup, object_meta):
raise exception.BackupDriverException(message=_('fake'))
def fake_delete(self, backup):
raise exception.BackupOperationError()
def _fake_delete_object(self, bucket_name, object_name):
raise AssertionError('delete_object method should not be called.')
class GoogleBackupDriverTestCase(test.TestCase):
"""Test Case for Google"""
_DEFAULT_VOLUME_ID = 'c7eb81f4-bec6-4730-a60f-8888885874df'
def _create_volume_db_entry(self, volume_id=_DEFAULT_VOLUME_ID):
vol = {'id': volume_id,
'size': 1,
'status': 'available'}
return db.volume_create(self.ctxt, vol)['id']
def _create_backup_db_entry(self,
volume_id=_DEFAULT_VOLUME_ID,
container=google_dr.CONF.backup_gcs_bucket,
parent_id=None,
service_metadata=None):
try:
db.volume_get(self.ctxt, volume_id)
except exception.NotFound:
self._create_volume_db_entry(volume_id=volume_id)
kwargs = {'size': 1,
'container': container,
'volume_id': volume_id,
'parent_id': parent_id,
'user_id': 'user-id',
'project_id': 'project-id',
'service_metadata': service_metadata,
}
backup = objects.Backup(context=self.ctxt, **kwargs)
backup.create()
return backup
def setUp(self):
super(GoogleBackupDriverTestCase, self).setUp()
self.flags(backup_gcs_bucket='gcscinderbucket')
self.flags(backup_gcs_credential_file='test-file')
self.flags(backup_gcs_project_id='test-gcs')
self.ctxt = context.get_admin_context()
self.volume_file = tempfile.NamedTemporaryFile()
self.temp_dir = tempfile.mkdtemp()
self.addCleanup(self.volume_file.close)
# Remove tempdir.
self.addCleanup(shutil.rmtree, self.temp_dir)
for _i in range(0, 64):
self.volume_file.write(os.urandom(units.Ki))
@gcs_client
def test_backup(self):
volume_id = 'b09b1ad4-5f0e-4d3f-8b9e-0000004f5ec2'
container_name = 'test-bucket'
backup = self._create_backup_db_entry(volume_id=volume_id,
container=container_name)
service = google_dr.GoogleBackupDriver(self.ctxt)
self.volume_file.seek(0)
result = service.backup(backup, self.volume_file)
self.assertIsNone(result)
@gcs_client
def test_backup_uncompressed(self):
volume_id = '2b9f10a3-42b4-4fdf-b316-000000ceb039'
backup = self._create_backup_db_entry(volume_id=volume_id)
self.flags(backup_compression_algorithm='none')
service = google_dr.GoogleBackupDriver(self.ctxt)
self.volume_file.seek(0)
service.backup(backup, self.volume_file)
@gcs_client
def test_backup_bz2(self):
volume_id = 'dc0fee35-b44e-4f13-80d6-000000e1b50c'
backup = self._create_backup_db_entry(volume_id=volume_id)
self.flags(backup_compression_algorithm='bz2')
service = google_dr.GoogleBackupDriver(self.ctxt)
self.volume_file.seek(0)
service.backup(backup, self.volume_file)
@gcs_client
def test_backup_zlib(self):
volume_id = '5cea0535-b6fb-4531-9a38-000000bea094'
backup = self._create_backup_db_entry(volume_id=volume_id)
self.flags(backup_compression_algorithm='zlib')
service = google_dr.GoogleBackupDriver(self.ctxt)
self.volume_file.seek(0)
service.backup(backup, self.volume_file)
@gcs_client
def test_backup_default_container(self):
volume_id = '9552017f-c8b9-4e4e-a876-00000053349c'
backup = self._create_backup_db_entry(volume_id=volume_id,
container=None)
service = google_dr.GoogleBackupDriver(self.ctxt)
self.volume_file.seek(0)
service.backup(backup, self.volume_file)
self.assertEqual('gcscinderbucket', backup.container)
@gcs_client
@mock.patch('cinder.backup.drivers.google.GoogleBackupDriver.'
'_send_progress_end')
@mock.patch('cinder.backup.drivers.google.GoogleBackupDriver.'
'_send_progress_notification')
def test_backup_default_container_notify(self, _send_progress,
_send_progress_end):
volume_id = '87dd0eed-2598-4ebd-8ebb-000000ac578a'
backup = self._create_backup_db_entry(volume_id=volume_id,
container=None)
# If the backup_object_number_per_notification is set to 1,
# the _send_progress method will be called for sure.
google_dr.CONF.set_override("backup_object_number_per_notification", 1)
google_dr.CONF.set_override("backup_gcs_enable_progress_timer", False)
service = google_dr.GoogleBackupDriver(self.ctxt)
self.volume_file.seek(0)
service.backup(backup, self.volume_file)
self.assertTrue(_send_progress.called)
self.assertTrue(_send_progress_end.called)
# If the backup_object_number_per_notification is increased to
# another value, the _send_progress method will not be called.
_send_progress.reset_mock()
_send_progress_end.reset_mock()
google_dr.CONF.set_override("backup_object_number_per_notification",
10)
service = google_dr.GoogleBackupDriver(self.ctxt)
self.volume_file.seek(0)
service.backup(backup, self.volume_file)
self.assertFalse(_send_progress.called)
self.assertTrue(_send_progress_end.called)
# If the timer is enabled, the _send_progress will be called,
# since the timer can trigger the progress notification.
_send_progress.reset_mock()
_send_progress_end.reset_mock()
google_dr.CONF.set_override("backup_object_number_per_notification",
10)
google_dr.CONF.set_override("backup_gcs_enable_progress_timer", True)
service = google_dr.GoogleBackupDriver(self.ctxt)
self.volume_file.seek(0)
service.backup(backup, self.volume_file)
self.assertTrue(_send_progress.called)
self.assertTrue(_send_progress_end.called)
@gcs_client
def test_backup_custom_container(self):
volume_id = '1da9859e-77e5-4731-bd58-000000ca119e'
container_name = 'fake99'
backup = self._create_backup_db_entry(volume_id=volume_id,
container=container_name)
service = google_dr.GoogleBackupDriver(self.ctxt)
self.volume_file.seek(0)
service.backup(backup, self.volume_file)
self.assertEqual(container_name, backup.container)
@gcs_client2
def test_backup_shafile(self):
volume_id = '6465dad4-22af-48f7-8a1a-000000218907'
container_name = self.temp_dir.replace(tempfile.gettempdir() + '/',
'', 1)
backup = self._create_backup_db_entry(volume_id=volume_id,
container=container_name)
service = google_dr.GoogleBackupDriver(self.ctxt)
self.volume_file.seek(0)
service.backup(backup, self.volume_file)
self.assertEqual(container_name, backup.container)
# Verify sha contents
content1 = service._read_sha256file(backup)
self.assertEqual(64 * units.Ki / content1['chunk_size'],
len(content1['sha256s']))
@gcs_client2
def test_backup_cmp_shafiles(self):
volume_id = '1a99ac67-c534-4fe3-b472-0000001785e2'
container_name = self.temp_dir.replace(tempfile.gettempdir() + '/',
'', 1)
backup = self._create_backup_db_entry(volume_id=volume_id,
container=container_name)
service1 = google_dr.GoogleBackupDriver(self.ctxt)
self.volume_file.seek(0)
service1.backup(backup, self.volume_file)
self.assertEqual(container_name, backup.container)
# Create incremental backup with no change to contents
deltabackup = self._create_backup_db_entry(volume_id=volume_id,
container=container_name,
parent_id=backup.id)
service2 = google_dr.GoogleBackupDriver(self.ctxt)
self.volume_file.seek(0)
service2.backup(deltabackup, self.volume_file)
self.assertEqual(container_name, deltabackup.container)
# Compare shas from both files
content1 = service1._read_sha256file(backup)
content2 = service2._read_sha256file(deltabackup)
self.assertEqual(len(content1['sha256s']), len(content2['sha256s']))
self.assertEqual(set(content1['sha256s']), set(content2['sha256s']))
@gcs_client2
def test_backup_delta_two_objects_change(self):
volume_id = '30dab288-265a-4583-9abe-000000d42c67'
self.flags(backup_gcs_object_size=8 * units.Ki)
self.flags(backup_gcs_block_size=units.Ki)
container_name = self.temp_dir.replace(tempfile.gettempdir() + '/',
'', 1)
backup = self._create_backup_db_entry(volume_id=volume_id,
container=container_name)
service1 = google_dr.GoogleBackupDriver(self.ctxt)
self.volume_file.seek(0)
service1.backup(backup, self.volume_file)
self.assertEqual(container_name, backup.container)
# Create incremental backup with no change to contents
self.volume_file.seek(2 * 8 * units.Ki)
self.volume_file.write(os.urandom(units.Ki))
self.volume_file.seek(4 * 8 * units.Ki)
self.volume_file.write(os.urandom(units.Ki))
deltabackup = self._create_backup_db_entry(volume_id=volume_id,
container=container_name,
parent_id=backup.id)
service2 = google_dr.GoogleBackupDriver(self.ctxt)
self.volume_file.seek(0)
service2.backup(deltabackup, self.volume_file)
self.assertEqual(container_name, deltabackup.container)
content1 = service1._read_sha256file(backup)
content2 = service2._read_sha256file(deltabackup)
# Verify that two shas are changed at index 16 and 32
self.assertNotEqual(content1['sha256s'][16], content2['sha256s'][16])
self.assertNotEqual(content1['sha256s'][32], content2['sha256s'][32])
@gcs_client2
def test_backup_delta_two_blocks_in_object_change(self):
volume_id = 'b943e84f-aa67-4331-9ab2-000000cf19ba'
self.flags(backup_gcs_object_size=8 * units.Ki)
self.flags(backup_gcs_block_size=units.Ki)
container_name = self.temp_dir.replace(tempfile.gettempdir() + '/',
'', 1)
backup = self._create_backup_db_entry(volume_id=volume_id,
container=container_name)
service1 = google_dr.GoogleBackupDriver(self.ctxt)
self.volume_file.seek(0)
service1.backup(backup, self.volume_file)
self.assertEqual(container_name, backup.container)
# Create incremental backup with no change to contents
self.volume_file.seek(16 * units.Ki)
self.volume_file.write(os.urandom(units.Ki))
self.volume_file.seek(20 * units.Ki)
self.volume_file.write(os.urandom(units.Ki))
deltabackup = self._create_backup_db_entry(volume_id=volume_id,
container=container_name,
parent_id=backup.id)
service2 = google_dr.GoogleBackupDriver(self.ctxt)
self.volume_file.seek(0)
service2.backup(deltabackup, self.volume_file)
self.assertEqual(container_name, deltabackup.container)
# Verify that two shas are changed at index 16 and 20
content1 = service1._read_sha256file(backup)
content2 = service2._read_sha256file(deltabackup)
self.assertNotEqual(content1['sha256s'][16], content2['sha256s'][16])
self.assertNotEqual(content1['sha256s'][20], content2['sha256s'][20])
@gcs_client
def test_create_backup_fail(self):
volume_id = 'b09b1ad4-5f0e-4d3f-8b9e-0000004f5ec2'
container_name = 'gcs_api_failure'
backup = self._create_backup_db_entry(volume_id=volume_id,
container=container_name)
service = google_dr.GoogleBackupDriver(self.ctxt)
self.volume_file.seek(0)
self.assertRaises(exception.GCSApiFailure,
service.backup,
backup, self.volume_file)
@gcs_client
def test_create_backup_fail2(self):
volume_id = 'b09b1ad4-5f0e-4d3f-8b9e-0000004f5ec2'
container_name = 'gcs_oauth2_failure'
backup = self._create_backup_db_entry(volume_id=volume_id,
container=container_name)
service = google_dr.GoogleBackupDriver(self.ctxt)
self.volume_file.seek(0)
self.assertRaises(exception.GCSOAuth2Failure,
service.backup,
backup, self.volume_file)
@gcs_client
@mock.patch.object(google_dr.GoogleBackupDriver, '_backup_metadata',
fake_backup_metadata)
def test_backup_backup_metadata_fail(self):
"""Test of when an exception occurs in backup().
In backup(), after an exception occurs in
self._backup_metadata(), we want to check the process of an
exception handler.
"""
volume_id = '020d9142-339c-4876-a445-000000f1520c'
backup = self._create_backup_db_entry(volume_id=volume_id)
self.flags(backup_compression_algorithm='none')
service = google_dr.GoogleBackupDriver(self.ctxt)
self.volume_file.seek(0)
# We expect that an exception be notified directly.
self.assertRaises(exception.BackupDriverException,
service.backup,
backup, self.volume_file)
@gcs_client
@mock.patch.object(google_dr.GoogleBackupDriver, '_backup_metadata',
fake_backup_metadata)
@mock.patch.object(google_dr.GoogleBackupDriver, 'delete', fake_delete)
def test_backup_backup_metadata_fail2(self):
"""Test of when an exception occurs in an exception handler.
In backup(), after an exception occurs in
self._backup_metadata(), we want to check the process when the
second exception occurs in self.delete().
"""
volume_id = '2164421d-f181-4db7-b9bd-000000eeb628'
backup = self._create_backup_db_entry(volume_id=volume_id)
self.flags(backup_compression_algorithm='none')
service = google_dr.GoogleBackupDriver(self.ctxt)
self.volume_file.seek(0)
# We expect that the second exception is notified.
self.assertRaises(exception.BackupOperationError,
service.backup,
backup, self.volume_file)
@gcs_client
def test_restore(self):
volume_id = 'c2a81f09-f480-4325-8424-00000071685b'
backup = self._create_backup_db_entry(volume_id=volume_id)
service = google_dr.GoogleBackupDriver(self.ctxt)
with tempfile.NamedTemporaryFile() as volume_file:
service.restore(backup, volume_id, volume_file)
@gcs_client
def test_restore_fail(self):
volume_id = 'c2a81f09-f480-4325-8424-00000071685b'
container_name = 'gcs_connection_failure'
backup = self._create_backup_db_entry(volume_id=volume_id,
container=container_name)
service = google_dr.GoogleBackupDriver(self.ctxt)
with tempfile.NamedTemporaryFile() as volume_file:
self.assertRaises(exception.GCSConnectionFailure,
service.restore,
backup, volume_id, volume_file)
@gcs_client2
def test_restore_delta(self):
volume_id = '04d83506-bcf7-4ff5-9c65-00000051bd2e'
self.flags(backup_gcs_object_size=8 * units.Ki)
self.flags(backup_gcs_block_size=units.Ki)
container_name = self.temp_dir.replace(tempfile.gettempdir() + '/',
'', 1)
backup = self._create_backup_db_entry(volume_id=volume_id,
container=container_name)
service1 = google_dr.GoogleBackupDriver(self.ctxt)
self.volume_file.seek(0)
service1.backup(backup, self.volume_file)
# Create incremental backup with no change to contents
self.volume_file.seek(16 * units.Ki)
self.volume_file.write(os.urandom(units.Ki))
self.volume_file.seek(20 * units.Ki)
self.volume_file.write(os.urandom(units.Ki))
deltabackup = self._create_backup_db_entry(volume_id=volume_id,
container=container_name,
parent_id=backup.id)
self.volume_file.seek(0)
service2 = google_dr.GoogleBackupDriver(self.ctxt)
service2.backup(deltabackup, self.volume_file, True)
with tempfile.NamedTemporaryFile() as restored_file:
service2.restore(deltabackup, volume_id,
restored_file)
self.assertTrue(filecmp.cmp(self.volume_file.name,
restored_file.name))
@gcs_client
def test_delete(self):
volume_id = '9ab256c8-3175-4ad8-baa1-0000007f9d31'
object_prefix = 'test_prefix'
backup = self._create_backup_db_entry(volume_id=volume_id,
service_metadata=object_prefix)
service = google_dr.GoogleBackupDriver(self.ctxt)
service.delete(backup)
@gcs_client
@mock.patch.object(google_dr.GoogleBackupDriver, 'delete_object',
_fake_delete_object)
def test_delete_without_object_prefix(self):
volume_id = 'ee30d649-72a6-49a5-b78d-000000edb6b1'
backup = self._create_backup_db_entry(volume_id=volume_id)
service = google_dr.GoogleBackupDriver(self.ctxt)
service.delete(backup)
@gcs_client
def test_get_compressor(self):
service = google_dr.GoogleBackupDriver(self.ctxt)
compressor = service._get_compressor('None')
self.assertIsNone(compressor)
compressor = service._get_compressor('zlib')
self.assertEqual(zlib, compressor)
compressor = service._get_compressor('bz2')
self.assertEqual(bz2, compressor)
self.assertRaises(ValueError, service._get_compressor, 'fake')
@gcs_client
def test_prepare_output_data_effective_compression(self):
service = google_dr.GoogleBackupDriver(self.ctxt)
# Set up buffer of 128 zeroed bytes
fake_data = b'\0' * 128
result = service._prepare_output_data(fake_data)
self.assertEqual('zlib', result[0])
self.assertTrue(len(result) < len(fake_data))
@gcs_client
def test_prepare_output_data_no_compresssion(self):
self.flags(backup_compression_algorithm='none')
service = google_dr.GoogleBackupDriver(self.ctxt)
# Set up buffer of 128 zeroed bytes
fake_data = b'\0' * 128
result = service._prepare_output_data(fake_data)
self.assertEqual('none', result[0])
self.assertEqual(fake_data, result[1])
@gcs_client
def test_prepare_output_data_ineffective_compression(self):
service = google_dr.GoogleBackupDriver(self.ctxt)
# Set up buffer of 128 zeroed bytes
fake_data = b'\0' * 128
# Pre-compress so that compression in the driver will be ineffective.
already_compressed_data = service.compressor.compress(fake_data)
result = service._prepare_output_data(already_compressed_data)
self.assertEqual('none', result[0])
self.assertEqual(already_compressed_data, result[1])

View File

@ -0,0 +1,3 @@
---
features:
- Add cinder backup driver for Google Cloud Storage.

View File

@ -53,3 +53,4 @@ oslo.vmware>=1.16.0 # Apache-2.0
os-brick>=0.4.0 # Apache-2.0
os-win>=0.0.7 # Apache-2.0
tooz>=1.28.0 # Apache-2.0
google-api-python-client>=1.4.2 # Apache-2.0