# sahara/sahara/tests/tempest/scenario/data_processing/client_tests/base.py
# Copyright (c) 2014 Mirantis Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time
from oslo_log import log as logging
from oslo_utils import timeutils
from saharaclient.api import base as sab
from saharaclient import client as sahara_client
from tempest import config
from tempest import exceptions
from tempest.scenario import manager
# Tempest configuration, read lazily from tempest.conf via oslo.config.
TEMPEST_CONF = config.CONF
# Sahara cluster provisioning states polled by check_cluster_active().
CLUSTER_STATUS_ACTIVE = "Active"
CLUSTER_STATUS_ERROR = "Error"
LOG = logging.getLogger(__name__)
class BaseDataProcessingTest(manager.ScenarioTest):
    """Base class for Sahara (data processing) scenario tests.

    Builds a saharaclient from the primary tempest credentials and offers
    ``create_*`` helpers that register cleanups which delete the created
    resource and wait for the deletion to complete.
    """

    @classmethod
    def resource_setup(cls):
        """Create service clients and the shared resource templates."""
        cls.set_network_resources()
        super(BaseDataProcessingTest, cls).resource_setup()
        endpoint_type = TEMPEST_CONF.data_processing.endpoint_type
        catalog_type = TEMPEST_CONF.data_processing.catalog_type
        auth_url = TEMPEST_CONF.identity.uri
        credentials = cls.os_primary.credentials
        cls.client = sahara_client.Client(
            TEMPEST_CONF.data_processing.saharaclient_version,
            credentials.username,
            credentials.password,
            project_name=credentials.tenant_name,
            endpoint_type=endpoint_type,
            service_type=catalog_type,
            auth_url=auth_url,
            sahara_url=TEMPEST_CONF.data_processing.sahara_url)
        cls.object_client = cls.os_primary.object_client
        cls.container_client = cls.os_primary.container_client
        cls.networks_client = cls.os_primary.compute_networks_client
        # nova-network identifies the floating IP pool by name; neutron
        # requires the network UUID instead.
        cls.floating_ip_pool = TEMPEST_CONF.network.floating_network_name
        if TEMPEST_CONF.service_available.neutron:
            cls.floating_ip_pool = cls.get_floating_ip_pool_id_for_neutron()
        # Default request bodies for the "fake" plugin resources used by
        # the client tests.
        cls.worker_template = {
            'description': 'Test node group template',
            'plugin_name': 'fake',
            'hadoop_version': '0.1',
            'node_processes': [
                'datanode',
                'tasktracker'
            ],
            'flavor_id': TEMPEST_CONF.compute.flavor_ref,
            'floating_ip_pool': cls.floating_ip_pool
        }
        cls.master_template = {
            'description': 'Test node group template',
            'plugin_name': 'fake',
            'hadoop_version': '0.1',
            'node_processes': [
                'namenode',
                'jobtracker'
            ],
            'flavor_id': TEMPEST_CONF.compute.flavor_ref,
            'floating_ip_pool': cls.floating_ip_pool,
            'auto_security_group': True
        }
        cls.cluster_template = {
            'description': 'Test cluster template',
            'plugin_name': 'fake',
            'hadoop_version': '0.1'
        }
        cls.swift_data_source_with_creds = {
            'url': 'swift://sahara-container/input-source',
            'description': 'Test data source',
            'type': 'swift',
            'credentials': {
                'user': 'test',
                'password': '123'
            }
        }
        cls.local_hdfs_data_source = {
            'url': 'input-source',
            'description': 'Test data source',
            'type': 'hdfs',
        }
        cls.external_hdfs_data_source = {
            'url': 'hdfs://test-master-node/usr/hadoop/input-source',
            'description': 'Test data source',
            'type': 'hdfs'
        }

    @classmethod
    def get_floating_ip_pool_id_for_neutron(cls):
        """Return the UUID of the configured floating network.

        :raises exceptions.NotFound: if no network matches the configured
            floating network name.
        """
        net_id = cls._find_network_by_name(
            TEMPEST_CONF.network.floating_network_name)
        if not net_id:
            raise exceptions.NotFound(
                'Floating IP pool \'%s\' not found in pool list.'
                % TEMPEST_CONF.network.floating_network_name)
        return net_id

    @classmethod
    def get_private_network_id(cls):
        """Return the UUID of the configured fixed (private) network.

        :raises exceptions.NotFound: if no network matches the configured
            fixed network name.
        """
        net_id = cls._find_network_by_name(
            TEMPEST_CONF.compute.fixed_network_name)
        if not net_id:
            raise exceptions.NotFound(
                'Private network \'%s\' not found in network list.'
                % TEMPEST_CONF.compute.fixed_network_name)
        return net_id

    @classmethod
    def _find_network_by_name(cls, network_name):
        """Return the id of the network labelled *network_name*, or None."""
        for network in cls.networks_client.list_networks()['networks']:
            if network['label'] == network_name:
                return network['id']
        return None

    def create_node_group_template(self, name, **kwargs):
        """Create a node group template and schedule its deletion."""
        resp_body = self.client.node_group_templates.create(
            name, **kwargs)
        self.addCleanup(self.delete_resource,
                        self.client.node_group_templates, resp_body.id)
        return resp_body

    def create_cluster_template(self, name, **kwargs):
        """Create a cluster template and schedule its deletion."""
        resp_body = self.client.cluster_templates.create(
            name, **kwargs)
        self.addCleanup(self.delete_resource,
                        self.client.cluster_templates, resp_body.id)
        return resp_body

    def create_data_source(self, name, url, description, type,
                           credentials=None):
        """Create a data source and schedule its deletion.

        :param type: data source type ('swift' or 'hdfs'); the name shadows
            the builtin but is kept for caller compatibility.
        :param credentials: optional dict with 'user' and 'password' keys.
        """
        user = credentials['user'] if credentials else None
        pas = credentials['password'] if credentials else None
        resp_body = self.client.data_sources.create(
            name, description, type, url, credential_user=user,
            credential_pass=pas)
        self.addCleanup(self.delete_resource,
                        self.client.data_sources, resp_body.id)
        return resp_body

    def create_job_binary(self, name, url, description, extra=None):
        """Create a job binary and schedule its deletion."""
        resp_body = self.client.job_binaries.create(
            name, url, description, extra)
        self.addCleanup(self.delete_resource,
                        self.client.job_binaries, resp_body.id)
        return resp_body

    def create_job_binary_internal(self, name, data):
        """Create an internal-DB job binary and schedule its deletion."""
        resp_body = self.client.job_binary_internals.create(name, data)
        self.addCleanup(self.delete_resource,
                        self.client.job_binary_internals, resp_body.id)
        return resp_body

    def create_job(self, name, job_type, mains, libs=None, description=None):
        """Create a job and schedule its deletion."""
        libs = libs or ()
        description = description or ''
        resp_body = self.client.jobs.create(
            name, job_type, mains, libs, description)
        self.addCleanup(self.delete_resource, self.client.jobs, resp_body.id)
        return resp_body

    def create_cluster(self, name, **kwargs):
        """Create a cluster and schedule its deletion."""
        resp_body = self.client.clusters.create(name, **kwargs)
        self.addCleanup(self.delete_resource, self.client.clusters,
                        resp_body.id)
        return resp_body

    def check_cluster_active(self, cluster_id):
        """Poll the cluster until it becomes Active.

        :raises exceptions.BuildErrorException: if the cluster enters the
            Error state.
        :raises exceptions.TimeoutException: if the cluster is not Active
            within the configured cluster_timeout.
        """
        timeout = TEMPEST_CONF.data_processing.cluster_timeout
        s_time = timeutils.utcnow()
        while timeutils.delta_seconds(s_time, timeutils.utcnow()) < timeout:
            cluster = self.client.clusters.get(cluster_id)
            if cluster.status == CLUSTER_STATUS_ACTIVE:
                return
            if cluster.status == CLUSTER_STATUS_ERROR:
                raise exceptions.BuildErrorException(
                    'Cluster failed to build and is in %s status.' %
                    CLUSTER_STATUS_ERROR)
            time.sleep(TEMPEST_CONF.data_processing.request_timeout)
        raise exceptions.TimeoutException(
            'Cluster failed to get to %s status within %d seconds.'
            % (CLUSTER_STATUS_ACTIVE, timeout))

    def create_job_execution(self, **kwargs):
        """Start a job execution and schedule its deletion."""
        resp_body = self.client.job_executions.create(**kwargs)
        self.addCleanup(self.delete_resource, self.client.job_executions,
                        resp_body.id)
        return resp_body

    def create_container(self, name):
        """Create a Swift container and schedule its deletion."""
        self.container_client.create_container(name)
        self.addCleanup(self.delete_swift_container, name)

    def delete_resource(self, resource_client, resource_id):
        """Best-effort delete of a resource, then wait for it to vanish.

        API errors on delete are deliberately ignored: cleanup must not
        fail a test when the resource is already gone.
        """
        try:
            resource_client.delete(resource_id)
        except sab.APIException:
            pass
        else:
            self.delete_timeout(resource_client, resource_id)

    def delete_timeout(self, resource_client, resource_id, timeout=None):
        """Block until a GET on the resource reports 'not found'.

        :param timeout: seconds to wait; defaults to the configured
            data_processing.cluster_timeout.
        :raises exceptions.TimeoutException: if the resource still exists
            after *timeout* seconds.
        """
        if timeout is None:
            # Resolve the default lazily. Putting the TEMPEST_CONF lookup
            # in the signature froze the value at import time, before
            # tempest had necessarily finished loading its configuration.
            timeout = TEMPEST_CONF.data_processing.cluster_timeout
        start = timeutils.utcnow()
        while timeutils.delta_seconds(start, timeutils.utcnow()) < timeout:
            try:
                resource_client.get(resource_id)
            except sab.APIException as sahara_api_exception:
                # NOTE(review): relies on APIException exposing .message —
                # confirm against the installed saharaclient version.
                if 'not found' in sahara_api_exception.message:
                    return
                raise sahara_api_exception
            time.sleep(TEMPEST_CONF.data_processing.request_timeout)
        raise exceptions.TimeoutException(
            'Failed to delete resource "%s" in %d seconds.'
            % (resource_id, timeout))

    def delete_swift_container(self, container):
        """Delete every object in *container*, then the container itself."""
        objects = ([obj['name'] for obj in
                    self.container_client.list_all_container_objects(
                        container)])
        for obj in objects:
            self.object_client.delete_object(container, obj)
        self.container_client.delete_container(container)