Update DryDock Operator

There is a need to pass workflow-related information in the form
of JSON string to Airflow via the ACTION API call from Shipyard.
In this way, we will be able to keep track/correlate actions
performed via Shipyard as well as to pass in the parameters that
we want Airflow to consume for its workflow.

A sample JSON string that we will pass in via Shipyard using the
Action API call will be similar to the following:

{
  "action": {
    "id": "1234567890",
    "name": "node_filter",
    "parameters": {
      "servername": "node2,node3"
    }
  }
}

This patch set is meant to update the DryDock Operator and its
related dags in order to support this implementation

The node_filter information will be passed in via the Shipyard
API call instead

Change-Id: I324959d5f42c98c5fcb1ac44c9677f818bfebbc1
This commit is contained in:
Anthony Lin 2017-09-01 18:57:16 +00:00
parent d8213e33fc
commit e326a732b3
4 changed files with 42 additions and 19 deletions

View File

@ -1,13 +1,14 @@
[base] [base]
web_server=http://localhost:32080 web_server=http://localhost:32080
postgresql_db = postgresql+psycopg2://postgresql.ucp:5432/shipyard
postgresql_airflow_db = postgresql+psycopg2://postgresql.ucp:5432/airflow
[drydock] [drydock]
host=drydock-api.drydock host=drydock-api.ucp
port=32768 port=32768
token=bigboss token=bigboss
site_yaml=/usr/local/airflow/plugins/drydock.yaml site_yaml=/usr/local/airflow/plugins/drydock.yaml
prom_yaml=/usr/local/airflow/plugins/promenade.yaml prom_yaml=/usr/local/airflow/plugins/promenade.yaml
k8_masters=k8_master_node02,k8_master_node03
[keystone] [keystone]
OS_AUTH_URL=http://keystone-api.openstack:80/v3 OS_AUTH_URL=http://keystone-api.openstack:80/v3

View File

@ -18,7 +18,6 @@ import configparser
from airflow import DAG from airflow import DAG
from airflow.operators import DryDockOperator from airflow.operators import DryDockOperator
def sub_dag(parent_dag_name, child_dag_name, args, schedule_interval): def sub_dag(parent_dag_name, child_dag_name, args, schedule_interval):
dag = DAG( dag = DAG(
'%s.%s' % (parent_dag_name, child_dag_name), '%s.%s' % (parent_dag_name, child_dag_name),
@ -41,11 +40,6 @@ def sub_dag(parent_dag_name, child_dag_name, args, schedule_interval):
drydock_conf = config.get('drydock', 'site_yaml') drydock_conf = config.get('drydock', 'site_yaml')
promenade_conf = config.get('drydock', 'prom_yaml') promenade_conf = config.get('drydock', 'prom_yaml')
# Convert to Dictionary
k8_masters_sting = config.get('drydock', 'k8_masters')
k8_masters_list = k8_masters_sting.split(',')
k8_masters = {'node_names': k8_masters_list}
# Create Drydock Client # Create Drydock Client
t1 = DryDockOperator( t1 = DryDockOperator(
task_id='create_drydock_client', task_id='create_drydock_client',
@ -92,14 +86,12 @@ def sub_dag(parent_dag_name, child_dag_name, args, schedule_interval):
t7 = DryDockOperator( t7 = DryDockOperator(
task_id='drydock_prepare_node', task_id='drydock_prepare_node',
action='prepare_node', action='prepare_node',
node_filter=k8_masters,
dag=dag) dag=dag)
# Deploy Node # Deploy Node
t8 = DryDockOperator( t8 = DryDockOperator(
task_id='drydock_deploy_node', task_id='drydock_deploy_node',
action='deploy_node', action='deploy_node',
node_filter=k8_masters,
dag=dag) dag=dag)
# Define dependencies # Define dependencies

View File

@ -17,6 +17,7 @@
import airflow import airflow
from airflow import DAG from airflow import DAG
from datetime import timedelta from datetime import timedelta
from airflow.operators.python_operator import PythonOperator
from airflow.operators.subdag_operator import SubDagOperator from airflow.operators.subdag_operator import SubDagOperator
from drydock_operator_child import sub_dag from drydock_operator_child import sub_dag
@ -40,9 +41,24 @@ main_dag = DAG(
max_active_runs=1 max_active_runs=1
) )
# Define push function to store the content of 'action' that is
# defined via 'dag_run' in XCOM so that it can be used by the
# DryDock Operators
def push(**kwargs):
# Pushes action XCom
kwargs['ti'].xcom_push(key='action',
value=kwargs['dag_run'].conf['action'])
action_xcom = PythonOperator(
task_id='action_xcom', dag=main_dag, python_callable=push)
subdag = SubDagOperator( subdag = SubDagOperator(
subdag=sub_dag(parent_dag_name, child_dag_name, args, subdag=sub_dag(parent_dag_name, child_dag_name, args,
main_dag.schedule_interval), main_dag.schedule_interval),
task_id=child_dag_name, task_id=child_dag_name,
default_args=args, default_args=args,
dag=main_dag) dag=main_dag)
# Set dependencies
subdag.set_upstream(action_xcom)

View File

@ -37,9 +37,9 @@ class DryDockOperator(BaseOperator):
:shipyard_conf: Location of shipyard.conf :shipyard_conf: Location of shipyard.conf
:drydock_conf: Location of drydock YAML :drydock_conf: Location of drydock YAML
:promenade_conf: Location of promenade YAML :promenade_conf: Location of promenade YAML
:node_filter: Valid fields are 'node_names','rack_names','node_tags'
:action: Task to perform :action: Task to perform
:design_id: DryDock Design ID :design_id: DryDock Design ID
:workflow_info: Information related to the workflow
""" """
@apply_defaults @apply_defaults
def __init__(self, def __init__(self,
@ -48,10 +48,10 @@ class DryDockOperator(BaseOperator):
token=None, token=None,
action=None, action=None,
design_id=None, design_id=None,
node_filter=None,
shipyard_conf=None, shipyard_conf=None,
drydock_conf=None, drydock_conf=None,
promenade_conf=None, promenade_conf=None,
workflow_info={},
xcom_push=True, xcom_push=True,
*args, **kwargs): *args, **kwargs):
@ -64,10 +64,21 @@ class DryDockOperator(BaseOperator):
self.promenade_conf = promenade_conf self.promenade_conf = promenade_conf
self.action = action self.action = action
self.design_id = design_id self.design_id = design_id
self.node_filter = node_filter self.workflow_info = workflow_info
self.xcom_push_flag = xcom_push self.xcom_push_flag = xcom_push
def execute(self, context): def execute(self, context):
# Define task_instance
task_instance = context['task_instance']
# Extract information related to current workflow
# The workflow_info variable will be a dictionary
# that contains information about the workflow such
# as action_id, name and other related parameters
workflow_info = task_instance.xcom_pull(
task_ids='action_xcom', key='action',
dag_id='drydock_operator_parent')
# DrydockClient # DrydockClient
if self.action == 'create_drydock_client': if self.action == 'create_drydock_client':
drydock_client = self.drydock_session_client(context) drydock_client = self.drydock_session_client(context)
@ -75,8 +86,6 @@ class DryDockOperator(BaseOperator):
return drydock_client return drydock_client
# Retrieve drydock_client via XCOM so as to perform other tasks # Retrieve drydock_client via XCOM so as to perform other tasks
task_instance = context['task_instance']
drydock_client = task_instance.xcom_pull( drydock_client = task_instance.xcom_pull(
task_ids='create_drydock_client', task_ids='create_drydock_client',
dag_id='drydock_operator_parent.drydock_operator_child') dag_id='drydock_operator_parent.drydock_operator_child')
@ -160,7 +169,7 @@ class DryDockOperator(BaseOperator):
self.perform_task = 'prepare_node' self.perform_task = 'prepare_node'
prepare_node = self.drydock_perform_task( prepare_node = self.drydock_perform_task(
drydock_client, context, drydock_client, context,
self.perform_task, self.node_filter) self.perform_task, workflow_info)
# Define variables # Define variables
# Query every 30 seconds for 30 minutes # Query every 30 seconds for 30 minutes
@ -191,7 +200,7 @@ class DryDockOperator(BaseOperator):
deploy_node = self.drydock_perform_task(drydock_client, deploy_node = self.drydock_perform_task(drydock_client,
context, context,
self.perform_task, self.perform_task,
self.node_filter) workflow_info)
# Define variables # Define variables
# Query every 30 seconds for 60 minutes # Query every 30 seconds for 60 minutes
@ -351,7 +360,7 @@ class DryDockOperator(BaseOperator):
logging.info(load_design) logging.info(load_design)
def drydock_perform_task(self, drydock_client, context, def drydock_perform_task(self, drydock_client, context,
perform_task, node_filter): perform_task, workflow_info):
# Get Design ID and pass it to DryDock # Get Design ID and pass it to DryDock
self.design_id = self.get_design_id(context) self.design_id = self.get_design_id(context)
@ -360,7 +369,12 @@ class DryDockOperator(BaseOperator):
task_to_perform = self.perform_task task_to_perform = self.perform_task
# Node Filter # Node Filter
nodes_filter = self.node_filter if workflow_info:
nodes_filter = workflow_info['parameters']['servername']
else:
nodes_filter = None
logging.info("Nodes Filter List: %s", nodes_filter)
# Get uuid of the create_task's id # Get uuid of the create_task's id
self.task_id = drydock_client.create_task(self.design_id, self.task_id = drydock_client.create_task(self.design_id,