Add deploy site DAG skeleton

reformatted using yapf

updated to import so airflow could find things

updated to use the right parameters to subdags

updated to remove promenade steps

fix some trailing whitespace

changed for review comments

rename files to properly use underscores instead of dashes

Change-Id: I5bd5b6d76f2f98afbaffa62ed7d4a4d756bc3dfc
This commit is contained in:
Bryan Strassner 2017-08-11 12:59:54 -05:00
parent 539d5050ad
commit 4c2a3a5a75
9 changed files with 506 additions and 13 deletions

View File

@ -1,13 +0,0 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

View File

View File

@ -0,0 +1,53 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import airflow
from airflow.models import DAG
from airflow.operators import PlaceholderOperator
from airflow.operators.dummy_operator import DummyOperator
def dag_concurrency_check(parent_dag_name, child_dag_name, args):
'''
dag_concurrency_check is a sub-DAG that will will allow for a DAG to
determine if it is already running, and result in an error if so.
'''
dag = DAG(
'{}.{}'.format(parent_dag_name, child_dag_name),
default_args=args, )
# TODO () Replace this operator with a real operator that will:
# 1) Look for an instance of the parent_dag_name running currently in
# airflow
# 2) Fail if the parent_dag_name is running
# 3) Succeed if there are no instances of parent_dag_name running
dag_concurrency_check_operator = PlaceholderOperator(
task_id='dag_concurrency_check', dag=dag)
return dag
def dag_concurrency_check_failure_handler(parent_dag_name, child_dag_name,
args):
'''
Peforms the actions necessary when concurrency checks fail
'''
dag = DAG(
'{}.{}'.format(parent_dag_name, child_dag_name),
default_args=args, )
operator = DummyOperator(
task_id='dag_concurrency_check_failure_handler', dag=dag, )
return dag

View File

@ -0,0 +1,143 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from datetime import datetime, timedelta
import airflow
from airflow import DAG
from dag_concurrency_check import dag_concurrency_check
from dag_concurrency_check import dag_concurrency_check_failure_handler
from preflight_checks import all_preflight_checks
from preflight_checks import preflight_failure_handler
from validate_site_design import validate_site_design
from validate_site_design import validate_site_design_failure_handler
from airflow.operators.subdag_operator import SubDagOperator
from airflow.operators import DeckhandOperator
from airflow.operators import PlaceholderOperator
from airflow.utils.trigger_rule import TriggerRule
from datetime import datetime, timedelta
'''
deploy_site is the top-level orchestration DAG for deploying a site using the
Undercloud platform.
'''
PARENT_DAG_NAME = 'deploy_site'
DAG_CONCURRENCY_CHECK_DAG_NAME = 'dag_concurrency_check'
CONCURRENCY_FAILURE_DAG_NAME = 'concurrency_check_failure_handler'
ALL_PREFLIGHT_CHECKS_DAG_NAME = 'preflight'
PREFLIGHT_FAILURE_DAG_NAME = 'preflight_failure_handler'
DECKHAND_GET_DESIGN_VERSION = 'deckhand_get_design_version'
VALIDATE_SITE_DESIGN_DAG_NAME = 'validate_site_design'
VALIDATION_FAILED_DAG_NAME = 'validate_site_design_failure_handler'
DECKHAND_MARK_LAST_KNOWN_GOOD = 'deckhand_mark_last_known_good'
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': airflow.utils.dates.days_ago(1),
'email': [''],
'email_on_failure': False,
'email_on_retry': False,
'retries': 0,
'retry_delay': timedelta(minutes=1),
}
dag = DAG(PARENT_DAG_NAME, default_args=default_args, schedule_interval=None)
concurrency_check = SubDagOperator(
subdag=dag_concurrency_check(PARENT_DAG_NAME,
DAG_CONCURRENCY_CHECK_DAG_NAME,
args=default_args),
task_id=DAG_CONCURRENCY_CHECK_DAG_NAME,
dag=dag, )
concurrency_check_failure_handler = SubDagOperator(
subdag=dag_concurrency_check_failure_handler(
PARENT_DAG_NAME, CONCURRENCY_FAILURE_DAG_NAME,
args=default_args),
task_id=CONCURRENCY_FAILURE_DAG_NAME,
trigger_rule=TriggerRule.ONE_FAILED,
dag=dag, )
preflight = SubDagOperator(
subdag=all_preflight_checks(PARENT_DAG_NAME, ALL_PREFLIGHT_CHECKS_DAG_NAME,
args=default_args),
task_id=ALL_PREFLIGHT_CHECKS_DAG_NAME,
dag=dag, )
preflight_failure = SubDagOperator(
subdag=preflight_failure_handler(PARENT_DAG_NAME,
PREFLIGHT_FAILURE_DAG_NAME,
args=default_args),
task_id=PREFLIGHT_FAILURE_DAG_NAME,
trigger_rule=TriggerRule.ONE_FAILED,
dag=dag, )
get_design_version = DeckhandOperator(
task_id=DECKHAND_GET_DESIGN_VERSION, dag=dag)
validate_site_design = SubDagOperator(
subdag=validate_site_design(PARENT_DAG_NAME, VALIDATE_SITE_DESIGN_DAG_NAME,
args=default_args),
task_id=VALIDATE_SITE_DESIGN_DAG_NAME,
dag=dag)
validate_site_design_failure = SubDagOperator(
subdag=validate_site_design_failure_handler(
dag.dag_id, VALIDATION_FAILED_DAG_NAME,
args=default_args),
task_id=VALIDATION_FAILED_DAG_NAME,
trigger_rule=TriggerRule.ONE_FAILED,
dag=dag)
drydock_build = PlaceholderOperator(task_id='drydock_build', dag=dag)
drydock_failure_handler = PlaceholderOperator(
task_id='drydock_failure_handler',
trigger_rule=TriggerRule.ONE_FAILED,
dag=dag)
query_node_status = PlaceholderOperator(
task_id='deployed_node_status', dag=dag)
nodes_not_healthy = PlaceholderOperator(
task_id='deployed_nodes_not_healthy',
trigger_rule=TriggerRule.ONE_FAILED,
dag=dag)
armada_build = PlaceholderOperator(task_id='armada_build', dag=dag)
armada_failure_handler = PlaceholderOperator(
task_id='armada_failure_handler',
trigger_rule=TriggerRule.ONE_FAILED,
dag=dag)
mark_last_known_good = DeckhandOperator(
task_id=DECKHAND_MARK_LAST_KNOWN_GOOD, dag=dag)
# DAG Wiring
concurrency_check_failure_handler.set_upstream(concurrency_check)
preflight.set_upstream(concurrency_check)
preflight_failure.set_upstream(preflight)
get_design_version.set_upstream(preflight)
validate_site_design.set_upstream(get_design_version)
validate_site_design_failure.set_upstream(validate_site_design)
drydock_build.set_upstream(validate_site_design)
drydock_failure_handler.set_upstream(drydock_build)
query_node_status.set_upstream(drydock_build)
nodes_not_healthy.set_upstream(query_node_status)
armada_build.set_upstream(query_node_status)
armada_failure_handler.set_upstream(armada_build)
mark_last_known_good.set_upstream(armada_build)

View File

@ -0,0 +1,166 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import airflow
from airflow.models import DAG
from airflow.operators.subdag_operator import SubDagOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators import PlaceholderOperator
def k8s_preflight_check(parent_dag_name, child_dag_name, args):
'''
The k8s_preflight_check checks that k8s is in a good state for
the purposes of the Undercloud Platform to proceed with processing
'''
dag = DAG(
'{}.{}'.format(parent_dag_name, child_dag_name),
default_args=args, )
# TODO () Replace this operator with a real operator that will:
# 1) Ensure k8s is up and running.
# 2) Ensure that pods are not crashed
operator = PlaceholderOperator(task_id='k8s_preflight_check', dag=dag)
return dag
def shipyard_preflight_check(parent_dag_name, child_dag_name, args):
'''
Checks that shipyard is in a good state for
the purposes of the Undercloud Platform to proceed with processing
'''
dag = DAG(
'{}.{}'.format(parent_dag_name, child_dag_name),
default_args=args, )
# TODO () Replace this operator with a real operator that will:
# 1) Ensure shipyard is up and running.
operator = PlaceholderOperator(task_id='shipyard_preflight_check', dag=dag)
return dag
def deckhand_preflight_check(parent_dag_name, child_dag_name, args, ):
'''
Checks that deckhand is in a good state for
the purposes of the Undercloud Platform to proceed with processing
'''
dag = DAG(
'{}.{}'.format(parent_dag_name, child_dag_name),
default_args=args, )
# TODO () Replace this operator with a real operator that will:
# 1) Ensure deckhand is up and running.
operator = PlaceholderOperator(task_id='deckhand_preflight_check', dag=dag)
return dag
def drydock_preflight_check(parent_dag_name, child_dag_name, args):
'''
Checks that drydock is in a good state for
the purposes of the Undercloud Platform to proceed with processing
'''
dag = DAG(
'{}.{}'.format(parent_dag_name, child_dag_name),
default_args=args, )
# TODO () Replace this operator with a real operator that will:
# 1) Ensure drydock is up and running.
operator = PlaceholderOperator(task_id='drydock_preflight_check', dag=dag)
return dag
def armada_preflight_check(parent_dag_name, child_dag_name, args):
'''
Checks that armada is in a good state for
the purposes of the Undercloud Platform to proceed with processing
'''
dag = DAG(
'{}.{}'.format(parent_dag_name, child_dag_name),
default_args=args, )
# TODO () Replace this operator with a real operator that will:
# 1) Ensure armada is up and running.
operator = PlaceholderOperator(task_id='armada_preflight_check', dag=dag)
return dag
# Names used for sub-subdags in the all preflight check subdag
K8S_PREFLIGHT_CHECK_DAG_NAME = 'k8s_preflight_check'
SHIPYARD_PREFLIGHT_CHECK_DAG_NAME = 'shipyard_preflight_check'
DECKHAND_PREFLIGHT_CHECK_DAG_NAME = 'deckhand_preflight_check'
DRYDOCK_PREFLIGHT_CHECK_DAG_NAME = 'drydock_preflight_check'
PROMENADE_PREFLIGHT_CHECK_DAG_NAME = 'promenade_preflight_check'
ARMADA_PREFLIGHT_CHECK_DAG_NAME = 'armada_preflight_check'
def all_preflight_checks(parent_dag_name, child_dag_name, args):
'''
puts all of the preflight checks into an atomic unit.
'''
dag = DAG(
'{}.{}'.format(parent_dag_name, child_dag_name),
default_args=args, )
k8s = SubDagOperator(
subdag=k8s_preflight_check(dag.dag_id, K8S_PREFLIGHT_CHECK_DAG_NAME,
args),
task_id=K8S_PREFLIGHT_CHECK_DAG_NAME,
dag=dag, )
shipyard = SubDagOperator(
subdag=shipyard_preflight_check(dag.dag_id,
SHIPYARD_PREFLIGHT_CHECK_DAG_NAME,
args),
task_id=SHIPYARD_PREFLIGHT_CHECK_DAG_NAME,
dag=dag, )
deckhand = SubDagOperator(
subdag=deckhand_preflight_check(dag.dag_id,
DECKHAND_PREFLIGHT_CHECK_DAG_NAME,
args),
task_id=DECKHAND_PREFLIGHT_CHECK_DAG_NAME,
dag=dag, )
drydock = SubDagOperator(
subdag=drydock_preflight_check(dag.dag_id,
DRYDOCK_PREFLIGHT_CHECK_DAG_NAME,
args),
task_id=DRYDOCK_PREFLIGHT_CHECK_DAG_NAME,
dag=dag, )
armada = SubDagOperator(
subdag=armada_preflight_check(
dag.dag_id, ARMADA_PREFLIGHT_CHECK_DAG_NAME, args),
task_id=ARMADA_PREFLIGHT_CHECK_DAG_NAME,
dag=dag, )
return dag
def preflight_failure_handler(parent_dag_name, child_dag_name, args):
'''
Peforms the actions necessary when preflight checks fail
'''
dag = DAG(
'{}.{}'.format(parent_dag_name, child_dag_name),
default_args=args, )
operator = DummyOperator(task_id='preflight_failure_handler', dag=dag)
return dag

View File

@ -0,0 +1,58 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import airflow
from airflow.models import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.subdag_operator import SubDagOperator
from airflow.utils.trigger_rule import TriggerRule
from airflow.operators import DeckhandOperator
from airflow.operators import PlaceholderOperator
def validate_site_design_failure_handler(parent_dag_name, child_dag_name,
args):
'''
Peforms the actions necessary when any of the site design checks fail
'''
dag = DAG(
'{}.{}'.format(parent_dag_name, child_dag_name),
default_args=args, )
operator = DummyOperator(
task_id='site_design_validation_failure_handler', dag=dag)
return dag
def validate_site_design(parent_dag_name, child_dag_name, args):
'''
Subdag to delegate design verification to the UCP components
'''
dag = DAG(
'{}.{}'.format(parent_dag_name, child_dag_name),
default_args=args, )
deckhand_validate_docs = DeckhandOperator(
task_id='deckhand_validate_site_design', dag=dag)
#TODO () use the real operator here
drydock_validate_docs = PlaceholderOperator(
task_id='drydock_validate_site_design', dag=dag)
#TODO () use the real operator here
armada_validate_docs = PlaceholderOperator(
task_id='armada_validate_site_design', dag=dag)
return dag

View File

View File

@ -0,0 +1,44 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.plugins_manager import AirflowPlugin
class DeckhandOperator(BaseOperator):
"""
Supports interaction with Deckhand.
"""
#TODO () remove this special coloring when the operator is done.
ui_color = '#e8f7e4'
@apply_defaults
def __init__(self, *args, **kwargs):
super(DeckhandOperator, self).__init__(*args, **kwargs)
# TODO () make this communicate with Deckhand.
# Needs to expose functionality so general interaction
# with deckhand can occur.
def execute(self, context):
logging.info('%s : %s !!! not implemented. '
'Need to get design revision from Deckhand',
self.dag.dag_id, self.task_id)
class DeckhandOperatorPlugin(AirflowPlugin):
name = 'deckhand_operator_plugin'
operators = [DeckhandOperator]

View File

@ -0,0 +1,42 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.plugins_manager import AirflowPlugin
class PlaceholderOperator(BaseOperator):
"""
Operator that writes a log of its presence, as not implemented.
This is intended to be a little noisy so it's easy to see what's
missing.
"""
template_fields = tuple()
ui_color = '#e8f7e4'
@apply_defaults
def __init__(self, *args, **kwargs):
super(PlaceholderOperator, self).__init__(*args, **kwargs)
def execute(self, context):
logging.info('%s : %s !!! not implemented', self.dag.dag_id,
self.task_id)
class PlaceholderOperatorPlugin(AirflowPlugin):
name = "placeholder_operator_plugin"
operators = [PlaceholderOperator]