@ -13,7 +13,9 @@
# limitations under the License.
import logging
import os
import time
from urllib . parse import urlparse
from airflow . exceptions import AirflowException
from airflow . models import BaseOperator
@ -22,53 +24,55 @@ from airflow.utils.decorators import apply_defaults
import drydock_provisioner . drydock_client . client as client
import drydock_provisioner . drydock_client . session as session
from get_k8s_pod_port_ip import get_pod_port_ip
from service_endpoint import ucp_service_endpoint
from service_token import shipyard_service_token
class DryDockOperator ( BaseOperator ) :
"""
DryDock Client
:param host: Target Host
:param port: DryDock Port
:param shipyard_conf: Location of shipyard.conf
:param drydock_conf: Location of drydock YAML
:param promenade_conf: Location of promenade YAML
:param action: Task to perform
:param design_id: DryDock Design ID
:param design_ref: A URI reference to the design documents
:param workflow_info: Information related to the workflow
:param main_dag_name: Parent Dag
:param sub_dag_name: Child Dag
:param node_filter: A filter for narrowing the scope of the task. Valid
fields are 'node_names', 'rack_names', 'node_tags'
"""
@apply_defaults
def __init__(self,
             host=None,
             port=None,
             action=None,
             design_id=None,
             shipyard_conf=None,
             drydock_conf=None,
             promenade_conf=None,
             workflow_info=None,
             design_ref=None,
             main_dag_name=None,
             node_filter=None,
             sub_dag_name=None,
             xcom_push=True,
             *args, **kwargs):
    """Store the operator configuration on the instance.

    Every parameter name that appears in the signature is kept exactly
    once, in order of first appearance, so callers using either the
    host/port style or the design_ref style keep working.

    :param host: Target Host
    :param port: DryDock Port
    :param action: Task to perform
    :param design_id: DryDock Design ID
    :param shipyard_conf: Location of shipyard.conf
    :param drydock_conf: Location of drydock YAML
    :param promenade_conf: Location of promenade YAML
    :param workflow_info: Information related to the workflow; a fresh
        empty dict is used when omitted
    :param design_ref: A URI reference to the design documents
    :param main_dag_name: Parent Dag
    :param node_filter: A filter for narrowing the scope of the task
    :param sub_dag_name: Child Dag
    :param xcom_push: Stored as ``self.xcom_push_flag``
    """
    super(DryDockOperator, self).__init__(*args, **kwargs)

    self.host = host
    self.port = port
    self.shipyard_conf = shipyard_conf
    self.drydock_conf = drydock_conf
    self.promenade_conf = promenade_conf
    self.action = action
    self.design_id = design_id
    # A None default avoids sharing one dict between all operator
    # instances that did not pass workflow_info explicitly.
    self.workflow_info = {} if workflow_info is None else workflow_info
    self.design_ref = design_ref
    self.main_dag_name = main_dag_name
    self.node_filter = node_filter
    self.sub_dag_name = sub_dag_name
    self.xcom_push_flag = xcom_push
def execute ( self , context ) :
# Initialize Variables
context [ ' svc_type ' ] = ' physicalprovisioner '
genesis_node_ip = None
# Placeholder definition
# TODO: Need to decide how to pass the required value from Shipyard to
# the 'node_filter' variable. No filter will be used for now.
self . node_filter = None
# Define task_instance
task_instance = context [ ' task_instance ' ]
@ -85,6 +89,11 @@ class DryDockOperator(BaseOperator):
# DrydockClient
if self . action == ' create_drydock_client ' :
# Retrieve Endpoint Information
context [ ' svc_endpoint ' ] = ucp_service_endpoint ( self , context )
logging . info ( " DryDock endpoint is %s " , context [ ' svc_endpoint ' ] )
# Set up DryDock Client
drydock_client = self . drydock_session_client ( context )
return drydock_client
@ -94,157 +103,91 @@ class DryDockOperator(BaseOperator):
task_ids = ' create_drydock_client ' ,
dag_id = self . sub_dag_name + ' .create_drydock_client ' )
# Get Design ID
if self . action == ' get_design_id ' :
design_id = self . drydock_create_design ( drydock_client )
return design_id
# DryDock Load Parts
elif self . action == ' drydock_load_parts ' :
self . parts_type = ' drydock '
self . load_parts ( drydock_client , context , self . parts_type )
# Promenade Load Parts
elif self . action == ' promenade_load_parts ' :
self . parts_type = ' promenade '
self . load_parts ( drydock_client , context , self . parts_type )
# Based on the current logic used in CI/CD pipeline, we will
# point to the nginx server on the genesis host which is hosting
# the drydock YAMLs. That will be the URL for 'design_ref'
# NOTE: This is a temporary hack and will be removed once
# Artifactory or Deckhand is able to host the YAMLs
# NOTE: Testing of the Operator was performed with a nginx server
# on the genesis host that is listening on port 6880. The name of
# the YAML file is 'drydock.yaml'. We will use this assumption
# for now.
# TODO: This logic will be updated once DeckHand is integrated
# with DryDock
logging . info ( " Retrieving information of Tiller pod to obtain Genesis "
" Node IP... " )
# Retrieve Genesis Node IP
genesis_node_ip = self . get_genesis_node_ip ( context )
# Form the URL path that we will use to retrieve the DryDock YAMLs
# Return Exceptions if we are not able to retrieve the URL
if genesis_node_ip :
schema = ' http:// '
nginx_host_port = genesis_node_ip + ' :6880 '
drydock_yaml = ' drydock.yaml '
self . design_ref = os . path . join ( schema ,
nginx_host_port ,
drydock_yaml )
logging . info ( " Drydock YAMLs will be retrieved from %s " ,
self . design_ref )
else :
raise AirflowException ( " Unable to Retrieve Genesis Node IP! " )
# Create Task for verify_site
elif self . action == ' verify_site ' :
self . perform_task = ' verify_site '
verify_site = self . drydock_perform_task ( drydock_client , context ,
self . perform_task , None )
# Define variables
# Query every 10 seconds for 1 minute
interval = 10
time_out = 60
desired_state = ' success '
# Query verify_site Task
task_id = verify_site [ ' task_id ' ]
logging . info ( task_id )
verify_site_status = self . drydock_query_task ( drydock_client ,
interval ,
time_out ,
task_id ,
desired_state )
if verify_site_status == ' timed_out ' :
raise AirflowException ( ' Verify_Site Task Timed Out! ' )
elif verify_site_status == ' task_failed ' :
raise AirflowException ( ' Verify_Site Task Failed! ' )
else :
logging . info ( ' Verify Site Task: ' )
logging . info ( verify_site_status )
if self . action == ' verify_site ' :
# Default settings for 'verify_site' execution is to query
# the task every 10 seconds and to time out after 60 seconds
# TODO: Need to decide if we want to make polling interval and
# time out a variable in the Dags
self . drydock_action ( drydock_client , context , self . action , 10 , 60 )
# Create Task for prepare_site
elif self . action == ' prepare_site ' :
self . perform_task = ' prepare_site '
prepare_site = self . drydock_perform_task ( drydock_client , context ,
self . perform_task , None )
# Define variables
# Query every 10 seconds for 2 minutes
interval = 10
time_out = 120
desired_state = ' partial_success '
# Query prepare_site Task
task_id = prepare_site [ ' task_id ' ]
logging . info ( task_id )
prepare_site_status = self . drydock_query_task ( drydock_client ,
interval ,
time_out ,
task_id ,
desired_state )
if prepare_site_status == ' timed_out ' :
raise AirflowException ( ' Prepare_Site Task Timed Out! ' )
elif prepare_site_status == ' task_failed ' :
raise AirflowException ( ' Prepare_Site Task Failed! ' )
else :
logging . info ( ' Prepare Site Task: ' )
logging . info ( prepare_site_status )
# Default settings for 'prepare_site' execution is to query
# the task every 10 seconds and to time out after 120 seconds
self . drydock_action ( drydock_client , context , self . action , 10 , 120 )
# Create Task for prepare_node
elif self . action == ' prepare_node ' :
self . perform_task = ' prepare_node '
prepare_node = self . drydock_perform_task (
drydock_client , context ,
self . perform_task , workflow_info )
# Define variables
# Query every 30 seconds for 30 minutes
interval = 30
time_out = 1800
desired_state = ' success '
# Query prepare_node Task
task_id = prepare_node [ ' task_id ' ]
logging . info ( task_id )
prepare_node_status = self . drydock_query_task ( drydock_client ,
interval ,
time_out ,
task_id ,
desired_state )
if prepare_node_status == ' timed_out ' :
raise AirflowException ( ' Prepare_Node Task Timed Out! ' )
elif prepare_node_status == ' task_failed ' :
raise AirflowException ( ' Prepare_Node Task Failed! ' )
else :
logging . info ( ' Prepare Node Task: ' )
logging . info ( prepare_node_status )
# Default settings for 'prepare_node' execution is to query
# the task every 30 seconds and to time out after 1800 seconds
self . drydock_action ( drydock_client , context , self . action , 30 , 1800 )
# Create Task for deploy_node
elif self . action == ' deploy_node ' :
self . perform_task = ' deploy_node '
deploy_node = self . drydock_perform_task ( drydock_client ,
context ,
self . perform_task ,
workflow_info )
# Define variables
# Query every 30 seconds for 60 minutes
interval = 30
time_out = 3600
desired_state = ' success '
# Query deploy_node Task
task_id = deploy_node [ ' task_id ' ]
logging . info ( task_id )
deploy_node_status = self . drydock_query_task ( drydock_client ,
interval ,
time_out ,
task_id ,
desired_state )
if deploy_node_status == ' timed_out ' :
raise AirflowException ( ' Deploy_Node Task Timed Out! ' )
elif deploy_node_status == ' task_failed ' :
raise AirflowException ( ' Deploy_Node Task Failed! ' )
else :
logging . info ( ' Deploy Node Task: ' )
logging . info ( deploy_node_status )
# Default settings for 'deploy_node' execution is to query
# the task every 30 seconds and to time out after 3600 seconds
self . drydock_action ( drydock_client , context , self . action , 30 , 3600 )
# Do not perform any action
else :
logging . info ( ' No Action to Perform ' )
@shipyard_service_token
def drydock_session_client ( self , context ) :
# Initialize Variables
drydock_url = None
dd_session = None
dd_client = None
# Parse DryDock Service Endpoint
drydock_url = urlparse ( context [ ' svc_endpoint ' ] )
# Build a DrydockSession with credentials and target host
# information.
logging . info ( " Build DryDock Session " )
dd_session = session . DrydockSession ( self . host , port = self . port ,
dd_session = session . DrydockSession ( drydock_url . hostname ,
port = drydock_url . port ,
token = context [ ' svc_token ' ] )
# Raise Exception if we are not able to get a drydock session
if dd_session :
logging . info ( " Successfully Built DryDock Session" )
logging . info ( " Successfully Set Up DryDock Session " )
else :
raise AirflowException ( " Unable to get a Drydock Session " )
raise AirflowException ( " Failed to set up Drydock Session! " )
# Use session to build a DrydockClient to make one or more API calls
# The DrydockSession will care for TCP connection pooling
@ -254,126 +197,102 @@ class DryDockOperator(BaseOperator):
# Raise Exception if we are not able to build drydock client
if dd_client :
logging . info ( " Successfully Built DryDock client" )
logging . info ( " Successfully Set Up DryDock client" )
else :
raise AirflowException ( " Unable to Build Drydock Client " )
raise AirflowException ( " Unable to set up Drydock Client! " )
# Drydock client for XCOM Usage
return dd_client
def drydock_create_design(self, drydock_client):
    """Request a new design from DryDock and return its ID.

    :param drydock_client: client object whose ``create_design()`` is
        called with no arguments
    :returns: the value returned by ``create_design()``
    :raises AirflowException: when ``create_design()`` returns a falsy
        value
    """
    logging.info(' Create Design ID ')
    new_design_id = drydock_client.create_design()

    # Guard clause: an empty/None response means no design was created
    if not new_design_id:
        raise AirflowException(" Unable to create Design ID ")

    return new_design_id
def get_design_id(self, context):
    """Return the design ID that an earlier task pushed to XCOM.

    :param context: mapping that holds the task instance
    :returns: whatever ``xcom_pull`` returns for the design-ID task
    """
    # NOTE(review): string literals are kept exactly as they appear in
    # this file (space-padded); confirm them against the real keys/ids.
    task_instance = context[' task_instance ']

    design_id = task_instance.xcom_pull(
        task_ids=' drydock_get_design_id ',
        dag_id=self.sub_dag_name + ' .drydock_get_design_id ')

    return design_id


def drydock_action(self, drydock_client, context, action, interval,
                   time_out):
    """Create a DryDock task for ``action`` and poll it.

    :param drydock_client: client handed to the task helpers
    :param context: passed through to ``drydock_perform_task``
    :param action: name of the task to create
    :param interval: polling interval handed to ``drydock_query_task``
    :param time_out: time out handed to ``drydock_query_task``
    """
    # Trigger DryDock to execute task and retrieve task ID
    task_id = self.drydock_perform_task(drydock_client, context,
                                        action, None)

    logging.info(' Task ID is %s ', task_id)

    # Query Task
    self.drydock_query_task(drydock_client, interval, time_out,
                            task_id)


def load_parts(self, drydock_client, context, parts_type):
    """Load design parts from a YAML file into the current design.

    Reads ``self.drydock_conf`` when ``self.parts_type`` is the drydock
    marker and ``self.promenade_conf`` otherwise, then submits the text
    through ``drydock_client.load_parts``.

    :param drydock_client: client whose ``load_parts`` is called
    :param context: used to look up the design ID
    :param parts_type: kind of parts to load; the method reads
        ``self.parts_type``, which callers set before calling
    :raises AirflowException: when the load returns an empty result
    """
    # Open drydock.yaml/promenade.yaml as string so that it can be
    # ingested. This step will change in future when DeckHand is
    # integrated with the system
    if self.parts_type == ' drydock ':
        yaml_path = self.drydock_conf
    else:
        yaml_path = self.promenade_conf

    # Mode must be exactly "r"; a space-padded mode string is rejected
    # by open() with ValueError.
    with open(yaml_path, "r") as yaml_file:
        yaml_string = yaml_file.read()

    # Get Design ID and pass it to DryDock
    self.design_id = self.get_design_id(context)

    # Load Design
    logging.info(" Load %s Configuration Yaml ", self.parts_type)
    load_design = drydock_client.load_parts(self.design_id,
                                            yaml_string=yaml_string)

    # Return Exception if list is empty
    if not load_design:
        raise AirflowException(" Empty Design. Please check input Yaml. ")

    logging.info(load_design)
def drydock_perform_task(self, drydock_client, context,
                         perform_task, nodes_filter):
    """Create a DryDock task and return its task ID.

    The task is created against ``self.design_ref``. The return value is
    the bare task ID, which is what ``drydock_action`` passes on to
    ``drydock_query_task``.

    :param drydock_client: client whose ``create_task`` is called
    :param context: not read by this method
    :param perform_task: passed to ``create_task`` as ``task_action``
    :param nodes_filter: passed to ``create_task`` as ``node_filter``
    :returns: the task ID found in the ``create_task`` response
    :raises AirflowException: when the response carries no task ID
    """
    # Create Task
    create_task_response = drydock_client.create_task(
        design_ref=self.design_ref,
        task_action=perform_task,
        node_filter=nodes_filter)

    # Retrieve Task ID
    # NOTE(review): the key literal is kept exactly as it appears in this
    # file (space-padded); confirm against the DryDock response schema.
    task_id = create_task_response.get(' task_id ')
    logging.info(' Drydock %s task ID is %s ', perform_task, task_id)

    # Raise Exception if we are not able to get the task_id from
    # drydock
    if not task_id:
        raise AirflowException(" Unable to create task! ")

    return task_id
def drydock_query_task(self, drydock_client, interval, time_out, task_id,
                       desired_state=None):
    """Poll a DryDock task until it completes, fails or times out.

    :param drydock_client: client whose ``get_task`` is polled
    :param interval: seconds to sleep between polls
    :param time_out: total seconds to keep polling; the task is polled
        ``int(time_out / interval) + 1`` times at most
    :param task_id: ID of the task to poll
    :param desired_state: accepted so callers that still pass a fifth
        argument keep working; it is not used to judge the result
    :raises AirflowException: when the task is still running on the last
        poll, or when its final result is not a success value
    """
    # Calculate number of times to execute the 'for' loop
    end_range = int(time_out / interval)

    # NOTE(review): key and state literals are kept exactly as they
    # appear in this file (space-padded); confirm against the DryDock API.
    for i in range(0, end_range + 1):
        # Retrieve current task state
        task_state = drydock_client.get_task(task_id=task_id)
        task_status = task_state.get(' status ')
        task_results = task_state.get(' result ')[' status ']

        logging.info(" Current status of task id %s is %s ",
                     task_id, task_status)

        # Raise Time Out Exception
        if task_status == ' running ' and i == end_range:
            raise AirflowException(" Task Execution Timed Out! ")

        # Exit 'for' loop if task is in 'complete' state
        if task_status == ' complete ':
            break
        else:
            time.sleep(interval)

    # Get final task state
    # NOTE: There is a known bug in Drydock where the task result
    # for a successfully completed task can either be 'success' or
    # 'partial success'. This will be fixed in Drydock in the near
    # future. Updates will be made to the Drydock Operator once the
    # bug is fixed.
    if task_results in [' success ', ' partial_success ']:
        # Log the task_id argument: this method is given the ID by its
        # caller and must not depend on a self.task_id attribute.
        logging.info(' Task id %s has been successfully completed ',
                     task_id)
    else:
        raise AirflowException(" Failed to execute/complete task! ")
@get_pod_port_ip(' tiller ')
def get_genesis_node_ip(self, context, *args):
    """Return the IP recorded for the Tiller pod.

    Tiller takes the IP of the Genesis Node, so the Tiller pod's IP is
    used as the Genesis Node IP.

    :param context: mapping holding the pods' IP/port information;
        presumably filled in by the decorator (confirm in
        get_k8s_pod_port_ip)
    :returns: the Tiller entry's IP, or None when the entry has none
    """
    tiller_pod = context[' pods_ip_port '][' tiller ']
    return tiller_pod.get(' ip ')
class DryDockClientPlugin ( AirflowPlugin ) :