Support cross-source dependencies

Additional tests and docs in later patches.

Change-Id: I3b86a1e3dd507fa5e584680fb6c86d35f9ff3e23
Story: 2001334
Task: 5885
This commit is contained in:
James E. Blair 2018-01-02 14:20:17 -08:00
parent 1c2023c108
commit 0e4c791c7b
22 changed files with 429 additions and 125 deletions

View File

@ -26,6 +26,12 @@ can simply be used by listing ``zuul`` as the trigger.
When Zuul merges a change to a project, it generates this When Zuul merges a change to a project, it generates this
event for every open change in the project. event for every open change in the project.
.. warning::
Triggering on this event can cause poor performance when
using the GitHub driver with a large number of
installations.
.. value:: parent-change-enqueued .. value:: parent-change-enqueued
When Zuul enqueues a change into any pipeline, it generates When Zuul enqueues a change into any pipeline, it generates

View File

@ -170,7 +170,7 @@ class FakeGerritChange(object):
'status': status, 'status': status,
'subject': subject, 'subject': subject,
'submitRecords': [], 'submitRecords': [],
'url': 'https://hostname/%s' % number} 'url': 'https://%s/%s' % (self.gerrit.server, number)}
self.upstream_root = upstream_root self.upstream_root = upstream_root
self.addPatchset(files=files, parent=parent) self.addPatchset(files=files, parent=parent)
@ -559,14 +559,13 @@ class FakeGerritConnection(gerritconnection.GerritConnection):
return change.query() return change.query()
return {} return {}
def simpleQuery(self, query): def _simpleQuery(self, query):
self.log.debug("simpleQuery: %s" % query)
self.queries.append(query)
if query.startswith('change:'): if query.startswith('change:'):
# Query a specific changeid # Query a specific changeid
changeid = query[len('change:'):] changeid = query[len('change:'):]
l = [change.query() for change in self.changes.values() l = [change.query() for change in self.changes.values()
if change.data['id'] == changeid] if (change.data['id'] == changeid or
change.data['number'] == changeid)]
elif query.startswith('message:'): elif query.startswith('message:'):
# Query the content of a commit message # Query the content of a commit message
msg = query[len('message:'):].strip() msg = query[len('message:'):].strip()
@ -577,6 +576,20 @@ class FakeGerritConnection(gerritconnection.GerritConnection):
l = [change.query() for change in self.changes.values()] l = [change.query() for change in self.changes.values()]
return l return l
def simpleQuery(self, query):
self.log.debug("simpleQuery: %s" % query)
self.queries.append(query)
results = []
if query.startswith('(') and 'OR' in query:
query = query[1:-2]
for q in query.split(' OR '):
for r in self._simpleQuery(q):
if r not in results:
results.append(r)
else:
results = self._simpleQuery(query)
return results
def _start_watcher_thread(self, *args, **kw): def _start_watcher_thread(self, *args, **kw):
pass pass

View File

@ -119,7 +119,7 @@ class TestSQLConnection(ZuulDBTestCase):
self.assertEqual('SUCCESS', buildset0['result']) self.assertEqual('SUCCESS', buildset0['result'])
self.assertEqual('Build succeeded.', buildset0['message']) self.assertEqual('Build succeeded.', buildset0['message'])
self.assertEqual('tenant-one', buildset0['tenant']) self.assertEqual('tenant-one', buildset0['tenant'])
self.assertEqual('https://hostname/%d' % buildset0['change'], self.assertEqual('https://review.example.com/%d' % buildset0['change'],
buildset0['ref_url']) buildset0['ref_url'])
buildset0_builds = conn.execute( buildset0_builds = conn.execute(

View File

@ -24,9 +24,6 @@ from tests.base import (
class TestGerritCRD(ZuulTestCase): class TestGerritCRD(ZuulTestCase):
tenant_config_file = 'config/single-tenant/main.yaml' tenant_config_file = 'config/single-tenant/main.yaml'
def setUp(self):
raise self.skipTest("Feature not yet implemented")
def test_crd_gate(self): def test_crd_gate(self):
"Test cross-repo dependencies" "Test cross-repo dependencies"
A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A') A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
@ -100,18 +97,14 @@ class TestGerritCRD(ZuulTestCase):
A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A') A = self.fake_gerrit.addFakeChange('org/project1', 'master', 'A')
B = self.fake_gerrit.addFakeChange('org/project2', 'master', 'B') B = self.fake_gerrit.addFakeChange('org/project2', 'master', 'B')
C1 = self.fake_gerrit.addFakeChange('org/project2', 'mp', 'C1') C1 = self.fake_gerrit.addFakeChange('org/project2', 'mp', 'C1')
C2 = self.fake_gerrit.addFakeChange('org/project2', 'mp', 'C2',
status='ABANDONED')
C1.data['id'] = B.data['id']
C2.data['id'] = B.data['id']
A.addApproval('Code-Review', 2) A.addApproval('Code-Review', 2)
B.addApproval('Code-Review', 2) B.addApproval('Code-Review', 2)
C1.addApproval('Code-Review', 2) C1.addApproval('Code-Review', 2)
# A Depends-On: B+C1 # A Depends-On: B+C1
A.data['commitMessage'] = '%s\n\nDepends-On: %s\n' % ( A.data['commitMessage'] = '%s\n\nDepends-On: %s\nDepends-On: %s\n' % (
A.subject, B.data['url']) A.subject, B.data['url'], C1.data['url'])
self.executor_server.hold_jobs_in_build = True self.executor_server.hold_jobs_in_build = True
B.addApproval('Approved', 1) B.addApproval('Approved', 1)

View File

@ -4196,7 +4196,7 @@ For CI problems and help debugging, contact ci@example.org"""
running_item = running_items[0] running_item = running_items[0]
self.assertEqual([], running_item['failing_reasons']) self.assertEqual([], running_item['failing_reasons'])
self.assertEqual([], running_item['items_behind']) self.assertEqual([], running_item['items_behind'])
self.assertEqual('https://hostname/1', running_item['url']) self.assertEqual('https://review.example.com/1', running_item['url'])
self.assertIsNone(running_item['item_ahead']) self.assertIsNone(running_item['item_ahead'])
self.assertEqual('org/project', running_item['project']) self.assertEqual('org/project', running_item['project'])
self.assertIsNone(running_item['remaining_time']) self.assertIsNone(running_item['remaining_time'])

View File

@ -126,5 +126,5 @@ class TestZuulTriggerProjectChangeMerged(ZuulTestCase):
"dependencies was unable to be automatically merged with the " "dependencies was unable to be automatically merged with the "
"current state of its repository. Please rebase the change and " "current state of its repository. Please rebase the change and "
"upload a new patchset.") "upload a new patchset.")
self.assertEqual(self.fake_gerrit.queries[1], self.assertIn("project:org/project status:open",
"project:org/project status:open") self.fake_gerrit.queries)

View File

@ -442,8 +442,19 @@ class GerritConnection(BaseConnection):
# In case this change is already in the history we have a # In case this change is already in the history we have a
# cyclic dependency and don't need to update ourselves again # cyclic dependency and don't need to update ourselves again
# as this gets done in a previous frame of the call stack. # as this gets done in a previous frame of the call stack.
# NOTE(jeblair): I don't think it's possible to hit this case # NOTE(jeblair): The only case where this can still be hit is
# anymore as all paths hit the change cache first. # when we get an event for a change with no associated
# patchset; for instance, when the gerrit topic is changed.
# In that case, we will update change 1234,None, which will be
# inserted into the cache as its own entry, but then we will
# resolve the patchset before adding it to the history list,
# then if there are dependencies, we can walk down and then
# back up to the version of this change with a patchset which
# will match the history list but will have bypassed the
# change cache because the previous object had a patchset of
# None. All paths hit the change cache first. To be able to
# drop history, we need to resolve the patchset on events with
# no patchsets before adding the entry to the change cache.
if (history and change.number and change.patchset and if (history and change.number and change.patchset and
(change.number, change.patchset) in history): (change.number, change.patchset) in history):
self.log.debug("Change %s is in history" % (change,)) self.log.debug("Change %s is in history" % (change,))
@ -461,6 +472,11 @@ class GerritConnection(BaseConnection):
change.project = self.source.getProject(data['project']) change.project = self.source.getProject(data['project'])
change.branch = data['branch'] change.branch = data['branch']
change.url = data['url'] change.url = data['url']
change.uris = [
'%s/%s' % (self.server, change.number),
'%s/#/c/%s' % (self.server, change.number),
]
max_ps = 0 max_ps = 0
files = [] files = []
for ps in data['patchSets']: for ps in data['patchSets']:
@ -481,6 +497,7 @@ class GerritConnection(BaseConnection):
change.open = data['open'] change.open = data['open']
change.status = data['status'] change.status = data['status']
change.owner = data['owner'] change.owner = data['owner']
change.message = data['commitMessage']
if change.is_merged: if change.is_merged:
# This change is merged, so we don't need to look any further # This change is merged, so we don't need to look any further
@ -494,7 +511,8 @@ class GerritConnection(BaseConnection):
history = history[:] history = history[:]
history.append((change.number, change.patchset)) history.append((change.number, change.patchset))
needs_changes = [] needs_changes = set()
git_needs_changes = []
if 'dependsOn' in data: if 'dependsOn' in data:
parts = data['dependsOn'][0]['ref'].split('/') parts = data['dependsOn'][0]['ref'].split('/')
dep_num, dep_ps = parts[3], parts[4] dep_num, dep_ps = parts[3], parts[4]
@ -505,8 +523,11 @@ class GerritConnection(BaseConnection):
# already merged. So even if it is "ABANDONED", we should not # already merged. So even if it is "ABANDONED", we should not
# ignore it. # ignore it.
if (not dep.is_merged) and dep not in needs_changes: if (not dep.is_merged) and dep not in needs_changes:
needs_changes.append(dep) git_needs_changes.append(dep)
needs_changes.add(dep)
change.git_needs_changes = git_needs_changes
compat_needs_changes = []
for record in self._getDependsOnFromCommit(data['commitMessage'], for record in self._getDependsOnFromCommit(data['commitMessage'],
change): change):
dep_num = record['number'] dep_num = record['number']
@ -516,10 +537,12 @@ class GerritConnection(BaseConnection):
(change, dep_num, dep_ps)) (change, dep_num, dep_ps))
dep = self._getChange(dep_num, dep_ps, history=history) dep = self._getChange(dep_num, dep_ps, history=history)
if dep.open and dep not in needs_changes: if dep.open and dep not in needs_changes:
needs_changes.append(dep) compat_needs_changes.append(dep)
change.needs_changes = needs_changes needs_changes.add(dep)
change.compat_needs_changes = compat_needs_changes
needed_by_changes = [] needed_by_changes = set()
git_needed_by_changes = []
if 'neededBy' in data: if 'neededBy' in data:
for needed in data['neededBy']: for needed in data['neededBy']:
parts = needed['ref'].split('/') parts = needed['ref'].split('/')
@ -527,9 +550,13 @@ class GerritConnection(BaseConnection):
self.log.debug("Updating %s: Getting git-needed change %s,%s" % self.log.debug("Updating %s: Getting git-needed change %s,%s" %
(change, dep_num, dep_ps)) (change, dep_num, dep_ps))
dep = self._getChange(dep_num, dep_ps, history=history) dep = self._getChange(dep_num, dep_ps, history=history)
if dep.open and dep.is_current_patchset: if (dep.open and dep.is_current_patchset and
needed_by_changes.append(dep) dep not in needed_by_changes):
git_needed_by_changes.append(dep)
needed_by_changes.add(dep)
change.git_needed_by_changes = git_needed_by_changes
compat_needed_by_changes = []
for record in self._getNeededByFromCommit(data['id'], change): for record in self._getNeededByFromCommit(data['id'], change):
dep_num = record['number'] dep_num = record['number']
dep_ps = record['currentPatchSet']['number'] dep_ps = record['currentPatchSet']['number']
@ -543,9 +570,13 @@ class GerritConnection(BaseConnection):
refresh = (dep_num, dep_ps) not in history refresh = (dep_num, dep_ps) not in history
dep = self._getChange( dep = self._getChange(
dep_num, dep_ps, refresh=refresh, history=history) dep_num, dep_ps, refresh=refresh, history=history)
if dep.open and dep.is_current_patchset: if (dep.open and dep.is_current_patchset
needed_by_changes.append(dep) and dep not in needed_by_changes):
change.needed_by_changes = needed_by_changes compat_needed_by_changes.append(dep)
needed_by_changes.add(dep)
change.compat_needed_by_changes = compat_needed_by_changes
self.sched.onChangeUpdated(change)
return change return change

View File

@ -12,12 +12,15 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import re
import urllib
import logging import logging
import voluptuous as vs import voluptuous as vs
from zuul.source import BaseSource from zuul.source import BaseSource
from zuul.model import Project from zuul.model import Project
from zuul.driver.gerrit.gerritmodel import GerritRefFilter from zuul.driver.gerrit.gerritmodel import GerritRefFilter
from zuul.driver.util import scalar_or_list, to_list from zuul.driver.util import scalar_or_list, to_list
from zuul.lib.dependson import find_dependency_headers
class GerritSource(BaseSource): class GerritSource(BaseSource):
@ -44,6 +47,59 @@ class GerritSource(BaseSource):
def getChange(self, event, refresh=False): def getChange(self, event, refresh=False):
return self.connection.getChange(event, refresh) return self.connection.getChange(event, refresh)
change_re = re.compile(r"/(\#\/c\/)?(\d+)[\w]*")
def getChangeByURL(self, url):
try:
parsed = urllib.parse.urlparse(url)
except ValueError:
return None
m = self.change_re.match(parsed.path)
if not m:
return None
try:
change_no = int(m.group(2))
except ValueError:
return None
query = "change:%s" % (change_no,)
results = self.connection.simpleQuery(query)
if not results:
return None
change = self.connection._getChange(
results[0]['number'], results[0]['currentPatchSet']['number'])
return change
def getChangesDependingOn(self, change, projects):
queries = set()
for uri in change.uris:
queries.add('message:%s' % uri)
query = '(' + ' OR '.join(queries) + ')'
results = self.connection.simpleQuery(query)
seen = set()
changes = []
for result in results:
for match in find_dependency_headers(result['commitMessage']):
found = False
for uri in change.uris:
if uri in match:
found = True
break
if not found:
continue
key = (result['number'], result['currentPatchSet']['number'])
if key in seen:
continue
seen.add(key)
change = self.connection._getChange(
result['number'], result['currentPatchSet']['number'])
changes.append(change)
return changes
def getCachedChanges(self):
for x in self.connection._change_cache.values():
for y in x.values():
yield y
def getProject(self, name): def getProject(self, name):
p = self.connection.getProject(name) p = self.connection.getProject(name)
if not p: if not p:

View File

@ -38,6 +38,15 @@ class GitSource(BaseSource):
def getChange(self, event, refresh=False): def getChange(self, event, refresh=False):
return self.connection.getChange(event, refresh) return self.connection.getChange(event, refresh)
def getChangeByURL(self, url):
return None
def getChangesDependingOn(self, change, projects):
return []
def getCachedChanges(self):
return []
def getProject(self, name): def getProject(self, name):
p = self.connection.getProject(name) p = self.connection.getProject(name)
if not p: if not p:

View File

@ -658,6 +658,9 @@ class GithubConnection(BaseConnection):
change = self._getChange(project, event.change_number, change = self._getChange(project, event.change_number,
event.patch_number, refresh=refresh) event.patch_number, refresh=refresh)
change.url = event.change_url change.url = event.change_url
change.uris = [
'%s/%s/pull/%s' % (self.server, project, change.number),
]
change.updated_at = self._ghTimestampToDate(event.updated_at) change.updated_at = self._ghTimestampToDate(event.updated_at)
change.source_event = event change.source_event = event
change.is_current_patchset = (change.pr.get('head').get('sha') == change.is_current_patchset = (change.pr.get('head').get('sha') ==
@ -699,58 +702,72 @@ class GithubConnection(BaseConnection):
raise raise
return change return change
def _getDependsOnFromPR(self, body): def getChangesDependingOn(self, change, projects):
prs = [] changes = []
seen = set() if not change.uris:
return changes
for match in self.depends_on_re.findall(body): # Get a list of projects with unique installation ids
if match in seen: installation_ids = set()
self.log.debug("Ignoring duplicate Depends-On: %s" % (match,)) installation_projects = set()
continue
seen.add(match)
# Get the github url
url = match.rsplit()[-1]
# break it into the parts we need
_, org, proj, _, num = url.rsplit('/', 4)
# Get a pull object so we can get the head sha
pull = self.getPull('%s/%s' % (org, proj), int(num))
prs.append(pull)
return prs if projects:
# We only need to find changes in projects in the supplied
# ChangeQueue. Find all of the github installations for
# all of those projects, and search using each of them, so
# that if we get the right results based on the
# permissions granted to each of the installations. The
# common case for this is likely to be just one
# installation -- change queues aren't likely to span more
# than one installation.
for project in projects:
installation_id = self.installation_map.get(project)
if installation_id not in installation_ids:
installation_ids.add(installation_id)
installation_projects.add(project)
else:
# We aren't in the context of a change queue and we just
# need to query all installations. This currently only
# happens if certain features of the zuul trigger are
# used; generally it should be avoided.
for project, installation_id in self.installation_map.items():
if installation_id not in installation_ids:
installation_ids.add(installation_id)
installation_projects.add(project)
def _getNeededByFromPR(self, change): keys = set()
prs = [] pattern = ' OR '.join(change.uris)
seen = set()
# This shouldn't return duplicate issues, but code as if it could
# This leaves off the protocol, but looks for the specific GitHub
# hostname, the org/project, and the pull request number.
pattern = 'Depends-On %s/%s/pull/%s' % (self.server,
change.project.name,
change.number)
query = '%s type:pr is:open in:body' % pattern query = '%s type:pr is:open in:body' % pattern
# FIXME(tobiash): find a way to query this for different installations # Repeat the search for each installation id (project)
github = self.getGithubClient(change.project.name) for installation_project in installation_projects:
for issue in github.search_issues(query=query): github = self.getGithubClient(installation_project)
pr = issue.issue.pull_request().as_dict() for issue in github.search_issues(query=query):
if not pr.get('url'): pr = issue.issue.pull_request().as_dict()
continue if not pr.get('url'):
if issue in seen: continue
continue # the issue provides no good description of the project :\
# the issue provides no good description of the project :\ org, proj, _, num = pr.get('url').split('/')[-4:]
org, proj, _, num = pr.get('url').split('/')[-4:] proj = pr.get('base').get('repo').get('full_name')
self.log.debug("Found PR %s/%s/%s needs %s/%s" % sha = pr.get('head').get('sha')
(org, proj, num, change.project.name, key = (proj, num, sha)
change.number)) if key in keys:
prs.append(pr) continue
seen.add(issue) self.log.debug("Found PR %s/%s needs %s/%s" %
(proj, num, change.project.name,
change.number))
keys.add(key)
self.log.debug("Ran search issues: %s", query)
log_rate_limit(self.log, github)
self.log.debug("Ran search issues: %s", query) for key in keys:
log_rate_limit(self.log, github) (proj, num, sha) = key
return prs project = self.source.getProject(proj)
change = self._getChange(project, int(num), patchset=sha)
changes.append(change)
return changes
def _updateChange(self, change, history=None): def _updateChange(self, change, history=None):
# If this change is already in the history, we have a cyclic # If this change is already in the history, we have a cyclic
# dependency loop and we do not need to update again, since it # dependency loop and we do not need to update again, since it
# was done in a previous frame. # was done in a previous frame.
@ -770,10 +787,8 @@ class GithubConnection(BaseConnection):
change.reviews = self.getPullReviews(change.project, change.reviews = self.getPullReviews(change.project,
change.number) change.number)
change.labels = change.pr.get('labels') change.labels = change.pr.get('labels')
change.body = change.pr.get('body') # ensure message is at least an empty string
# ensure body is at least an empty string change.message = change.pr.get('body') or ''
if not change.body:
change.body = ''
if history is None: if history is None:
history = [] history = []
@ -781,38 +796,7 @@ class GithubConnection(BaseConnection):
history = history[:] history = history[:]
history.append((change.project.name, change.number)) history.append((change.project.name, change.number))
needs_changes = [] self.sched.onChangeUpdated(change)
# Get all the PRs this may depend on
for pr in self._getDependsOnFromPR(change.body):
proj = pr.get('base').get('repo').get('full_name')
pull = pr.get('number')
self.log.debug("Updating %s: Getting dependent "
"pull request %s/%s" %
(change, proj, pull))
project = self.source.getProject(proj)
dep = self._getChange(project, pull,
patchset=pr.get('head').get('sha'),
history=history)
if (not dep.is_merged) and dep not in needs_changes:
needs_changes.append(dep)
change.needs_changes = needs_changes
needed_by_changes = []
for pr in self._getNeededByFromPR(change):
proj = pr.get('base').get('repo').get('full_name')
pull = pr.get('number')
self.log.debug("Updating %s: Getting needed "
"pull request %s/%s" %
(change, proj, pull))
project = self.source.getProject(proj)
dep = self._getChange(project, pull,
patchset=pr.get('head').get('sha'),
history=history)
if not dep.is_merged:
needed_by_changes.append(dep)
change.needed_by_changes = needed_by_changes
return change return change

View File

@ -37,7 +37,8 @@ class PullRequest(Change):
self.labels = [] self.labels = []
def isUpdateOf(self, other): def isUpdateOf(self, other):
if (hasattr(other, 'number') and self.number == other.number and if (self.project == other.project and
hasattr(other, 'number') and self.number == other.number and
hasattr(other, 'patchset') and self.patchset != other.patchset and hasattr(other, 'patchset') and self.patchset != other.patchset and
hasattr(other, 'updated_at') and hasattr(other, 'updated_at') and
self.updated_at > other.updated_at): self.updated_at > other.updated_at):

View File

@ -12,6 +12,8 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import re
import urllib
import logging import logging
import time import time
import voluptuous as v import voluptuous as v
@ -61,6 +63,38 @@ class GithubSource(BaseSource):
def getChange(self, event, refresh=False): def getChange(self, event, refresh=False):
return self.connection.getChange(event, refresh) return self.connection.getChange(event, refresh)
change_re = re.compile(r"/(.*?)/(.*?)/pull/(\d+)[\w]*")
def getChangeByURL(self, url):
try:
parsed = urllib.parse.urlparse(url)
except ValueError:
return None
m = self.change_re.match(parsed.path)
if not m:
return None
org = m.group(1)
proj = m.group(2)
try:
num = int(m.group(3))
except ValueError:
return None
pull = self.connection.getPull('%s/%s' % (org, proj), int(num))
if not pull:
return None
proj = pull.get('base').get('repo').get('full_name')
project = self.getProject(proj)
change = self.connection._getChange(
project, num,
patchset=pull.get('head').get('sha'))
return change
def getChangesDependingOn(self, change, projects):
return self.connection.getChangesDependingOn(change, projects)
def getCachedChanges(self):
return self.connection._change_cache.values()
def getProject(self, name): def getProject(self, name):
p = self.connection.getProject(name) p = self.connection.getProject(name)
if not p: if not p:

View File

@ -90,7 +90,18 @@ class ZuulDriver(Driver, TriggerInterface):
if not hasattr(change, 'needed_by_changes'): if not hasattr(change, 'needed_by_changes'):
self.log.debug(" %s does not support dependencies" % type(change)) self.log.debug(" %s does not support dependencies" % type(change))
return return
for needs in change.needed_by_changes:
# This is very inefficient, especially on systems with large
# numbers of github installations. This can be improved later
# with persistent storage of dependency information.
needed_by_changes = set(change.needed_by_changes)
for source in self.sched.connections.getSources():
self.log.debug(" Checking source: %s", source)
needed_by_changes.update(
source.getChangesDependingOn(change, None))
self.log.debug(" Following changes: %s", needed_by_changes)
for needs in needed_by_changes:
self._createParentChangeEnqueuedEvent(needs, pipeline) self._createParentChangeEnqueuedEvent(needs, pipeline)
def _createParentChangeEnqueuedEvent(self, change, pipeline): def _createParentChangeEnqueuedEvent(self, change, pipeline):

View File

@ -245,7 +245,7 @@ class ExecutorClient(object):
for change in dependent_changes: for change in dependent_changes:
# We have to find the project this way because it may not # We have to find the project this way because it may not
# be registered in the tenant (ie, a foreign project). # be registered in the tenant (ie, a foreign project).
source = self.sched.connections.getSourceByHostname( source = self.sched.connections.getSourceByCanonicalHostname(
change['project']['canonical_hostname']) change['project']['canonical_hostname'])
project = source.getProject(change['project']['name']) project = source.getProject(change['project']['name'])
if project not in projects: if project not in projects:

View File

@ -14,6 +14,7 @@
import logging import logging
import re import re
from collections import OrderedDict
import zuul.driver.zuul import zuul.driver.zuul
import zuul.driver.gerrit import zuul.driver.gerrit
@ -38,7 +39,7 @@ class ConnectionRegistry(object):
log = logging.getLogger("zuul.ConnectionRegistry") log = logging.getLogger("zuul.ConnectionRegistry")
def __init__(self): def __init__(self):
self.connections = {} self.connections = OrderedDict()
self.drivers = {} self.drivers = {}
self.registerDriver(zuul.driver.zuul.ZuulDriver()) self.registerDriver(zuul.driver.zuul.ZuulDriver())
@ -85,7 +86,7 @@ class ConnectionRegistry(object):
def configure(self, config, source_only=False): def configure(self, config, source_only=False):
# Register connections from the config # Register connections from the config
connections = {} connections = OrderedDict()
for section_name in config.sections(): for section_name in config.sections():
con_match = re.match(r'^connection ([\'\"]?)(.*)(\1)$', con_match = re.match(r'^connection ([\'\"]?)(.*)(\1)$',
@ -154,6 +155,13 @@ class ConnectionRegistry(object):
connection = self.connections[connection_name] connection = self.connections[connection_name]
return connection.driver.getSource(connection) return connection.driver.getSource(connection)
def getSources(self):
sources = []
for connection in self.connections.values():
if hasattr(connection.driver, 'getSource'):
sources.append(connection.driver.getSource(connection))
return sources
def getReporter(self, connection_name, config=None): def getReporter(self, connection_name, config=None):
connection = self.connections[connection_name] connection = self.connections[connection_name]
return connection.driver.getReporter(connection, config) return connection.driver.getReporter(connection, config)
@ -162,7 +170,7 @@ class ConnectionRegistry(object):
connection = self.connections[connection_name] connection = self.connections[connection_name]
return connection.driver.getTrigger(connection, config) return connection.driver.getTrigger(connection, config)
def getSourceByHostname(self, canonical_hostname): def getSourceByCanonicalHostname(self, canonical_hostname):
for connection in self.connections.values(): for connection in self.connections.values():
if hasattr(connection, 'canonical_hostname'): if hasattr(connection, 'canonical_hostname'):
if connection.canonical_hostname == canonical_hostname: if connection.canonical_hostname == canonical_hostname:

29
zuul/lib/dependson.py Normal file
View File

@ -0,0 +1,29 @@
# Copyright 2018 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import re
DEPENDS_ON_RE = re.compile(r"^Depends-On: (.*?)\s*$",
re.MULTILINE | re.IGNORECASE)
def find_dependency_headers(message):
# Search for Depends-On headers
dependencies = []
for match in DEPENDS_ON_RE.findall(message):
if match in dependencies:
continue
dependencies.append(match)
return dependencies

View File

@ -12,9 +12,11 @@
import logging import logging
import textwrap import textwrap
import urllib
from zuul import exceptions from zuul import exceptions
from zuul import model from zuul import model
from zuul.lib.dependson import find_dependency_headers
class DynamicChangeQueueContextManager(object): class DynamicChangeQueueContextManager(object):
@ -343,6 +345,32 @@ class PipelineManager(object):
self.dequeueItem(item) self.dequeueItem(item)
self.reportStats(item) self.reportStats(item)
def updateCommitDependencies(self, change, change_queue):
# Search for Depends-On headers and find appropriate changes
self.log.debug(" Updating commit dependencies for %s", change)
change.refresh_deps = False
dependencies = []
seen = set()
for match in find_dependency_headers(change.message):
self.log.debug(" Found Depends-On header: %s", match)
if match in seen:
continue
seen.add(match)
try:
url = urllib.parse.urlparse(match)
except ValueError:
continue
source = self.sched.connections.getSourceByCanonicalHostname(
url.hostname)
if not source:
continue
self.log.debug(" Found source: %s", source)
dep = source.getChangeByURL(match)
if dep and (not dep.is_merged) and dep not in dependencies:
self.log.debug(" Adding dependency: %s", dep)
dependencies.append(dep)
change.commit_needs_changes = dependencies
def provisionNodes(self, item): def provisionNodes(self, item):
jobs = item.findJobsToRequest() jobs = item.findJobsToRequest()
if not jobs: if not jobs:

View File

@ -95,12 +95,29 @@ class DependentPipelineManager(PipelineManager):
def enqueueChangesBehind(self, change, quiet, ignore_requirements, def enqueueChangesBehind(self, change, quiet, ignore_requirements,
change_queue): change_queue):
self.log.debug("Checking for changes needing %s:" % change) self.log.debug("Checking for changes needing %s:" % change)
to_enqueue = []
source = change.project.source
if not hasattr(change, 'needed_by_changes'): if not hasattr(change, 'needed_by_changes'):
self.log.debug(" %s does not support dependencies" % type(change)) self.log.debug(" %s does not support dependencies" % type(change))
return return
for other_change in change.needed_by_changes:
# for project in change_queue, project.source get changes, then dedup.
sources = set()
for project in change_queue.projects:
sources.add(project.source)
seen = set(change.needed_by_changes)
needed_by_changes = change.needed_by_changes[:]
for source in sources:
self.log.debug(" Checking source: %s", source)
for c in source.getChangesDependingOn(change,
change_queue.projects):
if c not in seen:
seen.add(c)
needed_by_changes.append(c)
self.log.debug(" Following changes: %s", needed_by_changes)
to_enqueue = []
for other_change in needed_by_changes:
with self.getChangeQueue(other_change) as other_change_queue: with self.getChangeQueue(other_change) as other_change_queue:
if other_change_queue != change_queue: if other_change_queue != change_queue:
self.log.debug(" Change %s in project %s can not be " self.log.debug(" Change %s in project %s can not be "
@ -108,6 +125,7 @@ class DependentPipelineManager(PipelineManager):
(other_change, other_change.project, (other_change, other_change.project,
change_queue)) change_queue))
continue continue
source = other_change.project.source
if source.canMerge(other_change, self.getSubmitAllowNeeds()): if source.canMerge(other_change, self.getSubmitAllowNeeds()):
self.log.debug(" Change %s needs %s and is ready to merge" % self.log.debug(" Change %s needs %s and is ready to merge" %
(other_change, change)) (other_change, change))
@ -145,10 +163,13 @@ class DependentPipelineManager(PipelineManager):
return True return True
def checkForChangesNeededBy(self, change, change_queue): def checkForChangesNeededBy(self, change, change_queue):
self.log.debug("Checking for changes needed by %s:" % change)
source = change.project.source
# Return true if okay to proceed enqueing this change, # Return true if okay to proceed enqueing this change,
# false if the change should not be enqueued. # false if the change should not be enqueued.
self.log.debug("Checking for changes needed by %s:" % change)
source = change.project.source
if (hasattr(change, 'commit_needs_changes') and
(change.refresh_deps or change.commit_needs_changes is None)):
self.updateCommitDependencies(change, change_queue)
if not hasattr(change, 'needs_changes'): if not hasattr(change, 'needs_changes'):
self.log.debug(" %s does not support dependencies" % type(change)) self.log.debug(" %s does not support dependencies" % type(change))
return True return True

View File

@ -70,6 +70,9 @@ class IndependentPipelineManager(PipelineManager):
self.log.debug("Checking for changes needed by %s:" % change) self.log.debug("Checking for changes needed by %s:" % change)
# Return true if okay to proceed enqueing this change, # Return true if okay to proceed enqueing this change,
# false if the change should not be enqueued. # false if the change should not be enqueued.
if (hasattr(change, 'commit_needs_changes') and
(change.refresh_deps or change.commit_needs_changes is None)):
self.updateCommitDependencies(change, None)
if not hasattr(change, 'needs_changes'): if not hasattr(change, 'needs_changes'):
self.log.debug(" %s does not support dependencies" % type(change)) self.log.debug(" %s does not support dependencies" % type(change))
return True return True

View File

@ -2103,11 +2103,28 @@ class Change(Branch):
def __init__(self, project): def __init__(self, project):
super(Change, self).__init__(project) super(Change, self).__init__(project)
self.number = None self.number = None
# The gitweb url for browsing the change
self.url = None self.url = None
# URIs for this change which may appear in depends-on headers.
# Note this omits the scheme; i.e., is hostname/path.
self.uris = []
self.patchset = None self.patchset = None
self.needs_changes = [] # Changes that the source determined are needed due to the
self.needed_by_changes = [] # git DAG:
self.git_needs_changes = []
self.git_needed_by_changes = []
# Changes that the source determined are needed by backwards
# compatible processing of Depends-On headers (Gerrit only):
self.compat_needs_changes = []
self.compat_needed_by_changes = []
# Changes that the pipeline manager determined are needed due
# to Depends-On headers (all drivers):
self.commit_needs_changes = None
self.refresh_deps = False
self.is_current_patchset = True self.is_current_patchset = True
self.can_merge = False self.can_merge = False
self.is_merged = False self.is_merged = False
@ -2116,6 +2133,11 @@ class Change(Branch):
self.status = None self.status = None
self.owner = None self.owner = None
# This may be the commit message, or it may be a cover message
# in the case of a PR. Either way, it's the place where we
# look for depends-on headers.
self.message = None
self.source_event = None self.source_event = None
def _id(self): def _id(self):
@ -2129,8 +2151,18 @@ class Change(Branch):
return True return True
return False return False
@property
def needs_changes(self):
    """Every change this change depends on, across all mechanisms.

    Combines the git-DAG dependencies, the backwards-compatible
    Depends-On results (Gerrit only), and the commit-message
    Depends-On results resolved by the pipeline manager.
    """
    return [*self.git_needs_changes,
            *self.compat_needs_changes,
            *self.commit_needs_changes]
@property
def needed_by_changes(self):
    """Changes known to depend on this change.

    Combines the git-DAG reverse dependencies with the
    backwards-compatible Depends-On reverse dependencies.
    """
    return [*self.git_needed_by_changes,
            *self.compat_needed_by_changes]
def isUpdateOf(self, other): def isUpdateOf(self, other):
if ((hasattr(other, 'number') and self.number == other.number) and if (self.project == other.project and
(hasattr(other, 'number') and self.number == other.number) and
(hasattr(other, 'patchset') and (hasattr(other, 'patchset') and
self.patchset is not None and self.patchset is not None and
other.patchset is not None and other.patchset is not None and

View File

@ -1088,3 +1088,25 @@ class Scheduler(threading.Thread):
for pipeline in tenant.layout.pipelines.values(): for pipeline in tenant.layout.pipelines.values():
pipelines.append(pipeline.formatStatusJSON(websocket_url)) pipelines.append(pipeline.formatStatusJSON(websocket_url))
return json.dumps(data) return json.dumps(data)
def onChangeUpdated(self, change):
    """Remove stale dependency references on change update.

    When a change receives a new patchset, other changes in the
    system may still reference the old patchset among their
    dependencies.  Scan the cached changes of every source and flag
    any whose commit dependencies point at an earlier version of
    this change, so the queue processor refreshes them on its next
    pass.
    """
    self.log.debug("Change %s has been updated, clearing dependent "
                   "change caches", change)
    for source in self.connections.getSources():
        for cached in source.getCachedChanges():
            deps = cached.commit_needs_changes
            # None means dependencies were never resolved; nothing
            # to invalidate for this cached change.
            if deps is None:
                continue
            if any(change.isUpdateOf(needed) for needed in deps):
                cached.refresh_deps = True
    change.refresh_deps = True

View File

@ -51,6 +51,29 @@ class BaseSource(object, metaclass=abc.ABCMeta):
def getChange(self, event): def getChange(self, event):
"""Get the change representing an event.""" """Get the change representing an event."""
@abc.abstractmethod
def getChangeByURL(self, url):
    """Get the change corresponding to the supplied URL.

    The URL may or may not correspond to this source; if it
    doesn't, or there is no change at that URL, return None.
    """
@abc.abstractmethod
def getChangesDependingOn(self, change, projects):
    """Return changes which depend on the supplied change.

    Search this source for changes which depend on the supplied
    change.  Generally the Change.uris attribute should be used to
    perform the search, as it contains a list of URLs without the
    scheme which represent a single change.

    If the projects argument is None, search across all known
    projects.  If it is supplied, the search may optionally be
    restricted to only those projects.
    """
@abc.abstractmethod @abc.abstractmethod
def getProjectOpenChanges(self, project): def getProjectOpenChanges(self, project):
"""Get the open changes for a project.""" """Get the open changes for a project."""