Load system config and tenant layouts in zuul-web
This uses the configloader in zuul-web to load the system config and tenant layouts directly from ZooKeeper. Doing so will allow us to provide the necessary information for most API endpoints directly in zuul-web without the need to ask the scheduler via RPC for it. Change-Id: I4fe19c4e41f3357a07b2fda939c5ffb4e7055e37
This commit is contained in:
@@ -4047,12 +4047,8 @@ class ZuulWebFixture(fixtures.Fixture):
|
||||
self.connections = TestConnectionRegistry(
|
||||
changes, config, additional_event_queues, upstream_root, rpcclient,
|
||||
poller_events, git_url_with_auth, add_cleanup, fake_sql)
|
||||
self.connections.configure(
|
||||
config,
|
||||
include_drivers=[zuul.driver.sql.SQLDriver,
|
||||
GithubDriverMock,
|
||||
GitlabDriverMock,
|
||||
PagureDriverMock])
|
||||
self.connections.configure(config)
|
||||
|
||||
self.authenticators = zuul.lib.auth.AuthenticatorRegistry()
|
||||
self.authenticators.configure(config)
|
||||
if info is None:
|
||||
|
||||
@@ -162,11 +162,9 @@ class ZuulApp(object):
|
||||
logging_config.setDebug()
|
||||
logging_config.apply()
|
||||
|
||||
def configure_connections(self, source_only=False, include_drivers=None,
|
||||
require_sql=False):
|
||||
def configure_connections(self, source_only=False, require_sql=False):
|
||||
self.connections = zuul.lib.connections.ConnectionRegistry()
|
||||
self.connections.configure(self.config, source_only, include_drivers,
|
||||
require_sql)
|
||||
self.connections.configure(self.config, source_only, require_sql)
|
||||
|
||||
|
||||
class ZuulDaemonApp(ZuulApp, metaclass=abc.ABCMeta):
|
||||
|
||||
@@ -88,12 +88,7 @@ class WebServer(zuul.cmd.ZuulDaemonApp):
|
||||
self.log = logging.getLogger("zuul.WebServer")
|
||||
|
||||
try:
|
||||
self.configure_connections(
|
||||
include_drivers=[zuul.driver.sql.SQLDriver,
|
||||
zuul.driver.github.GithubDriver,
|
||||
zuul.driver.pagure.PagureDriver,
|
||||
zuul.driver.gitlab.GitlabDriver],
|
||||
require_sql=True)
|
||||
self.configure_connections(require_sql=True)
|
||||
self.configure_authenticators()
|
||||
self._run()
|
||||
except Exception:
|
||||
|
||||
@@ -90,8 +90,7 @@ class ConnectionRegistry(object):
|
||||
for driver in self.drivers.values():
|
||||
driver.stop()
|
||||
|
||||
def configure(self, config, source_only=False, include_drivers=None,
|
||||
require_sql=False):
|
||||
def configure(self, config, source_only=False, require_sql=False):
|
||||
# Register connections from the config
|
||||
connections = OrderedDict()
|
||||
|
||||
@@ -127,16 +126,6 @@ class ConnectionRegistry(object):
|
||||
if source_only and not isinstance(driver, SourceInterface):
|
||||
continue
|
||||
|
||||
# Zuul web needs only the SQL driver, accomodate that here:
|
||||
if include_drivers is not None:
|
||||
found = False
|
||||
for driver_class in include_drivers:
|
||||
if isinstance(driver, driver_class):
|
||||
found = True
|
||||
break
|
||||
if not found:
|
||||
continue
|
||||
|
||||
connection = driver.getConnection(con_name, con_config)
|
||||
connections[con_name] = connection
|
||||
if con_driver == 'sql' and 'database' not in connections:
|
||||
|
||||
@@ -14,6 +14,8 @@
|
||||
# limitations under the License.
|
||||
import cherrypy
|
||||
import socket
|
||||
from collections import defaultdict
|
||||
from contextlib import suppress
|
||||
|
||||
from cachetools.func import ttl_cache
|
||||
from ws4py.server.cherrypyserver import WebSocketPlugin, WebSocketTool
|
||||
@@ -30,19 +32,24 @@ import ssl
|
||||
import threading
|
||||
|
||||
from zuul import exceptions
|
||||
from zuul.configloader import ConfigLoader
|
||||
import zuul.lib.repl
|
||||
from zuul.lib import commandsocket
|
||||
from zuul.lib import streamer_utils
|
||||
from zuul.lib.ansible import AnsibleManager
|
||||
from zuul.lib.keystorage import KeyStorage
|
||||
from zuul.lib.re2util import filter_allowed_disallowed
|
||||
import zuul.model
|
||||
from zuul.model import Abide, SystemAttributes, UnparsedAbideConfig, WebInfo
|
||||
import zuul.rpcclient
|
||||
from zuul.version import get_version_string
|
||||
from zuul.zk import ZooKeeperClient
|
||||
from zuul.zk.components import ComponentRegistry, WebComponent
|
||||
from zuul.zk.config_cache import SystemConfigCache
|
||||
from zuul.zk.executor import ExecutorApi
|
||||
from zuul.zk.layout import LayoutStateStore
|
||||
from zuul.zk.locks import tenant_read_lock
|
||||
from zuul.zk.nodepool import ZooKeeperNodepool
|
||||
from zuul.zk.system import ZuulSystem
|
||||
from zuul.zk.config_cache import SystemConfigCache
|
||||
from zuul.lib.auth import AuthenticatorRegistry
|
||||
from zuul.lib.config import get_default
|
||||
|
||||
@@ -1303,7 +1310,7 @@ class ZuulWeb(object):
|
||||
config,
|
||||
connections,
|
||||
authenticators: AuthenticatorRegistry,
|
||||
info: zuul.model.WebInfo = None):
|
||||
info: WebInfo = None):
|
||||
self.start_time = time.time()
|
||||
self.config = config
|
||||
self.listen_address = get_default(self.config,
|
||||
@@ -1345,9 +1352,17 @@ class ZuulWeb(object):
|
||||
self.system_config_cache = SystemConfigCache(
|
||||
self.zk_client,
|
||||
self.system_config_cache_wake_event.set)
|
||||
# Fetch an initial value so we we have something to serve
|
||||
# requests before the initial callback fires.
|
||||
self.unparsed_abide, _ = self.system_config_cache.get()
|
||||
|
||||
self.keystore = KeyStorage(
|
||||
self.zk_client, password=self._get_key_store_password())
|
||||
self.globals = SystemAttributes.fromConfig(self.config)
|
||||
self.ansible_manager = AnsibleManager(
|
||||
default_version=self.globals.default_ansible_version)
|
||||
self.abide = Abide()
|
||||
self.unparsed_abide = UnparsedAbideConfig()
|
||||
self.tenant_layout_state = LayoutStateStore(
|
||||
self.zk_client, self.system_config_cache_wake_event.set)
|
||||
self.local_layout_state = {}
|
||||
|
||||
self.connections = connections
|
||||
self.authenticators = authenticators
|
||||
@@ -1502,18 +1517,26 @@ class ZuulWeb(object):
|
||||
def port(self):
|
||||
return cherrypy.server.bound_addr[1]
|
||||
|
||||
def updateSystemConfigCache(self):
|
||||
while self._system_config_running:
|
||||
try:
|
||||
self.system_config_cache_wake_event.wait()
|
||||
if not self._system_config_running:
|
||||
return
|
||||
self.unparsed_abide, _ = self.system_config_cache.get()
|
||||
except Exception:
|
||||
self.log.exception("Exception while processing command")
|
||||
|
||||
def start(self):
|
||||
self.log.debug("ZuulWeb starting")
|
||||
|
||||
# Wait for system config and layouts to be loaded
|
||||
while not self.system_config_cache.is_valid:
|
||||
self.system_config_cache_wake_event.wait()
|
||||
|
||||
# Initialize the system config
|
||||
self.updateSystemConfig()
|
||||
|
||||
# Wait until all layouts/tenants are loaded
|
||||
while True:
|
||||
self.system_config_cache_wake_event.clear()
|
||||
self.updateLayout()
|
||||
if (set(self.unparsed_abide.tenants.keys())
|
||||
!= set(self.abide.tenants.keys())):
|
||||
self.system_config_cache_wake_event.wait()
|
||||
else:
|
||||
break
|
||||
|
||||
self.stream_manager.start()
|
||||
self.wsplugin = WebSocketPlugin(cherrypy.engine)
|
||||
self.wsplugin.subscribe()
|
||||
@@ -1529,7 +1552,7 @@ class ZuulWeb(object):
|
||||
self.component_info.state = self.component_info.RUNNING
|
||||
|
||||
self.system_config_thread = threading.Thread(
|
||||
target=self.updateSystemConfigCache,
|
||||
target=self.updateConfig,
|
||||
name='system_config')
|
||||
self._system_config_running = True
|
||||
self.system_config_thread.daemon = True
|
||||
@@ -1578,3 +1601,62 @@ class ZuulWeb(object):
|
||||
return
|
||||
self.repl.stop()
|
||||
self.repl = None
|
||||
|
||||
def _get_key_store_password(self):
|
||||
try:
|
||||
return self.config["keystore"]["password"]
|
||||
except KeyError:
|
||||
raise RuntimeError("No key store password configured!")
|
||||
|
||||
def updateConfig(self):
|
||||
while self._system_config_running:
|
||||
try:
|
||||
self.system_config_cache_wake_event.wait()
|
||||
self.system_config_cache_wake_event.clear()
|
||||
if not self._system_config_running:
|
||||
return
|
||||
self.updateSystemConfig()
|
||||
self.updateLayout()
|
||||
except Exception:
|
||||
self.log.exception("Exception while updating system config")
|
||||
|
||||
def updateSystemConfig(self):
|
||||
self.log.debug("Updating system config")
|
||||
self.unparsed_abide, self.globals = self.system_config_cache.get()
|
||||
self.ansible_manager = AnsibleManager(
|
||||
default_version=self.globals.default_ansible_version)
|
||||
|
||||
loader = ConfigLoader(
|
||||
self.connections, self.zk_client, self.globals,
|
||||
keystorage=self.keystore)
|
||||
|
||||
loader.loadTPCs(self.abide, self.unparsed_abide)
|
||||
loader.loadAdminRules(self.abide, self.unparsed_abide)
|
||||
|
||||
def updateLayout(self):
|
||||
self.log.debug("Updating layout state")
|
||||
loader = ConfigLoader(
|
||||
self.connections, self.zk_client, self.globals,
|
||||
keystorage=self.keystore)
|
||||
|
||||
min_ltimes = defaultdict(lambda: defaultdict(lambda: -1))
|
||||
for tenant_name in self.unparsed_abide.tenants:
|
||||
# Reload the tenant if the layout changed.
|
||||
if (self.local_layout_state.get(tenant_name)
|
||||
== self.tenant_layout_state.get(tenant_name)):
|
||||
continue
|
||||
with tenant_read_lock(self.zk_client, tenant_name):
|
||||
layout_state = self.tenant_layout_state.get(tenant_name)
|
||||
layout_uuid = layout_state and layout_state.uuid
|
||||
# The tenant will be stored in self.abide.tenants after
|
||||
# it was loaded.
|
||||
tenant = loader.loadTenant(
|
||||
self.abide, tenant_name, self.ansible_manager,
|
||||
self.unparsed_abide, min_ltimes=min_ltimes,
|
||||
layout_uuid=layout_uuid)
|
||||
if tenant is not None:
|
||||
self.local_layout_state[tenant_name] = layout_state
|
||||
else:
|
||||
with suppress(KeyError):
|
||||
del self.local_layout_state[tenant_name]
|
||||
self.log.debug("Done updating layout state")
|
||||
|
||||
Reference in New Issue
Block a user