Merge "[Sahara] Split EDP context"
This commit is contained in:
commit
a298b40f96
@ -0,0 +1,89 @@
|
|||||||
|
# Copyright 2015: Mirantis Inc.
|
||||||
|
# All Rights Reserved.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
from rally.common.i18n import _
|
||||||
|
from rally.common import log as logging
|
||||||
|
from rally.common import utils as rutils
|
||||||
|
from rally import consts
|
||||||
|
from rally import exceptions
|
||||||
|
from rally import osclients
|
||||||
|
from rally.plugins.openstack.context.cleanup import manager as resource_manager
|
||||||
|
from rally.task import context
|
||||||
|
|
||||||
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
@context.configure(name="sahara_data_sources", order=443)
|
||||||
|
class SaharaDataSources(context.Context):
|
||||||
|
"""Context class for setting up Data Sources for an EDP job."""
|
||||||
|
|
||||||
|
CONFIG_SCHEMA = {
|
||||||
|
"type": "object",
|
||||||
|
"$schema": consts.JSON_SCHEMA,
|
||||||
|
"properties": {
|
||||||
|
"input_type": {
|
||||||
|
"enum": ["swift", "hdfs"],
|
||||||
|
},
|
||||||
|
"input_url": {
|
||||||
|
"type": "string",
|
||||||
|
},
|
||||||
|
"output_type": {
|
||||||
|
"enum": ["swift", "hdfs"],
|
||||||
|
},
|
||||||
|
"output_url_prefix": {
|
||||||
|
"type": "string",
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"additionalProperties": False,
|
||||||
|
"required": ["input_type", "input_url",
|
||||||
|
"output_type", "output_url_prefix"]
|
||||||
|
}
|
||||||
|
|
||||||
|
@rutils.log_task_wrapper(LOG.info,
|
||||||
|
_("Enter context: `Sahara Data Sources`"))
|
||||||
|
def setup(self):
|
||||||
|
self.context["sahara_output_conf"] = {
|
||||||
|
"output_type": self.config["output_type"],
|
||||||
|
"output_url_prefix": self.config["output_url_prefix"]
|
||||||
|
}
|
||||||
|
for user, tenant_id in rutils.iterate_per_tenants(
|
||||||
|
self.context["users"]):
|
||||||
|
clients = osclients.Clients(user["endpoint"])
|
||||||
|
sahara = clients.sahara()
|
||||||
|
|
||||||
|
self.setup_inputs(sahara, tenant_id, self.config["input_type"],
|
||||||
|
self.config["input_url"])
|
||||||
|
|
||||||
|
def setup_inputs(self, sahara, tenant_id, input_type, input_url):
|
||||||
|
if input_type == "swift":
|
||||||
|
raise exceptions.RallyException(
|
||||||
|
_("Swift Data Sources are not implemented yet"))
|
||||||
|
# Todo(nkonovalov): Add swift credentials parameters and data upload
|
||||||
|
input_ds = sahara.data_sources.create(
|
||||||
|
name=self.generate_random_name(),
|
||||||
|
description="",
|
||||||
|
data_source_type=input_type,
|
||||||
|
url=input_url)
|
||||||
|
|
||||||
|
self.context["tenants"][tenant_id]["sahara_input"] = input_ds.id
|
||||||
|
|
||||||
|
@rutils.log_task_wrapper(LOG.info, _("Exit context: `Sahara EDP`"))
|
||||||
|
def cleanup(self):
|
||||||
|
resources = ["job_executions", "jobs", "data_sources"]
|
||||||
|
|
||||||
|
# TODO(boris-42): Delete only resources created by this context
|
||||||
|
resource_manager.cleanup(
|
||||||
|
names=["sahara.%s" % res for res in resources],
|
||||||
|
users=self.context.get("users", []))
|
@ -28,26 +28,14 @@ from rally.task import context
|
|||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
@context.configure(name="sahara_edp", order=442)
|
@context.configure(name="sahara_job_binaries", order=442)
|
||||||
class SaharaEDP(context.Context):
|
class SaharaJobBinaries(context.Context):
|
||||||
"""Context class for setting up the environment for an EDP job."""
|
"""Context class for setting up Job Binaries for an EDP job."""
|
||||||
|
|
||||||
CONFIG_SCHEMA = {
|
CONFIG_SCHEMA = {
|
||||||
"type": "object",
|
"type": "object",
|
||||||
"$schema": consts.JSON_SCHEMA,
|
"$schema": consts.JSON_SCHEMA,
|
||||||
"properties": {
|
"properties": {
|
||||||
"input_type": {
|
|
||||||
"enum": ["swift", "hdfs"],
|
|
||||||
},
|
|
||||||
"input_url": {
|
|
||||||
"type": "string",
|
|
||||||
},
|
|
||||||
"output_type": {
|
|
||||||
"enum": ["swift", "hdfs"],
|
|
||||||
},
|
|
||||||
"output_url_prefix": {
|
|
||||||
"type": "string",
|
|
||||||
},
|
|
||||||
"mains": {
|
"mains": {
|
||||||
"type": "array",
|
"type": "array",
|
||||||
"items": {
|
"items": {
|
||||||
@ -81,24 +69,16 @@ class SaharaEDP(context.Context):
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"additionalProperties": False,
|
"additionalProperties": False
|
||||||
"required": ["input_type", "input_url",
|
|
||||||
"output_type", "output_url_prefix"]
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@rutils.log_task_wrapper(LOG.info, _("Enter context: `Sahara EDP`"))
|
# This cache will hold the downloaded libs content to prevent repeated
|
||||||
def setup(self):
|
# downloads for each tenant
|
||||||
self.context["sahara_output_conf"] = {
|
lib_cache = {}
|
||||||
"output_type": self.config["output_type"],
|
|
||||||
"output_url_prefix": self.config["output_url_prefix"]
|
|
||||||
}
|
|
||||||
self.context["sahara_mains"] = {}
|
|
||||||
self.context["sahara_libs"] = {}
|
|
||||||
|
|
||||||
input_type = self.config["input_type"]
|
@rutils.log_task_wrapper(LOG.info,
|
||||||
input_url = self.config["input_url"]
|
_("Enter context: `Sahara Job Binaries`"))
|
||||||
mains = self.config.get("mains", [])
|
def setup(self):
|
||||||
libs = self.config.get("libs", [])
|
|
||||||
|
|
||||||
for user, tenant_id in rutils.iterate_per_tenants(
|
for user, tenant_id in rutils.iterate_per_tenants(
|
||||||
self.context["users"]):
|
self.context["users"]):
|
||||||
@ -106,12 +86,10 @@ class SaharaEDP(context.Context):
|
|||||||
clients = osclients.Clients(user["endpoint"])
|
clients = osclients.Clients(user["endpoint"])
|
||||||
sahara = clients.sahara()
|
sahara = clients.sahara()
|
||||||
|
|
||||||
self.setup_inputs(sahara, tenant_id, input_type, input_url)
|
|
||||||
|
|
||||||
self.context["tenants"][tenant_id]["sahara_mains"] = []
|
self.context["tenants"][tenant_id]["sahara_mains"] = []
|
||||||
self.context["tenants"][tenant_id]["sahara_libs"] = []
|
self.context["tenants"][tenant_id]["sahara_libs"] = []
|
||||||
|
|
||||||
for main in mains:
|
for main in self.config.get("mains", []):
|
||||||
self.download_and_save_lib(
|
self.download_and_save_lib(
|
||||||
sahara=sahara,
|
sahara=sahara,
|
||||||
lib_type="sahara_mains",
|
lib_type="sahara_mains",
|
||||||
@ -119,7 +97,7 @@ class SaharaEDP(context.Context):
|
|||||||
download_url=main["download_url"],
|
download_url=main["download_url"],
|
||||||
tenant_id=tenant_id)
|
tenant_id=tenant_id)
|
||||||
|
|
||||||
for lib in libs:
|
for lib in self.config.get("libs", []):
|
||||||
self.download_and_save_lib(
|
self.download_and_save_lib(
|
||||||
sahara=sahara,
|
sahara=sahara,
|
||||||
lib_type="sahara_libs",
|
lib_type="sahara_libs",
|
||||||
@ -142,7 +120,11 @@ class SaharaEDP(context.Context):
|
|||||||
|
|
||||||
def download_and_save_lib(self, sahara, lib_type, name, download_url,
|
def download_and_save_lib(self, sahara, lib_type, name, download_url,
|
||||||
tenant_id):
|
tenant_id):
|
||||||
lib_data = requests.get(download_url).content
|
if download_url not in self.lib_cache:
|
||||||
|
lib_data = requests.get(download_url).content
|
||||||
|
self.lib_cache[download_url] = lib_data
|
||||||
|
else:
|
||||||
|
lib_data = self.lib_cache[download_url]
|
||||||
|
|
||||||
job_binary_internal = sahara.job_binary_internals.create(
|
job_binary_internal = sahara.job_binary_internals.create(
|
||||||
name=name,
|
name=name,
|
||||||
@ -156,10 +138,10 @@ class SaharaEDP(context.Context):
|
|||||||
|
|
||||||
self.context["tenants"][tenant_id][lib_type].append(job_binary.id)
|
self.context["tenants"][tenant_id][lib_type].append(job_binary.id)
|
||||||
|
|
||||||
@rutils.log_task_wrapper(LOG.info, _("Exit context: `Sahara EDP`"))
|
@rutils.log_task_wrapper(LOG.info,
|
||||||
|
_("Exit context: `Sahara Job Binaries`"))
|
||||||
def cleanup(self):
|
def cleanup(self):
|
||||||
resources = ["job_executions", "jobs", "job_binary_internals",
|
resources = ["job_binary_internals", "job_binaries"]
|
||||||
"job_binaries", "data_sources"]
|
|
||||||
|
|
||||||
# TODO(boris-42): Delete only resources created by this context
|
# TODO(boris-42): Delete only resources created by this context
|
||||||
resource_manager.cleanup(
|
resource_manager.cleanup(
|
@ -26,8 +26,8 @@ class SaharaJob(utils.SaharaScenario):
|
|||||||
"""Benchmark scenarios for Sahara jobs."""
|
"""Benchmark scenarios for Sahara jobs."""
|
||||||
|
|
||||||
@validation.required_services(consts.Service.SAHARA)
|
@validation.required_services(consts.Service.SAHARA)
|
||||||
@validation.required_contexts("users", "sahara_image", "sahara_edp",
|
@validation.required_contexts("users", "sahara_image",
|
||||||
"sahara_cluster")
|
"sahara_job_binaries", "sahara_cluster")
|
||||||
@scenario.configure(context={"cleanup": ["sahara"]})
|
@scenario.configure(context={"cleanup": ["sahara"]})
|
||||||
def create_launch_job(self, job_type, configs, job_idx=0):
|
def create_launch_job(self, job_type, configs, job_idx=0):
|
||||||
"""Create and execute a Sahara EDP Job.
|
"""Create and execute a Sahara EDP Job.
|
||||||
@ -69,8 +69,8 @@ class SaharaJob(utils.SaharaScenario):
|
|||||||
job_idx=job_idx)
|
job_idx=job_idx)
|
||||||
|
|
||||||
@validation.required_services(consts.Service.SAHARA)
|
@validation.required_services(consts.Service.SAHARA)
|
||||||
@validation.required_contexts("users", "sahara_image", "sahara_edp",
|
@validation.required_contexts("users", "sahara_image",
|
||||||
"sahara_cluster")
|
"sahara_job_binaries", "sahara_cluster")
|
||||||
@scenario.configure(context={"cleanup": ["sahara"]})
|
@scenario.configure(context={"cleanup": ["sahara"]})
|
||||||
def create_launch_job_sequence(self, jobs):
|
def create_launch_job_sequence(self, jobs):
|
||||||
"""Create and execute a sequence of the Sahara EDP Jobs.
|
"""Create and execute a sequence of the Sahara EDP Jobs.
|
||||||
@ -86,8 +86,8 @@ class SaharaJob(utils.SaharaScenario):
|
|||||||
self.create_launch_job(job["job_type"], job["configs"], idx)
|
self.create_launch_job(job["job_type"], job["configs"], idx)
|
||||||
|
|
||||||
@validation.required_services(consts.Service.SAHARA)
|
@validation.required_services(consts.Service.SAHARA)
|
||||||
@validation.required_contexts("users", "sahara_image", "sahara_edp",
|
@validation.required_contexts("users", "sahara_image",
|
||||||
"sahara_cluster")
|
"sahara_job_binaries", "sahara_cluster")
|
||||||
@scenario.configure(context={"cleanup": ["sahara"]})
|
@scenario.configure(context={"cleanup": ["sahara"]})
|
||||||
def create_launch_job_sequence_with_scaling(self, jobs, deltas):
|
def create_launch_job_sequence_with_scaling(self, jobs, deltas):
|
||||||
"""Create and execute Sahara EDP Jobs on a scaling Cluster.
|
"""Create and execute Sahara EDP Jobs on a scaling Cluster.
|
||||||
|
@ -38,11 +38,7 @@
|
|||||||
"plugin_name": "vanilla",
|
"plugin_name": "vanilla",
|
||||||
"hadoop_version": "1.2.1"
|
"hadoop_version": "1.2.1"
|
||||||
},
|
},
|
||||||
"sahara_edp": {
|
"sahara_job_binaries": {
|
||||||
"input_type": "hdfs",
|
|
||||||
"output_type": "hdfs",
|
|
||||||
"input_url": "/",
|
|
||||||
"output_url_prefix": "/out_",
|
|
||||||
"libs": [
|
"libs": [
|
||||||
{
|
{
|
||||||
"name": "tests.jar",
|
"name": "tests.jar",
|
||||||
|
@ -38,11 +38,7 @@
|
|||||||
username: "ubuntu"
|
username: "ubuntu"
|
||||||
plugin_name: "vanilla"
|
plugin_name: "vanilla"
|
||||||
hadoop_version: "1.2.1"
|
hadoop_version: "1.2.1"
|
||||||
sahara_edp:
|
sahara_job_binaries:
|
||||||
input_type: "hdfs"
|
|
||||||
output_type: "hdfs"
|
|
||||||
input_url: "/"
|
|
||||||
output_url_prefix: "/out_"
|
|
||||||
libs:
|
libs:
|
||||||
-
|
-
|
||||||
name: "tests.jar"
|
name: "tests.jar"
|
||||||
|
@ -38,11 +38,7 @@
|
|||||||
"plugin_name": "vanilla",
|
"plugin_name": "vanilla",
|
||||||
"hadoop_version": "1.2.1"
|
"hadoop_version": "1.2.1"
|
||||||
},
|
},
|
||||||
"sahara_edp": {
|
"sahara_job_binaries": {
|
||||||
"input_type": "hdfs",
|
|
||||||
"output_type": "hdfs",
|
|
||||||
"input_url": "/",
|
|
||||||
"output_url_prefix": "/out_",
|
|
||||||
"libs": [
|
"libs": [
|
||||||
{
|
{
|
||||||
"name": "tests.jar",
|
"name": "tests.jar",
|
||||||
|
@ -41,11 +41,7 @@
|
|||||||
username: "ubuntu"
|
username: "ubuntu"
|
||||||
plugin_name: "vanilla"
|
plugin_name: "vanilla"
|
||||||
hadoop_version: "1.2.1"
|
hadoop_version: "1.2.1"
|
||||||
sahara_edp:
|
sahara_job_binaries:
|
||||||
input_type: "hdfs"
|
|
||||||
output_type: "hdfs"
|
|
||||||
input_url: "/"
|
|
||||||
output_url_prefix: "/out_"
|
|
||||||
libs:
|
libs:
|
||||||
-
|
-
|
||||||
name: "tests.jar"
|
name: "tests.jar"
|
||||||
|
@ -26,11 +26,7 @@
|
|||||||
"plugin_name": "vanilla",
|
"plugin_name": "vanilla",
|
||||||
"hadoop_version": "1.2.1"
|
"hadoop_version": "1.2.1"
|
||||||
},
|
},
|
||||||
"sahara_edp": {
|
"sahara_job_binaries": {
|
||||||
"input_type": "hdfs",
|
|
||||||
"output_type": "hdfs",
|
|
||||||
"input_url": "/",
|
|
||||||
"output_url_prefix": "/out_",
|
|
||||||
"libs": [
|
"libs": [
|
||||||
{
|
{
|
||||||
"name": "examples.jar",
|
"name": "examples.jar",
|
||||||
|
@ -22,11 +22,7 @@
|
|||||||
username: "ubuntu"
|
username: "ubuntu"
|
||||||
plugin_name: "vanilla"
|
plugin_name: "vanilla"
|
||||||
hadoop_version: "1.2.1"
|
hadoop_version: "1.2.1"
|
||||||
sahara_edp:
|
sahara_job_binaries:
|
||||||
input_type: "hdfs"
|
|
||||||
output_type: "hdfs"
|
|
||||||
input_url: "/"
|
|
||||||
output_url_prefix: "/out_"
|
|
||||||
libs:
|
libs:
|
||||||
-
|
-
|
||||||
name: "examples.jar"
|
name: "examples.jar"
|
||||||
|
@ -21,11 +21,7 @@
|
|||||||
"plugin_name": "vanilla",
|
"plugin_name": "vanilla",
|
||||||
"hadoop_version": "1.2.1"
|
"hadoop_version": "1.2.1"
|
||||||
},
|
},
|
||||||
"sahara_edp": {
|
"sahara_job_binaries": {
|
||||||
"input_type": "hdfs",
|
|
||||||
"output_type": "hdfs",
|
|
||||||
"input_url": "/",
|
|
||||||
"output_url_prefix": "/out_",
|
|
||||||
"mains": [
|
"mains": [
|
||||||
{
|
{
|
||||||
"name": "example.pig",
|
"name": "example.pig",
|
||||||
@ -39,6 +35,12 @@
|
|||||||
}
|
}
|
||||||
]
|
]
|
||||||
},
|
},
|
||||||
|
"sahara_data_sources": {
|
||||||
|
"input_type": "hdfs",
|
||||||
|
"output_type": "hdfs",
|
||||||
|
"input_url": "/",
|
||||||
|
"output_url_prefix": "/out_"
|
||||||
|
},
|
||||||
"sahara_cluster": {
|
"sahara_cluster": {
|
||||||
"flavor_id": "2",
|
"flavor_id": "2",
|
||||||
"workers_count": 3,
|
"workers_count": 3,
|
||||||
|
@ -17,11 +17,7 @@
|
|||||||
username: "ubuntu"
|
username: "ubuntu"
|
||||||
plugin_name: "vanilla"
|
plugin_name: "vanilla"
|
||||||
hadoop_version: "1.2.1"
|
hadoop_version: "1.2.1"
|
||||||
sahara_edp:
|
sahara_job_binaries:
|
||||||
input_type: "hdfs"
|
|
||||||
output_type: "hdfs"
|
|
||||||
input_url: "/"
|
|
||||||
output_url_prefix: "/out_"
|
|
||||||
mains:
|
mains:
|
||||||
-
|
-
|
||||||
name: "example.pig"
|
name: "example.pig"
|
||||||
@ -30,6 +26,11 @@
|
|||||||
-
|
-
|
||||||
name: "udf.jar"
|
name: "udf.jar"
|
||||||
download_url: "https://github.com/openstack/sahara/blob/master/etc/edp-examples/pig-job/udf.jar?raw=true"
|
download_url: "https://github.com/openstack/sahara/blob/master/etc/edp-examples/pig-job/udf.jar?raw=true"
|
||||||
|
sahara_data_sources:
|
||||||
|
input_type: "hdfs"
|
||||||
|
output_type: "hdfs"
|
||||||
|
input_url: "/"
|
||||||
|
output_url_prefix: "/out_"
|
||||||
sahara_cluster:
|
sahara_cluster:
|
||||||
flavor_id: "2"
|
flavor_id: "2"
|
||||||
workers_count: 3
|
workers_count: 3
|
||||||
|
@ -14,16 +14,17 @@
|
|||||||
|
|
||||||
import mock
|
import mock
|
||||||
|
|
||||||
from rally.plugins.openstack.context.sahara import sahara_edp
|
from rally.plugins.openstack.context.sahara import sahara_data_sources
|
||||||
from tests.unit import test
|
from tests.unit import test
|
||||||
|
|
||||||
|
|
||||||
CTX = "rally.plugins.openstack.context.sahara"
|
CTX = "rally.plugins.openstack.context.sahara"
|
||||||
|
|
||||||
|
|
||||||
class SaharaEDPTestCase(test.TestCase):
|
class SaharaDataSourcesTestCase(test.ScenarioTestCase):
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
super(SaharaEDPTestCase, self).setUp()
|
super(SaharaDataSourcesTestCase, self).setUp()
|
||||||
self.tenants_num = 2
|
self.tenants_num = 2
|
||||||
self.users_per_tenant = 2
|
self.users_per_tenant = 2
|
||||||
self.users = self.tenants_num * self.users_per_tenant
|
self.users = self.tenants_num * self.users_per_tenant
|
||||||
@ -44,55 +45,35 @@ class SaharaEDPTestCase(test.TestCase):
|
|||||||
for j in range(self.tenants_num)
|
for j in range(self.tenants_num)
|
||||||
for i in range(self.users_per_tenant)]
|
for i in range(self.users_per_tenant)]
|
||||||
|
|
||||||
@property
|
self.context.update({
|
||||||
def context_without_edp_keys(self):
|
|
||||||
context = test.get_test_context()
|
|
||||||
context.update({
|
|
||||||
"config": {
|
"config": {
|
||||||
"users": {
|
"users": {
|
||||||
"tenants": self.tenants_num,
|
"tenants": self.tenants_num,
|
||||||
"users_per_tenant": self.users_per_tenant,
|
"users_per_tenant": self.users_per_tenant,
|
||||||
},
|
},
|
||||||
"sahara_edp": {
|
"sahara_data_sources": {
|
||||||
"input_type": "hdfs",
|
"input_type": "hdfs",
|
||||||
"output_type": "hdfs",
|
"output_type": "hdfs",
|
||||||
"input_url": "hdfs://test_host/",
|
"input_url": "hdfs://test_host/",
|
||||||
"output_url_prefix": "hdfs://test_host/out_",
|
"output_url_prefix": "hdfs://test_host/out_"
|
||||||
"libs": [
|
|
||||||
{
|
|
||||||
"name": "test.jar",
|
|
||||||
"download_url": "http://example.com/test.jar"
|
|
||||||
}
|
|
||||||
]
|
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
"admin": {"endpoint": mock.MagicMock()},
|
"admin": {"endpoint": mock.MagicMock()},
|
||||||
"users": self.users_key,
|
"users": self.users_key,
|
||||||
"tenants": self.tenants
|
"tenants": self.tenants
|
||||||
})
|
})
|
||||||
return context
|
|
||||||
|
|
||||||
@mock.patch("%s.sahara_edp.resource_manager.cleanup" % CTX)
|
@mock.patch("%s.sahara_data_sources.resource_manager.cleanup" % CTX)
|
||||||
@mock.patch("%s.sahara_edp.requests" % CTX)
|
@mock.patch("%s.sahara_data_sources.osclients" % CTX)
|
||||||
@mock.patch("%s.sahara_edp.osclients" % CTX)
|
def test_setup_and_cleanup(self, mock_osclients, mock_cleanup):
|
||||||
def test_setup_and_cleanup(self, mock_osclients, mock_requests,
|
|
||||||
mock_cleanup):
|
|
||||||
|
|
||||||
mock_sahara = mock_osclients.Clients(mock.MagicMock()).sahara()
|
mock_sahara = mock_osclients.Clients(mock.MagicMock()).sahara()
|
||||||
mock_sahara.data_sources.create.return_value = mock.MagicMock(id=42)
|
mock_sahara.data_sources.create.return_value = mock.MagicMock(id=42)
|
||||||
mock_sahara.job_binary_internals.create.return_value = (
|
|
||||||
mock.MagicMock(id=42))
|
|
||||||
|
|
||||||
mock_requests.get().content = "test_binary"
|
sahara_ctx = sahara_data_sources.SaharaDataSources(self.context)
|
||||||
|
|
||||||
ctx = self.context_without_edp_keys
|
|
||||||
sahara_ctx = sahara_edp.SaharaEDP(ctx)
|
|
||||||
sahara_ctx.generate_random_name = mock.Mock()
|
sahara_ctx.generate_random_name = mock.Mock()
|
||||||
|
|
||||||
input_ds_crete_calls = []
|
input_ds_crete_calls = []
|
||||||
download_calls = []
|
|
||||||
job_binary_internals_calls = []
|
|
||||||
job_binaries_calls = []
|
|
||||||
|
|
||||||
for i in range(self.tenants_num):
|
for i in range(self.tenants_num):
|
||||||
input_ds_crete_calls.append(mock.call(
|
input_ds_crete_calls.append(mock.call(
|
||||||
@ -100,28 +81,14 @@ class SaharaEDPTestCase(test.TestCase):
|
|||||||
description="",
|
description="",
|
||||||
data_source_type="hdfs",
|
data_source_type="hdfs",
|
||||||
url="hdfs://test_host/"))
|
url="hdfs://test_host/"))
|
||||||
download_calls.append(mock.call("http://example.com/test.jar"))
|
|
||||||
job_binary_internals_calls.append(mock.call(
|
|
||||||
name="test.jar",
|
|
||||||
data="test_binary"))
|
|
||||||
job_binaries_calls.append(mock.call(
|
|
||||||
name="test.jar",
|
|
||||||
url="internal-db://42",
|
|
||||||
description="",
|
|
||||||
extra={}))
|
|
||||||
|
|
||||||
sahara_ctx.setup()
|
sahara_ctx.setup()
|
||||||
|
|
||||||
mock_sahara.data_sources.create.assert_has_calls(input_ds_crete_calls)
|
mock_sahara.data_sources.create.assert_has_calls(input_ds_crete_calls)
|
||||||
mock_requests.get.assert_has_calls(download_calls)
|
|
||||||
mock_sahara.job_binary_internals.create.assert_has_calls(
|
|
||||||
job_binary_internals_calls)
|
|
||||||
mock_sahara.job_binaries.create.assert_has_calls(job_binaries_calls)
|
|
||||||
|
|
||||||
sahara_ctx.cleanup()
|
sahara_ctx.cleanup()
|
||||||
|
|
||||||
mock_cleanup.assert_called_once_with(
|
mock_cleanup.assert_called_once_with(
|
||||||
names=["sahara.job_executions", "sahara.jobs",
|
names=["sahara.job_executions", "sahara.jobs",
|
||||||
"sahara.job_binary_internals", "sahara.job_binaries",
|
|
||||||
"sahara.data_sources"],
|
"sahara.data_sources"],
|
||||||
users=ctx["users"])
|
users=self.context["users"])
|
@ -0,0 +1,140 @@
|
|||||||
|
# All Rights Reserved.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
import mock
|
||||||
|
|
||||||
|
from rally.plugins.openstack.context.sahara import sahara_job_binaries
|
||||||
|
from tests.unit import test
|
||||||
|
|
||||||
|
CTX = "rally.plugins.openstack.context.sahara"
|
||||||
|
|
||||||
|
|
||||||
|
class SaharaJobBinariesTestCase(test.ScenarioTestCase):
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super(SaharaJobBinariesTestCase, self).setUp()
|
||||||
|
self.tenants_num = 2
|
||||||
|
self.users_per_tenant = 2
|
||||||
|
self.users = self.tenants_num * self.users_per_tenant
|
||||||
|
self.task = mock.MagicMock()
|
||||||
|
|
||||||
|
self.tenants = {}
|
||||||
|
self.users_key = []
|
||||||
|
|
||||||
|
for i in range(self.tenants_num):
|
||||||
|
self.tenants[str(i)] = {"id": str(i), "name": str(i),
|
||||||
|
"sahara_image": "42"}
|
||||||
|
for j in range(self.users_per_tenant):
|
||||||
|
self.users_key.append({"id": "%s_%s" % (str(i), str(j)),
|
||||||
|
"tenant_id": str(i),
|
||||||
|
"endpoint": "endpoint"})
|
||||||
|
|
||||||
|
self.user_key = [{"id": i, "tenant_id": j, "endpoint": "endpoint"}
|
||||||
|
for j in range(self.tenants_num)
|
||||||
|
for i in range(self.users_per_tenant)]
|
||||||
|
|
||||||
|
self.context.update({
|
||||||
|
"config": {
|
||||||
|
"users": {
|
||||||
|
"tenants": self.tenants_num,
|
||||||
|
"users_per_tenant": self.users_per_tenant,
|
||||||
|
},
|
||||||
|
"sahara_job_binaries": {
|
||||||
|
"libs": [
|
||||||
|
{
|
||||||
|
"name": "test.jar",
|
||||||
|
"download_url": "http://example.com/test.jar"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"mains": [
|
||||||
|
{
|
||||||
|
"name": "test.jar",
|
||||||
|
"download_url": "http://example.com/test.jar"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"admin": {"endpoint": mock.MagicMock()},
|
||||||
|
"task": mock.MagicMock(),
|
||||||
|
"users": self.users_key,
|
||||||
|
"tenants": self.tenants
|
||||||
|
})
|
||||||
|
|
||||||
|
@mock.patch("%s.sahara_job_binaries.resource_manager.cleanup" % CTX)
|
||||||
|
@mock.patch(("%s.sahara_job_binaries.SaharaJobBinaries."
|
||||||
|
"download_and_save_lib") % CTX)
|
||||||
|
@mock.patch("%s.sahara_job_binaries.osclients" % CTX)
|
||||||
|
def test_setup_and_cleanup(
|
||||||
|
self,
|
||||||
|
mock_osclients,
|
||||||
|
mock_sahara_job_binaries_download_and_save_lib,
|
||||||
|
mock_cleanup):
|
||||||
|
|
||||||
|
mock_sahara = mock_osclients.Clients(mock.MagicMock()).sahara()
|
||||||
|
|
||||||
|
sahara_ctx = sahara_job_binaries.SaharaJobBinaries(self.context)
|
||||||
|
|
||||||
|
download_calls = []
|
||||||
|
|
||||||
|
for i in range(self.tenants_num):
|
||||||
|
download_calls.append(mock.call(
|
||||||
|
sahara=mock_sahara,
|
||||||
|
lib_type="sahara_mains",
|
||||||
|
name="test.jar",
|
||||||
|
download_url="http://example.com/test.jar",
|
||||||
|
tenant_id=str(i)))
|
||||||
|
download_calls.append(mock.call(
|
||||||
|
sahara=mock_sahara,
|
||||||
|
lib_type="sahara_libs",
|
||||||
|
name="test.jar",
|
||||||
|
download_url="http://example.com/test.jar",
|
||||||
|
tenant_id=str(i)))
|
||||||
|
|
||||||
|
sahara_ctx.setup()
|
||||||
|
|
||||||
|
(mock_sahara_job_binaries_download_and_save_lib.
|
||||||
|
assert_has_calls(download_calls))
|
||||||
|
|
||||||
|
sahara_ctx.cleanup()
|
||||||
|
|
||||||
|
mock_cleanup.assert_called_once_with(
|
||||||
|
names=["sahara.job_binary_internals", "sahara.job_binaries"],
|
||||||
|
users=self.context["users"])
|
||||||
|
|
||||||
|
@mock.patch("%s.sahara_job_binaries.requests" % CTX)
|
||||||
|
@mock.patch("%s.sahara_job_binaries.osclients" % CTX)
|
||||||
|
def test_download_and_save_lib(self, mock_osclients, mock_requests):
|
||||||
|
|
||||||
|
mock_requests.get.content.return_value = "some_binary_content"
|
||||||
|
mock_sahara = mock_osclients.Clients(mock.MagicMock()).sahara()
|
||||||
|
mock_sahara.job_binary_internals.create.return_value = (
|
||||||
|
mock.MagicMock(id=42))
|
||||||
|
|
||||||
|
sahara_ctx = sahara_job_binaries.SaharaJobBinaries(self.context)
|
||||||
|
sahara_ctx.context["tenants"]["0"]["sahara_mains"] = []
|
||||||
|
sahara_ctx.context["tenants"]["0"]["sahara_libs"] = []
|
||||||
|
|
||||||
|
sahara_ctx.download_and_save_lib(sahara=mock_sahara,
|
||||||
|
lib_type="sahara_mains",
|
||||||
|
name="test_binary",
|
||||||
|
download_url="http://somewhere",
|
||||||
|
tenant_id="0")
|
||||||
|
|
||||||
|
sahara_ctx.download_and_save_lib(sahara=mock_sahara,
|
||||||
|
lib_type="sahara_libs",
|
||||||
|
name="test_binary_2",
|
||||||
|
download_url="http://somewhere",
|
||||||
|
tenant_id="0")
|
||||||
|
|
||||||
|
mock_requests.get.assert_called_once_with("http://somewhere")
|
Loading…
Reference in New Issue
Block a user