diff --git a/shipyard_airflow/control/shipyard.conf b/shipyard_airflow/control/shipyard.conf
index bb1e928d..402f9c7e 100644
--- a/shipyard_airflow/control/shipyard.conf
+++ b/shipyard_airflow/control/shipyard.conf
@@ -1,3 +1,20 @@
 [base]
 web_server=http://localhost:32080
 
+[drydock]
+host=drydock-api.drydock
+port=32768
+token=bigboss
+site_yaml=/usr/local/airflow/plugins/drydock.yaml
+prom_yaml=/usr/local/airflow/plugins/promenade.yaml
+k8_masters=k8_master_node02,k8_master_node03
+
+[keystone]
+OS_AUTH_URL=http://keystone-api.openstack:80/v3
+OS_PROJECT_NAME=service
+OS_USER_DOMAIN_NAME=Default
+OS_USERNAME=shipyard
+OS_PASSWORD=password
+OS_REGION_NAME=RegionOne
+OS_IDENTITY_API_VERSION=3
+
diff --git a/shipyard_airflow/dags/drydock_operator_child.py b/shipyard_airflow/dags/drydock_operator_child.py
new file mode 100644
index 00000000..d505b073
--- /dev/null
+++ b/shipyard_airflow/dags/drydock_operator_child.py
@@ -0,0 +1,117 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+### DryDock Operator Child Dag
+"""
+import configparser
+
+from airflow import DAG
+from airflow.operators import DryDockOperator
+
+
+def sub_dag(parent_dag_name, child_dag_name, args, schedule_interval):
+    dag = DAG(
+        '%s.%s' % (parent_dag_name, child_dag_name),
+        default_args=args,
+        start_date=args['start_date'],
+        schedule_interval=schedule_interval,
+        max_active_runs=1,
+    )
+
+    # Location of shipyard.conf
+    config_path = '/usr/local/airflow/plugins/shipyard.conf'
+
+    # Read and parse shipyard.conf
+    config = configparser.ConfigParser()
+    config.read(config_path)
+
+    # Define variables
+    drydock_target_host = config.get('drydock', 'host')
+    drydock_port = config.get('drydock', 'port')
+    drydock_token = config.get('drydock', 'token')
+    drydock_conf = config.get('drydock', 'site_yaml')
+    promenade_conf = config.get('drydock', 'prom_yaml')
+
+    # Convert the comma-separated list of Kubernetes masters into the
+    # node_filter dictionary that DryDock expects
+    k8_masters_string = config.get('drydock', 'k8_masters')
+    k8_masters_list = k8_masters_string.split(',')
+    k8_masters = {'node_names': k8_masters_list}
+
+    # Create Drydock Client
+    t1 = DryDockOperator(
+        task_id='create_drydock_client',
+        host=drydock_target_host,
+        port=drydock_port,
+        token=drydock_token,
+        shipyard_conf=config_path,
+        action='create_drydock_client',
+        dag=dag)
+
+    # Get Design ID
+    t2 = DryDockOperator(
+        task_id='drydock_get_design_id',
+        action='get_design_id',
+        dag=dag)
+
+    # DryDock Load Parts
+    t3 = DryDockOperator(
+        task_id='drydock_load_parts',
+        drydock_conf=drydock_conf,
+        action='drydock_load_parts',
+        dag=dag)
+
+    # Promenade Load Parts
+    t4 = DryDockOperator(
+        task_id='promenade_load_parts',
+        promenade_conf=promenade_conf,
+        action='promenade_load_parts',
+        dag=dag)
+
+    # Verify Site
+    t5 = DryDockOperator(
+        task_id='drydock_verify_site',
+        action='verify_site',
+        dag=dag)
+
+    # Prepare Site
+    t6 = DryDockOperator(
+        task_id='drydock_prepare_site',
+        action='prepare_site',
+        dag=dag)
+
+    # Prepare Node
+    t7 = DryDockOperator(
+        task_id='drydock_prepare_node',
+        action='prepare_node',
+        node_filter=k8_masters,
+        dag=dag)
+
+    # Deploy Node
+    t8 = DryDockOperator(
+        task_id='drydock_deploy_node',
+        action='deploy_node',
+        node_filter=k8_masters,
+        dag=dag)
+
+    # Define dependencies
+    t2.set_upstream(t1)
+    t3.set_upstream(t2)
+    t4.set_upstream(t3)
+    t5.set_upstream(t4)
+    t6.set_upstream(t5)
+    t7.set_upstream(t6)
+    t8.set_upstream(t7)
+
+    return dag
diff --git a/shipyard_airflow/dags/drydock_operator_parent.py b/shipyard_airflow/dags/drydock_operator_parent.py
new file mode 100644
index 00000000..b0150f6b
--- /dev/null
+++ b/shipyard_airflow/dags/drydock_operator_parent.py
@@ -0,0 +1,47 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+### DryDock Operator Parent Dag
+"""
+import airflow
+from airflow import DAG
+from datetime import timedelta
+from airflow.operators.subdag_operator import SubDagOperator
+from drydock_operator_child import sub_dag
+
+parent_dag_name = 'drydock_operator_parent'
+child_dag_name = 'drydock_operator_child'
+
+args = {
+    'owner': 'airflow',
+    'depends_on_past': False,
+    'start_date': airflow.utils.dates.days_ago(1),
+    'retries': 0,
+    'retry_delay': timedelta(minutes=1),
+    'provide_context': True
+}
+
+main_dag = DAG(
+    dag_id=parent_dag_name,
+    default_args=args,
+    schedule_interval=None,
+    start_date=airflow.utils.dates.days_ago(1),
+    max_active_runs=1
+)
+
+subdag = SubDagOperator(
+    subdag=sub_dag(parent_dag_name, child_dag_name, args,
+                   main_dag.schedule_interval),
+    task_id=child_dag_name,
+    default_args=args,
+    dag=main_dag)
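For context, every task in the child DAG other than create_drydock_client reuses the Drydock client that the first task pushes to XCom. A minimal sketch of that handoff, using the composed '<parent>.<child>' subdag ID (the function name here is illustrative only, not part of the change):

    def pull_drydock_client(context):
        # xcom_pull must target the task and the composed subdag dag_id
        task_instance = context['task_instance']
        return task_instance.xcom_pull(
            task_ids='create_drydock_client',
            dag_id='drydock_operator_parent.drydock_operator_child')
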
diff --git a/shipyard_airflow/plugins/drydock_operators.py b/shipyard_airflow/plugins/drydock_operators.py
new file mode 100644
index 00000000..e600bb50
--- /dev/null
+++ b/shipyard_airflow/plugins/drydock_operators.py
@@ -0,0 +1,408 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import subprocess
+import os
+import time
+import re
+import configparser
+
+from airflow.exceptions import AirflowException
+from airflow.models import BaseOperator
+from airflow.plugins_manager import AirflowPlugin
+from airflow.utils.decorators import apply_defaults
+
+import drydock_provisioner.drydock_client.client as client
+import drydock_provisioner.drydock_client.session as session
+
+
+class DryDockOperator(BaseOperator):
+    """
+    DryDock Client
+
+    :param host: Target host
+    :param port: DryDock port
+    :param token: DryDock token
+    :param shipyard_conf: Location of shipyard.conf
+    :param drydock_conf: Location of the DryDock YAML
+    :param promenade_conf: Location of the Promenade YAML
+    :param node_filter: Valid fields are 'node_names', 'rack_names'
+                        and 'node_tags'
+    :param action: Task to perform
+    :param design_id: DryDock design ID
+    """
+    @apply_defaults
+    def __init__(self,
+                 host=None,
+                 port=None,
+                 token=None,
+                 action=None,
+                 design_id=None,
+                 node_filter=None,
+                 shipyard_conf=None,
+                 drydock_conf=None,
+                 promenade_conf=None,
+                 xcom_push=True,
+                 *args, **kwargs):
+
+        super(DryDockOperator, self).__init__(*args, **kwargs)
+        self.host = host
+        self.port = port
+        self.token = token
+        self.shipyard_conf = shipyard_conf
+        self.drydock_conf = drydock_conf
+        self.promenade_conf = promenade_conf
+        self.action = action
+        self.design_id = design_id
+        self.node_filter = node_filter
+        self.xcom_push_flag = xcom_push
+
+    def execute(self, context):
+
+        # Create the Drydock client and push it to XCom
+        if self.action == 'create_drydock_client':
+            drydock_client = self.drydock_session_client(context)
+
+            return drydock_client
+
+        # Retrieve the Drydock client via XCom so that the remaining
+        # actions can reuse it
+        task_instance = context['task_instance']
+        drydock_client = task_instance.xcom_pull(
+            task_ids='create_drydock_client',
+            dag_id='drydock_operator_parent.drydock_operator_child')
+
+        # Get Design ID
+        if self.action == 'get_design_id':
+            design_id = self.drydock_create_design(drydock_client)
+
+            return design_id
+
+        # DryDock Load Parts
+        elif self.action == 'drydock_load_parts':
+            self.parts_type = 'drydock'
+            self.load_parts(drydock_client, context, self.parts_type)
+
+        # Promenade Load Parts
+        elif self.action == 'promenade_load_parts':
+            self.parts_type = 'promenade'
+            self.load_parts(drydock_client, context, self.parts_type)
+
+        # Create Task for verify_site
+        elif self.action == 'verify_site':
+            self.perform_task = 'verify_site'
+            verify_site = self.drydock_perform_task(drydock_client, context,
+                                                    self.perform_task, None)
+
+            # Define variables
+            # Query every 10 seconds for 1 minute
+            interval = 10
+            time_out = 60
+            desired_state = 'success'
+
+            # Query verify_site Task
+            task_id = verify_site['task_id']
+            logging.info(task_id)
+            verify_site_status = self.drydock_query_task(drydock_client,
+                                                         interval,
+                                                         time_out,
+                                                         task_id,
+                                                         desired_state)
+
+            if verify_site_status == 'timed_out':
+                raise AirflowException('Verify_Site Task Timed Out!')
+            elif verify_site_status == 'task_failed':
+                raise AirflowException('Verify_Site Task Failed!')
+            else:
+                logging.info('Verify Site Task:')
+                logging.info(verify_site_status)
+
+        # Create Task for prepare_site
+        elif self.action == 'prepare_site':
+            self.perform_task = 'prepare_site'
+            prepare_site = self.drydock_perform_task(drydock_client, context,
+                                                     self.perform_task, None)
+
+            # Define variables
+            # Query every 10 seconds for 2 minutes
+            interval = 10
+            time_out = 120
+            desired_state = 'partial_success'
+
+            # Query prepare_site Task
+            task_id = prepare_site['task_id']
+            logging.info(task_id)
+            prepare_site_status = self.drydock_query_task(drydock_client,
+                                                          interval,
+                                                          time_out,
+                                                          task_id,
+                                                          desired_state)
+
+            if prepare_site_status == 'timed_out':
+                raise AirflowException('Prepare_Site Task Timed Out!')
+            elif prepare_site_status == 'task_failed':
+                raise AirflowException('Prepare_Site Task Failed!')
+            else:
+                logging.info('Prepare Site Task:')
+                logging.info(prepare_site_status)
+
+        # Create Task for prepare_node
+        elif self.action == 'prepare_node':
+            self.perform_task = 'prepare_node'
+            prepare_node = self.drydock_perform_task(
+                drydock_client, context, self.perform_task, self.node_filter)
+
+            # Define variables
+            # Query every 30 seconds for 30 minutes
+            interval = 30
+            time_out = 1800
+            desired_state = 'success'
+
+            # Query prepare_node Task
+            task_id = prepare_node['task_id']
+            logging.info(task_id)
+            prepare_node_status = self.drydock_query_task(drydock_client,
+                                                          interval,
+                                                          time_out,
+                                                          task_id,
+                                                          desired_state)
+
+            if prepare_node_status == 'timed_out':
+                raise AirflowException('Prepare_Node Task Timed Out!')
+            elif prepare_node_status == 'task_failed':
+                raise AirflowException('Prepare_Node Task Failed!')
+            else:
+                logging.info('Prepare Node Task:')
+                logging.info(prepare_node_status)
+
+        # Create Task for deploy_node
+        elif self.action == 'deploy_node':
+            self.perform_task = 'deploy_node'
+            deploy_node = self.drydock_perform_task(
+                drydock_client, context, self.perform_task, self.node_filter)
+
+            # Define variables
+            # Query every 30 seconds for 60 minutes
+            interval = 30
+            time_out = 3600
+            desired_state = 'success'
+
+            # Query deploy_node Task
+            task_id = deploy_node['task_id']
+            logging.info(task_id)
+            deploy_node_status = self.drydock_query_task(drydock_client,
+                                                         interval,
+                                                         time_out,
+                                                         task_id,
+                                                         desired_state)
+
+            if deploy_node_status == 'timed_out':
+                raise AirflowException('Deploy_Node Task Timed Out!')
+            elif deploy_node_status == 'task_failed':
+                raise AirflowException('Deploy_Node Task Failed!')
+            else:
+                logging.info('Deploy Node Task:')
+                logging.info(deploy_node_status)
+
+        else:
+            logging.info('No Action to Perform')
+
+    def keystone_token_get(self, conf_path):
+
+        # Read and parse shipyard.conf
+        config = configparser.ConfigParser()
+        config.read(conf_path)
+
+        # Construct environment variables
+        for attr in ('OS_AUTH_URL', 'OS_PROJECT_NAME', 'OS_USER_DOMAIN_NAME',
+                     'OS_USERNAME', 'OS_PASSWORD', 'OS_REGION_NAME',
+                     'OS_IDENTITY_API_VERSION'):
+            os.environ[attr] = config.get('keystone', attr)
+
+        # Execute 'openstack token issue' command
+        logging.info("Get Keystone Token")
+        keystone_token_output = subprocess.Popen(
+            ["openstack", "token", "issue"],
+            stdout=subprocess.PIPE,
+            stderr=subprocess.STDOUT)
+
+        # Parse the Keystone token from the command output
+        keystone_token = None
+        for line in iter(keystone_token_output.stdout.readline, b''):
+            line = str(line.strip(), 'utf-8')
+            if re.search(r'\bid\b', line):
+                keystone_token = line.split(' |')[1].split(' ')[1]
+
+        # Wait for the child process to terminate and set the
+        # returncode attribute
+        keystone_token_output.wait()
+        logging.info("Command exited with "
+                     "return code {0}".format(
+                         keystone_token_output.returncode))
+
+        # Raise an exception if 'openstack token issue' failed to execute
+        # or if no token could be parsed from its output
+        if keystone_token_output.returncode or keystone_token is None:
+            raise AirflowException("Unable to get Keystone Token!")
+        else:
+            logging.info(keystone_token)
+            return keystone_token
+
+    def drydock_session_client(self, context):
+
+        # Retrieve the Keystone token; keystone_token_get raises an
+        # exception and aborts the task if the token cannot be obtained
+        keystone_token = self.keystone_token_get(self.shipyard_conf)
+
+        # Build a DrydockSession with credentials and target host
+        # information.  Note that the hard-coded token will be replaced
+        # by keystone_token in the near future.
+        logging.info("Build DryDock Session")
+        dd_session = session.DrydockSession(self.host, port=self.port,
+                                            token=self.token)
+
+        # Raise an exception if we are not able to get a Drydock session
+        if not dd_session:
+            raise AirflowException("Unable to get a Drydock session")
+
+        # Use the session to build a DrydockClient to make one or more
+        # API calls; the DrydockSession takes care of TCP connection
+        # pooling and header management
+        logging.info("Create DryDock Client")
+        dd_client = client.DrydockClient(dd_session)
+
+        # Raise an exception if we are not able to build a Drydock client
+        if not dd_client:
+            raise AirflowException("Unable to build a Drydock client")
+
+        # Return the Drydock client for XCom usage
+        return dd_client
+
+    def drydock_create_design(self, drydock_client):
+
+        # Create Design
+        logging.info('Create Design ID')
+        drydock_design_id = drydock_client.create_design()
+
+        # Raise an exception if the create_design API call does not
+        # return a design ID
+        if drydock_design_id:
+            return drydock_design_id
+        else:
+            raise AirflowException("Unable to create Design ID")
+
+    def get_design_id(self, context):
+
+        # Get the Design ID from XCom
+        task_instance = context['task_instance']
+        design_id = task_instance.xcom_pull(
+            task_ids='drydock_get_design_id',
+            dag_id='drydock_operator_parent.drydock_operator_child')
+
+        return design_id
+
+    def load_parts(self, drydock_client, context, parts_type):
+
+        # Load new design parts into a design context via YAML conforming
+        # to the Drydock design YAML schema
+
+        # Open drydock.yaml/promenade.yaml as a string so that it can be
+        # ingested.  This step will change in the future when DeckHand is
+        # integrated with the system.
+        if self.parts_type == 'drydock':
+            with open(self.drydock_conf, "r") as drydock_yaml:
+                yaml_string = drydock_yaml.read()
+        else:
+            with open(self.promenade_conf, "r") as promenade_yaml:
+                yaml_string = promenade_yaml.read()
+
+        # Get the Design ID and pass it to Drydock
+        self.design_id = self.get_design_id(context)
+
+        # Load the design; raise an exception if the returned list is empty
+        logging.info("Load %s Configuration Yaml", self.parts_type)
+        load_design = drydock_client.load_parts(self.design_id,
+                                                yaml_string=yaml_string)
+
+        if len(load_design) == 0:
+            raise AirflowException("Empty Design. Please check input YAML.")
+        else:
+            logging.info(load_design)
+
+    def drydock_perform_task(self, drydock_client, context,
+                             perform_task, node_filter):
+
+        # Get the Design ID and pass it to Drydock
+        self.design_id = self.get_design_id(context)
+
+        # Create the task and keep the UUID of the created task
+        self.task_id = drydock_client.create_task(self.design_id,
+                                                  perform_task,
+                                                  node_filter)
+
+        # Get the current state/response of the Drydock task
+        task_status = drydock_client.get_task(self.task_id)
+
+        # task_status should contain information and have length > 0;
+        # raise an exception if that is not the case
+        if len(task_status) == 0:
+            raise AirflowException("Unable to get task state")
+        else:
+            return task_status
+
+    def drydock_query_task(self, drydock_client, interval, time_out,
+                           task_id, desired_state):
+
+        # Calculate the number of times to execute the 'for' loop
+        end_range = int(time_out / interval)
+
+        # Query the task state
+        for i in range(0, end_range + 1):
+
+            # Retrieve the current task state
+            task_state = drydock_client.get_task(task_id)
+            logging.info(task_state)
+
+            # Return 'timed_out' if the task is still running on the
+            # final iteration
+            if task_state['status'] == 'running' and i == end_range:
+                logging.info('Timed Out!')
+                return 'timed_out'
+
+            # Exit the 'for' loop if the task is in 'complete' state
+            if task_state['status'] == 'complete':
+                break
+            else:
+                time.sleep(interval)
+
+        # Return the final task state if it matches the desired state
+        if task_state['result'] == desired_state:
+            return drydock_client.get_task(task_id)
+        else:
+            return 'task_failed'
+
+
+class DryDockClientPlugin(AirflowPlugin):
+    name = "drydock_client_plugin"
+    operators = [DryDockOperator]
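The comment in drydock_session_client notes that the hard-coded token will be replaced by the Keystone token in the near future. One possible way to retire the subprocess call to 'openstack token issue' would be keystoneauth1; the sketch below is an assumption rather than part of this change, and project_domain_name is guessed since shipyard.conf does not define it:

    import configparser

    from keystoneauth1 import session as ks_session
    from keystoneauth1.identity import v3


    def keystone_token_get(conf_path):
        # Read the [keystone] section added to shipyard.conf above
        config = configparser.ConfigParser()
        config.read(conf_path)

        # Build a v3 password auth plugin from the config values;
        # project_domain_name is an assumption (not in shipyard.conf)
        auth = v3.Password(
            auth_url=config.get('keystone', 'OS_AUTH_URL'),
            username=config.get('keystone', 'OS_USERNAME'),
            password=config.get('keystone', 'OS_PASSWORD'),
            project_name=config.get('keystone', 'OS_PROJECT_NAME'),
            user_domain_name=config.get('keystone', 'OS_USER_DOMAIN_NAME'),
            project_domain_name='Default')

        # The session fetches (and caches) the token on demand
        return ks_session.Session(auth=auth).get_token()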
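For reference, node_filter is passed through to drydock_client.create_task as a dictionary. Only 'node_names' is exercised by the DAGs above; per the operator docstring, 'rack_names' and 'node_tags' are also valid fields. The rack and tag values below are made-up examples:

    # Filter built from the [drydock] k8_masters setting above
    node_filter = {'node_names': ['k8_master_node02', 'k8_master_node03']}

    # Presumably analogous shapes for the other documented fields
    rack_filter = {'rack_names': ['rack01']}
    tag_filter = {'node_tags': ['k8-master']}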