Add the pod with cinder volumes protection plugin

Change-Id: I254a1900b7c67cdf7f0f4779195123319771fd17
Implements: blueprint kubernetes-pods-protection-plugin
This commit is contained in:
chenying 2017-09-07 22:07:18 +08:00
parent 892f96c9a6
commit 34e99f82bc
6 changed files with 568 additions and 0 deletions

View File

@ -0,0 +1,21 @@
[provider]
name = OS Infra Provider
description = The provider about running the kubernetes cluster on OpenStack with OpenStack cloud provider.
id = e3982e71-f44d-4b09-8abd-3e53e4b80d10
plugin=karbor-volume-protection-plugin
plugin=karbor-pod-protection-plugin
bank=karbor-swift-bank-plugin
enabled=True
[swift_client]
swift_auth_url=http://127.0.0.1/identity
swift_user=demo
swift_key=password
swift_tenant_name=demo
[swift_bank_plugin]
lease_expire_window=120
lease_renew_window=100
lease_validity_window=100

View File

@ -0,0 +1,38 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
OPTIONS_SCHEMA = {
"title": "Pod Protection Options",
"type": "object",
"properties": {},
"required": []
}
RESTORE_SCHEMA = {
"title": "Pod Protection Restore",
"type": "object",
"properties": {
"restore_name": {
"type": "string",
"title": "Restore Pod Name",
"description": "The name of the restore pod",
},
},
"required": ["restore_name"]
}
SAVED_INFO_SCHEMA = {
"title": "Pod Protection Saved Info",
"type": "object",
"properties": {},
"required": []
}

View File

@ -0,0 +1,330 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from functools import partial
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import uuidutils
from karbor.common import constants
from karbor import exception
from karbor.services.protection.client_factory import ClientFactory
from karbor.services.protection import protection_plugin
from karbor.services.protection.protection_plugins.pod \
import pod_plugin_schemas
from karbor.services.protection.protection_plugins import utils
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
pod_backup_opts = [
cfg.IntOpt(
'poll_interval', default=15,
help='Poll interval for Pod backup status'
),
]
class ProtectOperation(protection_plugin.Operation):
def on_main(self, checkpoint, resource, context, parameters, **kwargs):
pod_id = resource.id
pod_name = resource.name
bank_section = checkpoint.get_resource_bank_section(pod_id)
k8s_client = ClientFactory.create_client("k8s", context)
resource_definition = {"resource_id": pod_id}
LOG.info("Creating pod backup, id: %(pod_id)s) name: "
"%(pod_name)s.", {"pod_id": pod_id, "pod_name": pod_name})
try:
bank_section.update_object("status",
constants.RESOURCE_STATUS_PROTECTING)
# get metadata about pod
pod_namespace, k8s_pod_name = pod_name.split(":")
pod = k8s_client.read_namespaced_pod(
k8s_pod_name, pod_namespace)
resource_definition["resource_name"] = pod_name
resource_definition["namespace"] = pod_namespace
mounted_volumes_list = self._get_mounted_volumes(
k8s_client, pod, pod_namespace)
containers_list = self._get_containers(pod)
# save all pod's metadata
pod_metadata = {
'apiVersion': pod.api_version,
'kind': pod.kind,
'metadata': {
'labels': pod.metadata.labels,
'name': pod.metadata.name,
'namespace': pod.metadata.namespace,
},
'spec': {
'containers': containers_list,
'volumes': mounted_volumes_list,
'restartPolicy': pod.spec.restart_policy
}
}
resource_definition["pod_metadata"] = pod_metadata
LOG.debug("Creating pod backup, pod_metadata: %s.",
pod_metadata)
bank_section.update_object("metadata", resource_definition)
bank_section.update_object("status",
constants.RESOURCE_STATUS_AVAILABLE)
LOG.info("Finish backup pod, pod_id: %s.", pod_id)
except Exception as err:
LOG.exception("Create pod backup failed, pod_id: %s.", pod_id)
bank_section.update_object("status",
constants.RESOURCE_STATUS_ERROR)
raise exception.CreateResourceFailed(
name="Pod Backup",
reason=err,
resource_id=pod_id,
resource_type=constants.POD_RESOURCE_TYPE)
def _get_mounted_volumes(self, k8s_client, pod, pod_namespace):
mounted_volumes_list = []
for volume in pod.spec.volumes:
volume_pvc = volume.persistent_volume_claim
volume_cinder = volume.cinder
volume_pvc_name = volume.name
if volume_pvc:
pvc_name = volume_pvc.claim_name
pvc = k8s_client.read_namespaced_persistent_volume_claim(
pvc_name, pod_namespace)
pv_name = pvc.spec.volume_name
if pv_name:
pv = k8s_client.read_persistent_volume(pv_name)
if pv.spec.cinder:
pod_cinder_volume = {
'name': volume_pvc_name,
'cinder': {
"volumeID": pv.spec.cinder.volume_id,
"fsType": pv.spec.cinder.fs_type,
"readOnly": pv.spec.cinder.read_only
}
}
mounted_volumes_list.append(pod_cinder_volume)
elif volume_cinder:
pod_cinder_volume = {
'name': volume_pvc_name,
'cinder': {
"volumeID": volume_cinder.volume_id,
"fsType": volume_cinder.fs_type,
"readOnly": volume_cinder.read_only
}
}
mounted_volumes_list.append(pod_cinder_volume)
return mounted_volumes_list
def _get_containers(self, pod):
containers_list = []
for spec_container in pod.spec.containers:
resources = (spec_container.resources.to_dict()
if spec_container.resources else None)
volume_mounts_list = []
if spec_container.volume_mounts:
for spec_volume_mount in spec_container.volume_mounts:
if 'serviceaccount' in spec_volume_mount.mount_path:
continue
volume_mount = {
'name': spec_volume_mount.name,
'mountPath': spec_volume_mount.mount_path,
'readOnly': spec_volume_mount.read_only,
}
volume_mounts_list.append(volume_mount)
container = {
'command': spec_container.command,
'image': spec_container.image,
'name': spec_container.name,
'resources': resources,
'volumeMounts': volume_mounts_list
}
containers_list.append(container)
return containers_list
class DeleteOperation(protection_plugin.Operation):
def on_main(self, checkpoint, resource, context, parameters, **kwargs):
resource_id = resource.id
bank_section = checkpoint.get_resource_bank_section(resource_id)
LOG.info("Deleting pod backup, pod_id: %s.", resource_id)
try:
bank_section.update_object("status",
constants.RESOURCE_STATUS_DELETING)
objects = bank_section.list_objects()
for obj in objects:
if obj == "status":
continue
bank_section.delete_object(obj)
bank_section.update_object("status",
constants.RESOURCE_STATUS_DELETED)
LOG.info("Finish delete pod, pod_id: %s.", resource_id)
except Exception as err:
LOG.error("Delete backup failed, pod_id: %s.", resource_id)
bank_section.update_object("status",
constants.RESOURCE_STATUS_ERROR)
raise exception.DeleteResourceFailed(
name="Pod Backup",
reason=err,
resource_id=resource_id,
resource_type=constants.POD_RESOURCE_TYPE)
class RestoreOperation(protection_plugin.Operation):
def __init__(self, poll_interval):
super(RestoreOperation, self).__init__()
self._interval = poll_interval
def on_complete(self, checkpoint, resource, context, parameters, **kwargs):
original_pod_id = resource.id
LOG.info("Restoring pod backup, pod_id: %s.", original_pod_id)
update_method = None
try:
resource_definition = checkpoint.get_resource_bank_section(
original_pod_id).get_object("metadata")
LOG.debug("Restoring pod backup, metadata: %s.",
resource_definition)
k8s_client = ClientFactory.create_client("k8s", context)
new_resources = kwargs.get("new_resources")
# restore pod
new_pod_name = self._restore_pod_instance(
k8s_client, new_resources, original_pod_id,
parameters.get(
"restore_name",
"karbor-restored-pod-%s" % uuidutils.generate_uuid()),
resource_definition)
update_method = partial(utils.update_resource_restore_result,
kwargs.get('restore'), resource.type,
new_pod_name)
update_method(constants.RESOURCE_STATUS_RESTORING)
pod_namespace = resource_definition["namespace"]
self._wait_pod_to_running(k8s_client, new_pod_name,
pod_namespace)
new_resources[original_pod_id] = new_pod_name
update_method(constants.RESOURCE_STATUS_AVAILABLE)
LOG.info("Finish restore pod, pod_id: %s.",
original_pod_id)
except Exception as e:
if update_method:
update_method(constants.RESOURCE_STATUS_ERROR, str(e))
LOG.exception("Restore pod backup failed, pod_id: %s.",
original_pod_id)
raise exception.RestoreResourceFailed(
name="Pod Backup",
reason=e,
resource_id=original_pod_id,
resource_type=constants.POD_RESOURCE_TYPE
)
def _restore_pod_instance(self, k8s_client, new_resources,
original_id, restore_name,
resource_definition):
pod_namespace = resource_definition["namespace"]
pod_metadata = resource_definition["pod_metadata"]
mounted_volumes_list = pod_metadata['spec'].get("volumes", None)
if mounted_volumes_list:
for mounted_volume in mounted_volumes_list:
cinder_volume = mounted_volume.get("cinder", None)
if cinder_volume:
original_volume_id = cinder_volume["volumeID"]
cinder_volume["volumeID"] = new_resources.get(
original_volume_id)
pod_metadata["metadata"]["name"] = restore_name
pod_manifest = pod_metadata
LOG.debug("Restoring pod instance, pod_manifest: %s.",
pod_manifest)
try:
pod = k8s_client.create_namespaced_pod(body=pod_manifest,
namespace=pod_namespace)
except Exception as ex:
LOG.error('Error creating pod (pod_id:%(pod_id)s): '
'%(reason)s', {'server_id': original_id, 'reason': ex})
raise
return pod.metadata.name
def _wait_pod_to_running(self, k8s_client, pod_name, pod_namespace):
def _get_pod_status():
try:
pod = k8s_client.read_namespaced_pod(name=pod_name,
namespace=pod_namespace)
return pod.status.phase
except Exception as ex:
LOG.error('Fetch pod(%(pod_name)s) failed, '
'reason: %(reason)s',
{'pod_name': pod_name,
'reason': ex})
return 'ERROR'
is_success = utils.status_poll(
_get_pod_status,
interval=self._interval,
success_statuses={'Running', },
failure_statuses={'ERROR', 'Failed', 'Unknown'},
ignore_statuses={'Pending'},
)
if not is_success:
raise Exception('The pod does not run successfully')
class PodProtectionPlugin(protection_plugin.ProtectionPlugin):
_SUPPORT_RESOURCE_TYPES = [constants.POD_RESOURCE_TYPE]
def __init__(self, config=None):
super(PodProtectionPlugin, self).__init__(config)
self._config.register_opts(pod_backup_opts,
'pod_backup_protection_plugin')
self._poll_interval = (
self._config.pod_backup_protection_plugin.poll_interval)
@classmethod
def get_supported_resources_types(cls):
return cls._SUPPORT_RESOURCE_TYPES
@classmethod
def get_options_schema(cls, resource_type):
return pod_plugin_schemas.OPTIONS_SCHEMA
@classmethod
def get_restore_schema(cls, resource_type):
return pod_plugin_schemas.RESTORE_SCHEMA
@classmethod
def get_saved_info_schema(cls, resource_type):
return pod_plugin_schemas.SAVED_INFO_SCHEMA
@classmethod
def get_saved_info(cls, metadata_store, resource):
pass
def get_protect_operation(self, resource):
return ProtectOperation()
def get_restore_operation(self, resource):
return RestoreOperation(self._poll_interval)
def get_delete_operation(self, resource):
return DeleteOperation()

View File

@ -0,0 +1,178 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from karbor.common import constants
from karbor.context import RequestContext
from karbor.resource import Resource
from karbor.services.protection.bank_plugin import Bank
from karbor.services.protection.bank_plugin import BankPlugin
from karbor.services.protection.bank_plugin import BankSection
from karbor.services.protection import client_factory
from karbor.services.protection.clients import k8s
from karbor.services.protection.protection_plugins. \
pod.pod_protection_plugin import PodProtectionPlugin
from karbor.services.protection.protection_plugins.pod \
import pod_plugin_schemas
from kubernetes.client.models.v1_object_meta import V1ObjectMeta
from kubernetes.client.models.v1_pod import V1Pod
from kubernetes.client.models.v1_pod_spec import V1PodSpec
from kubernetes.client.models.v1_pod_status import V1PodStatus
from karbor.tests import base
import mock
from oslo_config import cfg
from oslo_config import fixture
class FakeBankPlugin(BankPlugin):
def update_object(self, key, value):
return
def get_object(self, key):
return
def list_objects(self, prefix=None, limit=None, marker=None,
sort_dir=None):
return
def delete_object(self, key):
return
def get_owner_id(self):
return
fake_bank = Bank(FakeBankPlugin())
fake_bank_section = BankSection(bank=fake_bank, section="fake")
def call_hooks(operation, checkpoint, resource, context, parameters, **kwargs):
def noop(*args, **kwargs):
pass
hooks = (
'on_prepare_begin',
'on_prepare_finish',
'on_main',
'on_complete',
)
for hook_name in hooks:
hook = getattr(operation, hook_name, noop)
hook(checkpoint, resource, context, parameters, **kwargs)
class Checkpoint(object):
def __init__(self):
self.bank_section = fake_bank_section
def get_resource_bank_section(self, resource_id):
return self.bank_section
class PodProtectionPluginTest(base.TestCase):
def setUp(self):
super(PodProtectionPluginTest, self).setUp()
plugin_config = cfg.ConfigOpts()
plugin_config_fixture = self.useFixture(fixture.Config(plugin_config))
plugin_config_fixture.load_raw_values(
group='poll_interval',
poll_interval=0,
)
self.plugin = PodProtectionPlugin(plugin_config)
k8s.register_opts(cfg.CONF)
cfg.CONF.set_default('k8s_host',
'https://192.168.98.35:6443',
'k8s_client')
cfg.CONF.set_default('k8s_ssl_ca_cert',
'/etc/provider.d/server-ca.crt',
'k8s_client')
cfg.CONF.set_default('k8s_cert_file',
'/etc/provider.d/client-admin.crt',
'k8s_client')
cfg.CONF.set_default('k8s_key_file',
'/etc/provider.d/client-admin.key',
'k8s_client')
self.cntxt = RequestContext(user_id='demo',
project_id='abcd',
auth_token='efgh',
service_catalog=None)
self.k8s_client = client_factory.ClientFactory.create_client(
"k8s", self.cntxt)
self.checkpoint = Checkpoint()
def test_get_options_schema(self):
options_schema = self.plugin.get_options_schema(
constants.POD_RESOURCE_TYPE)
self.assertEqual(options_schema,
pod_plugin_schemas.OPTIONS_SCHEMA)
def test_get_restore_schema(self):
options_schema = self.plugin.get_restore_schema(
constants.POD_RESOURCE_TYPE)
self.assertEqual(options_schema,
pod_plugin_schemas.RESTORE_SCHEMA)
def test_get_saved_info_schema(self):
options_schema = self.plugin.get_saved_info_schema(
constants.POD_RESOURCE_TYPE)
self.assertEqual(options_schema,
pod_plugin_schemas.SAVED_INFO_SCHEMA)
@mock.patch('karbor.services.protection.clients.k8s.create')
def test_create_backup(self, mock_k8s_create):
resource = Resource(id="c88b92a8-e8b4-504c-bad4-343d92061871",
type=constants.POD_RESOURCE_TYPE,
name='default:busybox-test')
fake_bank_section.update_object = mock.MagicMock()
protect_operation = self.plugin.get_protect_operation(resource)
mock_k8s_create.return_value = self.k8s_client
self.k8s_client.read_namespaced_pod = mock.MagicMock()
self.k8s_client.read_namespaced_pod.return_value = V1Pod(
api_version="v1",
kind="Pod",
metadata=V1ObjectMeta(
name="busybox-test",
namespace="default",
uid="dd8236e1-8c6c-11e7-9b7a-fa163e18e097"),
spec=V1PodSpec(volumes=[], containers=[]),
status=V1PodStatus(phase="Running"))
fake_bank_section.update_object = mock.MagicMock()
call_hooks(protect_operation, self.checkpoint, resource, self.cntxt,
{})
def test_delete_backup(self):
resource = Resource(id="c88b92a8-e8b4-504c-bad4-343d92061871",
type=constants.POD_RESOURCE_TYPE,
name='default:busybox-test')
fake_bank_section.get_object = mock.MagicMock()
fake_bank_section.get_object.return_value = {
"pod_id": "1234"}
fake_bank_section.list_objects = mock.MagicMock()
fake_bank_section.list_objects.return_value = []
delete_operation = self.plugin.get_delete_operation(resource)
call_hooks(delete_operation, self.checkpoint, resource, self.cntxt,
{})
def test_get_supported_resources_types(self):
types = self.plugin.get_supported_resources_types()
self.assertEqual(types,
[constants.POD_RESOURCE_TYPE])

View File

@ -46,6 +46,7 @@ karbor.protections =
karbor-noop-protection-plugin = karbor.services.protection.protection_plugins.noop_plugin:NoopProtectionPlugin
karbor-network-protection-plugin = karbor.services.protection.protection_plugins.network.neutron_protection_plugin:NeutronProtectionPlugin
karbor-database-protection-plugin = karbor.services.protection.protection_plugins.database.database_backup_plugin:DatabaseBackupProtectionPlugin
karbor-pod-protection-plugin = karbor.services.protection.protection_plugins.pod.pod_protection_plugin:PodProtectionPlugin
karbor.provider =
provider-registry = karbor.services.protection.provider:ProviderRegistry
karbor.protectables =