|
|
@ -17,6 +17,7 @@ import time |
|
|
|
from kazoo.client import KazooClient, KazooState |
|
|
|
from kazoo import exceptions as kze |
|
|
|
from kazoo.handlers.threading import KazooTimeoutError |
|
|
|
from kazoo.recipe.cache import TreeCache, TreeEvent |
|
|
|
from kazoo.recipe.lock import Lock |
|
|
|
|
|
|
|
import zuul.model |
|
|
@ -48,13 +49,26 @@ class ZooKeeper(object): |
|
|
|
# Log zookeeper retry every 10 seconds |
|
|
|
retry_log_rate = 10 |
|
|
|
|
|
|
|
def __init__(self): |
|
|
|
def __init__(self, enable_cache=True): |
|
|
|
''' |
|
|
|
Initialize the ZooKeeper object. |
|
|
|
|
|
|
|
:param bool enable_cache: When True, enables caching of ZooKeeper |
|
|
|
objects (e.g., HoldRequests). |
|
|
|
''' |
|
|
|
self.client = None |
|
|
|
self._became_lost = False |
|
|
|
self._last_retry_log = 0 |
|
|
|
self.enable_cache = enable_cache |
|
|
|
|
|
|
|
# The caching model we use is designed around handing out model |
|
|
|
# data as objects. To do this, we use two caches: one is a TreeCache |
|
|
|
# which contains raw znode data (among other details), and one for |
|
|
|
# storing that data serialized as objects. This allows us to return |
|
|
|
# objects from the APIs, and avoids calling the methods to serialize |
|
|
|
# the data into objects more than once. |
|
|
|
self._hold_request_tree = None |
|
|
|
self._cached_hold_requests = {} |
|
|
|
|
|
|
|
def _dictToStr(self, data): |
|
|
|
return json.dumps(data).encode('utf8') |
|
|
@ -126,6 +140,67 @@ class ZooKeeper(object): |
|
|
|
except KazooTimeoutError: |
|
|
|
self.logConnectionRetryEvent() |
|
|
|
|
|
|
|
if self.enable_cache: |
|
|
|
self._hold_request_tree = TreeCache(self.client, |
|
|
|
self.HOLD_REQUEST_ROOT) |
|
|
|
self._hold_request_tree.listen_fault(self.cacheFaultListener) |
|
|
|
self._hold_request_tree.listen(self.holdRequestCacheListener) |
|
|
|
self._hold_request_tree.start() |
|
|
|
|
|
|
|
def cacheFaultListener(self, e): |
|
|
|
self.log.exception(e) |
|
|
|
|
|
|
|
def holdRequestCacheListener(self, event): |
|
|
|
''' |
|
|
|
Keep the hold request object cache in sync with the TreeCache. |
|
|
|
''' |
|
|
|
try: |
|
|
|
self._holdRequestCacheListener(event) |
|
|
|
except Exception: |
|
|
|
self.log.exception( |
|
|
|
"Exception in hold request cache update for event: %s", event) |
|
|
|
|
|
|
|
def _holdRequestCacheListener(self, event): |
|
|
|
if hasattr(event.event_data, 'path'): |
|
|
|
# Ignore root node |
|
|
|
path = event.event_data.path |
|
|
|
if path == self.HOLD_REQUEST_ROOT: |
|
|
|
return |
|
|
|
|
|
|
|
if event.event_type not in (TreeEvent.NODE_ADDED, |
|
|
|
TreeEvent.NODE_UPDATED, |
|
|
|
TreeEvent.NODE_REMOVED): |
|
|
|
return |
|
|
|
|
|
|
|
path = event.event_data.path |
|
|
|
request_id = path.rsplit('/', 1)[1] |
|
|
|
|
|
|
|
if event.event_type in (TreeEvent.NODE_ADDED, TreeEvent.NODE_UPDATED): |
|
|
|
# Requests with no data are invalid |
|
|
|
if not event.event_data.data: |
|
|
|
return |
|
|
|
|
|
|
|
# Perform an in-place update of the already cached request |
|
|
|
d = self._bytesToDict(event.event_data.data) |
|
|
|
old_request = self._cached_hold_requests.get(request_id) |
|
|
|
if old_request: |
|
|
|
if event.event_data.stat.version <= old_request.stat.version: |
|
|
|
# Don't update to older data |
|
|
|
return |
|
|
|
old_request.updateFromDict(d) |
|
|
|
old_request.stat = event.event_data.stat |
|
|
|
else: |
|
|
|
request = zuul.model.HoldRequest.fromDict(d) |
|
|
|
request.id = request_id |
|
|
|
request.stat = event.event_data.stat |
|
|
|
self._cached_hold_requests[request_id] = request |
|
|
|
|
|
|
|
elif event.event_type == TreeEvent.NODE_REMOVED: |
|
|
|
try: |
|
|
|
del self._cached_hold_requests[request_id] |
|
|
|
except KeyError: |
|
|
|
pass |
|
|
|
|
|
|
|
def disconnect(self): |
|
|
|
''' |
|
|
|
Close the ZooKeeper cluster connection. |
|
|
@ -133,6 +208,10 @@ class ZooKeeper(object): |
|
|
|
You should call this method if you used connect() to establish a |
|
|
|
cluster connection. |
|
|
|
''' |
|
|
|
if self._hold_request_tree is not None: |
|
|
|
self._hold_request_tree.close() |
|
|
|
self._hold_request_tree = None |
|
|
|
|
|
|
|
if self.client is not None and self.client.connected: |
|
|
|
self.client.stop() |
|
|
|
self.client.close() |
|
|
@ -480,6 +559,7 @@ class ZooKeeper(object): |
|
|
|
|
|
|
|
obj = zuul.model.HoldRequest.fromDict(self._strToDict(data)) |
|
|
|
obj.id = hold_request_id |
|
|
|
obj.stat = stat |
|
|
|
return obj |
|
|
|
|
|
|
|
def storeHoldRequest(self, hold_request): |
|
|
|