Merge "Remove hdp 2.0.6 plugin"

This commit is contained in:
Jenkins 2016-06-02 18:16:15 +00:00 committed by Gerrit Code Review
commit 18aa22e7c5
33 changed files with 5 additions and 13200 deletions

View File

@ -27,7 +27,7 @@ SAHARA_SERVICE_PROTOCOL=${SAHARA_SERVICE_PROTOCOL:-$SERVICE_PROTOCOL}
SAHARA_AUTH_CACHE_DIR=${SAHARA_AUTH_CACHE_DIR:-/var/cache/sahara}
SAHARA_ENABLED_PLUGINS=${SAHARA_ENABLED_PLUGINS:-\
vanilla,hdp,cdh,mapr,spark,storm,fake}
vanilla,cdh,mapr,spark,storm,fake}
SAHARA_BIN_DIR=$(get_python_exec_prefix)
SAHARA_ENABLE_DISTRIBUTED_PERIODICS=${SAHARA_ENABLE_DISTRIBUTED_PERIODICS:-\

View File

@ -0,0 +1,4 @@
---
deprecations:
- Support of HDP 2.0.6 plugin was removed. Use Ambari plugin
instead.

View File

@ -1,450 +0,0 @@
# Copyright (c) 2013 Hortonworks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_config import cfg
from oslo_log import log as logging
from sahara import conductor as c
from sahara import context
from sahara import exceptions as base_exc
from sahara.i18n import _
from sahara.i18n import _LI
from sahara.plugins import exceptions as ex
from sahara.plugins.hdp import hadoopserver as h
from sahara.plugins.hdp import saharautils as utils
from sahara.plugins.hdp.versions import versionhandlerfactory as vhf
from sahara.plugins import provisioning as p
from sahara.topology import topology_helper as th
from sahara.utils import cluster_progress_ops as cpo
conductor = c.API
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
class AmbariPlugin(p.ProvisioningPluginBase):
def __init__(self):
self.cluster_ambari_mapping = {}
self.version_factory = vhf.VersionHandlerFactory.get_instance()
def create_cluster(self, cluster):
version = cluster.hadoop_version
handler = self.version_factory.get_version_handler(version)
cluster_spec = handler.get_cluster_spec(
cluster, self._map_to_user_inputs(
version, cluster.cluster_configs))
hosts = self._get_servers(cluster)
ambari_info = self.get_ambari_info(cluster_spec)
self.cluster_ambari_mapping[cluster.name] = ambari_info
rpm = self._get_rpm_uri(cluster_spec)
servers = []
for host in hosts:
host_role = utils.get_host_role(host)
servers.append(
h.HadoopServer(host, cluster_spec.node_groups[host_role],
ambari_rpm=rpm))
self._provision_cluster(
cluster.name, cluster_spec, ambari_info, servers,
cluster.hadoop_version)
# add the topology data file and script if rack awareness is
# enabled
self._configure_topology_for_cluster(cluster, servers)
LOG.info(_LI("Install of Hadoop stack successful."))
# add service urls
self._set_cluster_info(cluster, cluster_spec)
# check if HDFS HA is enabled; set it up if so
if cluster_spec.is_hdfs_ha_enabled(cluster):
self.configure_hdfs_ha(cluster)
@cpo.event_wrapper(
True, step=_("Add configurations to cluster"), param=('cluster', 1))
def configure_hdfs_ha(self, cluster):
LOG.debug("Configuring HDFS HA")
version = cluster.hadoop_version
handler = self.version_factory.get_version_handler(version)
cluster_spec = handler.get_cluster_spec(
cluster, self._map_to_user_inputs(
version, cluster.cluster_configs))
hosts = self._get_servers(cluster)
ambari_info = self.get_ambari_info(cluster_spec)
self.cluster_ambari_mapping[cluster.name] = ambari_info
rpm = self._get_rpm_uri(cluster_spec)
servers = []
for host in hosts:
host_role = utils.get_host_role(host)
servers.append(
h.HadoopServer(host, cluster_spec.node_groups[host_role],
ambari_rpm=rpm))
ambari_client = handler.get_ambari_client()
ambari_client.setup_hdfs_ha(cluster_spec, servers, ambari_info,
cluster.name)
LOG.info(_LI("Configure HDFS HA successful."))
def _get_servers(self, cluster):
servers = []
if hasattr(cluster, 'node_groups') and cluster.node_groups is not None:
# code for a cluster object
for node_group in cluster.node_groups:
servers += node_group.instances
else:
# cluster is actually a cloud context
servers = cluster.instances
return servers
def get_node_processes(self, hadoop_version):
node_processes = {}
version_handler = (
self.version_factory.get_version_handler(hadoop_version))
default_config = version_handler.get_default_cluster_configuration()
for service in default_config.services:
components = []
for component in service.components:
if service.is_user_template_component(component):
components.append(component.name)
node_processes[service.name] = components
return node_processes
def convert(self, config, plugin_name, version, template_name,
cluster_template_create):
handler = self.version_factory.get_version_handler(version)
normalized_config = handler.get_cluster_spec(
None, None, cluster_template=config).normalize()
node_groups = []
for ng in normalized_config.node_groups:
node_group = {
"name": ng.name,
"flavor_id": ng.flavor,
"node_processes": ng.node_processes,
"count": ng.count
}
node_groups.append(node_group)
cluster_configs = dict()
config_resource = handler.get_config_items()
for entry in normalized_config.cluster_configs:
user_input = next((ui for ui in config_resource
if entry.config.name == ui.name), None)
if user_input is not None:
ci = entry.config
# get the associated service dictionary
target = entry.config.applicable_target
service_dict = cluster_configs.get(target, {})
service_dict[ci.name] = entry.value
cluster_configs[target] = service_dict
else:
LOG.debug('Template based input "{entry_name}" is being'
' filtered out as it is not considered a user input'
.format(entry_name=entry.config.name))
ctx = context.ctx()
return cluster_template_create(ctx,
{"name": template_name,
"plugin_name": plugin_name,
"hadoop_version": version,
"node_groups": node_groups,
"cluster_configs": cluster_configs})
def update_infra(self, cluster):
pass
def convert_props_to_template(self, props):
raise NotImplementedError('not yet supported')
def _provision_cluster(self, name, cluster_spec, ambari_info,
servers, version):
# TODO(jspeidel): encapsulate in another class
if servers:
cpo.add_provisioning_step(
servers[0].cluster_id,
_("Provision cluster via Ambari"), len(servers))
with context.ThreadGroup() as tg:
for server in servers:
with context.set_current_instance_id(
server.instance['instance_id']):
tg.spawn(
"hdp-provision-instance-%s" %
server.instance.hostname(),
server.provision_ambari, ambari_info, cluster_spec)
handler = self.version_factory.get_version_handler(version)
ambari_client = handler.get_ambari_client()
ambari_client.wait_for_host_registrations(len(servers), ambari_info)
self._set_ambari_credentials(cluster_spec, ambari_info, version)
ambari_client.provision_cluster(
cluster_spec, servers, ambari_info, name)
LOG.info(_LI('Cluster provisioned via Ambari Server: {server_ip}')
.format(server_ip=ambari_info.get_address()))
# TODO(jspeidel): invoke during scale cluster. Will need to handle dups
def _set_cluster_info(self, cluster, cluster_spec):
info = {}
for service in cluster_spec.services:
if service.deployed:
service.register_service_urls(cluster_spec, info, cluster)
conductor.cluster_update(context.ctx(), cluster, {'info': info})
def _set_ambari_credentials(self, cluster_spec, ambari_info, version):
services = cluster_spec.services
ambari_client = (self.version_factory.get_version_handler(version).
get_ambari_client())
for service in services:
if service.name == 'AMBARI':
is_admin_provided = False
admin_user = ambari_info.user
admin_password = ambari_info.password
for user in service.users:
if user.name == 'admin':
ambari_client.update_ambari_admin_user(
user.password, ambari_info)
is_admin_provided = True
ambari_info.user = 'admin'
ambari_info.password = user.password
else:
ambari_client.add_ambari_user(user, ambari_info)
if 'admin' in user.groups:
admin_user = user.name
admin_password = user.password
if not is_admin_provided:
if admin_user is None:
raise ex.HadoopProvisionError(_("An Ambari user in the"
" admin group must be "
"configured."))
ambari_info.user = admin_user
ambari_info.password = admin_password
ambari_client.delete_ambari_user('admin', ambari_info)
break
def _update_ambari_info_credentials(self, cluster_spec, ambari_info):
services = cluster_spec.services
ambari_service = next((service for service in services if
service.name == 'AMBARI'), None)
if ambari_service is not None:
admin_user = next((user for user in ambari_service.users
if 'admin' in user.groups), None)
if admin_user is not None:
ambari_info.user = admin_user.name
ambari_info.password = admin_user.password
LOG.info(_LI('Using "{username}" as admin user for scaling of cluster')
.format(username=ambari_info.user))
# PLUGIN SPI METHODS:
def get_versions(self):
return self.version_factory.get_versions()
def configure_cluster(self, cluster):
self.create_cluster(cluster)
def get_configs(self, hadoop_version):
handler = self.version_factory.get_version_handler(hadoop_version)
return handler.get_config_items()
# cluster name argument supports the non-sahara cluster creation mode
def start_cluster(self, cluster):
client = self.version_factory.get_version_handler(
cluster.hadoop_version).get_ambari_client()
handler = self.version_factory.get_version_handler(
cluster.hadoop_version)
cluster_spec = handler.get_cluster_spec(
cluster, self._map_to_user_inputs(
cluster.hadoop_version, cluster.cluster_configs))
try:
client.start_services(cluster.name, cluster_spec,
self.cluster_ambari_mapping[cluster.name])
finally:
client.cleanup(self.cluster_ambari_mapping[cluster.name])
def get_title(self):
return 'Hortonworks Data Platform'
def get_description(self):
return _('The Hortonworks Sahara plugin automates the deployment '
'of the Hortonworks Data Platform (HDP) on OpenStack.')
def validate(self, cluster):
raise base_exc.DeprecatedException(
_("The HDP 2.0.6 plugin is deprecated in Mitaka release and "
"will be removed in Newton release. Please, use the Ambari 2.3 "
"instead."))
def scale_cluster(self, cluster, instances):
handler = self.version_factory.get_version_handler(
cluster.hadoop_version)
ambari_client = handler.get_ambari_client()
cluster_spec = handler.get_cluster_spec(
cluster, self._map_to_user_inputs(
cluster.hadoop_version, cluster.cluster_configs))
rpm = self._get_rpm_uri(cluster_spec)
servers = []
for instance in instances:
host_role = utils.get_host_role(instance)
servers.append(h.HadoopServer(instance,
cluster_spec.node_groups
[host_role],
ambari_rpm=rpm))
ambari_info = self.get_ambari_info(cluster_spec)
self._update_ambari_info_credentials(cluster_spec, ambari_info)
cpo.add_provisioning_step(
cluster.id, _("Provision cluster via Ambari"), len(servers))
with context.ThreadGroup() as tg:
for server in servers:
with context.set_current_instance_id(
server.instance['instance_id']):
tg.spawn('Ambari provisioning thread',
server.provision_ambari,
ambari_info, cluster_spec)
ambari_client.configure_scaled_cluster_instances(
cluster.name, cluster_spec, self._get_num_hosts(cluster),
ambari_info)
self._configure_topology_for_cluster(cluster, servers)
ambari_client.start_scaled_cluster_instances(cluster.name,
cluster_spec, servers,
ambari_info)
ambari_client.cleanup(ambari_info)
def get_edp_engine(self, cluster, job_type):
version_handler = (
self.version_factory.get_version_handler(cluster.hadoop_version))
return version_handler.get_edp_engine(cluster, job_type)
def get_edp_job_types(self, versions=None):
res = {}
for vers in self.version_factory.get_versions():
if not versions or vers in versions:
vh = self.version_factory.get_version_handler(vers)
res[vers] = vh.get_edp_job_types()
return res
def get_edp_config_hints(self, job_type, version):
version_handler = (
self.version_factory.get_version_handler(version))
return version_handler.get_edp_config_hints(job_type)
def decommission_nodes(self, cluster, instances):
LOG.info(_LI('AmbariPlugin: decommission_nodes called for '
'HDP version = {version}')
.format(version=cluster.hadoop_version))
handler = self.version_factory.get_version_handler(
cluster.hadoop_version)
ambari_client = handler.get_ambari_client()
cluster_spec = handler.get_cluster_spec(
cluster, self._map_to_user_inputs(
cluster.hadoop_version, cluster.cluster_configs))
ambari_info = self.get_ambari_info(cluster_spec)
ambari_client.decommission_cluster_instances(cluster, cluster_spec,
instances,
ambari_info)
def validate_scaling(self, cluster, existing, additional):
handler = self.version_factory.get_version_handler(
cluster.hadoop_version)
# results in validation
handler.get_cluster_spec(
cluster, [],
dict(list(existing.items()) + list(additional.items())))
def _get_num_hosts(self, cluster):
count = 0
for node_group in cluster.node_groups:
count += node_group.count
return count
def _get_host_list(self, servers):
host_list = [server.instance.fqdn().lower() for server in servers]
return ",".join(host_list)
def _get_rpm_uri(self, cluster_spec):
ambari_config = cluster_spec.configurations['ambari']
return ambari_config.get('rpm', None)
def get_ambari_info(self, cluster_spec):
ambari_host = cluster_spec.determine_component_hosts(
'AMBARI_SERVER').pop()
port = cluster_spec.configurations['ambari'].get(
'server.port', '8080')
return AmbariInfo(ambari_host, port, 'admin', 'admin')
def _configure_topology_for_cluster(self, cluster, servers):
if CONF.enable_data_locality:
cpo.add_provisioning_step(
cluster.id, _("Enable data locality for cluster"),
len(servers))
topology_data = th.generate_topology_map(
cluster, CONF.enable_hypervisor_awareness)
topology_str = "\n".join(
[k + " " + v for k, v in topology_data.items()]) + "\n"
for server in servers:
server.configure_topology(topology_str)
def get_open_ports(self, node_group):
handler = self.version_factory.get_version_handler(
node_group.cluster.hadoop_version)
return handler.get_open_ports(node_group)
class AmbariInfo(object):
def __init__(self, host, port, user, password):
self.host = host
self.port = port
self.user = user
self.password = password
def get_address(self):
return '{0}:{1}'.format(self.host.management_ip, self.port)
def is_ambari_info(self):
pass
def get_cluster(self):
sahara_instance = self.host.sahara_instance
return sahara_instance.cluster
def get_event_info(self):
return self.host.sahara_instance

View File

@ -1,385 +0,0 @@
# Copyright (c) 2013 Hortonworks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_log import log as logging
from oslo_serialization import jsonutils as json
from sahara.i18n import _
from sahara.plugins import exceptions as ex
from sahara.plugins.hdp.versions import versionhandlerfactory as vhf
LOG = logging.getLogger(__name__)
def validate_number_of_datanodes(cluster, scaled_groups, default_configs):
dfs_replication = 0
for config in default_configs:
if config.name == 'dfs.replication':
dfs_replication = config.default_value
conf = cluster.cluster_configs
if 'HDFS' in conf and 'dfs.replication' in conf['HDFS']:
dfs_replication = conf['HDFS']['dfs.replication']
if not scaled_groups:
scaled_groups = {}
dn_count = 0
for ng in cluster.node_groups:
if 'DATANODE' in ng.node_processes:
if ng.id in scaled_groups:
dn_count += scaled_groups[ng.id]
else:
dn_count += ng.count
if dn_count < int(dfs_replication):
raise ex.InvalidComponentCountException(
'datanode', _('%s or more') % dfs_replication, dn_count,
_('Number of %(dn)s instances should not be less '
'than %(replication)s')
% {'dn': 'DATANODE', 'replication': 'dfs.replication'})
class ClusterSpec(object):
def __init__(self, config, version='2.0.6'):
self._config_template = config
self.services = []
self.configurations = {}
self.node_groups = {}
self.version = version
self.user_input_handlers = {}
cluster_template = json.loads(config)
self._parse_services(cluster_template)
self._parse_configurations(cluster_template)
self._process_node_groups(template_json=cluster_template)
def create_operational_config(self, cluster, user_inputs,
scaled_groups=None):
if scaled_groups is None:
scaled_groups = {}
self._determine_deployed_services(cluster)
self._process_node_groups(cluster=cluster)
for ng_id in scaled_groups:
existing = next(group for group in self.node_groups.values()
if group.id == ng_id)
existing.count = scaled_groups[ng_id]
self.validate_node_groups(cluster)
self._finalize_ng_components()
self._parse_configurations(json.loads(self._config_template))
self._process_user_inputs(user_inputs)
self._replace_config_tokens()
def scale(self, updated_groups):
for ng_id in updated_groups:
existing = next(group for group in self.node_groups.values()
if group.id == ng_id)
existing.count = updated_groups[ng_id]
def validate_node_groups(self, cluster):
for service in self.services:
if service.deployed:
service.validate(self, cluster)
elif service.is_mandatory():
raise ex.RequiredServiceMissingException(service.name)
def get_deployed_configurations(self):
configs = set()
for service in self.services:
if service.deployed:
configs |= service.configurations
return configs
def determine_component_hosts(self, component):
hosts = set()
for ng in self.node_groups.values():
if component in ng.components:
hosts |= ng.instances
return hosts
def normalize(self):
return NormalizedClusterConfig(self)
def get_deployed_node_group_count(self, name):
count = 0
for ng in self.get_node_groups_containing_component(name):
count += ng.count
return count
def get_node_groups_containing_component(self, component):
found_node_groups = []
for ng in self.node_groups.values():
if component in ng.components:
found_node_groups.append(ng)
return found_node_groups
def get_components_for_type(self, type):
components = set()
for service in self.services:
for component in service.components:
if component.type == type:
components.add(component.name)
return components
def is_hdfs_ha_enabled(self, cluster):
if self.version == '2.0.6':
if cluster.cluster_configs.get('HDFSHA', False):
if cluster.cluster_configs.HDFSHA.get('hdfs.nnha',
False) is True:
return True
return False
def _parse_services(self, template_json):
handler = (vhf.VersionHandlerFactory.get_instance().
get_version_handler(self.version))
sp = handler.get_services_processor()
for s in template_json['services']:
name = s['name']
service = sp.create_service(name)
self.services.append(service)
for c in s['components']:
component = Component(c['name'], c['type'], c['cardinality'])
service.add_component(component)
if 'users' in s:
for u in s['users']:
user = User(u['name'], u['password'], u['groups'])
service.add_user(user)
configs = self._parse_configurations(s)
for config in configs:
service.add_configuration(config)
def _parse_configurations(self, template_json):
config_names = []
for config in template_json['configurations']:
config_props = {}
name = config['name']
config_names.append(name)
if name in self.configurations:
config_props = self.configurations[name]
else:
self.configurations[name] = config_props
if 'properties' in config:
for prop in config['properties']:
config_props[prop['name']] = prop['value']
return config_names
def _process_node_groups(self, template_json=None, cluster=None):
# get node_groups from config
if template_json and not cluster:
for group in template_json['host_role_mappings']:
node_group = NodeGroup(group['name'])
for component in group['components']:
node_group.add_component(component['name'])
for host in group['hosts']:
if 'predicate' in host:
node_group.predicate = host['predicate']
if 'cardinality' in host:
node_group.cardinality = host['cardinality']
if 'default_count' in host:
node_group.count = host['default_count']
self.node_groups[node_group.name] = node_group
if cluster:
self.node_groups = {}
node_groups = cluster.node_groups
for ng in node_groups:
node_group = NodeGroup(ng.name)
node_group.count = ng.count
node_group.id = ng.id
node_group.components = ng.node_processes[:]
for instance in ng.instances:
node_group.instances.add(Instance(instance))
self.node_groups[node_group.name] = node_group
def _determine_deployed_services(self, cluster):
for ng in cluster.node_groups:
for service in self.services:
if service.deployed:
continue
for sc in service.components:
if sc.name in ng.node_processes:
service.deployed = True
service.register_user_input_handlers(
self.user_input_handlers)
break
def _process_user_inputs(self, user_inputs):
for ui in user_inputs:
# if it doesn't have a tag then it's not part of the
# operational config that Ambari knows about
if not hasattr(ui.config, 'tag'):
continue
user_input_handler = self.user_input_handlers.get(
'{0}/{1}'.format(ui.config.tag, ui.config.name),
self._default_user_input_handler)
user_input_handler(ui, self.configurations)
def _replace_config_tokens(self):
for service in self.services:
if service.deployed:
service.finalize_configuration(self)
def _finalize_ng_components(self):
for service in self.services:
if service.deployed:
service.finalize_ng_components(self)
def _default_user_input_handler(self, user_input, configurations):
config_map = configurations[user_input.config.tag]
config_map[user_input.config.name] = user_input.value
class Component(object):
def __init__(self, name, component_type, cardinality):
self.name = name
self.type = component_type
self.cardinality = cardinality
class NodeGroup(object):
def __init__(self, name):
self.id = None
self.name = name
self.components = []
self.predicate = None
self.cardinality = None
self.count = None
self.instances = set()
def add_component(self, component):
self.components.append(component)
class User(object):
def __init__(self, name, password, groups):
self.name = name
self.password = password
self.groups = groups
class Instance(object):
def __init__(self, sahara_instance):
self.inst_fqdn = sahara_instance.fqdn()
self.management_ip = sahara_instance.management_ip
self.internal_ip = sahara_instance.internal_ip
self.sahara_instance = sahara_instance
def fqdn(self):
return self.inst_fqdn
def remote(self):
return self.sahara_instance.remote()
def __hash__(self):
return hash(self.fqdn())
def __eq__(self, other):
return self.fqdn() == other.fqdn()
class NormalizedClusterConfig(object):
def __init__(self, cluster_spec):
self.hadoop_version = cluster_spec.version
self.cluster_configs = []
self.node_groups = []
self.handler = (vhf.VersionHandlerFactory.get_instance().
get_version_handler(self.hadoop_version))
self._parse_configurations(cluster_spec.configurations)
self._parse_node_groups(cluster_spec.node_groups)
def _parse_configurations(self, configurations):
for config_name, properties in configurations.items():
for prop, value in properties.items():
target = self._get_property_target(prop)
if target:
prop_type = self._get_property_type(prop, value)
# TODO(sdpeidel): should we supply a scope?
self.cluster_configs.append(
NormalizedConfigEntry(NormalizedConfig(
prop, prop_type, value, target, 'cluster'),
value))
def _parse_node_groups(self, node_groups):
for node_group in node_groups.values():
self.node_groups.append(NormalizedNodeGroup(node_group))
def _get_property_target(self, prop):
return self.handler.get_applicable_target(prop)
def _get_property_type(self, prop, value):
# TODO(jspeidel): seems that all numeric prop values in default config
# are encoded as strings. This may be incorrect.
# TODO(jspeidel): should probably analyze string value to determine if
# it is numeric
# TODO(jspeidel): would then need to know whether Ambari expects a
# string or a numeric value
prop_type = type(value).__name__
# print 'Type: {0}'.format(prop_type)
if prop_type == 'str' or prop_type == 'unicode' or value == '':
return 'string'
elif prop_type == 'int':
return 'integer'
elif prop_type == 'bool':
return 'boolean'
else:
raise ValueError(
_("Could not determine property type for property "
"'%(property)s' with value: %(value)s") %
{"property": prop, "value": value})
class NormalizedConfig(object):
def __init__(self, name, config_type, default_value, target, scope):
self.name = name
self.description = None
self.type = config_type
self.default_value = default_value
self.is_optional = False
self.applicable_target = target
self.scope = scope
class NormalizedConfigEntry(object):
def __init__(self, config, value):
self.config = config
self.value = value
class NormalizedNodeGroup(object):
def __init__(self, node_group):
self.name = node_group.name
self.node_processes = node_group.components
self.node_configs = None
# TODO(jpseidel): should not have to specify img/flavor
self.img = None
# TODO(jmaron) the flavor will be set via an ambari blueprint setting,
# but that setting doesn't exist yet. It will be addressed by a bug
# fix shortly
self.flavor = 3
self.count = node_group.count
self.id = node_group.id

View File

@ -1,81 +0,0 @@
# Copyright (c) 2015 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_serialization import jsonutils as json
from sahara.service.edp.oozie.workflow_creator import workflow_factory
from sahara.utils import files as pkg
def get_possible_hive_config_from(file_name):
'''Return the possible configs, args, params for a Hive job.'''
config = {
'configs': load_hadoop_json_for_tag(file_name, 'hive-site.xml'),
'params': {}
}
return config
def get_possible_mapreduce_config_from(file_name):
'''Return the possible configs, args, params for a MapReduce job.'''
config = {
'configs': get_possible_pig_config_from(file_name).get('configs')
}
config['configs'] += workflow_factory.get_possible_mapreduce_configs()
return config
def get_possible_pig_config_from(file_name):
'''Return the possible configs, args, params for a Pig job.'''
config = {
'configs': load_hadoop_json_for_tag(file_name, 'mapred-site.xml'),
'args': [],
'params': {}
}
return config
def get_properties_for_tag(configurations, tag_name):
'''Get the properties for a tag
Given a list of configurations, return the properties for the named tag.
If the named tag cannot be found returns an empty list.
'''
for obj in configurations:
if obj.get('tag') == tag_name:
return obj.get('properties')
return []
def load_hadoop_json_for_tag(file_name, tag_name):
'''Given a file name and a tag, return the configs from that tag.'''
full_json = load_json_file(file_name)
properties = get_properties_for_tag(full_json['configurations'], tag_name)
configs = []
for prop in properties:
configs.append({
'name': prop.get('name'),
'value': prop.get('default_value'),
'description': prop.get('description')
})
return configs
def load_json_file(file_name):
'''Given a package relative json file name, return the json.'''
ftext = pkg.get_file_text(file_name)
loaded_json = json.loads(ftext)
return loaded_json

View File

@ -1,94 +0,0 @@
# Copyright (c) 2013 Hortonworks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
from sahara import exceptions
from sahara.i18n import _
from sahara.plugins import provisioning as p
HOST_REGISTRATIONS_TIMEOUT = p.Config(
'Host registrations timeout', 'general',
'cluster', config_type='int', priority=1,
default_value=3600, is_optional=True,
description='Timeout for host registrations, in seconds')
DECOMMISSIONING_TIMEOUT = p.Config(
'Timeout for decommissioning nodes', 'general',
'cluster', config_type='int', priority=1,
default_value=1000, is_optional=True,
description='Timeout for decommissioning nodes, in seconds')
class ConfigurationProvider(object):
def __init__(self, config, hadoop_version):
self.config = config
self.config_mapper = {}
self.config_items = []
self.hadoop_version = hadoop_version
self._initialize(config)
def get_config_items(self):
return self.config_items
def get_applicable_target(self, name):
return self.config_mapper.get(name)
def _get_target(self, apptarget):
if apptarget == 'TODO':
apptarget = 'general'
return apptarget
def _initialize(self, config):
for configuration in self.config['configurations']:
for service_property in configuration['properties']:
config = p.Config(service_property['name'],
self._get_target(
service_property['applicable_target']),
service_property['scope'],
config_type=service_property['config_type'],
default_value=service_property
['default_value'],
is_optional=service_property[
'is_optional'],
description=service_property[
'description'])
setattr(config, 'tag', configuration['tag'].rsplit(".", 1)[0])
self.config_items.append(config)
# TODO(jspeidel): an assumption is made that property names
# are unique across configuration sections which is dangerous
property_name = service_property['name']
# if property already exists, throw an exception
if property_name in self.config_mapper:
# internal error
# ambari-config-resource contains duplicates
raise exceptions.InvalidDataException(
_('Internal Error. Duplicate property '
'name detected: %s') % property_name)
self.config_mapper[service_property['name']] = (
self._get_target(
service_property['applicable_target']))
host_reg_timeout = copy.copy(HOST_REGISTRATIONS_TIMEOUT)
setattr(host_reg_timeout, 'tag', 'global')
self.config_items.append(host_reg_timeout)
self.config_mapper[host_reg_timeout.name] = 'global'
if self.hadoop_version == '2.0.6':
dec_timeout = copy.copy(DECOMMISSIONING_TIMEOUT)
setattr(dec_timeout, 'tag', 'global')
self.config_items.append(dec_timeout)
self.config_mapper[dec_timeout.name] = 'global'

View File

@ -1,42 +0,0 @@
# Copyright (c) 2014 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from sahara.plugins import exceptions as ex
from sahara.plugins import utils as u
from sahara.service.edp.oozie import engine as edp_engine
class EdpOozieEngine(edp_engine.OozieJobEngine):
def get_hdfs_user(self):
return 'hdfs'
def get_name_node_uri(self, cluster):
hdfs = cluster['info']['HDFS']
return hdfs.get('NameService', hdfs['NameNode'])
def get_oozie_server_uri(self, cluster):
return cluster['info']['JobFlow']['Oozie'] + "/oozie/"
def get_oozie_server(self, cluster):
return u.get_instance(cluster, "OOZIE_SERVER")
def validate_job_execution(self, cluster, job, data):
oo_count = u.get_instances_count(cluster, 'OOZIE_SERVER')
if oo_count != 1:
raise ex.InvalidComponentCountException(
'OOZIE_SERVER', '1', oo_count)
super(EdpOozieEngine, self).validate_job_execution(cluster, job, data)

View File

@ -1,279 +0,0 @@
# Copyright (c) 2013 Hortonworks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import re
from oslo_log import log as logging
from sahara.i18n import _
from sahara.i18n import _LI
from sahara.plugins import exceptions as ex
from sahara.plugins.hdp import saharautils
from sahara.utils import cluster_progress_ops as cpo
from sahara.utils import files as f
AMBARI_RPM = ('http://s3.amazonaws.com/public-repo-1.hortonworks.com/'
'ambari/centos6/1.x/updates/1.6.0/ambari.repo')
EPEL_RELEASE_PACKAGE_NAME = 'epel-release'
HADOOP_SWIFT_RPM = ('https://s3.amazonaws.com/public-repo-1.hortonworks.com/'
'sahara/swift/hadoop-swift-1.0-1.x86_64.rpm')
HADOOP_SWIFT_LOCAL_RPM = ('/opt/hdp-local-repos/hadoop-swift/'
'hadoop-swift-1.0-1.x86_64.rpm')
LOG = logging.getLogger(__name__)
class HadoopServer(object):
_master_ip = None
def __init__(self, instance, node_group, ambari_rpm=None):
self.instance = instance
self.node_group = node_group
self.ambari_rpm = ambari_rpm or AMBARI_RPM
def get_event_info(self):
return self.instance
@property
def cluster_id(self):
return self.instance.cluster_id
@cpo.event_wrapper(True, param=('self', 0))
def provision_ambari(self, ambari_info, cluster_spec):
self.install_rpms()
global_config = cluster_spec.configurations['global']
jdk_path = global_config.get('java64_home')
if 'AMBARI_SERVER' in self.node_group.components:
self._setup_and_start_ambari_server(ambari_info.port, jdk_path)
# all nodes must run Ambari agent
self._setup_and_start_ambari_agent(ambari_info.host.internal_ip)
@saharautils.inject_remote('r')
def rpms_installed(self, r):
rpm_cmd = 'rpm -q %s' % EPEL_RELEASE_PACKAGE_NAME
ret_code, stdout = r.execute_command(rpm_cmd,
run_as_root=True,
raise_when_error=False)
return ret_code == 0
@saharautils.inject_remote('r')
def install_rpms(self, r):
LOG.debug("Installing rpm's")
# TODO(jspeidel): based on image type, use correct command
curl_cmd = ('curl -f -s -o /etc/yum.repos.d/ambari.repo %s' %
self.ambari_rpm)
ret_code, stdout = r.execute_command(curl_cmd,
run_as_root=True,
raise_when_error=False)
if ret_code == 0:
yum_cmd = 'yum -y install %s' % EPEL_RELEASE_PACKAGE_NAME
r.execute_command(yum_cmd, run_as_root=True)
else:
LOG.debug("Unable to install rpm's from repo, "
"checking for local install.")
if not self.rpms_installed():
raise ex.HadoopProvisionError(
_('Failed to install Hortonworks Ambari'))
@cpo.event_wrapper(True, param=('self', 0))
@saharautils.inject_remote('r')
def install_swift_integration(self, r):
LOG.debug("Installing swift integration")
base_rpm_cmd = 'rpm -U --quiet '
rpm_cmd = base_rpm_cmd + HADOOP_SWIFT_RPM
ret_code, stdout = r.execute_command(rpm_cmd,
run_as_root=True,
raise_when_error=False)
if ret_code != 0:
LOG.debug("Unable to install swift integration from "
"source, checking for local rpm.")
ret_code, stdout = r.execute_command(
'ls ' + HADOOP_SWIFT_LOCAL_RPM,
run_as_root=True,
raise_when_error=False)
if ret_code == 0:
rpm_cmd = base_rpm_cmd + HADOOP_SWIFT_LOCAL_RPM
r.execute_command(rpm_cmd, run_as_root=True)
else:
raise ex.HadoopProvisionError(
_('Failed to install Hadoop Swift integration'))
@cpo.event_wrapper(True, param=('self', 0))
@saharautils.inject_remote('r')
def configure_topology(self, topology_str, r):
r.write_file_to(
'/etc/hadoop/conf/topology.sh',
f.get_file_text(
'plugins/hdp/versions/version_2_0_6/resources/topology.sh'))
r.execute_command(
'chmod +x /etc/hadoop/conf/topology.sh', run_as_root=True
)
r.write_file_to('/etc/hadoop/conf/topology.data', topology_str)
@saharautils.inject_remote('r')
def _setup_and_start_ambari_server(self, port, jdk_path, r):
LOG.debug('Installing ambari-server')
r.execute_command('yum -y install ambari-server', run_as_root=True)
LOG.debug('Running Ambari Server setup')
# remove postgres data directory as a precaution since its existence
# has prevented successful postgres installation
r.execute_command('rm -rf /var/lib/pgsql/data', run_as_root=True)
# determine if the JDK is installed on this image
# in the case of the plain image, no JDK will be available
return_code, stdout = r.execute_command('ls -l {jdk_location}'.format(
jdk_location=jdk_path), raise_when_error=False)
LOG.debug('Queried for JDK location on VM instance, return code = '
'{code}'.format(code=str(return_code)))
# do silent setup since we only use default responses now
# only add -j command if the JDK is configured for the template,
# and if the JDK is present
# in all other cases, allow Ambari to install the JDK
r.execute_command(
'ambari-server setup -s {jdk_arg} > /dev/null 2>&1'.format(
jdk_arg='-j ' + jdk_path if jdk_path and (return_code == 0)
else ''),
run_as_root=True, timeout=1800
)
self._configure_ambari_server_api_port(port)
# NOTE(dmitryme): Reading stdout from 'ambari-server start'
# hangs ssh. Redirecting output to /dev/null fixes that
r.execute_command(
'ambari-server start > /dev/null 2>&1', run_as_root=True
)
LOG.info(_LI('Ambari started'))
@saharautils.inject_remote('r')
def _configure_ambari_server_api_port(self, port, r):
# do nothing if port is not specified or is default
if port is None or port == 8080:
return
ambari_config_file = '/etc/ambari-server/conf/ambari.properties'
LOG.debug('Configuring Ambari Server API port: {port}'.format(
port=port))
# read the current contents
data = r.read_file_from(ambari_config_file)
data = '{0}\nclient.api.port={1}\n'.format(data, port)
# write the file back
r.write_file_to(ambari_config_file, data, run_as_root=True)
@saharautils.inject_remote('r')
def _setup_and_start_ambari_agent(self, ambari_server_ip, r):
LOG.debug('Installing Ambari agent')
r.execute_command('yum -y install ambari-agent', run_as_root=True)
LOG.debug(
'Setting master-ip: {ip} in ambari-agent.ini'.format(
ip=ambari_server_ip))
r.replace_remote_string(
'/etc/ambari-agent/conf/ambari-agent.ini', 'localhost',
ambari_server_ip)
# If the HDP 2 ambari agent is pre-installed on an image, the agent
# will start up during instance launch and therefore the agent
# registration will fail. It is therefore more appropriate to call
# restart since it will either start (if stopped) or restart (if
# running)
r.execute_command('ambari-agent restart', run_as_root=True)
LOG.info(_LI('Ambari Agent started'))
@saharautils.inject_remote('r')
def set_namenode_safemode(self, jh, r):
r.execute_command("sudo su -l hdfs -c 'JAVA_HOME={0} "
"hdfs dfsadmin -safemode enter'".format(jh),
run_as_root=True)
@saharautils.inject_remote('r')
def save_namenode_namespace(self, jh, r):
r.execute_command("sudo su -l hdfs -c 'JAVA_HOME={0} "
"hdfs dfsadmin -saveNamespace'".format(jh),
run_as_root=True)
@saharautils.inject_remote('r')
def initialize_shared_edits(self, jh, r):
r.execute_command("sudo su -l hdfs -c 'JAVA_HOME={0} "
"hdfs namenode -initializeSharedEdits'".format(jh),
run_as_root=True)
@saharautils.inject_remote('r')
def format_zookeeper_fc(self, jh, r):
r.execute_command("sudo su -l hdfs -c 'JAVA_HOME={0} "
"hdfs zkfc -formatZK'".format(jh),
run_as_root=True)
@saharautils.inject_remote('r')
def bootstrap_standby_namenode(self, jh, r):
r.execute_command("sudo su -l hdfs -c 'JAVA_HOME={0} "
"hdfs namenode -bootstrapStandby'".format(jh),
run_as_root=True)
@saharautils.inject_remote('r')
def install_httpfs(self, r):
r.execute_command("yum -y install hadoop-httpfs", run_as_root=True)
@saharautils.inject_remote('r')
def start_httpfs(self, r):
r.execute_command("service hadoop-httpfs start", run_as_root=True)
@saharautils.inject_remote('r')
def write_hue_temp_file(self, filename, content, r):
r.execute_command("echo %s > %s" % (content, filename),
run_as_root=True)
def _log(self, buf):
# FIXME(Kezar): I don't know what's this. Will check later.
LOG.debug(buf)
def _is_component_available(self, component):
return component in self.node_group.components
def _is_ganglia_master(self):
return self._is_component_available('GANGLIA_SERVER')
def _is_ganglia_slave(self):
return self._is_component_available('GANGLIA_MONITOR')
class DefaultPromptMatcher(object):
prompt_pattern = re.compile('(.*\()(.)(\)\?\s*$)', re.DOTALL)
def __init__(self, terminal_token):
self.eof_token = terminal_token
def get_response(self, s):
match = self.prompt_pattern.match(s)
if match:
response = match.group(2)
return response
else:
return None
def is_eof(self, s):
eof = self.eof_token in s
return eof

View File

@ -1,33 +0,0 @@
# Copyright (c) 2013 Hortonworks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
def get_host_role(host):
if hasattr(host, 'role'):
return host.role
else:
return host.node_group.name
def inject_remote(param_name):
def handle(func):
def call(self, *args, **kwargs):
with self.instance.remote() as r:
newkwargs = kwargs.copy()
newkwargs[param_name] = r
return func(self, *args, **newkwargs)
return call
return handle

View File

@ -1,75 +0,0 @@
# Copyright (c) 2013 Hortonworks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import six
@six.add_metaclass(abc.ABCMeta)
class AbstractVersionHandler(object):
@abc.abstractmethod
def get_config_items(self):
return
@abc.abstractmethod
def get_applicable_target(self, name):
return
@abc.abstractmethod
def get_cluster_spec(self, cluster, user_inputs, scaled_groups=None,
cluster_template=None):
return
@abc.abstractmethod
def get_ambari_client(self):
return
@abc.abstractmethod
def get_default_cluster_configuration(self):
return
@abc.abstractmethod
def get_node_processes(self):
return
@abc.abstractmethod
def install_swift_integration(self, servers):
return
@abc.abstractmethod
def get_version(self):
return
@abc.abstractmethod
def get_services_processor(self):
return
@abc.abstractmethod
def get_edp_engine(self, cluster, job_type):
return
@abc.abstractmethod
def get_edp_job_types(self):
return []
@abc.abstractmethod
def get_edp_config_hints(self, job_type):
return {}
@abc.abstractmethod
def get_open_ports(self, node_group):
return []

View File

@ -1,46 +0,0 @@
# Copyright (c) 2014 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from sahara.plugins.hdp import confighints_helper as ch_helper
from sahara.plugins.hdp import edp_engine
from sahara.service.edp import hdfs_helper
from sahara.utils import edp
class EdpOozieEngine(edp_engine.EdpOozieEngine):
def create_hdfs_dir(self, remote, dir_name):
hdfs_helper.create_dir_hadoop2(remote, dir_name, self.get_hdfs_user())
@staticmethod
def get_possible_job_config(job_type):
if edp.compare_job_type(job_type, edp.JOB_TYPE_HIVE):
return {'job_config': ch_helper.get_possible_hive_config_from(
'plugins/hdp/versions/version_2_0_6/resources/'
'ambari-config-resource.json')}
if edp.compare_job_type(job_type,
edp.JOB_TYPE_MAPREDUCE,
edp.JOB_TYPE_MAPREDUCE_STREAMING):
return {'job_config': ch_helper.get_possible_mapreduce_config_from(
'plugins/hdp/versions/version_2_0_6/resources/'
'ambari-config-resource.json')}
if edp.compare_job_type(job_type, edp.JOB_TYPE_PIG):
return {'job_config': ch_helper.get_possible_pig_config_from(
'plugins/hdp/versions/version_2_0_6/resources/'
'ambari-config-resource.json')}
return edp_engine.EdpOozieEngine.get_possible_job_config(job_type)
def get_resource_manager_uri(self, cluster):
return cluster['info']['Yarn']['ResourceManager']

View File

@ -1,21 +0,0 @@
#!/bin/bash
HADOOP_CONF=/etc/hadoop/conf
while [ $# -gt 0 ] ; do
nodeArg=$1
exec< ${HADOOP_CONF}/topology.data
result=""
while read line ; do
ar=( $line )
if [ "${ar[0]}" = "$nodeArg" ] ; then
result="${ar[1]}"
fi
done
shift
if [ -z "$result" ] ; then
echo -n "/default/rack "
else
echo -n "$result "
fi
done

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -1,59 +0,0 @@
# Copyright (c) 2013 Hortonworks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from sahara.utils import general
class VersionHandlerFactory(object):
versions = None
modules = None
initialized = False
@staticmethod
def get_instance():
if not VersionHandlerFactory.initialized:
src_dir = os.path.join(os.path.dirname(__file__), '')
versions = [name[8:].replace('_', '.')
for name in os.listdir(src_dir)
if os.path.isdir(os.path.join(src_dir, name))
and name.startswith('version_')]
versions.sort(key=general.natural_sort_key)
VersionHandlerFactory.versions = versions
VersionHandlerFactory.modules = {}
for version in VersionHandlerFactory.versions:
module_name = ('sahara.plugins.hdp.versions.version_{0}.'
'versionhandler'.format(
version.replace('.', '_')))
module_class = getattr(
__import__(module_name, fromlist=['sahara']),
'VersionHandler')
module = module_class()
# would prefer to use __init__ or some constructor, but keep
# getting exceptions...
module._set_version(version)
VersionHandlerFactory.modules[version] = module
VersionHandlerFactory.initialized = True
return VersionHandlerFactory()
def get_versions(self):
return VersionHandlerFactory.versions
def get_version_handler(self, version):
return VersionHandlerFactory.modules[version]

View File

@ -1,136 +0,0 @@
# Copyright (c) 2013 Hortonworks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pkg_resources as pkg
from sahara.plugins.hdp import clusterspec as cs
from sahara import version
class TestServer(object):
def __init__(self, hostname, role, img, flavor, public_ip, private_ip):
self.inst_fqdn = hostname
self.role = role
self.instance_info = InstanceInfo(
hostname, img, flavor, public_ip, private_ip)
self.management_ip = public_ip
self.public_ip = public_ip
self.internal_ip = private_ip
self.node_group = None
self.sahara_instance = self
self.storage_path = ['/mnt']
def storage_paths(self):
return self.storage_path
def fqdn(self):
return self.inst_fqdn
def remote(self):
return None
def get_instance_info(*args, **kwargs):
return args[0].instance_info
def create_clusterspec(hdp_version='2.0.6'):
version_suffix = hdp_version.replace('.', '_')
cluster_config_file = pkg.resource_string(
version.version_info.package,
'plugins/hdp/versions/version_{0}/resources/'
'default-cluster.template'.format(version_suffix))
return cs.ClusterSpec(cluster_config_file, version=hdp_version)
class InstanceInfo(object):
def __init__(self, hostname, image, flavor, management_ip, internal_ip):
self.image = image
self.flavor = flavor
self.management_ip = management_ip
self.internal_ip = internal_ip
class TestCluster(object):
def __init__(self, node_groups, cluster_configs=None):
self.plugin_name = 'hdp'
self.hadoop_version = None
if cluster_configs:
self.cluster_configs = cluster_configs
else:
self.cluster_configs = {}
self.node_groups = node_groups
self.default_image_id = '11111'
class TestNodeGroup(object):
def __init__(self, name, instances, node_processes, count=1):
self.name = name
self.instances = instances
if instances:
for i in instances:
i.node_group = self
self.node_processes = node_processes
self.count = count
self.id = name
class TestUserInputConfig(object):
def __init__(self, tag, target, name):
self.tag = tag
self.applicable_target = target
self.name = name
class TestRequest(object):
def put(self, url, data=None, auth=None, headers=None):
self.url = url
self.data = data
self.auth = auth
self.headers = headers
self.method = 'put'
return TestResult(200)
def post(self, url, data=None, auth=None, headers=None):
self.url = url
self.data = data
self.auth = auth
self.headers = headers
self.method = 'post'
return TestResult(201)
def delete(self, url, auth=None, headers=None):
self.url = url
self.auth = auth
self.data = None
self.headers = headers
self.method = 'delete'
return TestResult(200)
class TestResult(object):
def __init__(self, status):
self.status_code = status
self.text = ''
class TestUserInput(object):
def __init__(self, config, value):
self.config = config
self.value = value

View File

@ -1,42 +0,0 @@
{
"configurations": [
{
"file": "core-site",
"properties": [
{
"name": "fs.trash.interval",
"default_value": "360",
"config_type": "integer",
"is_optional": true,
"description": "...",
"applicable_target": "service:hdfs",
"scope" : "node"
},
{
"name": "fs.checkpoint.size",
"default_value": "536870912",
"config_type": "integer",
"is_optional": true,
"description": "...",
"applicable_target": "service:hdfs",
"scope" : "node"
}
]
},
{
"file": "global",
"properties": [
{
"name": "dfs_name_dir",
"default_value": "/hadoop/hdfs/namenode",
"config_type": "string",
"is_optional": true,
"description": "...",
"applicable_target": "service:hdfs",
"scope" : "cluster"
}
]
}
]
}

View File

@ -1,195 +0,0 @@
{
"name": "HDP",
"version": "1.3.2",
"author": "Hortonworks",
"created": "03-31-2013",
"reference": "Hortonworks-linux",
"packages": {
"type": "rpm",
"repos": [
{
"name": "HDP",
"repoLocations": [
{
"target": "centos6",
"uri": "http://public-repo-1.hortonworks.com/ambari/centos6/1.x/GA/"
},
{
"target": "suse11",
"uri": "http://public-repo-1.hortonworks.com/ambari/suse11/1.x/GA/"
}
]
}
]
},
"services": [
{
"name": "HDFS",
"components": [
{
"name": "NAMENODE",
"type": "MASTER",
"cardinality": "1",
"hostRequirements": [
{
"name": "python",
"value": "exists"
},
{
"name": "jdk-1.6",
"value": "exists"
}
],
"deployedPackages": {
"type": "rpm",
"deploymentContext": [
{
"name": "customName",
"value": "customValue"
}
]
}
},
{
"name": "DATANODE",
"type": "SLAVE",
"cardinality": "1+",
"hostRequirements": {
"python": "exists",
"jdk-1.6": "exists"
},
"deployedPackages": {
"type": "rpm"
}
}
],
"configurations": [
{
"name": "core-site",
"properties": [
{
"name": "fs.trash.interval",
"value": "360"
},
{
"name": "fs.checkpoint.size",
"value": "536870912"
}
]
},
{
"name": "global",
"properties": [
{
"name": "dfs_name_dir",
"value": "/hadoop/hdfs/namenode"
}
]
}
]
},
{
"name": "MAPREDUCE",
"components": [
{
"name": "JOBTRACKER",
"type": "MASTER",
"cardinality": "1",
"hostRequirements": {
"jdk-1.6": "exists"
},
"deployedPackages": {
"type": "rpm"
}
},
{
"name": "MAPREDUCE_CLIENT",
"type": "CLIENT",
"cardinality": "0+"
}
],
"configurations": [
{
"name": "global",
"properties": [
{
"name": "jobtracker_host",
"value": "localhost"
}
]
}
]
},
{
"name" : "AMBARI",
"components" : [
{
"name" : "AMBARI_SERVER",
"type" : "MASTER",
"cardinality" : "1"
},
{
"name" : "AMBARI_AGENT",
"type" : "SLAVE",
"cardinality" : "1+"
}
],
"configurations" : [
],
"users" : [
{
"name" : "admin",
"password" : "admin",
"groups" : [
"admin",
"user"
]
}
]
}
],
"host_role_mappings" : [
{
"name" : "MASTER",
"components" : [
{ "name" : "NAMENODE" },
{ "name" : "JOBTRACKER" },
{ "name" : "SECONDARY_NAMENODE" },
{ "name" : "GANGLIA_SERVER" },
{ "name" : "GANGLIA_MONITOR" },
{ "name" : "NAGIOS_SERVER" },
{ "name" : "AMBARI_SERVER" },
{ "name" : "AMBARI_AGENT" }
],
"hosts" : [
{
"cardinality" : "1"
}
]
},
{
"name" : "SLAVE",
"components" : [
{ "name" : "DATANODE" },
{ "name" : "TASKTRACKER" },
{ "name" : "GANGLIA_MONITOR" },
{ "name" : "HDFS_CLIENT" },
{ "name" : "MAPREDUCE_CLIENT" },
{ "name" : "AMBARI_AGENT" }
],
"hosts" : [
{
"cardinality" : "1+"
}
]
}
],
"configurations" : [
{
"name" : "global",
"properties" : [
{ "name" : "dfs_name_dir", "value" : "/hadoop/hdfs/namenode" }
]
}
]
}

View File

@ -1,312 +0,0 @@
# Copyright (c) 2013 Hortonworks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
import pkg_resources as pkg
import testtools
from sahara.conductor import resource as r
from sahara.plugins import base as pb
from sahara.plugins import exceptions as ex
from sahara.plugins.hdp import ambariplugin as ap
from sahara.plugins.hdp import clusterspec as cs
from sahara.tests.unit import base as sahara_base
import sahara.tests.unit.plugins.hdp.hdp_test_base as base
from sahara.utils import edp
from sahara import version
GET_REST_REQ = ("sahara.plugins.hdp.versions.version_2_0_6.versionhandler."
"AmbariClient._get_http_session")
def create_cluster_template(ctx, dct):
return r.ClusterTemplateResource(dct)
class AmbariPluginTest(sahara_base.SaharaTestCase):
def setUp(self):
super(AmbariPluginTest, self).setUp()
pb.setup_plugins()
def test_get_node_processes(self):
plugin = ap.AmbariPlugin()
service_components = plugin.get_node_processes('2.0.6')
self.assertEqual({
'YARN': ['RESOURCEMANAGER', 'YARN_CLIENT', 'NODEMANAGER'],
'GANGLIA': ['GANGLIA_SERVER'],
'HUE': ['HUE'],
'HIVE': ['HIVE_SERVER', 'HIVE_METASTORE', 'HIVE_CLIENT',
'MYSQL_SERVER'],
'OOZIE': ['OOZIE_SERVER', 'OOZIE_CLIENT'],
'HDFS': ['NAMENODE', 'DATANODE', 'SECONDARY_NAMENODE',
'HDFS_CLIENT', 'JOURNALNODE', 'ZKFC'],
'SQOOP': ['SQOOP'],
'MAPREDUCE2': ['HISTORYSERVER', 'MAPREDUCE2_CLIENT'],
'ZOOKEEPER': ['ZOOKEEPER_SERVER', 'ZOOKEEPER_CLIENT'],
'HBASE': ['HBASE_MASTER', 'HBASE_REGIONSERVER', 'HBASE_CLIENT'],
'HCATALOG': ['HCAT'],
'NAGIOS': ['NAGIOS_SERVER'],
'AMBARI': ['AMBARI_SERVER'],
'WEBHCAT': ['WEBHCAT_SERVER'],
'PIG': ['PIG']}, service_components)
def test_convert(self):
plugin = ap.AmbariPlugin()
cluster_config_file = pkg.resource_string(
version.version_info.package,
'plugins/hdp/versions/version_2_0_6/resources/'
'default-cluster.template')
cluster = plugin.convert(cluster_config_file, 'ambari', '2.0.6',
'test-plugin', create_cluster_template)
normalized_config = cs.ClusterSpec(cluster_config_file).normalize()
self.assertEqual(normalized_config.hadoop_version,
cluster.hadoop_version)
self.assertEqual(len(normalized_config.node_groups),
len(cluster.node_groups))
@mock.patch(GET_REST_REQ)
def test__set_ambari_credentials__admin_only(self, client):
client.side_effect = self._get_test_request
self.requests = []
plugin = ap.AmbariPlugin()
cluster_config_file = pkg.resource_string(
version.version_info.package,
'plugins/hdp/versions/version_2_0_6/resources/'
'default-cluster.template')
cluster_spec = cs.ClusterSpec(cluster_config_file)
ambari_info = ap.AmbariInfo(TestHost('111.11.1111'),
'8080', 'admin', 'old-pwd')
plugin._set_ambari_credentials(cluster_spec, ambari_info, '2.0.6')
self.assertEqual(1, len(self.requests))
request = self.requests[0]
self.assertEqual('put', request.method)
self.assertEqual('http://111.11.1111:8080/api/v1/users/admin',
request.url)
self.assertEqual('{"Users":{"roles":"admin","password":"admin",'
'"old_password":"old-pwd"} }', request.data)
self.assertEqual(('admin', 'old-pwd'), request.auth)
self.assertEqual('admin', ambari_info.user)
self.assertEqual('admin', ambari_info.password)
@mock.patch(GET_REST_REQ)
def test__set_ambari_credentials__new_user_no_admin(self, client):
self.requests = []
plugin = ap.AmbariPlugin()
client.side_effect = self._get_test_request
cluster_config_file = pkg.resource_string(
version.version_info.package,
'plugins/hdp/versions/version_2_0_6/resources/'
'default-cluster.template')
cluster_spec = cs.ClusterSpec(cluster_config_file)
for service in cluster_spec.services:
if service.name == 'AMBARI':
user = service.users[0]
user.name = 'test'
user.password = 'test_pw'
ambari_info = ap.AmbariInfo(TestHost('111.11.1111'), '8080',
'admin', 'old-pwd')
plugin._set_ambari_credentials(cluster_spec, ambari_info, '2.0.6')
self.assertEqual(2, len(self.requests))
request = self.requests[0]
self.assertEqual('post', request.method)
self.assertEqual('http://111.11.1111:8080/api/v1/users/test',
request.url)
self.assertEqual('{"Users":{"password":"test_pw","roles":"admin"'
'} }', request.data)
self.assertEqual(('admin', 'old-pwd'), request.auth)
request = self.requests[1]
self.assertEqual('delete', request.method)
self.assertEqual('http://111.11.1111:8080/api/v1/users/admin',
request.url)
self.assertIsNone(request.data)
self.assertEqual(('test', 'test_pw'), request.auth)
self.assertEqual('test', ambari_info.user)
self.assertEqual('test_pw', ambari_info.password)
@mock.patch(GET_REST_REQ)
def test__set_ambari_credentials__new_user_with_admin(self, client):
self.requests = []
plugin = ap.AmbariPlugin()
client.side_effect = self._get_test_request
cluster_config_file = pkg.resource_string(
version.version_info.package,
'plugins/hdp/versions/version_2_0_6/resources/'
'default-cluster.template')
cluster_spec = cs.ClusterSpec(cluster_config_file)
for service in cluster_spec.services:
if service.name == 'AMBARI':
new_user = cs.User('test', 'test_pw', ['user'])
service.users.append(new_user)
ambari_info = ap.AmbariInfo(TestHost('111.11.1111'), '8080',
'admin', 'old-pwd')
plugin._set_ambari_credentials(cluster_spec, ambari_info, '2.0.6')
self.assertEqual(2, len(self.requests))
request = self.requests[0]
self.assertEqual('put', request.method)
self.assertEqual('http://111.11.1111:8080/api/v1/users/admin',
request.url)
self.assertEqual('{"Users":{"roles":"admin","password":"admin",'
'"old_password":"old-pwd"} }', request.data)
self.assertEqual(('admin', 'old-pwd'), request.auth)
request = self.requests[1]
self.assertEqual('post', request.method)
self.assertEqual('http://111.11.1111:8080/api/v1/users/test',
request.url)
self.assertEqual('{"Users":{"password":"test_pw","roles":"user"} }',
request.data)
self.assertEqual(('admin', 'admin'), request.auth)
self.assertEqual('admin', ambari_info.user)
self.assertEqual('admin', ambari_info.password)
@mock.patch(GET_REST_REQ)
@testtools.skip("test failure because of #1325108")
def test__set_ambari_credentials__no_admin_user(self, client):
self.requests = []
plugin = ap.AmbariPlugin()
client.side_effect = self._get_test_request
cluster_config_file = pkg.resource_string(
version.version_info.package,
'plugins/hdp/versions/version_2_0_6/resources/'
'default-cluster.template')
cluster_spec = cs.ClusterSpec(cluster_config_file)
for service in cluster_spec.services:
if service.name == 'AMBARI':
user = service.users[0]
user.name = 'test'
user.password = 'test_pw'
user.groups = ['user']
ambari_info = ap.AmbariInfo(TestHost('111.11.1111'),
'8080', 'admin', 'old-pwd')
self.assertRaises(ex.HadoopProvisionError,
plugin._set_ambari_credentials,
cluster_spec, ambari_info, '2.0.6')
@mock.patch("sahara.utils.openstack.nova.get_instance_info",
base.get_instance_info)
@mock.patch('sahara.plugins.hdp.versions.version_2_0_6.services.'
'HdfsService._get_swift_properties', return_value=[])
def test__get_ambari_info(self, patched):
cluster_config_file = pkg.resource_string(
version.version_info.package,
'plugins/hdp/versions/version_2_0_6/resources/'
'default-cluster.template')
test_host = base.TestServer(
'host1', 'test-master', '11111', 3, '111.11.1111',
'222.11.1111')
node_group = base.TestNodeGroup(
'ng1', [test_host], ["AMBARI_SERVER", "NAMENODE", "DATANODE",
'RESOURCEMANAGER', 'YARN_CLIENT',
'NODEMANAGER',
'HISTORYSERVER', 'MAPREDUCE2_CLIENT',
'ZOOKEEPER_SERVER', 'ZOOKEEPER_CLIENT'])
cluster = base.TestCluster([node_group])
cluster_config = cs.ClusterSpec(cluster_config_file)
cluster_config.create_operational_config(cluster, [])
plugin = ap.AmbariPlugin()
# change port
cluster_config.configurations['ambari']['server.port'] = '9000'
ambari_info = plugin.get_ambari_info(cluster_config)
self.assertEqual('9000', ambari_info.port)
# remove port
del cluster_config.configurations['ambari']['server.port']
ambari_info = plugin.get_ambari_info(cluster_config)
self.assertEqual('8080', ambari_info.port)
def test_update_ambari_info_credentials(self):
plugin = ap.AmbariPlugin()
cluster_config_file = pkg.resource_string(
version.version_info.package,
'plugins/hdp/versions/version_2_0_6/resources/'
'default-cluster.template')
cluster_spec = cs.ClusterSpec(cluster_config_file)
ambari_info = ap.AmbariInfo(TestHost('111.11.1111'),
'8080', 'admin', 'old-pwd')
plugin._update_ambari_info_credentials(cluster_spec, ambari_info)
self.assertEqual('admin', ambari_info.user)
self.assertEqual('admin', ambari_info.password)
def test_get_oozie_server(self):
test_host = base.TestServer(
'host1', 'test-master', '11111', 3, '111.11.1111',
'222.11.1111')
node_group = base.TestNodeGroup(
'ng1', [test_host], ["AMBARI_SERVER", "NAMENODE", "DATANODE",
"OOZIE_SERVER"])
cluster = base.TestCluster([node_group])
cluster.hadoop_version = '2.0.6'
plugin = ap.AmbariPlugin()
self.assertIsNotNone(plugin.get_edp_engine(
cluster, edp.JOB_TYPE_PIG).get_oozie_server(cluster))
node_group = base.TestNodeGroup(
'ng1', [test_host], ["AMBARI_SERVER", "NAMENODE", "DATANODE",
"NOT_OOZIE"])
cluster = base.TestCluster([node_group])
cluster.hadoop_version = '2.0.6'
self.assertIsNone(plugin.get_edp_engine(
cluster, edp.JOB_TYPE_PIG).get_oozie_server(cluster))
@mock.patch('sahara.service.edp.hdfs_helper.create_dir_hadoop2')
def test_edp206_calls_hadoop2_create_dir(self, create_dir):
cluster = base.TestCluster([])
cluster.plugin_name = 'hdp'
cluster.hadoop_version = '2.0.6'
plugin = ap.AmbariPlugin()
plugin.get_edp_engine(cluster, edp.JOB_TYPE_PIG).create_hdfs_dir(
mock.Mock(), '/tmp')
self.assertEqual(1, create_dir.call_count)
def _get_test_request(self, host, port):
request = base.TestRequest()
self.requests.append(request)
return request
class TestHost(object):
def __init__(self, management_ip, role=None):
self.management_ip = management_ip
self.role = role

File diff suppressed because it is too large Load Diff

View File

@ -1,147 +0,0 @@
# Copyright (c) 2015 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from sahara.plugins.hdp import confighints_helper as ch_helper
from sahara.tests.unit import base as sahara_base
SAMPLE_CONFIG = {
'configurations': [
{
'tag': 'tag1.xml',
'properties': [
{
'name': 'prop1',
'default_value': '1234',
'description': 'the first property of tag1'
},
{
'name': 'prop2',
'default_value': '5678',
'description': 'the second property of tag1'
}
]
},
{
'tag': 'tag2.xml',
'properties': [
{
'name': 'prop3',
'default_value': '0000',
'description': 'the first property of tag2'
}
]
}
]
}
class ConfigHintsHelperTest(sahara_base.SaharaTestCase):
@mock.patch(
'sahara.plugins.hdp.confighints_helper.load_hadoop_json_for_tag',
wraps=ch_helper.load_hadoop_json_for_tag)
@mock.patch(
'sahara.plugins.hdp.confighints_helper.load_json_file',
return_value=SAMPLE_CONFIG)
def test_get_possible_hive_config_from(self,
load_json_file,
load_hadoop_json_for_tag):
expected_config = {
'configs': [],
'params': {}
}
actual_config = ch_helper.get_possible_hive_config_from(
'sample-file-name.json')
load_hadoop_json_for_tag.assert_called_once_with(
'sample-file-name.json', 'hive-site.xml')
self.assertEqual(expected_config, actual_config)
@mock.patch(
'sahara.service.edp.oozie.workflow_creator.workflow_factory.'
'get_possible_mapreduce_configs',
return_value=[])
@mock.patch(
'sahara.plugins.hdp.confighints_helper.load_hadoop_json_for_tag',
wraps=ch_helper.load_hadoop_json_for_tag)
@mock.patch(
'sahara.plugins.hdp.confighints_helper.load_json_file',
return_value=SAMPLE_CONFIG)
def test_get_possible_mapreduce_config_from(self,
load_json_file,
load_hadoop_json_for_tag,
get_poss_mr_configs):
expected_config = {
'configs': []
}
actual_config = ch_helper.get_possible_mapreduce_config_from(
'sample-file-name.json')
load_hadoop_json_for_tag.assert_called_once_with(
'sample-file-name.json', 'mapred-site.xml')
get_poss_mr_configs.assert_called_once_with()
self.assertEqual(expected_config, actual_config)
@mock.patch(
'sahara.plugins.hdp.confighints_helper.load_hadoop_json_for_tag',
wraps=ch_helper.load_hadoop_json_for_tag)
@mock.patch(
'sahara.plugins.hdp.confighints_helper.load_json_file',
return_value=SAMPLE_CONFIG)
def test_get_possible_pig_config_from(self,
load_json_file,
load_hadoop_json_for_tag):
expected_config = {
'configs': [],
'args': [],
'params': {}
}
actual_config = ch_helper.get_possible_pig_config_from(
'sample-file-name.json')
load_hadoop_json_for_tag.assert_called_once_with(
'sample-file-name.json', 'mapred-site.xml')
self.assertEqual(expected_config, actual_config)
def test_get_properties_for_tag(self):
expected_properties = [
{
'name': 'prop1',
'default_value': '1234',
'description': 'the first property of tag1'
},
{
'name': 'prop2',
'default_value': '5678',
'description': 'the second property of tag1'
}
]
actual_properties = ch_helper.get_properties_for_tag(
SAMPLE_CONFIG['configurations'], 'tag1.xml')
self.assertEqual(expected_properties, actual_properties)
@mock.patch(
'sahara.plugins.hdp.confighints_helper.load_json_file',
return_value=SAMPLE_CONFIG)
def test_load_hadoop_json_for_tag(self, load_json_file):
expected_configs = [
{
'name': 'prop3',
'value': '0000',
'description': 'the first property of tag2'
}
]
actual_configs = ch_helper.load_hadoop_json_for_tag(
'sample-file-name.json', 'tag2.xml')
self.assertEqual(expected_configs, actual_configs)

View File

@ -1,815 +0,0 @@
# Copyright (c) 2013 Hortonworks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from sahara import exceptions as e
from sahara.plugins import exceptions as ex
from sahara.plugins.hdp.versions import versionhandlerfactory as vhf
from sahara.tests.unit import base
from sahara.tests.unit.plugins.hdp import hdp_test_base
versions = ['2.0.6']
class ServicesTest(base.SaharaTestCase):
# TODO(jspeidel): test remaining service functionality which isn't
# tested by coarser grained unit tests.
def get_services_processor(self, version='2.0.6'):
handler = (vhf.VersionHandlerFactory.get_instance().
get_version_handler(version))
s = handler.get_services_processor()
return s
def test_create_hdfs_service(self):
for version in versions:
s = self.get_services_processor(version)
service = s.create_service('HDFS')
self.assertEqual('HDFS', service.name)
expected_configs = set(['global', 'core-site', 'hdfs-site'])
self.assertEqual(expected_configs,
expected_configs & service.configurations)
self.assertTrue(service.is_mandatory())
def test_hdp2_hdfs_service_register_urls(self):
s = self.get_services_processor('2.0.6')
service = s.create_service('HDFS')
cluster_spec = mock.Mock()
cluster_spec.configurations = {
'core-site': {
'fs.defaultFS': 'hdfs://not_expected.com:9020'
},
'hdfs-site': {
'dfs.namenode.http-address': 'http://not_expected.com:10070'
}
}
instance_mock = mock.Mock()
instance_mock.management_ip = '127.0.0.1'
cluster_spec.determine_component_hosts = mock.Mock(
return_value=[instance_mock])
cluster = mock.Mock(cluster_configs={}, name="hdp")
url_info = {}
url_info = service.register_service_urls(cluster_spec, url_info,
cluster)
self.assertEqual(url_info['HDFS']['Web UI'],
'http://127.0.0.1:10070')
self.assertEqual(url_info['HDFS']['NameNode'],
'hdfs://127.0.0.1:9020')
def test_hdp2_ha_hdfs_service_register_urls(self):
s = self.get_services_processor('2.0.6')
service = s.create_service('HDFS')
cluster_spec = mock.Mock()
cluster_spec.configurations = {
'core-site': {
'fs.defaultFS': 'hdfs://not_expected.com:9020'
},
'hdfs-site': {
'dfs.namenode.http-address': 'http://not_expected.com:10070'
}
}
instance_mock = mock.Mock()
instance_mock.management_ip = '127.0.0.1'
cluster_spec.determine_component_hosts = mock.Mock(
return_value=[instance_mock])
cluster = mock.Mock(cluster_configs={'HDFSHA': {'hdfs.nnha': True}})
cluster.name = "hdp-cluster"
url_info = {}
url_info = service.register_service_urls(cluster_spec, url_info,
cluster)
self.assertEqual(url_info['HDFS']['Web UI'],
'http://127.0.0.1:10070')
self.assertEqual(url_info['HDFS']['NameNode'],
'hdfs://127.0.0.1:9020')
self.assertEqual(url_info['HDFS']['NameService'],
'hdfs://hdp-cluster')
def test_create_mr2_service(self):
s = self.get_services_processor('2.0.6')
service = s.create_service('MAPREDUCE2')
self.assertEqual('MAPREDUCE2', service.name)
expected_configs = set(['global', 'core-site', 'mapred-site'])
self.assertEqual(expected_configs,
expected_configs & service.configurations)
self.assertTrue(service.is_mandatory())
def test_hdp2_mr2_service_register_urls(self):
s = self.get_services_processor('2.0.6')
service = s.create_service('MAPREDUCE2')
cluster_spec = mock.Mock()
cluster_spec.configurations = {
'mapred-site': {
'mapreduce.jobhistory.address':
'hdfs://not_expected.com:10300',
'mapreduce.jobhistory.webapp.address':
'http://not_expected.com:10030'
}
}
instance_mock = mock.Mock()
instance_mock.management_ip = '127.0.0.1'
cluster_spec.determine_component_hosts = mock.Mock(
return_value=[instance_mock])
url_info = {}
url_info = service.register_service_urls(cluster_spec, url_info,
mock.Mock())
self.assertEqual(url_info['MapReduce2']['Web UI'],
'http://127.0.0.1:10030')
self.assertEqual(url_info['MapReduce2']['History Server'],
'127.0.0.1:10300')
def test_create_hive_service(self):
for version in versions:
s = self.get_services_processor(version)
service = s.create_service('HIVE')
self.assertEqual('HIVE', service.name)
expected_configs = set(['global', 'core-site', 'hive-site'])
self.assertEqual(expected_configs,
expected_configs & service.configurations)
self.assertFalse(service.is_mandatory())
def test_create_webhcat_service(self):
for version in versions:
s = self.get_services_processor(version)
service = s.create_service('WEBHCAT')
self.assertEqual('WEBHCAT', service.name)
expected_configs = set(['global', 'core-site', 'webhcat-site'])
self.assertEqual(expected_configs,
expected_configs & service.configurations)
self.assertFalse(service.is_mandatory())
def test_create_zk_service(self):
for version in versions:
s = self.get_services_processor()
service = s.create_service('ZOOKEEPER')
self.assertEqual('ZOOKEEPER', service.name)
expected_configs = set(['global', 'core-site'])
self.assertEqual(expected_configs,
expected_configs & service.configurations)
self.assertTrue(service.is_mandatory())
def test_create_oozie_service(self):
for version in versions:
s = self.get_services_processor()
service = s.create_service('OOZIE')
self.assertEqual('OOZIE', service.name)
expected_configs = set(['global', 'core-site', 'oozie-site'])
self.assertEqual(expected_configs,
expected_configs & service.configurations)
self.assertFalse(service.is_mandatory())
def test_oozie_service_register_urls(self):
for version in versions:
s = self.get_services_processor(version)
service = s.create_service('OOZIE')
cluster_spec = mock.Mock()
cluster_spec.configurations = {
'oozie-site': {
'oozie.base.url': 'hdfs://not_expected.com:21000'
}
}
instance_mock = mock.Mock()
instance_mock.management_ip = '127.0.0.1'
cluster_spec.determine_component_hosts = mock.Mock(
return_value=[instance_mock])
url_info = {}
url_info = service.register_service_urls(cluster_spec, url_info,
mock.Mock())
self.assertEqual('http://127.0.0.1:21000',
url_info['JobFlow']['Oozie'])
def test_create_ganglia_service(self):
for version in versions:
s = self.get_services_processor(version)
service = s.create_service('GANGLIA')
self.assertEqual('GANGLIA', service.name)
expected_configs = set(['global', 'core-site'])
self.assertEqual(expected_configs,
expected_configs & service.configurations)
self.assertFalse(service.is_mandatory())
def test_create_ambari_service(self):
for version in versions:
s = self.get_services_processor(version)
service = s.create_service('AMBARI')
self.assertEqual('AMBARI', service.name)
expected_configs = set(['global', 'core-site'])
self.assertEqual(expected_configs,
expected_configs & service.configurations)
self.assertTrue(service.is_mandatory())
@mock.patch("sahara.utils.openstack.nova.get_instance_info",
hdp_test_base.get_instance_info)
@mock.patch(
'sahara.plugins.hdp.versions.version_2_0_6.services.HdfsService.'
'_get_swift_properties',
return_value=[])
def test_hdp2_create_sqoop_service(self, patched):
s = self.get_services_processor('2.0.6')
service = s.create_service('SQOOP')
self.assertEqual('SQOOP', service.name)
expected_configs = set(['global', 'core-site'])
self.assertEqual(expected_configs,
expected_configs & service.configurations)
self.assertFalse(service.is_mandatory())
# ensure that hdfs and mr clients are added implicitly
master_host = hdp_test_base.TestServer(
'master.novalocal', 'master', '11111', 3,
'111.11.1111', '222.11.1111')
master_ng = hdp_test_base.TestNodeGroup(
'master', [master_host], ["NAMENODE", "RESOURCEMANAGER",
"HISTORYSERVER", "SECONDARY_NAMENODE",
"NODEMANAGER", "DATANODE",
"AMBARI_SERVER", "ZOOKEEPER_SERVER"])
sqoop_host = hdp_test_base.TestServer(
'sqoop.novalocal', 'sqoop', '11111', 3,
'111.11.1111', '222.11.1111')
sqoop_ng = hdp_test_base.TestNodeGroup(
'sqoop', [sqoop_host], ["SQOOP"])
cluster = hdp_test_base.TestCluster([master_ng, sqoop_ng])
cluster_spec = hdp_test_base.create_clusterspec(hdp_version='2.0.6')
cluster_spec.create_operational_config(cluster, [])
components = cluster_spec.get_node_groups_containing_component(
'SQOOP')[0].components
self.assertIn('HDFS_CLIENT', components)
self.assertIn('MAPREDUCE2_CLIENT', components)
@mock.patch("sahara.utils.openstack.nova.get_instance_info",
hdp_test_base.get_instance_info)
@mock.patch(
'sahara.plugins.hdp.versions.version_2_0_6.services.HdfsService.'
'_get_swift_properties',
return_value=[])
def test_create_hbase_service(self, patched):
s = self.get_services_processor()
service = s.create_service('HBASE')
self.assertEqual('HBASE', service.name)
expected_configs = set(['global', 'core-site', 'hbase-site'])
self.assertEqual(expected_configs,
expected_configs & service.configurations)
self.assertFalse(service.is_mandatory())
cluster = self._create_hbase_cluster()
cluster_spec = hdp_test_base.create_clusterspec()
cluster_spec.create_operational_config(cluster, [])
components = cluster_spec.get_node_groups_containing_component(
'HBASE_MASTER')[0].components
self.assertIn('HDFS_CLIENT', components)
@mock.patch("sahara.utils.openstack.nova.get_instance_info",
hdp_test_base.get_instance_info)
@mock.patch(
'sahara.plugins.hdp.versions.version_2_0_6.services.HdfsService.'
'_get_swift_properties',
return_value=[])
def test_create_hdp2_hbase_service(self, patched):
for version in versions:
s = self.get_services_processor(version=version)
service = s.create_service('HBASE')
self.assertEqual('HBASE', service.name)
expected_configs = set(['global', 'core-site', 'hbase-site'])
self.assertEqual(expected_configs,
expected_configs & service.configurations)
self.assertFalse(service.is_mandatory())
cluster = self._create_hbase_cluster()
cluster_spec = hdp_test_base.create_clusterspec(
hdp_version=version)
cluster_spec.create_operational_config(cluster, [])
components = cluster_spec.get_node_groups_containing_component(
'HBASE_MASTER')[0].components
self.assertIn('HDFS_CLIENT', components)
def test_create_yarn_service(self):
s = self.get_services_processor(version='2.0.6')
service = s.create_service('YARN')
self.assertEqual('YARN', service.name)
expected_configs = set(['global', 'core-site', 'yarn-site'])
self.assertEqual(expected_configs,
expected_configs & service.configurations)
self.assertTrue(service.is_mandatory())
@mock.patch("sahara.utils.openstack.nova.get_instance_info",
hdp_test_base.get_instance_info)
@mock.patch(
'sahara.plugins.hdp.versions.version_2_0_6.services.HdfsService.'
'_get_swift_properties',
return_value=[])
def test_hbase_properties(self, patched):
for version in versions:
cluster = self._create_hbase_cluster()
cluster_spec = hdp_test_base.create_clusterspec(
hdp_version=version)
cluster_spec.create_operational_config(cluster, [])
s = self.get_services_processor(version=version)
service = s.create_service('HBASE')
ui_handlers = {}
service.register_user_input_handlers(ui_handlers)
ui_handlers['hbase-site/hbase.rootdir'](
hdp_test_base.TestUserInput(
hdp_test_base.TestUserInputConfig(
'', '', 'hbase-site/hbase.rootdir'),
"hdfs://%NN_HOST%:99/some/other/dir"),
cluster_spec.configurations)
self.assertEqual(
"hdfs://%NN_HOST%:99/some/other/dir",
cluster_spec.configurations['hbase-site']['hbase.rootdir'])
self.assertEqual(
"/some/other/dir",
cluster_spec.configurations['global']['hbase_hdfs_root_dir'])
self.assertRaises(
e.InvalidDataException,
ui_handlers['hbase-site/hbase.rootdir'],
hdp_test_base.TestUserInput(
hdp_test_base.TestUserInputConfig(
'', '', 'hbase-site/hbase.rootdir'),
"badprotocol://%NN_HOST%:99/some/other/dir"),
cluster_spec.configurations)
ui_handlers['hbase-site/hbase.tmp.dir'](
hdp_test_base.TestUserInput(
hdp_test_base.TestUserInputConfig(
'', '', 'hbase-site/hbase.tmp.dir'),
"/some/dir"),
cluster_spec.configurations)
self.assertEqual(
"/some/dir",
cluster_spec.configurations['hbase-site']['hbase.tmp.dir'])
self.assertEqual(
"/some/dir",
cluster_spec.configurations['global']['hbase_tmp_dir'])
ui_handlers[
'hbase-site/hbase.regionserver.global.memstore.upperLimit'](
hdp_test_base.TestUserInput(
hdp_test_base.TestUserInputConfig(
'', '', 'hbase-site/hbase.regionserver.global.'
'memstore.upperLimit'),
"111"),
cluster_spec.configurations)
self.assertEqual(
"111",
cluster_spec.configurations['hbase-site'][
'hbase.regionserver.global.memstore.upperLimit'])
self.assertEqual(
"111",
cluster_spec.configurations['global'][
'regionserver_memstore_upperlimit'])
ui_handlers[
'hbase-site/hbase.hstore.blockingStoreFiles'](
hdp_test_base.TestUserInput(
hdp_test_base.TestUserInputConfig(
'', '',
'hbase-site/hbase.hstore.blockingStoreFiles'),
"112"),
cluster_spec.configurations)
self.assertEqual("112", cluster_spec.configurations['hbase-site'][
'hbase.hstore.blockingStoreFiles'])
self.assertEqual("112", cluster_spec.configurations['global'][
'hstore_blockingstorefiles'])
ui_handlers[
'hbase-site/hbase.hstore.compactionThreshold'](
hdp_test_base.TestUserInput(
hdp_test_base.TestUserInputConfig(
'', '',
'hbase-site/hbase.hstore.compactionThreshold'),
"113"),
cluster_spec.configurations)
self.assertEqual("113", cluster_spec.configurations['hbase-site'][
'hbase.hstore.compactionThreshold'])
self.assertEqual("113", cluster_spec.configurations['global'][
'hstore_compactionthreshold'])
ui_handlers[
'hbase-site/hfile.block.cache.size'](
hdp_test_base.TestUserInput(
hdp_test_base.TestUserInputConfig(
'', '', 'hbase-site/hfile.block.cache.size'),
"114"),
cluster_spec.configurations)
self.assertEqual("114", cluster_spec.configurations['hbase-site'][
'hfile.block.cache.size'])
self.assertEqual("114", cluster_spec.configurations['global'][
'hfile_blockcache_size'])
ui_handlers[
'hbase-site/hbase.hregion.max.filesize'](
hdp_test_base.TestUserInput(
hdp_test_base.TestUserInputConfig(
'', '', 'hbase-site/hbase.hregion.max.filesize'),
"115"),
cluster_spec.configurations)
self.assertEqual("115", cluster_spec.configurations['hbase-site'][
'hbase.hregion.max.filesize'])
self.assertEqual("115", cluster_spec.configurations['global'][
'hstorefile_maxsize'])
ui_handlers[
'hbase-site/hbase.regionserver.handler.count'](
hdp_test_base.TestUserInput(
hdp_test_base.TestUserInputConfig(
'', '',
'hbase-site/hbase.regionserver.handler.count'),
"116"),
cluster_spec.configurations)
self.assertEqual("116", cluster_spec.configurations['hbase-site'][
'hbase.regionserver.handler.count'])
self.assertEqual("116", cluster_spec.configurations['global'][
'regionserver_handlers'])
ui_handlers[
'hbase-site/hbase.hregion.majorcompaction'](
hdp_test_base.TestUserInput(
hdp_test_base.TestUserInputConfig(
'', '',
'hbase-site/hbase.hregion.majorcompaction'),
"117"),
cluster_spec.configurations)
self.assertEqual("117", cluster_spec.configurations['hbase-site'][
'hbase.hregion.majorcompaction'])
self.assertEqual("117", cluster_spec.configurations['global'][
'hregion_majorcompaction'])
ui_handlers[
'hbase-site/hbase.regionserver.global.memstore.lowerLimit'](
hdp_test_base.TestUserInput(
hdp_test_base.TestUserInputConfig(
'', '', 'hbase-site/hbase.regionserver.global.'
'memstore.lowerLimit'),
"118"),
cluster_spec.configurations)
self.assertEqual("118", cluster_spec.configurations['hbase-site'][
'hbase.regionserver.global.memstore.lowerLimit'])
self.assertEqual("118", cluster_spec.configurations['global'][
'regionserver_memstore_lowerlimit'])
ui_handlers[
'hbase-site/hbase.hregion.memstore.block.multiplier'](
hdp_test_base.TestUserInput(
hdp_test_base.TestUserInputConfig(
'', '', 'hbase-site/hbase.hregion.memstore.block.'
'multiplier'),
"119"),
cluster_spec.configurations)
self.assertEqual("119", cluster_spec.configurations['hbase-site'][
'hbase.hregion.memstore.block.multiplier'])
self.assertEqual("119", cluster_spec.configurations['global'][
'hregion_blockmultiplier'])
ui_handlers[
'hbase-site/hbase.hregion.memstore.mslab.enabled'](
hdp_test_base.TestUserInput(
hdp_test_base.TestUserInputConfig(
'', '', 'hbase-site/hbase.hregion.memstore.mslab.'
'enabled'),
"false"),
cluster_spec.configurations)
self.assertEqual("false", cluster_spec.configurations['hbase-site']
['hbase.hregion.memstore.mslab.enabled'])
self.assertEqual("false", cluster_spec.configurations['global'][
'regionserver_memstore_lab'])
ui_handlers[
'hbase-site/hbase.hregion.memstore.flush.size'](
hdp_test_base.TestUserInput(
hdp_test_base.TestUserInputConfig(
'', '', 'hbase-site/hbase.hregion.memstore.flush.'
'size'),
"120"),
cluster_spec.configurations)
self.assertEqual("120", cluster_spec.configurations['hbase-site'][
'hbase.hregion.memstore.flush.size'])
if version == '1.3.2':
self.assertEqual("120", cluster_spec.configurations['global'][
'hregion_memstoreflushsize'])
ui_handlers[
'hbase-site/hbase.client.scanner.caching'](
hdp_test_base.TestUserInput(
hdp_test_base.TestUserInputConfig(
'', '', 'hbase-site/hbase.client.scanner.caching'),
"121"),
cluster_spec.configurations)
self.assertEqual("121", cluster_spec.configurations['hbase-site'][
'hbase.client.scanner.caching'])
self.assertEqual("121", cluster_spec.configurations['global'][
'client_scannercaching'])
ui_handlers[
'hbase-site/zookeeper.session.timeout'](
hdp_test_base.TestUserInput(
hdp_test_base.TestUserInputConfig(
'', '', 'hbase-site/zookeeper.session.timeout'),
"122"),
cluster_spec.configurations)
self.assertEqual("122", cluster_spec.configurations['hbase-site'][
'zookeeper.session.timeout'])
self.assertEqual("122", cluster_spec.configurations['global'][
'zookeeper_sessiontimeout'])
ui_handlers[
'hbase-site/hbase.client.keyvalue.maxsize'](
hdp_test_base.TestUserInput(
hdp_test_base.TestUserInputConfig(
'', '',
'hbase-site/hbase.client.keyvalue.maxsize'),
"123"),
cluster_spec.configurations)
self.assertEqual("123", cluster_spec.configurations['hbase-site'][
'hbase.client.keyvalue.maxsize'])
self.assertEqual("123", cluster_spec.configurations['global'][
'hfile_max_keyvalue_size'])
ui_handlers[
'hdfs-site/dfs.support.append'](
hdp_test_base.TestUserInput(
hdp_test_base.TestUserInputConfig(
'', '', 'hdfs-site/dfs.support.append'),
"false"),
cluster_spec.configurations)
self.assertEqual("false", cluster_spec.configurations['hbase-site']
['dfs.support.append'])
self.assertEqual("false", cluster_spec.configurations['hdfs-site'][
'dfs.support.append'])
self.assertEqual("false", cluster_spec.configurations['global'][
'hdfs_support_append'])
ui_handlers[
'hbase-site/dfs.client.read.shortcircuit'](
hdp_test_base.TestUserInput(
hdp_test_base.TestUserInputConfig(
'', '', 'hbase-site/dfs.client.read.shortcircuit'),
"false"),
cluster_spec.configurations)
self.assertEqual("false", cluster_spec.configurations['hbase-site']
['dfs.client.read.shortcircuit'])
self.assertEqual("false", cluster_spec.configurations['global'][
'hdfs_enable_shortcircuit_read'])
@mock.patch("sahara.utils.openstack.nova.get_instance_info",
hdp_test_base.get_instance_info)
@mock.patch(
'sahara.plugins.hdp.versions.version_2_0_6.services.HdfsService.'
'_get_swift_properties',
return_value=[])
def test_hbase_validation(self, patched):
master_host = hdp_test_base.TestServer(
'master.novalocal', 'master', '11111', 3,
'111.11.1111', '222.11.1111')
master_ng = hdp_test_base.TestNodeGroup(
'master', [master_host], ["NAMENODE",
'RESOURCEMANAGER', 'YARN_CLIENT',
'NODEMANAGER',
"SECONDARY_NAMENODE",
"DATANODE",
"AMBARI_SERVER",
'HISTORYSERVER', 'MAPREDUCE2_CLIENT',
'ZOOKEEPER_SERVER', 'ZOOKEEPER_CLIENT'])
hbase_host = hdp_test_base.TestServer(
'hbase.novalocal', 'hbase', '11111', 3,
'111.11.1111', '222.11.1111')
hbase_ng = hdp_test_base.TestNodeGroup(
'hbase', [hbase_host], ["HBASE_MASTER"])
hbase_ng2 = hdp_test_base.TestNodeGroup(
'hbase2', [hbase_host], ["HBASE_MASTER"])
hbase_client_host = hdp_test_base.TestServer(
'hbase-client.novalocal', 'hbase-client', '11111', 3,
'111.11.1111', '222.11.1111')
hbase_client_ng = hdp_test_base.TestNodeGroup(
'hbase-client', [hbase_client_host], ["HBASE_CLIENT"])
hbase_slave_host = hdp_test_base.TestServer(
'hbase-rs.novalocal', 'hbase-rs', '11111', 3,
'111.11.1111', '222.11.1111')
hbase_slave_ng = hdp_test_base.TestNodeGroup(
'hbase-rs', [hbase_slave_host], ["HBASE_REGIONSERVER"])
cluster = hdp_test_base.TestCluster([master_ng, hbase_client_ng])
cluster_spec = hdp_test_base.create_clusterspec()
# validation should fail due to lack of hbase master
self.assertRaises(
ex.InvalidComponentCountException,
cluster_spec.create_operational_config, cluster, [])
cluster = hdp_test_base.TestCluster(
[master_ng, hbase_client_ng, hbase_slave_ng])
cluster_spec = hdp_test_base.create_clusterspec()
# validation should fail due to lack of hbase master
self.assertRaises(
ex.InvalidComponentCountException,
cluster_spec.create_operational_config, cluster, [])
cluster = hdp_test_base.TestCluster(
[master_ng, hbase_client_ng, hbase_ng])
cluster_spec = hdp_test_base.create_clusterspec()
# validation should succeed with hbase master included
cluster_spec.create_operational_config(cluster, [])
cluster = hdp_test_base.TestCluster(
[master_ng, hbase_client_ng, hbase_ng, hbase_ng2])
cluster_spec = hdp_test_base.create_clusterspec()
# validation should fail with multiple hbase master components
self.assertRaises(
ex.InvalidComponentCountException,
cluster_spec.create_operational_config, cluster, [])
@mock.patch("sahara.utils.openstack.nova.get_instance_info",
hdp_test_base.get_instance_info)
@mock.patch(
'sahara.plugins.hdp.versions.version_2_0_6.services.HdfsService.'
'_get_swift_properties',
return_value=[])
def test_hdp2_hbase_validation(self, patched):
master_host = hdp_test_base.TestServer(
'master.novalocal', 'master', '11111', 3,
'111.11.1111', '222.11.1111')
master_ng = hdp_test_base.TestNodeGroup(
'master', [master_host], ["NAMENODE", "RESOURCEMANAGER",
"SECONDARY_NAMENODE", "HISTORYSERVER",
"NODEMANAGER", "DATANODE",
"AMBARI_SERVER", "ZOOKEEPER_SERVER"])
hbase_host = hdp_test_base.TestServer(
'hbase.novalocal', 'hbase', '11111', 3,
'111.11.1111', '222.11.1111')
hbase_ng = hdp_test_base.TestNodeGroup(
'hbase', [hbase_host], ["HBASE_MASTER"])
hbase_ng2 = hdp_test_base.TestNodeGroup(
'hbase2', [hbase_host], ["HBASE_MASTER"])
hbase_client_host = hdp_test_base.TestServer(
'hbase-client.novalocal', 'hbase-client', '11111', 3,
'111.11.1111', '222.11.1111')
hbase_client_ng = hdp_test_base.TestNodeGroup(
'hbase-client', [hbase_client_host], ["HBASE_CLIENT"])
hbase_slave_host = hdp_test_base.TestServer(
'hbase-rs.novalocal', 'hbase-rs', '11111', 3,
'111.11.1111', '222.11.1111')
hbase_slave_ng = hdp_test_base.TestNodeGroup(
'hbase-rs', [hbase_slave_host], ["HBASE_REGIONSERVER"])
cluster = hdp_test_base.TestCluster([master_ng, hbase_client_ng])
cluster_spec = hdp_test_base.create_clusterspec(hdp_version='2.0.6')
# validation should fail due to lack of hbase master
self.assertRaises(
ex.InvalidComponentCountException,
cluster_spec.create_operational_config, cluster, [])
cluster = hdp_test_base.TestCluster(
[master_ng, hbase_client_ng, hbase_slave_ng])
cluster_spec = hdp_test_base.create_clusterspec(hdp_version='2.0.6')
# validation should fail due to lack of hbase master
self.assertRaises(
ex.InvalidComponentCountException,
cluster_spec.create_operational_config, cluster, [])
cluster = hdp_test_base.TestCluster(
[master_ng, hbase_client_ng, hbase_ng])
cluster_spec = hdp_test_base.create_clusterspec(hdp_version='2.0.6')
# validation should succeed with hbase master included
cluster_spec.create_operational_config(cluster, [])
cluster = hdp_test_base.TestCluster(
[master_ng, hbase_client_ng, hbase_ng, hbase_ng2])
cluster_spec = hdp_test_base.create_clusterspec(hdp_version='2.0.6')
# validation should fail with multiple hbase master components
self.assertRaises(
ex.InvalidComponentCountException,
cluster_spec.create_operational_config, cluster, [])
@mock.patch("sahara.utils.openstack.nova.get_instance_info",
hdp_test_base.get_instance_info)
@mock.patch(
'sahara.plugins.hdp.versions.version_2_0_6.services.HdfsService.'
'_get_swift_properties',
return_value=[])
def test_hbase_service_urls(self, patched):
for version in versions:
cluster = self._create_hbase_cluster()
cluster_spec = hdp_test_base.create_clusterspec(
hdp_version=version)
cluster_spec.create_operational_config(cluster, [])
s = self.get_services_processor(version=version)
service = s.create_service('HBASE')
url_info = {}
service.register_service_urls(cluster_spec, url_info, mock.Mock())
self.assertEqual(1, len(url_info))
self.assertEqual(6, len(url_info['HBase']))
self.assertEqual('http://222.22.2222:60010/master-status',
url_info['HBase']['Web UI'])
self.assertEqual('http://222.22.2222:60010/logs',
url_info['HBase']['Logs'])
self.assertEqual('http://222.22.2222:60010/zk.jsp',
url_info['HBase']['Zookeeper Info'])
self.assertEqual('http://222.22.2222:60010/jmx',
url_info['HBase']['JMX'])
self.assertEqual('http://222.22.2222:60010/dump',
url_info['HBase']['Debug Dump'])
self.assertEqual('http://222.22.2222:60010/stacks',
url_info['HBase']['Thread Stacks'])
@mock.patch("sahara.utils.openstack.nova.get_instance_info",
hdp_test_base.get_instance_info)
@mock.patch(
'sahara.plugins.hdp.versions.version_2_0_6.services.HdfsService.'
'_get_swift_properties',
return_value=[])
def test_hbase_replace_tokens(self, patched):
for version in versions:
cluster = self._create_hbase_cluster()
cluster_spec = hdp_test_base.create_clusterspec(
hdp_version=version)
cluster_spec.create_operational_config(cluster, [])
s = self.get_services_processor(version=version)
service = s.create_service('HBASE')
service.finalize_configuration(cluster_spec)
self.assertEqual("hdfs://master.novalocal:8020/apps/hbase/data",
cluster_spec.configurations['hbase-site'][
'hbase.rootdir'])
self.assertEqual(set(['zk.novalocal', 'master.novalocal']),
set(cluster_spec.configurations['hbase-site'][
'hbase.zookeeper.quorum'].split(',')))
def test_get_storage_paths(self):
for version in versions:
s = self.get_services_processor(version=version)
service = s.create_service('AMBARI')
server1 = hdp_test_base.TestServer(
'host1', 'test-master', '11111', 3, '1.1.1.1', '2.2.2.2')
server2 = hdp_test_base.TestServer(
'host2', 'test-slave', '11111', 3, '3.3.3.3', '4.4.4.4')
server3 = hdp_test_base.TestServer(
'host3', 'another-test', '11111', 3, '6.6.6.6', '5.5.5.5')
ng1 = hdp_test_base.TestNodeGroup('ng1', [server1], None)
ng2 = hdp_test_base.TestNodeGroup('ng2', [server2], None)
ng3 = hdp_test_base.TestNodeGroup('ng3', [server3], None)
server1.storage_path = ['/volume/disk1']
server2.storage_path = ['/mnt']
paths = service._get_common_paths([ng1, ng2])
self.assertEqual([], paths)
server1.storage_path = ['/volume/disk1', '/volume/disk2']
server2.storage_path = ['/mnt']
server3.storage_path = ['/volume/disk1']
paths = service._get_common_paths([ng1, ng2, ng3])
self.assertEqual([], paths)
server1.storage_path = ['/volume/disk1', '/volume/disk2']
server2.storage_path = ['/volume/disk1']
server3.storage_path = ['/volume/disk1']
paths = service._get_common_paths([ng1, ng2, ng3])
self.assertEqual(['/volume/disk1'], paths)
def _create_hbase_cluster(self):
master_host = hdp_test_base.TestServer(
'master.novalocal', 'master', '11111', 3,
'111.11.1111', '222.11.1111')
master_ng = hdp_test_base.TestNodeGroup(
'master', [master_host], ["NAMENODE", "RESOURCEMANAGER",
"SECONDARY_NAMENODE", "NODEMANAGER",
"DATANODE", "AMBARI_SERVER",
"HISTORYSERVER", "ZOOKEEPER_SERVER"])
extra_zk_host = hdp_test_base.TestServer(
'zk.novalocal', 'zk', '11112', 3,
'111.11.1112', '222.11.1112')
extra_zk_ng = hdp_test_base.TestNodeGroup(
'zk', [extra_zk_host], ['ZOOKEEPER_SERVER'])
hbase_host = hdp_test_base.TestServer(
'hbase.novalocal', 'hbase', '11111', 3,
'222.22.2222', '222.11.1111')
hbase_ng = hdp_test_base.TestNodeGroup(
'hbase', [hbase_host], ["HBASE_MASTER"])
return hdp_test_base.TestCluster([master_ng, extra_zk_ng, hbase_ng])

View File

@ -1,35 +0,0 @@
# Copyright (c) 2013 Hortonworks, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from sahara.plugins.hdp.versions import versionhandlerfactory
from sahara.tests.unit import base
class VersionManagerFactoryTest(base.SaharaTestCase):
def test_get_versions(self):
factory = versionhandlerfactory.VersionHandlerFactory.get_instance()
versions = factory.get_versions()
self.assertEqual(1, len(versions))
self.assertIn('2.0.6', versions)
def test_get_version_handlers(self):
factory = versionhandlerfactory.VersionHandlerFactory.get_instance()
versions = factory.get_versions()
for version in versions:
handler = factory.get_version_handler(version)
self.assertIsNotNone(handler)
self.assertEqual(version, handler.get_version())

View File

@ -1,98 +0,0 @@
# Copyright (c) 2015 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from sahara.plugins.hdp.versions.version_2_0_6 import edp_engine
from sahara.tests.unit import base as sahara_base
from sahara.utils import edp
class HDP2ConfigHintsTest(sahara_base.SaharaTestCase):
@mock.patch(
'sahara.plugins.hdp.confighints_helper.get_possible_hive_config_from',
return_value={})
def test_get_possible_job_config_hive(self,
get_possible_hive_config_from):
expected_config = {'job_config': {}}
actual_config = edp_engine.EdpOozieEngine.get_possible_job_config(
edp.JOB_TYPE_HIVE)
get_possible_hive_config_from.assert_called_once_with(
'plugins/hdp/versions/version_2_0_6/resources/'
'ambari-config-resource.json')
self.assertEqual(expected_config, actual_config)
@mock.patch('sahara.plugins.hdp.edp_engine.EdpOozieEngine')
def test_get_possible_job_config_java(self, BaseHDPEdpOozieEngine):
expected_config = {'job_config': {}}
BaseHDPEdpOozieEngine.get_possible_job_config.return_value = (
expected_config)
actual_config = edp_engine.EdpOozieEngine.get_possible_job_config(
edp.JOB_TYPE_JAVA)
BaseHDPEdpOozieEngine.get_possible_job_config.assert_called_once_with(
edp.JOB_TYPE_JAVA)
self.assertEqual(expected_config, actual_config)
@mock.patch(
'sahara.plugins.hdp.confighints_helper.'
'get_possible_mapreduce_config_from',
return_value={})
def test_get_possible_job_config_mapreduce(
self, get_possible_mapreduce_config_from):
expected_config = {'job_config': {}}
actual_config = edp_engine.EdpOozieEngine.get_possible_job_config(
edp.JOB_TYPE_MAPREDUCE)
get_possible_mapreduce_config_from.assert_called_once_with(
'plugins/hdp/versions/version_2_0_6/resources/'
'ambari-config-resource.json')
self.assertEqual(expected_config, actual_config)
@mock.patch(
'sahara.plugins.hdp.confighints_helper.'
'get_possible_mapreduce_config_from',
return_value={})
def test_get_possible_job_config_mapreduce_streaming(
self, get_possible_mapreduce_config_from):
expected_config = {'job_config': {}}
actual_config = edp_engine.EdpOozieEngine.get_possible_job_config(
edp.JOB_TYPE_MAPREDUCE_STREAMING)
get_possible_mapreduce_config_from.assert_called_once_with(
'plugins/hdp/versions/version_2_0_6/resources/'
'ambari-config-resource.json')
self.assertEqual(expected_config, actual_config)
@mock.patch(
'sahara.plugins.hdp.confighints_helper.get_possible_pig_config_from',
return_value={})
def test_get_possible_job_config_pig(self,
get_possible_pig_config_from):
expected_config = {'job_config': {}}
actual_config = edp_engine.EdpOozieEngine.get_possible_job_config(
edp.JOB_TYPE_PIG)
get_possible_pig_config_from.assert_called_once_with(
'plugins/hdp/versions/version_2_0_6/resources/'
'ambari-config-resource.json')
self.assertEqual(expected_config, actual_config)
@mock.patch('sahara.plugins.hdp.edp_engine.EdpOozieEngine')
def test_get_possible_job_config_shell(self, BaseHDPEdpOozieEngine):
expected_config = {'job_config': {}}
BaseHDPEdpOozieEngine.get_possible_job_config.return_value = (
expected_config)
actual_config = edp_engine.EdpOozieEngine.get_possible_job_config(
edp.JOB_TYPE_SHELL)
BaseHDPEdpOozieEngine.get_possible_job_config.assert_called_once_with(
edp.JOB_TYPE_SHELL)
self.assertEqual(expected_config, actual_config)

View File

@ -41,7 +41,6 @@ wsgi_scripts =
sahara.cluster.plugins =
vanilla = sahara.plugins.vanilla.plugin:VanillaProvider
hdp = sahara.plugins.hdp.ambariplugin:AmbariPlugin
ambari = sahara.plugins.ambari.plugin:AmbariPluginProvider
mapr = sahara.plugins.mapr.plugin:MapRPlugin
cdh = sahara.plugins.cdh.plugin:CDHPluginProvider