Add a cache for image lists
For sites with large numbers of image uploads, performing a web API request of /image-list can be time-consuming. To improve response time, maintain a TreeCache of image uploads and use that when listing images. The "image-list" CLI is unable to use this cache, and is already as fast as it can be given the number of ZK requests it needs to issue. The same is true for "dib-image-list", and this change adds a cache for that as well. Finally, since we otherwise would have three or four nearly identical cache implementations, the TreeCache system is refactored into a base class that can be used for all the caches. It has some hook points to deal with the very slight behavior differences. Change-Id: Ibff0a9016936b461eccb1b48dcf42f5ad8d8434e
This commit is contained in:
parent
ad7bf9aaeb
commit
6b299871a4
|
@ -715,7 +715,7 @@ class BuildWorker(BaseWorker):
|
|||
:returns: The updated ImageBuild data structure.
|
||||
'''
|
||||
self._lost_zk_connection = False
|
||||
data = zk.ImageBuild()
|
||||
data = zk.ImageBuild(diskimage.name)
|
||||
data.state = zk.BUILDING
|
||||
data.builder_id = self._builder_id
|
||||
data.builder = self._hostname
|
||||
|
@ -959,7 +959,7 @@ class BuildWorker(BaseWorker):
|
|||
|
||||
build_time = time.monotonic() - start_time
|
||||
|
||||
build_data = zk.ImageBuild()
|
||||
build_data = zk.ImageBuild(diskimage.name)
|
||||
build_data.builder_id = self._builder_id
|
||||
build_data.builder = self._hostname
|
||||
build_data.username = diskimage.username
|
||||
|
|
|
@ -194,19 +194,30 @@ def dib_image_list(zk):
|
|||
("state", "State"),
|
||||
("age", "Age")])
|
||||
objs = []
|
||||
for image_name in zk.getImageNames():
|
||||
paused = zk.getImagePaused(image_name)
|
||||
for build_no in zk.getBuildNumbers(image_name):
|
||||
build = zk.getBuild(image_name, build_no)
|
||||
if build:
|
||||
state = paused and 'paused' or build.state
|
||||
objs.append({'id': '-'.join([image_name, build_no]),
|
||||
'image': image_name,
|
||||
'builder': build.builder,
|
||||
'formats': build.formats,
|
||||
'state': state,
|
||||
'age': int(build.state_time)
|
||||
})
|
||||
builds = []
|
||||
image_paused = {}
|
||||
if zk.enable_cache:
|
||||
builds = zk.getCachedBuilds()
|
||||
for image in zk.getCachedImages():
|
||||
image_paused[image.image_name] = image.paused
|
||||
else:
|
||||
for image_name in zk.getImageNames():
|
||||
image_paused[image_name] = \
|
||||
zk.getImagePaused(image_name)
|
||||
for build_no in zk.getBuildNumbers(image_name):
|
||||
build = zk.getBuild(image_name, build_no)
|
||||
if build:
|
||||
builds.append(build)
|
||||
for build in builds:
|
||||
paused = image_paused.get(build._image_name, False)
|
||||
state = paused and 'paused' or build.state
|
||||
objs.append({'id': '-'.join([build._image_name, build.id]),
|
||||
'image': build._image_name,
|
||||
'builder': build.builder,
|
||||
'formats': build.formats,
|
||||
'state': state,
|
||||
'age': int(build.state_time)
|
||||
})
|
||||
return (objs, headers_table)
|
||||
|
||||
|
||||
|
@ -247,22 +258,31 @@ def image_list(zk):
|
|||
("state", "State"),
|
||||
("age", "Age")])
|
||||
objs = []
|
||||
for image_name in zk.getImageNames():
|
||||
for build_no in zk.getBuildNumbers(image_name):
|
||||
for provider in zk.getBuildProviders(image_name, build_no):
|
||||
for upload_no in zk.getImageUploadNumbers(
|
||||
image_name, build_no, provider):
|
||||
upload = zk.getImageUpload(image_name, build_no,
|
||||
provider, upload_no)
|
||||
if not upload:
|
||||
continue
|
||||
values = [build_no, upload_no, provider, image_name,
|
||||
upload.external_name,
|
||||
upload.external_id,
|
||||
upload.state,
|
||||
int(upload.state_time)]
|
||||
objs.append(dict(zip(headers_table.keys(),
|
||||
values)))
|
||||
uploads = []
|
||||
if zk.enable_cache:
|
||||
uploads = zk.getCachedImageUploads()
|
||||
else:
|
||||
for image_name in zk.getImageNames():
|
||||
for build_no in zk.getBuildNumbers(image_name):
|
||||
for provider in zk.getBuildProviders(image_name, build_no):
|
||||
for upload_no in zk.getImageUploadNumbers(
|
||||
image_name, build_no, provider):
|
||||
upload = zk.getImageUpload(image_name, build_no,
|
||||
provider, upload_no)
|
||||
if upload:
|
||||
uploads.append(upload)
|
||||
|
||||
for upload in uploads:
|
||||
values = [upload.build_id,
|
||||
upload.id,
|
||||
upload.provider_name,
|
||||
upload.image_name,
|
||||
upload.external_name,
|
||||
upload.external_id,
|
||||
upload.state,
|
||||
int(upload.state_time)]
|
||||
objs.append(dict(zip(headers_table.keys(),
|
||||
values)))
|
||||
return (objs, headers_table)
|
||||
|
||||
|
||||
|
@ -278,7 +298,8 @@ def request_list(zk):
|
|||
("event_id", "Event ID"),
|
||||
])
|
||||
objs = []
|
||||
for req in zk.nodeRequestIterator():
|
||||
cached_ids = zk.enable_cache
|
||||
for req in zk.nodeRequestIterator(cached_ids=cached_ids):
|
||||
values = [req.id, req.relative_priority,
|
||||
req.state, req.requestor,
|
||||
req.node_types,
|
||||
|
|
|
@ -185,9 +185,10 @@ class ImageBuild(BaseModel):
|
|||
'''
|
||||
VALID_STATES = set([BUILDING, READY, DELETING, FAILED])
|
||||
|
||||
def __init__(self, build_id=None):
|
||||
def __init__(self, image_name=None, build_id=None):
|
||||
super(ImageBuild, self).__init__(build_id)
|
||||
self._formats = []
|
||||
self._image_name = image_name # Not serialized
|
||||
self.builder = None # Hostname
|
||||
self.builder_id = None # Unique ID
|
||||
self.username = None
|
||||
|
@ -230,7 +231,7 @@ class ImageBuild(BaseModel):
|
|||
return d
|
||||
|
||||
@staticmethod
|
||||
def fromDict(d, o_id=None):
|
||||
def fromDict(d, image_name=None, o_id=None):
|
||||
'''
|
||||
Create an ImageBuild object from a dictionary.
|
||||
|
||||
|
@ -239,7 +240,7 @@ class ImageBuild(BaseModel):
|
|||
|
||||
:returns: An initialized ImageBuild object.
|
||||
'''
|
||||
o = ImageBuild(o_id)
|
||||
o = ImageBuild(image_name, o_id)
|
||||
super(ImageBuild, o).fromDict(d)
|
||||
o.builder = d.get('builder')
|
||||
o.builder_id = d.get('builder_id')
|
||||
|
@ -267,6 +268,23 @@ class ImageBuildRequest(object):
|
|||
return "<ImageBuildRequest {}>".format(self.image_name)
|
||||
|
||||
|
||||
class Image:
|
||||
"""Class representing an image.
|
||||
|
||||
This doesn't need to derive from BaseModel since it is not
|
||||
actually serialized to ZK. It exists to hold some in memory flags
|
||||
and attributes.
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, image_name):
|
||||
self.image_name = image_name
|
||||
self.paused = False
|
||||
|
||||
def updateFromDict(self, data):
|
||||
pass
|
||||
|
||||
|
||||
class ImageUpload(BaseModel):
|
||||
'''
|
||||
Class representing a provider image upload within the ZooKeeper cluster.
|
||||
|
@ -334,14 +352,18 @@ class ImageUpload(BaseModel):
|
|||
'''
|
||||
o = ImageUpload(build_id, provider_name, image_name, upload_id)
|
||||
super(ImageUpload, o).fromDict(d)
|
||||
o.external_id = d.get('external_id')
|
||||
o.external_name = d.get('external_name')
|
||||
o.format = d.get('format')
|
||||
o.username = d.get('username', 'zuul')
|
||||
o.python_path = d.get('python_path', '/usr/bin/python2')
|
||||
o.shell_type = d.get('shell_type')
|
||||
o.updateFromDict(d)
|
||||
return o
|
||||
|
||||
def updateFromDict(self, d):
|
||||
super().fromDict(d)
|
||||
self.external_id = d.get('external_id')
|
||||
self.external_name = d.get('external_name')
|
||||
self.format = d.get('format')
|
||||
self.username = d.get('username', 'zuul')
|
||||
self.python_path = d.get('python_path', '/usr/bin/python2')
|
||||
self.shell_type = d.get('shell_type')
|
||||
|
||||
|
||||
class NodeRequestLockStats(object):
|
||||
'''
|
||||
|
@ -698,6 +720,237 @@ class Node(BaseModel):
|
|||
self.requestor = d.get('requestor')
|
||||
|
||||
|
||||
class NodepoolTreeCache(abc.ABC):
|
||||
'''
|
||||
Use a ZK TreeCache to keep a cache of local Nodepool objects up to date.
|
||||
'''
|
||||
|
||||
log = logging.getLogger("nodepool.zk.ZooKeeper")
|
||||
|
||||
def __init__(self, zk, root):
|
||||
self.zk = zk
|
||||
self.root = root
|
||||
self._cached_objects = {}
|
||||
self._tree_cache = TreeCache(zk.kazoo_client,
|
||||
self.root)
|
||||
self._tree_cache.listen_fault(self.cacheFaultListener)
|
||||
self._tree_cache.listen(self.cacheListener)
|
||||
self._tree_cache.start()
|
||||
|
||||
def cacheFaultListener(self, e):
|
||||
self.log.exception(e)
|
||||
|
||||
def cacheListener(self, event):
|
||||
try:
|
||||
self._cacheListener(event)
|
||||
except Exception:
|
||||
self.log.exception(
|
||||
"Exception in cache update for event: %s",
|
||||
event)
|
||||
|
||||
def _cacheListener(self, event):
|
||||
if hasattr(event.event_data, 'path'):
|
||||
# Ignore root node
|
||||
path = event.event_data.path
|
||||
if path == self.root:
|
||||
return
|
||||
|
||||
# Ignore any non-node related events such as connection events here
|
||||
if event.event_type not in (TreeEvent.NODE_ADDED,
|
||||
TreeEvent.NODE_UPDATED,
|
||||
TreeEvent.NODE_REMOVED):
|
||||
return
|
||||
|
||||
# Some caches have special handling for certain sub-objects
|
||||
if self.preCacheHook(event):
|
||||
return
|
||||
|
||||
path = event.event_data.path
|
||||
key = self.parsePath(path)
|
||||
if key is None:
|
||||
return
|
||||
|
||||
if event.event_type in (TreeEvent.NODE_ADDED, TreeEvent.NODE_UPDATED):
|
||||
# Objects with empty data are invalid so skip add or update these.
|
||||
if event.event_data.data:
|
||||
data = self.zk._bytesToDict(event.event_data.data)
|
||||
else:
|
||||
data = None
|
||||
|
||||
# Perform an in-place update of the cached object if possible
|
||||
old_obj = self._cached_objects.get(key)
|
||||
if old_obj:
|
||||
if event.event_data.stat.version <= old_obj.stat.version:
|
||||
# Don't update to older data
|
||||
return
|
||||
if getattr(old_obj, 'lock', None):
|
||||
# Don't update a locked object
|
||||
return
|
||||
old_obj.updateFromDict(data)
|
||||
old_obj.stat = event.event_data.stat
|
||||
else:
|
||||
obj = self.objectFromDict(data, key)
|
||||
obj.stat = event.event_data.stat
|
||||
self._cached_objects[key] = obj
|
||||
self.postCacheHook(event)
|
||||
elif event.event_type == TreeEvent.NODE_REMOVED:
|
||||
try:
|
||||
del self._cached_objects[key]
|
||||
except KeyError:
|
||||
# If it's already gone, don't care
|
||||
pass
|
||||
self.postCacheHook(event)
|
||||
|
||||
def close(self):
|
||||
self._tree_cache.close()
|
||||
|
||||
# Methods for subclasses:
|
||||
def preCacheHook(self, event):
|
||||
"""Called before the cache is updated
|
||||
|
||||
This is called for any add/update/remove event under the root,
|
||||
even for paths that are ignored, so users must test the
|
||||
relevance of the path in this method.
|
||||
|
||||
If True is returned, then the cache will stop processing the event.
|
||||
Other return values are ignored.
|
||||
"""
|
||||
return None
|
||||
|
||||
def postCacheHook(self, event):
|
||||
"""Called after the cache has been updated"""
|
||||
return None
|
||||
|
||||
@abc.abstractmethod
|
||||
def parsePath(self, path):
|
||||
"""Parse the path and return a cache key
|
||||
|
||||
The cache key is an opaque object ignored by the cache, but
|
||||
must be hashable.
|
||||
|
||||
A convention is to use a tuple of relevant path components as
|
||||
the key.
|
||||
|
||||
Return None to indicate the path is not relevant to the cache.
|
||||
|
||||
"""
|
||||
return None
|
||||
|
||||
@abc.abstractmethod
|
||||
def objectFromDict(self, d, key):
|
||||
"""Construct an object from ZooKeeper data
|
||||
|
||||
Given a dictionary of data from ZK and cache key, construct
|
||||
and return an object to insert into the cache.
|
||||
|
||||
:param dict d: The dictionary.
|
||||
:param object key: The key as returned by parsePath.
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
class ImageCache(NodepoolTreeCache):
|
||||
def parsePath(self, path):
|
||||
r = self.zk._parseImageUploadPath(path)
|
||||
if not r:
|
||||
r = self.zk._parseImageBuildPath(path)
|
||||
if not r:
|
||||
r = self.zk._parseImagePath(path)
|
||||
return r
|
||||
|
||||
def preCacheHook(self, event):
|
||||
key = self.zk._parseImagePausePath(event.event_data.path)
|
||||
if key is None:
|
||||
return
|
||||
# A pause flag is being added or removed
|
||||
# The image key is identical to the image pause path key.
|
||||
image = self._cached_objects.get(key)
|
||||
if not image:
|
||||
return
|
||||
if event.event_type in (TreeEvent.NODE_ADDED,
|
||||
TreeEvent.NODE_UPDATED):
|
||||
image.paused = True
|
||||
elif event.event_type == TreeEvent.NODE_REMOVED:
|
||||
image.paused = False
|
||||
# This event was for a paused path; no further handling necessary
|
||||
return True
|
||||
|
||||
def objectFromDict(self, d, key):
|
||||
if len(key) == 4:
|
||||
image, build_number, provider, upload_number = key
|
||||
return ImageUpload.fromDict(d,
|
||||
build_number,
|
||||
provider,
|
||||
image,
|
||||
upload_number)
|
||||
elif len(key) == 2:
|
||||
image, build_number = key
|
||||
return ImageBuild.fromDict(d,
|
||||
image,
|
||||
build_number)
|
||||
elif len(key) == 1:
|
||||
image = key[0]
|
||||
return Image(image)
|
||||
|
||||
|
||||
class NodeCache(NodepoolTreeCache):
|
||||
def parsePath(self, path):
|
||||
return self.zk._parseNodePath(path)
|
||||
|
||||
def preCacheHook(self, event):
|
||||
key = self.zk._parseNodeLockPath(event.event_data.path)
|
||||
if key is None:
|
||||
return
|
||||
# A lock contender is being added or removed
|
||||
node_id, contender = key
|
||||
# Construct a key for the node object
|
||||
obj_key = (node_id,)
|
||||
node = self._cached_objects.get(obj_key)
|
||||
if not node:
|
||||
return
|
||||
if event.event_type in (TreeEvent.NODE_ADDED,
|
||||
TreeEvent.NODE_UPDATED):
|
||||
node.lock_contenders.add(contender)
|
||||
elif event.event_type == TreeEvent.NODE_REMOVED:
|
||||
node.lock_contenders.discard(contender)
|
||||
# This event was for a lock path; no further handling necessary
|
||||
return True
|
||||
|
||||
def postCacheHook(self, event):
|
||||
# set the stats event so the stats reporting thread can act upon it
|
||||
if self.zk.node_stats_event is not None:
|
||||
self.zk.node_stats_event.set()
|
||||
|
||||
def objectFromDict(self, d, key):
|
||||
node_id = key[0]
|
||||
return Node.fromDict(d, node_id)
|
||||
|
||||
def getNode(self, node_id):
|
||||
return self._cached_objects.get((node_id,))
|
||||
|
||||
def getNodeIds(self):
|
||||
# get a copy of the values view to avoid runtime errors in the event
|
||||
# the _cached_objects dict gets updated while iterating
|
||||
return [x.id for x in list(self._cached_objects.values())]
|
||||
|
||||
|
||||
class RequestCache(NodepoolTreeCache):
|
||||
def parsePath(self, path):
|
||||
return self.zk._parseRequestPath(path)
|
||||
|
||||
def objectFromDict(self, d, key):
|
||||
request_id = key[0]
|
||||
return NodeRequest.fromDict(d, request_id)
|
||||
|
||||
def getNodeRequest(self, request_id):
|
||||
return self._cached_objects.get((request_id,))
|
||||
|
||||
def getNodeRequestIds(self):
|
||||
# get a copy of the values view to avoid runtime errors in the event
|
||||
# the _cached_objects dict gets updated while iterating
|
||||
return [x.id for x in list(self._cached_objects.values())]
|
||||
|
||||
|
||||
class ZooKeeper(ZooKeeperBase):
|
||||
'''
|
||||
Class implementing the ZooKeeper interface.
|
||||
|
@ -734,8 +987,7 @@ class ZooKeeper(ZooKeeperBase):
|
|||
self._last_retry_log = 0
|
||||
self._node_cache = None
|
||||
self._request_cache = None
|
||||
self._cached_nodes = {}
|
||||
self._cached_node_requests = {}
|
||||
self._image_cache = None
|
||||
self.enable_cache = enable_cache
|
||||
self.node_stats_event = None
|
||||
|
||||
|
@ -749,16 +1001,9 @@ class ZooKeeper(ZooKeeperBase):
|
|||
# =======================================================================
|
||||
def _onConnect(self):
|
||||
if self.enable_cache:
|
||||
self._node_cache = TreeCache(self.kazoo_client, self.NODE_ROOT)
|
||||
self._node_cache.listen_fault(self.cacheFaultListener)
|
||||
self._node_cache.listen(self.nodeCacheListener)
|
||||
self._node_cache.start()
|
||||
|
||||
self._request_cache = TreeCache(self.kazoo_client,
|
||||
self.REQUEST_ROOT)
|
||||
self._request_cache.listen_fault(self.cacheFaultListener)
|
||||
self._request_cache.listen(self.requestCacheListener)
|
||||
self._request_cache.start()
|
||||
self._node_cache = NodeCache(self, self.NODE_ROOT)
|
||||
self._request_cache = RequestCache(self, self.REQUEST_ROOT)
|
||||
self._image_cache = ImageCache(self, self.IMAGE_ROOT)
|
||||
|
||||
def _onDisconnect(self):
|
||||
if self._node_cache is not None:
|
||||
|
@ -769,12 +1014,26 @@ class ZooKeeper(ZooKeeperBase):
|
|||
self._request_cache.close()
|
||||
self._request_cache = None
|
||||
|
||||
if self._image_cache is not None:
|
||||
self._image_cache.close()
|
||||
self._image_cache = None
|
||||
|
||||
def _electionPath(self, election):
|
||||
return "%s/%s" % (self.ELECTION_ROOT, election)
|
||||
|
||||
def _imagePath(self, image):
|
||||
return "%s/%s" % (self.IMAGE_ROOT, image)
|
||||
|
||||
def _parseImagePath(self, path):
|
||||
if not path.startswith(self.IMAGE_ROOT):
|
||||
return None
|
||||
path = path[len(self.IMAGE_ROOT):]
|
||||
parts = path.split('/')
|
||||
if len(parts) != 2:
|
||||
return None
|
||||
image = parts[1]
|
||||
return (image,)
|
||||
|
||||
def _imageBuildRequestPath(self, image):
|
||||
return "%s/request-build" % self._imagePath(image)
|
||||
|
||||
|
@ -784,9 +1043,32 @@ class ZooKeeper(ZooKeeperBase):
|
|||
def _imagePausePath(self, image):
|
||||
return "%s/pause" % self._imagePath(image)
|
||||
|
||||
def _parseImagePausePath(self, path):
|
||||
if not path.startswith(self.IMAGE_ROOT):
|
||||
return None
|
||||
path = path[len(self.IMAGE_ROOT):]
|
||||
parts = path.split('/')
|
||||
if len(parts) != 3:
|
||||
return None
|
||||
if parts[2] != 'pause':
|
||||
return None
|
||||
image = parts[1]
|
||||
return (image,)
|
||||
|
||||
def _imageBuildNumberPath(self, image, build_number):
|
||||
return "%s/%s" % (self._imageBuildsPath(image), build_number)
|
||||
|
||||
def _parseImageBuildPath(self, path):
|
||||
if not path.startswith(self.IMAGE_ROOT):
|
||||
return None
|
||||
path = path[len(self.IMAGE_ROOT):]
|
||||
parts = path.split('/')
|
||||
if len(parts) != 4:
|
||||
return None
|
||||
image = parts[1]
|
||||
build = parts[3]
|
||||
return image, build
|
||||
|
||||
def _imageBuildLockPath(self, image):
|
||||
return "%s/lock" % self._imageBuildsPath(image)
|
||||
|
||||
|
@ -802,6 +1084,19 @@ class ZooKeeper(ZooKeeperBase):
|
|||
build_number,
|
||||
provider)
|
||||
|
||||
def _parseImageUploadPath(self, path):
|
||||
if not path.startswith(self.IMAGE_ROOT):
|
||||
return None
|
||||
path = path[len(self.IMAGE_ROOT):]
|
||||
parts = path.split('/')
|
||||
if len(parts) != 8:
|
||||
return None
|
||||
image = parts[1]
|
||||
build = parts[3]
|
||||
provider = parts[5]
|
||||
upload = parts[7]
|
||||
return image, build, provider, upload
|
||||
|
||||
def _imageUploadLockPath(self, image, build_number, provider):
|
||||
return "%s/lock" % self._imageUploadPath(image, build_number,
|
||||
provider)
|
||||
|
@ -818,12 +1113,45 @@ class ZooKeeper(ZooKeeperBase):
|
|||
def _nodePath(self, node):
|
||||
return "%s/%s" % (self.NODE_ROOT, node)
|
||||
|
||||
def _parseNodePath(self, path):
|
||||
if not path.startswith(self.NODE_ROOT):
|
||||
return None
|
||||
path = path[len(self.NODE_ROOT):]
|
||||
parts = path.split('/')
|
||||
if len(parts) != 2:
|
||||
return None
|
||||
node = parts[1]
|
||||
return (node,)
|
||||
|
||||
def _nodeLockPath(self, node):
|
||||
return "%s/%s/lock" % (self.NODE_ROOT, node)
|
||||
|
||||
def _parseNodeLockPath(self, path):
|
||||
if not path.startswith(self.NODE_ROOT):
|
||||
return None
|
||||
path = path[len(self.NODE_ROOT):]
|
||||
parts = path.split('/')
|
||||
if len(parts) != 3:
|
||||
return None
|
||||
if parts[2] != 'lock':
|
||||
return None
|
||||
node = parts[1]
|
||||
contender = parts[3]
|
||||
return (node, contender)
|
||||
|
||||
def _requestPath(self, request):
|
||||
return "%s/%s" % (self.REQUEST_ROOT, request)
|
||||
|
||||
def _parseRequestPath(self, path):
|
||||
if not path.startswith(self.REQUEST_ROOT):
|
||||
return None
|
||||
path = path[len(self.REQUEST_ROOT):]
|
||||
parts = path.split('/')
|
||||
if len(parts) != 2:
|
||||
return None
|
||||
request = parts[1]
|
||||
return (request,)
|
||||
|
||||
def _requestLockPath(self, request):
|
||||
return "%s/%s" % (self.REQUEST_LOCK_ROOT, request)
|
||||
|
||||
|
@ -1085,6 +1413,19 @@ class ZooKeeper(ZooKeeperBase):
|
|||
return []
|
||||
return sorted(images)
|
||||
|
||||
def getCachedImages(self):
|
||||
'''
|
||||
Retrieve all cached images
|
||||
|
||||
:returns: A list of Image objects.
|
||||
'''
|
||||
if not self.enable_cache:
|
||||
raise RuntimeError("Caching not enabled")
|
||||
items = self._image_cache._cached_objects.items()
|
||||
items = sorted(items, key=lambda x: x[0])
|
||||
return [x[1] for x in items
|
||||
if isinstance(x[1], Image)]
|
||||
|
||||
def getImagePaused(self, image):
|
||||
'''
|
||||
Return the pause flag for an image.
|
||||
|
@ -1190,7 +1531,8 @@ class ZooKeeper(ZooKeeperBase):
|
|||
return None
|
||||
|
||||
try:
|
||||
d = ImageBuild.fromDict(self._bytesToDict(data), build_number)
|
||||
d = ImageBuild.fromDict(self._bytesToDict(data),
|
||||
image, build_number)
|
||||
except json.decoder.JSONDecodeError:
|
||||
self.log.exception('Error loading json data from image build %s',
|
||||
path)
|
||||
|
@ -1228,6 +1570,19 @@ class ZooKeeper(ZooKeeperBase):
|
|||
|
||||
return matches
|
||||
|
||||
def getCachedBuilds(self):
|
||||
'''
|
||||
Retrieve all image build data from the cache
|
||||
|
||||
:returns: A list of ImageBuild objects.
|
||||
'''
|
||||
if not self.enable_cache:
|
||||
raise RuntimeError("Caching not enabled")
|
||||
items = self._image_cache._cached_objects.items()
|
||||
items = sorted(items, key=lambda x: x[0])
|
||||
return [x[1] for x in items
|
||||
if isinstance(x[1], ImageBuild)]
|
||||
|
||||
def getMostRecentBuilds(self, count, image, state=None):
|
||||
'''
|
||||
Retrieve the most recent image build data with the given state.
|
||||
|
@ -1422,6 +1777,19 @@ class ZooKeeper(ZooKeeperBase):
|
|||
|
||||
return recent_data
|
||||
|
||||
def getCachedImageUploads(self):
|
||||
'''
|
||||
Retrieve all image upload data from the cache
|
||||
|
||||
:returns: A list of ImageUpload objects.
|
||||
'''
|
||||
if not self.enable_cache:
|
||||
raise RuntimeError("Caching not enabled")
|
||||
items = self._image_cache._cached_objects.items()
|
||||
items = sorted(items, key=lambda x: x[0])
|
||||
return [x[1] for x in items
|
||||
if isinstance(x[1], ImageUpload)]
|
||||
|
||||
def storeImageUpload(self, image, build_number, provider, image_data,
|
||||
upload_number=None):
|
||||
'''
|
||||
|
@ -1662,8 +2030,8 @@ class ZooKeeper(ZooKeeperBase):
|
|||
|
||||
:returns: The request data, or None if the request was not found.
|
||||
'''
|
||||
if cached:
|
||||
d = self._cached_node_requests.get(request)
|
||||
if cached and self._request_cache:
|
||||
d = self._request_cache.getNodeRequest(request)
|
||||
if d:
|
||||
return d
|
||||
|
||||
|
@ -1901,8 +2269,8 @@ class ZooKeeper(ZooKeeperBase):
|
|||
|
||||
:returns: The node data, or None if the node was not found.
|
||||
'''
|
||||
if cached:
|
||||
d = self._cached_nodes.get(node)
|
||||
if cached and self._node_cache:
|
||||
d = self._node_cache.getNode(node)
|
||||
if d:
|
||||
return d
|
||||
|
||||
|
@ -2126,9 +2494,7 @@ class ZooKeeper(ZooKeeperBase):
|
|||
'''
|
||||
|
||||
if cached_ids:
|
||||
# get a copy of the keys view to avoid runtime errors in the event
|
||||
# the _cached_nodes dict gets updated while iterating
|
||||
node_ids = list(self._cached_nodes.keys())
|
||||
node_ids = self._node_cache.getNodeIds()
|
||||
else:
|
||||
node_ids = self.getNodes()
|
||||
|
||||
|
@ -2146,11 +2512,20 @@ class ZooKeeper(ZooKeeperBase):
|
|||
if lock_stats:
|
||||
yield lock_stats
|
||||
|
||||
def nodeRequestIterator(self, cached=True):
|
||||
def nodeRequestIterator(self, cached=True, cached_ids=False):
|
||||
'''
|
||||
Utility generator method for iterating through all nodes requests.
|
||||
|
||||
:param bool cached: True if the data should be taken from the cache.
|
||||
:param bool cached_ids: True if the request IDs should be taken from
|
||||
the cache.
|
||||
'''
|
||||
for req_id in self.getNodeRequests():
|
||||
if cached_ids:
|
||||
req_ids = self._request_cache.getNodeRequestIds()
|
||||
else:
|
||||
req_ids = self.getNodeRequests()
|
||||
|
||||
for req_id in req_ids:
|
||||
req = self.getNodeRequest(req_id, cached=cached)
|
||||
if req:
|
||||
yield req
|
||||
|
@ -2255,147 +2630,9 @@ class ZooKeeper(ZooKeeperBase):
|
|||
for node in provider_nodes:
|
||||
self.deleteNode(node)
|
||||
|
||||
def cacheFaultListener(self, e):
|
||||
self.log.exception(e)
|
||||
|
||||
def nodeCacheListener(self, event):
|
||||
try:
|
||||
self._nodeCacheListener(event)
|
||||
except Exception:
|
||||
self.log.exception("Exception in node cache update for event: %s",
|
||||
event)
|
||||
|
||||
def _nodeCacheListener(self, event):
|
||||
if hasattr(event.event_data, 'path'):
|
||||
# Ignore root node
|
||||
path = event.event_data.path
|
||||
if path == self.NODE_ROOT:
|
||||
return
|
||||
|
||||
if path.endswith('/lock'):
|
||||
return
|
||||
|
||||
# Ignore any non-node related events such as connection events here
|
||||
if event.event_type not in (TreeEvent.NODE_ADDED,
|
||||
TreeEvent.NODE_UPDATED,
|
||||
TreeEvent.NODE_REMOVED):
|
||||
return
|
||||
|
||||
path = event.event_data.path
|
||||
node_path = path[len(self.NODE_ROOT) + 1:]
|
||||
parts = node_path.split('/')
|
||||
node_id = parts[0]
|
||||
if len(parts) > 1 and parts[1] == 'lock':
|
||||
if len(parts) > 2:
|
||||
# A lock contender is being added or removed
|
||||
contender = parts[2]
|
||||
old_node = self._cached_nodes.get(node_id)
|
||||
if not old_node:
|
||||
return
|
||||
if event.event_type in (TreeEvent.NODE_ADDED,
|
||||
TreeEvent.NODE_UPDATED):
|
||||
old_node.lock_contenders.add(contender)
|
||||
elif event.event_type == TreeEvent.NODE_REMOVED:
|
||||
old_node.lock_contenders.discard(contender)
|
||||
# This event was for a lock path; no further handling necessary
|
||||
return
|
||||
|
||||
if event.event_type in (TreeEvent.NODE_ADDED, TreeEvent.NODE_UPDATED):
|
||||
# Nodes with empty data are invalid so skip add or update these.
|
||||
if not event.event_data.data:
|
||||
return
|
||||
|
||||
# Perform an in-place update of the already cached node if possible
|
||||
d = self._bytesToDict(event.event_data.data)
|
||||
old_node = self._cached_nodes.get(node_id)
|
||||
if old_node:
|
||||
if event.event_data.stat.version <= old_node.stat.version:
|
||||
# Don't update to older data
|
||||
return
|
||||
if old_node.lock:
|
||||
# Don't update a locked node
|
||||
return
|
||||
old_node.updateFromDict(d)
|
||||
old_node.stat = event.event_data.stat
|
||||
else:
|
||||
node = Node.fromDict(d, node_id)
|
||||
node.stat = event.event_data.stat
|
||||
self._cached_nodes[node_id] = node
|
||||
|
||||
# set the stats event so the stats reporting thread can act upon it
|
||||
if self.node_stats_event is not None:
|
||||
self.node_stats_event.set()
|
||||
elif event.event_type == TreeEvent.NODE_REMOVED:
|
||||
try:
|
||||
del self._cached_nodes[node_id]
|
||||
except KeyError:
|
||||
# If it's already gone, don't care
|
||||
pass
|
||||
|
||||
# set the stats event so the stats reporting thread can act upon it
|
||||
if self.node_stats_event is not None:
|
||||
self.node_stats_event.set()
|
||||
|
||||
def setNodeStatsEvent(self, event):
|
||||
self.node_stats_event = event
|
||||
|
||||
def requestCacheListener(self, event):
|
||||
try:
|
||||
self._requestCacheListener(event)
|
||||
except Exception:
|
||||
self.log.exception(
|
||||
"Exception in request cache update for event: %s",
|
||||
event)
|
||||
|
||||
def _requestCacheListener(self, event):
|
||||
if hasattr(event.event_data, 'path'):
|
||||
# Ignore root node
|
||||
path = event.event_data.path
|
||||
if path == self.REQUEST_ROOT:
|
||||
return
|
||||
|
||||
# Ignore lock nodes
|
||||
if '/lock' in path:
|
||||
return
|
||||
|
||||
# Ignore any non-node related events such as connection events here
|
||||
if event.event_type not in (TreeEvent.NODE_ADDED,
|
||||
TreeEvent.NODE_UPDATED,
|
||||
TreeEvent.NODE_REMOVED):
|
||||
return
|
||||
|
||||
path = event.event_data.path
|
||||
request_id = path.rsplit('/', 1)[1]
|
||||
|
||||
if event.event_type in (TreeEvent.NODE_ADDED, TreeEvent.NODE_UPDATED):
|
||||
# Requests with empty data are invalid so skip add or update these.
|
||||
if not event.event_data.data:
|
||||
return
|
||||
|
||||
# Perform an in-place update of the cached request if possible
|
||||
d = self._bytesToDict(event.event_data.data)
|
||||
old_request = self._cached_node_requests.get(request_id)
|
||||
if old_request:
|
||||
if event.event_data.stat.version <= old_request.stat.version:
|
||||
# Don't update to older data
|
||||
return
|
||||
if old_request.lock:
|
||||
# Don't update a locked node request
|
||||
return
|
||||
old_request.updateFromDict(d)
|
||||
old_request.stat = event.event_data.stat
|
||||
else:
|
||||
request = NodeRequest.fromDict(d, request_id)
|
||||
request.stat = event.event_data.stat
|
||||
self._cached_node_requests[request_id] = request
|
||||
|
||||
elif event.event_type == TreeEvent.NODE_REMOVED:
|
||||
try:
|
||||
del self._cached_node_requests[request_id]
|
||||
except KeyError:
|
||||
# If it's already gone, don't care
|
||||
pass
|
||||
|
||||
def getStatsElection(self, identifier):
|
||||
path = self._electionPath('stats')
|
||||
return Election(self.kazoo_client, path, identifier)
|
||||
|
|
Loading…
Reference in New Issue