Update Falcon API
1) Change the content of shipyard.conf from Upper to Lower Case 2) Return Exception instead of string if shipyard.conf is missing 3) Return HTTP 500 instead of 400 if shipyard.conf is missing 4) Make use of urllib instead
This commit is contained in:
parent
666136e319
commit
7bcc5c6141
|
@ -14,6 +14,7 @@
|
||||||
import falcon
|
import falcon
|
||||||
import json
|
import json
|
||||||
import requests
|
import requests
|
||||||
|
import urlparse
|
||||||
|
|
||||||
from .base import BaseResource
|
from .base import BaseResource
|
||||||
|
|
||||||
|
@ -25,17 +26,17 @@ class AirflowAddConnectionResource(BaseResource):
|
||||||
|
|
||||||
def on_get(self, req, resp, action, conn_id, protocol, host, port):
|
def on_get(self, req, resp, action, conn_id, protocol, host, port):
|
||||||
# Retrieve URL
|
# Retrieve URL
|
||||||
web_server_url = self.retrieve_config('BASE', 'WEB_SERVER')
|
web_server_url = self.retrieve_config('base', 'web_server')
|
||||||
|
|
||||||
if 'Error' in web_server_url:
|
if 'Error' in web_server_url:
|
||||||
resp.status = falcon.HTTP_400
|
resp.status = falcon.HTTP_500
|
||||||
resp.body = json.dumps({'Error': 'Missing Configuration File'})
|
raise falcon.HTTPInternalServerError("Internal Server Error", "Missing Configuration File")
|
||||||
return
|
|
||||||
else:
|
else:
|
||||||
if action == 'add':
|
if action == 'add':
|
||||||
# Concatenate to form the connection URL, Remove whitespaces
|
# Concatenate to form the connection URL
|
||||||
conn_uri = "%s %s %s %s %s" % (protocol, '://', host, ':', port)
|
netloc = ''.join([host, ':', port])
|
||||||
conn_uri = conn_uri.replace(' ', '')
|
url = (protocol, netloc, '','','')
|
||||||
|
conn_uri = urlparse.urlunsplit(url)
|
||||||
|
|
||||||
# Form the request URL towards Airflow
|
# Form the request URL towards Airflow
|
||||||
req_url = '{}/admin/rest_api/api?api=connections&add=true&conn_id={}&conn_uri={}'.format(web_server_url, conn_id, conn_uri)
|
req_url = '{}/admin/rest_api/api?api=connections&add=true&conn_id={}&conn_uri={}'.format(web_server_url, conn_id, conn_uri)
|
||||||
|
@ -62,12 +63,11 @@ class AirflowDeleteConnectionResource(BaseResource):
|
||||||
|
|
||||||
def on_get(self, req, resp, action, conn_id):
|
def on_get(self, req, resp, action, conn_id):
|
||||||
# Retrieve URL
|
# Retrieve URL
|
||||||
web_server_url = self.retrieve_config('BASE', 'WEB_SERVER')
|
web_server_url = self.retrieve_config('base', 'web_server')
|
||||||
|
|
||||||
if 'Error' in web_server_url:
|
if 'Error' in web_server_url:
|
||||||
resp.status = falcon.HTTP_400
|
resp.status = falcon.HTTP_500
|
||||||
resp.body = json.dumps({'Error': 'Missing Configuration File'})
|
raise falcon.HTTPInternalServerError("Internal Server Error", "Missing Configuration File")
|
||||||
return
|
|
||||||
else:
|
else:
|
||||||
if action == 'delete':
|
if action == 'delete':
|
||||||
# Form the request URL towards Airflow
|
# Form the request URL towards Airflow
|
||||||
|
@ -95,12 +95,11 @@ class AirflowListConnectionsResource(BaseResource):
|
||||||
|
|
||||||
def on_get(self, req, resp, action):
|
def on_get(self, req, resp, action):
|
||||||
# Retrieve URL
|
# Retrieve URL
|
||||||
web_server_url = self.retrieve_config('BASE', 'WEB_SERVER')
|
web_server_url = self.retrieve_config('base', 'web_server')
|
||||||
|
|
||||||
if 'Error' in web_server_url:
|
if 'Error' in web_server_url:
|
||||||
resp.status = falcon.HTTP_400
|
resp.status = falcon.HTTP_500
|
||||||
resp.body = json.dumps({'Error': 'Missing Configuration File'})
|
raise falcon.HTTPInternalServerError("Internal Server Error", "Missing Configuration File")
|
||||||
return
|
|
||||||
else:
|
else:
|
||||||
if action == 'list':
|
if action == 'list':
|
||||||
# Form the request URL towards Airflow
|
# Form the request URL towards Airflow
|
||||||
|
|
|
@ -23,12 +23,11 @@ class GetDagStateResource(BaseResource):
|
||||||
|
|
||||||
def on_get(self, req, resp, dag_id, execution_date):
|
def on_get(self, req, resp, dag_id, execution_date):
|
||||||
# Retrieve URL
|
# Retrieve URL
|
||||||
web_server_url = self.retrieve_config('BASE', 'WEB_SERVER')
|
web_server_url = self.retrieve_config('base', 'web_server')
|
||||||
|
|
||||||
if 'Error' in web_server_url:
|
if 'Error' in web_server_url:
|
||||||
resp.status = falcon.HTTP_400
|
resp.status = falcon.HTTP_500
|
||||||
resp.body = json.dumps({'Error': 'Missing Configuration File'})
|
raise falcon.HTTPInternalServerError("Internal Server Error", "Missing Configuration File")
|
||||||
return
|
|
||||||
else:
|
else:
|
||||||
req_url = '{}/admin/rest_api/api?api=dag_state&dag_id={}&execution_date={}'.format(web_server_url, dag_id, execution_date)
|
req_url = '{}/admin/rest_api/api?api=dag_state&dag_id={}&execution_date={}'.format(web_server_url, dag_id, execution_date)
|
||||||
response = requests.get(req_url).json()
|
response = requests.get(req_url).json()
|
||||||
|
|
|
@ -23,12 +23,11 @@ class GetTaskStatusResource(BaseResource):
|
||||||
|
|
||||||
def on_get(self, req, resp, dag_id, task_id, execution_date):
|
def on_get(self, req, resp, dag_id, task_id, execution_date):
|
||||||
# Retrieve URL
|
# Retrieve URL
|
||||||
web_server_url = self.retrieve_config('BASE', 'WEB_SERVER')
|
web_server_url = self.retrieve_config('base', 'web_server')
|
||||||
|
|
||||||
if 'Error' in web_server_url:
|
if 'Error' in web_server_url:
|
||||||
resp.status = falcon.HTTP_400
|
resp.status = falcon.HTTP_500
|
||||||
resp.body = json.dumps({'Error': 'Missing Configuration File'})
|
raise falcon.HTTPInternalServerError("Internal Server Error", "Missing Configuration File")
|
||||||
return
|
|
||||||
else:
|
else:
|
||||||
req_url = '{}/admin/rest_api/api?api=task_state&dag_id={}&task_id={}&execution_date={}'.format(web_server_url, dag_id, task_id, execution_date)
|
req_url = '{}/admin/rest_api/api?api=task_state&dag_id={}&task_id={}&execution_date={}'.format(web_server_url, dag_id, task_id, execution_date)
|
||||||
response = requests.get(req_url).json()
|
response = requests.get(req_url).json()
|
||||||
|
|
|
@ -23,12 +23,11 @@ class GetAirflowVersionResource(BaseResource):
|
||||||
|
|
||||||
def on_get(self, req, resp):
|
def on_get(self, req, resp):
|
||||||
# Retrieve URL
|
# Retrieve URL
|
||||||
web_server_url = self.retrieve_config('BASE', 'WEB_SERVER')
|
web_server_url = self.retrieve_config('base', 'web_server')
|
||||||
|
|
||||||
if 'Error' in web_server_url:
|
if 'Error' in web_server_url:
|
||||||
resp.status = falcon.HTTP_400
|
resp.status = falcon.HTTP_500
|
||||||
resp.body = json.dumps({'Error': 'Missing Configuration File'})
|
raise falcon.HTTPInternalServerError("Internal Server Error", "Missing Configuration File")
|
||||||
return
|
|
||||||
else:
|
else:
|
||||||
# Get Airflow Version
|
# Get Airflow Version
|
||||||
req_url = '{}/admin/rest_api/api?api=version'.format(web_server_url)
|
req_url = '{}/admin/rest_api/api?api=version'.format(web_server_url)
|
||||||
|
|
|
@ -23,12 +23,11 @@ class ListDagsResource(BaseResource):
|
||||||
|
|
||||||
def on_get(self, req, resp):
|
def on_get(self, req, resp):
|
||||||
# Retrieve URL
|
# Retrieve URL
|
||||||
web_server_url = self.retrieve_config('BASE', 'WEB_SERVER')
|
web_server_url = self.retrieve_config('base', 'web_server')
|
||||||
|
|
||||||
if 'Error' in web_server_url:
|
if 'Error' in web_server_url:
|
||||||
resp.status = falcon.HTTP_400
|
resp.status = falcon.HTTP_500
|
||||||
resp.body = json.dumps({'Error': 'Missing Configuration File'})
|
raise falcon.HTTPInternalServerError("Internal Server Error", "Missing Configuration File")
|
||||||
return
|
|
||||||
else:
|
else:
|
||||||
# List available dags
|
# List available dags
|
||||||
req_url = '{}/admin/rest_api/api?api=list_dags'.format(web_server_url)
|
req_url = '{}/admin/rest_api/api?api=list_dags'.format(web_server_url)
|
||||||
|
|
|
@ -23,12 +23,11 @@ class ListTasksResource(BaseResource):
|
||||||
|
|
||||||
def on_get(self, req, resp, dag_id):
|
def on_get(self, req, resp, dag_id):
|
||||||
# Retrieve URL
|
# Retrieve URL
|
||||||
web_server_url = self.retrieve_config('BASE', 'WEB_SERVER')
|
web_server_url = self.retrieve_config('base', 'web_server')
|
||||||
|
|
||||||
if 'Error' in web_server_url:
|
if 'Error' in web_server_url:
|
||||||
resp.status = falcon.HTTP_400
|
resp.status = falcon.HTTP_500
|
||||||
resp.body = json.dumps({'Error': 'Missing Configuration File'})
|
raise falcon.HTTPInternalServerError("Internal Server Error", "Missing Configuration File")
|
||||||
return
|
|
||||||
else:
|
else:
|
||||||
# Retrieve all tasks belonging to a particular Dag
|
# Retrieve all tasks belonging to a particular Dag
|
||||||
req_url = '{}/admin/rest_api/api?api=list_tasks&dag_id={}'.format(web_server_url, dag_id)
|
req_url = '{}/admin/rest_api/api?api=list_tasks&dag_id={}'.format(web_server_url, dag_id)
|
||||||
|
|
|
@ -24,12 +24,11 @@ class TriggerDagRunResource(BaseResource):
|
||||||
|
|
||||||
def on_get(self, req, resp, dag_id, run_id):
|
def on_get(self, req, resp, dag_id, run_id):
|
||||||
# Retrieve URL
|
# Retrieve URL
|
||||||
web_server_url = self.retrieve_config('BASE', 'WEB_SERVER')
|
web_server_url = self.retrieve_config('base', 'web_server')
|
||||||
|
|
||||||
if 'Error' in web_server_url:
|
if 'Error' in web_server_url:
|
||||||
resp.status = falcon.HTTP_400
|
resp.status = falcon.HTTP_500
|
||||||
resp.body = json.dumps({'Error': 'Missing Configuration File'})
|
raise falcon.HTTPInternalServerError("Internal Server Error", "Missing Configuration File")
|
||||||
return
|
|
||||||
else:
|
else:
|
||||||
req_url = '{}/admin/rest_api/api?api=trigger_dag&dag_id={}&run_id={}'.format(web_server_url, dag_id, run_id)
|
req_url = '{}/admin/rest_api/api?api=trigger_dag&dag_id={}&run_id={}'.format(web_server_url, dag_id, run_id)
|
||||||
response = requests.get(req_url).json()
|
response = requests.get(req_url).json()
|
||||||
|
|
|
@ -26,12 +26,11 @@ class TriggerDagRunPollResource(BaseResource):
|
||||||
|
|
||||||
def on_get(self, req, resp, dag_id, run_id):
|
def on_get(self, req, resp, dag_id, run_id):
|
||||||
# Retrieve URL
|
# Retrieve URL
|
||||||
web_server_url = self.retrieve_config('BASE', 'WEB_SERVER')
|
web_server_url = self.retrieve_config('base', 'web_server')
|
||||||
|
|
||||||
if 'Error' in web_server_url:
|
if 'Error' in web_server_url:
|
||||||
resp.status = falcon.HTTP_400
|
resp.status = falcon.HTTP_500
|
||||||
resp.body = json.dumps({'Error': 'Missing Configuration File'})
|
raise falcon.HTTPInternalServerError("Internal Server Error", "Missing Configuration File")
|
||||||
return
|
|
||||||
else:
|
else:
|
||||||
req_url = '{}/admin/rest_api/api?api=trigger_dag&dag_id={}&run_id={}'.format(web_server_url, dag_id, run_id)
|
req_url = '{}/admin/rest_api/api?api=trigger_dag&dag_id={}&run_id={}'.format(web_server_url, dag_id, run_id)
|
||||||
response = requests.get(req_url).json()
|
response = requests.get(req_url).json()
|
||||||
|
|
|
@ -23,12 +23,11 @@ class DagRunResource(BaseResource):
|
||||||
|
|
||||||
def on_post(self, req, resp, dag_id, run_id=None, conf=None, execution_date=None):
|
def on_post(self, req, resp, dag_id, run_id=None, conf=None, execution_date=None):
|
||||||
# Retrieve URL
|
# Retrieve URL
|
||||||
web_server_url = self.retrieve_config('BASE', 'WEB_SERVER')
|
web_server_url = self.retrieve_config('base', 'web_server')
|
||||||
|
|
||||||
if 'Error' in web_server_url:
|
if 'Error' in web_server_url:
|
||||||
resp.status = falcon.HTTP_400
|
resp.status = falcon.HTTP_500
|
||||||
resp.body = json.dumps({'Error': 'Missing Configuration File'})
|
raise falcon.HTTPInternalServerError("Internal Server Error", "Missing Configuration File")
|
||||||
return
|
|
||||||
else:
|
else:
|
||||||
req_url = '{}/api/experimental/dags/{}/dag_runs'.format(web_server_url, dag_id)
|
req_url = '{}/api/experimental/dags/{}/dag_runs'.format(web_server_url, dag_id)
|
||||||
|
|
||||||
|
|
|
@ -1,3 +1,3 @@
|
||||||
[BASE]
|
[base]
|
||||||
WEB_SERVER=http://localhost:32080
|
web_server=http://localhost:32080
|
||||||
|
|
||||||
|
|
|
@ -23,12 +23,11 @@ class TaskResource(BaseResource):
|
||||||
|
|
||||||
def on_get(self, req, resp, dag_id, task_id):
|
def on_get(self, req, resp, dag_id, task_id):
|
||||||
# Retrieve URL
|
# Retrieve URL
|
||||||
web_server_url = self.retrieve_config('BASE', 'WEB_SERVER')
|
web_server_url = self.retrieve_config('base', 'web_server')
|
||||||
|
|
||||||
if 'Error' in web_server_url:
|
if 'Error' in web_server_url:
|
||||||
resp.status = falcon.HTTP_400
|
resp.status = falcon.HTTP_500
|
||||||
resp.body = json.dumps({'Error': 'Missing Configuration File'})
|
raise falcon.HTTPInternalServerError("Internal Server Error", "Missing Configuration File")
|
||||||
return
|
|
||||||
else:
|
else:
|
||||||
req_url = '{}/api/experimental/dags/{}/tasks/{}'.format(web_server_url, dag_id, task_id)
|
req_url = '{}/api/experimental/dags/{}/tasks/{}'.format(web_server_url, dag_id, task_id)
|
||||||
task_details = requests.get(req_url).json()
|
task_details = requests.get(req_url).json()
|
||||||
|
|
Loading…
Reference in New Issue