Assign Gerrit change events to a patchset

We receive some events from Gerrit (such as the currently-unhandled
hashtags-changed or the currently partially-handled topic-changed)
without a patchset, only a change number.  This makes sense from a
data model perspective, but it is not useful in Zuul since we must
have a patchset to actually enqueue anything.

Currently we handle these events by querying for the change and
storing the result in the change cache under the key (change, None).
Note that the change itself will have the current patchset assigned
as a result of the query, but this "upgrade" of information applies
only to the change object, not the change key, so the up-to-date
change object is never used for anything productive in Zuul.
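
To make the problem concrete, here is a minimal sketch of the
situation described above, with plain dicts and tuples standing in
for Zuul's actual ChangeKey and change cache classes:

    # Minimal sketch, not Zuul's actual API: an event such as
    # hashtags-changed carries only a change number, so the cache key is
    # built without a patchset even though the queried change knows its
    # current patchset.
    change_cache = {}
    event = {"change_number": "12345", "patch_number": None}

    key = (event["change_number"], event["patch_number"])    # ("12345", None)
    change_cache[key] = {"number": "12345", "patchset": "3"}  # query result

    # Later lookups that include the patchset miss this entry entirely.
    assert ("12345", "3") not in change_cache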

In order to actually trigger off of these events, let's "upgrade" the
change key as well after performing the query.  To do that, if our
initial change key lacked patchset info but the resulting change has
it, we will store the change object in the cache under a new change
key that carries the patchset information.  We do this only in the
specific case where we are performing our first query after receiving
an event.  We will also update the event with the same patchset
information.  This means that after receiving an event and performing
the initial query, we are guaranteed to have patchset information
about the change, and therefore Zuul should never see a
(change, None) tuple for a change key any more.  (A simplified sketch
of this flow follows the commit metadata below.)

* (Change keys have more information than that tuple, but those are
   the relevant parts for this change.)

Change-Id: I6f077376044ffbbd3853e2050c507f449da77962
James E. Blair 2023-12-04 13:56:58 -08:00
parent 50e06b4e74
commit ef88c15405
4 changed files with 34 additions and 12 deletions
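
Before reading the diffs, here is a simplified, self-contained sketch
of the key-upgrade flow the commit message describes.  The method and
parameter names mirror the real ones below (updateChangeWithRetry,
allow_key_update), but the dict-based cache and the query_gerrit
helper are stand-ins, not Zuul's actual implementation, and the retry
handling is omitted:

    change_cache = {}


    def query_gerrit(change_number):
        # Stand-in for the real Gerrit query over SSH/HTTP.
        return {"number": change_number, "patchset": "3"}


    def update_change_with_retry(key, change, update_func,
                                 allow_key_update=False):
        # The update function refreshes the change and returns a key built
        # from the refreshed data; when key updates are allowed (only on the
        # first query after receiving an event), store the change under the
        # new, patchset-qualified key instead of the original one.
        newkey = update_func(change)
        if allow_key_update and newkey:
            key = newkey
        change_cache[key] = change
        return change


    event = {"change_number": "12345", "patch_number": None}
    key = (event["change_number"], event["patch_number"])   # ("12345", None)
    change = {"number": event["change_number"], "patchset": None}


    def _update_change(c):
        c.update(query_gerrit(c["number"]))
        return (c["number"], c["patchset"])   # new key carries the patchset


    change = update_change_with_retry(key, change, _update_change,
                                      allow_key_update=True)
    assert ("12345", "3") in change_cache   # not stored under ("12345", None)

    # The event is then updated with the same patchset information, so
    # later processing never sees a patchset-less key for this change.
    if event["patch_number"] is None and change["patchset"]:
        event["patch_number"] = str(change["patchset"])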


@@ -368,7 +368,10 @@ class GerritEventConnector(threading.Thread):
             self.connection.clearConnectionCacheOnBranchEvent(event)
 
-        self._getChange(event, connection_event.zuul_event_ltime)
+        change = self._getChange(event, connection_event.zuul_event_ltime)
+        if (change and change.patchset and
+                event.change_number and event.patch_number is None):
+            event.patch_number = str(change.patchset)
         self.connection.logEvent(event)
         self.connection.sched.addTriggerEvent(
             self.connection.driver_name, event
         )
@@ -377,6 +380,7 @@
     def _getChange(self, event, connection_event_ltime):
         # Grab the change if we are managing the project or if it exists in the
         # cache as it may be a dependency
+        change = None
         if event.change_number:
             refresh = True
             change_key = self.connection.source.getChangeKey(event)
@@ -411,8 +415,10 @@
             # we need to update those objects by reference so that they
             # have the correct/new information and also avoid hitting
             # gerrit multiple times.
-            self.connection._getChange(change_key,
-                                       refresh=True, event=event)
+            change = self.connection._getChange(
+                change_key, refresh=True, event=event,
+                allow_key_update=True)
+        return change
 
 
 class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
@@ -648,7 +654,7 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
             return self._getRef(change_key, refresh=refresh, event=event)
 
     def _getChange(self, change_key, refresh=False, history=None,
-                   event=None):
+                   event=None, allow_key_update=False):
         # Ensure number and patchset are str
         change = self._change_cache.get(change_key)
         if change and not refresh:
@@ -660,7 +666,8 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
             change = GerritChange(None)
             change.number = change_key.stable_id
             change.patchset = change_key.revision
-        return self._updateChange(change_key, change, event, history)
+        return self._updateChange(change_key, change, event, history,
+                                  allow_key_update)
 
     def _getTag(self, change_key, refresh=False, event=None):
         tag = change_key.stable_id
@@ -797,7 +804,8 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
                 ret.append((dep_change, dep_ps))
         return ret
 
-    def _updateChange(self, key, change, event, history):
+    def _updateChange(self, key, change, event, history,
+                      allow_key_update=False):
         log = get_annotated_logger(self.log, event)
 
         # In case this change is already in the history we have a
@@ -831,10 +839,10 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
         data = self.queryChange(change.number, event=event)
 
         def _update_change(c):
-            c.update(data, self)
+            return c.update(data, self)
 
-        change = self._change_cache.updateChangeWithRetry(key, change,
-                                                          _update_change)
+        change = self._change_cache.updateChangeWithRetry(
+            key, change, _update_change, allow_key_update=allow_key_update)
 
         if not change.is_merged:
             self._updateChangeDependencies(log, change, data, event, history)


@@ -22,6 +22,8 @@ from zuul.model import EventFilter, RefFilter
 from zuul.model import Change, TriggerEvent, FalseWithReason
 from zuul.driver.util import time_to_seconds, to_list, make_regex
 from zuul import exceptions
+from zuul.zk.change_cache import ChangeKey
+
 
 EMPTY_GIT_REF = '0' * 40  # git sha of all zeros, used during creates/deletes
@@ -44,6 +46,9 @@ class GerritChange(Change):
             self.updateFromSSH(data.data, connection)
         else:
             self.updateFromHTTP(data.data, data.files, connection)
+        key = ChangeKey(connection.connection_name, None,
+                        'GerritChange', str(self.number), str(self.patchset))
+        return key
 
     def serialize(self):
         d = super().serialize()


@@ -26,6 +26,12 @@ from zuul.lib.dependson import find_dependency_headers
 from zuul.zk.change_cache import ChangeKey
 
 
+def noneOrStr(val):
+    if val is None:
+        return None
+    return str(val)
+
+
 class GerritSource(BaseSource):
     name = 'gerrit'
     log = logging.getLogger("zuul.source.Gerrit")
@@ -62,7 +68,7 @@ class GerritSource(BaseSource):
             return ChangeKey(connection_name, None,
                              'GerritChange',
                              str(event.change_number),
-                             str(event.patch_number))
+                             noneOrStr(event.patch_number))
         revision = f'{event.oldrev}..{event.newrev}'
         if event.ref and event.ref.startswith('refs/tags/'):
             tag = event.ref[len('refs/tags/'):]


@@ -425,11 +425,14 @@ class AbstractChangeCache(ZooKeeperSimpleBase, Iterable, abc.ABC):
                              compressed_size, uncompressed_size)
         self._change_cache[key._hash] = change
 
-    def updateChangeWithRetry(self, key, change, update_func, retry_count=5):
+    def updateChangeWithRetry(self, key, change, update_func, retry_count=5,
+                              allow_key_update=False):
         for attempt in range(1, retry_count + 1):
             try:
                 version = change.cache_version
-                update_func(change)
+                newkey = update_func(change)
+                if allow_key_update and newkey:
+                    key = newkey
                 self.set(key, change, version)
                 break
             except ConcurrentUpdateError: