Sahara Job Execution benchmark

Added a benchmark scenario for Sahara EDP job execution.

Change-Id: Iac251de92413f0f1fe1c872b5183c91e40eb1232
This commit is contained in:
Nikita Konovalov 2014-08-26 16:32:43 +04:00
parent 5b510cf90b
commit 90064d0139
10 changed files with 568 additions and 6 deletions

View File

@ -0,0 +1,50 @@
{
"SaharaJob.create_launch_job": [
{
"args": {
"job_type": "Java",
"configs": {
"configs": {
"edp.java.main_class": "org.apache.hadoop.fs.TestDFSIO"
},
"args": ["-write", "-nrFiles", "10", "fileSize", "100"]
}
},
"runner": {
"type": "constant",
"times": 4,
"concurrency": 2
},
"context": {
"users": {
"tenants": 1,
"users_per_tenant": 1
},
"sahara_image": {
"image_url": "http://sahara-files.mirantis.com/sahara-icehouse-vanilla-1.2.1-ubuntu-13.10.qcow2",
"username": "ubuntu",
"plugin_name": "vanilla",
"hadoop_version": "1.2.1"
},
"sahara_edp": {
"input_type": "hdfs",
"output_type": "hdfs",
"input_url": "/",
"output_url_prefix": "/out_",
"libs": [
{
"name": "tests.jar",
"download_url": "http://repo1.maven.org/maven2/org/apache/hadoop/hadoop-test/1.2.1/hadoop-test-1.2.1.jar"
}
]
},
"sahara_cluster": {
"flavor_id": "2",
"node_count": 2,
"plugin_name": "vanilla",
"hadoop_version": "1.2.1"
}
}
}
]
}

View File

@ -0,0 +1,41 @@
---
# Rally task: run a Java EDP job (Hadoop TestDFSIO write benchmark) on a
# transient vanilla-plugin cluster provisioned by the sahara_cluster context.
SaharaJob.create_launch_job:
  -
    args:
      job_type: "Java"
      configs:
        configs:
          edp.java.main_class: "org.apache.hadoop.fs.TestDFSIO"
        # TestDFSIO command-line options: every option needs its leading
        # dash. The original sample passed "fileSize" without one, which
        # TestDFSIO would not recognize as an option.
        args:
          - "-write"
          - "-nrFiles"
          - "10"
          - "-fileSize"
          - "100"
    runner:
      type: "constant"
      times: 4
      concurrency: 2
    context:
      users:
        tenants: 1
        users_per_tenant: 1
      sahara_image:
        image_url: "http://sahara-files.mirantis.com/sahara-icehouse-vanilla-1.2.1-ubuntu-13.10.qcow2"
        username: "ubuntu"
        plugin_name: "vanilla"
        hadoop_version: "1.2.1"
      sahara_edp:
        input_type: "hdfs"
        output_type: "hdfs"
        input_url: "/"
        output_url_prefix: "/out_"
        libs:
          -
            name: "tests.jar"
            download_url: "http://repo1.maven.org/maven2/org/apache/hadoop/hadoop-test/1.2.1/hadoop-test-1.2.1.jar"
      sahara_cluster:
        flavor_id: "2"
        node_count: 2
        plugin_name: "vanilla"
        hadoop_version: "1.2.1"

View File

@ -0,0 +1,50 @@
{
"SaharaJob.create_launch_job": [
{
"args": {
"job_type": "Java",
"configs": {
"configs": {
"edp.java.main_class": "org.apache.hadoop.examples.PiEstimator"
},
"args": ["10", "10"]
}
},
"runner": {
"type": "constant",
"times": 4,
"concurrency": 2
},
"context": {
"users": {
"tenants": 1,
"users_per_tenant": 1
},
"sahara_image": {
"image_url": "http://sahara-files.mirantis.com/sahara-icehouse-vanilla-1.2.1-ubuntu-13.10.qcow2",
"username": "ubuntu",
"plugin_name": "vanilla",
"hadoop_version": "1.2.1"
},
"sahara_edp": {
"input_type": "hdfs",
"output_type": "hdfs",
"input_url": "/",
"output_url_prefix": "/out_",
"libs": [
{
"name": "examples.jar",
"download_url": "http://repo1.maven.org/maven2/org/apache/hadoop/hadoop-examples/1.2.1/hadoop-examples-1.2.1.jar"
}
]
},
"sahara_cluster": {
"flavor_id": "2",
"node_count": 2,
"plugin_name": "vanilla",
"hadoop_version": "1.2.1"
}
}
}
]
}

View File

@ -0,0 +1,38 @@
---
# Rally task: run a Java EDP job (Hadoop PiEstimator example) on a transient
# vanilla-plugin cluster provisioned by the sahara_cluster context.
SaharaJob.create_launch_job:
  -
    args:
      job_type: "Java"
      configs:
        configs:
          edp.java.main_class: "org.apache.hadoop.examples.PiEstimator"
        # PiEstimator positional arguments (presumably number of maps and
        # samples per map — confirm against the Hadoop examples docs).
        args:
          - "10"
          - "10"
    runner:
      type: "constant"
      times: 4
      concurrency: 2
    context:
      users:
        tenants: 1
        users_per_tenant: 1
      sahara_image:
        image_url: "http://sahara-files.mirantis.com/sahara-icehouse-vanilla-1.2.1-ubuntu-13.10.qcow2"
        username: "ubuntu"
        plugin_name: "vanilla"
        hadoop_version: "1.2.1"
      sahara_edp:
        input_type: "hdfs"
        output_type: "hdfs"
        input_url: "/"
        output_url_prefix: "/out_"
        # The examples jar is downloaded and registered as a job binary.
        libs:
          -
            name: "examples.jar"
            download_url: "http://repo1.maven.org/maven2/org/apache/hadoop/hadoop-examples/1.2.1/hadoop-examples-1.2.1.jar"
      sahara_cluster:
        flavor_id: "2"
        node_count: 2
        plugin_name: "vanilla"
        hadoop_version: "1.2.1"

View File

@ -0,0 +1,51 @@
{
"SaharaJob.create_launch_job": [
{
"args": {
"job_type": "Pig",
"configs": {}
},
"runner": {
"type": "constant",
"times": 4,
"concurrency": 2
},
"context": {
"users": {
"tenants": 1,
"users_per_tenant": 1
},
"sahara_image": {
"image_url": "http://sahara-files.mirantis.com/sahara-icehouse-vanilla-1.2.1-ubuntu-13.10.qcow2",
"username": "ubuntu",
"plugin_name": "vanilla",
"hadoop_version": "1.2.1"
},
"sahara_edp": {
"input_type": "hdfs",
"output_type": "hdfs",
"input_url": "/",
"output_url_prefix": "/out_",
"mains": [
{
"name": "example.pig",
"download_url": "https://raw.githubusercontent.com/openstack/sahara/master/etc/edp-examples/pig-job/example.pig"
}
],
"libs": [
{
"name": "udf.jar",
"download_url": "https://github.com/openstack/sahara/blob/master/etc/edp-examples/pig-job/udf.jar?raw=true"
}
]
},
"sahara_cluster": {
"flavor_id": "2",
"node_count": 2,
"plugin_name": "vanilla",
"hadoop_version": "1.2.1"
}
}
}
]
}

View File

@ -0,0 +1,37 @@
---
# Rally task: run a Pig EDP job on a transient vanilla-plugin cluster
# provisioned by the sahara_cluster context.
SaharaJob.create_launch_job:
  -
    args:
      job_type: "Pig"
      # Pig jobs here take no extra Job Execution configs.
      configs: {}
    runner:
      type: "constant"
      times: 4
      concurrency: 2
    context:
      users:
        tenants: 1
        users_per_tenant: 1
      sahara_image:
        image_url: "http://sahara-files.mirantis.com/sahara-icehouse-vanilla-1.2.1-ubuntu-13.10.qcow2"
        username: "ubuntu"
        plugin_name: "vanilla"
        hadoop_version: "1.2.1"
      sahara_edp:
        input_type: "hdfs"
        output_type: "hdfs"
        input_url: "/"
        output_url_prefix: "/out_"
        # The Pig script is the job's main; udf.jar is an extra library
        # (presumably the user-defined functions the script uses).
        mains:
          -
            name: "example.pig"
            download_url: "https://raw.githubusercontent.com/openstack/sahara/master/etc/edp-examples/pig-job/example.pig"
        libs:
          -
            name: "udf.jar"
            download_url: "https://github.com/openstack/sahara/blob/master/etc/edp-examples/pig-job/udf.jar?raw=true"
      sahara_cluster:
        flavor_id: "2"
        node_count: 2
        plugin_name: "vanilla"
        hadoop_version: "1.2.1"

View File

@ -0,0 +1,66 @@
# Copyright 2014: Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from rally.benchmark.scenarios import base
from rally.benchmark.scenarios.sahara import utils
from rally.benchmark import validation
from rally import consts
from rally.openstack.common import log as logging
LOG = logging.getLogger(__name__)
class SaharaJob(utils.SaharaScenario):
    """Benchmark scenarios for Sahara EDP jobs."""

    @validation.required_services(consts.Service.SAHARA)
    @validation.required_contexts("users", "sahara_image", "sahara_edp",
                                  "sahara_cluster")
    @base.scenario(context={"cleanup": ["sahara"]})
    def create_launch_job(self, job_type, configs):
        """Test the Sahara EDP Job execution.

        Creates a Job entity from the binaries prepared by the benchmark
        context and launches a single execution of it on the context's
        Cluster, waiting for the execution to finish.

        :param job_type: The type of the Data Processing Job
        :param configs: The configs dict that will be passed to a Job
                        Execution
        """
        tenant_id = self.clients("keystone").tenant_id

        job = self.clients("sahara").jobs.create(
            name=self._generate_random_name(prefix="job_"),
            type=job_type,
            description="",
            # Main scripts and libraries are pre-registered per tenant by
            # the "sahara_edp" context.
            mains=self.context()["sahara_mains"][tenant_id],
            libs=self.context()["sahara_libs"][tenant_id])

        cluster_id = self.context()["sahara_clusters"][tenant_id]

        # Java jobs are launched without Data Sources (input/output ids are
        # None); all other job types get the context's input Data Source and
        # a freshly created output one.
        if job_type.lower() == "java":
            input_id = output_id = None
        else:
            input_id = self.context()["sahara_inputs"][tenant_id]
            output_id = self._create_output_ds().id

        self._run_job_execution(job_id=job.id,
                                cluster_id=cluster_id,
                                input_id=input_id,
                                output_id=output_id,
                                configs=configs)

View File

@ -24,15 +24,19 @@ from rally.openstack.common import log as logging
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
CONF = cfg.CONF CONF = cfg.CONF
CREATE_CLUSTER_OPTS = [ TIMEOUT_OPTS = [
cfg.IntOpt("cluster_create_timeout", default=600, cfg.IntOpt("cluster_create_timeout", default=600,
help="A timeout in seconds for a cluster create operation"), help="A timeout in seconds for a cluster create operation"),
cfg.IntOpt("cluster_check_interval", default=5, cfg.IntOpt("cluster_check_interval", default=5,
help="Cluster status polling interval in seconds"),
cfg.IntOpt("job_execution_timeout", default=600,
help="A timeout in seconds for a Job Execution to complete"),
cfg.IntOpt("job_check_interval", default=5,
help="Cluster status polling interval in seconds") help="Cluster status polling interval in seconds")
] ]
benchmark_group = cfg.OptGroup(name='benchmark', title='benchmark options') benchmark_group = cfg.OptGroup(name='benchmark', title='benchmark options')
CONF.register_opts(CREATE_CLUSTER_OPTS, group=benchmark_group) CONF.register_opts(TIMEOUT_OPTS, group=benchmark_group)
class SaharaScenario(base.Scenario): class SaharaScenario(base.Scenario):
@ -244,3 +248,46 @@ class SaharaScenario(base.Scenario):
description="", description="",
data_source_type=ds_type, data_source_type=ds_type,
url=url) url=url)
@base.atomic_action_timer('sahara.job_execution')
def _run_job_execution(self, job_id, cluster_id, input_id, output_id,
configs):
"""Runs a Job Execution and waits until it completes or fails.
The Job Execution is accepted as successful when Oozie reports
"success" or "succeeded" status. The failure statuses are "failed" and
"killed".
The timeout and the polling interval may be configured through
"job_execution_timeout" and "job_check_interval" parameters under the
"benchmark" section.
:param job_id: The Job id that will be executed
:param cluster_id: The Cluster id which will execute the Job
:param input_id: The input Data Source id
:param output_id: The output Data Source id
:param configs: The config dict that will be passed as Job Execution's
parameters.
"""
job_execution = self.clients("sahara").job_executions.create(
job_id=job_id,
cluster_id=cluster_id,
input_id=input_id,
output_id=output_id,
configs=configs)
def is_finished(je_id):
status = self.clients("sahara").job_executions.get(je_id).info[
'status']
if status.lower() in ("success", "succeeded"):
return True
elif status.lower() in ("failed", "killed"):
raise exceptions.RallyException("Job execution %s has failed"
% je_id)
return False
bench_utils.wait_for(
resource=job_execution.id, is_ready=is_finished,
timeout=CONF.benchmark.job_execution_timeout,
check_interval=CONF.benchmark.job_check_interval)

View File

@ -0,0 +1,117 @@
# Copyright 2014: Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from oslo.config import cfg
from rally.benchmark.scenarios.sahara import jobs
from tests import test
CONF = cfg.CONF
SAHARA_JOB = "rally.benchmark.scenarios.sahara.jobs.SaharaJob"
SAHARA_UTILS = 'rally.benchmark.scenarios.sahara.utils'
class SaharaJobTestCase(test.TestCase):
    """Unit tests for the SaharaJob.create_launch_job scenario."""

    def setUp(self):
        super(SaharaJobTestCase, self).setUp()
        # Zero the polling intervals so any wait loop finishes immediately.
        CONF.set_override("cluster_check_interval", 0, "benchmark")
        CONF.set_override("job_check_interval", 0, "benchmark")

    @mock.patch(SAHARA_UTILS + '.SaharaScenario._generate_random_name',
                return_value="job_42")
    @mock.patch(SAHARA_JOB + "._run_job_execution")
    @mock.patch(SAHARA_UTILS + '.SaharaScenario.clients')
    def test_create_launch_job_java(self, mock_osclients, mock_run_execution,
                                    mock_random_name):
        # The mocked "sahara" client returns a Job with a fixed id.
        mock_sahara = mock_osclients("sahara")
        mock_sahara.jobs.create.return_value = mock.MagicMock(id="42")

        jobs_scenario = jobs.SaharaJob()

        jobs_scenario.clients("keystone").tenant_id = "test_tenant"
        # Fake the benchmark context that the sahara_* context classes
        # would normally populate per tenant.
        jobs_scenario.context = mock.MagicMock(return_value={
            "sahara_images": {"test_tenant": "test_image"},
            "sahara_mains": {"test_tenant": ["main_42"]},
            "sahara_libs": {"test_tenant": ["lib_42"]},
            "sahara_clusters": {"test_tenant": "cl_42"},
            "sahara_inputs": {"test_tenant": "in_42"}}
        )
        jobs_scenario.create_launch_job(
            job_type="java",
            configs={"conf_key": "conf_val"}
        )
        mock_sahara.jobs.create.assert_called_once_with(
            name=mock_random_name.return_value,
            type="java",
            description="",
            mains=["main_42"],
            libs=["lib_42"]
        )
        # Java jobs must be launched without input/output Data Sources.
        mock_run_execution.assert_called_once_with(
            job_id="42",
            cluster_id="cl_42",
            input_id=None,
            output_id=None,
            configs={"conf_key": "conf_val"}
        )

    @mock.patch(SAHARA_UTILS + '.SaharaScenario._generate_random_name',
                return_value="job_42")
    @mock.patch(SAHARA_JOB + "._run_job_execution")
    @mock.patch(SAHARA_JOB + "._create_output_ds",
                return_value=mock.MagicMock(id="out_42"))
    @mock.patch(SAHARA_UTILS + '.SaharaScenario.clients')
    def test_create_launch_job_pig(self, mock_osclients, mock_create_ds,
                                   mock_run_execution, mock_random_name):
        mock_sahara = mock_osclients("sahara")
        mock_sahara.jobs.create.return_value = mock.MagicMock(id="42")

        jobs_scenario = jobs.SaharaJob()

        jobs_scenario.clients("keystone").tenant_id = "test_tenant"
        jobs_scenario.context = mock.MagicMock(return_value={
            "sahara_images": {"test_tenant": "test_image"},
            "sahara_mains": {"test_tenant": ["main_42"]},
            "sahara_libs": {"test_tenant": ["lib_42"]},
            "sahara_clusters": {"test_tenant": "cl_42"},
            "sahara_inputs": {"test_tenant": "in_42"}}
        )
        jobs_scenario.create_launch_job(
            job_type="pig",
            configs={"conf_key": "conf_val"}
        )
        mock_sahara.jobs.create.assert_called_once_with(
            name=mock_random_name.return_value,
            type="pig",
            description="",
            mains=["main_42"],
            libs=["lib_42"]
        )
        # Non-Java jobs get the context's input Data Source and a freshly
        # created output one (mocked _create_output_ds above).
        mock_run_execution.assert_called_once_with(
            job_id="42",
            cluster_id="cl_42",
            input_id="in_42",
            output_id="out_42",
            configs={"conf_key": "conf_val"}
        )

View File

@ -14,6 +14,7 @@
# under the License. # under the License.
import mock import mock
from oslo.config import cfg
from saharaclient.api import base as sahara_base from saharaclient.api import base as sahara_base
from rally.benchmark.scenarios.sahara import utils from rally.benchmark.scenarios.sahara import utils
@ -21,11 +22,18 @@ from rally import exceptions
from tests.benchmark.scenarios import test_base from tests.benchmark.scenarios import test_base
from tests import test from tests import test
CONF = cfg.CONF
SAHARA_UTILS = 'rally.benchmark.scenarios.sahara.utils' SAHARA_UTILS = 'rally.benchmark.scenarios.sahara.utils'
class SaharaNodeGroupTemplatesScenarioTestCase(test.TestCase): class SaharaUtilsTestCase(test.TestCase):
def setUp(self):
super(SaharaUtilsTestCase, self).setUp()
CONF.set_override("cluster_check_interval", 0, "benchmark")
CONF.set_override("job_check_interval", 0, "benchmark")
def _test_atomic_action_timer(self, atomic_actions, name): def _test_atomic_action_timer(self, atomic_actions, name):
action_duration = test_base.get_atomic_action_timer_value_by_name( action_duration = test_base.get_atomic_action_timer_value_by_name(
@ -196,9 +204,9 @@ class SaharaNodeGroupTemplatesScenarioTestCase(test.TestCase):
delete_mock = mock_clients("sahara").clusters.delete delete_mock = mock_clients("sahara").clusters.delete
delete_mock.assert_called_once_with(42) delete_mock.assert_called_once_with(42)
mock_clients("sahara").clusters.get.assert_has_calls([ cl_get_expected = mock.call(42)
mock.call(42), mock_clients("sahara").clusters.get.assert_has_calls([cl_get_expected,
mock.call(42)]) cl_get_expected])
self._test_atomic_action_timer(scenario.atomic_actions(), self._test_atomic_action_timer(scenario.atomic_actions(),
'sahara.delete_cluster') 'sahara.delete_cluster')
@ -240,3 +248,60 @@ class SaharaNodeGroupTemplatesScenarioTestCase(test.TestCase):
scenario = utils.SaharaScenario(ctxt) scenario = utils.SaharaScenario(ctxt)
self.assertRaises(exceptions.RallyException, self.assertRaises(exceptions.RallyException,
scenario._create_output_ds) scenario._create_output_ds)
    @mock.patch(SAHARA_UTILS + '.SaharaScenario.clients')
    def test_run_job_execution(self, mock_clients):
        """_run_job_execution polls until a success status is reported."""
        # First poll reports "pending", second reports success; "SUCCESS"
        # in upper case also exercises the case-insensitive comparison.
        mock_clients("sahara").job_executions.get.side_effect = [
            mock.MagicMock(info={"status": "pending"}, id="42"),
            mock.MagicMock(info={"status": "SUCCESS"}, id="42")]

        mock_clients("sahara").job_executions.create.return_value = (
            mock.MagicMock(id="42"))

        scenario = utils.SaharaScenario()
        scenario._run_job_execution(job_id="test_job_id",
                                    cluster_id="test_cluster_id",
                                    input_id="test_input_id",
                                    output_id="test_output_id",
                                    configs={"k": "v"})

        mock_clients("sahara").job_executions.create.assert_called_once_with(
            job_id="test_job_id",
            cluster_id="test_cluster_id",
            input_id="test_input_id",
            output_id="test_output_id",
            configs={"k": "v"}
        )

        # One get() per poll: pending, then success.
        je_get_expected = mock.call("42")
        mock_clients("sahara").job_executions.get.assert_has_calls(
            [je_get_expected, je_get_expected]
        )
    @mock.patch(SAHARA_UTILS + '.SaharaScenario.clients')
    def test_run_job_execution_fail(self, mock_clients):
        """_run_job_execution raises when a failure status is reported."""
        # Second poll reports "killed", one of the failure statuses.
        mock_clients("sahara").job_executions.get.side_effect = [
            mock.MagicMock(info={"status": "pending"}, id="42"),
            mock.MagicMock(info={"status": "killed"}, id="42")]

        mock_clients("sahara").job_executions.create.return_value = (
            mock.MagicMock(id="42"))

        scenario = utils.SaharaScenario()
        self.assertRaises(exceptions.RallyException,
                          scenario._run_job_execution,
                          job_id="test_job_id",
                          cluster_id="test_cluster_id",
                          input_id="test_input_id",
                          output_id="test_output_id",
                          configs={"k": "v"})

        # The execution must still have been created exactly once before
        # the failure was detected.
        mock_clients("sahara").job_executions.create.assert_called_once_with(
            job_id="test_job_id",
            cluster_id="test_cluster_id",
            input_id="test_input_id",
            output_id="test_output_id",
            configs={"k": "v"}
        )