Improve component registry

This improves the usage of the component registry in various ways:

1. It adds a tree cache to the registry. The cache is eventually
   consistent, which should be sufficient for most use cases, such as
   calculating stats in the scheduler or getting a list of components
   without having to ask ZooKeeper for the list of components every
   time.

2. Components can now be used as classes rather than dictionaries, which
   makes using and updating them much easier and nicer.

3. Components can be used without a registry. This makes registering
   components easier and you only need to instantiate a registry when
   you need the registry itself (e.g. in the scheduler).

With that change the registry itself is not used anywhere in the
production code because it's not required at this point. I will add this
in the next commit.

Change-Id: Ia8efba26114119eecffb9a89264083e4b8a80de0
This commit is contained in:
Felix Edel 2021-04-15 16:26:13 +02:00 committed by James E. Blair
parent 641874967e
commit 040f403e7f
7 changed files with 281 additions and 221 deletions

View File

@ -14,9 +14,9 @@
from zuul.lib.fingergw import FingerGateway
from zuul.zk import ZooKeeperClient
from zuul.zk.components import ZooKeeperComponentRegistry
from zuul.zk.components import BaseComponent, ComponentRegistry
from tests.base import ZuulTestCase, ZuulWebFixture
from tests.base import iterate_timeout, ZuulTestCase, ZuulWebFixture
class TestComponentRegistry(ZuulTestCase):
@ -33,62 +33,54 @@ class TestComponentRegistry(ZuulTestCase):
)
self.addCleanup(self.zk_client.disconnect)
self.zk_client.connect()
self.component_registry = ZooKeeperComponentRegistry(self.zk_client)
self.component_registry = ComponentRegistry(self.zk_client)
def assertComponentState(self, component_name, state, timeout=5):
    """Poll the registry cache until a component reaches ``state``.

    Only the first cached component of the given kind is inspected. The
    check is retried through ``iterate_timeout`` instead of being asserted
    once, so the cache gets time to catch up.

    :param component_name: Component kind to look up (e.g. "executor")
    :param state: Expected value of the component's ``state`` attribute
    :param timeout: Passed to ``iterate_timeout`` (presumably seconds;
        confirm against tests.base)
    """
    description = f"{component_name} in cache is in state {state}"
    for _ in iterate_timeout(timeout, description):
        cached = list(self.component_registry.all(component_name))
        if cached and cached[0].state == state:
            return
def assertComponentStopped(self, component_name, timeout=5):
    """Poll the registry cache until no component of a kind is left.

    A component counts as stopped once the cache no longer lists any
    component of the given kind.

    :param component_name: Component kind to look up (e.g. "merger")
    :param timeout: Passed to ``iterate_timeout`` (presumably seconds;
        confirm against tests.base)
    """
    description = f"{component_name} in cache is stopped"
    for _ in iterate_timeout(timeout, description):
        remaining = list(self.component_registry.all(component_name))
        if not remaining:
            return
def test_scheduler_component(self):
component_states = self.component_registry.all("schedulers")
self.assertEqual(len(component_states), 1)
component = component_states[0]
self.assertEqual(component.get("state"), component.RUNNING)
self.assertComponentState("scheduler", BaseComponent.RUNNING)
def test_executor_component(self):
component_states = self.component_registry.all("executors")
self.assertEqual(len(component_states), 1)
component = component_states[0]
self.assertEqual(component.get("state"), component.RUNNING)
self.assertComponentState("executor", BaseComponent.RUNNING)
self.executor_server.pause()
component = self.component_registry.all("executors")[0]
self.assertEqual(component.get("state"), component.PAUSED)
self.assertComponentState("executor", BaseComponent.PAUSED)
self.executor_server.unpause()
component = self.component_registry.all("executors")[0]
self.assertEqual(component.get("state"), component.RUNNING)
self.assertComponentState("executor", BaseComponent.RUNNING)
def test_merger_component(self):
component_states = self.component_registry.all("mergers")
self.assertEqual(len(component_states), 0)
self._startMerger()
component_states = self.component_registry.all("mergers")
self.assertEqual(len(component_states), 1)
component = component_states[0]
self.assertEqual(component.get("state"), component.RUNNING)
self.assertComponentState("merger", BaseComponent.RUNNING)
self.merge_server.pause()
component = self.component_registry.all("mergers")[0]
self.assertEqual(component.get("state"), component.PAUSED)
self.assertComponentState("merger", BaseComponent.PAUSED)
self.merge_server.unpause()
component = self.component_registry.all("mergers")[0]
self.assertEqual(component.get("state"), component.RUNNING)
self.assertComponentState("merger", BaseComponent.RUNNING)
self.merge_server.stop()
self.merge_server.join()
# Set the merger to None so the test doesn't try to stop it again
self.merge_server = None
component_states = self.component_registry.all("mergers")
self.assertEqual(len(component_states), 0)
self.assertComponentStopped("merger")
def test_fingergw_component(self):
component_states = self.component_registry.all("finger-gateways")
self.assertEqual(len(component_states), 0)
gateway = FingerGateway(
self.config,
("127.0.0.1", self.gearman_server.port, None, None, None),
@ -100,22 +92,14 @@ class TestComponentRegistry(ZuulTestCase):
gateway.start()
try:
component_states = self.component_registry.all("finger-gateways")
self.assertEqual(len(component_states), 1)
component = component_states[0]
self.assertEqual(component.get("state"), component.RUNNING)
self.assertComponentState("fingergw", BaseComponent.RUNNING)
finally:
gateway.stop()
gateway.wait()
component_states = self.component_registry.all("finger-gateways")
self.assertEqual(len(component_states), 0)
self.assertComponentStopped("fingergw")
def test_web_component(self):
component_states = self.component_registry.all("finger-gateways")
self.assertEqual(len(component_states), 0)
self.useFixture(
ZuulWebFixture(
self.changes, self.config, self.additional_event_queues,
@ -124,8 +108,4 @@ class TestComponentRegistry(ZuulTestCase):
)
)
component_states = self.component_registry.all("webs")
self.assertEqual(len(component_states), 1)
component = component_states[0]
self.assertEqual(component.get("state"), component.RUNNING)
self.assertComponentState("web", BaseComponent.RUNNING)

View File

@ -63,6 +63,7 @@ from zuul.model import (
)
import zuul.model
from zuul.zk.event_queues import PipelineResultEventQueue
from zuul.zk.components import ExecutorComponent
BUFFER_LINES_FOR_SYNTAX = 200
COMMANDS = ['stop', 'pause', 'unpause', 'graceful', 'verbose',
@ -2693,9 +2694,8 @@ class ExecutorServer(BaseMergeServer):
# perhaps hostname+pid.
self.hostname = get_default(self.config, 'executor', 'hostname',
socket.getfqdn())
self.zk_component = self.zk_component_registry.register(
'executors', self.hostname
)
self.component_info = ExecutorComponent(self.zk_client, self.hostname)
self.component_info.register()
self.log_streaming_port = log_streaming_port
self.governor_lock = threading.Lock()
self.run_lock = threading.Lock()
@ -2920,7 +2920,7 @@ class ExecutorServer(BaseMergeServer):
self.governor_thread.daemon = True
self.governor_thread.start()
self.disk_accountant.start()
self.zk_component.set('state', self.zk_component.RUNNING)
self.component_info.state = self.component_info.RUNNING
def register_work(self):
if self._running:
@ -2939,7 +2939,7 @@ class ExecutorServer(BaseMergeServer):
def stop(self):
self.log.debug("Stopping")
self.zk_component.set('state', self.zk_component.STOPPED)
self.component_info.state = self.component_info.STOPPED
# Use the BaseMergeServer's stop method to disconnect from ZooKeeper.
super().stop()
self.connections.stop()
@ -3007,14 +3007,14 @@ class ExecutorServer(BaseMergeServer):
def pause(self):
self.log.debug('Pausing')
self.zk_component.set('state', self.zk_component.PAUSED)
self.component_info.state = self.component_info.PAUSED
self.pause_sensor.pause = True
if self.process_merge_jobs:
super().pause()
def unpause(self):
self.log.debug('Resuming')
self.zk_component.set('state', self.zk_component.RUNNING)
self.component_info.state = self.component_info.RUNNING
self.pause_sensor.pause = False
if self.process_merge_jobs:
super().unpause()

View File

@ -23,7 +23,7 @@ import zuul.rpcclient
from zuul.lib import streamer_utils
from zuul.lib.commandsocket import CommandSocket
from zuul.zk import ZooKeeperClient
from zuul.zk.components import ZooKeeperComponentRegistry
from zuul.zk.components import FingerGatewayComponent
COMMANDS = ['stop']
@ -149,9 +149,10 @@ class FingerGateway(object):
self.zk_client = ZooKeeperClient.fromConfig(config)
self.zk_client.connect()
self.hostname = socket.getfqdn()
self.zk_component = ZooKeeperComponentRegistry(
self.zk_client
).register('finger-gateways', self.hostname)
self.component_info = FingerGatewayComponent(
self.zk_client, self.hostname
)
self.component_info.register()
def _runCommand(self):
while self.command_running:
@ -202,11 +203,11 @@ class FingerGateway(object):
self.server_thread = threading.Thread(target=self._run)
self.server_thread.daemon = True
self.server_thread.start()
self.zk_component.set('state', self.zk_component.RUNNING)
self.component_info.state = self.component_info.RUNNING
self.log.info("Finger gateway is started")
def stop(self):
self.zk_component.set('state', self.zk_component.STOPPED)
self.component_info.state = self.component_info.STOPPED
if self.server:
try:
self.server.shutdown()

View File

@ -27,7 +27,7 @@ from zuul.lib.config import get_default
from zuul.lib.gearworker import ZuulGearWorker
from zuul.merger import merger
from zuul.merger.merger import nullcontext
from zuul.zk.components import ZooKeeperComponentRegistry
from zuul.zk.components import MergerComponent
COMMANDS = ['stop', 'pause', 'unpause']
@ -78,7 +78,6 @@ class BaseMergeServer(metaclass=ABCMeta):
self.zk_client = ZooKeeperClient.fromConfig(self.config)
self.zk_client.connect()
self.zk_component_registry = ZooKeeperComponentRegistry(self.zk_client)
# This merger and its git repos are used to maintain
# up-to-date copies of all the repos that are used by jobs, as
@ -264,9 +263,8 @@ class MergeServer(BaseMergeServer):
):
super().__init__(config, 'merger', connections)
self.hostname = socket.getfqdn()
self.zk_component = self.zk_component_registry.register(
'mergers', self.hostname
)
self.component_info = MergerComponent(self.zk_client, self.hostname)
self.component_info.register()
self.command_map = dict(
stop=self.stop,
@ -289,11 +287,11 @@ class MergeServer(BaseMergeServer):
target=self.runCommand, name='command')
self.command_thread.daemon = True
self.command_thread.start()
self.zk_component.set('state', self.zk_component.RUNNING)
self.component_info.state = self.component_info.RUNNING
def stop(self):
self.log.debug("Stopping")
self.zk_component.set('state', self.zk_component.STOPPED)
self.component_info.state = self.component_info.STOPPED
super().stop()
self._command_running = False
self.command_socket.stop()
@ -304,13 +302,13 @@ class MergeServer(BaseMergeServer):
def pause(self):
self.log.debug('Pausing')
self.zk_component.set('state', self.zk_component.PAUSED)
self.component_info.state = self.component_info.PAUSED
super().pause()
def unpause(self):
self.log.debug('Resuming')
super().unpause()
self.zk_component.set('state', self.zk_component.RUNNING)
self.component_info.state = self.component_info.RUNNING
def runCommand(self):
while self._command_running:

View File

@ -68,7 +68,7 @@ from zuul.model import (
UnparsedAbideConfig,
)
from zuul.zk import ZooKeeperClient
from zuul.zk.components import ZooKeeperComponentRegistry
from zuul.zk.components import SchedulerComponent
from zuul.zk.event_queues import (
GlobalEventWatcher,
GlobalManagementEventQueue,
@ -153,11 +153,8 @@ class Scheduler(threading.Thread):
self.zk_client = ZooKeeperClient.fromConfig(self.config)
self.zk_client.connect()
self.zk_nodepool = ZooKeeperNodepool(self.zk_client)
self.zk_component = (
ZooKeeperComponentRegistry(self.zk_client).register(
"schedulers", self.hostname
)
)
self.component_info = SchedulerComponent(self.zk_client, self.hostname)
self.component_info.register()
self.result_event_queue = NamedQueue("ResultEventQueue")
self.global_watcher = GlobalEventWatcher(
@ -241,11 +238,11 @@ class Scheduler(threading.Thread):
self.rpc_slow.start()
self.stats_thread.start()
self.cleanup_thread.start()
self.zk_component.set('state', self.zk_component.RUNNING)
self.component_info.state = self.component_info.RUNNING
def stop(self):
self._stopped = True
self.zk_component.set('state', self.zk_component.STOPPED)
self.component_info.state = self.component_info.STOPPED
self.stats_stop.set()
self.cleanup_stop.set()
self.stopConnections()

View File

@ -35,7 +35,7 @@ from zuul.lib.re2util import filter_allowed_disallowed
import zuul.model
import zuul.rpcclient
from zuul.zk import ZooKeeperClient
from zuul.zk.components import ZooKeeperComponentRegistry
from zuul.zk.components import WebComponent
from zuul.zk.nodepool import ZooKeeperNodepool
from zuul.lib.auth import AuthenticatorRegistry
from zuul.lib.config import get_default
@ -1266,11 +1266,8 @@ class ZuulWeb(object):
client_id='Zuul Web Server')
self.zk_client = ZooKeeperClient.fromConfig(self.config)
self.zk_client.connect()
self.zk_component = (
ZooKeeperComponentRegistry(self.zk_client).register(
'webs', self.hostname
)
)
self.component_info = WebComponent(self.zk_client, self.hostname)
self.component_info.register()
self.connections = connections
self.authenticators = authenticators
@ -1427,11 +1424,11 @@ class ZuulWeb(object):
name='command')
self.command_thread.daemon = True
self.command_thread.start()
self.zk_component.set('state', self.zk_component.RUNNING)
self.component_info.state = self.component_info.RUNNING
def stop(self):
self.log.debug("ZuulWeb stopping")
self.zk_component.set('state', self.zk_component.STOPPED)
self.component_info.state = self.component_info.STOPPED
self.rpc.shutdown()
cherrypy.engine.exit()
# Not strictly necessary, but without this, if the server is

View File

@ -13,173 +13,260 @@
# under the License.
import json
import logging
import threading
from typing import Any, Dict, List, Optional
from collections import defaultdict
from kazoo.client import KazooClient
from kazoo.exceptions import NoNodeError
from kazoo.recipe.cache import TreeCache, TreeEvent
from zuul.zk import NoClientException, ZooKeeperSimpleBase, ZooKeeperClient
from zuul.zk import ZooKeeperBase, ZooKeeperSimpleBase
class ZooKeeperComponentReadOnly(object):
"""
Read-only component object.
"""
STOPPED = 'stopped'
RUNNING = 'running'
PAUSED = 'paused'
def __init__(self, client: ZooKeeperClient, content: Dict[str, Any]):
self._client = client
self._content = content
@property
def kazoo_client(self) -> KazooClient:
if not self._client.client:
raise NoClientException()
return self._client.client
def get(self, key: str, default: Optional[Any] = None) -> Any:
"""
Gets an attribute of the component.
:param key: Attribute key
:param default: Default value (default: None)
:return: Value of the attribute
"""
return self._content.get(key, default)
COMPONENTS_ROOT = "/zuul/components"
class ZooKeeperComponent(ZooKeeperComponentReadOnly):
class BaseComponent(ZooKeeperSimpleBase):
"""
Read/write component object.
This object holds an offline cache of all the component's attributes.
In case of an failed update to zookeeper the local cache will still
hold the fresh values. Updating any attribute uploads all attributes to
zookeeper.
This object holds an offline cache of all the component's attributes. In
case of a failed update to ZooKeeper the local cache will still hold the
fresh values. Updating any attribute uploads all attributes to ZooKeeper.
This enables this object to be used as a local key/value store even if
zookeeper connection got lost. One must still catch exceptions related
to zookeeper connection loss.
This enables this object to be used as local key/value store even if the
ZooKeeper connection got lost.
"""
log = logging.getLogger("zuul.zk.components.ZooKeeperComponent")
# Component states
RUNNING = "running"
PAUSED = "paused"
STOPPED = "stopped"
def __init__(
self,
client: ZooKeeperClient,
kind: str,
hostname: str,
content: Dict[str, Any],
):
super().__init__(client, content)
self._kind = kind
self._persisted = False
self._hostname = hostname
self._path = self._register(content)
self._set_lock = threading.Lock()
log = logging.getLogger("zuul.zk.components.BaseComponent")
kind = "base"
def set(self, key: str, value: int) -> None:
"""
Sets an attribute of a component and uploads the whole component state
to zookeeper.
def __init__(self, client, hostname):
# Ensure that the content is available before setting any other
# attribute, because our __setattr__ implementation is relying on it.
self.__dict__["content"] = {
"hostname": hostname,
"state": self.STOPPED,
"kind": self.kind,
}
# NOTE (felix): If we want to have a "read-only" component, we could
# provide client=None to the constructor.
super().__init__(client)
Upload only happens if the new value changed or if an previous upload
failed.
self.path = None
self._zstat = None
:param key: Attribute key
:param value: Value
"""
def __getattr__(self, name):
    """Resolve unknown attributes from the component's content dict.

    Python only calls this hook after normal attribute lookup has failed,
    so regular instance and class attributes are unaffected.

    :param name: Attribute name to look up in the content dict
    :return: The value stored under ``name``
    :raises AttributeError: If ``name`` is not a content key, or if the
        content dict itself has not been set on the instance yet
    """
    try:
        # Go through __dict__ directly: reading self.content here would
        # re-enter __getattr__ and recurse without bound if "content" is
        # not in the instance dict yet.
        return self.__dict__["content"][name]
    except KeyError:
        # Name the missing attribute and hide the internal KeyError so
        # callers see an ordinary AttributeError.
        raise AttributeError(name) from None
with self._set_lock:
upload = self._content.get(key) != value or not self._persisted
self._content[key] = value
if upload:
self._persisted = False
stat = self.kazoo_client.exists(self._path)
if not stat: # Re-register, if connection to zk was lost
self._path = self._register(self._content)
else:
content = json.dumps(self._content).encode(
encoding="UTF-8"
)
self.kazoo_client.set(
self._path, content, version=stat.version
)
self._persisted = True
def __setattr__(self, name, value):
# If the specified attribute is not part of our content dictionary,
# fall back to the default __setattr__ behaviour.
if name not in self.content.keys():
return super().__setattr__(name, value)
def _register(self, content: Dict[str, Any]) -> str:
path = "{}/{}/{}-".format(
ZooKeeperComponentRegistry.ROOT, self._kind, self._hostname
)
self.log.debug(f"Creating {path} in Zookeeper")
return self.kazoo_client.create(
# Set the value in the local content dict
self.content[name] = value
if not self.path:
self.log.error(
"Path is not set on this component, did you forget to call "
"register()?"
)
return
# Update the ZooKeeper node
content = json.dumps(self.content).encode("utf-8")
try:
zstat = self.kazoo_client.set(
self.path, content, version=self._zstat.version
)
self._zstat = zstat
except NoNodeError:
self.log.error("Could not update %s in ZooKeeper", self)
def register(self):
path = "/".join([COMPONENTS_ROOT, self.kind, self.hostname])
self.log.debug("Registering component in ZooKeeper %s", path)
self.path, self._zstat = self.kazoo_client.create(
path,
json.dumps(content).encode("utf-8"),
json.dumps(self.content).encode("utf-8"),
makepath=True,
ephemeral=True,
sequence=True,
# Also return the zstat, which is necessary to successfully update
# the component.
include_data=True,
)
def unregister(self):
self.kazoo_client.delete(self._path)
def updateFromDict(self, data):
    """Merge ``data`` into the component's local content dict.

    The dict is mutated directly, so this method itself does not write
    anything to ZooKeeper.

    :param data: Values to merge; existing keys are overwritten
    """
    content = self.content
    content.update(data)
@classmethod
def fromDict(cls, client, hostname, data):
    """Build a component of this class and fill it from ``data``.

    :param client: Passed through to the class constructor
    :param hostname: Passed through to the class constructor
    :param data: Content values applied via ``updateFromDict``
    :return: The newly created component
    """
    instance = cls(client, hostname)
    instance.updateFromDict(data)
    return instance
def __repr__(self):
return f"<{self.__class__.__name__} {self.content}>"
class ZooKeeperComponentRegistry(ZooKeeperSimpleBase):
"""
ZooKeeper component registry. Each zuul component can register itself
using this registry. This will create a ephemeral zookeeper node, which
will be then deleted in case such component looses connection to zookeeper.
class SchedulerComponent(BaseComponent):
kind = "scheduler"
Any other component may request a list of registered components to check
their properties. List of components received by other components are
read-only. Only the component itself can update its entry in the registry.
"""
ROOT = "/zuul/components"
class ExecutorComponent(BaseComponent):
kind = "executor"
class MergerComponent(BaseComponent):
kind = "merger"
class FingerGatewayComponent(BaseComponent):
kind = "fingergw"
class WebComponent(BaseComponent):
kind = "web"
class ComponentRegistry(ZooKeeperBase):
log = logging.getLogger("zuul.zk.components.ZooKeeperComponentRegistry")
def all(self, kind: str) -> List[ZooKeeperComponentReadOnly]:
"""
Get all registered components of a given kind. Components obtained
using this method cannot be updated.
COMPONENT_CLASSES = {
"scheduler": SchedulerComponent,
"executor": ExecutorComponent,
"merger": MergerComponent,
"fingergw": FingerGatewayComponent,
"web": WebComponent,
}
:param kind: Kind of components
:return: List of read-only components
"""
def __init__(self, client):
super().__init__(client)
result = []
path = "{}/{}".format(self.ROOT, kind)
try:
for node in self.kazoo_client.get_children(path):
path = "{}/{}/{}".format(self.ROOT, kind, node)
data, _ = self.kazoo_client.get(path)
content = json.loads(data.decode("UTF-8"))
result.append(ZooKeeperComponentReadOnly(self.client, content))
except NoNodeError:
# If the node doesn't exist there is no component registered.
pass
return result
self.client = client
self._component_tree = None
# kind -> hostname -> component
self._cached_components = defaultdict(dict)
def register(self, kind: str, hostname: str) -> ZooKeeperComponent:
"""
Register component with a hostname. This method returns an updateable
component object.
# If we are already connected when the class is instantiated, directly
# call the onConnect callback.
if self.client.connected:
self._onConnect()
:param kind: Kind of component
:param hostname: Hostname
:return: Path representing the components's ZNode
"""
return ZooKeeperComponent(
self.client,
kind,
hostname,
dict(
hostname=hostname,
state=ZooKeeperComponent.STOPPED,
),
def _onConnect(self):
    """Create and start the tree cache rooted at ``COMPONENTS_ROOT``.

    Both the fault listener and the component listener are attached
    before the cache is started.
    """
    tree = TreeCache(self.kazoo_client, COMPONENTS_ROOT)
    self._component_tree = tree
    tree.listen_fault(self._cacheFaultListener)
    tree.listen(self._componentCacheListener)
    tree.start()
def _onDisconnect(self):
if self._component_tree is not None:
self._component_tree.close()
# Explicitly unset the TreeCache, otherwise we might leak
# open connections/ports.
self._component_tree = None
def all(self, kind=None):
    """Return cached components, optionally restricted to one kind.

    The result comes from the local cache only; ZooKeeper is not queried.

    :param kind: Component kind (e.g. "executor"). ``None`` selects the
        components of every kind.
    :return: For a given kind, a view of that kind's cached components
        (empty if the kind is unknown). For ``None``, a flat list of
        all cached components.
    """
    if kind is None:
        # Iterate over the per-kind dicts, not over the kind names:
        # calling .values() on the names (plain strings) raised
        # AttributeError as soon as the cache held anything.
        return [
            component
            for components in self._cached_components.values()
            for component in components.values()
        ]

    # Filter the cached components for the given kind. Using .get()
    # avoids creating an empty defaultdict entry for unknown kinds.
    return self._cached_components.get(kind, {}).values()
def _cacheFaultListener(self, e):
    """Log an error reported by the tree cache.

    Registered through ``listen_fault`` when the cache is set up.

    :param e: The error object handed over by the tree cache
    """
    self.log.exception(e)
def _componentCacheListener(self, event):
    """Keep the local component cache in sync with tree cache events.

    Added and updated nodes create or refresh the cached component for
    ``<kind>/<hostname>``; removed nodes drop it. Events that do not refer
    to a component node are ignored.

    :param event: Tree cache event; its ``event_type`` is compared with
        the ``TreeEvent`` constants and its ``event_data`` supplies the
        node's path, data and stat
    """
    # Some events carry no event data, or data without a path.
    path = getattr(event.event_data, "path", None)

    # Ignore events without path
    if not path:
        return

    # Ignore root node
    if path == COMPONENTS_ROOT:
        return

    # Ignore lock nodes
    if "__lock__" in path:
        return

    # Ignore unrelated events
    if event.event_type not in (
        TreeEvent.NODE_ADDED,
        TreeEvent.NODE_UPDATED,
        TreeEvent.NODE_REMOVED,
    ):
        return

    # Split the path into segments to find out what the event refers to.
    # The segments we are interested in look like this:
    # <kind> / <hostname>
    segments = self._getSegments(path)
    if len(segments) < 2:
        # Ignore events that don't touch a component
        return

    kind = segments[0]
    hostname = segments[1]

    self.log.debug(
        "Got cache update event %s for path %s", event.event_type, path
    )

    if event.event_type in (TreeEvent.NODE_UPDATED, TreeEvent.NODE_ADDED):
        # Ignore events without data
        if not event.event_data.data:
            return

        # Perform an in-place update of the cached component (if any)
        component = self._cached_components.get(kind, {}).get(hostname)
        d = json.loads(event.event_data.data.decode("utf-8"))

        if component is not None:
            if event.event_data.stat.version <= component._zstat.version:
                # Don't update to older data
                return
            component.updateFromDict(d)
            component._zstat = event.event_data.stat
        else:
            # Create a new component of the class matching its kind.
            component_cls = self.COMPONENT_CLASSES.get(kind)
            if component_cls is None:
                # An unknown kind used to raise KeyError from inside the
                # listener; log it and leave the cache untouched instead.
                self.log.warning(
                    "Ignoring component of unknown kind %s at path %s",
                    kind, path
                )
                return
            component = component_cls.fromDict(self.client, hostname, d)
            component.path = path
            component._zstat = event.event_data.stat

        self._cached_components[kind][hostname] = component

    elif event.event_type == TreeEvent.NODE_REMOVED:
        try:
            del self._cached_components[kind][hostname]
        except KeyError:
            # If it's already gone, don't care
            pass
def _getSegments(self, path):
    """Split a node path into its "/"-separated segments.

    A leading ``COMPONENTS_ROOT`` prefix is removed first, so a component
    path yields ``[<kind>, <hostname>, ...]``.

    :param path: ZooKeeper node path
    :return: List of path segments
    """
    relative = path
    if relative.startswith(COMPONENTS_ROOT):
        # The +1 also drops the "/" that follows the root.
        relative = relative[len(COMPONENTS_ROOT) + 1:]
    return relative.split("/")