Cluster scaling implementation

The following is supported:

* Scaling existing node groups, provided they contain only tasktracker or datanode processes
* Adding a new node group to a cluster
* Validation
* Rollback

Implements blueprint manual-cluster-scaling

Change-Id: I3838db4b05afa3583db0d059541c00e75cc6cfe2
Nadya Privalova 2013-07-02 15:12:30 +04:00
parent 07153cc6b3
commit 32855b9efb
7 changed files with 311 additions and 68 deletions

View File

@@ -0,0 +1,18 @@
+{
+    "add_node_groups": [
+        {
+            "node_group_template_id": "9e1225e0-b7ce-425b-9555-3a61166167c8",
+            "count": 1,
+            "name": "test_ng"
+        }
+    ],
+    "resize_node_groups": [
+        {
+            "name": "worker",
+            "count": 2
+        }
+    ]
+}
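
The file above is the sample scale request added by this commit. As a rough usage sketch of the new PUT /clusters/<cluster_id> route (added in the API diff below), the same body could be sent with python-requests; the host, port, API prefix, tenant id and token handling here are illustrative assumptions, not part of this commit:

# Hypothetical client-side sketch, not code from this commit.
import json

import requests

SAVANNA_URL = 'http://localhost:8386/v1.0/<tenant_id>'   # assumed endpoint
CLUSTER_ID = '<cluster_id>'
TOKEN = '<keystone_auth_token>'

# Same body as the sample file above.
scale_request = {
    "add_node_groups": [
        {
            "node_group_template_id": "9e1225e0-b7ce-425b-9555-3a61166167c8",
            "count": 1,
            "name": "test_ng"
        }
    ],
    "resize_node_groups": [
        {
            "name": "worker",
            "count": 2
        }
    ]
}

resp = requests.put(
    '%s/clusters/%s' % (SAVANNA_URL, CLUSTER_ID),
    headers={'X-Auth-Token': TOKEN, 'Content-Type': 'application/json'},
    data=json.dumps(scale_request))

# The handler returns the rendered cluster dict right away; provisioning of
# the new instances continues asynchronously while the cluster moves through
# the Validating / Scaling / Configuring / Active statuses.
print(resp.status_code, resp.json())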

View File

@@ -40,18 +40,18 @@ def clusters_create(data):
     return u.render(api.create_cluster(data).wrapped_dict)
 
 
+@rest.put('/clusters/<cluster_id>')
+@v.check_exists(api.get_cluster, 'cluster_id')
+def clusters_scale(cluster_id, data):
+    return u.render(api.scale_cluster(cluster_id, data).wrapped_dict)
+
+
 @rest.get('/clusters/<cluster_id>')
 @v.check_exists(api.get_cluster, 'cluster_id')
 def clusters_get(cluster_id):
     return u.render(api.get_cluster(id=cluster_id).wrapped_dict)
 
 
-@rest.put('/clusters/<cluster_id>')
-@v.check_exists(api.get_cluster, 'cluster_id')
-def clusters_update(cluster_id):
-    return _not_implemented()
-
-
 @rest.delete('/clusters/<cluster_id>')
 @v.check_exists(api.get_cluster, 'cluster_id')
 def clusters_delete(cluster_id):

View File

@@ -49,6 +49,9 @@ class ProvisioningPluginBase(plugins_base.PluginInterface):
     def validate(self, cluster):
         pass
 
+    def validate_scaling(self, cluster, existing, additional):
+        pass
+
     def update_infra(self, cluster):
         pass
 
@@ -60,6 +63,9 @@ class ProvisioningPluginBase(plugins_base.PluginInterface):
     def start_cluster(self, cluster):
         pass
 
+    def scale_cluster(self, cluster, instances):
+        pass
+
     def convert(self, hadoop_version, config_file):
         pass

View File

@@ -34,3 +34,19 @@ class TaskTrackersWithoutJobTracker(e.SavannaException):
     def __init__(self):
         self.message = "TaskTrackers cannot be configures without JobTracker"
         self.code = "TASK_TRACKERS_WITHOUT_JOB_TRACKER"
+
+
+class NodeGroupsDoNotExist(e.SavannaException):
+    def __init__(self, ng_names):
+        names = ''.join(ng_names)
+        self.message = "Cluster does not contain required node groups " +\
+            names
+        self.code = "NODE_GROUP_DOES_NOT_EXIST"
+
+
+class NodeGroupCannotBeScaled(e.SavannaException):
+    def __init__(self, ng_name):
+        self.message = "Chosen node group " + ng_name + " cannot be " \
+                       "scaled : Vanilla plugin supports only datanode and " \
+                       "tasktracker scaling"
+        self.code = "NODE_GROUP_CANNOT_BE_SCALED"

View File

@@ -74,15 +74,14 @@ class VanillaProvider(p.ProvisioningPluginBase):
         pass
 
     def configure_cluster(self, cluster):
-        for ng in cluster.node_groups:
-            for inst in ng.instances:
-                inst.remote.execute_command(
-                    'sudo chown -R $USER:$USER /etc/hadoop'
-                )
         self._extract_configs(cluster)
         self._push_configs_to_nodes(cluster)
-        self._write_hadoop_user_keys(cluster)
+        self._write_hadoop_user_keys(cluster.private_key,
+                                     utils.get_instances(cluster))
+        nn = utils.get_namenode(cluster)
+        nn.remote.execute_command(
+            "sudo su -c 'hadoop namenode -format' hadoop")
 
     def start_cluster(self, cluster):
         nn_instance = utils.get_namenode(cluster)
@@ -121,26 +120,76 @@ class VanillaProvider(p.ProvisioningPluginBase):
             )
         }
 
-    def _push_configs_to_nodes(self, cluster):
+    def validate_scaling(self, cluster, existing, additional):
+        ng_names = existing.copy()
+        allowed = ["datanode", "tasktracker"]
+        #validate existing n_g scaling at first:
         for ng in cluster.node_groups:
+            if ng.name in ng_names:
+                del ng_names[ng.name]
+                if not set(ng.node_processes).issubset(allowed):
+                    raise ex.NodeGroupCannotBeScaled(ng.name)
+        if len(ng_names) != 0:
+            raise ex.NodeGroupsDoNotExist(ng_names.keys())
+        #validate additional n_g
+        jt = utils.get_jobtracker(cluster)
+        nn = utils.get_namenode(cluster)
+        for ng in additional:
+            if (not set(ng.node_processes).issubset(allowed)) or (
+                    not jt and 'tasktracker' in ng.node_processes) or (
+                    not nn and 'datanode' in ng.node_processes):
+                raise ex.NodeGroupCannotBeScaled(ng.name)
+
+    def scale_cluster(self, cluster, instances):
+        self._extract_configs(cluster)
+        self._push_configs_to_nodes(cluster, instances=instances)
+        self._write_hadoop_user_keys(cluster.private_key,
+                                     instances)
+        for i in instances:
+            with i.remote as remote:
+                if "datanode" in i.node_group.node_processes:
+                    remote.execute_command('sudo su -c '
+                                           '"/usr/sbin/hadoop-daemon.sh '
+                                           'start datanode" hadoop'
+                                           '>> /tmp/savanna-start-datanode.log'
+                                           ' 2>&1')
+                if "tasktracker" in i.node_group.node_processes:
+                    remote.execute_command('sudo su -c '
+                                           '"/usr/sbin/hadoop-daemon.sh '
+                                           'start tasktracker" hadoop'
+                                           '>> /tmp/savanna-start-'
+                                           'tasktracker.log 2>&1')
+
+    def _push_configs_to_nodes(self, cluster, instances=None):
+        if not instances:
+            instances = utils.get_instances(cluster)
+        for inst in instances:
             files = {
-                '/etc/hadoop/core-site.xml': ng.extra['xml']['core-site'],
-                '/etc/hadoop/mapred-site.xml': ng.extra['xml']['mapred-site'],
-                '/etc/hadoop/hdfs-site.xml': ng.extra['xml']['hdfs-site'],
-                '/tmp/savanna-hadoop-init.sh': ng.extra['setup_script']
+                '/etc/hadoop/core-site.xml': inst.node_group.extra['xml'][
+                    'core-site'],
+                '/etc/hadoop/mapred-site.xml': inst.node_group.extra['xml'][
+                    'mapred-site'],
+                '/etc/hadoop/hdfs-site.xml': inst.node_group.extra['xml'][
+                    'hdfs-site'],
+                '/tmp/savanna-hadoop-init.sh': inst.node_group.extra[
+                    'setup_script']
             }
-            for inst in ng.instances:
-                inst.remote.write_files_to(files)
-                inst.remote.execute_command(
+            with inst.remote as r:
+                r.execute_command(
+                    'sudo chown -R $USER:$USER /etc/hadoop'
+                )
+                r.write_files_to(files)
+                r.execute_command(
                     'sudo chmod 0500 /tmp/savanna-hadoop-init.sh'
                 )
-                inst.remote.execute_command(
+                r.execute_command(
                     'sudo /tmp/savanna-hadoop-init.sh '
                     '>> /tmp/savanna-hadoop-init.log 2>&1')
 
         nn = utils.get_namenode(cluster)
         jt = utils.get_jobtracker(cluster)
 
         nn.remote.write_files_to({
             '/etc/hadoop/slaves': utils.generate_host_names(
                 utils.get_datanodes(cluster)),
@@ -148,9 +197,6 @@ class VanillaProvider(p.ProvisioningPluginBase):
                 utils.get_secondarynamenodes(cluster))
         })
 
-        nn.remote.execute_command(
-            "sudo su -c 'hadoop namenode -format' hadoop")
-
         if jt and nn.instance_id != jt.instance_id:
             jt.remote.write_file_to('/etc/hadoop/slaves',
                                     utils.generate_host_names(
@@ -171,9 +217,8 @@ class VanillaProvider(p.ProvisioningPluginBase):
             'Web UI': 'http://%s:50070' % nn.management_ip
         }
 
-    def _write_hadoop_user_keys(self, cluster):
-        private_key = cluster.private_key
-        public_key = crypto.private_key_to_public_key(cluster.private_key)
+    def _write_hadoop_user_keys(self, private_key, instances):
+        public_key = crypto.private_key_to_public_key(private_key)
 
         files = {
             'id_rsa': private_key,
@@ -185,8 +230,7 @@ class VanillaProvider(p.ProvisioningPluginBase):
             'sudo chown -R hadoop:hadoop /home/hadoop/.ssh; ' \
             'sudo chmod 600 /home/hadoop/.ssh/{id_rsa,authorized_keys}'
 
-        for node_group in cluster.node_groups:
-            for instance in node_group.instances:
-                with instance.remote as remote:
-                    remote.write_files_to(files)
-                    remote.execute_command(mv_cmd)
+        for instance in instances:
+            with instance.remote as remote:
+                remote.write_files_to(files)
+                remote.execute_command(mv_cmd)

View File

@@ -33,6 +33,41 @@ get_clusters = s.get_clusters
 get_cluster = s.get_cluster
 
 
+def scale_cluster(cluster_id, data):
+    cluster = get_cluster(id=cluster_id)
+    plugin = plugin_base.PLUGINS.get_plugin(cluster.plugin_name)
+    existing_node_groups = data.get('resize_node_groups', [])
+    additional_node_groups = data.get('add_node_groups', [])
+
+    #the next map is the main object we will work with
+    #to_be_enlarged : {node_group_name: desired_amount_of_instances}
+    to_be_enlarged = {}
+    for ng in existing_node_groups:
+        to_be_enlarged.update({ng['name']: ng['count']})
+
+    additional = construct_ngs_for_scaling(additional_node_groups)
+
+    try:
+        cluster.status = 'Validating'
+        context.model_save(cluster)
+        _validate_cluster(cluster, plugin, additional)
+        plugin.validate_scaling(cluster, to_be_enlarged, additional)
+    except Exception:
+        with excutils.save_and_reraise_exception():
+            cluster.status = 'Active'
+            context.model_save(cluster)
+
+    # If we are here validation is successful.
+    # So let's update bd and to_be_enlarged map:
+    for add_n_g in additional:
+        cluster.node_groups.append(add_n_g)
+        to_be_enlarged.update({add_n_g.name: additional[add_n_g]})
+    context.model_save(cluster)
+
+    context.spawn(_provision_nodes, cluster_id, to_be_enlarged)
+
+    return cluster
+
+
 def create_cluster(values):
     cluster = s.create_cluster(values)
     plugin = plugin_base.PLUGINS.get_plugin(cluster.plugin_name)
@@ -55,6 +90,24 @@ def create_cluster(values):
     return cluster
 
 
+#node_group_names_map = {node_group_name:desired_amount_of_instances}
+def _provision_nodes(cluster_id, node_group_names_map):
+    cluster = get_cluster(id=cluster_id)
+    plugin = plugin_base.PLUGINS.get_plugin(cluster.plugin_name)
+
+    cluster.status = 'Scaling'
+    context.model_save(cluster)
+    instances = i.scale_cluster(cluster, node_group_names_map)
+
+    cluster.status = 'Configuring'
+    context.model_save(cluster)
+    plugin.scale_cluster(cluster, instances)
+
+    # cluster is now up and ready
+    cluster.status = 'Active'
+    context.model_save(cluster)
+
+
 def _provision_cluster(cluster_id):
     cluster = get_cluster(id=cluster_id)
     plugin = plugin_base.PLUGINS.get_plugin(cluster.plugin_name)
@@ -145,27 +198,60 @@ def convert_to_cluster_template(plugin_name, version, config_file):
 def _validate(cluster, plugin):
     # Validate that user configs are not included in plugin configs set
+    pl_confs = _get_plugin_configs(cluster, plugin)
+    for ng in cluster.node_groups:
+        _validate_node_group(pl_confs, ng)
+
+
+def _validate_cluster(cluster, plugin, node_groups):
+    # Validate that user configs are not included in plugin configs set
+    pl_confs = _get_plugin_configs(cluster, plugin)
+    for ng in node_groups:
+        ng.cluster = cluster
+        _validate_node_group(pl_confs, ng)
+        ng.cluster = None
+
+
+def _get_plugin_configs(cluster, plugin):
     pl_confs = {}
     for config in plugin.get_configs(cluster.hadoop_version):
         if pl_confs.get(config.applicable_target):
             pl_confs[config.applicable_target].append(config.name)
         else:
             pl_confs[config.applicable_target] = [config.name]
+    return pl_confs
 
-    for ng in cluster.node_groups:
-        for app_target, configs in ng.configuration.items():
-            if app_target not in pl_confs:
-                raise RuntimeError("Plugin doesn't contain applicable "
-                                   "target '%s'" % app_target)
-            for name, values in configs.items():
-                if name not in pl_confs[app_target]:
-                    raise RuntimeError("Plugin's applicable target '%s' "
-                                       "doesn't contain config with name '%s'"
-                                       % (app_target, name))
+
+def _validate_node_group(pl_confs, node_group):
+    for app_target, configs in node_group.configuration.items():
+        if app_target not in pl_confs:
+            raise RuntimeError("Plugin doesn't contain applicable "
+                               "target '%s'" % app_target)
+        for name, values in configs.items():
+            if name not in pl_confs[app_target]:
+                raise RuntimeError("Plugin's applicable target '%s' "
+                                   "doesn't contain config with name '%s'"
+                                   % (app_target, name))
+
+
+def construct_ngs_for_scaling(additional_node_groups):
+    additional = {}
+    for ng in additional_node_groups:
+        tmpl_id = ng['node_group_template_id']
+        count = ng['count']
+        if tmpl_id:
+            tmpl = get_node_group_template(id=tmpl_id)
+            node_group = tmpl.to_object(ng, m.NodeGroup)
+        else:
+            node_group = m.NodeGroup(**ng)
+        #need to set 0 because tmpl.to_object overwrote count
+        node_group.count = 0
+        additional.update({node_group: count})
+    return additional
 
 
 ## Image Registry
 
 
 def get_images(tags):
     return nova.client().images.list_registered(tags)
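
Tying the service code back to the sample request at the top of this commit: after validation succeeds, scale_cluster() hands _provision_nodes a plain name-to-count map. A worked illustration (not code from the commit) for that sample body:

# Worked illustration for the sample request above (not part of the commit).
to_be_enlarged = {'worker': 2}           # from 'resize_node_groups'
to_be_enlarged.update({'test_ng': 1})    # added groups join the same map;
                                         # their NodeGroup count starts at 0
assert to_be_enlarged == {'worker': 2, 'test_ng': 1}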

View File

@@ -46,30 +46,81 @@ def create_cluster(cluster):
         _rollback_cluster_creation(cluster, ex)
 
 
+#node_group_names: {node_group_name:desired_amount_of_instances}
+def scale_cluster(cluster, node_group_names_map):
+    #Now let's work with real node_groups, not names:
+    node_groups_map = {}
+    for ng in cluster.node_groups:
+        if ng.name in node_group_names_map:
+            node_groups_map.update({ng: node_group_names_map[ng.name]})
+    try:
+        instances_list = _create_cluster_instances(
+            cluster, node_groups_map)
+        _await_instances(cluster)
+    except Exception as ex:
+        LOG.warn("Can't scale cluster: %s", ex)
+        with excutils.save_and_reraise_exception():
+            _rollback_cluster_scaling(instances_list)
+            instances_list = []
+            cluster.status = 'Active'
+            context.model_save(cluster)
+
+    # we should be here with valid cluster: if instances creation
+    # was not successful all extra-instances will be removed above
+    _configure_instances(cluster)
+    return instances_list
+
+
 def _create_instances(cluster):
-    """Create all instances using nova client and persist them into DB."""
-    session = context.ctx().session
     aa_groups = _generate_anti_affinity_groups(cluster)
     for node_group in cluster.node_groups:
         userdata = _generate_user_data_script(node_group)
         for idx in xrange(1, node_group.count + 1):
-            name = '%s-%s-%03d' % (cluster.name, node_group.name, idx)
-            aa_group = node_group.anti_affinity_group
-            ids = aa_groups[aa_group]
-            hints = {'different_host': list(ids)} if ids else None
-
-            nova_instance = nova.client().servers.create(
-                name, node_group.get_image_id(), node_group.flavor_id,
-                scheduler_hints=hints, userdata=userdata,
-                key_name=cluster.user_keypair_id)
-
-            with session.begin():
-                instance = m.Instance(node_group.id, nova_instance.id, name)
-                node_group.instances.append(instance)
-                session.add(instance)
-
-            if aa_group:
-                aa_groups[aa_group].append(nova_instance.id)
+            _run_instance(cluster, node_group, idx, aa_groups, userdata)
+
+
+def _create_cluster_instances(cluster, node_groups_map):
+    aa_groups = _generate_anti_affinity_groups(cluster)
+    instances = []
+    for node_group in node_groups_map:
+        count = node_groups_map[node_group]
+        userdata = _generate_user_data_script(node_group)
+        for idx in xrange(node_group.count + 1, count + 1):
+            instance = _run_instance(cluster, node_group, idx,
+                                     aa_groups, userdata)
+            instances.append(instance)
+        node_group.count = count
+        context.model_save(node_group)
+    context.model_save(cluster)
+    return instances
+
+
+#extracted method. Return
+def _run_instance(cluster, node_group, idx, aa_groups, userdata):
+    """Create instance using nova client and persist them into DB."""
+    session = context.ctx().session
+    name = '%s-%s-%03d' % (cluster.name, node_group.name, idx)
+    aa_group = node_group.anti_affinity_group
+    ids = aa_groups[aa_group]
+    hints = {'different_host': list(ids)} if ids else None
+    context.model_save(node_group)
+
+    nova_instance = nova.client().servers.create(
+        name, node_group.get_image_id(), node_group.flavor_id,
+        scheduler_hints=hints, userdata=userdata,
+        key_name=cluster.user_keypair_id)
+
+    with session.begin():
+        instance = m.Instance(node_group.id, nova_instance.id, name)
+        node_group.instances.append(instance)
+        session.add(instance)
+
+    if aa_group:
+        aa_groups[aa_group].append(nova_instance.id)
+
+    return instance
 
 
 def _generate_user_data_script(node_group):
@@ -77,7 +128,6 @@ def _generate_user_data_script(node_group):
 echo "%(public_key)s" >> %(user_home)s/.ssh/authorized_keys
 echo "%(private_key)s" > %(user_home)s/.ssh/id_rsa
 """
-
     cluster = node_group.cluster
     if node_group.username == "root":
         user_home = "/root/"
@@ -169,13 +219,8 @@ def _rollback_cluster_creation(cluster, ex):
     """Shutdown all instances and update cluster status."""
     # update cluster status
     # update cluster status description
-    _shutdown_instances(cluster, True)
-
-
-def _shutdown_instances(cluster, quiet=False):
-    """Shutdown all instances related to the specified cluster."""
     session = context.ctx().session
-
+    _shutdown_instances(cluster, True)
     alive_instances = set([srv.id for srv in nova.client().servers.list()])
 
     for node_group in cluster.node_groups:
@@ -186,6 +231,34 @@ def _shutdown_instances(cluster, quiet=False):
                 session.delete(instance)
 
 
+def _rollback_cluster_scaling(instances):
+    #if some nodes are up we should shut them down and update "count" in
+    # node_group
+    session = context.ctx().session
+    for i in instances:
+        ng = i.node_group
+        _shutdown_instance(i)
+        ng.count -= 1
+        context.model_save(ng)
+        if ng.count == 0:
+            with session.begin():
+                session.delete(ng)
+
+
+def _shutdown_instances(cluster, quiet=False):
+    for node_group in cluster.node_groups:
+        for instance in node_group.instances:
+            _shutdown_instance(instance)
+
+
+def _shutdown_instance(instance):
+    session = context.ctx().session
+    nova.client().servers.delete(instance.instance_id)
+    with session.begin():
+        instance.node_group = None
+        session.delete(instance)
+
+
 def shutdown_cluster(cluster):
     """Shutdown specified cluster and all related resources."""
     _shutdown_instances(cluster)