@ -28,6 +28,7 @@ from airflow.utils.decorators import apply_defaults
import drydock_provisioner . drydock_client . client as client
import drydock_provisioner . drydock_client . session as session
from check_k8s_node_status import check_node_status
from drydock_provisioner import error as errors
from service_endpoint import ucp_service_endpoint
from service_token import shipyard_service_token
@ -152,7 +153,7 @@ class DryDockOperator(BaseOperator):
# Create Task for prepare_site
elif self . action == ' prepare_site ' :
# Default settings for 'prepare_site' execution is to query
# the task every 10 seconds and to time out after 12 0 seconds
# the task every 10 seconds and to time out after 30 0 seconds
query_interval = config . get ( ' drydock ' ,
' prepare_site_query_interval ' )
task_timeout = config . get ( ' drydock ' , ' prepare_site_task_timeout ' )
@ -249,8 +250,8 @@ class DryDockOperator(BaseOperator):
logging . info ( ' Task ID is %s ' , task_id )
# Query Task
self . drydock_query_task ( drydock_client , interval , time_out ,
task_id )
self . drydock_query_task ( drydock_client , context , interval ,
time_out , t ask_id )
def drydock_perform_task ( self , drydock_client , context ,
perform_task , nodes_filter ) :
@ -279,7 +280,13 @@ class DryDockOperator(BaseOperator):
else :
raise AirflowException ( " Unable to create task! " )
def drydock_query_task ( self , drydock_client , interval , time_out , task_id ) :
def drydock_query_task ( self , drydock_client , context , interval ,
time_out , task_id ) :
# Initialize Variables
keystone_token_expired = False
new_dd_client = None
dd_client = drydock_client
# Calculate number of times to execute the 'for' loop
# Convert 'time_out' and 'interval' from string into integer
@ -290,15 +297,49 @@ class DryDockOperator(BaseOperator):
# Query task status
for i in range ( 0 , end_range + 1 ) :
if keystone_token_expired :
logging . info ( " Established new drydock session " )
dd_client = new_dd_client
try :
# Retrieve current task state
task_state = drydock_client . get_task ( task_id = task_id )
task_state = dd_client . get_task ( task_id = task_id )
task_status = task_state . get ( ' status ' )
task_result = task_state . get ( ' result ' ) [ ' status ' ]
logging . info ( " Current status of task id %s is %s " ,
task_id , task_status )
keystone_token_expired = False
except errors . ClientUnauthorizedError as unauthorized_error :
# TODO: This is a temporary workaround. Drydock will be
# updated with the appropriate fix in the drydock api
# client by having the session detect a 401/403 response
# and refresh the token appropriately.
# Logs drydock client unauthorized error
keystone_token_expired = True
logging . error ( unauthorized_error )
# Set up new drydock client with new keystone token
logging . info ( " Setting up new drydock session... " )
context [ ' svc_endpoint ' ] = ucp_service_endpoint (
self , svc_type = ' physicalprovisioner ' )
new_dd_client = self . drydock_session_client ( context )
except errors . ClientForbiddenError as forbidden_error :
raise AirflowException ( forbidden_error )
except errors . ClientError as client_error :
raise AirflowException ( client_error )
except :
# There can be instances where there are intermittent network
# issues that prevents us from retrieving the task state. We
# will want to retry in such situations.
logging . info ( " Unable to retrieve task state. Retrying... " )
# Raise Time Out Exception