Update deploy_site DAGs & Operators

The existing DAGs, i.e. deploy_site, redeploy_server and
update_site, need to be updated to make use of XCom. This is
required so that the Shipyard Action API can trigger the
workflow and pass the action information along to the operators.
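
As a minimal sketch (assumptions flagged in the comments), the
dag_run.conf payload the Action API is expected to supply when it
triggers one of these DAGs might look like:

    # Hedged sketch: only conf['action'] (and its 'id', used for log
    # correlation) is relied on by the DAGs; the sample values below are
    # hypothetical and not part of this change.
    dag_run_conf = {
        'action': {
            'id': '01BTG32JW87G0YKA1K29TKNAFX',  # hypothetical action uuid
            'name': 'deploy_site',
        }
    }
    # e.g. passed as JSON when triggering the DAG, such as:
    #   airflow trigger_dag deploy_site --conf '<json of the above>'
    # (exact CLI flags depend on the Airflow version in use)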

A new subdag for the DryDock workflow is created as well so that
it fits into the flow of the deploy_site DAG. The DryDock
operator is updated accordingly so that it can be used from the
deploy_site DAG.

The uuid of the action performed from Shipyard is also logged so
that it can be used for log correlation.
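
A rough sketch of that correlation pattern (it mirrors the
DryDockOperator change below; the helper name and the default DAG id
here are illustrative only):

    import logging

    def log_shipyard_action(context, main_dag_name='deploy_site'):
        # Pull the action published by the 'action_xcom' task of the parent DAG
        workflow_info = context['task_instance'].xcom_pull(
            task_ids='action_xcom', key='action', dag_id=main_dag_name)
        # workflow_info['id'] is the uuid of the Shipyard action; including
        # it in every log line lets logs be correlated back to that action
        logging.info("Executing step for Shipyard action %s",
                     workflow_info['id'])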

Change-Id: I25f7486a0510431d663547988fb5884a9845d2aa
Author: Anthony Lin, 2017-09-07 18:36:22 +00:00
commit 55b2f62ed2 (parent f79c0e1c31)
5 changed files with 324 additions and 6 deletions


@@ -17,11 +17,13 @@ import airflow
from airflow import DAG
import failure_handlers
from preflight_checks import all_preflight_checks
from drydock_deploy_site import deploy_site_drydock
from validate_site_design import validate_site_design
from airflow.operators.subdag_operator import SubDagOperator
from airflow.operators import ConcurrencyCheckOperator
from airflow.operators import DeckhandOperator
from airflow.operators import PlaceholderOperator
from airflow.operators.python_operator import PythonOperator
"""
deploy_site is the top-level orchestration DAG for deploying a site using the
Undercloud platform.
@@ -32,6 +34,7 @@ DAG_CONCURRENCY_CHECK_DAG_NAME = 'dag_concurrency_check'
ALL_PREFLIGHT_CHECKS_DAG_NAME = 'preflight'
DECKHAND_GET_DESIGN_VERSION = 'deckhand_get_design_version'
VALIDATE_SITE_DESIGN_DAG_NAME = 'validate_site_design'
DRYDOCK_BUILD_DAG_NAME = 'drydock_build'
default_args = {
'owner': 'airflow',
@@ -40,11 +43,25 @@ default_args = {
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'provide_context': True,
'retries': 0,
'retry_delay': timedelta(minutes=1),
}
dag = DAG(PARENT_DAG_NAME, default_args=default_args, schedule_interval=None)
"""
Define a push function to store the content of 'action' that is
defined via 'dag_run' in XCom so that it can be used by the
operators
"""
def xcom_push(**kwargs):
# Pushes action XCom
kwargs['ti'].xcom_push(key='action',
value=kwargs['dag_run'].conf['action'])
action_xcom = PythonOperator(
task_id='action_xcom', dag=dag, python_callable=xcom_push)
concurrency_check = ConcurrencyCheckOperator(
task_id=DAG_CONCURRENCY_CHECK_DAG_NAME,
@@ -56,7 +73,7 @@ preflight = SubDagOperator(
PARENT_DAG_NAME, ALL_PREFLIGHT_CHECKS_DAG_NAME, args=default_args),
task_id=ALL_PREFLIGHT_CHECKS_DAG_NAME,
on_failure_callback=failure_handlers.step_failure_handler,
dag=dag, )
dag=dag)
get_design_version = DeckhandOperator(
task_id=DECKHAND_GET_DESIGN_VERSION,
@@ -70,8 +87,10 @@ validate_site_design = SubDagOperator(
on_failure_callback=failure_handlers.step_failure_handler,
dag=dag)
drydock_build = PlaceholderOperator(
task_id='drydock_build',
drydock_build = SubDagOperator(
subdag=deploy_site_drydock(
PARENT_DAG_NAME, DRYDOCK_BUILD_DAG_NAME, args=default_args),
task_id=DRYDOCK_BUILD_DAG_NAME,
on_failure_callback=failure_handlers.step_failure_handler,
dag=dag)
@@ -86,6 +105,7 @@ armada_build = PlaceholderOperator(
dag=dag)
# DAG Wiring
concurrency_check.set_upstream(action_xcom)
preflight.set_upstream(concurrency_check)
get_design_version.set_upstream(preflight)
validate_site_design.set_upstream(get_design_version)


@@ -0,0 +1,257 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import configparser
from airflow.models import DAG
from airflow.operators.subdag_operator import SubDagOperator
from airflow.operators import DryDockOperator
# Location of shipyard.conf
config_path = '/usr/local/airflow/plugins/shipyard.conf'
# Read and parse shipyard.conf
config = configparser.ConfigParser()
config.read(config_path)
# Define Variables
drydock_target_host = config.get('drydock', 'host')
drydock_port = config.get('drydock', 'port')
drydock_token = config.get('drydock', 'token')
drydock_conf = config.get('drydock', 'site_yaml')
promenade_conf = config.get('drydock', 'prom_yaml')
parent_dag = 'deploy_site'
child_dag = 'deploy_site.drydock_build'
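# For reference, a hedged sketch of the [drydock] section of shipyard.conf
# that the config.get() calls above assume. The option names come from this
# module; the values are placeholders, not actual deployment settings:
#
#   [drydock]
#   host = <drydock api host>
#   port = <drydock api port>
#   token = <auth token>
#   site_yaml = <path to drydock site yaml>
#   prom_yaml = <path to promenade yaml>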
def create_drydock_client(parent_dag_name, child_dag_name, args):
'''
Create Drydock Client
'''
dag = DAG(
'{}.{}'.format(parent_dag_name, child_dag_name),
default_args=args, )
operator = DryDockOperator(
task_id='create_drydock_client',
host=drydock_target_host,
port=drydock_port,
token=drydock_token,
shipyard_conf=config_path,
action='create_drydock_client',
main_dag_name=parent_dag,
sub_dag_name=child_dag,
dag=dag)
return dag
def drydock_get_design_id(parent_dag_name, child_dag_name, args):
'''
Get Design ID
'''
dag = DAG(
'{}.{}'.format(parent_dag_name, child_dag_name),
default_args=args, )
operator = DryDockOperator(
task_id='drydock_get_design_id',
action='get_design_id',
main_dag_name=parent_dag,
sub_dag_name=child_dag,
dag=dag)
return dag
def drydock_load_parts(parent_dag_name, child_dag_name, args):
'''
Load DryDock Yaml
'''
dag = DAG(
'{}.{}'.format(parent_dag_name, child_dag_name),
default_args=args, )
operator = DryDockOperator(
task_id='drydock_load_parts',
drydock_conf=drydock_conf,
action='drydock_load_parts',
main_dag_name=parent_dag,
sub_dag_name=child_dag,
dag=dag)
return dag
def promenade_load_parts(parent_dag_name, child_dag_name, args):
'''
Load Promenade Yaml
'''
dag = DAG(
'{}.{}'.format(parent_dag_name, child_dag_name),
default_args=args, )
operator = DryDockOperator(
task_id='promenade_load_parts',
promenade_conf=promenade_conf,
action='promenade_load_parts',
main_dag_name=parent_dag,
sub_dag_name=child_dag,
dag=dag)
return dag
def drydock_verify_site(parent_dag_name, child_dag_name, args):
'''
Verify connectivity between DryDock and MAAS
'''
dag = DAG(
'{}.{}'.format(parent_dag_name, child_dag_name),
default_args=args, )
operator = DryDockOperator(
task_id='drydock_verify_site',
action='verify_site',
main_dag_name=parent_dag,
sub_dag_name=child_dag,
dag=dag)
return dag
def drydock_prepare_site(parent_dag_name, child_dag_name, args):
'''
Prepare site for deployment
'''
dag = DAG(
'{}.{}'.format(parent_dag_name, child_dag_name),
default_args=args, )
operator = DryDockOperator(
task_id='drydock_prepare_site',
action='prepare_site',
main_dag_name=parent_dag,
sub_dag_name=child_dag,
dag=dag)
return dag
def drydock_prepare_node(parent_dag_name, child_dag_name, args):
'''
Prepare nodes for deployment
'''
dag = DAG(
'{}.{}'.format(parent_dag_name, child_dag_name),
default_args=args, )
operator = DryDockOperator(
task_id='drydock_prepare_node',
action='prepare_node',
main_dag_name=parent_dag,
sub_dag_name=child_dag,
dag=dag)
return dag
def drydock_deploy_node(parent_dag_name, child_dag_name, args):
'''
Deploy Nodes
'''
dag = DAG(
'{}.{}'.format(parent_dag_name, child_dag_name),
default_args=args, )
operator = DryDockOperator(
task_id='drydock_deploy_node',
action='deploy_node',
main_dag_name=parent_dag,
sub_dag_name=child_dag,
dag=dag)
return dag
# Names used for sub-subdags in the drydock site deployment subdag
CREATE_DRYDOCK_CLIENT_DAG_NAME = 'create_drydock_client'
DRYDOCK_GET_DESIGN_ID_DAG_NAME = 'drydock_get_design_id'
DRYDOCK_LOAD_PARTS_DAG_NAME = 'drydock_load_parts'
PROMENADE_LOAD_PARTS_DAG_NAME = 'promenade_load_parts'
DRYDOCK_VERIFY_SITE_DAG_NAME = 'drydock_verify_site'
DRYDOCK_PREPARE_SITE_DAG_NAME = 'drydock_prepare_site'
DRYDOCK_PREPARE_NODE_DAG_NAME = 'drydock_prepare_node'
DRYDOCK_DEPLOY_NODE_DAG_NAME = 'drydock_deploy_node'
def deploy_site_drydock(parent_dag_name, child_dag_name, args):
'''
Puts all of the DryDock deploy-site steps into one atomic unit
'''
dag = DAG(
'{}.{}'.format(parent_dag_name, child_dag_name),
default_args=args, )
drydock_client = SubDagOperator(
subdag=create_drydock_client(dag.dag_id,
CREATE_DRYDOCK_CLIENT_DAG_NAME,
args),
task_id=CREATE_DRYDOCK_CLIENT_DAG_NAME,
dag=dag, )
design_id = SubDagOperator(
subdag=drydock_get_design_id(
dag.dag_id, DRYDOCK_GET_DESIGN_ID_DAG_NAME, args),
task_id=DRYDOCK_GET_DESIGN_ID_DAG_NAME,
dag=dag, )
drydock_yaml = SubDagOperator(
subdag=drydock_load_parts(
dag.dag_id, DRYDOCK_LOAD_PARTS_DAG_NAME, args),
task_id=DRYDOCK_LOAD_PARTS_DAG_NAME,
dag=dag, )
promenade_yaml = SubDagOperator(
subdag=promenade_load_parts(dag.dag_id,
PROMENADE_LOAD_PARTS_DAG_NAME, args),
task_id=PROMENADE_LOAD_PARTS_DAG_NAME,
dag=dag, )
verify_site = SubDagOperator(
subdag=drydock_verify_site(dag.dag_id,
DRYDOCK_VERIFY_SITE_DAG_NAME, args),
task_id=DRYDOCK_VERIFY_SITE_DAG_NAME,
dag=dag, )
prepare_site = SubDagOperator(
subdag=drydock_prepare_site(dag.dag_id,
DRYDOCK_PREPARE_SITE_DAG_NAME, args),
task_id=DRYDOCK_PREPARE_SITE_DAG_NAME,
dag=dag, )
prepare_node = SubDagOperator(
subdag=drydock_prepare_node(dag.dag_id,
DRYDOCK_PREPARE_NODE_DAG_NAME, args),
task_id=DRYDOCK_PREPARE_NODE_DAG_NAME,
dag=dag, )
deploy_node = SubDagOperator(
subdag=drydock_deploy_node(dag.dag_id,
DRYDOCK_DEPLOY_NODE_DAG_NAME, args),
task_id=DRYDOCK_DEPLOY_NODE_DAG_NAME,
dag=dag, )
# DAG Wiring
design_id.set_upstream(drydock_client)
drydock_yaml.set_upstream(design_id)
promenade_yaml.set_upstream(drydock_yaml)
verify_site.set_upstream(promenade_yaml)
prepare_site.set_upstream(verify_site)
prepare_node.set_upstream(prepare_site)
deploy_node.set_upstream(prepare_node)
return dag


@@ -22,6 +22,7 @@ from airflow.operators.subdag_operator import SubDagOperator
from airflow.operators import ConcurrencyCheckOperator
from airflow.operators import DeckhandOperator
from airflow.operators import PlaceholderOperator
from airflow.operators.python_operator import PythonOperator
"""
redeploy_server is the top-level orchestration DAG for redeploying a
server using the Undercloud platform.
@@ -40,11 +41,25 @@ default_args = {
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'provide_context': True,
'retries': 0,
'retry_delay': timedelta(minutes=1),
}
dag = DAG(PARENT_DAG_NAME, default_args=default_args, schedule_interval=None)
"""
Define a push function to store the content of 'action' that is
defined via 'dag_run' in XCom so that it can be used by the
operators
"""
def xcom_push(**kwargs):
# Pushes action XCom
kwargs['ti'].xcom_push(key='action',
value=kwargs['dag_run'].conf['action'])
action_xcom = PythonOperator(
task_id='action_xcom', dag=dag, python_callable=xcom_push)
concurrency_check = ConcurrencyCheckOperator(
task_id=DAG_CONCURRENCY_CHECK_DAG_NAME,
@@ -91,6 +106,7 @@ armada_rebuild = PlaceholderOperator(
dag=dag)
# DAG Wiring
concurrency_check.set_upstream(action_xcom)
preflight.set_upstream(concurrency_check)
get_design_version.set_upstream(preflight)
validate_site_design.set_upstream(get_design_version)


@@ -22,6 +22,7 @@ from airflow.operators.subdag_operator import SubDagOperator
from airflow.operators import ConcurrencyCheckOperator
from airflow.operators import DeckhandOperator
from airflow.operators import PlaceholderOperator
from airflow.operators.python_operator import PythonOperator
"""
update_site is the top-level orchestration DAG for updating a site using the
Undercloud platform.
@@ -40,11 +41,25 @@ default_args = {
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'provide_context': True,
'retries': 0,
'retry_delay': timedelta(minutes=1),
}
dag = DAG(PARENT_DAG_NAME, default_args=default_args, schedule_interval=None)
"""
Define a push function to store the content of 'action' that is
defined via 'dag_run' in XCom so that it can be used by the
operators
"""
def xcom_push(**kwargs):
# Pushes action XCom
kwargs['ti'].xcom_push(key='action',
value=kwargs['dag_run'].conf['action'])
action_xcom = PythonOperator(
task_id='action_xcom', dag=dag, python_callable=xcom_push)
concurrency_check = ConcurrencyCheckOperator(
task_id=DAG_CONCURRENCY_CHECK_DAG_NAME,
@@ -86,6 +101,7 @@ armada_build = PlaceholderOperator(
dag=dag)
# DAG Wiring
concurrency_check.set_upstream(action_xcom)
preflight.set_upstream(concurrency_check)
get_design_version.set_upstream(preflight)
validate_site_design.set_upstream(get_design_version)


@@ -40,6 +40,8 @@ class DryDockOperator(BaseOperator):
:action: Task to perform
:design_id: DryDock Design ID
:workflow_info: Information related to the workflow
:main_dag_name: Name of the parent DAG
:sub_dag_name: Name of the child DAG
"""
@apply_defaults
def __init__(self,
@@ -52,6 +54,8 @@ class DryDockOperator(BaseOperator):
drydock_conf=None,
promenade_conf=None,
workflow_info={},
main_dag_name=None,
sub_dag_name=None,
xcom_push=True,
*args, **kwargs):
@@ -65,6 +69,8 @@ class DryDockOperator(BaseOperator):
self.action = action
self.design_id = design_id
self.workflow_info = workflow_info
self.main_dag_name = main_dag_name
self.sub_dag_name = sub_dag_name
self.xcom_push_flag = xcom_push
def execute(self, context):
@@ -77,7 +83,10 @@ class DryDockOperator(BaseOperator):
# as action_id, name and other related parameters
workflow_info = task_instance.xcom_pull(
task_ids='action_xcom', key='action',
dag_id='drydock_operator_parent')
dag_id=self.main_dag_name)
# Logs uuid of action performed by the Operator
logging.info("DryDock Operator for action %s", workflow_info['id'])
# DrydockClient
if self.action == 'create_drydock_client':
@@ -88,7 +97,7 @@ class DryDockOperator(BaseOperator):
# Retrieve drydock_client via XCOM so as to perform other tasks
drydock_client = task_instance.xcom_pull(
task_ids='create_drydock_client',
dag_id='drydock_operator_parent.drydock_operator_child')
dag_id=self.sub_dag_name + '.create_drydock_client')
# Get Design ID
if self.action == 'get_design_id':
@@ -326,7 +335,7 @@ class DryDockOperator(BaseOperator):
task_instance = context['task_instance']
design_id = task_instance.xcom_pull(
task_ids='drydock_get_design_id',
dag_id='drydock_operator_parent.drydock_operator_child')
dag_id=self.sub_dag_name + '.drydock_get_design_id')
return design_id