Jobboard example that show jobs + workers + producers
Add a new example that spins up a set of threads to simulate the actual workers and producers that would be normally attached to a jobboard and use these threads to producer and consume a set of simple jobs (that are filtered on by the workers). - This also fixes how python3 removed the __cmp__ operator which we were using for sorting the jobs that were posted and now we must use __lt__ instead. Fixes bug 1367496 Part of blueprint more-examples Change-Id: Ib8d116637b8edae31e4c8927a28515907855f8bf
This commit is contained in:

committed by
Joshua Harlow

parent
f8d69ffc31
commit
15e3962e2a
@@ -198,3 +198,15 @@ Code
|
|||||||
:language: python
|
:language: python
|
||||||
:linenos:
|
:linenos:
|
||||||
:lines: 16-
|
:lines: 16-
|
||||||
|
|
||||||
|
Jobboard producer/consumer (simple)
|
||||||
|
===================================
|
||||||
|
|
||||||
|
.. note::
|
||||||
|
|
||||||
|
Full source located at :example:`jobboard_produce_consume_colors`
|
||||||
|
|
||||||
|
.. literalinclude:: ../../taskflow/examples/jobboard_produce_consume_colors.py
|
||||||
|
:language: python
|
||||||
|
:linenos:
|
||||||
|
:lines: 16-
|
||||||
|
178
taskflow/examples/jobboard_produce_consume_colors.py
Normal file
178
taskflow/examples/jobboard_produce_consume_colors.py
Normal file
@@ -0,0 +1,178 @@
|
|||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
import collections
|
||||||
|
import contextlib
|
||||||
|
import logging
|
||||||
|
import os
|
||||||
|
import random
|
||||||
|
import sys
|
||||||
|
import threading
|
||||||
|
import time
|
||||||
|
|
||||||
|
logging.basicConfig(level=logging.ERROR)
|
||||||
|
|
||||||
|
top_dir = os.path.abspath(os.path.join(os.path.dirname(__file__),
|
||||||
|
os.pardir,
|
||||||
|
os.pardir))
|
||||||
|
sys.path.insert(0, top_dir)
|
||||||
|
|
||||||
|
import six
|
||||||
|
from zake import fake_client
|
||||||
|
|
||||||
|
from taskflow import exceptions as excp
|
||||||
|
from taskflow.jobs import backends
|
||||||
|
|
||||||
|
# In this example we show how a jobboard can be used to post work for other
|
||||||
|
# entities to work on. This example creates a set of jobs using one producer
|
||||||
|
# thread (typically this would be split across many machines) and then having
|
||||||
|
# other worker threads with there own jobboards select work using a given
|
||||||
|
# filters [red/blue] and then perform that work (and consuming or abandoning
|
||||||
|
# the job after it has been completed or failed).
|
||||||
|
|
||||||
|
# Things to note:
|
||||||
|
# - No persistence layer is used (or logbook), just the job details are used
|
||||||
|
# to determine if a job should be selected by a worker or not.
|
||||||
|
# - This example runs in a single process (this is expected to be atypical
|
||||||
|
# but this example shows that it can be done if needed, for testing...)
|
||||||
|
# - The iterjobs(), claim(), consume()/abandon() worker workflow.
|
||||||
|
# - The post() producer workflow.
|
||||||
|
|
||||||
|
SHARED_CONF = {
|
||||||
|
'path': "/taskflow/jobs",
|
||||||
|
'board': 'zookeeper',
|
||||||
|
}
|
||||||
|
|
||||||
|
# How many workers and producers of work will be created (as threads).
|
||||||
|
PRODUCERS = 3
|
||||||
|
WORKERS = 5
|
||||||
|
|
||||||
|
# How many units of work each producer will create.
|
||||||
|
PRODUCER_UNITS = 10
|
||||||
|
|
||||||
|
# How many units of work are expected to be produced (used so workers can
|
||||||
|
# know when to stop running and shutdown, typically this would not be a
|
||||||
|
# a value but we have to limit this examples execution time to be less than
|
||||||
|
# infinity).
|
||||||
|
EXPECTED_UNITS = PRODUCER_UNITS * PRODUCERS
|
||||||
|
|
||||||
|
# Delay between producing/consuming more work.
|
||||||
|
WORKER_DELAY, PRODUCER_DELAY = (0.5, 0.5)
|
||||||
|
|
||||||
|
# To ensure threads don't trample other threads output.
|
||||||
|
STDOUT_LOCK = threading.Lock()
|
||||||
|
|
||||||
|
|
||||||
|
def dispatch_work(job):
|
||||||
|
# This is where the jobs contained work *would* be done
|
||||||
|
time.sleep(1.0)
|
||||||
|
|
||||||
|
|
||||||
|
def safe_print(name, message, prefix=""):
|
||||||
|
with STDOUT_LOCK:
|
||||||
|
if prefix:
|
||||||
|
print("%s %s: %s" % (prefix, name, message))
|
||||||
|
else:
|
||||||
|
print("%s: %s" % (name, message))
|
||||||
|
|
||||||
|
|
||||||
|
def worker(ident, client, consumed):
|
||||||
|
# Create a personal board (using the same client so that it works in
|
||||||
|
# the same process) and start looking for jobs on the board that we want
|
||||||
|
# to perform.
|
||||||
|
name = "W-%s" % (ident)
|
||||||
|
safe_print(name, "started")
|
||||||
|
claimed_jobs = 0
|
||||||
|
consumed_jobs = 0
|
||||||
|
abandoned_jobs = 0
|
||||||
|
with backends.backend(name, SHARED_CONF.copy(), client=client) as board:
|
||||||
|
while len(consumed) != EXPECTED_UNITS:
|
||||||
|
favorite_color = random.choice(['blue', 'red'])
|
||||||
|
for job in board.iterjobs(ensure_fresh=True, only_unclaimed=True):
|
||||||
|
# See if we should even bother with it...
|
||||||
|
if job.details.get('color') != favorite_color:
|
||||||
|
continue
|
||||||
|
safe_print(name, "'%s' [attempting claim]" % (job))
|
||||||
|
try:
|
||||||
|
board.claim(job, name)
|
||||||
|
claimed_jobs += 1
|
||||||
|
safe_print(name, "'%s' [claimed]" % (job))
|
||||||
|
except (excp.NotFound, excp.UnclaimableJob):
|
||||||
|
safe_print(name, "'%s' [claim unsuccessful]" % (job))
|
||||||
|
else:
|
||||||
|
try:
|
||||||
|
dispatch_work(job)
|
||||||
|
board.consume(job, name)
|
||||||
|
safe_print(name, "'%s' [consumed]" % (job))
|
||||||
|
consumed_jobs += 1
|
||||||
|
consumed.append(job)
|
||||||
|
except Exception:
|
||||||
|
board.abandon(job, name)
|
||||||
|
abandoned_jobs += 1
|
||||||
|
safe_print(name, "'%s' [abandoned]" % (job))
|
||||||
|
time.sleep(WORKER_DELAY)
|
||||||
|
safe_print(name,
|
||||||
|
"finished (claimed %s jobs, consumed %s jobs,"
|
||||||
|
" abandoned %s jobs)" % (claimed_jobs, consumed_jobs,
|
||||||
|
abandoned_jobs), prefix=">>>")
|
||||||
|
|
||||||
|
|
||||||
|
def producer(ident, client):
|
||||||
|
# Create a personal board (using the same client so that it works in
|
||||||
|
# the same process) and start posting jobs on the board that we want
|
||||||
|
# some entity to perform.
|
||||||
|
name = "P-%s" % (ident)
|
||||||
|
safe_print(name, "started")
|
||||||
|
with backends.backend(name, SHARED_CONF.copy(), client=client) as board:
|
||||||
|
for i in six.moves.xrange(0, PRODUCER_UNITS):
|
||||||
|
job_name = "%s-%s" % (name, i)
|
||||||
|
details = {
|
||||||
|
'color': random.choice(['red', 'blue']),
|
||||||
|
}
|
||||||
|
job = board.post(job_name, book=None, details=details)
|
||||||
|
safe_print(name, "'%s' [posted]" % (job))
|
||||||
|
time.sleep(PRODUCER_DELAY)
|
||||||
|
safe_print(name, "finished", prefix=">>>")
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
with contextlib.closing(fake_client.FakeClient()) as c:
|
||||||
|
created = []
|
||||||
|
for i in range(0, PRODUCERS):
|
||||||
|
p = threading.Thread(target=producer, args=(i + 1, c))
|
||||||
|
p.daemon = True
|
||||||
|
created.append(p)
|
||||||
|
p.start()
|
||||||
|
consumed = collections.deque()
|
||||||
|
for i in range(0, WORKERS):
|
||||||
|
w = threading.Thread(target=worker, args=(i + 1, c, consumed))
|
||||||
|
w.daemon = True
|
||||||
|
created.append(w)
|
||||||
|
w.start()
|
||||||
|
while created:
|
||||||
|
t = created.pop()
|
||||||
|
t.join()
|
||||||
|
# At the end there should be nothing leftover, let's verify that.
|
||||||
|
board = backends.fetch('verifier', SHARED_CONF.copy(), client=c)
|
||||||
|
board.connect()
|
||||||
|
with contextlib.closing(board):
|
||||||
|
if board.job_count != 0 or len(consumed) != EXPECTED_UNITS:
|
||||||
|
return 1
|
||||||
|
return 0
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
sys.exit(main())
|
@@ -77,10 +77,13 @@ class ZookeeperJob(base_job.Job):
|
|||||||
if all((self._book, self._book_data)):
|
if all((self._book, self._book_data)):
|
||||||
raise ValueError("Only one of 'book_data' or 'book'"
|
raise ValueError("Only one of 'book_data' or 'book'"
|
||||||
" can be provided")
|
" can be provided")
|
||||||
self._path = path
|
self._path = k_paths.normpath(path)
|
||||||
self._lock_path = path + LOCK_POSTFIX
|
self._lock_path = path + LOCK_POSTFIX
|
||||||
self._created_on = created_on
|
self._created_on = created_on
|
||||||
self._node_not_found = False
|
self._node_not_found = False
|
||||||
|
basename = k_paths.basename(self._path)
|
||||||
|
self._root = self._path[0:-len(basename)]
|
||||||
|
self._sequence = int(basename[len(JOB_PREFIX):])
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def lock_path(self):
|
def lock_path(self):
|
||||||
@@ -90,6 +93,14 @@ class ZookeeperJob(base_job.Job):
|
|||||||
def path(self):
|
def path(self):
|
||||||
return self._path
|
return self._path
|
||||||
|
|
||||||
|
@property
|
||||||
|
def sequence(self):
|
||||||
|
return self._sequence
|
||||||
|
|
||||||
|
@property
|
||||||
|
def root(self):
|
||||||
|
return self._root
|
||||||
|
|
||||||
def _get_node_attr(self, path, attr_name, trans_func=None):
|
def _get_node_attr(self, path, attr_name, trans_func=None):
|
||||||
try:
|
try:
|
||||||
_data, node_stat = self._client.get(path)
|
_data, node_stat = self._client.get(path)
|
||||||
@@ -186,8 +197,11 @@ class ZookeeperJob(base_job.Job):
|
|||||||
return states.UNCLAIMED
|
return states.UNCLAIMED
|
||||||
return states.CLAIMED
|
return states.CLAIMED
|
||||||
|
|
||||||
def __cmp__(self, other):
|
def __lt__(self, other):
|
||||||
return cmp(self.path, other.path)
|
if self.root == other.root:
|
||||||
|
return self.sequence < other.sequence
|
||||||
|
else:
|
||||||
|
return self.root < other.root
|
||||||
|
|
||||||
def __hash__(self):
|
def __hash__(self):
|
||||||
return hash(self.path)
|
return hash(self.path)
|
||||||
|
Reference in New Issue
Block a user