From 517fa604c38ea259e7521a43fcc540120576e71c Mon Sep 17 00:00:00 2001 From: Greg Hill Date: Mon, 23 Feb 2015 15:40:43 -0600 Subject: [PATCH] add jobboard trash method This allows you to move a job to the trash so it will not be re-attempted, but while also leaving the details behind for diagnostic purposes. Change-Id: I3126e8d771e4012241a5fba1cd61c752f87c9952 Implements: blueprint jobboard-garbage-bin --- taskflow/jobs/backends/impl_zookeeper.py | 34 +++++++++++++++++++++++- taskflow/jobs/base.py | 19 +++++++++++++ taskflow/tests/unit/jobs/test_zk_job.py | 30 ++++++++++++++++++++- taskflow/utils/kazoo_utils.py | 22 +-------------- 4 files changed, 82 insertions(+), 23 deletions(-) diff --git a/taskflow/jobs/backends/impl_zookeeper.py b/taskflow/jobs/backends/impl_zookeeper.py index 87ccac63..10f529eb 100644 --- a/taskflow/jobs/backends/impl_zookeeper.py +++ b/taskflow/jobs/backends/impl_zookeeper.py @@ -51,6 +51,7 @@ ALL_JOB_STATES = ( # Transaction support was added in 3.4.0 MIN_ZK_VERSION = (3, 4, 0) LOCK_POSTFIX = ".lock" +TRASH_FOLDER = ".trash" JOB_PREFIX = 'job' @@ -79,7 +80,7 @@ class ZookeeperJob(base.Job): raise ValueError("Only one of 'book_data' or 'book'" " can be provided") self._path = k_paths.normpath(path) - self._lock_path = path + LOCK_POSTFIX + self._lock_path = self._path + LOCK_POSTFIX self._created_on = created_on self._node_not_found = False basename = k_paths.basename(self._path) @@ -330,6 +331,8 @@ class ZookeeperJobBoard(base.NotifyingJobBoard): if not k_paths.isabs(path): raise ValueError("Zookeeper path must be absolute") self._path = path + self._trash_path = self._path.replace(k_paths.basename(self._path), + TRASH_FOLDER) # The backend to load the full logbooks from, since whats sent over # the zookeeper data connection is only the logbook uuid and name, and # not currently the full logbook (later when a zookeeper backend @@ -362,6 +365,10 @@ class ZookeeperJobBoard(base.NotifyingJobBoard): def path(self): return self._path + @property + def trash_path(self): + return self._trash_path + @property def job_count(self): return len(self._known_jobs) @@ -656,6 +663,30 @@ class ZookeeperJobBoard(base.NotifyingJobBoard): txn.delete(job.lock_path, version=lock_stat.version) kazoo_utils.checked_commit(txn) + def trash(self, job, who): + _check_who(who) + with self._wrap(job.uuid, job.path, "Trash failure: %s"): + try: + owner_data = self._get_owner_and_data(job) + lock_data, lock_stat, data, data_stat = owner_data + except k_exceptions.NoNodeError: + raise excp.JobFailure("Can not trash a job %s" + " which we can not determine" + " the owner of" % (job.uuid)) + if lock_data.get("owner") != who: + raise excp.JobFailure("Can not trash a job %s" + " which is not owned by %s" + % (job.uuid, who)) + + trash_path = job.path.replace(self.path, self.trash_path) + value = misc.binary_encode(jsonutils.dumps(data)) + + txn = self._client.transaction() + txn.create(trash_path, value=value) + txn.delete(job.lock_path, version=lock_stat.version) + txn.delete(job.path, version=data_stat.version) + kazoo_utils.checked_commit(txn) + def _state_change_listener(self, state): LOG.debug("Kazoo client has changed to state: %s", state) @@ -725,6 +756,7 @@ class ZookeeperJobBoard(base.NotifyingJobBoard): if self._worker is None and self._emit_notifications: self._worker = futures.ThreadPoolExecutor(max_workers=1) self._client.ensure_path(self.path) + self._client.ensure_path(self.trash_path) if self._job_watcher is None: self._job_watcher = watchers.ChildrenWatch( self._client, diff --git a/taskflow/jobs/base.py b/taskflow/jobs/base.py index eea5b12b..1a5bcf2f 100644 --- a/taskflow/jobs/base.py +++ b/taskflow/jobs/base.py @@ -260,6 +260,25 @@ class JobBoard(object): this must be the same name that was used for claiming this job. """ + @abc.abstractmethod + def trash(self, job, who): + """Trash the provided job. + + Trashing a job signals to others that the job is broken and should not + be reclaimed. This is provided as an option for users to be able to + remove jobs from the board externally. The trashed job details should + be kept around in an alternate location to be reviewed, if desired. + + Only the entity that has claimed that job can trash a job. Any entity + trashing a unclaimed job (or a job they do not own) will cause an + exception. + + :param job: a job on this jobboard that can be trashed (if it does + not exist then a NotFound exception will be raised). + :param who: string that names the entity performing the trashing, + this must be the same name that was used for claiming this job. + """ + @abc.abstractproperty def connected(self): """Returns if this jobboard is connected.""" diff --git a/taskflow/tests/unit/jobs/test_zk_job.py b/taskflow/tests/unit/jobs/test_zk_job.py index afff4123..17385306 100644 --- a/taskflow/tests/unit/jobs/test_zk_job.py +++ b/taskflow/tests/unit/jobs/test_zk_job.py @@ -73,7 +73,7 @@ class ZakeJobboardTest(test.TestCase, base.BoardTestMixin): def setUp(self): super(ZakeJobboardTest, self).setUp() self.client, self.board = self._create_board() - self.bad_paths = [self.board.path] + self.bad_paths = [self.board.path, self.board.trash_path] self.bad_paths.extend(zake_utils.partition_path(self.board.path)) def test_posting_owner_lost(self): @@ -118,6 +118,34 @@ class ZakeJobboardTest(test.TestCase, base.BoardTestMixin): self.client.storage.pop(path) self.assertEqual(states.UNCLAIMED, j.state) + def test_trashing_claimed_job(self): + + with base.connect_close(self.board): + with base.flush(self.client): + j = self.board.post('test', p_utils.temporary_log_book()) + self.assertEqual(states.UNCLAIMED, j.state) + with base.flush(self.client): + self.board.claim(j, self.board.name) + self.assertEqual(states.CLAIMED, j.state) + + with base.flush(self.client): + self.board.trash(j, self.board.name) + + trashed = [] + jobs = [] + paths = list(six.iteritems(self.client.storage.paths)) + for (path, value) in paths: + if path in self.bad_paths: + continue + if path.find(impl_zookeeper.TRASH_FOLDER) > -1: + trashed.append(path) + elif (path.find(self.board._job_base) > -1 + and not path.endswith(impl_zookeeper.LOCK_POSTFIX)): + jobs.append(path) + + self.assertEqual(len(trashed), 1) + self.assertEqual(len(jobs), 0) + def test_posting_received_raw(self): book = p_utils.temporary_log_book() diff --git a/taskflow/utils/kazoo_utils.py b/taskflow/utils/kazoo_utils.py index c2869bdd..f681dc46 100644 --- a/taskflow/utils/kazoo_utils.py +++ b/taskflow/utils/kazoo_utils.py @@ -37,27 +37,7 @@ def _parse_hosts(hosts): def prettify_failures(failures, limit=-1): - """Prettifies a checked commits failures (ignores sensitive data...). - - Example input and output: - - >>> from taskflow.utils import kazoo_utils - >>> conf = {"hosts": ['localhost:2181']} - >>> c = kazoo_utils.make_client(conf) - >>> c.start(timeout=1) - >>> txn = c.transaction() - >>> txn.create("/test") - >>> txn.check("/test", 2) - >>> txn.delete("/test") - >>> try: - ... kazoo_utils.checked_commit(txn) - ... except kazoo_utils.KazooTransactionException as e: - ... print(kazoo_utils.prettify_failures(e.failures, limit=1)) - ... - RolledBackError@Create(path='/test') and 2 more... - >>> c.stop() - >>> c.close() - """ + """Prettifies a checked commits failures (ignores sensitive data...).""" prettier = [] for (op, r) in failures: pretty_op = reflection.get_class_name(op, fully_qualified=False)