Merge "Add DeckHand Operator"

This commit is contained in:
Tin Lam 2017-10-05 23:13:00 -04:00 committed by Gerrit Code Review
commit 313888d30d
3 changed files with 165 additions and 19 deletions

View File

@ -0,0 +1,65 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from airflow.models import DAG
from airflow.operators import DeckhandOperator
from airflow.operators.subdag_operator import SubDagOperator
# Location of shiyard.conf
config_path = '/usr/local/airflow/plugins/shipyard.conf'
# Define Variables
parent_dag = 'deploy_site'
child_dag = 'deploy_site.deckhand_get_design_version'
# Names used for sub-subdags in the deckhand subdag
DECKHAND_GET_DESIGN_VERSION_DAG_NAME = 'deckhand_get_design_version'
def get_design_version(parent_dag_name, child_dag_name, args):
'''
Get Deckhand Design Version
'''
dag = DAG(
'{}.{}'.format(parent_dag_name, child_dag_name),
default_args=args)
operator = DeckhandOperator(
task_id=DECKHAND_GET_DESIGN_VERSION_DAG_NAME,
shipyard_conf=config_path,
action=DECKHAND_GET_DESIGN_VERSION_DAG_NAME,
main_dag_name=parent_dag,
sub_dag_name=child_dag,
dag=dag)
return dag
def get_design_deckhand(parent_dag_name, child_dag_name, args):
'''
Puts into atomic unit
'''
dag = DAG(
'{}.{}'.format(parent_dag_name, child_dag_name),
default_args=args)
deckhand_design = SubDagOperator(
subdag=get_design_version(dag.dag_id,
DECKHAND_GET_DESIGN_VERSION_DAG_NAME,
args),
task_id=DECKHAND_GET_DESIGN_VERSION_DAG_NAME,
dag=dag)
return dag

View File

@ -14,16 +14,17 @@
from datetime import timedelta
import airflow
from airflow import DAG
import failure_handlers
from preflight_checks import all_preflight_checks
from drydock_deploy_site import deploy_site_drydock
from validate_site_design import validate_site_design
from airflow.operators.subdag_operator import SubDagOperator
from airflow import DAG
from airflow.operators import ConcurrencyCheckOperator
from airflow.operators import DeckhandOperator
from airflow.operators import PlaceholderOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.subdag_operator import SubDagOperator
from deckhand_get_design import get_design_deckhand
from drydock_deploy_site import deploy_site_drydock
from preflight_checks import all_preflight_checks
from validate_site_design import validate_site_design
"""
deploy_site is the top-level orchestration DAG for deploying a site using the
Undercloud platform.
@ -77,7 +78,9 @@ preflight = SubDagOperator(
on_failure_callback=failure_handlers.step_failure_handler,
dag=dag)
get_design_version = DeckhandOperator(
get_design_version = SubDagOperator(
subdag=get_design_deckhand(
PARENT_DAG_NAME, DECKHAND_GET_DESIGN_VERSION, args=default_args),
task_id=DECKHAND_GET_DESIGN_VERSION,
on_failure_callback=failure_handlers.step_failure_handler,
dag=dag)

View File

@ -11,32 +11,110 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import requests
import yaml
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.plugins_manager import AirflowPlugin
from airflow.exceptions import AirflowException
from service_endpoint import ucp_service_endpoint
class DeckhandOperator(BaseOperator):
"""
Supports interaction with Deckhand.
Supports interaction with Deckhand
:param action: Task to perform
:param main_dag_name: Parent Dag
:param shipyard_conf: Location of shipyard.conf
:param sub_dag_name: Child Dag
"""
# TODO () remove this special coloring when the operator is done.
ui_color = '#e8f7e4'
@apply_defaults
def __init__(self, *args, **kwargs):
super(DeckhandOperator, self).__init__(*args, **kwargs)
def __init__(self,
action=None,
main_dag_name=None,
shipyard_conf=None,
sub_dag_name=None,
workflow_info={},
xcom_push=True,
*args, **kwargs):
super(DeckhandOperator, self).__init__(*args, **kwargs)
self.action = action
self.main_dag_name = main_dag_name
self.shipyard_conf = shipyard_conf
self.sub_dag_name = sub_dag_name
self.workflow_info = workflow_info
self.xcom_push_flag = xcom_push
# TODO () make this communicate with Deckhand.
# Needs to expose functionality so general interaction
# with deckhand can occur.
def execute(self, context):
logging.info('%s : %s !!! not implemented. '
'Need to get design revision from Deckhand',
self.dag.dag_id, self.task_id)
# Initialize Variables
context['svc_type'] = 'deckhand'
deckhand_design_version = None
# Define task_instance
task_instance = context['task_instance']
# Extract information related to current workflow
# The workflow_info variable will be a dictionary
# that contains information about the workflow such
# as action_id, name and other related parameters
workflow_info = task_instance.xcom_pull(
task_ids='action_xcom', key='action',
dag_id=self.main_dag_name)
# Logs uuid of action performed by the Operator
logging.info("DeckHand Operator for action %s", workflow_info['id'])
# Deckhand API Call
if self.action == 'deckhand_get_design_version':
# Retrieve Endpoint Information
context['svc_endpoint'] = ucp_service_endpoint(self, context)
logging.info("Deckhand endpoint is %s", context['svc_endpoint'])
# Retrieve DeckHand Design Version
deckhand_design_version = self.deckhand_get_design(context)
if deckhand_design_version:
return deckhand_design_version
else:
raise AirflowException('Failed to retrieve revision ID!')
else:
logging.info('No Action to Perform')
def deckhand_get_design(self, context):
# Form Revision Endpoint
revision_endpoint = context['svc_endpoint'] + '/revisions'
# Retrieve Revision
logging.info("Retrieving revisions information...")
revisions = yaml.safe_load(requests.get(revision_endpoint).text)
# Print the number of revisions that is currently available on
# DeckHand
logging.info("The number of revisions is %s", revisions['count'])
# Initialize Revision ID
committed_ver = None
# Construct revision_list
revision_list = revisions.get('results')
# Search for the last committed version and save it as xcom
for revision in reversed(revision_list):
if 'committed' in revision.get('tags'):
committed_ver = revision.get('id')
break
if committed_ver:
logging.info("Last committed revision is %d", committed_ver)
return committed_ver
else:
raise AirflowException("Failed to retrieve committed revision!")
class DeckhandOperatorPlugin(AirflowPlugin):