diff --git a/nodepool/driver/__init__.py b/nodepool/driver/__init__.py index 8f6337530..d47ff78ed 100644 --- a/nodepool/driver/__init__.py +++ b/nodepool/driver/__init__.py @@ -22,11 +22,16 @@ import importlib import logging import math import os +import time +import threading import six +from kazoo import exceptions as kze + from nodepool import zk from nodepool import exceptions +from nodepool import stats class Drivers: @@ -622,6 +627,65 @@ class NodeRequestHandler(object): pass +class NodeLauncher(threading.Thread, stats.StatsReporter): + ''' + Class to launch a single node. + + The NodeRequestHandler may return such object to manage asynchronous + node creation. + + Subclasses are required to implement the launch method + ''' + + def __init__(self, handler, node): + threading.Thread.__init__(self, name="NodeLauncher-%s" % node.id) + stats.StatsReporter.__init__(self) + self.log = logging.getLogger("nodepool.NodeLauncher-%s" % node.id) + self.handler = handler + self.node = node + self.label = handler.pool.labels[node.type] + self.pool = self.label.pool + self.provider_config = self.pool.provider + + def storeNode(self): + """Store the node state in Zookeeper""" + self.handler.zk.storeNode(self.node) + + def run(self): + start_time = time.monotonic() + statsd_key = 'ready' + + try: + self.launch() + except kze.SessionExpiredError: + # Our node lock is gone, leaving the node state as BUILDING. + # This will get cleaned up in ZooKeeper automatically, but we + # must still set our cached node state to FAILED for the + # NodeLaunchManager's poll() method. 
+ self.log.error( + "Lost ZooKeeper session trying to launch for node %s", + self.node.id) + self.node.state = zk.FAILED + statsd_key = 'error.zksession' + except Exception as e: + self.log.exception("Launch failed for node %s:", + self.node.id) + self.node.state = zk.FAILED + self.handler.zk.storeNode(self.node) + + if hasattr(e, 'statsd_key'): + statsd_key = e.statsd_key + else: + statsd_key = 'error.unknown' + + try: + dt = int((time.monotonic() - start_time) * 1000) + self.recordLaunchStats(statsd_key, dt) + self.updateNodeStats(self.handler.zk, self.provider_config) + except Exception: + self.log.exception("Exception while reporting stats:") + + class ConfigValue(object): def __eq__(self, other): if isinstance(other, ConfigValue): diff --git a/nodepool/driver/openstack/handler.py b/nodepool/driver/openstack/handler.py index c30ea0d41..0a62ed3e6 100644 --- a/nodepool/driver/openstack/handler.py +++ b/nodepool/driver/openstack/handler.py @@ -13,86 +13,66 @@ # License for the specific language governing permissions and limitations # under the License. -import logging import math import pprint import random -import threading -import time from kazoo import exceptions as kze from nodepool import exceptions from nodepool import nodeutils as utils -from nodepool import stats from nodepool import zk +from nodepool.driver import NodeLauncher from nodepool.driver import NodeRequestHandler from nodepool.driver.openstack.provider import QuotaInformation -class NodeLauncher(threading.Thread, stats.StatsReporter): - log = logging.getLogger("nodepool.driver.openstack." - "NodeLauncher") - - def __init__(self, zk, provider_label, provider_manager, requestor, - node, retries): +class OpenStackNodeLauncher(NodeLauncher): + def __init__(self, handler, node, retries): ''' Initialize the launcher. - :param ZooKeeper zk: A ZooKeeper object. - :param ProviderLabel provider: A config ProviderLabel object. 
- :param ProviderManager provider_manager: The manager object used to - interact with the selected provider. - :param str requestor: Identifier for the request originator. + :param NodeRequestHandler handler: The handler object. :param Node node: The node object. :param int retries: Number of times to retry failed launches. ''' - threading.Thread.__init__(self, name="NodeLauncher-%s" % node.id) - stats.StatsReporter.__init__(self) - self.log = logging.getLogger("nodepool.NodeLauncher-%s" % node.id) - self._zk = zk - self._label = provider_label - self._provider_manager = provider_manager - self._node = node + super().__init__(handler, node) self._retries = retries - self._image_name = None - self._requestor = requestor - self._pool = self._label.pool - self._provider_config = self._pool.provider - if self._label.diskimage: - self._diskimage = self._provider_config.diskimages[ - self._label.diskimage.name] + if self.label.diskimage: + self._diskimage = self.provider_config.diskimages[ + self.label.diskimage.name] else: self._diskimage = None - def logConsole(self, server_id, hostname): - if not self._label.console_log: + def _logConsole(self, server_id, hostname): + if not self.label.console_log: return - console = self._provider_manager.getServerConsole(server_id) + console = self.handler.manager.getServerConsole(server_id) if console: self.log.debug('Console log from hostname %s:' % hostname) for line in console.splitlines(): self.log.debug(line.rstrip()) def _launchNode(self): - if self._label.diskimage: + if self.label.diskimage: # launch using diskimage - cloud_image = self._zk.getMostRecentImageUpload( - self._diskimage.name, self._provider_config.name) + cloud_image = self.handler.zk.getMostRecentImageUpload( + self._diskimage.name, self.provider_config.name) if not cloud_image: raise exceptions.LaunchNodepoolException( "Unable to find current cloud image %s in %s" % - (self._diskimage.name, self._provider_config.name) + (self._diskimage.name, 
self.provider_config.name) ) config_drive = self._diskimage.config_drive image_external = dict(id=cloud_image.external_id) image_id = "{path}/{upload_id}".format( - path=self._zk._imageUploadPath(cloud_image.image_name, - cloud_image.build_id, - cloud_image.provider_name), + path=self.handler.zk._imageUploadPath( + cloud_image.image_name, + cloud_image.build_id, + cloud_image.provider_name), upload_id=cloud_image.id) image_name = self._diskimage.name username = cloud_image.username @@ -101,73 +81,73 @@ class NodeLauncher(threading.Thread, stats.StatsReporter): else: # launch using unmanaged cloud image - config_drive = self._label.cloud_image.config_drive + config_drive = self.label.cloud_image.config_drive - image_external = self._label.cloud_image.external - image_id = self._label.cloud_image.name - image_name = self._label.cloud_image.name - username = self._label.cloud_image.username - connection_type = self._label.cloud_image.connection_type - connection_port = self._label.cloud_image.connection_port + image_external = self.label.cloud_image.external + image_id = self.label.cloud_image.name + image_name = self.label.cloud_image.name + username = self.label.cloud_image.username + connection_type = self.label.cloud_image.connection_type + connection_port = self.label.cloud_image.connection_port - hostname = self._provider_config.hostname_format.format( - label=self._label, provider=self._provider_config, node=self._node + hostname = self.provider_config.hostname_format.format( + label=self.label, provider=self.provider_config, node=self.node ) self.log.info("Creating server with hostname %s in %s from image %s " "for node id: %s" % (hostname, - self._provider_config.name, + self.provider_config.name, image_name, - self._node.id)) + self.node.id)) # NOTE: We store the node ID in the server metadata to use for leaked # instance detection. 
We cannot use the external server ID for this # because that isn't available in ZooKeeper until after the server is # active, which could cause a race in leak detection. - server = self._provider_manager.createServer( + server = self.handler.manager.createServer( hostname, image=image_external, - min_ram=self._label.min_ram, - flavor_name=self._label.flavor_name, - key_name=self._label.key_name, - az=self._node.az, + min_ram=self.label.min_ram, + flavor_name=self.label.flavor_name, + key_name=self.label.key_name, + az=self.node.az, config_drive=config_drive, - nodepool_node_id=self._node.id, - nodepool_node_label=self._node.type, + nodepool_node_id=self.node.id, + nodepool_node_label=self.node.type, nodepool_image_name=image_name, - networks=self._pool.networks, - boot_from_volume=self._label.boot_from_volume, - volume_size=self._label.volume_size) + networks=self.pool.networks, + boot_from_volume=self.label.boot_from_volume, + volume_size=self.label.volume_size) - self._node.external_id = server.id - self._node.hostname = hostname - self._node.image_id = image_id + self.node.external_id = server.id + self.node.hostname = hostname + self.node.image_id = image_id if username: - self._node.username = username - self._node.connection_type = connection_type - self._node.connection_port = connection_port + self.node.username = username + self.node.connection_type = connection_type + self.node.connection_port = connection_port # Checkpoint save the updated node info - self._zk.storeNode(self._node) + self.storeNode() self.log.debug("Waiting for server %s for node id: %s" % - (server.id, self._node.id)) - server = self._provider_manager.waitForServer( - server, self._provider_config.launch_timeout, - auto_ip=self._pool.auto_floating_ip) + (server.id, self.node.id)) + server = self.handler.manager.waitForServer( + server, self.provider_config.launch_timeout, + auto_ip=self.pool.auto_floating_ip) if server.status != 'ACTIVE': raise exceptions.LaunchStatusException("Server 
%s for node id: %s " "status: %s" % - (server.id, self._node.id, + (server.id, self.node.id, server.status)) # If we didn't specify an AZ, set it to the one chosen by Nova. # Do this after we are done waiting since AZ may not be available # immediately after the create request. - if not self._node.az: - self._node.az = server.location.zone + if not self.node.az: + self.node.az = server.location.zone interface_ip = server.interface_ip if not interface_ip: @@ -177,37 +157,37 @@ class NodeLauncher(threading.Thread, stats.StatsReporter): raise exceptions.LaunchNetworkException( "Unable to find public IP of server") - self._node.interface_ip = interface_ip - self._node.public_ipv4 = server.public_v4 - self._node.public_ipv6 = server.public_v6 - self._node.private_ipv4 = server.private_v4 + self.node.interface_ip = interface_ip + self.node.public_ipv4 = server.public_v4 + self.node.public_ipv6 = server.public_v6 + self.node.private_ipv4 = server.private_v4 # devstack-gate multi-node depends on private_v4 being populated # with something. On clouds that don't have a private address, use # the public. 
- if not self._node.private_ipv4: - self._node.private_ipv4 = server.public_v4 + if not self.node.private_ipv4: + self.node.private_ipv4 = server.public_v4 # Checkpoint save the updated node info - self._zk.storeNode(self._node) + self.storeNode() self.log.debug( "Node %s is running [region: %s, az: %s, ip: %s ipv4: %s, " "ipv6: %s]" % - (self._node.id, self._node.region, self._node.az, - self._node.interface_ip, self._node.public_ipv4, - self._node.public_ipv6)) + (self.node.id, self.node.region, self.node.az, + self.node.interface_ip, self.node.public_ipv4, + self.node.public_ipv6)) # wait and scan the new node and record in ZooKeeper host_keys = [] - if self._pool.host_key_checking: + if self.pool.host_key_checking: try: self.log.debug( - "Gathering host keys for node %s", self._node.id) + "Gathering host keys for node %s", self.node.id) # only gather host keys if the connection type is ssh gather_host_keys = connection_type == 'ssh' host_keys = utils.nodescan( interface_ip, - timeout=self._provider_config.boot_timeout, + timeout=self.provider_config.boot_timeout, gather_hostkeys=gather_host_keys, port=connection_port) @@ -215,13 +195,13 @@ class NodeLauncher(threading.Thread, stats.StatsReporter): raise exceptions.LaunchKeyscanException( "Unable to gather host keys") except exceptions.ConnectionTimeoutException: - self.logConsole(self._node.external_id, self._node.hostname) + self._logConsole(self.node.external_id, self.node.hostname) raise - self._node.host_keys = host_keys - self._zk.storeNode(self._node) + self.node.host_keys = host_keys + self.storeNode() - def _run(self): + def launch(self): attempts = 1 while attempts <= self._retries: try: @@ -235,65 +215,28 @@ class NodeLauncher(threading.Thread, stats.StatsReporter): if attempts <= self._retries: self.log.exception( "Launch attempt %d/%d failed for node %s:", - attempts, self._retries, self._node.id) + attempts, self._retries, self.node.id) # If we created an instance, delete it. 
- if self._node.external_id: - self._provider_manager.cleanupNode(self._node.external_id) - self._provider_manager.waitForNodeCleanup( - self._node.external_id - ) - self._node.external_id = None - self._node.public_ipv4 = None - self._node.public_ipv6 = None - self._node.interface_ip = None - self._zk.storeNode(self._node) + if self.node.external_id: + self.handler.manager.cleanupNode(self.node.external_id) + self.handler.manager.waitForNodeCleanup( + self.node.external_id) + self.node.external_id = None + self.node.public_ipv4 = None + self.node.public_ipv6 = None + self.node.interface_ip = None + self.storeNode() if attempts == self._retries: raise # Invalidate the quota cache if we encountered a quota error. if 'quota exceeded' in str(e).lower(): self.log.info("Quota exceeded, invalidating quota cache") - self._provider_manager.invalidateQuotaCache() + self.handler.manager.invalidateQuotaCache() attempts += 1 - self._node.state = zk.READY - self._zk.storeNode(self._node) - self.log.info("Node id %s is ready", self._node.id) - - def run(self): - start_time = time.time() - statsd_key = 'ready' - - try: - self._run() - except kze.SessionExpiredError: - # Our node lock is gone, leaving the node state as BUILDING. - # This will get cleaned up in ZooKeeper automatically, but we - # must still set our cached node state to FAILED for the - # NodeLaunchManager's poll() method. 
- self.log.error( - "Lost ZooKeeper session trying to launch for node %s", - self._node.id) - self._node.state = zk.FAILED - statsd_key = 'error.zksession' - except Exception as e: - self.log.exception("Launch failed for node %s:", - self._node.id) - self._node.state = zk.FAILED - self._zk.storeNode(self._node) - - if hasattr(e, 'statsd_key'): - statsd_key = e.statsd_key - else: - statsd_key = 'error.unknown' - - try: - dt = int((time.time() - start_time) * 1000) - self.recordLaunchStats(statsd_key, dt, self._image_name, - self._node.provider, self._node.az, - self._requestor) - self.updateNodeStats(self._zk, self._provider_config) - except Exception: - self.log.exception("Exception while reporting stats:") + self.node.state = zk.READY + self.storeNode() + self.log.info("Node id %s is ready", self.node.id) class OpenStackNodeRequestHandler(NodeRequestHandler): @@ -301,9 +244,6 @@ class OpenStackNodeRequestHandler(NodeRequestHandler): def __init__(self, pw, request): super().__init__(pw, request) self.chosen_az = None - self.log = logging.getLogger( - "nodepool.driver.openstack.OpenStackNodeRequestHandler[%s]" % - self.launcher_id) def hasRemainingQuota(self, ntype): needed_quota = self.manager.quotaNeededByNodeType(ntype, self.pool) @@ -388,7 +328,4 @@ class OpenStackNodeRequestHandler(NodeRequestHandler): node.region = self.provider.region_name def launch(self, node): - return NodeLauncher( - self.zk, self.pool.labels[node.type], self.manager, - self.request.requestor, node, - self.provider.launch_retries) + return OpenStackNodeLauncher(self, node, self.provider.launch_retries) diff --git a/nodepool/stats.py b/nodepool/stats.py index d733759c7..b9464f12f 100755 --- a/nodepool/stats.py +++ b/nodepool/stats.py @@ -51,35 +51,30 @@ class StatsReporter(object): super(StatsReporter, self).__init__() self._statsd = get_client() - def recordLaunchStats(self, subkey, dt, image_name, - provider_name, node_az, requestor): + def recordLaunchStats(self, subkey, dt): ''' Record 
node launch statistics. :param str subkey: statsd key :param int dt: Time delta in milliseconds - :param str image_name: Name of the image used - :param str provider_name: Name of the provider - :param str node_az: AZ of the launched node - :param str requestor: Identifier for the request originator ''' if not self._statsd: return keys = [ - 'nodepool.launch.provider.%s.%s' % (provider_name, subkey), - 'nodepool.launch.image.%s.%s' % (image_name, subkey), + 'nodepool.launch.provider.%s.%s' % ( + self.provider_config.name, subkey), 'nodepool.launch.%s' % (subkey,), ] - if node_az: + if self.node.az: keys.append('nodepool.launch.provider.%s.%s.%s' % - (provider_name, node_az, subkey)) + (self.provider_config.name, self.node.az, subkey)) - if requestor: + if self.handler.request.requestor: # Replace '.' which is a graphite hierarchy, and ':' which is # a statsd delimeter. - requestor = requestor.replace('.', '_') + requestor = self.handler.request.requestor.replace('.', '_') requestor = requestor.replace(':', '_') keys.append('nodepool.launch.requestor.%s.%s' % (requestor, subkey)) diff --git a/nodepool/tests/test_launcher.py b/nodepool/tests/test_launcher.py index 199c7afe5..a2d345f3d 100644 --- a/nodepool/tests/test_launcher.py +++ b/nodepool/tests/test_launcher.py @@ -1292,7 +1292,8 @@ class TestLauncher(tests.DBTestCase): time.sleep(1) launchers = self.zk.getRegisteredLaunchers() - @mock.patch('nodepool.driver.openstack.handler.NodeLauncher._launchNode') + @mock.patch('nodepool.driver.openstack.handler.' + 'OpenStackNodeLauncher._launchNode') def test_launchNode_session_expired(self, mock_launch): ''' Test ZK session lost during _launchNode(). 
diff --git a/nodepool/tests/test_nodelaunchmanager.py b/nodepool/tests/test_nodelaunchmanager.py index 211a19c8c..8436eb51c 100644 --- a/nodepool/tests/test_nodelaunchmanager.py +++ b/nodepool/tests/test_nodelaunchmanager.py @@ -89,7 +89,8 @@ class TestNodeLaunchManager(tests.DBTestCase): self.assertEqual(nodes[0]['metadata']['groups'], 'fake-provider,fake-image,fake-label') - @mock.patch('nodepool.driver.openstack.handler.NodeLauncher._launchNode') + @mock.patch('nodepool.driver.openstack.handler.' + 'OpenStackNodeLauncher._launchNode') def test_failed_launch(self, mock_launch): configfile = self.setup_config('node.yaml') self._setup(configfile) @@ -105,7 +106,8 @@ class TestNodeLaunchManager(tests.DBTestCase): self.assertEqual(len(handler.failed_nodes), 1) self.assertEqual(len(handler.ready_nodes), 0) - @mock.patch('nodepool.driver.openstack.handler.NodeLauncher._launchNode') + @mock.patch('nodepool.driver.openstack.handler.' + 'OpenStackNodeLauncher._launchNode') def test_mixed_launch(self, mock_launch): configfile = self.setup_config('node.yaml') self._setup(configfile)