Merge "Add --wait-for-init scheduler option"
This commit is contained in:
commit
ec83f42a75
|
@ -0,0 +1,8 @@
|
||||||
|
---
|
||||||
|
features:
|
||||||
|
- |
|
||||||
|
The scheduler now accepts an argument `--wait-for-init` which will
|
||||||
|
cause it to wait until all tenants have been initialized before it
|
||||||
|
begins processing pipelines. This may help large systems with
|
||||||
|
excess scheduler capacity perform a rolling restart of schedulers
|
||||||
|
more quickly.
|
|
@ -4379,11 +4379,12 @@ class SchedulerTestApp:
|
||||||
def __init__(self, log, config, changes, additional_event_queues,
|
def __init__(self, log, config, changes, additional_event_queues,
|
||||||
upstream_root, poller_events,
|
upstream_root, poller_events,
|
||||||
git_url_with_auth, add_cleanup, validate_tenants,
|
git_url_with_auth, add_cleanup, validate_tenants,
|
||||||
instance_id):
|
wait_for_init, instance_id):
|
||||||
self.log = log
|
self.log = log
|
||||||
self.config = config
|
self.config = config
|
||||||
self.changes = changes
|
self.changes = changes
|
||||||
self.validate_tenants = validate_tenants
|
self.validate_tenants = validate_tenants
|
||||||
|
self.wait_for_init = wait_for_init
|
||||||
|
|
||||||
# Register connections from the config using fakes
|
# Register connections from the config using fakes
|
||||||
self.connections = TestConnectionRegistry(
|
self.connections = TestConnectionRegistry(
|
||||||
|
@ -4397,7 +4398,8 @@ class SchedulerTestApp:
|
||||||
)
|
)
|
||||||
self.connections.configure(self.config)
|
self.connections.configure(self.config)
|
||||||
|
|
||||||
self.sched = TestScheduler(self.config, self.connections, self)
|
self.sched = TestScheduler(self.config, self.connections, self,
|
||||||
|
wait_for_init)
|
||||||
self.sched.log = logging.getLogger(f"zuul.Scheduler-{instance_id}")
|
self.sched.log = logging.getLogger(f"zuul.Scheduler-{instance_id}")
|
||||||
self.sched._stats_interval = 1
|
self.sched._stats_interval = 1
|
||||||
|
|
||||||
|
@ -4463,13 +4465,12 @@ class SchedulerTestApp:
|
||||||
|
|
||||||
|
|
||||||
class SchedulerTestManager:
|
class SchedulerTestManager:
|
||||||
def __init__(self, validate_tenants):
|
def __init__(self, validate_tenants, wait_for_init):
|
||||||
self.instances = []
|
self.instances = []
|
||||||
self.validate_tenants = validate_tenants
|
|
||||||
|
|
||||||
def create(self, log, config, changes, additional_event_queues,
|
def create(self, log, config, changes, additional_event_queues,
|
||||||
upstream_root, poller_events,
|
upstream_root, poller_events, git_url_with_auth,
|
||||||
git_url_with_auth, add_cleanup, validate_tenants):
|
add_cleanup, validate_tenants, wait_for_init):
|
||||||
# Since the config contains a regex we cannot use copy.deepcopy()
|
# Since the config contains a regex we cannot use copy.deepcopy()
|
||||||
# as this will raise an exception with Python <3.7
|
# as this will raise an exception with Python <3.7
|
||||||
config_data = StringIO()
|
config_data = StringIO()
|
||||||
|
@ -4490,7 +4491,7 @@ class SchedulerTestManager:
|
||||||
additional_event_queues, upstream_root,
|
additional_event_queues, upstream_root,
|
||||||
poller_events,
|
poller_events,
|
||||||
git_url_with_auth, add_cleanup,
|
git_url_with_auth, add_cleanup,
|
||||||
validate_tenants, instance_id)
|
validate_tenants, wait_for_init, instance_id)
|
||||||
self.instances.append(app)
|
self.instances.append(app)
|
||||||
return app
|
return app
|
||||||
|
|
||||||
|
@ -4593,6 +4594,7 @@ class ZuulTestCase(BaseTestCase):
|
||||||
git_url_with_auth: bool = False
|
git_url_with_auth: bool = False
|
||||||
log_console_port: int = 19885
|
log_console_port: int = 19885
|
||||||
validate_tenants = None
|
validate_tenants = None
|
||||||
|
wait_for_init = None
|
||||||
scheduler_count = SCHEDULER_COUNT
|
scheduler_count = SCHEDULER_COUNT
|
||||||
|
|
||||||
def __getattr__(self, name):
|
def __getattr__(self, name):
|
||||||
|
@ -4757,7 +4759,8 @@ class ZuulTestCase(BaseTestCase):
|
||||||
self.history = self.executor_server.build_history
|
self.history = self.executor_server.build_history
|
||||||
self.builds = self.executor_server.running_builds
|
self.builds = self.executor_server.running_builds
|
||||||
|
|
||||||
self.scheds = SchedulerTestManager(self.validate_tenants)
|
self.scheds = SchedulerTestManager(self.validate_tenants,
|
||||||
|
self.wait_for_init)
|
||||||
for _ in range(self.scheduler_count):
|
for _ in range(self.scheduler_count):
|
||||||
self.createScheduler()
|
self.createScheduler()
|
||||||
|
|
||||||
|
@ -4776,7 +4779,7 @@ class ZuulTestCase(BaseTestCase):
|
||||||
self.log, self.config, self.changes,
|
self.log, self.config, self.changes,
|
||||||
self.additional_event_queues, self.upstream_root,
|
self.additional_event_queues, self.upstream_root,
|
||||||
self.poller_events, self.git_url_with_auth,
|
self.poller_events, self.git_url_with_auth,
|
||||||
self.addCleanup, self.validate_tenants)
|
self.addCleanup, self.validate_tenants, self.wait_for_init)
|
||||||
|
|
||||||
def createZKContext(self, lock=None):
|
def createZKContext(self, lock=None):
|
||||||
if lock is None:
|
if lock is None:
|
||||||
|
|
|
@ -8794,3 +8794,20 @@ class TestEventProcessing(ZuulTestCase):
|
||||||
dict(name='tagjob', result='SUCCESS'),
|
dict(name='tagjob', result='SUCCESS'),
|
||||||
dict(name='checkjob', result='SUCCESS', changes='1,1'),
|
dict(name='checkjob', result='SUCCESS', changes='1,1'),
|
||||||
], ordered=False)
|
], ordered=False)
|
||||||
|
|
||||||
|
|
||||||
|
class TestWaitForInit(ZuulTestCase):
|
||||||
|
tenant_config_file = 'config/single-tenant/main.yaml'
|
||||||
|
wait_for_init = True
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
with self.assertLogs('zuul.Scheduler-0', level='DEBUG') as full_logs:
|
||||||
|
super().setUp()
|
||||||
|
self.assertRegexInList('Waiting for tenant initialization',
|
||||||
|
full_logs.output)
|
||||||
|
|
||||||
|
def test_wait_for_init(self):
|
||||||
|
A = self.fake_gerrit.addFakeChange('org/project', 'master', 'A')
|
||||||
|
A.addApproval('Code-Review', 2)
|
||||||
|
self.fake_gerrit.addEvent(A.addApproval('Approved', 1))
|
||||||
|
self.waitUntilSettled()
|
||||||
|
|
|
@ -39,6 +39,10 @@ class Scheduler(zuul.cmd.ZuulDaemonApp):
|
||||||
'listed, all tenants will be validated. '
|
'listed, all tenants will be validated. '
|
||||||
'Note: this requires ZooKeeper and '
|
'Note: this requires ZooKeeper and '
|
||||||
'will distribute work to mergers.')
|
'will distribute work to mergers.')
|
||||||
|
parser.add_argument('--wait-for-init', dest='wait_for_init',
|
||||||
|
action='store_true',
|
||||||
|
help='Wait until all tenants are fully loaded '
|
||||||
|
'before beginning to process events.')
|
||||||
self.addSubCommands(parser, zuul.scheduler.COMMANDS)
|
self.addSubCommands(parser, zuul.scheduler.COMMANDS)
|
||||||
return parser
|
return parser
|
||||||
|
|
||||||
|
@ -82,7 +86,8 @@ class Scheduler(zuul.cmd.ZuulDaemonApp):
|
||||||
|
|
||||||
self.configure_connections(require_sql=True)
|
self.configure_connections(require_sql=True)
|
||||||
self.sched = zuul.scheduler.Scheduler(self.config,
|
self.sched = zuul.scheduler.Scheduler(self.config,
|
||||||
self.connections, self)
|
self.connections, self,
|
||||||
|
self.args.wait_for_init)
|
||||||
if self.args.validate_tenants is None:
|
if self.args.validate_tenants is None:
|
||||||
self.connections.registerScheduler(self.sched)
|
self.connections.registerScheduler(self.sched)
|
||||||
self.connections.load(self.sched.zk_client,
|
self.connections.load(self.sched.zk_client,
|
||||||
|
|
|
@ -184,9 +184,11 @@ class Scheduler(threading.Thread):
|
||||||
_merger_client_class = MergeClient
|
_merger_client_class = MergeClient
|
||||||
_executor_client_class = ExecutorClient
|
_executor_client_class = ExecutorClient
|
||||||
|
|
||||||
def __init__(self, config, connections, app, testonly=False):
|
def __init__(self, config, connections, app, wait_for_init,
|
||||||
|
testonly=False):
|
||||||
threading.Thread.__init__(self)
|
threading.Thread.__init__(self)
|
||||||
self.daemon = True
|
self.daemon = True
|
||||||
|
self.wait_for_init = wait_for_init
|
||||||
self.hostname = socket.getfqdn()
|
self.hostname = socket.getfqdn()
|
||||||
self.primed_event = threading.Event()
|
self.primed_event = threading.Event()
|
||||||
# Wake up the main run loop
|
# Wake up the main run loop
|
||||||
|
@ -1903,6 +1905,9 @@ class Scheduler(threading.Thread):
|
||||||
self.log.debug("Statsd enabled")
|
self.log.debug("Statsd enabled")
|
||||||
else:
|
else:
|
||||||
self.log.debug("Statsd not configured")
|
self.log.debug("Statsd not configured")
|
||||||
|
if self.wait_for_init:
|
||||||
|
self.log.debug("Waiting for tenant initialization")
|
||||||
|
self.primed_event.wait()
|
||||||
while True:
|
while True:
|
||||||
self.log.debug("Run handler sleeping")
|
self.log.debug("Run handler sleeping")
|
||||||
self.wake_event.wait()
|
self.wake_event.wait()
|
||||||
|
|
Loading…
Reference in New Issue