Move zookeeper jobboard constants to class level

To make it easy to document and inquire on the zookeeper
job constants expose them as class level instead of module
level constants (and use them where needed).

Change-Id: Ice53db104c55ba4a7794a358d56922074d44492b
This commit is contained in:
Joshua Harlow
2015-04-27 15:45:22 -07:00
parent 426d08f951
commit 3097e52be8
2 changed files with 38 additions and 28 deletions

View File

@@ -40,21 +40,6 @@ from taskflow.utils import misc
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
UNCLAIMED_JOB_STATES = (
states.UNCLAIMED,
)
ALL_JOB_STATES = (
states.UNCLAIMED,
states.COMPLETE,
states.CLAIMED,
)
# Transaction support was added in 3.4.0
MIN_ZK_VERSION = (3, 4, 0)
LOCK_POSTFIX = ".lock"
TRASH_FOLDER = ".trash"
JOB_PREFIX = 'job'
def check_who(meth): def check_who(meth):
"""Decorator that checks the expected owner type & value restrictions.""" """Decorator that checks the expected owner type & value restrictions."""
@@ -88,12 +73,12 @@ class ZookeeperJob(base.Job):
raise ValueError("Only one of 'book_data' or 'book'" raise ValueError("Only one of 'book_data' or 'book'"
" can be provided") " can be provided")
self._path = k_paths.normpath(path) self._path = k_paths.normpath(path)
self._lock_path = self._path + LOCK_POSTFIX self._lock_path = self._path + board.LOCK_POSTFIX
self._created_on = created_on self._created_on = created_on
self._node_not_found = False self._node_not_found = False
basename = k_paths.basename(self._path) basename = k_paths.basename(self._path)
self._root = self._path[0:-len(basename)] self._root = self._path[0:-len(basename)]
self._sequence = int(basename[len(JOB_PREFIX):]) self._sequence = int(basename[len(board.JOB_PREFIX):])
self._last_state = None self._last_state = None
@property @property
@@ -274,6 +259,16 @@ class ZookeeperJobBoardIterator(six.Iterator):
over unclaimed jobs. over unclaimed jobs.
""" """
_UNCLAIMED_JOB_STATES = (
states.UNCLAIMED,
)
_JOB_STATES = (
states.UNCLAIMED,
states.COMPLETE,
states.CLAIMED,
)
def __init__(self, board, only_unclaimed=False, ensure_fresh=False): def __init__(self, board, only_unclaimed=False, ensure_fresh=False):
self._board = board self._board = board
self._jobs = collections.deque() self._jobs = collections.deque()
@@ -290,9 +285,9 @@ class ZookeeperJobBoardIterator(six.Iterator):
def _next_job(self): def _next_job(self):
if self.only_unclaimed: if self.only_unclaimed:
allowed_states = UNCLAIMED_JOB_STATES allowed_states = self._UNCLAIMED_JOB_STATES
else: else:
allowed_states = ALL_JOB_STATES allowed_states = self._JOB_STATES
job = None job = None
while self._jobs and job is None: while self._jobs and job is None:
maybe_job = self._jobs.popleft() maybe_job = self._jobs.popleft()
@@ -343,6 +338,18 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
automatically by zookeeper the ephemeral is deemed to be lost). automatically by zookeeper the ephemeral is deemed to be lost).
""" """
#: Transaction support was added in 3.4.0 so we need at least that version.
MIN_ZK_VERSION = (3, 4, 0)
#: Znode **postfix** that lock entries have.
LOCK_POSTFIX = ".lock"
#: Znode child path created under root path that contains trashed jobs.
TRASH_FOLDER = ".trash"
#: Znode **prefix** that job entries have.
JOB_PREFIX = 'job'
def __init__(self, name, conf, def __init__(self, name, conf,
client=None, persistence=None, emit_notifications=True): client=None, persistence=None, emit_notifications=True):
super(ZookeeperJobBoard, self).__init__(name, conf) super(ZookeeperJobBoard, self).__init__(name, conf)
@@ -359,7 +366,7 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
raise ValueError("Zookeeper path must be absolute") raise ValueError("Zookeeper path must be absolute")
self._path = path self._path = path
self._trash_path = self._path.replace(k_paths.basename(self._path), self._trash_path = self._path.replace(k_paths.basename(self._path),
TRASH_FOLDER) self.TRASH_FOLDER)
# The backend to load the full logbooks from, since whats sent over # The backend to load the full logbooks from, since whats sent over
# the zookeeper data connection is only the logbook uuid and name, and # the zookeeper data connection is only the logbook uuid and name, and
# not currently the full logbook (later when a zookeeper backend # not currently the full logbook (later when a zookeeper backend
@@ -375,7 +382,7 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
self._job_watcher = None self._job_watcher = None
# Since we use sequenced ids this will be the path that the sequences # Since we use sequenced ids this will be the path that the sequences
# are prefixed with, for example, job0000000001, job0000000002, ... # are prefixed with, for example, job0000000001, job0000000002, ...
self._job_base = k_paths.join(path, JOB_PREFIX) self._job_base = k_paths.join(path, self.JOB_PREFIX)
self._worker = None self._worker = None
self._emit_notifications = bool(emit_notifications) self._emit_notifications = bool(emit_notifications)
self._connected = False self._connected = False
@@ -481,7 +488,8 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
LOG.debug("Got children %s under path %s", children, self.path) LOG.debug("Got children %s under path %s", children, self.path)
child_paths = [] child_paths = []
for c in children: for c in children:
if c.endswith(LOCK_POSTFIX) or not c.startswith(JOB_PREFIX): if (c.endswith(self.LOCK_POSTFIX) or
not c.startswith(self.JOB_PREFIX)):
# Skip lock paths or non-job-paths (these are not valid jobs) # Skip lock paths or non-job-paths (these are not valid jobs)
continue continue
child_paths.append(k_paths.join(self.path, c)) child_paths.append(k_paths.join(self.path, c))
@@ -788,7 +796,7 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
"Failed to connect to zookeeper") "Failed to connect to zookeeper")
try: try:
if self._conf.get('check_compatible', True): if self._conf.get('check_compatible', True):
kazoo_utils.check_compatible(self._client, MIN_ZK_VERSION) kazoo_utils.check_compatible(self._client, self.MIN_ZK_VERSION)
if self._worker is None and self._emit_notifications: if self._worker is None and self._emit_notifications:
self._worker = futures.ThreadPoolExecutor(max_workers=1) self._worker = futures.ThreadPoolExecutor(max_workers=1)
self._client.ensure_path(self.path) self._client.ensure_path(self.path)

View File

@@ -32,11 +32,13 @@ from taskflow.utils import persistence_utils as p_utils
TEST_PATH_TPL = '/taskflow/board-test/%s' TEST_PATH_TPL = '/taskflow/board-test/%s'
_ZOOKEEPER_AVAILABLE = test_utils.zookeeper_available( ZOOKEEPER_AVAILABLE = test_utils.zookeeper_available(
impl_zookeeper.MIN_ZK_VERSION) impl_zookeeper.ZookeeperJobBoard.MIN_ZK_VERSION)
TRASH_FOLDER = impl_zookeeper.ZookeeperJobBoard.TRASH_FOLDER
LOCK_POSTFIX = impl_zookeeper.ZookeeperJobBoard.LOCK_POSTFIX
@testtools.skipIf(not _ZOOKEEPER_AVAILABLE, 'zookeeper is not available') @testtools.skipIf(not ZOOKEEPER_AVAILABLE, 'zookeeper is not available')
class ZookeeperJobboardTest(test.TestCase, base.BoardTestMixin): class ZookeeperJobboardTest(test.TestCase, base.BoardTestMixin):
def _create_board(self, persistence=None): def _create_board(self, persistence=None):
@@ -137,10 +139,10 @@ class ZakeJobboardTest(test.TestCase, base.BoardTestMixin):
for (path, value) in paths: for (path, value) in paths:
if path in self.bad_paths: if path in self.bad_paths:
continue continue
if path.find(impl_zookeeper.TRASH_FOLDER) > -1: if path.find(TRASH_FOLDER) > -1:
trashed.append(path) trashed.append(path)
elif (path.find(self.board._job_base) > -1 elif (path.find(self.board._job_base) > -1
and not path.endswith(impl_zookeeper.LOCK_POSTFIX)): and not path.endswith(LOCK_POSTFIX)):
jobs.append(path) jobs.append(path)
self.assertEqual(len(trashed), 1) self.assertEqual(len(trashed), 1)