Fixed cluster scaling in distributed mode

API has no way to get engine version without RPC call. Replaced
direct method call with RPC call.

Change-Id: I8b6db54c7eb4b58167c8266bc58f180ccd2bbe13
Closes-Bug: #1388164
This commit is contained in:
Andrew Lazarev 2014-10-31 15:46:22 -07:00
parent 0f3355a439
commit a75f1fd75b
3 changed files with 42 additions and 37 deletions

View File

@ -47,10 +47,6 @@ def setup_ops(engine):
INFRA = engine INFRA = engine
def get_engine_type_and_version():
return INFRA.get_type_and_version()
class LocalOps(object): class LocalOps(object):
def provision_cluster(self, cluster_id): def provision_cluster(self, cluster_id):
context.spawn("cluster-creating-%s" % cluster_id, context.spawn("cluster-creating-%s" % cluster_id,
@ -76,6 +72,9 @@ class LocalOps(object):
context.spawn("Deleting Job Execution %s" % job_execution_id, context.spawn("Deleting Job Execution %s" % job_execution_id,
_delete_job_execution, job_execution_id) _delete_job_execution, job_execution_id)
def get_engine_type_and_version(self):
return INFRA.get_type_and_version()
class RemoteOps(rpc_utils.RPCClient): class RemoteOps(rpc_utils.RPCClient):
def __init__(self): def __init__(self):
@ -103,6 +102,9 @@ class RemoteOps(rpc_utils.RPCClient):
self.cast('delete_job_execution', self.cast('delete_job_execution',
job_execution_id=job_execution_id) job_execution_id=job_execution_id)
def get_engine_type_and_version(self):
return self.call('get_engine_type_and_version')
class OpsServer(rpc_utils.RPCServer): class OpsServer(rpc_utils.RPCServer):
def __init__(self): def __init__(self):
@ -128,6 +130,9 @@ class OpsServer(rpc_utils.RPCServer):
def delete_job_execution(self, job_execution_id): def delete_job_execution(self, job_execution_id):
_delete_job_execution(job_execution_id) _delete_job_execution(job_execution_id)
def get_engine_type_and_version(self):
return INFRA.get_type_and_version()
def ops_error_handler(f): def ops_error_handler(f):
@functools.wraps(f) @functools.wraps(f)
@ -196,7 +201,7 @@ def _prepare_provisioning(cluster_id):
def _update_sahara_info(ctx, cluster): def _update_sahara_info(ctx, cluster):
sahara_info = { sahara_info = {
'infrastructure_engine': get_engine_type_and_version(), 'infrastructure_engine': INFRA.get_type_and_version(),
'remote': remote.get_remote_type_and_version()} 'remote': remote.get_remote_type_and_version()}
return conductor.cluster_update( return conductor.cluster_update(

View File

@ -19,7 +19,6 @@ import sahara.exceptions as ex
from sahara.i18n import _ from sahara.i18n import _
import sahara.plugins.base as plugin_base import sahara.plugins.base as plugin_base
import sahara.service.api as api import sahara.service.api as api
from sahara.service import ops
import sahara.service.validations.base as b import sahara.service.validations.base as b
import sahara.service.validations.cluster_templates as cl_t import sahara.service.validations.cluster_templates as cl_t
@ -74,20 +73,21 @@ def check_cluster_scaling(data, cluster_id, **kwargs):
cluster_engine = cluster.sahara_info.get( cluster_engine = cluster.sahara_info.get(
'infrastructure_engine') if cluster.sahara_info else None 'infrastructure_engine') if cluster.sahara_info else None
engine_type_and_version = api.OPS.get_engine_type_and_version()
if (not cluster_engine and if (not cluster_engine and
not ops.get_engine_type_and_version().startswith('direct')): not engine_type_and_version.startswith('direct')):
raise ex.InvalidException( raise ex.InvalidException(
_("Cluster created before Juno release " _("Cluster created before Juno release "
"can't be scaled with %(engine)s engine") % "can't be scaled with %(engine)s engine") %
{"engine": ops.get_engine_type_and_version()}) {"engine": engine_type_and_version})
if (cluster.sahara_info and if (cluster.sahara_info and
cluster_engine != ops.get_engine_type_and_version()): cluster_engine != engine_type_and_version):
raise ex.InvalidException( raise ex.InvalidException(
_("Cluster created with %(old_engine)s infrastructure engine " _("Cluster created with %(old_engine)s infrastructure engine "
"can't be scaled with %(new_engine)s engine") % "can't be scaled with %(new_engine)s engine") %
{"old_engine": cluster.sahara_info.get('infrastructure_engine'), {"old_engine": cluster.sahara_info.get('infrastructure_engine'),
"new_engine": ops.get_engine_type_and_version()}) "new_engine": engine_type_and_version})
if not (plugin_base.PLUGINS.is_plugin_implements(cluster.plugin_name, if not (plugin_base.PLUGINS.is_plugin_implements(cluster.plugin_name,
'scale_cluster') and ( 'scale_cluster') and (

View File

@ -56,9 +56,9 @@ class TestScalingValidation(u.ValidationTestCase):
self.assertEqual(expected_message, six.text_type(e)) self.assertEqual(expected_message, six.text_type(e))
raise e raise e
@mock.patch("sahara.service.ops.get_engine_type_and_version", @mock.patch("sahara.service.api.OPS")
return_value="direct.1.1") def test_check_cluster_scaling_resize_ng(self, ops):
def test_check_cluster_scaling_resize_ng(self, engine_version): ops.get_engine_type_and_version.return_value = "direct.1.1"
ng1 = tu.make_ng_dict('ng', '42', ['namenode'], 1) ng1 = tu.make_ng_dict('ng', '42', ['namenode'], 1)
cluster = tu.create_cluster("cluster1", "tenant1", "vanilla", "1.2.1", cluster = tu.create_cluster("cluster1", "tenant1", "vanilla", "1.2.1",
[ng1], status='Validating', id='12321') [ng1], status='Validating', id='12321')
@ -102,9 +102,9 @@ class TestScalingValidation(u.ValidationTestCase):
expected_message='Duplicates in node ' expected_message='Duplicates in node '
'group names are detected') 'group names are detected')
@mock.patch("sahara.service.ops.get_engine_type_and_version", @mock.patch("sahara.service.api.OPS")
return_value="direct.1.1") def test_check_cluster_scaling_add_ng(self, ops):
def test_check_cluster_scaling_add_ng(self, engine_version): ops.get_engine_type_and_version.return_value = "direct.1.1"
ng1 = tu.make_ng_dict('ng', '42', ['namenode'], 1) ng1 = tu.make_ng_dict('ng', '42', ['namenode'], 1)
cluster = tu.create_cluster("test-cluster", "tenant", "vanilla", cluster = tu.create_cluster("test-cluster", "tenant", "vanilla",
"1.2.1", [ng1], status='Active', "1.2.1", [ng1], status='Active',
@ -178,9 +178,9 @@ class TestScalingValidation(u.ValidationTestCase):
self.assertEqual(req_data.call_count, 1) self.assertEqual(req_data.call_count, 1)
self._assert_calls(bad_req, bad_req_i) self._assert_calls(bad_req, bad_req_i)
@mock.patch("sahara.service.ops.get_engine_type_and_version", @mock.patch("sahara.service.api.OPS")
return_value="direct.1.1") def test_cluster_scaling_scheme_v_resize_ng(self, ops):
def test_cluster_scaling_scheme_v_resize_ng(self, engine_version): ops.get_engine_type_and_version.return_value = "direct.1.1"
self._create_object_fun = mock.Mock() self._create_object_fun = mock.Mock()
data = { data = {
} }
@ -210,9 +210,9 @@ class TestScalingValidation(u.ValidationTestCase):
u"'count' is a required property") u"'count' is a required property")
) )
@mock.patch("sahara.service.ops.get_engine_type_and_version", @mock.patch("sahara.service.api.OPS")
return_value="direct.1.1") def test_cluster_scaling_validation_add_ng(self, ops):
def test_cluster_scaling_validation_add_ng(self, engine_version): ops.get_engine_type_and_version.return_value = "direct.1.1"
data = { data = {
'add_node_groups': [ 'add_node_groups': [
{ {
@ -246,9 +246,9 @@ class TestScalingValidation(u.ValidationTestCase):
u"of the given schemas") u"of the given schemas")
) )
@mock.patch("sahara.service.ops.get_engine_type_and_version", @mock.patch("sahara.service.api.OPS")
return_value="direct.1.1") def test_cluster_scaling_validation_right_schema(self, ops):
def test_cluster_scaling_validation_right_schema(self, engine_version): ops.get_engine_type_and_version.return_value = "direct.1.1"
data = { data = {
'add_node_groups': [ 'add_node_groups': [
{ {
@ -293,9 +293,9 @@ class TestScalingValidation(u.ValidationTestCase):
data=data data=data
) )
@mock.patch("sahara.service.ops.get_engine_type_and_version", @mock.patch("sahara.service.api.OPS")
return_value="direct.1.1") def test_cluster_scaling_scheme_validation_types(self, ops):
def test_cluster_scaling_scheme_validation_types(self, engine_version): ops.get_engine_type_and_version.return_value = "direct.1.1"
data = { data = {
'resize_node_groups': {}, 'resize_node_groups': {},
} }
@ -321,9 +321,9 @@ class TestScalingValidation(u.ValidationTestCase):
u'[] is too short') u'[] is too short')
) )
@mock.patch("sahara.service.ops.get_engine_type_and_version", @mock.patch("sahara.service.api.OPS")
return_value="direct.1.1") def test_cluster_scaling_v_right_data(self, ops):
def test_cluster_scaling_v_right_data(self, engine_version): ops.get_engine_type_and_version.return_value = "direct.1.1"
self._create_object_fun = c_s.check_cluster_scaling self._create_object_fun = c_s.check_cluster_scaling
data = { data = {
@ -346,9 +346,9 @@ class TestScalingValidation(u.ValidationTestCase):
self._assert_cluster_scaling_validation(data=data) self._assert_cluster_scaling_validation(data=data)
u.stop_patch(patchers) u.stop_patch(patchers)
@mock.patch("sahara.service.ops.get_engine_type_and_version", @mock.patch("sahara.service.api.OPS")
return_value="direct.1.1") def test_check_cluster_scaling_wrong_engine(self, ops):
def test_check_cluster_scaling_wrong_engine(self, engine_version): ops.get_engine_type_and_version.return_value = "direct.1.1"
ng1 = tu.make_ng_dict('ng', '42', ['namenode'], 1) ng1 = tu.make_ng_dict('ng', '42', ['namenode'], 1)
cluster = tu.create_cluster( cluster = tu.create_cluster(
"cluster1", "tenant1", "vanilla", "1.2.1", [ng1], "cluster1", "tenant1", "vanilla", "1.2.1", [ng1],
@ -360,9 +360,9 @@ class TestScalingValidation(u.ValidationTestCase):
expected_message="Cluster created with heat.1.1 infrastructure " expected_message="Cluster created with heat.1.1 infrastructure "
"engine can't be scaled with direct.1.1 engine") "engine can't be scaled with direct.1.1 engine")
@mock.patch("sahara.service.ops.get_engine_type_and_version", @mock.patch("sahara.service.api.OPS")
return_value="heat.1.1") def test_check_heat_cluster_scaling_missing_engine(self, ops):
def test_check_heat_cluster_scaling_missing_engine(self, engine_version): ops.get_engine_type_and_version.return_value = "heat.1.1"
ng1 = tu.make_ng_dict('ng', '42', ['namenode'], 1) ng1 = tu.make_ng_dict('ng', '42', ['namenode'], 1)
cluster = tu.create_cluster("cluster1", "tenant1", "vanilla", "1.2.1", cluster = tu.create_cluster("cluster1", "tenant1", "vanilla", "1.2.1",
[ng1], status='Active', id='12321') [ng1], status='Active', id='12321')