Extracting scheduler in test base
As a preparation for scale-out-scheduler the scheduler in tests needs to be extracted in order to start multiple instances. For brevity this change only extracts the instantiation of the scheduler in the test class with no side changes on existing tests. Further changes will follow to step-by-step prepare test base for scale-out-scheduler changes. Change-Id: If8a55873c580b66137476d20d4bfcb5a22544522 Story: 2007192
This commit is contained in:
parent
51ff675fad
commit
5f40b8a06e
132
tests/base.py
132
tests/base.py
|
@ -14,6 +14,7 @@
|
|||
# under the License.
|
||||
|
||||
import configparser
|
||||
from configparser import ConfigParser
|
||||
from contextlib import contextmanager
|
||||
import copy
|
||||
import datetime
|
||||
|
@ -28,6 +29,8 @@ import os
|
|||
import queue
|
||||
import random
|
||||
import re
|
||||
from logging import Logger
|
||||
|
||||
import requests
|
||||
import select
|
||||
import shutil
|
||||
|
@ -58,6 +61,7 @@ import testtools.content_type
|
|||
from git.exc import NoSuchPathError
|
||||
import yaml
|
||||
import paramiko
|
||||
from zuul.lib.connections import ConnectionRegistry
|
||||
|
||||
import tests.fakegithub
|
||||
import zuul.driver.gerrit.gerritsource as gerritsource
|
||||
|
@ -3150,6 +3154,61 @@ class SymLink(object):
|
|||
self.target = target
|
||||
|
||||
|
||||
class SchedulerTestApp:
|
||||
def __init__(self, log: Logger, config: ConfigParser, zk_config: str,
|
||||
connections: ConnectionRegistry):
|
||||
self.log = log
|
||||
self.config = config
|
||||
self.zk_config = zk_config
|
||||
|
||||
self.sched = zuul.scheduler.Scheduler(self.config)
|
||||
self.sched.setZuulApp(self)
|
||||
self.sched._stats_interval = 1
|
||||
|
||||
self.event_queues = [
|
||||
self.sched.result_event_queue,
|
||||
self.sched.trigger_event_queue,
|
||||
self.sched.management_event_queue
|
||||
]
|
||||
|
||||
self.sched.registerConnections(connections)
|
||||
|
||||
self.executor_client = zuul.executor.client.ExecutorClient(
|
||||
self.config, self.sched)
|
||||
self.merge_client = RecordingMergeClient(self.config, self.sched)
|
||||
self.nodepool = zuul.nodepool.Nodepool(self.sched)
|
||||
self.zk = zuul.zk.ZooKeeper(enable_cache=True)
|
||||
self.zk.connect(self.zk_config, timeout=30.0)
|
||||
|
||||
self.sched.setExecutor(self.executor_client)
|
||||
self.sched.setMerger(self.merge_client)
|
||||
self.sched.setNodepool(self.nodepool)
|
||||
self.sched.setZooKeeper(self.zk)
|
||||
|
||||
self.sched.start()
|
||||
self.executor_client.gearman.waitForServer()
|
||||
self.sched.reconfigure(self.config)
|
||||
self.sched.resume()
|
||||
|
||||
def fullReconfigure(self):
|
||||
try:
|
||||
self.sched.reconfigure(self.config)
|
||||
except Exception:
|
||||
self.log.exception("Reconfiguration failed:")
|
||||
|
||||
def smartReconfigure(self, command_socket=False):
|
||||
try:
|
||||
if command_socket:
|
||||
command_socket = self.config.get('scheduler', 'command_socket')
|
||||
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
|
||||
s.connect(command_socket)
|
||||
s.sendall('smart-reconfigure\n'.encode('utf8'))
|
||||
else:
|
||||
self.sched.reconfigure(self.config, smart=True)
|
||||
except Exception:
|
||||
self.log.exception("Reconfiguration failed:")
|
||||
|
||||
|
||||
class ZuulTestCase(BaseTestCase):
|
||||
"""A test case with a functioning Zuul.
|
||||
|
||||
|
@ -3227,6 +3286,10 @@ class ZuulTestCase(BaseTestCase):
|
|||
super(ZuulTestCase, self).setUp()
|
||||
|
||||
self.setupZK()
|
||||
self.fake_nodepool = FakeNodepool(
|
||||
self.zk_chroot_fixture.zookeeper_host,
|
||||
self.zk_chroot_fixture.zookeeper_port,
|
||||
self.zk_chroot_fixture.zookeeper_chroot)
|
||||
|
||||
if not KEEP_TEMPDIRS:
|
||||
tmp_root = self.useFixture(fixtures.TempDir(
|
||||
|
@ -3319,23 +3382,9 @@ class ZuulTestCase(BaseTestCase):
|
|||
gerritsource.GerritSource.replication_retry_interval = 0.5
|
||||
gerritconnection.GerritEventConnector.delay = 0.0
|
||||
|
||||
self.sched = zuul.scheduler.Scheduler(self.config)
|
||||
self.sched.setZuulApp(self)
|
||||
self.sched._stats_interval = 1
|
||||
|
||||
self.event_queues = [
|
||||
self.sched.result_event_queue,
|
||||
self.sched.trigger_event_queue,
|
||||
self.sched.management_event_queue
|
||||
]
|
||||
self.event_queues = []
|
||||
self.poller_events = {}
|
||||
|
||||
self.configure_connections()
|
||||
self.sched.registerConnections(self.connections)
|
||||
|
||||
if hasattr(self, 'fake_github'):
|
||||
self.event_queues.append(
|
||||
self.fake_github.github_event_connector._event_forward_queue)
|
||||
|
||||
self.executor_server = RecordingExecutorServer(
|
||||
self.config, self.connections,
|
||||
|
@ -3348,52 +3397,27 @@ class ZuulTestCase(BaseTestCase):
|
|||
self.history = self.executor_server.build_history
|
||||
self.builds = self.executor_server.running_builds
|
||||
|
||||
self.executor_client = zuul.executor.client.ExecutorClient(
|
||||
self.config, self.sched)
|
||||
self.merge_client = RecordingMergeClient(self.config, self.sched)
|
||||
self.sched_app = SchedulerTestApp(self.log, self.config,
|
||||
self.zk_config,
|
||||
self.connections)
|
||||
self.sched = self.sched_app.sched
|
||||
self.event_queues = self.sched_app.event_queues + self.event_queues
|
||||
|
||||
if hasattr(self, 'fake_github'):
|
||||
self.event_queues.append(
|
||||
self.fake_github.github_event_connector._event_forward_queue)
|
||||
|
||||
self.executor_client = self.sched_app.executor_client
|
||||
self.merge_client = self.sched_app.merge_client
|
||||
self.merge_server = None
|
||||
self.nodepool = zuul.nodepool.Nodepool(self.sched)
|
||||
self.zk = zuul.zk.ZooKeeper(enable_cache=True)
|
||||
self.zk.connect(self.zk_config, timeout=30.0)
|
||||
self.nodepool = self.sched_app.nodepool
|
||||
self.zk = self.sched_app.zk
|
||||
|
||||
self.fake_nodepool = FakeNodepool(
|
||||
self.zk_chroot_fixture.zookeeper_host,
|
||||
self.zk_chroot_fixture.zookeeper_port,
|
||||
self.zk_chroot_fixture.zookeeper_chroot)
|
||||
|
||||
self.sched.setExecutor(self.executor_client)
|
||||
self.sched.setMerger(self.merge_client)
|
||||
self.sched.setNodepool(self.nodepool)
|
||||
self.sched.setZooKeeper(self.zk)
|
||||
|
||||
self.sched.start()
|
||||
self.executor_client.gearman.waitForServer()
|
||||
# Cleanups are run in reverse order
|
||||
self.addCleanup(self.assertCleanShutdown)
|
||||
self.addCleanup(self.shutdown)
|
||||
self.addCleanup(self.assertFinalState)
|
||||
|
||||
self.sched.reconfigure(self.config)
|
||||
self.sched.resume()
|
||||
|
||||
def fullReconfigure(self):
|
||||
try:
|
||||
self.sched.reconfigure(self.config)
|
||||
except Exception:
|
||||
self.log.exception("Reconfiguration failed:")
|
||||
|
||||
def smartReconfigure(self, command_socket=False):
|
||||
try:
|
||||
if command_socket:
|
||||
command_socket = self.config.get('scheduler', 'command_socket')
|
||||
with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as s:
|
||||
s.connect(command_socket)
|
||||
s.sendall('smart-reconfigure\n'.encode('utf8'))
|
||||
else:
|
||||
self.sched.reconfigure(self.config, smart=True)
|
||||
except Exception:
|
||||
self.log.exception("Reconfiguration failed:")
|
||||
|
||||
def configure_connections(self, source_only=False):
|
||||
# Set up gerrit related fakes
|
||||
# Set a changes database so multiple FakeGerrit's can report back to
|
||||
|
|
|
@ -8169,7 +8169,7 @@ class TestSchedulerSmartReconfiguration(ZuulTestCase):
|
|||
|
||||
self.newTenantConfig('config/multi-tenant/main-reconfig.yaml')
|
||||
|
||||
self.smartReconfigure(command_socket=command_socket)
|
||||
self.sched_app.smartReconfigure(command_socket=command_socket)
|
||||
|
||||
# Wait for smart reconfiguration. Only tenant-two should be
|
||||
# reconfigured. Note that waitUntilSettled is not
|
||||
|
|
Loading…
Reference in New Issue