diff --git a/setup.cfg b/setup.cfg index c9663684..378ff741 100644 --- a/setup.cfg +++ b/setup.cfg @@ -29,6 +29,8 @@ packages = [entry_points] taskflow.persistence = + dir = taskflow.persistence.backends.impl_dir:DirBackend + file = taskflow.persistence.backends.impl_dir:DirBackend memory = taskflow.persistence.backends.impl_memory:MemoryBackend mysql = taskflow.persistence.backends.impl_sqlalchemy:SQLAlchemyBackend postgresql = taskflow.persistence.backends.impl_sqlalchemy:SQLAlchemyBackend diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index bc0e544c..5df67322 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -24,8 +24,6 @@ from concurrent import futures from taskflow.engines.action_engine import graph_action from taskflow.engines.action_engine import task_action -from taskflow.persistence import utils as p_utils - from taskflow import decorators from taskflow import exceptions as exc from taskflow import states @@ -33,6 +31,7 @@ from taskflow import storage as t_storage from taskflow.utils import flow_utils from taskflow.utils import misc +from taskflow.utils import persistence_utils as p_utils class ActionEngine(object): diff --git a/taskflow/persistence/backends/impl_dir.py b/taskflow/persistence/backends/impl_dir.py new file mode 100644 index 00000000..cb129fc0 --- /dev/null +++ b/taskflow/persistence/backends/impl_dir.py @@ -0,0 +1,474 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. +# Copyright (C) 2013 Rackspace Hosting All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import errno +import logging +import os +import shutil +import threading +import weakref + +from taskflow import decorators +from taskflow import exceptions as exc +from taskflow.openstack.common import jsonutils +from taskflow.openstack.common import timeutils +from taskflow.persistence.backends import base +from taskflow.persistence import logbook +from taskflow.utils import lock_utils +from taskflow.utils import misc +from taskflow.utils import persistence_utils as p_utils + +LOG = logging.getLogger(__name__) + +# The lock storage is not thread safe to set items in, so this lock is used to +# protect that access. +_LOCK_STORAGE_MUTATE = threading.RLock() + +# Currently in use paths -> in-process locks are maintained here. +# +# NOTE(harlowja): Values in this dictionary will be automatically released once +# the objects referencing those objects have been garbage collected. +_LOCK_STORAGE = weakref.WeakValueDictionary() + + +class DirBackend(base.Backend): + """A backend that writes logbooks, flow details, and task details to a + provided directory. This backend does *not* provide transactional semantics + although it does guarantee that there will be no race conditions when + writing/reading by using file level locking and in-process locking. + + NOTE(harlowja): this is more of an example/testing backend and likely + should *not* be used in production, since this backend lacks transactional + semantics. + """ + def __init__(self, conf): + super(DirBackend, self).__init__(conf) + self._path = os.path.abspath(conf['path']) + self._lock_path = os.path.join(self._path, 'locks') + self._file_cache = {} + # Ensure that multiple threads are not accessing the same storage at + # the same time, the file lock mechanism doesn't protect against this + # so we must do in-process locking as well. + with _LOCK_STORAGE_MUTATE: + self._lock = _LOCK_STORAGE.setdefault(self._path, + threading.RLock()) + + @property + def lock_path(self): + return self._lock_path + + @property + def base_path(self): + return self._path + + def get_connection(self): + return Connection(self) + + def close(self): + pass + + +class Connection(base.Connection): + def __init__(self, backend): + self._backend = backend + self._file_cache = self._backend._file_cache + self._flow_path = os.path.join(self._backend.base_path, 'flows') + self._task_path = os.path.join(self._backend.base_path, 'tasks') + self._book_path = os.path.join(self._backend.base_path, 'books') + # Share the backends lock so that all threads using the given backend + # are restricted in writing, since the per-process lock we are using + # to restrict the multi-process access does not work inside a process. + self._lock = backend._lock + + def _read_from(self, filename): + # This is very similar to the oslo-incubator fileutils module, but + # tweaked to not depend on a global cache, as well as tweaked to not + # pull-in the oslo logging module (which is a huge pile of code). + mtime = os.path.getmtime(filename) + cache_info = self._file_cache.setdefault(filename, {}) + if not cache_info or mtime > cache_info.get('mtime', 0): + with open(filename, 'rb') as fp: + cache_info['data'] = fp.read() + cache_info['mtime'] = mtime + return cache_info['data'] + + def _write_to(self, filename, contents): + with open(filename, 'wb') as fp: + fp.write(contents) + self._file_cache.pop(filename, None) + + def _run_with_process_lock(self, lock_name, functor, *args, **kwargs): + lock_path = os.path.join(self.backend.lock_path, lock_name) + with lock_utils.InterProcessLock(lock_path): + try: + return functor(*args, **kwargs) + except exc.TaskFlowException: + raise + except Exception as e: + # NOTE(harlowja): trap all other errors as storage errors. + raise exc.StorageError("Failed running locking file based " + "session: %s" % e, e) + + def _get_logbooks(self): + lb_uuids = [] + try: + lb_uuids = [d for d in os.listdir(self._book_path) + if os.path.isdir(os.path.join(self._book_path, d))] + except EnvironmentError as e: + if e.errno != errno.ENOENT: + raise + for lb_uuid in lb_uuids: + try: + yield self._get_logbook(lb_uuid) + except exc.NotFound: + pass + + def get_logbooks(self): + # NOTE(harlowja): don't hold the lock while iterating + with self._lock: + books = list(self._get_logbooks()) + for b in books: + yield b + + @property + def backend(self): + return self._backend + + def close(self): + pass + + def _save_task_details(self, task_detail, ignore_missing): + # See if we have an existing task detail to merge with. + e_td = None + try: + e_td = self._get_task_details(task_detail.uuid, lock=False) + except EnvironmentError: + if not ignore_missing: + raise exc.NotFound("No task details found with id: %s" + % task_detail.uuid) + if e_td is not None: + task_detail = p_utils.task_details_merge(e_td, task_detail) + td_path = os.path.join(self._task_path, task_detail.uuid) + td_data = _format_task_detail(task_detail) + self._write_to(td_path, jsonutils.dumps(td_data)) + return task_detail + + @decorators.locked + def update_task_details(self, task_detail): + return self._run_with_process_lock("task", + self._save_task_details, + task_detail, + ignore_missing=False) + + def _get_task_details(self, uuid, lock=True): + + def _get(): + td_path = os.path.join(self._task_path, uuid) + td_data = jsonutils.loads(self._read_from(td_path)) + return _unformat_task_detail(uuid, td_data) + + if lock: + return self._run_with_process_lock('task', _get) + else: + return _get() + + def _get_flow_details(self, uuid, lock=True): + + def _get(): + fd_path = os.path.join(self._flow_path, uuid) + meta_path = os.path.join(fd_path, 'metadata') + meta = jsonutils.loads(self._read_from(meta_path)) + fd = _unformat_flow_detail(uuid, meta) + td_to_load = [] + td_path = os.path.join(fd_path, 'tasks') + try: + td_to_load = [f for f in os.listdir(td_path) + if os.path.islink(os.path.join(td_path, f))] + except EnvironmentError as e: + if e.errno != errno.ENOENT: + raise + for t_uuid in td_to_load: + fd.add(self._get_task_details(t_uuid)) + return fd + + if lock: + return self._run_with_process_lock('flow', _get) + else: + return _get() + + def _save_tasks_and_link(self, task_details, local_task_path): + for task_detail in task_details: + self._save_task_details(task_detail, ignore_missing=True) + src_td_path = os.path.join(self._task_path, task_detail.uuid) + target_td_path = os.path.join(local_task_path, task_detail.uuid) + try: + os.symlink(src_td_path, target_td_path) + except EnvironmentError as e: + if e.errno != errno.EEXIST: + raise + + def _save_flow_details(self, flow_detail, ignore_missing): + # See if we have an existing flow detail to merge with. + e_fd = None + try: + e_fd = self._get_flow_details(flow_detail.uuid, lock=False) + except EnvironmentError: + if not ignore_missing: + raise exc.NotFound("No flow details found with id: %s" + % flow_detail.uuid) + if e_fd is not None: + e_fd = p_utils.flow_details_merge(e_fd, flow_detail) + for td in flow_detail: + if e_fd.find(td.uuid) is None: + e_fd.add(td) + flow_detail = e_fd + flow_path = os.path.join(self._flow_path, flow_detail.uuid) + misc.ensure_tree(flow_path) + self._write_to(os.path.join(flow_path, 'metadata'), + jsonutils.dumps(_format_flow_detail(flow_detail))) + if len(flow_detail): + task_path = os.path.join(flow_path, 'tasks') + misc.ensure_tree(task_path) + self._run_with_process_lock('task', + self._save_tasks_and_link, + list(flow_detail), task_path) + return flow_detail + + @decorators.locked + def update_flow_details(self, flow_detail): + return self._run_with_process_lock("flow", + self._save_flow_details, + flow_detail, + ignore_missing=False) + + def _save_flows_and_link(self, flow_details, local_flow_path): + for flow_detail in flow_details: + self._save_flow_details(flow_detail, ignore_missing=True) + src_fd_path = os.path.join(self._flow_path, flow_detail.uuid) + target_fd_path = os.path.join(local_flow_path, flow_detail.uuid) + try: + os.symlink(src_fd_path, target_fd_path) + except EnvironmentError as e: + if e.errno != errno.EEXIST: + raise + + def _save_logbook(self, book): + # See if we have an existing logbook to merge with. + e_lb = None + try: + e_lb = self._get_logbook(book.uuid) + except exc.NotFound: + pass + if e_lb is not None: + e_lb = p_utils.logbook_merge(e_lb, book) + for fd in book: + if e_lb.find(fd.uuid) is None: + e_lb.add(fd) + book = e_lb + book_path = os.path.join(self._book_path, book.uuid) + misc.ensure_tree(book_path) + created_at = None + if e_lb is not None: + created_at = e_lb.created_at + self._write_to(os.path.join(book_path, 'metadata'), + jsonutils.dumps(_format_logbook(book, + created_at=created_at))) + if len(book): + flow_path = os.path.join(book_path, 'flows') + misc.ensure_tree(flow_path) + self._run_with_process_lock('flow', + self._save_flows_and_link, + list(book), flow_path) + return book + + @decorators.locked + def save_logbook(self, book): + return self._run_with_process_lock("book", + self._save_logbook, book) + + @decorators.locked + def upgrade(self): + + def _step_create(): + for d in (self._book_path, self._flow_path, self._task_path): + misc.ensure_tree(d) + + misc.ensure_tree(self._backend.base_path) + misc.ensure_tree(self._backend.lock_path) + self._run_with_process_lock("init", _step_create) + + @decorators.locked + def clear_all(self): + + def _step_clear(): + for d in (self._book_path, self._flow_path, self._task_path): + if os.path.isdir(d): + shutil.rmtree(d) + + def _step_task(): + self._run_with_process_lock("task", _step_clear) + + def _step_flow(): + self._run_with_process_lock("flow", _step_task) + + def _step_book(): + self._run_with_process_lock("book", _step_flow) + + # Acquire all locks by going through this little hiearchy. + self._run_with_process_lock("init", _step_book) + + @decorators.locked + def destroy_logbook(self, book_uuid): + + def _destroy_tasks(task_details): + for task_detail in task_details: + try: + shutil.rmtree(os.path.join(self._task_path, + task_detail.uuid)) + except EnvironmentError as e: + if e.errno != errno.ENOENT: + raise + + def _destroy_flows(flow_details): + for flow_detail in flow_details: + self._run_with_process_lock("task", _destroy_tasks, + list(flow_detail)) + try: + shutil.rmtree(os.path.join(self._flow_path, + flow_detail.uuid)) + except EnvironmentError as e: + if e.errno != errno.ENOENT: + raise + + def _destroy_book(): + book = self._get_logbook(book_uuid) + self._run_with_process_lock("flow", _destroy_flows, list(book)) + try: + shutil.rmtree(os.path.join(self._book_path, book.uuid)) + except EnvironmentError as e: + if e.errno != errno.ENOENT: + raise + + # Acquire all locks by going through this little hiearchy. + self._run_with_process_lock("book", _destroy_book) + + def _get_logbook(self, book_uuid): + book_path = os.path.join(self._book_path, book_uuid) + meta_path = os.path.join(book_path, 'metadata') + try: + meta = jsonutils.loads(self._read_from(meta_path)) + except EnvironmentError as e: + if e.errno == errno.ENOENT: + raise exc.NotFound("No logbook found with id: %s" % book_uuid) + else: + raise + lb = _unformat_logbook(book_uuid, meta) + fd_path = os.path.join(book_path, 'flows') + fd_uuids = [] + try: + fd_uuids = [f for f in os.listdir(fd_path) + if os.path.islink(os.path.join(fd_path, f))] + except EnvironmentError as e: + if e.errno != errno.ENOENT: + raise + for fd_uuid in fd_uuids: + lb.add(self._get_flow_details(fd_uuid)) + return lb + + @decorators.locked + def get_logbook(self, book_uuid): + return self._run_with_process_lock("book", + self._get_logbook, book_uuid) + + +### +# Internal <-> external model + other helper functions. +### + +def _str_2_datetime(text): + """Converts an iso8601 string/text into a datetime object (or none)""" + if text is None: + return None + if not isinstance(text, basestring): + raise ValueError("Can only convert strings into a datetime object and" + " not %r" % (text)) + if not len(text): + return None + return timeutils.parse_isotime(text) + + +def _format_task_detail(task_detail): + return { + 'exception': task_detail.exception, + 'meta': task_detail.meta, + 'name': task_detail.name, + 'results': task_detail.results, + 'stacktrace': task_detail.stacktrace, + 'state': task_detail.state, + 'version': task_detail.version, + } + + +def _unformat_task_detail(uuid, td_data): + td = logbook.TaskDetail(name=td_data['name'], uuid=uuid) + td.state = td_data.get('state') + td.results = td_data.get('results') + td.exception = td_data.get('exception') + td.stacktrace = td_data.get('stacktrace') + td.meta = td_data.get('meta') + td.version = td_data.get('version') + return td + + +def _format_flow_detail(flow_detail): + return { + 'name': flow_detail.name, + 'meta': flow_detail.meta, + 'state': flow_detail.state, + } + + +def _unformat_flow_detail(uuid, fd_data): + fd = logbook.FlowDetail(name=fd_data['name'], uuid=uuid) + fd.state = fd_data.get('state') + fd.meta = fd_data.get('meta') + return fd + + +def _format_logbook(book, created_at=None): + lb_data = { + 'name': book.name, + 'meta': book.meta, + } + if created_at: + lb_data['created_at'] = timeutils.isotime(at=created_at) + lb_data['updated_at'] = timeutils.isotime() + else: + lb_data['created_at'] = timeutils.isotime() + lb_data['updated_at'] = None + return lb_data + + +def _unformat_logbook(uuid, lb_data): + lb = logbook.LogBook(name=lb_data['name'], + uuid=uuid, + updated_at=_str_2_datetime(lb_data['updated_at']), + created_at=_str_2_datetime(lb_data['created_at'])) + lb.meta = lb_data.get('meta') + return lb diff --git a/taskflow/persistence/backends/impl_memory.py b/taskflow/persistence/backends/impl_memory.py index 9ba041ad..45e61275 100644 --- a/taskflow/persistence/backends/impl_memory.py +++ b/taskflow/persistence/backends/impl_memory.py @@ -28,6 +28,7 @@ from taskflow import decorators from taskflow import exceptions as exc from taskflow.openstack.common import timeutils from taskflow.persistence.backends import base +from taskflow.utils import persistence_utils as p_utils LOG = logging.getLogger(__name__) @@ -98,8 +99,8 @@ class Connection(base.Connection): @decorators.locked(lock="_save_locks") def update_task_details(self, task_detail): try: - return _task_details_merge(_TASK_DETAILS[task_detail.uuid], - task_detail) + return p_utils.task_details_merge(_TASK_DETAILS[task_detail.uuid], + task_detail) except KeyError: raise exc.NotFound("No task details found with id: %s" % task_detail.uuid) @@ -107,8 +108,8 @@ class Connection(base.Connection): @decorators.locked(lock="_save_locks") def update_flow_details(self, flow_detail): try: - e_fd = _flow_details_merge(_FLOW_DETAILS[flow_detail.uuid], - flow_detail) + e_fd = p_utils.flow_details_merge(_FLOW_DETAILS[flow_detail.uuid], + flow_detail) for task_detail in flow_detail: if e_fd.find(task_detail.uuid) is None: _TASK_DETAILS[task_detail.uuid] = _copy(task_detail) @@ -125,7 +126,7 @@ class Connection(base.Connection): def save_logbook(self, book): # Get a existing logbook model (or create it if it isn't there). try: - e_lb = _logbook_merge(_LOG_BOOKS[book.uuid], book) + e_lb = p_utils.logbook_merge(_LOG_BOOKS[book.uuid], book) # Add anything in to the new logbook that isn't already # in the existing logbook. for flow_detail in book: @@ -164,41 +165,3 @@ class Connection(base.Connection): books = list(_LOG_BOOKS.values()) for lb in books: yield lb - -### -# Merging + other helper functions. -### - - -def _task_details_merge(td_e, td_new): - if td_e is td_new: - return td_e - if td_e.state != td_new.state: - td_e.state = td_new.state - if td_e.results != td_new.results: - td_e.results = td_new.results - if td_e.exception != td_new.exception: - td_e.exception = td_new.exception - if td_e.stacktrace != td_new.stacktrace: - td_e.stacktrace = td_new.stacktrace - if td_e.meta != td_new.meta: - td_e.meta = td_new.meta - return td_e - - -def _flow_details_merge(fd_e, fd_new): - if fd_e is fd_new: - return fd_e - if fd_e.meta != fd_new.meta: - fd_e.meta = fd_new.meta - if fd_e.state != fd_new.state: - fd_e.state = fd_new.state - return fd_e - - -def _logbook_merge(lb_e, lb_new): - if lb_e is lb_new: - return lb_e - if lb_e.meta != lb_new.meta: - lb_e.meta = lb_new.meta - return lb_e diff --git a/taskflow/tests/unit/persistence/test_dir_persistence.py b/taskflow/tests/unit/persistence/test_dir_persistence.py new file mode 100644 index 00000000..076ab2d4 --- /dev/null +++ b/taskflow/tests/unit/persistence/test_dir_persistence.py @@ -0,0 +1,47 @@ +# -*- coding: utf-8 -*- + +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (C) 2013 Rackspace Hosting All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import os +import shutil +import tempfile + +from taskflow.persistence.backends import impl_dir +from taskflow import test +from taskflow.tests.unit.persistence import base + + +class DirPersistenceTest(test.TestCase, base.PersistenceTestMixin): + def _get_connection(self): + conf = { + 'path': self.path, + } + return impl_dir.DirBackend(conf).get_connection() + + def setUp(self): + super(DirPersistenceTest, self).setUp() + self.path = tempfile.mkdtemp() + conn = self._get_connection() + conn.upgrade() + + def tearDown(self): + super(DirPersistenceTest, self).tearDown() + conn = self._get_connection() + conn.clear_all() + if self.path and os.path.isdir(self.path): + shutil.rmtree(self.path) + self.path = None diff --git a/taskflow/tests/unit/test_action_engine.py b/taskflow/tests/unit/test_action_engine.py index 95dee871..85ae1dbf 100644 --- a/taskflow/tests/unit/test_action_engine.py +++ b/taskflow/tests/unit/test_action_engine.py @@ -29,10 +29,10 @@ from taskflow.engines.action_engine import engine as eng from taskflow import exceptions from taskflow.persistence.backends import impl_memory from taskflow.persistence import logbook -from taskflow.persistence import utils as p_utils from taskflow import states from taskflow import task from taskflow import test +from taskflow.utils import persistence_utils as p_utils class TestTask(task.Task): diff --git a/taskflow/tests/unit/test_storage.py b/taskflow/tests/unit/test_storage.py index 86c30d93..9fe6040f 100644 --- a/taskflow/tests/unit/test_storage.py +++ b/taskflow/tests/unit/test_storage.py @@ -21,10 +21,10 @@ import mock from taskflow import exceptions from taskflow.persistence.backends import impl_memory -from taskflow.persistence import utils as p_utils from taskflow import states from taskflow import storage from taskflow import test +from taskflow.utils import persistence_utils as p_utils class StorageTest(test.TestCase): diff --git a/taskflow/utils/lock_utils.py b/taskflow/utils/lock_utils.py new file mode 100644 index 00000000..47a2bbc4 --- /dev/null +++ b/taskflow/utils/lock_utils.py @@ -0,0 +1,110 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 OpenStack Foundation. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# This is a modified version of what was in oslo-incubator lockutils.py from +# commit 5039a610355e5265fb9fbd1f4023e8160750f32e but this one does not depend +# on oslo.cfg or the very large oslo-incubator oslo logging module (which also +# pulls in oslo.cfg) and is reduced to only what taskflow currently wants to +# use from that code. + +import errno +import logging +import os +import time + +LOG = logging.getLogger(__name__) +WAIT_TIME = 0.01 + + +class _InterProcessLock(object): + """Lock implementation which allows multiple locks, working around + issues like bugs.debian.org/cgi-bin/bugreport.cgi?bug=632857 and does + not require any cleanup. Since the lock is always held on a file + descriptor rather than outside of the process, the lock gets dropped + automatically if the process crashes, even if __exit__ is not executed. + + There are no guarantees regarding usage by multiple green threads in a + single process here. This lock works only between processes. + + Note these locks are released when the descriptor is closed, so it's not + safe to close the file descriptor while another green thread holds the + lock. Just opening and closing the lock file can break synchronisation, + so lock files must be accessed only using this abstraction. + """ + + def __init__(self, name): + self._lockfile = None + self._fname = name + + @property + def path(self): + return self._fname + + def __enter__(self): + self._lockfile = open(self.path, 'w') + + while True: + try: + # Using non-blocking locks since green threads are not + # patched to deal with blocking locking calls. + # Also upon reading the MSDN docs for locking(), it seems + # to have a laughable 10 attempts "blocking" mechanism. + self.trylock() + return self + except IOError as e: + if e.errno in (errno.EACCES, errno.EAGAIN): + time.sleep(WAIT_TIME) + else: + raise + + def __exit__(self, exc_type, exc_val, exc_tb): + try: + self.unlock() + self._lockfile.close() + except IOError: + LOG.exception("Could not release the acquired lock `%s`", + self.path) + + def trylock(self): + raise NotImplementedError() + + def unlock(self): + raise NotImplementedError() + + +class _WindowsLock(_InterProcessLock): + def trylock(self): + msvcrt.locking(self._lockfile.fileno(), msvcrt.LK_NBLCK, 1) + + def unlock(self): + msvcrt.locking(self._lockfile.fileno(), msvcrt.LK_UNLCK, 1) + + +class _PosixLock(_InterProcessLock): + def trylock(self): + fcntl.lockf(self._lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB) + + def unlock(self): + fcntl.lockf(self._lockfile, fcntl.LOCK_UN) + + +if os.name == 'nt': + import msvcrt + InterProcessLock = _WindowsLock +else: + import fcntl + InterProcessLock = _PosixLock diff --git a/taskflow/utils/misc.py b/taskflow/utils/misc.py index bb0547a0..d4acaf46 100644 --- a/taskflow/utils/misc.py +++ b/taskflow/utils/misc.py @@ -21,10 +21,13 @@ from distutils import version import collections import copy +import errno import logging -import six +import os import sys +import six + LOG = logging.getLogger(__name__) @@ -97,6 +100,24 @@ def is_version_compatible(version_1, version_2): return False +# Taken from oslo-incubator file-utils but since that module pulls in a large +# amount of other files it does not seem so useful to include that full +# module just for this function. +def ensure_tree(path): + """Create a directory (and any ancestor directories required) + + :param path: Directory to create + """ + try: + os.makedirs(path) + except OSError as exc: + if exc.errno == errno.EEXIST: + if not os.path.isdir(path): + raise + else: + raise + + class TransitionNotifier(object): """A utility helper class that can be used to subscribe to notifications of events occuring as well as allow a entity to post said diff --git a/taskflow/persistence/utils.py b/taskflow/utils/persistence_utils.py similarity index 67% rename from taskflow/persistence/utils.py rename to taskflow/utils/persistence_utils.py index 8bd477ba..8562e96e 100644 --- a/taskflow/persistence/utils.py +++ b/taskflow/utils/persistence_utils.py @@ -82,3 +82,49 @@ def create_flow_detail(flow, book=None, backend=None): if backend is not None: LOG.warn("Can not save %s without a provided logbook", flow) return flow_detail + + +def task_details_merge(td_e, td_new): + """Merges an existing task details with a new task details object, the new + task details fields, if they differ will replace the existing objects + fields (except name, version, uuid which can not be replaced). + """ + if td_e is td_new: + return td_e + if td_e.state != td_new.state: + td_e.state = td_new.state + if td_e.results != td_new.results: + td_e.results = td_new.results + if td_e.exception != td_new.exception: + td_e.exception = td_new.exception + if td_e.stacktrace != td_new.stacktrace: + td_e.stacktrace = td_new.stacktrace + if td_e.meta != td_new.meta: + td_e.meta = td_new.meta + return td_e + + +def flow_details_merge(fd_e, fd_new): + """Merges an existing flow details with a new flow details object, the new + flow details fields, if they differ will replace the existing objects + fields (except name and uuid which can not be replaced). + """ + if fd_e is fd_new: + return fd_e + if fd_e.meta != fd_new.meta: + fd_e.meta = fd_new.meta + if fd_e.state != fd_new.state: + fd_e.state = fd_new.state + return fd_e + + +def logbook_merge(lb_e, lb_new): + """Merges an existing logbook with a new logbook object, the new logbook + fields, if they differ will replace the existing objects fields (except + name and uuid which can not be replaced). + """ + if lb_e is lb_new: + return lb_e + if lb_e.meta != lb_new.meta: + lb_e.meta = lb_new.meta + return lb_e