Make ConnectionRegistry mandatory for Scheduler

So far the connection registry was added after the Scheduler was
instantiated.

We can make the ConnectionRegistry mandatory to simplify the
Scheduler instantiation.

Change-Id: Iff7b1a597c97f2cd13bea75f9f23585b0e7f76b3
This commit is contained in:
Jan Kubovy 2021-02-08 15:29:43 +01:00 committed by James E. Blair
parent 2dfb34a818
commit 5d1aeeffb5
5 changed files with 45 additions and 46 deletions

View File

@ -32,7 +32,6 @@ import queue
import random import random
import re import re
from collections import defaultdict, namedtuple from collections import defaultdict, namedtuple
from logging import Logger
from queue import Queue from queue import Queue
from typing import Callable, Optional, Any, Iterable, Generator, List, Dict from typing import Callable, Optional, Any, Iterable, Generator, List, Dict
@ -3996,19 +3995,33 @@ class SymLink(object):
class SchedulerTestApp: class SchedulerTestApp:
def __init__(self, log: Logger, config: ConfigParser, def __init__(self, log, config, changes, additional_event_queues,
changes: Dict[str, Dict[str, Change]], upstream_root, rpcclient, poller_events,
additional_event_queues, upstream_root: str, git_url_with_auth, source_only, fake_sql,
rpcclient: RPCClient, poller_events, git_url_with_auth: bool, add_cleanup, validate_tenants):
source_only: bool,
fake_sql: bool,
add_cleanup: Callable[[Callable[[], None]], None]):
self.log = log self.log = log
self.config = config self.config = config
self.changes = changes self.changes = changes
self.validate_tenants = validate_tenants
# Register connections from the config using fakes
self.connections = TestConnectionRegistry(
self.changes,
self.config,
additional_event_queues,
upstream_root,
rpcclient,
poller_events,
git_url_with_auth,
add_cleanup,
fake_sql,
)
self.connections.configure(self.config, source_only=source_only)
self.sched = zuul.scheduler.Scheduler(self.config, self.connections)
if validate_tenants is None:
self.connections.registerScheduler(self.sched)
self.sched = zuul.scheduler.Scheduler(self.config)
self.sched.setZuulApp(self) self.sched.setZuulApp(self)
self.sched._stats_interval = 1 self.sched._stats_interval = 1
@ -4018,15 +4031,6 @@ class SchedulerTestApp:
self.sched.management_event_queue self.sched.management_event_queue
] ]
# Register connections from the config using fakes
self.connections = TestConnectionRegistry(
self.changes, self.config, additional_event_queues,
upstream_root, rpcclient, poller_events,
git_url_with_auth, add_cleanup, fake_sql)
self.connections.configure(self.config, source_only=source_only)
self.sched.registerConnections(self.connections)
executor_client = zuul.executor.client.ExecutorClient( executor_client = zuul.executor.client.ExecutorClient(
self.config, self.sched) self.config, self.sched)
merge_client = RecordingMergeClient(self.config, self.sched) merge_client = RecordingMergeClient(self.config, self.sched)
@ -4063,20 +4067,20 @@ class SchedulerTestApp:
class SchedulerTestManager: class SchedulerTestManager:
def __init__(self): def __init__(self, validate_tenants):
self.instances = [] self.instances = []
self.validate_tenants = validate_tenants
def create(self, log: Logger, config: ConfigParser, def create(self, log, config, changes, additional_event_queues,
changes: Dict[str, Dict[str, Change]], additional_event_queues, upstream_root, rpcclient, poller_events,
upstream_root: str, rpcclient: RPCClient, poller_events, git_url_with_auth, source_only, fake_sql, add_cleanup,
git_url_with_auth: bool, source_only: bool, validate_tenants):
fake_sql: bool,
add_cleanup: Callable[[Callable[[], None]], None])\
-> SchedulerTestApp:
app = SchedulerTestApp(log, config, changes, app = SchedulerTestApp(log, config, changes,
additional_event_queues, upstream_root, additional_event_queues, upstream_root,
rpcclient, poller_events, git_url_with_auth, rpcclient, poller_events,
source_only, fake_sql, add_cleanup) git_url_with_auth, source_only,
fake_sql, add_cleanup,
validate_tenants)
self.instances.append(app) self.instances.append(app)
return app return app
@ -4344,12 +4348,12 @@ class ZuulTestCase(BaseTestCase):
self.history = self.executor_server.build_history self.history = self.executor_server.build_history
self.builds = self.executor_server.running_builds self.builds = self.executor_server.running_builds
self.scheds = SchedulerTestManager() self.scheds = SchedulerTestManager(self.validate_tenants)
self.scheds.create( self.scheds.create(
self.log, self.config, self.changes, self.log, self.config, self.changes,
self.additional_event_queues, self.upstream_root, self.rpcclient, self.additional_event_queues, self.upstream_root, self.rpcclient,
self.poller_events, self.git_url_with_auth, self.source_only, self.poller_events, self.git_url_with_auth, self.source_only,
self.fake_sql, self.addCleanup) self.fake_sql, self.addCleanup, self.validate_tenants)
if hasattr(self, 'fake_github'): if hasattr(self, 'fake_github'):
self.additional_event_queues.append( self.additional_event_queues.append(

View File

@ -726,9 +726,9 @@ class Client(zuul.cmd.ZuulApp):
def validate(self): def validate(self):
from zuul import scheduler from zuul import scheduler
from zuul import configloader from zuul import configloader
sched = scheduler.Scheduler(self.config, testonly=True)
self.configure_connections(source_only=True) self.configure_connections(source_only=True)
sched.registerConnections(self.connections, load=False) sched = scheduler.Scheduler(self.config, self.connections,
testonly=True)
loader = configloader.ConfigLoader( loader = configloader.ConfigLoader(
sched.connections, sched, None, None) sched.connections, sched, None, None)
tenant_config, script = sched._checkTenantSourceConf(self.config) tenant_config, script = sched._checkTenantSourceConf(self.config)

View File

@ -138,12 +138,13 @@ class Scheduler(zuul.cmd.ZuulDaemonApp):
self.setup_logging('scheduler', 'log_config') self.setup_logging('scheduler', 'log_config')
self.log = logging.getLogger("zuul.Scheduler") self.log = logging.getLogger("zuul.Scheduler")
self.sched = zuul.scheduler.Scheduler(self.config) self.configure_connections(require_sql=True)
self.sched = zuul.scheduler.Scheduler(self.config, self.connections)
if self.args.validate_tenants is None:
self.connections.registerScheduler(self.sched)
self.sched.setZuulApp(self) self.sched.setZuulApp(self)
merger = zuul.merger.client.MergeClient(self.config, self.sched) merger = zuul.merger.client.MergeClient(self.config, self.sched)
self.configure_connections(require_sql=True)
self.sched.setMerger(merger) self.sched.setMerger(merger)
if self.args.validate_tenants is None: if self.args.validate_tenants is None:
@ -159,7 +160,6 @@ class Scheduler(zuul.cmd.ZuulDaemonApp):
self.log.info('Starting scheduler') self.log.info('Starting scheduler')
try: try:
self.sched.start() self.sched.start()
self.sched.registerConnections(self.connections)
self.sched.reconfigure(self.config, self.sched.reconfigure(self.config,
validate_tenants=self.args.validate_tenants) validate_tenants=self.args.validate_tenants)
self.sched.wakeUp() self.sched.wakeUp()

View File

@ -48,6 +48,7 @@ class BaseConnection(object, metaclass=abc.ABCMeta):
self.driver = driver self.driver = driver
self.connection_name = connection_name self.connection_name = connection_name
self.connection_config = connection_config self.connection_config = connection_config
self.sched = None
def logEvent(self, event): def logEvent(self, event):
log = get_annotated_logger(self.log, event.zuul_event_id) log = get_annotated_logger(self.log, event.zuul_event_id)
@ -73,7 +74,7 @@ class BaseConnection(object, metaclass=abc.ABCMeta):
def onStop(self): def onStop(self):
pass pass
def registerScheduler(self, sched): def registerScheduler(self, sched) -> None:
self.sched = sched self.sched = sched
def clearCache(self): def clearCache(self):

View File

@ -291,7 +291,7 @@ class Scheduler(threading.Thread):
# Number of seconds past node expiration a hold request will remain # Number of seconds past node expiration a hold request will remain
EXPIRED_HOLD_REQUEST_TTL = 24 * 60 * 60 EXPIRED_HOLD_REQUEST_TTL = 24 * 60 * 60
def __init__(self, config, testonly=False): def __init__(self, config, connections, testonly=False):
threading.Thread.__init__(self) threading.Thread.__init__(self)
self.daemon = True self.daemon = True
self.hostname = socket.getfqdn() self.hostname = socket.getfqdn()
@ -310,7 +310,7 @@ class Scheduler(threading.Thread):
self._zuul_app = None self._zuul_app = None
self.executor = None self.executor = None
self.merger = None self.merger = None
self.connections = None self.connections = connections
self.statsd = get_statsd(config) self.statsd = get_statsd(config)
self.rpc = rpclistener.RPCListener(config, self) self.rpc = rpclistener.RPCListener(config, self)
self.rpc_slow = rpclistener.RPCListenerSlow(config, self) self.rpc_slow = rpclistener.RPCListenerSlow(config, self)
@ -406,12 +406,6 @@ class Scheduler(threading.Thread):
except Exception: except Exception:
self.log.exception("Exception while processing command") self.log.exception("Exception while processing command")
def registerConnections(self, connections, load=True):
# load: whether or not to trigger the onLoad for the connection. This
# is useful for not doing a full load during layout validation.
self.connections = connections
self.connections.registerScheduler(self, load)
def stopConnections(self): def stopConnections(self):
self.connections.stop() self.connections.stop()