Merge "Add Retry Logic for Armada Apply"
This commit is contained in:
commit
22315bb357
|
@ -14,91 +14,56 @@
|
|||
|
||||
from airflow.models import DAG
|
||||
from airflow.operators import ArmadaOperator
|
||||
from airflow.operators.subdag_operator import SubDagOperator
|
||||
|
||||
# Location of shiyard.conf
|
||||
config_path = '/usr/local/airflow/plugins/shipyard.conf'
|
||||
|
||||
# Names used for sub-subdags in the armada site deployment subdag
|
||||
CREATE_ARMADA_CLIENT_DAG_NAME = 'create_armada_client'
|
||||
GET_ARMADA_STATUS_DAG_NAME = 'armada_status'
|
||||
ARMADA_VALIDATE_DAG_NAME = 'armada_validate'
|
||||
ARMADA_APPLY_DAG_NAME = 'armada_apply'
|
||||
ARMADA_GET_RELEASES_DAG_NAME = 'armada_get_releases'
|
||||
|
||||
|
||||
def get_armada_subdag_step(parent_dag_name, child_dag_name, args):
|
||||
'''
|
||||
Execute Armada Subdag
|
||||
'''
|
||||
dag = DAG(
|
||||
'{}.{}'.format(parent_dag_name, child_dag_name),
|
||||
default_args=args)
|
||||
|
||||
# Note that in the event where the 'deploy_site' Action is
|
||||
# triggered from Shipyard, the 'parent_dag_name' variable
|
||||
# gets assigned with 'deploy_site.create_armada_client'.
|
||||
# This is the name that we want to assign to the subdag so
|
||||
# that we can reference it for xcom. The name of the main
|
||||
# dag will be the front part of that value, i.e. 'deploy_site'.
|
||||
# Hence we will extract the front part and assign it to main_dag.
|
||||
# We will reuse this pattern for other Actions, e.g. update_site,
|
||||
# redeploy_site as well.
|
||||
operator = ArmadaOperator(
|
||||
task_id=child_dag_name,
|
||||
shipyard_conf=config_path,
|
||||
action=child_dag_name,
|
||||
main_dag_name=parent_dag_name[0:parent_dag_name.find('.')],
|
||||
sub_dag_name=parent_dag_name,
|
||||
dag=dag)
|
||||
|
||||
return dag
|
||||
|
||||
|
||||
def deploy_site_armada(parent_dag_name, child_dag_name, args):
|
||||
'''
|
||||
Puts into atomic unit
|
||||
Armada Subdag
|
||||
'''
|
||||
dag = DAG(
|
||||
'{}.{}'.format(parent_dag_name, child_dag_name),
|
||||
default_args=args)
|
||||
|
||||
armada_client = SubDagOperator(
|
||||
subdag=get_armada_subdag_step(dag.dag_id,
|
||||
CREATE_ARMADA_CLIENT_DAG_NAME,
|
||||
args),
|
||||
task_id=CREATE_ARMADA_CLIENT_DAG_NAME,
|
||||
# Create Armada Client
|
||||
armada_client = ArmadaOperator(
|
||||
task_id='create_armada_client',
|
||||
shipyard_conf=config_path,
|
||||
action='create_armada_client',
|
||||
dag=dag)
|
||||
|
||||
armada_status = SubDagOperator(
|
||||
subdag=get_armada_subdag_step(dag.dag_id,
|
||||
GET_ARMADA_STATUS_DAG_NAME,
|
||||
args),
|
||||
task_id=GET_ARMADA_STATUS_DAG_NAME,
|
||||
# Get Tiller Status
|
||||
armada_status = ArmadaOperator(
|
||||
task_id='armada_status',
|
||||
shipyard_conf=config_path,
|
||||
action='armada_status',
|
||||
dag=dag)
|
||||
|
||||
armada_validate = SubDagOperator(
|
||||
subdag=get_armada_subdag_step(dag.dag_id,
|
||||
ARMADA_VALIDATE_DAG_NAME,
|
||||
args),
|
||||
task_id=ARMADA_VALIDATE_DAG_NAME,
|
||||
# Validate Armada YAMLs
|
||||
armada_validate = ArmadaOperator(
|
||||
task_id='armada_validate',
|
||||
shipyard_conf=config_path,
|
||||
action='armada_validate',
|
||||
dag=dag)
|
||||
|
||||
armada_apply = SubDagOperator(
|
||||
subdag=get_armada_subdag_step(dag.dag_id,
|
||||
ARMADA_APPLY_DAG_NAME,
|
||||
args),
|
||||
task_id=ARMADA_APPLY_DAG_NAME,
|
||||
# Armada Apply
|
||||
armada_apply = ArmadaOperator(
|
||||
task_id='armada_apply',
|
||||
shipyard_conf=config_path,
|
||||
action='armada_apply',
|
||||
retries=10,
|
||||
dag=dag)
|
||||
|
||||
armada_get_releases = SubDagOperator(
|
||||
subdag=get_armada_subdag_step(dag.dag_id,
|
||||
ARMADA_GET_RELEASES_DAG_NAME,
|
||||
args),
|
||||
task_id=ARMADA_GET_RELEASES_DAG_NAME,
|
||||
# Get Helm Releases
|
||||
armada_get_releases = ArmadaOperator(
|
||||
task_id='armada_get_releases',
|
||||
shipyard_conf=config_path,
|
||||
action='armada_get_releases',
|
||||
dag=dag)
|
||||
|
||||
# DAG Wiring
|
||||
# Define dependencies
|
||||
armada_status.set_upstream(armada_client)
|
||||
armada_validate.set_upstream(armada_status)
|
||||
armada_apply.set_upstream(armada_validate)
|
||||
|
|
|
@ -52,7 +52,7 @@ default_args = {
|
|||
'email_on_retry': False,
|
||||
'provide_context': True,
|
||||
'retries': 0,
|
||||
'retry_delay': timedelta(minutes=1),
|
||||
'retry_delay': timedelta(seconds=30),
|
||||
}
|
||||
|
||||
dag = DAG(PARENT_DAG_NAME, default_args=default_args, schedule_interval=None)
|
||||
|
|
|
@ -221,10 +221,11 @@ class ArmadaOperator(BaseOperator):
|
|||
context['query'])
|
||||
|
||||
# We will expect Armada to return the releases that it is
|
||||
# deploying. An empty value for 'install' means that armada
|
||||
# delploy has failed. Note that if we try and deploy the same
|
||||
# release twice, we will end up with empty response on our
|
||||
# second attempt and that will be treated as a failure scenario.
|
||||
# deploying. An empty value for 'install' means that armada
|
||||
# delploy has failed. Note that if we try and deploy the
|
||||
# same release twice, we will end up with empty response on
|
||||
# our second attempt and that will be treated as a failure
|
||||
# scenario.
|
||||
if armada_post_apply['message']['install']:
|
||||
logging.info("Armada Apply Successfully Executed")
|
||||
logging.info(armada_post_apply)
|
||||
|
|
Loading…
Reference in New Issue