Component Registry in ZooKeeper

This change adds a component registry which can be used by different
components, such as executors, mergers and others to register
themselves, report their state and store arbitrary runtime information.

This is needed to e.g., monitor components or to share the
"accepting_work" state of executors later on.

Change-Id: I4b7197d6cb399513e30d314f8a5f4f55ad9266f8
This commit is contained in:
Jan Kubovy
2020-10-16 17:05:42 +02:00
committed by James E. Blair
parent e7e1fa2660
commit 22935c1177
7 changed files with 357 additions and 3 deletions
+131
View File
@@ -0,0 +1,131 @@
# Copyright 2021 BMW Group
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from zuul.lib.fingergw import FingerGateway
from zuul.zk import ZooKeeperClient
from zuul.zk.components import ZooKeeperComponentRegistry
from tests.base import ZuulTestCase, ZuulWebFixture
class TestComponentRegistry(ZuulTestCase):
tenant_config_file = 'config/single-tenant/main.yaml'
def setUp(self):
super().setUp()
self.zk_client = ZooKeeperClient(
self.zk_chroot_fixture.zk_hosts,
tls_cert=self.zk_chroot_fixture.zookeeper_cert,
tls_key=self.zk_chroot_fixture.zookeeper_key,
tls_ca=self.zk_chroot_fixture.zookeeper_ca,
)
self.addCleanup(self.zk_client.disconnect)
self.zk_client.connect()
self.component_registry = ZooKeeperComponentRegistry(self.zk_client)
def test_scheduler_component(self):
component_states = self.component_registry.all("schedulers")
self.assertEqual(len(component_states), 1)
component = component_states[0]
self.assertEqual(component.get("state"), component.RUNNING)
def test_executor_component(self):
component_states = self.component_registry.all("executors")
self.assertEqual(len(component_states), 1)
component = component_states[0]
self.assertEqual(component.get("state"), component.RUNNING)
self.executor_server.pause()
component = self.component_registry.all("executors")[0]
self.assertEqual(component.get("state"), component.PAUSED)
self.executor_server.unpause()
component = self.component_registry.all("executors")[0]
self.assertEqual(component.get("state"), component.RUNNING)
def test_merger_component(self):
component_states = self.component_registry.all("mergers")
self.assertEqual(len(component_states), 0)
self._startMerger()
component_states = self.component_registry.all("mergers")
self.assertEqual(len(component_states), 1)
component = component_states[0]
self.assertEqual(component.get("state"), component.RUNNING)
self.merge_server.pause()
component = self.component_registry.all("mergers")[0]
self.assertEqual(component.get("state"), component.PAUSED)
self.merge_server.unpause()
component = self.component_registry.all("mergers")[0]
self.assertEqual(component.get("state"), component.RUNNING)
self.merge_server.stop()
self.merge_server.join()
# Set the merger to None so the test doesn't try to stop it again
self.merge_server = None
component_states = self.component_registry.all("mergers")
self.assertEqual(len(component_states), 0)
def test_fingergw_component(self):
component_states = self.component_registry.all("finger-gateways")
self.assertEqual(len(component_states), 0)
gateway = FingerGateway(
self.config,
("127.0.0.1", self.gearman_server.port, None, None, None),
("127.0.0.1", 0),
user=None,
command_socket=None,
pid_file=None
)
gateway.start()
try:
component_states = self.component_registry.all("finger-gateways")
self.assertEqual(len(component_states), 1)
component = component_states[0]
self.assertEqual(component.get("state"), component.RUNNING)
finally:
gateway.stop()
gateway.wait()
component_states = self.component_registry.all("finger-gateways")
self.assertEqual(len(component_states), 0)
def test_web_component(self):
component_states = self.component_registry.all("finger-gateways")
self.assertEqual(len(component_states), 0)
self.useFixture(
ZuulWebFixture(
self.changes, self.config, self.additional_event_queues,
self.upstream_root, self.rpcclient, self.poller_events,
self.git_url_with_auth, self.addCleanup, self.test_root
)
)
component_states = self.component_registry.all("webs")
self.assertEqual(len(component_states), 1)
component = component_states[0]
self.assertEqual(component.get("state"), component.RUNNING)
+7
View File
@@ -2568,6 +2568,9 @@ class ExecutorServer(BaseMergeServer):
# perhaps hostname+pid.
self.hostname = get_default(self.config, 'executor', 'hostname',
socket.getfqdn())
self.zk_component = self.zk_component_registry.register(
'executors', self.hostname
)
self.log_streaming_port = log_streaming_port
self.governor_lock = threading.Lock()
self.run_lock = threading.Lock()
@@ -2782,6 +2785,7 @@ class ExecutorServer(BaseMergeServer):
self.governor_thread.daemon = True
self.governor_thread.start()
self.disk_accountant.start()
self.zk_component.set('state', self.zk_component.RUNNING)
def register_work(self):
if self._running:
@@ -2800,6 +2804,7 @@ class ExecutorServer(BaseMergeServer):
def stop(self):
self.log.debug("Stopping")
self.zk_component.set('state', self.zk_component.STOPPED)
# Use the BaseMergeServer's stop method to disconnect from ZooKeeper.
super().stop()
self.connections.stop()
@@ -2867,12 +2872,14 @@ class ExecutorServer(BaseMergeServer):
def pause(self):
self.log.debug('Pausing')
self.zk_component.set('state', self.zk_component.PAUSED)
self.pause_sensor.pause = True
if self.process_merge_jobs:
super().pause()
def unpause(self):
self.log.debug('Resuming')
self.zk_component.set('state', self.zk_component.RUNNING)
self.pause_sensor.pause = False
if self.process_merge_jobs:
super().unpause()
+7 -1
View File
@@ -23,7 +23,7 @@ import zuul.rpcclient
from zuul.lib import streamer_utils
from zuul.lib.commandsocket import CommandSocket
from zuul.zk import ZooKeeperClient
from zuul.zk.components import ZooKeeperComponentRegistry
COMMANDS = ['stop']
@@ -148,6 +148,10 @@ class FingerGateway(object):
self.zk_client = ZooKeeperClient.fromConfig(config)
self.zk_client.connect()
self.hostname = socket.getfqdn()
self.zk_component = ZooKeeperComponentRegistry(
self.zk_client
).register('finger-gateways', self.hostname)
def _runCommand(self):
while self.command_running:
@@ -198,9 +202,11 @@ class FingerGateway(object):
self.server_thread = threading.Thread(target=self._run)
self.server_thread.daemon = True
self.server_thread.start()
self.zk_component.set('state', self.zk_component.RUNNING)
self.log.info("Finger gateway is started")
def stop(self):
self.zk_component.set('state', self.zk_component.STOPPED)
if self.server:
try:
self.server.shutdown()
+11
View File
@@ -15,6 +15,7 @@
import json
import logging
import os
import socket
import threading
from abc import ABCMeta
from configparser import ConfigParser
@@ -26,6 +27,7 @@ from zuul.lib.config import get_default
from zuul.lib.gearworker import ZuulGearWorker
from zuul.merger import merger
from zuul.merger.merger import nullcontext
from zuul.zk.components import ZooKeeperComponentRegistry
COMMANDS = ['stop', 'pause', 'unpause']
@@ -76,6 +78,7 @@ class BaseMergeServer(metaclass=ABCMeta):
self.zk_client = ZooKeeperClient.fromConfig(self.config)
self.zk_client.connect()
self.zk_component_registry = ZooKeeperComponentRegistry(self.zk_client)
# This merger and its git repos are used to maintain
# up-to-date copies of all the repos that are used by jobs, as
@@ -252,6 +255,10 @@ class MergeServer(BaseMergeServer):
connections,
):
super().__init__(config, 'merger', connections)
self.hostname = socket.getfqdn()
self.zk_component = self.zk_component_registry.register(
'mergers', self.hostname
)
self.command_map = dict(
stop=self.stop,
@@ -274,9 +281,11 @@ class MergeServer(BaseMergeServer):
target=self.runCommand, name='command')
self.command_thread.daemon = True
self.command_thread.start()
self.zk_component.set('state', self.zk_component.RUNNING)
def stop(self):
self.log.debug("Stopping")
self.zk_component.set('state', self.zk_component.STOPPED)
super().stop()
self._command_running = False
self.command_socket.stop()
@@ -287,11 +296,13 @@ class MergeServer(BaseMergeServer):
def pause(self):
self.log.debug('Pausing')
self.zk_component.set('state', self.zk_component.PAUSED)
super().pause()
def unpause(self):
self.log.debug('Resuming')
super().unpause()
self.zk_component.set('state', self.zk_component.RUNNING)
def runCommand(self):
while self._command_running:
+9 -1
View File
@@ -46,6 +46,7 @@ from zuul.executor.client import ExecutorClient
from zuul.merger.client import MergeClient
from zuul.model import Build, HoldRequest, Tenant, TriggerEvent
from zuul.zk import ZooKeeperClient
from zuul.zk.components import ZooKeeperComponentRegistry
from zuul.zk.nodepool import ZooKeeperNodepool
COMMANDS = ['full-reconfigure', 'smart-reconfigure', 'stop', 'repl', 'norepl']
@@ -333,6 +334,11 @@ class Scheduler(threading.Thread):
self.zk_client = ZooKeeperClient.fromConfig(self.config)
self.zk_client.connect()
self.zk_nodepool = ZooKeeperNodepool(self.zk_client)
self.zk_component = (
ZooKeeperComponentRegistry(self.zk_client).register(
"schedulers", self.hostname
)
)
self.trigger_event_queue = queue.Queue()
self.result_event_queue = queue.Queue()
@@ -388,10 +394,11 @@ class Scheduler(threading.Thread):
self.rpc.start()
self.rpc_slow.start()
self.stats_thread.start()
self.zk_component.set('state', self.zk_component.RUNNING)
def stop(self):
self.zk_client.disconnect()
self._stopped = True
self.zk_component.set('state', self.zk_component.STOPPED)
self.stats_stop.set()
self.stopConnections()
self.wake_event.set()
@@ -404,6 +411,7 @@ class Scheduler(threading.Thread):
self._command_running = False
self.command_socket.stop()
self.command_thread.join()
self.zk_client.disconnect()
def runCommand(self):
while self._command_running:
+11 -1
View File
@@ -35,6 +35,7 @@ from zuul.lib.re2util import filter_allowed_disallowed
import zuul.model
import zuul.rpcclient
from zuul.zk import ZooKeeperClient
from zuul.zk.components import ZooKeeperComponentRegistry
from zuul.zk.nodepool import ZooKeeperNodepool
from zuul.lib.auth import AuthenticatorRegistry
from zuul.lib.config import get_default
@@ -1227,6 +1228,8 @@ class ZuulWeb(object):
self.static_path = os.path.abspath(
get_default(self.config, 'web', 'static_path', STATIC_DIR)
)
self.hostname = socket.getfqdn()
gear_server = get_default(self.config, 'gearman', 'server')
gear_port = get_default(self.config, 'gearman', 'port', 4730)
ssl_key = get_default(self.config, 'gearman', 'ssl_key')
@@ -1238,6 +1241,12 @@ class ZuulWeb(object):
ssl_key, ssl_cert, ssl_ca,
client_id='Zuul Web Server')
self.zk_client = ZooKeeperClient.fromConfig(self.config)
self.zk_client.connect()
self.zk_component = (
ZooKeeperComponentRegistry(self.zk_client).register(
'webs', self.hostname
)
)
self.connections = connections
self.authenticators = authenticators
@@ -1376,7 +1385,6 @@ class ZuulWeb(object):
def start(self):
self.log.debug("ZuulWeb starting")
self.zk_client.connect()
self.stream_manager.start()
self.wsplugin = WebSocketPlugin(cherrypy.engine)
self.wsplugin.subscribe()
@@ -1389,9 +1397,11 @@ class ZuulWeb(object):
name='command')
self.command_thread.daemon = True
self.command_thread.start()
self.zk_component.set('state', self.zk_component.RUNNING)
def stop(self):
self.log.debug("ZuulWeb stopping")
self.zk_component.set('state', self.zk_component.STOPPED)
self.rpc.shutdown()
cherrypy.engine.exit()
# Not strictly necessary, but without this, if the server is
+181
View File
@@ -0,0 +1,181 @@
# Copyright 2020 BMW Group
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import logging
import threading
from typing import Any, Dict, List, Optional
from kazoo.client import KazooClient
from zuul.zk import NoClientException, ZooKeeperBase, ZooKeeperClient
class ZooKeeperComponentReadOnly(object):
"""
Read-only component object.
"""
STOPPED = 'stopped'
RUNNING = 'running'
PAUSED = 'paused'
def __init__(self, client: ZooKeeperClient, content: Dict[str, Any]):
self._client = client
self._content = content
@property
def kazoo_client(self) -> KazooClient:
if not self._client.client:
raise NoClientException()
return self._client.client
def get(self, key: str, default: Optional[Any] = None) -> Any:
"""
Gets an attribute of the component.
:param key: Attribute key
:param default: Default value (default: None)
:return: Value of the attribute
"""
return self._content.get(key, default)
class ZooKeeperComponent(ZooKeeperComponentReadOnly):
"""
Read/write component object.
This object holds an offline cache of all the component's attributes.
In case of an failed update to zookeeper the local cache will still
hold the fresh values. Updating any attribute uploads all attributes to
zookeeper.
This enables this object to be used as a local key/value store even if
zookeeper connection got lost. One must still catch exceptions related
to zookeeper connection loss.
"""
log = logging.getLogger("zuul.zk.components.ZooKeeperComponent")
def __init__(
self,
client: ZooKeeperClient,
kind: str,
hostname: str,
content: Dict[str, Any],
):
super().__init__(client, content)
self._kind = kind
self._persisted = False
self._hostname = hostname
self._path = self._register(content)
self._set_lock = threading.Lock()
def set(self, key: str, value: int) -> None:
"""
Sets an attribute of a component and uploads the whole component state
to zookeeper.
Upload only happens if the new value changed or if an previous upload
failed.
:param key: Attribute key
:param value: Value
"""
with self._set_lock:
upload = self._content.get(key) != value or not self._persisted
self._content[key] = value
if upload:
self._persisted = False
stat = self.kazoo_client.exists(self._path)
if not stat: # Re-register, if connection to zk was lost
self._path = self._register(self._content)
else:
content = json.dumps(self._content).encode(
encoding="UTF-8"
)
self.kazoo_client.set(
self._path, content, version=stat.version
)
self._persisted = True
def _register(self, content: Dict[str, Any]) -> str:
path = "{}/{}/{}-".format(
ZooKeeperComponentRegistry.ROOT, self._kind, self._hostname
)
self.log.debug(f"Creating {path} in Zookeeper")
return self.kazoo_client.create(
path,
json.dumps(content).encode("utf-8"),
makepath=True,
ephemeral=True,
sequence=True,
)
def unregister(self):
self.kazoo_client.delete(self._path)
class ZooKeeperComponentRegistry(ZooKeeperBase):
"""
ZooKeeper component registry. Each zuul component can register itself
using this registry. This will create a ephemeral zookeeper node, which
will be then deleted in case such component looses connection to zookeeper.
Any other component may request a list of registered components to check
their properties. List of components received by other components are
read-only. Only the component itself can update its entry in the registry.
"""
ROOT = "/zuul/components"
log = logging.getLogger("zuul.zk.components.ZooKeeperComponentRegistry")
def all(self, kind: str) -> List[ZooKeeperComponentReadOnly]:
"""
Get all registered components of a given kind. Components obtained
using this method cannot be updated.
:param kind: Kind of components
:return: List of read-only components
"""
result = []
path = "{}/{}".format(self.ROOT, kind)
self.kazoo_client.ensure_path(path)
for node in self.kazoo_client.get_children(path):
path = "{}/{}/{}".format(self.ROOT, kind, node)
data, _ = self.kazoo_client.get(path)
content = json.loads(data.decode("UTF-8"))
result.append(ZooKeeperComponentReadOnly(self.client, content))
return result
def register(self, kind: str, hostname: str) -> ZooKeeperComponent:
"""
Register component with a hostname. This method returns an updateable
component object.
:param kind: Kind of component
:param hostname: Hostname
:return: Path representing the components's ZNode
"""
return ZooKeeperComponent(
self.client,
kind,
hostname,
dict(
hostname=hostname,
state=ZooKeeperComponent.STOPPED,
),
)