diff --git a/doc/source/userdoc/edp.rst b/doc/source/userdoc/edp.rst
index c6e97c31..0c4eb659 100644
--- a/doc/source/userdoc/edp.rst
+++ b/doc/source/userdoc/edp.rst
@@ -6,7 +6,7 @@ Overview
 Sahara's Elastic Data Processing facility or :dfn:`EDP` allows the execution of jobs on clusters created from Sahara. EDP supports:
 
-* Hive, Pig, MapReduce, MapReduce.Streaming and Java job types on Hadoop clusters
+* Hive, Pig, MapReduce, MapReduce.Streaming, Java, and Shell job types on Hadoop clusters
 * Spark jobs on Spark standalone clusters
 * storage of job binaries in Swift or Sahara's own database
 * access to input and output data sources in
@@ -68,6 +68,8 @@ A :dfn:`Job` object specifies the type of the job and lists all of the individua
   +-------------------------+-------------+-----------+
   | ``Java``                | not used    | required  |
   +-------------------------+-------------+-----------+
+  | ``Shell``               | required    | optional  |
+  +-------------------------+-------------+-----------+
   | ``Spark``               | required    | optional  |
   +-------------------------+-------------+-----------+
@@ -135,6 +137,8 @@ Jobs can be configured at launch. The job type determines the kinds of values th
   +--------------------------+---------------+------------+-----------+
   | ``Java``                 | Yes           | No         | Yes       |
   +--------------------------+---------------+------------+-----------+
+  | ``Shell``                | Yes           | Yes        | Yes       |
+  +--------------------------+---------------+------------+-----------+
   | ``Spark``                | Yes           | No         | Yes       |
   +--------------------------+---------------+------------+-----------+
@@ -144,7 +148,7 @@ Jobs can be configured at launch. The job type determines the kinds of values th
   + Other configuration values may be read at runtime by Hadoop jobs
   + Currently additional configuration values are not available to Spark jobs at runtime
 
-* :dfn:`Parameters` are key/value pairs. They supply values for the Hive and Pig parameter substitution mechanisms.
+* :dfn:`Parameters` are key/value pairs. They supply values for the Hive and Pig parameter substitution mechanisms. In Shell jobs, they are passed as environment variables.
 * :dfn:`Arguments` are strings passed as command line arguments to a shell or main program
 
 These values can be set on the :guilabel:`Configure` tab during job launch through the web UI or through the *job_configs* parameter when using the */jobs/<job_id>/execute* REST method.
@@ -243,6 +247,17 @@ using one of the above two methods. Furthermore, if Swift data sources are used
 The ``edp-wordcount`` example bundled with Sahara shows how to use configuration
 values, arguments, and Swift data paths in a Java job type.
 
+Additional Details for Shell jobs
++++++++++++++++++++++++++++++++++
+
+A shell job will execute the script specified as ``main``, and will place any files specified as ``libs``
+in the same working directory (on both the filesystem and in HDFS). Command line arguments may be passed to
+the script through the ``args`` array, and any ``params`` values will be passed as environment variables.
+
+Data Source objects are not used with Shell job types.
+
+The ``edp-shell`` example bundled with Sahara contains a script which will output the executing user to
+a file specified by the first command line argument.
 
 Additional Details for Spark jobs
 +++++++++++++++++++++++++++++++++
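The ``params``/``args`` behaviour described above follows the bundled ``edp-shell``
example; a minimal sketch of the ``job_configs`` structure such a job might be
launched with (values taken from that example, output path purely illustrative)::

    # Shell job configuration sketch: "params" become environment variables
    # for the script, "args" become its positional arguments.
    job_configs = {
        'configs': {},
        'params': {'EXTRA_FILE': 'shell-example.txt'},
        'args': ['/tmp/edp-shell-example-output.txt'],
    }
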
diff --git a/etc/edp-examples/edp-shell/shell-example.sh b/etc/edp-examples/edp-shell/shell-example.sh
new file mode 100644
index 00000000..cee6e3f9
--- /dev/null
+++ b/etc/edp-examples/edp-shell/shell-example.sh
@@ -0,0 +1,3 @@
+#!/bin/sh
+cat $EXTRA_FILE > $1
+echo $USER >> $1
\ No newline at end of file
diff --git a/etc/edp-examples/edp-shell/shell-example.txt b/etc/edp-examples/edp-shell/shell-example.txt
new file mode 100644
index 00000000..fe9a086a
--- /dev/null
+++ b/etc/edp-examples/edp-shell/shell-example.txt
@@ -0,0 +1 @@
+The user running this shell script is:
\ No newline at end of file
diff --git a/etc/edp-examples/json-api-examples/v1.1/README.rst b/etc/edp-examples/json-api-examples/v1.1/README.rst
index b326f285..0d38df8b 100644
--- a/etc/edp-examples/json-api-examples/v1.1/README.rst
+++ b/etc/edp-examples/json-api-examples/v1.1/README.rst
@@ -15,7 +15,7 @@ with JSON payload examples, covering:
 * Binary storage in both Swift and the Sahara database, and
 * Job creation for Pig, Map/Reduce, Java, and Spark jobs.
 
-Four example flows are provided:
+Five example flows are provided:
 
 * A Pig job, using Swift for both data and binary storage.
 * A Map/Reduce job, using HDFS data sources registered in Sahara and Swift
@@ -23,21 +23,33 @@ Four example flows are provided:
   binary storage.
 * A Java job, using raw HDFS data paths and the Sahara database for binary
   storage.
 * A Spark job without data inputs, using Swift for binary storage.
+* A shell job without data inputs, using the Sahara database for binary
+  storage.
 
 Many other combinations of data source storage, binary storage, and job type
 are possible. These examples are intended purely as a point of departure for
 modification and experimentation for any Sahara user who prefers a
 command-line interface to UI (or who intends to automate Sahara usage.)
 
-A Note on Preconditions and Formatting
---------------------------------------
+Notes
+=====
+
+Formatting
+----------
 
 The json files provided make many assumptions, allowing the examples to be as
 literal as possible. However, where objects created by the flow must refer to
 one another's generated ids, Python dictionary-style is used.
 
-A Note on Swift Credentials
----------------------------
+Oozie is Required for Hadoop
+----------------------------
+
+When the preconditions for a given example specify that you must have "an
+active Hadoop cluster", that cluster must be running an Oozie process in all
+cases, as Sahara's EDP jobs are scheduled through Oozie in all Hadoop plugins.
+
+Swift Credentials
+-----------------
 
 For the sake of simplicity, these examples pass Swift credentials to the API
 when creating data sources, storing binaries, and executing jobs. Use of a
@@ -46,8 +58,8 @@ store credentials.
 
 .. _Swift proxy: http://docs.openstack.org/developer/sahara/userdoc/advanced.configuration.guide.html
 
-A Note for REST Users
----------------------
+Raw REST Usage
+--------------
 
 The CLI and Python Sahara client provide their own authentication mechanisms
 and endpoint discovery. If you wish to use the raw REST API, however, please
 note
@@ -67,7 +79,7 @@ Preconditions
 
 This example assumes the following:
 
-1. Usage of an OpenStack user named "demo", with password "password."
+1. Usage of an OpenStack user named "demo", with password "password".
 2. An active Hadoop cluster exists in the demo user's project.
 3. In the demo user's project, the following files are stored in Swift in
    the container ``edp-examples``, as follows:
@@ -82,22 +94,22 @@ This example assumes the following:
 Steps
 -----
 
-1. **Input**: Post the payload at ``data-sources/create.swift-pig-input.json``
+1. **Input**: POST the payload at ``data-sources/create.swift-pig-input.json``
    to your Sahara endpoint's ``data-sources`` path. Note the new object's id.
-2. **Output**: Post the payload at
+2. **Output**: POST the payload at
    ``data-sources/create.swift-pig-output.json`` to your Sahara endpoint's
    ``data-sources`` path. Note the new object's id.
-3. **Script**: Post the payload at ``job-binaries/create.pig-job.json`` to
+3. **Script**: POST the payload at ``job-binaries/create.pig-job.json`` to
    your Sahara endpoint's ``job-binaries`` path. Note the new object's id.
-4. **UDF .jar**: Post the payload at ``job-binaries/create.pig-udf.json`` to
+4. **UDF .jar**: POST the payload at ``job-binaries/create.pig-udf.json`` to
   your Sahara endpoint's ``job-binaries`` path. Note the new object's id.
 5. **Job**: Insert the script binary id from step 3 and the UDF binary id from
-   step 4 into the payload at ``jobs/create.pig.json``. Then post this file to
+   step 4 into the payload at ``jobs/create.pig.json``. Then POST this file to
    your Sahara endpoint's ``jobs`` path. Note the new object's id.
 6. **Job Execution**: Insert your Hadoop cluster id, the input id from step 1,
    and the output id from step 2 into the payload at
-   ``job-executions/execute.pig.json``. Then post this file to your Sahara
+   ``job-executions/execute.pig.json``. Then POST this file to your Sahara
    endpoint at path ``jobs/{job id from step 5}/execute``.
 
 Note
@@ -115,7 +127,7 @@ Preconditions
 
 This example assumes the following:
 
-1. Usage of an OpenStack user named "demo", with password "password."
+1. Usage of an OpenStack user named "demo", with password "password".
 2. An active Hadoop cluster exists in the demo user's project, with the
    master node's HDFS available at URL
    ``hdfs://hadoop-cluster-master-001:8020/``.
@@ -128,20 +140,20 @@ This example assumes the following:
 Steps
 -----
 
-1. **Input**: Post the payload at
+1. **Input**: POST the payload at
    ``data-sources/create.hdfs-map-reduce-input.json`` to your Sahara
    endpoint's ``data-sources`` path. Note the new object's id.
-2. **Output**: Post the payload at
+2. **Output**: POST the payload at
    ``data-sources/create.hdfs-map-reduce-output.json`` to your Sahara
    endpoint's ``data-sources`` path. Note the new object's id.
-3. **Binary**: Post the payload at ``job-binaries/create.map-reduce.json`` to
+3. **Binary**: POST the payload at ``job-binaries/create.map-reduce.json`` to
   your Sahara endpoint's ``job-binaries`` path. Note the new object's id.
 4. **Job**: Insert the binary id from step 3 into the payload at
-   ``jobs/create.map-reduce.json``. Then post this file to your Sahara
+   ``jobs/create.map-reduce.json``. Then POST this file to your Sahara
   endpoint's ``jobs`` path. Note the new object's id.
 5. **Job Execution**: Insert your Hadoop cluster id, the input id from step 1,
    and the output id from step 2 into the payload at
-   ``job-executions/execute.map-reduce.json``. Then post this file to your
+   ``job-executions/execute.map-reduce.json``. Then POST this file to your
   Sahara endpoint at path ``jobs/{job id from step 4}/execute``.
 
@@ -153,7 +165,7 @@ Preconditions
 
 This example assumes the following:
 
-1. Usage of an OpenStack user named "demo", with password "password."
+1. Usage of an OpenStack user named "demo", with password "password".
 2. An active Hadoop cluster exists in the demo user's project, with the
    master node's HDFS available at URL
    ``hdfs://hadoop-cluster-master-001:8020/``.
@@ -163,17 +175,17 @@ This example assumes the following:
 Steps
 -----
 
-1. **Internal Job Binary**: Put the file at
+1. **Internal Job Binary**: PUT the file at
    ``edp-examples/edp-java/edp-java.jar`` into your Sahara endpoint at path
    ``job-binary-internals/edp-java.jar``. Note the new object's id.
 2. **Job Binary**: Insert the internal job binary id from step 1 into the
-   payload at ``job-binaries/create.java.json``. Then post this file to your
+   payload at ``job-binaries/create.java.json``. Then POST this file to your
    Sahara endpoint's ``job-binaries`` path. Note the new object's id.
 3. **Job**: Insert the binary id from step 2 into the payload at
-   ``jobs/create.java.json``. Then post this file to your Sahara endpoint's
+   ``jobs/create.java.json``. Then POST this file to your Sahara endpoint's
    ``jobs`` path. Note the new object's id.
 4. **Job Execution**: Insert your Hadoop cluster id into the payload at
-   ``job-executions/execute.java.json``. Then post this file to your Sahara
+   ``job-executions/execute.java.json``. Then POST this file to your Sahara
    endpoint at path ``jobs/{job id from step 3}/execute``.
 
@@ -185,7 +197,7 @@ Preconditions
 
 This example assumes the following:
 
-1. Usage of an OpenStack user named "demo", with password "password."
+1. Usage of an OpenStack user named "demo", with password "password".
 2. An active Spark cluster exists in the demo user's project.
 3. In the demo user's project, the file at
    ``edp-examples/edp-spark/spark-example.jar`` is stored in Swift in the
@@ -194,13 +206,13 @@ This example assumes the following:
 Steps
 -----
 
-1. **Job Binary**: Post the payload at ``job-binaries/create.spark.json``
+1. **Job Binary**: POST the payload at ``job-binaries/create.spark.json``
    to your Sahara endpoint's ``job-binaries`` path. Note the new object's id.
 2. **Job**: Insert the binary id from step 1 into the payload at
-   ``jobs/create.spark.json``. Then post this file to your Sahara endpoint's
+   ``jobs/create.spark.json``. Then POST this file to your Sahara endpoint's
    ``jobs`` path. Note the new object's id.
 3. **Job Execution**: Insert your Spark cluster id into the payload at
-   ``job-executions/execute.spark.json``. Then post this file to your Sahara
+   ``job-executions/execute.spark.json``. Then POST this file to your Sahara
    endpoint at path ``jobs/{job id from step 2}/execute``.
 
 Note
@@ -208,3 +220,37 @@ Note
 
 Spark jobs can use additional library binaries, but none are needed for the
 example job.
+
+
+Example 5: Shell script, using the Sahara DB
+============================================
+
+Preconditions
+-------------
+
+This example assumes the following:
+
+1. Usage of an OpenStack user named "demo", with password "password".
+2. An active Hadoop cluster exists in the demo user's project.
+
+Steps
+-----
+
+1. **Script File**: PUT the file at
+   ``edp-examples/edp-shell/shell-example.sh`` into your Sahara endpoint at
+   path ``job-binary-internals/shell-example.sh``. Note the new object's id.
+2. **Text File**: PUT the file at
+   ``edp-examples/edp-shell/shell-example.txt`` into your Sahara endpoint at
+   path ``job-binary-internals/shell-example.txt``. Note the new object's id.
+3. **Script Binary**: Insert the script file's id from step 1 into the payload
+   at ``job-binaries/create.shell-script.json``. Then POST this file to your
+   Sahara endpoint's ``job-binaries`` path. Note the new object's id.
+4. **Text Binary**: Insert the text file's id from step 2 into the payload
+   at ``job-binaries/create.shell-text.json``. Then POST this file to your
+   Sahara endpoint's ``job-binaries`` path. Note the new object's id.
+5. **Job**: Insert the binary ids from steps 3 and 4 into the payload at
+   ``jobs/create.shell.json``. Then POST this file to your Sahara endpoint's
+   ``jobs`` path. Note the new object's id.
+6. **Job Execution**: Insert your Hadoop cluster id into the payload at
+   ``job-executions/execute.shell.json``. Then POST this file to your Sahara
+   endpoint at path ``jobs/{job id from step 5}/execute``.
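Every numbered step above is either a PUT of a raw file or a POST of a JSON
payload to a Sahara REST path; a minimal sketch of the POST half using the
``requests`` library, where the endpoint URL, project id, and token are
placeholders rather than values from a real deployment::

    import json

    import requests

    # Placeholders -- substitute a real Sahara endpoint, project id and token.
    SAHARA_URL = 'http://sahara-host:8386/v1.1/<project-id>'
    HEADERS = {'X-Auth-Token': '<keystone-token>',
               'Content-Type': 'application/json'}

    def post_payload(path, payload_file):
        # POST one of the payload files above and return the parsed response,
        # from which the new object's id can be noted.
        with open(payload_file) as f:
            payload = json.load(f)
        resp = requests.post('%s/%s' % (SAHARA_URL, path),
                             headers=HEADERS, data=json.dumps(payload))
        return resp.json()

    # e.g. post_payload('job-binaries', 'job-binaries/create.shell-script.json')
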
diff --git a/etc/edp-examples/json-api-examples/v1.1/job-binaries/create.shell-script.json b/etc/edp-examples/json-api-examples/v1.1/job-binaries/create.shell-script.json
new file mode 100644
index 00000000..487dec0a
--- /dev/null
+++ b/etc/edp-examples/json-api-examples/v1.1/job-binaries/create.shell-script.json
@@ -0,0 +1,6 @@
+{
+    "name": "shell-example.sh",
+    "description": "An example shell script",
+    "url": "internal-db://%(script_binary_internal_id)s",
+    "extra": {}
+}
diff --git a/etc/edp-examples/json-api-examples/v1.1/job-binaries/create.shell-text.json b/etc/edp-examples/json-api-examples/v1.1/job-binaries/create.shell-text.json
new file mode 100644
index 00000000..d53c6fba
--- /dev/null
+++ b/etc/edp-examples/json-api-examples/v1.1/job-binaries/create.shell-text.json
@@ -0,0 +1,6 @@
+{
+    "name": "shell-example.txt",
+    "description": "An example text file",
+    "url": "internal-db://%(text_binary_internal_id)s",
+    "extra": {}
+}
diff --git a/etc/edp-examples/json-api-examples/v1.1/job-executions/execute.shell.json b/etc/edp-examples/json-api-examples/v1.1/job-executions/execute.shell.json
new file mode 100644
index 00000000..7f939a3d
--- /dev/null
+++ b/etc/edp-examples/json-api-examples/v1.1/job-executions/execute.shell.json
@@ -0,0 +1,8 @@
+{
+    "cluster_id": "%(cluster_id)s",
+    "job_configs": {
+        "configs": {},
+        "args": ["/tmp/edp-shell-example-output.txt"],
+        "params": {"EXTRA_FILE": "shell-example.txt"}
+    }
+}
diff --git a/etc/edp-examples/json-api-examples/v1.1/jobs/create.shell.json b/etc/edp-examples/json-api-examples/v1.1/jobs/create.shell.json
new file mode 100644
index 00000000..747cee3e
--- /dev/null
+++ b/etc/edp-examples/json-api-examples/v1.1/jobs/create.shell.json
@@ -0,0 +1,7 @@
+{
+    "name": "demo-shell-job",
+    "type": "Shell",
+    "description": "A runnable Shell job",
+    "mains": ["%(script_binary_id)s"],
+    "libs": ["%(text_binary_id)s"]
+}
diff --git a/sahara/conductor/api.py b/sahara/conductor/api.py
index ea530c20..d6e66e89 100644
--- a/sahara/conductor/api.py
+++ b/sahara/conductor/api.py
@@ -363,6 +363,15 @@ class LocalApi(object):
                 return binary["name"]
         return None
 
+    def job_lib_names(self, context, job):
+        """Return the names of all job lib binaries or an empty list.
+
+        :param job: This is expected to be a Job object
+        """
+        lib_ids = job.libs or []
+        binaries = (self.job_binary_get(context, lib_id) for lib_id in lib_ids)
+        return [binary["name"] for binary in binaries if binary is not None]
+
     # JobBinary ops
 
     @r.wrap(r.JobBinary)
diff --git a/sahara/service/edp/oozie/engine.py b/sahara/service/edp/oozie/engine.py
index 67d15e69..33277003 100644
--- a/sahara/service/edp/oozie/engine.py
+++ b/sahara/service/edp/oozie/engine.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 import abc
+import os
 import uuid
 
 from oslo_config import cfg
@@ -177,9 +178,12 @@ class OozieJobEngine(base_engine.JobEngine):
         pass
 
     def validate_job_execution(self, cluster, job, data):
-        # All types except Java require input and output objects
-        # and Java require main class
-        if job.type in [edp.JOB_TYPE_JAVA]:
+        # Shell job type requires no specific fields
+        if job.type == edp.JOB_TYPE_SHELL:
+            return
+        # All other types except Java require input and output
+        # objects and Java require main class
+        if job.type == edp.JOB_TYPE_JAVA:
             j.check_main_class_present(data, job)
         else:
             j.check_data_sources(data, job)
@@ -199,7 +203,8 @@ class OozieJobEngine(base_engine.JobEngine):
                 edp.JOB_TYPE_JAVA,
                 edp.JOB_TYPE_MAPREDUCE,
                 edp.JOB_TYPE_MAPREDUCE_STREAMING,
-                edp.JOB_TYPE_PIG]
+                edp.JOB_TYPE_PIG,
+                edp.JOB_TYPE_SHELL]
 
     def _upload_job_files_to_hdfs(self, where, job_dir, job, configs,
                                   proxy_configs=None):
@@ -208,14 +213,15 @@ class OozieJobEngine(base_engine.JobEngine):
         builtin_libs = edp.get_builtin_binaries(job, configs)
         uploaded_paths = []
         hdfs_user = self.get_hdfs_user()
-        lib_dir = job_dir + '/lib'
+        job_dir_suffix = 'lib' if job.type != edp.JOB_TYPE_SHELL else ''
+        lib_dir = os.path.join(job_dir, job_dir_suffix)
 
         with remote.get_remote(where) as r:
             for main in mains:
                 raw_data = dispatch.get_raw_binary(main, proxy_configs)
                 h.put_file_to_hdfs(r, raw_data, main.name, job_dir, hdfs_user)
                 uploaded_paths.append(job_dir + '/' + main.name)
-            if len(libs) > 0:
+            if len(libs) and job_dir_suffix:
                 # HDFS 2.2.0 fails to put file if the lib dir does not exist
                 self.create_hdfs_dir(r, lib_dir)
                 for lib in libs:
diff --git a/sahara/service/edp/oozie/workflow_creator/shell_workflow.py b/sahara/service/edp/oozie/workflow_creator/shell_workflow.py
new file mode 100644
index 00000000..91dfd281
--- /dev/null
+++ b/sahara/service/edp/oozie/workflow_creator/shell_workflow.py
@@ -0,0 +1,45 @@
+# Copyright (c) 2015 Red Hat Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from sahara.service.edp.oozie.workflow_creator import base_workflow
+from sahara.utils import xmlutils as x
+
+
+class ShellWorkflowCreator(base_workflow.OozieWorkflowCreator):
+
+    SHELL_XMLNS = {"xmlns": "uri:oozie:shell-action:0.1"}
+
+    def __init__(self):
+        super(ShellWorkflowCreator, self).__init__('shell')
+
+    def build_workflow_xml(self, script_name, prepare={},
+                           job_xml=None, configuration=None, env_vars={},
+                           arguments=[], files=[]):
+        x.add_attributes_to_element(self.doc, self.tag_name, self.SHELL_XMLNS)
+
+        for k in sorted(prepare):
+            self._add_to_prepare_element(k, prepare[k])
+
+        self._add_configuration_elements(configuration)
+
+        x.add_text_element_to_tag(self.doc, self.tag_name, 'exec', script_name)
+
+        for arg in arguments:
+            x.add_text_element_to_tag(self.doc, self.tag_name, 'argument', arg)
+
+        x.add_equal_separated_dict(self.doc, self.tag_name,
+                                   'env-var', env_vars)
+
+        self._add_files_and_archives(files + [script_name], [])
diff --git a/sahara/service/edp/oozie/workflow_creator/workflow_factory.py b/sahara/service/edp/oozie/workflow_creator/workflow_factory.py
index 24f214e1..6db3fc89 100644
--- a/sahara/service/edp/oozie/workflow_creator/workflow_factory.py
+++ b/sahara/service/edp/oozie/workflow_creator/workflow_factory.py
@@ -22,6 +22,7 @@ from sahara.service.edp.oozie.workflow_creator import hive_workflow
 from sahara.service.edp.oozie.workflow_creator import java_workflow
 from sahara.service.edp.oozie.workflow_creator import mapreduce_workflow
 from sahara.service.edp.oozie.workflow_creator import pig_workflow
+from sahara.service.edp.oozie.workflow_creator import shell_workflow
 from sahara.swift import swift_helper as sw
 from sahara.swift import utils as su
 from sahara.utils import edp
@@ -243,6 +244,34 @@ class JavaFactory(BaseFactory):
         return creator.get_built_workflow_xml()
 
 
+class ShellFactory(BaseFactory):
+
+    def __init__(self, job):
+        super(ShellFactory, self).__init__()
+        self.name, self.file_names = self.get_file_names(job)
+
+    def get_file_names(self, job):
+        ctx = context.ctx()
+        return (conductor.job_main_name(ctx, job),
+                conductor.job_lib_names(ctx, job))
+
+    def get_configs(self):
+        return {'configs': {},
+                'params': {},
+                'args': []}
+
+    def get_workflow_xml(self, cluster, job_configs, *args, **kwargs):
+        job_dict = self.get_configs()
+        self.update_job_dict(job_dict, job_configs)
+        creator = shell_workflow.ShellWorkflowCreator()
+        creator.build_workflow_xml(self.name,
+                                   configuration=job_dict['configs'],
+                                   env_vars=job_dict['params'],
+                                   arguments=job_dict['args'],
+                                   files=self.file_names)
+        return creator.get_built_workflow_xml()
+
+
 def _get_creator(job):
 
     def make_PigFactory():
         return PigFactory(job)
@@ -251,12 +280,16 @@ def _get_creator(job):
     def make_HiveFactory():
         return HiveFactory(job)
 
+    def make_ShellFactory():
+        return ShellFactory(job)
+
     type_map = {
         edp.JOB_TYPE_HIVE: make_HiveFactory,
         edp.JOB_TYPE_JAVA: JavaFactory,
         edp.JOB_TYPE_MAPREDUCE: MapReduceFactory,
         edp.JOB_TYPE_MAPREDUCE_STREAMING: MapReduceFactory,
-        edp.JOB_TYPE_PIG: make_PigFactory
+        edp.JOB_TYPE_PIG: make_PigFactory,
+        edp.JOB_TYPE_SHELL: make_ShellFactory
     }
 
     return type_map[job.type]()
@@ -274,6 +307,9 @@ def get_possible_job_config(job_type):
     if edp.compare_job_type(job_type, edp.JOB_TYPE_JAVA):
         return {'job_config': {'configs': [], 'args': []}}
 
+    if edp.compare_job_type(job_type, edp.JOB_TYPE_SHELL):
+        return {'job_config': {'configs': [], 'params': [], 'args': []}}
+
     if edp.compare_job_type(job_type,
                             edp.JOB_TYPE_MAPREDUCE, edp.JOB_TYPE_PIG):
         # TODO(nmakhotkin): Here we need return config based on specific plugin
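``ShellFactory.get_workflow_xml`` above drives the new creator with the job's
main script, lib names, and user-supplied configs; a rough usage sketch (it
assumes a Sahara source tree on the Python path, and the names are taken from
the bundled ``edp-shell`` example rather than a live job execution)::

    from sahara.service.edp.oozie.workflow_creator import shell_workflow

    creator = shell_workflow.ShellWorkflowCreator()
    # The script name is appended to the <file> entries by build_workflow_xml
    # itself, so only the extra lib file is passed in "files".
    creator.build_workflow_xml('shell-example.sh',
                               configuration={},
                               env_vars={'EXTRA_FILE': 'shell-example.txt'},
                               arguments=['/tmp/edp-shell-example-output.txt'],
                               files=['shell-example.txt'])
    print(creator.get_built_workflow_xml())  # the generated <shell> action
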
diff --git a/sahara/service/validations/edp/job.py b/sahara/service/validations/edp/job.py
index ad03ccb9..3d51c967 100644
--- a/sahara/service/validations/edp/job.py
+++ b/sahara/service/validations/edp/job.py
@@ -77,7 +77,8 @@ def check_mains_libs(data, **kwargs):
                  subtype == edp.JOB_SUBTYPE_STREAMING)
 
     # These types must have a value in mains and may also use libs
-    if job_type in [edp.JOB_TYPE_PIG, edp.JOB_TYPE_HIVE, edp.JOB_TYPE_SPARK]:
+    if job_type in [edp.JOB_TYPE_PIG, edp.JOB_TYPE_HIVE,
+                    edp.JOB_TYPE_SHELL, edp.JOB_TYPE_SPARK]:
         if not mains:
             if job_type == edp.JOB_TYPE_SPARK:
                 msg = _(
diff --git a/sahara/tests/integration/tests/edp.py b/sahara/tests/integration/tests/edp.py
index e6c96227..f54bc6bd 100644
--- a/sahara/tests/integration/tests/edp.py
+++ b/sahara/tests/integration/tests/edp.py
@@ -32,6 +32,7 @@ class EDPJobInfo(object):
     MAPREDUCE_PATH = 'etc/edp-examples/edp-mapreduce/'
     SPARK_PATH = 'etc/edp-examples/edp-spark/'
     HIVE_PATH = 'etc/edp-examples/edp-hive/'
+    SHELL_PATH = 'etc/edp-examples/edp-shell/'
 
     HADOOP2_JAVA_PATH = 'etc/edp-examples/hadoop2/edp-java/'
 
@@ -98,6 +99,20 @@ class EDPJobInfo(object):
             }
         }
 
+    def read_shell_example_script(self):
+        return open(self.SHELL_PATH + 'shell-example.sh').read()
+
+    def read_shell_example_text_file(self):
+        return open(self.SHELL_PATH + 'shell-example.txt').read()
+
+    def shell_example_configs(self):
+        return {
+            "params": {
+                "EXTRA_FILE": "*text"
+            },
+            "args": ["/tmp/edp-integration-shell-output.txt"]
+        }
+
     def read_spark_example_jar(self):
         return open(self.SPARK_PATH + 'spark-example.jar').read()
 
diff --git a/sahara/tests/integration/tests/gating/test_cdh_gating.py b/sahara/tests/integration/tests/gating/test_cdh_gating.py
index 31b96664..6b952b92 100644
--- a/sahara/tests/integration/tests/gating/test_cdh_gating.py
+++ b/sahara/tests/integration/tests/gating/test_cdh_gating.py
@@ -302,6 +302,15 @@ class CDHGatingTest(check_services.CheckServicesTest,
             lib_data_list=[{'jar': java_jar}],
            configs=java_configs)
 
+        # check Shell
+        shell_script_data = self.edp_info.read_shell_example_script()
+        shell_file_data = self.edp_info.read_shell_example_text_file()
+        yield self.edp_testing(
+            job_type=utils_edp.JOB_TYPE_SHELL,
+            job_data_list=[{'script': shell_script_data}],
+            lib_data_list=[{'text': shell_file_data}],
+            configs=self.edp_info.shell_example_configs())
+
     @b.errormsg("Failure while check services testing: ")
     def _check_services(self):
         # check HBase
diff --git a/sahara/tests/integration/tests/gating/test_hdp2_gating.py b/sahara/tests/integration/tests/gating/test_hdp2_gating.py
index 1423da5a..d71b98d7 100644
--- a/sahara/tests/integration/tests/gating/test_hdp2_gating.py
+++ b/sahara/tests/integration/tests/gating/test_hdp2_gating.py
@@ -125,7 +125,6 @@ class HDP2GatingTest(swift.SwiftTest, scaling.ScalingTest,
         # check pig
         pig_job = self.edp_info.read_pig_example_script()
         pig_lib = self.edp_info.read_pig_example_jar()
-
         yield self.edp_testing(
             job_type=utils_edp.JOB_TYPE_PIG,
             job_data_list=[{'pig': pig_job}],
@@ -160,6 +159,15 @@ class HDP2GatingTest(swift.SwiftTest, scaling.ScalingTest,
             lib_data_list=[{'jar': java_jar}],
             configs=java_configs)
 
+        # check shell
+        shell_script_data = self.edp_info.read_shell_example_script()
+        shell_file_data = self.edp_info.read_shell_example_text_file()
+        yield self.edp_testing(
+            job_type=utils_edp.JOB_TYPE_SHELL,
+            job_data_list=[{'script': shell_script_data}],
+            lib_data_list=[{'text': shell_file_data}],
+            configs=self.edp_info.shell_example_configs())
+
@b.errormsg("Failure while cluster scaling: ") def _check_scaling(self): datanode_count_after_resizing = ( diff --git a/sahara/tests/integration/tests/gating/test_hdp_gating.py b/sahara/tests/integration/tests/gating/test_hdp_gating.py index 4bddd0ec..28708e99 100644 --- a/sahara/tests/integration/tests/gating/test_hdp_gating.py +++ b/sahara/tests/integration/tests/gating/test_hdp_gating.py @@ -121,6 +121,9 @@ class HDPGatingTest(cinder.CinderVolumeTest, edp.EDPTest, mapreduce_jar_data = self.edp_info.read_mapreduce_example_jar() + shell_script_data = self.edp_info.read_shell_example_script() + shell_file_data = self.edp_info.read_shell_example_text_file() + # This is a modified version of WordCount that takes swift configs java_lib_data = self.edp_info.read_java_example_lib() @@ -156,8 +159,17 @@ class HDPGatingTest(cinder.CinderVolumeTest, edp.EDPTest, configs=self.edp_info.java_example_configs(), pass_input_output_args=True) job_ids.append(job_id) + + job_id = self.edp_testing( + job_type=utils_edp.JOB_TYPE_SHELL, + job_data_list=[{'script': shell_script_data}], + lib_data_list=[{'text': shell_file_data}], + configs=self.edp_info.shell_example_configs()) + job_ids.append(job_id) + self.poll_jobs_status(job_ids) + # -----------------------------MAP REDUCE TESTING------------------------------ self.map_reduce_testing(cluster_info) diff --git a/sahara/tests/integration/tests/gating/test_vanilla_gating.py b/sahara/tests/integration/tests/gating/test_vanilla_gating.py index 82b17663..82dca987 100644 --- a/sahara/tests/integration/tests/gating/test_vanilla_gating.py +++ b/sahara/tests/integration/tests/gating/test_vanilla_gating.py @@ -185,6 +185,8 @@ class VanillaGatingTest(cinder.CinderVolumeTest, mapreduce_jar_data = self.edp_info.read_mapreduce_example_jar() # This is a modified version of WordCount that takes swift configs java_lib_data = self.edp_info.read_java_example_lib() + shell_script_data = self.edp_info.read_shell_example_script() + shell_file_data = self.edp_info.read_shell_example_text_file() yield self.edp_testing( job_type=utils_edp.JOB_TYPE_PIG, @@ -215,6 +217,12 @@ class VanillaGatingTest(cinder.CinderVolumeTest, configs=self.edp_info.java_example_configs(), pass_input_output_args=True) + yield self.edp_testing( + job_type=utils_edp.JOB_TYPE_SHELL, + job_data_list=[{'script': shell_script_data}], + lib_data_list=[{'text': shell_file_data}], + configs=self.edp_info.shell_example_configs()) + @b.errormsg("Failure while EDP testing: ") def _check_edp(self): self.poll_jobs_status(list(self._run_edp_test())) diff --git a/sahara/tests/integration/tests/gating/test_vanilla_two_gating.py b/sahara/tests/integration/tests/gating/test_vanilla_two_gating.py index d0dceff1..724e94ca 100644 --- a/sahara/tests/integration/tests/gating/test_vanilla_two_gating.py +++ b/sahara/tests/integration/tests/gating/test_vanilla_two_gating.py @@ -204,6 +204,8 @@ class VanillaTwoGatingTest(cluster_configs.ClusterConfigTest, yield self._edp_java_test() if utils_edp.JOB_TYPE_HIVE not in skipped_edp_job_types: yield self._check_edp_hive() + if utils_edp.JOB_TYPE_SHELL not in skipped_edp_job_types: + yield self._edp_shell_test() # TODO(esikachev): Until fix bug 1413602 def _run_edp_tests_after_scaling(self): @@ -217,6 +219,8 @@ class VanillaTwoGatingTest(cluster_configs.ClusterConfigTest, yield self._edp_mapreduce_streaming_test() if utils_edp.JOB_TYPE_JAVA not in skipped_edp_job_types: yield self._edp_java_test() + if utils_edp.JOB_TYPE_SHELL not in skipped_edp_job_types: + yield self._edp_shell_test() 
 
     def _edp_pig_test(self):
         pig_job = self.edp_info.read_pig_example_script()
@@ -256,6 +260,15 @@ class VanillaTwoGatingTest(cluster_configs.ClusterConfigTest,
             lib_data_list=[{'jar': java_jar}],
             configs=java_configs)
 
+    def _edp_shell_test(self):
+        shell_script_data = self.edp_info.read_shell_example_script()
+        shell_file_data = self.edp_info.read_shell_example_text_file()
+        return self.edp_testing(
+            job_type=utils_edp.JOB_TYPE_SHELL,
+            job_data_list=[{'script': shell_script_data}],
+            lib_data_list=[{'text': shell_file_data}],
+            configs=self.edp_info.shell_example_configs())
+
     def _check_edp_hive(self):
         return self.check_edp_hive()
 
diff --git a/sahara/tests/unit/service/edp/test_json_api_examples.py b/sahara/tests/unit/service/edp/test_json_api_examples.py
index ed769a0d..6991377b 100644
--- a/sahara/tests/unit/service/edp/test_json_api_examples.py
+++ b/sahara/tests/unit/service/edp/test_json_api_examples.py
@@ -40,13 +40,18 @@ class TestJSONApiExamplesV11(testtools.TestCase):
     def test_job_binaries(self):
         schema = job_binary.JOB_BINARY_SCHEMA
         path = self.EXAMPLES_PATH % 'job-binaries'
-        formatter = self._formatter("job_binary_internal_id")
+        formatter = self._formatter("job_binary_internal_id",
+                                    "script_binary_internal_id",
+                                    "text_binary_internal_id")
         self._test(schema, path, formatter)
 
     def test_jobs(self):
         schema = job.JOB_SCHEMA
         path = self.EXAMPLES_PATH % 'jobs'
-        formatter = self._formatter("job_binary_id", "udf_binary_id")
+        formatter = self._formatter("job_binary_id",
+                                    "udf_binary_id",
+                                    "script_binary_id",
+                                    "text_binary_id")
         self._test(schema, path, formatter)
 
     def test_job_executions(self):
diff --git a/sahara/tests/unit/service/edp/workflow_creator/test_create_workflow.py b/sahara/tests/unit/service/edp/workflow_creator/test_create_workflow.py
index 8572420f..e23d0d3e 100644
--- a/sahara/tests/unit/service/edp/workflow_creator/test_create_workflow.py
+++ b/sahara/tests/unit/service/edp/workflow_creator/test_create_workflow.py
@@ -20,13 +20,14 @@ from sahara.service.edp.oozie.workflow_creator import hive_workflow as hw
 from sahara.service.edp.oozie.workflow_creator import java_workflow as jw
 from sahara.service.edp.oozie.workflow_creator import mapreduce_workflow as mrw
 from sahara.service.edp.oozie.workflow_creator import pig_workflow as pw
+from sahara.service.edp.oozie.workflow_creator import shell_workflow as shw
 from sahara.utils import patches as p
 
 
-class TestPigWorkflowCreator(testtools.TestCase):
+class TestWorkflowCreators(testtools.TestCase):
 
     def setUp(self):
-        super(TestPigWorkflowCreator, self).setUp()
+        super(TestWorkflowCreators, self).setUp()
         p.patch_minidom_writexml()
         self.prepare = {'delete': ['delete_dir_1', 'delete_dir_2'],
                         'mkdir': ['mkdir_1']}
@@ -218,3 +219,47 @@ class TestPigWorkflowCreator(testtools.TestCase):
         """
 
         self.assertIn(java_action, res)
+
+    def test_create_shell_workflow(self):
+        shell_workflow = shw.ShellWorkflowCreator()
+        main_class = 'doit.sh'
+        args = ['now']
+        env_vars = {"VERSION": 3}
+
+        shell_workflow.build_workflow_xml(main_class,
+                                          self.prepare,
+                                          self.job_xml,
+                                          self.configuration,
+                                          env_vars,
+                                          args,
+                                          self.files)
+
+        res = shell_workflow.get_built_workflow_xml()
+        shell_action = """
+      <shell xmlns="uri:oozie:shell-action:0.1">
+        <job-tracker>${jobTracker}</job-tracker>
+        <name-node>${nameNode}</name-node>
+        <prepare>
+          <delete path="delete_dir_1"/>
+          <delete path="delete_dir_2"/>
+          <mkdir path="mkdir_1"/>
+        </prepare>
+        <configuration>
+          <property>
+            <name>conf_param_1</name>
+            <value>conf_value_1</value>
+          </property>
+          <property>
+            <name>conf_param_2</name>
+            <value>conf_value_3</value>
+          </property>
+        </configuration>
+        <exec>doit.sh</exec>
+        <argument>now</argument>
+        <env-var>VERSION=3</env-var>
+        <file>file1</file>
+        <file>file2</file>
+        <file>doit.sh</file>
+      </shell>"""
+
+        self.assertIn(shell_action, res)
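In the expected XML above, the ``VERSION=3`` text node is produced by the
existing ``add_equal_separated_dict`` helper, which formats each pair with
``"%s=%s"``; a quick standalone check of that formatting::

    env_vars = {'VERSION': 3}
    print(['%s=%s' % (k, env_vars[k]) for k in sorted(env_vars)])
    # ['VERSION=3']
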
diff --git a/sahara/utils/edp.py b/sahara/utils/edp.py
index 3daf546e..effe3e92 100644
--- a/sahara/utils/edp.py
+++ b/sahara/utils/edp.py
@@ -47,6 +47,8 @@ JOB_TYPE_SPARK = 'Spark'
 JOB_TYPE_MAPREDUCE_STREAMING = (JOB_TYPE_MAPREDUCE + JOB_TYPE_SEP +
                                 JOB_SUBTYPE_STREAMING)
 JOB_TYPE_PIG = 'Pig'
+JOB_TYPE_SHELL = 'Shell'
+
 # job type groupings available
 JOB_TYPES_ALL = [
     JOB_TYPE_HIVE,
@@ -54,6 +56,7 @@ JOB_TYPES_ALL = [
     JOB_TYPE_MAPREDUCE,
     JOB_TYPE_MAPREDUCE_STREAMING,
     JOB_TYPE_PIG,
+    JOB_TYPE_SHELL,
     JOB_TYPE_SPARK
 ]
 
diff --git a/sahara/utils/xmlutils.py b/sahara/utils/xmlutils.py
index 5ee5d75f..36060283 100644
--- a/sahara/utils/xmlutils.py
+++ b/sahara/utils/xmlutils.py
@@ -162,6 +162,12 @@ def add_equal_separated_dict(doc, parent_tag, each_elem_tag, value):
                                 "%s=%s" % (k, value[k]))
 
 
+def add_attributes_to_element(doc, tag, attributes):
+    element = doc.getElementsByTagName(tag)[0]
+    for name, value in attributes.items():
+        element.setAttribute(name, value)
+
+
 def add_tagged_list(doc, parent_tag, each_elem_tag, values):
     for v in values:
         add_text_element_to_tag(doc, parent_tag, each_elem_tag, v)
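The new ``add_attributes_to_element`` helper is what stamps the shell action
namespace onto the ``<shell>`` tag; a self-contained sketch of the same minidom
pattern on a toy document (not Sahara's generated workflow)::

    from xml.dom import minidom

    # Find the first element with the given tag name and set attributes on it,
    # mirroring add_attributes_to_element above.
    doc = minidom.parseString('<workflow-app><shell/></workflow-app>')
    element = doc.getElementsByTagName('shell')[0]
    for name, value in {'xmlns': 'uri:oozie:shell-action:0.1'}.items():
        element.setAttribute(name, value)
    print(doc.documentElement.toxml())
    # <workflow-app><shell xmlns="uri:oozie:shell-action:0.1"/></workflow-app>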