diff --git a/taskflow/jobs/backends/impl_zookeeper.py b/taskflow/jobs/backends/impl_zookeeper.py index ce8363c2..a4e512be 100644 --- a/taskflow/jobs/backends/impl_zookeeper.py +++ b/taskflow/jobs/backends/impl_zookeeper.py @@ -40,21 +40,6 @@ from taskflow.utils import misc LOG = logging.getLogger(__name__) -UNCLAIMED_JOB_STATES = ( - states.UNCLAIMED, -) -ALL_JOB_STATES = ( - states.UNCLAIMED, - states.COMPLETE, - states.CLAIMED, -) - -# Transaction support was added in 3.4.0 -MIN_ZK_VERSION = (3, 4, 0) -LOCK_POSTFIX = ".lock" -TRASH_FOLDER = ".trash" -JOB_PREFIX = 'job' - def check_who(meth): """Decorator that checks the expected owner type & value restrictions.""" @@ -88,12 +73,12 @@ class ZookeeperJob(base.Job): raise ValueError("Only one of 'book_data' or 'book'" " can be provided") self._path = k_paths.normpath(path) - self._lock_path = self._path + LOCK_POSTFIX + self._lock_path = self._path + board.LOCK_POSTFIX self._created_on = created_on self._node_not_found = False basename = k_paths.basename(self._path) self._root = self._path[0:-len(basename)] - self._sequence = int(basename[len(JOB_PREFIX):]) + self._sequence = int(basename[len(board.JOB_PREFIX):]) self._last_state = None @property @@ -274,6 +259,16 @@ class ZookeeperJobBoardIterator(six.Iterator): over unclaimed jobs. """ + _UNCLAIMED_JOB_STATES = ( + states.UNCLAIMED, + ) + + _JOB_STATES = ( + states.UNCLAIMED, + states.COMPLETE, + states.CLAIMED, + ) + def __init__(self, board, only_unclaimed=False, ensure_fresh=False): self._board = board self._jobs = collections.deque() @@ -290,9 +285,9 @@ class ZookeeperJobBoardIterator(six.Iterator): def _next_job(self): if self.only_unclaimed: - allowed_states = UNCLAIMED_JOB_STATES + allowed_states = self._UNCLAIMED_JOB_STATES else: - allowed_states = ALL_JOB_STATES + allowed_states = self._JOB_STATES job = None while self._jobs and job is None: maybe_job = self._jobs.popleft() @@ -343,6 +338,18 @@ class ZookeeperJobBoard(base.NotifyingJobBoard): automatically by zookeeper the ephemeral is deemed to be lost). """ + #: Transaction support was added in 3.4.0 so we need at least that version. + MIN_ZK_VERSION = (3, 4, 0) + + #: Znode **postfix** that lock entries have. + LOCK_POSTFIX = ".lock" + + #: Znode child path created under root path that contains trashed jobs. + TRASH_FOLDER = ".trash" + + #: Znode **prefix** that job entries have. + JOB_PREFIX = 'job' + def __init__(self, name, conf, client=None, persistence=None, emit_notifications=True): super(ZookeeperJobBoard, self).__init__(name, conf) @@ -359,7 +366,7 @@ class ZookeeperJobBoard(base.NotifyingJobBoard): raise ValueError("Zookeeper path must be absolute") self._path = path self._trash_path = self._path.replace(k_paths.basename(self._path), - TRASH_FOLDER) + self.TRASH_FOLDER) # The backend to load the full logbooks from, since whats sent over # the zookeeper data connection is only the logbook uuid and name, and # not currently the full logbook (later when a zookeeper backend @@ -375,7 +382,7 @@ class ZookeeperJobBoard(base.NotifyingJobBoard): self._job_watcher = None # Since we use sequenced ids this will be the path that the sequences # are prefixed with, for example, job0000000001, job0000000002, ... - self._job_base = k_paths.join(path, JOB_PREFIX) + self._job_base = k_paths.join(path, self.JOB_PREFIX) self._worker = None self._emit_notifications = bool(emit_notifications) self._connected = False @@ -481,7 +488,8 @@ class ZookeeperJobBoard(base.NotifyingJobBoard): LOG.debug("Got children %s under path %s", children, self.path) child_paths = [] for c in children: - if c.endswith(LOCK_POSTFIX) or not c.startswith(JOB_PREFIX): + if (c.endswith(self.LOCK_POSTFIX) or + not c.startswith(self.JOB_PREFIX)): # Skip lock paths or non-job-paths (these are not valid jobs) continue child_paths.append(k_paths.join(self.path, c)) @@ -788,7 +796,7 @@ class ZookeeperJobBoard(base.NotifyingJobBoard): "Failed to connect to zookeeper") try: if self._conf.get('check_compatible', True): - kazoo_utils.check_compatible(self._client, MIN_ZK_VERSION) + kazoo_utils.check_compatible(self._client, self.MIN_ZK_VERSION) if self._worker is None and self._emit_notifications: self._worker = futures.ThreadPoolExecutor(max_workers=1) self._client.ensure_path(self.path) diff --git a/taskflow/tests/unit/jobs/test_zk_job.py b/taskflow/tests/unit/jobs/test_zk_job.py index 17385306..a22a41b9 100644 --- a/taskflow/tests/unit/jobs/test_zk_job.py +++ b/taskflow/tests/unit/jobs/test_zk_job.py @@ -32,11 +32,13 @@ from taskflow.utils import persistence_utils as p_utils TEST_PATH_TPL = '/taskflow/board-test/%s' -_ZOOKEEPER_AVAILABLE = test_utils.zookeeper_available( - impl_zookeeper.MIN_ZK_VERSION) +ZOOKEEPER_AVAILABLE = test_utils.zookeeper_available( + impl_zookeeper.ZookeeperJobBoard.MIN_ZK_VERSION) +TRASH_FOLDER = impl_zookeeper.ZookeeperJobBoard.TRASH_FOLDER +LOCK_POSTFIX = impl_zookeeper.ZookeeperJobBoard.LOCK_POSTFIX -@testtools.skipIf(not _ZOOKEEPER_AVAILABLE, 'zookeeper is not available') +@testtools.skipIf(not ZOOKEEPER_AVAILABLE, 'zookeeper is not available') class ZookeeperJobboardTest(test.TestCase, base.BoardTestMixin): def _create_board(self, persistence=None): @@ -137,10 +139,10 @@ class ZakeJobboardTest(test.TestCase, base.BoardTestMixin): for (path, value) in paths: if path in self.bad_paths: continue - if path.find(impl_zookeeper.TRASH_FOLDER) > -1: + if path.find(TRASH_FOLDER) > -1: trashed.append(path) elif (path.find(self.board._job_base) > -1 - and not path.endswith(impl_zookeeper.LOCK_POSTFIX)): + and not path.endswith(LOCK_POSTFIX)): jobs.append(path) self.assertEqual(len(trashed), 1)