diff --git a/saharaclient/tests/integration/configs/config.py b/saharaclient/tests/integration/configs/config.py index aa92c7a7..e9952ac4 100644 --- a/saharaclient/tests/integration/configs/config.py +++ b/saharaclient/tests/integration/configs/config.py @@ -86,11 +86,24 @@ COMMON_CONFIG_OPTS = [ 'INTERNAL_NEUTRON_NETWORK.'), cfg.StrOpt('INTERNAL_NEUTRON_NETWORK', default='private', - help='Name for internal Neutron network.') + help='Name for internal Neutron network.'), + cfg.IntOpt('JOB_LAUNCH_TIMEOUT', + default=10, + help='Job launch timeout (in minutes); ' + 'minimal value is 1.'), + cfg.BoolOpt('INTERNAL_JOB_BINARIES', + default=True, + help='Store job binary data in the sahara ' + 'internal database. If this option is set to ' + 'False, job binary data will be stored in swift.'), + cfg.StrOpt('CLUSTER_NAME', + default='test', + help='Name for cluster.') ] -def general_cluster_config_opts(): +def general_cluster_config_opts(plugin_text, plugin_name, hadoop_ver, + skip_all=False): return [ cfg.StrOpt('EXISTING_CLUSTER_ID', help='The id of an existing active cluster ' @@ -122,54 +135,40 @@ def general_cluster_config_opts(): 'chosen by the tag "sahara_i_tests".'), cfg.StrOpt('SSH_USERNAME', help='Username used to log into a cluster node via SSH.'), - cfg.BoolOpt('SKIP_CLUSTER_TEARDOWN', - default=False, - help='Skip tearing down the cluster. If an existing ' - 'cluster is used it will never be torn down by the test.') - ] - - -def skip_config_opts(plugin, skip_all=False): - return [ + cfg.StrOpt('HADOOP_VERSION', + default='%s' % hadoop_ver, + help='Version of Hadoop'), + cfg.StrOpt('PLUGIN_NAME', + default='%s' % plugin_name, + help='Name of plugin'), cfg.BoolOpt('SKIP_ALL_TESTS_FOR_PLUGIN', default=skip_all, help='If this flag is True, then all tests for the %s ' - 'plugin will be skipped.' % plugin) + 'plugin will be skipped.' % plugin_text), + cfg.BoolOpt('SKIP_CLUSTER_TEARDOWN', + default=False, + help='Skip tearing down the cluster. If an existing ' + 'cluster is used it will never be torn down by the test.'), + cfg.BoolOpt('SKIP_JAVA_EDP_TEST', default=False), + cfg.BoolOpt('SKIP_MAPREDUCE_EDP_TEST', default=False), + cfg.BoolOpt('SKIP_MAPREDUCE_STREAMING_EDP_TEST', default=False), + cfg.BoolOpt('SKIP_PIG_EDP_TEST', default=False) ] VANILLA_CONFIG_GROUP = cfg.OptGroup(name='VANILLA') -VANILLA_CONFIG_OPTS = skip_config_opts( - "Vanilla") + general_cluster_config_opts() + [ - cfg.StrOpt('HADOOP_VERSION', - default="1.2.1", - help='Version of Hadoop'), - cfg.StrOpt('PLUGIN_NAME', - default='vanilla', - help='Name of plugin') - ] +VANILLA_CONFIG_OPTS = general_cluster_config_opts("Vanilla", + "vanilla", "1.2.1") VANILLA2_CONFIG_GROUP = cfg.OptGroup(name='VANILLA2') -VANILLA2_CONFIG_OPTS = skip_config_opts( - "Vanilla2", skip_all=True) + general_cluster_config_opts() + [ - cfg.StrOpt('HADOOP_VERSION', - default="2.3.0", - help='Version of Hadoop'), - cfg.StrOpt('PLUGIN_NAME', - default='vanilla', - help='Name of plugin') - ] +VANILLA2_CONFIG_OPTS = general_cluster_config_opts("Vanilla2", + "vanilla", "2.3.0", + skip_all=True) HDP_CONFIG_GROUP = cfg.OptGroup(name='HDP') -HDP_CONFIG_OPTS = skip_config_opts( - "HDP", skip_all=True) + general_cluster_config_opts() + [ - cfg.StrOpt('HADOOP_VERSION', - default="1.3.2", - help='Version of Hadoop'), - cfg.StrOpt('PLUGIN_NAME', - default='hdp', - help='Name of plugin') - ] +HDP_CONFIG_OPTS = general_cluster_config_opts("HDP", + "hdp", "1.3.2", + skip_all=True) def register_config(config, config_group, config_opts): diff --git a/saharaclient/tests/integration/configs/itest.conf.sample-full b/saharaclient/tests/integration/configs/itest.conf.sample-full index c023c48f..db8b1d25 100644 --- a/saharaclient/tests/integration/configs/itest.conf.sample-full +++ b/saharaclient/tests/integration/configs/itest.conf.sample-full @@ -56,6 +56,18 @@ # Name for internal Neutron network (string value) #INTERNAL_NEUTRON_NETWORK = 'private' +# Job launch timeout (in minutes); minimal value is 1 +# (integer value) +#JOB_LAUNCH_TIMEOUT = 10 + +# Store job binary data in the sahara internal database. +# If this option is set to False, job binary data will be stored in swift +# (boolean value) +#INTERNAL_JOB_BINARIES = True + +# Name for cluster (string value) +#CLUSTER_NAME = 'test' + [VANILLA] # The id of an existing active cluster @@ -104,10 +116,26 @@ # Name of plugin (string value) #PLUGIN_NAME = 'vanilla' -# If this option is set True, no tests for this plugin will be run +# If this option is set True no tests for this plugin will be run # (boolean value) #SKIP_ALL_TESTS_FOR_PLUGIN = False +# If this option is set True no Java EDP job will be submitted +# (boolean value) +#SKIP_JAVA_EDP_TEST = False + +# If this option is set True no MapReduce EDP job will be submitted +# (boolean value) +#SKIP_MAPREDUCE_EDP_TEST = False + +# If this option is set True no Streaming MapReduce EDP job +# will be submitted (boolean value) +#SKIP_MAPREDUCE_STREAMING_EDP_TEST = False + +# If this option is set True no Pig EDP job will be submitted +# (boolean value) +#SKIP_PIG_EDP_TEST = False + [VANILLA2] # The id of an existing active cluster @@ -160,6 +188,22 @@ # (boolean value) #SKIP_ALL_TESTS_FOR_PLUGIN = True +# If this option is set True no Java EDP job will be submitted +# (boolean value) +#SKIP_JAVA_EDP_TEST = False + +# If this option is set True no MapReduce EDP job will be submitted +# (boolean value) +#SKIP_MAPREDUCE_EDP_TEST = False + +# If this option is set True no Streaming MapReduce EDP job +# will be submitted (boolean value) +#SKIP_MAPREDUCE_STREAMING_EDP_TEST = False + +# If this option is set True no Pig EDP job will be submitted +# (boolean value) +#SKIP_PIG_EDP_TEST = False + [HDP] # The id of an existing active cluster @@ -211,3 +255,19 @@ # If this option is set True, no tests for this plugin will be run # (boolean value) #SKIP_ALL_TESTS_FOR_PLUGIN = True + +# If this option is set True no Java EDP job will be submitted +# (boolean value) +#SKIP_JAVA_EDP_TEST = False + +# If this option is set True no MapReduce EDP job will be submitted +# (boolean value) +#SKIP_MAPREDUCE_EDP_TEST = False + +# If this option is set True no Streaming MapReduce EDP job +# will be submitted (boolean value) +#SKIP_MAPREDUCE_STREAMING_EDP_TEST = False + +# If this option is set True no Pig EDP job will be submitted +# (boolean value) +#SKIP_PIG_EDP_TEST = False diff --git a/saharaclient/tests/integration/tests/cluster.py b/saharaclient/tests/integration/tests/cluster.py index f28fa5d6..c4b3b46d 100644 --- a/saharaclient/tests/integration/tests/cluster.py +++ b/saharaclient/tests/integration/tests/cluster.py @@ -187,8 +187,9 @@ class ClusterTest(base.ITestBase): def build_cluster(self, config, node_group_info): self.init_keypair() - cluster_name = "test-%s-%s" % (config.PLUGIN_NAME, - config.HADOOP_VERSION.replace(".", "")) + cluster_name = "%s-%s-%s" % (common.CLUSTER_NAME, + config.PLUGIN_NAME, + config.HADOOP_VERSION.replace(".", "")) # Create and tag an image image_id, username = self.find_image_id(config) self.cli.register_image(image_id, username, cluster_name) diff --git a/saharaclient/tests/integration/tests/edp.py b/saharaclient/tests/integration/tests/edp.py new file mode 100644 index 00000000..70cd409a --- /dev/null +++ b/saharaclient/tests/integration/tests/edp.py @@ -0,0 +1,303 @@ +# Copyright (c) 2014 Red Hat Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import saharaclient.api.base as api_base +from saharaclient.tests.integration.configs import config as cfg +import saharaclient.tests.integration.tests.base as base +import saharaclient.tests.integration.tests.utils as utils + +cfg = cfg.ITConfig() +common = cfg.common_config + + +class EDPTest(base.ITestBase): + def setUp(self): + super(EDPTest, self).setUp() + + self.swift_utils = utils.SwiftUtils() + self.path = 'saharaclient/tests/integration/tests/resources/' + + self.job = None + self.job_template = None + self.lib_binary = None + self.main_binary = None + self.input_source = None + self.output_source = None + + def tearDown(self): + super(EDPTest, self).tearDown() + self.swift_utils.delete_containers() + + def _create_binary(self, marker, info, url): + # Use the marker value to distinguish the object, but + # the name must start with a letter and include a file + # extension. + binary_name = '%s-%s' % (marker, info['name']) + self.cli.job_binary_create(binary_name, + url, + username=common.OS_USERNAME, + password=common.OS_PASSWORD) + binary_obj = self.util.find_job_binary_by_name(binary_name) + self.assertIsNotNone(binary_obj) + self.assertEqual(binary_obj.name, binary_name) + self.assertEqual(binary_obj.url, url) + return binary_obj + + def _create_swift_binary(self, marker, container, info): + if not info: + return None + self.swift_utils.upload(container, info['name'], info['data']) + url = 'swift://%s/%s' % (container, info['name']) + return self._create_binary(marker, info, url) + + def _create_internal_binary(self, marker, info): + if not info: + return None, None + output = self.cli.job_binary_data_create(info['path']) + id = self.util.find_binary_internal_id(output) + url = 'internal-db://%s' % id + return self._create_binary(marker, info, url), id + + def _create_data_source(self, name, url): + self.cli.data_source_create(name, 'swift', url, + username=common.OS_USERNAME, + password=common.OS_PASSWORD) + source = self.util.find_data_source_by_name(name) + self.assertIsNotNone(source) + return source + + def _binary_info(self, fname, relative_path=""): + # Binaries need to be named, and the name must include + # the file extension since Oozie depends on it. So, we + # just store the filename for the name here. + info = {'name': fname} + if common.INTERNAL_JOB_BINARIES: + # We will use the cli to upload the file by path + info['path'] = self.path + relative_path + fname + else: + # We will upload the binary data to swift + info['data'] = open(self.path + relative_path + fname).read() + return info + + def edp_common(self, job_type, lib=None, main=None, configs=None, + add_data_sources=True, pass_data_sources_as_args=False): + # Generate a new marker for this so we can keep containers separate + # and create some input data + marker = "%s-%s" % (job_type.replace(".", ""), os.getpid()) + container = self.swift_utils.create_container(marker) + self.swift_utils.generate_input(container, 'input') + input_url = 'swift://%s.sahara/input' % container + output_url = 'swift://%s.sahara/output' % container + + # Create binaries + if common.INTERNAL_JOB_BINARIES: + (self.lib_binary, + self.lib_data_id) = self._create_internal_binary(marker, lib) + (self.main_binary, + self.main_data_id) = self._create_internal_binary(marker, main) + else: + self.lib_data_id = None + self.main_data_id = None + self.lib_binary = self._create_swift_binary(marker, container, lib) + self.main_binary = self._create_swift_binary(marker, + container, main) + + # Create data sources + if add_data_sources: + self.input_source = self._create_data_source('input-%s' % marker, + input_url) + self.output_source = self._create_data_source('output-%s' % marker, + output_url) + else: + self.input_source = self.output_source = None + + # Create a job template + job_template_name = marker + self.cli.job_template_create(job_template_name, + job_type, + main=self.main_binary and ( + self.main_binary.id or ''), + lib=self.lib_binary and ( + self.lib_binary.id or '')) + self.job_template = self.util.find_job_template_by_name( + job_template_name) + self.assertIsNotNone(self.job_template) + self.assertEqual(self.job_template.name, job_template_name) + self.assertEqual(self.job_template.type, job_type) + if self.lib_binary: + self.assertEqual(len(self.job_template.libs), 1) + self.assertEqual(self.job_template.libs[0]['id'], + self.lib_binary.id) + if self.main_binary: + self.assertEqual(len(self.job_template.mains), 1) + self.assertEqual(self.job_template.mains[0]['id'], + self.main_binary.id) + + # Launch the job + if pass_data_sources_as_args: + args = [input_url, output_url] + else: + args = None + self.cli.job_create(self.job_template.id, + self.cluster.id, + input_id=self.input_source and ( + self.input_source.id or ''), + output_id=self.output_source and ( + self.output_source.id or ''), + args=args, + configs=configs) + + # Find the job using the job_template_id + self.job = self.util.find_job_by_job_template_id(self.job_template.id) + self.assertIsNotNone(self.job) + self.assertEqual(self.job.cluster_id, self.cluster.id) + + # poll for status + status = self.util.poll_job_execution(self.job.id) + self.assertEqual(status, 'SUCCEEDED') + + # follow up with a deletion of the stuff we made from a util function + self.delete_job_objects() + + def pig_edp(self): + self.edp_common('Pig', + lib=self._binary_info('edp-lib.jar'), + main=self._binary_info('edp-job.pig')) + + def mapreduce_edp(self): + configs = { + "mapred.mapper.class": "org.apache.oozie.example.SampleMapper", + "mapred.reducer.class": "org.apache.oozie.example.SampleReducer" + } + self.edp_common('MapReduce', + lib=self._binary_info('edp-mapreduce.jar'), + configs=configs) + + def mapreduce_streaming_edp(self): + configs = { + "edp.streaming.mapper": "/bin/cat", + "edp.streaming.reducer": "/usr/bin/wc" + } + self.edp_common('MapReduce.Streaming', + configs=configs) + + def java_edp(self): + configs = { + 'fs.swift.service.sahara.username': common.OS_USERNAME, + 'fs.swift.service.sahara.password': common.OS_PASSWORD, + 'edp.java.main_class': 'org.openstack.sahara.examples.WordCount' + } + self.edp_common('Java', + lib=self._binary_info('edp-java.jar', + relative_path='edp-java/'), + configs=configs, + add_data_sources=False, + pass_data_sources_as_args=True) + + def run_edp_jobs(self, config): + try: + if not config.SKIP_JAVA_EDP_TEST: + self.java_edp() + if not config.SKIP_MAPREDUCE_EDP_TEST: + self.mapreduce_edp() + if not config.SKIP_MAPREDUCE_STREAMING_EDP_TEST: + self.mapreduce_streaming_edp() + if not config.SKIP_PIG_EDP_TEST: + self.pig_edp() + except Exception as e: + # Something went wrong, try to clean up what might be left + self.delete_job_objects_via_client() + raise(e) + + def delete_job_objects(self): + if self.job: + self.cli.job_delete(self.job.id) + self.assertRaises(api_base.APIException, + self.util.find_job_by_id, + self.job.id) + self.job = None + + if self.job_template: + self.cli.job_template_delete(self.job_template.id) + self.assertRaises(api_base.APIException, + self.util.find_job_template_by_id, + self.job_template.id) + self.job_template = None + + if self.lib_binary: + self.cli.job_binary_delete(self.lib_binary.id) + self.assertRaises(api_base.APIException, + self.util.find_job_binary_by_id, + self.lib_binary.id) + self.lib_binary = None + + if self.lib_data_id: + self.cli.job_binary_data_delete(self.lib_data_id) + self.assertRaises(api_base.APIException, + self.util.find_binary_internal_by_id, + self.lib_data_id) + self.lib_data_id = None + + if self.main_binary: + self.cli.job_binary_delete(self.main_binary.id) + self.assertRaises(api_base.APIException, + self.util.find_job_binary_by_id, + self.main_binary.id) + self.main_binary = None + + if self.main_data_id: + self.cli.job_binary_data_delete(self.main_data_id) + self.assertRaises(api_base.APIException, + self.util.find_binary_internal_by_id, + self.main_data_id) + self.main_data_id = None + + if self.input_source: + self.cli.data_source_delete(self.input_source.id) + self.assertRaises(api_base.APIException, + self.util.find_data_source_by_id, + self.input_source.id) + self.input_source = None + + if self.output_source: + self.cli.data_source_delete(self.output_source.id) + self.assertRaises(api_base.APIException, + self.util.find_data_source_by_id, + self.output_source.id) + self.output_source = None + + def delete_job_objects_via_client(self): + try: + if self.job: + self.util.client.job_executions.delete(self.job.id) + self.job = None + if self.job_template: + self.util.client.jobs.delete(self.job_template.id) + self.job_template = None + if self.lib_binary: + self.util.client.job_binaries.delete(self.lib_binary.id) + self.lib_binary = None + if self.main_binary: + self.util.client.job_binaries.delete(self.main_binary.id) + self.main_binary = None + if self.input_source: + self.util.client.data_sources.delete(self.input_source.id) + self.input_source = None + if self.output_source: + self.util.client.data_sources.delete(self.output_source.id) + self.output_source = None + except Exception: + pass diff --git a/saharaclient/tests/integration/tests/full_test_driver.py b/saharaclient/tests/integration/tests/full_test_driver.py index 75c699b4..c1e0123b 100644 --- a/saharaclient/tests/integration/tests/full_test_driver.py +++ b/saharaclient/tests/integration/tests/full_test_driver.py @@ -14,9 +14,10 @@ # limitations under the License. import saharaclient.tests.integration.tests.cluster as cluster +import saharaclient.tests.integration.tests.edp as edp -class FullTestDriver(cluster.ClusterTest): +class FullTestDriver(edp.EDPTest, cluster.ClusterTest): def drive_full_test(self, config, ng_templates): # If we get an exception during cluster launch, the cluster has already @@ -24,6 +25,7 @@ class FullTestDriver(cluster.ClusterTest): skip_teardown = self.launch_cluster_or_use_existing(config, ng_templates) try: + self.run_edp_jobs(config) if not skip_teardown: self.teardown_cluster() except Exception as e: diff --git a/saharaclient/tests/integration/tests/resources/edp-java/README b/saharaclient/tests/integration/tests/resources/edp-java/README new file mode 100644 index 00000000..4fa99240 --- /dev/null +++ b/saharaclient/tests/integration/tests/resources/edp-java/README @@ -0,0 +1,4 @@ +Compiled against Hadoop 1.2.1 +$ mkdir wordcount_classes +$ javac -classpath /usr/share/hadoop/hadoop-core-1.2.1.jar:/usr/share/hadoop/lib/commons-cli-1.2.jar -d wordcount_classes WordCount.java +$ jar -cvf edp-java.jar -C wordcount_classes/ . diff --git a/saharaclient/tests/integration/tests/resources/edp-java/WordCount.java b/saharaclient/tests/integration/tests/resources/edp-java/WordCount.java new file mode 100644 index 00000000..8f834b5b --- /dev/null +++ b/saharaclient/tests/integration/tests/resources/edp-java/WordCount.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.openstack.sahara.examples; + +import java.io.IOException; +import java.util.StringTokenizer; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.mapreduce.Reducer; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; +import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; +import org.apache.hadoop.util.GenericOptionsParser; + +public class WordCount { + + public static class TokenizerMapper + extends Mapper{ + + private final static IntWritable one = new IntWritable(1); + private Text word = new Text(); + + public void map(Object key, Text value, Context context + ) throws IOException, InterruptedException { + StringTokenizer itr = new StringTokenizer(value.toString()); + while (itr.hasMoreTokens()) { + word.set(itr.nextToken()); + context.write(word, one); + } + } + } + + public static class IntSumReducer + extends Reducer { + private IntWritable result = new IntWritable(); + + public void reduce(Text key, Iterable values, + Context context + ) throws IOException, InterruptedException { + int sum = 0; + for (IntWritable val : values) { + sum += val.get(); + } + result.set(sum); + context.write(key, result); + } + } + + public static void main(String[] args) throws Exception { + Configuration conf = new Configuration(); + String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); + if (otherArgs.length != 2) { + System.err.println("Usage: wordcount "); + System.exit(2); + } + + // ---- Begin modifications for EDP ---- + // This will add properties from the tag specified + // in the Oozie workflow. For java actions, Oozie writes the + // configuration values to a file pointed to by ooze.action.conf.xml + conf.addResource(new Path("file:///", + System.getProperty("oozie.action.conf.xml"))); + // ---- End modifications for EDP ---- + + Job job = new Job(conf, "word count"); + job.setJarByClass(WordCount.class); + job.setMapperClass(TokenizerMapper.class); + job.setCombinerClass(IntSumReducer.class); + job.setReducerClass(IntSumReducer.class); + job.setOutputKeyClass(Text.class); + job.setOutputValueClass(IntWritable.class); + FileInputFormat.addInputPath(job, new Path(otherArgs[0])); + FileOutputFormat.setOutputPath(job, new Path(otherArgs[1])); + System.exit(job.waitForCompletion(true) ? 0 : 1); + } +} diff --git a/saharaclient/tests/integration/tests/resources/edp-java/edp-java.jar b/saharaclient/tests/integration/tests/resources/edp-java/edp-java.jar new file mode 100644 index 00000000..910a24aa Binary files /dev/null and b/saharaclient/tests/integration/tests/resources/edp-java/edp-java.jar differ diff --git a/saharaclient/tests/integration/tests/resources/edp-job.pig b/saharaclient/tests/integration/tests/resources/edp-job.pig new file mode 100644 index 00000000..4141906e --- /dev/null +++ b/saharaclient/tests/integration/tests/resources/edp-job.pig @@ -0,0 +1,3 @@ +A = load '$INPUT' using PigStorage(':') as (fruit: chararray); +B = foreach A generate com.hadoopbook.pig.Trim(fruit); +store B into '$OUTPUT' USING PigStorage(); diff --git a/saharaclient/tests/integration/tests/utils.py b/saharaclient/tests/integration/tests/utils.py index b24a25e2..058b7b02 100644 --- a/saharaclient/tests/integration/tests/utils.py +++ b/saharaclient/tests/integration/tests/utils.py @@ -14,7 +14,10 @@ # limitations under the License. import json +import random +import re import six +import string import tempfile import time @@ -63,6 +66,47 @@ class Utils(object): return self.find_object_by_field(name, self.client.clusters.list()) + def find_job_by_id(self, id): + return self.client.job_executions.get(id) + + def find_job_binary_by_id(self, id): + return self.client.job_binaries.get(id) + + def find_job_template_by_id(self, id): + return self.client.jobs.get(id) + + def find_data_source_by_id(self, id): + return self.client.data_sources.get(id) + + def find_binary_internal_by_id(self, id): + return self.client.job_binary_internals.get(id) + + def find_job_binary_by_name(self, name): + return self.find_object_by_field(name, + self.client.job_binaries.list()) + + def find_job_template_by_name(self, name): + return self.find_object_by_field(name, + self.client.jobs.list()) + + def find_data_source_by_name(self, name): + return self.find_object_by_field(name, + self.client.data_sources.list()) + + def find_job_by_job_template_id(self, id): + return self.find_object_by_field(id, + self.client.job_executions.list(), + "job_id") + + def find_binary_internal_id(self, output): + pattern = '\|\s*%s\s*\|\s*%s' # match '| id | name' + internals = [(i.id, + i.name) for i in self.client.job_binary_internals.list()] + for i in internals: + prog = re.compile(pattern % i) + if prog.search(output): + return i[0] + def generate_json_file(self, temp): f = tempfile.NamedTemporaryFile(delete=True) f.write(json.dumps(temp)) @@ -82,9 +126,23 @@ class Utils(object): cluster = self.client.clusters.get(id) return str(cluster.status) + def poll_job_execution(self, id): + #TODO(tmckay): this should use timeutils but we need + #to add it to openstack/common + timeout = common['JOB_LAUNCH_TIMEOUT'] * 60 + status = self.client.job_executions.get(id).info['status'] + while status != 'SUCCEEDED': + if status == 'KILLED' or timeout <= 0: + break + time.sleep(10) + timeout -= 10 + status = self.client.job_executions.get(id).info['status'] + return status + class SwiftUtils(object): def __init__(self): + self.containers = [] self.client = swift_client.Connection( authurl=common['OS_AUTH_URL'], user=common['OS_USERNAME'], @@ -92,6 +150,31 @@ class SwiftUtils(object): tenant_name=common['OS_TENANT_NAME'], auth_version=common['SWIFT_AUTH_VERSION']) + def create_container(self, marker): + container_name = 'cli-test-%s' % marker + self.client.put_container(container_name) + self.containers.append(container_name) + return container_name + + def generate_input(self, container_name, input_name): + self.client.put_object( + container_name, input_name, ''.join( + random.choice(':' + ' ' + '\n' + string.ascii_lowercase) + for x in range(10000) + ) + ) + + def upload(self, container_name, obj_name, data): + self.client.put_object(container_name, obj_name, data) + + def delete_containers(self): + for container in self.containers: + objects = [obj['name'] for obj in ( + self.client.get_container(container)[1])] + for obj in objects: + self.client.delete_object(container, obj) + self.client.delete_container(container) + class AssertionWrappers(object): def check_dict_elems_in_obj(self, d, obj, exclude=[]):