Merge "Enable the scaling up of nodes in a cluster"
commit 6699fe9943
@@ -16,7 +16,9 @@
import os
import requests

from savanna import conductor
from savanna import context
from savanna import exceptions as exc
from savanna.openstack.common import jsonutils as json
from savanna.openstack.common import log as logging
from savanna.openstack.common import uuidutils
@@ -29,6 +31,7 @@ from savanna.plugins.hdp import savannautils as utils
from savanna.plugins.hdp import validator as v
from savanna.plugins import provisioning as p

conductor = conductor.API
LOG = logging.getLogger(__name__)


@@ -350,7 +353,8 @@ class AmbariPlugin(p.ProvisioningPluginBase):
        if result.status_code == 202:
            #TODO(jspeidel) don't hard code request id
            success = self._wait_for_async_request(
                1, cluster_name, ambari_info)
                self._get_async_request_uri(ambari_info, cluster_name, 1),
                auth=(ambari_info.user, ambari_info.password))
            if success:
                LOG.info("Install of Hadoop stack successful.")
                self._finalize_ambari_state(ambari_info)
@@ -358,20 +362,22 @@ class AmbariPlugin(p.ProvisioningPluginBase):
                LOG.critical('Install command failed.')
                raise RuntimeError('Hadoop service install failed')
        else:
            LOG.critical(
            LOG.error(
                'Install command failed. {0}'.format(result.text))
            raise RuntimeError('Hadoop service install failed')

        return success

    def _wait_for_async_request(self, request_id, cluster_name, ambari_info):
        request_url = 'http://{0}/api/v1/clusters/{1}/requests/{' \
                      '2}/tasks?fields=Tasks/status'.format(
                          ambari_info.get_address(), cluster_name, request_id)
    def _get_async_request_uri(self, ambari_info, cluster_name, request_id):
        return 'http://{0}/api/v1/clusters/{1}/requests/{' \
               '2}/tasks?fields=Tasks/status'.format(
                   ambari_info.get_address(), cluster_name,
                   request_id)

    def _wait_for_async_request(self, request_url, auth):
        started = False
        while not started:
            result = requests.get(request_url, auth=(
                ambari_info.user, ambari_info.password))
            result = requests.get(request_url, auth=auth)
            LOG.debug(
                'async request ' + request_url + ' response:\n' + result.text)
            json_result = json.loads(result.text)
@@ -396,12 +402,13 @@ class AmbariPlugin(p.ProvisioningPluginBase):
            ambari_info.get_address(), cluster_name)
        body = '{"ServiceInfo": {"state" : "STARTED"}}'

        result = requests.put(start_url, data=body, auth=(
            ambari_info.user, ambari_info.password))
        auth = (ambari_info.user, ambari_info.password)
        result = requests.put(start_url, data=body, auth=auth)
        if result.status_code == 202:
            # don't hard code request id
            success = self._wait_for_async_request(
                2, cluster_name, ambari_info)
                self._get_async_request_uri(ambari_info, cluster_name, 2),
                auth=auth)
            if success:
                LOG.info(
                    "Successfully started Hadoop cluster '{0}'.".format(
@@ -416,13 +423,98 @@ class AmbariPlugin(p.ProvisioningPluginBase):
                format(result.status_code, result.text))
            raise RuntimeError('Hadoop cluster start failed.')

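For context, a minimal standalone sketch of the polling that the refactored _wait_for_async_request(request_url, auth) performs against the Ambari tasks resource. The host name, cluster name, request id and credentials below are placeholders; the response shape (items -> Tasks -> status) follows the fields selected by the URI built above.

# Illustrative sketch, not part of this change.
import time

import requests


def wait_for_request(request_url, auth, poll_interval=5):
    while True:
        result = requests.get(request_url, auth=auth)
        tasks = result.json().get('items', [])
        statuses = set(task['Tasks']['status'] for task in tasks)
        if statuses and statuses == set(['COMPLETED']):
            return True
        if statuses & set(['FAILED', 'ABORTED', 'TIMEDOUT']):
            return False
        time.sleep(poll_interval)


request_url = ('http://ambari.example.com:8080/api/v1/clusters/cluster-1'
               '/requests/1/tasks?fields=Tasks/status')
# wait_for_request(request_url, auth=('admin', 'admin'))
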
    def _install_components(self, ambari_info, auth, cluster_name, servers):
        LOG.info('Starting Hadoop components while scaling up')
        LOG.info('Cluster name {0}, Ambari server ip {1}'
                 .format(cluster_name, ambari_info.get_address()))
        # query for the host components on the given hosts that are in the
        # INIT state
        body = '{"HostRoles": {"state" : "INSTALLED"}}'
        install_uri = 'http://{0}/api/v1/clusters/{' \
                      '1}/host_components?HostRoles/state=INIT&' \
                      'HostRoles/host_name.in({2})'.format(
                          ambari_info.get_address(),
                          cluster_name,
                          self._get_host_list(servers))
        self._exec_ambari_command(auth, body, install_uri)

    def _start_components(self, ambari_info, auth, cluster_name, servers):
        # query for all the host components on one of the hosts in the
        # INSTALLED state, then get a list of the client services in the list
        installed_uri = 'http://{0}/api/v1/clusters/{' \
                        '1}/host_components?HostRoles/state=INSTALLED&' \
                        'HostRoles/host_name.in({2})' \
                        .format(ambari_info.get_address(), cluster_name,
                                self._get_host_list(servers))
        result = requests.get(installed_uri, auth=auth)
        if result.status_code == 200:
            LOG.debug(
                'GET response: {0}'.format(result.text))
            json_result = json.loads(result.text)
            items = json_result['items']
            # select non-CLIENT items
            inclusion_list = list(set([x['HostRoles']['component_name']
                                       for x in items if "CLIENT" not in
                                       x['HostRoles']['component_name']]))

            # query and start all non-client components on the given set of
            # hosts
            body = '{"HostRoles": {"state" : "STARTED"}}'
            start_uri = 'http://{0}/api/v1/clusters/{' \
                        '1}/host_components?HostRoles/state=INSTALLED&' \
                        'HostRoles/host_name.in({2})' \
                        '&HostRoles/component_name.in({3})'.format(
                            ambari_info.get_address(), cluster_name,
                            self._get_host_list(servers),
                            ",".join(inclusion_list))
            self._exec_ambari_command(auth, body, start_uri)
        else:
            raise RuntimeError('Unable to determine installed service '
                               'components in scaled instances. status'
                               ' code returned = {0}'.format(result.status))

    def _install_and_start_components(self, cluster_name, servers,
                                      ambari_info):
        auth = (ambari_info.user, ambari_info.password)

        self._install_components(ambari_info, auth, cluster_name, servers)

        self._start_components(ambari_info, auth, cluster_name, servers)

    def _exec_ambari_command(self, auth, body, cmd_uri):

        LOG.debug('PUT URI: {0}'.format(cmd_uri))
        result = requests.put(cmd_uri, data=body,
                              auth=auth)
        if result.status_code == 202:
            # don't hard code request id
            LOG.debug(
                'PUT response: {0}'.format(result.text))
            json_result = json.loads(result.text)
            href = json_result['href'] + '/tasks?fields=Tasks/status'
            success = self._wait_for_async_request(href, auth)
            if success:
                LOG.info(
                    "Successfully changed state of Hadoop components ")
            else:
                LOG.critical('Failed to change state of Hadoop '
                             'components')
                raise RuntimeError('Failed to change state of Hadoop '
                                   'components')

        else:
            LOG.error(
                'Command failed. Status: {0}, response: {1}'.
                format(result.status_code, result.text))
            raise RuntimeError('Hadoop/Ambari command failed.')

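For context, a minimal sketch of the two host_components state transitions that the scale-up helpers above drive: components still in the INIT state on the new hosts are moved to INSTALLED, then the non-client components are moved to STARTED. The Ambari address, cluster name, host list and credentials are placeholders; the URIs and bodies mirror the ones built by _install_components() and _start_components().

# Illustrative sketch, not part of this change.
import requests

AMBARI = 'ambari.example.com:8080'
CLUSTER = 'cluster-1'
AUTH = ('admin', 'admin')
HOSTS = 'worker-3.example.com,worker-4.example.com'


def install_new_host_components():
    # move every component that is still in INIT on the new hosts to INSTALLED
    uri = ('http://{0}/api/v1/clusters/{1}/host_components'
           '?HostRoles/state=INIT&HostRoles/host_name.in({2})'.format(
               AMBARI, CLUSTER, HOSTS))
    return requests.put(uri, data='{"HostRoles": {"state" : "INSTALLED"}}',
                        auth=AUTH)


def start_new_host_components(component_names):
    # start the non-client components once they report INSTALLED
    uri = ('http://{0}/api/v1/clusters/{1}/host_components'
           '?HostRoles/state=INSTALLED&HostRoles/host_name.in({2})'
           '&HostRoles/component_name.in({3})'.format(
               AMBARI, CLUSTER, HOSTS, ','.join(component_names)))
    return requests.put(uri, data='{"HostRoles": {"state" : "STARTED"}}',
                        auth=AUTH)
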
    def _get_default_cluster_configuration(self):
        with open(os.path.join(os.path.dirname(__file__), 'resources',
                               'default-cluster.template'), 'r') as f:
            return clusterspec.ClusterSpec(f.read())

    def _set_cluster_info(self, cluster, cluster_spec, hosts, ambari_info):
        info = cluster.info
        info = {}

        try:
            jobtracker_ip = self._determine_host_for_server_component(
@@ -448,6 +540,9 @@ class AmbariPlugin(p.ProvisioningPluginBase):
            'Web UI': 'http://%s' % ambari_info.get_address()
        }

        ctx = context.ctx()
        conductor.cluster_update(ctx, cluster, {'info': info})

    def _finalize_ambari_state(self, ambari_info):
        LOG.info('Finalizing Ambari cluster state.')

@@ -543,16 +638,19 @@ class AmbariPlugin(p.ProvisioningPluginBase):
        return requests

    # SAVANNA PLUGIN SPI METHODS:
    def configure_cluster(self, cluster):
        # take the user inputs from the cluster and node groups and convert
        # to a ambari blueprint

    def _get_blueprint_processor(self, cluster):
        processor = bp.BlueprintProcessor(json.load(
            open(os.path.join(os.path.dirname(__file__), 'resources',
                              'default-cluster.template'), "r")))
        processor.process_user_inputs(self._map_to_user_inputs(
            '1.3.0', cluster.cluster_configs))
        processor.process_node_groups(cluster.node_groups)
        return processor

    def configure_cluster(self, cluster):
        # take the user inputs from the cluster and node groups and convert
        # to a ambari blueprint
        processor = self._get_blueprint_processor(cluster)
        # NOTE: for the time being we are going to ignore the node group
        # level configurations. we are not currently
        # defining node level configuration items (i.e. scope='cluster' in
@@ -598,6 +696,56 @@ class AmbariPlugin(p.ProvisioningPluginBase):
        validator = v.Validator()
        validator.validate(cluster)

    def scale_cluster(self, cluster, instances):
        processor = self._get_blueprint_processor(cluster)
        cluster_spec = clusterspec.ClusterSpec(
            json.dumps(processor.blueprint), cluster=cluster)
        rpm = self._get_rpm_uri(cluster_spec)

        servers = []
        for instance in instances:
            host_role = utils.get_host_role(instance)
            servers.append(h.HadoopServer(instance,
                                          cluster_spec.node_groups
                                          [host_role],
                                          ambari_rpm=rpm))

        ambari_info = self.get_ambari_info(cluster_spec,
                                           self._get_servers(cluster))

        for server in servers:
            self._spawn('Ambari provisioning thread',
                        server.provision_ambari, ambari_info)

        self._wait_for_host_registrations(self._get_num_hosts(cluster),
                                          ambari_info)

        # now add the hosts and the component
        self._add_hosts_and_components(cluster_spec, servers,
                                       ambari_info, cluster.name)

        self._install_and_start_components(cluster.name, servers, ambari_info)

    def decommission_nodes(self, cluster, instances):
        raise exc.InvalidException('The HDP plugin does not yet support the '
                                   'decommissioning of nodes')

    def validate_scaling(self, cluster, existing, additional):
        # see if additional servers are slated for "MASTER" group
        validator = v.Validator()
        validator.validate_scaling(cluster, existing, additional)

    def _get_num_hosts(self, cluster):
        count = 0
        for node_group in cluster.node_groups:
            count += node_group.count

        return count

    def _get_host_list(self, servers):
        host_list = [server.instance.fqdn.lower() for server in servers]
        return ",".join(host_list)

    def _get_rpm_uri(self, cluster_spec):
        ambari_config = cluster_spec.configurations['ambari']
        return ambari_config.get('rpm', None)

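For context, a small self-contained sketch of the comma-separated FQDN list that _get_host_list() builds and that feeds the HostRoles/host_name.in(...) predicates above. FakeInstance and FakeServer are stand-ins for the plugin's instance and HadoopServer objects.

# Illustrative sketch, not part of this change.
class FakeInstance(object):
    def __init__(self, fqdn):
        self.fqdn = fqdn


class FakeServer(object):
    def __init__(self, fqdn):
        self.instance = FakeInstance(fqdn)


servers = [FakeServer('Worker-3.example.com'),
           FakeServer('worker-4.example.com')]
# same expression as _get_host_list(): lower-cased FQDNs joined with commas
host_list = ",".join(server.instance.fqdn.lower() for server in servers)
assert host_list == 'worker-3.example.com,worker-4.example.com'
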
@@ -31,8 +31,13 @@ class HadoopServer:
    def __init__(self, instance, node_group, ambari_rpm=None):
        self.instance = instance
        self.node_group = node_group
        self._ssh = self._connect_to_vm()
        self.ambari_rpm = ambari_rpm or AMBARI_RPM
        self._remote = remote.get_remote(instance)

    def _connect_to_vm(self):
        LOG.info(
            'Connecting to VM: {0}'.format(self.instance.management_ip))
        return remote.get_remote(self.instance).ssh_connection()

    def provision_ambari(self, ambari_info):
        self.install_rpms()
@@ -48,13 +53,13 @@ class HadoopServer:

        #TODO(jspeidel): based on image type, use correct command
        rpm_cmd = 'rpm -Uvh ' + self.ambari_rpm
        self._remote.execute_command(rpm_cmd)
        self._remote.execute_command('yum -y install epel-release')
        self._execute_on_vm(rpm_cmd)
        self._execute_on_vm('yum -y install epel-release')

    def _setup_and_start_ambari_server(self, port):
        LOG.info(
            '{0}: Installing ambari-server ...'.format(self.instance.hostname))
        self._remote.execute_command('yum -y install ambari-server')
        self._execute_on_vm('yum -y install ambari-server')

        LOG.info('Running Ambari Server setup ...')
        self._execute_on_vm_interactive(
@@ -64,7 +69,7 @@ class HadoopServer:
        self._configure_ambari_server_api_port(port)

        LOG.info('Starting Ambari ...')
        self._remote.execute_command('ambari-server start')
        self._execute_on_vm('ambari-server start')

    def _configure_ambari_server_api_port(self, port):
        # do nothing if port is not specified or is default
@@ -74,28 +79,27 @@ class HadoopServer:
        ambari_config_file = '/etc/ambari-server/conf/ambari.properties'
        LOG.debug('Configuring Ambari Server API port: {0}'.format(port))
        # read the current contents
        data = self._remote.read_file_from(ambari_config_file)
        data = remote.read_file_from(self._ssh.open_sftp(), ambari_config_file)
        data = '{0}\nclient.api.port={1}\n'.format(data, port)

        # write the file back
        self._remote.write_file_to(ambari_config_file, data)
        remote.write_file_to(self._ssh.open_sftp(), ambari_config_file, data)

    def _setup_and_start_ambari_agent(self, ambari_server_ip):
        LOG.info(
            '{0}: Installing Ambari Agent ...'.format(self.instance.hostname))

        self._remote.execute_command('yum -y install ambari-agent')

        self._execute_on_vm('yum -y install ambari-agent')
        LOG.debug(
            '{0}: setting master-ip: {1} in ambari-agent.ini'.format(
                self.instance.hostname, ambari_server_ip))
        self._remote.replace_remote_string(
        self._replace_str_in_remote_file(
            '/etc/ambari-agent/conf/ambari-agent.ini', 'localhost',
            ambari_server_ip)

        LOG.info(
            '{0}: Starting Ambari Agent ...'.format(self.instance.hostname))
        self._remote.execute_command('ambari-agent start')
        self._execute_on_vm('ambari-agent start')

    def _configure_ganglia(self, ganglia_server_ip):
        #TODO(John): the set of files to update is now dependent on which
@@ -108,44 +112,43 @@ class HadoopServer:
        # slave config
        #TODO(jspeidel): set MASTER_SLAVE for master where only one node is
        # deployed
        def replace_host_params(config_file):
            self._remote.replace_remote_string(
                config_file,
                'host = %s' % self.instance.hostname,
                'host = %s' % ganglia_server_ip)

        if self._is_ganglia_slave() or self._is_ganglia_master():
            replace_host_params(
                '/etc/ganglia/hdp/HDPSlaves/conf.d/gmond.slave.conf')
            replace_host_params(
                '/etc/ganglia/hdp/HDPJobTracker/conf.d/gmond.slave.conf')
            replace_host_params(
                '/etc/ganglia/hdp/HDPNameNode/conf.d/gmond.slave.conf')
            replace_host_params(
                '/etc/ganglia/hdp/HDPHBaseMaster/conf.d/gmond.slave.conf')
            self._replace_str_in_remote_file(
                '/etc/ganglia/hdp/HDPSlaves/conf.d/gmond.slave.conf',
                'host = {0}'.format(self.instance.hostname),
                'host = {0}'.format(ganglia_server_ip))
            self._replace_str_in_remote_file(
                '/etc/ganglia/hdp/HDPJobTracker/conf.d/gmond.slave.conf',
                'host = {0}'.format(self.instance.hostname),
                'host = {0}'.format(ganglia_server_ip))
            self._replace_str_in_remote_file(
                '/etc/ganglia/hdp/HDPNameNode/conf.d/gmond.slave.conf',
                'host = {0}'.format(self.instance.hostname),
                'host = {0}'.format(ganglia_server_ip))
            self._replace_str_in_remote_file(
                '/etc/ganglia/hdp/HDPHBaseMaster/conf.d/gmond.slave.conf',
                'host = {0}'.format(self.instance.hostname),
                'host = {0}'.format(ganglia_server_ip))

        #master config
        def remove_bind_params(config_file):
            self._remote.replace_remote_string(
                config_file,
                'bind = %s' % self.instance.hostname, '')

        if self._is_ganglia_master():
            remove_bind_params(
                '/etc/ganglia/hdp/HDPSlaves/conf.d/gmond.master.conf')
            remove_bind_params(
                '/etc/ganglia/hdp/HDPJobTracker/conf.d/gmond.master.conf')
            remove_bind_params(
                '/etc/ganglia/hdp/HDPNameNode/conf.d/gmond.master.conf')

            #TODO(jspeidel): appears only to be necessary if hbase is installed
            self._replace_str_in_remote_file(
                '/etc/ganglia/hdp/HDPSlaves/conf.d/gmond.master.conf',
                'bind = {0}'.format(self.instance.hostname), '')
            self._replace_str_in_remote_file(
                '/etc/ganglia/hdp/HDPJobTracker/conf.d/gmond.master.conf',
                'bind = {0}'.format(self.instance.hostname), '')
            self._replace_str_in_remote_file(
                '/etc/ganglia/hdp/HDPNameNode/conf.d/gmond.master.conf',
                'bind = {0}'.format(self.instance.hostname), '')
            #TODO(jspeidel): appears only to be necessary if hbase is installed
            # self._replace_str_in_remote_file(self._ssh,
            #     '/etc/ganglia/hdp/HDPHBaseMaster/conf.d/gmond.master.conf',
            #     'bind = {0}'.format(
            #         self.instance.fqdn), '')

        # gangliaClusters.conf
        self._remote.replace_remote_string(
        self._replace_str_in_remote_file(
            '/usr/libexec/hdp/ganglia/gangliaClusters.conf',
            self.instance.fqdn, ganglia_server_ip)

@@ -153,7 +156,7 @@ class HadoopServer:
        # configs that are used after restart
        # gangliaClusters.conf template
        #TODO(jspeidel): modify file where prop "ganglia_server_host" is set
        self._remote.replace_remote_string(
        self._replace_str_in_remote_file(
            '/var/lib/ambari-agent/puppet/modules/hdp-ganglia/templates'
            '/gangliaClusters.conf.erb',
            '<%=scope.function_hdp_host("ganglia_server_host")%>',
@@ -161,21 +164,29 @@ class HadoopServer:

        # gmondLib.sh This script generates the master and slave configs
        #TODO(jspeidel): combine into one call. Pass map of old/new values
        self._remote.replace_remote_string(
        self._replace_str_in_remote_file(
            '/var/lib/ambari-agent/puppet/modules/hdp-ganglia/files/gmondLib'
            '.sh',
            'bind = ${gmondMasterIP}', '')
        self._remote.replace_remote_string(
        self._replace_str_in_remote_file(
            '/var/lib/ambari-agent/puppet/modules/hdp-ganglia/files/gmondLib'
            '.sh',
            'host = ${gmondMasterIP}', 'host = {0}'.format(ganglia_server_ip))
        self._remote.replace_remote_string(
        self._replace_str_in_remote_file(
            '/usr/libexec/hdp/ganglia/gmondLib.sh',
            'bind = ${gmondMasterIP}', '')
        self._remote.replace_remote_string(
        self._replace_str_in_remote_file(
            '/usr/libexec/hdp/ganglia/gmondLib.sh',
            'host = ${gmondMasterIP}', 'host = {0}'.format(ganglia_server_ip))

    def _replace_str_in_remote_file(self, filename, origStr, newStr):

        remote.replace_remote_string(self._ssh, filename, origStr,
                                     newStr)

    def _log(self, buf):
        LOG.debug(buf)

    def _execute_on_vm_interactive(self, cmd, matcher):
        LOG.debug(
            "{0}: Executing interactive remote command '{1}'".format(
@@ -183,20 +194,29 @@ class HadoopServer:

        buf = ''
        all_output = ''
        with self._remote as r:
            channel = r.ssh_connection().invoke_shell()
            try:
                channel.send(cmd + '\n')
                while not matcher.is_eof(buf):
                    buf += channel.recv(4096)
                    response = matcher.get_response(buf)
                    if response is not None:
                        channel.send(response + '\n')
                        all_output += buf
                        buf = ''
            finally:
                LOG.debug(all_output)
                LOG.debug(buf)
        channel = self._ssh.invoke_shell()
        try:
            channel.send(cmd + '\n')
            while not matcher.is_eof(buf):
                buf += channel.recv(4096)
                response = matcher.get_response(buf)
                if response is not None:
                    channel.send(response + '\n')
                    all_output += buf
                    buf = ''
        finally:
            channel.close()
            self._log(all_output)
            self._log(buf)

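For context, a minimal sketch of the matcher protocol that _execute_on_vm_interactive() relies on: the object only needs is_eof() and get_response(). The prompt strings, answers and end-of-output marker below are placeholders standing in for the real ambari-server setup dialogue.

# Illustrative sketch, not part of this change.
class PromptMatcher(object):
    def __init__(self, responses, eof_marker):
        # responses: mapping of prompt substring -> answer to send
        self.responses = dict(responses)
        self.eof_marker = eof_marker

    def is_eof(self, buf):
        # the shell loop above stops reading once this returns True
        return self.eof_marker in buf

    def get_response(self, buf):
        # answer each known prompt at most once; None means "keep reading"
        for prompt, answer in list(self.responses.items()):
            if prompt in buf:
                del self.responses[prompt]
                return answer
        return None


setup_matcher = PromptMatcher(
    {'Enter choice': '1', 'Proceed with configuring': 'y'},
    eof_marker='Ambari Server setup completed')
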
    def _execute_on_vm(self, cmd):
        LOG.debug("{0}: Executing remote command '{1}'".format(
            self.instance.hostname, cmd))
        LOG.debug(
            'Executing using instance: id = {0}, hostname = {1}'.format(
                self.instance.instance_id,
                self.instance.hostname))
        remote.execute_command(self._ssh, cmd)

    def _is_component_available(self, component):
        return component in self.node_group.components

@@ -14,22 +14,69 @@
# limitations under the License.

import inspect
from savanna import conductor
from savanna import context
import savanna.exceptions as e
from savanna.plugins.general import exceptions as ex
from savanna.plugins.general import utils

conductor = conductor.API


class Validator(object):

    def validate(self, cluster):
        funcs = inspect.getmembers(Validator, predicate=inspect.ismethod)
        for func in funcs:
            if func[0].startswith("check_"):
                getattr(self, func[0])(cluster)

    def _get_named_node_group(self, cluster, ng_name):
        return next((ng for ng in cluster.node_groups
                     if ng.name == ng_name), None)

    def validate_scaling(self, cluster, existing, additional):
        orig_existing_count = {}
        ctx = context.ctx()
        try:
            for ng_id in existing:
                node_group = self._get_by_id(cluster.node_groups, ng_id)
                if node_group:
                    orig_existing_count[ng_id] = node_group.count
                    conductor.node_group_update(ctx, node_group,
                                                {'count':
                                                 int(existing[ng_id])})
                else:
                    raise RuntimeError('Node group not found: {0}'.format(
                        ng_id
                    ))
            for ng_id in additional:
                node_group = self._get_by_id(cluster.node_groups, ng_id)
                if node_group:
                    conductor.node_group_update(ctx, node_group,
                                                {'count':
                                                 int(additional[ng_id])})
                else:
                    raise RuntimeError('Node group not found: {0}'.format(
                        ng_id
                    ))

            self.validate(cluster)

        finally:
            for ng_id in additional:
                for ng_id in additional:
                    node_group = self._get_by_id(cluster.node_groups, ng_id)
                    conductor.node_group_update(ctx, node_group,
                                                {'count': 0})
            for ng_id in orig_existing_count:
                node_group = self._get_by_id(cluster.node_groups, ng_id)
                conductor.node_group_update(ctx, node_group,
                                            {'count':
                                             orig_existing_count[ng_id]})

    def check_for_namenode(self, cluster):
        count = sum([ng.count for ng
                    in utils.get_node_groups(cluster, "NAMENODE")])
                     in utils.get_node_groups(cluster, "NAMENODE")])
        if count != 1:
            raise ex.NotSingleNameNodeException(count)

@@ -56,6 +103,19 @@ class Validator(object):
            if "AMBARI_AGENT" not in ng.node_processes:
                raise AmbariAgentNumberException(ng.name)

    def _get_node_groups(self, node_groups, proc_list=list()):
        proc_list = [proc_list] if type(proc_list) in [str, unicode] \
            else proc_list
        return [ng for ng in node_groups
                if set(proc_list).issubset(ng.node_processes)]

    def _get_by_id(self, lst, id):
        for obj in lst:
            if obj.id == id:
                return obj

        return None


class NoNameNodeException(e.SavannaException):
    def __init__(self):

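For context, a minimal sketch of the apply-validate-rollback pattern that validate_scaling() above implements, shown without the conductor layer. The cluster and node-group objects are assumed to expose node_groups, id and count as in the plugin code.

# Illustrative sketch, not part of this change: apply the proposed counts,
# run the topology checks, and always restore the original counts.
def validate_with_proposed_counts(cluster, proposed_counts, validate):
    original = {}
    try:
        for ng in cluster.node_groups:
            if ng.id in proposed_counts:
                original[ng.id] = ng.count
                ng.count = proposed_counts[ng.id]
        validate(cluster)   # raises on an invalid topology
    finally:
        # roll back so a failed validation leaves the cluster untouched
        for ng in cluster.node_groups:
            if ng.id in original:
                ng.count = original[ng.id]
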
@@ -17,6 +17,8 @@ from savanna.plugins.general import exceptions as ex
from savanna.plugins.hdp import validator as v
import unittest2

import mock


class ValidatorTest(unittest2.TestCase):

@@ -104,6 +106,182 @@ class ValidatorTest(unittest2.TestCase):
        with self.assertRaises(v.AmbariAgentNumberException):
            validator.validate(cluster)

    @mock.patch('savanna.context.ctx')
    @mock.patch('savanna.conductor.api.LocalApi.node_group_update')
    def test_scaling_with_no_jobtracker(self, ng, context):
        ng.side_effect = my_node_group_update
        context.side_effect = my_get_context
        cluster = TestCluster()
        cluster.node_groups.append(TestNodeGroup(["GANGLIA_SERVER",
                                                  "AMBARI_SERVER",
                                                  "AMBARI_AGENT",
                                                  "NAMENODE"], id=1))
        add_node_group = TestNodeGroup(
            ["GANGLIA_MONITOR", "AMBARI_AGENT", "TASKTRACKER"], count=0, id=2)
        cluster.node_groups.append(add_node_group)

        additional = {2: 1}
        existing = {}
        validator = v.Validator()
        with self.assertRaises(ex.TaskTrackersWithoutJobTracker):
            validator.validate_scaling(cluster, existing, additional)

        self.assertEqual(2, len(cluster.node_groups))
        self.assertEqual(0, cluster.node_groups[1].count)

    @mock.patch('savanna.context.ctx')
    @mock.patch('savanna.conductor.api.LocalApi.node_group_update')
    def test_scaling_with_jobtracker(self, ng, context):
        ng.side_effect = my_node_group_update
        context.side_effect = my_get_context
        cluster = TestCluster()
        cluster.node_groups.append(TestNodeGroup(["GANGLIA_SERVER",
                                                  "AMBARI_SERVER",
                                                  "AMBARI_AGENT",
                                                  "NAMENODE",
                                                  "JOBTRACKER"], id=1))
        add_node_group = TestNodeGroup(
            ["GANGLIA_MONITOR", "AMBARI_AGENT", "TASKTRACKER"], count=0, id=2)
        cluster.node_groups.append(add_node_group)
        additional = {2: 1}
        existing = {}
        validator = v.Validator()
        validator.validate_scaling(cluster, existing, additional)
        self.assertEqual(2, len(cluster.node_groups))
        self.assertEqual(0, cluster.node_groups[1].count)

    @mock.patch('savanna.context.ctx')
    @mock.patch('savanna.conductor.api.LocalApi.node_group_update')
    def test_scaling_with_additional_ambari_server(self, ng, context):
        ng.side_effect = my_node_group_update
        context.side_effect = my_get_context
        cluster = TestCluster()
        cluster.node_groups.append(TestNodeGroup(["GANGLIA_SERVER",
                                                  "AMBARI_SERVER",
                                                  "AMBARI_AGENT",
                                                  "NAMENODE",
                                                  "JOBTRACKER"], id=1))
        test_node_group = TestNodeGroup(
            ["GANGLIA_MONITOR", "AMBARI_AGENT", "TASKTRACKER",
             "AMBARI_SERVER"], count=0, id=2)
        cluster.node_groups.append(test_node_group)
        additional = {2: 1}
        existing = {}
        validator = v.Validator()
        with self.assertRaises(v.NotSingleAmbariServerException):
            validator.validate_scaling(cluster, existing, additional)
        self.assertEqual(2, len(cluster.node_groups))
        self.assertEqual(0, cluster.node_groups[1].count)

    @mock.patch('savanna.context.ctx')
    @mock.patch('savanna.conductor.api.LocalApi.node_group_update')
    def test_scaling_an_existing_ambari_server_node_group(self, ng, context):
        ng.side_effect = my_node_group_update
        context.side_effect = my_get_context
        cluster = TestCluster()
        cluster.node_groups.append(TestNodeGroup(["GANGLIA_SERVER",
                                                  "AMBARI_SERVER",
                                                  "AMBARI_AGENT",
                                                  "NAMENODE",
                                                  "JOBTRACKER"], id=1,
                                                 count=1))
        existing = {1: 2}
        additional = {}
        validator = v.Validator()
        with self.assertRaises(v.NotSingleAmbariServerException):
            validator.validate_scaling(cluster, existing, additional)
        self.assertEqual(1, len(cluster.node_groups))
        self.assertEqual(1, cluster.node_groups[0].count)

    @mock.patch('savanna.context.ctx')
    @mock.patch('savanna.conductor.api.LocalApi.node_group_update')
    def test_scaling_existing_node_group(self, ng, context):
        ng.side_effect = my_node_group_update
        context.side_effect = my_get_context
        cluster = TestCluster()
        cluster.node_groups.append(TestNodeGroup(["GANGLIA_MONITOR",
                                                  "AMBARI_AGENT",
                                                  "DATANODE",
                                                  "TASKTRACKER"], id=1))
        cluster.node_groups.append(TestNodeGroup(["GANGLIA_SERVER",
                                                  "AMBARI_SERVER",
                                                  "AMBARI_AGENT",
                                                  "NAMENODE",
                                                  "JOBTRACKER"], id=2))
        additional = {}
        existing = {1: 2}
        validator = v.Validator()
        validator.validate_scaling(cluster, existing, additional)
        self.assertEqual(2, len(cluster.node_groups))
        self.assertEqual(1, cluster.node_groups[0].count)

    @mock.patch('savanna.context.ctx')
    @mock.patch('savanna.conductor.api.LocalApi.node_group_update')
    def test_scaling_existing_mult_node_group(self, ng, context):
        ng.side_effect = my_node_group_update
        context.side_effect = my_get_context
        cluster = TestCluster()
        cluster.node_groups.append(TestNodeGroup(["GANGLIA_MONITOR",
                                                  "AMBARI_AGENT",
                                                  "DATANODE",
                                                  "TASKTRACKER"],
                                                 name="TEST1", id=1))
        cluster.node_groups.append(TestNodeGroup(["GANGLIA_SERVER",
                                                  "AMBARI_SERVER",
                                                  "AMBARI_AGENT",
                                                  "NAMENODE",
                                                  "JOBTRACKER"], count=1,
                                                 name="TEST2", id=2))
        test_node_group = TestNodeGroup(
            ["GANGLIA_MONITOR", "AMBARI_AGENT", "DATANODE", "TASKTRACKER"],
            count=0, name="TEST3", id=3)
        cluster.node_groups.append(test_node_group)
        additional = {3: 1}
        existing = {1: 2}
        validator = v.Validator()
        validator.validate_scaling(cluster, existing, additional)
        self.assertEqual(3, len(cluster.node_groups))
        self.assertEqual(1, cluster.node_groups[0].count)

    @mock.patch('savanna.context.ctx')
    @mock.patch('savanna.conductor.api.LocalApi.node_group_update')
    def test_scaling_down_existing_mult_node_group(self, ng, context):
        ng.side_effect = my_node_group_update
        context.side_effect = my_get_context
        cluster = TestCluster()
        cluster.node_groups.append(TestNodeGroup(["GANGLIA_MONITOR",
                                                  "AMBARI_AGENT",
                                                  "DATANODE",
                                                  "TASKTRACKER"],
                                                 name="TEST1", id=1))
        cluster.node_groups.append(TestNodeGroup(["GANGLIA_SERVER",
                                                  "AMBARI_SERVER",
                                                  "AMBARI_AGENT",
                                                  "NAMENODE",
                                                  "JOBTRACKER"], count=1,
                                                 name="TEST2", id=2))
        test_node_group = TestNodeGroup(
            ["GANGLIA_MONITOR", "AMBARI_AGENT", "DATANODE", "TASKTRACKER"],
            count=0, name="TEST3", id=3)
        cluster.node_groups.append(test_node_group)
        additional = {3: 1}
        existing = {2: 0}
        validator = v.Validator()
        with self.assertRaises(v.NotSingleAmbariServerException):
            validator.validate_scaling(cluster, existing, additional)
        self.assertEqual(3, len(cluster.node_groups))
        self.assertEqual(1, cluster.node_groups[0].count)
        self.assertEqual(1, cluster.node_groups[1].count)


def my_node_group_update(*args, **kwargs):
    node_group = args[1]
    node_group.count = args[2]['count']


def my_get_context(*args, **kwargs):
    return None


class TestCluster(object):

@@ -113,7 +291,8 @@ class TestCluster(object):

class TestNodeGroup:

    def __init__(self, processes, count=1):
    def __init__(self, processes, name=None, count=1, id=0):
        self.node_processes = processes
        self.count = count or 1
        self.name = 'TEST'
        self.id = id