Reanimate gceapi to work with updated gcutil

Implemented gce functional test for zones via google api client

Change-Id: I8dcae50f50e4ea22a368e015426afe5bfbef5cdd
This commit is contained in:
alexm 2015-10-12 19:36:58 +03:00 committed by alexey-mr
parent 43fdf104d3
commit 0186280b15
9 changed files with 835 additions and 0 deletions

23
gceapi/api/opts.py Normal file
View File

@ -0,0 +1,23 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import gceapi.api
import itertools
def list_opts():
return [
('DEFAULT',
itertools.chain(
gceapi.api.gce_opts,
)),
]

33
gceapi/opts.py Normal file
View File

@ -0,0 +1,33 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import gceapi.auth
import gceapi.db.api
import gceapi.exception
import gceapi.paths
import gceapi.service
import gceapi.wsgi
import itertools
def list_opts():
return [
('DEFAULT',
itertools.chain(
gceapi.auth.auth_opts,
gceapi.db.api.tpool_opts,
gceapi.exception.exc_log_opts,
gceapi.paths.path_opts,
gceapi.service.service_opts,
gceapi.wsgi.wsgi_opts,
)),
]

View File

@ -0,0 +1,122 @@
#!/bin/bash -x
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# This script is executed inside post_test_hook function in devstack gate.
# Sleep some time until all services are starting
sleep 5
export TEST_CONFIG_DIR=$(readlink -f .)
export TEST_CONFIG="functional_tests.conf"
# save original creds(admin) for later usage
OLD_OS_PROJECT_NAME=$OS_PROJECT_NAME
OLD_OS_USERNAME=$OS_USERNAME
OLD_OS_PASSWORD=$OS_PASSWORD
# bug somewhere
unset OS_AUTH_TYPE
if [[ ! -f $TEST_CONFIG_DIR/$TEST_CONFIG ]]; then
openstack endpoint list --os-identity-api-version=3
openstack service list --long
if [[ "$?" -ne "0" ]]; then
echo "Looks like credentials are absent."
exit 1
fi
# prepare flavors
nova flavor-create --is-public True m1.gceapi 16 512 0 1
# create separate user/project
project_name="project-$(cat /dev/urandom | tr -cd 'a-f0-9' | head -c 8)"
eval $(openstack project create -f shell -c id $project_name)
project_id=$id
[[ -n "$project_id" ]] || { echo "Can't create project"; exit 1; }
user_name="user-$(cat /dev/urandom | tr -cd 'a-f0-9' | head -c 8)"
eval $(openstack user create "$user_name" --project "$project_id" --password "password" --email "$user_name@example.com" -f shell -c id)
user_id=$id
[[ -n "$user_id" ]] || { echo "Can't create user"; exit 1; }
# add 'Member' role for swift access
role_id=$(openstack role show Member -c id -f value)
openstack role add --project $project_id --user $user_id $role_id
# create network
if [[ -n $(openstack service list | grep neutron) ]]; then
net_id=$(neutron net-create --tenant-id $project_id "private" | grep ' id ' | awk '{print $4}')
[[ -n "$net_id" ]] || { echo "net-create failed"; exit 1; }
subnet_id=$(neutron subnet-create --tenant-id $project_id --ip_version 4 --gateway 10.0.0.1 --name "private_subnet" $net_id 10.0.0.0/24 | grep ' id ' | awk '{print $4}')
[[ -n "$subnet_id" ]] || { echo "subnet-create failed"; exit 1; }
router_id=$(neutron router-create --tenant-id $project_id "private_router" | grep ' id ' | awk '{print $4}')
[[ -n "$router_id" ]] || { echo "router-create failed"; exit 1; }
neutron router-interface-add $router_id $subnet_id
[[ "$?" -eq 0 ]] || { echo "router-interface-add failed"; exit 1; }
public_net_id=$(neutron net-list | grep public | awk '{print $2}')
[[ -n "$public_net_id" ]] || { echo "can't find public network"; exit 1; }
neutron router-gateway-set $router_id $public_net_id
[[ "$?" -eq 0 ]] || { echo "router-gateway-set failed"; exit 1; }
fi
export OS_PROJECT_NAME=$project_name
export OS_TENANT_NAME=$project_name
export OS_USERNAME=$user_name
export OS_PASSWORD="password"
sudo bash -c "cat > $TEST_CONFIG_DIR/$TEST_CONFIG <<EOF
[gce]
# Generic options
build_interval=${TIMEOUT:-180}
# GCE API schema
schema=${GCE_SCHEMA:-'etc/gceapi/protocols/v1.json'}
# GCE services address
protocol=${GCE_API_PROTOCOL:-'http'}
host=${GCE_HOST:-'localhost'}
port=${GCE_PORT:-8787}
# GCE URLs
auth_url=${GCE_AUTH_URL:-'/auth'}
discovery_url=${GCE_DISCOVERY_URL:-'/discovery/v1/apis/{api}/{apiVersion}/rest'}
# GCE resource IDs for testing
project_id=$project_id
zone=${ZONE:-'nova'}
region=${REGION:-'RegionOne'}
EOF"
fi
sudo pip install -r test-requirements.txt
sudo OS_STDOUT_CAPTURE=-1 OS_STDERR_CAPTURE=-1 OS_TEST_TIMEOUT=500 OS_TEST_LOCK_PATH=${TMPDIR:-'/tmp'} \
python -m subunit.run discover -t ./ ./gceapi/tests/functional | subunit-2to1 | tools/colorizer.py
RETVAL=$?
# Here can be some commands for log archiving, etc...
echo Enumerate resources to check what left after tests
for i in instances images disks snapshots
do
echo "List of "$i
gcloud compute $i list
echo ""
done
export OS_PROJECT_NAME=$OLD_OS_PROJECT_NAME
export OS_TENANT_NAME=$OLD_OS_PROJECT_NAME
export OS_USERNAME=$OLD_OS_USERNAME
export OS_PASSWORD=$OLD_OS_PASSWORD
openstack server list --all-projects
openstack image list
openstack volume list --all-projects
cinder snapshot-list --all-tenants
exit $RETVAL

View File

View File

@ -0,0 +1,50 @@
# Copyright 2015 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from gceapi.tests.functional import test_base
class TestRegions(test_base.GCETestCase):
"""
Test perform two action:
- list of available regions
- describe a particular region (that is in config)
Test expects project_id and testing region are to be set in test config.
"""
@property
def regions(self):
res = self.api.compute.regions()
self.assertIsNotNone(res,
'Null regions object, api is not built properly')
return res
def test_list(self):
project_id = self.cfg.project_id
self.trace('List regions: project_id={}'.format(project_id))
result = self.regions.list(project=project_id).execute()
self.trace('Regions: {}'.format(result))
self.api.validate_schema(value=result, schema_name='RegionList')
def test_describe(self):
project_id = self.cfg.project_id
region = self.cfg.region
self.trace('Describe region: project_id={} region={}'.format(
project_id, region))
result = self.regions.get(project=project_id, region=region).execute()
self.trace('Region: {}'.format(result))
self.api.validate_schema(value=result, schema_name='Region')

View File

@ -0,0 +1,53 @@
# Copyright 2015 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from gceapi.tests.functional import test_base
class TestZones(test_base.GCETestCase):
"""
Test perform two actions: list of available zones and get info about
a particular zone. Test expects project_id and zone are to be set in
test config.
"""
@property
def zones(self):
res = self.api.compute.zones()
self.assertIsNotNone(res,
'Null regions object, api is not built properly')
return res
def test_get_zone(self):
project_id = self.cfg.project_id
zone = self.cfg.zone
self.trace('Get zone: project_id={} zone={}'.format(project_id, zone))
result = self.zones.get(project=project_id, zone=zone).execute()
self.trace('Zone: {}'.format(result))
self.api.validate_schema(value=result, schema_name='Zone')
def test_list_zones(self):
project_id = self.cfg.project_id
self.trace('List zones: project_id={}'.format(project_id))
result = self.zones.list(project=project_id).execute()
self.trace('Zones: {}'.format(result))
self.api.validate_schema(value=result, schema_name='ZoneList')
zone = self.cfg.zone
self.assertIn(
zone,
result['items'],
'There is no required zone {} in returned list'.format(zone))

View File

@ -0,0 +1,116 @@
# Copyright 2012 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from os import environ
from os import path
from oslo_config import cfg
OPTIONS_GROUP = cfg.OptGroup(name='gce', title='GCE options')
OPTIONS = [
# Generic options
cfg.IntOpt('build_timeout',
default=180,
help='Timeout'),
cfg.IntOpt('build_interval',
default=1,
help='Interval'),
# GCE API schema
cfg.StrOpt('schema',
default='etc/gceapi/protocols/v1.json',
help='Json file with API schema for validation'),
# GCE services address
cfg.StrOpt('protocol',
default='http',
help='GCE protocl (http or https)'),
cfg.StrOpt('host',
default='localhost',
help='GCE service host'),
cfg.IntOpt('port',
default=8787,
help='GCE service port'),
# GCE URLs
cfg.StrOpt('auth_url',
default='/auth',
help='OAuth API relative URL'),
cfg.StrOpt('discovery_url',
default='/discovery/v1/apis/{api}/{apiVersion}/rest',
help='Discovery API relative URL'),
# GCE resource IDs for testing
cfg.StrOpt('project_id',
default='test',
help='GCE Project ID for testing'),
cfg.StrOpt('zone',
default='nova',
help='GCE Zone for testing'),
cfg.StrOpt('region',
default='RegionOne',
help='GCE Region for testing'),
]
# This should never be called outside of this class
class ConfigPrivate(object):
"""Provides OpenStack configuration information."""
def __init__(self):
"""Initialize a configuration from a conf directory and conf file."""
super(ConfigPrivate, self).__init__()
base_dir = self._get_base_dir()
cfg_file_path = self._get_default_config_path(base_dir)
config_files = []
if path.exists(cfg_file_path):
config_files.append(cfg_file_path)
cfg.CONF.register_group(OPTIONS_GROUP)
cfg.CONF.register_opts(OPTIONS, group=OPTIONS_GROUP)
cfg.CONF([], project='gceapi', default_config_files=config_files)
self.gce = cfg.CONF.gce
# Load API scheme for API calls validation
with open(self._get_default_schema_path(base_dir), 'r') as f:
from json import load
self.gce.schema = load(f)
@staticmethod
def _get_base_dir():
cur_dir = path.dirname(__file__)
base_dir = path.dirname(path.dirname(path.dirname(cur_dir)))
return environ.get('TEST_CONFIG_DIR', base_dir)
@staticmethod
def _get_default_config_path(base_dir):
conf_file = environ.get('TEST_CONFIG', 'functional_tests.conf')
return path.join(base_dir, conf_file)
def _get_default_schema_path(self, base_dir):
schema_file = environ.get('TEST_SCHEMA', self.gce.schema)
return path.join(base_dir, schema_file)
class ConfigProxy(object):
_config = None
def __getattr__(self, attr):
if not self._config:
self._config = ConfigPrivate()
return getattr(self._config, attr)
CONF = ConfigProxy()

View File

@ -0,0 +1,106 @@
# Copyright 2015 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from gceapi.tests.functional import config
from googleapiclient.discovery import build
from googleapiclient.schema import Schemas
from jsonschema import validate
from oauth2client.client import GoogleCredentials
from tempest_lib import base
class TestSupp(object):
def __init__(self, *args, **kwargs):
self._cfg = config.CONF.gce
from oslo_log import log as logging
self._log = logging.getLogger("gceapi")
@property
def cfg(self):
return self._cfg
def trace(self, *args, **kwargs):
print(args, kwargs)
self._log.trace(*args, **kwargs)
class GCEApi(object):
def __init__(self, supp):
self._supp = supp
self._schema = None
self._compute = None
def init(self):
self._schema = Schemas(self._supp.cfg.schema)
self._auth()
self._build_api()
def _auth(self):
self._trace('Create GoogleCredentials from default app file')
self.credentials = GoogleCredentials.get_application_default()
def _build_api(self):
url = self._get_discovery_url()
self._trace(
'Build Google compute api with discovery url {}'.format(url))
self._compute = build(
'compute', 'v1',
credentials=self.credentials,
discoveryServiceUrl=url
)
def _get_discovery_url(self):
cfg = self._supp.cfg
return '{}://{}:{}{}'.format(
cfg.protocol,
cfg.host,
cfg.port,
cfg.discovery_url
)
def _trace(self, msg):
self._supp.trace(msg)
@property
def compute(self):
assert(self._compute is not None)
return self._compute
def validate_schema(self, value, schema_name):
validate(value, self._schema[schema_name])
class GCETestCase(base.BaseTestCase):
@property
def cfg(self):
assert(self._supp.cfg is not None)
return self._supp.cfg
@property
def api(self):
assert(self._api is not None)
return self._api
def trace(self, *args, **kwargs):
self._supp.trace(*args, **kwargs)
@classmethod
def setUpClass(cls):
cls._supp = TestSupp()
cls._api = GCEApi(cls._supp)
cls._api.init()
super(GCETestCase, cls).setUpClass()

332
tools/colorizer.py Executable file
View File

@ -0,0 +1,332 @@
#!/usr/bin/env python
# Copyright (c) 2013, Nebula, Inc.
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# Colorizer Code is borrowed from Twisted:
# Copyright (c) 2001-2010 Twisted Matrix Laboratories.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""Display a subunit stream through a colorized unittest test runner."""
import heapq
import sys
import unittest
import subunit
import testtools
class _AnsiColorizer(object):
"""
A colorizer is an object that loosely wraps around a stream, allowing
callers to write text to the stream in a particular color.
Colorizer classes must implement C{supported()} and C{write(text, color)}.
"""
_colors = dict(black=30, red=31, green=32, yellow=33,
blue=34, magenta=35, cyan=36, white=37)
def __init__(self, stream):
self.stream = stream
def supported(cls, stream=sys.stdout):
"""
A class method that returns True if the current platform supports
coloring terminal output using this method. Returns False otherwise.
"""
if not stream.isatty():
return False # auto color only on TTYs
try:
import curses
except ImportError:
return False
else:
try:
try:
return curses.tigetnum("colors") > 2
except curses.error:
curses.setupterm()
return curses.tigetnum("colors") > 2
except Exception:
# guess false in case of error
return False
supported = classmethod(supported)
def write(self, text, color):
"""
Write the given text to the stream in the given color.
@param text: Text to be written to the stream.
@param color: A string label for a color. e.g. 'red', 'white'.
"""
color = self._colors[color]
self.stream.write('\x1b[%s;1m%s\x1b[0m' % (color, text))
class _Win32Colorizer(object):
"""
See _AnsiColorizer docstring.
"""
def __init__(self, stream):
import win32console
red, green, blue, bold = (win32console.FOREGROUND_RED,
win32console.FOREGROUND_GREEN,
win32console.FOREGROUND_BLUE,
win32console.FOREGROUND_INTENSITY)
self.stream = stream
self.screenBuffer = win32console.GetStdHandle(
win32console.STD_OUT_HANDLE)
self._colors = {'normal': red | green | blue,
'red': red | bold,
'green': green | bold,
'blue': blue | bold,
'yellow': red | green | bold,
'magenta': red | blue | bold,
'cyan': green | blue | bold,
'white': red | green | blue | bold}
def supported(cls, stream=sys.stdout):
try:
import win32console
screenBuffer = win32console.GetStdHandle(
win32console.STD_OUT_HANDLE)
except ImportError:
return False
import pywintypes
try:
screenBuffer.SetConsoleTextAttribute(
win32console.FOREGROUND_RED |
win32console.FOREGROUND_GREEN |
win32console.FOREGROUND_BLUE)
except pywintypes.error:
return False
else:
return True
supported = classmethod(supported)
def write(self, text, color):
color = self._colors[color]
self.screenBuffer.SetConsoleTextAttribute(color)
self.stream.write(text)
self.screenBuffer.SetConsoleTextAttribute(self._colors['normal'])
class _NullColorizer(object):
"""
See _AnsiColorizer docstring.
"""
def __init__(self, stream):
self.stream = stream
def supported(cls, stream=sys.stdout):
return True
supported = classmethod(supported)
def write(self, text, color):
self.stream.write(text)
def get_elapsed_time_color(elapsed_time):
if elapsed_time > 1.0:
return 'red'
elif elapsed_time > 0.25:
return 'yellow'
else:
return 'green'
class ApiTestResult(testtools.TestResult):
def __init__(self, stream, descriptions, verbosity):
super(ApiTestResult, self).__init__()
self.stream = stream
self.showAll = verbosity > 1
self.num_slow_tests = 10
self.slow_tests = [] # this is a fixed-sized heap
self.colorizer = None
# NOTE(vish): reset stdout for the terminal check
stdout = sys.stdout
sys.stdout = sys.__stdout__
for colorizer in [_Win32Colorizer, _AnsiColorizer, _NullColorizer]:
if colorizer.supported():
self.colorizer = colorizer(self.stream)
break
sys.stdout = stdout
self.start_time = None
self.last_time = {}
self.results = {}
self.last_written = None
def _writeElapsedTime(self, elapsed):
color = get_elapsed_time_color(elapsed)
self.colorizer.write(" %.2f" % elapsed, color)
def _addResult(self, test, *args):
try:
name = test.id()
except AttributeError:
name = 'Unknown.unknown'
test_class, test_name = name.rsplit('.', 1)
elapsed = (self._now() - self.start_time).total_seconds()
item = (elapsed, test_class, test_name)
if len(self.slow_tests) >= self.num_slow_tests:
heapq.heappushpop(self.slow_tests, item)
else:
heapq.heappush(self.slow_tests, item)
self.results.setdefault(test_class, [])
self.results[test_class].append((test_name, elapsed) + args)
self.last_time[test_class] = self._now()
self.writeTests()
def _writeResult(self, test_name, elapsed, long_result, color,
short_result, success):
if self.showAll:
self.stream.write(' %s' % str(test_name).ljust(66))
self.colorizer.write(long_result, color)
if success:
self._writeElapsedTime(elapsed)
self.stream.writeln()
else:
self.colorizer.write(short_result, color)
def addSuccess(self, test):
super(ApiTestResult, self).addSuccess(test)
self._addResult(test, 'OK', 'green', '.', True)
def addFailure(self, test, err):
if test.id() == 'process-returncode':
return
super(ApiTestResult, self).addFailure(test, err)
self._addResult(test, 'FAIL', 'red', 'F', False)
def addError(self, test, err):
super(ApiTestResult, self).addFailure(test, err)
self._addResult(test, 'ERROR', 'red', 'E', False)
def addSkip(self, test, reason=None, details=None):
super(ApiTestResult, self).addSkip(test, reason, details)
self._addResult(test, 'SKIP', 'blue', 'S', True)
def startTest(self, test):
self.start_time = self._now()
super(ApiTestResult, self).startTest(test)
def writeTestCase(self, cls):
if not self.results.get(cls):
return
if cls != self.last_written:
self.colorizer.write(cls, 'white')
self.stream.writeln()
for result in self.results[cls]:
self._writeResult(*result)
del self.results[cls]
self.stream.flush()
self.last_written = cls
def writeTests(self):
time = self.last_time.get(self.last_written, self._now())
if not self.last_written or (self._now() - time).total_seconds() > 2.0:
diff = 3.0
while diff > 2.0:
classes = self.results.keys()
oldest = min(classes, key=lambda x: self.last_time[x])
diff = (self._now() - self.last_time[oldest]).total_seconds()
self.writeTestCase(oldest)
else:
self.writeTestCase(self.last_written)
def done(self):
self.stopTestRun()
def stopTestRun(self):
for cls in list(self.results.iterkeys()):
self.writeTestCase(cls)
self.stream.writeln()
self.writeSlowTests()
def writeSlowTests(self):
# Pare out 'fast' tests
slow_tests = [item for item in self.slow_tests
if get_elapsed_time_color(item[0]) != 'green']
if slow_tests:
slow_total_time = sum(item[0] for item in slow_tests)
slow = ("Slowest %i tests took %.2f secs:"
% (len(slow_tests), slow_total_time))
self.colorizer.write(slow, 'yellow')
self.stream.writeln()
last_cls = None
# sort by name
for elapsed, cls, name in sorted(slow_tests,
key=lambda x: x[1] + x[2]):
if cls != last_cls:
self.colorizer.write(cls, 'white')
self.stream.writeln()
last_cls = cls
self.stream.write(' %s' % str(name).ljust(68))
self._writeElapsedTime(elapsed)
self.stream.writeln()
def printErrors(self):
if self.showAll:
self.stream.writeln()
self.printErrorList('ERROR', self.errors)
self.printErrorList('FAIL', self.failures)
def printErrorList(self, flavor, errors):
for test, err in errors:
self.colorizer.write("=" * 70, 'red')
self.stream.writeln()
self.colorizer.write(flavor, 'red')
self.stream.writeln(": %s" % test.id())
self.colorizer.write("-" * 70, 'red')
self.stream.writeln()
self.stream.writeln("%s" % err)
test = subunit.ProtocolTestCase(sys.stdin, passthrough=None)
if sys.version_info[0:2] <= (2, 6):
runner = unittest.TextTestRunner(verbosity=2)
else:
runner = unittest.TextTestRunner(verbosity=2, resultclass=ApiTestResult)
if runner.run(test).wasSuccessful():
exit_code = 0
else:
exit_code = 1
sys.exit(exit_code)