Update Codes Based on Feedbacks

This commit is contained in:
eanylin 2017-06-09 21:42:48 -05:00
parent 8e8fdf1450
commit 0eef85185c
10 changed files with 68 additions and 14 deletions

View File

@ -26,6 +26,8 @@ setup(name='shipyard_airflow',
install_requires=[ install_requires=[
'falcon', 'falcon',
'requests', 'requests',
'configparser',
'uwsgi>1.4' 'uwsgi>1.4'
] ]
) )

View File

@ -18,10 +18,10 @@ from .regions import RegionsResource, RegionResource
from .base import ShipyardRequest, BaseResource from .base import ShipyardRequest, BaseResource
from .tasks import TaskResource from .tasks import TaskResource
from .dag_runs import DagRunResource from .dag_runs import DagRunResource
from .middleware import AuthMiddleware, ContextMiddleware, LoggingMiddleware from .middleware import AuthMiddleware, ContextMiddleware, LoggingMiddleware
def start_api(): def start_api():
control_api = falcon.API(request_type=ShipyardRequest, control_api = falcon.API(request_type=ShipyardRequest,
middleware=[AuthMiddleware(), ContextMiddleware(), LoggingMiddleware()]) middleware=[AuthMiddleware(), ContextMiddleware(), LoggingMiddleware()])
@ -51,3 +51,4 @@ class VersionsResource(BaseResource):
'status': 'stable' 'status': 'stable'
}}) }})
resp.status = falcon.HTTP_200 resp.status = falcon.HTTP_200

View File

@ -13,6 +13,8 @@
# limitations under the License. # limitations under the License.
import falcon.request as request import falcon.request as request
import uuid import uuid
import json
import ConfigParser
class BaseResource(object): class BaseResource(object):
@ -40,6 +42,33 @@ class BaseResource(object):
else: else:
return True return True
# Error Handling
def return_error(self, resp, status_code, message="", retry=False):
"""
Write a error message body and throw a Falcon exception to trigger an HTTP status
:param resp: Falcon response object to update
:param status_code: Falcon status_code constant
:param message: Optional error message to include in the body
:param retry: Optional flag whether client should retry the operation. Can ignore if we rely solely on 4XX vs 5xx status codes
"""
resp.body = json.dumps({'type': 'error', 'message': message, 'retry': retry})
resp.content_type = 'application/json'
resp.status = status_code
# Get Config Data
def retrieve_config(self, resp, section="", variable=""):
config = ConfigParser.ConfigParser()
# The current assumption is that shipyard.conf will be placed in a fixed path
# within the shipyard container - Path TBD
config.read('/home/ubuntu/att-comdev/shipyard/shipyard_airflow/control/shipyard.conf')
# Retrieve data from shipyard.conf
query_data = config.get(section, variable)
return query_data
class ShipyardRequestContext(object): class ShipyardRequestContext(object):
@ -72,3 +101,4 @@ class ShipyardRequestContext(object):
class ShipyardRequest(request.Request): class ShipyardRequest(request.Request):
context_type = ShipyardRequestContext context_type = ShipyardRequestContext

View File

@ -13,6 +13,7 @@
# limitations under the License. # limitations under the License.
import falcon import falcon
import requests import requests
import json
from .base import BaseResource from .base import BaseResource
@ -21,19 +22,21 @@ class DagRunResource(BaseResource):
authorized_roles = ['user'] authorized_roles = ['user']
def on_post(self, req, resp, dag_id, run_id=None, conf=None, execution_date=None): def on_post(self, req, resp, dag_id, run_id=None, conf=None, execution_date=None):
req_url = 'http://localhost:32080/api/experimental/dags/{}/dag_runs'.format(dag_id) # Retrieve URL
web_server_url = self.retrieve_config(resp, 'BASE', 'WEB_SERVER')
req_url = '{}/api/experimental/dags/{}/dag_runs'.format(web_server_url, dag_id)
response = requests.post(req_url, response = requests.post(req_url,
json={ json={
"run_id": run_id, "run_id": run_id,
"conf": conf, "conf": conf,
"execution_date": execution_date, "execution_date": execution_date,
}) })
if not response.ok:
raise IOError() if response.ok:
else:
resp.status = falcon.HTTP_200 resp.status = falcon.HTTP_200
else:
self.return_error(resp, falcon.HTTP_400, 'Fail to Execute Dag')
return
data = response.json()
return data['message']

View File

@ -85,4 +85,5 @@ class LoggingMiddleware(object):
} }
resp.append_header('X-Shipyard-Req', ctx.request_id) resp.append_header('X-Shipyard-Req', ctx.request_id)
self.logger.info("%s - %s" % (req.uri, resp.status), extra=extra) self.logger.info("%s - %s" % (req.uri, resp.status), extra=extra)

View File

@ -27,4 +27,5 @@ class RegionResource(BaseResource):
authorized_roles = ['user'] authorized_roles = ['user']
def on_get(self, req, resp, region_id): def on_get(self, req, resp, region_id):
resp.status = falcon.HTTP_200 resp.status = falcon.HTTP_200

View File

@ -0,0 +1,3 @@
[BASE]
WEB_SERVER=http://localhost:32080

View File

@ -22,7 +22,17 @@ class TaskResource(BaseResource):
authorized_roles = ['user'] authorized_roles = ['user']
def on_get(self, req, resp, dag_id, task_id): def on_get(self, req, resp, dag_id, task_id):
resp.status = falcon.HTTP_200 # Retrieve URL
req_url = 'http://localhost:32080/api/experimental/dags/{}/tasks/{}'.format(dag_id, task_id) web_server_url = self.retrieve_config(resp, 'BASE', 'WEB_SERVER')
req_url = '{}/api/experimental/dags/{}/tasks/{}'.format(web_server_url, dag_id, task_id)
task_details = requests.get(req_url).json() task_details = requests.get(req_url).json()
resp.body = json.dumps(task_details)
if 'error' in task_details:
resp.status = falcon.HTTP_400
resp.body = json.dumps(task_details)
return
else:
resp.status = falcon.HTTP_200
resp.body = json.dumps(task_details)

View File

@ -26,6 +26,8 @@ setup(name='shipyard_airflow',
install_requires=[ install_requires=[
'falcon', 'falcon',
'requests', 'requests',
'configparser',
'uwsgi>1.4' 'uwsgi>1.4'
] ]
) )

View File

@ -36,4 +36,5 @@ def start_shipyard():
return api.start_api() return api.start_api()
shipyard = start_shipyard() shipyard = start_shipyard()