Update Error Handling Logic for XCOM Response

This commit is contained in:
eanylin 2017-06-26 17:08:33 +00:00
parent 848e44f7b0
commit e8ea60273e
1 changed files with 8 additions and 5 deletions

View File

@ -61,18 +61,21 @@ class TaskStateOperator(BaseOperator):
pass
else:
logging.info(line)
task_state = line
# Wait for child process to terminate. Set and return returncode attribute.
airflow_cli.wait()
# Raise Execptions if Task State Command Fails
if airflow_cli.returncode:
raise AirflowException("Failed to Retrieve Task State")
# Return XCOM State
task_instance = context['task_instance']
task_instance.xcom_push('task_state', task_state)
if airflow_cli.returncode:
task_state = 'failed'
task_instance.xcom_push('task_state', task_state)
raise AirflowException("Failed to Retrieve Task State")
else:
task_state = line
task_instance.xcom_push('task_state', task_state)
class TaskStatePlugin(AirflowPlugin):