Migrate cli test framework to tempest-lib

This commit switches the cli tests to use tempest-lib. The code which
was migrated to tempest-lib is removed and all the existing test cases
are converted to use the framework provided by the library. Future
improvements to the cli testing framework should be made in
tempest-lib after this commit.

Partially implements bp tempest-library
Change-Id: If2c9b3895a30905bc3d62b8daef783878d78399a
This commit is contained in:
Matthew Treinish
2014-08-28 19:21:40 -04:00
parent 884e2f3916
commit 411130a023
17 changed files with 82 additions and 510 deletions

View File

@@ -27,3 +27,4 @@ six>=1.7.0
iso8601>=0.1.9
fixtures>=0.3.14
testscenarios>=0.4
tempest-lib

View File

@@ -14,46 +14,19 @@
# under the License.
import functools
import os
import shlex
import subprocess
from tempest_lib.cli import base
import testtools
import tempest.cli.output_parser
from tempest import config
from tempest import exceptions
from tempest.openstack.common import log as logging
from tempest.openstack.common import versionutils
import tempest.test
from tempest import test
LOG = logging.getLogger(__name__)
CONF = config.CONF
def execute(cmd, action, flags='', params='', fail_ok=False,
merge_stderr=False):
"""Executes specified command for the given action."""
cmd = ' '.join([os.path.join(CONF.cli.cli_dir, cmd),
flags, action, params])
LOG.info("running: '%s'" % cmd)
cmd = shlex.split(cmd.encode('utf-8'))
result = ''
result_err = ''
stdout = subprocess.PIPE
stderr = subprocess.STDOUT if merge_stderr else subprocess.PIPE
proc = subprocess.Popen(cmd, stdout=stdout, stderr=stderr)
result, result_err = proc.communicate()
if not fail_ok and proc.returncode != 0:
raise exceptions.CommandFailed(proc.returncode,
cmd,
result,
result_err)
return result
def check_client_version(client, version):
"""Checks if the client's version is compatible with the given version
@@ -62,8 +35,8 @@ def check_client_version(client, version):
@return: True if the client version is compatible with the given version
parameter, False otherwise.
"""
current_version = execute(client, '', params='--version',
merge_stderr=True)
current_version = base.execute(client, '', params='--version',
merge_stderr=True, cli_dir=CONF.cli.cli_dir)
if not current_version.strip():
raise exceptions.TempestException('"%s --version" output was empty' %
@@ -92,7 +65,7 @@ def min_client_version(*args, **kwargs):
return decorator
class ClientTestBase(tempest.test.BaseTestCase):
class ClientTestBase(base.ClientTestBase, test.BaseTestCase):
@classmethod
def resource_setup(cls):
if not CONF.cli.enabled:
@@ -100,92 +73,9 @@ class ClientTestBase(tempest.test.BaseTestCase):
raise cls.skipException(msg)
super(ClientTestBase, cls).resource_setup()
def __init__(self, *args, **kwargs):
self.parser = tempest.cli.output_parser
super(ClientTestBase, self).__init__(*args, **kwargs)
def nova(self, action, flags='', params='', admin=True, fail_ok=False):
"""Executes nova command for the given action."""
flags += ' --endpoint-type %s' % CONF.compute.endpoint_type
return self.cmd_with_auth(
'nova', action, flags, params, admin, fail_ok)
def nova_manage(self, action, flags='', params='', fail_ok=False,
merge_stderr=False):
"""Executes nova-manage command for the given action."""
return execute(
'nova-manage', action, flags, params, fail_ok, merge_stderr)
def keystone(self, action, flags='', params='', admin=True, fail_ok=False):
"""Executes keystone command for the given action."""
return self.cmd_with_auth(
'keystone', action, flags, params, admin, fail_ok)
def glance(self, action, flags='', params='', admin=True, fail_ok=False):
"""Executes glance command for the given action."""
flags += ' --os-endpoint-type %s' % CONF.image.endpoint_type
return self.cmd_with_auth(
'glance', action, flags, params, admin, fail_ok)
def ceilometer(self, action, flags='', params='', admin=True,
fail_ok=False):
"""Executes ceilometer command for the given action."""
flags += ' --os-endpoint-type %s' % CONF.telemetry.endpoint_type
return self.cmd_with_auth(
'ceilometer', action, flags, params, admin, fail_ok)
def heat(self, action, flags='', params='', admin=True,
fail_ok=False):
"""Executes heat command for the given action."""
flags += ' --os-endpoint-type %s' % CONF.orchestration.endpoint_type
return self.cmd_with_auth(
'heat', action, flags, params, admin, fail_ok)
def cinder(self, action, flags='', params='', admin=True, fail_ok=False):
"""Executes cinder command for the given action."""
flags += ' --endpoint-type %s' % CONF.volume.endpoint_type
return self.cmd_with_auth(
'cinder', action, flags, params, admin, fail_ok)
def swift(self, action, flags='', params='', admin=True, fail_ok=False):
"""Executes swift command for the given action."""
flags += ' --os-endpoint-type %s' % CONF.object_storage.endpoint_type
return self.cmd_with_auth(
'swift', action, flags, params, admin, fail_ok)
def neutron(self, action, flags='', params='', admin=True, fail_ok=False):
"""Executes neutron command for the given action."""
flags += ' --endpoint-type %s' % CONF.network.endpoint_type
return self.cmd_with_auth(
'neutron', action, flags, params, admin, fail_ok)
def sahara(self, action, flags='', params='', admin=True,
fail_ok=False, merge_stderr=True):
"""Executes sahara command for the given action."""
flags += ' --endpoint-type %s' % CONF.data_processing.endpoint_type
return self.cmd_with_auth(
'sahara', action, flags, params, admin, fail_ok, merge_stderr)
def cmd_with_auth(self, cmd, action, flags='', params='',
admin=True, fail_ok=False, merge_stderr=False):
"""Executes given command with auth attributes appended."""
# TODO(jogo) make admin=False work
creds = ('--os-username %s --os-tenant-name %s --os-password %s '
'--os-auth-url %s' %
(CONF.identity.admin_username,
CONF.identity.admin_tenant_name,
CONF.identity.admin_password,
CONF.identity.uri))
flags = creds + ' ' + flags
return execute(cmd, action, flags, params, fail_ok, merge_stderr)
def assertTableStruct(self, items, field_names):
"""Verify that all items has keys listed in field_names."""
for item in items:
for field in field_names:
self.assertIn(field, item)
def assertFirstLineStartsWith(self, lines, beginning):
self.assertTrue(lines[0].startswith(beginning),
msg=('Beginning of first line has invalid content: %s'
% lines[:3]))
def _get_clients(self):
clients = base.CLIClient(CONF.identity.admin_username,
CONF.identity.admin_password,
CONF.identity.admin_tenant_name,
CONF.identity.uri, CONF.cli.cli_dir)
return clients

View File

@@ -1,171 +0,0 @@
# Copyright 2013 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Collection of utilities for parsing CLI clients output."""
import re
from tempest import exceptions
from tempest.openstack.common import log as logging
LOG = logging.getLogger(__name__)
delimiter_line = re.compile('^\+\-[\+\-]+\-\+$')
def details_multiple(output_lines, with_label=False):
"""Return list of dicts with item details from cli output tables.
If with_label is True, key '__label' is added to each items dict.
For more about 'label' see OutputParser.tables().
"""
items = []
tables_ = tables(output_lines)
for table_ in tables_:
if 'Property' not in table_['headers'] \
or 'Value' not in table_['headers']:
raise exceptions.InvalidStructure()
item = {}
for value in table_['values']:
item[value[0]] = value[1]
if with_label:
item['__label'] = table_['label']
items.append(item)
return items
def details(output_lines, with_label=False):
"""Return dict with details of first item (table) found in output."""
items = details_multiple(output_lines, with_label)
return items[0]
def listing(output_lines):
"""Return list of dicts with basic item info parsed from cli output.
"""
items = []
table_ = table(output_lines)
for row in table_['values']:
item = {}
for col_idx, col_key in enumerate(table_['headers']):
item[col_key] = row[col_idx]
items.append(item)
return items
def tables(output_lines):
"""Find all ascii-tables in output and parse them.
Return list of tables parsed from cli output as dicts.
(see OutputParser.table())
And, if found, label key (separated line preceding the table)
is added to each tables dict.
"""
tables_ = []
table_ = []
label = None
start = False
header = False
if not isinstance(output_lines, list):
output_lines = output_lines.split('\n')
for line in output_lines:
if delimiter_line.match(line):
if not start:
start = True
elif not header:
# we are after head area
header = True
else:
# table ends here
start = header = None
table_.append(line)
parsed = table(table_)
parsed['label'] = label
tables_.append(parsed)
table_ = []
label = None
continue
if start:
table_.append(line)
else:
if label is None:
label = line
else:
LOG.warn('Invalid line between tables: %s' % line)
if len(table_) > 0:
LOG.warn('Missing end of table')
return tables_
def table(output_lines):
"""Parse single table from cli output.
Return dict with list of column names in 'headers' key and
rows in 'values' key.
"""
table_ = {'headers': [], 'values': []}
columns = None
if not isinstance(output_lines, list):
output_lines = output_lines.split('\n')
if not output_lines[-1]:
# skip last line if empty (just newline at the end)
output_lines = output_lines[:-1]
for line in output_lines:
if delimiter_line.match(line):
columns = _table_columns(line)
continue
if '|' not in line:
LOG.warn('skipping invalid table line: %s' % line)
continue
row = []
for col in columns:
row.append(line[col[0]:col[1]].strip())
if table_['headers']:
table_['values'].append(row)
else:
table_['headers'] = row
return table_
def _table_columns(first_table_row):
"""Find column ranges in output line.
Return list of tuples (start,end) for each column
detected by plus (+) characters in delimiter line.
"""
positions = []
start = 1 # there is '+' at 0
while start < len(first_table_row):
end = first_table_row.find('+', start)
if end == -1:
break
positions.append((start, end))
start = end + 1
return positions

View File

@@ -13,11 +13,11 @@
# License for the specific language governing permissions and limitations
# under the License.
from tempest_lib import exceptions
import testtools
from tempest import cli
from tempest import config
from tempest import exceptions
from tempest.openstack.common import log as logging
import tempest.test
@@ -47,6 +47,11 @@ class SimpleReadOnlyNovaClientTest(cli.ClientTestBase):
raise cls.skipException(msg)
super(SimpleReadOnlyNovaClientTest, cls).resource_setup()
def nova(self, *args, **kwargs):
return self.clients.nova(*args,
endpoint_type=CONF.compute.endpoint_type,
**kwargs)
def test_admin_fake_action(self):
self.assertRaises(exceptions.CommandFailed,
self.nova,

View File

@@ -13,9 +13,10 @@
# License for the specific language governing permissions and limitations
# under the License.
from tempest_lib import exceptions
from tempest import cli
from tempest import config
from tempest import exceptions
from tempest.openstack.common import log as logging
@@ -46,6 +47,9 @@ class SimpleReadOnlyNovaManageTest(cli.ClientTestBase):
raise cls.skipException(msg)
super(SimpleReadOnlyNovaManageTest, cls).resource_setup()
def nova_manage(self, *args, **kwargs):
return self.clients.nova_manage(*args, **kwargs)
def test_admin_fake_action(self):
self.assertRaises(exceptions.CommandFailed,
self.nova_manage,

View File

@@ -15,9 +15,10 @@
import logging
import re
from tempest_lib import exceptions
from tempest import cli
from tempest import config
from tempest import exceptions
from tempest import test
CONF = config.CONF
@@ -40,6 +41,10 @@ class SimpleReadOnlySaharaClientTest(cli.ClientTestBase):
raise cls.skipException(msg)
super(SimpleReadOnlySaharaClientTest, cls).resource_setup()
def sahara(self, *args, **kwargs):
return self.clients.sahara(
*args, endpoint_type=CONF.data_processing.endpoint_type, **kwargs)
@test.attr(type='negative')
def test_sahara_fake_action(self):
self.assertRaises(exceptions.CommandFailed,

View File

@@ -15,9 +15,10 @@
import re
from tempest_lib import exceptions
from tempest import cli
from tempest import config
from tempest import exceptions
from tempest.openstack.common import log as logging
CONF = config.CONF
@@ -34,6 +35,9 @@ class SimpleReadOnlyKeystoneClientTest(cli.ClientTestBase):
their own. They only verify the structure of output if present.
"""
def keystone(self, *args, **kwargs):
return self.clients.keystone(*args, **kwargs)
def test_admin_fake_action(self):
self.assertRaises(exceptions.CommandFailed,
self.keystone,

View File

@@ -15,9 +15,10 @@
import re
from tempest_lib import exceptions
from tempest import cli
from tempest import config
from tempest import exceptions
from tempest.openstack.common import log as logging
CONF = config.CONF
@@ -40,6 +41,11 @@ class SimpleReadOnlyGlanceClientTest(cli.ClientTestBase):
raise cls.skipException(msg)
super(SimpleReadOnlyGlanceClientTest, cls).resource_setup()
def glance(self, *args, **kwargs):
return self.clients.glance(*args,
endpoint_type=CONF.image.endpoint_type,
**kwargs)
def test_glance_fake_action(self):
self.assertRaises(exceptions.CommandFailed,
self.glance,

View File

@@ -15,9 +15,10 @@
import re
from tempest_lib import exceptions
from tempest import cli
from tempest import config
from tempest import exceptions
from tempest.openstack.common import log as logging
from tempest import test
@@ -41,6 +42,11 @@ class SimpleReadOnlyNeutronClientTest(cli.ClientTestBase):
raise cls.skipException(msg)
super(SimpleReadOnlyNeutronClientTest, cls).resource_setup()
def neutron(self, *args, **kwargs):
return self.clients.neutron(*args,
endpoint_type=CONF.network.endpoint_type,
**kwargs)
@test.attr(type='smoke')
def test_neutron_fake_action(self):
self.assertRaises(exceptions.CommandFailed,

View File

@@ -15,9 +15,10 @@
import re
from tempest_lib import exceptions
from tempest import cli
from tempest import config
from tempest import exceptions
CONF = config.CONF
@@ -37,6 +38,10 @@ class SimpleReadOnlySwiftClientTest(cli.ClientTestBase):
raise cls.skipException(msg)
super(SimpleReadOnlySwiftClientTest, cls).resource_setup()
def swift(self, *args, **kwargs):
return self.clients.swift(
*args, endpoint_type=CONF.object_storage.endpoint_type, **kwargs)
def test_swift_fake_action(self):
self.assertRaises(exceptions.CommandFailed,
self.swift,

View File

@@ -42,6 +42,10 @@ class SimpleReadOnlyHeatClientTest(tempest.cli.ClientTestBase):
os.path.dirname(os.path.realpath(__file__))),
'heat_templates/heat_minimal.yaml')
def heat(self, *args, **kwargs):
return self.clients.heat(
*args, endpoint_type=CONF.orchestration.endpoint_type, **kwargs)
def test_heat_stack_list(self):
self.heat('stack-list')

View File

@@ -39,6 +39,10 @@ class SimpleReadOnlyCeilometerClientTest(cli.ClientTestBase):
raise cls.skipException(msg)
super(SimpleReadOnlyCeilometerClientTest, cls).resource_setup()
def ceilometer(self, *args, **kwargs):
return self.clients.ceilometer(
*args, endpoint_type=CONF.telemetry.endpoint_type, **kwargs)
def test_ceilometer_meter_list(self):
self.ceilometer('meter-list')

View File

@@ -16,11 +16,12 @@
import logging
import re
from tempest_lib import exceptions
import testtools
from tempest import cli
from tempest import config
from tempest import exceptions
CONF = config.CONF
LOG = logging.getLogger(__name__)
@@ -41,6 +42,11 @@ class SimpleReadOnlyCinderClientTest(cli.ClientTestBase):
raise cls.skipException(msg)
super(SimpleReadOnlyCinderClientTest, cls).resource_setup()
def cinder(self, *args, **kwargs):
return self.clients.cinder(*args,
endpoint_type=CONF.volume.endpoint_type,
**kwargs)
def test_cinder_fake_action(self):
self.assertRaises(exceptions.CommandFailed,
self.cinder,

View File

@@ -13,17 +13,25 @@
# under the License.
import mock
from tempest_lib.cli import base as cli_base
import testtools
from tempest import cli
from tempest import config
from tempest import exceptions
from tempest.tests import base
from tempest.tests import fake_config
class TestMinClientVersion(base.TestCase):
"""Tests for the min_client_version decorator.
"""
def setUp(self):
super(TestMinClientVersion, self).setUp()
self.useFixture(fake_config.ConfigFixture())
self.stubs.Set(config, 'TempestConfigPrivate', fake_config.FakePrivate)
def _test_min_version(self, required, installed, expect_skip):
@cli.min_client_version(client='nova', version=required)
@@ -33,7 +41,7 @@ class TestMinClientVersion(base.TestCase):
# expected so we need to fail.
self.fail('Should not have gotten past the decorator.')
with mock.patch.object(cli, 'execute',
with mock.patch.object(cli_base, 'execute',
return_value=installed) as mock_cmd:
if expect_skip:
self.assertRaises(testtools.TestCase.skipException, fake,
@@ -41,6 +49,7 @@ class TestMinClientVersion(base.TestCase):
else:
fake(self, expect_skip)
mock_cmd.assert_called_once_with('nova', '', params='--version',
cli_dir='/usr/local/bin',
merge_stderr=True)
def test_min_client_version(self):
@@ -52,7 +61,7 @@ class TestMinClientVersion(base.TestCase):
for case in cases:
self._test_min_version(*case)
@mock.patch.object(cli, 'execute', return_value=' ')
@mock.patch.object(cli_base, 'execute', return_value=' ')
def test_check_client_version_empty_output(self, mock_execute):
# Tests that an exception is raised if the command output is empty.
self.assertRaises(exceptions.TempestException,

View File

@@ -1,30 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest import exceptions
from tempest.tests import base
class TestOutputParser(base.TestCase):
def test_command_failed_exception(self):
returncode = 1
cmd = "foo"
stdout = "output"
stderr = "error"
try:
raise exceptions.CommandFailed(returncode, cmd, stdout, stderr)
except exceptions.CommandFailed as e:
self.assertIn(str(returncode), str(e))
self.assertIn(cmd, str(e))
self.assertIn(stdout, str(e))
self.assertIn(stderr, str(e))

View File

@@ -1,177 +0,0 @@
# Copyright 2014 NEC Corporation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from tempest.cli import output_parser
from tempest import exceptions
from tempest.tests import base
class TestOutputParser(base.TestCase):
OUTPUT_LINES = """
+----+------+---------+
| ID | Name | Status |
+----+------+---------+
| 11 | foo | BUILD |
| 21 | bar | ERROR |
| 31 | bee | None |
+----+------+---------+
"""
OUTPUT_LINES2 = """
+----+-------+---------+
| ID | Name2 | Status2 |
+----+-------+---------+
| 41 | aaa | SSSSS |
| 51 | bbb | TTTTT |
| 61 | ccc | AAAAA |
+----+-------+---------+
"""
EXPECTED_TABLE = {'headers': ['ID', 'Name', 'Status'],
'values': [['11', 'foo', 'BUILD'],
['21', 'bar', 'ERROR'],
['31', 'bee', 'None']]}
EXPECTED_TABLE2 = {'headers': ['ID', 'Name2', 'Status2'],
'values': [['41', 'aaa', 'SSSSS'],
['51', 'bbb', 'TTTTT'],
['61', 'ccc', 'AAAAA']]}
def test_table_with_normal_values(self):
actual = output_parser.table(self.OUTPUT_LINES)
self.assertIsInstance(actual, dict)
self.assertEqual(self.EXPECTED_TABLE, actual)
def test_table_with_list(self):
output_lines = self.OUTPUT_LINES.split('\n')
actual = output_parser.table(output_lines)
self.assertIsInstance(actual, dict)
self.assertEqual(self.EXPECTED_TABLE, actual)
def test_table_with_invalid_line(self):
output_lines = self.OUTPUT_LINES + "aaaa"
actual = output_parser.table(output_lines)
self.assertIsInstance(actual, dict)
self.assertEqual(self.EXPECTED_TABLE, actual)
def test_tables_with_normal_values(self):
output_lines = 'test' + self.OUTPUT_LINES +\
'test2' + self.OUTPUT_LINES2
expected = [{'headers': self.EXPECTED_TABLE['headers'],
'label': 'test',
'values': self.EXPECTED_TABLE['values']},
{'headers': self.EXPECTED_TABLE2['headers'],
'label': 'test2',
'values': self.EXPECTED_TABLE2['values']}]
actual = output_parser.tables(output_lines)
self.assertIsInstance(actual, list)
self.assertEqual(expected, actual)
def test_tables_with_invalid_values(self):
output_lines = 'test' + self.OUTPUT_LINES +\
'test2' + self.OUTPUT_LINES2 + '\n'
expected = [{'headers': self.EXPECTED_TABLE['headers'],
'label': 'test',
'values': self.EXPECTED_TABLE['values']},
{'headers': self.EXPECTED_TABLE2['headers'],
'label': 'test2',
'values': self.EXPECTED_TABLE2['values']}]
actual = output_parser.tables(output_lines)
self.assertIsInstance(actual, list)
self.assertEqual(expected, actual)
def test_tables_with_invalid_line(self):
output_lines = 'test' + self.OUTPUT_LINES +\
'test2' + self.OUTPUT_LINES2 +\
'+----+-------+---------+'
expected = [{'headers': self.EXPECTED_TABLE['headers'],
'label': 'test',
'values': self.EXPECTED_TABLE['values']},
{'headers': self.EXPECTED_TABLE2['headers'],
'label': 'test2',
'values': self.EXPECTED_TABLE2['values']}]
actual = output_parser.tables(output_lines)
self.assertIsInstance(actual, list)
self.assertEqual(expected, actual)
LISTING_OUTPUT = """
+----+
| ID |
+----+
| 11 |
| 21 |
| 31 |
+----+
"""
def test_listing(self):
expected = [{'ID': '11'}, {'ID': '21'}, {'ID': '31'}]
actual = output_parser.listing(self.LISTING_OUTPUT)
self.assertIsInstance(actual, list)
self.assertEqual(expected, actual)
def test_details_multiple_with_invalid_line(self):
self.assertRaises(exceptions.InvalidStructure,
output_parser.details_multiple,
self.OUTPUT_LINES)
DETAILS_LINES1 = """First Table
+----------+--------+
| Property | Value |
+----------+--------+
| foo | BUILD |
| bar | ERROR |
| bee | None |
+----------+--------+
"""
DETAILS_LINES2 = """Second Table
+----------+--------+
| Property | Value |
+----------+--------+
| aaa | VVVVV |
| bbb | WWWWW |
| ccc | XXXXX |
+----------+--------+
"""
def test_details_with_normal_line_label_false(self):
expected = {'foo': 'BUILD', 'bar': 'ERROR', 'bee': 'None'}
actual = output_parser.details(self.DETAILS_LINES1)
self.assertEqual(expected, actual)
def test_details_with_normal_line_label_true(self):
expected = {'__label': 'First Table',
'foo': 'BUILD', 'bar': 'ERROR', 'bee': 'None'}
actual = output_parser.details(self.DETAILS_LINES1, with_label=True)
self.assertEqual(expected, actual)
def test_details_multiple_with_normal_line_label_false(self):
expected = [{'foo': 'BUILD', 'bar': 'ERROR', 'bee': 'None'},
{'aaa': 'VVVVV', 'bbb': 'WWWWW', 'ccc': 'XXXXX'}]
actual = output_parser.details_multiple(self.DETAILS_LINES1 +
self.DETAILS_LINES2)
self.assertIsInstance(actual, list)
self.assertEqual(expected, actual)
def test_details_multiple_with_normal_line_label_true(self):
expected = [{'__label': 'First Table',
'foo': 'BUILD', 'bar': 'ERROR', 'bee': 'None'},
{'__label': 'Second Table',
'aaa': 'VVVVV', 'bbb': 'WWWWW', 'ccc': 'XXXXX'}]
actual = output_parser.details_multiple(self.DETAILS_LINES1 +
self.DETAILS_LINES2,
with_label=True)
self.assertIsInstance(actual, list)
self.assertEqual(expected, actual)

View File

@@ -16,7 +16,8 @@
import shlex
import subprocess
import tempest.exceptions as exceptions
from tempest_lib import exceptions
from tempest.openstack.common import log as logging
from tempest.tests import base