Moved URI getters from plugin SPI to edp_engine

Change-Id: I6dec6f62a8b2c8e1772af862bc86816bfa5d7f4d
Partial-Bug: #1357512
This commit is contained in:
Andrew Lazarev 2014-08-19 13:51:01 -07:00
parent cd81457309
commit d541f606b0
23 changed files with 66 additions and 105 deletions

View File

@ -136,27 +136,6 @@ Returns the instance object for the host running the Oozie server (this service
*Returns*: The Oozie server instance object
get_name_node_uri(cluster)
~~~~~~~~~~~~~~~~~~~~~~~~~~
Returns the URI for access to the Name Node
*Returns*: The Name Node URI
get_oozie_server_uri(cluster)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Returns the URI for access to the Oozie server
*Returns*: The Oozie server URI
get_resource_manager_uri(cluster)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Returns the URI for access to the mapred resource manager (e.g Hadoop 1.x - jobtracker, Hadoop 2.x - yarn resource manager)
*Returns*: The resource manager URI
Object Model
============

View File

@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from sahara.plugins.cdh import utils as cu
from sahara.service.edp import hdfs_helper
from sahara.service.edp.oozie import engine as edp_engine
@ -24,3 +25,15 @@ class EdpOozieEngine(edp_engine.OozieJobEngine):
def create_hdfs_dir(self, remote, dir_name):
hdfs_helper.create_dir_hadoop2(remote, dir_name, self.get_hdfs_user())
def get_oozie_server_uri(self, cluster):
oozie_ip = cu.get_oozie(cluster).management_ip
return 'http://%s:11000/oozie' % oozie_ip
def get_name_node_uri(self, cluster):
namenode_ip = cu.get_namenode(cluster).fqdn()
return 'hdfs://%s:8020' % namenode_ip
def get_resource_manager_uri(self, cluster):
resourcemanager_ip = cu.get_resourcemanager(cluster).fqdn()
return '%s:8032' % resourcemanager_ip

View File

@ -78,18 +78,6 @@ class CDHPluginProvider(p.ProvisioningPluginBase):
def get_oozie_server(self, cluster):
return cu.get_oozie(cluster)
def get_oozie_server_uri(self, cluster):
oozie_ip = cu.get_oozie(cluster).management_ip
return 'http://%s:11000/oozie' % oozie_ip
def get_name_node_uri(self, cluster):
namenode_ip = cu.get_namenode(cluster).fqdn()
return 'hdfs://%s:8020' % namenode_ip
def get_resource_manager_uri(self, cluster):
resourcemanager_ip = cu.get_resourcemanager(cluster).fqdn()
return '%s:8032' % resourcemanager_ip
def _set_cluster_info(self, cluster):
mng = cu.get_manager(cluster)
info = {

View File

@ -151,17 +151,6 @@ class AmbariPlugin(p.ProvisioningPluginBase):
raise ex.InvalidComponentCountException(
'OOZIE_SERVER', '1', oo_count)
def get_resource_manager_uri(self, cluster):
version_handler = (
self.version_factory.get_version_handler(cluster.hadoop_version))
return version_handler.get_resource_manager_uri(cluster)
def get_name_node_uri(self, cluster):
return cluster['info']['HDFS']['NameNode']
def get_oozie_server_uri(self, cluster):
return cluster['info']['JobFlow']['Oozie'] + "/oozie/"
def update_infra(self, cluster):
pass

View File

@ -20,3 +20,9 @@ class EdpOozieEngine(edp_engine.OozieJobEngine):
def get_hdfs_user(self):
return 'hdfs'
def get_name_node_uri(self, cluster):
return cluster['info']['HDFS']['NameNode']
def get_oozie_server_uri(self, cluster):
return cluster['info']['JobFlow']['Oozie'] + "/oozie/"

View File

@ -58,10 +58,6 @@ class AbstractVersionHandler():
def get_services_processor(self):
return
@abc.abstractmethod
def get_resource_manager_uri(self, cluster):
return
@abc.abstractmethod
def get_edp_engine(self, cluster, job_type):
return

View File

@ -21,3 +21,6 @@ class EdpOozieEngine(edp_engine.EdpOozieEngine):
def create_hdfs_dir(self, remote, dir_name):
hdfs_helper.create_dir_hadoop1(remote, dir_name, self.get_hdfs_user())
def get_resource_manager_uri(self, cluster):
return cluster['info']['MapReduce']['JobTracker']

View File

@ -120,9 +120,6 @@ class VersionHandler(avm.AbstractVersionHandler):
def get_services_processor(self):
return services
def get_resource_manager_uri(self, cluster):
return cluster['info']['MapReduce']['JobTracker']
def get_edp_engine(self, cluster, job_type):
if job_type in edp_engine.EdpOozieEngine.get_supported_job_types():
return edp_engine.EdpOozieEngine(cluster)

View File

@ -21,3 +21,6 @@ class EdpOozieEngine(edp_engine.EdpOozieEngine):
def create_hdfs_dir(self, remote, dir_name):
hdfs_helper.create_dir_hadoop2(remote, dir_name, self.get_hdfs_user())
def get_resource_manager_uri(self, cluster):
return cluster['info']['Yarn']['ResourceManager']

View File

@ -108,9 +108,6 @@ class VersionHandler(avm.AbstractVersionHandler):
def get_services_processor(self):
return services
def get_resource_manager_uri(self, cluster):
return cluster['info']['Yarn']['ResourceManager']
def get_edp_engine(self, cluster, job_type):
if job_type in edp_engine.EdpOozieEngine.get_supported_job_types():
return edp_engine.EdpOozieEngine(cluster)

View File

@ -61,18 +61,10 @@ class ProvisioningPluginBase(plugins_base.PluginInterface):
def scale_cluster(self, cluster, instances):
pass
@plugins_base.optional
def get_name_node_uri(self, cluster):
pass
@plugins_base.optional
def get_oozie_server(self, cluster):
pass
@plugins_base.optional
def get_oozie_server_uri(self, cluster):
pass
@plugins_base.optional
def validate_edp(self, cluster):
pass
@ -85,10 +77,6 @@ class ProvisioningPluginBase(plugins_base.PluginInterface):
def get_open_ports(self, node_group):
return []
@plugins_base.optional
def get_resource_manager_uri(self, cluster):
pass
@plugins_base.required_with_default
def decommission_nodes(self, cluster, instances):
pass

View File

@ -53,10 +53,6 @@ class AbstractVersionHandler():
def validate_scaling(self, cluster, existing, additional):
return
@abc.abstractmethod
def get_resource_manager_uri(self, cluster):
return
@abc.abstractmethod
def get_oozie_server(self, cluster):
return

View File

@ -19,3 +19,9 @@ from sahara.service.edp.oozie import engine as edp_engine
class EdpOozieEngine(edp_engine.OozieJobEngine):
def get_hdfs_user(self):
return 'hadoop'
def get_name_node_uri(self, cluster):
return cluster['info']['HDFS']['NameNode']
def get_oozie_server_uri(self, cluster):
return cluster['info']['JobFlow']['Oozie'] + "/oozie/"

View File

@ -21,3 +21,6 @@ class EdpOozieEngine(edp_engine.EdpOozieEngine):
def create_hdfs_dir(self, remote, dir_name):
hdfs_helper.create_dir_hadoop2(remote, dir_name, self.get_hdfs_user())
def get_resource_manager_uri(self, cluster):
return cluster['info']['YARN']['ResourceManager']

View File

@ -33,10 +33,6 @@ class VanillaProvider(p.ProvisioningPluginBase):
def _get_version_handler(self, hadoop_version):
return self.version_factory.get_version_handler(hadoop_version)
def get_resource_manager_uri(self, cluster):
return self._get_version_handler(
cluster.hadoop_version).get_resource_manager_uri(cluster)
def get_node_processes(self, hadoop_version):
return self._get_version_handler(hadoop_version).get_node_processes()
@ -83,12 +79,6 @@ class VanillaProvider(p.ProvisioningPluginBase):
if oo_count != 1:
raise ex.InvalidComponentCountException('oozie', '1', oo_count)
def get_name_node_uri(self, cluster):
return cluster['info']['HDFS']['NameNode']
def get_oozie_server_uri(self, cluster):
return cluster['info']['JobFlow']['Oozie'] + "/oozie/"
def get_edp_engine(self, cluster, job_type):
return self._get_version_handler(
cluster.hadoop_version).get_edp_engine(cluster, job_type)

View File

@ -21,3 +21,6 @@ class EdpOozieEngine(edp_engine.EdpOozieEngine):
def create_hdfs_dir(self, remote, dir_name):
hdfs_helper.create_dir_hadoop1(remote, dir_name, self.get_hdfs_user())
def get_resource_manager_uri(self, cluster):
return cluster['info']['MapReduce']['JobTracker']

View File

@ -55,9 +55,6 @@ class VersionHandler(avm.AbstractVersionHandler):
"Hive": ["hiveserver"]
}
def get_resource_manager_uri(self, cluster):
return cluster['info']['MapReduce']['JobTracker']
def get_oozie_server(self, cluster):
return vu.get_oozie(cluster)

View File

@ -140,9 +140,6 @@ class VersionHandler(avm.AbstractVersionHandler):
def get_oozie_server(self, cluster):
return vu.get_oozie(cluster)
def get_resource_manager_uri(self, cluster):
return cluster['info']['YARN']['ResourceManager']
def get_edp_engine(self, cluster, job_type):
if job_type in edp_engine.EdpOozieEngine.get_supported_job_types():
return edp_engine.EdpOozieEngine(cluster)

View File

@ -109,7 +109,8 @@ class VersionHandler(avm.AbstractVersionHandler):
if rm:
info['YARN'] = {
'Web UI': 'http://%s:%s' % (rm.management_ip, '8088')
'Web UI': 'http://%s:%s' % (rm.management_ip, '8088'),
'ResourceManager': 'http://%s:%s' % (rm.management_ip, '8032')
}
if nn:
@ -134,11 +135,6 @@ class VersionHandler(avm.AbstractVersionHandler):
def get_oozie_server(self, cluster):
return vu.get_oozie(cluster)
def get_resource_manager_uri(self, cluster):
rm = vu.get_resourcemanager(cluster)
return 'http://%(host)s:%(port)s' % {'host': rm.management_ip,
'port': '8032'}
def get_edp_engine(self, cluster, job_type):
if job_type in edp_engine.EdpOozieEngine.get_supported_job_types():
return edp_engine.EdpOozieEngine(cluster)

View File

@ -92,4 +92,4 @@ def get_data_sources(job_execution, job):
def _append_slash_if_needed(path):
if path[-1] != '/':
path += '/'
return path
return path

View File

@ -45,12 +45,12 @@ class OozieJobEngine(base_engine.JobEngine):
self.plugin = job_utils.get_plugin(self.cluster)
def _get_client(self):
return o.OozieClient(self.plugin.get_oozie_server_uri(self.cluster),
return o.OozieClient(self.get_oozie_server_uri(self.cluster),
self.plugin.get_oozie_server(self.cluster))
def _get_oozie_job_params(self, hdfs_user, path_to_workflow):
rm_path = self.plugin.get_resource_manager_uri(self.cluster)
nn_path = self.plugin.get_name_node_uri(self.cluster)
rm_path = self.get_resource_manager_uri(self.cluster)
nn_path = self.get_name_node_uri(self.cluster)
job_parameters = {
"jobTracker": rm_path,
"nameNode": nn_path,
@ -125,6 +125,18 @@ class OozieJobEngine(base_engine.JobEngine):
def create_hdfs_dir(self, remote, dir_name):
pass
@abc.abstractmethod
def get_oozie_server_uri(self, cluster):
pass
@abc.abstractmethod
def get_name_node_uri(self, cluster):
pass
@abc.abstractmethod
def get_resource_manager_uri(self, cluster):
pass
@staticmethod
def get_possible_job_config(job_type):
return workflow_factory.get_possible_job_config(job_type)

View File

@ -44,4 +44,4 @@ class VanillaPluginTest(base.SaharaWithDbTestCase):
create_dir.reset_mock()
plugin.get_edp_engine(cluster, edp.JOB_TYPE_PIG).create_hdfs_dir(
mock.Mock(), '/tmp')
self.assertEqual(1, create_dir.call_count)
self.assertEqual(1, create_dir.call_count)

View File

@ -38,14 +38,7 @@ class TestOozieEngine(base.SaharaTestCase):
res = oje._add_postfix('aba')
self.assertEqual("aba/", res)
@mock.patch('sahara.service.edp.job_utils.get_plugin')
def test_get_oozie_job_params(self, getplugin):
plugin = mock.Mock()
getplugin.return_value = plugin
plugin.get_resource_manager_uri.return_value = 'http://localhost:50030'
plugin.get_name_node_uri.return_value = 'hdfs://localhost:8020'
def test_get_oozie_job_params(self):
oje = FakeOozieJobEngine(u.create_cluster())
job_params = oje._get_oozie_job_params('hadoop', '/tmp')
self.assertEqual('http://localhost:50030', job_params["jobTracker"])
@ -88,3 +81,12 @@ class FakeOozieJobEngine(oe.OozieJobEngine):
def create_hdfs_dir(self, remote, dir_name):
return
def get_oozie_server_uri(self, cluster):
return 'http://localhost:11000/oozie'
def get_name_node_uri(self, cluster):
return 'hdfs://localhost:8020'
def get_resource_manager_uri(self, cluster):
return 'http://localhost:50030'