diff --git a/shipyard_airflow/dags/drydock_deploy_site.py b/shipyard_airflow/dags/drydock_deploy_site.py index d3f62507..ad0242b5 100644 --- a/shipyard_airflow/dags/drydock_deploy_site.py +++ b/shipyard_airflow/dags/drydock_deploy_site.py @@ -12,8 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import configparser - from airflow.models import DAG from airflow.operators.subdag_operator import SubDagOperator from airflow.operators import DryDockOperator @@ -22,177 +20,39 @@ from airflow.operators import DryDockOperator # Location of shiyard.conf config_path = '/usr/local/airflow/plugins/shipyard.conf' -# Read and parse shiyard.conf -config = configparser.ConfigParser() -config.read(config_path) - -# Define Variables -drydock_target_host = config.get('drydock', 'host') -drydock_port = config.get('drydock', 'port') -drydock_conf = config.get('drydock', 'site_yaml') -promenade_conf = config.get('drydock', 'prom_yaml') -parent_dag = 'deploy_site' -child_dag = 'deploy_site.drydock_build' - - -def create_drydock_client(parent_dag_name, child_dag_name, args): - ''' - Create Drydock Client - ''' - dag = DAG( - '{}.{}'.format(parent_dag_name, child_dag_name), - default_args=args, ) - - operator = DryDockOperator( - task_id='create_drydock_client', - host=drydock_target_host, - port=drydock_port, - shipyard_conf=config_path, - action='create_drydock_client', - main_dag_name=parent_dag, - sub_dag_name=child_dag, - dag=dag) - - return dag - - -def drydock_get_design_id(parent_dag_name, child_dag_name, args): - ''' - Get Design ID - ''' - dag = DAG( - '{}.{}'.format(parent_dag_name, child_dag_name), - default_args=args, ) - - operator = DryDockOperator( - task_id='drydock_get_design_id', - action='get_design_id', - main_dag_name=parent_dag, - sub_dag_name=child_dag, - dag=dag) - - return dag - - -def drydock_load_parts(parent_dag_name, child_dag_name, args): - ''' - Load DryDock Yaml - ''' - dag = DAG( - '{}.{}'.format(parent_dag_name, child_dag_name), - default_args=args, ) - - operator = DryDockOperator( - task_id='drydock_load_parts', - drydock_conf=drydock_conf, - action='drydock_load_parts', - main_dag_name=parent_dag, - sub_dag_name=child_dag, - dag=dag) - - return dag - - -def promenade_load_parts(parent_dag_name, child_dag_name, args): - ''' - Load Promenade Yaml - ''' - dag = DAG( - '{}.{}'.format(parent_dag_name, child_dag_name), - default_args=args, ) - - operator = DryDockOperator( - task_id='promenade_load_parts', - promenade_conf=promenade_conf, - action='promenade_load_parts', - main_dag_name=parent_dag, - sub_dag_name=child_dag, - dag=dag) - - return dag - - -def drydock_verify_site(parent_dag_name, child_dag_name, args): - ''' - Verify connectivity between DryDock and MAAS - ''' - dag = DAG( - '{}.{}'.format(parent_dag_name, child_dag_name), - default_args=args, ) - - operator = DryDockOperator( - task_id='drydock_verify_site', - action='verify_site', - main_dag_name=parent_dag, - sub_dag_name=child_dag, - dag=dag) - - return dag - - -def drydock_prepare_site(parent_dag_name, child_dag_name, args): - ''' - Prepare site for deployment - ''' - dag = DAG( - '{}.{}'.format(parent_dag_name, child_dag_name), - default_args=args, ) - - operator = DryDockOperator( - task_id='drydock_prepare_site', - action='prepare_site', - main_dag_name=parent_dag, - sub_dag_name=child_dag, - dag=dag) - - return dag - - -def drydock_prepare_node(parent_dag_name, child_dag_name, args): - ''' - Prepare nodes for deployment - ''' - dag = DAG( - '{}.{}'.format(parent_dag_name, child_dag_name), - default_args=args, ) - - operator = DryDockOperator( - task_id='drydock_prepare_node', - action='prepare_node', - main_dag_name=parent_dag, - sub_dag_name=child_dag, - dag=dag) - - return dag - - -def drydock_deploy_node(parent_dag_name, child_dag_name, args): - ''' - Deploy Nodes - ''' - dag = DAG( - '{}.{}'.format(parent_dag_name, child_dag_name), - default_args=args, ) - - operator = DryDockOperator( - task_id='drydock_deploy_node', - action='deploy_node', - main_dag_name=parent_dag, - sub_dag_name=child_dag, - dag=dag) - - return dag - - # Names used for sub-subdags in the drydock site deployment subdag CREATE_DRYDOCK_CLIENT_DAG_NAME = 'create_drydock_client' -DRYDOCK_GET_DESIGN_ID_DAG_NAME = 'drydock_get_design_id' -DRYDOCK_LOAD_PARTS_DAG_NAME = 'drydock_load_parts' -PROMENADE_LOAD_PARTS_DAG_NAME = 'promenade_load_parts' -DRYDOCK_VERIFY_SITE_DAG_NAME = 'drydock_verify_site' -DRYDOCK_PREPARE_SITE_DAG_NAME = 'drydock_prepare_site' -DRYDOCK_PREPARE_NODE_DAG_NAME = 'drydock_prepare_node' -DRYDOCK_DEPLOY_NODE_DAG_NAME = 'drydock_deploy_node' +DRYDOCK_VERIFY_SITE_DAG_NAME = 'verify_site' +DRYDOCK_PREPARE_SITE_DAG_NAME = 'prepare_site' +DRYDOCK_PREPARE_NODE_DAG_NAME = 'prepare_node' +DRYDOCK_DEPLOY_NODE_DAG_NAME = 'deploy_node' + + +def get_drydock_subdag_step(parent_dag_name, child_dag_name, args): + ''' + Execute DryDock Subdag + ''' + dag = DAG( + '{}.{}'.format(parent_dag_name, child_dag_name), + default_args=args) + # Note that in the event where the 'deploy_site' action is + # triggered from Shipyard, the 'parent_dag_name' variable + # gets assigned with 'deploy_site.create_drydock_client'. + # This is the name that we want to assign to the subdag so + # that we can reference it for xcom. The name of the main + # dag will be the front part of that value, i.e. 'deploy_site'. + # Hence we will extract the front part and assign it to main_dag. + # We will reuse this pattern for other Actions, e.g. update_site, + # redeploy_site as well. + operator = DryDockOperator( + task_id=child_dag_name, + shipyard_conf=config_path, + action=child_dag_name, + main_dag_name=parent_dag_name[0:parent_dag_name.find('.')], + sub_dag_name=parent_dag_name, + dag=dag) + + return dag def deploy_site_drydock(parent_dag_name, child_dag_name, args): @@ -201,64 +61,47 @@ def deploy_site_drydock(parent_dag_name, child_dag_name, args): ''' dag = DAG( '{}.{}'.format(parent_dag_name, child_dag_name), - default_args=args, ) + default_args=args) drydock_client = SubDagOperator( - subdag=create_drydock_client(dag.dag_id, - CREATE_DRYDOCK_CLIENT_DAG_NAME, - args), + subdag=get_drydock_subdag_step(dag.dag_id, + CREATE_DRYDOCK_CLIENT_DAG_NAME, + args), task_id=CREATE_DRYDOCK_CLIENT_DAG_NAME, - dag=dag, ) + dag=dag) - design_id = SubDagOperator( - subdag=drydock_get_design_id( - dag.dag_id, DRYDOCK_GET_DESIGN_ID_DAG_NAME, args), - task_id=DRYDOCK_GET_DESIGN_ID_DAG_NAME, - dag=dag, ) - - drydock_yaml = SubDagOperator( - subdag=drydock_load_parts( - dag.dag_id, DRYDOCK_LOAD_PARTS_DAG_NAME, args), - task_id=DRYDOCK_LOAD_PARTS_DAG_NAME, - dag=dag, ) - - promenade_yaml = SubDagOperator( - subdag=promenade_load_parts(dag.dag_id, - PROMENADE_LOAD_PARTS_DAG_NAME, args), - task_id=PROMENADE_LOAD_PARTS_DAG_NAME, - dag=dag, ) - - verify_site = SubDagOperator( - subdag=drydock_verify_site(dag.dag_id, - DRYDOCK_VERIFY_SITE_DAG_NAME, args), + drydock_verify_site = SubDagOperator( + subdag=get_drydock_subdag_step(dag.dag_id, + DRYDOCK_VERIFY_SITE_DAG_NAME, + args), task_id=DRYDOCK_VERIFY_SITE_DAG_NAME, - dag=dag, ) + dag=dag) - prepare_site = SubDagOperator( - subdag=drydock_prepare_site(dag.dag_id, - DRYDOCK_PREPARE_SITE_DAG_NAME, args), + drydock_prepare_site = SubDagOperator( + subdag=get_drydock_subdag_step(dag.dag_id, + DRYDOCK_PREPARE_SITE_DAG_NAME, + args), task_id=DRYDOCK_PREPARE_SITE_DAG_NAME, - dag=dag, ) + dag=dag) - prepare_node = SubDagOperator( - subdag=drydock_prepare_node(dag.dag_id, - DRYDOCK_PREPARE_NODE_DAG_NAME, args), + drydock_prepare_node = SubDagOperator( + subdag=get_drydock_subdag_step(dag.dag_id, + DRYDOCK_PREPARE_NODE_DAG_NAME, + args), task_id=DRYDOCK_PREPARE_NODE_DAG_NAME, - dag=dag, ) + dag=dag) - deploy_node = SubDagOperator( - subdag=drydock_deploy_node(dag.dag_id, - DRYDOCK_DEPLOY_NODE_DAG_NAME, args), + drydock_deploy_node = SubDagOperator( + subdag=get_drydock_subdag_step(dag.dag_id, + DRYDOCK_DEPLOY_NODE_DAG_NAME, + args), task_id=DRYDOCK_DEPLOY_NODE_DAG_NAME, - dag=dag, ) + dag=dag) # DAG Wiring - design_id.set_upstream(drydock_client) - drydock_yaml.set_upstream(design_id) - promenade_yaml.set_upstream(drydock_yaml) - verify_site.set_upstream(promenade_yaml) - prepare_site.set_upstream(verify_site) - prepare_node.set_upstream(prepare_site) - deploy_node.set_upstream(prepare_node) + drydock_verify_site.set_upstream(drydock_client) + drydock_prepare_site.set_upstream(drydock_verify_site) + drydock_prepare_node.set_upstream(drydock_prepare_site) + drydock_deploy_node.set_upstream(drydock_prepare_node) return dag diff --git a/shipyard_airflow/plugins/drydock_operators.py b/shipyard_airflow/plugins/drydock_operators.py index 1bea8baa..45977a36 100644 --- a/shipyard_airflow/plugins/drydock_operators.py +++ b/shipyard_airflow/plugins/drydock_operators.py @@ -13,7 +13,9 @@ # limitations under the License. import logging +import os import time +from urllib.parse import urlparse from airflow.exceptions import AirflowException from airflow.models import BaseOperator @@ -22,53 +24,55 @@ from airflow.utils.decorators import apply_defaults import drydock_provisioner.drydock_client.client as client import drydock_provisioner.drydock_client.session as session - +from get_k8s_pod_port_ip import get_pod_port_ip +from service_endpoint import ucp_service_endpoint from service_token import shipyard_service_token class DryDockOperator(BaseOperator): """ DryDock Client - :host: Target Host - :port: DryDock Port - :shipyard_conf: Location of shipyard.conf - :drydock_conf: Location of drydock YAML - :promenade_conf: Location of promenade YAML - :action: Task to perform - :design_id: DryDock Design ID - :workflow_info: Information related to the workflow - :main_dag_name: Parent Dag - :sub_dag_name: Child Dag + :param action: Task to perform + :param design_ref: A URI reference to the design documents + :param main_dag_name: Parent Dag + :param node_filter: A filter for narrowing the scope of the task. Valid + fields are 'node_names', 'rack_names', 'node_tags' + :param shipyard_conf: Location of shipyard.conf + :param sub_dag_name: Child Dag + :param workflow_info: Information related to the workflow """ @apply_defaults def __init__(self, - host=None, - port=None, action=None, - design_id=None, - shipyard_conf=None, - drydock_conf=None, - promenade_conf=None, - workflow_info={}, + design_ref=None, main_dag_name=None, + node_filter=None, + shipyard_conf=None, sub_dag_name=None, + workflow_info={}, xcom_push=True, *args, **kwargs): super(DryDockOperator, self).__init__(*args, **kwargs) - self.host = host - self.port = port - self.shipyard_conf = shipyard_conf - self.drydock_conf = drydock_conf - self.promenade_conf = promenade_conf self.action = action - self.design_id = design_id - self.workflow_info = workflow_info + self.design_ref = design_ref self.main_dag_name = main_dag_name + self.node_filter = node_filter + self.shipyard_conf = shipyard_conf self.sub_dag_name = sub_dag_name + self.workflow_info = workflow_info self.xcom_push_flag = xcom_push def execute(self, context): + # Initialize Variables + context['svc_type'] = 'physicalprovisioner' + genesis_node_ip = None + + # Placeholder definition + # TODO: Need to decide how to pass the required value from Shipyard to + # the 'node_filter' variable. No filter will be used for now. + self.node_filter = None + # Define task_instance task_instance = context['task_instance'] @@ -85,6 +89,11 @@ class DryDockOperator(BaseOperator): # DrydockClient if self.action == 'create_drydock_client': + # Retrieve Endpoint Information + context['svc_endpoint'] = ucp_service_endpoint(self, context) + logging.info("DryDock endpoint is %s", context['svc_endpoint']) + + # Set up DryDock Client drydock_client = self.drydock_session_client(context) return drydock_client @@ -94,157 +103,91 @@ class DryDockOperator(BaseOperator): task_ids='create_drydock_client', dag_id=self.sub_dag_name + '.create_drydock_client') - # Get Design ID - if self.action == 'get_design_id': - design_id = self.drydock_create_design(drydock_client) + # Based on the current logic used in CI/CD pipeline, we will + # point to the nginx server on the genesis host which is hosting + # the drydock YAMLs. That will be the URL for 'design_ref' + # NOTE: This is a temporary hack and will be removed once + # Artifactory or Deckhand is able to host the YAMLs + # NOTE: Testing of the Operator was performed with a nginx server + # on the genesis host that is listening on port 6880. The name of + # the YAML file is 'drydock.yaml'. We will use this assumption + # for now. + # TODO: This logic will be updated once DeckHand is integrated + # with DryDock + logging.info("Retrieving information of Tiller pod to obtain Genesis " + "Node IP...") - return design_id + # Retrieve Genesis Node IP + genesis_node_ip = self.get_genesis_node_ip(context) - # DryDock Load Parts - elif self.action == 'drydock_load_parts': - self.parts_type = 'drydock' - self.load_parts(drydock_client, context, self.parts_type) + # Form the URL path that we will use to retrieve the DryDock YAMLs + # Return Exceptions if we are not able to retrieve the URL + if genesis_node_ip: + schema = 'http://' + nginx_host_port = genesis_node_ip + ':6880' + drydock_yaml = 'drydock.yaml' + self.design_ref = os.path.join(schema, + nginx_host_port, + drydock_yaml) - # Promenade Load Parts - elif self.action == 'promenade_load_parts': - self.parts_type = 'promenade' - self.load_parts(drydock_client, context, self.parts_type) + logging.info("Drydock YAMLs will be retrieved from %s", + self.design_ref) + else: + raise AirflowException("Unable to Retrieve Genesis Node IP!") # Create Task for verify_site - elif self.action == 'verify_site': - self.perform_task = 'verify_site' - verify_site = self.drydock_perform_task(drydock_client, context, - self.perform_task, None) + if self.action == 'verify_site': - # Define variables - # Query every 10 seconds for 1 minute - interval = 10 - time_out = 60 - desired_state = 'success' - - # Query verify_site Task - task_id = verify_site['task_id'] - logging.info(task_id) - verify_site_status = self.drydock_query_task(drydock_client, - interval, - time_out, - task_id, - desired_state) - - if verify_site_status == 'timed_out': - raise AirflowException('Verify_Site Task Timed Out!') - elif verify_site_status == 'task_failed': - raise AirflowException('Verify_Site Task Failed!') - else: - logging.info('Verify Site Task:') - logging.info(verify_site_status) + # Default settings for 'verify_site' execution is to query + # the task every 10 seconds and to time out after 60 seconds + # TODO: Need to decide if we want to make polling interval and + # time out a variable in the Dags + self.drydock_action(drydock_client, context, self.action, 10, 60) # Create Task for prepare_site elif self.action == 'prepare_site': - self.perform_task = 'prepare_site' - prepare_site = self.drydock_perform_task(drydock_client, context, - self.perform_task, None) - - # Define variables - # Query every 10 seconds for 2 minutes - interval = 10 - time_out = 120 - desired_state = 'partial_success' - - # Query prepare_site Task - task_id = prepare_site['task_id'] - logging.info(task_id) - prepare_site_status = self.drydock_query_task(drydock_client, - interval, - time_out, - task_id, - desired_state) - - if prepare_site_status == 'timed_out': - raise AirflowException('Prepare_Site Task Timed Out!') - elif prepare_site_status == 'task_failed': - raise AirflowException('Prepare_Site Task Failed!') - else: - logging.info('Prepare Site Task:') - logging.info(prepare_site_status) + # Default settings for 'prepare_site' execution is to query + # the task every 10 seconds and to time out after 120 seconds + self.drydock_action(drydock_client, context, self.action, 10, 120) # Create Task for prepare_node elif self.action == 'prepare_node': - self.perform_task = 'prepare_node' - prepare_node = self.drydock_perform_task( - drydock_client, context, - self.perform_task, workflow_info) - - # Define variables - # Query every 30 seconds for 30 minutes - interval = 30 - time_out = 1800 - desired_state = 'success' - - # Query prepare_node Task - task_id = prepare_node['task_id'] - logging.info(task_id) - prepare_node_status = self.drydock_query_task(drydock_client, - interval, - time_out, - task_id, - desired_state) - - if prepare_node_status == 'timed_out': - raise AirflowException('Prepare_Node Task Timed Out!') - elif prepare_node_status == 'task_failed': - raise AirflowException('Prepare_Node Task Failed!') - else: - logging.info('Prepare Node Task:') - logging.info(prepare_node_status) + # Default settings for 'prepare_node' execution is to query + # the task every 30 seconds and to time out after 1800 seconds + self.drydock_action(drydock_client, context, self.action, 30, 1800) # Create Task for deploy_node elif self.action == 'deploy_node': - self.perform_task = 'deploy_node' - deploy_node = self.drydock_perform_task(drydock_client, - context, - self.perform_task, - workflow_info) + # Default settings for 'deploy_node' execution is to query + # the task every 30 seconds and to time out after 3600 seconds + self.drydock_action(drydock_client, context, self.action, 30, 3600) - # Define variables - # Query every 30 seconds for 60 minutes - interval = 30 - time_out = 3600 - desired_state = 'success' - - # Query deploy_node Task - task_id = deploy_node['task_id'] - logging.info(task_id) - deploy_node_status = self.drydock_query_task(drydock_client, - interval, - time_out, - task_id, - desired_state) - - if deploy_node_status == 'timed_out': - raise AirflowException('Deploy_Node Task Timed Out!') - elif deploy_node_status == 'task_failed': - raise AirflowException('Deploy_Node Task Failed!') - else: - logging.info('Deploy Node Task:') - logging.info(deploy_node_status) + # Do not perform any action else: logging.info('No Action to Perform') @shipyard_service_token def drydock_session_client(self, context): + # Initialize Variables + drydock_url = None + dd_session = None + dd_client = None + + # Parse DryDock Service Endpoint + drydock_url = urlparse(context['svc_endpoint']) # Build a DrydockSession with credentials and target host # information. logging.info("Build DryDock Session") - dd_session = session.DrydockSession(self.host, port=self.port, + dd_session = session.DrydockSession(drydock_url.hostname, + port=drydock_url.port, token=context['svc_token']) # Raise Exception if we are not able to get a drydock session if dd_session: - logging.info("Successfully Built DryDock Session") + logging.info("Successfully Set Up DryDock Session") else: - raise AirflowException("Unable to get a Drydock Session") + raise AirflowException("Failed to set up Drydock Session!") # Use session to build a DrydockClient to make one or more API calls # The DrydockSession will care for TCP connection pooling @@ -254,126 +197,102 @@ class DryDockOperator(BaseOperator): # Raise Exception if we are not able to build drydock client if dd_client: - logging.info("Successfully Built DryDock client") + logging.info("Successfully Set Up DryDock client") else: - raise AirflowException("Unable to Build Drydock Client") + raise AirflowException("Unable to set up Drydock Client!") # Drydock client for XCOM Usage return dd_client - def drydock_create_design(self, drydock_client): + def drydock_action(self, drydock_client, context, action, interval, + time_out): - # Create Design - logging.info('Create Design ID') - drydock_design_id = drydock_client.create_design() + # Trigger DryDock to execute task and retrieve task ID + task_id = self.drydock_perform_task(drydock_client, context, + action, None) - # Raise Exception if we are not able to get a value - # from drydock create_design API call - if drydock_design_id: - return drydock_design_id - else: - raise AirflowException("Unable to create Design ID") + logging.info('Task ID is %s', task_id) - def get_design_id(self, context): - - # Get Design ID from XCOM - task_instance = context['task_instance'] - design_id = task_instance.xcom_pull( - task_ids='drydock_get_design_id', - dag_id=self.sub_dag_name + '.drydock_get_design_id') - - return design_id - - def load_parts(self, drydock_client, context, parts_type): - - # Load new design parts into a design context via YAML conforming - # to the Drydock design YAML schema - - # Open drydock.yaml/promenade.yaml as string so that it can be - # ingested. This step will change in future when DeckHand is - # integrated with the system - if self.parts_type == 'drydock': - with open(self.drydock_conf, "r") as drydock_yaml: - yaml_string = drydock_yaml.read() - else: - with open(self.promenade_conf, "r") as promenade_yaml: - yaml_string = promenade_yaml.read() - - # Get Design ID and pass it to DryDock - self.design_id = self.get_design_id(context) - - # Load Design - # Return Exception if list is empty - logging.info("Load %s Configuration Yaml", self.parts_type) - load_design = drydock_client.load_parts(self.design_id, - yaml_string=yaml_string) - - if len(load_design) == 0: - raise AirflowException("Empty Design. Please check input Yaml.") - else: - logging.info(load_design) + # Query Task + self.drydock_query_task(drydock_client, interval, time_out, + task_id) def drydock_perform_task(self, drydock_client, context, - perform_task, workflow_info): + perform_task, nodes_filter): - # Get Design ID and pass it to DryDock - self.design_id = self.get_design_id(context) - - # Task to do - task_to_perform = self.perform_task + # Initialize Variables + create_task_response = {} + task_id = None # Node Filter - if workflow_info: - nodes_filter = workflow_info['parameters']['servername'] - else: - nodes_filter = None - logging.info("Nodes Filter List: %s", nodes_filter) - # Get uuid of the create_task's id - self.task_id = drydock_client.create_task(self.design_id, - task_to_perform, - nodes_filter) + # Create Task + create_task_response = drydock_client.create_task( + design_ref=self.design_ref, + task_action=perform_task, + node_filter=nodes_filter) - # Get current state/response of the drydock task - task_status = drydock_client.get_task(self.task_id) + # Retrieve Task ID + task_id = create_task_response.get('task_id') + logging.info('Drydock %s task ID is %s', perform_task, task_id) - # task_status should contain information and be of length > 0 - # Raise Exception if that is not the case - if len(task_status) == 0: - raise AirflowException("Unable to get task state") + # Raise Exception if we are not able to get the task_id from + # drydock + if task_id: + return task_id else: - return task_status + raise AirflowException("Unable to create task!") - def drydock_query_task(self, drydock_client, interval, time_out, - task_id, desired_state): + def drydock_query_task(self, drydock_client, interval, time_out, task_id): # Calculate number of times to execute the 'for' loop end_range = int(time_out / interval) - # Query task state + # Query task status for i in range(0, end_range + 1): # Retrieve current task state - task_state = drydock_client.get_task(self.task_id) - logging.info(task_state) + task_state = drydock_client.get_task(task_id=task_id) + task_status = task_state.get('status') + task_results = task_state.get('result')['status'] - # Return Time Out Exception - if task_state['status'] == 'running' and i == end_range: - logging.info('Timed Out!') - return 'timed_out' + logging.info("Current status of task id %s is %s", + task_id, task_status) + + # Raise Time Out Exception + if task_status == 'running' and i == end_range: + raise AirflowException("Task Execution Timed Out!") # Exit 'for' loop if task is in 'complete' state - if task_state['status'] == 'complete': + if task_status == 'complete': break else: time.sleep(interval) # Get final task state - if task_state['result'] == desired_state: - return drydock_client.get_task(self.task_id) + # NOTE: There is a known bug in Drydock where the task result + # for a successfully completed task can either be 'success' or + # 'partial success'. This will be fixed in Drydock in the near + # future. Updates will be made to the Drydock Operator once the + # bug is fixed. + if task_results in ['success', 'partial_success']: + logging.info('Task id %s has been successfully completed', + self.task_id) else: - return 'task_failed' + raise AirflowException("Failed to execute/complete task!") + + @get_pod_port_ip('tiller') + def get_genesis_node_ip(self, context, *args): + + # Get IP and port information of Pods from context + k8s_pods_ip_port = context['pods_ip_port'] + + # Tiller will take the IP of the Genesis Node. Retrieve + # the IP of tiller to get the IP of the Genesis Node + genesis_ip = k8s_pods_ip_port['tiller'].get('ip') + + return genesis_ip class DryDockClientPlugin(AirflowPlugin):