# -*- coding: utf-8 -*-
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# !Important Shipyard/Airflow usage note:
#
# This file is copied from:
# https://github.com/apache/incubator-airflow/blob/master/airflow/config_templates/airflow_local_settings.py
# as this is the recommended way to configure logging as of version 1.10.x
#
# See documentation here:
# https://airflow.readthedocs.io/en/stable/howto/write-logs.html#writing-logs-to-azure-blob-storage
#
# (We are not using Azure blob storage at this time, but the included
# instructional steps are pertinent.)
#
# Because this file is in the "plugins" directory, it should be referenced
# in the Helm chart's values.yaml as config.log_config.LOGGING_CONFIG,
# as opposed to log_config.LOGGING_CONFIG in a new directory in the
# PYTHONPATH, as noted in the linked instructions.
#
import os

from airflow import configuration as conf
from airflow.utils.file import mkdirs

# TODO: Logging format and level should be configured
# in this file instead of from airflow.cfg. Currently
# there are other log format and level configurations in
# settings.py and cli.py. Please see AIRFLOW-1455.
LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper()

# Flask appbuilder's info-level log is very verbose,
# so it's set to 'WARN' by default.
FAB_LOG_LEVEL = conf.get('core', 'FAB_LOGGING_LEVEL').upper()

LOG_FORMAT = conf.get('core', 'LOG_FORMAT')

BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')

PROCESSOR_LOG_FOLDER = conf.get('scheduler', 'CHILD_PROCESS_LOG_DIRECTORY')

DAG_PROCESSOR_MANAGER_LOG_LOCATION = \
    conf.get('core', 'DAG_PROCESSOR_MANAGER_LOG_LOCATION')

FILENAME_TEMPLATE = conf.get('core', 'LOG_FILENAME_TEMPLATE')

PROCESSOR_FILENAME_TEMPLATE = conf.get('core',
                                       'LOG_PROCESSOR_FILENAME_TEMPLATE')

# Storage bucket URL for remote logging:
# s3 buckets should start with "s3://",
# gcs buckets should start with "gs://",
# wasb buckets should start with "wasb"
# (the prefix is just to help Airflow select the correct handler).
REMOTE_BASE_LOG_FOLDER = conf.get('core', 'REMOTE_BASE_LOG_FOLDER')

ELASTICSEARCH_HOST = conf.get('elasticsearch', 'ELASTICSEARCH_HOST')

LOG_ID_TEMPLATE = conf.get('elasticsearch', 'ELASTICSEARCH_LOG_ID_TEMPLATE')

END_OF_LOG_MARK = conf.get('elasticsearch', 'ELASTICSEARCH_END_OF_LOG_MARK')
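# For orientation (illustrative only, not part of the copied template):
# Airflow locates this dictionary through the [core] logging_config_class
# option of airflow.cfg. Assuming the Helm chart renders that file, the
# resulting entry would look something like:
#
#   [core]
#   logging_config_class = config.log_config.LOGGING_CONFIG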
# NOTE: Modified for use by Shipyard/Airflow (rename to LOGGING_CONFIG):
LOGGING_CONFIG = {
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'airflow': {
            'format': LOG_FORMAT,
        },
    },
    'handlers': {
        # NOTE: Add a "raw" Python console logger. Using 'console' here
        # results in a state of recursion.
        'py-console': {
            'class': 'logging.StreamHandler',
            'formatter': 'airflow',
            'stream': 'ext://sys.stdout'
        },
        'console': {
            'class': 'airflow.utils.log.logging_mixin.RedirectStdHandler',
            'formatter': 'airflow',
            'stream': 'sys.stdout'
        },
        'task': {
            'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
            'formatter': 'airflow',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            'filename_template': FILENAME_TEMPLATE,
        },
        'processor': {
            'class': 'airflow.utils.log.file_processor_handler.FileProcessorHandler',
            'formatter': 'airflow',
            'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
            'filename_template': PROCESSOR_FILENAME_TEMPLATE,
        }
    },
    'loggers': {
        'airflow.processor': {
            'handlers': ['processor'],
            'level': LOG_LEVEL,
            'propagate': False,
        },
        'airflow.task': {
            # NOTE: Modified for use by Shipyard/Airflow (add console
            # logging). The supplied 'console' logger cannot be used here,
            # as it leads to out-of-control memory usage.
            'handlers': ['task', 'py-console'],
            'level': LOG_LEVEL,
            'propagate': False,
        },
        'flask_appbuilder': {
            # NOTE: Modified this to be "handlers"
            'handlers': ['console'],
            'level': FAB_LOG_LEVEL,
            'propagate': True,
        }
    },
    'root': {
        'handlers': ['console'],
        'level': LOG_LEVEL,
    }
}

DEFAULT_DAG_PARSING_LOGGING_CONFIG = {
    'handlers': {
        'processor_manager': {
            'class': 'logging.handlers.RotatingFileHandler',
            'formatter': 'airflow',
            'filename': DAG_PROCESSOR_MANAGER_LOG_LOCATION,
            'mode': 'a',
            'maxBytes': 104857600,  # 100 MB
            'backupCount': 5
        }
    },
    'loggers': {
        'airflow.processor_manager': {
            'handlers': ['processor_manager'],
            'level': LOG_LEVEL,
            'propagate': False,
        }
    }
}
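# Sizing note (illustrative arithmetic, not a setting read from Airflow):
# with 'maxBytes' of 104857600 (100 MB) and 'backupCount' of 5 above, the
# processor_manager log is bounded at roughly 600 MB on disk: one active
# file plus up to five rotated backups.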
REMOTE_HANDLERS = {
    's3': {
        'task': {
            'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
            'formatter': 'airflow',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            's3_log_folder': REMOTE_BASE_LOG_FOLDER,
            'filename_template': FILENAME_TEMPLATE,
        },
        'processor': {
            'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
            'formatter': 'airflow',
            'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
            's3_log_folder': REMOTE_BASE_LOG_FOLDER,
            'filename_template': PROCESSOR_FILENAME_TEMPLATE,
        },
    },
    'gcs': {
        'task': {
            'class': 'airflow.utils.log.gcs_task_handler.GCSTaskHandler',
            'formatter': 'airflow',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            'gcs_log_folder': REMOTE_BASE_LOG_FOLDER,
            'filename_template': FILENAME_TEMPLATE,
        },
        'processor': {
            'class': 'airflow.utils.log.gcs_task_handler.GCSTaskHandler',
            'formatter': 'airflow',
            'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
            'gcs_log_folder': REMOTE_BASE_LOG_FOLDER,
            'filename_template': PROCESSOR_FILENAME_TEMPLATE,
        },
    },
    'wasb': {
        'task': {
            'class': 'airflow.utils.log.wasb_task_handler.WasbTaskHandler',
            'formatter': 'airflow',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            'wasb_log_folder': REMOTE_BASE_LOG_FOLDER,
            'wasb_container': 'airflow-logs',
            'filename_template': FILENAME_TEMPLATE,
            'delete_local_copy': False,
        },
        'processor': {
            'class': 'airflow.utils.log.wasb_task_handler.WasbTaskHandler',
            'formatter': 'airflow',
            'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
            'wasb_log_folder': REMOTE_BASE_LOG_FOLDER,
            'wasb_container': 'airflow-logs',
            'filename_template': PROCESSOR_FILENAME_TEMPLATE,
            'delete_local_copy': False,
        },
    },
    'elasticsearch': {
        'task': {
            'class': 'airflow.utils.log.es_task_handler.ElasticsearchTaskHandler',
            'formatter': 'airflow',
            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
            'log_id_template': LOG_ID_TEMPLATE,
            'filename_template': FILENAME_TEMPLATE,
            'end_of_log_mark': END_OF_LOG_MARK,
            'host': ELASTICSEARCH_HOST,
        },
    },
}

# NOTE: Modified for use by Shipyard/Airflow to use "getboolean", as the
# previous conf.get call would evaluate the string "False" as truthy.
REMOTE_LOGGING = conf.getboolean('core', 'remote_logging')

# Only update the handlers and loggers when CONFIG_PROCESSOR_MANAGER_LOGGER
# is set. This is to avoid exceptions when initializing RotatingFileHandler
# multiple times in multiple processes.
if os.environ.get('CONFIG_PROCESSOR_MANAGER_LOGGER') == 'True':
    LOGGING_CONFIG['handlers'] \
        .update(DEFAULT_DAG_PARSING_LOGGING_CONFIG['handlers'])
    LOGGING_CONFIG['loggers'] \
        .update(DEFAULT_DAG_PARSING_LOGGING_CONFIG['loggers'])

    # Manually create the log directory for the processor_manager handler,
    # as RotatingFileHandler will only create the file, not the directory.
    processor_manager_handler_config = DEFAULT_DAG_PARSING_LOGGING_CONFIG[
        'handlers']['processor_manager']
    directory = os.path.dirname(processor_manager_handler_config['filename'])
    mkdirs(directory, 0o755)

if REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('s3://'):
    LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['s3'])
elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('gs://'):
    LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['gcs'])
elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('wasb'):
    LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['wasb'])
elif REMOTE_LOGGING and ELASTICSEARCH_HOST:
    LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['elasticsearch'])
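# Minimal sanity check (a sketch, not part of the upstream template): when
# this module is executed directly inside an Airflow environment, confirm
# that the stdlib logging machinery accepts the assembled dictionary. The
# logger name 'shipyard.log_config_check' is illustrative only.
if __name__ == '__main__':
    import logging
    import logging.config

    # dictConfig instantiates every handler above, so this will surface
    # typos in class paths or handler arguments immediately.
    logging.config.dictConfig(LOGGING_CONFIG)
    logging.getLogger('shipyard.log_config_check').info(
        'LOGGING_CONFIG loaded and applied successfully')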