Shipyard DryDock Operator

- Uses drydock_client
- Added workflow to get keystone token
- Added error handling for workflow
- Covers the following flow
  1. create_drydock_client
  2. get_design_id
  3. load_parts (drydock)
  4. load_parts (promenade)
  5. verify_site
  6. prepare_site
  7. prepare_node
  8. deploy_node

Current testing is done against a drydock container on the
local host machine using a hard-coded token, as the
DryDock Helm Chart with Keystone integration is still under
review. Integration testing with a keystone token will be
done once the DryDock Chart is merged.

Change-Id: Ic5eda099717de6f948afacf9afa07ea5dad3ba7f
This commit is contained in:
Anthony Lin 2017-08-12 04:46:00 +00:00
parent 065bf9259a
commit 4635bb232e
4 changed files with 589 additions and 0 deletions

View File

@ -1,3 +1,20 @@
[base]
web_server=http://localhost:32080
[drydock]
host=drydock-api.drydock
port=32768
token=bigboss
site_yaml=/usr/local/airflow/plugins/drydock.yaml
prom_yaml=/usr/local/airflow/plugins/promenade.yaml
k8_masters=k8_master_node02,k8_master_node03
[keystone]
OS_AUTH_URL=http://keystone-api.openstack:80/v3
OS_PROJECT_NAME=service
OS_USER_DOMAIN_NAME=Default
OS_USERNAME=shipyard
OS_PASSWORD=password
OS_REGION_NAME=RegionOne
OS_IDENTITY_API_VERSION=3

View File

@ -0,0 +1,117 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
### DryDock Operator Child Dag
"""
import airflow
import configparser
from airflow import DAG
from airflow.operators import DryDockOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
def sub_dag(parent_dag_name, child_dag_name, args, schedule_interval):
    """Generate the DryDock child DAG.

    Chains the full DryDock deployment flow:
    create_drydock_client -> get_design_id -> drydock_load_parts ->
    promenade_load_parts -> verify_site -> prepare_site ->
    prepare_node -> deploy_node

    :param parent_dag_name: dag_id of the parent DAG
    :param child_dag_name: name of this subdag (dag_id suffix)
    :param args: default_args for the DAG (must contain 'start_date')
    :param schedule_interval: schedule of the parent DAG, forwarded so
        the subdag's schedule matches its parent
    """
    dag = DAG(
        '%s.%s' % (parent_dag_name, child_dag_name),
        default_args=args,
        # Fix: forward the caller-supplied schedule_interval; it was
        # previously accepted but ignored, leaving the subdag on the
        # DAG default schedule instead of the parent's.
        schedule_interval=schedule_interval,
        start_date=args['start_date'],
        max_active_runs=1,
    )

    # Location of shipyard.conf
    config_path = '/usr/local/airflow/plugins/shipyard.conf'

    # Read and parse shipyard.conf
    config = configparser.ConfigParser()
    config.read(config_path)

    # Connection and configuration values for the DryDock tasks
    drydock_target_host = config.get('drydock', 'host')
    drydock_port = config.get('drydock', 'port')
    drydock_token = config.get('drydock', 'token')
    drydock_conf = config.get('drydock', 'site_yaml')
    promenade_conf = config.get('drydock', 'prom_yaml')

    # Convert the comma-separated master list into the node_filter
    # dictionary expected by DryDock ('node_names' is one of its
    # valid filter fields)
    k8_masters_string = config.get('drydock', 'k8_masters')
    k8_masters_list = k8_masters_string.split(',')
    k8_masters = {'node_names': k8_masters_list}

    # Create Drydock Client
    t1 = DryDockOperator(
        task_id='create_drydock_client',
        host=drydock_target_host,
        port=drydock_port,
        token=drydock_token,
        shipyard_conf=config_path,
        action='create_drydock_client',
        dag=dag)

    # Get Design ID
    t2 = DryDockOperator(
        task_id='drydock_get_design_id',
        action='get_design_id',
        dag=dag)

    # DryDock Load Parts
    t3 = DryDockOperator(
        task_id='drydock_load_parts',
        drydock_conf=drydock_conf,
        action='drydock_load_parts',
        dag=dag)

    # Promenade Load Parts
    t4 = DryDockOperator(
        task_id='promenade_load_parts',
        promenade_conf=promenade_conf,
        action='promenade_load_parts',
        dag=dag)

    # Verify Site
    t5 = DryDockOperator(
        task_id='drydock_verify_site',
        action='verify_site',
        dag=dag)

    # Prepare Site
    t6 = DryDockOperator(
        task_id='drydock_prepare_site',
        action='prepare_site',
        dag=dag)

    # Prepare Node (restricted to the k8s master nodes)
    t7 = DryDockOperator(
        task_id='drydock_prepare_node',
        action='prepare_node',
        node_filter=k8_masters,
        dag=dag)

    # Deploy Node (restricted to the k8s master nodes)
    t8 = DryDockOperator(
        task_id='drydock_deploy_node',
        action='deploy_node',
        node_filter=k8_masters,
        dag=dag)

    # Define dependencies: strictly linear t1 -> t2 -> ... -> t8
    t2.set_upstream(t1)
    t3.set_upstream(t2)
    t4.set_upstream(t3)
    t5.set_upstream(t4)
    t6.set_upstream(t5)
    t7.set_upstream(t6)
    t8.set_upstream(t7)

    return dag

View File

@ -0,0 +1,47 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
### DryDock Operator Parent Dag
"""
import airflow
from airflow import DAG
from datetime import datetime, timedelta
from airflow.operators.subdag_operator import SubDagOperator
from drydock_operator_child import sub_dag
# dag_id values for the parent DAG and its child subdag
parent_dag_name = 'drydock_operator_parent'
child_dag_name = 'drydock_operator_child'

# Default task arguments shared by the parent DAG and the child subdag
args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(1),
    'retries': 0,
    'retry_delay': timedelta(minutes=1),
    'provide_context': True
}

# Parent DAG: triggered manually (no schedule), one active run at a time
main_dag = DAG(
    dag_id=parent_dag_name,
    default_args=args,
    schedule_interval=None,
    start_date=airflow.utils.dates.days_ago(1),
    max_active_runs=1
)

# Wrap the DryDock workflow (built by sub_dag) as a subdag task of the
# parent DAG; the parent's schedule_interval is forwarded to the child.
subdag = SubDagOperator(
    subdag=sub_dag(parent_dag_name, child_dag_name, args,
                   main_dag.schedule_interval),
    task_id=child_dag_name,
    default_args=args,
    dag=main_dag)

View File

@ -0,0 +1,408 @@
# -*- coding: utf-8 -*-
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import subprocess
import sys
import os
import time
import re
import configparser
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults
import drydock_provisioner.drydock_client.client as client
import drydock_provisioner.drydock_client.session as session
class DryDockOperator(BaseOperator):
    """
    DryDock Client

    :host: Target Host
    :port: DryDock Port
    :token: DryDock Token
    :shipyard_conf: Location of shipyard.conf
    :drydock_conf: Location of drydock YAML
    :promenade_conf: Location of promenade YAML
    :node_filter: Valid fields are 'node_names', 'rack_names', 'node_tags'
    :action: Task to perform
    :design_id: DryDock Design ID
    """
    @apply_defaults
    def __init__(self,
                 host=None,
                 port=None,
                 token=None,
                 action=None,
                 design_id=None,
                 node_filter=None,
                 shipyard_conf=None,
                 drydock_conf=None,
                 promenade_conf=None,
                 xcom_push=True,
                 *args, **kwargs):

        super(DryDockOperator, self).__init__(*args, **kwargs)
        self.host = host
        self.port = port
        self.token = token
        self.shipyard_conf = shipyard_conf
        self.drydock_conf = drydock_conf
        self.promenade_conf = promenade_conf
        self.action = action
        self.design_id = design_id
        self.node_filter = node_filter
        self.xcom_push_flag = xcom_push

    def execute(self, context):
        """Dispatch on self.action and run the corresponding DryDock step."""
        # DrydockClient — the returned client is pushed to XCOM so that
        # the downstream tasks can retrieve and reuse it
        if self.action == 'create_drydock_client':
            drydock_client = self.drydock_session_client(context)

            return drydock_client

        # Retrieve drydock_client via XCOM so as to perform other tasks
        task_instance = context['task_instance']
        drydock_client = task_instance.xcom_pull(
            task_ids='create_drydock_client',
            dag_id='drydock_operator_parent.drydock_operator_child')

        # Get Design ID
        if self.action == 'get_design_id':
            design_id = self.drydock_create_design(drydock_client)

            return design_id

        # DryDock Load Parts
        elif self.action == 'drydock_load_parts':
            self.parts_type = 'drydock'
            self.load_parts(drydock_client, context, self.parts_type)

        # Promenade Load Parts
        elif self.action == 'promenade_load_parts':
            self.parts_type = 'promenade'
            self.load_parts(drydock_client, context, self.parts_type)

        # Create Task for verify_site
        # Query every 10 seconds for 1 minute
        elif self.action == 'verify_site':
            self._run_and_monitor_task(drydock_client, context,
                                       'verify_site', 'Verify_Site',
                                       None, interval=10, time_out=60,
                                       desired_state='success')

        # Create Task for prepare_site
        # Query every 10 seconds for 2 minutes; prepare_site is expected
        # to end in 'partial_success' rather than 'success'
        elif self.action == 'prepare_site':
            self._run_and_monitor_task(drydock_client, context,
                                       'prepare_site', 'Prepare_Site',
                                       None, interval=10, time_out=120,
                                       desired_state='partial_success')

        # Create Task for prepare_node
        # Query every 30 seconds for 30 minutes
        elif self.action == 'prepare_node':
            self._run_and_monitor_task(drydock_client, context,
                                       'prepare_node', 'Prepare_Node',
                                       self.node_filter,
                                       interval=30, time_out=1800,
                                       desired_state='success')

        # Create Task for deploy_node
        # Query every 30 seconds for 60 minutes
        elif self.action == 'deploy_node':
            self._run_and_monitor_task(drydock_client, context,
                                       'deploy_node', 'Deploy_Node',
                                       self.node_filter,
                                       interval=30, time_out=3600,
                                       desired_state='success')

        else:
            logging.info('No Action to Perform')

    def _run_and_monitor_task(self, drydock_client, context, task_name,
                              label, node_filter, interval, time_out,
                              desired_state):
        """Create one DryDock task and poll it until it reaches a final
        state, raising AirflowException on time-out or failure."""
        self.perform_task = task_name

        task = self.drydock_perform_task(drydock_client, context,
                                         task_name, node_filter)

        # Query the task that was just created
        task_id = task['task_id']
        logging.info(task_id)

        task_status = self.drydock_query_task(drydock_client,
                                              interval,
                                              time_out,
                                              task_id,
                                              desired_state)

        if task_status == 'timed_out':
            raise AirflowException('%s Task Timed Out!' % label)
        elif task_status == 'task_failed':
            raise AirflowException('%s Task Failed!' % label)
        else:
            logging.info('%s Task:' % label.replace('_', ' '))
            logging.info(task_status)

    def keystone_token_get(self, conf_path):
        """Obtain a Keystone token by shelling out to
        'openstack token issue' with credentials taken from shipyard.conf.

        Raises AirflowException if the command fails or no token can be
        parsed from its output.
        """
        # Read and parse shipyard.conf
        config = configparser.ConfigParser()
        config.read(conf_path)

        # Construct environment variables consumed by the openstack CLI
        for attr in ('OS_AUTH_URL', 'OS_PROJECT_NAME', 'OS_USER_DOMAIN_NAME',
                     'OS_USERNAME', 'OS_PASSWORD', 'OS_REGION_NAME',
                     'OS_IDENTITY_API_VERSION'):
            os.environ[attr] = config.get('keystone', attr)

        # Execute 'openstack token issue' command
        logging.info("Get Keystone Token")
        keystone_token_output = subprocess.Popen(
            ["openstack", "token", "issue"],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT)

        # Scan the table output for the 'id' row and extract its value.
        # keystone_token stays None if no matching line is found (fixes a
        # NameError that occurred when the command produced no token row).
        keystone_token = None
        for line in iter(keystone_token_output.stdout.readline, b''):
            line = str(line.strip(), 'utf-8')
            if re.search(r'\bid\b', line):
                keystone_token = line.split(' |')[1].split(' ')[1]

        # Wait for child process to terminate
        # Set and return returncode attribute.
        keystone_token_output.wait()
        logging.info("Command exited with "
                     "return code {0}".format(
                         keystone_token_output.returncode))

        # Raise exception if 'openstack token issue' failed to execute or
        # no token could be parsed from the output
        if keystone_token_output.returncode or keystone_token is None:
            raise AirflowException("Unable to get Keystone Token!")

        logging.info(keystone_token)
        return keystone_token

    def drydock_session_client(self, context):
        """Build a DrydockSession and return a DrydockClient for XCOM use."""
        # Retrieve Keystone Token (keystone_token_get raises on failure)
        # NOTE(review): the token is currently unused because a hard-coded
        # token from shipyard.conf is passed to the session below; the
        # hard-coded token will be replaced by keystone_token in the near
        # future once the DryDock Helm Chart with Keystone is merged.
        keystone_token = self.keystone_token_get(self.shipyard_conf)

        # Build a DrydockSession with credentials and target host
        # information
        logging.info("Build DryDock Session")
        dd_session = session.DrydockSession(self.host, port=self.port,
                                            token=self.token)

        # Raise Exception if we are not able to get a drydock session
        if not dd_session:
            raise AirflowException("Unable to get a drydock session")

        # Use session to build a DrydockClient to make one or more API
        # calls. The DrydockSession will care for TCP connection pooling
        # and header management.
        logging.info("Create DryDock Client")
        dd_client = client.DrydockClient(dd_session)

        # Raise Exception if we are not able to build drydock client
        if not dd_client:
            raise AirflowException("Unable to build drydock client")

        # Drydock client for XCOM Usage
        return dd_client

    def drydock_create_design(self, drydock_client):
        """Create a new DryDock design and return its ID."""
        logging.info('Create Design ID')
        drydock_design_id = drydock_client.create_design()

        # Raise Exception if we are not able to get a value
        # from drydock create_design API call
        if drydock_design_id:
            return drydock_design_id
        else:
            raise AirflowException("Unable to create Design ID")

    def get_design_id(self, context):
        """Pull the Design ID pushed to XCOM by the get_design_id task."""
        task_instance = context['task_instance']
        design_id = task_instance.xcom_pull(
            task_ids='drydock_get_design_id',
            dag_id='drydock_operator_parent.drydock_operator_child')

        return design_id

    def load_parts(self, drydock_client, context, parts_type):
        """Load design parts (drydock or promenade YAML) into the design
        context identified by the Design ID from XCOM.

        This step will change in future when DeckHand is integrated with
        the system.
        """
        # Open drydock.yaml/promenade.yaml as a string so that it can be
        # ingested (fix: use the parts_type parameter rather than the
        # instance attribute it shadowed)
        if parts_type == 'drydock':
            yaml_path = self.drydock_conf
        else:
            yaml_path = self.promenade_conf

        with open(yaml_path, "r") as yaml_file:
            yaml_string = yaml_file.read()

        # Get Design ID and pass it to DryDock
        self.design_id = self.get_design_id(context)

        # Load Design; raise Exception if the returned list is empty
        logging.info("Load %s Configuration Yaml", parts_type)
        load_design = drydock_client.load_parts(self.design_id,
                                                yaml_string=yaml_string)

        if len(load_design) == 0:
            raise AirflowException("Empty Design. Please check input Yaml.")
        else:
            logging.info(load_design)

    def drydock_perform_task(self, drydock_client, context, perform_task,
                             node_filter):
        """Create a DryDock task and return its initial status.

        Fix: use the perform_task/node_filter parameters instead of the
        instance attributes they shadowed.
        """
        # Get Design ID and pass it to DryDock
        self.design_id = self.get_design_id(context)

        # Get uuid of the created task's id
        self.task_id = drydock_client.create_task(self.design_id,
                                                  perform_task, node_filter)

        # Get current state/response of the drydock task
        task_status = drydock_client.get_task(self.task_id)

        # task_status should contain information and be of length > 0
        # Raise Exception if that is not the case
        if len(task_status) == 0:
            raise AirflowException("Unable to get task state")
        else:
            return task_status

    def drydock_query_task(self, drydock_client, interval, time_out,
                           task_id, desired_state):
        """Poll a DryDock task every `interval` seconds up to `time_out`
        seconds.

        Returns the final task response on success, 'timed_out' if the
        task is still running when time runs out, or 'task_failed' if it
        completed with a result other than desired_state.

        Fix: query the task_id parameter instead of self.task_id.
        """
        # Calculate number of times to execute the 'for' loop
        end_range = int(time_out / interval)

        # Query task state
        for i in range(end_range + 1):
            # Retrieve current task state
            task_state = drydock_client.get_task(task_id)
            logging.info(task_state)

            # Return Time Out indication
            if task_state['status'] == 'running' and i == end_range:
                logging.info('Timed Out!')
                return 'timed_out'

            # Exit 'for' loop if task is in 'complete' state
            if task_state['status'] == 'complete':
                break
            else:
                time.sleep(interval)

        # Get final task state
        if task_state['result'] == desired_state:
            return drydock_client.get_task(task_id)
        else:
            return 'task_failed'
class DryDockClientPlugin(AirflowPlugin):
    """Airflow plugin registration: exposes DryDockOperator under
    airflow.operators for use in DAG definitions."""
    name = "drydock_client_plugin"
    operators = [DryDockOperator]