Track object versions in the Buildset object
This further reduces the number of ZK object reads during pipeline refreshes by tracking when builds and frozen jobs are updated. During the phases of a build where we know no updates can occur, we already avoid refreshing the Build and FrozenJob objects. But, for example, while a build is running we have to continually refresh it to see if it has completed. We can avoid this by recording expected version information in ZK and only refresh those objects if we know our local copy is out of date. We can store the latest ZK object version of FrozenJob and Build objects on the Buildset. On pipeline refresh, we currently always refresh the buildset object, which means that when we prepare to refresh the FrozenJob or Build objects underneath a Buildset, we will have information about the latest versions of those objects in ZK and can compare to the versions we currently have in memory to decide if we need to refresh them. This should reduce the number of reads in a pipeline refresh by about 50%. But it will cause more writes, in that we will update the Buildset object each time we modify one of its children. This may affect pipeline processing times but the impact should be very small. We will use version numbers (rather than transaction ids) because they are predictable, and updating the buildset first with the predicted next version before updating the child avoids issues caused by a crash between those two steps. Since it is typical for many objects to be created at once, we do optimize the case where the objects are initially created and we avoid making an update to the BuildSet in that case so that we don't repeatedly write the buildset object. Change-Id: I3824af6149bf27c41a8d895fc682236bd0d91f6b
This commit is contained in:
parent
cb40ddc7db
commit
a0ed933fa4
@ -106,3 +106,9 @@ Version 11
|
||||
|
||||
:Prior Zuul version: 8.0.1
|
||||
:Description: Adds merge_modes to branch cache. Affects schedulers and web.
|
||||
|
||||
Version 12
|
||||
----------
|
||||
:Prior Zuul version: 8.0.1
|
||||
:Description: Adds job_versions and build_versions to BuildSet.
|
||||
Affects schedulers.
|
||||
|
@ -32,7 +32,9 @@ import zuul.lib.connections
|
||||
from tests.base import BaseTestCase, FIXTURE_DIR
|
||||
from zuul.lib.ansible import AnsibleManager
|
||||
from zuul.lib import tracing
|
||||
from zuul.model_api import MODEL_API
|
||||
from zuul.zk.zkobject import LocalZKContext
|
||||
from zuul.zk.components import COMPONENT_REGISTRY
|
||||
from zuul import change_matcher
|
||||
|
||||
|
||||
@ -44,6 +46,8 @@ class Dummy(object):
|
||||
|
||||
class TestJob(BaseTestCase):
|
||||
def setUp(self):
|
||||
COMPONENT_REGISTRY.registry = Dummy()
|
||||
COMPONENT_REGISTRY.registry.model_api = MODEL_API
|
||||
self._env_fixture = self.useFixture(
|
||||
fixtures.EnvironmentVariable('HISTTIMEFORMAT', '%Y-%m-%dT%T%z '))
|
||||
super(TestJob, self).setUp()
|
||||
|
@ -254,6 +254,45 @@ class TestModelUpgrade(ZuulTestCase):
|
||||
result='SUCCESS', changes='1,1'),
|
||||
], ordered=False)
|
||||
|
||||
@model_version(11)
|
||||
def test_model_11_12(self):
|
||||
# This excercises the upgrade to store build/job versions
|
||||
first = self.scheds.first
|
||||
second = self.createScheduler()
|
||||
second.start()
|
||||
self.assertEqual(len(self.scheds), 2)
|
||||
for _ in iterate_timeout(10, "until priming is complete"):
|
||||
state_one = first.sched.local_layout_state.get("tenant-one")
|
||||
if state_one:
|
||||
break
|
||||
|
||||
for _ in iterate_timeout(
|
||||
10, "all schedulers to have the same layout state"):
|
||||
if (second.sched.local_layout_state.get(
|
||||
"tenant-one") == state_one):
|
||||
break
|
||||
|
||||
self.executor_server.hold_jobs_in_build = True
|
||||
with second.sched.layout_update_lock, second.sched.run_handler_lock:
|
||||
A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
|
||||
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
|
||||
self.waitUntilSettled(matcher=[first])
|
||||
|
||||
self.model_test_component_info.model_api = 12
|
||||
with first.sched.layout_update_lock, first.sched.run_handler_lock:
|
||||
self.executor_server.hold_jobs_in_build = False
|
||||
self.executor_server.release()
|
||||
self.waitUntilSettled(matcher=[second])
|
||||
|
||||
self.waitUntilSettled()
|
||||
self.assertHistory([
|
||||
dict(name='project-merge', result='SUCCESS', changes='1,1'),
|
||||
dict(name='project-test1', result='SUCCESS', changes='1,1'),
|
||||
dict(name='project-test2', result='SUCCESS', changes='1,1'),
|
||||
dict(name='project1-project2-integration',
|
||||
result='SUCCESS', changes='1,1'),
|
||||
], ordered=False)
|
||||
|
||||
|
||||
class TestGithubModelUpgrade(ZuulTestCase):
|
||||
config_file = 'zuul-github-driver.conf'
|
||||
|
@ -2394,6 +2394,12 @@ class FrozenJob(zkobject.ZKObject):
|
||||
data['_' + job_data_key] = None
|
||||
return data
|
||||
|
||||
def _save(self, context, *args, **kw):
|
||||
# Before saving, update the buildset with the new job version
|
||||
# so that future readers know to refresh it.
|
||||
self.buildset.updateJobVersion(context, self)
|
||||
return super()._save(context, *args, **kw)
|
||||
|
||||
def setWaitingStatus(self, status):
|
||||
if self.waiting_status == status:
|
||||
return
|
||||
@ -3876,6 +3882,12 @@ class Build(zkobject.ZKObject):
|
||||
def getPath(self):
|
||||
return f"{self.job.getPath()}/build/{self.uuid}"
|
||||
|
||||
def _save(self, context, *args, **kw):
|
||||
# Before saving, update the buildset with the new job version
|
||||
# so that future readers know to refresh it.
|
||||
self.job.buildset.updateBuildVersion(context, self)
|
||||
return super()._save(context, *args, **kw)
|
||||
|
||||
def __repr__(self):
|
||||
return ('<Build %s of %s voting:%s>' %
|
||||
(self.uuid, self.job.name, self.job.voting))
|
||||
@ -4086,6 +4098,8 @@ class BuildSet(zkobject.ZKObject):
|
||||
job_graph=None,
|
||||
jobs={},
|
||||
deduplicated_jobs=[],
|
||||
job_versions={},
|
||||
build_versions={},
|
||||
# Cached job graph of previous layout; not serialized
|
||||
_old_job_graph=None,
|
||||
_old_jobs={},
|
||||
@ -4197,6 +4211,8 @@ class BuildSet(zkobject.ZKObject):
|
||||
"configured_time": self.configured_time,
|
||||
"start_time": self.start_time,
|
||||
"repo_state_request_time": self.repo_state_request_time,
|
||||
"job_versions": self.job_versions,
|
||||
"build_versions": self.build_versions,
|
||||
# jobs (serialize as separate objects)
|
||||
}
|
||||
return json.dumps(data, sort_keys=True).encode("utf8")
|
||||
@ -4294,7 +4310,8 @@ class BuildSet(zkobject.ZKObject):
|
||||
|
||||
if job_name in self.jobs:
|
||||
job = self.jobs[job_name]
|
||||
if not old_build_exists:
|
||||
if ((not old_build_exists) or
|
||||
self.shouldRefreshJob(job)):
|
||||
tpe_jobs.append((None, job_name,
|
||||
tpe.submit(job.refresh, context)))
|
||||
else:
|
||||
@ -4306,7 +4323,8 @@ class BuildSet(zkobject.ZKObject):
|
||||
build = self.builds.get(job_name)
|
||||
builds[job_name] = build
|
||||
if build and build.getPath() == build_path:
|
||||
if not build.result:
|
||||
if ((not build.result) or
|
||||
self.shouldRefreshBuild(build)):
|
||||
tpe_jobs.append((
|
||||
None, job_name, tpe.submit(
|
||||
build.refresh, context)))
|
||||
@ -4361,6 +4379,54 @@ class BuildSet(zkobject.ZKObject):
|
||||
})
|
||||
return data
|
||||
|
||||
def updateBuildVersion(self, context, build):
|
||||
# It's tempting to update versions regardless of the model
|
||||
# API, but if we start writing versions before all components
|
||||
# are upgraded we could get out of sync.
|
||||
if (COMPONENT_REGISTRY.model_api < 12):
|
||||
return True
|
||||
|
||||
# It is common for a lot of builds/jobs to be added at once,
|
||||
# so to avoid writing this buildset object repeatedly during
|
||||
# that time, we only update the version after the initial
|
||||
# creation.
|
||||
version = build.getZKVersion()
|
||||
# If zstat is None, we created the object
|
||||
if version is not None:
|
||||
versions = self.build_versions.copy()
|
||||
versions[build.uuid] = version + 1
|
||||
self.updateAttributes(context, build_versions=versions)
|
||||
|
||||
def updateJobVersion(self, context, job):
|
||||
if (COMPONENT_REGISTRY.model_api < 12):
|
||||
return True
|
||||
|
||||
version = job.getZKVersion()
|
||||
if version is not None:
|
||||
versions = self.job_versions.copy()
|
||||
versions[job.name] = version + 1
|
||||
self.updateAttributes(context, job_versions=versions)
|
||||
|
||||
def shouldRefreshBuild(self, build):
|
||||
# Unless all schedulers are updating versions, we can't trust
|
||||
# the data.
|
||||
if (COMPONENT_REGISTRY.model_api < 12):
|
||||
return True
|
||||
current = build.getZKVersion()
|
||||
if current is None:
|
||||
current = -1
|
||||
expected = self.build_versions.get(build.uuid, 0)
|
||||
return expected > current
|
||||
|
||||
def shouldRefreshJob(self, job):
|
||||
if (COMPONENT_REGISTRY.model_api < 12):
|
||||
return True
|
||||
current = job.getZKVersion()
|
||||
if current is None:
|
||||
current = -1
|
||||
expected = self.job_versions.get(job.name, 0)
|
||||
return expected > current
|
||||
|
||||
@property
|
||||
def ref(self):
|
||||
# NOTE(jamielennox): The concept of buildset ref is to be removed and a
|
||||
|
@ -13,5 +13,5 @@
|
||||
# under the License.
|
||||
|
||||
# When making ZK schema changes, increment this and add a record to
|
||||
# docs/developer/model-changelog.rst
|
||||
MODEL_API = 11
|
||||
# doc/source/developer/model-changelog.rst
|
||||
MODEL_API = 12
|
||||
|
@ -308,6 +308,17 @@ class ZKObject:
|
||||
|
||||
return (compressed_size, uncompressed_size)
|
||||
|
||||
def getZKVersion(self):
|
||||
"""Return the ZK version of the object as of the last load/refresh.
|
||||
|
||||
Returns None if the object is newly created.
|
||||
"""
|
||||
zstat = getattr(self, '_zstat', None)
|
||||
# If zstat is None, we created the object
|
||||
if zstat is None:
|
||||
return None
|
||||
return zstat.version
|
||||
|
||||
# Private methods below
|
||||
|
||||
def _retry(self, context, func, *args, max_tries=-1, **kw):
|
||||
|
Loading…
x
Reference in New Issue
Block a user