Use correct exception in the timing listener
Used `StorageFailure` exception instead of non-existing `StorageError` in the timing listener. Change-Id: I83035b737f7507b760799a5d44d4c7d097103ae5
This commit is contained in:
@@ -58,7 +58,7 @@ class TaskFlowException(Exception):
|
||||
# Errors related to storage or operations on storage units.
|
||||
|
||||
class StorageFailure(TaskFlowException):
|
||||
"""Raised when storage backends can not be read/saved/deleted..."""
|
||||
"""Raised when storage backends can not be read/saved/deleted."""
|
||||
|
||||
|
||||
# Job related errors.
|
||||
|
||||
@@ -18,7 +18,7 @@ from __future__ import absolute_import
|
||||
|
||||
import logging
|
||||
|
||||
from taskflow import exceptions as excp
|
||||
from taskflow import exceptions as exc
|
||||
from taskflow.listeners import base
|
||||
from taskflow import states
|
||||
from taskflow.utils import misc
|
||||
@@ -55,7 +55,7 @@ class TimingListener(base.ListenerBase):
|
||||
try:
|
||||
# Don't let storage failures throw exceptions in a listener method.
|
||||
self._engine.storage.update_atom_metadata(task_name, meta_update)
|
||||
except excp.StorageError:
|
||||
except exc.StorageFailure:
|
||||
LOG.warn("Failure to store duration update %s for task %s",
|
||||
meta_update, task_name, exc_info=True)
|
||||
|
||||
|
||||
@@ -434,8 +434,8 @@ class Connection(base.Connection):
|
||||
def _atomdetails_merge(ad_m, ad):
|
||||
atom_type = logbook.atom_detail_type(ad)
|
||||
if atom_type != ad_m.atom_type:
|
||||
raise exc.StorageError("Can not merge differing atom types (%s != %s)"
|
||||
% (atom_type, ad_m.atom_type))
|
||||
raise exc.StorageFailure("Can not merge differing atom types "
|
||||
"(%s != %s)" % (atom_type, ad_m.atom_type))
|
||||
ad_d = ad.to_dict()
|
||||
ad_m.state = ad_d['state']
|
||||
ad_m.intention = ad_d['intention']
|
||||
|
||||
@@ -15,15 +15,18 @@
|
||||
# under the License.
|
||||
|
||||
import contextlib
|
||||
import mock
|
||||
import time
|
||||
|
||||
from taskflow import task
|
||||
from taskflow import test
|
||||
|
||||
import taskflow.engines
|
||||
from taskflow import exceptions as exc
|
||||
from taskflow.listeners import timing
|
||||
from taskflow.patterns import linear_flow as lf
|
||||
from taskflow.persistence.backends import impl_memory
|
||||
from taskflow.tests import utils as t_utils
|
||||
from taskflow.utils import persistence_utils as p_utils
|
||||
|
||||
|
||||
@@ -49,10 +52,10 @@ class TestDuration(test.TestCase):
|
||||
|
||||
def test_duration(self):
|
||||
with contextlib.closing(impl_memory.MemoryBackend({})) as be:
|
||||
flo = lf.Flow("test")
|
||||
flo.add(SleepyTask("test-1", sleep_for=0.1))
|
||||
flow = lf.Flow("test")
|
||||
flow.add(SleepyTask("test-1", sleep_for=0.1))
|
||||
(lb, fd) = p_utils.temporary_flow_detail(be)
|
||||
e = self.make_engine(flo, fd, be)
|
||||
e = self.make_engine(flow, fd, be)
|
||||
with timing.TimingListener(e):
|
||||
e.run()
|
||||
t_uuid = e.storage.get_atom_uuid("test-1")
|
||||
@@ -61,3 +64,19 @@ class TestDuration(test.TestCase):
|
||||
self.assertIsNotNone(td.meta)
|
||||
self.assertIn('duration', td.meta)
|
||||
self.assertGreaterEqual(0.1, td.meta['duration'])
|
||||
|
||||
@mock.patch.object(timing.LOG, 'warn')
|
||||
def test_record_ending_exception(self, mocked_warn):
|
||||
with contextlib.closing(impl_memory.MemoryBackend({})) as be:
|
||||
flow = lf.Flow("test")
|
||||
flow.add(t_utils.TaskNoRequiresNoReturns("test-1"))
|
||||
(lb, fd) = p_utils.temporary_flow_detail(be)
|
||||
e = self.make_engine(flow, fd, be)
|
||||
timing_listener = timing.TimingListener(e)
|
||||
with mock.patch.object(timing_listener._engine.storage,
|
||||
'update_atom_metadata') as mocked_uam:
|
||||
mocked_uam.side_effect = exc.StorageFailure('Woot!')
|
||||
with timing_listener:
|
||||
e.run()
|
||||
mocked_warn.assert_called_once_with(mock.ANY, mock.ANY, 'test-1',
|
||||
exc_info=True)
|
||||
|
||||
Reference in New Issue
Block a user