@ -13,12 +13,119 @@
# License for the specific language governing permissions and limitations
# under the License.
import os
import git
import time
import logging
import urllib
import threading
import voluptuous as v
from zuul . connection import BaseConnection
from zuul . driver . git . gitmodel import GitTriggerEvent , EMPTY_GIT_REF
from zuul . model import Ref , Branch
class GitWatcher(threading.Thread):
    """Daemon thread that polls remote refs and reports changes as events.

    On every poll the watcher asks the connection for the refs of each
    known project (``lsRemote``), compares them with the previously
    recorded state (``projects_refs``, shared with the connection) and
    hands one ``ref-updated`` event per created, deleted or updated ref
    to the scheduler.
    """

    log = logging.getLogger(" connection.git.GitWatcher ")

    def __init__(self, git_connection, baseurl, poll_delay):
        """Set up the watcher.

        :param git_connection: connection object providing ``projects``,
            ``projects_refs``, ``lsRemote``, ``getChange``, ``logEvent``,
            ``sched``, ``w_pause``, ``connection_name`` and
            ``canonical_hostname``.
        :param baseurl: stored on the instance; not otherwise used here.
        :param poll_delay: seconds to wait between two polls.
        """
        threading.Thread.__init__(self)
        self.daemon = True
        self.git_connection = git_connection
        self.baseurl = baseurl
        self.poll_delay = poll_delay
        self._stopped = False
        # Signalled by stop() so that run() does not have to sit out a
        # whole poll_delay before noticing the stop request.
        self._stop_event = threading.Event()
        # Same dict object as the connection's: updates made here are
        # visible on the connection.
        self.projects_refs = self.git_connection.projects_refs

    def compareRefs(self, project, refs):
        """Diff ``refs`` against the recorded state of ``project``.

        The recorded state must already exist in ``projects_refs``
        (``_run`` seeds it before calling this method).

        :param project: project name, used as key in ``projects_refs``.
        :param refs: dict mapping ref name to sha as currently seen.
        :returns: list of ``GitTriggerEvent``; created refs first, then
            deleted refs, then updated refs.
        """
        partial_events = []
        # Fetch previous refs state
        base_refs = self.projects_refs.get(project)
        # Create list of created refs
        rcreateds = set(refs.keys()) - set(base_refs.keys())
        # Create list of deleted refs
        rdeleteds = set(base_refs.keys()) - set(refs.keys())
        # Create the list of updated refs
        updateds = {}
        for ref, sha in refs.items():
            if ref in base_refs and base_refs[ref] != sha:
                updateds[ref] = sha
        for ref in rcreateds:
            event = {
                ' ref ': ref,
                ' branch_created ': True,
                ' oldrev ': EMPTY_GIT_REF,
                ' newrev ': refs[ref]
            }
            partial_events.append(event)
        for ref in rdeleteds:
            event = {
                ' ref ': ref,
                ' branch_deleted ': True,
                ' oldrev ': base_refs[ref],
                ' newrev ': EMPTY_GIT_REF
            }
            partial_events.append(event)
        for ref, sha in updateds.items():
            event = {
                ' ref ': ref,
                ' branch_updated ': True,
                ' oldrev ': base_refs[ref],
                ' newrev ': sha
            }
            partial_events.append(event)
        events = []
        for pevent in partial_events:
            event = GitTriggerEvent()
            event.type = ' ref-updated '
            event.project_hostname = self.git_connection.canonical_hostname
            event.project_name = project
            # Only copy the attributes this particular change carries.
            for attr in (' ref ', ' oldrev ', ' newrev ', ' branch_created ',
                         ' branch_deleted ', ' branch_updated '):
                if attr in pevent:
                    setattr(event, attr, pevent[attr])
            events.append(event)
        return events

    def _run(self):
        """Poll every project once and forward resulting events.

        Any exception aborts the current pass; it is logged with its
        traceback and the watcher keeps running.
        """
        self.log.debug(" Walk through projects refs for connection: %s " %
                       self.git_connection.connection_name)
        try:
            for project in self.git_connection.projects:
                refs = self.git_connection.lsRemote(project)
                self.log.debug(" Read refs %s for project %s " %
                               (refs, project))
                if not self.projects_refs.get(project):
                    # State for this project does not exist yet so add it.
                    # No event will be triggered in this loop as
                    # projects_refs['project'] and refs are equal
                    self.projects_refs[project] = refs
                events = self.compareRefs(project, refs)
                self.projects_refs[project] = refs
                # Send events to the scheduler
                for event in events:
                    self.log.debug(" Handling event: %s " % event)
                    # Force changes cache update before passing
                    # the event to the scheduler
                    self.git_connection.getChange(event)
                    self.git_connection.logEvent(event)
                    # Pass the event to the scheduler
                    self.git_connection.sched.addEvent(event)
        except Exception as e:
            # Log at error level with the traceback: a debug-only message
            # hides polling failures on any normally configured logger.
            self.log.exception(
                " Unexpected issue in _run loop: %s " % str(e))

    def run(self):
        """Thread body: poll, then wait ``poll_delay``, until stopped."""
        while not self._stopped:
            if not self.git_connection.w_pause:
                self._run()
            else:
                self.log.debug(" Watcher is on pause ")
            # Polling wait delay; returns early once stop() is called.
            self._stop_event.wait(self.poll_delay)

    def stop(self):
        """Ask the thread to exit and wake it up if it is waiting."""
        self._stopped = True
        self._stop_event.set()
class GitConnection ( BaseConnection ) :
@ -32,6 +139,8 @@ class GitConnection(BaseConnection):
raise Exception ( ' baseurl is required for git connections in '
' %s ' % self . connection_name )
self . baseurl = self . connection_config . get ( ' baseurl ' )
self . poll_timeout = float (
self . connection_config . get ( ' poll_delay ' , 3600 * 2 ) )
self . canonical_hostname = self . connection_config . get (
' canonical_hostname ' )
if not self . canonical_hostname :
@ -40,7 +149,10 @@ class GitConnection(BaseConnection):
self . canonical_hostname = r . hostname
else :
self . canonical_hostname = ' localhost '
self . w_pause = False
self . projects = { }
self . projects_refs = { }
self . _change_cache = { }
def getProject(self, name):
    """Look up a registered project by name.

    :param name: key into ``self.projects``.
    :returns: the stored project, or ``None`` when the name is unknown.
    """
    project = self.projects.get(name)
    return project
@ -48,15 +160,97 @@ class GitConnection(BaseConnection):
def addProject(self, project):
    """Register ``project`` in ``self.projects`` under ``project.name``.

    An existing entry with the same name is replaced.
    """
    key = project.name
    self.projects[key] = project
def getChangeFilesUpdated(self, project_name, branch, tosha):
    """Return the files reported by a merger "files changes" job.

    Submits the job through ``self.sched.merger.getFilesChanges`` for this
    connection, blocks until it completes, and returns ``job.files``.

    :param project_name: forwarded to the merger.
    :param branch: forwarded to the merger.
    :param tosha: forwarded to the merger.
    :raises Exception: when the finished job is not flagged ``updated``.
    """
    merger = self.sched.merger
    job = merger.getFilesChanges(
        self.connection_name, project_name, branch, tosha)
    self.log.debug(" Waiting for fileschanges job %s " % job)
    job.wait()
    if job.updated:
        self.log.debug(" Fileschanges job %s got changes on files %s " %
                       (job, job.files))
        return job.files
    raise Exception(" Fileschanges job %s failed " % job)
def lsRemote(self, project):
    """List the refs of ``project`` on the remote.

    Runs ``ls_remote`` through GitPython's command wrapper against
    ``self.baseurl`` joined with ``project`` and parses each output line
    as a tab-separated ``sha``/``ref`` pair.

    :param project: project path appended to the connection's baseurl.
    :returns: dict mapping ref name to sha, limited to refs whose name
        starts with the ``refs/`` prefix.
    """
    remote = os.path.join(self.baseurl, project)
    listing = git.cmd.Git().ls_remote(remote)
    pairs = (entry.split(' \t ') for entry in listing.splitlines())
    return {name: sha for sha, name in pairs
            if name.startswith(' refs/ ')}
def maintainCache(self, relevant):
    """Drop cached changes that are not in ``relevant``.

    ``self._change_cache`` maps branch -> {key: change}. Entries whose
    change is absent from ``relevant`` are removed, and a branch left with
    no entries is removed as well.

    :param relevant: container of changes that must be kept.
    """
    cache = self._change_cache
    # Collect first, delete afterwards: the dicts must not change size
    # while they are being iterated.
    stale = {}
    for branch, entries in cache.items():
        dead = [key for key, change in entries.items()
                if change not in relevant]
        if dead:
            stale[branch] = dead
    for branch, dead in stale.items():
        entries = cache[branch]
        for key in dead:
            del entries[key]
        if not entries:
            del cache[branch]
def getChange(self, event, refresh=False):
    """Build (or fetch from cache) the change object describing ``event``.

    * Branch refs yield a ``Branch`` change, cached per branch and
      ``event.newrev``; its ``files`` come from
      ``getChangeFilesUpdated``.
    * Any other non-empty ref yields an uncached ``Ref`` change.
    * An event without a ref is logged as a warning and yields ``None``.

    :param event: object exposing ``ref``, ``oldrev``, ``newrev`` and
        ``project_name``.
    :param refresh: accepted but not used by this implementation.
    :returns: the change object, or ``None``.
    """
    ref = event.ref
    if not ref:
        self.log.warning(" Unable to get change for %s " % (event,))
        return None

    copied_attrs = (' ref ', ' oldrev ', ' newrev ')
    heads_prefix = ' refs/heads/ '

    if not ref.startswith(heads_prefix):
        # catch-all ref (ie, not a branch or head)
        change = Ref(self.getProject(event.project_name))
        for attr in copied_attrs:
            setattr(change, attr, getattr(event, attr))
        change.url = " "
        return change

    branch = ref[len(heads_prefix):]
    cached = self._change_cache.get(branch, {}).get(event.newrev)
    if cached:
        return cached

    change = Branch(self.getProject(event.project_name))
    change.branch = branch
    for attr in copied_attrs:
        setattr(change, attr, getattr(event, attr))
    change.url = " "
    change.files = self.getChangeFilesUpdated(
        event.project_name, change.branch, event.oldrev)
    self._change_cache.setdefault(branch, {})[event.newrev] = change
    return change
def getProjectBranches(self, project, tenant):
    """Return the names of the branches present on the project's remote.

    The refs are read with ``self.lsRemote(project.name)``; every ref
    under the heads prefix is returned with that prefix stripped.

    A leftover stub used to return a hard-coded single branch before this
    code could run, leaving the lookup below unreachable.

    :param project: object whose ``name`` identifies the remote project.
    :param tenant: not used by this implementation.
    :returns: list of branch names, in the order ``lsRemote`` yields them.
    """
    heads_prefix = ' refs/heads/ '
    refs = self.lsRemote(project.name)
    branches = [ref[len(heads_prefix):] for ref in refs
                if ref.startswith(heads_prefix)]
    return branches
def getGitUrl(self, project):
    """Format the project's git URL from ``self.baseurl`` and its name."""
    return ' %s / %s ' % (self.baseurl, project.name)
def onLoad(self):
    """Log the start-up and launch the watcher thread."""
    self.log.debug(" Starting Git Watcher ")
    launch = self._start_watcher_thread
    launch()
def onStop(self):
    """Log the shutdown and stop the watcher thread."""
    self.log.debug(" Stopping Git Watcher ")
    halt = self._stop_watcher_thread
    halt()
def _stop_watcher_thread ( self ) :
if self . watcher_thread :
self . watcher_thread . stop ( )
self . watcher_thread . join ( )
def _start_watcher_thread(self):
    """Create a ``GitWatcher`` for this connection and start it.

    The watcher receives this connection, ``self.baseurl`` and
    ``self.poll_timeout`` (used as its poll delay) and is stored on
    ``self.watcher_thread`` before being started.
    """
    watcher = GitWatcher(self, self.baseurl, self.poll_timeout)
    self.watcher_thread = watcher
    watcher.start()
def getSchema ( ) :
git_connection = v . Any ( str , v . Schema ( dict ) )