From 1de8bbd8382ecda419e9d7753c2be5dabdbfcbd6 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Mon, 24 Nov 2014 11:50:28 -0800 Subject: [PATCH] Add a claims listener that connects job claims to engines To make it easily possible to stop running a engine that was created from a job, add a claims listener that will be called on state changes that an engine progresses through. During those state changes the jobboard will be queried to determine if the job is still claimed by the respective owner; if not the engine will be suspended and further work will stop. Change-Id: I8bbc6a3e03746ba0a7c74139cf9e230631d80d8f --- doc/source/notifications.rst | 5 + taskflow/listeners/claims.py | 100 +++++++++++++++++++ taskflow/tests/unit/test_listeners.py | 138 ++++++++++++++++++++++++++ 3 files changed, 243 insertions(+) create mode 100644 taskflow/listeners/claims.py diff --git a/doc/source/notifications.rst b/doc/source/notifications.rst index aaa5c0649..3b8575067 100644 --- a/doc/source/notifications.rst +++ b/doc/source/notifications.rst @@ -168,3 +168,8 @@ Timing listener .. autoclass:: taskflow.listeners.timing.TimingListener .. autoclass:: taskflow.listeners.timing.PrintingTimingListener + +Claim listener +-------------- + +.. autoclass:: taskflow.listeners.claims.CheckingClaimListener diff --git a/taskflow/listeners/claims.py b/taskflow/listeners/claims.py new file mode 100644 index 000000000..3fbc15d19 --- /dev/null +++ b/taskflow/listeners/claims.py @@ -0,0 +1,100 @@ +# -*- coding: utf-8 -*- + +# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from __future__ import absolute_import + +import logging + +import six + +from taskflow import exceptions +from taskflow.listeners import base +from taskflow import states + +LOG = logging.getLogger(__name__) + + +class CheckingClaimListener(base.ListenerBase): + """Listener that interacts [engine, job, jobboard]; ensures claim is valid. + + This listener (or a derivative) can be associated with an engines + notification system after the job has been claimed (so that the jobs work + can be worked on by that engine). This listener (after associated) will + check that the job is still claimed *whenever* the engine notifies of a + task or flow state change. If the job is not claimed when a state change + occurs, a associated handler (or the default) will be activated to + determine how to react to this *hopefully* exceptional case. + + NOTE(harlowja): this may create more traffic than desired to the + jobboard backend (zookeeper or other), since the amount of state change + per task and flow is non-zero (and checking during each state change will + result in quite a few calls to that management system to check the jobs + claim status); this could be later optimized to check less (or only check + on a smaller set of states) + + NOTE(harlowja): if a custom ``on_job_loss`` callback is provided it must + accept three positional arguments, the first being the current engine being + ran, the second being the 'task/flow' state and the third being the details + that were sent from the engine to listeners for inspection. + """ + + def __init__(self, engine, job, board, owner, on_job_loss=None): + super(CheckingClaimListener, self).__init__(engine) + self._job = job + self._board = board + self._owner = owner + if on_job_loss is None: + self._on_job_loss = self._suspend_engine_on_loss + else: + if not six.callable(on_job_loss): + raise ValueError("Custom 'on_job_loss' handler must be" + " callable") + self._on_job_loss = on_job_loss + + def _suspend_engine_on_loss(self, engine, state, details): + """The default strategy for handling claims being lost.""" + try: + engine.suspend() + except exceptions.TaskFlowException as e: + LOG.warn("Failed suspending engine '%s', (previously owned by" + " '%s'):\n%s", engine, self._owner, e.pformat()) + + def _flow_receiver(self, state, details): + self._claim_checker(state, details) + + def _task_receiver(self, state, details): + self._claim_checker(state, details) + + def _has_been_lost(self): + try: + job_state = self._job.state + job_owner = self._board.find_owner(self._job) + except (exceptions.NotFound, exceptions.JobFailure): + return True + else: + if job_state == states.UNCLAIMED or self._owner != job_owner: + return True + else: + return False + + def _claim_checker(self, state, details): + if not self._has_been_lost(): + LOG.debug("Job '%s' is still claimed (actively owned by '%s')", + self._job, self._owner) + else: + LOG.warn("Job '%s' has lost its claim (previously owned by '%s')", + self._job, self._owner) + self._on_job_loss(self._engine, state, details) diff --git a/taskflow/tests/unit/test_listeners.py b/taskflow/tests/unit/test_listeners.py index 6ba97d6dd..210fe7982 100644 --- a/taskflow/tests/unit/test_listeners.py +++ b/taskflow/tests/unit/test_listeners.py @@ -18,18 +18,27 @@ import contextlib import logging import time +from oslo.serialization import jsonutils +import six +from zake import fake_client + import taskflow.engines from taskflow import exceptions as exc +from taskflow.jobs import backends as jobs +from taskflow.listeners import claims from taskflow.listeners import logging as logging_listeners from taskflow.listeners import timing from taskflow.patterns import linear_flow as lf from taskflow.persistence.backends import impl_memory +from taskflow import states from taskflow import task from taskflow import test from taskflow.test import mock from taskflow.tests import utils as test_utils +from taskflow.utils import misc from taskflow.utils import persistence_utils from taskflow.utils import reflection +from taskflow.utils import threading_utils _LOG_LEVELS = frozenset([ @@ -64,6 +73,135 @@ class EngineMakerMixin(object): return e +class TestClaimListener(test.TestCase, EngineMakerMixin): + def _make_dummy_flow(self, count): + f = lf.Flow('root') + for i in range(0, count): + f.add(test_utils.ProvidesRequiresTask('%s_test' % i, [], [])) + return f + + def setUp(self): + super(TestClaimListener, self).setUp() + self.client = fake_client.FakeClient() + self.addCleanup(self.client.stop) + self.board = jobs.fetch('test', 'zookeeper', client=self.client) + self.addCleanup(self.board.close) + self.board.connect() + + def _post_claim_job(self, job_name, book=None, details=None): + arrived = threading_utils.Event() + + def set_on_children(children): + if children: + arrived.set() + + self.client.ChildrenWatch("/taskflow", set_on_children) + job = self.board.post('test-1') + + # Make sure it arrived and claimed before doing further work... + self.assertTrue(arrived.wait(test_utils.WAIT_TIMEOUT)) + arrived.clear() + self.board.claim(job, self.board.name) + self.assertTrue(arrived.wait(test_utils.WAIT_TIMEOUT)) + self.assertEqual(states.CLAIMED, job.state) + + return job + + def _destroy_locks(self): + children = self.client.storage.get_children("/taskflow", + only_direct=False) + removed = 0 + for p, data in six.iteritems(children): + if p.endswith(".lock"): + self.client.storage.pop(p) + removed += 1 + return removed + + def _change_owner(self, new_owner): + children = self.client.storage.get_children("/taskflow", + only_direct=False) + altered = 0 + for p, data in six.iteritems(children): + if p.endswith(".lock"): + self.client.set(p, misc.binary_encode( + jsonutils.dumps({'owner': new_owner}))) + altered += 1 + return altered + + def test_bad_create(self): + job = self._post_claim_job('test') + f = self._make_dummy_flow(10) + e = self._make_engine(f) + self.assertRaises(ValueError, claims.CheckingClaimListener, + e, job, self.board, self.board.name, + on_job_loss=1) + + def test_claim_lost_suspended(self): + job = self._post_claim_job('test') + f = self._make_dummy_flow(10) + e = self._make_engine(f) + + try_destroy = True + ran_states = [] + with claims.CheckingClaimListener(e, job, + self.board, self.board.name): + for state in e.run_iter(): + ran_states.append(state) + if state == states.SCHEDULING and try_destroy: + try_destroy = bool(self._destroy_locks()) + + self.assertEqual(states.SUSPENDED, e.storage.get_flow_state()) + self.assertEqual(1, ran_states.count(states.ANALYZING)) + self.assertEqual(1, ran_states.count(states.SCHEDULING)) + self.assertEqual(1, ran_states.count(states.WAITING)) + + def test_claim_lost_custom_handler(self): + job = self._post_claim_job('test') + f = self._make_dummy_flow(10) + e = self._make_engine(f) + + handler = mock.MagicMock() + ran_states = [] + try_destroy = True + destroyed_at = -1 + with claims.CheckingClaimListener(e, job, self.board, + self.board.name, + on_job_loss=handler): + for i, state in enumerate(e.run_iter()): + ran_states.append(state) + if state == states.SCHEDULING and try_destroy: + destroyed = bool(self._destroy_locks()) + if destroyed: + destroyed_at = i + try_destroy = False + + self.assertTrue(handler.called) + self.assertEqual(10, ran_states.count(states.SCHEDULING)) + self.assertNotEqual(-1, destroyed_at) + + after_states = ran_states[destroyed_at:] + self.assertGreater(0, len(after_states)) + + def test_claim_lost_new_owner(self): + job = self._post_claim_job('test') + f = self._make_dummy_flow(10) + e = self._make_engine(f) + + change_owner = True + ran_states = [] + with claims.CheckingClaimListener(e, job, + self.board, self.board.name): + for state in e.run_iter(): + ran_states.append(state) + if state == states.SCHEDULING and change_owner: + change_owner = bool(self._change_owner('test-2')) + + self.assertEqual(states.SUSPENDED, e.storage.get_flow_state()) + self.assertEqual(1, ran_states.count(states.ANALYZING)) + self.assertEqual(1, ran_states.count(states.SCHEDULING)) + self.assertEqual(1, ran_states.count(states.WAITING)) + + class TestTimingListener(test.TestCase, EngineMakerMixin): def test_duration(self): with contextlib.closing(impl_memory.MemoryBackend()) as be: