From 55b2f62ed2b14915699f230b82dc7124a0ff3fde Mon Sep 17 00:00:00 2001 From: Anthony Lin Date: Thu, 7 Sep 2017 18:36:22 +0000 Subject: [PATCH] Update deploy_site Dags & Operators There is a need to update the existing dags, i.e. deploy_site, redeploy_server as well as update_site in order to make use of xcom. This is needed in order for the shipyard ACTION API to trigger the workflow. A new subdag for drydock workflow has to be created as well in order to fit into the flow of the deploy site dag. DryDock Operator was updated as well so that we can use it with the deploy_site dag. The uuid of the ACTION performed from Shipyard will be logged as well so that we can use it for logs correlation. Change-Id: I25f7486a0510431d663547988fb5884a9845d2aa --- shipyard_airflow/dags/deploy_site.py | 26 +- shipyard_airflow/dags/drydock_deploy_site.py | 257 ++++++++++++++++++ shipyard_airflow/dags/redeploy_server.py | 16 ++ shipyard_airflow/dags/update_site.py | 16 ++ shipyard_airflow/plugins/drydock_operators.py | 15 +- 5 files changed, 324 insertions(+), 6 deletions(-) create mode 100644 shipyard_airflow/dags/drydock_deploy_site.py diff --git a/shipyard_airflow/dags/deploy_site.py b/shipyard_airflow/dags/deploy_site.py index 19edc262..1b1909a1 100644 --- a/shipyard_airflow/dags/deploy_site.py +++ b/shipyard_airflow/dags/deploy_site.py @@ -17,11 +17,13 @@ import airflow from airflow import DAG import failure_handlers from preflight_checks import all_preflight_checks +from drydock_deploy_site import deploy_site_drydock from validate_site_design import validate_site_design from airflow.operators.subdag_operator import SubDagOperator from airflow.operators import ConcurrencyCheckOperator from airflow.operators import DeckhandOperator from airflow.operators import PlaceholderOperator +from airflow.operators.python_operator import PythonOperator """ deploy_site is the top-level orchestration DAG for deploying a site using the Undercloud platform. @@ -32,6 +34,7 @@ DAG_CONCURRENCY_CHECK_DAG_NAME = 'dag_concurrency_check' ALL_PREFLIGHT_CHECKS_DAG_NAME = 'preflight' DECKHAND_GET_DESIGN_VERSION = 'deckhand_get_design_version' VALIDATE_SITE_DESIGN_DAG_NAME = 'validate_site_design' +DRYDOCK_BUILD_DAG_NAME = 'drydock_build' default_args = { 'owner': 'airflow', @@ -40,11 +43,25 @@ default_args = { 'email': [''], 'email_on_failure': False, 'email_on_retry': False, + 'provide_context': True, 'retries': 0, 'retry_delay': timedelta(minutes=1), } dag = DAG(PARENT_DAG_NAME, default_args=default_args, schedule_interval=None) +""" +Define push function to store the content of 'action' that is +defined via 'dag_run' in XCOM so that it can be used by the +Operators +""" +def xcom_push(**kwargs): + # Pushes action XCom + kwargs['ti'].xcom_push(key='action', + value=kwargs['dag_run'].conf['action']) + + +action_xcom = PythonOperator( + task_id='action_xcom', dag=dag, python_callable=xcom_push) concurrency_check = ConcurrencyCheckOperator( task_id=DAG_CONCURRENCY_CHECK_DAG_NAME, @@ -56,7 +73,7 @@ preflight = SubDagOperator( PARENT_DAG_NAME, ALL_PREFLIGHT_CHECKS_DAG_NAME, args=default_args), task_id=ALL_PREFLIGHT_CHECKS_DAG_NAME, on_failure_callback=failure_handlers.step_failure_handler, - dag=dag, ) + dag=dag) get_design_version = DeckhandOperator( task_id=DECKHAND_GET_DESIGN_VERSION, @@ -70,8 +87,10 @@ validate_site_design = SubDagOperator( on_failure_callback=failure_handlers.step_failure_handler, dag=dag) -drydock_build = PlaceholderOperator( - task_id='drydock_build', +drydock_build = SubDagOperator( + subdag=deploy_site_drydock( + PARENT_DAG_NAME, DRYDOCK_BUILD_DAG_NAME, args=default_args), + task_id=DRYDOCK_BUILD_DAG_NAME, on_failure_callback=failure_handlers.step_failure_handler, dag=dag) @@ -86,6 +105,7 @@ armada_build = PlaceholderOperator( dag=dag) # DAG Wiring +concurrency_check.set_upstream(action_xcom) preflight.set_upstream(concurrency_check) get_design_version.set_upstream(preflight) validate_site_design.set_upstream(get_design_version) diff --git a/shipyard_airflow/dags/drydock_deploy_site.py b/shipyard_airflow/dags/drydock_deploy_site.py new file mode 100644 index 00000000..5df71542 --- /dev/null +++ b/shipyard_airflow/dags/drydock_deploy_site.py @@ -0,0 +1,257 @@ +# Copyright 2017 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import configparser + +from airflow.models import DAG +from airflow.operators.subdag_operator import SubDagOperator +from airflow.operators import DryDockOperator + + +# Location of shiyard.conf +config_path = '/usr/local/airflow/plugins/shipyard.conf' + +# Read and parse shiyard.conf +config = configparser.ConfigParser() +config.read(config_path) + +# Define Variables +drydock_target_host = config.get('drydock', 'host') +drydock_port = config.get('drydock', 'port') +drydock_token = config.get('drydock', 'token') +drydock_conf = config.get('drydock', 'site_yaml') +promenade_conf = config.get('drydock', 'prom_yaml') +parent_dag = 'deploy_site' +child_dag = 'deploy_site.drydock_build' + +def create_drydock_client(parent_dag_name, child_dag_name, args): + ''' + Create Drydock Client + ''' + dag = DAG( + '{}.{}'.format(parent_dag_name, child_dag_name), + default_args=args, ) + + operator = DryDockOperator( + task_id='create_drydock_client', + host=drydock_target_host, + port=drydock_port, + token=drydock_token, + shipyard_conf=config_path, + action='create_drydock_client', + main_dag_name=parent_dag, + sub_dag_name=child_dag, + dag=dag) + + return dag + +def drydock_get_design_id(parent_dag_name, child_dag_name, args): + ''' + Get Design ID + ''' + dag = DAG( + '{}.{}'.format(parent_dag_name, child_dag_name), + default_args=args, ) + + operator = DryDockOperator( + task_id='drydock_get_design_id', + action='get_design_id', + main_dag_name=parent_dag, + sub_dag_name=child_dag, + dag=dag) + + return dag + +def drydock_load_parts(parent_dag_name, child_dag_name, args): + ''' + Load DryDock Yaml + ''' + dag = DAG( + '{}.{}'.format(parent_dag_name, child_dag_name), + default_args=args, ) + + operator = DryDockOperator( + task_id='drydock_load_parts', + drydock_conf=drydock_conf, + action='drydock_load_parts', + main_dag_name=parent_dag, + sub_dag_name=child_dag, + dag=dag) + + return dag + +def promenade_load_parts(parent_dag_name, child_dag_name, args): + ''' + Load Promenade Yaml + ''' + dag = DAG( + '{}.{}'.format(parent_dag_name, child_dag_name), + default_args=args, ) + + operator = DryDockOperator( + task_id='promenade_load_parts', + promenade_conf=promenade_conf, + action='promenade_load_parts', + main_dag_name=parent_dag, + sub_dag_name=child_dag, + dag=dag) + + return dag + +def drydock_verify_site(parent_dag_name, child_dag_name, args): + ''' + Verify connectivity between DryDock and MAAS + ''' + dag = DAG( + '{}.{}'.format(parent_dag_name, child_dag_name), + default_args=args, ) + + operator = DryDockOperator( + task_id='drydock_verify_site', + action='verify_site', + main_dag_name=parent_dag, + sub_dag_name=child_dag, + dag=dag) + + return dag + +def drydock_prepare_site(parent_dag_name, child_dag_name, args): + ''' + Prepare site for deployment + ''' + dag = DAG( + '{}.{}'.format(parent_dag_name, child_dag_name), + default_args=args, ) + + operator = DryDockOperator( + task_id='drydock_prepare_site', + action='prepare_site', + main_dag_name=parent_dag, + sub_dag_name=child_dag, + dag=dag) + + return dag + +def drydock_prepare_node(parent_dag_name, child_dag_name, args): + ''' + Prepare nodes for deployment + ''' + dag = DAG( + '{}.{}'.format(parent_dag_name, child_dag_name), + default_args=args, ) + + operator = DryDockOperator( + task_id='drydock_prepare_node', + action='prepare_node', + main_dag_name=parent_dag, + sub_dag_name=child_dag, + dag=dag) + + return dag + +def drydock_deploy_node(parent_dag_name, child_dag_name, args): + ''' + Deploy Nodes + ''' + dag = DAG( + '{}.{}'.format(parent_dag_name, child_dag_name), + default_args=args, ) + + operator = DryDockOperator( + task_id='drydock_deploy_node', + action='deploy_node', + main_dag_name=parent_dag, + sub_dag_name=child_dag, + dag=dag) + + return dag + + +# Names used for sub-subdags in the drydock site deployment subdag +CREATE_DRYDOCK_CLIENT_DAG_NAME = 'create_drydock_client' +DRYDOCK_GET_DESIGN_ID_DAG_NAME = 'drydock_get_design_id' +DRYDOCK_LOAD_PARTS_DAG_NAME = 'drydock_load_parts' +PROMENADE_LOAD_PARTS_DAG_NAME = 'promenade_load_parts' +DRYDOCK_VERIFY_SITE_DAG_NAME = 'drydock_verify_site' +DRYDOCK_PREPARE_SITE_DAG_NAME = 'drydock_prepare_site' +DRYDOCK_PREPARE_NODE_DAG_NAME = 'drydock_prepare_node' +DRYDOCK_DEPLOY_NODE_DAG_NAME = 'drydock_deploy_node' + +def deploy_site_drydock(parent_dag_name, child_dag_name, args): + ''' + Puts all of the drydock deploy site into atomic unit + ''' + dag = DAG( + '{}.{}'.format(parent_dag_name, child_dag_name), + default_args=args, ) + + drydock_client = SubDagOperator( + subdag=create_drydock_client(dag.dag_id, + CREATE_DRYDOCK_CLIENT_DAG_NAME, + args), + task_id=CREATE_DRYDOCK_CLIENT_DAG_NAME, + dag=dag, ) + + design_id = SubDagOperator( + subdag=drydock_get_design_id( + dag.dag_id, DRYDOCK_GET_DESIGN_ID_DAG_NAME, args), + task_id=DRYDOCK_GET_DESIGN_ID_DAG_NAME, + dag=dag, ) + + drydock_yaml = SubDagOperator( + subdag=drydock_load_parts( + dag.dag_id, DRYDOCK_LOAD_PARTS_DAG_NAME, args), + task_id=DRYDOCK_LOAD_PARTS_DAG_NAME, + dag=dag, ) + + promenade_yaml = SubDagOperator( + subdag=promenade_load_parts(dag.dag_id, + PROMENADE_LOAD_PARTS_DAG_NAME, args), + task_id=PROMENADE_LOAD_PARTS_DAG_NAME, + dag=dag, ) + + verify_site = SubDagOperator( + subdag=drydock_verify_site(dag.dag_id, + DRYDOCK_VERIFY_SITE_DAG_NAME, args), + task_id=DRYDOCK_VERIFY_SITE_DAG_NAME, + dag=dag, ) + + prepare_site = SubDagOperator( + subdag=drydock_prepare_site(dag.dag_id, + DRYDOCK_PREPARE_SITE_DAG_NAME, args), + task_id=DRYDOCK_PREPARE_SITE_DAG_NAME, + dag=dag, ) + + prepare_node = SubDagOperator( + subdag=drydock_prepare_node(dag.dag_id, + DRYDOCK_PREPARE_NODE_DAG_NAME, args), + task_id=DRYDOCK_PREPARE_NODE_DAG_NAME, + dag=dag, ) + + deploy_node = SubDagOperator( + subdag=drydock_deploy_node(dag.dag_id, + DRYDOCK_DEPLOY_NODE_DAG_NAME, args), + task_id=DRYDOCK_DEPLOY_NODE_DAG_NAME, + dag=dag, ) + + # DAG Wiring + design_id.set_upstream(drydock_client) + drydock_yaml.set_upstream(design_id) + promenade_yaml.set_upstream(drydock_yaml) + verify_site.set_upstream(promenade_yaml) + prepare_site.set_upstream(verify_site) + prepare_node.set_upstream(prepare_site) + deploy_node.set_upstream(prepare_node) + + return dag diff --git a/shipyard_airflow/dags/redeploy_server.py b/shipyard_airflow/dags/redeploy_server.py index ddbc1522..6b06109a 100644 --- a/shipyard_airflow/dags/redeploy_server.py +++ b/shipyard_airflow/dags/redeploy_server.py @@ -22,6 +22,7 @@ from airflow.operators.subdag_operator import SubDagOperator from airflow.operators import ConcurrencyCheckOperator from airflow.operators import DeckhandOperator from airflow.operators import PlaceholderOperator +from airflow.operators.python_operator import PythonOperator """ redeploy_server is the top-level orchestration DAG for redeploying a server using the Undercloud platform. @@ -40,11 +41,25 @@ default_args = { 'email': [''], 'email_on_failure': False, 'email_on_retry': False, + 'provide_context': True, 'retries': 0, 'retry_delay': timedelta(minutes=1), } dag = DAG(PARENT_DAG_NAME, default_args=default_args, schedule_interval=None) +""" +Define push function to store the content of 'action' that is +defined via 'dag_run' in XCOM so that it can be used by the +Operators +""" +def xcom_push(**kwargs): + # Pushes action XCom + kwargs['ti'].xcom_push(key='action', + value=kwargs['dag_run'].conf['action']) + + +action_xcom = PythonOperator( + task_id='action_xcom', dag=dag, python_callable=xcom_push) concurrency_check = ConcurrencyCheckOperator( task_id=DAG_CONCURRENCY_CHECK_DAG_NAME, @@ -91,6 +106,7 @@ armada_rebuild = PlaceholderOperator( dag=dag) # DAG Wiring +concurrency_check.set_upstream(action_xcom) preflight.set_upstream(concurrency_check) get_design_version.set_upstream(preflight) validate_site_design.set_upstream(get_design_version) diff --git a/shipyard_airflow/dags/update_site.py b/shipyard_airflow/dags/update_site.py index 2452c417..57a8bdea 100644 --- a/shipyard_airflow/dags/update_site.py +++ b/shipyard_airflow/dags/update_site.py @@ -22,6 +22,7 @@ from airflow.operators.subdag_operator import SubDagOperator from airflow.operators import ConcurrencyCheckOperator from airflow.operators import DeckhandOperator from airflow.operators import PlaceholderOperator +from airflow.operators.python_operator import PythonOperator """ update_site is the top-level orchestration DAG for updating a site using the Undercloud platform. @@ -40,11 +41,25 @@ default_args = { 'email': [''], 'email_on_failure': False, 'email_on_retry': False, + 'provide_context': True, 'retries': 0, 'retry_delay': timedelta(minutes=1), } dag = DAG(PARENT_DAG_NAME, default_args=default_args, schedule_interval=None) +""" +Define push function to store the content of 'action' that is +defined via 'dag_run' in XCOM so that it can be used by the +Operators +""" +def xcom_push(**kwargs): + # Pushes action XCom + kwargs['ti'].xcom_push(key='action', + value=kwargs['dag_run'].conf['action']) + + +action_xcom = PythonOperator( + task_id='action_xcom', dag=dag, python_callable=xcom_push) concurrency_check = ConcurrencyCheckOperator( task_id=DAG_CONCURRENCY_CHECK_DAG_NAME, @@ -86,6 +101,7 @@ armada_build = PlaceholderOperator( dag=dag) # DAG Wiring +concurrency_check.set_upstream(action_xcom) preflight.set_upstream(concurrency_check) get_design_version.set_upstream(preflight) validate_site_design.set_upstream(get_design_version) diff --git a/shipyard_airflow/plugins/drydock_operators.py b/shipyard_airflow/plugins/drydock_operators.py index 079f4bca..75c14ce0 100644 --- a/shipyard_airflow/plugins/drydock_operators.py +++ b/shipyard_airflow/plugins/drydock_operators.py @@ -40,6 +40,8 @@ class DryDockOperator(BaseOperator): :action: Task to perform :design_id: DryDock Design ID :workflow_info: Information related to the workflow + :main_dag_name: Parent Dag + :sub_dag_name: Child Dag """ @apply_defaults def __init__(self, @@ -52,6 +54,8 @@ class DryDockOperator(BaseOperator): drydock_conf=None, promenade_conf=None, workflow_info={}, + main_dag_name=None, + sub_dag_name=None, xcom_push=True, *args, **kwargs): @@ -65,6 +69,8 @@ class DryDockOperator(BaseOperator): self.action = action self.design_id = design_id self.workflow_info = workflow_info + self.main_dag_name = main_dag_name + self.sub_dag_name = sub_dag_name self.xcom_push_flag = xcom_push def execute(self, context): @@ -77,7 +83,10 @@ class DryDockOperator(BaseOperator): # as action_id, name and other related parameters workflow_info = task_instance.xcom_pull( task_ids='action_xcom', key='action', - dag_id='drydock_operator_parent') + dag_id=self.main_dag_name) + + # Logs uuid of action performed by the Operator + logging.info("DryDock Operator for action %s", workflow_info['id']) # DrydockClient if self.action == 'create_drydock_client': @@ -88,7 +97,7 @@ class DryDockOperator(BaseOperator): # Retrieve drydock_client via XCOM so as to perform other tasks drydock_client = task_instance.xcom_pull( task_ids='create_drydock_client', - dag_id='drydock_operator_parent.drydock_operator_child') + dag_id=self.sub_dag_name + '.create_drydock_client') # Get Design ID if self.action == 'get_design_id': @@ -326,7 +335,7 @@ class DryDockOperator(BaseOperator): task_instance = context['task_instance'] design_id = task_instance.xcom_pull( task_ids='drydock_get_design_id', - dag_id='drydock_operator_parent.drydock_operator_child') + dag_id=self.sub_dag_name + '.drydock_get_design_id') return design_id