From 4d201328f5ed64536336212b3ce7bfd89035aa85 Mon Sep 17 00:00:00 2001
From: Tristan Cacqueray
Date: Sat, 17 Jun 2017 07:26:47 +0000
Subject: [PATCH] Collect request handling implementation in an OpenStack driver

This change moves OpenStack-related code to a driver. To avoid a circular
import, this change also moves the StatsReporter to the stats module so that
the handlers don't have to import the launcher.

Change-Id: I319ce8780aa7e81b079c3f31d546b89eca6cf5f4
Story: 2001044
Task: 4614
---
 nodepool/cmd/nodepoolcmd.py              |   2 +-
 nodepool/driver/fake/__init__.py         |   0
 nodepool/driver/fake/provider.py         |  34 ++
 nodepool/driver/openstack/__init__.py    |   0
 nodepool/driver/openstack/handler.py     | 506 +++++++++++++++++++
 nodepool/driver/openstack/provider.py    | 344 +++++++++++++
 nodepool/exceptions.py                   |  20 +
 nodepool/launcher.py                     | 617 +----------------------
 nodepool/provider_manager.py             | 349 +------------
 nodepool/stats.py                        |  97 ++++
 nodepool/tests/__init__.py               |   3 +-
 nodepool/tests/test_builder.py           |   2 +-
 nodepool/tests/test_launcher.py          |   4 +-
 nodepool/tests/test_nodelaunchmanager.py |   6 +-
 nodepool/tests/test_shade_integration.py |   2 +-
 15 files changed, 1027 insertions(+), 959 deletions(-)
 create mode 100644 nodepool/driver/fake/__init__.py
 create mode 100644 nodepool/driver/fake/provider.py
 create mode 100644 nodepool/driver/openstack/__init__.py
 create mode 100644 nodepool/driver/openstack/handler.py
 create mode 100755 nodepool/driver/openstack/provider.py
 mode change 100644 => 100755 nodepool/exceptions.py
 mode change 100644 => 100755 nodepool/stats.py

diff --git a/nodepool/cmd/nodepoolcmd.py b/nodepool/cmd/nodepoolcmd.py
index 72fbdb5e5..f38b01c23 100755
--- a/nodepool/cmd/nodepoolcmd.py
+++ b/nodepool/cmd/nodepoolcmd.py
@@ -266,7 +266,7 @@ class NodePoolCmd(NodepoolApp):
         if self.args.now:
             manager = provider_manager.get_provider(provider, True)
             manager.start()
-            launcher.InstanceDeleter.delete(self.zk, manager, node)
+            launcher.NodeDeleter.delete(self.zk, manager, node)
             manager.stop()
         else:
             node.state = zk.DELETING
diff --git a/nodepool/driver/fake/__init__.py b/nodepool/driver/fake/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/nodepool/driver/fake/provider.py b/nodepool/driver/fake/provider.py
new file mode 100644
index 000000000..0a11b41fb
--- /dev/null
+++ b/nodepool/driver/fake/provider.py
@@ -0,0 +1,34 @@
+# Copyright (C) 2011-2013 OpenStack Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied.
+#
+# See the License for the specific language governing permissions and
+# limitations under the License.
+ +from nodepool import fakeprovider +from nodepool.driver.openstack.provider import OpenStackProvider + + +class FakeProvider(OpenStackProvider): + def __init__(self, provider, use_taskmanager): + self.createServer_fails = 0 + self.__client = fakeprovider.FakeOpenStackCloud() + super(FakeProvider, self).__init__(provider, use_taskmanager) + + def _getClient(self): + return self.__client + + def createServer(self, *args, **kwargs): + while self.createServer_fails: + self.createServer_fails -= 1 + raise Exception("Expected createServer exception") + return super(FakeProvider, self).createServer(*args, **kwargs) diff --git a/nodepool/driver/openstack/__init__.py b/nodepool/driver/openstack/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/nodepool/driver/openstack/handler.py b/nodepool/driver/openstack/handler.py new file mode 100644 index 000000000..cdb145746 --- /dev/null +++ b/nodepool/driver/openstack/handler.py @@ -0,0 +1,506 @@ +# Copyright (C) 2011-2014 OpenStack Foundation +# Copyright 2017 Red Hat +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import collections +import logging +import pprint +import random +import threading +import time + +from nodepool import exceptions +from nodepool import nodeutils as utils +from nodepool import stats +from nodepool import zk +from nodepool.driver import NodeLaunchManager +from nodepool.driver import NodeRequestHandler + + +class NodeLauncher(threading.Thread, stats.StatsReporter): + log = logging.getLogger("nodepool.driver.openstack." + "NodeLauncher") + + def __init__(self, zk, provider_label, provider_manager, requestor, + node, retries): + ''' + Initialize the launcher. + + :param ZooKeeper zk: A ZooKeeper object. + :param ProviderLabel provider: A config ProviderLabel object. + :param ProviderManager provider_manager: The manager object used to + interact with the selected provider. + :param str requestor: Identifier for the request originator. + :param Node node: The node object. + :param int retries: Number of times to retry failed launches. 
+        '''
+        threading.Thread.__init__(self, name="NodeLauncher-%s" % node.id)
+        stats.StatsReporter.__init__(self)
+        self.log = logging.getLogger("nodepool.NodeLauncher-%s" % node.id)
+        self._zk = zk
+        self._label = provider_label
+        self._manager = provider_manager
+        self._node = node
+        self._retries = retries
+        self._image_name = None
+        self._requestor = requestor
+
+        self._pool = self._label.pool
+        self._provider = self._pool.provider
+        if self._label.diskimage:
+            self._diskimage = self._provider.diskimages[self._label.diskimage.name]
+        else:
+            self._diskimage = None
+        self._cloud_image = self._provider.cloud_images.get(self._label.cloud_image, None)
+
+    def logConsole(self, server_id, hostname):
+        if not self._label.console_log:
+            return
+        console = self._manager.getServerConsole(server_id)
+        if console:
+            self.log.debug('Console log from hostname %s:' % hostname)
+            for line in console.splitlines():
+                self.log.debug(line.rstrip())
+
+    def _launchNode(self):
+        if self._label.diskimage:
+            # launch using diskimage
+            cloud_image = self._zk.getMostRecentImageUpload(
+                self._diskimage.name, self._provider.name)
+
+            if not cloud_image:
+                raise exceptions.LaunchNodepoolException(
+                    "Unable to find current cloud image %s in %s" %
+                    (self._diskimage.name, self._provider.name)
+                )
+
+            config_drive = self._diskimage.config_drive
+            image_external = dict(id=cloud_image.external_id)
+            image_id = "{path}/{upload_id}".format(
+                path=self._zk._imageUploadPath(cloud_image.image_name,
+                                               cloud_image.build_id,
+                                               cloud_image.provider_name),
+                upload_id=cloud_image.id)
+            image_name = self._diskimage.name
+
+        else:
+            # launch using unmanaged cloud image
+            config_drive = self._cloud_image.config_drive
+
+            # These are different values for zk, but it's all the same
+            # for cloud-images.
+            # image_external is what we use for OpenStack.
+            # image_id is what we record in the node for zk.
+            # image_name is what we log, so matches the config.
+            if self._cloud_image.image_id:
+                image_external = dict(id=self._cloud_image.image_id)
+            elif self._cloud_image.image_name:
+                image_external = self._cloud_image.image_name
+            else:
+                image_external = self._cloud_image.name
+            image_id = self._cloud_image.name
+            image_name = self._cloud_image.name
+
+        hostname = self._provider.hostname_format.format(
+            label=self._label, provider=self._provider, node=self._node
+        )
+
+        self.log.info("Creating server with hostname %s in %s from image %s "
+                      "for node id: %s" % (hostname, self._provider.name,
+                                           image_name,
+                                           self._node.id))
+
+        # NOTE: We store the node ID in the server metadata to use for leaked
+        # instance detection. We cannot use the external server ID for this
+        # because that isn't available in ZooKeeper until after the server is
+        # active, which could cause a race in leak detection.
+
+        server = self._manager.createServer(
+            hostname,
+            image=image_external,
+            min_ram=self._label.min_ram,
+            flavor_name=self._label.flavor_name,
+            key_name=self._label.key_name,
+            az=self._node.az,
+            config_drive=config_drive,
+            nodepool_node_id=self._node.id,
+            nodepool_node_label=self._node.type,
+            nodepool_image_name=image_name,
+            networks=self._pool.networks,
+            boot_from_volume=self._label.boot_from_volume,
+            volume_size=self._label.volume_size)
+
+        self._node.external_id = server.id
+        self._node.hostname = hostname
+        self._node.image_id = image_id
+
+        # Checkpoint save the updated node info
+        self._zk.storeNode(self._node)
+
+        self.log.debug("Waiting for server %s for node id: %s" %
+                       (server.id, self._node.id))
+        server = self._manager.waitForServer(
+            server, self._provider.launch_timeout)
+
+        if server.status != 'ACTIVE':
+            raise exceptions.LaunchStatusException("Server %s for node id: %s "
+                                                   "status: %s" %
+                                                   (server.id, self._node.id,
+                                                    server.status))
+
+        # If we didn't specify an AZ, set it to the one chosen by Nova.
+        # Do this after we are done waiting since AZ may not be available
+        # immediately after the create request.
+        if not self._node.az:
+            self._node.az = server.location.zone
+
+        interface_ip = server.interface_ip
+        if not interface_ip:
+            self.log.debug(
+                "Server data for failed IP: %s" % pprint.pformat(
+                    server))
+            raise exceptions.LaunchNetworkException(
+                "Unable to find public IP of server")
+
+        self._node.interface_ip = interface_ip
+        self._node.public_ipv4 = server.public_v4
+        self._node.public_ipv6 = server.public_v6
+        self._node.private_ipv4 = server.private_v4
+        # devstack-gate multi-node depends on private_v4 being populated
+        # with something. On clouds that don't have a private address, use
+        # the public.
+        if not self._node.private_ipv4:
+            self._node.private_ipv4 = server.public_v4
+
+        # Checkpoint save the updated node info
+        self._zk.storeNode(self._node)
+
+        self.log.debug(
+            "Node %s is running [region: %s, az: %s, ip: %s ipv4: %s, "
+            "ipv6: %s]" %
+            (self._node.id, self._node.region, self._node.az,
+             self._node.interface_ip, self._node.public_ipv4,
+             self._node.public_ipv6))
+
+        # Get the SSH public keys for the new node and record in ZooKeeper
+        try:
+            self.log.debug("Gathering host keys for node %s", self._node.id)
+            host_keys = utils.keyscan(
+                interface_ip, timeout=self._provider.boot_timeout)
+            if not host_keys:
+                raise exceptions.LaunchKeyscanException(
+                    "Unable to gather host keys")
+        except exceptions.SSHTimeoutException:
+            self.logConsole(self._node.external_id, self._node.hostname)
+            raise
+
+        self._node.host_keys = host_keys
+        self._zk.storeNode(self._node)
+
+    def _run(self):
+        attempts = 1
+        while attempts <= self._retries:
+            try:
+                self._launchNode()
+                break
+            except Exception:
+                if attempts <= self._retries:
+                    self.log.exception(
+                        "Launch attempt %d/%d failed for node %s:",
+                        attempts, self._retries, self._node.id)
+                # If we created an instance, delete it.
+ if self._node.external_id: + self._manager.cleanupNode(self._node.external_id) + self._manager.waitForNodeCleanup(self._node.external_id) + self._node.external_id = None + self._node.public_ipv4 = None + self._node.public_ipv6 = None + self._node.inerface_ip = None + self._zk.storeNode(self._node) + if attempts == self._retries: + raise + attempts += 1 + + self._node.state = zk.READY + self._zk.storeNode(self._node) + self.log.info("Node id %s is ready", self._node.id) + + def run(self): + start_time = time.time() + statsd_key = 'ready' + + try: + self._run() + except Exception as e: + self.log.exception("Launch failed for node %s:", + self._node.id) + self._node.state = zk.FAILED + self._zk.storeNode(self._node) + + if hasattr(e, 'statsd_key'): + statsd_key = e.statsd_key + else: + statsd_key = 'error.unknown' + + try: + dt = int((time.time() - start_time) * 1000) + self.recordLaunchStats(statsd_key, dt, self._image_name, + self._node.provider, self._node.az, + self._requestor) + self.updateNodeStats(self._zk, self._provider) + except Exception: + self.log.exception("Exception while reporting stats:") + + +class OpenStackNodeLaunchManager(NodeLaunchManager): + def launch(self, node): + ''' + Launch a new node as described by the supplied Node. + + We expect each NodeLauncher thread to directly modify the node that + is passed to it. The poll() method will expect to see the node.state + attribute to change as the node is processed. + + :param Node node: The node object. + ''' + self._nodes.append(node) + provider_label = self._pool.labels[node.type] + t = NodeLauncher(self._zk, provider_label, self._manager, + self._requestor, node, self._retries) + t.start() + self._threads.append(t) + + +class OpenStackNodeRequestHandler(NodeRequestHandler): + log = logging.getLogger("nodepool.driver.openstack." + "OpenStackNodeRequestHandler") + + def __init__(self, pw, request): + super(OpenStackNodeRequestHandler, self).__init__(pw, request) + self.chosen_az = None + + def _imagesAvailable(self): + ''' + Determines if the requested images are available for this provider. + + ZooKeeper is queried for an image uploaded to the provider that is + in the READY state. + + :returns: True if it is available, False otherwise. + ''' + for label in self.request.node_types: + + if self.pool.labels[label].cloud_image: + img = self.pool.labels[label].cloud_image + if not self.manager.labelReady(img): + return False + else: + img = self.pool.labels[label].diskimage.name + + if not self.zk.getMostRecentImageUpload(img, self.provider.name): + return False + return True + + def _invalidNodeTypes(self): + ''' + Return any node types that are invalid for this provider. + + :returns: A list of node type names that are invalid, or an empty + list if all are valid. + ''' + invalid = [] + for ntype in self.request.node_types: + if ntype not in self.pool.labels: + invalid.append(ntype) + return invalid + + def _countNodes(self): + ''' + Query ZooKeeper to determine the number of provider nodes launched. + + :returns: An integer for the number launched for this provider. + ''' + count = 0 + for node in self.zk.nodeIterator(): + if (node.provider == self.provider.name and + node.pool == self.pool.name): + count += 1 + return count + + def _waitForNodeSet(self): + ''' + Fill node set for the request. + + Obtain nodes for the request, pausing all new request handling for + this provider until the node set can be filled. + + We attempt to group the node set within the same provider availability + zone. 
For this to work properly, the provider entry in the nodepool + config must list the availability zones. Otherwise, new nodes will be + put in random AZs at nova's whim. The exception being if there is an + existing node in the READY state that we can select for this node set. + Its AZ will then be used for new nodes, as well as any other READY + nodes. + + note:: This code is a bit racey in its calculation of the number of + nodes in use for quota purposes. It is possible for multiple + launchers to be doing this calculation at the same time. Since we + currently have no locking mechanism around the "in use" + calculation, if we are at the edge of the quota, one of the + launchers could attempt to launch a new node after the other + launcher has already started doing so. This would cause an + expected failure from the underlying library, which is ok for now. + ''' + if not self.launch_manager: + self.launch_manager = OpenStackNodeLaunchManager( + self.zk, self.pool, self.manager, + self.request.requestor, retries=self.provider.launch_retries) + + # Since this code can be called more than once for the same request, + # we need to calculate the difference between our current node set + # and what was requested. We cannot use set operations here since a + # node type can appear more than once in the requested types. + saved_types = collections.Counter([n.type for n in self.nodeset]) + requested_types = collections.Counter(self.request.node_types) + diff = requested_types - saved_types + needed_types = list(diff.elements()) + + ready_nodes = self.zk.getReadyNodesOfTypes(needed_types) + + for ntype in needed_types: + # First try to grab from the list of already available nodes. + got_a_node = False + if self.request.reuse and ntype in ready_nodes: + for node in ready_nodes[ntype]: + # Only interested in nodes from this provider and + # pool, and within the selected AZ. + if node.provider != self.provider.name: + continue + if node.pool != self.pool.name: + continue + if self.chosen_az and node.az != self.chosen_az: + continue + + try: + self.zk.lockNode(node, blocking=False) + except exceptions.ZKLockException: + # It's already locked so skip it. + continue + else: + if self.paused: + self.log.debug("Unpaused request %s", self.request) + self.paused = False + + self.log.debug( + "Locked existing node %s for request %s", + node.id, self.request.id) + got_a_node = True + node.allocated_to = self.request.id + self.zk.storeNode(node) + self.nodeset.append(node) + + # If we haven't already chosen an AZ, select the + # AZ from this ready node. This will cause new nodes + # to share this AZ, as well. + if not self.chosen_az and node.az: + self.chosen_az = node.az + break + + # Could not grab an existing node, so launch a new one. + if not got_a_node: + # Select grouping AZ if we didn't set AZ from a selected, + # pre-existing node + if not self.chosen_az: + self.chosen_az = random.choice( + self.pool.azs or self.manager.getAZs()) + + # If we calculate that we're at capacity, pause until nodes + # are released by Zuul and removed by the DeletedNodeWorker. 
+ if self._countNodes() >= self.pool.max_servers: + if not self.paused: + self.log.debug( + "Pausing request handling to satisfy request %s", + self.request) + self.paused = True + return + + if self.paused: + self.log.debug("Unpaused request %s", self.request) + self.paused = False + + node = zk.Node() + node.state = zk.INIT + node.type = ntype + node.provider = self.provider.name + node.pool = self.pool.name + node.az = self.chosen_az + node.region = self.provider.region_name + node.launcher = self.launcher_id + node.allocated_to = self.request.id + + # Note: It should be safe (i.e., no race) to lock the node + # *after* it is stored since nodes in INIT state are not + # locked anywhere. + self.zk.storeNode(node) + self.zk.lockNode(node, blocking=False) + self.log.debug("Locked building node %s for request %s", + node.id, self.request.id) + + # Set state AFTER lock so sthat it isn't accidentally cleaned + # up (unlocked BUILDING nodes will be deleted). + node.state = zk.BUILDING + self.zk.storeNode(node) + + self.nodeset.append(node) + self.launch_manager.launch(node) + + def run_handler(self): + ''' + Main body for the OpenStackNodeRequestHandler. + ''' + self._setFromPoolWorker() + declined_reasons = [] + invalid_types = self._invalidNodeTypes() + if invalid_types: + declined_reasons.append('node type(s) [%s] not available' % + ','.join(invalid_types)) + elif not self._imagesAvailable(): + declined_reasons.append('images are not available') + if len(self.request.node_types) > self.pool.max_servers: + declined_reasons.append('it would exceed quota') + + if declined_reasons: + self.log.debug("Declining node request %s because %s", + self.request.id, ', '.join(declined_reasons)) + self.request.declined_by.append(self.launcher_id) + launchers = set(self.zk.getRegisteredLaunchers()) + if launchers.issubset(set(self.request.declined_by)): + self.log.debug("Failing declined node request %s", + self.request.id) + # All launchers have declined it + self.request.state = zk.FAILED + self.unlockNodeSet(clear_allocation=True) + self.zk.storeNodeRequest(self.request) + self.zk.unlockNodeRequest(self.request) + self.done = True + return + + if self.paused: + self.log.debug("Retrying node request %s", self.request.id) + else: + self.log.debug("Accepting node request %s", self.request.id) + self.request.state = zk.PENDING + self.zk.storeNodeRequest(self.request) + + self._waitForNodeSet() diff --git a/nodepool/driver/openstack/provider.py b/nodepool/driver/openstack/provider.py new file mode 100755 index 000000000..f9fe1b6cf --- /dev/null +++ b/nodepool/driver/openstack/provider.py @@ -0,0 +1,344 @@ +# Copyright (C) 2011-2013 OpenStack Foundation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import logging +from contextlib import contextmanager +import operator + +import shade + +from nodepool import exceptions +from nodepool.driver import Provider +from nodepool.nodeutils import iterate_timeout +from nodepool.task_manager import ManagerStoppedException +from nodepool.task_manager import TaskManager + + +IPS_LIST_AGE = 5 # How long to keep a cached copy of the ip list + + +@contextmanager +def shade_inner_exceptions(): + try: + yield + except shade.OpenStackCloudException as e: + e.log_error() + raise + + +class OpenStackProvider(Provider): + log = logging.getLogger("nodepool.driver.openstack.OpenStackProvider") + + def __init__(self, provider, use_taskmanager): + self.provider = provider + self._images = {} + self._networks = {} + self.__flavors = {} + self.__azs = None + self._use_taskmanager = use_taskmanager + self._taskmanager = None + + def start(self): + if self._use_taskmanager: + self._taskmanager = TaskManager(None, self.provider.name, + self.provider.rate) + self._taskmanager.start() + self.resetClient() + + def stop(self): + if self._taskmanager: + self._taskmanager.stop() + + def join(self): + if self._taskmanager: + self._taskmanager.join() + + @property + def _flavors(self): + if not self.__flavors: + self.__flavors = self._getFlavors() + return self.__flavors + + def _getClient(self): + if self._use_taskmanager: + manager = self._taskmanager + else: + manager = None + return shade.OpenStackCloud( + cloud_config=self.provider.cloud_config, + manager=manager, + **self.provider.cloud_config.config) + + def resetClient(self): + self._client = self._getClient() + if self._use_taskmanager: + self._taskmanager.setClient(self._client) + + def _getFlavors(self): + flavors = self.listFlavors() + flavors.sort(key=operator.itemgetter('ram')) + return flavors + + # TODO(mordred): These next three methods duplicate logic that is in + # shade, but we can't defer to shade until we're happy + # with using shade's resource caching facility. We have + # not yet proven that to our satisfaction, but if/when + # we do, these should be able to go away. + def _findFlavorByName(self, flavor_name): + for f in self._flavors: + if flavor_name in (f['name'], f['id']): + return f + raise Exception("Unable to find flavor: %s" % flavor_name) + + def _findFlavorByRam(self, min_ram, flavor_name): + for f in self._flavors: + if (f['ram'] >= min_ram + and (not flavor_name or flavor_name in f['name'])): + return f + raise Exception("Unable to find flavor with min ram: %s" % min_ram) + + def findFlavor(self, flavor_name, min_ram): + # Note: this will throw an error if the provider is offline + # but all the callers are in threads (they call in via CreateServer) so + # the mainloop won't be affected. 
+ if min_ram: + return self._findFlavorByRam(min_ram, flavor_name) + else: + return self._findFlavorByName(flavor_name) + + def findImage(self, name): + if name in self._images: + return self._images[name] + + with shade_inner_exceptions(): + image = self._client.get_image(name) + self._images[name] = image + return image + + def findNetwork(self, name): + if name in self._networks: + return self._networks[name] + + with shade_inner_exceptions(): + network = self._client.get_network(name) + self._networks[name] = network + return network + + def deleteImage(self, name): + if name in self._images: + del self._images[name] + + with shade_inner_exceptions(): + return self._client.delete_image(name) + + def createServer(self, name, image, + flavor_name=None, min_ram=None, + az=None, key_name=None, config_drive=True, + nodepool_node_id=None, nodepool_node_label=None, + nodepool_image_name=None, + networks=None, boot_from_volume=False, volume_size=50): + if not networks: + networks = [] + if not isinstance(image, dict): + # if it's a dict, we already have the cloud id. If it's not, + # we don't know if it's name or ID so need to look it up + image = self.findImage(image) + flavor = self.findFlavor(flavor_name=flavor_name, min_ram=min_ram) + create_args = dict(name=name, + image=image, + flavor=flavor, + config_drive=config_drive) + if boot_from_volume: + create_args['boot_from_volume'] = boot_from_volume + create_args['volume_size'] = volume_size + # NOTE(pabelanger): Always cleanup volumes when we delete a server. + create_args['terminate_volume'] = True + if key_name: + create_args['key_name'] = key_name + if az: + create_args['availability_zone'] = az + nics = [] + for network in networks: + net_id = self.findNetwork(network)['id'] + nics.append({'net-id': net_id}) + if nics: + create_args['nics'] = nics + # Put provider.name and image_name in as groups so that ansible + # inventory can auto-create groups for us based on each of those + # qualities + # Also list each of those values directly so that non-ansible + # consumption programs don't need to play a game of knowing that + # groups[0] is the image name or anything silly like that. 
+ groups_list = [self.provider.name] + + if nodepool_image_name: + groups_list.append(nodepool_image_name) + if nodepool_node_label: + groups_list.append(nodepool_node_label) + meta = dict( + groups=",".join(groups_list), + nodepool_provider_name=self.provider.name, + ) + if nodepool_node_id: + meta['nodepool_node_id'] = nodepool_node_id + if nodepool_image_name: + meta['nodepool_image_name'] = nodepool_image_name + if nodepool_node_label: + meta['nodepool_node_label'] = nodepool_node_label + create_args['meta'] = meta + + with shade_inner_exceptions(): + return self._client.create_server(wait=False, **create_args) + + def getServer(self, server_id): + with shade_inner_exceptions(): + return self._client.get_server(server_id) + + def getServerConsole(self, server_id): + try: + with shade_inner_exceptions(): + return self._client.get_server_console(server_id) + except shade.OpenStackCloudException: + return None + + def waitForServer(self, server, timeout=3600): + with shade_inner_exceptions(): + return self._client.wait_for_server( + server=server, auto_ip=True, reuse=False, + timeout=timeout) + + def waitForNodeCleanup(self, server_id, timeout=600): + for count in iterate_timeout( + timeout, exceptions.ServerDeleteException, + "server %s deletion" % server_id): + if not self.getServer(server_id): + return + + def waitForImage(self, image_id, timeout=3600): + last_status = None + for count in iterate_timeout( + timeout, exceptions.ImageCreateException, "image creation"): + try: + image = self.getImage(image_id) + except exceptions.NotFound: + continue + except ManagerStoppedException: + raise + except Exception: + self.log.exception('Unable to list images while waiting for ' + '%s will retry' % (image_id)) + continue + + # shade returns None when not found + if not image: + continue + + status = image['status'] + if (last_status != status): + self.log.debug( + 'Status of image in {provider} {id}: {status}'.format( + provider=self.provider.name, + id=image_id, + status=status)) + if status == 'ERROR' and 'fault' in image: + self.log.debug( + 'ERROR in {provider} on {id}: {resason}'.format( + provider=self.provider.name, + id=image_id, + resason=image['fault']['message'])) + last_status = status + # Glance client returns lower case statuses - but let's be sure + if status.lower() in ['active', 'error']: + return image + + def createImage(self, server, image_name, meta): + with shade_inner_exceptions(): + return self._client.create_image_snapshot( + image_name, server, **meta) + + def getImage(self, image_id): + with shade_inner_exceptions(): + return self._client.get_image(image_id) + + def labelReady(self, image_id): + return self.getImage(image_id) + + def uploadImage(self, image_name, filename, image_type=None, meta=None, + md5=None, sha256=None): + # configure glance and upload image. Note the meta flags + # are provided as custom glance properties + # NOTE: we have wait=True set here. This is not how we normally + # do things in nodepool, preferring to poll ourselves thankyouverymuch. 
+ # However - two things to note: + # - PUT has no aysnc mechanism, so we have to handle it anyway + # - v2 w/task waiting is very strange and complex - but we have to + # block for our v1 clouds anyway, so we might as well + # have the interface be the same and treat faking-out + # a shade-level fake-async interface later + if not meta: + meta = {} + if image_type: + meta['disk_format'] = image_type + with shade_inner_exceptions(): + image = self._client.create_image( + name=image_name, + filename=filename, + is_public=False, + wait=True, + md5=md5, + sha256=sha256, + **meta) + return image.id + + def listImages(self): + with shade_inner_exceptions(): + return self._client.list_images() + + def listFlavors(self): + with shade_inner_exceptions(): + return self._client.list_flavors(get_extra=False) + + def listNodes(self): + # shade list_servers carries the nodepool server list caching logic + with shade_inner_exceptions(): + return self._client.list_servers() + + def deleteServer(self, server_id): + with shade_inner_exceptions(): + return self._client.delete_server(server_id, delete_ips=True) + + def cleanupNode(self, server_id): + server = self.getServer(server_id) + if not server: + raise exceptions.NotFound() + + self.log.debug('Deleting server %s' % server_id) + self.deleteServer(server_id) + + def cleanupLeakedFloaters(self): + with shade_inner_exceptions(): + self._client.delete_unattached_floating_ips() + + def getAZs(self): + if self.__azs is None: + self.__azs = self._client.list_availability_zone_names() + if not self.__azs: + # If there are no zones, return a list containing None so that + # random.choice can pick None and pass that to Nova. If this + # feels dirty, please direct your ire to policy.json and the + # ability to turn off random portions of the OpenStack API. + self.__azs = [None] + return self.__azs diff --git a/nodepool/exceptions.py b/nodepool/exceptions.py old mode 100644 new mode 100755 index 93533923b..941b59b49 --- a/nodepool/exceptions.py +++ b/nodepool/exceptions.py @@ -13,6 +13,26 @@ # under the License. +class NotFound(Exception): + pass + + +class LaunchNodepoolException(Exception): + statsd_key = 'error.nodepool' + + +class LaunchStatusException(Exception): + statsd_key = 'error.status' + + +class LaunchNetworkException(Exception): + statsd_key = 'error.network' + + +class LaunchKeyscanException(Exception): + statsd_key = 'error.keyscan' + + class BuilderError(RuntimeError): pass diff --git a/nodepool/launcher.py b/nodepool/launcher.py index e24b9006b..4c7197630 100755 --- a/nodepool/launcher.py +++ b/nodepool/launcher.py @@ -16,24 +16,20 @@ # See the License for the specific language governing permissions and # limitations under the License. -import collections import logging import os import os.path -import pprint -import random import socket import threading import time from nodepool import exceptions -from nodepool import nodeutils as utils from nodepool import provider_manager from nodepool import stats from nodepool import config as nodepool_config from nodepool import zk -from nodepool.driver import NodeRequestHandler -from nodepool.driver import NodeLaunchManager +from nodepool.driver.openstack.handler import OpenStackNodeRequestHandler + MINS = 60 HOURS = 60 * MINS @@ -44,124 +40,13 @@ SUSPEND_WAIT_TIME = 30 # How long to wait between checks for ZooKeeper # connectivity if it disappears. 
-class LaunchNodepoolException(Exception): - statsd_key = 'error.nodepool' - - -class LaunchStatusException(Exception): - statsd_key = 'error.status' - - -class LaunchNetworkException(Exception): - statsd_key = 'error.network' - - -class LaunchKeyscanException(Exception): - statsd_key = 'error.keyscan' - - -class StatsReporter(object): - ''' - Class adding statsd reporting functionality. - ''' - def __init__(self): - super(StatsReporter, self).__init__() - self._statsd = stats.get_client() - - def recordLaunchStats(self, subkey, dt, image_name, - provider_name, node_az, requestor): - ''' - Record node launch statistics. - - :param str subkey: statsd key - :param int dt: Time delta in milliseconds - :param str image_name: Name of the image used - :param str provider_name: Name of the provider - :param str node_az: AZ of the launched node - :param str requestor: Identifier for the request originator - ''' - if not self._statsd: - return - - keys = [ - 'nodepool.launch.provider.%s.%s' % (provider_name, subkey), - 'nodepool.launch.image.%s.%s' % (image_name, subkey), - 'nodepool.launch.%s' % (subkey,), - ] - - if node_az: - keys.append('nodepool.launch.provider.%s.%s.%s' % - (provider_name, node_az, subkey)) - - if requestor: - # Replace '.' which is a graphite hierarchy, and ':' which is - # a statsd delimeter. - requestor = requestor.replace('.', '_') - requestor = requestor.replace(':', '_') - keys.append('nodepool.launch.requestor.%s.%s' % - (requestor, subkey)) - - for key in keys: - self._statsd.timing(key, dt) - self._statsd.incr(key) - - - def updateNodeStats(self, zk_conn, provider): - ''' - Refresh statistics for all known nodes. - - :param ZooKeeper zk_conn: A ZooKeeper connection object. - :param Provider provider: A config Provider object. 
- ''' - if not self._statsd: - return - - states = {} - - # Initialize things we know about to zero - for state in zk.Node.VALID_STATES: - key = 'nodepool.nodes.%s' % state - states[key] = 0 - key = 'nodepool.provider.%s.nodes.%s' % (provider.name, state) - states[key] = 0 - - for node in zk_conn.nodeIterator(): - #nodepool.nodes.STATE - key = 'nodepool.nodes.%s' % node.state - states[key] += 1 - - #nodepool.label.LABEL.nodes.STATE - key = 'nodepool.label.%s.nodes.%s' % (node.type, node.state) - # It's possible we could see node types that aren't in our config - if key in states: - states[key] += 1 - else: - states[key] = 1 - - #nodepool.provider.PROVIDER.nodes.STATE - key = 'nodepool.provider.%s.nodes.%s' % (node.provider, node.state) - # It's possible we could see providers that aren't in our config - if key in states: - states[key] += 1 - else: - states[key] = 1 - - for key, count in states.items(): - self._statsd.gauge(key, count) - - #nodepool.provider.PROVIDER.max_servers - key = 'nodepool.provider.%s.max_servers' % provider.name - max_servers = sum([p.max_servers for p in provider.pools.values()]) - self._statsd.gauge(key, max_servers) - - -class InstanceDeleter(threading.Thread, StatsReporter): - log = logging.getLogger("nodepool.InstanceDeleter") +class NodeDeleter(threading.Thread, stats.StatsReporter): + log = logging.getLogger("nodepool.NodeDeleter") def __init__(self, zk, manager, node): - threading.Thread.__init__(self, name='InstanceDeleter for %s %s' % + threading.Thread.__init__(self, name='NodeDeleter for %s %s' % (node.provider, node.external_id)) - StatsReporter.__init__(self) + stats.StatsReporter.__init__(self) self._zk = zk self._manager = manager self._node = node @@ -188,11 +73,11 @@ class InstanceDeleter(threading.Thread, StatsReporter): if node.external_id: manager.cleanupNode(node.external_id) manager.waitForNodeCleanup(node.external_id) - except provider_manager.NotFound: - InstanceDeleter.log.info("Instance %s not found in provider %s", - node.external_id, node.provider) + except exceptions.NotFound: + NodeDeleter.log.info("Instance %s not found in provider %s", + node.external_id, node.provider) except Exception: - InstanceDeleter.log.exception( + NodeDeleter.log.exception( "Exception deleting instance %s from %s:", node.external_id, node.provider) # Don't delete the ZK node in this case, but do unlock it @@ -201,7 +86,7 @@ class InstanceDeleter(threading.Thread, StatsReporter): return if node_exists: - InstanceDeleter.log.info( + NodeDeleter.log.info( "Deleting ZK node id=%s, state=%s, external_id=%s", node.id, node.state, node.external_id) # This also effectively releases the lock @@ -223,480 +108,6 @@ class InstanceDeleter(threading.Thread, StatsReporter): self.log.exception("Exception while reporting stats:") -class NodeLauncher(threading.Thread, StatsReporter): - - def __init__(self, zk, provider_label, provider_manager, requestor, - node, retries): - ''' - Initialize the launcher. - - :param ZooKeeper zk: A ZooKeeper object. - :param ProviderLabel provider: A config ProviderLabel object. - :param ProviderManager provider_manager: The manager object used to - interact with the selected provider. - :param str requestor: Identifier for the request originator. - :param Node node: The node object. - :param int retries: Number of times to retry failed launches. 
- ''' - threading.Thread.__init__(self, name="NodeLauncher-%s" % node.id) - StatsReporter.__init__(self) - self.log = logging.getLogger("nodepool.NodeLauncher-%s" % node.id) - self._zk = zk - self._label = provider_label - self._manager = provider_manager - self._node = node - self._retries = retries - self._image_name = None - self._requestor = requestor - - self._pool = self._label.pool - self._provider = self._pool.provider - if self._label.diskimage: - self._diskimage = self._provider.diskimages[self._label.diskimage.name] - else: - self._diskimage = None - self._cloud_image = self._provider.cloud_images.get(self._label.cloud_image, None) - - def logConsole(self, server_id, hostname): - if not self._label.console_log: - return - console = self._manager.getServerConsole(server_id) - if console: - self.log.debug('Console log from hostname %s:' % hostname) - for line in console.splitlines(): - self.log.debug(line.rstrip()) - - def _launchNode(self): - if self._label.diskimage: - # launch using diskimage - cloud_image = self._zk.getMostRecentImageUpload( - self._diskimage.name, self._provider.name) - - if not cloud_image: - raise LaunchNodepoolException( - "Unable to find current cloud image %s in %s" % - (self._diskimage.name, self._provider.name) - ) - - config_drive = self._diskimage.config_drive - image_external = dict(id=cloud_image.external_id) - image_id = "{path}/{upload_id}".format( - path=self._zk._imageUploadPath(cloud_image.image_name, - cloud_image.build_id, - cloud_image.provider_name), - upload_id=cloud_image.id) - image_name = self._diskimage.name - - else: - # launch using unmanaged cloud image - config_drive = self._cloud_image.config_drive - - # These are different values for zk, but it's all the same - # for cloud-images. - # image_external is what we use for OpenStack. - # image_id is what we record in the node for zk. - # image_name is what we log, so matches the config. - if self._cloud_image.image_id: - image_external = dict(id=self._cloud_image.image_id) - elif self._cloud_image.image_name: - image_external = self._cloud_image.image_name - else: - image_external = self._cloud_image.name - image_id = self._cloud_image.name - image_name = self._cloud_image.name - - hostname = self._provider.hostname_format.format( - label=self._label, provider=self._provider, node=self._node - ) - - self.log.info("Creating server with hostname %s in %s from image %s " - "for node id: %s" % (hostname, self._provider.name, - image_name, - self._node.id)) - - # NOTE: We store the node ID in the server metadata to use for leaked - # instance detection. We cannot use the external server ID for this - # because that isn't available in ZooKeeper until after the server is - # active, which could cause a race in leak detection. 
- - server = self._manager.createServer( - hostname, - image=image_external, - min_ram=self._label.min_ram, - flavor_name=self._label.flavor_name, - key_name=self._label.key_name, - az=self._node.az, - config_drive=config_drive, - nodepool_node_id=self._node.id, - nodepool_node_label=self._node.type, - nodepool_image_name=image_name, - networks=self._pool.networks, - boot_from_volume=self._label.boot_from_volume, - volume_size=self._label.volume_size) - - self._node.external_id = server.id - self._node.hostname = hostname - self._node.image_id = image_id - - # Checkpoint save the updated node info - self._zk.storeNode(self._node) - - self.log.debug("Waiting for server %s for node id: %s" % - (server.id, self._node.id)) - server = self._manager.waitForServer( - server, self._provider.launch_timeout) - - if server.status != 'ACTIVE': - raise LaunchStatusException("Server %s for node id: %s " - "status: %s" % - (server.id, self._node.id, - server.status)) - - # If we didn't specify an AZ, set it to the one chosen by Nova. - # Do this after we are done waiting since AZ may not be available - # immediately after the create request. - if not self._node.az: - self._node.az = server.location.zone - - interface_ip = server.interface_ip - if not interface_ip: - self.log.debug( - "Server data for failed IP: %s" % pprint.pformat( - server)) - raise LaunchNetworkException("Unable to find public IP of server") - - self._node.interface_ip = interface_ip - self._node.public_ipv4 = server.public_v4 - self._node.public_ipv6 = server.public_v6 - self._node.private_ipv4 = server.private_v4 - # devstack-gate multi-node depends on private_v4 being populated - # with something. On clouds that don't have a private address, use - # the public. - if not self._node.private_ipv4: - self._node.private_ipv4 = server.public_v4 - - # Checkpoint save the updated node info - self._zk.storeNode(self._node) - - self.log.debug( - "Node %s is running [region: %s, az: %s, ip: %s ipv4: %s, " - "ipv6: %s]" % - (self._node.id, self._node.region, self._node.az, - self._node.interface_ip, self._node.public_ipv4, - self._node.public_ipv6)) - - # Get the SSH public keys for the new node and record in ZooKeeper - try: - self.log.debug("Gathering host keys for node %s", self._node.id) - host_keys = utils.keyscan( - interface_ip, timeout=self._provider.boot_timeout) - if not host_keys: - raise LaunchKeyscanException("Unable to gather host keys") - except exceptions.SSHTimeoutException: - self.logConsole(self._node.external_id, self._node.hostname) - raise - - self._node.host_keys = host_keys - self._zk.storeNode(self._node) - - def _run(self): - attempts = 1 - while attempts <= self._retries: - try: - self._launchNode() - break - except Exception: - if attempts <= self._retries: - self.log.exception( - "Launch attempt %d/%d failed for node %s:", - attempts, self._retries, self._node.id) - # If we created an instance, delete it. 
- if self._node.external_id: - self._manager.cleanupNode(self._node.external_id) - self._manager.waitForNodeCleanup(self._node.external_id) - self._node.external_id = None - self._node.public_ipv4 = None - self._node.public_ipv6 = None - self._node.inerface_ip = None - self._zk.storeNode(self._node) - if attempts == self._retries: - raise - attempts += 1 - - self._node.state = zk.READY - self._zk.storeNode(self._node) - self.log.info("Node id %s is ready", self._node.id) - - def run(self): - start_time = time.time() - statsd_key = 'ready' - - try: - self._run() - except Exception as e: - self.log.exception("Launch failed for node %s:", - self._node.id) - self._node.state = zk.FAILED - self._zk.storeNode(self._node) - - if hasattr(e, 'statsd_key'): - statsd_key = e.statsd_key - else: - statsd_key = 'error.unknown' - - try: - dt = int((time.time() - start_time) * 1000) - self.recordLaunchStats(statsd_key, dt, self._image_name, - self._node.provider, self._node.az, - self._requestor) - self.updateNodeStats(self._zk, self._provider) - except Exception: - self.log.exception("Exception while reporting stats:") - - -class OpenStackNodeLaunchManager(NodeLaunchManager): - def launch(self, node): - ''' - Launch a new node as described by the supplied Node. - - We expect each NodeLauncher thread to directly modify the node that - is passed to it. The poll() method will expect to see the node.state - attribute to change as the node is processed. - - :param Node node: The node object. - ''' - self._nodes.append(node) - provider_label = self._pool.labels[node.type] - t = NodeLauncher(self._zk, provider_label, self._manager, - self._requestor, node, self._retries) - t.start() - self._threads.append(t) - - -class OpenStackNodeRequestHandler(NodeRequestHandler): - log = logging.getLogger("nodepool.OpenStackNodeRequestHandler") - - def __init__(self, pw, request): - super(OpenStackNodeRequestHandler, self).__init__(pw, request) - self.chosen_az = None - - def _imagesAvailable(self): - ''' - Determines if the requested images are available for this provider. - - ZooKeeper is queried for an image uploaded to the provider that is - in the READY state. - - :returns: True if it is available, False otherwise. - ''' - for label in self.request.node_types: - - if self.pool.labels[label].cloud_image: - img = self.pool.labels[label].cloud_image - if not self.manager.labelReady(img): - return False - else: - img = self.pool.labels[label].diskimage.name - - if not self.zk.getMostRecentImageUpload(img, self.provider.name): - return False - return True - - def _invalidNodeTypes(self): - ''' - Return any node types that are invalid for this provider. - - :returns: A list of node type names that are invalid, or an empty - list if all are valid. - ''' - invalid = [] - for ntype in self.request.node_types: - if ntype not in self.pool.labels: - invalid.append(ntype) - return invalid - - def _countNodes(self): - ''' - Query ZooKeeper to determine the number of provider nodes launched. - - :returns: An integer for the number launched for this provider. - ''' - count = 0 - for node in self.zk.nodeIterator(): - if (node.provider == self.provider.name and - node.pool == self.pool.name): - count += 1 - return count - - def _waitForNodeSet(self): - ''' - Fill node set for the request. - - Obtain nodes for the request, pausing all new request handling for - this provider until the node set can be filled. - - We attempt to group the node set within the same provider availability - zone. 
For this to work properly, the provider entry in the nodepool - config must list the availability zones. Otherwise, new nodes will be - put in random AZs at nova's whim. The exception being if there is an - existing node in the READY state that we can select for this node set. - Its AZ will then be used for new nodes, as well as any other READY - nodes. - - note:: This code is a bit racey in its calculation of the number of - nodes in use for quota purposes. It is possible for multiple - launchers to be doing this calculation at the same time. Since we - currently have no locking mechanism around the "in use" - calculation, if we are at the edge of the quota, one of the - launchers could attempt to launch a new node after the other - launcher has already started doing so. This would cause an - expected failure from the underlying library, which is ok for now. - ''' - if not self.launch_manager: - self.launch_manager = OpenStackNodeLaunchManager( - self.zk, self.pool, self.manager, - self.request.requestor, retries=self.provider.launch_retries) - - # Since this code can be called more than once for the same request, - # we need to calculate the difference between our current node set - # and what was requested. We cannot use set operations here since a - # node type can appear more than once in the requested types. - saved_types = collections.Counter([n.type for n in self.nodeset]) - requested_types = collections.Counter(self.request.node_types) - diff = requested_types - saved_types - needed_types = list(diff.elements()) - - ready_nodes = self.zk.getReadyNodesOfTypes(needed_types) - - for ntype in needed_types: - # First try to grab from the list of already available nodes. - got_a_node = False - if self.request.reuse and ntype in ready_nodes: - for node in ready_nodes[ntype]: - # Only interested in nodes from this provider and - # pool, and within the selected AZ. - if node.provider != self.provider.name: - continue - if node.pool != self.pool.name: - continue - if self.chosen_az and node.az != self.chosen_az: - continue - - try: - self.zk.lockNode(node, blocking=False) - except exceptions.ZKLockException: - # It's already locked so skip it. - continue - else: - if self.paused: - self.log.debug("Unpaused request %s", self.request) - self.paused = False - - self.log.debug( - "Locked existing node %s for request %s", - node.id, self.request.id) - got_a_node = True - node.allocated_to = self.request.id - self.zk.storeNode(node) - self.nodeset.append(node) - - # If we haven't already chosen an AZ, select the - # AZ from this ready node. This will cause new nodes - # to share this AZ, as well. - if not self.chosen_az and node.az: - self.chosen_az = node.az - break - - # Could not grab an existing node, so launch a new one. - if not got_a_node: - # Select grouping AZ if we didn't set AZ from a selected, - # pre-existing node - if not self.chosen_az: - self.chosen_az = random.choice( - self.pool.azs or self.manager.getAZs()) - - # If we calculate that we're at capacity, pause until nodes - # are released by Zuul and removed by the DeletedNodeWorker. 
- if self._countNodes() >= self.pool.max_servers: - if not self.paused: - self.log.debug( - "Pausing request handling to satisfy request %s", - self.request) - self.paused = True - return - - if self.paused: - self.log.debug("Unpaused request %s", self.request) - self.paused = False - - node = zk.Node() - node.state = zk.INIT - node.type = ntype - node.provider = self.provider.name - node.pool = self.pool.name - node.az = self.chosen_az - node.region = self.provider.region_name - node.launcher = self.launcher_id - node.allocated_to = self.request.id - - # Note: It should be safe (i.e., no race) to lock the node - # *after* it is stored since nodes in INIT state are not - # locked anywhere. - self.zk.storeNode(node) - self.zk.lockNode(node, blocking=False) - self.log.debug("Locked building node %s for request %s", - node.id, self.request.id) - - # Set state AFTER lock so sthat it isn't accidentally cleaned - # up (unlocked BUILDING nodes will be deleted). - node.state = zk.BUILDING - self.zk.storeNode(node) - - self.nodeset.append(node) - self.launch_manager.launch(node) - - def run_handler(self): - ''' - Main body for the NodeRequestHandler. - ''' - self._setFromPoolWorker() - - declined_reasons = [] - invalid_types = self._invalidNodeTypes() - if invalid_types: - declined_reasons.append('node type(s) [%s] not available' % - ','.join(invalid_types)) - elif not self._imagesAvailable(): - declined_reasons.append('images are not available') - if len(self.request.node_types) > self.pool.max_servers: - declined_reasons.append('it would exceed quota') - - if declined_reasons: - self.log.debug("Declining node request %s because %s", - self.request.id, ', '.join(declined_reasons)) - self.request.declined_by.append(self.launcher_id) - launchers = set(self.zk.getRegisteredLaunchers()) - if launchers.issubset(set(self.request.declined_by)): - self.log.debug("Failing declined node request %s", - self.request.id) - # All launchers have declined it - self.request.state = zk.FAILED - self.unlockNodeSet(clear_allocation=True) - self.zk.storeNodeRequest(self.request) - self.zk.unlockNodeRequest(self.request) - self.done = True - return - - if self.paused: - self.log.debug("Retrying node request %s", self.request.id) - else: - self.log.debug("Accepting node request %s", self.request.id) - self.request.state = zk.PENDING - self.zk.storeNodeRequest(self.request) - - self._waitForNodeSet() - - class PoolWorker(threading.Thread): ''' Class that manages node requests for a single provider pool. @@ -893,7 +304,7 @@ class BaseCleanupWorker(threading.Thread): self.log.info("Deleting %s instance %s from %s", node.state, node.external_id, node.provider) try: - t = InstanceDeleter( + t = NodeDeleter( self._nodepool.getZK(), self._nodepool.getProviderManager(node.provider), node) @@ -1083,7 +494,7 @@ class CleanupWorker(BaseCleanupWorker): zk_conn.unlockNode(node) continue - # The InstanceDeleter thread will unlock and remove the + # The NodeDeleter thread will unlock and remove the # node from ZooKeeper if it succeeds. self._deleteInstance(node) @@ -1169,7 +580,7 @@ class DeletedNodeWorker(BaseCleanupWorker): zk_conn.unlockNode(node) continue - # The InstanceDeleter thread will unlock and remove the + # The NodeDeleter thread will unlock and remove the # node from ZooKeeper if it succeeds. 
self._deleteInstance(node) diff --git a/nodepool/provider_manager.py b/nodepool/provider_manager.py index 5a9c35a47..93f689734 100755 --- a/nodepool/provider_manager.py +++ b/nodepool/provider_manager.py @@ -17,33 +17,9 @@ # limitations under the License. import logging -from contextlib import contextmanager -import operator -import shade - -from nodepool import exceptions -from nodepool import fakeprovider -from nodepool.driver import Provider -from nodepool.nodeutils import iterate_timeout -from nodepool.task_manager import ManagerStoppedException -from nodepool.task_manager import TaskManager - - -IPS_LIST_AGE = 5 # How long to keep a cached copy of the ip list - - -@contextmanager -def shade_inner_exceptions(): - try: - yield - except shade.OpenStackCloudException as e: - e.log_error() - raise - - -class NotFound(Exception): - pass +from nodepool.driver.fake.provider import FakeProvider +from nodepool.driver.openstack.provider import OpenStackProvider def get_provider(provider, use_taskmanager): @@ -83,324 +59,3 @@ class ProviderManager(object): for m in config.provider_managers.values(): m.stop() m.join() - - -class OpenStackProvider(Provider): - log = logging.getLogger("nodepool.OpenStackProvider") - - def __init__(self, provider, use_taskmanager): - self.provider = provider - self._images = {} - self._networks = {} - self.__flavors = {} - self.__azs = None - self._use_taskmanager = use_taskmanager - self._taskmanager = None - - def start(self): - if self._use_taskmanager: - self._taskmanager = TaskManager(None, self.provider.name, - self.provider.rate) - self._taskmanager.start() - self.resetClient() - - def stop(self): - if self._taskmanager: - self._taskmanager.stop() - - def join(self): - if self._taskmanager: - self._taskmanager.join() - - @property - def _flavors(self): - if not self.__flavors: - self.__flavors = self._getFlavors() - return self.__flavors - - def _getClient(self): - if self._use_taskmanager: - manager = self._taskmanager - else: - manager = None - return shade.OpenStackCloud( - cloud_config=self.provider.cloud_config, - manager=manager, - **self.provider.cloud_config.config) - - def resetClient(self): - self._client = self._getClient() - if self._use_taskmanager: - self._taskmanager.setClient(self._client) - - def _getFlavors(self): - flavors = self.listFlavors() - flavors.sort(key=operator.itemgetter('ram')) - return flavors - - # TODO(mordred): These next three methods duplicate logic that is in - # shade, but we can't defer to shade until we're happy - # with using shade's resource caching facility. We have - # not yet proven that to our satisfaction, but if/when - # we do, these should be able to go away. - def _findFlavorByName(self, flavor_name): - for f in self._flavors: - if flavor_name in (f['name'], f['id']): - return f - raise Exception("Unable to find flavor: %s" % flavor_name) - - def _findFlavorByRam(self, min_ram, flavor_name): - for f in self._flavors: - if (f['ram'] >= min_ram - and (not flavor_name or flavor_name in f['name'])): - return f - raise Exception("Unable to find flavor with min ram: %s" % min_ram) - - def findFlavor(self, flavor_name, min_ram): - # Note: this will throw an error if the provider is offline - # but all the callers are in threads (they call in via CreateServer) so - # the mainloop won't be affected. 
- if min_ram: - return self._findFlavorByRam(min_ram, flavor_name) - else: - return self._findFlavorByName(flavor_name) - - def findImage(self, name): - if name in self._images: - return self._images[name] - - with shade_inner_exceptions(): - image = self._client.get_image(name) - self._images[name] = image - return image - - def findNetwork(self, name): - if name in self._networks: - return self._networks[name] - - with shade_inner_exceptions(): - network = self._client.get_network(name) - self._networks[name] = network - return network - - def deleteImage(self, name): - if name in self._images: - del self._images[name] - - with shade_inner_exceptions(): - return self._client.delete_image(name) - - def createServer(self, name, image, - flavor_name=None, min_ram=None, - az=None, key_name=None, config_drive=True, - nodepool_node_id=None, nodepool_node_label=None, - nodepool_image_name=None, - networks=None, boot_from_volume=False, volume_size=50): - if not networks: - networks = [] - if not isinstance(image, dict): - # if it's a dict, we already have the cloud id. If it's not, - # we don't know if it's name or ID so need to look it up - image = self.findImage(image) - flavor = self.findFlavor(flavor_name=flavor_name, min_ram=min_ram) - create_args = dict(name=name, - image=image, - flavor=flavor, - config_drive=config_drive) - if boot_from_volume: - create_args['boot_from_volume'] = boot_from_volume - create_args['volume_size'] = volume_size - # NOTE(pabelanger): Always cleanup volumes when we delete a server. - create_args['terminate_volume'] = True - if key_name: - create_args['key_name'] = key_name - if az: - create_args['availability_zone'] = az - nics = [] - for network in networks: - net_id = self.findNetwork(network)['id'] - nics.append({'net-id': net_id}) - if nics: - create_args['nics'] = nics - # Put provider.name and image_name in as groups so that ansible - # inventory can auto-create groups for us based on each of those - # qualities - # Also list each of those values directly so that non-ansible - # consumption programs don't need to play a game of knowing that - # groups[0] is the image name or anything silly like that. 
- groups_list = [self.provider.name] - - if nodepool_image_name: - groups_list.append(nodepool_image_name) - if nodepool_node_label: - groups_list.append(nodepool_node_label) - meta = dict( - groups=",".join(groups_list), - nodepool_provider_name=self.provider.name, - ) - if nodepool_node_id: - meta['nodepool_node_id'] = nodepool_node_id - if nodepool_image_name: - meta['nodepool_image_name'] = nodepool_image_name - if nodepool_node_label: - meta['nodepool_node_label'] = nodepool_node_label - create_args['meta'] = meta - - with shade_inner_exceptions(): - return self._client.create_server(wait=False, **create_args) - - def getServer(self, server_id): - with shade_inner_exceptions(): - return self._client.get_server(server_id) - - def getServerConsole(self, server_id): - try: - with shade_inner_exceptions(): - return self._client.get_server_console(server_id) - except shade.OpenStackCloudException: - return None - - def waitForServer(self, server, timeout=3600): - with shade_inner_exceptions(): - return self._client.wait_for_server( - server=server, auto_ip=True, reuse=False, - timeout=timeout) - - def waitForNodeCleanup(self, server_id, timeout=600): - for count in iterate_timeout( - timeout, exceptions.ServerDeleteException, - "server %s deletion" % server_id): - if not self.getServer(server_id): - return - - def waitForImage(self, image_id, timeout=3600): - last_status = None - for count in iterate_timeout( - timeout, exceptions.ImageCreateException, "image creation"): - try: - image = self.getImage(image_id) - except NotFound: - continue - except ManagerStoppedException: - raise - except Exception: - self.log.exception('Unable to list images while waiting for ' - '%s will retry' % (image_id)) - continue - - # shade returns None when not found - if not image: - continue - - status = image['status'] - if (last_status != status): - self.log.debug( - 'Status of image in {provider} {id}: {status}'.format( - provider=self.provider.name, - id=image_id, - status=status)) - if status == 'ERROR' and 'fault' in image: - self.log.debug( - 'ERROR in {provider} on {id}: {resason}'.format( - provider=self.provider.name, - id=image_id, - resason=image['fault']['message'])) - last_status = status - # Glance client returns lower case statuses - but let's be sure - if status.lower() in ['active', 'error']: - return image - - def createImage(self, server, image_name, meta): - with shade_inner_exceptions(): - return self._client.create_image_snapshot( - image_name, server, **meta) - - def getImage(self, image_id): - with shade_inner_exceptions(): - return self._client.get_image(image_id) - - def labelReady(self, image_id): - return self.getImage(image_id) - - def uploadImage(self, image_name, filename, image_type=None, meta=None, - md5=None, sha256=None): - # configure glance and upload image. Note the meta flags - # are provided as custom glance properties - # NOTE: we have wait=True set here. This is not how we normally - # do things in nodepool, preferring to poll ourselves thankyouverymuch. 
- # However - two things to note: - # - PUT has no aysnc mechanism, so we have to handle it anyway - # - v2 w/task waiting is very strange and complex - but we have to - # block for our v1 clouds anyway, so we might as well - # have the interface be the same and treat faking-out - # a shade-level fake-async interface later - if not meta: - meta = {} - if image_type: - meta['disk_format'] = image_type - with shade_inner_exceptions(): - image = self._client.create_image( - name=image_name, - filename=filename, - is_public=False, - wait=True, - md5=md5, - sha256=sha256, - **meta) - return image.id - - def listImages(self): - with shade_inner_exceptions(): - return self._client.list_images() - - def listFlavors(self): - with shade_inner_exceptions(): - return self._client.list_flavors(get_extra=False) - - def listNodes(self): - # shade list_servers carries the nodepool server list caching logic - with shade_inner_exceptions(): - return self._client.list_servers() - - def deleteServer(self, server_id): - with shade_inner_exceptions(): - return self._client.delete_server(server_id, delete_ips=True) - - def cleanupNode(self, server_id): - server = self.getServer(server_id) - if not server: - raise NotFound() - - self.log.debug('Deleting server %s' % server_id) - self.deleteServer(server_id) - - def cleanupLeakedFloaters(self): - with shade_inner_exceptions(): - self._client.delete_unattached_floating_ips() - - def getAZs(self): - if self.__azs is None: - self.__azs = self._client.list_availability_zone_names() - if not self.__azs: - # If there are no zones, return a list containing None so that - # random.choice can pick None and pass that to Nova. If this - # feels dirty, please direct your ire to policy.json and the - # ability to turn off random portions of the OpenStack API. - self.__azs = [None] - return self.__azs - - -class FakeProvider(OpenStackProvider): - def __init__(self, provider, use_taskmanager): - self.createServer_fails = 0 - self.__client = fakeprovider.FakeOpenStackCloud() - super(FakeProvider, self).__init__(provider, use_taskmanager) - - def _getClient(self): - return self.__client - - def createServer(self, *args, **kwargs): - while self.createServer_fails: - self.createServer_fails -= 1 - raise Exception("Expected createServer exception") - return super(FakeProvider, self).createServer(*args, **kwargs) diff --git a/nodepool/stats.py b/nodepool/stats.py old mode 100644 new mode 100755 index 772281df1..675d7d21d --- a/nodepool/stats.py +++ b/nodepool/stats.py @@ -20,6 +20,8 @@ import os import logging import statsd +from nodepool import zk + log = logging.getLogger("nodepool.stats") def get_client(): @@ -38,3 +40,98 @@ def get_client(): return statsd.StatsClient(**statsd_args) else: return None + + +class StatsReporter(object): + ''' + Class adding statsd reporting functionality. + ''' + def __init__(self): + super(StatsReporter, self).__init__() + self._statsd = get_client() + + def recordLaunchStats(self, subkey, dt, image_name, + provider_name, node_az, requestor): + ''' + Record node launch statistics. 
+
+        :param str subkey: statsd key
+        :param int dt: Time delta in milliseconds
+        :param str image_name: Name of the image used
+        :param str provider_name: Name of the provider
+        :param str node_az: AZ of the launched node
+        :param str requestor: Identifier for the request originator
+        '''
+        if not self._statsd:
+            return
+
+        keys = [
+            'nodepool.launch.provider.%s.%s' % (provider_name, subkey),
+            'nodepool.launch.image.%s.%s' % (image_name, subkey),
+            'nodepool.launch.%s' % (subkey,),
+        ]
+
+        if node_az:
+            keys.append('nodepool.launch.provider.%s.%s.%s' %
+                        (provider_name, node_az, subkey))
+
+        if requestor:
+            # Replace '.' which is a graphite hierarchy, and ':' which is
+            # a statsd delimiter.
+            requestor = requestor.replace('.', '_')
+            requestor = requestor.replace(':', '_')
+            keys.append('nodepool.launch.requestor.%s.%s' %
+                        (requestor, subkey))
+
+        for key in keys:
+            self._statsd.timing(key, dt)
+            self._statsd.incr(key)
+
+
+    def updateNodeStats(self, zk_conn, provider):
+        '''
+        Refresh statistics for all known nodes.
+
+        :param ZooKeeper zk_conn: A ZooKeeper connection object.
+        :param Provider provider: A config Provider object.
+        '''
+        if not self._statsd:
+            return
+
+        states = {}
+
+        # Initialize things we know about to zero
+        for state in zk.Node.VALID_STATES:
+            key = 'nodepool.nodes.%s' % state
+            states[key] = 0
+            key = 'nodepool.provider.%s.nodes.%s' % (provider.name, state)
+            states[key] = 0
+
+        for node in zk_conn.nodeIterator():
+            # nodepool.nodes.STATE
+            key = 'nodepool.nodes.%s' % node.state
+            states[key] += 1
+
+            # nodepool.label.LABEL.nodes.STATE
+            key = 'nodepool.label.%s.nodes.%s' % (node.type, node.state)
+            # It's possible we could see node types that aren't in our config
+            if key in states:
+                states[key] += 1
+            else:
+                states[key] = 1
+
+            # nodepool.provider.PROVIDER.nodes.STATE
+            key = 'nodepool.provider.%s.nodes.%s' % (node.provider, node.state)
+            # It's possible we could see providers that aren't in our config
+            if key in states:
+                states[key] += 1
+            else:
+                states[key] = 1
+
+        for key, count in states.items():
+            self._statsd.gauge(key, count)
+
+        # nodepool.provider.PROVIDER.max_servers
+        key = 'nodepool.provider.%s.max_servers' % provider.name
+        max_servers = sum([p.max_servers for p in provider.pools.values()])
+        self._statsd.gauge(key, max_servers)
diff --git a/nodepool/tests/__init__.py b/nodepool/tests/__init__.py
index 52ba45813..a25c35cea 100644
--- a/nodepool/tests/__init__.py
+++ b/nodepool/tests/__init__.py
@@ -198,7 +198,8 @@ class BaseTestCase(testtools.TestCase):
             return fake_client
 
         self.useFixture(fixtures.MonkeyPatch(
-            'nodepool.provider_manager.OpenStackProvider._getClient',
+            'nodepool.driver.openstack.provider.OpenStackProvider.'
+ '_getClient', get_fake_client)) self.useFixture(fixtures.MonkeyPatch( 'nodepool.launcher._get_one_cloud', diff --git a/nodepool/tests/test_builder.py b/nodepool/tests/test_builder.py index 6f5acb0bb..4c667cb11 100644 --- a/nodepool/tests/test_builder.py +++ b/nodepool/tests/test_builder.py @@ -105,7 +105,7 @@ class TestNodePoolBuilder(tests.DBTestCase): return fake_client self.useFixture(fixtures.MonkeyPatch( - 'nodepool.provider_manager.FakeProvider._getClient', + 'nodepool.driver.fake.provider.FakeProvider._getClient', get_fake_client)) self.useFixture(fixtures.MonkeyPatch( 'nodepool.launcher._get_one_cloud', diff --git a/nodepool/tests/test_launcher.py b/nodepool/tests/test_launcher.py index 273f00bf9..95047f882 100644 --- a/nodepool/tests/test_launcher.py +++ b/nodepool/tests/test_launcher.py @@ -400,7 +400,7 @@ class TestLauncher(tests.DBTestCase): def fail_delete(self, name): raise RuntimeError('Fake Error') - fake_delete = 'nodepool.provider_manager.FakeProvider.deleteServer' + fake_delete = 'nodepool.driver.fake.provider.FakeProvider.deleteServer' self.useFixture(fixtures.MonkeyPatch(fake_delete, fail_delete)) configfile = self.setup_config('node.yaml') @@ -412,7 +412,7 @@ class TestLauncher(tests.DBTestCase): self.assertEqual(len(nodes), 1) self.zk.lockNode(nodes[0], blocking=False) - nodepool.launcher.InstanceDeleter.delete( + nodepool.launcher.NodeDeleter.delete( self.zk, pool.getProviderManager('fake-provider'), nodes[0]) # Make sure our old node is in delete state, even though delete failed diff --git a/nodepool/tests/test_nodelaunchmanager.py b/nodepool/tests/test_nodelaunchmanager.py index 70513360e..0819a5659 100644 --- a/nodepool/tests/test_nodelaunchmanager.py +++ b/nodepool/tests/test_nodelaunchmanager.py @@ -21,7 +21,7 @@ from nodepool import builder from nodepool import provider_manager from nodepool import tests from nodepool import zk -from nodepool.launcher import OpenStackNodeLaunchManager +from nodepool.driver.openstack.handler import OpenStackNodeLaunchManager class TestNodeLaunchManager(tests.DBTestCase): @@ -62,7 +62,7 @@ class TestNodeLaunchManager(tests.DBTestCase): self.assertEqual(len(mgr.ready_nodes), 1) self.assertEqual(len(mgr.failed_nodes), 0) - @mock.patch('nodepool.launcher.NodeLauncher._launchNode') + @mock.patch('nodepool.driver.openstack.handler.NodeLauncher._launchNode') def test_failed_launch(self, mock_launch): configfile = self.setup_config('node.yaml') self._setup(configfile) @@ -79,7 +79,7 @@ class TestNodeLaunchManager(tests.DBTestCase): self.assertEqual(len(mgr.failed_nodes), 1) self.assertEqual(len(mgr.ready_nodes), 0) - @mock.patch('nodepool.launcher.NodeLauncher._launchNode') + @mock.patch('nodepool.driver.openstack.handler.NodeLauncher._launchNode') def test_mixed_launch(self, mock_launch): configfile = self.setup_config('node.yaml') self._setup(configfile) diff --git a/nodepool/tests/test_shade_integration.py b/nodepool/tests/test_shade_integration.py index 66a4fbe2a..8c947d5d0 100644 --- a/nodepool/tests/test_shade_integration.py +++ b/nodepool/tests/test_shade_integration.py @@ -24,7 +24,7 @@ import yaml from nodepool import config as nodepool_config from nodepool import provider_manager from nodepool import tests -from nodepool.provider_manager import shade_inner_exceptions +from nodepool.driver.openstack.provider import shade_inner_exceptions class TestShadeIntegration(tests.IntegrationTestCase):
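Reviewer note (not part of the patch): the StatsReporter mixin added to
nodepool/stats.py above is meant to be composed into a launcher-style class,
which then reports a millisecond launch delta through recordLaunchStats().
The sketch below only illustrates that contract. StatsReporter, get_client()
and the recordLaunchStats() signature are taken from the diff; DummyLauncher
and the concrete values ('ready', 'fake-image', 'fake-provider',
'NodePool:min-ready') are illustrative assumptions, and running it assumes a
nodepool checkout with the statsd library installed.

import time

from nodepool import stats


class DummyLauncher(stats.StatsReporter):
    '''Hypothetical consumer used only to illustrate the mixin contract.'''

    def launch(self):
        start = time.time()
        # ... a real handler would create the server via the provider
        # manager and wait for it to become ready here ...
        dt = int((time.time() - start) * 1000)  # delta in milliseconds

        # subkey distinguishes outcomes, e.g. 'ready' for a successful launch.
        self.recordLaunchStats('ready', dt,
                               image_name='fake-image',
                               provider_name='fake-provider',
                               node_az=None,
                               requestor='NodePool:min-ready')


if __name__ == '__main__':
    # With the statsd environment variables unset, get_client() returns None
    # and StatsReporter silently skips reporting, so this runs as a no-op.
    DummyLauncher().launch()

With statsd configured, each launch emits both a timer and a counter for the
provider, image and requestor keys built in recordLaunchStats() above.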