Refactor NodeLauncher to be generic

This change refactors the NodeLauncher object so that it is part
of the generic interface. A nodepool driver handler only need to
return a subclass implementing the launch method.

Moreover this change adapts the StatsReporter class.

Change-Id: I6cfec650b862cb4fa0cb391bcc1248549e30c91b
This commit is contained in:
Tristan Cacqueray 2018-01-10 01:44:53 +00:00
parent badb7e48ad
commit 11f0ffd201
5 changed files with 163 additions and 164 deletions

View File

@ -22,11 +22,16 @@ import importlib
import logging import logging
import math import math
import os import os
import time
import threading
import six import six
from kazoo import exceptions as kze
from nodepool import zk from nodepool import zk
from nodepool import exceptions from nodepool import exceptions
from nodepool import stats
class Drivers: class Drivers:
@ -622,6 +627,65 @@ class NodeRequestHandler(object):
pass pass
class NodeLauncher(threading.Thread, stats.StatsReporter):
'''
Class to launch a single node.
The NodeRequestHandler may return such object to manage asynchronous
node creation.
Subclasses are required to implement the launch method
'''
def __init__(self, handler, node):
threading.Thread.__init__(self, name="NodeLauncher-%s" % node.id)
stats.StatsReporter.__init__(self)
self.log = logging.getLogger("nodepool.NodeLauncher-%s" % node.id)
self.handler = handler
self.node = node
self.label = handler.pool.labels[node.type]
self.pool = self.label.pool
self.provider_config = self.pool.provider
def storeNode(self):
"""Store the node state in Zookeeper"""
self.handler.zk.storeNode(self.node)
def run(self):
start_time = time.monotonic()
statsd_key = 'ready'
try:
self.launch()
except kze.SessionExpiredError:
# Our node lock is gone, leaving the node state as BUILDING.
# This will get cleaned up in ZooKeeper automatically, but we
# must still set our cached node state to FAILED for the
# NodeLaunchManager's poll() method.
self.log.error(
"Lost ZooKeeper session trying to launch for node %s",
self.node.id)
self.node.state = zk.FAILED
statsd_key = 'error.zksession'
except Exception as e:
self.log.exception("Launch failed for node %s:",
self.node.id)
self.node.state = zk.FAILED
self.handler.zk.storeNode(self.node)
if hasattr(e, 'statsd_key'):
statsd_key = e.statsd_key
else:
statsd_key = 'error.unknown'
try:
dt = int((time.monotonic() - start_time) * 1000)
self.recordLaunchStats(statsd_key, dt)
self.updateNodeStats(self.handler.zk, self.provider_config)
except Exception:
self.log.exception("Exception while reporting stats:")
class ConfigValue(object): class ConfigValue(object):
def __eq__(self, other): def __eq__(self, other):
if isinstance(other, ConfigValue): if isinstance(other, ConfigValue):

View File

@ -13,86 +13,66 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import logging
import math import math
import pprint import pprint
import random import random
import threading
import time
from kazoo import exceptions as kze from kazoo import exceptions as kze
from nodepool import exceptions from nodepool import exceptions
from nodepool import nodeutils as utils from nodepool import nodeutils as utils
from nodepool import stats
from nodepool import zk from nodepool import zk
from nodepool.driver import NodeLauncher
from nodepool.driver import NodeRequestHandler from nodepool.driver import NodeRequestHandler
from nodepool.driver.openstack.provider import QuotaInformation from nodepool.driver.openstack.provider import QuotaInformation
class NodeLauncher(threading.Thread, stats.StatsReporter): class OpenStackNodeLauncher(NodeLauncher):
log = logging.getLogger("nodepool.driver.openstack." def __init__(self, handler, node, retries):
"NodeLauncher")
def __init__(self, zk, provider_label, provider_manager, requestor,
node, retries):
''' '''
Initialize the launcher. Initialize the launcher.
:param ZooKeeper zk: A ZooKeeper object. :param NodeRequestHandler handler: The handler object.
:param ProviderLabel provider: A config ProviderLabel object.
:param ProviderManager provider_manager: The manager object used to
interact with the selected provider.
:param str requestor: Identifier for the request originator.
:param Node node: The node object. :param Node node: The node object.
:param int retries: Number of times to retry failed launches. :param int retries: Number of times to retry failed launches.
''' '''
threading.Thread.__init__(self, name="NodeLauncher-%s" % node.id) super().__init__(handler, node)
stats.StatsReporter.__init__(self)
self.log = logging.getLogger("nodepool.NodeLauncher-%s" % node.id)
self._zk = zk
self._label = provider_label
self._provider_manager = provider_manager
self._node = node
self._retries = retries self._retries = retries
self._image_name = None
self._requestor = requestor
self._pool = self._label.pool if self.label.diskimage:
self._provider_config = self._pool.provider self._diskimage = self.provider_config.diskimages[
if self._label.diskimage: self.label.diskimage.name]
self._diskimage = self._provider_config.diskimages[
self._label.diskimage.name]
else: else:
self._diskimage = None self._diskimage = None
def logConsole(self, server_id, hostname): def _logConsole(self, server_id, hostname):
if not self._label.console_log: if not self.label.console_log:
return return
console = self._provider_manager.getServerConsole(server_id) console = self.handler.manager.getServerConsole(server_id)
if console: if console:
self.log.debug('Console log from hostname %s:' % hostname) self.log.debug('Console log from hostname %s:' % hostname)
for line in console.splitlines(): for line in console.splitlines():
self.log.debug(line.rstrip()) self.log.debug(line.rstrip())
def _launchNode(self): def _launchNode(self):
if self._label.diskimage: if self.label.diskimage:
# launch using diskimage # launch using diskimage
cloud_image = self._zk.getMostRecentImageUpload( cloud_image = self.handler.zk.getMostRecentImageUpload(
self._diskimage.name, self._provider_config.name) self._diskimage.name, self.provider_config.name)
if not cloud_image: if not cloud_image:
raise exceptions.LaunchNodepoolException( raise exceptions.LaunchNodepoolException(
"Unable to find current cloud image %s in %s" % "Unable to find current cloud image %s in %s" %
(self._diskimage.name, self._provider_config.name) (self._diskimage.name, self.provider_config.name)
) )
config_drive = self._diskimage.config_drive config_drive = self._diskimage.config_drive
image_external = dict(id=cloud_image.external_id) image_external = dict(id=cloud_image.external_id)
image_id = "{path}/{upload_id}".format( image_id = "{path}/{upload_id}".format(
path=self._zk._imageUploadPath(cloud_image.image_name, path=self.handler.zk._imageUploadPath(
cloud_image.build_id, cloud_image.image_name,
cloud_image.provider_name), cloud_image.build_id,
cloud_image.provider_name),
upload_id=cloud_image.id) upload_id=cloud_image.id)
image_name = self._diskimage.name image_name = self._diskimage.name
username = cloud_image.username username = cloud_image.username
@ -101,73 +81,73 @@ class NodeLauncher(threading.Thread, stats.StatsReporter):
else: else:
# launch using unmanaged cloud image # launch using unmanaged cloud image
config_drive = self._label.cloud_image.config_drive config_drive = self.label.cloud_image.config_drive
image_external = self._label.cloud_image.external image_external = self.label.cloud_image.external
image_id = self._label.cloud_image.name image_id = self.label.cloud_image.name
image_name = self._label.cloud_image.name image_name = self.label.cloud_image.name
username = self._label.cloud_image.username username = self.label.cloud_image.username
connection_type = self._label.cloud_image.connection_type connection_type = self.label.cloud_image.connection_type
connection_port = self._label.cloud_image.connection_port connection_port = self.label.cloud_image.connection_port
hostname = self._provider_config.hostname_format.format( hostname = self.provider_config.hostname_format.format(
label=self._label, provider=self._provider_config, node=self._node label=self.label, provider=self.provider_config, node=self.node
) )
self.log.info("Creating server with hostname %s in %s from image %s " self.log.info("Creating server with hostname %s in %s from image %s "
"for node id: %s" % (hostname, "for node id: %s" % (hostname,
self._provider_config.name, self.provider_config.name,
image_name, image_name,
self._node.id)) self.node.id))
# NOTE: We store the node ID in the server metadata to use for leaked # NOTE: We store the node ID in the server metadata to use for leaked
# instance detection. We cannot use the external server ID for this # instance detection. We cannot use the external server ID for this
# because that isn't available in ZooKeeper until after the server is # because that isn't available in ZooKeeper until after the server is
# active, which could cause a race in leak detection. # active, which could cause a race in leak detection.
server = self._provider_manager.createServer( server = self.handler.manager.createServer(
hostname, hostname,
image=image_external, image=image_external,
min_ram=self._label.min_ram, min_ram=self.label.min_ram,
flavor_name=self._label.flavor_name, flavor_name=self.label.flavor_name,
key_name=self._label.key_name, key_name=self.label.key_name,
az=self._node.az, az=self.node.az,
config_drive=config_drive, config_drive=config_drive,
nodepool_node_id=self._node.id, nodepool_node_id=self.node.id,
nodepool_node_label=self._node.type, nodepool_node_label=self.node.type,
nodepool_image_name=image_name, nodepool_image_name=image_name,
networks=self._pool.networks, networks=self.pool.networks,
boot_from_volume=self._label.boot_from_volume, boot_from_volume=self.label.boot_from_volume,
volume_size=self._label.volume_size) volume_size=self.label.volume_size)
self._node.external_id = server.id self.node.external_id = server.id
self._node.hostname = hostname self.node.hostname = hostname
self._node.image_id = image_id self.node.image_id = image_id
if username: if username:
self._node.username = username self.node.username = username
self._node.connection_type = connection_type self.node.connection_type = connection_type
self._node.connection_port = connection_port self.node.connection_port = connection_port
# Checkpoint save the updated node info # Checkpoint save the updated node info
self._zk.storeNode(self._node) self.storeNode()
self.log.debug("Waiting for server %s for node id: %s" % self.log.debug("Waiting for server %s for node id: %s" %
(server.id, self._node.id)) (server.id, self.node.id))
server = self._provider_manager.waitForServer( server = self.handler.manager.waitForServer(
server, self._provider_config.launch_timeout, server, self.provider_config.launch_timeout,
auto_ip=self._pool.auto_floating_ip) auto_ip=self.pool.auto_floating_ip)
if server.status != 'ACTIVE': if server.status != 'ACTIVE':
raise exceptions.LaunchStatusException("Server %s for node id: %s " raise exceptions.LaunchStatusException("Server %s for node id: %s "
"status: %s" % "status: %s" %
(server.id, self._node.id, (server.id, self.node.id,
server.status)) server.status))
# If we didn't specify an AZ, set it to the one chosen by Nova. # If we didn't specify an AZ, set it to the one chosen by Nova.
# Do this after we are done waiting since AZ may not be available # Do this after we are done waiting since AZ may not be available
# immediately after the create request. # immediately after the create request.
if not self._node.az: if not self.node.az:
self._node.az = server.location.zone self.node.az = server.location.zone
interface_ip = server.interface_ip interface_ip = server.interface_ip
if not interface_ip: if not interface_ip:
@ -177,37 +157,37 @@ class NodeLauncher(threading.Thread, stats.StatsReporter):
raise exceptions.LaunchNetworkException( raise exceptions.LaunchNetworkException(
"Unable to find public IP of server") "Unable to find public IP of server")
self._node.interface_ip = interface_ip self.node.interface_ip = interface_ip
self._node.public_ipv4 = server.public_v4 self.node.public_ipv4 = server.public_v4
self._node.public_ipv6 = server.public_v6 self.node.public_ipv6 = server.public_v6
self._node.private_ipv4 = server.private_v4 self.node.private_ipv4 = server.private_v4
# devstack-gate multi-node depends on private_v4 being populated # devstack-gate multi-node depends on private_v4 being populated
# with something. On clouds that don't have a private address, use # with something. On clouds that don't have a private address, use
# the public. # the public.
if not self._node.private_ipv4: if not self.node.private_ipv4:
self._node.private_ipv4 = server.public_v4 self.node.private_ipv4 = server.public_v4
# Checkpoint save the updated node info # Checkpoint save the updated node info
self._zk.storeNode(self._node) self.storeNode()
self.log.debug( self.log.debug(
"Node %s is running [region: %s, az: %s, ip: %s ipv4: %s, " "Node %s is running [region: %s, az: %s, ip: %s ipv4: %s, "
"ipv6: %s]" % "ipv6: %s]" %
(self._node.id, self._node.region, self._node.az, (self.node.id, self.node.region, self.node.az,
self._node.interface_ip, self._node.public_ipv4, self.node.interface_ip, self.node.public_ipv4,
self._node.public_ipv6)) self.node.public_ipv6))
# wait and scan the new node and record in ZooKeeper # wait and scan the new node and record in ZooKeeper
host_keys = [] host_keys = []
if self._pool.host_key_checking: if self.pool.host_key_checking:
try: try:
self.log.debug( self.log.debug(
"Gathering host keys for node %s", self._node.id) "Gathering host keys for node %s", self.node.id)
# only gather host keys if the connection type is ssh # only gather host keys if the connection type is ssh
gather_host_keys = connection_type == 'ssh' gather_host_keys = connection_type == 'ssh'
host_keys = utils.nodescan( host_keys = utils.nodescan(
interface_ip, interface_ip,
timeout=self._provider_config.boot_timeout, timeout=self.provider_config.boot_timeout,
gather_hostkeys=gather_host_keys, gather_hostkeys=gather_host_keys,
port=connection_port) port=connection_port)
@ -215,13 +195,13 @@ class NodeLauncher(threading.Thread, stats.StatsReporter):
raise exceptions.LaunchKeyscanException( raise exceptions.LaunchKeyscanException(
"Unable to gather host keys") "Unable to gather host keys")
except exceptions.ConnectionTimeoutException: except exceptions.ConnectionTimeoutException:
self.logConsole(self._node.external_id, self._node.hostname) self.logConsole(self.node.external_id, self.node.hostname)
raise raise
self._node.host_keys = host_keys self.node.host_keys = host_keys
self._zk.storeNode(self._node) self.storeNode()
def _run(self): def launch(self):
attempts = 1 attempts = 1
while attempts <= self._retries: while attempts <= self._retries:
try: try:
@ -235,65 +215,28 @@ class NodeLauncher(threading.Thread, stats.StatsReporter):
if attempts <= self._retries: if attempts <= self._retries:
self.log.exception( self.log.exception(
"Launch attempt %d/%d failed for node %s:", "Launch attempt %d/%d failed for node %s:",
attempts, self._retries, self._node.id) attempts, self._retries, self.node.id)
# If we created an instance, delete it. # If we created an instance, delete it.
if self._node.external_id: if self.node.external_id:
self._provider_manager.cleanupNode(self._node.external_id) self.handler.manager.cleanupNode(self.node.external_id)
self._provider_manager.waitForNodeCleanup( self.handler.manager.waitForNodeCleanup(
self._node.external_id self.node.external_id)
) self.node.external_id = None
self._node.external_id = None self.node.public_ipv4 = None
self._node.public_ipv4 = None self.node.public_ipv6 = None
self._node.public_ipv6 = None self.node.interface_ip = None
self._node.interface_ip = None self.storeNode()
self._zk.storeNode(self._node)
if attempts == self._retries: if attempts == self._retries:
raise raise
# Invalidate the quota cache if we encountered a quota error. # Invalidate the quota cache if we encountered a quota error.
if 'quota exceeded' in str(e).lower(): if 'quota exceeded' in str(e).lower():
self.log.info("Quota exceeded, invalidating quota cache") self.log.info("Quota exceeded, invalidating quota cache")
self._provider_manager.invalidateQuotaCache() self.handler.manager.invalidateQuotaCache()
attempts += 1 attempts += 1
self._node.state = zk.READY self.node.state = zk.READY
self._zk.storeNode(self._node) self.storeNode()
self.log.info("Node id %s is ready", self._node.id) self.log.info("Node id %s is ready", self.node.id)
def run(self):
start_time = time.time()
statsd_key = 'ready'
try:
self._run()
except kze.SessionExpiredError:
# Our node lock is gone, leaving the node state as BUILDING.
# This will get cleaned up in ZooKeeper automatically, but we
# must still set our cached node state to FAILED for the
# NodeLaunchManager's poll() method.
self.log.error(
"Lost ZooKeeper session trying to launch for node %s",
self._node.id)
self._node.state = zk.FAILED
statsd_key = 'error.zksession'
except Exception as e:
self.log.exception("Launch failed for node %s:",
self._node.id)
self._node.state = zk.FAILED
self._zk.storeNode(self._node)
if hasattr(e, 'statsd_key'):
statsd_key = e.statsd_key
else:
statsd_key = 'error.unknown'
try:
dt = int((time.time() - start_time) * 1000)
self.recordLaunchStats(statsd_key, dt, self._image_name,
self._node.provider, self._node.az,
self._requestor)
self.updateNodeStats(self._zk, self._provider_config)
except Exception:
self.log.exception("Exception while reporting stats:")
class OpenStackNodeRequestHandler(NodeRequestHandler): class OpenStackNodeRequestHandler(NodeRequestHandler):
@ -301,9 +244,6 @@ class OpenStackNodeRequestHandler(NodeRequestHandler):
def __init__(self, pw, request): def __init__(self, pw, request):
super().__init__(pw, request) super().__init__(pw, request)
self.chosen_az = None self.chosen_az = None
self.log = logging.getLogger(
"nodepool.driver.openstack.OpenStackNodeRequestHandler[%s]" %
self.launcher_id)
def hasRemainingQuota(self, ntype): def hasRemainingQuota(self, ntype):
needed_quota = self.manager.quotaNeededByNodeType(ntype, self.pool) needed_quota = self.manager.quotaNeededByNodeType(ntype, self.pool)
@ -388,7 +328,4 @@ class OpenStackNodeRequestHandler(NodeRequestHandler):
node.region = self.provider.region_name node.region = self.provider.region_name
def launch(self, node): def launch(self, node):
return NodeLauncher( return OpenStackNodeLauncher(self, node, self.provider.launch_retries)
self.zk, self.pool.labels[node.type], self.manager,
self.request.requestor, node,
self.provider.launch_retries)

View File

@ -51,35 +51,30 @@ class StatsReporter(object):
super(StatsReporter, self).__init__() super(StatsReporter, self).__init__()
self._statsd = get_client() self._statsd = get_client()
def recordLaunchStats(self, subkey, dt, image_name, def recordLaunchStats(self, subkey, dt):
provider_name, node_az, requestor):
''' '''
Record node launch statistics. Record node launch statistics.
:param str subkey: statsd key :param str subkey: statsd key
:param int dt: Time delta in milliseconds :param int dt: Time delta in milliseconds
:param str image_name: Name of the image used
:param str provider_name: Name of the provider
:param str node_az: AZ of the launched node
:param str requestor: Identifier for the request originator
''' '''
if not self._statsd: if not self._statsd:
return return
keys = [ keys = [
'nodepool.launch.provider.%s.%s' % (provider_name, subkey), 'nodepool.launch.provider.%s.%s' % (
'nodepool.launch.image.%s.%s' % (image_name, subkey), self.provider_config.name, subkey),
'nodepool.launch.%s' % (subkey,), 'nodepool.launch.%s' % (subkey,),
] ]
if node_az: if self.node.az:
keys.append('nodepool.launch.provider.%s.%s.%s' % keys.append('nodepool.launch.provider.%s.%s.%s' %
(provider_name, node_az, subkey)) (self.provider_config.name, self.node.az, subkey))
if requestor: if self.handler.request.requestor:
# Replace '.' which is a graphite hierarchy, and ':' which is # Replace '.' which is a graphite hierarchy, and ':' which is
# a statsd delimeter. # a statsd delimeter.
requestor = requestor.replace('.', '_') requestor = self.handler.request.requestor.replace('.', '_')
requestor = requestor.replace(':', '_') requestor = requestor.replace(':', '_')
keys.append('nodepool.launch.requestor.%s.%s' % keys.append('nodepool.launch.requestor.%s.%s' %
(requestor, subkey)) (requestor, subkey))

View File

@ -1292,7 +1292,8 @@ class TestLauncher(tests.DBTestCase):
time.sleep(1) time.sleep(1)
launchers = self.zk.getRegisteredLaunchers() launchers = self.zk.getRegisteredLaunchers()
@mock.patch('nodepool.driver.openstack.handler.NodeLauncher._launchNode') @mock.patch('nodepool.driver.openstack.handler.'
'OpenStackNodeLauncher._launchNode')
def test_launchNode_session_expired(self, mock_launch): def test_launchNode_session_expired(self, mock_launch):
''' '''
Test ZK session lost during _launchNode(). Test ZK session lost during _launchNode().

View File

@ -89,7 +89,8 @@ class TestNodeLaunchManager(tests.DBTestCase):
self.assertEqual(nodes[0]['metadata']['groups'], self.assertEqual(nodes[0]['metadata']['groups'],
'fake-provider,fake-image,fake-label') 'fake-provider,fake-image,fake-label')
@mock.patch('nodepool.driver.openstack.handler.NodeLauncher._launchNode') @mock.patch('nodepool.driver.openstack.handler.'
'OpenStackNodeLauncher._launchNode')
def test_failed_launch(self, mock_launch): def test_failed_launch(self, mock_launch):
configfile = self.setup_config('node.yaml') configfile = self.setup_config('node.yaml')
self._setup(configfile) self._setup(configfile)
@ -105,7 +106,8 @@ class TestNodeLaunchManager(tests.DBTestCase):
self.assertEqual(len(handler.failed_nodes), 1) self.assertEqual(len(handler.failed_nodes), 1)
self.assertEqual(len(handler.ready_nodes), 0) self.assertEqual(len(handler.ready_nodes), 0)
@mock.patch('nodepool.driver.openstack.handler.NodeLauncher._launchNode') @mock.patch('nodepool.driver.openstack.handler.'
'OpenStackNodeLauncher._launchNode')
def test_mixed_launch(self, mock_launch): def test_mixed_launch(self, mock_launch):
configfile = self.setup_config('node.yaml') configfile = self.setup_config('node.yaml')
self._setup(configfile) self._setup(configfile)