From a0ed933fa432fd93b34deeac77c9d45fda34f432 Mon Sep 17 00:00:00 2001 From: "James E. Blair" Date: Mon, 19 Dec 2022 14:01:05 -0800 Subject: [PATCH] Track object versions in the Buildset object This further reduces the number of ZK object reads during pipeline refreshes by tracking when builds and frozen jobs are updated. During the phases of a build where we know no updates can occur, we already avoid refreshing the Build and FrozenJob objects. But, for example, while a build is running we have to continually refresh it to see if it has completed. We can avoid this by recording expected version information in ZK and only refresh those objects if we know our local copy is out of date. We can store the latest ZK object version of FrozenJob and Build objects on the Buildset. On pipeline refresh, we currently always refresh the buildset object, which means that when we prepare to refresh the FrozenJob or Build objects underneath a Buildset, we will have information about the latest versions of those objects in ZK and can compare to the versions we currently have in memory to decide if we need to refresh them. This should reduce the number of reads in a pipeline refresh by about 50%. But it will cause more writes, in that we will update the Buildset object each time we modify one of its children. This may affect pipeline processing times but the impact should be very small. We will use version numbers (rather than transaction ids) because they are predictable, and updating the buildset first with the predicted next version before updating the child avoids issues caused by a crash between those two steps. Since it is typical for many objects to be created at once, we do optimize the case where the objects are initially created and we avoid making an update to the BuildSet in that case so that we don't repeatedly write the buildset object. 
Change-Id: I3824af6149bf27c41a8d895fc682236bd0d91f6b --- doc/source/developer/model-changelog.rst | 6 ++ tests/unit/test_model.py | 4 ++ tests/unit/test_model_upgrade.py | 39 +++++++++++++ zuul/model.py | 70 +++++++++++++++++++++++- zuul/model_api.py | 4 +- zuul/zk/zkobject.py | 11 ++++ 6 files changed, 130 insertions(+), 4 deletions(-) diff --git a/doc/source/developer/model-changelog.rst b/doc/source/developer/model-changelog.rst index a14ff98956..b80979362d 100644 --- a/doc/source/developer/model-changelog.rst +++ b/doc/source/developer/model-changelog.rst @@ -106,3 +106,9 @@ Version 11 :Prior Zuul version: 8.0.1 :Description: Adds merge_modes to branch cache. Affects schedulers and web. + +Version 12 +---------- +:Prior Zuul version: 8.0.1 +:Description: Adds job_versions and build_versions to BuildSet. + Affects schedulers. diff --git a/tests/unit/test_model.py b/tests/unit/test_model.py index aed40b94e7..98f971948b 100644 --- a/tests/unit/test_model.py +++ b/tests/unit/test_model.py @@ -32,7 +32,9 @@ import zuul.lib.connections from tests.base import BaseTestCase, FIXTURE_DIR from zuul.lib.ansible import AnsibleManager from zuul.lib import tracing +from zuul.model_api import MODEL_API from zuul.zk.zkobject import LocalZKContext +from zuul.zk.components import COMPONENT_REGISTRY from zuul import change_matcher @@ -44,6 +46,8 @@ class Dummy(object): class TestJob(BaseTestCase): def setUp(self): + COMPONENT_REGISTRY.registry = Dummy() + COMPONENT_REGISTRY.registry.model_api = MODEL_API self._env_fixture = self.useFixture( fixtures.EnvironmentVariable('HISTTIMEFORMAT', '%Y-%m-%dT%T%z ')) super(TestJob, self).setUp() diff --git a/tests/unit/test_model_upgrade.py b/tests/unit/test_model_upgrade.py index f392e8c3e6..a5a49bed4f 100644 --- a/tests/unit/test_model_upgrade.py +++ b/tests/unit/test_model_upgrade.py @@ -254,6 +254,45 @@ class TestModelUpgrade(ZuulTestCase): result='SUCCESS', changes='1,1'), ], ordered=False) + @model_version(11) + def 
test_model_11_12(self): + # This exercises the upgrade to store build/job versions + first = self.scheds.first + second = self.createScheduler() + second.start() + self.assertEqual(len(self.scheds), 2) + for _ in iterate_timeout(10, "until priming is complete"): + state_one = first.sched.local_layout_state.get("tenant-one") + if state_one: + break + + for _ in iterate_timeout( + 10, "all schedulers to have the same layout state"): + if (second.sched.local_layout_state.get( + "tenant-one") == state_one): + break + + self.executor_server.hold_jobs_in_build = True + with second.sched.layout_update_lock, second.sched.run_handler_lock: + A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A') + self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1)) + self.waitUntilSettled(matcher=[first]) + + self.model_test_component_info.model_api = 12 + with first.sched.layout_update_lock, first.sched.run_handler_lock: + self.executor_server.hold_jobs_in_build = False + self.executor_server.release() + self.waitUntilSettled(matcher=[second]) + + self.waitUntilSettled() + self.assertHistory([ + dict(name='project-merge', result='SUCCESS', changes='1,1'), + dict(name='project-test1', result='SUCCESS', changes='1,1'), + dict(name='project-test2', result='SUCCESS', changes='1,1'), + dict(name='project1-project2-integration', + result='SUCCESS', changes='1,1'), + ], ordered=False) + class TestGithubModelUpgrade(ZuulTestCase): config_file = 'zuul-github-driver.conf' diff --git a/zuul/model.py b/zuul/model.py index ac38c37055..31c7f8a4b0 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -2394,6 +2394,12 @@ class FrozenJob(zkobject.ZKObject): data['_' + job_data_key] = None return data + def _save(self, context, *args, **kw): + # Before saving, update the buildset with the new job version + # so that future readers know to refresh it. 
+ self.buildset.updateJobVersion(context, self) + return super()._save(context, *args, **kw) + def setWaitingStatus(self, status): if self.waiting_status == status: return @@ -3876,6 +3882,12 @@ class Build(zkobject.ZKObject): def getPath(self): return f"{self.job.getPath()}/build/{self.uuid}" + def _save(self, context, *args, **kw): + # Before saving, update the buildset with the new build version + # so that future readers know to refresh it. + self.job.buildset.updateBuildVersion(context, self) + return super()._save(context, *args, **kw) + def __repr__(self): return ('<Build %s of %s voting:%s>' % (self.uuid, self.job.name, self.job.voting)) @@ -4086,6 +4098,8 @@ class BuildSet(zkobject.ZKObject): job_graph=None, jobs={}, deduplicated_jobs=[], + job_versions={}, + build_versions={}, # Cached job graph of previous layout; not serialized _old_job_graph=None, _old_jobs={}, @@ -4197,6 +4211,8 @@ class BuildSet(zkobject.ZKObject): "configured_time": self.configured_time, "start_time": self.start_time, "repo_state_request_time": self.repo_state_request_time, + "job_versions": self.job_versions, + "build_versions": self.build_versions, # jobs (serialize as separate objects) } return json.dumps(data, sort_keys=True).encode("utf8") @@ -4294,7 +4310,8 @@ class BuildSet(zkobject.ZKObject): if job_name in self.jobs: job = self.jobs[job_name] - if not old_build_exists: + if ((not old_build_exists) or + self.shouldRefreshJob(job)): tpe_jobs.append((None, job_name, tpe.submit(job.refresh, context))) else: @@ -4306,7 +4323,8 @@ class BuildSet(zkobject.ZKObject): build = self.builds.get(job_name) builds[job_name] = build if build and build.getPath() == build_path: - if not build.result: + if ((not build.result) or + self.shouldRefreshBuild(build)): tpe_jobs.append(( None, job_name, tpe.submit( build.refresh, context))) @@ -4361,6 +4379,54 @@ class BuildSet(zkobject.ZKObject): }) return data + def updateBuildVersion(self, context, build): + # It's tempting to update versions regardless of the model + # 
API, but if we start writing versions before all components + # are upgraded we could get out of sync. + if (COMPONENT_REGISTRY.model_api < 12): + return True + + # It is common for a lot of builds/jobs to be added at once, + # so to avoid writing this buildset object repeatedly during + # that time, we only update the version after the initial + # creation. + version = build.getZKVersion() + # If zstat is None, we created the object + if version is not None: + versions = self.build_versions.copy() + versions[build.uuid] = version + 1 + self.updateAttributes(context, build_versions=versions) + + def updateJobVersion(self, context, job): + if (COMPONENT_REGISTRY.model_api < 12): + return True + + version = job.getZKVersion() + if version is not None: + versions = self.job_versions.copy() + versions[job.name] = version + 1 + self.updateAttributes(context, job_versions=versions) + + def shouldRefreshBuild(self, build): + # Unless all schedulers are updating versions, we can't trust + # the data. + if (COMPONENT_REGISTRY.model_api < 12): + return True + current = build.getZKVersion() + if current is None: + current = -1 + expected = self.build_versions.get(build.uuid, 0) + return expected > current + + def shouldRefreshJob(self, job): + if (COMPONENT_REGISTRY.model_api < 12): + return True + current = job.getZKVersion() + if current is None: + current = -1 + expected = self.job_versions.get(job.name, 0) + return expected > current + @property def ref(self): # NOTE(jamielennox): The concept of buildset ref is to be removed and a diff --git a/zuul/model_api.py b/zuul/model_api.py index 6c93a51773..ccb12077d5 100644 --- a/zuul/model_api.py +++ b/zuul/model_api.py @@ -13,5 +13,5 @@ # under the License. 
# When making ZK schema changes, increment this and add a record to -# docs/developer/model-changelog.rst -MODEL_API = 11 +# doc/source/developer/model-changelog.rst +MODEL_API = 12 diff --git a/zuul/zk/zkobject.py b/zuul/zk/zkobject.py index 73adf59549..b228ecaa4a 100644 --- a/zuul/zk/zkobject.py +++ b/zuul/zk/zkobject.py @@ -308,6 +308,17 @@ class ZKObject: return (compressed_size, uncompressed_size) + def getZKVersion(self): + """Return the ZK version of the object as of the last load/refresh. + + Returns None if the object is newly created. + """ + zstat = getattr(self, '_zstat', None) + # If zstat is None, we created the object + if zstat is None: + return None + return zstat.version + # Private methods below def _retry(self, context, func, *args, max_tries=-1, **kw):