diff --git a/sahara/tests/tempest/scenario/data_processing/README.rst b/sahara/tests/tempest/scenario/data_processing/README.rst
new file mode 100644
index 00000000..c5429b61
--- /dev/null
+++ b/sahara/tests/tempest/scenario/data_processing/README.rst
@@ -0,0 +1,82 @@
+Tests for Sahara Client in Tempest
+====================================
+
+How to run
+----------
+
+Get the latest Tempest source from GitHub:
+
+.. sourcecode:: console
+
+    $ git clone https://github.com/openstack/tempest.git
+..
+
+Create a Tempest configuration file ``tempest/etc/tempest.conf`` from the
+sample file ``tempest/etc/tempest.conf.sample``:
+
+.. sourcecode:: console
+
+    $ cd $TEMPEST_ROOT_DIR
+    $ cp etc/tempest.conf.sample etc/tempest.conf
+..
+
+The following configuration options are required to run the tests:
+
+.. sourcecode:: ini
+
+    [DEFAULT]
+    lock_path=
+
+    [identity]
+    uri=
+    uri_v3=
+    username=
+    tenant_name=
+    password=
+    admin_username=
+    admin_tenant_name=
+    admin_password=
+
+    [service_available]
+    sahara=true
+    neutron=true
+..
+
+Get the latest Sahara source from GitHub:
+
+.. sourcecode:: console
+
+    $ git clone https://github.com/openstack/sahara.git
+..
+
+Copy the Sahara Tempest tests directory into the Tempest tree:
+
+.. sourcecode:: console
+
+    $ cp -r $SAHARA_ROOT_DIR/sahara/tests/tempest .
+..
+
+Create a configuration file ``tempest/scenario/data_processing/etc/sahara_tests.conf`` from
+``tempest/scenario/data_processing/etc/sahara_tests.conf.sample``:
+
+.. sourcecode:: console
+
+    $ cp tempest/scenario/data_processing/etc/sahara_tests.conf.sample tempest/scenario/data_processing/etc/sahara_tests.conf
+..
+
+All options should be set. Some have sensible defaults and can be left
+unchanged; the others must be specified for your environment.
+
+Once configuration is finished, launch the tests with:
+
+.. sourcecode:: console
+
+    $ tox -e all -- tempest.scenario.data_processing.client_tests
+..
+
+To launch all Sahara tests in Tempest, use the ``data_processing`` tag:
+
+.. sourcecode:: console
+
+    $ tox -e all -- data_processing
+..
\ No newline at end of file
diff --git a/sahara/tests/tempest/scenario/data_processing/__init__.py b/sahara/tests/tempest/scenario/data_processing/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/sahara/tests/tempest/scenario/data_processing/client_tests/__init__.py b/sahara/tests/tempest/scenario/data_processing/client_tests/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/sahara/tests/tempest/scenario/data_processing/client_tests/base.py b/sahara/tests/tempest/scenario/data_processing/client_tests/base.py
new file mode 100644
index 00000000..5cfc9052
--- /dev/null
+++ b/sahara/tests/tempest/scenario/data_processing/client_tests/base.py
@@ -0,0 +1,270 @@
+# Copyright (c) 2014 Mirantis Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import time
+
+from oslo.utils import timeutils
+from saharaclient.api import base as sab
+from saharaclient import client as sahara_client
+from tempest import config
+from tempest import exceptions
+from tempest.openstack.common import log as logging
+from tempest.scenario.data_processing import config as sahara_test_config
+from tempest.scenario import manager
+
+
+CONF = sahara_test_config.SAHARA_TEST_CONF
+TEMPEST_CONF = config.CONF
+
+LOG = logging.getLogger(__name__)
+
+
+class BaseDataProcessingTest(manager.ScenarioTest):
+    @classmethod
+    def resource_setup(cls):
+        cls.set_network_resources()
+        super(BaseDataProcessingTest, cls).resource_setup()
+
+        endpoint_type = TEMPEST_CONF.data_processing.endpoint_type
+        catalog_type = TEMPEST_CONF.data_processing.catalog_type
+        auth_url = TEMPEST_CONF.identity.uri
+
+        credentials = cls.credentials()
+
+        cls.client = sahara_client.Client(
+            CONF.data_processing.saharaclient_version,
+            credentials.username,
+            credentials.password,
+            project_name=credentials.tenant_name,
+            endpoint_type=endpoint_type,
+            service_type=catalog_type,
+            auth_url=auth_url,
+            sahara_url=CONF.data_processing.sahara_url)
+
+        cls.object_client = cls.manager.object_client
+        cls.container_client = cls.manager.container_client
+
+        cls.floating_ip_pool = CONF.data_processing.floating_ip_pool
+        if TEMPEST_CONF.service_available.neutron:
+            cls.floating_ip_pool = cls.get_floating_ip_pool_id_for_neutron()
+
+        cls.worker_template = {
+            'description': 'Test node group template',
+            'plugin_name': 'fake',
+            'hadoop_version': '0.1',
+            'node_processes': [
+                'datanode',
+                'tasktracker'
+            ],
+            'flavor_id': CONF.data_processing.flavor_id,
+            'floating_ip_pool': cls.floating_ip_pool
+        }
+
+        cls.master_template = {
+            'description': 'Test node group template',
+            'plugin_name': 'fake',
+            'hadoop_version': '0.1',
+            'node_processes': [
+                'namenode',
+                'jobtracker'
+            ],
+            'flavor_id': CONF.data_processing.flavor_id,
+            'floating_ip_pool': cls.floating_ip_pool,
+            'auto_security_group': True
+        }
+
+        cls.cluster_template = {
+            'description': 'Test cluster template',
+            'plugin_name': 'fake',
+            'hadoop_version': '0.1'
+        }
+
+        cls.swift_data_source_with_creds = {
+            'url': 'swift://sahara-container/input-source',
+            'description': 'Test data source',
+            'type': 'swift',
+            'credentials': {
+                'user': 'test',
+                'password': '123'
+            }
+        }
+
+        cls.local_hdfs_data_source = {
+            'url': 'input-source',
+            'description': 'Test data source',
+            'type': 'hdfs',
+        }
+
+        cls.external_hdfs_data_source = {
+            'url': 'hdfs://test-master-node/usr/hadoop/input-source',
+            'description': 'Test data source',
+            'type': 'hdfs'
+        }
+
+    @classmethod
+    def get_floating_ip_pool_id_for_neutron(cls):
+        for network in cls.networks_client.list_networks()[1]:
+            if network['label'] == CONF.data_processing.floating_ip_pool:
+                return network['id']
+        raise exceptions.NotFound(
+            'Floating IP pool \'%s\' not found in pool list.'
+            % CONF.data_processing.floating_ip_pool)
+
+    def get_private_network_id(self):
+        for network in self.networks_client.list_networks()[1]:
+            if network['label'] == CONF.data_processing.private_network:
+                return network['id']
+        raise exceptions.NotFound(
+            'Private network \'%s\' not found in network list.'
+            % CONF.data_processing.private_network)
+
+    def create_node_group_template(self, name, **kwargs):
+
+        resp_body = self.client.node_group_templates.create(
+            name, **kwargs)
+
+        self.addCleanup(self.delete_resource,
+                        self.client.node_group_templates, resp_body.id)
+
+        return resp_body
+
+    def create_cluster_template(self, name, **kwargs):
+
+        resp_body = self.client.cluster_templates.create(
+            name, **kwargs)
+
+        self.addCleanup(self.delete_resource,
+                        self.client.cluster_templates, resp_body.id)
+
+        return resp_body
+
+    def create_data_source(self, name, url, description, type,
+                           credentials=None):
+
+        user = credentials['user'] if credentials else None
+        pas = credentials['password'] if credentials else None
+
+        resp_body = self.client.data_sources.create(
+            name, description, type, url, credential_user=user,
+            credential_pass=pas)
+
+        self.addCleanup(self.delete_resource,
+                        self.client.data_sources, resp_body.id)
+
+        return resp_body
+
+    def create_job_binary(self, name, url, description, extra=None):
+
+        resp_body = self.client.job_binaries.create(
+            name, url, description, extra)
+
+        self.addCleanup(self.delete_resource,
+                        self.client.job_binaries, resp_body.id)
+
+        return resp_body
+
+    def create_job_binary_internal(self, name, data):
+
+        resp_body = self.client.job_binary_internals.create(name, data)
+
+        self.addCleanup(self.delete_resource,
+                        self.client.job_binary_internals, resp_body.id)
+
+        return resp_body
+
+    def create_job(self, name, job_type, mains, libs=None, description=None):
+
+        libs = libs or ()
+        description = description or ''
+
+        resp_body = self.client.jobs.create(
+            name, job_type, mains, libs, description)
+
+        self.addCleanup(self.delete_resource, self.client.jobs, resp_body.id)
+
+        return resp_body
+
+    def create_cluster(self, name, **kwargs):
+
+        resp_body = self.client.clusters.create(name, **kwargs)
+
+        self.addCleanup(self.delete_resource, self.client.clusters,
+                        resp_body.id)
+
+        return resp_body
+
+    def check_cluster_active(self, cluster_id):
+        timeout = CONF.data_processing.cluster_timeout
+        s_time = timeutils.utcnow()
+        while timeutils.delta_seconds(s_time, timeutils.utcnow()) < timeout:
+            cluster = self.client.clusters.get(cluster_id)
+            if cluster.status == 'Active':
+                return
+            if cluster.status == 'Error':
+                raise exceptions.BuildErrorException(
+                    'Cluster failed to build and is in "Error" status.')
+            time.sleep(CONF.data_processing.request_timeout)
+        raise exceptions.TimeoutException(
+            'Cluster failed to get to "Active" status within %d seconds.'
+ % timeout) + + def create_job_execution(self, **kwargs): + + resp_body = self.client.job_executions.create(**kwargs) + + self.addCleanup(self.delete_resource, self.client.job_executions, + resp_body.id) + + return resp_body + + def create_container(self, name): + + self.container_client.create_container(name) + + self.addCleanup(self.delete_swift_container, name) + + def delete_resource(self, resource_client, resource_id): + try: + resource_client.delete(resource_id) + except sab.APIException: + pass + else: + self.delete_timeout(resource_client, resource_id) + + def delete_timeout( + self, resource_client, resource_id, + timeout=CONF.data_processing.cluster_timeout): + + start = timeutils.utcnow() + while timeutils.delta_seconds(start, timeutils.utcnow()) < timeout: + try: + resource_client.get(resource_id) + except sab.APIException as sahara_api_exception: + if 'not found' in sahara_api_exception.message: + return + raise sahara_api_exception + + time.sleep(CONF.data_processing.request_timeout) + + raise exceptions.TimeoutException( + 'Failed to delete resource "%s" in %d seconds.' + % (resource_id, timeout)) + + def delete_swift_container(self, container): + objects = ([obj['name'] for obj in + self.container_client.list_all_container_objects( + container)]) + for obj in objects: + self.object_client.delete_object(container, obj) + self.container_client.delete_container(container) diff --git a/sahara/tests/tempest/scenario/data_processing/client_tests/test_cluster_templates.py b/sahara/tests/tempest/scenario/data_processing/client_tests/test_cluster_templates.py new file mode 100644 index 00000000..66c5df18 --- /dev/null +++ b/sahara/tests/tempest/scenario/data_processing/client_tests/test_cluster_templates.py @@ -0,0 +1,85 @@ +# Copyright (c) 2014 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
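Both ``check_cluster_active`` and ``delete_timeout`` in ``base.py`` wait on
the same polling idiom; condensed, the pattern is (a sketch with illustrative
names, built on the same ``oslo.utils`` helpers the code already imports):

.. sourcecode:: python

    import time

    from oslo.utils import timeutils


    def wait_for(predicate, timeout, interval):
        """Poll predicate() until it holds or timeout seconds elapse."""
        start = timeutils.utcnow()
        while timeutils.delta_seconds(start, timeutils.utcnow()) < timeout:
            if predicate():
                return True
            time.sleep(interval)
        return False
..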
+ +from tempest.common.utils import data_utils +from tempest.scenario.data_processing.client_tests import base +from tempest.scenario.data_processing import config as sahara_test_config +from tempest import test + +CONF = sahara_test_config.SAHARA_TEST_CONF + + +class ClusterTemplateTest(base.BaseDataProcessingTest): + def _check_create_cluster_template(self): + ng_template_name = data_utils.rand_name('sahara-ng-template') + ng_template = self.create_node_group_template(ng_template_name, + **self.worker_template) + + full_cluster_template = self.cluster_template.copy() + full_cluster_template['node_groups'] = [ + { + 'name': 'master-node', + 'flavor_id': CONF.data_processing.flavor_id, + 'node_processes': ['namenode'], + 'count': 1 + }, + { + 'name': 'worker-node', + 'node_group_template_id': ng_template.id, + 'count': 3 + } + ] + + template_name = data_utils.rand_name('sahara-cluster-template') + + # create cluster template + resp_body = self.create_cluster_template(template_name, + **full_cluster_template) + + # check that template created successfully + self.assertEqual(template_name, resp_body.name) + self.assertDictContainsSubset(self.cluster_template, + resp_body.__dict__) + + return resp_body.id, template_name + + def _check_cluster_template_list(self, template_id, template_name): + # check for cluster template in list + template_list = self.client.cluster_templates.list() + templates_info = [(template.id, template.name) + for template in template_list] + self.assertIn((template_id, template_name), templates_info) + + def _check_cluster_template_get(self, template_id, template_name): + # check cluster template fetch by id + template = self.client.cluster_templates.get( + template_id) + self.assertEqual(template_name, template.name) + self.assertDictContainsSubset(self.cluster_template, template.__dict__) + + def _check_cluster_template_delete(self, template_id): + # delete cluster template by id + self.client.cluster_templates.delete( + template_id) + + # check that cluster template really deleted + templates = self.client.cluster_templates.list() + self.assertNotIn(template_id, [template.id for template in templates]) + + @test.services('data_processing') + def test_cluster_templates(self): + template_id, template_name = self._check_create_cluster_template() + self._check_cluster_template_list(template_id, template_name) + self._check_cluster_template_get(template_id, template_name) + self._check_cluster_template_delete(template_id) diff --git a/sahara/tests/tempest/scenario/data_processing/client_tests/test_data_sources.py b/sahara/tests/tempest/scenario/data_processing/client_tests/test_data_sources.py new file mode 100644 index 00000000..15049579 --- /dev/null +++ b/sahara/tests/tempest/scenario/data_processing/client_tests/test_data_sources.py @@ -0,0 +1,82 @@ +# Copyright (c) 2014 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
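Every client test in this series repeats the same create/list/get/delete
round trip against one resource manager; schematically (an illustrative
skeleton, not code from the change, where ``manager`` is e.g.
``client.cluster_templates``):

.. sourcecode:: python

    def crud_roundtrip(manager, name, **body):
        created = manager.create(name, **body)              # create
        listed = [(o.id, o.name) for o in manager.list()]   # list
        assert (created.id, created.name) in listed
        assert manager.get(created.id).name == name         # get by id
        manager.delete(created.id)                          # delete
        assert created.id not in [o.id for o in manager.list()]
..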
+ +from tempest.common.utils import data_utils +from tempest.scenario.data_processing.client_tests import base +from tempest import test + + +class DataSourceTest(base.BaseDataProcessingTest): + def _check_data_source_create(self, source_body): + source_name = data_utils.rand_name('sahara-data-source') + # create data source + resp_body = self.create_data_source(source_name, **source_body) + # check that source created successfully + self.assertEqual(source_name, resp_body.name) + if source_body['type'] == 'swift': + source_body = self.swift_data_source + self.assertDictContainsSubset(source_body, resp_body.__dict__) + + return resp_body.id, source_name + + def _check_data_source_list(self, source_id, source_name): + # check for data source in list + source_list = self.client.data_sources.list() + sources_info = [(source.id, source.name) for source in source_list] + self.assertIn((source_id, source_name), sources_info) + + def _check_data_source_get(self, source_id, source_name, source_body): + # check data source fetch by id + source = self.client.data_sources.get(source_id) + self.assertEqual(source_name, source.name) + self.assertDictContainsSubset(source_body, source.__dict__) + + def _check_data_source_delete(self, source_id): + # delete data source + self.client.data_sources.delete(source_id) + # check that data source really deleted + source_list = self.client.data_sources.list() + self.assertNotIn(source_id, [source.id for source in source_list]) + + @test.services('data_processing') + def test_swift_data_source(self): + # Create extra self.swift_data_source variable to use for comparison to + # data source response body because response body has no 'credentials' + # field. + self.swift_data_source = self.swift_data_source_with_creds.copy() + del self.swift_data_source['credentials'] + source_id, source_name = self._check_data_source_create( + self.swift_data_source_with_creds) + self._check_data_source_list(source_id, source_name) + self._check_data_source_get(source_id, source_name, + self.swift_data_source) + self._check_data_source_delete(source_id) + + @test.services('data_processing') + def test_local_hdfs_data_source(self): + source_id, source_name = self._check_data_source_create( + self.local_hdfs_data_source) + self._check_data_source_list(source_id, source_name) + self._check_data_source_get(source_id, source_name, + self.local_hdfs_data_source) + self._check_data_source_delete(source_id) + + @test.services('data_processing') + def test_external_hdfs_data_source(self): + source_id, source_name = self._check_data_source_create( + self.external_hdfs_data_source) + self._check_data_source_list(source_id, source_name) + self._check_data_source_get(source_id, source_name, + self.external_hdfs_data_source) + self._check_data_source_delete(source_id) diff --git a/sahara/tests/tempest/scenario/data_processing/client_tests/test_job_binaries.py b/sahara/tests/tempest/scenario/data_processing/client_tests/test_job_binaries.py new file mode 100644 index 00000000..98617beb --- /dev/null +++ b/sahara/tests/tempest/scenario/data_processing/client_tests/test_job_binaries.py @@ -0,0 +1,115 @@ +# Copyright (c) 2014 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. 
You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from tempest.common.utils import data_utils +from tempest.scenario.data_processing.client_tests import base +from tempest import test + + +class JobBinariesTest(base.BaseDataProcessingTest): + def _check_job_binary_create(self, binary_body): + binary_name = data_utils.rand_name('sahara-job-binary') + + # create job binary + resp_body = self.create_job_binary(binary_name, **binary_body) + + # ensure that binary created successfully + self.assertEqual(binary_name, resp_body.name) + if 'swift' in binary_body['url']: + binary_body = self.swift_job_binary + else: + binary_body = self.internal_db_binary + self.assertDictContainsSubset(binary_body, resp_body.__dict__) + + return resp_body.id, binary_name + + def _check_job_binary_list(self, binary_id, binary_name): + # check for job binary in list + binary_list = self.client.job_binaries.list() + binaries_info = [(binary.id, binary.name) for binary in binary_list] + self.assertIn((binary_id, binary_name), binaries_info) + + def _check_job_binary_delete(self, binary_id): + # delete job binary by id + self.client.job_binaries.delete(binary_id) + # check that job binary really deleted + binary_list = self.client.job_binaries.list() + self.assertNotIn(binary_id, [binary.id for binary in binary_list]) + + def _check_swift_job_binary_create(self): + self.swift_job_binary_with_extra = { + 'url': 'swift://sahara-container/example.jar', + 'description': 'Test job binary', + 'extra': { + 'user': 'test', + 'password': '123' + } + } + # Create extra self.swift_job_binary variable to use for comparison to + # job binary response body because response body has no 'extra' field. + self.swift_job_binary = self.swift_job_binary_with_extra.copy() + del self.swift_job_binary['extra'] + return self._check_job_binary_create(self.swift_job_binary_with_extra) + + def _check_swift_job_binary_get(self, binary_id, binary_name): + # check job binary fetch by id + binary = self.client.job_binaries.get(binary_id) + self.assertEqual(binary_name, binary.name) + self.assertDictContainsSubset(self.swift_job_binary, binary.__dict__) + + def _check_internal_db_job_binary_create(self): + name = data_utils.rand_name('sahara-internal-job-binary') + self.job_binary_data = 'Some data' + job_binary_internal = ( + self.create_job_binary_internal(name, self.job_binary_data)) + self.internal_db_binary_with_extra = { + 'url': 'internal-db://%s' % job_binary_internal.id, + 'description': 'Test job binary', + 'extra': { + 'user': 'test', + 'password': '123' + } + } + # Create extra self.internal_db_binary variable to use for comparison + # to job binary response body because response body has no 'extra' + # field. 
+ self.internal_db_binary = self.internal_db_binary_with_extra.copy() + del self.internal_db_binary['extra'] + return self._check_job_binary_create( + self.internal_db_binary_with_extra) + + def _check_internal_db_job_binary_get(self, binary_id, binary_name): + # check job binary fetch by id + binary = self.client.job_binaries.get(binary_id) + self.assertEqual(binary_name, binary.name) + self.assertDictContainsSubset(self.internal_db_binary, binary.__dict__) + + def _check_job_binary_get_file(self, binary_id): + data = self.client.job_binaries.get_file(binary_id) + self.assertEqual(self.job_binary_data, data) + + @test.services('data_processing') + def test_swift_job_binaries(self): + binary_id, binary_name = self._check_swift_job_binary_create() + self._check_job_binary_list(binary_id, binary_name) + self._check_swift_job_binary_get(binary_id, binary_name) + self._check_job_binary_delete(binary_id) + + @test.services('data_processing') + def test_internal_job_binaries(self): + binary_id, binary_name = self._check_internal_db_job_binary_create() + self._check_job_binary_list(binary_id, binary_name) + self._check_internal_db_job_binary_get(binary_id, binary_name) + self._check_job_binary_get_file(binary_id) + self._check_job_binary_delete(binary_id) diff --git a/sahara/tests/tempest/scenario/data_processing/client_tests/test_job_binary_internals.py b/sahara/tests/tempest/scenario/data_processing/client_tests/test_job_binary_internals.py new file mode 100644 index 00000000..e2589a3a --- /dev/null +++ b/sahara/tests/tempest/scenario/data_processing/client_tests/test_job_binary_internals.py @@ -0,0 +1,53 @@ +# Copyright (c) 2014 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
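The two back ends exercised by ``JobBinariesTest`` differ only in the URL
scheme and credential handling; side by side (a sketch, ``client`` being the
sahara client from the base class, values illustrative):

.. sourcecode:: python

    # swift-backed binary: the object lives in swift, so credentials
    # travel in the 'extra' dict
    swift_binary = client.job_binaries.create(
        'example-swift', 'swift://sahara-container/example.jar',
        'Test job binary', {'user': 'test', 'password': '123'})

    # internal-db binary: the payload is stored in sahara's own database
    internal = client.job_binary_internals.create('example-db', 'Some data')
    db_binary = client.job_binaries.create(
        'example-db', 'internal-db://%s' % internal.id,
        'Test job binary', None)
..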
+ +from tempest.common.utils import data_utils +from tempest.scenario.data_processing.client_tests import base +from tempest import test + + +class JobBinaryInternalsTest(base.BaseDataProcessingTest): + def _check_job_binary_internal_create(self): + name = data_utils.rand_name('sahara-internal-job-binary') + self.job_binary_data = 'Some data' + # create job binary internal + resp_body = self.create_job_binary_internal(name, self.job_binary_data) + # check that job_binary_internal created successfully + self.assertEqual(name, resp_body.name) + return resp_body.id, resp_body.name + + def _check_job_binary_internal_list(self, binary_id, binary_name): + # check for job binary internal in list + binary_list = self.client.job_binary_internals.list() + binaries_info = [(binary.id, binary.name) for binary in binary_list] + self.assertIn((binary_id, binary_name), binaries_info) + + def _check_job_binary_internal_get(self, binary_id, binary_name): + # check job binary internal fetch by id + binary = self.client.job_binary_internals.get(binary_id) + self.assertEqual(binary_name, binary.name) + + def _check_job_binary_internal_delete(self, binary_id): + # delete job binary internal by id + self.client.job_binary_internals.delete(binary_id) + # check that job binary internal really deleted + binary_list = self.client.job_binary_internals.list() + self.assertNotIn(binary_id, [binary.id for binary in binary_list]) + + @test.services('data_processing') + def test_job_binary_internal(self): + binary_id, binary_name = self._check_job_binary_internal_create() + self._check_job_binary_internal_list(binary_id, binary_name) + self._check_job_binary_internal_get(binary_id, binary_name) + self._check_job_binary_internal_delete(binary_id) diff --git a/sahara/tests/tempest/scenario/data_processing/client_tests/test_job_executions.py b/sahara/tests/tempest/scenario/data_processing/client_tests/test_job_executions.py new file mode 100644 index 00000000..7886c3f4 --- /dev/null +++ b/sahara/tests/tempest/scenario/data_processing/client_tests/test_job_executions.py @@ -0,0 +1,298 @@ +# Copyright (c) 2014 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
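The job execution test below begins by driving sahara's image registry; in
outline (``client`` as in the base class, ``image_id`` coming from
``fake_image_id`` in ``sahara_tests.conf``):

.. sourcecode:: python

    # register the image along with the user sahara should ssh in as
    client.images.update_image(image_id, 'ubuntu', '')

    # tag it for the fake plugin; the tags materialize as
    # _sahara_tag_fake / _sahara_tag_0.1 keys in image.metadata
    client.images.update_tags(image_id, ['fake', '0.1'])

    # untag and drop the registration when done
    client.images.update_tags(image_id, [])
    client.images.unregister_image(image_id)
..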
+ +import time + +from oslo.utils import timeutils +from saharaclient.api import base as sab +from tempest.common.utils import data_utils +from tempest import config +from tempest import exceptions +from tempest.scenario.data_processing.client_tests import base +from tempest.scenario.data_processing import config as sahara_test_config +from tempest import test + +CONF = sahara_test_config.SAHARA_TEST_CONF +TEMPEST_CONF = config.CONF + + +class JobExecutionTest(base.BaseDataProcessingTest): + def _check_register_image(self, image_id): + self.client.images.update_image( + image_id, CONF.data_processing.ssh_username, '') + reg_image = self.client.images.get(image_id) + + self.assertDictContainsSubset( + {'_sahara_username': CONF.data_processing.ssh_username}, + reg_image.metadata) + + def _check_image_get(self, image_id): + image = self.client.images.get(image_id) + + self.assertEqual(image_id, image.id) + + def _check_image_list(self, image_id): + # check for image in list + image_list = self.client.images.list() + images_info = [image.id for image in image_list] + + self.assertIn(image_id, images_info) + + def _check_adding_tags(self, image_id): + # adding new tags + self.client.images.update_tags(image_id, ['fake', '0.1']) + image = self.client.images.get(image_id) + + self.assertDictContainsSubset({'_sahara_tag_fake': 'True', + '_sahara_tag_0.1': 'True'}, + image.metadata) + + def _check_deleting_tags(self, image_id): + # deleting tags + self.client.images.update_tags(image_id, []) + image = self.client.images.get(image_id) + + self.assertNotIn('_sahara_tag_fake', image.metadata) + self.assertNotIn('_sahara_tag_0.1', image.metadata) + + def _check_unregister_image(self, image_id): + # unregister image + self.client.images.unregister_image(image_id) + + # check that image really unregistered + image_list = self.client.images.list() + self.assertNotIn(image_id, [image.id for image in image_list]) + + def _check_cluster_create(self): + worker = self.create_node_group_template( + data_utils.rand_name('sahara-ng-template'), **self.worker_template) + + master = self.create_node_group_template( + data_utils.rand_name('sahara-ng-template'), **self.master_template) + + cluster_templ = self.cluster_template.copy() + cluster_templ['node_groups'] = [ + { + 'name': 'master', + 'node_group_template_id': master.id, + 'count': 1 + }, + { + 'name': 'worker', + 'node_group_template_id': worker.id, + 'count': 3 + } + ] + if TEMPEST_CONF.service_available.neutron: + cluster_templ['net_id'] = self.get_private_network_id() + + cluster_template = self.create_cluster_template( + data_utils.rand_name('sahara-cluster-template'), **cluster_templ) + cluster_name = data_utils.rand_name('sahara-cluster') + self.cluster_info = { + 'name': cluster_name, + 'plugin_name': 'fake', + 'hadoop_version': '0.1', + 'cluster_template_id': cluster_template.id, + 'default_image_id': CONF.data_processing.fake_image_id + } + + # create cluster + cluster = self.create_cluster(**self.cluster_info) + + # wait until cluster moves to active state + self.check_cluster_active(cluster.id) + + # check that cluster created successfully + self.assertEqual(cluster_name, cluster.name) + self.assertDictContainsSubset(self.cluster_info, cluster.__dict__) + + return cluster.id, cluster.name + + def _check_cluster_list(self, cluster_id, cluster_name): + # check for cluster in list + cluster_list = self.client.clusters.list() + clusters_info = [(clust.id, clust.name) for clust in cluster_list] + self.assertIn((cluster_id, cluster_name), clusters_info) 
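For orientation, ``_check_cluster_create`` assembles its objects bottom-up;
roughly (a sketch reusing the template bodies from ``base.py``; ``pool`` and
``image_id`` stand in for values taken from ``sahara_tests.conf``):

.. sourcecode:: python

    pool = 'public'        # floating_ip_pool, illustrative
    image_id = '16a534fc-9e1e-43d6-b577-9f805212dc0b'  # fake_image_id

    worker = client.node_group_templates.create(
        'worker', plugin_name='fake', hadoop_version='0.1',
        node_processes=['datanode', 'tasktracker'],
        flavor_id='2', description='', floating_ip_pool=pool)

    template = client.cluster_templates.create(
        'cluster-template', plugin_name='fake', hadoop_version='0.1',
        description='', node_groups=[
            {'name': 'worker',
             'node_group_template_id': worker.id,
             'count': 3}])

    cluster = client.clusters.create(
        'cluster', plugin_name='fake', hadoop_version='0.1',
        cluster_template_id=template.id, default_image_id=image_id)
..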
+
+    def _check_cluster_get(self, cluster_id, cluster_name):
+        # check cluster fetch by id
+        cluster = self.client.clusters.get(cluster_id)
+        self.assertEqual(cluster_name, cluster.name)
+        self.assertDictContainsSubset(self.cluster_info, cluster.__dict__)
+
+    def _check_cluster_scale(self, cluster_id):
+        big_worker = self.create_node_group_template(
+            data_utils.rand_name('sahara-ng-template'), **self.worker_template)
+
+        scale_body = {
+            'resize_node_groups': [
+                {
+                    'count': 2,
+                    'name': 'worker'
+                },
+                {
+                    'count': 2,
+                    'name': 'master'
+                }
+            ],
+            'add_node_groups': [
+                {
+                    'count': 1,
+                    'name': 'big-worker',
+                    'node_group_template_id': big_worker.id
+
+                }
+            ]
+        }
+
+        self.client.clusters.scale(cluster_id, scale_body)
+        self.check_cluster_active(cluster_id)
+
+        cluster = self.client.clusters.get(cluster_id)
+        for ng in cluster.node_groups:
+            if ng['name'] == scale_body['resize_node_groups'][0]['name']:
+                self.assertDictContainsSubset(
+                    scale_body['resize_node_groups'][0], ng)
+            elif ng['name'] == scale_body['resize_node_groups'][1]['name']:
+                self.assertDictContainsSubset(
+                    scale_body['resize_node_groups'][1], ng)
+            elif ng['name'] == scale_body['add_node_groups'][0]['name']:
+                self.assertDictContainsSubset(
+                    scale_body['add_node_groups'][0], ng)
+
+    def _check_cluster_delete(self, cluster_id):
+        self.client.clusters.delete(cluster_id)
+
+        # check that cluster moved to deleting state
+        cluster = self.client.clusters.get(cluster_id)
+        self.assertEqual(cluster.status, 'Deleting')
+
+        timeout = CONF.data_processing.cluster_timeout
+        s_time = timeutils.utcnow()
+        while timeutils.delta_seconds(s_time, timeutils.utcnow()) < timeout:
+            try:
+                self.client.clusters.get(cluster_id)
+            except sab.APIException:
+                # cluster is deleted
+                return
+            time.sleep(CONF.data_processing.request_timeout)
+
+        raise exceptions.TimeoutException('Cluster failed to terminate '
+                                          'in %d seconds.'
+                                          % timeout)
+
+    def _check_job_execution_create(self, cluster_id):
+        # create swift container
+        container_name = data_utils.rand_name('test-container')
+        self.create_container(container_name)
+
+        # create input data source
+        input_file_name = data_utils.rand_name('input')
+        self.object_client.create_object(container_name, input_file_name,
+                                         'some-data')
+
+        input_file_url = 'swift://%s/%s' % (container_name, input_file_name)
+        input_source_name = data_utils.rand_name('input-data-source')
+        input_source = self.create_data_source(
+            input_source_name, input_file_url, '', 'swift',
+            {'user': 'test', 'password': '123'})
+
+        # create output data source
+        output_dir_name = data_utils.rand_name('output')
+        output_dir_url = 'swift://%s/%s' % (container_name, output_dir_name)
+        output_source_name = data_utils.rand_name('output-data-source')
+        output_source = self.create_data_source(
+            output_source_name, output_dir_url, '', 'swift',
+            {'user': 'test', 'password': '123'})
+
+        job_binary = {
+            'name': data_utils.rand_name('sahara-job-binary'),
+            'url': input_file_url,
+            'description': 'Test job binary',
+            'extra': {
+                'user': 'test',
+                'password': '123'
+            }
+        }
+        # create job_binary
+        job_binary = self.create_job_binary(**job_binary)
+
+        # create job
+        job_name = data_utils.rand_name('test-job')
+        job = self.create_job(job_name, 'Pig', [job_binary.id])
+
+        self.job_exec_info = {
+            'job_id': job.id,
+            'cluster_id': cluster_id,
+            'input_id': input_source.id,
+            'output_id': output_source.id,
+            'configs': {}
+        }
+        # create job execution
+        job_execution = self.create_job_execution(**self.job_exec_info)
+
+        return job_execution.id
+
+    def _check_job_execution_list(self, job_exec_id):
+        # check for job_execution in list
+        job_exec_list = self.client.job_executions.list()
+        self.assertIn(job_exec_id, [job_exec.id for job_exec in job_exec_list])
+
+    def _check_job_execution_get(self, job_exec_id):
+        # check job_execution fetch by id
+        job_exec = self.client.job_executions.get(job_exec_id)
+        # compare to a copy of job_exec_info without the 'configs' field,
+        # which is not returned verbatim in the response body
+        job_exec_info = self.job_exec_info.copy()
+        del job_exec_info['configs']
+        self.assertDictContainsSubset(job_exec_info, job_exec.__dict__)
+
+    def _check_job_execution_delete(self, job_exec_id):
+        # delete job_execution by id
+        self.client.job_executions.delete(job_exec_id)
+        # check that job_execution really deleted
+        job_exec_list = self.client.job_executions.list()
+        self.assertNotIn(job_exec_id, [job_exec.id for
+                                       job_exec in job_exec_list])
+
+    @test.attr(type='slow')
+    @test.services('data_processing')
+    def test_job_executions(self):
+        image_id = CONF.data_processing.fake_image_id
+        self._check_image_get(image_id)
+        self._check_register_image(image_id)
+        self._check_image_list(image_id)
+        self._check_adding_tags(image_id)
+
+        cluster_id, cluster_name = self._check_cluster_create()
+        self._check_cluster_list(cluster_id, cluster_name)
+        self._check_cluster_get(cluster_id, cluster_name)
+        self._check_cluster_scale(cluster_id)
+
+        job_exec_id = self._check_job_execution_create(cluster_id)
+        self._check_job_execution_list(job_exec_id)
+        self._check_job_execution_get(job_exec_id)
+
+        self._check_job_execution_delete(job_exec_id)
+        self._check_cluster_delete(cluster_id)
+        self._check_deleting_tags(image_id)
+        self._check_unregister_image(image_id)
+
+    @classmethod
+    def tearDownClass(cls):
+        image_list = cls.client.images.list()
+        image_id = CONF.data_processing.fake_image_id
+        if image_id in [image.id for image in image_list]:
+            cls.client.images.unregister_image(image_id)
+        super(JobExecutionTest, cls).tearDownClass()
diff --git a/sahara/tests/tempest/scenario/data_processing/client_tests/test_jobs.py b/sahara/tests/tempest/scenario/data_processing/client_tests/test_jobs.py
new file mode 100644
index 00000000..5397aba0
--- /dev/null
+++ b/sahara/tests/tempest/scenario/data_processing/client_tests/test_jobs.py
@@ -0,0 +1,69 @@
+# Copyright (c) 2014 Mirantis Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
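Condensed, the EDP pipeline that ``_check_job_execution_create`` wires up
looks like this (calls as issued through the base-class helpers; container,
names and credentials illustrative; ``cluster_id`` is the id of an Active
cluster):

.. sourcecode:: python

    inp = client.data_sources.create(
        'input', '', 'swift', 'swift://demo-container/input',
        credential_user='test', credential_pass='123')
    out = client.data_sources.create(
        'output', '', 'swift', 'swift://demo-container/output',
        credential_user='test', credential_pass='123')

    binary = client.job_binaries.create(
        'binary', 'swift://demo-container/input', 'Test job binary',
        {'user': 'test', 'password': '123'})
    job = client.jobs.create('job', 'Pig', [binary.id], (), '')

    execution = client.job_executions.create(
        job_id=job.id, cluster_id=cluster_id,
        input_id=inp.id, output_id=out.id, configs={})
..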
+ +from tempest.common.utils import data_utils +from tempest.scenario.data_processing.client_tests import base +from tempest import test + + +class JobTest(base.BaseDataProcessingTest): + def _check_create_job(self): + job_binary = { + 'name': data_utils.rand_name('sahara-job-binary'), + 'url': 'swift://sahara-container.sahara/example.jar', + 'description': 'Test job binary', + 'extra': { + 'user': 'test', + 'password': '123' + } + } + # create job_binary + job_binary = self.create_job_binary(**job_binary) + + self.job = { + 'job_type': 'Pig', + 'mains': [job_binary.id] + } + job_name = data_utils.rand_name('sahara-job') + # create job + job = self.create_job(job_name, **self.job) + # check that job created successfully + self.assertEqual(job_name, job.name) + + return job.id, job.name + + def _check_job_list(self, job_id, job_name): + # check for job in list + job_list = self.client.jobs.list() + jobs_info = [(job.id, job.name) for job in job_list] + self.assertIn((job_id, job_name), jobs_info) + + def _check_get_job(self, job_id, job_name): + # check job fetch by id + job = self.client.jobs.get(job_id) + self.assertEqual(job_name, job.name) + + def _check_delete_job(self, job_id): + # delete job by id + self.client.jobs.delete(job_id) + # check that job really deleted + job_list = self.client.jobs.list() + self.assertNotIn(job_id, [job.id for job in job_list]) + + @test.services('data_processing') + def test_job(self): + job_id, job_name = self._check_create_job() + self._check_job_list(job_id, job_name) + self._check_get_job(job_id, job_name) + self._check_delete_job(job_id) diff --git a/sahara/tests/tempest/scenario/data_processing/client_tests/test_node_group_templates.py b/sahara/tests/tempest/scenario/data_processing/client_tests/test_node_group_templates.py new file mode 100644 index 00000000..59247491 --- /dev/null +++ b/sahara/tests/tempest/scenario/data_processing/client_tests/test_node_group_templates.py @@ -0,0 +1,62 @@ +# Copyright (c) 2014 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
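Note the binary URL in ``_check_create_job``: ``sahara-container.sahara``
follows the ``container.provider`` form that sahara's hadoop-swift
integration expects for swift paths consumed on the cluster. A small sketch
of that convention (the helper is illustrative, not part of the change):

.. sourcecode:: python

    def hadoop_swift_url(container, obj, provider='sahara'):
        """Builds swift://<container>.<provider>/<object>."""
        return 'swift://%s.%s/%s' % (container, provider, obj)

    assert (hadoop_swift_url('sahara-container', 'example.jar') ==
            'swift://sahara-container.sahara/example.jar')
..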
+ +from tempest.common.utils import data_utils +from tempest.scenario.data_processing.client_tests import base +from tempest import test + + +class NodeGroupTemplateTest(base.BaseDataProcessingTest): + def _check_create_node_group_template(self): + template_name = data_utils.rand_name('sahara-ng-template') + + # create node group template + resp_body = self.create_node_group_template(template_name, + **self.worker_template) + # check that template created successfully + self.assertEqual(template_name, resp_body.name) + self.assertDictContainsSubset(self.worker_template, + resp_body.__dict__) + + return resp_body.id, template_name + + def _check_node_group_template_list(self, template_id, template_name): + # check for node group template in list + template_list = self.client.node_group_templates.list() + templates_info = [(template.id, template.name) + for template in template_list] + self.assertIn((template_id, template_name), templates_info) + + def _check_node_group_template_get(self, template_id, template_name): + # check node group template fetch by id + template = self.client.node_group_templates.get( + template_id) + self.assertEqual(template_name, template.name) + self.assertDictContainsSubset(self.worker_template, + template.__dict__) + + def _check_node_group_template_delete(self, template_id): + # delete node group template by id + self.client.node_group_templates.delete(template_id) + + # check that node group really deleted + templates = self.client.node_group_templates.list() + self.assertNotIn(template_id, [template.id for template in templates]) + + @test.services('data_processing') + def test_node_group_templates(self): + template_id, template_name = self._check_create_node_group_template() + self._check_node_group_template_list(template_id, template_name) + self._check_node_group_template_get(template_id, template_name) + self._check_node_group_template_delete(template_id) diff --git a/sahara/tests/tempest/scenario/data_processing/client_tests/test_plugins.py b/sahara/tests/tempest/scenario/data_processing/client_tests/test_plugins.py new file mode 100644 index 00000000..3c9b6981 --- /dev/null +++ b/sahara/tests/tempest/scenario/data_processing/client_tests/test_plugins.py @@ -0,0 +1,47 @@ +# Copyright (c) 2014 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
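The plugin checks that follow rely on each plugin version advertising its
own name and version among the required image tags; in outline (``client``
as in the base class):

.. sourcecode:: python

    plugin = client.plugins.get('fake')
    for version in plugin.versions:
        detail = client.plugins.get_version_details('fake', version)
        # e.g. for version '0.1' the tags include both 'fake' and '0.1'
        assert set([detail.name, version]) <= set(detail.required_image_tags)
..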
+ +from tempest.scenario.data_processing.client_tests import base +from tempest import test + + +class PluginsTest(base.BaseDataProcessingTest): + + def _check_plugins_list(self): + plugins = self.client.plugins.list() + plugins_names = [plugin.name for plugin in plugins] + self.assertIn('fake', plugins_names) + + return plugins_names + + def _check_plugins_get(self, plugins_names): + for plugin_name in plugins_names: + plugin = self.client.plugins.get(plugin_name) + self.assertEqual(plugin_name, plugin.name) + + # check get_version_details + for plugin_version in plugin.versions: + detailed_plugin = self.client.plugins.get_version_details( + plugin_name, plugin_version) + self.assertEqual(plugin_name, detailed_plugin.name) + + # check that required image tags contains name and version + image_tags = detailed_plugin.required_image_tags + self.assertIn(plugin_name, image_tags) + self.assertIn(plugin_version, image_tags) + + @test.services('data_processing') + def test_plugins(self): + plugins_names = self._check_plugins_list() + self._check_plugins_get(plugins_names) diff --git a/sahara/tests/tempest/scenario/data_processing/config.py b/sahara/tests/tempest/scenario/data_processing/config.py new file mode 100644 index 00000000..e23b2307 --- /dev/null +++ b/sahara/tests/tempest/scenario/data_processing/config.py @@ -0,0 +1,83 @@ +# Copyright (c) 2014 Mirantis Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
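``class_wrapper`` in the module below is a small lazy-singleton decorator:
the first call constructs the instance, and every later call returns the
cached one. A minimal usage sketch:

.. sourcecode:: python

    @class_wrapper
    class Once(object):
        def __init__(self):
            print('constructed')

    a = Once()   # prints 'constructed'
    b = Once()   # cached instance, nothing printed
    assert a is b
..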
+
+from __future__ import print_function
+
+import os
+
+from oslo.config import cfg
+
+
+def class_wrapper(cls):
+    instances = {}
+
+    def get_instance():
+        if cls not in instances:
+            instances[cls] = cls()
+        return instances[cls]
+
+    return get_instance
+
+
+data_processing_group = cfg.OptGroup(name='data_processing',
+                                     title='Data Processing options')
+DataProcessingGroup = [
+    cfg.IntOpt('cluster_timeout',
+               default=3600,
+               help='Timeout (in seconds) to wait for cluster deployment.'),
+    cfg.IntOpt('request_timeout',
+               default=10,
+               help='Timeout (in seconds) between status checks.'),
+    cfg.StrOpt('floating_ip_pool',
+               help='Name of IP pool.'),
+    cfg.StrOpt('private_network',
+               help='Name of the private network '
+                    'that provides internal connectivity.'),
+    cfg.StrOpt('fake_image_id',
+               help='ID of an image which is used for cluster creation.'),
+    cfg.StrOpt('flavor_id',
+               help='ID of a flavor.'),
+    cfg.StrOpt('saharaclient_version',
+               default='1.1',
+               help='Version of python-saharaclient'),
+    cfg.StrOpt('sahara_url',
+               help='Sahara url as http://ip:port/api_version/tenant_id'),
+    cfg.StrOpt('ssh_username',
+               help='Username which is used to log into remote nodes via SSH.')
+]
+
+
+@class_wrapper
+class SaharaTestConfig(object):
+    DEFAULT_CONFIG_DIR = os.path.join(
+        os.path.abspath(os.path.dirname(__file__)), 'etc')
+
+    DEFAULT_CONFIG_FILE = 'sahara_tests.conf'
+
+    def __init__(self):
+        config_files = []
+        path = os.path.join(self.DEFAULT_CONFIG_DIR, self.DEFAULT_CONFIG_FILE)
+        if os.path.isfile(path):
+            config_files.append(path)
+
+        conf = cfg.ConfigOpts()
+        conf([], project='Sahara-tests',
+             default_config_files=config_files)
+        conf.register_group(data_processing_group)
+        conf.register_opts(DataProcessingGroup, data_processing_group)
+
+        self.data_processing = conf.data_processing
+
+
+SAHARA_TEST_CONF = SaharaTestConfig()
diff --git a/sahara/tests/tempest/scenario/data_processing/etc/sahara_tests.conf b/sahara/tests/tempest/scenario/data_processing/etc/sahara_tests.conf
new file mode 100644
index 00000000..6fbf3ba1
--- /dev/null
+++ b/sahara/tests/tempest/scenario/data_processing/etc/sahara_tests.conf
@@ -0,0 +1,7 @@
+[data_processing]
+
+floating_ip_pool='a454832b-5101-421a-a225-5445a98667d4'
+private_network='cdbfcaa0-d17c-4ec4-9271-eb12c975d825'
+fake_image_id='16a534fc-9e1e-43d6-b577-9f805212dc0b'
+flavor_id=2
+ssh_username='ubuntu'
\ No newline at end of file
diff --git a/sahara/tests/tempest/scenario/data_processing/etc/sahara_tests.conf.sample b/sahara/tests/tempest/scenario/data_processing/etc/sahara_tests.conf.sample
new file mode 100644
index 00000000..410a8820
--- /dev/null
+++ b/sahara/tests/tempest/scenario/data_processing/etc/sahara_tests.conf.sample
@@ -0,0 +1,28 @@
+[data_processing]
+
+# Timeout (in seconds) to wait for cluster deployment.
+#cluster_timeout=3600
+
+# Timeout (in seconds) between status checks.
+#request_timeout=10
+
+# Name of IP pool.
+#floating_ip_pool=
+
+# Name of the private network that provides internal connectivity.
+#private_network=
+
+# ID of an image which is used for cluster creation.
+#fake_image_id=
+
+# ID of a flavor.
+#flavor_id=
+
+# Version of python-saharaclient
+#saharaclient_version=1.1
+
+# Sahara url as http://ip:port/api_version/tenant_id
+#sahara_url=
+
+# Username which is used to log into remote nodes via SSH.
+#ssh_username=
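Tests consume these settings through the singleton exported by ``config.py``,
mirroring the imports used throughout the test modules:

.. sourcecode:: python

    from tempest.scenario.data_processing import config as sahara_test_config

    CONF = sahara_test_config.SAHARA_TEST_CONF
    print(CONF.data_processing.cluster_timeout)        # 3600 by default
    print(CONF.data_processing.saharaclient_version)   # '1.1' by default
..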