Implement a static driver for Nodepool

This change adds a static node driver.

Change-Id: I065f2b42af49a57218f1c04a08b3ddd15ccc0832
Story: 2001044
Task: 4615
This commit is contained in:
Tristan Cacqueray 2017-06-17 12:44:16 +00:00
parent 318e899b89
commit 6ac2f33cb3
12 changed files with 621 additions and 0 deletions

View File

@ -631,3 +631,61 @@ Example configuration::
When booting an image from volume, how big should the created volume be.
In gigabytes. Default 50.
Static driver
^^^^^^^^^^^^^
The static provider driver is used to define static nodes. Nodes are also
partitioned into groups called "pools" (see :ref:`static_nodes` for details).
Example::
providers:
- name: static-rack
driver: static
pools:
- name: main
nodes:
- name: trusty.example.com
labels: trusty-static
host-key: fake-key
timeout: 13
ssh-port: 22022
username: zuul
max-parallel-jobs: 1
.. _static_nodes:
Static nodes
~~~~~~~~~~~~
Each entry in a pool's nodes section indicates a static node and its
corresponding label.
**required**
``name``
The hostname or IP address of the static node.
``labels`` (list)
The list of labels associated with the node.
**optional**
``username``
The username nodepool will use to validate it can connect to the node.
Defaults to *zuul*
``timeout``
The timeout in seconds before the ssh ping is considered failed.
Defaults to *5* seconds
``host-key``
The ssh host key of the node.
``ssh-port``
The ssh port, defaults to *22*
``max-parallel-jobs``
The number of jobs that can run in parallel on this node, defaults to *1*.

View File

@ -53,6 +53,14 @@ class DiskImage(ConfigValue):
return "<DiskImage %s>" % self.name
def as_list(item):
    """Normalize a config value to a list.

    Falsy values (``None``, ``''``, ``[]``) become an empty list, an
    existing list is returned unchanged, and any other scalar is wrapped
    in a single-element list.
    """
    if not item:
        return []
    return item if isinstance(item, list) else [item]
def get_provider_config(provider):
provider.setdefault('driver', 'openstack')
# Ensure legacy configuration still works when using fake cloud

View File

View File

@ -0,0 +1,82 @@
# Copyright 2017 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
#
# See the License for the specific language governing permissions and
# limitations under the License.
import voluptuous as v
from nodepool.driver import ConfigValue
from nodepool.driver import ProviderConfig
from nodepool.config import as_list
class StaticPool(ConfigValue):
    """Configuration of one pool of static nodes.

    Attributes (set by StaticProviderConfig.load): name, provider,
    labels (set of label names) and nodes (list of node dicts).
    """

    def __eq__(self, other):
        # Guard against comparison with unrelated types: the previous
        # implementation raised AttributeError instead of returning False.
        if not isinstance(other, StaticPool):
            return False
        return (other.labels == self.labels and
                other.nodes == self.nodes)

    def __repr__(self):
        return "<StaticPool %s>" % self.name
class StaticProviderConfig(ProviderConfig):
    """Configuration for a ``driver: static`` provider.

    Parses the provider's ``pools``/``nodes`` sections into StaticPool
    objects and registers each node label in the global config.
    """

    def __eq__(self, other):
        # Unrelated types are never equal; the previous implementation
        # raised AttributeError when compared with a non-static config.
        if not isinstance(other, StaticProviderConfig):
            return False
        return other.pools == self.pools

    @staticmethod
    def reset():
        # This driver keeps no global state to reset.
        pass

    def load(self, config):
        """Populate ``self.pools`` from the raw provider dict.

        :param config: the top-level Config object; this pool is appended
            to ``config.labels[label].pools`` for every label found.
        """
        self.pools = {}
        for pool in self.provider.get('pools', []):
            pp = StaticPool()
            pp.name = pool['name']
            pp.provider = self
            self.pools[pp.name] = pp
            pp.labels = set()
            pp.nodes = []
            for node in pool.get('nodes', []):
                # The schema accepts either a whitespace-separated string
                # or a list of labels.  Normalize to a list of individual
                # label names up front: the previous code stored the raw
                # string wrapped in a list (so "a b" stayed one element,
                # which the handler's membership test could never match)
                # and crashed with AttributeError on .split() when the
                # list form was used.
                labels = node['labels']
                if isinstance(labels, str):
                    labels = labels.split()
                pp.nodes.append({
                    'name': node['name'],
                    'labels': labels,
                    'host-key': as_list(node.get('host-key', [])),
                    'timeout': int(node.get('timeout', 5)),
                    'ssh-port': int(node.get('ssh-port', 22)),
                    'username': node.get('username', 'zuul'),
                    'max-parallel-jobs': int(node.get('max-parallel-jobs', 1)),
                })
                for label in labels:
                    pp.labels.add(label)
                    config.labels[label].pools.append(pp)

    def get_schema(self):
        """Return the voluptuous schema validating this provider's config."""
        pool_node = {
            v.Required('name'): str,
            v.Required('labels'): v.Any(str, [str]),
            'username': str,
            'timeout': int,
            'host-key': v.Any(str, [str]),
            'ssh-port': int,
            'max-parallel-jobs': int,
        }
        pool = {
            # 'name' is read unconditionally in load(), so require it here
            # instead of failing later with a KeyError.
            v.Required('name'): str,
            'nodes': [pool_node],
        }
        return v.Schema({'pools': [pool]})

View File

@ -0,0 +1,182 @@
# Copyright 2017 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import random
from nodepool import exceptions
from nodepool import nodeutils
from nodepool import zk
from nodepool.driver import NodeRequestHandler
class StaticNodeRequestHandler(NodeRequestHandler):
    """Fulfill node requests from the fixed inventory of static nodes."""

    log = logging.getLogger("nodepool.driver.static."
                            "StaticNodeRequestHandler")

    def _invalidNodeTypes(self):
        '''
        Return any node types that are invalid for this provider.

        :returns: A list of node type names that are invalid, or an empty
            list if all are valid.
        '''
        invalid = []
        for ntype in self.request.node_types:
            if ntype not in self.pool.labels:
                invalid.append(ntype)
        return invalid

    def checkConcurrency(self, static_node):
        """Return True if *static_node* can accept one more job.

        Counts the ready/in-use ZooKeeper nodes whose hostname matches the
        static node and compares against its max-parallel-jobs setting.
        """
        access_count = 0
        for node in self.zk.nodeIterator():
            if node.hostname != static_node["name"]:
                continue
            if node.state in ('ready', 'in-use'):
                access_count += 1
        if access_count >= static_node["max-parallel-jobs"]:
            self.log.info("%s: max concurrency reached (%d)" % (
                static_node["name"], access_count))
            return False
        return True

    def _waitForNodeSet(self):
        '''
        Fill node set for the request.
        '''
        needed_types = self.request.node_types
        static_nodes = []
        unavailable_nodes = []
        ready_nodes = self.zk.getReadyNodesOfTypes(needed_types)

        for ntype in needed_types:
            # First try to grab from the list of already available nodes.
            got_a_node = False
            if self.request.reuse and ntype in ready_nodes:
                for node in ready_nodes[ntype]:
                    # Only interested in nodes from this provider and
                    # pool
                    if node.provider != self.provider.name:
                        continue
                    if node.pool != self.pool.name:
                        continue

                    try:
                        self.zk.lockNode(node, blocking=False)
                    except exceptions.ZKLockException:
                        # It's already locked so skip it.
                        continue
                    else:
                        if self.paused:
                            self.log.debug("Unpaused request %s", self.request)
                            self.paused = False

                        self.log.debug(
                            "Locked existing node %s for request %s",
                            node.id, self.request.id)
                        got_a_node = True
                        node.allocated_to = self.request.id
                        self.zk.storeNode(node)
                        self.nodeset.append(node)
                        break
            # Could not grab an existing node, so assign a new one.
            if not got_a_node:
                # Bug fix: max_concurrency was previously only assigned
                # inside the label match, so a label with no registered
                # static node (e.g. its host failed registration in the
                # provider's start()) raised UnboundLocalError here.
                max_concurrency = False
                for node in self.available_nodes:
                    if ntype in node["labels"]:
                        max_concurrency = not self.checkConcurrency(node)
                        if max_concurrency:
                            continue
                        static_nodes.append((ntype, node))
                        break
                if max_concurrency:
                    unavailable_nodes.append(ntype)

        if unavailable_nodes:
            # Pause the request: store and unlock it so another launcher
            # (or a later pass) can retry once capacity frees up.
            self.log.debug("%s: static nodes %s are at capacity" % (
                self.request.id, unavailable_nodes))
            self.zk.storeNodeRequest(self.request)
            self.zk.unlockNodeRequest(self.request)
            self.done = True
            return

        for node_type, static_node in static_nodes:
            self.log.debug("%s: Assigning static_node %s" % (
                self.request.id, static_node))
            node = zk.Node()
            # Static nodes are pre-existing, so they go straight to READY.
            node.state = zk.READY
            node.external_id = "static-%s" % self.request.id
            node.hostname = static_node["name"]
            node.username = static_node["username"]
            node.interface_ip = static_node["name"]
            node.connection_port = static_node["ssh-port"]
            node.connection_type = "ssh"
            nodeutils.set_node_ip(node)
            # Host keys were collected by the provider at registration time.
            node.host_keys = self.manager.nodes_keys[static_node["name"]]
            node.provider = self.provider.name
            node.pool = self.pool.name
            node.launcher = self.launcher_id
            node.allocated_to = self.request.id
            node.type = node_type
            self.nodeset.append(node)
            self.zk.storeNode(node)

    def run_handler(self):
        '''
        Main body for the StaticNodeRequestHandler.
        '''
        self._setFromPoolWorker()

        # We have the launcher_id attr after _setFromPoolWorker() is called.
        self.log = logging.getLogger(
            "nodepool.driver.static.StaticNodeRequestHandler[%s]" %
            self.launcher_id)

        declined_reasons = []
        invalid_types = self._invalidNodeTypes()
        if invalid_types:
            declined_reasons.append('node type(s) [%s] not available' %
                                    ','.join(invalid_types))

        self.available_nodes = self.manager.listNodes()
        # Randomize static nodes order
        random.shuffle(self.available_nodes)
        if len(self.request.node_types) > len(self.available_nodes):
            declined_reasons.append('it would exceed quota')

        if declined_reasons:
            self.log.debug("Declining node request %s because %s",
                           self.request.id, ', '.join(declined_reasons))
            self.decline_request()
            self.unlockNodeSet(clear_allocation=True)

            # If conditions have changed for a paused request to now cause us
            # to decline it, we need to unpause so we don't keep trying it
            if self.paused:
                self.paused = False

            self.zk.storeNodeRequest(self.request)
            self.zk.unlockNodeRequest(self.request)
            self.done = True
            return

        if self.paused:
            self.log.debug("Retrying node request %s", self.request.id)
        else:
            self.log.debug("Accepting node request %s", self.request.id)
            self.request.state = zk.PENDING
            self.zk.storeNodeRequest(self.request)

        self._waitForNodeSet()

View File

@ -0,0 +1,90 @@
# Copyright 2017 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
from nodepool import exceptions
from nodepool.driver import Provider
from nodepool.nodeutils import keyscan
class StaticNodeError(Exception):
    """Raised when a static node cannot be registered (ssh keyscan
    timed out or the scanned host keys did not match the configured
    ``host-key``)."""
class StaticNodeProvider(Provider):
    """Provider driver serving a fixed inventory of pre-existing nodes.

    Nodes are never created or destroyed: start() verifies each
    configured host is reachable and records its ssh host keys, and the
    cleanup hooks are all no-ops.
    """

    log = logging.getLogger("nodepool.driver.static."
                            "StaticNodeProvider")

    def __init__(self, provider, *args):
        self.provider = provider
        self.pools = {}
        # "<pool>-<hostname>" -> node config dict, for registered nodes only
        self.static_nodes = {}
        # hostname -> ssh host keys collected at registration time
        self.nodes_keys = {}

    def checkHost(self, node):
        # Check node is reachable
        try:
            keys = keyscan(node["name"],
                           port=node["ssh-port"],
                           timeout=node["timeout"])
        except exceptions.SSHTimeoutException:
            raise StaticNodeError("%s: SSHTimeoutException" % node["name"])
        # Check node host-key
        if not set(node["host-key"]).issubset(set(keys)):
            self.log.debug("%s: Registered key '%s' not in %s" % (
                node["name"], node["host-key"], keys
            ))
            raise StaticNodeError("%s: host key mismatches (%s)" %
                                  (node["name"], keys))
        return keys

    def start(self):
        for pool_config in self.provider.pools.values():
            self.pools[pool_config.name] = {}
            for node_config in pool_config.nodes:
                node_name = "%s-%s" % (pool_config.name, node_config["name"])
                self.log.debug("%s: Registering static node" % node_name)
                try:
                    self.nodes_keys[node_config["name"]] = \
                        self.checkHost(node_config)
                except StaticNodeError as e:
                    # An unreachable host is skipped rather than fatal.
                    self.log.error("Couldn't register static node: %s" % e)
                    continue
                self.static_nodes[node_name] = node_config

    def stop(self):
        self.log.debug("Stopping")

    def listNodes(self):
        return list(self.static_nodes.values())

    def cleanupNode(self, server_id):
        return True

    def waitForNodeCleanup(self, server_id):
        return True

    def labelReady(self, name):
        return True

    def join(self):
        return True

    def cleanupLeakedResources(self):
        pass

View File

@ -41,6 +41,22 @@ def iterate_timeout(max_seconds, exc, purpose):
raise exc("Timeout waiting for %s" % purpose)
def set_node_ip(node):
    '''
    Set the node public_ip
    '''
    # Fake hostnames (used by the test fixtures) are not resolvable.
    if 'fake' in node.hostname:
        return
    family, _, _, _, sockaddr = socket.getaddrinfo(
        node.hostname, node.connection_port)[0]
    if family == socket.AF_INET:
        node.public_ipv4 = sockaddr[0]
    elif family == socket.AF_INET6:
        node.public_ipv6 = sockaddr[0]
    else:
        raise exceptions.LaunchNetworkException(
            "Unable to find public IP of server")
def keyscan(ip, port=22, timeout=60):
'''
Scan the IP address for public SSH keys.

View File

@ -18,6 +18,7 @@ labels:
min-ready: 0
- name: trusty-external
min-ready: 1
- name: trusty-static
providers:
- name: cloud1
@ -76,6 +77,19 @@ providers:
cloud-image: trusty-unmanaged
min-ram: 8192
- name: static-rack
driver: static
pools:
- name: main
nodes:
- name: trusty.example.com
labels: trusty-static
host-key: fake-key
timeout: 13
ssh-port: 22022
username: zuul
max-parallel-jobs: 1
diskimages:
- name: trusty
formats:

View File

@ -0,0 +1,5 @@
providers:
- name: static-rack
driver: static
cloud: no-cloud

View File

@ -0,0 +1,33 @@
zookeeper-servers:
- host: {zookeeper_host}
port: {zookeeper_port}
chroot: {zookeeper_chroot}
labels:
- name: fake-static-label
min-ready: 1
- name: fake-openstack-label
min-ready: 1
providers:
- name: static-provider
driver: static
pools:
- name: main
nodes:
- name: fake-host-1
labels: fake-static-label
max-parallel-jobs: 1
- name: openstack-provider
cloud: fake
driver: fake
cloud-images:
- name: fake-image
pools:
- name: main
labels:
- name: fake-openstack-label
min-ram: 1
cloud-image: fake-image

29
nodepool/tests/fixtures/static.yaml vendored Normal file
View File

@ -0,0 +1,29 @@
zookeeper-servers:
- host: {zookeeper_host}
port: {zookeeper_port}
chroot: {zookeeper_chroot}
labels:
- name: fake-label
min-ready: 2
- name: fake-concurrent-label
min-ready: 2
providers:
- name: static-provider
driver: static
pools:
- name: main
nodes:
- name: fake-host-1
labels: fake-label
host-key: ssh-rsa FAKEKEY
timeout: 13
ssh-port: 22022
username: zuul
max-parallel-jobs: 1
- name: fake-host-2
labels: fake-concurrent-label
host-key: ssh-rsa FAKEKEY
max-parallel-jobs: 2

View File

@ -0,0 +1,104 @@
# Copyright (C) 2017 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
from nodepool import config as nodepool_config
from nodepool import tests
from nodepool import zk
from nodepool.cmd.config_validator import ConfigValidator
from voluptuous import MultipleInvalid
class TestDriverStatic(tests.DBTestCase):
    # Functional tests for the static node driver, run against the
    # DBTestCase harness (real ZooKeeper, fixture config files).
    log = logging.getLogger("nodepool.TestDriverStatic")

    def test_static_validator(self):
        # The static_error.yaml fixture omits required fields, so the
        # config validator must raise MultipleInvalid.
        config = os.path.join(os.path.dirname(tests.__file__),
                              'fixtures', 'config_validate',
                              'static_error.yaml')
        validator = ConfigValidator(config)
        self.assertRaises(MultipleInvalid, validator.validate)

    def test_static_config(self):
        # Loading the static.yaml fixture registers the provider by name.
        configfile = self.setup_config('static.yaml')
        config = nodepool_config.loadConfig(configfile)
        self.assertIn('static-provider', config.providers)

    def test_static_handler(self):
        # min-ready should bring up one fake-label node and two
        # fake-concurrent-label nodes; after a node is used and deleted,
        # the same static host becomes available again.
        configfile = self.setup_config('static.yaml')
        pool = self.useNodepool(configfile, watermark_sleep=1)
        pool.start()
        self.log.debug("Waiting for min-ready nodes")
        node = self.waitForNodes('fake-label')
        self.assertEqual(len(node), 1)
        nodes = self.waitForNodes('fake-concurrent-label', 2)
        self.assertEqual(len(nodes), 2)

        node = node[0]
        self.log.debug("Marking first node as used %s", node.id)
        node.state = zk.USED
        self.zk.storeNode(node)
        self.waitForNodeDeletion(node)

        self.log.debug("Waiting for node to be re-available")
        node = self.waitForNodes('fake-label')
        self.assertEqual(len(node), 1)

    def test_static_multinode_handler(self):
        # A single request asking for two different labels is fulfilled
        # with one node per label.
        configfile = self.setup_config('static.yaml')
        pool = self.useNodepool(configfile, watermark_sleep=1)
        pool.start()
        req = zk.NodeRequest()
        req.state = zk.REQUESTED
        req.node_types.append('fake-label')
        req.node_types.append('fake-concurrent-label')
        self.zk.storeNodeRequest(req)

        self.log.debug("Waiting for request %s", req.id)
        req = self.waitForNodeRequest(req)
        self.assertEqual(req.state, zk.FULFILLED)
        self.assertEqual(len(req.nodes), 2)

    def test_static_multiprovider_handler(self):
        # Static and OpenStack (fake) providers coexist: each request is
        # fulfilled by the provider owning the requested label.
        configfile = self.setup_config('multiproviders.yaml')
        pool = self.useNodepool(configfile, watermark_sleep=1)
        pool.start()

        self.wait_for_config(pool)
        manager = pool.getProviderManager('openstack-provider')
        manager._client.create_image(name="fake-image")

        req = zk.NodeRequest()
        req.state = zk.REQUESTED
        req.node_types.append('fake-static-label')
        self.zk.storeNodeRequest(req)

        self.log.debug("Waiting for request %s", req.id)
        req = self.waitForNodeRequest(req)
        self.assertEqual(req.state, zk.FULFILLED)
        self.assertEqual(len(req.nodes), 1)

        req = zk.NodeRequest()
        req.state = zk.REQUESTED
        req.node_types.append('fake-openstack-label')
        self.zk.storeNodeRequest(req)

        self.log.debug("Waiting for request %s", req.id)
        req = self.waitForNodeRequest(req)
        self.assertEqual(req.state, zk.FULFILLED)
        self.assertEqual(len(req.nodes), 1)