Made EDP engine plugin-specific

+ Moved 'get_hdfs_user' method from plugin SPI to EDP engine

Further steps: move other EDP-specific methods to the EDP engine

Change-Id: I0537397894012f496ea4abc2661aa8331fbf6bd3
Partial-Bug: #1357512
Andrew Lazarev 2014-08-15 19:12:43 -07:00
parent 822c2f053f
commit 42526b808b
17 changed files with 228 additions and 122 deletions
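
For context, a minimal sketch of the wiring this change introduces. Only OozieJobEngine and its new get_supported_job_types() hook come from this patch; EdpOozieEngine mirrors the new per-plugin modules added below, and MyPluginProvider is an illustrative name, not part of the change.

# Sketch only: hypothetical plugin following the new pattern.
from sahara.plugins import provisioning as p
from sahara.service.edp.oozie import engine as oozie_engine


class EdpOozieEngine(oozie_engine.OozieJobEngine):
    # get_hdfs_user() moves off the plugin SPI and onto the EDP engine.
    def get_hdfs_user(self):
        return 'hdfs'


class MyPluginProvider(p.ProvisioningPluginBase):
    # Other required SPI methods omitted; only the EDP hook is shown.
    def get_edp_engine(self, cluster, job_type):
        # The engine itself advertises which job types it can run.
        if job_type in EdpOozieEngine.get_supported_job_types():
            return EdpOozieEngine(cluster)
        return None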

View File

@ -0,0 +1,22 @@
# Copyright (c) 2014 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from sahara.service.edp.oozie import engine as edp_engine
class EdpOozieEngine(edp_engine.OozieJobEngine):
def get_hdfs_user(self):
return 'hdfs'

View File

@ -17,6 +17,7 @@ from sahara import conductor
from sahara import context
from sahara.plugins.cdh import config_helper as c_helper
from sahara.plugins.cdh import deploy as dp
from sahara.plugins.cdh import edp_engine
from sahara.plugins.cdh import utils as cu
from sahara.plugins.cdh import validation as vl
from sahara.plugins import provisioning as p
@ -74,9 +75,6 @@ class CDHPluginProvider(p.ProvisioningPluginBase):
vl.validate_existing_ng_scaling(cluster, existing)
vl.validate_additional_ng_scaling(cluster, additional)
def get_hdfs_user(self):
return 'hdfs'
def get_oozie_server(self, cluster):
return cu.get_oozie(cluster)
@ -104,3 +102,8 @@ class CDHPluginProvider(p.ProvisioningPluginBase):
ctx = context.ctx()
conductor.cluster_update(ctx, cluster, {'info': info})
def get_edp_engine(self, cluster, job_type):
if job_type in edp_engine.EdpOozieEngine.get_supported_job_types():
return edp_engine.EdpOozieEngine(cluster)
return None

View File

@ -22,6 +22,7 @@ from sahara.i18n import _LI
from sahara.openstack.common import log as logging
from sahara.plugins.general import exceptions as ex
from sahara.plugins.general import utils as u
from sahara.plugins.hdp import edp_engine
from sahara.plugins.hdp import hadoopserver as h
from sahara.plugins.hdp import saharautils as utils
from sahara.plugins.hdp.versions import versionhandlerfactory as vhf
@ -96,9 +97,6 @@ class AmbariPlugin(p.ProvisioningPluginBase):
return node_processes
def get_hdfs_user(self):
return 'hdfs'
def convert(self, config, plugin_name, version, template_name,
cluster_template_create):
handler = self.version_factory.get_version_handler(version)
@ -143,6 +141,11 @@ class AmbariPlugin(p.ProvisioningPluginBase):
def get_oozie_server(self, cluster):
return u.get_instance(cluster, "OOZIE_SERVER")
def get_edp_engine(self, cluster, job_type):
if job_type in edp_engine.EdpOozieEngine.get_supported_job_types():
return edp_engine.EdpOozieEngine(cluster)
return None
def validate_edp(self, cluster):
oo_count = u.get_instances_count(cluster, 'OOZIE_SERVER')
if oo_count != 1:

View File

@ -0,0 +1,22 @@
# Copyright (c) 2014 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from sahara.service.edp.oozie import engine as edp_engine
class EdpOozieEngine(edp_engine.OozieJobEngine):
def get_hdfs_user(self):
return 'hdfs'

View File

@ -29,10 +29,6 @@ class ProvisioningPluginBase(plugins_base.PluginInterface):
def get_configs(self, hadoop_version):
pass
@plugins_base.optional
def get_hdfs_user(self):
pass
@plugins_base.required
def get_node_processes(self, hadoop_version):
pass
@ -81,36 +77,9 @@ class ProvisioningPluginBase(plugins_base.PluginInterface):
def validate_edp(self, cluster):
pass
@plugins_base.required_with_default
def get_edp_engine(self, cluster, job_type, default_engines):
'''Default implementation to select an EDP job engine
This method chooses an EDP implementation based on job type. It should
be overloaded by a plugin to allow different behavior or the selection
of a custom EDP implementation.
The default_engines parameter is a list of default EDP implementations.
Each item in the list is a dictionary, and each dictionary has the
following elements:
name (a simple name for the implementation)
job_types (a list of EDP job types supported by the implementation)
engine (a class derived from sahara.service.edp.base_engine.JobEngine)
This method will choose the first engine that it finds which lists the
job_type value in the job_types element. An instance of that engine
will be allocated and returned.
:param cluster: a Sahara cluster object
:param job_type: an EDP job type string
:param default_engines: a list of dictionaries describing the default
implementations.
:returns: an instance of a class derived from
sahara.service.edp.base_engine.JobEngine or None
'''
for eng in default_engines:
if job_type in eng["job_types"]:
return eng["engine"](cluster)
@plugins_base.optional
def get_edp_engine(self, cluster, job_type):
pass
@plugins_base.optional
def get_resource_manager_uri(self, cluster):

View File

@ -0,0 +1,20 @@
# Copyright (c) 2014 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from sahara.service.edp.spark import engine as edp_engine
class EdpEngine(edp_engine.SparkJobEngine):
pass

View File

@ -26,6 +26,7 @@ from sahara.plugins.general import exceptions as ex
from sahara.plugins.general import utils
from sahara.plugins import provisioning as p
from sahara.plugins.spark import config_helper as c_helper
from sahara.plugins.spark import edp_engine
from sahara.plugins.spark import run_scripts as run
from sahara.plugins.spark import scaling as sc
from sahara.topology import topology_helper as th
@ -418,35 +419,11 @@ class SparkProvider(p.ProvisioningPluginBase):
"replicas (replication factor is %s)") %
rep_factor)
def get_edp_engine(self, cluster, job_type, default_engines):
'''Select an EDP engine for Spark standalone deployment
The default_engines parameter is a list of default EDP implementations.
Each item in the list is a dictionary, and each dictionary has the
following elements:
name (a simple name for the implementation)
job_types (a list of EDP job types supported by the implementation)
engine (a class derived from sahara.service.edp.base_engine.JobEngine)
This method will choose the first engine that it finds from the default
list which meets the following criteria:
eng['name'] == spark
eng['job_types'] == job_type
An instance of that engine will be allocated and returned.
:param cluster: a Sahara cluster object
:param job_type: an EDP job type string
:param default_engines: a list of dictionaries describing the default
implementations.
:returns: an instance of a class derived from
sahara.service.edp.base_engine.JobEngine or None
'''
# We know that spark EDP requires at least spark 1.0.0
# to have spark-submit. Reject anything else.
if cluster.hadoop_version >= "1.0.0":
for eng in default_engines:
if self.name == eng['name'] and job_type in eng["job_types"]:
return eng["engine"](cluster)
def get_edp_engine(self, cluster, job_type):
if cluster.hadoop_version < "1.0.0":
return
if job_type in edp_engine.EdpEngine.get_supported_job_types():
return edp_engine.EdpEngine(cluster)
return None

View File

@ -0,0 +1,21 @@
# Copyright (c) 2014 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from sahara.service.edp.oozie import engine as edp_engine
class EdpOozieEngine(edp_engine.OozieJobEngine):
def get_hdfs_user(self):
return 'hadoop'

View File

@ -17,6 +17,7 @@ from sahara.i18n import _
from sahara.plugins.general import exceptions as ex
from sahara.plugins.general import utils as u
from sahara.plugins import provisioning as p
from sahara.plugins.vanilla import edp_engine
from sahara.plugins.vanilla import versionfactory as vhf
@ -37,9 +38,6 @@ class VanillaProvider(p.ProvisioningPluginBase):
return self._get_version_handler(
cluster.hadoop_version).get_resource_manager_uri(cluster)
def get_hdfs_user(self):
return 'hadoop'
def get_node_processes(self, hadoop_version):
return self._get_version_handler(hadoop_version).get_node_processes()
@ -91,3 +89,8 @@ class VanillaProvider(p.ProvisioningPluginBase):
def get_oozie_server_uri(self, cluster):
return cluster['info']['JobFlow']['Oozie'] + "/oozie/"
def get_edp_engine(self, cluster, job_type):
if job_type in edp_engine.EdpOozieEngine.get_supported_job_types():
return edp_engine.EdpOozieEngine(cluster)
return None

View File

@ -41,3 +41,8 @@ class JobEngine(object):
@abc.abstractmethod
def get_possible_job_config(job_type):
return None
@staticmethod
@abc.abstractmethod
def get_supported_job_types():
return None
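
JobEngine now declares get_supported_job_types() abstract alongside get_possible_job_config(). A sketch of a hypothetical engine satisfying both static hooks (the name is illustrative, and any other abstract methods on JobEngine are omitted here):

from sahara.service.edp import base_engine
from sahara.utils import edp


class CustomJobEngine(base_engine.JobEngine):  # illustrative name only
    @staticmethod
    def get_possible_job_config(job_type):
        # Shape mirrors the Spark engine's default hints.
        return {'job_config': {'configs': [], 'args': []}}

    @staticmethod
    def get_supported_job_types():
        return [edp.JOB_TYPE_JAVA]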

View File

@ -35,24 +35,8 @@ CONF = cfg.CONF
conductor = c.API
def _make_engine(name, job_types, engine_class):
return {"name": name,
"job_types": job_types,
"engine": engine_class}
default_engines = [_make_engine("oozie",
[edp.JOB_TYPE_HIVE,
edp.JOB_TYPE_JAVA,
edp.JOB_TYPE_MAPREDUCE,
edp.JOB_TYPE_MAPREDUCE_STREAMING,
edp.JOB_TYPE_PIG],
oozie_engine.OozieJobEngine),
_make_engine("spark",
[edp.JOB_TYPE_JAVA,
edp.JOB_TYPE_SPARK],
spark_engine.SparkJobEngine)
]
ENGINES = [oozie_engine.OozieJobEngine,
spark_engine.SparkJobEngine]
def _get_job_type(job_execution):
@ -62,8 +46,7 @@ def _get_job_type(job_execution):
def _get_job_engine(cluster, job_execution):
return job_utils.get_plugin(cluster).get_edp_engine(cluster,
_get_job_type(
job_execution),
default_engines)
job_execution))
def _write_job_status(job_execution, job_info):
@ -184,15 +167,6 @@ def update_job_statuses():
def get_job_config_hints(job_type):
# TODO(tmckay) We need plugin-specific config hints
# (not a new problem). At the moment we don't have a plugin
# or cluster argument in this call so we can only go by
# job type.
# Since we currently have a single engine for each
# job type (Spark will support Java only temporarily)
# we can just key off of job type
for eng in default_engines:
if job_type in eng["job_types"]:
return eng["engine"].get_possible_job_config(job_type)
for eng in ENGINES:
if job_type in eng.get_supported_job_types():
return eng.get_possible_job_config(job_type)
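
The hints lookup now keys off the engines themselves rather than the removed default_engines dictionaries. A standalone sketch of that dispatch; config_hints is an illustrative name for what get_job_config_hints() does:

from sahara.service.edp.oozie import engine as oozie_engine
from sahara.service.edp.spark import engine as spark_engine

ENGINES = [oozie_engine.OozieJobEngine,
           spark_engine.SparkJobEngine]


def config_hints(job_type):
    # The first engine advertising the job type supplies the hints.
    for eng in ENGINES:
        if job_type in eng.get_supported_job_types():
            return eng.get_possible_job_config(job_type)
    return None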

View File

@ -13,8 +13,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
from oslo.config import cfg
import six
from sahara import conductor as c
from sahara import context
@ -23,6 +25,7 @@ from sahara.service.edp import hdfs_helper as h
from sahara.service.edp import job_utils
from sahara.service.edp.oozie import oozie as o
from sahara.service.edp.oozie.workflow_creator import workflow_factory
from sahara.utils import edp
from sahara.utils import remote
from sahara.utils import xmlutils as x
@ -31,9 +34,10 @@ CONF = cfg.CONF
conductor = c.API
@six.add_metaclass(abc.ABCMeta)
class OozieJobEngine(base_engine.JobEngine):
def __init__(self, cluster):
self.cluster = cluster
self.plugin = job_utils.get_plugin(self.cluster)
@ -79,7 +83,7 @@ class OozieJobEngine(base_engine.JobEngine):
h.configure_cluster_for_hdfs(self.cluster, data_source)
break
hdfs_user = self.plugin.get_hdfs_user()
hdfs_user = self.get_hdfs_user()
# TODO(tmckay): this should probably be "get_namenode"
# but that call does not exist in the plugin api now.
@ -92,7 +96,8 @@ class OozieJobEngine(base_engine.JobEngine):
job, hdfs_user)
wf_xml = workflow_factory.get_workflow_xml(
job, self.cluster, job_execution, input_source, output_source)
job, self.cluster, job_execution, input_source, output_source,
hdfs_user)
path_to_workflow = self._upload_workflow_file(oozie_server, wf_dir,
wf_xml, hdfs_user)
@ -111,6 +116,18 @@ class OozieJobEngine(base_engine.JobEngine):
status = None
return (oozie_job_id, status, None)
@abc.abstractmethod
def get_hdfs_user(self):
pass
@staticmethod
def get_possible_job_config(job_type):
return workflow_factory.get_possible_job_config(job_type)
@staticmethod
def get_supported_job_types():
return [edp.JOB_TYPE_HIVE,
edp.JOB_TYPE_JAVA,
edp.JOB_TYPE_MAPREDUCE,
edp.JOB_TYPE_MAPREDUCE_STREAMING,
edp.JOB_TYPE_PIG]

View File

@ -18,7 +18,6 @@ import six.moves.urllib.parse as urlparse
from sahara import conductor as c
from sahara import context
from sahara.plugins import base as plugin_base
from sahara.service.edp.oozie.workflow_creator import hive_workflow
from sahara.service.edp.oozie.workflow_creator import java_workflow
from sahara.service.edp.oozie.workflow_creator import mapreduce_workflow
@ -127,7 +126,8 @@ class PigFactory(BaseFactory):
def get_script_name(self, job):
return conductor.job_main_name(context.ctx(), job)
def get_workflow_xml(self, cluster, execution, input_data, output_data):
def get_workflow_xml(self, cluster, execution, input_data, output_data,
hdfs_user):
job_dict = {'configs': self.get_configs(input_data, output_data),
'params': self.get_params(input_data, output_data),
'args': []}
@ -149,13 +149,12 @@ class HiveFactory(BaseFactory):
def get_script_name(self, job):
return conductor.job_main_name(context.ctx(), job)
def get_workflow_xml(self, cluster, execution, input_data, output_data):
def get_workflow_xml(self, cluster, execution, input_data, output_data,
hdfs_user):
job_dict = {'configs': self.get_configs(input_data, output_data),
'params': self.get_params(input_data, output_data)}
self.update_job_dict(job_dict, execution.job_configs)
plugin = plugin_base.PLUGINS.get_plugin(cluster.plugin_name)
hdfs_user = plugin.get_hdfs_user()
creator = hive_workflow.HiveWorkflowCreator()
creator.build_workflow_xml(self.name,
edp.get_hive_shared_conf_path(hdfs_user),
@ -178,7 +177,8 @@ class MapReduceFactory(BaseFactory):
return dict((k[len(prefix):], v) for (k, v) in six.iteritems(
job_dict['edp_configs']) if k.startswith(prefix))
def get_workflow_xml(self, cluster, execution, input_data, output_data):
def get_workflow_xml(self, cluster, execution, input_data, output_data,
hdfs_user):
job_dict = {'configs': self.get_configs(input_data, output_data)}
self.update_job_dict(job_dict, execution.job_configs)
creator = mapreduce_workflow.MapReduceWorkFlowCreator()

View File

@ -188,3 +188,7 @@ class SparkJobEngine(base_engine.JobEngine):
@staticmethod
def get_possible_job_config(job_type):
return {'job_config': {'configs': [], 'args': []}}
@staticmethod
def get_supported_job_types():
return [edp.JOB_TYPE_SPARK]

View File

@ -0,0 +1,55 @@
# Copyright (c) 2013 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from sahara import conductor as cond
from sahara import context
from sahara.plugins import base as pb
from sahara.service.edp.spark import engine
from sahara.tests.unit import base
from sahara.utils import edp
conductor = cond.API
class SparkPluginTest(base.SaharaWithDbTestCase):
def setUp(self):
super(SparkPluginTest, self).setUp()
self.override_config("plugins", ["spark"])
pb.setup_plugins()
def test_plugin09_no_edp_engine(self):
cluster_dict = {
'name': 'cluster',
'plugin_name': 'spark',
'hadoop_version': '0.9.1',
'default_image_id': 'image'}
cluster = conductor.cluster_create(context.ctx(), cluster_dict)
plugin = pb.PLUGINS.get_plugin(cluster.plugin_name)
self.assertIsNone(plugin.get_edp_engine(cluster, edp.JOB_TYPE_SPARK))
def test_plugin10_edp_engine(self):
cluster_dict = {
'name': 'cluster',
'plugin_name': 'spark',
'hadoop_version': '1.0.0',
'default_image_id': 'image'}
cluster = conductor.cluster_create(context.ctx(), cluster_dict)
plugin = pb.PLUGINS.get_plugin(cluster.plugin_name)
self.assertIsInstance(
plugin.get_edp_engine(cluster, edp.JOB_TYPE_SPARK),
engine.SparkJobEngine)

View File

@ -171,7 +171,8 @@ class TestJobManager(base.SaharaWithDbTestCase):
output_data = _create_data_source('swift://ex/o')
res = workflow_factory.get_workflow_xml(
job, _create_cluster(), job_exec, input_data, output_data)
job, _create_cluster(), job_exec, input_data, output_data,
'hadoop')
self.assertIn("""
<param>INPUT=swift://ex.sahara/i</param>
@ -202,7 +203,8 @@ class TestJobManager(base.SaharaWithDbTestCase):
output_data = _create_data_source('hdfs://user/hadoop/out')
res = workflow_factory.get_workflow_xml(
job, _create_cluster(), job_exec, input_data, output_data)
job, _create_cluster(), job_exec, input_data, output_data,
'hadoop')
self.assertIn("""
<configuration>
@ -220,7 +222,8 @@ class TestJobManager(base.SaharaWithDbTestCase):
output_data = _create_data_source('swift://ex/o')
res = workflow_factory.get_workflow_xml(
job, _create_cluster(), job_exec, input_data, output_data)
job, _create_cluster(), job_exec, input_data, output_data,
'hadoop')
self.assertIn("""
<configuration>
@ -240,7 +243,8 @@ class TestJobManager(base.SaharaWithDbTestCase):
output_data = _create_data_source('hdfs://user/hadoop/out')
res = workflow_factory.get_workflow_xml(
job, _create_cluster(), job_exec, input_data, output_data)
job, _create_cluster(), job_exec, input_data, output_data,
'hadoop')
self.assertIn("""
<configuration>
@ -264,7 +268,8 @@ class TestJobManager(base.SaharaWithDbTestCase):
output_data = _create_data_source('swift://ex/o')
res = workflow_factory.get_workflow_xml(
job, _create_cluster(), job_exec, input_data, output_data)
job, _create_cluster(), job_exec, input_data, output_data,
'hadoop')
if streaming:
self.assertIn("""
@ -344,7 +349,8 @@ class TestJobManager(base.SaharaWithDbTestCase):
output_data = _create_data_source('swift://ex/o')
res = workflow_factory.get_workflow_xml(
job, _create_cluster(), job_exec, input_data, output_data)
job, _create_cluster(), job_exec, input_data, output_data,
'hadoop')
self.assertIn("""
<job-xml>/user/hadoop/conf/hive-site.xml</job-xml>
@ -372,7 +378,8 @@ class TestJobManager(base.SaharaWithDbTestCase):
job_type, configs={"configs": {'c': 'f'}})
res = workflow_factory.get_workflow_xml(
job, _create_cluster(), job_exec, input_data, output_data)
job, _create_cluster(), job_exec, input_data, output_data,
'hadoop')
self.assertIn("""
<property>
@ -512,6 +519,10 @@ class TestJobManager(base.SaharaWithDbTestCase):
@mock.patch('sahara.service.edp.job_utils.get_plugin')
def test_get_oozie_job_params(self, getplugin):
class OozieJobEngine(oozie_engine.OozieJobEngine):
def get_hdfs_user(self):
return 'hadoop'
plugin = mock.Mock()
getplugin.return_value = plugin
@ -519,7 +530,7 @@ class TestJobManager(base.SaharaWithDbTestCase):
plugin.get_name_node_uri.return_value = 'hdfs://localhost:8020'
cluster = _create_cluster()
oje = oozie_engine.OozieJobEngine(cluster)
oje = OozieJobEngine(cluster)
job_params = oje._get_oozie_job_params('hadoop', '/tmp')
self.assertEqual('http://localhost:50030', job_params["jobTracker"])
self.assertEqual('hdfs://localhost:8020', job_params["nameNode"])