4164518502
This patch tries to cover some edge cases that could happen during Shipyard Airflow operator execution. At the moment, all operators interact with other services, e.g. k8s pods. In case of an exception during execution of the operator, logs will be fetched from the appropriate pod, and if the operator has a "fetch_failure_details" method (see DrydockBaseOperator), it will be called as well. What exceptions could happen during an operator execution? Besides those explicitly defined in code, like DrydockClientUseFailureException, other exceptions, e.g. KeyError or similar, may be raised. It's not clear whether the culprit is the client side (Shipyard) or the server side (Drydock, Armada, Deckhand, Promenade). So this patch takes a defensive approach: it gets logs from pods and gets additional details for any exceptional situation. To do that, the do_execute method is wrapped with try..except in UcpBaseOperator.execute. While fetching logs from a pod and fetching failure details, it logs appropriately by itself, and finally re-raises the original exception. Change-Id: If1501e9a24b05edb6eb32c7b1b2d27f24f3ee063
158 lines
5.7 KiB
Python
158 lines
5.7 KiB
Python
# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
from datetime import datetime
|
|
import logging
|
|
# Using nosec to prevent Bandit blacklist reporting. Subprocess is used
|
|
# in a controlled way as part of this operator.
|
|
import subprocess # nosec
|
|
|
|
from airflow.plugins_manager import AirflowPlugin
|
|
from airflow.exceptions import AirflowException
|
|
|
|
try:
|
|
from deckhand_base_operator import DeckhandBaseOperator
|
|
except ImportError:
|
|
from shipyard_airflow.plugins.deckhand_base_operator import \
|
|
DeckhandBaseOperator
|
|
|
|
# Module-level logger for this operator plugin.
LOG = logging.getLogger(__name__)

# Task states that cause check_workflow_result() to report the workflow
# as failed.
FAILED_STATUSES = ('failed', 'upstream_failed')
|
|
|
|
|
|
class DeckhandCreateSiteActionTagOperator(DeckhandBaseOperator):

    """Deckhand Create Site Action Tag Operator

    This operator will trigger Deckhand to create a tag for the revision at
    the end of the workflow. The tag will either be 'site-action-success'
    or 'site-action-failure' (dependent upon the result of the workflow).

    """

    def do_execute(self):
        """Log the workflow duration and tag the revision with its outcome.

        Determines the final state of ``self.main_dag_name`` via
        :meth:`check_workflow_result` and asks Deckhand to tag
        ``self.revision_id`` with 'site-action-success' or
        'site-action-failure' accordingly.

        :raises AirflowException: if a task state cannot be retrieved or the
            Deckhand tag cannot be created.
        """
        # Calculate total elapsed time for workflow. total_seconds() is used
        # because timedelta.seconds excludes whole days, which would
        # under-report any workflow that ran for more than 24 hours.
        time_delta = datetime.now() - self.task_instance.execution_date
        elapsed = int(time_delta.total_seconds())

        hours, remainder = divmod(elapsed, 3600)
        minutes, seconds = divmod(remainder, 60)

        LOG.info('The workflow took %d hr %d mins %d seconds to'
                 ' execute', hours, minutes, seconds)

        LOG.info("Retrieving final state of %s...", self.main_dag_name)

        workflow_result = self.check_workflow_result()

        LOG.info("Creating Site Action Tag for Revision %d", self.revision_id)

        if workflow_result:
            tag = 'site-action-success'
        else:
            tag = 'site-action-failure'

        # Create site action tag. Catch Exception rather than using a bare
        # except so KeyboardInterrupt/SystemExit still propagate, and chain
        # the original error so its details are preserved in the traceback.
        try:
            self.deckhandclient.tags.create(revision_id=self.revision_id,
                                            tag=tag)
        except Exception as err:
            raise AirflowException("Failed to create revision tag!") from err

        LOG.info("Site Action Tag created for Revision %d",
                 self.revision_id)

    def check_task_result(self, task_id):
        """Return the state of ``task_id`` in the current main DAG run.

        Runs the ``airflow task_state`` CLI for ``self.main_dag_name`` at
        this task instance's execution date and returns the last non-empty
        line of its standard output.

        :param task_id: ID of the task whose state is retrieved.
        :returns: The task state string printed by the CLI, stripped of
            surrounding whitespace.
        :raises AirflowException: if the CLI exits with a non-zero status
            (the message is the CLI's stderr) or prints no output.
        """
        # Convert Execution Date from datetime format to string
        fmt = '%Y-%m-%dT%H:%M:%S'
        execution_date = self.task_instance.execution_date.strftime(fmt)

        # Retrieve result of task execution
        #
        # Using nosec because:
        # 1) this subprocess runs within the same container
        #    that runs this code
        # 2) has no input that is sourced from an external user
        # 3) Is not supported via any API that is also accessible to this
        #    container.
        response = subprocess.run(  # nosec
            ['airflow',
             'task_state',
             self.main_dag_name,
             task_id,
             execution_date],
            stdout=subprocess.PIPE, stderr=subprocess.PIPE)

        if response.returncode != 0:
            LOG.error("Encountered error while executing Airflow CLI!")
            # errors='replace' so a non-UTF-8 byte cannot mask the real
            # CLI error with a UnicodeDecodeError.
            raise AirflowException(
                response.stderr.decode('utf-8', errors='replace'))

        # The result of the task state will be the last (non-empty) line of
        # the output; any earlier lines are ignored. The task should either
        # be in 'success' or 'failed' state as all relevant tasks would have
        # completed execution at this point in time.
        stdout = response.stdout.decode('utf-8', errors='replace')
        lines = [line.strip() for line in stdout.splitlines() if line.strip()]

        if not lines:
            raise AirflowException(
                "Airflow CLI returned no state for task {}".format(task_id))

        result = lines[-1]

        LOG.info("Task %s is in %s state", task_id, result)

        return result

    def check_workflow_result(self):
        """Return whether the main DAG's tracked tasks all avoided failure.

        The 'armada_build' task is always checked. For the 'update_site' and
        'update_software' workflows the 'skip_upgrade_airflow' and
        'upgrade_airflow' tasks are checked as well.

        :returns: False if any tracked task is in one of FAILED_STATUSES,
            True otherwise.
        :raises AirflowException: propagated from :meth:`check_task_result`.
        """
        # NOTE: We will check the final state of the 'armada_build' task
        # as a 'success' means that all tasks preceding it would either
        # be in 'skipped' or 'success' state. A failure of 'armada_build'
        # would mean that the workflow has failed. Hence it is sufficient
        # to determine the success/failure of the 'deploy_site' workflow
        # with the final state of the 'armada_build' task.
        tasks = ['armada_build']

        if self.main_dag_name in ('update_site', 'update_software'):
            # NOTE: The 'update_site' and 'update_software' workflows contain
            # additional steps for upgrading of worker pods.
            tasks.extend(['skip_upgrade_airflow', 'upgrade_airflow'])

        # Retrieve task result (queried in the order listed above)
        task_result = {task_id: self.check_task_result(task_id)
                       for task_id in tasks}

        # Check for failed task(s)
        failed_tasks = [task_id for task_id in tasks
                        if task_result[task_id] in FAILED_STATUSES]

        if failed_tasks:
            LOG.info("Either upstream tasks or tasks in the "
                     "workflow have failed: %s",
                     ", ".join(failed_tasks))

            return False

        LOG.info("All tasks completed successfully")

        return True
|
|
|
|
|
|
class DeckhandCreateSiteActionTagOperatorPlugin(AirflowPlugin):
    """Airflow plugin that registers DeckhandCreateSiteActionTagOperator."""

    # Operators contributed by this plugin.
    operators = [DeckhandCreateSiteActionTagOperator]

    # Identifier of this plugin.
    name = 'deckhand_create_site_action_tag_operator'
|