@ -14,6 +14,7 @@
# under the License.
import configparser
from collections import OrderedDict
from configparser import ConfigParser
from contextlib import contextmanager
import copy
@ -31,7 +32,7 @@ import random
import re
from logging import Logger
from queue import Queue
from typing import Callable , Optional , Any , Iterable , Generator , List
from typing import Callable , Optional , Any , Iterable , Generator , List , Dict
import requests
import select
@ -64,6 +65,21 @@ from git.exc import NoSuchPathError
import yaml
import paramiko
from zuul.model import Change
from zuul.rpcclient import RPCClient
from zuul.driver.zuul import ZuulDriver
from zuul.driver.git import GitDriver
from zuul.driver.smtp import SMTPDriver
from zuul.driver.github import GithubDriver
from zuul.driver.timer import TimerDriver
from zuul.driver.sql import SQLDriver
from zuul.driver.bubblewrap import BubblewrapDriver
from zuul.driver.nullwrap import NullwrapDriver
from zuul.driver.mqtt import MQTTDriver
from zuul.driver.pagure import PagureDriver
from zuul.driver.gitlab import GitlabDriver
from zuul.driver.gerrit import GerritDriver
from zuul.driver.github.githubconnection import GithubClientManager
from zuul.lib.connections import ConnectionRegistry
from psutil import Popen
@ -165,6 +181,162 @@ def never_capture():
return decorator
class GerritDriverMock ( GerritDriver ) :
def __init__ ( self , registry , changes : Dict [ str , Dict [ str , Change ] ] ,
upstream_root : str , additional_event_queues , poller_events ,
add_cleanup : Callable [ [ Callable [ [ ] , None ] ] , None ] ) :
super ( GerritDriverMock , self ) . __init__ ( )
self . registry = registry
self . changes = changes
self . upstream_root = upstream_root
self . additional_event_queues = additional_event_queues
self . poller_events = poller_events
self . add_cleanup = add_cleanup
def getConnection ( self , name , config ) :
db = self . changes . setdefault ( config [ ' server ' ] , { } )
poll_event = self . poller_events . setdefault ( name , threading . Event ( ) )
ref_event = self . poller_events . setdefault ( name + ' -ref ' ,
threading . Event ( ) )
connection = FakeGerritConnection (
self , name , config ,
changes_db = db ,
upstream_root = self . upstream_root ,
poller_event = poll_event ,
ref_watcher_event = ref_event )
if connection . web_server :
self . add_cleanup ( connection . web_server . stop )
self . additional_event_queues . append ( connection . event_queue )
setattr ( self . registry , ' fake_ ' + name , connection )
return connection
class GithubDriverMock ( GithubDriver ) :
def __init__ ( self , registry , changes : Dict [ str , Dict [ str , Change ] ] ,
config : ConfigParser , upstream_root : str ,
additional_event_queues , rpcclient : RPCClient ,
git_url_with_auth : bool ) :
super ( GithubDriverMock , self ) . __init__ ( )
self . registry = registry
self . changes = changes
self . config = config
self . upstream_root = upstream_root
self . additional_event_queues = additional_event_queues
self . rpcclient = rpcclient
self . git_url_with_auth = git_url_with_auth
def registerGithubProjects ( self , connection ) :
path = self . config . get ( ' scheduler ' , ' tenant_config ' )
with open ( os . path . join ( FIXTURE_DIR , path ) ) as f :
tenant_config = yaml . safe_load ( f . read ( ) )
for tenant in tenant_config :
sources = tenant [ ' tenant ' ] [ ' source ' ]
conf = sources . get ( connection . source . name )
if not conf :
return
projects = conf . get ( ' config-projects ' , [ ] )
projects . extend ( conf . get ( ' untrusted-projects ' , [ ] ) )
client = connection . getGithubClient ( None )
for project in projects :
if isinstance ( project , dict ) :
# This can be a dict with the project as the only key
client . addProjectByName (
list ( project . keys ( ) ) [ 0 ] )
else :
client . addProjectByName ( project )
def getConnection ( self , name , config ) :
server = config . get ( ' server ' , ' github.com ' )
db = self . changes . setdefault ( server , { } )
connection = FakeGithubConnection (
self , name , config , self . rpcclient ,
changes_db = db ,
upstream_root = self . upstream_root ,
git_url_with_auth = self . git_url_with_auth )
self . additional_event_queues . append ( connection . event_queue )
setattr ( self . registry , ' fake_ ' + name , connection )
self . registerGithubProjects ( connection )
return connection
class PagureDriverMock ( PagureDriver ) :
def __init__ ( self , registry , changes : Dict [ str , Dict [ str , Change ] ] ,
upstream_root : str , additional_event_queues ,
rpcclient : RPCClient ) :
super ( PagureDriverMock , self ) . __init__ ( )
self . registry = registry
self . changes = changes
self . upstream_root = upstream_root
self . additional_event_queues = additional_event_queues
self . rpcclient = rpcclient
def getConnection ( self , name , config ) :
server = config . get ( ' server ' , ' pagure.io ' )
db = self . changes . setdefault ( server , { } )
connection = FakePagureConnection (
self , name , config , self . rpcclient ,
changes_db = db ,
upstream_root = self . upstream_root )
self . additional_event_queues . append ( connection . event_queue )
setattr ( self . registry , ' fake_ ' + name , connection )
return connection
class GitlabDriverMock ( GitlabDriver ) :
def __init__ ( self , registry , changes : Dict [ str , Dict [ str , Change ] ] ,
upstream_root : str , additional_event_queues ,
rpcclient : RPCClient ) :
super ( GitlabDriverMock , self ) . __init__ ( )
self . registry = registry
self . changes = changes
self . upstream_root = upstream_root
self . additional_event_queues = additional_event_queues
self . rpcclient = rpcclient
def getConnection ( self , name , config ) :
server = config . get ( ' server ' , ' gitlab.com ' )
db = self . changes . setdefault ( server , { } )
connection = FakeGitlabConnection (
self , name , config , self . rpcclient ,
changes_db = db ,
upstream_root = self . upstream_root )
self . additional_event_queues . append ( connection . event_queue )
setattr ( self . registry , ' fake_ ' + name , connection )
return connection
class TestConnectionRegistry ( ConnectionRegistry ) :
def __init__ ( self , changes : Dict [ str , Dict [ str , Change ] ] ,
config : ConfigParser , additional_event_queues ,
upstream_root : str , rpcclient : RPCClient , poller_events ,
git_url_with_auth : bool ,
add_cleanup : Callable [ [ Callable [ [ ] , None ] ] , None ] ) :
self . connections = OrderedDict ( )
self . drivers = { }
self . registerDriver ( ZuulDriver ( ) )
self . registerDriver ( GerritDriverMock (
self , changes , upstream_root , additional_event_queues ,
poller_events , add_cleanup ) )
self . registerDriver ( GitDriver ( ) )
self . registerDriver ( GithubDriverMock (
self , changes , config , upstream_root , additional_event_queues ,
rpcclient , git_url_with_auth ) )
self . registerDriver ( SMTPDriver ( ) )
self . registerDriver ( TimerDriver ( ) )
self . registerDriver ( SQLDriver ( ) )
self . registerDriver ( BubblewrapDriver ( ) )
self . registerDriver ( NullwrapDriver ( ) )
self . registerDriver ( MQTTDriver ( ) )
self . registerDriver ( PagureDriverMock (
self , changes , upstream_root , additional_event_queues , rpcclient ) )
self . registerDriver ( GitlabDriverMock (
self , changes , upstream_root , additional_event_queues , rpcclient ) )
class FakeAnsibleManager ( zuul . lib . ansible . AnsibleManager ) :
def validate ( self ) :
@ -3384,17 +3556,23 @@ class WebProxyFixture(fixtures.Fixture):
class ZuulWebFixture ( fixtures . Fixture ) :
def __init__ ( self , gearman_server_port , config , test_root , info = None ,
zk_hosts = None ) :
def __init__ ( self , gearman_server_port ,
changes : Dict [ str , Dict [ str , Change ] ] , config : ConfigParser ,
additional_event_queues , upstream_root : str ,
rpcclient : RPCClient , poller_events , git_url_with_auth : bool ,
add_cleanup : Callable [ [ Callable [ [ ] , None ] ] , None ] ,
test_root , info = None , zk_hosts = None ) :
super ( ZuulWebFixture , self ) . __init__ ( )
self . gearman_server_port = gearman_server_port
self . connections = zuul . lib . connections . ConnectionRegistry ( )
self . connections = TestConnectionRegistry (
changes , config , additional_event_queues , upstream_root , rpcclient ,
poller_events , git_url_with_auth , add_cleanup )
self . connections . configure (
config ,
include_drivers = [ zuul . driver . sql . SQLDriver ,
zuul . driver . github . GithubDriver ,
zuul . driver . gitlab . GitlabDriver ,
zuul . driver . pagure . PagureDriver ] )
GithubDriverMock ,
GitlabDriverMock ,
PagureDriverMock ] )
self . authenticators = zuul . lib . auth . AuthenticatorRegistry ( )
self . authenticators . configure ( config )
if info is None :
@ -3639,10 +3817,16 @@ class SymLink(object):
class SchedulerTestApp :
def __init__ ( self , log : Logger , config : ConfigParser , zk_config : str ,
connections : ConnectionRegistry ) :
changes : Dict [ str , Dict [ str , Change ] ] ,
additional_event_queues , upstream_root : str ,
rpcclient : RPCClient , poller_events , git_url_with_auth : bool ,
source_only : bool ,
add_cleanup : Callable [ [ Callable [ [ ] , None ] ] , None ] ) :
self . log = log
self . config = config
self . zk_config = zk_config
self . changes = changes
self . sched = zuul . scheduler . Scheduler ( self . config )
self . sched . setZuulApp ( self )
@ -3654,7 +3838,14 @@ class SchedulerTestApp:
self . sched . management_event_queue
]
self . sched . registerConnections ( connections )
# Register connections from the config using fakes
self . connections = TestConnectionRegistry (
self . changes , self . config , additional_event_queues ,
upstream_root , rpcclient , poller_events ,
git_url_with_auth , add_cleanup )
self . connections . configure ( self . config , source_only = source_only )
self . sched . registerConnections ( self . connections )
executor_client = zuul . executor . client . ExecutorClient (
self . config , self . sched )
@ -3697,8 +3888,15 @@ class SchedulerTestManager:
self . instances = [ ]
def create ( self , log : Logger , config : ConfigParser , zk_config : str ,
connections : ConnectionRegistry ) - > SchedulerTestApp :
app = SchedulerTestApp ( log , config , zk_config , connections )
changes : Dict [ str , Dict [ str , Change ] ] , additional_event_queues ,
upstream_root : str , rpcclient : RPCClient , poller_events ,
git_url_with_auth : bool , source_only : bool ,
add_cleanup : Callable [ [ Callable [ [ ] , None ] ] , None ] ) \
- > SchedulerTestApp :
app = SchedulerTestApp ( log , config , zk_config , changes ,
additional_event_queues , upstream_root ,
rpcclient , poller_events , git_url_with_auth ,
source_only , add_cleanup )
self . instances . append ( app )
return app
@ -3799,20 +3997,31 @@ class ZuulTestCase(BaseTestCase):
"""
config_file = ' zuul.conf '
run_ansible = False
create_project_keys = False
use_ssl = False
git_url_with_auth = False
log_console_port = 19885
def __init__ ( self , * args , * * kwargs ) :
super ( ) . __init__ ( * args , * * kwargs )
self . fake_gerrit = None
config_file : str = ' zuul.conf '
run_ansible : bool = False
create_project_keys : bool = False
use_ssl : bool = False
git_url_with_auth : bool = False
log_console_port : int = 19885
source_only : bool = False
def __getattr__ ( self , name ) :
""" Allows to access fake connections the old way, e.g., using
`fake_gerrit` for FakeGerritConnection .
This will access the connection of the first ( default ) scheduler
( `self.scheds.first` ) . To access connections of a different
scheduler use `self.scheds[{X}].connections.fake_{NAME}` .
"""
if name . startswith ( ' fake_ ' ) and \
hasattr ( self . scheds . first . connections , name ) :
return getattr ( self . scheds . first . connections , name )
raise AttributeError ( " ' ZuulTestCase ' object has no attribute ' %s ' "
% name )
def _startMerger ( self ) :
self . merge_server = zuul . merger . server . MergeServer ( self . config ,
self . connections )
self . merge_server = zuul . merger . server . MergeServer (
self . config , self . scheds . first . connections )
self . merge_server . start ( )
def setUp ( self ) :
@ -3915,12 +4124,21 @@ class ZuulTestCase(BaseTestCase):
gerritsource . GerritSource . replication_retry_interval = 0.5
gerritconnection . GerritEventConnector . delay = 0.0
self . changes : Dict [ str , Dict [ str , Change ] ] = { }
self . additional_event_queues = [ ]
self . poller_events = { }
self . configure_connections ( )
self . _configureSmtp ( )
self . _configureMqtt ( )
executor_connections = TestConnectionRegistry (
self . changes , self . config , self . additional_event_queues ,
self . upstream_root , self . rpcclient , self . poller_events ,
self . git_url_with_auth , self . addCleanup )
executor_connections . configure ( self . config ,
source_only = self . source_only )
self . executor_server = RecordingExecutorServer (
self . config , self . connections ,
self . config , executor_ connections,
jobdir_root = self . jobdir_root ,
_run_ansible = self . run_ansible ,
_test_root = self . test_root ,
@ -3932,7 +4150,10 @@ class ZuulTestCase(BaseTestCase):
self . scheds = SchedulerTestManager ( )
self . scheds . create (
self . log , self . config , self . zk_config , self . connections )
self . log , self . config , self . zk_config , self . changes ,
self . additional_event_queues , self . upstream_root , self . rpcclient ,
self . poller_events , self . git_url_with_auth , self . source_only ,
self . addCleanup )
if hasattr ( self , ' fake_github ' ) :
self . additional_event_queues . append (
@ -3951,108 +4172,7 @@ class ZuulTestCase(BaseTestCase):
return [ item for sublist in sched_queues for item in sublist ] + \
self . additional_event_queues
def configure_connections ( self , source_only = False ) :
# Set up gerrit related fakes
# Set a changes database so multiple FakeGerrit's can report back to
# a virtual canonical database given by the configured hostname
self . gerrit_changes_dbs = { }
self . github_changes_dbs = { }
self . pagure_changes_dbs = { }
self . gitlab_changes_dbs = { }
def getGerritConnection ( driver , name , config ) :
db = self . gerrit_changes_dbs . setdefault ( config [ ' server ' ] , { } )
poll_event = self . poller_events . setdefault ( name , threading . Event ( ) )
ref_event = self . poller_events . setdefault ( name + ' -ref ' ,
threading . Event ( ) )
con = FakeGerritConnection ( driver , name , config ,
changes_db = db ,
upstream_root = self . upstream_root ,
poller_event = poll_event ,
ref_watcher_event = ref_event )
if con . web_server :
self . addCleanup ( con . web_server . stop )
self . additional_event_queues . append ( con . event_queue )
setattr ( self , ' fake_ ' + name , con )
return con
self . useFixture ( fixtures . MonkeyPatch (
' zuul.driver.gerrit.GerritDriver.getConnection ' ,
getGerritConnection ) )
def registerGithubProjects ( con ) :
path = self . config . get ( ' scheduler ' , ' tenant_config ' )
with open ( os . path . join ( FIXTURE_DIR , path ) ) as f :
tenant_config = yaml . safe_load ( f . read ( ) )
for tenant in tenant_config :
sources = tenant [ ' tenant ' ] [ ' source ' ]
conf = sources . get ( con . source . name )
if not conf :
return
projects = conf . get ( ' config-projects ' , [ ] )
projects . extend ( conf . get ( ' untrusted-projects ' , [ ] ) )
client = con . getGithubClient ( None )
for project in projects :
if isinstance ( project , dict ) :
# This can be a dict with the project as the only key
client . addProjectByName (
list ( project . keys ( ) ) [ 0 ] )
else :
client . addProjectByName ( project )
def getGithubConnection ( driver , name , config ) :
server = config . get ( ' server ' , ' github.com ' )
db = self . github_changes_dbs . setdefault ( server , { } )
con = FakeGithubConnection (
driver , name , config ,
self . rpcclient ,
changes_db = db ,
upstream_root = self . upstream_root ,
git_url_with_auth = self . git_url_with_auth )
self . additional_event_queues . append ( con . event_queue )
setattr ( self , ' fake_ ' + name , con )
registerGithubProjects ( con )
return con
self . useFixture ( fixtures . MonkeyPatch (
' zuul.driver.github.GithubDriver.getConnection ' ,
getGithubConnection ) )
def getPagureConnection ( driver , name , config ) :
server = config . get ( ' server ' , ' pagure.io ' )
db = self . pagure_changes_dbs . setdefault ( server , { } )
con = FakePagureConnection (
driver , name , config ,
self . rpcclient ,
changes_db = db ,
upstream_root = self . upstream_root )
self . additional_event_queues . append ( con . event_queue )
setattr ( self , ' fake_ ' + name , con )
return con
self . useFixture ( fixtures . MonkeyPatch (
' zuul.driver.pagure.PagureDriver.getConnection ' ,
getPagureConnection ) )
def getGitlabConnection ( driver , name , config ) :
server = config . get ( ' server ' , ' gitlab.com ' )
db = self . gitlab_changes_dbs . setdefault ( server , { } )
con = FakeGitlabConnection (
driver , name , config ,
self . rpcclient ,
changes_db = db ,
upstream_root = self . upstream_root )
self . additional_event_queues . append ( con . event_queue )
setattr ( self , ' fake_ ' + name , con )
return con
self . useFixture ( fixtures . MonkeyPatch (
' zuul.driver.gitlab.GitlabDriver.getConnection ' ,
getGitlabConnection ) )
def _configureSmtp ( self ) :
# Set up smtp related fakes
# TODO(jhesketh): This should come from lib.connections for better
# coverage
@ -4065,6 +4185,7 @@ class ZuulTestCase(BaseTestCase):
self . useFixture ( fixtures . MonkeyPatch ( ' smtplib.SMTP ' , FakeSMTPFactory ) )
def _configureMqtt ( self ) :
# Set up mqtt related fakes
self . mqtt_messages = [ ]
@ -4076,10 +4197,6 @@ class ZuulTestCase(BaseTestCase):
' zuul.driver.mqtt.mqttconnection.MQTTConnection.publish ' ,
fakeMQTTPublish ) )
# Register connections from the config using fakes
self . connections = zuul . lib . connections . ConnectionRegistry ( )
self . connections . configure ( self . config , source_only = source_only )
def setup_config ( self , config_file : str ) :
# This creates the per-test configuration object. It can be
# overridden by subclasses, but should not need to be since it
@ -4343,6 +4460,7 @@ class ZuulTestCase(BaseTestCase):
if self . merge_server :
self . merge_server . stop ( )
self . merge_server . join ( )
self . executor_server . stop ( )
self . executor_server . join ( )
self . scheds . execute ( lambda app : app . sched . stop ( ) )
@ -4373,9 +4491,12 @@ class ZuulTestCase(BaseTestCase):
and not t . name . startswith ( ' ptvsd. ' )
]
if len ( threads ) > 1 :
thread_map = dict ( map ( lambda x : ( x . ident , x . name ) ,
threading . enumerate ( ) ) )
log_str = " "
for thread_id , stack_frame in sys . _current_frames ( ) . items ( ) :
log_str + = " Thread: %s \n " % thread_id
log_str + = " Thread id: %s , name: %s \n " % (
thread_id , thread_map . get ( thread_id , ' UNKNOWN ' ) )
log_str + = " " . join ( traceback . format_stack ( stack_frame ) )
self . log . debug ( log_str )
raise Exception ( " More than one thread is running: %s " % threads )
@ -4580,8 +4701,7 @@ class ZuulTestCase(BaseTestCase):
return False
return True
def __eventQueuesEmpty ( self , matcher ) \
- > Generator [ bool , None , None ] :
def __eventQueuesEmpty ( self , matcher ) - > Generator [ bool , None , None ] :
for event_queue in self . __event_queues ( matcher ) :
yield event_queue . empty ( )
@ -4606,10 +4726,14 @@ class ZuulTestCase(BaseTestCase):
( event_queue , event_queue . empty ( ) ) )
self . log . error ( " All builds waiting: %s " %
( self . __areAllBuildsWaiting ( matcher ) , ) )
self . log . error ( " All merge jobs waiting: %s " %
( self . __areAllMergeJobsWaiting ( matcher ) , ) )
self . log . error ( " All builds reported: %s " %
( self . __haveAllBuildsReported ( matcher ) , ) )
self . log . error ( " All requests completed: %s " %
( self . __areAllNodeRequestsComplete ( matcher ) , ) )
self . log . error ( " All event queues empty: %s " %
( all ( self . __eventQueuesEmpty ( matcher ) ) , ) )
for app in self . scheds . filter ( matcher ) :
self . log . error ( " [Sched: %s ] Merge client jobs: %s " %
( app . sched , app . sched . merger . jobs , ) )
@ -5048,8 +5172,8 @@ class ZuulTestCase(BaseTestCase):
: arg str event : The JSON - encoded event .
"""
specified_conn = self . connections . connections [ connection ]
for conn in self . connections . connections . values ( ) :
specified_conn = self . scheds . first . connections . connections [ connection ]
for conn in self . scheds . first . connections . connections . values ( ) :
if ( isinstance ( conn , specified_conn . __class__ ) and
specified_conn . server == conn . server ) :
conn . addEvent ( event )