Add datasource benchmark tests

I chose to integrate these micro-benchmarks with the unit tests
because they're intended to measure performance changes rather than
report raw capabilities. Since testr already provides timing data, and
we'd need similar test discovery and fixture capabilities to run them
any other way, this seemed like a reasonable strategy.
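
For reference, the benchmarks are ordinary testr-discovered tests that skip
themselves unless explicitly enabled. A minimal sketch of that gating pattern
(simplified from the Benchmark base class added below, which really derives
from SqlTestCase):

import os

import testtools


class Benchmark(testtools.TestCase):
    """Base class for benchmarks; skipped unless explicitly enabled."""

    def setUp(self):
        # Only run when TEST_BENCHMARK is set, e.g. by the new 'bench' tox env.
        if os.getenv("TEST_BENCHMARK") != "true":
            self.skipTest("Skipping slow benchmark tests")
        super(Benchmark, self).setUp()

Run them with "tox -e bench", which also reports per-test timings via
"testr slowest --all".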

Change-Id: I5a171fad1f2b55d43ac4e47b6ea4ff491dab570e
SH 2014-11-18 15:43:02 -08:00
parent ea3af4b721
commit fe054dd4e9
6 changed files with 278 additions and 11 deletions


@@ -0,0 +1,79 @@
#!/usr/bin/env python
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from congress.datasources.datasource_driver import DataSourceDriver
def d6service(name, keys, inbox, datapath, args):
"""This method is called by d6cage to create a dataservice
instance. There are a couple of parameters we found useful
to add to that call, so we included them here instead of
modifying d6cage (and all the d6cage.createservice calls).
"""
return BenchmarkDriver(name, keys, inbox, datapath, args)
class BenchmarkDriver(DataSourceDriver):
BENCHTABLE = 'benchtable'
value_trans = {'translation-type': 'VALUE'}
translator = {
'translation-type': 'HDICT',
'table-name': BENCHTABLE,
'selector-type': 'DICT_SELECTOR',
'field-translators':
({'fieldname': 'field1', 'translator': value_trans},
{'fieldname': 'field2', 'translator': value_trans})}
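# Note: the HDICT translator above maps a source dict with keys 'field1' and
# 'field2' to one two-column row of 'benchtable'; update_from_datasource()
# below builds its row tuples directly rather than going through it (see the
# TODO there).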
def __init__(self, name='', keys='', inbox=None, datapath=None, args=None):
super(BenchmarkDriver, self).__init__(
name,
keys,
inbox,
datapath,
args)
# used by update_from_datasource to manufacture data. Default is small.
self.datarows = 10
@classmethod
def get_translators(cls):
return (cls.translator,)
def update_from_datasource(self):
self.state = {}
# TODO(sh): using self.convert_objs() takes about 10x the time. Needs
# optimization efforts.
row_data = tuple((self.BENCHTABLE, ('val1_%d' % i, 'val2_%d' % i))
for i in xrange(self.datarows))
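# self.state maps each table name to a set of row tuples, e.g.
# {'benchtable': set([('val1_0', 'val2_0'), ...])}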
for table, row in row_data:
if table not in self.state:
self.state[table] = set()
self.state[table].add(row)
def get_credentials(self, *args, **kwargs):
return {}
# Allow simple manual tests on mocked data
def main():
print 'Schema:'
print '\n'.join(['%s %s' % (k, v)
for k, v in BenchmarkDriver.get_schema().items()])
driver = BenchmarkDriver()
driver.update_from_datasource()
print 'Resulting state'
print str(driver.state)
if __name__ == '__main__':
main()


@@ -18,6 +18,7 @@
import os
import fixtures
import mox
from oslo.config import cfg
import testtools
@@ -40,6 +41,7 @@ class TestCase(testtools.TestCase):
super(TestCase, self).setUp()
self.mox = mox.Mox()
self.setup_config()
self.addCleanup(cfg.CONF.reset)
config.setup_logging()
@@ -70,6 +72,11 @@ class TestCase(testtools.TestCase):
"""Tests that need a non-default config can override this method."""
config.init([], default_config_files=[])
def tearDown(self):
super(TestCase, self).tearDown()
self.mox.UnsetStubs()
self.mox = None
class SqlTestCase(TestCase):
@@ -91,3 +98,10 @@ class SqlTestCase(TestCase):
conn.execute(table.delete())
self.addCleanup(clear_tables)
class Benchmark(SqlTestCase):
def setUp(self):
if os.getenv("TEST_BENCHMARK") != "true":
self.skipTest("Skipping slow benchmark tests")
super(Benchmark, self).setUp()


@@ -14,7 +14,7 @@
# under the License.
#
-import os.path
+import os
import time
from retrying import retry
@@ -91,9 +91,14 @@ def datasource_config_path():
def state_path():
"""Return path to policy logs for testing."""
"""Return path to policy logs for testing.
Directory will be created if it doesn't exist.
"""
path = test_path()
path = os.path.join(path, "snapshot")
if not os.path.exists(path):
os.makedirs(path)
return path


@@ -0,0 +1,167 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import functools
import logging
import eventlet
import mox
from congress.dse.dataobj import pubData
from congress import harness
from congress.policy import compile
from congress.policy import runtime
from congress.tests import base
from congress.tests import helper
LOG = logging.getLogger(__name__)
class BenchmarkDatasource(base.Benchmark):
def setUp(self):
super(BenchmarkDatasource, self).setUp()
config = {'benchmark': {
'module': helper.data_module_path('benchmark_driver.py'),
'poll_time': 0}}
cage = harness.create(helper.root_path(), helper.state_path(), None,
config)
engine = cage.service_object('engine')
api = {'policy': cage.service_object('api-policy'),
'rule': cage.service_object('api-rule'),
'table': cage.service_object('api-table'),
'row': cage.service_object('api-row'),
'datasource': cage.service_object('api-datasource'),
'status': cage.service_object('api-status'),
'schema': cage.service_object('api-schema')}
helper.retry_check_subscriptions(engine, [(api['rule'].name,
'policy-update')])
helper.retry_check_subscribers(api['rule'], [(engine.name,
'policy-update')])
self.assertTrue('benchmark' in cage.services)
datasource = cage.service_object('benchmark')
table_name = datasource.BENCHTABLE
self.assertEqual(datasource.state, {})
# add a subscriber to ensure the updates end up in datasource.dataPath
pubdata = datasource.pubdata.setdefault(table_name,
pubData(table_name))
pubdata.addsubscriber(self.__class__.__name__, "push", "")
self.assertTrue(datasource.pubdata[table_name])
self.cage = cage
self.engine = engine
self.api = api
self.table_name = table_name
self.datasource = datasource
def benchmark_datasource_update(self, size):
"""Benchmark a datasource update.
Time the propagation of a datasource update from datasource.poll() to
ending up in the datasource.dataPath queue.
"""
LOG.info("%s:: benchmarking datasource update of %d rows", size)
self.datasource.datarows = size
# intercept the queue addition so it doesn't immediately get pulled off
# by the d6cage
received = eventlet.Queue()
self.mox.StubOutWithMock(self.datasource.dataPath, "put_nowait")
self.datasource.dataPath.put_nowait(mox.IgnoreArg()).WithSideEffects(
received.put_nowait)
self.mox.ReplayAll()
# poll and then wait until we've got an item from our queue
LOG.info("%s:: polling datasource", self.__class__.__name__)
self.datasource.poll()
result = received.get(timeout=30)
self.assertTrue(result.body)
self.assertEqual(len(result.body.data), size)
self.mox.VerifyAll()
def benchmark_datasource_to_policy_update(self, size):
"""Benchmark small datsource update to policy propagation.
Time the propagation of a datasource update from datasource.poll() to
completion of a simple policy update.
"""
LOG.info("%s:: benchmarking datasource update of %d rows", size)
self.datasource.datarows = size
table_name = self.table_name
# dummy policy only intended to produce a subscriber for the table
key_to_index = self.datasource.get_column_map(table_name)
id_index = 'x%d' % key_to_index.items()[0][1]
max_index = max(key_to_index.values())
args = ['x%d' % i for i in xrange(max_index + 1)]
formula = compile.parse1('p(%s) :- benchmark:%s(%s)' % (id_index,
table_name, ','.join(args)))
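# e.g. with benchtable's two columns this yields something like:
#   p(x0) :- benchmark:benchtable(x0,x1)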
# publish the formula and verify we see a subscription
LOG.debug('%s:: sending formula: %s', self.__class__.__name__, formula)
self.api['rule'].publish('policy-update', [runtime.Event(formula)])
helper.retry_check_subscriptions(
self.engine, [('benchmark', table_name)])
helper.retry_check_subscribers(
self.datasource, [(self.engine.name, table_name)])
# intercept inbox.task_done() so we know when it's finished. Sadly,
# eventlet doesn't have a condition-like object.
fake_condition = eventlet.Queue()
fake_notify = functools.partial(fake_condition.put_nowait, True)
self.mox.StubOutWithMock(self.engine.inbox, "task_done")
self.engine.inbox.task_done().WithSideEffects(fake_notify)
self.mox.ReplayAll()
LOG.info("%s:: polling datasource", self.__class__.__name__)
self.datasource.poll()
fake_condition.get(timeout=30)
self.mox.VerifyAll()
def test_benchmark_datasource_update_small(self):
"""Benchmark a small datasource update.
Time the propagation of a small (10 row) datasource update from
datasource.poll() to ending up in the datasource.dataPath queue.
"""
self.benchmark_datasource_update(10)
def test_benchmark_datasource_update_large(self):
"""Benchmark a large datasource update.
Time the propagation of a large (100k row) datasource update from
datasource.poll() to ending up in the datasource.dataPath queue.
"""
self.benchmark_datasource_update(100000)
def test_benchmark_datasource_to_policy_update_small(self):
"""Benchmark small datsource update to policy propagation.
Time the propagation of a small (10 row) datasource update from
datasource.poll() to a simple policy update.
"""
self.benchmark_datasource_to_policy_update(10)
# TODO(sh): First goal is 100k rows in under 15s. Right now, 2k rows takes
# about 10s and 100k times out after 30 minutes.
def test_benchmark_datasource_to_policy_update_large(self):
"""Benchmark small datsource update to policy propagation.
Time the propagation of a large (2k row) datasource update from
datasource.poll() to a simple policy update.
"""
self.benchmark_datasource_to_policy_update(2000)


@@ -23,7 +23,6 @@ Tests for `congress` module.
import mox
import neutronclient.v2_0
-import os
from congress.api import webservice
from congress.common import config
@@ -41,13 +40,6 @@ LOG = logging.getLogger(__name__)
class TestCongress(base.SqlTestCase):
-@staticmethod
-def state_path():
-"""Return path to the dir at which policy contents are stored."""
-path = helper.state_path()
-if not os.path.exists(path):
-os.makedirs(path)
-return path
def setUp(self):
"""Setup tests that use multiple mock neutron instances."""
@@ -64,7 +56,7 @@ class TestCongress(base.SqlTestCase):
override['neutron2'] = {'client': neutron_mock2, 'poll_time': 0}
override['nova'] = {'poll_time': 0}
-cage = harness.create(helper.root_path(), self.state_path(),
+cage = harness.create(helper.root_path(), helper.state_path(),
helper.datasource_config_path(), override)
engine = cage.service_object('engine')
api = {'policy': cage.service_object('api-policy'),

tox.ini

@@ -22,6 +22,16 @@ commands = {posargs}
[testenv:cover]
commands = python setup.py testr --coverage --testr-args='{posargs}'
[testenv:bench]
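# Run the benchmarks with "tox -e bench"; setting TEST_BENCHMARK=true keeps
# the Benchmark test base class from skipping them.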
setenv =
TEST_BENCHMARK=true
VIRTUAL_ENV={envdir}
LANG=en_US.UTF-8
LANGUAGE=en_US:en
LC_ALL=C
commands = python setup.py testr --no-parallel --testr-args='test_benchmark {posargs}'
testr slowest --all
[flake8]
# E125 continuation line does not distinguish itself from next logical line
# E126 continuation line over-indented for hanging indent