From b7934502d6c2e5dc2f4aefcc92630ab8b867fcf1 Mon Sep 17 00:00:00 2001 From: Anthony Lin Date: Wed, 2 May 2018 07:41:37 +0000 Subject: [PATCH] Clean up Operators that are not in use The airflow_task_state Operator was created a while back before the major revamp/refactoring of Shipyard/Airflow API/CLI and other custom Operators. It is not used in any of the workflow and will be removed. The deckhand_get_design Operator is no longer needed after a new logic to retrieve the last committed revision was introduced [0]. This task will now be performed during the initial stage of workflow execution by Shipyard. Hence we will remove this Operator as well. [0] https://review.gerrithub.io/#/c/att-comdev/shipyard/+/408457/ Change-Id: I6c0311d583b69056839c9e33e738f05f54a8288f --- .../plugins/airflow_task_state_operators.py | 92 ------------------- .../plugins/deckhand_get_design.py | 88 ------------------ 2 files changed, 180 deletions(-) delete mode 100644 src/bin/shipyard_airflow/shipyard_airflow/plugins/airflow_task_state_operators.py delete mode 100644 src/bin/shipyard_airflow/shipyard_airflow/plugins/deckhand_get_design.py diff --git a/src/bin/shipyard_airflow/shipyard_airflow/plugins/airflow_task_state_operators.py b/src/bin/shipyard_airflow/shipyard_airflow/plugins/airflow_task_state_operators.py deleted file mode 100644 index e8a527d1..00000000 --- a/src/bin/shipyard_airflow/shipyard_airflow/plugins/airflow_task_state_operators.py +++ /dev/null @@ -1,92 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import logging -# Using nosec to prevent Bandit blacklist reporting. Subprocess is used -# in a controlled way as part of this operator. -import subprocess # nosec - -from airflow.exceptions import AirflowException -from airflow.models import BaseOperator -from airflow.plugins_manager import AirflowPlugin -from airflow.utils.decorators import apply_defaults - - -class TaskStateOperator(BaseOperator): - """ - Retrieve Task State - :airflow_dag_id: Dag ID - :airflow_task_id: Task ID - :airflow_execution_date: Task Execution Date - """ - - @apply_defaults - def __init__(self, - airflow_command=None, - airflow_dag_id=None, - airflow_task_id=None, - airflow_execution_date=None, - *args, - **kwargs): - - super(TaskStateOperator, self).__init__(*args, **kwargs) - self.airflow_dag_id = airflow_dag_id - self.airflow_task_id = airflow_task_id - self.airflow_execution_date = airflow_execution_date - self.airflow_command = [ - 'airflow', 'task_state', airflow_dag_id, airflow_task_id, - airflow_execution_date - ] - - def execute(self, context): - logging.info("Running Airflow Command: %s", self.airflow_command) - - # Execute Airflow CLI Command - airflow_cli = subprocess.Popen( # nosec - self.airflow_command, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT) - - # Logs Output - # Filter out logging messages from standard output - # and keep only the relevant information - line = '' - for line in iter(airflow_cli.stdout.readline, b''): - line = line.strip() - - if line.startswith('['): - pass - else: - logging.info(line) - - # Wait for child process to terminate. - # Set and return returncode attribute. - airflow_cli.wait() - - # Raise Execptions if Task State Command Fails - # Return XCOM State - task_instance = context['task_instance'] - - if airflow_cli.returncode: - task_state = 'failed' - task_instance.xcom_push('task_state', task_state) - raise AirflowException("Failed to Retrieve Task State") - else: - task_state = line - task_instance.xcom_push('task_state', task_state) - - -class TaskStatePlugin(AirflowPlugin): - name = "task_state_plugin" - operators = [TaskStateOperator] diff --git a/src/bin/shipyard_airflow/shipyard_airflow/plugins/deckhand_get_design.py b/src/bin/shipyard_airflow/shipyard_airflow/plugins/deckhand_get_design.py deleted file mode 100644 index b7800cc8..00000000 --- a/src/bin/shipyard_airflow/shipyard_airflow/plugins/deckhand_get_design.py +++ /dev/null @@ -1,88 +0,0 @@ -# Copyright 2018 AT&T Intellectual Property. All other rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import logging -import os -import requests -import yaml - -from airflow.plugins_manager import AirflowPlugin -from airflow.exceptions import AirflowException - -from deckhand_base_operator import DeckhandBaseOperator - -LOG = logging.getLogger(__name__) - - -class DeckhandGetDesignOperator(DeckhandBaseOperator): - - """Deckhand Get Design Operator - - This operator will trigger deckhand to retrieve the last - committed revision and save it in airflow as xcom - - """ - - def do_execute(self): - - # Retrieve Keystone Token and assign to X-Auth-Token Header - x_auth_token = {"X-Auth-Token": self.svc_token} - - # Form Revision Endpoint - revision_endpoint = os.path.join(self.deckhand_svc_endpoint, - 'revisions') - - # Retrieve Revision - LOG.info("Retrieving revisions information...") - - try: - query_params = {'tag': 'committed', 'sort': 'id', 'order': 'desc'} - revisions = yaml.safe_load(requests.get( - revision_endpoint, - headers=x_auth_token, - params=query_params, - timeout=self.deckhand_client_read_timeout).text) - - except requests.exceptions.RequestException as e: - # Dump logs from Deckhand pods - self.get_k8s_logs() - - raise AirflowException(e) - - # Print info about revisions from DeckHand - LOG.info("Revisions response: %s", revisions) - LOG.info("The number of committed revisions is %s", - revisions['count']) - - # Search for the latest committed version and save it as xcom. - # Since the order : desc paramater above, this is index 0 if there - # are any results - revision_list = revisions.get('results', []) - if revision_list: - self.committed_ver = revision_list[0].get('id') - LOG.info("Latest committed revision is %d", self.committed_ver) - - # Error if we cannot resolve the committed version to use. - if not self.committed_ver: - # Dump logs from Deckhand pods - self.get_k8s_logs() - - raise AirflowException("No committed revision found in Deckhand!") - - -class DeckhandGetDesignOperatorPlugin(AirflowPlugin): - - """Creates DeckhandGetDesignOperator in Airflow.""" - - name = 'deckhand_get_design_operator' - operators = [DeckhandGetDesignOperator]