diff --git a/tests/unit/test_zk.py b/tests/unit/test_zk.py index 61c4966f4d..9318ba0ddb 100644 --- a/tests/unit/test_zk.py +++ b/tests/unit/test_zk.py @@ -320,6 +320,27 @@ class TestExecutorApi(ZooKeeperBaseTestCase): items.extend(self._get_zk_tree(path)) return items + def _get_watches(self): + chroot = self.zk_chroot_fixture.zookeeper_chroot + data = self.zk_client.client.command(b'wchp') + ret = {} + sessions = None + for line in data.split('\n'): + if line.startswith('\t'): + if sessions is not None: + sessions.append(line.strip()) + else: + line = line.strip() + if not line: + continue + if line.startswith(chroot): + line = line[len(chroot):] + sessions = [] + ret[line] = sessions + else: + sessions = None + return ret + def test_build_request(self): # Test the lifecycle of a build request request_queue = queue.Queue() @@ -398,6 +419,7 @@ class TestExecutorApi(ZooKeeperBaseTestCase): server.fulfillCancel(a) server.unlock(a) self.assertEqual(client.get(a.path).state, BuildRequest.COMPLETED) + self.assertNotEqual(self._get_watches(), {}) # Scheduler removes build request on completion client.remove(sched_a) @@ -408,6 +430,7 @@ class TestExecutorApi(ZooKeeperBaseTestCase): self.assertEqual(self._get_zk_tree( client.BUILD_REQUEST_ROOT + '/zones'), []) self.assertEqual(self._get_zk_tree(client.LOCK_ROOT), []) + self.assertEqual(self._get_watches(), {}) def test_build_request_remove(self): # Test the scheduler forcibly removing a request (perhaps the diff --git a/tools/zoo.cfg b/tools/zoo.cfg index bf1ac6cf7d..7d9f3cee40 100644 --- a/tools/zoo.cfg +++ b/tools/zoo.cfg @@ -14,3 +14,4 @@ serverCnxnFactory=org.apache.zookeeper.server.NettyServerCnxnFactory secureClientPort=2281 ssl.keyStore.location=/var/certs/keystores/zuul-test-zookeeper.pem ssl.trustStore.location=/var/certs/certs/cacert.pem +4lw.commands.whitelist=* diff --git a/zuul/zk/executor.py b/zuul/zk/executor.py index e11af572ed..2effe7063e 100644 --- a/zuul/zk/executor.py +++ b/zuul/zk/executor.py @@ -27,6 +27,7 @@ from zuul.model import BuildRequest from zuul.zk import ZooKeeperSimpleBase from zuul.zk.exceptions import BuildRequestNotFound from zuul.zk import sharding +from zuul.zk.watchers import ExistingDataWatch class BuildRequestEvent(Enum): @@ -175,8 +176,9 @@ class ExecutorApi(ZooKeeperSimpleBase): ) for req_path in new_build_requests: - self.kazoo_client.DataWatch(req_path, - self._makeBuildStateWatcher(req_path)) + ExistingDataWatch(self.kazoo_client, + req_path, + self._makeBuildStateWatcher(req_path)) # Notify the user about new build requests if a callback is provided, # but only if there are new requests (we don't want to fire on the @@ -313,6 +315,10 @@ class ExecutorApi(ZooKeeperSimpleBase): return build_request def remove(self, build_request): + log = get_annotated_logger( + self.log, event=None, build=build_request.uuid + ) + log.debug("Removing build request %s", build_request) try: # As the build node might contain children (result, data, ...) we # must delete it recursively. @@ -326,6 +332,10 @@ class ExecutorApi(ZooKeeperSimpleBase): self.kazoo_client.delete(path, recursive=True) except NoNodeError: pass + try: + self.kazoo_client.get(build_request.path) + except NoNodeError: + pass def _watchBuildEvents(self, actions, event=None): if event is None: diff --git a/zuul/zk/watchers.py b/zuul/zk/watchers.py new file mode 100644 index 0000000000..bfb9bc1612 --- /dev/null +++ b/zuul/zk/watchers.py @@ -0,0 +1,482 @@ +# This file is from the Kazoo project +# https://github.com/python-zk/kazoo/blob/master/kazoo/recipe/watchers.py +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Higher level child and data watching API's. + +:Maintainer: Ben Bangert +:Status: Production + +.. note:: + + :ref:`DataWatch` and :ref:`ChildrenWatch` may only handle a single + function, attempts to associate a single instance with multiple functions + will result in an exception being thrown. + +""" +from functools import partial, wraps +import logging +import time +import warnings + +from kazoo.exceptions import ( + ConnectionClosedError, + NoNodeError, + KazooException +) +from kazoo.protocol.states import KazooState +from kazoo.retry import KazooRetry + + +log = logging.getLogger(__name__) + + +_STOP_WATCHING = object() + + +def _ignore_closed(func): + @wraps(func) + def wrapper(*args, **kwargs): + try: + return func(*args, **kwargs) + except ConnectionClosedError: + pass + return wrapper + + +class DataWatch(object): + """Watches a node for data updates and calls the specified + function each time it changes + + The function will also be called the very first time its + registered to get the data. + + Returning `False` from the registered function will disable future + data change calls. If the client connection is closed (using the + close command), the DataWatch will no longer get updates. + + If the function supplied takes three arguments, then the third one + will be a :class:`~kazoo.protocol.states.WatchedEvent`. It will + only be set if the change to the data occurs as a result of the + server notifying the watch that there has been a change. Events + like reconnection or the first call will not include an event. + + If the node does not exist, then the function will be called with + ``None`` for all values. + + .. tip:: + + Because :class:`DataWatch` can watch nodes that don't exist, it + can be used alternatively as a higher-level Exists watcher that + survives reconnections and session loss. + + Example with client: + + .. code-block:: python + + @client.DataWatch('/path/to/watch') + def my_func(data, stat): + print("Data is %s" % data) + print("Version is %s" % stat.version) + + # Above function is called immediately and prints + + # Or if you want the event object + @client.DataWatch('/path/to/watch') + def my_func(data, stat, event): + print("Data is %s" % data) + print("Version is %s" % stat.version) + print("Event is %s" % event) + + .. versionchanged:: 1.2 + + DataWatch now ignores additional arguments that were previously + passed to it and warns that they are no longer respected. + + """ + def __init__(self, client, path, func=None, *args, **kwargs): + """Create a data watcher for a path + + :param client: A zookeeper client. + :type client: :class:`~kazoo.client.KazooClient` + :param path: The path to watch for data changes on. + :type path: str + :param func: Function to call initially and every time the + node changes. `func` will be called with a + tuple, the value of the node and a + :class:`~kazoo.client.ZnodeStat` instance. + :type func: callable + + """ + self._client = client + self._path = path + self._func = func + self._stopped = False + self._run_lock = client.handler.lock_object() + self._version = None + self._retry = KazooRetry(max_tries=None, + sleep_func=client.handler.sleep_func) + self._include_event = None + self._ever_called = False + self._used = False + + if args or kwargs: + warnings.warn('Passing additional arguments to DataWatch is' + ' deprecated. ignore_missing_node is now assumed ' + ' to be True by default, and the event will be ' + ' sent if the function can handle receiving it', + DeprecationWarning, stacklevel=2) + + # Register our session listener if we're going to resume + # across session losses + if func is not None: + self._used = True + self._client.add_listener(self._session_watcher) + self._get_data() + + def __call__(self, func): + """Callable version for use as a decorator + + :param func: Function to call initially and every time the + data changes. `func` will be called with a + tuple, the value of the node and a + :class:`~kazoo.client.ZnodeStat` instance. + :type func: callable + + """ + if self._used: + raise KazooException( + "A function has already been associated with this " + "DataWatch instance.") + + self._func = func + + self._used = True + self._client.add_listener(self._session_watcher) + self._get_data() + return func + + def _log_func_exception(self, data, stat, event=None): + try: + # For backwards compatibility, don't send event to the + # callback unless the send_event is set in constructor + if not self._ever_called: + self._ever_called = True + try: + result = self._func(data, stat, event) + except TypeError: + result = self._func(data, stat) + if result is False: + self._stopped = True + self._func = None + self._client.remove_listener(self._session_watcher) + except Exception as exc: + log.exception(exc) + raise + + @_ignore_closed + def _get_data(self, event=None): + # Ensure this runs one at a time, possible because the session + # watcher may trigger a run + with self._run_lock: + if self._stopped: + return + + initial_version = self._version + + try: + data, stat = self._retry(self._client.get, + self._path, self._watcher) + except NoNodeError: + data = None + + # This will set 'stat' to None if the node does not yet + # exist. + stat = self._retry(self._client.exists, self._path, + self._watcher) + if stat: + self._client.handler.spawn(self._get_data) + return + + # No node data, clear out version + if stat is None: + self._version = None + else: + self._version = stat.mzxid + + # Call our function if its the first time ever, or if the + # version has changed + if initial_version != self._version or not self._ever_called: + self._log_func_exception(data, stat, event) + + def _watcher(self, event): + self._get_data(event=event) + + def _set_watch(self, state): + with self._run_lock: + self._watch_established = state + + def _session_watcher(self, state): + if state == KazooState.CONNECTED: + self._client.handler.spawn(self._get_data) + + +class ChildrenWatch(object): + """Watches a node for children updates and calls the specified + function each time it changes + + The function will also be called the very first time its + registered to get children. + + Returning `False` from the registered function will disable future + children change calls. If the client connection is closed (using + the close command), the ChildrenWatch will no longer get updates. + + if send_event=True in __init__, then the function will always be + called with second parameter, ``event``. Upon initial call or when + recovering a lost session the ``event`` is always ``None``. + Otherwise it's a :class:`~kazoo.prototype.state.WatchedEvent` + instance. + + Example with client: + + .. code-block:: python + + @client.ChildrenWatch('/path/to/watch') + def my_func(children): + print "Children are %s" % children + + # Above function is called immediately and prints children + + """ + def __init__(self, client, path, func=None, + allow_session_lost=True, send_event=False): + """Create a children watcher for a path + + :param client: A zookeeper client. + :type client: :class:`~kazoo.client.KazooClient` + :param path: The path to watch for children on. + :type path: str + :param func: Function to call initially and every time the + children change. `func` will be called with a + single argument, the list of children. + :type func: callable + :param allow_session_lost: Whether the watch should be + re-registered if the zookeeper + session is lost. + :type allow_session_lost: bool + :type send_event: bool + :param send_event: Whether the function should be passed the + event sent by ZooKeeper or None upon + initialization (see class documentation) + + The path must already exist for the children watcher to + run. + + """ + self._client = client + self._path = path + self._func = func + self._send_event = send_event + self._stopped = False + self._watch_established = False + self._allow_session_lost = allow_session_lost + self._run_lock = client.handler.lock_object() + self._prior_children = None + self._used = False + + # Register our session listener if we're going to resume + # across session losses + if func is not None: + self._used = True + if allow_session_lost: + self._client.add_listener(self._session_watcher) + self._get_children() + + def __call__(self, func): + """Callable version for use as a decorator + + :param func: Function to call initially and every time the + children change. `func` will be called with a + single argument, the list of children. + :type func: callable + + """ + if self._used: + raise KazooException( + "A function has already been associated with this " + "ChildrenWatch instance.") + + self._func = func + + self._used = True + if self._allow_session_lost: + self._client.add_listener(self._session_watcher) + self._get_children() + return func + + @_ignore_closed + def _get_children(self, event=None): + with self._run_lock: # Ensure this runs one at a time + if self._stopped: + return + + try: + children = self._client.retry(self._client.get_children, + self._path, self._watcher) + except NoNodeError: + self._stopped = True + return + + if not self._watch_established: + self._watch_established = True + + if self._prior_children is not None and \ + self._prior_children == children: + return + + self._prior_children = children + + try: + if self._send_event: + result = self._func(children, event) + else: + result = self._func(children) + if result is False: + self._stopped = True + self._func = None + if self._allow_session_lost: + self._client.remove_listener(self._session_watcher) + except Exception as exc: + log.exception(exc) + raise + + def _watcher(self, event): + if event.type != "NONE": + self._get_children(event) + + def _session_watcher(self, state): + if state in (KazooState.LOST, KazooState.SUSPENDED): + self._watch_established = False + elif (state == KazooState.CONNECTED and + not self._watch_established and not self._stopped): + self._client.handler.spawn(self._get_children) + + +class PatientChildrenWatch(object): + """Patient Children Watch that returns values after the children + of a node don't change for a period of time + + A separate watcher for the children of a node, that ignores + changes within a boundary time and sets the result only when the + boundary time has elapsed with no children changes. + + Example:: + + watcher = PatientChildrenWatch(client, '/some/path', + time_boundary=5) + async_object = watcher.start() + + # Blocks until the children have not changed for time boundary + # (5 in this case) seconds, returns children list and an + # async_result that will be set if the children change in the + # future + children, child_async = async_object.get() + + .. note:: + + This Watch is different from :class:`DataWatch` and + :class:`ChildrenWatch` as it only returns once, does not take + a function that is called, and provides an + :class:`~kazoo.interfaces.IAsyncResult` object that can be + checked to see if the children have changed later. + + """ + def __init__(self, client, path, time_boundary=30): + self.client = client + self.path = path + self.children = [] + self.time_boundary = time_boundary + self.children_changed = client.handler.event_object() + + def start(self): + """Begin the watching process asynchronously + + :returns: An :class:`~kazoo.interfaces.IAsyncResult` instance + that will be set when no change has occurred to the + children for time boundary seconds. + + """ + self.asy = asy = self.client.handler.async_result() + self.client.handler.spawn(self._inner_start) + return asy + + def _inner_start(self): + try: + while True: + async_result = self.client.handler.async_result() + self.children = self.client.retry( + self.client.get_children, self.path, + partial(self._children_watcher, async_result)) + self.client.handler.sleep_func(self.time_boundary) + + if self.children_changed.is_set(): + self.children_changed.clear() + else: + break + + self.asy.set((self.children, async_result)) + except Exception as exc: + self.asy.set_exception(exc) + + def _children_watcher(self, async_result, event): + self.children_changed.set() + async_result.set(time.time()) + + +# Local modifications below: +# +class ExistingDataWatch(DataWatch): + @_ignore_closed + def _get_data(self, event=None): + # Ensure this runs one at a time, possible because the session + # watcher may trigger a run + with self._run_lock: + if self._stopped: + return + + initial_version = self._version + + try: + data, stat = self._retry(self._client.get, + self._path, self._watcher) + except NoNodeError: + data = stat = None + + # No node data, clear out version + if stat is None: + self._version = None + else: + self._version = stat.mzxid + + # Call our function if its the first time ever, or if the + # version has changed + if initial_version != self._version or not self._ever_called: + self._log_func_exception(data, stat, event) + + # If the node doesn't exist, we won't be watching any more + if stat is None: + self._stopped = True + self._func = None + self._client.remove_listener(self._session_watcher)