diff --git a/doc/source/notifications.rst b/doc/source/notifications.rst
index aaa5c0649..3b8575067 100644
--- a/doc/source/notifications.rst
+++ b/doc/source/notifications.rst
@@ -168,3 +168,8 @@ Timing listener
 .. autoclass:: taskflow.listeners.timing.TimingListener
 
 .. autoclass:: taskflow.listeners.timing.PrintingTimingListener
+
+Claim listener
+--------------
+
+.. autoclass:: taskflow.listeners.claims.CheckingClaimListener
diff --git a/taskflow/listeners/claims.py b/taskflow/listeners/claims.py
new file mode 100644
index 000000000..3fbc15d19
--- /dev/null
+++ b/taskflow/listeners/claims.py
@@ -0,0 +1,100 @@
+# -*- coding: utf-8 -*-
+
+#    Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+from __future__ import absolute_import
+
+import logging
+
+import six
+
+from taskflow import exceptions
+from taskflow.listeners import base
+from taskflow import states
+
+LOG = logging.getLogger(__name__)
+
+
+class CheckingClaimListener(base.ListenerBase):
+    """Listener that ensures a job's claim stays valid while an engine runs.
+
+    This listener (or a derivative) can be associated with an engine's
+    notification system after the job has been claimed (so that the job's
+    work can be worked on by that engine). Once associated, this listener
+    will check that the job is still claimed *whenever* the engine notifies
+    of a task or flow state change. If the job is not claimed when a state
+    change occurs, an associated handler (or the default) will be activated
+    to determine how to react to this *hopefully* exceptional case.
+
+    NOTE(harlowja): this may create more traffic than desired to the
+    jobboard backend (zookeeper or other), since the amount of state change
+    per task and flow is non-zero (and checking during each state change will
+    result in quite a few calls to that management system to check the job's
+    claim status); this could later be optimized to check less often (or only
+    check on a smaller set of states).
+
+    NOTE(harlowja): if a custom ``on_job_loss`` callback is provided it must
+    accept three positional arguments, the first being the current engine
+    being run, the second being the 'task/flow' state and the third being the
+    details that were sent from the engine to listeners for inspection.
+ """ + + def __init__(self, engine, job, board, owner, on_job_loss=None): + super(CheckingClaimListener, self).__init__(engine) + self._job = job + self._board = board + self._owner = owner + if on_job_loss is None: + self._on_job_loss = self._suspend_engine_on_loss + else: + if not six.callable(on_job_loss): + raise ValueError("Custom 'on_job_loss' handler must be" + " callable") + self._on_job_loss = on_job_loss + + def _suspend_engine_on_loss(self, engine, state, details): + """The default strategy for handling claims being lost.""" + try: + engine.suspend() + except exceptions.TaskFlowException as e: + LOG.warn("Failed suspending engine '%s', (previously owned by" + " '%s'):\n%s", engine, self._owner, e.pformat()) + + def _flow_receiver(self, state, details): + self._claim_checker(state, details) + + def _task_receiver(self, state, details): + self._claim_checker(state, details) + + def _has_been_lost(self): + try: + job_state = self._job.state + job_owner = self._board.find_owner(self._job) + except (exceptions.NotFound, exceptions.JobFailure): + return True + else: + if job_state == states.UNCLAIMED or self._owner != job_owner: + return True + else: + return False + + def _claim_checker(self, state, details): + if not self._has_been_lost(): + LOG.debug("Job '%s' is still claimed (actively owned by '%s')", + self._job, self._owner) + else: + LOG.warn("Job '%s' has lost its claim (previously owned by '%s')", + self._job, self._owner) + self._on_job_loss(self._engine, state, details) diff --git a/taskflow/tests/unit/test_listeners.py b/taskflow/tests/unit/test_listeners.py index 6ba97d6dd..210fe7982 100644 --- a/taskflow/tests/unit/test_listeners.py +++ b/taskflow/tests/unit/test_listeners.py @@ -18,18 +18,27 @@ import contextlib import logging import time +from oslo.serialization import jsonutils +import six +from zake import fake_client + import taskflow.engines from taskflow import exceptions as exc +from taskflow.jobs import backends as jobs +from taskflow.listeners import claims from taskflow.listeners import logging as logging_listeners from taskflow.listeners import timing from taskflow.patterns import linear_flow as lf from taskflow.persistence.backends import impl_memory +from taskflow import states from taskflow import task from taskflow import test from taskflow.test import mock from taskflow.tests import utils as test_utils +from taskflow.utils import misc from taskflow.utils import persistence_utils from taskflow.utils import reflection +from taskflow.utils import threading_utils _LOG_LEVELS = frozenset([ @@ -64,6 +73,135 @@ class EngineMakerMixin(object): return e +class TestClaimListener(test.TestCase, EngineMakerMixin): + def _make_dummy_flow(self, count): + f = lf.Flow('root') + for i in range(0, count): + f.add(test_utils.ProvidesRequiresTask('%s_test' % i, [], [])) + return f + + def setUp(self): + super(TestClaimListener, self).setUp() + self.client = fake_client.FakeClient() + self.addCleanup(self.client.stop) + self.board = jobs.fetch('test', 'zookeeper', client=self.client) + self.addCleanup(self.board.close) + self.board.connect() + + def _post_claim_job(self, job_name, book=None, details=None): + arrived = threading_utils.Event() + + def set_on_children(children): + if children: + arrived.set() + + self.client.ChildrenWatch("/taskflow", set_on_children) + job = self.board.post('test-1') + + # Make sure it arrived and claimed before doing further work... 
+        self.assertTrue(arrived.wait(test_utils.WAIT_TIMEOUT))
+        arrived.clear()
+        self.board.claim(job, self.board.name)
+        self.assertTrue(arrived.wait(test_utils.WAIT_TIMEOUT))
+        self.assertEqual(states.CLAIMED, job.state)
+
+        return job
+
+    def _destroy_locks(self):
+        children = self.client.storage.get_children("/taskflow",
+                                                     only_direct=False)
+        removed = 0
+        for p, data in six.iteritems(children):
+            if p.endswith(".lock"):
+                self.client.storage.pop(p)
+                removed += 1
+        return removed
+
+    def _change_owner(self, new_owner):
+        children = self.client.storage.get_children("/taskflow",
+                                                     only_direct=False)
+        altered = 0
+        for p, data in six.iteritems(children):
+            if p.endswith(".lock"):
+                self.client.set(p, misc.binary_encode(
+                    jsonutils.dumps({'owner': new_owner})))
+                altered += 1
+        return altered
+
+    def test_bad_create(self):
+        job = self._post_claim_job('test')
+        f = self._make_dummy_flow(10)
+        e = self._make_engine(f)
+        self.assertRaises(ValueError, claims.CheckingClaimListener,
+                          e, job, self.board, self.board.name,
+                          on_job_loss=1)
+
+    def test_claim_lost_suspended(self):
+        job = self._post_claim_job('test')
+        f = self._make_dummy_flow(10)
+        e = self._make_engine(f)
+
+        try_destroy = True
+        ran_states = []
+        with claims.CheckingClaimListener(e, job,
+                                          self.board, self.board.name):
+            for state in e.run_iter():
+                ran_states.append(state)
+                if state == states.SCHEDULING and try_destroy:
+                    try_destroy = bool(self._destroy_locks())
+
+        self.assertEqual(states.SUSPENDED, e.storage.get_flow_state())
+        self.assertEqual(1, ran_states.count(states.ANALYZING))
+        self.assertEqual(1, ran_states.count(states.SCHEDULING))
+        self.assertEqual(1, ran_states.count(states.WAITING))
+
+    def test_claim_lost_custom_handler(self):
+        job = self._post_claim_job('test')
+        f = self._make_dummy_flow(10)
+        e = self._make_engine(f)
+
+        handler = mock.MagicMock()
+        ran_states = []
+        try_destroy = True
+        destroyed_at = -1
+        with claims.CheckingClaimListener(e, job, self.board,
+                                          self.board.name,
+                                          on_job_loss=handler):
+            for i, state in enumerate(e.run_iter()):
+                ran_states.append(state)
+                if state == states.SCHEDULING and try_destroy:
+                    destroyed = bool(self._destroy_locks())
+                    if destroyed:
+                        destroyed_at = i
+                        try_destroy = False
+
+        self.assertTrue(handler.called)
+        self.assertEqual(10, ran_states.count(states.SCHEDULING))
+        self.assertNotEqual(-1, destroyed_at)
+
+        after_states = ran_states[destroyed_at:]
+        self.assertGreater(len(after_states), 0)
+
+    def test_claim_lost_new_owner(self):
+        job = self._post_claim_job('test')
+        f = self._make_dummy_flow(10)
+        e = self._make_engine(f)
+
+        change_owner = True
+        ran_states = []
+        with claims.CheckingClaimListener(e, job,
+                                          self.board, self.board.name):
+            for state in e.run_iter():
+                ran_states.append(state)
+                if state == states.SCHEDULING and change_owner:
+                    change_owner = bool(self._change_owner('test-2'))
+
+        self.assertEqual(states.SUSPENDED, e.storage.get_flow_state())
+        self.assertEqual(1, ran_states.count(states.ANALYZING))
+        self.assertEqual(1, ran_states.count(states.SCHEDULING))
+        self.assertEqual(1, ran_states.count(states.WAITING))
+
+
 class TestTimingListener(test.TestCase, EngineMakerMixin):
     def test_duration(self):
         with contextlib.closing(impl_memory.MemoryBackend()) as be:
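A minimal usage sketch (not part of the patch above, and mirroring the zake fake-client setup used in the tests): the listener wraps an engine that is executing the work of a claimed job and, with the default handler, suspends that engine if the claim disappears. The board name 'test', job name 'example-job', task name 'unit' and single-task flow below are illustrative assumptions.

    from zake import fake_client

    from taskflow import engines
    from taskflow.jobs import backends as job_backends
    from taskflow.listeners import claims
    from taskflow.patterns import linear_flow as lf
    from taskflow.tests import utils as test_utils

    # Assumed/illustrative setup: an in-memory (zake) zookeeper client
    # backing a jobboard, a posted + claimed job and a tiny one-task flow.
    client = fake_client.FakeClient()
    board = job_backends.fetch('test', 'zookeeper', client=client)
    board.connect()

    job = board.post('example-job')
    board.claim(job, board.name)

    flow = lf.Flow('example')
    flow.add(test_utils.ProvidesRequiresTask('unit', [], []))
    engine = engines.load(flow)

    # While the engine runs, every task/flow state change re-checks the
    # job's claim; if the claim has been lost the default handler suspends
    # the engine (a custom on_job_loss callback could be passed instead).
    with claims.CheckingClaimListener(engine, job, board, board.name):
        engine.run()

This is the same wiring the TestClaimListener tests above exercise, except that the tests deliberately destroy or re-own the claim's lock mid-run to provoke the loss handling.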