Cache system config in Zookeeper
The Zuul system config consists of the unparsed abide config and runtime related attributes from the Zuul config file. In order for this config to be consistent on all active schedulers, we also need to store it in Zookeeper. The system config will be updated during processing of a reconfiguration event. Other schedulers can detect the updated system config and perform a local update. Change-Id: I8aaf52cc66aa44a7eb166cc8e277d8499d7bb86a
This commit is contained in:
parent
8aa2ad2db0
commit
d5a2becd11
|
@ -20,7 +20,7 @@ import testtools
|
|||
from zuul import model
|
||||
from zuul.model import BuildRequest, HoldRequest
|
||||
from zuul.zk import ZooKeeperClient
|
||||
from zuul.zk.config_cache import UnparsedConfigCache
|
||||
from zuul.zk.config_cache import SystemConfigCache, UnparsedConfigCache
|
||||
from zuul.zk.exceptions import LockException
|
||||
from zuul.zk.executor import ExecutorApi, BuildRequestEvent
|
||||
from zuul.zk.layout import LayoutStateStore, LayoutState
|
||||
|
@ -725,3 +725,62 @@ class TestLayoutStore(ZooKeeperBaseTestCase):
|
|||
self.assertEqual(state, store["tenant"])
|
||||
self.assertNotEqual(state.ltime, -1)
|
||||
self.assertNotEqual(store["tenant"].ltime, -1)
|
||||
|
||||
|
||||
class TestSystemConfigCache(ZooKeeperBaseTestCase):
|
||||
|
||||
def setUp(self):
|
||||
super().setUp()
|
||||
self.config_cache = SystemConfigCache(self.zk_client)
|
||||
|
||||
def test_set_get(self):
|
||||
uac = model.UnparsedAbideConfig()
|
||||
uac.tenants = {"foo": "bar"}
|
||||
uac.admin_rules = ["bar", "foo"]
|
||||
attrs = model.SystemAttributes.fromDict({
|
||||
"use_relative_priority": True,
|
||||
"max_hold_expiration": 7200,
|
||||
"default_hold_expiration": 3600,
|
||||
"default_ansible_version": "2.9",
|
||||
"web_root": "/web/root",
|
||||
"web_status_url": "/web/status",
|
||||
"websocket_url": "/web/socket",
|
||||
})
|
||||
self.config_cache.set(uac, attrs)
|
||||
|
||||
uac_cached, cached_attrs = self.config_cache.get()
|
||||
self.assertEqual(uac.uuid, uac_cached.uuid)
|
||||
self.assertEqual(uac.tenants, uac_cached.tenants)
|
||||
self.assertEqual(uac.admin_rules, uac_cached.admin_rules)
|
||||
self.assertEqual(attrs, cached_attrs)
|
||||
|
||||
def test_cache_empty(self):
|
||||
with testtools.ExpectedException(RuntimeError):
|
||||
self.config_cache.get()
|
||||
|
||||
def test_ltime(self):
|
||||
uac = model.UnparsedAbideConfig()
|
||||
attrs = model.SystemAttributes()
|
||||
|
||||
self.assertEqual(self.config_cache.ltime, -1)
|
||||
|
||||
self.config_cache.set(uac, attrs)
|
||||
self.assertGreater(self.config_cache.ltime, -1)
|
||||
self.assertEqual(uac.ltime, self.config_cache.ltime)
|
||||
|
||||
old_ltime = self.config_cache.ltime
|
||||
self.config_cache.set(uac, attrs)
|
||||
self.assertGreater(self.config_cache.ltime, old_ltime)
|
||||
self.assertEqual(uac.ltime, self.config_cache.ltime)
|
||||
|
||||
cache_uac, _ = self.config_cache.get()
|
||||
self.assertEqual(uac.ltime, cache_uac.ltime)
|
||||
|
||||
def test_valid(self):
|
||||
uac = model.UnparsedAbideConfig()
|
||||
attrs = model.SystemAttributes()
|
||||
|
||||
self.assertFalse(self.config_cache.is_valid)
|
||||
|
||||
self.config_cache.set(uac, attrs)
|
||||
self.assertTrue(self.config_cache.is_valid)
|
||||
|
|
|
@ -4403,6 +4403,18 @@ class SystemAttributes:
|
|||
self.web_status_url = ""
|
||||
self.websocket_url = None
|
||||
|
||||
def __eq__(self, other):
|
||||
if not isinstance(other, self.__class__):
|
||||
return False
|
||||
return (
|
||||
self.use_relative_priority == other.use_relative_priority
|
||||
and self.max_hold_expiration == other.max_hold_expiration
|
||||
and self.default_hold_expiration == other.default_hold_expiration
|
||||
and self.default_ansible_version == other.default_ansible_version
|
||||
and self.web_root == other.web_root
|
||||
and self.web_status_url == other.web_status_url
|
||||
and self.websocket_url == other.websocket_url)
|
||||
|
||||
@classmethod
|
||||
def fromConfig(cls, config):
|
||||
sys_attrs = cls()
|
||||
|
@ -4572,6 +4584,8 @@ class UnparsedAbideConfig(object):
|
|||
"""
|
||||
|
||||
def __init__(self):
|
||||
self.uuid = uuid4().hex
|
||||
self.ltime = -1
|
||||
self.tenants = {}
|
||||
self.admin_rules = []
|
||||
|
||||
|
@ -4600,6 +4614,22 @@ class UnparsedAbideConfig(object):
|
|||
else:
|
||||
raise ConfigItemUnknownError(item)
|
||||
|
||||
def toDict(self):
|
||||
return {
|
||||
"uuid": self.uuid,
|
||||
"tenants": self.tenants,
|
||||
"admin_rules": self.admin_rules,
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def fromDict(cls, data, ltime):
|
||||
unparsed_abide = cls()
|
||||
unparsed_abide.uuid = data["uuid"]
|
||||
unparsed_abide.ltime = ltime
|
||||
unparsed_abide.tenants = data["tenants"]
|
||||
unparsed_abide.admin_rules = data["admin_rules"]
|
||||
return unparsed_abide
|
||||
|
||||
|
||||
class UnparsedConfig(object):
|
||||
"""A collection of yaml lists that has not yet been parsed into objects."""
|
||||
|
|
|
@ -78,7 +78,7 @@ from zuul.zk.cleanup import (
|
|||
from zuul.zk.components import (
|
||||
BaseComponent, ComponentRegistry, SchedulerComponent
|
||||
)
|
||||
from zuul.zk.config_cache import UnparsedConfigCache
|
||||
from zuul.zk.config_cache import SystemConfigCache, UnparsedConfigCache
|
||||
from zuul.zk.event_queues import (
|
||||
EventWatcher,
|
||||
TenantManagementEventQueue,
|
||||
|
@ -173,6 +173,7 @@ class Scheduler(threading.Thread):
|
|||
self.component_info = SchedulerComponent(self.zk_client, self.hostname)
|
||||
self.component_info.register()
|
||||
self.component_registry = ComponentRegistry(self.zk_client)
|
||||
self.system_config_cache = SystemConfigCache(self.zk_client)
|
||||
self.unparsed_config_cache = UnparsedConfigCache(self.zk_client)
|
||||
|
||||
# TODO (swestphahl): Remove after we've refactored reconfigurations
|
||||
|
@ -902,6 +903,8 @@ class Scheduler(threading.Thread):
|
|||
old_unparsed_abide = self.unparsed_abide
|
||||
self.unparsed_abide = loader.readConfig(
|
||||
tenant_config, from_script=script)
|
||||
# Cache system config in Zookeeper
|
||||
self.system_config_cache.set(self.unparsed_abide, self.globals)
|
||||
|
||||
# We need to handle new and deleted tenants, so we need to process
|
||||
# all tenants currently known and the new ones.
|
||||
|
|
|
@ -21,6 +21,7 @@ from urllib.parse import quote_plus, unquote_plus
|
|||
|
||||
from kazoo.exceptions import NoNodeError
|
||||
|
||||
from zuul import model
|
||||
from zuul.zk import sharding, ZooKeeperSimpleBase
|
||||
from zuul.zk.vendor import lock
|
||||
|
||||
|
@ -170,3 +171,62 @@ class UnparsedConfigCache(ZooKeeperSimpleBase):
|
|||
path = _safe_path(self.cache_path, project_cname, branch_name)
|
||||
with contextlib.suppress(NoNodeError):
|
||||
self.kazoo_client.delete(path, recursive=True)
|
||||
|
||||
|
||||
class SystemConfigCache(ZooKeeperSimpleBase):
|
||||
"""Zookeeper cache for Zuul system configuration.
|
||||
|
||||
The system configuration consists of the unparsed abide config and
|
||||
the runtime related settings from the Zuul config file.
|
||||
"""
|
||||
|
||||
SYSTEM_ROOT = "/zuul/system"
|
||||
log = logging.getLogger("zuul.zk.SystemConfigCache")
|
||||
|
||||
def __init__(self, client):
|
||||
super().__init__(client)
|
||||
self.conf_path = f"{self.SYSTEM_ROOT}/conf"
|
||||
self.lock_path = f"{self.SYSTEM_ROOT}/conf-lock"
|
||||
|
||||
@property
|
||||
def ltime(self):
|
||||
with lock.ReadLock(self.kazoo_client, self.lock_path):
|
||||
zstat = self.kazoo_client.exists(self.conf_path)
|
||||
return -1 if zstat is None else zstat.last_modified_transaction_id
|
||||
|
||||
@property
|
||||
def is_valid(self):
|
||||
return self.ltime > 0
|
||||
|
||||
def get(self):
|
||||
"""Get the system configuration from Zookeeper.
|
||||
|
||||
:returns: A tuple (unparsed abide config, system attributes)
|
||||
"""
|
||||
with lock.ReadLock(self.kazoo_client, self.lock_path):
|
||||
try:
|
||||
with sharding.BufferedShardReader(
|
||||
self.kazoo_client, self.conf_path
|
||||
) as stream:
|
||||
data = json.loads(stream.read())
|
||||
except Exception:
|
||||
raise RuntimeError("No valid system config")
|
||||
zstat = self.kazoo_client.exists(self.conf_path)
|
||||
return (model.UnparsedAbideConfig.fromDict(
|
||||
data["unparsed_abide"],
|
||||
ltime=zstat.last_modified_transaction_id
|
||||
), model.SystemAttributes.fromDict(data["system_attributes"]))
|
||||
|
||||
def set(self, unparsed_abide, system_attributes):
|
||||
with lock.WriteLock(self.kazoo_client, self.lock_path):
|
||||
data = {
|
||||
"unparsed_abide": unparsed_abide.toDict(),
|
||||
"system_attributes": system_attributes.toDict(),
|
||||
}
|
||||
with sharding.BufferedShardWriter(
|
||||
self.kazoo_client, self.conf_path
|
||||
) as stream:
|
||||
stream.truncate(0)
|
||||
stream.write(json.dumps(data).encode("utf8"))
|
||||
zstat = self.kazoo_client.exists(self.conf_path)
|
||||
unparsed_abide.ltime = zstat.last_modified_transaction_id
|
||||
|
|
Loading…
Reference in New Issue