From 4366f0f719267892135ec56b4be7403700894a1e Mon Sep 17 00:00:00 2001 From: Stanislav Kudriashev Date: Mon, 31 Mar 2014 01:08:49 +0300 Subject: [PATCH] Use correct exception in the timing listener Used `StorageFailure` exception instead of non-existing `StorageError` in the timing listener. Change-Id: I83035b737f7507b760799a5d44d4c7d097103ae5 --- taskflow/exceptions.py | 2 +- taskflow/listeners/timing.py | 4 +-- .../persistence/backends/impl_sqlalchemy.py | 4 +-- taskflow/tests/unit/test_duration.py | 25 ++++++++++++++++--- 4 files changed, 27 insertions(+), 8 deletions(-) diff --git a/taskflow/exceptions.py b/taskflow/exceptions.py index 9e557cc6..f526068e 100644 --- a/taskflow/exceptions.py +++ b/taskflow/exceptions.py @@ -58,7 +58,7 @@ class TaskFlowException(Exception): # Errors related to storage or operations on storage units. class StorageFailure(TaskFlowException): - """Raised when storage backends can not be read/saved/deleted...""" + """Raised when storage backends can not be read/saved/deleted.""" # Job related errors. diff --git a/taskflow/listeners/timing.py b/taskflow/listeners/timing.py index bd8c9542..15ebe82e 100644 --- a/taskflow/listeners/timing.py +++ b/taskflow/listeners/timing.py @@ -18,7 +18,7 @@ from __future__ import absolute_import import logging -from taskflow import exceptions as excp +from taskflow import exceptions as exc from taskflow.listeners import base from taskflow import states from taskflow.utils import misc @@ -55,7 +55,7 @@ class TimingListener(base.ListenerBase): try: # Don't let storage failures throw exceptions in a listener method. self._engine.storage.update_atom_metadata(task_name, meta_update) - except excp.StorageError: + except exc.StorageFailure: LOG.warn("Failure to store duration update %s for task %s", meta_update, task_name, exc_info=True) diff --git a/taskflow/persistence/backends/impl_sqlalchemy.py b/taskflow/persistence/backends/impl_sqlalchemy.py index 2067ff5a..2f2cffcb 100644 --- a/taskflow/persistence/backends/impl_sqlalchemy.py +++ b/taskflow/persistence/backends/impl_sqlalchemy.py @@ -434,8 +434,8 @@ class Connection(base.Connection): def _atomdetails_merge(ad_m, ad): atom_type = logbook.atom_detail_type(ad) if atom_type != ad_m.atom_type: - raise exc.StorageError("Can not merge differing atom types (%s != %s)" - % (atom_type, ad_m.atom_type)) + raise exc.StorageFailure("Can not merge differing atom types " + "(%s != %s)" % (atom_type, ad_m.atom_type)) ad_d = ad.to_dict() ad_m.state = ad_d['state'] ad_m.intention = ad_d['intention'] diff --git a/taskflow/tests/unit/test_duration.py b/taskflow/tests/unit/test_duration.py index 2e5ab84b..67d240cb 100644 --- a/taskflow/tests/unit/test_duration.py +++ b/taskflow/tests/unit/test_duration.py @@ -15,15 +15,18 @@ # under the License. import contextlib +import mock import time from taskflow import task from taskflow import test import taskflow.engines +from taskflow import exceptions as exc from taskflow.listeners import timing from taskflow.patterns import linear_flow as lf from taskflow.persistence.backends import impl_memory +from taskflow.tests import utils as t_utils from taskflow.utils import persistence_utils as p_utils @@ -49,10 +52,10 @@ class TestDuration(test.TestCase): def test_duration(self): with contextlib.closing(impl_memory.MemoryBackend({})) as be: - flo = lf.Flow("test") - flo.add(SleepyTask("test-1", sleep_for=0.1)) + flow = lf.Flow("test") + flow.add(SleepyTask("test-1", sleep_for=0.1)) (lb, fd) = p_utils.temporary_flow_detail(be) - e = self.make_engine(flo, fd, be) + e = self.make_engine(flow, fd, be) with timing.TimingListener(e): e.run() t_uuid = e.storage.get_atom_uuid("test-1") @@ -61,3 +64,19 @@ class TestDuration(test.TestCase): self.assertIsNotNone(td.meta) self.assertIn('duration', td.meta) self.assertGreaterEqual(0.1, td.meta['duration']) + + @mock.patch.object(timing.LOG, 'warn') + def test_record_ending_exception(self, mocked_warn): + with contextlib.closing(impl_memory.MemoryBackend({})) as be: + flow = lf.Flow("test") + flow.add(t_utils.TaskNoRequiresNoReturns("test-1")) + (lb, fd) = p_utils.temporary_flow_detail(be) + e = self.make_engine(flow, fd, be) + timing_listener = timing.TimingListener(e) + with mock.patch.object(timing_listener._engine.storage, + 'update_atom_metadata') as mocked_uam: + mocked_uam.side_effect = exc.StorageFailure('Woot!') + with timing_listener: + e.run() + mocked_warn.assert_called_once_with(mock.ANY, mock.ANY, 'test-1', + exc_info=True)