Add a StreamResult Router.

The router dispatches packets to another StreamResult based on their
route_code, stripping the matching section. Unroutable events are forwarded to
a fallback stream.
This commit is contained in:
Robert Collins
2013-04-03 13:11:33 +13:00
parent cb84f53e76
commit 088f82f4dd
6 changed files with 306 additions and 0 deletions

4
NEWS
View File

@@ -87,6 +87,10 @@ Improvements
* ``PlaceHolder`` can now hold timestamps, and applies them before the test and
then before the outcome. (Robert Collins)
* ``StreamResultRouter`` added. This is useful for demultiplexing - e.g. for
partitioning analysis of events or sending feedback encapsulated in
StreamResult events back to their source. (Robert Collins)
* The error message for setUp and tearDown upcall errors was broken on Python
3.4. (Monty Taylor, Robert Collins, #1140688)

View File

@@ -249,6 +249,33 @@ passing the events via this decorator to get timestamped data. As long as
no buffering/queueing or blocking happen before the timestamper sees the event
the timestamp will be as accurate as if the original event had it.
StreamResultRouter
------------------
This is a ``StreamResult`` which forwards events to an arbitrary set of target
``StreamResult`` objects. Events that have no forwarding rule are passed onto
an fallback ``StreamResult`` for processing. The mapping can be changed at
runtime, allowing great flexibility and responsiveness to changes. Because
The mapping can change dynamically and there could be the same recipient for
two different maps, ``startTestRun`` and ``stopTestRun`` handling is fine
grained and up to the user.
If no fallback has been supplied, an unroutable event will raise an exception.
For instance::
>>> router = StreamResultRouter()
>>> sink = doubles.StreamResult()
>>> router.add_rule(sink, 'route_code_prefix', route_prefix='0',
... consume_route=True)
>>> router.status(test_id='foo', route_code='0/1', test_status='uxsuccess')
Would remove the ``0/`` from the route_code and forward the event like so::
>>> sink.status('test_id=foo', route_code='1', test_status='uxsuccess')
See ``pydoc testtools.StreamResultRouter`` for details.
TestResult.addSkip
------------------

View File

@@ -31,6 +31,7 @@ __all__ = [
'skipUnless',
'StreamFailFast',
'StreamResult',
'StreamResultRouter',
'StreamSummary',
'StreamTagger',
'StreamToDict',
@@ -84,6 +85,7 @@ else:
MultiTestResult,
StreamFailFast,
StreamResult,
StreamResultRouter,
StreamSummary,
StreamTagger,
StreamToDict,

View File

@@ -9,6 +9,7 @@ __all__ = [
'MultiTestResult',
'StreamFailFast',
'StreamResult',
'StreamResultRouter',
'StreamSummary',
'StreamTagger',
'StreamToDict',
@@ -31,6 +32,7 @@ from testtools.testresult.real import (
MultiTestResult,
StreamFailFast,
StreamResult,
StreamResultRouter,
StreamSummary,
StreamTagger,
StreamToDict,

View File

@@ -427,6 +427,131 @@ class StreamFailFast(StreamResult):
self.on_error()
class StreamResultRouter(StreamResult):
"""A StreamResult that routes events.
StreamResultRouter forwards received events to another StreamResult object,
selected by a dynamic forwarding policy. Events where no destination is
found are forwarded to the fallback StreamResult, or an error is raised.
Typical use is to construct a router with a fallback and then either
create up front mapping rules, or create them as-needed from the fallback
handler::
>>> router = StreamResultRouter()
>>> sink = doubles.StreamResult()
>>> router.add_rule(sink, 'route_code_prefix', route_prefix='0',
... consume_route=True)
>>> router.status(test_id='foo', route_code='0/1', test_status='uxsuccess')
StreamResultRouter has no buffering.
When adding routes (and for the fallback) whether to call startTestRun and
stopTestRun or to not call them is controllable by passing
'do_start_stop_run'. The default is to call them for the fallback only.
If a route is added after startTestRun has been called, and
do_start_stop_run is True then startTestRun is called immediately on the
new route sink.
There is no a-priori defined lookup order for routes: if they are ambiguous
the behaviour is undefined. Only a single route is chosen for any event.
"""
_policies = {}
def __init__(self, fallback=None, do_start_stop_run=True):
"""Construct a StreamResultRouter with optional fallback.
:param fallback: A StreamResult to forward events to when no route
exists for them.
:param do_start_stop_run: If False do not pass startTestRun and
stopTestRun onto the fallback.
"""
self.fallback = fallback
self._route_code_prefixes = {}
self._test_ids = {}
# Records sinks that should have do_start_stop_run called on them.
self._sinks = []
if do_start_stop_run and fallback:
self._sinks.append(fallback)
self._in_run = False
def startTestRun(self):
super(StreamResultRouter, self).startTestRun()
for sink in self._sinks:
sink.startTestRun()
self._in_run = True
def stopTestRun(self):
super(StreamResultRouter, self).stopTestRun()
for sink in self._sinks:
sink.stopTestRun()
self._in_run = False
def status(self, **kwargs):
route_code = kwargs.get('route_code', None)
test_id = kwargs.get('test_id', None)
if route_code is not None:
prefix = route_code.split('/')[0]
else:
prefix = route_code
if prefix in self._route_code_prefixes:
target, consume_route = self._route_code_prefixes[prefix]
if route_code is not None and consume_route:
route_code = route_code[len(prefix) + 1:]
if not route_code:
route_code = None
kwargs['route_code'] = route_code
elif test_id in self._test_ids:
target = self._test_ids[test_id]
else:
target = self.fallback
target.status(**kwargs)
def add_rule(self, sink, policy, do_start_stop_run=False, **policy_args):
"""Add a rule to route events to sink when they match a given policy.
:param sink: A StreamResult to receive events.
:param policy: A routing policy. Valid policies are
'route_code_prefix' and 'test_id'.
:param do_start_stop_run: If True then startTestRun and stopTestRun
events will be passed onto this sink.
route_code_prefix routes events based on a prefix of the route code in
the event. It takes the following arguments::
:param route_prefix: A prefix to match on - e.g. '0'.
:param consume_route: If True, remove the prefix from the route_code
when forwarding events.
test_id routes events based on the test id::
:param test_id: The test id to route on. Use None to select non-test
events.
map may raise errors::
:raises: ValueError if the policy is unknown
:raises: TypeError if the policy is given arguments it cannot handle.
"""
policy_method = StreamResultRouter._policies.get(policy, None)
if not policy_method:
raise ValueError("bad policy %r" % (policy,))
policy_method(self, sink, **policy_args)
if do_start_stop_run:
self._sinks.append(sink)
if self._in_run:
sink.startTestRun()
def _map_route_code_prefix(self, sink, route_prefix, consume_route=False):
if '/' in route_prefix:
raise TypeError(
"%r is more than one route step long" % (route_prefix,))
self._route_code_prefixes[route_prefix] = (sink, consume_route)
_policies['route_code_prefix'] = _map_route_code_prefix
def _map_test_id(self, sink, test_id):
self._test_ids[test_id] = sink
_policies['test_id'] = _map_test_id
class StreamTagger(CopyStreamResult):
"""Adds or discards tags from StreamResult events."""

View File

@@ -28,6 +28,7 @@ from testtools import (
PlaceHolder,
StreamFailFast,
StreamResult,
StreamResultRouter,
StreamSummary,
StreamTagger,
StreamToDict,
@@ -599,6 +600,12 @@ class TestStreamFailFastContract(TestCase, TestStreamResultContract):
return StreamFailFast(lambda:None)
class TestStreamResultRouterContract(TestCase, TestStreamResultContract):
def _make_result(self):
return StreamResultRouter(StreamResult())
class TestDoubleStreamResultEvents(TestCase):
def test_startTestRun(self):
@@ -1693,6 +1700,145 @@ class TestMergeTags(TestCase):
expected, _merge_tags(current_tags, changing_tags))
class TestStreamResultRouter(TestCase):
def test_start_stop_test_run_no_fallback(self):
result = StreamResultRouter()
result.startTestRun()
result.stopTestRun()
def test_no_fallback_errors(self):
self.assertRaises(Exception, StreamResultRouter().status, test_id='f')
def test_fallback_calls(self):
fallback = LoggingStreamResult()
result = StreamResultRouter(fallback)
result.startTestRun()
result.status(test_id='foo')
result.stopTestRun()
self.assertEqual([
('startTestRun',),
('status', 'foo', None, None, True, None, None, False, None, None,
None),
('stopTestRun',),
],
fallback._events)
def test_fallback_no_do_start_stop_run(self):
fallback = LoggingStreamResult()
result = StreamResultRouter(fallback, do_start_stop_run=False)
result.startTestRun()
result.status(test_id='foo')
result.stopTestRun()
self.assertEqual([
('status', 'foo', None, None, True, None, None, False, None, None,
None)
],
fallback._events)
def test_add_rule_bad_policy(self):
router = StreamResultRouter()
target = LoggingStreamResult()
self.assertRaises(ValueError, router.add_rule, target, 'route_code_prefixa',
route_prefix='0')
def test_add_rule_extra_policy_arg(self):
router = StreamResultRouter()
target = LoggingStreamResult()
self.assertRaises(TypeError, router.add_rule, target, 'route_code_prefix',
route_prefix='0', foo=1)
def test_add_rule_missing_prefix(self):
router = StreamResultRouter()
target = LoggingStreamResult()
self.assertRaises(TypeError, router.add_rule, target, 'route_code_prefix')
def test_add_rule_slash_in_prefix(self):
router = StreamResultRouter()
target = LoggingStreamResult()
self.assertRaises(TypeError, router.add_rule, target, 'route_code_prefix',
route_prefix='0/')
def test_add_rule_route_code_consume_False(self):
fallback = LoggingStreamResult()
target = LoggingStreamResult()
router = StreamResultRouter(fallback)
router.add_rule(target, 'route_code_prefix', route_prefix='0')
router.status(test_id='foo', route_code='0')
router.status(test_id='foo', route_code='0/1')
router.status(test_id='foo')
self.assertEqual([
('status', 'foo', None, None, True, None, None, False, None, '0',
None),
('status', 'foo', None, None, True, None, None, False, None, '0/1',
None),
],
target._events)
self.assertEqual([
('status', 'foo', None, None, True, None, None, False, None, None,
None),
],
fallback._events)
def test_add_rule_route_code_consume_True(self):
fallback = LoggingStreamResult()
target = LoggingStreamResult()
router = StreamResultRouter(fallback)
router.add_rule(
target, 'route_code_prefix', route_prefix='0', consume_route=True)
router.status(test_id='foo', route_code='0') # -> None
router.status(test_id='foo', route_code='0/1') # -> 1
router.status(test_id='foo', route_code='1') # -> fallback as-is.
self.assertEqual([
('status', 'foo', None, None, True, None, None, False, None, None,
None),
('status', 'foo', None, None, True, None, None, False, None, '1',
None),
],
target._events)
self.assertEqual([
('status', 'foo', None, None, True, None, None, False, None, '1',
None),
],
fallback._events)
def test_add_rule_test_id(self):
nontest = LoggingStreamResult()
test = LoggingStreamResult()
router = StreamResultRouter(test)
router.add_rule(nontest, 'test_id', test_id=None)
router.status(test_id='foo', file_name="bar", file_bytes=b'')
router.status(file_name="bar", file_bytes=b'')
self.assertEqual([
('status', 'foo', None, None, True, 'bar', b'', False, None, None,
None),], test._events)
self.assertEqual([
('status', None, None, None, True, 'bar', b'', False, None, None,
None),], nontest._events)
def test_add_rule_do_start_stop_run(self):
nontest = LoggingStreamResult()
router = StreamResultRouter()
router.add_rule(nontest, 'test_id', test_id=None, do_start_stop_run=True)
router.startTestRun()
router.stopTestRun()
self.assertEqual([
('startTestRun',),
('stopTestRun',),
], nontest._events)
def test_add_rule_do_start_stop_run_after_startTestRun(self):
nontest = LoggingStreamResult()
router = StreamResultRouter()
router.startTestRun()
router.add_rule(nontest, 'test_id', test_id=None, do_start_stop_run=True)
router.stopTestRun()
self.assertEqual([
('startTestRun',),
('stopTestRun',),
], nontest._events)
class TestStreamToQueue(TestCase):
def make_result(self):