Instantiate executor client, merger, nodepool and app within Scheduler
Executor client, merger, nodepool and app were instantiated outside the scheduler and then set using "setX" methods. Those three components are considered as mandatory and should therefore be part of all scheduler instances. This was useful for layout validation where the scheduler was not run but just instantiated. Since the layout validation does not need to instantiate a scheduler anymore, this can be simplified by instantiating these components within the scheduler's constructor. Change-Id: Ide96a85d17820e3950704577ca6fd0d082e26182
This commit is contained in:
committed by
James E. Blair
parent
5d1aeeffb5
commit
e7e1fa2660
+7
-13
@@ -3228,6 +3228,10 @@ class RecordingExecutorServer(zuul.executor.server.ExecutorServer):
|
||||
super(RecordingExecutorServer, self).stop()
|
||||
|
||||
|
||||
class TestScheduler(zuul.scheduler.Scheduler):
|
||||
_merger_client_class = RecordingMergeClient
|
||||
|
||||
|
||||
class FakeGearmanServer(gear.Server):
|
||||
"""A Gearman server for use in tests.
|
||||
|
||||
@@ -4018,28 +4022,18 @@ class SchedulerTestApp:
|
||||
)
|
||||
self.connections.configure(self.config, source_only=source_only)
|
||||
|
||||
self.sched = zuul.scheduler.Scheduler(self.config, self.connections)
|
||||
self.sched = TestScheduler(self.config, self.connections, self)
|
||||
self.sched._stats_interval = 1
|
||||
|
||||
if validate_tenants is None:
|
||||
self.connections.registerScheduler(self.sched)
|
||||
|
||||
self.sched.setZuulApp(self)
|
||||
self.sched._stats_interval = 1
|
||||
|
||||
self.event_queues = [
|
||||
self.sched.result_event_queue,
|
||||
self.sched.trigger_event_queue,
|
||||
self.sched.management_event_queue
|
||||
]
|
||||
|
||||
executor_client = zuul.executor.client.ExecutorClient(
|
||||
self.config, self.sched)
|
||||
merge_client = RecordingMergeClient(self.config, self.sched)
|
||||
nodepool = zuul.nodepool.Nodepool(self.sched)
|
||||
|
||||
self.sched.setExecutor(executor_client)
|
||||
self.sched.setMerger(merge_client)
|
||||
self.sched.setNodepool(nodepool)
|
||||
|
||||
def start(self, validate_tenants: list):
|
||||
self.sched.start()
|
||||
self.sched.executor.gearman.waitForServer()
|
||||
|
||||
+1
-1
@@ -728,7 +728,7 @@ class Client(zuul.cmd.ZuulApp):
|
||||
from zuul import configloader
|
||||
self.configure_connections(source_only=True)
|
||||
sched = scheduler.Scheduler(self.config, self.connections,
|
||||
testonly=True)
|
||||
self, testonly=True)
|
||||
loader = configloader.ConfigLoader(
|
||||
sched.connections, sched, None, None)
|
||||
tenant_config, script = sched._checkTenantSourceConf(self.config)
|
||||
|
||||
+2
-18
@@ -19,11 +19,8 @@ import sys
|
||||
import signal
|
||||
|
||||
import zuul.cmd
|
||||
import zuul.executor.client
|
||||
from zuul.lib.config import get_default
|
||||
from zuul.lib.statsd import get_statsd_config
|
||||
import zuul.merger.client
|
||||
import zuul.nodepool
|
||||
import zuul.scheduler
|
||||
|
||||
|
||||
@@ -139,24 +136,11 @@ class Scheduler(zuul.cmd.ZuulDaemonApp):
|
||||
self.log = logging.getLogger("zuul.Scheduler")
|
||||
|
||||
self.configure_connections(require_sql=True)
|
||||
self.sched = zuul.scheduler.Scheduler(self.config, self.connections)
|
||||
self.sched = zuul.scheduler.Scheduler(self.config,
|
||||
self.connections, self)
|
||||
if self.args.validate_tenants is None:
|
||||
self.connections.registerScheduler(self.sched)
|
||||
|
||||
self.sched.setZuulApp(self)
|
||||
merger = zuul.merger.client.MergeClient(self.config, self.sched)
|
||||
self.sched.setMerger(merger)
|
||||
|
||||
if self.args.validate_tenants is None:
|
||||
|
||||
# Only needed in full mode
|
||||
gearman = zuul.executor.client.ExecutorClient(self.config,
|
||||
self.sched)
|
||||
nodepool = zuul.nodepool.Nodepool(self.sched)
|
||||
|
||||
self.sched.setExecutor(gearman)
|
||||
self.sched.setNodepool(nodepool)
|
||||
|
||||
self.log.info('Starting scheduler')
|
||||
try:
|
||||
self.sched.start()
|
||||
|
||||
+12
-20
@@ -41,6 +41,9 @@ from zuul.lib.logutil import get_annotated_logger
|
||||
from zuul.lib.statsd import get_statsd, normalize_statsd_name
|
||||
import zuul.lib.queue
|
||||
import zuul.lib.repl
|
||||
from zuul import nodepool
|
||||
from zuul.executor.client import ExecutorClient
|
||||
from zuul.merger.client import MergeClient
|
||||
from zuul.model import Build, HoldRequest, Tenant, TriggerEvent
|
||||
from zuul.zk import ZooKeeperClient
|
||||
from zuul.zk.nodepool import ZooKeeperNodepool
|
||||
@@ -287,11 +290,12 @@ class Scheduler(threading.Thread):
|
||||
|
||||
log = logging.getLogger("zuul.Scheduler")
|
||||
_stats_interval = 30
|
||||
_merger_client_class = MergeClient
|
||||
|
||||
# Number of seconds past node expiration a hold request will remain
|
||||
EXPIRED_HOLD_REQUEST_TTL = 24 * 60 * 60
|
||||
|
||||
def __init__(self, config, connections, testonly=False):
|
||||
def __init__(self, config, connections, app, testonly=False):
|
||||
threading.Thread.__init__(self)
|
||||
self.daemon = True
|
||||
self.hostname = socket.getfqdn()
|
||||
@@ -307,9 +311,8 @@ class Scheduler(threading.Thread):
|
||||
}
|
||||
self._hibernate = False
|
||||
self._stopped = False
|
||||
self._zuul_app = None
|
||||
self.executor = None
|
||||
self.merger = None
|
||||
|
||||
self._zuul_app = app
|
||||
self.connections = connections
|
||||
self.statsd = get_statsd(config)
|
||||
self.rpc = rpclistener.RPCListener(config, self)
|
||||
@@ -367,6 +370,11 @@ class Scheduler(threading.Thread):
|
||||
self.ansible_manager = AnsibleManager(
|
||||
default_version=default_ansible_version)
|
||||
|
||||
if not testonly:
|
||||
self.executor = ExecutorClient(self.config, self)
|
||||
self.merger = self._merger_client_class(self.config, self)
|
||||
self.nodepool = nodepool.Nodepool(self)
|
||||
|
||||
def start(self):
|
||||
super(Scheduler, self).start()
|
||||
self._command_running = True
|
||||
@@ -409,22 +417,6 @@ class Scheduler(threading.Thread):
|
||||
def stopConnections(self):
|
||||
self.connections.stop()
|
||||
|
||||
def setZuulApp(self, app):
|
||||
self._zuul_app = app
|
||||
|
||||
def setExecutor(self, executor):
|
||||
self.executor = executor
|
||||
|
||||
def setMerger(self, merger):
|
||||
self.merger = merger
|
||||
|
||||
def setNodepool(self, nodepool):
|
||||
self.nodepool = nodepool
|
||||
|
||||
def setZooKeeper(self, zk_client):
|
||||
self.zk_client = zk_client
|
||||
self.zk_nodepool = ZooKeeperNodepool(zk_client)
|
||||
|
||||
def runStats(self):
|
||||
while not self.stats_stop.wait(self._stats_interval):
|
||||
try:
|
||||
|
||||
Reference in New Issue
Block a user