Fix most of the hacking rules

Hacking rules: H302, H303, H304, H401, H404

Change-Id: I38e62696724a99c5ebe74d95d477999bd91a2c9a
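The codes named above are OpenStack hacking checks: H302 (import only modules), H303 (no wildcard imports), H304 (no relative imports), H401 (a docstring should not start with a space), and H404 (a multi-line docstring should not start with a leading newline). Most hunks below are mechanical applications of H302, with the docstring hunks covering H401/H404. A minimal before/after sketch of the dominant H302 pattern (the names here are illustrative, not from the commit):

# Before: binds the datetime *class* directly (H302 violation).
# from datetime import datetime
# created = datetime.utcnow()

# After: import only the module and reach the class as an attribute.
import datetime

created = datetime.datetime.utcnow()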
@@ -19,17 +19,16 @@
 import logging
-import traceback as tb
 
-from celery.signals import task_failure
-from celery.signals import task_success
+from celery import signals
 
 LOG = logging.getLogger(__name__)
 
 
-@task_failure.connect
+@signals.task_failure.connect
 def task_error_handler(signal=None, sender=None, task_id=None,
                        exception=None, args=None, kwargs=None,
                        traceback=None, einfo=None):
-    """ If a task errors out, log all error info """
+    """If a task errors out, log all error info"""
     LOG.error('Task %s, id: %s, called with args: %s, and kwargs: %s'
               'failed with exception: %s' % (sender.name, task_id,
                                              args, kwargs, exception))
@@ -37,7 +36,7 @@ def task_error_handler(signal=None, sender=None, task_id=None,
     # TODO(jlucci): Auto-initiate rollback from failed task
 
 
-@task_success.connect
+@signals.task_success.connect
 def task_success_handler(singal=None, sender=None, result=None):
-    """ Save task results to WF """
+    """Save task results to WF."""
     pass
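Both handlers now go through the celery.signals module instead of names imported out of it, which is what H302 requires. Two pre-existing flaws survive the commit: the two LOG.error string literals concatenate with no separating space, and the singal keyword in task_success_handler is a typo for signal. A self-contained sketch of the corrected registration (logger setup and message wording are illustrative, not from the commit):

import logging

from celery import signals

LOG = logging.getLogger(__name__)


@signals.task_failure.connect
def task_error_handler(signal=None, sender=None, task_id=None,
                       exception=None, args=None, kwargs=None,
                       traceback=None, einfo=None):
    # Lazy logger formatting; note the trailing space before the
    # continuation literal so the message does not run together.
    LOG.error('Task %s, id: %s, called with args: %s, and kwargs: %s '
              'failed with exception: %s', sender.name, task_id,
              args, kwargs, exception)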
@@ -16,8 +16,7 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
-from datetime import datetime
-
+import datetime
 import functools
 import logging
 import threading
@@ -198,7 +197,7 @@ class MemoryJobBoard(jobboard.JobBoard):
     @check_not_closed
     def post(self, job):
         with self._lock.acquire(read=False):
-            self._board.append((datetime.utcnow(), job))
+            self._board.append((datetime.datetime.utcnow(), job))
             # Ensure the job tracks that we posted it
             job.posted_on.append(weakref.proxy(self))
             # Let people know a job is here
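The datetime changes here and in the later hunks are the same H302 fix applied to the standard library: import the module, qualify the class at the call site. A minimal sketch:

import datetime

# H302-compliant: the module is imported, the class is an attribute.
posted_at = datetime.datetime.utcnow()
print(posted_at.isoformat())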
@@ -24,7 +24,7 @@ import logging
 from taskflow import states
 
 from taskflow.db.sqlalchemy import models
-from taskflow.db.sqlalchemy.session import get_session
+from taskflow.db.sqlalchemy import session as sql_session
 from taskflow.openstack.common import exception
@@ -32,7 +32,7 @@ LOG = logging.getLogger(__name__)
 
 
 def model_query(context, *args, **kwargs):
-    session = kwargs.get('session') or get_session()
+    session = kwargs.get('session') or sql_session.get_session()
     query = session.query(*args)
 
     return query
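Every remaining hunk in this database API module is the same substitution: the session module is imported under the alias sql_session and get_session is called through it. A sketch of the resulting shape of one of these helpers (the module path is from the diff; the body is abbreviated and assumes logbook_get from the same module):

from taskflow.db.sqlalchemy import session as sql_session


def logbook_destroy(context, lb_id):
    """Delete a given LogBook."""
    # H302: reach get_session() through the imported module alias.
    session = sql_session.get_session()
    with session.begin():
        lb = logbook_get(context, lb_id, session=session)
        lb.delete(session=session)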
@@ -80,7 +80,7 @@ def logbook_create(context, name, lb_id=None):
 
 def logbook_get_workflows(context, lb_id):
     """Return all workflows associated with a logbook"""
-    session = get_session()
+    session = sql_session.get_session()
     with session.begin():
         lb = logbook_get(context, lb_id, session=session)
 
@@ -89,7 +89,7 @@ def logbook_get_workflows(context, lb_id):
 
 def logbook_add_workflow(context, lb_id, wf_name):
     """Add Workflow to given LogBook"""
-    session = get_session()
+    session = sql_session.get_session()
     with session.begin():
         wf = workflow_get(context, wf_name, session=session)
         lb = logbook_get(context, lb_id, session=session)
@@ -101,7 +101,7 @@ def logbook_add_workflow(context, lb_id, wf_name):
 
 def logbook_destroy(context, lb_id):
     """Delete a given LogBook"""
-    session = get_session()
+    session = sql_session.get_session()
     with session.begin():
         lb = logbook_get(context, lb_id, session=session)
         lb.delete(session=session)
@@ -126,7 +126,7 @@ def job_get(context, job_id, session=None):
 
 def job_update(context, job_id, values):
     """Update job with given values"""
-    session = get_session()
+    session = sql_session.get_session()
     with session.begin():
         job = job_get(context, job_id, session=session)
         job.update(values)
@@ -135,7 +135,7 @@ def job_update(context, job_id, values):
 
 def job_add_workflow(context, job_id, wf_id):
     """Add a Workflow to given job"""
-    session = get_session()
+    session = sql_session.get_session()
     with session.begin():
         job = job_get(context, job_id, session=session)
         wf = workflow_get(context, wf_id, session=session)
@@ -157,7 +157,7 @@ def job_get_state(context, job_id):
 
 def job_get_logbook(context, job_id):
     """Return the logbook associated with the given job"""
-    session = get_session()
+    session = sql_session.get_session()
     with session.begin():
         job = job_get(context, job_id, session=session)
         return job.logbook
@@ -177,7 +177,7 @@ def job_create(context, name, job_id=None):
 
 def job_destroy(context, job_id):
     """Delete a given Job"""
-    session = get_session()
+    session = sql_session.get_session()
     with session.begin():
         job = job_get(context, job_id, session=session)
         job.delete(session=session)
@@ -218,7 +218,7 @@ def workflow_get_names(context):
 
 def workflow_get_tasks(context, wf_name):
     """Return all tasks for a given Workflow"""
-    session = get_session()
+    session = sql_session.get_session()
     with session.begin():
         wf = workflow_get(context, wf_name, session=session)
 
@@ -227,7 +227,7 @@ def workflow_get_tasks(context, wf_name):
 
 def workflow_add_task(context, wf_id, task_id):
     """Add a task to a given workflow"""
-    session = get_session()
+    session = sql_session.get_session()
     with session.begin():
         task = task_get(context, task_id, session=session)
         wf = workflow_get(context, wf_id, session=session)
@@ -246,7 +246,7 @@ def workflow_create(context, workflow_name):
 
 def workflow_destroy(context, wf_name):
     """Delete a given Workflow"""
-    session = get_session()
+    session = sql_session.get_session()
     with session.begin():
         wf = workflow_get(context, wf_name, session=session)
         wf.delete(session=session)
@@ -283,7 +283,7 @@ def task_create(context, task_name, wf_id, task_id=None):
 
 def task_update(context, task_id, values):
     """Update Task with given values"""
-    session = get_session()
+    session = sql_session.get_session()
     with session.begin():
         task = task_get(context, task_id)
 
@@ -293,7 +293,7 @@ def task_update(context, task_id, values):
 
 def task_destroy(context, task_id):
     """Delete an existing Task"""
-    session = get_session()
+    session = sql_session.get_session()
     with session.begin():
         task = task_get(context, task_id, session=session)
         task.delete(session=session)
@@ -29,8 +29,7 @@ from sqlalchemy.orm import object_mapper, relationship
 from sqlalchemy import DateTime, ForeignKey
 from sqlalchemy import types as types
 
-from taskflow.db.sqlalchemy.session import get_engine
-from taskflow.db.sqlalchemy.session import get_session
+from taskflow.db.sqlalchemy import session as sql_session
 from taskflow.openstack.common import exception
 from taskflow.openstack.common import timeutils
 from taskflow.openstack.common import uuidutils
@@ -59,7 +58,7 @@ class TaskFlowBase(object):
     def save(self, session=None):
         """Save this object."""
         if not session:
-            session = get_session()
+            session = sql_session.get_session()
         session.add(self)
         try:
             session.flush()
@@ -74,7 +73,7 @@ class TaskFlowBase(object):
         self.deleted = True
         self.deleted_at = timeutils.utcnow()
         if not session:
-            session = get_session()
+            session = sql_session.get_session()
         session.delete(self)
         session.flush()
@@ -182,4 +181,4 @@ class Task(BASE, TaskFlowBase):
 
 
 def create_tables():
-    BASE.metadata.create_all(get_engine())
+    BASE.metadata.create_all(sql_session.get_engine())
@@ -52,8 +52,7 @@ def synchronous_switch_listener(dbapi_conn, connection_rec):
 
 
 def ping_listener(dbapi_conn, connection_rec, connection_proxy):
-    """
-    Ensures that MySQL connections checked out of the
+    """Ensures that MySQL connections checked out of the
     pool are alive.
 
     Borrowed from:
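H401 flags docstrings that open with a space (the earlier """ If a task errors out... fixes), while H404 flags multi-line docstrings whose text starts on the line after the quotes, which is what this ping_listener hunk corrects. A compliant sketch (the wording is illustrative):

def ping_listener(dbapi_conn, connection_rec, connection_proxy):
    """Ensures that MySQL connections checked out of the pool are alive.

    The summary sits on the same line as the opening quotes (H404) and
    does not begin with a space (H401); detail follows a blank line.
    """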
@@ -17,16 +17,15 @@
 # under the License.
 
 import abc
+import datetime
 import weakref
 
-from datetime import datetime
-
 
 class TaskDetail(object):
     """Task details have the bare minimum of these fields/methods."""
 
     def __init__(self, name, metadata=None):
-        self.date_created = datetime.utcnow()
+        self.date_created = datetime.datetime.utcnow()
         self.name = name
         self.metadata = metadata
         self.date_updated = None
@@ -17,11 +17,11 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
+import celery
 import logging
 
 from taskflow import logbook
 
-from celery import chord
-
 LOG = logging.getLogger(__name__)
@@ -37,7 +36,7 @@ class Flow(object):
         logbook.add_workflow(name)
 
     def chain_listeners(self, context, initial_task, callback_task):
-        """ Register one listener for a task """
+        """Register one listener for a task."""
         if self.root is None:
             initial_task.name = '%s.%s' % (self.name, initial_task.name)
             self.root = initial_task.s(context)
@@ -50,7 +50,7 @@ class Flow(object):
             initial_task.link(callback_task.s(context))
 
     def split_listeners(self, context, initial_task, callback_tasks):
-        """ Register multiple listeners for one task """
+        """Register multiple listeners for one task."""
         if self.root is None:
             initial_task.name = '%s.%s' % (self.name, initial_task.name)
             self.root = initial_task.s(context)
@@ -62,7 +62,7 @@ class Flow(object):
             initial_task.link(task.s(context))
 
     def merge_listeners(self, context, initial_tasks, callback_task):
-        """ Register one listener for multiple tasks """
+        """Register one listener for multiple tasks."""
         header = []
         if self.root is None:
             self.root = []
@@ -79,9 +79,9 @@ class Flow(object):
 
         # TODO(jlucci): Need to set up chord so that it's not executed
         # immediately.
-        chord(header, body=callback_task)
+        celery.chord(header, body=callback_task)
 
     def run(self, context, *args, **kwargs):
-        """ Start root task and kick off workflow """
+        """Start root task and kick off workflow."""
        self.root(context)
        LOG.info('WF %s has been started' % (self.name,))
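The chord fix is H302 again: celery is imported as a module and chord is reached as an attribute. A hedged, self-contained sketch of what celery.chord(header, body=...) expresses (the app, broker URL, and tasks are illustrative, not from the commit):

import celery

app = celery.Celery('flows', broker='memory://')


@app.task
def add(x, y):
    return x + y


@app.task
def total(results):
    return sum(results)


# Run every signature in the header, then feed the list of results
# to total(). Building the chord does not dispatch it by itself;
# workflow.apply_async() would, which relates to the TODO above.
workflow = celery.chord([add.s(2, 2), add.s(4, 4)], body=total.s())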
@@ -16,8 +16,7 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
-from datetime import datetime
-
+import datetime
 import functools
 import threading
 import unittest2
@@ -65,7 +64,7 @@ class MemoryBackendTest(unittest2.TestCase):
             if not my_jobs:
                 # No jobs were claimed, lets not search the past again
                 # then, since *likely* those jobs will remain claimed...
-                job_search_from = datetime.utcnow()
+                job_search_from = datetime.datetime.utcnow()
             if my_jobs and poison.isSet():
                 # Oh crap, we need to unclaim and repost the jobs.
                 for j in my_jobs:
@@ -215,7 +214,7 @@ class MemoryBackendTest(unittest2.TestCase):
             j.claim(owner)
 
         def receive_job():
-            start = datetime.utcnow()
+            start = datetime.datetime.utcnow()
             receiver_awake.set()
             new_jobs = []
             while not new_jobs: