Add support for multiple HDP versions

This is the first commit in a series of commits to support multiple
HDP versions.  There will more than likely be subsequent commits
to account for discoveries during the actual inclusion of new
versions or if the level of abstraction is deemed inappropriate etc.

implements blueprint hdp-version-support

Change-Id: I7a94753b18920a9a1052123da9528c47ff47db7a
This commit is contained in:
Jon Maron 2013-09-26 16:50:22 -04:00
parent 48f4cf66fd
commit 31987454a9
17 changed files with 836 additions and 887 deletions

View File

@ -10,8 +10,10 @@ include savanna/db/migration/alembic_migrations/versions/README
recursive-include savanna/locale *
include savanna/plugins/vanilla/resources/*.xml
include savanna/plugins/hdp/resources/*.json
include savanna/plugins/hdp/resources/*.template
include savanna/plugins/hdp/versions/1_3_2/resources/*.template
include savanna/plugins/hdp/versions/1_3_2/resources/*.json
include savanna/plugins/hdp/versions/2_0/resources/*.template
include savanna/plugins/hdp/versions/2_0/resources/*.json
include savanna/service/edp/resources/*.xml
include savanna/swift/resources/*.xml
include savanna/tests/unit/resources/*.xml

View File

@ -13,21 +13,16 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import requests
from savanna import conductor
from savanna import context
from savanna import exceptions as exc
from savanna.openstack.common import jsonutils as json
from savanna.openstack.common import log as logging
from savanna.plugins.hdp import blueprintprocessor as bp
from savanna.plugins.hdp import clusterspec
from savanna.plugins.hdp import configprovider as cfg
from savanna.plugins.hdp import exceptions as ex
from savanna.plugins.hdp import hadoopserver as h
from savanna.plugins.hdp import savannautils as utils
from savanna.plugins.hdp import validator as v
from savanna.plugins.hdp.versions import versionhandlerfactory as vhf
from savanna.plugins import provisioning as p
conductor = conductor.API
@ -37,15 +32,17 @@ LOG = logging.getLogger(__name__)
class AmbariPlugin(p.ProvisioningPluginBase):
def __init__(self):
self.cluster_ambari_mapping = {}
self.default_config = self._get_default_cluster_configuration()
self.version_factory = vhf.VersionHandlerFactory.get_instance()
def create_cluster(self, cluster, cluster_template):
if cluster_template is None:
raise ValueError('must supply cluster template')
cluster_spec = clusterspec.ClusterSpec(
cluster_template, cluster=cluster)
version = cluster.hadoop_version
handler = self.version_factory.get_version_handler(version)
cluster_spec = handler.get_cluster_spec(cluster_template, cluster)
hosts = self._get_servers(cluster)
ambari_info = self.get_ambari_info(cluster_spec)
@ -60,23 +57,16 @@ class AmbariPlugin(p.ProvisioningPluginBase):
ambari_rpm=rpm))
provisioned = self._provision_cluster(
cluster.name, cluster_spec, ambari_info, servers)
cluster.name, cluster_spec, ambari_info, servers,
cluster.hadoop_version)
if provisioned:
installed = self._install_services(cluster.name, ambari_info)
if installed:
# install the swift integration on the servers
self._install_swift_integration(servers)
LOG.info("Install of Hadoop stack successful.")
# add service urls
self._set_cluster_info(cluster, cluster_spec, ambari_info)
else:
raise ex.HadoopProvisionError(
'Installation of Hadoop stack failed.')
LOG.info("Install of Hadoop stack successful.")
# add service urls
self._set_cluster_info(cluster, cluster_spec, ambari_info)
else:
raise ex.HadoopProvisionError(
'Provisioning of Hadoop cluster failed.')
'Installation of Hadoop stack failed.')
def _get_servers(self, cluster):
servers = []
@ -91,9 +81,11 @@ class AmbariPlugin(p.ProvisioningPluginBase):
return servers
def get_node_processes(self, hadoop_version):
#TODO(jmaron): use version information
node_processes = {}
for service in self.default_config.services:
version_handler = \
self.version_factory.get_version_handler(hadoop_version)
default_config = version_handler.get_default_cluster_configuration()
for service in default_config.services:
components = []
for component in service.components:
components.append(component.name)
@ -101,13 +93,10 @@ class AmbariPlugin(p.ProvisioningPluginBase):
return node_processes
def _install_swift_integration(self, servers):
for server in servers:
server.install_swift_integration()
def convert(self, config, plugin_name, version, template_name,
cluster_template_create):
normalized_config = clusterspec.ClusterSpec(config).normalize()
handler = self.version_factory.get_version_handler(version)
normalized_config = handler.get_cluster_spec(config, None).normalize()
#TODO(jspeidel): can we get the name (first arg) from somewhere?
@ -122,7 +111,7 @@ class AmbariPlugin(p.ProvisioningPluginBase):
node_groups.append(node_group)
cluster_configs = dict()
config_resource = self.get_configs(version)
config_resource = handler.get_config_items()
for entry in normalized_config.cluster_configs:
user_input = next((ui for ui in config_resource
if entry.config.name == ui.name), None)
@ -155,119 +144,8 @@ class AmbariPlugin(p.ProvisioningPluginBase):
def _spawn(self, description, func, *args, **kwargs):
context.spawn(description, func, *args, **kwargs)
def _add_cluster(self, ambari_info, name):
add_cluster_url = 'http://{0}/api/v1/clusters/{1}'.format(
ambari_info.get_address(), name)
#TODO(jspeidel): get stack info from config spec
result = requests.post(add_cluster_url,
data='{"Clusters": {"version" : "HDP-1.3.0"}}',
auth=(ambari_info.user, ambari_info.password))
if result.status_code != 201:
LOG.warning(
'Create cluster command failed. {0}'.format(result.text))
return False
return True
def _add_configurations_to_cluster(
self, cluster_spec, ambari_info, name):
configs = cluster_spec.configurations
config_url = 'http://{0}/api/v1/clusters/{1}'.format(
ambari_info.get_address(), name)
body = {}
clusters = {}
body['Clusters'] = clusters
for config_name in configs:
if config_name == 'ambari':
continue
config_body = {}
clusters['desired_config'] = config_body
config_body['type'] = config_name
#TODO(jspeidel): hard coding for now
config_body['tag'] = 'v1'
config_body['properties'] = configs[config_name]
result = requests.put(config_url, data=json.dumps(body), auth=(
ambari_info.user, ambari_info.password))
if result.status_code != 200:
LOG.warning(
'Set configuration command failed. {0}'.format(
result.text))
return False
return True
def _add_services_to_cluster(self, cluster_spec, ambari_info, name):
services = cluster_spec.services
add_service_url = 'http://{0}/api/v1/clusters/{1}/services/{2}'
for service in services:
if service.name != 'AMBARI':
result = requests.post(add_service_url.format(
ambari_info.get_address(), name, service.name),
auth=(ambari_info.user, ambari_info.password))
if result.status_code != 201:
LOG.warning(
'Create service command failed. {0}'.format(
result.text))
return False
return True
def _add_components_to_services(self, cluster_spec, ambari_info, name):
add_component_url = 'http://{0}/api/v1/clusters/{1}/services/{' \
'2}/components/{3}'
for service in cluster_spec.services:
if service.name != 'AMBARI':
for component in service.components:
result = requests.post(add_component_url.format(
ambari_info.get_address(), name, service.name,
component.name),
auth=(ambari_info.user, ambari_info.password))
if result.status_code != 201:
LOG.warning(
'Create component command failed. {0}'.format(
result.text))
return False
return True
def _add_hosts_and_components(
self, cluster_spec, servers, ambari_info, name):
add_host_url = 'http://{0}/api/v1/clusters/{1}/hosts/{2}'
add_host_component_url = 'http://{0}/api/v1/clusters/{1}' \
'/hosts/{2}/host_components/{3}'
for host in servers:
hostname = host.instance.fqdn.lower()
result = requests.post(
add_host_url.format(ambari_info.get_address(), name, hostname),
auth=(ambari_info.user, ambari_info.password))
if result.status_code != 201:
LOG.warning(
'Create host command failed. {0}'.format(result.text))
return False
node_group_name = host.node_group.name
#TODO(jspeidel): ensure that node group exists
node_group = cluster_spec.node_groups[node_group_name]
for component in node_group.components:
# don't add any AMBARI components
if component.find('AMBARI') != 0:
result = requests.post(add_host_component_url.format(
ambari_info.get_address(), name, hostname, component),
auth=(ambari_info.user, ambari_info.password))
if result.status_code != 201:
LOG.warning(
'Create host_component command failed. {0}'.format(
result.text))
return False
return True
def _provision_cluster(self, name, cluster_spec, ambari_info, servers):
def _provision_cluster(self, name, cluster_spec, ambari_info, servers,
version):
#TODO(jspeidel): encapsulate in another class
LOG.info('Provisioning Cluster via Ambari Server: {0} ...'.format(
@ -278,239 +156,19 @@ class AmbariPlugin(p.ProvisioningPluginBase):
"hdp-provision-instance-%s" % server.instance.hostname,
server.provision_ambari, ambari_info)
self._wait_for_host_registrations(len(servers), ambari_info)
handler = self.version_factory.get_version_handler(version)
ambari_client = handler.get_ambari_client()
self._set_ambari_credentials(cluster_spec, ambari_info)
ambari_client.wait_for_host_registrations(len(servers), ambari_info)
# add cluster
if not self._add_cluster(ambari_info, name):
return False
self._set_ambari_credentials(cluster_spec, ambari_info, version)
# add configurations to cluster
if not self._add_configurations_to_cluster(cluster_spec,
ambari_info, name):
return False
# add services
if not self._add_services_to_cluster(cluster_spec,
ambari_info, name):
return False
# add components to services
if not self._add_components_to_services(cluster_spec,
ambari_info, name):
return False
# add hosts and host_components
if not self._add_hosts_and_components(cluster_spec, servers,
ambari_info, name):
if not ambari_client.provision_cluster(cluster_spec, servers,
ambari_info, name):
return False
return True
def _wait_for_host_registrations(self, num_hosts, ambari_info):
LOG.info(
'Waiting for all Ambari agents to register with server ...')
url = 'http://{0}/api/v1/hosts'.format(ambari_info.get_address())
result = None
json_result = None
#TODO(jspeidel): timeout
while result is None or len(json_result['items']) < num_hosts:
context.sleep(5)
try:
result = requests.get(url, auth=(ambari_info.user,
ambari_info.password))
json_result = json.loads(result.text)
# TODO(jspeidel): just for debug
LOG.info('Registered Hosts: {0} of {1}'.format(
len(json_result['items']), num_hosts))
for hosts in json_result['items']:
LOG.debug('Registered Host: {0}'.format(
hosts['Hosts']['host_name']))
except requests.ConnectionError:
#TODO(jspeidel): max wait time
LOG.info('Waiting to connect to ambari server ...')
def _install_services(self, cluster_name, ambari_info):
LOG.info('Installing required Hadoop services ...')
ambari_address = ambari_info.get_address()
install_url = 'http://{0}/api/v1/clusters/{' \
'1}/services?ServiceInfo/state=INIT'.format(
ambari_address, cluster_name)
body = '{"ServiceInfo": {"state" : "INSTALLED"}}'
result = requests.put(install_url, data=body, auth=(
ambari_info.user, ambari_info.password))
if result.status_code == 202:
#TODO(jspeidel) don't hard code request id
success = self._wait_for_async_request(
self._get_async_request_uri(ambari_info, cluster_name, 1),
auth=(ambari_info.user, ambari_info.password))
if success:
LOG.info("Install of Hadoop stack successful.")
self._finalize_ambari_state(ambari_info)
else:
LOG.critical('Install command failed.')
raise RuntimeError('Hadoop service install failed')
else:
LOG.error(
'Install command failed. {0}'.format(result.text))
raise RuntimeError('Hadoop service install failed')
return success
def _get_async_request_uri(self, ambari_info, cluster_name, request_id):
return 'http://{0}/api/v1/clusters/{1}/requests/{' \
'2}/tasks?fields=Tasks/status'.format(
ambari_info.get_address(), cluster_name,
request_id)
def _wait_for_async_request(self, request_url, auth):
started = False
while not started:
result = requests.get(request_url, auth=auth)
LOG.debug(
'async request ' + request_url + ' response:\n' + result.text)
json_result = json.loads(result.text)
started = True
for items in json_result['items']:
status = items['Tasks']['status']
if status == 'FAILED' or status == 'ABORTED':
return False
else:
if status != 'COMPLETED':
started = False
context.sleep(5)
return started
def _start_services(self, cluster_name, ambari_info):
LOG.info('Starting Hadoop services ...')
LOG.info('Cluster name: {0}, Ambari server address: {1}'
.format(cluster_name, ambari_info.get_address()))
start_url = 'http://{0}/api/v1/clusters/{1}/services?ServiceInfo/' \
'state=INSTALLED'.format(
ambari_info.get_address(), cluster_name)
body = '{"ServiceInfo": {"state" : "STARTED"}}'
auth = (ambari_info.user, ambari_info.password)
result = requests.put(start_url, data=body, auth=auth)
if result.status_code == 202:
# don't hard code request id
success = self._wait_for_async_request(
self._get_async_request_uri(ambari_info, cluster_name, 2),
auth=auth)
if success:
LOG.info(
"Successfully started Hadoop cluster '{0}'.".format(
cluster_name))
else:
LOG.critical('Failed to start Hadoop cluster.')
raise RuntimeError('Failed to start Hadoop cluster.')
else:
LOG.critical(
'Start command failed. Status: {0}, response: {1}'.
format(result.status_code, result.text))
raise RuntimeError('Hadoop cluster start failed.')
def _install_components(self, ambari_info, auth, cluster_name, servers):
LOG.info('Starting Hadoop components while scaling up')
LOG.info('Cluster name {0}, Ambari server ip {1}'
.format(cluster_name, ambari_info.get_address()))
# query for the host components on the given hosts that are in the
# INIT state
body = '{"HostRoles": {"state" : "INSTALLED"}}'
install_uri = 'http://{0}/api/v1/clusters/{' \
'1}/host_components?HostRoles/state=INIT&' \
'HostRoles/host_name.in({2})'.format(
ambari_info.get_address(),
cluster_name,
self._get_host_list(servers))
self._exec_ambari_command(auth, body, install_uri)
def _start_components(self, ambari_info, auth, cluster_name, servers):
# query for all the host components on one of the hosts in the
# INSTALLED state, then get a list of the client services in the list
installed_uri = 'http://{0}/api/v1/clusters/{' \
'1}/host_components?HostRoles/state=INSTALLED&' \
'HostRoles/host_name.in({2})' \
.format(ambari_info.get_address(), cluster_name,
self._get_host_list(servers))
result = requests.get(installed_uri, auth=auth)
if result.status_code == 200:
LOG.debug(
'GET response: {0}'.format(result.text))
json_result = json.loads(result.text)
items = json_result['items']
# select non-CLIENT items
inclusion_list = list(set([x['HostRoles']['component_name']
for x in items if "CLIENT" not in
x['HostRoles']['component_name']]))
# query and start all non-client components on the given set of
# hosts
body = '{"HostRoles": {"state" : "STARTED"}}'
start_uri = 'http://{0}/api/v1/clusters/{' \
'1}/host_components?HostRoles/state=INSTALLED&' \
'HostRoles/host_name.in({2})' \
'&HostRoles/component_name.in({3})'.format(
ambari_info.get_address(), cluster_name,
self._get_host_list(servers),
",".join(inclusion_list))
self._exec_ambari_command(auth, body, start_uri)
else:
raise RuntimeError('Unable to determine installed service '
'components in scaled instances. status'
' code returned = {0}'.format(result.status))
def _install_and_start_components(self, cluster_name, servers,
ambari_info):
auth = (ambari_info.user, ambari_info.password)
self._install_components(ambari_info, auth, cluster_name, servers)
self._install_swift_integration(servers)
self._start_components(ambari_info, auth, cluster_name, servers)
def _exec_ambari_command(self, auth, body, cmd_uri):
LOG.debug('PUT URI: {0}'.format(cmd_uri))
result = requests.put(cmd_uri, data=body,
auth=auth)
if result.status_code == 202:
# don't hard code request id
LOG.debug(
'PUT response: {0}'.format(result.text))
json_result = json.loads(result.text)
href = json_result['href'] + '/tasks?fields=Tasks/status'
success = self._wait_for_async_request(href, auth)
if success:
LOG.info(
"Successfully changed state of Hadoop components ")
else:
LOG.critical('Failed to change state of Hadoop '
'components')
raise RuntimeError('Failed to change state of Hadoop '
'components')
else:
LOG.error(
'Command failed. Status: {0}, response: {1}'.
format(result.status_code, result.text))
raise RuntimeError('Hadoop/Ambari command failed.')
def _get_default_cluster_configuration(self):
with open(os.path.join(os.path.dirname(__file__), 'resources',
'default-cluster.template'), 'r') as f:
return clusterspec.ClusterSpec(f.read())
def _set_cluster_info(self, cluster, cluster_spec, ambari_info):
info = {}
@ -541,25 +199,10 @@ class AmbariPlugin(p.ProvisioningPluginBase):
ctx = context.ctx()
conductor.cluster_update(ctx, cluster, {'info': info})
def _finalize_ambari_state(self, ambari_info):
LOG.info('Finalizing Ambari cluster state.')
persist_state_uri = 'http://{0}/api/v1/persist'.format(
ambari_info.get_address())
# this post data has non-standard format because persist
# resource doesn't comply with Ambari API standards
persist_data = '{ "CLUSTER_CURRENT_STATUS":' \
'"{\\"clusterState\\":\\"CLUSTER_STARTED_5\\"}" }'
result = requests.post(persist_state_uri, data=persist_data,
auth=(ambari_info.user, ambari_info.password))
if result.status_code != 201 and result.status_code != 202:
LOG.warning('Finalizing of Ambari cluster state failed. {0}'.
format(result.text))
raise ex.HadoopProvisionError('Unable to finalize Ambari state.')
def _set_ambari_credentials(self, cluster_spec, ambari_info):
def _set_ambari_credentials(self, cluster_spec, ambari_info, version):
services = cluster_spec.services
ambari_client = self.version_factory.get_version_handler(version).\
get_ambari_client()
for service in services:
if service.name == 'AMBARI':
is_admin_provided = False
@ -567,13 +210,13 @@ class AmbariPlugin(p.ProvisioningPluginBase):
admin_password = ambari_info.password
for u in service.users:
if u.name == 'admin':
self._update_ambari_admin_user(
ambari_client.update_ambari_admin_user(
u.password, ambari_info)
is_admin_provided = True
ambari_info.user = 'admin'
ambari_info.password = u.password
else:
self._add_ambari_user(u, ambari_info)
ambari_client.add_ambari_user(u, ambari_info)
if 'admin' in u.groups:
admin_user = u.name
admin_password = u.password
@ -585,7 +228,7 @@ class AmbariPlugin(p.ProvisioningPluginBase):
"configured.")
ambari_info.user = admin_user
ambari_info.password = admin_password
self._delete_ambari_user('admin', ambari_info)
ambari_client.delete_ambari_user('admin', ambari_info)
break
def _update_ambari_info_credentials(self, cluster_spec, ambari_info):
@ -602,61 +245,13 @@ class AmbariPlugin(p.ProvisioningPluginBase):
LOG.info('Using "{0}" as admin user for scaling of cluster'
.format(ambari_info.user))
def _update_ambari_admin_user(self, password, ambari_info):
old_pwd = ambari_info.password
user_url = 'http://{0}/api/v1/users/admin'.format(
ambari_info.get_address())
update_body = '{{"Users":{{"roles":"admin,user","password":"{0}",'\
'"old_password":"{1}"}} }}'.format(password, old_pwd)
request = self._get_rest_request()
result = request.put(user_url, data=update_body, auth=(
ambari_info.user, ambari_info.password))
if result.status_code != 200:
raise ex.HadoopProvisionError('Unable to update Ambari admin user'
' credentials: {0}'.
format(result.text))
def _add_ambari_user(self, user, ambari_info):
user_url = 'http://{0}/api/v1/users/{1}'.format(
ambari_info.get_address(), user.name)
create_body = '{{"Users":{{"password":"{0}","roles":"{1}"}} }}'.\
format(user.password, '%s' % ','.join(map(str, user.groups)))
request = self._get_rest_request()
result = request.post(user_url, data=create_body, auth=(
ambari_info.user, ambari_info.password))
if result.status_code != 201:
raise ex.HadoopProvisionError('Unable to create Ambari user: {0}'.
format(result.text))
def _delete_ambari_user(self, user_name, ambari_info):
user_url = 'http://{0}/api/v1/users/{1}'.format(
ambari_info.get_address(), user_name)
request = self._get_rest_request()
result = request.delete(user_url, auth=(
ambari_info.user, ambari_info.password))
if result.status_code != 200:
raise ex.HadoopProvisionError('Unable to delete Ambari user: {0}'
' : {1}'.format(user_name,
result.text))
def _get_rest_request(self):
return requests
# SAVANNA PLUGIN SPI METHODS:
def _get_blueprint_processor(self, cluster):
processor = bp.BlueprintProcessor(json.load(
open(os.path.join(os.path.dirname(__file__), 'resources',
'default-cluster.template'), "r")))
processor.process_user_inputs(self._map_to_user_inputs(
'1.3.0', cluster.cluster_configs))
processor.process_node_groups(cluster.node_groups)
version = cluster.hadoop_version
handler = self.version_factory.get_version_handler(version)
user_inputs = self._map_to_user_inputs(version,
cluster.cluster_configs)
processor = handler.process_cluster(user_inputs, cluster.node_groups)
return processor
def configure_cluster(self, cluster):
@ -668,29 +263,24 @@ class AmbariPlugin(p.ProvisioningPluginBase):
# defining node level configuration items (i.e. scope='cluster' in
# all cases for returned configs)
#create a cloud context
#TODO(jmaron): is base host name really necessary any longer?
#cloud_ctx = ClusterContext(None, LOG)
#self._add_instances_to_cluster_context (cloud_ctx, cluster)
self.create_cluster(cluster, json.dumps(processor.blueprint))
def get_versions(self):
return ['1.3.0']
return self.version_factory.get_versions()
def get_configs(self, hadoop_version):
config_resource = cfg.ConfigurationProvider(
json.load(open(os.path.join(os.path.dirname(__file__), 'resources',
'ambari-config-resource.json'), "r")))
return config_resource.get_config_items()
handler = self.version_factory.get_version_handler(hadoop_version)
return handler.get_config_items()
# cluster name argument supports the non-savanna cluster creation mode
def start_cluster(self, cluster, cluster_name=None):
if cluster_name is None:
cluster_name = cluster.name
self._start_services(
client = self.version_factory.get_version_handler(
cluster.hadoop_version).get_ambari_client()
client.start_services(
cluster_name, self.cluster_ambari_mapping[cluster_name])
def get_title(self):
@ -709,9 +299,12 @@ class AmbariPlugin(p.ProvisioningPluginBase):
validator.validate(cluster)
def scale_cluster(self, cluster, instances):
handler = self.version_factory.get_version_handler(
cluster.hadoop_version)
ambari_client = handler.get_ambari_client()
processor = self._get_blueprint_processor(cluster)
cluster_spec = clusterspec.ClusterSpec(
json.dumps(processor.blueprint), cluster=cluster)
cluster_spec = handler.get_cluster_spec(
json.dumps(processor.blueprint), cluster)
rpm = self._get_rpm_uri(cluster_spec)
servers = []
@ -729,14 +322,8 @@ class AmbariPlugin(p.ProvisioningPluginBase):
self._spawn('Ambari provisioning thread',
server.provision_ambari, ambari_info)
self._wait_for_host_registrations(self._get_num_hosts(cluster),
ambari_info)
# now add the hosts and the component
self._add_hosts_and_components(cluster_spec, servers,
ambari_info, cluster.name)
self._install_and_start_components(cluster.name, servers, ambari_info)
ambari_client.scale_cluster(cluster.name, cluster_spec, servers,
self._get_num_hosts(cluster), ambari_info)
def decommission_nodes(self, cluster, instances):
raise exc.InvalidException('The HDP plugin does not yet support the '

View File

@ -13,23 +13,23 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from savanna.openstack.common import jsonutils as json
from savanna.openstack.common import log as logging
from savanna.plugins.hdp import configprovider as cfg
from savanna.plugins.hdp import savannautils as utils
from savanna.plugins.hdp.versions import versionhandlerfactory as vhf
import savanna.utils.openstack.nova as n_helper
LOG = logging.getLogger(__name__)
class ClusterSpec():
def __init__(self, cluster_template, cluster=None):
def __init__(self, cluster_template, cluster=None, version='1.3.2'):
self.services = []
self.configurations = {}
self.node_groups = {}
self.servers = None
self.str = cluster_template
self.version = version
if cluster:
self.servers = self._get_servers_from_savanna_cluster(cluster)
@ -215,12 +215,11 @@ class User():
class NormalizedClusterConfig():
def __init__(self, cluster_spec):
#TODO(jspeidel): get from stack config
self.hadoop_version = '1.3.0'
self.hadoop_version = cluster_spec.version
self.cluster_configs = []
self.node_groups = []
self.config = cfg.ConfigurationProvider(
json.load(open(os.path.join(os.path.dirname(__file__), 'resources',
'ambari-config-resource.json'), "r")))
self.handler = vhf.VersionHandlerFactory.get_instance().\
get_version_handler(self.hadoop_version)
self._parse_configurations(cluster_spec.configurations)
self._parse_node_groups(cluster_spec.node_groups)
@ -242,7 +241,7 @@ class NormalizedClusterConfig():
def _get_property_target(self, config, prop):
# Once config resource is complete we won't need to fall through
# based on config type
target = self.config.get_applicable_target(prop)
target = self.handler.get_applicable_target(prop)
if not target:
if config == 'hdfs-site':
target = 'HDFS'

View File

@ -1,362 +0,0 @@
{
"services" : [
{
"name" : "HDFS",
"components" : [
{
"name" : "NAMENODE",
"type" : "MASTER",
"cardinality" : "1"
},
{
"name" : "DATANODE",
"type" : "SLAVE",
"cardinality" : "1+"
},
{
"name" : "SECONDARY_NAMENODE",
"type" : "MASTER",
"cardinality" : "1"
},
{
"name" : "HDFS_CLIENT",
"type" : "CLIENT",
"cardinality" : "1+"
}
] ,
"configurations" : [
]
},
{
"name" : "MAPREDUCE",
"components" : [
{
"name" : "JOBTRACKER",
"type" : "MASTER",
"cardinality" : "1"
},
{
"name" : "TASKTRACKER",
"type" : "SLAVE",
"cardinality" : "1+"
},
{
"name" : "MAPREDUCE_CLIENT",
"type" : "CLIENT",
"cardinality" : "1+"
}
],
"configurations" : [
]
},
{
"name" : "GANGLIA",
"components" : [
{
"name" : "GANGLIA_SERVER",
"type" : "MASTER",
"cardinality" : "1"
},
{
"name" : "GANGLIA_MONITOR",
"type" : "SLAVE",
"cardinality" : "1+"
}
],
"configurations" : [
]
},
{
"name" : "NAGIOS",
"components" : [
{
"name" : "NAGIOS_SERVER",
"type" : "MASTER",
"cardinality" : "1"
}
],
"configurations" : [
]
},
{
"name" : "AMBARI",
"components" : [
{
"name" : "AMBARI_SERVER",
"type" : "MASTER",
"cardinality" : "1"
},
{
"name" : "AMBARI_AGENT",
"type" : "SLAVE",
"cardinality" : "1+"
}
],
"configurations" : [
]
}
],
"host_role_mappings" : [
{
"name" : "MASTER",
"components" : [
{ "name" : "NAMENODE" },
{ "name" : "JOBTRACKER" },
{ "name" : "SECONDARY_NAMENODE" },
{ "name" : "GANGLIA_SERVER" },
{ "name" : "GANGLIA_MONITOR" },
{ "name" : "NAGIOS_SERVER" },
{ "name" : "AMBARI_SERVER" },
{ "name" : "AMBARI_AGENT" }
],
"hosts" : [
{
"cardinality" : "1"
}
]
},
{
"name" : "SLAVE",
"components" : [
{ "name" : "DATANODE" },
{ "name" : "TASKTRACKER" },
{ "name" : "GANGLIA_MONITOR" },
{ "name" : "HDFS_CLIENT" },
{ "name" : "MAPREDUCE_CLIENT" },
{ "name" : "AMBARI_AGENT" }
],
"hosts" : [
{
"cardinality" : "1+"
}
]
}
],
"configurations" : [
{
"name" : "global",
"properties" : [
{ "name" : "dfs_name_dir", "value" : "/hadoop/hdfs/namenode" },
{ "name" : "fs_checkpoint_dir", "value" : "/hadoop/hdfs/namesecondary" },
{ "name" : "dfs_data_dir", "value" : "/hadoop/hdfs/data" },
{ "name" : "hdfs_log_dir_prefix", "value" : "/var/log/hadoop" },
{ "name" : "hadoop_pid_dir_prefix", "value" : "/var/run/hadoop" },
{ "name" : "dfs_webhdfs_enabled", "value" : false },
{ "name" : "hadoop_heapsize", "value" : "1024" },
{ "name" : "namenode_heapsize", "value" : "1024m" },
{ "name" : "namenode_opt_newsize", "value" : "200m" },
{ "name" : "namenode_opt_maxnewsize", "value" : "640m" },
{ "name" : "datanode_du_reserved", "value" : "1" },
{ "name" : "dtnode_heapsize", "value" : "1024m" },
{ "name" : "dfs_datanode_failed_volume_tolerated", "value" : "0" },
{ "name" : "fs_checkpoint_period", "value" : "21600" },
{ "name" : "fs_checkpoint_size", "value" : "0.5" },
{ "name" : "dfs_exclude", "value" : "dfs.exclude" },
{ "name" : "dfs_include", "value" : "dfs.include" },
{ "name" : "dfs_replication", "value" : "3" },
{ "name" : "dfs_block_local_path_access_user", "value" : "hbase" },
{ "name" : "dfs_datanode_data_dir_perm", "value" : "750" },
{ "name" : "security_enabled", "value" : false },
{ "name" : "kerberos_domain", "value" : "EXAMPLE.COM" },
{ "name" : "kadmin_pw", "value" : "" },
{ "name" : "keytab_path", "value" : "/etc/security/keytabs" },
{ "name" : "namenode_formatted_mark_dir", "value" : "/var/run/hadoop/hdfs/namenode/formatted/" },
{ "name" : "hcat_conf_dir", "value" : "" },
{ "name" : "mapred_local_dir", "value" : "/hadoop/mapred" },
{ "name" : "mapred_system_dir", "value" : "/mapred/system" },
{ "name" : "scheduler_name", "value" : "org.apache.hadoop.mapred.CapacityTaskScheduler" },
{ "name" : "jtnode_opt_newsize", "value" : "200m" },
{ "name" : "jtnode_opt_maxnewsize", "value" : "200m" },
{ "name" : "jtnode_heapsize", "value" : "1024m" },
{ "name" : "mapred_map_tasks_max", "value" : "4" },
{ "name" : "mapred_red_tasks_max", "value" : "2" },
{ "name" : "mapred_cluster_map_mem_mb", "value" : "-1" },
{ "name" : "mapred_cluster_red_mem_mb", "value" : "-1" },
{ "name" : "mapred_cluster_max_map_mem_mb", "value" : "-1" },
{ "name" : "mapred_cluster_max_red_mem_mb", "value" : "-1" },
{ "name" : "mapred_job_map_mem_mb", "value" : "-1" },
{ "name" : "mapred_job_red_mem_mb", "value" : "-1" },
{ "name" : "mapred_child_java_opts_sz", "value" : "768" },
{ "name" : "io_sort_mb", "value" : "200" },
{ "name" : "io_sort_spill_percent", "value" : "0.9" },
{ "name" : "mapreduce_userlog_retainhours", "value" : "24" },
{ "name" : "maxtasks_per_job", "value" : "-1" },
{ "name" : "lzo_enabled", "value" : false },
{ "name" : "snappy_enabled", "value" : true },
{ "name" : "rca_enabled", "value" : true },
{ "name" : "mapred_hosts_exclude", "value" : "mapred.exclude" },
{ "name" : "mapred_hosts_include", "value" : "mapred.include" },
{ "name" : "mapred_jobstatus_dir", "value" : "file:////mapred/jobstatus" },
{ "name" : "task_controller", "value" : "org.apache.hadoop.mapred.DefaultTaskController" },
{ "name" : "nagios_user", "value" : "nagios" },
{ "name" : "nagios_group", "value" : "nagios" },
{ "name" : "nagios_web_login", "value" : "nagiosadmin" },
{ "name" : "nagios_web_password", "value" : "admin" },
{ "name" : "nagios_contact", "value" : "default@REPLACEME.com" },
{ "name" : "hbase_conf_dir", "value" : "/etc/hbase" },
{ "name" : "proxyuser_group", "value" : "users" },
{ "name" : "dfs_datanode_address", "value" : "50010" },
{ "name" : "dfs_datanode_http_address", "value" : "50075" },
{ "name" : "gpl_artifacts_download_url", "value" : "" },
{ "name" : "apache_artifacts_download_url", "value" : "" },
{ "name" : "ganglia_runtime_dir", "value" : "/var/run/ganglia/hdp" },
{ "name" : "gmetad_user", "value" : "nobody" },
{ "name" : "gmond_user", "value" : "nobody" },
{ "name" : "run_dir", "value" : "/var/run/hadoop" },
{ "name" : "hadoop_conf_dir", "value" : "/etc/hadoop" },
{ "name" : "hdfs_user", "value" : "hdfs" },
{ "name" : "mapred_user", "value" : "mapred" },
{ "name" : "hbase_user", "value" : "hbase" },
{ "name" : "hive_user", "value" : "hive" },
{ "name" : "hcat_user", "value" : "hcat" },
{ "name" : "webhcat_user", "value" : "hcat" },
{ "name" : "oozie_user", "value" : "oozie" },
{ "name" : "zk_user", "value" : "zookeeper" },
{ "name" : "user_group", "value" : "hadoop" }
]
},
{
"name" : "core-site",
"properties" : [
{ "name" : "io.file.buffer.size", "value" : "131072" },
{ "name" : "io.serializations", "value" : "org.apache.hadoop.io.serializer.WritableSerialization" },
{ "name" : "io.compression.codec.lzo.class", "value" : "com.hadoop.compression.lzo.LzoCodec" },
{ "name" : "fs.trash.interval", "value" : "360" },
{ "name" : "ipc.client.idlethreshold", "value" : "8000" },
{ "name" : "ipc.client.connection.maxidletime", "value" : "30000" },
{ "name" : "ipc.client.connect.max.retries", "value" : "50" },
{ "name" : "webinterface.private.actions", "value" : "false" },
{ "name" : "fs.default.name", "value" : "hdfs://%AMBARI_HOST%:8020" },
{ "name" : "fs.checkpoint.dir", "value" : "/hadoop/hdfs/namesecondary" },
{ "name" : "fs.checkpoint.period", "value" : "21600" },
{ "name" : "fs.checkpoint.size", "value" : "0.5" },
{ "name" : "fs.checkpoint.edits.dir", "value" : "/hadoop/hdfs/namesecondary" }
]
},
{
"name" : "mapred-site",
"properties" : [
{ "name" : "io.sort.record.percent", "value" : ".2" },
{ "name" : "io.sort.factor", "value" : "100" },
{ "name" : "mapred.tasktracker.tasks.sleeptime-before-sigkill", "value" : "250" },
{ "name" : "mapred.job.tracker.handler.count", "value" : "50" },
{ "name" : "mapreduce.cluster.administrators", "value" : " hadoop" },
{ "name" : "mapred.reduce.parallel.copies", "value" : "30" },
{ "name" : "tasktracker.http.threads", "value" : "50" },
{ "name" : "mapred.map.tasks.speculative.execution", "value" : "false" },
{ "name" : "mapred.reduce.tasks.speculative.execution", "value" : "false" },
{ "name" : "mapred.reduce.slowstart.completed.maps", "value" : "0.05" },
{ "name" : "mapred.inmem.merge.threshold", "value" : "1000" },
{ "name" : "mapred.job.shuffle.merge.percent", "value" : "0.66" },
{ "name" : "mapred.job.shuffle.input.buffer.percent", "value" : "0.7" },
{ "name" : "mapred.output.compression.type", "value" : "BLOCK" },
{ "name" : "mapred.jobtracker.completeuserjobs.maximum", "value" : "0" },
{ "name" : "mapred.jobtracker.restart.recover", "value" : "false" },
{ "name" : "mapred.job.reduce.input.buffer.percent", "value" : "0.0" },
{ "name" : "mapreduce.reduce.input.limit", "value" : "10737418240" },
{ "name" : "mapred.task.timeout", "value" : "600000" },
{ "name" : "jetty.connector", "value" : "org.mortbay.jetty.nio.SelectChannelConnector" },
{ "name" : "mapred.child.root.logger", "value" : "INFO,TLA" },
{ "name" : "mapred.max.tracker.blacklists", "value" : "16" },
{ "name" : "mapred.healthChecker.interval", "value" : "135000" },
{ "name" : "mapred.healthChecker.script.timeout", "value" : "60000" },
{ "name" : "mapred.job.tracker.persist.jobstatus.active", "value" : "false" },
{ "name" : "mapred.job.tracker.persist.jobstatus.hours", "value" : "1" },
{ "name" : "mapred.jobtracker.retirejob.check", "value" : "10000" },
{ "name" : "mapred.jobtracker.retirejob.interval", "value" : "0" },
{ "name" : "mapred.job.tracker.history.completed.location", "value" : "/mapred/history/done" },
{ "name" : "mapreduce.fileoutputcommitter.marksuccessfuljobs", "value" : "false" },
{ "name" : "mapred.job.reuse.jvm.num.tasks", "value" : "1" },
{ "name" : "hadoop.job.history.user.location", "value" : "none" },
{ "name" : "mapreduce.jobtracker.staging.root.dir", "value" : "/user" },
{ "name" : "mapreduce.tasktracker.group", "value" : "hadoop" },
{ "name" : "mapreduce.jobtracker.split.metainfo.maxsize", "value" : "50000000" },
{ "name" : "mapred.jobtracker.blacklist.fault-timeout-window", "value" : "180" },
{ "name" : "mapred.jobtracker.blacklist.fault-bucket-width", "value" : "15" },
{ "name" : "mapred.queue.names", "value" : "default" },
{ "name" : "mapred.local.dir", "value" : "/hadoop/mapred" },
{ "name" : "mapred.jobtracker.taskScheduler", "value" : "org.apache.hadoop.mapred.CapacityTaskScheduler" },
{ "name" : "mapred.tasktracker.map.tasks.maximum", "value" : "4" },
{ "name" : "mapred.tasktracker.reduce.tasks.maximum", "value" : "2" },
{ "name" : "mapred.cluster.reduce.memory.mb", "value" : "-1" },
{ "name" : "mapred.job.map.memory.mb", "value" : "-1" },
{ "name" : "mapred.cluster.max.map.memory.mb", "value" : "-1" },
{ "name" : "mapred.cluster.max.reduce.memory.mb", "value" : "-1" },
{ "name" : "mapred.job.reduce.memory.mb", "value" : "-1" },
{ "name" : "mapred.hosts", "value" : "/etc/hadoop/mapred.include" },
{ "name" : "mapred.hosts.exclude", "value" : "/etc/hadoop/mapred.exclude" },
{ "name" : "mapred.healthChecker.script.path", "value" : "file:////mapred/jobstatus" },
{ "name" : "mapred.job.tracker.persist.jobstatus.dir", "value" : "/etc/hadoop/health_check" },
{ "name" : "mapred.child.java.opts", "value" : "-server -Xmx768m -Djava.net.preferIPv4Stack=true" },
{ "name" : "mapred.cluster.map.memory.mb", "value" : "-1" },
{ "name" : "io.sort.mb", "value" : "200" },
{ "name" : "io.sort.spill.percent", "value" : "0.9" },
{ "name" : "mapred.system.dir", "value" : "/mapred/system" },
{ "name" : "mapred.job.tracker", "value" : "%AMBARI_HOST%:50300" },
{ "name" : "mapred.job.tracker.http.address", "value" : "%AMBARI_HOST%:50030" },
{ "name" : "mapred.userlog.retain.hours", "value" : "24" },
{ "name" : "mapred.jobtracker.maxtasks.per.job", "value" : "-1" },
{ "name" : "mapred.task.tracker.task-controller", "value" : "org.apache.hadoop.mapred.DefaultTaskController" },
{ "name" : "mapreduce.jobtracker.kerberos.principal", "value" : "jt/_HOST@EXAMPLE.COM" },
{ "name" : "mapreduce.tasktracker.kerberos.principal", "value" : "tt/_HOST@EXAMPLE.COM" },
{ "name" : "mapreduce.jobtracker.keytab.file", "value" : "/etc/security/keytabs/jt.service.keytab" },
{ "name" : "mapreduce.tasktracker.keytab.file", "value" : "/etc/security/keytabs/tt.service.keytab" },
{ "name" : "mapreduce.history.server.embedded", "value" : "false" },
{ "name" : "mapreduce.history.server.http.address", "value" : "%AMBARI_HOST%:51111" },
{ "name" : "mapreduce.jobhistory.kerberos.principal", "value" : "jt/_HOST@EXAMPLE.COM" },
{ "name" : "mapreduce.jobhistory.keytab.file", "value" : "/etc/security/keytabs/jt.service.keytab" }
]
},
{
"name" : "hdfs-site",
"properties" : [
{ "name" : "dfs.datanode.socket.write.timeout", "value" : "0" },
{ "name" : "dfs.replication.max", "value" : "50" },
{ "name" : "dfs.heartbeat.interval", "value" : "3" },
{ "name" : "dfs.safemode.threshold.pct", "value" : "1.0f" },
{ "name" : "dfs.balance.bandwidthPerSec", "value" : "6250000" },
{ "name" : "dfs.block.size", "value" : "134217728" },
{ "name" : "dfs.datanode.ipc.address", "value" : "0.0.0.0:8010" },
{ "name" : "dfs.blockreport.initialDelay", "value" : "120" },
{ "name" : "dfs.datanode.du.pct", "value" : "0.85f" },
{ "name" : "dfs.namenode.handler.count", "value" : "40" },
{ "name" : "dfs.datanode.max.xcievers", "value" : "4096" },
{ "name" : "dfs.umaskmode", "value" : "077" },
{ "name" : "dfs.web.ugi", "value" : "gopher,gopher" },
{ "name" : "dfs.permissions", "value" : "true" },
{ "name" : "dfs.permissions.supergroup", "value" : "hdfs" },
{ "name" : "ipc.server.max.response.size", "value" : "5242880" },
{ "name" : "dfs.block.access.token.enable", "value" : "true" },
{ "name" : "dfs.secondary.https.port", "value" : "50490" },
{ "name" : "dfs.https.port", "value" : "50470" },
{ "name" : "dfs.access.time.precision", "value" : "0" },
{ "name" : "dfs.cluster.administrators", "value" : " hdfs" },
{ "name" : "ipc.server.read.threadpool.size", "value" : "5" },
{ "name" : "dfs.name.dir", "value" : "/hadoop/hdfs/namenode" },
{ "name" : "dfs.webhdfs.enabled", "value" : "false" },
{ "name" : "dfs.datanode.failed.volumes.tolerated", "value" : "0" },
{ "name" : "dfs.block.local-path-access.user", "value" : "hbase" },
{ "name" : "dfs.data.dir", "value" : "/hadoop/hdfs/data" },
{ "name" : "dfs.hosts.exclude", "value" : "/etc/hadoop/dfs.exclude" },
{ "name" : "dfs.hosts", "value" : "/etc/hadoop/dfs.include" },
{ "name" : "dfs.replication", "value" : "3" },
{ "name" : "dfs.datanode.address", "value" : "0.0.0.0:50010" },
{ "name" : "dfs.datanode.http.address", "value" : "0.0.0.0:50075" },
{ "name" : "dfs.http.address", "value" : "%AMBARI_HOST%:50070" },
{ "name" : "dfs.datanode.du.reserved", "value" : "1" },
{ "name" : "dfs.namenode.kerberos.principal", "value" : "nn/_HOST@EXAMPLE.COM" },
{ "name" : "dfs.secondary.namenode.kerberos.principal", "value" : "nn/_HOST@EXAMPLE.COM" },
{ "name" : "dfs.namenode.kerberos.https.principal", "value" : "host/_HOST@EXAMPLE.COM" },
{ "name" : "dfs.secondary.namenode.kerberos.https.principal", "value" : "host/_HOST@EXAMPLE.COM" },
{ "name" : "dfs.secondary.http.address", "value" : "%AMBARI_HOST%:50090" },
{ "name" : "dfs.web.authentication.kerberos.keytab", "value" : "/etc/security/keytabs/spnego.service.keytab" },
{ "name" : "dfs.datanode.kerberos.principal", "value" : "dn/_HOST@EXAMPLE.COM" },
{ "name" : "dfs.namenode.keytab.file", "value" : "/etc/security/keytabs/nn.service.keytab" },
{ "name" : "dfs.secondary.namenode.keytab.file", "value" : "/etc/security/keytabs/nn.service.keytab" },
{ "name" : "dfs.datanode.keytab.file", "value" : "/etc/security/keytabs/dn.service.keytab" },
{ "name" : "dfs.https.address", "value" : "%AMBARI_HOST%:50470" },
{ "name" : "dfs.datanode.data.dir.perm", "value" : "750" }
]
}
]
}

View File

@ -0,0 +1,515 @@
# Copyright (c) 2013 Hortonworks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
import pkg_resources as pkg
import requests
from savanna import context
from savanna.plugins.hdp import blueprintprocessor as bp
from savanna.plugins.hdp import clusterspec as cs
from savanna.plugins.hdp import configprovider as cfg
from savanna.plugins.hdp import exceptions as ex
from savanna.plugins.hdp.versions import abstractversionhandler as avm
from savanna import version
LOG = logging.getLogger(__name__)
class VersionHandler(avm.AbstractVersionHandler):
config_provider = None
version = None
def _set_version(self, version):
self.version = version
def _get_config_provider(self):
if self.config_provider is None:
self.config_provider = cfg.ConfigurationProvider(
json.load(pkg.resource_stream(version.version_info.package,
'plugins/hdp/versions/1_3_2/resources/'
'ambari-config-resource.json')))
return self.config_provider
def _get_blueprint_processor(self):
processor = bp.BlueprintProcessor(json.loads(
self._get_default_cluster_template()))
return processor
def _get_default_cluster_template(self):
return pkg.resource_string(
version.version_info.package,
'plugins/hdp/versions/1_3_2/resources/default-cluster.template')
def get_version(self):
return self.version
def get_ambari_client(self):
return AmbariClient(self)
def get_config_items(self):
return self._get_config_provider().get_config_items()
def process_cluster(self, user_inputs, node_groups):
processor = self._get_blueprint_processor()
processor.process_user_inputs(user_inputs)
processor.process_node_groups(node_groups)
return processor
def get_applicable_target(self, name):
return self._get_config_provider().get_applicable_target(name)
def get_cluster_spec(self, cluster_template, cluster):
return cs.ClusterSpec(cluster_template, cluster=cluster)
def get_default_cluster_configuration(self):
return cs.ClusterSpec(self._get_default_cluster_template())
def get_node_processes(self):
node_processes = {}
for service in self.get_default_cluster_configuration().services:
components = []
for component in service.components:
components.append(component.name)
node_processes[service.name] = components
return node_processes
def install_swift_integration(self, servers):
for server in servers:
server.install_swift_integration()
class AmbariClient():
def __init__(self, handler):
self.handler = handler
def _add_cluster(self, ambari_info, name):
add_cluster_url = 'http://{0}/api/v1/clusters/{1}'.format(
ambari_info.get_address(), name)
result = requests.post(add_cluster_url,
data='{"Clusters": {"version" : "HDP-1.3.2"}}',
auth=(ambari_info.user, ambari_info.password))
if result.status_code != 201:
LOG.warning(
'Create cluster command failed. {0}'.format(result.text))
return False
return True
def _add_configurations_to_cluster(
self, cluster_spec, ambari_info, name):
configs = cluster_spec.configurations
config_url = 'http://{0}/api/v1/clusters/{1}'.format(
ambari_info.get_address(), name)
body = {}
clusters = {}
body['Clusters'] = clusters
for config_name in configs:
if config_name == 'ambari':
continue
config_body = {}
clusters['desired_config'] = config_body
config_body['type'] = config_name
#TODO(jspeidel): hard coding for now
config_body['tag'] = 'v1'
config_body['properties'] = configs[config_name]
result = requests.put(config_url, data=json.dumps(body), auth=(
ambari_info.user, ambari_info.password))
if result.status_code != 200:
LOG.warning(
'Set configuration command failed. {0}'.format(
result.text))
return False
return True
def _add_services_to_cluster(self, cluster_spec, ambari_info, name):
services = cluster_spec.services
add_service_url = 'http://{0}/api/v1/clusters/{1}/services/{2}'
for service in services:
if service.name != 'AMBARI':
result = requests.post(add_service_url.format(
ambari_info.get_address(), name, service.name),
auth=(ambari_info.user, ambari_info.password))
if result.status_code != 201:
LOG.warning(
'Create service command failed. {0}'.format(
result.text))
return False
return True
def _add_components_to_services(self, cluster_spec, ambari_info, name):
add_component_url = 'http://{0}/api/v1/clusters/{1}/services/{' \
'2}/components/{3}'
for service in cluster_spec.services:
if service.name != 'AMBARI':
for component in service.components:
result = requests.post(add_component_url.format(
ambari_info.get_address(), name, service.name,
component.name), auth=(ambari_info.user,
ambari_info.password))
if result.status_code != 201:
LOG.warning(
'Create component command failed. {0}'.format(
result.text))
return False
return True
def _add_hosts_and_components(
self, cluster_spec, servers, ambari_info, name):
add_host_url = 'http://{0}/api/v1/clusters/{1}/hosts/{2}'
add_host_component_url = 'http://{0}/api/v1/clusters/{1}' \
'/hosts/{2}/host_components/{3}'
for host in servers:
hostname = host.instance.fqdn.lower()
result = requests.post(
add_host_url.format(ambari_info.get_address(), name, hostname),
auth=(ambari_info.user, ambari_info.password))
if result.status_code != 201:
LOG.warning(
'Create host command failed. {0}'.format(result.text))
return False
node_group_name = host.node_group.name
#TODO(jspeidel): ensure that node group exists
node_group = cluster_spec.node_groups[node_group_name]
for component in node_group.components:
# don't add any AMBARI components
if component.find('AMBARI') != 0:
result = requests.post(add_host_component_url.format(
ambari_info.get_address(), name, hostname, component),
auth=(ambari_info.user, ambari_info.password))
if result.status_code != 201:
LOG.warning(
'Create host_component command failed. {0}'.format(
result.text))
return False
return True
def _install_services(self, cluster_name, ambari_info):
LOG.info('Installing required Hadoop services ...')
ambari_address = ambari_info.get_address()
install_url = 'http://{0}/api/v1/clusters/{' \
'1}/services?ServiceInfo/state=INIT'.format(
ambari_address, cluster_name)
body = '{"ServiceInfo": {"state" : "INSTALLED"}}'
result = requests.put(install_url, data=body, auth=(
ambari_info.user, ambari_info.password))
if result.status_code == 202:
#TODO(jspeidel) don't hard code request id
success = self._wait_for_async_request(
self._get_async_request_uri(ambari_info, cluster_name, 1),
auth=(ambari_info.user, ambari_info.password))
if success:
LOG.info("Install of Hadoop stack successful.")
self._finalize_ambari_state(ambari_info)
else:
LOG.critical('Install command failed.')
raise RuntimeError('Hadoop service install failed')
else:
LOG.error(
'Install command failed. {0}'.format(result.text))
raise RuntimeError('Hadoop service install failed')
return success
def _get_async_request_uri(self, ambari_info, cluster_name, request_id):
return 'http://{0}/api/v1/clusters/{1}/requests/{' \
'2}/tasks?fields=Tasks/status'.format(
ambari_info.get_address(), cluster_name,
request_id)
def _wait_for_async_request(self, request_url, auth):
started = False
while not started:
result = requests.get(request_url, auth=auth)
LOG.debug(
'async request ' + request_url + ' response:\n' + result.text)
json_result = json.loads(result.text)
started = True
for items in json_result['items']:
status = items['Tasks']['status']
if status == 'FAILED' or status == 'ABORTED':
return False
else:
if status != 'COMPLETED':
started = False
context.sleep(5)
return started
def _finalize_ambari_state(self, ambari_info):
LOG.info('Finalizing Ambari cluster state.')
persist_state_uri = 'http://{0}/api/v1/persist'.format(
ambari_info.get_address())
# this post data has non-standard format because persist
# resource doesn't comply with Ambari API standards
persist_data = '{ "CLUSTER_CURRENT_STATUS":' \
'"{\\"clusterState\\":\\"CLUSTER_STARTED_5\\"}" }'
result = requests.post(persist_state_uri, data=persist_data,
auth=(ambari_info.user, ambari_info.password))
if result.status_code != 201 and result.status_code != 202:
LOG.warning('Finalizing of Ambari cluster state failed. {0}'.
format(result.text))
raise ex.HadoopProvisionError('Unable to finalize Ambari state.')
def start_services(self, cluster_name, ambari_info):
LOG.info('Starting Hadoop services ...')
LOG.info('Cluster name: {0}, Ambari server address: {1}'
.format(cluster_name, ambari_info.get_address()))
start_url = 'http://{0}/api/v1/clusters/{1}/services?ServiceInfo/' \
'state=INSTALLED'.format(
ambari_info.get_address(), cluster_name)
body = '{"ServiceInfo": {"state" : "STARTED"}}'
auth = (ambari_info.user, ambari_info.password)
result = requests.put(start_url, data=body, auth=auth)
if result.status_code == 202:
# don't hard code request id
success = self._wait_for_async_request(
self._get_async_request_uri(ambari_info, cluster_name, 2),
auth=auth)
if success:
LOG.info(
"Successfully started Hadoop cluster '{0}'.".format(
cluster_name))
else:
LOG.critical('Failed to start Hadoop cluster.')
raise RuntimeError('Failed to start Hadoop cluster.')
else:
LOG.critical(
'Start command failed. Status: {0}, response: {1}'.
format(result.status_code, result.text))
raise RuntimeError('Hadoop cluster start failed.')
def _get_rest_request(self):
return requests
def _exec_ambari_command(self, auth, body, cmd_uri):
LOG.debug('PUT URI: {0}'.format(cmd_uri))
result = requests.put(cmd_uri, data=body,
auth=auth)
if result.status_code == 202:
# don't hard code request id
LOG.debug(
'PUT response: {0}'.format(result.text))
json_result = json.loads(result.text)
href = json_result['href'] + '/tasks?fields=Tasks/status'
success = self._wait_for_async_request(href, auth)
if success:
LOG.info(
"Successfully changed state of Hadoop components ")
else:
LOG.critical('Failed to change state of Hadoop '
'components')
raise RuntimeError('Failed to change state of Hadoop '
'components')
else:
LOG.error(
'Command failed. Status: {0}, response: {1}'.
format(result.status_code, result.text))
raise RuntimeError('Hadoop/Ambari command failed.')
def _get_host_list(self, servers):
host_list = [server.instance.fqdn.lower() for server in servers]
return ",".join(host_list)
def _install_and_start_components(self, cluster_name, servers,
ambari_info):
auth = (ambari_info.user, ambari_info.password)
self.install_components(ambari_info, auth, cluster_name, servers)
self.handler.install_swift_integration(servers)
self.start_components(ambari_info, auth, cluster_name, servers)
def install_components(self, ambari_info, auth, cluster_name, servers):
LOG.info('Starting Hadoop components while scaling up')
LOG.info('Cluster name {0}, Ambari server ip {1}'
.format(cluster_name, ambari_info.get_address()))
# query for the host components on the given hosts that are in the
# INIT state
body = '{"HostRoles": {"state" : "INSTALLED"}}'
install_uri = 'http://{0}/api/v1/clusters/{' \
'1}/host_components?HostRoles/state=INIT&' \
'HostRoles/host_name.in({2})'.format(
ambari_info.get_address(), cluster_name,
self._get_host_list(servers))
self._exec_ambari_command(auth, body, install_uri)
def start_components(self, ambari_info, auth, cluster_name, servers):
# query for all the host components on one of the hosts in the
# INSTALLED state, then get a list of the client services in the list
installed_uri = 'http://{0}/api/v1/clusters/{' \
'1}/host_components?HostRoles/state=INSTALLED&' \
'HostRoles/host_name.in({2})' \
.format(ambari_info.get_address(), cluster_name,
self._get_host_list(servers))
result = requests.get(installed_uri, auth=auth)
if result.status_code == 200:
LOG.debug(
'GET response: {0}'.format(result.text))
json_result = json.loads(result.text)
items = json_result['items']
# select non-CLIENT items
inclusion_list = list(set([x['HostRoles']['component_name']
for x in items if "CLIENT" not in
x['HostRoles']['component_name']]))
# query and start all non-client components on the given set of
# hosts
body = '{"HostRoles": {"state" : "STARTED"}}'
start_uri = 'http://{0}/api/v1/clusters/{' \
'1}/host_components?HostRoles/state=INSTALLED&' \
'HostRoles/host_name.in({2})' \
'&HostRoles/component_name.in({3})'.format(
ambari_info.get_address(), cluster_name,
self._get_host_list(servers),
",".join(inclusion_list))
self._exec_ambari_command(auth, body, start_uri)
else:
raise RuntimeError('Unable to determine installed service '
'components in scaled instances. status'
' code returned = {0}'.format(result.status))
def wait_for_host_registrations(self, num_hosts, ambari_info):
LOG.info(
'Waiting for all Ambari agents to register with server ...')
url = 'http://{0}/api/v1/hosts'.format(ambari_info.get_address())
result = None
json_result = None
#TODO(jspeidel): timeout
while result is None or len(json_result['items']) < num_hosts:
context.sleep(5)
try:
result = requests.get(url, auth=(ambari_info.user,
ambari_info.password))
json_result = json.loads(result.text)
# TODO(jspeidel): just for debug
LOG.info('Registered Hosts: {0} of {1}'.format(
len(json_result['items']), num_hosts))
for hosts in json_result['items']:
LOG.debug('Registered Host: {0}'.format(
hosts['Hosts']['host_name']))
except requests.ConnectionError:
#TODO(jspeidel): max wait time
LOG.info('Waiting to connect to ambari server ...')
def update_ambari_admin_user(self, password, ambari_info):
old_pwd = ambari_info.password
user_url = 'http://{0}/api/v1/users/admin'.format(
ambari_info.get_address())
update_body = '{{"Users":{{"roles":"admin,user","password":"{0}",' \
'"old_password":"{1}"}} }}'.format(password, old_pwd)
request = self._get_rest_request()
result = request.put(user_url, data=update_body, auth=(
ambari_info.user, ambari_info.password))
if result.status_code != 200:
raise ex.HadoopProvisionError('Unable to update Ambari admin user'
' credentials: {0}'.format(
result.text))
def add_ambari_user(self, user, ambari_info):
user_url = 'http://{0}/api/v1/users/{1}'.format(
ambari_info.get_address(), user.name)
create_body = '{{"Users":{{"password":"{0}","roles":"{1}"}} }}'. \
format(user.password, '%s' % ','.join(map(str, user.groups)))
request = self._get_rest_request()
result = request.post(user_url, data=create_body, auth=(
ambari_info.user, ambari_info.password))
if result.status_code != 201:
raise ex.HadoopProvisionError(
'Unable to create Ambari user: {0}'.format(result.text))
def delete_ambari_user(self, user_name, ambari_info):
user_url = 'http://{0}/api/v1/users/{1}'.format(
ambari_info.get_address(), user_name)
request = self._get_rest_request()
result = request.delete(user_url, auth=(
ambari_info.user, ambari_info.password))
if result.status_code != 200:
raise ex.HadoopProvisionError('Unable to delete Ambari user: {0}'
' : {1}'.format(user_name,
result.text))
def scale_cluster(self, name, cluster_spec, servers, num_hosts,
ambari_info):
self.wait_for_host_registrations(num_hosts, ambari_info)
# now add the hosts and the component
self._add_hosts_and_components(cluster_spec, servers,
ambari_info, name)
self._install_and_start_components(name, servers, ambari_info)
def provision_cluster(self, cluster_spec, servers, ambari_info, name):
if not self._add_cluster(ambari_info, name):
return False
# add configurations to cluster
if not self._add_configurations_to_cluster(cluster_spec,
ambari_info, name):
return False
# add services
if not self._add_services_to_cluster(cluster_spec,
ambari_info, name):
return False
# add components to services
if not self._add_components_to_services(cluster_spec,
ambari_info, name):
return False
# add hosts and host_components
if not self._add_hosts_and_components(cluster_spec, servers,
ambari_info, name):
return False
if not self._install_services(name, ambari_info):
return False
self.handler.install_swift_integration(servers)
return True

View File

@ -0,0 +1,57 @@
# Copyright (c) 2013 Hortonworks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from savanna.plugins.hdp.versions import abstractversionhandler as avm
# We have yet to integrate 2.0, so this is essentially a place holder...
class VersionHandler(avm.AbstractVersionHandler):
version = None
def _set_version(self, version):
self.version = version
def get_version(self):
return self.version
def get_config_items(self):
raise NotImplementedError('not yet supported')
def get_ambari_client(self):
raise NotImplementedError('not yet supported')
def process_node_groups(self, node_groups):
raise NotImplementedError('not yet supported')
def get_node_processes(self):
raise NotImplementedError('not yet supported')
def process_user_inputs(self, user_inputs):
raise NotImplementedError('not yet supported')
def get_applicable_target(self, name):
raise NotImplementedError('not yet supported')
def get_cluster_spec(self, cluster, cluster_template):
raise NotImplementedError('not yet supported')
def get_default_cluster_configuration(self):
raise NotImplementedError('not yet supported')
def process_cluster(self, user_inputs, node_groups):
raise NotImplementedError('not yet supported')
def install_swift_integration(self, servers):
raise NotImplementedError('not yet supported')

View File

View File

@ -0,0 +1,57 @@
# Copyright (c) 2013 Hortonworks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
class AbstractVersionHandler():
__metaclass__ = abc.ABCMeta
@abc.abstractmethod
def get_config_items(self):
return
@abc.abstractmethod
def get_applicable_target(self, name):
return
@abc.abstractmethod
def process_cluster(self, user_inputs, node_groups):
return
@abc.abstractmethod
def get_cluster_spec(self, cluster_template, cluster):
return
@abc.abstractmethod
def get_ambari_client(self):
return
@abc.abstractmethod
def get_default_cluster_configuration(self):
return
@abc.abstractmethod
def get_node_processes(self):
return
@abc.abstractmethod
def install_swift_integration(self, servers):
return
@abc.abstractmethod
def get_version(self):
return

View File

@ -0,0 +1,55 @@
# Copyright (c) 2013 Hortonworks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
class VersionHandlerFactory():
versions = None
modules = None
initialized = False
@staticmethod
def get_instance():
if not VersionHandlerFactory.initialized:
src_dir = os.path.join(os.path.dirname(__file__), '')
VersionHandlerFactory.versions = [name.replace('_', '.')
for name in os.listdir(src_dir)
if os.path.isdir(
os.path.join(src_dir, name))]
VersionHandlerFactory.modules = {}
for version in VersionHandlerFactory.versions:
module_name = 'savanna.plugins.hdp.versions.{0}.'\
'versionhandler'.format(
version.replace('.', '_'))
module_class = getattr(
__import__(module_name, fromlist=['savanna']),
'VersionHandler')
module = module_class()
# would prefer to use __init__ or some constructor, but keep
# getting exceptions...
module._set_version(version)
key = version.replace('_', '.')
VersionHandlerFactory.modules[key] = module
VersionHandlerFactory.initialized = True
return VersionHandlerFactory()
def get_versions(self):
return VersionHandlerFactory.versions
def get_version_handler(self, version):
return VersionHandlerFactory.modules[version]

View File

@ -171,7 +171,7 @@ HDP_CONFIG_OPTS = [
'via SSH'),
cfg.StrOpt('HADOOP_VERSION',
default='1.3.0', help='Version of Hadoop'),
default='1.3.2', help='Version of Hadoop'),
cfg.StrOpt('HADOOP_USER',
default='hdfs',
help='Username which is used for access to Hadoop services'),

View File

@ -14,7 +14,6 @@
# limitations under the License.
import mock
import os
import pkg_resources as pkg
from savanna.conductor import resource as r
from savanna.plugins.hdp import ambariplugin as ap
@ -23,6 +22,9 @@ from savanna.plugins.hdp import exceptions as ex
from savanna import version
import unittest2
GET_REST_REQ = "savanna.plugins.hdp.versions.1_3_2.versionhandler." \
"AmbariClient._get_rest_request"
def create_cluster_template(ctx, dct):
return r.ClusterTemplateResource(dct)
@ -32,7 +34,7 @@ class AmbariPluginTest(unittest2.TestCase):
def test_get_node_processes(self):
plugin = ap.AmbariPlugin()
#TODO(jspeidel): provide meaningful input
service_components = plugin.get_node_processes(1)
service_components = plugin.get_node_processes('1.3.2')
self.assertEqual(5, len(service_components))
components = service_components['HDFS']
@ -60,35 +62,32 @@ class AmbariPluginTest(unittest2.TestCase):
@mock.patch("savanna.context.ctx")
def test_convert(self, ctx_func):
plugin = ap.AmbariPlugin()
with open(os.path.join(os.path.realpath('../plugins'), 'hdp',
'resources',
'default-cluster.template'), 'r') as f:
cluster = plugin.convert(f.read(), 'ambari', '1.3.0',
'test-plugin',
create_cluster_template)
with open(os.path.join(os.path.realpath('../plugins'), 'hdp',
'resources',
'default-cluster.template'), 'r') as f:
normalized_config = cs.ClusterSpec(f.read()).normalize()
cluster_config_file = pkg.resource_string(
version.version_info.package,
'plugins/hdp/versions/1_3_2/resources/default-cluster.template')
cluster = plugin.convert(cluster_config_file, 'ambari', '1.3.2',
'test-plugin', create_cluster_template)
normalized_config = cs.ClusterSpec(cluster_config_file).normalize()
self.assertEqual(normalized_config.hadoop_version,
cluster.hadoop_version)
self.assertEqual(len(normalized_config.node_groups),
len(cluster.node_groups))
def test__set_ambari_credentials__admin_only(self):
@mock.patch(GET_REST_REQ)
def test__set_ambari_credentials__admin_only(self, client):
client.side_effect = self._get_test_request
self.requests = []
plugin = ap.AmbariPlugin()
plugin._get_rest_request = self._get_test_request
with open(os.path.join(os.path.realpath('../plugins'), 'hdp',
'resources',
'default-cluster.template'), 'r') as f:
cluster_spec = cs.ClusterSpec(f.read())
cluster_config_file = pkg.resource_string(
version.version_info.package,
'plugins/hdp/versions/1_3_2/resources/default-cluster.template')
cluster_spec = cs.ClusterSpec(cluster_config_file)
ambari_info = ap.AmbariInfo(TestHost('111.11.1111'),
'8080', 'admin', 'old-pwd')
plugin._set_ambari_credentials(cluster_spec, ambari_info)
plugin._set_ambari_credentials(cluster_spec, ambari_info, '1.3.2')
self.assertEqual(1, len(self.requests))
request = self.requests[0]
@ -101,15 +100,16 @@ class AmbariPluginTest(unittest2.TestCase):
self.assertEqual('admin', ambari_info.user)
self.assertEqual('admin', ambari_info.password)
def test__set_ambari_credentials__new_user_no_admin(self):
@mock.patch(GET_REST_REQ)
def test__set_ambari_credentials__new_user_no_admin(self, client):
self.requests = []
plugin = ap.AmbariPlugin()
plugin._get_rest_request = self._get_test_request
client.side_effect = self._get_test_request
with open(os.path.join(os.path.realpath('../plugins'), 'hdp',
'resources',
'default-cluster.template'), 'r') as f:
cluster_spec = cs.ClusterSpec(f.read())
cluster_config_file = pkg.resource_string(
version.version_info.package,
'plugins/hdp/versions/1_3_2/resources/default-cluster.template')
cluster_spec = cs.ClusterSpec(cluster_config_file)
for service in cluster_spec.services:
if service.name == 'AMBARI':
@ -119,7 +119,7 @@ class AmbariPluginTest(unittest2.TestCase):
ambari_info = ap.AmbariInfo(TestHost('111.11.1111'), '8080',
'admin', 'old-pwd')
plugin._set_ambari_credentials(cluster_spec, ambari_info)
plugin._set_ambari_credentials(cluster_spec, ambari_info, '1.3.2')
self.assertEqual(2, len(self.requests))
request = self.requests[0]
@ -139,15 +139,16 @@ class AmbariPluginTest(unittest2.TestCase):
self.assertEqual('test', ambari_info.user)
self.assertEqual('test_pw', ambari_info.password)
def test__set_ambari_credentials__new_user_with_admin(self):
@mock.patch(GET_REST_REQ)
def test__set_ambari_credentials__new_user_with_admin(self, client):
self.requests = []
plugin = ap.AmbariPlugin()
plugin._get_rest_request = self._get_test_request
client.side_effect = self._get_test_request
with open(os.path.join(os.path.realpath('../plugins'), 'hdp',
'resources',
'default-cluster.template'), 'r') as f:
cluster_spec = cs.ClusterSpec(f.read())
cluster_config_file = pkg.resource_string(
version.version_info.package,
'plugins/hdp/versions/1_3_2/resources/default-cluster.template')
cluster_spec = cs.ClusterSpec(cluster_config_file)
for service in cluster_spec.services:
if service.name == 'AMBARI':
@ -156,7 +157,7 @@ class AmbariPluginTest(unittest2.TestCase):
ambari_info = ap.AmbariInfo(TestHost('111.11.1111'), '8080',
'admin', 'old-pwd')
plugin._set_ambari_credentials(cluster_spec, ambari_info)
plugin._set_ambari_credentials(cluster_spec, ambari_info, '1.3.2')
self.assertEqual(2, len(self.requests))
request = self.requests[0]
@ -178,15 +179,16 @@ class AmbariPluginTest(unittest2.TestCase):
self.assertEqual('admin', ambari_info.user)
self.assertEqual('admin', ambari_info.password)
def test__set_ambari_credentials__no_admin_user(self):
@mock.patch(GET_REST_REQ)
def test__set_ambari_credentials__no_admin_user(self, client):
self.requests = []
plugin = ap.AmbariPlugin()
plugin._get_rest_request = self._get_test_request
client.side_effect = self._get_test_request
with open(os.path.join(os.path.realpath('../plugins'), 'hdp',
'resources',
'default-cluster.template'), 'r') as f:
cluster_spec = cs.ClusterSpec(f.read())
cluster_config_file = pkg.resource_string(
version.version_info.package,
'plugins/hdp/versions/1_3_2/resources/default-cluster.template')
cluster_spec = cs.ClusterSpec(cluster_config_file)
for service in cluster_spec.services:
if service.name == 'AMBARI':
@ -199,7 +201,7 @@ class AmbariPluginTest(unittest2.TestCase):
'8080', 'admin', 'old-pwd')
self.assertRaises(ex.HadoopProvisionError,
plugin._set_ambari_credentials(cluster_spec,
ambari_info))
ambari_info, '1.3.2'))
@mock.patch("savanna.utils.openstack.nova.get_instance_info")
def test__get_ambari_info(self, patched):
@ -207,7 +209,7 @@ class AmbariPluginTest(unittest2.TestCase):
cluster_config_file = pkg.resource_string(
version.version_info.package,
'plugins/hdp/resources/default-cluster.template')
'plugins/hdp/versions/1_3_2/resources/default-cluster.template')
test_host = TestServer(
'host1', 'test-master', '111.11.1111',
@ -235,7 +237,7 @@ class AmbariPluginTest(unittest2.TestCase):
cluster_config_file = pkg.resource_string(
version.version_info.package,
'plugins/hdp/resources/default-cluster.template')
'plugins/hdp/versions/1_3_2/resources/default-cluster.template')
cluster_spec = cs.ClusterSpec(cluster_config_file)
ambari_info = ap.AmbariInfo(TestHost('111.11.1111'),

View File

@ -30,7 +30,7 @@ class ClusterSpecTest(unittest2.TestCase):
patched.side_effect = _test_get_instance_info
cluster_config_file = pkg.resource_string(
version.version_info.package,
'plugins/hdp/resources/default-cluster.template')
'plugins/hdp/versions/1_3_2/resources/default-cluster.template')
server1 = TestServer('host1', 'test-master', '11111', 3, '111.11.1111',
'222.11.1111',
@ -60,7 +60,7 @@ class ClusterSpecTest(unittest2.TestCase):
patched.side_effect = _test_get_instance_info
cluster_config_file = pkg.resource_string(
version.version_info.package,
'plugins/hdp/resources/default-cluster.template')
'plugins/hdp/versions/1_3_2/resources/default-cluster.template')
server1 = TestServer('ambari_machine', 'master', '11111', 3,
'111.11.1111', '222.11.1111',
@ -90,7 +90,7 @@ class ClusterSpecTest(unittest2.TestCase):
patched.side_effect = _test_get_instance_info
cluster_config_file = pkg.resource_string(
version.version_info.package,
'plugins/hdp/resources/default-cluster.template')
'plugins/hdp/versions/1_3_2/resources/default-cluster.template')
master_host = TestServer(
'master.novalocal', 'master', '11111', 3,
@ -160,7 +160,7 @@ class ClusterSpecTest(unittest2.TestCase):
def test_ambari_rpm_path(self):
cluster_config_file = pkg.resource_string(
version.version_info.package,
'plugins/hdp/resources/default-cluster.template')
'plugins/hdp/versions/1_3_2/resources/default-cluster.template')
cluster_spec = cs.ClusterSpec(cluster_config_file)
ambari_config = cluster_spec.configurations['ambari']
@ -172,7 +172,7 @@ class ClusterSpecTest(unittest2.TestCase):
def test_parse_default(self):
cluster_config_file = pkg.resource_string(
version.version_info.package,
'plugins/hdp/resources/default-cluster.template')
'plugins/hdp/versions/1_3_2/resources/default-cluster.template')
cluster_config = cs.ClusterSpec(cluster_config_file)
@ -185,7 +185,7 @@ class ClusterSpecTest(unittest2.TestCase):
def test_ambari_rpm(self):
cluster_config_file = pkg.resource_string(
version.version_info.package,
'plugins/hdp/resources/default-cluster.template')
'plugins/hdp/versions/1_3_2/resources/default-cluster.template')
cluster_config = cs.ClusterSpec(cluster_config_file)
@ -197,7 +197,7 @@ class ClusterSpecTest(unittest2.TestCase):
def test_normalize(self):
cluster_config_file = pkg.resource_string(
version.version_info.package,
'plugins/hdp/resources/default-cluster.template')
'plugins/hdp/versions/1_3_2/resources/default-cluster.template')
cluster_config = cs.ClusterSpec(cluster_config_file)
cluster = cluster_config.normalize()

View File

@ -0,0 +1,37 @@
# Copyright (c) 2013 Hortonworks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from savanna.plugins.hdp.versions.versionhandlerfactory import \
VersionHandlerFactory
import unittest2
class VersionManagerFactoryTest(unittest2.TestCase):
def test_get_versions(self):
factory = VersionHandlerFactory.get_instance()
versions = factory.get_versions()
self.assertEqual(2, len(versions))
self.assertIn('1.3.2', versions)
self.assertIn('2.0', versions)
def test_get_version_handlers(self):
factory = VersionHandlerFactory.get_instance()
versions = factory.get_versions()
for version in versions:
handler = factory.get_version_handler(version)
self.assertIsNotNone(handler)
self.assertEqual(version, handler.get_version())