Remove decorators and move to utils

In order to avoid the circular import
in threading utils move the decorators
functionality to utils/misc and move the
locking functionality to utils/lock_utils
and then use these functions from the
threading util (and elsewhere).

Fixes bug: 1236080

Change-Id: I9e71c2ba15782cbb6dd5ab7e1264b77ed47bc29e
This commit is contained in:
Joshua Harlow
2013-10-06 21:38:43 +00:00
committed by Joshua Harlow
parent 8750840ac8
commit e70032d0d9
9 changed files with 113 additions and 127 deletions

View File

@@ -1,60 +0,0 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012-2013 Yahoo! Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import functools
from taskflow.utils import threading_utils
def wraps(fn):
"""This will not be needed in python 3.2 or greater which already has this
built-in to its functools.wraps method.
"""
def wrapper(f):
f = functools.wraps(fn)(f)
f.__wrapped__ = getattr(fn, '__wrapped__', fn)
return f
return wrapper
def locked(*args, **kwargs):
def decorator(f):
attr_name = kwargs.get('lock', '_lock')
@wraps(f)
def wrapper(*args, **kwargs):
lock = getattr(args[0], attr_name)
if isinstance(lock, (tuple, list)):
lock = threading_utils.MultiLock(locks=list(lock))
with lock:
return f(*args, **kwargs)
return wrapper
# This is needed to handle when the decorator has args or the decorator
# doesn't have args, python is rather weird here...
if kwargs or not args:
return decorator
else:
if len(args) == 1:
return decorator(args[0])
else:
return decorator

View File

@@ -24,13 +24,13 @@ from taskflow.engines.action_engine import graph_action
from taskflow.engines.action_engine import task_action
from taskflow.engines import base
from taskflow import decorators
from taskflow import exceptions as exc
from taskflow.openstack.common import excutils
from taskflow import states
from taskflow import storage as t_storage
from taskflow.utils import flow_utils
from taskflow.utils import lock_utils
from taskflow.utils import misc
from taskflow.utils import threading_utils
@@ -76,7 +76,7 @@ class ActionEngine(base.EngineBase):
self.compile()
return self._root.graph
@decorators.locked
@lock_utils.locked
def run(self):
if self.storage.get_flow_state() != states.SUSPENDED:
self.compile()
@@ -102,7 +102,7 @@ class ActionEngine(base.EngineBase):
else:
self._change_state(state)
@decorators.locked(lock='_state_lock')
@lock_utils.locked(lock='_state_lock')
def _change_state(self, state):
old_state = self.storage.get_flow_state()
if not states.check_flow_transition(old_state, state):
@@ -158,7 +158,7 @@ class MultiThreadedActionEngine(ActionEngine):
flow, flow_detail, backend, conf)
self._executor = conf.get('executor', None)
@decorators.locked
@lock_utils.locked
def run(self):
if self._executor is None:
self._executor = futures.ThreadPoolExecutor(

View File

@@ -19,8 +19,8 @@
import threading
from taskflow import decorators
from taskflow.openstack.common import uuidutils
from taskflow.utils import lock_utils
class Job(object):
@@ -47,11 +47,11 @@ class Job(object):
self.state = None
self.book = None
@decorators.locked
@lock_utils.locked
def add(self, *flows):
self._flows.extend(flows)
@decorators.locked
@lock_utils.locked
def remove(self, flow):
j = -1
for i, f in enumerate(self._flows):

View File

@@ -24,7 +24,6 @@ import shutil
import threading
import weakref
from taskflow import decorators
from taskflow import exceptions as exc
from taskflow.openstack.common import jsonutils
from taskflow.openstack.common import timeutils
@@ -170,7 +169,7 @@ class Connection(base.Connection):
self._write_to(td_path, jsonutils.dumps(td_data))
return task_detail
@decorators.locked
@lock_utils.locked
def update_task_details(self, task_detail):
return self._run_with_process_lock("task",
self._save_task_details,
@@ -251,7 +250,7 @@ class Connection(base.Connection):
list(flow_detail), task_path)
return flow_detail
@decorators.locked
@lock_utils.locked
def update_flow_details(self, flow_detail):
return self._run_with_process_lock("flow",
self._save_flow_details,
@@ -298,12 +297,12 @@ class Connection(base.Connection):
list(book), flow_path)
return book
@decorators.locked
@lock_utils.locked
def save_logbook(self, book):
return self._run_with_process_lock("book",
self._save_logbook, book)
@decorators.locked
@lock_utils.locked
def upgrade(self):
def _step_create():
@@ -314,7 +313,7 @@ class Connection(base.Connection):
misc.ensure_tree(self._backend.lock_path)
self._run_with_process_lock("init", _step_create)
@decorators.locked
@lock_utils.locked
def clear_all(self):
def _step_clear():
@@ -334,7 +333,7 @@ class Connection(base.Connection):
# Acquire all locks by going through this little hiearchy.
self._run_with_process_lock("init", _step_book)
@decorators.locked
@lock_utils.locked
def destroy_logbook(self, book_uuid):
def _destroy_tasks(task_details):
@@ -392,7 +391,7 @@ class Connection(base.Connection):
lb.add(self._get_flow_details(fd_uuid))
return lb
@decorators.locked
@lock_utils.locked
def get_logbook(self, book_uuid):
return self._run_with_process_lock("book",
self._get_logbook, book_uuid)

View File

@@ -22,11 +22,11 @@
import logging
import threading
from taskflow import decorators
from taskflow import exceptions as exc
from taskflow.openstack.common import timeutils
from taskflow.persistence.backends import base
from taskflow.persistence import logbook
from taskflow.utils import lock_utils
from taskflow.utils import persistence_utils as p_utils
LOG = logging.getLogger(__name__)
@@ -85,7 +85,7 @@ class Connection(base.Connection):
def close(self):
pass
@decorators.locked(lock="_save_locks")
@lock_utils.locked(lock="_save_locks")
def clear_all(self):
count = 0
for uuid in list(self.backend.log_books.keys()):
@@ -93,7 +93,7 @@ class Connection(base.Connection):
count += 1
return count
@decorators.locked(lock="_save_locks")
@lock_utils.locked(lock="_save_locks")
def destroy_logbook(self, book_uuid):
try:
# Do the same cascading delete that the sql layer does.
@@ -105,7 +105,7 @@ class Connection(base.Connection):
except KeyError:
raise exc.NotFound("No logbook found with id: %s" % book_uuid)
@decorators.locked(lock="_save_locks")
@lock_utils.locked(lock="_save_locks")
def update_task_details(self, task_detail):
try:
e_td = self.backend.task_details[task_detail.uuid]
@@ -125,7 +125,7 @@ class Connection(base.Connection):
self.backend.task_details[task_detail.uuid] = e_td
p_utils.task_details_merge(e_td, task_detail, deep_copy=True)
@decorators.locked(lock="_save_locks")
@lock_utils.locked(lock="_save_locks")
def update_flow_details(self, flow_detail):
try:
e_fd = self.backend.flow_details[flow_detail.uuid]
@@ -136,7 +136,7 @@ class Connection(base.Connection):
self._save_flowdetail_tasks(e_fd, flow_detail)
return e_fd
@decorators.locked(lock="_save_locks")
@lock_utils.locked(lock="_save_locks")
def save_logbook(self, book):
# Get a existing logbook model (or create it if it isn't there).
try:
@@ -166,14 +166,14 @@ class Connection(base.Connection):
self._save_flowdetail_tasks(e_fd, flow_detail)
return e_lb
@decorators.locked(lock='_read_locks')
@lock_utils.locked(lock='_read_locks')
def get_logbook(self, book_uuid):
try:
return self.backend.log_books[book_uuid]
except KeyError:
raise exc.NotFound("No logbook found with id: %s" % book_uuid)
@decorators.locked(lock='_read_locks')
@lock_utils.locked(lock='_read_locks')
def _get_logbooks(self):
return list(self.backend.log_books.values())

View File

@@ -18,8 +18,8 @@
import sys
from taskflow import decorators
from taskflow import test
from taskflow.utils import lock_utils
from taskflow.utils import misc
from taskflow.utils import reflection
@@ -129,7 +129,7 @@ class GetRequiredCallableArgsTest(test.TestCase):
self.assertEquals(['i', 'j'], result)
def test_decorators_work(self):
@decorators.locked
@lock_utils.locked
def locked_fun(x, y):
pass
result = reflection.get_required_callable_args(locked_fun)

View File

@@ -24,12 +24,85 @@
import errno
import logging
import os
import threading
import time
from taskflow.utils import misc
LOG = logging.getLogger(__name__)
WAIT_TIME = 0.01
def locked(*args, **kwargs):
"""A decorator that looks for a given attribute (typically a lock or a list
of locks) and before executing the decorated function uses the given lock
or list of locks as a context manager, automatically releasing on exit.
"""
def decorator(f):
attr_name = kwargs.get('lock', '_lock')
@misc.wraps(f)
def wrapper(*args, **kwargs):
lock = getattr(args[0], attr_name)
if isinstance(lock, (tuple, list)):
lock = MultiLock(locks=list(lock))
with lock:
return f(*args, **kwargs)
return wrapper
# This is needed to handle when the decorator has args or the decorator
# doesn't have args, python is rather weird here...
if kwargs or not args:
return decorator
else:
if len(args) == 1:
return decorator(args[0])
else:
return decorator
class MultiLock(object):
"""A class which can attempt to obtain many locks at once and release
said locks when exiting.
Useful as a context manager around many locks (instead of having to nest
said individual context managers).
"""
def __init__(self, locks):
assert len(locks) > 0, "Zero locks requested"
self._locks = locks
self._locked = [False] * len(locks)
def __enter__(self):
def is_locked(lock):
# NOTE(harlowja): the threading2 lock doesn't seem to have this
# attribute, so thats why we are checking it existing first.
if hasattr(lock, 'locked'):
return lock.locked()
return False
for i in xrange(0, len(self._locked)):
if self._locked[i] or is_locked(self._locks[i]):
raise threading.ThreadError("Lock %s not previously released"
% (i + 1))
self._locked[i] = False
for (i, lock) in enumerate(self._locks):
self._locked[i] = lock.acquire()
def __exit__(self, type, value, traceback):
for (i, locked) in enumerate(self._locked):
try:
if locked:
self._locks[i].release()
self._locked[i] = False
except threading.ThreadError:
LOG.exception("Unable to release lock %s", i + 1)
class _InterProcessLock(object):
"""Lock implementation which allows multiple locks, working around
issues like bugs.debian.org/cgi-bin/bugreport.cgi?bug=632857 and does

View File

@@ -22,6 +22,7 @@ from distutils import version
import collections
import copy
import errno
import functools
import itertools
import logging
import os
@@ -36,6 +37,19 @@ import six
LOG = logging.getLogger(__name__)
def wraps(fn):
"""This will not be needed in python 3.2 or greater which already has this
built-in to its functools.wraps method.
"""
def wrapper(f):
f = functools.wraps(fn)(f)
f.__wrapped__ = getattr(fn, '__wrapped__', fn)
return f
return wrapper
def get_version_string(obj):
"""Gets a object's version as a string.

View File

@@ -22,6 +22,7 @@ import threading
import time
import types
from taskflow.utils import lock_utils
LOG = logging.getLogger(__name__)
@@ -57,46 +58,6 @@ def get_optimal_thread_count():
return 2
class MultiLock(object):
"""A class which can attempt to obtain many locks at once and release
said locks when exiting.
Useful as a context manager around many locks (instead of having to nest
said individual context managers).
"""
def __init__(self, locks):
assert len(locks) > 0, "Zero locks requested"
self._locks = locks
self._locked = [False] * len(locks)
def __enter__(self):
def is_locked(lock):
# NOTE(harlowja): the threading2 lock doesn't seem to have this
# attribute, so thats why we are checking it existing first.
if hasattr(lock, 'locked'):
return lock.locked()
return False
for i in xrange(0, len(self._locked)):
if self._locked[i] or is_locked(self._locks[i]):
raise threading.ThreadError("Lock %s not previously released"
% (i + 1))
self._locked[i] = False
for (i, lock) in enumerate(self._locks):
self._locked[i] = lock.acquire()
def __exit__(self, type, value, traceback):
for (i, locked) in enumerate(self._locked):
try:
if locked:
self._locks[i].release()
self._locked[i] = False
except threading.ThreadError:
LOG.exception("Unable to release lock %s", i + 1)
class CountDownLatch(object):
"""Similar in concept to the java count down latch."""
@@ -137,11 +98,10 @@ class ThreadSafeMeta(type):
"""Metaclass that adds locking to all pubic methods of a class"""
def __new__(cls, name, bases, attrs):
from taskflow import decorators
for attr_name, attr_value in attrs.iteritems():
if isinstance(attr_value, types.FunctionType):
if attr_name[0] != '_':
attrs[attr_name] = decorators.locked(attr_value)
attrs[attr_name] = lock_utils.locked(attr_value)
return super(ThreadSafeMeta, cls).__new__(cls, name, bases, attrs)
def __call__(cls, *args, **kwargs):