zuul/zuul/driver/git/gitconnection.py
James E. Blair 185b970bad Stabilize git driver tests
These tests relied on sleeps which can cause races when running
the full test suite in parallel.  Instead, wait for the events
we know will happen to happen.

Also remove the dependency on yarl now that aiohttp has made a
release which works with yarl 1.0 (however, it does not work with
<1.0 which is why this needs to be combined with this change to
fix tests).

Change-Id: Ib1c626cdd3f083dd1d23a3c6547bd7163b66567e
2018-01-17 09:53:18 -08:00

261 lines
9.6 KiB
Python

# Copyright 2011 OpenStack, LLC.
# Copyright 2012 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
import git
import time
import logging
import urllib
import threading
import voluptuous as v
from zuul.connection import BaseConnection
from zuul.driver.git.gitmodel import GitTriggerEvent, EMPTY_GIT_REF
from zuul.model import Ref, Branch
class GitWatcher(threading.Thread):
log = logging.getLogger("connection.git.GitWatcher")
def __init__(self, git_connection, baseurl, poll_delay):
threading.Thread.__init__(self)
self.daemon = True
self.git_connection = git_connection
self.baseurl = baseurl
self.poll_delay = poll_delay
self._stopped = False
self.projects_refs = self.git_connection.projects_refs
# This is used by the test framework
self._event_count = 0
def compareRefs(self, project, refs):
partial_events = []
# Fetch previous refs state
base_refs = self.projects_refs.get(project)
# Create list of created refs
rcreateds = set(refs.keys()) - set(base_refs.keys())
# Create list of deleted refs
rdeleteds = set(base_refs.keys()) - set(refs.keys())
# Create the list of updated refs
updateds = {}
for ref, sha in refs.items():
if ref in base_refs and base_refs[ref] != sha:
updateds[ref] = sha
for ref in rcreateds:
event = {
'ref': ref,
'branch_created': True,
'oldrev': EMPTY_GIT_REF,
'newrev': refs[ref]
}
partial_events.append(event)
for ref in rdeleteds:
event = {
'ref': ref,
'branch_deleted': True,
'oldrev': base_refs[ref],
'newrev': EMPTY_GIT_REF
}
partial_events.append(event)
for ref, sha in updateds.items():
event = {
'ref': ref,
'branch_updated': True,
'oldrev': base_refs[ref],
'newrev': sha
}
partial_events.append(event)
events = []
for pevent in partial_events:
event = GitTriggerEvent()
event.type = 'ref-updated'
event.project_hostname = self.git_connection.canonical_hostname
event.project_name = project
for attr in ('ref', 'oldrev', 'newrev', 'branch_created',
'branch_deleted', 'branch_updated'):
if attr in pevent:
setattr(event, attr, pevent[attr])
events.append(event)
return events
def _run(self):
self.log.debug("Walk through projects refs for connection: %s" %
self.git_connection.connection_name)
try:
for project in self.git_connection.projects:
refs = self.git_connection.lsRemote(project)
self.log.debug("Read refs %s for project %s" % (refs, project))
if not self.projects_refs.get(project):
# State for this project does not exist yet so add it.
# No event will be triggered in this loop as
# projects_refs['project'] and refs are equal
self.projects_refs[project] = refs
events = self.compareRefs(project, refs)
self.projects_refs[project] = refs
# Send events to the scheduler
for event in events:
self.log.debug("Handling event: %s" % event)
# Force changes cache update before passing
# the event to the scheduler
self.git_connection.getChange(event)
self.git_connection.logEvent(event)
# Pass the event to the scheduler
self.git_connection.sched.addEvent(event)
self._event_count += 1
except Exception as e:
self.log.debug("Unexpected issue in _run loop: %s" % str(e))
def run(self):
while not self._stopped:
if not self.git_connection.w_pause:
self._run()
# Polling wait delay
else:
self.log.debug("Watcher is on pause")
time.sleep(self.poll_delay)
def stop(self):
self._stopped = True
class GitConnection(BaseConnection):
driver_name = 'git'
log = logging.getLogger("connection.git")
def __init__(self, driver, connection_name, connection_config):
super(GitConnection, self).__init__(driver, connection_name,
connection_config)
if 'baseurl' not in self.connection_config:
raise Exception('baseurl is required for git connections in '
'%s' % self.connection_name)
self.baseurl = self.connection_config.get('baseurl')
self.poll_timeout = float(
self.connection_config.get('poll_delay', 3600 * 2))
self.canonical_hostname = self.connection_config.get(
'canonical_hostname')
if not self.canonical_hostname:
r = urllib.parse.urlparse(self.baseurl)
if r.hostname:
self.canonical_hostname = r.hostname
else:
self.canonical_hostname = 'localhost'
self.w_pause = False
self.projects = {}
self.projects_refs = {}
self._change_cache = {}
def getProject(self, name):
return self.projects.get(name)
def addProject(self, project):
self.projects[project.name] = project
def getChangeFilesUpdated(self, project_name, branch, tosha):
job = self.sched.merger.getFilesChanges(
self.connection_name, project_name, branch, tosha)
self.log.debug("Waiting for fileschanges job %s" % job)
job.wait()
if not job.updated:
raise Exception("Fileschanges job %s failed" % job)
self.log.debug("Fileschanges job %s got changes on files %s" %
(job, job.files))
return job.files
def lsRemote(self, project):
refs = {}
client = git.cmd.Git()
output = client.ls_remote(
os.path.join(self.baseurl, project))
for line in output.splitlines():
sha, ref = line.split('\t')
if ref.startswith('refs/'):
refs[ref] = sha
return refs
def maintainCache(self, relevant):
remove = {}
for branch, refschange in self._change_cache.items():
for ref, change in refschange.items():
if change not in relevant:
remove.setdefault(branch, []).append(ref)
for branch, refs in remove.items():
for ref in refs:
del self._change_cache[branch][ref]
if not self._change_cache[branch]:
del self._change_cache[branch]
def getChange(self, event, refresh=False):
if event.ref and event.ref.startswith('refs/heads/'):
branch = event.ref[len('refs/heads/'):]
change = self._change_cache.get(branch, {}).get(event.newrev)
if change:
return change
project = self.getProject(event.project_name)
change = Branch(project)
change.branch = branch
for attr in ('ref', 'oldrev', 'newrev'):
setattr(change, attr, getattr(event, attr))
change.url = ""
change.files = self.getChangeFilesUpdated(
event.project_name, change.branch, event.oldrev)
self._change_cache.setdefault(branch, {})[event.newrev] = change
elif event.ref:
# catch-all ref (ie, not a branch or head)
project = self.getProject(event.project_name)
change = Ref(project)
for attr in ('ref', 'oldrev', 'newrev'):
setattr(change, attr, getattr(event, attr))
change.url = ""
else:
self.log.warning("Unable to get change for %s" % (event,))
change = None
return change
def getProjectBranches(self, project, tenant):
refs = self.lsRemote(project.name)
branches = [ref[len('refs/heads/'):] for ref in
refs if ref.startswith('refs/heads/')]
return branches
def getGitUrl(self, project):
url = '%s/%s' % (self.baseurl, project.name)
return url
def onLoad(self):
self.log.debug("Starting Git Watcher")
self._start_watcher_thread()
def onStop(self):
self.log.debug("Stopping Git Watcher")
self._stop_watcher_thread()
def _stop_watcher_thread(self):
if self.watcher_thread:
self.watcher_thread.stop()
self.watcher_thread.join()
def _start_watcher_thread(self):
self.watcher_thread = GitWatcher(
self,
self.baseurl,
self.poll_timeout)
self.watcher_thread.start()
def getSchema():
git_connection = v.Any(str, v.Schema(dict))
return git_connection