diff --git a/doc/source/devref/plugin.spi.rst b/doc/source/devref/plugin.spi.rst index a2c277ce..a2e93100 100644 --- a/doc/source/devref/plugin.spi.rst +++ b/doc/source/devref/plugin.spi.rst @@ -136,27 +136,6 @@ Returns the instance object for the host running the Oozie server (this service *Returns*: The Oozie server instance object -get_name_node_uri(cluster) -~~~~~~~~~~~~~~~~~~~~~~~~~~ - -Returns the URI for access to the Name Node - -*Returns*: The Name Node URI - -get_oozie_server_uri(cluster) -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -Returns the URI for access to the Oozie server - -*Returns*: The Oozie server URI - -get_resource_manager_uri(cluster) -~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ - -Returns the URI for access to the mapred resource manager (e.g Hadoop 1.x - jobtracker, Hadoop 2.x - yarn resource manager) - -*Returns*: The resource manager URI - Object Model ============ diff --git a/sahara/plugins/cdh/edp_engine.py b/sahara/plugins/cdh/edp_engine.py index e3c79312..991500cf 100644 --- a/sahara/plugins/cdh/edp_engine.py +++ b/sahara/plugins/cdh/edp_engine.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from sahara.plugins.cdh import utils as cu from sahara.service.edp import hdfs_helper from sahara.service.edp.oozie import engine as edp_engine @@ -24,3 +25,15 @@ class EdpOozieEngine(edp_engine.OozieJobEngine): def create_hdfs_dir(self, remote, dir_name): hdfs_helper.create_dir_hadoop2(remote, dir_name, self.get_hdfs_user()) + + def get_oozie_server_uri(self, cluster): + oozie_ip = cu.get_oozie(cluster).management_ip + return 'http://%s:11000/oozie' % oozie_ip + + def get_name_node_uri(self, cluster): + namenode_ip = cu.get_namenode(cluster).fqdn() + return 'hdfs://%s:8020' % namenode_ip + + def get_resource_manager_uri(self, cluster): + resourcemanager_ip = cu.get_resourcemanager(cluster).fqdn() + return '%s:8032' % resourcemanager_ip diff --git a/sahara/plugins/cdh/plugin.py b/sahara/plugins/cdh/plugin.py index d0bf63c8..78803079 100644 --- a/sahara/plugins/cdh/plugin.py +++ b/sahara/plugins/cdh/plugin.py @@ -78,18 +78,6 @@ class CDHPluginProvider(p.ProvisioningPluginBase): def get_oozie_server(self, cluster): return cu.get_oozie(cluster) - def get_oozie_server_uri(self, cluster): - oozie_ip = cu.get_oozie(cluster).management_ip - return 'http://%s:11000/oozie' % oozie_ip - - def get_name_node_uri(self, cluster): - namenode_ip = cu.get_namenode(cluster).fqdn() - return 'hdfs://%s:8020' % namenode_ip - - def get_resource_manager_uri(self, cluster): - resourcemanager_ip = cu.get_resourcemanager(cluster).fqdn() - return '%s:8032' % resourcemanager_ip - def _set_cluster_info(self, cluster): mng = cu.get_manager(cluster) info = { diff --git a/sahara/plugins/hdp/ambariplugin.py b/sahara/plugins/hdp/ambariplugin.py index 2fcd3cca..e6ba0d1e 100644 --- a/sahara/plugins/hdp/ambariplugin.py +++ b/sahara/plugins/hdp/ambariplugin.py @@ -151,17 +151,6 @@ class AmbariPlugin(p.ProvisioningPluginBase): raise ex.InvalidComponentCountException( 'OOZIE_SERVER', '1', oo_count) - def get_resource_manager_uri(self, cluster): - version_handler = ( - self.version_factory.get_version_handler(cluster.hadoop_version)) - return version_handler.get_resource_manager_uri(cluster) - - def get_name_node_uri(self, cluster): - return cluster['info']['HDFS']['NameNode'] - - def get_oozie_server_uri(self, cluster): - return cluster['info']['JobFlow']['Oozie'] + "/oozie/" - def update_infra(self, cluster): pass diff --git a/sahara/plugins/hdp/edp_engine.py b/sahara/plugins/hdp/edp_engine.py index 8a88098a..e9d94e6a 100644 --- a/sahara/plugins/hdp/edp_engine.py +++ b/sahara/plugins/hdp/edp_engine.py @@ -20,3 +20,9 @@ class EdpOozieEngine(edp_engine.OozieJobEngine): def get_hdfs_user(self): return 'hdfs' + + def get_name_node_uri(self, cluster): + return cluster['info']['HDFS']['NameNode'] + + def get_oozie_server_uri(self, cluster): + return cluster['info']['JobFlow']['Oozie'] + "/oozie/" diff --git a/sahara/plugins/hdp/versions/abstractversionhandler.py b/sahara/plugins/hdp/versions/abstractversionhandler.py index 6a537184..00494719 100644 --- a/sahara/plugins/hdp/versions/abstractversionhandler.py +++ b/sahara/plugins/hdp/versions/abstractversionhandler.py @@ -58,10 +58,6 @@ class AbstractVersionHandler(): def get_services_processor(self): return - @abc.abstractmethod - def get_resource_manager_uri(self, cluster): - return - @abc.abstractmethod def get_edp_engine(self, cluster, job_type): return diff --git a/sahara/plugins/hdp/versions/version_1_3_2/edp_engine.py b/sahara/plugins/hdp/versions/version_1_3_2/edp_engine.py index 4b98469f..278569c1 100644 --- a/sahara/plugins/hdp/versions/version_1_3_2/edp_engine.py +++ b/sahara/plugins/hdp/versions/version_1_3_2/edp_engine.py @@ -21,3 +21,6 @@ class EdpOozieEngine(edp_engine.EdpOozieEngine): def create_hdfs_dir(self, remote, dir_name): hdfs_helper.create_dir_hadoop1(remote, dir_name, self.get_hdfs_user()) + + def get_resource_manager_uri(self, cluster): + return cluster['info']['MapReduce']['JobTracker'] diff --git a/sahara/plugins/hdp/versions/version_1_3_2/versionhandler.py b/sahara/plugins/hdp/versions/version_1_3_2/versionhandler.py index de715499..af07c2e9 100644 --- a/sahara/plugins/hdp/versions/version_1_3_2/versionhandler.py +++ b/sahara/plugins/hdp/versions/version_1_3_2/versionhandler.py @@ -120,9 +120,6 @@ class VersionHandler(avm.AbstractVersionHandler): def get_services_processor(self): return services - def get_resource_manager_uri(self, cluster): - return cluster['info']['MapReduce']['JobTracker'] - def get_edp_engine(self, cluster, job_type): if job_type in edp_engine.EdpOozieEngine.get_supported_job_types(): return edp_engine.EdpOozieEngine(cluster) diff --git a/sahara/plugins/hdp/versions/version_2_0_6/edp_engine.py b/sahara/plugins/hdp/versions/version_2_0_6/edp_engine.py index afbe2f5d..ca9caecb 100644 --- a/sahara/plugins/hdp/versions/version_2_0_6/edp_engine.py +++ b/sahara/plugins/hdp/versions/version_2_0_6/edp_engine.py @@ -21,3 +21,6 @@ class EdpOozieEngine(edp_engine.EdpOozieEngine): def create_hdfs_dir(self, remote, dir_name): hdfs_helper.create_dir_hadoop2(remote, dir_name, self.get_hdfs_user()) + + def get_resource_manager_uri(self, cluster): + return cluster['info']['Yarn']['ResourceManager'] diff --git a/sahara/plugins/hdp/versions/version_2_0_6/versionhandler.py b/sahara/plugins/hdp/versions/version_2_0_6/versionhandler.py index f573719a..b925d740 100644 --- a/sahara/plugins/hdp/versions/version_2_0_6/versionhandler.py +++ b/sahara/plugins/hdp/versions/version_2_0_6/versionhandler.py @@ -108,9 +108,6 @@ class VersionHandler(avm.AbstractVersionHandler): def get_services_processor(self): return services - def get_resource_manager_uri(self, cluster): - return cluster['info']['Yarn']['ResourceManager'] - def get_edp_engine(self, cluster, job_type): if job_type in edp_engine.EdpOozieEngine.get_supported_job_types(): return edp_engine.EdpOozieEngine(cluster) diff --git a/sahara/plugins/provisioning.py b/sahara/plugins/provisioning.py index 2233e1e7..0c140ac9 100644 --- a/sahara/plugins/provisioning.py +++ b/sahara/plugins/provisioning.py @@ -61,18 +61,10 @@ class ProvisioningPluginBase(plugins_base.PluginInterface): def scale_cluster(self, cluster, instances): pass - @plugins_base.optional - def get_name_node_uri(self, cluster): - pass - @plugins_base.optional def get_oozie_server(self, cluster): pass - @plugins_base.optional - def get_oozie_server_uri(self, cluster): - pass - @plugins_base.optional def validate_edp(self, cluster): pass @@ -85,10 +77,6 @@ class ProvisioningPluginBase(plugins_base.PluginInterface): def get_open_ports(self, node_group): return [] - @plugins_base.optional - def get_resource_manager_uri(self, cluster): - pass - @plugins_base.required_with_default def decommission_nodes(self, cluster, instances): pass diff --git a/sahara/plugins/vanilla/abstractversionhandler.py b/sahara/plugins/vanilla/abstractversionhandler.py index 59c82942..1c23a75c 100644 --- a/sahara/plugins/vanilla/abstractversionhandler.py +++ b/sahara/plugins/vanilla/abstractversionhandler.py @@ -53,10 +53,6 @@ class AbstractVersionHandler(): def validate_scaling(self, cluster, existing, additional): return - @abc.abstractmethod - def get_resource_manager_uri(self, cluster): - return - @abc.abstractmethod def get_oozie_server(self, cluster): return diff --git a/sahara/plugins/vanilla/edp_engine.py b/sahara/plugins/vanilla/edp_engine.py index 23413419..79b840f6 100644 --- a/sahara/plugins/vanilla/edp_engine.py +++ b/sahara/plugins/vanilla/edp_engine.py @@ -19,3 +19,9 @@ from sahara.service.edp.oozie import engine as edp_engine class EdpOozieEngine(edp_engine.OozieJobEngine): def get_hdfs_user(self): return 'hadoop' + + def get_name_node_uri(self, cluster): + return cluster['info']['HDFS']['NameNode'] + + def get_oozie_server_uri(self, cluster): + return cluster['info']['JobFlow']['Oozie'] + "/oozie/" diff --git a/sahara/plugins/vanilla/hadoop2/edp_engine.py b/sahara/plugins/vanilla/hadoop2/edp_engine.py index f0768daf..8698db7d 100644 --- a/sahara/plugins/vanilla/hadoop2/edp_engine.py +++ b/sahara/plugins/vanilla/hadoop2/edp_engine.py @@ -21,3 +21,6 @@ class EdpOozieEngine(edp_engine.EdpOozieEngine): def create_hdfs_dir(self, remote, dir_name): hdfs_helper.create_dir_hadoop2(remote, dir_name, self.get_hdfs_user()) + + def get_resource_manager_uri(self, cluster): + return cluster['info']['YARN']['ResourceManager'] diff --git a/sahara/plugins/vanilla/plugin.py b/sahara/plugins/vanilla/plugin.py index a5206aed..d81c1d39 100644 --- a/sahara/plugins/vanilla/plugin.py +++ b/sahara/plugins/vanilla/plugin.py @@ -33,10 +33,6 @@ class VanillaProvider(p.ProvisioningPluginBase): def _get_version_handler(self, hadoop_version): return self.version_factory.get_version_handler(hadoop_version) - def get_resource_manager_uri(self, cluster): - return self._get_version_handler( - cluster.hadoop_version).get_resource_manager_uri(cluster) - def get_node_processes(self, hadoop_version): return self._get_version_handler(hadoop_version).get_node_processes() @@ -83,12 +79,6 @@ class VanillaProvider(p.ProvisioningPluginBase): if oo_count != 1: raise ex.InvalidComponentCountException('oozie', '1', oo_count) - def get_name_node_uri(self, cluster): - return cluster['info']['HDFS']['NameNode'] - - def get_oozie_server_uri(self, cluster): - return cluster['info']['JobFlow']['Oozie'] + "/oozie/" - def get_edp_engine(self, cluster, job_type): return self._get_version_handler( cluster.hadoop_version).get_edp_engine(cluster, job_type) diff --git a/sahara/plugins/vanilla/v1_2_1/edp_engine.py b/sahara/plugins/vanilla/v1_2_1/edp_engine.py index fc98b96a..d77cc1e0 100644 --- a/sahara/plugins/vanilla/v1_2_1/edp_engine.py +++ b/sahara/plugins/vanilla/v1_2_1/edp_engine.py @@ -21,3 +21,6 @@ class EdpOozieEngine(edp_engine.EdpOozieEngine): def create_hdfs_dir(self, remote, dir_name): hdfs_helper.create_dir_hadoop1(remote, dir_name, self.get_hdfs_user()) + + def get_resource_manager_uri(self, cluster): + return cluster['info']['MapReduce']['JobTracker'] diff --git a/sahara/plugins/vanilla/v1_2_1/versionhandler.py b/sahara/plugins/vanilla/v1_2_1/versionhandler.py index 4835ba9c..e549e124 100644 --- a/sahara/plugins/vanilla/v1_2_1/versionhandler.py +++ b/sahara/plugins/vanilla/v1_2_1/versionhandler.py @@ -55,9 +55,6 @@ class VersionHandler(avm.AbstractVersionHandler): "Hive": ["hiveserver"] } - def get_resource_manager_uri(self, cluster): - return cluster['info']['MapReduce']['JobTracker'] - def get_oozie_server(self, cluster): return vu.get_oozie(cluster) diff --git a/sahara/plugins/vanilla/v2_3_0/versionhandler.py b/sahara/plugins/vanilla/v2_3_0/versionhandler.py index 53264b7f..1b703f44 100644 --- a/sahara/plugins/vanilla/v2_3_0/versionhandler.py +++ b/sahara/plugins/vanilla/v2_3_0/versionhandler.py @@ -140,9 +140,6 @@ class VersionHandler(avm.AbstractVersionHandler): def get_oozie_server(self, cluster): return vu.get_oozie(cluster) - def get_resource_manager_uri(self, cluster): - return cluster['info']['YARN']['ResourceManager'] - def get_edp_engine(self, cluster, job_type): if job_type in edp_engine.EdpOozieEngine.get_supported_job_types(): return edp_engine.EdpOozieEngine(cluster) diff --git a/sahara/plugins/vanilla/v2_4_1/versionhandler.py b/sahara/plugins/vanilla/v2_4_1/versionhandler.py index bd9f075d..9d2d93fd 100644 --- a/sahara/plugins/vanilla/v2_4_1/versionhandler.py +++ b/sahara/plugins/vanilla/v2_4_1/versionhandler.py @@ -109,7 +109,8 @@ class VersionHandler(avm.AbstractVersionHandler): if rm: info['YARN'] = { - 'Web UI': 'http://%s:%s' % (rm.management_ip, '8088') + 'Web UI': 'http://%s:%s' % (rm.management_ip, '8088'), + 'ResourceManager': 'http://%s:%s' % (rm.management_ip, '8032') } if nn: @@ -134,11 +135,6 @@ class VersionHandler(avm.AbstractVersionHandler): def get_oozie_server(self, cluster): return vu.get_oozie(cluster) - def get_resource_manager_uri(self, cluster): - rm = vu.get_resourcemanager(cluster) - return 'http://%(host)s:%(port)s' % {'host': rm.management_ip, - 'port': '8032'} - def get_edp_engine(self, cluster, job_type): if job_type in edp_engine.EdpOozieEngine.get_supported_job_types(): return edp_engine.EdpOozieEngine(cluster) diff --git a/sahara/service/edp/job_utils.py b/sahara/service/edp/job_utils.py index 3409c71a..ceb62334 100644 --- a/sahara/service/edp/job_utils.py +++ b/sahara/service/edp/job_utils.py @@ -92,4 +92,4 @@ def get_data_sources(job_execution, job): def _append_slash_if_needed(path): if path[-1] != '/': path += '/' - return path \ No newline at end of file + return path diff --git a/sahara/service/edp/oozie/engine.py b/sahara/service/edp/oozie/engine.py index 24450baa..8ae2e606 100644 --- a/sahara/service/edp/oozie/engine.py +++ b/sahara/service/edp/oozie/engine.py @@ -45,12 +45,12 @@ class OozieJobEngine(base_engine.JobEngine): self.plugin = job_utils.get_plugin(self.cluster) def _get_client(self): - return o.OozieClient(self.plugin.get_oozie_server_uri(self.cluster), + return o.OozieClient(self.get_oozie_server_uri(self.cluster), self.plugin.get_oozie_server(self.cluster)) def _get_oozie_job_params(self, hdfs_user, path_to_workflow): - rm_path = self.plugin.get_resource_manager_uri(self.cluster) - nn_path = self.plugin.get_name_node_uri(self.cluster) + rm_path = self.get_resource_manager_uri(self.cluster) + nn_path = self.get_name_node_uri(self.cluster) job_parameters = { "jobTracker": rm_path, "nameNode": nn_path, @@ -125,6 +125,18 @@ class OozieJobEngine(base_engine.JobEngine): def create_hdfs_dir(self, remote, dir_name): pass + @abc.abstractmethod + def get_oozie_server_uri(self, cluster): + pass + + @abc.abstractmethod + def get_name_node_uri(self, cluster): + pass + + @abc.abstractmethod + def get_resource_manager_uri(self, cluster): + pass + @staticmethod def get_possible_job_config(job_type): return workflow_factory.get_possible_job_config(job_type) diff --git a/sahara/tests/unit/plugins/vanilla/hadoop2/test_plugin.py b/sahara/tests/unit/plugins/vanilla/hadoop2/test_plugin.py index cb3b6d46..5c01efff 100644 --- a/sahara/tests/unit/plugins/vanilla/hadoop2/test_plugin.py +++ b/sahara/tests/unit/plugins/vanilla/hadoop2/test_plugin.py @@ -44,4 +44,4 @@ class VanillaPluginTest(base.SaharaWithDbTestCase): create_dir.reset_mock() plugin.get_edp_engine(cluster, edp.JOB_TYPE_PIG).create_hdfs_dir( mock.Mock(), '/tmp') - self.assertEqual(1, create_dir.call_count) \ No newline at end of file + self.assertEqual(1, create_dir.call_count) diff --git a/sahara/tests/unit/service/edp/oozie/test_oozie.py b/sahara/tests/unit/service/edp/oozie/test_oozie.py index c3840d1a..996dc2f6 100644 --- a/sahara/tests/unit/service/edp/oozie/test_oozie.py +++ b/sahara/tests/unit/service/edp/oozie/test_oozie.py @@ -38,14 +38,7 @@ class TestOozieEngine(base.SaharaTestCase): res = oje._add_postfix('aba') self.assertEqual("aba/", res) - @mock.patch('sahara.service.edp.job_utils.get_plugin') - def test_get_oozie_job_params(self, getplugin): - plugin = mock.Mock() - getplugin.return_value = plugin - - plugin.get_resource_manager_uri.return_value = 'http://localhost:50030' - plugin.get_name_node_uri.return_value = 'hdfs://localhost:8020' - + def test_get_oozie_job_params(self): oje = FakeOozieJobEngine(u.create_cluster()) job_params = oje._get_oozie_job_params('hadoop', '/tmp') self.assertEqual('http://localhost:50030', job_params["jobTracker"]) @@ -88,3 +81,12 @@ class FakeOozieJobEngine(oe.OozieJobEngine): def create_hdfs_dir(self, remote, dir_name): return + + def get_oozie_server_uri(self, cluster): + return 'http://localhost:11000/oozie' + + def get_name_node_uri(self, cluster): + return 'hdfs://localhost:8020' + + def get_resource_manager_uri(self, cluster): + return 'http://localhost:50030'