Merge "Refactor run_handler to be generic"
commit 20c6646bf0
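With this refactor, a driver only has to subclass NodeRequestHandler and implement launch(); the generic run logic in the base class (request acceptance, quota checks, node bookkeeping and polling) is shared by all drivers. As a rough sketch of the resulting interface, assuming a hypothetical driver whose nodes are ready as soon as they are assigned (the class name and external-id format below are illustrative, not part of this change):

    from nodepool import zk
    from nodepool.driver import NodeRequestHandler

    class ExampleNodeRequestHandler(NodeRequestHandler):
        # launch() is the only abstract method; it may return a thread,
        # which the base class will start and track, or it may satisfy
        # the node synchronously and return None, as done here.
        def launch(self, node):
            node.state = zk.READY
            node.external_id = "example-%s" % self.request.id
            self.zk.storeNode(node)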
@@ -16,9 +16,11 @@
# limitations under the License.

import abc
import collections
import inspect
import importlib
import logging
import math
import os

import six
@@ -143,13 +145,12 @@ class Provider(object):
@six.add_metaclass(abc.ABCMeta)
class NodeRequestHandler(object):
'''
Class to process a single node request.
Class to process a single nodeset request.

The PoolWorker thread will instantiate a class of this type for each
node request that it pulls from ZooKeeper.

Subclasses are required to implement the run_handler method and the
NodeLaunchManager to kick off any threads needed to satisfy the request.
Subclasses are required to implement the launch method.
'''

def __init__(self, pw, request):
@@ -159,12 +160,15 @@ class NodeRequestHandler(object):
'''
self.pw = pw
self.request = request
self.launch_manager = None
self.nodeset = []
self.done = False
self.paused = False
self.launcher_id = self.pw.launcher_id

self._failed_nodes = []
self._ready_nodes = []
self._threads = []

def _setFromPoolWorker(self):
'''
Set values that we pull from the parent PoolWorker.
@@ -179,9 +183,229 @@ class NodeRequestHandler(object):

@property
def alive_thread_count(self):
if not self.launch_manager:
return 0
return self.launch_manager.alive_thread_count
count = 0
for t in self._threads:
if t.isAlive():
count += 1
return count

@property
def failed_nodes(self):
return self._failed_nodes

@property
def ready_nodes(self):
return self._ready_nodes

def _imagesAvailable(self):
'''
Determines if the requested images are available for this provider.

ZooKeeper is queried for an image uploaded to the provider that is
in the READY state.

:returns: True if it is available, False otherwise.
'''
if self.provider.driver.manage_images:
for label in self.request.node_types:
if self.pool.labels[label].cloud_image:
if not self.manager.labelReady(self.pool.labels[label]):
return False
else:
if not self.zk.getMostRecentImageUpload(
self.pool.labels[label].diskimage.name,
self.provider.name):
return False
return True

def _invalidNodeTypes(self):
'''
Return any node types that are invalid for this provider.

:returns: A list of node type names that are invalid, or an empty
list if all are valid.
'''
invalid = []
for ntype in self.request.node_types:
if ntype not in self.pool.labels:
invalid.append(ntype)
return invalid

def _waitForNodeSet(self):
'''
Fill node set for the request.

Obtain nodes for the request, pausing all new request handling for
this provider until the node set can be filled.

note:: This code is a bit racey in its calculation of the number of
nodes in use for quota purposes. It is possible for multiple
launchers to be doing this calculation at the same time. Since we
currently have no locking mechanism around the "in use"
calculation, if we are at the edge of the quota, one of the
launchers could attempt to launch a new node after the other
launcher has already started doing so. This would cause an
expected failure from the underlying library, which is ok for now.
'''
# Since this code can be called more than once for the same request,
# we need to calculate the difference between our current node set
# and what was requested. We cannot use set operations here since a
# node type can appear more than once in the requested types.
saved_types = collections.Counter([n.type for n in self.nodeset])
requested_types = collections.Counter(self.request.node_types)
diff = requested_types - saved_types
needed_types = list(diff.elements())

ready_nodes = self.zk.getReadyNodesOfTypes(needed_types)

for ntype in needed_types:
# First try to grab from the list of already available nodes.
got_a_node = False
if self.request.reuse and ntype in ready_nodes:
for node in ready_nodes[ntype]:
# Only interested in nodes from this provider and pool
if node.provider != self.provider.name:
continue
if node.pool != self.pool.name:
continue
# Check this driver reuse requirements
if not self.checkReusableNode(node):
continue
try:
self.zk.lockNode(node, blocking=False)
except exceptions.ZKLockException:
# It's already locked so skip it.
continue
else:
if self.paused:
self.log.debug("Unpaused request %s", self.request)
self.paused = False

self.log.debug(
"Locked existing node %s for request %s",
node.id, self.request.id)
got_a_node = True
node.allocated_to = self.request.id
self.zk.storeNode(node)
self.nodeset.append(node)
# Notify driver handler about node re-use
self.nodeReused(node)
break

# Could not grab an existing node, so launch a new one.
if not got_a_node:
# If we calculate that we're at capacity, pause until nodes
# are released by Zuul and removed by the DeletedNodeWorker.
if not self.hasRemainingQuota(ntype):
if not self.paused:
self.log.debug(
"Pausing request handling to satisfy request %s",
self.request)
self.paused = True
self.zk.deleteOldestUnusedNode(self.provider.name,
self.pool.name)
return

if self.paused:
self.log.debug("Unpaused request %s", self.request)
self.paused = False

node = zk.Node()
node.state = zk.INIT
node.type = ntype
node.provider = self.provider.name
node.pool = self.pool.name
node.launcher = self.launcher_id
node.allocated_to = self.request.id

self.setNodeMetadata(node)

# Note: It should be safe (i.e., no race) to lock the node
# *after* it is stored since nodes in INIT state are not
# locked anywhere.
self.zk.storeNode(node)
self.zk.lockNode(node, blocking=False)
self.log.debug("Locked building node %s for request %s",
node.id, self.request.id)

# Set state AFTER lock so that it isn't accidentally cleaned
# up (unlocked BUILDING nodes will be deleted).
node.state = zk.BUILDING
self.zk.storeNode(node)

self.nodeset.append(node)
thread = self.launch(node)
if thread:
thread.start()
self._threads.append(thread)

def _runHandler(self):
'''
Main body for the node request handling.
'''
self._setFromPoolWorker()

if self.provider is None or self.pool is None:
# If the config changed out from underneath us, we could now be
# an invalid provider and should stop handling this request.
raise Exception("Provider configuration missing")

# We have the launcher_id attr after _setFromPoolWorker() is called.
self.log = logging.getLogger(
"nodepool.driver.NodeRequestHandler[%s]" % self.launcher_id)

declined_reasons = []
invalid_types = self._invalidNodeTypes()
if invalid_types:
declined_reasons.append('node type(s) [%s] not available' %
','.join(invalid_types))
elif not self._imagesAvailable():
declined_reasons.append('images are not available')
elif (self.pool.max_servers <= 0 or
not self.hasProviderQuota(self.request.node_types)):
declined_reasons.append('it would exceed quota')
# TODO(tobiash): Maybe also calculate the quota prediction here and
# backoff for some seconds if the used quota would be exceeded?
# This way we could give another (free) provider the chance to take
# this request earlier.

# For min-ready requests, which do not re-use READY nodes, let's
# decline if this provider is already at capacity. Otherwise, we
# could end up wedged until another request frees up a node.
if self.pool.max_servers is not None and \
self.request.requestor == "NodePool:min-ready":
current_count = self.zk.countPoolNodes(self.provider.name,
self.pool.name)
# Use >= because dynamic config changes to max-servers can leave
# us with more than max-servers.
# TODO: handle this with the quota code
if current_count >= self.pool.max_servers:
declined_reasons.append("provider cannot satisify min-ready")

if declined_reasons:
self.log.debug("Declining node request %s because %s",
self.request.id, ', '.join(declined_reasons))
self.decline_request()
self.unlockNodeSet(clear_allocation=True)

# If conditions have changed for a paused request to now cause us
# to decline it, we need to unpause so we don't keep trying it
if self.paused:
self.paused = False

self.zk.storeNodeRequest(self.request)
self.zk.unlockNodeRequest(self.request)
self.done = True
return

if self.paused:
self.log.debug("Retrying node request %s", self.request.id)
else:
self.log.debug("Accepting node request %s", self.request.id)
self.request.state = zk.PENDING
self.zk.storeNodeRequest(self.request)

self._waitForNodeSet()

# ---------------------------------------------------------------
# Public methods
@@ -236,7 +460,7 @@ class NodeRequestHandler(object):
fulfilled. The node set is saved and added to between calls.
'''
try:
self.run_handler()
self._runHandler()
except Exception:
self.log.exception(
"Declining node request %s due to exception in "
@@ -269,7 +493,7 @@ class NodeRequestHandler(object):
if self.done:
return True

if self.launch_manager and not self.launch_manager.poll():
if not self.pollLauncher():
return False

# If the request has been pulled, unallocate the node set so other
@@ -291,9 +515,7 @@ class NodeRequestHandler(object):
self.request.id)
return True

if self.launch_manager and self.launch_manager.failed_nodes or \
len(self.nodeset) != len(self.request.node_types) or \
[node for node in self.nodeset if node.state != zk.READY]:
if self.failed_nodes:
self.log.debug("Declining node request %s because nodes failed",
self.request.id)
self.decline_request()
@@ -319,57 +541,10 @@ class NodeRequestHandler(object):
self.zk.unlockNodeRequest(self.request)
return True

@abc.abstractmethod
def run_handler(self):
pass


@six.add_metaclass(abc.ABCMeta)
class NodeLaunchManager(object):
'''
Handle launching multiple nodes in parallel.

Subclasses are required to implement the launch method.
'''
def __init__(self, zk, pool, provider_manager,
requestor, retries):
'''
Initialize the launch manager.

:param ZooKeeper zk: A ZooKeeper object.
:param ProviderPool pool: A config ProviderPool object.
:param ProviderManager provider_manager: The manager object used to
interact with the selected provider.
:param str requestor: Identifier for the request originator.
:param int retries: Number of times to retry failed launches.
'''
self._retries = retries
self._nodes = []
self._failed_nodes = []
self._ready_nodes = []
self._threads = []
self._zk = zk
self._pool = pool
self._provider_manager = provider_manager
self._requestor = requestor

@property
def alive_thread_count(self):
count = 0
for t in self._threads:
if t.isAlive():
count += 1
return count

@property
def failed_nodes(self):
return self._failed_nodes

@property
def ready_nodes(self):
return self._ready_nodes

def poll(self):
# ---------------------------------------------------------------
# Driver Implementation
# ---------------------------------------------------------------
def pollLauncher(self):
'''
Check if all launch requests have completed.

@@ -383,14 +558,14 @@ class NodeLaunchManager(object):
if self.alive_thread_count:
return False

node_states = [node.state for node in self._nodes]
node_states = [node.state for node in self.nodeset]

# NOTE: It very important that NodeLauncher always sets one of
# these states, no matter what.
if not all(s in (zk.READY, zk.FAILED) for s in node_states):
return False

for node in self._nodes:
for node in self.nodeset:
if node.state == zk.READY:
self._ready_nodes.append(node)
else:
@@ -398,8 +573,52 @@ class NodeLaunchManager(object):

return True

def hasProviderQuota(self, node_types):
'''
Checks if a provider has enough quota to handle a list of nodes.
This does not take our currently existing nodes into account.

:param node_types: list of node types to check
:return: True if the node list fits into the provider, False otherwise
'''
return True

def hasRemainingQuota(self, ntype):
'''
Checks if the predicted quota is enough for an additional node of type
ntype.

:param ntype: node type for the quota check
:return: True if there is enough quota, False otherwise
'''
return True

def nodeReused(self, node):
'''
Handler may implement this to be notified when a node is re-used.
The OpenStack handler uses this to set the choozen_az.
'''
pass

def checkReusableNode(self, node):
'''
Handler may implement this to verify a node can be re-used.
The OpenStack handler uses this to verify the node az is correct.
'''
return True

def setNodeMetadata(self, node):
'''
Handler may implement this to store metadata before building the node.
The OpenStack handler uses this to set az, cloud and region.
'''
pass

@abc.abstractmethod
def launch(self, node):
'''
Handler needs to implement this to launch the node.
'''
pass

@@ -414,6 +633,12 @@ class ConfigValue(object):
return not self.__eq__(other)


class ConfigPool(ConfigValue):
def __init__(self):
self.labels = []
self.max_servers = math.inf


class Driver(ConfigValue):
pass
@@ -20,6 +20,7 @@ import voluptuous as v

from nodepool.driver import ProviderConfig
from nodepool.driver import ConfigValue
from nodepool.driver import ConfigPool


class ProviderDiskImage(ConfigValue):
@@ -59,7 +60,7 @@ class ProviderLabel(ConfigValue):
return "<ProviderLabel %s>" % self.name


class ProviderPool(ConfigValue):
class ProviderPool(ConfigPool):
def __eq__(self, other):
if (other.labels != self.labels or
other.max_cores != self.max_cores or
@@ -13,7 +13,6 @@
# License for the specific language governing permissions and limitations
# under the License.

import collections
import logging
import math
import pprint
@@ -27,7 +26,6 @@ from nodepool import exceptions
from nodepool import nodeutils as utils
from nodepool import stats
from nodepool import zk
from nodepool.driver import NodeLaunchManager
from nodepool.driver import NodeRequestHandler
from nodepool.driver.openstack.provider import QuotaInformation
@@ -298,77 +296,16 @@ class NodeLauncher(threading.Thread, stats.StatsReporter):
self.log.exception("Exception while reporting stats:")


class OpenStackNodeLaunchManager(NodeLaunchManager):
def launch(self, node):
'''
Launch a new node as described by the supplied Node.

We expect each NodeLauncher thread to directly modify the node that
is passed to it. The poll() method will expect to see the node.state
attribute to change as the node is processed.

:param Node node: The node object.
'''
self._nodes.append(node)
provider_label = self._pool.labels[node.type]
t = NodeLauncher(self._zk, provider_label, self._provider_manager,
self._requestor, node, self._retries)
t.start()
self._threads.append(t)


class OpenStackNodeRequestHandler(NodeRequestHandler):

def __init__(self, pw, request):
super(OpenStackNodeRequestHandler, self).__init__(pw, request)
super().__init__(pw, request)
self.chosen_az = None
self.log = logging.getLogger(
"nodepool.driver.openstack.OpenStackNodeRequestHandler[%s]" %
self.launcher_id)

def _imagesAvailable(self):
'''
Determines if the requested images are available for this provider.

ZooKeeper is queried for an image uploaded to the provider that is
in the READY state.

:returns: True if it is available, False otherwise.
'''
for label in self.request.node_types:

if self.pool.labels[label].cloud_image:
if not self.manager.labelReady(self.pool.labels[label]):
return False
else:
if not self.zk.getMostRecentImageUpload(
self.pool.labels[label].diskimage.name,
self.provider.name):
return False
return True

def _invalidNodeTypes(self):
'''
Return any node types that are invalid for this provider.

:returns: A list of node type names that are invalid, or an empty
list if all are valid.
'''
invalid = []
for ntype in self.request.node_types:
if ntype not in self.pool.labels:
invalid.append(ntype)
return invalid

def _hasRemainingQuota(self, ntype):
"""
Checks if the predicted quota is enough for an additional node of type
ntype.

:param ntype: node type for the quota check
:return: True if there is enough quota, False otherwise
"""

def hasRemainingQuota(self, ntype):
needed_quota = self.manager.quotaNeededByNodeType(ntype, self.pool)

# Calculate remaining quota which is calculated as:
@@ -394,14 +331,7 @@ class OpenStackNodeRequestHandler(NodeRequestHandler):

return pool_quota.non_negative()

def _hasProviderQuota(self, node_types):
"""
Checks if a provider has enough quota to handle a list of nodes.
This does not take our currently existing nodes into account.

:param node_types: list of node types to check
:return: True if the node list fits into the provider, False otherwise
"""
def hasProviderQuota(self, node_types):
needed_quota = QuotaInformation()

for ntype in node_types:
@@ -423,194 +353,42 @@ class OpenStackNodeRequestHandler(NodeRequestHandler):
pool_quota.subtract(needed_quota)
return pool_quota.non_negative()

def _waitForNodeSet(self):
'''
Fill node set for the request.

Obtain nodes for the request, pausing all new request handling for
this provider until the node set can be filled.
def checkReusableNode(self, node):
if self.chosen_az and node.az != self.chosen_az:
return False
return True

def nodeReused(self, node):
"""
We attempt to group the node set within the same provider availability
zone. For this to work properly, the provider entry in the nodepool
zone.
For this to work properly, the provider entry in the nodepool
config must list the availability zones. Otherwise, new nodes will be
put in random AZs at nova's whim. The exception being if there is an
existing node in the READY state that we can select for this node set.
Its AZ will then be used for new nodes, as well as any other READY
nodes.
"""
# If we haven't already chosen an AZ, select the
# AZ from this ready node. This will cause new nodes
# to share this AZ, as well.
if not self.chosen_az and node.az:
self.chosen_az = node.az

note:: This code is a bit racey in its calculation of the number of
nodes in use for quota purposes. It is possible for multiple
launchers to be doing this calculation at the same time. Since we
currently have no locking mechanism around the "in use"
calculation, if we are at the edge of the quota, one of the
launchers could attempt to launch a new node after the other
launcher has already started doing so. This would cause an
expected failure from the underlying library, which is ok for now.
'''
if not self.launch_manager:
self.launch_manager = OpenStackNodeLaunchManager(
self.zk, self.pool, self.manager,
self.request.requestor, retries=self.provider.launch_retries)
def setNodeMetadata(self, node):
"""
Select grouping AZ if we didn't set AZ from a selected,
pre-existing node
"""
if not self.chosen_az:
self.chosen_az = random.choice(
self.pool.azs or self.manager.getAZs())
node.az = self.chosen_az
node.cloud = self.provider.cloud_config.name
node.region = self.provider.region_name

# Since this code can be called more than once for the same request,
# we need to calculate the difference between our current node set
# and what was requested. We cannot use set operations here since a
# node type can appear more than once in the requested types.
saved_types = collections.Counter([n.type for n in self.nodeset])
requested_types = collections.Counter(self.request.node_types)
diff = requested_types - saved_types
needed_types = list(diff.elements())

ready_nodes = self.zk.getReadyNodesOfTypes(needed_types)

for ntype in needed_types:
# First try to grab from the list of already available nodes.
got_a_node = False
if self.request.reuse and ntype in ready_nodes:
for node in ready_nodes[ntype]:
# Only interested in nodes from this provider and
# pool, and within the selected AZ.
if node.provider != self.provider.name:
continue
if node.pool != self.pool.name:
continue
if self.chosen_az and node.az != self.chosen_az:
continue

try:
self.zk.lockNode(node, blocking=False)
except exceptions.ZKLockException:
# It's already locked so skip it.
continue
else:
if self.paused:
self.log.debug("Unpaused request %s", self.request)
self.paused = False

self.log.debug(
"Locked existing node %s for request %s",
node.id, self.request.id)
got_a_node = True
node.allocated_to = self.request.id
self.zk.storeNode(node)
self.nodeset.append(node)

# If we haven't already chosen an AZ, select the
# AZ from this ready node. This will cause new nodes
# to share this AZ, as well.
if not self.chosen_az and node.az:
self.chosen_az = node.az
break

# Could not grab an existing node, so launch a new one.
if not got_a_node:
# Select grouping AZ if we didn't set AZ from a selected,
# pre-existing node
if not self.chosen_az:
self.chosen_az = random.choice(
self.pool.azs or self.manager.getAZs())

# If we calculate that we're at capacity, pause until nodes
# are released by Zuul and removed by the DeletedNodeWorker.
if not self._hasRemainingQuota(ntype):
if not self.paused:
self.log.debug(
"Pausing request handling to satisfy request %s",
self.request)
self.paused = True
self.zk.deleteOldestUnusedNode(self.provider.name,
self.pool.name)
return

if self.paused:
self.log.debug("Unpaused request %s", self.request)
self.paused = False

node = zk.Node()
node.state = zk.INIT
node.type = ntype
node.provider = self.provider.name
node.pool = self.pool.name
node.az = self.chosen_az
node.cloud = self.provider.cloud_config.name
node.region = self.provider.region_name
node.launcher = self.launcher_id
node.allocated_to = self.request.id

# Note: It should be safe (i.e., no race) to lock the node
# *after* it is stored since nodes in INIT state are not
# locked anywhere.
self.zk.storeNode(node)
self.zk.lockNode(node, blocking=False)
self.log.debug("Locked building node %s for request %s",
node.id, self.request.id)

# Set state AFTER lock so that it isn't accidentally cleaned
# up (unlocked BUILDING nodes will be deleted).
node.state = zk.BUILDING
self.zk.storeNode(node)

self.nodeset.append(node)
self.launch_manager.launch(node)

def run_handler(self):
'''
Main body for the OpenStackNodeRequestHandler.
'''
self._setFromPoolWorker()

if self.provider is None or self.pool is None:
# If the config changed out from underneath us, we could now be
# an invalid provider and should stop handling this request.
raise Exception("Provider configuration missing")

declined_reasons = []
invalid_types = self._invalidNodeTypes()
if invalid_types:
declined_reasons.append('node type(s) [%s] not available' %
','.join(invalid_types))
elif not self._imagesAvailable():
declined_reasons.append('images are not available')
elif (self.pool.max_servers <= 0 or
not self._hasProviderQuota(self.request.node_types)):
declined_reasons.append('it would exceed quota')
# TODO(tobiash): Maybe also calculate the quota prediction here and
# backoff for some seconds if the used quota would be exceeded?
# This way we could give another (free) provider the chance to take
# this request earlier.

# For min-ready requests, which do not re-use READY nodes, let's
# decline if this provider is already at capacity. Otherwise, we
# could end up wedged until another request frees up a node.
if self.request.requestor == "NodePool:min-ready":
current_count = self.zk.countPoolNodes(self.provider.name,
self.pool.name)
# Use >= because dynamic config changes to max-servers can leave
# us with more than max-servers.
if current_count >= self.pool.max_servers:
declined_reasons.append("provider cannot satisify min-ready")

if declined_reasons:
self.log.debug("Declining node request %s because %s",
self.request.id, ', '.join(declined_reasons))
self.decline_request()
self.unlockNodeSet(clear_allocation=True)

# If conditions have changed for a paused request to now cause us
# to decline it, we need to unpause so we don't keep trying it
if self.paused:
self.paused = False

self.zk.storeNodeRequest(self.request)
self.zk.unlockNodeRequest(self.request)
self.done = True
return

if self.paused:
self.log.debug("Retrying node request %s", self.request.id)
else:
self.log.debug("Accepting node request %s", self.request.id)
self.request.state = zk.PENDING
self.zk.storeNodeRequest(self.request)

self._waitForNodeSet()
def launch(self, node):
return NodeLauncher(
self.zk, self.pool.labels[node.type], self.manager,
self.request.requestor, node,
self.provider.launch_retries)
@@ -16,12 +16,12 @@

import voluptuous as v

from nodepool.driver import ConfigValue
from nodepool.driver import ConfigPool
from nodepool.driver import ProviderConfig
from nodepool.config import as_list


class StaticPool(ConfigValue):
class StaticPool(ConfigPool):
def __eq__(self, other):
if (other.labels != self.labels or
other.nodes != self.nodes):
@@ -15,7 +15,6 @@
import logging
import random

from nodepool import exceptions
from nodepool import nodeutils
from nodepool import zk
from nodepool.driver import NodeRequestHandler
@@ -25,96 +24,41 @@ class StaticNodeRequestHandler(NodeRequestHandler):
log = logging.getLogger("nodepool.driver.static."
"StaticNodeRequestHandler")

def _invalidNodeTypes(self):
'''
Return any node types that are invalid for this provider.

:returns: A list of node type names that are invalid, or an empty
list if all are valid.
'''
invalid = []
for ntype in self.request.node_types:
if ntype not in self.pool.labels:
invalid.append(ntype)
return invalid

def checkConcurrency(self, static_node):
def _checkConcurrency(self, static_node):
access_count = 0

unavailable_states = ['in-use']
if not self.request.reuse:
# When re-use is disabled (e.g. for Min-Ready request), we need
# to consider 'ready' node as in-use.
unavailable_states.append('ready')

for node in self.zk.nodeIterator():
if node.hostname != static_node["name"]:
continue
if node.state in ('ready', 'in-use'):
if node.state in unavailable_states:
access_count += 1

if access_count >= static_node["max-parallel-jobs"]:
self.log.info("%s: max concurrency reached (%d)" % (
static_node["name"], access_count))
return False
return True

def _waitForNodeSet(self):
'''
Fill node set for the request.
def launch(self, node):
static_node = None
available_nodes = self.manager.listNodes()
# Randomize static nodes order
random.shuffle(available_nodes)
for available_node in available_nodes:
if node.type in available_node["labels"]:
if self._checkConcurrency(available_node):
static_node = available_node
break

'''
needed_types = self.request.node_types
static_nodes = []
unavailable_nodes = []
ready_nodes = self.zk.getReadyNodesOfTypes(needed_types)

for ntype in needed_types:
# First try to grab from the list of already available nodes.
got_a_node = False
if self.request.reuse and ntype in ready_nodes:
for node in ready_nodes[ntype]:
# Only interested in nodes from this provider and
# pool
if node.provider != self.provider.name:
continue
if node.pool != self.pool.name:
continue

try:
self.zk.lockNode(node, blocking=False)
except exceptions.ZKLockException:
# It's already locked so skip it.
continue
else:
if self.paused:
self.log.debug("Unpaused request %s", self.request)
self.paused = False

self.log.debug(
"Locked existing node %s for request %s",
node.id, self.request.id)
got_a_node = True
node.allocated_to = self.request.id
self.zk.storeNode(node)
self.nodeset.append(node)
break
# Could not grab an existing node, so assign a new one.
if not got_a_node:
for node in self.available_nodes:
if ntype in node["labels"]:
max_concurrency = not self.checkConcurrency(node)
if max_concurrency:
continue
static_nodes.append((ntype, node))
break
if max_concurrency:
unavailable_nodes.append(ntype)

if unavailable_nodes:
self.log.debug("%s: static nodes %s are at capacity" % (
self.request.id, unavailable_nodes))
self.zk.storeNodeRequest(self.request)
self.zk.unlockNodeRequest(self.request)
self.done = True
return

for node_type, static_node in static_nodes:
if static_node:
self.log.debug("%s: Assigning static_node %s" % (
self.request.id, static_node))
node = zk.Node()
node.state = zk.READY
node.external_id = "static-%s" % self.request.id
node.hostname = static_node["name"]
@@ -124,59 +68,14 @@ class StaticNodeRequestHandler(NodeRequestHandler):
node.connection_type = "ssh"
nodeutils.set_node_ip(node)
node.host_keys = self.manager.nodes_keys[static_node["name"]]
node.provider = self.provider.name
node.pool = self.pool.name
node.launcher = self.launcher_id
node.allocated_to = self.request.id
node.type = node_type
self.nodeset.append(node)
self.zk.storeNode(node)

def run_handler(self):
'''
Main body for the StaticNodeRequestHandler.
'''
self._setFromPoolWorker()

# We have the launcher_id attr after _setFromPoolWorker() is called.
self.log = logging.getLogger(
"nodepool.driver.static.StaticNodeRequestHandler[%s]" %
self.launcher_id)

declined_reasons = []
invalid_types = self._invalidNodeTypes()
if invalid_types:
declined_reasons.append('node type(s) [%s] not available' %
','.join(invalid_types))

self.available_nodes = self.manager.listNodes()
# Randomize static nodes order
random.shuffle(self.available_nodes)

if len(self.request.node_types) > len(self.available_nodes):
declined_reasons.append('it would exceed quota')

if declined_reasons:
self.log.debug("Declining node request %s because %s",
self.request.id, ', '.join(declined_reasons))
self.decline_request()
self.unlockNodeSet(clear_allocation=True)

# If conditions have changed for a paused request to now cause us
# to decline it, we need to unpause so we don't keep trying it
if self.paused:
self.paused = False

self.zk.storeNodeRequest(self.request)
self.zk.unlockNodeRequest(self.request)
self.done = True
return

if self.paused:
self.log.debug("Retrying node request %s", self.request.id)
else:
self.log.debug("Accepting node request %s", self.request.id)
self.request.state = zk.PENDING
self.zk.storeNodeRequest(self.request)

self._waitForNodeSet()
def pollLauncher(self):
waiting_node = False
for node in self.nodeset:
if node.state == zk.READY:
continue
self.launch(node)
if node.state != zk.READY:
waiting_node = True
return not waiting_node
@@ -15,11 +15,11 @@
import math
import voluptuous as v

from nodepool.driver import ConfigValue
from nodepool.driver import ConfigPool
from nodepool.driver import ProviderConfig


class TestPool(ConfigValue):
class TestPool(ConfigPool):
pass

@@ -39,6 +39,7 @@ class TestConfig(ProviderConfig):
testpool.name = pool['name']
testpool.provider = self
testpool.max_servers = pool.get('max-servers', math.inf)
testpool.labels = pool['labels']
for label in pool['labels']:
self.labels.add(label)
newconfig.labels[label].pools.append(testpool)
@@ -21,14 +21,7 @@ from nodepool.driver import NodeRequestHandler
class TestHandler(NodeRequestHandler):
log = logging.getLogger("nodepool.driver.test.TestHandler")

def run_handler(self):
self._setFromPoolWorker()
node = zk.Node()
def launch(self, node):
node.state = zk.READY
node.external_id = "test-%s" % self.request.id
node.provider = self.provider.name
node.launcher = self.launcher_id
node.allocated_to = self.request.id
node.type = self.request.node_types[0]
self.nodeset.append(node)
self.zk.storeNode(node)
@@ -1188,7 +1188,7 @@ class TestLauncher(tests.DBTestCase):
provider2_second.state = zk.DELETING
self.zk.storeNode(provider2_second)

# Set provider1 run_handler to throw exception to simulate a
# Set provider1 runHandler to throw exception to simulate a
# broken cloud. Note the pool worker instantiates request handlers on
# demand which is why we have a somewhat convoluted monkey patch here.
# We must patch deep enough in the request handler that
@@ -1199,7 +1199,7 @@ class TestLauncher(tests.DBTestCase):
def raise_KeyError(node):
raise KeyError('fake-provider')

request_handler.launch_manager.launch = raise_KeyError
request_handler.launch = raise_KeyError

# Delete instance in fake-provider. This should cause provider2
# to service the request that was held pending by fake-provider.
@@ -21,7 +21,7 @@ from nodepool import builder
from nodepool import provider_manager
from nodepool import tests
from nodepool import zk
from nodepool.driver.openstack.handler import OpenStackNodeLaunchManager
from nodepool.driver.openstack.handler import OpenStackNodeRequestHandler


class TestNodeLaunchManager(tests.DBTestCase):
@@ -47,21 +47,45 @@ class TestNodeLaunchManager(tests.DBTestCase):
self.provider, False)
self.pmanager.resetClient()

def _createHandler(self, retries=1):
# Mock NodeRequest handler object
class FakePoolWorker:
launcher_id = 'Test'

class FakeRequest:
requestor = 'zuul'

class FakeProvider:
launch_retries = retries

handler = OpenStackNodeRequestHandler(FakePoolWorker(), FakeRequest())
handler.zk = self.zk
handler.pool = self.provider_pool
handler.manager = self.pmanager
handler.provider = FakeProvider()
return handler

def _launch(self, handler, node):
# Mock NodeRequest runHandler method
thread = handler.launch(node)
thread.start()
handler._threads.append(thread)
handler.nodeset.append(node)

def test_successful_launch(self):
configfile = self.setup_config('node.yaml')
self._setup(configfile)
handler = self._createHandler(1)

n1 = zk.Node()
n1.state = zk.BUILDING
n1.type = 'fake-label'
mgr = OpenStackNodeLaunchManager(self.zk, self.provider_pool,
self.pmanager, 'zuul', 1)
mgr.launch(n1)
while not mgr.poll():
self._launch(handler, n1)
while not handler.pollLauncher():
time.sleep(0)
self.assertEqual(len(mgr.ready_nodes), 1)
self.assertEqual(len(mgr.failed_nodes), 0)
nodes = mgr._provider_manager.listNodes()
self.assertEqual(len(handler.ready_nodes), 1)
self.assertEqual(len(handler.failed_nodes), 0)
nodes = handler.manager.listNodes()
self.assertEqual(nodes[0]['metadata']['groups'],
'fake-provider,fake-image,fake-label')

@@ -69,23 +93,23 @@ class TestNodeLaunchManager(tests.DBTestCase):
def test_failed_launch(self, mock_launch):
configfile = self.setup_config('node.yaml')
self._setup(configfile)
handler = self._createHandler(1)

mock_launch.side_effect = Exception()
n1 = zk.Node()
n1.state = zk.BUILDING
n1.type = 'fake-label'
mgr = OpenStackNodeLaunchManager(self.zk, self.provider_pool,
self.pmanager, 'zuul', 1)
mgr.launch(n1)
while not mgr.poll():
self._launch(handler, n1)
while not handler.pollLauncher():
time.sleep(0)
self.assertEqual(len(mgr.failed_nodes), 1)
self.assertEqual(len(mgr.ready_nodes), 0)
self.assertEqual(len(handler.failed_nodes), 1)
self.assertEqual(len(handler.ready_nodes), 0)

@mock.patch('nodepool.driver.openstack.handler.NodeLauncher._launchNode')
def test_mixed_launch(self, mock_launch):
configfile = self.setup_config('node.yaml')
self._setup(configfile)
handler = self._createHandler(1)

mock_launch.side_effect = [None, Exception()]
n1 = zk.Node()
@@ -94,11 +118,9 @@ class TestNodeLaunchManager(tests.DBTestCase):
n2 = zk.Node()
n2.state = zk.BUILDING
n2.type = 'fake-label'
mgr = OpenStackNodeLaunchManager(self.zk, self.provider_pool,
self.pmanager, 'zuul', 1)
mgr.launch(n1)
mgr.launch(n2)
while not mgr.poll():
self._launch(handler, n1)
self._launch(handler, n2)
while not handler.pollLauncher():
time.sleep(0)
self.assertEqual(len(mgr.failed_nodes), 1)
self.assertEqual(len(mgr.ready_nodes), 1)
self.assertEqual(len(handler._failed_nodes), 1)
self.assertEqual(len(handler._ready_nodes), 1)