Merge "Update deploy_site Dags & Operators"

This commit is contained in:
Mark Burnett 2017-09-11 13:11:06 -04:00 committed by Gerrit Code Review
commit 38e7622323
5 changed files with 324 additions and 6 deletions

View File

@ -17,11 +17,13 @@ import airflow
from airflow import DAG
import failure_handlers
from preflight_checks import all_preflight_checks
from drydock_deploy_site import deploy_site_drydock
from validate_site_design import validate_site_design
from airflow.operators.subdag_operator import SubDagOperator
from airflow.operators import ConcurrencyCheckOperator
from airflow.operators import DeckhandOperator
from airflow.operators import PlaceholderOperator
from airflow.operators.python_operator import PythonOperator
"""
deploy_site is the top-level orchestration DAG for deploying a site using the
Undercloud platform.
@ -32,6 +34,7 @@ DAG_CONCURRENCY_CHECK_DAG_NAME = 'dag_concurrency_check'
ALL_PREFLIGHT_CHECKS_DAG_NAME = 'preflight'
DECKHAND_GET_DESIGN_VERSION = 'deckhand_get_design_version'
VALIDATE_SITE_DESIGN_DAG_NAME = 'validate_site_design'
DRYDOCK_BUILD_DAG_NAME = 'drydock_build'
default_args = {
'owner': 'airflow',
@ -40,11 +43,25 @@ default_args = {
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'provide_context': True,
'retries': 0,
'retry_delay': timedelta(minutes=1),
}
dag = DAG(PARENT_DAG_NAME, default_args=default_args, schedule_interval=None)
"""
Define push function to store the content of 'action' that is
defined via 'dag_run' in XCOM so that it can be used by the
Operators
"""
def xcom_push(**kwargs):
# Pushes action XCom
kwargs['ti'].xcom_push(key='action',
value=kwargs['dag_run'].conf['action'])
action_xcom = PythonOperator(
task_id='action_xcom', dag=dag, python_callable=xcom_push)
concurrency_check = ConcurrencyCheckOperator(
task_id=DAG_CONCURRENCY_CHECK_DAG_NAME,
@ -56,7 +73,7 @@ preflight = SubDagOperator(
PARENT_DAG_NAME, ALL_PREFLIGHT_CHECKS_DAG_NAME, args=default_args),
task_id=ALL_PREFLIGHT_CHECKS_DAG_NAME,
on_failure_callback=failure_handlers.step_failure_handler,
dag=dag, )
dag=dag)
get_design_version = DeckhandOperator(
task_id=DECKHAND_GET_DESIGN_VERSION,
@ -70,8 +87,10 @@ validate_site_design = SubDagOperator(
on_failure_callback=failure_handlers.step_failure_handler,
dag=dag)
drydock_build = PlaceholderOperator(
task_id='drydock_build',
drydock_build = SubDagOperator(
subdag=deploy_site_drydock(
PARENT_DAG_NAME, DRYDOCK_BUILD_DAG_NAME, args=default_args),
task_id=DRYDOCK_BUILD_DAG_NAME,
on_failure_callback=failure_handlers.step_failure_handler,
dag=dag)
@ -86,6 +105,7 @@ armada_build = PlaceholderOperator(
dag=dag)
# DAG Wiring
concurrency_check.set_upstream(action_xcom)
preflight.set_upstream(concurrency_check)
get_design_version.set_upstream(preflight)
validate_site_design.set_upstream(get_design_version)

View File

@ -0,0 +1,257 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import configparser
from airflow.models import DAG
from airflow.operators.subdag_operator import SubDagOperator
from airflow.operators import DryDockOperator
# Location of shiyard.conf
config_path = '/usr/local/airflow/plugins/shipyard.conf'
# Read and parse shiyard.conf
config = configparser.ConfigParser()
config.read(config_path)
# Define Variables
drydock_target_host = config.get('drydock', 'host')
drydock_port = config.get('drydock', 'port')
drydock_token = config.get('drydock', 'token')
drydock_conf = config.get('drydock', 'site_yaml')
promenade_conf = config.get('drydock', 'prom_yaml')
parent_dag = 'deploy_site'
child_dag = 'deploy_site.drydock_build'
def create_drydock_client(parent_dag_name, child_dag_name, args):
'''
Create Drydock Client
'''
dag = DAG(
'{}.{}'.format(parent_dag_name, child_dag_name),
default_args=args, )
operator = DryDockOperator(
task_id='create_drydock_client',
host=drydock_target_host,
port=drydock_port,
token=drydock_token,
shipyard_conf=config_path,
action='create_drydock_client',
main_dag_name=parent_dag,
sub_dag_name=child_dag,
dag=dag)
return dag
def drydock_get_design_id(parent_dag_name, child_dag_name, args):
'''
Get Design ID
'''
dag = DAG(
'{}.{}'.format(parent_dag_name, child_dag_name),
default_args=args, )
operator = DryDockOperator(
task_id='drydock_get_design_id',
action='get_design_id',
main_dag_name=parent_dag,
sub_dag_name=child_dag,
dag=dag)
return dag
def drydock_load_parts(parent_dag_name, child_dag_name, args):
'''
Load DryDock Yaml
'''
dag = DAG(
'{}.{}'.format(parent_dag_name, child_dag_name),
default_args=args, )
operator = DryDockOperator(
task_id='drydock_load_parts',
drydock_conf=drydock_conf,
action='drydock_load_parts',
main_dag_name=parent_dag,
sub_dag_name=child_dag,
dag=dag)
return dag
def promenade_load_parts(parent_dag_name, child_dag_name, args):
'''
Load Promenade Yaml
'''
dag = DAG(
'{}.{}'.format(parent_dag_name, child_dag_name),
default_args=args, )
operator = DryDockOperator(
task_id='promenade_load_parts',
promenade_conf=promenade_conf,
action='promenade_load_parts',
main_dag_name=parent_dag,
sub_dag_name=child_dag,
dag=dag)
return dag
def drydock_verify_site(parent_dag_name, child_dag_name, args):
'''
Verify connectivity between DryDock and MAAS
'''
dag = DAG(
'{}.{}'.format(parent_dag_name, child_dag_name),
default_args=args, )
operator = DryDockOperator(
task_id='drydock_verify_site',
action='verify_site',
main_dag_name=parent_dag,
sub_dag_name=child_dag,
dag=dag)
return dag
def drydock_prepare_site(parent_dag_name, child_dag_name, args):
'''
Prepare site for deployment
'''
dag = DAG(
'{}.{}'.format(parent_dag_name, child_dag_name),
default_args=args, )
operator = DryDockOperator(
task_id='drydock_prepare_site',
action='prepare_site',
main_dag_name=parent_dag,
sub_dag_name=child_dag,
dag=dag)
return dag
def drydock_prepare_node(parent_dag_name, child_dag_name, args):
'''
Prepare nodes for deployment
'''
dag = DAG(
'{}.{}'.format(parent_dag_name, child_dag_name),
default_args=args, )
operator = DryDockOperator(
task_id='drydock_prepare_node',
action='prepare_node',
main_dag_name=parent_dag,
sub_dag_name=child_dag,
dag=dag)
return dag
def drydock_deploy_node(parent_dag_name, child_dag_name, args):
'''
Deploy Nodes
'''
dag = DAG(
'{}.{}'.format(parent_dag_name, child_dag_name),
default_args=args, )
operator = DryDockOperator(
task_id='drydock_deploy_node',
action='deploy_node',
main_dag_name=parent_dag,
sub_dag_name=child_dag,
dag=dag)
return dag
# Names used for sub-subdags in the drydock site deployment subdag
CREATE_DRYDOCK_CLIENT_DAG_NAME = 'create_drydock_client'
DRYDOCK_GET_DESIGN_ID_DAG_NAME = 'drydock_get_design_id'
DRYDOCK_LOAD_PARTS_DAG_NAME = 'drydock_load_parts'
PROMENADE_LOAD_PARTS_DAG_NAME = 'promenade_load_parts'
DRYDOCK_VERIFY_SITE_DAG_NAME = 'drydock_verify_site'
DRYDOCK_PREPARE_SITE_DAG_NAME = 'drydock_prepare_site'
DRYDOCK_PREPARE_NODE_DAG_NAME = 'drydock_prepare_node'
DRYDOCK_DEPLOY_NODE_DAG_NAME = 'drydock_deploy_node'
def deploy_site_drydock(parent_dag_name, child_dag_name, args):
'''
Puts all of the drydock deploy site into atomic unit
'''
dag = DAG(
'{}.{}'.format(parent_dag_name, child_dag_name),
default_args=args, )
drydock_client = SubDagOperator(
subdag=create_drydock_client(dag.dag_id,
CREATE_DRYDOCK_CLIENT_DAG_NAME,
args),
task_id=CREATE_DRYDOCK_CLIENT_DAG_NAME,
dag=dag, )
design_id = SubDagOperator(
subdag=drydock_get_design_id(
dag.dag_id, DRYDOCK_GET_DESIGN_ID_DAG_NAME, args),
task_id=DRYDOCK_GET_DESIGN_ID_DAG_NAME,
dag=dag, )
drydock_yaml = SubDagOperator(
subdag=drydock_load_parts(
dag.dag_id, DRYDOCK_LOAD_PARTS_DAG_NAME, args),
task_id=DRYDOCK_LOAD_PARTS_DAG_NAME,
dag=dag, )
promenade_yaml = SubDagOperator(
subdag=promenade_load_parts(dag.dag_id,
PROMENADE_LOAD_PARTS_DAG_NAME, args),
task_id=PROMENADE_LOAD_PARTS_DAG_NAME,
dag=dag, )
verify_site = SubDagOperator(
subdag=drydock_verify_site(dag.dag_id,
DRYDOCK_VERIFY_SITE_DAG_NAME, args),
task_id=DRYDOCK_VERIFY_SITE_DAG_NAME,
dag=dag, )
prepare_site = SubDagOperator(
subdag=drydock_prepare_site(dag.dag_id,
DRYDOCK_PREPARE_SITE_DAG_NAME, args),
task_id=DRYDOCK_PREPARE_SITE_DAG_NAME,
dag=dag, )
prepare_node = SubDagOperator(
subdag=drydock_prepare_node(dag.dag_id,
DRYDOCK_PREPARE_NODE_DAG_NAME, args),
task_id=DRYDOCK_PREPARE_NODE_DAG_NAME,
dag=dag, )
deploy_node = SubDagOperator(
subdag=drydock_deploy_node(dag.dag_id,
DRYDOCK_DEPLOY_NODE_DAG_NAME, args),
task_id=DRYDOCK_DEPLOY_NODE_DAG_NAME,
dag=dag, )
# DAG Wiring
design_id.set_upstream(drydock_client)
drydock_yaml.set_upstream(design_id)
promenade_yaml.set_upstream(drydock_yaml)
verify_site.set_upstream(promenade_yaml)
prepare_site.set_upstream(verify_site)
prepare_node.set_upstream(prepare_site)
deploy_node.set_upstream(prepare_node)
return dag

View File

@ -22,6 +22,7 @@ from airflow.operators.subdag_operator import SubDagOperator
from airflow.operators import ConcurrencyCheckOperator
from airflow.operators import DeckhandOperator
from airflow.operators import PlaceholderOperator
from airflow.operators.python_operator import PythonOperator
"""
redeploy_server is the top-level orchestration DAG for redeploying a
server using the Undercloud platform.
@ -40,11 +41,25 @@ default_args = {
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'provide_context': True,
'retries': 0,
'retry_delay': timedelta(minutes=1),
}
dag = DAG(PARENT_DAG_NAME, default_args=default_args, schedule_interval=None)
"""
Define push function to store the content of 'action' that is
defined via 'dag_run' in XCOM so that it can be used by the
Operators
"""
def xcom_push(**kwargs):
# Pushes action XCom
kwargs['ti'].xcom_push(key='action',
value=kwargs['dag_run'].conf['action'])
action_xcom = PythonOperator(
task_id='action_xcom', dag=dag, python_callable=xcom_push)
concurrency_check = ConcurrencyCheckOperator(
task_id=DAG_CONCURRENCY_CHECK_DAG_NAME,
@ -91,6 +106,7 @@ armada_rebuild = PlaceholderOperator(
dag=dag)
# DAG Wiring
concurrency_check.set_upstream(action_xcom)
preflight.set_upstream(concurrency_check)
get_design_version.set_upstream(preflight)
validate_site_design.set_upstream(get_design_version)

View File

@ -22,6 +22,7 @@ from airflow.operators.subdag_operator import SubDagOperator
from airflow.operators import ConcurrencyCheckOperator
from airflow.operators import DeckhandOperator
from airflow.operators import PlaceholderOperator
from airflow.operators.python_operator import PythonOperator
"""
update_site is the top-level orchestration DAG for updating a site using the
Undercloud platform.
@ -40,11 +41,25 @@ default_args = {
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'provide_context': True,
'retries': 0,
'retry_delay': timedelta(minutes=1),
}
dag = DAG(PARENT_DAG_NAME, default_args=default_args, schedule_interval=None)
"""
Define push function to store the content of 'action' that is
defined via 'dag_run' in XCOM so that it can be used by the
Operators
"""
def xcom_push(**kwargs):
# Pushes action XCom
kwargs['ti'].xcom_push(key='action',
value=kwargs['dag_run'].conf['action'])
action_xcom = PythonOperator(
task_id='action_xcom', dag=dag, python_callable=xcom_push)
concurrency_check = ConcurrencyCheckOperator(
task_id=DAG_CONCURRENCY_CHECK_DAG_NAME,
@ -86,6 +101,7 @@ armada_build = PlaceholderOperator(
dag=dag)
# DAG Wiring
concurrency_check.set_upstream(action_xcom)
preflight.set_upstream(concurrency_check)
get_design_version.set_upstream(preflight)
validate_site_design.set_upstream(get_design_version)

View File

@ -40,6 +40,8 @@ class DryDockOperator(BaseOperator):
:action: Task to perform
:design_id: DryDock Design ID
:workflow_info: Information related to the workflow
:main_dag_name: Parent Dag
:sub_dag_name: Child Dag
"""
@apply_defaults
def __init__(self,
@ -52,6 +54,8 @@ class DryDockOperator(BaseOperator):
drydock_conf=None,
promenade_conf=None,
workflow_info={},
main_dag_name=None,
sub_dag_name=None,
xcom_push=True,
*args, **kwargs):
@ -65,6 +69,8 @@ class DryDockOperator(BaseOperator):
self.action = action
self.design_id = design_id
self.workflow_info = workflow_info
self.main_dag_name = main_dag_name
self.sub_dag_name = sub_dag_name
self.xcom_push_flag = xcom_push
def execute(self, context):
@ -77,7 +83,10 @@ class DryDockOperator(BaseOperator):
# as action_id, name and other related parameters
workflow_info = task_instance.xcom_pull(
task_ids='action_xcom', key='action',
dag_id='drydock_operator_parent')
dag_id=self.main_dag_name)
# Logs uuid of action performed by the Operator
logging.info("DryDock Operator for action %s", workflow_info['id'])
# DrydockClient
if self.action == 'create_drydock_client':
@ -88,7 +97,7 @@ class DryDockOperator(BaseOperator):
# Retrieve drydock_client via XCOM so as to perform other tasks
drydock_client = task_instance.xcom_pull(
task_ids='create_drydock_client',
dag_id='drydock_operator_parent.drydock_operator_child')
dag_id=self.sub_dag_name + '.create_drydock_client')
# Get Design ID
if self.action == 'get_design_id':
@ -326,7 +335,7 @@ class DryDockOperator(BaseOperator):
task_instance = context['task_instance']
design_id = task_instance.xcom_pull(
task_ids='drydock_get_design_id',
dag_id='drydock_operator_parent.drydock_operator_child')
dag_id=self.sub_dag_name + '.drydock_get_design_id')
return design_id