Merge "Add a cache for image lists"
This commit is contained in:
commit
38177b0a2f
|
@ -715,7 +715,7 @@ class BuildWorker(BaseWorker):
|
|||
:returns: The updated ImageBuild data structure.
|
||||
'''
|
||||
self._lost_zk_connection = False
|
||||
data = zk.ImageBuild()
|
||||
data = zk.ImageBuild(diskimage.name)
|
||||
data.state = zk.BUILDING
|
||||
data.builder_id = self._builder_id
|
||||
data.builder = self._hostname
|
||||
|
@ -959,7 +959,7 @@ class BuildWorker(BaseWorker):
|
|||
|
||||
build_time = time.monotonic() - start_time
|
||||
|
||||
build_data = zk.ImageBuild()
|
||||
build_data = zk.ImageBuild(diskimage.name)
|
||||
build_data.builder_id = self._builder_id
|
||||
build_data.builder = self._hostname
|
||||
build_data.username = diskimage.username
|
||||
|
|
|
@ -194,19 +194,30 @@ def dib_image_list(zk):
|
|||
("state", "State"),
|
||||
("age", "Age")])
|
||||
objs = []
|
||||
for image_name in zk.getImageNames():
|
||||
paused = zk.getImagePaused(image_name)
|
||||
for build_no in zk.getBuildNumbers(image_name):
|
||||
build = zk.getBuild(image_name, build_no)
|
||||
if build:
|
||||
state = paused and 'paused' or build.state
|
||||
objs.append({'id': '-'.join([image_name, build_no]),
|
||||
'image': image_name,
|
||||
'builder': build.builder,
|
||||
'formats': build.formats,
|
||||
'state': state,
|
||||
'age': int(build.state_time)
|
||||
})
|
||||
builds = []
|
||||
image_paused = {}
|
||||
if zk.enable_cache:
|
||||
builds = zk.getCachedBuilds()
|
||||
for image in zk.getCachedImages():
|
||||
image_paused[image.image_name] = image.paused
|
||||
else:
|
||||
for image_name in zk.getImageNames():
|
||||
image_paused[image_name] = \
|
||||
zk.getImagePaused(image_name)
|
||||
for build_no in zk.getBuildNumbers(image_name):
|
||||
build = zk.getBuild(image_name, build_no)
|
||||
if build:
|
||||
builds.append(build)
|
||||
for build in builds:
|
||||
paused = image_paused.get(build._image_name, False)
|
||||
state = paused and 'paused' or build.state
|
||||
objs.append({'id': '-'.join([build._image_name, build.id]),
|
||||
'image': build._image_name,
|
||||
'builder': build.builder,
|
||||
'formats': build.formats,
|
||||
'state': state,
|
||||
'age': int(build.state_time)
|
||||
})
|
||||
return (objs, headers_table)
|
||||
|
||||
|
||||
|
@ -247,22 +258,31 @@ def image_list(zk):
|
|||
("state", "State"),
|
||||
("age", "Age")])
|
||||
objs = []
|
||||
for image_name in zk.getImageNames():
|
||||
for build_no in zk.getBuildNumbers(image_name):
|
||||
for provider in zk.getBuildProviders(image_name, build_no):
|
||||
for upload_no in zk.getImageUploadNumbers(
|
||||
image_name, build_no, provider):
|
||||
upload = zk.getImageUpload(image_name, build_no,
|
||||
provider, upload_no)
|
||||
if not upload:
|
||||
continue
|
||||
values = [build_no, upload_no, provider, image_name,
|
||||
upload.external_name,
|
||||
upload.external_id,
|
||||
upload.state,
|
||||
int(upload.state_time)]
|
||||
objs.append(dict(zip(headers_table.keys(),
|
||||
values)))
|
||||
uploads = []
|
||||
if zk.enable_cache:
|
||||
uploads = zk.getCachedImageUploads()
|
||||
else:
|
||||
for image_name in zk.getImageNames():
|
||||
for build_no in zk.getBuildNumbers(image_name):
|
||||
for provider in zk.getBuildProviders(image_name, build_no):
|
||||
for upload_no in zk.getImageUploadNumbers(
|
||||
image_name, build_no, provider):
|
||||
upload = zk.getImageUpload(image_name, build_no,
|
||||
provider, upload_no)
|
||||
if upload:
|
||||
uploads.append(upload)
|
||||
|
||||
for upload in uploads:
|
||||
values = [upload.build_id,
|
||||
upload.id,
|
||||
upload.provider_name,
|
||||
upload.image_name,
|
||||
upload.external_name,
|
||||
upload.external_id,
|
||||
upload.state,
|
||||
int(upload.state_time)]
|
||||
objs.append(dict(zip(headers_table.keys(),
|
||||
values)))
|
||||
return (objs, headers_table)
|
||||
|
||||
|
||||
|
@ -278,7 +298,8 @@ def request_list(zk):
|
|||
("event_id", "Event ID"),
|
||||
])
|
||||
objs = []
|
||||
for req in zk.nodeRequestIterator():
|
||||
cached_ids = zk.enable_cache
|
||||
for req in zk.nodeRequestIterator(cached_ids=cached_ids):
|
||||
values = [req.id, req.relative_priority,
|
||||
req.state, req.requestor,
|
||||
req.node_types,
|
||||
|
|
|
@ -269,6 +269,12 @@ class TestWebApp(tests.DBTestCase):
|
|||
req.requestor = 'test_request_list'
|
||||
self.zk.storeNodeRequest(req)
|
||||
|
||||
webzk = webapp.nodepool.getZK()
|
||||
for _ in iterate_timeout(30, Exception, 'cache update'):
|
||||
reqs = webzk._request_cache.getNodeRequestIds()
|
||||
if req.id in reqs:
|
||||
break
|
||||
|
||||
http_req = request.Request(
|
||||
"http://localhost:%s/request-list" % port)
|
||||
http_req.add_header('Accept', 'application/json')
|
||||
|
|
|
@ -933,7 +933,7 @@ class TestZKModel(tests.BaseTestCase):
|
|||
'state_time': now
|
||||
}
|
||||
|
||||
o = zk.ImageBuild.fromDict(d, d_id)
|
||||
o = zk.ImageBuild.fromDict(d, 'image_name', d_id)
|
||||
self.assertEqual(o.id, d_id)
|
||||
self.assertEqual(o.state, d['state'])
|
||||
self.assertEqual(o.state_time, d['state_time'])
|
||||
|
|
|
@ -185,9 +185,10 @@ class ImageBuild(BaseModel):
|
|||
'''
|
||||
VALID_STATES = set([BUILDING, READY, DELETING, FAILED])
|
||||
|
||||
def __init__(self, build_id=None):
|
||||
def __init__(self, image_name=None, build_id=None):
|
||||
super(ImageBuild, self).__init__(build_id)
|
||||
self._formats = []
|
||||
self._image_name = image_name # Not serialized
|
||||
self.builder = None # Hostname
|
||||
self.builder_id = None # Unique ID
|
||||
self.username = None
|
||||
|
@ -230,7 +231,7 @@ class ImageBuild(BaseModel):
|
|||
return d
|
||||
|
||||
@staticmethod
|
||||
def fromDict(d, o_id=None):
|
||||
def fromDict(d, image_name=None, o_id=None):
|
||||
'''
|
||||
Create an ImageBuild object from a dictionary.
|
||||
|
||||
|
@ -239,7 +240,7 @@ class ImageBuild(BaseModel):
|
|||
|
||||
:returns: An initialized ImageBuild object.
|
||||
'''
|
||||
o = ImageBuild(o_id)
|
||||
o = ImageBuild(image_name, o_id)
|
||||
super(ImageBuild, o).fromDict(d)
|
||||
o.builder = d.get('builder')
|
||||
o.builder_id = d.get('builder_id')
|
||||
|
@ -267,6 +268,23 @@ class ImageBuildRequest(object):
|
|||
return "<ImageBuildRequest {}>".format(self.image_name)
|
||||
|
||||
|
||||
class Image:
|
||||
"""Class representing an image.
|
||||
|
||||
This doesn't need to derive from BaseModel since it is not
|
||||
actually serialized to ZK. It exists to hold some in memory flags
|
||||
and attributes.
|
||||
|
||||
"""
|
||||
|
||||
def __init__(self, image_name):
|
||||
self.image_name = image_name
|
||||
self.paused = False
|
||||
|
||||
def updateFromDict(self, data):
|
||||
pass
|
||||
|
||||
|
||||
class ImageUpload(BaseModel):
|
||||
'''
|
||||
Class representing a provider image upload within the ZooKeeper cluster.
|
||||
|
@ -334,14 +352,18 @@ class ImageUpload(BaseModel):
|
|||
'''
|
||||
o = ImageUpload(build_id, provider_name, image_name, upload_id)
|
||||
super(ImageUpload, o).fromDict(d)
|
||||
o.external_id = d.get('external_id')
|
||||
o.external_name = d.get('external_name')
|
||||
o.format = d.get('format')
|
||||
o.username = d.get('username', 'zuul')
|
||||
o.python_path = d.get('python_path', '/usr/bin/python2')
|
||||
o.shell_type = d.get('shell_type')
|
||||
o.updateFromDict(d)
|
||||
return o
|
||||
|
||||
def updateFromDict(self, d):
|
||||
super().fromDict(d)
|
||||
self.external_id = d.get('external_id')
|
||||
self.external_name = d.get('external_name')
|
||||
self.format = d.get('format')
|
||||
self.username = d.get('username', 'zuul')
|
||||
self.python_path = d.get('python_path', '/usr/bin/python2')
|
||||
self.shell_type = d.get('shell_type')
|
||||
|
||||
|
||||
class NodeRequestLockStats(object):
|
||||
'''
|
||||
|
@ -698,6 +720,237 @@ class Node(BaseModel):
|
|||
self.requestor = d.get('requestor')
|
||||
|
||||
|
||||
class NodepoolTreeCache(abc.ABC):
|
||||
'''
|
||||
Use a ZK TreeCache to keep a cache of local Nodepool objects up to date.
|
||||
'''
|
||||
|
||||
log = logging.getLogger("nodepool.zk.ZooKeeper")
|
||||
|
||||
def __init__(self, zk, root):
|
||||
self.zk = zk
|
||||
self.root = root
|
||||
self._cached_objects = {}
|
||||
self._tree_cache = TreeCache(zk.kazoo_client,
|
||||
self.root)
|
||||
self._tree_cache.listen_fault(self.cacheFaultListener)
|
||||
self._tree_cache.listen(self.cacheListener)
|
||||
self._tree_cache.start()
|
||||
|
||||
def cacheFaultListener(self, e):
|
||||
self.log.exception(e)
|
||||
|
||||
def cacheListener(self, event):
|
||||
try:
|
||||
self._cacheListener(event)
|
||||
except Exception:
|
||||
self.log.exception(
|
||||
"Exception in cache update for event: %s",
|
||||
event)
|
||||
|
||||
def _cacheListener(self, event):
|
||||
if hasattr(event.event_data, 'path'):
|
||||
# Ignore root node
|
||||
path = event.event_data.path
|
||||
if path == self.root:
|
||||
return
|
||||
|
||||
# Ignore any non-node related events such as connection events here
|
||||
if event.event_type not in (TreeEvent.NODE_ADDED,
|
||||
TreeEvent.NODE_UPDATED,
|
||||
TreeEvent.NODE_REMOVED):
|
||||
return
|
||||
|
||||
# Some caches have special handling for certain sub-objects
|
||||
if self.preCacheHook(event):
|
||||
return
|
||||
|
||||
path = event.event_data.path
|
||||
key = self.parsePath(path)
|
||||
if key is None:
|
||||
return
|
||||
|
||||
if event.event_type in (TreeEvent.NODE_ADDED, TreeEvent.NODE_UPDATED):
|
||||
# Images with empty data are invalid so skip add or update these.
|
||||
if event.event_data.data:
|
||||
data = self.zk._bytesToDict(event.event_data.data)
|
||||
else:
|
||||
data = None
|
||||
|
||||
# Perform an in-place update of the cached image if possible
|
||||
old_obj = self._cached_objects.get(key)
|
||||
if old_obj:
|
||||
if event.event_data.stat.version <= old_obj.stat.version:
|
||||
# Don't update to older data
|
||||
return
|
||||
if getattr(old_obj, 'lock', None):
|
||||
# Don't update a locked object
|
||||
return
|
||||
old_obj.updateFromDict(data)
|
||||
old_obj.stat = event.event_data.stat
|
||||
else:
|
||||
obj = self.objectFromDict(data, key)
|
||||
obj.stat = event.event_data.stat
|
||||
self._cached_objects[key] = obj
|
||||
self.postCacheHook(event)
|
||||
elif event.event_type == TreeEvent.NODE_REMOVED:
|
||||
try:
|
||||
del self._cached_objects[key]
|
||||
except KeyError:
|
||||
# If it's already gone, don't care
|
||||
pass
|
||||
self.postCacheHook(event)
|
||||
|
||||
def close(self):
|
||||
self._tree_cache.close()
|
||||
|
||||
# Methods for subclasses:
|
||||
def preCacheHook(self, event):
|
||||
"""Called before the cache is updated
|
||||
|
||||
This is called for any add/update/remove event under the root,
|
||||
even for paths that are ignored, so users much test the
|
||||
relevance of the path in this method.
|
||||
|
||||
If True is returned, then the cache will stop processing the event.
|
||||
Other return values are ignored.
|
||||
"""
|
||||
return None
|
||||
|
||||
def postCacheHook(self, event):
|
||||
"""Called after the cache has been updated"""
|
||||
return None
|
||||
|
||||
@abc.abstractmethod
|
||||
def parsePath(self, path):
|
||||
"""Parse the path and return a cache key
|
||||
|
||||
The cache key is an opaque object ignored by the cache, but
|
||||
must be hashable.
|
||||
|
||||
A convention is to use a tuple of relevant path components as
|
||||
the key.
|
||||
|
||||
Return None to indicate the path is not relevant to the cache.
|
||||
|
||||
"""
|
||||
return None
|
||||
|
||||
@abc.abstractmethod
|
||||
def objectFromDict(self, d, key):
|
||||
"""Construct an object from ZooKeeper data
|
||||
|
||||
Given a dictionary of data from ZK and cache key, construct
|
||||
and return an object to insert into the cache.
|
||||
|
||||
:param dict d: The dictionary.
|
||||
:param object key: The key as returned by parsePath.
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
class ImageCache(NodepoolTreeCache):
|
||||
def parsePath(self, path):
|
||||
r = self.zk._parseImageUploadPath(path)
|
||||
if not r:
|
||||
r = self.zk._parseImageBuildPath(path)
|
||||
if not r:
|
||||
r = self.zk._parseImagePath(path)
|
||||
return r
|
||||
|
||||
def preCacheHook(self, event):
|
||||
key = self.zk._parseImagePausePath(event.event_data.path)
|
||||
if key is None:
|
||||
return
|
||||
# A pause flag is being added or removed
|
||||
# The image key is identical to the image pause path key.
|
||||
image = self._cached_objects.get(key)
|
||||
if not image:
|
||||
return
|
||||
if event.event_type in (TreeEvent.NODE_ADDED,
|
||||
TreeEvent.NODE_UPDATED):
|
||||
image.paused = True
|
||||
elif event.event_type == TreeEvent.NODE_REMOVED:
|
||||
image.paused = False
|
||||
# This event was for a paused path; no further handling necessary
|
||||
return True
|
||||
|
||||
def objectFromDict(self, d, key):
|
||||
if len(key) == 4:
|
||||
image, build_number, provider, upload_number = key
|
||||
return ImageUpload.fromDict(d,
|
||||
build_number,
|
||||
provider,
|
||||
image,
|
||||
upload_number)
|
||||
elif len(key) == 2:
|
||||
image, build_number = key
|
||||
return ImageBuild.fromDict(d,
|
||||
image,
|
||||
build_number)
|
||||
elif len(key) == 1:
|
||||
image = key[0]
|
||||
return Image(image)
|
||||
|
||||
|
||||
class NodeCache(NodepoolTreeCache):
|
||||
def parsePath(self, path):
|
||||
return self.zk._parseNodePath(path)
|
||||
|
||||
def preCacheHook(self, event):
|
||||
key = self.zk._parseNodeLockPath(event.event_data.path)
|
||||
if key is None:
|
||||
return
|
||||
# A lock contender is being added or removed
|
||||
node_id, contender = key
|
||||
# Construct a key for the node object
|
||||
obj_key = (node_id,)
|
||||
node = self._cached_objects.get(obj_key)
|
||||
if not node:
|
||||
return
|
||||
if event.event_type in (TreeEvent.NODE_ADDED,
|
||||
TreeEvent.NODE_UPDATED):
|
||||
node.lock_contenders.add(contender)
|
||||
elif event.event_type == TreeEvent.NODE_REMOVED:
|
||||
node.lock_contenders.discard(contender)
|
||||
# This event was for a lock path; no further handling necessary
|
||||
return True
|
||||
|
||||
def postCacheHook(self, event):
|
||||
# set the stats event so the stats reporting thread can act upon it
|
||||
if self.zk.node_stats_event is not None:
|
||||
self.zk.node_stats_event.set()
|
||||
|
||||
def objectFromDict(self, d, key):
|
||||
node_id = key[0]
|
||||
return Node.fromDict(d, node_id)
|
||||
|
||||
def getNode(self, node_id):
|
||||
return self._cached_objects.get((node_id,))
|
||||
|
||||
def getNodeIds(self):
|
||||
# get a copy of the values view to avoid runtime errors in the event
|
||||
# the _cached_nodes dict gets updated while iterating
|
||||
return [x.id for x in list(self._cached_objects.values())]
|
||||
|
||||
|
||||
class RequestCache(NodepoolTreeCache):
|
||||
def parsePath(self, path):
|
||||
return self.zk._parseRequestPath(path)
|
||||
|
||||
def objectFromDict(self, d, key):
|
||||
request_id = key[0]
|
||||
return NodeRequest.fromDict(d, request_id)
|
||||
|
||||
def getNodeRequest(self, request_id):
|
||||
return self._cached_objects.get((request_id,))
|
||||
|
||||
def getNodeRequestIds(self):
|
||||
# get a copy of the values view to avoid runtime errors in the event
|
||||
# the _cached_nodes dict gets updated while iterating
|
||||
return [x.id for x in list(self._cached_objects.values())]
|
||||
|
||||
|
||||
class ZooKeeper(ZooKeeperBase):
|
||||
'''
|
||||
Class implementing the ZooKeeper interface.
|
||||
|
@ -734,8 +987,7 @@ class ZooKeeper(ZooKeeperBase):
|
|||
self._last_retry_log = 0
|
||||
self._node_cache = None
|
||||
self._request_cache = None
|
||||
self._cached_nodes = {}
|
||||
self._cached_node_requests = {}
|
||||
self._image_cache = None
|
||||
self.enable_cache = enable_cache
|
||||
self.node_stats_event = None
|
||||
|
||||
|
@ -749,16 +1001,9 @@ class ZooKeeper(ZooKeeperBase):
|
|||
# =======================================================================
|
||||
def _onConnect(self):
|
||||
if self.enable_cache:
|
||||
self._node_cache = TreeCache(self.kazoo_client, self.NODE_ROOT)
|
||||
self._node_cache.listen_fault(self.cacheFaultListener)
|
||||
self._node_cache.listen(self.nodeCacheListener)
|
||||
self._node_cache.start()
|
||||
|
||||
self._request_cache = TreeCache(self.kazoo_client,
|
||||
self.REQUEST_ROOT)
|
||||
self._request_cache.listen_fault(self.cacheFaultListener)
|
||||
self._request_cache.listen(self.requestCacheListener)
|
||||
self._request_cache.start()
|
||||
self._node_cache = NodeCache(self, self.NODE_ROOT)
|
||||
self._request_cache = RequestCache(self, self.REQUEST_ROOT)
|
||||
self._image_cache = ImageCache(self, self.IMAGE_ROOT)
|
||||
|
||||
def _onDisconnect(self):
|
||||
if self._node_cache is not None:
|
||||
|
@ -769,12 +1014,28 @@ class ZooKeeper(ZooKeeperBase):
|
|||
self._request_cache.close()
|
||||
self._request_cache = None
|
||||
|
||||
if self._image_cache is not None:
|
||||
self._image_cache.close()
|
||||
self._image_cache = None
|
||||
|
||||
def _electionPath(self, election):
|
||||
return "%s/%s" % (self.ELECTION_ROOT, election)
|
||||
|
||||
def _imagePath(self, image):
|
||||
return "%s/%s" % (self.IMAGE_ROOT, image)
|
||||
|
||||
def _parseImagePath(self, path):
|
||||
if not path.startswith(self.IMAGE_ROOT):
|
||||
return None
|
||||
path = path[len(self.IMAGE_ROOT):]
|
||||
parts = path.split('/')
|
||||
if len(parts) != 2:
|
||||
return None
|
||||
if parts[-1] == 'lock':
|
||||
return
|
||||
image = parts[1]
|
||||
return (image,)
|
||||
|
||||
def _imageBuildRequestPath(self, image):
|
||||
return "%s/request-build" % self._imagePath(image)
|
||||
|
||||
|
@ -784,9 +1045,34 @@ class ZooKeeper(ZooKeeperBase):
|
|||
def _imagePausePath(self, image):
|
||||
return "%s/pause" % self._imagePath(image)
|
||||
|
||||
def _parseImagePausePath(self, path):
|
||||
if not path.startswith(self.IMAGE_ROOT):
|
||||
return None
|
||||
path = path[len(self.IMAGE_ROOT):]
|
||||
parts = path.split('/')
|
||||
if len(parts) != 3:
|
||||
return None
|
||||
if parts[2] != 'pause':
|
||||
return None
|
||||
image = parts[1]
|
||||
return (image,)
|
||||
|
||||
def _imageBuildNumberPath(self, image, build_number):
|
||||
return "%s/%s" % (self._imageBuildsPath(image), build_number)
|
||||
|
||||
def _parseImageBuildPath(self, path):
|
||||
if not path.startswith(self.IMAGE_ROOT):
|
||||
return None
|
||||
path = path[len(self.IMAGE_ROOT):]
|
||||
parts = path.split('/')
|
||||
if len(parts) != 4:
|
||||
return None
|
||||
if parts[-1] == 'lock':
|
||||
return
|
||||
image = parts[1]
|
||||
build = parts[3]
|
||||
return image, build
|
||||
|
||||
def _imageBuildLockPath(self, image):
|
||||
return "%s/lock" % self._imageBuildsPath(image)
|
||||
|
||||
|
@ -802,6 +1088,21 @@ class ZooKeeper(ZooKeeperBase):
|
|||
build_number,
|
||||
provider)
|
||||
|
||||
def _parseImageUploadPath(self, path):
|
||||
if not path.startswith(self.IMAGE_ROOT):
|
||||
return None
|
||||
path = path[len(self.IMAGE_ROOT):]
|
||||
parts = path.split('/')
|
||||
if len(parts) != 8:
|
||||
return None
|
||||
if parts[-1] == 'lock':
|
||||
return
|
||||
image = parts[1]
|
||||
build = parts[3]
|
||||
provider = parts[5]
|
||||
upload = parts[7]
|
||||
return image, build, provider, upload
|
||||
|
||||
def _imageUploadLockPath(self, image, build_number, provider):
|
||||
return "%s/lock" % self._imageUploadPath(image, build_number,
|
||||
provider)
|
||||
|
@ -818,12 +1119,49 @@ class ZooKeeper(ZooKeeperBase):
|
|||
def _nodePath(self, node):
|
||||
return "%s/%s" % (self.NODE_ROOT, node)
|
||||
|
||||
def _parseNodePath(self, path):
|
||||
if not path.startswith(self.NODE_ROOT):
|
||||
return None
|
||||
path = path[len(self.NODE_ROOT):]
|
||||
parts = path.split('/')
|
||||
if len(parts) != 2:
|
||||
return None
|
||||
if parts[-1] == 'lock':
|
||||
return
|
||||
node = parts[1]
|
||||
return (node,)
|
||||
|
||||
def _nodeLockPath(self, node):
|
||||
return "%s/%s/lock" % (self.NODE_ROOT, node)
|
||||
|
||||
def _parseNodeLockPath(self, path):
|
||||
if not path.startswith(self.NODE_ROOT):
|
||||
return None
|
||||
path = path[len(self.NODE_ROOT):]
|
||||
parts = path.split('/')
|
||||
if len(parts) != 4:
|
||||
return None
|
||||
if parts[2] != 'lock':
|
||||
return None
|
||||
node = parts[1]
|
||||
contender = parts[3]
|
||||
return (node, contender)
|
||||
|
||||
def _requestPath(self, request):
|
||||
return "%s/%s" % (self.REQUEST_ROOT, request)
|
||||
|
||||
def _parseRequestPath(self, path):
|
||||
if not path.startswith(self.REQUEST_ROOT):
|
||||
return None
|
||||
path = path[len(self.REQUEST_ROOT):]
|
||||
parts = path.split('/')
|
||||
if len(parts) != 2:
|
||||
return None
|
||||
if parts[-1] == 'lock':
|
||||
return
|
||||
request = parts[1]
|
||||
return (request,)
|
||||
|
||||
def _requestLockPath(self, request):
|
||||
return "%s/%s" % (self.REQUEST_LOCK_ROOT, request)
|
||||
|
||||
|
@ -1085,6 +1423,19 @@ class ZooKeeper(ZooKeeperBase):
|
|||
return []
|
||||
return sorted(images)
|
||||
|
||||
def getCachedImages(self):
|
||||
'''
|
||||
Retrieve all cached images
|
||||
|
||||
:returns: A list of Image objects.
|
||||
'''
|
||||
if not self.enable_cache:
|
||||
raise RuntimeError("Caching not enabled")
|
||||
items = self._image_cache._cached_objects.items()
|
||||
items = sorted(items, key=lambda x: x[0])
|
||||
return [x[1] for x in items
|
||||
if isinstance(x[1], Image)]
|
||||
|
||||
def getImagePaused(self, image):
|
||||
'''
|
||||
Return the pause flag for an image.
|
||||
|
@ -1190,7 +1541,8 @@ class ZooKeeper(ZooKeeperBase):
|
|||
return None
|
||||
|
||||
try:
|
||||
d = ImageBuild.fromDict(self._bytesToDict(data), build_number)
|
||||
d = ImageBuild.fromDict(self._bytesToDict(data),
|
||||
image, build_number)
|
||||
except json.decoder.JSONDecodeError:
|
||||
self.log.exception('Error loading json data from image build %s',
|
||||
path)
|
||||
|
@ -1228,6 +1580,19 @@ class ZooKeeper(ZooKeeperBase):
|
|||
|
||||
return matches
|
||||
|
||||
def getCachedBuilds(self):
|
||||
'''
|
||||
Retrieve all image build data from the cache
|
||||
|
||||
:returns: A list of ImageBuild objects.
|
||||
'''
|
||||
if not self.enable_cache:
|
||||
raise RuntimeError("Caching not enabled")
|
||||
items = self._image_cache._cached_objects.items()
|
||||
items = sorted(items, key=lambda x: x[0])
|
||||
return [x[1] for x in items
|
||||
if isinstance(x[1], ImageBuild)]
|
||||
|
||||
def getMostRecentBuilds(self, count, image, state=None):
|
||||
'''
|
||||
Retrieve the most recent image build data with the given state.
|
||||
|
@ -1422,6 +1787,19 @@ class ZooKeeper(ZooKeeperBase):
|
|||
|
||||
return recent_data
|
||||
|
||||
def getCachedImageUploads(self):
|
||||
'''
|
||||
Retrieve all image upload data from the cache
|
||||
|
||||
:returns: A list of ImageUpload objects.
|
||||
'''
|
||||
if not self.enable_cache:
|
||||
raise RuntimeError("Caching not enabled")
|
||||
items = self._image_cache._cached_objects.items()
|
||||
items = sorted(items, key=lambda x: x[0])
|
||||
return [x[1] for x in items
|
||||
if isinstance(x[1], ImageUpload)]
|
||||
|
||||
def storeImageUpload(self, image, build_number, provider, image_data,
|
||||
upload_number=None):
|
||||
'''
|
||||
|
@ -1662,8 +2040,8 @@ class ZooKeeper(ZooKeeperBase):
|
|||
|
||||
:returns: The request data, or None if the request was not found.
|
||||
'''
|
||||
if cached:
|
||||
d = self._cached_node_requests.get(request)
|
||||
if cached and self._request_cache:
|
||||
d = self._request_cache.getNodeRequest(request)
|
||||
if d:
|
||||
return d
|
||||
|
||||
|
@ -1901,8 +2279,8 @@ class ZooKeeper(ZooKeeperBase):
|
|||
|
||||
:returns: The node data, or None if the node was not found.
|
||||
'''
|
||||
if cached:
|
||||
d = self._cached_nodes.get(node)
|
||||
if cached and self._node_cache:
|
||||
d = self._node_cache.getNode(node)
|
||||
if d:
|
||||
return d
|
||||
|
||||
|
@ -2126,9 +2504,7 @@ class ZooKeeper(ZooKeeperBase):
|
|||
'''
|
||||
|
||||
if cached_ids:
|
||||
# get a copy of the keys view to avoid runtime errors in the event
|
||||
# the _cached_nodes dict gets updated while iterating
|
||||
node_ids = list(self._cached_nodes.keys())
|
||||
node_ids = self._node_cache.getNodeIds()
|
||||
else:
|
||||
node_ids = self.getNodes()
|
||||
|
||||
|
@ -2146,11 +2522,20 @@ class ZooKeeper(ZooKeeperBase):
|
|||
if lock_stats:
|
||||
yield lock_stats
|
||||
|
||||
def nodeRequestIterator(self, cached=True):
|
||||
def nodeRequestIterator(self, cached=True, cached_ids=False):
|
||||
'''
|
||||
Utility generator method for iterating through all nodes requests.
|
||||
|
||||
:param bool cached: True if the data should be taken from the cache.
|
||||
:param bool cached_ids: True if the request IDs should be taken from
|
||||
the cache.
|
||||
'''
|
||||
for req_id in self.getNodeRequests():
|
||||
if cached_ids:
|
||||
req_ids = self._request_cache.getNodeRequestIds()
|
||||
else:
|
||||
req_ids = self.getNodeRequests()
|
||||
|
||||
for req_id in req_ids:
|
||||
req = self.getNodeRequest(req_id, cached=cached)
|
||||
if req:
|
||||
yield req
|
||||
|
@ -2255,147 +2640,9 @@ class ZooKeeper(ZooKeeperBase):
|
|||
for node in provider_nodes:
|
||||
self.deleteNode(node)
|
||||
|
||||
def cacheFaultListener(self, e):
|
||||
self.log.exception(e)
|
||||
|
||||
def nodeCacheListener(self, event):
|
||||
try:
|
||||
self._nodeCacheListener(event)
|
||||
except Exception:
|
||||
self.log.exception("Exception in node cache update for event: %s",
|
||||
event)
|
||||
|
||||
def _nodeCacheListener(self, event):
|
||||
if hasattr(event.event_data, 'path'):
|
||||
# Ignore root node
|
||||
path = event.event_data.path
|
||||
if path == self.NODE_ROOT:
|
||||
return
|
||||
|
||||
if path.endswith('/lock'):
|
||||
return
|
||||
|
||||
# Ignore any non-node related events such as connection events here
|
||||
if event.event_type not in (TreeEvent.NODE_ADDED,
|
||||
TreeEvent.NODE_UPDATED,
|
||||
TreeEvent.NODE_REMOVED):
|
||||
return
|
||||
|
||||
path = event.event_data.path
|
||||
node_path = path[len(self.NODE_ROOT) + 1:]
|
||||
parts = node_path.split('/')
|
||||
node_id = parts[0]
|
||||
if len(parts) > 1 and parts[1] == 'lock':
|
||||
if len(parts) > 2:
|
||||
# A lock contender is being added or removed
|
||||
contender = parts[2]
|
||||
old_node = self._cached_nodes.get(node_id)
|
||||
if not old_node:
|
||||
return
|
||||
if event.event_type in (TreeEvent.NODE_ADDED,
|
||||
TreeEvent.NODE_UPDATED):
|
||||
old_node.lock_contenders.add(contender)
|
||||
elif event.event_type == TreeEvent.NODE_REMOVED:
|
||||
old_node.lock_contenders.discard(contender)
|
||||
# This event was for a lock path; no further handling necessary
|
||||
return
|
||||
|
||||
if event.event_type in (TreeEvent.NODE_ADDED, TreeEvent.NODE_UPDATED):
|
||||
# Nodes with empty data are invalid so skip add or update these.
|
||||
if not event.event_data.data:
|
||||
return
|
||||
|
||||
# Perform an in-place update of the already cached node if possible
|
||||
d = self._bytesToDict(event.event_data.data)
|
||||
old_node = self._cached_nodes.get(node_id)
|
||||
if old_node:
|
||||
if event.event_data.stat.version <= old_node.stat.version:
|
||||
# Don't update to older data
|
||||
return
|
||||
if old_node.lock:
|
||||
# Don't update a locked node
|
||||
return
|
||||
old_node.updateFromDict(d)
|
||||
old_node.stat = event.event_data.stat
|
||||
else:
|
||||
node = Node.fromDict(d, node_id)
|
||||
node.stat = event.event_data.stat
|
||||
self._cached_nodes[node_id] = node
|
||||
|
||||
# set the stats event so the stats reporting thread can act upon it
|
||||
if self.node_stats_event is not None:
|
||||
self.node_stats_event.set()
|
||||
elif event.event_type == TreeEvent.NODE_REMOVED:
|
||||
try:
|
||||
del self._cached_nodes[node_id]
|
||||
except KeyError:
|
||||
# If it's already gone, don't care
|
||||
pass
|
||||
|
||||
# set the stats event so the stats reporting thread can act upon it
|
||||
if self.node_stats_event is not None:
|
||||
self.node_stats_event.set()
|
||||
|
||||
def setNodeStatsEvent(self, event):
|
||||
self.node_stats_event = event
|
||||
|
||||
def requestCacheListener(self, event):
|
||||
try:
|
||||
self._requestCacheListener(event)
|
||||
except Exception:
|
||||
self.log.exception(
|
||||
"Exception in request cache update for event: %s",
|
||||
event)
|
||||
|
||||
def _requestCacheListener(self, event):
|
||||
if hasattr(event.event_data, 'path'):
|
||||
# Ignore root node
|
||||
path = event.event_data.path
|
||||
if path == self.REQUEST_ROOT:
|
||||
return
|
||||
|
||||
# Ignore lock nodes
|
||||
if '/lock' in path:
|
||||
return
|
||||
|
||||
# Ignore any non-node related events such as connection events here
|
||||
if event.event_type not in (TreeEvent.NODE_ADDED,
|
||||
TreeEvent.NODE_UPDATED,
|
||||
TreeEvent.NODE_REMOVED):
|
||||
return
|
||||
|
||||
path = event.event_data.path
|
||||
request_id = path.rsplit('/', 1)[1]
|
||||
|
||||
if event.event_type in (TreeEvent.NODE_ADDED, TreeEvent.NODE_UPDATED):
|
||||
# Requests with empty data are invalid so skip add or update these.
|
||||
if not event.event_data.data:
|
||||
return
|
||||
|
||||
# Perform an in-place update of the cached request if possible
|
||||
d = self._bytesToDict(event.event_data.data)
|
||||
old_request = self._cached_node_requests.get(request_id)
|
||||
if old_request:
|
||||
if event.event_data.stat.version <= old_request.stat.version:
|
||||
# Don't update to older data
|
||||
return
|
||||
if old_request.lock:
|
||||
# Don't update a locked node request
|
||||
return
|
||||
old_request.updateFromDict(d)
|
||||
old_request.stat = event.event_data.stat
|
||||
else:
|
||||
request = NodeRequest.fromDict(d, request_id)
|
||||
request.stat = event.event_data.stat
|
||||
self._cached_node_requests[request_id] = request
|
||||
|
||||
elif event.event_type == TreeEvent.NODE_REMOVED:
|
||||
try:
|
||||
del self._cached_node_requests[request_id]
|
||||
except KeyError:
|
||||
# If it's already gone, don't care
|
||||
pass
|
||||
|
||||
def getStatsElection(self, identifier):
|
||||
path = self._electionPath('stats')
|
||||
return Election(self.kazoo_client, path, identifier)
|
||||
|
|
Loading…
Reference in New Issue