Merge "Add update_site dag, concurrency check"

Bryan Strassner 2017-08-24 09:59:02 -04:00 committed by Gerrit Code Review
commit be9021dfd5
12 changed files with 434 additions and 117 deletions

shipyard_airflow/dags/dag_concurrency_check.py

@@ -13,8 +13,7 @@
 # limitations under the License.
 
 from airflow.models import DAG
-from airflow.operators import PlaceholderOperator
-from airflow.operators.dummy_operator import DummyOperator
+from airflow.operators import ConcurrencyCheckOperator
 
 
 def dag_concurrency_check(parent_dag_name, child_dag_name, args):
@@ -26,28 +25,7 @@ def dag_concurrency_check(parent_dag_name, child_dag_name, args):
         '{}.{}'.format(parent_dag_name, child_dag_name),
         default_args=args, )
 
-    # TODO () Replace this operator with a real operator that will:
-    # 1) Look for an instance of the parent_dag_name running currently in
-    #    airflow
-    # 2) Fail if the parent_dag_name is running
-    # 3) Succeed if there are no instances of parent_dag_name running
-    dag_concurrency_check_operator = PlaceholderOperator(
+    dag_concurrency_check_operator = ConcurrencyCheckOperator(
         task_id='dag_concurrency_check', dag=dag)
 
     return dag
-
-
-def dag_concurrency_check_failure_handler(parent_dag_name, child_dag_name,
-                                          args):
-    '''
-    Performs the actions necessary when concurrency checks fail
-    '''
-    dag = DAG(
-        '{}.{}'.format(parent_dag_name, child_dag_name),
-        default_args=args, )
-
-    operator = DummyOperator(
-        task_id='dag_concurrency_check_failure_handler',
-        dag=dag, )
-
-    return dag
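With the placeholder swapped for the real operator, this factory now returns a one-task subdag that performs the actual check. A minimal sketch of consuming the subdag form from a parent DAG, mirroring the pre-change deploy_site wiring shown below (illustrative only; `dag` and `default_args` are assumed to be defined by the parent DAG file):

    from airflow.operators.subdag_operator import SubDagOperator

    concurrency_check = SubDagOperator(
        subdag=dag_concurrency_check(
            'some_parent_dag', 'dag_concurrency_check', args=default_args),
        task_id='dag_concurrency_check',
        dag=dag)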

shipyard_airflow/dags/deploy_site.py

@@ -11,35 +11,27 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
 from datetime import timedelta
 
 import airflow
 from airflow import DAG
-from dag_concurrency_check import dag_concurrency_check
-from dag_concurrency_check import dag_concurrency_check_failure_handler
+import failure_handlers
 from preflight_checks import all_preflight_checks
-from preflight_checks import preflight_failure_handler
 from validate_site_design import validate_site_design
-from validate_site_design import validate_site_design_failure_handler
 from airflow.operators.subdag_operator import SubDagOperator
+from airflow.operators import ConcurrencyCheckOperator
 from airflow.operators import DeckhandOperator
 from airflow.operators import PlaceholderOperator
-from airflow.utils.trigger_rule import TriggerRule
 
-'''
+"""
 deploy_site is the top-level orchestration DAG for deploying a site using the
 Undercloud platform.
-'''
+"""
 
 PARENT_DAG_NAME = 'deploy_site'
 DAG_CONCURRENCY_CHECK_DAG_NAME = 'dag_concurrency_check'
-CONCURRENCY_FAILURE_DAG_NAME = 'concurrency_check_failure_handler'
 ALL_PREFLIGHT_CHECKS_DAG_NAME = 'preflight'
-PREFLIGHT_FAILURE_DAG_NAME = 'preflight_failure_handler'
 DECKHAND_GET_DESIGN_VERSION = 'deckhand_get_design_version'
 VALIDATE_SITE_DESIGN_DAG_NAME = 'validate_site_design'
-VALIDATION_FAILED_DAG_NAME = 'validate_site_design_failure_handler'
 DECKHAND_MARK_LAST_KNOWN_GOOD = 'deckhand_mark_last_known_good'
 
 default_args = {
     'owner': 'airflow',
@@ -54,84 +46,49 @@ default_args = {
 
 dag = DAG(PARENT_DAG_NAME, default_args=default_args, schedule_interval=None)
 
-concurrency_check = SubDagOperator(
-    subdag=dag_concurrency_check(
-        PARENT_DAG_NAME, DAG_CONCURRENCY_CHECK_DAG_NAME, args=default_args),
+concurrency_check = ConcurrencyCheckOperator(
     task_id=DAG_CONCURRENCY_CHECK_DAG_NAME,
-    dag=dag, )
-
-concurrency_check_failure_handler = SubDagOperator(
-    subdag=dag_concurrency_check_failure_handler(
-        PARENT_DAG_NAME, CONCURRENCY_FAILURE_DAG_NAME, args=default_args),
-    task_id=CONCURRENCY_FAILURE_DAG_NAME,
-    trigger_rule=TriggerRule.ONE_FAILED,
-    dag=dag, )
+    on_failure_callback=failure_handlers.step_failure_handler,
+    dag=dag)
 
 preflight = SubDagOperator(
     subdag=all_preflight_checks(
         PARENT_DAG_NAME, ALL_PREFLIGHT_CHECKS_DAG_NAME, args=default_args),
     task_id=ALL_PREFLIGHT_CHECKS_DAG_NAME,
-    dag=dag, )
-
-preflight_failure = SubDagOperator(
-    subdag=preflight_failure_handler(
-        PARENT_DAG_NAME, PREFLIGHT_FAILURE_DAG_NAME, args=default_args),
-    task_id=PREFLIGHT_FAILURE_DAG_NAME,
-    trigger_rule=TriggerRule.ONE_FAILED,
+    on_failure_callback=failure_handlers.step_failure_handler,
     dag=dag, )
 
 get_design_version = DeckhandOperator(
-    task_id=DECKHAND_GET_DESIGN_VERSION, dag=dag)
+    task_id=DECKHAND_GET_DESIGN_VERSION,
+    on_failure_callback=failure_handlers.step_failure_handler,
+    dag=dag)
 
 validate_site_design = SubDagOperator(
     subdag=validate_site_design(
         PARENT_DAG_NAME, VALIDATE_SITE_DESIGN_DAG_NAME, args=default_args),
     task_id=VALIDATE_SITE_DESIGN_DAG_NAME,
+    on_failure_callback=failure_handlers.step_failure_handler,
     dag=dag)
 
-validate_site_design_failure = SubDagOperator(
-    subdag=validate_site_design_failure_handler(
-        dag.dag_id, VALIDATION_FAILED_DAG_NAME, args=default_args),
-    task_id=VALIDATION_FAILED_DAG_NAME,
-    trigger_rule=TriggerRule.ONE_FAILED,
-    dag=dag)
-
-drydock_build = PlaceholderOperator(task_id='drydock_build', dag=dag)
-
-drydock_failure_handler = PlaceholderOperator(
-    task_id='drydock_failure_handler',
-    trigger_rule=TriggerRule.ONE_FAILED,
+drydock_build = PlaceholderOperator(
+    task_id='drydock_build',
+    on_failure_callback=failure_handlers.step_failure_handler,
     dag=dag)
 
 query_node_status = PlaceholderOperator(
-    task_id='deployed_node_status', dag=dag)
-
-nodes_not_healthy = PlaceholderOperator(
-    task_id='deployed_nodes_not_healthy',
-    trigger_rule=TriggerRule.ONE_FAILED,
+    task_id='deployed_node_status',
+    on_failure_callback=failure_handlers.step_failure_handler,
     dag=dag)
 
-armada_build = PlaceholderOperator(task_id='armada_build', dag=dag)
-
-armada_failure_handler = PlaceholderOperator(
-    task_id='armada_failure_handler',
-    trigger_rule=TriggerRule.ONE_FAILED,
+armada_build = PlaceholderOperator(
+    task_id='armada_build',
+    on_failure_callback=failure_handlers.step_failure_handler,
     dag=dag)
 
 mark_last_known_good = DeckhandOperator(
     task_id=DECKHAND_MARK_LAST_KNOWN_GOOD, dag=dag)
 
 # DAG Wiring
-concurrency_check_failure_handler.set_upstream(concurrency_check)
 preflight.set_upstream(concurrency_check)
-preflight_failure.set_upstream(preflight)
 get_design_version.set_upstream(preflight)
 validate_site_design.set_upstream(get_design_version)
-validate_site_design_failure.set_upstream(validate_site_design)
 drydock_build.set_upstream(validate_site_design)
-drydock_failure_handler.set_upstream(drydock_build)
 query_node_status.set_upstream(drydock_build)
-nodes_not_healthy.set_upstream(query_node_status)
 armada_build.set_upstream(query_node_status)
-armada_failure_handler.set_upstream(armada_build)
 mark_last_known_good.set_upstream(armada_build)
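The net change above: each per-step *_failure_handler task that ran on trigger_rule=TriggerRule.ONE_FAILED is replaced by an on_failure_callback on the step itself, so the callback executes inside the failing task instance and the graph sheds one node per step. A side-by-side sketch, drawn from the removed and added lines above (illustrative only):

    # before: a downstream task that fires only when its upstream fails
    drydock_failure_handler = PlaceholderOperator(
        task_id='drydock_failure_handler',
        trigger_rule=TriggerRule.ONE_FAILED,
        dag=dag)
    drydock_failure_handler.set_upstream(drydock_build)

    # after: the step owns its failure handling
    drydock_build = PlaceholderOperator(
        task_id='drydock_build',
        on_failure_callback=failure_handlers.step_failure_handler,
        dag=dag)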

shipyard_airflow/dags/failure_handlers.py

@@ -0,0 +1,21 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging


def step_failure_handler(context):
    """
    Callable used to handle failure of this step.
    """
    logging.info('%s step failed', context['task_instance'].task_id)
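Airflow invokes on_failure_callback with the task context dict when a task fails; this handler reads only task_instance.task_id from it. A minimal sketch of calling it directly, the same way the unit test further down stubs the context (SimpleNamespace stands in for a real TaskInstance):

    from types import SimpleNamespace

    import failure_handlers

    context = {'task_instance': SimpleNamespace(task_id='drydock_build')}
    failure_handlers.step_failure_handler(context)
    # logs: 'drydock_build step failed'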

shipyard_airflow/dags/preflight_checks.py

@@ -14,7 +14,6 @@
 
 from airflow.models import DAG
 from airflow.operators.subdag_operator import SubDagOperator
-from airflow.operators.dummy_operator import DummyOperator
 from airflow.operators import PlaceholderOperator
@@ -150,16 +149,3 @@ def all_preflight_checks(parent_dag_name, child_dag_name, args):
         dag=dag, )
 
     return dag
-
-
-def preflight_failure_handler(parent_dag_name, child_dag_name, args):
-    '''
-    Performs the actions necessary when preflight checks fail
-    '''
-    dag = DAG(
-        '{}.{}'.format(parent_dag_name, child_dag_name),
-        default_args=args, )
-
-    operator = DummyOperator(task_id='preflight_failure_handler', dag=dag)
-
-    return dag

shipyard_airflow/dags/update_site.py

@@ -0,0 +1,94 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from datetime import timedelta
import airflow
from airflow import DAG
import failure_handlers
from preflight_checks import all_preflight_checks
from validate_site_design import validate_site_design
from airflow.operators.subdag_operator import SubDagOperator
from airflow.operators import ConcurrencyCheckOperator
from airflow.operators import DeckhandOperator
from airflow.operators import PlaceholderOperator
"""
update_site is the top-level orchestration DAG for updating a site using the
Undercloud platform.
"""
PARENT_DAG_NAME = 'update_site'
DAG_CONCURRENCY_CHECK_DAG_NAME = 'dag_concurrency_check'
ALL_PREFLIGHT_CHECKS_DAG_NAME = 'preflight'
DECKHAND_GET_DESIGN_VERSION = 'deckhand_get_design_version'
VALIDATE_SITE_DESIGN_DAG_NAME = 'validate_site_design'
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(1),
    'email': [''],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=1),
}

dag = DAG(PARENT_DAG_NAME, default_args=default_args, schedule_interval=None)
concurrency_check = ConcurrencyCheckOperator(
    task_id=DAG_CONCURRENCY_CHECK_DAG_NAME,
    on_failure_callback=failure_handlers.step_failure_handler,
    dag=dag)

preflight = SubDagOperator(
    subdag=all_preflight_checks(
        PARENT_DAG_NAME, ALL_PREFLIGHT_CHECKS_DAG_NAME, args=default_args),
    task_id=ALL_PREFLIGHT_CHECKS_DAG_NAME,
    on_failure_callback=failure_handlers.step_failure_handler,
    dag=dag, )

get_design_version = DeckhandOperator(
    task_id=DECKHAND_GET_DESIGN_VERSION,
    on_failure_callback=failure_handlers.step_failure_handler,
    dag=dag)

validate_site_design = SubDagOperator(
    subdag=validate_site_design(
        PARENT_DAG_NAME, VALIDATE_SITE_DESIGN_DAG_NAME, args=default_args),
    task_id=VALIDATE_SITE_DESIGN_DAG_NAME,
    on_failure_callback=failure_handlers.step_failure_handler,
    dag=dag)

drydock_build = PlaceholderOperator(
    task_id='drydock_build',
    on_failure_callback=failure_handlers.step_failure_handler,
    dag=dag)

query_node_status = PlaceholderOperator(
    task_id='deployed_node_status',
    on_failure_callback=failure_handlers.step_failure_handler,
    dag=dag)

armada_build = PlaceholderOperator(
    task_id='armada_build',
    on_failure_callback=failure_handlers.step_failure_handler,
    dag=dag)

# DAG Wiring
preflight.set_upstream(concurrency_check)
get_design_version.set_upstream(preflight)
validate_site_design.set_upstream(get_design_version)
drydock_build.set_upstream(validate_site_design)
query_node_status.set_upstream(drydock_build)
armada_build.set_upstream(query_node_status)
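Unlike deploy_site, update_site carries no mark_last_known_good step yet, so the wiring is one linear chain. The same chain expressed with Airflow's bitshift composition, behaviorally identical to the set_upstream calls above (a sketch, not part of the commit):

    concurrency_check >> preflight >> get_design_version >> validate_site_design
    validate_site_design >> drydock_build >> query_node_status >> armada_build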

shipyard_airflow/dags/validate_site_design.py

@@ -13,26 +13,10 @@
 # limitations under the License.
 
 from airflow.models import DAG
-from airflow.operators.dummy_operator import DummyOperator
 from airflow.operators import DeckhandOperator
 from airflow.operators import PlaceholderOperator
 
 
-def validate_site_design_failure_handler(parent_dag_name, child_dag_name,
-                                         args):
-    '''
-    Performs the actions necessary when any of the site design checks fail
-    '''
-    dag = DAG(
-        '{}.{}'.format(parent_dag_name, child_dag_name),
-        default_args=args, )
-
-    operator = DummyOperator(
-        task_id='site_design_validation_failure_handler', dag=dag)
-
-    return dag
-
-
 def validate_site_design(parent_dag_name, child_dag_name, args):
     '''
     Subdag to delegate design verification to the UCP components

shipyard_airflow/plugins/concurrency_check_operator.py

@@ -0,0 +1,135 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.plugins_manager import AirflowPlugin
from airflow.hooks.postgres_hook import PostgresHook
from airflow.exceptions import AirflowException

# constants related to the dag_run table.
DAG_RUN_SELECT_RUNNING_SQL = ("select dag_id, execution_date "
                              "from dag_run "
                              "where state='running'")

# connection name for airflow's own sql db
AIRFLOW_DB = 'airflows_own_db'

# each set in this list of sets indicates DAGs that shouldn't execute together
CONFLICTING_DAG_SETS = [set(['deploy_site', 'update_site', 'redeploy_server'])]


def find_conflicting_dag_set(dag_name, conflicting_dag_sets=None):
    """
    Using dag_name, finds all other dag names that are in any set of
    conflicting dags from input conflicting_dag_sets
    """
    if conflicting_dag_sets is None:
        conflicting_dag_sets = CONFLICTING_DAG_SETS
    full_set = set()
    for single_set in conflicting_dag_sets:
        if dag_name in single_set:
            full_set = full_set | single_set
    full_set.discard(dag_name)
    logging.info('Potential conflicts: %s', ', '.join(full_set))
    return full_set


class ConcurrencyCheckOperator(BaseOperator):
    """
    Provides a way to indicate which DAGs should not be executing
    simultaneously.
    """

    @apply_defaults
    def __init__(self, conflicting_dag_set=None, *args, **kwargs):
        super(ConcurrencyCheckOperator, self).__init__(*args, **kwargs)
        if conflicting_dag_set is not None:
            self.conflicting_dag_set = conflicting_dag_set
        else:
            self.check_dag_id = self.dag.dag_id
            logging.debug('dag_id is %s', self.check_dag_id)
            if '.' in self.dag.dag_id:
                self.check_dag_id = self.dag.dag_id.split('.', 1)[0]
                logging.debug('dag_id modified to %s', self.check_dag_id)
            logging.info('from dag %s, assuming %s for concurrency check',
                         self.dag.dag_id, self.check_dag_id)
            self.conflicting_dag_set = find_conflicting_dag_set(
                self.check_dag_id)

    def execute(self, context):
        """
        Run the check to see if this DAG has any concurrency issues with
        other DAGs. Stop the workflow if there is.
        """
        logging.info('Checking for running of dags: %s',
                     ', '.join(self.conflicting_dag_set))

        conflicting_dag = self.find_conflicting_dag(self.check_dag_id)
        if conflicting_dag is None:
            logging.info('No conflicts found. Continuing Execution')
        else:
            self.abort_conflict(
                dag_name=self.check_dag_id, conflict=conflicting_dag)

    def get_executing_dags(self):
        """
        Encapsulation of getting database records of running dags.
        Returns a list of records of dag_id and execution_date
        """
        logging.info('Executing: %s', DAG_RUN_SELECT_RUNNING_SQL)
        airflow_pg_hook = PostgresHook(postgres_conn_id=AIRFLOW_DB)
        return airflow_pg_hook.get_records(DAG_RUN_SELECT_RUNNING_SQL)

    def find_conflicting_dag(self, dag_id_to_check):
        """
        Checks for a DAG that is conflicting and returns the first
        one found.
        Also will return the dag_id_to_check as conflicting if more than 1
        instance is running
        """
        self_dag_count = 0
        for dag_id, execution_date in self.get_executing_dags():
            logging.info('Checking %s @ %s vs. current %s', dag_id,
                         execution_date, dag_id_to_check)
            if dag_id == dag_id_to_check:
                self_dag_count += 1
                logging.info(
                    "Found an instance of the dag_id being checked. Tally: %s",
                    self_dag_count)
            if dag_id in self.conflicting_dag_set:
                logging.info("Conflict found: %s @ %s", dag_id, execution_date)
                return dag_id
        if self_dag_count > 1:
            return dag_id_to_check
        return None

    def abort_conflict(self, dag_name, conflict):
        """
        Log and raise an exception that there is a conflicting workflow.
        """
        conflict_string = '{} conflicts with running {}. Aborting run'.format(
            dag_name, conflict)
        logging.warning(conflict_string)
        raise AirflowException(conflict_string)


class ConcurrencyCheckPlugin(AirflowPlugin):
    """
    Register this plugin for this operator.
    """
    name = 'concurrency_check_operator_plugin'
    operators = [ConcurrencyCheckOperator]
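Because ConcurrencyCheckPlugin lists the operator in its operators attribute, Airflow's plugin manager re-exports it under airflow.operators at load time; that is what makes `from airflow.operators import ConcurrencyCheckOperator` work in the DAG files above. A usage sketch under that assumption (the DAG here is hypothetical):

    from airflow import DAG
    from airflow.operators import ConcurrencyCheckOperator

    dag = DAG('deploy_site', schedule_interval=None)
    # with no conflicting_dag_set argument, __init__ derives the conflict
    # set for 'deploy_site' from CONFLICTING_DAG_SETS
    guard = ConcurrencyCheckOperator(
        task_id='dag_concurrency_check', dag=dag)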

test-requirements.txt

@@ -1,6 +1,9 @@
 # Testing
 pytest==3.2.1
 mock==2.0.0
+testfixtures==5.1.1
+apache-airflow[crypto,celery,postgres,hive,hdfs,jdbc]==1.8.1
+psycopg2==2.7.3
 
 # Linting
 flake8==3.3.0


test_failure_handlers.py

@@ -0,0 +1,27 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from testfixtures import LogCapture
from types import SimpleNamespace

from shipyard_airflow.dags import failure_handlers

CONTEXT = {'task_instance': SimpleNamespace(task_id='cheese')}


def test_step_failure_handler():
    """
    Ensure that the failure handler is logging as intended.
    """
    with LogCapture() as log_capturer:
        failure_handlers.step_failure_handler(CONTEXT)
        log_capturer.check(('root', 'INFO', 'cheese step failed'))


test_concurrency_check_operator.py

@@ -0,0 +1,132 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pytest

from shipyard_airflow.plugins import concurrency_check_operator as operator
from shipyard_airflow.plugins.concurrency_check_operator import (
    ConcurrencyCheckOperator
)
from airflow.exceptions import AirflowException

COLOR_SETS = [
    set(['blue', 'green']),
    set(['blue', 'purple']),
    set(['red', 'purple']),
    set(['red', 'orange']),
    set(['yellow', 'green']),
    set(['yellow', 'orange']),
]

CONFLICT_SET = set(['cow', 'monkey', 'chicken'])


def test_find_conflicting_dag_set():
    """
    Ensure that the right values are determined by find_conflicting_dag_set
    """
    # Should not be found in the default set - no conflicts
    assert operator.DAG_RUN_SELECT_RUNNING_SQL is not None
    assert not operator.find_conflicting_dag_set("this_is_completely_cheese")

    # Check for contents vs the COLOR_SETS
    not_in_green_response_set = set(['purple', 'red', 'orange'])
    response_set = operator.find_conflicting_dag_set(
        dag_name='green', conflicting_dag_sets=COLOR_SETS)
    assert 'blue' in response_set
    assert 'yellow' in response_set
    assert not_in_green_response_set.isdisjoint(response_set)
    assert len(response_set) == 2


def get_executing_dags_stub_running_twice():
    return [
        ('buffalo', 'now'),
        ('buffalo', 'earlier'),
        ('squirrel', 'ages ago'),
    ]


def get_executing_dags_stub():
    return [
        ('buffalo', 'now'),
        ('chicken', 'earlier'),
        ('monkey', 'ages ago'),
    ]


def get_executing_dags_stub_no_conflicts():
    return [
        ('buffalo', 'now'),
        ('hedgehog', 'earlier'),
        ('panda', 'ages ago'),
    ]


def test_find_conflicting_dag():
    """
    Ensure that find_conflicting_dag:
    1) responds with a found conflict
    2) responds None if there is no conflict
    3) responds with the dag_id_to_check being searched for if it is running
       more than once.
    """
    cco = ConcurrencyCheckOperator(
        conflicting_dag_set=CONFLICT_SET,
        task_id='bogus')

    # no conflicts
    cco.get_executing_dags = get_executing_dags_stub_no_conflicts
    assert cco.find_conflicting_dag('buffalo') is None

    # self is running twice
    cco.get_executing_dags = get_executing_dags_stub_running_twice
    assert cco.find_conflicting_dag('buffalo') != 'squirrel'
    assert cco.find_conflicting_dag('buffalo') == 'buffalo'

    # a conflict from the list
    cco.get_executing_dags = get_executing_dags_stub
    assert cco.find_conflicting_dag('buffalo') != 'monkey'
    assert cco.find_conflicting_dag('buffalo') == 'chicken'


def test_execute_exception():
    """
    Run the whole execute function for testing
    """
    cco = ConcurrencyCheckOperator(
        conflicting_dag_set=CONFLICT_SET,
        task_id='bogus')

    # dag_id of cow should cause chicken, the first conflicting entry
    # returned by the stub, to conflict.
    cco.check_dag_id = 'cow'
    cco.get_executing_dags = get_executing_dags_stub
    try:
        cco.execute(None)
        pytest.fail('AirflowException should have been raised')
    except AirflowException as airflow_exception:
        assert 'Aborting run' in airflow_exception.args[0]


def test_execute_success():
    """
    Run the whole execute function for testing - successfully!
    """
    cco = ConcurrencyCheckOperator(
        conflicting_dag_set=set(['car', 'truck']),
        task_id='bogus')

    # dag_id of airplane should have no conflicts
    cco.check_dag_id = 'airplane'
    cco.get_executing_dags = get_executing_dags_stub
    try:
        cco.execute(None)
        assert True
    except AirflowException:
        pytest.fail('AirflowException should not have been raised')
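The try/except blocks above work, but the pinned pytest (3.2.1) also supports pytest.raises with a match argument, which states the same intent more compactly (an editor's sketch, not part of the commit):

    def test_execute_exception_with_raises():
        cco = ConcurrencyCheckOperator(
            conflicting_dag_set=CONFLICT_SET, task_id='bogus')
        cco.check_dag_id = 'cow'
        cco.get_executing_dags = get_executing_dags_stub
        with pytest.raises(AirflowException, match='Aborting run'):
            cco.execute(None)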