From 6ac2f33cb389804ccc8973b3907a3357b0c5a1d3 Mon Sep 17 00:00:00 2001 From: Tristan Cacqueray Date: Sat, 17 Jun 2017 12:44:16 +0000 Subject: [PATCH] Implement a static driver for Nodepool This change adds a static node driver. Change-Id: I065f2b42af49a57218f1c04a08b3ddd15ccc0832 Story: 2001044 Task: 4615 --- doc/source/configuration.rst | 58 ++++++ nodepool/config.py | 8 + nodepool/driver/static/__init__.py | 0 nodepool/driver/static/config.py | 82 ++++++++ nodepool/driver/static/handler.py | 182 ++++++++++++++++++ nodepool/driver/static/provider.py | 90 +++++++++ nodepool/nodeutils.py | 16 ++ .../tests/fixtures/config_validate/good.yaml | 14 ++ .../config_validate/static_error.yaml | 5 + nodepool/tests/fixtures/multiproviders.yaml | 33 ++++ nodepool/tests/fixtures/static.yaml | 29 +++ nodepool/tests/test_driver_static.py | 104 ++++++++++ 12 files changed, 621 insertions(+) create mode 100644 nodepool/driver/static/__init__.py create mode 100644 nodepool/driver/static/config.py create mode 100644 nodepool/driver/static/handler.py create mode 100644 nodepool/driver/static/provider.py create mode 100644 nodepool/tests/fixtures/config_validate/static_error.yaml create mode 100644 nodepool/tests/fixtures/multiproviders.yaml create mode 100644 nodepool/tests/fixtures/static.yaml create mode 100644 nodepool/tests/test_driver_static.py diff --git a/doc/source/configuration.rst b/doc/source/configuration.rst index 5592f0936..592b0e84d 100644 --- a/doc/source/configuration.rst +++ b/doc/source/configuration.rst @@ -631,3 +631,61 @@ Example configuration:: When booting an image from volume, how big should the created volume be. In gigabytes. Default 50. + + +Static driver +^^^^^^^^^^^^^ + +The static provider driver is used to define static nodes. Nodes are also +partitioned into groups called "pools" (see :ref:`static_nodes` for details). 
+ +Example:: + + providers: + - name: static-rack + driver: static + pools: + - name: main + nodes: + - name: trusty.example.com + labels: trusty-static + host-key: fake-key + timeout: 13 + ssh-port: 22022 + username: zuul + max-parallel-jobs: 1 + +.. _static_nodes: + +static nodes +~~~~~~~~~~~~ + +Each entry in a pool's nodes section indicates a static node and its +corresponding labels. + +**required** + + ``name`` + The hostname or IP address of the static node. + + ``labels`` (list) + The list of labels associated with the node. + +**optional** + + ``username`` + The username nodepool will use to validate it can connect to the node. + Defaults to *zuul*. + + ``timeout`` + The timeout in seconds before the SSH ping is considered failed. + Defaults to *5* seconds. + + ``host-key`` + The SSH host key of the node. + + ``ssh-port`` + The SSH port. Defaults to *22*. + + ``max-parallel-jobs`` + The number of jobs that can run in parallel on this node. Defaults to *1*. diff --git a/nodepool/config.py b/nodepool/config.py index 1d909d36e..bad0dcb48 100755 --- a/nodepool/config.py +++ b/nodepool/config.py @@ -53,6 +53,14 @@ class DiskImage(ConfigValue): return "" % self.name +def as_list(item): + if not item: + return [] + if isinstance(item, list): + return item + return [item] + + def get_provider_config(provider): provider.setdefault('driver', 'openstack') # Ensure legacy configuration still works when using fake cloud diff --git a/nodepool/driver/static/__init__.py b/nodepool/driver/static/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/nodepool/driver/static/config.py b/nodepool/driver/static/config.py new file mode 100644 index 000000000..b0d0c4498 --- /dev/null +++ b/nodepool/driver/static/config.py @@ -0,0 +1,82 @@ +# Copyright 2017 Red Hat +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
#
# See the License for the specific language governing permissions and
# limitations under the License.

import voluptuous as v

from nodepool.driver import ConfigValue
from nodepool.driver import ProviderConfig
from nodepool.config import as_list


class StaticPool(ConfigValue):
    """A named group of static nodes inside a static provider.

    Instances are populated by ``StaticProviderConfig.load()``, which sets
    ``name``, ``provider``, ``labels`` (a set of label names) and ``nodes``
    (a list of normalised node dictionaries).
    """

    def __eq__(self, other):
        """Return True when ``other`` is a pool with equal labels and nodes."""
        # A foreign object can never be equal; checking the type first also
        # avoids an AttributeError when ``other`` has no labels/nodes.
        if not isinstance(other, StaticPool):
            return False
        return other.labels == self.labels and other.nodes == self.nodes

    def __repr__(self):
        # The format string must contain a placeholder: an empty string
        # combined with ``% self.name`` raises TypeError.
        return "<StaticPool %s>" % self.name


class StaticProviderConfig(ProviderConfig):
    """Configuration of one ``driver: static`` provider."""

    def __eq__(self, other):
        """Return True when ``other`` is a static config with equal pools."""
        if not isinstance(other, StaticProviderConfig):
            return False
        return other.pools == self.pools

    @staticmethod
    def reset():
        """No driver-wide state to reset for the static driver."""
        pass

    def load(self, config):
        """Build ``self.pools`` from the raw provider dictionary.

        Every node entry is normalised (defaults applied, ``labels`` and
        ``host-key`` coerced to lists) and each of its labels is registered
        both on the pool and on the matching entry of ``config.labels``.

        :param config: The global configuration object; its ``labels``
            mapping must contain every label used by the nodes, otherwise
            a KeyError is raised.
        '''
        """
        self.pools = {}
        for pool in self.provider.get('pools', []):
            pp = StaticPool()
            pp.name = pool['name']
            pp.provider = self
            self.pools[pp.name] = pp
            pp.labels = set()
            pp.nodes = []
            for node in pool.get('nodes', []):
                # Normalise once so the value stored on the node and the
                # labels registered on the pool can never disagree.  The
                # schema accepts a list here, on which str.split() would
                # raise AttributeError.
                node_labels = as_list(node['labels'])
                pp.nodes.append({
                    'name': node['name'],
                    'labels': node_labels,
                    'host-key': as_list(node.get('host-key', [])),
                    'timeout': int(node.get('timeout', 5)),
                    'ssh-port': int(node.get('ssh-port', 22)),
                    'username': node.get('username', 'zuul'),
                    'max-parallel-jobs': int(node.get('max-parallel-jobs', 1)),
                })
                for label in node_labels:
                    pp.labels.add(label)
                    config.labels[label].pools.append(pp)

    def get_schema(self):
        """Return the voluptuous schema for the static provider section."""
        pool_node = {
            v.Required('name'): str,
            v.Required('labels'): v.Any(str, [str]),
            'username': str,
            'timeout': int,
            'host-key': v.Any(str, [str]),
            'ssh-port': int,
            'max-parallel-jobs': int,
        }
        pool = {
            'name': str,
            'nodes': [pool_node],
        }
        return v.Schema({'pools': [pool]})
# Copyright 2017 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import logging
import random

from nodepool import exceptions
from nodepool import nodeutils
from nodepool import zk
from nodepool.driver import NodeRequestHandler


class StaticNodeRequestHandler(NodeRequestHandler):
    '''
    Fulfil node requests from the static nodes registered by the provider.
    '''

    log = logging.getLogger("nodepool.driver.static."
                            "StaticNodeRequestHandler")

    def _invalidNodeTypes(self):
        '''
        Return any node types that are invalid for this provider.

        :returns: A list of node type names that are invalid, or an empty
            list if all are valid.
        '''
        return [ntype for ntype in self.request.node_types
                if ntype not in self.pool.labels]

    def checkConcurrency(self, static_node):
        '''
        Tell whether a static node can accept one more job.

        Counts the ZooKeeper node records whose hostname equals the static
        node name and whose state is ``ready`` or ``in-use``.

        :param dict static_node: Node definition providing the ``name`` and
            ``max-parallel-jobs`` keys.
        :returns: False once the count has reached ``max-parallel-jobs``,
            True otherwise.
        '''
        access_count = 0
        for node in self.zk.nodeIterator():
            if node.hostname != static_node["name"]:
                continue
            if node.state in ('ready', 'in-use'):
                access_count += 1
        if access_count >= static_node["max-parallel-jobs"]:
            self.log.info("%s: max concurrency reached (%d)" % (
                static_node["name"], access_count))
            return False
        return True

    def _waitForNodeSet(self):
        '''
        Fill node set for the request.

        For each requested type, first try to lock an already READY node of
        this provider and pool (when the request allows reuse); otherwise
        pick a registered static node that carries the label and still has
        spare concurrency.  If any type cannot be served, nothing new is
        created and the request is stored, unlocked and marked done.
        '''
        needed_types = self.request.node_types
        static_nodes = []
        unavailable_nodes = []
        ready_nodes = self.zk.getReadyNodesOfTypes(needed_types)

        for ntype in needed_types:
            # First try to grab from the list of already available nodes.
            got_a_node = False
            if self.request.reuse and ntype in ready_nodes:
                for node in ready_nodes[ntype]:
                    # Only interested in nodes from this provider and
                    # pool
                    if node.provider != self.provider.name:
                        continue
                    if node.pool != self.pool.name:
                        continue

                    try:
                        self.zk.lockNode(node, blocking=False)
                    except exceptions.ZKLockException:
                        # It's already locked so skip it.
                        continue
                    else:
                        if self.paused:
                            self.log.debug("Unpaused request %s",
                                           self.request)
                            self.paused = False

                        self.log.debug(
                            "Locked existing node %s for request %s",
                            node.id, self.request.id)
                        got_a_node = True
                        node.allocated_to = self.request.id
                        self.zk.storeNode(node)
                        self.nodeset.append(node)
                        break

            # Could not grab an existing node, so assign a new one.
            if not got_a_node:
                # Track the outcome per node type.  A flag that is only set
                # inside the loop would be undefined (or stale from the
                # previous type) when no registered node has this label.
                assigned = False
                for node in self.available_nodes:
                    if ntype not in node["labels"]:
                        continue
                    if not self.checkConcurrency(node):
                        continue
                    static_nodes.append((ntype, node))
                    assigned = True
                    break
                if not assigned:
                    unavailable_nodes.append(ntype)

        if unavailable_nodes:
            self.log.debug("%s: static nodes %s are at capacity" % (
                self.request.id, unavailable_nodes))
            self.zk.storeNodeRequest(self.request)
            self.zk.unlockNodeRequest(self.request)
            self.done = True
            return

        for node_type, static_node in static_nodes:
            self.log.debug("%s: Assigning static_node %s" % (
                self.request.id, static_node))
            node = zk.Node()
            node.state = zk.READY
            node.external_id = "static-%s" % self.request.id
            node.hostname = static_node["name"]
            node.username = static_node["username"]
            node.interface_ip = static_node["name"]
            node.connection_port = static_node["ssh-port"]
            node.connection_type = "ssh"
            nodeutils.set_node_ip(node)
            node.host_keys = self.manager.nodes_keys[static_node["name"]]
            node.provider = self.provider.name
            node.pool = self.pool.name
            node.launcher = self.launcher_id
            node.allocated_to = self.request.id
            node.type = node_type
            self.nodeset.append(node)
            self.zk.storeNode(node)

    def run_handler(self):
        '''
        Main body for the StaticNodeRequestHandler.

        Declines the request when it asks for a label this pool does not
        offer or for more nodes than are registered; otherwise marks it
        PENDING (unless it is a retry of a paused request) and tries to
        fill the node set.
        '''
        self._setFromPoolWorker()

        # We have the launcher_id attr after _setFromPoolWorker() is called.
        self.log = logging.getLogger(
            "nodepool.driver.static.StaticNodeRequestHandler[%s]" %
            self.launcher_id)

        declined_reasons = []
        invalid_types = self._invalidNodeTypes()
        if invalid_types:
            declined_reasons.append('node type(s) [%s] not available' %
                                    ','.join(invalid_types))

        self.available_nodes = self.manager.listNodes()
        # Randomize static nodes order
        random.shuffle(self.available_nodes)

        if len(self.request.node_types) > len(self.available_nodes):
            declined_reasons.append('it would exceed quota')

        if declined_reasons:
            self.log.debug("Declining node request %s because %s",
                           self.request.id, ', '.join(declined_reasons))
            self.decline_request()
            self.unlockNodeSet(clear_allocation=True)

            # If conditions have changed for a paused request to now cause us
            # to decline it, we need to unpause so we don't keep trying it
            if self.paused:
                self.paused = False

            self.zk.storeNodeRequest(self.request)
            self.zk.unlockNodeRequest(self.request)
            self.done = True
            return

        if self.paused:
            self.log.debug("Retrying node request %s", self.request.id)
        else:
            self.log.debug("Accepting node request %s", self.request.id)
            self.request.state = zk.PENDING
            self.zk.storeNodeRequest(self.request)

        self._waitForNodeSet()
# See the
# License for the specific language governing permissions and limitations
# under the License.

import logging

from nodepool import exceptions
from nodepool.driver import Provider
from nodepool.nodeutils import keyscan


class StaticNodeError(Exception):
    """Raised when a static node cannot be registered."""


class StaticNodeProvider(Provider):
    """Provider that serves the static nodes listed in the configuration."""

    log = logging.getLogger("nodepool.driver.static."
                            "StaticNodeProvider")

    def __init__(self, provider, *args):
        self.provider = provider
        self.pools = {}
        # Registered nodes, keyed by "<pool name>-<node name>".
        self.static_nodes = {}
        # Host keys returned by the scan, keyed by node name.
        self.nodes_keys = {}

    def checkHost(self, node):
        """Scan the node for SSH host keys and verify the configured ones.

        :param dict node: Node definition providing ``name``, ``ssh-port``,
            ``timeout`` and ``host-key``.
        :returns: The keys reported by the scan.
        :raises StaticNodeError: If the scan times out or a configured
            host key is not among the scanned keys.
        """
        hostname = node["name"]

        # Check node is reachable
        try:
            scanned = keyscan(hostname,
                              port=node["ssh-port"],
                              timeout=node["timeout"])
        except exceptions.SSHTimeoutException:
            raise StaticNodeError("%s: SSHTimeoutException" % hostname)

        # Check node host-key
        expected = node["host-key"]
        if not set(expected).issubset(set(scanned)):
            self.log.debug("%s: Registered key '%s' not in %s" % (
                hostname, expected, scanned
            ))
            raise StaticNodeError("%s: host key mismatches (%s)" %
                                  (hostname, scanned))
        return scanned

    def start(self):
        """Register every configured node that passes the host check."""
        for pool in self.provider.pools.values():
            self.pools[pool.name] = {}
            for entry in pool.nodes:
                registry_key = "%s-%s" % (pool.name, entry["name"])
                self.log.debug("%s: Registering static node" % registry_key)
                try:
                    host_keys = self.checkHost(entry)
                except StaticNodeError as e:
                    # A node that fails the check is skipped, not fatal.
                    self.log.error("Couldn't register static node: %s" % e)
                    continue
                self.nodes_keys[entry["name"]] = host_keys
                self.static_nodes[registry_key] = entry

    def stop(self):
        self.log.debug("Stopping")

    def listNodes(self):
        """Return a new list holding the registered node definitions."""
        return list(self.static_nodes.values())

    def cleanupNode(self, server_id):
        return True

    def waitForNodeCleanup(self, server_id):
        return True

    def labelReady(self, name):
        return True

    def join(self):
        return True

    def cleanupLeakedResources(self):
        pass
a/nodepool/nodeutils.py +++ b/nodepool/nodeutils.py @@ -41,6 +41,22 @@ def iterate_timeout(max_seconds, exc, purpose): raise exc("Timeout waiting for %s" % purpose) +def set_node_ip(node): + ''' + Set the node public_ip + ''' + if 'fake' in node.hostname: + return + addrinfo = socket.getaddrinfo(node.hostname, node.connection_port)[0] + if addrinfo[0] == socket.AF_INET: + node.public_ipv4 = addrinfo[4][0] + elif addrinfo[0] == socket.AF_INET6: + node.public_ipv6 = addrinfo[4][0] + else: + raise exceptions.LaunchNetworkException( + "Unable to find public IP of server") + + def keyscan(ip, port=22, timeout=60): ''' Scan the IP address for public SSH keys. diff --git a/nodepool/tests/fixtures/config_validate/good.yaml b/nodepool/tests/fixtures/config_validate/good.yaml index 979db04b1..89132e9c2 100644 --- a/nodepool/tests/fixtures/config_validate/good.yaml +++ b/nodepool/tests/fixtures/config_validate/good.yaml @@ -18,6 +18,7 @@ labels: min-ready: 0 - name: trusty-external min-ready: 1 + - name: trusty-static providers: - name: cloud1 @@ -76,6 +77,19 @@ providers: cloud-image: trusty-unmanaged min-ram: 8192 + - name: static-rack + driver: static + pools: + - name: main + nodes: + - name: trusty.example.com + labels: trusty-static + host-key: fake-key + timeout: 13 + ssh-port: 22022 + username: zuul + max-parallel-jobs: 1 + diskimages: - name: trusty formats: diff --git a/nodepool/tests/fixtures/config_validate/static_error.yaml b/nodepool/tests/fixtures/config_validate/static_error.yaml new file mode 100644 index 000000000..4191ce2d8 --- /dev/null +++ b/nodepool/tests/fixtures/config_validate/static_error.yaml @@ -0,0 +1,5 @@ +providers: + - name: static-rack + driver: static + cloud: no-cloud + diff --git a/nodepool/tests/fixtures/multiproviders.yaml b/nodepool/tests/fixtures/multiproviders.yaml new file mode 100644 index 000000000..1f68825ef --- /dev/null +++ b/nodepool/tests/fixtures/multiproviders.yaml @@ -0,0 +1,33 @@ +zookeeper-servers: + - host: 
{zookeeper_host} + port: {zookeeper_port} + chroot: {zookeeper_chroot} + +labels: + - name: fake-static-label + min-ready: 1 + + - name: fake-openstack-label + min-ready: 1 + +providers: + - name: static-provider + driver: static + pools: + - name: main + nodes: + - name: fake-host-1 + labels: fake-static-label + max-parallel-jobs: 1 + + - name: openstack-provider + cloud: fake + driver: fake + cloud-images: + - name: fake-image + pools: + - name: main + labels: + - name: fake-openstack-label + min-ram: 1 + cloud-image: fake-image diff --git a/nodepool/tests/fixtures/static.yaml b/nodepool/tests/fixtures/static.yaml new file mode 100644 index 000000000..500d7b350 --- /dev/null +++ b/nodepool/tests/fixtures/static.yaml @@ -0,0 +1,29 @@ +zookeeper-servers: + - host: {zookeeper_host} + port: {zookeeper_port} + chroot: {zookeeper_chroot} + +labels: + - name: fake-label + min-ready: 2 + + - name: fake-concurrent-label + min-ready: 2 + +providers: + - name: static-provider + driver: static + pools: + - name: main + nodes: + - name: fake-host-1 + labels: fake-label + host-key: ssh-rsa FAKEKEY + timeout: 13 + ssh-port: 22022 + username: zuul + max-parallel-jobs: 1 + - name: fake-host-2 + labels: fake-concurrent-label + host-key: ssh-rsa FAKEKEY + max-parallel-jobs: 2 diff --git a/nodepool/tests/test_driver_static.py b/nodepool/tests/test_driver_static.py new file mode 100644 index 000000000..5191302f0 --- /dev/null +++ b/nodepool/tests/test_driver_static.py @@ -0,0 +1,104 @@ +# Copyright (C) 2017 Red Hat +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. 
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import os

from nodepool import config as nodepool_config
from nodepool import tests
from nodepool import zk
from nodepool.cmd.config_validator import ConfigValidator
from voluptuous import MultipleInvalid


class TestDriverStatic(tests.DBTestCase):
    """Tests for the static node driver."""

    log = logging.getLogger("nodepool.TestDriverStatic")

    def test_static_validator(self):
        """The static_error.yaml fixture must be rejected by the validator."""
        config = os.path.join(os.path.dirname(tests.__file__),
                              'fixtures', 'config_validate',
                              'static_error.yaml')
        validator = ConfigValidator(config)
        self.assertRaises(MultipleInvalid, validator.validate)

    def test_static_config(self):
        """Loading static.yaml yields a provider named 'static-provider'."""
        configfile = self.setup_config('static.yaml')
        config = nodepool_config.loadConfig(configfile)
        self.assertIn('static-provider', config.providers)

    def test_static_handler(self):
        """Nodes appear for both labels and come back after being used."""
        configfile = self.setup_config('static.yaml')
        pool = self.useNodepool(configfile, watermark_sleep=1)
        pool.start()
        self.log.debug("Waiting for min-ready nodes")
        node = self.waitForNodes('fake-label')
        self.assertEqual(len(node), 1)
        nodes = self.waitForNodes('fake-concurrent-label', 2)
        self.assertEqual(len(nodes), 2)

        node = node[0]
        self.log.debug("Marking first node as used %s", node.id)
        # Flag the node as used and wait for its record to be deleted.
        node.state = zk.USED
        self.zk.storeNode(node)
        self.waitForNodeDeletion(node)

        self.log.debug("Waiting for node to be re-available")
        node = self.waitForNodes('fake-label')
        self.assertEqual(len(node), 1)

    def test_static_multinode_handler(self):
        """A request for two different labels is fulfilled with two nodes."""
        configfile = self.setup_config('static.yaml')
        pool = self.useNodepool(configfile, watermark_sleep=1)
        pool.start()

        req = zk.NodeRequest()
        req.state = zk.REQUESTED
        req.node_types.append('fake-label')
        req.node_types.append('fake-concurrent-label')
        self.zk.storeNodeRequest(req)

        self.log.debug("Waiting for request %s", req.id)
        req = self.waitForNodeRequest(req)
        self.assertEqual(req.state, zk.FULFILLED)
        self.assertEqual(len(req.nodes), 2)

    def test_static_multiprovider_handler(self):
        """Static and fake-cloud providers each fulfil their own label."""
        configfile = self.setup_config('multiproviders.yaml')
        pool = self.useNodepool(configfile, watermark_sleep=1)
        pool.start()

        self.wait_for_config(pool)
        manager = pool.getProviderManager('openstack-provider')
        manager._client.create_image(name="fake-image")

        # First request: the label offered by the static provider.
        req = zk.NodeRequest()
        req.state = zk.REQUESTED
        req.node_types.append('fake-static-label')
        self.zk.storeNodeRequest(req)

        self.log.debug("Waiting for request %s", req.id)
        req = self.waitForNodeRequest(req)
        self.assertEqual(req.state, zk.FULFILLED)
        self.assertEqual(len(req.nodes), 1)

        # Second request: the label offered by the 'openstack-provider'.
        req = zk.NodeRequest()
        req.state = zk.REQUESTED
        req.node_types.append('fake-openstack-label')
        self.zk.storeNodeRequest(req)

        self.log.debug("Waiting for request %s", req.id)
        req = self.waitForNodeRequest(req)
        self.assertEqual(req.state, zk.FULFILLED)
        self.assertEqual(len(req.nodes), 1)