From ed799e5a01ade2a97ae3ec1bb8587f3578d7e026 Mon Sep 17 00:00:00 2001 From: eanylin Date: Sun, 2 Jul 2017 22:07:46 -0500 Subject: [PATCH] Update Airflow Dag and Plugin Updated codes based on Feedbacks --- .../dags/samples/airflow_task_state_operators.py | 8 ++++---- .../dags/samples/openstack_api_call.py | 10 +++++----- .../plugins/airflow_task_state_operators.py | 7 +++---- shipyard_airflow/plugins/openstack_operators.py | 14 ++++---------- shipyard_airflow/plugins/rest_api_plugin.py | 1 - 5 files changed, 16 insertions(+), 24 deletions(-) diff --git a/shipyard_airflow/dags/samples/airflow_task_state_operators.py b/shipyard_airflow/dags/samples/airflow_task_state_operators.py index 0b040a0e..7a462cdc 100644 --- a/shipyard_airflow/dags/samples/airflow_task_state_operators.py +++ b/shipyard_airflow/dags/samples/airflow_task_state_operators.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. """ -### Airflow Task State +### Airflow Task State Sample Dag """ import airflow from airflow import DAG @@ -36,9 +36,9 @@ dag = DAG('airflow_task_state', default_args=default_args, schedule_interval=Non # Get Task State t1 = TaskStateOperator( task_id='airflow_task_state', - airflow_dag_id='airflow_cli', - airflow_task_id='airflow_task_state', - airflow_execution_date='2017-06-25T21:27:52.809436', + airflow_dag_id='openstack_cli', + airflow_task_id='endpoint_list_task', + airflow_execution_date='2017-07-02T21:30:33.519582', dag=dag) # Use XCOM to Retrieve Task State diff --git a/shipyard_airflow/dags/samples/openstack_api_call.py b/shipyard_airflow/dags/samples/openstack_api_call.py index 9bb5d60b..f7abfd59 100644 --- a/shipyard_airflow/dags/samples/openstack_api_call.py +++ b/shipyard_airflow/dags/samples/openstack_api_call.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. """ -### Openstack CLI Dag +### Openstack CLI Sample Dag """ import airflow from airflow import DAG @@ -47,28 +47,28 @@ t1 = BashOperator( t2 = OpenStackOperator( task_id='endpoint_list_task', openrc_file='/usr/local/airflow/dags/openrc.sh', - openstack_command='openstack endpoint list', + openstack_command=['openstack', 'endpoint', 'list'], dag=dag) # openstack service list t3 = OpenStackOperator( task_id='service_list_task', openrc_file='/usr/local/airflow/dags/openrc.sh', - openstack_command='openstack service list', + openstack_command=['openstack', 'service', 'list'], dag=dag) # openstack server list t4 = OpenStackOperator( task_id='server_list_task', openrc_file='/usr/local/airflow/dags/openrc.sh', - openstack_command='openstack server list', + openstack_command=['openstack', 'server', 'list'], dag=dag) # openstack network list t5 = OpenStackOperator( task_id='network_list_task', openrc_file='/usr/local/airflow/dags/openrc.sh', - openstack_command='openstack network list', + openstack_command=['openstack', 'network', 'list'], dag=dag) t2.set_upstream(t1) diff --git a/shipyard_airflow/plugins/airflow_task_state_operators.py b/shipyard_airflow/plugins/airflow_task_state_operators.py index e5b57c7c..2e76ffc9 100644 --- a/shipyard_airflow/plugins/airflow_task_state_operators.py +++ b/shipyard_airflow/plugins/airflow_task_state_operators.py @@ -16,7 +16,6 @@ import logging import subprocess import sys import os -import shlex from airflow.exceptions import AirflowException from airflow.models import BaseOperator @@ -42,14 +41,14 @@ class TaskStateOperator(BaseOperator): self.airflow_dag_id = airflow_dag_id self.airflow_task_id = airflow_task_id self.airflow_execution_date = airflow_execution_date - self.airflow_command = "%s %s %s %s" % ("airflow task_state", airflow_dag_id, airflow_task_id, airflow_execution_date) + self.airflow_command = ['airflow', 'task_state', airflow_dag_id, airflow_task_id, airflow_execution_date] def execute(self, context): - logging.info("Running Airflow Command: " + self.airflow_command) + logging.info("Running Airflow Command: " + ' '.join(self.airflow_command)) # Execute Airflow CLI Command - airflow_cli = subprocess.Popen(shlex.split(self.airflow_command), stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + airflow_cli = subprocess.Popen(self.airflow_command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) # Logs Output # Filter out logging messages from standard output and keep only the relevant information diff --git a/shipyard_airflow/plugins/openstack_operators.py b/shipyard_airflow/plugins/openstack_operators.py index 818f36be..dde845d4 100644 --- a/shipyard_airflow/plugins/openstack_operators.py +++ b/shipyard_airflow/plugins/openstack_operators.py @@ -16,7 +16,6 @@ import logging import subprocess import sys import os -import shlex from airflow.exceptions import AirflowException from airflow.models import BaseOperator @@ -42,18 +41,15 @@ class OpenStackOperator(BaseOperator): self.xcom_push_flag = xcom_push def execute(self, context): - logging.info("Running OpenStack Command: " + self.openstack_command) + logging.info("Running OpenStack Command: " + ' '.join(self.openstack_command)) - # Emulate "source" in bash. Sets up environment variables. + # Build environment variables. pipe = subprocess.Popen(". %s; env" % self.openrc_file, stdout=subprocess.PIPE, shell=True) data = pipe.communicate()[0] - env = dict((line.split("=", 1) for line in data.splitlines())) - os.environ.update(env) - + os_env = dict((line.split("=", 1) for line in data.splitlines())) # Execute the OpenStack CLI Command - openstack_cli = subprocess.Popen(shlex.split(self.openstack_command), stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - + openstack_cli = subprocess.Popen(self.openstack_command, env=os_env, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) # Logs Output logging.info("Output:") @@ -63,13 +59,11 @@ class OpenStackOperator(BaseOperator): line = line.strip() logging.info(line) - # Wait for child process to terminate. Set and return returncode attribute. openstack_cli.wait() logging.info("Command exited with " "return code {0}".format(openstack_cli.returncode)) - # Raise Execptions if OpenStack Command Fails if openstack_cli.returncode: raise AirflowException("OpenStack Command Failed") diff --git a/shipyard_airflow/plugins/rest_api_plugin.py b/shipyard_airflow/plugins/rest_api_plugin.py index 77c8d480..9ac0879c 100644 --- a/shipyard_airflow/plugins/rest_api_plugin.py +++ b/shipyard_airflow/plugins/rest_api_plugin.py @@ -435,7 +435,6 @@ class REST_API(BaseView): # '/api' REST Endpoint where API requests should all come in @csrf.exempt # Exempt the CSRF token @expose('/api', methods=["GET", "POST"]) - @http_token_secure # On each request, def api(self): base_response = REST_API_Response_Util.get_base_response()