Removed EDP dependency on hive server

Currently EDP has two dependencies on Hive:
1. The get_hiveserver instance is obtained by naming convention (the
   'hiveserver' process name). This doesn't work e.g. for the HDP
   plugin, where the process has a different name (HIVE_SERVER).
2. There is a get_hive_config_path method in the plugin SPI. It tells
   where the Hive config is stored locally on the Hive server. EDP
   copies it into HDFS for each Hive job.

This CR removes both dependencies. Instead of copying the conf file to
HDFS for each job, we can copy it once (when the cluster is started)
and reference it from every Hive job. The location of the shared config
file is hardcoded (as is done for the Oozie share libs).
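
A minimal sketch of the resulting flow (the helper and the 'hadoop'
HDFS user below are taken from this change; the call site is
simplified for illustration):

    # savanna/utils/edp.py: the shared location depends only on the
    # HDFS user, so EDP no longer asks the plugin where hive-site.xml
    # lives on the hive server.
    def get_hive_shared_conf_path(hdfs_user):
        return "/user/%s/conf/hive-site.xml" % hdfs_user

    # At cluster start the plugin uploads its local hive-site.xml to
    # that path once, e.g.:
    #   run.hive_copy_shared_conf(r, edp.get_hive_shared_conf_path('hadoop'))
    # The Hive workflow creator then references the same HDFS path in
    # every generated workflow instead of re-uploading the file.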

Closes-Bug: #1286460
Partially implements: blueprint edp-plugin-communication

Change-Id: I4de092e767281b4a4f4c21277be81ef956421285
Andrew Lazarev 2014-02-28 22:53:36 -08:00
parent ef934759a4
commit 34ae882ca0
9 changed files with 54 additions and 58 deletions

View File

@@ -47,9 +47,6 @@ class IDHProvider(p.ProvisioningPluginBase):
def get_configs(self, hadoop_version):
return self._get_version_handler(hadoop_version).get_plugin_configs()
def get_hive_config_path(self):
return '/etc/hive/conf/hive-site.xml'
def configure_cluster(self, cluster):
self._get_version_handler(
cluster.hadoop_version).configure_cluster(cluster)

View File

@@ -26,10 +26,6 @@ class ProvisioningPluginBase(plugins_base.PluginInterface):
def get_configs(self, hadoop_version):
pass
@plugins_base.optional
def get_hive_config_path(self):
pass
@plugins_base.optional
def get_hdfs_user(self):
pass

View File

@@ -49,9 +49,6 @@ class VanillaProvider(p.ProvisioningPluginBase):
def get_configs(self, hadoop_version):
return self._get_version_handler(hadoop_version).get_plugin_configs()
def get_hive_config_path(self):
return '/opt/hive/conf/hive-site.xml'
def configure_cluster(self, cluster):
return self._get_version_handler(
cluster.hadoop_version).configure_cluster(cluster)

View File

@@ -30,14 +30,21 @@ def refresh_nodes(remote, service):
% service)
def format_namenode(nn_remote):
nn_remote.execute_command("sudo su -c 'hadoop namenode -format' hadoop")
def format_namenode(remote):
remote.execute_command("sudo su -c 'hadoop namenode -format' hadoop")
def hive_create_warehouse_dir(nn_remote):
def hive_create_warehouse_dir(remote):
LOG.debug("Creating Hive warehouse dir")
nn_remote.execute_command("sudo su - -c 'hadoop fs -mkdir "
"/user/hive/warehouse' hadoop")
remote.execute_command("sudo su - -c 'hadoop fs -mkdir "
"/user/hive/warehouse' hadoop")
def hive_copy_shared_conf(remote, dest):
LOG.debug("Copying shared Hive conf")
remote.execute_command(
"sudo su - -c 'hadoop fs -put /opt/hive/conf/hive-site.xml "
"%s' hadoop" % dest)
def oozie_share_lib(remote, nn_hostname):

View File

@@ -28,6 +28,7 @@ from savanna.plugins.vanilla.v1_2_1 import config_helper as c_helper
from savanna.plugins.vanilla.v1_2_1 import run_scripts as run
from savanna.plugins.vanilla.v1_2_1 import scaling as sc
from savanna.topology import topology_helper as th
from savanna.utils import edp
from savanna.utils import files as f
from savanna.utils import general as g
from savanna.utils import remote
@@ -136,16 +137,18 @@ class VersionHandler(avm.AbstractVersionHandler):
hive_server = utils.get_hiveserver(cluster)
if hive_server:
with remote.get_remote(nn_instance) as r:
with remote.get_remote(hive_server) as r:
run.hive_create_warehouse_dir(r)
if c_helper.is_mysql_enable(cluster):
with remote.get_remote(hive_server) as h:
run.hive_copy_shared_conf(
r, edp.get_hive_shared_conf_path('hadoop'))
if c_helper.is_mysql_enable(cluster):
if not oozie or hive_server.hostname() != oozie.hostname():
run.mysql_start(h, hive_server)
run.hive_create_db(h)
run.hive_metastore_start(h)
LOG.info("Hive Metastore server at %s has been started",
hive_server.hostname())
run.mysql_start(r, hive_server)
run.hive_create_db(r)
run.hive_metastore_start(r)
LOG.info("Hive Metastore server at %s has been started",
hive_server.hostname())
LOG.info('Cluster %s has been started successfully' % cluster.name)
self._set_cluster_info(cluster)

View File

@@ -145,13 +145,8 @@ def run_job(job_execution):
creator = workflow_factory.get_creator(job)
# Do other job type specific setup here, for example
# uploading hive configuration
creator.configure_workflow_if_needed(cluster, wf_dir)
wf_xml = creator.get_workflow_xml(job_execution,
input_source,
output_source)
wf_xml = creator.get_workflow_xml(cluster, job_execution,
input_source, output_source)
path_to_workflow = upload_workflow_file(oozie_server,
wf_dir, wf_xml, hdfs_user)

View File

@@ -18,14 +18,11 @@ import six
from savanna import conductor as c
from savanna import context
from savanna.plugins import base as plugin_base
from savanna.plugins.general import utils as u
from savanna.service.edp import hdfs_helper as h
from savanna.service.edp.workflow_creator import hive_workflow
from savanna.service.edp.workflow_creator import java_workflow
from savanna.service.edp.workflow_creator import mapreduce_workflow
from savanna.service.edp.workflow_creator import pig_workflow
from savanna.utils import edp
from savanna.utils import remote
from savanna.utils import xmlutils
@@ -36,9 +33,6 @@ swift_password = 'fs.swift.service.savanna.password'
class BaseFactory(object):
def configure_workflow_if_needed(self, *args, **kwargs):
pass
def _separate_edp_configs(self, job_dict):
configs = {}
edp_configs = {}
@@ -110,7 +104,7 @@ class PigFactory(BaseFactory):
def get_script_name(self, job):
return conductor.job_main_name(context.ctx(), job)
def get_workflow_xml(self, execution, input_data, output_data):
def get_workflow_xml(self, cluster, execution, input_data, output_data):
job_dict = {'configs': self.get_configs(input_data, output_data),
'params': self.get_params(input_data, output_data),
'args': []}
@@ -128,29 +122,24 @@ class HiveFactory(BaseFactory):
super(HiveFactory, self).__init__()
self.name = self.get_script_name(job)
self.job_xml = "hive-site.xml"
def get_script_name(self, job):
return conductor.job_main_name(context.ctx(), job)
def get_workflow_xml(self, execution, input_data, output_data):
def get_workflow_xml(self, cluster, execution, input_data, output_data):
job_dict = {'configs': self.get_configs(input_data, output_data),
'params': self.get_params(input_data, output_data)}
self.update_job_dict(job_dict, execution.job_configs)
plugin = plugin_base.PLUGINS.get_plugin(cluster.plugin_name)
hdfs_user = plugin.get_hdfs_user()
creator = hive_workflow.HiveWorkflowCreator()
creator.build_workflow_xml(self.name,
self.job_xml,
edp.get_hive_shared_conf_path(hdfs_user),
configuration=job_dict['configs'],
params=job_dict['params'])
return creator.get_built_workflow_xml()
def configure_workflow_if_needed(self, cluster, wf_dir):
h_s = u.get_hiveserver(cluster)
plugin = plugin_base.PLUGINS.get_plugin(cluster.plugin_name)
hdfs_user = plugin.get_hdfs_user()
h.copy_from_local(remote.get_remote(h_s),
plugin.get_hive_config_path(), wf_dir, hdfs_user)
class MapReduceFactory(BaseFactory):
@@ -166,7 +155,7 @@ class MapReduceFactory(BaseFactory):
return dict((k[len(prefix):], v) for (k, v) in six.iteritems(
job_dict['edp_configs']) if k.startswith(prefix))
def get_workflow_xml(self, execution, input_data, output_data):
def get_workflow_xml(self, cluster, execution, input_data, output_data):
job_dict = {'configs': self.get_configs(input_data, output_data)}
self.update_job_dict(job_dict, execution.job_configs)
creator = mapreduce_workflow.MapReduceWorkFlowCreator()
@@ -182,7 +171,7 @@ class JavaFactory(BaseFactory):
java_opts = job_dict['edp_configs'].get('edp.java.java_opts', None)
return main_class, java_opts
def get_workflow_xml(self, execution, *args, **kwargs):
def get_workflow_xml(self, cluster, execution, *args, **kwargs):
job_dict = {'configs': {},
'args': []}
self.update_job_dict(job_dict, execution.job_configs)

View File

@@ -18,6 +18,7 @@ import copy
import mock
from savanna import conductor as cond
from savanna.plugins import base as pb
from savanna.service.edp import job_manager
from savanna.service.edp.workflow_creator import workflow_factory
from savanna.tests.unit import base
@@ -35,6 +36,7 @@ class TestJobManager(base.SavannaWithDbTestCase):
def setUp(self):
super(TestJobManager, self).setUp()
p.patch_minidom_writexml()
pb.setup_plugins()
@mock.patch('savanna.utils.remote.get_remote')
@mock.patch('savanna.service.edp.hdfs_helper.create_dir')
@@ -99,7 +101,7 @@ class TestJobManager(base.SavannaWithDbTestCase):
creator = workflow_factory.get_creator(job)
res = creator.get_workflow_xml(job_exec,
res = creator.get_workflow_xml(_create_cluster(), job_exec,
input_data, output_data)
self.assertIn("""
@@ -131,7 +133,7 @@ class TestJobManager(base.SavannaWithDbTestCase):
output_data = _create_data_source('hdfs://user/hadoop/out')
creator = workflow_factory.get_creator(job)
res = creator.get_workflow_xml(job_exec,
res = creator.get_workflow_xml(_create_cluster(), job_exec,
input_data, output_data)
self.assertIn("""
@@ -151,7 +153,7 @@ class TestJobManager(base.SavannaWithDbTestCase):
creator = workflow_factory.get_creator(job)
res = creator.get_workflow_xml(job_exec,
res = creator.get_workflow_xml(_create_cluster(), job_exec,
input_data, output_data)
self.assertIn("""
@@ -173,7 +175,7 @@ class TestJobManager(base.SavannaWithDbTestCase):
creator = workflow_factory.get_creator(job)
res = creator.get_workflow_xml(job_exec,
res = creator.get_workflow_xml(_create_cluster(), job_exec,
input_data, output_data)
self.assertIn("""
@@ -199,7 +201,7 @@ class TestJobManager(base.SavannaWithDbTestCase):
creator = workflow_factory.get_creator(job)
res = creator.get_workflow_xml(job_exec,
res = creator.get_workflow_xml(_create_cluster(), job_exec,
input_data, output_data)
if streaming:
@@ -252,7 +254,7 @@ class TestJobManager(base.SavannaWithDbTestCase):
job, job_exec = _create_all_stack('Java', configs)
creator = workflow_factory.get_creator(job)
res = creator.get_workflow_xml(job_exec)
res = creator.get_workflow_xml(_create_cluster(), job_exec)
self.assertIn("""
<configuration>
@@ -281,11 +283,11 @@ class TestJobManager(base.SavannaWithDbTestCase):
creator = workflow_factory.get_creator(job)
res = creator.get_workflow_xml(job_exec,
res = creator.get_workflow_xml(_create_cluster(), job_exec,
input_data, output_data)
self.assertIn("""
<job-xml>hive-site.xml</job-xml>
<job-xml>/user/hadoop/conf/hive-site.xml</job-xml>
<configuration>
<property>
<name>fs.swift.service.savanna.password</name>
@@ -311,7 +313,7 @@ class TestJobManager(base.SavannaWithDbTestCase):
creator = workflow_factory.get_creator(job)
res = creator.get_workflow_xml(job_exec,
res = creator.get_workflow_xml(_create_cluster(), job_exec,
input_data, output_data)
self.assertIn("""
@@ -402,6 +404,12 @@ def _create_job_binary(id, type):
return binary
def _create_cluster():
cluster = mock.Mock()
cluster.plugin_name = 'vanilla'
return cluster
def _create_data_source(url):
data_source = mock.Mock()
data_source.url = url

View File

@@ -45,3 +45,7 @@ def compare_job_type(job_type, *args, **kwargs):
jtype, jsubtype = split_job_type(job_type)
return jtype in args
def get_hive_shared_conf_path(hdfs_user):
return "/user/%s/conf/hive-site.xml" % hdfs_user