Merge "Component Registry in ZooKeeper"
This commit is contained in:
@@ -0,0 +1,131 @@
|
||||
# Copyright 2021 BMW Group
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from zuul.lib.fingergw import FingerGateway
|
||||
from zuul.zk import ZooKeeperClient
|
||||
from zuul.zk.components import ZooKeeperComponentRegistry
|
||||
|
||||
from tests.base import ZuulTestCase, ZuulWebFixture
|
||||
|
||||
|
||||
class TestComponentRegistry(ZuulTestCase):
    """Check that Zuul services show up in the ZooKeeper component registry.

    Each test reads the registry through a dedicated ZooKeeper client that
    is created in ``setUp`` and compares the number of registered
    components and their ``state`` attribute with the expected values.
    """

    tenant_config_file = 'config/single-tenant/main.yaml'

    def setUp(self):
        """Connect a separate ZooKeeper client and build the registry."""
        super().setUp()

        self.zk_client = ZooKeeperClient(
            self.zk_chroot_fixture.zk_hosts,
            tls_cert=self.zk_chroot_fixture.zookeeper_cert,
            tls_key=self.zk_chroot_fixture.zookeeper_key,
            tls_ca=self.zk_chroot_fixture.zookeeper_ca,
        )
        # Register the cleanup before connecting so the client is always
        # disconnected at the end of the test.
        self.addCleanup(self.zk_client.disconnect)
        self.zk_client.connect()
        self.component_registry = ZooKeeperComponentRegistry(self.zk_client)

    def test_scheduler_component(self):
        """Exactly one scheduler is registered and reports RUNNING."""
        component_states = self.component_registry.all("schedulers")
        self.assertEqual(len(component_states), 1)

        component = component_states[0]
        self.assertEqual(component.get("state"), component.RUNNING)

    def test_executor_component(self):
        """The executor entry follows pause() and unpause()."""
        component_states = self.component_registry.all("executors")
        self.assertEqual(len(component_states), 1)

        component = component_states[0]
        self.assertEqual(component.get("state"), component.RUNNING)

        self.executor_server.pause()
        component = self.component_registry.all("executors")[0]
        self.assertEqual(component.get("state"), component.PAUSED)

        self.executor_server.unpause()
        component = self.component_registry.all("executors")[0]
        self.assertEqual(component.get("state"), component.RUNNING)

    def test_merger_component(self):
        """A merger registers on start, follows pause/unpause and
        disappears from the registry once it is stopped."""
        component_states = self.component_registry.all("mergers")
        self.assertEqual(len(component_states), 0)

        self._startMerger()

        component_states = self.component_registry.all("mergers")
        self.assertEqual(len(component_states), 1)

        component = component_states[0]
        self.assertEqual(component.get("state"), component.RUNNING)

        self.merge_server.pause()
        component = self.component_registry.all("mergers")[0]
        self.assertEqual(component.get("state"), component.PAUSED)

        self.merge_server.unpause()
        component = self.component_registry.all("mergers")[0]
        self.assertEqual(component.get("state"), component.RUNNING)

        self.merge_server.stop()
        self.merge_server.join()
        # Set the merger to None so the test doesn't try to stop it again
        self.merge_server = None

        component_states = self.component_registry.all("mergers")
        self.assertEqual(len(component_states), 0)

    def test_fingergw_component(self):
        """A finger gateway registers on start and is gone after stop."""
        component_states = self.component_registry.all("finger-gateways")
        self.assertEqual(len(component_states), 0)

        gateway = FingerGateway(
            self.config,
            ("127.0.0.1", self.gearman_server.port, None, None, None),
            ("127.0.0.1", 0),
            user=None,
            command_socket=None,
            pid_file=None
        )
        gateway.start()

        try:
            component_states = self.component_registry.all("finger-gateways")
            self.assertEqual(len(component_states), 1)

            component = component_states[0]
            self.assertEqual(component.get("state"), component.RUNNING)
        finally:
            # Always shut the gateway down, even if an assertion failed.
            gateway.stop()
            gateway.wait()

        component_states = self.component_registry.all("finger-gateways")
        self.assertEqual(len(component_states), 0)

    def test_web_component(self):
        """A web server registers under "webs" once its fixture is up."""
        # The precondition must look at the same kind that is checked
        # after the fixture is started.
        component_states = self.component_registry.all("webs")
        self.assertEqual(len(component_states), 0)

        self.useFixture(
            ZuulWebFixture(
                self.changes, self.config, self.additional_event_queues,
                self.upstream_root, self.rpcclient, self.poller_events,
                self.git_url_with_auth, self.addCleanup, self.test_root
            )
        )

        component_states = self.component_registry.all("webs")
        self.assertEqual(len(component_states), 1)

        component = component_states[0]
        self.assertEqual(component.get("state"), component.RUNNING)
|
||||
@@ -2572,6 +2572,9 @@ class ExecutorServer(BaseMergeServer):
|
||||
# perhaps hostname+pid.
|
||||
self.hostname = get_default(self.config, 'executor', 'hostname',
|
||||
socket.getfqdn())
|
||||
self.zk_component = self.zk_component_registry.register(
|
||||
'executors', self.hostname
|
||||
)
|
||||
self.log_streaming_port = log_streaming_port
|
||||
self.governor_lock = threading.Lock()
|
||||
self.run_lock = threading.Lock()
|
||||
@@ -2786,6 +2789,7 @@ class ExecutorServer(BaseMergeServer):
|
||||
self.governor_thread.daemon = True
|
||||
self.governor_thread.start()
|
||||
self.disk_accountant.start()
|
||||
self.zk_component.set('state', self.zk_component.RUNNING)
|
||||
|
||||
def register_work(self):
|
||||
if self._running:
|
||||
@@ -2804,6 +2808,7 @@ class ExecutorServer(BaseMergeServer):
|
||||
|
||||
def stop(self):
|
||||
self.log.debug("Stopping")
|
||||
self.zk_component.set('state', self.zk_component.STOPPED)
|
||||
# Use the BaseMergeServer's stop method to disconnect from ZooKeeper.
|
||||
super().stop()
|
||||
self.connections.stop()
|
||||
@@ -2871,12 +2876,14 @@ class ExecutorServer(BaseMergeServer):
|
||||
|
||||
def pause(self):
    """Pause this executor and record the PAUSED state in ZooKeeper."""
    self.log.debug('Pausing')
    # Publish the new state on this executor's component registry entry
    # before the pause flag is raised.
    self.zk_component.set('state', self.zk_component.PAUSED)
    self.pause_sensor.pause = True
    # Only delegate to the parent class when this executor also handles
    # merge jobs.
    if self.process_merge_jobs:
        super().pause()
|
||||
|
||||
def unpause(self):
|
||||
self.log.debug('Resuming')
|
||||
self.zk_component.set('state', self.zk_component.RUNNING)
|
||||
self.pause_sensor.pause = False
|
||||
if self.process_merge_jobs:
|
||||
super().unpause()
|
||||
|
||||
@@ -23,7 +23,7 @@ import zuul.rpcclient
|
||||
from zuul.lib import streamer_utils
|
||||
from zuul.lib.commandsocket import CommandSocket
|
||||
from zuul.zk import ZooKeeperClient
|
||||
|
||||
from zuul.zk.components import ZooKeeperComponentRegistry
|
||||
|
||||
COMMANDS = ['stop']
|
||||
|
||||
@@ -148,6 +148,10 @@ class FingerGateway(object):
|
||||
|
||||
self.zk_client = ZooKeeperClient.fromConfig(config)
|
||||
self.zk_client.connect()
|
||||
self.hostname = socket.getfqdn()
|
||||
self.zk_component = ZooKeeperComponentRegistry(
|
||||
self.zk_client
|
||||
).register('finger-gateways', self.hostname)
|
||||
|
||||
def _runCommand(self):
|
||||
while self.command_running:
|
||||
@@ -198,9 +202,11 @@ class FingerGateway(object):
|
||||
self.server_thread = threading.Thread(target=self._run)
|
||||
self.server_thread.daemon = True
|
||||
self.server_thread.start()
|
||||
self.zk_component.set('state', self.zk_component.RUNNING)
|
||||
self.log.info("Finger gateway is started")
|
||||
|
||||
def stop(self):
|
||||
self.zk_component.set('state', self.zk_component.STOPPED)
|
||||
if self.server:
|
||||
try:
|
||||
self.server.shutdown()
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
import socket
|
||||
import threading
|
||||
from abc import ABCMeta
|
||||
from configparser import ConfigParser
|
||||
@@ -26,6 +27,7 @@ from zuul.lib.config import get_default
|
||||
from zuul.lib.gearworker import ZuulGearWorker
|
||||
from zuul.merger import merger
|
||||
from zuul.merger.merger import nullcontext
|
||||
from zuul.zk.components import ZooKeeperComponentRegistry
|
||||
|
||||
COMMANDS = ['stop', 'pause', 'unpause']
|
||||
|
||||
@@ -76,6 +78,7 @@ class BaseMergeServer(metaclass=ABCMeta):
|
||||
|
||||
self.zk_client = ZooKeeperClient.fromConfig(self.config)
|
||||
self.zk_client.connect()
|
||||
self.zk_component_registry = ZooKeeperComponentRegistry(self.zk_client)
|
||||
|
||||
# This merger and its git repos are used to maintain
|
||||
# up-to-date copies of all the repos that are used by jobs, as
|
||||
@@ -251,6 +254,10 @@ class MergeServer(BaseMergeServer):
|
||||
connections,
|
||||
):
|
||||
super().__init__(config, 'merger', connections)
|
||||
self.hostname = socket.getfqdn()
|
||||
self.zk_component = self.zk_component_registry.register(
|
||||
'mergers', self.hostname
|
||||
)
|
||||
|
||||
self.command_map = dict(
|
||||
stop=self.stop,
|
||||
@@ -273,9 +280,11 @@ class MergeServer(BaseMergeServer):
|
||||
target=self.runCommand, name='command')
|
||||
self.command_thread.daemon = True
|
||||
self.command_thread.start()
|
||||
self.zk_component.set('state', self.zk_component.RUNNING)
|
||||
|
||||
def stop(self):
|
||||
self.log.debug("Stopping")
|
||||
self.zk_component.set('state', self.zk_component.STOPPED)
|
||||
super().stop()
|
||||
self._command_running = False
|
||||
self.command_socket.stop()
|
||||
@@ -286,11 +295,13 @@ class MergeServer(BaseMergeServer):
|
||||
|
||||
def pause(self):
    """Record the PAUSED state in ZooKeeper, then pause via the parent."""
    self.log.debug('Pausing')
    # The registry entry is updated before the parent class pause runs.
    self.zk_component.set('state', self.zk_component.PAUSED)
    super().pause()
|
||||
|
||||
def unpause(self):
    """Resume via the parent class, then record RUNNING in ZooKeeper."""
    self.log.debug('Resuming')
    super().unpause()
    # The registry entry is only switched back to RUNNING after the
    # parent class unpause has returned.
    self.zk_component.set('state', self.zk_component.RUNNING)
|
||||
|
||||
def runCommand(self):
|
||||
while self._command_running:
|
||||
|
||||
+9
-1
@@ -46,6 +46,7 @@ from zuul.executor.client import ExecutorClient
|
||||
from zuul.merger.client import MergeClient
|
||||
from zuul.model import Build, HoldRequest, Tenant, TriggerEvent
|
||||
from zuul.zk import ZooKeeperClient
|
||||
from zuul.zk.components import ZooKeeperComponentRegistry
|
||||
from zuul.zk.nodepool import ZooKeeperNodepool
|
||||
|
||||
COMMANDS = ['full-reconfigure', 'smart-reconfigure', 'stop', 'repl', 'norepl']
|
||||
@@ -333,6 +334,11 @@ class Scheduler(threading.Thread):
|
||||
self.zk_client = ZooKeeperClient.fromConfig(self.config)
|
||||
self.zk_client.connect()
|
||||
self.zk_nodepool = ZooKeeperNodepool(self.zk_client)
|
||||
self.zk_component = (
|
||||
ZooKeeperComponentRegistry(self.zk_client).register(
|
||||
"schedulers", self.hostname
|
||||
)
|
||||
)
|
||||
|
||||
self.trigger_event_queue = queue.Queue()
|
||||
self.result_event_queue = queue.Queue()
|
||||
@@ -388,10 +394,11 @@ class Scheduler(threading.Thread):
|
||||
self.rpc.start()
|
||||
self.rpc_slow.start()
|
||||
self.stats_thread.start()
|
||||
self.zk_component.set('state', self.zk_component.RUNNING)
|
||||
|
||||
def stop(self):
|
||||
self.zk_client.disconnect()
|
||||
self._stopped = True
|
||||
self.zk_component.set('state', self.zk_component.STOPPED)
|
||||
self.stats_stop.set()
|
||||
self.stopConnections()
|
||||
self.wake_event.set()
|
||||
@@ -404,6 +411,7 @@ class Scheduler(threading.Thread):
|
||||
self._command_running = False
|
||||
self.command_socket.stop()
|
||||
self.command_thread.join()
|
||||
self.zk_client.disconnect()
|
||||
|
||||
def runCommand(self):
|
||||
while self._command_running:
|
||||
|
||||
+11
-1
@@ -35,6 +35,7 @@ from zuul.lib.re2util import filter_allowed_disallowed
|
||||
import zuul.model
|
||||
import zuul.rpcclient
|
||||
from zuul.zk import ZooKeeperClient
|
||||
from zuul.zk.components import ZooKeeperComponentRegistry
|
||||
from zuul.zk.nodepool import ZooKeeperNodepool
|
||||
from zuul.lib.auth import AuthenticatorRegistry
|
||||
from zuul.lib.config import get_default
|
||||
@@ -1227,6 +1228,8 @@ class ZuulWeb(object):
|
||||
self.static_path = os.path.abspath(
|
||||
get_default(self.config, 'web', 'static_path', STATIC_DIR)
|
||||
)
|
||||
self.hostname = socket.getfqdn()
|
||||
|
||||
gear_server = get_default(self.config, 'gearman', 'server')
|
||||
gear_port = get_default(self.config, 'gearman', 'port', 4730)
|
||||
ssl_key = get_default(self.config, 'gearman', 'ssl_key')
|
||||
@@ -1238,6 +1241,12 @@ class ZuulWeb(object):
|
||||
ssl_key, ssl_cert, ssl_ca,
|
||||
client_id='Zuul Web Server')
|
||||
self.zk_client = ZooKeeperClient.fromConfig(self.config)
|
||||
self.zk_client.connect()
|
||||
self.zk_component = (
|
||||
ZooKeeperComponentRegistry(self.zk_client).register(
|
||||
'webs', self.hostname
|
||||
)
|
||||
)
|
||||
|
||||
self.connections = connections
|
||||
self.authenticators = authenticators
|
||||
@@ -1376,7 +1385,6 @@ class ZuulWeb(object):
|
||||
|
||||
def start(self):
|
||||
self.log.debug("ZuulWeb starting")
|
||||
self.zk_client.connect()
|
||||
self.stream_manager.start()
|
||||
self.wsplugin = WebSocketPlugin(cherrypy.engine)
|
||||
self.wsplugin.subscribe()
|
||||
@@ -1389,9 +1397,11 @@ class ZuulWeb(object):
|
||||
name='command')
|
||||
self.command_thread.daemon = True
|
||||
self.command_thread.start()
|
||||
self.zk_component.set('state', self.zk_component.RUNNING)
|
||||
|
||||
def stop(self):
|
||||
self.log.debug("ZuulWeb stopping")
|
||||
self.zk_component.set('state', self.zk_component.STOPPED)
|
||||
self.rpc.shutdown()
|
||||
cherrypy.engine.exit()
|
||||
# Not strictly necessary, but without this, if the server is
|
||||
|
||||
@@ -0,0 +1,181 @@
|
||||
# Copyright 2020 BMW Group
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import json
|
||||
import logging
|
||||
import threading
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
from kazoo.client import KazooClient
|
||||
|
||||
from zuul.zk import NoClientException, ZooKeeperBase, ZooKeeperClient
|
||||
|
||||
|
||||
class ZooKeeperComponentReadOnly(object):
    """
    Read-only view of a component's attributes.

    Instances wrap the attribute dictionary of one registered component
    and only offer lookups; the class constants name the states a
    component can report.
    """

    STOPPED = 'stopped'
    RUNNING = 'running'
    PAUSED = 'paused'

    def __init__(self, client: ZooKeeperClient, content: Dict[str, Any]):
        self._client, self._content = client, content

    @property
    def kazoo_client(self) -> KazooClient:
        """
        The kazoo client held by the wrapped ZooKeeper client.

        :raises NoClientException: If the wrapped client has no kazoo
                                   client set
        """
        kazoo = self._client.client
        if kazoo:
            return kazoo
        raise NoClientException()

    def get(self, key: str, default: Optional[Any] = None) -> Any:
        """
        Looks up one attribute of the component.

        :param key: Name of the attribute
        :param default: Value returned if the attribute is not present
                        (default: None)
        :return: The stored attribute value or the default
        """
        attributes = self._content
        return attributes.get(key, default)
|
||||
|
||||
|
||||
class ZooKeeperComponent(ZooKeeperComponentReadOnly):
    """
    Read/write component object.

    This object holds an offline cache of all the component's attributes.
    In case of a failed update to zookeeper the local cache will still
    hold the fresh values. Updating any attribute uploads all attributes to
    zookeeper.

    This enables this object to be used as a local key/value store even if
    the zookeeper connection got lost. One must still catch exceptions
    related to zookeeper connection loss.
    """

    log = logging.getLogger("zuul.zk.components.ZooKeeperComponent")

    def __init__(
        self,
        client: ZooKeeperClient,
        kind: str,
        hostname: str,
        content: Dict[str, Any],
    ):
        """
        Creates the component and registers it in zookeeper.

        :param client: ZooKeeper client used for all operations
        :param kind: Kind of component (used as parent node name)
        :param hostname: Hostname (used as prefix of the node name)
        :param content: Initial attributes of the component
        """
        super().__init__(client, content)
        self._kind = kind
        self._persisted = False
        self._hostname = hostname
        # Create the lock before the zookeeper call so every attribute
        # used by set() exists once registration has been attempted.
        self._set_lock = threading.Lock()
        self._path = self._register(content)

    @staticmethod
    def _encode(content: Dict[str, Any]) -> bytes:
        """Serializes the attributes to the JSON bytes stored in the node."""
        return json.dumps(content).encode("utf-8")

    def set(self, key: str, value: Any) -> None:
        """
        Sets an attribute of a component and uploads the whole component state
        to zookeeper.

        Upload only happens if the new value changed or if a previous upload
        failed.

        :param key: Attribute key
        :param value: Value (must be JSON serializable)
        """

        with self._set_lock:
            # A key that is not cached yet always counts as a change, even
            # if the new value is None.
            changed = (
                key not in self._content or self._content[key] != value
            )
            self._content[key] = value
            if not changed and self._persisted:
                return

            # Stays False if any of the zookeeper calls below raises, so
            # the next set() retries the upload.
            self._persisted = False
            stat = self.kazoo_client.exists(self._path)
            if stat:
                self.kazoo_client.set(
                    self._path,
                    self._encode(self._content),
                    version=stat.version,
                )
            else:
                # Re-register, if connection to zk was lost
                self._path = self._register(self._content)
            self._persisted = True

    def _register(self, content: Dict[str, Any]) -> str:
        """
        Creates the ephemeral, sequential node of this component.

        :param content: Attributes to store in the new node
        :return: Path of the created node
        """
        path = "{}/{}/{}-".format(
            ZooKeeperComponentRegistry.ROOT, self._kind, self._hostname
        )
        self.log.debug(f"Creating {path} in Zookeeper")
        return self.kazoo_client.create(
            path,
            self._encode(content),
            makepath=True,
            ephemeral=True,
            sequence=True,
        )

    def unregister(self):
        """Deletes the node of this component from zookeeper."""
        self.kazoo_client.delete(self._path)
|
||||
|
||||
|
||||
class ZooKeeperComponentRegistry(ZooKeeperBase):
    """
    ZooKeeper component registry. Each zuul component can register itself
    using this registry. This will create an ephemeral zookeeper node, which
    will then be deleted in case such a component loses its connection to
    zookeeper.

    Any other component may request a list of registered components to check
    their properties. Lists of components received by other components are
    read-only. Only the component itself can update its entry in the registry.
    """

    ROOT = "/zuul/components"

    log = logging.getLogger("zuul.zk.components.ZooKeeperComponentRegistry")

    def all(self, kind: str) -> List[ZooKeeperComponentReadOnly]:
        """
        Get all registered components of a given kind. Components obtained
        using this method cannot be updated.

        Components whose node disappears while the list is being read are
        left out of the result.

        :param kind: Kind of components
        :return: List of read-only components
        """
        # Imported here because only this method needs the exception.
        from kazoo.exceptions import NoNodeError

        result = []
        kind_path = "{}/{}".format(self.ROOT, kind)
        self.kazoo_client.ensure_path(kind_path)
        for node in self.kazoo_client.get_children(kind_path):
            node_path = "{}/{}".format(kind_path, node)
            try:
                data, _ = self.kazoo_client.get(node_path)
            except NoNodeError:
                # Component nodes are ephemeral, so one may vanish between
                # listing the children and reading it.
                continue
            content = json.loads(data.decode("UTF-8"))
            result.append(ZooKeeperComponentReadOnly(self.client, content))
        return result

    def register(self, kind: str, hostname: str) -> ZooKeeperComponent:
        """
        Register component with a hostname. This method returns an updateable
        component object.

        The component starts with its ``state`` attribute set to STOPPED.

        :param kind: Kind of component
        :param hostname: Hostname
        :return: Updateable component object backed by the new ZNode
        """
        return ZooKeeperComponent(
            self.client,
            kind,
            hostname,
            dict(
                hostname=hostname,
                state=ZooKeeperComponent.STOPPED,
            ),
        )
|
||||
Reference in New Issue
Block a user