Add --wait-for-init scheduler option
This instructs the scheduler to wait until all tenants have been initialized before processing pipelines. This can be useful for large systems with excess scheduler capacity to speed up a rolling restart.

This also removes an unused instance variable from SchedulerTestManager.

Change-Id: I19e733c881d1abf636674bf572f4764a0d018a8a
This commit is contained in:
parent
c5b55e59c8
commit
603b826911
8
releasenotes/notes/wait-for-init-934370422b22b442.yaml
Normal file
8
releasenotes/notes/wait-for-init-934370422b22b442.yaml
Normal file
@@ -0,0 +1,8 @@
|
||||
---
|
||||
features:
|
||||
- |
|
||||
The scheduler now accepts an argument `--wait-for-init` which will
|
||||
cause it to wait until all tenants have been initialized before it
|
||||
begins processing pipelines. This may help large systems with
|
||||
excess scheduler capacity perform a rolling restart of schedulers
|
||||
more quickly.
|
@@ -4379,11 +4379,12 @@ class SchedulerTestApp:
|
||||
def __init__(self, log, config, changes, additional_event_queues,
|
||||
upstream_root, poller_events,
|
||||
git_url_with_auth, add_cleanup, validate_tenants,
|
||||
instance_id):
|
||||
wait_for_init, instance_id):
|
||||
self.log = log
|
||||
self.config = config
|
||||
self.changes = changes
|
||||
self.validate_tenants = validate_tenants
|
||||
self.wait_for_init = wait_for_init
|
||||
|
||||
# Register connections from the config using fakes
|
||||
self.connections = TestConnectionRegistry(
|
||||
@@ -4397,7 +4398,8 @@ class SchedulerTestApp:
|
||||
)
|
||||
self.connections.configure(self.config)
|
||||
|
||||
self.sched = TestScheduler(self.config, self.connections, self)
|
||||
self.sched = TestScheduler(self.config, self.connections, self,
|
||||
wait_for_init)
|
||||
self.sched.log = logging.getLogger(f"zuul.Scheduler-{instance_id}")
|
||||
self.sched._stats_interval = 1
|
||||
|
||||
@@ -4463,13 +4465,12 @@ class SchedulerTestApp:
|
||||
|
||||
|
||||
class SchedulerTestManager:
|
||||
def __init__(self, validate_tenants):
|
||||
def __init__(self, validate_tenants, wait_for_init):
|
||||
self.instances = []
|
||||
self.validate_tenants = validate_tenants
|
||||
|
||||
def create(self, log, config, changes, additional_event_queues,
|
||||
upstream_root, poller_events,
|
||||
git_url_with_auth, add_cleanup, validate_tenants):
|
||||
upstream_root, poller_events, git_url_with_auth,
|
||||
add_cleanup, validate_tenants, wait_for_init):
|
||||
# Since the config contains a regex we cannot use copy.deepcopy()
|
||||
# as this will raise an exception with Python <3.7
|
||||
config_data = StringIO()
|
||||
@@ -4490,7 +4491,7 @@ class SchedulerTestManager:
|
||||
additional_event_queues, upstream_root,
|
||||
poller_events,
|
||||
git_url_with_auth, add_cleanup,
|
||||
validate_tenants, instance_id)
|
||||
validate_tenants, wait_for_init, instance_id)
|
||||
self.instances.append(app)
|
||||
return app
|
||||
|
||||
@@ -4593,6 +4594,7 @@ class ZuulTestCase(BaseTestCase):
|
||||
git_url_with_auth: bool = False
|
||||
log_console_port: int = 19885
|
||||
validate_tenants = None
|
||||
wait_for_init = None
|
||||
scheduler_count = SCHEDULER_COUNT
|
||||
|
||||
def __getattr__(self, name):
|
||||
@@ -4757,7 +4759,8 @@ class ZuulTestCase(BaseTestCase):
|
||||
self.history = self.executor_server.build_history
|
||||
self.builds = self.executor_server.running_builds
|
||||
|
||||
self.scheds = SchedulerTestManager(self.validate_tenants)
|
||||
self.scheds = SchedulerTestManager(self.validate_tenants,
|
||||
self.wait_for_init)
|
||||
for _ in range(self.scheduler_count):
|
||||
self.createScheduler()
|
||||
|
||||
@@ -4776,7 +4779,7 @@ class ZuulTestCase(BaseTestCase):
|
||||
self.log, self.config, self.changes,
|
||||
self.additional_event_queues, self.upstream_root,
|
||||
self.poller_events, self.git_url_with_auth,
|
||||
self.addCleanup, self.validate_tenants)
|
||||
self.addCleanup, self.validate_tenants, self.wait_for_init)
|
||||
|
||||
def createZKContext(self, lock=None):
|
||||
if lock is None:
|
||||
|
@@ -8793,3 +8793,20 @@ class TestEventProcessing(ZuulTestCase):
|
||||
dict(name='tagjob', result='SUCCESS'),
|
||||
dict(name='checkjob', result='SUCCESS', changes='1,1'),
|
||||
], ordered=False)
|
||||
|
||||
|
||||
class TestWaitForInit(ZuulTestCase):
|
||||
tenant_config_file = 'config/single-tenant/main.yaml'
|
||||
wait_for_init = True
|
||||
|
||||
def setUp(self):
|
||||
with self.assertLogs('zuul.Scheduler-0', level='DEBUG') as full_logs:
|
||||
super().setUp()
|
||||
self.assertRegexInList('Waiting for tenant initialization',
|
||||
full_logs.output)
|
||||
|
||||
def test_wait_for_init(self):
|
||||
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
|
||||
A.addApproval('Code-Review', 2)
|
||||
self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
|
||||
self.waitUntilSettled()
|
||||
|
@@ -39,6 +39,10 @@ class Scheduler(zuul.cmd.ZuulDaemonApp):
|
||||
'listed, all tenants will be validated. '
|
||||
'Note: this requires ZooKeeper and '
|
||||
'will distribute work to mergers.')
|
||||
parser.add_argument('--wait-for-init', dest='wait_for_init',
|
||||
action='store_true',
|
||||
help='Wait until all tenants are fully loaded '
|
||||
'before beginning to process events.')
|
||||
self.addSubCommands(parser, zuul.scheduler.COMMANDS)
|
||||
return parser
|
||||
|
||||
@@ -82,7 +86,8 @@ class Scheduler(zuul.cmd.ZuulDaemonApp):
|
||||
|
||||
self.configure_connections(require_sql=True)
|
||||
self.sched = zuul.scheduler.Scheduler(self.config,
|
||||
self.connections, self)
|
||||
self.connections, self,
|
||||
self.args.wait_for_init)
|
||||
if self.args.validate_tenants is None:
|
||||
self.connections.registerScheduler(self.sched)
|
||||
self.connections.load(self.sched.zk_client,
|
||||
|
@@ -184,9 +184,11 @@ class Scheduler(threading.Thread):
|
||||
_merger_client_class = MergeClient
|
||||
_executor_client_class = ExecutorClient
|
||||
|
||||
def __init__(self, config, connections, app, testonly=False):
|
||||
def __init__(self, config, connections, app, wait_for_init,
|
||||
testonly=False):
|
||||
threading.Thread.__init__(self)
|
||||
self.daemon = True
|
||||
self.wait_for_init = wait_for_init
|
||||
self.hostname = socket.getfqdn()
|
||||
self.primed_event = threading.Event()
|
||||
# Wake up the main run loop
|
||||
@@ -1901,6 +1903,9 @@ class Scheduler(threading.Thread):
|
||||
self.log.debug("Statsd enabled")
|
||||
else:
|
||||
self.log.debug("Statsd not configured")
|
||||
if self.wait_for_init:
|
||||
self.log.debug("Waiting for tenant initialization")
|
||||
self.primed_event.wait()
|
||||
while True:
|
||||
self.log.debug("Run handler sleeping")
|
||||
self.wake_event.wait()
|
||||
|
Loading…
x
Reference in New Issue
Block a user