
Merge "Update to Airflow 1.10.1 and restore sysout"

changes/78/624178/1
Zuul, 6 months ago
parent commit: 95cea0f54e

charts/shipyard/values.yaml (+39, -18)

@@ -448,8 +448,9 @@ conf:
       encrypt_s3_logs: "False"
       logging_level: "INFO"
       fab_logging_level: "WARN"
-      # TODO(bryan-strassner) Use this for custom log formatting!
-      logging_config_class: ""
+      # See image-bundled log_config.py.
+      # Adds console logging of task/step logs.
+      logging_config_class: log_config.LOGGING_CONFIG
       # NOTE: Airflow 1.10 introduces extra newline characters between log
       #     records. Version 1.10.1 should resolve this issue
       #     https://issues.apache.org/jira/browse/AIRFLOW-1917
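
A note on how the new logging_config_class value is consumed: Airflow resolves the dotted path at startup and applies the resulting dict with the standard library's dictConfig. The sketch below mirrors that flow in a simplified, hedged form (Airflow's own loader adds validation and fallbacks); it assumes log_config.py is importable, which the Dockerfile change further down arranges.

    import importlib
    import logging.config

    def load_logging_config(dotted_path):
        # "log_config.LOGGING_CONFIG" -> module "log_config", attribute "LOGGING_CONFIG"
        module_name, _, attr = dotted_path.rpartition('.')
        module = importlib.import_module(module_name)
        return getattr(module, attr)

    # Apply the dict-style config, roughly as Airflow does for logging_config_class.
    logging.config.dictConfig(load_logging_config('log_config.LOGGING_CONFIG'))
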
@@ -467,26 +468,29 @@ conf:
       simple_log_format: "%%(asctime)s %%(levelname)s - %%(message)s"
       log_filename_template: "{{ ti.dag_id }}/{{ ti.task_id }}/{{ execution_date.strftime('%%Y-%%m-%%dT%%H:%%M:%%S') }}/{{ try_number }}.log"
       log_processor_filename_template: "{{ filename }}.log"
+      dag_processor_manager_log_location: /usr/local/airflow/logs/dag_processor_manager/dag_processor_manager.log
       hostname_callable: "socket:getfqdn"
       default_timezone: "utc"
       executor: "CeleryExecutor"
       # sql_alchemy_conn is extracted from endpoints by the configmap template
+      sql_engine_encoding: "utf-8"
       sql_alchemy_pool_enabled: "True"
       sql_alchemy_pool_size: 5
       sql_alchemy_pool_recycle: 1800
       sql_alchemy_reconnect_timeout: 30
+      sql_alchemy_schema: ""
       parallelism: 32
-      dag_concurrency: 16
+      dag_concurrency: 8
       dags_are_paused_at_creation: "False"
       non_pooled_task_slot_count: 128
-      max_active_runs_per_dag: 16
+      max_active_runs_per_dag: 8
       load_examples: "False"
       plugins_folder: /usr/local/airflow/plugins
       fernet_key: fKp7omMJ4QlTxfZzVBSiyXVgeCK-6epRjGgMpEIsjvs=
       donot_pickle: "False"
       dagbag_import_timeout: 30
-      # NOTE: Versions after 1.10 will change this to StandardTaskRunner
       task_runner: "BashTaskRunner"
+      # task_runner: "StandardTaskRunner"  -- coming soon?
       default_impersonation: ""
       security: ""
       secure_mode: "True"
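
The doubled percent signs in simple_log_format and log_filename_template above are not typos: these values are rendered into airflow.cfg, which is read with ConfigParser interpolation, so a literal "%" must be written as "%%". A minimal illustration:

    from configparser import ConfigParser

    cfg = ConfigParser()
    cfg.read_string(
        "[core]\n"
        "simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s\n"
    )
    # Interpolation collapses "%%" back to a single "%":
    print(cfg.get('core', 'simple_log_format'))
    # -> %(asctime)s %(levelname)s - %(message)s
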
@@ -495,6 +499,7 @@ conf:
       enable_xcom_pickling: "False"
       killed_task_cleanup_time: 60
       dag_run_conf_overrides_params: "False"
+      worker_precheck: "False"
     cli:
       api_client: airflow.api.client.local_client
       # endpoint_url is extracted from endpoints by the configmap template
@@ -511,7 +516,7 @@ conf:
       username: ""
       password: ""
     operators:
-      default_owner: "Airflow"
+      default_owner: "airflow"
       default_cpus: 1
       default_ram: 512
       default_disk: 512
@@ -528,8 +533,8 @@ conf:
       web_server_master_timeout: 120
       web_server_worker_timeout: 120
       worker_refresh_batch_size: 1
-      worker_refresh_interval: 30
-      secret_key: "temporary_key"
+      worker_refresh_interval: 120
+      secret_key: "{SECRET_KEY}"
       workers: 4
       worker_class: "sync"
       access_logfile: "-"
@@ -541,12 +546,13 @@ conf:
       dag_default_view: "tree"
       dag_orientation: "LR"
       demo_mode: "False"
-      log_fetch_timeout_sec: 10
+      log_fetch_timeout_sec: 20
       hide_paused_dags_by_default: "False"
       page_size: 100
       rbac: "False"
       navbar_color: "#007A87"
       default_dag_run_display_number: 25
+      enable_proxy_fix: "False"
     email:
       # Shipyard is not using this
       email_backend: airflow.utils.send_email_smtp
@@ -561,14 +567,23 @@ conf:
       smtp_mail_from: airflow@airflow.local
     celery:
       celery_app_name: airflow.executors.celery_executor
-      worker_concurrency: 16
+      # The concurrency that will be used when starting workers with the
+      # "airflow worker" command. This defines the number of task instances
+      # that a worker will take, so size up your workers based on the resources
+      # on your worker box and the nature of your tasks
+      worker_concurrency: 4
       worker_log_server_port: 8793
       # broker_url is extracted from endpoints by the configmap template
       # result_backend is extracted from endpoints by the configmap template
       flower_host: 0.0.0.0
-      flower_url_prefix:
+      flower_url_prefix: ""
       flower_port: 5555
+      flower_basic_auth: ""
       default_queue: "default"
+      # How many processes CeleryExecutor uses to sync task state.
+      # 0 means to use max(1, number of cores - 1) processes.
+      # set to 1 for low-volume use
+      sync_parallelism: 1
       celery_config_options: airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG
       # TODO: Enable this for security
       ssl_active: "False"
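
A quick, hedged way to confirm the lowered Celery settings once the chart has rendered airflow.cfg (run inside the airflow container; it only relies on the module-level conf.get helper that log_config.py below also uses):

    from airflow import configuration as conf

    print(int(conf.get('celery', 'worker_concurrency')))  # expected: 4
    print(int(conf.get('celery', 'sync_parallelism')))    # expected: 1
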
@@ -584,14 +599,17 @@ conf:
       tls_cert: ""
       tls_key: ""
     scheduler:
-      job_heartbeat_sec: 5
-      scheduler_heartbeat_sec: 5
+      # Task instances listen for external kill signal (when you clear tasks
+      # from the CLI or the UI), this defines the frequency at which they
+      # should listen (in seconds).
+      job_heartbeat_sec: 10
+      # The scheduler constantly tries to trigger new tasks (look at the
+      # scheduler section in the docs for more information). This defines
+      # how often the scheduler should run (in seconds).
+      scheduler_heartbeat_sec: 10
       run_duration: -1
-      # Check for pending tasks no more than every 5 seconds
-      min_file_process_interval: 5
-      # This is part of 1.10, but disabled in 1.10.1 (pending) See:
-      # https://github.com/apache/incubator-airflow/blob/master/UPDATING.md#min_file_parsing_loop_time-config-option-temporarily-disabled
-      min_file_parsing_loop_time: 1
+      # Check for pending dag runs no more than every 10 seconds
+      min_file_process_interval: 10
       dag_dir_list_interval: 300
       # Stats for the scheduler are minimally useful - every 5 mins is enough
       print_stats_interval: 300
@@ -606,6 +624,9 @@ conf:
       # Shipyard's use of Airflow is low volume. 1 Thread is probably enough.
       max_threads: 1
       authenticate: "False"
+      # Turn off scheduler use of cron intervals by setting this to False.
+      # DAGs submitted manually in the web UI or with trigger_dag will still run.
+      use_job_schedule: "False"
     ldap:
       # Shipyard is not using this
       uri: ""
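
With use_job_schedule set to "False", dag runs are created only on demand. One hedged way to do that programmatically is through the local API client already configured in the cli section above (airflow.api.client.local_client); the dag id and conf below are placeholders, and the exact client signature may vary across 1.10.x releases:

    from airflow.api.client.local_client import Client

    client = Client(api_base_url=None, auth=None)
    client.trigger_dag(dag_id='example_dag', conf={'example_key': 'example_value'})
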

images/airflow/Dockerfile (+3, -1)

@@ -91,9 +91,11 @@ RUN pip3 install -r /tmp/requirements.txt \
     && pip3 uninstall -y snakebite || true
 
 # Copy scripts used in the container:
-#   entrypoint.sh, airflow_start_service.sh and airflow_logrotate.sh
 COPY images/airflow/script/*.sh ${AIRFLOW_HOME}/
 
+# Copy configuration (e.g. logging config for Airflow):
+COPY images/airflow/config/*.py ${AIRFLOW_HOME}/config/
+
 # Change permissions
 RUN chown -R airflow: ${AIRFLOW_HOME}
 

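A hedged sanity check for the new COPY step, runnable inside the built image (it assumes airflow and a rendered airflow.cfg are present, since importing log_config reads configuration at import time):

    import os
    import sys

    # The COPY above places log_config.py in ${AIRFLOW_HOME}/config/; add that
    # directory explicitly so the check does not depend on Airflow's own
    # sys.path handling. The /usr/local/airflow default matches the paths used
    # in values.yaml above.
    airflow_home = os.environ.get('AIRFLOW_HOME', '/usr/local/airflow')
    sys.path.insert(0, os.path.join(airflow_home, 'config'))

    import log_config  # noqa: E402
    assert hasattr(log_config, 'LOGGING_CONFIG')
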
images/airflow/config/__init__.py (+0, -0, new empty file)


images/airflow/config/log_config.py (+261, -0)

@@ -0,0 +1,261 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+# !Important Shipyard/Airflow usage note:
+#
+# This file is copied from:
+# https://github.com/apache/incubator-airflow/blob/master/airflow/config_templates/airflow_local_settings.py
+# as this is the recommended way to configure logging as of version 1.10.x
+#
+# See documentation here:
+# https://airflow.readthedocs.io/en/stable/howto/write-logs.html#writing-logs-to-azure-blob-storage
+#
+# (We are not using azure blob storage at this time, but the included
+# instructional steps are pertinent)
+#
+# Because this file is in the "plugins" directory, it should be referenced
+# in the Helm chart's values.yaml as config.log_config.LOGGING_CONFIG
+# as opposed to log_config.LOGGING_CONFIG in a new directory in the PYTHONPATH
+# as noted in the linked instructions.
+#
+
+import os
+
+from airflow import configuration as conf
+from airflow.utils.file import mkdirs
+
+# TODO: Logging format and level should be configured
+# in this file instead of from airflow.cfg. Currently
+# there are other log format and level configurations in
+# settings.py and cli.py. Please see AIRFLOW-1455.
+LOG_LEVEL = conf.get('core', 'LOGGING_LEVEL').upper()
+
+# Flask appbuilder's info level log is very verbose,
+# so it's set to 'WARN' by default.
+FAB_LOG_LEVEL = conf.get('core', 'FAB_LOGGING_LEVEL').upper()
+
+LOG_FORMAT = conf.get('core', 'LOG_FORMAT')
+
+BASE_LOG_FOLDER = conf.get('core', 'BASE_LOG_FOLDER')
+
+PROCESSOR_LOG_FOLDER = conf.get('scheduler', 'CHILD_PROCESS_LOG_DIRECTORY')
+
+DAG_PROCESSOR_MANAGER_LOG_LOCATION = \
+    conf.get('core', 'DAG_PROCESSOR_MANAGER_LOG_LOCATION')
+
+FILENAME_TEMPLATE = conf.get('core', 'LOG_FILENAME_TEMPLATE')
+
+PROCESSOR_FILENAME_TEMPLATE = conf.get('core',
+                                       'LOG_PROCESSOR_FILENAME_TEMPLATE')
+
+# Storage bucket url for remote logging
+# s3 buckets should start with "s3://"
+# gcs buckets should start with "gs://"
+# wasb buckets should start with "wasb"
+# just to help Airflow select correct handler
+REMOTE_BASE_LOG_FOLDER = conf.get('core', 'REMOTE_BASE_LOG_FOLDER')
+
+ELASTICSEARCH_HOST = conf.get('elasticsearch', 'ELASTICSEARCH_HOST')
+
+LOG_ID_TEMPLATE = conf.get('elasticsearch', 'ELASTICSEARCH_LOG_ID_TEMPLATE')
+
+END_OF_LOG_MARK = conf.get('elasticsearch', 'ELASTICSEARCH_END_OF_LOG_MARK')
+
+# NOTE: Modified for use by Shipyard/Airflow (rename to LOGGING_CONFIG):
+LOGGING_CONFIG = {
+    'version': 1,
+    'disable_existing_loggers': False,
+    'formatters': {
+        'airflow': {
+            'format': LOG_FORMAT,
+        },
+    },
+    'handlers': {
+        # NOTE: Add a "raw" python console logger. Using 'console' results
+        #       in a state of recursion.
+        'py-console': {
+            'class': 'logging.StreamHandler',
+            'formatter': 'airflow',
+            'stream': 'ext://sys.stdout'
+        },
+        'console': {
+            'class': 'airflow.utils.log.logging_mixin.RedirectStdHandler',
+            'formatter': 'airflow',
+            'stream': 'sys.stdout'
+        },
+        'task': {
+            'class': 'airflow.utils.log.file_task_handler.FileTaskHandler',
+            'formatter': 'airflow',
+            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
+            'filename_template': FILENAME_TEMPLATE,
+        },
+        'processor': {
+            'class':
+            'airflow.utils.log.file_processor_handler.FileProcessorHandler',
+            'formatter':
+            'airflow',
+            'base_log_folder':
+            os.path.expanduser(PROCESSOR_LOG_FOLDER),
+            'filename_template':
+            PROCESSOR_FILENAME_TEMPLATE,
+        }
+    },
+    'loggers': {
+        'airflow.processor': {
+            'handlers': ['processor'],
+            'level': LOG_LEVEL,
+            'propagate': False,
+        },
+        'airflow.task': {
+            # NOTE: Modified for use by Shipyard/Airflow (add console logging)
+            #       The supplied console logger cannot be used here, as it
+            #       Leads to out-of-control memory usage
+            'handlers': ['task', 'py-console'],
+            'level': LOG_LEVEL,
+            'propagate': False,
+        },
+        'flask_appbuilder': {
+             # NOTE: Modified this to be "handlers"
+            'handlers': ['console'],
+            'level': FAB_LOG_LEVEL,
+            'propagate': True,
+        }
+    },
+    'root': {
+        'handlers': ['console'],
+        'level': LOG_LEVEL,
+    }
+}
+
+DEFAULT_DAG_PARSING_LOGGING_CONFIG = {
+    'handlers': {
+        'processor_manager': {
+            'class': 'logging.handlers.RotatingFileHandler',
+            'formatter': 'airflow',
+            'filename': DAG_PROCESSOR_MANAGER_LOG_LOCATION,
+            'mode': 'a',
+            'maxBytes': 104857600,  # 100MB
+            'backupCount': 5
+        }
+    },
+    'loggers': {
+        'airflow.processor_manager': {
+            'handlers': ['processor_manager'],
+            'level': LOG_LEVEL,
+            'propagate': False,
+        }
+    }
+}
+
+REMOTE_HANDLERS = {
+    's3': {
+        'task': {
+            'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
+            'formatter': 'airflow',
+            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
+            's3_log_folder': REMOTE_BASE_LOG_FOLDER,
+            'filename_template': FILENAME_TEMPLATE,
+        },
+        'processor': {
+            'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler',
+            'formatter': 'airflow',
+            'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
+            's3_log_folder': REMOTE_BASE_LOG_FOLDER,
+            'filename_template': PROCESSOR_FILENAME_TEMPLATE,
+        },
+    },
+    'gcs': {
+        'task': {
+            'class': 'airflow.utils.log.gcs_task_handler.GCSTaskHandler',
+            'formatter': 'airflow',
+            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
+            'gcs_log_folder': REMOTE_BASE_LOG_FOLDER,
+            'filename_template': FILENAME_TEMPLATE,
+        },
+        'processor': {
+            'class': 'airflow.utils.log.gcs_task_handler.GCSTaskHandler',
+            'formatter': 'airflow',
+            'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
+            'gcs_log_folder': REMOTE_BASE_LOG_FOLDER,
+            'filename_template': PROCESSOR_FILENAME_TEMPLATE,
+        },
+    },
+    'wasb': {
+        'task': {
+            'class': 'airflow.utils.log.wasb_task_handler.WasbTaskHandler',
+            'formatter': 'airflow',
+            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
+            'wasb_log_folder': REMOTE_BASE_LOG_FOLDER,
+            'wasb_container': 'airflow-logs',
+            'filename_template': FILENAME_TEMPLATE,
+            'delete_local_copy': False,
+        },
+        'processor': {
+            'class': 'airflow.utils.log.wasb_task_handler.WasbTaskHandler',
+            'formatter': 'airflow',
+            'base_log_folder': os.path.expanduser(PROCESSOR_LOG_FOLDER),
+            'wasb_log_folder': REMOTE_BASE_LOG_FOLDER,
+            'wasb_container': 'airflow-logs',
+            'filename_template': PROCESSOR_FILENAME_TEMPLATE,
+            'delete_local_copy': False,
+        },
+    },
+    'elasticsearch': {
+        'task': {
+            'class':
+            'airflow.utils.log.es_task_handler.ElasticsearchTaskHandler',
+            'formatter': 'airflow',
+            'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER),
+            'log_id_template': LOG_ID_TEMPLATE,
+            'filename_template': FILENAME_TEMPLATE,
+            'end_of_log_mark': END_OF_LOG_MARK,
+            'host': ELASTICSEARCH_HOST,
+        },
+    },
+}
+
+# NOTE: Modified for use by Shipyard/Airflow to "getboolean" as existing
+# code of conf.get would evaluate "False" as true.
+REMOTE_LOGGING = conf.getboolean('core', 'remote_logging')
+
+# Only update the handlers and loggers when CONFIG_PROCESSOR_MANAGER_LOGGER is
+# set.
+# This is to avoid exceptions when initializing RotatingFileHandler multiple
+# times in multiple processes.
+if os.environ.get('CONFIG_PROCESSOR_MANAGER_LOGGER') == 'True':
+    LOGGING_CONFIG['handlers'] \
+        .update(DEFAULT_DAG_PARSING_LOGGING_CONFIG['handlers'])
+    LOGGING_CONFIG['loggers'] \
+        .update(DEFAULT_DAG_PARSING_LOGGING_CONFIG['loggers'])
+
+    # Manually create log directory for processor_manager handler as
+    # RotatingFileHandler will only create file but not the directory.
+    processor_manager_handler_config = DEFAULT_DAG_PARSING_LOGGING_CONFIG[
+        'handlers']['processor_manager']
+    directory = os.path.dirname(processor_manager_handler_config['filename'])
+    mkdirs(directory, 0o755)
+
+if REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('s3://'):
+    LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['s3'])
+elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('gs://'):
+    LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['gcs'])
+elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('wasb'):
+    LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['wasb'])
+elif REMOTE_LOGGING and ELASTICSEARCH_HOST:
+    LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['elasticsearch'])
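
The net effect of the Shipyard modifications above is that records sent to the 'airflow.task' logger go both to the per-try log file and to the container's stdout via the plain StreamHandler ('py-console'). A standalone sketch of that shape, with names only loosely mirroring the real config:

    import logging
    import logging.config

    logging.config.dictConfig({
        'version': 1,
        'disable_existing_loggers': False,
        'formatters': {
            'airflow': {'format': '%(asctime)s %(levelname)s - %(message)s'},
        },
        'handlers': {
            # Plain stdout handler, analogous to 'py-console' above.
            'py-console': {
                'class': 'logging.StreamHandler',
                'formatter': 'airflow',
                'stream': 'ext://sys.stdout',
            },
        },
        'loggers': {
            'airflow.task': {
                'handlers': ['py-console'],
                'level': 'INFO',
                'propagate': False,
            },
        },
    })

    logging.getLogger('airflow.task').info('visible on stdout as well as in the task log')
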

images/airflow/requirements.txt (+1, -1)

@@ -18,7 +18,7 @@ ndg-httpsclient==0.5.1
 pyasn1==0.4.4
 psycopg2==2.7.5
 docker==3.5.0
-apache-airflow[crypto,celery,postgres,hive,hdfs,jdbc]==1.10.0
+apache-airflow[crypto,celery,postgres,hive,hdfs,jdbc]==1.10.1
 python-openstackclient==3.16.1
 kubernetes>=6.0.0
 
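
Once the image is rebuilt with the pinned 1.10.1 release, a trivial hedged check that the bump took effect:

    import airflow

    assert airflow.__version__ == '1.10.1'
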

src/bin/shipyard_airflow/test-requirements.txt (+1, -1)

@@ -3,7 +3,7 @@ pytest==3.4
 pytest-cov==2.5.1
 responses==0.8.1
 testfixtures==5.1.1
-apache-airflow[crypto,celery,postgres,hive,hdfs,jdbc]==1.10.0
+apache-airflow[crypto,celery,postgres,hive,hdfs,jdbc]==1.10.1
 
 # TODO(bryan-strassner) Pin to version for airflow when added to the
 # requirements.txt in the airflow images directory
