diff --git a/src/bin/shipyard_airflow/shipyard_airflow/control/helpers/action_helper.py b/src/bin/shipyard_airflow/shipyard_airflow/control/helpers/action_helper.py index e9f0c637..41c40ca0 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/control/helpers/action_helper.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/control/helpers/action_helper.py @@ -17,7 +17,11 @@ import falcon import logging from shipyard_airflow.common.notes.notes import MIN_VERBOSITY +from shipyard_airflow.control.helpers.design_reference_helper import ( + DesignRefHelper +) from shipyard_airflow.control.helpers.notes import NOTES as notes_helper +from shipyard_airflow.dags.dag_names import CRITICAL_DAG_STEPS from shipyard_airflow.db.db import AIRFLOW_DB, SHIPYARD_DB from shipyard_airflow.errors import ApiError @@ -90,6 +94,73 @@ def format_step(action_id, step, index, notes): } +def get_deployment_status(action, force_completed=False): + """Given a set of action data, make/return a set of deployment status data + + :param dict action: A dictionary of data about an action. + The keys of the dict should include: + - committed_rev_id + - context_marker + - dag_status + - id + - timestamp + - user + Any missing key will result in a piece of the + deployment status being "unknown" + :param bool force_completed: optional. If True the status will be forced + to "completed". This will be useful when the + last step of the DAG wants to get the status + of the deployment, and knows that the + DAG/deployment/action really is completed even + though it does not appear to be. + :returns: A dict of deployment status data + :rtype: dict + """ + # Create a URL to get the documents in Deckhand, using the Deckhand + # revision ID associated with the action + document_url = DesignRefHelper().get_design_reference_href( + action.get('committed_rev_id', 'unknown')) + + # Create the "status" and "result" of the deployment + dag_status = action.get('dag_status', '').upper() + status = 'running' + result = 'unknown' + status_result_map = { + 'FAILED': 'failed', + 'UPSTREAM_FAILED': 'failed', + 'SKIPPED': 'failed', + 'REMOVED': 'failed', + 'SHUTDOWN': 'failed', + 'SUCCESS': 'successful' + } + if dag_status in status_result_map: + # We don't need to check anything else, we know the status of the + # deployment is completed and we can map directly to a result + status = 'completed' + result = status_result_map[dag_status] + else: + # The DAG could still be running, or be in some other state, so we + # need to dig into the DAG to determine what the result really is + # Use an ActionsHelper to check all the DAG steps + helper = ActionsHelper(action.get('id')) + result = helper.get_result_from_dag_steps() + + # Check to see if we need to override the status to "completed" + if force_completed: + status = "completed" + + # Return the dictionary of data + return { + 'status': status, + 'results': result, + 'context-marker': action.get('context_marker', 'unknown'), + 'action': action.get('id', 'unknown'), + 'document_url': document_url, + 'user': action.get('user', 'unknown'), + 'date': action.get('timestamp', 'unknown') + } + + class ActionsHelper(object): """ A helper class for Shipyard actions @@ -164,6 +235,78 @@ class ActionsHelper(object): return steps + def _get_latest_steps(self): + """Get all the steps for the action, and return the latest try on each + + :returns: All dictionary of task_id (a string name) to step details + :rtype: dict + """ + latest_steps = {} + for step in self._get_all_steps(): + task_id = step['task_id'] + if task_id in latest_steps: + # We already have this step, see if this one is more recent + # than the one we already have + if step['try_number'] > latest_steps[task_id]['try_number']: + latest_steps[task_id] = step + else: + # Step we have not seen yet + latest_steps[task_id] = step + + return latest_steps + + def get_result_from_dag_steps(self): + """Look up the DAG steps, and create a result string based on the state + of the DAG steps + + We will only check "critical" steps, as these are what's most + important, and won't even run if other steps fail in the first place + If any critical steps have a state that falls under our success states + and no other critical steps have a state that falls under the failed or + running states, result will be "successful" + If any critical steps have a state that fall under our failed states, + result will be "failed" + If any critical steps have a state that fall under our running states, + result will be "pending" + If no critical steps have states that fall under any of the state sets, + result will be "unknown" + + :returns: The result of the DAG based on the DAG's steps + :rtype: str + """ + result = 'unknown' + running_states = [ + None, + 'None', + 'scheduled', + 'queued', + 'running', + 'up_for_retry', + 'up_for_reschedule' + ] + failed_states = ['shutdown', 'failed', 'upstream_failed'] + success_states = ['skipped', 'success'] + latest_steps = self._get_latest_steps() + for step_id in CRITICAL_DAG_STEPS: + if step_id in latest_steps: + state = latest_steps[step_id]['state'] + if state in success_states and result == 'unknown': + result = 'successful' + elif state in failed_states: + result = 'failed' + break + elif state in running_states: + result = 'pending' + elif state not in success_states: + # The state we are looking at doesn't fall under any of our + # known states + LOG.warning('Found DAG step with unexpected state: {}'. + format(state)) + result = 'unknown' + break + + return result + def get_step(self, step_id, try_number=None): """ :param step_id: Step ID - task_id in db diff --git a/src/bin/shipyard_airflow/shipyard_airflow/dags/dag_names.py b/src/bin/shipyard_airflow/shipyard_airflow/dags/dag_names.py index 3fba537f..ac8ca05f 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/dags/dag_names.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/dags/dag_names.py @@ -31,3 +31,12 @@ GET_RENDERED_DOC = 'get_rendered_doc' SKIP_UPGRADE_AIRFLOW = 'skip_upgrade_airflow' UPGRADE_AIRFLOW = 'upgrade_airflow' DESTROY_SERVER = 'destroy_nodes' + +# Define a list of critical steps, used to determine successfulness of a +# still-running DAG +CRITICAL_DAG_STEPS = [ + ARMADA_BUILD_DAG_NAME, + SKIP_UPGRADE_AIRFLOW, + UPGRADE_AIRFLOW, + ARMADA_TEST_RELEASES +] diff --git a/src/bin/shipyard_airflow/tests/unit/control/test_action_helper.py b/src/bin/shipyard_airflow/tests/unit/control/test_action_helper.py index aa4ee080..b9cea94a 100644 --- a/src/bin/shipyard_airflow/tests/unit/control/test_action_helper.py +++ b/src/bin/shipyard_airflow/tests/unit/control/test_action_helper.py @@ -13,9 +13,142 @@ # limitations under the License. """ Tests for the action_helper.py module """ -from shipyard_airflow.control.helpers import action_helper +from unittest.mock import patch import yaml +from shipyard_airflow.control.helpers import action_helper +from shipyard_airflow.control.helpers.design_reference_helper import ( + DesignRefHelper +) + + +def get_repeated_steps(): + """Returns a list of fake step dictionaries with repeated steps (tries) + + For use in testing getting the latest of a step + Currently, for tests that use this, the only thing that matters is the + task ID and the try number. If this function gets used by more/future tests + more data may need to be added + + task_A tries: 1, 2, 3 + task_B tries: 1 + task_C tries: 1, 2 + task_D tries: 1 + task_E tries: 1 + + :returns: A list of fake (and incomplete) step dictionaries, some of which + are repeated across multiple tries + :rtype: list + """ + return [ + { + 'task_id': 'task_A', + 'try_number': 1 + }, + { + 'task_id': 'task_A', + 'try_number': 2 + }, + { + 'task_id': 'task_A', + 'try_number': 3 + }, + { + 'task_id': 'task_B', + 'try_number': 1 + }, + { + 'task_id': 'task_C', + 'try_number': 2 + }, + { + 'task_id': 'task_C', + 'try_number': 1 + }, + { + 'task_id': 'task_D', + 'try_number': 1 + }, + { + 'task_id': 'task_E', + 'try_number': 1 + } + ] + + +def get_fake_latest_step_dict_failed(): + """Make a fake dictionary of "latest" steps that represent a failed dag + + The only key required by the tests calling this function is "state", so + the steps contained in the returned dict are incomplete + + :returns: A dictionary of "latest" steps that represent a failed dag + :rtype: dict + """ + return { + 'armada_build': {'state': 'failed'}, + 'arbitrary_step': {'state': 'success'}, + 'another_arbitrary_step': {'state': 'running'}, + 'upgrade_airflow': {'state': 'success'}, + 'concurrency_check': {'state': 'success'} + } + + +def get_fake_latest_step_dict_running(): + """Make a fake dictionary of "latest" steps that represent a running dag + + The only key required by the tests calling this function is "state", so + the steps contained in the returned dict are incomplete + + :returns: A dictionary of "latest" steps that represent a running dag + :rtype: dict + """ + return { + 'armada_build': {'state': 'queued'}, + 'arbitrary_step': {'state': 'success'}, + 'another_arbitrary_step': {'state': 'running'}, + 'upgrade_airflow': {'state': 'running'}, + 'concurrency_check': {'state': 'success'} + } + + +def get_fake_latest_step_dict_successful(): + """Make a fake dictionary of "latest" steps that represent a successful dag + + The only key required by the tests calling this function is "state", so + the steps contained in the returned dict are incomplete + + :returns: A dictionary of "latest" steps that represent a successful dag + :rtype: dict + """ + return { + 'armada_build': {'state': 'success'}, + 'arbitrary_step': {'state': 'success'}, + 'another_arbitrary_step': {'state': 'success'}, + 'upgrade_airflow': {'state': 'skipped'}, + 'concurrency_check': {'state': 'success'} + } + + +def get_fake_latest_step_dict_unknown(): + """Make a fake dictionary of "latest" steps that represent a dag of unknown + result + + The only key required by the tests calling this function is "state", so + the steps contained in the returned dict are incomplete + + :returns: A dictionary of "latest" steps that represent a dag of unknown + result + :rtype: dict + """ + return { + 'armada_build': {'state': 'success'}, + 'arbitrary_step': {'state': 'what'}, + 'another_arbitrary_step': {'state': 'are'}, + 'upgrade_airflow': {'state': 'these'}, + 'concurrency_check': {'state': 'states?'} + } + def test_determine_lifecycle(): dag_statuses = [ @@ -168,3 +301,225 @@ def test_get_step(): try_number = 2 step = actions_helper.get_step(step_id, try_number) assert(step['hostname'].startswith('airflow-worker-1')) + + +@patch('shipyard_airflow.control.helpers.deckhand_client.DeckhandClient.' + 'get_path') +@patch('shipyard_airflow.control.helpers.design_reference_helper.' + 'DesignRefHelper.get_design_reference_href', return_value='href') +def test_get_deployment_status_no_action_helper_completed_failed(get_href, + get_path): + action = { + 'committed_rev_id': 'rev_id', + 'context_marker': 'markofcontext', + 'dag_status': 'FAILED', + 'id': 'action_id', + 'timestamp': 'my_timestamp', + 'user': 'cool-person' + } + expected_data = { + 'status': 'completed', + 'results': 'failed', + 'context-marker': action['context_marker'], + 'action': action['id'], + 'document_url': 'href', + 'user': action['user'], + 'date': action['timestamp'] + } + + deployment_status = action_helper.get_deployment_status(action) + assert deployment_status['status'] == expected_data['status'] + assert deployment_status['results'] == expected_data['results'] + assert (deployment_status['context-marker'] == + expected_data['context-marker']) + assert deployment_status['action'] == expected_data['action'] + assert deployment_status['document_url'] == expected_data['document_url'] + assert deployment_status['user'] == expected_data['user'] + assert deployment_status['date'] == expected_data['date'] + get_href.assert_called_once_with(action['committed_rev_id']) + assert get_path.called # This means we created a DesignRefHelper object + + +@patch('shipyard_airflow.control.helpers.deckhand_client.DeckhandClient.' + 'get_path') +@patch('shipyard_airflow.control.helpers.design_reference_helper.' + 'DesignRefHelper.get_design_reference_href', return_value='href') +def test_get_deployment_status_no_action_helper_completed_success(get_href, + get_path): + action = { + 'committed_rev_id': 'rev_id', + 'context_marker': 'markofcontext', + 'dag_status': 'SUCCESS', + 'id': 'action_id', + 'timestamp': 'my_timestamp', + 'user': 'cool-person' + } + expected_data = { + 'status': 'completed', + 'results': 'successful', + 'context-marker': action['context_marker'], + 'action': action['id'], + 'document_url': 'href', + 'user': action['user'], + 'date': action['timestamp'] + } + + deployment_status = action_helper.get_deployment_status(action) + assert deployment_status['status'] == expected_data['status'] + assert deployment_status['results'] == expected_data['results'] + assert (deployment_status['context-marker'] == + expected_data['context-marker']) + assert deployment_status['action'] == expected_data['action'] + assert deployment_status['document_url'] == expected_data['document_url'] + assert deployment_status['user'] == expected_data['user'] + assert deployment_status['date'] == expected_data['date'] + get_href.assert_called_once_with(action['committed_rev_id']) + assert get_path.called # This means we created a DesignRefHelper object + + +@patch.object(action_helper.ActionsHelper, + 'get_result_from_dag_steps', + return_value='result') +@patch('shipyard_airflow.control.helpers.deckhand_client.DeckhandClient.' + 'get_path') +@patch('shipyard_airflow.control.helpers.design_reference_helper.' + 'DesignRefHelper.get_design_reference_href', return_value='href') +def test_get_deployment_status_use_action_helper(get_href, + get_path, + get_result): + action = { + 'committed_rev_id': 'rev_id', + 'context_marker': 'markofcontext', + 'dag_status': 'ASDFJKL:', + 'id': 'action_id', + 'timestamp': 'my_timestamp', + 'user': 'cool-person' + } + expected_data = { + 'status': 'running', + 'results': 'result', + 'context-marker': action['context_marker'], + 'action': action['id'], + 'document_url': 'href', + 'user': action['user'], + 'date': action['timestamp'] + } + + deployment_status = action_helper.get_deployment_status(action) + assert deployment_status['status'] == expected_data['status'] + assert deployment_status['results'] == expected_data['results'] + assert (deployment_status['context-marker'] == + expected_data['context-marker']) + assert deployment_status['action'] == expected_data['action'] + assert deployment_status['document_url'] == expected_data['document_url'] + assert deployment_status['user'] == expected_data['user'] + assert deployment_status['date'] == expected_data['date'] + get_href.assert_called_once_with(action['committed_rev_id']) + assert get_result.called + assert get_path.called # This means we created a DesignRefHelper object + + +@patch.object(action_helper.ActionsHelper, + 'get_result_from_dag_steps', + return_value='result') +@patch('shipyard_airflow.control.helpers.deckhand_client.DeckhandClient.' + 'get_path') +@patch('shipyard_airflow.control.helpers.design_reference_helper.' + 'DesignRefHelper.get_design_reference_href', return_value='href') +def test_get_deployment_status_use_action_helper_force_completed(get_href, + get_path, + get_result): + action = { + 'committed_rev_id': 'rev_id', + 'context_marker': 'markofcontext', + 'dag_status': 'ASDFJKL:', + 'id': 'action_id', + 'timestamp': 'my_timestamp', + 'user': 'cool-person' + } + expected_data = { + 'status': 'completed', + 'results': 'result', + 'context-marker': action['context_marker'], + 'action': action['id'], + 'document_url': 'href', + 'user': action['user'], + 'date': action['timestamp'] + } + + deployment_status = action_helper.get_deployment_status(action, True) + assert deployment_status['status'] == expected_data['status'] + assert deployment_status['results'] == expected_data['results'] + assert (deployment_status['context-marker'] == + expected_data['context-marker']) + assert deployment_status['action'] == expected_data['action'] + assert deployment_status['document_url'] == expected_data['document_url'] + assert deployment_status['user'] == expected_data['user'] + assert deployment_status['date'] == expected_data['date'] + get_href.assert_called_once_with(action['committed_rev_id']) + assert get_result.called + assert get_path.called # This means we created a DesignRefHelper object + + +@patch.object(action_helper.ActionsHelper, '_get_action_info') +@patch.object(action_helper.ActionsHelper, '_get_all_steps', + return_value=get_repeated_steps()) +def test__get_latest_steps(get_all_steps, get_action_info): + helper = action_helper.ActionsHelper(action_id='irrelevant') + latest_steps_dict = helper._get_latest_steps() + assert latest_steps_dict['task_A']['try_number'] == 3 + assert latest_steps_dict['task_B']['try_number'] == 1 + assert latest_steps_dict['task_C']['try_number'] == 2 + assert latest_steps_dict['task_D']['try_number'] == 1 + assert latest_steps_dict['task_E']['try_number'] == 1 + assert get_all_steps.called + assert get_action_info.called + + +@patch.object(action_helper.ActionsHelper, '_get_action_info') +@patch.object(action_helper.ActionsHelper, '_get_latest_steps', + return_value=get_fake_latest_step_dict_successful()) +def test_get_result_from_dag_steps_success(get_latest_steps, get_action_info): + helper = action_helper.ActionsHelper(action_id='irrelevant') + result = helper.get_result_from_dag_steps() + assert result == 'successful' + assert get_latest_steps.called + assert get_action_info.called + + +@patch.object(action_helper.ActionsHelper, '_get_action_info') +@patch.object(action_helper.ActionsHelper, '_get_latest_steps', + return_value=get_fake_latest_step_dict_failed()) +def test_get_result_from_dag_steps_failed(get_latest_steps, get_action_info): + helper = action_helper.ActionsHelper(action_id='irrelevant') + result = helper.get_result_from_dag_steps() + assert result == 'failed' + assert get_latest_steps.called + assert get_action_info.called + + +@patch.object(action_helper.ActionsHelper, '_get_action_info') +@patch.object(action_helper.ActionsHelper, '_get_latest_steps', + return_value=get_fake_latest_step_dict_running()) +def test_get_result_from_dag_steps_running(get_latest_steps, get_action_info): + helper = action_helper.ActionsHelper(action_id='irrelevant') + result = helper.get_result_from_dag_steps() + assert result == 'pending' + assert get_latest_steps.called + assert get_action_info.called + + +@patch('logging.Logger.warning') +@patch.object(action_helper.ActionsHelper, '_get_action_info') +@patch.object(action_helper.ActionsHelper, '_get_latest_steps', + return_value=get_fake_latest_step_dict_unknown()) +def test_get_result_from_dag_steps_unknown(get_latest_steps, + get_action_info, + log_warning_patch): + helper = action_helper.ActionsHelper(action_id='irrelevant') + result = helper.get_result_from_dag_steps() + assert result == 'unknown' + assert get_latest_steps.called + assert get_action_info.called + # Each critical step that had an unknown state should log a warning: + assert log_warning_patch.call_count == 1