Remove Sahara

This removes the retired Sahara project that
was deprecated in 3.0.0

Change-Id: I7b313693de3f5c35293dd473f24832973de52053
This commit is contained in:
Tobias Urdin
2025-03-26 10:00:43 +01:00
committed by Andriy Kurilin
parent 8b817a2701
commit 9f85b9e16f
65 changed files with 5 additions and 5070 deletions

View File

@@ -24,7 +24,7 @@ Removed
* Support for Python3.8 is dropped
* Removed all support for the retired Murano project
* Removed all support for the retired Sahara project
[3.0.0] - 2024-05-23
--------------------

View File

@@ -706,24 +706,6 @@
# Enable or disable osprofiler to trace the scenarios (boolean value)
#enable_profiler = true
# A timeout in seconds for a cluster create operation (integer value)
#sahara_cluster_create_timeout = 1800
# A timeout in seconds for a cluster delete operation (integer value)
#sahara_cluster_delete_timeout = 900
# Cluster status polling interval in seconds (integer value)
#sahara_cluster_check_interval = 5
# A timeout in seconds for a Job Execution to complete (integer value)
#sahara_job_execution_timeout = 600
# Job Execution status polling interval in seconds (integer value)
#sahara_job_check_interval = 5
# Amount of workers one proxy should serve to. (integer value)
#sahara_workers_per_proxy = 20
# Interval between checks when waiting for a VM to become pingable
# (floating point value)
#vm_ping_poll_interval = 1.0

View File

@@ -26,7 +26,6 @@ from rally_openstack.common.cfg import nova
from rally_openstack.common.cfg import octavia
from rally_openstack.common.cfg import osclients
from rally_openstack.common.cfg import profiler
from rally_openstack.common.cfg import sahara
from rally_openstack.common.cfg import senlin
from rally_openstack.common.cfg import vm
from rally_openstack.common.cfg import watcher
@@ -46,7 +45,7 @@ def list_opts():
opts = {}
for l_opts in (cinder.OPTS, heat.OPTS, ironic.OPTS, magnum.OPTS,
manila.OPTS, mistral.OPTS, monasca.OPTS,
nova.OPTS, osclients.OPTS, profiler.OPTS, sahara.OPTS,
nova.OPTS, osclients.OPTS, profiler.OPTS,
vm.OPTS, glance.OPTS, watcher.OPTS, tempest.OPTS,
keystone_roles.OPTS, keystone_users.OPTS, cleanup.OPTS,
senlin.OPTS, neutron.OPTS, octavia.OPTS,

View File

@@ -1,43 +0,0 @@
# Copyright 2013: Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from rally.common import cfg
OPTS = {"openstack": [
cfg.IntOpt("sahara_cluster_create_timeout",
default=1800,
deprecated_group="benchmark",
help="A timeout in seconds for a cluster create operation"),
cfg.IntOpt("sahara_cluster_delete_timeout",
default=900,
deprecated_group="benchmark",
help="A timeout in seconds for a cluster delete operation"),
cfg.IntOpt("sahara_cluster_check_interval",
default=5,
deprecated_group="benchmark",
help="Cluster status polling interval in seconds"),
cfg.IntOpt("sahara_job_execution_timeout",
default=600,
deprecated_group="benchmark",
help="A timeout in seconds for a Job Execution to complete"),
cfg.IntOpt("sahara_job_check_interval",
default=5,
deprecated_group="benchmark",
help="Job Execution status polling interval in seconds"),
cfg.IntOpt("sahara_workers_per_proxy",
default=20,
deprecated_group="benchmark",
help="Amount of workers one proxy should serve to.")
]}

View File

@@ -59,7 +59,6 @@ class _Service(utils.ImmutableMixin, utils.EnumMixin):
S3 = "s3"
SENLIN = "senlin"
TROVE = "trove"
SAHARA = "sahara"
SWIFT = "swift"
MISTRAL = "mistral"
IRONIC = "ironic"
@@ -91,8 +90,6 @@ class _ServiceType(utils.ImmutableMixin, utils.EnumMixin):
MONITORING = "monitoring"
S3 = "s3"
DATABASE = "database"
DATA_PROCESSING = "data-processing"
DATA_PROCESSING_MOS = "data_processing"
OBJECT_STORE = "object-store"
WORKFLOW_EXECUTION = "workflowv2"
BARE_METAL = "baremetal"
@@ -124,8 +121,6 @@ class _ServiceType(utils.ImmutableMixin, utils.EnumMixin):
self.MONITORING: _Service.MONASCA,
self.S3: _Service.S3,
self.DATABASE: _Service.TROVE,
self.DATA_PROCESSING: _Service.SAHARA,
self.DATA_PROCESSING_MOS: _Service.SAHARA,
self.OBJECT_STORE: _Service.SWIFT,
self.WORKFLOW_EXECUTION: _Service.MISTRAL,
self.BARE_METAL: _Service.IRONIC,

View File

@@ -610,36 +610,6 @@ class Ironic(OSClient):
return client
@configure("sahara", default_version="1.1", supported_versions=["1.0", "1.1"],
default_service_type="data-processing")
class Sahara(OSClient):
"""Wrapper for SaharaClient which returns an authenticated native client.
"""
# NOTE(andreykurilin): saharaclient supports "1.0" version and doesn't
# support "1". `choose_version` and `validate_version` methods are written
# as a hack to covert 1 -> 1.0, which can simplify setting saharaclient
# for end-users.
def choose_version(self, version=None):
return float(super(Sahara, self).choose_version(version))
@classmethod
def validate_version(cls, version):
super(Sahara, cls).validate_version(float(version))
def create_client(self, version=None, service_type=None):
"""Return Sahara client."""
from saharaclient import client as sahara
client = sahara.Client(
self.choose_version(version),
session=self.keystone.get_session()[0],
sahara_url=self._get_endpoint(service_type))
return client
@configure("zaqar", default_version="1.1", default_service_type="messaging",
supported_versions=["1", "1.1"])
class Zaqar(OSClient):

View File

@@ -639,72 +639,6 @@ class GlanceImage(base.ResourceManager):
check_interval=CONF.openstack.glance_image_delete_poll_interval)
# SAHARA
_sahara_order = get_order(600)
@base.resource("sahara", "job_executions", order=next(_sahara_order),
tenant_resource=True)
class SaharaJobExecution(SynchronizedDeletion, base.ResourceManager):
pass
@base.resource("sahara", "jobs", order=next(_sahara_order),
tenant_resource=True)
class SaharaJob(SynchronizedDeletion, base.ResourceManager):
pass
@base.resource("sahara", "job_binary_internals", order=next(_sahara_order),
tenant_resource=True)
class SaharaJobBinaryInternals(SynchronizedDeletion, base.ResourceManager):
pass
@base.resource("sahara", "job_binaries", order=next(_sahara_order),
tenant_resource=True)
class SaharaJobBinary(SynchronizedDeletion, base.ResourceManager):
pass
@base.resource("sahara", "data_sources", order=next(_sahara_order),
tenant_resource=True)
class SaharaDataSource(SynchronizedDeletion, base.ResourceManager):
pass
@base.resource("sahara", "clusters", order=next(_sahara_order),
tenant_resource=True)
class SaharaCluster(base.ResourceManager):
# Need special treatment for Sahara Cluster because of the way the
# exceptions are described in:
# https://github.com/openstack/python-saharaclient/blob/master/
# saharaclient/api/base.py#L145
def is_deleted(self):
from saharaclient.api import base as saharaclient_base
try:
self._manager().get(self.id())
return False
except saharaclient_base.APIException as e:
return e.error_code == 404
@base.resource("sahara", "cluster_templates", order=next(_sahara_order),
tenant_resource=True)
class SaharaClusterTemplate(SynchronizedDeletion, base.ResourceManager):
pass
@base.resource("sahara", "node_group_templates", order=next(_sahara_order),
tenant_resource=True)
class SaharaNodeGroup(SynchronizedDeletion, base.ResourceManager):
pass
# CEILOMETER
@base.resource("ceilometer", "alarms", order=700, tenant_resource=True)

View File

@@ -1,183 +0,0 @@
# Copyright 2014: Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from rally.common import cfg
from rally.common import validation
from rally import exceptions
from rally.task import utils as bench_utils
from rally_openstack.common import consts
from rally_openstack.task.cleanup import manager as resource_manager
from rally_openstack.task import context
from rally_openstack.task.scenarios.sahara import utils
CONF = cfg.CONF
@validation.add("required_platform", platform="openstack", users=True)
@context.configure(name="sahara_cluster", platform="openstack", order=441)
class SaharaCluster(context.OpenStackContext):
"""Context class for setting up the Cluster an EDP job."""
CONFIG_SCHEMA = {
"type": "object",
"$schema": consts.JSON_SCHEMA,
"properties": {
"plugin_name": {
"type": "string"
},
"hadoop_version": {
"type": "string",
},
"workers_count": {
"type": "integer",
"minimum": 1
},
"flavor_id": {
"type": "string",
},
"master_flavor_id": {
"type": "string",
},
"worker_flavor_id": {
"type": "string",
},
"floating_ip_pool": {
"type": "string",
},
"volumes_per_node": {
"type": "integer",
"minimum": 1
},
"volumes_size": {
"type": "integer",
"minimum": 1
},
"auto_security_group": {
"type": "boolean",
},
"security_groups": {
"type": "array",
"items": {
"type": "string"
}
},
"node_configs": {
"type": "object",
"additionalProperties": True
},
"cluster_configs": {
"type": "object",
"additionalProperties": True
},
"enable_anti_affinity": {
"type": "boolean"
},
"enable_proxy": {
"type": "boolean"
},
"use_autoconfig": {
"type": "boolean"
},
},
"additionalProperties": False,
"required": ["plugin_name", "hadoop_version", "workers_count",
"master_flavor_id", "worker_flavor_id"]
}
def setup(self):
utils.init_sahara_context(self)
self.context["sahara"]["clusters"] = {}
wait_dict = {}
for user, tenant_id in self._iterate_per_tenants():
image_id = self.context["tenants"][tenant_id]["sahara"]["image"]
floating_ip_pool = self.config.get("floating_ip_pool")
temporary_context = {
"user": user,
"tenant": self.context["tenants"][tenant_id],
"task": self.context["task"],
"owner_id": self.context["owner_id"]
}
scenario = utils.SaharaScenario(context=temporary_context)
cluster = scenario._launch_cluster(
plugin_name=self.config["plugin_name"],
hadoop_version=self.config["hadoop_version"],
flavor_id=self.config.get("flavor_id"),
master_flavor_id=self.config["master_flavor_id"],
worker_flavor_id=self.config["worker_flavor_id"],
workers_count=self.config["workers_count"],
image_id=image_id,
floating_ip_pool=floating_ip_pool,
volumes_per_node=self.config.get("volumes_per_node"),
volumes_size=self.config.get("volumes_size", 1),
auto_security_group=self.config.get("auto_security_group",
True),
security_groups=self.config.get("security_groups"),
node_configs=self.config.get("node_configs"),
cluster_configs=self.config.get("cluster_configs"),
enable_anti_affinity=self.config.get("enable_anti_affinity",
False),
enable_proxy=self.config.get("enable_proxy", False),
wait_active=False,
use_autoconfig=self.config.get("use_autoconfig", True)
)
self.context["tenants"][tenant_id]["sahara"]["cluster"] = (
cluster.id)
# Need to save the client instance to poll for active status
wait_dict[cluster] = scenario.clients("sahara")
bench_utils.wait_for(
resource=wait_dict,
update_resource=self.update_clusters_dict,
is_ready=self.all_clusters_active,
timeout=CONF.openstack.sahara_cluster_create_timeout,
check_interval=CONF.openstack.sahara_cluster_check_interval)
def update_clusters_dict(self, dct):
new_dct = {}
for cluster, client in dct.items():
new_cl = client.clusters.get(cluster.id)
new_dct[new_cl] = client
return new_dct
def all_clusters_active(self, dct):
for cluster, client in dct.items():
cluster_status = cluster.status.lower()
if cluster_status == "error":
msg = ("Sahara cluster %(name)s has failed to"
" %(action)s. Reason: '%(reason)s'"
% {"name": cluster.name, "action": "start",
"reason": cluster.status_description})
raise exceptions.ContextSetupFailure(ctx_name=self.get_name(),
msg=msg)
elif cluster_status != "active":
return False
return True
def cleanup(self):
resource_manager.cleanup(names=["sahara.clusters"],
users=self.context.get("users", []),
superclass=utils.SaharaScenario,
task_id=self.get_owner_id())

View File

@@ -1,126 +0,0 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from rally.common import validation
from rally import exceptions
from rally_openstack.common import consts
from rally_openstack.common import osclients
from rally_openstack.common.services.image import image as image_services
from rally_openstack.task.cleanup import manager as resource_manager
from rally_openstack.task import context
from rally_openstack.task.scenarios.sahara import utils
@validation.add("required_platform", platform="openstack", users=True)
@context.configure(name="sahara_image", platform="openstack", order=440)
class SaharaImage(context.OpenStackContext):
"""Context class for adding and tagging Sahara images."""
CONFIG_SCHEMA = {
"type": "object",
"$schema": consts.JSON_SCHEMA,
"properties": {
"image_uuid": {
"type": "string"
},
"image_url": {
"type": "string",
},
"username": {
"type": "string"
},
"plugin_name": {
"type": "string",
},
"hadoop_version": {
"type": "string",
}
},
"oneOf": [
{"description": "Create an image.",
"required": ["image_url", "username", "plugin_name",
"hadoop_version"]},
{"description": "Use an existing image.",
"required": ["image_uuid"]}
],
"additionalProperties": False
}
def _create_image(self, hadoop_version, image_url, plugin_name, user,
user_name):
clients = osclients.Clients(user["credential"])
image_service = image_services.Image(
clients, name_generator=self.generate_random_name)
image = image_service.create_image(container_format="bare",
image_location=image_url,
disk_format="qcow2")
clients.sahara().images.update_image(
image_id=image.id, user_name=user_name, desc="")
clients.sahara().images.update_tags(
image_id=image.id, new_tags=[plugin_name, hadoop_version])
return image.id
def setup(self):
utils.init_sahara_context(self)
self.context["sahara"]["images"] = {}
# The user may want to use the existing image. In this case he should
# make sure that the image is public and has all required metadata.
image_uuid = self.config.get("image_uuid")
self.context["sahara"]["need_image_cleanup"] = not image_uuid
if image_uuid:
# Using the first user to check the existing image.
user = self.context["users"][0]
clients = osclients.Clients(user["credential"])
image = clients.glance().images.get(image_uuid)
visibility = None
if hasattr(image, "is_public"):
visibility = "public" if image.is_public else "private"
else:
visibility = image["visibility"]
if visibility != "public":
raise exceptions.ContextSetupFailure(
ctx_name=self.get_name(),
msg="Use only public image for sahara_image context"
)
image_id = image_uuid
for user, tenant_id in self._iterate_per_tenants():
self.context["tenants"][tenant_id]["sahara"]["image"] = (
image_id)
else:
for user, tenant_id in self._iterate_per_tenants():
image_id = self._create_image(
hadoop_version=self.config["hadoop_version"],
image_url=self.config["image_url"],
plugin_name=self.config["plugin_name"],
user=user,
user_name=self.config["username"])
self.context["tenants"][tenant_id]["sahara"]["image"] = (
image_id)
def cleanup(self):
if self.context["sahara"]["need_image_cleanup"]:
resource_manager.cleanup(names=["glance.images"],
users=self.context.get("users", []),
superclass=self.__class__,
task_id=self.get_owner_id())

View File

@@ -1,127 +0,0 @@
# Copyright 2015: Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from urllib.parse import urlparse
import requests
from rally.common import validation
from rally_openstack.common import consts
from rally_openstack.common import osclients
from rally_openstack.task.cleanup import manager as resource_manager
from rally_openstack.task import context
from rally_openstack.task.scenarios.sahara import utils
from rally_openstack.task.scenarios.swift import utils as swift_utils
@validation.add("required_platform", platform="openstack", users=True)
@context.configure(name="sahara_input_data_sources", platform="openstack",
order=443)
class SaharaInputDataSources(context.OpenStackContext):
"""Context class for setting up Input Data Sources for an EDP job."""
CONFIG_SCHEMA = {
"type": "object",
"$schema": consts.JSON_SCHEMA,
"properties": {
"input_type": {
"enum": ["swift", "hdfs"],
},
"input_url": {
"type": "string",
},
"swift_files": {
"type": "array",
"items": {
"type": "object",
"properties": {
"name": {
"type": "string"
},
"download_url": {
"type": "string"
}
},
"additionalProperties": False,
"required": ["name", "download_url"]
}
}
},
"additionalProperties": False,
"required": ["input_type", "input_url"]
}
def setup(self):
utils.init_sahara_context(self)
self.context["sahara"]["swift_objects"] = []
self.context["sahara"]["container_name"] = None
for user, tenant_id in self._iterate_per_tenants():
clients = osclients.Clients(user["credential"])
if self.config["input_type"] == "swift":
self.setup_inputs_swift(clients, tenant_id,
self.config["input_url"],
self.config["swift_files"],
user["credential"].username,
user["credential"].password)
else:
self.setup_inputs(clients, tenant_id,
self.config["input_type"],
self.config["input_url"])
def setup_inputs(self, clients, tenant_id, input_type, input_url):
input_ds = clients.sahara().data_sources.create(
name=self.generate_random_name(),
description="",
data_source_type=input_type,
url=input_url)
self.context["tenants"][tenant_id]["sahara"]["input"] = input_ds.id
def setup_inputs_swift(self, clients, tenant_id, input_url,
swift_files, username, password):
swift_scenario = swift_utils.SwiftScenario(clients=clients,
context=self.context)
# TODO(astudenov): use self.generate_random_name()
container_name = "rally_" + urlparse(input_url).netloc.rstrip(
".sahara")
self.context["sahara"]["container_name"] = (
swift_scenario._create_container(container_name=container_name))
for swift_file in swift_files:
content = requests.get(swift_file["download_url"]).content
self.context["sahara"]["swift_objects"].append(
swift_scenario._upload_object(
self.context["sahara"]["container_name"], content,
object_name=swift_file["name"]))
input_ds_swift = clients.sahara().data_sources.create(
name=self.generate_random_name(), description="",
data_source_type="swift", url=input_url,
credential_user=username, credential_pass=password)
self.context["tenants"][tenant_id]["sahara"]["input"] = (
input_ds_swift.id)
def cleanup(self):
resource_manager.cleanup(
names=["swift.object", "swift.container"],
users=self.context.get("users", []),
superclass=swift_utils.SwiftScenario,
task_id=self.get_owner_id())
resource_manager.cleanup(
names=["sahara.data_sources"],
users=self.context.get("users", []),
superclass=self.__class__,
task_id=self.get_owner_id())

View File

@@ -1,144 +0,0 @@
# Copyright 2014: Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import requests
from rally.common import validation
from rally import exceptions
from rally_openstack.common import consts
from rally_openstack.common import osclients
from rally_openstack.task.cleanup import manager as resource_manager
from rally_openstack.task import context
from rally_openstack.task.scenarios.sahara import utils
@validation.add("required_platform", platform="openstack", users=True)
@context.configure(name="sahara_job_binaries", platform="openstack", order=442)
class SaharaJobBinaries(context.OpenStackContext):
"""Context class for setting up Job Binaries for an EDP job."""
CONFIG_SCHEMA = {
"type": "object",
"$schema": consts.JSON_SCHEMA,
"properties": {
"mains": {
"type": "array",
"items": {
"type": "object",
"properties": {
"name": {
"type": "string"
},
"download_url": {
"type": "string"
}
},
"additionalProperties": False,
"required": ["name", "download_url"]
}
},
"libs": {
"type": "array",
"items": {
"type": "object",
"properties": {
"name": {
"type": "string"
},
"download_url": {
"type": "string"
}
},
"additionalProperties": False,
"required": ["name", "download_url"]
}
}
},
"additionalProperties": False
}
# This cache will hold the downloaded libs content to prevent repeated
# downloads for each tenant
lib_cache = {}
def setup(self):
utils.init_sahara_context(self)
for user, tenant_id in self._iterate_per_tenants():
clients = osclients.Clients(user["credential"])
sahara = clients.sahara()
self.context["tenants"][tenant_id]["sahara"]["mains"] = []
self.context["tenants"][tenant_id]["sahara"]["libs"] = []
for main in self.config.get("mains", []):
self.download_and_save_lib(
sahara=sahara,
lib_type="mains",
name=main["name"],
download_url=main["download_url"],
tenant_id=tenant_id)
for lib in self.config.get("libs", []):
self.download_and_save_lib(
sahara=sahara,
lib_type="libs",
name=lib["name"],
download_url=lib["download_url"],
tenant_id=tenant_id)
def setup_inputs(self, sahara, tenant_id, input_type, input_url):
if input_type == "swift":
raise exceptions.RallyException(
"Swift Data Sources are not implemented yet")
# Todo(nkonovalov): Add swift credentials parameters and data upload
input_ds = sahara.data_sources.create(
name=self.generate_random_name(),
description="",
data_source_type=input_type,
url=input_url)
self.context["tenants"][tenant_id]["sahara"]["input"] = input_ds.id
def download_and_save_lib(self, sahara, lib_type, name, download_url,
tenant_id):
if download_url not in self.lib_cache:
lib_data = requests.get(download_url).content
self.lib_cache[download_url] = lib_data
else:
lib_data = self.lib_cache[download_url]
job_binary_internal = sahara.job_binary_internals.create(
name=name,
data=lib_data)
url = "internal-db://%s" % job_binary_internal.id
job_binary = sahara.job_binaries.create(name=name,
url=url,
description="",
extra={})
self.context["tenants"][tenant_id]["sahara"][lib_type].append(
job_binary.id)
def cleanup(self):
resources = ["job_binary_internals", "job_binaries"]
resource_manager.cleanup(
names=["sahara.%s" % res for res in resources],
users=self.context.get("users", []),
superclass=utils.SaharaScenario,
task_id=self.context["task"]["uuid"])

View File

@@ -1,104 +0,0 @@
# Copyright 2015: Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from rally.common import validation
from rally_openstack.common import consts
from rally_openstack.common import osclients
from rally_openstack.task.cleanup import manager as resource_manager
from rally_openstack.task import context
from rally_openstack.task.scenarios.sahara import utils
from rally_openstack.task.scenarios.swift import utils as swift_utils
@validation.add("required_platform", platform="openstack", users=True)
@context.configure(name="sahara_output_data_sources", platform="openstack",
order=444)
class SaharaOutputDataSources(context.OpenStackContext):
"""Context class for setting up Output Data Sources for an EDP job."""
CONFIG_SCHEMA = {
"type": "object",
"$schema": consts.JSON_SCHEMA,
"properties": {
"output_type": {
"enum": ["swift", "hdfs"],
},
"output_url_prefix": {
"type": "string",
}
},
"additionalProperties": False,
"required": ["output_type", "output_url_prefix"]
}
def setup(self):
utils.init_sahara_context(self)
for user, tenant_id in self._iterate_per_tenants():
clients = osclients.Clients(user["credential"])
sahara = clients.sahara()
if self.config["output_type"] == "swift":
swift = swift_utils.SwiftScenario(clients=clients,
context=self.context)
container_name = self.generate_random_name()
self.context["tenants"][tenant_id]["sahara"]["container"] = {
"name": swift._create_container(
container_name=container_name),
"output_swift_objects": []
}
self.setup_outputs_swift(swift, sahara, tenant_id,
container_name,
user["credential"].username,
user["credential"].password)
else:
self.setup_outputs_hdfs(sahara, tenant_id,
self.config["output_url_prefix"])
def setup_outputs_hdfs(self, sahara, tenant_id, output_url):
output_ds = sahara.data_sources.create(
name=self.generate_random_name(),
description="",
data_source_type="hdfs",
url=output_url)
self.context["tenants"][tenant_id]["sahara"]["output"] = output_ds.id
def setup_outputs_swift(self, swift, sahara, tenant_id, container_name,
username, password):
output_ds_swift = sahara.data_sources.create(
name=self.generate_random_name(),
description="",
data_source_type="swift",
url="swift://" + container_name + ".sahara/",
credential_user=username,
credential_pass=password)
self.context["tenants"][tenant_id]["sahara"]["output"] = (
output_ds_swift.id
)
def cleanup(self):
resource_manager.cleanup(
names=["swift.object", "swift.container"],
users=self.context.get("users", []),
superclass=self.__class__,
task_id=self.get_owner_id())
resource_manager.cleanup(
names=["sahara.data_sources"],
users=self.context.get("users", []),
superclass=self.__class__,
task_id=self.get_owner_id())

View File

@@ -1,233 +0,0 @@
# Copyright 2014: Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from rally.common import logging
from rally.task import types
from rally.task import validation
from rally_openstack.common import consts
from rally_openstack.task import scenario
from rally_openstack.task.scenarios.sahara import utils
LOG = logging.getLogger(__name__)
"""Scenarios for Sahara clusters."""
@types.convert(flavor={"type": "nova_flavor"},
master_flavor={"type": "nova_flavor"},
worker_flavor={"type": "nova_flavor"},
neutron_net={"type": "neutron_network"},
floating_ip_pool={"type": "neutron_network"})
@validation.add("flavor_exists", param_name="master_flavor")
@validation.add("flavor_exists", param_name="worker_flavor")
@validation.add("required_contexts", contexts=["users", "sahara_image"])
@validation.add("number", param_name="workers_count", minval=1,
integer_only=True)
@validation.add("required_services", services=[consts.Service.SAHARA])
@validation.add("required_platform", platform="openstack", users=True)
@scenario.configure(context={"cleanup@openstack": ["sahara"]},
name="SaharaClusters.create_and_delete_cluster",
platform="openstack")
class CreateAndDeleteCluster(utils.SaharaScenario):
def run(self, workers_count, plugin_name, hadoop_version,
master_flavor=None, worker_flavor=None, flavor=None,
floating_ip_pool=None, volumes_per_node=None,
volumes_size=None, auto_security_group=None,
security_groups=None, node_configs=None,
cluster_configs=None, enable_anti_affinity=False,
enable_proxy=False, use_autoconfig=True):
"""Launch and delete a Sahara Cluster.
This scenario launches a Hadoop cluster, waits until it becomes
'Active' and deletes it.
:param flavor: Nova flavor that will be for nodes in the
created node groups. Deprecated.
:param master_flavor: Nova flavor that will be used for the master
instance of the cluster
:param worker_flavor: Nova flavor that will be used for the workers of
the cluster
:param workers_count: number of worker instances in a cluster
:param plugin_name: name of a provisioning plugin
:param hadoop_version: version of Hadoop distribution supported by
the specified plugin.
:param floating_ip_pool: floating ip pool name from which Floating
IPs will be allocated. Sahara will determine
automatically how to treat this depending on
its own configurations. Defaults to None
because in some cases Sahara may work w/o
Floating IPs.
:param volumes_per_node: number of Cinder volumes that will be
attached to every cluster node
:param volumes_size: size of each Cinder volume in GB
:param auto_security_group: boolean value. If set to True Sahara will
create a Security Group for each Node Group
in the Cluster automatically.
:param security_groups: list of security groups that will be used
while creating VMs. If auto_security_group
is set to True, this list can be left empty.
:param node_configs: config dict that will be passed to each Node
Group
:param cluster_configs: config dict that will be passed to the
Cluster
:param enable_anti_affinity: If set to true the vms will be scheduled
one per compute node.
:param enable_proxy: Use Master Node of a Cluster as a Proxy node and
do not assign floating ips to workers.
:param use_autoconfig: If True, instances of the node group will be
automatically configured during cluster
creation. If False, the configuration values
should be specify manually
"""
image_id = self.context["tenant"]["sahara"]["image"]
LOG.debug("Using Image: %s" % image_id)
cluster = self._launch_cluster(
flavor_id=flavor,
master_flavor_id=master_flavor,
worker_flavor_id=worker_flavor,
image_id=image_id,
workers_count=workers_count,
plugin_name=plugin_name,
hadoop_version=hadoop_version,
floating_ip_pool=floating_ip_pool,
volumes_per_node=volumes_per_node,
volumes_size=volumes_size,
auto_security_group=auto_security_group,
security_groups=security_groups,
node_configs=node_configs,
cluster_configs=cluster_configs,
enable_anti_affinity=enable_anti_affinity,
enable_proxy=enable_proxy,
use_autoconfig=use_autoconfig)
self._delete_cluster(cluster)
@types.convert(flavor={"type": "nova_flavor"},
master_flavor={"type": "nova_flavor"},
worker_flavor={"type": "nova_flavor"})
@validation.add("flavor_exists", param_name="master_flavor")
@validation.add("flavor_exists", param_name="worker_flavor")
@validation.add("required_services", services=[consts.Service.SAHARA])
@validation.add("required_contexts", contexts=["users", "sahara_image"])
@validation.add("number", param_name="workers_count", minval=1,
integer_only=True)
@scenario.configure(context={"cleanup@openstack": ["sahara"]},
name="SaharaClusters.create_scale_delete_cluster",
platform="openstack")
class CreateScaleDeleteCluster(utils.SaharaScenario):
def run(self, master_flavor, worker_flavor, workers_count,
plugin_name, hadoop_version, deltas, flavor=None,
floating_ip_pool=None, volumes_per_node=None,
volumes_size=None, auto_security_group=None,
security_groups=None, node_configs=None,
cluster_configs=None, enable_anti_affinity=False,
enable_proxy=False, use_autoconfig=True):
"""Launch, scale and delete a Sahara Cluster.
This scenario launches a Hadoop cluster, waits until it becomes
'Active'. Then a series of scale operations is applied. The scaling
happens according to numbers listed in :param deltas. Ex. if
deltas is set to [2, -2] it means that the first scaling operation will
add 2 worker nodes to the cluster and the second will remove two.
:param flavor: Nova flavor that will be for nodes in the
created node groups. Deprecated.
:param master_flavor: Nova flavor that will be used for the master
instance of the cluster
:param worker_flavor: Nova flavor that will be used for the workers of
the cluster
:param workers_count: number of worker instances in a cluster
:param plugin_name: name of a provisioning plugin
:param hadoop_version: version of Hadoop distribution supported by
the specified plugin.
:param deltas: list of integers which will be used to add or
remove worker nodes from the cluster
:param floating_ip_pool: floating ip pool name from which Floating
IPs will be allocated. Sahara will determine
automatically how to treat this depending on
its own configurations. Defaults to None
because in some cases Sahara may work w/o
Floating IPs.
:param neutron_net_id: id of a Neutron network that will be used
for fixed IPs. This parameter is ignored when
Nova Network is set up.
:param volumes_per_node: number of Cinder volumes that will be
attached to every cluster node
:param volumes_size: size of each Cinder volume in GB
:param auto_security_group: boolean value. If set to True Sahara will
create a Security Group for each Node Group
in the Cluster automatically.
:param security_groups: list of security groups that will be used
while creating VMs. If auto_security_group
is set to True this list can be left empty.
:param node_configs: configs dict that will be passed to each Node
Group
:param cluster_configs: configs dict that will be passed to the
Cluster
:param enable_anti_affinity: If set to true the vms will be scheduled
one per compute node.
:param enable_proxy: Use Master Node of a Cluster as a Proxy node and
do not assign floating ips to workers.
:param use_autoconfig: If True, instances of the node group will be
automatically configured during cluster
creation. If False, the configuration values
should be specify manually
"""
image_id = self.context["tenant"]["sahara"]["image"]
LOG.debug("Using Image: %s" % image_id)
cluster = self._launch_cluster(
flavor_id=flavor,
master_flavor_id=master_flavor,
worker_flavor_id=worker_flavor,
image_id=image_id,
workers_count=workers_count,
plugin_name=plugin_name,
hadoop_version=hadoop_version,
floating_ip_pool=floating_ip_pool,
volumes_per_node=volumes_per_node,
volumes_size=volumes_size,
auto_security_group=auto_security_group,
security_groups=security_groups,
node_configs=node_configs,
cluster_configs=cluster_configs,
enable_anti_affinity=enable_anti_affinity,
enable_proxy=enable_proxy,
use_autoconfig=use_autoconfig)
for delta in deltas:
# The Cluster is fetched every time so that its node groups have
# correct 'count' values.
cluster = self.clients("sahara").clusters.get(cluster.id)
if delta == 0:
# Zero scaling makes no sense.
continue
elif delta > 0:
self._scale_cluster_up(cluster, delta)
elif delta < 0:
self._scale_cluster_down(cluster, delta)
self._delete_cluster(cluster)

View File

@@ -1,249 +0,0 @@
# Copyright 2015: Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
NODE_PROCESSES = {
"vanilla": {
"1.2.1": {
"master": ["namenode", "jobtracker", "oozie"],
"worker": ["datanode", "tasktracker"]
},
"2.3.0": {
"master": ["namenode", "resourcemanager", "historyserver",
"oozie"],
"worker": ["datanode", "nodemanager"]
},
"2.4.1": {
"master": ["namenode", "resourcemanager", "historyserver",
"oozie"],
"worker": ["datanode", "nodemanager"]
},
"2.6.0": {
"master": ["namenode", "resourcemanager", "historyserver",
"oozie"],
"worker": ["datanode", "nodemanager"]
},
"2.7.1": {
"master": ["namenode", "resourcemanager", "historyserver",
"oozie"],
"worker": ["datanode", "nodemanager"]
}
},
"hdp": {
"1.3.2": {
"master": ["JOBTRACKER", "NAMENODE", "SECONDARY_NAMENODE",
"GANGLIA_SERVER", "NAGIOS_SERVER",
"AMBARI_SERVER", "OOZIE_SERVER"],
"worker": ["TASKTRACKER", "DATANODE", "HDFS_CLIENT",
"MAPREDUCE_CLIENT", "OOZIE_CLIENT", "PIG"]
},
"2.0.6": {
"manager": ["AMBARI_SERVER", "GANGLIA_SERVER",
"NAGIOS_SERVER"],
"master": ["NAMENODE", "SECONDARY_NAMENODE",
"ZOOKEEPER_SERVER", "ZOOKEEPER_CLIENT",
"HISTORYSERVER", "RESOURCEMANAGER",
"OOZIE_SERVER"],
"worker": ["DATANODE", "HDFS_CLIENT", "ZOOKEEPER_CLIENT",
"PIG", "MAPREDUCE2_CLIENT", "YARN_CLIENT",
"NODEMANAGER", "OOZIE_CLIENT"]
},
"2.2": {
"manager": ["AMBARI_SERVER", "GANGLIA_SERVER",
"NAGIOS_SERVER"],
"master": ["NAMENODE", "SECONDARY_NAMENODE",
"ZOOKEEPER_SERVER", "ZOOKEEPER_CLIENT",
"HISTORYSERVER", "RESOURCEMANAGER",
"OOZIE_SERVER"],
"worker": ["DATANODE", "HDFS_CLIENT", "ZOOKEEPER_CLIENT",
"PIG", "MAPREDUCE2_CLIENT", "YARN_CLIENT",
"NODEMANAGER", "OOZIE_CLIENT", "TEZ_CLIENT"]
}
},
"cdh": {
"5": {
"manager": ["CLOUDERA_MANAGER"],
"master": ["HDFS_NAMENODE", "YARN_RESOURCEMANAGER",
"OOZIE_SERVER", "YARN_JOBHISTORY",
"HDFS_SECONDARYNAMENODE", "HIVE_METASTORE",
"HIVE_SERVER2"],
"worker": ["YARN_NODEMANAGER", "HDFS_DATANODE"]
},
"5.4.0": {
"manager": ["CLOUDERA_MANAGER"],
"master": ["HDFS_NAMENODE", "YARN_RESOURCEMANAGER",
"OOZIE_SERVER", "YARN_JOBHISTORY",
"HDFS_SECONDARYNAMENODE", "HIVE_METASTORE",
"HIVE_SERVER2"],
"worker": ["YARN_NODEMANAGER", "HDFS_DATANODE"]
},
"5.5.0": {
"manager": ["CLOUDERA_MANAGER"],
"master": ["HDFS_NAMENODE", "YARN_RESOURCEMANAGER",
"OOZIE_SERVER", "YARN_JOBHISTORY",
"HDFS_SECONDARYNAMENODE", "HIVE_METASTORE",
"HIVE_SERVER2"],
"worker": ["YARN_NODEMANAGER", "HDFS_DATANODE"]
}
},
"spark": {
"1.3.1": {
"master": ["namenode", "master"],
"worker": ["datanode", "slave"]
},
"1.6.0": {
"master": ["namenode", "master"],
"worker": ["datanode", "slave"]
}
},
"ambari": {
"2.3": {
"master-edp": ["Hive Metastore", "HiveServer", "Oozie"],
"master": ["Ambari", "MapReduce History Server",
"Spark History Server", "NameNode", "ResourceManager",
"SecondaryNameNode", "YARN Timeline Server",
"ZooKeeper"],
"worker": ["DataNode", "NodeManager"]
}
},
"mapr": {
"5.0.0.mrv2": {
"master": ["Metrics", "Webserver", "Zookeeper", "HTTPFS",
"Oozie", "FileServer", "CLDB", "Flume", "Hue",
"NodeManager", "HistoryServer", "ResourseManager",
"HiveServer2", "HiveMetastore", "Sqoop2-Client",
"Sqoop2-Server"],
"worker": ["NodeManager", "FileServer"]
},
"5.1.0.mrv2": {
"master": ["Metrics", "Webserver", "Zookeeper", "HTTPFS",
"Oozie", "FileServer", "CLDB", "Flume", "Hue",
"NodeManager", "HistoryServer", "ResourseManager",
"HiveServer2", "HiveMetastore", "Sqoop2-Client",
"Sqoop2-Server"],
"worker": ["NodeManager", "FileServer"]
}
}
}
REPLICATION_CONFIGS = {
"vanilla": {
"1.2.1": {
"target": "HDFS",
"config_name": "dfs.replication"
},
"2.3.0": {
"target": "HDFS",
"config_name": "dfs.replication"
},
"2.4.1": {
"target": "HDFS",
"config_name": "dfs.replication"
},
"2.6.0": {
"target": "HDFS",
"config_name": "dfs.replication"
},
"2.7.1": {
"target": "HDFS",
"config_name": "dfs.replication"
}
},
"hdp": {
"1.3.2": {
"target": "HDFS",
"config_name": "dfs.replication"
},
"2.0.6": {
"target": "HDFS",
"config_name": "dfs.replication"
},
"2.2": {
"target": "HDFS",
"config_name": "dfs.replication"
}
},
"cdh": {
"5": {
"target": "HDFS",
"config_name": "dfs_replication"
},
"5.4.0": {
"target": "HDFS",
"config_name": "dfs_replication"
},
"5.5.0": {
"target": "HDFS",
"config_name": "dfs_replication"
}
},
"spark": {
"1.3.1": {
"target": "HDFS",
"config_name": "dfs_replication"
},
"1.6.0": {
"target": "HDFS",
"config_name": "dfs_replication"
}
},
"ambari": {
"2.3": {
"target": "HDFS",
"config_name": "dfs_replication"
}
},
"mapr": {
"5.0.0.mrv2": {
"target": "HDFS",
"config_name": "dfs.replication"
},
"5.1.0.mrv2": {
"target": "HDFS",
"config_name": "dfs.replication"
}
}
}
ANTI_AFFINITY_PROCESSES = {
"vanilla": {
"1.2.1": ["datanode"],
"2.3.0": ["datanode"],
"2.4.1": ["datanode"],
"2.6.0": ["datanode"],
"2.7.1": ["datanode"]
},
"hdp": {
"1.3.2": ["DATANODE"],
"2.0.6": ["DATANODE"],
"2.2": ["DATANODE"]
},
"cdh": {
"5": ["HDFS_DATANODE"],
"5.4.0": ["HDFS_DATANODE"],
"5.5.0": ["HDFS_DATANODE"]
},
"spark": {
"1.3.1": ["datanode"],
"1.6.0": ["datanode"]
},
"ambari": {
"2.3": ["DataNode"],
},
"mapr": {
"5.0.0.mrv2": ["FileServer"],
"5.1.0.mrv2": ["FileServer"],
}
}

View File

@@ -1,143 +0,0 @@
# Copyright 2014: Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from rally.common import logging
from rally.task import validation
from rally_openstack.common import consts
from rally_openstack.task import scenario
from rally_openstack.task.scenarios.sahara import utils
LOG = logging.getLogger(__name__)
@validation.add("required_services", services=[consts.Service.SAHARA])
@validation.add("required_contexts", contexts=["users", "sahara_image",
"sahara_job_binaries",
"sahara_cluster"])
@scenario.configure(context={"cleanup@openstack": ["sahara"]},
name="SaharaJob.create_launch_job",
platform="openstack")
class CreateLaunchJob(utils.SaharaScenario):
def run(self, job_type, configs, job_idx=0):
"""Create and execute a Sahara EDP Job.
This scenario Creates a Job entity and launches an execution on a
Cluster.
:param job_type: type of the Data Processing Job
:param configs: config dict that will be passed to a Job Execution
:param job_idx: index of a job in a sequence. This index will be
used to create different atomic actions for each job
in a sequence
"""
mains = self.context["tenant"]["sahara"]["mains"]
libs = self.context["tenant"]["sahara"]["libs"]
name = self.generate_random_name()
job = self.clients("sahara").jobs.create(name=name,
type=job_type,
description="",
mains=mains,
libs=libs)
cluster_id = self.context["tenant"]["sahara"]["cluster"]
if job_type.lower() == "java":
input_id = None
output_id = None
else:
input_id = self.context["tenant"]["sahara"]["input"]
output_id = self._create_output_ds().id
self._run_job_execution(job_id=job.id,
cluster_id=cluster_id,
input_id=input_id,
output_id=output_id,
configs=configs,
job_idx=job_idx)
@validation.add("required_services", services=[consts.Service.SAHARA])
@validation.add("required_contexts", contexts=["users", "sahara_image",
"sahara_job_binaries",
"sahara_cluster"])
@scenario.configure(context={"cleanup@openstack": ["sahara"]},
name="SaharaJob.create_launch_job_sequence",
platform="openstack")
class CreateLaunchJobSequence(utils.SaharaScenario):
def run(self, jobs):
"""Create and execute a sequence of the Sahara EDP Jobs.
This scenario Creates a Job entity and launches an execution on a
Cluster for every job object provided.
:param jobs: list of jobs that should be executed in one context
"""
launch_job = CreateLaunchJob(self.context)
for idx, job in enumerate(jobs):
LOG.debug("Launching Job. Sequence #%d" % idx)
launch_job.run(job["job_type"], job["configs"], idx)
@validation.add("required_services", services=[consts.Service.SAHARA])
@validation.add("required_contexts", contexts=["users", "sahara_image",
"sahara_job_binaries",
"sahara_cluster"])
@scenario.configure(context={"cleanup@openstack": ["sahara"]},
name="SaharaJob.create_launch_job_sequence_with_scaling",
platform="openstack")
class CreateLaunchJobSequenceWithScaling(utils.SaharaScenario,):
def run(self, jobs, deltas):
"""Create and execute Sahara EDP Jobs on a scaling Cluster.
This scenario Creates a Job entity and launches an execution on a
Cluster for every job object provided. The Cluster is scaled according
to the deltas values and the sequence is launched again.
:param jobs: list of jobs that should be executed in one context
:param deltas: list of integers which will be used to add or
remove worker nodes from the cluster
"""
cluster_id = self.context["tenant"]["sahara"]["cluster"]
launch_job_sequence = CreateLaunchJobSequence(self.context)
launch_job_sequence.run(jobs)
for delta in deltas:
# The Cluster is fetched every time so that its node groups have
# correct 'count' values.
cluster = self.clients("sahara").clusters.get(cluster_id)
LOG.debug("Scaling cluster %s with delta %d"
% (cluster.name, delta))
if delta == 0:
# Zero scaling makes no sense.
continue
elif delta > 0:
self._scale_cluster_up(cluster, delta)
elif delta < 0:
self._scale_cluster_down(cluster, delta)
LOG.debug("Starting Job sequence")
launch_job_sequence.run(jobs)

View File

@@ -1,116 +0,0 @@
# Copyright 2014: Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from rally.task import types
from rally.task import validation
from rally_openstack.common import consts
from rally_openstack.task import scenario
from rally_openstack.task.scenarios.sahara import utils
"""Scenarios for Sahara node group templates."""
@types.convert(flavor={"type": "nova_flavor"})
@validation.add("flavor_exists", param_name="flavor")
@validation.add("required_services", services=[consts.Service.SAHARA])
@validation.add("required_platform", platform="openstack", users=True)
@scenario.configure(
context={"cleanup@openstack": ["sahara"]},
name="SaharaNodeGroupTemplates.create_and_list_node_group_templates",
platform="openstack")
class CreateAndListNodeGroupTemplates(utils.SaharaScenario):
def run(self, flavor, plugin_name="vanilla",
hadoop_version="1.2.1", use_autoconfig=True):
"""Create and list Sahara Node Group Templates.
This scenario creates two Node Group Templates with different set of
node processes. The master Node Group Template contains Hadoop's
management processes. The worker Node Group Template contains
Hadoop's worker processes.
By default the templates are created for the vanilla Hadoop
provisioning plugin using the version 1.2.1
After the templates are created the list operation is called.
:param flavor: Nova flavor that will be for nodes in the
created node groups
:param plugin_name: name of a provisioning plugin
:param hadoop_version: version of Hadoop distribution supported by
the specified plugin.
:param use_autoconfig: If True, instances of the node group will be
automatically configured during cluster
creation. If False, the configuration values
should be specify manually
"""
self._create_master_node_group_template(flavor_id=flavor,
plugin_name=plugin_name,
hadoop_version=hadoop_version,
use_autoconfig=use_autoconfig)
self._create_worker_node_group_template(flavor_id=flavor,
plugin_name=plugin_name,
hadoop_version=hadoop_version,
use_autoconfig=use_autoconfig)
self._list_node_group_templates()
@types.convert(flavor={"type": "nova_flavor"})
@validation.add("flavor_exists", param_name="flavor")
@validation.add("required_services", services=[consts.Service.SAHARA])
@validation.add("required_platform", platform="openstack", users=True)
@scenario.configure(
context={"cleanup@openstack": ["sahara"]},
name="SaharaNodeGroupTemplates.create_delete_node_group_templates",
platform="openstack")
class CreateDeleteNodeGroupTemplates(utils.SaharaScenario):
def run(self, flavor, plugin_name="vanilla",
hadoop_version="1.2.1", use_autoconfig=True):
"""Create and delete Sahara Node Group Templates.
This scenario creates and deletes two most common types of
Node Group Templates.
By default the templates are created for the vanilla Hadoop
provisioning plugin using the version 1.2.1
:param flavor: Nova flavor that will be for nodes in the
created node groups
:param plugin_name: name of a provisioning plugin
:param hadoop_version: version of Hadoop distribution supported by
the specified plugin.
:param use_autoconfig: If True, instances of the node group will be
automatically configured during cluster
creation. If False, the configuration values
should be specify manually
"""
master_ngt = self._create_master_node_group_template(
flavor_id=flavor,
plugin_name=plugin_name,
hadoop_version=hadoop_version,
use_autoconfig=use_autoconfig)
worker_ngt = self._create_worker_node_group_template(
flavor_id=flavor,
plugin_name=plugin_name,
hadoop_version=hadoop_version,
use_autoconfig=use_autoconfig)
self._delete_node_group_template(master_ngt)
self._delete_node_group_template(worker_ngt)

View File

@@ -1,589 +0,0 @@
# Copyright 2014: Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import random
from oslo_utils import uuidutils
from rally.common import cfg
from rally.common import logging
from rally.common import utils as rutils
from rally import exceptions
from rally.task import atomic
from rally.task import utils
from rally_openstack.common import consts
from rally_openstack.task import scenario
from rally_openstack.task.scenarios.sahara import consts as sahara_consts
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
class SaharaScenario(scenario.OpenStackScenario):
"""Base class for Sahara scenarios with basic atomic actions."""
# NOTE(sskripnick): Some sahara resource names are validated as hostnames.
# Since underscores are not allowed in hostnames we should not use them.
RESOURCE_NAME_FORMAT = "rally-sahara-XXXXXX-XXXXXXXXXXXXXXXX"
@atomic.action_timer("sahara.list_node_group_templates")
def _list_node_group_templates(self):
"""Return user Node Group Templates list."""
return self.clients("sahara").node_group_templates.list()
@atomic.action_timer("sahara.create_master_node_group_template")
def _create_master_node_group_template(self, flavor_id, plugin_name,
hadoop_version,
use_autoconfig=True):
"""Create a master Node Group Template with a random name.
:param flavor_id: The required argument for the Template
:param plugin_name: Sahara provisioning plugin name
:param hadoop_version: The version of Hadoop distribution supported by
the plugin
:param use_autoconfig: If True, instances of the node group will be
automatically configured during cluster
creation. If False, the configuration values
should be specify manually
:returns: The created Template
"""
name = self.generate_random_name()
return self.clients("sahara").node_group_templates.create(
name=name,
plugin_name=plugin_name,
hadoop_version=hadoop_version,
flavor_id=flavor_id,
node_processes=sahara_consts.NODE_PROCESSES[plugin_name]
[hadoop_version]["master"],
use_autoconfig=use_autoconfig)
@atomic.action_timer("sahara.create_worker_node_group_template")
def _create_worker_node_group_template(self, flavor_id, plugin_name,
hadoop_version, use_autoconfig):
"""Create a worker Node Group Template with a random name.
:param flavor_id: The required argument for the Template
:param plugin_name: Sahara provisioning plugin name
:param hadoop_version: The version of Hadoop distribution supported by
the plugin
:param use_autoconfig: If True, instances of the node group will be
automatically configured during cluster
creation. If False, the configuration values
should be specify manually
:returns: The created Template
"""
name = self.generate_random_name()
return self.clients("sahara").node_group_templates.create(
name=name,
plugin_name=plugin_name,
hadoop_version=hadoop_version,
flavor_id=flavor_id,
node_processes=sahara_consts.NODE_PROCESSES[plugin_name]
[hadoop_version]["worker"],
use_autoconfig=use_autoconfig)
@atomic.action_timer("sahara.delete_node_group_template")
def _delete_node_group_template(self, node_group):
"""Delete a Node Group Template by id.
:param node_group: The Node Group Template to be deleted
"""
self.clients("sahara").node_group_templates.delete(node_group.id)
def _wait_active(self, cluster_object):
utils.wait_for_status(
resource=cluster_object, ready_statuses=["active"],
failure_statuses=["error"], update_resource=self._update_cluster,
timeout=CONF.openstack.sahara_cluster_create_timeout,
check_interval=CONF.openstack.sahara_cluster_check_interval)
def _setup_neutron_floating_ip_pool(self, name_or_id):
if name_or_id:
if uuidutils.is_uuid_like(name_or_id):
# Looks like an id is provided Return as is.
return name_or_id
else:
# It's a name. Changing to id.
for net in self.clients("neutron").list_networks()["networks"]:
if net["name"] == name_or_id:
return net["id"]
# If the name is not found in the list. Exit with error.
raise exceptions.ContextSetupFailure(
ctx_name=self.get_name(),
msg="Could not resolve Floating IP Pool name %s to id"
% name_or_id)
else:
# Pool is not provided. Using the one set as GW for current router.
net = self.context["tenant"]["networks"][0]
router_id = net["router_id"]
router = self.clients("neutron").show_router(router_id)["router"]
net_id = router["external_gateway_info"]["network_id"]
return net_id
def _setup_nova_floating_ip_pool(self, name):
if name:
# The name is provided returning it as is.
return name
else:
# The name is not provided. Discovering
LOG.debug("No Floating Ip Pool provided. Taking random.")
pools = self.clients("nova").floating_ip_pools.list()
if pools:
return random.choice(pools).name
else:
LOG.warning("No Floating Ip Pools found. This may cause "
"instances to be unreachable.")
return None
def _setup_floating_ip_pool(self, node_groups, floating_ip_pool,
enable_proxy):
if consts.Service.NEUTRON in self.clients("services").values():
LOG.debug("Neutron detected as networking backend.")
floating_ip_pool_value = self._setup_neutron_floating_ip_pool(
floating_ip_pool)
else:
LOG.debug("Nova Network detected as networking backend.")
floating_ip_pool_value = self._setup_nova_floating_ip_pool(
floating_ip_pool)
if floating_ip_pool_value:
LOG.debug("Using floating ip pool %s." % floating_ip_pool_value)
# If the pool is set by any means assign it to all node groups.
# If the proxy node feature is enabled, Master Node Group and
# Proxy Workers should have a floating ip pool set up
if enable_proxy:
proxy_groups = [x for x in node_groups
if x["name"] in ("master-ng", "proxy-ng")]
for ng in proxy_groups:
ng["is_proxy_gateway"] = True
ng["floating_ip_pool"] = floating_ip_pool_value
else:
for ng in node_groups:
ng["floating_ip_pool"] = floating_ip_pool_value
return node_groups
def _setup_volumes(self, node_groups, volumes_per_node, volumes_size):
if volumes_per_node:
LOG.debug("Adding volumes config to Node Groups")
for ng in node_groups:
ng_name = ng["name"]
if "worker" in ng_name or "proxy" in ng_name:
# NOTE: Volume storage is used only by HDFS Datanode
# process which runs on workers and proxies.
ng["volumes_per_node"] = volumes_per_node
ng["volumes_size"] = volumes_size
return node_groups
def _setup_security_groups(self, node_groups, auto_security_group,
security_groups):
if auto_security_group:
LOG.debug("Auto security group enabled. Adding to Node Groups.")
if security_groups:
LOG.debug("Adding provided Security Groups to Node Groups.")
for ng in node_groups:
if auto_security_group:
ng["auto_security_group"] = auto_security_group
if security_groups:
ng["security_groups"] = security_groups
return node_groups
def _setup_node_configs(self, node_groups, node_configs):
if node_configs:
LOG.debug("Adding Hadoop configs to Node Groups")
for ng in node_groups:
ng["node_configs"] = node_configs
return node_groups
def _setup_node_autoconfig(self, node_groups, node_autoconfig):
LOG.debug("Adding auto-config par to Node Groups")
for ng in node_groups:
ng["use_autoconfig"] = node_autoconfig
return node_groups
def _setup_replication_config(self, hadoop_version, workers_count,
plugin_name):
replication_value = min(workers_count, 3)
# 3 is a default Hadoop replication
conf = sahara_consts.REPLICATION_CONFIGS[plugin_name][hadoop_version]
LOG.debug("Using replication factor: %s" % replication_value)
replication_config = {
conf["target"]: {
conf["config_name"]: replication_value
}
}
return replication_config
@logging.log_deprecated_args("`flavor_id` argument is deprecated. Use "
"`master_flavor_id` and `worker_flavor_id` "
"parameters.", rally_version="0.2.0",
deprecated_args=["flavor_id"])
@atomic.action_timer("sahara.launch_cluster")
def _launch_cluster(self, plugin_name, hadoop_version, master_flavor_id,
worker_flavor_id, image_id, workers_count,
flavor_id=None,
floating_ip_pool=None, volumes_per_node=None,
volumes_size=None, auto_security_group=None,
security_groups=None, node_configs=None,
cluster_configs=None, enable_anti_affinity=False,
enable_proxy=False,
wait_active=True,
use_autoconfig=True):
"""Create a cluster and wait until it becomes Active.
The cluster is created with two node groups. The master Node Group is
created with one instance. The worker node group contains
node_count - 1 instances.
:param plugin_name: provisioning plugin name
:param hadoop_version: Hadoop version supported by the plugin
:param master_flavor_id: flavor which will be used to create master
instance
:param worker_flavor_id: flavor which will be used to create workers
:param image_id: image id that will be used to boot instances
:param workers_count: number of worker instances. All plugins will
also add one Master instance and some plugins
add a Manager instance.
:param floating_ip_pool: floating ip pool name from which Floating
IPs will be allocated
:param volumes_per_node: number of Cinder volumes that will be
attached to every cluster node
:param volumes_size: size of each Cinder volume in GB
:param auto_security_group: boolean value. If set to True Sahara will
create a Security Group for each Node Group
in the Cluster automatically.
:param security_groups: list of security groups that will be used
while creating VMs. If auto_security_group is
set to True, this list can be left empty.
:param node_configs: configs dict that will be passed to each Node
Group
:param cluster_configs: configs dict that will be passed to the
Cluster
:param enable_anti_affinity: If set to true the vms will be scheduled
one per compute node.
:param enable_proxy: Use Master Node of a Cluster as a Proxy node and
do not assign floating ips to workers.
:param wait_active: Wait until a Cluster gets int "Active" state
:param use_autoconfig: If True, instances of the node group will be
automatically configured during cluster
creation. If False, the configuration values
should be specify manually
:returns: created cluster
"""
if enable_proxy:
proxies_count = int(
workers_count / CONF.openstack.sahara_workers_per_proxy)
else:
proxies_count = 0
if flavor_id:
# Note: the deprecated argument is used. Falling back to single
# flavor behavior.
master_flavor_id = flavor_id
worker_flavor_id = flavor_id
node_groups = [
{
"name": "master-ng",
"flavor_id": master_flavor_id,
"node_processes": sahara_consts.NODE_PROCESSES[plugin_name]
[hadoop_version]["master"],
"count": 1
}, {
"name": "worker-ng",
"flavor_id": worker_flavor_id,
"node_processes": sahara_consts.NODE_PROCESSES[plugin_name]
[hadoop_version]["worker"],
"count": workers_count - proxies_count
}
]
if proxies_count:
node_groups.append({
"name": "proxy-ng",
"flavor_id": worker_flavor_id,
"node_processes": sahara_consts.NODE_PROCESSES[plugin_name]
[hadoop_version]["worker"],
"count": proxies_count
})
if "manager" in (sahara_consts.NODE_PROCESSES[plugin_name]
[hadoop_version]):
# Adding manager group separately as it is supported only in
# specific configurations.
node_groups.append({
"name": "manager-ng",
"flavor_id": master_flavor_id,
"node_processes": sahara_consts.NODE_PROCESSES[plugin_name]
[hadoop_version]["manager"],
"count": 1
})
node_groups = self._setup_floating_ip_pool(node_groups,
floating_ip_pool,
enable_proxy)
neutron_net_id = self._get_neutron_net_id()
node_groups = self._setup_volumes(node_groups, volumes_per_node,
volumes_size)
node_groups = self._setup_security_groups(node_groups,
auto_security_group,
security_groups)
node_groups = self._setup_node_configs(node_groups, node_configs)
node_groups = self._setup_node_autoconfig(node_groups, use_autoconfig)
replication_config = self._setup_replication_config(hadoop_version,
workers_count,
plugin_name)
# The replication factor should be set for small clusters. However the
# cluster_configs parameter can override it
merged_cluster_configs = self._merge_configs(replication_config,
cluster_configs)
aa_processes = None
if enable_anti_affinity:
aa_processes = (sahara_consts.ANTI_AFFINITY_PROCESSES[plugin_name]
[hadoop_version])
name = self.generate_random_name()
cluster_object = self.clients("sahara").clusters.create(
name=name,
plugin_name=plugin_name,
hadoop_version=hadoop_version,
node_groups=node_groups,
default_image_id=image_id,
net_id=neutron_net_id,
cluster_configs=merged_cluster_configs,
anti_affinity=aa_processes,
use_autoconfig=use_autoconfig
)
if wait_active:
LOG.debug("Starting cluster `%s`" % name)
self._wait_active(cluster_object)
return self.clients("sahara").clusters.get(cluster_object.id)
def _update_cluster(self, cluster):
return self.clients("sahara").clusters.get(cluster.id)
def _scale_cluster(self, cluster, delta):
"""The scaling helper.
This method finds the worker node group in a cluster, builds a
scale_object required by Sahara API and waits for the scaling to
complete.
NOTE: This method is not meant to be called directly in scenarios.
There two specific scaling methods of up and down scaling which have
different atomic timers.
"""
worker_node_group = [g for g in cluster.node_groups
if "worker" in g["name"]][0]
scale_object = {
"resize_node_groups": [
{
"name": worker_node_group["name"],
"count": worker_node_group["count"] + delta
}
]
}
self.clients("sahara").clusters.scale(cluster.id, scale_object)
self._wait_active(cluster)
@atomic.action_timer("sahara.scale_up")
def _scale_cluster_up(self, cluster, delta):
"""Add a given number of worker nodes to the cluster.
:param cluster: The cluster to be scaled
:param delta: The number of workers to be added. (A positive number is
expected here)
"""
self._scale_cluster(cluster, delta)
@atomic.action_timer("sahara.scale_down")
def _scale_cluster_down(self, cluster, delta):
"""Remove a given number of worker nodes from the cluster.
:param cluster: The cluster to be scaled
:param delta: The number of workers to be removed. (A negative number
is expected here)
"""
self._scale_cluster(cluster, delta)
@atomic.action_timer("sahara.delete_cluster")
def _delete_cluster(self, cluster):
"""Delete cluster.
:param cluster: cluster to delete
"""
LOG.debug("Deleting cluster `%s`" % cluster.name)
self.clients("sahara").clusters.delete(cluster.id)
utils.wait_for(
resource=cluster,
timeout=CONF.openstack.sahara_cluster_delete_timeout,
check_interval=CONF.openstack.sahara_cluster_check_interval,
is_ready=self._is_cluster_deleted)
def _is_cluster_deleted(self, cluster):
from saharaclient.api import base as sahara_base
LOG.debug("Checking cluster `%s` to be deleted. Status: `%s`"
% (cluster.name, cluster.status))
try:
self.clients("sahara").clusters.get(cluster.id)
return False
except sahara_base.APIException:
return True
def _create_output_ds(self):
"""Create an output Data Source based on EDP context
:returns: The created Data Source
"""
ds_type = self.context["sahara"]["output_conf"]["output_type"]
url_prefix = self.context["sahara"]["output_conf"]["output_url_prefix"]
if ds_type == "swift":
raise exceptions.RallyException(
"Swift Data Sources are not implemented yet")
url = url_prefix.rstrip("/") + "/%s" % self.generate_random_name()
return self.clients("sahara").data_sources.create(
name=self.generate_random_name(),
description="",
data_source_type=ds_type,
url=url)
def _run_job_execution(self, job_id, cluster_id, input_id, output_id,
configs, job_idx):
"""Run a Job Execution and wait until it completes or fails.
The Job Execution is accepted as successful when Oozie reports
"success" or "succeeded" status. The failure statuses are "failed" and
"killed".
The timeout and the polling interval may be configured through
"sahara_job_execution_timeout" and "sahara_job_check_interval"
parameters under the "benchmark" section.
:param job_id: The Job id that will be executed
:param cluster_id: The Cluster id which will execute the Job
:param input_id: The input Data Source id
:param output_id: The output Data Source id
:param configs: The config dict that will be passed as Job Execution's
parameters.
:param job_idx: The index of a job in a sequence
"""
@atomic.action_timer("sahara.job_execution_%s" % job_idx)
def run(self):
job_execution = self.clients("sahara").job_executions.create(
job_id=job_id,
cluster_id=cluster_id,
input_id=input_id,
output_id=output_id,
configs=configs)
utils.wait_for(
resource=job_execution.id,
is_ready=self._job_execution_is_finished,
timeout=CONF.openstack.sahara_job_execution_timeout,
check_interval=CONF.openstack.sahara_job_check_interval)
run(self)
def _job_execution_is_finished(self, je_id):
status = self.clients("sahara").job_executions.get(je_id).info[
"status"].lower()
LOG.debug("Checking for Job Execution %s to complete. Status: %s"
% (je_id, status))
if status in ("success", "succeeded"):
return True
elif status in ("failed", "killed"):
raise exceptions.RallyException(
"Job execution %s has failed" % je_id)
return False
def _merge_configs(self, *configs):
"""Merge configs in special format.
It supports merging of configs in the following format:
applicable_target -> config_name -> config_value
"""
result = {}
for config_dict in configs:
if config_dict:
for a_target in config_dict:
if a_target not in result or not result[a_target]:
result[a_target] = {}
result[a_target].update(config_dict[a_target])
return result
def _get_neutron_net_id(self):
"""Get the Neutron Network id from context.
If Nova Network is used as networking backend, None is returned.
:returns: Network id for Neutron or None for Nova Networking.
"""
if consts.Service.NEUTRON not in self.clients("services").values():
return None
# Taking net id from context.
net = self.context["tenant"]["networks"][0]
neutron_net_id = net["id"]
LOG.debug("Using neutron network %s." % neutron_net_id)
LOG.debug("Using neutron router %s." % net["router_id"])
return neutron_net_id
def init_sahara_context(context_instance):
context_instance.context["sahara"] = context_instance.context.get("sahara",
{})
for user, tenant_id in rutils.iterate_per_tenants(
context_instance.context["users"]):
context_instance.context["tenants"][tenant_id]["sahara"] = (
context_instance.context["tenants"][tenant_id].get("sahara", {}))

View File

@@ -67,15 +67,6 @@ class TempestConfigfileManager(object):
self.conf.set(section_name, "admin_domain_name",
self.credential.user_domain_name or "Default")
# Sahara has two service types: 'data_processing' and 'data-processing'.
# 'data_processing' is deprecated, but it can be used in previous OpenStack
# releases. So we need to configure the 'catalog_type' option to support
# environments where 'data_processing' is used as service type for Sahara.
def _configure_data_processing(self, section_name="data-processing"):
if "sahara" in self.available_services:
self.conf.set(section_name, "catalog_type",
self._get_service_type_by_service_name("sahara"))
def _configure_identity(self, section_name="identity"):
self.conf.set(section_name, "region",
self.credential.region_name)
@@ -190,7 +181,7 @@ class TempestConfigfileManager(object):
def _configure_service_available(self, section_name="service_available"):
services = ["cinder", "glance", "heat", "ironic", "neutron", "nova",
"sahara", "swift"]
"swift"]
for service in services:
# Convert boolean to string because ConfigParser fails
# on attempt to get option with boolean value

View File

@@ -20,7 +20,6 @@ class _TempestApiTestSets(utils.ImmutableMixin, utils.EnumMixin):
BAREMETAL = "baremetal"
CLUSTERING = "clustering"
COMPUTE = "compute"
DATA_PROCESSING = "data_processing"
DATABASE = "database"
IDENTITY = "identity"
IMAGE = "image"

View File

@@ -25,7 +25,6 @@ python-monascaclient # Apache Software License
python-neutronclient # Apache Software License
python-novaclient # Apache License, Version 2.0
python-octaviaclient # Apache Software License
python-saharaclient # Apache License, Version 2.0
python-senlinclient # Apache Software License
python-swiftclient # Apache License, Version 2.0
python-troveclient # Apache Software License

View File

@@ -1,29 +0,0 @@
{
"Dummy.openstack": [
{
"args": {
"sleep": 0.1
},
"runner": {
"type": "constant",
"times": 4,
"concurrency": 2
},
"context": {
"users": {
"tenants": 1,
"users_per_tenant": 1
},
"sahara_cluster": {
"master_flavor_id": "4",
"worker_flavor_id": "3",
"workers_count": 3,
"plugin_name": "vanilla",
"hadoop_version": "2.6.0",
"auto_security_group": true
},
"network": {}
}
}
]
}

View File

@@ -1,21 +0,0 @@
---
Dummy.openstack:
-
args:
sleep: 0.1
runner:
type: "constant"
times: 4
concurrency: 2
context:
users:
tenants: 1
users_per_tenant: 1
sahara_cluster:
master_flavor_id: "4"
worker_flavor_id: "3"
workers_count: 3
plugin_name: "vanilla"
hadoop_version: "2.6.0"
auto_security_group: True
network: {}

View File

@@ -1,27 +0,0 @@
{
"Dummy.openstack": [
{
"args": {
"sleep": 0.1
},
"runner": {
"type": "constant",
"times": 4,
"concurrency": 2
},
"context": {
"users": {
"tenants": 1,
"users_per_tenant": 1
},
"sahara_image": {
"image_url": "http://sahara-files.mirantis.com/sahara-icehouse-vanilla-2.3.0-ubuntu-13.10.qcow2",
"username": "ubuntu",
"plugin_name": "vanilla",
"hadoop_version": "2.3.0"
},
"network": {}
}
}
]
}

View File

@@ -1,19 +0,0 @@
---
Dummy.openstack:
-
args:
sleep: 0.1
runner:
type: "constant"
times: 4
concurrency: 2
context:
users:
tenants: 1
users_per_tenant: 1
sahara_image:
image_url: "http://sahara-files.mirantis.com/sahara-icehouse-vanilla-2.3.0-ubuntu-13.10.qcow2"
username: "ubuntu"
plugin_name: "vanilla"
hadoop_version: "2.3.0"
network: {}

View File

@@ -1,29 +0,0 @@
{
"Dummy.openstack": [
{
"args": {
"sleep": 0.1
},
"runner": {
"type": "constant",
"times": 4,
"concurrency": 2
},
"context": {
"users": {
"tenants": 1,
"users_per_tenant": 1
},
"sahara_input_data_sources": {
"input_type": "hdfs",
"input_url": "/"
},
"sahara_output_data_sources": {
"output_type": "hdfs",
"output_url_prefix": "/out_"
},
"network": {}
}
}
]
}

View File

@@ -1,20 +0,0 @@
---
Dummy.openstack:
-
args:
sleep: 0.1
runner:
type: "constant"
times: 4
concurrency: 2
context:
users:
tenants: 1
users_per_tenant: 1
sahara_input_data_sources:
input_type: "hdfs"
input_url: "/"
sahara_output_data_sources:
output_type: "hdfs"
output_url_prefix: "/out_"
network: {}

View File

@@ -1,54 +0,0 @@
{
"Dummy.openstack": [
{
"args": {
"sleep": 0.1
},
"runner": {
"type": "constant",
"times": 4,
"concurrency": 2
},
"context": {
"users": {
"tenants": 1,
"users_per_tenant": 1
},
"sahara_job_binaries": {
"libs": [{
"name": "tests.jar",
"download_url": "http://repo1.maven.org/maven2/org/apache/hadoop/hadoop-hdfs/2.6.0/hadoop-hdfs-2.6.0-tests.jar"
}]
},
"network": {}
}
},
{
"args": {
"sleep": 0.1
},
"runner": {
"type": "constant",
"times": 4,
"concurrency": 2
},
"context": {
"users": {
"tenants": 1,
"users_per_tenant": 1
},
"sahara_job_binaries": {
"mains": [{
"name": "example.pig",
"download_url": "https://raw.githubusercontent.com/openstack/sahara/master/etc/edp-examples/pig-job/example.pig"
}],
"libs": [{
"name": "udf.jar",
"download_url": "https://github.com/openstack/sahara/blob/master/etc/edp-examples/pig-job/udf.jar?raw=true"
}]
},
"network": {}
}
}
]
}

View File

@@ -1,40 +0,0 @@
---
Dummy.openstack:
-
args:
sleep: 0.1
runner:
type: "constant"
times: 4
concurrency: 2
context:
users:
tenants: 1
users_per_tenant: 1
sahara_job_binaries:
libs:
-
name: "tests.jar"
download_url: "http://repo1.maven.org/maven2/org/apache/hadoop/hadoop-hdfs/2.6.0/hadoop-hdfs-2.6.0-tests.jar"
network: {}
-
args:
sleep: 0.1
runner:
type: "constant"
times: 4
concurrency: 2
context:
users:
tenants: 1
users_per_tenant: 1
sahara_job_binaries:
mains:
-
name: "example.pig"
download_url: "https://raw.githubusercontent.com/openstack/sahara/master/etc/edp-examples/pig-job/example.pig"
libs:
-
name: "udf.jar"
download_url: "https://github.com/openstack/sahara/blob/master/etc/edp-examples/pig-job/udf.jar?raw=true"
network: {}

View File

@@ -1,29 +0,0 @@
{
"Dummy.openstack": [
{
"args": {
"sleep": 0.1
},
"runner": {
"type": "constant",
"times": 4,
"concurrency": 2
},
"context": {
"users": {
"tenants": 1,
"users_per_tenant": 1
},
"sahara_input_data_sources": {
"input_type": "hdfs",
"input_url": "/"
},
"sahara_output_data_sources": {
"output_type": "hdfs",
"output_url_prefix": "/out_"
},
"network": {}
}
}
]
}

View File

@@ -1,20 +0,0 @@
---
Dummy.openstack:
-
args:
sleep: 0.1
runner:
type: "constant"
times: 4
concurrency: 2
context:
users:
tenants: 1
users_per_tenant: 1
sahara_input_data_sources:
input_type: "hdfs"
input_url: "/"
sahara_output_data_sources:
output_type: "hdfs"
output_url_prefix: "/out_"
network: {}

View File

@@ -1,41 +0,0 @@
{
"SaharaClusters.create_and_delete_cluster": [
{
"args": {
"master_flavor": {
"name": "m1.large"
},
"worker_flavor": {
"name": "m1.medium"
},
"workers_count": 3,
"plugin_name": "vanilla",
"hadoop_version": "2.3.0",
"auto_security_group": true
},
"runner": {
"type": "constant",
"times": 4,
"concurrency": 2
},
"context": {
"users": {
"tenants": 1,
"users_per_tenant": 1
},
"sahara_image": {
"image_url": "http://sahara-files.mirantis.com/sahara-icehouse-vanilla-2.3.0-ubuntu-13.10.qcow2",
"username": "ubuntu",
"plugin_name": "vanilla",
"hadoop_version": "2.3.0"
},
"network": {}
},
"sla": {
"failure_rate": {
"max": 0
}
}
}
]
}

View File

@@ -1,29 +0,0 @@
---
SaharaClusters.create_and_delete_cluster:
-
args:
master_flavor:
name: "m1.large"
worker_flavor:
name: "m1.medium"
workers_count: 3
plugin_name: "vanilla"
hadoop_version: "2.3.0"
auto_security_group: True
runner:
type: "constant"
times: 4
concurrency: 2
context:
users:
tenants: 1
users_per_tenant: 1
sahara_image:
image_url: "http://sahara-files.mirantis.com/sahara-icehouse-vanilla-2.3.0-ubuntu-13.10.qcow2"
username: "ubuntu"
plugin_name: "vanilla"
hadoop_version: "2.3.0"
network: {}
sla:
failure_rate:
max: 0

View File

@@ -1,27 +0,0 @@
{
"SaharaNodeGroupTemplates.create_and_list_node_group_templates": [
{
"args": {
"flavor": {
"name": "m1.small"
}
},
"runner": {
"type": "constant",
"times": 100,
"concurrency": 10
},
"context": {
"users": {
"tenants": 1,
"users_per_tenant": 1
}
},
"sla": {
"failure_rate": {
"max": 0
}
}
}
]
}

View File

@@ -1,17 +0,0 @@
---
SaharaNodeGroupTemplates.create_and_list_node_group_templates:
-
args:
flavor:
name: "m1.small"
runner:
type: "constant"
times: 100
concurrency: 10
context:
users:
tenants: 1
users_per_tenant: 1
sla:
failure_rate:
max: 0

View File

@@ -1,27 +0,0 @@
{
"SaharaNodeGroupTemplates.create_delete_node_group_templates": [
{
"args": {
"flavor": {
"name": "m1.small"
}
},
"runner": {
"type": "constant",
"times": 100,
"concurrency": 10
},
"context": {
"users": {
"tenants": 1,
"users_per_tenant": 1
}
},
"sla": {
"failure_rate": {
"max": 0
}
}
}
]
}

View File

@@ -1,17 +0,0 @@
---
SaharaNodeGroupTemplates.create_delete_node_group_templates:
-
args:
flavor:
name: "m1.small"
runner:
type: "constant"
times: 100
concurrency: 10
context:
users:
tenants: 1
users_per_tenant: 1
sla:
failure_rate:
max: 0

View File

@@ -1,42 +0,0 @@
{
"SaharaClusters.create_scale_delete_cluster": [
{
"args": {
"master_flavor": {
"name": "m1.large"
},
"worker_flavor": {
"name": "m1.medium"
},
"workers_count": 3,
"deltas": [1, -1, 1, -1],
"plugin_name": "vanilla",
"hadoop_version": "2.3.0",
"auto_security_group": true
},
"runner": {
"type": "constant",
"times": 4,
"concurrency": 2
},
"context": {
"users": {
"tenants": 1,
"users_per_tenant": 1
},
"sahara_image": {
"image_url": "http://sahara-files.mirantis.com/sahara-icehouse-vanilla-2.3.0-ubuntu-13.10.qcow2",
"username": "ubuntu",
"plugin_name": "vanilla",
"hadoop_version": "2.3.0"
},
"network": {}
},
"sla": {
"failure_rate": {
"max": 0
}
}
}
]
}

View File

@@ -1,34 +0,0 @@
---
SaharaClusters.create_scale_delete_cluster:
-
args:
master_flavor:
name: "m1.large"
worker_flavor:
name: "m1.medium"
workers_count: 3
deltas:
- 1
- -1
- 1
- -1
plugin_name: "vanilla"
hadoop_version: "2.3.0"
auto_security_group: True
runner:
type: "constant"
times: 4
concurrency: 2
context:
users:
tenants: 1
users_per_tenant: 1
sahara_image:
image_url: "http://sahara-files.mirantis.com/sahara-icehouse-vanilla-2.3.0-ubuntu-13.10.qcow2"
username: "ubuntu"
plugin_name: "vanilla"
hadoop_version: "2.3.0"
network: {}
sla:
failure_rate:
max: 0

View File

@@ -1,66 +0,0 @@
{
"SaharaJob.create_launch_job_sequence_with_scaling": [
{
"args": {
"jobs": [
{
"job_type": "Java",
"configs": {
"configs": {
"edp.java.main_class": "org.apache.hadoop.fs.TestDFSIO"
},
"args": ["-write", "-nrFiles", "10", "-fileSize", "100"]
}
}, {
"job_type": "Java",
"configs": {
"configs": {
"edp.java.main_class": "org.apache.hadoop.fs.TestDFSIO"
},
"args": ["-read", "-nrFiles", "10", "-fileSize", "100"]
}
}
],
"deltas": [2, 2, 2]
},
"runner": {
"type": "serial",
"times": 1
},
"context": {
"users": {
"tenants": 1,
"users_per_tenant": 1
},
"sahara_image": {
"image_url": "http://sahara-files.mirantis.com/mos70/sahara-kilo-vanilla-2.6.0-ubuntu-14.04.qcow2",
"username": "ubuntu",
"plugin_name": "vanilla",
"hadoop_version": "2.6.0"
},
"sahara_job_binaries": {
"libs": [
{
"name": "tests.jar",
"download_url": "http://repo1.maven.org/maven2/org/apache/hadoop/hadoop-hdfs/2.6.0/hadoop-hdfs-2.6.0-tests.jar"
}
]
},
"sahara_cluster": {
"master_flavor_id": "4",
"worker_flavor_id": "3",
"workers_count": 3,
"plugin_name": "vanilla",
"hadoop_version": "2.6.0",
"auto_security_group": true
},
"network": {}
},
"sla": {
"failure_rate": {
"max": 0
}
}
}
]
}

View File

@@ -1,59 +0,0 @@
---
SaharaJob.create_launch_job_sequence_with_scaling:
-
args:
jobs:
-
job_type: "Java"
configs:
configs:
edp.java.main_class: "org.apache.hadoop.fs.TestDFSIO"
args:
- "-write"
- "-nrFiles"
- "10"
- "-fileSize"
- "100"
-
job_type: "Java"
configs:
configs:
edp.java.main_class: "org.apache.hadoop.fs.TestDFSIO"
args:
- "-read"
- "-nrFiles"
- "10"
- "-fileSize"
- "100"
deltas:
- 2
- 2
- 2
runner:
type: "serial"
times: 1
context:
users:
tenants: 1
users_per_tenant: 1
sahara_image:
image_url: "http://sahara-files.mirantis.com/mos70/sahara-kilo-vanilla-2.6.0-ubuntu-14.04.qcow2"
username: "ubuntu"
plugin_name: "vanilla"
hadoop_version: "2.6.0"
sahara_job_binaries:
libs:
-
name: "tests.jar"
download_url: "http://repo1.maven.org/maven2/org/apache/hadoop/hadoop-hdfs/2.6.0/hadoop-hdfs-2.6.0-tests.jar"
sahara_cluster:
master_flavor_id: "4"
worker_flavor_id: "3"
workers_count: 3
plugin_name: "vanilla"
hadoop_version: "2.6.0"
auto_security_group: True
network: {}
sla:
failure_rate:
max: 0

View File

@@ -1,66 +0,0 @@
{
"SaharaJob.create_launch_job_sequence": [
{
"args": {
"jobs": [
{
"job_type": "Java",
"configs": {
"configs": {
"edp.java.main_class": "org.apache.hadoop.fs.TestDFSIO"
},
"args": ["-write", "-nrFiles", "10", "-fileSize", "100"]
}
}, {
"job_type": "Java",
"configs": {
"configs": {
"edp.java.main_class": "org.apache.hadoop.fs.TestDFSIO"
},
"args": ["-read", "-nrFiles", "10", "-fileSize", "100"]
}
}
]
},
"runner": {
"type": "constant",
"times": 4,
"concurrency": 2
},
"context": {
"users": {
"tenants": 1,
"users_per_tenant": 1
},
"sahara_image": {
"image_url": "http://sahara-files.mirantis.com/mos70/sahara-kilo-vanilla-2.6.0-ubuntu-14.04.qcow2",
"username": "ubuntu",
"plugin_name": "vanilla",
"hadoop_version": "2.6.0"
},
"sahara_job_binaries": {
"libs": [
{
"name": "tests.jar",
"download_url": "http://repo1.maven.org/maven2/org/apache/hadoop/hadoop-hdfs/2.6.0/hadoop-hdfs-2.6.0-tests.jar"
}
]
},
"sahara_cluster": {
"master_flavor_id": "4",
"worker_flavor_id": "3",
"workers_count": 3,
"plugin_name": "vanilla",
"hadoop_version": "2.6.0",
"auto_security_group": true
},
"network": {}
},
"sla": {
"failure_rate": {
"max": 0
}
}
}
]
}

View File

@@ -1,56 +0,0 @@
---
SaharaJob.create_launch_job_sequence:
-
args:
jobs:
-
job_type: "Java"
configs:
configs:
edp.java.main_class: "org.apache.hadoop.fs.TestDFSIO"
args:
- "-write"
- "-nrFiles"
- "10"
- "-fileSize"
- "100"
-
job_type: "Java"
configs:
configs:
edp.java.main_class: "org.apache.hadoop.fs.TestDFSIO"
args:
- "-read"
- "-nrFiles"
- "10"
- "-fileSize"
- "100"
runner:
type: "constant"
times: 4
concurrency: 2
context:
users:
tenants: 1
users_per_tenant: 1
sahara_image:
image_url: "http://sahara-files.mirantis.com/mos70/sahara-kilo-vanilla-2.6.0-ubuntu-14.04.qcow2"
username: "ubuntu"
plugin_name: "vanilla"
hadoop_version: "2.6.0"
sahara_job_binaries:
libs:
-
name: "tests.jar"
download_url: "http://repo1.maven.org/maven2/org/apache/hadoop/hadoop-hdfs/2.6.0/hadoop-hdfs-2.6.0-tests.jar"
sahara_cluster:
master_flavor_id: "4"
worker_flavor_id: "3"
workers_count: 3
plugin_name: "vanilla"
hadoop_version: "2.6.0"
auto_security_group: True
network: {}
sla:
failure_rate:
max: 0

View File

@@ -1,54 +0,0 @@
{
"SaharaJob.create_launch_job": [
{
"args": {
"job_type": "Java",
"configs": {
"configs": {
"edp.java.main_class": "org.apache.hadoop.examples.PiEstimator"
},
"args": ["10", "10"]
}
},
"runner": {
"type": "constant",
"times": 4,
"concurrency": 2
},
"context": {
"users": {
"tenants": 1,
"users_per_tenant": 1
},
"sahara_image": {
"image_url": "http://sahara-files.mirantis.com/mos70/sahara-kilo-vanilla-2.6.0-ubuntu-14.04.qcow2",
"username": "ubuntu",
"plugin_name": "vanilla",
"hadoop_version": "2.6.0"
},
"sahara_job_binaries": {
"libs": [
{
"name": "examples.jar",
"download_url": "http://repo1.maven.org/maven2/org/apache/hadoop/hadoop-mapreduce-examples/2.6.0/hadoop-mapreduce-examples-2.6.0.jar"
}
]
},
"sahara_cluster": {
"master_flavor_id": "4",
"worker_flavor_id": "3",
"workers_count": 3,
"plugin_name": "vanilla",
"hadoop_version": "2.6.0",
"auto_security_group": true
},
"network": {}
},
"sla": {
"failure_rate": {
"max": 0
}
}
}
]
}

View File

@@ -1,40 +0,0 @@
---
SaharaJob.create_launch_job:
-
args:
job_type: "Java"
configs:
configs:
edp.java.main_class: "org.apache.hadoop.examples.PiEstimator"
args:
- "10"
- "10"
runner:
type: "constant"
times: 4
concurrency: 2
context:
users:
tenants: 1
users_per_tenant: 1
sahara_image:
image_url: "http://sahara-files.mirantis.com/mos70/sahara-kilo-vanilla-2.6.0-ubuntu-14.04.qcow2"
username: "ubuntu"
plugin_name: "vanilla"
hadoop_version: "2.6.0"
sahara_job_binaries:
libs:
-
name: "examples.jar"
download_url: "http://repo1.maven.org/maven2/org/apache/hadoop/hadoop-mapreduce-examples/2.6.0/hadoop-mapreduce-examples-2.6.0.jar"
sahara_cluster:
master_flavor_id: "4"
worker_flavor_id: "3"
workers_count: 3
plugin_name: "vanilla"
hadoop_version: "2.6.0"
auto_security_group: True
network: {}
sla:
failure_rate:
max: 0

View File

@@ -1,63 +0,0 @@
{
"SaharaJob.create_launch_job": [
{
"args": {
"job_type": "Pig",
"configs": {}
},
"runner": {
"type": "constant",
"times": 4,
"concurrency": 2
},
"context": {
"users": {
"tenants": 1,
"users_per_tenant": 1
},
"sahara_image": {
"image_url": "http://sahara-files.mirantis.com/mos70/sahara-kilo-vanilla-2.6.0-ubuntu-14.04.qcow2",
"username": "ubuntu",
"plugin_name": "vanilla",
"hadoop_version": "2.6.0"
},
"sahara_job_binaries": {
"mains": [
{
"name": "example.pig",
"download_url": "https://raw.githubusercontent.com/openstack/sahara/master/etc/edp-examples/pig-job/example.pig"
}
],
"libs": [
{
"name": "udf.jar",
"download_url": "https://github.com/openstack/sahara/blob/master/etc/edp-examples/pig-job/udf.jar?raw=true"
}
]
},
"sahara_input_data_sources": {
"input_type": "hdfs",
"input_url": "/"
},
"sahara_output_data_sources": {
"output_type": "hdfs",
"output_url_prefix": "/out_"
},
"sahara_cluster": {
"master_flavor_id": "4",
"worker_flavor_id": "3",
"workers_count": 3,
"plugin_name": "vanilla",
"hadoop_version": "2.6.0",
"auto_security_group": true
},
"network": {}
},
"sla": {
"failure_rate": {
"max": 0
}
}
}
]
}

View File

@@ -1,45 +0,0 @@
---
SaharaJob.create_launch_job:
-
args:
job_type: "Pig"
configs: {}
runner:
type: "constant"
times: 4
concurrency: 2
context:
users:
tenants: 1
users_per_tenant: 1
sahara_image:
image_url: "http://sahara-files.mirantis.com/mos70/sahara-kilo-vanilla-2.6.0-ubuntu-14.04.qcow2"
username: "ubuntu"
plugin_name: "vanilla"
hadoop_version: "2.6.0"
sahara_job_binaries:
mains:
-
name: "example.pig"
download_url: "https://raw.githubusercontent.com/openstack/sahara/master/etc/edp-examples/pig-job/example.pig"
libs:
-
name: "udf.jar"
download_url: "https://github.com/openstack/sahara/blob/master/etc/edp-examples/pig-job/udf.jar?raw=true"
sahara_input_data_sources:
input_type: "hdfs"
input_url: "/"
sahara_output_data_sources:
output_type: "hdfs"
output_url_prefix: "/out_"
sahara_cluster:
master_flavor_id: "4"
worker_flavor_id: "3"
workers_count: 3
plugin_name: "vanilla"
hadoop_version: "2.6.0"
auto_security_group: True
network: {}
sla:
failure_rate:
max: 0

View File

@@ -351,14 +351,6 @@ class Ironic(ResourceManager):
return self.client.node.list()
class Sahara(ResourceManager):
REQUIRED_SERVICE = consts.Service.SAHARA
def list_node_group_templates(self):
return self.client.node_group_templates.list()
class Designate(ResourceManager):
REQUIRED_SERVICE = consts.Service.DESIGNATE

View File

@@ -757,25 +757,6 @@ class OSClientsTestCase(test.TestCase):
mock_ironic.client.get_client.assert_called_once_with("1", **kw)
self.assertEqual(fake_ironic, self.clients.cache["ironic"])
@mock.patch("%s.Sahara._get_endpoint" % PATH)
def test_sahara(self, mock_sahara__get_endpoint):
fake_sahara = fakes.FakeSaharaClient()
mock_sahara = mock.MagicMock()
mock_sahara.client.Client = mock.MagicMock(return_value=fake_sahara)
mock_sahara__get_endpoint.return_value = "http://fake.to:2/fake"
mock_keystoneauth1 = mock.MagicMock()
self.assertNotIn("sahara", self.clients.cache)
with mock.patch.dict("sys.modules",
{"saharaclient": mock_sahara,
"keystoneauth1": mock_keystoneauth1}):
client = self.clients.sahara()
self.assertEqual(fake_sahara, client)
kw = {
"session": mock_keystoneauth1.session.Session(),
"sahara_url": mock_sahara__get_endpoint.return_value}
mock_sahara.client.Client.assert_called_once_with(1.1, **kw)
self.assertEqual(fake_sahara, self.clients.cache["sahara"])
def test_zaqar(self):
fake_zaqar = fakes.FakeZaqarClient()
mock_zaqar = mock.MagicMock()

View File

@@ -1408,38 +1408,6 @@ class FakeIronicClient(object):
pass
class FakeSaharaClient(object):
def __init__(self):
self.job_executions = mock.MagicMock()
self.jobs = mock.MagicMock()
self.job_binary_internals = mock.MagicMock()
self.job_binaries = mock.MagicMock()
self.data_sources = mock.MagicMock()
self.clusters = mock.MagicMock()
self.cluster_templates = mock.MagicMock()
self.node_group_templates = mock.MagicMock()
self.setup_list_methods()
def setup_list_methods(self):
mock_with_id = mock.MagicMock()
mock_with_id.id = 42
# First call of list returns a list with one object, the next should
# empty after delete.
self.job_executions.list.side_effect = [[mock_with_id], []]
self.jobs.list.side_effect = [[mock_with_id], []]
self.job_binary_internals.list.side_effect = [[mock_with_id], []]
self.job_binaries.list.side_effect = [[mock_with_id], []]
self.data_sources.list.side_effect = [[mock_with_id], []]
self.clusters.list.side_effect = [[mock_with_id], []]
self.cluster_templates.list.side_effect = [[mock_with_id], []]
self.node_group_templates.list.side_effect = [[mock_with_id], []]
class FakeZaqarClient(object):
def __init__(self):
@@ -1508,7 +1476,6 @@ class FakeClients(object):
self._cinder = None
self._neutron = None
self._octavia = None
self._sahara = None
self._heat = None
self._designate = None
self._zaqar = None
@@ -1559,11 +1526,6 @@ class FakeClients(object):
self._octavia = FakeOctaviaClient()
return self._octavia
def sahara(self):
if not self._sahara:
self._sahara = FakeSaharaClient()
return self._sahara
def heat(self):
if not self._heat:
self._heat = FakeHeatClient()

View File

@@ -57,9 +57,6 @@ class RallyJobsTestCase(test.TestCase):
and f.endswith(".yaml")
and not f.endswith("_args.yaml"))}
# TODO(andreykurilin): figure out why it fails
files -= {"rally-mos.yaml", "sahara-clusters.yaml"}
for filename in files:
full_path = os.path.join(self.rally_jobs_path, filename)

View File

@@ -1,148 +0,0 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
from rally.common import cfg
from rally import exceptions
from rally_openstack.task.contexts.sahara import sahara_cluster
from rally_openstack.task.scenarios.sahara import utils as sahara_utils
from tests.unit import test
CONF = cfg.CONF
CTX = "rally_openstack.task.contexts.sahara"
class SaharaClusterTestCase(test.ScenarioTestCase):
patch_task_utils = False
def setUp(self):
super(SaharaClusterTestCase, self).setUp()
self.tenants_num = 2
self.users_per_tenant = 2
self.users = self.tenants_num * self.users_per_tenant
self.tenants = {}
self.users_key = []
for i in range(self.tenants_num):
self.tenants[str(i)] = {"id": str(i), "name": str(i),
"sahara": {"image": "42"}}
for j in range(self.users_per_tenant):
self.users_key.append({"id": "%s_%s" % (str(i), str(j)),
"tenant_id": str(i),
"credential": mock.MagicMock()})
CONF.set_override("sahara_cluster_check_interval", 0, "openstack")
self.context.update({
"config": {
"users": {
"tenants": self.tenants_num,
"users_per_tenant": self.users_per_tenant
},
"sahara_cluster": {
"master_flavor_id": "test_flavor_m",
"worker_flavor_id": "test_flavor_w",
"workers_count": 2,
"plugin_name": "test_plugin",
"hadoop_version": "test_version"
}
},
"admin": {"credential": mock.MagicMock()},
"users": self.users_key,
"tenants": self.tenants
})
@mock.patch("%s.sahara_cluster.resource_manager.cleanup" % CTX)
@mock.patch("%s.sahara_cluster.utils.SaharaScenario._launch_cluster" % CTX,
return_value=mock.MagicMock(id=42))
def test_setup_and_cleanup(self, mock_sahara_scenario__launch_cluster,
mock_cleanup):
sahara_ctx = sahara_cluster.SaharaCluster(self.context)
launch_cluster_calls = []
for i in self.tenants:
launch_cluster_calls.append(mock.call(
flavor_id=None,
plugin_name="test_plugin",
hadoop_version="test_version",
master_flavor_id="test_flavor_m",
worker_flavor_id="test_flavor_w",
workers_count=2,
image_id=self.context["tenants"][i]["sahara"]["image"],
floating_ip_pool=None,
volumes_per_node=None,
volumes_size=1,
auto_security_group=True,
security_groups=None,
node_configs=None,
cluster_configs=None,
enable_anti_affinity=False,
enable_proxy=False,
wait_active=False,
use_autoconfig=True
))
self.clients("sahara").clusters.get.side_effect = [
mock.MagicMock(status="not-active"),
mock.MagicMock(status="active")]
sahara_ctx.setup()
mock_sahara_scenario__launch_cluster.assert_has_calls(
launch_cluster_calls)
sahara_ctx.cleanup()
mock_cleanup.assert_called_once_with(
names=["sahara.clusters"],
users=self.context["users"],
superclass=sahara_utils.SaharaScenario,
task_id=self.context["owner_id"])
@mock.patch("%s.sahara_cluster.utils.SaharaScenario._launch_cluster" % CTX,
return_value=mock.MagicMock(id=42))
def test_setup_and_cleanup_error(self,
mock_sahara_scenario__launch_cluster):
sahara_ctx = sahara_cluster.SaharaCluster(self.context)
launch_cluster_calls = []
for i in self.tenants:
launch_cluster_calls.append(mock.call(
flavor_id=None,
plugin_name="test_plugin",
hadoop_version="test_version",
master_flavor_id="test_flavor_m",
worker_flavor_id="test_flavor_w",
workers_count=2,
image_id=self.context["tenants"][i]["sahara"]["image"],
floating_ip_pool=None,
volumes_per_node=None,
volumes_size=1,
auto_security_groups=True,
security_groups=None,
node_configs=None,
cluster_configs=None,
wait_active=False,
use_autoconfig=True
))
self.clients("sahara").clusters.get.side_effect = [
mock.MagicMock(status="not-active"),
mock.MagicMock(status="error")
]
self.assertRaises(exceptions.ContextSetupFailure, sahara_ctx.setup)

View File

@@ -1,182 +0,0 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
from rally import exceptions
from rally_openstack.task.contexts.sahara import sahara_image
from tests.unit import fakes
from tests.unit import test
BASE_CTX = "rally.task.context"
CTX = "rally_openstack.task.contexts.sahara.sahara_image"
BASE_SCN = "rally.task.scenarios"
class SaharaImageTestCase(test.ScenarioTestCase):
def setUp(self):
super(SaharaImageTestCase, self).setUp()
self.tenants_num = 2
self.users_per_tenant = 2
self.users = self.tenants_num * self.users_per_tenant
self.task = mock.MagicMock()
self.tenants = {}
self.users_key = []
for i in range(self.tenants_num):
self.tenants[str(i)] = {"id": str(i), "name": str(i),
"sahara": {"image": "42"}}
for j in range(self.users_per_tenant):
self.users_key.append({"id": "%s_%s" % (str(i), str(j)),
"tenant_id": str(i),
"credential": fakes.FakeCredential()})
@property
def url_image_context(self):
self.context.update({
"config": {
"users": {
"tenants": self.tenants_num,
"users_per_tenant": self.users_per_tenant,
},
"sahara_image": {
"image_url": "http://somewhere",
"plugin_name": "test_plugin",
"hadoop_version": "test_version",
"username": "test_user"
}
},
"admin": {"credential": fakes.FakeCredential()},
"users": self.users_key,
"tenants": self.tenants
})
return self.context
@property
def existing_image_context(self):
self.context.update({
"config": {
"users": {
"tenants": self.tenants_num,
"users_per_tenant": self.users_per_tenant,
},
"sahara_image": {
"image_uuid": "some_id"
}
},
"admin": {"credential": fakes.FakeCredential()},
"users": self.users_key,
"tenants": self.tenants,
})
return self.context
@mock.patch("rally_openstack.common.services."
"image.image.Image")
@mock.patch("%s.resource_manager.cleanup" % CTX)
@mock.patch("rally_openstack.common.osclients.Clients")
def test_setup_and_cleanup_url_image(self, mock_clients,
mock_cleanup, mock_image):
ctx = self.url_image_context
sahara_ctx = sahara_image.SaharaImage(ctx)
sahara_ctx.generate_random_name = mock.Mock()
image_service = mock.Mock()
mock_image.return_value = image_service
image_service.create_image.return_value = mock.Mock(id=42)
clients = mock.Mock()
mock_clients.return_value = clients
sahara_client = mock.Mock()
clients.sahara.return_value = sahara_client
glance_calls = []
for i in range(self.tenants_num):
glance_calls.append(
mock.call(container_format="bare",
image_location="http://somewhere",
disk_format="qcow2"))
sahara_update_image_calls = []
sahara_update_tags_calls = []
for i in range(self.tenants_num):
sahara_update_image_calls.append(mock.call(image_id=42,
user_name="test_user",
desc=""))
sahara_update_tags_calls.append(mock.call(
image_id=42,
new_tags=["test_plugin", "test_version"]))
sahara_ctx.setup()
image_service.create_image.assert_has_calls(glance_calls)
sahara_client.images.update_image.assert_has_calls(
sahara_update_image_calls)
sahara_client.images.update_tags.assert_has_calls(
sahara_update_tags_calls)
sahara_ctx.cleanup()
mock_cleanup.assert_called_once_with(
names=["glance.images"],
users=ctx["users"],
superclass=sahara_ctx.__class__,
task_id=ctx["owner_id"])
@mock.patch("%s.resource_manager.cleanup" % CTX)
@mock.patch("%s.osclients.Clients" % CTX)
def test_setup_and_cleanup_existing_image(
self, mock_clients, mock_cleanup):
mock_clients.glance.images.get.return_value = mock.MagicMock(
is_public=True)
ctx = self.existing_image_context
sahara_ctx = sahara_image.SaharaImage(ctx)
sahara_ctx._create_image = mock.Mock()
sahara_ctx.setup()
for tenant_id in sahara_ctx.context["tenants"]:
image_id = (
sahara_ctx.context["tenants"][tenant_id]["sahara"]["image"])
self.assertEqual("some_id", image_id)
self.assertFalse(sahara_ctx._create_image.called)
sahara_ctx.cleanup()
self.assertFalse(mock_cleanup.called)
@mock.patch("%s.osclients.Glance.create_client" % CTX)
def test_check_existing_image(self, mock_glance_create_client):
ctx = self.existing_image_context
sahara_ctx = sahara_image.SaharaImage(ctx)
sahara_ctx.setup()
mock_glance_create_client.images.get.asser_called_once_with("some_id")
@mock.patch("%s.osclients.Glance.create_client" % CTX)
def test_check_existing_private_image_fail(self,
mock_glance_create_client):
mock_glance_create_client.return_value.images.get.return_value = (
mock.MagicMock(is_public=False))
ctx = self.existing_image_context
sahara_ctx = sahara_image.SaharaImage(ctx)
self.assertRaises(exceptions.ContextSetupFailure,
sahara_ctx.setup)
mock_glance_create_client.images.get.asser_called_once_with("some_id")

View File

@@ -1,173 +0,0 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
from rally_openstack.task.contexts.sahara import sahara_input_data_sources
from rally_openstack.task.scenarios.swift import utils as swift_utils
from tests.unit import test
CTX = "rally_openstack.task.contexts.sahara"
class SaharaInputDataSourcesTestCase(test.ScenarioTestCase):
def setUp(self):
super(SaharaInputDataSourcesTestCase, self).setUp()
self.tenants_num = 2
self.users_per_tenant = 2
self.task = mock.MagicMock()
self.tenants = {}
self.users = []
for i in range(self.tenants_num):
tenant_id = "tenant_%d" % i
self.tenants[tenant_id] = {"id": tenant_id,
"name": tenant_id + "_name",
"sahara": {"image": "foo_image"}}
for u in range(self.users_per_tenant):
user_id = "%s_user_%d" % (tenant_id, u)
self.users.append(
{"id": user_id,
"tenant_id": tenant_id,
"credential": mock.Mock(auth_url="foo_url",
username=user_id + "_name",
password="foo_password")})
self.context.update({
"config": {
"users": {
"tenants": self.tenants_num,
"users_per_tenant": self.users_per_tenant,
},
"sahara_input_data_sources": {
"input_type": "hdfs",
"input_url": "hdfs://test_host/",
},
},
"admin": {"credential": mock.MagicMock()},
"users": self.users,
"tenants": self.tenants
})
@mock.patch("%s.sahara_input_data_sources.resource_manager.cleanup" % CTX)
@mock.patch("%s.sahara_input_data_sources.osclients" % CTX)
def test_setup_and_cleanup(self, mock_osclients, mock_cleanup):
mock_sahara = mock_osclients.Clients.return_value.sahara.return_value
mock_sahara.data_sources.create.return_value = mock.MagicMock(id=42)
sahara_ctx = sahara_input_data_sources.SaharaInputDataSources(
self.context)
sahara_ctx.generate_random_name = mock.Mock()
input_ds_crete_calls = []
for i in range(self.tenants_num):
input_ds_crete_calls.append(mock.call(
name=sahara_ctx.generate_random_name.return_value,
description="",
data_source_type="hdfs",
url="hdfs://test_host/"))
sahara_ctx.setup()
mock_sahara.data_sources.create.assert_has_calls(
input_ds_crete_calls)
sahara_ctx.cleanup()
mock_cleanup.assert_has_calls((
mock.call(names=["swift.object", "swift.container"],
users=self.context["users"],
superclass=swift_utils.SwiftScenario,
task_id=self.context["owner_id"]),
mock.call(
names=["sahara.data_sources"],
users=self.context["users"],
superclass=sahara_input_data_sources.SaharaInputDataSources,
task_id=self.context["owner_id"])))
@mock.patch("requests.get")
@mock.patch("%s.sahara_input_data_sources.osclients" % CTX)
@mock.patch("%s.sahara_input_data_sources.resource_manager" % CTX)
@mock.patch("%s.sahara_input_data_sources.swift_utils" % CTX)
def test_setup_inputs_swift(self, mock_swift_utils, mock_resource_manager,
mock_osclients, mock_get):
mock_swift_scenario = mock.Mock()
mock_swift_scenario._create_container.side_effect = (
lambda container_name: "container_%s" % container_name)
mock_swift_scenario._upload_object.side_effect = iter(
["uploaded_%d" % i for i in range(10)])
mock_swift_utils.SwiftScenario.return_value = mock_swift_scenario
self.context.update({
"config": {
"users": {
"tenants": self.tenants_num,
"users_per_tenant": self.users_per_tenant,
},
"sahara_input_data_sources": {
"input_type": "swift",
"input_url": "swift://rally.sahara/input_url",
"swift_files": [{
"name": "first",
"download_url": "http://host"}]
},
},
"admin": {"credential": mock.MagicMock()},
"task": mock.MagicMock(),
"users": self.users,
"tenants": self.tenants
})
sahara_ctx = sahara_input_data_sources.SaharaInputDataSources(
self.context)
sahara_ctx.generate_random_name = mock.Mock(
side_effect=iter(["random_name_%d" % i for i in range(10)]))
input_ds_create_calls = []
for i in range(self.tenants_num):
input_ds_create_calls.append(mock.call(
name="random_name_%d" % i,
description="",
data_source_type="swift",
url="swift://rally.sahara/input_url",
credential_user="tenant_%d_user_0_name" % i,
credential_pass="foo_password"
))
sahara_ctx.setup()
self.assertEqual(
input_ds_create_calls,
(mock_osclients.Clients.return_value.sahara.return_value
.data_sources.create.mock_calls))
self.assertEqual({"container_name": "container_rally_rally",
"swift_objects": ["uploaded_0", "uploaded_1"]},
self.context["sahara"])
sahara_ctx.cleanup()
mock_resource_manager.cleanup.assert_has_calls((
mock.call(names=["swift.object", "swift.container"],
users=self.context["users"],
superclass=mock_swift_utils.SwiftScenario,
task_id=self.context["owner_id"]),
mock.call(
names=["sahara.data_sources"],
users=self.context["users"],
superclass=sahara_input_data_sources.SaharaInputDataSources,
task_id=self.context["owner_id"])))

View File

@@ -1,144 +0,0 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
from rally_openstack.task.contexts.sahara import sahara_job_binaries
from rally_openstack.task.scenarios.sahara import utils as sahara_utils
from tests.unit import test
CTX = "rally_openstack.task.contexts.sahara"
class SaharaJobBinariesTestCase(test.ScenarioTestCase):
def setUp(self):
super(SaharaJobBinariesTestCase, self).setUp()
self.tenants_num = 2
self.users_per_tenant = 2
self.users = self.tenants_num * self.users_per_tenant
self.task = mock.MagicMock()
self.tenants = {}
self.users_key = []
for i in range(self.tenants_num):
self.tenants[str(i)] = {"id": str(i), "name": str(i),
"sahara": {"image": "42"}}
for j in range(self.users_per_tenant):
self.users_key.append({"id": "%s_%s" % (str(i), str(j)),
"tenant_id": str(i),
"credential": "credential"})
self.user_key = [{"id": i, "tenant_id": j, "credential": "credential"}
for j in range(self.tenants_num)
for i in range(self.users_per_tenant)]
self.context.update({
"config": {
"users": {
"tenants": self.tenants_num,
"users_per_tenant": self.users_per_tenant,
},
"sahara_job_binaries": {
"libs": [
{
"name": "test.jar",
"download_url": "http://example.com/test.jar"
}
],
"mains": [
{
"name": "test.jar",
"download_url": "http://example.com/test.jar"
}
]
},
},
"admin": {"credential": mock.MagicMock()},
"task": mock.MagicMock(),
"users": self.users_key,
"tenants": self.tenants
})
@mock.patch("%s.sahara_job_binaries.resource_manager.cleanup" % CTX)
@mock.patch(("%s.sahara_job_binaries.SaharaJobBinaries."
"download_and_save_lib") % CTX)
@mock.patch("%s.sahara_job_binaries.osclients" % CTX)
def test_setup_and_cleanup(
self,
mock_osclients,
mock_sahara_job_binaries_download_and_save_lib,
mock_cleanup):
mock_sahara = mock_osclients.Clients(mock.MagicMock()).sahara()
sahara_ctx = sahara_job_binaries.SaharaJobBinaries(self.context)
download_calls = []
for i in range(self.tenants_num):
download_calls.append(mock.call(
sahara=mock_sahara,
lib_type="mains",
name="test.jar",
download_url="http://example.com/test.jar",
tenant_id=str(i)))
download_calls.append(mock.call(
sahara=mock_sahara,
lib_type="libs",
name="test.jar",
download_url="http://example.com/test.jar",
tenant_id=str(i)))
sahara_ctx.setup()
(mock_sahara_job_binaries_download_and_save_lib.
assert_has_calls(download_calls))
sahara_ctx.cleanup()
mock_cleanup.assert_called_once_with(
names=["sahara.job_binary_internals", "sahara.job_binaries"],
users=self.context["users"],
superclass=sahara_utils.SaharaScenario,
task_id=self.context["task"]["uuid"])
@mock.patch("%s.sahara_job_binaries.requests" % CTX)
@mock.patch("%s.sahara_job_binaries.osclients" % CTX)
def test_download_and_save_lib(self, mock_osclients, mock_requests):
mock_requests.get.content.return_value = "some_binary_content"
mock_sahara = mock_osclients.Clients(mock.MagicMock()).sahara()
mock_sahara.job_binary_internals.create.return_value = (
mock.MagicMock(id=42))
sahara_ctx = sahara_job_binaries.SaharaJobBinaries(self.context)
sahara_ctx.context["tenants"]["0"]["sahara"] = {"mains": []}
sahara_ctx.context["tenants"]["0"]["sahara"]["libs"] = []
sahara_ctx.download_and_save_lib(sahara=mock_sahara,
lib_type="mains",
name="test_binary",
download_url="http://somewhere",
tenant_id="0")
sahara_ctx.download_and_save_lib(sahara=mock_sahara,
lib_type="libs",
name="test_binary_2",
download_url="http://somewhere",
tenant_id="0")
mock_requests.get.assert_called_once_with("http://somewhere")

View File

@@ -1,154 +0,0 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
from rally_openstack.common import credential as oscredential
from rally_openstack.task.contexts.sahara import sahara_output_data_sources
from tests.unit import test
CTX = "rally_openstack.task.contexts.sahara"
class SaharaOutputDataSourcesTestCase(test.ScenarioTestCase):
def setUp(self):
super(SaharaOutputDataSourcesTestCase, self).setUp()
fake_dict = oscredential.OpenStackCredential(
"http://fake.example.org:5000/v2.0/", "user", "passwd")
self.tenants_num = 2
self.users_per_tenant = 2
self.users = self.tenants_num * self.users_per_tenant
self.task = mock.MagicMock()
self.tenants = {}
self.users_key = []
for i in range(self.tenants_num):
self.tenants[str(i)] = {"id": str(i), "name": str(i),
"sahara": {"image": "42"}}
for j in range(self.users_per_tenant):
self.users_key.append({"id": "%s_%s" % (str(i), str(j)),
"tenant_id": str(i),
"credential": fake_dict})
self.user_key = [{"id": i, "tenant_id": j, "credential": "credential"}
for j in range(self.tenants_num)
for i in range(self.users_per_tenant)]
self.context.update({
"config": {
"users": {
"tenants": self.tenants_num,
"users_per_tenant": self.users_per_tenant,
},
"sahara_output_data_sources": {
"output_type": "hdfs",
"output_url_prefix": "hdfs://test_host/",
},
},
"admin": {"credential": mock.MagicMock()},
"task": mock.MagicMock(),
"users": self.users_key,
"tenants": self.tenants
})
def check_setup(self):
context = sahara_output_data_sources.SaharaOutputDataSources.context[
"sahara"]["output_conf"]
self.assertIsNotNone(context.get("output_type"))
self.assertIsNotNone(context.get("output_url_prefix"))
@mock.patch("%s.sahara_output_data_sources.resource_manager.cleanup" % CTX)
@mock.patch("%s.sahara_output_data_sources.osclients" % CTX)
def test_setup_and_cleanup_hdfs(self, mock_osclients, mock_cleanup):
mock_sahara = mock_osclients.Clients(mock.MagicMock()).sahara()
mock_sahara.data_sources.create.return_value = mock.MagicMock(
id=42)
sahara_ctx = sahara_output_data_sources.SaharaOutputDataSources(
self.context)
sahara_ctx.generate_random_name = mock.Mock()
output_ds_crete_calls = []
for i in range(self.tenants_num):
output_ds_crete_calls.append(mock.call(
name=sahara_ctx.generate_random_name.return_value,
description="",
data_source_type="hdfs",
url="hdfs://test_host/"))
sahara_ctx.setup()
mock_sahara.data_sources.create.assert_has_calls(
output_ds_crete_calls)
sahara_ctx.cleanup()
mock_cleanup.assert_has_calls((
mock.call(
names=["swift.object", "swift.container"],
users=self.context["users"],
superclass=sahara_output_data_sources.SaharaOutputDataSources,
task_id=self.context["owner_id"]),
mock.call(
names=["sahara.data_sources"],
users=self.context["users"],
superclass=sahara_output_data_sources.SaharaOutputDataSources,
task_id=self.context["owner_id"])))
@mock.patch("%s.sahara_output_data_sources.osclients" % CTX)
def test_setup_inputs_swift(self, mock_osclients):
mock_sahara = mock_osclients.Clients(mock.MagicMock()).sahara()
self.context.update({
"config": {
"users": {
"tenants": self.tenants_num,
"users_per_tenant": self.users_per_tenant,
},
"sahara_output_data_sources": {
"output_type": "swift",
"output_url_prefix": "rally",
},
},
"admin": {"credential": mock.MagicMock()},
"task": mock.MagicMock(),
"users": self.users_key,
"tenants": self.tenants,
"user_choice_method": "random",
})
sahara_ctx = sahara_output_data_sources.SaharaOutputDataSources(
self.context)
sahara_ctx.generate_random_name = mock.Mock(return_value="random_name")
output_ds_crete_calls = []
for i in range(self.tenants_num):
output_ds_crete_calls.append(mock.call(
name="random_name",
description="",
data_source_type="swift",
url="swift://random_name.sahara/",
credential_user="user",
credential_pass="passwd"
))
sahara_ctx.setup()
mock_sahara.data_sources.create.assert_has_calls(
output_ds_crete_calls)
sahara_ctx.cleanup()

View File

@@ -1,170 +0,0 @@
# Copyright 2014: Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
from rally_openstack.task.scenarios.sahara import clusters
from tests.unit import test
BASE = "rally_openstack.task.scenarios.sahara.clusters"
class SaharaClustersTestCase(test.ScenarioTestCase):
@mock.patch("%s.CreateAndDeleteCluster._delete_cluster" % BASE)
@mock.patch("%s.CreateAndDeleteCluster._launch_cluster" % BASE,
return_value=mock.MagicMock(id=42))
def test_create_and_delete_cluster(self,
mock_launch_cluster,
mock_delete_cluster):
scenario = clusters.CreateAndDeleteCluster(self.context)
scenario.context = {
"tenant": {
"sahara": {
"image": "test_image",
}
}
}
scenario.run(master_flavor="test_flavor_m",
worker_flavor="test_flavor_w",
workers_count=5,
plugin_name="test_plugin",
hadoop_version="test_version")
mock_launch_cluster.assert_called_once_with(
flavor_id=None,
master_flavor_id="test_flavor_m",
worker_flavor_id="test_flavor_w",
image_id="test_image",
workers_count=5,
plugin_name="test_plugin",
hadoop_version="test_version",
floating_ip_pool=None,
volumes_per_node=None,
volumes_size=None,
auto_security_group=None,
security_groups=None,
node_configs=None,
cluster_configs=None,
enable_anti_affinity=False,
enable_proxy=False,
use_autoconfig=True)
mock_delete_cluster.assert_called_once_with(
mock_launch_cluster.return_value)
@mock.patch("%s.CreateAndDeleteCluster._delete_cluster" % BASE)
@mock.patch("%s.CreateAndDeleteCluster._launch_cluster" % BASE,
return_value=mock.MagicMock(id=42))
def test_create_and_delete_cluster_deprecated_flavor(self,
mock_launch_cluster,
mock_delete_cluster):
scenario = clusters.CreateAndDeleteCluster(self.context)
scenario.context = {
"tenant": {
"sahara": {
"image": "test_image",
}
}
}
scenario.run(flavor="test_deprecated_arg",
master_flavor=None,
worker_flavor=None,
workers_count=5,
plugin_name="test_plugin",
hadoop_version="test_version")
mock_launch_cluster.assert_called_once_with(
flavor_id="test_deprecated_arg",
master_flavor_id=None,
worker_flavor_id=None,
image_id="test_image",
workers_count=5,
plugin_name="test_plugin",
hadoop_version="test_version",
floating_ip_pool=None,
volumes_per_node=None,
volumes_size=None,
auto_security_group=None,
security_groups=None,
node_configs=None,
cluster_configs=None,
enable_anti_affinity=False,
enable_proxy=False,
use_autoconfig=True)
mock_delete_cluster.assert_called_once_with(
mock_launch_cluster.return_value)
@mock.patch("%s.CreateScaleDeleteCluster._delete_cluster" % BASE)
@mock.patch("%s.CreateScaleDeleteCluster._scale_cluster" % BASE)
@mock.patch("%s.CreateScaleDeleteCluster._launch_cluster" % BASE,
return_value=mock.MagicMock(id=42))
def test_create_scale_delete_cluster(self,
mock_launch_cluster,
mock_scale_cluster,
mock_delete_cluster):
self.clients("sahara").clusters.get.return_value = mock.MagicMock(
id=42, status="active"
)
scenario = clusters.CreateScaleDeleteCluster(self.context)
scenario.context = {
"tenant": {
"sahara": {
"image": "test_image",
}
}
}
scenario.run(master_flavor="test_flavor_m",
worker_flavor="test_flavor_w",
workers_count=5,
deltas=[1, -1],
plugin_name="test_plugin",
hadoop_version="test_version")
mock_launch_cluster.assert_called_once_with(
flavor_id=None,
master_flavor_id="test_flavor_m",
worker_flavor_id="test_flavor_w",
image_id="test_image",
workers_count=5,
plugin_name="test_plugin",
hadoop_version="test_version",
floating_ip_pool=None,
volumes_per_node=None,
volumes_size=None,
auto_security_group=None,
security_groups=None,
node_configs=None,
cluster_configs=None,
enable_anti_affinity=False,
enable_proxy=False,
use_autoconfig=True)
mock_scale_cluster.assert_has_calls([
mock.call(
self.clients("sahara").clusters.get.return_value,
1),
mock.call(
self.clients("sahara").clusters.get.return_value,
-1),
])
mock_delete_cluster.assert_called_once_with(
self.clients("sahara").clusters.get.return_value)

View File

@@ -1,230 +0,0 @@
# Copyright 2014: Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
from rally.common import cfg
from rally_openstack.task.scenarios.sahara import jobs
from tests.unit import test
CONF = cfg.CONF
BASE = "rally_openstack.task.scenarios.sahara.jobs"
class SaharaJobTestCase(test.ScenarioTestCase):
def setUp(self):
super(SaharaJobTestCase, self).setUp()
self.context = test.get_test_context()
CONF.set_override("sahara_cluster_check_interval", 0, "openstack")
CONF.set_override("sahara_job_check_interval", 0, "openstack")
@mock.patch("%s.CreateLaunchJob._run_job_execution" % BASE)
def test_create_launch_job_java(self, mock_run_job):
self.clients("sahara").jobs.create.return_value = mock.MagicMock(
id="42")
self.context.update({
"tenant": {
"sahara": {
"image": "test_image",
"mains": ["main_42"],
"libs": ["lib_42"],
"cluster": "cl_42",
"input": "in_42"
}
}
})
scenario = jobs.CreateLaunchJob(self.context)
scenario.generate_random_name = mock.Mock(
return_value="job_42")
scenario.run(job_type="java",
configs={"conf_key": "conf_val"},
job_idx=0)
self.clients("sahara").jobs.create.assert_called_once_with(
name="job_42",
type="java",
description="",
mains=["main_42"],
libs=["lib_42"]
)
mock_run_job.assert_called_once_with(
job_id="42",
cluster_id="cl_42",
input_id=None,
output_id=None,
configs={"conf_key": "conf_val"},
job_idx=0
)
@mock.patch("%s.CreateLaunchJob._run_job_execution" % BASE)
@mock.patch("%s.CreateLaunchJob._create_output_ds" % BASE,
return_value=mock.MagicMock(id="out_42"))
def test_create_launch_job_pig(self,
mock_create_output,
mock_run_job):
self.clients("sahara").jobs.create.return_value = mock.MagicMock(
id="42")
self.context.update({
"tenant": {
"sahara": {
"image": "test_image",
"mains": ["main_42"],
"libs": ["lib_42"],
"cluster": "cl_42",
"input": "in_42"
}
}
})
scenario = jobs.CreateLaunchJob(self.context)
scenario.generate_random_name = mock.Mock(return_value="job_42")
scenario.run(job_type="pig",
configs={"conf_key": "conf_val"},
job_idx=0)
self.clients("sahara").jobs.create.assert_called_once_with(
name="job_42",
type="pig",
description="",
mains=["main_42"],
libs=["lib_42"]
)
mock_run_job.assert_called_once_with(
job_id="42",
cluster_id="cl_42",
input_id="in_42",
output_id="out_42",
configs={"conf_key": "conf_val"},
job_idx=0
)
@mock.patch("%s.CreateLaunchJob._run_job_execution" % BASE)
@mock.patch("%s.CreateLaunchJob.generate_random_name" % BASE,
return_value="job_42")
def test_create_launch_job_sequence(self,
mock__random_name,
mock_run_job):
self.clients("sahara").jobs.create.return_value = mock.MagicMock(
id="42")
self.context.update({
"tenant": {
"sahara": {
"image": "test_image",
"mains": ["main_42"],
"libs": ["lib_42"],
"cluster": "cl_42",
"input": "in_42"
}
}
})
scenario = jobs.CreateLaunchJobSequence(self.context)
scenario.run(
jobs=[
{
"job_type": "java",
"configs": {"conf_key": "conf_val"}
}, {
"job_type": "java",
"configs": {"conf_key2": "conf_val2"}
}])
jobs_create_call = mock.call(name="job_42",
type="java",
description="",
mains=["main_42"],
libs=["lib_42"])
self.clients("sahara").jobs.create.assert_has_calls(
[jobs_create_call, jobs_create_call])
mock_run_job.assert_has_calls([
mock.call(job_id="42",
cluster_id="cl_42",
input_id=None,
output_id=None,
configs={"conf_key": "conf_val"},
job_idx=0),
mock.call(job_id="42",
cluster_id="cl_42",
input_id=None,
output_id=None,
configs={"conf_key2": "conf_val2"},
job_idx=1)
])
@mock.patch("%s.CreateLaunchJob.generate_random_name" % BASE,
return_value="job_42")
@mock.patch("%s.CreateLaunchJobSequenceWithScaling"
"._scale_cluster" % BASE)
@mock.patch("%s.CreateLaunchJob._run_job_execution" % BASE)
def test_create_launch_job_sequence_with_scaling(
self,
mock_run_job,
mock_create_launch_job_sequence_with_scaling__scale_cluster,
mock_create_launch_job_generate_random_name
):
self.clients("sahara").jobs.create.return_value = mock.MagicMock(
id="42")
self.clients("sahara").clusters.get.return_value = mock.MagicMock(
id="cl_42", status="active")
self.context.update({
"tenant": {
"sahara": {
"image": "test_image",
"mains": ["main_42"],
"libs": ["lib_42"],
"cluster": "cl_42",
"input": "in_42"
}
}
})
scenario = jobs.CreateLaunchJobSequenceWithScaling(self.context)
scenario.run(
jobs=[
{
"job_type": "java",
"configs": {"conf_key": "conf_val"}
}, {
"job_type": "java",
"configs": {"conf_key2": "conf_val2"}
}],
deltas=[1, -1])
jobs_create_call = mock.call(name="job_42",
type="java",
description="",
mains=["main_42"],
libs=["lib_42"])
self.clients("sahara").jobs.create.assert_has_calls(
[jobs_create_call, jobs_create_call])
je_0 = mock.call(job_id="42", cluster_id="cl_42", input_id=None,
output_id=None, configs={"conf_key": "conf_val"},
job_idx=0)
je_1 = mock.call(job_id="42", cluster_id="cl_42", input_id=None,
output_id=None,
configs={"conf_key2": "conf_val2"}, job_idx=1)
mock_run_job.assert_has_calls([je_0, je_1, je_0, je_1, je_0, je_1])

View File

@@ -1,81 +0,0 @@
# Copyright 2014: Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
from rally_openstack.task.scenarios.sahara import node_group_templates as ngts
from tests.unit import test
BASE = "rally_openstack.task.scenarios.sahara.node_group_templates"
class SaharaNodeGroupTemplatesTestCase(test.TestCase):
def setUp(self):
super(SaharaNodeGroupTemplatesTestCase, self).setUp()
self.context = test.get_test_context()
@mock.patch("%s.CreateAndListNodeGroupTemplates"
"._list_node_group_templates" % BASE)
@mock.patch("%s.CreateAndListNodeGroupTemplates"
"._create_master_node_group_template" % BASE)
@mock.patch("%s.CreateAndListNodeGroupTemplates"
"._create_worker_node_group_template" % BASE)
def test_create_and_list_node_group_templates(self,
mock_create_worker,
mock_create_master,
mock_list_group):
ngts.CreateAndListNodeGroupTemplates(self.context).run(
"test_flavor", "test_plugin", "test_version")
mock_create_master.assert_called_once_with(
flavor_id="test_flavor",
plugin_name="test_plugin",
hadoop_version="test_version",
use_autoconfig=True)
mock_create_worker.assert_called_once_with(
flavor_id="test_flavor",
plugin_name="test_plugin",
hadoop_version="test_version",
use_autoconfig=True)
mock_list_group.assert_called_once_with()
@mock.patch("%s.CreateDeleteNodeGroupTemplates"
"._delete_node_group_template" % BASE)
@mock.patch("%s.CreateDeleteNodeGroupTemplates"
"._create_master_node_group_template" % BASE)
@mock.patch("%s.CreateDeleteNodeGroupTemplates"
"._create_worker_node_group_template" % BASE)
def test_create_delete_node_group_templates(self,
mock_create_worker,
mock_create_master,
mock_delete_group):
ngts.CreateDeleteNodeGroupTemplates(self.context).run(
"test_flavor", "test_plugin", "test_version")
mock_create_master.assert_called_once_with(
flavor_id="test_flavor",
plugin_name="test_plugin",
hadoop_version="test_version",
use_autoconfig=True)
mock_create_worker.assert_called_once_with(
flavor_id="test_flavor",
plugin_name="test_plugin",
hadoop_version="test_version",
use_autoconfig=True)
mock_delete_group.assert_has_calls(calls=[
mock.call(mock_create_master.return_value),
mock.call(mock_create_worker.return_value)])

View File

@@ -1,542 +0,0 @@
# Copyright 2014: Mirantis Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
from oslo_utils import uuidutils
from saharaclient.api import base as sahara_base
from rally.common import cfg
from rally import exceptions
from rally_openstack.common import consts
from rally_openstack.task.scenarios.sahara import utils
from tests.unit import test
CONF = cfg.CONF
SAHARA_UTILS = "rally_openstack.task.scenarios.sahara.utils"
class SaharaScenarioTestCase(test.ScenarioTestCase):
# NOTE(stpierre): the Sahara utils generally do funny stuff with
# wait_for() calls -- frequently the is_ready and
# update_resource arguments are functions defined in the Sahara
# utils themselves instead of the more standard resource_is() and
# get_from_manager() calls. As a result, the tests below do more
# integrated/functional testing of wait_for() calls, and we can't
# just mock out wait_for and friends the way we usually do.
patch_task_utils = False
def setUp(self):
super(SaharaScenarioTestCase, self).setUp()
CONF.set_override("sahara_cluster_check_interval", 0, "openstack")
CONF.set_override("sahara_job_check_interval", 0, "openstack")
def test_list_node_group_templates(self):
ngts = []
self.clients("sahara").node_group_templates.list.return_value = ngts
scenario = utils.SaharaScenario(self.context)
return_ngts_list = scenario._list_node_group_templates()
self.assertEqual(ngts, return_ngts_list)
self._test_atomic_action_timer(scenario.atomic_actions(),
"sahara.list_node_group_templates")
@mock.patch(SAHARA_UTILS + ".SaharaScenario.generate_random_name",
return_value="random_name")
@mock.patch(SAHARA_UTILS + ".sahara_consts")
def test_create_node_group_templates(
self, mock_sahara_consts,
mock_generate_random_name):
scenario = utils.SaharaScenario(self.context)
mock_processes = {
"test_plugin": {
"test_version": {
"master": ["p1"],
"worker": ["p2"]
}
}
}
mock_sahara_consts.NODE_PROCESSES = mock_processes
scenario._create_master_node_group_template(
flavor_id="test_flavor",
plugin_name="test_plugin",
hadoop_version="test_version",
use_autoconfig=True
)
scenario._create_worker_node_group_template(
flavor_id="test_flavor",
plugin_name="test_plugin",
hadoop_version="test_version",
use_autoconfig=True
)
create_calls = [
mock.call(
name="random_name",
plugin_name="test_plugin",
hadoop_version="test_version",
flavor_id="test_flavor",
node_processes=["p1"],
use_autoconfig=True),
mock.call(
name="random_name",
plugin_name="test_plugin",
hadoop_version="test_version",
flavor_id="test_flavor",
node_processes=["p2"],
use_autoconfig=True
)]
self.clients("sahara").node_group_templates.create.assert_has_calls(
create_calls)
self._test_atomic_action_timer(
scenario.atomic_actions(),
"sahara.create_master_node_group_template")
self._test_atomic_action_timer(
scenario.atomic_actions(),
"sahara.create_worker_node_group_template")
def test_delete_node_group_templates(self):
scenario = utils.SaharaScenario(self.context)
ng = mock.MagicMock(id=42)
scenario._delete_node_group_template(ng)
delete_mock = self.clients("sahara").node_group_templates.delete
delete_mock.assert_called_once_with(42)
self._test_atomic_action_timer(scenario.atomic_actions(),
"sahara.delete_node_group_template")
@mock.patch(SAHARA_UTILS + ".SaharaScenario.generate_random_name",
return_value="random_name")
@mock.patch(SAHARA_UTILS + ".sahara_consts")
def test_launch_cluster(self, mock_sahara_consts,
mock_generate_random_name):
self.context.update({
"tenant": {
"networks": [
{
"id": "test_neutron_id",
"router_id": "test_router_id"
}
]
}
})
self.clients("services").values.return_value = [
consts.Service.NEUTRON
]
scenario = utils.SaharaScenario(context=self.context)
mock_processes = {
"test_plugin": {
"test_version": {
"master": ["p1"],
"worker": ["p2"]
}
}
}
mock_configs = {
"test_plugin": {
"test_version": {
"target": "HDFS",
"config_name": "dfs.replication"
}
}
}
floating_ip_pool_uuid = uuidutils.generate_uuid()
node_groups = [
{
"name": "master-ng",
"flavor_id": "test_flavor_m",
"node_processes": ["p1"],
"floating_ip_pool": floating_ip_pool_uuid,
"count": 1,
"auto_security_group": True,
"security_groups": ["g1", "g2"],
"node_configs": {"HDFS": {"local_config": "local_value"}},
"use_autoconfig": True,
}, {
"name": "worker-ng",
"flavor_id": "test_flavor_w",
"node_processes": ["p2"],
"floating_ip_pool": floating_ip_pool_uuid,
"volumes_per_node": 5,
"volumes_size": 10,
"count": 42,
"auto_security_group": True,
"security_groups": ["g1", "g2"],
"node_configs": {"HDFS": {"local_config": "local_value"}},
"use_autoconfig": True,
}
]
mock_sahara_consts.NODE_PROCESSES = mock_processes
mock_sahara_consts.REPLICATION_CONFIGS = mock_configs
self.clients("sahara").clusters.create.return_value.id = (
"test_cluster_id")
self.clients("sahara").clusters.get.return_value.status = (
"active")
scenario._launch_cluster(
plugin_name="test_plugin",
hadoop_version="test_version",
master_flavor_id="test_flavor_m",
worker_flavor_id="test_flavor_w",
image_id="test_image",
floating_ip_pool=floating_ip_pool_uuid,
volumes_per_node=5,
volumes_size=10,
auto_security_group=True,
security_groups=["g1", "g2"],
workers_count=42,
node_configs={"HDFS": {"local_config": "local_value"}},
use_autoconfig=True
)
self.clients("sahara").clusters.create.assert_called_once_with(
name="random_name",
plugin_name="test_plugin",
hadoop_version="test_version",
node_groups=node_groups,
default_image_id="test_image",
cluster_configs={"HDFS": {"dfs.replication": 3}},
net_id="test_neutron_id",
anti_affinity=None,
use_autoconfig=True
)
self._test_atomic_action_timer(scenario.atomic_actions(),
"sahara.launch_cluster")
@mock.patch(SAHARA_UTILS + ".SaharaScenario.generate_random_name",
return_value="random_name")
@mock.patch(SAHARA_UTILS + ".sahara_consts")
def test_launch_cluster_with_proxy(self, mock_sahara_consts,
mock_generate_random_name):
context = {
"tenant": {
"networks": [
{
"id": "test_neutron_id",
"router_id": "test_router_id"
}
]
}
}
self.clients("services").values.return_value = [
consts.Service.NEUTRON
]
scenario = utils.SaharaScenario(context=context)
mock_processes = {
"test_plugin": {
"test_version": {
"master": ["p1"],
"worker": ["p2"]
}
}
}
mock_configs = {
"test_plugin": {
"test_version": {
"target": "HDFS",
"config_name": "dfs.replication"
}
}
}
floating_ip_pool_uuid = uuidutils.generate_uuid()
node_groups = [
{
"name": "master-ng",
"flavor_id": "test_flavor_m",
"node_processes": ["p1"],
"floating_ip_pool": floating_ip_pool_uuid,
"count": 1,
"auto_security_group": True,
"security_groups": ["g1", "g2"],
"node_configs": {"HDFS": {"local_config": "local_value"}},
"is_proxy_gateway": True,
"use_autoconfig": True,
}, {
"name": "worker-ng",
"flavor_id": "test_flavor_w",
"node_processes": ["p2"],
"volumes_per_node": 5,
"volumes_size": 10,
"count": 40,
"auto_security_group": True,
"security_groups": ["g1", "g2"],
"node_configs": {"HDFS": {"local_config": "local_value"}},
"use_autoconfig": True,
}, {
"name": "proxy-ng",
"flavor_id": "test_flavor_w",
"node_processes": ["p2"],
"floating_ip_pool": floating_ip_pool_uuid,
"volumes_per_node": 5,
"volumes_size": 10,
"count": 2,
"auto_security_group": True,
"security_groups": ["g1", "g2"],
"node_configs": {"HDFS": {"local_config": "local_value"}},
"is_proxy_gateway": True,
"use_autoconfig": True,
}
]
mock_sahara_consts.NODE_PROCESSES = mock_processes
mock_sahara_consts.REPLICATION_CONFIGS = mock_configs
self.clients("sahara").clusters.create.return_value = mock.MagicMock(
id="test_cluster_id")
self.clients("sahara").clusters.get.return_value = mock.MagicMock(
status="active")
scenario._launch_cluster(
plugin_name="test_plugin",
hadoop_version="test_version",
master_flavor_id="test_flavor_m",
worker_flavor_id="test_flavor_w",
image_id="test_image",
floating_ip_pool=floating_ip_pool_uuid,
volumes_per_node=5,
volumes_size=10,
auto_security_group=True,
security_groups=["g1", "g2"],
workers_count=42,
node_configs={"HDFS": {"local_config": "local_value"}},
enable_proxy=True,
use_autoconfig=True
)
self.clients("sahara").clusters.create.assert_called_once_with(
name="random_name",
plugin_name="test_plugin",
hadoop_version="test_version",
node_groups=node_groups,
default_image_id="test_image",
cluster_configs={"HDFS": {"dfs.replication": 3}},
net_id="test_neutron_id",
anti_affinity=None,
use_autoconfig=True
)
self._test_atomic_action_timer(scenario.atomic_actions(),
"sahara.launch_cluster")
@mock.patch(SAHARA_UTILS + ".SaharaScenario.generate_random_name",
return_value="random_name")
@mock.patch(SAHARA_UTILS + ".sahara_consts")
def test_launch_cluster_error(self, mock_sahara_consts,
mock_generate_random_name):
scenario = utils.SaharaScenario(self.context)
mock_processes = {
"test_plugin": {
"test_version": {
"master": ["p1"],
"worker": ["p2"]
}
}
}
mock_configs = {
"test_plugin": {
"test_version": {
"target": "HDFS",
"config_name": "dfs.replication"
}
}
}
mock_sahara_consts.NODE_PROCESSES = mock_processes
mock_sahara_consts.REPLICATION_CONFIGS = mock_configs
self.clients("sahara").clusters.create.return_value = mock.MagicMock(
id="test_cluster_id")
self.clients("sahara").clusters.get.return_value = mock.MagicMock(
status="error")
self.assertRaises(exceptions.GetResourceErrorStatus,
scenario._launch_cluster,
plugin_name="test_plugin",
hadoop_version="test_version",
master_flavor_id="test_flavor_m",
worker_flavor_id="test_flavor_w",
image_id="test_image",
floating_ip_pool="test_pool",
volumes_per_node=5,
volumes_size=10,
workers_count=42,
node_configs={"HDFS": {"local_config":
"local_value"}})
def test_scale_cluster(self):
scenario = utils.SaharaScenario(self.context)
cluster = mock.MagicMock(id=42, node_groups=[{
"name": "random_master",
"count": 1
}, {
"name": "random_worker",
"count": 41
}])
self.clients("sahara").clusters.get.return_value = mock.MagicMock(
id=42,
status="active")
expected_scale_object = {
"resize_node_groups": [{
"name": "random_worker",
"count": 42
}]
}
scenario._scale_cluster(cluster, 1)
self.clients("sahara").clusters.scale.assert_called_once_with(
42, expected_scale_object)
def test_delete_cluster(self):
scenario = utils.SaharaScenario(self.context)
cluster = mock.MagicMock(id=42)
self.clients("sahara").clusters.get.side_effect = [
cluster, sahara_base.APIException()
]
scenario._delete_cluster(cluster)
delete_mock = self.clients("sahara").clusters.delete
delete_mock.assert_called_once_with(42)
cl_get_expected = mock.call(42)
self.clients("sahara").clusters.get.assert_has_calls([cl_get_expected,
cl_get_expected])
self._test_atomic_action_timer(scenario.atomic_actions(),
"sahara.delete_cluster")
@mock.patch(SAHARA_UTILS + ".SaharaScenario.generate_random_name",
return_value="42")
def test_create_output_ds(self, mock_generate_random_name):
self.context.update({
"sahara": {
"output_conf": {
"output_type": "hdfs",
"output_url_prefix": "hdfs://test_out/"
}
}
})
scenario = utils.SaharaScenario(self.context)
scenario._create_output_ds()
self.clients("sahara").data_sources.create.assert_called_once_with(
name="42",
description="",
data_source_type="hdfs",
url="hdfs://test_out/42"
)
@mock.patch(SAHARA_UTILS + ".SaharaScenario.generate_random_name",
return_value="42")
def test_create_output_ds_swift(self, mock_generate_random_name):
self.context.update({
"sahara": {
"output_conf": {
"output_type": "swift",
"output_url_prefix": "swift://test_out/"
}
}
})
scenario = utils.SaharaScenario(self.context)
self.assertRaises(exceptions.RallyException,
scenario._create_output_ds)
def test_run_job_execution(self):
self.clients("sahara").job_executions.get.side_effect = [
mock.MagicMock(info={"status": "pending"}, id="42"),
mock.MagicMock(info={"status": "SUCCESS"}, id="42")]
self.clients("sahara").job_executions.create.return_value = (
mock.MagicMock(id="42"))
scenario = utils.SaharaScenario(self.context)
scenario._run_job_execution(job_id="test_job_id",
cluster_id="test_cluster_id",
input_id="test_input_id",
output_id="test_output_id",
configs={"k": "v"},
job_idx=0)
self.clients("sahara").job_executions.create.assert_called_once_with(
job_id="test_job_id",
cluster_id="test_cluster_id",
input_id="test_input_id",
output_id="test_output_id",
configs={"k": "v"}
)
je_get_expected = mock.call("42")
self.clients("sahara").job_executions.get.assert_has_calls(
[je_get_expected, je_get_expected]
)
def test_run_job_execution_fail(self):
self.clients("sahara").job_executions.get.side_effect = [
mock.MagicMock(info={"status": "pending"}, id="42"),
mock.MagicMock(info={"status": "killed"}, id="42")]
self.clients("sahara").job_executions.create.return_value = (
mock.MagicMock(id="42"))
scenario = utils.SaharaScenario(self.context)
self.assertRaises(exceptions.RallyException,
scenario._run_job_execution,
job_id="test_job_id",
cluster_id="test_cluster_id",
input_id="test_input_id",
output_id="test_output_id",
configs={"k": "v"},
job_idx=0)
self.clients("sahara").job_executions.create.assert_called_once_with(
job_id="test_job_id",
cluster_id="test_cluster_id",
input_id="test_input_id",
output_id="test_output_id",
configs={"k": "v"}
)

View File

@@ -76,18 +76,6 @@ class TempestConfigfileManagerTestCase(test.TestCase):
for item in expected:
self.assertIn(item, result)
@ddt.data("data_processing", "data-processing")
def test__configure_data_processing(self, service_type):
self.tempest.available_services = ["sahara"]
self.tempest.clients.services.return_value = {
service_type: "sahara"}
self.tempest.conf.add_section("data-processing")
self.tempest._configure_data_processing()
self.assertEqual(service_type,
self.tempest.conf.get("data-processing",
"catalog_type"))
@ddt.data(
# The prefix "ex_" is abbreviation of "expected"
# case #1: both versions are discoverable; version is in the auth_url
@@ -247,15 +235,14 @@ class TempestConfigfileManagerTestCase(test.TestCase):
self.assertIn(item, result)
def test__configure_service_available(self):
available_services = ("nova", "cinder", "glance", "sahara")
available_services = ("nova", "cinder", "glance")
self.tempest.available_services = available_services
self.tempest.conf.add_section("service_available")
self.tempest._configure_service_available()
expected = (
("neutron", "False"), ("heat", "False"), ("nova", "True"),
("swift", "False"), ("cinder", "True"), ("sahara", "True"),
("glance", "True"))
("swift", "False"), ("cinder", "True"), ("glance", "True"))
result = self.tempest.conf.items("service_available")
for item in expected:
self.assertIn(item, result)

View File

@@ -82,7 +82,6 @@ python-neutronclient===11.2.0
python-novaclient===18.6.0
python-octaviaclient===3.7.0
python-openstackclient===6.6.0
python-saharaclient===4.2.0
python-senlinclient===3.1.0
python-subunit===1.4.4
python-swiftclient===4.5.0