396 lines
15 KiB
Python
396 lines
15 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
# Copyright (C) 2014 AT&T Labs All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import contextlib
|
|
import logging
|
|
|
|
from kazoo import exceptions as k_exc
|
|
from kazoo.protocol import paths
|
|
|
|
from taskflow import exceptions as exc
|
|
from taskflow.openstack.common import jsonutils
|
|
from taskflow.persistence.backends import base
|
|
from taskflow.persistence import logbook
|
|
from taskflow.utils import kazoo_utils as k_utils
|
|
from taskflow.utils import misc
|
|
from taskflow.utils import persistence_utils as p_utils
|
|
|
|
LOG = logging.getLogger(__name__)
|
|
|
|
# Transaction support was added in 3.4.0
|
|
MIN_ZK_VERSION = (3, 4, 0)
|
|
|
|
|
|
class ZkBackend(base.Backend):
|
|
"""ZooKeeper as backend storage implementation
|
|
|
|
Example conf (use Kazoo):
|
|
|
|
conf = {
|
|
"hosts": "192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181",
|
|
"path": "/taskflow",
|
|
}
|
|
"""
|
|
def __init__(self, conf, client=None):
|
|
super(ZkBackend, self).__init__(conf)
|
|
path = str(conf.get("path", "/taskflow"))
|
|
if not path:
|
|
raise ValueError("Empty zookeeper path is disallowed")
|
|
if not paths.isabs(path):
|
|
raise ValueError("Zookeeper path must be absolute")
|
|
self._path = path
|
|
if client is not None:
|
|
self._client = client
|
|
self._owned = False
|
|
else:
|
|
self._client = k_utils.make_client(conf)
|
|
self._owned = True
|
|
self._validated = False
|
|
|
|
@property
|
|
def path(self):
|
|
return self._path
|
|
|
|
def get_connection(self):
|
|
conn = ZkConnection(self, self._client)
|
|
if not self._validated:
|
|
conn.validate()
|
|
self._validated = True
|
|
return conn
|
|
|
|
def close(self):
|
|
self._validated = False
|
|
if not self._owned:
|
|
return
|
|
try:
|
|
k_utils.finalize_client(self._client)
|
|
except (k_exc.KazooException, k_exc.ZookeeperError) as e:
|
|
raise exc.StorageError("Unable to finalize client", e)
|
|
|
|
|
|
class ZkConnection(base.Connection):
|
|
def __init__(self, backend, client):
|
|
self._backend = backend
|
|
self._client = client
|
|
self._book_path = paths.join(self._backend.path, "books")
|
|
self._flow_path = paths.join(self._backend.path, "flow_details")
|
|
self._task_path = paths.join(self._backend.path, "task_details")
|
|
with self._exc_wrapper():
|
|
# NOOP if already started.
|
|
self._client.start()
|
|
|
|
def validate(self):
|
|
with self._exc_wrapper():
|
|
k_utils.check_compatible(self._client, MIN_ZK_VERSION)
|
|
|
|
@property
|
|
def backend(self):
|
|
return self._backend
|
|
|
|
@property
|
|
def book_path(self):
|
|
return self._book_path
|
|
|
|
@property
|
|
def flow_path(self):
|
|
return self._flow_path
|
|
|
|
@property
|
|
def task_path(self):
|
|
return self._task_path
|
|
|
|
def close(self):
|
|
pass
|
|
|
|
def upgrade(self):
|
|
"""Creates the initial paths (if they already don't exist)."""
|
|
with self._exc_wrapper():
|
|
for path in (self.book_path, self.flow_path, self.task_path):
|
|
self._client.ensure_path(path)
|
|
|
|
@contextlib.contextmanager
|
|
def _exc_wrapper(self):
|
|
"""Exception wrapper which wraps kazoo exceptions and groups them
|
|
to taskflow exceptions.
|
|
"""
|
|
try:
|
|
yield
|
|
except self._client.handler.timeout_exception as e:
|
|
raise exc.ConnectionFailure("Storage backend timeout: %s" % e)
|
|
except k_exc.SessionExpiredError as e:
|
|
raise exc.ConnectionFailure("Storage backend session"
|
|
" has expired: %s" % e)
|
|
except k_exc.NoNodeError as e:
|
|
raise exc.NotFound("Storage backend node not found: %s" % e)
|
|
except k_exc.NodeExistsError as e:
|
|
raise exc.AlreadyExists("Storage backend duplicate node: %s" % e)
|
|
except (k_exc.KazooException, k_exc.ZookeeperError) as e:
|
|
raise exc.StorageError("Storage backend internal error: %s" % e)
|
|
|
|
def update_task_details(self, td):
|
|
"""Update a task_detail transactionally."""
|
|
with self._exc_wrapper():
|
|
with self._client.transaction() as txn:
|
|
return self._update_task_details(td, txn)
|
|
|
|
def _update_task_details(self, td, txn, create_missing=False):
|
|
# Determine whether the desired data exists or not.
|
|
td_path = paths.join(self.task_path, td.uuid)
|
|
try:
|
|
td_data, _zstat = self._client.get(td_path)
|
|
except k_exc.NoNodeError:
|
|
# Not-existent: create or raise exception.
|
|
if create_missing:
|
|
txn.create(td_path)
|
|
e_td = logbook.TaskDetail(name=td.name, uuid=td.uuid)
|
|
else:
|
|
raise exc.NotFound("No task details found with id: %s"
|
|
% td.uuid)
|
|
else:
|
|
# Existent: read it out.
|
|
e_td = p_utils.unformat_task_detail(td.uuid,
|
|
misc.decode_json(td_data))
|
|
|
|
# Update and write it back
|
|
e_td = p_utils.task_details_merge(e_td, td)
|
|
td_data = p_utils.format_task_detail(e_td)
|
|
txn.set_data(td_path, misc.binary_encode(jsonutils.dumps(td_data)))
|
|
return e_td
|
|
|
|
def get_task_details(self, td_uuid):
|
|
"""Read a taskdetail.
|
|
|
|
*Read-only*, so no need of zk transaction.
|
|
"""
|
|
with self._exc_wrapper():
|
|
return self._get_task_details(td_uuid)
|
|
|
|
def _get_task_details(self, td_uuid):
|
|
td_path = paths.join(self.task_path, td_uuid)
|
|
try:
|
|
td_data, _zstat = self._client.get(td_path)
|
|
except k_exc.NoNodeError:
|
|
raise exc.NotFound("No task details found with id: %s" % td_uuid)
|
|
else:
|
|
return p_utils.unformat_task_detail(td_uuid,
|
|
misc.decode_json(td_data))
|
|
|
|
def update_flow_details(self, fd):
|
|
"""Update a flowdetail transactionally."""
|
|
with self._exc_wrapper():
|
|
with self._client.transaction() as txn:
|
|
return self._update_flow_details(fd, txn)
|
|
|
|
def _update_flow_details(self, fd, txn, create_missing=False):
|
|
# Determine whether the desired data exists or not
|
|
fd_path = paths.join(self.flow_path, fd.uuid)
|
|
try:
|
|
fd_data, _zstat = self._client.get(fd_path)
|
|
except k_exc.NoNodeError:
|
|
# Not-existent: create or raise exception
|
|
if create_missing:
|
|
txn.create(fd_path)
|
|
e_fd = logbook.FlowDetail(name=fd.name, uuid=fd.uuid)
|
|
else:
|
|
raise exc.NotFound("No flow details found with id: %s"
|
|
% fd.uuid)
|
|
else:
|
|
# Existent: read it out
|
|
e_fd = p_utils.unformat_flow_detail(fd.uuid,
|
|
misc.decode_json(fd_data))
|
|
|
|
# Update and write it back
|
|
e_fd = p_utils.flow_details_merge(e_fd, fd)
|
|
fd_data = p_utils.format_flow_detail(e_fd)
|
|
txn.set_data(fd_path, misc.binary_encode(jsonutils.dumps(fd_data)))
|
|
for td in fd:
|
|
td_path = paths.join(fd_path, td.uuid)
|
|
# NOTE(harlowja): create an entry in the flow detail path
|
|
# for the provided task detail so that a reference exists
|
|
# from the flow detail to its task details.
|
|
if not self._client.exists(td_path):
|
|
txn.create(td_path)
|
|
e_fd.add(self._update_task_details(td, txn, create_missing=True))
|
|
return e_fd
|
|
|
|
def get_flow_details(self, fd_uuid):
|
|
"""Read a flowdetail.
|
|
|
|
*Read-only*, so no need of zk transaction.
|
|
"""
|
|
with self._exc_wrapper():
|
|
return self._get_flow_details(fd_uuid)
|
|
|
|
def _get_flow_details(self, fd_uuid):
|
|
fd_path = paths.join(self.flow_path, fd_uuid)
|
|
try:
|
|
fd_data, _zstat = self._client.get(fd_path)
|
|
except k_exc.NoNodeError:
|
|
raise exc.NotFound("No flow details found with id: %s" % fd_uuid)
|
|
|
|
fd = p_utils.unformat_flow_detail(fd_uuid, misc.decode_json(fd_data))
|
|
for td_uuid in self._client.get_children(fd_path):
|
|
fd.add(self._get_task_details(td_uuid))
|
|
return fd
|
|
|
|
def save_logbook(self, lb):
|
|
"""Save (update) a log_book transactionally."""
|
|
|
|
def _create_logbook(lb_path, txn):
|
|
lb_data = p_utils.format_logbook(lb, created_at=None)
|
|
txn.create(lb_path, misc.binary_encode(jsonutils.dumps(lb_data)))
|
|
for fd in lb:
|
|
# NOTE(harlowja): create an entry in the logbook path
|
|
# for the provided flow detail so that a reference exists
|
|
# from the logbook to its flow details.
|
|
txn.create(paths.join(lb_path, fd.uuid))
|
|
fd_path = paths.join(self.flow_path, fd.uuid)
|
|
fd_data = jsonutils.dumps(p_utils.format_flow_detail(fd))
|
|
txn.create(fd_path, misc.binary_encode(fd_data))
|
|
for td in fd:
|
|
# NOTE(harlowja): create an entry in the flow detail path
|
|
# for the provided task detail so that a reference exists
|
|
# from the flow detail to its task details.
|
|
txn.create(paths.join(fd_path, td.uuid))
|
|
td_path = paths.join(self.task_path, td.uuid)
|
|
td_data = jsonutils.dumps(p_utils.format_task_detail(td))
|
|
txn.create(td_path, misc.binary_encode(td_data))
|
|
return lb
|
|
|
|
def _update_logbook(lb_path, lb_data, txn):
|
|
e_lb = p_utils.unformat_logbook(lb.uuid, misc.decode_json(lb_data))
|
|
e_lb = p_utils.logbook_merge(e_lb, lb)
|
|
lb_data = p_utils.format_logbook(e_lb, created_at=lb.created_at)
|
|
txn.set_data(lb_path, misc.binary_encode(jsonutils.dumps(lb_data)))
|
|
for fd in lb:
|
|
fd_path = paths.join(lb_path, fd.uuid)
|
|
if not self._client.exists(fd_path):
|
|
# NOTE(harlowja): create an entry in the logbook path
|
|
# for the provided flow detail so that a reference exists
|
|
# from the logbook to its flow details.
|
|
txn.create(fd_path)
|
|
e_fd = self._update_flow_details(fd, txn, create_missing=True)
|
|
e_lb.add(e_fd)
|
|
return e_lb
|
|
|
|
with self._exc_wrapper():
|
|
with self._client.transaction() as txn:
|
|
# Determine whether the desired data exists or not.
|
|
lb_path = paths.join(self.book_path, lb.uuid)
|
|
try:
|
|
lb_data, _zstat = self._client.get(lb_path)
|
|
except k_exc.NoNodeError:
|
|
# Create a new logbook since it doesn't exist.
|
|
e_lb = _create_logbook(lb_path, txn)
|
|
else:
|
|
# Otherwise update the existing logbook instead.
|
|
e_lb = _update_logbook(lb_path, lb_data, txn)
|
|
# Finally return (updated) logbook.
|
|
return e_lb
|
|
|
|
def _get_logbook(self, lb_uuid):
|
|
lb_path = paths.join(self.book_path, lb_uuid)
|
|
try:
|
|
lb_data, _zstat = self._client.get(lb_path)
|
|
except k_exc.NoNodeError:
|
|
raise exc.NotFound("No logbook found with id: %s" % lb_uuid)
|
|
else:
|
|
lb = p_utils.unformat_logbook(lb_uuid,
|
|
misc.decode_json(lb_data))
|
|
for fd_uuid in self._client.get_children(lb_path):
|
|
lb.add(self._get_flow_details(fd_uuid))
|
|
return lb
|
|
|
|
def get_logbook(self, lb_uuid):
|
|
"""Read a logbook.
|
|
|
|
*Read-only*, so no need of zk transaction.
|
|
"""
|
|
with self._exc_wrapper():
|
|
return self._get_logbook(lb_uuid)
|
|
|
|
def get_logbooks(self):
|
|
"""Read all logbooks.
|
|
|
|
*Read-only*, so no need of zk transaction.
|
|
"""
|
|
with self._exc_wrapper():
|
|
for lb_uuid in self._client.get_children(self.book_path):
|
|
yield self._get_logbook(lb_uuid)
|
|
|
|
def destroy_logbook(self, lb_uuid):
|
|
"""Destroy (delete) a log_book transactionally."""
|
|
|
|
def _destroy_task_details(td_uuid, txn):
|
|
td_path = paths.join(self.task_path, td_uuid)
|
|
if not self._client.exists(td_path):
|
|
raise exc.NotFound("No task details found with id: %s"
|
|
% td_uuid)
|
|
txn.delete(td_path)
|
|
|
|
def _destroy_flow_details(fd_uuid, txn):
|
|
fd_path = paths.join(self.flow_path, fd_uuid)
|
|
if not self._client.exists(fd_path):
|
|
raise exc.NotFound("No flow details found with id: %s"
|
|
% fd_uuid)
|
|
for td_uuid in self._client.get_children(fd_path):
|
|
_destroy_task_details(td_uuid, txn)
|
|
txn.delete(paths.join(fd_path, td_uuid))
|
|
txn.delete(fd_path)
|
|
|
|
def _destroy_logbook(lb_uuid, txn):
|
|
lb_path = paths.join(self.book_path, lb_uuid)
|
|
if not self._client.exists(lb_path):
|
|
raise exc.NotFound("No logbook found with id: %s" % lb_uuid)
|
|
for fd_uuid in self._client.get_children(lb_path):
|
|
_destroy_flow_details(fd_uuid, txn)
|
|
txn.delete(paths.join(lb_path, fd_uuid))
|
|
txn.delete(lb_path)
|
|
|
|
with self._exc_wrapper():
|
|
with self._client.transaction() as txn:
|
|
_destroy_logbook(lb_uuid, txn)
|
|
|
|
def clear_all(self, delete_dirs=True):
|
|
"""Delete all data transactioanlly."""
|
|
with self._exc_wrapper():
|
|
with self._client.transaction() as txn:
|
|
|
|
# Delete all data under logbook path.
|
|
for lb_uuid in self._client.get_children(self.book_path):
|
|
lb_path = paths.join(self.book_path, lb_uuid)
|
|
for fd_uuid in self._client.get_children(lb_path):
|
|
txn.delete(paths.join(lb_path, fd_uuid))
|
|
txn.delete(lb_path)
|
|
|
|
# Delete all data under flowdetail path.
|
|
for fd_uuid in self._client.get_children(self.flow_path):
|
|
fd_path = paths.join(self.flow_path, fd_uuid)
|
|
for td_uuid in self._client.get_children(fd_path):
|
|
txn.delete(paths.join(fd_path, td_uuid))
|
|
txn.delete(fd_path)
|
|
|
|
# Delete all data under taskdetail path.
|
|
for td_uuid in self._client.get_children(self.task_path):
|
|
td_path = paths.join(self.task_path, td_uuid)
|
|
txn.delete(td_path)
|
|
|
|
# Delete containing directories.
|
|
if delete_dirs:
|
|
txn.delete(self.book_path)
|
|
txn.delete(self.task_path)
|
|
txn.delete(self.flow_path)
|