Add a directory/filesystem based persistence layer

Add a nice non-memory-based but also non-db based
persistence layer which is another good example of
how a persistence layer can be created (and used).

Example directory structure:

  /books
  /books/247c5311-d4ec-461b-9e76-51830d6a75b2
  /books/247c5311-d4ec-461b-9e76-51830d6a75b2/metadata
  /books/247c5311-d4ec-461b-9e76-51830d6a75b2/flows
  /books/247c5311-d4ec-461b-9e76-51830d6a75b2/flows/25f18828-a067-411e-9035-8217536f925d
  /flows
  /flows/25f18828-a067-411e-9035-8217536f925d
  /flows/25f18828-a067-411e-9035-8217536f925d/metadata
  /flows/25f18828-a067-411e-9035-8217536f925d/tasks
  /flows/25f18828-a067-411e-9035-8217536f925d/tasks/a352fa2e-82cf-4c37-89ae-3aa10dbf1437
  /tasks
  /tasks/a352fa2e-82cf-4c37-89ae-3aa10dbf1437

Change-Id: I63aaf56497187e21469bc500a49dd02de0c67f29
This commit is contained in:
Joshua Harlow
2013-09-18 16:04:41 -07:00
committed by Joshua Harlow
parent c108f6a1f5
commit 166bfff48c
10 changed files with 710 additions and 48 deletions

View File

@@ -29,6 +29,8 @@ packages =
[entry_points]
taskflow.persistence =
dir = taskflow.persistence.backends.impl_dir:DirBackend
file = taskflow.persistence.backends.impl_dir:DirBackend
memory = taskflow.persistence.backends.impl_memory:MemoryBackend
mysql = taskflow.persistence.backends.impl_sqlalchemy:SQLAlchemyBackend
postgresql = taskflow.persistence.backends.impl_sqlalchemy:SQLAlchemyBackend

View File

@@ -24,8 +24,6 @@ from concurrent import futures
from taskflow.engines.action_engine import graph_action
from taskflow.engines.action_engine import task_action
from taskflow.persistence import utils as p_utils
from taskflow import decorators
from taskflow import exceptions as exc
from taskflow import states
@@ -33,6 +31,7 @@ from taskflow import storage as t_storage
from taskflow.utils import flow_utils
from taskflow.utils import misc
from taskflow.utils import persistence_utils as p_utils
class ActionEngine(object):

View File

@@ -0,0 +1,474 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
# Copyright (C) 2013 Rackspace Hosting All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import errno
import logging
import os
import shutil
import threading
import weakref
from taskflow import decorators
from taskflow import exceptions as exc
from taskflow.openstack.common import jsonutils
from taskflow.openstack.common import timeutils
from taskflow.persistence.backends import base
from taskflow.persistence import logbook
from taskflow.utils import lock_utils
from taskflow.utils import misc
from taskflow.utils import persistence_utils as p_utils
LOG = logging.getLogger(__name__)
# The lock storage is not thread safe to set items in, so this lock is used to
# protect that access.
_LOCK_STORAGE_MUTATE = threading.RLock()
# Currently in use paths -> in-process locks are maintained here.
#
# NOTE(harlowja): Values in this dictionary will be automatically released once
# the objects referencing those objects have been garbage collected.
_LOCK_STORAGE = weakref.WeakValueDictionary()
class DirBackend(base.Backend):
"""A backend that writes logbooks, flow details, and task details to a
provided directory. This backend does *not* provide transactional semantics
although it does guarantee that there will be no race conditions when
writing/reading by using file level locking and in-process locking.
NOTE(harlowja): this is more of an example/testing backend and likely
should *not* be used in production, since this backend lacks transactional
semantics.
"""
def __init__(self, conf):
super(DirBackend, self).__init__(conf)
self._path = os.path.abspath(conf['path'])
self._lock_path = os.path.join(self._path, 'locks')
self._file_cache = {}
# Ensure that multiple threads are not accessing the same storage at
# the same time, the file lock mechanism doesn't protect against this
# so we must do in-process locking as well.
with _LOCK_STORAGE_MUTATE:
self._lock = _LOCK_STORAGE.setdefault(self._path,
threading.RLock())
@property
def lock_path(self):
return self._lock_path
@property
def base_path(self):
return self._path
def get_connection(self):
return Connection(self)
def close(self):
pass
class Connection(base.Connection):
def __init__(self, backend):
self._backend = backend
self._file_cache = self._backend._file_cache
self._flow_path = os.path.join(self._backend.base_path, 'flows')
self._task_path = os.path.join(self._backend.base_path, 'tasks')
self._book_path = os.path.join(self._backend.base_path, 'books')
# Share the backends lock so that all threads using the given backend
# are restricted in writing, since the per-process lock we are using
# to restrict the multi-process access does not work inside a process.
self._lock = backend._lock
def _read_from(self, filename):
# This is very similar to the oslo-incubator fileutils module, but
# tweaked to not depend on a global cache, as well as tweaked to not
# pull-in the oslo logging module (which is a huge pile of code).
mtime = os.path.getmtime(filename)
cache_info = self._file_cache.setdefault(filename, {})
if not cache_info or mtime > cache_info.get('mtime', 0):
with open(filename, 'rb') as fp:
cache_info['data'] = fp.read()
cache_info['mtime'] = mtime
return cache_info['data']
def _write_to(self, filename, contents):
with open(filename, 'wb') as fp:
fp.write(contents)
self._file_cache.pop(filename, None)
def _run_with_process_lock(self, lock_name, functor, *args, **kwargs):
lock_path = os.path.join(self.backend.lock_path, lock_name)
with lock_utils.InterProcessLock(lock_path):
try:
return functor(*args, **kwargs)
except exc.TaskFlowException:
raise
except Exception as e:
# NOTE(harlowja): trap all other errors as storage errors.
raise exc.StorageError("Failed running locking file based "
"session: %s" % e, e)
def _get_logbooks(self):
lb_uuids = []
try:
lb_uuids = [d for d in os.listdir(self._book_path)
if os.path.isdir(os.path.join(self._book_path, d))]
except EnvironmentError as e:
if e.errno != errno.ENOENT:
raise
for lb_uuid in lb_uuids:
try:
yield self._get_logbook(lb_uuid)
except exc.NotFound:
pass
def get_logbooks(self):
# NOTE(harlowja): don't hold the lock while iterating
with self._lock:
books = list(self._get_logbooks())
for b in books:
yield b
@property
def backend(self):
return self._backend
def close(self):
pass
def _save_task_details(self, task_detail, ignore_missing):
# See if we have an existing task detail to merge with.
e_td = None
try:
e_td = self._get_task_details(task_detail.uuid, lock=False)
except EnvironmentError:
if not ignore_missing:
raise exc.NotFound("No task details found with id: %s"
% task_detail.uuid)
if e_td is not None:
task_detail = p_utils.task_details_merge(e_td, task_detail)
td_path = os.path.join(self._task_path, task_detail.uuid)
td_data = _format_task_detail(task_detail)
self._write_to(td_path, jsonutils.dumps(td_data))
return task_detail
@decorators.locked
def update_task_details(self, task_detail):
return self._run_with_process_lock("task",
self._save_task_details,
task_detail,
ignore_missing=False)
def _get_task_details(self, uuid, lock=True):
def _get():
td_path = os.path.join(self._task_path, uuid)
td_data = jsonutils.loads(self._read_from(td_path))
return _unformat_task_detail(uuid, td_data)
if lock:
return self._run_with_process_lock('task', _get)
else:
return _get()
def _get_flow_details(self, uuid, lock=True):
def _get():
fd_path = os.path.join(self._flow_path, uuid)
meta_path = os.path.join(fd_path, 'metadata')
meta = jsonutils.loads(self._read_from(meta_path))
fd = _unformat_flow_detail(uuid, meta)
td_to_load = []
td_path = os.path.join(fd_path, 'tasks')
try:
td_to_load = [f for f in os.listdir(td_path)
if os.path.islink(os.path.join(td_path, f))]
except EnvironmentError as e:
if e.errno != errno.ENOENT:
raise
for t_uuid in td_to_load:
fd.add(self._get_task_details(t_uuid))
return fd
if lock:
return self._run_with_process_lock('flow', _get)
else:
return _get()
def _save_tasks_and_link(self, task_details, local_task_path):
for task_detail in task_details:
self._save_task_details(task_detail, ignore_missing=True)
src_td_path = os.path.join(self._task_path, task_detail.uuid)
target_td_path = os.path.join(local_task_path, task_detail.uuid)
try:
os.symlink(src_td_path, target_td_path)
except EnvironmentError as e:
if e.errno != errno.EEXIST:
raise
def _save_flow_details(self, flow_detail, ignore_missing):
# See if we have an existing flow detail to merge with.
e_fd = None
try:
e_fd = self._get_flow_details(flow_detail.uuid, lock=False)
except EnvironmentError:
if not ignore_missing:
raise exc.NotFound("No flow details found with id: %s"
% flow_detail.uuid)
if e_fd is not None:
e_fd = p_utils.flow_details_merge(e_fd, flow_detail)
for td in flow_detail:
if e_fd.find(td.uuid) is None:
e_fd.add(td)
flow_detail = e_fd
flow_path = os.path.join(self._flow_path, flow_detail.uuid)
misc.ensure_tree(flow_path)
self._write_to(os.path.join(flow_path, 'metadata'),
jsonutils.dumps(_format_flow_detail(flow_detail)))
if len(flow_detail):
task_path = os.path.join(flow_path, 'tasks')
misc.ensure_tree(task_path)
self._run_with_process_lock('task',
self._save_tasks_and_link,
list(flow_detail), task_path)
return flow_detail
@decorators.locked
def update_flow_details(self, flow_detail):
return self._run_with_process_lock("flow",
self._save_flow_details,
flow_detail,
ignore_missing=False)
def _save_flows_and_link(self, flow_details, local_flow_path):
for flow_detail in flow_details:
self._save_flow_details(flow_detail, ignore_missing=True)
src_fd_path = os.path.join(self._flow_path, flow_detail.uuid)
target_fd_path = os.path.join(local_flow_path, flow_detail.uuid)
try:
os.symlink(src_fd_path, target_fd_path)
except EnvironmentError as e:
if e.errno != errno.EEXIST:
raise
def _save_logbook(self, book):
# See if we have an existing logbook to merge with.
e_lb = None
try:
e_lb = self._get_logbook(book.uuid)
except exc.NotFound:
pass
if e_lb is not None:
e_lb = p_utils.logbook_merge(e_lb, book)
for fd in book:
if e_lb.find(fd.uuid) is None:
e_lb.add(fd)
book = e_lb
book_path = os.path.join(self._book_path, book.uuid)
misc.ensure_tree(book_path)
created_at = None
if e_lb is not None:
created_at = e_lb.created_at
self._write_to(os.path.join(book_path, 'metadata'),
jsonutils.dumps(_format_logbook(book,
created_at=created_at)))
if len(book):
flow_path = os.path.join(book_path, 'flows')
misc.ensure_tree(flow_path)
self._run_with_process_lock('flow',
self._save_flows_and_link,
list(book), flow_path)
return book
@decorators.locked
def save_logbook(self, book):
return self._run_with_process_lock("book",
self._save_logbook, book)
@decorators.locked
def upgrade(self):
def _step_create():
for d in (self._book_path, self._flow_path, self._task_path):
misc.ensure_tree(d)
misc.ensure_tree(self._backend.base_path)
misc.ensure_tree(self._backend.lock_path)
self._run_with_process_lock("init", _step_create)
@decorators.locked
def clear_all(self):
def _step_clear():
for d in (self._book_path, self._flow_path, self._task_path):
if os.path.isdir(d):
shutil.rmtree(d)
def _step_task():
self._run_with_process_lock("task", _step_clear)
def _step_flow():
self._run_with_process_lock("flow", _step_task)
def _step_book():
self._run_with_process_lock("book", _step_flow)
# Acquire all locks by going through this little hiearchy.
self._run_with_process_lock("init", _step_book)
@decorators.locked
def destroy_logbook(self, book_uuid):
def _destroy_tasks(task_details):
for task_detail in task_details:
try:
shutil.rmtree(os.path.join(self._task_path,
task_detail.uuid))
except EnvironmentError as e:
if e.errno != errno.ENOENT:
raise
def _destroy_flows(flow_details):
for flow_detail in flow_details:
self._run_with_process_lock("task", _destroy_tasks,
list(flow_detail))
try:
shutil.rmtree(os.path.join(self._flow_path,
flow_detail.uuid))
except EnvironmentError as e:
if e.errno != errno.ENOENT:
raise
def _destroy_book():
book = self._get_logbook(book_uuid)
self._run_with_process_lock("flow", _destroy_flows, list(book))
try:
shutil.rmtree(os.path.join(self._book_path, book.uuid))
except EnvironmentError as e:
if e.errno != errno.ENOENT:
raise
# Acquire all locks by going through this little hiearchy.
self._run_with_process_lock("book", _destroy_book)
def _get_logbook(self, book_uuid):
book_path = os.path.join(self._book_path, book_uuid)
meta_path = os.path.join(book_path, 'metadata')
try:
meta = jsonutils.loads(self._read_from(meta_path))
except EnvironmentError as e:
if e.errno == errno.ENOENT:
raise exc.NotFound("No logbook found with id: %s" % book_uuid)
else:
raise
lb = _unformat_logbook(book_uuid, meta)
fd_path = os.path.join(book_path, 'flows')
fd_uuids = []
try:
fd_uuids = [f for f in os.listdir(fd_path)
if os.path.islink(os.path.join(fd_path, f))]
except EnvironmentError as e:
if e.errno != errno.ENOENT:
raise
for fd_uuid in fd_uuids:
lb.add(self._get_flow_details(fd_uuid))
return lb
@decorators.locked
def get_logbook(self, book_uuid):
return self._run_with_process_lock("book",
self._get_logbook, book_uuid)
###
# Internal <-> external model + other helper functions.
###
def _str_2_datetime(text):
"""Converts an iso8601 string/text into a datetime object (or none)"""
if text is None:
return None
if not isinstance(text, basestring):
raise ValueError("Can only convert strings into a datetime object and"
" not %r" % (text))
if not len(text):
return None
return timeutils.parse_isotime(text)
def _format_task_detail(task_detail):
return {
'exception': task_detail.exception,
'meta': task_detail.meta,
'name': task_detail.name,
'results': task_detail.results,
'stacktrace': task_detail.stacktrace,
'state': task_detail.state,
'version': task_detail.version,
}
def _unformat_task_detail(uuid, td_data):
td = logbook.TaskDetail(name=td_data['name'], uuid=uuid)
td.state = td_data.get('state')
td.results = td_data.get('results')
td.exception = td_data.get('exception')
td.stacktrace = td_data.get('stacktrace')
td.meta = td_data.get('meta')
td.version = td_data.get('version')
return td
def _format_flow_detail(flow_detail):
return {
'name': flow_detail.name,
'meta': flow_detail.meta,
'state': flow_detail.state,
}
def _unformat_flow_detail(uuid, fd_data):
fd = logbook.FlowDetail(name=fd_data['name'], uuid=uuid)
fd.state = fd_data.get('state')
fd.meta = fd_data.get('meta')
return fd
def _format_logbook(book, created_at=None):
lb_data = {
'name': book.name,
'meta': book.meta,
}
if created_at:
lb_data['created_at'] = timeutils.isotime(at=created_at)
lb_data['updated_at'] = timeutils.isotime()
else:
lb_data['created_at'] = timeutils.isotime()
lb_data['updated_at'] = None
return lb_data
def _unformat_logbook(uuid, lb_data):
lb = logbook.LogBook(name=lb_data['name'],
uuid=uuid,
updated_at=_str_2_datetime(lb_data['updated_at']),
created_at=_str_2_datetime(lb_data['created_at']))
lb.meta = lb_data.get('meta')
return lb

View File

@@ -28,6 +28,7 @@ from taskflow import decorators
from taskflow import exceptions as exc
from taskflow.openstack.common import timeutils
from taskflow.persistence.backends import base
from taskflow.utils import persistence_utils as p_utils
LOG = logging.getLogger(__name__)
@@ -98,8 +99,8 @@ class Connection(base.Connection):
@decorators.locked(lock="_save_locks")
def update_task_details(self, task_detail):
try:
return _task_details_merge(_TASK_DETAILS[task_detail.uuid],
task_detail)
return p_utils.task_details_merge(_TASK_DETAILS[task_detail.uuid],
task_detail)
except KeyError:
raise exc.NotFound("No task details found with id: %s"
% task_detail.uuid)
@@ -107,8 +108,8 @@ class Connection(base.Connection):
@decorators.locked(lock="_save_locks")
def update_flow_details(self, flow_detail):
try:
e_fd = _flow_details_merge(_FLOW_DETAILS[flow_detail.uuid],
flow_detail)
e_fd = p_utils.flow_details_merge(_FLOW_DETAILS[flow_detail.uuid],
flow_detail)
for task_detail in flow_detail:
if e_fd.find(task_detail.uuid) is None:
_TASK_DETAILS[task_detail.uuid] = _copy(task_detail)
@@ -125,7 +126,7 @@ class Connection(base.Connection):
def save_logbook(self, book):
# Get a existing logbook model (or create it if it isn't there).
try:
e_lb = _logbook_merge(_LOG_BOOKS[book.uuid], book)
e_lb = p_utils.logbook_merge(_LOG_BOOKS[book.uuid], book)
# Add anything in to the new logbook that isn't already
# in the existing logbook.
for flow_detail in book:
@@ -164,41 +165,3 @@ class Connection(base.Connection):
books = list(_LOG_BOOKS.values())
for lb in books:
yield lb
###
# Merging + other helper functions.
###
def _task_details_merge(td_e, td_new):
if td_e is td_new:
return td_e
if td_e.state != td_new.state:
td_e.state = td_new.state
if td_e.results != td_new.results:
td_e.results = td_new.results
if td_e.exception != td_new.exception:
td_e.exception = td_new.exception
if td_e.stacktrace != td_new.stacktrace:
td_e.stacktrace = td_new.stacktrace
if td_e.meta != td_new.meta:
td_e.meta = td_new.meta
return td_e
def _flow_details_merge(fd_e, fd_new):
if fd_e is fd_new:
return fd_e
if fd_e.meta != fd_new.meta:
fd_e.meta = fd_new.meta
if fd_e.state != fd_new.state:
fd_e.state = fd_new.state
return fd_e
def _logbook_merge(lb_e, lb_new):
if lb_e is lb_new:
return lb_e
if lb_e.meta != lb_new.meta:
lb_e.meta = lb_new.meta
return lb_e

View File

@@ -0,0 +1,47 @@
# -*- coding: utf-8 -*-
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (C) 2013 Rackspace Hosting All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import shutil
import tempfile
from taskflow.persistence.backends import impl_dir
from taskflow import test
from taskflow.tests.unit.persistence import base
class DirPersistenceTest(test.TestCase, base.PersistenceTestMixin):
def _get_connection(self):
conf = {
'path': self.path,
}
return impl_dir.DirBackend(conf).get_connection()
def setUp(self):
super(DirPersistenceTest, self).setUp()
self.path = tempfile.mkdtemp()
conn = self._get_connection()
conn.upgrade()
def tearDown(self):
super(DirPersistenceTest, self).tearDown()
conn = self._get_connection()
conn.clear_all()
if self.path and os.path.isdir(self.path):
shutil.rmtree(self.path)
self.path = None

View File

@@ -29,10 +29,10 @@ from taskflow.engines.action_engine import engine as eng
from taskflow import exceptions
from taskflow.persistence.backends import impl_memory
from taskflow.persistence import logbook
from taskflow.persistence import utils as p_utils
from taskflow import states
from taskflow import task
from taskflow import test
from taskflow.utils import persistence_utils as p_utils
class TestTask(task.Task):

View File

@@ -21,10 +21,10 @@ import mock
from taskflow import exceptions
from taskflow.persistence.backends import impl_memory
from taskflow.persistence import utils as p_utils
from taskflow import states
from taskflow import storage
from taskflow import test
from taskflow.utils import persistence_utils as p_utils
class StorageTest(test.TestCase):

View File

@@ -0,0 +1,110 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
# This is a modified version of what was in oslo-incubator lockutils.py from
# commit 5039a610355e5265fb9fbd1f4023e8160750f32e but this one does not depend
# on oslo.cfg or the very large oslo-incubator oslo logging module (which also
# pulls in oslo.cfg) and is reduced to only what taskflow currently wants to
# use from that code.
import errno
import logging
import os
import time
LOG = logging.getLogger(__name__)
WAIT_TIME = 0.01
class _InterProcessLock(object):
"""Lock implementation which allows multiple locks, working around
issues like bugs.debian.org/cgi-bin/bugreport.cgi?bug=632857 and does
not require any cleanup. Since the lock is always held on a file
descriptor rather than outside of the process, the lock gets dropped
automatically if the process crashes, even if __exit__ is not executed.
There are no guarantees regarding usage by multiple green threads in a
single process here. This lock works only between processes.
Note these locks are released when the descriptor is closed, so it's not
safe to close the file descriptor while another green thread holds the
lock. Just opening and closing the lock file can break synchronisation,
so lock files must be accessed only using this abstraction.
"""
def __init__(self, name):
self._lockfile = None
self._fname = name
@property
def path(self):
return self._fname
def __enter__(self):
self._lockfile = open(self.path, 'w')
while True:
try:
# Using non-blocking locks since green threads are not
# patched to deal with blocking locking calls.
# Also upon reading the MSDN docs for locking(), it seems
# to have a laughable 10 attempts "blocking" mechanism.
self.trylock()
return self
except IOError as e:
if e.errno in (errno.EACCES, errno.EAGAIN):
time.sleep(WAIT_TIME)
else:
raise
def __exit__(self, exc_type, exc_val, exc_tb):
try:
self.unlock()
self._lockfile.close()
except IOError:
LOG.exception("Could not release the acquired lock `%s`",
self.path)
def trylock(self):
raise NotImplementedError()
def unlock(self):
raise NotImplementedError()
class _WindowsLock(_InterProcessLock):
def trylock(self):
msvcrt.locking(self._lockfile.fileno(), msvcrt.LK_NBLCK, 1)
def unlock(self):
msvcrt.locking(self._lockfile.fileno(), msvcrt.LK_UNLCK, 1)
class _PosixLock(_InterProcessLock):
def trylock(self):
fcntl.lockf(self._lockfile, fcntl.LOCK_EX | fcntl.LOCK_NB)
def unlock(self):
fcntl.lockf(self._lockfile, fcntl.LOCK_UN)
if os.name == 'nt':
import msvcrt
InterProcessLock = _WindowsLock
else:
import fcntl
InterProcessLock = _PosixLock

View File

@@ -21,10 +21,13 @@ from distutils import version
import collections
import copy
import errno
import logging
import six
import os
import sys
import six
LOG = logging.getLogger(__name__)
@@ -97,6 +100,24 @@ def is_version_compatible(version_1, version_2):
return False
# Taken from oslo-incubator file-utils but since that module pulls in a large
# amount of other files it does not seem so useful to include that full
# module just for this function.
def ensure_tree(path):
"""Create a directory (and any ancestor directories required)
:param path: Directory to create
"""
try:
os.makedirs(path)
except OSError as exc:
if exc.errno == errno.EEXIST:
if not os.path.isdir(path):
raise
else:
raise
class TransitionNotifier(object):
"""A utility helper class that can be used to subscribe to
notifications of events occuring as well as allow a entity to post said

View File

@@ -82,3 +82,49 @@ def create_flow_detail(flow, book=None, backend=None):
if backend is not None:
LOG.warn("Can not save %s without a provided logbook", flow)
return flow_detail
def task_details_merge(td_e, td_new):
"""Merges an existing task details with a new task details object, the new
task details fields, if they differ will replace the existing objects
fields (except name, version, uuid which can not be replaced).
"""
if td_e is td_new:
return td_e
if td_e.state != td_new.state:
td_e.state = td_new.state
if td_e.results != td_new.results:
td_e.results = td_new.results
if td_e.exception != td_new.exception:
td_e.exception = td_new.exception
if td_e.stacktrace != td_new.stacktrace:
td_e.stacktrace = td_new.stacktrace
if td_e.meta != td_new.meta:
td_e.meta = td_new.meta
return td_e
def flow_details_merge(fd_e, fd_new):
"""Merges an existing flow details with a new flow details object, the new
flow details fields, if they differ will replace the existing objects
fields (except name and uuid which can not be replaced).
"""
if fd_e is fd_new:
return fd_e
if fd_e.meta != fd_new.meta:
fd_e.meta = fd_new.meta
if fd_e.state != fd_new.state:
fd_e.state = fd_new.state
return fd_e
def logbook_merge(lb_e, lb_new):
"""Merges an existing logbook with a new logbook object, the new logbook
fields, if they differ will replace the existing objects fields (except
name and uuid which can not be replaced).
"""
if lb_e is lb_new:
return lb_e
if lb_e.meta != lb_new.meta:
lb_e.meta = lb_new.meta
return lb_e