Added information about sahara settings to cluster

+ Added validation check on scaling using other engine

Change-Id: I3f36b949d1388f809c33334be6fcd0bdb30ade7a
Implements: blueprint cluster-persist-sahara-configuration
This commit is contained in:
Andrew Lazarev 2014-08-26 14:17:50 -07:00
parent fd4919d8b6
commit 6129484e34
16 changed files with 164 additions and 10 deletions

View File

@ -29,6 +29,7 @@ CLUSTER_DEFAULTS = {
"status_description": "",
"info": {},
"rollback_info": {},
"sahara_info": {},
}
NODE_GROUP_DEFAULTS = {

View File

@ -54,6 +54,7 @@ class Cluster(object):
info
extra
rollback_info - internal information required for rollback
sahara_info - internal information about sahara settings
node_groups - list of NodeGroup objects
cluster_template_id
cluster_template - ClusterTemplate object

View File

@ -206,7 +206,8 @@ class ClusterResource(Resource, objects.Cluster):
'cluster_template': (ClusterTemplateResource, None)
}
_filter_fields = ['management_private_key', 'extra', 'rollback_info']
_filter_fields = ['management_private_key', 'extra', 'rollback_info',
'sahara_info']
# EDP Resources

View File

@ -0,0 +1,40 @@
# Copyright 2014 OpenStack Foundation.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""convert clusters.status_description to LongText
Revision ID: 011
Revises: 010
Create Date: 2014-08-26 22:36:00.783444
"""
# revision identifiers, used by Alembic.
revision = '011'
down_revision = '010'
from alembic import op
import sqlalchemy as sa
from sahara.db.sqlalchemy import types as st
def upgrade():
op.add_column('clusters',
sa.Column('sahara_info', st.JsonEncoded()))
def downgrade():
op.drop_column('clusters', 'sahara_info')

View File

@ -66,6 +66,7 @@ class Cluster(mb.SaharaBase):
info = sa.Column(st.JsonDictType())
extra = sa.Column(st.JsonDictType())
rollback_info = sa.Column(st.JsonDictType())
sahara_info = sa.Column(st.JsonDictType())
node_groups = relationship('NodeGroup', cascade="all,delete",
backref='cluster', lazy='joined')
cluster_template_id = sa.Column(sa.String(36),

View File

@ -39,6 +39,9 @@ SSH_PORT = 22
class DirectEngine(e.Engine):
def get_type_and_version(self):
return "direct.1.0"
def create_cluster(self, cluster):
ctx = context.ctx()
self._update_rollback_strategy(cluster, shutdown=True)

View File

@ -54,6 +54,13 @@ class Engine:
def rollback_cluster(self, cluster, reason):
pass
@abc.abstractmethod
def get_type_and_version(self):
"""Returns engine type and version
Result should be in the form 'type.major.minor'.
"""
def get_node_group_image_username(self, node_group):
image_id = node_group.get_image_id()
return nova.client().images.get(image_id).username

View File

@ -32,6 +32,9 @@ LOG = logging.getLogger(__name__)
class HeatEngine(e.Engine):
def get_type_and_version(self):
return "heat.1.0"
def _add_volumes(self, ctx, cluster):
for instance in g.get_instances(cluster):
res_names = heat.client().resources.get(

View File

@ -28,6 +28,7 @@ from sahara.plugins import base as plugin_base
from sahara.service.edp import job_manager
from sahara.service import trusts
from sahara.utils import general as g
from sahara.utils import remote
from sahara.utils import rpc as rpc_utils
@ -45,6 +46,10 @@ def setup_ops(engine):
INFRA = engine
def get_engine_type_and_version():
return INFRA.get_type_and_version()
class LocalOps(object):
def provision_cluster(self, cluster_id):
context.spawn("cluster-creating-%s" % cluster_id,
@ -166,10 +171,21 @@ def _prepare_provisioning(cluster_id):
return ctx, cluster, plugin
def _update_sahara_info(ctx, cluster):
sahara_info = {
'infrastructure_engine': get_engine_type_and_version(),
'remote': remote.get_remote_type_and_version()}
return conductor.cluster_update(
ctx, cluster, {'sahara_info': sahara_info})
@ops_error_handler
def _provision_cluster(cluster_id):
ctx, cluster, plugin = _prepare_provisioning(cluster_id)
cluster = _update_sahara_info(ctx, cluster)
if CONF.use_identity_api_v3 and cluster.is_transient:
trusts.create_trust_for_cluster(cluster)

View File

@ -19,6 +19,7 @@ import sahara.exceptions as ex
from sahara.i18n import _
import sahara.plugins.base as plugin_base
import sahara.service.api as api
from sahara.service import ops
import sahara.service.validations.base as b
import sahara.service.validations.cluster_templates as cl_t
@ -69,6 +70,25 @@ CLUSTER_SCALING_SCHEMA = {
def check_cluster_scaling(data, cluster_id, **kwargs):
cluster = api.get_cluster(id=cluster_id)
cluster_engine = cluster.sahara_info.get(
'infrastructure_engine') if cluster.sahara_info else None
if (not cluster_engine and
not ops.get_engine_type_and_version().startswith('direct')):
raise ex.InvalidException(
_("Cluster created before Juno release "
"can't be scaled with %(engine)s engine") %
{"engine": ops.get_engine_type_and_version()})
if (cluster.sahara_info and
cluster_engine != ops.get_engine_type_and_version()):
raise ex.InvalidException(
_("Cluster created with %(old_engine)s infrastructure engine "
"can't be scaled with %(new_engine)s engine") %
{"old_engine": cluster.sahara_info.get('infrastructure_engine'),
"new_engine": ops.get_engine_type_and_version()})
if not (plugin_base.PLUGINS.is_plugin_implements(cluster.plugin_name,
'scale_cluster') and (
plugin_base.PLUGINS.is_plugin_implements(cluster.plugin_name,

View File

@ -397,3 +397,6 @@ class TestMigrations(base.BaseWalkMigrationTestCase, base.CommonTestsMixIn):
self.assertColumnExists(engine, 'templates_relations',
'auto_security_group')
self.assertColumnExists(engine, 'node_groups', 'open_ports')
def _check_011(self, engine, date):
self.assertColumnExists(engine, 'clusters', 'sahara_info')

View File

@ -62,10 +62,14 @@ class FakeINFRA():
def shutdown_cluster(self, cluster):
TestOPS.SEQUENCE.append('shutdown_cluster')
def rollback_cluster(self, cluster, reason):
TestOPS.SEQUENCE.append('rollback_cluster')
class TestOPS(base.SaharaTestCase):
SEQUENCE = []
@mock.patch('sahara.service.ops._update_sahara_info')
@mock.patch('sahara.service.ops._prepare_provisioning',
return_value=(mock.Mock(), mock.Mock(), FakePlugin()))
@mock.patch('sahara.utils.general.change_cluster_status')
@ -76,7 +80,7 @@ class TestOPS(base.SaharaTestCase):
@mock.patch('sahara.service.edp.job_manager.run_job')
def test_provision_cluster(self, p_run_job, p_job_exec, p_create_trust,
p_conf, p_cluster_get, p_change_status,
p_prep_provisioning):
p_prep_provisioning, p_update_sahara_info):
del self.SEQUENCE[:]
ops.INFRA = FakeINFRA()
ops._provision_cluster('123')

View File

@ -56,7 +56,9 @@ class TestScalingValidation(u.ValidationTestCase):
self.assertEqual(expected_message, six.text_type(e))
raise e
def test_check_cluster_scaling_resize_ng(self):
@mock.patch("sahara.service.ops.get_engine_type_and_version",
return_value="direct.1.1")
def test_check_cluster_scaling_resize_ng(self, engine_version):
ng1 = tu.make_ng_dict('ng', '42', ['namenode'], 1)
cluster = tu.create_cluster("cluster1", "tenant1", "vanilla", "1.2.1",
[ng1], status='Validating', id='12321')
@ -100,7 +102,9 @@ class TestScalingValidation(u.ValidationTestCase):
expected_message='Duplicates in node '
'group names are detected')
def test_check_cluster_scaling_add_ng(self):
@mock.patch("sahara.service.ops.get_engine_type_and_version",
return_value="direct.1.1")
def test_check_cluster_scaling_add_ng(self, engine_version):
ng1 = tu.make_ng_dict('ng', '42', ['namenode'], 1)
cluster = tu.create_cluster("test-cluster", "tenant", "vanilla",
"1.2.1", [ng1], status='Active',
@ -174,7 +178,9 @@ class TestScalingValidation(u.ValidationTestCase):
self.assertEqual(req_data.call_count, 1)
self._assert_calls(bad_req, bad_req_i)
def test_cluster_scaling_scheme_v_resize_ng(self):
@mock.patch("sahara.service.ops.get_engine_type_and_version",
return_value="direct.1.1")
def test_cluster_scaling_scheme_v_resize_ng(self, engine_version):
self._create_object_fun = mock.Mock()
data = {
}
@ -204,7 +210,9 @@ class TestScalingValidation(u.ValidationTestCase):
u"'count' is a required property")
)
def test_cluster_scaling_validation_add_ng(self):
@mock.patch("sahara.service.ops.get_engine_type_and_version",
return_value="direct.1.1")
def test_cluster_scaling_validation_add_ng(self, engine_version):
data = {
'add_node_groups': [
{
@ -238,7 +246,9 @@ class TestScalingValidation(u.ValidationTestCase):
u"of the given schemas")
)
def test_cluster_scaling_validation_right_schema(self):
@mock.patch("sahara.service.ops.get_engine_type_and_version",
return_value="direct.1.1")
def test_cluster_scaling_validation_right_schema(self, engine_version):
data = {
'add_node_groups': [
{
@ -283,7 +293,9 @@ class TestScalingValidation(u.ValidationTestCase):
data=data
)
def test_cluster_scaling_scheme_validation_types(self):
@mock.patch("sahara.service.ops.get_engine_type_and_version",
return_value="direct.1.1")
def test_cluster_scaling_scheme_validation_types(self, engine_version):
data = {
'resize_node_groups': {},
}
@ -309,7 +321,9 @@ class TestScalingValidation(u.ValidationTestCase):
u'[] is too short')
)
def test_cluster_scaling_v_right_data(self):
@mock.patch("sahara.service.ops.get_engine_type_and_version",
return_value="direct.1.1")
def test_cluster_scaling_v_right_data(self, engine_version):
self._create_object_fun = c_s.check_cluster_scaling
data = {
@ -331,3 +345,29 @@ class TestScalingValidation(u.ValidationTestCase):
patchers = u.start_patch()
self._assert_cluster_scaling_validation(data=data)
u.stop_patch(patchers)
@mock.patch("sahara.service.ops.get_engine_type_and_version",
return_value="direct.1.1")
def test_check_cluster_scaling_wrong_engine(self, engine_version):
ng1 = tu.make_ng_dict('ng', '42', ['namenode'], 1)
cluster = tu.create_cluster(
"cluster1", "tenant1", "vanilla", "1.2.1", [ng1],
status='Active', id='12321',
sahara_info={"infrastructure_engine": "heat.1.1"})
self._assert_check_scaling(
data={}, cluster=cluster,
expected_message="Cluster created with heat.1.1 infrastructure "
"engine can't be scaled with direct.1.1 engine")
@mock.patch("sahara.service.ops.get_engine_type_and_version",
return_value="heat.1.1")
def test_check_heat_cluster_scaling_missing_engine(self, engine_version):
ng1 = tu.make_ng_dict('ng', '42', ['namenode'], 1)
cluster = tu.create_cluster("cluster1", "tenant1", "vanilla", "1.2.1",
[ng1], status='Active', id='12321')
self._assert_check_scaling(
data={}, cluster=cluster,
expected_message="Cluster created before Juno release can't be "
"scaled with heat.1.1 engine")

View File

@ -20,7 +20,7 @@ from sahara.conductor import resource as r
def create_cluster(name, tenant, plugin, version, node_groups, **kwargs):
dct = {'name': name, 'tenant_id': tenant, 'plugin_name': plugin,
'hadoop_version': version, 'node_groups': node_groups,
'cluster_configs': {}}
'cluster_configs': {}, "sahara_info": {}}
dct.update(kwargs)
return r.ClusterResource(dct)

View File

@ -56,6 +56,13 @@ class RemoteDriver(object):
def get_userdata_template(self):
"""Returns userdata template preparing instance to work with driver."""
@abc.abstractmethod
def get_type_and_version(self):
"""Returns engine type and version
Result should be in the form 'type.major.minor'.
"""
@six.add_metaclass(abc.ABCMeta)
class Remote(object):
@ -118,6 +125,10 @@ def setup_remote(driver, engine):
DRIVER.setup_remote(engine)
def get_remote_type_and_version():
return DRIVER.get_type_and_version()
def _check_driver_is_loaded():
if not DRIVER:
raise ex.SystemError(_('Remote driver is not loaded. Most probably '

View File

@ -481,6 +481,9 @@ class BulkInstanceInteropHelper(InstanceInteropHelper):
class SshRemoteDriver(remote.RemoteDriver):
def get_type_and_version(self):
return "ssh.1.0"
def setup_remote(self, engine):
global _global_remote_semaphore
global INFRA