Merge "Some zookeeper persistence improvements/adjustments"
This commit is contained in:
@@ -19,20 +19,22 @@
|
||||
import contextlib
|
||||
import logging
|
||||
|
||||
from kazoo import client as kazoo_client
|
||||
from kazoo import exceptions as k_exc
|
||||
from kazoo.protocol import paths
|
||||
from zake import fake_client
|
||||
|
||||
from taskflow import exceptions as exc
|
||||
from taskflow.openstack.common import jsonutils
|
||||
from taskflow.persistence.backends import base
|
||||
from taskflow.persistence import logbook
|
||||
from taskflow.utils import kazoo_utils as k_utils
|
||||
from taskflow.utils import misc
|
||||
from taskflow.utils import persistence_utils as p_utils
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
# Transaction support was added in 3.4.0
|
||||
MIN_ZK_VERSION = (3, 4, 0)
|
||||
|
||||
|
||||
class ZkBackend(base.Backend):
|
||||
"""ZooKeeper as backend storage implementation
|
||||
@@ -43,63 +45,79 @@ class ZkBackend(base.Backend):
|
||||
"hosts": "192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181",
|
||||
"path": "/taskflow",
|
||||
}
|
||||
|
||||
Example conf (use Zake):
|
||||
|
||||
conf = {
|
||||
"path": "/taskflow",
|
||||
}
|
||||
"""
|
||||
def __init__(self, conf):
|
||||
def __init__(self, conf, client=None):
|
||||
super(ZkBackend, self).__init__(conf)
|
||||
path = conf.get("path", "/taskflow")
|
||||
path = str(conf.get("path", "/taskflow"))
|
||||
if not path:
|
||||
raise ValueError("Empty zookeeper path is disallowed")
|
||||
if not paths.isabs(path):
|
||||
raise ValueError("Zookeeper path must be absolute")
|
||||
self._path = path
|
||||
hosts = conf.get("hosts", None)
|
||||
# if no specified hosts, use zake to fake
|
||||
if hosts is None:
|
||||
self._zk = fake_client.FakeClient()
|
||||
# otherwise use Kazoo
|
||||
if client is not None:
|
||||
self._zk = client
|
||||
self._owned = False
|
||||
else:
|
||||
self._zk = kazoo_client.KazooClient(hosts=hosts)
|
||||
|
||||
@property
|
||||
def zk(self):
|
||||
return self._zk
|
||||
self._zk = k_utils.make_client(conf)
|
||||
self._owned = True
|
||||
self._validated = False
|
||||
|
||||
@property
|
||||
def path(self):
|
||||
return self._path
|
||||
|
||||
def get_connection(self):
|
||||
return ZkConnection(self)
|
||||
conn = ZkConnection(self, self._zk)
|
||||
if not self._validated:
|
||||
conn.validate()
|
||||
self._validated = True
|
||||
return conn
|
||||
|
||||
def close(self):
|
||||
self.zk.stop()
|
||||
self.zk.close()
|
||||
self._validated = False
|
||||
if not self._owned:
|
||||
return
|
||||
try:
|
||||
self._zk.stop()
|
||||
except (k_exc.KazooException, k_exc.ZookeeperError) as e:
|
||||
raise exc.StorageError("Unable to stop client: %s" % e)
|
||||
try:
|
||||
self._zk.close()
|
||||
except TypeError:
|
||||
# NOTE(harlowja): https://github.com/python-zk/kazoo/issues/167
|
||||
pass
|
||||
except (k_exc.KazooException, k_exc.ZookeeperError) as e:
|
||||
raise exc.StorageError("Unable to close client: %s" % e)
|
||||
|
||||
|
||||
class ZkConnection(base.Connection):
|
||||
def __init__(self, backend):
|
||||
def __init__(self, backend, client):
|
||||
self._backend = backend
|
||||
|
||||
self._log_path = paths.join(self._backend.path, "log_books")
|
||||
self._client = client
|
||||
self._book_path = paths.join(self._backend.path, "books")
|
||||
self._flow_path = paths.join(self._backend.path, "flow_details")
|
||||
self._task_path = paths.join(self._backend.path, "task_details")
|
||||
|
||||
with self._exc_wrapper():
|
||||
self._backend.zk.start()
|
||||
self._backend.zk.ensure_path(self._backend.path)
|
||||
self._backend.zk.ensure_path(self.log_path)
|
||||
self._backend.zk.ensure_path(self.flow_path)
|
||||
self._backend.zk.ensure_path(self.task_path)
|
||||
# NOOP if already started.
|
||||
self._client.start()
|
||||
|
||||
def validate(self):
|
||||
with self._exc_wrapper():
|
||||
zk_ver = self._client.server_version()
|
||||
if tuple(zk_ver) < MIN_ZK_VERSION:
|
||||
given_zk_ver = ".".join([str(a) for a in zk_ver])
|
||||
desired_zk_ver = ".".join([str(a) for a in MIN_ZK_VERSION])
|
||||
raise exc.StorageError("Incompatible zookeeper version"
|
||||
" %s detected, zookeeper >= %s required"
|
||||
% (given_zk_ver, desired_zk_ver))
|
||||
|
||||
@property
|
||||
def backend(self):
|
||||
return self._backend
|
||||
|
||||
@property
|
||||
def log_path(self):
|
||||
return self._log_path
|
||||
def book_path(self):
|
||||
return self._book_path
|
||||
|
||||
@property
|
||||
def flow_path(self):
|
||||
@@ -113,7 +131,10 @@ class ZkConnection(base.Connection):
|
||||
pass
|
||||
|
||||
def upgrade(self):
|
||||
pass
|
||||
"""Creates the initial paths (if they already don't exist)."""
|
||||
with self._exc_wrapper():
|
||||
for path in (self.book_path, self.flow_path, self.task_path):
|
||||
self._client.ensure_path(path)
|
||||
|
||||
@contextlib.contextmanager
|
||||
def _exc_wrapper(self):
|
||||
@@ -122,264 +143,262 @@ class ZkConnection(base.Connection):
|
||||
"""
|
||||
try:
|
||||
yield
|
||||
except self._backend.zk.handler.timeout_exception as e:
|
||||
raise exc.ConnectionFailure("Backend error: %s" % e)
|
||||
except self._client.handler.timeout_exception as e:
|
||||
raise exc.ConnectionFailure("Storage backend timeout: %s" % e)
|
||||
except k_exc.SessionExpiredError as e:
|
||||
raise exc.ConnectionFailure("Backend error: %s" % e)
|
||||
raise exc.ConnectionFailure("Storage backend session"
|
||||
" has expired: %s" % e)
|
||||
except k_exc.NoNodeError as e:
|
||||
raise exc.NotFound("Backend error: %s" % e)
|
||||
raise exc.NotFound("Storage backend node not found: %s" % e)
|
||||
except k_exc.NodeExistsError as e:
|
||||
raise exc.AlreadyExists("Backend error: %s" % e)
|
||||
raise exc.AlreadyExists("Storage backend duplicate node: %s" % e)
|
||||
except (k_exc.KazooException, k_exc.ZookeeperError) as e:
|
||||
raise exc.TaskFlowException("Backend error: %s" % e)
|
||||
raise exc.StorageError("Storage backend internal error: %s" % e)
|
||||
|
||||
def update_task_details(self, td):
|
||||
"""Update a task_detail transactionally.
|
||||
"""
|
||||
"""Update a task_detail transactionally."""
|
||||
with self._exc_wrapper():
|
||||
with self.backend.zk.transaction() as txn:
|
||||
with self._client.transaction() as txn:
|
||||
return self._update_task_details(td, txn)
|
||||
|
||||
def _update_task_details(self, td, txn, create_missing=False):
|
||||
zk_path = paths.join(self.task_path, td.uuid)
|
||||
|
||||
# determine whether the desired data exists or not
|
||||
# Determine whether the desired data exists or not.
|
||||
td_path = paths.join(self.task_path, td.uuid)
|
||||
try:
|
||||
zk_data, _zstat = self.backend.zk.get(zk_path)
|
||||
td_data, _zstat = self._client.get(td_path)
|
||||
except k_exc.NoNodeError:
|
||||
# Not-existent: create or raise exception
|
||||
# Not-existent: create or raise exception.
|
||||
if create_missing:
|
||||
txn.create(zk_path)
|
||||
txn.create(td_path)
|
||||
e_td = logbook.TaskDetail(name=td.name, uuid=td.uuid)
|
||||
else:
|
||||
raise exc.NotFound("No task details found with id: %s"
|
||||
% td.uuid)
|
||||
# Existent: read it out
|
||||
else:
|
||||
# Existent: read it out.
|
||||
e_td = p_utils.unformat_task_detail(td.uuid,
|
||||
misc.decode_json(zk_data))
|
||||
misc.decode_json(td_data))
|
||||
|
||||
# Update and write it back
|
||||
e_td = p_utils.task_details_merge(e_td, td)
|
||||
zk_data = misc.binary_encode(jsonutils.dumps(
|
||||
p_utils.format_task_detail(e_td)))
|
||||
txn.set_data(zk_path, zk_data)
|
||||
td_data = p_utils.format_task_detail(e_td)
|
||||
txn.set_data(td_path, misc.binary_encode(jsonutils.dumps(td_data)))
|
||||
return e_td
|
||||
|
||||
def get_task_details(self, td_uuid):
|
||||
"""Read a task_detail. *Read-only*, so no need of zk transaction.
|
||||
"""Read a taskdetail.
|
||||
|
||||
*Read-only*, so no need of zk transaction.
|
||||
"""
|
||||
with self._exc_wrapper():
|
||||
return self._get_task_details(td_uuid)
|
||||
|
||||
def _get_task_details(self, td_uuid):
|
||||
zk_path = paths.join(self.task_path, td_uuid)
|
||||
|
||||
td_path = paths.join(self.task_path, td_uuid)
|
||||
try:
|
||||
zk_data, _zstat = self.backend.zk.get(zk_path)
|
||||
td_data, _zstat = self._client.get(td_path)
|
||||
except k_exc.NoNodeError:
|
||||
raise exc.NotFound("No task details found with id: %s" % td_uuid)
|
||||
|
||||
td = p_utils.unformat_task_detail(td_uuid, misc.decode_json(zk_data))
|
||||
return td
|
||||
else:
|
||||
return p_utils.unformat_task_detail(td_uuid,
|
||||
misc.decode_json(td_data))
|
||||
|
||||
def update_flow_details(self, fd):
|
||||
"""Update a flow_detail transactionally.
|
||||
"""
|
||||
"""Update a flowdetail transactionally."""
|
||||
with self._exc_wrapper():
|
||||
with self.backend.zk.transaction() as txn:
|
||||
with self._client.transaction() as txn:
|
||||
return self._update_flow_details(fd, txn)
|
||||
|
||||
def _update_flow_details(self, fd, txn, create_missing=False):
|
||||
zk_path = paths.join(self.flow_path, fd.uuid)
|
||||
|
||||
# determine whether the desired data exists or not
|
||||
# Determine whether the desired data exists or not
|
||||
fd_path = paths.join(self.flow_path, fd.uuid)
|
||||
try:
|
||||
zk_data, _zstat = self.backend.zk.get(zk_path)
|
||||
fd_data, _zstat = self._client.get(fd_path)
|
||||
except k_exc.NoNodeError:
|
||||
# Not-existent: create or raise exception
|
||||
# Not-existent: create or raise exception
|
||||
if create_missing:
|
||||
txn.create(zk_path)
|
||||
txn.create(fd_path)
|
||||
e_fd = logbook.FlowDetail(name=fd.name, uuid=fd.uuid)
|
||||
else:
|
||||
raise exc.NotFound("No flow details found with id: %s"
|
||||
% fd.uuid)
|
||||
# Existent: read it out
|
||||
else:
|
||||
# Existent: read it out
|
||||
e_fd = p_utils.unformat_flow_detail(fd.uuid,
|
||||
misc.decode_json(zk_data))
|
||||
misc.decode_json(fd_data))
|
||||
|
||||
# Update and write it back
|
||||
e_fd = p_utils.flow_details_merge(e_fd, fd)
|
||||
zk_data = misc.binary_encode(jsonutils.dumps(
|
||||
p_utils.format_flow_detail(e_fd)))
|
||||
txn.set_data(zk_path, zk_data)
|
||||
fd_data = p_utils.format_flow_detail(e_fd)
|
||||
txn.set_data(fd_path, misc.binary_encode(jsonutils.dumps(fd_data)))
|
||||
for td in fd:
|
||||
zk_path = paths.join(self.flow_path, fd.uuid, td.uuid)
|
||||
if not self.backend.zk.exists(zk_path):
|
||||
txn.create(zk_path)
|
||||
e_td = self._update_task_details(td, txn, create_missing=True)
|
||||
e_fd.add(e_td)
|
||||
td_path = paths.join(fd_path, td.uuid)
|
||||
# NOTE(harlowja): create an entry in the flow detail path
|
||||
# for the provided task detail so that a reference exists
|
||||
# from the flow detail to its task details.
|
||||
if not self._client.exists(td_path):
|
||||
txn.create(td_path)
|
||||
e_fd.add(self._update_task_details(td, txn, create_missing=True))
|
||||
return e_fd
|
||||
|
||||
def get_flow_details(self, fd_uuid):
|
||||
"""Read a flow_detail. *Read-only*, so no need of zk transaction.
|
||||
"""Read a flowdetail.
|
||||
|
||||
*Read-only*, so no need of zk transaction.
|
||||
"""
|
||||
with self._exc_wrapper():
|
||||
return self._get_flow_details(fd_uuid)
|
||||
|
||||
def _get_flow_details(self, fd_uuid):
|
||||
zk_path = paths.join(self.flow_path, fd_uuid)
|
||||
|
||||
fd_path = paths.join(self.flow_path, fd_uuid)
|
||||
try:
|
||||
zk_data, _zstat = self.backend.zk.get(zk_path)
|
||||
fd_data, _zstat = self._client.get(fd_path)
|
||||
except k_exc.NoNodeError:
|
||||
raise exc.NotFound("No flow details found with id: %s" % fd_uuid)
|
||||
|
||||
fd = p_utils.unformat_flow_detail(fd_uuid, misc.decode_json(zk_data))
|
||||
for td_uuid in self.backend.zk.get_children(zk_path):
|
||||
td = self._get_task_details(td_uuid)
|
||||
fd.add(td)
|
||||
fd = p_utils.unformat_flow_detail(fd_uuid, misc.decode_json(fd_data))
|
||||
for td_uuid in self._client.get_children(fd_path):
|
||||
fd.add(self._get_task_details(td_uuid))
|
||||
return fd
|
||||
|
||||
def save_logbook(self, lb):
|
||||
"""Save (update) a log_book transactionally.
|
||||
"""
|
||||
"""Save (update) a log_book transactionally."""
|
||||
|
||||
def create_logbook(lb_path, txn):
|
||||
lb_data = p_utils.format_logbook(lb, created_at=None)
|
||||
txn.create(lb_path, misc.binary_encode(jsonutils.dumps(lb_data)))
|
||||
for fd in lb:
|
||||
# NOTE(harlowja): create an entry in the logbook path
|
||||
# for the provided flow detail so that a reference exists
|
||||
# from the logbook to its flow details.
|
||||
txn.create(paths.join(lb_path, fd.uuid))
|
||||
fd_path = paths.join(self.flow_path, fd.uuid)
|
||||
fd_data = jsonutils.dumps(p_utils.format_flow_detail(fd))
|
||||
txn.create(fd_path, misc.binary_encode(fd_data))
|
||||
for td in fd:
|
||||
# NOTE(harlowja): create an entry in the flow detail path
|
||||
# for the provided task detail so that a reference exists
|
||||
# from the flow detail to its task details.
|
||||
txn.create(paths.join(fd_path, td.uuid))
|
||||
td_path = paths.join(self.task_path, td.uuid)
|
||||
td_data = jsonutils.dumps(p_utils.format_task_detail(td))
|
||||
txn.create(td_path, misc.binary_encode(td_data))
|
||||
return lb
|
||||
|
||||
def update_logbook(lb_path, lb_data, txn):
|
||||
e_lb = p_utils.unformat_logbook(lb.uuid, misc.decode_json(lb_data))
|
||||
e_lb = p_utils.logbook_merge(e_lb, lb)
|
||||
lb_data = p_utils.format_logbook(e_lb, created_at=lb.created_at)
|
||||
txn.set_data(lb_path, misc.binary_encode(jsonutils.dumps(lb_data)))
|
||||
for fd in lb:
|
||||
fd_path = paths.join(lb_path, fd.uuid)
|
||||
if not self._client.exists(fd_path):
|
||||
# NOTE(harlowja): create an entry in the logbook path
|
||||
# for the provided flow detail so that a reference exists
|
||||
# from the logbook to its flow details.
|
||||
txn.create(fd_path)
|
||||
e_fd = self._update_flow_details(fd, txn, create_missing=True)
|
||||
e_lb.add(e_fd)
|
||||
return e_lb
|
||||
|
||||
with self._exc_wrapper():
|
||||
with self.backend.zk.transaction() as txn:
|
||||
zk_path = paths.join(self.log_path, lb.uuid)
|
||||
|
||||
# determine whether the desired data exists or not
|
||||
with self._client.transaction() as txn:
|
||||
# Determine whether the desired data exists or not.
|
||||
lb_path = paths.join(self.book_path, lb.uuid)
|
||||
try:
|
||||
zk_data, _zstat = self.backend.zk.get(zk_path)
|
||||
lb_data, _zstat = self._client.get(lb_path)
|
||||
except k_exc.NoNodeError:
|
||||
# Create if a new log_book
|
||||
e_lb = lb
|
||||
zk_data = misc.binary_encode(jsonutils.dumps(
|
||||
p_utils.format_logbook(lb, created_at=None)))
|
||||
txn.create(zk_path, zk_data)
|
||||
for fd in lb:
|
||||
zk_path = paths.join(self.log_path, lb.uuid, fd.uuid)
|
||||
txn.create(zk_path)
|
||||
zk_path = paths.join(self.flow_path, fd.uuid)
|
||||
zk_data = misc.binary_encode(jsonutils.dumps(
|
||||
p_utils.format_flow_detail(fd)))
|
||||
txn.create(zk_path, zk_data)
|
||||
for td in fd:
|
||||
zk_path = paths.join(
|
||||
self.flow_path, fd.uuid, td.uuid)
|
||||
txn.create(zk_path)
|
||||
zk_path = paths.join(self.task_path, td.uuid)
|
||||
zk_data = misc.binary_encode(jsonutils.dumps(
|
||||
p_utils.format_task_detail(td)))
|
||||
txn.create(zk_path, zk_data)
|
||||
|
||||
# Otherwise update the existing log_book
|
||||
# Create a new logbook since it doesn't exist.
|
||||
e_lb = create_logbook(lb_path, txn)
|
||||
else:
|
||||
e_lb = p_utils.unformat_logbook(lb.uuid,
|
||||
misc.decode_json(zk_data))
|
||||
e_lb = p_utils.logbook_merge(e_lb, lb)
|
||||
zk_data = misc.binary_encode((jsonutils.dumps(
|
||||
p_utils.format_logbook(e_lb,
|
||||
created_at=lb.created_at))))
|
||||
txn.set_data(zk_path, zk_data)
|
||||
for fd in lb:
|
||||
zk_path = paths.join(self.log_path, lb.uuid, fd.uuid)
|
||||
if not self.backend.zk.exists(zk_path):
|
||||
txn.create(zk_path)
|
||||
e_fd = self._update_flow_details(fd, txn,
|
||||
create_missing=True)
|
||||
e_lb.add(e_fd)
|
||||
# finally return (updated) log_book
|
||||
# Otherwise update the existing logbook instead.
|
||||
e_lb = update_logbook(lb_path, lb_data, txn)
|
||||
# Finally return (updated) logbook.
|
||||
return e_lb
|
||||
|
||||
def get_logbook(self, lb_uuid):
|
||||
"""Read a log_book. *Read-only*, so no need of zk transaction.
|
||||
"""Read a logbook.
|
||||
|
||||
*Read-only*, so no need of zk transaction.
|
||||
"""
|
||||
with self._exc_wrapper():
|
||||
zk_path = paths.join(self.log_path, lb_uuid)
|
||||
|
||||
lb_path = paths.join(self.book_path, lb_uuid)
|
||||
try:
|
||||
zk_data, _zstat = self.backend.zk.get(zk_path)
|
||||
lb_data, _zstat = self._client.get(lb_path)
|
||||
except k_exc.NoNodeError:
|
||||
raise exc.NotFound("No logbook found with id: %s" % lb_uuid)
|
||||
|
||||
lb = p_utils.unformat_logbook(lb_uuid, misc.decode_json(zk_data))
|
||||
for fd_uuid in self.backend.zk.get_children(zk_path):
|
||||
fd = self._get_flow_details(fd_uuid)
|
||||
lb.add(fd)
|
||||
return lb
|
||||
else:
|
||||
lb = p_utils.unformat_logbook(lb_uuid,
|
||||
misc.decode_json(lb_data))
|
||||
for fd_uuid in self._client.get_children(lb_path):
|
||||
lb.add(self._get_flow_details(fd_uuid))
|
||||
return lb
|
||||
|
||||
def get_logbooks(self):
|
||||
"""Read all logbooks. *Read-only*, so no need of zk transaction
|
||||
"""
|
||||
"""Read all logbooks. *Read-only*, so no need of zk transaction."""
|
||||
with self._exc_wrapper():
|
||||
for lb_uuid in self.backend.zk.get_children(self.log_path):
|
||||
for lb_uuid in self._client.get_children(self.book_path):
|
||||
yield self.get_logbook(lb_uuid)
|
||||
|
||||
def destroy_logbook(self, lb_uuid):
|
||||
"""Detroy (delete) a log_book transactionally.
|
||||
"""
|
||||
"""Detroy (delete) a log_book transactionally."""
|
||||
|
||||
def _destroy_task_details(td_uuid, txn):
|
||||
zk_path = paths.join(self.task_path, td_uuid)
|
||||
if not self.backend.zk.exists(zk_path):
|
||||
td_path = paths.join(self.task_path, td_uuid)
|
||||
if not self._client.exists(td_path):
|
||||
raise exc.NotFound("No task details found with id: %s"
|
||||
% td_uuid)
|
||||
|
||||
txn.delete(zk_path)
|
||||
txn.delete(td_path)
|
||||
|
||||
def _destroy_flow_details(fd_uuid, txn):
|
||||
zk_path = paths.join(self.flow_path, fd_uuid)
|
||||
if not self.backend.zk.exists(zk_path):
|
||||
fd_path = paths.join(self.flow_path, fd_uuid)
|
||||
if not self._client.exists(fd_path):
|
||||
raise exc.NotFound("No flow details found with id: %s"
|
||||
% fd_uuid)
|
||||
|
||||
for td_uuid in self.backend.zk.get_children(zk_path):
|
||||
for td_uuid in self._client.get_children(fd_path):
|
||||
_destroy_task_details(td_uuid, txn)
|
||||
txn.delete(paths.join(zk_path, td_uuid))
|
||||
txn.delete(zk_path)
|
||||
txn.delete(paths.join(fd_path, td_uuid))
|
||||
txn.delete(fd_path)
|
||||
|
||||
def _destroy_logbook(lb_uuid, txn):
|
||||
zk_path = paths.join(self.log_path, lb_uuid)
|
||||
if not self.backend.zk.exists(zk_path):
|
||||
lb_path = paths.join(self.book_path, lb_uuid)
|
||||
if not self._client.exists(lb_path):
|
||||
raise exc.NotFound("No logbook found with id: %s" % lb_uuid)
|
||||
|
||||
for fd_uuid in self.backend.zk.get_children(zk_path):
|
||||
for fd_uuid in self._client.get_children(lb_path):
|
||||
_destroy_flow_details(fd_uuid, txn)
|
||||
txn.delete(paths.join(zk_path, fd_uuid))
|
||||
txn.delete(zk_path)
|
||||
txn.delete(paths.join(lb_path, fd_uuid))
|
||||
txn.delete(lb_path)
|
||||
|
||||
with self._exc_wrapper():
|
||||
with self.backend.zk.transaction() as txn:
|
||||
with self._client.transaction() as txn:
|
||||
_destroy_logbook(lb_uuid, txn)
|
||||
|
||||
def clear_all(self):
|
||||
"""Delete all data transactioanlly.
|
||||
"""
|
||||
def clear_all(self, delete_dirs=True):
|
||||
"""Delete all data transactioanlly."""
|
||||
with self._exc_wrapper():
|
||||
with self.backend.zk.transaction() as txn:
|
||||
# delete all data under log_book path
|
||||
for lb_uuid in self.backend.zk.get_children(self.log_path):
|
||||
zk_path = paths.join(self.log_path, lb_uuid)
|
||||
for fd_uuid in self.backend.zk.get_children(zk_path):
|
||||
txn.delete(paths.join(zk_path, fd_uuid))
|
||||
txn.delete(zk_path)
|
||||
txn.delete(self.log_path)
|
||||
with self._client.transaction() as txn:
|
||||
|
||||
# delete all data under flow_detail path
|
||||
for fd_uuid in self.backend.zk.get_children(self.flow_path):
|
||||
zk_path = paths.join(self.flow_path, fd_uuid)
|
||||
for td_uuid in self.backend.zk.get_children(zk_path):
|
||||
txn.delete(paths.join(zk_path, td_uuid))
|
||||
txn.delete(zk_path)
|
||||
txn.delete(self.flow_path)
|
||||
# Delete all data under logbook path.
|
||||
for lb_uuid in self._client.get_children(self.book_path):
|
||||
lb_path = paths.join(self.book_path, lb_uuid)
|
||||
for fd_uuid in self._client.get_children(lb_path):
|
||||
txn.delete(paths.join(lb_path, fd_uuid))
|
||||
txn.delete(lb_path)
|
||||
|
||||
# delete all data under task_detail path
|
||||
for td_uuid in self.backend.zk.get_children(self.task_path):
|
||||
zk_path = paths.join(self.task_path, td_uuid)
|
||||
txn.delete(zk_path)
|
||||
txn.delete(self.task_path)
|
||||
# Delete all data under flowdetail path.
|
||||
for fd_uuid in self._client.get_children(self.flow_path):
|
||||
fd_path = paths.join(self.flow_path, fd_uuid)
|
||||
for td_uuid in self._client.get_children(fd_path):
|
||||
txn.delete(paths.join(fd_path, td_uuid))
|
||||
txn.delete(fd_path)
|
||||
|
||||
# delete top-level path
|
||||
txn.delete(self.backend.path)
|
||||
# Delete all data under taskdetail path.
|
||||
for td_uuid in self._client.get_children(self.task_path):
|
||||
td_path = paths.join(self.task_path, td_uuid)
|
||||
txn.delete(td_path)
|
||||
|
||||
# Delete containing directories.
|
||||
if delete_dirs:
|
||||
txn.delete(self.book_path)
|
||||
txn.delete(self.task_path)
|
||||
txn.delete(self.flow_path)
|
||||
|
||||
@@ -18,30 +18,29 @@
|
||||
|
||||
import contextlib
|
||||
|
||||
from zake import fake_client
|
||||
|
||||
from taskflow.persistence import backends
|
||||
from taskflow.persistence.backends import impl_zookeeper
|
||||
from taskflow import test
|
||||
from taskflow.tests.unit.persistence import base
|
||||
|
||||
|
||||
class ZkPersistenceTest(test.TestCase, base.PersistenceTestMixin):
|
||||
class ZakePersistenceTest(test.TestCase, base.PersistenceTestMixin):
|
||||
def _get_connection(self):
|
||||
return self._backend.get_connection()
|
||||
|
||||
def setUp(self):
|
||||
super(ZkPersistenceTest, self).setUp()
|
||||
super(ZakePersistenceTest, self).setUp()
|
||||
conf = {
|
||||
"path": "/taskflow",
|
||||
}
|
||||
self._backend = impl_zookeeper.ZkBackend(conf)
|
||||
client = fake_client.FakeClient()
|
||||
client.start()
|
||||
self._backend = impl_zookeeper.ZkBackend(conf, client=client)
|
||||
conn = self._backend.get_connection()
|
||||
conn.upgrade()
|
||||
|
||||
def tearDown(self):
|
||||
super(ZkPersistenceTest, self).tearDown()
|
||||
conn = self._get_connection()
|
||||
conn.clear_all()
|
||||
|
||||
def test_zk_persistence_entry_point(self):
|
||||
conf = {'connection': 'zookeeper:'}
|
||||
with contextlib.closing(backends.fetch(conf)) as be:
|
||||
|
||||
54
taskflow/utils/kazoo_utils.py
Normal file
54
taskflow/utils/kazoo_utils.py
Normal file
@@ -0,0 +1,54 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from kazoo import client
|
||||
import six
|
||||
|
||||
|
||||
def _parse_hosts(hosts):
|
||||
if isinstance(hosts, six.string_types):
|
||||
return hosts.strip()
|
||||
if isinstance(hosts, (dict)):
|
||||
host_ports = []
|
||||
for (k, v) in six.iteritems(hosts):
|
||||
host_ports.append("%s:%s" % (k, v))
|
||||
hosts = host_ports
|
||||
if isinstance(hosts, (list, set, tuple)):
|
||||
return ",".join([str(h) for h in hosts])
|
||||
return hosts
|
||||
|
||||
|
||||
def make_client(conf):
|
||||
"""Creates a kazoo client given a configuration dictionary."""
|
||||
client_kwargs = {
|
||||
'read_only': bool(conf.get('read_only')),
|
||||
'randomize_hosts': bool(conf.get('randomize_hosts')),
|
||||
}
|
||||
hosts = _parse_hosts(conf.get("hosts", "localhost:2181"))
|
||||
if not hosts or not isinstance(hosts, six.string_types):
|
||||
raise TypeError("Invalid hosts format, expected "
|
||||
"non-empty string/list, not %s" % type(hosts))
|
||||
client_kwargs['hosts'] = hosts
|
||||
if 'timeout' in conf:
|
||||
client_kwargs['timeout'] = float(conf['timeout'])
|
||||
# Kazoo supports various handlers, gevent, threading, eventlet...
|
||||
# allow the user of this client object to optionally specify one to be
|
||||
# used.
|
||||
if 'handler' in conf:
|
||||
client_kwargs['handler'] = conf['handler']
|
||||
return client.KazooClient(**client_kwargs)
|
||||
@@ -6,4 +6,4 @@ testrepository>=0.0.17
|
||||
testtools>=0.9.32,<0.9.35
|
||||
# ZooKeeper
|
||||
kazoo>=1.3.1
|
||||
zake>=0.0.7
|
||||
zake>=0.0.13
|
||||
|
||||
Reference in New Issue
Block a user