Merge "Add a common HBase lib in hdfs on cluster start"
This commit is contained in:
commit
3fda398489
|
@ -75,6 +75,10 @@ ENABLE_SWIFT = p.Config('Enable Swift', 'general', 'cluster',
|
|||
config_type='bool', priority=1,
|
||||
default_value=True)
|
||||
|
||||
ENABLE_HBASE_COMMON_LIB = p.Config('Enable HBase Common Lib',
|
||||
'general', 'cluster', config_type='bool',
|
||||
priority=1, default_value=True)
|
||||
|
||||
SWIFT_LIB_URL = p.Config(
|
||||
'Hadoop OpenStack library URL', 'general', 'cluster', priority=1,
|
||||
default_value=DEFAULT_SWIFT_LIB_URL,
|
||||
|
@ -91,7 +95,8 @@ EXTJS_LIB_URL = p.Config(
|
|||
|
||||
def _get_cluster_plugin_configs():
|
||||
return [CDH5_REPO_URL, CDH5_REPO_KEY_URL, CM5_REPO_URL, CM5_REPO_KEY_URL,
|
||||
ENABLE_SWIFT, SWIFT_LIB_URL, EXTJS_LIB_URL]
|
||||
ENABLE_SWIFT, ENABLE_HBASE_COMMON_LIB, SWIFT_LIB_URL,
|
||||
EXTJS_LIB_URL]
|
||||
|
||||
|
||||
# ng wide configs
|
||||
|
@ -208,6 +213,10 @@ def is_swift_enabled(cluster):
|
|||
return _get_config_value(cluster, ENABLE_SWIFT)
|
||||
|
||||
|
||||
def is_hbase_common_lib_enabled(cluster):
|
||||
return _get_config_value(cluster, ENABLE_HBASE_COMMON_LIB)
|
||||
|
||||
|
||||
def get_swift_lib_url(cluster):
|
||||
return _get_config_value(cluster, SWIFT_LIB_URL)
|
||||
|
||||
|
|
|
@ -17,6 +17,7 @@ from sahara.i18n import _
|
|||
from sahara.plugins.cdh import commands as cmd
|
||||
from sahara.plugins.cdh.v5 import cloudera_utils as cu
|
||||
from sahara.plugins import utils as gu
|
||||
from sahara.service.edp import hdfs_helper as h
|
||||
from sahara.utils import cluster_progress_ops as cpo
|
||||
|
||||
|
||||
|
@ -220,6 +221,17 @@ def start_cluster(cluster):
|
|||
if CU.pu.get_hbase_master(cluster):
|
||||
start_hbase_master(cluster, cm_cluster)
|
||||
|
||||
create_hbase_common_lib(cluster)
|
||||
|
||||
|
||||
@cpo.event_wrapper(
|
||||
True, step=_("Create HBase common lib"), param=('cluster', 0))
|
||||
def create_hbase_common_lib(cluster):
|
||||
server = CU.pu.get_hbase_master(cluster)
|
||||
if CU.pu.c_helper.is_hbase_common_lib_enabled(cluster) and server:
|
||||
with server.remote() as r:
|
||||
h.create_hbase_common_lib(r)
|
||||
|
||||
|
||||
def get_open_ports(node_group):
|
||||
ports = [9000] # for CM agent
|
||||
|
|
|
@ -132,6 +132,10 @@ ENABLE_SWIFT = p.Config('Enable Swift', 'general', 'cluster',
|
|||
config_type='bool', priority=1,
|
||||
default_value=True)
|
||||
|
||||
ENABLE_HBASE_COMMON_LIB = p.Config('Enable HBase Common Lib',
|
||||
'general', 'cluster', config_type='bool',
|
||||
priority=1, default_value=True)
|
||||
|
||||
SWIFT_LIB_URL = p.Config(
|
||||
'Hadoop OpenStack library URL', 'general', 'cluster', priority=1,
|
||||
default_value=DEFAULT_SWIFT_LIB_URL,
|
||||
|
@ -148,7 +152,8 @@ EXTJS_LIB_URL = p.Config(
|
|||
|
||||
def _get_cluster_plugin_configs():
|
||||
return [CDH5_REPO_URL, CDH5_REPO_KEY_URL, CM5_REPO_URL, CM5_REPO_KEY_URL,
|
||||
ENABLE_SWIFT, SWIFT_LIB_URL, EXTJS_LIB_URL]
|
||||
ENABLE_SWIFT, ENABLE_HBASE_COMMON_LIB, SWIFT_LIB_URL,
|
||||
EXTJS_LIB_URL]
|
||||
|
||||
|
||||
# ng wide configs
|
||||
|
@ -303,6 +308,10 @@ def is_swift_enabled(cluster):
|
|||
return _get_config_value(cluster, ENABLE_SWIFT)
|
||||
|
||||
|
||||
def is_hbase_common_lib_enabled(cluster):
|
||||
return _get_config_value(cluster, ENABLE_HBASE_COMMON_LIB)
|
||||
|
||||
|
||||
def get_swift_lib_url(cluster):
|
||||
return _get_config_value(cluster, SWIFT_LIB_URL)
|
||||
|
||||
|
|
|
@ -17,9 +17,9 @@ from sahara.i18n import _
|
|||
from sahara.plugins.cdh import commands as cmd
|
||||
from sahara.plugins.cdh.v5_3_0 import cloudera_utils as cu
|
||||
from sahara.plugins import utils as gu
|
||||
from sahara.service.edp import hdfs_helper as h
|
||||
from sahara.utils import cluster_progress_ops as cpo
|
||||
|
||||
|
||||
PACKAGES = [
|
||||
'cloudera-manager-agent',
|
||||
'cloudera-manager-daemons',
|
||||
|
@ -147,6 +147,11 @@ def _finish_cluster_starting(cluster):
|
|||
if CU.pu.get_hive_metastore(cluster):
|
||||
CU.pu.put_hive_hdfs_xml(cluster)
|
||||
|
||||
server = CU.pu.get_hbase_master(cluster)
|
||||
if CU.pu.c_helper.is_hbase_common_lib_enabled(cluster) and server:
|
||||
with server.remote() as r:
|
||||
h.create_hbase_common_lib(r)
|
||||
|
||||
if CU.pu.get_flumes(cluster):
|
||||
flume = CU.get_service_by_role('AGENT', cluster)
|
||||
CU.start_service(flume)
|
||||
|
|
|
@ -20,12 +20,30 @@ from six.moves.urllib import parse as urlparse
|
|||
|
||||
from sahara import conductor as c
|
||||
from sahara import context
|
||||
from sahara.plugins import exceptions as ex
|
||||
from sahara.plugins import utils as u
|
||||
from sahara.utils import general as g
|
||||
|
||||
|
||||
conductor = c.API
|
||||
|
||||
HBASE_COMMON_LIB_PATH = "/user/sahara-hbase-lib"
|
||||
|
||||
|
||||
def create_hbase_common_lib(r):
|
||||
r.execute_command(
|
||||
'sudo su - -c "hadoop dfs -mkdir -p %s" hdfs' % (
|
||||
HBASE_COMMON_LIB_PATH))
|
||||
ret_code, stdout = r.execute_command(
|
||||
'hbase classpath')
|
||||
if ret_code == 0:
|
||||
paths = stdout.split(':')
|
||||
for p in paths:
|
||||
if p.endswith(".jar"):
|
||||
r.execute_command('sudo su - -c "hadoop fs -put -p %s %s" hdfs'
|
||||
% (p, HBASE_COMMON_LIB_PATH))
|
||||
else:
|
||||
raise ex.RequiredServiceMissingException('hbase')
|
||||
|
||||
|
||||
def put_file_to_hdfs(r, file, file_name, path, hdfs_user):
|
||||
tmp_file_name = '%s.%s' % (file_name, six.text_type(uuid.uuid4()))
|
||||
|
|
|
@ -50,14 +50,27 @@ class OozieJobEngine(base_engine.JobEngine):
|
|||
return o.OozieClient(self.get_oozie_server_uri(self.cluster),
|
||||
self.get_oozie_server(self.cluster))
|
||||
|
||||
def _get_oozie_job_params(self, hdfs_user, path_to_workflow, oozie_params):
|
||||
def _get_oozie_job_params(self, hdfs_user, path_to_workflow, oozie_params,
|
||||
use_hbase_lib):
|
||||
app_path = "oozie.wf.application.path"
|
||||
oozie_libpath_key = "oozie.libpath"
|
||||
oozie_libpath = ""
|
||||
rm_path = self.get_resource_manager_uri(self.cluster)
|
||||
nn_path = self.get_name_node_uri(self.cluster)
|
||||
hbase_common_lib_path = "%s%s" % (nn_path, h.HBASE_COMMON_LIB_PATH)
|
||||
|
||||
if use_hbase_lib:
|
||||
if oozie_libpath_key in oozie_params:
|
||||
oozie_libpath = "%s,%s" % (oozie_params.get(oozie_libpath_key,
|
||||
""), hbase_common_lib_path)
|
||||
else:
|
||||
oozie_libpath = hbase_common_lib_path
|
||||
|
||||
job_parameters = {
|
||||
"jobTracker": rm_path,
|
||||
"nameNode": nn_path,
|
||||
"user.name": hdfs_user,
|
||||
oozie_libpath_key: oozie_libpath,
|
||||
app_path: "%s%s" % (nn_path, path_to_workflow),
|
||||
"oozie.use.system.libpath": "true"}
|
||||
|
||||
|
@ -65,6 +78,8 @@ class OozieJobEngine(base_engine.JobEngine):
|
|||
# possibly make any sense
|
||||
if app_path in oozie_params:
|
||||
del oozie_params[app_path]
|
||||
if oozie_libpath_key in oozie_params:
|
||||
del oozie_params[oozie_libpath_key]
|
||||
|
||||
job_parameters.update(oozie_params)
|
||||
return job_parameters
|
||||
|
@ -103,6 +118,7 @@ class OozieJobEngine(base_engine.JobEngine):
|
|||
|
||||
proxy_configs = updated_job_configs.get('proxy_configs')
|
||||
configs = updated_job_configs.get('configs', {})
|
||||
use_hbase_lib = configs.get('edp.hbase_common_lib', {})
|
||||
|
||||
# Extract all the 'oozie.' configs so that they can be set in the
|
||||
# job properties file. These are config values for Oozie itself,
|
||||
|
@ -137,7 +153,8 @@ class OozieJobEngine(base_engine.JobEngine):
|
|||
|
||||
job_params = self._get_oozie_job_params(hdfs_user,
|
||||
path_to_workflow,
|
||||
oozie_params)
|
||||
oozie_params,
|
||||
use_hbase_lib)
|
||||
|
||||
client = self._get_client()
|
||||
oozie_job_id = client.add_job(x.create_hadoop_xml(job_params),
|
||||
|
|
|
@ -43,18 +43,20 @@ class TestOozieEngine(base.SaharaTestCase):
|
|||
oozie_params = {'oozie.libpath': '/mylibpath',
|
||||
'oozie.wf.application.path': '/wrong'}
|
||||
job_params = oje._get_oozie_job_params('hadoop',
|
||||
'/tmp', oozie_params)
|
||||
'/tmp', oozie_params, True)
|
||||
self.assertEqual('http://localhost:50030', job_params["jobTracker"])
|
||||
self.assertEqual('hdfs://localhost:8020', job_params["nameNode"])
|
||||
self.assertEqual('hadoop', job_params["user.name"])
|
||||
self.assertEqual('hdfs://localhost:8020/tmp',
|
||||
job_params['oozie.wf.application.path'])
|
||||
self.assertEqual('/mylibpath', job_params['oozie.libpath'])
|
||||
self.assertEqual("/mylibpath,hdfs://localhost:8020/user/"
|
||||
"sahara-hbase-lib", job_params['oozie.libpath'])
|
||||
|
||||
# Make sure this doesn't raise an exception
|
||||
job_params = oje._get_oozie_job_params('hadoop',
|
||||
'/tmp', {})
|
||||
self.assertNotIn('oozie.libpath', job_params)
|
||||
'/tmp', {}, True)
|
||||
self.assertEqual("hdfs://localhost:8020/user/"
|
||||
"sahara-hbase-lib", job_params['oozie.libpath'])
|
||||
|
||||
@mock.patch('sahara.utils.remote.get_remote')
|
||||
@mock.patch('sahara.utils.ssh_remote.InstanceInteropHelper')
|
||||
|
|
Loading…
Reference in New Issue