Files
zuul/tests/unit/test_component_registry.py
James E. Blair 317e42a2e5 Use a TreeCache for job request queues
To reduce ZK traffic and contention in sites with many executors
and jobs, we move the job request queue to a TreeCache.  This is
a structure we have been using successfully for some time in
nodepool, and we added it to Zuul for the nodepool-in-zuul work.

This uses a single watch for all items in the job request queue.
It will keep all the requests in the local cache of the queue
up-to-date.  We still do not cache the parameters and other data;
we only load those when needed.

A later change will take advantage of the fact that we can detect
whether a request is locked or not without locking it ourselves
to further reduce traffic and contention.  This change attempts
to minimize changes other than what is necessary for the tree
cache.

Change-Id: Ie3d62dadd9990c847df8302d4da801a990394d2c
2025-02-04 08:53:52 -08:00

149 lines
4.9 KiB
Python

# Copyright 2021 BMW Group
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import configparser
from zuul.lib.fingergw import FingerGateway
from zuul.zk.components import BaseComponent, ComponentRegistry
from tests.base import (
iterate_timeout,
okay_tracebacks,
ZuulTestCase,
ZuulWebFixture,
)
class TestComponentRegistry(ZuulTestCase):
    """Check what a ComponentRegistry reports for running components.

    Each test inspects a registry built from ``self.zk_client`` and
    polls it until the expected entry appears, changes or goes away.
    """

    tenant_config_file = 'config/single-tenant/main.yaml'

    def setUp(self):
        """Run the base setup, then create the registry under test."""
        super().setUp()
        self.component_registry = ComponentRegistry(self.zk_client)

    def assertComponentAttr(self, component_name, attr_name,
                            attr_value, timeout=25):
        """Poll until a component attribute has the expected value.

        Only the first entry returned by
        ``component_registry.all(component_name)`` is examined.

        :param component_name: name passed to the registry lookup.
        :param attr_name: attribute to read from the first entry.
        :param attr_value: value the attribute must compare equal to.
        :param timeout: forwarded to ``iterate_timeout`` (presumably
            seconds, after which it is expected to raise -- confirm
            against tests.base).
        """
        description = (
            f"{component_name} in cache has {attr_name} set to {attr_value}"
        )
        for _ in iterate_timeout(timeout, description):
            found = list(self.component_registry.all(component_name))
            # An empty result means nothing is registered yet; keep polling.
            if found and getattr(found[0], attr_name) == attr_value:
                return

    def assertComponentState(self, component_name, state, timeout=25):
        """Poll until the component's ``state`` attribute equals *state*."""
        return self.assertComponentAttr(component_name, "state", state,
                                        timeout)

    def assertComponentStopped(self, component_name, timeout=25):
        """Poll until the registry lists no entry for *component_name*."""
        description = f"{component_name} in cache is stopped"
        for _ in iterate_timeout(timeout, description):
            remaining = list(self.component_registry.all(component_name))
            if not remaining:
                return

    def test_scheduler_component(self):
        # The scheduler is expected to be registered without any extra
        # setup in this test.
        self.assertComponentState("scheduler", BaseComponent.RUNNING)

    @okay_tracebacks('_start',
                     '_playbackWorker')
    def test_executor_component(self):
        executor = self.executor_server

        # Pause / unpause must be visible through the state attribute.
        self.assertComponentState("executor", BaseComponent.RUNNING)
        executor.pause()
        self.assertComponentState("executor", BaseComponent.PAUSED)
        executor.unpause()
        self.assertComponentState("executor", BaseComponent.RUNNING)

        # Unregistering / registering work toggles accepting_work.
        executor.unregister_work()
        self.assertComponentAttr("executor", "accepting_work", False)
        executor.register_work()
        self.assertComponentAttr("executor", "accepting_work", True)

        # Stopping the client can produce tracebacks in the logs when
        # the tree cache attempts to restart.
        executor.zk_client.client.stop()
        self.assertComponentStopped("executor")

        # Once the client is started again the executor entry should
        # come back.
        executor.zk_client.client.start()
        self.assertComponentAttr("executor", "accepting_work", True)

    def test_merger_component(self):
        self._startMerger()
        self.assertComponentState("merger", BaseComponent.RUNNING)

        merger = self.merge_server
        merger.pause()
        self.assertComponentState("merger", BaseComponent.PAUSED)
        merger.unpause()
        self.assertComponentState("merger", BaseComponent.RUNNING)

        merger.stop()
        merger.join()
        # Clear the reference so the test doesn't try to stop the
        # merger a second time.
        self.merge_server = None

        try:
            self.assertComponentStopped("merger")
        except Exception:
            # Log everything the registry still lists to help debug the
            # failure, then let the original exception propagate.
            for kind, components in self.component_registry.all():
                self.log.error("Component %s has %s online", kind, components)
            raise

    def test_fingergw_component(self):
        # Layer fingergw-specific settings on top of the test config.
        fingergw_settings = {
            'fingergw': {
                'listen_address': '::',
                'port': '0',
                'hostname': 'janine',
            }
        }
        config = configparser.ConfigParser()
        config.read_dict(self.config)
        config.read_dict(fingergw_settings)

        gateway = FingerGateway(config, command_socket=None, pid_file=None)
        gateway.start()
        try:
            self.assertComponentState("fingergw", BaseComponent.RUNNING)
            self.assertComponentAttr("fingergw", "hostname", "janine")
        finally:
            # Always stop the gateway, even when an assertion fails.
            gateway.stop()

        self.assertComponentStopped("fingergw")

    def test_web_component(self):
        # NOTE(review): the fixture presumably starts a zuul-web
        # instance -- confirm against tests.base.ZuulWebFixture.
        web_fixture = ZuulWebFixture(
            self.config,
            self.test_config,
            self.additional_event_queues,
            self.upstream_root,
            self.poller_events,
            self.git_url_with_auth,
            self.addCleanup,
            self.test_root,
        )
        self.useFixture(web_fixture)

        self.assertComponentState("web", BaseComponent.RUNNING)

    def test_launcher_component(self):
        # The launcher is expected to be registered without any extra
        # setup in this test.
        self.assertComponentState("launcher", BaseComponent.RUNNING)