Merge "Instantiate executor client, merger, nodepool and app within Scheduler"

This commit is contained in:
Zuul 2021-03-12 22:47:30 +00:00 committed by Gerrit Code Review
commit 67eb943bb4
4 changed files with 22 additions and 52 deletions

View File

@ -3228,6 +3228,10 @@ class RecordingExecutorServer(zuul.executor.server.ExecutorServer):
super(RecordingExecutorServer, self).stop() super(RecordingExecutorServer, self).stop()
class TestScheduler(zuul.scheduler.Scheduler):
_merger_client_class = RecordingMergeClient
class FakeGearmanServer(gear.Server): class FakeGearmanServer(gear.Server):
"""A Gearman server for use in tests. """A Gearman server for use in tests.
@ -4020,28 +4024,18 @@ class SchedulerTestApp:
) )
self.connections.configure(self.config, source_only=source_only) self.connections.configure(self.config, source_only=source_only)
self.sched = zuul.scheduler.Scheduler(self.config, self.connections) self.sched = TestScheduler(self.config, self.connections, self)
self.sched._stats_interval = 1
if validate_tenants is None: if validate_tenants is None:
self.connections.registerScheduler(self.sched) self.connections.registerScheduler(self.sched)
self.sched.setZuulApp(self)
self.sched._stats_interval = 1
self.event_queues = [ self.event_queues = [
self.sched.result_event_queue, self.sched.result_event_queue,
self.sched.trigger_event_queue, self.sched.trigger_event_queue,
self.sched.management_event_queue self.sched.management_event_queue
] ]
executor_client = zuul.executor.client.ExecutorClient(
self.config, self.sched)
merge_client = RecordingMergeClient(self.config, self.sched)
nodepool = zuul.nodepool.Nodepool(self.sched)
self.sched.setExecutor(executor_client)
self.sched.setMerger(merge_client)
self.sched.setNodepool(nodepool)
def start(self, validate_tenants: list): def start(self, validate_tenants: list):
self.sched.start() self.sched.start()
self.sched.executor.gearman.waitForServer() self.sched.executor.gearman.waitForServer()

View File

@ -728,7 +728,7 @@ class Client(zuul.cmd.ZuulApp):
from zuul import configloader from zuul import configloader
self.configure_connections(source_only=True) self.configure_connections(source_only=True)
sched = scheduler.Scheduler(self.config, self.connections, sched = scheduler.Scheduler(self.config, self.connections,
testonly=True) self, testonly=True)
loader = configloader.ConfigLoader( loader = configloader.ConfigLoader(
sched.connections, sched, None, None) sched.connections, sched, None, None)
tenant_config, script = sched._checkTenantSourceConf(self.config) tenant_config, script = sched._checkTenantSourceConf(self.config)

View File

@ -19,11 +19,8 @@ import sys
import signal import signal
import zuul.cmd import zuul.cmd
import zuul.executor.client
from zuul.lib.config import get_default from zuul.lib.config import get_default
from zuul.lib.statsd import get_statsd_config from zuul.lib.statsd import get_statsd_config
import zuul.merger.client
import zuul.nodepool
import zuul.scheduler import zuul.scheduler
@ -139,24 +136,11 @@ class Scheduler(zuul.cmd.ZuulDaemonApp):
self.log = logging.getLogger("zuul.Scheduler") self.log = logging.getLogger("zuul.Scheduler")
self.configure_connections(require_sql=True) self.configure_connections(require_sql=True)
self.sched = zuul.scheduler.Scheduler(self.config, self.connections) self.sched = zuul.scheduler.Scheduler(self.config,
self.connections, self)
if self.args.validate_tenants is None: if self.args.validate_tenants is None:
self.connections.registerScheduler(self.sched) self.connections.registerScheduler(self.sched)
self.sched.setZuulApp(self)
merger = zuul.merger.client.MergeClient(self.config, self.sched)
self.sched.setMerger(merger)
if self.args.validate_tenants is None:
# Only needed in full mode
gearman = zuul.executor.client.ExecutorClient(self.config,
self.sched)
nodepool = zuul.nodepool.Nodepool(self.sched)
self.sched.setExecutor(gearman)
self.sched.setNodepool(nodepool)
self.log.info('Starting scheduler') self.log.info('Starting scheduler')
try: try:
self.sched.start() self.sched.start()

View File

@ -41,6 +41,9 @@ from zuul.lib.logutil import get_annotated_logger
from zuul.lib.statsd import get_statsd, normalize_statsd_name from zuul.lib.statsd import get_statsd, normalize_statsd_name
import zuul.lib.queue import zuul.lib.queue
import zuul.lib.repl import zuul.lib.repl
from zuul import nodepool
from zuul.executor.client import ExecutorClient
from zuul.merger.client import MergeClient
from zuul.model import Build, HoldRequest, Tenant, TriggerEvent from zuul.model import Build, HoldRequest, Tenant, TriggerEvent
from zuul.zk import ZooKeeperClient from zuul.zk import ZooKeeperClient
from zuul.zk.nodepool import ZooKeeperNodepool from zuul.zk.nodepool import ZooKeeperNodepool
@ -287,11 +290,12 @@ class Scheduler(threading.Thread):
log = logging.getLogger("zuul.Scheduler") log = logging.getLogger("zuul.Scheduler")
_stats_interval = 30 _stats_interval = 30
_merger_client_class = MergeClient
# Number of seconds past node expiration a hold request will remain # Number of seconds past node expiration a hold request will remain
EXPIRED_HOLD_REQUEST_TTL = 24 * 60 * 60 EXPIRED_HOLD_REQUEST_TTL = 24 * 60 * 60
def __init__(self, config, connections, testonly=False): def __init__(self, config, connections, app, testonly=False):
threading.Thread.__init__(self) threading.Thread.__init__(self)
self.daemon = True self.daemon = True
self.hostname = socket.getfqdn() self.hostname = socket.getfqdn()
@ -307,9 +311,8 @@ class Scheduler(threading.Thread):
} }
self._hibernate = False self._hibernate = False
self._stopped = False self._stopped = False
self._zuul_app = None
self.executor = None self._zuul_app = app
self.merger = None
self.connections = connections self.connections = connections
self.statsd = get_statsd(config) self.statsd = get_statsd(config)
self.rpc = rpclistener.RPCListener(config, self) self.rpc = rpclistener.RPCListener(config, self)
@ -367,6 +370,11 @@ class Scheduler(threading.Thread):
self.ansible_manager = AnsibleManager( self.ansible_manager = AnsibleManager(
default_version=default_ansible_version) default_version=default_ansible_version)
if not testonly:
self.executor = ExecutorClient(self.config, self)
self.merger = self._merger_client_class(self.config, self)
self.nodepool = nodepool.Nodepool(self)
def start(self): def start(self):
super(Scheduler, self).start() super(Scheduler, self).start()
self._command_running = True self._command_running = True
@ -409,22 +417,6 @@ class Scheduler(threading.Thread):
def stopConnections(self): def stopConnections(self):
self.connections.stop() self.connections.stop()
def setZuulApp(self, app):
self._zuul_app = app
def setExecutor(self, executor):
self.executor = executor
def setMerger(self, merger):
self.merger = merger
def setNodepool(self, nodepool):
self.nodepool = nodepool
def setZooKeeper(self, zk_client):
self.zk_client = zk_client
self.zk_nodepool = ZooKeeperNodepool(zk_client)
def runStats(self): def runStats(self):
while not self.stats_stop.wait(self._stats_interval): while not self.stats_stop.wait(self._stats_interval):
try: try: