From 6129484e34b15f4024cbdb68f6ded18c1c0eec46 Mon Sep 17 00:00:00 2001 From: Andrew Lazarev Date: Tue, 26 Aug 2014 14:17:50 -0700 Subject: [PATCH] Added information about sahara settings to cluster + Added validation check on scaling using other engine Change-Id: I3f36b949d1388f809c33334be6fcd0bdb30ade7a Implements: blueprint cluster-persist-sahara-configuration --- sahara/conductor/manager.py | 1 + sahara/conductor/objects.py | 1 + sahara/conductor/resource.py | 3 +- .../versions/011_sahara_info.py | 40 ++++++++++++++ sahara/db/sqlalchemy/models.py | 1 + sahara/service/direct_engine.py | 3 ++ sahara/service/engine.py | 7 +++ sahara/service/heat_engine.py | 3 ++ sahara/service/ops.py | 16 ++++++ .../service/validations/clusters_scaling.py | 20 +++++++ .../unit/db/migration/test_migrations.py | 3 ++ sahara/tests/unit/service/test_ops.py | 6 ++- .../test_cluster_scaling_validation.py | 54 ++++++++++++++++--- sahara/tests/unit/testutils.py | 2 +- sahara/utils/remote.py | 11 ++++ sahara/utils/ssh_remote.py | 3 ++ 16 files changed, 164 insertions(+), 10 deletions(-) create mode 100644 sahara/db/migration/alembic_migrations/versions/011_sahara_info.py diff --git a/sahara/conductor/manager.py b/sahara/conductor/manager.py index 8b30725e..ebf3bbc2 100644 --- a/sahara/conductor/manager.py +++ b/sahara/conductor/manager.py @@ -29,6 +29,7 @@ CLUSTER_DEFAULTS = { "status_description": "", "info": {}, "rollback_info": {}, + "sahara_info": {}, } NODE_GROUP_DEFAULTS = { diff --git a/sahara/conductor/objects.py b/sahara/conductor/objects.py index 359dd1ac..6df9a766 100644 --- a/sahara/conductor/objects.py +++ b/sahara/conductor/objects.py @@ -54,6 +54,7 @@ class Cluster(object): info extra rollback_info - internal information required for rollback + sahara_info - internal information about sahara settings node_groups - list of NodeGroup objects cluster_template_id cluster_template - ClusterTemplate object diff --git a/sahara/conductor/resource.py b/sahara/conductor/resource.py index 42ab0db3..b8767a9e 100644 --- a/sahara/conductor/resource.py +++ b/sahara/conductor/resource.py @@ -206,7 +206,8 @@ class ClusterResource(Resource, objects.Cluster): 'cluster_template': (ClusterTemplateResource, None) } - _filter_fields = ['management_private_key', 'extra', 'rollback_info'] + _filter_fields = ['management_private_key', 'extra', 'rollback_info', + 'sahara_info'] # EDP Resources diff --git a/sahara/db/migration/alembic_migrations/versions/011_sahara_info.py b/sahara/db/migration/alembic_migrations/versions/011_sahara_info.py new file mode 100644 index 00000000..10a8fcd9 --- /dev/null +++ b/sahara/db/migration/alembic_migrations/versions/011_sahara_info.py @@ -0,0 +1,40 @@ +# Copyright 2014 OpenStack Foundation. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""convert clusters.status_description to LongText + +Revision ID: 011 +Revises: 010 +Create Date: 2014-08-26 22:36:00.783444 + +""" + +# revision identifiers, used by Alembic. +revision = '011' +down_revision = '010' + +from alembic import op +import sqlalchemy as sa + +from sahara.db.sqlalchemy import types as st + + +def upgrade(): + op.add_column('clusters', + sa.Column('sahara_info', st.JsonEncoded())) + + +def downgrade(): + op.drop_column('clusters', 'sahara_info') diff --git a/sahara/db/sqlalchemy/models.py b/sahara/db/sqlalchemy/models.py index bc7c8604..267b0122 100644 --- a/sahara/db/sqlalchemy/models.py +++ b/sahara/db/sqlalchemy/models.py @@ -66,6 +66,7 @@ class Cluster(mb.SaharaBase): info = sa.Column(st.JsonDictType()) extra = sa.Column(st.JsonDictType()) rollback_info = sa.Column(st.JsonDictType()) + sahara_info = sa.Column(st.JsonDictType()) node_groups = relationship('NodeGroup', cascade="all,delete", backref='cluster', lazy='joined') cluster_template_id = sa.Column(sa.String(36), diff --git a/sahara/service/direct_engine.py b/sahara/service/direct_engine.py index 0c5d0d50..4513bb28 100644 --- a/sahara/service/direct_engine.py +++ b/sahara/service/direct_engine.py @@ -39,6 +39,9 @@ SSH_PORT = 22 class DirectEngine(e.Engine): + def get_type_and_version(self): + return "direct.1.0" + def create_cluster(self, cluster): ctx = context.ctx() self._update_rollback_strategy(cluster, shutdown=True) diff --git a/sahara/service/engine.py b/sahara/service/engine.py index 3b3eb245..e6fa53b1 100644 --- a/sahara/service/engine.py +++ b/sahara/service/engine.py @@ -54,6 +54,13 @@ class Engine: def rollback_cluster(self, cluster, reason): pass + @abc.abstractmethod + def get_type_and_version(self): + """Returns engine type and version + + Result should be in the form 'type.major.minor'. + """ + def get_node_group_image_username(self, node_group): image_id = node_group.get_image_id() return nova.client().images.get(image_id).username diff --git a/sahara/service/heat_engine.py b/sahara/service/heat_engine.py index 38746dab..11957a58 100644 --- a/sahara/service/heat_engine.py +++ b/sahara/service/heat_engine.py @@ -32,6 +32,9 @@ LOG = logging.getLogger(__name__) class HeatEngine(e.Engine): + def get_type_and_version(self): + return "heat.1.0" + def _add_volumes(self, ctx, cluster): for instance in g.get_instances(cluster): res_names = heat.client().resources.get( diff --git a/sahara/service/ops.py b/sahara/service/ops.py index eaa28226..a652774f 100644 --- a/sahara/service/ops.py +++ b/sahara/service/ops.py @@ -28,6 +28,7 @@ from sahara.plugins import base as plugin_base from sahara.service.edp import job_manager from sahara.service import trusts from sahara.utils import general as g +from sahara.utils import remote from sahara.utils import rpc as rpc_utils @@ -45,6 +46,10 @@ def setup_ops(engine): INFRA = engine +def get_engine_type_and_version(): + return INFRA.get_type_and_version() + + class LocalOps(object): def provision_cluster(self, cluster_id): context.spawn("cluster-creating-%s" % cluster_id, @@ -166,10 +171,21 @@ def _prepare_provisioning(cluster_id): return ctx, cluster, plugin +def _update_sahara_info(ctx, cluster): + sahara_info = { + 'infrastructure_engine': get_engine_type_and_version(), + 'remote': remote.get_remote_type_and_version()} + + return conductor.cluster_update( + ctx, cluster, {'sahara_info': sahara_info}) + + @ops_error_handler def _provision_cluster(cluster_id): ctx, cluster, plugin = _prepare_provisioning(cluster_id) + cluster = _update_sahara_info(ctx, cluster) + if CONF.use_identity_api_v3 and cluster.is_transient: trusts.create_trust_for_cluster(cluster) diff --git a/sahara/service/validations/clusters_scaling.py b/sahara/service/validations/clusters_scaling.py index 9773bab7..22b51457 100644 --- a/sahara/service/validations/clusters_scaling.py +++ b/sahara/service/validations/clusters_scaling.py @@ -19,6 +19,7 @@ import sahara.exceptions as ex from sahara.i18n import _ import sahara.plugins.base as plugin_base import sahara.service.api as api +from sahara.service import ops import sahara.service.validations.base as b import sahara.service.validations.cluster_templates as cl_t @@ -69,6 +70,25 @@ CLUSTER_SCALING_SCHEMA = { def check_cluster_scaling(data, cluster_id, **kwargs): cluster = api.get_cluster(id=cluster_id) + + cluster_engine = cluster.sahara_info.get( + 'infrastructure_engine') if cluster.sahara_info else None + + if (not cluster_engine and + not ops.get_engine_type_and_version().startswith('direct')): + raise ex.InvalidException( + _("Cluster created before Juno release " + "can't be scaled with %(engine)s engine") % + {"engine": ops.get_engine_type_and_version()}) + + if (cluster.sahara_info and + cluster_engine != ops.get_engine_type_and_version()): + raise ex.InvalidException( + _("Cluster created with %(old_engine)s infrastructure engine " + "can't be scaled with %(new_engine)s engine") % + {"old_engine": cluster.sahara_info.get('infrastructure_engine'), + "new_engine": ops.get_engine_type_and_version()}) + if not (plugin_base.PLUGINS.is_plugin_implements(cluster.plugin_name, 'scale_cluster') and ( plugin_base.PLUGINS.is_plugin_implements(cluster.plugin_name, diff --git a/sahara/tests/unit/db/migration/test_migrations.py b/sahara/tests/unit/db/migration/test_migrations.py index aeef8c28..21398037 100644 --- a/sahara/tests/unit/db/migration/test_migrations.py +++ b/sahara/tests/unit/db/migration/test_migrations.py @@ -397,3 +397,6 @@ class TestMigrations(base.BaseWalkMigrationTestCase, base.CommonTestsMixIn): self.assertColumnExists(engine, 'templates_relations', 'auto_security_group') self.assertColumnExists(engine, 'node_groups', 'open_ports') + + def _check_011(self, engine, date): + self.assertColumnExists(engine, 'clusters', 'sahara_info') diff --git a/sahara/tests/unit/service/test_ops.py b/sahara/tests/unit/service/test_ops.py index acf5b1bd..69cb5a13 100644 --- a/sahara/tests/unit/service/test_ops.py +++ b/sahara/tests/unit/service/test_ops.py @@ -62,10 +62,14 @@ class FakeINFRA(): def shutdown_cluster(self, cluster): TestOPS.SEQUENCE.append('shutdown_cluster') + def rollback_cluster(self, cluster, reason): + TestOPS.SEQUENCE.append('rollback_cluster') + class TestOPS(base.SaharaTestCase): SEQUENCE = [] + @mock.patch('sahara.service.ops._update_sahara_info') @mock.patch('sahara.service.ops._prepare_provisioning', return_value=(mock.Mock(), mock.Mock(), FakePlugin())) @mock.patch('sahara.utils.general.change_cluster_status') @@ -76,7 +80,7 @@ class TestOPS(base.SaharaTestCase): @mock.patch('sahara.service.edp.job_manager.run_job') def test_provision_cluster(self, p_run_job, p_job_exec, p_create_trust, p_conf, p_cluster_get, p_change_status, - p_prep_provisioning): + p_prep_provisioning, p_update_sahara_info): del self.SEQUENCE[:] ops.INFRA = FakeINFRA() ops._provision_cluster('123') diff --git a/sahara/tests/unit/service/validation/test_cluster_scaling_validation.py b/sahara/tests/unit/service/validation/test_cluster_scaling_validation.py index 0c2f739b..b56ff13f 100644 --- a/sahara/tests/unit/service/validation/test_cluster_scaling_validation.py +++ b/sahara/tests/unit/service/validation/test_cluster_scaling_validation.py @@ -56,7 +56,9 @@ class TestScalingValidation(u.ValidationTestCase): self.assertEqual(expected_message, six.text_type(e)) raise e - def test_check_cluster_scaling_resize_ng(self): + @mock.patch("sahara.service.ops.get_engine_type_and_version", + return_value="direct.1.1") + def test_check_cluster_scaling_resize_ng(self, engine_version): ng1 = tu.make_ng_dict('ng', '42', ['namenode'], 1) cluster = tu.create_cluster("cluster1", "tenant1", "vanilla", "1.2.1", [ng1], status='Validating', id='12321') @@ -100,7 +102,9 @@ class TestScalingValidation(u.ValidationTestCase): expected_message='Duplicates in node ' 'group names are detected') - def test_check_cluster_scaling_add_ng(self): + @mock.patch("sahara.service.ops.get_engine_type_and_version", + return_value="direct.1.1") + def test_check_cluster_scaling_add_ng(self, engine_version): ng1 = tu.make_ng_dict('ng', '42', ['namenode'], 1) cluster = tu.create_cluster("test-cluster", "tenant", "vanilla", "1.2.1", [ng1], status='Active', @@ -174,7 +178,9 @@ class TestScalingValidation(u.ValidationTestCase): self.assertEqual(req_data.call_count, 1) self._assert_calls(bad_req, bad_req_i) - def test_cluster_scaling_scheme_v_resize_ng(self): + @mock.patch("sahara.service.ops.get_engine_type_and_version", + return_value="direct.1.1") + def test_cluster_scaling_scheme_v_resize_ng(self, engine_version): self._create_object_fun = mock.Mock() data = { } @@ -204,7 +210,9 @@ class TestScalingValidation(u.ValidationTestCase): u"'count' is a required property") ) - def test_cluster_scaling_validation_add_ng(self): + @mock.patch("sahara.service.ops.get_engine_type_and_version", + return_value="direct.1.1") + def test_cluster_scaling_validation_add_ng(self, engine_version): data = { 'add_node_groups': [ { @@ -238,7 +246,9 @@ class TestScalingValidation(u.ValidationTestCase): u"of the given schemas") ) - def test_cluster_scaling_validation_right_schema(self): + @mock.patch("sahara.service.ops.get_engine_type_and_version", + return_value="direct.1.1") + def test_cluster_scaling_validation_right_schema(self, engine_version): data = { 'add_node_groups': [ { @@ -283,7 +293,9 @@ class TestScalingValidation(u.ValidationTestCase): data=data ) - def test_cluster_scaling_scheme_validation_types(self): + @mock.patch("sahara.service.ops.get_engine_type_and_version", + return_value="direct.1.1") + def test_cluster_scaling_scheme_validation_types(self, engine_version): data = { 'resize_node_groups': {}, } @@ -309,7 +321,9 @@ class TestScalingValidation(u.ValidationTestCase): u'[] is too short') ) - def test_cluster_scaling_v_right_data(self): + @mock.patch("sahara.service.ops.get_engine_type_and_version", + return_value="direct.1.1") + def test_cluster_scaling_v_right_data(self, engine_version): self._create_object_fun = c_s.check_cluster_scaling data = { @@ -331,3 +345,29 @@ class TestScalingValidation(u.ValidationTestCase): patchers = u.start_patch() self._assert_cluster_scaling_validation(data=data) u.stop_patch(patchers) + + @mock.patch("sahara.service.ops.get_engine_type_and_version", + return_value="direct.1.1") + def test_check_cluster_scaling_wrong_engine(self, engine_version): + ng1 = tu.make_ng_dict('ng', '42', ['namenode'], 1) + cluster = tu.create_cluster( + "cluster1", "tenant1", "vanilla", "1.2.1", [ng1], + status='Active', id='12321', + sahara_info={"infrastructure_engine": "heat.1.1"}) + + self._assert_check_scaling( + data={}, cluster=cluster, + expected_message="Cluster created with heat.1.1 infrastructure " + "engine can't be scaled with direct.1.1 engine") + + @mock.patch("sahara.service.ops.get_engine_type_and_version", + return_value="heat.1.1") + def test_check_heat_cluster_scaling_missing_engine(self, engine_version): + ng1 = tu.make_ng_dict('ng', '42', ['namenode'], 1) + cluster = tu.create_cluster("cluster1", "tenant1", "vanilla", "1.2.1", + [ng1], status='Active', id='12321') + + self._assert_check_scaling( + data={}, cluster=cluster, + expected_message="Cluster created before Juno release can't be " + "scaled with heat.1.1 engine") diff --git a/sahara/tests/unit/testutils.py b/sahara/tests/unit/testutils.py index 13539131..25faa30c 100644 --- a/sahara/tests/unit/testutils.py +++ b/sahara/tests/unit/testutils.py @@ -20,7 +20,7 @@ from sahara.conductor import resource as r def create_cluster(name, tenant, plugin, version, node_groups, **kwargs): dct = {'name': name, 'tenant_id': tenant, 'plugin_name': plugin, 'hadoop_version': version, 'node_groups': node_groups, - 'cluster_configs': {}} + 'cluster_configs': {}, "sahara_info": {}} dct.update(kwargs) return r.ClusterResource(dct) diff --git a/sahara/utils/remote.py b/sahara/utils/remote.py index 983397d8..55e1fe47 100644 --- a/sahara/utils/remote.py +++ b/sahara/utils/remote.py @@ -56,6 +56,13 @@ class RemoteDriver(object): def get_userdata_template(self): """Returns userdata template preparing instance to work with driver.""" + @abc.abstractmethod + def get_type_and_version(self): + """Returns engine type and version + + Result should be in the form 'type.major.minor'. + """ + @six.add_metaclass(abc.ABCMeta) class Remote(object): @@ -118,6 +125,10 @@ def setup_remote(driver, engine): DRIVER.setup_remote(engine) +def get_remote_type_and_version(): + return DRIVER.get_type_and_version() + + def _check_driver_is_loaded(): if not DRIVER: raise ex.SystemError(_('Remote driver is not loaded. Most probably ' diff --git a/sahara/utils/ssh_remote.py b/sahara/utils/ssh_remote.py index dc01568c..bb3f327a 100644 --- a/sahara/utils/ssh_remote.py +++ b/sahara/utils/ssh_remote.py @@ -481,6 +481,9 @@ class BulkInstanceInteropHelper(InstanceInteropHelper): class SshRemoteDriver(remote.RemoteDriver): + def get_type_and_version(self): + return "ssh.1.0" + def setup_remote(self, engine): global _global_remote_semaphore global INFRA