diff --git a/taskflow/jobs/backends/impl_zookeeper.py b/taskflow/jobs/backends/impl_zookeeper.py index 55c7b8eb..34ec5b42 100644 --- a/taskflow/jobs/backends/impl_zookeeper.py +++ b/taskflow/jobs/backends/impl_zookeeper.py @@ -55,11 +55,18 @@ TRASH_FOLDER = ".trash" JOB_PREFIX = 'job' -def _check_who(who): - if not isinstance(who, six.string_types): - raise TypeError("Job applicant must be a string type") - if len(who) == 0: - raise ValueError("Job applicant must be non-empty") +def check_who(meth): + """Decorator that checks the expected owner type & value restrictions.""" + + @six.wraps(meth) + def wrapper(self, job, who, *args, **kwargs): + if not isinstance(who, six.string_types): + raise TypeError("Job applicant must be a string type") + if len(who) == 0: + raise ValueError("Job applicant must be non-empty") + return meth(self, job, who, *args, **kwargs) + + return wrapper class ZookeeperJob(base.Job): @@ -535,6 +542,7 @@ class ZookeeperJobBoard(base.NotifyingJobBoard): self._emit(base.POSTED, details={'job': job}) return job + @check_who def claim(self, job, who): def _unclaimable_try_find_owner(cause): try: @@ -547,7 +555,6 @@ class ZookeeperJobBoard(base.NotifyingJobBoard): msg = "Job %s already claimed" % (job.uuid) return excp.UnclaimableJob(msg, cause) - _check_who(who) with self._wrap(job.uuid, job.path, "Claiming failure: %s"): # NOTE(harlowja): post as json which will allow for future changes # more easily than a raw string/text. @@ -626,8 +633,8 @@ class ZookeeperJobBoard(base.NotifyingJobBoard): return (misc.decode_json(lock_data), lock_stat, misc.decode_json(job_data), job_stat) + @check_who def consume(self, job, who): - _check_who(who) with self._wrap(job.uuid, job.path, "Consumption failure: %s"): try: owner_data = self._get_owner_and_data(job) @@ -646,8 +653,8 @@ class ZookeeperJobBoard(base.NotifyingJobBoard): kazoo_utils.checked_commit(txn) self._remove_job(job.path) + @check_who def abandon(self, job, who): - _check_who(who) with self._wrap(job.uuid, job.path, "Abandonment failure: %s"): try: owner_data = self._get_owner_and_data(job) @@ -664,8 +671,8 @@ class ZookeeperJobBoard(base.NotifyingJobBoard): txn.delete(job.lock_path, version=lock_stat.version) kazoo_utils.checked_commit(txn) + @check_who def trash(self, job, who): - _check_who(who) with self._wrap(job.uuid, job.path, "Trash failure: %s"): try: owner_data = self._get_owner_and_data(job)