Refactor Deckhand Operator

We will move away from the usage of if/else block and will instead
make use of inheritance (deckhand base operator) and task specific
operators in our dags to execute the workflow

Change-Id: I2fb8607162951ea6bdaccca6863ee1c5923bb81f
This commit is contained in:
Anthony Lin 2018-02-19 10:21:52 +00:00 committed by Bryan Strassner
parent d3419123c3
commit 35679d6754
8 changed files with 394 additions and 284 deletions

View File

@ -1,4 +1,4 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -13,7 +13,7 @@
# limitations under the License.
from airflow.models import DAG
from airflow.operators import DeckhandOperator
from airflow.operators import DeckhandGetDesignOperator
# Location of shiyard.conf
@ -30,10 +30,9 @@ def get_design_deckhand(parent_dag_name, child_dag_name, args):
'{}.{}'.format(parent_dag_name, child_dag_name),
default_args=args)
deckhand_design = DeckhandOperator(
deckhand_design = DeckhandGetDesignOperator(
task_id='deckhand_get_design_version',
shipyard_conf=config_path,
action='deckhand_get_design_version',
main_dag_name=parent_dag_name,
sub_dag_name=child_dag_name,
dag=dag)
@ -44,10 +43,9 @@ def get_design_deckhand(parent_dag_name, child_dag_name, args):
# the issue with the Deckhand client. We will uncomment this
# block once the issue with Deckhand is resolved.
#
# shipyard_retrieve_rendered_doc = DeckhandOperator(
# shipyard_retrieve_rendered_doc = DeckhandValidateSiteDesignOperator(
# task_id='shipyard_retrieve_rendered_doc',
# shipyard_conf=config_path,
# action='shipyard_retrieve_rendered_doc',
# main_dag_name=parent_dag_name,
# sub_dag_name=child_dag_name,
# dag=dag)

View File

@ -14,7 +14,7 @@
from airflow.models import DAG
from airflow.operators import ArmadaOperator
from airflow.operators import DeckhandOperator
from airflow.operators import DeckhandValidateSiteDesignOperator
from airflow.operators import DryDockOperator
# Location of shiyard.conf
@ -31,10 +31,9 @@ def validate_site_design(parent_dag_name, child_dag_name, args):
'{}.{}'.format(parent_dag_name, child_dag_name),
default_args=args)
deckhand_validate_docs = DeckhandOperator(
deckhand_validate_docs = DeckhandValidateSiteDesignOperator(
task_id='deckhand_validate_site_design',
shipyard_conf=config_path,
action='deckhand_validate_site_design',
main_dag_name=parent_dag_name,
sub_dag_name=child_dag_name,
dag=dag)

View File

@ -0,0 +1,175 @@
# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import configparser
import logging
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.plugins_manager import AirflowPlugin
from airflow.exceptions import AirflowException
from deckhand.client import client as deckhand_client
from service_endpoint import ucp_service_endpoint
from service_token import shipyard_service_token
class DeckhandBaseOperator(BaseOperator):
"""Deckhand Base Operator
All deckhand related workflow operators will use the deckhand
base operator as the parent and inherit attributes and methods
from this class
"""
@apply_defaults
def __init__(self,
committed_ver=None,
deckhandclient=None,
deckhand_client_read_timeout=None,
deckhand_svc_endpoint=None,
deckhand_svc_type='deckhand',
main_dag_name=None,
revision_id=None,
shipyard_conf=None,
sub_dag_name=None,
svc_session=None,
svc_token=None,
validation_read_timeout=None,
workflow_info={},
xcom_push=True,
*args, **kwargs):
"""Initialization of DeckhandBaseOperator object.
:param committed_ver: Last committed version
:param deckhandclient: An instance of deckhand client
:param deckhand_client_read_timeout: Deckhand client connect timeout
:param deckhand_svc_endpoint: Deckhand Service Endpoint
:param deckhand_svc_type: Deckhand Service Type
:param main_dag_name: Parent Dag
:param revision_id: Target revision for workflow
:param shipyard_conf: Path of shipyard.conf
:param sub_dag_name: Child Dag
:param svc_session: Keystone Session
:param svc_token: Keystone Token
:param validation_read_timeout: Deckhand validation timeout
:param workflow_info: Information related to current workflow
:param xcom_push: xcom usage
"""
super(DeckhandBaseOperator, self).__init__(*args, **kwargs)
self.committed_ver = committed_ver
self.deckhandclient = deckhandclient
self.deckhand_client_read_timeout = deckhand_client_read_timeout
self.deckhand_svc_endpoint = deckhand_svc_endpoint
self.deckhand_svc_type = deckhand_svc_type
self.main_dag_name = main_dag_name
self.revision_id = revision_id
self.shipyard_conf = shipyard_conf
self.sub_dag_name = sub_dag_name
self.svc_session = svc_session
self.svc_token = svc_token
self.validation_read_timeout = validation_read_timeout
self.workflow_info = workflow_info
self.xcom_push_flag = xcom_push
def execute(self, context):
# Execute deckhand base function
self.deckhand_base(context)
# Exeute child function
self.do_execute()
# Push last committed version to xcom for the
# 'deckhand_get_design_version' subdag
if self.sub_dag_name == 'deckhand_get_design_version':
return self.committed_ver
@shipyard_service_token
def deckhand_base(self, context):
# Read and parse shiyard.conf
config = configparser.ConfigParser()
config.read(self.shipyard_conf)
# Initialize variables
self.deckhand_client_read_timeout = int(config.get(
'requests_config', 'deckhand_client_read_timeout'))
self.validation_read_timeout = int(config.get(
'requests_config', 'validation_read_timeout'))
# Define task_instance
task_instance = context['task_instance']
# Extract information related to current workflow
# The workflow_info variable will be a dictionary
# that contains information about the workflow such
# as action_id, name and other related parameters
self.workflow_info = task_instance.xcom_pull(
task_ids='action_xcom', key='action',
dag_id=self.main_dag_name)
# Logs uuid of Shipyard action
logging.info("Executing Shipyard Action %s",
self.workflow_info['id'])
# Retrieve Endpoint Information
self.deckhand_svc_endpoint = ucp_service_endpoint(
self, svc_type=self.deckhand_svc_type)
logging.info("Deckhand endpoint is %s",
self.deckhand_svc_endpoint)
# Set up DeckHand Client
logging.info("Setting up DeckHand Client...")
# NOTE: The communication between the Airflow workers
# and Deckhand happens via the 'internal' endpoint.
self.deckhandclient = deckhand_client.Client(
session=self.svc_session, endpoint_type='internal')
if not self.deckhandclient:
raise AirflowException('Failed to set up deckhand client!')
# Retrieve 'revision_id' from xcom for tasks other than
# 'deckhand_get_design_version'
#
# NOTE: In the case of 'deploy_site', the dag_id will
# be 'deploy_site.deckhand_get_design_version' for the
# 'deckhand_get_design_version' task. We need to extract
# the xcom value from it in order to get the value of the
# last committed revision ID
if self.sub_dag_name != 'deckhand_get_design_version':
# Retrieve 'revision_id' from xcom
self.revision_id = task_instance.xcom_pull(
task_ids='deckhand_get_design_version',
dag_id=self.main_dag_name + '.deckhand_get_design_version')
if self.revision_id:
logging.info("Revision ID is %d", self.revision_id)
else:
raise AirflowException('Failed to retrieve Revision ID!')
class DeckhandBaseOperatorPlugin(AirflowPlugin):
"""Creates DeckhandBaseOperator in Airflow."""
name = 'deckhand_base_operator_plugin'

View File

@ -0,0 +1,81 @@
# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
import requests
import yaml
from airflow.plugins_manager import AirflowPlugin
from airflow.exceptions import AirflowException
from deckhand_base_operator import DeckhandBaseOperator
class DeckhandGetDesignOperator(DeckhandBaseOperator):
"""Deckhand Get Design Operator
This operator will trigger deckhand to retrieve the last
committed revision and save it in airflow as xcom
"""
def do_execute(self):
# Retrieve Keystone Token and assign to X-Auth-Token Header
x_auth_token = {"X-Auth-Token": self.svc_token}
# Form Revision Endpoint
revision_endpoint = os.path.join(self.deckhand_svc_endpoint,
'revisions')
# Retrieve Revision
logging.info("Retrieving revisions information...")
try:
query_params = {'tag': 'committed', 'sort': 'id', 'order': 'desc'}
revisions = yaml.safe_load(requests.get(
revision_endpoint,
headers=x_auth_token,
params=query_params,
timeout=self.deckhand_client_read_timeout).text)
except requests.exceptions.RequestException as e:
raise AirflowException(e)
# Print the number of revisions that is currently available
# in DeckHand
logging.info("The number of revisions is %s", revisions['count'])
# Search for the last committed version and save it as xcom
revision_list = revisions.get('results', [])
if revision_list:
self.committed_ver = revision_list[-1].get('id')
else:
raise AirflowException("No revision found in Deckhand!")
if self.committed_ver:
logging.info("Last committed revision is %d", self.committed_ver)
else:
raise AirflowException("Failed to retrieve committed revision!")
class DeckhandGetDesignOperatorPlugin(AirflowPlugin):
"""Creates DeckhandGetDesignOperator in Airflow."""
name = 'deckhand_get_design_operator'
operators = [DeckhandGetDesignOperator]

View File

@ -1,273 +0,0 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import configparser
import logging
import os
import requests
import yaml
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.plugins_manager import AirflowPlugin
from airflow.exceptions import AirflowException
from keystoneauth1.identity import v3 as keystone_v3
from keystoneauth1 import session as keystone_session
from deckhand.client import client as deckhand_client
from service_endpoint import ucp_service_endpoint
from service_token import shipyard_service_token
class DeckhandOperator(BaseOperator):
"""
Supports interaction with Deckhand
:param action: Task to perform
:param main_dag_name: Parent Dag
:param shipyard_conf: Location of shipyard.conf
:param sub_dag_name: Child Dag
"""
@apply_defaults
def __init__(self,
action=None,
main_dag_name=None,
shipyard_conf=None,
sub_dag_name=None,
svc_token=None,
workflow_info={},
xcom_push=True,
*args, **kwargs):
super(DeckhandOperator, self).__init__(*args, **kwargs)
self.action = action
self.main_dag_name = main_dag_name
self.shipyard_conf = shipyard_conf
self.sub_dag_name = sub_dag_name
self.svc_token = svc_token
self.workflow_info = workflow_info
self.xcom_push_flag = xcom_push
def execute(self, context):
# Initialize Variables
deckhand_design_version = None
redeploy_server = None
revision_id = None
# Define task_instance
task_instance = context['task_instance']
# Extract information related to current workflow
# The workflow_info variable will be a dictionary
# that contains information about the workflow such
# as action_id, name and other related parameters
workflow_info = task_instance.xcom_pull(
task_ids='action_xcom', key='action',
dag_id=self.main_dag_name)
# Logs uuid of action performed by the Operator
logging.info("DeckHand Operator for action %s", workflow_info['id'])
# Retrieve information of the server that we want to redeploy if user
# executes the 'redeploy_server' dag
if workflow_info['dag_id'] == 'redeploy_server':
redeploy_server = workflow_info['parameters'].get('server-name')
if redeploy_server:
logging.info("Server to be redeployed is %s", redeploy_server)
else:
raise AirflowException('Unable to retrieve information of '
'node to be redeployed!')
# Retrieve Endpoint Information
svc_type = 'deckhand'
deckhand_svc_endpoint = ucp_service_endpoint(self,
svc_type=svc_type)
logging.info("Deckhand endpoint is %s", deckhand_svc_endpoint)
# Deckhand API Call
# Retrieve Design Version from DeckHand
if self.action == 'deckhand_get_design_version':
# Retrieve DeckHand Design Version
deckhand_design_version = self.deckhand_get_design(
deckhand_svc_endpoint)
if deckhand_design_version:
return deckhand_design_version
else:
raise AirflowException('Failed to retrieve revision ID!')
# Retrieve revision_id from xcom
# Note that in the case of 'deploy_site', the dag_id will
# be 'deploy_site.deckhand_get_design_version' for the
# 'deckhand_get_design_version' task. We need to extract
# the xcom value from it in order to get the value of the
# last committed revision ID
revision_id = task_instance.xcom_pull(
task_ids='deckhand_get_design_version',
dag_id=self.main_dag_name + '.deckhand_get_design_version')
logging.info("Revision ID is %d", revision_id)
# Retrieve Rendered Document from DeckHand
if self.action == 'shipyard_retrieve_rendered_doc':
if revision_id:
self.retrieve_rendered_doc(context,
revision_id)
else:
raise AirflowException('Invalid revision ID!')
# Validate Design using DeckHand
elif self.action == 'deckhand_validate_site_design':
if revision_id:
self.deckhand_validate_site(deckhand_svc_endpoint,
revision_id)
else:
raise AirflowException('Invalid revision ID!')
# No action to perform
else:
logging.info('No Action to Perform')
@shipyard_service_token
def deckhand_get_design(self, deckhand_svc_endpoint):
# Retrieve Keystone Token and assign to X-Auth-Token Header
x_auth_token = {"X-Auth-Token": self.svc_token}
# Form Revision Endpoint
revision_endpoint = os.path.join(deckhand_svc_endpoint,
'revisions')
# Retrieve Revision
logging.info("Retrieving revisions information...")
try:
query_params = {'tag': 'committed', 'sort': 'id', 'order': 'desc'}
revisions = yaml.safe_load(requests.get(
revision_endpoint, headers=x_auth_token,
params=query_params, timeout=30).text)
except requests.exceptions.RequestException as e:
raise AirflowException(e)
logging.info(revisions)
# Print the number of revisions that is currently available on
# DeckHand
logging.info("The number of revisions is %s", revisions['count'])
# Initialize Committed Version
committed_ver = None
# Search for the last committed version and save it as xcom
revision_list = revisions.get('results', [])
if revision_list:
committed_ver = revision_list[-1].get('id')
if committed_ver:
logging.info("Last committed revision is %d", committed_ver)
return committed_ver
else:
raise AirflowException("Failed to retrieve committed revision!")
@shipyard_service_token
def deckhand_validate_site(self, deckhand_svc_endpoint, revision_id):
# Retrieve Keystone Token and assign to X-Auth-Token Header
x_auth_token = {"X-Auth-Token": self.svc_token}
# Form Validation Endpoint
validation_endpoint = os.path.join(deckhand_svc_endpoint,
'revisions',
str(revision_id),
'validations')
logging.info(validation_endpoint)
# Retrieve Validation list
logging.info("Retrieving validation list...")
try:
retrieved_list = yaml.safe_load(
requests.get(validation_endpoint, headers=x_auth_token,
timeout=30).text)
except requests.exceptions.RequestException as e:
raise AirflowException(e)
logging.info(retrieved_list)
# Initialize Validation Status
validation_status = True
# Construct validation_list
validation_list = retrieved_list.get('results', [])
# Assigns 'False' to validation_status if result status
# is 'failure'
for validation in validation_list:
if validation.get('status') == 'failure':
validation_status = False
break
if validation_status:
logging.info("Revision %d has been successfully validated",
revision_id)
else:
raise AirflowException("DeckHand Site Design Validation Failed!")
def retrieve_rendered_doc(self, context, revision_id):
# Initialize Variables
auth = None
keystone_auth = {}
rendered_doc = []
sess = None
# Read and parse shiyard.conf
config = configparser.ConfigParser()
config.read(self.shipyard_conf)
# Construct Session Argument
for attr in ('auth_url', 'password', 'project_domain_name',
'project_name', 'username', 'user_domain_name'):
keystone_auth[attr] = config.get('keystone_authtoken', attr)
# Set up keystone session
auth = keystone_v3.Password(**keystone_auth)
sess = keystone_session.Session(auth=auth)
logging.info("Setting up DeckHand Client...")
# Set up DeckHand Client
# NOTE: The communication between the Airflow workers and Deckhand
# happens via the 'internal' endpoint and not the 'public' endpoint.
# Hence we will need to override the 'endpoint_type' from the default
# 'public' endpoint to 'internal' endpoint.
deckhandclient = deckhand_client.Client(session=sess,
endpoint_type='internal')
logging.info("Retrieving Rendered Document...")
# Retrieve Rendered Document
try:
rendered_doc = deckhandclient.revisions.documents(revision_id,
rendered=True)
logging.info("Successfully Retrieved Rendered Document")
logging.info(rendered_doc)
except:
raise AirflowException("Failed to Retrieve Rendered Document!")
class DeckhandOperatorPlugin(AirflowPlugin):
name = 'deckhand_operator_plugin'
operators = [DeckhandOperator]

View File

@ -0,0 +1,52 @@
# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from airflow.plugins_manager import AirflowPlugin
from airflow.exceptions import AirflowException
from deckhand_base_operator import DeckhandBaseOperator
class DeckhandRetrieveRenderedDocOperator(DeckhandBaseOperator):
"""Deckhand Retrieve Rendered Doc Operator
This operator will trigger deckhand to retrieve rendered doc
"""
def do_execute(self):
logging.info("Retrieving Rendered Document...")
# Retrieve Rendered Document
try:
rendered_doc = self.deckhandclient.revisions.documents(
self.revision_id, rendered=True)
logging.info("Successfully Retrieved Rendered Document")
logging.info(rendered_doc)
except:
raise AirflowException("Failed to Retrieve Rendered Document!")
class DeckhandRetrieveRenderedDocOperatorPlugin(AirflowPlugin):
"""Creates DeckhandRetrieveRenderedDocOperator in Airflow."""
name = 'deckhand_retrieve_rendered_doc_operator'
operators = [DeckhandRetrieveRenderedDocOperator]

View File

@ -0,0 +1,77 @@
# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
import requests
import yaml
from airflow.plugins_manager import AirflowPlugin
from airflow.exceptions import AirflowException
from deckhand_base_operator import DeckhandBaseOperator
class DeckhandValidateSiteDesignOperator(DeckhandBaseOperator):
"""Deckhand Validate Site Design Operator
This operator will trigger deckhand to validate the
site design YAMLs
"""
def do_execute(self):
# Retrieve Keystone Token and assign to X-Auth-Token Header
x_auth_token = {"X-Auth-Token": self.svc_token}
# Form Validation Endpoint
validation_endpoint = os.path.join(self.deckhand_svc_endpoint,
'revisions',
str(self.revision_id),
'validations')
# Retrieve Validation list
logging.info("Retrieving validation list...")
try:
retrieved_list = yaml.safe_load(
requests.get(validation_endpoint,
headers=x_auth_token,
timeout=self.validation_read_timeout).text)
except requests.exceptions.RequestException as e:
raise AirflowException(e)
# Assigns Boolean 'False' to validation_status if result
# status is 'failure'
if (any([v.get('status') == 'failure'
for v in retrieved_list.get('results', [])])):
validation_status = False
else:
validation_status = True
if validation_status:
logging.info("Revision %d has been successfully validated",
self.revision_id)
else:
raise AirflowException("DeckHand Site Design Validation Failed!")
class DeckhandValidateSiteDesignOperatorPlugin(AirflowPlugin):
"""Creates DeckhandValidateSiteDesignOperator in Airflow."""
name = 'deckhand_validate_site_design_operator'
operators = [DeckhandValidateSiteDesignOperator]

View File

@ -44,7 +44,7 @@ def shipyard_service_token(func):
retry = 0
# Retrieve Keystone Session
sess = ucp_keystone_session(self)
self.svc_session = ucp_keystone_session(self)
# We will allow 1 retry in getting the Keystone Token with a
# backoff interval of 10 seconds in case there is a temporary
@ -53,7 +53,8 @@ def shipyard_service_token(func):
while retry <= 1:
# Retrieve Keystone Token
logging.info("Get Keystone Token")
self.svc_token = sess.get_auth_headers().get('X-Auth-Token')
self.svc_token = self.svc_session.get_auth_headers().get(
'X-Auth-Token')
# Retry if we fail to get the keystone token
if self.svc_token: