add jobboard trash method

This allows you to move a job to the trash so it
will not be re-attempted, but while also leaving
the details behind for diagnostic purposes.

Change-Id: I3126e8d771e4012241a5fba1cd61c752f87c9952
Implements: blueprint jobboard-garbage-bin
This commit is contained in:
Greg Hill
2015-02-23 15:40:43 -06:00
parent ff66027eef
commit 517fa604c3
4 changed files with 82 additions and 23 deletions

View File

@@ -51,6 +51,7 @@ ALL_JOB_STATES = (
# Transaction support was added in 3.4.0 # Transaction support was added in 3.4.0
MIN_ZK_VERSION = (3, 4, 0) MIN_ZK_VERSION = (3, 4, 0)
LOCK_POSTFIX = ".lock" LOCK_POSTFIX = ".lock"
TRASH_FOLDER = ".trash"
JOB_PREFIX = 'job' JOB_PREFIX = 'job'
@@ -79,7 +80,7 @@ class ZookeeperJob(base.Job):
raise ValueError("Only one of 'book_data' or 'book'" raise ValueError("Only one of 'book_data' or 'book'"
" can be provided") " can be provided")
self._path = k_paths.normpath(path) self._path = k_paths.normpath(path)
self._lock_path = path + LOCK_POSTFIX self._lock_path = self._path + LOCK_POSTFIX
self._created_on = created_on self._created_on = created_on
self._node_not_found = False self._node_not_found = False
basename = k_paths.basename(self._path) basename = k_paths.basename(self._path)
@@ -330,6 +331,8 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
if not k_paths.isabs(path): if not k_paths.isabs(path):
raise ValueError("Zookeeper path must be absolute") raise ValueError("Zookeeper path must be absolute")
self._path = path self._path = path
self._trash_path = self._path.replace(k_paths.basename(self._path),
TRASH_FOLDER)
# The backend to load the full logbooks from, since whats sent over # The backend to load the full logbooks from, since whats sent over
# the zookeeper data connection is only the logbook uuid and name, and # the zookeeper data connection is only the logbook uuid and name, and
# not currently the full logbook (later when a zookeeper backend # not currently the full logbook (later when a zookeeper backend
@@ -362,6 +365,10 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
def path(self): def path(self):
return self._path return self._path
@property
def trash_path(self):
return self._trash_path
@property @property
def job_count(self): def job_count(self):
return len(self._known_jobs) return len(self._known_jobs)
@@ -656,6 +663,30 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
txn.delete(job.lock_path, version=lock_stat.version) txn.delete(job.lock_path, version=lock_stat.version)
kazoo_utils.checked_commit(txn) kazoo_utils.checked_commit(txn)
def trash(self, job, who):
_check_who(who)
with self._wrap(job.uuid, job.path, "Trash failure: %s"):
try:
owner_data = self._get_owner_and_data(job)
lock_data, lock_stat, data, data_stat = owner_data
except k_exceptions.NoNodeError:
raise excp.JobFailure("Can not trash a job %s"
" which we can not determine"
" the owner of" % (job.uuid))
if lock_data.get("owner") != who:
raise excp.JobFailure("Can not trash a job %s"
" which is not owned by %s"
% (job.uuid, who))
trash_path = job.path.replace(self.path, self.trash_path)
value = misc.binary_encode(jsonutils.dumps(data))
txn = self._client.transaction()
txn.create(trash_path, value=value)
txn.delete(job.lock_path, version=lock_stat.version)
txn.delete(job.path, version=data_stat.version)
kazoo_utils.checked_commit(txn)
def _state_change_listener(self, state): def _state_change_listener(self, state):
LOG.debug("Kazoo client has changed to state: %s", state) LOG.debug("Kazoo client has changed to state: %s", state)
@@ -725,6 +756,7 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
if self._worker is None and self._emit_notifications: if self._worker is None and self._emit_notifications:
self._worker = futures.ThreadPoolExecutor(max_workers=1) self._worker = futures.ThreadPoolExecutor(max_workers=1)
self._client.ensure_path(self.path) self._client.ensure_path(self.path)
self._client.ensure_path(self.trash_path)
if self._job_watcher is None: if self._job_watcher is None:
self._job_watcher = watchers.ChildrenWatch( self._job_watcher = watchers.ChildrenWatch(
self._client, self._client,

View File

@@ -260,6 +260,25 @@ class JobBoard(object):
this must be the same name that was used for claiming this job. this must be the same name that was used for claiming this job.
""" """
@abc.abstractmethod
def trash(self, job, who):
"""Trash the provided job.
Trashing a job signals to others that the job is broken and should not
be reclaimed. This is provided as an option for users to be able to
remove jobs from the board externally. The trashed job details should
be kept around in an alternate location to be reviewed, if desired.
Only the entity that has claimed that job can trash a job. Any entity
trashing a unclaimed job (or a job they do not own) will cause an
exception.
:param job: a job on this jobboard that can be trashed (if it does
not exist then a NotFound exception will be raised).
:param who: string that names the entity performing the trashing,
this must be the same name that was used for claiming this job.
"""
@abc.abstractproperty @abc.abstractproperty
def connected(self): def connected(self):
"""Returns if this jobboard is connected.""" """Returns if this jobboard is connected."""

View File

@@ -73,7 +73,7 @@ class ZakeJobboardTest(test.TestCase, base.BoardTestMixin):
def setUp(self): def setUp(self):
super(ZakeJobboardTest, self).setUp() super(ZakeJobboardTest, self).setUp()
self.client, self.board = self._create_board() self.client, self.board = self._create_board()
self.bad_paths = [self.board.path] self.bad_paths = [self.board.path, self.board.trash_path]
self.bad_paths.extend(zake_utils.partition_path(self.board.path)) self.bad_paths.extend(zake_utils.partition_path(self.board.path))
def test_posting_owner_lost(self): def test_posting_owner_lost(self):
@@ -118,6 +118,34 @@ class ZakeJobboardTest(test.TestCase, base.BoardTestMixin):
self.client.storage.pop(path) self.client.storage.pop(path)
self.assertEqual(states.UNCLAIMED, j.state) self.assertEqual(states.UNCLAIMED, j.state)
def test_trashing_claimed_job(self):
with base.connect_close(self.board):
with base.flush(self.client):
j = self.board.post('test', p_utils.temporary_log_book())
self.assertEqual(states.UNCLAIMED, j.state)
with base.flush(self.client):
self.board.claim(j, self.board.name)
self.assertEqual(states.CLAIMED, j.state)
with base.flush(self.client):
self.board.trash(j, self.board.name)
trashed = []
jobs = []
paths = list(six.iteritems(self.client.storage.paths))
for (path, value) in paths:
if path in self.bad_paths:
continue
if path.find(impl_zookeeper.TRASH_FOLDER) > -1:
trashed.append(path)
elif (path.find(self.board._job_base) > -1
and not path.endswith(impl_zookeeper.LOCK_POSTFIX)):
jobs.append(path)
self.assertEqual(len(trashed), 1)
self.assertEqual(len(jobs), 0)
def test_posting_received_raw(self): def test_posting_received_raw(self):
book = p_utils.temporary_log_book() book = p_utils.temporary_log_book()

View File

@@ -37,27 +37,7 @@ def _parse_hosts(hosts):
def prettify_failures(failures, limit=-1): def prettify_failures(failures, limit=-1):
"""Prettifies a checked commits failures (ignores sensitive data...). """Prettifies a checked commits failures (ignores sensitive data...)."""
Example input and output:
>>> from taskflow.utils import kazoo_utils
>>> conf = {"hosts": ['localhost:2181']}
>>> c = kazoo_utils.make_client(conf)
>>> c.start(timeout=1)
>>> txn = c.transaction()
>>> txn.create("/test")
>>> txn.check("/test", 2)
>>> txn.delete("/test")
>>> try:
... kazoo_utils.checked_commit(txn)
... except kazoo_utils.KazooTransactionException as e:
... print(kazoo_utils.prettify_failures(e.failures, limit=1))
...
RolledBackError@Create(path='/test') and 2 more...
>>> c.stop()
>>> c.close()
"""
prettier = [] prettier = []
for (op, r) in failures: for (op, r) in failures:
pretty_op = reflection.get_class_name(op, fully_qualified=False) pretty_op = reflection.get_class_name(op, fully_qualified=False)