Saharaclient tests for tempest

implements bp: saharaclient-tests-in-tempest

Change-Id: I1a491d0ee566e3bd4ab1957ac400806c60602686
This commit is contained in:
Andrey Pavlov 2014-12-10 10:03:08 +03:00
parent aab0b50fb4
commit 8f5ae97331
15 changed files with 1278 additions and 0 deletions

View File

@ -0,0 +1,79 @@
Tests for Sahara Client in Tempest
====================================
How to run
----------
Get the latest tempest resources from GitHub:
.. sourcecode:: console
$ git clone https://github.com/openstack/tempest.git
..
Create a configuration file ``tempest/etc/tempest.conf`` for tempest using the sample file
from ``tempest/etc/tempest.conf.sample``:
.. sourcecode:: console
$ cd $TEMPEST_ROOT_DIR
$ cp etc/tempest.conf.sample etc/tempest.conf
..
Some configuration options are required for running tests. Here is the list:
[DEFAULT]
lock_path=
[identity]
uri=
uri_v3=
username=
tenant_name=
password=
admin_username=
admin_tenant_name=
admin_password=
[service_available]
sahara=true
neutron=true
Get the latest sahara resources from GitHub:
.. sourcecode:: console
$ git clone https://github.com/openstack/sahara.git
..
Copy Sahara Tempest tests directory to tempest:
.. sourcecode:: console
$ cp -r $SAHARA_ROOT_DIR/sahara/tests/tempest .
..
Create a configuration file ``tempest/scenario/data_processing/etc/sahara_tests.conf`` from
``tempest/scenario/data_processing/etc/sahara_tests.conf.sample``:
.. sourcecode:: console
$ cp tempest/scenario/data_processing/etc/sahara_tests.conf.sample tempest/scenario/data_processing/etc/sahara_tests.conf
..
All options should be set. Some of them are defaults and can be left without changing,
other should be specified.
When configuration is finished, you can launch the tests with:
.. sourcecode:: console
$ tox -e all -- tempest.scenario.data_processing.client_tests
..
If you want to launch all Sahara tests in Tempest, you can do this with ``data_processing`` tag:
.. sourcecode:: console
$ tox -e all -- data_processing
..

View File

@ -0,0 +1,270 @@
# Copyright (c) 2014 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time
from oslo.utils import timeutils
from saharaclient.api import base as sab
from saharaclient import client as sahara_client
from tempest import config
from tempest import exceptions
from tempest.openstack.common import log as logging
from tempest.scenario.data_processing import config as sahara_test_config
from tempest.scenario import manager
CONF = sahara_test_config.SAHARA_TEST_CONF
TEMPEST_CONF = config.CONF
LOG = logging.getLogger(__name__)
class BaseDataProcessingTest(manager.ScenarioTest):
@classmethod
def resource_setup(cls):
cls.set_network_resources()
super(BaseDataProcessingTest, cls).resource_setup()
endpoint_type = TEMPEST_CONF.data_processing.endpoint_type
catalog_type = TEMPEST_CONF.data_processing.catalog_type
auth_url = TEMPEST_CONF.identity.uri
credentials = cls.credentials()
cls.client = sahara_client.Client(
CONF.data_processing.saharaclient_version,
credentials.username,
credentials.password,
project_name=credentials.tenant_name,
endpoint_type=endpoint_type,
service_type=catalog_type,
auth_url=auth_url,
sahara_url=CONF.data_processing.sahara_url)
cls.object_client = cls.manager.object_client
cls.container_client = cls.manager.container_client
cls.floating_ip_pool = CONF.data_processing.floating_ip_pool
if TEMPEST_CONF.service_available.neutron:
cls.floating_ip_pool = cls.get_floating_ip_pool_id_for_neutron()
cls.worker_template = {
'description': 'Test node group template',
'plugin_name': 'fake',
'hadoop_version': '0.1',
'node_processes': [
'datanode',
'tasktracker'
],
'flavor_id': CONF.data_processing.flavor_id,
'floating_ip_pool': cls.floating_ip_pool
}
cls.master_template = {
'description': 'Test node group template',
'plugin_name': 'fake',
'hadoop_version': '0.1',
'node_processes': [
'namenode',
'jobtracker'
],
'flavor_id': CONF.data_processing.flavor_id,
'floating_ip_pool': cls.floating_ip_pool,
'auto_security_group': True
}
cls.cluster_template = {
'description': 'Test cluster template',
'plugin_name': 'fake',
'hadoop_version': '0.1'
}
cls.swift_data_source_with_creds = {
'url': 'swift://sahara-container/input-source',
'description': 'Test data source',
'type': 'swift',
'credentials': {
'user': 'test',
'password': '123'
}
}
cls.local_hdfs_data_source = {
'url': 'input-source',
'description': 'Test data source',
'type': 'hdfs',
}
cls.external_hdfs_data_source = {
'url': 'hdfs://test-master-node/usr/hadoop/input-source',
'description': 'Test data source',
'type': 'hdfs'
}
@classmethod
def get_floating_ip_pool_id_for_neutron(cls):
for network in cls.networks_client.list_networks()[1]:
if network['label'] == CONF.data_processing.floating_ip_pool:
return network['id']
raise exceptions.NotFound(
'Floating IP pool \'%s\' not found in pool list.'
% CONF.data_processing.floating_ip_pool)
def get_private_network_id(cls):
for network in cls.networks_client.list_networks()[1]:
if network['label'] == CONF.data_processing.private_network:
return network['id']
raise exceptions.NotFound(
'Private network \'%s\' not found in network list.'
% CONF.data_processing.private_network)
def create_node_group_template(self, name, **kwargs):
resp_body = self.client.node_group_templates.create(
name, **kwargs)
self.addCleanup(self.delete_resource,
self.client.node_group_templates, resp_body.id)
return resp_body
def create_cluster_template(self, name, **kwargs):
resp_body = self.client.cluster_templates.create(
name, **kwargs)
self.addCleanup(self.delete_resource,
self.client.cluster_templates, resp_body.id)
return resp_body
def create_data_source(self, name, url, description, type,
credentials=None):
user = credentials['user'] if credentials else None
pas = credentials['password'] if credentials else None
resp_body = self.client.data_sources.create(
name, description, type, url, credential_user=user,
credential_pass=pas)
self.addCleanup(self.delete_resource,
self.client.data_sources, resp_body.id)
return resp_body
def create_job_binary(self, name, url, description, extra=None):
resp_body = self.client.job_binaries.create(
name, url, description, extra)
self.addCleanup(self.delete_resource,
self.client.job_binaries, resp_body.id)
return resp_body
def create_job_binary_internal(self, name, data):
resp_body = self.client.job_binary_internals.create(name, data)
self.addCleanup(self.delete_resource,
self.client.job_binary_internals, resp_body.id)
return resp_body
def create_job(self, name, job_type, mains, libs=None, description=None):
libs = libs or ()
description = description or ''
resp_body = self.client.jobs.create(
name, job_type, mains, libs, description)
self.addCleanup(self.delete_resource, self.client.jobs, resp_body.id)
return resp_body
def create_cluster(self, name, **kwargs):
resp_body = self.client.clusters.create(name, **kwargs)
self.addCleanup(self.delete_resource, self.client.clusters,
resp_body.id)
return resp_body
def check_cluster_active(self, cluster_id):
timeout = CONF.data_processing.cluster_timeout
s_time = timeutils.utcnow()
while timeutils.delta_seconds(s_time, timeutils.utcnow()) < timeout:
cluster = self.client.clusters.get(cluster_id)
if cluster.status == 'Active':
return
if cluster.status == 'Error':
raise exceptions.BuildErrorException(
'Cluster failed to build and is in "Error" status.')
time.sleep(CONF.data_processing.request_timeout)
raise exceptions.TimeoutException(
'Cluster failed to get to "Active status within %d seconds.'
% timeout)
def create_job_execution(self, **kwargs):
resp_body = self.client.job_executions.create(**kwargs)
self.addCleanup(self.delete_resource, self.client.job_executions,
resp_body.id)
return resp_body
def create_container(self, name):
self.container_client.create_container(name)
self.addCleanup(self.delete_swift_container, name)
def delete_resource(self, resource_client, resource_id):
try:
resource_client.delete(resource_id)
except sab.APIException:
pass
else:
self.delete_timeout(resource_client, resource_id)
def delete_timeout(
self, resource_client, resource_id,
timeout=CONF.data_processing.cluster_timeout):
start = timeutils.utcnow()
while timeutils.delta_seconds(start, timeutils.utcnow()) < timeout:
try:
resource_client.get(resource_id)
except sab.APIException as sahara_api_exception:
if 'not found' in sahara_api_exception.message:
return
raise sahara_api_exception
time.sleep(CONF.data_processing.request_timeout)
raise exceptions.TimeoutException(
'Failed to delete resource "%s" in %d seconds.'
% (resource_id, timeout))
def delete_swift_container(self, container):
objects = ([obj['name'] for obj in
self.container_client.list_all_container_objects(
container)])
for obj in objects:
self.object_client.delete_object(container, obj)
self.container_client.delete_container(container)

View File

@ -0,0 +1,85 @@
# Copyright (c) 2014 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.common.utils import data_utils
from tempest.scenario.data_processing.client_tests import base
from tempest.scenario.data_processing import config as sahara_test_config
from tempest import test
CONF = sahara_test_config.SAHARA_TEST_CONF
class ClusterTemplateTest(base.BaseDataProcessingTest):
def _check_create_cluster_template(self):
ng_template_name = data_utils.rand_name('sahara-ng-template')
ng_template = self.create_node_group_template(ng_template_name,
**self.worker_template)
full_cluster_template = self.cluster_template.copy()
full_cluster_template['node_groups'] = [
{
'name': 'master-node',
'flavor_id': CONF.data_processing.flavor_id,
'node_processes': ['namenode'],
'count': 1
},
{
'name': 'worker-node',
'node_group_template_id': ng_template.id,
'count': 3
}
]
template_name = data_utils.rand_name('sahara-cluster-template')
# create cluster template
resp_body = self.create_cluster_template(template_name,
**full_cluster_template)
# check that template created successfully
self.assertEqual(template_name, resp_body.name)
self.assertDictContainsSubset(self.cluster_template,
resp_body.__dict__)
return resp_body.id, template_name
def _check_cluster_template_list(self, template_id, template_name):
# check for cluster template in list
template_list = self.client.cluster_templates.list()
templates_info = [(template.id, template.name)
for template in template_list]
self.assertIn((template_id, template_name), templates_info)
def _check_cluster_template_get(self, template_id, template_name):
# check cluster template fetch by id
template = self.client.cluster_templates.get(
template_id)
self.assertEqual(template_name, template.name)
self.assertDictContainsSubset(self.cluster_template, template.__dict__)
def _check_cluster_template_delete(self, template_id):
# delete cluster template by id
self.client.cluster_templates.delete(
template_id)
# check that cluster template really deleted
templates = self.client.cluster_templates.list()
self.assertNotIn(template_id, [template.id for template in templates])
@test.services('data_processing')
def test_cluster_templates(self):
template_id, template_name = self._check_create_cluster_template()
self._check_cluster_template_list(template_id, template_name)
self._check_cluster_template_get(template_id, template_name)
self._check_cluster_template_delete(template_id)

View File

@ -0,0 +1,82 @@
# Copyright (c) 2014 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.common.utils import data_utils
from tempest.scenario.data_processing.client_tests import base
from tempest import test
class DataSourceTest(base.BaseDataProcessingTest):
def _check_data_source_create(self, source_body):
source_name = data_utils.rand_name('sahara-data-source')
# create data source
resp_body = self.create_data_source(source_name, **source_body)
# check that source created successfully
self.assertEqual(source_name, resp_body.name)
if source_body['type'] == 'swift':
source_body = self.swift_data_source
self.assertDictContainsSubset(source_body, resp_body.__dict__)
return resp_body.id, source_name
def _check_data_source_list(self, source_id, source_name):
# check for data source in list
source_list = self.client.data_sources.list()
sources_info = [(source.id, source.name) for source in source_list]
self.assertIn((source_id, source_name), sources_info)
def _check_data_source_get(self, source_id, source_name, source_body):
# check data source fetch by id
source = self.client.data_sources.get(source_id)
self.assertEqual(source_name, source.name)
self.assertDictContainsSubset(source_body, source.__dict__)
def _check_data_source_delete(self, source_id):
# delete data source
self.client.data_sources.delete(source_id)
# check that data source really deleted
source_list = self.client.data_sources.list()
self.assertNotIn(source_id, [source.id for source in source_list])
@test.services('data_processing')
def test_swift_data_source(self):
# Create extra self.swift_data_source variable to use for comparison to
# data source response body because response body has no 'credentials'
# field.
self.swift_data_source = self.swift_data_source_with_creds.copy()
del self.swift_data_source['credentials']
source_id, source_name = self._check_data_source_create(
self.swift_data_source_with_creds)
self._check_data_source_list(source_id, source_name)
self._check_data_source_get(source_id, source_name,
self.swift_data_source)
self._check_data_source_delete(source_id)
@test.services('data_processing')
def test_local_hdfs_data_source(self):
source_id, source_name = self._check_data_source_create(
self.local_hdfs_data_source)
self._check_data_source_list(source_id, source_name)
self._check_data_source_get(source_id, source_name,
self.local_hdfs_data_source)
self._check_data_source_delete(source_id)
@test.services('data_processing')
def test_external_hdfs_data_source(self):
source_id, source_name = self._check_data_source_create(
self.external_hdfs_data_source)
self._check_data_source_list(source_id, source_name)
self._check_data_source_get(source_id, source_name,
self.external_hdfs_data_source)
self._check_data_source_delete(source_id)

View File

@ -0,0 +1,115 @@
# Copyright (c) 2014 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.common.utils import data_utils
from tempest.scenario.data_processing.client_tests import base
from tempest import test
class JobBinariesTest(base.BaseDataProcessingTest):
def _check_job_binary_create(self, binary_body):
binary_name = data_utils.rand_name('sahara-job-binary')
# create job binary
resp_body = self.create_job_binary(binary_name, **binary_body)
# ensure that binary created successfully
self.assertEqual(binary_name, resp_body.name)
if 'swift' in binary_body['url']:
binary_body = self.swift_job_binary
else:
binary_body = self.internal_db_binary
self.assertDictContainsSubset(binary_body, resp_body.__dict__)
return resp_body.id, binary_name
def _check_job_binary_list(self, binary_id, binary_name):
# check for job binary in list
binary_list = self.client.job_binaries.list()
binaries_info = [(binary.id, binary.name) for binary in binary_list]
self.assertIn((binary_id, binary_name), binaries_info)
def _check_job_binary_delete(self, binary_id):
# delete job binary by id
self.client.job_binaries.delete(binary_id)
# check that job binary really deleted
binary_list = self.client.job_binaries.list()
self.assertNotIn(binary_id, [binary.id for binary in binary_list])
def _check_swift_job_binary_create(self):
self.swift_job_binary_with_extra = {
'url': 'swift://sahara-container/example.jar',
'description': 'Test job binary',
'extra': {
'user': 'test',
'password': '123'
}
}
# Create extra self.swift_job_binary variable to use for comparison to
# job binary response body because response body has no 'extra' field.
self.swift_job_binary = self.swift_job_binary_with_extra.copy()
del self.swift_job_binary['extra']
return self._check_job_binary_create(self.swift_job_binary_with_extra)
def _check_swift_job_binary_get(self, binary_id, binary_name):
# check job binary fetch by id
binary = self.client.job_binaries.get(binary_id)
self.assertEqual(binary_name, binary.name)
self.assertDictContainsSubset(self.swift_job_binary, binary.__dict__)
def _check_internal_db_job_binary_create(self):
name = data_utils.rand_name('sahara-internal-job-binary')
self.job_binary_data = 'Some data'
job_binary_internal = (
self.create_job_binary_internal(name, self.job_binary_data))
self.internal_db_binary_with_extra = {
'url': 'internal-db://%s' % job_binary_internal.id,
'description': 'Test job binary',
'extra': {
'user': 'test',
'password': '123'
}
}
# Create extra self.internal_db_binary variable to use for comparison
# to job binary response body because response body has no 'extra'
# field.
self.internal_db_binary = self.internal_db_binary_with_extra.copy()
del self.internal_db_binary['extra']
return self._check_job_binary_create(
self.internal_db_binary_with_extra)
def _check_internal_db_job_binary_get(self, binary_id, binary_name):
# check job binary fetch by id
binary = self.client.job_binaries.get(binary_id)
self.assertEqual(binary_name, binary.name)
self.assertDictContainsSubset(self.internal_db_binary, binary.__dict__)
def _check_job_binary_get_file(self, binary_id):
data = self.client.job_binaries.get_file(binary_id)
self.assertEqual(self.job_binary_data, data)
@test.services('data_processing')
def test_swift_job_binaries(self):
binary_id, binary_name = self._check_swift_job_binary_create()
self._check_job_binary_list(binary_id, binary_name)
self._check_swift_job_binary_get(binary_id, binary_name)
self._check_job_binary_delete(binary_id)
@test.services('data_processing')
def test_internal_job_binaries(self):
binary_id, binary_name = self._check_internal_db_job_binary_create()
self._check_job_binary_list(binary_id, binary_name)
self._check_internal_db_job_binary_get(binary_id, binary_name)
self._check_job_binary_get_file(binary_id)
self._check_job_binary_delete(binary_id)

View File

@ -0,0 +1,53 @@
# Copyright (c) 2014 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.common.utils import data_utils
from tempest.scenario.data_processing.client_tests import base
from tempest import test
class JobBinaryInternalsTest(base.BaseDataProcessingTest):
def _check_job_binary_internal_create(self):
name = data_utils.rand_name('sahara-internal-job-binary')
self.job_binary_data = 'Some data'
# create job binary internal
resp_body = self.create_job_binary_internal(name, self.job_binary_data)
# check that job_binary_internal created successfully
self.assertEqual(name, resp_body.name)
return resp_body.id, resp_body.name
def _check_job_binary_internal_list(self, binary_id, binary_name):
# check for job binary internal in list
binary_list = self.client.job_binary_internals.list()
binaries_info = [(binary.id, binary.name) for binary in binary_list]
self.assertIn((binary_id, binary_name), binaries_info)
def _check_job_binary_internal_get(self, binary_id, binary_name):
# check job binary internal fetch by id
binary = self.client.job_binary_internals.get(binary_id)
self.assertEqual(binary_name, binary.name)
def _check_job_binary_internal_delete(self, binary_id):
# delete job binary internal by id
self.client.job_binary_internals.delete(binary_id)
# check that job binary internal really deleted
binary_list = self.client.job_binary_internals.list()
self.assertNotIn(binary_id, [binary.id for binary in binary_list])
@test.services('data_processing')
def test_job_binary_internal(self):
binary_id, binary_name = self._check_job_binary_internal_create()
self._check_job_binary_internal_list(binary_id, binary_name)
self._check_job_binary_internal_get(binary_id, binary_name)
self._check_job_binary_internal_delete(binary_id)

View File

@ -0,0 +1,298 @@
# Copyright (c) 2014 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time
from oslo.utils import timeutils
from saharaclient.api import base as sab
from tempest.common.utils import data_utils
from tempest import config
from tempest import exceptions
from tempest.scenario.data_processing.client_tests import base
from tempest.scenario.data_processing import config as sahara_test_config
from tempest import test
CONF = sahara_test_config.SAHARA_TEST_CONF
TEMPEST_CONF = config.CONF
class JobExecutionTest(base.BaseDataProcessingTest):
def _check_register_image(self, image_id):
self.client.images.update_image(
image_id, CONF.data_processing.ssh_username, '')
reg_image = self.client.images.get(image_id)
self.assertDictContainsSubset(
{'_sahara_username': CONF.data_processing.ssh_username},
reg_image.metadata)
def _check_image_get(self, image_id):
image = self.client.images.get(image_id)
self.assertEqual(image_id, image.id)
def _check_image_list(self, image_id):
# check for image in list
image_list = self.client.images.list()
images_info = [image.id for image in image_list]
self.assertIn(image_id, images_info)
def _check_adding_tags(self, image_id):
# adding new tags
self.client.images.update_tags(image_id, ['fake', '0.1'])
image = self.client.images.get(image_id)
self.assertDictContainsSubset({'_sahara_tag_fake': 'True',
'_sahara_tag_0.1': 'True'},
image.metadata)
def _check_deleting_tags(self, image_id):
# deleting tags
self.client.images.update_tags(image_id, [])
image = self.client.images.get(image_id)
self.assertNotIn('_sahara_tag_fake', image.metadata)
self.assertNotIn('_sahara_tag_0.1', image.metadata)
def _check_unregister_image(self, image_id):
# unregister image
self.client.images.unregister_image(image_id)
# check that image really unregistered
image_list = self.client.images.list()
self.assertNotIn(image_id, [image.id for image in image_list])
def _check_cluster_create(self):
worker = self.create_node_group_template(
data_utils.rand_name('sahara-ng-template'), **self.worker_template)
master = self.create_node_group_template(
data_utils.rand_name('sahara-ng-template'), **self.master_template)
cluster_templ = self.cluster_template.copy()
cluster_templ['node_groups'] = [
{
'name': 'master',
'node_group_template_id': master.id,
'count': 1
},
{
'name': 'worker',
'node_group_template_id': worker.id,
'count': 3
}
]
if TEMPEST_CONF.service_available.neutron:
cluster_templ['net_id'] = self.get_private_network_id()
cluster_template = self.create_cluster_template(
data_utils.rand_name('sahara-cluster-template'), **cluster_templ)
cluster_name = data_utils.rand_name('sahara-cluster')
self.cluster_info = {
'name': cluster_name,
'plugin_name': 'fake',
'hadoop_version': '0.1',
'cluster_template_id': cluster_template.id,
'default_image_id': CONF.data_processing.fake_image_id
}
# create cluster
cluster = self.create_cluster(**self.cluster_info)
# wait until cluster moves to active state
self.check_cluster_active(cluster.id)
# check that cluster created successfully
self.assertEqual(cluster_name, cluster.name)
self.assertDictContainsSubset(self.cluster_info, cluster.__dict__)
return cluster.id, cluster.name
def _check_cluster_list(self, cluster_id, cluster_name):
# check for cluster in list
cluster_list = self.client.clusters.list()
clusters_info = [(clust.id, clust.name) for clust in cluster_list]
self.assertIn((cluster_id, cluster_name), clusters_info)
def _check_cluster_get(self, cluster_id, cluster_name):
# check cluster fetch by id
cluster = self.client.clusters.get(cluster_id)
self.assertEqual(cluster_name, cluster.name)
self.assertDictContainsSubset(self.cluster_info, cluster.__dict__)
def _check_cluster_scale(self, cluster_id):
big_worker = self.create_node_group_template(
data_utils.rand_name('sahara-ng-template'), **self.worker_template)
scale_body = {
'resize_node_groups': [
{
'count': 2,
'name': 'worker'
},
{
"count": 2,
"name": 'master'
}
],
'add_node_groups': [
{
'count': 1,
'name': 'big-worker',
'node_group_template_id': big_worker.id
}
]
}
self.client.clusters.scale(cluster_id, scale_body)
self.check_cluster_active(cluster_id)
cluster = self.client.clusters.get(cluster_id)
for ng in cluster.node_groups:
if ng['name'] == scale_body['resize_node_groups'][0]['name']:
self.assertDictContainsSubset(
scale_body['resize_node_groups'][0], ng)
elif ng['name'] == scale_body['resize_node_groups'][1]['name']:
self.assertDictContainsSubset(
scale_body['resize_node_groups'][1], ng)
elif ng['name'] == scale_body['add_node_groups'][0]['name']:
self.assertDictContainsSubset(
scale_body['add_node_groups'][0], ng)
def _check_cluster_delete(self, cluster_id):
self.client.clusters.delete(cluster_id)
# check that cluster moved to deleting state
cluster = self.client.clusters.get(cluster_id)
self.assertEqual(cluster.status, 'Deleting')
timeout = CONF.data_processing.cluster_timeout
s_time = timeutils.utcnow()
while timeutils.delta_seconds(s_time, timeutils.utcnow()) < timeout:
try:
self.client.clusters.get(cluster_id)
except sab.APIException:
# cluster is deleted
return
time.sleep(CONF.data_processing.request_timeout)
raise exceptions.TimeoutException('Cluster failed to terminate'
'in %d seconds.' % timeout)
def _check_job_execution_create(self, cluster_id):
# create swift container
container_name = data_utils.rand_name('test-container')
self.create_container(container_name)
# create input data source
input_file_name = data_utils.rand_name('input')
self.object_client.create_object(container_name, input_file_name,
'some-data')
input_file_url = 'swift://%s/%s' % (container_name, input_file_name)
input_source_name = data_utils.rand_name('input-data-source')
input_source = self.create_data_source(
input_source_name, input_file_url, '', 'swift',
{'user': 'test', 'password': '123'})
# create output data source
output_dir_name = data_utils.rand_name('output')
output_dir_url = 'swift://%s/%s' % (container_name, output_dir_name)
output_source_name = data_utils.rand_name('output-data-source')
output_source = self.create_data_source(
output_source_name, output_dir_url, '', 'swift',
{'user': 'test', 'password': '123'})
job_binary = {
'name': data_utils.rand_name('sahara-job-binary'),
'url': input_file_url,
'description': 'Test job binary',
'extra': {
'user': 'test',
'password': '123'
}
}
# create job_binary
job_binary = self.create_job_binary(**job_binary)
# create job
job_name = data_utils.rand_name('test-job')
job = self.create_job(job_name, 'Pig', [job_binary.id])
self.job_exec_info = {
'job_id': job.id,
'cluster_id': cluster_id,
'input_id': input_source.id,
'output_id': output_source.id,
'configs': {}
}
# create job execution
job_execution = self.create_job_execution(**self.job_exec_info)
return job_execution.id
def _check_job_execution_list(self, job_exec_id):
# check for job_execution in list
job_exec_list = self.client.job_executions.list()
self.assertIn(job_exec_id, [job_exec.id for job_exec in job_exec_list])
def _check_job_execution_get(self, job_exec_id):
# check job_execution fetch by id
job_exec = self.client.job_executions.get(job_exec_id)
# Create extra cls.swift_job_binary variable to use for comparison to
# job binary response body because response body has no 'extra' field.
job_exec_info = self.job_exec_info.copy()
del job_exec_info['configs']
self.assertDictContainsSubset(job_exec_info, job_exec.__dict__)
def _check_job_execution_delete(self, job_exec_id):
# delete job_execution by id
self.client.job_executions.delete(job_exec_id)
# check that job_execution really deleted
job_exec_list = self.client.jobs.list()
self.assertNotIn(job_exec_id, [job_exec.id for
job_exec in job_exec_list])
@test.attr(type='slow')
@test.services('data_processing')
def test_job_executions(self):
image_id = CONF.data_processing.fake_image_id
self._check_image_get(image_id)
self._check_register_image(image_id)
self._check_image_list(image_id)
self._check_adding_tags(image_id)
cluster_id, cluster_name = self._check_cluster_create()
self._check_cluster_list(cluster_id, cluster_name)
self._check_cluster_get(cluster_id, cluster_name)
self._check_cluster_scale(cluster_id)
job_exec_id = self._check_job_execution_create(cluster_id)
self._check_job_execution_list(job_exec_id)
self._check_job_execution_get(job_exec_id)
self._check_job_execution_delete(job_exec_id)
self._check_cluster_delete(cluster_id)
self._check_deleting_tags(image_id)
self._check_unregister_image(image_id)
@classmethod
def tearDownClass(cls):
image_list = cls.client.images.list()
image_id = CONF.data_processing.fake_image_id
if image_id in [image.id for image in image_list]:
cls.client.images.unregister_image(image_id)
super(JobExecutionTest, cls).tearDownClass()

View File

@ -0,0 +1,69 @@
# Copyright (c) 2014 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.common.utils import data_utils
from tempest.scenario.data_processing.client_tests import base
from tempest import test
class JobTest(base.BaseDataProcessingTest):
def _check_create_job(self):
job_binary = {
'name': data_utils.rand_name('sahara-job-binary'),
'url': 'swift://sahara-container.sahara/example.jar',
'description': 'Test job binary',
'extra': {
'user': 'test',
'password': '123'
}
}
# create job_binary
job_binary = self.create_job_binary(**job_binary)
self.job = {
'job_type': 'Pig',
'mains': [job_binary.id]
}
job_name = data_utils.rand_name('sahara-job')
# create job
job = self.create_job(job_name, **self.job)
# check that job created successfully
self.assertEqual(job_name, job.name)
return job.id, job.name
def _check_job_list(self, job_id, job_name):
# check for job in list
job_list = self.client.jobs.list()
jobs_info = [(job.id, job.name) for job in job_list]
self.assertIn((job_id, job_name), jobs_info)
def _check_get_job(self, job_id, job_name):
# check job fetch by id
job = self.client.jobs.get(job_id)
self.assertEqual(job_name, job.name)
def _check_delete_job(self, job_id):
# delete job by id
self.client.jobs.delete(job_id)
# check that job really deleted
job_list = self.client.jobs.list()
self.assertNotIn(job_id, [job.id for job in job_list])
@test.services('data_processing')
def test_job(self):
job_id, job_name = self._check_create_job()
self._check_job_list(job_id, job_name)
self._check_get_job(job_id, job_name)
self._check_delete_job(job_id)

View File

@ -0,0 +1,62 @@
# Copyright (c) 2014 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.common.utils import data_utils
from tempest.scenario.data_processing.client_tests import base
from tempest import test
class NodeGroupTemplateTest(base.BaseDataProcessingTest):
def _check_create_node_group_template(self):
template_name = data_utils.rand_name('sahara-ng-template')
# create node group template
resp_body = self.create_node_group_template(template_name,
**self.worker_template)
# check that template created successfully
self.assertEqual(template_name, resp_body.name)
self.assertDictContainsSubset(self.worker_template,
resp_body.__dict__)
return resp_body.id, template_name
def _check_node_group_template_list(self, template_id, template_name):
# check for node group template in list
template_list = self.client.node_group_templates.list()
templates_info = [(template.id, template.name)
for template in template_list]
self.assertIn((template_id, template_name), templates_info)
def _check_node_group_template_get(self, template_id, template_name):
# check node group template fetch by id
template = self.client.node_group_templates.get(
template_id)
self.assertEqual(template_name, template.name)
self.assertDictContainsSubset(self.worker_template,
template.__dict__)
def _check_node_group_template_delete(self, template_id):
# delete node group template by id
self.client.node_group_templates.delete(template_id)
# check that node group really deleted
templates = self.client.node_group_templates.list()
self.assertNotIn(template_id, [template.id for template in templates])
@test.services('data_processing')
def test_node_group_templates(self):
template_id, template_name = self._check_create_node_group_template()
self._check_node_group_template_list(template_id, template_name)
self._check_node_group_template_get(template_id, template_name)
self._check_node_group_template_delete(template_id)

View File

@ -0,0 +1,47 @@
# Copyright (c) 2014 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.scenario.data_processing.client_tests import base
from tempest import test
class PluginsTest(base.BaseDataProcessingTest):
def _check_plugins_list(self):
plugins = self.client.plugins.list()
plugins_names = [plugin.name for plugin in plugins]
self.assertIn('fake', plugins_names)
return plugins_names
def _check_plugins_get(self, plugins_names):
for plugin_name in plugins_names:
plugin = self.client.plugins.get(plugin_name)
self.assertEqual(plugin_name, plugin.name)
# check get_version_details
for plugin_version in plugin.versions:
detailed_plugin = self.client.plugins.get_version_details(
plugin_name, plugin_version)
self.assertEqual(plugin_name, detailed_plugin.name)
# check that required image tags contains name and version
image_tags = detailed_plugin.required_image_tags
self.assertIn(plugin_name, image_tags)
self.assertIn(plugin_version, image_tags)
@test.services('data_processing')
def test_plugins(self):
plugins_names = self._check_plugins_list()
self._check_plugins_get(plugins_names)

View File

@ -0,0 +1,83 @@
# Copyright (c) 2014 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import print_function
import os
from oslo.config import cfg
def class_wrapper(cls):
instances = {}
def get_instance():
if cls not in instances:
instances[cls] = cls()
return instances[cls]
return get_instance
data_processing_group = cfg.OptGroup(name='data_processing',
title='Data Processing options')
DataProcessingGroup = [
cfg.IntOpt('cluster_timeout',
default=3600,
help='Timeout (in seconds) to wait for cluster deployment.'),
cfg.IntOpt('request_timeout',
default=10,
help='Timeout (in seconds) between status checks.'),
cfg.StrOpt('floating_ip_pool',
help='Name of IP pool.'),
cfg.StrOpt('private_network',
help='Name of the private network '
'that provides internal connectivity.'),
cfg.StrOpt('fake_image_id',
help='ID of an image which is used for cluster creation.'),
cfg.StrOpt('flavor_id',
help='ID of a flavor.'),
cfg.StrOpt('saharaclient_version',
default='1.1',
help='Version of python-saharaclient'),
cfg.StrOpt('sahara_url',
help='Sahara url as http://ip:port/api_version/tenant_id'),
cfg.StrOpt('ssh_username',
help='Username which is used to log into remote nodes via SSH.')
]
@class_wrapper
class SaharaTestConfig(object):
DEFAULT_CONFIG_DIR = os.path.join(
os.path.abspath(os.path.dirname(__file__)), 'etc')
DEFAULT_CONFIG_FILE = 'sahara_tests.conf'
def __init__(self):
config_files = []
path = os.path.join(self.DEFAULT_CONFIG_DIR, self.DEFAULT_CONFIG_FILE)
if os.path.isfile(path):
config_files.append(path)
conf = cfg.ConfigOpts()
conf([], project='Sahara-tests',
default_config_files=config_files)
conf.register_group(data_processing_group)
conf.register_opts(DataProcessingGroup, data_processing_group)
self.data_processing = conf.data_processing
SAHARA_TEST_CONF = SaharaTestConfig()

View File

@ -0,0 +1,7 @@
[data_processing]
floating_ip_pool='a454832b-5101-421a-a225-5445a98667d4'
private_network_id='cdbfcaa0-d17c-4ec4-9271-eb12c975d825'
fake_image_id='16a534fc-9e1e-43d6-b577-9f805212dc0b'
flavor_id=2
ssh_username='ubuntu'

View File

@ -0,0 +1,28 @@
[data_processing]
# Timeout (in seconds) to wait for cluster deployment.
#cluster_timeout=3600
# Timeout (in seconds) between status checks.
#request_timeout=10
# Name of IP pool.
#floating_ip_pool=
# Name of the private network that provides internal connectivity.
#private_network=
# ID of an image which is used for cluster creation.
#fake_image_id=
# ID of a flavor.
#flavor_id=
# Version of python-saharaclient
#saharaclient_version=1.1
# Sahara url as http://ip:port/api_version/tenant_id
#sahara_url=
# Username which is used to log into remote nodes via SSH.
#ssh_username=