@ -38,8 +38,11 @@ from nodepool import launcher
from nodepool import webapp
from nodepool import zk
from nodepool . cmd . config_validator import ConfigValidator
from nodepool . nodeutils import iterate_timeout
TRUE_VALUES = ( ' true ' , ' 1 ' , ' yes ' )
SECOND = 1
ONE_MINUTE = 60 * SECOND
class LoggingPopen ( subprocess . Popen ) :
@ -224,7 +227,9 @@ class BaseTestCase(testtools.TestCase):
' pydevd.Writer ' ,
]
while True :
for _ in iterate_timeout ( ONE_MINUTE , Exception ,
" Transient threads to finish " ,
interval = 0.1 ) :
done = True
for t in threading . enumerate ( ) :
if t . name . startswith ( " Thread- " ) :
@ -245,7 +250,6 @@ class BaseTestCase(testtools.TestCase):
done = False
if done :
return
time . sleep ( 0.1 )
def assertReportedStat ( self , key , value = None , kind = None ) :
""" Check statsd output
@ -268,8 +272,9 @@ class BaseTestCase(testtools.TestCase):
if value :
self . assertNotEqual ( kind , None )
start = time . time ( )
while time . time ( ) < ( start + 5 ) :
for _ in iterate_timeout ( 5 * SECOND , Exception ,
" Find statsd event " ,
interval = 0.1 ) :
# Note our fake statsd just queues up results in a queue.
# We just keep going through them until we find one that
# matches, or fail out. If statsd pipelines are used,
@ -305,7 +310,6 @@ class BaseTestCase(testtools.TestCase):
# this key matches
return True
time . sleep ( 0.1 )
raise Exception ( " Key %s not found in reported stats " % key )
@ -407,7 +411,9 @@ class DBTestCase(BaseTestCase):
time . sleep ( 0.1 )
def waitForImage ( self , provider_name , image_name , ignore_list = None ) :
while True :
for _ in iterate_timeout ( ONE_MINUTE , Exception ,
" Image upload to be ready " ,
interval = 1 ) :
self . wait_for_threads ( )
image = self . zk . getMostRecentImageUpload ( image_name , provider_name )
if image :
@ -415,27 +421,28 @@ class DBTestCase(BaseTestCase):
break
elif not ignore_list :
break
time . sleep ( 1 )
self . wait_for_threads ( )
return image
def waitForUploadRecordDeletion ( self , provider_name , image_name ,
build_id , upload_id ) :
while True :
for _ in iterate_timeout ( ONE_MINUTE , Exception ,
" Image upload record deletion " ,
interval = 1 ) :
self . wait_for_threads ( )
uploads = self . zk . getUploads ( image_name , build_id , provider_name )
if not uploads or upload_id not in [ u . id for u in uploads ] :
break
time . sleep ( 1 )
self . wait_for_threads ( )
def waitForImageDeletion ( self , provider_name , image_name , match = None ) :
while True :
for _ in iterate_timeout ( ONE_MINUTE , Exception ,
" Image upload deletion " ,
interval = 1 ) :
self . wait_for_threads ( )
image = self . zk . getMostRecentImageUpload ( image_name , provider_name )
if not image or ( match and image != match ) :
break
time . sleep ( 1 )
self . wait_for_threads ( )
def waitForBuild ( self , image_name , build_id , states = None ) :
@ -444,12 +451,13 @@ class DBTestCase(BaseTestCase):
base = " - " . join ( [ image_name , build_id ] )
while True :
for _ in iterate_timeout ( ONE_MINUTE , Exception ,
" Image build record to reach state " ,
interval = 1 ) :
self . wait_for_threads ( )
build = self . zk . getBuild ( image_name , build_id )
if build and build . state in states :
break
time . sleep ( 1 )
# We should only expect a dib manifest with a successful build.
while build . state == zk . READY :
@ -465,34 +473,39 @@ class DBTestCase(BaseTestCase):
def waitForBuildDeletion ( self , image_name , build_id ) :
base = " - " . join ( [ image_name , build_id ] )
while True :
for _ in iterate_timeout ( ONE_MINUTE , Exception ,
" DIB build files deletion " ,
interval = 1 ) :
self . wait_for_threads ( )
files = builder . DibImageFile . from_image_id (
self . _config_images_dir . path , base )
if not files :
break
time . sleep ( 1 )
while True :
for _ in iterate_timeout ( ONE_MINUTE , Exception ,
" DIB build file deletion leaks " ,
interval = 1 ) :
self . wait_for_threads ( )
# Now, check the disk to ensure we didn't leak any files.
matches = glob . glob ( ' %s / %s .* ' % ( self . _config_images_dir . path ,
base ) )
if not matches :
break
time . sleep ( 1 )
while True :
for _ in iterate_timeout ( ONE_MINUTE , Exception ,
" Image build record deletion " ,
interval = 1 ) :
self . wait_for_threads ( )
build = self . zk . getBuild ( image_name , build_id )
if not build :
break
time . sleep ( 1 )
self . wait_for_threads ( )
def waitForNodeDeletion ( self , node ) :
while True :
for _ in iterate_timeout ( ONE_MINUTE , Exception ,
" Node record deletion " ,
interval = 1 ) :
exists = False
for n in self . zk . nodeIterator ( ) :
if node . id == n . id :
@ -500,17 +513,19 @@ class DBTestCase(BaseTestCase):
break
if not exists :
break
time . sleep ( 1 )
def waitForInstanceDeletion ( self , manager , instance_id ) :
while True :
for _ in iterate_timeout ( ONE_MINUTE , Exception ,
" Cloud instance deletion " ,
interval = 1 ) :
servers = manager . listNodes ( )
if not ( instance_id in [ s . id for s in servers ] ) :
break
time . sleep ( 1 )
def waitForNodeRequestLockDeletion ( self , request_id ) :
while True :
for _ in iterate_timeout ( ONE_MINUTE , Exception ,
" Node request lock deletion " ,
interval = 1 ) :
exists = False
for lock_id in self . zk . getNodeRequestLockIDs ( ) :
if request_id == lock_id :
@ -518,15 +533,15 @@ class DBTestCase(BaseTestCase):
break
if not exists :
break
time . sleep ( 1 )
def waitForNodes ( self , label , count = 1 ) :
while True :
for _ in iterate_timeout ( ONE_MINUTE , Exception ,
" Ready nodes " ,
interval = 1 ) :
self . wait_for_threads ( )
ready_nodes = self . zk . getReadyNodesOfTypes ( [ label ] )
if label in ready_nodes and len ( ready_nodes [ label ] ) == count :
break
time . sleep ( 1 )
self . wait_for_threads ( )
return ready_nodes [ label ]
@ -536,11 +551,12 @@ class DBTestCase(BaseTestCase):
'''
if states is None :
states = ( zk . FULFILLED , zk . FAILED )
while True :
for _ in iterate_timeout ( ONE_MINUTE , Exception ,
" Node request state transition " ,
interval = 1 ) :
req = self . zk . getNodeRequest ( req . id )
if req . state in states :
break
time . sleep ( 1 )
return req