shipyard/shipyard_airflow/plugins/xcom_puller.py
Anthony Lin f8fb44b7a0 Change Name of deckhand_get_design_version Subdag
We will need to provide distinctive/unique step_id/step_name in
order to make use of the Shipyard logs API/CLI. It is found during
testing that 'deckhand_get_design_version' is used as both the name
of the subdag as well as 'task_id' [0]. As such, we will only be able
to retrieve the logs of the subdag when we run a query with step_id as
'deckhand_get_design_version'.

This is an issue and hence we will change the name of the subdag so that
it is different from the task_id. In this way we will be retrieve the logs
for the subdag as well as the task.

[0] Current output of describe step (note the name of the steps in failed state):

Steps                                                                 Index        State
step/01CAYXMTK1ECXBK0SF4MQBYEND/action_xcom                           1            success
step/01CAYXMTK1ECXBK0SF4MQBYEND/dag_concurrency_check                 2            success
step/01CAYXMTK1ECXBK0SF4MQBYEND/preflight                             3            success
step/01CAYXMTK1ECXBK0SF4MQBYEND/deckhand_get_design_version           4            failed
step/01CAYXMTK1ECXBK0SF4MQBYEND/dag_deployment_configuration          5            upstream_failed
step/01CAYXMTK1ECXBK0SF4MQBYEND/validate_site_design                  6            upstream_failed
step/01CAYXMTK1ECXBK0SF4MQBYEND/deckhand_get_design_version           7            failed
step/01CAYXMTK1ECXBK0SF4MQBYEND/drydock_build                         8            upstream_failed
step/01CAYXMTK1ECXBK0SF4MQBYEND/ucp_preflight_check                   9            success
step/01CAYXMTK1ECXBK0SF4MQBYEND/k8s_preflight_check                   10           success
step/01CAYXMTK1ECXBK0SF4MQBYEND/shipyard_retrieve_rendered_doc        11           upstream_failed
step/01CAYXMTK1ECXBK0SF4MQBYEND/armada_build                          12           upstream_failed

Change-Id: I191cb8509c8d3d8e63f539b25c1693e9b8794aac
2018-04-13 10:26:32 -04:00

85 lines
3.2 KiB
Python

# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
LOG = logging.getLogger(__name__)
class XcomPuller(object):
"""XcomPuller provides a common source to get reused xcom values
One XcomPuller should be created per task.
Note: xcom values are found by using the current task instance
and finding the <dag_name>.<task_name> that the xcom was added
to the workflow.
The point of this class is to keep all this very configurable
naming in one place as much as possible so that changes to
the dag names and step names have less places to update.
"""
def __init__(self, main_dag_name, task_instance):
self.mdn = main_dag_name
self.ti = task_instance
def _get_xcom(self, source_task, dag_id=None, key=None, log_result=True):
"""Find a particular xcom value"""
if dag_id is None:
source_dag = self.mdn
else:
source_dag = "{}.{}".format(self.mdn, dag_id)
LOG.info("Retrieving xcom from %s.%s with key %s",
source_dag,
source_task,
key)
xcom_val = self.ti.xcom_pull(task_ids=source_task,
dag_id=source_dag,
key=key)
if log_result:
# log the xcom value - don't put large values in xcom!
LOG.info(xcom_val)
return xcom_val
def get_deployment_configuration(self):
"""Retrieve the deployment configuration dictionary"""
source_task = 'get_deployment_configuration'
source_dag = 'dag_deployment_configuration'
key = None
return self._get_xcom(source_task=source_task,
dag_id=source_dag,
key=key)
def get_action_info(self):
"""Retrive the action and action parameter info dictionary
Extract information related to current workflow. This is a dictionary
that contains information about the workflow such as action_id, name
and other related parameters
"""
source_task = 'action_xcom'
source_dag = None
key = 'action'
return self._get_xcom(source_task=source_task,
dag_id=source_dag,
key=key)
def get_design_version(self):
"""Retrieve the design version being used for this workflow"""
source_task = 'deckhand_get_design_version'
source_dag = 'get_design_version'
key = None
return self._get_xcom(source_task=source_task,
dag_id=source_dag,
key=key)