Merge "Create config cache ltime before requesting files"
This commit is contained in:
commit
7bac2d6a1a
|
@ -4760,15 +4760,6 @@ class ZuulTestCase(BaseTestCase):
|
|||
with open(private_key_file, 'w') as o:
|
||||
o.write(i.read())
|
||||
|
||||
def getCurrentLtime(self):
|
||||
"""Get the logical timestamp as seen by the Zookeeper cluster."""
|
||||
result = self.zk_client.client.command(b"srvr")
|
||||
for line in result.splitlines():
|
||||
match = re.match(r"zxid:\s+0x(?P<zxid>[a-f0-9])", line, re.I)
|
||||
if match:
|
||||
return int(match.group("zxid"), 16)
|
||||
raise RuntimeError("Could not find zxid in Zookeeper srvr output")
|
||||
|
||||
def copyDirToRepo(self, project, source_path):
|
||||
self.init_repo(project)
|
||||
|
||||
|
|
|
@ -581,7 +581,7 @@ class TestUnparsedConfigCache(ZuulTestCase):
|
|||
common_cache = cache.getFilesCache("review.example.com/common-config",
|
||||
"master")
|
||||
tpc = tenant.project_configs["review.example.com/common-config"]
|
||||
self.assertTrue(common_cache.isValidFor(tpc, cache_ltime=-1))
|
||||
self.assertTrue(common_cache.isValidFor(tpc, min_ltime=-1))
|
||||
self.assertEqual(len(common_cache), 1)
|
||||
self.assertIn("zuul.yaml", common_cache)
|
||||
self.assertTrue(len(common_cache["zuul.yaml"]) > 0)
|
||||
|
@ -590,7 +590,7 @@ class TestUnparsedConfigCache(ZuulTestCase):
|
|||
"master")
|
||||
# Cache of org/project should be valid but empty (no in-repo config)
|
||||
tpc = tenant.project_configs["review.example.com/org/project"]
|
||||
self.assertTrue(project_cache.isValidFor(tpc, cache_ltime=-1))
|
||||
self.assertTrue(project_cache.isValidFor(tpc, min_ltime=-1))
|
||||
self.assertEqual(len(project_cache), 0)
|
||||
|
||||
def test_cache_use(self):
|
||||
|
@ -603,7 +603,7 @@ class TestUnparsedConfigCache(ZuulTestCase):
|
|||
|
||||
# Get the current ltime from Zookeeper and run a full reconfiguration,
|
||||
# so that we know all items in the cache have a larger ltime.
|
||||
ltime = self.getCurrentLtime()
|
||||
ltime = self.zk_client.getCurrentLtime()
|
||||
self.scheds.first.fullReconfigure()
|
||||
|
||||
# Clear the unparsed branch cache so all projects (except for
|
||||
|
|
|
@ -55,6 +55,15 @@ class ZooKeeperBaseTestCase(BaseTestCase):
|
|||
self.zk_client.connect()
|
||||
|
||||
|
||||
class TestZookeeperClient(ZooKeeperBaseTestCase):
|
||||
|
||||
def test_ltime(self):
|
||||
ltime = self.zk_client.getCurrentLtime()
|
||||
self.assertGreaterEqual(ltime, 0)
|
||||
self.assertIsInstance(ltime, int)
|
||||
self.assertGreater(self.zk_client.getCurrentLtime(), ltime)
|
||||
|
||||
|
||||
class TestNodepool(ZooKeeperBaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
|
@ -189,29 +198,39 @@ class TestUnparsedConfigCache(ZooKeeperBaseTestCase):
|
|||
tpc.extra_config_dirs = {"foo.d/", "bar.d/"}
|
||||
|
||||
master_files = self.config_cache.getFilesCache("project", "master")
|
||||
self.assertFalse(master_files.isValidFor(tpc, cache_ltime=-1))
|
||||
self.assertFalse(master_files.isValidFor(tpc, min_ltime=-1))
|
||||
|
||||
master_files.setValidFor(tpc.extra_config_files, tpc.extra_config_dirs)
|
||||
self.assertTrue(master_files.isValidFor(tpc, cache_ltime=-1))
|
||||
master_files.setValidFor(tpc.extra_config_files, tpc.extra_config_dirs,
|
||||
ltime=1)
|
||||
self.assertTrue(master_files.isValidFor(tpc, min_ltime=-1))
|
||||
|
||||
tpc.extra_config_files = set()
|
||||
tpc.extra_config_dirs = set()
|
||||
self.assertTrue(master_files.isValidFor(tpc, cache_ltime=-1))
|
||||
self.assertTrue(master_files.isValidFor(tpc, min_ltime=-1))
|
||||
self.assertFalse(master_files.isValidFor(tpc, min_ltime=2))
|
||||
|
||||
tpc.extra_config_files = {"bar.yaml"}
|
||||
tpc.extra_config_dirs = {"bar.d/"}
|
||||
# Valid for subset
|
||||
self.assertTrue(master_files.isValidFor(tpc, cache_ltime=-1))
|
||||
self.assertTrue(master_files.isValidFor(tpc, min_ltime=-1))
|
||||
|
||||
tpc.extra_config_files = {"foo.yaml", "bar.yaml"}
|
||||
tpc.extra_config_dirs = {"foo.d/", "bar.d/", "other.d/"}
|
||||
# Invalid for additional dirs
|
||||
self.assertFalse(master_files.isValidFor(tpc, cache_ltime=-1))
|
||||
self.assertFalse(master_files.isValidFor(tpc, min_ltime=-1))
|
||||
self.assertFalse(master_files.isValidFor(tpc, min_ltime=2))
|
||||
|
||||
tpc.extra_config_files = {"foo.yaml", "bar.yaml", "other.yaml"}
|
||||
tpc.extra_config_dirs = {"foo.d/", "bar.d/"}
|
||||
# Invalid for additional files
|
||||
self.assertFalse(master_files.isValidFor(tpc, cache_ltime=-1))
|
||||
self.assertFalse(master_files.isValidFor(tpc, min_ltime=-1))
|
||||
self.assertFalse(master_files.isValidFor(tpc, min_ltime=2))
|
||||
|
||||
def test_cache_ltime(self):
|
||||
cache = self.config_cache.getFilesCache("project", "master")
|
||||
self.assertEqual(cache.ltime, -1)
|
||||
cache.setValidFor(set(), set(), ltime=1)
|
||||
self.assertEqual(cache.ltime, 1)
|
||||
|
||||
def test_branch_cleanup(self):
|
||||
master_files = self.config_cache.getFilesCache("project", "master")
|
||||
|
|
|
@ -1802,6 +1802,7 @@ class TenantParser(object):
|
|||
|
||||
extra_config_files = abide.getExtraConfigFiles(project.name)
|
||||
extra_config_dirs = abide.getExtraConfigDirs(project.name)
|
||||
ltime = self.scheduler.zk_client.getCurrentLtime()
|
||||
job = self.merger.getFiles(
|
||||
project.source.connection.connection_name,
|
||||
project.name, branch,
|
||||
|
@ -1813,6 +1814,7 @@ class TenantParser(object):
|
|||
project.name, branch))
|
||||
job.extra_config_files = extra_config_files
|
||||
job.extra_config_dirs = extra_config_dirs
|
||||
job.ltime = ltime
|
||||
job.source_context = source_context
|
||||
jobs.append(job)
|
||||
branch_cache.setValidFor(tpc)
|
||||
|
@ -1842,7 +1844,8 @@ class TenantParser(object):
|
|||
if content is not None:
|
||||
files_cache[fn] = content
|
||||
files_cache.setValidFor(job.extra_config_files,
|
||||
job.extra_config_dirs)
|
||||
job.extra_config_dirs,
|
||||
job.ltime)
|
||||
|
||||
def _updateUnparsedBranchCache(self, abide, tenant, source_context, files,
|
||||
loading_errors):
|
||||
|
|
|
@ -17,6 +17,7 @@ from threading import Thread
|
|||
from typing import Optional, List, Callable
|
||||
|
||||
from kazoo.client import KazooClient
|
||||
from kazoo.exceptions import NoNodeError
|
||||
from kazoo.handlers.threading import KazooTimeoutError
|
||||
from kazoo.protocol.states import KazooState
|
||||
|
||||
|
@ -194,6 +195,15 @@ class ZooKeeperClient(object):
|
|||
if isinstance(res, Exception):
|
||||
raise res
|
||||
|
||||
def getCurrentLtime(self):
|
||||
"""Get the logical timestamp as seen by the Zookeeper cluster."""
|
||||
try:
|
||||
zstat = self.client.set("/zuul/ltime", b"")
|
||||
except NoNodeError:
|
||||
self.client.create("/zuul/ltime", b"", makepath=True)
|
||||
zstat = self.client.set("/zuul/ltime", b"")
|
||||
return zstat.last_modified_transaction_id
|
||||
|
||||
|
||||
class ZooKeeperSimpleBase(metaclass=ABCMeta):
|
||||
"""Base class for stateless Zookeeper interaction."""
|
||||
|
|
|
@ -42,11 +42,12 @@ class FilesCache(ZooKeeperSimpleBase, MutableMapping):
|
|||
self.root_path = root_path
|
||||
self.files_path = f"{root_path}/files"
|
||||
|
||||
def setValidFor(self, extra_config_files, extra_config_dirs):
|
||||
def setValidFor(self, extra_config_files, extra_config_dirs, ltime):
|
||||
"""Set the cache valid for the given extra config files/dirs."""
|
||||
data = {
|
||||
"extra_files_searched": list(extra_config_files),
|
||||
"extra_dirs_searched": list(extra_config_dirs),
|
||||
"ltime": ltime,
|
||||
}
|
||||
payload = json.dumps(data).encode("utf8")
|
||||
try:
|
||||
|
@ -54,7 +55,7 @@ class FilesCache(ZooKeeperSimpleBase, MutableMapping):
|
|||
except NoNodeError:
|
||||
self.kazoo_client.create(self.root_path, payload, makepath=True)
|
||||
|
||||
def isValidFor(self, tpc, cache_ltime):
|
||||
def isValidFor(self, tpc, min_ltime):
|
||||
"""Check if the cache is valid.
|
||||
|
||||
Check if the cache is valid for the given tenant project config
|
||||
|
@ -63,24 +64,34 @@ class FilesCache(ZooKeeperSimpleBase, MutableMapping):
|
|||
|
||||
"""
|
||||
try:
|
||||
data, zstat = self.kazoo_client.get(self.root_path)
|
||||
data, _ = self.kazoo_client.get(self.root_path)
|
||||
except NoNodeError:
|
||||
return False
|
||||
|
||||
if zstat.last_modified_transaction_id < cache_ltime:
|
||||
# Cache is outdated
|
||||
return False
|
||||
|
||||
try:
|
||||
content = json.loads(data)
|
||||
extra_files_searched = set(content["extra_files_searched"])
|
||||
extra_dirs_searched = set(content["extra_dirs_searched"])
|
||||
ltime = content["ltime"]
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
if ltime < min_ltime:
|
||||
# Cache is outdated
|
||||
return False
|
||||
|
||||
return (set(tpc.extra_config_files) <= extra_files_searched
|
||||
and set(tpc.extra_config_dirs) <= extra_dirs_searched)
|
||||
|
||||
@property
|
||||
def ltime(self):
|
||||
try:
|
||||
data, _ = self.kazoo_client.get(self.root_path)
|
||||
content = json.loads(data)
|
||||
return content["ltime"]
|
||||
except Exception:
|
||||
return -1
|
||||
|
||||
def _key_path(self, key):
|
||||
return _safe_path(self.files_path, key)
|
||||
|
||||
|
|
Loading…
Reference in New Issue