Files
deb-python-taskflow/taskflow/tests/unit/test_progress.py
Joshua Harlow c42b3a2ed5 A few adjustments to the progress code
Remove an unneeded variable in the tasks class as
well as add a test for capturing and checking on
the progress that is emitted and adjust how the task
action should use a single function for doing state
changes (and another one for saving/resetting). Also
add in progress boundary checking (between 0 and 1).

Change-Id: Ic8cf56e3b25a493129f370b7b52e3cb2238da844
2013-10-01 10:15:38 -07:00

127 lines
4.4 KiB
Python

# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
from taskflow import task
from taskflow import test
from taskflow.engines.action_engine import engine as eng
from taskflow.patterns import linear_flow as lf
from taskflow.persistence.backends import impl_memory
from taskflow.utils import persistence_utils as p_utils
class ProgressTask(task.Task):
    """Task that reports intermediate progress in equal-sized steps.

    :param name: name forwarded to the base task constructor.
    :param segments: number of equal segments the work is split into; the
                     task reports ``i / segments`` for every ``i`` from
                     ``1`` up to (but not including) ``segments``.
    """

    def __init__(self, name, segments):
        super(ProgressTask, self).__init__(name=name)
        self._segments = segments

    def execute(self):
        """Report each intermediate progress fraction, in increasing order.

        Nothing is reported when the segment count is zero or negative.
        """
        total = self._segments
        if total <= 0:
            return
        # Lazily produce 1/total, 2/total, ... (total - 1)/total; the final
        # fraction (total/total) is deliberately not reported from here.
        fractions = (float(step) / total for step in range(1, total))
        for fraction in fractions:
            self.update_progress(fraction)
class TestProgress(test.TestCase):
    """Tests for progress notifications and stored progress of a task."""

    @staticmethod
    def _capture_progress(a_task):
        """Bind an ``update_progress`` listener and return its event list.

        :param a_task: task object to bind the listener to.
        :returns: a list that receives every progress value the listener
                  is called with, in call order.
        """
        fired_events = []

        def notify_me(task, event_data, progress):
            fired_events.append(progress)

        a_task.bind('update_progress', notify_me)
        return fired_events

    def _make_engine(self, flow):
        """Build and compile a single threaded action engine for ``flow``."""
        e = eng.SingleThreadedActionEngine(flow)
        e.compile()
        return e

    def tearDown(self):
        """Run the base teardown, then clear the memory backend contents."""
        super(TestProgress, self).tearDown()
        # NOTE(review): this clears through a brand new backend object, so it
        # presumably relies on the memory backend sharing its contents
        # between instances -- confirm against impl_memory.
        with contextlib.closing(impl_memory.MemoryBackend({})) as be:
            with contextlib.closing(be.get_connection()) as conn:
                conn.clear_all()

    def test_sanity_progress(self):
        """A five segment task yields six events spanning 0.0 to 1.0."""
        ev_count = 5
        t = ProgressTask("test", ev_count)
        fired_events = self._capture_progress(t)
        flo = lf.Flow("test")
        flo.add(t)
        e = self._make_engine(flo)
        e.run()
        # The task itself reports ev_count - 1 values; 0.0 and 1.0 are
        # expected in addition to those.
        self.assertEqual(ev_count + 1, len(fired_events))
        self.assertEqual(1.0, fired_events[-1])
        self.assertEqual(0.0, fired_events[0])

    def test_no_segments_progress(self):
        """A task reporting nothing itself still yields 0.0 and 1.0."""
        t = ProgressTask("test", 0)
        fired_events = self._capture_progress(t)
        flo = lf.Flow("test")
        flo.add(t)
        e = self._make_engine(flo)
        e.run()
        # 0.0 and 1.0 should be automatically fired
        self.assertEqual(2, len(fired_events))
        self.assertEqual(1.0, fired_events[-1])
        self.assertEqual(0.0, fired_events[0])

    def test_storage_progress(self):
        """The final progress is readable from storage and task details."""
        with contextlib.closing(impl_memory.MemoryBackend({})) as be:
            flo = lf.Flow("test")
            flo.add(ProgressTask("test", 3))
            b, fd = p_utils.temporary_flow_detail(be)
            e = eng.SingleThreadedActionEngine(flo,
                                               book=b, flow_detail=fd,
                                               backend=be)
            e.run()
            t_uuid = e.storage.get_uuid_by_name("test")
            end_progress = e.storage.get_task_progress(t_uuid)
            self.assertEqual(1.0, end_progress)
            td = fd.find(t_uuid)
            self.assertEqual({'progress': 1.0}, td.meta)

    def test_dual_storage_progress(self):
        """Progress is both stored and delivered to a bound listener."""
        with contextlib.closing(impl_memory.MemoryBackend({})) as be:
            t = ProgressTask("test", 5)
            fired_events = self._capture_progress(t)
            flo = lf.Flow("test")
            flo.add(t)
            b, fd = p_utils.temporary_flow_detail(be)
            e = eng.SingleThreadedActionEngine(flo,
                                               book=b, flow_detail=fd,
                                               backend=be)
            e.run()
            t_uuid = e.storage.get_uuid_by_name("test")
            end_progress = e.storage.get_task_progress(t_uuid)
            self.assertEqual(1.0, end_progress)
            td = fd.find(t_uuid)
            self.assertEqual({'progress': 1.0}, td.meta)
            self.assertEqual(6, len(fired_events))