Have tasks be able to provide copy() methods

When a engine needs to copy a task and possibly
adjust its listeners to execute it elsewhere it
needs to be able to clone that object and have
the clone retain different properties than the
initial object; so in order to support this at
a top-level we require a new copy() method which
a task can override (or it can just use the default
implementation if it chooses to).

Part of blueprint process-executor

Change-Id: Ib29a0afdc01973eb94d41af18a9b04601cd2f152
This commit is contained in:
Joshua Harlow
2014-09-12 18:49:29 -07:00
committed by Joshua Harlow
parent e978eca363
commit dc4262e587
2 changed files with 42 additions and 0 deletions

View File

@@ -18,6 +18,7 @@
import abc
import collections
import contextlib
import copy
import logging
import six
@@ -133,6 +134,24 @@ class BaseTask(atom.Atom):
This works the same as :meth:`.post_execute`, but for the revert phase.
"""
def copy(self, retain_listeners=True):
"""Clone/copy this task.
:param retain_listeners: retain the attached listeners when cloning,
when false the listeners will be emptied, when
true the listeners will be copied and retained
:rtype: task
:return: the copied task
"""
c = copy.copy(self)
c._events_listeners = c._events_listeners.copy()
c._events_listeners.clear()
if retain_listeners:
for event_name, listeners in six.iteritems(self._events_listeners):
c._events_listeners[event_name] = listeners[:]
return c
def update_progress(self, progress, **kwargs):
"""Update task progress and notify all registered listeners.

View File

@@ -276,6 +276,29 @@ class TaskTest(test.TestCase):
task = MyTask()
self.assertRaises(ValueError, task.bind, 'update_progress', 2)
def test_copy_no_listeners(self):
handler1 = lambda: None
a_task = MyTask()
a_task.bind(task.EVENT_UPDATE_PROGRESS, handler1)
b_task = a_task.copy(retain_listeners=False)
self.assertEqual(len(list(a_task.listeners_iter())), 1)
self.assertEqual(len(list(b_task.listeners_iter())), 0)
def test_copy_listeners(self):
handler1 = lambda: None
handler2 = lambda: None
a_task = MyTask()
a_task.bind(task.EVENT_UPDATE_PROGRESS, handler1)
b_task = a_task.copy()
self.assertEqual(len(list(b_task.listeners_iter())), 1)
self.assertTrue(a_task.unbind(task.EVENT_UPDATE_PROGRESS))
self.assertEqual(len(list(a_task.listeners_iter())), 0)
self.assertEqual(len(list(b_task.listeners_iter())), 1)
b_task.bind(task.EVENT_UPDATE_PROGRESS, handler2)
listeners = dict(list(b_task.listeners_iter()))
self.assertEqual(len(listeners[task.EVENT_UPDATE_PROGRESS]), 2)
self.assertEqual(len(list(a_task.listeners_iter())), 0)
class FunctorTaskTest(test.TestCase):