From eab86b6a8733fed9a1cf02a568704ff59a2f46d3 Mon Sep 17 00:00:00 2001 From: eanylin Date: Sat, 17 Jun 2017 23:11:48 -0500 Subject: [PATCH] Add OpenStack Operator Define as a plugin so that Airflow is able to pick up the plugin and all the operators it define --- shipyard_airflow/dags/openstack_api_call.py | 66 ++++++++++---- .../plugins/openstack_operators.py | 88 +++++++++++++++++++ 2 files changed, 135 insertions(+), 19 deletions(-) create mode 100644 shipyard_airflow/plugins/openstack_operators.py diff --git a/shipyard_airflow/dags/openstack_api_call.py b/shipyard_airflow/dags/openstack_api_call.py index 41d0b0b5..e71819ff 100644 --- a/shipyard_airflow/dags/openstack_api_call.py +++ b/shipyard_airflow/dags/openstack_api_call.py @@ -1,25 +1,38 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. """ -OpenStack CLI - -Perform basic OpenStack CLI calls +### Openstack CLI Dag """ +import airflow from airflow import DAG +from airflow.operators import OpenStackOperator from airflow.operators.bash_operator import BashOperator from datetime import datetime, timedelta -import os + default_args = { 'owner': 'airflow', 'depends_on_past': False, - 'start_date': datetime(2017, 6, 14), - 'email': ['airflow@airflow.com'], + 'start_date': airflow.utils.dates.days_ago(2), + 'email': ['airflow@example.com'], 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, - 'retry_delay': timedelta(minutes=5), + 'retry_delay': timedelta(minutes=1), } -dag = DAG('openstack_api_call', default_args=default_args, schedule_interval=None) +dag = DAG('openstack_cli', default_args=default_args, schedule_interval=None) # print_date t1 = BashOperator( @@ -27,21 +40,36 @@ t1 = BashOperator( bash_command='date', dag=dag) -# Current assumption is that we will be able to retrieve information from -# data manager (DeckHand) to create the admin-openrc.sh that is needed for -# airflow to perform OpenStack API calls -t2 = BashOperator( - task_id='nova_list', - bash_command='source ' + os.getcwd() + '/dags/admin-openrc.sh' + ';' + 'nova' + ' list', - retries=3, +# openstack endpoint list +t2 = OpenStackOperator( + task_id='endpoint_list_task', + openrc_file='/home/ubuntu/airflow/openrc.sh', + openstack_command='openstack endpoint list', dag=dag) -t3 = BashOperator( - task_id='neutron_net_list', - bash_command='source ' + os.getcwd() + '/dags/admin-openrc.sh' + ';' + 'neutron' + ' net-list', - retries=3, +# openstack service list +t3 = OpenStackOperator( + task_id='service_list_task', + openrc_file='/home/ubuntu/airflow/openrc.sh', + openstack_command='openstack service list', + dag=dag) + +# openstack server list +t4 = OpenStackOperator( + task_id='server_list_task', + openrc_file='/home/ubuntu/airflow/openrc.sh', + openstack_command='openstack server list', + dag=dag) + +# openstack network list +t5 = OpenStackOperator( + task_id='network_list_task', + openrc_file='/home/ubuntu/airflow/openrc.sh', + openstack_command='openstack network list', dag=dag) t2.set_upstream(t1) t3.set_upstream(t1) +t4.set_upstream(t1) +t5.set_upstream(t1) diff --git a/shipyard_airflow/plugins/openstack_operators.py b/shipyard_airflow/plugins/openstack_operators.py new file mode 100644 index 00000000..818f36be --- /dev/null +++ b/shipyard_airflow/plugins/openstack_operators.py @@ -0,0 +1,88 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import subprocess +import sys +import os +import shlex + +from airflow.exceptions import AirflowException +from airflow.models import BaseOperator +from airflow.plugins_manager import AirflowPlugin +from airflow.utils.decorators import apply_defaults + +class OpenStackOperator(BaseOperator): + """ + Performs OpenStack CLI calls + :openrc_file: Path of the openrc file + :openstack_command: The OpenStack command to be executed + """ + @apply_defaults + def __init__(self, + openrc_file, + openstack_command=None, + xcom_push=False, + *args, **kwargs): + + super(OpenStackOperator, self).__init__(*args, **kwargs) + self.openrc_file = openrc_file + self.openstack_command = openstack_command + self.xcom_push_flag = xcom_push + + def execute(self, context): + logging.info("Running OpenStack Command: " + self.openstack_command) + + # Emulate "source" in bash. Sets up environment variables. + pipe = subprocess.Popen(". %s; env" % self.openrc_file, stdout=subprocess.PIPE, shell=True) + data = pipe.communicate()[0] + env = dict((line.split("=", 1) for line in data.splitlines())) + os.environ.update(env) + + + # Execute the OpenStack CLI Command + openstack_cli = subprocess.Popen(shlex.split(self.openstack_command), stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + + + # Logs Output + logging.info("Output:") + + line = '' + for line in iter(openstack_cli.stdout.readline, b''): + line = line.strip() + logging.info(line) + + + # Wait for child process to terminate. Set and return returncode attribute. + openstack_cli.wait() + logging.info("Command exited with " + "return code {0}".format(openstack_cli.returncode)) + + + # Raise Execptions if OpenStack Command Fails + if openstack_cli.returncode: + raise AirflowException("OpenStack Command Failed") + + + """ + Push response to an XCom if xcom_push is True + """ + if self.xcom_push_flag: + return line + + +class OpenStackCliPlugin(AirflowPlugin): + name = "openstack_cli_plugin" + operators = [OpenStackOperator] +