Support cross-scheduler config loading

On startup a scheduler will use the cached system config from Zookeeper
if it exists. Otherwise it will load/generate the config and store it
in the cache.
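
Condensed from the prime() changes in the scheduler diff below, the
startup decision is:

    if self.system_config_cache.is_valid:
        # Another scheduler has already primed the cache in Zookeeper.
        self.updateSystemConfig()
    else:
        # First scheduler to start: parse the config and fill the cache.
        self.primeSystemConfig()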

The scheduler will then load the configured tenants as follows (see the
condensed sketch after this list):

- If a layout state already exists for a tenant, the scheduler will
  acquire the tenant READ lock, read the cached unparsed config from
  Zookeeper and load the tenant.
- If there is no existing layout state for a tenant, the scheduler will
  acquire the tenant WRITE lock, load the tenant and create a new
  layout state in Zookeeper.
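
The corresponding lock selection, condensed from the new prime() body
in the diff:

    with self.layout_lock:
        for tenant_name in new_tenants:
            layout_state = self.tenant_layout_state.get(tenant_name)
            if layout_state is None:
                # No published layout yet: load the tenant and create one.
                tlock = tenant_write_lock(self.zk_client, tenant_name)
            else:
                # Layout already published: read the cached config.
                tlock = tenant_read_lock(self.zk_client, tenant_name)
            with tlock:
                ...  # load the tenant, then reconfigure or adopt the state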

This changes the behavior during config priming: we no longer perform a
full reconfiguration on scheduler startup!

During processing of a tenant reconfiguration event, the layout state
in Zookeeper for that tenant is updated. All other schedulers are
notified of the changed layout state via a data watch, which sets the
wake event of the run handler. Similarly, a changed system config in
Zookeeper will also wake up the run handler.
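
The notification path, condensed from the Zookeeper cache classes in
the diff below: the cache registers a Kazoo data watch on its znode and
fires a callback supplied by the scheduler, which passes in
self.wake_event.set:

    class SystemConfigCache(ZooKeeperSimpleBase):
        # Condensed; LayoutStateStore registers a per-tenant DataWatch
        # the same way.
        def __init__(self, client, callback):
            super().__init__(client)
            self._callback = callback
            self.kazoo_client.DataWatch(self.conf_path, self._configChanged)

        def _configChanged(self, data, stat, event):
            self._callback()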

The system config and tenant layouts are directly updated in the run
handler when an outdated config/layout is detected.
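
In the run handler this amounts to two staleness checks, condensed from
the diff:

    if self.unparsed_abide.ltime < self.system_config_cache.ltime:
        # The system config in Zookeeper is newer than our local copy.
        self.updateSystemConfig()

    if (self.tenant_layout_state[tenant_name]
            > self.local_layout_state[tenant_name]):
        # Another scheduler published a newer layout for this tenant.
        self.updateTenantLayout(tenant_name)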

Change-Id: I48b145cecec5890d212d751f68b3947f04ee1ed5
Simon Westphahl 2021-07-28 15:33:25 +02:00
parent d5a2becd11
commit 25ef070f5e
6 changed files with 268 additions and 22 deletions


@@ -4169,7 +4169,7 @@ class SchedulerTestApp:
            self.sched.reconfigure_event_queue,
        ]

-    def start(self, validate_tenants: list):
+    def start(self, validate_tenants=None):
        self.sched.start()
        if validate_tenants is None:
            self.sched.prime(self.config)

tests/unit/test_sos.py (new file, 99 lines)

@@ -0,0 +1,99 @@
# Copyright 2021 BMW Group
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

from tests.base import iterate_timeout, ZuulTestCase


class TestScaleOutScheduler(ZuulTestCase):
    tenant_config_file = "config/single-tenant/main.yaml"

    def create_scheduler(self):
        return self.scheds.create(
            self.log,
            self.config,
            self.changes,
            self.additional_event_queues,
            self.upstream_root,
            self.rpcclient,
            self.poller_events,
            self.git_url_with_auth,
            self.fake_sql,
            self.addCleanup,
            self.validate_tenants)

    def test_config_priming(self):
        for _ in iterate_timeout(10, "Wait until priming is complete"):
            layout_state = self.scheds.first.sched.tenant_layout_state.get(
                "tenant-one")
            if layout_state is not None:
                break

        # Second scheduler instance
        app = self.create_scheduler()
        # Change a system attribute in order to check that the system config
        # from Zookeeper was used.
        app.sched.globals.max_hold_expiration += 1234
        app.config.set("scheduler", "max_hold_expiration",
                       str(app.sched.globals.max_hold_expiration))
        app.start()

        for _ in iterate_timeout(
                10, "Wait for all schedulers to have the same layout state"):
            layout_states = [s.sched.local_layout_state.get("tenant-one")
                             for s in self.scheds.instances]
            if all(l == layout_state for l in layout_states):
                break

        for app in self.scheds.instances:
            if app is self.scheds.first:
                self.assertIsNotNone(
                    app.sched.merger.history.get("merger:cat"))
            else:
                # Make sure the other schedulers did not issue any cat jobs
                self.assertIsNone(app.sched.merger.history.get("merger:cat"))
        self.waitUntilSettled()

        self.assertEqual(self.scheds.first.sched.globals.max_hold_expiration,
                         app.sched.globals.max_hold_expiration)

    def test_reconfigure(self):
        # Create a second scheduler instance
        app = self.create_scheduler()
        app.start()
        self.assertEqual(len(self.scheds), 2)

        for _ in iterate_timeout(10, "Wait until priming is complete"):
            old = self.scheds.first.sched.tenant_layout_state.get("tenant-one")
            if old is not None:
                break

        for _ in iterate_timeout(
                10, "Wait for all schedulers to have the same layout state"):
            layout_states = [a.sched.local_layout_state.get("tenant-one")
                             for a in self.scheds.instances]
            if all(l == old for l in layout_states):
                break

        self.scheds.first.sched.reconfigure(self.scheds.first.config)
        self.waitUntilSettled()

        new = self.scheds.first.sched.tenant_layout_state["tenant-one"]
        self.assertNotEqual(old, new)

        for _ in iterate_timeout(10, "Wait for all schedulers to update"):
            layout_states = [a.sched.local_layout_state.get("tenant-one")
                             for a in self.scheds.instances]
            if all(l == new for l in layout_states):
                break
        self.waitUntilSettled()


@@ -719,19 +719,25 @@ class TestLocks(ZooKeeperBaseTestCase):

class TestLayoutStore(ZooKeeperBaseTestCase):

    def test_layout_state(self):
-        store = LayoutStateStore(self.zk_client)
+        store = LayoutStateStore(self.zk_client, lambda: None)
        state = LayoutState("tenant", "hostname", 0)
        store["tenant"] = state
        self.assertEqual(state, store["tenant"])
        self.assertNotEqual(state.ltime, -1)
        self.assertNotEqual(store["tenant"].ltime, -1)

+    def test_ordering(self):
+        state_one = LayoutState("tenant", "hostname", 1, ltime=1)
+        state_two = LayoutState("tenant", "hostname", 2, ltime=2)
+        self.assertGreater(state_two, state_one)
+

class TestSystemConfigCache(ZooKeeperBaseTestCase):

    def setUp(self):
        super().setUp()
-        self.config_cache = SystemConfigCache(self.zk_client)
+        self.config_cache = SystemConfigCache(self.zk_client, lambda: None)

    def test_set_get(self):
        uac = model.UnparsedAbideConfig()


@@ -24,6 +24,7 @@ import threading
import time
import traceback
import uuid
+from contextlib import suppress
from collections import defaultdict

from apscheduler.schedulers.background import BackgroundScheduler
@@ -173,7 +174,8 @@ class Scheduler(threading.Thread):
        self.component_info = SchedulerComponent(self.zk_client, self.hostname)
        self.component_info.register()
        self.component_registry = ComponentRegistry(self.zk_client)
-        self.system_config_cache = SystemConfigCache(self.zk_client)
+        self.system_config_cache = SystemConfigCache(self.zk_client,
+                                                     self.wake_event.set)
        self.unparsed_config_cache = UnparsedConfigCache(self.zk_client)

        # TODO (swestphahl): Remove after we've refactored reconfigurations
@@ -208,7 +210,9 @@ class Scheduler(threading.Thread):
        self.abide = Abide()
        self.unparsed_abide = UnparsedAbideConfig()
-        self.tenant_layout_state = LayoutStateStore(self.zk_client)
+        self.tenant_layout_state = LayoutStateStore(self.zk_client,
+                                                    self.wake_event.set)
+        self.local_layout_state = {}

        if not testonly:
            time_dir = self._get_time_database_dir()
@@ -638,12 +642,52 @@
            self.repl = None

    def prime(self, config):
-        self.log.debug("Priming scheduler config")
-        event = ReconfigureEvent()
-        event.zuul_event_ltime = self.zk_client.getCurrentLtime()
-        self._doReconfigureEvent(event)
-        self.log.debug("Config priming complete")
-        self.last_reconfigured = int(time.time())
+        self.log.info("Priming scheduler config")
+        start = time.monotonic()
+
+        if self.system_config_cache.is_valid:
+            self.log.info("Using system config from Zookeeper")
+            self.updateSystemConfig()
+        else:
+            self.log.info("Creating initial system config")
+            self.primeSystemConfig()
+
+        loader = configloader.ConfigLoader(
+            self.connections, self, self.merger, self.keystore)
+        new_tenants = (set(self.unparsed_abide.tenants)
+                       - self.abide.tenants.keys())
+
+        with self.layout_lock:
+            for tenant_name in new_tenants:
+                layout_state = self.tenant_layout_state.get(tenant_name)
+                # In case we don't have a cached layout state we need to
+                # acquire the write lock since we load a new tenant.
+                if layout_state is None:
+                    tlock = tenant_write_lock(self.zk_client, tenant_name)
+                else:
+                    tlock = tenant_read_lock(self.zk_client, tenant_name)
+
+                # Consider all caches valid (min. ltime -1)
+                min_ltimes = defaultdict(lambda: defaultdict(lambda: -1))
+
+                with tlock:
+                    tenant = loader.loadTenant(
+                        self.abide, tenant_name, self.ansible_manager,
+                        self.unparsed_abide, min_ltimes=min_ltimes)
+
+                    # Refresh the layout state now that we are holding the lock
+                    # and we can be sure it won't be changed concurrently.
+                    layout_state = self.tenant_layout_state.get(tenant_name)
+                    if layout_state is None:
+                        # Reconfigure only tenants w/o an existing layout state
+                        self._reconfigureTenant(tenant)
+                    else:
+                        self.local_layout_state[tenant_name] = layout_state
+                        self.connections.reconfigureDrivers(tenant)
+
+        duration = round(time.monotonic() - start, 3)
+        self.log.info("Config priming complete (duration: %s seconds)",
+                      duration)
+        self.wake_event.set()

    def reconfigure(self, config, smart=False):
        self.log.debug("Submitting reconfiguration event")
@@ -815,6 +859,29 @@ class Scheduler(threading.Thread):
                            "current mode is %o" % (key_dir, mode))
        return key_dir

+    def updateTenantLayout(self, tenant_name):
+        self.log.debug("Updating layout of tenant %s", tenant_name)
+        if self.unparsed_abide.ltime < self.system_config_cache.ltime:
+            self.updateSystemConfig()
+
+        # Consider all caches valid (min. ltime -1)
+        min_ltimes = defaultdict(lambda: defaultdict(lambda: -1))
+        loader = configloader.ConfigLoader(
+            self.connections, self, self.merger, self.keystore)
+        with self.layout_lock:
+            self.log.debug("Updating local layout of tenant %s ", tenant_name)
+            tenant = loader.loadTenant(self.abide, tenant_name,
+                                       self.ansible_manager,
+                                       self.unparsed_abide,
+                                       min_ltimes=min_ltimes)
+            if tenant is not None:
+                layout_state = self.tenant_layout_state[tenant.name]
+                self.local_layout_state[tenant_name] = layout_state
+                self.connections.reconfigureDrivers(tenant)
+            else:
+                with suppress(KeyError):
+                    del self.local_layout_state[tenant_name]
+
    def _checkTenantSourceConf(self, config):
        tenant_config = None
        script = False
@@ -956,6 +1023,9 @@ class Scheduler(threading.Thread):
    def _doTenantReconfigureEvent(self, event):
        # This is called in the scheduler loop after another thread submits
        # a request
+        if self.unparsed_abide.ltime < self.system_config_cache.ltime:
+            self.updateSystemConfig()
+
        with self.layout_lock:
            self.log.info("Tenant reconfiguration beginning for %s due to "
                          "projects %s",
@@ -1167,6 +1237,9 @@ class Scheduler(threading.Thread):
            hostname=self.hostname,
            last_reconfigured=int(time.time()),
        )
+        # We need to update the local layout state before the remote state,
+        # to avoid race conditions in the layout changed callback.
+        self.local_layout_state[tenant.name] = layout_state
        self.tenant_layout_state[tenant.name] = layout_state

        if self.statsd:
@@ -1374,20 +1447,40 @@ class Scheduler(threading.Thread):
                if not self._stopped:
                    self.process_reconfigure_queue()

-                # Process tenant management events separate from other events
-                # as they might reload the tenant.
-                for tenant in self.abide.tenants.values():
-                    if not self._stopped:
-                        # This will also forward events for the pipelines
-                        # (e.g. enqueue or dequeue events) to the matching
-                        # pipeline event queues that are processed afterwards.
-                        self.process_tenant_management_queue(tenant)
+                if self.unparsed_abide.ltime < self.system_config_cache.ltime:
+                    self.updateSystemConfig()

-                for tenant in self.abide.tenants.values():
+                for tenant_name in self.unparsed_abide.tenants:
+                    if self._stopped:
+                        break
+
+                    tenant = self.abide.tenants.get(tenant_name)
+                    if not tenant:
+                        continue
+
+                    # This will also forward events for the pipelines
+                    # (e.g. enqueue or dequeue events) to the matching
+                    # pipeline event queues that are processed afterwards.
+                    self.process_tenant_management_queue(tenant)
+
+                    if self._stopped:
+                        break
+
                    try:
                        with tenant_read_lock(
-                            self.zk_client, tenant.name, blocking=False
+                            self.zk_client, tenant_name, blocking=False
                        ):
+                            if (self.tenant_layout_state[tenant_name]
+                                    > self.local_layout_state[tenant_name]):
+                                self.log.debug(
+                                    "Local layout of tenant %s not up to date",
+                                    tenant.name)
+                                self.updateTenantLayout(tenant_name)
+                                # Get tenant again, as it might have been updated
+                                # by a tenant reconfig or layout change.
+                                tenant = self.abide.tenants[tenant_name]
+
                            if not self._stopped:
                                # This will forward trigger events to pipeline
                                # event queues that are processed below.
@@ -1397,6 +1490,11 @@ class Scheduler(threading.Thread):
                    except LockException:
                        self.log.debug("Skipping locked tenant %s",
                                       tenant.name)
+                        if (self.tenant_layout_state[tenant_name]
+                                > self.local_layout_state[tenant_name]):
+                            # Let's keep looping until we've updated to the
+                            # latest tenant layout.
+                            self.wake_event.set()
            except Exception:
                self.log.exception("Exception in run handler:")
                # There may still be more events to process
@@ -1404,6 +1502,28 @@ class Scheduler(threading.Thread):
            finally:
                self.run_handler_lock.release()

+    def primeSystemConfig(self):
+        with self.layout_lock:
+            loader = configloader.ConfigLoader(
+                self.connections, self, self.merger, self.keystore)
+            tenant_config, script = self._checkTenantSourceConf(self.config)
+            self.unparsed_abide = loader.readConfig(
+                tenant_config, from_script=script)
+            self.system_config_cache.set(self.unparsed_abide, self.globals)
+
+            loader.loadTPCs(self.abide, self.unparsed_abide)
+            loader.loadAdminRules(self.abide, self.unparsed_abide)
+
+    def updateSystemConfig(self):
+        with self.layout_lock:
+            self.unparsed_abide, self.globals = self.system_config_cache.get()
+            self.ansible_manager = AnsibleManager(
+                default_version=self.globals.default_ansible_version)
+
+            loader = configloader.ConfigLoader(
+                self.connections, self, self.merger, self.keystore)
+            loader.loadTPCs(self.abide, self.unparsed_abide)
+            loader.loadAdminRules(self.abide, self.unparsed_abide)
+
    def process_pipelines(self, tenant):
        for pipeline in tenant.layout.pipelines.values():
            if self._stopped:


@@ -183,10 +183,15 @@ class SystemConfigCache(ZooKeeperSimpleBase):

    SYSTEM_ROOT = "/zuul/system"
    log = logging.getLogger("zuul.zk.SystemConfigCache")

-    def __init__(self, client):
+    def __init__(self, client, callback):
        super().__init__(client)
        self.conf_path = f"{self.SYSTEM_ROOT}/conf"
        self.lock_path = f"{self.SYSTEM_ROOT}/conf-lock"
+        self._callback = callback
+        self.kazoo_client.DataWatch(self.conf_path, self._configChanged)
+
+    def _configChanged(self, data, stat, event):
+        self._callback()

    @property
    def ltime(self):


@@ -93,6 +93,22 @@ class LayoutStateStore(ZooKeeperBase, MutableMapping):

    layout_root = "/zuul/layout"

+    def __init__(self, client, callback):
+        super().__init__(client)
+        self._watched_tenants = set()
+        self._callback = callback
+        self.kazoo_client.ensure_path(self.layout_root)
+        self.kazoo_client.ChildrenWatch(self.layout_root, self._layoutCallback)
+
+    def _layoutCallback(self, tenant_list, event=None):
+        new_tenants = set(tenant_list) - self._watched_tenants
+        for tenant_name in new_tenants:
+            self.kazoo_client.DataWatch(f"{self.layout_root}/{tenant_name}",
+                                        self._callbackWrapper)
+
+    def _callbackWrapper(self, data, stat, event):
+        self._callback()
+
    def __getitem__(self, tenant_name):
        try:
            data, zstat = self.kazoo_client.get(