Merge "Fix regression in change tracking"

This commit is contained in:
Jenkins 2016-01-08 00:19:32 +00:00 committed by Gerrit Code Review
commit 514db293af
5 changed files with 32 additions and 11 deletions

View File

@ -43,6 +43,14 @@ class BaseConnection(object):
self.connection_name = connection_name
self.connection_config = connection_config
# Keep track of the sources, triggers and reporters using this
# connection
self.attached_to = {
'source': [],
'trigger': [],
'reporter': [],
}
def onLoad(self):
pass
@ -51,3 +59,6 @@ class BaseConnection(object):
def registerScheduler(self, sched):
self.sched = sched
def registerUse(self, what, instance):
self.attached_to[what].append(instance)

View File

@ -47,7 +47,6 @@ class GerritEventConnector(threading.Thread):
def _handleEvent(self):
ts, data = self.connection.getEvent()
if self._stopped:
self.connection.eventDone()
return
# Gerrit can produce inconsistent data immediately after an
# event, So ensure that we do not deliver the event to Zuul
@ -101,11 +100,23 @@ class GerritEventConnector(threading.Thread):
if (event.change_number and
self.connection.sched.getProject(event.project_name)):
# Mark the change as needing a refresh in the cache
event._needs_refresh = True
# Call _getChange for the side effect of updating the
# cache. Note that this modifies Change objects outside
# the main thread.
# NOTE(jhesketh): Ideally we'd just remove the change from the
# cache to denote that it needs updating. However the change
# object is already used by Item's and hence BuildSet's etc. and
# we need to update those objects by reference so that they have
# the correct/new information and also avoid hitting gerrit
# multiple times.
if self.connection.attached_to['source']:
self.connection.attached_to['source'][0]._getChange(
event.change_number, event.patch_number, refresh=True)
# We only need to do this once since the connection maintains
# the cache (which is shared between all the sources)
# NOTE(jhesketh): We may couple sources and connections again
# at which point this becomes more sensible.
self.connection.sched.addEvent(event)
self.connection.eventDone()
def run(self):
while True:
@ -115,6 +126,8 @@ class GerritEventConnector(threading.Thread):
self._handleEvent()
except:
self.log.exception("Exception moving Gerrit event:")
finally:
self.connection.eventDone()
class GerritWatcher(threading.Thread):

View File

@ -1005,9 +1005,6 @@ class TriggerEvent(object):
# an admin command, etc):
self.forced_pipeline = None
# Internal mechanism to track if the change needs a refresh from cache
self._needs_refresh = False
def __repr__(self):
ret = '<TriggerEvent %s %s' % (self.type, self.project_name)

View File

@ -307,6 +307,9 @@ class Scheduler(threading.Thread):
driver_config, self, connection
)
if connection:
connection.registerUse(dtype, driver_instance)
return driver_instance
def _getSourceDriver(self, connection_name):

View File

@ -129,9 +129,6 @@ class GerritSource(BaseSource):
def getChange(self, event, project):
if event.change_number:
refresh = False
if event._needs_refresh:
refresh = True
event._needs_refresh = False
change = self._getChange(event.change_number, event.patch_number,
refresh=refresh)
elif event.ref: