diff --git a/tests/base.py b/tests/base.py index b6c8291509..e74980d109 100644 --- a/tests/base.py +++ b/tests/base.py @@ -3228,6 +3228,10 @@ class RecordingExecutorServer(zuul.executor.server.ExecutorServer): super(RecordingExecutorServer, self).stop() +class TestScheduler(zuul.scheduler.Scheduler): + _merger_client_class = RecordingMergeClient + + class FakeGearmanServer(gear.Server): """A Gearman server for use in tests. @@ -4020,28 +4024,18 @@ class SchedulerTestApp: ) self.connections.configure(self.config, source_only=source_only) - self.sched = zuul.scheduler.Scheduler(self.config, self.connections) + self.sched = TestScheduler(self.config, self.connections, self) + self.sched._stats_interval = 1 + if validate_tenants is None: self.connections.registerScheduler(self.sched) - self.sched.setZuulApp(self) - self.sched._stats_interval = 1 - self.event_queues = [ self.sched.result_event_queue, self.sched.trigger_event_queue, self.sched.management_event_queue ] - executor_client = zuul.executor.client.ExecutorClient( - self.config, self.sched) - merge_client = RecordingMergeClient(self.config, self.sched) - nodepool = zuul.nodepool.Nodepool(self.sched) - - self.sched.setExecutor(executor_client) - self.sched.setMerger(merge_client) - self.sched.setNodepool(nodepool) - def start(self, validate_tenants: list): self.sched.start() self.sched.executor.gearman.waitForServer() diff --git a/zuul/cmd/client.py b/zuul/cmd/client.py index 5c4c096056..0240f8b5ba 100755 --- a/zuul/cmd/client.py +++ b/zuul/cmd/client.py @@ -728,7 +728,7 @@ class Client(zuul.cmd.ZuulApp): from zuul import configloader self.configure_connections(source_only=True) sched = scheduler.Scheduler(self.config, self.connections, - testonly=True) + self, testonly=True) loader = configloader.ConfigLoader( sched.connections, sched, None, None) tenant_config, script = sched._checkTenantSourceConf(self.config) diff --git a/zuul/cmd/scheduler.py b/zuul/cmd/scheduler.py index 89435b39da..d1ec602147 100755 --- a/zuul/cmd/scheduler.py +++ b/zuul/cmd/scheduler.py @@ -19,11 +19,8 @@ import sys import signal import zuul.cmd -import zuul.executor.client from zuul.lib.config import get_default from zuul.lib.statsd import get_statsd_config -import zuul.merger.client -import zuul.nodepool import zuul.scheduler @@ -139,24 +136,11 @@ class Scheduler(zuul.cmd.ZuulDaemonApp): self.log = logging.getLogger("zuul.Scheduler") self.configure_connections(require_sql=True) - self.sched = zuul.scheduler.Scheduler(self.config, self.connections) + self.sched = zuul.scheduler.Scheduler(self.config, + self.connections, self) if self.args.validate_tenants is None: self.connections.registerScheduler(self.sched) - self.sched.setZuulApp(self) - merger = zuul.merger.client.MergeClient(self.config, self.sched) - self.sched.setMerger(merger) - - if self.args.validate_tenants is None: - - # Only needed in full mode - gearman = zuul.executor.client.ExecutorClient(self.config, - self.sched) - nodepool = zuul.nodepool.Nodepool(self.sched) - - self.sched.setExecutor(gearman) - self.sched.setNodepool(nodepool) - self.log.info('Starting scheduler') try: self.sched.start() diff --git a/zuul/scheduler.py b/zuul/scheduler.py index 1731106515..ba673ec3a6 100644 --- a/zuul/scheduler.py +++ b/zuul/scheduler.py @@ -41,6 +41,9 @@ from zuul.lib.logutil import get_annotated_logger from zuul.lib.statsd import get_statsd, normalize_statsd_name import zuul.lib.queue import zuul.lib.repl +from zuul import nodepool +from zuul.executor.client import ExecutorClient +from zuul.merger.client import MergeClient from zuul.model import Build, HoldRequest, Tenant, TriggerEvent from zuul.zk import ZooKeeperClient from zuul.zk.nodepool import ZooKeeperNodepool @@ -287,11 +290,12 @@ class Scheduler(threading.Thread): log = logging.getLogger("zuul.Scheduler") _stats_interval = 30 + _merger_client_class = MergeClient # Number of seconds past node expiration a hold request will remain EXPIRED_HOLD_REQUEST_TTL = 24 * 60 * 60 - def __init__(self, config, connections, testonly=False): + def __init__(self, config, connections, app, testonly=False): threading.Thread.__init__(self) self.daemon = True self.hostname = socket.getfqdn() @@ -307,9 +311,8 @@ class Scheduler(threading.Thread): } self._hibernate = False self._stopped = False - self._zuul_app = None - self.executor = None - self.merger = None + + self._zuul_app = app self.connections = connections self.statsd = get_statsd(config) self.rpc = rpclistener.RPCListener(config, self) @@ -367,6 +370,11 @@ class Scheduler(threading.Thread): self.ansible_manager = AnsibleManager( default_version=default_ansible_version) + if not testonly: + self.executor = ExecutorClient(self.config, self) + self.merger = self._merger_client_class(self.config, self) + self.nodepool = nodepool.Nodepool(self) + def start(self): super(Scheduler, self).start() self._command_running = True @@ -409,22 +417,6 @@ class Scheduler(threading.Thread): def stopConnections(self): self.connections.stop() - def setZuulApp(self, app): - self._zuul_app = app - - def setExecutor(self, executor): - self.executor = executor - - def setMerger(self, merger): - self.merger = merger - - def setNodepool(self, nodepool): - self.nodepool = nodepool - - def setZooKeeper(self, zk_client): - self.zk_client = zk_client - self.zk_nodepool = ZooKeeperNodepool(zk_client) - def runStats(self): while not self.stats_stop.wait(self._stats_interval): try: