diff --git a/nodepool/driver/__init__.py b/nodepool/driver/__init__.py index ce926f0ad..8f6337530 100644 --- a/nodepool/driver/__init__.py +++ b/nodepool/driver/__init__.py @@ -16,9 +16,11 @@ # limitations under the License. import abc +import collections import inspect import importlib import logging +import math import os import six @@ -143,13 +145,12 @@ class Provider(object): @six.add_metaclass(abc.ABCMeta) class NodeRequestHandler(object): ''' - Class to process a single node request. + Class to process a single nodeset request. The PoolWorker thread will instantiate a class of this type for each node request that it pulls from ZooKeeper. - Subclasses are required to implement the run_handler method and the - NodeLaunchManager to kick off any threads needed to satisfy the request. + Subclasses are required to implement the launch method. ''' def __init__(self, pw, request): @@ -159,12 +160,15 @@ class NodeRequestHandler(object): ''' self.pw = pw self.request = request - self.launch_manager = None self.nodeset = [] self.done = False self.paused = False self.launcher_id = self.pw.launcher_id + self._failed_nodes = [] + self._ready_nodes = [] + self._threads = [] + def _setFromPoolWorker(self): ''' Set values that we pull from the parent PoolWorker. @@ -179,9 +183,229 @@ class NodeRequestHandler(object): @property def alive_thread_count(self): - if not self.launch_manager: - return 0 - return self.launch_manager.alive_thread_count + count = 0 + for t in self._threads: + if t.isAlive(): + count += 1 + return count + + @property + def failed_nodes(self): + return self._failed_nodes + + @property + def ready_nodes(self): + return self._ready_nodes + + def _imagesAvailable(self): + ''' + Determines if the requested images are available for this provider. + + ZooKeeper is queried for an image uploaded to the provider that is + in the READY state. + + :returns: True if it is available, False otherwise. + ''' + if self.provider.driver.manage_images: + for label in self.request.node_types: + if self.pool.labels[label].cloud_image: + if not self.manager.labelReady(self.pool.labels[label]): + return False + else: + if not self.zk.getMostRecentImageUpload( + self.pool.labels[label].diskimage.name, + self.provider.name): + return False + return True + + def _invalidNodeTypes(self): + ''' + Return any node types that are invalid for this provider. + + :returns: A list of node type names that are invalid, or an empty + list if all are valid. + ''' + invalid = [] + for ntype in self.request.node_types: + if ntype not in self.pool.labels: + invalid.append(ntype) + return invalid + + def _waitForNodeSet(self): + ''' + Fill node set for the request. + + Obtain nodes for the request, pausing all new request handling for + this provider until the node set can be filled. + + note:: This code is a bit racey in its calculation of the number of + nodes in use for quota purposes. It is possible for multiple + launchers to be doing this calculation at the same time. Since we + currently have no locking mechanism around the "in use" + calculation, if we are at the edge of the quota, one of the + launchers could attempt to launch a new node after the other + launcher has already started doing so. This would cause an + expected failure from the underlying library, which is ok for now. + ''' + # Since this code can be called more than once for the same request, + # we need to calculate the difference between our current node set + # and what was requested. We cannot use set operations here since a + # node type can appear more than once in the requested types. + saved_types = collections.Counter([n.type for n in self.nodeset]) + requested_types = collections.Counter(self.request.node_types) + diff = requested_types - saved_types + needed_types = list(diff.elements()) + + ready_nodes = self.zk.getReadyNodesOfTypes(needed_types) + + for ntype in needed_types: + # First try to grab from the list of already available nodes. + got_a_node = False + if self.request.reuse and ntype in ready_nodes: + for node in ready_nodes[ntype]: + # Only interested in nodes from this provider and pool + if node.provider != self.provider.name: + continue + if node.pool != self.pool.name: + continue + # Check this driver reuse requirements + if not self.checkReusableNode(node): + continue + try: + self.zk.lockNode(node, blocking=False) + except exceptions.ZKLockException: + # It's already locked so skip it. + continue + else: + if self.paused: + self.log.debug("Unpaused request %s", self.request) + self.paused = False + + self.log.debug( + "Locked existing node %s for request %s", + node.id, self.request.id) + got_a_node = True + node.allocated_to = self.request.id + self.zk.storeNode(node) + self.nodeset.append(node) + # Notify driver handler about node re-use + self.nodeReused(node) + break + + # Could not grab an existing node, so launch a new one. + if not got_a_node: + # If we calculate that we're at capacity, pause until nodes + # are released by Zuul and removed by the DeletedNodeWorker. + if not self.hasRemainingQuota(ntype): + if not self.paused: + self.log.debug( + "Pausing request handling to satisfy request %s", + self.request) + self.paused = True + self.zk.deleteOldestUnusedNode(self.provider.name, + self.pool.name) + return + + if self.paused: + self.log.debug("Unpaused request %s", self.request) + self.paused = False + + node = zk.Node() + node.state = zk.INIT + node.type = ntype + node.provider = self.provider.name + node.pool = self.pool.name + node.launcher = self.launcher_id + node.allocated_to = self.request.id + + self.setNodeMetadata(node) + + # Note: It should be safe (i.e., no race) to lock the node + # *after* it is stored since nodes in INIT state are not + # locked anywhere. + self.zk.storeNode(node) + self.zk.lockNode(node, blocking=False) + self.log.debug("Locked building node %s for request %s", + node.id, self.request.id) + + # Set state AFTER lock so that it isn't accidentally cleaned + # up (unlocked BUILDING nodes will be deleted). + node.state = zk.BUILDING + self.zk.storeNode(node) + + self.nodeset.append(node) + thread = self.launch(node) + if thread: + thread.start() + self._threads.append(thread) + + def _runHandler(self): + ''' + Main body for the node request handling. + ''' + self._setFromPoolWorker() + + if self.provider is None or self.pool is None: + # If the config changed out from underneath us, we could now be + # an invalid provider and should stop handling this request. + raise Exception("Provider configuration missing") + + # We have the launcher_id attr after _setFromPoolWorker() is called. + self.log = logging.getLogger( + "nodepool.driver.NodeRequestHandler[%s]" % self.launcher_id) + + declined_reasons = [] + invalid_types = self._invalidNodeTypes() + if invalid_types: + declined_reasons.append('node type(s) [%s] not available' % + ','.join(invalid_types)) + elif not self._imagesAvailable(): + declined_reasons.append('images are not available') + elif (self.pool.max_servers <= 0 or + not self.hasProviderQuota(self.request.node_types)): + declined_reasons.append('it would exceed quota') + # TODO(tobiash): Maybe also calculate the quota prediction here and + # backoff for some seconds if the used quota would be exceeded? + # This way we could give another (free) provider the chance to take + # this request earlier. + + # For min-ready requests, which do not re-use READY nodes, let's + # decline if this provider is already at capacity. Otherwise, we + # could end up wedged until another request frees up a node. + if self.pool.max_servers is not None and \ + self.request.requestor == "NodePool:min-ready": + current_count = self.zk.countPoolNodes(self.provider.name, + self.pool.name) + # Use >= because dynamic config changes to max-servers can leave + # us with more than max-servers. + # TODO: handle this with the quota code + if current_count >= self.pool.max_servers: + declined_reasons.append("provider cannot satisify min-ready") + + if declined_reasons: + self.log.debug("Declining node request %s because %s", + self.request.id, ', '.join(declined_reasons)) + self.decline_request() + self.unlockNodeSet(clear_allocation=True) + + # If conditions have changed for a paused request to now cause us + # to decline it, we need to unpause so we don't keep trying it + if self.paused: + self.paused = False + + self.zk.storeNodeRequest(self.request) + self.zk.unlockNodeRequest(self.request) + self.done = True + return + + if self.paused: + self.log.debug("Retrying node request %s", self.request.id) + else: + self.log.debug("Accepting node request %s", self.request.id) + self.request.state = zk.PENDING + self.zk.storeNodeRequest(self.request) + + self._waitForNodeSet() # --------------------------------------------------------------- # Public methods @@ -236,7 +460,7 @@ class NodeRequestHandler(object): fulfilled. The node set is saved and added to between calls. ''' try: - self.run_handler() + self._runHandler() except Exception: self.log.exception( "Declining node request %s due to exception in " @@ -269,7 +493,7 @@ class NodeRequestHandler(object): if self.done: return True - if self.launch_manager and not self.launch_manager.poll(): + if not self.pollLauncher(): return False # If the request has been pulled, unallocate the node set so other @@ -291,9 +515,7 @@ class NodeRequestHandler(object): self.request.id) return True - if self.launch_manager and self.launch_manager.failed_nodes or \ - len(self.nodeset) != len(self.request.node_types) or \ - [node for node in self.nodeset if node.state != zk.READY]: + if self.failed_nodes: self.log.debug("Declining node request %s because nodes failed", self.request.id) self.decline_request() @@ -319,57 +541,10 @@ class NodeRequestHandler(object): self.zk.unlockNodeRequest(self.request) return True - @abc.abstractmethod - def run_handler(self): - pass - - -@six.add_metaclass(abc.ABCMeta) -class NodeLaunchManager(object): - ''' - Handle launching multiple nodes in parallel. - - Subclasses are required to implement the launch method. - ''' - def __init__(self, zk, pool, provider_manager, - requestor, retries): - ''' - Initialize the launch manager. - - :param ZooKeeper zk: A ZooKeeper object. - :param ProviderPool pool: A config ProviderPool object. - :param ProviderManager provider_manager: The manager object used to - interact with the selected provider. - :param str requestor: Identifier for the request originator. - :param int retries: Number of times to retry failed launches. - ''' - self._retries = retries - self._nodes = [] - self._failed_nodes = [] - self._ready_nodes = [] - self._threads = [] - self._zk = zk - self._pool = pool - self._provider_manager = provider_manager - self._requestor = requestor - - @property - def alive_thread_count(self): - count = 0 - for t in self._threads: - if t.isAlive(): - count += 1 - return count - - @property - def failed_nodes(self): - return self._failed_nodes - - @property - def ready_nodes(self): - return self._ready_nodes - - def poll(self): + # --------------------------------------------------------------- + # Driver Implementation + # --------------------------------------------------------------- + def pollLauncher(self): ''' Check if all launch requests have completed. @@ -383,14 +558,14 @@ class NodeLaunchManager(object): if self.alive_thread_count: return False - node_states = [node.state for node in self._nodes] + node_states = [node.state for node in self.nodeset] # NOTE: It very important that NodeLauncher always sets one of # these states, no matter what. if not all(s in (zk.READY, zk.FAILED) for s in node_states): return False - for node in self._nodes: + for node in self.nodeset: if node.state == zk.READY: self._ready_nodes.append(node) else: @@ -398,8 +573,52 @@ class NodeLaunchManager(object): return True + def hasProviderQuota(self, node_types): + ''' + Checks if a provider has enough quota to handle a list of nodes. + This does not take our currently existing nodes into account. + + :param node_types: list of node types to check + :return: True if the node list fits into the provider, False otherwise + ''' + return True + + def hasRemainingQuota(self, ntype): + ''' + Checks if the predicted quota is enough for an additional node of type + ntype. + + :param ntype: node type for the quota check + :return: True if there is enough quota, False otherwise + ''' + return True + + def nodeReused(self, node): + ''' + Handler may implement this to be notified when a node is re-used. + The OpenStack handler uses this to set the choozen_az. + ''' + pass + + def checkReusableNode(self, node): + ''' + Handler may implement this to verify a node can be re-used. + The OpenStack handler uses this to verify the node az is correct. + ''' + return True + + def setNodeMetadata(self, node): + ''' + Handler may implement this to store metadata before building the node. + The OpenStack handler uses this to set az, cloud and region. + ''' + pass + @abc.abstractmethod def launch(self, node): + ''' + Handler needs to implement this to launch the node. + ''' pass @@ -414,6 +633,12 @@ class ConfigValue(object): return not self.__eq__(other) +class ConfigPool(ConfigValue): + def __init__(self): + self.labels = [] + self.max_servers = math.inf + + class Driver(ConfigValue): pass diff --git a/nodepool/driver/openstack/config.py b/nodepool/driver/openstack/config.py index 7a4673875..cfdce8af6 100644 --- a/nodepool/driver/openstack/config.py +++ b/nodepool/driver/openstack/config.py @@ -20,6 +20,7 @@ import voluptuous as v from nodepool.driver import ProviderConfig from nodepool.driver import ConfigValue +from nodepool.driver import ConfigPool class ProviderDiskImage(ConfigValue): @@ -59,7 +60,7 @@ class ProviderLabel(ConfigValue): return "" % self.name -class ProviderPool(ConfigValue): +class ProviderPool(ConfigPool): def __eq__(self, other): if (other.labels != self.labels or other.max_cores != self.max_cores or diff --git a/nodepool/driver/openstack/handler.py b/nodepool/driver/openstack/handler.py index 6acf049ab..c30ea0d41 100644 --- a/nodepool/driver/openstack/handler.py +++ b/nodepool/driver/openstack/handler.py @@ -13,7 +13,6 @@ # License for the specific language governing permissions and limitations # under the License. -import collections import logging import math import pprint @@ -27,7 +26,6 @@ from nodepool import exceptions from nodepool import nodeutils as utils from nodepool import stats from nodepool import zk -from nodepool.driver import NodeLaunchManager from nodepool.driver import NodeRequestHandler from nodepool.driver.openstack.provider import QuotaInformation @@ -298,77 +296,16 @@ class NodeLauncher(threading.Thread, stats.StatsReporter): self.log.exception("Exception while reporting stats:") -class OpenStackNodeLaunchManager(NodeLaunchManager): - def launch(self, node): - ''' - Launch a new node as described by the supplied Node. - - We expect each NodeLauncher thread to directly modify the node that - is passed to it. The poll() method will expect to see the node.state - attribute to change as the node is processed. - - :param Node node: The node object. - ''' - self._nodes.append(node) - provider_label = self._pool.labels[node.type] - t = NodeLauncher(self._zk, provider_label, self._provider_manager, - self._requestor, node, self._retries) - t.start() - self._threads.append(t) - - class OpenStackNodeRequestHandler(NodeRequestHandler): def __init__(self, pw, request): - super(OpenStackNodeRequestHandler, self).__init__(pw, request) + super().__init__(pw, request) self.chosen_az = None self.log = logging.getLogger( "nodepool.driver.openstack.OpenStackNodeRequestHandler[%s]" % self.launcher_id) - def _imagesAvailable(self): - ''' - Determines if the requested images are available for this provider. - - ZooKeeper is queried for an image uploaded to the provider that is - in the READY state. - - :returns: True if it is available, False otherwise. - ''' - for label in self.request.node_types: - - if self.pool.labels[label].cloud_image: - if not self.manager.labelReady(self.pool.labels[label]): - return False - else: - if not self.zk.getMostRecentImageUpload( - self.pool.labels[label].diskimage.name, - self.provider.name): - return False - return True - - def _invalidNodeTypes(self): - ''' - Return any node types that are invalid for this provider. - - :returns: A list of node type names that are invalid, or an empty - list if all are valid. - ''' - invalid = [] - for ntype in self.request.node_types: - if ntype not in self.pool.labels: - invalid.append(ntype) - return invalid - - def _hasRemainingQuota(self, ntype): - """ - Checks if the predicted quota is enough for an additional node of type - ntype. - - :param ntype: node type for the quota check - :return: True if there is enough quota, False otherwise - """ - + def hasRemainingQuota(self, ntype): needed_quota = self.manager.quotaNeededByNodeType(ntype, self.pool) # Calculate remaining quota which is calculated as: @@ -394,14 +331,7 @@ class OpenStackNodeRequestHandler(NodeRequestHandler): return pool_quota.non_negative() - def _hasProviderQuota(self, node_types): - """ - Checks if a provider has enough quota to handle a list of nodes. - This does not take our currently existing nodes into account. - - :param node_types: list of node types to check - :return: True if the node list fits into the provider, False otherwise - """ + def hasProviderQuota(self, node_types): needed_quota = QuotaInformation() for ntype in node_types: @@ -423,194 +353,42 @@ class OpenStackNodeRequestHandler(NodeRequestHandler): pool_quota.subtract(needed_quota) return pool_quota.non_negative() - def _waitForNodeSet(self): - ''' - Fill node set for the request. - - Obtain nodes for the request, pausing all new request handling for - this provider until the node set can be filled. + def checkReusableNode(self, node): + if self.chosen_az and node.az != self.chosen_az: + return False + return True + def nodeReused(self, node): + """ We attempt to group the node set within the same provider availability - zone. For this to work properly, the provider entry in the nodepool + zone. + For this to work properly, the provider entry in the nodepool config must list the availability zones. Otherwise, new nodes will be put in random AZs at nova's whim. The exception being if there is an existing node in the READY state that we can select for this node set. Its AZ will then be used for new nodes, as well as any other READY nodes. + """ + # If we haven't already chosen an AZ, select the + # AZ from this ready node. This will cause new nodes + # to share this AZ, as well. + if not self.chosen_az and node.az: + self.chosen_az = node.az - note:: This code is a bit racey in its calculation of the number of - nodes in use for quota purposes. It is possible for multiple - launchers to be doing this calculation at the same time. Since we - currently have no locking mechanism around the "in use" - calculation, if we are at the edge of the quota, one of the - launchers could attempt to launch a new node after the other - launcher has already started doing so. This would cause an - expected failure from the underlying library, which is ok for now. - ''' - if not self.launch_manager: - self.launch_manager = OpenStackNodeLaunchManager( - self.zk, self.pool, self.manager, - self.request.requestor, retries=self.provider.launch_retries) + def setNodeMetadata(self, node): + """ + Select grouping AZ if we didn't set AZ from a selected, + pre-existing node + """ + if not self.chosen_az: + self.chosen_az = random.choice( + self.pool.azs or self.manager.getAZs()) + node.az = self.chosen_az + node.cloud = self.provider.cloud_config.name + node.region = self.provider.region_name - # Since this code can be called more than once for the same request, - # we need to calculate the difference between our current node set - # and what was requested. We cannot use set operations here since a - # node type can appear more than once in the requested types. - saved_types = collections.Counter([n.type for n in self.nodeset]) - requested_types = collections.Counter(self.request.node_types) - diff = requested_types - saved_types - needed_types = list(diff.elements()) - - ready_nodes = self.zk.getReadyNodesOfTypes(needed_types) - - for ntype in needed_types: - # First try to grab from the list of already available nodes. - got_a_node = False - if self.request.reuse and ntype in ready_nodes: - for node in ready_nodes[ntype]: - # Only interested in nodes from this provider and - # pool, and within the selected AZ. - if node.provider != self.provider.name: - continue - if node.pool != self.pool.name: - continue - if self.chosen_az and node.az != self.chosen_az: - continue - - try: - self.zk.lockNode(node, blocking=False) - except exceptions.ZKLockException: - # It's already locked so skip it. - continue - else: - if self.paused: - self.log.debug("Unpaused request %s", self.request) - self.paused = False - - self.log.debug( - "Locked existing node %s for request %s", - node.id, self.request.id) - got_a_node = True - node.allocated_to = self.request.id - self.zk.storeNode(node) - self.nodeset.append(node) - - # If we haven't already chosen an AZ, select the - # AZ from this ready node. This will cause new nodes - # to share this AZ, as well. - if not self.chosen_az and node.az: - self.chosen_az = node.az - break - - # Could not grab an existing node, so launch a new one. - if not got_a_node: - # Select grouping AZ if we didn't set AZ from a selected, - # pre-existing node - if not self.chosen_az: - self.chosen_az = random.choice( - self.pool.azs or self.manager.getAZs()) - - # If we calculate that we're at capacity, pause until nodes - # are released by Zuul and removed by the DeletedNodeWorker. - if not self._hasRemainingQuota(ntype): - if not self.paused: - self.log.debug( - "Pausing request handling to satisfy request %s", - self.request) - self.paused = True - self.zk.deleteOldestUnusedNode(self.provider.name, - self.pool.name) - return - - if self.paused: - self.log.debug("Unpaused request %s", self.request) - self.paused = False - - node = zk.Node() - node.state = zk.INIT - node.type = ntype - node.provider = self.provider.name - node.pool = self.pool.name - node.az = self.chosen_az - node.cloud = self.provider.cloud_config.name - node.region = self.provider.region_name - node.launcher = self.launcher_id - node.allocated_to = self.request.id - - # Note: It should be safe (i.e., no race) to lock the node - # *after* it is stored since nodes in INIT state are not - # locked anywhere. - self.zk.storeNode(node) - self.zk.lockNode(node, blocking=False) - self.log.debug("Locked building node %s for request %s", - node.id, self.request.id) - - # Set state AFTER lock so that it isn't accidentally cleaned - # up (unlocked BUILDING nodes will be deleted). - node.state = zk.BUILDING - self.zk.storeNode(node) - - self.nodeset.append(node) - self.launch_manager.launch(node) - - def run_handler(self): - ''' - Main body for the OpenStackNodeRequestHandler. - ''' - self._setFromPoolWorker() - - if self.provider is None or self.pool is None: - # If the config changed out from underneath us, we could now be - # an invalid provider and should stop handling this request. - raise Exception("Provider configuration missing") - - declined_reasons = [] - invalid_types = self._invalidNodeTypes() - if invalid_types: - declined_reasons.append('node type(s) [%s] not available' % - ','.join(invalid_types)) - elif not self._imagesAvailable(): - declined_reasons.append('images are not available') - elif (self.pool.max_servers <= 0 or - not self._hasProviderQuota(self.request.node_types)): - declined_reasons.append('it would exceed quota') - # TODO(tobiash): Maybe also calculate the quota prediction here and - # backoff for some seconds if the used quota would be exceeded? - # This way we could give another (free) provider the chance to take - # this request earlier. - - # For min-ready requests, which do not re-use READY nodes, let's - # decline if this provider is already at capacity. Otherwise, we - # could end up wedged until another request frees up a node. - if self.request.requestor == "NodePool:min-ready": - current_count = self.zk.countPoolNodes(self.provider.name, - self.pool.name) - # Use >= because dynamic config changes to max-servers can leave - # us with more than max-servers. - if current_count >= self.pool.max_servers: - declined_reasons.append("provider cannot satisify min-ready") - - if declined_reasons: - self.log.debug("Declining node request %s because %s", - self.request.id, ', '.join(declined_reasons)) - self.decline_request() - self.unlockNodeSet(clear_allocation=True) - - # If conditions have changed for a paused request to now cause us - # to decline it, we need to unpause so we don't keep trying it - if self.paused: - self.paused = False - - self.zk.storeNodeRequest(self.request) - self.zk.unlockNodeRequest(self.request) - self.done = True - return - - if self.paused: - self.log.debug("Retrying node request %s", self.request.id) - else: - self.log.debug("Accepting node request %s", self.request.id) - self.request.state = zk.PENDING - self.zk.storeNodeRequest(self.request) - - self._waitForNodeSet() + def launch(self, node): + return NodeLauncher( + self.zk, self.pool.labels[node.type], self.manager, + self.request.requestor, node, + self.provider.launch_retries) diff --git a/nodepool/driver/static/config.py b/nodepool/driver/static/config.py index b684d5043..5d6826e50 100644 --- a/nodepool/driver/static/config.py +++ b/nodepool/driver/static/config.py @@ -16,12 +16,12 @@ import voluptuous as v -from nodepool.driver import ConfigValue +from nodepool.driver import ConfigPool from nodepool.driver import ProviderConfig from nodepool.config import as_list -class StaticPool(ConfigValue): +class StaticPool(ConfigPool): def __eq__(self, other): if (other.labels != self.labels or other.nodes != self.nodes): diff --git a/nodepool/driver/static/handler.py b/nodepool/driver/static/handler.py index 498b6339b..e25cb75b0 100644 --- a/nodepool/driver/static/handler.py +++ b/nodepool/driver/static/handler.py @@ -15,7 +15,6 @@ import logging import random -from nodepool import exceptions from nodepool import nodeutils from nodepool import zk from nodepool.driver import NodeRequestHandler @@ -25,96 +24,41 @@ class StaticNodeRequestHandler(NodeRequestHandler): log = logging.getLogger("nodepool.driver.static." "StaticNodeRequestHandler") - def _invalidNodeTypes(self): - ''' - Return any node types that are invalid for this provider. - - :returns: A list of node type names that are invalid, or an empty - list if all are valid. - ''' - invalid = [] - for ntype in self.request.node_types: - if ntype not in self.pool.labels: - invalid.append(ntype) - return invalid - - def checkConcurrency(self, static_node): + def _checkConcurrency(self, static_node): access_count = 0 + + unavailable_states = ['in-use'] + if not self.request.reuse: + # When re-use is disabled (e.g. for Min-Ready request), we need + # to consider 'ready' node as in-use. + unavailable_states.append('ready') + for node in self.zk.nodeIterator(): if node.hostname != static_node["name"]: continue - if node.state in ('ready', 'in-use'): + if node.state in unavailable_states: access_count += 1 + if access_count >= static_node["max-parallel-jobs"]: self.log.info("%s: max concurrency reached (%d)" % ( static_node["name"], access_count)) return False return True - def _waitForNodeSet(self): - ''' - Fill node set for the request. + def launch(self, node): + static_node = None + available_nodes = self.manager.listNodes() + # Randomize static nodes order + random.shuffle(available_nodes) + for available_node in available_nodes: + if node.type in available_node["labels"]: + if self._checkConcurrency(available_node): + static_node = available_node + break - ''' - needed_types = self.request.node_types - static_nodes = [] - unavailable_nodes = [] - ready_nodes = self.zk.getReadyNodesOfTypes(needed_types) - - for ntype in needed_types: - # First try to grab from the list of already available nodes. - got_a_node = False - if self.request.reuse and ntype in ready_nodes: - for node in ready_nodes[ntype]: - # Only interested in nodes from this provider and - # pool - if node.provider != self.provider.name: - continue - if node.pool != self.pool.name: - continue - - try: - self.zk.lockNode(node, blocking=False) - except exceptions.ZKLockException: - # It's already locked so skip it. - continue - else: - if self.paused: - self.log.debug("Unpaused request %s", self.request) - self.paused = False - - self.log.debug( - "Locked existing node %s for request %s", - node.id, self.request.id) - got_a_node = True - node.allocated_to = self.request.id - self.zk.storeNode(node) - self.nodeset.append(node) - break - # Could not grab an existing node, so assign a new one. - if not got_a_node: - for node in self.available_nodes: - if ntype in node["labels"]: - max_concurrency = not self.checkConcurrency(node) - if max_concurrency: - continue - static_nodes.append((ntype, node)) - break - if max_concurrency: - unavailable_nodes.append(ntype) - - if unavailable_nodes: - self.log.debug("%s: static nodes %s are at capacity" % ( - self.request.id, unavailable_nodes)) - self.zk.storeNodeRequest(self.request) - self.zk.unlockNodeRequest(self.request) - self.done = True - return - - for node_type, static_node in static_nodes: + if static_node: self.log.debug("%s: Assigning static_node %s" % ( self.request.id, static_node)) - node = zk.Node() node.state = zk.READY node.external_id = "static-%s" % self.request.id node.hostname = static_node["name"] @@ -124,59 +68,14 @@ class StaticNodeRequestHandler(NodeRequestHandler): node.connection_type = "ssh" nodeutils.set_node_ip(node) node.host_keys = self.manager.nodes_keys[static_node["name"]] - node.provider = self.provider.name - node.pool = self.pool.name - node.launcher = self.launcher_id - node.allocated_to = self.request.id - node.type = node_type - self.nodeset.append(node) self.zk.storeNode(node) - def run_handler(self): - ''' - Main body for the StaticNodeRequestHandler. - ''' - self._setFromPoolWorker() - - # We have the launcher_id attr after _setFromPoolWorker() is called. - self.log = logging.getLogger( - "nodepool.driver.static.StaticNodeRequestHandler[%s]" % - self.launcher_id) - - declined_reasons = [] - invalid_types = self._invalidNodeTypes() - if invalid_types: - declined_reasons.append('node type(s) [%s] not available' % - ','.join(invalid_types)) - - self.available_nodes = self.manager.listNodes() - # Randomize static nodes order - random.shuffle(self.available_nodes) - - if len(self.request.node_types) > len(self.available_nodes): - declined_reasons.append('it would exceed quota') - - if declined_reasons: - self.log.debug("Declining node request %s because %s", - self.request.id, ', '.join(declined_reasons)) - self.decline_request() - self.unlockNodeSet(clear_allocation=True) - - # If conditions have changed for a paused request to now cause us - # to decline it, we need to unpause so we don't keep trying it - if self.paused: - self.paused = False - - self.zk.storeNodeRequest(self.request) - self.zk.unlockNodeRequest(self.request) - self.done = True - return - - if self.paused: - self.log.debug("Retrying node request %s", self.request.id) - else: - self.log.debug("Accepting node request %s", self.request.id) - self.request.state = zk.PENDING - self.zk.storeNodeRequest(self.request) - - self._waitForNodeSet() + def pollLauncher(self): + waiting_node = False + for node in self.nodeset: + if node.state == zk.READY: + continue + self.launch(node) + if node.state != zk.READY: + waiting_node = True + return not waiting_node diff --git a/nodepool/tests/fixtures/drivers/test/config.py b/nodepool/tests/fixtures/drivers/test/config.py index 81ddde2f5..6e0ba9dea 100644 --- a/nodepool/tests/fixtures/drivers/test/config.py +++ b/nodepool/tests/fixtures/drivers/test/config.py @@ -15,11 +15,11 @@ import math import voluptuous as v -from nodepool.driver import ConfigValue +from nodepool.driver import ConfigPool from nodepool.driver import ProviderConfig -class TestPool(ConfigValue): +class TestPool(ConfigPool): pass @@ -39,6 +39,7 @@ class TestConfig(ProviderConfig): testpool.name = pool['name'] testpool.provider = self testpool.max_servers = pool.get('max-servers', math.inf) + testpool.labels = pool['labels'] for label in pool['labels']: self.labels.add(label) newconfig.labels[label].pools.append(testpool) diff --git a/nodepool/tests/fixtures/drivers/test/handler.py b/nodepool/tests/fixtures/drivers/test/handler.py index dab0c0c2b..a9f917753 100644 --- a/nodepool/tests/fixtures/drivers/test/handler.py +++ b/nodepool/tests/fixtures/drivers/test/handler.py @@ -21,14 +21,7 @@ from nodepool.driver import NodeRequestHandler class TestHandler(NodeRequestHandler): log = logging.getLogger("nodepool.driver.test.TestHandler") - def run_handler(self): - self._setFromPoolWorker() - node = zk.Node() + def launch(self, node): node.state = zk.READY node.external_id = "test-%s" % self.request.id - node.provider = self.provider.name - node.launcher = self.launcher_id - node.allocated_to = self.request.id - node.type = self.request.node_types[0] - self.nodeset.append(node) self.zk.storeNode(node) diff --git a/nodepool/tests/test_launcher.py b/nodepool/tests/test_launcher.py index 0af3427bb..199c7afe5 100644 --- a/nodepool/tests/test_launcher.py +++ b/nodepool/tests/test_launcher.py @@ -1188,7 +1188,7 @@ class TestLauncher(tests.DBTestCase): provider2_second.state = zk.DELETING self.zk.storeNode(provider2_second) - # Set provider1 run_handler to throw exception to simulate a + # Set provider1 runHandler to throw exception to simulate a # broken cloud. Note the pool worker instantiates request handlers on # demand which is why we have a somewhat convoluted monkey patch here. # We must patch deep enough in the request handler that @@ -1199,7 +1199,7 @@ class TestLauncher(tests.DBTestCase): def raise_KeyError(node): raise KeyError('fake-provider') - request_handler.launch_manager.launch = raise_KeyError + request_handler.launch = raise_KeyError # Delete instance in fake-provider. This should cause provider2 # to service the request that was held pending by fake-provider. diff --git a/nodepool/tests/test_nodelaunchmanager.py b/nodepool/tests/test_nodelaunchmanager.py index fd4b3b8f4..211a19c8c 100644 --- a/nodepool/tests/test_nodelaunchmanager.py +++ b/nodepool/tests/test_nodelaunchmanager.py @@ -21,7 +21,7 @@ from nodepool import builder from nodepool import provider_manager from nodepool import tests from nodepool import zk -from nodepool.driver.openstack.handler import OpenStackNodeLaunchManager +from nodepool.driver.openstack.handler import OpenStackNodeRequestHandler class TestNodeLaunchManager(tests.DBTestCase): @@ -47,21 +47,45 @@ class TestNodeLaunchManager(tests.DBTestCase): self.provider, False) self.pmanager.resetClient() + def _createHandler(self, retries=1): + # Mock NodeRequest handler object + class FakePoolWorker: + launcher_id = 'Test' + + class FakeRequest: + requestor = 'zuul' + + class FakeProvider: + launch_retries = retries + + handler = OpenStackNodeRequestHandler(FakePoolWorker(), FakeRequest()) + handler.zk = self.zk + handler.pool = self.provider_pool + handler.manager = self.pmanager + handler.provider = FakeProvider() + return handler + + def _launch(self, handler, node): + # Mock NodeRequest runHandler method + thread = handler.launch(node) + thread.start() + handler._threads.append(thread) + handler.nodeset.append(node) + def test_successful_launch(self): configfile = self.setup_config('node.yaml') self._setup(configfile) + handler = self._createHandler(1) n1 = zk.Node() n1.state = zk.BUILDING n1.type = 'fake-label' - mgr = OpenStackNodeLaunchManager(self.zk, self.provider_pool, - self.pmanager, 'zuul', 1) - mgr.launch(n1) - while not mgr.poll(): + self._launch(handler, n1) + while not handler.pollLauncher(): time.sleep(0) - self.assertEqual(len(mgr.ready_nodes), 1) - self.assertEqual(len(mgr.failed_nodes), 0) - nodes = mgr._provider_manager.listNodes() + self.assertEqual(len(handler.ready_nodes), 1) + self.assertEqual(len(handler.failed_nodes), 0) + nodes = handler.manager.listNodes() self.assertEqual(nodes[0]['metadata']['groups'], 'fake-provider,fake-image,fake-label') @@ -69,23 +93,23 @@ class TestNodeLaunchManager(tests.DBTestCase): def test_failed_launch(self, mock_launch): configfile = self.setup_config('node.yaml') self._setup(configfile) + handler = self._createHandler(1) mock_launch.side_effect = Exception() n1 = zk.Node() n1.state = zk.BUILDING n1.type = 'fake-label' - mgr = OpenStackNodeLaunchManager(self.zk, self.provider_pool, - self.pmanager, 'zuul', 1) - mgr.launch(n1) - while not mgr.poll(): + self._launch(handler, n1) + while not handler.pollLauncher(): time.sleep(0) - self.assertEqual(len(mgr.failed_nodes), 1) - self.assertEqual(len(mgr.ready_nodes), 0) + self.assertEqual(len(handler.failed_nodes), 1) + self.assertEqual(len(handler.ready_nodes), 0) @mock.patch('nodepool.driver.openstack.handler.NodeLauncher._launchNode') def test_mixed_launch(self, mock_launch): configfile = self.setup_config('node.yaml') self._setup(configfile) + handler = self._createHandler(1) mock_launch.side_effect = [None, Exception()] n1 = zk.Node() @@ -94,11 +118,9 @@ class TestNodeLaunchManager(tests.DBTestCase): n2 = zk.Node() n2.state = zk.BUILDING n2.type = 'fake-label' - mgr = OpenStackNodeLaunchManager(self.zk, self.provider_pool, - self.pmanager, 'zuul', 1) - mgr.launch(n1) - mgr.launch(n2) - while not mgr.poll(): + self._launch(handler, n1) + self._launch(handler, n2) + while not handler.pollLauncher(): time.sleep(0) - self.assertEqual(len(mgr.failed_nodes), 1) - self.assertEqual(len(mgr.ready_nodes), 1) + self.assertEqual(len(handler._failed_nodes), 1) + self.assertEqual(len(handler._ready_nodes), 1)