Merge "Reduce redundant Gerrit queries"
This commit is contained in:
@@ -13,6 +13,7 @@
|
||||
# under the License.
|
||||
|
||||
import os
|
||||
import threading
|
||||
import textwrap
|
||||
from unittest import mock
|
||||
|
||||
@@ -868,3 +869,58 @@ class TestGerritFake(ZuulTestCase):
|
||||
# The Gerrit connection method filters out the queried change
|
||||
ret = self.fake_gerrit._getSubmittedTogether(C1, None)
|
||||
self.assertEqual(ret, [(4, 1)])
|
||||
|
||||
|
||||
class TestGerritConnection(ZuulTestCase):
|
||||
config_file = 'zuul-gerrit-web.conf'
|
||||
tenant_config_file = 'config/single-tenant/main.yaml'
|
||||
|
||||
def test_zuul_query_ltime(self):
|
||||
# Add a lock around the event queue iterator so that we can
|
||||
# ensure that multiple events arrive before the first is
|
||||
# processed.
|
||||
lock = threading.Lock()
|
||||
|
||||
orig_iterEvents = self.fake_gerrit.gerrit_event_connector.\
|
||||
event_queue._iterEvents
|
||||
|
||||
def _iterEvents(*args, **kw):
|
||||
with lock:
|
||||
return orig_iterEvents(*args, **kw)
|
||||
|
||||
self.patch(self.fake_gerrit.gerrit_event_connector.event_queue,
|
||||
'_iterEvents', _iterEvents)
|
||||
|
||||
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
|
||||
B = self.fake_gerrit.addFakeChange('org/project', 'master', 'B')
|
||||
B.setDependsOn(A, 1)
|
||||
# Hold the connection queue processing so these events get
|
||||
# processed together
|
||||
with lock:
|
||||
self.fake_gerrit.addEvent(A.addApproval('Code-Review', 2))
|
||||
self.fake_gerrit.addEvent(B.addApproval('Approved', 1))
|
||||
self.fake_gerrit.addEvent(B.addApproval('Code-Review', 2))
|
||||
self.waitUntilSettled()
|
||||
self.assertHistory([])
|
||||
# One query for each change in the above cluster of events.
|
||||
self.assertEqual(A.queried, 1)
|
||||
self.assertEqual(B.queried, 1)
|
||||
self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
|
||||
self.waitUntilSettled()
|
||||
self.assertHistory([
|
||||
dict(name="project-merge", result="SUCCESS", changes="1,1"),
|
||||
dict(name="project-test1", result="SUCCESS", changes="1,1"),
|
||||
dict(name="project-test2", result="SUCCESS", changes="1,1"),
|
||||
dict(name="project-merge", result="SUCCESS", changes="1,1 2,1"),
|
||||
dict(name="project-test1", result="SUCCESS", changes="1,1 2,1"),
|
||||
dict(name="project-test2", result="SUCCESS", changes="1,1 2,1"),
|
||||
], ordered=False)
|
||||
# One query due to the event on change A, followed by a query
|
||||
# to verify the merge.
|
||||
self.assertEqual(A.queried, 3)
|
||||
# No query for change B necessary since our cache is up to
|
||||
# date with respect for the triggering event. One query to
|
||||
# verify the merge.
|
||||
self.assertEqual(B.queried, 2)
|
||||
self.assertEqual(A.data['status'], 'MERGED')
|
||||
self.assertEqual(B.data['status'], 'MERGED')
|
||||
|
@@ -103,10 +103,12 @@ class GerritChangeData(object):
|
||||
SSH = 1
|
||||
HTTP = 2
|
||||
|
||||
def __init__(self, fmt, data, related=None, files=None):
|
||||
def __init__(self, fmt, data, related=None, files=None,
|
||||
zuul_query_ltime=None):
|
||||
self.format = fmt
|
||||
self.data = data
|
||||
self.files = files
|
||||
self.zuul_query_ltime = zuul_query_ltime
|
||||
|
||||
if fmt == self.SSH:
|
||||
self.parseSSH(data)
|
||||
@@ -329,19 +331,20 @@ class GerritEventConnector(threading.Thread):
|
||||
|
||||
self.connection.clearConnectionCacheOnBranchEvent(event)
|
||||
|
||||
self._getChange(event)
|
||||
self._getChange(event, connection_event.zuul_event_ltime)
|
||||
self.connection.logEvent(event)
|
||||
self.connection.sched.addTriggerEvent(
|
||||
self.connection.driver_name, event
|
||||
)
|
||||
|
||||
def _getChange(self, event):
|
||||
def _getChange(self, event, connection_event_ltime):
|
||||
# Grab the change if we are managing the project or if it exists in the
|
||||
# cache as it may be a dependency
|
||||
if event.change_number:
|
||||
refresh = True
|
||||
change_key = self.connection.source.getChangeKey(event)
|
||||
if self.connection._change_cache.get(change_key) is None:
|
||||
change = self.connection._change_cache.get(change_key)
|
||||
if change is None:
|
||||
refresh = False
|
||||
for tenant in self.connection.sched.abide.tenants.values():
|
||||
# TODO(fungi): it would be better to have some simple means
|
||||
@@ -353,6 +356,13 @@ class GerritEventConnector(threading.Thread):
|
||||
event.project_name))):
|
||||
refresh = True
|
||||
break
|
||||
else:
|
||||
# We have a cache entry for this change Get the
|
||||
# query ltime for the cache entry; if it's after the
|
||||
# event ltime, we don't need to refresh.
|
||||
if (change.zuul_query_ltime and
|
||||
change.zuul_query_ltime > connection_event_ltime):
|
||||
refresh = False
|
||||
|
||||
if refresh:
|
||||
# Call _getChange for the side effect of updating the
|
||||
@@ -1418,15 +1428,20 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
|
||||
|
||||
def queryChange(self, number, event=None):
|
||||
for attempt in range(3):
|
||||
# Get a query ltime -- any events before this point should be
|
||||
# included in our change data.
|
||||
zuul_query_ltime = self.sched.zk_client.getCurrentLtime()
|
||||
try:
|
||||
if self.session:
|
||||
data, related, files = self.queryChangeHTTP(
|
||||
number, event=event)
|
||||
return GerritChangeData(GerritChangeData.HTTP,
|
||||
data, related, files)
|
||||
data, related, files,
|
||||
zuul_query_ltime=zuul_query_ltime)
|
||||
else:
|
||||
data = self.queryChangeSSH(number, event=event)
|
||||
return GerritChangeData(GerritChangeData.SSH, data)
|
||||
return GerritChangeData(GerritChangeData.SSH, data,
|
||||
zuul_query_ltime=zuul_query_ltime)
|
||||
except Exception:
|
||||
if attempt >= 3:
|
||||
raise
|
||||
|
@@ -35,8 +35,10 @@ class GerritChange(Change):
|
||||
self.approvals = []
|
||||
self.missing_labels = set()
|
||||
self.commit = None
|
||||
self.zuul_query_ltime = None
|
||||
|
||||
def update(self, data, connection):
|
||||
self.zuul_query_ltime = data.zuul_query_ltime
|
||||
if data.format == data.SSH:
|
||||
self.updateFromSSH(data.data, connection)
|
||||
else:
|
||||
@@ -51,6 +53,7 @@ class GerritChange(Change):
|
||||
"approvals": self.approvals,
|
||||
"missing_labels": list(self.missing_labels),
|
||||
"commit": self.commit,
|
||||
"zuul_query_ltime": self.zuul_query_ltime,
|
||||
})
|
||||
return d
|
||||
|
||||
@@ -62,6 +65,7 @@ class GerritChange(Change):
|
||||
self.approvals = data["approvals"]
|
||||
self.missing_labels = set(data["missing_labels"])
|
||||
self.commit = data.get("commit")
|
||||
self.zuul_query_ltime = data.get("zuul_query_ltime")
|
||||
|
||||
def updateFromSSH(self, data, connection):
|
||||
if self.patchset is None:
|
||||
|
@@ -909,7 +909,7 @@ class ConnectionEventQueue(ZooKeeperEventQueue):
|
||||
self._put({'event_data': data})
|
||||
|
||||
def __iter__(self):
|
||||
for data, ack_ref, _ in self._iterEvents():
|
||||
for data, ack_ref, zstat in self._iterEvents():
|
||||
if not data:
|
||||
self.log.warning("Malformed event found: %s", data)
|
||||
self._remove(ack_ref.path)
|
||||
@@ -918,6 +918,7 @@ class ConnectionEventQueue(ZooKeeperEventQueue):
|
||||
event = model.ConnectionEvent.fromDict(
|
||||
data.get('event_data', data))
|
||||
event.ack_ref = ack_ref
|
||||
event.zuul_event_ltime = zstat.creation_transaction_id
|
||||
yield event
|
||||
|
||||
|
||||
|
Reference in New Issue
Block a user