Merge "plugins api impl"

This commit is contained in:
Jenkins 2016-07-06 11:08:27 +00:00 committed by Gerrit Code Review
commit 439a5a4200
13 changed files with 542 additions and 16 deletions

View File

@ -207,6 +207,14 @@ def plugins_get_version(plugin_name, version):
return u.render(api.get_plugin(plugin_name, version).wrapped_dict) return u.render(api.get_plugin(plugin_name, version).wrapped_dict)
@rest.patch('/plugins/<plugin_name>')
@acl.enforce("data-processing:plugins:patch")
@v.check_exists(api.get_plugin, plugin_name='plugin_name')
@v.validate(v_p.plugin_update_validation_jsonschema(), v_p.check_plugin_update)
def plugins_update(plugin_name, data):
return u.render(api.update_plugin(plugin_name, data).wrapped_dict)
@rest.post_file('/plugins/<plugin_name>/<version>/convert-config/<name>') @rest.post_file('/plugins/<plugin_name>/<version>/convert-config/<name>')
@acl.enforce("data-processing:plugins:convert_config") @acl.enforce("data-processing:plugins:convert_config")
@v.check_exists(api.get_plugin, plugin_name='plugin_name', version='version') @v.check_exists(api.get_plugin, plugin_name='plugin_name', version='version')

View File

@ -16,6 +16,7 @@
from sahara.api import acl from sahara.api import acl
from sahara.service.api.v2 import plugins as api from sahara.service.api.v2 import plugins as api
from sahara.service import validation as v from sahara.service import validation as v
from sahara.service.validations import plugins as v_p
import sahara.utils.api as u import sahara.utils.api as u
@ -40,3 +41,11 @@ def plugins_get(plugin_name):
@v.check_exists(api.get_plugin, plugin_name='plugin_name', version='version') @v.check_exists(api.get_plugin, plugin_name='plugin_name', version='version')
def plugins_get_version(plugin_name, version): def plugins_get_version(plugin_name, version):
return u.render(api.get_plugin(plugin_name, version).wrapped_dict) return u.render(api.get_plugin(plugin_name, version).wrapped_dict)
@rest.patch('/plugins/<plugin_name>')
@acl.enforce("data-processing:plugins:patch")
@v.check_exists(api.get_plugin, plugin_name='plugin_name')
@v.validate(v_p.plugin_update_validation_jsonschema(), v_p.check_plugin_update)
def plugins_update(plugin_name, data):
return u.render(api.update_plugin(plugin_name, data).wrapped_dict)

View File

@ -22,7 +22,7 @@ from oslo_log import log
from sahara import exceptions as ex from sahara import exceptions as ex
from sahara.i18n import _ from sahara.i18n import _
from sahara.plugins import base as plugins_base from sahara.plugins import opts as plugins_base
from sahara.service.castellan import config as castellan from sahara.service.castellan import config as castellan
from sahara.topology import topology_helper from sahara.topology import topology_helper
from sahara.utils.notification import sender from sahara.utils.notification import sender

View File

@ -20,23 +20,17 @@ from oslo_log import log as logging
import six import six
from stevedore import enabled from stevedore import enabled
from sahara import conductor as cond
from sahara import exceptions as ex from sahara import exceptions as ex
from sahara.i18n import _ from sahara.i18n import _
from sahara.i18n import _LI from sahara.i18n import _LI
from sahara.plugins import labels
from sahara.utils import resources from sahara.utils import resources
conductor = cond.API
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
opts = [
cfg.ListOpt('plugins',
default=['vanilla', 'spark', 'cdh', 'ambari'],
help='List of plugins to be loaded. Sahara preserves the '
'order of the list when returning it.'),
]
CONF = cfg.CONF CONF = cfg.CONF
CONF.register_opts(opts)
def required(fun): def required(fun):
@ -87,7 +81,9 @@ class PluginInterface(resources.BaseResource):
class PluginManager(object): class PluginManager(object):
def __init__(self): def __init__(self):
self.plugins = {} self.plugins = {}
self.default_label_schema = {}
self._load_cluster_plugins() self._load_cluster_plugins()
self.label_handler = labels.LabelHandler(self.plugins)
def _load_cluster_plugins(self): def _load_cluster_plugins(self):
config_plugins = CONF.plugins config_plugins = CONF.plugins
@ -134,6 +130,8 @@ class PluginManager(object):
plugin = self.get_plugin(plugin_name) plugin = self.get_plugin(plugin_name)
if plugin: if plugin:
res = plugin.as_resource() res = plugin.as_resource()
res._info.update(self.label_handler.get_label_full_details(
plugin_name))
if version: if version:
if version in plugin.get_versions(): if version in plugin.get_versions():
res._info.update(plugin.get_version_details(version)) res._info.update(plugin.get_version_details(version))
@ -141,6 +139,15 @@ class PluginManager(object):
return None return None
return res return res
def update_plugin(self, plugin_name, values):
self.label_handler.update_plugin(plugin_name, values)
return self.serialize_plugin(plugin_name)
def validate_plugin_update(self, plugin_name, values):
return self.label_handler.validate_plugin_update(plugin_name, values)
def get_plugin_update_validation_jsonschema(self):
return self.label_handler.get_plugin_update_validation_jsonschema()
PLUGINS = None PLUGINS = None

View File

@ -34,6 +34,17 @@ class FakePluginProvider(p.ProvisioningPluginBase):
def get_versions(self): def get_versions(self):
return ["0.1"] return ["0.1"]
def get_labels(self):
return {
'plugin_labels': {
'enabled': {'status': True},
'hidden': {'status': True},
},
'version_labels': {
'0.1': {'enabled': {'status': True}}
}
}
def get_node_processes(self, hadoop_version): def get_node_processes(self, hadoop_version):
return { return {
"HDFS": ["namenode", "datanode"], "HDFS": ["namenode", "datanode"],

194
sahara/plugins/labels.py Normal file
View File

@ -0,0 +1,194 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
from sahara import conductor as cond
from sahara import context
from sahara import exceptions as ex
from sahara.i18n import _
conductor = cond.API
STABLE = {
'name': 'stable',
'mutable': False,
'description': "Indicates that plugin or its version are stable to be used"
}
DEPRECATED = {
'name': 'deprecated',
'mutable': False,
'description': "Plugin or its version is deprecated and will be removed "
"in future releases. Please, consider to use another "
"plugin or its version."
}
ENABLED = {
'name': 'enabled',
'mutable': True,
'description': "Plugin or its version is enabled and can be used by user."
}
HIDDEN = {
'name': 'hidden',
'mutable': True,
'description': "Existence of plugin or its version is hidden, but "
"still can be used for cluster creation by CLI and "
"directly by client."
}
PLUGIN_LABELS_SCOPE = 'plugin_labels'
VERSION_LABELS_SCOPE = 'version_labels'
MUTABLE = 'mutable'
LABEL_OBJECT = {
'type': 'object',
'properties': {
'status': {
'type': 'boolean',
}
},
"additionalProperties": False,
}
class LabelHandler(object):
def __init__(self, loaded_plugins):
self.plugins = loaded_plugins
def get_plugin_update_validation_jsonschema(self):
schema = {
'type': 'object', "additionalProperties": False,
'properties': {
VERSION_LABELS_SCOPE: {
'type': 'object', 'additionalProperties': False,
},
},
}
ln = [label['name'] for label in self.get_labels()]
labels_descr_object = {
'type': 'object',
"properties": {name: copy.deepcopy(LABEL_OBJECT) for name in ln},
"additionalProperties": False
}
schema['properties'][PLUGIN_LABELS_SCOPE] = copy.deepcopy(
labels_descr_object)
all_versions = []
for plugin_name in self.plugins.keys():
plugin = self.plugins[plugin_name]
all_versions.extend(plugin.get_versions())
all_versions = set(all_versions)
schema['properties'][VERSION_LABELS_SCOPE]['properties'] = {
ver: copy.deepcopy(labels_descr_object) for ver in all_versions
}
return schema
def get_default_label_details(self, plugin_name):
plugin = self.plugins.get(plugin_name)
return plugin.get_labels()
def get_label_details(self, plugin_name):
plugin = conductor.plugin_get(context.ctx(), plugin_name)
if not plugin:
plugin = self.get_default_label_details(plugin_name)
# keep only tenant
fields = ['name', 'id', 'updated_at', 'created_at']
for field in fields:
if field in plugin:
del plugin[field]
return plugin
def get_label_full_details(self, plugin_name):
return self.expand_data(self.get_label_details(plugin_name))
def get_labels(self):
return [HIDDEN, STABLE, ENABLED, DEPRECATED]
def get_labels_map(self):
return {
label['name']: label for label in self.get_labels()
}
def expand_data(self, plugin):
plugin_labels = plugin.get(PLUGIN_LABELS_SCOPE)
labels_map = self.get_labels_map()
for key in plugin_labels.keys():
key_desc = labels_map.get(key)
plugin_labels[key].update(key_desc)
del plugin_labels[key]['name']
for version in plugin.get(VERSION_LABELS_SCOPE):
vers_labels = plugin.get(VERSION_LABELS_SCOPE).get(version)
for key in vers_labels.keys():
key_desc = labels_map.get(key)
vers_labels[key].update(key_desc)
del vers_labels[key]['name']
return plugin
def _validate_labels_update(self, default_data, update_values):
for label in update_values.keys():
if label not in default_data.keys():
raise ex.InvalidDataException(
_("Label '%s' can't be updated because it's not "
"available for plugin or its version") % label)
if not default_data[label][MUTABLE]:
raise ex.InvalidDataException(
_("Label '%s' can't be updated because it's not "
"mutable") % label)
def validate_plugin_update(self, plugin_name, values):
plugin = self.plugins[plugin_name]
# it's important to get full details since we have mutability
default = self.get_label_full_details(plugin_name)
if values.get(PLUGIN_LABELS_SCOPE):
pl = values.get(PLUGIN_LABELS_SCOPE)
self._validate_labels_update(default[PLUGIN_LABELS_SCOPE], pl)
if values.get(VERSION_LABELS_SCOPE):
vl = values.get(VERSION_LABELS_SCOPE)
for version in vl.keys():
if version not in plugin.get_versions():
raise ex.InvalidDataException(
_("Unknown plugin version '%(version)s' of "
"%(plugin)s") % {
'version': version, 'plugin': plugin_name})
self._validate_labels_update(
default[VERSION_LABELS_SCOPE][version], vl[version])
def update_plugin(self, plugin_name, values):
ctx = context.ctx()
current = self.get_label_details(plugin_name)
if not conductor.plugin_get(ctx, plugin_name):
current['name'] = plugin_name
conductor.plugin_create(ctx, current)
del current['name']
if values.get(PLUGIN_LABELS_SCOPE):
for label in values.get(PLUGIN_LABELS_SCOPE).keys():
current[PLUGIN_LABELS_SCOPE][label].update(
values.get(PLUGIN_LABELS_SCOPE).get(label))
else:
del current[PLUGIN_LABELS_SCOPE]
if values.get(VERSION_LABELS_SCOPE):
vl = values.get(VERSION_LABELS_SCOPE)
for version in vl.keys():
for label in vl.get(version).keys():
current[VERSION_LABELS_SCOPE][version][label].update(
vl[version][label])
else:
del current[VERSION_LABELS_SCOPE]
conductor.plugin_update(context.ctx(), plugin_name, current)

26
sahara/plugins/opts.py Normal file
View File

@ -0,0 +1,26 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# File contains plugins opts to avoid cyclic imports issue
from oslo_config import cfg
opts = [
cfg.ListOpt('plugins',
default=['vanilla', 'spark', 'cdh', 'ambari'],
help='List of plugins to be loaded. Sahara preserves the '
'order of the list when returning it.'),
]
CONF = cfg.CONF
CONF.register_opts(opts)

View File

@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
import copy
from sahara import exceptions as ex from sahara import exceptions as ex
from sahara.i18n import _ from sahara.i18n import _
@ -29,6 +30,17 @@ class ProvisioningPluginBase(plugins_base.PluginInterface):
def get_configs(self, hadoop_version): def get_configs(self, hadoop_version):
pass pass
@plugins_base.required_with_default
def get_labels(self):
versions = self.get_versions()
default = {'enabled': {'status': True}}
return {
'plugin_labels': copy.deepcopy(default),
'version_labels': {
version: copy.deepcopy(default) for version in versions
}
}
@plugins_base.required @plugins_base.required
def get_node_processes(self, hadoop_version): def get_node_processes(self, hadoop_version):
pass pass

View File

@ -225,6 +225,10 @@ def get_plugin(plugin_name, version=None):
return plugin_base.PLUGINS.serialize_plugin(plugin_name, version) return plugin_base.PLUGINS.serialize_plugin(plugin_name, version)
def update_plugin(plugin_name, values):
return plugin_base.PLUGINS.update_plugin(plugin_name, values)
def construct_ngs_for_scaling(cluster, additional_node_groups): def construct_ngs_for_scaling(cluster, additional_node_groups):
ctx = context.ctx() ctx = context.ctx()
additional = {} additional = {}

View File

@ -31,6 +31,10 @@ def get_plugin(plugin_name, version=None):
return plugin_base.PLUGINS.serialize_plugin(plugin_name, version) return plugin_base.PLUGINS.serialize_plugin(plugin_name, version)
def update_plugin(plugin_name, values):
return plugin_base.PLUGINS.update_plugin(plugin_name, values)
def construct_ngs_for_scaling(cluster, additional_node_groups): def construct_ngs_for_scaling(cluster, additional_node_groups):
ctx = context.ctx() ctx = context.ctx()
additional = {} additional = {}

View File

@ -15,9 +15,18 @@
import sahara.exceptions as ex import sahara.exceptions as ex
from sahara.i18n import _ from sahara.i18n import _
from sahara.plugins import base
def plugin_update_validation_jsonschema():
return base.PLUGINS.get_plugin_update_validation_jsonschema()
def check_convert_to_template(plugin_name, version, **kwargs): def check_convert_to_template(plugin_name, version, **kwargs):
raise ex.InvalidReferenceException( raise ex.InvalidReferenceException(
_("Requested plugin '%s' doesn't support converting config files " _("Requested plugin '%s' doesn't support converting config files "
"to cluster templates") % plugin_name) "to cluster templates") % plugin_name)
def check_plugin_update(plugin_name, values, **kwargs):
base.PLUGINS.validate_plugin_update(plugin_name, values)

View File

@ -0,0 +1,241 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import jsonschema.exceptions as json_exc
import testtools
from sahara import conductor as cond
from sahara import exceptions as ex
from sahara.plugins import base
from sahara.tests.unit import base as unit_base
from sahara.utils import api_validator
conductor = cond.API
EXPECTED_SCHEMA = {
"type": "object",
"additionalProperties": False,
"properties": {
"plugin_labels": {
"type": "object",
"additionalProperties": False,
"properties": {
"hidden": {
"type": "object",
"additionalProperties": False,
"properties": {
"status": {
"type": "boolean"
}
}
},
"stable": {
"type": "object",
"additionalProperties": False,
"properties": {
"status": {
"type": "boolean"
}
}
},
"enabled": {
"type": "object",
"additionalProperties": False,
"properties": {
"status": {
"type": "boolean"
}
}
},
"deprecated": {
"type": "object",
"additionalProperties": False,
"properties": {
"status": {
"type": "boolean"
}
}
}
}
},
"version_labels": {
"type": "object",
"additionalProperties": False,
"properties": {
"0.1": {
"type": "object",
"additionalProperties": False,
"properties": {
"hidden": {
"type": "object",
"additionalProperties": False,
"properties": {
"status": {
"type": "boolean"
}
}
},
"stable": {
"type": "object",
"additionalProperties": False,
"properties": {
"status": {
"type": "boolean"
}
}
},
"enabled": {
"type": "object",
"additionalProperties": False,
"properties": {
"status": {
"type": "boolean"
}
}
},
"deprecated": {
"type": "object",
"additionalProperties": False,
"properties": {
"status": {
"type": "boolean"
}
}
}
}
}
}
},
}
}
class TestPluginLabels(unit_base.SaharaWithDbTestCase):
def test_validate_default_labels_load(self):
all_plugins = ['cdh', 'ambari', 'fake', 'storm', 'mapr', 'spark',
'vanilla']
self.override_config('plugins', all_plugins)
manager = base.PluginManager()
for plugin in all_plugins:
data = manager.label_handler.get_label_details(plugin)
self.assertIsNotNone(data)
# order doesn't play a role
self.assertIsNotNone(data['plugin_labels'])
self.assertEqual(
sorted(list(manager.get_plugin(plugin).get_versions())),
sorted(list(data.get('version_labels').keys())))
def test_get_label_full_details(self):
self.override_config('plugins', ['fake'])
lh = base.PluginManager().label_handler
result = lh.get_label_full_details('fake')
self.assertIsNotNone(result.get('plugin_labels'))
self.assertIsNotNone(result.get('version_labels'))
pl = result.get('plugin_labels')
self.assertEqual(
['enabled', 'hidden'],
sorted(list(pl.keys()))
)
for lb in ['hidden', 'enabled']:
self.assertEqual(
['description', 'mutable', 'status'],
sorted(list(pl[lb]))
)
vl = result.get('version_labels')
self.assertEqual(['0.1'], list(vl.keys()))
vl = vl.get('0.1')
self.assertEqual(
['enabled'], list(vl.keys()))
self.assertEqual(
['description', 'mutable', 'status'],
sorted(list(vl['enabled']))
)
def test_validate_plugin_update(self):
def validate(plugin_name, values, validator, lh):
validator.validate(values)
lh.validate_plugin_update(plugin_name, values)
values = {'plugin_labels': {'enabled': {'status': False}}}
self.override_config('plugins', ['fake', 'spark'])
lh = base.PluginManager()
validator = api_validator.ApiValidator(
lh.get_plugin_update_validation_jsonschema())
validate('fake', values, validator, lh)
values = {'plugin_labels': {'not_exists': {'status': False}}}
with testtools.ExpectedException(json_exc.ValidationError):
validate('fake', values, validator, lh)
values = {'plugin_labels': {'enabled': {'status': 'False'}}}
with testtools.ExpectedException(json_exc.ValidationError):
validate('fake', values, validator, lh)
values = {'field': {'blala': 'blalalalalala'}}
with testtools.ExpectedException(json_exc.ValidationError):
validate('fake', values, validator, lh)
values = {'plugin_labels': {'hidden': {'status': True}}}
with testtools.ExpectedException(ex.InvalidDataException):
# valid under schema, but not valid under validator
# hidden is not available to spark
validate('spark', values, validator, lh)
values = {'plugin_labels': {'enabled': {'mutable': False}}}
with testtools.ExpectedException(json_exc.ValidationError):
validate('spark', values, validator, lh)
values = {'version_labels': {'enabled': {'status': False}}}
with testtools.ExpectedException(json_exc.ValidationError):
validate('spark', values, validator, lh)
values = {'version_labels': {'0.1': {'enabled': {'status': False}}}}
validate('fake', values, validator, lh)
values = {'version_labels': {'0.1': {'enabled': {'status': False}}}}
with testtools.ExpectedException(ex.InvalidDataException):
validate('spark', values, validator, lh)
values = {'version_labels': {'0.1': {'hidden': {'status': True}}}}
with testtools.ExpectedException(ex.InvalidDataException):
validate('fake', values, validator, lh)
def test_jsonschema(self):
self.override_config('plugins', ['fake'])
lh = base.PluginManager()
schema = lh.get_plugin_update_validation_jsonschema()
self.assertEqual(EXPECTED_SCHEMA, schema)
def test_update(self):
self.override_config('plugins', ['fake'])
lh = base.PluginManager()
data = lh.update_plugin('fake', values={
'plugin_labels': {'enabled': {'status': False}}}).dict
# enabled is updated, but hidden still same
self.assertFalse(data['plugin_labels']['enabled']['status'])
self.assertTrue(data['plugin_labels']['hidden']['status'])
data = lh.update_plugin('fake', values={
'version_labels': {'0.1': {'enabled': {'status': False}}}}).dict
self.assertFalse(data['plugin_labels']['enabled']['status'])
self.assertTrue(data['plugin_labels']['hidden']['status'])
self.assertFalse(data['version_labels']['0.1']['enabled']['status'])

View File

@ -135,12 +135,8 @@ class FakePlugin(pr_base.ProvisioningPluginBase):
class FakePluginManager(pl_base.PluginManager): class FakePluginManager(pl_base.PluginManager):
def __init__(self, calls_order): def __init__(self, calls_order):
self.calls = calls_order super(FakePluginManager, self).__init__()
self.plugins['fake'] = FakePlugin(calls_order)
def get_plugin(self, plugin_name):
if plugin_name == "fake":
return FakePlugin(self.calls)
return None
class FakeOps(object): class FakeOps(object):
@ -296,6 +292,11 @@ class TestApi(base.SaharaWithDbTestCase):
self.assertIsNone(api.get_plugin('fake', '0.3')) self.assertIsNone(api.get_plugin('fake', '0.3'))
data = api.get_plugin('fake').dict data = api.get_plugin('fake').dict
self.assertIsNotNone(data.get('version_labels'))
self.assertIsNotNone(data.get('plugin_labels'))
del data['plugin_labels']
del data['version_labels']
self.assertEqual({ self.assertEqual({
'description': "Some description", 'description': "Some description",
'name': 'fake', 'name': 'fake',