Shipyard Armada Operator

- Uses armada_client
- Uses keystone token
- Included error handling for workflow
- Covers the following flow:
  1. Set up Armada Client/Session
  2. Armada Status
  3. Armada Validate
  4. Armada Apply
  5. Armada Get Releases

Note that Armada and DeckHand integration is ongoing
at the moment.  We will be reading the armada.yaml
directly from Airflow and sending it to Armada for
this Patch Set.  We will update the Armada Operator
once the integration betwen Armada and DeckHand is
completed.

Change-Id: I53b9257f1d5c4b443989cd0cc8154dd51f7d4168
This commit is contained in:
Anthony Lin 2017-10-10 03:02:41 +00:00
parent d1b2178340
commit e7239e2a86
3 changed files with 385 additions and 2 deletions

View File

@ -0,0 +1,107 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from airflow.models import DAG
from airflow.operators import ArmadaOperator
from airflow.operators.subdag_operator import SubDagOperator
# Location of shiyard.conf
config_path = '/usr/local/airflow/plugins/shipyard.conf'
# Names used for sub-subdags in the armada site deployment subdag
CREATE_ARMADA_CLIENT_DAG_NAME = 'create_armada_client'
GET_ARMADA_STATUS_DAG_NAME = 'armada_status'
ARMADA_VALIDATE_DAG_NAME = 'armada_validate'
ARMADA_APPLY_DAG_NAME = 'armada_apply'
ARMADA_GET_RELEASES_DAG_NAME = 'armada_get_releases'
def get_armada_subdag_step(parent_dag_name, child_dag_name, args):
'''
Execute Armada Subdag
'''
dag = DAG(
'{}.{}'.format(parent_dag_name, child_dag_name),
default_args=args)
# Note that in the event where the 'deploy_site' Action is
# triggered from Shipyard, the 'parent_dag_name' variable
# gets assigned with 'deploy_site.create_armada_client'.
# This is the name that we want to assign to the subdag so
# that we can reference it for xcom. The name of the main
# dag will be the front part of that value, i.e. 'deploy_site'.
# Hence we will extract the front part and assign it to main_dag.
# We will reuse this pattern for other Actions, e.g. update_site,
# redeploy_site as well.
operator = ArmadaOperator(
task_id=child_dag_name,
shipyard_conf=config_path,
action=child_dag_name,
main_dag_name=parent_dag_name[0:parent_dag_name.find('.')],
sub_dag_name=parent_dag_name,
dag=dag)
return dag
def deploy_site_armada(parent_dag_name, child_dag_name, args):
'''
Puts into atomic unit
'''
dag = DAG(
'{}.{}'.format(parent_dag_name, child_dag_name),
default_args=args)
armada_client = SubDagOperator(
subdag=get_armada_subdag_step(dag.dag_id,
CREATE_ARMADA_CLIENT_DAG_NAME,
args),
task_id=CREATE_ARMADA_CLIENT_DAG_NAME,
dag=dag)
armada_status = SubDagOperator(
subdag=get_armada_subdag_step(dag.dag_id,
GET_ARMADA_STATUS_DAG_NAME,
args),
task_id=GET_ARMADA_STATUS_DAG_NAME,
dag=dag)
armada_validate = SubDagOperator(
subdag=get_armada_subdag_step(dag.dag_id,
ARMADA_VALIDATE_DAG_NAME,
args),
task_id=ARMADA_VALIDATE_DAG_NAME,
dag=dag)
armada_apply = SubDagOperator(
subdag=get_armada_subdag_step(dag.dag_id,
ARMADA_APPLY_DAG_NAME,
args),
task_id=ARMADA_APPLY_DAG_NAME,
dag=dag)
armada_get_releases = SubDagOperator(
subdag=get_armada_subdag_step(dag.dag_id,
ARMADA_GET_RELEASES_DAG_NAME,
args),
task_id=ARMADA_GET_RELEASES_DAG_NAME,
dag=dag)
# DAG Wiring
armada_status.set_upstream(armada_client)
armada_validate.set_upstream(armada_status)
armada_apply.set_upstream(armada_validate)
armada_get_releases.set_upstream(armada_apply)
return dag

View File

@ -23,6 +23,7 @@ from airflow.operators.subdag_operator import SubDagOperator
from deckhand_get_design import get_design_deckhand
from drydock_deploy_site import deploy_site_drydock
from armada_deploy_site import deploy_site_armada
from preflight_checks import all_preflight_checks
from validate_site_design import validate_site_design
"""
@ -36,6 +37,7 @@ ALL_PREFLIGHT_CHECKS_DAG_NAME = 'preflight'
DECKHAND_GET_DESIGN_VERSION = 'deckhand_get_design_version'
VALIDATE_SITE_DESIGN_DAG_NAME = 'validate_site_design'
DRYDOCK_BUILD_DAG_NAME = 'drydock_build'
ARMADA_BUILD_DAG_NAME = 'armada_build'
default_args = {
'owner': 'airflow',
@ -104,8 +106,10 @@ query_node_status = PlaceholderOperator(
on_failure_callback=failure_handlers.step_failure_handler,
dag=dag)
armada_build = PlaceholderOperator(
task_id='armada_build',
armada_build = SubDagOperator(
subdag=deploy_site_armada(
PARENT_DAG_NAME, ARMADA_BUILD_DAG_NAME, args=default_args),
task_id=ARMADA_BUILD_DAG_NAME,
on_failure_callback=failure_handlers.step_failure_handler,
dag=dag)

View File

@ -0,0 +1,272 @@
# Copyright 2017 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from urllib.parse import urlparse
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.plugins_manager import AirflowPlugin
from airflow.exceptions import AirflowException
import armada.common.client as client
import armada.common.session as session
from get_k8s_pod_port_ip import get_pod_port_ip
from service_endpoint import ucp_service_endpoint
from service_token import shipyard_service_token
class ArmadaOperator(BaseOperator):
"""
Supports interaction with Armada
:param action: Task to perform
:param main_dag_name: Parent Dag
:param shipyard_conf: Location of shipyard.conf
:param sub_dag_name: Child Dag
"""
@apply_defaults
def __init__(self,
action=None,
main_dag_name=None,
shipyard_conf=None,
sub_dag_name=None,
workflow_info={},
xcom_push=True,
*args, **kwargs):
super(ArmadaOperator, self).__init__(*args, **kwargs)
self.action = action
self.main_dag_name = main_dag_name
self.shipyard_conf = shipyard_conf
self.sub_dag_name = sub_dag_name
self.workflow_info = workflow_info
self.xcom_push_flag = xcom_push
def execute(self, context):
# Initialize Variables
context['svc_type'] = 'armada'
armada_client = None
# Define task_instance
task_instance = context['task_instance']
# Extract information related to current workflow
# The workflow_info variable will be a dictionary
# that contains information about the workflow such
# as action_id, name and other related parameters
workflow_info = task_instance.xcom_pull(
task_ids='action_xcom', key='action',
dag_id=self.main_dag_name)
# Logs uuid of action performed by the Operator
logging.info("Armada Operator for action %s", workflow_info['id'])
# Create Armada Client
if self.action == 'create_armada_client':
# Retrieve Endpoint Information
context['svc_endpoint'] = ucp_service_endpoint(self, context)
logging.info("Armada endpoint is %s", context['svc_endpoint'])
# Set up Armada Client
session_client = self.armada_session_client(context)
return session_client
# Retrieve armada_client via XCOM so as to perform other tasks
armada_client = task_instance.xcom_pull(
task_ids='create_armada_client',
dag_id=self.sub_dag_name + '.create_armada_client')
# Armada API Call
# Armada Status
if self.action == 'armada_status':
# Retrieve Tiller Information and assign to context 'query'
context['query'] = self.get_tiller_info(context)
self.get_armada_status(context, armada_client)
# Armada Validate
elif self.action == 'armada_validate':
self.armada_validate(context, armada_client)
# Armada Apply
elif self.action == 'armada_apply':
# Retrieve Tiller Information and assign to context 'query'
context['query'] = self.get_tiller_info(context)
self.armada_apply(context, armada_client)
# Armada Get Releases
elif self.action == 'armada_get_releases':
# Retrieve Tiller Information and assign to context 'query'
context['query'] = self.get_tiller_info(context)
self.armada_get_releases(context, armada_client)
else:
logging.info('No Action to Perform')
@shipyard_service_token
def armada_session_client(self, context):
# Initialize Variables
armada_url = None
a_session = None
a_client = None
# Parse Armada Service Endpoint
armada_url = urlparse(context['svc_endpoint'])
# Build a ArmadaSession with credentials and target host
# information.
logging.info("Build Armada Session")
a_session = session.ArmadaSession(armada_url.hostname,
port=armada_url.port,
token=context['svc_token'])
# Raise Exception if we are not able to get armada session
if a_session:
logging.info("Successfully Set Up Armada Session")
else:
raise AirflowException("Failed to set up Armada Session!")
# Use session to build a ArmadaClient to make one or more
# API calls. The ArmadaSession will care for TCP connection
# pooling and header management
logging.info("Create Armada Client")
a_client = client.ArmadaClient(a_session)
# Raise Exception if we are not able to build armada client
if a_client:
logging.info("Successfully Set Up Armada client")
else:
raise AirflowException("Failed to set up Armada client!")
# Return Armada client for XCOM Usage
return a_client
@get_pod_port_ip('tiller')
def get_tiller_info(self, context, *args):
# Initialize Variable
query = {}
# Get IP and port information of Pods from context
k8s_pods_ip_port = context['pods_ip_port']
# Assign value to the 'query' dictionary so that we can pass
# it via the Armada Client
query['tiller_host'] = k8s_pods_ip_port['tiller'].get('ip')
query['tiller_port'] = k8s_pods_ip_port['tiller'].get('port')
return query
def get_armada_status(self, context, armada_client):
# Check State of Tiller
armada_status = armada_client.get_status(context['query'])
# Tiller State will return boolean value, i.e. True/False
# Raise Exception if Tiller is in a bad state
if armada_status['tiller']['state']:
logging.info("Tiller is in running state")
logging.info("Tiller version is %s",
armada_status['tiller']['version'])
else:
raise AirflowException("Please check Tiller!")
def armada_validate(self, context, armada_client):
# Initialize Variables
armada_manifest = None
valid_armada_yaml = {}
# At this point in time, testing of the operator is being done by
# reading the armada.yaml file on airflow and feeding it to Armada as
# a string. We will assume that the file name is fixed and will always
# be 'armada_site.yaml'. This will change in the near future when
# Armada is integrated with DeckHand.
yaml_path = '/usr/local/airflow/plugins/armada_site.yaml'
# TODO: We will implement the new approach when Armada and DeckHand
# integration is completed.
with open(yaml_path, 'r') as armada_yaml:
armada_manifest = armada_yaml.read()
# Validate armada yaml file
logging.info("Armada Validate")
valid_armada_yaml = armada_client.post_validate(armada_manifest)
# The response will be a dictionary indicating whether the yaml
# file is valid or invalid. We will check the Boolean value in
# this case.
if valid_armada_yaml['valid']:
logging.info("Armada Yaml File is Valid")
else:
raise AirflowException("Invalid Armada Yaml File!")
def armada_apply(self, context, armada_client):
# Initialize Variables
armada_manifest = None
armada_post_apply = {}
override_values = []
chart_set = []
# At this point in time, testing of the operator is being done by
# reading the armada.yaml file on airflow and feeding it to Armada as
# a string. We will assume that the file name is fixed and will always
# be 'armada_site.yaml'. This will change in the near future when
# Armada is integrated with DeckHand.
yaml_path = '/usr/local/airflow/plugins/armada_site.yaml'
# TODO: We will implement the new approach when Armada and DeckHand
# integration is completed. Override and chart_set will be considered
# at that time.
with open(yaml_path, 'r') as armada_yaml:
armada_manifest = armada_yaml.read()
# Execute Armada Apply to install the helm charts in sequence
logging.info("Armada Apply")
armada_post_apply = armada_client.post_apply(armada_manifest,
override_values,
chart_set,
context['query'])
# We will expect Armada to return the releases that it is
# deploying. An empty value for 'install' means that armada
# delploy has failed. Note that if we try and deploy the same
# release twice, we will end up with empty response on our
# second attempt and that will be treated as a failure scenario.
if armada_post_apply['message']['install']:
logging.info("Armada Apply Successfully Executed")
logging.info(armada_post_apply)
else:
logging.info(armada_post_apply)
raise AirflowException("Armada Apply Failed!")
def armada_get_releases(self, context, armada_client):
# Initialize Variables
armada_releases = {}
# Retrieve Armada Releases after deployment
logging.info("Retrieving Armada Releases after deployment..")
armada_releases = armada_client.get_releases(context['query'])
if armada_releases:
logging.info("Retrieved current Armada Releases")
logging.info(armada_releases)
else:
raise AirflowException("Failed to retrieve Armada Releases")
class ArmadaOperatorPlugin(AirflowPlugin):
name = 'armada_operator_plugin'
operators = [ArmadaOperator]