Switch to a custom NotImplementedError derivative

When a feature or method is not implemented it's useful
to throw our own derivative of a NotImplementedError error
so that we can distingush these types of errors vs actual
usage of NotImplementedError which could be thrown from
driver or user code.

Change-Id: I8d5dfb56a254f315c5509dc600a078cfef2dde0b
This commit is contained in:
Joshua Harlow 2014-10-07 21:33:50 -07:00 committed by Joshua Harlow
parent 1caaecc5d6
commit b86b7e15d4
5 changed files with 23 additions and 8 deletions
taskflow
exceptions.py
listeners
persistence
tests

@ -135,6 +135,15 @@ class InvalidFormat(TaskFlowException):
# Others.
class NotImplementedError(NotImplementedError):
"""Exception for when some functionality really isn't implemented.
This is typically useful when the library itself needs to distinguish
internal features not being made available from users features not being
made available/implemented (and to avoid misinterpreting the two).
"""
class WrappedFailure(Exception):
"""Wraps one or several failure objects.

@ -124,12 +124,12 @@ class LoggingBase(ListenerBase):
backend.
To implement your own logging listener derive form this class and
override ``_log`` method.
override the ``_log`` method.
"""
@abc.abstractmethod
def _log(self, message, *args, **kwargs):
raise NotImplementedError()
"""Logs the provided *templated* message to some output."""
def _flow_receiver(self, state, details):
self._log("%s has moved flow '%s' (%s) into state '%s'",

@ -411,7 +411,8 @@ class TaskDetail(AtomDetail):
def merge(self, other, deep_copy=False):
if not isinstance(other, TaskDetail):
raise NotImplementedError("Can only merge with other task details")
raise exc.NotImplementedError("Can only merge with other"
" task details")
if other is self:
return self
super(TaskDetail, self).merge(other, deep_copy=deep_copy)
@ -496,8 +497,8 @@ class RetryDetail(AtomDetail):
def merge(self, other, deep_copy=False):
if not isinstance(other, RetryDetail):
raise NotImplementedError("Can only merge with other retry "
"details")
raise exc.NotImplementedError("Can only merge with other"
" retry details")
if other is self:
return self
super(RetryDetail, self).merge(other, deep_copy=deep_copy)

@ -14,11 +14,13 @@
# License for the specific language governing permissions and limitations
# under the License.
import abc
import contextlib
import os
import random
import tempfile
import six
import testtools
@ -136,19 +138,20 @@ class SqlitePersistenceTest(test.TestCase, base.PersistenceTestMixin):
self.db_location = None
@six.add_metaclass(abc.ABCMeta)
class BackendPersistenceTestMixin(base.PersistenceTestMixin):
"""Specifies a backend type and does required setup and teardown."""
def _get_connection(self):
return self.backend.get_connection()
@abc.abstractmethod
def _init_db(self):
"""Sets up the database, and returns the uri to that database."""
raise NotImplementedError()
@abc.abstractmethod
def _remove_db(self):
"""Cleans up by removing the database once the tests are done."""
raise NotImplementedError()
def setUp(self):
super(BackendPersistenceTestMixin, self).setUp()

@ -285,7 +285,9 @@ class EngineTestBase(object):
super(EngineTestBase, self).tearDown()
def _make_engine(self, flow, **kwargs):
raise NotImplementedError()
raise exceptions.NotImplementedError("_make_engine() must be"
" overridden if an engine is"
" desired")
class FailureMatcher(object):