From f14ee9ea5cf2f42951a1322a1250c28f3e447585 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Fri, 19 Dec 2014 16:34:21 -0800 Subject: [PATCH] Move the jobboard/job bases to a jobboard/base module In order to match the directory/module layout of the other pluggable backends better move the jobboard modules that define the base abstract classes into a single base file. This makes it easier to look at the taskflow code-base and understand the common layout. This also makes the docs for the zookeeper jobboard better and includes them in the generated developer docs under a implementations section. Change-Id: I36f29c37dcf2403782a75e45665bd7c0a146a06e --- doc/source/jobs.rst | 19 ++- taskflow/jobs/backends/impl_zookeeper.py | 46 ++++++-- taskflow/jobs/{jobboard.py => base.py} | 91 ++++++++++++++ taskflow/jobs/job.py | 111 ------------------ .../tests/unit/conductor/test_conductor.py | 6 +- 5 files changed, 144 insertions(+), 129 deletions(-) rename taskflow/jobs/{jobboard.py => base.py} (74%) delete mode 100644 taskflow/jobs/job.py diff --git a/doc/source/jobs.rst b/doc/source/jobs.rst index f36d69a1a..06f1123e1 100644 --- a/doc/source/jobs.rst +++ b/doc/source/jobs.rst @@ -28,14 +28,14 @@ Definitions =========== Jobs - A :py:class:`job ` consists of a unique identifier, + A :py:class:`job ` consists of a unique identifier, name, and a reference to a :py:class:`logbook ` which contains the details of the work that has been or should be/will be completed to finish the work that has been created for that job. Jobboards - A :py:class:`jobboard ` is responsible for + A :py:class:`jobboard ` is responsible for managing the posting, ownership, and delivery of jobs. It acts as the location where jobs can be posted, claimed and searched for; typically by iteration or notification. Jobboards may be backed by different *capable* @@ -202,6 +202,11 @@ Additional *configuration* parameters: when your program uses eventlet and you want to instruct kazoo to use an eventlet compatible handler (such as the `eventlet handler`_). +.. note:: + + See :py:class:`~taskflow.jobs.backends.impl_zookeeper.ZookeeperJobBoard` + for implementation details. + Considerations ============== @@ -254,15 +259,19 @@ the claim by then, therefore both would be *working* on a job. Interfaces ========== +.. automodule:: taskflow.jobs.base .. automodule:: taskflow.jobs.backends -.. automodule:: taskflow.jobs.job -.. automodule:: taskflow.jobs.jobboard + +Implementations +=============== + +.. automodule:: taskflow.jobs.backends.impl_zookeeper Hierarchy ========= .. inheritance-diagram:: - taskflow.jobs.jobboard + taskflow.jobs.base taskflow.jobs.backends.impl_zookeeper :parts: 1 diff --git a/taskflow/jobs/backends/impl_zookeeper.py b/taskflow/jobs/backends/impl_zookeeper.py index 186be8e63..36e0daea1 100644 --- a/taskflow/jobs/backends/impl_zookeeper.py +++ b/taskflow/jobs/backends/impl_zookeeper.py @@ -29,8 +29,7 @@ from oslo_utils import uuidutils import six from taskflow import exceptions as excp -from taskflow.jobs import job as base_job -from taskflow.jobs import jobboard +from taskflow.jobs import base from taskflow import logging from taskflow import states from taskflow.types import timing as tt @@ -62,7 +61,9 @@ def _check_who(who): raise ValueError("Job applicant must be non-empty") -class ZookeeperJob(base_job.Job): +class ZookeeperJob(base.Job): + """A zookeeper job.""" + def __init__(self, name, board, client, backend, path, uuid=None, details=None, book=None, book_data=None, created_on=None): @@ -95,10 +96,12 @@ class ZookeeperJob(base_job.Job): @property def sequence(self): + """Sequence number of the current job.""" return self._sequence @property def root(self): + """The parent path of the job in zookeeper.""" return self._root def _get_node_attr(self, path, attr_name, trans_func=None): @@ -232,14 +235,14 @@ class ZookeeperJob(base_job.Job): class ZookeeperJobBoardIterator(six.Iterator): - """Iterator over a zookeeper jobboard. + """Iterator over a zookeeper jobboard that iterates over potential jobs. It supports the following attributes/constructor arguments: - * ensure_fresh: boolean that requests that during every fetch of a new + * ``ensure_fresh``: boolean that requests that during every fetch of a new set of jobs this will cause the iterator to force the backend to refresh (ensuring that the jobboard has the most recent job listings). - * only_unclaimed: boolean that indicates whether to only iterate + * ``only_unclaimed``: boolean that indicates whether to only iterate over unclaimed jobs. """ @@ -288,7 +291,30 @@ class ZookeeperJobBoardIterator(six.Iterator): return job -class ZookeeperJobBoard(jobboard.NotifyingJobBoard): +class ZookeeperJobBoard(base.NotifyingJobBoard): + """A jobboard backend by zookeeper. + + Powered by the `kazoo `_ library. + + This jobboard creates *sequenced* persistent znodes in a directory in + zookeeper (that directory defaults ``/taskflow/jobs``) and uses zookeeper + watches to notify other jobboards that the job which was posted using the + :meth:`.post` method (this creates a znode with contents/details in json) + The users of those jobboard(s) (potentially on disjoint sets of machines) + can then iterate over the available jobs and decide if they want to attempt + to claim one of the jobs they have iterated over. If so they will then + attempt to contact zookeeper and will attempt to create a ephemeral znode + using the name of the persistent znode + ".lock" as a postfix. If the + entity trying to use the jobboard to :meth:`.claim` the job is able to + create a ephemeral znode with that name then it will be allowed (and + expected) to perform whatever *work* the contents of that job that it + locked described. Once finished the ephemeral znode and persistent znode + may be deleted (if successfully completed) in a single transcation or if + not successfull (or the entity that claimed the znode dies) the ephemeral + znode will be released (either manually by using :meth:`.abandon` or + automatically by zookeeper the ephemeral is deemed to be lost). + """ + def __init__(self, name, conf, client=None, persistence=None, emit_notifications=True): super(ZookeeperJobBoard, self).__init__(name, conf) @@ -373,7 +399,7 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard): job = self._known_jobs.pop(path, None) if job is not None: LOG.debug("Removed job that was at path '%s'", path) - self._emit(jobboard.REMOVAL, details={'job': job}) + self._emit(base.REMOVAL, details={'job': job}) def _process_child(self, path, request): """Receives the result of a child data fetch request.""" @@ -412,7 +438,7 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard): self._known_jobs[path] = job self._job_cond.notify_all() if job is not None: - self._emit(jobboard.POSTED, details={'job': job}) + self._emit(base.POSTED, details={'job': job}) def _on_job_posting(self, children, delayed=True): LOG.debug("Got children %s under path %s", children, self.path) @@ -498,7 +524,7 @@ class ZookeeperJobBoard(jobboard.NotifyingJobBoard): with self._job_cond: self._known_jobs[job_path] = job self._job_cond.notify_all() - self._emit(jobboard.POSTED, details={'job': job}) + self._emit(base.POSTED, details={'job': job}) return job def claim(self, job, who): diff --git a/taskflow/jobs/jobboard.py b/taskflow/jobs/base.py similarity index 74% rename from taskflow/jobs/jobboard.py rename to taskflow/jobs/base.py index 0938d0e76..eea5b12b6 100644 --- a/taskflow/jobs/jobboard.py +++ b/taskflow/jobs/base.py @@ -17,11 +17,102 @@ import abc +from oslo_utils import uuidutils import six from taskflow.types import notifier +@six.add_metaclass(abc.ABCMeta) +class Job(object): + """A abstraction that represents a named and trackable unit of work. + + A job connects a logbook, a owner, last modified and created on dates and + any associated state that the job has. Since it is a connector to a + logbook, which are each associated with a set of factories that can create + set of flows, it is the current top-level container for a piece of work + that can be owned by an entity (typically that entity will read those + logbooks and run any contained flows). + + Only one entity will be allowed to own and operate on the flows contained + in a job at a given time (for the foreseeable future). + + NOTE(harlowja): It is the object that will be transferred to another + entity on failure so that the contained flows ownership can be + transferred to the secondary entity/owner for resumption, continuation, + reverting... + """ + + def __init__(self, name, uuid=None, details=None): + if uuid: + self._uuid = uuid + else: + self._uuid = uuidutils.generate_uuid() + self._name = name + if not details: + details = {} + self._details = details + + @abc.abstractproperty + def last_modified(self): + """The datetime the job was last modified.""" + pass + + @abc.abstractproperty + def created_on(self): + """The datetime the job was created on.""" + pass + + @abc.abstractproperty + def board(self): + """The board this job was posted on or was created from.""" + + @abc.abstractproperty + def state(self): + """The current state of this job.""" + + @abc.abstractproperty + def book(self): + """Logbook associated with this job. + + If no logbook is associated with this job, this property is None. + """ + + @abc.abstractproperty + def book_uuid(self): + """UUID of logbook associated with this job. + + If no logbook is associated with this job, this property is None. + """ + + @abc.abstractproperty + def book_name(self): + """Name of logbook associated with this job. + + If no logbook is associated with this job, this property is None. + """ + + @property + def uuid(self): + """The uuid of this job.""" + return self._uuid + + @property + def details(self): + """A dictionary of any details associated with this job.""" + return self._details + + @property + def name(self): + """The non-uniquely identifying name of this job.""" + return self._name + + def __str__(self): + """Pretty formats the job into something *more* meaningful.""" + return "%s %s (%s): %s" % (type(self).__name__, + self.name, self.uuid, self.details) + + @six.add_metaclass(abc.ABCMeta) class JobBoard(object): """A place where jobs can be posted, reposted, claimed and transferred. diff --git a/taskflow/jobs/job.py b/taskflow/jobs/job.py deleted file mode 100644 index a7dd04f7d..000000000 --- a/taskflow/jobs/job.py +++ /dev/null @@ -1,111 +0,0 @@ -# -*- coding: utf-8 -*- - -# Copyright (C) 2013 Rackspace Hosting Inc. All Rights Reserved. -# Copyright (C) 2013 Yahoo! Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import abc - -from oslo_utils import uuidutils -import six - - -@six.add_metaclass(abc.ABCMeta) -class Job(object): - """A abstraction that represents a named and trackable unit of work. - - A job connects a logbook, a owner, last modified and created on dates and - any associated state that the job has. Since it is a connector to a - logbook, which are each associated with a set of factories that can create - set of flows, it is the current top-level container for a piece of work - that can be owned by an entity (typically that entity will read those - logbooks and run any contained flows). - - Only one entity will be allowed to own and operate on the flows contained - in a job at a given time (for the foreseeable future). - - NOTE(harlowja): It is the object that will be transferred to another - entity on failure so that the contained flows ownership can be - transferred to the secondary entity/owner for resumption, continuation, - reverting... - """ - - def __init__(self, name, uuid=None, details=None): - if uuid: - self._uuid = uuid - else: - self._uuid = uuidutils.generate_uuid() - self._name = name - if not details: - details = {} - self._details = details - - @abc.abstractproperty - def last_modified(self): - """The datetime the job was last modified.""" - pass - - @abc.abstractproperty - def created_on(self): - """The datetime the job was created on.""" - pass - - @abc.abstractproperty - def board(self): - """The board this job was posted on or was created from.""" - - @abc.abstractproperty - def state(self): - """The current state of this job.""" - - @abc.abstractproperty - def book(self): - """Logbook associated with this job. - - If no logbook is associated with this job, this property is None. - """ - - @abc.abstractproperty - def book_uuid(self): - """UUID of logbook associated with this job. - - If no logbook is associated with this job, this property is None. - """ - - @abc.abstractproperty - def book_name(self): - """Name of logbook associated with this job. - - If no logbook is associated with this job, this property is None. - """ - - @property - def uuid(self): - """The uuid of this job.""" - return self._uuid - - @property - def details(self): - """A dictionary of any details associated with this job.""" - return self._details - - @property - def name(self): - """The non-uniquely identifying name of this job.""" - return self._name - - def __str__(self): - """Pretty formats the job into something *more* meaningful.""" - return "%s %s (%s): %s" % (type(self).__name__, - self.name, self.uuid, self.details) diff --git a/taskflow/tests/unit/conductor/test_conductor.py b/taskflow/tests/unit/conductor/test_conductor.py index b861c12b6..137d0f3a7 100644 --- a/taskflow/tests/unit/conductor/test_conductor.py +++ b/taskflow/tests/unit/conductor/test_conductor.py @@ -22,7 +22,7 @@ from zake import fake_client from taskflow.conductors import single_threaded as stc from taskflow import engines from taskflow.jobs.backends import impl_zookeeper -from taskflow.jobs import jobboard +from taskflow.jobs import base from taskflow.patterns import linear_flow as lf from taskflow.persistence.backends import impl_memory from taskflow import states as st @@ -93,7 +93,7 @@ class SingleThreadedConductorTest(test_utils.EngineTestBase, test.TestCase): def on_consume(state, details): consumed_event.set() - components.board.notifier.register(jobboard.REMOVAL, on_consume) + components.board.notifier.register(base.REMOVAL, on_consume) with close_many(components.conductor, components.client): t = threading_utils.daemon_thread(components.conductor.run) t.start() @@ -122,7 +122,7 @@ class SingleThreadedConductorTest(test_utils.EngineTestBase, test.TestCase): def on_consume(state, details): consumed_event.set() - components.board.notifier.register(jobboard.REMOVAL, on_consume) + components.board.notifier.register(base.REMOVAL, on_consume) with close_many(components.conductor, components.client): t = threading_utils.daemon_thread(components.conductor.run) t.start()