Files
deb-python-taskflow/taskflow/tests/unit/jobs/base.py
Joshua Harlow 125d015d92 Cleanup zookeeper integration testing
In the persistence tests only use zake when zookeeper is
not available (or is not of the right version). When zookeeper is
available skip running zake.

In the jobboard tests split out the tests which are not
specific to zake into a base class (allowing for a future
commit to add a zookeeper integration test).

Change-Id: I50d51639a7f6c03c29d559c485676fddb9a7cf20
2014-05-09 20:39:05 -07:00

274 lines
9.7 KiB
Python

# -*- coding: utf-8 -*-
# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import contextlib
import mock
import threading
import time
from kazoo.recipe import watchers
from taskflow import exceptions as excp
from taskflow.openstack.common import uuidutils
from taskflow.persistence.backends import impl_dir
from taskflow import states
from taskflow.utils import misc
from taskflow.utils import persistence_utils as p_utils
def _close_all(resources):
    """Call ``close()`` on each resource in order, never skipping one.

    If a ``close()`` call raises, the remaining resources are still closed
    before the failure propagates. When several calls raise, the failure
    raised last is the one that propagates.

    :param resources: sequence of objects exposing a ``close()`` method
    """
    if not resources:
        return
    try:
        resources[0].close()
    finally:
        # Recursing from the ``finally`` clause is what guarantees the rest
        # of the resources get closed even when the call above raised.
        _close_all(resources[1:])


@contextlib.contextmanager
def connect_close(*args):
    """Connect every argument on entry and close every argument on exit.

    Each argument must expose ``connect()`` and ``close()``. All arguments
    are closed on exit, including when a ``connect()`` call or the managed
    body raised, and including when closing an earlier argument failed.

    :param args: objects to connect (in order) and later close (in order)
    """
    try:
        for a in args:
            a.connect()
        yield
    finally:
        _close_all(args)
@contextlib.contextmanager
def flush(client, path=None):
    """Run the managed body between an observed node creation and deletion.

    A scratch node is created at ``path`` and control is only yielded once a
    data watch has reported that the node exists. When the body finishes
    (normally or not) the node is deleted and this blocks until a second
    data watch has reported that the node is gone.

    :param client: client handed to ``watchers.DataWatch`` and used to
                   create and delete the scratch node
    :param path: node path to use; when not given (or empty) a unique
                 ``/tmp-<uuid>`` path is generated
    """
    # The point of the scratch node: relying on the linearity guarantee of
    # zookeeper (and associated libraries), seeing the node's deletion means
    # the operations performed by the managed body have been applied and
    # their watchers have fired before this context manager exits.
    path = path or "/tmp-%s" % uuidutils.generate_uuid()
    appeared = threading.Event()
    vanished = threading.Event()

    def _node_appeared(data, stat):
        if stat is None:
            # Nothing there yet, keep watching.
            return
        appeared.set()
        return False  # cause this watcher to cease to exist

    def _node_vanished(data, stat):
        if stat is not None:
            # Still there, keep watching.
            return
        vanished.set()
        return False  # cause this watcher to cease to exist

    # The watch is registered before the node is created.
    watchers.DataWatch(client, path, func=_node_appeared)
    client.create(path)
    appeared.wait()
    try:
        yield
    finally:
        watchers.DataWatch(client, path, func=_node_vanished)
        client.delete(path, recursive=True)
        vanished.wait()
class BoardTestMixin(object):
    """Jobboard test cases meant to be mixed into a concrete test class.

    The methods here use attributes and helpers that this class does not
    define, so the class it is mixed into has to provide them:
    ``self.board``, ``self.client``, ``self._create_board()``,
    ``self.makeTmpDir()``, ``self.addCleanup()``,
    ``self.assertRaisesAttrAccess()`` and the usual ``assert*`` methods.
    """

    def test_connect(self):
        """The board reports ``connected`` only once it has been connected."""
        self.assertFalse(self.board.connected)
        with connect_close(self.board):
            self.assertTrue(self.board.connected)

    @mock.patch("taskflow.jobs.backends.impl_zookeeper.misc."
                "millis_to_datetime")
    def test_posting_dates(self, mock_dt):
        """A posted job's dates are whatever ``millis_to_datetime`` returns."""
        # NOTE(review): if ``impl_zookeeper.misc`` is the same module object
        # as the ``misc`` imported by this file, this call already goes
        # through the patched function, which would make ``epoch`` a mock
        # return value and the final ``called`` check trivially true -
        # confirm.
        epoch = misc.millis_to_datetime(0)
        mock_dt.return_value = epoch

        with connect_close(self.board):
            j = self.board.post('test', p_utils.temporary_log_book())
            self.assertEqual(epoch, j.created_on)
            self.assertEqual(epoch, j.last_modified)

        self.assertTrue(mock_dt.called)

    def test_board_iter(self):
        """``iterjobs()`` without arguments is bound to the board, flags off."""
        with connect_close(self.board):
            it = self.board.iterjobs()
            self.assertEqual(it.board, self.board)
            self.assertFalse(it.only_unclaimed)
            self.assertFalse(it.ensure_fresh)

    def test_board_iter_empty(self):
        """Iterating the board when this test posted nothing yields no jobs."""
        with connect_close(self.board):
            jobs_found = list(self.board.iterjobs())
            self.assertEqual([], jobs_found)

    def test_fresh_iter(self):
        """A job is listed by ``iterjobs(ensure_fresh=True)`` after posting.

        No ``flush()`` barrier is used between the post and the iteration.
        """
        with connect_close(self.board):
            book = p_utils.temporary_log_book()
            self.board.post('test', book)
            jobs = list(self.board.iterjobs(ensure_fresh=True))
            self.assertEqual(1, len(jobs))

    def test_wait_timeout(self):
        """``wait()`` raises ``NotFound`` when its timeout expires."""
        with connect_close(self.board):
            self.assertRaises(excp.NotFound, self.board.wait, timeout=0.1)

    def test_wait_arrival(self):
        """``wait()`` hands back a job posted from another thread."""
        ev = threading.Event()
        jobs = []

        def poster(wait_post=0.2):
            ev.wait()  # wait until the waiter is active
            time.sleep(wait_post)
            self.board.post('test', p_utils.temporary_log_book())

        def waiter():
            # The event is set just before ``wait()`` is entered; the sleep
            # in ``poster`` is what gives this thread time to start waiting.
            ev.set()
            it = self.board.wait()
            jobs.extend(it)

        with connect_close(self.board):
            t1 = threading.Thread(target=poster)
            t1.daemon = True
            t1.start()
            t2 = threading.Thread(target=waiter)
            t2.daemon = True
            t2.start()
            # NOTE(review): neither ``join()`` nor ``board.wait()`` is given
            # a timeout here, so if the post in ``poster`` fails this looks
            # like it would block forever instead of failing - confirm and
            # consider bounding the wait.
            for t in (t1, t2):
                t.join()

        self.assertEqual(1, len(jobs))

    def test_posting_claim(self):
        """Post then claim a job, checking state and ownership on the way.

        The last two assertions run after the board has been closed and
        expect ``NotFound`` from both reading the job's state and trying to
        consume it.
        """
        with connect_close(self.board):
            with flush(self.client):
                self.board.post('test', p_utils.temporary_log_book())

            self.assertEqual(1, self.board.job_count)
            possible_jobs = list(self.board.iterjobs(only_unclaimed=True))
            self.assertEqual(1, len(possible_jobs))
            j = possible_jobs[0]
            self.assertEqual(states.UNCLAIMED, j.state)

            with flush(self.client):
                self.board.claim(j, self.board.name)

            self.assertEqual(self.board.name, self.board.find_owner(j))
            self.assertEqual(states.CLAIMED, j.state)

            # A claimed job is no longer offered as unclaimed.
            possible_jobs = list(self.board.iterjobs(only_unclaimed=True))
            self.assertEqual(0, len(possible_jobs))

        self.assertRaisesAttrAccess(excp.NotFound, j, 'state')
        self.assertRaises(excp.NotFound,
                          self.board.consume, j, self.board.name)

    def test_posting_claim_consume(self):
        """Post, claim and consume a job; a second consume raises."""
        with connect_close(self.board):
            with flush(self.client):
                self.board.post('test', p_utils.temporary_log_book())

            possible_jobs = list(self.board.iterjobs(only_unclaimed=True))
            self.assertEqual(1, len(possible_jobs))
            j = possible_jobs[0]
            with flush(self.client):
                self.board.claim(j, self.board.name)

            possible_jobs = list(self.board.iterjobs(only_unclaimed=True))
            self.assertEqual(0, len(possible_jobs))
            with flush(self.client):
                self.board.consume(j, self.board.name)

            # Once consumed the job is gone from the board entirely, and
            # consuming it again is reported as ``NotFound``.
            self.assertEqual(0, len(list(self.board.iterjobs())))
            self.assertRaises(excp.NotFound,
                              self.board.consume, j, self.board.name)

    def test_posting_claim_abandon(self):
        """An abandoned job shows up as unclaimed again."""
        with connect_close(self.board):
            with flush(self.client):
                self.board.post('test', p_utils.temporary_log_book())

            possible_jobs = list(self.board.iterjobs(only_unclaimed=True))
            self.assertEqual(1, len(possible_jobs))
            j = possible_jobs[0]
            with flush(self.client):
                self.board.claim(j, self.board.name)

            possible_jobs = list(self.board.iterjobs(only_unclaimed=True))
            self.assertEqual(0, len(possible_jobs))
            with flush(self.client):
                self.board.abandon(j, self.board.name)

            possible_jobs = list(self.board.iterjobs(only_unclaimed=True))
            self.assertEqual(1, len(possible_jobs))

    def test_posting_claim_diff_owner(self):
        """Claiming an already claimed job under another name is refused."""
        with connect_close(self.board):
            with flush(self.client):
                self.board.post('test', p_utils.temporary_log_book())

            possible_jobs = list(self.board.iterjobs(only_unclaimed=True))
            self.assertEqual(1, len(possible_jobs))
            with flush(self.client):
                self.board.claim(possible_jobs[0], self.board.name)

            # Listed again without the unclaimed filter, since the job is
            # claimed by now.
            possible_jobs = list(self.board.iterjobs())
            self.assertEqual(1, len(possible_jobs))
            self.assertRaises(excp.UnclaimableJob, self.board.claim,
                              possible_jobs[0], self.board.name + "-1")
            possible_jobs = list(self.board.iterjobs(only_unclaimed=True))
            self.assertEqual(0, len(possible_jobs))

    def test_posting_no_post(self):
        """A failing client ``create`` surfaces from ``post()``.

        The board's job count is expected to still be zero afterwards.
        """
        with connect_close(self.board):
            with mock.patch.object(self.client, 'create') as create_func:
                create_func.side_effect = IOError("Unable to post")
                self.assertRaises(IOError, self.board.post,
                                  'test', p_utils.temporary_log_book())
            self.assertEqual(0, self.board.job_count)

    def test_posting_with_book(self):
        """A job posted with a saved book exposes that book and its flow."""
        backend = impl_dir.DirBackend(conf={
            'path': self.makeTmpDir(),
        })
        backend.get_connection().upgrade()
        book, flow_detail = p_utils.temporary_flow_detail(backend)
        self.assertEqual(1, len(book))

        # A dedicated board created with the persistence backend is used
        # here instead of ``self.board``.
        client, board = self._create_board(persistence=backend)
        self.addCleanup(board.close)

        with connect_close(board):
            with flush(client):
                board.post('test', book)

            possible_jobs = list(board.iterjobs(only_unclaimed=True))
            self.assertEqual(1, len(possible_jobs))
            j = possible_jobs[0]
            self.assertEqual(1, len(j.book))
            self.assertEqual(book.name, j.book.name)
            self.assertEqual(book.uuid, j.book.uuid)

            flow_details = list(j.book)
            self.assertEqual(flow_detail.uuid, flow_details[0].uuid)
            self.assertEqual(flow_detail.name, flow_details[0].name)

    def test_posting_abandon_no_owner(self):
        """Abandoning a job that was never claimed raises ``JobFailure``."""
        with connect_close(self.board):
            with flush(self.client):
                self.board.post('test', p_utils.temporary_log_book())

            self.assertEqual(1, self.board.job_count)
            possible_jobs = list(self.board.iterjobs(only_unclaimed=True))
            self.assertEqual(1, len(possible_jobs))
            j = possible_jobs[0]
            self.assertRaises(excp.JobFailure, self.board.abandon, j, j.name)