Simplify driver API

Further decouple the driver API from the provider configuration
by simplifying the driver interface (which will eventually lead to
the driver deciding whether or not to support multiple labels) and
removing config references in NodeLauncher.

This significantly simplifies the driver API by:

  * Reducing the launching of nodes to a launch() and launchesComplete()
    interface. The launchesComplete() method acts as a periodic check on
    whether the nodeset launches have all completed.

  * How the drivers actually launch nodes is decoupled from the driver
    API by not having the return of NodeRequestHandler.launch() be either
    a thread or None. This leaves it entirely up to the driver to decide
    the best way to manage launches... threads, subprocesses, messaging,
    etc. E.g., thread handling is moved from driver API into OpenStack driver.

  * Drivers no longer need to implement the very confusing pollLauncher()
    method. This was originally meant to poll the launch threads, but
    even drivers not using threads (e.g., the static driver) had to
    reimplement this method. That made no sense.

The NodeLauncher class (currently used only by the OpenStack driver),
is moved to a driver utility module that other drivers may choose to
use or ignore. And it no longer directly accesses the provider config.

The test_nodelaunchmanager.py tests were originally testing the
NodeLaunchManager class (removed quite some time ago) which managed
launch threads. This was eventually moved to the driver interface.
This change further moves thread management into the OpenStack driver.
As a result, these tests are redundant since the test_launcher.py
tests cover all of this. So remove test_nodelaunchmanager.py.

Change-Id: I407d999b3608e1614c0dbe49808b2a64756dde58
This commit is contained in:
David Shrewsbury
2018-05-15 17:29:37 -04:00
parent 3b1f958ad6
commit be0c59535a
7 changed files with 212 additions and 266 deletions

View File

@@ -22,14 +22,9 @@ import importlib
import logging
import math
import os
import time
import threading
from kazoo import exceptions as kze
from nodepool import zk
from nodepool import exceptions
from nodepool import stats
class Drivers:
@@ -168,7 +163,6 @@ class NodeRequestHandler(object, metaclass=abc.ABCMeta):
self._failed_nodes = []
self._ready_nodes = []
self._threads = []
def _setFromPoolWorker(self):
'''
@@ -182,14 +176,6 @@ class NodeRequestHandler(object, metaclass=abc.ABCMeta):
self.zk = self.pw.getZK()
self.manager = self.pw.getProviderManager()
@property
def alive_thread_count(self):
count = 0
for t in self._threads:
if t.isAlive():
count += 1
return count
@property
def failed_nodes(self):
return self._failed_nodes
@@ -315,10 +301,7 @@ class NodeRequestHandler(object, metaclass=abc.ABCMeta):
self.zk.storeNode(node)
self.nodeset.append(node)
thread = self.launch(node)
if thread:
thread.start()
self._threads.append(thread)
self.launch(node)
def _runHandler(self):
'''
@@ -459,24 +442,23 @@ class NodeRequestHandler(object, metaclass=abc.ABCMeta):
self.done = True
def poll(self):
'''
Check if the request has been handled.
Once the request has been handled, the 'nodeset' attribute will be
filled with the list of nodes assigned to the request, or it will be
empty if the request could not be fulfilled.
:returns: True if we are done with the request, False otherwise.
'''
if self.paused:
return False
if self.done:
return True
if not self.pollLauncher():
# Driver must implement this call
if not self.launchesComplete():
return False
# Launches are complete, so populate ready_nodes and failed_nodes.
for node in self.nodeset:
if node.state == zk.READY:
self.ready_nodes.append(node)
else:
self.failed_nodes.append(node)
# If the request has been pulled, unallocate the node set so other
# requests can use them.
if not self.zk.getNodeRequest(self.request.id):
@@ -525,34 +507,6 @@ class NodeRequestHandler(object, metaclass=abc.ABCMeta):
# ---------------------------------------------------------------
# Driver Implementation
# ---------------------------------------------------------------
def pollLauncher(self):
'''
Check if all launch requests have completed.
When all of the Node objects have reached a final state (READY or
FAILED), we'll know all threads have finished the launch process.
'''
if not self._threads:
return True
# Give the NodeLaunch threads time to finish.
if self.alive_thread_count:
return False
node_states = [node.state for node in self.nodeset]
# NOTE: It very important that NodeLauncher always sets one of
# these states, no matter what.
if not all(s in (zk.READY, zk.FAILED) for s in node_states):
return False
for node in self.nodeset:
if node.state == zk.READY:
self._ready_nodes.append(node)
else:
self._failed_nodes.append(node)
return True
def hasProviderQuota(self, node_types):
'''
@@ -595,6 +549,23 @@ class NodeRequestHandler(object, metaclass=abc.ABCMeta):
'''
pass
@property
@abc.abstractmethod
def alive_thread_count(self):
'''
Return the number of active node launching threads in use by this
request handler.
This is used to limit request handling threads for a provider.
This is an approximate, top-end number for alive threads, since some
threads obviously may have finished by the time we finish the
calculation.
:returns: A count (integer) of active threads.
'''
pass
@abc.abstractmethod
def imagesAvailable(self):
'''
@@ -612,64 +583,18 @@ class NodeRequestHandler(object, metaclass=abc.ABCMeta):
'''
pass
@abc.abstractmethod
def launchesComplete(self):
'''
Handler needs to implement this to check if all nodes in self.nodeset
have completed the launch sequence..
class NodeLauncher(threading.Thread, stats.StatsReporter):
'''
Class to launch a single node.
This method will be called periodically to check on launch progress.
The NodeRequestHandler may return such object to manage asynchronous
node creation.
Subclasses are required to implement the launch method
'''
def __init__(self, handler, node):
threading.Thread.__init__(self, name="NodeLauncher-%s" % node.id)
stats.StatsReporter.__init__(self)
self.log = logging.getLogger("nodepool.NodeLauncher-%s" % node.id)
self.handler = handler
self.node = node
self.label = handler.pool.labels[node.type]
self.pool = self.label.pool
self.provider_config = self.pool.provider
def storeNode(self):
"""Store the node state in Zookeeper"""
self.handler.zk.storeNode(self.node)
def run(self):
start_time = time.monotonic()
statsd_key = 'ready'
try:
self.launch()
except kze.SessionExpiredError:
# Our node lock is gone, leaving the node state as BUILDING.
# This will get cleaned up in ZooKeeper automatically, but we
# must still set our cached node state to FAILED for the
# NodeLaunchManager's poll() method.
self.log.error(
"Lost ZooKeeper session trying to launch for node %s",
self.node.id)
self.node.state = zk.FAILED
statsd_key = 'error.zksession'
except Exception as e:
self.log.exception("Launch failed for node %s:",
self.node.id)
self.node.state = zk.FAILED
self.handler.zk.storeNode(self.node)
if hasattr(e, 'statsd_key'):
statsd_key = e.statsd_key
else:
statsd_key = 'error.unknown'
try:
dt = int((time.monotonic() - start_time) * 1000)
self.recordLaunchStats(statsd_key, dt)
self.updateNodeStats(self.handler.zk, self.provider_config)
except Exception:
self.log.exception("Exception while reporting stats:")
:returns: True if all launches are complete (successfully or not),
False otherwise.
'''
pass
class ConfigValue(object, metaclass=abc.ABCMeta):

View File

@@ -22,28 +22,32 @@ from kazoo import exceptions as kze
from nodepool import exceptions
from nodepool import nodeutils as utils
from nodepool import zk
from nodepool.driver import NodeLauncher
from nodepool.driver.utils import NodeLauncher
from nodepool.driver import NodeRequestHandler
from nodepool.driver.openstack.provider import QuotaInformation
class OpenStackNodeLauncher(NodeLauncher):
def __init__(self, handler, node, retries):
def __init__(self, handler, node, provider_config, provider_label):
'''
Initialize the launcher.
:param NodeRequestHandler handler: The handler object.
:param Node node: The node object.
:param int retries: Number of times to retry failed launches.
:param OpenStackNodeRequestHandler handler: The handler object.
:param Node node: A Node object describing the node to launch.
:param ProviderConfig provider_config: A ProviderConfig object
describing the provider launching this node.
:param ProviderLabel provider_label: A ProviderLabel object
describing the label to use for the node.
'''
super().__init__(handler, node)
self._retries = retries
super().__init__(handler.zk, node, provider_config)
if self.label.diskimage:
self._diskimage = self.provider_config.diskimages[
self.label.diskimage.name]
else:
self._diskimage = None
# Number of times to retry failed launches.
self._retries = provider_config.launch_retries
self.label = provider_label
self.pool = provider_label.pool
self.handler = handler
self.zk = handler.zk
def _logConsole(self, server_id, hostname):
if not self.label.console_log:
@@ -56,17 +60,23 @@ class OpenStackNodeLauncher(NodeLauncher):
def _launchNode(self):
if self.label.diskimage:
diskimage = self.provider_config.diskimages[
self.label.diskimage.name]
else:
diskimage = None
if diskimage:
# launch using diskimage
cloud_image = self.handler.zk.getMostRecentImageUpload(
self._diskimage.name, self.provider_config.name)
diskimage.name, self.provider_config.name)
if not cloud_image:
raise exceptions.LaunchNodepoolException(
"Unable to find current cloud image %s in %s" %
(self._diskimage.name, self.provider_config.name)
(diskimage.name, self.provider_config.name)
)
config_drive = self._diskimage.config_drive
config_drive = diskimage.config_drive
image_external = dict(id=cloud_image.external_id)
image_id = "{path}/{upload_id}".format(
path=self.handler.zk._imageUploadPath(
@@ -74,10 +84,10 @@ class OpenStackNodeLauncher(NodeLauncher):
cloud_image.build_id,
cloud_image.provider_name),
upload_id=cloud_image.id)
image_name = self._diskimage.name
image_name = diskimage.name
username = cloud_image.username
connection_type = self._diskimage.connection_type
connection_port = self._diskimage.connection_port
connection_type = diskimage.connection_type
connection_port = diskimage.connection_port
else:
# launch using unmanaged cloud image
@@ -129,7 +139,7 @@ class OpenStackNodeLauncher(NodeLauncher):
self.node.connection_port = connection_port
# Checkpoint save the updated node info
self.storeNode()
self.zk.storeNode(self.node)
self.log.debug("Waiting for server %s for node id: %s" %
(server.id, self.node.id))
@@ -168,7 +178,7 @@ class OpenStackNodeLauncher(NodeLauncher):
self.node.private_ipv4 = server.public_v4
# Checkpoint save the updated node info
self.storeNode()
self.zk.storeNode(self.node)
self.log.debug(
"Node %s is running [region: %s, az: %s, ip: %s ipv4: %s, "
@@ -199,7 +209,7 @@ class OpenStackNodeLauncher(NodeLauncher):
raise
self.node.host_keys = host_keys
self.storeNode()
self.zk.storeNode(self.node)
def launch(self):
attempts = 1
@@ -225,7 +235,7 @@ class OpenStackNodeLauncher(NodeLauncher):
self.node.public_ipv4 = None
self.node.public_ipv6 = None
self.node.interface_ip = None
self.storeNode()
self.zk.storeNode(self.node)
if attempts == self._retries:
raise
# Invalidate the quota cache if we encountered a quota error.
@@ -235,7 +245,7 @@ class OpenStackNodeLauncher(NodeLauncher):
attempts += 1
self.node.state = zk.READY
self.storeNode()
self.zk.storeNode(self.node)
self.log.info("Node id %s is ready", self.node.id)
@@ -244,6 +254,15 @@ class OpenStackNodeRequestHandler(NodeRequestHandler):
def __init__(self, pw, request):
super().__init__(pw, request)
self.chosen_az = None
self._threads = []
@property
def alive_thread_count(self):
count = 0
for t in self._threads:
if t.isAlive():
count += 1
return count
def imagesAvailable(self):
'''
@@ -348,5 +367,31 @@ class OpenStackNodeRequestHandler(NodeRequestHandler):
node.cloud = self.provider.cloud_config.name
node.region = self.provider.region_name
def launchesComplete(self):
'''
Check if all launch requests have completed.
When all of the Node objects have reached a final state (READY or
FAILED), we'll know all threads have finished the launch process.
'''
if not self._threads:
return True
# Give the NodeLaunch threads time to finish.
if self.alive_thread_count:
return False
node_states = [node.state for node in self.nodeset]
# NOTE: It very important that NodeLauncher always sets one of
# these states, no matter what.
if not all(s in (zk.READY, zk.FAILED) for s in node_states):
return False
return True
def launch(self, node):
return OpenStackNodeLauncher(self, node, self.provider.launch_retries)
label = self.pool.labels[node.type]
thd = OpenStackNodeLauncher(self, node, self.provider, label)
thd.start()
self._threads.append(thd)

View File

@@ -24,6 +24,11 @@ class StaticNodeRequestHandler(NodeRequestHandler):
log = logging.getLogger("nodepool.driver.static."
"StaticNodeRequestHandler")
@property
def alive_thread_count(self):
# We don't spawn threads to launch nodes, so always return 1.
return 1
def _checkConcurrency(self, static_node):
access_count = 0
@@ -76,7 +81,12 @@ class StaticNodeRequestHandler(NodeRequestHandler):
node.host_keys = self.manager.nodes_keys[static_node["name"]]
self.zk.storeNode(node)
def pollLauncher(self):
def launchesComplete(self):
'''
Our nodeset could have nodes in BUILDING state because we may be
waiting for one of our static nodes to free up. Keep calling launch()
to try to grab one.
'''
waiting_node = False
for node in self.nodeset:
if node.state == zk.READY:

86
nodepool/driver/utils.py Normal file
View File

@@ -0,0 +1,86 @@
# Copyright (C) 2018 Red Hat
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
#
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import logging
import threading
import time
from kazoo import exceptions as kze
from nodepool import stats
from nodepool import zk
class NodeLauncher(threading.Thread,
                   stats.StatsReporter,
                   metaclass=abc.ABCMeta):
    '''
    Class to launch a single node within a thread and record stats.

    At this time, the implementing class must manage this thread (i.e.,
    start it and track it until completion); this base class only defines
    the thread body and the stats reporting around the launch.
    '''

    def __init__(self, zk_conn, node, provider_config):
        '''
        :param ZooKeeper zk_conn: The ZooKeeper connection object.
        :param Node node: A Node object describing the node to launch.
        :param ProviderConfig provider_config: A ProviderConfig object
            describing the provider launching this node.
        '''
        threading.Thread.__init__(self, name="NodeLauncher-%s" % node.id)
        stats.StatsReporter.__init__(self)
        self.log = logging.getLogger("nodepool.NodeLauncher-%s" % node.id)
        self.zk = zk_conn
        self.node = node
        self.provider_config = provider_config

    @abc.abstractmethod
    def launch(self):
        '''
        Launch the node. Implemented by the driver subclass.

        Any exception raised here marks the node FAILED in run().
        '''
        pass

    def run(self):
        '''
        Thread body: perform the launch and report timing/result stats.
        '''
        start_time = time.monotonic()
        statsd_key = 'ready'

        try:
            self.launch()
        except kze.SessionExpiredError:
            # Our node lock is gone, leaving the node state as BUILDING.
            # This will get cleaned up in ZooKeeper automatically, but we
            # must still set our cached node state to FAILED for the
            # request handler's poll() method.
            self.log.error(
                "Lost ZooKeeper session trying to launch for node %s",
                self.node.id)
            self.node.state = zk.FAILED
            statsd_key = 'error.zksession'
        except Exception as e:
            self.log.exception("Launch failed for node %s:", self.node.id)
            self.node.state = zk.FAILED
            self.zk.storeNode(self.node)

            # Exceptions may carry a driver-specific statsd key; fall back
            # to a generic bucket otherwise.
            if hasattr(e, 'statsd_key'):
                statsd_key = e.statsd_key
            else:
                statsd_key = 'error.unknown'

        # Stats reporting is best-effort; never let it mask the launch
        # outcome recorded above.
        try:
            dt = int((time.monotonic() - start_time) * 1000)
            self.recordLaunchStats(statsd_key, dt)
            self.updateNodeStats(self.zk, self.provider_config)
        except Exception:
            self.log.exception("Exception while reporting stats:")

View File

@@ -21,9 +21,16 @@ from nodepool.driver import NodeRequestHandler
class TestHandler(NodeRequestHandler):
log = logging.getLogger("nodepool.driver.test.TestHandler")
@property
def alive_thread_count(self):
return 1
def imagesAvailable(self):
return True
def launchesComplete(self):
return True
def launch(self, node):
node.state = zk.READY
node.external_id = "test-%s" % self.request.id

View File

@@ -1323,7 +1323,8 @@ class TestLauncher(tests.DBTestCase):
while self.zk.countPoolNodes('fake-provider', 'main'):
time.sleep(0)
@mock.patch('nodepool.driver.NodeRequestHandler.poll')
@mock.patch(
'nodepool.driver.openstack.handler.OpenStackNodeRequestHandler.poll')
def test_handler_poll_session_expired(self, mock_poll):
'''
Test ZK session lost during handler poll().

View File

@@ -1,128 +0,0 @@
# Copyright (C) 2017 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import mock
import time
from nodepool import builder
from nodepool import provider_manager
from nodepool import tests
from nodepool import zk
from nodepool.driver.openstack.handler import OpenStackNodeRequestHandler
class TestNodeLaunchManager(tests.DBTestCase):
    # NOTE: This suite is removed as redundant; launch-thread behavior is
    # covered by the test_launcher.py tests.
    log = logging.getLogger("nodepool.TestNodeLaunchManager")

    def _setup(self, configfile):
        # Need a builder for the launch code to work and to access
        # config objects.
        b = builder.NodePoolBuilder(configfile)
        b.cleanup_interval = .5
        b.build_interval = .1
        b.upload_interval = .1
        b.dib_cmd = 'nodepool/tests/fake-image-create'
        b.start()
        self.addCleanup(b.stop)

        self.waitForImage('fake-provider', 'fake-image')
        self.provider = b._config.providers['fake-provider']
        self.provider_pool = self.provider.pools['main']

        # The builder config does not have a provider manager, so create one.
        self.pmanager = provider_manager.get_provider(
            self.provider, False)
        self.pmanager.resetClient()

    def _createHandler(self, retries=1):
        # Mock NodeRequest handler object
        class FakePoolWorker:
            launcher_id = 'Test'

        class FakeRequest:
            requestor = 'zuul'

        class FakeProvider:
            launch_retries = retries

        handler = OpenStackNodeRequestHandler(FakePoolWorker(), FakeRequest())
        handler.zk = self.zk
        handler.pool = self.provider_pool
        handler.manager = self.pmanager
        handler.provider = FakeProvider()
        return handler

    def _launch(self, handler, node):
        # Mock NodeRequest runHandler method
        thread = handler.launch(node)
        thread.start()
        handler._threads.append(thread)
        handler.nodeset.append(node)

    def test_successful_launch(self):
        configfile = self.setup_config('node.yaml')
        self._setup(configfile)
        handler = self._createHandler(1)

        n1 = zk.Node()
        n1.state = zk.BUILDING
        n1.type = 'fake-label'
        self._launch(handler, n1)
        while not handler.pollLauncher():
            time.sleep(0)
        self.assertEqual(len(handler.ready_nodes), 1)
        self.assertEqual(len(handler.failed_nodes), 0)
        nodes = handler.manager.listNodes()
        self.assertEqual(nodes[0]['metadata']['groups'],
                         'fake-provider,fake-image,fake-label')

    @mock.patch('nodepool.driver.openstack.handler.'
                'OpenStackNodeLauncher._launchNode')
    def test_failed_launch(self, mock_launch):
        configfile = self.setup_config('node.yaml')
        self._setup(configfile)
        handler = self._createHandler(1)

        mock_launch.side_effect = Exception()
        n1 = zk.Node()
        n1.state = zk.BUILDING
        n1.type = 'fake-label'
        self._launch(handler, n1)
        while not handler.pollLauncher():
            time.sleep(0)
        self.assertEqual(len(handler.failed_nodes), 1)
        self.assertEqual(len(handler.ready_nodes), 0)

    @mock.patch('nodepool.driver.openstack.handler.'
                'OpenStackNodeLauncher._launchNode')
    def test_mixed_launch(self, mock_launch):
        configfile = self.setup_config('node.yaml')
        self._setup(configfile)
        handler = self._createHandler(1)

        mock_launch.side_effect = [None, Exception()]
        n1 = zk.Node()
        n1.state = zk.BUILDING
        n1.type = 'fake-label'
        n2 = zk.Node()
        n2.state = zk.BUILDING
        n2.type = 'fake-label'
        self._launch(handler, n1)
        self._launch(handler, n2)
        while not handler.pollLauncher():
            time.sleep(0)
        self.assertEqual(len(handler._failed_nodes), 1)
        self.assertEqual(len(handler._ready_nodes), 1)