diff --git a/shipyard_airflow/control/airflow_connections.py b/shipyard_airflow/control/airflow_connections.py new file mode 100644 index 00000000..27caaef1 --- /dev/null +++ b/shipyard_airflow/control/airflow_connections.py @@ -0,0 +1,121 @@ +# Copyright 2017 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import falcon +import json +import requests +import urlparse + +from .base import BaseResource + +# We need to be able to add/delete connections so that we can create/delete +# connection endpoints that Airflow needs to connect to +class AirflowAddConnectionResource(BaseResource): + + authorized_roles = ['user'] + + def on_get(self, req, resp, action, conn_id, protocol, host, port): + # Retrieve URL + web_server_url = self.retrieve_config('base', 'web_server') + + if 'Error' in web_server_url: + resp.status = falcon.HTTP_500 + raise falcon.HTTPInternalServerError("Internal Server Error", "Missing Configuration File") + else: + if action == 'add': + # Concatenate to form the connection URL + netloc = ''.join([host, ':', port]) + url = (protocol, netloc, '','','') + conn_uri = urlparse.urlunsplit(url) + + # Form the request URL towards Airflow + req_url = '{}/admin/rest_api/api?api=connections&add=true&conn_id={}&conn_uri={}'.format(web_server_url, conn_id, conn_uri) + else: + self.return_error(resp, falcon.HTTP_400, 'Invalid Paremeters for Adding Airflow Connection') + return + + response = requests.get(req_url).json() + + # Return output + if response["output"]["stderr"]: + resp.status = falcon.HTTP_400 + resp.body = response["output"]["stderr"] + return + else: + resp.status = falcon.HTTP_200 + resp.body = response["output"]["stdout"] + + +# Delete a particular connection endpoint +class AirflowDeleteConnectionResource(BaseResource): + + authorized_roles = ['user'] + + def on_get(self, req, resp, action, conn_id): + # Retrieve URL + web_server_url = self.retrieve_config('base', 'web_server') + + if 'Error' in web_server_url: + resp.status = falcon.HTTP_500 + raise falcon.HTTPInternalServerError("Internal Server Error", "Missing Configuration File") + else: + if action == 'delete': + # Form the request URL towards Airflow + req_url = '{}/admin/rest_api/api?api=connections&delete=true&conn_id={}'.format(web_server_url, conn_id) + else: + self.return_error(resp, falcon.HTTP_400, 'Invalid Paremeters for Deleting Airflow Connection') + return + + response = requests.get(req_url).json() + + # Return output + if response["output"]["stderr"]: + resp.status = falcon.HTTP_400 + resp.body = response["output"]["stderr"] + return + else: + resp.status = falcon.HTTP_200 + resp.body = response["output"]["stdout"] + + +# List all current connection endpoints +class AirflowListConnectionsResource(BaseResource): + + authorized_roles = ['user'] + + def on_get(self, req, resp, action): + # Retrieve URL + web_server_url = self.retrieve_config('base', 'web_server') + + if 'Error' in web_server_url: + resp.status = falcon.HTTP_500 + raise falcon.HTTPInternalServerError("Internal Server Error", "Missing Configuration File") + else: + if action == 'list': + # Form the request URL towards Airflow + req_url = '{}/admin/rest_api/api?api=connections&list=true'.format(web_server_url) + else: + self.return_error(resp, falcon.HTTP_400, 'Invalid Paremeters for listing Airflow Connections') + return + + response = requests.get(req_url).json() + + # Return output + if response["output"]["stderr"]: + resp.status = falcon.HTTP_400 + resp.body = response["output"]["stderr"] + return + else: + resp.status = falcon.HTTP_200 + resp.body = response["output"]["stdout"] + diff --git a/shipyard_airflow/control/airflow_dag_state.py b/shipyard_airflow/control/airflow_dag_state.py new file mode 100644 index 00000000..3d6d467a --- /dev/null +++ b/shipyard_airflow/control/airflow_dag_state.py @@ -0,0 +1,42 @@ +# Copyright 2017 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import falcon +import json +import requests + +from .base import BaseResource + +class GetDagStateResource(BaseResource): + + authorized_roles = ['user'] + + def on_get(self, req, resp, dag_id, execution_date): + # Retrieve URL + web_server_url = self.retrieve_config('base', 'web_server') + + if 'Error' in web_server_url: + resp.status = falcon.HTTP_500 + raise falcon.HTTPInternalServerError("Internal Server Error", "Missing Configuration File") + else: + req_url = '{}/admin/rest_api/api?api=dag_state&dag_id={}&execution_date={}'.format(web_server_url, dag_id, execution_date) + response = requests.get(req_url).json() + + if response["output"]["stderr"]: + resp.status = falcon.HTTP_400 + resp.body = response["output"]["stderr"] + return + else: + resp.status = falcon.HTTP_200 + resp.body = response["output"]["stdout"] + diff --git a/shipyard_airflow/control/airflow_get_task_status.py b/shipyard_airflow/control/airflow_get_task_status.py new file mode 100644 index 00000000..d768504f --- /dev/null +++ b/shipyard_airflow/control/airflow_get_task_status.py @@ -0,0 +1,42 @@ +# Copyright 2017 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import falcon +import json +import requests + +from .base import BaseResource + +class GetTaskStatusResource(BaseResource): + + authorized_roles = ['user'] + + def on_get(self, req, resp, dag_id, task_id, execution_date): + # Retrieve URL + web_server_url = self.retrieve_config('base', 'web_server') + + if 'Error' in web_server_url: + resp.status = falcon.HTTP_500 + raise falcon.HTTPInternalServerError("Internal Server Error", "Missing Configuration File") + else: + req_url = '{}/admin/rest_api/api?api=task_state&dag_id={}&task_id={}&execution_date={}'.format(web_server_url, dag_id, task_id, execution_date) + response = requests.get(req_url).json() + + if response["output"]["stderr"]: + resp.status = falcon.HTTP_400 + resp.body = response["output"]["stderr"] + return + else: + resp.status = falcon.HTTP_200 + resp.body = response["output"]["stdout"] + diff --git a/shipyard_airflow/control/airflow_get_version.py b/shipyard_airflow/control/airflow_get_version.py new file mode 100644 index 00000000..07abed0a --- /dev/null +++ b/shipyard_airflow/control/airflow_get_version.py @@ -0,0 +1,42 @@ +# Copyright 2017 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import falcon +import json +import requests + +from .base import BaseResource + +class GetAirflowVersionResource(BaseResource): + + authorized_roles = ['user'] + + def on_get(self, req, resp): + # Retrieve URL + web_server_url = self.retrieve_config('base', 'web_server') + + if 'Error' in web_server_url: + resp.status = falcon.HTTP_500 + raise falcon.HTTPInternalServerError("Internal Server Error", "Missing Configuration File") + else: + # Get Airflow Version + req_url = '{}/admin/rest_api/api?api=version'.format(web_server_url) + response = requests.get(req_url).json() + + if response["output"]: + resp.status = falcon.HTTP_200 + resp.body = response["output"] + else: + self.return_error(resp, falcon.HTTP_400, 'Fail to Retrieve Airflow Version') + return + diff --git a/shipyard_airflow/control/airflow_list_dags.py b/shipyard_airflow/control/airflow_list_dags.py new file mode 100644 index 00000000..ee2fc8c7 --- /dev/null +++ b/shipyard_airflow/control/airflow_list_dags.py @@ -0,0 +1,43 @@ +# Copyright 2017 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import falcon +import json +import requests + +from .base import BaseResource + +class ListDagsResource(BaseResource): + + authorized_roles = ['user'] + + def on_get(self, req, resp): + # Retrieve URL + web_server_url = self.retrieve_config('base', 'web_server') + + if 'Error' in web_server_url: + resp.status = falcon.HTTP_500 + raise falcon.HTTPInternalServerError("Internal Server Error", "Missing Configuration File") + else: + # List available dags + req_url = '{}/admin/rest_api/api?api=list_dags'.format(web_server_url) + response = requests.get(req_url).json() + + if response["output"]["stderr"]: + resp.status = falcon.HTTP_400 + resp.body = response["output"]["stderr"] + return + else: + resp.status = falcon.HTTP_200 + resp.body = response["output"]["stdout"] + diff --git a/shipyard_airflow/control/airflow_list_tasks.py b/shipyard_airflow/control/airflow_list_tasks.py new file mode 100644 index 00000000..b8b848a6 --- /dev/null +++ b/shipyard_airflow/control/airflow_list_tasks.py @@ -0,0 +1,43 @@ +# Copyright 2017 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import falcon +import json +import requests + +from .base import BaseResource + +class ListTasksResource(BaseResource): + + authorized_roles = ['user'] + + def on_get(self, req, resp, dag_id): + # Retrieve URL + web_server_url = self.retrieve_config('base', 'web_server') + + if 'Error' in web_server_url: + resp.status = falcon.HTTP_500 + raise falcon.HTTPInternalServerError("Internal Server Error", "Missing Configuration File") + else: + # Retrieve all tasks belonging to a particular Dag + req_url = '{}/admin/rest_api/api?api=list_tasks&dag_id={}'.format(web_server_url, dag_id) + response = requests.get(req_url).json() + + if response["output"]["stderr"]: + resp.status = falcon.HTTP_400 + resp.body = response["output"]["stderr"] + return + else: + resp.status = falcon.HTTP_200 + resp.body = response["output"]["stdout"] + diff --git a/shipyard_airflow/control/airflow_trigger_dag.py b/shipyard_airflow/control/airflow_trigger_dag.py new file mode 100644 index 00000000..1e860aa9 --- /dev/null +++ b/shipyard_airflow/control/airflow_trigger_dag.py @@ -0,0 +1,47 @@ +# Copyright 2017 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import falcon +import json +import requests + +from dateutil.parser import parse +from .base import BaseResource + +class TriggerDagRunResource(BaseResource): + + authorized_roles = ['user'] + + def on_get(self, req, resp, dag_id, run_id): + # Retrieve URL + web_server_url = self.retrieve_config('base', 'web_server') + + if 'Error' in web_server_url: + resp.status = falcon.HTTP_500 + raise falcon.HTTPInternalServerError("Internal Server Error", "Missing Configuration File") + else: + req_url = '{}/admin/rest_api/api?api=trigger_dag&dag_id={}&run_id={}'.format(web_server_url, dag_id, run_id) + response = requests.get(req_url).json() + + # Returns error response if API call returns response code other than 200 + if response["http_response_code"] != 200: + resp.status = falcon.HTTP_400 + resp.body = response["output"] + return + else: + resp.status = falcon.HTTP_200 + + # Return time of execution so that we can use it to query dag/task status + dt = parse(response["response_time"]) + resp.body = dt.strftime('%Y-%m-%dT%H:%M:%S') + diff --git a/shipyard_airflow/control/airflow_trigger_dag_poll.py b/shipyard_airflow/control/airflow_trigger_dag_poll.py new file mode 100644 index 00000000..05db6997 --- /dev/null +++ b/shipyard_airflow/control/airflow_trigger_dag_poll.py @@ -0,0 +1,81 @@ +# Copyright 2017 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import falcon +import json +import requests +import time +import logging + +from dateutil.parser import parse +from .base import BaseResource + +class TriggerDagRunPollResource(BaseResource): + + authorized_roles = ['user'] + + def on_get(self, req, resp, dag_id, run_id): + # Retrieve URL + web_server_url = self.retrieve_config('base', 'web_server') + + if 'Error' in web_server_url: + resp.status = falcon.HTTP_500 + raise falcon.HTTPInternalServerError("Internal Server Error", "Missing Configuration File") + else: + req_url = '{}/admin/rest_api/api?api=trigger_dag&dag_id={}&run_id={}'.format(web_server_url, dag_id, run_id) + response = requests.get(req_url).json() + + if response["http_response_code"] != 200: + resp.status = falcon.HTTP_400 + resp.body = response["output"] + return + else: + resp.status = falcon.HTTP_200 + + logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') + logging.info("Executing '" + dag_id + "' Dag...") + + # Retrieve time of execution so that we can use it to query dag/task status + dt = parse(response["response_time"]) + exec_date = dt.strftime('%Y-%m-%dT%H:%M:%S') + + url = '{}/admin/rest_api/api?api=dag_state&dag_id={}&execution_date={}'.format(web_server_url, dag_id, exec_date) + + # Back off for 5 seconds before querying the initial state + time.sleep( 5 ) + dag_state = requests.get(url).json() + + # Remove newline character at the end of the response + dag_state = dag_state["output"]["stdout"].encode('utf8').rstrip() + + while dag_state != 'success': + # Get current state + dag_state = requests.get(url).json() + + # Remove newline character at the end of the response + dag_state = dag_state["output"]["stdout"].encode('utf8').rstrip() + + # Logs output of current dag state + logging.info('Current Dag State: ' + dag_state) + + if dag_state == 'failed': + resp.status = falcon.HTTP_500 + logging.info('Dag Execution Failed') + resp.body = json.dumps({'Error': 'Dag Execution Failed'}) + return + + # Wait for 20 seconds before doing a new query + time.sleep( 20 ) + + logging.info('Dag Successfully Executed') + diff --git a/shipyard_airflow/control/api.py b/shipyard_airflow/control/api.py index 4a004aa8..891b6634 100644 --- a/shipyard_airflow/control/api.py +++ b/shipyard_airflow/control/api.py @@ -18,6 +18,16 @@ from .regions import RegionsResource, RegionResource from .base import ShipyardRequest, BaseResource from .tasks import TaskResource from .dag_runs import DagRunResource +from .airflow_get_task_status import GetTaskStatusResource +from .airflow_list_tasks import ListTasksResource +from .airflow_list_dags import ListDagsResource +from .airflow_dag_state import GetDagStateResource +from .airflow_trigger_dag import TriggerDagRunResource +from .airflow_trigger_dag_poll import TriggerDagRunPollResource +from .airflow_connections import AirflowAddConnectionResource +from .airflow_connections import AirflowDeleteConnectionResource +from .airflow_connections import AirflowListConnectionsResource +from .airflow_get_version import GetAirflowVersionResource from .middleware import AuthMiddleware, ContextMiddleware, LoggingMiddleware def start_api(): @@ -34,10 +44,20 @@ def start_api(): ('/regions/{region_id}', RegionResource()), ('/dags/{dag_id}/tasks/{task_id}', TaskResource()), ('/dags/{dag_id}/dag_runs', DagRunResource()), + ('/list_dags', ListDagsResource()), + ('/task_state/dags/{dag_id}/tasks/{task_id}/execution_date/{execution_date}', GetTaskStatusResource()), + ('/dag_state/dags/{dag_id}/execution_date/{execution_date}', GetDagStateResource()), + ('/list_tasks/dags/{dag_id}', ListTasksResource()), + ('/trigger_dag/dags/{dag_id}/run_id/{run_id}', TriggerDagRunResource()), + ('/trigger_dag/dags/{dag_id}/run_id/{run_id}/poll', TriggerDagRunPollResource()), + ('/connections/{action}/conn_id/{conn_id}/protocol/{protocol}/host/{host}/port/{port}', AirflowAddConnectionResource()), + ('/connections/{action}/conn_id/{conn_id}', AirflowDeleteConnectionResource()), + ('/connections/{action}', AirflowListConnectionsResource()), + ('/airflow/version', GetAirflowVersionResource()), ] for path, res in v1_0_routes: - control_api.add_route('/api/experimental' + path, res) + control_api.add_route('/api/v1.0' + path, res) return control_api diff --git a/shipyard_airflow/control/dag_runs.py b/shipyard_airflow/control/dag_runs.py index 7a462d3d..ee480634 100644 --- a/shipyard_airflow/control/dag_runs.py +++ b/shipyard_airflow/control/dag_runs.py @@ -23,12 +23,11 @@ class DagRunResource(BaseResource): def on_post(self, req, resp, dag_id, run_id=None, conf=None, execution_date=None): # Retrieve URL - web_server_url = self.retrieve_config('BASE', 'WEB_SERVER') + web_server_url = self.retrieve_config('base', 'web_server') if 'Error' in web_server_url: - resp.status = falcon.HTTP_400 - resp.body = json.dumps({'Error': 'Missing Configuration File'}) - return + resp.status = falcon.HTTP_500 + raise falcon.HTTPInternalServerError("Internal Server Error", "Missing Configuration File") else: req_url = '{}/api/experimental/dags/{}/dag_runs'.format(web_server_url, dag_id) diff --git a/shipyard_airflow/control/shipyard.conf b/shipyard_airflow/control/shipyard.conf index 6dadc48c..bb1e928d 100644 --- a/shipyard_airflow/control/shipyard.conf +++ b/shipyard_airflow/control/shipyard.conf @@ -1,3 +1,3 @@ -[BASE] -WEB_SERVER=http://localhost:32080 +[base] +web_server=http://localhost:32080 diff --git a/shipyard_airflow/control/tasks.py b/shipyard_airflow/control/tasks.py index 9cb2f6e8..e67b79e6 100644 --- a/shipyard_airflow/control/tasks.py +++ b/shipyard_airflow/control/tasks.py @@ -23,12 +23,11 @@ class TaskResource(BaseResource): def on_get(self, req, resp, dag_id, task_id): # Retrieve URL - web_server_url = self.retrieve_config('BASE', 'WEB_SERVER') + web_server_url = self.retrieve_config('base', 'web_server') if 'Error' in web_server_url: - resp.status = falcon.HTTP_400 - resp.body = json.dumps({'Error': 'Missing Configuration File'}) - return + resp.status = falcon.HTTP_500 + raise falcon.HTTPInternalServerError("Internal Server Error", "Missing Configuration File") else: req_url = '{}/api/experimental/dags/{}/tasks/{}'.format(web_server_url, dag_id, task_id) task_details = requests.get(req_url).json()