Vendor and use fixed Kazoo read/write lock
The Kazoo read/write lock implementation can deadlock under certain conditions. This should be fixed in https://github.com/python-zk/kazoo/pull/650. We will vendor the lock recipe until the issue is resolved upstream. Existing read/write locks are updated to use the vendored version. To avoid name collisions the vendored recipe is located in zuul/zk/vendor. The existing watcher recipe was also moved to this sub-directory. Change-Id: If3de8419931dfcb7ccda62527583167135d9ba02
This commit is contained in:
parent
f8f7509ff2
commit
3054122d1a
@ -22,6 +22,7 @@ from urllib.parse import quote_plus, unquote_plus
|
||||
from kazoo.exceptions import NoNodeError
|
||||
|
||||
from zuul.zk import sharding, ZooKeeperSimpleBase
|
||||
from zuul.zk.vendor import lock
|
||||
|
||||
CONFIG_ROOT = "/zuul/config"
|
||||
|
||||
@ -144,12 +145,12 @@ class UnparsedConfigCache(ZooKeeperSimpleBase):
|
||||
self.lock_path = f"{config_root}/lock"
|
||||
|
||||
def readLock(self, project_cname):
|
||||
return self.kazoo_client.ReadLock(
|
||||
_safe_path(self.lock_path, project_cname))
|
||||
return lock.ReadLock(
|
||||
self.kazoo_client, _safe_path(self.lock_path, project_cname))
|
||||
|
||||
def writeLock(self, project_cname):
|
||||
return self.kazoo_client.WriteLock(
|
||||
_safe_path(self.lock_path, project_cname))
|
||||
return lock.WriteLock(
|
||||
self.kazoo_client, _safe_path(self.lock_path, project_cname))
|
||||
|
||||
def getFilesCache(self, project_cname, branch_name):
|
||||
path = _safe_path(self.cache_path, project_cname, branch_name)
|
||||
|
@ -27,7 +27,7 @@ from zuul.model import BuildRequest
|
||||
from zuul.zk import ZooKeeperSimpleBase
|
||||
from zuul.zk.exceptions import BuildRequestNotFound
|
||||
from zuul.zk import sharding
|
||||
from zuul.zk.watchers import ExistingDataWatch
|
||||
from zuul.zk.vendor.watchers import ExistingDataWatch
|
||||
|
||||
|
||||
class BuildRequestEvent(Enum):
|
||||
|
@ -17,6 +17,7 @@ from contextlib import contextmanager
|
||||
from urllib.parse import quote_plus
|
||||
|
||||
from zuul.zk.exceptions import LockException
|
||||
from zuul.zk.vendor.lock import ReadLock, WriteLock
|
||||
|
||||
LOCK_ROOT = "/zuul/locks"
|
||||
TENANT_LOCK_ROOT = f"{LOCK_ROOT}/tenant"
|
||||
@ -40,7 +41,7 @@ def locked(lock, blocking=True, timeout=None):
|
||||
def tenant_read_lock(client, tenant_name, blocking=True):
|
||||
safe_tenant = quote_plus(tenant_name)
|
||||
with locked(
|
||||
client.client.ReadLock(f"{TENANT_LOCK_ROOT}/{safe_tenant}"),
|
||||
ReadLock(client.client, f"{TENANT_LOCK_ROOT}/{safe_tenant}"),
|
||||
blocking=blocking
|
||||
) as lock:
|
||||
yield lock
|
||||
@ -50,7 +51,7 @@ def tenant_read_lock(client, tenant_name, blocking=True):
|
||||
def tenant_write_lock(client, tenant_name, blocking=True):
|
||||
safe_tenant = quote_plus(tenant_name)
|
||||
with locked(
|
||||
client.client.WriteLock(f"{TENANT_LOCK_ROOT}/{safe_tenant}"),
|
||||
WriteLock(client.client, f"{TENANT_LOCK_ROOT}/{safe_tenant}"),
|
||||
blocking=blocking
|
||||
) as lock:
|
||||
yield lock
|
||||
|
0
zuul/zk/vendor/__init__.py
vendored
Normal file
0
zuul/zk/vendor/__init__.py
vendored
Normal file
759
zuul/zk/vendor/lock.py
vendored
Normal file
759
zuul/zk/vendor/lock.py
vendored
Normal file
@ -0,0 +1,759 @@
|
||||
# This file is from the Kazoo project and contains fixes proposed in
|
||||
# https://github.com/python-zk/kazoo/pull/650
|
||||
#
|
||||
# https://github.com/python-zk/kazoo/blob/master/kazoo/recipe/lock.py
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
"""Zookeeper Locking Implementations
|
||||
|
||||
:Maintainer: Ben Bangert <ben@groovie.org>
|
||||
:Status: Production
|
||||
|
||||
Error Handling
|
||||
==============
|
||||
|
||||
It's highly recommended to add a state listener with
|
||||
:meth:`~KazooClient.add_listener` and watch for
|
||||
:attr:`~KazooState.LOST` and :attr:`~KazooState.SUSPENDED` state
|
||||
changes and re-act appropriately. In the event that a
|
||||
:attr:`~KazooState.LOST` state occurs, its certain that the lock
|
||||
and/or the lease has been lost.
|
||||
|
||||
"""
|
||||
import re
|
||||
import sys
|
||||
|
||||
try:
|
||||
from time import monotonic as now
|
||||
except ImportError:
|
||||
from time import time as now
|
||||
import uuid
|
||||
|
||||
import six
|
||||
|
||||
from kazoo.exceptions import (
|
||||
CancelledError,
|
||||
KazooException,
|
||||
LockTimeout,
|
||||
NoNodeError,
|
||||
)
|
||||
from kazoo.protocol.states import KazooState
|
||||
from kazoo.retry import (
|
||||
ForceRetryError,
|
||||
KazooRetry,
|
||||
RetryFailedError,
|
||||
)
|
||||
|
||||
|
||||
class _Watch(object):
|
||||
def __init__(self, duration=None):
|
||||
self.duration = duration
|
||||
self.started_at = None
|
||||
|
||||
def start(self):
|
||||
self.started_at = now()
|
||||
|
||||
def leftover(self):
|
||||
if self.duration is None:
|
||||
return None
|
||||
else:
|
||||
elapsed = now() - self.started_at
|
||||
return max(0, self.duration - elapsed)
|
||||
|
||||
|
||||
class Lock(object):
    """Kazoo Lock

    Example usage with a :class:`~kazoo.client.KazooClient` instance:

    .. code-block:: python

        zk = KazooClient()
        zk.start()
        lock = zk.Lock("/lockpath", "my-identifier")
        with lock:  # blocks waiting for lock acquisition
            # do something with the lock

    Note: This lock is not *re-entrant*. Repeated calls after already
    acquired will block.

    This is an exclusive lock. For a read/write lock, see :class:`WriteLock`
    and :class:`ReadLock`.

    """

    # Node name, after the contender UUID, before the sequence
    # number. Involved in read/write locks.
    _NODE_NAME = "__lock__"

    # Node names which exclude this contender when present at a lower
    # sequence number. Involved in read/write locks.
    _EXCLUDE_NAMES = ["__lock__"]

    def __init__(self, client, path, identifier=None, extra_lock_patterns=()):
        """Create a Kazoo lock.

        :param client: A :class:`~kazoo.client.KazooClient` instance.
        :param path: The lock path to use.
        :param identifier: Name to use for this lock contender. This can be
                           useful for querying to see who the current lock
                           contenders are.
        :param extra_lock_patterns: Strings that will be used to
                                    identify other znode in the path
                                    that should be considered contenders
                                    for this lock.
                                    Use this for cross-implementation
                                    compatibility.

        .. versionadded:: 2.7.1
            The extra_lock_patterns option.
        """
        self.client = client
        self.path = path
        self._exclude_names = set(
            self._EXCLUDE_NAMES + list(extra_lock_patterns)
        )
        # Matches "<pattern><10-digit sequence>" at the end of a child
        # znode name; group(1) captures the (possibly negative) sequence.
        self._contenders_re = re.compile(
            r"(?:{patterns})(-?\d{{10}})$".format(
                patterns="|".join(self._exclude_names)
            )
        )

        # some data is written to the node. this can be queried via
        # contenders() to see who is contending for the lock
        self.data = str(identifier or "").encode("utf-8")
        self.node = None

        self.wake_event = client.handler.event_object()

        # props to Netflix Curator for this trick. It is possible for our
        # create request to succeed on the server, but for a failure to
        # prevent us from getting back the full path name. We prefix our
        # lock name with a uuid and can check for its presence on retry.
        self.prefix = uuid.uuid4().hex + self._NODE_NAME
        self.create_path = self.path + "/" + self.prefix

        self.create_tried = False
        self.is_acquired = False
        self.assured_path = False
        self.cancelled = False
        # Retry loop for acquisition; max_tries=None means retry until
        # the per-call deadline (set in acquire()) expires.
        self._retry = KazooRetry(
            max_tries=None, sleep_func=client.handler.sleep_func
        )
        # Process-local lock serializing acquire() across threads.
        self._lock = client.handler.lock_object()

    def _ensure_path(self):
        """Create the lock's parent znode if needed and remember that."""
        self.client.ensure_path(self.path)
        self.assured_path = True

    def cancel(self):
        """Cancel a pending lock acquire."""
        self.cancelled = True
        self.wake_event.set()

    def acquire(self, blocking=True, timeout=None, ephemeral=True):
        """
        Acquire the lock. By defaults blocks and waits forever.

        :param blocking: Block until lock is obtained or return immediately.
        :type blocking: bool
        :param timeout: Don't wait forever to acquire the lock.
        :type timeout: float or None
        :param ephemeral: Don't use ephemeral znode for the lock.
        :type ephemeral: bool

        :returns: Was the lock acquired?
        :rtype: bool

        :raises: :exc:`~kazoo.exceptions.LockTimeout` if the lock
                 wasn't acquired within `timeout` seconds.

        .. warning::

            When :attr:`ephemeral` is set to False session expiration
            will not release the lock and must be handled separately.

        .. versionadded:: 1.1
            The timeout option.

        .. versionadded:: 2.4.1
            The ephemeral option.
        """

        def _acquire_lock():
            # Non-blocking attempt on the thread lock; ForceRetryError
            # makes the retry loop try again until the deadline.
            got_it = self._lock.acquire(False)
            if not got_it:
                raise ForceRetryError()
            return True

        retry = self._retry.copy()
        retry.deadline = timeout

        # Ensure we are locked so that we avoid multiple threads in
        # this acquisition routine at the same time...
        locked = self._lock.acquire(False)
        if not locked and not blocking:
            return False
        if not locked:
            # Lock acquire doesn't take a timeout, so simulate it...
            # XXX: This is not true in Py3 >= 3.2
            try:
                locked = retry(_acquire_lock)
            except RetryFailedError:
                return False
        already_acquired = self.is_acquired
        try:
            gotten = False
            try:
                gotten = retry(
                    self._inner_acquire,
                    blocking=blocking,
                    timeout=timeout,
                    ephemeral=ephemeral,
                )
            except RetryFailedError:
                pass
            except KazooException:
                # if we did ultimately fail, attempt to clean up
                exc_info = sys.exc_info()
                if not already_acquired:
                    self._best_effort_cleanup()
                self.cancelled = False
                six.reraise(exc_info[0], exc_info[1], exc_info[2])
            if gotten:
                self.is_acquired = gotten
            if not gotten and not already_acquired:
                self._best_effort_cleanup()
            return gotten
        finally:
            self._lock.release()

    def _watch_session(self, state):
        """Session listener: wake the waiter on any state change.

        Returning True removes the listener after the first event.
        """
        self.wake_event.set()
        return True

    def _inner_acquire(self, blocking, timeout, ephemeral=True):
        """One acquisition attempt; retried by acquire() on ZK errors."""

        # wait until it's our chance to get it..
        if self.is_acquired:
            if not blocking:
                return False
            raise ForceRetryError()

        # make sure our election parent node exists
        if not self.assured_path:
            self._ensure_path()

        node = None
        if self.create_tried:
            # A previous create may have succeeded server-side even though
            # we never saw the reply; look for our uuid-prefixed node.
            node = self._find_node()
        else:
            self.create_tried = True

        if not node:
            node = self.client.create(
                self.create_path, self.data, ephemeral=ephemeral, sequence=True
            )
            # strip off path to node
            node = node[len(self.path) + 1:]

        self.node = node

        while True:
            self.wake_event.clear()

            # bail out with an exception if cancellation has been requested
            if self.cancelled:
                raise CancelledError()

            predecessor = self._get_predecessor(node)
            if predecessor is None:
                return True

            if not blocking:
                return False

            # otherwise we are in the mix. watch predecessor and bide our time
            predecessor = self.path + "/" + predecessor
            self.client.add_listener(self._watch_session)
            try:
                self.client.get(predecessor, self._watch_predecessor)
            except NoNodeError:
                pass  # predecessor has already been deleted
            else:
                self.wake_event.wait(timeout)
                # NOTE(review): Event.isSet() is a deprecated alias removed
                # in Python 3.12; upstream kazoo now uses is_set().
                if not self.wake_event.isSet():
                    raise LockTimeout(
                        "Failed to acquire lock on %s after %s seconds"
                        % (self.path, timeout)
                    )
            finally:
                self.client.remove_listener(self._watch_session)

    def _watch_predecessor(self, event):
        """Watch callback for the predecessor znode: wake the waiter."""
        self.wake_event.set()

    def _get_predecessor(self, node):
        """returns `node`'s predecessor or None

        Note: This handles the case where the current lock is not a contender
        (e.g. rlock), and also edge cases where the lock's ephemeral node
        is gone.
        """
        node_sequence = node[len(self.prefix):]
        children = self.client.get_children(self.path)
        found_self = False
        # Filter out the contenders using the computed regex
        contender_matches = []
        for child in children:
            match = self._contenders_re.search(child)
            if match is not None:
                contender_sequence = match.group(1)
                # Only consider contenders with a smaller sequence number.
                # A contender with a smaller sequence number has a higher
                # priority.
                if contender_sequence < node_sequence:
                    contender_matches.append(match)
            if child == node:
                # Remember the node's match object so we can short circuit
                # below.  For a read lock our own node may not match the
                # contender regex, leaving found_self = None (not False).
                found_self = match

        if found_self is False:  # pragma: nocover
            # somehow we aren't in the childrens -- probably we are
            # recovering from a session failure and our ephemeral
            # node was removed.
            raise ForceRetryError()

        if not contender_matches:
            return None

        # Sort the contenders using the sequence number extracted by the regex
        # and return the original string of the predecessor.
        sorted_matches = sorted(contender_matches, key=lambda m: m.groups())
        return sorted_matches[-1].string

    def _find_node(self):
        """Return our own child znode (by uuid prefix) or None."""
        children = self.client.get_children(self.path)
        for child in children:
            if child.startswith(self.prefix):
                return child
        return None

    def _delete_node(self, node):
        """Delete the given child znode under the lock path."""
        self.client.delete(self.path + "/" + node)

    def _best_effort_cleanup(self):
        """Try to delete our contender node, ignoring ZK errors."""
        try:
            node = self.node or self._find_node()
            if node:
                self._delete_node(node)
        except KazooException:  # pragma: nocover
            pass

    def release(self):
        """Release the lock immediately."""
        return self.client.retry(self._inner_release)

    def _inner_release(self):
        """Delete our node and reset state; False if never acquired."""
        if not self.is_acquired:
            return False

        try:
            self._delete_node(self.node)
        except NoNodeError:  # pragma: nocover
            pass

        self.is_acquired = False
        self.node = None
        return True

    def contenders(self):
        """Return an ordered list of the current contenders for the
        lock.

        .. note::

            If the contenders did not set an identifier, it will appear
            as a blank string.

        """
        # make sure our election parent node exists
        if not self.assured_path:
            self._ensure_path()

        children = self.client.get_children(self.path)
        # We want all contenders, including self (this is especially important
        # for r/w locks). This is similar to the logic of `_get_predecessor`
        # except we include our own pattern.
        all_contenders_re = re.compile(
            r"(?:{patterns})(-?\d{{10}})$".format(
                patterns="|".join(self._exclude_names | {self._NODE_NAME})
            )
        )
        # Filter out the contenders using the computed regex
        contender_matches = []
        for child in children:
            match = all_contenders_re.search(child)
            if match is not None:
                contender_matches.append(match)
        # Sort the contenders using the sequence number extracted by the regex,
        # then extract the original string.
        contender_nodes = [
            match.string
            for match in sorted(contender_matches, key=lambda m: m.groups())
        ]
        # Retrieve all the contender nodes data (preserving order).
        contenders = []
        for node in contender_nodes:
            try:
                data, stat = self.client.get(self.path + "/" + node)
                if data is not None:
                    contenders.append(data.decode("utf-8"))
            except NoNodeError:  # pragma: nocover
                pass

        return contenders

    def __enter__(self):
        self.acquire()

    def __exit__(self, exc_type, exc_value, traceback):
        self.release()
||||
|
||||
|
||||
class WriteLock(Lock):
    """Kazoo Write Lock

    The writer side of a shared read/write lock.  Usage mirrors
    :class:`Lock`:

    .. code-block:: python

        zk = KazooClient()
        zk.start()
        lock = zk.WriteLock("/lockpath", "my-identifier")
        with lock:  # blocks waiting for lock acquisition
            # do something with the lock

    A :class:`WriteLock` and a :class:`ReadLock` cooperate only when
    created with the same lock path.  Acquisition is exclusive: the
    write lock cannot be taken while any reader or writer holds it.

    Note: not *re-entrant* — acquiring again from the same holder
    blocks.  See :class:`Lock` for a plain exclusive lock and
    :class:`ReadLock` for the read side.
    """

    # Writers are blocked by earlier readers AND earlier writers.
    _EXCLUDE_NAMES = ["__lock__", "__rlock__"]
    # Our own contender nodes carry the writer marker.
    _NODE_NAME = "__lock__"
||||
|
||||
|
||||
class ReadLock(Lock):
    """Kazoo Read Lock

    The reader side of a shared read/write lock.  Usage mirrors
    :class:`Lock`:

    .. code-block:: python

        zk = KazooClient()
        zk.start()
        lock = zk.ReadLock("/lockpath", "my-identifier")
        with lock:  # blocks waiting for outstanding writers
            # do something with the lock

    A :class:`ReadLock` and a :class:`WriteLock` cooperate only when
    created with the same lock path.  Readers block solely on writers;
    any number of readers may hold the lock simultaneously.

    Note: not *re-entrant* — acquiring again from the same holder
    blocks.  See :class:`Lock` for a plain exclusive lock and
    :class:`WriteLock` for the write side.
    """

    # Readers are blocked only by earlier writers.
    _EXCLUDE_NAMES = ["__lock__"]
    # Our own contender nodes carry the reader marker.
    _NODE_NAME = "__rlock__"
||||
|
||||
|
||||
class Semaphore(object):
    """A Zookeeper-based Semaphore

    This synchronization primitive operates in the same manner as the
    Python threading version only uses the concept of leases to
    indicate how many available leases are available for the lock
    rather than counting.

    Note: This lock is not meant to be *re-entrant*.

    Example:

    .. code-block:: python

        zk = KazooClient()
        semaphore = zk.Semaphore("/leasepath", "my-identifier")
        with semaphore:  # blocks waiting for lock acquisition
            # do something with the semaphore

    .. warning::

        This class stores the allowed max_leases as the data on the
        top-level semaphore node. The stored value is checked once
        against the max_leases of each instance. This check is
        performed when acquire is called the first time. The semaphore
        node needs to be deleted to change the allowed leases.

    .. versionadded:: 0.6
        The Semaphore class.

    .. versionadded:: 1.1
        The max_leases check.

    """

    def __init__(self, client, path, identifier=None, max_leases=1):
        """Create a Kazoo Lock

        :param client: A :class:`~kazoo.client.KazooClient` instance.
        :param path: The semaphore path to use.
        :param identifier: Name to use for this lock contender. This
                           can be useful for querying to see who the
                           current lock contenders are.
        :param max_leases: The maximum amount of leases available for
                           the semaphore.

        """
        # Implementation notes about how excessive thundering herd
        # and watches are avoided
        # - A node (lease pool) holds children for each lease in use
        # - A lock is acquired for a process attempting to acquire a
        #   lease. If a lease is available, the ephemeral node is
        #   created in the lease pool and the lock is released.
        # - Only the lock holder watches for children changes in the
        #   lease pool
        self.client = client
        self.path = path

        # some data is written to the node. this can be queried via
        # contenders() to see who is contending for the lock
        self.data = str(identifier or "").encode("utf-8")
        self.max_leases = max_leases
        self.wake_event = client.handler.event_object()

        # Unique (per instance) lease node; its existence means we
        # hold a lease.
        self.create_path = self.path + "/" + uuid.uuid4().hex
        # Sibling lock node serializing lease acquisition attempts.
        self.lock_path = path + "-" + "__lock__"
        self.is_acquired = False
        self.assured_path = False
        self.cancelled = False
        self._session_expired = False

    def _ensure_path(self):
        """Create the lease-pool node and validate/store max_leases."""
        result = self.client.ensure_path(self.path)
        self.assured_path = True
        if result is True:
            # node did already exist
            data, _ = self.client.get(self.path)
            try:
                leases = int(data.decode("utf-8"))
            except (ValueError, TypeError):
                # ignore non-numeric data, maybe the node data is used
                # for other purposes
                pass
            else:
                if leases != self.max_leases:
                    raise ValueError(
                        "Inconsistent max leases: %s, expected: %s"
                        % (leases, self.max_leases)
                    )
        else:
            self.client.set(self.path, str(self.max_leases).encode("utf-8"))

    def cancel(self):
        """Cancel a pending semaphore acquire."""
        self.cancelled = True
        self.wake_event.set()

    def acquire(self, blocking=True, timeout=None):
        """Acquire the semaphore. By defaults blocks and waits forever.

        :param blocking: Block until semaphore is obtained or
                         return immediately.
        :type blocking: bool
        :param timeout: Don't wait forever to acquire the semaphore.
        :type timeout: float or None

        :returns: Was the semaphore acquired?
        :rtype: bool

        :raises:
            ValueError if the max_leases value doesn't match the
            stored value.

            :exc:`~kazoo.exceptions.LockTimeout` if the semaphore
            wasn't acquired within `timeout` seconds.

        .. versionadded:: 1.1
            The blocking, timeout arguments and the max_leases check.
        """
        # If the semaphore had previously been canceled, make sure to
        # reset that state.
        self.cancelled = False

        try:
            self.is_acquired = self.client.retry(
                self._inner_acquire, blocking=blocking, timeout=timeout
            )
        except KazooException:
            # if we did ultimately fail, attempt to clean up
            self._best_effort_cleanup()
            self.cancelled = False
            raise

        return self.is_acquired

    def _inner_acquire(self, blocking, timeout=None):
        """Inner loop that runs from the top anytime a command hits a
        retryable Zookeeper exception."""
        self._session_expired = False
        self.client.add_listener(self._watch_session)

        if not self.assured_path:
            self._ensure_path()

        # Do we already have a lease?
        if self.client.exists(self.create_path):
            return True

        # Deadline shared between the lock acquisition below and the
        # lease wait loop.
        w = _Watch(duration=timeout)
        w.start()
        lock = self.client.Lock(self.lock_path, self.data)
        try:
            gotten = lock.acquire(blocking=blocking, timeout=w.leftover())
            if not gotten:
                return False
            while True:
                self.wake_event.clear()

                # Attempt to grab our lease...
                if self._get_lease():
                    return True

                if blocking:
                    # If blocking, wait until self._watch_lease_change() is
                    # called before returning
                    self.wake_event.wait(w.leftover())
                    # NOTE(review): Event.isSet() is a deprecated alias
                    # removed in Python 3.12; upstream now uses is_set().
                    if not self.wake_event.isSet():
                        raise LockTimeout(
                            "Failed to acquire semaphore on %s"
                            " after %s seconds" % (self.path, timeout)
                        )
                else:
                    return False
        finally:
            lock.release()

    def _watch_lease_change(self, event):
        """Child-watch callback on the lease pool: wake the waiter."""
        self.wake_event.set()

    def _get_lease(self, data=None):
        """Try to take a lease; return whether we now hold one."""
        # Make sure the session is still valid
        if self._session_expired:
            raise ForceRetryError("Retry on session loss at top")

        # Make sure that the request hasn't been canceled
        if self.cancelled:
            raise CancelledError("Semaphore cancelled")

        # Get a list of the current potential lock holders. If they change,
        # notify our wake_event object. This is used to unblock a blocking
        # self._inner_acquire call.
        children = self.client.get_children(
            self.path, self._watch_lease_change
        )

        # If there are leases available, acquire one
        if len(children) < self.max_leases:
            self.client.create(self.create_path, self.data, ephemeral=True)

        # Check if our acquisition was successful or not. Update our state.
        if self.client.exists(self.create_path):
            self.is_acquired = True
        else:
            self.is_acquired = False

        # Return current state
        return self.is_acquired

    def _watch_session(self, state):
        """Session listener: remember expiration and wake the waiter."""
        if state == KazooState.LOST:
            self._session_expired = True
            self.wake_event.set()

        # Return true to de-register
        return True

    def _best_effort_cleanup(self):
        """Try to delete our lease node, ignoring ZK errors."""
        try:
            self.client.delete(self.create_path)
        except KazooException:  # pragma: nocover
            pass

    def release(self):
        """Release the lease immediately."""
        return self.client.retry(self._inner_release)

    def _inner_release(self):
        """Delete our lease node and reset state; False if not held."""
        if not self.is_acquired:
            return False
        try:
            self.client.delete(self.create_path)
        except NoNodeError:  # pragma: nocover
            pass
        self.is_acquired = False
        return True

    def lease_holders(self):
        """Return an unordered list of the current lease holders.

        .. note::

            If the lease holder did not set an identifier, it will
            appear as a blank string.

        """
        if not self.client.exists(self.path):
            return []

        children = self.client.get_children(self.path)

        lease_holders = []
        for child in children:
            try:
                data, stat = self.client.get(self.path + "/" + child)
                lease_holders.append(data.decode("utf-8"))
            except NoNodeError:  # pragma: nocover
                pass
        return lease_holders

    def __enter__(self):
        self.acquire()

    def __exit__(self, exc_type, exc_value, traceback):
        self.release()
Loading…
Reference in New Issue
Block a user