d6c72d19e6
Update to Airflow 1.10.1; see (1), (2) for release notes.

Related, and additionally: configures Airflow to restore logging of workflow steps to a console/stdout logger, supporting the desired ability to attach logging and monitoring to standard container mechanisms. This does not change the behavior of also logging to the Airflow-arranged log files for steps and DAG runs.

A side effect of updating to 1.10.1 is a major decrease in resource usage by the Airflow scheduler process (from roughly one core fully consumed to less than 5% of a core; YMMV, but significant). Additional downward adjustment of resources allocated, threads produced, and polling frequency leads to a further significant reduction in overall resource usage.

Airship note: because Airflow 1.10.0 and 1.10.1 use compatible versions of celery and dag_run information, updating from 1.10.0 to 1.10.1 in place is possible if airflow-worker pods are allowed to continue to run.

(1) https://github.com/apache/incubator-airflow/blob/master/UPDATING.md
(2) https://github.com/apache/incubator-airflow/releases/tag/1.10.1

Change-Id: I9b024e3996c528c7b74e2888191d48c7a45a1f04
262 lines
9.6 KiB
Python
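A note on the scheduler tuning mentioned in the commit message: those knobs live in the chart's airflow.cfg overrides, not in this file. An illustrative excerpt (the option names are real Airflow 1.10.x [scheduler] settings; the values shown are hypothetical, not the ones shipped):

    [scheduler]
    # fewer DAG-parsing threads held by the scheduler
    max_threads = 1
    # poll individual files and the DAGs folder less often
    min_file_process_interval = 10
    dag_dir_list_interval = 300

The file below carries the logging side of the change.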
# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#
# !Important Shipyard/Airflow usage note:
#
# This file is copied from:
# https://github.com/apache/incubator-airflow/blob/master/airflow/config_templates/airflow_local_settings.py
# as this is the recommended way to configure logging as of version 1.10.x
#
# See documentation here:
# https://airflow.readthedocs.io/en/stable/howto/write-logs.html#writing-logs-to-azure-blob-storage
#
# (We are not using azure blob storage at this time, but the included
# instructional steps are pertinent)
#
# Because this file is in the "plugins" directory, it should be referenced
# in the Helm chart's values.yaml as config.log_config.LOGGING_CONFIG
# as opposed to log_config.LOGGING_CONFIG in a new directory in the PYTHONPATH
# as noted in the linked instructions.
#
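# For reference: Airflow 1.10.x activates a custom logging configuration by
# reading the [core] logging_config_class option from airflow.cfg and
# applying the referenced dict with the standard library's
# logging.config.dictConfig(). With this module on the plugins path, that
# option points at the LOGGING_CONFIG dict defined below (illustrative):
#
#   [core]
#   logging_config_class = log_config.LOGGING_CONFIG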
import os

from airflow import configuration as conf
from airflow.utils.file import mkdirs

# TODO: Logging format and level should be configured
# in this file instead of from airflow.cfg. Currently
# there are other log format and level configurations in
# settings.py and cli.py. Please see AIRFLOW-1455.
LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper()

# Flask appbuilder's info level log is very verbose,
# so it's set to 'WARN' by default.
FAB_LOG_LEVEL = conf.get('core', 'FAB_LOGGING_LEVEL').upper()

LOG_FORMAT = conf.get('core', 'LOG_FORMAT')

BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')

PROCESSOR_LOG_FOLDER = conf.get('scheduler', 'CHILD_PROCESS_LOG_DIRECTORY')

DAG_PROCESSOR_MANAGER_LOG_LOCATION = \
    conf.get('core', 'DAG_PROCESSOR_MANAGER_LOG_LOCATION')

FILENAME_TEMPLATE = conf.get('core', 'LOG_FILENAME_TEMPLATE')
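# For reference, the upstream default for LOG_FILENAME_TEMPLATE renders
# per-try task log paths of the form below (the default at the time of this
# copy; it may differ if overridden in airflow.cfg):
#   {{ ti.dag_id }}/{{ ti.task_id }}/{{ ts }}/{{ try_number }}.log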

PROCESSOR_FILENAME_TEMPLATE = conf.get('core',
                                       'LOG_PROCESSOR_FILENAME_TEMPLATE')

# Storage bucket URL for remote logging
# s3 buckets should start with "s3://"
# gcs buckets should start with "gs://"
# wasb buckets should start with "wasb"
# just to help Airflow select correct handler
REMOTE_BASE_LOG_FOLDER = conf.get('core', 'REMOTE_BASE_LOG_FOLDER')

ELASTICSEARCH_HOST = conf.get('elasticsearch', 'ELASTICSEARCH_HOST')

LOG_ID_TEMPLATE = conf.get('elasticsearch', 'ELASTICSEARCH_LOG_ID_TEMPLATE')

END_OF_LOG_MARK = conf.get('elasticsearch', 'ELASTICSEARCH_END_OF_LOG_MARK')

# NOTE: Modified for use by Shipyard/Airflow (rename to LOGGING_CONFIG):
LOGGING_CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'airflow': {
            'format': LOG_FORMAT,
        },
    },
    'handlers': {
        # NOTE: Add a "raw" python console logger. Using 'console' results
        # in a state of recursion.
        'py-console': {
            'class': 'logging.StreamHandler',
            'formatter': 'airflow',
            'stream': 'ext://sys.stdout'
        },
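        # The 'ext://sys.stdout' value above is a standard dictConfig
        # external reference: logging.config resolves it to the actual
        # sys.stdout object at configuration time, so task log lines reach
        # the container's stdout and any log collector attached to it.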
        'console': {
            'class': 'airflow.utils.log.logging_mixin.RedirectStdHandler',
            'formatter': 'airflow',
            'stream': 'sys.stdout'
        },
        'task': {
            'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
            'formatter': 'airflow',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            'filename_template': FILENAME_TEMPLATE,
        },
        'processor': {
            'class':
                'airflow.utils.log.file_processor_handler.FileProcessorHandler',
            'formatter':
                'airflow',
            'base_log_folder':
                os.path.expanduser(PROCESSOR_LOG_FOLDER),
            'filename_template':
                PROCESSOR_FILENAME_TEMPLATE,
        }
    },
    'loggers': {
        'airflow.processor': {
            'handlers': ['processor'],
            'level': LOG_LEVEL,
            'propagate': False,
        },
        'airflow.task': {
            # NOTE: Modified for use by Shipyard/Airflow (add console logging)
            # The supplied console logger cannot be used here, as it
            # leads to out-of-control memory usage
            'handlers': ['task', 'py-console'],
            'level': LOG_LEVEL,
            'propagate': False,
        },
        'flask_appbuilder': {
            # NOTE: Modified this to be "handlers"
            'handlers': ['console'],
            'level': FAB_LOG_LEVEL,
            'propagate': True,
        }
    },
    'root': {
        'handlers': ['console'],
        'level': LOG_LEVEL,
    }
}

DEFAULT_DAG_PARSING_LOGGING_CONFIG = {
    'handlers': {
        'processor_manager': {
            'class': 'logging.handlers.RotatingFileHandler',
            'formatter': 'airflow',
            'filename': DAG_PROCESSOR_MANAGER_LOG_LOCATION,
            'mode': 'a',
            'maxBytes': 104857600,  # 100MB
            'backupCount': 5
        }
    },
    'loggers': {
        'airflow.processor_manager': {
            'handlers': ['processor_manager'],
            'level': LOG_LEVEL,
            'propagate': False,
        }
    }
}
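# With maxBytes=104857600 (100 MiB) and backupCount=5, the processor manager
# log is bounded to roughly 600 MiB on disk: the active file plus up to five
# rotated copies.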

REMOTE_HANDLERS = {
    's3': {
        'task': {
            'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
            'formatter': 'airflow',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            's3_log_folder': REMOTE_BASE_LOG_FOLDER,
            'filename_template': FILENAME_TEMPLATE,
        },
        'processor': {
            'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
            'formatter': 'airflow',
            'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
            's3_log_folder': REMOTE_BASE_LOG_FOLDER,
            'filename_template': PROCESSOR_FILENAME_TEMPLATE,
        },
    },
    'gcs': {
        'task': {
            'class': 'airflow.utils.log.gcs_task_handler.GCSTaskHandler',
            'formatter': 'airflow',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            'gcs_log_folder': REMOTE_BASE_LOG_FOLDER,
            'filename_template': FILENAME_TEMPLATE,
        },
        'processor': {
            'class': 'airflow.utils.log.gcs_task_handler.GCSTaskHandler',
            'formatter': 'airflow',
            'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
            'gcs_log_folder': REMOTE_BASE_LOG_FOLDER,
            'filename_template': PROCESSOR_FILENAME_TEMPLATE,
        },
    },
    'wasb': {
        'task': {
            'class': 'airflow.utils.log.wasb_task_handler.WasbTaskHandler',
            'formatter': 'airflow',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            'wasb_log_folder': REMOTE_BASE_LOG_FOLDER,
            'wasb_container': 'airflow-logs',
            'filename_template': FILENAME_TEMPLATE,
            'delete_local_copy': False,
        },
        'processor': {
            'class': 'airflow.utils.log.wasb_task_handler.WasbTaskHandler',
            'formatter': 'airflow',
            'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
            'wasb_log_folder': REMOTE_BASE_LOG_FOLDER,
            'wasb_container': 'airflow-logs',
            'filename_template': PROCESSOR_FILENAME_TEMPLATE,
            'delete_local_copy': False,
        },
    },
    'elasticsearch': {
        'task': {
            'class':
                'airflow.utils.log.es_task_handler.ElasticsearchTaskHandler',
            'formatter': 'airflow',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            'log_id_template': LOG_ID_TEMPLATE,
            'filename_template': FILENAME_TEMPLATE,
            'end_of_log_mark': END_OF_LOG_MARK,
            'host': ELASTICSEARCH_HOST,
        },
    },
}

# NOTE: Modified for use by Shipyard/Airflow to use "getboolean", as the
# existing conf.get call would evaluate the string "False" as truthy.
REMOTE_LOGGING = conf.getboolean('core', 'remote_logging')
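# Illustration of the pitfall: conf.get('core', 'remote_logging') returns
# the string 'False' when remote logging is disabled, and bool('False') is
# True because any non-empty string is truthy; conf.getboolean parses the
# text into a genuine boolean instead.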

# Only update the handlers and loggers when CONFIG_PROCESSOR_MANAGER_LOGGER
# is set.
# This is to avoid exceptions when initializing RotatingFileHandler multiple
# times in multiple processes.
if os.environ.get('CONFIG_PROCESSOR_MANAGER_LOGGER') == 'True':
    LOGGING_CONFIG['handlers'] \
        .update(DEFAULT_DAG_PARSING_LOGGING_CONFIG['handlers'])
    LOGGING_CONFIG['loggers'] \
        .update(DEFAULT_DAG_PARSING_LOGGING_CONFIG['loggers'])

    # Manually create the log directory for the processor_manager handler,
    # as RotatingFileHandler will only create the file, not the directory.
    processor_manager_handler_config = DEFAULT_DAG_PARSING_LOGGING_CONFIG[
        'handlers']['processor_manager']
    directory = os.path.dirname(processor_manager_handler_config['filename'])
    mkdirs(directory, 0o755)

if REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('s3://'):
    LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['s3'])
elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('gs://'):
    LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['gcs'])
elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('wasb'):
    LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['wasb'])
elif REMOTE_LOGGING and ELASTICSEARCH_HOST:
    LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['elasticsearch'])
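# Example of the effect (hypothetical value): with REMOTE_BASE_LOG_FOLDER
# set to 's3://my-bucket/airflow-logs', the 'task' and 'processor' handlers
# defined in LOGGING_CONFIG above are replaced by the S3TaskHandler entries
# from REMOTE_HANDLERS['s3'], while the console handlers are left untouched.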