Fix networking-brocade unit tests changes and vyatta imports

Change-Id: Idb888fafdb14730d8a9cfaec124e1e8207d115a2
changes/38/159238/2
Sripriya 8 years ago
parent 208d2b40b2
commit e28f985a89

@ -2,6 +2,6 @@
test_command=OS_STDOUT_CAPTURE=${OS_STDOUT_CAPTURE:-1} \
OS_STDERR_CAPTURE=${OS_STDERR_CAPTURE:-1} \
OS_TEST_TIMEOUT=${OS_TEST_TIMEOUT:-60} \
${PYTHON:-python} -m subunit.run discover -t ./ ./networking_brocade/tests $LISTOPT $IDOPTION
${PYTHON:-python} -m subunit.run discover -t ./ ./networking_brocade/test_discover $LISTOPT $IDOPTION
test_id_option=--load-list $IDFILE
test_list_option=--list

@ -0,0 +1,35 @@
# Copyright 2013 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import sys
if sys.version_info >= (2, 7):
import unittest
else:
import unittest2 as unittest
def load_tests(loader, tests, pattern):
suite = unittest.TestSuite()
base_path = os.path.split(os.path.dirname(os.path.abspath(__file__)))[0]
base_path = os.path.split(base_path)[0]
for test_dir in ['./networking_brocade/tests',
'./networking_brocade/vyatta/tests']:
if not pattern:
suite.addTests(loader.discover(test_dir, top_level_dir=base_path))
else:
suite.addTests(loader.discover(test_dir, pattern=pattern,
top_level_dir=base_path))
return suite

@ -21,8 +21,8 @@ from sqlalchemy.orm import exc as orm_exc
from neutron.db import model_base
from neutron.db import models_v2
from vyatta.common import config as vyatta_config
from vyatta.common import exceptions as v_exc
from networking_brocade.vyatta.common import config as vyatta_config
from networking_brocade.vyatta.common import exceptions as v_exc
@six.add_metaclass(abc.ABCMeta)

@ -20,9 +20,9 @@ from neutron.agent.l3 import router_info
from neutron.agent import l3_agent as entry
from neutron.common import constants as l3_constants
from vyatta.common import config as vyatta_config
from vyatta.common import exceptions as v_exc
from vyatta.vrouter import client as vyatta_client
from networking_brocade.vyatta.common import config as vyatta_config
from networking_brocade.vyatta.common import exceptions as v_exc
from networking_brocade.vyatta.vrouter import client as vyatta_client
_KEY_VYATTA_EXTRA_DATA = '_vyatta'

@ -19,7 +19,7 @@ import functools
import itertools
import re
from vyatta.common import exceptions as v_exc
from networking_brocade.vyatta.common import exceptions as v_exc
HWADDR = r'(?:[a-zA-Z0-9]{2}:){5}[a-zA-Z0-9]{2}'
IPv4_ADDR = r'(?:\d+\.){3}\d+'

@ -0,0 +1,51 @@
# Copyright 2015 Brocade Communications System, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import neutron
from neutron.tests import base as n_base
from neutron.tests.unit import test_db_plugin
from neutron.tests.unit import testlib_api
from oslo_config import cfg
def override_nvalues():
neutron_path = os.path.abspath(
os.path.join(os.path.dirname(neutron.__file__), os.pardir))
neutron_policy = os.path.join(neutron_path, 'etc/policy.json')
cfg.CONF.set_override('policy_file', neutron_policy)
class BaseTestCase(n_base.BaseTestCase):
def setUp(self):
override_nvalues()
super(BaseTestCase, self).setUp()
class NeutronDbPluginV2TestCase(test_db_plugin.NeutronDbPluginV2TestCase):
def setUp(self, plugin=None, service_plugins=None, ext_mgr=None):
override_nvalues()
super(NeutronDbPluginV2TestCase, self).setUp(
plugin, service_plugins, ext_mgr)
class SqlTestCase(testlib_api.SqlTestCase):
def setUp(self):
override_nvalues()
super(SqlTestCase, self).setUp()

@ -20,11 +20,11 @@ import urllib
from neutron import context
from neutron.db import models_v2
from neutron.openstack.common import uuidutils
from neutron.tests import base
from vyatta.common import utils as vyatta_utils
from vyatta.vrouter import client as vyatta_client
from vyatta.vrouter import driver as vrouter_driver
from networking_brocade.vyatta.common import utils as vyatta_utils
from networking_brocade.vyatta.tests import base
from networking_brocade.vyatta.vrouter import client as vyatta_client
from networking_brocade.vyatta.vrouter import driver as vrouter_driver
_uuid = uuidutils.generate_uuid

@ -1,4 +1,4 @@
# Copyright 2015 OpenStack Foundation.
# Copyright 2015 Brocade Communications System, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@ -22,13 +22,12 @@ from neutron.db import external_net_db
from neutron.db import models_v2
from neutron.extensions import l3
from neutron.openstack.common import uuidutils
from neutron.tests.unit import test_db_plugin
from neutron.tests.unit import test_l3_plugin
from neutron.tests.unit import testlib_api
from neutron.tests.unit import testlib_plugin
from vyatta.common import utils as vyatta_utils
from vyatta.vrouter import neutron_plugin as vrouter_plugin
from networking_brocade.vyatta.common import utils as vyatta_utils
from networking_brocade.vyatta.tests import base
from networking_brocade.vyatta.vrouter import neutron_plugin as vrouter_plugin
_uuid = uuidutils.generate_uuid
@ -46,7 +45,7 @@ class VRouterTestPlugin(vrouter_plugin.VyattaVRouterMixin,
super(VRouterTestPlugin, self).delete_port(context, port_id)
class TestVyattaVRouterPlugin(testlib_api.SqlTestCase,
class TestVyattaVRouterPlugin(base.SqlTestCase,
testlib_plugin.PluginSetupHelper):
def setUp(self):
super(TestVyattaVRouterPlugin, self).setUp()
@ -60,7 +59,8 @@ class TestVyattaVRouterPlugin(testlib_api.SqlTestCase,
fake_driver_mock.return_value = self.driver
self._mock(
'vyatta.vrouter.driver.VyattaVRouterDriver', fake_driver_mock)
'networking_brocade.vyatta.vrouter.driver.VyattaVRouterDriver',
fake_driver_mock)
self.context = context.get_admin_context()
self.plugin = VRouterTestPlugin()
@ -315,15 +315,17 @@ class TestVyattaVRouterPlugin(testlib_api.SqlTestCase,
None)
CORE_PLUGIN_CLASS = (
"vyatta.tests.test_vrouter_neutron_plugin.TestVRouterNatPlugin")
L3_PLUGIN_CLASS = "vyatta.vrouter.neutron_plugin.VyattaVRouterMixin"
"networking_brocade.vyatta.tests.test_vrouter_neutron_plugin"
".TestVRouterNatPlugin")
L3_PLUGIN_CLASS = (
"networking_brocade.vyatta.vrouter.neutron_plugin.VyattaVRouterMixin")
class TestVRouterNatPlugin(test_l3_plugin.TestL3NatBasePlugin):
supported_extension_aliases = ["external-net"]
class VRouterTestCase(test_db_plugin.NeutronDbPluginV2TestCase,
class VRouterTestCase(base.NeutronDbPluginV2TestCase,
test_l3_plugin.L3NatTestCaseBase,
testlib_plugin.NotificationSetupHelper):
def setUp(self, core_plugin=None, l3_plugin=None, ext_mgr=None):
@ -338,7 +340,7 @@ class VRouterTestCase(test_db_plugin.NeutronDbPluginV2TestCase,
self._mock('eventlet.greenthread.sleep')
self._mock(
'vyatta.vrouter.driver.'
'networking_brocade.vyatta.vrouter.driver.'
'VyattaVRouterDriver', FakeVRouterDriver)
cfg.CONF.set_default('allow_overlapping_ips', True)

@ -24,9 +24,9 @@ from oslo_utils import excutils
import six
from neutron.i18n import _LE, _LI
from vyatta.common import exceptions as v_exc
from vyatta.common import parsers as v_parsers
from vyatta.vrouter import client as vyatta_client
from networking_brocade.vyatta.common import exceptions as v_exc
from networking_brocade.vyatta.common import parsers as v_parsers
from networking_brocade.vyatta.vrouter import client as vyatta_client
LOG = logging.getLogger(__name__)

@ -24,11 +24,11 @@ from oslo_serialization import jsonutils
from oslo_utils import excutils
import requests
from vyatta.common import config
from vyatta.common import exceptions as v_exc
from vyatta.common import globals as vyatta_globals
from vyatta.common import parsers
from vyatta.common import utils as vyatta_utils
from networking_brocade.vyatta.common import config
from networking_brocade.vyatta.common import exceptions as v_exc
from networking_brocade.vyatta.common import globals as vyatta_globals
from networking_brocade.vyatta.common import parsers
from networking_brocade.vyatta.common import utils as vyatta_utils
LOG = logging.getLogger(__name__)

@ -26,9 +26,9 @@ from neutron.openstack.common import log as logging
from novaclient import exceptions as nova_exc
from novaclient.v1_1 import client as novaclient
from vyatta.common import config
from vyatta.common import exceptions as v_exc
from vyatta.vrouter import client as vyatta_client
from networking_brocade.vyatta.common import config
from networking_brocade.vyatta.common import exceptions as v_exc
from networking_brocade.vyatta.vrouter import client as vyatta_client
LOG = logging.getLogger(__name__)

@ -37,10 +37,10 @@ from neutron.i18n import _LE
from neutron.openstack.common import log as logging
from neutron.plugins.common import constants
from vyatta.common import config
from vyatta.common import exceptions as v_exc
from vyatta.common import utils as vyatta_utils
from vyatta.vrouter import driver as vrouter_driver
from networking_brocade.vyatta.common import config
from networking_brocade.vyatta.common import exceptions as v_exc
from networking_brocade.vyatta.common import utils as vyatta_utils
from networking_brocade.vyatta.vrouter import driver as vrouter_driver
LOG = logging.getLogger(__name__)

@ -0,0 +1,6 @@
#! /bin/sh
TESTRARGS=$1
exec 3>&1
status=$(exec 4>&1 >&3; ( lockutils-wrapper python setup.py testr --slowest --testr-args="--subunit $TESTRARGS"; echo $? >&4 ) | $(dirname $0)/subunit-trace.py -f) && exit $status

@ -0,0 +1,306 @@
#!/usr/bin/env python
# Copyright 2014 Hewlett-Packard Development Company, L.P.
# Copyright 2014 Samsung Electronics
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Trace a subunit stream in reasonable detail and high accuracy."""
import argparse
import functools
import os
import re
import sys
import mimeparse
import subunit
import testtools
DAY_SECONDS = 60 * 60 * 24
FAILS = []
RESULTS = {}
class Starts(testtools.StreamResult):
def __init__(self, output):
super(Starts, self).__init__()
self._output = output
def startTestRun(self):
self._neednewline = False
self._emitted = set()
def status(self, test_id=None, test_status=None, test_tags=None,
runnable=True, file_name=None, file_bytes=None, eof=False,
mime_type=None, route_code=None, timestamp=None):
super(Starts, self).status(
test_id, test_status,
test_tags=test_tags, runnable=runnable, file_name=file_name,
file_bytes=file_bytes, eof=eof, mime_type=mime_type,
route_code=route_code, timestamp=timestamp)
if not test_id:
if not file_bytes:
return
if not mime_type or mime_type == 'test/plain;charset=utf8':
mime_type = 'text/plain; charset=utf-8'
primary, sub, parameters = mimeparse.parse_mime_type(mime_type)
content_type = testtools.content_type.ContentType(
primary, sub, parameters)
content = testtools.content.Content(
content_type, lambda: [file_bytes])
text = content.as_text()
if text and text[-1] not in '\r\n':
self._neednewline = True
self._output.write(text)
elif test_status == 'inprogress' and test_id not in self._emitted:
if self._neednewline:
self._neednewline = False
self._output.write('\n')
worker = ''
for tag in test_tags or ():
if tag.startswith('worker-'):
worker = '(' + tag[7:] + ') '
if timestamp:
timestr = timestamp.isoformat()
else:
timestr = ''
self._output.write('%s: %s%s [start]\n' %
(timestr, worker, test_id))
self._emitted.add(test_id)
def cleanup_test_name(name, strip_tags=True, strip_scenarios=False):
"""Clean up the test name for display.
By default we strip out the tags in the test because they don't help us
in identifying the test that is run to it's result.
Make it possible to strip out the testscenarios information (not to
be confused with tempest scenarios) however that's often needed to
indentify generated negative tests.
"""
if strip_tags:
tags_start = name.find('[')
tags_end = name.find(']')
if tags_start > 0 and tags_end > tags_start:
newname = name[:tags_start]
newname += name[tags_end + 1:]
name = newname
if strip_scenarios:
tags_start = name.find('(')
tags_end = name.find(')')
if tags_start > 0 and tags_end > tags_start:
newname = name[:tags_start]
newname += name[tags_end + 1:]
name = newname
return name
def get_duration(timestamps):
start, end = timestamps
if not start or not end:
duration = ''
else:
delta = end - start
duration = '%d.%06ds' % (
delta.days * DAY_SECONDS + delta.seconds, delta.microseconds)
return duration
def find_worker(test):
for tag in test['tags']:
if tag.startswith('worker-'):
return int(tag[7:])
return 'NaN'
# Print out stdout/stderr if it exists, always
def print_attachments(stream, test, all_channels=False):
"""Print out subunit attachments.
Print out subunit attachments that contain content. This
runs in 2 modes, one for successes where we print out just stdout
and stderr, and an override that dumps all the attachments.
"""
channels = ('stdout', 'stderr')
for name, detail in test['details'].items():
# NOTE(sdague): the subunit names are a little crazy, and actually
# are in the form pythonlogging:'' (with the colon and quotes)
name = name.split(':')[0]
if detail.content_type.type == 'test':
detail.content_type.type = 'text'
if (all_channels or name in channels) and detail.as_text():
title = "Captured %s:" % name
stream.write("\n%s\n%s\n" % (title, ('~' * len(title))))
# indent attachment lines 4 spaces to make them visually
# offset
for line in detail.as_text().split('\n'):
stream.write(" %s\n" % line)
def show_outcome(stream, test, print_failures=False, failonly=False):
global RESULTS
status = test['status']
# TODO(sdague): ask lifeless why on this?
if status == 'exists':
return
worker = find_worker(test)
name = cleanup_test_name(test['id'])
duration = get_duration(test['timestamps'])
if worker not in RESULTS:
RESULTS[worker] = []
RESULTS[worker].append(test)
# don't count the end of the return code as a fail
if name == 'process-returncode':
return
if status == 'fail':
FAILS.append(test)
stream.write('{%s} %s [%s] ... FAILED\n' % (
worker, name, duration))
if not print_failures:
print_attachments(stream, test, all_channels=True)
elif not failonly:
if status == 'success':
stream.write('{%s} %s [%s] ... ok\n' % (
worker, name, duration))
print_attachments(stream, test)
elif status == 'skip':
stream.write('{%s} %s ... SKIPPED: %s\n' % (
worker, name, test['details']['reason'].as_text()))
else:
stream.write('{%s} %s [%s] ... %s\n' % (
worker, name, duration, test['status']))
if not print_failures:
print_attachments(stream, test, all_channels=True)
stream.flush()
def print_fails(stream):
"""Print summary failure report.
Currently unused, however there remains debate on inline vs. at end
reporting, so leave the utility function for later use.
"""
if not FAILS:
return
stream.write("\n==============================\n")
stream.write("Failed %s tests - output below:" % len(FAILS))
stream.write("\n==============================\n")
for f in FAILS:
stream.write("\n%s\n" % f['id'])
stream.write("%s\n" % ('-' * len(f['id'])))
print_attachments(stream, f, all_channels=True)
stream.write('\n')
def count_tests(key, value):
count = 0
for k, v in RESULTS.items():
for item in v:
if key in item:
if re.search(value, item[key]):
count += 1
return count
def run_time():
runtime = 0.0
for k, v in RESULTS.items():
for test in v:
runtime += float(get_duration(test['timestamps']).strip('s'))
return runtime
def worker_stats(worker):
tests = RESULTS[worker]
num_tests = len(tests)
delta = tests[-1]['timestamps'][1] - tests[0]['timestamps'][0]
return num_tests, delta
def print_summary(stream):
stream.write("\n======\nTotals\n======\n")
stream.write("Run: %s in %s sec.\n" % (count_tests('status', '.*'),
run_time()))
stream.write(" - Passed: %s\n" % count_tests('status', 'success'))
stream.write(" - Skipped: %s\n" % count_tests('status', 'skip'))
stream.write(" - Failed: %s\n" % count_tests('status', 'fail'))
# we could have no results, especially as we filter out the process-codes
if RESULTS:
stream.write("\n==============\nWorker Balance\n==============\n")
for w in range(max(RESULTS.keys()) + 1):
if w not in RESULTS:
stream.write(
" - WARNING: missing Worker %s! "
"Race in testr accounting.\n" % w)
else:
num, time = worker_stats(w)
stream.write(" - Worker %s (%s tests) => %ss\n" %
(w, num, time))
def parse_args():
parser = argparse.ArgumentParser()
parser.add_argument('--no-failure-debug', '-n', action='store_true',
dest='print_failures', help='Disable printing failure '
'debug information in realtime')
parser.add_argument('--fails', '-f', action='store_true',
dest='post_fails', help='Print failure debug '
'information after the stream is proccesed')
parser.add_argument('--failonly', action='store_true',
dest='failonly', help="Don't print success items",
default=(
os.environ.get('TRACE_FAILONLY', False)
is not False))
return parser.parse_args()
def main():
args = parse_args()
stream = subunit.ByteStreamToStreamResult(
sys.stdin, non_subunit_name='stdout')
starts = Starts(sys.stdout)
outcomes = testtools.StreamToDict(
functools.partial(show_outcome, sys.stdout,
print_failures=args.print_failures,
failonly=args.failonly))
summary = testtools.StreamSummary()
result = testtools.CopyStreamResult([starts, outcomes, summary])
result.startTestRun()
try:
stream.run(result)
finally:
result.stopTestRun()
if count_tests('status', '.*') == 0:
print("The test run didn't actually run any tests")
return 1
if args.post_fails:
print_fails(sys.stdout)
print_summary(sys.stdout)
return (0 if summary.wasSuccessful() else 1)
if __name__ == '__main__':
sys.exit(main())

@ -11,7 +11,9 @@ setenv =
deps = -egit+https://git.openstack.org/openstack/neutron#egg=neutron
-r{toxinidir}/requirements.txt
-r{toxinidir}/test-requirements.txt
commands = python setup.py testr --slowest --testr-args='{posargs}'
whitelist_externals = sh
commands =
sh tools/pretty_tox.sh '{posargs}'
[testenv:pep8]
commands = flake8

Loading…
Cancel
Save