In the zookeeper jobboard (and persistence backends) we are associating the cause of a new exception when raising a exception. Using the new exception helper we can make this work better on py2.x and py3.x so that the py3.x version has the chain setup correctly (while the py2.x version just uses the class 'cause' attribute instead). Change-Id: Ieeac2f70e1834d4612556565762ffd3be3e5b5a1
157 lines
5.4 KiB
Python
157 lines
5.4 KiB
Python
# -*- coding: utf-8 -*-
|
|
|
|
# Copyright (C) 2014 AT&T Labs All Rights Reserved.
|
|
# Copyright (C) 2015 Rackspace Hosting All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import contextlib
|
|
|
|
from kazoo import exceptions as k_exc
|
|
from kazoo.protocol import paths
|
|
from oslo_serialization import jsonutils
|
|
|
|
from taskflow import exceptions as exc
|
|
from taskflow.persistence import path_based
|
|
from taskflow.utils import kazoo_utils as k_utils
|
|
from taskflow.utils import misc
|
|
|
|
|
|
MIN_ZK_VERSION = (3, 4, 0)
|
|
|
|
|
|
class ZkBackend(path_based.PathBasedBackend):
|
|
"""A zookeeper-backed backend.
|
|
|
|
Example configuration::
|
|
|
|
conf = {
|
|
"hosts": "192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181",
|
|
"path": "/taskflow",
|
|
}
|
|
"""
|
|
def __init__(self, conf, client=None):
|
|
super(ZkBackend, self).__init__(conf)
|
|
if not self._path:
|
|
self._path = '/taskflow'
|
|
if not paths.isabs(self._path):
|
|
raise ValueError("Zookeeper path must be absolute")
|
|
if client is not None:
|
|
self._client = client
|
|
self._owned = False
|
|
else:
|
|
self._client = k_utils.make_client(self._conf)
|
|
self._owned = True
|
|
self._validated = False
|
|
|
|
def get_connection(self):
|
|
conn = ZkConnection(self, self._client, self._conf)
|
|
if not self._validated:
|
|
conn.validate()
|
|
self._validated = True
|
|
return conn
|
|
|
|
def close(self):
|
|
self._validated = False
|
|
if not self._owned:
|
|
return
|
|
try:
|
|
k_utils.finalize_client(self._client)
|
|
except (k_exc.KazooException, k_exc.ZookeeperError):
|
|
exc.raise_with_cause(exc.StorageFailure,
|
|
"Unable to finalize client")
|
|
|
|
|
|
class ZkConnection(path_based.PathBasedConnection):
|
|
def __init__(self, backend, client, conf):
|
|
super(ZkConnection, self).__init__(backend)
|
|
self._conf = conf
|
|
self._client = client
|
|
with self._exc_wrapper():
|
|
# NOOP if already started.
|
|
self._client.start()
|
|
|
|
@contextlib.contextmanager
|
|
def _exc_wrapper(self):
|
|
"""Exception context-manager which wraps kazoo exceptions.
|
|
|
|
This is used to capture and wrap any kazoo specific exceptions and
|
|
then group them into corresponding taskflow exceptions (not doing
|
|
that would expose the underlying kazoo exception model).
|
|
"""
|
|
try:
|
|
yield
|
|
except self._client.handler.timeout_exception:
|
|
exc.raise_with_cause(exc.StorageFailure,
|
|
"Storage backend timeout")
|
|
except k_exc.SessionExpiredError:
|
|
exc.raise_with_cause(exc.StorageFailure,
|
|
"Storage backend session has expired")
|
|
except k_exc.NoNodeError as e:
|
|
exc.raise_with_cause(exc.NotFound,
|
|
"Storage backend node not found: %s" % e)
|
|
except k_exc.NodeExistsError as e:
|
|
exc.raise_with_cause(exc.Duplicate,
|
|
"Storage backend duplicate node: %s" % e)
|
|
except (k_exc.KazooException, k_exc.ZookeeperError):
|
|
exc.raise_with_cause(exc.StorageFailure,
|
|
"Storage backend internal error")
|
|
|
|
def _join_path(self, *parts):
|
|
return paths.join(*parts)
|
|
|
|
def _get_item(self, path):
|
|
with self._exc_wrapper():
|
|
data, _ = self._client.get(path)
|
|
return misc.decode_json(data)
|
|
|
|
def _set_item(self, path, value, transaction):
|
|
data = misc.binary_encode(jsonutils.dumps(value))
|
|
if not self._client.exists(path):
|
|
transaction.create(path, data)
|
|
else:
|
|
transaction.set_data(path, data)
|
|
|
|
def _del_tree(self, path, transaction):
|
|
for child in self._get_children(path):
|
|
self._del_tree(self._join_path(path, child), transaction)
|
|
transaction.delete(path)
|
|
|
|
def _get_children(self, path):
|
|
with self._exc_wrapper():
|
|
return self._client.get_children(path)
|
|
|
|
def _ensure_path(self, path):
|
|
with self._exc_wrapper():
|
|
self._client.ensure_path(path)
|
|
|
|
def _create_link(self, src_path, dest_path, transaction):
|
|
if not self._client.exists(dest_path):
|
|
transaction.create(dest_path)
|
|
|
|
@contextlib.contextmanager
|
|
def _transaction(self):
|
|
transaction = self._client.transaction()
|
|
with self._exc_wrapper():
|
|
yield transaction
|
|
k_utils.checked_commit(transaction)
|
|
|
|
def validate(self):
|
|
with self._exc_wrapper():
|
|
try:
|
|
if self._conf.get('check_compatible', True):
|
|
k_utils.check_compatible(self._client, MIN_ZK_VERSION)
|
|
except exc.IncompatibleVersion:
|
|
exc.raise_with_cause(exc.StorageFailure, "Backend storage is"
|
|
" not a compatible version")
|