Rename all ClusterContext variables to 'cluster_context'
Closes-Bug: #1612700
Change-Id: Id88652c0ccaf9b5779d94ebdb977442fb2e00c80
parent 53f7854d58
commit a2cf9813f7
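The change is mechanical: every ClusterContext argument or local previously named `context`, `c_context`, or `ctx` becomes `cluster_context`, which also keeps it from shadowing the `sahara.context` module. A minimal, hypothetical sketch of the convention follows; ExampleService and FakeClusterContext are illustrations only, not code from this commit.

    # Hypothetical sketch of the naming convention; ExampleService and
    # FakeClusterContext are illustrations, not part of the sahara tree.


    class FakeClusterContext(object):
        """Stand-in for the MapR plugin's ClusterContext."""

        def get_zookeeper_nodes_ip(self):
            return '192.0.2.10,192.0.2.11'


    class ExampleService(object):
        # Before: def _get_example_props(self, context): ...
        # After: the ClusterContext parameter is always 'cluster_context'.
        def _get_example_props(self, cluster_context):
            return {
                'hive.zookeeper.quorum':
                    cluster_context.get_zookeeper_nodes_ip(),
            }


    print(ExampleService()._get_example_props(FakeClusterContext()))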
@@ -167,16 +167,16 @@ class BaseConfigurer(ac.AbstractConfigurer):
         util.execute_on_instances(instances, install_java)

     @el.provision_step(_("Configure cluster topology"))
-    def _configure_topology(self, context, instances):
+    def _configure_topology(self, cluster_context, instances):
         LOG.debug("Configuring cluster topology")

-        topology_map = context.topology_map
+        topology_map = cluster_context.topology_map
         topology_map = ("%s %s" % item for item in six.iteritems(topology_map))
         topology_map = "\n".join(topology_map) + "\n"

-        data_path = "%s/topology.data" % context.mapr_home
+        data_path = "%s/topology.data" % cluster_context.mapr_home
         script = files.get_file_text(_TOPO_SCRIPT)
-        script_path = '%s/topology.sh' % context.mapr_home
+        script_path = '%s/topology.sh' % cluster_context.mapr_home

         @el.provision_event()
         def write_topology_data(instance):
@@ -309,10 +309,11 @@ class BaseConfigurer(ac.AbstractConfigurer):
         LOG.debug('Executing configure.sh successfully completed')

     @el.provision_event(instance_reference=2)
-    def _configure_sh_instance(self, context, instance, command, specs):
+    def _configure_sh_instance(self, cluster_context, instance, command,
+                               specs):
         if not self.mapr_user_exists(instance):
             command += ' --create-user'
-        if context.check_for_process(instance, mng.METRICS):
+        if cluster_context.check_for_process(instance, mng.METRICS):
             command += (' -d %(host)s:%(port)s -du %(user)s -dp %(password)s '
                         '-ds %(db_name)s') % specs
         with instance.remote() as r:
@@ -338,12 +339,13 @@ class BaseConfigurer(ac.AbstractConfigurer):
                 'mapr', run_as_root=True, raise_when_error=False)
         return ec == 0

-    def post_start(self, c_context, instances=None):
-        instances = instances or c_context.get_instances()
+    def post_start(self, cluster_context, instances=None):
+        instances = instances or cluster_context.get_instances()
         LOG.debug('Executing service post start hooks')
-        for service in c_context.cluster_services:
-            updated = c_context.filter_instances(instances, service=service)
-            service.post_start(c_context, updated)
+        for service in cluster_context.cluster_services:
+            updated = cluster_context.filter_instances(instances,
+                                                       service=service)
+            service.post_start(cluster_context, updated)
         LOG.info(_LI('Post start hooks successfully executed'))

     @el.provision_step(_("Set cluster mode"))
@@ -372,10 +374,11 @@ class BaseConfigurer(ac.AbstractConfigurer):

         util.execute_on_instances(instances, install_mapr_repos)

-    def _update_services(self, c_context, instances):
-        for service in c_context.cluster_services:
-            updated = c_context.filter_instances(instances, service=service)
-            service.update(c_context, updated)
+    def _update_services(self, cluster_context, instances):
+        for service in cluster_context.cluster_services:
+            updated = cluster_context.filter_instances(instances,
+                                                       service=service)
+            service.update(cluster_context, updated)

     def _restart_services(self, cluster_context):
         restart = cluster_context.should_be_restarted
@@ -25,7 +25,7 @@ from sahara.utils import edp
 class MapROozieJobEngine(e.OozieJobEngine):
     def __init__(self, cluster):
         super(MapROozieJobEngine, self).__init__(cluster)
-        self.ctx = self._get_cluster_context(self.cluster)
+        self.cluster_context = self._get_cluster_context(self.cluster)

     hdfs_user = 'mapr'

@@ -69,16 +69,16 @@ class MapROozieJobEngine(e.OozieJobEngine):
         return uploaded_paths

     def get_name_node_uri(self, cluster):
-        return self.ctx.name_node_uri
+        return self.cluster_context.name_node_uri

     def get_oozie_server_uri(self, cluster):
-        return self.ctx.oozie_server_uri
+        return self.cluster_context.oozie_server_uri

     def get_oozie_server(self, cluster):
-        return self.ctx.oozie_server
+        return self.cluster_context.oozie_server

     def get_resource_manager_uri(self, cluster):
-        return self.ctx.resource_manager_uri
+        return self.cluster_context.resource_manager_uri

     def _get_cluster_context(self, cluster):
         h_version = cluster.hadoop_version
@@ -27,10 +27,8 @@ import sahara.plugins.mapr.abstract.node_manager as s
 import sahara.plugins.mapr.services.management.management as mng
 import sahara.plugins.mapr.services.maprfs.maprfs as mfs

-
 LOG = logging.getLogger(__name__)

-
 GET_SERVER_ID_CMD = ('maprcli node list -json -filter [ip==%s] -columns id'
                      ' | grep id | grep -o \'[0-9]*\'')
 NODE_LIST_CMD = 'maprcli node list -json'
@@ -59,15 +57,16 @@ class BaseNodeManager(s.AbstractNodeManager):
                 cldb_remote.execute_command(command, run_as_root=True)
         LOG.info(_LI("Nodes successfully moved"))

-    def remove_nodes(self, c_context, instances):
+    def remove_nodes(self, cluster_context, instances):
         LOG.debug("Removing nodes from cluster")
-        cldb_instances = self._get_cldb_instances(c_context, instances)
+        cldb_instances = self._get_cldb_instances(cluster_context, instances)
         with random.choice(cldb_instances).remote() as cldb_remote:
             for instance in instances:
                 args = {
                     'ip': instance.internal_ip,
                     'nodes': instance.fqdn(),
-                    'zookeepers': c_context.get_zookeeper_nodes_ip_with_port(),
+                    'zookeepers':
+                        cluster_context.get_zookeeper_nodes_ip_with_port(),
                 }
                 command = REMOVE_NODE_CMD % args
                 cldb_remote.execute_command(command, run_as_root=True)
@@ -109,11 +108,11 @@ class BaseNodeManager(s.AbstractNodeManager):
                     ips = [n['ip'] for n in resp['data']]
                     retry_count += 1
                     for i in instances:
-                        if (i.internal_ip not in ips
-                            and retry_count > DEFAULT_RETRY_COUNT):
-                            raise ex.HadoopProvisionError(_(
-                                "Node failed to connect to CLDB: %s") %
-                                i.internal_ip)
+                        if (i.internal_ip not in ips and
+                                (retry_count > DEFAULT_RETRY_COUNT)):
+                            msg = _("Node failed to connect to CLDB: %s"
+                                    ) % i.internal_ip
+                            raise ex.HadoopProvisionError(msg)
                     break
                 else:
                     context.sleep(DELAY)
@@ -170,9 +169,10 @@ class BaseNodeManager(s.AbstractNodeManager):
     def _stop_service(self, instance, service):
         return self._do_service_action(instance, service, STOP)

-    def _get_cldb_instances(self, c_context, instances):
-        current = self._get_current_cluster_instances(c_context, instances)
-        return c_context.filter_instances(current, mfs.CLDB)
+    def _get_cldb_instances(self, cluster_context, instances):
+        current = self._get_current_cluster_instances(cluster_context,
+                                                      instances)
+        return cluster_context.filter_instances(current, mfs.CLDB)

     @staticmethod
     def await_no_heartbeat():
@@ -66,38 +66,41 @@ class Hive(s.Service):
     def require_no_sasl(self, version):
         return version not in ['3.9.0']

-    def _get_hive_site_props(self, context):
+    def _get_hive_site_props(self, cluster_context):
         # Import here to resolve circular dependency
         from sahara.plugins.mapr.services.mysql import mysql

-        zookeepers = context.get_zookeeper_nodes_ip()
+        zookeepers = cluster_context.get_zookeeper_nodes_ip()
         metastore_specs = mysql.MySQL.METASTORE_SPECS

         return {
             'javax.jdo.option.ConnectionDriverName': mysql.MySQL.DRIVER_CLASS,
-            'javax.jdo.option.ConnectionURL': self._get_jdbc_uri(context),
+            'javax.jdo.option.ConnectionURL': self._get_jdbc_uri(
+                cluster_context),
             'javax.jdo.option.ConnectionUserName': metastore_specs.user,
             'javax.jdo.option.ConnectionPassword': metastore_specs.password,
-            'hive.metastore.uris': self._get_metastore_uri(context),
+            'hive.metastore.uris': self._get_metastore_uri(cluster_context),
             'hive.zookeeper.quorum': zookeepers,
             'hbase.zookeeper.quorum': zookeepers,
         }

-    def _get_jdbc_uri(self, context):
+    def _get_jdbc_uri(self, cluster_context):
         # Import here to resolve circular dependency
         from sahara.plugins.mapr.services.mysql import mysql

         jdbc_uri = ('jdbc:mysql://%(db_host)s:%(db_port)s/%(db_name)s?'
                     'createDatabaseIfNotExist=true')
         jdbc_args = {
-            'db_host': mysql.MySQL.get_db_instance(context).internal_ip,
+            'db_host': mysql.MySQL.get_db_instance(
+                cluster_context).internal_ip,
             'db_port': mysql.MySQL.MYSQL_SERVER_PORT,
             'db_name': mysql.MySQL.METASTORE_SPECS.db_name,
         }
         return jdbc_uri % jdbc_args

-    def _get_metastore_uri(self, context):
-        return 'thrift://%s:9083' % context.get_instance_ip(HIVE_METASTORE)
+    def _get_metastore_uri(self, cluster_context):
+        return 'thrift://%s:9083' % cluster_context.get_instance_ip(
+            HIVE_METASTORE)

     def post_start(self, cluster_context, instances):
         # Import here to resolve circular dependency
@@ -122,9 +122,9 @@ class Hue(s.Service):

         return [hue_ini, hue_sh]

-    def _get_hue_ini_props(self, context):
-        db_instance = mysql.MySQL.get_db_instance(context)
-        is_yarn = context.cluster_mode == 'yarn'
+    def _get_hue_ini_props(self, cluster_context):
+        db_instance = mysql.MySQL.get_db_instance(cluster_context)
+        is_yarn = cluster_context.cluster_mode == 'yarn'
         hue_specs = mysql.MySQL.HUE_SPECS
         rdbms_specs = mysql.MySQL.RDBMS_SPECS

@@ -136,36 +136,39 @@ class Hue(s.Service):
             'rdbms_name': rdbms_specs.db_name,
             'rdbms_user': rdbms_specs.user,
             'rdbms_password': rdbms_specs.password,
-            'resource_manager_uri': context.resource_manager_uri,
+            'resource_manager_uri': cluster_context.resource_manager_uri,
             'yarn_mode': is_yarn,
-            'rm_host': context.get_instance_ip(yarn.RESOURCE_MANAGER),
-            'webhdfs_url': context.get_instance_ip(httpfs.HTTP_FS),
-            'jt_host': context.get_instance_ip(mr.JOB_TRACKER),
-            'oozie_host': context.get_instance_ip(oozie.OOZIE),
-            'sqoop_host': context.get_instance_ip(sqoop.SQOOP_2_SERVER),
-            'impala_host': context.get_instance_ip(impala.IMPALA_STATE_STORE),
-            'zk_hosts_with_port': context.get_zookeeper_nodes_ip_with_port(),
+            'rm_host': cluster_context.get_instance_ip(yarn.RESOURCE_MANAGER),
+            'webhdfs_url': cluster_context.get_instance_ip(httpfs.HTTP_FS),
+            'jt_host': cluster_context.get_instance_ip(mr.JOB_TRACKER),
+            'oozie_host': cluster_context.get_instance_ip(oozie.OOZIE),
+            'sqoop_host': cluster_context.get_instance_ip(
+                sqoop.SQOOP_2_SERVER),
+            'impala_host': cluster_context.get_instance_ip(
+                impala.IMPALA_STATE_STORE),
+            'zk_hosts_with_port':
+                cluster_context.get_zookeeper_nodes_ip_with_port(),
             'secret_key': self._generate_secret_key()
         }

-        hive_host = context.get_instance(hive.HIVE_SERVER_2)
+        hive_host = cluster_context.get_instance(hive.HIVE_SERVER_2)
         if hive_host:
-            hive_service = context.get_service(hive.HIVE_SERVER_2)
+            hive_service = cluster_context.get_service(hive.HIVE_SERVER_2)
             result.update({
                 'hive_host': hive_host.internal_ip,
                 'hive_version': hive_service.version,
-                'hive_conf_dir': hive_service.conf_dir(context),
+                'hive_conf_dir': hive_service.conf_dir(cluster_context),
             })

-        hbase_host = context.get_instance(hbase.HBASE_THRIFT)
+        hbase_host = cluster_context.get_instance(hbase.HBASE_THRIFT)
         if hbase_host:
-            hbase_service = context.get_service(hbase.HBASE_THRIFT)
+            hbase_service = cluster_context.get_service(hbase.HBASE_THRIFT)
             result.update({
                 'hbase_host': hbase_host.internal_ip,
-                'hbase_conf_dir': hbase_service.conf_dir(context),
+                'hbase_conf_dir': hbase_service.conf_dir(cluster_context),
             })

-        livy_host = context.get_instance(HUE_LIVY)
+        livy_host = cluster_context.get_instance(HUE_LIVY)
         if livy_host:
             result.update({
                 'livy_host': livy_host.internal_ip
@@ -176,7 +179,7 @@ class Hue(s.Service):
     def post_install(self, cluster_context, instances):
         hue_instance = cluster_context.get_instance(HUE)

-        def migrate_database(remote, context):
+        def migrate_database(remote, cluster_context):
             hue_home = self.home_dir(cluster_context)
             cmd = '%(activate)s && %(syncdb)s && %(migrate)s'
             args = {
@@ -217,7 +220,7 @@ class Hue(s.Service):
             if filtered_instances:
                 node_process.restart(filtered_instances)

-    def _should_restart(self, c_context, instances):
+    def _should_restart(self, cluster_context, instances):
         app_services = [
             impala.Impala(),
             hive.Hive(),
@@ -225,8 +228,9 @@ class Hue(s.Service):
             sqoop.Sqoop2(),
             spark.SparkOnYarn(),
         ]
-        instances = [c_context.filter_instances(instances, service=service)
-                     for service in app_services]
+        instances = [
+            cluster_context.filter_instances(instances, service=service)
+            for service in app_services]
         return bool(g.unique_list(itertools.chain(*instances)))

     def jt_plugin_path(self, cluster_context):
@@ -52,7 +52,7 @@ class Impala(s.Service):
         IMPALA_STATE_STORE,
     ]

-    def _get_impala_env_props(self, context):
+    def _get_impala_env_props(self, cluster_context):
         return {}

     def post_start(self, cluster_context, instances):
@@ -69,9 +69,9 @@ class Impala(s.Service):
                         run_as='root', owner='mapr')

     # hive service instance
-    def _hive(self, context):
-        hive_version = context.get_chosen_service_version('Hive')
-        return context._find_service_instance('Hive', hive_version)
+    def _hive(self, cluster_context):
+        hive_version = cluster_context.get_chosen_service_version('Hive')
+        return cluster_context._find_service_instance('Hive', hive_version)

     def get_config_files(self, cluster_context, configs, instance=None):
         defaults = 'plugins/mapr/services/impala/resources/impala-env.sh.j2'
@@ -104,11 +104,12 @@ class ImpalaV141(Impala):
             vu.at_least(1, IMPALA_SERVER),
         ]

-    def _get_impala_env_props(self, context):
+    def _get_impala_env_props(self, cluster_context):
         return {
             'impala_version': self.version,
-            'statestore_host': context.get_instance_ip(IMPALA_STATE_STORE),
-            'catalog_host': context.get_instance_ip(IMPALA_CATALOG),
+            'statestore_host': cluster_context.get_instance_ip(
+                IMPALA_STATE_STORE),
+            'catalog_host': cluster_context.get_instance_ip(IMPALA_CATALOG),
         }

@@ -128,11 +129,12 @@ class ImpalaV220(Impala):
             vu.required_os('centos', self)
         ]

-    def _get_impala_env_props(self, context):
+    def _get_impala_env_props(self, cluster_context):
         return {
             'impala_version': self.version,
-            'statestore_host': context.get_instance_ip(IMPALA_STATE_STORE),
-            'catalog_host': context.get_instance_ip(IMPALA_CATALOG),
+            'statestore_host': cluster_context.get_instance_ip(
+                IMPALA_STATE_STORE),
+            'catalog_host': cluster_context.get_instance_ip(IMPALA_CATALOG),
         }

     def _get_packages(self, cluster_context, node_processes):
@@ -78,9 +78,9 @@ class MapReduce(s.Service):

         return [core_site, mapred_site]

-    def _get_core_site_props(self, context):
+    def _get_core_site_props(self, cluster_context):
         result = {}
-        if context.is_node_aware:
+        if cluster_context.is_node_aware:
             result.update(self._get_core_site_node_aware_props())
         for conf in swift_helper.get_swift_configs():
             result[conf['name']] = conf['value']
@@ -97,9 +97,9 @@ class MapReduce(s.Service):
         })
         return result

-    def _get_mapred_site_props(self, context):
+    def _get_mapred_site_props(self, cluster_context):
         result = {}
-        if context.is_node_aware:
+        if cluster_context.is_node_aware:
             result.update(self._get_mapred_site_node_aware_props())
         result.update({
             'jobtracker.thrift.address': '0.0.0.0:9290',
@@ -148,6 +148,6 @@ class MapRFS(s.Service):

         return [cldb_conf, warden_conf]

-    def _get_cldb_conf_props(self, context):
-        zookeepers = context.get_zookeeper_nodes_ip_with_port()
+    def _get_cldb_conf_props(self, cluster_context):
+        zookeepers = cluster_context.get_zookeeper_nodes_ip_with_port()
         return {'cldb.zookeeper.servers': zookeepers}
@@ -155,8 +155,9 @@ class MySQL(s.Service):
         return list()

     @staticmethod
-    def get_db_instance(context):
-        return context.oozie_server or context.get_instance(spark.SPARK_MASTER)
+    def get_db_instance(cluster_context):
+        return cluster_context.oozie_server or cluster_context.get_instance(
+            spark.SPARK_MASTER)

     @staticmethod
     def create_databases(cluster_context, instances):
@@ -14,7 +14,7 @@

 from oslo_log import log as logging

-import sahara.context as con
+import sahara.context as context
 import sahara.plugins.mapr.domain.configuration_file as bcf
 import sahara.plugins.mapr.domain.node_process as np
 import sahara.plugins.mapr.domain.service as s
@@ -22,7 +22,6 @@ import sahara.plugins.mapr.services.mysql.mysql as mysql
 import sahara.plugins.mapr.util.general as g
 import sahara.plugins.mapr.util.validation_utils as vu

-
 LOG = logging.getLogger(__name__)
 OOZIE_START_DELAY = 30

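With the parameters renamed, nothing local shadows `sahara.context` any more, so the module can be imported under its natural name (the `con` alias above becomes `context`). A small hedged sketch of the resulting usage, assuming a sahara environment; `wait_for_oozie` is illustrative, not plugin code:

    import sahara.context as context  # sahara's concurrency helpers


    def wait_for_oozie(cluster_context, delay=30):
        # 'context' now always means the sahara.context module, while the
        # ClusterContext object is passed around as 'cluster_context'.
        context.sleep(delay)
        return cluster_context.oozie_server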
@@ -57,25 +56,27 @@ class Oozie(s.Service):
         oozie_site.add_properties(self._get_oozie_site_props(cluster_context))
         return [oozie_site]

-    def _get_oozie_site_props(self, context):
+    def _get_oozie_site_props(self, cluster_context):
         oozie_specs = mysql.MySQL.OOZIE_SPECS

         return {
             'oozie.db.schema.name': oozie_specs.db_name,
             'oozie.service.JPAService.create.db.schema': True,
             'oozie.service.JPAService.jdbc.driver': mysql.MySQL.DRIVER_CLASS,
-            'oozie.service.JPAService.jdbc.url': self._get_jdbc_uri(context),
+            'oozie.service.JPAService.jdbc.url': self._get_jdbc_uri(
+                cluster_context),
             'oozie.service.JPAService.jdbc.username': oozie_specs.user,
             'oozie.service.JPAService.jdbc.password': oozie_specs.password,
             'oozie.service.HadoopAccessorService.hadoop.configurations':
-                '*=%s' % context.hadoop_conf
+                '*=%s' % cluster_context.hadoop_conf
         }

-    def _get_jdbc_uri(self, context):
+    def _get_jdbc_uri(self, cluster_context):
         jdbc_uri = ('jdbc:mysql://%(db_host)s:%(db_port)s/%(db_name)s?'
                     'createDatabaseIfNotExist=true')
         jdbc_args = {
-            'db_host': mysql.MySQL.get_db_instance(context).internal_ip,
+            'db_host': mysql.MySQL.get_db_instance(
+                cluster_context).internal_ip,
             'db_port': mysql.MySQL.MYSQL_SERVER_PORT,
             'db_name': mysql.MySQL.OOZIE_SPECS.db_name,
         }
@@ -132,7 +133,7 @@ class Oozie(s.Service):
         g.execute_on_instances(
             instances, self._rebuild_oozie_war, cluster_context)
         OOZIE.start(instances)
-        con.sleep(OOZIE_START_DELAY)
+        context.sleep(OOZIE_START_DELAY)


 class OozieV401(Oozie):
@@ -177,66 +177,67 @@ class SparkOnYarn(s.Service):
             mfs.chmod(r, home, 777, run_as=run_as_user)
             mfs.chmod(r, libs, 777, run_as=run_as_user)

-    def _hive_properties(self, context):
-        hive_version = self._hive(context).version
-        hive_conf = self._hive(context).conf_dir(context)
+    def _hive_properties(self, cluster_context):
+        hive_version = self._hive(cluster_context).version
+        hive_conf = self._hive(cluster_context).conf_dir(cluster_context)
         hive_site = hive_conf + '/hive-site.xml'
-        hive_datanucleus_libs = self._hive_datanucleus_libs_path(context)
-        hive_libs = self._hive_libs_path(context)
-        hadoop_libs = self._hadoop_libs_path(context)
+        hive_datanucleus_libs = self._hive_datanucleus_libs_path(
+            cluster_context)
+        hive_libs = self._hive_libs_path(cluster_context)
+        hadoop_libs = self._hadoop_libs_path(cluster_context)
         hive_datanucleus_libs.insert(0, hive_site)
-        mfs_paths = self._hive_datanucleus_libs_path(context)
+        mfs_paths = self._hive_datanucleus_libs_path(cluster_context)
         return {
             'spark.yarn.dist.files': ','.join(mfs_paths),
             'spark.sql.hive.metastore.version': hive_version + '.0',
             'spark.sql.hive.metastore.jars': ':'.join(hadoop_libs + hive_libs)
         }

-    def _hadoop_libs_path(self, context):
+    def _hadoop_libs_path(self, cluster_context):
         cmd = 'echo $(hadoop classpath)'
-        with context.get_instance(SPARK_HISTORY_SERVER).remote() as r:
+        with cluster_context.get_instance(SPARK_HISTORY_SERVER).remote() as r:
             result = r.execute_command(cmd, run_as_root=True, timeout=600)
         return result[1].replace('\n', '').split(':')

-    def _hive_libs_path(self, context):
+    def _hive_libs_path(self, cluster_context):
         cmd = "find %s -name '*.jar'" % (
-            self._hive(context).home_dir(context) + '/lib')
-        with context.get_instance(hive.HIVE_METASTORE).remote() as r:
+            self._hive(cluster_context).home_dir(cluster_context) + '/lib')
+        with cluster_context.get_instance(hive.HIVE_METASTORE).remote() as r:
             result = r.execute_command(cmd, run_as_root=True, timeout=600)
         return [x for x in list(result[1].split('\n')) if x]

-    def _assembly_jar_path(self, context):
+    def _assembly_jar_path(self, cluster_context):
         cmd = "find %s -name 'spark-assembly*.jar'" % (
-            self.home_dir(context) + '/lib')
-        with context.get_instance(SPARK_HISTORY_SERVER).remote() as r:
+            self.home_dir(cluster_context) + '/lib')
+        with cluster_context.get_instance(SPARK_HISTORY_SERVER).remote() as r:
             result = r.execute_command(cmd, run_as_root=True, timeout=600)
         if result[1]:
             return result[1].strip()
         else:
             raise Exception("no spark-assembly lib found!")

-    def _hive_datanucleus_libs_path(self, context):
+    def _hive_datanucleus_libs_path(self, cluster_context):
         cmd = "find %s -name 'datanucleus-*.jar'" % (
-            self._hive(context).home_dir(context) + '/lib')
-        with context.get_instance(hive.HIVE_METASTORE).remote() as r:
+            self._hive(cluster_context).home_dir(cluster_context) + '/lib')
+        with cluster_context.get_instance(hive.HIVE_METASTORE).remote() as r:
             result = r.execute_command(cmd, run_as_root=True, timeout=600)
         return [x for x in list(result[1].split('\n')) if x]

     # hive installed service instance
-    def _hive(self, context):
-        hive_instance = context.get_instance(hive.HIVE_SERVER_2)
+    def _hive(self, cluster_context):
+        hive_instance = cluster_context.get_instance(hive.HIVE_SERVER_2)
         if not hive_instance:
             return None
-        hive_version = context.get_chosen_service_version('Hive')
-        return context._find_service_instance('Hive', hive_version)
+        hive_version = cluster_context.get_chosen_service_version('Hive')
+        return cluster_context._find_service_instance('Hive', hive_version)

     # hbase installed service instance
-    def _hbase(self, context):
-        hbase_instance = context.get_instance(hbase.HBASE_MASTER)
+    def _hbase(self, cluster_context):
+        hbase_instance = cluster_context.get_instance(hbase.HBASE_MASTER)
         if not hbase_instance:
             return None
-        hbase_version = context.get_chosen_service_version('HBase')
-        return context._find_service_instance('HBase', hbase_version)
+        hbase_version = cluster_context.get_chosen_service_version('HBase')
+        return cluster_context._find_service_instance('HBase', hbase_version)

     def _get_hbase_version(self, cluster_context):
         return (self._hbase(cluster_context).version
@@ -247,20 +248,20 @@ class SparkOnYarn(s.Service):
                 if self._hive(cluster_context) else None)

     # hue installed service instance
-    def _hue(self, context):
-        hue_instance = context.get_instance('Hue')
+    def _hue(self, cluster_context):
+        hue_instance = cluster_context.get_instance('Hue')
         if not hue_instance:
             return None
-        hue_version = context.get_chosen_service_version('Hue')
-        return context._find_service_instance('Hue', hue_version)
+        hue_version = cluster_context.get_chosen_service_version('Hue')
+        return cluster_context._find_service_instance('Hue', hue_version)

-    def _copy_jar_from_hue(self, context):
-        if not self._hue(context):
+    def _copy_jar_from_hue(self, cluster_context):
+        if not self._hue(cluster_context):
             return
         jar_path = "%s/apps/spark/java-lib/javax.servlet-api-*.jar" % \
-            self._hue(context).home_dir(context)
-        path = '%s/lib/' % self.home_dir(context) + self.SERVLET_JAR
-        with context.get_instance('Hue').remote() as r1:
-            for instance in context.get_instances(SPARK_SLAVE):
+            self._hue(cluster_context).home_dir(cluster_context)
+        path = '%s/lib/' % self.home_dir(cluster_context) + self.SERVLET_JAR
+        with cluster_context.get_instance('Hue').remote() as r1:
+            for instance in cluster_context.get_instances(SPARK_SLAVE):
                 with instance.remote() as r2:
                     mfs.exchange(r1, r2, jar_path, path, 'mapr')
@@ -42,5 +42,5 @@ class Sqoop2(s.Service):
             vu.exactly(1, SQOOP_2_SERVER),
         ]

-    def post_install(self, context, instances):
-        self._set_service_dir_owner(context, instances)
+    def post_install(self, cluster_context, instances):
+        self._set_service_dir_owner(cluster_context, instances)
@@ -37,16 +37,17 @@ class Swift(s.Service):
         # swift does not require any package
         pass

-    def configure(self, context, instances=None):
-        instances = instances or context.get_instances()
-        file_servers = context.filter_instances(instances, maprfs.FILE_SERVER)
-        self._install_swift_jar(context, file_servers)
+    def configure(self, cluster_context, instances=None):
+        instances = instances or cluster_context.get_instances()
+        file_servers = cluster_context.filter_instances(instances,
+                                                        maprfs.FILE_SERVER)
+        self._install_swift_jar(cluster_context, file_servers)

     @el.provision_step("Install Swift service")
-    def _install_swift_jar(self, context, instances):
+    def _install_swift_jar(self, cluster_context, instances):
         LOG.debug('Installing Swift jar')
         jar = f.get_file_text(Swift.HADOOP_SWIFT_JAR)
-        path = '%s/swift.jar' % context.hadoop_lib
+        path = '%s/swift.jar' % cluster_context.hadoop_lib

         @el.provision_event()
         def install_on_instance(inst):
@@ -77,12 +77,12 @@ class YARN(s.Service):

         return [yarn_site, core_site]

-    def _get_core_site_props(self, context):
+    def _get_core_site_props(self, cluster_context):
         result = {
             'hadoop.proxyuser.mapr.groups': '*',
             'hadoop.proxyuser.mapr.hosts': '*',
         }
-        if context.is_node_aware:
+        if cluster_context.is_node_aware:
             result.update(self._get_core_site_node_aware_props())
         for conf in swift_helper.get_swift_configs():
             result[conf['name']] = conf['value']
@@ -97,7 +97,7 @@ class YARN(s.Service):
         })
         return result

-    def _get_yarn_site_props(self, context):
+    def _get_yarn_site_props(self, cluster_context):
         return {
             'hadoop.proxyuser.mapr.groups': '*',
             'hadoop.proxyuser.mapr.hosts': '*',