Add EDP jobs to cli integration tests
Partial-implements: blueprint cli-integration-tests Change-Id: I2de395f07fd7706d05d16afefb916530e18aa207
This commit is contained in:
@@ -86,11 +86,24 @@ COMMON_CONFIG_OPTS = [
|
||||
'INTERNAL_NEUTRON_NETWORK.'),
|
||||
cfg.StrOpt('INTERNAL_NEUTRON_NETWORK',
|
||||
default='private',
|
||||
help='Name for internal Neutron network.')
|
||||
help='Name for internal Neutron network.'),
|
||||
cfg.IntOpt('JOB_LAUNCH_TIMEOUT',
|
||||
default=10,
|
||||
help='Job launch timeout (in minutes); '
|
||||
'minimal value is 1.'),
|
||||
cfg.BoolOpt('INTERNAL_JOB_BINARIES',
|
||||
default=True,
|
||||
help='Store job binary data in the sahara '
|
||||
'internal database. If this option is set to '
|
||||
'False, job binary data will be stored in swift.'),
|
||||
cfg.StrOpt('CLUSTER_NAME',
|
||||
default='test',
|
||||
help='Name for cluster.')
|
||||
]
|
||||
|
||||
|
||||
def general_cluster_config_opts():
|
||||
def general_cluster_config_opts(plugin_text, plugin_name, hadoop_ver,
|
||||
skip_all=False):
|
||||
return [
|
||||
cfg.StrOpt('EXISTING_CLUSTER_ID',
|
||||
help='The id of an existing active cluster '
|
||||
@@ -122,54 +135,40 @@ def general_cluster_config_opts():
|
||||
'chosen by the tag "sahara_i_tests".'),
|
||||
cfg.StrOpt('SSH_USERNAME',
|
||||
help='Username used to log into a cluster node via SSH.'),
|
||||
cfg.BoolOpt('SKIP_CLUSTER_TEARDOWN',
|
||||
default=False,
|
||||
help='Skip tearing down the cluster. If an existing '
|
||||
'cluster is used it will never be torn down by the test.')
|
||||
]
|
||||
|
||||
|
||||
def skip_config_opts(plugin, skip_all=False):
|
||||
return [
|
||||
cfg.StrOpt('HADOOP_VERSION',
|
||||
default='%s' % hadoop_ver,
|
||||
help='Version of Hadoop'),
|
||||
cfg.StrOpt('PLUGIN_NAME',
|
||||
default='%s' % plugin_name,
|
||||
help='Name of plugin'),
|
||||
cfg.BoolOpt('SKIP_ALL_TESTS_FOR_PLUGIN',
|
||||
default=skip_all,
|
||||
help='If this flag is True, then all tests for the %s '
|
||||
'plugin will be skipped.' % plugin)
|
||||
'plugin will be skipped.' % plugin_text),
|
||||
cfg.BoolOpt('SKIP_CLUSTER_TEARDOWN',
|
||||
default=False,
|
||||
help='Skip tearing down the cluster. If an existing '
|
||||
'cluster is used it will never be torn down by the test.'),
|
||||
cfg.BoolOpt('SKIP_JAVA_EDP_TEST', default=False),
|
||||
cfg.BoolOpt('SKIP_MAPREDUCE_EDP_TEST', default=False),
|
||||
cfg.BoolOpt('SKIP_MAPREDUCE_STREAMING_EDP_TEST', default=False),
|
||||
cfg.BoolOpt('SKIP_PIG_EDP_TEST', default=False)
|
||||
]
|
||||
|
||||
|
||||
VANILLA_CONFIG_GROUP = cfg.OptGroup(name='VANILLA')
|
||||
VANILLA_CONFIG_OPTS = skip_config_opts(
|
||||
"Vanilla") + general_cluster_config_opts() + [
|
||||
cfg.StrOpt('HADOOP_VERSION',
|
||||
default="1.2.1",
|
||||
help='Version of Hadoop'),
|
||||
cfg.StrOpt('PLUGIN_NAME',
|
||||
default='vanilla',
|
||||
help='Name of plugin')
|
||||
]
|
||||
VANILLA_CONFIG_OPTS = general_cluster_config_opts("Vanilla",
|
||||
"vanilla", "1.2.1")
|
||||
|
||||
VANILLA2_CONFIG_GROUP = cfg.OptGroup(name='VANILLA2')
|
||||
VANILLA2_CONFIG_OPTS = skip_config_opts(
|
||||
"Vanilla2", skip_all=True) + general_cluster_config_opts() + [
|
||||
cfg.StrOpt('HADOOP_VERSION',
|
||||
default="2.3.0",
|
||||
help='Version of Hadoop'),
|
||||
cfg.StrOpt('PLUGIN_NAME',
|
||||
default='vanilla',
|
||||
help='Name of plugin')
|
||||
]
|
||||
VANILLA2_CONFIG_OPTS = general_cluster_config_opts("Vanilla2",
|
||||
"vanilla", "2.3.0",
|
||||
skip_all=True)
|
||||
|
||||
HDP_CONFIG_GROUP = cfg.OptGroup(name='HDP')
|
||||
HDP_CONFIG_OPTS = skip_config_opts(
|
||||
"HDP", skip_all=True) + general_cluster_config_opts() + [
|
||||
cfg.StrOpt('HADOOP_VERSION',
|
||||
default="1.3.2",
|
||||
help='Version of Hadoop'),
|
||||
cfg.StrOpt('PLUGIN_NAME',
|
||||
default='hdp',
|
||||
help='Name of plugin')
|
||||
]
|
||||
HDP_CONFIG_OPTS = general_cluster_config_opts("HDP",
|
||||
"hdp", "1.3.2",
|
||||
skip_all=True)
|
||||
|
||||
|
||||
def register_config(config, config_group, config_opts):
|
||||
|
||||
@@ -56,6 +56,18 @@
|
||||
# Name for internal Neutron network (string value)
|
||||
#INTERNAL_NEUTRON_NETWORK = 'private'
|
||||
|
||||
# Job launch timeout (in minutes); minimal value is 1
|
||||
# (integer value)
|
||||
#JOB_LAUNCH_TIMEOUT = 10
|
||||
|
||||
# Store job binary data in the sahara internal database.
|
||||
# If this option is set to False, job binary data will be stored in swift
|
||||
# (boolean value)
|
||||
#INTERNAL_JOB_BINARIES = True
|
||||
|
||||
# Name for cluster (string value)
|
||||
#CLUSTER_NAME = 'test'
|
||||
|
||||
[VANILLA]
|
||||
|
||||
# The id of an existing active cluster
|
||||
@@ -104,10 +116,26 @@
|
||||
# Name of plugin (string value)
|
||||
#PLUGIN_NAME = 'vanilla'
|
||||
|
||||
# If this option is set True, no tests for this plugin will be run
|
||||
# If this option is set True no tests for this plugin will be run
|
||||
# (boolean value)
|
||||
#SKIP_ALL_TESTS_FOR_PLUGIN = False
|
||||
|
||||
# If this option is set True no Java EDP job will be submitted
|
||||
# (boolean value)
|
||||
#SKIP_JAVA_EDP_TEST = False
|
||||
|
||||
# If this option is set True no MapReduce EDP job will be submitted
|
||||
# (boolean value)
|
||||
#SKIP_MAPREDUCE_EDP_TEST = False
|
||||
|
||||
# If this option is set True no Streaming MapReduce EDP job
|
||||
# will be submitted (boolean value)
|
||||
#SKIP_MAPREDUCE_STREAMING_EDP_TEST = False
|
||||
|
||||
# If this option is set True no Pig EDP job will be submitted
|
||||
# (boolean value)
|
||||
#SKIP_PIG_EDP_TEST = False
|
||||
|
||||
[VANILLA2]
|
||||
|
||||
# The id of an existing active cluster
|
||||
@@ -160,6 +188,22 @@
|
||||
# (boolean value)
|
||||
#SKIP_ALL_TESTS_FOR_PLUGIN = True
|
||||
|
||||
# If this option is set True no Java EDP job will be submitted
|
||||
# (boolean value)
|
||||
#SKIP_JAVA_EDP_TEST = False
|
||||
|
||||
# If this option is set True no MapReduce EDP job will be submitted
|
||||
# (boolean value)
|
||||
#SKIP_MAPREDUCE_EDP_TEST = False
|
||||
|
||||
# If this option is set True no Streaming MapReduce EDP job
|
||||
# will be submitted (boolean value)
|
||||
#SKIP_MAPREDUCE_STREAMING_EDP_TEST = False
|
||||
|
||||
# If this option is set True no Pig EDP job will be submitted
|
||||
# (boolean value)
|
||||
#SKIP_PIG_EDP_TEST = False
|
||||
|
||||
[HDP]
|
||||
|
||||
# The id of an existing active cluster
|
||||
@@ -211,3 +255,19 @@
|
||||
# If this option is set True, no tests for this plugin will be run
|
||||
# (boolean value)
|
||||
#SKIP_ALL_TESTS_FOR_PLUGIN = True
|
||||
|
||||
# If this option is set True no Java EDP job will be submitted
|
||||
# (boolean value)
|
||||
#SKIP_JAVA_EDP_TEST = False
|
||||
|
||||
# If this option is set True no MapReduce EDP job will be submitted
|
||||
# (boolean value)
|
||||
#SKIP_MAPREDUCE_EDP_TEST = False
|
||||
|
||||
# If this option is set True no Streaming MapReduce EDP job
|
||||
# will be submitted (boolean value)
|
||||
#SKIP_MAPREDUCE_STREAMING_EDP_TEST = False
|
||||
|
||||
# If this option is set True no Pig EDP job will be submitted
|
||||
# (boolean value)
|
||||
#SKIP_PIG_EDP_TEST = False
|
||||
|
||||
@@ -187,8 +187,9 @@ class ClusterTest(base.ITestBase):
|
||||
|
||||
def build_cluster(self, config, node_group_info):
|
||||
self.init_keypair()
|
||||
cluster_name = "test-%s-%s" % (config.PLUGIN_NAME,
|
||||
config.HADOOP_VERSION.replace(".", ""))
|
||||
cluster_name = "%s-%s-%s" % (common.CLUSTER_NAME,
|
||||
config.PLUGIN_NAME,
|
||||
config.HADOOP_VERSION.replace(".", ""))
|
||||
# Create and tag an image
|
||||
image_id, username = self.find_image_id(config)
|
||||
self.cli.register_image(image_id, username, cluster_name)
|
||||
|
||||
303
saharaclient/tests/integration/tests/edp.py
Normal file
303
saharaclient/tests/integration/tests/edp.py
Normal file
@@ -0,0 +1,303 @@
|
||||
# Copyright (c) 2014 Red Hat Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import os
|
||||
import saharaclient.api.base as api_base
|
||||
from saharaclient.tests.integration.configs import config as cfg
|
||||
import saharaclient.tests.integration.tests.base as base
|
||||
import saharaclient.tests.integration.tests.utils as utils
|
||||
|
||||
cfg = cfg.ITConfig()
|
||||
common = cfg.common_config
|
||||
|
||||
|
||||
class EDPTest(base.ITestBase):
|
||||
def setUp(self):
|
||||
super(EDPTest, self).setUp()
|
||||
|
||||
self.swift_utils = utils.SwiftUtils()
|
||||
self.path = 'saharaclient/tests/integration/tests/resources/'
|
||||
|
||||
self.job = None
|
||||
self.job_template = None
|
||||
self.lib_binary = None
|
||||
self.main_binary = None
|
||||
self.input_source = None
|
||||
self.output_source = None
|
||||
|
||||
def tearDown(self):
|
||||
super(EDPTest, self).tearDown()
|
||||
self.swift_utils.delete_containers()
|
||||
|
||||
def _create_binary(self, marker, info, url):
|
||||
# Use the marker value to distinguish the object, but
|
||||
# the name must start with a letter and include a file
|
||||
# extension.
|
||||
binary_name = '%s-%s' % (marker, info['name'])
|
||||
self.cli.job_binary_create(binary_name,
|
||||
url,
|
||||
username=common.OS_USERNAME,
|
||||
password=common.OS_PASSWORD)
|
||||
binary_obj = self.util.find_job_binary_by_name(binary_name)
|
||||
self.assertIsNotNone(binary_obj)
|
||||
self.assertEqual(binary_obj.name, binary_name)
|
||||
self.assertEqual(binary_obj.url, url)
|
||||
return binary_obj
|
||||
|
||||
def _create_swift_binary(self, marker, container, info):
|
||||
if not info:
|
||||
return None
|
||||
self.swift_utils.upload(container, info['name'], info['data'])
|
||||
url = 'swift://%s/%s' % (container, info['name'])
|
||||
return self._create_binary(marker, info, url)
|
||||
|
||||
def _create_internal_binary(self, marker, info):
|
||||
if not info:
|
||||
return None, None
|
||||
output = self.cli.job_binary_data_create(info['path'])
|
||||
id = self.util.find_binary_internal_id(output)
|
||||
url = 'internal-db://%s' % id
|
||||
return self._create_binary(marker, info, url), id
|
||||
|
||||
def _create_data_source(self, name, url):
|
||||
self.cli.data_source_create(name, 'swift', url,
|
||||
username=common.OS_USERNAME,
|
||||
password=common.OS_PASSWORD)
|
||||
source = self.util.find_data_source_by_name(name)
|
||||
self.assertIsNotNone(source)
|
||||
return source
|
||||
|
||||
def _binary_info(self, fname, relative_path=""):
|
||||
# Binaries need to be named, and the name must include
|
||||
# the file extension since Oozie depends on it. So, we
|
||||
# just store the filename for the name here.
|
||||
info = {'name': fname}
|
||||
if common.INTERNAL_JOB_BINARIES:
|
||||
# We will use the cli to upload the file by path
|
||||
info['path'] = self.path + relative_path + fname
|
||||
else:
|
||||
# We will upload the binary data to swift
|
||||
info['data'] = open(self.path + relative_path + fname).read()
|
||||
return info
|
||||
|
||||
def edp_common(self, job_type, lib=None, main=None, configs=None,
|
||||
add_data_sources=True, pass_data_sources_as_args=False):
|
||||
# Generate a new marker for this so we can keep containers separate
|
||||
# and create some input data
|
||||
marker = "%s-%s" % (job_type.replace(".", ""), os.getpid())
|
||||
container = self.swift_utils.create_container(marker)
|
||||
self.swift_utils.generate_input(container, 'input')
|
||||
input_url = 'swift://%s.sahara/input' % container
|
||||
output_url = 'swift://%s.sahara/output' % container
|
||||
|
||||
# Create binaries
|
||||
if common.INTERNAL_JOB_BINARIES:
|
||||
(self.lib_binary,
|
||||
self.lib_data_id) = self._create_internal_binary(marker, lib)
|
||||
(self.main_binary,
|
||||
self.main_data_id) = self._create_internal_binary(marker, main)
|
||||
else:
|
||||
self.lib_data_id = None
|
||||
self.main_data_id = None
|
||||
self.lib_binary = self._create_swift_binary(marker, container, lib)
|
||||
self.main_binary = self._create_swift_binary(marker,
|
||||
container, main)
|
||||
|
||||
# Create data sources
|
||||
if add_data_sources:
|
||||
self.input_source = self._create_data_source('input-%s' % marker,
|
||||
input_url)
|
||||
self.output_source = self._create_data_source('output-%s' % marker,
|
||||
output_url)
|
||||
else:
|
||||
self.input_source = self.output_source = None
|
||||
|
||||
# Create a job template
|
||||
job_template_name = marker
|
||||
self.cli.job_template_create(job_template_name,
|
||||
job_type,
|
||||
main=self.main_binary and (
|
||||
self.main_binary.id or ''),
|
||||
lib=self.lib_binary and (
|
||||
self.lib_binary.id or ''))
|
||||
self.job_template = self.util.find_job_template_by_name(
|
||||
job_template_name)
|
||||
self.assertIsNotNone(self.job_template)
|
||||
self.assertEqual(self.job_template.name, job_template_name)
|
||||
self.assertEqual(self.job_template.type, job_type)
|
||||
if self.lib_binary:
|
||||
self.assertEqual(len(self.job_template.libs), 1)
|
||||
self.assertEqual(self.job_template.libs[0]['id'],
|
||||
self.lib_binary.id)
|
||||
if self.main_binary:
|
||||
self.assertEqual(len(self.job_template.mains), 1)
|
||||
self.assertEqual(self.job_template.mains[0]['id'],
|
||||
self.main_binary.id)
|
||||
|
||||
# Launch the job
|
||||
if pass_data_sources_as_args:
|
||||
args = [input_url, output_url]
|
||||
else:
|
||||
args = None
|
||||
self.cli.job_create(self.job_template.id,
|
||||
self.cluster.id,
|
||||
input_id=self.input_source and (
|
||||
self.input_source.id or ''),
|
||||
output_id=self.output_source and (
|
||||
self.output_source.id or ''),
|
||||
args=args,
|
||||
configs=configs)
|
||||
|
||||
# Find the job using the job_template_id
|
||||
self.job = self.util.find_job_by_job_template_id(self.job_template.id)
|
||||
self.assertIsNotNone(self.job)
|
||||
self.assertEqual(self.job.cluster_id, self.cluster.id)
|
||||
|
||||
# poll for status
|
||||
status = self.util.poll_job_execution(self.job.id)
|
||||
self.assertEqual(status, 'SUCCEEDED')
|
||||
|
||||
# follow up with a deletion of the stuff we made from a util function
|
||||
self.delete_job_objects()
|
||||
|
||||
def pig_edp(self):
|
||||
self.edp_common('Pig',
|
||||
lib=self._binary_info('edp-lib.jar'),
|
||||
main=self._binary_info('edp-job.pig'))
|
||||
|
||||
def mapreduce_edp(self):
|
||||
configs = {
|
||||
"mapred.mapper.class": "org.apache.oozie.example.SampleMapper",
|
||||
"mapred.reducer.class": "org.apache.oozie.example.SampleReducer"
|
||||
}
|
||||
self.edp_common('MapReduce',
|
||||
lib=self._binary_info('edp-mapreduce.jar'),
|
||||
configs=configs)
|
||||
|
||||
def mapreduce_streaming_edp(self):
|
||||
configs = {
|
||||
"edp.streaming.mapper": "/bin/cat",
|
||||
"edp.streaming.reducer": "/usr/bin/wc"
|
||||
}
|
||||
self.edp_common('MapReduce.Streaming',
|
||||
configs=configs)
|
||||
|
||||
def java_edp(self):
|
||||
configs = {
|
||||
'fs.swift.service.sahara.username': common.OS_USERNAME,
|
||||
'fs.swift.service.sahara.password': common.OS_PASSWORD,
|
||||
'edp.java.main_class': 'org.openstack.sahara.examples.WordCount'
|
||||
}
|
||||
self.edp_common('Java',
|
||||
lib=self._binary_info('edp-java.jar',
|
||||
relative_path='edp-java/'),
|
||||
configs=configs,
|
||||
add_data_sources=False,
|
||||
pass_data_sources_as_args=True)
|
||||
|
||||
def run_edp_jobs(self, config):
|
||||
try:
|
||||
if not config.SKIP_JAVA_EDP_TEST:
|
||||
self.java_edp()
|
||||
if not config.SKIP_MAPREDUCE_EDP_TEST:
|
||||
self.mapreduce_edp()
|
||||
if not config.SKIP_MAPREDUCE_STREAMING_EDP_TEST:
|
||||
self.mapreduce_streaming_edp()
|
||||
if not config.SKIP_PIG_EDP_TEST:
|
||||
self.pig_edp()
|
||||
except Exception as e:
|
||||
# Something went wrong, try to clean up what might be left
|
||||
self.delete_job_objects_via_client()
|
||||
raise(e)
|
||||
|
||||
def delete_job_objects(self):
|
||||
if self.job:
|
||||
self.cli.job_delete(self.job.id)
|
||||
self.assertRaises(api_base.APIException,
|
||||
self.util.find_job_by_id,
|
||||
self.job.id)
|
||||
self.job = None
|
||||
|
||||
if self.job_template:
|
||||
self.cli.job_template_delete(self.job_template.id)
|
||||
self.assertRaises(api_base.APIException,
|
||||
self.util.find_job_template_by_id,
|
||||
self.job_template.id)
|
||||
self.job_template = None
|
||||
|
||||
if self.lib_binary:
|
||||
self.cli.job_binary_delete(self.lib_binary.id)
|
||||
self.assertRaises(api_base.APIException,
|
||||
self.util.find_job_binary_by_id,
|
||||
self.lib_binary.id)
|
||||
self.lib_binary = None
|
||||
|
||||
if self.lib_data_id:
|
||||
self.cli.job_binary_data_delete(self.lib_data_id)
|
||||
self.assertRaises(api_base.APIException,
|
||||
self.util.find_binary_internal_by_id,
|
||||
self.lib_data_id)
|
||||
self.lib_data_id = None
|
||||
|
||||
if self.main_binary:
|
||||
self.cli.job_binary_delete(self.main_binary.id)
|
||||
self.assertRaises(api_base.APIException,
|
||||
self.util.find_job_binary_by_id,
|
||||
self.main_binary.id)
|
||||
self.main_binary = None
|
||||
|
||||
if self.main_data_id:
|
||||
self.cli.job_binary_data_delete(self.main_data_id)
|
||||
self.assertRaises(api_base.APIException,
|
||||
self.util.find_binary_internal_by_id,
|
||||
self.main_data_id)
|
||||
self.main_data_id = None
|
||||
|
||||
if self.input_source:
|
||||
self.cli.data_source_delete(self.input_source.id)
|
||||
self.assertRaises(api_base.APIException,
|
||||
self.util.find_data_source_by_id,
|
||||
self.input_source.id)
|
||||
self.input_source = None
|
||||
|
||||
if self.output_source:
|
||||
self.cli.data_source_delete(self.output_source.id)
|
||||
self.assertRaises(api_base.APIException,
|
||||
self.util.find_data_source_by_id,
|
||||
self.output_source.id)
|
||||
self.output_source = None
|
||||
|
||||
def delete_job_objects_via_client(self):
|
||||
try:
|
||||
if self.job:
|
||||
self.util.client.job_executions.delete(self.job.id)
|
||||
self.job = None
|
||||
if self.job_template:
|
||||
self.util.client.jobs.delete(self.job_template.id)
|
||||
self.job_template = None
|
||||
if self.lib_binary:
|
||||
self.util.client.job_binaries.delete(self.lib_binary.id)
|
||||
self.lib_binary = None
|
||||
if self.main_binary:
|
||||
self.util.client.job_binaries.delete(self.main_binary.id)
|
||||
self.main_binary = None
|
||||
if self.input_source:
|
||||
self.util.client.data_sources.delete(self.input_source.id)
|
||||
self.input_source = None
|
||||
if self.output_source:
|
||||
self.util.client.data_sources.delete(self.output_source.id)
|
||||
self.output_source = None
|
||||
except Exception:
|
||||
pass
|
||||
@@ -14,9 +14,10 @@
|
||||
# limitations under the License.
|
||||
|
||||
import saharaclient.tests.integration.tests.cluster as cluster
|
||||
import saharaclient.tests.integration.tests.edp as edp
|
||||
|
||||
|
||||
class FullTestDriver(cluster.ClusterTest):
|
||||
class FullTestDriver(edp.EDPTest, cluster.ClusterTest):
|
||||
|
||||
def drive_full_test(self, config, ng_templates):
|
||||
# If we get an exception during cluster launch, the cluster has already
|
||||
@@ -24,6 +25,7 @@ class FullTestDriver(cluster.ClusterTest):
|
||||
skip_teardown = self.launch_cluster_or_use_existing(config,
|
||||
ng_templates)
|
||||
try:
|
||||
self.run_edp_jobs(config)
|
||||
if not skip_teardown:
|
||||
self.teardown_cluster()
|
||||
except Exception as e:
|
||||
|
||||
@@ -0,0 +1,4 @@
|
||||
Compiled against Hadoop 1.2.1
|
||||
$ mkdir wordcount_classes
|
||||
$ javac -classpath /usr/share/hadoop/hadoop-core-1.2.1.jar:/usr/share/hadoop/lib/commons-cli-1.2.jar -d wordcount_classes WordCount.java
|
||||
$ jar -cvf edp-java.jar -C wordcount_classes/ .
|
||||
@@ -0,0 +1,95 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.openstack.sahara.examples;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.StringTokenizer;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.io.IntWritable;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.mapreduce.Job;
|
||||
import org.apache.hadoop.mapreduce.Mapper;
|
||||
import org.apache.hadoop.mapreduce.Reducer;
|
||||
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
|
||||
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
|
||||
import org.apache.hadoop.util.GenericOptionsParser;
|
||||
|
||||
public class WordCount {
|
||||
|
||||
public static class TokenizerMapper
|
||||
extends Mapper<Object, Text, Text, IntWritable>{
|
||||
|
||||
private final static IntWritable one = new IntWritable(1);
|
||||
private Text word = new Text();
|
||||
|
||||
public void map(Object key, Text value, Context context
|
||||
) throws IOException, InterruptedException {
|
||||
StringTokenizer itr = new StringTokenizer(value.toString());
|
||||
while (itr.hasMoreTokens()) {
|
||||
word.set(itr.nextToken());
|
||||
context.write(word, one);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static class IntSumReducer
|
||||
extends Reducer<Text,IntWritable,Text,IntWritable> {
|
||||
private IntWritable result = new IntWritable();
|
||||
|
||||
public void reduce(Text key, Iterable<IntWritable> values,
|
||||
Context context
|
||||
) throws IOException, InterruptedException {
|
||||
int sum = 0;
|
||||
for (IntWritable val : values) {
|
||||
sum += val.get();
|
||||
}
|
||||
result.set(sum);
|
||||
context.write(key, result);
|
||||
}
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
|
||||
if (otherArgs.length != 2) {
|
||||
System.err.println("Usage: wordcount <in> <out>");
|
||||
System.exit(2);
|
||||
}
|
||||
|
||||
// ---- Begin modifications for EDP ----
|
||||
// This will add properties from the <configuration> tag specified
|
||||
// in the Oozie workflow. For java actions, Oozie writes the
|
||||
// configuration values to a file pointed to by ooze.action.conf.xml
|
||||
conf.addResource(new Path("file:///",
|
||||
System.getProperty("oozie.action.conf.xml")));
|
||||
// ---- End modifications for EDP ----
|
||||
|
||||
Job job = new Job(conf, "word count");
|
||||
job.setJarByClass(WordCount.class);
|
||||
job.setMapperClass(TokenizerMapper.class);
|
||||
job.setCombinerClass(IntSumReducer.class);
|
||||
job.setReducerClass(IntSumReducer.class);
|
||||
job.setOutputKeyClass(Text.class);
|
||||
job.setOutputValueClass(IntWritable.class);
|
||||
FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
|
||||
FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
|
||||
System.exit(job.waitForCompletion(true) ? 0 : 1);
|
||||
}
|
||||
}
|
||||
Binary file not shown.
@@ -0,0 +1,3 @@
|
||||
A = load '$INPUT' using PigStorage(':') as (fruit: chararray);
|
||||
B = foreach A generate com.hadoopbook.pig.Trim(fruit);
|
||||
store B into '$OUTPUT' USING PigStorage();
|
||||
@@ -14,7 +14,10 @@
|
||||
# limitations under the License.
|
||||
|
||||
import json
|
||||
import random
|
||||
import re
|
||||
import six
|
||||
import string
|
||||
import tempfile
|
||||
import time
|
||||
|
||||
@@ -63,6 +66,47 @@ class Utils(object):
|
||||
return self.find_object_by_field(name,
|
||||
self.client.clusters.list())
|
||||
|
||||
def find_job_by_id(self, id):
|
||||
return self.client.job_executions.get(id)
|
||||
|
||||
def find_job_binary_by_id(self, id):
|
||||
return self.client.job_binaries.get(id)
|
||||
|
||||
def find_job_template_by_id(self, id):
|
||||
return self.client.jobs.get(id)
|
||||
|
||||
def find_data_source_by_id(self, id):
|
||||
return self.client.data_sources.get(id)
|
||||
|
||||
def find_binary_internal_by_id(self, id):
|
||||
return self.client.job_binary_internals.get(id)
|
||||
|
||||
def find_job_binary_by_name(self, name):
|
||||
return self.find_object_by_field(name,
|
||||
self.client.job_binaries.list())
|
||||
|
||||
def find_job_template_by_name(self, name):
|
||||
return self.find_object_by_field(name,
|
||||
self.client.jobs.list())
|
||||
|
||||
def find_data_source_by_name(self, name):
|
||||
return self.find_object_by_field(name,
|
||||
self.client.data_sources.list())
|
||||
|
||||
def find_job_by_job_template_id(self, id):
|
||||
return self.find_object_by_field(id,
|
||||
self.client.job_executions.list(),
|
||||
"job_id")
|
||||
|
||||
def find_binary_internal_id(self, output):
|
||||
pattern = '\|\s*%s\s*\|\s*%s' # match '| id | name'
|
||||
internals = [(i.id,
|
||||
i.name) for i in self.client.job_binary_internals.list()]
|
||||
for i in internals:
|
||||
prog = re.compile(pattern % i)
|
||||
if prog.search(output):
|
||||
return i[0]
|
||||
|
||||
def generate_json_file(self, temp):
|
||||
f = tempfile.NamedTemporaryFile(delete=True)
|
||||
f.write(json.dumps(temp))
|
||||
@@ -82,9 +126,23 @@ class Utils(object):
|
||||
cluster = self.client.clusters.get(id)
|
||||
return str(cluster.status)
|
||||
|
||||
def poll_job_execution(self, id):
|
||||
#TODO(tmckay): this should use timeutils but we need
|
||||
#to add it to openstack/common
|
||||
timeout = common['JOB_LAUNCH_TIMEOUT'] * 60
|
||||
status = self.client.job_executions.get(id).info['status']
|
||||
while status != 'SUCCEEDED':
|
||||
if status == 'KILLED' or timeout <= 0:
|
||||
break
|
||||
time.sleep(10)
|
||||
timeout -= 10
|
||||
status = self.client.job_executions.get(id).info['status']
|
||||
return status
|
||||
|
||||
|
||||
class SwiftUtils(object):
|
||||
def __init__(self):
|
||||
self.containers = []
|
||||
self.client = swift_client.Connection(
|
||||
authurl=common['OS_AUTH_URL'],
|
||||
user=common['OS_USERNAME'],
|
||||
@@ -92,6 +150,31 @@ class SwiftUtils(object):
|
||||
tenant_name=common['OS_TENANT_NAME'],
|
||||
auth_version=common['SWIFT_AUTH_VERSION'])
|
||||
|
||||
def create_container(self, marker):
|
||||
container_name = 'cli-test-%s' % marker
|
||||
self.client.put_container(container_name)
|
||||
self.containers.append(container_name)
|
||||
return container_name
|
||||
|
||||
def generate_input(self, container_name, input_name):
|
||||
self.client.put_object(
|
||||
container_name, input_name, ''.join(
|
||||
random.choice(':' + ' ' + '\n' + string.ascii_lowercase)
|
||||
for x in range(10000)
|
||||
)
|
||||
)
|
||||
|
||||
def upload(self, container_name, obj_name, data):
|
||||
self.client.put_object(container_name, obj_name, data)
|
||||
|
||||
def delete_containers(self):
|
||||
for container in self.containers:
|
||||
objects = [obj['name'] for obj in (
|
||||
self.client.get_container(container)[1])]
|
||||
for obj in objects:
|
||||
self.client.delete_object(container, obj)
|
||||
self.client.delete_container(container)
|
||||
|
||||
|
||||
class AssertionWrappers(object):
|
||||
def check_dict_elems_in_obj(self, d, obj, exclude=[]):
|
||||
|
||||
Reference in New Issue
Block a user