Add last_modified & created_on attributes to jobs

In order to be able to allow jobboard object users
to know when a job was last modified or when the
job was created we need to provide accessors to make
this possible. These can be used to claim a job after
a given period, or for general tracking of which are
the oldest jobs...

Implements: blueprint job-reference-impl

Change-Id: I467bb083d0b143826a44c6aeb6499c483b88fe65
This commit is contained in:
Joshua Harlow 2014-04-09 14:25:15 -07:00
parent 843b487736
commit 8732e4772d
4 changed files with 91 additions and 3 deletions

View File

@ -60,7 +60,8 @@ def _check_who(who):
class ZookeeperJob(base_job.Job): class ZookeeperJob(base_job.Job):
def __init__(self, name, board, client, backend, path, def __init__(self, name, board, client, backend, path,
uuid=None, details=None, book=None, book_data=None): uuid=None, details=None, book=None, book_data=None,
created_on=None):
super(ZookeeperJob, self).__init__(name, uuid=uuid, details=details) super(ZookeeperJob, self).__init__(name, uuid=uuid, details=details)
self._board = board self._board = board
self._book = book self._book = book
@ -74,6 +75,8 @@ class ZookeeperJob(base_job.Job):
" can be provided") " can be provided")
self._path = path self._path = path
self._lock_path = "%s.lock" % (path) self._lock_path = "%s.lock" % (path)
self._created_on = created_on
self._node_not_found = False
@property @property
def lock_path(self): def lock_path(self):
@ -83,6 +86,57 @@ class ZookeeperJob(base_job.Job):
def path(self): def path(self):
return self._path return self._path
def _get_node_attr(self, path, attr_name, trans_func=None):
try:
_data, node_stat = self._client.get(path)
attr = getattr(node_stat, attr_name)
if trans_func is not None:
return trans_func(attr)
else:
return attr
except k_exceptions.NoNodeError as e:
raise excp.NotFound("Can not fetch the %r attribute"
" of job %s (%s), path %s not found"
% (attr_name, self.uuid, self.path, path), e)
except self._client.handler.timeout_exception as e:
raise excp.JobFailure("Can not fetch the %r attribute"
" of job %s (%s), connection timed out"
% (attr_name, self.uuid, self.path), e)
except k_exceptions.SessionExpiredError as e:
raise excp.JobFailure("Can not fetch the %r attribute"
" of job %s (%s), session expired"
% (attr_name, self.uuid, self.path), e)
except (AttributeError, k_exceptions.KazooException) as e:
raise excp.JobFailure("Can not fetch the %r attribute"
" of job %s (%s), internal error" %
(attr_name, self.uuid, self.path), e)
@property
def last_modified(self):
modified_on = None
try:
if not self._node_not_found:
modified_on = self._get_node_attr(
self.path, 'mtime',
trans_func=misc.millis_to_datetime)
except excp.NotFound:
self._node_not_found = True
return modified_on
@property
def created_on(self):
# This one we can cache (since it won't change after creation).
if self._node_not_found:
return None
if self._created_on is None:
try:
self._created_on = self._get_node_attr(
self.path, 'ctime',
trans_func=misc.millis_to_datetime)
except excp.NotFound:
self._node_not_found = True
return self._created_on
@property @property
def board(self): def board(self):
return self._board return self._board
@ -243,15 +297,17 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard):
def _process_child(self, path, request): def _process_child(self, path, request):
"""Receives the result of a child data fetch request.""" """Receives the result of a child data fetch request."""
try: try:
raw_data, _stat = request.get() raw_data, node_stat = request.get()
job_data = misc.decode_json(raw_data) job_data = misc.decode_json(raw_data)
created_on = misc.millis_to_datetime(node_stat.ctime)
with self._job_mutate: with self._job_mutate:
if path not in self._known_jobs: if path not in self._known_jobs:
job = ZookeeperJob(job_data['name'], self, job = ZookeeperJob(job_data['name'], self,
self._client, self._persistence, path, self._client, self._persistence, path,
uuid=job_data['uuid'], uuid=job_data['uuid'],
book_data=job_data.get("book"), book_data=job_data.get("book"),
details=job_data.get("details", {})) details=job_data.get("details", {}),
created_on=created_on)
self._known_jobs[path] = job self._known_jobs[path] = job
self._emit(jobboard.POSTED, self._emit(jobboard.POSTED,
details={ details={

View File

@ -46,6 +46,16 @@ class Job(object):
details = {} details = {}
self._details = details self._details = details
@abc.abstractproperty
def last_modified(self):
"""The datetime the job was last modified."""
pass
@abc.abstractproperty
def created_on(self):
"""The datetime the job was created on."""
pass
@abc.abstractproperty @abc.abstractproperty
def board(self): def board(self):
"""The board this job was posted on or was created from.""" """The board this job was posted on or was created from."""

View File

@ -67,6 +67,20 @@ class TestZookeeperJobs(test.TestCase):
self.client.flush() self.client.flush()
self.assertTrue(self.board.connected) self.assertTrue(self.board.connected)
@mock.patch("taskflow.jobs.backends.impl_zookeeper.misc."
"millis_to_datetime")
def test_posting_dates(self, mock_dt):
epoch = misc.millis_to_datetime(0)
mock_dt.return_value = epoch
with connect_close(self.board):
j = self.board.post('test', p_utils.temporary_log_book())
self.client.flush()
self.assertEqual(epoch, j.created_on)
self.assertEqual(epoch, j.last_modified)
self.assertTrue(mock_dt.called)
def test_fresh_iter(self): def test_fresh_iter(self):
with connect_close(self.board): with connect_close(self.board):
book = p_utils.temporary_log_book() book = p_utils.temporary_log_book()

View File

@ -18,6 +18,7 @@
import collections import collections
import contextlib import contextlib
import copy import copy
import datetime
import errno import errno
import functools import functools
import keyword import keyword
@ -106,6 +107,13 @@ def wraps(fn):
return wrapper return wrapper
def millis_to_datetime(milliseconds):
"""Converts a given number of milliseconds from the epoch into a datetime
object.
"""
return datetime.datetime.fromtimestamp(float(milliseconds) / 1000)
def get_version_string(obj): def get_version_string(obj):
"""Gets a object's version as a string. """Gets a object's version as a string.