Update Logging to List

This commit is contained in:
eanylin 2017-07-03 09:27:16 -05:00
parent ed799e5a01
commit bff471f027
2 changed files with 7 additions and 5 deletions

View File

@ -44,8 +44,7 @@ class TaskStateOperator(BaseOperator):
self.airflow_command = ['airflow', 'task_state', airflow_dag_id, airflow_task_id, airflow_execution_date]
def execute(self, context):
logging.info("Running Airflow Command: " + ' '.join(self.airflow_command))
logging.info("Running Airflow Command: %s", self.airflow_command)
# Execute Airflow CLI Command
airflow_cli = subprocess.Popen(self.airflow_command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)

View File

@ -41,16 +41,17 @@ class OpenStackOperator(BaseOperator):
self.xcom_push_flag = xcom_push
def execute(self, context):
logging.info("Running OpenStack Command: " + ' '.join(self.openstack_command))
logging.info("Running OpenStack Command: %s", self.openstack_command)
# Build environment variables.
# Emulate "source" in bash. Sets up environment variables.
pipe = subprocess.Popen(". %s; env" % self.openrc_file, stdout=subprocess.PIPE, shell=True)
data = pipe.communicate()[0]
os_env = dict((line.split("=", 1) for line in data.splitlines()))
# Execute the OpenStack CLI Command
openstack_cli = subprocess.Popen(self.openstack_command, env=os_env, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
# Logs Output
logging.info("Output:")
@ -59,11 +60,13 @@ class OpenStackOperator(BaseOperator):
line = line.strip()
logging.info(line)
# Wait for child process to terminate. Set and return returncode attribute.
openstack_cli.wait()
logging.info("Command exited with "
"return code {0}".format(openstack_cli.returncode))
# Raise Execptions if OpenStack Command Fails
if openstack_cli.returncode:
raise AirflowException("OpenStack Command Failed")