Clean up and move cli test base
This commit moves the migrated cli testing framework and adapts it to be an independent tool. Part of this is removing the config references and reworking the test class to be a regular class for calling cli args.
This commit is contained in:
0
tempest_lib/cli/__init__.py
Normal file
0
tempest_lib/cli/__init__.py
Normal file
@@ -20,23 +20,19 @@ import subprocess
|
|||||||
|
|
||||||
import testtools
|
import testtools
|
||||||
|
|
||||||
import tempest.cli.output_parser
|
import tempest_lib.cli.output_parser
|
||||||
from tempest import config
|
from tempest_lib import exceptions
|
||||||
from tempest import exceptions
|
from tempest_lib.openstack.common import log as logging
|
||||||
from tempest.openstack.common import log as logging
|
from tempest_lib.openstack.common import versionutils
|
||||||
from tempest.openstack.common import versionutils
|
|
||||||
import tempest.test
|
|
||||||
|
|
||||||
|
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
CONF = config.CONF
|
|
||||||
|
|
||||||
|
|
||||||
def execute(cmd, action, flags='', params='', fail_ok=False,
|
def execute(cmd, action, flags='', params='', fail_ok=False,
|
||||||
merge_stderr=False):
|
merge_stderr=False, cli_dir='/usr/bin'):
|
||||||
"""Executes specified command for the given action."""
|
"""Executes specified command for the given action."""
|
||||||
cmd = ' '.join([os.path.join(CONF.cli.cli_dir, cmd),
|
cmd = ' '.join([os.path.join(cli_dir, cmd),
|
||||||
flags, action, params])
|
flags, action, params])
|
||||||
LOG.info("running: '%s'" % cmd)
|
LOG.info("running: '%s'" % cmd)
|
||||||
cmd = shlex.split(cmd.encode('utf-8'))
|
cmd = shlex.split(cmd.encode('utf-8'))
|
||||||
@@ -92,21 +88,21 @@ def min_client_version(*args, **kwargs):
|
|||||||
return decorator
|
return decorator
|
||||||
|
|
||||||
|
|
||||||
class ClientTestBase(tempest.test.BaseTestCase):
|
class CLIClientBase(object):
|
||||||
@classmethod
|
def __init__(self, username='', password='', tenant_name='', uri='',
|
||||||
def setUpClass(cls):
|
cli_dir='', *args, **kwargs):
|
||||||
if not CONF.cli.enabled:
|
super(CLIClientBase, self).__init__()
|
||||||
msg = "cli testing disabled"
|
self.parser = tempest_lib.cli.output_parser
|
||||||
raise cls.skipException(msg)
|
self.cli_dir = cli_dir if cli_dir else '/usr/bin'
|
||||||
super(ClientTestBase, cls).setUpClass()
|
self.username = username
|
||||||
|
self.tenant_name = tenant_name
|
||||||
|
self.password = password
|
||||||
|
self.uri = uri
|
||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
def nova(self, action, flags='', params='', admin=True, fail_ok=False,
|
||||||
self.parser = tempest.cli.output_parser
|
endpoint_type='publicURL'):
|
||||||
super(ClientTestBase, self).__init__(*args, **kwargs)
|
|
||||||
|
|
||||||
def nova(self, action, flags='', params='', admin=True, fail_ok=False):
|
|
||||||
"""Executes nova command for the given action."""
|
"""Executes nova command for the given action."""
|
||||||
flags += ' --endpoint-type %s' % CONF.compute.endpoint_type
|
flags += ' --endpoint-type %s' % endpoint_type
|
||||||
return self.cmd_with_auth(
|
return self.cmd_with_auth(
|
||||||
'nova', action, flags, params, admin, fail_ok)
|
'nova', action, flags, params, admin, fail_ok)
|
||||||
|
|
||||||
@@ -121,63 +117,68 @@ class ClientTestBase(tempest.test.BaseTestCase):
|
|||||||
return self.cmd_with_auth(
|
return self.cmd_with_auth(
|
||||||
'keystone', action, flags, params, admin, fail_ok)
|
'keystone', action, flags, params, admin, fail_ok)
|
||||||
|
|
||||||
def glance(self, action, flags='', params='', admin=True, fail_ok=False):
|
def glance(self, action, flags='', params='', admin=True, fail_ok=False,
|
||||||
|
endpoint_type='publicURL'):
|
||||||
"""Executes glance command for the given action."""
|
"""Executes glance command for the given action."""
|
||||||
flags += ' --os-endpoint-type %s' % CONF.image.endpoint_type
|
flags += ' --os-endpoint-type %s' % endpoint_type
|
||||||
return self.cmd_with_auth(
|
return self.cmd_with_auth(
|
||||||
'glance', action, flags, params, admin, fail_ok)
|
'glance', action, flags, params, admin, fail_ok)
|
||||||
|
|
||||||
def ceilometer(self, action, flags='', params='', admin=True,
|
def ceilometer(self, action, flags='', params='', admin=True,
|
||||||
fail_ok=False):
|
fail_ok=False, endpoint_type='publicURL'):
|
||||||
"""Executes ceilometer command for the given action."""
|
"""Executes ceilometer command for the given action."""
|
||||||
flags += ' --os-endpoint-type %s' % CONF.telemetry.endpoint_type
|
flags += ' --os-endpoint-type %s' % endpoint_type
|
||||||
return self.cmd_with_auth(
|
return self.cmd_with_auth(
|
||||||
'ceilometer', action, flags, params, admin, fail_ok)
|
'ceilometer', action, flags, params, admin, fail_ok)
|
||||||
|
|
||||||
def heat(self, action, flags='', params='', admin=True,
|
def heat(self, action, flags='', params='', admin=True,
|
||||||
fail_ok=False):
|
fail_ok=False, endpoint_type='publicURL'):
|
||||||
"""Executes heat command for the given action."""
|
"""Executes heat command for the given action."""
|
||||||
flags += ' --os-endpoint-type %s' % CONF.orchestration.endpoint_type
|
flags += ' --os-endpoint-type %s' % endpoint_type
|
||||||
return self.cmd_with_auth(
|
return self.cmd_with_auth(
|
||||||
'heat', action, flags, params, admin, fail_ok)
|
'heat', action, flags, params, admin, fail_ok)
|
||||||
|
|
||||||
def cinder(self, action, flags='', params='', admin=True, fail_ok=False):
|
def cinder(self, action, flags='', params='', admin=True, fail_ok=False,
|
||||||
|
endpoint_type='publicURL'):
|
||||||
"""Executes cinder command for the given action."""
|
"""Executes cinder command for the given action."""
|
||||||
flags += ' --endpoint-type %s' % CONF.volume.endpoint_type
|
flags += ' --endpoint-type %s' % endpoint_type
|
||||||
return self.cmd_with_auth(
|
return self.cmd_with_auth(
|
||||||
'cinder', action, flags, params, admin, fail_ok)
|
'cinder', action, flags, params, admin, fail_ok)
|
||||||
|
|
||||||
def swift(self, action, flags='', params='', admin=True, fail_ok=False):
|
def swift(self, action, flags='', params='', admin=True, fail_ok=False,
|
||||||
|
endpoint_type='publicURL'):
|
||||||
"""Executes swift command for the given action."""
|
"""Executes swift command for the given action."""
|
||||||
flags += ' --os-endpoint-type %s' % CONF.object_storage.endpoint_type
|
flags += ' --os-endpoint-type %s' % endpoint_type
|
||||||
return self.cmd_with_auth(
|
return self.cmd_with_auth(
|
||||||
'swift', action, flags, params, admin, fail_ok)
|
'swift', action, flags, params, admin, fail_ok)
|
||||||
|
|
||||||
def neutron(self, action, flags='', params='', admin=True, fail_ok=False):
|
def neutron(self, action, flags='', params='', admin=True, fail_ok=False,
|
||||||
|
endpoint_type='publicURL'):
|
||||||
"""Executes neutron command for the given action."""
|
"""Executes neutron command for the given action."""
|
||||||
flags += ' --endpoint-type %s' % CONF.network.endpoint_type
|
flags += ' --endpoint-type %s' % endpoint_type
|
||||||
return self.cmd_with_auth(
|
return self.cmd_with_auth(
|
||||||
'neutron', action, flags, params, admin, fail_ok)
|
'neutron', action, flags, params, admin, fail_ok)
|
||||||
|
|
||||||
def sahara(self, action, flags='', params='', admin=True,
|
def sahara(self, action, flags='', params='', admin=True,
|
||||||
fail_ok=False, merge_stderr=True):
|
fail_ok=False, merge_stderr=True, endpoint_type='publicURL'):
|
||||||
"""Executes sahara command for the given action."""
|
"""Executes sahara command for the given action."""
|
||||||
flags += ' --endpoint-type %s' % CONF.data_processing.endpoint_type
|
flags += ' --endpoint-type %s' % endpoint_type
|
||||||
return self.cmd_with_auth(
|
return self.cmd_with_auth(
|
||||||
'sahara', action, flags, params, admin, fail_ok, merge_stderr)
|
'sahara', action, flags, params, admin, fail_ok, merge_stderr)
|
||||||
|
|
||||||
def cmd_with_auth(self, cmd, action, flags='', params='',
|
def cmd_with_auth(self, cmd, action, flags='', params='',
|
||||||
admin=True, fail_ok=False, merge_stderr=False):
|
fail_ok=False, merge_stderr=False):
|
||||||
"""Executes given command with auth attributes appended."""
|
"""Executes given command with auth attributes appended."""
|
||||||
# TODO(jogo) make admin=False work
|
# TODO(jogo) make admin=False work
|
||||||
creds = ('--os-username %s --os-tenant-name %s --os-password %s '
|
creds = ('--os-username %s --os-tenant-name %s --os-password %s '
|
||||||
'--os-auth-url %s' %
|
'--os-auth-url %s' %
|
||||||
(CONF.identity.admin_username,
|
(self.username,
|
||||||
CONF.identity.admin_tenant_name,
|
self.tenant_name,
|
||||||
CONF.identity.admin_password,
|
self.password,
|
||||||
CONF.identity.uri))
|
self.uri))
|
||||||
flags = creds + ' ' + flags
|
flags = creds + ' ' + flags
|
||||||
return execute(cmd, action, flags, params, fail_ok, merge_stderr)
|
return execute(cmd, action, flags, params, fail_ok, merge_stderr,
|
||||||
|
self.cli_dir)
|
||||||
|
|
||||||
def assertTableStruct(self, items, field_names):
|
def assertTableStruct(self, items, field_names):
|
||||||
"""Verify that all items has keys listed in field_names."""
|
"""Verify that all items has keys listed in field_names."""
|
Reference in New Issue
Block a user