From 7bcc5c6141648a8b1a92afab14df50d88201bd4b Mon Sep 17 00:00:00 2001 From: eanylin Date: Mon, 3 Jul 2017 17:05:20 -0500 Subject: [PATCH] Update Falcon API 1) Change the content of shipyard.conf from Upper to Lower Case 2) Return Exception instead of string if shipyard.conf is missing 3) Return HTTP 500 instead of 400 if shipyard.conf is missing 4) Make use of urllib instead --- .../control/airflow_connections.py | 29 +++++++++---------- shipyard_airflow/control/airflow_dag_state.py | 7 ++--- .../control/airflow_get_task_status.py | 7 ++--- .../control/airflow_get_version.py | 7 ++--- shipyard_airflow/control/airflow_list_dags.py | 7 ++--- .../control/airflow_list_tasks.py | 7 ++--- .../control/airflow_trigger_dag.py | 7 ++--- .../control/airflow_trigger_dag_poll.py | 7 ++--- shipyard_airflow/control/dag_runs.py | 7 ++--- shipyard_airflow/control/shipyard.conf | 4 +-- shipyard_airflow/control/tasks.py | 7 ++--- 11 files changed, 43 insertions(+), 53 deletions(-) diff --git a/shipyard_airflow/control/airflow_connections.py b/shipyard_airflow/control/airflow_connections.py index 489e230e..27caaef1 100644 --- a/shipyard_airflow/control/airflow_connections.py +++ b/shipyard_airflow/control/airflow_connections.py @@ -14,6 +14,7 @@ import falcon import json import requests +import urlparse from .base import BaseResource @@ -25,17 +26,17 @@ class AirflowAddConnectionResource(BaseResource): def on_get(self, req, resp, action, conn_id, protocol, host, port): # Retrieve URL - web_server_url = self.retrieve_config('BASE', 'WEB_SERVER') + web_server_url = self.retrieve_config('base', 'web_server') if 'Error' in web_server_url: - resp.status = falcon.HTTP_400 - resp.body = json.dumps({'Error': 'Missing Configuration File'}) - return + resp.status = falcon.HTTP_500 + raise falcon.HTTPInternalServerError("Internal Server Error", "Missing Configuration File") else: if action == 'add': - # Concatenate to form the connection URL, Remove whitespaces - conn_uri = "%s %s %s %s %s" % (protocol, '://', host, ':', port) - conn_uri = conn_uri.replace(' ', '') + # Concatenate to form the connection URL + netloc = ''.join([host, ':', port]) + url = (protocol, netloc, '','','') + conn_uri = urlparse.urlunsplit(url) # Form the request URL towards Airflow req_url = '{}/admin/rest_api/api?api=connections&add=true&conn_id={}&conn_uri={}'.format(web_server_url, conn_id, conn_uri) @@ -62,12 +63,11 @@ class AirflowDeleteConnectionResource(BaseResource): def on_get(self, req, resp, action, conn_id): # Retrieve URL - web_server_url = self.retrieve_config('BASE', 'WEB_SERVER') + web_server_url = self.retrieve_config('base', 'web_server') if 'Error' in web_server_url: - resp.status = falcon.HTTP_400 - resp.body = json.dumps({'Error': 'Missing Configuration File'}) - return + resp.status = falcon.HTTP_500 + raise falcon.HTTPInternalServerError("Internal Server Error", "Missing Configuration File") else: if action == 'delete': # Form the request URL towards Airflow @@ -95,12 +95,11 @@ class AirflowListConnectionsResource(BaseResource): def on_get(self, req, resp, action): # Retrieve URL - web_server_url = self.retrieve_config('BASE', 'WEB_SERVER') + web_server_url = self.retrieve_config('base', 'web_server') if 'Error' in web_server_url: - resp.status = falcon.HTTP_400 - resp.body = json.dumps({'Error': 'Missing Configuration File'}) - return + resp.status = falcon.HTTP_500 + raise falcon.HTTPInternalServerError("Internal Server Error", "Missing Configuration File") else: if action == 'list': # Form the request URL towards Airflow diff --git a/shipyard_airflow/control/airflow_dag_state.py b/shipyard_airflow/control/airflow_dag_state.py index 197232ed..3d6d467a 100644 --- a/shipyard_airflow/control/airflow_dag_state.py +++ b/shipyard_airflow/control/airflow_dag_state.py @@ -23,12 +23,11 @@ class GetDagStateResource(BaseResource): def on_get(self, req, resp, dag_id, execution_date): # Retrieve URL - web_server_url = self.retrieve_config('BASE', 'WEB_SERVER') + web_server_url = self.retrieve_config('base', 'web_server') if 'Error' in web_server_url: - resp.status = falcon.HTTP_400 - resp.body = json.dumps({'Error': 'Missing Configuration File'}) - return + resp.status = falcon.HTTP_500 + raise falcon.HTTPInternalServerError("Internal Server Error", "Missing Configuration File") else: req_url = '{}/admin/rest_api/api?api=dag_state&dag_id={}&execution_date={}'.format(web_server_url, dag_id, execution_date) response = requests.get(req_url).json() diff --git a/shipyard_airflow/control/airflow_get_task_status.py b/shipyard_airflow/control/airflow_get_task_status.py index 3b5fbdc8..d768504f 100644 --- a/shipyard_airflow/control/airflow_get_task_status.py +++ b/shipyard_airflow/control/airflow_get_task_status.py @@ -23,12 +23,11 @@ class GetTaskStatusResource(BaseResource): def on_get(self, req, resp, dag_id, task_id, execution_date): # Retrieve URL - web_server_url = self.retrieve_config('BASE', 'WEB_SERVER') + web_server_url = self.retrieve_config('base', 'web_server') if 'Error' in web_server_url: - resp.status = falcon.HTTP_400 - resp.body = json.dumps({'Error': 'Missing Configuration File'}) - return + resp.status = falcon.HTTP_500 + raise falcon.HTTPInternalServerError("Internal Server Error", "Missing Configuration File") else: req_url = '{}/admin/rest_api/api?api=task_state&dag_id={}&task_id={}&execution_date={}'.format(web_server_url, dag_id, task_id, execution_date) response = requests.get(req_url).json() diff --git a/shipyard_airflow/control/airflow_get_version.py b/shipyard_airflow/control/airflow_get_version.py index 76591ad5..07abed0a 100644 --- a/shipyard_airflow/control/airflow_get_version.py +++ b/shipyard_airflow/control/airflow_get_version.py @@ -23,12 +23,11 @@ class GetAirflowVersionResource(BaseResource): def on_get(self, req, resp): # Retrieve URL - web_server_url = self.retrieve_config('BASE', 'WEB_SERVER') + web_server_url = self.retrieve_config('base', 'web_server') if 'Error' in web_server_url: - resp.status = falcon.HTTP_400 - resp.body = json.dumps({'Error': 'Missing Configuration File'}) - return + resp.status = falcon.HTTP_500 + raise falcon.HTTPInternalServerError("Internal Server Error", "Missing Configuration File") else: # Get Airflow Version req_url = '{}/admin/rest_api/api?api=version'.format(web_server_url) diff --git a/shipyard_airflow/control/airflow_list_dags.py b/shipyard_airflow/control/airflow_list_dags.py index d2b5e60d..ee2fc8c7 100644 --- a/shipyard_airflow/control/airflow_list_dags.py +++ b/shipyard_airflow/control/airflow_list_dags.py @@ -23,12 +23,11 @@ class ListDagsResource(BaseResource): def on_get(self, req, resp): # Retrieve URL - web_server_url = self.retrieve_config('BASE', 'WEB_SERVER') + web_server_url = self.retrieve_config('base', 'web_server') if 'Error' in web_server_url: - resp.status = falcon.HTTP_400 - resp.body = json.dumps({'Error': 'Missing Configuration File'}) - return + resp.status = falcon.HTTP_500 + raise falcon.HTTPInternalServerError("Internal Server Error", "Missing Configuration File") else: # List available dags req_url = '{}/admin/rest_api/api?api=list_dags'.format(web_server_url) diff --git a/shipyard_airflow/control/airflow_list_tasks.py b/shipyard_airflow/control/airflow_list_tasks.py index 9a4870b8..b8b848a6 100644 --- a/shipyard_airflow/control/airflow_list_tasks.py +++ b/shipyard_airflow/control/airflow_list_tasks.py @@ -23,12 +23,11 @@ class ListTasksResource(BaseResource): def on_get(self, req, resp, dag_id): # Retrieve URL - web_server_url = self.retrieve_config('BASE', 'WEB_SERVER') + web_server_url = self.retrieve_config('base', 'web_server') if 'Error' in web_server_url: - resp.status = falcon.HTTP_400 - resp.body = json.dumps({'Error': 'Missing Configuration File'}) - return + resp.status = falcon.HTTP_500 + raise falcon.HTTPInternalServerError("Internal Server Error", "Missing Configuration File") else: # Retrieve all tasks belonging to a particular Dag req_url = '{}/admin/rest_api/api?api=list_tasks&dag_id={}'.format(web_server_url, dag_id) diff --git a/shipyard_airflow/control/airflow_trigger_dag.py b/shipyard_airflow/control/airflow_trigger_dag.py index e1db4bcb..1e860aa9 100644 --- a/shipyard_airflow/control/airflow_trigger_dag.py +++ b/shipyard_airflow/control/airflow_trigger_dag.py @@ -24,12 +24,11 @@ class TriggerDagRunResource(BaseResource): def on_get(self, req, resp, dag_id, run_id): # Retrieve URL - web_server_url = self.retrieve_config('BASE', 'WEB_SERVER') + web_server_url = self.retrieve_config('base', 'web_server') if 'Error' in web_server_url: - resp.status = falcon.HTTP_400 - resp.body = json.dumps({'Error': 'Missing Configuration File'}) - return + resp.status = falcon.HTTP_500 + raise falcon.HTTPInternalServerError("Internal Server Error", "Missing Configuration File") else: req_url = '{}/admin/rest_api/api?api=trigger_dag&dag_id={}&run_id={}'.format(web_server_url, dag_id, run_id) response = requests.get(req_url).json() diff --git a/shipyard_airflow/control/airflow_trigger_dag_poll.py b/shipyard_airflow/control/airflow_trigger_dag_poll.py index 62149bf3..05db6997 100644 --- a/shipyard_airflow/control/airflow_trigger_dag_poll.py +++ b/shipyard_airflow/control/airflow_trigger_dag_poll.py @@ -26,12 +26,11 @@ class TriggerDagRunPollResource(BaseResource): def on_get(self, req, resp, dag_id, run_id): # Retrieve URL - web_server_url = self.retrieve_config('BASE', 'WEB_SERVER') + web_server_url = self.retrieve_config('base', 'web_server') if 'Error' in web_server_url: - resp.status = falcon.HTTP_400 - resp.body = json.dumps({'Error': 'Missing Configuration File'}) - return + resp.status = falcon.HTTP_500 + raise falcon.HTTPInternalServerError("Internal Server Error", "Missing Configuration File") else: req_url = '{}/admin/rest_api/api?api=trigger_dag&dag_id={}&run_id={}'.format(web_server_url, dag_id, run_id) response = requests.get(req_url).json() diff --git a/shipyard_airflow/control/dag_runs.py b/shipyard_airflow/control/dag_runs.py index 7a462d3d..ee480634 100644 --- a/shipyard_airflow/control/dag_runs.py +++ b/shipyard_airflow/control/dag_runs.py @@ -23,12 +23,11 @@ class DagRunResource(BaseResource): def on_post(self, req, resp, dag_id, run_id=None, conf=None, execution_date=None): # Retrieve URL - web_server_url = self.retrieve_config('BASE', 'WEB_SERVER') + web_server_url = self.retrieve_config('base', 'web_server') if 'Error' in web_server_url: - resp.status = falcon.HTTP_400 - resp.body = json.dumps({'Error': 'Missing Configuration File'}) - return + resp.status = falcon.HTTP_500 + raise falcon.HTTPInternalServerError("Internal Server Error", "Missing Configuration File") else: req_url = '{}/api/experimental/dags/{}/dag_runs'.format(web_server_url, dag_id) diff --git a/shipyard_airflow/control/shipyard.conf b/shipyard_airflow/control/shipyard.conf index 6dadc48c..bb1e928d 100644 --- a/shipyard_airflow/control/shipyard.conf +++ b/shipyard_airflow/control/shipyard.conf @@ -1,3 +1,3 @@ -[BASE] -WEB_SERVER=http://localhost:32080 +[base] +web_server=http://localhost:32080 diff --git a/shipyard_airflow/control/tasks.py b/shipyard_airflow/control/tasks.py index 9cb2f6e8..e67b79e6 100644 --- a/shipyard_airflow/control/tasks.py +++ b/shipyard_airflow/control/tasks.py @@ -23,12 +23,11 @@ class TaskResource(BaseResource): def on_get(self, req, resp, dag_id, task_id): # Retrieve URL - web_server_url = self.retrieve_config('BASE', 'WEB_SERVER') + web_server_url = self.retrieve_config('base', 'web_server') if 'Error' in web_server_url: - resp.status = falcon.HTTP_400 - resp.body = json.dumps({'Error': 'Missing Configuration File'}) - return + resp.status = falcon.HTTP_500 + raise falcon.HTTPInternalServerError("Internal Server Error", "Missing Configuration File") else: req_url = '{}/api/experimental/dags/{}/tasks/{}'.format(web_server_url, dag_id, task_id) task_details = requests.get(req_url).json()