From 0e4c791c7bc1df62535b4f6e199a78167c73b8e3 Mon Sep 17 00:00:00 2001 From: "James E. Blair" Date: Tue, 2 Jan 2018 14:20:17 -0800 Subject: [PATCH] Support cross-source dependencies Additional tests and docs in later patches. Change-Id: I3b86a1e3dd507fa5e584680fb6c86d35f9ff3e23 Story: 2001334 Task: 5885 --- doc/source/admin/drivers/zuul.rst | 6 + tests/base.py | 23 +++- tests/unit/test_connection.py | 2 +- tests/unit/test_gerrit_crd.py | 11 +- tests/unit/test_scheduler.py | 2 +- tests/unit/test_zuultrigger.py | 4 +- zuul/driver/gerrit/gerritconnection.py | 55 ++++++++-- zuul/driver/gerrit/gerritsource.py | 56 ++++++++++ zuul/driver/git/gitsource.py | 9 ++ zuul/driver/github/githubconnection.py | 146 +++++++++++-------------- zuul/driver/github/githubmodel.py | 3 +- zuul/driver/github/githubsource.py | 34 ++++++ zuul/driver/zuul/__init__.py | 13 ++- zuul/executor/client.py | 2 +- zuul/lib/connections.py | 14 ++- zuul/lib/dependson.py | 29 +++++ zuul/manager/__init__.py | 28 +++++ zuul/manager/dependent.py | 31 +++++- zuul/manager/independent.py | 3 + zuul/model.py | 38 ++++++- zuul/scheduler.py | 22 ++++ zuul/source/__init__.py | 23 ++++ 22 files changed, 429 insertions(+), 125 deletions(-) create mode 100644 zuul/lib/dependson.py diff --git a/doc/source/admin/drivers/zuul.rst b/doc/source/admin/drivers/zuul.rst index d95dffc9e9..41535ee063 100644 --- a/doc/source/admin/drivers/zuul.rst +++ b/doc/source/admin/drivers/zuul.rst @@ -26,6 +26,12 @@ can simply be used by listing ``zuul`` as the trigger. When Zuul merges a change to a project, it generates this event for every open change in the project. + .. warning:: + + Triggering on this event can cause poor performance when + using the GitHub driver with a large number of + installations. + .. value:: parent-change-enqueued When Zuul enqueues a change into any pipeline, it generates diff --git a/tests/base.py b/tests/base.py index 59c0d2ade9..e688abd023 100755 --- a/tests/base.py +++ b/tests/base.py @@ -170,7 +170,7 @@ class FakeGerritChange(object): 'status': status, 'subject': subject, 'submitRecords': [], - 'url': 'https://hostname/%s' % number} + 'url': 'https://%s/%s' % (self.gerrit.server, number)} self.upstream_root = upstream_root self.addPatchset(files=files, parent=parent) @@ -559,14 +559,13 @@ class FakeGerritConnection(gerritconnection.GerritConnection): return change.query() return {} - def simpleQuery(self, query): - self.log.debug("simpleQuery: %s" % query) - self.queries.append(query) + def _simpleQuery(self, query): if query.startswith('change:'): # Query a specific changeid changeid = query[len('change:'):] l = [change.query() for change in self.changes.values() - if change.data['id'] == changeid] + if (change.data['id'] == changeid or + change.data['number'] == changeid)] elif query.startswith('message:'): # Query the content of a commit message msg = query[len('message:'):].strip() @@ -577,6 +576,20 @@ class FakeGerritConnection(gerritconnection.GerritConnection): l = [change.query() for change in self.changes.values()] return l + def simpleQuery(self, query): + self.log.debug("simpleQuery: %s" % query) + self.queries.append(query) + results = [] + if query.startswith('(') and 'OR' in query: + query = query[1:-2] + for q in query.split(' OR '): + for r in self._simpleQuery(q): + if r not in results: + results.append(r) + else: + results = self._simpleQuery(query) + return results + def _start_watcher_thread(self, *args, **kw): pass diff --git a/tests/unit/test_connection.py b/tests/unit/test_connection.py index 054ee5f040..197b5256d0 100644 --- a/tests/unit/test_connection.py +++ b/tests/unit/test_connection.py @@ -119,7 +119,7 @@ class TestSQLConnection(ZuulDBTestCase): self.assertEqual('SUCCESS', buildset0['result']) self.assertEqual('Build succeeded.', buildset0['message']) self.assertEqual('tenant-one', buildset0['tenant']) - self.assertEqual('https://hostname/%d' % buildset0['change'], + self.assertEqual('https://review.example.com/%d' % buildset0['change'], buildset0['ref_url']) buildset0_builds = conn.execute( diff --git a/tests/unit/test_gerrit_crd.py b/tests/unit/test_gerrit_crd.py index 7a61eec23e..732bc3d600 100644 --- a/tests/unit/test_gerrit_crd.py +++ b/tests/unit/test_gerrit_crd.py @@ -24,9 +24,6 @@ from tests.base import ( class TestGerritCRD(ZuulTestCase): tenant_config_file = 'config/single-tenant/main.yaml' - def setUp(self): - raise self.skipTest("Feature not yet implemented") - def test_crd_gate(self): "Test cross-repo dependencies" A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A') @@ -100,18 +97,14 @@ class TestGerritCRD(ZuulTestCase): A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A') B = self.fake_gerrit.addFakeChange('org/project2', 'master', 'B') C1 = self.fake_gerrit.addFakeChange('org/project2', 'mp', 'C1') - C2 = self.fake_gerrit.addFakeChange('org/project2', 'mp', 'C2', - status='ABANDONED') - C1.data['id'] = B.data['id'] - C2.data['id'] = B.data['id'] A.addApproval('Code-Review', 2) B.addApproval('Code-Review', 2) C1.addApproval('Code-Review', 2) # A Depends-On: B+C1 - A.data['commitMessage'] = '%s\n\nDepends-On: %s\n' % ( - A.subject, B.data['url']) + A.data['commitMessage'] = '%s\n\nDepends-On: %s\nDepends-On: %s\n' % ( + A.subject, B.data['url'], C1.data['url']) self.executor_server.hold_jobs_in_build = True B.addApproval('Approved', 1) diff --git a/tests/unit/test_scheduler.py b/tests/unit/test_scheduler.py index d5d8e83648..5db20b3174 100755 --- a/tests/unit/test_scheduler.py +++ b/tests/unit/test_scheduler.py @@ -4196,7 +4196,7 @@ For CI problems and help debugging, contact ci@example.org""" running_item = running_items[0] self.assertEqual([], running_item['failing_reasons']) self.assertEqual([], running_item['items_behind']) - self.assertEqual('https://hostname/1', running_item['url']) + self.assertEqual('https://review.example.com/1', running_item['url']) self.assertIsNone(running_item['item_ahead']) self.assertEqual('org/project', running_item['project']) self.assertIsNone(running_item['remaining_time']) diff --git a/tests/unit/test_zuultrigger.py b/tests/unit/test_zuultrigger.py index 3954a215de..55758537e1 100644 --- a/tests/unit/test_zuultrigger.py +++ b/tests/unit/test_zuultrigger.py @@ -126,5 +126,5 @@ class TestZuulTriggerProjectChangeMerged(ZuulTestCase): "dependencies was unable to be automatically merged with the " "current state of its repository. Please rebase the change and " "upload a new patchset.") - self.assertEqual(self.fake_gerrit.queries[1], - "project:org/project status:open") + self.assertIn("project:org/project status:open", + self.fake_gerrit.queries) diff --git a/zuul/driver/gerrit/gerritconnection.py b/zuul/driver/gerrit/gerritconnection.py index f4b090d408..d3b3c008b7 100644 --- a/zuul/driver/gerrit/gerritconnection.py +++ b/zuul/driver/gerrit/gerritconnection.py @@ -442,8 +442,19 @@ class GerritConnection(BaseConnection): # In case this change is already in the history we have a # cyclic dependency and don't need to update ourselves again # as this gets done in a previous frame of the call stack. - # NOTE(jeblair): I don't think it's possible to hit this case - # anymore as all paths hit the change cache first. + # NOTE(jeblair): The only case where this can still be hit is + # when we get an event for a change with no associated + # patchset; for instance, when the gerrit topic is changed. + # In that case, we will update change 1234,None, which will be + # inserted into the cache as its own entry, but then we will + # resolve the patchset before adding it to the history list, + # then if there are dependencies, we can walk down and then + # back up to the version of this change with a patchset which + # will match the history list but will have bypassed the + # change cache because the previous object had a patchset of + # None. All paths hit the change cache first. To be able to + # drop history, we need to resolve the patchset on events with + # no patchsets before adding the entry to the change cache. if (history and change.number and change.patchset and (change.number, change.patchset) in history): self.log.debug("Change %s is in history" % (change,)) @@ -461,6 +472,11 @@ class GerritConnection(BaseConnection): change.project = self.source.getProject(data['project']) change.branch = data['branch'] change.url = data['url'] + change.uris = [ + '%s/%s' % (self.server, change.number), + '%s/#/c/%s' % (self.server, change.number), + ] + max_ps = 0 files = [] for ps in data['patchSets']: @@ -481,6 +497,7 @@ class GerritConnection(BaseConnection): change.open = data['open'] change.status = data['status'] change.owner = data['owner'] + change.message = data['commitMessage'] if change.is_merged: # This change is merged, so we don't need to look any further @@ -494,7 +511,8 @@ class GerritConnection(BaseConnection): history = history[:] history.append((change.number, change.patchset)) - needs_changes = [] + needs_changes = set() + git_needs_changes = [] if 'dependsOn' in data: parts = data['dependsOn'][0]['ref'].split('/') dep_num, dep_ps = parts[3], parts[4] @@ -505,8 +523,11 @@ class GerritConnection(BaseConnection): # already merged. So even if it is "ABANDONED", we should not # ignore it. if (not dep.is_merged) and dep not in needs_changes: - needs_changes.append(dep) + git_needs_changes.append(dep) + needs_changes.add(dep) + change.git_needs_changes = git_needs_changes + compat_needs_changes = [] for record in self._getDependsOnFromCommit(data['commitMessage'], change): dep_num = record['number'] @@ -516,10 +537,12 @@ class GerritConnection(BaseConnection): (change, dep_num, dep_ps)) dep = self._getChange(dep_num, dep_ps, history=history) if dep.open and dep not in needs_changes: - needs_changes.append(dep) - change.needs_changes = needs_changes + compat_needs_changes.append(dep) + needs_changes.add(dep) + change.compat_needs_changes = compat_needs_changes - needed_by_changes = [] + needed_by_changes = set() + git_needed_by_changes = [] if 'neededBy' in data: for needed in data['neededBy']: parts = needed['ref'].split('/') @@ -527,9 +550,13 @@ class GerritConnection(BaseConnection): self.log.debug("Updating %s: Getting git-needed change %s,%s" % (change, dep_num, dep_ps)) dep = self._getChange(dep_num, dep_ps, history=history) - if dep.open and dep.is_current_patchset: - needed_by_changes.append(dep) + if (dep.open and dep.is_current_patchset and + dep not in needed_by_changes): + git_needed_by_changes.append(dep) + needed_by_changes.add(dep) + change.git_needed_by_changes = git_needed_by_changes + compat_needed_by_changes = [] for record in self._getNeededByFromCommit(data['id'], change): dep_num = record['number'] dep_ps = record['currentPatchSet']['number'] @@ -543,9 +570,13 @@ class GerritConnection(BaseConnection): refresh = (dep_num, dep_ps) not in history dep = self._getChange( dep_num, dep_ps, refresh=refresh, history=history) - if dep.open and dep.is_current_patchset: - needed_by_changes.append(dep) - change.needed_by_changes = needed_by_changes + if (dep.open and dep.is_current_patchset + and dep not in needed_by_changes): + compat_needed_by_changes.append(dep) + needed_by_changes.add(dep) + change.compat_needed_by_changes = compat_needed_by_changes + + self.sched.onChangeUpdated(change) return change diff --git a/zuul/driver/gerrit/gerritsource.py b/zuul/driver/gerrit/gerritsource.py index 7141080ac0..9a75b3eb05 100644 --- a/zuul/driver/gerrit/gerritsource.py +++ b/zuul/driver/gerrit/gerritsource.py @@ -12,12 +12,15 @@ # License for the specific language governing permissions and limitations # under the License. +import re +import urllib import logging import voluptuous as vs from zuul.source import BaseSource from zuul.model import Project from zuul.driver.gerrit.gerritmodel import GerritRefFilter from zuul.driver.util import scalar_or_list, to_list +from zuul.lib.dependson import find_dependency_headers class GerritSource(BaseSource): @@ -44,6 +47,59 @@ class GerritSource(BaseSource): def getChange(self, event, refresh=False): return self.connection.getChange(event, refresh) + change_re = re.compile(r"/(\#\/c\/)?(\d+)[\w]*") + + def getChangeByURL(self, url): + try: + parsed = urllib.parse.urlparse(url) + except ValueError: + return None + m = self.change_re.match(parsed.path) + if not m: + return None + try: + change_no = int(m.group(2)) + except ValueError: + return None + query = "change:%s" % (change_no,) + results = self.connection.simpleQuery(query) + if not results: + return None + change = self.connection._getChange( + results[0]['number'], results[0]['currentPatchSet']['number']) + return change + + def getChangesDependingOn(self, change, projects): + queries = set() + for uri in change.uris: + queries.add('message:%s' % uri) + query = '(' + ' OR '.join(queries) + ')' + results = self.connection.simpleQuery(query) + seen = set() + changes = [] + for result in results: + for match in find_dependency_headers(result['commitMessage']): + found = False + for uri in change.uris: + if uri in match: + found = True + break + if not found: + continue + key = (result['number'], result['currentPatchSet']['number']) + if key in seen: + continue + seen.add(key) + change = self.connection._getChange( + result['number'], result['currentPatchSet']['number']) + changes.append(change) + return changes + + def getCachedChanges(self): + for x in self.connection._change_cache.values(): + for y in x.values(): + yield y + def getProject(self, name): p = self.connection.getProject(name) if not p: diff --git a/zuul/driver/git/gitsource.py b/zuul/driver/git/gitsource.py index 78ae04ee7d..a7d42be12b 100644 --- a/zuul/driver/git/gitsource.py +++ b/zuul/driver/git/gitsource.py @@ -38,6 +38,15 @@ class GitSource(BaseSource): def getChange(self, event, refresh=False): return self.connection.getChange(event, refresh) + def getChangeByURL(self, url): + return None + + def getChangesDependingOn(self, change, projects): + return [] + + def getCachedChanges(self): + return [] + def getProject(self, name): p = self.connection.getProject(name) if not p: diff --git a/zuul/driver/github/githubconnection.py b/zuul/driver/github/githubconnection.py index 4b91c18896..1957cc2477 100644 --- a/zuul/driver/github/githubconnection.py +++ b/zuul/driver/github/githubconnection.py @@ -658,6 +658,9 @@ class GithubConnection(BaseConnection): change = self._getChange(project, event.change_number, event.patch_number, refresh=refresh) change.url = event.change_url + change.uris = [ + '%s/%s/pull/%s' % (self.server, project, change.number), + ] change.updated_at = self._ghTimestampToDate(event.updated_at) change.source_event = event change.is_current_patchset = (change.pr.get('head').get('sha') == @@ -699,58 +702,72 @@ class GithubConnection(BaseConnection): raise return change - def _getDependsOnFromPR(self, body): - prs = [] - seen = set() + def getChangesDependingOn(self, change, projects): + changes = [] + if not change.uris: + return changes - for match in self.depends_on_re.findall(body): - if match in seen: - self.log.debug("Ignoring duplicate Depends-On: %s" % (match,)) - continue - seen.add(match) - # Get the github url - url = match.rsplit()[-1] - # break it into the parts we need - _, org, proj, _, num = url.rsplit('/', 4) - # Get a pull object so we can get the head sha - pull = self.getPull('%s/%s' % (org, proj), int(num)) - prs.append(pull) + # Get a list of projects with unique installation ids + installation_ids = set() + installation_projects = set() - return prs + if projects: + # We only need to find changes in projects in the supplied + # ChangeQueue. Find all of the github installations for + # all of those projects, and search using each of them, so + # that if we get the right results based on the + # permissions granted to each of the installations. The + # common case for this is likely to be just one + # installation -- change queues aren't likely to span more + # than one installation. + for project in projects: + installation_id = self.installation_map.get(project) + if installation_id not in installation_ids: + installation_ids.add(installation_id) + installation_projects.add(project) + else: + # We aren't in the context of a change queue and we just + # need to query all installations. This currently only + # happens if certain features of the zuul trigger are + # used; generally it should be avoided. + for project, installation_id in self.installation_map.items(): + if installation_id not in installation_ids: + installation_ids.add(installation_id) + installation_projects.add(project) - def _getNeededByFromPR(self, change): - prs = [] - seen = set() - # This shouldn't return duplicate issues, but code as if it could - - # This leaves off the protocol, but looks for the specific GitHub - # hostname, the org/project, and the pull request number. - pattern = 'Depends-On %s/%s/pull/%s' % (self.server, - change.project.name, - change.number) + keys = set() + pattern = ' OR '.join(change.uris) query = '%s type:pr is:open in:body' % pattern - # FIXME(tobiash): find a way to query this for different installations - github = self.getGithubClient(change.project.name) - for issue in github.search_issues(query=query): - pr = issue.issue.pull_request().as_dict() - if not pr.get('url'): - continue - if issue in seen: - continue - # the issue provides no good description of the project :\ - org, proj, _, num = pr.get('url').split('/')[-4:] - self.log.debug("Found PR %s/%s/%s needs %s/%s" % - (org, proj, num, change.project.name, - change.number)) - prs.append(pr) - seen.add(issue) + # Repeat the search for each installation id (project) + for installation_project in installation_projects: + github = self.getGithubClient(installation_project) + for issue in github.search_issues(query=query): + pr = issue.issue.pull_request().as_dict() + if not pr.get('url'): + continue + # the issue provides no good description of the project :\ + org, proj, _, num = pr.get('url').split('/')[-4:] + proj = pr.get('base').get('repo').get('full_name') + sha = pr.get('head').get('sha') + key = (proj, num, sha) + if key in keys: + continue + self.log.debug("Found PR %s/%s needs %s/%s" % + (proj, num, change.project.name, + change.number)) + keys.add(key) + self.log.debug("Ran search issues: %s", query) + log_rate_limit(self.log, github) - self.log.debug("Ran search issues: %s", query) - log_rate_limit(self.log, github) - return prs + for key in keys: + (proj, num, sha) = key + project = self.source.getProject(proj) + change = self._getChange(project, int(num), patchset=sha) + changes.append(change) + + return changes def _updateChange(self, change, history=None): - # If this change is already in the history, we have a cyclic # dependency loop and we do not need to update again, since it # was done in a previous frame. @@ -770,10 +787,8 @@ class GithubConnection(BaseConnection): change.reviews = self.getPullReviews(change.project, change.number) change.labels = change.pr.get('labels') - change.body = change.pr.get('body') - # ensure body is at least an empty string - if not change.body: - change.body = '' + # ensure message is at least an empty string + change.message = change.pr.get('body') or '' if history is None: history = [] @@ -781,38 +796,7 @@ class GithubConnection(BaseConnection): history = history[:] history.append((change.project.name, change.number)) - needs_changes = [] - - # Get all the PRs this may depend on - for pr in self._getDependsOnFromPR(change.body): - proj = pr.get('base').get('repo').get('full_name') - pull = pr.get('number') - self.log.debug("Updating %s: Getting dependent " - "pull request %s/%s" % - (change, proj, pull)) - project = self.source.getProject(proj) - dep = self._getChange(project, pull, - patchset=pr.get('head').get('sha'), - history=history) - if (not dep.is_merged) and dep not in needs_changes: - needs_changes.append(dep) - - change.needs_changes = needs_changes - - needed_by_changes = [] - for pr in self._getNeededByFromPR(change): - proj = pr.get('base').get('repo').get('full_name') - pull = pr.get('number') - self.log.debug("Updating %s: Getting needed " - "pull request %s/%s" % - (change, proj, pull)) - project = self.source.getProject(proj) - dep = self._getChange(project, pull, - patchset=pr.get('head').get('sha'), - history=history) - if not dep.is_merged: - needed_by_changes.append(dep) - change.needed_by_changes = needed_by_changes + self.sched.onChangeUpdated(change) return change diff --git a/zuul/driver/github/githubmodel.py b/zuul/driver/github/githubmodel.py index ffd1c3f944..0731dd7338 100644 --- a/zuul/driver/github/githubmodel.py +++ b/zuul/driver/github/githubmodel.py @@ -37,7 +37,8 @@ class PullRequest(Change): self.labels = [] def isUpdateOf(self, other): - if (hasattr(other, 'number') and self.number == other.number and + if (self.project == other.project and + hasattr(other, 'number') and self.number == other.number and hasattr(other, 'patchset') and self.patchset != other.patchset and hasattr(other, 'updated_at') and self.updated_at > other.updated_at): diff --git a/zuul/driver/github/githubsource.py b/zuul/driver/github/githubsource.py index 1e7e07a88f..9834727d79 100644 --- a/zuul/driver/github/githubsource.py +++ b/zuul/driver/github/githubsource.py @@ -12,6 +12,8 @@ # License for the specific language governing permissions and limitations # under the License. +import re +import urllib import logging import time import voluptuous as v @@ -61,6 +63,38 @@ class GithubSource(BaseSource): def getChange(self, event, refresh=False): return self.connection.getChange(event, refresh) + change_re = re.compile(r"/(.*?)/(.*?)/pull/(\d+)[\w]*") + + def getChangeByURL(self, url): + try: + parsed = urllib.parse.urlparse(url) + except ValueError: + return None + m = self.change_re.match(parsed.path) + if not m: + return None + org = m.group(1) + proj = m.group(2) + try: + num = int(m.group(3)) + except ValueError: + return None + pull = self.connection.getPull('%s/%s' % (org, proj), int(num)) + if not pull: + return None + proj = pull.get('base').get('repo').get('full_name') + project = self.getProject(proj) + change = self.connection._getChange( + project, num, + patchset=pull.get('head').get('sha')) + return change + + def getChangesDependingOn(self, change, projects): + return self.connection.getChangesDependingOn(change, projects) + + def getCachedChanges(self): + return self.connection._change_cache.values() + def getProject(self, name): p = self.connection.getProject(name) if not p: diff --git a/zuul/driver/zuul/__init__.py b/zuul/driver/zuul/__init__.py index 0f6ec7da88..e381137a54 100644 --- a/zuul/driver/zuul/__init__.py +++ b/zuul/driver/zuul/__init__.py @@ -90,7 +90,18 @@ class ZuulDriver(Driver, TriggerInterface): if not hasattr(change, 'needed_by_changes'): self.log.debug(" %s does not support dependencies" % type(change)) return - for needs in change.needed_by_changes: + + # This is very inefficient, especially on systems with large + # numbers of github installations. This can be improved later + # with persistent storage of dependency information. + needed_by_changes = set(change.needed_by_changes) + for source in self.sched.connections.getSources(): + self.log.debug(" Checking source: %s", source) + needed_by_changes.update( + source.getChangesDependingOn(change, None)) + self.log.debug(" Following changes: %s", needed_by_changes) + + for needs in needed_by_changes: self._createParentChangeEnqueuedEvent(needs, pipeline) def _createParentChangeEnqueuedEvent(self, change, pipeline): diff --git a/zuul/executor/client.py b/zuul/executor/client.py index 06c2087f74..b21a290d5a 100644 --- a/zuul/executor/client.py +++ b/zuul/executor/client.py @@ -245,7 +245,7 @@ class ExecutorClient(object): for change in dependent_changes: # We have to find the project this way because it may not # be registered in the tenant (ie, a foreign project). - source = self.sched.connections.getSourceByHostname( + source = self.sched.connections.getSourceByCanonicalHostname( change['project']['canonical_hostname']) project = source.getProject(change['project']['name']) if project not in projects: diff --git a/zuul/lib/connections.py b/zuul/lib/connections.py index 262490a606..33c66f9a05 100644 --- a/zuul/lib/connections.py +++ b/zuul/lib/connections.py @@ -14,6 +14,7 @@ import logging import re +from collections import OrderedDict import zuul.driver.zuul import zuul.driver.gerrit @@ -38,7 +39,7 @@ class ConnectionRegistry(object): log = logging.getLogger("zuul.ConnectionRegistry") def __init__(self): - self.connections = {} + self.connections = OrderedDict() self.drivers = {} self.registerDriver(zuul.driver.zuul.ZuulDriver()) @@ -85,7 +86,7 @@ class ConnectionRegistry(object): def configure(self, config, source_only=False): # Register connections from the config - connections = {} + connections = OrderedDict() for section_name in config.sections(): con_match = re.match(r'^connection ([\'\"]?)(.*)(\1)$', @@ -154,6 +155,13 @@ class ConnectionRegistry(object): connection = self.connections[connection_name] return connection.driver.getSource(connection) + def getSources(self): + sources = [] + for connection in self.connections.values(): + if hasattr(connection.driver, 'getSource'): + sources.append(connection.driver.getSource(connection)) + return sources + def getReporter(self, connection_name, config=None): connection = self.connections[connection_name] return connection.driver.getReporter(connection, config) @@ -162,7 +170,7 @@ class ConnectionRegistry(object): connection = self.connections[connection_name] return connection.driver.getTrigger(connection, config) - def getSourceByHostname(self, canonical_hostname): + def getSourceByCanonicalHostname(self, canonical_hostname): for connection in self.connections.values(): if hasattr(connection, 'canonical_hostname'): if connection.canonical_hostname == canonical_hostname: diff --git a/zuul/lib/dependson.py b/zuul/lib/dependson.py new file mode 100644 index 0000000000..cd0f6efa37 --- /dev/null +++ b/zuul/lib/dependson.py @@ -0,0 +1,29 @@ +# Copyright 2018 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import re + + +DEPENDS_ON_RE = re.compile(r"^Depends-On: (.*?)\s*$", + re.MULTILINE | re.IGNORECASE) + + +def find_dependency_headers(message): + # Search for Depends-On headers + dependencies = [] + for match in DEPENDS_ON_RE.findall(message): + if match in dependencies: + continue + dependencies.append(match) + return dependencies diff --git a/zuul/manager/__init__.py b/zuul/manager/__init__.py index d205afc231..b8a280fde9 100644 --- a/zuul/manager/__init__.py +++ b/zuul/manager/__init__.py @@ -12,9 +12,11 @@ import logging import textwrap +import urllib from zuul import exceptions from zuul import model +from zuul.lib.dependson import find_dependency_headers class DynamicChangeQueueContextManager(object): @@ -343,6 +345,32 @@ class PipelineManager(object): self.dequeueItem(item) self.reportStats(item) + def updateCommitDependencies(self, change, change_queue): + # Search for Depends-On headers and find appropriate changes + self.log.debug(" Updating commit dependencies for %s", change) + change.refresh_deps = False + dependencies = [] + seen = set() + for match in find_dependency_headers(change.message): + self.log.debug(" Found Depends-On header: %s", match) + if match in seen: + continue + seen.add(match) + try: + url = urllib.parse.urlparse(match) + except ValueError: + continue + source = self.sched.connections.getSourceByCanonicalHostname( + url.hostname) + if not source: + continue + self.log.debug(" Found source: %s", source) + dep = source.getChangeByURL(match) + if dep and (not dep.is_merged) and dep not in dependencies: + self.log.debug(" Adding dependency: %s", dep) + dependencies.append(dep) + change.commit_needs_changes = dependencies + def provisionNodes(self, item): jobs = item.findJobsToRequest() if not jobs: diff --git a/zuul/manager/dependent.py b/zuul/manager/dependent.py index 5aef453571..23c2cdb753 100644 --- a/zuul/manager/dependent.py +++ b/zuul/manager/dependent.py @@ -95,12 +95,29 @@ class DependentPipelineManager(PipelineManager): def enqueueChangesBehind(self, change, quiet, ignore_requirements, change_queue): self.log.debug("Checking for changes needing %s:" % change) - to_enqueue = [] - source = change.project.source if not hasattr(change, 'needed_by_changes'): self.log.debug(" %s does not support dependencies" % type(change)) return - for other_change in change.needed_by_changes: + + # for project in change_queue, project.source get changes, then dedup. + sources = set() + for project in change_queue.projects: + sources.add(project.source) + + seen = set(change.needed_by_changes) + needed_by_changes = change.needed_by_changes[:] + for source in sources: + self.log.debug(" Checking source: %s", source) + for c in source.getChangesDependingOn(change, + change_queue.projects): + if c not in seen: + seen.add(c) + needed_by_changes.append(c) + + self.log.debug(" Following changes: %s", needed_by_changes) + + to_enqueue = [] + for other_change in needed_by_changes: with self.getChangeQueue(other_change) as other_change_queue: if other_change_queue != change_queue: self.log.debug(" Change %s in project %s can not be " @@ -108,6 +125,7 @@ class DependentPipelineManager(PipelineManager): (other_change, other_change.project, change_queue)) continue + source = other_change.project.source if source.canMerge(other_change, self.getSubmitAllowNeeds()): self.log.debug(" Change %s needs %s and is ready to merge" % (other_change, change)) @@ -145,10 +163,13 @@ class DependentPipelineManager(PipelineManager): return True def checkForChangesNeededBy(self, change, change_queue): - self.log.debug("Checking for changes needed by %s:" % change) - source = change.project.source # Return true if okay to proceed enqueing this change, # false if the change should not be enqueued. + self.log.debug("Checking for changes needed by %s:" % change) + source = change.project.source + if (hasattr(change, 'commit_needs_changes') and + (change.refresh_deps or change.commit_needs_changes is None)): + self.updateCommitDependencies(change, change_queue) if not hasattr(change, 'needs_changes'): self.log.debug(" %s does not support dependencies" % type(change)) return True diff --git a/zuul/manager/independent.py b/zuul/manager/independent.py index 65f5ca0701..0c2baf0106 100644 --- a/zuul/manager/independent.py +++ b/zuul/manager/independent.py @@ -70,6 +70,9 @@ class IndependentPipelineManager(PipelineManager): self.log.debug("Checking for changes needed by %s:" % change) # Return true if okay to proceed enqueing this change, # false if the change should not be enqueued. + if (hasattr(change, 'commit_needs_changes') and + (change.refresh_deps or change.commit_needs_changes is None)): + self.updateCommitDependencies(change, None) if not hasattr(change, 'needs_changes'): self.log.debug(" %s does not support dependencies" % type(change)) return True diff --git a/zuul/model.py b/zuul/model.py index 16a701ddcb..bac9e4cc8d 100644 --- a/zuul/model.py +++ b/zuul/model.py @@ -2103,11 +2103,28 @@ class Change(Branch): def __init__(self, project): super(Change, self).__init__(project) self.number = None + # The gitweb url for browsing the change self.url = None + # URIs for this change which may appear in depends-on headers. + # Note this omits the scheme; i.e., is hostname/path. + self.uris = [] self.patchset = None - self.needs_changes = [] - self.needed_by_changes = [] + # Changes that the source determined are needed due to the + # git DAG: + self.git_needs_changes = [] + self.git_needed_by_changes = [] + + # Changes that the source determined are needed by backwards + # compatible processing of Depends-On headers (Gerrit only): + self.compat_needs_changes = [] + self.compat_needed_by_changes = [] + + # Changes that the pipeline manager determined are needed due + # to Depends-On headers (all drivers): + self.commit_needs_changes = None + self.refresh_deps = False + self.is_current_patchset = True self.can_merge = False self.is_merged = False @@ -2116,6 +2133,11 @@ class Change(Branch): self.status = None self.owner = None + # This may be the commit message, or it may be a cover message + # in the case of a PR. Either way, it's the place where we + # look for depends-on headers. + self.message = None + self.source_event = None def _id(self): @@ -2129,8 +2151,18 @@ class Change(Branch): return True return False + @property + def needs_changes(self): + return (self.git_needs_changes + self.compat_needs_changes + + self.commit_needs_changes) + + @property + def needed_by_changes(self): + return (self.git_needed_by_changes + self.compat_needed_by_changes) + def isUpdateOf(self, other): - if ((hasattr(other, 'number') and self.number == other.number) and + if (self.project == other.project and + (hasattr(other, 'number') and self.number == other.number) and (hasattr(other, 'patchset') and self.patchset is not None and other.patchset is not None and diff --git a/zuul/scheduler.py b/zuul/scheduler.py index c3f2f234d8..a2e3b6eb1e 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -1088,3 +1088,25 @@ class Scheduler(threading.Thread): for pipeline in tenant.layout.pipelines.values(): pipelines.append(pipeline.formatStatusJSON(websocket_url)) return json.dumps(data) + + def onChangeUpdated(self, change): + """Remove stale dependency references on change update. + + When a change is updated with a new patchset, other changes in + the system may still have a reference to the old patchset in + their dependencies. Search for those (across all sources) and + mark that their dependencies are out of date. This will cause + them to be refreshed the next time the queue processor + examines them. + """ + + self.log.debug("Change %s has been updated, clearing dependent " + "change caches", change) + for source in self.connections.getSources(): + for other_change in source.getCachedChanges(): + if other_change.commit_needs_changes is None: + continue + for dep in other_change.commit_needs_changes: + if change.isUpdateOf(dep): + other_change.refresh_deps = True + change.refresh_deps = True diff --git a/zuul/source/__init__.py b/zuul/source/__init__.py index 0396aff499..00dfc9c3ab 100644 --- a/zuul/source/__init__.py +++ b/zuul/source/__init__.py @@ -51,6 +51,29 @@ class BaseSource(object, metaclass=abc.ABCMeta): def getChange(self, event): """Get the change representing an event.""" + @abc.abstractmethod + def getChangeByURL(self, url): + """Get the change corresponding to the supplied URL. + + The URL may may not correspond to this source; if it doesn't, + or there is no change at that URL, return None. + + """ + + @abc.abstractmethod + def getChangesDependingOn(self, change, projects): + """Return changes which depend on changes at the supplied URIs. + + Search this source for changes which depend on the supplied + change. Generally the Change.uris attribute should be used to + perform the search, as it contains a list of URLs without the + scheme which represent a single change + + If the projects argument is None, search across all known + projects. If it is supplied, the search may optionally be + restricted to only those projects. + """ + @abc.abstractmethod def getProjectOpenChanges(self, project): """Get the open changes for a project."""