Merge "Delete leaked instances"

This commit is contained in:
Jenkins 2015-06-18 17:43:27 +00:00 committed by Gerrit Code Review
commit e8315afdfa
6 changed files with 203 additions and 2 deletions

View File

@ -91,7 +91,7 @@ class FakeList(object):
public=[dict(version=4, addr='fake')],
private=[dict(version=4, addr='fake')]
),
metadata={},
metadata=kw.get('meta', {}),
manager=self,
should_fail=should_fail)
self._list.append(s)

View File

@ -336,6 +336,25 @@ class GearmanClient(gear.Client):
return needed_workers
class InstanceDeleter(threading.Thread):
    """Thread that deletes a single provider instance in the background.

    The instance is identified by the provider's name and the provider's
    own (external) id for the server, rather than by a nodepool node id.
    The actual work is delegated to ``nodepool._deleteInstance``.
    """

    log = logging.getLogger("nodepool.InstanceDeleter")

    def __init__(self, nodepool, provider_name, external_id):
        """Remember what to delete; nothing is deleted until ``run``.

        :param nodepool: object exposing
            ``_deleteInstance(provider_name, external_id)``.
        :param provider_name: name of the provider holding the instance.
        :param external_id: the provider-side id of the instance.
        """
        threading.Thread.__init__(self, name='InstanceDeleter for %s %s' %
                                  (provider_name, external_id))
        self.nodepool = nodepool
        self.provider_name = provider_name
        self.external_id = external_id

    def run(self):
        """Delete the instance, logging (never raising) any failure."""
        try:
            self.nodepool._deleteInstance(self.provider_name,
                                          self.external_id)
        except Exception:
            # This class has no ``node_id`` attribute; report the
            # identifiers it actually holds so the handler itself cannot
            # fail and hide the original error.
            self.log.exception("Exception deleting instance %s on "
                               "provider %s:",
                               self.external_id, self.provider_name)
class NodeDeleter(threading.Thread):
log = logging.getLogger("nodepool.NodeDeleter")
@ -1239,6 +1258,8 @@ class NodePool(threading.Thread):
self._delete_threads_lock = threading.Lock()
self._image_delete_threads = {}
self._image_delete_threads_lock = threading.Lock()
self._instance_delete_threads = {}
self._instance_delete_threads_lock = threading.Lock()
self._image_builder_queue = Queue.Queue()
self._image_builder_thread = None
@ -1553,13 +1574,18 @@ class NodePool(threading.Thread):
if self.config and self.config.crons[c.name].job:
self.apsched.unschedule_job(self.config.crons[c.name].job)
parts = c.timespec.split()
if len(parts) > 5:
second = parts[5]
else:
second = None
minute, hour, dom, month, dow = parts[:5]
c.job = self.apsched.add_cron_job(
cron_map[c.name],
day=dom,
day_of_week=dow,
hour=hour,
minute=minute)
minute=minute,
second=second)
else:
c.job = self.config.crons[c.name].job
@ -2266,6 +2292,26 @@ class NodePool(threading.Thread):
dib_image.delete()
self.log.info("Deleted dib image id: %s" % dib_image.id)
def deleteInstance(self, provider_name, external_id):
    """Start a background deletion of a provider instance, at most once.

    A deleter thread is registered in ``_instance_delete_threads`` under
    the key ``(provider_name, external_id)``; if that key is already
    present the call returns without doing anything.  Failures to create
    or start the thread are logged and not propagated.

    NOTE(review): nothing visible here removes an entry once its thread
    has finished -- confirm that happens elsewhere, otherwise an instance
    whose deletion failed is never retried.

    :param provider_name: name of the provider holding the instance.
    :param external_id: the provider-side id of the instance.
    """
    key = (provider_name, external_id)
    # ``with`` guarantees the lock is only released if it was acquired.
    with self._instance_delete_threads_lock:
        if key in self._instance_delete_threads:
            return
        try:
            t = InstanceDeleter(self, provider_name, external_id)
            self._instance_delete_threads[key] = t
            t.start()
        except Exception:
            # A thread that never started must not block later attempts.
            self._instance_delete_threads.pop(key, None)
            # Argument order follows the format string: instance first,
            # then provider.
            self.log.exception("Could not delete instance %s on provider %s",
                               external_id, provider_name)
def _deleteInstance(self, provider_name, external_id):
    """Ask the named provider's manager to clean up one server.

    :param provider_name: key into ``self.config.providers``.
    :param external_id: the provider-side id passed to ``cleanupServer``.
    """
    provider_manager = self.getProviderManager(
        self.config.providers[provider_name])
    provider_manager.cleanupServer(external_id)
def _doPeriodicCleanup(self):
try:
self.periodicCleanup()
@ -2322,8 +2368,63 @@ class NodePool(threading.Thread):
except Exception:
self.log.exception("Exception cleaning up image id %s:" %
dib_image_id)
try:
self.cleanupLeakedInstances()
pass
except Exception:
self.log.exception("Exception cleaning up leaked nodes")
self.log.debug("Finished periodic cleanup")
def cleanupLeakedInstances(self):
    """Delete provider instances whose database record no longer exists.

    For every configured provider, list its servers and read the JSON
    document stored under each server's ``nodepool`` metadata key.  A
    server is treated as leaked when the ``snapshot_image_id`` (checked
    first) or ``node_id`` recorded there has no matching database row;
    leaked servers are passed to ``deleteInstance``.

    Servers are left untouched when they have no nodepool metadata, when
    that metadata cannot be parsed into a mapping, when it names a
    provider that is not in the current configuration, or when it
    carries neither id.  A single server with bad metadata is logged and
    skipped so that it cannot abort the sweep of the remaining servers
    and providers.
    """
    known_providers = self.config.providers.keys()
    with self.getDB().getSession() as session:
        for provider in self.config.providers.values():
            manager = self.getProviderManager(provider)
            for server in manager.listServers():
                raw_meta = (server.get('metadata') or {}).get('nodepool')
                if not raw_meta:
                    self.log.debug("Instance %s (%s) in %s has no "
                                   "nodepool metadata" % (
                                       server['name'], server['id'],
                                       provider.name))
                    continue

                try:
                    meta = json.loads(raw_meta)
                except (TypeError, ValueError):
                    meta = None
                if not isinstance(meta, dict):
                    self.log.warning("Instance %s (%s) in %s has "
                                     "unparseable nodepool metadata" % (
                                         server['name'], server['id'],
                                         provider.name))
                    continue

                # .get(): a missing provider_name is reported as an
                # unknown provider instead of raising KeyError.
                meta_provider = meta.get('provider_name')
                if meta_provider not in known_providers:
                    self.log.debug("Instance %s (%s) in %s "
                                   "lists unknown provider %s" % (
                                       server['name'], server['id'],
                                       provider.name,
                                       meta_provider))
                    continue

                snap_image_id = meta.get('snapshot_image_id')
                node_id = meta.get('node_id')
                if snap_image_id:
                    if session.getSnapshotImage(snap_image_id):
                        continue
                    self.log.warning("Deleting leaked instance %s (%s) "
                                     "in %s for snapshot image id %s" % (
                                         server['name'], server['id'],
                                         provider.name,
                                         snap_image_id))
                    self.deleteInstance(provider.name, server['id'])
                elif node_id:
                    if session.getNode(node_id):
                        continue
                    self.log.warning("Deleting leaked instance %s (%s) "
                                     "in %s for node id %s " % (
                                         server['name'], server['id'],
                                         provider.name, node_id))
                    self.deleteInstance(provider.name, server['id'])
                else:
                    self.log.warning("Instance %s (%s) in %s has no "
                                     "database id" % (
                                         server['name'], server['id'],
                                         provider.name))
def cleanupOneNode(self, session, node):
now = time.time()
time_in_state = now - node.state_time

View File

@ -91,6 +91,8 @@ def make_server_dict(server):
d['key_name'] = server.key_name
if hasattr(server, 'progress'):
d['progress'] = server.progress
if hasattr(server, 'metadata'):
d['metadata'] = server.metadata
d['public_v4'] = get_public_ip(server)
d['private_v4'] = get_private_ip(server)
d['public_v6'] = get_public_ip(server, version=6)

View File

@ -111,6 +111,9 @@ class BaseTestCase(testtools.TestCase, testresources.ResourcedTestCase):
while True:
done = True
for t in threading.enumerate():
if t.name.startswith("Thread-"):
# apscheduler thread pool
continue
if t.name not in whitelist:
done = False
if done:

View File

@ -0,0 +1,51 @@
script-dir: .
dburi: '{dburi}'
images-dir: '{images_dir}'
cron:
check: '*/15 * * * *'
# Six fields rather than the usual five: nodepool hands an optional
# sixth field to the scheduler as the cron "second" value, so this
# spec lets the cleanup job fire on any second, not once a minute.
cleanup: '* * * * * *'
image-update: '14 2 * * *'
zmq-publishers:
- tcp://localhost:8881
#gearman-servers:
# - host: localhost
labels:
- name: fake-label
image: fake-image
min-ready: 1
providers:
- name: fake-provider
providers:
- name: fake-provider
region-name: fake-region
keypair: 'if-present-use-this-keypair'
username: 'fake'
password: 'fake'
auth-url: 'fake'
project-id: 'fake'
max-servers: 96
pool: 'fake'
networks:
- net-id: 'some-uuid'
rate: 0.0001
images:
- name: fake-image
base-image: 'Fake Precise'
min-ram: 8192
name-filter: 'Fake'
meta:
key: value
key2: value
setup: prepare_node_devstack.sh
targets:
- name: fake-target
jenkins:
url: https://jenkins.example.org/
user: fake
apikey: fake

View File

@ -316,3 +316,47 @@ class TestNodepool(tests.DBTestCase):
with ExpectedException(requests.exceptions.ProxyError):
manager.listExtensions()
def test_leaked_node(self):
    """Test that a leaked node is deleted"""
    configfile = self.setup_config('leaked_node.yaml')
    pool = self.useNodepool(configfile, watermark_sleep=1)
    pool.start()
    self.waitForImage(pool, 'fake-provider', 'fake-image')
    self.waitForNodes(pool)

    def ready_nodes(db_session):
        # The one query this test repeats: READY nodes for the fake
        # provider/label/target triple.
        return db_session.getNodes(provider_name='fake-provider',
                                   label_name='fake-label',
                                   target_name='fake-target',
                                   state=nodedb.READY)

    # Precondition: exactly one server exists and its node is READY.
    manager = pool.getProviderManager(
        pool.config.providers['fake-provider'])
    self.assertEqual(len(manager.listServers()), 1)

    with pool.getDB().getSession() as session:
        tracked = ready_nodes(session)
        self.assertEqual(len(tracked), 1)
        # Drop the database rows only; the server stays behind and is
        # therefore leaked.
        for row in tracked:
            row.delete()
        self.assertEqual(len(ready_nodes(session)), 0)

    # Waiting for the replacement node should also give nodepool
    # enough time to delete the leaked server.
    self.waitForNodes(pool)

    # Only the replacement server and its node should remain.
    self.assertEqual(len(manager.listServers()), 1)
    with pool.getDB().getSession() as session:
        self.assertEqual(len(ready_nodes(session)), 1)