Extended plugin SPI with methods to communicate with EDP

EDP must communicate with plugins via SPI only. It must not
assume data in cluster extra.

Change-Id: Id89fd7b9b5ae3977233d1e049a2f4040fbe92e00
Implements: blueprint edp-plugin-communication
This commit is contained in:
Andrew Lazarev 2014-02-14 14:32:01 -08:00
parent c3fa215e64
commit 9732c6273c
5 changed files with 47 additions and 14 deletions

View File

@ -122,12 +122,26 @@ When user terminates cluster, Sahara simply shuts down all the cluster VMs. This
*Returns*: None
get_oozie_server(cluster)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
~~~~~~~~~~~~~~~~~~~~~~~~~
Returns the instance object for the host running the Oozie server (this service may be referenced by a vendor-dependent identifier)
*Returns*: The Oozie server instance object
get_name_node_uri(cluster)
~~~~~~~~~~~~~~~~~~~~~~~~~~
Returns the URI for access to the Name Node
*Returns*: The Name Node URI
get_oozie_server_uri(cluster)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Returns the URI for access to the Oozie server
*Returns*: The Oozie server URI
get_resource_manager_uri(cluster)
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

View File

@ -152,6 +152,12 @@ class AmbariPlugin(p.ProvisioningPluginBase):
self.version_factory.get_version_handler(cluster.hadoop_version))
return version_handler.get_resource_manager_uri(cluster)
def get_name_node_uri(self, cluster):
return cluster['info']['HDFS']['NameNode']
def get_oozie_server_uri(self, cluster):
return cluster['info']['JobFlow']['Oozie'] + "/oozie/"
def update_infra(self, cluster):
pass

View File

@ -62,10 +62,18 @@ class ProvisioningPluginBase(plugins_base.PluginInterface):
def scale_cluster(self, cluster, instances):
pass
@plugins_base.optional
def get_name_node_uri(self, cluster):
pass
@plugins_base.optional
def get_oozie_server(self, cluster):
pass
@plugins_base.optional
def get_oozie_server_uri(self, cluster):
pass
@plugins_base.optional
def validate_edp(self, cluster):
pass

View File

@ -84,3 +84,9 @@ class VanillaProvider(p.ProvisioningPluginBase):
oo_count = u.get_instances_count(cluster, 'oozie')
if oo_count != 1:
raise ex.InvalidComponentCountException('oozie', '1', oo_count)
def get_name_node_uri(self, cluster):
return cluster['info']['HDFS']['NameNode']
def get_oozie_server_uri(self, cluster):
return cluster['info']['JobFlow']['Oozie'] + "/oozie/"

View File

@ -61,8 +61,7 @@ def get_job_status(job_execution_id):
if cluster is None or cluster.status != 'Active':
return job_execution
client = o.OozieClient(cluster['info']['JobFlow']['Oozie'] + "/oozie",
_get_oozie_server(cluster))
client = _create_oozie_client(cluster)
job_info = client.get_job_status(job_execution)
update = {"info": job_info}
if job_info['status'] in terminated_job_states:
@ -89,23 +88,23 @@ def _get_hdfs_user(cluster):
return hdfs_user
def _create_oozie_client(cluster):
plugin = plugin_base.PLUGINS.get_plugin(cluster.plugin_name)
return o.OozieClient(plugin.get_oozie_server_uri(cluster),
plugin.get_oozie_server(cluster))
def _get_oozie_server(cluster):
plugin = plugin_base.PLUGINS.get_plugin(cluster.plugin_name)
return plugin.get_oozie_server(cluster)
def _get_resource_manager_path(cluster):
plugin = plugin_base.PLUGINS.get_plugin(cluster.plugin_name)
return plugin.get_resource_manager_uri(cluster)
def cancel_job(job_execution_id):
ctx = context.ctx()
job_execution = conductor.job_execution_get(ctx, job_execution_id)
cluster = conductor.cluster_get(ctx, job_execution.cluster_id)
client = o.OozieClient(cluster['info']['JobFlow']['Oozie'] + "/oozie/",
_get_oozie_server(cluster))
client = _create_oozie_client(cluster)
client.kill_job(job_execution)
job_info = client.get_job_status(job_execution)
@ -151,11 +150,11 @@ def run_job(job_execution):
path_to_workflow = upload_workflow_file(oozie_server,
wf_dir, wf_xml, hdfs_user)
rm_path = _get_resource_manager_path(cluster)
nn_path = cluster['info']['HDFS']['NameNode']
plugin = plugin_base.PLUGINS.get_plugin(cluster.plugin_name)
rm_path = plugin.get_resource_manager_uri(cluster)
nn_path = plugin.get_name_node_uri(cluster)
client = o.OozieClient(cluster['info']['JobFlow']['Oozie'] + "/oozie/",
_get_oozie_server(cluster))
client = _create_oozie_client(cluster)
job_parameters = {"jobTracker": rm_path,
"nameNode": nn_path,
"user.name": hdfs_user,