diff --git a/taskflow/backends/celery/celeryapp.py b/taskflow/backends/celery/celeryapp.py index 3bb3595b..e41d5d64 100644 --- a/taskflow/backends/celery/celeryapp.py +++ b/taskflow/backends/celery/celeryapp.py @@ -19,17 +19,16 @@ import logging import traceback as tb -from celery.signals import task_failure -from celery.signals import task_success +from celery import signals LOG = logging.getLogger(__name__) -@task_failure.connect +@signals.task_failure.connect def task_error_handler(signal=None, sender=None, task_id=None, exception=None, args=None, kwargs=None, traceback=None, einfo=None): - """ If a task errors out, log all error info """ + """If a task errors out, log all error info""" LOG.error('Task %s, id: %s, called with args: %s, and kwargs: %s' 'failed with exception: %s' % (sender.name, task_id, args, kwargs, exception)) @@ -37,7 +36,7 @@ def task_error_handler(signal=None, sender=None, task_id=None, # TODO(jlucci): Auto-initiate rollback from failed task -@task_success.connect +@signals.task_success.connect def task_success_handler(singal=None, sender=None, result=None): - """ Save task results to WF """ + """Save task results to WF.""" pass diff --git a/taskflow/backends/memory.py b/taskflow/backends/memory.py index b2c83cf6..f006e310 100644 --- a/taskflow/backends/memory.py +++ b/taskflow/backends/memory.py @@ -16,8 +16,7 @@ # License for the specific language governing permissions and limitations # under the License. -from datetime import datetime - +import datetime import functools import logging import threading @@ -198,7 +197,7 @@ class MemoryJobBoard(jobboard.JobBoard): @check_not_closed def post(self, job): with self._lock.acquire(read=False): - self._board.append((datetime.utcnow(), job)) + self._board.append((datetime.datetime.utcnow(), job)) # Ensure the job tracks that we posted it job.posted_on.append(weakref.proxy(self)) # Let people know a job is here diff --git a/taskflow/db/sqlalchemy/api.py b/taskflow/db/sqlalchemy/api.py index a52f943a..f31c952b 100644 --- a/taskflow/db/sqlalchemy/api.py +++ b/taskflow/db/sqlalchemy/api.py @@ -24,7 +24,7 @@ import logging from taskflow import states from taskflow.db.sqlalchemy import models -from taskflow.db.sqlalchemy.session import get_session +from taskflow.db.sqlalchemy import session as sql_session from taskflow.openstack.common import exception @@ -32,7 +32,7 @@ LOG = logging.getLogger(__name__) def model_query(context, *args, **kwargs): - session = kwargs.get('session') or get_session() + session = kwargs.get('session') or sql_session.get_session() query = session.query(*args) return query @@ -80,7 +80,7 @@ def logbook_create(context, name, lb_id=None): def logbook_get_workflows(context, lb_id): """Return all workflows associated with a logbook""" - session = get_session() + session = sql_session.get_session() with session.begin(): lb = logbook_get(context, lb_id, session=session) @@ -89,7 +89,7 @@ def logbook_get_workflows(context, lb_id): def logbook_add_workflow(context, lb_id, wf_name): """Add Workflow to given LogBook""" - session = get_session() + session = sql_session.get_session() with session.begin(): wf = workflow_get(context, wf_name, session=session) lb = logbook_get(context, lb_id, session=session) @@ -101,7 +101,7 @@ def logbook_add_workflow(context, lb_id, wf_name): def logbook_destroy(context, lb_id): """Delete a given LogBook""" - session = get_session() + session = sql_session.get_session() with session.begin(): lb = logbook_get(context, lb_id, session=session) lb.delete(session=session) @@ -126,7 +126,7 @@ def job_get(context, job_id, session=None): def job_update(context, job_id, values): """Update job with given values""" - session = get_session() + session = sql_session.get_session() with session.begin(): job = job_get(context, job_id, session=session) job.update(values) @@ -135,7 +135,7 @@ def job_update(context, job_id, values): def job_add_workflow(context, job_id, wf_id): """Add a Workflow to given job""" - session = get_session() + session = sql_session.get_session() with session.begin(): job = job_get(context, job_id, session=session) wf = workflow_get(context, wf_id, session=session) @@ -157,7 +157,7 @@ def job_get_state(context, job_id): def job_get_logbook(context, job_id): """Return the logbook associated with the given job""" - session = get_session() + session = sql_session.get_session() with session.begin(): job = job_get(context, job_id, session=session) return job.logbook @@ -177,7 +177,7 @@ def job_create(context, name, job_id=None): def job_destroy(context, job_id): """Delete a given Job""" - session = get_session() + session = sql_session.get_session() with session.begin(): job = job_get(context, job_id, session=session) job.delete(session=session) @@ -218,7 +218,7 @@ def workflow_get_names(context): def workflow_get_tasks(context, wf_name): """Return all tasks for a given Workflow""" - session = get_session() + session = sql_session.get_session() with session.begin(): wf = workflow_get(context, wf_name, session=session) @@ -227,7 +227,7 @@ def workflow_get_tasks(context, wf_name): def workflow_add_task(context, wf_id, task_id): """Add a task to a given workflow""" - session = get_session() + session = sql_session.get_session() with session.begin(): task = task_get(context, task_id, session=session) wf = workflow_get(context, wf_id, session=session) @@ -246,7 +246,7 @@ def workflow_create(context, workflow_name): def workflow_destroy(context, wf_name): """Delete a given Workflow""" - session = get_session() + session = sql_session.get_session() with session.begin(): wf = workflow_get(context, wf_name, session=session) wf.delete(session=session) @@ -283,7 +283,7 @@ def task_create(context, task_name, wf_id, task_id=None): def task_update(context, task_id, values): """Update Task with given values""" - session = get_session() + session = sql_session.get_session() with session.begin(): task = task_get(context, task_id) @@ -293,7 +293,7 @@ def task_update(context, task_id, values): def task_destroy(context, task_id): """Delete an existing Task""" - session = get_session() + session = sql_session.get_session() with session.begin(): task = task_get(context, task_id, session=session) task.delete(session=session) diff --git a/taskflow/db/sqlalchemy/models.py b/taskflow/db/sqlalchemy/models.py index 6f8a1501..35572fab 100644 --- a/taskflow/db/sqlalchemy/models.py +++ b/taskflow/db/sqlalchemy/models.py @@ -29,8 +29,7 @@ from sqlalchemy.orm import object_mapper, relationship from sqlalchemy import DateTime, ForeignKey from sqlalchemy import types as types -from taskflow.db.sqlalchemy.session import get_engine -from taskflow.db.sqlalchemy.session import get_session +from taskflow.db.sqlalchemy import session as sql_session from taskflow.openstack.common import exception from taskflow.openstack.common import timeutils from taskflow.openstack.common import uuidutils @@ -59,7 +58,7 @@ class TaskFlowBase(object): def save(self, session=None): """Save this object.""" if not session: - session = get_session() + session = sql_session.get_session() session.add(self) try: session.flush() @@ -74,7 +73,7 @@ class TaskFlowBase(object): self.deleted = True self.deleted_at = timeutils.utcnow() if not session: - session = get_session() + session = sql_session.get_session() session.delete(self) session.flush() @@ -182,4 +181,4 @@ class Task(BASE, TaskFlowBase): def create_tables(): - BASE.metadata.create_all(get_engine()) + BASE.metadata.create_all(sql_session.get_engine()) diff --git a/taskflow/db/sqlalchemy/session.py b/taskflow/db/sqlalchemy/session.py index 66af18d2..bf84ecb8 100644 --- a/taskflow/db/sqlalchemy/session.py +++ b/taskflow/db/sqlalchemy/session.py @@ -52,8 +52,7 @@ def synchronous_switch_listener(dbapi_conn, connection_rec): def ping_listener(dbapi_conn, connection_rec, connection_proxy): - """ -Ensures that MySQL connections checked out of the + """Ensures that MySQL connections checked out of the pool are alive. Borrowed from: diff --git a/taskflow/logbook.py b/taskflow/logbook.py index 943896e8..037a3306 100644 --- a/taskflow/logbook.py +++ b/taskflow/logbook.py @@ -17,16 +17,15 @@ # under the License. import abc +import datetime import weakref -from datetime import datetime - class TaskDetail(object): """Task details have the bare minimum of these fields/methods.""" def __init__(self, name, metadata=None): - self.date_created = datetime.utcnow() + self.date_created = datetime.datetime.utcnow() self.name = name self.metadata = metadata self.date_updated = None diff --git a/taskflow/patterns/distributed_flow.py b/taskflow/patterns/distributed_flow.py index 3a12cb94..90b3f25d 100644 --- a/taskflow/patterns/distributed_flow.py +++ b/taskflow/patterns/distributed_flow.py @@ -17,11 +17,11 @@ # License for the specific language governing permissions and limitations # under the License. +import celery import logging from taskflow import logbook -from celery import chord LOG = logging.getLogger(__name__) @@ -37,7 +37,7 @@ class Flow(object): logbook.add_workflow(name) def chain_listeners(self, context, initial_task, callback_task): - """ Register one listener for a task """ + """Register one listener for a task.""" if self.root is None: initial_task.name = '%s.%s' % (self.name, initial_task.name) self.root = initial_task.s(context) @@ -50,7 +50,7 @@ class Flow(object): initial_task.link(callback_task.s(context)) def split_listeners(self, context, initial_task, callback_tasks): - """ Register multiple listeners for one task """ + """Register multiple listeners for one task.""" if self.root is None: initial_task.name = '%s.%s' % (self.name, initial_task.name) self.root = initial_task.s(context) @@ -62,7 +62,7 @@ class Flow(object): initial_task.link(task.s(context)) def merge_listeners(self, context, initial_tasks, callback_task): - """ Register one listener for multiple tasks """ + """Register one listener for multiple tasks.""" header = [] if self.root is None: self.root = [] @@ -79,9 +79,9 @@ class Flow(object): # TODO(jlucci): Need to set up chord so that it's not executed # immediately. - chord(header, body=callback_task) + celery.chord(header, body=callback_task) def run(self, context, *args, **kwargs): - """ Start root task and kick off workflow """ + """Start root task and kick off workflow.""" self.root(context) LOG.info('WF %s has been started' % (self.name,)) diff --git a/taskflow/tests/unit/test_memory.py b/taskflow/tests/unit/test_memory.py index 3ea73bb1..13b11b52 100644 --- a/taskflow/tests/unit/test_memory.py +++ b/taskflow/tests/unit/test_memory.py @@ -16,8 +16,7 @@ # License for the specific language governing permissions and limitations # under the License. -from datetime import datetime - +import datetime import functools import threading import unittest2 @@ -65,7 +64,7 @@ class MemoryBackendTest(unittest2.TestCase): if not my_jobs: # No jobs were claimed, lets not search the past again # then, since *likely* those jobs will remain claimed... - job_search_from = datetime.utcnow() + job_search_from = datetime.datetime.utcnow() if my_jobs and poison.isSet(): # Oh crap, we need to unclaim and repost the jobs. for j in my_jobs: @@ -215,7 +214,7 @@ class MemoryBackendTest(unittest2.TestCase): j.claim(owner) def receive_job(): - start = datetime.utcnow() + start = datetime.datetime.utcnow() receiver_awake.set() new_jobs = [] while not new_jobs: diff --git a/tox.ini b/tox.ini index 45859699..4499f71f 100644 --- a/tox.ini +++ b/tox.ini @@ -39,6 +39,6 @@ setenv = NOSE_WITH_COVERAGE=1 commands = {posargs} [flake8] -ignore = H302,H303,H304,H401,H402,H403,H404 +ignore = H402,H403 builtins = _ exclude = .venv,.tox,dist,doc,*openstack/common*,*egg,.git,build,tools \ No newline at end of file