Increase s3 bank plugin
Change-Id: I3ab61c9c6d22187d95205ef8235fe078d5d1592d Implements: blueprint s3-bank-plugin
This commit is contained in:
parent
90f05b7c70
commit
323d17b6fb
21
etc/providers.d/openstack-s3-bank.conf
Normal file
21
etc/providers.d/openstack-s3-bank.conf
Normal file
@ -0,0 +1,21 @@
|
||||
[provider]
|
||||
name = OS Infra Provider with S3 compatible storage bank
|
||||
description = This provider uses S3 compatible storage as the bank of karbor
|
||||
id = c8e52e4d-0479-43e0-b1a1-318c86798cb8
|
||||
|
||||
plugin=karbor-volume-protection-plugin
|
||||
plugin=karbor-image-protection-plugin
|
||||
plugin=karbor-server-protection-plugin
|
||||
bank=karbor-s3-bank-plugin
|
||||
|
||||
[s3_client]
|
||||
s3_endpoint=http://127.0.0.1:7480
|
||||
s3_access_key=demo
|
||||
s3_secret_key=password
|
||||
|
||||
[s3_bank_plugin]
|
||||
lease_expire_window=600
|
||||
lease_renew_window=120
|
||||
lease_validity_window=100
|
||||
bank_s3_object_bucket=karbor
|
||||
bank_s3_lease_bucket=lease
|
@ -365,6 +365,10 @@ class CreateContainerFailed(KarborException):
|
||||
message = _("Create Container in Bank Failed: %(reason)s")
|
||||
|
||||
|
||||
class CreateBucketFailed(KarborException):
|
||||
message = _("Create Bucket in Bank Failed: %(reason)s")
|
||||
|
||||
|
||||
class TriggerIsInvalid(Invalid):
|
||||
message = _("Trigger%(trigger_id)s is invalid.")
|
||||
|
||||
|
236
karbor/services/protection/bank_plugins/s3_bank_plugin.py
Normal file
236
karbor/services/protection/bank_plugins/s3_bank_plugin.py
Normal file
@ -0,0 +1,236 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging as log
|
||||
import math
|
||||
import time
|
||||
|
||||
from botocore.exceptions import ClientError
|
||||
from karbor import exception
|
||||
from karbor.i18n import _
|
||||
from karbor.services.protection.bank_plugin import BankPlugin
|
||||
from karbor.services.protection.bank_plugin import LeasePlugin
|
||||
from karbor.services.protection import client_factory
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
from oslo_serialization import jsonutils
|
||||
from oslo_service import loopingcall
|
||||
from oslo_utils import uuidutils
|
||||
|
||||
s3_bank_plugin_opts = [
|
||||
cfg.StrOpt('bank_s3_object_bucket',
|
||||
default='karbor',
|
||||
help='The default s3 object bucket to use.'),
|
||||
cfg.StrOpt('bank_s3_lease_bucket',
|
||||
default='lease',
|
||||
help='The default s3 lease bucket to use.'),
|
||||
]
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
log.getLogger('botocore').setLevel(log.WARNING)
|
||||
|
||||
lease_opt = [cfg.IntOpt('lease_expire_window',
|
||||
default=600,
|
||||
help='expired_window for bank lease, in seconds'),
|
||||
cfg.IntOpt('lease_renew_window',
|
||||
default=120,
|
||||
help='period for bank lease, in seconds, '
|
||||
'between bank lease client renew the lease'),
|
||||
cfg.IntOpt('lease_validity_window',
|
||||
default=100,
|
||||
help='validity_window for bank lease, in seconds'), ]
|
||||
|
||||
|
||||
class S3ConnectionFailed(exception.KarborException):
|
||||
message = _("Connection to s3 failed: %(reason)s")
|
||||
|
||||
|
||||
class S3BankPlugin(BankPlugin, LeasePlugin):
|
||||
"""S3 bank plugin"""
|
||||
def __init__(self, config, context=None):
|
||||
super(S3BankPlugin, self).__init__(config)
|
||||
self._config.register_opts(s3_bank_plugin_opts,
|
||||
"s3_bank_plugin")
|
||||
self._config.register_opts(lease_opt,
|
||||
"s3_bank_plugin")
|
||||
plugin_cfg = self._config.s3_bank_plugin
|
||||
self.bank_object_bucket = plugin_cfg.bank_s3_object_bucket
|
||||
self.lease_expire_window = plugin_cfg.lease_expire_window
|
||||
self.lease_renew_window = plugin_cfg.lease_renew_window
|
||||
self.context = context
|
||||
self.lease_validity_window = plugin_cfg.lease_validity_window
|
||||
|
||||
self.owner_id = uuidutils.generate_uuid()
|
||||
self.lease_expire_time = 0
|
||||
self.bank_leases_bucket = plugin_cfg.bank_s3_lease_bucket
|
||||
self._connection = None
|
||||
|
||||
def _setup_connection(self):
|
||||
return client_factory.ClientFactory.create_client(
|
||||
's3',
|
||||
self.context,
|
||||
self._config
|
||||
)
|
||||
|
||||
@property
|
||||
def connection(self):
|
||||
if not self._connection:
|
||||
_connection = self._setup_connection()
|
||||
# create bucket
|
||||
try:
|
||||
_connection.create_bucket(Bucket=self.bank_object_bucket)
|
||||
_connection.create_bucket(Bucket=self.bank_leases_bucket)
|
||||
except S3ConnectionFailed as err:
|
||||
LOG.error("bank plugin create bucket failed.")
|
||||
raise exception.CreateBucketrFailed(reason=err)
|
||||
self._connection = _connection
|
||||
|
||||
# acquire lease
|
||||
try:
|
||||
self.acquire_lease()
|
||||
except exception.AcquireLeaseFailed:
|
||||
LOG.error("bank plugin acquire lease failed.")
|
||||
raise
|
||||
|
||||
# start renew lease
|
||||
renew_lease_loop = loopingcall.FixedIntervalLoopingCall(
|
||||
self.renew_lease
|
||||
)
|
||||
renew_lease_loop.start(
|
||||
interval=self.lease_renew_window,
|
||||
initial_delay=self.lease_renew_window
|
||||
)
|
||||
return self._connection
|
||||
|
||||
def get_owner_id(self):
|
||||
return self.owner_id
|
||||
|
||||
def update_object(self, key, value):
|
||||
serialized = False
|
||||
try:
|
||||
if not isinstance(value, str):
|
||||
value = jsonutils.dumps(value)
|
||||
serialized = True
|
||||
self._put_object(bucket=self.bank_object_bucket,
|
||||
obj=key,
|
||||
contents=value,
|
||||
headers={
|
||||
'x-object-meta-serialized': str(serialized)
|
||||
})
|
||||
except S3ConnectionFailed as err:
|
||||
LOG.error("update object failed, err: %s.", err)
|
||||
raise exception.BankUpdateObjectFailed(reason=err, key=key)
|
||||
|
||||
def delete_object(self, key):
|
||||
try:
|
||||
self._delete_object(bucket=self.bank_object_bucket,
|
||||
obj=key)
|
||||
except S3ConnectionFailed as err:
|
||||
LOG.error("delete object failed, err: %s.", err)
|
||||
raise exception.BankDeleteObjectFailed(reason=err, key=key)
|
||||
|
||||
def get_object(self, key):
|
||||
try:
|
||||
return self._get_object(bucket=self.bank_object_bucket,
|
||||
obj=key)
|
||||
except S3ConnectionFailed as err:
|
||||
LOG.error("get object failed, err: %s.", err)
|
||||
raise exception.BankGetObjectFailed(reason=err, key=key)
|
||||
|
||||
def list_objects(self, prefix=None, limit=None, marker=None,
|
||||
sort_dir=None):
|
||||
try:
|
||||
response = self._get_bucket(
|
||||
bucket=self.bank_object_bucket,
|
||||
prefix=prefix,
|
||||
limit=limit,
|
||||
marker=marker
|
||||
)
|
||||
if 'Contents' not in response:
|
||||
return []
|
||||
else:
|
||||
return [obj['Key'] for obj in response['Contents']]
|
||||
except S3ConnectionFailed as err:
|
||||
LOG.error("list objects failed, err: %s.", err)
|
||||
raise exception.BankListObjectsFailed(reason=err)
|
||||
|
||||
def acquire_lease(self):
|
||||
bucket = self.bank_leases_bucket
|
||||
obj = self.owner_id
|
||||
contents = self.owner_id
|
||||
self.lease_expire_time = \
|
||||
math.floor(time.time()) + self.lease_expire_window
|
||||
headers = {'lease-expire-time': str(self.lease_expire_time)}
|
||||
try:
|
||||
self._put_object(bucket=bucket,
|
||||
obj=obj,
|
||||
contents=contents,
|
||||
headers=headers)
|
||||
except S3ConnectionFailed as err:
|
||||
LOG.error("acquire lease failed, err:%s.", err)
|
||||
raise exception.AcquireLeaseFailed(reason=err)
|
||||
|
||||
def renew_lease(self):
|
||||
self.acquire_lease()
|
||||
|
||||
def check_lease_validity(self):
|
||||
if (self.lease_expire_time - math.floor(time.time()) >=
|
||||
self.lease_validity_window):
|
||||
return True
|
||||
else:
|
||||
self._delete_object(
|
||||
bucket=self.bank_leases_bucket,
|
||||
obj=self.owner_id
|
||||
)
|
||||
return False
|
||||
|
||||
def _put_object(self, bucket, obj, contents, headers=None):
|
||||
try:
|
||||
self.connection.put_object(
|
||||
Bucket=bucket,
|
||||
Key=obj,
|
||||
Body=contents,
|
||||
Metadata=headers
|
||||
)
|
||||
except ClientError as err:
|
||||
raise S3ConnectionFailed(reason=err)
|
||||
|
||||
def _get_object(self, bucket, obj):
|
||||
try:
|
||||
response = self.connection.get_object(Bucket=bucket, Key=obj)
|
||||
body = response['Body'].read()
|
||||
if response['Metadata']["x-object-meta-serialized"]\
|
||||
.lower() == "true":
|
||||
body = jsonutils.loads(body)
|
||||
return body
|
||||
except ClientError as err:
|
||||
raise S3ConnectionFailed(reason=err)
|
||||
|
||||
def _delete_object(self, bucket, obj):
|
||||
try:
|
||||
self.connection.delete_object(Bucket=bucket,
|
||||
Key=obj)
|
||||
except ClientError as err:
|
||||
raise S3ConnectionFailed(reason=err)
|
||||
|
||||
def _get_bucket(self, bucket, prefix=None, limit=None, marker=None):
|
||||
try:
|
||||
prefix = '' if prefix is None else prefix
|
||||
marker = '' if marker is None else marker
|
||||
response = self.connection.list_objects(
|
||||
Bucket=bucket,
|
||||
Prefix=prefix,
|
||||
MaxKeys=limit,
|
||||
Marker=marker)
|
||||
return response
|
||||
except ClientError as err:
|
||||
raise S3ConnectionFailed(reason=err)
|
53
karbor/services/protection/clients/s3.py
Normal file
53
karbor/services/protection/clients/s3.py
Normal file
@ -0,0 +1,53 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import botocore
|
||||
import botocore.session
|
||||
import logging
|
||||
from oslo_config import cfg
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
SERVICE = 's3'
|
||||
s3_client_opts = [
|
||||
cfg.StrOpt(SERVICE + '_endpoint',
|
||||
help='URL of the S3 compatible storage endpoint.'),
|
||||
cfg.StrOpt(SERVICE + '_access_key',
|
||||
help='Access key for S3 compatible storage.'),
|
||||
cfg.StrOpt(SERVICE + '_secret_key',
|
||||
secret=True,
|
||||
help='Secret key for S3 compatible storage.'),
|
||||
cfg.IntOpt(SERVICE + '_retry_attempts',
|
||||
default=3,
|
||||
help='The number of retries to make for '
|
||||
'S3 operations'),
|
||||
cfg.IntOpt(SERVICE + '_retry_backoff',
|
||||
default=2,
|
||||
help='The backoff time in seconds '
|
||||
'between S3 retries')
|
||||
]
|
||||
|
||||
|
||||
def register_opts(conf):
|
||||
conf.register_opts(s3_client_opts, group=SERVICE + '_client')
|
||||
|
||||
|
||||
def create(context, conf, **kwargs):
|
||||
register_opts(conf)
|
||||
|
||||
client_config = conf.s3_client
|
||||
LOG.info('Creating s3 client with url %s.',
|
||||
client_config.s3_endpoint)
|
||||
return botocore.session.get_session().create_client(
|
||||
's3',
|
||||
aws_access_key_id=client_config.s3_access_key,
|
||||
aws_secret_access_key=client_config.s3_ecret_key,
|
||||
endpoint_url=client_config.s3_endpoint
|
||||
)
|
79
karbor/tests/unit/protection/fake_s3_client.py
Normal file
79
karbor/tests/unit/protection/fake_s3_client.py
Normal file
@ -0,0 +1,79 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from botocore.exceptions import ClientError
|
||||
|
||||
|
||||
class FakeS3Client(object):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(FakeS3Client, self).__init__()
|
||||
|
||||
@classmethod
|
||||
def connection(cls, *args, **kargs):
|
||||
return FakeS3Connection()
|
||||
|
||||
|
||||
class FakeS3Connection(object):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(FakeS3Connection, self).__init__()
|
||||
self.s3_dir = {}
|
||||
self.object_headers = {}
|
||||
|
||||
def create_bucket(self, Bucket):
|
||||
self.s3_dir[Bucket] = {
|
||||
'Keys': {}
|
||||
}
|
||||
|
||||
def list_objects(self, Bucket, Prefix, MaxKeys, Marker):
|
||||
body = []
|
||||
prefix = '' if not Prefix else Prefix
|
||||
for obj in self.s3_dir[Bucket]['Keys'].keys():
|
||||
if obj.startswith(prefix):
|
||||
body.append({
|
||||
'Key': obj
|
||||
})
|
||||
return {'Contents': body}
|
||||
|
||||
def put_object(self, Bucket, Key, Body, Metadata=None):
|
||||
if Bucket in self.s3_dir.keys():
|
||||
self.s3_dir[Bucket]['Keys'][Key] = {
|
||||
'Body': FakeS3Stream(Body),
|
||||
'Metadata': Metadata if Metadata else {}
|
||||
}
|
||||
else:
|
||||
raise ClientError("error_bucket")
|
||||
|
||||
def get_object(self, Bucket, Key):
|
||||
if Bucket in self.s3_dir.keys():
|
||||
if Key in self.s3_dir[Bucket]['Keys'].keys():
|
||||
return self.s3_dir[Bucket]['Keys'][Key]
|
||||
else:
|
||||
raise ClientError("error_object")
|
||||
else:
|
||||
raise ClientError("error_bucket")
|
||||
|
||||
def delete_object(self, Bucket, Key):
|
||||
if Bucket in self.s3_dir.keys():
|
||||
if Key in self.s3_dir[Bucket]['Keys'].keys():
|
||||
del self.s3_dir[Bucket]['Keys'][Key]
|
||||
else:
|
||||
raise ClientError("error_object")
|
||||
else:
|
||||
raise ClientError("error_bucket")
|
||||
|
||||
|
||||
class FakeS3Stream(object):
|
||||
def __init__(self, data):
|
||||
self.data = data
|
||||
|
||||
def read(self):
|
||||
return self.data
|
97
karbor/tests/unit/protection/test_s3_bank_plugin.py
Normal file
97
karbor/tests/unit/protection/test_s3_bank_plugin.py
Normal file
@ -0,0 +1,97 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from karbor.services.protection.clients import s3
|
||||
from karbor.tests import base
|
||||
from karbor.tests.unit.protection.fake_s3_client import FakeS3Client
|
||||
import math
|
||||
import mock
|
||||
from oslo_config import cfg
|
||||
from oslo_utils import importutils
|
||||
import time
|
||||
|
||||
CONF = cfg.CONF
|
||||
|
||||
|
||||
class FakeConf(object):
|
||||
def __init__(self):
|
||||
super(FakeConf, self).__init__()
|
||||
self.lease_expire_window = 600
|
||||
self.lease_renew_window = 120
|
||||
self.lease_validity_window = 100
|
||||
|
||||
|
||||
class S3BankPluginTest(base.TestCase):
|
||||
def setUp(self):
|
||||
super(S3BankPluginTest, self).setUp()
|
||||
self.conf = FakeConf()
|
||||
self.fake_connection = FakeS3Client.connection()
|
||||
s3.create = mock.MagicMock()
|
||||
s3.create.return_value = self.fake_connection
|
||||
import_str = (
|
||||
"karbor.services.protection.bank_plugins."
|
||||
"s3_bank_plugin.S3BankPlugin"
|
||||
)
|
||||
self.object_bucket = "objects"
|
||||
s3_bank_plugin_cls = importutils.import_class(
|
||||
import_str=import_str)
|
||||
|
||||
self.s3_bank_plugin = s3_bank_plugin_cls(CONF, None)
|
||||
|
||||
def test_acquire_lease(self):
|
||||
self.s3_bank_plugin.acquire_lease()
|
||||
expire_time = math.floor(time.time()) + self.conf.lease_expire_window
|
||||
self.assertEqual(self.s3_bank_plugin.lease_expire_time, expire_time)
|
||||
|
||||
def test_renew_lease(self):
|
||||
self.s3_bank_plugin.acquire_lease()
|
||||
expire_time = math.floor(time.time()) + self.conf.lease_expire_window
|
||||
self.assertEqual(self.s3_bank_plugin.lease_expire_time, expire_time)
|
||||
time.sleep(5)
|
||||
self.s3_bank_plugin.acquire_lease()
|
||||
expire_time = math.floor(time.time()) + self.conf.lease_expire_window
|
||||
self.assertEqual(self.s3_bank_plugin.lease_expire_time, expire_time)
|
||||
|
||||
def test_check_lease_validity(self):
|
||||
self.s3_bank_plugin.acquire_lease()
|
||||
expire_time = math.floor(time.time()) + self.conf.lease_expire_window
|
||||
self.assertEqual(self.s3_bank_plugin.lease_expire_time, expire_time)
|
||||
is_valid = self.s3_bank_plugin.check_lease_validity()
|
||||
self.assertEqual(is_valid, True)
|
||||
|
||||
def test_delete_object(self):
|
||||
self.s3_bank_plugin.update_object("key", "value")
|
||||
self.s3_bank_plugin.delete_object("key")
|
||||
object_list = self.s3_bank_plugin.list_objects()
|
||||
self.assertEqual('key' in object_list, False)
|
||||
|
||||
def test_get_object(self):
|
||||
self.s3_bank_plugin.update_object("key", "value")
|
||||
value = self.s3_bank_plugin.get_object("key")
|
||||
self.assertEqual(value, "value")
|
||||
|
||||
def test_list_objects(self):
|
||||
self.s3_bank_plugin.update_object("key-1", "value-1")
|
||||
self.s3_bank_plugin.update_object("key-2", "value-2")
|
||||
objects = self.s3_bank_plugin.list_objects(prefix=None)
|
||||
self.assertEqual(len(objects), 2)
|
||||
|
||||
def test_update_object(self):
|
||||
self.s3_bank_plugin.update_object("key-1", "value-1")
|
||||
self.s3_bank_plugin.update_object("key-1", "value-2")
|
||||
contents = self.s3_bank_plugin.get_object('key-1')
|
||||
self.assertEqual(contents, "value-2")
|
||||
|
||||
def test_create_get_dict_object(self):
|
||||
self.s3_bank_plugin.update_object("dict_object", {"key": "value"})
|
||||
value = self.s3_bank_plugin.get_object("dict_object")
|
||||
self.assertEqual(value, {"key": "value"})
|
4
releasenotes/notes/s3-bank-plugin-b55ca44739d492b0.yaml
Executable file
4
releasenotes/notes/s3-bank-plugin-b55ca44739d492b0.yaml
Executable file
@ -0,0 +1,4 @@
|
||||
---
|
||||
features:
|
||||
- |
|
||||
Add support for using S3 comptatible storage as bank plugin.
|
@ -2,6 +2,7 @@
|
||||
# of appearance. Changing the order has an impact on the overall integration
|
||||
# process, which may cause wedges in the gate later.
|
||||
|
||||
botocore>=1.0.0 # Apache-2.0
|
||||
pbr!=2.1.0,>=2.0.0 # Apache-2.0
|
||||
Babel!=2.4.0,>=2.3.4 # BSD
|
||||
croniter>=0.3.4 # MIT License
|
||||
|
@ -44,6 +44,7 @@ karbor.database.migration_backend =
|
||||
karbor.protections =
|
||||
karbor-swift-bank-plugin = karbor.services.protection.bank_plugins.swift_bank_plugin:SwiftBankPlugin
|
||||
karbor-fs-bank-plugin = karbor.services.protection.bank_plugins.file_system_bank_plugin:FileSystemBankPlugin
|
||||
karbor-s3-bank-plugin = karbor.services.protection.bank_plugins.s3_bank_plugin:S3BankPlugin
|
||||
karbor-volume-protection-plugin = karbor.services.protection.protection_plugins.volume.cinder_protection_plugin:CinderBackupProtectionPlugin
|
||||
karbor-volume-snapshot-plugin = karbor.services.protection.protection_plugins.volume.volume_snapshot_plugin:VolumeSnapshotProtectionPlugin
|
||||
karbor-image-protection-plugin = karbor.services.protection.protection_plugins.image.image_protection_plugin:GlanceProtectionPlugin
|
||||
|
@ -3,7 +3,7 @@
|
||||
# process, which may cause wedges in the gate later.
|
||||
|
||||
hacking!=0.13.0,<0.14,>=0.12.0 # Apache-2.0
|
||||
|
||||
botocore>=1.0.0 # Apache-2.0
|
||||
coverage!=4.4,>=4.0 # Apache-2.0
|
||||
croniter>=0.3.4 # MIT License
|
||||
python-subunit>=0.0.18 # Apache-2.0/BSD
|
||||
|
Loading…
x
Reference in New Issue
Block a user