From 088f82f4ddd9e1e02d9c81ecbaf1f971442891a2 Mon Sep 17 00:00:00 2001 From: Robert Collins Date: Wed, 3 Apr 2013 13:11:33 +1300 Subject: [PATCH] Add a StreamResult Router. The router dispatches packets to another StreamResult based on their route_code, stripping the matching section. Unroutable events are forwarded to a fallback stream. --- NEWS | 4 + doc/for-framework-folk.rst | 27 ++++++ testtools/__init__.py | 2 + testtools/testresult/__init__.py | 2 + testtools/testresult/real.py | 125 ++++++++++++++++++++++++ testtools/tests/test_testresult.py | 146 +++++++++++++++++++++++++++++ 6 files changed, 306 insertions(+) diff --git a/NEWS b/NEWS index 781747b..e7ece2d 100644 --- a/NEWS +++ b/NEWS @@ -87,6 +87,10 @@ Improvements * ``PlaceHolder`` can now hold timestamps, and applies them before the test and then before the outcome. (Robert Collins) +* ``StreamResultRouter`` added. This is useful for demultiplexing - e.g. for + partitioning analysis of events or sending feedback encapsulated in + StreamResult events back to their source. (Robert Collins) + * The error message for setUp and tearDown upcall errors was broken on Python 3.4. (Monty Taylor, Robert Collins, #1140688) diff --git a/doc/for-framework-folk.rst b/doc/for-framework-folk.rst index 5f397a0..8846025 100644 --- a/doc/for-framework-folk.rst +++ b/doc/for-framework-folk.rst @@ -249,6 +249,33 @@ passing the events via this decorator to get timestamped data. As long as no buffering/queueing or blocking happen before the timestamper sees the event the timestamp will be as accurate as if the original event had it. +StreamResultRouter +------------------ + +This is a ``StreamResult`` which forwards events to an arbitrary set of target +``StreamResult`` objects. Events that have no forwarding rule are passed onto +an fallback ``StreamResult`` for processing. The mapping can be changed at +runtime, allowing great flexibility and responsiveness to changes. Because +The mapping can change dynamically and there could be the same recipient for +two different maps, ``startTestRun`` and ``stopTestRun`` handling is fine +grained and up to the user. + +If no fallback has been supplied, an unroutable event will raise an exception. + +For instance:: + + >>> router = StreamResultRouter() + >>> sink = doubles.StreamResult() + >>> router.add_rule(sink, 'route_code_prefix', route_prefix='0', + ... consume_route=True) + >>> router.status(test_id='foo', route_code='0/1', test_status='uxsuccess') + +Would remove the ``0/`` from the route_code and forward the event like so:: + + >>> sink.status('test_id=foo', route_code='1', test_status='uxsuccess') + +See ``pydoc testtools.StreamResultRouter`` for details. + TestResult.addSkip ------------------ diff --git a/testtools/__init__.py b/testtools/__init__.py index e58f60a..94e526f 100644 --- a/testtools/__init__.py +++ b/testtools/__init__.py @@ -31,6 +31,7 @@ __all__ = [ 'skipUnless', 'StreamFailFast', 'StreamResult', + 'StreamResultRouter', 'StreamSummary', 'StreamTagger', 'StreamToDict', @@ -84,6 +85,7 @@ else: MultiTestResult, StreamFailFast, StreamResult, + StreamResultRouter, StreamSummary, StreamTagger, StreamToDict, diff --git a/testtools/testresult/__init__.py b/testtools/testresult/__init__.py index bdef369..5bf8f9c 100644 --- a/testtools/testresult/__init__.py +++ b/testtools/testresult/__init__.py @@ -9,6 +9,7 @@ __all__ = [ 'MultiTestResult', 'StreamFailFast', 'StreamResult', + 'StreamResultRouter', 'StreamSummary', 'StreamTagger', 'StreamToDict', @@ -31,6 +32,7 @@ from testtools.testresult.real import ( MultiTestResult, StreamFailFast, StreamResult, + StreamResultRouter, StreamSummary, StreamTagger, StreamToDict, diff --git a/testtools/testresult/real.py b/testtools/testresult/real.py index bad2a3a..b7fdf52 100644 --- a/testtools/testresult/real.py +++ b/testtools/testresult/real.py @@ -427,6 +427,131 @@ class StreamFailFast(StreamResult): self.on_error() +class StreamResultRouter(StreamResult): + """A StreamResult that routes events. + + StreamResultRouter forwards received events to another StreamResult object, + selected by a dynamic forwarding policy. Events where no destination is + found are forwarded to the fallback StreamResult, or an error is raised. + + Typical use is to construct a router with a fallback and then either + create up front mapping rules, or create them as-needed from the fallback + handler:: + + >>> router = StreamResultRouter() + >>> sink = doubles.StreamResult() + >>> router.add_rule(sink, 'route_code_prefix', route_prefix='0', + ... consume_route=True) + >>> router.status(test_id='foo', route_code='0/1', test_status='uxsuccess') + + StreamResultRouter has no buffering. + + When adding routes (and for the fallback) whether to call startTestRun and + stopTestRun or to not call them is controllable by passing + 'do_start_stop_run'. The default is to call them for the fallback only. + If a route is added after startTestRun has been called, and + do_start_stop_run is True then startTestRun is called immediately on the + new route sink. + + There is no a-priori defined lookup order for routes: if they are ambiguous + the behaviour is undefined. Only a single route is chosen for any event. + """ + + _policies = {} + + def __init__(self, fallback=None, do_start_stop_run=True): + """Construct a StreamResultRouter with optional fallback. + + :param fallback: A StreamResult to forward events to when no route + exists for them. + :param do_start_stop_run: If False do not pass startTestRun and + stopTestRun onto the fallback. + """ + self.fallback = fallback + self._route_code_prefixes = {} + self._test_ids = {} + # Records sinks that should have do_start_stop_run called on them. + self._sinks = [] + if do_start_stop_run and fallback: + self._sinks.append(fallback) + self._in_run = False + + def startTestRun(self): + super(StreamResultRouter, self).startTestRun() + for sink in self._sinks: + sink.startTestRun() + self._in_run = True + + def stopTestRun(self): + super(StreamResultRouter, self).stopTestRun() + for sink in self._sinks: + sink.stopTestRun() + self._in_run = False + + def status(self, **kwargs): + route_code = kwargs.get('route_code', None) + test_id = kwargs.get('test_id', None) + if route_code is not None: + prefix = route_code.split('/')[0] + else: + prefix = route_code + if prefix in self._route_code_prefixes: + target, consume_route = self._route_code_prefixes[prefix] + if route_code is not None and consume_route: + route_code = route_code[len(prefix) + 1:] + if not route_code: + route_code = None + kwargs['route_code'] = route_code + elif test_id in self._test_ids: + target = self._test_ids[test_id] + else: + target = self.fallback + target.status(**kwargs) + + def add_rule(self, sink, policy, do_start_stop_run=False, **policy_args): + """Add a rule to route events to sink when they match a given policy. + + :param sink: A StreamResult to receive events. + :param policy: A routing policy. Valid policies are + 'route_code_prefix' and 'test_id'. + :param do_start_stop_run: If True then startTestRun and stopTestRun + events will be passed onto this sink. + + route_code_prefix routes events based on a prefix of the route code in + the event. It takes the following arguments:: + :param route_prefix: A prefix to match on - e.g. '0'. + :param consume_route: If True, remove the prefix from the route_code + when forwarding events. + + test_id routes events based on the test id:: + :param test_id: The test id to route on. Use None to select non-test + events. + + map may raise errors:: + :raises: ValueError if the policy is unknown + :raises: TypeError if the policy is given arguments it cannot handle. + """ + policy_method = StreamResultRouter._policies.get(policy, None) + if not policy_method: + raise ValueError("bad policy %r" % (policy,)) + policy_method(self, sink, **policy_args) + if do_start_stop_run: + self._sinks.append(sink) + if self._in_run: + sink.startTestRun() + + def _map_route_code_prefix(self, sink, route_prefix, consume_route=False): + if '/' in route_prefix: + raise TypeError( + "%r is more than one route step long" % (route_prefix,)) + self._route_code_prefixes[route_prefix] = (sink, consume_route) + _policies['route_code_prefix'] = _map_route_code_prefix + + def _map_test_id(self, sink, test_id): + self._test_ids[test_id] = sink + _policies['test_id'] = _map_test_id + + class StreamTagger(CopyStreamResult): """Adds or discards tags from StreamResult events.""" diff --git a/testtools/tests/test_testresult.py b/testtools/tests/test_testresult.py index 9726c4b..04dc7ce 100644 --- a/testtools/tests/test_testresult.py +++ b/testtools/tests/test_testresult.py @@ -28,6 +28,7 @@ from testtools import ( PlaceHolder, StreamFailFast, StreamResult, + StreamResultRouter, StreamSummary, StreamTagger, StreamToDict, @@ -599,6 +600,12 @@ class TestStreamFailFastContract(TestCase, TestStreamResultContract): return StreamFailFast(lambda:None) +class TestStreamResultRouterContract(TestCase, TestStreamResultContract): + + def _make_result(self): + return StreamResultRouter(StreamResult()) + + class TestDoubleStreamResultEvents(TestCase): def test_startTestRun(self): @@ -1693,6 +1700,145 @@ class TestMergeTags(TestCase): expected, _merge_tags(current_tags, changing_tags)) +class TestStreamResultRouter(TestCase): + + def test_start_stop_test_run_no_fallback(self): + result = StreamResultRouter() + result.startTestRun() + result.stopTestRun() + + def test_no_fallback_errors(self): + self.assertRaises(Exception, StreamResultRouter().status, test_id='f') + + def test_fallback_calls(self): + fallback = LoggingStreamResult() + result = StreamResultRouter(fallback) + result.startTestRun() + result.status(test_id='foo') + result.stopTestRun() + self.assertEqual([ + ('startTestRun',), + ('status', 'foo', None, None, True, None, None, False, None, None, + None), + ('stopTestRun',), + ], + fallback._events) + + def test_fallback_no_do_start_stop_run(self): + fallback = LoggingStreamResult() + result = StreamResultRouter(fallback, do_start_stop_run=False) + result.startTestRun() + result.status(test_id='foo') + result.stopTestRun() + self.assertEqual([ + ('status', 'foo', None, None, True, None, None, False, None, None, + None) + ], + fallback._events) + + def test_add_rule_bad_policy(self): + router = StreamResultRouter() + target = LoggingStreamResult() + self.assertRaises(ValueError, router.add_rule, target, 'route_code_prefixa', + route_prefix='0') + + def test_add_rule_extra_policy_arg(self): + router = StreamResultRouter() + target = LoggingStreamResult() + self.assertRaises(TypeError, router.add_rule, target, 'route_code_prefix', + route_prefix='0', foo=1) + + def test_add_rule_missing_prefix(self): + router = StreamResultRouter() + target = LoggingStreamResult() + self.assertRaises(TypeError, router.add_rule, target, 'route_code_prefix') + + def test_add_rule_slash_in_prefix(self): + router = StreamResultRouter() + target = LoggingStreamResult() + self.assertRaises(TypeError, router.add_rule, target, 'route_code_prefix', + route_prefix='0/') + + def test_add_rule_route_code_consume_False(self): + fallback = LoggingStreamResult() + target = LoggingStreamResult() + router = StreamResultRouter(fallback) + router.add_rule(target, 'route_code_prefix', route_prefix='0') + router.status(test_id='foo', route_code='0') + router.status(test_id='foo', route_code='0/1') + router.status(test_id='foo') + self.assertEqual([ + ('status', 'foo', None, None, True, None, None, False, None, '0', + None), + ('status', 'foo', None, None, True, None, None, False, None, '0/1', + None), + ], + target._events) + self.assertEqual([ + ('status', 'foo', None, None, True, None, None, False, None, None, + None), + ], + fallback._events) + + def test_add_rule_route_code_consume_True(self): + fallback = LoggingStreamResult() + target = LoggingStreamResult() + router = StreamResultRouter(fallback) + router.add_rule( + target, 'route_code_prefix', route_prefix='0', consume_route=True) + router.status(test_id='foo', route_code='0') # -> None + router.status(test_id='foo', route_code='0/1') # -> 1 + router.status(test_id='foo', route_code='1') # -> fallback as-is. + self.assertEqual([ + ('status', 'foo', None, None, True, None, None, False, None, None, + None), + ('status', 'foo', None, None, True, None, None, False, None, '1', + None), + ], + target._events) + self.assertEqual([ + ('status', 'foo', None, None, True, None, None, False, None, '1', + None), + ], + fallback._events) + + def test_add_rule_test_id(self): + nontest = LoggingStreamResult() + test = LoggingStreamResult() + router = StreamResultRouter(test) + router.add_rule(nontest, 'test_id', test_id=None) + router.status(test_id='foo', file_name="bar", file_bytes=b'') + router.status(file_name="bar", file_bytes=b'') + self.assertEqual([ + ('status', 'foo', None, None, True, 'bar', b'', False, None, None, + None),], test._events) + self.assertEqual([ + ('status', None, None, None, True, 'bar', b'', False, None, None, + None),], nontest._events) + + def test_add_rule_do_start_stop_run(self): + nontest = LoggingStreamResult() + router = StreamResultRouter() + router.add_rule(nontest, 'test_id', test_id=None, do_start_stop_run=True) + router.startTestRun() + router.stopTestRun() + self.assertEqual([ + ('startTestRun',), + ('stopTestRun',), + ], nontest._events) + + def test_add_rule_do_start_stop_run_after_startTestRun(self): + nontest = LoggingStreamResult() + router = StreamResultRouter() + router.startTestRun() + router.add_rule(nontest, 'test_id', test_id=None, do_start_stop_run=True) + router.stopTestRun() + self.assertEqual([ + ('startTestRun',), + ('stopTestRun',), + ], nontest._events) + + class TestStreamToQueue(TestCase): def make_result(self):