Add a capturing listener (for test or other usage)
To enable those that are using taskflow to easily attach to an engine (say in a unit test) and capture all the transitions, details and state changes in a accumulating 'values' list provide a capturing listener that does just this (and change the one used for internal testing to use it) so that others may also benefit from the same functionality. Change-Id: I7a35e282dd4a6b3e14277bea2af1f275615bd212
This commit is contained in:
parent
19f9674877
commit
20d85fe33b
@ -175,12 +175,18 @@ Claim listener
|
||||
|
||||
.. autoclass:: taskflow.listeners.claims.CheckingClaimListener
|
||||
|
||||
Capturing listener
|
||||
------------------
|
||||
|
||||
.. autoclass:: taskflow.listeners.capturing.CaptureListener
|
||||
|
||||
Hierarchy
|
||||
---------
|
||||
|
||||
.. inheritance-diagram::
|
||||
taskflow.listeners.base.DumpingListener
|
||||
taskflow.listeners.base.Listener
|
||||
taskflow.listeners.capturing.CaptureListener
|
||||
taskflow.listeners.claims.CheckingClaimListener
|
||||
taskflow.listeners.logging.DynamicLoggingListener
|
||||
taskflow.listeners.logging.LoggingListener
|
||||
|
105
taskflow/listeners/capturing.py
Normal file
105
taskflow/listeners/capturing.py
Normal file
@ -0,0 +1,105 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from taskflow.listeners import base
|
||||
|
||||
|
||||
def _freeze_it(values):
|
||||
"""Freezes a set of values (handling none/empty nicely)."""
|
||||
if not values:
|
||||
return frozenset()
|
||||
else:
|
||||
return frozenset(values)
|
||||
|
||||
|
||||
class CaptureListener(base.Listener):
|
||||
"""A listener that captures transitions and saves them locally.
|
||||
|
||||
NOTE(harlowja): this listener is *mainly* useful for testing (where it is
|
||||
useful to test the appropriate/expected transitions, produced results...
|
||||
occurred after engine running) but it could have other usages as well.
|
||||
|
||||
:ivar values: Captured transitions + details (the result of
|
||||
the :py:meth:`._format_capture` method) are stored into this
|
||||
list (a previous list to append to may be provided using the
|
||||
constructor keyword argument of the same name); by default
|
||||
this stores tuples of the format ``(kind, state, details)``.
|
||||
"""
|
||||
|
||||
# Constant 'kind' strings used in the default capture formatting (to
|
||||
# identify what was captured); these are saved into the accumulated
|
||||
# values as the first index (so that your code may differentiate between
|
||||
# what was captured).
|
||||
|
||||
#: Kind that denotes a 'flow' capture.
|
||||
FLOW = 'flow'
|
||||
|
||||
#: Kind that denotes a 'task' capture.
|
||||
TASK = 'task'
|
||||
|
||||
#: Kind that denotes a 'retry' capture.
|
||||
RETRY = 'retry'
|
||||
|
||||
def __init__(self, engine,
|
||||
task_listen_for=base.DEFAULT_LISTEN_FOR,
|
||||
flow_listen_for=base.DEFAULT_LISTEN_FOR,
|
||||
retry_listen_for=base.DEFAULT_LISTEN_FOR,
|
||||
# Easily override what you want captured and where it
|
||||
# should save into and what should be skipped...
|
||||
capture_flow=True, capture_task=True, capture_retry=True,
|
||||
# Skip capturing *all* tasks, all retries, all flows...
|
||||
skip_tasks=None, skip_retries=None, skip_flows=None,
|
||||
# Provide your own list (or previous list) to accumulate
|
||||
# into...
|
||||
values=None):
|
||||
super(CaptureListener, self).__init__(
|
||||
engine,
|
||||
task_listen_for=task_listen_for,
|
||||
flow_listen_for=flow_listen_for,
|
||||
retry_listen_for=retry_listen_for)
|
||||
self._capture_flow = capture_flow
|
||||
self._capture_task = capture_task
|
||||
self._capture_retry = capture_retry
|
||||
self._skip_tasks = _freeze_it(skip_tasks)
|
||||
self._skip_flows = _freeze_it(skip_flows)
|
||||
self._skip_retries = _freeze_it(skip_retries)
|
||||
if values is None:
|
||||
self.values = []
|
||||
else:
|
||||
self.values = values
|
||||
|
||||
@staticmethod
|
||||
def _format_capture(kind, state, details):
|
||||
"""Tweak what is saved according to your desire(s)."""
|
||||
return (kind, state, details)
|
||||
|
||||
def _task_receiver(self, state, details):
|
||||
if self._capture_task:
|
||||
if details['task_name'] not in self._skip_tasks:
|
||||
self.values.append(self._format_capture(self.TASK,
|
||||
state, details))
|
||||
|
||||
def _retry_receiver(self, state, details):
|
||||
if self._capture_retry:
|
||||
if details['retry_name'] not in self._skip_retries:
|
||||
self.values.append(self._format_capture(self.RETRY,
|
||||
state, details))
|
||||
|
||||
def _flow_receiver(self, state, details):
|
||||
if self._capture_flow:
|
||||
if details['flow_name'] not in self._skip_flows:
|
||||
self.values.append(self._format_capture(self.FLOW,
|
||||
state, details))
|
@ -20,7 +20,7 @@ import string
|
||||
import six
|
||||
|
||||
from taskflow import exceptions
|
||||
from taskflow.listeners import base as listener_base
|
||||
from taskflow.listeners import capturing
|
||||
from taskflow.persistence.backends import impl_memory
|
||||
from taskflow import retry
|
||||
from taskflow import task
|
||||
@ -117,58 +117,27 @@ class ProvidesRequiresTask(task.Task):
|
||||
return dict((k, k) for k in self.provides)
|
||||
|
||||
|
||||
class CaptureListener(listener_base.Listener):
|
||||
_LOOKUP_NAME_POSTFIX = {
|
||||
'task_name': '.t',
|
||||
'retry_name': '.r',
|
||||
'flow_name': '.f',
|
||||
}
|
||||
# Used to format the captured values into strings (which are easier to
|
||||
# check later in tests)...
|
||||
LOOKUP_NAME_POSTFIX = {
|
||||
capturing.CaptureListener.TASK: ('.t', 'task_name'),
|
||||
capturing.CaptureListener.RETRY: ('.r', 'retry_name'),
|
||||
capturing.CaptureListener.FLOW: ('.f', 'flow_name'),
|
||||
}
|
||||
|
||||
def __init__(self, engine,
|
||||
task_listen_for=listener_base.DEFAULT_LISTEN_FOR,
|
||||
values=None,
|
||||
capture_flow=True, capture_task=True, capture_retry=True,
|
||||
skip_tasks=None, skip_retries=None, skip_flows=None):
|
||||
super(CaptureListener, self).__init__(engine,
|
||||
task_listen_for=task_listen_for)
|
||||
self._capture_flow = capture_flow
|
||||
self._capture_task = capture_task
|
||||
self._capture_retry = capture_retry
|
||||
self._skip_tasks = skip_tasks or []
|
||||
self._skip_flows = skip_flows or []
|
||||
self._skip_retries = skip_retries or []
|
||||
if values is None:
|
||||
self.values = []
|
||||
else:
|
||||
self.values = values
|
||||
|
||||
def _capture(self, state, details, name_key):
|
||||
name = details[name_key]
|
||||
try:
|
||||
name += self._LOOKUP_NAME_POSTFIX[name_key]
|
||||
except KeyError:
|
||||
pass
|
||||
class CaptureListener(capturing.CaptureListener):
|
||||
|
||||
@staticmethod
|
||||
def _format_capture(kind, state, details):
|
||||
name_postfix, name_key = LOOKUP_NAME_POSTFIX[kind]
|
||||
name = details[name_key] + name_postfix
|
||||
if 'result' in details:
|
||||
name += ' %s(%s)' % (state, details['result'])
|
||||
else:
|
||||
name += " %s" % state
|
||||
return name
|
||||
|
||||
def _task_receiver(self, state, details):
|
||||
if self._capture_task:
|
||||
if details['task_name'] not in self._skip_tasks:
|
||||
self.values.append(self._capture(state, details, 'task_name'))
|
||||
|
||||
def _retry_receiver(self, state, details):
|
||||
if self._capture_retry:
|
||||
if details['retry_name'] not in self._skip_retries:
|
||||
self.values.append(self._capture(state, details, 'retry_name'))
|
||||
|
||||
def _flow_receiver(self, state, details):
|
||||
if self._capture_flow:
|
||||
if details['flow_name'] not in self._skip_flows:
|
||||
self.values.append(self._capture(state, details, 'flow_name'))
|
||||
|
||||
|
||||
class ProgressingTask(task.Task):
|
||||
def execute(self, **kwargs):
|
||||
|
Loading…
x
Reference in New Issue
Block a user