Merge "Update Shipyard API for Trigger Dag"

This commit is contained in:
Anthony Lin 2017-08-29 14:45:24 -04:00 committed by Gerrit Code Review
commit 0ddcf959a8
3 changed files with 45 additions and 4 deletions

View File

@@ -22,7 +22,7 @@ class TriggerDagRunResource(BaseResource):
authorized_roles = ['user']
def on_get(self, req, resp, dag_id, run_id):
def on_get(self, req, resp, dag_id, conf):
# Retrieve URL
web_server_url = self.retrieve_config('base', 'web_server')
@@ -31,8 +31,11 @@ class TriggerDagRunResource(BaseResource):
raise falcon.HTTPInternalServerError("Internal Server Error",
"Missing Configuration File")
else:
# "conf" - JSON string that gets pickled into the DagRun's
# conf attribute
req_url = ('{}/admin/rest_api/api?api=trigger_dag&dag_id={}'
'&run_id={}'.format(web_server_url, dag_id, run_id))
'&conf={}'.format(web_server_url, dag_id, conf))
response = requests.get(req_url).json()
# Returns error response if API call returns
@@ -42,7 +45,8 @@ class TriggerDagRunResource(BaseResource):
resp.body = response["output"]
return
else:
resp.status = falcon.HTTP_200
# Returns 201 if action is created successfully
resp.status = falcon.HTTP_201
# Return time of execution so that we can use
# it to query dag/task status

View File

@@ -56,7 +56,7 @@ def start_api():
('/dag_state/dags/{dag_id}/execution_date/{execution_date}',
GetDagStateResource()),
('/list_tasks/dags/{dag_id}', ListTasksResource()),
('/trigger_dag/dags/{dag_id}/run_id/{run_id}',
('/trigger_dag/dags/{dag_id}/conf/{conf}',
TriggerDagRunResource()),
('/trigger_dag/dags/{dag_id}/run_id/{run_id}/poll',
TriggerDagRunPollResource()),

View File

@@ -670,6 +670,43 @@ class REST_API(BaseView):
@staticmethod
def execute_cli_command(airflow_cmd_split):
    """Execute an Airflow CLI command given as a list of argv tokens.

    Re-assembles a ``--conf`` JSON argument that was split on whitespace
    before running the command, then runs it and returns the collected
    process output.

    :param airflow_cmd_split: the command and its arguments as a list,
        e.g. ``['airflow', 'trigger_dag', '--conf', '{"a":', '"b"}', 'dag_1']``
    :returns: whatever ``REST_API.collect_process_output`` produces for
        the finished process
    """
    logging.info("Executing CLI Command")
    # A JSON string passed via '--conf' arrives here already split on
    # whitespace into multiple tokens, which breaks the Airflow command.
    # Rejoin those tokens into a single argument. The command is expected
    # to follow this pattern:
    #
    #   airflow trigger_dag --conf '{"message": "Hello World"}' test_1
    #
    # where test_1 is the name of the DAG being triggered (always the
    # last token).
    #
    # Length guard prevents an IndexError on short argv lists.
    if len(airflow_cmd_split) > 3 and airflow_cmd_split[2] == '--conf':
        # NOTE(review): joining with single spaces cannot recover runs of
        # whitespace inside JSON string values — assumes conf values do
        # not contain consecutive spaces; confirm with callers.
        conf_json = " ".join(airflow_cmd_split[3:-1])
        airflow_cmd_split = (airflow_cmd_split[:3]
                             + [conf_json, airflow_cmd_split[-1]])
    logging.info(airflow_cmd_split)
    process = subprocess.Popen(airflow_cmd_split,
                               stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE)
    process.wait()
    return REST_API.collect_process_output(process)