From ac49955386b695868945a28b6597fe72b3b657e6 Mon Sep 17 00:00:00 2001 From: Sandy Walsh Date: Thu, 25 Sep 2014 14:44:38 +0000 Subject: [PATCH] Adding debugger from oahu Allows for per-trigger-definition debugging. Simply add debug_level=1 to your trigger definition. debug_level=2 gives detailed explainations of why your stream didn't fire or trigger. This way, you can leave the overall debug level at INFO and still get details on the stream you're working on. Yagi worker now uses idle() callback for debug updates. and minor cleanups for pep8/HACKING Change-Id: Id0af7a0adbcc47335ad250130958932c708b5c18 --- .gitignore | 3 + AUTHORS | 1 - ChangeLog | 62 ---------- etc/triggers.yaml | 1 + tests/test_debugger.py | 219 +++++++++++++++++++++++++++++++++ tests/test_definition.py | 121 +++++++++--------- tests/test_pipeline_manager.py | 112 +++++++++++------ tests/test_trigger_manager.py | 41 +++--- winchester/debugging.py | 161 ++++++++++++++++++++++++ winchester/definition.py | 60 +++++---- winchester/pipeline_manager.py | 87 +++++++++---- winchester/trigger_manager.py | 64 ++++++---- winchester/yagi_handler.py | 2 + 13 files changed, 693 insertions(+), 241 deletions(-) delete mode 100644 AUTHORS delete mode 100644 ChangeLog create mode 100644 tests/test_debugger.py create mode 100644 winchester/debugging.py diff --git a/.gitignore b/.gitignore index a735f8b..d832fc3 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,8 @@ *.py[cod] +AUTHORS +Changelog + # C extensions *.so diff --git a/AUTHORS b/AUTHORS deleted file mode 100644 index 9ee0aa5..0000000 --- a/AUTHORS +++ /dev/null @@ -1 +0,0 @@ -Monsyne Dragon diff --git a/ChangeLog b/ChangeLog deleted file mode 100644 index 0c18624..0000000 --- a/ChangeLog +++ /dev/null @@ -1,62 +0,0 @@ -commit 2d47fa6f6e0de0a54975ff92fc87785a052d4371 -Author: Monsyne Dragon -Date: Mon Sep 8 23:02:52 2014 +0000 - - Add reset stream method. - -commit ca0d09f7bc017ef9e372ae29a0c19bf20b68aca5 -Author: Monsyne Dragon -Date: Mon Sep 8 19:57:24 2014 +0000 - - Save newly generated events from pipeline - - Save newly created events from pipeline run if pipeline commits. - Refactor trigger manager api wart, move save_event call into add_event - to make add_event and add_notification symmetric. - -commit 0c619c133d3c248d62a2c5f6441d4fae0bf7042a -Author: Monsyne Dragon -Date: Sun Sep 7 04:07:20 2014 +0000 - - Add database admin command. - - Add admin command for db schema upgrade/downgrade/etc. - Move alembic migrations so above can find them when installed - as a package. - Fix up packaging to use setup.cfg and pbr. - Flesh out README. - -commit a6f84d16036e143b1b605c50b90055a623e3235b -Author: Monsyne Dragon -Date: Thu Sep 4 20:43:41 2014 +0000 - - Fixed a few bugs, added more logging. - - Fixed timestamp bug, and streamstate issue missed in unittests. - Added more logging for pipeline manager. - -commit c2aa498beb14cf0a61066fe1e7df833a16db5733 -Author: Monsyne Dragon -Date: Thu Sep 4 18:05:19 2014 +0000 - - Move yagi handler into winchester codebase. - -commit a8f373e4bf14762ad09a20f8ad9ea543e11c5be7 -Author: Monsyne Dragon -Date: Thu Sep 4 01:49:19 2014 +0000 - - Added full stream processing, pipeline workers, etc. - - Full trigger logic now works. - Added pipeline workers, and test handler. - Added example configs - Lots of unittests. - -commit aa8fb55e879e782268c663f81e73384673d56847 -Author: Monsyne Dragon -Date: Thu Jun 26 01:55:26 2014 +0000 - - Initial commit of DB schema. - - Initial commit of the event schema for the database. - This includes models and alembic migration. \ No newline at end of file diff --git a/etc/triggers.yaml b/etc/triggers.yaml index 6c82d5a..f415bfb 100644 --- a/etc/triggers.yaml +++ b/etc/triggers.yaml @@ -1,5 +1,6 @@ --- - name: test_trigger + debug_level: 2 distinguished_by: - instance_id - timestamp: "day" diff --git a/tests/test_debugger.py b/tests/test_debugger.py new file mode 100644 index 0000000..baaf8a9 --- /dev/null +++ b/tests/test_debugger.py @@ -0,0 +1,219 @@ +import unittest2 as unittest + +import mock + +from winchester import debugging + + +class TestDebugManager(unittest.TestCase): + + def setUp(self): + super(TestDebugManager, self).setUp() + self.debug_manager = debugging.DebugManager() + + def test_get_debugger_none(self): + debugger = self.debug_manager.get_debugger(None) + self.assertEquals("n/a", debugger._name) + self.assertEquals(2, debugger._debug_level) + + def test_get_debugger_off(self): + tdef = mock.MagicMock(name="tdef") + tdef.name = "my_trigger" + tdef.debug_level = 0 + debugger = self.debug_manager.get_debugger(tdef) + self.assertTrue(isinstance(debugger, debugging.NoOpDebugger)) + self.assertEquals(debugger, + self.debug_manager._debuggers['my_trigger']) + + debugger2 = self.debug_manager.get_debugger(tdef) + self.assertEquals(debugger, debugger2) + + def test_get_debugger_on(self): + tdef = mock.MagicMock(name="tdef") + tdef.name = "my_trigger" + tdef.debug_level = 1 + debugger = self.debug_manager.get_debugger(tdef) + self.assertTrue(isinstance(debugger, debugging.DetailedDebugger)) + self.assertEquals(debugger, + self.debug_manager._debuggers['my_trigger']) + + debugger2 = self.debug_manager.get_debugger(tdef) + self.assertEquals(debugger, debugger2) + + def test_dump_group_level1(self): + debugger = mock.MagicMock(name="debugger") + debugger.get_debug_level.return_value = 1 + group = mock.MagicMock(name="group") + group._name = "my_group" + group._match = 1 + group._mismatch = 2 + debugger.get_group.return_value = group + with mock.patch.object(debugging, "logger") as log: + self.debug_manager.dump_group(debugger, "my_group") + + log.info.assert_called_once_with( + "my_group Criteria: 3 checks, 1 passed") + + def test_dump_group_level2(self): + debugger = mock.MagicMock(name="debugger") + debugger.get_debug_level.return_value = 2 + group = mock.MagicMock(name="group") + group._name = "my_group" + group._match = 1 + group._mismatch = 2 + group._reasons = {"foo": 12} + debugger.get_group.return_value = group + with mock.patch.object(debugging, "logger") as log: + self.debug_manager.dump_group(debugger, "my_group") + + self.assertEquals(log.info.call_args_list, + [mock.call("my_group Criteria: 3 checks, 1 passed"), + mock.call(" - foo = 12")]) + + def test_dump_counters(self): + debugger = mock.MagicMock(name="debugger") + debugger._counters = {'foo': 12} + with mock.patch.object(debugging, "logger") as log: + self.debug_manager.dump_counters(debugger) + log.info.assert_called_once_with('Counter "foo" = 12') + + def test_dump_debuggers_off(self): + debugger = mock.MagicMock(name="debugger") + debugger.get_debug_level.return_value = 0 + self.debug_manager._debuggers = {"foo": debugger} + with mock.patch.object(debugging, "logger") as log: + self.debug_manager.dump_debuggers() + self.assertEqual(0, log.info.call_count) + + def test_dump_debuggers_on(self): + debugger = mock.MagicMock(name="debugger") + debugger.get_debug_level.return_value = 1 + debugger._name = "my_debugger" + group = mock.MagicMock(name="group") + debugger._groups = {"my_group": group} + self.debug_manager._debuggers = {"foo": debugger} + with mock.patch.object(self.debug_manager, "dump_counters") as ctr: + with mock.patch.object(self.debug_manager, "dump_group") as grp: + with mock.patch.object(debugging, "logger") as log: + self.debug_manager.dump_debuggers() + self.assertEquals(log.info.call_args_list, + [mock.call("---- Trigger Definition: my_debugger ----"), + mock.call("----------------------------")]) + grp.assert_called_once_with(debugger, "my_group") + ctr.assert_called_once_with(debugger) + debugger.reset.assert_called_once_with() + + +class TestDetailedDebugger(unittest.TestCase): + + def setUp(self): + super(TestDetailedDebugger, self).setUp() + self.debugger = debugging.DetailedDebugger("my_debugger", 2) + + def test_constructor(self): + with mock.patch("winchester.debugging.DetailedDebugger.reset") \ + as reset: + d = debugging.DetailedDebugger("my_debugger", 2) + reset.assert_called_once_with() + + self.assertEquals(self.debugger._name, "my_debugger") + self.assertEquals(self.debugger._debug_level, 2) + + def test_reset(self): + self.assertEquals(self.debugger._groups, {}) + self.assertEquals(self.debugger._counters, {}) + + def test_get_group(self): + self.assertEquals(self.debugger._groups, {}) + g = self.debugger.get_group("foo") + self.assertEquals(g._name, "foo") + self.assertTrue(self.debugger._groups['foo']) + + def test_bump_counter(self): + self.assertEquals(self.debugger._counters, {}) + self.debugger.bump_counter("foo") + self.assertEquals(self.debugger._counters['foo'], 1) + + self.debugger.bump_counter("foo", 2) + self.assertEquals(self.debugger._counters['foo'], 3) + + def test_get_debug_level(self): + self.assertEquals(self.debugger.get_debug_level(), 2) + + +class TestNoOpDebugger(unittest.TestCase): + def setUp(self): + super(TestNoOpDebugger, self).setUp() + self.debugger = debugging.NoOpDebugger("my_debugger", 2) + + def test_reset(self): + self.debugger.reset() + + def test_get_group(self): + g = self.debugger.get_group("foo") + self.assertEquals(g, self.debugger.noop_group) + + def test_bump_counter(self): + self.debugger.bump_counter("foo") + self.debugger.bump_counter("foo", 2) + + def test_get_debug_level(self): + self.assertEquals(self.debugger.get_debug_level(), 0) + + +class TestGroup(unittest.TestCase): + def setUp(self): + super(TestGroup, self).setUp() + self.group = debugging.Group("my_group") + + def test_constructor(self): + self.assertEquals("my_group", self.group._name) + self.assertEquals(0, self.group._match) + self.assertEquals(0, self.group._mismatch) + self.assertEquals({}, self.group._reasons) + + def test_match(self): + self.assertTrue(self.group.match()) + self.assertEquals(1, self.group._match) + + def test_mismatch(self): + self.assertFalse(self.group.mismatch("reason")) + self.assertEquals(1, self.group._mismatch) + self.assertEquals(1, self.group._reasons['reason']) + + def test_check(self): + self.assertTrue(self.group.check(True, "reason")) + self.assertEquals(1, self.group._match) + self.assertEquals(0, self.group._mismatch) + self.assertEquals({}, self.group._reasons) + + self.assertTrue(self.group.check(True, "reason")) + self.assertEquals(2, self.group._match) + self.assertEquals(0, self.group._mismatch) + self.assertEquals({}, self.group._reasons) + + self.assertFalse(self.group.check(False, "reason")) + self.assertEquals(2, self.group._match) + self.assertEquals(1, self.group._mismatch) + self.assertEquals(1, self.group._reasons['reason']) + + self.assertFalse(self.group.check(False, "reason")) + self.assertEquals(2, self.group._match) + self.assertEquals(2, self.group._mismatch) + self.assertEquals(2, self.group._reasons['reason']) + + +class TestNoOpGroup(unittest.TestCase): + def setUp(self): + super(TestNoOpGroup, self).setUp() + self.group = debugging.NoOpGroup() + + def test_match(self): + self.assertTrue(self.group.match()) + + def test_mismatch(self): + self.assertFalse(self.group.mismatch("reason")) + + def test_check(self): + self.assertTrue(self.group.check(True, "reason")) + self.assertFalse(self.group.check(False, "reason")) diff --git a/tests/test_definition.py b/tests/test_definition.py index d6b5e55..846dfe9 100644 --- a/tests/test_definition.py +++ b/tests/test_definition.py @@ -6,6 +6,7 @@ import mock import datetime import timex +from winchester import debugging from winchester import definition @@ -13,27 +14,29 @@ class TestCriterion(unittest.TestCase): def setUp(self): super(TestCriterion, self).setUp() + self.fake_group = debugging.NoOpGroup() + self.fake_debugger = debugging.NoOpDebugger() def test_basic_criterion(self): c = definition.Criterion(3, 'foo') - self.assertTrue(c.match({'foo': 3})) - self.assertFalse(c.match({'foo': 5})) - self.assertFalse(c.match({'bar': 5})) - self.assertFalse(c.match({'foo': "booga"})) + self.assertTrue(c.match({'foo': 3}, self.fake_group)) + self.assertFalse(c.match({'foo': 5}, self.fake_group)) + self.assertFalse(c.match({'bar': 5}, self.fake_group)) + self.assertFalse(c.match({'foo': "booga"}, self.fake_group)) def test_numeric_criterion(self): c = definition.NumericCriterion("3", 'foo') - self.assertTrue(c.match({'foo': 3})) - self.assertFalse(c.match({'foo': 5})) - self.assertFalse(c.match({'bar': 5})) - self.assertFalse(c.match({'foo': "booga"})) + self.assertTrue(c.match({'foo': 3}, self.fake_group)) + self.assertFalse(c.match({'foo': 5}, self.fake_group)) + self.assertFalse(c.match({'bar': 5}, self.fake_group)) + self.assertFalse(c.match({'foo': "booga"}, self.fake_group)) c = definition.NumericCriterion("> 3", 'foo') - self.assertFalse(c.match({'foo': 3})) - self.assertTrue(c.match({'foo': 5})) + self.assertFalse(c.match({'foo': 3}, self.fake_group)) + self.assertTrue(c.match({'foo': 5}, self.fake_group)) c = definition.NumericCriterion("< 3", 'foo') - self.assertFalse(c.match({'foo': 3})) - self.assertFalse(c.match({'foo': 5})) - self.assertTrue(c.match({'foo': 1})) + self.assertFalse(c.match({'foo': 3}, self.fake_group)) + self.assertFalse(c.match({'foo': 5}, self.fake_group)) + self.assertTrue(c.match({'foo': 1}, self.fake_group)) with self.assertRaises(definition.DefinitionError): c = definition.NumericCriterion("zazz", "foo") with self.assertRaises(definition.DefinitionError): @@ -41,17 +44,17 @@ class TestCriterion(unittest.TestCase): def test_float_criterion(self): c = definition.FloatCriterion("3.14", 'foo') - self.assertTrue(c.match({'foo': 3.14})) - self.assertFalse(c.match({'foo': 5.2})) - self.assertFalse(c.match({'bar': 5.2})) - self.assertFalse(c.match({'foo': "booga"})) + self.assertTrue(c.match({'foo': 3.14}, self.fake_group)) + self.assertFalse(c.match({'foo': 5.2}, self.fake_group)) + self.assertFalse(c.match({'bar': 5.2}, self.fake_group)) + self.assertFalse(c.match({'foo': "booga"}, self.fake_group)) c = definition.FloatCriterion("> 3.14", 'foo') - self.assertFalse(c.match({'foo': 3.14})) - self.assertTrue(c.match({'foo': 5.2})) + self.assertFalse(c.match({'foo': 3.14}, self.fake_group)) + self.assertTrue(c.match({'foo': 5.2}, self.fake_group)) c = definition.FloatCriterion("< 3.14", 'foo') - self.assertFalse(c.match({'foo': 3.14})) - self.assertFalse(c.match({'foo': 3.5})) - self.assertTrue(c.match({'foo': 3.02})) + self.assertFalse(c.match({'foo': 3.14}, self.fake_group)) + self.assertFalse(c.match({'foo': 3.5}, self.fake_group)) + self.assertTrue(c.match({'foo': 3.02}, self.fake_group)) with self.assertRaises(definition.DefinitionError): c = definition.FloatCriterion("zazz", "foo") with self.assertRaises(definition.DefinitionError): @@ -61,25 +64,26 @@ class TestCriterion(unittest.TestCase): c = definition.TimeCriterion("day", "foo") e = dict(timestamp=datetime.datetime(2014,8,1,7,52,31,2), foo=datetime.datetime(2014,8,1,1,2,0,0)) - self.assertTrue(c.match(e)) + self.assertTrue(c.match(e, self.fake_group)) e = dict(timestamp=datetime.datetime(2014,8,1,7,52,31,2), foo=datetime.datetime(2014,8,2,1,2,0,0)) - self.assertFalse(c.match(e)) + self.assertFalse(c.match(e, self.fake_group)) e = dict(timestamp=datetime.datetime(2014,8,1,7,52,31,2), bar=datetime.datetime(2014,8,1,1,2,0,0)) - self.assertFalse(c.match(e)) + self.assertFalse(c.match(e, self.fake_group)) e = dict(timestamp=datetime.datetime(2014,8,1,7,52,31,2), message_id='1234-5678', quux=4, foo=datetime.datetime(2014,8,1,1,2,0,0)) - self.assertTrue(c.match(e)) - + self.assertTrue(c.match(e, self.fake_group)) class TestCriteria(unittest.TestCase): def setUp(self): super(TestCriteria, self).setUp() + self.fake_group = debugging.NoOpGroup() + self.fake_debugger = debugging.NoOpDebugger() def test_defaults(self): criteria = definition.Criteria({}) @@ -128,9 +132,9 @@ class TestCriteria(unittest.TestCase): event1 = dict(event_type = "test.foo.zazz") event2 = dict(event_type = "test.wakka.zazz") event3 = dict(event_type = "test.boingy") - self.assertTrue(criteria.match(event1)) - self.assertFalse(criteria.match(event2)) - self.assertFalse(criteria.match(event3)) + self.assertTrue(criteria.match(event1, self.fake_group)) + self.assertFalse(criteria.match(event2, self.fake_group)) + self.assertFalse(criteria.match(event3, self.fake_group)) def test_match_for_timestamp(self): config = dict(timestamp='day($launched_at)') @@ -143,9 +147,9 @@ class TestCriteria(unittest.TestCase): launched_at=datetime.datetime(2014,8,1,1,2,3,4)) event3 = dict(event_type='test.thing', timestamp=datetime.datetime(2014,8,2,17,16,15,14)) - self.assertTrue(criteria.match(event1)) - self.assertFalse(criteria.match(event2)) - self.assertFalse(criteria.match(event3)) + self.assertTrue(criteria.match(event1, self.fake_group)) + self.assertFalse(criteria.match(event2, self.fake_group)) + self.assertFalse(criteria.match(event3, self.fake_group)) def test_match_for_traits(self): config = dict(traits=dict(some_trait="test", @@ -195,42 +199,49 @@ class TestCriteria(unittest.TestCase): other_trait='text here', memory_mb=4096, test_weight=6.283) - self.assertTrue(criteria.match(event1)) - self.assertFalse(criteria.match(event2)) - self.assertFalse(criteria.match(event3)) - self.assertFalse(criteria.match(event4)) - self.assertFalse(criteria.match(event5)) - self.assertFalse(criteria.match(event6)) + self.assertTrue(criteria.match(event1, self.fake_group)) + self.assertFalse(criteria.match(event2, self.fake_group)) + self.assertFalse(criteria.match(event3, self.fake_group)) + self.assertFalse(criteria.match(event4, self.fake_group)) + self.assertFalse(criteria.match(event5, self.fake_group)) + self.assertFalse(criteria.match(event6, self.fake_group)) + class TestTriggerDefinition(unittest.TestCase): def setUp(self): super(TestTriggerDefinition, self).setUp() + self.debug_manager = debugging.DebugManager() def test_config_error_check_and_defaults(self): with self.assertRaises(definition.DefinitionError): - definition.TriggerDefinition(dict()) + definition.TriggerDefinition(dict(), self.debug_manager) with self.assertRaises(definition.DefinitionError): - definition.TriggerDefinition(dict(name='test_trigger')) + definition.TriggerDefinition(dict(name='test_trigger'), + self.debug_manager) with self.assertRaises(definition.DefinitionError): definition.TriggerDefinition(dict(name='test_trigger', - expiration='$last + 1d')) + expiration='$last + 1d'), + self.debug_manager) with self.assertRaises(definition.DefinitionError): definition.TriggerDefinition(dict(name='test_trigger', expiration='$last + 1d', - fire_pipeline='test_pipeline')) + fire_pipeline='test_pipeline'), + self.debug_manager) with self.assertRaises(definition.DefinitionError): definition.TriggerDefinition( dict(name='test_trigger', expiration='$last + 1d', fire_pipeline='test_pipeline', - fire_criteria=[dict(event_type='test.thing')])) + fire_criteria=[dict(event_type='test.thing')]), + self.debug_manager) tdef = definition.TriggerDefinition( dict(name='test_trigger', expiration='$last + 1d', fire_pipeline='test_pipeline', fire_criteria=[dict(event_type='test.thing')], - match_criteria=[dict(event_type='test.*')])) + match_criteria=[dict(event_type='test.*')]), + self.debug_manager) self.assertEqual(len(tdef.distinguished_by), 0) self.assertEqual(len(tdef.fire_criteria), 1) self.assertIsInstance(tdef.fire_criteria[0], definition.Criteria) @@ -245,7 +256,7 @@ class TestTriggerDefinition(unittest.TestCase): fire_pipeline='test_pipeline', fire_criteria=[dict(event_type='test.thing')], match_criteria=[dict(event_type='test.*')]) - tdef = definition.TriggerDefinition(config) + tdef = definition.TriggerDefinition(config, self.debug_manager) event1 = dict(event_type='test.thing') event2 = dict(event_type='other.thing') self.assertTrue(tdef.match(event1)) @@ -256,7 +267,7 @@ class TestTriggerDefinition(unittest.TestCase): fire_criteria=[dict(event_type='test.thing')], match_criteria=[dict(event_type='test.*'), dict(event_type='other.*')]) - tdef = definition.TriggerDefinition(config) + tdef = definition.TriggerDefinition(config, self.debug_manager) self.assertTrue(tdef.match(event1)) self.assertTrue(tdef.match(event2)) @@ -267,7 +278,7 @@ class TestTriggerDefinition(unittest.TestCase): fire_pipeline='test_pipeline', fire_criteria=[dict(event_type='test.thing')], match_criteria=[dict(event_type='test.*')]) - tdef = definition.TriggerDefinition(config) + tdef = definition.TriggerDefinition(config, self.debug_manager) event1 = dict(event_type='test.thing', instance_id='foo') event2 = dict(event_type='test.thing') self.assertTrue(tdef.match(event1)) @@ -281,7 +292,7 @@ class TestTriggerDefinition(unittest.TestCase): fire_criteria=[dict(event_type='test.thing')], match_criteria=[dict(event_type='test.*')]) event1 = dict(event_type='test.thing', instance_id='foo') - tdef = definition.TriggerDefinition(config) + tdef = definition.TriggerDefinition(config, self.debug_manager) mcriteria = tdef.match(event1) dt = tdef.get_distinguishing_traits(event1, mcriteria) self.assertEqual(len(dt), 1) @@ -297,7 +308,7 @@ class TestTriggerDefinition(unittest.TestCase): match_criteria=[dict(event_type='test.*')]) event1 = dict(event_type='test.thing', instance_id='foo', timestamp=datetime.datetime(2014,8,1,20,4,23,444)) - tdef = definition.TriggerDefinition(config) + tdef = definition.TriggerDefinition(config, self.debug_manager) mcriteria = tdef.match(event1) dt = tdef.get_distinguishing_traits(event1, mcriteria) self.assertEqual(len(dt), 2) @@ -320,7 +331,7 @@ class TestTriggerDefinition(unittest.TestCase): map_distinguished_by=dict(instance_id='other_id'))]) event1 = dict(event_type='test.thing', instance_id='foo', other_id='bar') - tdef = definition.TriggerDefinition(config) + tdef = definition.TriggerDefinition(config, self.debug_manager) mcriteria = tdef.match(event1) dt = tdef.get_distinguishing_traits(event1, mcriteria) self.assertEqual(len(dt), 1) @@ -334,7 +345,7 @@ class TestTriggerDefinition(unittest.TestCase): fire_pipeline='test_pipeline', fire_criteria=[dict(event_type='test.thing')], match_criteria=[dict(event_type='test.*')]) - tdef = definition.TriggerDefinition(config) + tdef = definition.TriggerDefinition(config, self.debug_manager) test_time = datetime.datetime(2014,8,1,20,4,23,444) test_time_plus_1hr = datetime.datetime(2014,8,1,21,4,23,444) ft = tdef.get_fire_timestamp(test_time) @@ -346,7 +357,7 @@ class TestTriggerDefinition(unittest.TestCase): fire_pipeline='test_pipeline', fire_criteria=[dict(event_type='test.thing')], match_criteria=[dict(event_type='test.*')]) - tdef = definition.TriggerDefinition(config) + tdef = definition.TriggerDefinition(config, self.debug_manager) ft = tdef.get_fire_timestamp(test_time) self.assertEqual(ft, test_time_plus_1hr) @@ -357,7 +368,7 @@ class TestTriggerDefinition(unittest.TestCase): fire_pipeline='test_pipeline', fire_criteria=[dict(event_type='test.thing')], match_criteria=[dict(event_type='test.*')]) - tdef = definition.TriggerDefinition(config) + tdef = definition.TriggerDefinition(config, self.debug_manager) events1 = [ dict(event_type='test.foobar'), dict(event_type='test.thing'), dict(event_type='test.thing')] @@ -374,7 +385,7 @@ class TestTriggerDefinition(unittest.TestCase): fire_pipeline='test_pipeline', fire_criteria=[dict(event_type='test.thing', number=2)], match_criteria=[dict(event_type='test.*')]) - tdef = definition.TriggerDefinition(config) + tdef = definition.TriggerDefinition(config, self.debug_manager) self.assertTrue(tdef.should_fire(events1)) self.assertFalse(tdef.should_fire(events2)) self.assertFalse(tdef.should_fire(events3)) diff --git a/tests/test_pipeline_manager.py b/tests/test_pipeline_manager.py index 77889bc..58071bf 100644 --- a/tests/test_pipeline_manager.py +++ b/tests/test_pipeline_manager.py @@ -5,12 +5,16 @@ import mock import datetime import timex -from winchester import pipeline_manager +from winchester import debugging from winchester import db as winch_db +from winchester import pipeline_manager from winchester.models import StreamState class TestPipeline(unittest.TestCase): + def setUp(self): + super(TestPipeline, self).setUp() + self.debugger = debugging.NoOpDebugger() def test_check_handler_config(self): @@ -78,13 +82,13 @@ class TestPipeline(unittest.TestCase): p.commit = mock.MagicMock(name='commit') p.rollback = mock.MagicMock(name='rollback') - ret = p.handle_events(test_events) + ret = p.handle_events(test_events, self.debugger) handler_class1.return_value.handle_events.assert_called_once_with(test_events, p.env) events1 = handler_class1.return_value.handle_events.return_value handler_class2.return_value.handle_events.assert_called_once_with(events1, p.env) events2 = handler_class2.return_value.handle_events.return_value handler_class3.return_value.handle_events.assert_called_once_with(events2, p.env) - p.commit.assert_called_once_with() + p.commit.assert_called_once_with(self.debugger) self.assertFalse(p.rollback.called) self.assertEqual(ret, new_events) @@ -112,8 +116,8 @@ class TestPipeline(unittest.TestCase): p.rollback = mock.MagicMock(name='rollback') with self.assertRaises(pipeline_manager.PipelineExecutionError): - ret = p.handle_events(test_events) - p.rollback.assert_called_once_with() + p.handle_events(test_events, self.debugger) + p.rollback.assert_called_once_with(self.debugger) self.assertFalse(p.commit.called) def test_commit(self): @@ -128,7 +132,7 @@ class TestPipeline(unittest.TestCase): 'other_thing': handler_class2, 'some_thing': handler_class3} p = pipeline_manager.Pipeline("test_pipeline", conf, handler_map) - p.commit() + p.commit(self.debugger) handler_class1.return_value.commit.assert_called_once_with() handler_class2.return_value.commit.assert_called_once_with() handler_class3.return_value.commit.assert_called_once_with() @@ -150,7 +154,7 @@ class TestPipeline(unittest.TestCase): 'other_thing': handler_class2, 'some_thing': handler_class3} p = pipeline_manager.Pipeline("test_pipeline", conf, handler_map) - p.commit() + p.commit(self.debugger) handler_class1.return_value.commit.assert_called_once_with() handler_class2.return_value.commit.assert_called_once_with() handler_class3.return_value.commit.assert_called_once_with() @@ -167,7 +171,7 @@ class TestPipeline(unittest.TestCase): 'other_thing': handler_class2, 'some_thing': handler_class3} p = pipeline_manager.Pipeline("test_pipeline", conf, handler_map) - p.rollback() + p.rollback(self.debugger) handler_class1.return_value.rollback.assert_called_once_with() handler_class2.return_value.rollback.assert_called_once_with() handler_class3.return_value.rollback.assert_called_once_with() @@ -189,7 +193,7 @@ class TestPipeline(unittest.TestCase): 'other_thing': handler_class2, 'some_thing': handler_class3} p = pipeline_manager.Pipeline("test_pipeline", conf, handler_map) - p.rollback() + p.rollback(self.debugger) handler_class1.return_value.rollback.assert_called_once_with() handler_class2.return_value.rollback.assert_called_once_with() handler_class3.return_value.rollback.assert_called_once_with() @@ -199,6 +203,7 @@ class TestPipelineManager(unittest.TestCase): def setUp(self): super(TestPipelineManager, self).setUp() + self.debugger = debugging.NoOpDebugger() @mock.patch.object(pipeline_manager.ConfigManager, 'wrap') def test_complete_stream_nopurge(self, mock_config_wrap): @@ -240,19 +245,23 @@ class TestPipelineManager(unittest.TestCase): pm = pipeline_manager.PipelineManager('test') pm.db = mock.MagicMock(spec=pm.db, name='db') trigger_def = mock.MagicMock(name='trigger_def') + trigger_def.debugger = self.debugger pipeline_name = "test" pipeline_config = mock.MagicMock(name='pipeline_config') stream = mock.MagicMock(name='stream') - pm.add_new_events = mock.MagicMock(name='add_nemw_events') + stream.name = "test" + pm.add_new_events = mock.MagicMock(name='add_new_events') pm.pipeline_handlers = mock.MagicMock(name='pipeline_handlers') - ret = pm._run_pipeline(stream, trigger_def, pipeline_name, pipeline_config) + ret = pm._run_pipeline(stream, trigger_def, pipeline_name, + pipeline_config) pm.db.get_stream_events.assert_called_once_with(stream) - mock_pipeline.assert_called_once_with(pipeline_name, pipeline_config, pm.pipeline_handlers) + mock_pipeline.assert_called_once_with(pipeline_name, pipeline_config, + pm.pipeline_handlers) pipeline = mock_pipeline.return_value pipeline.handle_events.assert_called_once_with( - pm.db.get_stream_events.return_value) + pm.db.get_stream_events.return_value, self.debugger) pm.add_new_events.assert_called_once_with( mock_pipeline.return_value.handle_events.return_value) self.assertTrue(ret) @@ -263,21 +272,26 @@ class TestPipelineManager(unittest.TestCase): pm = pipeline_manager.PipelineManager('test') pm.db = mock.MagicMock(spec=pm.db, name='db') trigger_def = mock.MagicMock(name='trigger_def') + trigger_def.debugger = self.debugger pipeline_name = "test" pipeline_config = mock.MagicMock(name='pipeline_config') stream = mock.MagicMock(name='stream') + stream.name = "test" pm.add_new_events = mock.MagicMock(name='add_nemw_events') pm.pipeline_handlers = mock.MagicMock(name='pipeline_handlers') pipeline = mock_pipeline.return_value - pipeline.handle_events.side_effect = pipeline_manager.PipelineExecutionError('test', 'thing') + pipeline.handle_events.side_effect = \ + pipeline_manager.PipelineExecutionError('test', 'thing') - ret = pm._run_pipeline(stream, trigger_def, pipeline_name, pipeline_config) + ret = pm._run_pipeline(stream, trigger_def, pipeline_name, + pipeline_config) pm.db.get_stream_events.assert_called_once_with(stream) - mock_pipeline.assert_called_once_with(pipeline_name, pipeline_config, pm.pipeline_handlers) + mock_pipeline.assert_called_once_with(pipeline_name, pipeline_config, + pm.pipeline_handlers) pipeline.handle_events.assert_called_once_with( - pm.db.get_stream_events.return_value) + pm.db.get_stream_events.return_value, self.debugger) self.assertFalse(pm.add_new_events.called) self.assertFalse(ret) @@ -298,9 +312,11 @@ class TestPipelineManager(unittest.TestCase): pm._run_pipeline = mock.MagicMock(name='_run_pipeline') pm._run_pipeline.return_value = True - ret = pm.fire_stream("test stream") - pm.db.set_stream_state.assert_called_once_with("test stream", StreamState.firing) - pm._run_pipeline.assert_called_once_with(stream, trigger_def, 'test_fire_pipeline', pipeline_config) + ret = pm.fire_stream(stream) + pm.db.set_stream_state.assert_called_once_with(stream, + StreamState.firing) + pm._run_pipeline.assert_called_once_with(stream, trigger_def, + 'test_fire_pipeline', pipeline_config) self.assertFalse(pm._error_stream.called) pm._complete_stream.assert_called_once_with(stream) self.assertTrue(ret) @@ -344,8 +360,9 @@ class TestPipelineManager(unittest.TestCase): pm._run_pipeline = mock.MagicMock(name='_run_pipeline') pm._run_pipeline.return_value = True - ret = pm.fire_stream("test stream") - pm.db.set_stream_state.assert_called_once_with("test stream", StreamState.firing) + ret = pm.fire_stream(stream) + pm.db.set_stream_state.assert_called_once_with(stream, + StreamState.firing) self.assertFalse(pm._error_stream.called) self.assertFalse(pm._run_pipeline.called) pm._complete_stream.assert_called_once_with(stream) @@ -368,9 +385,12 @@ class TestPipelineManager(unittest.TestCase): pm._run_pipeline = mock.MagicMock(name='_run_pipeline') pm._run_pipeline.return_value = False - ret = pm.fire_stream("test stream") - pm.db.set_stream_state.assert_called_once_with("test stream", StreamState.firing) - pm._run_pipeline.assert_called_once_with(stream, trigger_def, 'test_fire_pipeline', pipeline_config) + ret = pm.fire_stream(stream) + pm.db.set_stream_state.assert_called_once_with(stream, + StreamState.firing) + pm._run_pipeline.assert_called_once_with(stream, trigger_def, + 'test_fire_pipeline', + pipeline_config) self.assertFalse(pm._complete_stream.called) pm._error_stream.assert_called_once_with(stream) self.assertFalse(ret) @@ -392,9 +412,11 @@ class TestPipelineManager(unittest.TestCase): pm._run_pipeline = mock.MagicMock(name='_run_pipeline') pm._run_pipeline.return_value = True - ret = pm.expire_stream("test stream") - pm.db.set_stream_state.assert_called_once_with("test stream", StreamState.expiring) - pm._run_pipeline.assert_called_once_with(stream, trigger_def, 'test_fire_pipeline', pipeline_config) + ret = pm.expire_stream(stream) + pm.db.set_stream_state.assert_called_once_with(stream, + StreamState.expiring) + pm._run_pipeline.assert_called_once_with(stream, trigger_def, + 'test_fire_pipeline', pipeline_config) self.assertFalse(pm._error_stream.called) pm._complete_stream.assert_called_once_with(stream) self.assertTrue(ret) @@ -438,8 +460,9 @@ class TestPipelineManager(unittest.TestCase): pm._run_pipeline = mock.MagicMock(name='_run_pipeline') pm._run_pipeline.return_value = True - ret = pm.expire_stream("test stream") - pm.db.set_stream_state.assert_called_once_with("test stream", StreamState.expiring) + ret = pm.expire_stream(stream) + pm.db.set_stream_state.assert_called_once_with(stream, + StreamState.expiring) self.assertFalse(pm._expire_error_stream.called) self.assertFalse(pm._run_pipeline.called) pm._complete_stream.assert_called_once_with(stream) @@ -462,9 +485,11 @@ class TestPipelineManager(unittest.TestCase): pm._run_pipeline = mock.MagicMock(name='_run_pipeline') pm._run_pipeline.return_value = False - ret = pm.expire_stream("test stream") - pm.db.set_stream_state.assert_called_once_with("test stream", StreamState.expiring) - pm._run_pipeline.assert_called_once_with(stream, trigger_def, 'test_fire_pipeline', pipeline_config) + ret = pm.expire_stream(stream) + pm.db.set_stream_state.assert_called_once_with(stream, + StreamState.expiring) + pm._run_pipeline.assert_called_once_with(stream, trigger_def, + 'test_fire_pipeline', pipeline_config) self.assertFalse(pm._complete_stream.called) pm._expire_error_stream.assert_called_once_with(stream) self.assertFalse(ret) @@ -474,13 +499,17 @@ class TestPipelineManager(unittest.TestCase): pm = pipeline_manager.PipelineManager('test') pm.db = mock.MagicMock(spec=pm.db, name='db') stream = mock.MagicMock(name='stream') + stream.name = "my_stream" + tdef = mock.MagicMock(name='tdef') + pm.trigger_map['my_stream'] = tdef pm.expire_stream = mock.MagicMock(name='expire_stream') pm.fire_stream = mock.MagicMock(name='fire_stream') pm.current_time = mock.MagicMock(name='current_time') pm.db.get_ready_streams.return_value = [stream] ret = pm.process_ready_streams(42) - pm.db.get_ready_streams.assert_called_once_with(42, pm.current_time.return_value, expire=False) + pm.db.get_ready_streams.assert_called_once_with(42, + pm.current_time.return_value, expire=False) pm.fire_stream.assert_called_once_with(stream) self.assertFalse(pm.expire_stream.called) self.assertEqual(ret, 1) @@ -490,13 +519,26 @@ class TestPipelineManager(unittest.TestCase): pm = pipeline_manager.PipelineManager('test') pm.db = mock.MagicMock(spec=pm.db, name='db') stream = mock.MagicMock(name='stream') + stream.name = "my_stream" pm.expire_stream = mock.MagicMock(name='expire_stream') pm.fire_stream = mock.MagicMock(name='fire_stream') pm.current_time = mock.MagicMock(name='current_time') pm.db.get_ready_streams.return_value = [stream] ret = pm.process_ready_streams(42, expire=True) - pm.db.get_ready_streams.assert_called_once_with(42, pm.current_time.return_value, expire=True) + pm.db.get_ready_streams.assert_called_once_with(42, + pm.current_time.return_value, expire=True) pm.expire_stream.assert_called_once_with(stream) self.assertFalse(pm.fire_stream.called) self.assertEqual(ret, 1) + + @mock.patch.object(pipeline_manager.ConfigManager, 'wrap') + def test_safe_get_debugger(self, mock_config_wrap): + pm = pipeline_manager.PipelineManager('test') + tdef = mock.MagicMock(name="tdef") + tdef.name = "my trigger" + tdef.debugger = self.debugger + self.assertEqual(pm.safe_get_debugger(tdef), self.debugger) + + self.assertEqual(pm.safe_get_debugger(None)._name, "n/a") + diff --git a/tests/test_trigger_manager.py b/tests/test_trigger_manager.py index d5c02cc..e91c50b 100644 --- a/tests/test_trigger_manager.py +++ b/tests/test_trigger_manager.py @@ -5,14 +5,17 @@ import mock import datetime import timex -from winchester import trigger_manager -from winchester import definition from winchester import db as winch_db +from winchester import debugging +from winchester import definition +from winchester import trigger_manager + class TestTriggerManager(unittest.TestCase): def setUp(self): super(TestTriggerManager, self).setUp() + self.debugger = debugging.NoOpDebugger() @mock.patch.object(trigger_manager.ConfigManager, 'wrap') def test_save_event(self, mock_config_wrap): @@ -25,7 +28,8 @@ class TestTriggerManager(unittest.TestCase): other_test_trait=42) self.assertTrue(tm.save_event(event)) tm.db.create_event.assert_called_once_with('1234-test-5678', 'test.thing', - datetime.datetime(2014,8,1,10,9,8,77777), dict(test_trait='foobar', other_test_trait=42)) + datetime.datetime(2014,8,1,10,9,8,77777), + dict(test_trait='foobar', other_test_trait=42)) @mock.patch.object(trigger_manager.ConfigManager, 'wrap') def test_save_event_dup(self, mock_config_wrap): @@ -39,7 +43,8 @@ class TestTriggerManager(unittest.TestCase): other_test_trait=42) self.assertFalse(tm.save_event(event)) tm.db.create_event.assert_called_once_with('1234-test-5678', 'test.thing', - datetime.datetime(2014,8,1,10,9,8,77777), dict(test_trait='foobar', other_test_trait=42)) + datetime.datetime(2014,8,1,10,9,8,77777), + dict(test_trait='foobar', other_test_trait=42)) @mock.patch('winchester.trigger_manager.EventCondenser', autospec=True) @mock.patch.object(trigger_manager.ConfigManager, 'wrap') @@ -119,11 +124,12 @@ class TestTriggerManager(unittest.TestCase): event = "eventful!" ret = tm._add_or_create_stream(trigger_def, event, dist_traits) - tm.db.get_active_stream.assert_called_once_with(trigger_def.name, dist_traits, - tm.current_time.return_value) + tm.db.get_active_stream.assert_called_once_with(trigger_def.name, + dist_traits, tm.current_time.return_value) self.assertFalse(tm.db.create_stream.called) - tm.db.add_event_stream.assert_called_once_with(tm.db.get_active_stream.return_value, - event, trigger_def.expiration) + tm.db.add_event_stream.assert_called_once_with( + tm.db.get_active_stream.return_value, + event, trigger_def.expiration) self.assertEqual(ret, tm.db.get_active_stream.return_value) @mock.patch.object(trigger_manager.ConfigManager, 'wrap') @@ -183,6 +189,8 @@ class TestTriggerManager(unittest.TestCase): tm = trigger_manager.TriggerManager('test') tm.db = mock.MagicMock(spec=tm.db) tm.trigger_definitions = [mock.MagicMock() for n in range(3)] + for d in tm.trigger_definitions: + d.debugger = self.debugger m_def = tm.trigger_definitions[2] tm.trigger_definitions[0].match.return_value = None tm.trigger_definitions[1].match.return_value = None @@ -197,12 +205,16 @@ class TestTriggerManager(unittest.TestCase): tm.save_event.assert_called_once_with(event) for td in tm.trigger_definitions: td.match.assert_called_once_with(event) - m_def.get_distinguishing_traits.assert_called_once_with(event, m_def.match.return_value) + m_def.get_distinguishing_traits.assert_called_once_with(event, + m_def.match.return_value) tm._add_or_create_stream.assert_called_once_with(m_def, event, m_def.get_distinguishing_traits.return_value) - tm.db.get_stream_events.assert_called_once_with(tm._add_or_create_stream.return_value) - m_def.should_fire.assert_called_once_with(tm.db.get_stream_events.return_value) - tm._ready_to_fire.assert_called_once_with(tm._add_or_create_stream.return_value, m_def) + tm.db.get_stream_events.assert_called_once_with( + tm._add_or_create_stream.return_value) + m_def.should_fire.assert_called_once_with( + tm.db.get_stream_events.return_value) + tm._ready_to_fire.assert_called_once_with( + tm._add_or_create_stream.return_value, m_def) @mock.patch.object(trigger_manager.ConfigManager, 'wrap') def test_add_event_on_ready_stream(self, mock_config_wrap): @@ -223,7 +235,8 @@ class TestTriggerManager(unittest.TestCase): tm.save_event.assert_called_once_with(event) for td in tm.trigger_definitions: td.match.assert_called_once_with(event) - m_def.get_distinguishing_traits.assert_called_once_with(event, m_def.match.return_value) + m_def.get_distinguishing_traits.assert_called_once_with(event, + m_def.match.return_value) tm._add_or_create_stream.assert_called_once_with(m_def, event, m_def.get_distinguishing_traits.return_value) self.assertFalse(tm.db.get_stream_events.called) @@ -254,5 +267,3 @@ class TestTriggerManager(unittest.TestCase): self.assertFalse(tm._add_or_create_stream.called) self.assertFalse(tm.db.get_stream_events.called) self.assertFalse(tm._ready_to_fire.called) - - diff --git a/winchester/debugging.py b/winchester/debugging.py new file mode 100644 index 0000000..9ea1f91 --- /dev/null +++ b/winchester/debugging.py @@ -0,0 +1,161 @@ +# Copyright (c) 2014 Dark Secret Software Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import abc +import logging +import six + +logger = logging.getLogger(__name__) + + +class NoOpGroup(object): + def match(self): + return True + + def mismatch(self, reason): + return False + + def check(self, value, reason): + return value + + +class Group(object): + def __init__(self, name): + self._name = name # Group name + self._match = 0 + self._mismatch = 0 + self._reasons = {} + + def match(self): + self._match += 1 + return True + + def mismatch(self, reason): + count = self._reasons.get(reason, 0) + self._reasons[reason] = count + 1 + self._mismatch += 1 + return False + + def check(self, value, reason): + if value: + return self.match() + return self.mismatch(reason) + + +@six.add_metaclass(abc.ABCMeta) +class BaseDebugger(object): + @abc.abstractmethod + def reset(self): + pass + + @abc.abstractmethod + def get_group(self, name): + pass + + @abc.abstractmethod + def bump_counter(self, name, inc=1): + pass + + @abc.abstractmethod + def get_debug_level(self): + pass + + +class NoOpDebugger(BaseDebugger): + def __init__(self, *args, **kwargs): + self.noop_group = NoOpGroup() + + def reset(self): + pass + + def get_group(self, name): + return self.noop_group + + def bump_counter(self, name, inc=1): + pass + + def get_debug_level(self): + return 0 + + +class DetailedDebugger(BaseDebugger): + def __init__(self, name, debug_level): + super(DetailedDebugger, self).__init__() + self._name = name + self._debug_level = debug_level + self.reset() + + def reset(self): + # If it's not a match or a mismatch it was a fatal error. + self._groups = {} + self._counters = {} + + def get_group(self, name): + group = self._groups.get(name, Group(name)) + self._groups[name] = group + return group + + def bump_counter(self, name, inc=1): + self._counters[name] = self._counters.get(name, 0) + inc + + def get_debug_level(self): + return self._debug_level + + +class DebugManager(object): + def __init__(self): + self._debuggers = {} + + def get_debugger(self, trigger_def): + name = "n/a" + level = 2 # Default these unknowns to full debug. + if trigger_def is not None: + name = trigger_def.name + level = trigger_def.debug_level + debugger = self._debuggers.get(name) + if not debugger: + if level > 0: + debugger = DetailedDebugger(name, level) + else: + debugger = NoOpDebugger() + self._debuggers[name] = debugger + return debugger + + def dump_group(self, debugger, group_name): + group = debugger.get_group(group_name) + logger.info("%s Criteria: %d checks, %d passed" % + (group._name, + group._match + group._mismatch, group._match)) + + if debugger.get_debug_level() > 1: + for kv in group._reasons.items(): + logger.info(" - %s = %d" % kv) + + def dump_counters(self, debugger): + for kv in debugger._counters.items(): + logger.info("Counter \"%s\" = %d" % kv) + + def dump_debuggers(self): + for debugger in self._debuggers.values(): + if debugger.get_debug_level() == 0: + continue + + logger.info("---- Trigger Definition: %s ----" % debugger._name) + for name in debugger._groups.keys(): + self.dump_group(debugger, name) + + self.dump_counters(debugger) + debugger.reset() + logger.info("----------------------------") diff --git a/winchester/definition.py b/winchester/definition.py index c83de23..dc5122d 100644 --- a/winchester/definition.py +++ b/winchester/definition.py @@ -24,7 +24,8 @@ class Criterion(object): def get_from_expression(cls, expression, trait_name): if isinstance(expression, collections.Mapping): if len(expression) != 1: - raise DefinitionError("Only exactly one type of match is allowed per criterion expression") + raise DefinitionError("Only exactly one type of match is " + "allowed per criterion expression") ctype = expression.keys()[0] expr = expression[ctype] if ctype == 'int': @@ -45,16 +46,17 @@ class Criterion(object): self.op = '=' self.value = expr - def match(self, event): + def match(self, event, debug_group): if self.trait_name not in event: - return False + return debug_group.mismatch("not %s" % self.trait_name) value = event[self.trait_name] if self.op == '=': - return value == self.value + return debug_group.check(value == self.value, "== failed") elif self.op == '>': - return value > self.value + return debug_group.check(value > self.value, "> failed") elif self.op == '<': - return value < self.value + return debug_group.check(value < self.value, "< failed") + return debug_group.mismatch("Criterion match() fall-thru") class NumericCriterion(Criterion): @@ -95,16 +97,16 @@ class TimeCriterion(Criterion): self.trait_name = trait_name self.time_expr = timex.parse(expression) - def match(self, event): + def match(self, event, debug_group): if self.trait_name not in event: - return False + return debug_group.mismatch("Time: not '%s'" % self.trait_name) value = event[self.trait_name] try: timerange = self.time_expr(**filter_event_timestamps(event)) except timex.TimexExpressionError: # the event doesn't contain a trait referenced in the expression. - return False - return value in timerange + return debug_group.mismatch("Time: no referenced trait") + return debug_group.check(value in timerange, "Not in timerange") class Criteria(object): @@ -150,30 +152,31 @@ class Criteria(object): return (self.included_type(event_type) and not self.excluded_type(event_type)) - def match(self, event): + def match(self, event, debug_group): if not self.match_type(event['event_type']): - return False + return debug_group.mismatch("Wrong event type") if self.timestamp: try: t = self.timestamp(**filter_event_timestamps(event)) except timex.TimexExpressionError: # the event doesn't contain a trait referenced in the expression. - return False + return debug_group.mismatch("No timestamp trait") if event['timestamp'] not in t: - return False + return debug_group.mismatch("Not time yet.") if not self.traits: - return True - return all(criterion.match(event) for - criterion in self.traits.values()) + return debug_group.match() + return all(criterion.match(event, debug_group) for + criterion in self.traits.values()) class TriggerDefinition(object): - def __init__(self, config): + def __init__(self, config, debug_manager): if 'name' not in config: raise DefinitionError("Required field in trigger definition not " "specified 'name'") self.name = config['name'] + self.debug_level = int(config.get('debug_level', 0)) self.distinguished_by = config.get('distinguished_by', []) for dt in self.distinguished_by: if isinstance(dt, collections.Mapping): @@ -202,19 +205,30 @@ class TriggerDefinition(object): self.load_criteria = [] if 'load_criteria' in config: self.load_criteria = [Criteria(c) for c in config['load_criteria']] + if debug_manager: + self.set_debugger(debug_manager) + + def set_debugger(self, debug_manager): + self.debugger = debug_manager.get_debugger(self) def match(self, event): # all distinguishing traits must exist to match. + group = self.debugger.get_group("Match") for dt in self.distinguished_by: if isinstance(dt, collections.Mapping): trait_name = dt.keys()[0] else: trait_name = dt if trait_name not in event: + group.mismatch("not '%s'" % trait_name) return None + for criteria in self.match_criteria: - if criteria.match(event): + if criteria.match(event, group): + group.match() return criteria + + group.mismatch("No matching criteria") return None def get_distinguishing_traits(self, event, matching_criteria): @@ -237,14 +251,14 @@ class TriggerDefinition(object): return timestamp + datetime.timedelta(seconds=self.fire_delay) def should_fire(self, events): + group = self.debugger.get_group("Fire") for criteria in self.fire_criteria: matches = 0 for event in events: - if criteria.match(event): + if criteria.match(event, group): matches += 1 if matches >= criteria.number: break if matches < criteria.number: - return False - return True - + return group.mismatch("Not enough matching criteria") + return group.match() diff --git a/winchester/pipeline_manager.py b/winchester/pipeline_manager.py index 545c446..5d78b73 100644 --- a/winchester/pipeline_manager.py +++ b/winchester/pipeline_manager.py @@ -59,31 +59,37 @@ class Pipeline(object): raise PipelineExecutionError("Error loading pipeline", e) self.handlers.append(handler) - def handle_events(self, events): + def handle_events(self, events, debugger): event_ids = set(e['message_id'] for e in events) try: for handler in self.handlers: events = handler.handle_events(events, self.env) + debugger.bump_counter("Pre-commit successful") except Exception as e: logger.exception("Error processing pipeline %s" % self.name) - self.rollback() + debugger.bump_counter("Pipeline error") + self.rollback(debugger) raise PipelineExecutionError("Error in pipeline", e) new_events = [e for e in events if e['message_id'] not in event_ids] - self.commit() + self.commit(debugger) return new_events - def commit(self): + def commit(self, debugger): for handler in self.handlers: try: handler.commit() + debugger.bump_counter("Commit successful") except: + debugger.bump_counter("Commit error") logger.exception("Commit error on handler in pipeline %s" % self.name) - def rollback(self): + def rollback(self, debugger): for handler in self.handlers: try: handler.rollback() + debugger.bump_counter("Rollback successful") except: + debugger.bump_counter("Rollback error") logger.exception("Rollback error on handler in pipeline %s" % self.name) @@ -94,22 +100,30 @@ class PipelineManager(object): configs = TriggerManager.config_description() configs.update(dict( pipeline_handlers=ConfigItem(required=True, - help="dictionary of pipeline handlers to load " - "Classes specified with simport syntax. " - "simport docs for more info"), - pipeline_worker_batch_size=ConfigItem(help="Number of streams for pipeline " - "worker(s) to load at a time", default=1000), - pipeline_worker_delay=ConfigItem(help="Number of seconds for pipeline worker to sleep " - "when it finds no streams to process", default=10), + help="dictionary of pipeline handlers to load " + "Classes specified with simport syntax. " + "simport docs for more info"), + pipeline_worker_batch_size=ConfigItem( + help="Number of streams for pipeline " + "worker(s) to load at a time", + default=1000), + pipeline_worker_delay=ConfigItem( + help="Number of seconds for pipeline worker " + "to sleep when it finds no streams to " + "process", default=10), pipeline_config=ConfigItem(required=True, help="Name of pipeline config file " - "defining the handlers for each pipeline."), - purge_completed_streams=ConfigItem(help="Delete successfully proccessed " - "streams when finished?", default=True), + "defining the handlers for each " + "pipeline."), + purge_completed_streams=ConfigItem( + help="Delete successfully proccessed " + "streams when finished?", + default=True), )) return configs - def __init__(self, config, db=None, pipeline_handlers=None, pipeline_config=None, trigger_defs=None): + def __init__(self, config, db=None, pipeline_handlers=None, + pipeline_config=None, trigger_defs=None): logger.debug("PipelineManager: Using config: %s" % str(config)) config = ConfigManager.wrap(config, self.config_description()) self.config = config @@ -142,7 +156,7 @@ class PipelineManager(object): else: defs = config.load_file(config['trigger_definitions']) logger.debug("Loaded trigger definitions %s" % str(defs)) - self.trigger_definitions = [TriggerDefinition(conf) for conf in defs] + self.trigger_definitions = [TriggerDefinition(conf, None) for conf in defs] self.trigger_map = dict((tdef.name, tdef) for tdef in self.trigger_definitions) self.trigger_manager = TriggerManager(self.config, db=self.db, @@ -187,15 +201,19 @@ class PipelineManager(object): self.streams_loaded = 0 self.last_status = self.current_time() + self.trigger_manager.debug_manager.dump_debuggers() + def add_new_events(self, events): for event in events: self.trigger_manager.add_event(event) - def _run_pipeline(self, stream, trigger_def, pipeline_name, pipeline_config): + def _run_pipeline(self, stream, trigger_def, pipeline_name, + pipeline_config): events = self.db.get_stream_events(stream) + debugger = trigger_def.debugger try: pipeline = Pipeline(pipeline_name, pipeline_config, self.pipeline_handlers) - new_events = pipeline.handle_events(events) + new_events = pipeline.handle_events(events, debugger) except PipelineExecutionError: logger.error("Exception in pipeline %s handling stream %s" % ( pipeline_name, stream.id)) @@ -228,15 +246,22 @@ class PipelineManager(object): logger.error("Stream %s locked while trying to set 'expire_error' state! " "This should not happen." % stream.id) + def safe_get_debugger(self, trigger_def): + return trigger_def.debugger if trigger_def is not None else \ + self.trigger_manager.debug_manager.get_debugger(None) + def fire_stream(self, stream): + trigger_def = self.trigger_map.get(stream.name) + debugger = self.safe_get_debugger(trigger_def) try: stream = self.db.set_stream_state(stream, StreamState.firing) except LockError: logger.debug("Stream %s locked. Moving on..." % stream.id) + debugger.bump_counter("Locked") return False logger.debug("Firing Stream %s." % stream.id) - trigger_def = self.trigger_map.get(stream.name) if trigger_def is None: + debugger.bump_counter("Unknown trigger def '%s'" % stream.name) logger.error("Stream %s has unknown trigger definition %s" % ( stream.id, stream.name)) self._error_stream(stream) @@ -245,28 +270,36 @@ class PipelineManager(object): if pipeline is not None: pipe_config = self.pipeline_config.get(pipeline) if pipe_config is None: - logger.error("Trigger %s for stream %s has unknown pipeline %s" % ( - stream.name, stream.id, pipeline)) + debugger.bump_counter("Unknown pipeline '%s'" % pipeline) + logger.error("Trigger %s for stream %s has unknown " + "pipeline %s" % (stream.name, stream.id, + pipeline)) self._error_stream(stream) - if not self._run_pipeline(stream, trigger_def, pipeline, pipe_config): + if not self._run_pipeline(stream, trigger_def, pipeline, + pipe_config): self._error_stream(stream) return False else: logger.debug("No fire pipeline for stream %s. Nothing to do." % ( stream.id)) + debugger.bump_counter("No fire pipeline for '%s'" % stream.name) self._complete_stream(stream) + debugger.bump_counter("Streams fired") self.streams_fired +=1 return True def expire_stream(self, stream): + trigger_def = self.trigger_map.get(stream.name) + debugger = self.safe_get_debugger(trigger_def) try: stream = self.db.set_stream_state(stream, StreamState.expiring) except LockError: + debugger.bump_counter("Locked") logger.debug("Stream %s locked. Moving on..." % stream.id) return False logger.debug("Expiring Stream %s." % stream.id) - trigger_def = self.trigger_map.get(stream.name) if trigger_def is None: + debugger.bump_counter("Unknown trigger def '%s'" % stream.name) logger.error("Stream %s has unknown trigger definition %s" % ( stream.id, stream.name)) self._expire_error_stream(stream) @@ -275,16 +308,20 @@ class PipelineManager(object): if pipeline is not None: pipe_config = self.pipeline_config.get(pipeline) if pipe_config is None: + debugger.bump_counter("Unknown pipeline '%s'" % pipeline) logger.error("Trigger %s for stream %s has unknown pipeline %s" % ( stream.name, stream.id, pipeline)) self._expire_error_stream(stream) - if not self._run_pipeline(stream, trigger_def, pipeline, pipe_config): + if not self._run_pipeline(stream, trigger_def, pipeline, + pipe_config): self._expire_error_stream(stream) return False else: logger.debug("No expire pipeline for stream %s. Nothing to do." % ( stream.id)) + debugger.bump_counter("No expire pipeline for '%s'" % stream.name) self._complete_stream(stream) + debugger.bump_counter("Streams expired") self.streams_expired +=1 return True diff --git a/winchester/trigger_manager.py b/winchester/trigger_manager.py index a9100dc..b48b751 100644 --- a/winchester/trigger_manager.py +++ b/winchester/trigger_manager.py @@ -3,8 +3,9 @@ import logging from stackdistiller import distiller, condenser import simport -from winchester.db import DBInterface, DuplicateError from winchester.config import ConfigManager, ConfigSection, ConfigItem +from winchester import debugging +from winchester.db import DBInterface, DuplicateError from winchester.definition import TriggerDefinition @@ -66,32 +67,37 @@ class TriggerManager(object): @classmethod def config_description(cls): - return dict(config_path=ConfigItem(help="Path(s) to find additional config files", - multiple=True, default='.'), + return dict(config_path=ConfigItem( + help="Path(s) to find additional config files", + multiple=True, default='.'), distiller_config=ConfigItem(required=True, help="Name of distiller config file " "describing what to extract from the " "notifications"), - distiller_trait_plugins=ConfigItem(help="dictionary of trait plugins to load " - "for stackdistiller. Classes specified with " - "simport syntax. See stackdistiller and " - "simport docs for more info", default=dict()), - catch_all_notifications=ConfigItem(help="Store basic info for all notifications," - " even if not listed in distiller config", - default=False), - statistics_period=ConfigItem(help="Emit stats on event counts, etc every " - "this many seconds", default=10), + distiller_trait_plugins=ConfigItem( + help="dictionary of trait plugins to load " + "for stackdistiller. Classes specified with " + "simport syntax. See stackdistiller and " + "simport docs for more info", default=dict()), + catch_all_notifications=ConfigItem( + help="Store basic info for all notifications," + " even if not listed in distiller config", + default=False), + statistics_period=ConfigItem( + help="Emit stats on event counts, etc every " + "this many seconds", default=10), database=ConfigSection(help="Database connection info.", - config_description=DBInterface.config_description()), + config_description=DBInterface.config_description()), trigger_definitions=ConfigItem(required=True, - help="Name of trigger definitions file " - "defining trigger conditions and what events to " - "process for each stream"), - ) + help="Name of trigger definitions file " + "defining trigger conditions and what events to " + "process for each stream"), + ) def __init__(self, config, db=None, stackdistiller=None, trigger_defs=None): config = ConfigManager.wrap(config, self.config_description()) self.config = config + self.debug_manager = debugging.DebugManager() config.check_config() config.add_config_path(*config['config_path']) @@ -110,10 +116,13 @@ class TriggerManager(object): catchall=config['catch_all_notifications']) if trigger_defs is not None: self.trigger_definitions = trigger_defs + for t in self.trigger_definitions: + t.set_debugger(self.debug_manager) else: defs = config.load_file(config['trigger_definitions']) - self.trigger_definitions = [TriggerDefinition(conf) for conf in defs] - self.statistics_period = config['statistics_period'] + self.trigger_definitions = [TriggerDefinition(conf, + self.debug_manager) + for conf in defs] self.saved_events = 0 self.received = 0 self.last_status = self.current_time() @@ -180,9 +189,12 @@ class TriggerManager(object): self.saved_events = 0 self.last_status = self.current_time() + self.debug_manager.dump_debuggers() + def _add_or_create_stream(self, trigger_def, event, dist_traits): stream = self.db.get_active_stream(trigger_def.name, dist_traits, self.current_time()) if stream is None: + trigger_def.debugger.bump_counter("New stream") stream = self.db.create_stream(trigger_def.name, event, dist_traits, trigger_def.expiration) logger.debug("Created New stream %s for %s: distinguished by %s" % ( @@ -194,6 +206,7 @@ class TriggerManager(object): def _ready_to_fire(self, stream, trigger_def): timestamp = trigger_def.get_fire_timestamp(self.current_time()) self.db.stream_ready_to_fire(stream, timestamp) + trigger_def.debugger.bump_counter("Ready to fire") logger.debug("Stream %s ready to fire at %s" % ( stream.id, timestamp)) @@ -202,16 +215,17 @@ class TriggerManager(object): for trigger_def in self.trigger_definitions: matched_criteria = trigger_def.match(event) if matched_criteria: - dist_traits = trigger_def.get_distinguishing_traits(event, matched_criteria) - stream = self._add_or_create_stream(trigger_def, event, dist_traits) + dist_traits = trigger_def.get_distinguishing_traits( + event, matched_criteria) + stream = self._add_or_create_stream(trigger_def, event, + dist_traits) + trigger_def.debugger.bump_counter("Added events") if stream.fire_timestamp is None: - if trigger_def.should_fire(self.db.get_stream_events(stream)): + if trigger_def.should_fire(self.db.get_stream_events( + stream)): self._ready_to_fire(stream, trigger_def) - if (self.current_time() - self.last_status).seconds > self.statistics_period: - self._log_statistics() def add_notification(self, notification_body): event = self.convert_notification(notification_body) if event: self.add_event(event) - diff --git a/winchester/yagi_handler.py b/winchester/yagi_handler.py index a4a541e..1f748d6 100644 --- a/winchester/yagi_handler.py +++ b/winchester/yagi_handler.py @@ -28,3 +28,5 @@ class WinchesterHandler(BaseHandler): for notification in self.iterate_payloads(messages, env): self.trigger_manager.add_notification(notification) + def on_idle(self, num_messages, queue_name): + self.trigger_manager._log_statistics()