By default list_servers gets the server list and then attaches the network information to each server object. This involves a call to the floatingips endpoint per instance. This is unneeded during cleanupLeakedInstances since we're only interested in the id and metadata in order to trigger node deletion. In larger clouds this can trigger hundreds or thousands unnecessary api calls to the cloud. Optimize this by getting the server list with the bare flag to avoid that. Change-Id: Ie48e647eefc2aa3169e943fdf6854d82219b645b
650 lines
25 KiB
Python
Executable File
650 lines
25 KiB
Python
Executable File
# Copyright (C) 2011-2013 OpenStack Foundation
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
|
# implied.
|
|
#
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
import logging
|
|
import operator
|
|
import os
|
|
import threading
|
|
import time
|
|
|
|
import openstack
|
|
from openstack.exceptions import ResourceTimeout
|
|
|
|
from nodepool import exceptions
|
|
from nodepool.driver import Provider
|
|
from nodepool.driver.utils import QuotaInformation, QuotaSupport
|
|
from nodepool import stats
|
|
from nodepool import version
|
|
from nodepool import zk
|
|
|
|
# Import entire module to avoid partial-loading, circular import
|
|
from nodepool.driver.openstack import handler
|
|
|
|
|
|
IPS_LIST_AGE = 5 # How long to keep a cached copy of the ip list
|
|
|
|
|
|
class OpenStackProvider(Provider, QuotaSupport):
|
|
log = logging.getLogger("nodepool.driver.openstack.OpenStackProvider")
|
|
|
|
def __init__(self, provider):
|
|
super().__init__()
|
|
self.provider = provider
|
|
self._images = {}
|
|
self._networks = {}
|
|
self.__flavors = {} # TODO(gtema): caching
|
|
self.__azs = None
|
|
self._zk = None
|
|
self._down_ports = set()
|
|
self._last_port_cleanup = None
|
|
self._statsd = stats.get_client()
|
|
self.running = False
|
|
self._server_list_watcher = threading.Thread(
|
|
name='ServerListWatcher', target=self._watchServerList,
|
|
daemon=True)
|
|
self._server_list_watcher_stop_event = threading.Event()
|
|
self._cleanup_queue = {}
|
|
self._startup_queue = {}
|
|
|
|
def start(self, zk_conn):
|
|
self.resetClient()
|
|
self._zk = zk_conn
|
|
self.running = True
|
|
self._server_list_watcher.start()
|
|
|
|
def stop(self):
|
|
self.running = False
|
|
self._server_list_watcher_stop_event.set()
|
|
|
|
def join(self):
|
|
self._server_list_watcher.join()
|
|
|
|
def getRequestHandler(self, poolworker, request):
|
|
return handler.OpenStackNodeRequestHandler(poolworker, request)
|
|
|
|
# TODO(gtema): caching
|
|
@property
|
|
def _flavors(self):
|
|
if not self.__flavors:
|
|
self.__flavors = self._getFlavors()
|
|
return self.__flavors
|
|
|
|
def _getClient(self):
|
|
rate_limit = None
|
|
# nodepool tracks rate limit in time between requests.
|
|
# openstacksdk tracks rate limit in requests per second.
|
|
# 1/time = requests-per-second.
|
|
if self.provider.rate:
|
|
rate_limit = 1 / self.provider.rate
|
|
return openstack.connection.Connection(
|
|
config=self.provider.cloud_config,
|
|
use_direct_get=False,
|
|
rate_limit=rate_limit,
|
|
statsd_host=os.getenv('STATSD_HOST', None),
|
|
statsd_port=os.getenv('STATSD_PORT ', None),
|
|
statsd_prefix='nodepool.task.{0}'.format(self.provider.name),
|
|
app_name='nodepool',
|
|
app_version=version.version_info.version_string()
|
|
)
|
|
|
|
def getProviderLimits(self):
|
|
limits = self._client.get_compute_limits()
|
|
return QuotaInformation.construct_from_limits(limits)
|
|
|
|
def quotaNeededByLabel(self, ntype, pool):
|
|
provider_label = pool.labels[ntype]
|
|
|
|
flavor = self.findFlavor(provider_label.flavor_name,
|
|
provider_label.min_ram)
|
|
|
|
return QuotaInformation.construct_from_flavor(flavor)
|
|
|
|
def unmanagedQuotaUsed(self):
|
|
'''
|
|
Sums up the quota used by servers unmanaged by nodepool.
|
|
|
|
:return: Calculated quota in use by unmanaged servers
|
|
'''
|
|
flavors = self.listFlavorsById()
|
|
used_quota = QuotaInformation()
|
|
|
|
node_ids = set([n.id for n in self._zk.nodeIterator()])
|
|
|
|
for server in self.listNodes():
|
|
meta = server.get('metadata', {})
|
|
|
|
nodepool_provider_name = meta.get('nodepool_provider_name')
|
|
if (nodepool_provider_name and
|
|
nodepool_provider_name == self.provider.name):
|
|
# This provider (regardless of the launcher) owns this
|
|
# server so it must not be accounted for unmanaged
|
|
# quota; unless it has leaked.
|
|
nodepool_node_id = meta.get('nodepool_node_id')
|
|
# FIXME(tobiash): Add a test case for this
|
|
if nodepool_node_id and nodepool_node_id in node_ids:
|
|
# It has not leaked.
|
|
continue
|
|
|
|
# In earlier versions of nova, flavor is an id. In later versions
|
|
# it returns the information we're looking for. If we get the
|
|
# information, we do not have to attempt to look up the ram or
|
|
# vcpus.
|
|
if hasattr(server.flavor, 'id'):
|
|
flavor = flavors.get(server.flavor.id)
|
|
else:
|
|
flavor = server.flavor
|
|
used_quota.add(QuotaInformation.construct_from_flavor(flavor))
|
|
|
|
return used_quota
|
|
|
|
def resetClient(self):
|
|
self._client = self._getClient()
|
|
|
|
def _getFlavors(self):
|
|
flavors = self.listFlavors()
|
|
flavors.sort(key=operator.itemgetter('ram'))
|
|
return flavors
|
|
|
|
# TODO(gtema): These next three methods duplicate logic that is in
|
|
# openstacksdk, caching is not enabled there by default
|
|
# Remove it when caching is default
|
|
def _findFlavorByName(self, flavor_name):
|
|
for f in self._flavors:
|
|
if flavor_name in (f['name'], f['id']):
|
|
return f
|
|
raise Exception("Unable to find flavor: %s" % flavor_name)
|
|
|
|
def _findFlavorByRam(self, min_ram, flavor_name):
|
|
for f in self._flavors:
|
|
if (f['ram'] >= min_ram
|
|
and (not flavor_name or flavor_name in f['name'])):
|
|
return f
|
|
raise Exception("Unable to find flavor with min ram: %s" % min_ram)
|
|
|
|
def findFlavor(self, flavor_name, min_ram):
|
|
# Note: this will throw an error if the provider is offline
|
|
# but all the callers are in threads (they call in via CreateServer) so
|
|
# the mainloop won't be affected.
|
|
# TODO(gtema): enable commented block when openstacksdk has caching
|
|
# enabled by default
|
|
# if min_ram:
|
|
# return self._client.get_flavor_by_ram(
|
|
# ram=min_ram,
|
|
# include=flavor_name,
|
|
# get_extra=False)
|
|
# else:
|
|
# return self._client.get_flavor(flavor_name, get_extra=False)
|
|
|
|
if min_ram:
|
|
return self._findFlavorByRam(min_ram, flavor_name)
|
|
else:
|
|
return self._findFlavorByName(flavor_name)
|
|
|
|
def findImage(self, name):
|
|
if name in self._images:
|
|
return self._images[name]
|
|
|
|
image = self._client.get_image(name, filters={'status': 'active'})
|
|
self._images[name] = image
|
|
return image
|
|
|
|
def findNetwork(self, name):
|
|
if name in self._networks:
|
|
return self._networks[name]
|
|
|
|
network = self._client.get_network(name)
|
|
if not network:
|
|
raise Exception("Unable to find network %s in provider %s" % (
|
|
name, self.provider.name))
|
|
self._networks[name] = network
|
|
return network
|
|
|
|
def deleteImage(self, name, id):
|
|
if name in self._images:
|
|
del self._images[name]
|
|
|
|
return self._client.delete_image(dict(id=id))
|
|
|
|
def createServer(self, name, image,
|
|
flavor_name=None, min_ram=None,
|
|
az=None, key_name=None, config_drive=True,
|
|
nodepool_node_id=None, nodepool_node_label=None,
|
|
nodepool_image_name=None,
|
|
nodepool_pool_name=None,
|
|
networks=None, security_groups=None,
|
|
boot_from_volume=False, volume_size=50,
|
|
instance_properties=None, userdata=None):
|
|
if not networks:
|
|
networks = []
|
|
if not isinstance(image, dict):
|
|
# if it's a dict, we already have the cloud id. If it's not,
|
|
# we don't know if it's name or ID so need to look it up
|
|
image = self.findImage(image)
|
|
flavor = self.findFlavor(flavor_name=flavor_name, min_ram=min_ram)
|
|
create_args = dict(name=name,
|
|
image=image,
|
|
flavor=flavor,
|
|
config_drive=config_drive)
|
|
if boot_from_volume:
|
|
create_args['boot_from_volume'] = boot_from_volume
|
|
create_args['volume_size'] = volume_size
|
|
# NOTE(pabelanger): Always cleanup volumes when we delete a server.
|
|
create_args['terminate_volume'] = True
|
|
if key_name:
|
|
create_args['key_name'] = key_name
|
|
if az:
|
|
create_args['availability_zone'] = az
|
|
if security_groups:
|
|
create_args['security_groups'] = security_groups
|
|
if userdata:
|
|
create_args['userdata'] = userdata
|
|
nics = []
|
|
for network in networks:
|
|
net_id = self.findNetwork(network)['id']
|
|
nics.append({'net-id': net_id})
|
|
if nics:
|
|
create_args['nics'] = nics
|
|
# Put provider.name and image_name in as groups so that ansible
|
|
# inventory can auto-create groups for us based on each of those
|
|
# qualities
|
|
# Also list each of those values directly so that non-ansible
|
|
# consumption programs don't need to play a game of knowing that
|
|
# groups[0] is the image name or anything silly like that.
|
|
groups_list = [self.provider.name]
|
|
|
|
if nodepool_image_name:
|
|
groups_list.append(nodepool_image_name)
|
|
if nodepool_node_label:
|
|
groups_list.append(nodepool_node_label)
|
|
meta = dict(
|
|
groups=",".join(groups_list),
|
|
nodepool_provider_name=self.provider.name,
|
|
nodepool_pool_name=nodepool_pool_name,
|
|
)
|
|
# merge in any provided properties
|
|
if instance_properties:
|
|
meta = {**instance_properties, **meta}
|
|
if nodepool_node_id:
|
|
meta['nodepool_node_id'] = nodepool_node_id
|
|
if nodepool_image_name:
|
|
meta['nodepool_image_name'] = nodepool_image_name
|
|
if nodepool_node_label:
|
|
meta['nodepool_node_label'] = nodepool_node_label
|
|
create_args['meta'] = meta
|
|
|
|
try:
|
|
return self._client.create_server(wait=False, **create_args)
|
|
except openstack.exceptions.BadRequestException:
|
|
# We've gotten a 400 error from nova - which means the request
|
|
# was malformed. The most likely cause of that, unless something
|
|
# became functionally and systemically broken, is stale az, image
|
|
# or flavor cache. Log a message, invalidate the caches so that
|
|
# next time we get new caches.
|
|
self._images = {}
|
|
self.__azs = None
|
|
self.__flavors = {} # TODO(gtema): caching
|
|
self.log.info(
|
|
"Clearing az, flavor and image caches due to 400 error "
|
|
"from nova")
|
|
raise
|
|
|
|
def getServer(self, server_id):
|
|
return self._client.get_server(server_id)
|
|
|
|
def getServerById(self, server_id):
|
|
return self._client.get_server_by_id(server_id)
|
|
|
|
def getServerConsole(self, server_id):
|
|
try:
|
|
return self._client.get_server_console(server_id)
|
|
except openstack.exceptions.OpenStackCloudException:
|
|
return None
|
|
|
|
def waitForServer(self, server, timeout=3600, auto_ip=True):
|
|
# This method is called from a separate thread per server. In order to
|
|
# reduce thread contention we don't call wait_for_server right now
|
|
# but put this thread on sleep until the desired instance is either
|
|
# in ACTIVE or ERROR state. After that just continue with
|
|
# wait_for_server which will continue its magic.
|
|
# TODO: log annotation
|
|
self.log.debug('Wait for central server creation %s', server.id)
|
|
event = threading.Event()
|
|
start_time = time.monotonic()
|
|
self._startup_queue[server.id] = (event, start_time + timeout)
|
|
if not event.wait(timeout=timeout):
|
|
# On timeout emit the same exception as wait_for_server would to
|
|
timeout_message = "Timeout waiting for the server to come up."
|
|
raise ResourceTimeout(timeout_message)
|
|
|
|
# TODO: log annotation
|
|
self.log.debug('Finished wait for central server creation %s',
|
|
server.id)
|
|
|
|
# Re-calculate timeout to account for the duration so far
|
|
elapsed = time.monotonic() - start_time
|
|
timeout = max(0, timeout - elapsed)
|
|
|
|
return self._client.wait_for_server(
|
|
server=server, auto_ip=auto_ip,
|
|
reuse=False, timeout=timeout)
|
|
|
|
def waitForNodeCleanup(self, server_id, timeout=600):
|
|
event = threading.Event()
|
|
self._cleanup_queue[server_id] = (event, time.monotonic() + timeout)
|
|
if not event.wait(timeout=timeout):
|
|
raise exceptions.ServerDeleteException(
|
|
"server %s deletion" % server_id)
|
|
|
|
def createImage(self, server, image_name, meta):
|
|
return self._client.create_image_snapshot(
|
|
image_name, server, **meta)
|
|
|
|
def getImage(self, image_id):
|
|
return self._client.get_image(image_id, filters={'status': 'active'})
|
|
|
|
def labelReady(self, label):
|
|
if not label.cloud_image:
|
|
return False
|
|
|
|
# If an image ID was supplied, we'll assume it is ready since
|
|
# we don't currently have a way of validating that (except during
|
|
# server creation).
|
|
if label.cloud_image.image_id:
|
|
return True
|
|
|
|
image = self.getImage(label.cloud_image.external_name)
|
|
if not image:
|
|
self.log.warning(
|
|
"Provider %s is configured to use %s as the"
|
|
" cloud-image for label %s and that"
|
|
" cloud-image could not be found in the"
|
|
" cloud." % (self.provider.name,
|
|
label.cloud_image.external_name,
|
|
label.name))
|
|
return False
|
|
return True
|
|
|
|
def uploadImage(self, image_name, filename, image_type=None, meta=None,
|
|
md5=None, sha256=None):
|
|
# configure glance and upload image. Note the meta flags
|
|
# are provided as custom glance properties
|
|
# NOTE: we have wait=True set here. This is not how we normally
|
|
# do things in nodepool, preferring to poll ourselves thankyouverymuch.
|
|
# However - two things to note:
|
|
# - PUT has no aysnc mechanism, so we have to handle it anyway
|
|
# - v2 w/task waiting is very strange and complex - but we have to
|
|
# block for our v1 clouds anyway, so we might as well
|
|
# have the interface be the same and treat faking-out
|
|
# a openstacksdk-level fake-async interface later
|
|
if not meta:
|
|
meta = {}
|
|
if image_type:
|
|
meta['disk_format'] = image_type
|
|
image = self._client.create_image(
|
|
name=image_name,
|
|
filename=filename,
|
|
is_public=False,
|
|
wait=True,
|
|
md5=md5,
|
|
sha256=sha256,
|
|
**meta)
|
|
return image.id
|
|
|
|
def listPorts(self, status=None):
|
|
'''
|
|
List known ports.
|
|
|
|
:param str status: A valid port status. E.g., 'ACTIVE' or 'DOWN'.
|
|
'''
|
|
if status:
|
|
ports = self._client.list_ports(filters={'status': status})
|
|
else:
|
|
ports = self._client.list_ports()
|
|
return ports
|
|
|
|
def deletePort(self, port_id):
|
|
self._client.delete_port(port_id)
|
|
|
|
def listImages(self):
|
|
return self._client.list_images()
|
|
|
|
def listFlavors(self):
|
|
return self._client.list_flavors(get_extra=False)
|
|
|
|
def listFlavorsById(self):
|
|
flavors = {}
|
|
for flavor in self._client.list_flavors(get_extra=False):
|
|
flavors[flavor.id] = flavor
|
|
return flavors
|
|
|
|
def listNodes(self):
|
|
# list_servers carries the nodepool server list caching logic
|
|
return self._client.list_servers()
|
|
|
|
def deleteServer(self, server_id):
|
|
return self._client.delete_server(server_id, delete_ips=True)
|
|
|
|
def cleanupNode(self, server_id):
|
|
server = self.getServer(server_id)
|
|
if not server:
|
|
raise exceptions.NotFound()
|
|
|
|
self.log.debug('Deleting server %s' % server_id)
|
|
self.deleteServer(server_id)
|
|
|
|
def cleanupLeakedInstances(self):
|
|
'''
|
|
Delete any leaked server instances.
|
|
|
|
Remove any servers found in this provider that are not recorded in
|
|
the ZooKeeper data.
|
|
'''
|
|
|
|
deleting_nodes = {}
|
|
|
|
for node in self._zk.nodeIterator():
|
|
if node.state == zk.DELETING:
|
|
if node.provider != self.provider.name:
|
|
continue
|
|
if node.provider not in deleting_nodes:
|
|
deleting_nodes[node.provider] = []
|
|
deleting_nodes[node.provider].append(node.external_id)
|
|
|
|
for server in self._client.list_servers(bare=True):
|
|
meta = server.get('metadata', {})
|
|
|
|
if 'nodepool_provider_name' not in meta:
|
|
continue
|
|
|
|
if meta['nodepool_provider_name'] != self.provider.name:
|
|
# Another launcher, sharing this provider but configured
|
|
# with a different name, owns this.
|
|
continue
|
|
|
|
if (self.provider.name in deleting_nodes and
|
|
server.id in deleting_nodes[self.provider.name]):
|
|
# Already deleting this node
|
|
continue
|
|
|
|
if not self._zk.getNode(meta['nodepool_node_id']):
|
|
self.log.warning(
|
|
"Marking for delete leaked instance %s (%s) in %s "
|
|
"(unknown node id %s)",
|
|
server.name, server.id, self.provider.name,
|
|
meta['nodepool_node_id']
|
|
)
|
|
# Create an artifical node to use for deleting the server.
|
|
node = zk.Node()
|
|
node.external_id = server.id
|
|
node.provider = self.provider.name
|
|
node.pool = meta.get('nodepool_pool_name')
|
|
node.state = zk.DELETING
|
|
self._zk.storeNode(node)
|
|
if self._statsd:
|
|
key = ('nodepool.provider.%s.leaked.nodes'
|
|
% self.provider.name)
|
|
self._statsd.incr(key)
|
|
|
|
def filterComputePorts(self, ports):
|
|
'''
|
|
Return a list of compute ports (or no device owner).
|
|
|
|
We are not interested in ports for routers or DHCP.
|
|
'''
|
|
ret = []
|
|
for p in ports:
|
|
if p.device_owner is None or p.device_owner.startswith("compute:"):
|
|
ret.append(p)
|
|
return ret
|
|
|
|
def cleanupLeakedPorts(self):
|
|
if not self._last_port_cleanup:
|
|
self._last_port_cleanup = time.monotonic()
|
|
ports = self.listPorts(status='DOWN')
|
|
ports = self.filterComputePorts(ports)
|
|
self._down_ports = set([p.id for p in ports])
|
|
return
|
|
|
|
# Return if not enough time has passed between cleanup
|
|
last_check_in_secs = int(time.monotonic() - self._last_port_cleanup)
|
|
if last_check_in_secs <= self.provider.port_cleanup_interval:
|
|
return
|
|
|
|
ports = self.listPorts(status='DOWN')
|
|
ports = self.filterComputePorts(ports)
|
|
current_set = set([p.id for p in ports])
|
|
remove_set = current_set & self._down_ports
|
|
|
|
removed_count = 0
|
|
for port_id in remove_set:
|
|
try:
|
|
self.deletePort(port_id)
|
|
except Exception:
|
|
self.log.exception("Exception deleting port %s in %s:",
|
|
port_id, self.provider.name)
|
|
else:
|
|
removed_count += 1
|
|
self.log.debug("Removed DOWN port %s in %s",
|
|
port_id, self.provider.name)
|
|
|
|
if self._statsd and removed_count:
|
|
key = 'nodepool.provider.%s.leaked.ports' % (self.provider.name)
|
|
self._statsd.incr(key, removed_count)
|
|
|
|
self._last_port_cleanup = time.monotonic()
|
|
|
|
# Rely on OpenStack to tell us the down ports rather than doing our
|
|
# own set adjustment.
|
|
ports = self.listPorts(status='DOWN')
|
|
ports = self.filterComputePorts(ports)
|
|
self._down_ports = set([p.id for p in ports])
|
|
|
|
def cleanupLeakedResources(self):
|
|
self.cleanupLeakedInstances()
|
|
if self.provider.port_cleanup_interval:
|
|
self.cleanupLeakedPorts()
|
|
if self.provider.clean_floating_ips:
|
|
did_clean = self._client.delete_unattached_floating_ips()
|
|
if did_clean:
|
|
# some openstacksdk's return True if any port was
|
|
# cleaned, rather than the count. Just set it to 1 to
|
|
# indicate something happened.
|
|
if type(did_clean) == bool:
|
|
did_clean = 1
|
|
if self._statsd:
|
|
key = ('nodepool.provider.%s.leaked.floatingips'
|
|
% self.provider.name)
|
|
self._statsd.incr(key, did_clean)
|
|
|
|
def getAZs(self):
|
|
if self.__azs is None:
|
|
self.__azs = self._client.list_availability_zone_names()
|
|
if not self.__azs:
|
|
# If there are no zones, return a list containing None so that
|
|
# random.choice can pick None and pass that to Nova. If this
|
|
# feels dirty, please direct your ire to policy.json and the
|
|
# ability to turn off random portions of the OpenStack API.
|
|
self.__azs = [None]
|
|
return self.__azs
|
|
|
|
def _watchServerList(self):
|
|
log = logging.getLogger(
|
|
"nodepool.driver.openstack.OpenStackProvider.watcher")
|
|
while self.running:
|
|
if self._server_list_watcher_stop_event.wait(5):
|
|
# We're stopping now so don't wait with any thread for node
|
|
# deletion.
|
|
for event, _ in self._cleanup_queue.values():
|
|
event.set()
|
|
for event, _ in self._startup_queue.values():
|
|
event.set()
|
|
break
|
|
|
|
if not self._cleanup_queue and not self._startup_queue:
|
|
# No server deletion to wait for so check can be skipped
|
|
continue
|
|
|
|
try:
|
|
log.debug('Get server list')
|
|
start = time.monotonic()
|
|
# List bare to avoid neutron calls
|
|
servers = self._client.list_servers(bare=True)
|
|
log.debug('Got server list in %.3fs', time.monotonic() - start)
|
|
except Exception:
|
|
log.exception('Failed to get server list')
|
|
continue
|
|
|
|
def process_timeouts(queue):
|
|
for server_id in list(queue.keys()):
|
|
# Remove entries that are beyond timeout
|
|
_, timeout = queue[server_id]
|
|
if time.monotonic() > timeout:
|
|
del queue[server_id]
|
|
|
|
# Process cleanup queue
|
|
existing_server_ids = {
|
|
server.id for server in servers
|
|
if server.status != 'DELETED'
|
|
}
|
|
for server_id in list(self._cleanup_queue.keys()):
|
|
# Notify waiting threads that don't have server ids
|
|
if server_id not in existing_server_ids:
|
|
# Notify the thread which is waiting for the delete
|
|
log.debug('Waking up cleanup thread for server %s',
|
|
server_id)
|
|
self._cleanup_queue[server_id][0].set()
|
|
del self._cleanup_queue[server_id]
|
|
|
|
# Process startup queue
|
|
finished_server_ids = {
|
|
server.id for server in servers
|
|
if server.status in ('ACTIVE', 'ERROR')
|
|
}
|
|
for server_id in list(self._startup_queue.keys()):
|
|
# Notify waiting threads that don't have server ids
|
|
if server_id in finished_server_ids:
|
|
# Notify the thread which is waiting for the delete
|
|
log.debug('Waking up startup thread for server %s',
|
|
server_id)
|
|
self._startup_queue[server_id][0].set()
|
|
del self._startup_queue[server_id]
|
|
|
|
# Process timeouts
|
|
process_timeouts(self._cleanup_queue)
|
|
process_timeouts(self._startup_queue)
|
|
|
|
log.debug('Done')
|