wip: Cache system config in Zookeeper
The Zuul system config consists of the unparsed abide config and the runtime related settings. TODO: * Remove temporary change to not perform a reconfiguration in case the system config in Zookeeper is valid Change-Id: I8aaf52cc66aa44a7eb166cc8e277d8499d7bb86a
This commit is contained in:
parent
a956b639e6
commit
81a27119d0
|
@ -4178,7 +4178,7 @@ class SchedulerTestApp:
|
|||
self.sched.reconfigure_event_queue,
|
||||
]
|
||||
|
||||
def start(self, validate_tenants: list):
|
||||
def start(self, validate_tenants=None):
|
||||
self.sched.start()
|
||||
if validate_tenants is None:
|
||||
self.sched.prime(self.config)
|
||||
|
|
|
@ -0,0 +1,60 @@
|
|||
# Copyright 2021 BMW Group
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from tests.base import iterate_timeout, ZuulTestCase
|
||||
|
||||
|
||||
class TestScaleOutScheduler(ZuulTestCase):
|
||||
tenant_config_file = "config/single-tenant/main.yaml"
|
||||
|
||||
def create_scheduler(self):
|
||||
return self.scheds.create(
|
||||
self.log,
|
||||
self.config,
|
||||
self.changes,
|
||||
self.additional_event_queues,
|
||||
self.upstream_root,
|
||||
self.rpcclient,
|
||||
self.poller_events,
|
||||
self.git_url_with_auth,
|
||||
self.source_only,
|
||||
self.fake_sql,
|
||||
self.addCleanup,
|
||||
self.validate_tenants)
|
||||
|
||||
def test_config_priming(self):
|
||||
for _ in iterate_timeout(10, "Wait until priming is complete"):
|
||||
layout_state = self.scheds.first.sched.tenant_layout_state.get(
|
||||
"tenant-one")
|
||||
if layout_state is not None:
|
||||
break
|
||||
|
||||
# Second scheduler instance
|
||||
app = self.create_scheduler()
|
||||
# Change a system attribute in order to check that the system config
|
||||
# from Zookeeper was used.
|
||||
app.sched.max_hold += 1234
|
||||
app.config.set("scheduler", "max_hold_expiration",
|
||||
str(app.sched.max_hold))
|
||||
app.start()
|
||||
|
||||
self.assertEqual(len(self.scheds), 2)
|
||||
for _ in iterate_timeout(
|
||||
10, "Wait for all schedulers to have the same layout state"):
|
||||
tenants = [s.sched.unparsed_abide.tenants
|
||||
for s in self.scheds.instances]
|
||||
if all(tenants):
|
||||
break
|
||||
|
||||
self.assertEqual(self.scheds.first.sched.max_hold, app.sched.max_hold)
|
|
@ -20,7 +20,7 @@ import testtools
|
|||
from zuul import model
|
||||
from zuul.model import BuildRequest, HoldRequest
|
||||
from zuul.zk import ZooKeeperClient
|
||||
from zuul.zk.config_cache import UnparsedConfigCache
|
||||
from zuul.zk.config_cache import SystemConfigCache, UnparsedConfigCache
|
||||
from zuul.zk.exceptions import LockException
|
||||
from zuul.zk.executor import ExecutorApi, BuildRequestEvent
|
||||
from zuul.zk.layout import LayoutStateStore, LayoutState
|
||||
|
@ -702,3 +702,47 @@ class TestLayoutStore(ZooKeeperBaseTestCase):
|
|||
self.assertEqual(state, store["tenant"])
|
||||
self.assertNotEqual(state.ltime, -1)
|
||||
self.assertNotEqual(store["tenant"].ltime, -1)
|
||||
|
||||
|
||||
class TestSystemConfigCache(ZooKeeperBaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.config_cache = SystemConfigCache(self.zk_client)
|
||||
|
||||
def test_set_get(self):
|
||||
uac = model.UnparsedAbideConfig()
|
||||
uac.tenants = {"foo": "bar"}
|
||||
uac.admin_rules = ["bar", "foo"]
|
||||
attrs = {"dead": "beef"}
|
||||
self.config_cache.set(uac, attrs)
|
||||
|
||||
uac_cached, cached_attrs = self.config_cache.get()
|
||||
self.assertEqual(uac.uuid, uac_cached.uuid)
|
||||
self.assertEqual(uac.tenants, uac_cached.tenants)
|
||||
self.assertEqual(uac.admin_rules, uac_cached.admin_rules)
|
||||
self.assertEqual(attrs, cached_attrs)
|
||||
|
||||
def test_cache_empty(self):
|
||||
with testtools.ExpectedException(RuntimeError):
|
||||
self.config_cache.get()
|
||||
|
||||
def test_ltime(self):
|
||||
uac = model.UnparsedAbideConfig()
|
||||
|
||||
self.assertEqual(self.config_cache.ltime, -1)
|
||||
|
||||
self.config_cache.set(uac, {})
|
||||
self.assertGreater(self.config_cache.ltime, -1)
|
||||
|
||||
old_ltime = self.config_cache.ltime
|
||||
self.config_cache.set(uac, {})
|
||||
self.assertGreater(self.config_cache.ltime, old_ltime)
|
||||
|
||||
def test_valid(self):
|
||||
uac = model.UnparsedAbideConfig()
|
||||
|
||||
self.assertFalse(self.config_cache.is_valid)
|
||||
|
||||
self.config_cache.set(uac, {})
|
||||
self.assertTrue(self.config_cache.is_valid)
|
||||
|
|
|
@ -4491,6 +4491,7 @@ class UnparsedAbideConfig(object):
|
|||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.uuid = uuid4().hex
|
||||
self.tenants = {}
|
||||
self.admin_rules = []
|
||||
|
||||
|
@ -4519,6 +4520,21 @@ class UnparsedAbideConfig(object):
|
|||
else:
|
||||
raise ConfigItemUnknownError(item)
|
||||
|
||||
def toDict(self):
|
||||
return {
|
||||
"uuid": self.uuid,
|
||||
"tenants": self.tenants,
|
||||
"admin_rules": self.admin_rules,
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def fromDict(cls, data):
|
||||
unparsed_abide = cls()
|
||||
unparsed_abide.uuid = data["uuid"]
|
||||
unparsed_abide.tenants = data["tenants"]
|
||||
unparsed_abide.admin_rules = data["admin_rules"]
|
||||
return unparsed_abide
|
||||
|
||||
|
||||
class UnparsedConfig(object):
|
||||
"""A collection of yaml lists that has not yet been parsed into objects."""
|
||||
|
|
|
@ -73,7 +73,7 @@ from zuul.zk.cleanup import SemaphoreCleanupLock, BuildRequestCleanupLock
|
|||
from zuul.zk.components import (
|
||||
BaseComponent, ComponentRegistry, SchedulerComponent
|
||||
)
|
||||
from zuul.zk.config_cache import UnparsedConfigCache
|
||||
from zuul.zk.config_cache import SystemConfigCache, UnparsedConfigCache
|
||||
from zuul.zk.event_queues import (
|
||||
EventWatcher,
|
||||
TenantManagementEventQueue,
|
||||
|
@ -167,6 +167,7 @@ class Scheduler(threading.Thread):
|
|||
self.component_info = SchedulerComponent(self.zk_client, self.hostname)
|
||||
self.component_info.register()
|
||||
self.component_registry = ComponentRegistry(self.zk_client)
|
||||
self.system_config_cache = SystemConfigCache(self.zk_client)
|
||||
self.unparsed_config_cache = UnparsedConfigCache(self.zk_client)
|
||||
|
||||
# TODO (swestphahl): Remove after we've refactored reconfigurations
|
||||
|
@ -261,6 +262,24 @@ class Scheduler(threading.Thread):
|
|||
self.websocket_url = get_default(
|
||||
self.config, 'web', 'websocket_url', None)
|
||||
|
||||
def _getSystemConfigAttributes(self):
|
||||
return {
|
||||
"use_relative_priority": self.use_relative_priority,
|
||||
"default_hold": self.default_hold,
|
||||
"max_hold": self.max_hold,
|
||||
"web_root": self.web_root,
|
||||
"web_status_url": self.web_status_url,
|
||||
"websocket_url": self.websocket_url,
|
||||
}
|
||||
|
||||
def _updateSystemConfigAttributes(self, data):
|
||||
self.use_relative_priority = data["use_relative_priority"]
|
||||
self.default_hold = data["default_hold"]
|
||||
self.max_hold = data["max_hold"]
|
||||
self.web_root = data["web_root"]
|
||||
self.web_status_url = data["web_status_url"]
|
||||
self.websocket_url = data["websocket_url"]
|
||||
|
||||
def start(self):
|
||||
super(Scheduler, self).start()
|
||||
self.keystore = ZooKeeperKeyStorage(
|
||||
|
@ -653,9 +672,17 @@ class Scheduler(threading.Thread):
|
|||
|
||||
def prime(self, config):
|
||||
self.log.debug("Priming scheduler config")
|
||||
event = ReconfigureEvent()
|
||||
event.zuul_event_ltime = self.zk_client.getCurrentLtime()
|
||||
self._doReconfigureEvent(event)
|
||||
if self.system_config_cache.is_valid:
|
||||
self.log.info("Using system config from Zookeeper")
|
||||
with self.layout_lock:
|
||||
self.unparsed_abide, system_attributes = (
|
||||
self.system_config_cache.get())
|
||||
self._updateSystemConfigAttributes(system_attributes)
|
||||
else:
|
||||
|
||||
event = ReconfigureEvent()
|
||||
event.zuul_event_ltime = self.zk_client.getCurrentLtime()
|
||||
self._doReconfigureEvent(event)
|
||||
self.log.debug("Config priming complete")
|
||||
self.last_reconfigured = int(time.time())
|
||||
|
||||
|
@ -906,6 +933,9 @@ class Scheduler(threading.Thread):
|
|||
old_unparsed_abide = self.unparsed_abide
|
||||
self.unparsed_abide = loader.readConfig(
|
||||
tenant_config, from_script=script)
|
||||
# Cache system config in Zookeeper
|
||||
self.system_config_cache.set(self.unparsed_abide,
|
||||
self._getSystemConfigAttributes())
|
||||
|
||||
# We need to handle new and deleted tenants, so we need to process
|
||||
# all tenants currently known and the new ones.
|
||||
|
|
|
@ -21,6 +21,7 @@ from urllib.parse import quote_plus, unquote_plus
|
|||
|
||||
from kazoo.exceptions import NoNodeError
|
||||
|
||||
from zuul import model
|
||||
from zuul.zk import sharding, ZooKeeperSimpleBase
|
||||
|
||||
|
||||
|
@ -168,3 +169,48 @@ class UnparsedConfigCache(ZooKeeperSimpleBase):
|
|||
path = _safe_path(self.cache_path, project_cname, branch_name)
|
||||
with contextlib.suppress(NoNodeError):
|
||||
self.kazoo_client.delete(path, recursive=True)
|
||||
|
||||
|
||||
class SystemConfigCache(ZooKeeperSimpleBase):
|
||||
|
||||
SYSTEM_ROOT = "/zuul/system"
|
||||
log = logging.getLogger("zuul.zk.SystemConfigCache")
|
||||
|
||||
def __init__(self, client):
|
||||
super().__init__(client)
|
||||
self.conf_path = f"{self.SYSTEM_ROOT}/conf"
|
||||
self.lock_path = f"{self.SYSTEM_ROOT}/conf-lock"
|
||||
|
||||
@property
|
||||
def ltime(self):
|
||||
with self.kazoo_client.ReadLock(self.lock_path):
|
||||
zstat = self.kazoo_client.exists(self.conf_path)
|
||||
return -1 if zstat is None else zstat.last_modified_transaction_id
|
||||
|
||||
@property
|
||||
def is_valid(self):
|
||||
return self.ltime > 0
|
||||
|
||||
def get(self):
|
||||
with self.kazoo_client.ReadLock(self.lock_path):
|
||||
try:
|
||||
with sharding.BufferedShardReader(
|
||||
self.kazoo_client, self.conf_path
|
||||
) as stream:
|
||||
data = json.loads(stream.read())
|
||||
except Exception:
|
||||
raise RuntimeError("No valid system config")
|
||||
return (model.UnparsedAbideConfig.fromDict(data["unparsed_abide"]),
|
||||
data["system_attributes"])
|
||||
|
||||
def set(self, unparsed_abide, system_attributes):
|
||||
with self.kazoo_client.WriteLock(self.lock_path):
|
||||
data = {
|
||||
"unparsed_abide": unparsed_abide.toDict(),
|
||||
"system_attributes": system_attributes,
|
||||
}
|
||||
with sharding.BufferedShardWriter(
|
||||
self.kazoo_client, self.conf_path
|
||||
) as stream:
|
||||
stream.truncate(0)
|
||||
stream.write(json.dumps(data).encode("utf8"))
|
||||
|
|
Loading…
Reference in New Issue