Extract the watcher from git driver
So that other drivers which may need to poll for ref updates can use the same code as the git driver, extract that to a helper class. Change-Id: I89a9f5e461bfa5daecd74943d8a5521363cfd559
This commit is contained in:
parent
7e4bfb68f3
commit
cbaa384e47
|
@ -90,7 +90,8 @@ class TestGitDriver(ZuulTestCase):
|
|||
# Let's stop the git Watcher to let us merge some changes commits
|
||||
# We want to verify that config changes are detected for commits
|
||||
# on the range oldrev..newrev
|
||||
self.sched.connections.getSource('git').connection.w_pause = True
|
||||
self.sched.connections.getSource('git').connection.\
|
||||
watcher_thread._pause = True
|
||||
# Add a config change
|
||||
change = {
|
||||
'name': 'org/project',
|
||||
|
@ -112,7 +113,8 @@ class TestGitDriver(ZuulTestCase):
|
|||
'common-config', 'Adding f2',
|
||||
{'f2': "Content"})
|
||||
# Restart the git watcher
|
||||
self.sched.connections.getSource('git').connection.w_pause = False
|
||||
self.sched.connections.getSource('git').connection.\
|
||||
watcher_thread._pause = False
|
||||
|
||||
# Wait for the tenant reconfiguration to happen
|
||||
self.waitForEvent(count)
|
||||
|
@ -130,7 +132,7 @@ class TestGitDriver(ZuulTestCase):
|
|||
# Make sure watcher have read initial refs shas
|
||||
delay = 0.1
|
||||
max_delay = 1
|
||||
while not self.git_connection.projects_refs:
|
||||
while not self.git_connection.watcher_thread.projects_refs:
|
||||
time.sleep(delay)
|
||||
max_delay -= delay
|
||||
if max_delay <= 0:
|
||||
|
|
|
@ -18,118 +18,13 @@ import git
|
|||
import time
|
||||
import logging
|
||||
import urllib
|
||||
import threading
|
||||
|
||||
from zuul.connection import BaseConnection
|
||||
from zuul.driver.git.gitmodel import GitTriggerEvent, EMPTY_GIT_REF
|
||||
from zuul.driver.git.gitmodel import GitTriggerEvent
|
||||
from zuul.driver.git.gitwatcher import GitWatcher
|
||||
from zuul.model import Ref, Branch
|
||||
|
||||
|
||||
class GitWatcher(threading.Thread):
|
||||
log = logging.getLogger("zuul.connection.git.watcher")
|
||||
|
||||
def __init__(self, git_connection, baseurl, poll_delay):
|
||||
threading.Thread.__init__(self)
|
||||
self.daemon = True
|
||||
self.git_connection = git_connection
|
||||
self.baseurl = baseurl
|
||||
self.poll_delay = poll_delay
|
||||
self._stopped = False
|
||||
self.projects_refs = self.git_connection.projects_refs
|
||||
# This is used by the test framework
|
||||
self._event_count = 0
|
||||
|
||||
def compareRefs(self, project, refs):
|
||||
partial_events = []
|
||||
# Fetch previous refs state
|
||||
base_refs = self.projects_refs.get(project)
|
||||
# Create list of created refs
|
||||
rcreateds = set(refs.keys()) - set(base_refs.keys())
|
||||
# Create list of deleted refs
|
||||
rdeleteds = set(base_refs.keys()) - set(refs.keys())
|
||||
# Create the list of updated refs
|
||||
updateds = {}
|
||||
for ref, sha in refs.items():
|
||||
if ref in base_refs and base_refs[ref] != sha:
|
||||
updateds[ref] = sha
|
||||
for ref in rcreateds:
|
||||
event = {
|
||||
'ref': ref,
|
||||
'branch_created': True,
|
||||
'oldrev': EMPTY_GIT_REF,
|
||||
'newrev': refs[ref]
|
||||
}
|
||||
partial_events.append(event)
|
||||
for ref in rdeleteds:
|
||||
event = {
|
||||
'ref': ref,
|
||||
'branch_deleted': True,
|
||||
'oldrev': base_refs[ref],
|
||||
'newrev': EMPTY_GIT_REF
|
||||
}
|
||||
partial_events.append(event)
|
||||
for ref, sha in updateds.items():
|
||||
event = {
|
||||
'ref': ref,
|
||||
'branch_updated': True,
|
||||
'oldrev': base_refs[ref],
|
||||
'newrev': sha
|
||||
}
|
||||
partial_events.append(event)
|
||||
events = []
|
||||
for pevent in partial_events:
|
||||
event = GitTriggerEvent()
|
||||
event.type = 'ref-updated'
|
||||
event.timestamp = time.time()
|
||||
event.project_hostname = self.git_connection.canonical_hostname
|
||||
event.project_name = project
|
||||
for attr in ('ref', 'oldrev', 'newrev', 'branch_created',
|
||||
'branch_deleted', 'branch_updated'):
|
||||
if attr in pevent:
|
||||
setattr(event, attr, pevent[attr])
|
||||
events.append(event)
|
||||
return events
|
||||
|
||||
def _run(self):
|
||||
self.log.debug("Walk through projects refs for connection: %s" %
|
||||
self.git_connection.connection_name)
|
||||
try:
|
||||
for project in self.git_connection.projects:
|
||||
refs = self.git_connection.lsRemote(project)
|
||||
self.log.debug("Read refs %s for project %s" % (refs, project))
|
||||
if not self.projects_refs.get(project):
|
||||
# State for this project does not exist yet so add it.
|
||||
# No event will be triggered in this loop as
|
||||
# projects_refs['project'] and refs are equal
|
||||
self.projects_refs[project] = refs
|
||||
events = self.compareRefs(project, refs)
|
||||
self.projects_refs[project] = refs
|
||||
# Send events to the scheduler
|
||||
for event in events:
|
||||
self.log.debug("Handling event: %s" % event)
|
||||
# Force changes cache update before passing
|
||||
# the event to the scheduler
|
||||
self.git_connection.getChange(event)
|
||||
self.git_connection.logEvent(event)
|
||||
# Pass the event to the scheduler
|
||||
self.git_connection.sched.addEvent(event)
|
||||
self._event_count += 1
|
||||
except Exception as e:
|
||||
self.log.debug("Unexpected issue in _run loop: %s" % str(e))
|
||||
|
||||
def run(self):
|
||||
while not self._stopped:
|
||||
if not self.git_connection.w_pause:
|
||||
self._run()
|
||||
# Polling wait delay
|
||||
else:
|
||||
self.log.debug("Watcher is on pause")
|
||||
time.sleep(self.poll_delay)
|
||||
|
||||
def stop(self):
|
||||
self._stopped = True
|
||||
|
||||
|
||||
class GitConnection(BaseConnection):
|
||||
driver_name = 'git'
|
||||
log = logging.getLogger("zuul.connection.git")
|
||||
|
@ -151,9 +46,7 @@ class GitConnection(BaseConnection):
|
|||
self.canonical_hostname = r.hostname
|
||||
else:
|
||||
self.canonical_hostname = 'localhost'
|
||||
self.w_pause = False
|
||||
self.projects = {}
|
||||
self.projects_refs = {}
|
||||
self._change_cache = {}
|
||||
|
||||
def toDict(self):
|
||||
|
@ -241,6 +134,24 @@ class GitConnection(BaseConnection):
|
|||
def getGitUrl(self, project):
|
||||
return os.path.join(self.baseurl, project.name)
|
||||
|
||||
def watcherCallback(self, data):
|
||||
event = GitTriggerEvent()
|
||||
event.type = 'ref-updated'
|
||||
event.timestamp = time.time()
|
||||
event.project_hostname = self.canonical_hostname
|
||||
event.project_name = data['project']
|
||||
for attr in ('ref', 'oldrev', 'newrev', 'branch_created',
|
||||
'branch_deleted', 'branch_updated'):
|
||||
if attr in data:
|
||||
setattr(event, attr, data[attr])
|
||||
|
||||
# Force changes cache update before passing
|
||||
# the event to the scheduler
|
||||
self.getChange(event)
|
||||
self.logEvent(event)
|
||||
# Pass the event to the scheduler
|
||||
self.sched.addEvent(event)
|
||||
|
||||
def onLoad(self):
|
||||
self.log.debug("Starting Git Watcher")
|
||||
self._start_watcher_thread()
|
||||
|
@ -258,5 +169,6 @@ class GitConnection(BaseConnection):
|
|||
self.watcher_thread = GitWatcher(
|
||||
self,
|
||||
self.baseurl,
|
||||
self.poll_timeout)
|
||||
self.poll_timeout,
|
||||
self.watcherCallback)
|
||||
self.watcher_thread.start()
|
||||
|
|
|
@ -0,0 +1,145 @@
|
|||
# Copyright 2011 OpenStack, LLC.
|
||||
# Copyright 2012 Hewlett-Packard Development Company, L.P.
|
||||
# Copyright 2020 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import os
|
||||
import logging
|
||||
import threading
|
||||
import time
|
||||
|
||||
import git
|
||||
|
||||
from zuul.driver.git.gitmodel import EMPTY_GIT_REF
|
||||
|
||||
|
||||
# This class may be used by any driver to implement git head polling.
|
||||
class GitWatcher(threading.Thread):
|
||||
log = logging.getLogger("zuul.connection.git.watcher")
|
||||
|
||||
def __init__(self, connection, baseurl, poll_delay, callback):
|
||||
"""Watch for branch changes
|
||||
|
||||
Watch every project listed in the connection and call a
|
||||
callback method with information about branch changes.
|
||||
|
||||
:param zuul.Connection connection:
|
||||
The Connection to watch.
|
||||
:param str baseurl:
|
||||
The HTTP(S) URL where git repos are hosted.
|
||||
:param int poll_delay:
|
||||
The interval between polls.
|
||||
:param function callback:
|
||||
A callback method to be called for each updated ref. The sole
|
||||
argument is a dictionary describing the update.
|
||||
"""
|
||||
threading.Thread.__init__(self)
|
||||
self.daemon = True
|
||||
self.connection = connection
|
||||
self.baseurl = baseurl
|
||||
self.poll_delay = poll_delay
|
||||
self._stopped = False
|
||||
self.projects_refs = {}
|
||||
self.callback = callback
|
||||
# This is used by the test framework
|
||||
self._event_count = 0
|
||||
self._pause = False
|
||||
|
||||
def lsRemote(self, project):
|
||||
refs = {}
|
||||
client = git.cmd.Git()
|
||||
output = client.ls_remote(
|
||||
"--heads", "--tags",
|
||||
os.path.join(self.baseurl, project))
|
||||
for line in output.splitlines():
|
||||
sha, ref = line.split('\t')
|
||||
if ref.startswith('refs/'):
|
||||
refs[ref] = sha
|
||||
return refs
|
||||
|
||||
def compareRefs(self, project, refs):
|
||||
events = []
|
||||
# Fetch previous refs state
|
||||
base_refs = self.projects_refs.get(project)
|
||||
# Create list of created refs
|
||||
rcreateds = set(refs.keys()) - set(base_refs.keys())
|
||||
# Create list of deleted refs
|
||||
rdeleteds = set(base_refs.keys()) - set(refs.keys())
|
||||
# Create the list of updated refs
|
||||
updateds = {}
|
||||
for ref, sha in refs.items():
|
||||
if ref in base_refs and base_refs[ref] != sha:
|
||||
updateds[ref] = sha
|
||||
for ref in rcreateds:
|
||||
event = {
|
||||
'project': project,
|
||||
'ref': ref,
|
||||
'branch_created': True,
|
||||
'oldrev': EMPTY_GIT_REF,
|
||||
'newrev': refs[ref]
|
||||
}
|
||||
events.append(event)
|
||||
for ref in rdeleteds:
|
||||
event = {
|
||||
'project': project,
|
||||
'ref': ref,
|
||||
'branch_deleted': True,
|
||||
'oldrev': base_refs[ref],
|
||||
'newrev': EMPTY_GIT_REF
|
||||
}
|
||||
events.append(event)
|
||||
for ref, sha in updateds.items():
|
||||
event = {
|
||||
'project': project,
|
||||
'ref': ref,
|
||||
'branch_updated': True,
|
||||
'oldrev': base_refs[ref],
|
||||
'newrev': sha
|
||||
}
|
||||
events.append(event)
|
||||
return events
|
||||
|
||||
def _run(self):
|
||||
self.log.debug("Walk through projects refs for connection: %s" %
|
||||
self.connection.connection_name)
|
||||
try:
|
||||
for project in self.connection.projects:
|
||||
refs = self.lsRemote(project)
|
||||
self.log.debug("Read refs %s for project %s" % (refs, project))
|
||||
if not self.projects_refs.get(project):
|
||||
# State for this project does not exist yet so add it.
|
||||
# No event will be triggered in this loop as
|
||||
# projects_refs['project'] and refs are equal
|
||||
self.projects_refs[project] = refs
|
||||
events = self.compareRefs(project, refs)
|
||||
self.projects_refs[project] = refs
|
||||
# Send events to the scheduler
|
||||
for event in events:
|
||||
self.log.debug("Sending event: %s" % event)
|
||||
self.callback(event)
|
||||
self._event_count += 1
|
||||
except Exception as e:
|
||||
self.log.debug("Unexpected issue in _run loop: %s" % str(e))
|
||||
|
||||
def run(self):
|
||||
while not self._stopped:
|
||||
if not self._pause:
|
||||
self._run()
|
||||
# Polling wait delay
|
||||
else:
|
||||
self.log.debug("Watcher is on pause")
|
||||
time.sleep(self.poll_delay)
|
||||
|
||||
def stop(self):
|
||||
self._stopped = True
|
Loading…
Reference in New Issue