From a612aa603c5c6935dc1c3617a18f81b3004d192d Mon Sep 17 00:00:00 2001 From: "James E. Blair" Date: Thu, 12 May 2022 16:23:55 -0700 Subject: [PATCH] Add the component registry from Zuul This uses a cache and lets us update metadata about components and act on changes quickly (as compared to the current launcher registry which doesn't have provision for live updates). This removes the launcher registry, so operators should take care to update all launchers within a short period of time since the functionality to yield to a specific provider depends on it. Change-Id: I6409db0edf022d711f4e825e2b3eb487e7a79922 --- nodepool/builder.py | 9 + nodepool/driver/__init__.py | 2 +- nodepool/launcher.py | 46 +- nodepool/model_api.py | 16 + nodepool/stats.py | 3 +- nodepool/status.py | 4 +- nodepool/tests/__init__.py | 11 + nodepool/tests/unit/test_launcher.py | 32 +- nodepool/tests/unit/test_zk.py | 44 +- nodepool/version.py | 6 + nodepool/zk/components.py | 413 ++++++++++++++++++ nodepool/zk/zookeeper.py | 114 +---- .../component-registry-327e1ade02155e39.yaml | 11 + 13 files changed, 562 insertions(+), 149 deletions(-) create mode 100644 nodepool/model_api.py create mode 100644 nodepool/zk/components.py create mode 100644 releasenotes/notes/component-registry-327e1ade02155e39.yaml diff --git a/nodepool/builder.py b/nodepool/builder.py index e1e57ae45..28dd9b2d2 100644 --- a/nodepool/builder.py +++ b/nodepool/builder.py @@ -32,6 +32,8 @@ from nodepool import provider_manager from nodepool import stats from nodepool.zk import zookeeper as zk from nodepool.zk import ZooKeeperClient +from nodepool.zk.components import BuilderComponent +from nodepool.version import get_version_string MINS = 60 @@ -1402,6 +1404,13 @@ class NodePoolBuilder(object): ) self.zk_client.connect() self.zk = zk.ZooKeeper(self.zk_client, enable_cache=False) + + hostname = socket.gethostname() + self.component_info = BuilderComponent( + self.zk_client, hostname, + version=get_version_string()) + self.component_info.register() + self.log.debug('Starting listener for build jobs') # Create build and upload worker objects diff --git a/nodepool/driver/__init__.py b/nodepool/driver/__init__.py index 37f0b34d6..68b72ff45 100644 --- a/nodepool/driver/__init__.py +++ b/nodepool/driver/__init__.py @@ -641,7 +641,7 @@ class NodeRequestHandler(NodeRequestHandlerNotifications, # want to make sure we don't continuously grow this array. if self.launcher_id not in self.request.declined_by: self.request.declined_by.append(self.launcher_id) - launchers = set([x.id for x in self.zk.getRegisteredLaunchers()]) + launchers = set([x.id for x in self.zk.getRegisteredPools()]) if launchers.issubset(set(self.request.declined_by)): # All launchers have declined it self.log.debug("Failing declined node request") diff --git a/nodepool/launcher.py b/nodepool/launcher.py index c5aa08de7..4e7710e25 100644 --- a/nodepool/launcher.py +++ b/nodepool/launcher.py @@ -32,9 +32,10 @@ from nodepool import stats from nodepool import config as nodepool_config from nodepool.zk import zookeeper as zk from nodepool.zk import ZooKeeperClient +from nodepool.zk.components import LauncherComponent, PoolComponent from nodepool.driver.utils import QuotaInformation, QuotaSupport from nodepool.logconfig import get_annotated_logger -from nodepool.version import version_info as npd_version_info +from nodepool.version import get_version_string MINS = 60 @@ -109,7 +110,7 @@ class PoolWorker(threading.Thread, stats.StatsReporter): # become out of date as the loop progresses, but it should be # good enough to determine whether we should process requests # which express a preference for a specific provider. - launchers = self.zk.getRegisteredLaunchers() + launchers = self.zk.getRegisteredPools() pm = self.getProviderManager() has_quota_support = isinstance(pm, QuotaSupport) @@ -174,7 +175,7 @@ class PoolWorker(threading.Thread, stats.StatsReporter): candidate_launchers = set( x.id for x in launchers if x.provider_name == req.provider - and x.supported_labels.issuperset(req.node_types)) + and set(x.supported_labels).issuperset(req.node_types)) if candidate_launchers: # There is a launcher online which can satisfy the request if not candidate_launchers.issubset(set(req.declined_by)): @@ -349,6 +350,22 @@ class PoolWorker(threading.Thread, stats.StatsReporter): def run(self): self.running = True + # Make sure we're always registered with ZK + hostname = socket.gethostname() + self.component_info = PoolComponent( + self.zk.zk_client, hostname, + version=get_version_string()) + labels = set() + for prov_cfg in self.nodepool.config.providers.values(): + labels.update(prov_cfg.getSupportedLabels()) + self.component_info.content.update({ + 'id': self.launcher_id, + 'provider_name': self.provider_name, + 'supported_labels': list(labels), + 'state': self.component_info.RUNNING, + }) + self.component_info.register() + while self.running: try: # Don't do work if we've lost communication with the ZK cluster @@ -360,14 +377,11 @@ class PoolWorker(threading.Thread, stats.StatsReporter): if did_suspend: self.log.info("ZooKeeper available. Resuming") - # Make sure we're always registered with ZK - launcher = zk.Launcher() - launcher.id = self.launcher_id + labels = set() for prov_cfg in self.nodepool.config.providers.values(): - launcher.supported_labels.update( - prov_cfg.getSupportedLabels()) - launcher.provider_name = self.provider_name - self.zk.registerLauncher(launcher) + labels.update(prov_cfg.getSupportedLabels()) + if set(self.component_info.supported_labels) != labels: + self.component_info.supported_labels = list(labels) self.updateProviderLimits( self.nodepool.config.providers.get(self.provider_name)) @@ -408,7 +422,7 @@ class PoolWorker(threading.Thread, stats.StatsReporter): ''' self.log.info("%s received stop" % self.name) self.running = False - self.zk.deregisterLauncher(self.launcher_id) + self.component_info.unregister() self.stop_event.set() @@ -982,6 +996,12 @@ class NodePool(threading.Thread): ) self.zk_client.connect() self.zk = zk.ZooKeeper(self.zk_client) + + hostname = socket.gethostname() + self.component_info = LauncherComponent( + self.zk_client, hostname, + version=get_version_string()) + self.component_info.register() else: self.log.debug("Detected ZooKeeper server changes") self.zk_client.resetHosts(configured) @@ -1142,7 +1162,7 @@ class NodePool(threading.Thread): Start point for the NodePool thread. ''' self.log.info("Nodepool launcher %s starting", - npd_version_info.release_string()) + get_version_string()) while not self._stopped: try: self.updateConfig() @@ -1156,6 +1176,8 @@ class NodePool(threading.Thread): if did_suspend: self.log.info("ZooKeeper available. Resuming") + if self.component_info.state != self.component_info.RUNNING: + self.component_info.state = self.component_info.RUNNING self.createMinReady() if not self._cleanup_thread: diff --git a/nodepool/model_api.py b/nodepool/model_api.py new file mode 100644 index 000000000..1a5071fd8 --- /dev/null +++ b/nodepool/model_api.py @@ -0,0 +1,16 @@ +# Copyright 2022 Acme Gating, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# Currently unused. Included here for future use similar to Zuul. +MODEL_API = 0 diff --git a/nodepool/stats.py b/nodepool/stats.py index f1d333e24..02b82e809 100644 --- a/nodepool/stats.py +++ b/nodepool/stats.py @@ -20,6 +20,7 @@ import statsd from nodepool.zk import zookeeper as zk + log = logging.getLogger("nodepool.stats") @@ -97,7 +98,7 @@ class StatsReporter(object): states = {} - launchers = zk_conn.getRegisteredLaunchers() + launchers = zk_conn.getRegisteredPools() labels = set() for launcher in launchers: labels.update(launcher.supported_labels) diff --git a/nodepool/status.py b/nodepool/status.py index c99b731f4..e8b505c87 100644 --- a/nodepool/status.py +++ b/nodepool/status.py @@ -267,9 +267,9 @@ def label_list(zk): # NOTE(ianw): maybe add to each entry a list of which # launchers support the label? labels = set() - launchers = zk.getRegisteredLaunchers() + launchers = zk.getRegisteredPools() for launcher in launchers: - labels.update(launcher.supported_labels) + labels.update(set(launcher.supported_labels)) for label in labels: objs.append({'label': label}) diff --git a/nodepool/tests/__init__.py b/nodepool/tests/__init__.py index 84bd8e132..c9ac6bb7b 100644 --- a/nodepool/tests/__init__.py +++ b/nodepool/tests/__init__.py @@ -38,6 +38,7 @@ from nodepool import launcher from nodepool import webapp from nodepool.zk import zookeeper as zk from nodepool.zk import ZooKeeperClient +from nodepool.zk.components import COMPONENT_REGISTRY from nodepool.cmd.config_validator import ConfigValidator from nodepool.nodeutils import iterate_timeout @@ -160,9 +161,19 @@ class StatsdFixture(fixtures.Fixture): self.thread.join() +class GlobalRegistryFixture(fixtures.Fixture): + def _setUp(self): + self.addCleanup(self._cleanup) + + def _cleanup(self): + # Remove our component registry from the global + COMPONENT_REGISTRY.clearRegistry() + + class BaseTestCase(testtools.TestCase): def setUp(self): super(BaseTestCase, self).setUp() + self.useFixture(GlobalRegistryFixture()) test_timeout = os.environ.get('OS_TEST_TIMEOUT', 60) try: test_timeout = int(test_timeout) diff --git a/nodepool/tests/unit/test_launcher.py b/nodepool/tests/unit/test_launcher.py index 8285a6545..8c37d8536 100644 --- a/nodepool/tests/unit/test_launcher.py +++ b/nodepool/tests/unit/test_launcher.py @@ -18,13 +18,16 @@ import math import time import fixtures import mock +import socket import testtools from nodepool import tests from nodepool.zk import zookeeper as zk +from nodepool.zk.components import PoolComponent from nodepool.driver.fake import provider as fakeprovider from nodepool.nodeutils import iterate_timeout import nodepool.launcher +from nodepool.version import get_version_string from kazoo import exceptions as kze @@ -760,10 +763,17 @@ class TestLauncher(tests.DBTestCase): # Create a dummy launcher with a different set of supported labels # than what we are going to request. - dummy_launcher = zk.Launcher() - dummy_launcher.provider_name = 'other-provider' - dummy_launcher.supported_labels = {'other-label', } - self.zk.registerLauncher(dummy_launcher) + hostname = socket.gethostname() + dummy_component = PoolComponent( + self.zk.zk_client, hostname, + version=get_version_string()) + dummy_component.content.update({ + 'id': 'dummy', + 'provider_name': 'other-provider', + 'supported_labels': ['other-label'], + 'state': dummy_component.RUNNING, + }) + dummy_component.register() # Node request for a specific provider that doesn't support the # requested node type. @@ -777,6 +787,7 @@ class TestLauncher(tests.DBTestCase): self.assertEqual(req.state, zk.FULFILLED) self.assertEqual(len(req.nodes), 1) self.zk.getNode(req.nodes[0]) + dummy_component.unregister() def test_node_boot_from_volume(self): """Test that an image and node are created from a volume""" @@ -1683,7 +1694,7 @@ class TestLauncher(tests.DBTestCase): self.assertEqual(2, len(pool._pool_threads)) # We should have two pool workers registered - self.assertEqual(2, len(self.zk.getRegisteredLaunchers())) + self.assertEqual(2, len(self.zk.getRegisteredPools())) self.replace_config(configfile, 'launcher_two_provider_remove.yaml') @@ -1697,7 +1708,7 @@ class TestLauncher(tests.DBTestCase): pass # We should have one pool worker registered - self.assertEqual(1, len(self.zk.getRegisteredLaunchers())) + self.assertEqual(1, len(self.zk.getRegisteredPools())) def test_failed_provider(self): """Test that broken provider doesn't fail node requests.""" @@ -1913,18 +1924,19 @@ class TestLauncher(tests.DBTestCase): pool.start() self.waitForNodes('fake-label') - launchers = self.zk.getRegisteredLaunchers() + launchers = self.zk.getRegisteredPools() self.assertEqual(1, len(launchers)) # the fake-label-unused label should not appear - self.assertEqual({'fake-label'}, launchers[0].supported_labels) + self.assertEqual({'fake-label'}, set(launchers[0].supported_labels)) self.replace_config(configfile, 'launcher_reg2.yaml') # we should get 1 additional label now - while launchers[0].supported_labels != {'fake-label', 'fake-label2'}: + while (set(launchers[0].supported_labels) != + {'fake-label', 'fake-label2'}): time.sleep(1) - launchers = self.zk.getRegisteredLaunchers() + launchers = self.zk.getRegisteredPools() @mock.patch('nodepool.driver.openstack.handler.' 'OpenStackNodeLauncher._launchNode') diff --git a/nodepool/tests/unit/test_zk.py b/nodepool/tests/unit/test_zk.py index cf2addd71..9d05c8966 100644 --- a/nodepool/tests/unit/test_zk.py +++ b/nodepool/tests/unit/test_zk.py @@ -14,12 +14,39 @@ import testtools import time import uuid +import socket from nodepool import exceptions as npe from nodepool import tests from nodepool.zk import zookeeper as zk +from nodepool.zk.components import PoolComponent from nodepool.config import ZooKeeperConnectionConfig, buildZooKeeperHosts from nodepool.nodeutils import iterate_timeout +from nodepool.version import get_version_string + + +class TestComponentRegistry(tests.DBTestCase): + + def test_pool_component(self): + hostname = socket.gethostname() + launcher = PoolComponent( + self.zk.zk_client, hostname, + version=get_version_string()) + launcher.content.update({ + 'id': "launcher-Poolworker.provider-main-" + uuid.uuid4().hex, + 'provider_name': 'provider', + 'supported_labels': [], + 'state': launcher.RUNNING, + }) + launcher.register() + + launchers = self.zk.getRegisteredPools() + self.assertEqual(1, len(launchers)) + self.assertEqual(launcher.id, list(launchers)[0].id) + + launcher.unregister() + launchers = self.zk.getRegisteredPools() + self.assertEqual(0, len(launchers)) class TestZooKeeper(tests.DBTestCase): @@ -564,23 +591,6 @@ class TestZooKeeper(tests.DBTestCase): self.zk.deleteUpload("trusty", "000", "rax", "000001") self.assertIsNone(self.zk.client.exists(path)) - def test_registerLauncher(self): - launcher = zk.Launcher() - launcher.id = "launcher-Poolworker.provider-main-" + uuid.uuid4().hex - self.zk.registerLauncher(launcher) - launchers = self.zk.getRegisteredLaunchers() - self.assertEqual(1, len(launchers)) - self.assertEqual(launcher.id, launchers[0].id) - - def test_registerLauncher_safe_repeat(self): - launcher = zk.Launcher() - launcher.id = "launcher-Poolworker.provider-main-" + uuid.uuid4().hex - self.zk.registerLauncher(launcher) - self.zk.registerLauncher(launcher) - launchers = self.zk.getRegisteredLaunchers() - self.assertEqual(1, len(launchers)) - self.assertEqual(launcher.id, launchers[0].id) - def test_getNodeRequests_empty(self): self.assertEqual([], self.zk.getNodeRequests()) diff --git a/nodepool/version.py b/nodepool/version.py index 144413696..1035d83ad 100644 --- a/nodepool/version.py +++ b/nodepool/version.py @@ -30,3 +30,9 @@ try: git_version = _metadata['git_version'] except Exception: pass + + +def get_version_string(): + if is_release: + return release_string + return "{} {}".format(release_string, git_version) diff --git a/nodepool/zk/components.py b/nodepool/zk/components.py new file mode 100644 index 000000000..07c228e79 --- /dev/null +++ b/nodepool/zk/components.py @@ -0,0 +1,413 @@ +# Copyright 2020 BMW Group +# Copyright 2022 Acme Gating, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import sys +import json +import logging +import threading +import time +from collections import defaultdict + +from kazoo.exceptions import NoNodeError +from kazoo.protocol.states import EventType + +from nodepool.zk import ZooKeeperBase +from nodepool.model_api import MODEL_API + + +COMPONENTS_ROOT = "/nodepool/components" + + +class GlobalRegistry: + def __init__(self): + self.registry = None + + def create(self, zk_client): + if not self.registry: + self.registry = ComponentRegistry(zk_client) + return self.registry + + def clearRegistry(self): + self.registry = None + + @property + def model_api(self): + return self.registry.model_api + + +COMPONENT_REGISTRY = GlobalRegistry() + + +class BaseComponent(ZooKeeperBase): + """ + Read/write component object. + + This object holds an offline cache of all the component's attributes. In + case of an failed update to ZooKeeper the local cache will still hold the + fresh values. Updating any attribute uploads all attributes to ZooKeeper. + + This enables this object to be used as local key/value store even if the + ZooKeeper connection got lost. + + :arg client ZooKeeperClient: The ZooKeeperClient object to use, or + None if this should be a read-only component. + :arg hostname str: The component's hostname (multiple components with + the same hostname may be registered; the registry will create unique + nodes for each). + """ + + # Component states + INITIALIZING = "initializing" + RUNNING = "running" + PAUSED = "paused" + STOPPED = "stopped" + + log = logging.getLogger("nodepool.Component") + kind = "base" + + def __init__(self, client, hostname, version=None): + # Ensure that the content is available before setting any other + # attribute, because our __setattr__ implementation is relying on it. + self.__dict__["content"] = { + "hostname": hostname, + "state": self.STOPPED, + "kind": self.kind, + "version": version, + "model_api": 0, + } + super().__init__(client) + + self.path = None + self._zstat = None + self.register_lock = threading.Lock() + + def __getattr__(self, name): + try: + return self.content[name] + except KeyError: + raise AttributeError + + def __setattr__(self, name, value): + # If the specified attribute is not part of our content dictionary, + # fall back to the default __settattr__ behaviour. + if name not in self.content.keys(): + return super().__setattr__(name, value) + + # Set the value in the local content dict + self.content[name] = value + + with self.register_lock: + if not self.path: + self.log.error( + "Path is not set on this component, did you forget " + "to call register()?" + ) + return + + # Update the ZooKeeper node + content = json.dumps(self.content, sort_keys=True).encode("utf-8") + try: + zstat = self.kazoo_client.set( + self.path, content, version=self._zstat.version + ) + self._zstat = zstat + except NoNodeError: + self.log.error("Could not update %s in ZooKeeper", self) + + def register(self, model_api=MODEL_API): + self.content['model_api'] = model_api + with self.register_lock: + path = "/".join([COMPONENTS_ROOT, self.kind, self.hostname]) + self.log.info("Registering component in ZooKeeper %s", path) + self.path, self._zstat = self.kazoo_client.create( + path, + json.dumps(self.content, sort_keys=True).encode("utf-8"), + makepath=True, + ephemeral=True, + sequence=True, + # Also return the zstat, which is necessary to successfully + # update the component. + include_data=True, + ) + + if not COMPONENT_REGISTRY.registry: + return + + # Wait 5 seconds for the component to appear in our local + # cache so that operations which rely on lists of available + # labels, etc, behave more synchronously. + for x in range(50): + registered = set() + for kind, components in COMPONENT_REGISTRY.registry.all(): + for component in components: + registered.add(component.path) + if self.path in registered: + return + time.sleep(0.1) + self.log.info("Did not see component registration for %s", path) + + def unregister(self): + with self.register_lock: + self.log.info("Unregistering component in ZooKeeper %s", self.path) + self.client.on_connect_listeners.remove(self._onConnect) + self.client.on_disconnect_listeners.remove(self._onDisconnect) + self.client.on_reconnect_listeners.remove(self._onReconnect) + self.kazoo_client.delete(self.path) + # Break the object so we can't register again + del self.register_lock + + # Wait 5 seconds for the component to appear in our local + # cache so that operations which rely on lists of available + # labels, etc, behave more synchronously. + for x in range(50): + registered = set() + for kind, components in COMPONENT_REGISTRY.registry.all(): + for component in components: + registered.add(component.path) + if self.path not in registered: + return + time.sleep(0.1) + self.log.info("Did not see component unregistration for %s", self.path) + + def _onReconnect(self): + self.register() + + def updateFromDict(self, data): + self.content.update(data) + + @classmethod + def fromDict(cls, client, hostname, data): + component = cls(client, hostname) + component.updateFromDict(data) + return component + + def __repr__(self): + return f"<{self.__class__.__name__} {self.content}>" + + +class LauncherComponent(BaseComponent): + kind = "launcher" + + +class BuilderComponent(BaseComponent): + kind = "builder" + + +# Not a process, but rather a provider-pool within a launcher. +class PoolComponent(BaseComponent): + kind = "pool" + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.initial_state = { + "id": None, + "provider_name": None, + "supported_labels": [], + } + self.content.update(self.initial_state) + + +class ComponentRegistry(ZooKeeperBase): + """A component registry is organized like: + + /nodepool/components/{kind}/{sequence} + + Where kind corresponds to one of the classes in this module, and + sequence is a ZK sequence node with a prefix of the hostname in + order to produce a unique id for multiple identical components + running on the same host. An example path: + + /nodepool/components/scheduler/hostname0000000000 + + Components are ephemeral nodes, and so if the actual service + disconnects from ZK, the node will disappear. + + Component objects returned by this class are read-only; updating + their attributes will not be reflected in ZooKeeper. + """ + log = logging.getLogger("nodepool.ComponentRegistry") + + COMPONENT_CLASSES = { + "pool": PoolComponent, + "launcher": LauncherComponent, + "builder": BuilderComponent, + } + + def __init__(self, client): + super().__init__(client) + + self.client = client + self._component_tree = None + # kind -> hostname -> component + self._cached_components = defaultdict(dict) + + self.model_api = None + # Have we initialized enough to trust the model_api + self._init = False + # If we are already connected when the class is instantiated, directly + # call the onConnect callback. + if self.client.connected: + self._onConnect() + + def _getComponentRoot(self, kind): + return '/'.join([COMPONENTS_ROOT, kind]) + + def _getComponentPath(self, kind, hostname): + return '/'.join([COMPONENTS_ROOT, kind, hostname]) + + def _onConnect(self): + for kind in self.COMPONENT_CLASSES.keys(): + root = self._getComponentRoot(kind) + self.kazoo_client.ensure_path(root) + self.kazoo_client.ChildrenWatch( + root, self._makeComponentRootWatcher(kind)) + self._init = True + self._updateMinimumModelApi() + + def _makeComponentRootWatcher(self, kind): + def watch(children, event=None): + return self._onComponentRootUpdate(kind, children, event) + return watch + + def _onComponentRootUpdate(self, kind, children, event): + for hostname in children: + component = self._cached_components[kind].get(hostname) + if not component: + self.log.info("Noticed new %s component %s", kind, hostname) + root = self._getComponentPath(kind, hostname) + self.kazoo_client.DataWatch( + root, self._makeComponentWatcher(kind, hostname)) + + def _makeComponentWatcher(self, kind, hostname): + def watch(data, stat, event=None): + return self._onComponentUpdate(kind, hostname, data, stat, event) + return watch + + def _onComponentUpdate(self, kind, hostname, data, stat, event): + if event: + etype = event.type + else: + etype = None + self.log.debug( + "Registry %s got event %s for %s %s", + self, etype, kind, hostname) + if (etype in (None, EventType.CHANGED, EventType.CREATED) and + data is not None): + + # Perform an in-place update of the cached component (if any) + component = self._cached_components.get(kind, {}).get(hostname) + d = json.loads(data.decode("utf-8")) + + self.log.info( + "Component %s %s updated: %s", + kind, hostname, d) + + if component: + if (stat.version <= component._zstat.version): + # Don't update to older data + return + component.updateFromDict(d) + component._zstat = stat + else: + # Create a new component from the data structure + # Get the correct kind of component + # TODO (felix): KeyError on unknown component type? + component_cls = self.COMPONENT_CLASSES[kind] + # Pass in null ZK client to make a read-only component + # instance. + component = component_cls.fromDict(None, hostname, d) + component.path = self._getComponentPath(kind, hostname) + component._zstat = stat + + self._cached_components[kind][hostname] = component + self._updateMinimumModelApi() + elif (etype == EventType.DELETED or data is None): + self.log.info( + "Noticed %s component %s disappeared", + kind, hostname) + try: + del self._cached_components[kind][hostname] + except KeyError: + # If it's already gone, don't care + pass + self._updateMinimumModelApi() + # Return False to stop the datawatch + return False + + def all(self, kind=None): + """Returns a list of components. + + If kind is None, then a list of tuples is returned, with each tuple + being (kind, [list of components]). + + :arg kind str: The type of component to look up in the registry, or + None to return all kinds + """ + + if kind is None: + return [(kind, list(components.values())) + for (kind, components) in self._cached_components.items()] + + # Filter the cached components for the given kind + return self._cached_components.get(kind, {}).values() + + def getMinimumModelApi(self): + """Get the minimum model API version of all currently connected + components""" + + # Start with our own version in case we're the only component + # and we haven't registered. + version = MODEL_API + for kind, components in self.all(): + for component in components: + version = min(version, component.model_api) + return version + + def _updateMinimumModelApi(self): + if not self._init: + return + version = self.getMinimumModelApi() + if version != self.model_api: + self.log.info(f"System minimum data model version {version}; " + f"this component {MODEL_API}") + if self.model_api is None: + if version < MODEL_API: + self.log.info("The data model version of this component is " + "newer than the rest of the system; this " + "component will operate in compatability mode " + "until the system is upgraded") + elif version > MODEL_API: + self.log.error("The data model version of this component is " + "older than the rest of the system; " + "exiting to prevent data corruption") + sys.exit(1) + else: + if version > self.model_api: + if version > MODEL_API: + self.log.info("The data model version of this component " + "is older than other components in the " + "system, so other components will operate " + "in a compability mode; upgrade this " + "component as soon as possible to complete " + "the system upgrade") + elif version == MODEL_API: + self.log.info("The rest of the system has been upgraded " + "to the data model version of this " + "component") + elif version < self.model_api: + self.log.error("A component with a data model version older " + "than the rest of the system has been started; " + "data corruption is very likely to occur.") + # Should we exit here as well? + self.model_api = version diff --git a/nodepool/zk/zookeeper.py b/nodepool/zk/zookeeper.py index 135410de4..15319921e 100644 --- a/nodepool/zk/zookeeper.py +++ b/nodepool/zk/zookeeper.py @@ -26,6 +26,7 @@ from kazoo.recipe.election import Election from nodepool import exceptions as npe from nodepool.logconfig import get_annotated_logger +from nodepool.zk.components import COMPONENT_REGISTRY # States: # We are building this image (or node) but it is not ready for use. @@ -113,56 +114,6 @@ class Serializable(abc.ABC): return json.dumps(self.toDict()).encode('utf8') -class Launcher(Serializable): - ''' - Class to describe a nodepool launcher. - ''' - - def __init__(self): - self.id = None - self.provider_name = None - self._supported_labels = set() - - def __eq__(self, other): - if isinstance(other, Launcher): - return (self.id == other.id and - self.supported_labels == other.supported_labels) - else: - return False - - def __hash__(self): - return hash(self.id) - - @property - def supported_labels(self): - return self._supported_labels - - @supported_labels.setter - def supported_labels(self, value): - if not isinstance(value, set): - raise TypeError("'supported_labels' attribute must be a set") - self._supported_labels = value - - def toDict(self): - d = {} - d['id'] = self.id - d['provider_name'] = self.provider_name - # sets are not JSON serializable, so use a sorted list - d['supported_labels'] = sorted(self.supported_labels) - return d - - @staticmethod - def fromDict(d): - obj = Launcher() - obj.id = d.get('id') - # TODO(tobiash): The fallback to 'unknown' is only needed to avoid - # having a full nodepool shutdown on upgrade. It can be - # removed later. - obj.provider_name = d.get('provider_name', 'unknown') - obj.supported_labels = set(d.get('supported_labels', [])) - return obj - - class BaseModel(Serializable): VALID_STATES = set([]) @@ -764,6 +715,8 @@ class ZooKeeper(object): self._request_cache.listen(self.requestCacheListener) self._request_cache.start() + COMPONENT_REGISTRY.create(self.zk_client) + # ======================================================================= # Private Methods # ======================================================================= @@ -1588,65 +1541,14 @@ class ZooKeeper(object): except kze.NoNodeError: pass - def registerLauncher(self, launcher): + def getRegisteredPools(self): ''' - Register an active node launcher. + Get a list of all launcher pools that have registered with ZooKeeper. - The launcher is de-registered when the launcher process terminates or - otherwise disconnects from ZooKeeper, or via deregisterLauncher(). - It will need to re-register after a lost connection. This method is - safe to call multiple times. - - :param Launcher launcher: Object describing the launcher. + :returns: A list of PoolComponent objects, or empty list if none + are found. ''' - path = self._launcherPath(launcher.id) - - if self.client.exists(path): - data, _ = self.client.get(path) - obj = Launcher.fromDict(self._bytesToDict(data)) - if obj != launcher: - self.client.set(path, launcher.serialize()) - self.log.debug("Updated registration for launcher %s", - launcher.id) - else: - self.client.create(path, value=launcher.serialize(), - makepath=True, ephemeral=True) - self.log.debug("Registered launcher %s", launcher.id) - - def deregisterLauncher(self, launcher_id): - ''' - Deregister an active node launcher. - - :param str launcher_id: ID of the launcher to deregister. - ''' - path = self._launcherPath(launcher_id) - try: - self.client.delete(path, recursive=True) - except kze.NoNodeError: - pass - - def getRegisteredLaunchers(self): - ''' - Get a list of all launchers that have registered with ZooKeeper. - - :returns: A list of Launcher objects, or empty list if none are found. - ''' - try: - launcher_ids = self.client.get_children(self.LAUNCHER_ROOT) - except kze.NoNodeError: - return [] - - objs = [] - for launcher in launcher_ids: - path = self._launcherPath(launcher) - try: - data, _ = self.client.get(path) - except kze.NoNodeError: - # launcher disappeared - continue - - objs.append(Launcher.fromDict(self._bytesToDict(data))) - return objs + return list(COMPONENT_REGISTRY.registry.all(kind='pool')) def getNodeRequests(self): ''' diff --git a/releasenotes/notes/component-registry-327e1ade02155e39.yaml b/releasenotes/notes/component-registry-327e1ade02155e39.yaml new file mode 100644 index 000000000..a136a79d8 --- /dev/null +++ b/releasenotes/notes/component-registry-327e1ade02155e39.yaml @@ -0,0 +1,11 @@ +--- +upgrade: + - | + Due to an internal change in how Nodepool launchers communicate + with each other, all launchers should be upgraded to the same + version within a short period of time. + + They will generally continue to work at different versions, but + the mechanism that allows them to yield to specific providers when + requested is being changed and so that will not function correctly + unless they are upgraded near-simultaneously.