Remove onChangeUpdated method

The onChangeUpdated method is called every time a change is updated.
Its purpose is to iterate over every change in the cache and, if the
updated change is a commit dependency (Depends-On) of the other change,
to set a flag telling the queue processor that the other change's
dependencies are out of date and need to be refreshed.  The idea is
that if this happens, the queue processor should notice that the
dependencies no longer match and dequeue the change.

This became quite slow with the change cache in ZK.

Instead, we can do the following:

* Make sure that every event regarding a change is forwarded to every
  pipeline with that change in it (including non-live changes).

* When processing pipeline trigger events, refresh the dependencies
  of any changes that need to be updated.

There are a few cases that need to be covered in that last point
(but remember, all of this only applies to Depends-On footers, which
are stored in the commit_needs_changes attribute):

* Changes to depends-on in gerrit necessarily mean a new patchset
  of the change with the Depends-On footer, so in the B->A situation,
  if B is updated, it will be re-enqueued.

  * Note that normally the original B->A series is dequeued, unless
    dequeue-on-new-patchset is false.  In that case it will remain
    and we will have B->A and B1->A, which is what the user asked for.

  To handle this case, we only need to refresh the deps of the change
  which is updated.

* In a C->B->A situation in gerrit, if B is updated then regardless
  of whether dequeue-on-new-patchset is true or false, the series should
  be dequeued because the dependency changed (and so the test setup
  is incorrect).

  To handle this case, we need to refresh the deps of any change whose
  commit_needs_changes point to the updated change.

* Changes to depends-on in github don't create new patchsets, so we
  only need to refresh the deps of the change which is updated.  Because
  the changes it points to are still valid changes, they will have
  their own deps refreshed in place.

Change-Id: Ifc40ff5583a14c7f015356b963e1ba354974e1c8
This commit is contained in:
James E. Blair 2021-09-22 19:52:39 -07:00
parent 025356aeda
commit cd4f7539c1
9 changed files with 69 additions and 61 deletions

View File

@ -902,8 +902,6 @@ class GerritConnection(ZKChangeCacheMixin, BaseConnection):
if not change.is_merged:
self._updateChangeDependencies(log, change, data, event, history)
self.sched.onChangeUpdated(change, event)
return change
def _updateChangeDependencies(self, log, change, data, event, history):

View File

@ -1306,9 +1306,6 @@ class GithubConnection(ZKChangeCacheMixin, CachedBranchConnection):
change = self._change_cache.updateChangeWithRetry(
key, change, _update_change)
if self.sched:
self.sched.onChangeUpdated(change, event)
finally:
# We need to remove the lock here again so we don't leak
# them.

View File

@ -542,9 +542,6 @@ class GitlabConnection(ZKChangeCacheMixin, CachedBranchConnection):
change = self._change_cache.updateChangeWithRetry(key, change,
_update_change)
if self.sched:
self.sched.onChangeUpdated(change, event)
except Exception:
self.log.warning("Deleting cache key %s due to exception", key)
self._change_cache.delete(key)

View File

@ -621,9 +621,6 @@ class PagureConnection(ZKChangeCacheMixin, BaseConnection):
change = self._change_cache.updateChangeWithRetry(key, change,
_update_change)
if self.sched:
self.sched.onChangeUpdated(change, event)
except Exception:
self.log.warning("Deleting cache key %s due to exception", key)
self._change_cache.delete(key)

View File

@ -199,6 +199,13 @@ class PipelineManager(metaclass=ABCMeta):
return True
return False
def isAnyVersionOfChangeInPipeline(self, change):
    """Return True if any item in the pipeline shares the change's stable_id.

    The comparison uses ``stable_id`` rather than object equality, so
    an item holding a different version of the same change counts as
    a match.

    :param change: Object whose ``stable_id`` is compared against
        ``item.change.stable_id`` for every item returned by
        ``self.pipeline.getAllItems()``.
    :returns: True as soon as one item matches, otherwise False.
    """
    # Look the identifier up once instead of once per pipeline item.
    wanted = change.stable_id
    return any(wanted == item.change.stable_id
               for item in self.pipeline.getAllItems())
def isChangeAlreadyInQueue(self, change, change_queue):
# Checks any item in the specified change queue
for item in change_queue.queue:
@ -206,6 +213,22 @@ class PipelineManager(metaclass=ABCMeta):
return True
return False
def refreshDeps(self, change, event):
    """Refresh commit dependencies affected by an update to ``change``.

    Every pipeline item whose ``commit_needs_changes`` refers to any
    version of ``change`` (compared by ``stable_id``) has its commit
    dependencies recomputed; afterwards ``change`` itself is refreshed.

    :param change: The updated change.  Anything that is not a
        ``model.Change`` is ignored.
    :param event: The triggering event, passed through to
        ``updateCommitDependencies``.
    """
    if not isinstance(change, model.Change):
        return

    # Look the identifier up once instead of once per dependency.
    updated_id = change.stable_id
    for item in self.pipeline.getAllItems():
        # An item may hold something without commit dependencies, and
        # commit_needs_changes is None until it has been computed; in
        # both cases there is nothing that could point at the update.
        needs = getattr(item.change, 'commit_needs_changes', None)
        if not needs:
            continue
        # TODO: with structured-data keys (so we can compare the
        # stable_id), we might be able to do more of this without
        # going to ZK.
        for connection_name, key in needs:
            source = self.sched.connections.getSource(connection_name)
            dep = source.getChangeByKey(key)
            # NOTE(review): assumes getChangeByKey may return None for
            # a key it cannot resolve -- confirm against the sources.
            if dep is not None and dep.stable_id == updated_id:
                self.updateCommitDependencies(item.change, None, event)
                # A single refresh covers this item; further matches
                # would only repeat the same work.
                break
    self.updateCommitDependencies(change, None, event)
def reportEnqueue(self, item):
if not self.pipeline._disabled:
self.log.info("Reporting enqueue, action %s item %s" %
@ -422,6 +445,13 @@ class PipelineManager(metaclass=ABCMeta):
change)
return False
# We know this change isn't in this pipeline, but it may be in
# others. If it is, then presumably its commit_needs are up
# to date and this is a noop; otherwise, we need to refresh
# them anyway.
if isinstance(change, model.Change):
self.updateCommitDependencies(change, None, event)
with self.getChangeQueue(change, event, change_queue) as change_queue:
if not change_queue:
log.debug("Unable to find change queue for "
@ -661,12 +691,13 @@ class PipelineManager(metaclass=ABCMeta):
if dep and (not dep.is_merged) and dep not in dependencies:
log.debug(" Adding dependency: %s", dep)
dependencies.append(dep)
source = self.sched.connections.getSource(
change.project.connection_name)
source.setChangeAttributes(
change,
commit_needs_changes=[d.cache_key for d in dependencies],
refresh_deps=False)
new_commit_needs_changes = [d.cache_key for d in dependencies]
if change.commit_needs_changes != new_commit_needs_changes:
source = self.sched.connections.getSource(
change.project.connection_name)
source.setChangeAttributes(
change,
commit_needs_changes=new_commit_needs_changes)
def provisionNodes(self, item):
log = item.annotateLogger(self.log)

View File

@ -173,9 +173,6 @@ class DependentPipelineManager(SharedQueuePipelineManager):
# Return true if okay to proceed enqueuing this change,
# false if the change should not be enqueued.
log.debug("Checking for changes needed by %s:" % change)
if (hasattr(change, 'commit_needs_changes') and
(change.refresh_deps or change.commit_needs_changes is None)):
self.updateCommitDependencies(change, change_queue, event)
if not hasattr(change, 'needs_changes'):
log.debug(" %s does not support dependencies", type(change))
return True

View File

@ -76,9 +76,6 @@ class IndependentPipelineManager(PipelineManager):
log.debug("Checking for changes needed by %s:" % change)
# Return true if okay to proceed enqueuing this change,
# false if the change should not be enqueued.
if (hasattr(change, 'commit_needs_changes') and
(change.refresh_deps or change.commit_needs_changes is None)):
self.updateCommitDependencies(change, None, event)
if not hasattr(change, 'needs_changes'):
log.debug(" %s does not support dependencies" % type(change))
return True

View File

@ -3595,6 +3595,17 @@ class Ref(object):
def cache_version(self):
return -1 if self.cache_stat is None else self.cache_stat.version
@property
def stable_id(self):
    """Identifier for this ref that survives updates to it.

    Combines the project's canonical name with the ref, so the value
    stays the same across patchsets and pushes.  Subclasses may use
    something like the gerrit change number or the github pull
    request number instead of the ref.
    """
    return dict(project=self.project.canonical_name, ref=self.ref)
def serialize(self):
return {
"project": self.project.name,
@ -3755,7 +3766,6 @@ class Change(Branch):
# Changes that the pipeline manager determined are needed due
# to Depends-On headers (all drivers):
self.commit_needs_changes = None
self.refresh_deps = False
self.is_current_patchset = True
self.can_merge = False
@ -3790,7 +3800,6 @@ class Change(Branch):
None if data.get("commit_needs_changes") is None
else [tuple(k) for k in data.get("commit_needs_changes", [])]
)
self.refresh_deps = data.get("refresh_deps", False)
self.is_current_patchset = data.get("is_current_patchset", True)
self.can_merge = data.get("can_merge", False)
self.is_merged = data.get("is_merged", False)
@ -3800,6 +3809,17 @@ class Change(Branch):
self.message = data.get("message")
self.commit_id = data.get("commit_id")
@property
def stable_id(self):
    """Identifier for this change that survives updates to it.

    Combines the project's canonical name with the change number
    (for example the gerrit change number or the github pull request
    number), so the value stays the same across patchsets and pushes.
    """
    return dict(project=self.project.canonical_name, number=self.number)
def serialize(self):
d = super().serialize()
d.update({
@ -3812,7 +3832,6 @@ class Change(Branch):
"compat_needs_changes": self.compat_needs_changes,
"compat_needed_by_changes": self.git_needed_by_changes,
"commit_needs_changes": self.commit_needs_changes,
"refresh_deps": self.refresh_deps,
"is_current_patchset": self.is_current_patchset,
"can_merge": self.can_merge,
"is_merged": self.is_merged,

View File

@ -1714,10 +1714,7 @@ class Scheduler(threading.Thread):
for pipeline in tenant.layout.pipelines.values():
if (
pipeline.manager.eventMatches(event, change)
or pipeline.manager.isChangeAlreadyInPipeline(change)
or pipeline.manager.findOldVersionOfChangeAlreadyInQueue(
change
)
or pipeline.manager.isAnyVersionOfChangeInPipeline(change)
):
self.pipeline_trigger_events[tenant.name][
pipeline.name
@ -1751,9 +1748,16 @@ class Scheduler(threading.Thread):
e.change, project.source)
return
# Let the pipeline update any dependencies that may need
# refreshing if this change has updated.
# TODO: We only really need to run this if we have a new
# patchset (isPatchsetCreated()) or the depends-on list in the
# PR body has changed (but we don't have a way to test that yet).
pipeline.manager.refreshDeps(change, event)
if event.isPatchsetCreated():
pipeline.manager.removeOldVersionsOfChange(
change, event)
pipeline.manager.removeOldVersionsOfChange(change, event)
elif event.isChangeAbandoned():
pipeline.manager.removeAbandonedChange(change, event)
if pipeline.manager.eventMatches(event, change):
@ -2171,35 +2175,6 @@ class Scheduler(threading.Thread):
pipelines.append(status)
return json.dumps(data)
def onChangeUpdated(self, change, event):
    """Flag cached changes whose commit dependencies are now stale.

    Walks the cached changes of every source.  For each cached change
    that has a commit dependency which ``change`` is an update of,
    ``refresh_deps`` is set to True so the dependencies are refreshed
    the next time the queue processor examines it.  ``change`` itself
    is flagged the same way at the end.

    :param change: The change that was just updated.
    :param event: The triggering event; used to annotate the logger.
    """
    log = get_annotated_logger(self.log, event)
    log.debug("Change %s has been updated, clearing dependent "
              "change caches", change)

    for cache_source in self.connections.getSources():
        for cached in cache_source.getCachedChanges():
            # Only real changes with computed commit deps can be stale.
            if (not isinstance(cached, Change)
                    or cached.commit_needs_changes is None):
                continue
            for dep_connection, dep_key in cached.commit_needs_changes:
                dep_source = self.connections.getSource(dep_connection)
                dep = dep_source.getChangeByKey(dep_key)
                if not change.isUpdateOf(dep):
                    continue
                cache_source.setChangeAttributes(
                    cached, refresh_deps=True)

    # The updated change always gets its own deps marked for refresh.
    own_source = self.connections.getSource(change.project.connection_name)
    own_source.setChangeAttributes(change, refresh_deps=True)
def cancelJob(self, buildset, job, build=None, final=False,
force=False):
"""Cancel a running build