Merge "DAG Maintenance - parallelization"
This commit is contained in:
commit
29d8810465
|
@ -33,7 +33,6 @@ def deploy_site_armada(parent_dag_name, child_dag_name, args):
|
||||||
task_id='armada_get_status',
|
task_id='armada_get_status',
|
||||||
shipyard_conf=config_path,
|
shipyard_conf=config_path,
|
||||||
main_dag_name=parent_dag_name,
|
main_dag_name=parent_dag_name,
|
||||||
sub_dag_name=child_dag_name,
|
|
||||||
dag=dag)
|
dag=dag)
|
||||||
|
|
||||||
# Armada Apply
|
# Armada Apply
|
||||||
|
@ -41,7 +40,6 @@ def deploy_site_armada(parent_dag_name, child_dag_name, args):
|
||||||
task_id='armada_post_apply',
|
task_id='armada_post_apply',
|
||||||
shipyard_conf=config_path,
|
shipyard_conf=config_path,
|
||||||
main_dag_name=parent_dag_name,
|
main_dag_name=parent_dag_name,
|
||||||
sub_dag_name=child_dag_name,
|
|
||||||
retries=3,
|
retries=3,
|
||||||
dag=dag)
|
dag=dag)
|
||||||
|
|
||||||
|
@ -50,7 +48,6 @@ def deploy_site_armada(parent_dag_name, child_dag_name, args):
|
||||||
task_id='armada_get_releases',
|
task_id='armada_get_releases',
|
||||||
shipyard_conf=config_path,
|
shipyard_conf=config_path,
|
||||||
main_dag_name=parent_dag_name,
|
main_dag_name=parent_dag_name,
|
||||||
sub_dag_name=child_dag_name,
|
|
||||||
dag=dag)
|
dag=dag)
|
||||||
|
|
||||||
# Define dependencies
|
# Define dependencies
|
||||||
|
|
|
@ -12,15 +12,17 @@
|
||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
from airflow.operators import ConcurrencyCheckOperator
|
from airflow.operators import ConcurrencyCheckOperator
|
||||||
|
from airflow.operators import DeckhandRetrieveRenderedDocOperator
|
||||||
|
from airflow.operators import DeploymentConfigurationOperator
|
||||||
|
from airflow.operators import DeckhandCreateSiteActionTagOperator
|
||||||
|
|
||||||
from airflow.operators.bash_operator import BashOperator
|
from airflow.operators.bash_operator import BashOperator
|
||||||
from airflow.operators.python_operator import BranchPythonOperator
|
from airflow.operators.python_operator import BranchPythonOperator
|
||||||
from airflow.operators.python_operator import PythonOperator
|
from airflow.operators.python_operator import PythonOperator
|
||||||
from airflow.operators.subdag_operator import SubDagOperator
|
from airflow.operators.subdag_operator import SubDagOperator
|
||||||
|
|
||||||
from armada_deploy_site import deploy_site_armada
|
from armada_deploy_site import deploy_site_armada
|
||||||
from dag_deployment_configuration import get_deployment_configuration
|
from config_path import config_path
|
||||||
from deckhand_create_tag import create_deckhand_tag
|
|
||||||
from deckhand_get_rendered_doc import get_rendered_doc_deckhand
|
|
||||||
from destroy_node import destroy_server
|
from destroy_node import destroy_server
|
||||||
from drydock_deploy_site import deploy_site_drydock
|
from drydock_deploy_site import deploy_site_drydock
|
||||||
from failure_handlers import step_failure_handler
|
from failure_handlers import step_failure_handler
|
||||||
|
@ -64,7 +66,7 @@ class CommonStepFactory(object):
|
||||||
dag=self.dag,
|
dag=self.dag,
|
||||||
python_callable=xcom_push)
|
python_callable=xcom_push)
|
||||||
|
|
||||||
def get_concurrency_check(self, task_id=dn.DAG_CONCURRENCY_CHECK_DAG_NAME):
|
def get_concurrency_check(self, task_id=dn.CONCURRENCY_CHECK):
|
||||||
"""Generate the concurrency check step
|
"""Generate the concurrency check step
|
||||||
|
|
||||||
Concurrency check prevents simultaneous execution of dags that should
|
Concurrency check prevents simultaneous execution of dags that should
|
||||||
|
@ -95,11 +97,9 @@ class CommonStepFactory(object):
|
||||||
Check that we are able to render the docs before proceeding
|
Check that we are able to render the docs before proceeding
|
||||||
further with the workflow
|
further with the workflow
|
||||||
"""
|
"""
|
||||||
return SubDagOperator(
|
return DeckhandRetrieveRenderedDocOperator(
|
||||||
subdag=get_rendered_doc_deckhand(
|
shipyard_conf=config_path,
|
||||||
self.parent_dag_name,
|
main_dag_name=self.parent_dag_name,
|
||||||
task_id,
|
|
||||||
args=self.default_args),
|
|
||||||
task_id=task_id,
|
task_id=task_id,
|
||||||
on_failure_callback=step_failure_handler,
|
on_failure_callback=step_failure_handler,
|
||||||
dag=self.dag)
|
dag=self.dag)
|
||||||
|
@ -121,17 +121,15 @@ class CommonStepFactory(object):
|
||||||
dag=self.dag)
|
dag=self.dag)
|
||||||
|
|
||||||
def get_deployment_configuration(self,
|
def get_deployment_configuration(self,
|
||||||
task_id=dn.GET_DEPLOY_CONF_DAG_NAME):
|
task_id=dn.DEPLOYMENT_CONFIGURATION):
|
||||||
"""Generate the step to retrieve the deployment configuration
|
"""Generate the step to retrieve the deployment configuration
|
||||||
|
|
||||||
This step provides the timings and strategies that will be used in
|
This step provides the timings and strategies that will be used in
|
||||||
subsequent steps
|
subsequent steps
|
||||||
"""
|
"""
|
||||||
return SubDagOperator(
|
return DeploymentConfigurationOperator(
|
||||||
subdag=get_deployment_configuration(
|
main_dag_name=self.parent_dag_name,
|
||||||
self.parent_dag_name,
|
shipyard_conf=config_path,
|
||||||
task_id,
|
|
||||||
args=self.default_args),
|
|
||||||
task_id=task_id,
|
task_id=task_id,
|
||||||
on_failure_callback=step_failure_handler,
|
on_failure_callback=step_failure_handler,
|
||||||
dag=self.dag)
|
dag=self.dag)
|
||||||
|
@ -254,12 +252,11 @@ class CommonStepFactory(object):
|
||||||
Note that trigger_rule is set to "all_done" so that this
|
Note that trigger_rule is set to "all_done" so that this
|
||||||
step will run even when upstream tasks are in failed state.
|
step will run even when upstream tasks are in failed state.
|
||||||
"""
|
"""
|
||||||
return SubDagOperator(
|
|
||||||
subdag=create_deckhand_tag(
|
return DeckhandCreateSiteActionTagOperator(
|
||||||
self.parent_dag_name,
|
|
||||||
task_id,
|
|
||||||
args=self.default_args),
|
|
||||||
task_id=task_id,
|
task_id=task_id,
|
||||||
|
shipyard_conf=config_path,
|
||||||
on_failure_callback=step_failure_handler,
|
on_failure_callback=step_failure_handler,
|
||||||
trigger_rule="all_done",
|
trigger_rule="all_done",
|
||||||
|
main_dag_name=self.parent_dag_name,
|
||||||
dag=self.dag)
|
dag=self.dag)
|
||||||
|
|
|
@ -1,31 +0,0 @@
|
||||||
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
|
|
||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
# you may not use this file except in compliance with the License.
|
|
||||||
# You may obtain a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
# See the License for the specific language governing permissions and
|
|
||||||
# limitations under the License.
|
|
||||||
|
|
||||||
from airflow.models import DAG
|
|
||||||
from airflow.operators import ConcurrencyCheckOperator
|
|
||||||
|
|
||||||
|
|
||||||
def dag_concurrency_check(parent_dag_name, child_dag_name, args):
|
|
||||||
'''
|
|
||||||
dag_concurrency_check is a sub-DAG that will will allow for a DAG to
|
|
||||||
determine if it is already running, and result in an error if so.
|
|
||||||
'''
|
|
||||||
dag = DAG(
|
|
||||||
'{}.{}'.format(parent_dag_name, child_dag_name),
|
|
||||||
default_args=args, )
|
|
||||||
|
|
||||||
dag_concurrency_check_operator = ConcurrencyCheckOperator(
|
|
||||||
task_id='dag_concurrency_check', dag=dag)
|
|
||||||
|
|
||||||
return dag
|
|
|
@ -1,36 +0,0 @@
|
||||||
# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
|
|
||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
# you may not use this file except in compliance with the License.
|
|
||||||
# You may obtain a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
# See the License for the specific language governing permissions and
|
|
||||||
# limitations under the License.
|
|
||||||
|
|
||||||
from airflow.models import DAG
|
|
||||||
from airflow.operators import DeploymentConfigurationOperator
|
|
||||||
|
|
||||||
from config_path import config_path
|
|
||||||
|
|
||||||
|
|
||||||
GET_DEPLOYMENT_CONFIGURATION_NAME = 'get_deployment_configuration'
|
|
||||||
|
|
||||||
|
|
||||||
def get_deployment_configuration(parent_dag_name, child_dag_name, args):
|
|
||||||
"""DAG to retrieve deployment configuration"""
|
|
||||||
dag = DAG(
|
|
||||||
'{}.{}'.format(parent_dag_name, child_dag_name),
|
|
||||||
default_args=args)
|
|
||||||
|
|
||||||
deployment_configuration = DeploymentConfigurationOperator(
|
|
||||||
task_id=GET_DEPLOYMENT_CONFIGURATION_NAME,
|
|
||||||
shipyard_conf=config_path,
|
|
||||||
main_dag_name=parent_dag_name,
|
|
||||||
dag=dag)
|
|
||||||
|
|
||||||
return dag
|
|
|
@ -15,16 +15,16 @@
|
||||||
# Subdags
|
# Subdags
|
||||||
ALL_PREFLIGHT_CHECKS_DAG_NAME = 'preflight'
|
ALL_PREFLIGHT_CHECKS_DAG_NAME = 'preflight'
|
||||||
ARMADA_BUILD_DAG_NAME = 'armada_build'
|
ARMADA_BUILD_DAG_NAME = 'armada_build'
|
||||||
CREATE_ACTION_TAG = 'create_action_tag'
|
|
||||||
DAG_CONCURRENCY_CHECK_DAG_NAME = 'dag_concurrency_check'
|
|
||||||
DESTROY_SERVER_DAG_NAME = 'destroy_server'
|
DESTROY_SERVER_DAG_NAME = 'destroy_server'
|
||||||
DRYDOCK_BUILD_DAG_NAME = 'drydock_build'
|
DRYDOCK_BUILD_DAG_NAME = 'drydock_build'
|
||||||
GET_DEPLOY_CONF_DAG_NAME = 'dag_deployment_configuration'
|
|
||||||
GET_RENDERED_DOC = 'get_rendered_doc'
|
|
||||||
VALIDATE_SITE_DESIGN_DAG_NAME = 'validate_site_design'
|
VALIDATE_SITE_DESIGN_DAG_NAME = 'validate_site_design'
|
||||||
|
|
||||||
# Steps
|
# Steps
|
||||||
ACTION_XCOM = 'action_xcom'
|
ACTION_XCOM = 'action_xcom'
|
||||||
|
CONCURRENCY_CHECK = 'dag_concurrency_check'
|
||||||
|
CREATE_ACTION_TAG = 'create_action_tag'
|
||||||
DECIDE_AIRFLOW_UPGRADE = 'decide_airflow_upgrade'
|
DECIDE_AIRFLOW_UPGRADE = 'decide_airflow_upgrade'
|
||||||
|
DEPLOYMENT_CONFIGURATION = 'deployment_configuration'
|
||||||
|
GET_RENDERED_DOC = 'get_rendered_doc'
|
||||||
SKIP_UPGRADE_AIRFLOW = 'skip_upgrade_airflow'
|
SKIP_UPGRADE_AIRFLOW = 'skip_upgrade_airflow'
|
||||||
UPGRADE_AIRFLOW = 'upgrade_airflow'
|
UPGRADE_AIRFLOW = 'upgrade_airflow'
|
||||||
|
|
|
@ -1,36 +0,0 @@
|
||||||
# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
|
|
||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
# you may not use this file except in compliance with the License.
|
|
||||||
# You may obtain a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
# See the License for the specific language governing permissions and
|
|
||||||
# limitations under the License.
|
|
||||||
|
|
||||||
from airflow.models import DAG
|
|
||||||
from airflow.operators import DeckhandCreateSiteActionTagOperator
|
|
||||||
|
|
||||||
from config_path import config_path
|
|
||||||
|
|
||||||
|
|
||||||
def create_deckhand_tag(parent_dag_name, child_dag_name, args):
|
|
||||||
'''
|
|
||||||
Create Deckhand Revision Tag
|
|
||||||
'''
|
|
||||||
dag = DAG(
|
|
||||||
'{}.{}'.format(parent_dag_name, child_dag_name),
|
|
||||||
default_args=args)
|
|
||||||
|
|
||||||
create_action_tag = DeckhandCreateSiteActionTagOperator(
|
|
||||||
task_id='deckhand_create_action_tag',
|
|
||||||
shipyard_conf=config_path,
|
|
||||||
main_dag_name=parent_dag_name,
|
|
||||||
sub_dag_name=child_dag_name,
|
|
||||||
dag=dag)
|
|
||||||
|
|
||||||
return dag
|
|
|
@ -1,36 +0,0 @@
|
||||||
# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
|
|
||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
# you may not use this file except in compliance with the License.
|
|
||||||
# You may obtain a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
# See the License for the specific language governing permissions and
|
|
||||||
# limitations under the License.
|
|
||||||
|
|
||||||
from airflow.models import DAG
|
|
||||||
from airflow.operators import DeckhandRetrieveRenderedDocOperator
|
|
||||||
|
|
||||||
from config_path import config_path
|
|
||||||
|
|
||||||
|
|
||||||
def get_rendered_doc_deckhand(parent_dag_name, child_dag_name, args):
|
|
||||||
'''
|
|
||||||
Get rendered documents from Deckhand for the committed revision ID.
|
|
||||||
'''
|
|
||||||
dag = DAG(
|
|
||||||
'{}.{}'.format(parent_dag_name, child_dag_name),
|
|
||||||
default_args=args)
|
|
||||||
|
|
||||||
deckhand_retrieve_rendered_doc = DeckhandRetrieveRenderedDocOperator(
|
|
||||||
task_id='deckhand_retrieve_rendered_doc',
|
|
||||||
shipyard_conf=config_path,
|
|
||||||
main_dag_name=parent_dag_name,
|
|
||||||
sub_dag_name=child_dag_name,
|
|
||||||
dag=dag)
|
|
||||||
|
|
||||||
return dag
|
|
|
@ -54,11 +54,15 @@ armada_build = step_factory.get_armada_build()
|
||||||
create_action_tag = step_factory.get_create_action_tag()
|
create_action_tag = step_factory.get_create_action_tag()
|
||||||
|
|
||||||
# DAG Wiring
|
# DAG Wiring
|
||||||
concurrency_check.set_upstream(action_xcom)
|
preflight.set_upstream(action_xcom)
|
||||||
preflight.set_upstream(concurrency_check)
|
get_rendered_doc.set_upstream(action_xcom)
|
||||||
get_rendered_doc.set_upstream(preflight)
|
deployment_configuration.set_upstream(action_xcom)
|
||||||
deployment_configuration.set_upstream(get_rendered_doc)
|
validate_site_design.set_upstream([
|
||||||
validate_site_design.set_upstream(deployment_configuration)
|
preflight,
|
||||||
|
get_rendered_doc,
|
||||||
|
concurrency_check,
|
||||||
|
deployment_configuration
|
||||||
|
])
|
||||||
drydock_build.set_upstream(validate_site_design)
|
drydock_build.set_upstream(validate_site_design)
|
||||||
armada_build.set_upstream(drydock_build)
|
armada_build.set_upstream(drydock_build)
|
||||||
create_action_tag.set_upstream(armada_build)
|
create_action_tag.set_upstream(armada_build)
|
||||||
|
|
|
@ -39,7 +39,6 @@ def destroy_server(parent_dag_name, child_dag_name, args):
|
||||||
task_id='promenade_drain_node',
|
task_id='promenade_drain_node',
|
||||||
shipyard_conf=config_path,
|
shipyard_conf=config_path,
|
||||||
main_dag_name=parent_dag_name,
|
main_dag_name=parent_dag_name,
|
||||||
sub_dag_name=child_dag_name,
|
|
||||||
dag=dag)
|
dag=dag)
|
||||||
|
|
||||||
# Clear Labels
|
# Clear Labels
|
||||||
|
@ -47,7 +46,6 @@ def destroy_server(parent_dag_name, child_dag_name, args):
|
||||||
task_id='promenade_clear_labels',
|
task_id='promenade_clear_labels',
|
||||||
shipyard_conf=config_path,
|
shipyard_conf=config_path,
|
||||||
main_dag_name=parent_dag_name,
|
main_dag_name=parent_dag_name,
|
||||||
sub_dag_name=child_dag_name,
|
|
||||||
dag=dag)
|
dag=dag)
|
||||||
|
|
||||||
# Shutdown Kubelet
|
# Shutdown Kubelet
|
||||||
|
@ -55,7 +53,6 @@ def destroy_server(parent_dag_name, child_dag_name, args):
|
||||||
task_id='promenade_shutdown_kubelet',
|
task_id='promenade_shutdown_kubelet',
|
||||||
shipyard_conf=config_path,
|
shipyard_conf=config_path,
|
||||||
main_dag_name=parent_dag_name,
|
main_dag_name=parent_dag_name,
|
||||||
sub_dag_name=child_dag_name,
|
|
||||||
dag=dag)
|
dag=dag)
|
||||||
|
|
||||||
# ETCD Sanity Check
|
# ETCD Sanity Check
|
||||||
|
@ -63,7 +60,6 @@ def destroy_server(parent_dag_name, child_dag_name, args):
|
||||||
task_id='promenade_check_etcd',
|
task_id='promenade_check_etcd',
|
||||||
shipyard_conf=config_path,
|
shipyard_conf=config_path,
|
||||||
main_dag_name=parent_dag_name,
|
main_dag_name=parent_dag_name,
|
||||||
sub_dag_name=child_dag_name,
|
|
||||||
dag=dag)
|
dag=dag)
|
||||||
|
|
||||||
# Power down and destroy node using DryDock
|
# Power down and destroy node using DryDock
|
||||||
|
@ -71,7 +67,6 @@ def destroy_server(parent_dag_name, child_dag_name, args):
|
||||||
task_id='destroy_node',
|
task_id='destroy_node',
|
||||||
shipyard_conf=config_path,
|
shipyard_conf=config_path,
|
||||||
main_dag_name=parent_dag_name,
|
main_dag_name=parent_dag_name,
|
||||||
sub_dag_name=child_dag_name,
|
|
||||||
dag=dag)
|
dag=dag)
|
||||||
|
|
||||||
# Decommission node from Kubernetes cluster using Promenade
|
# Decommission node from Kubernetes cluster using Promenade
|
||||||
|
@ -79,7 +74,6 @@ def destroy_server(parent_dag_name, child_dag_name, args):
|
||||||
task_id='promenade_decommission_node',
|
task_id='promenade_decommission_node',
|
||||||
shipyard_conf=config_path,
|
shipyard_conf=config_path,
|
||||||
main_dag_name=parent_dag_name,
|
main_dag_name=parent_dag_name,
|
||||||
sub_dag_name=child_dag_name,
|
|
||||||
dag=dag)
|
dag=dag)
|
||||||
|
|
||||||
# Define dependencies
|
# Define dependencies
|
||||||
|
|
|
@ -32,21 +32,18 @@ def deploy_site_drydock(parent_dag_name, child_dag_name, args):
|
||||||
task_id='verify_site',
|
task_id='verify_site',
|
||||||
shipyard_conf=config_path,
|
shipyard_conf=config_path,
|
||||||
main_dag_name=parent_dag_name,
|
main_dag_name=parent_dag_name,
|
||||||
sub_dag_name=child_dag_name,
|
|
||||||
dag=dag)
|
dag=dag)
|
||||||
|
|
||||||
drydock_prepare_site = DrydockPrepareSiteOperator(
|
drydock_prepare_site = DrydockPrepareSiteOperator(
|
||||||
task_id='prepare_site',
|
task_id='prepare_site',
|
||||||
shipyard_conf=config_path,
|
shipyard_conf=config_path,
|
||||||
main_dag_name=parent_dag_name,
|
main_dag_name=parent_dag_name,
|
||||||
sub_dag_name=child_dag_name,
|
|
||||||
dag=dag)
|
dag=dag)
|
||||||
|
|
||||||
drydock_nodes = DrydockNodesOperator(
|
drydock_nodes = DrydockNodesOperator(
|
||||||
task_id='prepare_and_deploy_nodes',
|
task_id='prepare_and_deploy_nodes',
|
||||||
shipyard_conf=config_path,
|
shipyard_conf=config_path,
|
||||||
main_dag_name=parent_dag_name,
|
main_dag_name=parent_dag_name,
|
||||||
sub_dag_name=child_dag_name,
|
|
||||||
dag=dag)
|
dag=dag)
|
||||||
|
|
||||||
# Define dependencies
|
# Define dependencies
|
||||||
|
|
|
@ -62,11 +62,15 @@ skip_upgrade_airflow = step_factory.get_skip_upgrade_airflow()
|
||||||
create_action_tag = step_factory.get_create_action_tag()
|
create_action_tag = step_factory.get_create_action_tag()
|
||||||
|
|
||||||
# DAG Wiring
|
# DAG Wiring
|
||||||
concurrency_check.set_upstream(action_xcom)
|
preflight.set_upstream(action_xcom)
|
||||||
preflight.set_upstream(concurrency_check)
|
get_rendered_doc.set_upstream(action_xcom)
|
||||||
get_rendered_doc.set_upstream(preflight)
|
deployment_configuration.set_upstream(action_xcom)
|
||||||
deployment_configuration.set_upstream(get_rendered_doc)
|
validate_site_design.set_upstream([
|
||||||
validate_site_design.set_upstream(deployment_configuration)
|
preflight,
|
||||||
|
get_rendered_doc,
|
||||||
|
concurrency_check,
|
||||||
|
deployment_configuration
|
||||||
|
])
|
||||||
drydock_build.set_upstream(validate_site_design)
|
drydock_build.set_upstream(validate_site_design)
|
||||||
armada_build.set_upstream(drydock_build)
|
armada_build.set_upstream(drydock_build)
|
||||||
decide_airflow_upgrade.set_upstream(armada_build)
|
decide_airflow_upgrade.set_upstream(armada_build)
|
||||||
|
|
|
@ -22,9 +22,10 @@ from config_path import config_path
|
||||||
|
|
||||||
|
|
||||||
def validate_site_design(parent_dag_name, child_dag_name, args):
|
def validate_site_design(parent_dag_name, child_dag_name, args):
|
||||||
'''
|
"""Subdag to delegate design verification to the UCP components
|
||||||
Subdag to delegate design verification to the UCP components
|
|
||||||
'''
|
There is no wiring of steps - they all execute in parallel
|
||||||
|
"""
|
||||||
dag = DAG(
|
dag = DAG(
|
||||||
'{}.{}'.format(parent_dag_name, child_dag_name),
|
'{}.{}'.format(parent_dag_name, child_dag_name),
|
||||||
default_args=args)
|
default_args=args)
|
||||||
|
@ -33,32 +34,28 @@ def validate_site_design(parent_dag_name, child_dag_name, args):
|
||||||
task_id='deckhand_validate_site_design',
|
task_id='deckhand_validate_site_design',
|
||||||
shipyard_conf=config_path,
|
shipyard_conf=config_path,
|
||||||
main_dag_name=parent_dag_name,
|
main_dag_name=parent_dag_name,
|
||||||
sub_dag_name=child_dag_name,
|
retries=1,
|
||||||
retries=3,
|
|
||||||
dag=dag)
|
dag=dag)
|
||||||
|
|
||||||
drydock_validate_docs = DrydockValidateDesignOperator(
|
drydock_validate_docs = DrydockValidateDesignOperator(
|
||||||
task_id='drydock_validate_site_design',
|
task_id='drydock_validate_site_design',
|
||||||
shipyard_conf=config_path,
|
shipyard_conf=config_path,
|
||||||
main_dag_name=parent_dag_name,
|
main_dag_name=parent_dag_name,
|
||||||
sub_dag_name=child_dag_name,
|
retries=1,
|
||||||
retries=3,
|
|
||||||
dag=dag)
|
dag=dag)
|
||||||
|
|
||||||
armada_validate_docs = ArmadaValidateDesignOperator(
|
armada_validate_docs = ArmadaValidateDesignOperator(
|
||||||
task_id='armada_validate_site_design',
|
task_id='armada_validate_site_design',
|
||||||
shipyard_conf=config_path,
|
shipyard_conf=config_path,
|
||||||
main_dag_name=parent_dag_name,
|
main_dag_name=parent_dag_name,
|
||||||
sub_dag_name=child_dag_name,
|
retries=1,
|
||||||
retries=3,
|
|
||||||
dag=dag)
|
dag=dag)
|
||||||
|
|
||||||
promenade_validate_docs = PromenadeValidateSiteDesignOperator(
|
promenade_validate_docs = PromenadeValidateSiteDesignOperator(
|
||||||
task_id='promenade_validate_site_design',
|
task_id='promenade_validate_site_design',
|
||||||
shipyard_conf=config_path,
|
shipyard_conf=config_path,
|
||||||
main_dag_name=parent_dag_name,
|
main_dag_name=parent_dag_name,
|
||||||
sub_dag_name=child_dag_name,
|
retries=1,
|
||||||
retries=3,
|
|
||||||
dag=dag)
|
dag=dag)
|
||||||
|
|
||||||
return dag
|
return dag
|
||||||
|
|
|
@ -71,7 +71,6 @@ class UcpBaseOperator(BaseOperator):
|
||||||
pod_selector_pattern=None,
|
pod_selector_pattern=None,
|
||||||
shipyard_conf=None,
|
shipyard_conf=None,
|
||||||
start_time=None,
|
start_time=None,
|
||||||
sub_dag_name=None,
|
|
||||||
xcom_push=True,
|
xcom_push=True,
|
||||||
*args, **kwargs):
|
*args, **kwargs):
|
||||||
"""Initialization of UcpBaseOperator object.
|
"""Initialization of UcpBaseOperator object.
|
||||||
|
@ -92,7 +91,6 @@ class UcpBaseOperator(BaseOperator):
|
||||||
log-rotate container.
|
log-rotate container.
|
||||||
:param shipyard_conf: Location of shipyard.conf
|
:param shipyard_conf: Location of shipyard.conf
|
||||||
:param start_time: Time when Operator gets executed
|
:param start_time: Time when Operator gets executed
|
||||||
:param sub_dag_name: Child Dag
|
|
||||||
:param xcom_push: xcom usage
|
:param xcom_push: xcom usage
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
@ -103,7 +101,6 @@ class UcpBaseOperator(BaseOperator):
|
||||||
self.pod_selector_pattern = pod_selector_pattern or []
|
self.pod_selector_pattern = pod_selector_pattern or []
|
||||||
self.shipyard_conf = shipyard_conf
|
self.shipyard_conf = shipyard_conf
|
||||||
self.start_time = datetime.now()
|
self.start_time = datetime.now()
|
||||||
self.sub_dag_name = sub_dag_name
|
|
||||||
self.xcom_push_flag = xcom_push
|
self.xcom_push_flag = xcom_push
|
||||||
self.doc_utils = _get_document_util(self.shipyard_conf)
|
self.doc_utils = _get_document_util(self.shipyard_conf)
|
||||||
self.endpoints = service_endpoint.ServiceEndpoints(self.shipyard_conf)
|
self.endpoints = service_endpoint.ServiceEndpoints(self.shipyard_conf)
|
||||||
|
|
|
@ -33,7 +33,20 @@ class XcomPuller(object):
|
||||||
self.ti = task_instance
|
self.ti = task_instance
|
||||||
|
|
||||||
def _get_xcom(self, source_task, dag_id=None, key=None, log_result=True):
|
def _get_xcom(self, source_task, dag_id=None, key=None, log_result=True):
|
||||||
"""Find a particular xcom value"""
|
"""Find and return an xcom value
|
||||||
|
|
||||||
|
:param source_task: The name of the task that wrote the xcom
|
||||||
|
:param dag_id: The name of the subdag (of the main DAG) that contained
|
||||||
|
the source task. Let this default to None if the task is a direct
|
||||||
|
child of the main dag
|
||||||
|
:param key: The name of the xcom item that was written by the task. If
|
||||||
|
the source task allowed for the step to simply push xcom at the
|
||||||
|
end of the step, leave this None.
|
||||||
|
:param log_result: boolean to indicate if the value of the xcom should
|
||||||
|
be logged upon retreival. This can be nice for investigative
|
||||||
|
purposes, but would likely not be good for large or complex
|
||||||
|
values.
|
||||||
|
"""
|
||||||
if dag_id is None:
|
if dag_id is None:
|
||||||
source_dag = self.mdn
|
source_dag = self.mdn
|
||||||
else:
|
else:
|
||||||
|
@ -53,8 +66,8 @@ class XcomPuller(object):
|
||||||
|
|
||||||
def get_deployment_configuration(self):
|
def get_deployment_configuration(self):
|
||||||
"""Retrieve the deployment configuration dictionary"""
|
"""Retrieve the deployment configuration dictionary"""
|
||||||
source_task = 'get_deployment_configuration'
|
source_task = 'deployment_configuration'
|
||||||
source_dag = 'dag_deployment_configuration'
|
source_dag = None
|
||||||
key = None
|
key = None
|
||||||
return self._get_xcom(source_task=source_task,
|
return self._get_xcom(source_task=source_task,
|
||||||
dag_id=source_dag,
|
dag_id=source_dag,
|
||||||
|
@ -77,7 +90,7 @@ class XcomPuller(object):
|
||||||
def get_check_drydock_continue_on_fail(self):
|
def get_check_drydock_continue_on_fail(self):
|
||||||
"""Check if 'drydock_continue_on_fail' key exists"""
|
"""Check if 'drydock_continue_on_fail' key exists"""
|
||||||
source_task = 'ucp_preflight_check'
|
source_task = 'ucp_preflight_check'
|
||||||
source_dag = 'preflight'
|
source_dag = None
|
||||||
key = 'drydock_continue_on_fail'
|
key = 'drydock_continue_on_fail'
|
||||||
return self._get_xcom(source_task=source_task,
|
return self._get_xcom(source_task=source_task,
|
||||||
dag_id=source_dag,
|
dag_id=source_dag,
|
||||||
|
|
Loading…
Reference in New Issue