From 8732e4772d7266c9319bd7bada96dc0d13aae4d2 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Wed, 9 Apr 2014 14:25:15 -0700 Subject: [PATCH] Add last_modified & created_on attributes to jobs In order to be able to allow jobboard object users to know when a job was last modified or when the job was created we need to provide accessors to make this possible. These can be used to claim a job after a given period, or for general tracking of which are the oldest jobs... Implements: blueprint job-reference-impl Change-Id: I467bb083d0b143826a44c6aeb6499c483b88fe65 --- taskflow/jobs/backends/impl_zookeeper.py | 62 ++++++++++++++++++++++-- taskflow/jobs/job.py | 10 ++++ taskflow/tests/unit/jobs/test_zk_job.py | 14 ++++++ taskflow/utils/misc.py | 8 +++ 4 files changed, 91 insertions(+), 3 deletions(-) diff --git a/taskflow/jobs/backends/impl_zookeeper.py b/taskflow/jobs/backends/impl_zookeeper.py index c7be06246..b0a53eccf 100644 --- a/taskflow/jobs/backends/impl_zookeeper.py +++ b/taskflow/jobs/backends/impl_zookeeper.py @@ -60,7 +60,8 @@ def _check_who(who): class ZookeeperJob(base_job.Job): def __init__(self, name, board, client, backend, path, - uuid=None, details=None, book=None, book_data=None): + uuid=None, details=None, book=None, book_data=None, + created_on=None): super(ZookeeperJob, self).__init__(name, uuid=uuid, details=details) self._board = board self._book = book @@ -74,6 +75,8 @@ class ZookeeperJob(base_job.Job): " can be provided") self._path = path self._lock_path = "%s.lock" % (path) + self._created_on = created_on + self._node_not_found = False @property def lock_path(self): @@ -83,6 +86,57 @@ class ZookeeperJob(base_job.Job): def path(self): return self._path + def _get_node_attr(self, path, attr_name, trans_func=None): + try: + _data, node_stat = self._client.get(path) + attr = getattr(node_stat, attr_name) + if trans_func is not None: + return trans_func(attr) + else: + return attr + except k_exceptions.NoNodeError as e: + raise excp.NotFound("Can not fetch the %r attribute" + " of job %s (%s), path %s not found" + % (attr_name, self.uuid, self.path, path), e) + except self._client.handler.timeout_exception as e: + raise excp.JobFailure("Can not fetch the %r attribute" + " of job %s (%s), connection timed out" + % (attr_name, self.uuid, self.path), e) + except k_exceptions.SessionExpiredError as e: + raise excp.JobFailure("Can not fetch the %r attribute" + " of job %s (%s), session expired" + % (attr_name, self.uuid, self.path), e) + except (AttributeError, k_exceptions.KazooException) as e: + raise excp.JobFailure("Can not fetch the %r attribute" + " of job %s (%s), internal error" % + (attr_name, self.uuid, self.path), e) + + @property + def last_modified(self): + modified_on = None + try: + if not self._node_not_found: + modified_on = self._get_node_attr( + self.path, 'mtime', + trans_func=misc.millis_to_datetime) + except excp.NotFound: + self._node_not_found = True + return modified_on + + @property + def created_on(self): + # This one we can cache (since it won't change after creation). + if self._node_not_found: + return None + if self._created_on is None: + try: + self._created_on = self._get_node_attr( + self.path, 'ctime', + trans_func=misc.millis_to_datetime) + except excp.NotFound: + self._node_not_found = True + return self._created_on + @property def board(self): return self._board @@ -243,15 +297,17 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard): def _process_child(self, path, request): """Receives the result of a child data fetch request.""" try: - raw_data, _stat = request.get() + raw_data, node_stat = request.get() job_data = misc.decode_json(raw_data) + created_on = misc.millis_to_datetime(node_stat.ctime) with self._job_mutate: if path not in self._known_jobs: job = ZookeeperJob(job_data['name'], self, self._client, self._persistence, path, uuid=job_data['uuid'], book_data=job_data.get("book"), - details=job_data.get("details", {})) + details=job_data.get("details", {}), + created_on=created_on) self._known_jobs[path] = job self._emit(jobboard.POSTED, details={ diff --git a/taskflow/jobs/job.py b/taskflow/jobs/job.py index fbb98f2dc..a02649019 100644 --- a/taskflow/jobs/job.py +++ b/taskflow/jobs/job.py @@ -46,6 +46,16 @@ class Job(object): details = {} self._details = details + @abc.abstractproperty + def last_modified(self): + """The datetime the job was last modified.""" + pass + + @abc.abstractproperty + def created_on(self): + """The datetime the job was created on.""" + pass + @abc.abstractproperty def board(self): """The board this job was posted on or was created from.""" diff --git a/taskflow/tests/unit/jobs/test_zk_job.py b/taskflow/tests/unit/jobs/test_zk_job.py index 93a074f11..26a0fec5d 100644 --- a/taskflow/tests/unit/jobs/test_zk_job.py +++ b/taskflow/tests/unit/jobs/test_zk_job.py @@ -67,6 +67,20 @@ class TestZookeeperJobs(test.TestCase): self.client.flush() self.assertTrue(self.board.connected) + @mock.patch("taskflow.jobs.backends.impl_zookeeper.misc." + "millis_to_datetime") + def test_posting_dates(self, mock_dt): + epoch = misc.millis_to_datetime(0) + mock_dt.return_value = epoch + + with connect_close(self.board): + j = self.board.post('test', p_utils.temporary_log_book()) + self.client.flush() + self.assertEqual(epoch, j.created_on) + self.assertEqual(epoch, j.last_modified) + + self.assertTrue(mock_dt.called) + def test_fresh_iter(self): with connect_close(self.board): book = p_utils.temporary_log_book() diff --git a/taskflow/utils/misc.py b/taskflow/utils/misc.py index a2d4a48a2..ed9a33a76 100644 --- a/taskflow/utils/misc.py +++ b/taskflow/utils/misc.py @@ -18,6 +18,7 @@ import collections import contextlib import copy +import datetime import errno import functools import keyword @@ -106,6 +107,13 @@ def wraps(fn): return wrapper +def millis_to_datetime(milliseconds): + """Converts a given number of milliseconds from the epoch into a datetime + object. + """ + return datetime.datetime.fromtimestamp(float(milliseconds) / 1000) + + def get_version_string(obj): """Gets a object's version as a string.