@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import configparser
import logging
import os
import time
@ -135,32 +136,57 @@ class DryDockOperator(BaseOperator):
else :
raise AirflowException ( " Unable to Retrieve Genesis Node IP! " )
# Read shipyard.conf
config = configparser . ConfigParser ( )
config . read ( self . shipyard_conf )
if not config . read ( self . shipyard_conf ) :
raise AirflowException ( " Unable to read content of shipyard.conf " )
# Create Task for verify_site
if self . action == ' verify_site ' :
# Default settings for 'verify_site' execution is to query
# the task every 10 seconds and to time out after 60 seconds
# TODO: Need to decide if we want to make polling interval and
# time out a variable in the Dags
self . drydock_action ( drydock_client , context , self . action , 10 , 60 )
query_interval = config . get ( ' drydock ' ,
' verify_site_query_interval ' )
task_timeout = config . get ( ' drydock ' , ' verify_site_task_timeout ' )
self . drydock_action ( drydock_client , context , self . action ,
query_interval , task_timeout )
# Create Task for prepare_site
elif self . action == ' prepare_site ' :
# Default settings for 'prepare_site' execution is to query
# the task every 10 seconds and to time out after 120 seconds
self . drydock_action ( drydock_client , context , self . action , 10 , 120 )
query_interval = config . get ( ' drydock ' ,
' prepare_site_query_interval ' )
task_timeout = config . get ( ' drydock ' , ' prepare_site_task_timeout ' )
self . drydock_action ( drydock_client , context , self . action ,
query_interval , task_timeout )
# Create Task for prepare_node
elif self . action == ' prepare_node ' :
elif self . action == ' prepare_node s ' :
# Default settings for 'prepare_node' execution is to query
# the task every 30 seconds and to time out after 1800 seconds
self . drydock_action ( drydock_client , context , self . action , 30 , 1800 )
query_interval = config . get ( ' drydock ' ,
' prepare_node_query_interval ' )
task_timeout = config . get ( ' drydock ' , ' prepare_node_task_timeout ' )
self . drydock_action ( drydock_client , context , self . action ,
query_interval , task_timeout )
# Create Task for deploy_node
elif self . action == ' deploy_node ' :
elif self . action == ' deploy_node s ' :
# Default settings for 'deploy_node' execution is to query
# the task every 30 seconds and to time out after 3600 seconds
self . drydock_action ( drydock_client , context , self . action , 30 , 3600 )
query_interval = config . get ( ' drydock ' ,
' deploy_node_query_interval ' )
task_timeout = config . get ( ' drydock ' , ' deploy_node_task_timeout ' )
self . drydock_action ( drydock_client , context , self . action ,
query_interval , task_timeout )
# Do not perform any action
else :
@ -247,7 +273,10 @@ class DryDockOperator(BaseOperator):
def drydock_query_task ( self , drydock_client , interval , time_out , task_id ) :
# Calculate number of times to execute the 'for' loop
end_range = int ( time_out / interval )
# Convert 'time_out' and 'interval' from string into integer
# The result from the division will be a floating number which
# We will round off to nearest whole number
end_range = round ( int ( time_out ) / int ( interval ) )
# Query task status
for i in range ( 0 , end_range + 1 ) :
@ -264,11 +293,12 @@ class DryDockOperator(BaseOperator):
if task_status == ' running ' and i == end_range :
raise AirflowException ( " Task Execution Timed Out! " )
# Exit 'for' loop if task is in 'complete' state
if task_status == ' complete ' :
# Exit 'for' loop if the task is in 'complete' or 'terminated'
# state
if task_status in [ ' complete ' , ' terminated ' ] :
break
else :
time . sleep ( interval )
time . sleep ( int ( interval ) )
# Get final task state
# NOTE: There is a known bug in Drydock where the task result