Fix PEP8 Issues
Fix pep8 issues based on results from Jenkins Job Change-Id: Ice1602859120efbe87a2587c4c8c035db5e1efc8
This commit is contained in:
parent
83b11008b2
commit
27d6da6b49
|
@ -14,104 +14,101 @@
|
|||
"""
|
||||
### DryDock Operator Child Dag
|
||||
"""
|
||||
import airflow
|
||||
import configparser
|
||||
from airflow import DAG
|
||||
from airflow.operators import DryDockOperator
|
||||
from airflow.operators.bash_operator import BashOperator
|
||||
from datetime import datetime, timedelta
|
||||
|
||||
|
||||
def sub_dag(parent_dag_name, child_dag_name, args, schedule_interval):
|
||||
dag = DAG(
|
||||
'%s.%s' % (parent_dag_name, child_dag_name),
|
||||
default_args=args,
|
||||
start_date=args['start_date'],
|
||||
max_active_runs=1,
|
||||
)
|
||||
dag = DAG(
|
||||
'%s.%s' % (parent_dag_name, child_dag_name),
|
||||
default_args=args,
|
||||
start_date=args['start_date'],
|
||||
max_active_runs=1,
|
||||
)
|
||||
|
||||
# Location of shiyard.conf
|
||||
config_path = '/usr/local/airflow/plugins/shipyard.conf'
|
||||
# Location of shiyard.conf
|
||||
config_path = '/usr/local/airflow/plugins/shipyard.conf'
|
||||
|
||||
# Read and parse shiyard.conf
|
||||
config = configparser.ConfigParser()
|
||||
config.read(config_path)
|
||||
# Read and parse shiyard.conf
|
||||
config = configparser.ConfigParser()
|
||||
config.read(config_path)
|
||||
|
||||
# Define Variables
|
||||
drydock_target_host = config.get('drydock', 'host')
|
||||
drydock_port = config.get('drydock', 'port')
|
||||
drydock_token = config.get('drydock', 'token')
|
||||
drydock_conf = config.get('drydock', 'site_yaml')
|
||||
promenade_conf = config.get('drydock', 'prom_yaml')
|
||||
# Define Variables
|
||||
drydock_target_host = config.get('drydock', 'host')
|
||||
drydock_port = config.get('drydock', 'port')
|
||||
drydock_token = config.get('drydock', 'token')
|
||||
drydock_conf = config.get('drydock', 'site_yaml')
|
||||
promenade_conf = config.get('drydock', 'prom_yaml')
|
||||
|
||||
# Convert to Dictionary
|
||||
k8_masters_sting = config.get('drydock', 'k8_masters')
|
||||
k8_masters_list = k8_masters_sting.split(',')
|
||||
k8_masters = {'node_names': k8_masters_list}
|
||||
# Convert to Dictionary
|
||||
k8_masters_sting = config.get('drydock', 'k8_masters')
|
||||
k8_masters_list = k8_masters_sting.split(',')
|
||||
k8_masters = {'node_names': k8_masters_list}
|
||||
|
||||
# Create Drydock Client
|
||||
t1 = DryDockOperator(
|
||||
task_id='create_drydock_client',
|
||||
host=drydock_target_host,
|
||||
port=drydock_port,
|
||||
token=drydock_token,
|
||||
shipyard_conf=config_path,
|
||||
action='create_drydock_client',
|
||||
dag=dag)
|
||||
# Create Drydock Client
|
||||
t1 = DryDockOperator(
|
||||
task_id='create_drydock_client',
|
||||
host=drydock_target_host,
|
||||
port=drydock_port,
|
||||
token=drydock_token,
|
||||
shipyard_conf=config_path,
|
||||
action='create_drydock_client',
|
||||
dag=dag)
|
||||
|
||||
# Get Design ID
|
||||
t2 = DryDockOperator(
|
||||
task_id='drydock_get_design_id',
|
||||
action='get_design_id',
|
||||
dag=dag)
|
||||
# Get Design ID
|
||||
t2 = DryDockOperator(
|
||||
task_id='drydock_get_design_id',
|
||||
action='get_design_id',
|
||||
dag=dag)
|
||||
|
||||
# DryDock Load Parts
|
||||
t3 = DryDockOperator(
|
||||
task_id='drydock_load_parts',
|
||||
drydock_conf=drydock_conf,
|
||||
action='drydock_load_parts',
|
||||
dag=dag)
|
||||
# DryDock Load Parts
|
||||
t3 = DryDockOperator(
|
||||
task_id='drydock_load_parts',
|
||||
drydock_conf=drydock_conf,
|
||||
action='drydock_load_parts',
|
||||
dag=dag)
|
||||
|
||||
# Promenade Load Parts
|
||||
t4 = DryDockOperator(
|
||||
task_id='promenade_load_parts',
|
||||
promenade_conf=promenade_conf,
|
||||
action='promenade_load_parts',
|
||||
dag=dag)
|
||||
# Promenade Load Parts
|
||||
t4 = DryDockOperator(
|
||||
task_id='promenade_load_parts',
|
||||
promenade_conf=promenade_conf,
|
||||
action='promenade_load_parts',
|
||||
dag=dag)
|
||||
|
||||
# Verify Site
|
||||
t5 = DryDockOperator(
|
||||
task_id='drydock_verify_site',
|
||||
action='verify_site',
|
||||
dag=dag)
|
||||
# Verify Site
|
||||
t5 = DryDockOperator(
|
||||
task_id='drydock_verify_site',
|
||||
action='verify_site',
|
||||
dag=dag)
|
||||
|
||||
# Prepare Site
|
||||
t6 = DryDockOperator(
|
||||
task_id='drydock_prepare_site',
|
||||
action='prepare_site',
|
||||
dag=dag)
|
||||
# Prepare Site
|
||||
t6 = DryDockOperator(
|
||||
task_id='drydock_prepare_site',
|
||||
action='prepare_site',
|
||||
dag=dag)
|
||||
|
||||
# Prepare Node
|
||||
t7 = DryDockOperator(
|
||||
task_id='drydock_prepare_node',
|
||||
action='prepare_node',
|
||||
node_filter=k8_masters,
|
||||
dag=dag)
|
||||
# Prepare Node
|
||||
t7 = DryDockOperator(
|
||||
task_id='drydock_prepare_node',
|
||||
action='prepare_node',
|
||||
node_filter=k8_masters,
|
||||
dag=dag)
|
||||
|
||||
# Deploy Node
|
||||
t8 = DryDockOperator(
|
||||
task_id='drydock_deploy_node',
|
||||
action='deploy_node',
|
||||
node_filter=k8_masters,
|
||||
dag=dag)
|
||||
# Deploy Node
|
||||
t8 = DryDockOperator(
|
||||
task_id='drydock_deploy_node',
|
||||
action='deploy_node',
|
||||
node_filter=k8_masters,
|
||||
dag=dag)
|
||||
|
||||
# Define dependencies
|
||||
t2.set_upstream(t1)
|
||||
t3.set_upstream(t2)
|
||||
t4.set_upstream(t3)
|
||||
t5.set_upstream(t4)
|
||||
t6.set_upstream(t5)
|
||||
t7.set_upstream(t6)
|
||||
t8.set_upstream(t7)
|
||||
# Define dependencies
|
||||
t2.set_upstream(t1)
|
||||
t3.set_upstream(t2)
|
||||
t4.set_upstream(t3)
|
||||
t5.set_upstream(t4)
|
||||
t6.set_upstream(t5)
|
||||
t7.set_upstream(t6)
|
||||
t8.set_upstream(t7)
|
||||
|
||||
return dag
|
||||
return dag
|
||||
|
|
|
@ -16,7 +16,7 @@
|
|||
"""
|
||||
import airflow
|
||||
from airflow import DAG
|
||||
from datetime import datetime, timedelta
|
||||
from datetime import timedelta
|
||||
from airflow.operators.subdag_operator import SubDagOperator
|
||||
from drydock_operator_child import sub_dag
|
||||
|
||||
|
@ -41,7 +41,8 @@ main_dag = DAG(
|
|||
)
|
||||
|
||||
subdag = SubDagOperator(
|
||||
subdag=sub_dag(parent_dag_name, child_dag_name, args, main_dag.schedule_interval),
|
||||
subdag=sub_dag(parent_dag_name, child_dag_name, args,
|
||||
main_dag.schedule_interval),
|
||||
task_id=child_dag_name,
|
||||
default_args=args,
|
||||
dag=main_dag)
|
||||
|
|
|
@ -14,7 +14,6 @@
|
|||
|
||||
import logging
|
||||
import subprocess
|
||||
import sys
|
||||
import os
|
||||
import time
|
||||
import re
|
||||
|
@ -68,9 +67,7 @@ class DryDockOperator(BaseOperator):
|
|||
self.node_filter = node_filter
|
||||
self.xcom_push_flag = xcom_push
|
||||
|
||||
|
||||
def execute(self, context):
|
||||
|
||||
# DrydockClient
|
||||
if self.action == 'create_drydock_client':
|
||||
drydock_client = self.drydock_session_client(context)
|
||||
|
@ -79,9 +76,10 @@ class DryDockOperator(BaseOperator):
|
|||
|
||||
# Retrieve drydock_client via XCOM so as to perform other tasks
|
||||
task_instance = context['task_instance']
|
||||
|
||||
drydock_client = task_instance.xcom_pull(
|
||||
task_ids='create_drydock_client',
|
||||
dag_id='drydock_operator_parent.drydock_operator_child')
|
||||
task_ids='create_drydock_client',
|
||||
dag_id='drydock_operator_parent.drydock_operator_child')
|
||||
|
||||
# Get Design ID
|
||||
if self.action == 'get_design_id':
|
||||
|
@ -144,10 +142,10 @@ class DryDockOperator(BaseOperator):
|
|||
task_id = prepare_site['task_id']
|
||||
logging.info(task_id)
|
||||
prepare_site_status = self.drydock_query_task(drydock_client,
|
||||
interval,
|
||||
time_out,
|
||||
task_id,
|
||||
desired_state)
|
||||
interval,
|
||||
time_out,
|
||||
task_id,
|
||||
desired_state)
|
||||
|
||||
if prepare_site_status == 'timed_out':
|
||||
raise AirflowException('Prepare_Site Task Timed Out!')
|
||||
|
@ -160,8 +158,9 @@ class DryDockOperator(BaseOperator):
|
|||
# Create Task for prepare_node
|
||||
elif self.action == 'prepare_node':
|
||||
self.perform_task = 'prepare_node'
|
||||
prepare_node = self.drydock_perform_task(drydock_client, context,
|
||||
self.perform_task, self.node_filter)
|
||||
prepare_node = self.drydock_perform_task(
|
||||
drydock_client, context,
|
||||
self.perform_task, self.node_filter)
|
||||
|
||||
# Define variables
|
||||
# Query every 30 seconds for 30 minutes
|
||||
|
@ -173,10 +172,10 @@ class DryDockOperator(BaseOperator):
|
|||
task_id = prepare_node['task_id']
|
||||
logging.info(task_id)
|
||||
prepare_node_status = self.drydock_query_task(drydock_client,
|
||||
interval,
|
||||
time_out,
|
||||
task_id,
|
||||
desired_state)
|
||||
interval,
|
||||
time_out,
|
||||
task_id,
|
||||
desired_state)
|
||||
|
||||
if prepare_node_status == 'timed_out':
|
||||
raise AirflowException('Prepare_Node Task Timed Out!')
|
||||
|
@ -189,8 +188,10 @@ class DryDockOperator(BaseOperator):
|
|||
# Create Task for deploy_node
|
||||
elif self.action == 'deploy_node':
|
||||
self.perform_task = 'deploy_node'
|
||||
deploy_node = self.drydock_perform_task(drydock_client, context,
|
||||
self.perform_task, self.node_filter)
|
||||
deploy_node = self.drydock_perform_task(drydock_client,
|
||||
context,
|
||||
self.perform_task,
|
||||
self.node_filter)
|
||||
|
||||
# Define variables
|
||||
# Query every 30 seconds for 60 minutes
|
||||
|
@ -214,11 +215,9 @@ class DryDockOperator(BaseOperator):
|
|||
else:
|
||||
logging.info('Deploy Node Task:')
|
||||
logging.info(deploy_node_status)
|
||||
|
||||
else:
|
||||
logging.info('No Action to Perform')
|
||||
|
||||
|
||||
def keystone_token_get(self, conf_path):
|
||||
|
||||
# Read and parse shiyard.conf
|
||||
|
@ -233,31 +232,31 @@ class DryDockOperator(BaseOperator):
|
|||
|
||||
# Execute 'openstack token issue' command
|
||||
logging.info("Get Keystone Token")
|
||||
keystone_token_output = subprocess.Popen(["openstack", "token", "issue"],
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT)
|
||||
keystone_output = subprocess.Popen(["openstack", "token", "issue"],
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT)
|
||||
|
||||
# Get Keystone Token from output
|
||||
line = ''
|
||||
for line in iter(keystone_token_output.stdout.readline, b''):
|
||||
for line in iter(keystone_output.stdout.readline, b''):
|
||||
line = line.strip()
|
||||
if re.search(r'\bid\b', str(line,'utf-8')):
|
||||
keystone_token = str(line,'utf-8').split(' |')[1].split(' ')[1]
|
||||
if re.search(r'\bid\b', str(line, 'utf-8')):
|
||||
token = str(line, 'utf-8').split(' |')[1].split(' ')[1]
|
||||
|
||||
# Wait for child process to terminate
|
||||
# Set and return returncode attribute.
|
||||
keystone_token_output.wait()
|
||||
logging.info("Command exited with "
|
||||
"return code {0}".format(keystone_token_output.returncode))
|
||||
keystone_output.wait()
|
||||
logging.info(
|
||||
"Command exited with "
|
||||
"return code {0}".format(keystone_output.returncode))
|
||||
|
||||
# Raise Execptions if 'openstack token issue' fails to execute
|
||||
if keystone_token_output.returncode:
|
||||
if keystone_output.returncode:
|
||||
raise AirflowException("Unable to get Keystone Token!")
|
||||
return 'keystone_token_error'
|
||||
else:
|
||||
logging.info(keystone_token)
|
||||
return keystone_token
|
||||
|
||||
logging.info(token)
|
||||
return token
|
||||
|
||||
def drydock_session_client(self, context):
|
||||
|
||||
|
@ -271,10 +270,12 @@ class DryDockOperator(BaseOperator):
|
|||
else:
|
||||
pass
|
||||
|
||||
# Build a DrydockSession with credentials and target host information
|
||||
# Note that hard-coded token will be replaced by keystone_token in near future
|
||||
# Build a DrydockSession with credentials and target host
|
||||
# information. Note that hard-coded token will be replaced
|
||||
# by keystone_token in near future
|
||||
logging.info("Build DryDock Session")
|
||||
dd_session = session.DrydockSession(self.host, port=self.port, token=self.token)
|
||||
dd_session = session.DrydockSession(self.host, port=self.port,
|
||||
token=self.token)
|
||||
|
||||
# Raise Exception if we are not able to get a drydock session
|
||||
if dd_session:
|
||||
|
@ -283,7 +284,8 @@ class DryDockOperator(BaseOperator):
|
|||
raise AirflowException("Unable to get a drydock session")
|
||||
|
||||
# Use session to build a DrydockClient to make one or more API calls
|
||||
# The DrydockSession will care for TCP connection pooling and header management
|
||||
# The DrydockSession will care for TCP connection pooling
|
||||
# and header management
|
||||
logging.info("Create DryDock Client")
|
||||
dd_client = client.DrydockClient(dd_session)
|
||||
|
||||
|
@ -296,7 +298,6 @@ class DryDockOperator(BaseOperator):
|
|||
# Drydock client for XCOM Usage
|
||||
return dd_client
|
||||
|
||||
|
||||
def drydock_create_design(self, drydock_client):
|
||||
|
||||
# Create Design
|
||||
|
@ -310,29 +311,29 @@ class DryDockOperator(BaseOperator):
|
|||
else:
|
||||
raise AirflowException("Unable to create Design ID")
|
||||
|
||||
|
||||
def get_design_id(self, context):
|
||||
|
||||
# Get Design ID from XCOM
|
||||
task_instance = context['task_instance']
|
||||
design_id = task_instance.xcom_pull(task_ids='drydock_get_design_id',
|
||||
dag_id='drydock_operator_parent.drydock_operator_child')
|
||||
design_id = task_instance.xcom_pull(
|
||||
task_ids='drydock_get_design_id',
|
||||
dag_id='drydock_operator_parent.drydock_operator_child')
|
||||
|
||||
return design_id
|
||||
|
||||
|
||||
def load_parts(self, drydock_client, context, parts_type):
|
||||
|
||||
# Load new design parts into a design context via YAML conforming to the
|
||||
# Drydock design YAML schema
|
||||
# Load new design parts into a design context via YAML conforming
|
||||
# to the Drydock design YAML schema
|
||||
|
||||
# Open drydock.yaml/promenade.yaml as string so that it can be ingested
|
||||
# This step will change in future when DeckHand is integrated with the system
|
||||
# Open drydock.yaml/promenade.yaml as string so that it can be
|
||||
# ingested. This step will change in future when DeckHand is
|
||||
# integrated with the system
|
||||
if self.parts_type == 'drydock':
|
||||
with open (self.drydock_conf, "r") as drydock_yaml:
|
||||
with open(self.drydock_conf, "r") as drydock_yaml:
|
||||
yaml_string = drydock_yaml.read()
|
||||
else:
|
||||
with open (self.promenade_conf, "r") as promenade_yaml:
|
||||
with open(self.promenade_conf, "r") as promenade_yaml:
|
||||
yaml_string = promenade_yaml.read()
|
||||
|
||||
# Get Design ID and pass it to DryDock
|
||||
|
@ -341,14 +342,16 @@ class DryDockOperator(BaseOperator):
|
|||
# Load Design
|
||||
# Return Exception if list is empty
|
||||
logging.info("Load %s Configuration Yaml", self.parts_type)
|
||||
load_design = drydock_client.load_parts(self.design_id, yaml_string=yaml_string)
|
||||
load_design = drydock_client.load_parts(self.design_id,
|
||||
yaml_string=yaml_string)
|
||||
|
||||
if len(load_design) == 0:
|
||||
raise AirflowException("Empty Design. Please check input Yaml.")
|
||||
else:
|
||||
logging.info(load_design)
|
||||
|
||||
def drydock_perform_task(self, drydock_client, context, perform_task, node_filter):
|
||||
def drydock_perform_task(self, drydock_client, context,
|
||||
perform_task, node_filter):
|
||||
|
||||
# Get Design ID and pass it to DryDock
|
||||
self.design_id = self.get_design_id(context)
|
||||
|
@ -361,7 +364,8 @@ class DryDockOperator(BaseOperator):
|
|||
|
||||
# Get uuid of the create_task's id
|
||||
self.task_id = drydock_client.create_task(self.design_id,
|
||||
task_to_perform, nodes_filter)
|
||||
task_to_perform,
|
||||
nodes_filter)
|
||||
|
||||
# Get current state/response of the drydock task
|
||||
task_status = drydock_client.get_task(self.task_id)
|
||||
|
@ -377,10 +381,10 @@ class DryDockOperator(BaseOperator):
|
|||
task_id, desired_state):
|
||||
|
||||
# Calculate number of times to execute the 'for' loop
|
||||
end_range = int(time_out/interval)
|
||||
end_range = int(time_out / interval)
|
||||
|
||||
# Query task state
|
||||
for i in range(0,end_range+1):
|
||||
for i in range(0, end_range + 1):
|
||||
|
||||
# Retrieve current task state
|
||||
task_state = drydock_client.get_task(self.task_id)
|
||||
|
|
Loading…
Reference in New Issue