Merge "Git driver" into feature/zuulv3
This commit is contained in:
@@ -15,6 +15,7 @@
|
||||
from zuul.driver import Driver, ConnectionInterface, SourceInterface
|
||||
from zuul.driver.git import gitconnection
|
||||
from zuul.driver.git import gitsource
|
||||
from zuul.driver.git import gittrigger
|
||||
|
||||
|
||||
class GitDriver(Driver, ConnectionInterface, SourceInterface):
|
||||
@@ -23,9 +24,15 @@ class GitDriver(Driver, ConnectionInterface, SourceInterface):
|
||||
def getConnection(self, name, config):
|
||||
return gitconnection.GitConnection(self, name, config)
|
||||
|
||||
def getTrigger(self, connection, config=None):
|
||||
return gittrigger.GitTrigger(self, connection, config)
|
||||
|
||||
def getSource(self, connection):
|
||||
return gitsource.GitSource(self, connection)
|
||||
|
||||
def getTriggerSchema(self):
|
||||
return gittrigger.getSchema()
|
||||
|
||||
def getRequireSchema(self):
|
||||
return {}
|
||||
|
||||
|
||||
@@ -13,12 +13,119 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import os
|
||||
import git
|
||||
import time
|
||||
import logging
|
||||
import urllib
|
||||
import threading
|
||||
|
||||
import voluptuous as v
|
||||
|
||||
from zuul.connection import BaseConnection
|
||||
from zuul.driver.git.gitmodel import GitTriggerEvent, EMPTY_GIT_REF
|
||||
from zuul.model import Ref, Branch
|
||||
|
||||
|
||||
class GitWatcher(threading.Thread):
|
||||
log = logging.getLogger("connection.git.GitWatcher")
|
||||
|
||||
def __init__(self, git_connection, baseurl, poll_delay):
|
||||
threading.Thread.__init__(self)
|
||||
self.daemon = True
|
||||
self.git_connection = git_connection
|
||||
self.baseurl = baseurl
|
||||
self.poll_delay = poll_delay
|
||||
self._stopped = False
|
||||
self.projects_refs = self.git_connection.projects_refs
|
||||
|
||||
def compareRefs(self, project, refs):
|
||||
partial_events = []
|
||||
# Fetch previous refs state
|
||||
base_refs = self.projects_refs.get(project)
|
||||
# Create list of created refs
|
||||
rcreateds = set(refs.keys()) - set(base_refs.keys())
|
||||
# Create list of deleted refs
|
||||
rdeleteds = set(base_refs.keys()) - set(refs.keys())
|
||||
# Create the list of updated refs
|
||||
updateds = {}
|
||||
for ref, sha in refs.items():
|
||||
if ref in base_refs and base_refs[ref] != sha:
|
||||
updateds[ref] = sha
|
||||
for ref in rcreateds:
|
||||
event = {
|
||||
'ref': ref,
|
||||
'branch_created': True,
|
||||
'oldrev': EMPTY_GIT_REF,
|
||||
'newrev': refs[ref]
|
||||
}
|
||||
partial_events.append(event)
|
||||
for ref in rdeleteds:
|
||||
event = {
|
||||
'ref': ref,
|
||||
'branch_deleted': True,
|
||||
'oldrev': base_refs[ref],
|
||||
'newrev': EMPTY_GIT_REF
|
||||
}
|
||||
partial_events.append(event)
|
||||
for ref, sha in updateds.items():
|
||||
event = {
|
||||
'ref': ref,
|
||||
'branch_updated': True,
|
||||
'oldrev': base_refs[ref],
|
||||
'newrev': sha
|
||||
}
|
||||
partial_events.append(event)
|
||||
events = []
|
||||
for pevent in partial_events:
|
||||
event = GitTriggerEvent()
|
||||
event.type = 'ref-updated'
|
||||
event.project_hostname = self.git_connection.canonical_hostname
|
||||
event.project_name = project
|
||||
for attr in ('ref', 'oldrev', 'newrev', 'branch_created',
|
||||
'branch_deleted', 'branch_updated'):
|
||||
if attr in pevent:
|
||||
setattr(event, attr, pevent[attr])
|
||||
events.append(event)
|
||||
return events
|
||||
|
||||
def _run(self):
|
||||
self.log.debug("Walk through projects refs for connection: %s" %
|
||||
self.git_connection.connection_name)
|
||||
try:
|
||||
for project in self.git_connection.projects:
|
||||
refs = self.git_connection.lsRemote(project)
|
||||
self.log.debug("Read refs %s for project %s" % (refs, project))
|
||||
if not self.projects_refs.get(project):
|
||||
# State for this project does not exist yet so add it.
|
||||
# No event will be triggered in this loop as
|
||||
# projects_refs['project'] and refs are equal
|
||||
self.projects_refs[project] = refs
|
||||
events = self.compareRefs(project, refs)
|
||||
self.projects_refs[project] = refs
|
||||
# Send events to the scheduler
|
||||
for event in events:
|
||||
self.log.debug("Handling event: %s" % event)
|
||||
# Force changes cache update before passing
|
||||
# the event to the scheduler
|
||||
self.git_connection.getChange(event)
|
||||
self.git_connection.logEvent(event)
|
||||
# Pass the event to the scheduler
|
||||
self.git_connection.sched.addEvent(event)
|
||||
except Exception as e:
|
||||
self.log.debug("Unexpected issue in _run loop: %s" % str(e))
|
||||
|
||||
def run(self):
|
||||
while not self._stopped:
|
||||
if not self.git_connection.w_pause:
|
||||
self._run()
|
||||
# Polling wait delay
|
||||
else:
|
||||
self.log.debug("Watcher is on pause")
|
||||
time.sleep(self.poll_delay)
|
||||
|
||||
def stop(self):
|
||||
self._stopped = True
|
||||
|
||||
|
||||
class GitConnection(BaseConnection):
|
||||
@@ -32,6 +139,8 @@ class GitConnection(BaseConnection):
|
||||
raise Exception('baseurl is required for git connections in '
|
||||
'%s' % self.connection_name)
|
||||
self.baseurl = self.connection_config.get('baseurl')
|
||||
self.poll_timeout = float(
|
||||
self.connection_config.get('poll_delay', 3600 * 2))
|
||||
self.canonical_hostname = self.connection_config.get(
|
||||
'canonical_hostname')
|
||||
if not self.canonical_hostname:
|
||||
@@ -40,7 +149,10 @@ class GitConnection(BaseConnection):
|
||||
self.canonical_hostname = r.hostname
|
||||
else:
|
||||
self.canonical_hostname = 'localhost'
|
||||
self.w_pause = False
|
||||
self.projects = {}
|
||||
self.projects_refs = {}
|
||||
self._change_cache = {}
|
||||
|
||||
def getProject(self, name):
|
||||
return self.projects.get(name)
|
||||
@@ -48,15 +160,97 @@ class GitConnection(BaseConnection):
|
||||
def addProject(self, project):
|
||||
self.projects[project.name] = project
|
||||
|
||||
def getChangeFilesUpdated(self, project_name, branch, tosha):
|
||||
job = self.sched.merger.getFilesChanges(
|
||||
self.connection_name, project_name, branch, tosha)
|
||||
self.log.debug("Waiting for fileschanges job %s" % job)
|
||||
job.wait()
|
||||
if not job.updated:
|
||||
raise Exception("Fileschanges job %s failed" % job)
|
||||
self.log.debug("Fileschanges job %s got changes on files %s" %
|
||||
(job, job.files))
|
||||
return job.files
|
||||
|
||||
def lsRemote(self, project):
|
||||
refs = {}
|
||||
client = git.cmd.Git()
|
||||
output = client.ls_remote(
|
||||
os.path.join(self.baseurl, project))
|
||||
for line in output.splitlines():
|
||||
sha, ref = line.split('\t')
|
||||
if ref.startswith('refs/'):
|
||||
refs[ref] = sha
|
||||
return refs
|
||||
|
||||
def maintainCache(self, relevant):
|
||||
remove = {}
|
||||
for branch, refschange in self._change_cache.items():
|
||||
for ref, change in refschange.items():
|
||||
if change not in relevant:
|
||||
remove.setdefault(branch, []).append(ref)
|
||||
for branch, refs in remove.items():
|
||||
for ref in refs:
|
||||
del self._change_cache[branch][ref]
|
||||
if not self._change_cache[branch]:
|
||||
del self._change_cache[branch]
|
||||
|
||||
def getChange(self, event, refresh=False):
|
||||
if event.ref and event.ref.startswith('refs/heads/'):
|
||||
branch = event.ref[len('refs/heads/'):]
|
||||
change = self._change_cache.get(branch, {}).get(event.newrev)
|
||||
if change:
|
||||
return change
|
||||
project = self.getProject(event.project_name)
|
||||
change = Branch(project)
|
||||
change.branch = branch
|
||||
for attr in ('ref', 'oldrev', 'newrev'):
|
||||
setattr(change, attr, getattr(event, attr))
|
||||
change.url = ""
|
||||
change.files = self.getChangeFilesUpdated(
|
||||
event.project_name, change.branch, event.oldrev)
|
||||
self._change_cache.setdefault(branch, {})[event.newrev] = change
|
||||
elif event.ref:
|
||||
# catch-all ref (ie, not a branch or head)
|
||||
project = self.getProject(event.project_name)
|
||||
change = Ref(project)
|
||||
for attr in ('ref', 'oldrev', 'newrev'):
|
||||
setattr(change, attr, getattr(event, attr))
|
||||
change.url = ""
|
||||
else:
|
||||
self.log.warning("Unable to get change for %s" % (event,))
|
||||
change = None
|
||||
return change
|
||||
|
||||
def getProjectBranches(self, project, tenant):
|
||||
# TODO(jeblair): implement; this will need to handle local or
|
||||
# remote git urls.
|
||||
return ['master']
|
||||
refs = self.lsRemote(project.name)
|
||||
branches = [ref[len('refs/heads/'):] for ref in
|
||||
refs if ref.startswith('refs/heads/')]
|
||||
return branches
|
||||
|
||||
def getGitUrl(self, project):
|
||||
url = '%s/%s' % (self.baseurl, project.name)
|
||||
return url
|
||||
|
||||
def onLoad(self):
|
||||
self.log.debug("Starting Git Watcher")
|
||||
self._start_watcher_thread()
|
||||
|
||||
def onStop(self):
|
||||
self.log.debug("Stopping Git Watcher")
|
||||
self._stop_watcher_thread()
|
||||
|
||||
def _stop_watcher_thread(self):
|
||||
if self.watcher_thread:
|
||||
self.watcher_thread.stop()
|
||||
self.watcher_thread.join()
|
||||
|
||||
def _start_watcher_thread(self):
|
||||
self.watcher_thread = GitWatcher(
|
||||
self,
|
||||
self.baseurl,
|
||||
self.poll_timeout)
|
||||
self.watcher_thread.start()
|
||||
|
||||
|
||||
def getSchema():
|
||||
git_connection = v.Any(str, v.Schema(dict))
|
||||
|
||||
86
zuul/driver/git/gitmodel.py
Normal file
86
zuul/driver/git/gitmodel.py
Normal file
@@ -0,0 +1,86 @@
|
||||
# Copyright 2017 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import re
|
||||
|
||||
from zuul.model import TriggerEvent
|
||||
from zuul.model import EventFilter
|
||||
|
||||
|
||||
EMPTY_GIT_REF = '0' * 40 # git sha of all zeros, used during creates/deletes
|
||||
|
||||
|
||||
class GitTriggerEvent(TriggerEvent):
|
||||
"""Incoming event from an external system."""
|
||||
|
||||
def __repr__(self):
|
||||
ret = '<GitTriggerEvent %s %s' % (self.type,
|
||||
self.project_name)
|
||||
|
||||
if self.branch:
|
||||
ret += " %s" % self.branch
|
||||
ret += " oldrev:%s" % self.oldrev
|
||||
ret += " newrev:%s" % self.newrev
|
||||
ret += '>'
|
||||
|
||||
return ret
|
||||
|
||||
|
||||
class GitEventFilter(EventFilter):
|
||||
def __init__(self, trigger, types=[], refs=[],
|
||||
ignore_deletes=True):
|
||||
|
||||
super().__init__(trigger)
|
||||
|
||||
self._refs = refs
|
||||
self.types = types
|
||||
self.refs = [re.compile(x) for x in refs]
|
||||
self.ignore_deletes = ignore_deletes
|
||||
|
||||
def __repr__(self):
|
||||
ret = '<GitEventFilter'
|
||||
|
||||
if self.types:
|
||||
ret += ' types: %s' % ', '.join(self.types)
|
||||
if self._refs:
|
||||
ret += ' refs: %s' % ', '.join(self._refs)
|
||||
if self.ignore_deletes:
|
||||
ret += ' ignore_deletes: %s' % self.ignore_deletes
|
||||
ret += '>'
|
||||
|
||||
return ret
|
||||
|
||||
def matches(self, event, change):
|
||||
# event types are ORed
|
||||
matches_type = False
|
||||
for etype in self.types:
|
||||
if etype == event.type:
|
||||
matches_type = True
|
||||
if self.types and not matches_type:
|
||||
return False
|
||||
|
||||
# refs are ORed
|
||||
matches_ref = False
|
||||
if event.ref is not None:
|
||||
for ref in self.refs:
|
||||
if ref.match(event.ref):
|
||||
matches_ref = True
|
||||
if self.refs and not matches_ref:
|
||||
return False
|
||||
if self.ignore_deletes and event.newrev == EMPTY_GIT_REF:
|
||||
# If the updated ref has an empty git sha (all 0s),
|
||||
# then the ref is being deleted
|
||||
return False
|
||||
|
||||
return True
|
||||
@@ -36,7 +36,7 @@ class GitSource(BaseSource):
|
||||
raise NotImplemented()
|
||||
|
||||
def getChange(self, event, refresh=False):
|
||||
raise NotImplemented()
|
||||
return self.connection.getChange(event, refresh)
|
||||
|
||||
def getProject(self, name):
|
||||
p = self.connection.getProject(name)
|
||||
|
||||
49
zuul/driver/git/gittrigger.py
Normal file
49
zuul/driver/git/gittrigger.py
Normal file
@@ -0,0 +1,49 @@
|
||||
# Copyright 2017 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import logging
|
||||
import voluptuous as v
|
||||
from zuul.trigger import BaseTrigger
|
||||
from zuul.driver.git.gitmodel import GitEventFilter
|
||||
from zuul.driver.util import scalar_or_list, to_list
|
||||
|
||||
|
||||
class GitTrigger(BaseTrigger):
|
||||
name = 'git'
|
||||
log = logging.getLogger("zuul.GitTrigger")
|
||||
|
||||
def getEventFilters(self, trigger_conf):
|
||||
efilters = []
|
||||
for trigger in to_list(trigger_conf):
|
||||
f = GitEventFilter(
|
||||
trigger=self,
|
||||
types=to_list(trigger['event']),
|
||||
refs=to_list(trigger.get('ref')),
|
||||
ignore_deletes=trigger.get(
|
||||
'ignore-deletes', True)
|
||||
)
|
||||
efilters.append(f)
|
||||
|
||||
return efilters
|
||||
|
||||
|
||||
def getSchema():
|
||||
git_trigger = {
|
||||
v.Required('event'):
|
||||
scalar_or_list(v.Any('ref-updated')),
|
||||
'ref': scalar_or_list(str),
|
||||
'ignore-deletes': bool,
|
||||
}
|
||||
|
||||
return git_trigger
|
||||
Reference in New Issue
Block a user