Automatic Tempest Configuration Generator
# Copyright 2016, 2018 Red Hat, Inc.
# All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
from six.moves import configparser
from tempest.lib import exceptions
from config_tempest.constants import LOG
from import Service
class ObjectStorageService(Service):
def set_extensions(self):
if 'v3' not in self.service_url: # it's not a v3 url
body = self.do_get(self.service_url, top_level=True,
body = json.loads(body)
# Remove Swift general information from extensions list
self.extensions = body.keys()
except Exception:
self.extensions = []
self.extensions = []
def list_create_roles(self, conf, client):
roles = client.list_roles()['roles']
for section_key in ["operator_role", "reseller_admin_role"]:
key_value = conf.get_defaulted("object-storage", section_key)
if key_value not in [r['name'] for r in roles]:"Creating %s role", key_value)
except exceptions.Conflict:"Role %s already exists", key_value)
conf.set('object-storage', 'operator_role', 'admin')
except exceptions.Forbidden:"Roles can't be listed or created. The user doesn't have "
# If is not admin, we set the operator_role to Member
# otherwise we set to admin
conf.set('object-storage', 'operator_role', 'Member')
def get_feature_name(self):
return 'object-storage'
def get_service_extension_key(self):
return 'discoverable_apis'
def _check_health_check(self, path):
resp, _ = self.client.accounts.get(path, {})
return resp['status'] == '200'
except Exception as e:
LOG.warning('Healthcheck API not discovered giving %s', e)
return False
def check_service_status(self, conf):
"""Use healthcheck api to check the service status
:type conf: TempestConf object
# Check for swift discoverability if it is False
# check_service_status returns False
# Else above is True, then we can check for healthcheck
# API then we can find the service_status
if not conf.get_bool_value(
return False
return True
except configparser.NoSectionError:
# On swift, healthcheck is under http://.../healthcheck, while in
# ceph is under http://.../swift/healthcheck, we check both cases
return_value = self._check_health_check('healthcheck')
if not return_value:
return_value = self._check_health_check('swift/healthcheck')
return return_value
def set_default_tempest_options(self, conf):
"""Set default values for swift
:type conf: TempestConf object
swift_status = self.check_service_status(conf)
# Set roles based on service status
if swift_status:
self.list_create_roles(conf, self.client.roles)
def get_service_name():
return ['swift']
def get_codename():
return 'swift'