Perform fewer gerrit queries for changes submitted-together
When using submitWholeTopic, it is possible to add changes to a dependency cycle without actually updating all of the involved changes. Yet, in Zuul, each of the involved changes needs to have a link to all the others. To accomplish this, when we update information about a change in zuul, we querry gerrit for changes that are "submitted together" with that change. That includes all the changes in the topic. Then we update the information for every change returned from that query in order to set up the reverse linkage. When we update a change, we perform 5 gerrit queries: * The change info itself * Related changes (git parent/child) * Files list * Reverve commit dependencies (Depends-On that point to this change) * Submitted together We also might perform more queries depending on Depends-On fooders we find. When we update the submitted together changes, we perform all of these queries for each of the other changes (but we keep track of the ones we've done already, so we don't do them more than once). That means in a set of 3 changes, we would perform 15 queries, But that's only what we would do if the changes already existed in their final state. In reality, changes are added to a topic one at a time. That means that we will get a gerrit event for every change (either a patchset-created event, or a topic-updated event). Every time we get one of these events, we repeat the process. That means for a set of 3 changes, we would perform 5 queries for the first event, 10 for the second, and 15 for the third, for a total of 30 queries. The total number of queries performed for any number of changes added in this manner is: queries = sum([5*i for i in range(1, count+1)]) count=100 gives 25250 queries For a large number of changes, this could be debilitating to Zuul (and possibly Gerrit). To reduce the impact of this, since we know that when a change is added to a topic, it is not necessary to update anything about the other changes in the topic other than their dependencies, we can avoid running some queries on the other changes in the topic. Ideally, we would avoid running any queries and just presume that a change that appears in the submitted-together list is there because it is a member of the topic, however, the submitted-together list also includes git parents. And if submitWholeTopic is not enabled, and two changes share a topic and a git parent-child relationship, the results would appear the same as if submitWholeTopic was enabled. It is not possible to determine whether the dependency should be one-way (parent-child) or circular (submitWholeTopic) by examining the results from only one change. We need to at least run the submitted-together query for every change in the result set to see if our change appears in the submitted-together results in the others. This change reduces the queries run in the way described. That means the total number of queries is: queries = 4*count + sum([1*i for i in range(1, count+1)]) count=100 gives 10500 queries Which is a significant improvement, especially for larger numbers of changes. In implementing this change, two related bugs were fixed: * The fake gerrit driver now only returns unmerged changes in the submitted-together query (as Gerrit does). * Some query history checks in the Gerrit driver used different values (some used change objects, some used (id, patchset) tuples which caused some history checks to fail. It failed safe, meaning that we performed unecessary queries. With the correction in this change we should perform fewer gerrit queries for complex change dependency topologies regardless of wehther they involve submitWholeTopic. Change-Id: Ie105fb2ecb3f48a9f2e2f97a608537eb601bc688
This commit is contained in:
parent
8cf75cf5a5
commit
f9047557df
@ -728,6 +728,7 @@ class GerritWebServer(object):
|
||||
def do_POST(self):
|
||||
path = self.path
|
||||
self.log.debug("Got POST %s", path)
|
||||
fake_gerrit.api_calls.append(('POST', path))
|
||||
|
||||
data = self.rfile.read(int(self.headers['Content-Length']))
|
||||
data = json.loads(data.decode('utf-8'))
|
||||
@ -749,6 +750,7 @@ class GerritWebServer(object):
|
||||
def do_GET(self):
|
||||
path = self.path
|
||||
self.log.debug("Got GET %s", path)
|
||||
fake_gerrit.api_calls.append(('GET', path))
|
||||
|
||||
m = self.change_re.match(path)
|
||||
if m:
|
||||
@ -1015,6 +1017,7 @@ class FakeGerritConnection(gerritconnection.GerritConnection):
|
||||
self.change_number = 0
|
||||
self.changes = changes_db
|
||||
self.queries = []
|
||||
self.api_calls = []
|
||||
self.upstream_root = upstream_root
|
||||
self.fake_checkers = []
|
||||
self._poller_event = poller_event
|
||||
@ -1136,6 +1139,8 @@ class FakeGerritConnection(gerritconnection.GerritConnection):
|
||||
for dep in change.data.get('dependsOn', []):
|
||||
dep_change = self.changes.get(int(dep['number']))
|
||||
r = dep_change.queryHTTP(internal=True)
|
||||
if r['status'] == 'MERGED':
|
||||
continue
|
||||
if r not in results:
|
||||
results.append(r)
|
||||
if len(results) == 1:
|
||||
|
@ -14,6 +14,7 @@
|
||||
# You should have received a copy of the GNU General Public License
|
||||
# along with this software. If not, see <http://www.gnu.org/licenses/>.
|
||||
|
||||
from collections import Counter
|
||||
import re
|
||||
import textwrap
|
||||
import json
|
||||
@ -3443,6 +3444,72 @@ class TestGerritCircularDependencies(ZuulTestCase):
|
||||
self.assertEqual(A.data["status"], "MERGED")
|
||||
self.assertEqual(B.data["status"], "MERGED")
|
||||
|
||||
def test_submitted_together_storm(self):
|
||||
# Test that if many changes are uploaded with the same topic,
|
||||
# we handle queries efficiently.
|
||||
self.fake_gerrit._fake_submit_whole_topic = True
|
||||
self.waitUntilSettled()
|
||||
A = self.fake_gerrit.addFakeChange('org/project', "master", "A",
|
||||
topic='test-topic')
|
||||
self.fake_gerrit.addEvent(A.getPatchsetCreatedEvent(1))
|
||||
self.waitUntilSettled()
|
||||
B = self.fake_gerrit.addFakeChange('org/project1', "master", "B",
|
||||
topic='test-topic')
|
||||
self.fake_gerrit.addEvent(B.getPatchsetCreatedEvent(1))
|
||||
self.waitUntilSettled()
|
||||
|
||||
C = self.fake_gerrit.addFakeChange('org/project2', "master", "C",
|
||||
topic='test-topic')
|
||||
self.fake_gerrit.addEvent(C.getPatchsetCreatedEvent(1))
|
||||
self.waitUntilSettled()
|
||||
|
||||
# Output all the queries seen for debugging
|
||||
for q in self.fake_gerrit.api_calls:
|
||||
self.log.debug("Query: %s", q)
|
||||
|
||||
gets = [q[1] for q in self.fake_gerrit.api_calls if q[0] == 'GET']
|
||||
counters = Counter()
|
||||
for q in gets:
|
||||
parts = q.split('/')[2:]
|
||||
if len(parts) > 2 and parts[2] == 'revisions':
|
||||
parts.pop(3)
|
||||
if 'q=message' in parts[1]:
|
||||
parts[1] = 'message'
|
||||
counters[tuple(parts)] += 1
|
||||
# Ensure that we don't run these queries more than once for each change
|
||||
qstring = ('o=DETAILED_ACCOUNTS&o=CURRENT_REVISION&'
|
||||
'o=CURRENT_COMMIT&o=CURRENT_FILES&o=LABELS&'
|
||||
'o=DETAILED_LABELS&o=ALL_REVISIONS')
|
||||
self.assertEqual(1, counters[('changes', f'1?{qstring}')])
|
||||
self.assertEqual(1, counters[('changes', f'2?{qstring}')])
|
||||
self.assertEqual(1, counters[('changes', f'3?{qstring}')])
|
||||
self.assertEqual(1, counters[('changes', '1', 'revisions', 'related')])
|
||||
self.assertEqual(1, counters[('changes', '2', 'revisions', 'related')])
|
||||
self.assertEqual(1, counters[('changes', '3', 'revisions', 'related')])
|
||||
self.assertEqual(1, counters[
|
||||
('changes', '1', 'revisions', 'files?parent=1')])
|
||||
self.assertEqual(1, counters[
|
||||
('changes', '2', 'revisions', 'files?parent=1')])
|
||||
self.assertEqual(1, counters[
|
||||
('changes', '3', 'revisions', 'files?parent=1')])
|
||||
self.assertEqual(3, counters[('changes', 'message')])
|
||||
# These queries need to run more often
|
||||
self.assertEqual(3, counters[('changes', '1', 'submitted_together')])
|
||||
self.assertEqual(2, counters[('changes', '2', 'submitted_together')])
|
||||
self.assertEqual(1, counters[('changes', '3', 'submitted_together')])
|
||||
self.assertHistory([
|
||||
dict(name="project-job", changes="1,1"),
|
||||
|
||||
dict(name="project-job", changes="1,1 2,1"),
|
||||
dict(name="project1-job", changes="1,1 2,1"),
|
||||
dict(name="project-vars-job", changes="1,1 2,1"),
|
||||
|
||||
dict(name="project-job", changes="2,1 1,1 3,1"),
|
||||
dict(name="project1-job", changes="2,1 1,1 3,1"),
|
||||
dict(name="project-vars-job", changes="2,1 1,1 3,1"),
|
||||
dict(name="project2-job", changes="2,1 1,1 3,1"),
|
||||
], ordered=False)
|
||||
|
||||
def test_submitted_together_git(self):
|
||||
self.fake_gerrit._fake_submit_whole_topic = True
|
||||
|
||||
|
@ -14,8 +14,10 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import collections
|
||||
import copy
|
||||
import datetime
|
||||
import enum
|
||||
import itertools
|
||||
import json
|
||||
import logging
|
||||
@ -170,6 +172,28 @@ class GerritChangeData(object):
|
||||
change['_revision_number']))
|
||||
|
||||
|
||||
class QueryHistory:
|
||||
class Query(enum.Enum):
|
||||
SEEN = 1 # Not a real query, just that we've seen the change
|
||||
CHANGE = 2 # The main change query
|
||||
SUBMITTED_TOGETHER = 3 # The submitted-together query
|
||||
|
||||
def __init__(self):
|
||||
self.queries = collections.defaultdict(lambda: dict())
|
||||
|
||||
def getByKey(self, query, change_key):
|
||||
if not isinstance(change_key, ChangeKey):
|
||||
raise Exception("Must supply a ChangeKey")
|
||||
key = (change_key.stable_id, change_key.revision)
|
||||
return self.queries[query].get(key)
|
||||
|
||||
def add(self, query, change):
|
||||
if not isinstance(change, GerritChange):
|
||||
raise Exception("Must supply a GerritChange")
|
||||
key = (change.number, change.patchset)
|
||||
self.queries[query][key] = change
|
||||
|
||||
|
||||
class GerritEventConnector(threading.Thread):
|
||||
"""Move events from Gerrit to the scheduler."""
|
||||
|
||||
@ -658,10 +682,20 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
|
||||
elif change_key.change_type == 'Ref':
|
||||
return self._getRef(change_key, refresh=refresh, event=event)
|
||||
|
||||
def _checkMaxDependencies(self, change, history):
|
||||
if change and history:
|
||||
history.add(history.Query.SEEN, change)
|
||||
if (self.max_dependencies is not None and
|
||||
len(history.queries[history.Query.SEEN]) >
|
||||
self.max_dependencies):
|
||||
raise GerritEventProcessingException(
|
||||
f"Change {change} has too many dependencies")
|
||||
|
||||
def _getChange(self, change_key, refresh=False, history=None,
|
||||
event=None, allow_key_update=False):
|
||||
# Ensure number and patchset are str
|
||||
change = self._change_cache.get(change_key)
|
||||
self._checkMaxDependencies(change, history)
|
||||
if change and not refresh:
|
||||
return change
|
||||
if not change:
|
||||
@ -671,6 +705,7 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
|
||||
change = GerritChange(None)
|
||||
change.number = change_key.stable_id
|
||||
change.patchset = change_key.revision
|
||||
self._checkMaxDependencies(change, history)
|
||||
return self._updateChange(change_key, change, event, history,
|
||||
allow_key_update)
|
||||
|
||||
@ -830,17 +865,10 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
|
||||
# drop history, we need to resolve the patchset on events with
|
||||
# no patchsets before adding the entry to the change cache.
|
||||
if history and change.number and change.patchset:
|
||||
for history_change in history:
|
||||
if (history_change.number == change.number and
|
||||
history_change.patchset == change.patchset):
|
||||
log.debug("Change %s is in history", change)
|
||||
return history_change
|
||||
|
||||
if (self.max_dependencies is not None and
|
||||
history and
|
||||
len(history) > self.max_dependencies):
|
||||
raise GerritEventProcessingException(
|
||||
f"Change {change} has too many dependencies")
|
||||
history_change = history.getByKey(history.Query.CHANGE, key)
|
||||
if history_change:
|
||||
log.debug("Change %s is in history", change)
|
||||
return history_change
|
||||
|
||||
log.info("Updating %s", change)
|
||||
data = self.queryChange(change.number, event=event)
|
||||
@ -853,7 +881,7 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
|
||||
# dependent changes (recursively calling this method).
|
||||
if not change.is_merged:
|
||||
extra = self._updateChangeDependencies(
|
||||
log, change, data, event, history)
|
||||
log, key, change, data, event, history)
|
||||
else:
|
||||
extra = {}
|
||||
|
||||
@ -866,10 +894,11 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
|
||||
|
||||
return change
|
||||
|
||||
def _updateChangeDependencies(self, log, change, data, event, history):
|
||||
def _updateChangeDependencies(self, log, key, change, data, event,
|
||||
history):
|
||||
if history is None:
|
||||
history = []
|
||||
history.append(change)
|
||||
history = QueryHistory()
|
||||
history.add(history.Query.CHANGE, change)
|
||||
|
||||
needs_changes = set()
|
||||
git_needs_changes = []
|
||||
@ -932,9 +961,9 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
|
||||
# reference the latest patchset of its Depends-On (this
|
||||
# change). In case the dep is already in history we already
|
||||
# refreshed this change so refresh is not needed in this case.
|
||||
refresh = (dep_num, dep_ps) not in history
|
||||
dep_key = ChangeKey(self.connection_name, None,
|
||||
'GerritChange', str(dep_num), str(dep_ps))
|
||||
refresh = not history.getByKey(history.Query.CHANGE, dep_key)
|
||||
dep = self._getChange(
|
||||
dep_key, refresh=refresh, history=history,
|
||||
event=event)
|
||||
@ -953,17 +982,38 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
|
||||
log.debug("Updating %s: Getting submitted-together "
|
||||
"change %s,%s",
|
||||
change, dep_num, dep_ps)
|
||||
# Because a submitted-together change may be a cross-repo
|
||||
# dependency, cause that change to refresh so that it will
|
||||
# reference the latest patchset of its Depends-On (this
|
||||
# change). In case the dep is already in history we already
|
||||
# refreshed this change so refresh is not needed in this case.
|
||||
refresh = (dep_num, dep_ps) not in history
|
||||
# The query above will have returned a set of changes
|
||||
# that are submitted together along with this one.
|
||||
# That set includes:
|
||||
# * Any git-dependent change if we're not being cherry-picked
|
||||
# * Any change with the same topic if submitWholeTopic is set
|
||||
|
||||
# The first is a one-way dependency, the second is
|
||||
# simultaneous. We are unable to distinguish the two
|
||||
# based only on the results of the submitted-together
|
||||
# query. Therefore, while we know that we need to add
|
||||
# the dep to our dependency list, we don't know
|
||||
# whether we need to add ourselves to the dep list.
|
||||
|
||||
# In order to find that out, we will need to run the
|
||||
# submitted-together query for each dep as well. But
|
||||
# if we've already queried the dep, we don't need to
|
||||
# do it again, and if this change already appears in
|
||||
# the dep's dependencies, we also don't need to query
|
||||
# again.
|
||||
dep_key = ChangeKey(self.connection_name, None,
|
||||
'GerritChange', str(dep_num), str(dep_ps))
|
||||
dep = self._getChange(
|
||||
dep_key, refresh=refresh, history=history,
|
||||
dep_key, refresh=False, history=history,
|
||||
event=event)
|
||||
refresh = True
|
||||
if (history.getByKey(history.Query.CHANGE, dep_key) or
|
||||
history.getByKey(history.Query.SUBMITTED_TOGETHER,
|
||||
dep_key)):
|
||||
refresh = False
|
||||
if (key in dep.compat_needs_changes and
|
||||
key in dep.compat_needed_by_changes):
|
||||
refresh = False
|
||||
# Gerrit changes to be submitted together do not
|
||||
# necessarily get posted with dependency cycles using
|
||||
# git trees and depends-on. However, they are
|
||||
@ -979,6 +1029,11 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
|
||||
and dep not in needed_by_changes):
|
||||
compat_needed_by_changes.append(dep_key.reference)
|
||||
needed_by_changes.add(dep_key.reference)
|
||||
if refresh:
|
||||
# We may need to update the deps dependencies (as
|
||||
# explained above).
|
||||
history.add(history.Query.SUBMITTED_TOGETHER, dep)
|
||||
self.updateSubmittedTogether(log, dep, history, event)
|
||||
except GerritEventProcessingException:
|
||||
raise
|
||||
except Exception:
|
||||
@ -992,6 +1047,51 @@ class GerritConnection(ZKChangeCacheMixin, ZKBranchCacheMixin, BaseConnection):
|
||||
compat_needed_by_changes=compat_needed_by_changes,
|
||||
)
|
||||
|
||||
def updateSubmittedTogether(self, log, change, history, event):
|
||||
# This method is very similar to the last part of
|
||||
# _updateChangeDependencies, but it updates the other
|
||||
# direction and does so without performing a full change
|
||||
# query.
|
||||
extra = {
|
||||
'compat_needs_changes': change.compat_needs_changes[:],
|
||||
'compat_needed_by_changes': change.compat_needed_by_changes[:],
|
||||
}
|
||||
update = False
|
||||
for (dep_num, dep_ps) in self._getSubmittedTogether(change, event):
|
||||
try:
|
||||
log.debug("Updating %s: Getting reverse submitted-together "
|
||||
"change %s,%s",
|
||||
change, dep_num, dep_ps)
|
||||
dep_key = ChangeKey(self.connection_name, None,
|
||||
'GerritChange', str(dep_num), str(dep_ps))
|
||||
dep = self._getChange(
|
||||
dep_key, refresh=False, history=history,
|
||||
event=event)
|
||||
if (dep.open and
|
||||
dep_key.reference not in
|
||||
extra['compat_needs_changes']):
|
||||
extra['compat_needs_changes'].append(dep_key.reference)
|
||||
update = True
|
||||
if (dep.open and
|
||||
dep.is_current_patchset and
|
||||
dep_key.reference not in
|
||||
extra['compat_needed_by_changes']):
|
||||
extra['compat_needed_by_changes'].append(dep_key.reference)
|
||||
update = True
|
||||
except GerritEventProcessingException:
|
||||
raise
|
||||
except Exception:
|
||||
log.exception("Failed to get commit-needed change %s,%s",
|
||||
dep_num, dep_ps)
|
||||
if update:
|
||||
# Actually update the dep in the change cache.
|
||||
def _update_change(c):
|
||||
for k, v in extra.items():
|
||||
setattr(c, k, v)
|
||||
self._change_cache.updateChangeWithRetry(
|
||||
change.cache_stat.key, change, _update_change,
|
||||
allow_key_update=False)
|
||||
|
||||
def isMerged(self, change, head=None):
|
||||
self.log.debug("Checking if change %s is merged" % change)
|
||||
if not change.number:
|
||||
|
Loading…
x
Reference in New Issue
Block a user