From 6ddf738bbbb703da544940812f89ed4a12295aa6 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Tue, 18 Jun 2013 12:44:27 -0700 Subject: [PATCH] Dinky change to trigger jenkins so I can cleanup. Change-Id: I5ff975bb5ded48a84341efa4ebfdae4bf430ab60 --- setup.cfg | 2 +- taskflow/backends/celery/celeryapp.py | 10 ++---- taskflow/common/__init__.py | 1 - taskflow/common/config.py | 2 ++ taskflow/db/api.py | 29 +++++++++++++++++ taskflow/db/base.py | 3 +- taskflow/db/sqlalchemy/api.py | 29 +++++++++++++++++ taskflow/db/sqlalchemy/models.py | 18 ++++++++--- taskflow/db/sqlalchemy/session.py | 5 ++- taskflow/patterns/distributed_flow.py | 7 ++-- taskflow/patterns/graph_flow.py | 8 ++--- taskflow/patterns/ordered_flow.py | 4 +-- taskflow/tests/unit/test_sql_db_api.py | 3 +- tools/test-requires | 1 + tox.ini | 44 ++++++++++++++++++++++++++ 15 files changed, 137 insertions(+), 29 deletions(-) create mode 100644 tox.ini diff --git a/setup.cfg b/setup.cfg index 5ed7d832..071d4bf9 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,3 +1,3 @@ [nosetests] cover-erase = true -verbosity=2 +verbosity = 2 diff --git a/taskflow/backends/celery/celeryapp.py b/taskflow/backends/celery/celeryapp.py index 678db80c..a973aced 100644 --- a/taskflow/backends/celery/celeryapp.py +++ b/taskflow/backends/celery/celeryapp.py @@ -23,6 +23,7 @@ from celery.signals import task_failure, task_success LOG = logging.getLogger(__name__) + @task_failure.connect def task_error_handler(signal=None, sender=None, task_id=None, exception=None, args=None, kwargs=None, @@ -34,18 +35,11 @@ def task_error_handler(signal=None, sender=None, task_id=None, LOG.error('Trackeback: %s' % (tb.print_tb(traceback), )) wf = sender.name.split('.')[0] task = ('.').join(n for n in (sender.name.split('.')[1:]) if n) - #logbook.update_task(wf, task, status="ERROR", args=args, kwargs=kwargs, - # exception=exception, traceback=(tb.print_tb(traceback))) - # TODO: Auto-initiate rollback from failed task + @task_success.connect def task_success_handler(singal=None, sender=None, result=None): """ Save task results to WF """ wf = sender.name.split('.')[0] task = ('.').join(n for n in (sender.name.split('.')[1:]) if n) - #logbook.update_task(wf, task, status="SUCCESS", result=result) - - - - diff --git a/taskflow/common/__init__.py b/taskflow/common/__init__.py index 2309e169..e068b146 100644 --- a/taskflow/common/__init__.py +++ b/taskflow/common/__init__.py @@ -16,4 +16,3 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. - diff --git a/taskflow/common/config.py b/taskflow/common/config.py index 355614be..5b13308d 100644 --- a/taskflow/common/config.py +++ b/taskflow/common/config.py @@ -49,8 +49,10 @@ celery_opts = [ help='The MongoDB connection string used to connect to the ' 'celery message queue')] + def register_db_opts(): cfg.CONF.register_opts(db_opts) + def register_celery_opts(): cfg.CONF.register_opts(celery_opts) diff --git a/taskflow/db/api.py b/taskflow/db/api.py index 31563c3f..69c082e1 100644 --- a/taskflow/db/api.py +++ b/taskflow/db/api.py @@ -35,6 +35,7 @@ CONF.register_opts(db_opts) IMPL = utils.LazyPluggable('db_backend', sqlalchemy='taskflow.db.sqlalchemy.api') + def configure(): global SQL_CONNECTION global SQL_IDLE_TIMEOUT @@ -46,89 +47,117 @@ def configure(): LOGBOOK """ + def logbook_get(context, lb_id): return IMPL.logbook_get(context, lb_id) + def logbook_get_by_name(context, lb_name): return IMPL.logbook_get_by_name(context, lb_name) + def logbook_create(context, lb_name, lb_id=None): return IMPL.logbook_create(context, lb_name, lb_id) + def logbook_get_workflows(context, lb_id): return IMPL.logbook_get_workflows(context, lb_id) + def logbook_add_workflow(context, lb_id, wf_name): return IMPL.logbook_add_workflow(context, lb_id, wf_name) + def logbook_destroy(context, lb_id): return IMPL.logbook_destroy(context, lb_id) + """ JOB """ + def job_get(context, job_id): return IMPL.job_get(context, job_id) + def job_update(context, job_id, values): return IMPL.job_update(context, job_id, values) + def job_add_workflow(context, job_id, wf_id): return IMPL.job_add_workflow(context, job_id, wf_id) + def job_get_owner(context, job_id): return IMPL.job_get_owner(context, job_id) + def job_get_state(context, job_id): return IMPL.job_get_state(context, job_id) + def job_get_logbook(context, job_id): return IMPL.job_get_logbook(context, job_id) + def job_create(context, job_name, job_id=None): return IMPL.job_create(context, job_name, job_id) + def job_destroy(context, job_id): return IMPL.job_destroy(context, job_id) + """ WORKFLOW """ + def workflow_get(context, wf_name): return IMPL.workflow_get(context, wf_name) + def workflow_get_all(context): return IMPL.workflow_get_all(context) + def workflow_get_names(context): return IMPL.workflow_get_names(context) + def workflow_get_tasks(context, wf_name): return IMPL.workflow_get_tasks(context, wf_name) + def workflow_add_task(context, wf_name, task_id): return IMPL.workflow_add_task(context, wf_name, task_id) + def workflow_create(context, wf_name): return IMPL.workflow_create(context, wf_name) + def workflow_destroy(context, wf_name): return IMPL.workflow_destroy(context, wf_name) + """ TASK """ + def task_get(context, task_id): return IMPL.task_get(context, task_id) + def task_create(context, task_name, wf_id, task_id=None): return IMPL.task_create(context, task_name, wf_id, task_id) + def task_update(context, task_id, values): return IMPL.task_update(context, task_id, values) + def task_destroy(context, task_id): return IMPL.task_destroy(context, task_id) diff --git a/taskflow/db/base.py b/taskflow/db/base.py index 184f5e62..3ecfce17 100644 --- a/taskflow/db/base.py +++ b/taskflow/db/base.py @@ -36,5 +36,4 @@ class Base(object): def __init__(self, db_driver=None): if not db_driver: db_driver = CONF.db_driver - self.db = importutils.import_module(db_driver) # pylint: disable=C0103 - + self.db = importutils.import_module(db_driver) # pylint: disable=C0103 diff --git a/taskflow/db/sqlalchemy/api.py b/taskflow/db/sqlalchemy/api.py index 6b9d9cbc..a5b1fd3f 100644 --- a/taskflow/db/sqlalchemy/api.py +++ b/taskflow/db/sqlalchemy/api.py @@ -29,16 +29,19 @@ from taskflow.openstack.common import exception LOG = logging.getLogger(__name__) + def model_query(context, *args, **kwargs): session = kwargs.get('session') or get_session() query = session.query(*args) return query + """ LOGBOOK """ + def logbook_get(context, lb_id, session=None): """Return a logbook with matching lb_id""" query = model_query(context, models.LogBook, session=session).\ @@ -50,6 +53,7 @@ def logbook_get(context, lb_id, session=None): return query.first() + def logbook_get_by_name(context, lb_name): """Return all logbooks with matching name""" query = model_query(context, models.LogBook).\ @@ -61,6 +65,7 @@ def logbook_get_by_name(context, lb_name): return query.all() + def logbook_create(context, name, lb_id=None): """Create a new logbook""" lb_ref = models.LogBook() @@ -71,6 +76,7 @@ def logbook_create(context, name, lb_id=None): return lb_ref + def logbook_get_workflows(context, lb_id): """Return all workflows associated with a logbook""" session = get_session() @@ -79,6 +85,7 @@ def logbook_get_workflows(context, lb_id): return lb.workflows + def logbook_add_workflow(context, lb_id, wf_name): """Add Workflow to given LogBook""" session = get_session() @@ -90,6 +97,7 @@ def logbook_add_workflow(context, lb_id, wf_name): return lb.workflows + def logbook_destroy(context, lb_id): """Delete a given LogBook""" session = get_session() @@ -97,10 +105,12 @@ def logbook_destroy(context, lb_id): lb = logbook_get(context, lb_id, session=session) lb.delete(session=session) + """ JOB """ + def job_get(context, job_id, session=None): """Return Job with matching job_id""" query = model_query(context, models.Job, session=session).\ @@ -112,6 +122,7 @@ def job_get(context, job_id, session=None): return query.first() + def job_update(context, job_id, values): """Update job with given values""" session = get_session() @@ -120,6 +131,7 @@ def job_update(context, job_id, values): job.update(values) job.save(session=session) + def job_add_workflow(context, job_id, wf_id): """Add a Workflow to given job""" session = get_session() @@ -129,16 +141,19 @@ def job_add_workflow(context, job_id, wf_id): job.workflows.append(wf) return job.workflows + def job_get_owner(context, job_id): """Return a job's current owner""" job = job_get(context, job_id) return job.owner + def job_get_state(context, job_id): """Return a job's current owner""" job = job_get(context, job_id) return job.state + def job_get_logbook(context, job_id): """Return the logbook associated with the given job""" session = get_session() @@ -146,6 +161,7 @@ def job_get_logbook(context, job_id): job = job_get(context, job_id, session=session) return job.logbook + def job_create(context, name, job_id=None): job_ref = models.Job() job_ref.name = name @@ -157,6 +173,7 @@ def job_create(context, name, job_id=None): return job_ref + def job_destroy(context, job_id): """Delete a given Job""" session = get_session() @@ -169,6 +186,7 @@ def job_destroy(context, job_id): WORKFLOW """ + def workflow_get(context, wf_name, session=None): """Return one workflow with matching workflow_id""" query = model_query(context, models.Workflow, session=session).\ @@ -179,6 +197,7 @@ def workflow_get(context, wf_name, session=None): return query.first() + def workflow_get_all(context): """Return all workflows""" results = model_query(context, models.Workflow).all() @@ -188,12 +207,14 @@ def workflow_get_all(context): return results + def workflow_get_names(context): """Return all workflow names""" results = model_query(context, models.Workflow.name).all() return zip(*results) + def workflow_get_tasks(context, wf_name): """Return all tasks for a given Workflow""" session = get_session() @@ -202,6 +223,7 @@ def workflow_get_tasks(context, wf_name): return wf.tasks + def workflow_add_task(context, wf_id, task_id): """Add a task to a given workflow""" session = get_session() @@ -211,6 +233,7 @@ def workflow_add_task(context, wf_id, task_id): wf.tasks.append(task) return wf.tasks + def workflow_create(context, workflow_name): """Create new workflow with workflow_id""" workflow_ref = models.Workflow() @@ -219,6 +242,7 @@ def workflow_create(context, workflow_name): return workflow_ref + def workflow_destroy(context, wf_name): """Delete a given Workflow""" session = get_session() @@ -226,10 +250,12 @@ def workflow_destroy(context, wf_name): wf = workflow_get(context, wf_name, session=session) wf.delete(session=session) + """ TASK """ + def task_get(context, task_id, session=None): """Return Task with task_id""" query = model_query(context, models.Task, session=session).\ @@ -241,6 +267,7 @@ def task_get(context, task_id, session=None): return query.first() + def task_create(context, task_name, wf_id, task_id=None): """Create task associated with given workflow""" task_ref = models.Task() @@ -252,6 +279,7 @@ def task_create(context, task_name, wf_id, task_id=None): return task_ref + def task_update(context, task_id, values): """Update Task with given values""" session = get_session() @@ -261,6 +289,7 @@ def task_update(context, task_id, values): task.update(values) task.save(session=session) + def task_destroy(context, task_id): """Delete an existing Task""" session = get_session() diff --git a/taskflow/db/sqlalchemy/models.py b/taskflow/db/sqlalchemy/models.py index 9acc869b..5e100158 100644 --- a/taskflow/db/sqlalchemy/models.py +++ b/taskflow/db/sqlalchemy/models.py @@ -32,11 +32,12 @@ from sqlalchemy import types as types from json import dumps, loads from taskflow.db.sqlalchemy.session import get_session, get_engine -from taskflow.openstack.common import timeutils, uuidutils +from taskflow.openstack.common import timeutils, uuidutils, exception CONF = cfg.CONF BASE = declarative_base() + class Json(types.TypeDecorator, types.MutableType): impl = types.Text @@ -49,12 +50,11 @@ class Json(types.TypeDecorator, types.MutableType): class TaskFlowBase(object): """Base class for TaskFlow Models.""" - __table_args__ = {'mysql_engine':'InnoDB'} + __table_args__ = {'mysql_engine': 'InnoDB'} __table_initialized = False created_at = Column(DateTime, default=timeutils.utcnow) updated_at = Column(DateTime, default=timeutils.utcnow) - def save(self, session=None): """Save this object.""" if not session: @@ -109,18 +109,23 @@ class TaskFlowBase(object): local.update(joined) return local.iteritems() -workflow_logbook_assoc = Table('wf_lb_assoc', BASE.metadata, + +workflow_logbook_assoc = Table( + 'wf_lb_assoc', BASE.metadata, Column('workflow_id', Integer, ForeignKey('workflow.id')), Column('logbook_id', Integer, ForeignKey('logbook.id')), Column('id', Integer, primary_key=True) ) -workflow_job_assoc = Table('wf_job_assoc', BASE.metadata, + +workflow_job_assoc = Table( + 'wf_job_assoc', BASE.metadata, Column('workflow_id', Integer, ForeignKey('workflow.id')), Column('job_id', Integer, ForeignKey('job.id')), Column('id', Integer, primary_key=True) ) + class LogBook(BASE, TaskFlowBase): """Represents a logbook for a set of workflows""" @@ -134,6 +139,7 @@ class LogBook(BASE, TaskFlowBase): secondary=workflow_logbook_assoc) job = relationship("Job", uselist=False, backref="logbook") + class Job(BASE, TaskFlowBase): """Represents a Job""" @@ -159,6 +165,7 @@ class Workflow(BASE, TaskFlowBase): name = Column(String, unique=True) tasks = relationship("Task", backref="workflow") + class Task(BASE, TaskFlowBase): """Represents Task detail objects""" @@ -172,5 +179,6 @@ class Task(BASE, TaskFlowBase): stacktrace = Column(String) workflow_id = Column(String, ForeignKey('workflow.id')) + def create_tables(): BASE.metadata.create_all(get_engine()) diff --git a/taskflow/db/sqlalchemy/session.py b/taskflow/db/sqlalchemy/session.py index b95238ad..a620c8e4 100644 --- a/taskflow/db/sqlalchemy/session.py +++ b/taskflow/db/sqlalchemy/session.py @@ -27,6 +27,8 @@ import sqlalchemy.interfaces import sqlalchemy from sqlalchemy.pool import NullPool +from sqlalchemy import exc + from taskflow.db import api as db_api LOG = logging.getLogger(__name__) @@ -62,7 +64,7 @@ http://groups.google.com/group/sqlalchemy/msg/a4ce563d802c929f except dbapi_conn.OperationalError, ex: if ex.args[0] in (2006, 2013, 2014, 2045, 2055): LOG.warn(_('Got mysql server has gone away: %s'), ex) - raise DisconnectionError("Database server went away") + raise exc.DisconnectionError("Database server went away") else: raise @@ -101,6 +103,7 @@ def get_maker(engine, autocommit=True, expire_on_commit=False): autocommit=autocommit, expire_on_commit=expire_on_commit) + def _get_sql_connection(): return db_api.SQL_CONNECTION diff --git a/taskflow/patterns/distributed_flow.py b/taskflow/patterns/distributed_flow.py index f2afe7da..d247d00c 100644 --- a/taskflow/patterns/distributed_flow.py +++ b/taskflow/patterns/distributed_flow.py @@ -36,14 +36,13 @@ class Flow(object): self._tasks = [] logbook.add_workflow(name) - def chain_listeners(self, context, initial_task, callback_task): """ Register one listener for a task """ if self.root is None: initial_task.name = '%s.%s' % (self.name, initial_task.name) self.root = initial_task.s(context) self._tasks.append(initial_task) - LOG.info('WF %s root task set to %s' % (self.name, initial_task.name)) + LOG.info('WF %s root task set to %s', self.name, initial_task.name) callback_task.name = '%s.%s' % (self.name, callback_task.name) self._tasks.append(callback_task) @@ -56,13 +55,13 @@ class Flow(object): initial_task.name = '%s.%s' % (self.name, initial_task.name) self.root = initial_task.s(context) self._tasks.append(initial_task) - LOG.info('WF %s root task set to %s' % (self.name, initial_task.name)) + LOG.info('WF %s root task set to %s', self.name, initial_task.name) for task in callback_tasks: task.name = '%s.%s' % (self.name, task.name) self._tasks.append(task) initial_task.link(task.s(context)) - def merge_listeners(self, context, inital_tasks, callback_task): + def merge_listeners(self, context, initial_tasks, callback_task): """ Register one listener for multiple tasks """ header = [] if self.root is None: diff --git a/taskflow/patterns/graph_flow.py b/taskflow/patterns/graph_flow.py index 5e1ddab9..3061deb2 100644 --- a/taskflow/patterns/graph_flow.py +++ b/taskflow/patterns/graph_flow.py @@ -56,8 +56,8 @@ class Flow(ordered_flow.Flow): for (them, there_result) in self.results: if not n in set(getattr(them, 'provides', [])): continue - if (not is_optional and - not self._graph.has_edge(them, task)): + if ((not is_optional and + not self._graph.has_edge(them, task))): continue if there_result and n in there_result: place_where[n].append(there_result[n]) @@ -125,8 +125,8 @@ class Flow(ordered_flow.Flow): if p is n: # No self-referencing allowed. continue - if (len(get_providers(n, want_what)) and not - self._allow_same_inputs): + if ((len(get_providers(n, want_what)) + and not self._allow_same_inputs)): msg = "Multiple providers of %s not allowed." raise exc.InvalidStateException(msg % (want_what)) self._graph.add_edge(p, n, attr_dict={ diff --git a/taskflow/patterns/ordered_flow.py b/taskflow/patterns/ordered_flow.py index e69b99a5..d0364df6 100644 --- a/taskflow/patterns/ordered_flow.py +++ b/taskflow/patterns/ordered_flow.py @@ -52,8 +52,8 @@ class RollbackTask(object): return str(self.task) def __call__(self, cause): - if (hasattr(self.task, "revert") and - isinstance(self.task.revert, collections.Callable)): + if ((hasattr(self.task, "revert") and + isinstance(self.task.revert, collections.Callable))): self.task.revert(self.context, self.result, cause) diff --git a/taskflow/tests/unit/test_sql_db_api.py b/taskflow/tests/unit/test_sql_db_api.py index 1922c5d7..535f757c 100644 --- a/taskflow/tests/unit/test_sql_db_api.py +++ b/taskflow/tests/unit/test_sql_db_api.py @@ -448,7 +448,8 @@ class TaskTest(unittest2.TestCase): self.assertEquals(expected, actual) self.assertRaises(exception.NotFound, db_api.task_update, '', 9001, - dict(exception='ExceptionTest', stacktrace='StacktraceTest')) + dict(exception='ExceptionTest', + stacktrace='StacktraceTest')) def test_task_destroy(self): id = self.tsk_ids.pop() diff --git a/tools/test-requires b/tools/test-requires index f43462fd..0dca05d8 100644 --- a/tools/test-requires +++ b/tools/test-requires @@ -1,6 +1,7 @@ # Packages needed for developer testing this library. coverage distribute>=0.6.24 +flake8==2.0 mox nose nose-exclude diff --git a/tox.ini b/tox.ini new file mode 100644 index 00000000..3f5d36ec --- /dev/null +++ b/tox.ini @@ -0,0 +1,44 @@ +[tox] +envlist = py26,py27,py33,pep8 + +[testenv] +sitepackages = True +setenv = VIRTUAL_ENV={envdir} + LANG=en_US.UTF-8 + LANGUAGE=en_US:en + LC_ALL=C + NOSE_WITH_OPENSTACK=1 + NOSE_OPENSTACK_COLOR=1 + NOSE_OPENSTACK_RED=0.05 + NOSE_OPENSTACK_YELLOW=0.025 + NOSE_OPENSTACK_SHOW_ELAPSED=1 + NOSE_OPENSTACK_STDOUT=1 +deps = -r{toxinidir}/tools/pip-requires + -r{toxinidir}/tools/test-requires +commands = nosetests {posargs} + +[tox:jenkins] +sitepackages = True +downloadcache = ~/cache/pip + +[testenv:pep8] +sitepackages = False +commands = + flake8 {posargs} + +[testenv:pylint] +setenv = VIRTUAL_ENV={envdir} +deps = -r{toxinidir}/tools/pip-requires + pylint==0.26.0 +commands = pylint + +[testenv:cover] +setenv = NOSE_WITH_COVERAGE=1 + +[testenv:venv] +commands = {posargs} + +[flake8] +ignore = E711,E712,F401,F403,F811,F841,H302,H303,H304,H401,H402,H403,H404 +builtins = _ +exclude = .venv,.tox,dist,doc,*openstack/common*,*egg,.git \ No newline at end of file