diff --git a/shipyard_airflow/control/shipyard.conf b/shipyard_airflow/control/shipyard.conf index 402f9c7e..c2afe160 100644 --- a/shipyard_airflow/control/shipyard.conf +++ b/shipyard_airflow/control/shipyard.conf @@ -1,13 +1,14 @@ [base] web_server=http://localhost:32080 +postgresql_db = postgresql+psycopg2://postgresql.ucp:5432/shipyard +postgresql_airflow_db = postgresql+psycopg2://postgresql.ucp:5432/airflow [drydock] -host=drydock-api.drydock +host=drydock-api.ucp port=32768 token=bigboss site_yaml=/usr/local/airflow/plugins/drydock.yaml prom_yaml=/usr/local/airflow/plugins/promenade.yaml -k8_masters=k8_master_node02,k8_master_node03 [keystone] OS_AUTH_URL=http://keystone-api.openstack:80/v3 diff --git a/shipyard_airflow/dags/drydock_operator_child.py b/shipyard_airflow/dags/drydock_operator_child.py index 594f6ca3..6a5d5053 100644 --- a/shipyard_airflow/dags/drydock_operator_child.py +++ b/shipyard_airflow/dags/drydock_operator_child.py @@ -18,7 +18,6 @@ import configparser from airflow import DAG from airflow.operators import DryDockOperator - def sub_dag(parent_dag_name, child_dag_name, args, schedule_interval): dag = DAG( '%s.%s' % (parent_dag_name, child_dag_name), @@ -41,11 +40,6 @@ def sub_dag(parent_dag_name, child_dag_name, args, schedule_interval): drydock_conf = config.get('drydock', 'site_yaml') promenade_conf = config.get('drydock', 'prom_yaml') - # Convert to Dictionary - k8_masters_sting = config.get('drydock', 'k8_masters') - k8_masters_list = k8_masters_sting.split(',') - k8_masters = {'node_names': k8_masters_list} - # Create Drydock Client t1 = DryDockOperator( task_id='create_drydock_client', @@ -92,14 +86,12 @@ def sub_dag(parent_dag_name, child_dag_name, args, schedule_interval): t7 = DryDockOperator( task_id='drydock_prepare_node', action='prepare_node', - node_filter=k8_masters, dag=dag) # Deploy Node t8 = DryDockOperator( task_id='drydock_deploy_node', action='deploy_node', - node_filter=k8_masters, dag=dag) # Define dependencies diff --git a/shipyard_airflow/dags/drydock_operator_parent.py b/shipyard_airflow/dags/drydock_operator_parent.py index 937e26ea..8cce87e5 100644 --- a/shipyard_airflow/dags/drydock_operator_parent.py +++ b/shipyard_airflow/dags/drydock_operator_parent.py @@ -17,6 +17,7 @@ import airflow from airflow import DAG from datetime import timedelta +from airflow.operators.python_operator import PythonOperator from airflow.operators.subdag_operator import SubDagOperator from drydock_operator_child import sub_dag @@ -40,9 +41,24 @@ main_dag = DAG( max_active_runs=1 ) +# Define push function to store the content of 'action' that is +# defined via 'dag_run' in XCOM so that it can be used by the +# DryDock Operators +def push(**kwargs): + # Pushes action XCom + kwargs['ti'].xcom_push(key='action', + value=kwargs['dag_run'].conf['action']) + + +action_xcom = PythonOperator( + task_id='action_xcom', dag=main_dag, python_callable=push) + subdag = SubDagOperator( subdag=sub_dag(parent_dag_name, child_dag_name, args, main_dag.schedule_interval), task_id=child_dag_name, default_args=args, dag=main_dag) + +# Set dependencies +subdag.set_upstream(action_xcom) diff --git a/shipyard_airflow/plugins/drydock_operators.py b/shipyard_airflow/plugins/drydock_operators.py index 9d494b81..079f4bca 100644 --- a/shipyard_airflow/plugins/drydock_operators.py +++ b/shipyard_airflow/plugins/drydock_operators.py @@ -37,9 +37,9 @@ class DryDockOperator(BaseOperator): :shipyard_conf: Location of shipyard.conf :drydock_conf: Location of drydock YAML :promenade_conf: Location of promenade YAML - :node_filter: Valid fields are 'node_names','rack_names','node_tags' :action: Task to perform :design_id: DryDock Design ID + :workflow_info: Information related to the workflow """ @apply_defaults def __init__(self, @@ -48,10 +48,10 @@ class DryDockOperator(BaseOperator): token=None, action=None, design_id=None, - node_filter=None, shipyard_conf=None, drydock_conf=None, promenade_conf=None, + workflow_info={}, xcom_push=True, *args, **kwargs): @@ -64,10 +64,21 @@ class DryDockOperator(BaseOperator): self.promenade_conf = promenade_conf self.action = action self.design_id = design_id - self.node_filter = node_filter + self.workflow_info = workflow_info self.xcom_push_flag = xcom_push def execute(self, context): + # Define task_instance + task_instance = context['task_instance'] + + # Extract information related to current workflow + # The workflow_info variable will be a dictionary + # that contains information about the workflow such + # as action_id, name and other related parameters + workflow_info = task_instance.xcom_pull( + task_ids='action_xcom', key='action', + dag_id='drydock_operator_parent') + # DrydockClient if self.action == 'create_drydock_client': drydock_client = self.drydock_session_client(context) @@ -75,8 +86,6 @@ class DryDockOperator(BaseOperator): return drydock_client # Retrieve drydock_client via XCOM so as to perform other tasks - task_instance = context['task_instance'] - drydock_client = task_instance.xcom_pull( task_ids='create_drydock_client', dag_id='drydock_operator_parent.drydock_operator_child') @@ -160,7 +169,7 @@ class DryDockOperator(BaseOperator): self.perform_task = 'prepare_node' prepare_node = self.drydock_perform_task( drydock_client, context, - self.perform_task, self.node_filter) + self.perform_task, workflow_info) # Define variables # Query every 30 seconds for 30 minutes @@ -191,7 +200,7 @@ class DryDockOperator(BaseOperator): deploy_node = self.drydock_perform_task(drydock_client, context, self.perform_task, - self.node_filter) + workflow_info) # Define variables # Query every 30 seconds for 60 minutes @@ -351,7 +360,7 @@ class DryDockOperator(BaseOperator): logging.info(load_design) def drydock_perform_task(self, drydock_client, context, - perform_task, node_filter): + perform_task, workflow_info): # Get Design ID and pass it to DryDock self.design_id = self.get_design_id(context) @@ -360,7 +369,12 @@ class DryDockOperator(BaseOperator): task_to_perform = self.perform_task # Node Filter - nodes_filter = self.node_filter + if workflow_info: + nodes_filter = workflow_info['parameters']['servername'] + else: + nodes_filter = None + + logging.info("Nodes Filter List: %s", nodes_filter) # Get uuid of the create_task's id self.task_id = drydock_client.create_task(self.design_id,