Merge "Add a claims listener that connects job claims to engines"
commit c714e29c28
@@ -168,3 +168,8 @@ Timing listener

.. autoclass:: taskflow.listeners.timing.TimingListener

.. autoclass:: taskflow.listeners.timing.PrintingTimingListener

Claim listener
--------------

.. autoclass:: taskflow.listeners.claims.CheckingClaimListener
taskflow/listeners/claims.py (new file, 100 lines)
@@ -0,0 +1,100 @@
# -*- coding: utf-8 -*-

# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from __future__ import absolute_import

import logging

import six

from taskflow import exceptions
from taskflow.listeners import base
from taskflow import states

LOG = logging.getLogger(__name__)


class CheckingClaimListener(base.ListenerBase):
    """Listener that ensures a running engine's job claim remains valid.

    This listener (or a derivative) can be associated with an engine's
    notification system after a job has been claimed (so that the job's work
    can be performed by that engine). Once associated, the listener will
    check that the job is still claimed *whenever* the engine notifies of a
    task or flow state change. If the job is not claimed when a state change
    occurs, an associated handler (or the default one) is activated to
    determine how to react to this *hopefully* exceptional case.

    NOTE(harlowja): this may create more traffic than desired to the
    jobboard backend (zookeeper or other), since the number of state changes
    per task and flow is non-zero (and checking during each state change will
    result in quite a few calls to that management system to check the job's
    claim status); this could later be optimized to check less often (or to
    only check on a smaller set of states).

    NOTE(harlowja): if a custom ``on_job_loss`` callback is provided it must
    accept three positional arguments: the engine being run, the 'task/flow'
    state and the details that were sent from the engine to its listeners
    for inspection.
    """

    def __init__(self, engine, job, board, owner, on_job_loss=None):
        super(CheckingClaimListener, self).__init__(engine)
        self._job = job
        self._board = board
        self._owner = owner
        if on_job_loss is None:
            self._on_job_loss = self._suspend_engine_on_loss
        else:
            if not six.callable(on_job_loss):
                raise ValueError("Custom 'on_job_loss' handler must be"
                                 " callable")
            self._on_job_loss = on_job_loss

    def _suspend_engine_on_loss(self, engine, state, details):
        """The default strategy for handling claims being lost."""
        try:
            engine.suspend()
        except exceptions.TaskFlowException as e:
            LOG.warn("Failed suspending engine '%s', (previously owned by"
                     " '%s'):\n%s", engine, self._owner, e.pformat())

    def _flow_receiver(self, state, details):
        self._claim_checker(state, details)

    def _task_receiver(self, state, details):
        self._claim_checker(state, details)

    def _has_been_lost(self):
        try:
            job_state = self._job.state
            job_owner = self._board.find_owner(self._job)
        except (exceptions.NotFound, exceptions.JobFailure):
            return True
        else:
            if job_state == states.UNCLAIMED or self._owner != job_owner:
                return True
            else:
                return False

    def _claim_checker(self, state, details):
        if not self._has_been_lost():
            LOG.debug("Job '%s' is still claimed (actively owned by '%s')",
                      self._job, self._owner)
        else:
            LOG.warn("Job '%s' has lost its claim (previously owned by '%s')",
                     self._job, self._owner)
            self._on_job_loss(self._engine, state, details)
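
As a usage illustration (a minimal sketch, not part of the diff above): the listener is meant to wrap an engine run after the corresponding job has been claimed, as the tests below do. The names 'flow', 'job' and 'board' are placeholders for objects the application already has, and 'note_loss' is a hypothetical handler shown only to illustrate the expected callback signature.

import taskflow.engines
from taskflow.listeners import claims

# Assumed to already exist: 'flow' (any taskflow flow) plus a 'job' that was
# posted to a jobboard 'board' and claimed there under the name board.name.
engine = taskflow.engines.load(flow)

# Default behaviour: if the claim is lost mid-run the listener suspends the
# engine (see _suspend_engine_on_loss above).
with claims.CheckingClaimListener(engine, job, board, board.name):
    engine.run()


# A custom handler must accept (engine, state, details); this hypothetical
# one just reports the loss instead of suspending.
def note_loss(engine, state, details):
    print("Claim was lost while in state: %s" % state)


with claims.CheckingClaimListener(engine, job, board, board.name,
                                  on_job_loss=note_loss):
    engine.run()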

@@ -18,18 +18,27 @@ import contextlib
import logging
import time

from oslo.serialization import jsonutils
import six
from zake import fake_client

import taskflow.engines
from taskflow import exceptions as exc
from taskflow.jobs import backends as jobs
from taskflow.listeners import claims
from taskflow.listeners import logging as logging_listeners
from taskflow.listeners import timing
from taskflow.patterns import linear_flow as lf
from taskflow.persistence.backends import impl_memory
from taskflow import states
from taskflow import task
from taskflow import test
from taskflow.test import mock
from taskflow.tests import utils as test_utils
from taskflow.utils import misc
from taskflow.utils import persistence_utils
from taskflow.utils import reflection
from taskflow.utils import threading_utils


_LOG_LEVELS = frozenset([
@@ -64,6 +73,135 @@ class EngineMakerMixin(object):
        return e


class TestClaimListener(test.TestCase, EngineMakerMixin):
    def _make_dummy_flow(self, count):
        f = lf.Flow('root')
        for i in range(0, count):
            f.add(test_utils.ProvidesRequiresTask('%s_test' % i, [], []))
        return f

    def setUp(self):
        super(TestClaimListener, self).setUp()
        self.client = fake_client.FakeClient()
        self.addCleanup(self.client.stop)
        self.board = jobs.fetch('test', 'zookeeper', client=self.client)
        self.addCleanup(self.board.close)
        self.board.connect()

    def _post_claim_job(self, job_name, book=None, details=None):
        arrived = threading_utils.Event()

        def set_on_children(children):
            if children:
                arrived.set()

        self.client.ChildrenWatch("/taskflow", set_on_children)
        job = self.board.post('test-1')

        # Make sure it arrived and was claimed before doing further work...
        self.assertTrue(arrived.wait(test_utils.WAIT_TIMEOUT))
        arrived.clear()
        self.board.claim(job, self.board.name)
        self.assertTrue(arrived.wait(test_utils.WAIT_TIMEOUT))
        self.assertEqual(states.CLAIMED, job.state)

        return job
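
    # The children watch registered above fires whenever the direct children
    # of "/taskflow" change: first when the posted job's node shows up and
    # again, after clear(), once claiming creates the job's ".lock" node, so
    # this helper only returns after the claim is visible to the backend.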

    def _destroy_locks(self):
        children = self.client.storage.get_children("/taskflow",
                                                    only_direct=False)
        removed = 0
        for p, data in six.iteritems(children):
            if p.endswith(".lock"):
                self.client.storage.pop(p)
                removed += 1
        return removed

    def _change_owner(self, new_owner):
        children = self.client.storage.get_children("/taskflow",
                                                    only_direct=False)
        altered = 0
        for p, data in six.iteritems(children):
            if p.endswith(".lock"):
                self.client.set(p, misc.binary_encode(
                    jsonutils.dumps({'owner': new_owner})))
                altered += 1
        return altered
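
    # Both helpers above simulate a claim being lost while the engine runs:
    # _destroy_locks() removes the ".lock" nodes so the board can no longer
    # find an owner for the job, while _change_owner() rewrites the lock data
    # with a different owner name; either way CheckingClaimListener's
    # _has_been_lost() check then reports the claim as gone.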

    def test_bad_create(self):
        job = self._post_claim_job('test')
        f = self._make_dummy_flow(10)
        e = self._make_engine(f)
        self.assertRaises(ValueError, claims.CheckingClaimListener,
                          e, job, self.board, self.board.name,
                          on_job_loss=1)

    def test_claim_lost_suspended(self):
        job = self._post_claim_job('test')
        f = self._make_dummy_flow(10)
        e = self._make_engine(f)

        try_destroy = True
        ran_states = []
        with claims.CheckingClaimListener(e, job,
                                          self.board, self.board.name):
            for state in e.run_iter():
                ran_states.append(state)
                if state == states.SCHEDULING and try_destroy:
                    try_destroy = bool(self._destroy_locks())

        self.assertEqual(states.SUSPENDED, e.storage.get_flow_state())
        self.assertEqual(1, ran_states.count(states.ANALYZING))
        self.assertEqual(1, ran_states.count(states.SCHEDULING))
        self.assertEqual(1, ran_states.count(states.WAITING))

    def test_claim_lost_custom_handler(self):
        job = self._post_claim_job('test')
        f = self._make_dummy_flow(10)
        e = self._make_engine(f)

        handler = mock.MagicMock()
        ran_states = []
        try_destroy = True
        destroyed_at = -1
        with claims.CheckingClaimListener(e, job, self.board,
                                          self.board.name,
                                          on_job_loss=handler):
            for i, state in enumerate(e.run_iter()):
                ran_states.append(state)
                if state == states.SCHEDULING and try_destroy:
                    destroyed = bool(self._destroy_locks())
                    if destroyed:
                        destroyed_at = i
                        try_destroy = False

        self.assertTrue(handler.called)
        self.assertEqual(10, ran_states.count(states.SCHEDULING))
        self.assertNotEqual(-1, destroyed_at)

        after_states = ran_states[destroyed_at:]
        self.assertGreater(len(after_states), 0)

    def test_claim_lost_new_owner(self):
        job = self._post_claim_job('test')
        f = self._make_dummy_flow(10)
        e = self._make_engine(f)

        change_owner = True
        ran_states = []
        with claims.CheckingClaimListener(e, job,
                                          self.board, self.board.name):
            for state in e.run_iter():
                ran_states.append(state)
                if state == states.SCHEDULING and change_owner:
                    change_owner = bool(self._change_owner('test-2'))

        self.assertEqual(states.SUSPENDED, e.storage.get_flow_state())
        self.assertEqual(1, ran_states.count(states.ANALYZING))
        self.assertEqual(1, ran_states.count(states.SCHEDULING))
        self.assertEqual(1, ran_states.count(states.WAITING))


class TestTimingListener(test.TestCase, EngineMakerMixin):
    def test_duration(self):
        with contextlib.closing(impl_memory.MemoryBackend()) as be: