diff --git a/nodepool/builder.py b/nodepool/builder.py index e1e57ae45..28dd9b2d2 100644 --- a/nodepool/builder.py +++ b/nodepool/builder.py @@ -32,6 +32,8 @@ from nodepool import provider_manager from nodepool import stats from nodepool.zk import zookeeper as zk from nodepool.zk import ZooKeeperClient +from nodepool.zk.components import BuilderComponent +from nodepool.version import get_version_string MINS = 60 @@ -1402,6 +1404,13 @@ class NodePoolBuilder(object): ) self.zk_client.connect() self.zk = zk.ZooKeeper(self.zk_client, enable_cache=False) + + hostname = socket.gethostname() + self.component_info = BuilderComponent( + self.zk_client, hostname, + version=get_version_string()) + self.component_info.register() + self.log.debug('Starting listener for build jobs') # Create build and upload worker objects diff --git a/nodepool/driver/__init__.py b/nodepool/driver/__init__.py index 37f0b34d6..68b72ff45 100644 --- a/nodepool/driver/__init__.py +++ b/nodepool/driver/__init__.py @@ -641,7 +641,7 @@ class NodeRequestHandler(NodeRequestHandlerNotifications, # want to make sure we don't continuously grow this array. 
if self.launcher_id not in self.request.declined_by: self.request.declined_by.append(self.launcher_id) - launchers = set([x.id for x in self.zk.getRegisteredLaunchers()]) + launchers = set([x.id for x in self.zk.getRegisteredPools()]) if launchers.issubset(set(self.request.declined_by)): # All launchers have declined it self.log.debug("Failing declined node request") diff --git a/nodepool/launcher.py b/nodepool/launcher.py index fb632eae0..bf52882e5 100644 --- a/nodepool/launcher.py +++ b/nodepool/launcher.py @@ -32,9 +32,10 @@ from nodepool import stats from nodepool import config as nodepool_config from nodepool.zk import zookeeper as zk from nodepool.zk import ZooKeeperClient +from nodepool.zk.components import LauncherComponent, PoolComponent from nodepool.driver.utils import QuotaInformation, QuotaSupport from nodepool.logconfig import get_annotated_logger -from nodepool.version import version_info as npd_version_info +from nodepool.version import get_version_string MINS = 60 @@ -109,7 +110,7 @@ class PoolWorker(threading.Thread, stats.StatsReporter): # become out of date as the loop progresses, but it should be # good enough to determine whether we should process requests # which express a preference for a specific provider. 
- launchers = self.zk.getRegisteredLaunchers() + launchers = self.zk.getRegisteredPools() pm = self.getProviderManager() has_quota_support = isinstance(pm, QuotaSupport) @@ -174,7 +175,7 @@ class PoolWorker(threading.Thread, stats.StatsReporter): candidate_launchers = set( x.id for x in launchers if x.provider_name == req.provider - and x.supported_labels.issuperset(req.node_types)) + and set(x.supported_labels).issuperset(req.node_types)) if candidate_launchers: # There is a launcher online which can satisfy the request if not candidate_launchers.issubset(set(req.declined_by)): @@ -349,6 +350,22 @@ class PoolWorker(threading.Thread, stats.StatsReporter): def run(self): self.running = True + # Make sure we're always registered with ZK + hostname = socket.gethostname() + self.component_info = PoolComponent( + self.zk.zk_client, hostname, + version=get_version_string()) + labels = set() + for prov_cfg in self.nodepool.config.providers.values(): + labels.update(prov_cfg.getSupportedLabels()) + self.component_info.content.update({ + 'id': self.launcher_id, + 'provider_name': self.provider_name, + 'supported_labels': list(labels), + 'state': self.component_info.RUNNING, + }) + self.component_info.register() + while self.running: try: # Don't do work if we've lost communication with the ZK cluster @@ -360,14 +377,11 @@ class PoolWorker(threading.Thread, stats.StatsReporter): if did_suspend: self.log.info("ZooKeeper available. 
Resuming") - # Make sure we're always registered with ZK - launcher = zk.Launcher() - launcher.id = self.launcher_id + labels = set() for prov_cfg in self.nodepool.config.providers.values(): - launcher.supported_labels.update( - prov_cfg.getSupportedLabels()) - launcher.provider_name = self.provider_name - self.zk.registerLauncher(launcher) + labels.update(prov_cfg.getSupportedLabels()) + if set(self.component_info.supported_labels) != labels: + self.component_info.supported_labels = list(labels) self.updateProviderLimits( self.nodepool.config.providers.get(self.provider_name)) @@ -408,7 +422,7 @@ class PoolWorker(threading.Thread, stats.StatsReporter): ''' self.log.info("%s received stop" % self.name) self.running = False - self.zk.deregisterLauncher(self.launcher_id) + self.component_info.unregister() self.stop_event.set() @@ -984,6 +998,12 @@ class NodePool(threading.Thread): ) self.zk_client.connect() self.zk = zk.ZooKeeper(self.zk_client) + + hostname = socket.gethostname() + self.component_info = LauncherComponent( + self.zk_client, hostname, + version=get_version_string()) + self.component_info.register() else: self.log.debug("Detected ZooKeeper server changes") self.zk_client.resetHosts(configured) @@ -1144,7 +1164,7 @@ class NodePool(threading.Thread): Start point for the NodePool thread. ''' self.log.info("Nodepool launcher %s starting", - npd_version_info.release_string()) + get_version_string()) while not self._stopped: try: self.updateConfig() @@ -1158,6 +1178,8 @@ class NodePool(threading.Thread): if did_suspend: self.log.info("ZooKeeper available. 
Resuming") + if self.component_info.state != self.component_info.RUNNING: + self.component_info.state = self.component_info.RUNNING self.createMinReady() if not self._cleanup_thread: diff --git a/nodepool/model_api.py b/nodepool/model_api.py new file mode 100644 index 000000000..1a5071fd8 --- /dev/null +++ b/nodepool/model_api.py @@ -0,0 +1,16 @@ +# Copyright 2022 Acme Gating, LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +# Currently unused. Included here for future use similar to Zuul. +MODEL_API = 0 diff --git a/nodepool/stats.py b/nodepool/stats.py index f1d333e24..02b82e809 100644 --- a/nodepool/stats.py +++ b/nodepool/stats.py @@ -20,6 +20,7 @@ import statsd from nodepool.zk import zookeeper as zk + log = logging.getLogger("nodepool.stats") @@ -97,7 +98,7 @@ class StatsReporter(object): states = {} - launchers = zk_conn.getRegisteredLaunchers() + launchers = zk_conn.getRegisteredPools() labels = set() for launcher in launchers: labels.update(launcher.supported_labels) diff --git a/nodepool/status.py b/nodepool/status.py index c99b731f4..e8b505c87 100644 --- a/nodepool/status.py +++ b/nodepool/status.py @@ -267,9 +267,9 @@ def label_list(zk): # NOTE(ianw): maybe add to each entry a list of which # launchers support the label? 
class GlobalRegistryFixture(fixtures.Fixture):
    """Fixture that resets the module-level component registry on teardown.

    Setting the fixture up creates nothing; it only schedules a cleanup
    that clears ``COMPONENT_REGISTRY`` once the test is finished.
    """

    def _setUp(self):
        # Only the teardown hook is needed here.
        self.addCleanup(self._cleanup)

    def _cleanup(self):
        # Drop the registry held by the global so the next user of
        # COMPONENT_REGISTRY.create() builds a fresh one.
        COMPONENT_REGISTRY.clearRegistry()
with a different set of supported labels # than what we are going to request. - dummy_launcher = zk.Launcher() - dummy_launcher.provider_name = 'other-provider' - dummy_launcher.supported_labels = {'other-label', } - self.zk.registerLauncher(dummy_launcher) + hostname = socket.gethostname() + dummy_component = PoolComponent( + self.zk.zk_client, hostname, + version=get_version_string()) + dummy_component.content.update({ + 'id': 'dummy', + 'provider_name': 'other-provider', + 'supported_labels': ['other-label'], + 'state': dummy_component.RUNNING, + }) + dummy_component.register() # Node request for a specific provider that doesn't support the # requested node type. @@ -778,6 +788,7 @@ class TestLauncher(tests.DBTestCase): self.assertEqual(req.state, zk.FULFILLED) self.assertEqual(len(req.nodes), 1) self.zk.getNode(req.nodes[0]) + dummy_component.unregister() def test_node_boot_from_volume(self): """Test that an image and node are created from a volume""" @@ -1684,7 +1695,7 @@ class TestLauncher(tests.DBTestCase): self.assertEqual(2, len(pool._pool_threads)) # We should have two pool workers registered - self.assertEqual(2, len(self.zk.getRegisteredLaunchers())) + self.assertEqual(2, len(self.zk.getRegisteredPools())) self.replace_config(configfile, 'launcher_two_provider_remove.yaml') @@ -1698,7 +1709,7 @@ class TestLauncher(tests.DBTestCase): pass # We should have one pool worker registered - self.assertEqual(1, len(self.zk.getRegisteredLaunchers())) + self.assertEqual(1, len(self.zk.getRegisteredPools())) def test_failed_provider(self): """Test that broken provider doesn't fail node requests.""" @@ -1914,18 +1925,19 @@ class TestLauncher(tests.DBTestCase): pool.start() self.waitForNodes('fake-label') - launchers = self.zk.getRegisteredLaunchers() + launchers = self.zk.getRegisteredPools() self.assertEqual(1, len(launchers)) # the fake-label-unused label should not appear - self.assertEqual({'fake-label'}, launchers[0].supported_labels) + 
class TestComponentRegistry(tests.DBTestCase):
    """Checks pool registration as seen through getRegisteredPools()."""

    def test_pool_component(self):
        """A registered pool is listed and is gone after unregister()."""
        pool = PoolComponent(
            self.zk.zk_client, socket.gethostname(),
            version=get_version_string())
        pool_id = "launcher-Poolworker.provider-main-" + uuid.uuid4().hex
        pool.content.update({
            'id': pool_id,
            'provider_name': 'provider',
            'supported_labels': [],
            'state': pool.RUNNING,
        })
        pool.register()

        # Exactly one pool is visible and it carries the id set above.
        visible = self.zk.getRegisteredPools()
        self.assertEqual(1, len(visible))
        self.assertEqual(pool.id, list(visible)[0].id)

        # After unregistering, the listing is empty again.
        pool.unregister()
        visible = self.zk.getRegisteredPools()
        self.assertEqual(0, len(visible))
def get_version_string():
    """Return the version string for this installation.

    When ``is_release`` is true the result is ``release_string`` alone;
    otherwise it is ``release_string`` and ``git_version`` separated by a
    single space.
    """
    if not is_release:
        return f"{release_string} {git_version}"
    return release_string
class GlobalRegistry:
    """Holder for a single, lazily created ComponentRegistry."""

    def __init__(self):
        # Nothing is created until create() is called.
        self.registry = None

    def create(self, zk_client):
        """Return the held registry, creating it from zk_client if needed.

        If a registry is already held it is returned unchanged and
        zk_client is not used.
        """
        if self.registry:
            return self.registry
        self.registry = ComponentRegistry(zk_client)
        return self.registry

    def clearRegistry(self):
        """Forget the held registry so a later create() builds a new one."""
        self.registry = None

    @property
    def model_api(self):
        """The ``model_api`` attribute of the held registry."""
        return self.registry.model_api
def __getattr__(self, name):
    """Resolve attributes that normal lookup missed from the content dict.

    Python only calls this hook after regular attribute lookup has
    failed, so instance attributes and class attributes take precedence
    over keys in ``content``.

    :arg name str: The attribute being looked up.
    :returns: The value stored under ``name`` in ``content``.
    :raises AttributeError: If ``name`` is not a key of ``content``, or
        if ``content`` has not been set on this instance yet.
    """
    # Read content straight from the instance dict.  Going through
    # self.content would re-enter this method when content is missing
    # (for example on an instance created without running __init__, as
    # copy and pickle do) and recurse until RecursionError.
    try:
        return self.__dict__["content"][name]
    except KeyError:
        # Name the missing attribute so the error is actionable.
        raise AttributeError(
            f"{type(self).__name__!r} object has no attribute {name!r}"
        ) from None
def unregister(self):
    """Delete this component's ZooKeeper node and retire the object.

    While holding ``register_lock`` this removes the ``_onConnect``,
    ``_onDisconnect`` and ``_onReconnect`` callbacks from the client's
    listener lists and deletes the node at ``self.path``.  The lock
    attribute is then deleted so the object cannot be registered again.

    If a local component registry exists, this waits up to about five
    seconds (50 polls, 0.1 s apart) for the component to drop out of its
    cache, and logs a message if it is still listed afterwards.
    """
    with self.register_lock:
        self.log.info("Unregistering component in ZooKeeper %s", self.path)
        self.client.on_connect_listeners.remove(self._onConnect)
        self.client.on_disconnect_listeners.remove(self._onDisconnect)
        self.client.on_reconnect_listeners.remove(self._onReconnect)
        self.kazoo_client.delete(self.path)
    # Break the object so we can't register again
    del self.register_lock

    # Without a local registry there is no cache to wait on; register()
    # makes the same check before polling.
    registry = COMPONENT_REGISTRY.registry
    if not registry:
        return

    # Wait 5 seconds for the component to disappear from our local
    # cache so that operations which rely on lists of available
    # labels, etc, behave more synchronously.
    for _ in range(50):
        registered = {
            component.path
            for _kind, components in registry.all()
            for component in components
        }
        if self.path not in registered:
            return
        time.sleep(0.1)
    self.log.info("Did not see component unregistration for %s", self.path)
class PoolComponent(BaseComponent):
    """Component record for a provider-pool within a launcher.

    This is not a process of its own.  On top of the base content it
    starts out with an empty ``id``, ``provider_name`` and
    ``supported_labels``.
    """

    kind = "pool"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Pool-specific keys, merged into content so they can be read
        # and written as attributes afterwards.
        self.initial_state = dict(
            id=None,
            provider_name=None,
            supported_labels=[],
        )
        self.content.update(self.initial_state)
def _onComponentRootUpdate(self, kind, children, event):
    """Start a data watch for each child that is not cached yet.

    :arg kind str: The component kind whose root node changed.
    :arg children list: Child node names currently under that root.
    :arg event: The watch event, if any; not used here.
    """
    for hostname in children:
        # Children that already have a cached component are skipped.
        if self._cached_components[kind].get(hostname):
            continue
        self.log.info("Noticed new %s component %s", kind, hostname)
        node_path = self._getComponentPath(kind, hostname)
        watcher = self._makeComponentWatcher(kind, hostname)
        self.kazoo_client.DataWatch(node_path, watcher)
def all(self, kind=None):
    """Return a snapshot list of cached components.

    If kind is None, a list of tuples is returned, with each tuple
    being (kind, [list of components]).  Otherwise a list of the
    components cached for that kind is returned; it is empty if nothing
    is cached for the kind.

    The returned lists are copies, so later changes to the cache made
    by the watch callbacks do not affect a result the caller is still
    iterating over.

    :arg kind str: The type of component to look up in the registry, or
        None to return all kinds
    :returns: A new list on every call.
    """
    if kind is None:
        # Copy the outer items as well as each inner value view.
        return [(cached_kind, list(components.values()))
                for cached_kind, components
                in list(self._cached_components.items())]

    # .get() leaves the defaultdict without a new entry for unknown kinds.
    return list(self._cached_components.get(kind, {}).values())
def _updateMinimumModelApi(self):
    """Recompute the system minimum model API and log any transition.

    Does nothing until ``self._init`` is set.  Otherwise the value from
    getMinimumModelApi() is compared with the previously stored
    ``self.model_api`` and with this component's ``MODEL_API``, a
    message describing the situation is logged, and the new value is
    stored in ``self.model_api``.

    The process exits with status 1 via sys.exit() if, on the first
    computation, the minimum is greater than ``MODEL_API``.
    """
    if not self._init:
        return
    version = self.getMinimumModelApi()
    if version != self.model_api:
        self.log.info(f"System minimum data model version {version}; "
                      f"this component {MODEL_API}")
    if self.model_api is None:
        # First computation: there is no previous value to compare with.
        if version < MODEL_API:
            self.log.info("The data model version of this component is "
                          "newer than the rest of the system; this "
                          "component will operate in compatability mode "
                          "until the system is upgraded")
        elif version > MODEL_API:
            # NOTE(review): getMinimumModelApi() as written in this file
            # starts from MODEL_API and only takes min(), so this branch
            # looks unreachable unless that method is overridden --
            # confirm before relying on the exit.
            self.log.error("The data model version of this component is "
                           "older than the rest of the system; "
                           "exiting to prevent data corruption")
            sys.exit(1)
    else:
        if version > self.model_api:
            # The system minimum went up since the last computation.
            if version > MODEL_API:
                self.log.info("The data model version of this component "
                              "is older than other components in the "
                              "system, so other components will operate "
                              "in a compability mode; upgrade this "
                              "component as soon as possible to complete "
                              "the system upgrade")
            elif version == MODEL_API:
                self.log.info("The rest of the system has been upgraded "
                              "to the data model version of this "
                              "component")
        elif version < self.model_api:
            # The system minimum went down since the last computation.
            self.log.error("A component with a data model version older "
                           "than the rest of the system has been started; "
                           "data corruption is very likely to occur.")
            # Should we exit here as well?
    self.model_api = version
def getRegisteredPools(self):
    '''
    List the launcher pools currently known to the component registry.

    :returns: A new list holding the registry's components of kind
        'pool'; the list is empty if there are none.
    '''
    pools = COMPONENT_REGISTRY.registry.all(kind='pool')
    return [*pools]