From c03cd3f85e9df113ef10833eaedfc846adde45f6 Mon Sep 17 00:00:00 2001 From: Cyril Roelandt Date: Wed, 30 Apr 2014 00:38:50 +0200 Subject: [PATCH] Add an example for the job board feature Change-Id: I9479ca20fa9ed1217a56314c1551d394f5f1ecb5 Co-Authored-By:Dan Krause --- taskflow/examples/job_board_no_test.py | 171 +++++++++++++++++++++++++ 1 file changed, 171 insertions(+) create mode 100644 taskflow/examples/job_board_no_test.py diff --git a/taskflow/examples/job_board_no_test.py b/taskflow/examples/job_board_no_test.py new file mode 100644 index 00000000..d37c96a7 --- /dev/null +++ b/taskflow/examples/job_board_no_test.py @@ -0,0 +1,171 @@ +# -*- encoding: utf-8 -*- +# +# Copyright © 2013 eNovance +# +# Authors: Dan Krause +# Cyril Roelandt +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# This example shows how to use the job board feature. +# +# Let's start by creating some jobs: +# $ python job_board_no_test.py create my-board my-job '{}' +# $ python job_board_no_test.py create my-board my-job '{"foo": "bar"}' +# $ python job_board_no_test.py create my-board my-job '{"foo": "baz"}' +# $ python job_board_no_test.py create my-board my-job '{"foo": "barbaz"}' +# +# Make sure they were registered: +# $ python job_board_no_test.py list my-board +# 7277181a-1f83-473d-8233-f361615bae9e - {} +# 84a396e8-d02e-450d-8566-d93cb68550c0 - {u'foo': u'bar'} +# 4d355d6a-2c72-44a2-a558-19ae52e8ae2c - {u'foo': u'baz'} +# cd9aae2c-fd64-416d-8ba0-426fa8e3d59c - {u'foo': u'barbaz'} +# +# Perform one job: +# $ python job_board_no_test.py consume my-board \ +# 84a396e8-d02e-450d-8566-d93cb68550c0 +# Performing job 84a396e8-d02e-450d-8566-d93cb68550c0 with args \ +# {u'foo': u'bar'} +# $ python job_board_no_test.py list my-board +# 7277181a-1f83-473d-8233-f361615bae9e - {} +# 4d355d6a-2c72-44a2-a558-19ae52e8ae2c - {u'foo': u'baz'} +# cd9aae2c-fd64-416d-8ba0-426fa8e3d59c - {u'foo': u'barbaz'} +# +# Delete a job: +# $ python job_board_no_test.py delete my-board \ +# cd9aae2c-fd64-416d-8ba0-426fa8e3d59c +# $ python job_board_no_test.py list my-board +# 7277181a-1f83-473d-8233-f361615bae9e - {} +# 4d355d6a-2c72-44a2-a558-19ae52e8ae2c - {u'foo': u'baz'} +# +# Delete all the remaining jobs +# $ python job_board_no_test.py clear my-board +# $ python job_board_no_test.py list my-board +# $ + +import argparse +import contextlib +import json +import os +import sys +import tempfile + +import taskflow.jobs.backends as job_backends +from taskflow.persistence import logbook + +import example_utils # noqa + + +@contextlib.contextmanager +def jobboard(*args, **kwargs): + jb = job_backends.fetch(*args, **kwargs) + jb.connect() + yield jb + jb.close() + + +conf = { + 'board': 'zookeeper', + 'hosts': ['127.0.0.1:2181'] +} + + +def consume_job(args): + def perform_job(job): + print("Performing job %s with args %s" % (job.uuid, job.details)) + + with jobboard(args.board_name, conf) as jb: + for job in jb.iterjobs(ensure_fresh=True): + if job.uuid == args.job_uuid: + jb.claim(job, "test-client") + perform_job(job) + jb.consume(job, "test-client") + + +def clear_jobs(args): + with jobboard(args.board_name, conf) as jb: + for job in jb.iterjobs(ensure_fresh=True): + jb.claim(job, "test-client") + jb.consume(job, "test-client") + + +def create_job(args): + store = json.loads(args.details) + book = logbook.LogBook(args.job_name) + if example_utils.SQLALCHEMY_AVAILABLE: + persist_path = os.path.join(tempfile.gettempdir(), "persisting.db") + backend_uri = "sqlite:///%s" % (persist_path) + else: + persist_path = os.path.join(tempfile.gettempdir(), "persisting") + backend_uri = "file:///%s" % (persist_path) + with example_utils.get_backend(backend_uri) as backend: + backend.get_connection().save_logbook(book) + with jobboard(args.board_name, conf, persistence=backend) as jb: + jb.post(args.job_name, book, details=store) + + +def list_jobs(args): + with jobboard(args.board_name, conf) as jb: + for job in jb.iterjobs(ensure_fresh=True): + print("%s - %s" % (job.uuid, job.details)) + + +def delete_job(args): + with jobboard(args.board_name, conf) as jb: + for job in jb.iterjobs(ensure_fresh=True): + if job.uuid == args.job_uuid: + jb.claim(job, "test-client") + jb.consume(job, "test-client") + + +def main(argv): + parser = argparse.ArgumentParser() + subparsers = parser.add_subparsers(title='subcommands', + description='valid subcommands', + help='additional help') + + # Consume command + parser_consume = subparsers.add_parser('consume') + parser_consume.add_argument('board_name') + parser_consume.add_argument('job_uuid') + parser_consume.set_defaults(func=consume_job) + + # Clear command + parser_consume = subparsers.add_parser('clear') + parser_consume.add_argument('board_name') + parser_consume.set_defaults(func=clear_jobs) + + # Create command + parser_create = subparsers.add_parser('create') + parser_create.add_argument('board_name') + parser_create.add_argument('job_name') + parser_create.add_argument('details') + parser_create.set_defaults(func=create_job) + + # Delete command + parser_delete = subparsers.add_parser('delete') + parser_delete.add_argument('board_name') + parser_delete.add_argument('job_uuid') + parser_delete.set_defaults(func=delete_job) + + # List command + parser_list = subparsers.add_parser('list') + parser_list.add_argument('board_name') + parser_list.set_defaults(func=list_jobs) + + args = parser.parse_args(argv) + args.func(args) + +if __name__ == '__main__': + main(sys.argv[1:])