@ -18,6 +18,7 @@ import os
import tempfile
import time
from unittest . mock import patch
from functools import wraps
import yaml
@ -234,97 +235,108 @@ class TestDriverGce(tests.DBTestCase):
except Exception :
pass
def _test_gce_machine ( self , label ,
def _test_with_pool ( the_test ) :
@wraps ( the_test )
def wrapper ( self , * args , * * kwargs ) :
self . patch ( googleapiclient , ' discovery ' , GCloudEmulator ( ) )
conf_template = os . path . join (
os . path . dirname ( __file__ ) , ' .. ' , ' fixtures ' , ' gce.yaml ' )
with open ( conf_template ) as f :
raw_config = yaml . safe_load ( f )
raw_config [ ' zookeeper-servers ' ] [ 0 ] = {
' host ' : self . zookeeper_host ,
' port ' : self . zookeeper_port ,
' chroot ' : self . zookeeper_chroot ,
}
raw_config [ ' zookeeper-tls ' ] = {
' ca ' : self . zookeeper_ca ,
' cert ' : self . zookeeper_cert ,
' key ' : self . zookeeper_key ,
}
with tempfile . NamedTemporaryFile ( ) as tf :
tf . write ( yaml . safe_dump (
raw_config , default_flow_style = False ) . encode ( ' utf-8 ' ) )
tf . flush ( )
configfile = self . setup_config ( tf . name )
pool = self . useNodepool ( configfile , watermark_sleep = 1 )
the_test ( self , pool , * args , * * kwargs )
return wrapper
@_test_with_pool
def test_gce_reconfigure ( self , pool ) :
pool . updateConfig ( )
pool . updateConfig ( )
@_test_with_pool
def _test_gce_machine ( self , pool , label ,
is_valid_config = True ,
host_key_checking = True ) :
self . patch ( googleapiclient , ' discovery ' , GCloudEmulator ( ) )
conf_template = os . path . join (
os . path . dirname ( __file__ ) , ' .. ' , ' fixtures ' , ' gce.yaml ' )
with open ( conf_template ) as f :
raw_config = yaml . safe_load ( f )
raw_config [ ' zookeeper-servers ' ] [ 0 ] = {
' host ' : self . zookeeper_host ,
' port ' : self . zookeeper_port ,
' chroot ' : self . zookeeper_chroot ,
}
raw_config [ ' zookeeper-tls ' ] = {
' ca ' : self . zookeeper_ca ,
' cert ' : self . zookeeper_cert ,
' key ' : self . zookeeper_key ,
}
with tempfile . NamedTemporaryFile ( ) as tf :
tf . write ( yaml . safe_dump (
raw_config , default_flow_style = False ) . encode ( ' utf-8 ' ) )
tf . flush ( )
configfile = self . setup_config ( tf . name )
pool = self . useNodepool ( configfile , watermark_sleep = 1 )
pool . start ( )
self . _wait_for_provider ( pool , ' gcloud-provider ' )
with patch ( ' nodepool.driver.simple.nodescan ' ) as nodescan :
nodescan . return_value = ' MOCK KEY '
req = zk . NodeRequest ( )
req . state = zk . REQUESTED
req . node_types . append ( label )
self . zk . storeNodeRequest ( req )
self . log . debug ( " Waiting for request %s " , req . id )
req = self . waitForNodeRequest ( req )
self . log . debug ( " Finished request %s " , req . id )
if is_valid_config is False :
self . assertEqual ( req . state , zk . FAILED )
self . assertEqual ( req . nodes , [ ] )
return
self . assertEqual ( req . state , zk . FULFILLED )
self . assertNotEqual ( req . nodes , [ ] )
node = self . zk . getNode ( req . nodes [ 0 ] )
self . assertEqual ( node . allocated_to , req . id )
self . assertEqual ( node . state , zk . READY )
self . assertIsNotNone ( node . launcher )
self . assertEqual ( node . connection_type , ' ssh ' )
self . assertEqual ( node . attributes ,
{ ' key1 ' : ' value1 ' , ' key2 ' : ' value2 ' } )
if host_key_checking :
nodescan . assert_called_with (
node . interface_ip ,
port = 22 ,
timeout = 180 ,
gather_hostkeys = True )
# A new request will be paused and for lack of quota
# until this one is deleted
req2 = zk . NodeRequest ( )
req2 . state = zk . REQUESTED
req2 . node_types . append ( label )
self . zk . storeNodeRequest ( req2 )
req2 = self . waitForNodeRequest (
req2 , ( zk . PENDING , zk . FAILED , zk . FULFILLED ) )
self . assertEqual ( req2 . state , zk . PENDING )
# It could flip from PENDING to one of the others,
# so sleep a bit and be sure
time . sleep ( 1 )
req2 = self . waitForNodeRequest (
req2 , ( zk . PENDING , zk . FAILED , zk . FULFILLED ) )
self . assertEqual ( req2 . state , zk . PENDING )
node . state = zk . DELETING
self . zk . storeNode ( node )
self . waitForNodeDeletion ( node )
req2 = self . waitForNodeRequest ( req2 ,
( zk . FAILED , zk . FULFILLED ) )
self . assertEqual ( req2 . state , zk . FULFILLED )
node = self . zk . getNode ( req2 . nodes [ 0 ] )
node . state = zk . DELETING
self . zk . storeNode ( node )
self . waitForNodeDeletion ( node )
pool . start ( )
self . _wait_for_provider ( pool , ' gcloud-provider ' )
with patch ( ' nodepool.driver.simple.nodescan ' ) as nodescan :
nodescan . return_value = ' MOCK KEY '
req = zk . NodeRequest ( )
req . state = zk . REQUESTED
req . node_types . append ( label )
self . zk . storeNodeRequest ( req )
self . log . debug ( " Waiting for request %s " , req . id )
req = self . waitForNodeRequest ( req )
self . log . debug ( " Finished request %s " , req . id )
if is_valid_config is False :
self . assertEqual ( req . state , zk . FAILED )
self . assertEqual ( req . nodes , [ ] )
return
self . assertEqual ( req . state , zk . FULFILLED )
self . assertNotEqual ( req . nodes , [ ] )
node = self . zk . getNode ( req . nodes [ 0 ] )
self . assertEqual ( node . allocated_to , req . id )
self . assertEqual ( node . state , zk . READY )
self . assertIsNotNone ( node . launcher )
self . assertEqual ( node . connection_type , ' ssh ' )
self . assertEqual ( node . attributes ,
{ ' key1 ' : ' value1 ' , ' key2 ' : ' value2 ' } )
if host_key_checking :
nodescan . assert_called_with (
node . interface_ip ,
port = 22 ,
timeout = 180 ,
gather_hostkeys = True )
# A new request will be paused and for lack of quota
# until this one is deleted
req2 = zk . NodeRequest ( )
req2 . state = zk . REQUESTED
req2 . node_types . append ( label )
self . zk . storeNodeRequest ( req2 )
req2 = self . waitForNodeRequest (
req2 , ( zk . PENDING , zk . FAILED , zk . FULFILLED ) )
self . assertEqual ( req2 . state , zk . PENDING )
# It could flip from PENDING to one of the others,
# so sleep a bit and be sure
time . sleep ( 1 )
req2 = self . waitForNodeRequest (
req2 , ( zk . PENDING , zk . FAILED , zk . FULFILLED ) )
self . assertEqual ( req2 . state , zk . PENDING )
node . state = zk . DELETING
self . zk . storeNode ( node )
self . waitForNodeDeletion ( node )
req2 = self . waitForNodeRequest ( req2 ,
( zk . FAILED , zk . FULFILLED ) )
self . assertEqual ( req2 . state , zk . FULFILLED )
node = self . zk . getNode ( req2 . nodes [ 0 ] )
node . state = zk . DELETING
self . zk . storeNode ( node )
self . waitForNodeDeletion ( node )
def test_gce_machine ( self ) :
self . _test_gce_machine ( ' debian-stretch-f1-micro ' )