cinder/cinder/tests/unit/backup/fake_google_client2.py
Nikesh 165e91c7ec Add cinder backup driver for Google Cloud Storage
Implement cinder backup driver for Google Cloud Storage.
It supports:
 - creating a backup that is stored on Google Cloud Storage
 - restoring a backup from Google Cloud Storage
 - incremental backups
 - operation independent of any volume backend

DocImpact
Implements: blueprint gcs-cinder-backup-driver
Depends-On: I04f2e3dee1f5e001ad2f1cce15893b0965c082d7

Change-Id: Id2951176ad636afa52208e78c0b678cf02b8fad2
Co-Authored-By: Sreedhar Varma<sreedharv@vedams.com>
Co-Authored-By: Lakshmi Narayana<lakshminarayanat@vedams.com>
2016-01-18 23:28:23 +05:30

132 lines
3.6 KiB
Python

# Copyright (C) 2012 Hewlett-Packard Development Company, L.P.
# Copyright (C) 2016 Vedams Inc.
# Copyright (C) 2016 Google Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import tempfile
class FakeGoogleObjectInsertExecute(object):
    """Fake request object handed back by ``FakeGoogleObject.insert``."""

    def execute(self, *args, **kwargs):
        """Return a canned insert response containing a fixed ``md5Hash``.

        Every positional and keyword argument is accepted and ignored.
        """
        canned_response = {u'md5Hash': u'Z2NzY2luZGVybWQ1'}
        return canned_response
class FakeGoogleObjectListExecute(object):
    """Fake object-list request that lists files in a local directory.

    The "bucket" is the directory named ``bucket`` directly under
    ``tempfile.gettempdir()``; every entry in it stands for one object.
    """

    def __init__(self, *args, **kwargs):
        """Remember the bucket name and the name filter.

        The ``bucket`` and ``prefix`` keyword arguments are both required
        (``KeyError`` is raised otherwise); all other arguments are ignored.
        """
        self.bucket_name = kwargs['bucket']
        self.prefix = kwargs['prefix']

    def _matches(self, name):
        """Return True when ``self.prefix`` occurs anywhere in ``name``."""
        try:
            # Substring match, not a strict "starts with" check.
            return self.prefix in name
        except TypeError:
            # A prefix that cannot be searched for in a string (for
            # example None) matches nothing rather than failing the listing.
            return False

    def execute(self, *args, **kwargs):
        """Return ``{'items': [{'name': <entry>}, ...]}`` for matching entries.

        Entries come back in ``os.listdir`` order. Any error raised by
        ``os.listdir`` (for instance when the bucket directory does not
        exist) propagates to the caller. Arguments are ignored.
        """
        bucket_dir = tempfile.gettempdir() + '/' + self.bucket_name
        return {'items': [{'name': entry}
                          for entry in os.listdir(bucket_dir)
                          if self._matches(entry)]}
class FakeGoogleBucketListExecute(object):
    """Fake request object handed back by ``FakeGoogleBucket.list``."""

    def execute(self, *args, **kwargs):
        """Return a canned bucket listing with two fixed bucket names.

        Every positional and keyword argument is accepted and ignored.
        """
        bucket_names = (u'gcscinderbucket', u'gcsbucket')
        return {u'items': [{u'name': bucket} for bucket in bucket_names]}
class FakeGoogleBucketInsertExecute(object):
    """Fake request object handed back by ``FakeGoogleBucket.insert``."""

    def execute(self, *args, **kwargs):
        """Do nothing and return ``None``; all arguments are ignored."""
        return None
class FakeMediaObject(object):
    """Record of which object a fake media download request refers to."""

    def __init__(self, *args, **kwargs):
        """Store the required ``bucket`` and ``object`` keyword arguments.

        ``KeyError`` is raised when either keyword is missing. Positional
        arguments and any other keywords are ignored.
        """
        bucket, object_name = kwargs['bucket'], kwargs['object']
        self.bucket_name = bucket
        self.object_name = object_name
class FakeGoogleObject(object):
    """Fake of the client's ``objects()`` resource, backed by local files."""

    def insert(self, *args, **kwargs):
        """Store an uploaded object as a file under the temp directory.

        Required keyword arguments:
          bucket     -- directory name under ``tempfile.gettempdir()``
          name       -- file name to create inside that directory
          media_body -- object whose ``_fd`` attribute is a seekable binary
                        stream holding the payload

        A missing keyword raises ``KeyError``. Returns a
        ``FakeGoogleObjectInsertExecute`` instance.
        """
        object_path = '/'.join(
            (tempfile.gettempdir(), kwargs['bucket'], kwargs['name']))
        upload_stream = kwargs['media_body']._fd
        # Rewind so the complete payload is stored no matter where the
        # stream position currently is, and read before opening the
        # destination so a failed read does not leave an empty file behind.
        upload_stream.seek(0)
        payload = upload_stream.read()
        with open(object_path, 'wb') as object_file:
            object_file.write(payload)
        return FakeGoogleObjectInsertExecute()

    def get_media(self, *args, **kwargs):
        """Return a ``FakeMediaObject`` built from the given arguments."""
        return FakeMediaObject(*args, **kwargs)

    def list(self, *args, **kwargs):
        """Return a ``FakeGoogleObjectListExecute`` built from the arguments."""
        return FakeGoogleObjectListExecute(*args, **kwargs)
class FakeGoogleBucket(object):
    """Fake of the client's ``buckets()`` resource."""

    def list(self, *args, **kwargs):
        """Return a fake bucket-list request; all arguments are ignored."""
        list_request = FakeGoogleBucketListExecute()
        return list_request

    def insert(self, *args, **kwargs):
        """Return a fake bucket-insert request; all arguments are ignored."""
        insert_request = FakeGoogleBucketInsertExecute()
        return insert_request
class FakeGoogleDiscovery(object):
    """Fake discovery entry point that hands out a fake service object."""

    def __init__(self, *args, **kwargs):
        """Accept and ignore any arguments."""

    @classmethod
    def Build(cls, *args, **kwargs):
        """Return a new ``FakeDiscoveryBuild``; all arguments are ignored."""
        service = FakeDiscoveryBuild()
        return service
class FakeDiscoveryBuild(object):
    """Fake service object exposing ``objects()`` and ``buckets()``."""

    def __init__(self, *args, **kwargs):
        """Accept and ignore any arguments."""

    def objects(self):
        """Return a new ``FakeGoogleObject``."""
        object_resource = FakeGoogleObject()
        return object_resource

    def buckets(self):
        """Return a new ``FakeGoogleBucket``."""
        bucket_resource = FakeGoogleBucket()
        return bucket_resource
class FakeGoogleCredentials(object):
    """Fake credentials class; it never reads or validates anything."""

    def __init__(self, *args, **kwargs):
        """Accept and ignore any arguments."""

    @classmethod
    def from_stream(cls, *args, **kwargs):
        """Accept and ignore any arguments, then return ``None``."""
        return None
class FakeGoogleMediaIoBaseDownload(object):
    """Fake media downloader that copies a local file in one step."""

    def __init__(self, fh, req, chunksize=None):
        """Copy the stored object into ``fh`` immediately.

        The source file is ``<tempdir>/<req.bucket_name>/<req.object_name>``
        and its whole content is written to ``fh`` with a single ``write``
        call. ``chunksize`` is accepted but not used.
        """
        source_path = '/'.join(
            (tempfile.gettempdir(), req.bucket_name, req.object_name))
        with open(source_path, 'rb') as source_file:
            contents = source_file.read()
            fh.write(contents)

    def next_chunk(self, **kwargs):
        """Return the fixed tuple ``(100, True)``; keywords are ignored."""
        progress, done = 100, True
        return (progress, done)