From 1a07818711fd027fa55ce4a62789292c8c5ea7d6 Mon Sep 17 00:00:00 2001 From: Anthony Lin Date: Sun, 22 Oct 2017 05:04:21 +0000 Subject: [PATCH] Revamp DryDock Operator and Dag There has been changes to the way in which DryDock YAMLs are consumed by DryDock. We will make use of a self-hosted NGINX container on the Genesis Host to house the YAMLs while waiting for the integration work to be completed between DryDock and Deckhand. This is the current approach taken by CI/CD. This P.S. is meant to align Shipyard/Airflow with the current implementations in CI/CD Pipeline. Change-Id: I7136aa74e33f980589820c75159fa3259ee79976 --- shipyard_airflow/dags/drydock_deploy_site.py | 277 +++---------- shipyard_airflow/plugins/drydock_operators.py | 381 +++++++----------- 2 files changed, 210 insertions(+), 448 deletions(-) diff --git a/shipyard_airflow/dags/drydock_deploy_site.py b/shipyard_airflow/dags/drydock_deploy_site.py index d3f62507..ad0242b5 100644 --- a/shipyard_airflow/dags/drydock_deploy_site.py +++ b/shipyard_airflow/dags/drydock_deploy_site.py @@ -12,8 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import configparser - from airflow.models import DAG from airflow.operators.subdag_operator import SubDagOperator from airflow.operators import DryDockOperator @@ -22,177 +20,39 @@ from airflow.operators import DryDockOperator # Location of shiyard.conf config_path = '/usr/local/airflow/plugins/shipyard.conf' -# Read and parse shiyard.conf -config = configparser.ConfigParser() -config.read(config_path) - -# Define Variables -drydock_target_host = config.get('drydock', 'host') -drydock_port = config.get('drydock', 'port') -drydock_conf = config.get('drydock', 'site_yaml') -promenade_conf = config.get('drydock', 'prom_yaml') -parent_dag = 'deploy_site' -child_dag = 'deploy_site.drydock_build' - - -def create_drydock_client(parent_dag_name, child_dag_name, args): - ''' - Create Drydock Client - ''' - dag = DAG( - '{}.{}'.format(parent_dag_name, child_dag_name), - default_args=args, ) - - operator = DryDockOperator( - task_id='create_drydock_client', - host=drydock_target_host, - port=drydock_port, - shipyard_conf=config_path, - action='create_drydock_client', - main_dag_name=parent_dag, - sub_dag_name=child_dag, - dag=dag) - - return dag - - -def drydock_get_design_id(parent_dag_name, child_dag_name, args): - ''' - Get Design ID - ''' - dag = DAG( - '{}.{}'.format(parent_dag_name, child_dag_name), - default_args=args, ) - - operator = DryDockOperator( - task_id='drydock_get_design_id', - action='get_design_id', - main_dag_name=parent_dag, - sub_dag_name=child_dag, - dag=dag) - - return dag - - -def drydock_load_parts(parent_dag_name, child_dag_name, args): - ''' - Load DryDock Yaml - ''' - dag = DAG( - '{}.{}'.format(parent_dag_name, child_dag_name), - default_args=args, ) - - operator = DryDockOperator( - task_id='drydock_load_parts', - drydock_conf=drydock_conf, - action='drydock_load_parts', - main_dag_name=parent_dag, - sub_dag_name=child_dag, - dag=dag) - - return dag - - -def promenade_load_parts(parent_dag_name, child_dag_name, args): - ''' - Load Promenade Yaml - ''' - dag = DAG( - '{}.{}'.format(parent_dag_name, child_dag_name), - default_args=args, ) - - operator = DryDockOperator( - task_id='promenade_load_parts', - promenade_conf=promenade_conf, - action='promenade_load_parts', - main_dag_name=parent_dag, - sub_dag_name=child_dag, - dag=dag) - - return dag - - -def drydock_verify_site(parent_dag_name, child_dag_name, args): - ''' - Verify connectivity between DryDock and MAAS - ''' - dag = DAG( - '{}.{}'.format(parent_dag_name, child_dag_name), - default_args=args, ) - - operator = DryDockOperator( - task_id='drydock_verify_site', - action='verify_site', - main_dag_name=parent_dag, - sub_dag_name=child_dag, - dag=dag) - - return dag - - -def drydock_prepare_site(parent_dag_name, child_dag_name, args): - ''' - Prepare site for deployment - ''' - dag = DAG( - '{}.{}'.format(parent_dag_name, child_dag_name), - default_args=args, ) - - operator = DryDockOperator( - task_id='drydock_prepare_site', - action='prepare_site', - main_dag_name=parent_dag, - sub_dag_name=child_dag, - dag=dag) - - return dag - - -def drydock_prepare_node(parent_dag_name, child_dag_name, args): - ''' - Prepare nodes for deployment - ''' - dag = DAG( - '{}.{}'.format(parent_dag_name, child_dag_name), - default_args=args, ) - - operator = DryDockOperator( - task_id='drydock_prepare_node', - action='prepare_node', - main_dag_name=parent_dag, - sub_dag_name=child_dag, - dag=dag) - - return dag - - -def drydock_deploy_node(parent_dag_name, child_dag_name, args): - ''' - Deploy Nodes - ''' - dag = DAG( - '{}.{}'.format(parent_dag_name, child_dag_name), - default_args=args, ) - - operator = DryDockOperator( - task_id='drydock_deploy_node', - action='deploy_node', - main_dag_name=parent_dag, - sub_dag_name=child_dag, - dag=dag) - - return dag - - # Names used for sub-subdags in the drydock site deployment subdag CREATE_DRYDOCK_CLIENT_DAG_NAME = 'create_drydock_client' -DRYDOCK_GET_DESIGN_ID_DAG_NAME = 'drydock_get_design_id' -DRYDOCK_LOAD_PARTS_DAG_NAME = 'drydock_load_parts' -PROMENADE_LOAD_PARTS_DAG_NAME = 'promenade_load_parts' -DRYDOCK_VERIFY_SITE_DAG_NAME = 'drydock_verify_site' -DRYDOCK_PREPARE_SITE_DAG_NAME = 'drydock_prepare_site' -DRYDOCK_PREPARE_NODE_DAG_NAME = 'drydock_prepare_node' -DRYDOCK_DEPLOY_NODE_DAG_NAME = 'drydock_deploy_node' +DRYDOCK_VERIFY_SITE_DAG_NAME = 'verify_site' +DRYDOCK_PREPARE_SITE_DAG_NAME = 'prepare_site' +DRYDOCK_PREPARE_NODE_DAG_NAME = 'prepare_node' +DRYDOCK_DEPLOY_NODE_DAG_NAME = 'deploy_node' + + +def get_drydock_subdag_step(parent_dag_name, child_dag_name, args): + ''' + Execute DryDock Subdag + ''' + dag = DAG( + '{}.{}'.format(parent_dag_name, child_dag_name), + default_args=args) + # Note that in the event where the 'deploy_site' action is + # triggered from Shipyard, the 'parent_dag_name' variable + # gets assigned with 'deploy_site.create_drydock_client'. + # This is the name that we want to assign to the subdag so + # that we can reference it for xcom. The name of the main + # dag will be the front part of that value, i.e. 'deploy_site'. + # Hence we will extract the front part and assign it to main_dag. + # We will reuse this pattern for other Actions, e.g. update_site, + # redeploy_site as well. + operator = DryDockOperator( + task_id=child_dag_name, + shipyard_conf=config_path, + action=child_dag_name, + main_dag_name=parent_dag_name[0:parent_dag_name.find('.')], + sub_dag_name=parent_dag_name, + dag=dag) + + return dag def deploy_site_drydock(parent_dag_name, child_dag_name, args): @@ -201,64 +61,47 @@ def deploy_site_drydock(parent_dag_name, child_dag_name, args): ''' dag = DAG( '{}.{}'.format(parent_dag_name, child_dag_name), - default_args=args, ) + default_args=args) drydock_client = SubDagOperator( - subdag=create_drydock_client(dag.dag_id, - CREATE_DRYDOCK_CLIENT_DAG_NAME, - args), + subdag=get_drydock_subdag_step(dag.dag_id, + CREATE_DRYDOCK_CLIENT_DAG_NAME, + args), task_id=CREATE_DRYDOCK_CLIENT_DAG_NAME, - dag=dag, ) + dag=dag) - design_id = SubDagOperator( - subdag=drydock_get_design_id( - dag.dag_id, DRYDOCK_GET_DESIGN_ID_DAG_NAME, args), - task_id=DRYDOCK_GET_DESIGN_ID_DAG_NAME, - dag=dag, ) - - drydock_yaml = SubDagOperator( - subdag=drydock_load_parts( - dag.dag_id, DRYDOCK_LOAD_PARTS_DAG_NAME, args), - task_id=DRYDOCK_LOAD_PARTS_DAG_NAME, - dag=dag, ) - - promenade_yaml = SubDagOperator( - subdag=promenade_load_parts(dag.dag_id, - PROMENADE_LOAD_PARTS_DAG_NAME, args), - task_id=PROMENADE_LOAD_PARTS_DAG_NAME, - dag=dag, ) - - verify_site = SubDagOperator( - subdag=drydock_verify_site(dag.dag_id, - DRYDOCK_VERIFY_SITE_DAG_NAME, args), + drydock_verify_site = SubDagOperator( + subdag=get_drydock_subdag_step(dag.dag_id, + DRYDOCK_VERIFY_SITE_DAG_NAME, + args), task_id=DRYDOCK_VERIFY_SITE_DAG_NAME, - dag=dag, ) + dag=dag) - prepare_site = SubDagOperator( - subdag=drydock_prepare_site(dag.dag_id, - DRYDOCK_PREPARE_SITE_DAG_NAME, args), + drydock_prepare_site = SubDagOperator( + subdag=get_drydock_subdag_step(dag.dag_id, + DRYDOCK_PREPARE_SITE_DAG_NAME, + args), task_id=DRYDOCK_PREPARE_SITE_DAG_NAME, - dag=dag, ) + dag=dag) - prepare_node = SubDagOperator( - subdag=drydock_prepare_node(dag.dag_id, - DRYDOCK_PREPARE_NODE_DAG_NAME, args), + drydock_prepare_node = SubDagOperator( + subdag=get_drydock_subdag_step(dag.dag_id, + DRYDOCK_PREPARE_NODE_DAG_NAME, + args), task_id=DRYDOCK_PREPARE_NODE_DAG_NAME, - dag=dag, ) + dag=dag) - deploy_node = SubDagOperator( - subdag=drydock_deploy_node(dag.dag_id, - DRYDOCK_DEPLOY_NODE_DAG_NAME, args), + drydock_deploy_node = SubDagOperator( + subdag=get_drydock_subdag_step(dag.dag_id, + DRYDOCK_DEPLOY_NODE_DAG_NAME, + args), task_id=DRYDOCK_DEPLOY_NODE_DAG_NAME, - dag=dag, ) + dag=dag) # DAG Wiring - design_id.set_upstream(drydock_client) - drydock_yaml.set_upstream(design_id) - promenade_yaml.set_upstream(drydock_yaml) - verify_site.set_upstream(promenade_yaml) - prepare_site.set_upstream(verify_site) - prepare_node.set_upstream(prepare_site) - deploy_node.set_upstream(prepare_node) + drydock_verify_site.set_upstream(drydock_client) + drydock_prepare_site.set_upstream(drydock_verify_site) + drydock_prepare_node.set_upstream(drydock_prepare_site) + drydock_deploy_node.set_upstream(drydock_prepare_node) return dag diff --git a/shipyard_airflow/plugins/drydock_operators.py b/shipyard_airflow/plugins/drydock_operators.py index 1bea8baa..45977a36 100644 --- a/shipyard_airflow/plugins/drydock_operators.py +++ b/shipyard_airflow/plugins/drydock_operators.py @@ -13,7 +13,9 @@ # limitations under the License. import logging +import os import time +from urllib.parse import urlparse from airflow.exceptions import AirflowException from airflow.models import BaseOperator @@ -22,53 +24,55 @@ from airflow.utils.decorators import apply_defaults import drydock_provisioner.drydock_client.client as client import drydock_provisioner.drydock_client.session as session - +from get_k8s_pod_port_ip import get_pod_port_ip +from service_endpoint import ucp_service_endpoint from service_token import shipyard_service_token class DryDockOperator(BaseOperator): """ DryDock Client - :host: Target Host - :port: DryDock Port - :shipyard_conf: Location of shipyard.conf - :drydock_conf: Location of drydock YAML - :promenade_conf: Location of promenade YAML - :action: Task to perform - :design_id: DryDock Design ID - :workflow_info: Information related to the workflow - :main_dag_name: Parent Dag - :sub_dag_name: Child Dag + :param action: Task to perform + :param design_ref: A URI reference to the design documents + :param main_dag_name: Parent Dag + :param node_filter: A filter for narrowing the scope of the task. Valid + fields are 'node_names', 'rack_names', 'node_tags' + :param shipyard_conf: Location of shipyard.conf + :param sub_dag_name: Child Dag + :param workflow_info: Information related to the workflow """ @apply_defaults def __init__(self, - host=None, - port=None, action=None, - design_id=None, - shipyard_conf=None, - drydock_conf=None, - promenade_conf=None, - workflow_info={}, + design_ref=None, main_dag_name=None, + node_filter=None, + shipyard_conf=None, sub_dag_name=None, + workflow_info={}, xcom_push=True, *args, **kwargs): super(DryDockOperator, self).__init__(*args, **kwargs) - self.host = host - self.port = port - self.shipyard_conf = shipyard_conf - self.drydock_conf = drydock_conf - self.promenade_conf = promenade_conf self.action = action - self.design_id = design_id - self.workflow_info = workflow_info + self.design_ref = design_ref self.main_dag_name = main_dag_name + self.node_filter = node_filter + self.shipyard_conf = shipyard_conf self.sub_dag_name = sub_dag_name + self.workflow_info = workflow_info self.xcom_push_flag = xcom_push def execute(self, context): + # Initialize Variables + context['svc_type'] = 'physicalprovisioner' + genesis_node_ip = None + + # Placeholder definition + # TODO: Need to decide how to pass the required value from Shipyard to + # the 'node_filter' variable. No filter will be used for now. + self.node_filter = None + # Define task_instance task_instance = context['task_instance'] @@ -85,6 +89,11 @@ class DryDockOperator(BaseOperator): # DrydockClient if self.action == 'create_drydock_client': + # Retrieve Endpoint Information + context['svc_endpoint'] = ucp_service_endpoint(self, context) + logging.info("DryDock endpoint is %s", context['svc_endpoint']) + + # Set up DryDock Client drydock_client = self.drydock_session_client(context) return drydock_client @@ -94,157 +103,91 @@ class DryDockOperator(BaseOperator): task_ids='create_drydock_client', dag_id=self.sub_dag_name + '.create_drydock_client') - # Get Design ID - if self.action == 'get_design_id': - design_id = self.drydock_create_design(drydock_client) + # Based on the current logic used in CI/CD pipeline, we will + # point to the nginx server on the genesis host which is hosting + # the drydock YAMLs. That will be the URL for 'design_ref' + # NOTE: This is a temporary hack and will be removed once + # Artifactory or Deckhand is able to host the YAMLs + # NOTE: Testing of the Operator was performed with a nginx server + # on the genesis host that is listening on port 6880. The name of + # the YAML file is 'drydock.yaml'. We will use this assumption + # for now. + # TODO: This logic will be updated once DeckHand is integrated + # with DryDock + logging.info("Retrieving information of Tiller pod to obtain Genesis " + "Node IP...") - return design_id + # Retrieve Genesis Node IP + genesis_node_ip = self.get_genesis_node_ip(context) - # DryDock Load Parts - elif self.action == 'drydock_load_parts': - self.parts_type = 'drydock' - self.load_parts(drydock_client, context, self.parts_type) + # Form the URL path that we will use to retrieve the DryDock YAMLs + # Return Exceptions if we are not able to retrieve the URL + if genesis_node_ip: + schema = 'http://' + nginx_host_port = genesis_node_ip + ':6880' + drydock_yaml = 'drydock.yaml' + self.design_ref = os.path.join(schema, + nginx_host_port, + drydock_yaml) - # Promenade Load Parts - elif self.action == 'promenade_load_parts': - self.parts_type = 'promenade' - self.load_parts(drydock_client, context, self.parts_type) + logging.info("Drydock YAMLs will be retrieved from %s", + self.design_ref) + else: + raise AirflowException("Unable to Retrieve Genesis Node IP!") # Create Task for verify_site - elif self.action == 'verify_site': - self.perform_task = 'verify_site' - verify_site = self.drydock_perform_task(drydock_client, context, - self.perform_task, None) + if self.action == 'verify_site': - # Define variables - # Query every 10 seconds for 1 minute - interval = 10 - time_out = 60 - desired_state = 'success' - - # Query verify_site Task - task_id = verify_site['task_id'] - logging.info(task_id) - verify_site_status = self.drydock_query_task(drydock_client, - interval, - time_out, - task_id, - desired_state) - - if verify_site_status == 'timed_out': - raise AirflowException('Verify_Site Task Timed Out!') - elif verify_site_status == 'task_failed': - raise AirflowException('Verify_Site Task Failed!') - else: - logging.info('Verify Site Task:') - logging.info(verify_site_status) + # Default settings for 'verify_site' execution is to query + # the task every 10 seconds and to time out after 60 seconds + # TODO: Need to decide if we want to make polling interval and + # time out a variable in the Dags + self.drydock_action(drydock_client, context, self.action, 10, 60) # Create Task for prepare_site elif self.action == 'prepare_site': - self.perform_task = 'prepare_site' - prepare_site = self.drydock_perform_task(drydock_client, context, - self.perform_task, None) - - # Define variables - # Query every 10 seconds for 2 minutes - interval = 10 - time_out = 120 - desired_state = 'partial_success' - - # Query prepare_site Task - task_id = prepare_site['task_id'] - logging.info(task_id) - prepare_site_status = self.drydock_query_task(drydock_client, - interval, - time_out, - task_id, - desired_state) - - if prepare_site_status == 'timed_out': - raise AirflowException('Prepare_Site Task Timed Out!') - elif prepare_site_status == 'task_failed': - raise AirflowException('Prepare_Site Task Failed!') - else: - logging.info('Prepare Site Task:') - logging.info(prepare_site_status) + # Default settings for 'prepare_site' execution is to query + # the task every 10 seconds and to time out after 120 seconds + self.drydock_action(drydock_client, context, self.action, 10, 120) # Create Task for prepare_node elif self.action == 'prepare_node': - self.perform_task = 'prepare_node' - prepare_node = self.drydock_perform_task( - drydock_client, context, - self.perform_task, workflow_info) - - # Define variables - # Query every 30 seconds for 30 minutes - interval = 30 - time_out = 1800 - desired_state = 'success' - - # Query prepare_node Task - task_id = prepare_node['task_id'] - logging.info(task_id) - prepare_node_status = self.drydock_query_task(drydock_client, - interval, - time_out, - task_id, - desired_state) - - if prepare_node_status == 'timed_out': - raise AirflowException('Prepare_Node Task Timed Out!') - elif prepare_node_status == 'task_failed': - raise AirflowException('Prepare_Node Task Failed!') - else: - logging.info('Prepare Node Task:') - logging.info(prepare_node_status) + # Default settings for 'prepare_node' execution is to query + # the task every 30 seconds and to time out after 1800 seconds + self.drydock_action(drydock_client, context, self.action, 30, 1800) # Create Task for deploy_node elif self.action == 'deploy_node': - self.perform_task = 'deploy_node' - deploy_node = self.drydock_perform_task(drydock_client, - context, - self.perform_task, - workflow_info) + # Default settings for 'deploy_node' execution is to query + # the task every 30 seconds and to time out after 3600 seconds + self.drydock_action(drydock_client, context, self.action, 30, 3600) - # Define variables - # Query every 30 seconds for 60 minutes - interval = 30 - time_out = 3600 - desired_state = 'success' - - # Query deploy_node Task - task_id = deploy_node['task_id'] - logging.info(task_id) - deploy_node_status = self.drydock_query_task(drydock_client, - interval, - time_out, - task_id, - desired_state) - - if deploy_node_status == 'timed_out': - raise AirflowException('Deploy_Node Task Timed Out!') - elif deploy_node_status == 'task_failed': - raise AirflowException('Deploy_Node Task Failed!') - else: - logging.info('Deploy Node Task:') - logging.info(deploy_node_status) + # Do not perform any action else: logging.info('No Action to Perform') @shipyard_service_token def drydock_session_client(self, context): + # Initialize Variables + drydock_url = None + dd_session = None + dd_client = None + + # Parse DryDock Service Endpoint + drydock_url = urlparse(context['svc_endpoint']) # Build a DrydockSession with credentials and target host # information. logging.info("Build DryDock Session") - dd_session = session.DrydockSession(self.host, port=self.port, + dd_session = session.DrydockSession(drydock_url.hostname, + port=drydock_url.port, token=context['svc_token']) # Raise Exception if we are not able to get a drydock session if dd_session: - logging.info("Successfully Built DryDock Session") + logging.info("Successfully Set Up DryDock Session") else: - raise AirflowException("Unable to get a Drydock Session") + raise AirflowException("Failed to set up Drydock Session!") # Use session to build a DrydockClient to make one or more API calls # The DrydockSession will care for TCP connection pooling @@ -254,126 +197,102 @@ class DryDockOperator(BaseOperator): # Raise Exception if we are not able to build drydock client if dd_client: - logging.info("Successfully Built DryDock client") + logging.info("Successfully Set Up DryDock client") else: - raise AirflowException("Unable to Build Drydock Client") + raise AirflowException("Unable to set up Drydock Client!") # Drydock client for XCOM Usage return dd_client - def drydock_create_design(self, drydock_client): + def drydock_action(self, drydock_client, context, action, interval, + time_out): - # Create Design - logging.info('Create Design ID') - drydock_design_id = drydock_client.create_design() + # Trigger DryDock to execute task and retrieve task ID + task_id = self.drydock_perform_task(drydock_client, context, + action, None) - # Raise Exception if we are not able to get a value - # from drydock create_design API call - if drydock_design_id: - return drydock_design_id - else: - raise AirflowException("Unable to create Design ID") + logging.info('Task ID is %s', task_id) - def get_design_id(self, context): - - # Get Design ID from XCOM - task_instance = context['task_instance'] - design_id = task_instance.xcom_pull( - task_ids='drydock_get_design_id', - dag_id=self.sub_dag_name + '.drydock_get_design_id') - - return design_id - - def load_parts(self, drydock_client, context, parts_type): - - # Load new design parts into a design context via YAML conforming - # to the Drydock design YAML schema - - # Open drydock.yaml/promenade.yaml as string so that it can be - # ingested. This step will change in future when DeckHand is - # integrated with the system - if self.parts_type == 'drydock': - with open(self.drydock_conf, "r") as drydock_yaml: - yaml_string = drydock_yaml.read() - else: - with open(self.promenade_conf, "r") as promenade_yaml: - yaml_string = promenade_yaml.read() - - # Get Design ID and pass it to DryDock - self.design_id = self.get_design_id(context) - - # Load Design - # Return Exception if list is empty - logging.info("Load %s Configuration Yaml", self.parts_type) - load_design = drydock_client.load_parts(self.design_id, - yaml_string=yaml_string) - - if len(load_design) == 0: - raise AirflowException("Empty Design. Please check input Yaml.") - else: - logging.info(load_design) + # Query Task + self.drydock_query_task(drydock_client, interval, time_out, + task_id) def drydock_perform_task(self, drydock_client, context, - perform_task, workflow_info): + perform_task, nodes_filter): - # Get Design ID and pass it to DryDock - self.design_id = self.get_design_id(context) - - # Task to do - task_to_perform = self.perform_task + # Initialize Variables + create_task_response = {} + task_id = None # Node Filter - if workflow_info: - nodes_filter = workflow_info['parameters']['servername'] - else: - nodes_filter = None - logging.info("Nodes Filter List: %s", nodes_filter) - # Get uuid of the create_task's id - self.task_id = drydock_client.create_task(self.design_id, - task_to_perform, - nodes_filter) + # Create Task + create_task_response = drydock_client.create_task( + design_ref=self.design_ref, + task_action=perform_task, + node_filter=nodes_filter) - # Get current state/response of the drydock task - task_status = drydock_client.get_task(self.task_id) + # Retrieve Task ID + task_id = create_task_response.get('task_id') + logging.info('Drydock %s task ID is %s', perform_task, task_id) - # task_status should contain information and be of length > 0 - # Raise Exception if that is not the case - if len(task_status) == 0: - raise AirflowException("Unable to get task state") + # Raise Exception if we are not able to get the task_id from + # drydock + if task_id: + return task_id else: - return task_status + raise AirflowException("Unable to create task!") - def drydock_query_task(self, drydock_client, interval, time_out, - task_id, desired_state): + def drydock_query_task(self, drydock_client, interval, time_out, task_id): # Calculate number of times to execute the 'for' loop end_range = int(time_out / interval) - # Query task state + # Query task status for i in range(0, end_range + 1): # Retrieve current task state - task_state = drydock_client.get_task(self.task_id) - logging.info(task_state) + task_state = drydock_client.get_task(task_id=task_id) + task_status = task_state.get('status') + task_results = task_state.get('result')['status'] - # Return Time Out Exception - if task_state['status'] == 'running' and i == end_range: - logging.info('Timed Out!') - return 'timed_out' + logging.info("Current status of task id %s is %s", + task_id, task_status) + + # Raise Time Out Exception + if task_status == 'running' and i == end_range: + raise AirflowException("Task Execution Timed Out!") # Exit 'for' loop if task is in 'complete' state - if task_state['status'] == 'complete': + if task_status == 'complete': break else: time.sleep(interval) # Get final task state - if task_state['result'] == desired_state: - return drydock_client.get_task(self.task_id) + # NOTE: There is a known bug in Drydock where the task result + # for a successfully completed task can either be 'success' or + # 'partial success'. This will be fixed in Drydock in the near + # future. Updates will be made to the Drydock Operator once the + # bug is fixed. + if task_results in ['success', 'partial_success']: + logging.info('Task id %s has been successfully completed', + self.task_id) else: - return 'task_failed' + raise AirflowException("Failed to execute/complete task!") + + @get_pod_port_ip('tiller') + def get_genesis_node_ip(self, context, *args): + + # Get IP and port information of Pods from context + k8s_pods_ip_port = context['pods_ip_port'] + + # Tiller will take the IP of the Genesis Node. Retrieve + # the IP of tiller to get the IP of the Genesis Node + genesis_ip = k8s_pods_ip_port['tiller'].get('ip') + + return genesis_ip class DryDockClientPlugin(AirflowPlugin):