Add S3 driver support

When using the s3 driver, specify the storage type with
'--storage s3', provide the authentication info with '--access-key xxx'
and '--secret-key xxx', and set the S3 compatible storage endpoint
with '--endpoint http://ip:port'.
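
For example (an illustrative sketch only; the 'freezer-agent' command
name, the '--container' option and all values below are placeholders,
not part of this change):

  freezer-agent --action backup --storage s3 \
      --access-key <ACCESS_KEY> --secret-key <SECRET_KEY> \
      --endpoint http://ip:port --container mybucket/freezer_backups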

Change-Id: If35468b3bd3a156ac5ade60c771ec8ff1a320139
Implements: blueprint s3-driver-support
Author: Pengju Jiao
Date: 2017-06-04 17:57:22 +08:00
parent 78d92b4b9a
commit c4d1afac07
9 changed files with 509 additions and 9 deletions


@@ -76,11 +76,12 @@ DEFAULT_PARAMS = {
'mode': 'fs', 'action': 'backup', 'shadow': '', 'shadow_path': '',
'windows_volume': '', 'command': None, 'metadata_out': None,
'storage': 'swift', 'ssh_key': '', 'ssh_username': '', 'ssh_host': '',
'ssh_port': DEFAULT_SSH_PORT, 'compression': 'gzip',
'overwrite': False, 'incremental': None,
'consistency_check': False, 'consistency_checksum': None,
'nova_restore_network': None, 'cindernative_backup_id': None,
'sync': True, 'engine_name': 'tar', 'timeout': 120, 'project_id': None,
'ssh_port': DEFAULT_SSH_PORT, 'access_key': '', 'secret_key': '',
'endpoint': '', 'compression': 'gzip', 'overwrite': False,
'incremental': None, 'consistency_check': False,
'consistency_checksum': None, 'nova_restore_network': None,
'cindernative_backup_id': None, 'sync': True, 'engine_name': 'tar',
'timeout': 120, 'project_id': None,
}
_COMMON = [
@@ -413,10 +414,27 @@ _COMMON = [
cfg.StrOpt('storage',
dest='storage',
default=DEFAULT_PARAMS['storage'],
choices=['local', 'swift', 'ssh'],
help="Storage for backups. Can be Swift or Local now. Swift is "
"default storage now. Local stores backups on the same "
"defined path and swift will store files in container."
choices=['local', 'swift', 'ssh', 's3'],
help="Storage for backups. Can be Swift, Local, SSH or S3. "
"Swift is the default storage. Local stores backups on "
"the defined path, Swift stores files in a container, "
"and S3 stores files in a bucket on an S3 compatible "
"storage backend."
),
cfg.StrOpt('access-key',
dest='access_key',
default=DEFAULT_PARAMS['access_key'],
help="Access key for S3 compatible storage"
),
cfg.StrOpt('secret-key',
dest='secret_key',
default=DEFAULT_PARAMS['secret_key'],
help="Secret key for S3 compatible storage"
),
cfg.StrOpt('endpoint',
dest='endpoint',
default=DEFAULT_PARAMS['endpoint'],
help="Endpoint of S3 compatible storage"
),
cfg.StrOpt('ssh-key',
dest='ssh_key',


@@ -61,6 +61,12 @@ class NovaEngine(engine.BackupEngine):
headers, data = swift_connection.get_object(
self.storage.storage_path,
"project_" + project_id)
elif self.storage._type == 's3':
bucket_name, object_name = self.get_storage_info(project_id)
data = self.storage.get_object(
bucket_name=bucket_name,
key=object_name
)['Body'].read()
elif self.storage._type in ['local', 'ssh']:
backup_basepath = os.path.join(self.storage.storage_path,
'project_' + project_id)
@@ -143,6 +149,13 @@ class NovaEngine(engine.BackupEngine):
swift_connection.put_object(self.storage.storage_path,
"project_{0}".format(project_id),
data)
elif self.storage._type == 's3':
bucket_name, object_name = self.get_storage_info(project_id)
self.storage.put_object(
bucket_name=bucket_name,
key=object_name,
data=data
)
elif self.storage._type in ['local', 'ssh']:
backup_basepath = os.path.join(self.storage.storage_path,
"project_" + project_id)
@@ -169,6 +182,16 @@ class NovaEngine(engine.BackupEngine):
futures.wait(futures_list, CONF.timeout)
def get_storage_info(self, project_id):
if self.storage.get_object_prefix() != '':
object_name = "{0}/project_{1}".format(
self.storage.get_object_prefix(),
project_id
)
else:
object_name = "project_{0}".format(project_id)
return self.storage.get_bucket_name(), object_name
def backup_data(self, backup_resource, manifest_path):
server = self.nova.servers.get(backup_resource)
if not server:


@@ -32,6 +32,7 @@ from freezer.engine import manager as engine_manager
from freezer import job
from freezer.storage import local
from freezer.storage import multiple
from freezer.storage import s3
from freezer.storage import ssh
from freezer.storage import swift
from freezer.utils import utils
@@ -63,6 +64,25 @@ def freezer_main(backup_args):
backup_args.client_manager = client_manager.get_client_manager(
backup_args.__dict__)
if backup_args.storage == 's3':
if backup_args.__dict__['access_key'] == '' \
and 'ACCESS_KEY' in os.environ:
backup_args.__dict__['access_key'] = os.environ.get('ACCESS_KEY')
if backup_args.__dict__['access_key'] == '':
raise Exception('No access key found for S3 compatible storage')
if backup_args.__dict__['secret_key'] == '' \
and 'SECRET_KEY' in os.environ:
backup_args.__dict__['secret_key'] = os.environ.get('SECRET_KEY')
if backup_args.__dict__['secret_key'] == '':
raise Exception('No secret key found for S3 compatible storage')
if backup_args.__dict__['endpoint'] == '' \
and 'ENDPOINT' in os.environ:
backup_args.__dict__['endpoint'] = os.environ.get('ENDPOINT')
if backup_args.__dict__['endpoint'] == '':
raise Exception('No endpoint found for S3 compatible storage')
if backup_args.storages:
# pylint: disable=abstract-class-instantiated
storage = multiple.MultipleStorage(
@@ -178,6 +198,14 @@ def storage_from_dict(backup_args, max_segment_size):
storage = swift.SwiftStorage(
client_manager, container, max_segment_size)
elif storage_name == "s3":
storage = s3.S3Storage(
backup_args['access_key'],
backup_args['secret_key'],
backup_args['endpoint'],
container,
max_segment_size
)
elif storage_name == "local":
storage = local.LocalStorage(
storage_path=container,


@@ -49,6 +49,14 @@ class RestoreOs(object):
info, backups = swift.get_container(self.container, prefix=path)
backups = sorted(
map(lambda x: int(x["name"].rsplit("/", 1)[-1]), backups))
elif self.storage.type == "s3":
bucket_name, path = self.get_storage_info(path)
backups = self.storage.list_all_objects(
bucket_name=bucket_name,
prefix=path
)
backups = sorted(
map(lambda x: int(x['Key'].split("/", 2)[1]), backups))
elif self.storage.type == "local":
path = "{0}/{1}".format(self.container, path)
backups = os.listdir(os.path.abspath(path))
@@ -67,6 +75,14 @@ class RestoreOs(object):
raise BaseException(msg)
return backups[-1]
def get_storage_info(self, path):
storage_info = self.container.split('/', 1)
container_name = storage_info[0]
object_prefix = storage_info[1] if '/' in self.container else ''
if object_prefix != '':
path = "{0}/{1}".format(object_prefix, path)
return container_name, path
def _create_image(self, path, restore_from_timestamp):
"""
:param path:
@@ -84,6 +100,40 @@ class RestoreOs(object):
length = int(stream[0]["x-object-meta-length"])
data = utils.ReSizeStream(stream[1], length, 1)
info = stream[0]
image = self.client_manager.create_image(
name="restore_{}".format(path),
container_format="bare",
disk_format="raw",
data=data)
return info, image
elif self.storage.type == 's3':
if self.storage.get_object_prefix() != '':
base_path = "{0}/{1}/{2}".format(
self.storage.get_object_prefix(),
path,
backup
)
else:
base_path = "{0}/{1}".format(path, backup)
image_file = "{0}/{1}".format(base_path, path)
s3_object = self.storage.get_object(
bucket_name=self.storage.get_bucket_name(),
key=image_file
)
stream = utils.S3ResponseStream(data=s3_object['Body'],
chunk_size=10000000)
data = utils.ReSizeStream(
stream,
s3_object['ContentLength'],
1
)
metadata = "{0}/metadata".format(base_path)
metadata_object = self.storage.get_object(
bucket_name=self.storage.get_bucket_name(),
key=metadata
)
info = json.load(metadata_object['Body'])
image = self.client_manager.create_image(
name="restore_{}".format(path),
container_format="bare",

freezer/storage/s3.py (new file, 286 lines)

@@ -0,0 +1,286 @@
"""
(c) Copyright 2014,2015 Hewlett-Packard Development Company, L.P.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import json
import botocore
import botocore.session
import logging
import requests
from oslo_log import log
from requests.packages.urllib3.exceptions import InsecureRequestWarning
from freezer.storage import physical
from freezer.utils import utils
LOG = log.getLogger(__name__)
logging.getLogger('botocore').setLevel(logging.WARNING)
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
class S3Storage(physical.PhysicalStorage):
_type = 's3'
def __init__(self, access_key, secret_key, endpoint, container,
max_segment_size, skip_prepare=False):
"""
:type container: str
"""
self.access_key = access_key
self.secret_key = secret_key
self.endpoint = endpoint
super(S3Storage, self).__init__(
storage_path=container,
max_segment_size=max_segment_size,
skip_prepare=skip_prepare)
self.container = container
storage_info = self.get_storage_info()
self._bucket_name = storage_info[0]
self._object_prefix = storage_info[1]
def get_storage_info(self):
storage_info = self.storage_path.split('/', 1)
bucket_name = storage_info[0]
object_prefix = storage_info[1] if '/' in self.storage_path else ''
return bucket_name, object_prefix
def rmtree(self, path):
split = path.split('/', 1)
all_s3_objects = self.list_all_objects(
bucket_name=split[0],
prefix=split[1]
)
for s3_object in all_s3_objects:
self.get_s3_connection().delete_object(
Bucket=split[0],
Key=s3_object['Key']
)
def put_file(self, from_path, to_path):
split = to_path.split('/', 1)
self.get_s3_connection().put_object(
Bucket=split[0],
Key=split[1],
Body=open(from_path, 'r')
)
def get_s3_connection(self):
"""
:rtype: botocore.client.BaseClient
:return: a botocore S3 client pointed at the configured endpoint
"""
return botocore.session.get_session().create_client(
's3',
aws_access_key_id=self.access_key,
aws_secret_access_key=self.secret_key,
endpoint_url=self.endpoint
)
def prepare(self):
"""
Check if the provided bucket is already available on S3
compatible storage. The verification is done by exact matching
between the provided bucket name and the list of buckets
available to the S3 account.
"""
bucket_list = \
[c['Name'] for c in self.get_s3_connection()
.list_buckets()['Buckets']]
split = self.storage_path.split('/', 1)
if split[0] not in bucket_list:
self.get_s3_connection().create_bucket(Bucket=split[0])
def info(self):
# The standard S3 API does not expose per-bucket size/object-count stats
buckets = self.get_s3_connection().list_buckets()['Buckets']
ordered_buckets = []
for bucket in buckets:
ordered_bucket = {
'container_name': bucket['Name'],
'size': '{0} MB'.format(0),
'objects_count': 0
}
ordered_buckets.append(ordered_bucket)
return ordered_buckets
def get_file(self, from_path, to_path):
split = from_path.split('/', 1)
with open(to_path, 'ab') as obj_fd:
s3_object = self.get_s3_connection().get_object(
Bucket=split[0],
Key=split[1])['Body']
while True:
object_trunk = s3_object.read(self.max_segment_size)
if len(object_trunk) == 0:
break
obj_fd.write(object_trunk)
def add_stream(self, stream, package_name, headers=None):
split = package_name.rsplit('/', 1)
backup_basedir = package_name \
if self.get_object_prefix() == '' \
else "{0}/{1}".format(self.get_object_prefix(), package_name)
backup_basepath = "{0}/{1}".format(backup_basedir, split[0])
backup_meta_data = "{0}/metadata".format(backup_basedir)
self.upload_stream(backup_basepath, stream)
self.get_s3_connection().put_object(
Bucket=self.get_bucket_name(),
Body=json.dumps(headers),
Key=backup_meta_data
)
def upload_stream(self, backup_basepath, stream):
upload_id = self.get_s3_connection().create_multipart_upload(
Bucket=self.get_bucket_name(),
Key=backup_basepath
)['UploadId']
upload_part_index = 1
uploaded_parts = []
try:
for el in stream:
response = self.get_s3_connection().upload_part(
Body=el,
Bucket=self.get_bucket_name(),
Key=backup_basepath,
PartNumber=upload_part_index,
UploadId=upload_id
)
uploaded_parts.append({
'PartNumber': upload_part_index,
'ETag': response['ETag']
})
upload_part_index += 1
# Complete the upload, which requires info on all of the parts
part_info = {
'Parts': uploaded_parts
}
if not uploaded_parts:
# currently, volume-booted instances are not supported
LOG.error(
"No part uploaded (volume-booted instances are not supported)"
)
raise RuntimeError(
'No part uploaded (volume-booted instances are not supported)'
)
self.get_s3_connection().complete_multipart_upload(
Bucket=self.get_bucket_name(),
Key=backup_basepath,
MultipartUpload=part_info,
UploadId=upload_id
)
except Exception as e:
LOG.error("Upload stream to S3 error, so abort it. "
"Exception: {0}".format(e))
self.get_s3_connection().abort_multipart_upload(
Bucket=self.get_bucket_name(),
Key=backup_basepath,
UploadId=upload_id
)
def backup_blocks(self, backup):
"""
:param backup:
:type backup: freezer.storage.base.Backup
:return:
"""
split = backup.data_path.split('/', 1)
s3_object = self.get_s3_connection().get_object(
Bucket=split[0],
Key=split[1]
)
return utils.S3ResponseStream(
data=s3_object['Body'],
chunk_size=self.max_segment_size
)
def write_backup(self, rich_queue, backup):
"""
Upload object to the remote S3 compatible storage server
:type rich_queue: freezer.streaming.RichQueue
:type backup: freezer.storage.base.Backup
"""
backup = backup.copy(storage=self)
path = backup.data_path.split('/', 1)[1]
self.upload_stream(path, rich_queue.get_messages())
def listdir(self, path):
"""
:type path: str
:param path:
:rtype: collections.Iterable[str]
"""
try:
files = set()
split = path.split('/', 1)
all_s3_objects = self.list_all_objects(
bucket_name=split[0],
prefix=split[1]
)
for s3_object in all_s3_objects:
files.add(s3_object['Key'][len(split[1]):].split('/', 2)[1])
return files
except Exception as e:
LOG.error(e)
return []
def create_dirs(self, folder_list):
pass
def get_bucket_name(self):
return self._bucket_name
def get_object_prefix(self):
return self._object_prefix
def get_object(self, bucket_name, key):
return self.get_s3_connection().get_object(
Bucket=bucket_name,
Key=key
)
def list_all_objects(self, bucket_name, prefix):
all_objects = []
is_truncated = True
marker = ''
while is_truncated:
response = self.get_s3_connection().list_objects(
Bucket=bucket_name,
Marker=marker,
Prefix=prefix
)
if 'Contents' not in response:
break
all_objects.extend(response['Contents'])
is_truncated = response['IsTruncated']
return all_objects
def put_object(self, bucket_name, key, data):
self.get_s3_connection().put_object(
Bucket=bucket_name,
Key=key,
Body=data
)
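
For illustration, a minimal usage sketch of the class above (the
credentials, endpoint, bucket name and segment size are placeholder
assumptions; freezer itself builds this object through
storage_from_dict rather than instantiating it directly):

from freezer.storage import s3

# All values below are placeholders for illustration only.
storage = s3.S3Storage(
    access_key='AKIAEXAMPLE',
    secret_key='example-secret',
    endpoint='http://192.168.0.10:7480',
    container='mybucket/freezer_backups',  # "<bucket>/<object prefix>"
    max_segment_size=32 * 1024 * 1024      # multipart upload part size
)
# Unless skip_prepare=True is passed, prepare() runs at construction
# and creates the bucket when it does not exist yet.
storage.put_file('/tmp/example.tar.gz',
                 'mybucket/freezer_backups/example.tar.gz')
data = storage.get_object(
    bucket_name=storage.get_bucket_name(),
    key='freezer_backups/example.tar.gz'
)['Body'].read()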


@@ -176,3 +176,52 @@ class TestNovaEngineFSLikeStorage(TestNovaEngine):
'rb')
self.mock_file.readline.assert_called_once_with()
self.engine.restore.assert_has_calls(self.expected_restore_calls)
class TestNovaEngineS3Storage(TestNovaEngine):
def setUp(self):
super(TestNovaEngineS3Storage, self).setUp()
self.mock_s3_storage = mock.MagicMock()
self.mock_s3_storage.get_object()['Body'].read.return_value \
= self.instance_ids_str
self.mock_s3_storage.put_object = mock.MagicMock()
self.mock_s3_storage.storage_path = 'test/storage/path'
self.mock_s3_storage._type = 's3'
self.engine = nova.NovaEngine(self.mock_s3_storage)
self.engine.client = self.backup_opt.client_manager
self.engine.backup = mock.Mock()
self.engine.restore = mock.Mock()
self.engine.nova = self.mock_nova
def test_backup_nova_tenant_to_s3_storage(self):
self.engine.backup_nova_tenant(self.project_id,
self.backup_opt.backup_name,
self.backup_opt.no_incremental,
self.backup_opt.max_level,
self.backup_opt.always_level,
self.backup_opt.restart_always_level)
self.mock_nova.servers.list.assert_called_once_with(detailed=False)
self.mock_s3_storage.put_object.assert_called_with(
bucket_name=self.mock_s3_storage.get_bucket_name(),
key="{0}/project_test-project-id".format(
self.mock_s3_storage.get_object_prefix()),
data=self.instance_ids_str
)
self.engine.backup.assert_has_calls(self.expected_backup_calls)
def test_restore_nova_tenant_from_s3_storage(self):
self.engine.restore_nova_tenant(self.project_id,
self.backup_opt.backup_name,
self.backup_opt.overwrite,
'test_timestamp')
self.mock_s3_storage.get_object.assert_called_with(
bucket_name=self.mock_s3_storage.get_bucket_name(),
key="{0}/project_test-project-id".format(
self.mock_s3_storage.get_object_prefix()
)
)
self.engine.restore.assert_has_calls(self.expected_restore_calls)


@@ -19,6 +19,7 @@ Freezer general utils functions
import datetime
import errno
import fnmatch as fn
import logging
import os
import subprocess
import sys
@@ -30,6 +31,8 @@ from functools import wraps
from oslo_log import log
from six.moves import configparser
logging.getLogger('botocore').setLevel(logging.WARNING)
LOG = log.getLogger(__name__)
@@ -210,6 +213,36 @@ class Bunch(object):
return self.__dict__.get(item)
class S3ResponseStream(object):
"""
Readable and iterable object body response wrapper.
"""
def __init__(self, data, chunk_size):
"""
Wrap the underlying response
:param data: the response to wrap
:param chunk_size: number of bytes to return each iteration/next call
"""
self.data = data
self.chunk_size = chunk_size
def read(self, length=None):
return self.data.read(length)
def __iter__(self):
return self
def next(self):
buf = self.read(self.chunk_size)
if not buf:
raise StopIteration()
return buf
def __next__(self):
return self.next()
class ReSizeStream(object):
"""
Iterator/File-like object for changing size of chunk in stream

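For context, a small sketch of how S3ResponseStream is meant to be
consumed (the import path, client settings, bucket and key are
placeholder assumptions):

import botocore.session

from freezer.utils.utils import S3ResponseStream  # assumed module path

# Placeholders only: a botocore client pointed at an S3 compatible endpoint.
client = botocore.session.get_session().create_client(
    's3',
    aws_access_key_id='AKIAEXAMPLE',
    aws_secret_access_key='example-secret',
    endpoint_url='http://192.168.0.10:7480'
)
body = client.get_object(Bucket='mybucket', Key='backups/example')['Body']
stream = S3ResponseStream(data=body, chunk_size=8 * 1024 * 1024)
with open('/tmp/restored.tar.gz', 'wb') as out:
    for chunk in stream:  # each iteration yields at most chunk_size bytes
        out.write(chunk)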

@@ -0,0 +1,12 @@
---
prelude: >
Currently, freezer can only store backup data in Swift compatible object
storage (apart from the local and ssh drivers), so support for additional
storage drivers is needed. S3 compatible object storage is a good
candidate, as it is widely used by individuals and companies in public
and private clouds.
features:
- |
Added a new storage type called 's3', which allows storing backup data
in S3 compatible object storage and restoring from it using the
botocore library.


@@ -1,6 +1,7 @@
# The order of packages is significant, because pip processes them in the order
# of appearance. Changing the order has an impact on the overall integration
# process, which may cause wedges in the gate later.
botocore>=1.0.0 # Apache-2.0
setuptools!=24.0.0,!=34.0.0,!=34.0.1,!=34.0.2,!=34.0.3,!=34.1.0,!=34.1.1,!=34.2.0,!=34.3.0,!=34.3.1,!=34.3.2,>=16.0 # PSF/ZPL
pbr!=2.1.0,>=2.0.0 # Apache-2.0
python-swiftclient>=3.2.0 # Apache-2.0