[EDP] Add Oozie Shell Job Type
This change adds the Shell job type, currently implemented for the Oozie
engine (per spec). Oozie shell actions provide a great deal of flexibility
and let users easily customize and extend the features of Sahara EDP as
needed. For example, a shell action could be used to manage HDFS on the
cluster, do pre- or post-processing for another job launched from Sahara, or
run a data processing job from a specialized launcher that does extra
configuration not otherwise available from Sahara (e.g., setting a special
classpath for a Java job).

Change-Id: I0d8b59cf55cf583f0d24c2c8c2e487813d8ec716
Implements: blueprint add-edp-shell-action
Commit 36881a9cba (parent b9cd04839f)

@ -6,7 +6,7 @@ Overview
Sahara's Elastic Data Processing facility or :dfn:`EDP` allows the execution of jobs on clusters created from Sahara. EDP supports:

* Hive, Pig, MapReduce, MapReduce.Streaming and Java job types on Hadoop clusters
* Hive, Pig, MapReduce, MapReduce.Streaming, Java, and Shell job types on Hadoop clusters
* Spark jobs on Spark standalone clusters
* storage of job binaries in Swift or Sahara's own database
* access to input and output data sources in

@ -68,6 +68,8 @@ A :dfn:`Job` object specifies the type of the job and lists all of the individua

+-------------------------+-------------+-----------+
| ``Java``                | not used    | required  |
+-------------------------+-------------+-----------+
| ``Shell``               | required    | optional  |
+-------------------------+-------------+-----------+
| ``Spark``               | required    | optional  |
+-------------------------+-------------+-----------+

@ -135,6 +137,8 @@ Jobs can be configured at launch. The job type determines the kinds of values th

+--------------------------+---------------+------------+-----------+
| ``Java``                 | Yes           | No         | Yes       |
+--------------------------+---------------+------------+-----------+
| ``Shell``                | Yes           | Yes        | Yes       |
+--------------------------+---------------+------------+-----------+
| ``Spark``                | Yes           | No         | Yes       |
+--------------------------+---------------+------------+-----------+

@ -144,7 +148,7 @@ Jobs can be configured at launch. The job type determines the kinds of values th

  + Other configuration values may be read at runtime by Hadoop jobs
  + Currently additional configuration values are not available to Spark jobs at runtime

* :dfn:`Parameters` are key/value pairs. They supply values for the Hive and Pig parameter substitution mechanisms.
* :dfn:`Parameters` are key/value pairs. They supply values for the Hive and Pig parameter substitution mechanisms. In Shell jobs, they are passed as environment variables.
* :dfn:`Arguments` are strings passed as command line arguments to a shell or main program

These values can be set on the :guilabel:`Configure` tab during job launch through the web UI or through the *job_configs* parameter when using the */jobs/<job_id>/execute* REST method.
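
For illustration, a raw REST launch supplies these values inside *job_configs*; a rough sketch with the Python ``requests`` library (the endpoint URL, token, and ids are placeholder assumptions, not values from the examples) might look like::

    import requests

    SAHARA_URL = "http://sahara.example:8386/v1.1/<project-id>"  # assumed endpoint
    HEADERS = {"X-Auth-Token": "<keystone-token>"}               # assumed token

    body = {
        "cluster_id": "<cluster-id>",
        "job_configs": {
            "configs": {},                    # Configuration Values
            "params": {"NAME": "value"},      # Parameters
            "args": ["arg1", "arg2"],         # Arguments
        },
    }
    # Launch the job on the chosen cluster.
    requests.post(SAHARA_URL + "/jobs/<job-id>/execute", json=body, headers=HEADERS)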

@ -243,6 +247,17 @@ using one of the above two methods. Furthermore, if Swift data sources are used

The ``edp-wordcount`` example bundled with Sahara shows how to use configuration values, arguments, and Swift data paths in a Java job type.

Additional Details for Shell jobs
+++++++++++++++++++++++++++++++++

A shell job will execute the script specified as ``main``, and will place any files specified as ``libs``
in the same working directory (on both the filesystem and in HDFS). Command line arguments may be passed to
the script through the ``args`` array, and any ``params`` values will be passed as environment variables.

Data Source objects are not used with Shell job types.

The ``edp-shell`` example bundled with Sahara contains a script which will output the executing user to
a file specified by the first command line argument.
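
As a concrete illustration, the ``job_configs`` for that example (mirroring the ``execute.shell.json`` payload added by this change; the output path is arbitrary) could be sketched as::

    job_configs = {
        "configs": {},
        # surfaces inside the shell action as the environment variable EXTRA_FILE
        "params": {"EXTRA_FILE": "shell-example.txt"},
        # becomes $1, the file the example script writes to
        "args": ["/tmp/edp-shell-example-output.txt"],
    }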

Additional Details for Spark jobs
+++++++++++++++++++++++++++++++++

etc/edp-examples/edp-shell/shell-example.sh (new file, 3 lines)
@ -0,0 +1,3 @@
#!/bin/sh
cat $EXTRA_FILE > $1
echo $USER >> $1

etc/edp-examples/edp-shell/shell-example.txt (new file, 1 line)
@ -0,0 +1 @@
The user running this shell script is:

@ -15,7 +15,7 @@ with JSON payload examples, covering:

* Binary storage in both Swift and the Sahara database, and
* Job creation for Pig, Map/Reduce, Java, and Spark jobs.

Four example flows are provided:
Five example flows are provided:

* A Pig job, using Swift for both data and binary storage.
* A Map/Reduce job, using HDFS data sources registered in Sahara and Swift

@ -23,21 +23,33 @@ Four example flows are provided:

* A Java job, using raw HDFS data paths and the Sahara database for binary
  storage.
* A Spark job without data inputs, using Swift for binary storage.
* A shell job without data inputs, using the Sahara database for binary
  storage.

Many other combinations of data source storage, binary storage, and job type
are possible. These examples are intended purely as a point of departure for
modification and experimentation for any Sahara user who prefers a
command-line interface to UI (or who intends to automate Sahara usage.)

A Note on Preconditions and Formatting
--------------------------------------
Notes
=====

Formatting
----------

The json files provided make many assumptions, allowing the examples to be as
literal as possible. However, where objects created by the flow must refer to
one another's generated ids, Python dictionary-style is used.

A Note on Swift Credentials
---------------------------
Oozie is Required for Hadoop
----------------------------

When the preconditions for a given example specify that you must have "an
active Hadoop cluster", that cluster must be running an Oozie process in all
cases, as Sahara's EDP jobs are scheduled through Oozie in all Hadoop plugins.

Swift Credentials
-----------------

For the sake of simplicity, these examples pass Swift credentials to the API
when creating data sources, storing binaries, and executing jobs. Use of a

@ -46,8 +58,8 @@ store credentials.

.. _Swift proxy: http://docs.openstack.org/developer/sahara/userdoc/advanced.configuration.guide.html

A Note for REST Users
---------------------
Raw REST Usage
--------------

The CLI and Python Sahara client provide their own authentication mechanisms
and endpoint discovery. If you wish to use the raw REST API, however, please

@ -67,7 +79,7 @@ Preconditions

This example assumes the following:

1. Usage of an OpenStack user named "demo", with password "password."
1. Usage of an OpenStack user named "demo", with password "password".
2. An active Hadoop cluster exists in the demo user's project.
3. In the demo user's project, the following files are stored in Swift in the
   container ``edp-examples``, as follows:

@ -82,22 +94,22 @@ This example assumes the following:

Steps
-----

1. **Input**: Post the payload at ``data-sources/create.swift-pig-input.json``
1. **Input**: POST the payload at ``data-sources/create.swift-pig-input.json``
   to your Sahara endpoint's ``data-sources`` path. Note the new object's
   id.
2. **Output**: Post the payload at
2. **Output**: POST the payload at
   ``data-sources/create.swift-pig-output.json`` to your Sahara endpoint's
   ``data-sources`` path. Note the new object's id.
3. **Script**: Post the payload at ``job-binaries/create.pig-job.json`` to
3. **Script**: POST the payload at ``job-binaries/create.pig-job.json`` to
   your Sahara endpoint's ``job-binaries`` path. Note the new object's id.
4. **UDF .jar**: Post the payload at ``job-binaries/create.pig-udf.json`` to
4. **UDF .jar**: POST the payload at ``job-binaries/create.pig-udf.json`` to
   your Sahara endpoint's ``job-binaries`` path. Note the new object's id.
5. **Job**: Insert the script binary id from step 3 and the UDF binary id from
   step 4 into the payload at ``jobs/create.pig.json``. Then post this file to
   step 4 into the payload at ``jobs/create.pig.json``. Then POST this file to
   your Sahara endpoint's ``jobs`` path. Note the new object's id.
6. **Job Execution**: Insert your Hadoop cluster id, the input id from step 1,
   and the output id from step 2 into the payload at
   ``job-executions/execute.pig.json``. Then post this file to your Sahara
   ``job-executions/execute.pig.json``. Then POST this file to your Sahara
   endpoint at path ``jobs/{job id from step 5}/execute``.

Note

@ -115,7 +127,7 @@ Preconditions

This example assumes the following:

1. Usage of an OpenStack user named "demo", with password "password."
1. Usage of an OpenStack user named "demo", with password "password".
2. An active Hadoop cluster exists in the demo user's project, with the
   master node's HDFS available at URL
   ``hdfs://hadoop-cluster-master-001:8020/``.

@ -128,20 +140,20 @@ This example assumes the following:

Steps
-----

1. **Input**: Post the payload at
1. **Input**: POST the payload at
   ``data-sources/create.hdfs-map-reduce-input.json`` to your Sahara
   endpoint's ``data-sources`` path. Note the new object's id.
2. **Output**: Post the payload at
2. **Output**: POST the payload at
   ``data-sources/create.hdfs-map-reduce-output.json`` to your Sahara
   endpoint's ``data-sources`` path. Note the new object's id.
3. **Binary**: Post the payload at ``job-binaries/create.map-reduce.json`` to
3. **Binary**: POST the payload at ``job-binaries/create.map-reduce.json`` to
   your Sahara endpoint's ``job-binaries`` path. Note the new object's id.
4. **Job**: Insert the binary id from step 3 into the payload at
   ``jobs/create.map-reduce.json``. Then post this file to your Sahara
   ``jobs/create.map-reduce.json``. Then POST this file to your Sahara
   endpoint's ``jobs`` path. Note the new object's id.
5. **Job Execution**: Insert your Hadoop cluster id, the input id from step 1,
   and the output id from step 2 into the payload at
   ``job-executions/execute.map-reduce.json``. Then post this file to your
   ``job-executions/execute.map-reduce.json``. Then POST this file to your
   Sahara endpoint at path ``jobs/{job id from step 4}/execute``.

@ -153,7 +165,7 @@ Preconditions

This example assumes the following:

1. Usage of an OpenStack user named "demo", with password "password."
1. Usage of an OpenStack user named "demo", with password "password".
2. An active Hadoop cluster exists in the demo user's project, with the
   master node's HDFS available at URL
   ``hdfs://hadoop-cluster-master-001:8020/``.

@ -163,17 +175,17 @@ This example assumes the following:

Steps
-----

1. **Internal Job Binary**: Put the file at
1. **Internal Job Binary**: PUT the file at
   ``edp-examples/edp-java/edp-java.jar`` into your Sahara endpoint at path
   ``job-binary-internals/edp-java.jar``. Note the new object's id.
2. **Job Binary**: Insert the internal job binary id from step 1 into the
   payload at ``job-binaries/create.java.json``. Then post this file to your
   payload at ``job-binaries/create.java.json``. Then POST this file to your
   Sahara endpoint's ``job-binaries`` path. Note the new object's id.
3. **Job**: Insert the binary id from step 2 into the payload at
   ``jobs/create.java.json``. Then post this file to your Sahara endpoint's
   ``jobs/create.java.json``. Then POST this file to your Sahara endpoint's
   ``jobs`` path. Note the new object's id.
4. **Job Execution**: Insert your Hadoop cluster id into the payload at
   ``job-executions/execute.java.json``. Then post this file to your Sahara
   ``job-executions/execute.java.json``. Then POST this file to your Sahara
   endpoint at path ``jobs/{job id from step 3}/execute``.

@ -185,7 +197,7 @@ Preconditions

This example assumes the following:

1. Usage of an OpenStack user named "demo", with password "password."
1. Usage of an OpenStack user named "demo", with password "password".
2. An active Spark cluster exists in the demo user's project.
3. In the demo user's project, the file at
   ``edp-examples/edp-spark/spark-example.jar`` is stored in Swift in the

@ -194,13 +206,13 @@ This example assumes the following:

Steps
-----

1. **Job Binary**: Post the payload at ``job-binaries/create.spark.json``
1. **Job Binary**: POST the payload at ``job-binaries/create.spark.json``
   to your Sahara endpoint's ``job-binaries`` path. Note the new object's id.
2. **Job**: Insert the binary id from step 1 into the payload at
   ``jobs/create.spark.json``. Then post this file to your Sahara endpoint's
   ``jobs/create.spark.json``. Then POST this file to your Sahara endpoint's
   ``jobs`` path. Note the new object's id.
3. **Job Execution**: Insert your Spark cluster id into the payload at
   ``job-executions/execute.spark.json``. Then post this file to your Sahara
   ``job-executions/execute.spark.json``. Then POST this file to your Sahara
   endpoint at path ``jobs/{job id from step 2}/execute``.

Note

@ -208,3 +220,37 @@ Note

Spark jobs can use additional library binaries, but none are needed for the
example job.

Example 5: Shell script, using the Sahara DB
============================================

Preconditions
-------------

This example assumes the following:

1. Usage of an OpenStack user named "demo", with password "password".
2. An active Hadoop cluster exists in the demo user's project.

Steps
-----

1. **Script File**: PUT the file at
   ``edp-examples/edp-shell/shell-example.sh`` into your Sahara endpoint at
   path ``job-binary-internals/shell-example.sh``. Note the new object's id.
2. **Text File**: PUT the file at
   ``edp-examples/edp-shell/shell-example.txt`` into your Sahara endpoint at
   path ``job-binary-internals/shell-example.txt``. Note the new object's id.
3. **Script Binary**: Insert the script file's id from step 1 into the payload
   at ``job-binaries/create.shell-script.json``. Then POST this file to your
   Sahara endpoint's ``job-binaries`` path. Note the new object's id.
4. **Text Binary**: Insert the text file's id from step 2 into the payload
   at ``job-binaries/create.shell-text.json``. Then POST this file to your
   Sahara endpoint's ``job-binaries`` path. Note the new object's id.
5. **Job**: Insert the binary ids from steps 3 and 4 into the payload at
   ``jobs/create.shell.json``. Then POST this file to your Sahara endpoint's
   ``jobs`` path. Note the new object's id.
6. **Job Execution**: Insert your Hadoop cluster id into the payload at
   ``job-executions/execute.shell.json``. Then POST this file to your Sahara
   endpoint at path ``jobs/{job id from step 5}/execute``.
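
For readers scripting these steps, a rough sketch of steps 1 and 3 with the Python ``requests`` library follows; the endpoint URL, token, and response handling are assumptions rather than part of the examples::

    import requests

    SAHARA_URL = "http://sahara.example:8386/v1.1/<project-id>"  # assumed endpoint
    HEADERS = {"X-Auth-Token": "<keystone-token>"}               # assumed token

    # Step 1: store the script itself in the Sahara database.
    with open("etc/edp-examples/edp-shell/shell-example.sh", "rb") as f:
        resp = requests.put(SAHARA_URL + "/job-binary-internals/shell-example.sh",
                            data=f.read(), headers=HEADERS)
    print(resp.json())  # note the new object's id

    # Step 3: register a job binary pointing at the stored script
    # (the same payload as job-binaries/create.shell-script.json).
    payload = {"name": "shell-example.sh",
               "description": "An example shell script",
               "url": "internal-db://<id from step 1>",
               "extra": {}}
    resp = requests.post(SAHARA_URL + "/job-binaries", json=payload, headers=HEADERS)
    print(resp.json())  # note the new object's id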

@ -0,0 +1,6 @@
{
    "name": "shell-example.sh",
    "description": "An example shell script",
    "url": "internal-db://%(script_binary_internal_id)s",
    "extra": {}
}

@ -0,0 +1,6 @@
{
    "name": "shell-example.txt",
    "description": "An example text file",
    "url": "internal-db://%(text_binary_internal_id)s",
    "extra": {}
}

@ -0,0 +1,8 @@
{
    "cluster_id": "%(cluster_id)s",
    "job_configs": {
        "configs": {},
        "args": ["/tmp/edp-shell-example-output.txt"],
        "params": {"EXTRA_FILE": "shell-example.txt"}
    }
}

@ -0,0 +1,7 @@
{
    "name": "demo-shell-job",
    "type": "Shell",
    "description": "A runnable Shell job",
    "mains": ["%(script_binary_id)s"],
    "libs": ["%(text_binary_id)s"]
}

@ -363,6 +363,15 @@ class LocalApi(object):
        return binary["name"]
        return None

    def job_lib_names(self, context, job):
        """Return the name of all job lib binaries or an empty list.

        :param job: This is expected to be a Job object
        """
        lib_ids = job.libs or []
        binaries = (self.job_binary_get(context, lib_id) for lib_id in lib_ids)
        return [binary["name"] for binary in binaries if binary is not None]

    # JobBinary ops

    @r.wrap(r.JobBinary)

@ -14,6 +14,7 @@
# limitations under the License.

import abc
import os
import uuid

from oslo_config import cfg

@ -177,9 +178,12 @@ class OozieJobEngine(base_engine.JobEngine):
        pass

    def validate_job_execution(self, cluster, job, data):
        # All types except Java require input and output objects
        # and Java require main class
        if job.type in [edp.JOB_TYPE_JAVA]:
        # Shell job type requires no specific fields
        if job.type == edp.JOB_TYPE_SHELL:
            return
        # All other types except Java require input and output
        # objects and Java require main class
        if job.type == edp.JOB_TYPE_JAVA:
            j.check_main_class_present(data, job)
        else:
            j.check_data_sources(data, job)

@ -199,7 +203,8 @@ class OozieJobEngine(base_engine.JobEngine):
                edp.JOB_TYPE_JAVA,
                edp.JOB_TYPE_MAPREDUCE,
                edp.JOB_TYPE_MAPREDUCE_STREAMING,
                edp.JOB_TYPE_PIG]
                edp.JOB_TYPE_PIG,
                edp.JOB_TYPE_SHELL]

    def _upload_job_files_to_hdfs(self, where, job_dir, job, configs,
                                  proxy_configs=None):

@ -208,14 +213,15 @@ class OozieJobEngine(base_engine.JobEngine):
        builtin_libs = edp.get_builtin_binaries(job, configs)
        uploaded_paths = []
        hdfs_user = self.get_hdfs_user()
        lib_dir = job_dir + '/lib'
        job_dir_suffix = 'lib' if job.type != edp.JOB_TYPE_SHELL else ''
        lib_dir = os.path.join(job_dir, job_dir_suffix)

        with remote.get_remote(where) as r:
            for main in mains:
                raw_data = dispatch.get_raw_binary(main, proxy_configs)
                h.put_file_to_hdfs(r, raw_data, main.name, job_dir, hdfs_user)
                uploaded_paths.append(job_dir + '/' + main.name)
            if len(libs) > 0:
            if len(libs) and job_dir_suffix:
                # HDFS 2.2.0 fails to put file if the lib dir does not exist
                self.create_hdfs_dir(r, lib_dir)
                for lib in libs:

sahara/service/edp/oozie/workflow_creator/shell_workflow.py (new file, 45 lines)
@ -0,0 +1,45 @@
# Copyright (c) 2015 Red Hat Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from sahara.service.edp.oozie.workflow_creator import base_workflow
from sahara.utils import xmlutils as x


class ShellWorkflowCreator(base_workflow.OozieWorkflowCreator):

    SHELL_XMLNS = {"xmlns": "uri:oozie:shell-action:0.1"}

    def __init__(self):
        super(ShellWorkflowCreator, self).__init__('shell')

    def build_workflow_xml(self, script_name, prepare={},
                           job_xml=None, configuration=None, env_vars={},
                           arguments=[], files=[]):
        x.add_attributes_to_element(self.doc, self.tag_name, self.SHELL_XMLNS)

        for k in sorted(prepare):
            self._add_to_prepare_element(k, prepare[k])

        self._add_configuration_elements(configuration)

        x.add_text_element_to_tag(self.doc, self.tag_name, 'exec', script_name)

        for arg in arguments:
            x.add_text_element_to_tag(self.doc, self.tag_name, 'argument', arg)

        x.add_equal_separated_dict(self.doc, self.tag_name,
                                   'env-var', env_vars)

        self._add_files_and_archives(files + [script_name], [])
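
For reference, the new creator can be exercised on its own, along the lines of the unit test added below; ``get_built_workflow_xml`` comes from the base creator, and the argument values here are just illustrative::

    from sahara.service.edp.oozie.workflow_creator import shell_workflow

    creator = shell_workflow.ShellWorkflowCreator()
    creator.build_workflow_xml('shell-example.sh',
                               configuration={},
                               env_vars={'EXTRA_FILE': 'shell-example.txt'},
                               arguments=['/tmp/edp-shell-example-output.txt'],
                               files=['shell-example.txt'])
    # Yields a <shell xmlns="uri:oozie:shell-action:0.1"> action containing
    # <exec>, <argument>, <env-var>, and <file> elements.
    print(creator.get_built_workflow_xml())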

@ -22,6 +22,7 @@ from sahara.service.edp.oozie.workflow_creator import hive_workflow
from sahara.service.edp.oozie.workflow_creator import java_workflow
from sahara.service.edp.oozie.workflow_creator import mapreduce_workflow
from sahara.service.edp.oozie.workflow_creator import pig_workflow
from sahara.service.edp.oozie.workflow_creator import shell_workflow
from sahara.swift import swift_helper as sw
from sahara.swift import utils as su
from sahara.utils import edp

@ -243,6 +244,34 @@ class JavaFactory(BaseFactory):
        return creator.get_built_workflow_xml()


class ShellFactory(BaseFactory):

    def __init__(self, job):
        super(ShellFactory, self).__init__()
        self.name, self.file_names = self.get_file_names(job)

    def get_file_names(self, job):
        ctx = context.ctx()
        return (conductor.job_main_name(ctx, job),
                conductor.job_lib_names(ctx, job))

    def get_configs(self):
        return {'configs': {},
                'params': {},
                'args': []}

    def get_workflow_xml(self, cluster, job_configs, *args, **kwargs):
        job_dict = self.get_configs()
        self.update_job_dict(job_dict, job_configs)
        creator = shell_workflow.ShellWorkflowCreator()
        creator.build_workflow_xml(self.name,
                                   configuration=job_dict['configs'],
                                   env_vars=job_dict['params'],
                                   arguments=job_dict['args'],
                                   files=self.file_names)
        return creator.get_built_workflow_xml()


def _get_creator(job):

    def make_PigFactory():

@ -251,12 +280,16 @@ def _get_creator(job):
    def make_HiveFactory():
        return HiveFactory(job)

    def make_ShellFactory():
        return ShellFactory(job)

    type_map = {
        edp.JOB_TYPE_HIVE: make_HiveFactory,
        edp.JOB_TYPE_JAVA: JavaFactory,
        edp.JOB_TYPE_MAPREDUCE: MapReduceFactory,
        edp.JOB_TYPE_MAPREDUCE_STREAMING: MapReduceFactory,
        edp.JOB_TYPE_PIG: make_PigFactory
        edp.JOB_TYPE_PIG: make_PigFactory,
        edp.JOB_TYPE_SHELL: make_ShellFactory
    }

    return type_map[job.type]()

@ -274,6 +307,9 @@ def get_possible_job_config(job_type):
    if edp.compare_job_type(job_type, edp.JOB_TYPE_JAVA):
        return {'job_config': {'configs': [], 'args': []}}

    if edp.compare_job_type(job_type, edp.JOB_TYPE_SHELL):
        return {'job_config': {'configs': [], 'params': [], 'args': []}}

    if edp.compare_job_type(job_type,
                            edp.JOB_TYPE_MAPREDUCE, edp.JOB_TYPE_PIG):
        # TODO(nmakhotkin): Here we need return config based on specific plugin
@ -77,7 +77,8 @@ def check_mains_libs(data, **kwargs):
                subtype == edp.JOB_SUBTYPE_STREAMING)

    # These types must have a value in mains and may also use libs
    if job_type in [edp.JOB_TYPE_PIG, edp.JOB_TYPE_HIVE, edp.JOB_TYPE_SPARK]:
    if job_type in [edp.JOB_TYPE_PIG, edp.JOB_TYPE_HIVE,
                    edp.JOB_TYPE_SHELL, edp.JOB_TYPE_SPARK]:
        if not mains:
            if job_type == edp.JOB_TYPE_SPARK:
                msg = _(

@ -32,6 +32,7 @@ class EDPJobInfo(object):
    MAPREDUCE_PATH = 'etc/edp-examples/edp-mapreduce/'
    SPARK_PATH = 'etc/edp-examples/edp-spark/'
    HIVE_PATH = 'etc/edp-examples/edp-hive/'
    SHELL_PATH = 'etc/edp-examples/edp-shell/'

    HADOOP2_JAVA_PATH = 'etc/edp-examples/hadoop2/edp-java/'

@ -98,6 +99,20 @@ class EDPJobInfo(object):
            }
        }

    def read_shell_example_script(self):
        return open(self.SHELL_PATH + 'shell-example.sh').read()

    def read_shell_example_text_file(self):
        return open(self.SHELL_PATH + 'shell-example.txt').read()

    def shell_example_configs(self):
        return {
            "params": {
                "EXTRA_FILE": "*text"
            },
            "args": ["/tmp/edp-integration-shell-output.txt"]
        }

    def read_spark_example_jar(self):
        return open(self.SPARK_PATH + 'spark-example.jar').read()

@ -302,6 +302,15 @@ class CDHGatingTest(check_services.CheckServicesTest,
            lib_data_list=[{'jar': java_jar}],
            configs=java_configs)

        # check Shell
        shell_script_data = self.edp_info.read_shell_example_script()
        shell_file_data = self.edp_info.read_shell_example_text_file()
        yield self.edp_testing(
            job_type=utils_edp.JOB_TYPE_SHELL,
            job_data_list=[{'script': shell_script_data}],
            lib_data_list=[{'text': shell_file_data}],
            configs=self.edp_info.shell_example_configs())

    @b.errormsg("Failure while check services testing: ")
    def _check_services(self):
        # check HBase

@ -125,7 +125,6 @@ class HDP2GatingTest(swift.SwiftTest, scaling.ScalingTest,
        # check pig
        pig_job = self.edp_info.read_pig_example_script()
        pig_lib = self.edp_info.read_pig_example_jar()

        yield self.edp_testing(
            job_type=utils_edp.JOB_TYPE_PIG,
            job_data_list=[{'pig': pig_job}],

@ -160,6 +159,15 @@ class HDP2GatingTest(swift.SwiftTest, scaling.ScalingTest,
            lib_data_list=[{'jar': java_jar}],
            configs=java_configs)

        # check shell
        shell_script_data = self.edp_info.read_shell_example_script()
        shell_file_data = self.edp_info.read_shell_example_text_file()
        yield self.edp_testing(
            job_type=utils_edp.JOB_TYPE_SHELL,
            job_data_list=[{'script': shell_script_data}],
            lib_data_list=[{'text': shell_file_data}],
            configs=self.edp_info.shell_example_configs())

    @b.errormsg("Failure while cluster scaling: ")
    def _check_scaling(self):
        datanode_count_after_resizing = (

@ -121,6 +121,9 @@ class HDPGatingTest(cinder.CinderVolumeTest, edp.EDPTest,

        mapreduce_jar_data = self.edp_info.read_mapreduce_example_jar()

        shell_script_data = self.edp_info.read_shell_example_script()
        shell_file_data = self.edp_info.read_shell_example_text_file()

        # This is a modified version of WordCount that takes swift configs
        java_lib_data = self.edp_info.read_java_example_lib()

@ -156,8 +159,17 @@ class HDPGatingTest(cinder.CinderVolumeTest, edp.EDPTest,
            configs=self.edp_info.java_example_configs(),
            pass_input_output_args=True)
        job_ids.append(job_id)

        job_id = self.edp_testing(
            job_type=utils_edp.JOB_TYPE_SHELL,
            job_data_list=[{'script': shell_script_data}],
            lib_data_list=[{'text': shell_file_data}],
            configs=self.edp_info.shell_example_configs())
        job_ids.append(job_id)

        self.poll_jobs_status(job_ids)

        # -----------------------------MAP REDUCE TESTING------------------------------
        self.map_reduce_testing(cluster_info)

@ -185,6 +185,8 @@ class VanillaGatingTest(cinder.CinderVolumeTest,
        mapreduce_jar_data = self.edp_info.read_mapreduce_example_jar()
        # This is a modified version of WordCount that takes swift configs
        java_lib_data = self.edp_info.read_java_example_lib()
        shell_script_data = self.edp_info.read_shell_example_script()
        shell_file_data = self.edp_info.read_shell_example_text_file()

        yield self.edp_testing(
            job_type=utils_edp.JOB_TYPE_PIG,

@ -215,6 +217,12 @@ class VanillaGatingTest(cinder.CinderVolumeTest,
            configs=self.edp_info.java_example_configs(),
            pass_input_output_args=True)

        yield self.edp_testing(
            job_type=utils_edp.JOB_TYPE_SHELL,
            job_data_list=[{'script': shell_script_data}],
            lib_data_list=[{'text': shell_file_data}],
            configs=self.edp_info.shell_example_configs())

    @b.errormsg("Failure while EDP testing: ")
    def _check_edp(self):
        self.poll_jobs_status(list(self._run_edp_test()))

@ -204,6 +204,8 @@ class VanillaTwoGatingTest(cluster_configs.ClusterConfigTest,
            yield self._edp_java_test()
        if utils_edp.JOB_TYPE_HIVE not in skipped_edp_job_types:
            yield self._check_edp_hive()
        if utils_edp.JOB_TYPE_SHELL not in skipped_edp_job_types:
            yield self._edp_shell_test()

    # TODO(esikachev): Until fix bug 1413602
    def _run_edp_tests_after_scaling(self):

@ -217,6 +219,8 @@ class VanillaTwoGatingTest(cluster_configs.ClusterConfigTest,
            yield self._edp_mapreduce_streaming_test()
        if utils_edp.JOB_TYPE_JAVA not in skipped_edp_job_types:
            yield self._edp_java_test()
        if utils_edp.JOB_TYPE_SHELL not in skipped_edp_job_types:
            yield self._edp_shell_test()

    def _edp_pig_test(self):
        pig_job = self.edp_info.read_pig_example_script()

@ -256,6 +260,15 @@ class VanillaTwoGatingTest(cluster_configs.ClusterConfigTest,
            lib_data_list=[{'jar': java_jar}],
            configs=java_configs)

    def _edp_shell_test(self):
        shell_script_data = self.edp_info.read_shell_example_script()
        shell_file_data = self.edp_info.read_shell_example_text_file()
        return self.edp_testing(
            job_type=utils_edp.JOB_TYPE_SHELL,
            job_data_list=[{'script': shell_script_data}],
            lib_data_list=[{'text': shell_file_data}],
            configs=self.edp_info.shell_example_configs())

    def _check_edp_hive(self):
        return self.check_edp_hive()

@ -40,13 +40,18 @@ class TestJSONApiExamplesV11(testtools.TestCase):
    def test_job_binaries(self):
        schema = job_binary.JOB_BINARY_SCHEMA
        path = self.EXAMPLES_PATH % 'job-binaries'
        formatter = self._formatter("job_binary_internal_id")
        formatter = self._formatter("job_binary_internal_id",
                                    "script_binary_internal_id",
                                    "text_binary_internal_id")
        self._test(schema, path, formatter)

    def test_jobs(self):
        schema = job.JOB_SCHEMA
        path = self.EXAMPLES_PATH % 'jobs'
        formatter = self._formatter("job_binary_id", "udf_binary_id")
        formatter = self._formatter("job_binary_id",
                                    "udf_binary_id",
                                    "script_binary_id",
                                    "text_binary_id")
        self._test(schema, path, formatter)

    def test_job_executions(self):

@ -20,13 +20,14 @@ from sahara.service.edp.oozie.workflow_creator import hive_workflow as hw
from sahara.service.edp.oozie.workflow_creator import java_workflow as jw
from sahara.service.edp.oozie.workflow_creator import mapreduce_workflow as mrw
from sahara.service.edp.oozie.workflow_creator import pig_workflow as pw
from sahara.service.edp.oozie.workflow_creator import shell_workflow as shw
from sahara.utils import patches as p


class TestPigWorkflowCreator(testtools.TestCase):
class TestWorkflowCreators(testtools.TestCase):

    def setUp(self):
        super(TestPigWorkflowCreator, self).setUp()
        super(TestWorkflowCreators, self).setUp()
        p.patch_minidom_writexml()
        self.prepare = {'delete': ['delete_dir_1', 'delete_dir_2'],
                        'mkdir': ['mkdir_1']}

@ -218,3 +219,47 @@ class TestPigWorkflowCreator(testtools.TestCase):
        </java>"""

        self.assertIn(java_action, res)

    def test_create_shell_workflow(self):
        shell_workflow = shw.ShellWorkflowCreator()
        main_class = 'doit.sh'
        args = ['now']
        env_vars = {"VERSION": 3}

        shell_workflow.build_workflow_xml(main_class,
                                          self.prepare,
                                          self.job_xml,
                                          self.configuration,
                                          env_vars,
                                          args,
                                          self.files)

        res = shell_workflow.get_built_workflow_xml()
        shell_action = """
      <shell xmlns="uri:oozie:shell-action:0.1">
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
        <prepare>
          <delete path="delete_dir_1"/>
          <delete path="delete_dir_2"/>
          <mkdir path="mkdir_1"/>
        </prepare>
        <configuration>
          <property>
            <name>conf_param_1</name>
            <value>conf_value_1</value>
          </property>
          <property>
            <name>conf_param_2</name>
            <value>conf_value_3</value>
          </property>
        </configuration>
        <exec>doit.sh</exec>
        <argument>now</argument>
        <env-var>VERSION=3</env-var>
        <file>file1</file>
        <file>file2</file>
        <file>doit.sh</file>
      </shell>"""

        self.assertIn(shell_action, res)

@ -47,6 +47,8 @@ JOB_TYPE_SPARK = 'Spark'
JOB_TYPE_MAPREDUCE_STREAMING = (JOB_TYPE_MAPREDUCE + JOB_TYPE_SEP +
                                JOB_SUBTYPE_STREAMING)
JOB_TYPE_PIG = 'Pig'
JOB_TYPE_SHELL = 'Shell'

# job type groupings available
JOB_TYPES_ALL = [
    JOB_TYPE_HIVE,

@ -54,6 +56,7 @@ JOB_TYPES_ALL = [
    JOB_TYPE_MAPREDUCE,
    JOB_TYPE_MAPREDUCE_STREAMING,
    JOB_TYPE_PIG,
    JOB_TYPE_SHELL,
    JOB_TYPE_SPARK
]

@ -162,6 +162,12 @@ def add_equal_separated_dict(doc, parent_tag, each_elem_tag, value):
                                "%s=%s" % (k, value[k]))


def add_attributes_to_element(doc, tag, attributes):
    element = doc.getElementsByTagName(tag)[0]
    for name, value in attributes.items():
        element.setAttribute(name, value)


def add_tagged_list(doc, parent_tag, each_elem_tag, values):
    for v in values:
        add_text_element_to_tag(doc, parent_tag, each_elem_tag, v)