Added db setup and teardown with sqlite

This commit is contained in:
Chris Alfonso 2012-04-19 18:10:20 -04:00
parent 9c69836bfd
commit bdaed9b997
14 changed files with 673 additions and 61 deletions

1
.gitignore vendored
View File

@ -6,3 +6,4 @@ heat.egg-info
heat/vcsversion.py heat/vcsversion.py
tags tags
*.log *.log
heat/tests/heat-test.db

View File

@ -178,8 +178,6 @@ class HeatEngineConfigOpts(cfg.CommonConfigOpts):
help='port for os volume api to listen'), help='port for os volume api to listen'),
] ]
db_opts = [ db_opts = [
cfg.StrOpt('db_backend', default='heat.db.sqlalchemy.api',
help='The backend to use for db'),
cfg.StrOpt('sql_connection', cfg.StrOpt('sql_connection',
default='mysql://heat:heat@localhost/heat', default='mysql://heat:heat@localhost/heat',
help='The SQLAlchemy connection string used to connect to the ' help='The SQLAlchemy connection string used to connect to the '

View File

@ -25,19 +25,32 @@ Usage:
The underlying driver is loaded . SQLAlchemy is currently the only The underlying driver is loaded . SQLAlchemy is currently the only
supported backend. supported backend.
''' '''
import heat.utils
from heat.openstack.common import utils from heat.openstack.common import utils
from heat.openstack.common import cfg
from heat.common import config
import heat.utils
SQL_CONNECTION = 'sqlite:///heat-test.db/'
SQL_IDLE_TIMEOUT = 3600
db_opts = [
cfg.StrOpt('db_backend',
default='sqlalchemy',
help='The backend to use for db'),
]
conf = config.HeatEngineConfigOpts()
conf.db_backend = 'heat.db.sqlalchemy.api'
IMPL = heat.utils.LazyPluggable('db_backend',
sqlalchemy='heat.db.sqlalchemy.api')
def configure(conf): def configure(conf):
global IMPL
global SQL_CONNECTION global SQL_CONNECTION
global SQL_IDLE_TIMEOUT global SQL_IDLE_TIMEOUT
IMPL = utils.import_object(conf.db_backend)
SQL_CONNECTION = conf.sql_connection SQL_CONNECTION = conf.sql_connection
SQL_IDLE_TIMEOUT = conf.sql_idle_timeout SQL_IDLE_TIMEOUT = conf.sql_idle_timeout
def raw_template_get(context, template_id): def raw_template_get(context, template_id):
return IMPL.raw_template_get(context, template_id) return IMPL.raw_template_get(context, template_id)

31
heat/db/migration.py Normal file
View File

@ -0,0 +1,31 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Database setup and migration commands."""
from heat import utils
IMPL = utils.LazyPluggable('db_backend',
sqlalchemy='heat.db.sqlalchemy.migration')
def db_sync(version=None):
"""Migrate the database to `version` or the most recent version."""
return IMPL.db_sync(version=version)
def db_version():
"""Display the current database version."""
return IMPL.db_version()

View File

@ -0,0 +1,111 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import distutils.version as dist_version
import os
import sys
from heat.db.sqlalchemy.session import get_engine
import sqlalchemy
import migrate
from migrate.versioning import util as migrate_util
_REPOSITORY = None
@migrate_util.decorator
def patched_with_engine(f, *a, **kw):
url = a[0]
engine = migrate_util.construct_engine(url, **kw)
try:
kw['engine'] = engine
return f(*a, **kw)
finally:
if isinstance(engine, migrate_util.Engine) and engine is not url:
migrate_util.log.debug('Disposing SQLAlchemy engine %s', engine)
engine.dispose()
# TODO(jkoelker) When migrate 0.7.3 is released and nova depends
# on that version or higher, this can be removed
MIN_PKG_VERSION = dist_version.StrictVersion('0.7.3')
if (not hasattr(migrate, '__version__') or
dist_version.StrictVersion(migrate.__version__) < MIN_PKG_VERSION):
migrate_util.with_engine = patched_with_engine
# NOTE(jkoelker) Delay importing migrate until we are patched
from migrate.versioning import api as versioning_api
from migrate.versioning.repository import Repository
try:
from migrate.versioning import exceptions as versioning_exceptions
except ImportError:
try:
from migrate import exceptions as versioning_exceptions
except ImportError:
sys.exit(_("python-migrate is not installed. Exiting."))
#_REPOSITORY = None
def db_sync(version=None):
if version is not None:
try:
version = int(version)
except ValueError:
raise exception.Error(_("version should be an integer"))
current_version = db_version()
repository = _find_migrate_repo()
if version is None or version > current_version:
return versioning_api.upgrade(get_engine(), repository, version)
else:
return versioning_api.downgrade(get_engine(), repository,
version)
def db_version():
repository = _find_migrate_repo()
try:
return versioning_api.db_version(get_engine(), repository)
except versioning_exceptions.DatabaseNotControlledError:
# If we aren't version controlled we may already have the database
# in the state from before we started version control, check for that
# and set up version_control appropriately
meta = sqlalchemy.MetaData()
engine = get_engine()
meta.reflect(bind=engine)
try:
for table in ('stack', 'resource', 'event',
'parsed_template', 'raw_template'):
assert table in meta.tables
return db_version_control(1)
except AssertionError:
return db_version_control(0)
def db_version_control(version=None):
repository = _find_migrate_repo()
versioning_api.version_control(get_engine(), repository, version)
return version
def _find_migrate_repo():
"""Get the path for the migrate repository."""
path = os.path.join(os.path.abspath(os.path.dirname(__file__)),
'migrate_repo')
assert os.path.exists(path)
global _REPOSITORY
if _REPOSITORY is None:
_REPOSITORY = Repository(path)
return _REPOSITORY

View File

@ -20,7 +20,6 @@ import os
import string import string
import json import json
import sys import sys
from email import encoders from email import encoders
from email.message import Message from email.message import Message
from email.mime.base import MIMEBase from email.mime.base import MIMEBase
@ -78,7 +77,6 @@ class Resource(object):
self.instance_id = None self.instance_id = None
self.state = None self.state = None
self.id = None self.id = None
self._nova = {} self._nova = {}
if not 'Properties' in self.t: if not 'Properties' in self.t:
# make a dummy entry to prevent having to check all over the # make a dummy entry to prevent having to check all over the
@ -601,7 +599,6 @@ class Instance(Resource):
msg = MIMEText(userdata, _subtype='x-shellscript') msg = MIMEText(userdata, _subtype='x-shellscript')
msg.add_header('Content-Disposition', 'attachment', filename='startup') msg.add_header('Content-Disposition', 'attachment', filename='startup')
mime_blob.attach(msg) mime_blob.attach(msg)
server = self.nova().servers.create(name=self.name, image=image_id, server = self.nova().servers.create(name=self.name, image=image_id,
flavor=flavor_id, flavor=flavor_id,
key_name=key_name, key_name=key_name,

60
heat/testing/README.rst Normal file
View File

@ -0,0 +1,60 @@
=====================================
Heat Testing Infrastructure
=====================================
A note of clarification is in order, to help those who are new to testing in
Heat:
- actual unit tests are created in the "tests" directory;
- the "testing" directory is used to house the infrastructure needed to support
testing in Heat.
This README file attempts to provide current and prospective contributors with
everything they need to know in order to start creating unit tests and
utilizing the convenience code provided in nova.testing.
Test Types: Unit vs. Functional vs. Integration
-----------------------------------------------
TBD
Writing Unit Tests
------------------
TBD
Using Fakes
~~~~~~~~~~~
TBD
test.TestCase
-------------
The TestCase class from heat.test (generally imported as test) will
automatically manage self.stubs using the stubout module and self.mox
using the mox module during the setUp step. They will automatically
verify and clean up during the tearDown step.
If using test.TestCase, calling the super class setUp is required and
calling the super class tearDown is required to be last if tearDown
is overriden.
Writing Functional Tests
------------------------
TBD
Writing Integration Tests
-------------------------
TBD
Tests and assertRaises
----------------------
When asserting that a test should raise an exception, test against the
most specific exception possible. An overly broad exception type (like
Exception) can mask errors in the unit test itself.
Example::
TBD

0
heat/testing/__init__.py Normal file
View File

View File

@ -0,0 +1 @@
import rabbit

362
heat/testing/runner.py Normal file
View File

@ -0,0 +1,362 @@
#!/usr/bin/env python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Colorizer Code is borrowed from Twisted:
# Copyright (c) 2001-2010 Twisted Matrix Laboratories.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""Unittest runner for Heat.
To run all tests
python heat/testing/runner.py
To run a single test module:
python heat/testing/runner.py test_resources
To run a single test:
python heat/testing/runner.py
test_resources:ResourceTestCase.test_resource_from_template
"""
import gettext
import heapq
import os
import unittest
import sys
import time
import eventlet
from nose import config
from nose import core
from nose import result
gettext.install('heat', unicode=1)
reldir = os.path.join(os.path.dirname(__file__), '..', '..')
absdir = os.path.abspath(reldir)
sys.path.insert(0, absdir)
from heat.openstack.common import cfg
class _AnsiColorizer(object):
"""
A colorizer is an object that loosely wraps around a stream, allowing
callers to write text to the stream in a particular color.
Colorizer classes must implement C{supported()} and C{write(text, color)}.
"""
_colors = dict(black=30, red=31, green=32, yellow=33,
blue=34, magenta=35, cyan=36, white=37)
def __init__(self, stream):
self.stream = stream
def supported(cls, stream=sys.stdout):
"""
A class method that returns True if the current platform supports
coloring terminal output using this method. Returns False otherwise.
"""
if not stream.isatty():
return False # auto color only on TTYs
try:
import curses
except ImportError:
return False
else:
try:
try:
return curses.tigetnum("colors") > 2
except curses.error:
curses.setupterm()
return curses.tigetnum("colors") > 2
except Exception:
raise
# guess false in case of error
return False
supported = classmethod(supported)
def write(self, text, color):
"""
Write the given text to the stream in the given color.
@param text: Text to be written to the stream.
@param color: A string label for a color. e.g. 'red', 'white'.
"""
color = self._colors[color]
self.stream.write('\x1b[%s;1m%s\x1b[0m' % (color, text))
class _Win32Colorizer(object):
"""
See _AnsiColorizer docstring.
"""
def __init__(self, stream):
import win32console as win
red, green, blue, bold = (win.FOREGROUND_RED, win.FOREGROUND_GREEN,
win.FOREGROUND_BLUE, win.FOREGROUND_INTENSITY)
self.stream = stream
self.screenBuffer = win.GetStdHandle(win.STD_OUT_HANDLE)
self._colors = {
'normal': red | green | blue,
'red': red | bold,
'green': green | bold,
'blue': blue | bold,
'yellow': red | green | bold,
'magenta': red | blue | bold,
'cyan': green | blue | bold,
'white': red | green | blue | bold
}
def supported(cls, stream=sys.stdout):
try:
import win32console
screenBuffer = win32console.GetStdHandle(
win32console.STD_OUT_HANDLE)
except ImportError:
return False
import pywintypes
try:
screenBuffer.SetConsoleTextAttribute(
win32console.FOREGROUND_RED |
win32console.FOREGROUND_GREEN |
win32console.FOREGROUND_BLUE)
except pywintypes.error:
return False
else:
return True
supported = classmethod(supported)
def write(self, text, color):
color = self._colors[color]
self.screenBuffer.SetConsoleTextAttribute(color)
self.stream.write(text)
self.screenBuffer.SetConsoleTextAttribute(self._colors['normal'])
class _NullColorizer(object):
"""
See _AnsiColorizer docstring.
"""
def __init__(self, stream):
self.stream = stream
def supported(cls, stream=sys.stdout):
return True
supported = classmethod(supported)
def write(self, text, color):
self.stream.write(text)
def get_elapsed_time_color(elapsed_time):
if elapsed_time > 1.0:
return 'red'
elif elapsed_time > 0.25:
return 'yellow'
else:
return 'green'
class HeatTestResult(result.TextTestResult):
def __init__(self, *args, **kw):
self.show_elapsed = kw.pop('show_elapsed')
result.TextTestResult.__init__(self, *args, **kw)
self.num_slow_tests = 5
self.slow_tests = [] # this is a fixed-sized heap
self._last_case = None
self.colorizer = None
# NOTE(vish): reset stdout for the terminal check
stdout = sys.stdout
sys.stdout = sys.__stdout__
for colorizer in [_Win32Colorizer, _AnsiColorizer, _NullColorizer]:
if colorizer.supported():
self.colorizer = colorizer(self.stream)
break
sys.stdout = stdout
# NOTE(lorinh): Initialize start_time in case a sqlalchemy-migrate
# error results in it failing to be initialized later. Otherwise,
# _handleElapsedTime will fail, causing the wrong error message to
# be outputted.
self.start_time = time.time()
def getDescription(self, test):
return str(test)
def _handleElapsedTime(self, test):
self.elapsed_time = time.time() - self.start_time
item = (self.elapsed_time, test)
# Record only the n-slowest tests using heap
if len(self.slow_tests) >= self.num_slow_tests:
heapq.heappushpop(self.slow_tests, item)
else:
heapq.heappush(self.slow_tests, item)
def _writeElapsedTime(self, test):
color = get_elapsed_time_color(self.elapsed_time)
self.colorizer.write(" %.2f" % self.elapsed_time, color)
def _writeResult(self, test, long_result, color, short_result, success):
if self.showAll:
self.colorizer.write(long_result, color)
if self.show_elapsed and success:
self._writeElapsedTime(test)
self.stream.writeln()
elif self.dots:
self.stream.write(short_result)
self.stream.flush()
# NOTE(vish): copied from unittest with edit to add color
def addSuccess(self, test):
unittest.TestResult.addSuccess(self, test)
self._handleElapsedTime(test)
self._writeResult(test, 'OK', 'green', '.', True)
# NOTE(vish): copied from unittest with edit to add color
def addFailure(self, test, err):
unittest.TestResult.addFailure(self, test, err)
self._handleElapsedTime(test)
self._writeResult(test, 'FAIL', 'red', 'F', False)
# NOTE(vish): copied from nose with edit to add color
def addError(self, test, err):
"""Overrides normal addError to add support for
errorClasses. If the exception is a registered class, the
error will be added to the list for that class, not errors.
"""
self._handleElapsedTime(test)
stream = getattr(self, 'stream', None)
ec, ev, tb = err
try:
exc_info = self._exc_info_to_string(err, test)
except TypeError:
# 2.3 compat
exc_info = self._exc_info_to_string(err)
for cls, (storage, label, isfail) in self.errorClasses.items():
if result.isclass(ec) and issubclass(ec, cls):
if isfail:
test.passed = False
storage.append((test, exc_info))
# Might get patched into a streamless result
if stream is not None:
if self.showAll:
message = [label]
detail = result._exception_detail(err[1])
if detail:
message.append(detail)
stream.writeln(": ".join(message))
elif self.dots:
stream.write(label[:1])
return
self.errors.append((test, exc_info))
test.passed = False
if stream is not None:
self._writeResult(test, 'ERROR', 'red', 'E', False)
def startTest(self, test):
unittest.TestResult.startTest(self, test)
self.start_time = time.time()
current_case = test.test.__class__.__name__
if self.showAll:
if current_case != self._last_case:
self.stream.writeln(current_case)
self._last_case = current_case
self.stream.write(
' %s' % str(test.test._testMethodName).ljust(60))
self.stream.flush()
class HeatTestRunner(core.TextTestRunner):
def __init__(self, *args, **kwargs):
self.show_elapsed = kwargs.pop('show_elapsed')
core.TextTestRunner.__init__(self, *args, **kwargs)
def _makeResult(self):
return HeatTestResult(self.stream,
self.descriptions,
self.verbosity,
self.config,
show_elapsed=self.show_elapsed)
def _writeSlowTests(self, result_):
# Pare out 'fast' tests
slow_tests = [item for item in result_.slow_tests
if get_elapsed_time_color(item[0]) != 'green']
if slow_tests:
slow_total_time = sum(item[0] for item in slow_tests)
self.stream.writeln("Slowest %i tests took %.2f secs:"
% (len(slow_tests), slow_total_time))
for elapsed_time, test in sorted(slow_tests, reverse=True):
time_str = "%.2f" % elapsed_time
self.stream.writeln(" %s %s" % (time_str.ljust(10), test))
def run(self, test):
result_ = core.TextTestRunner.run(self, test)
if self.show_elapsed:
self._writeSlowTests(result_)
return result_
def run():
# This is a fix to allow the --hide-elapsed flag while accepting
# arbitrary nosetest flags as well
argv = [x for x in sys.argv if x != '--hide-elapsed']
hide_elapsed = argv != sys.argv
# If any argument looks like a test name but doesn't have "heat.tests" in
# front of it, automatically add that so we don't have to type as much
for i, arg in enumerate(argv):
if arg.startswith('test_'):
argv[i] = 'heat.tests.%s' % arg
testdir = os.path.abspath(os.path.join("heat", "tests"))
c = config.Config(stream=sys.stdout,
env=os.environ,
verbosity=3,
workingDir=testdir,
plugins=core.DefaultPluginManager())
runner = HeatTestRunner(stream=c.stream,
verbosity=c.verbosity,
config=c,
show_elapsed=not hide_elapsed)
sys.exit(not core.run(config=c, testRunner=runner, argv=argv))
if __name__ == '__main__':
eventlet.monkey_patch()
run()

View File

@ -16,3 +16,27 @@
# The code below enables nosetests to work with i18n _() blocks # The code below enables nosetests to work with i18n _() blocks
import __builtin__ import __builtin__
setattr(__builtin__, '_', lambda x: x) setattr(__builtin__, '_', lambda x: x)
import os
import shutil
from heat.db.sqlalchemy.session import get_engine
_DB = None
def reset_db():
engine = get_engine()
engine.dispose()
conn = engine.connect()
conn.connection.executescript(_DB)
def setup():
import mox # Fail fast if you don't have mox. Workaround for bug 810424
from heat import db
from heat.db import migration
migration.db_sync()
engine = get_engine()
conn = engine.connect()
# _DB = "".join(line for line in conn.connection.dump())

View File

@ -5,73 +5,60 @@ sys.path.append(os.environ['PYTHON_NOVACLIENT_SRC'])
import nose import nose
import unittest import unittest
import mox import mox
import json
import sqlalchemy
from nose.plugins.attrib import attr from nose.plugins.attrib import attr
from nose import with_setup from nose import with_setup
from tests.v1_1 import fakes from tests.v1_1 import fakes
from heat.engine import resources from heat.engine import resources
from heat.common import config
import heat.db as db_api import heat.db as db_api
from heat.engine import parser
@attr(tag=['unit', 'resource']) @attr(tag=['unit', 'resource'])
@attr(speed='fast') @attr(speed='fast')
class ResourcesTest(unittest.TestCase): class ResourcesTest(unittest.TestCase):
_mox = None
def setUp(self): def setUp(self):
cs = fakes.FakeClient() self.m = mox.Mox()
self._mox = mox.Mox() self.cs = fakes.FakeClient()
sql_connection = 'sqlite://heat.db'
conf = config.HeatEngineConfigOpts()
db_api.configure(conf)
def tearDown(self): def tearDown(self):
self._mox.UnsetStubs() self.m.UnsetStubs()
print "ResourcesTest teardown complete"
def test_initialize_resource_from_template(self):
f = open('templates/WordPress_Single_Instance_gold.template')
t = f.read()
f.close()
stack = self._mox.CreateMockAnything()
stack.id().AndReturn(1)
self._mox.StubOutWithMock(stack, 'resolve_static_refs')
stack.resolve_static_refs(t).AndReturn(t)
self._mox.StubOutWithMock(stack, 'resolve_find_in_map')
stack.resolve_find_in_map(t).AndReturn(t)
self._mox.StubOutWithMock(db_api, 'resource_get_by_name_and_stack')
db_api.resource_get_by_name_and_stack(None, 'test_resource_name', stack).AndReturn(None)
self._mox.ReplayAll()
resource = resources.Resource('test_resource_name', t, stack)
assert isinstance(resource, resources.Resource)
def test_initialize_instance_from_template(self): def test_initialize_instance_from_template(self):
f = open('templates/WordPress_Single_Instance_gold.template') f = open('../../templates/WordPress_Single_Instance_gold.template')
t = f.read() t = json.loads(f.read())
f.close() f.close()
stack = self._mox.CreateMockAnything() params = {}
stack.id().AndReturn(1) parameters = {}
params['KeyStoneCreds'] = None
t['Parameters']['KeyName']['Value'] = 'test'
stack = parser.Stack('test_stack', t, 0, params)
self.m.StubOutWithMock(db_api, 'resource_get_by_name_and_stack')
db_api.resource_get_by_name_and_stack(None, 'test_resource_name',\
stack).AndReturn(None)
self.m.StubOutWithMock(resources.Instance, 'nova')
resources.Instance.nova().AndReturn(self.cs)
resources.Instance.nova().AndReturn(self.cs)
resources.Instance.nova().AndReturn(self.cs)
resources.Instance.nova().AndReturn(self.cs)
self._mox.StubOutWithMock(stack, 'resolve_static_refs') print self.cs.flavors.list()[0].name
stack.resolve_static_refs(t).AndReturn(t) self.m.ReplayAll()
t['Resources']['WebServer']['Properties']['ImageId'] = 'CentOS 5.2'
self._mox.StubOutWithMock(stack, 'resolve_find_in_map') t['Resources']['WebServer']['Properties']['InstanceType'] = '256 MB Server'
stack.resolve_find_in_map(t).AndReturn(t) instance = resources.Instance('test_resource_name',\
t['Resources']['WebServer'], stack)
self._mox.StubOutWithMock(db_api, 'resource_get_by_name_and_stack') instance.itype_oflavor['256 MB Server'] = '256 MB Server'
db_api.resource_get_by_name_and_stack(None, 'test_resource_name', stack).AndReturn(None) instance.create()
self._mox.ReplayAll() # allows testing of the test directly, shown below
instance = resources.Instance('test_resource_name', t, stack)
# allows testing of the test directly, shown below
if __name__ == '__main__': if __name__ == '__main__':
sys.argv.append(__file__) sys.argv.append(__file__)
nose.main() nose.main()

26
heat/utils.py Normal file
View File

@ -0,0 +1,26 @@
class LazyPluggable(object):
"""A pluggable backend loaded lazily based on some value."""
def __init__(self, pivot, **backends):
self.__backends = backends
self.__pivot = pivot
self.__backend = None
def __get_backend(self):
if not self.__backend:
print self.__backends.values()
backend_name = 'sqlalchemy'
backend = self.__backends[backend_name]
if isinstance(backend, tuple):
name = backend[0]
fromlist = backend[1]
else:
name = backend
fromlist = backend
self.__backend = __import__(name, None, None, fromlist)
return self.__backend
def __getattr__(self, key):
backend = self.__get_backend()
return getattr(backend, key)

View File

@ -42,9 +42,10 @@ for arg in "$@"; do
process_option $arg process_option $arg
done done
NOSETESTS="python run_tests.py $noseargs" NOSETESTS="python heat/testing/runner.py $noseopts $noseargs"
function run_tests { function run_tests {
echo 'Running tests'
# Just run the test suites in current environment # Just run the test suites in current environment
${wrapper} $NOSETESTS 2> run_tests.err.log ${wrapper} $NOSETESTS 2> run_tests.err.log
} }
@ -52,7 +53,7 @@ function run_tests {
function run_pep8 { function run_pep8 {
echo "Running pep8 ..." echo "Running pep8 ..."
PEP8_OPTIONS="--exclude=$PEP8_EXCLUDE --repeat" PEP8_OPTIONS="--exclude=$PEP8_EXCLUDE --repeat"
PEP8_INCLUDE="bin/heat bin/heat-api bin/heat-engine heat tools setup.py run_tests.py" PEP8_INCLUDE="bin/heat bin/heat-api bin/heat-engine heat tools setup.py heat/testing/runner.py"
${wrapper} pep8 $PEP8_OPTIONS $PEP8_INCLUDE ${wrapper} pep8 $PEP8_OPTIONS $PEP8_INCLUDE
} }
@ -87,9 +88,9 @@ if [ $just_pep8 -eq 1 ]; then
exit exit
fi fi
run_tests || exit run_tests
if [ -z "$noseargs" ]; then #if [ -z "$noseargs" ]; then
run_pep8 # run_pep8
fi #fi