Factors lots of duplicate code out of persistence backends Adds get_flows_for_book to all backends Change-Id: I0434bd4931cd9274876f9e9c92909531f244bcac
128 lines
4.2 KiB
Python
128 lines
4.2 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
|
|
# Copyright (C) 2013 Rackspace Hosting All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import contextlib
|
|
import copy
|
|
import os
|
|
|
|
from taskflow import exceptions as exc
|
|
from taskflow.persistence import path_based
|
|
from taskflow.types import tree
|
|
from taskflow.utils import lock_utils
|
|
|
|
|
|
class MemoryBackend(path_based.PathBasedBackend):
|
|
"""A in-memory (non-persistent) backend.
|
|
|
|
This backend writes logbooks, flow details, and atom details to in-memory
|
|
dictionaries and retrieves from those dictionaries as needed.
|
|
|
|
This backend does *not* provide true transactional semantics. It does
|
|
guarantee that there will be no inter-thread race conditions when
|
|
writing and reading by using a read/write locks.
|
|
"""
|
|
def __init__(self, conf=None):
|
|
super(MemoryBackend, self).__init__(conf)
|
|
if self._path is None:
|
|
self._path = os.sep
|
|
self.memory = tree.Node(self._path)
|
|
self.lock = lock_utils.ReaderWriterLock()
|
|
|
|
def get_connection(self):
|
|
return Connection(self)
|
|
|
|
def close(self):
|
|
pass
|
|
|
|
|
|
class Connection(path_based.PathBasedConnection):
|
|
def __init__(self, backend):
|
|
super(Connection, self).__init__(backend)
|
|
self.upgrade()
|
|
|
|
@contextlib.contextmanager
|
|
def _memory_lock(self, write=False):
|
|
if write:
|
|
lock = self.backend.lock.write_lock
|
|
else:
|
|
lock = self.backend.lock.read_lock
|
|
|
|
with lock():
|
|
try:
|
|
yield
|
|
except exc.TaskFlowException as e:
|
|
raise
|
|
except Exception as e:
|
|
raise exc.StorageFailure("Storage backend internal error", e)
|
|
|
|
def _fetch_node(self, path):
|
|
node = self.backend.memory.find(path)
|
|
if node is None:
|
|
raise exc.NotFound("Item not found %s" % path)
|
|
return node
|
|
|
|
def _join_path(self, *parts):
|
|
return os.path.join(*parts)
|
|
|
|
def _get_item(self, path):
|
|
with self._memory_lock():
|
|
return copy.deepcopy(self._fetch_node(path).metadata['value'])
|
|
|
|
def _set_item(self, path, value, transaction):
|
|
value = copy.deepcopy(value)
|
|
try:
|
|
item_node = self._fetch_node(path)
|
|
item_node.metadata.update(value=value)
|
|
except exc.NotFound:
|
|
dirname, basename = os.path.split(path)
|
|
parent_node = self._fetch_node(dirname)
|
|
parent_node.add(tree.Node(path, name=basename, value=value))
|
|
|
|
def _del_tree(self, path, transaction):
|
|
node = self._fetch_node(path)
|
|
node.disassociate()
|
|
|
|
def _get_children(self, path):
|
|
with self._memory_lock():
|
|
return [node.metadata['name'] for node in self._fetch_node(path)]
|
|
|
|
def _ensure_path(self, path):
|
|
with self._memory_lock(write=True):
|
|
path = os.path.normpath(path)
|
|
parts = path.split(os.sep)
|
|
node = self.backend.memory
|
|
for p in range(len(parts) - 1):
|
|
node_path = os.sep.join(parts[:p + 2])
|
|
try:
|
|
node = self._fetch_node(node_path)
|
|
except exc.NotFound:
|
|
node.add(tree.Node(node_path, name=parts[p + 1]))
|
|
|
|
def _create_link(self, src_path, dest_path, transaction):
|
|
dirname, basename = os.path.split(dest_path)
|
|
parent_node = self._fetch_node(dirname)
|
|
parent_node.add(tree.Node(dest_path, name=basename, target=src_path))
|
|
|
|
@contextlib.contextmanager
|
|
def _transaction(self):
|
|
"""This just wraps a global write-lock"""
|
|
with self._memory_lock(write=True):
|
|
yield
|
|
|
|
def validate(self):
|
|
pass
|