Airflow fix
Change-Id: I3f6d3961b1511e3b7cd2f7aab9810d033cfc14a3
This commit is contained in:
parent
1ba0e72628
commit
633870f9a7
|
@ -120,6 +120,10 @@ AUTHORS
|
|||
# vscode
|
||||
.vscode/
|
||||
|
||||
|
||||
# pycharm
|
||||
.idea
|
||||
|
||||
# tests
|
||||
airship-ucp-shipyard.values.yaml
|
||||
airflow-webserver.pid
|
||||
|
@ -128,3 +132,4 @@ airflow.db
|
|||
latest
|
||||
src/bin/shipyard_airflow/shipyard_airflow/config
|
||||
src/bin/shipyard_airflow/shipyard_airflow/webserver_config.py
|
||||
airflow-runtime
|
||||
|
|
|
@ -476,8 +476,8 @@ conf:
|
|||
- broker_url
|
||||
- result_backend
|
||||
- fernet_key
|
||||
lazy_discover_providers: "False"
|
||||
lazy_load_plugins: "False"
|
||||
lazy_discover_providers: "True"
|
||||
lazy_load_plugins: "True"
|
||||
hostname_callable: "socket:getfqdn"
|
||||
default_timezone: "utc"
|
||||
executor: "CeleryExecutor"
|
||||
|
@ -526,7 +526,7 @@ conf:
|
|||
# See image-bundled log_config.py.
|
||||
# Adds console logging of task/step logs.
|
||||
# logging_config_class: log_config.LOGGING_CONFIG
|
||||
logging_config_class: new_log_config.LOGGING_CONFIG
|
||||
logging_config_class: new_log_config.DEFAULT_LOGGING_CONFIG
|
||||
# NOTE: Airflow 1.10 introduces extra newline characters between log
|
||||
# records. Version 1.10.1 should resolve this issue
|
||||
# https://issues.apache.org/jira/browse/AIRFLOW-1917
|
||||
|
@ -544,9 +544,9 @@ conf:
|
|||
log_filename_template: "{{ ti.dag_id }}/{{ ti.task_id }}/{{ execution_date.strftime('%%Y-%%m-%%dT%%H:%%M:%%S') }}/{{ try_number }}.log"
|
||||
log_processor_filename_template: "{{ filename }}.log"
|
||||
dag_processor_manager_log_location: /usr/local/airflow/logs/dag_processor_manager/dag_processor_manager.log
|
||||
logging_level: "INFO"
|
||||
logging_level: "DEBUG"
|
||||
fab_logging_level: "WARNING"
|
||||
celery_logging_level: "INFO"
|
||||
celery_logging_level: "DEBUG"
|
||||
base_log_folder: /usr/local/airflow/logs
|
||||
remote_logging: "False"
|
||||
remote_log_conn_id: ""
|
||||
|
|
|
@ -1,4 +1,341 @@
|
|||
from copy import deepcopy
|
||||
from airflow.config_templates.airflow_local_settings import DEFAULT_LOGGING_CONFIG
|
||||
#
|
||||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing,
|
||||
# software distributed under the License is distributed on an
|
||||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
# KIND, either express or implied. See the License for the
|
||||
# specific language governing permissions and limitations
|
||||
# under the License.
|
||||
"""Airflow logging settings."""
|
||||
from __future__ import annotations
|
||||
|
||||
LOGGING_CONFIG = deepcopy(DEFAULT_LOGGING_CONFIG)
|
||||
import os
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
from urllib.parse import urlsplit
|
||||
|
||||
from airflow.configuration import conf
|
||||
from airflow.exceptions import AirflowException
|
||||
|
||||
LOG_LEVEL: str = conf.get_mandatory_value("logging", "LOGGING_LEVEL").upper()
|
||||
|
||||
|
||||
# Flask appbuilder's info level log is very verbose,
|
||||
# so it's set to 'WARN' by default.
|
||||
FAB_LOG_LEVEL: str = conf.get_mandatory_value("logging", "FAB_LOGGING_LEVEL").upper()
|
||||
|
||||
LOG_FORMAT: str = conf.get_mandatory_value("logging", "LOG_FORMAT")
|
||||
DAG_PROCESSOR_LOG_FORMAT: str = conf.get_mandatory_value("logging", "DAG_PROCESSOR_LOG_FORMAT")
|
||||
|
||||
LOG_FORMATTER_CLASS: str = conf.get_mandatory_value(
|
||||
"logging", "LOG_FORMATTER_CLASS", fallback="airflow.utils.log.timezone_aware.TimezoneAware"
|
||||
)
|
||||
|
||||
COLORED_LOG_FORMAT: str = conf.get_mandatory_value("logging", "COLORED_LOG_FORMAT")
|
||||
|
||||
COLORED_LOG: bool = conf.getboolean("logging", "COLORED_CONSOLE_LOG")
|
||||
|
||||
COLORED_FORMATTER_CLASS: str = conf.get_mandatory_value("logging", "COLORED_FORMATTER_CLASS")
|
||||
|
||||
DAG_PROCESSOR_LOG_TARGET: str = conf.get_mandatory_value("logging", "DAG_PROCESSOR_LOG_TARGET")
|
||||
|
||||
BASE_LOG_FOLDER: str = conf.get_mandatory_value("logging", "BASE_LOG_FOLDER")
|
||||
|
||||
PROCESSOR_LOG_FOLDER: str = conf.get_mandatory_value("scheduler", "CHILD_PROCESS_LOG_DIRECTORY")
|
||||
|
||||
DAG_PROCESSOR_MANAGER_LOG_LOCATION: str = conf.get_mandatory_value(
|
||||
"logging", "DAG_PROCESSOR_MANAGER_LOG_LOCATION"
|
||||
)
|
||||
|
||||
# FILENAME_TEMPLATE only uses in Remote Logging Handlers since Airflow 2.3.3
|
||||
# All of these handlers inherited from FileTaskHandler and providing any value rather than None
|
||||
# would raise deprecation warning.
|
||||
FILENAME_TEMPLATE: str | None = None
|
||||
|
||||
PROCESSOR_FILENAME_TEMPLATE: str = conf.get_mandatory_value("logging", "LOG_PROCESSOR_FILENAME_TEMPLATE")
|
||||
|
||||
DEFAULT_LOGGING_CONFIG: dict[str, Any] = {
|
||||
"version": 1,
|
||||
"disable_existing_loggers": False,
|
||||
"formatters": {
|
||||
"airflow": {
|
||||
"format": LOG_FORMAT,
|
||||
"class": LOG_FORMATTER_CLASS,
|
||||
},
|
||||
"airflow_coloured": {
|
||||
"format": COLORED_LOG_FORMAT if COLORED_LOG else LOG_FORMAT,
|
||||
"class": COLORED_FORMATTER_CLASS if COLORED_LOG else LOG_FORMATTER_CLASS,
|
||||
},
|
||||
"source_processor": {
|
||||
"format": DAG_PROCESSOR_LOG_FORMAT,
|
||||
"class": LOG_FORMATTER_CLASS,
|
||||
},
|
||||
},
|
||||
"filters": {
|
||||
"mask_secrets": {
|
||||
"()": "airflow.utils.log.secrets_masker.SecretsMasker",
|
||||
},
|
||||
},
|
||||
"handlers": {
|
||||
# NOTE: Add a "raw" python console logger. Using 'console' results
|
||||
# in a state of recursion.
|
||||
'py-console': {
|
||||
'class': 'logging.StreamHandler',
|
||||
'formatter': 'airflow',
|
||||
'stream': 'ext://sys.stdout',
|
||||
"filters": ["mask_secrets"],
|
||||
},
|
||||
"console": {
|
||||
"class": "airflow.utils.log.logging_mixin.RedirectStdHandler",
|
||||
"formatter": "airflow_coloured",
|
||||
"stream": "sys.stdout",
|
||||
"filters": ["mask_secrets"],
|
||||
},
|
||||
"task": {
|
||||
"class": "airflow.utils.log.file_task_handler.FileTaskHandler",
|
||||
"formatter": "airflow",
|
||||
"base_log_folder": os.path.expanduser(BASE_LOG_FOLDER),
|
||||
"filters": ["mask_secrets"],
|
||||
},
|
||||
"processor": {
|
||||
"class": "airflow.utils.log.file_processor_handler.FileProcessorHandler",
|
||||
"formatter": "airflow",
|
||||
"base_log_folder": os.path.expanduser(PROCESSOR_LOG_FOLDER),
|
||||
"filename_template": PROCESSOR_FILENAME_TEMPLATE,
|
||||
"filters": ["mask_secrets"],
|
||||
},
|
||||
"processor_to_stdout": {
|
||||
"class": "airflow.utils.log.logging_mixin.RedirectStdHandler",
|
||||
"formatter": "source_processor",
|
||||
"stream": "sys.stdout",
|
||||
"filters": ["mask_secrets"],
|
||||
},
|
||||
},
|
||||
"loggers": {
|
||||
"airflow.processor": {
|
||||
"handlers": ["processor_to_stdout" if DAG_PROCESSOR_LOG_TARGET == "stdout" else "processor"],
|
||||
"level": LOG_LEVEL,
|
||||
# Set to true here (and reset via set_context) so that if no file is configured we still get logs!
|
||||
"propagate": True,
|
||||
},
|
||||
"airflow.task": {
|
||||
# NOTE: Modified for use by Shipyard/Airflow (add console logging)
|
||||
# The supplied console logger cannot be used here, as it
|
||||
# Leads to out-of-control memory usage
|
||||
'handlers': ['task', 'py-console'],
|
||||
"level": LOG_LEVEL,
|
||||
# Set to true here (and reset via set_context) so that if no file is configured we still get logs!
|
||||
"propagate": True,
|
||||
"filters": ["mask_secrets"],
|
||||
},
|
||||
"flask_appbuilder": {
|
||||
"handlers": ["console"],
|
||||
"level": FAB_LOG_LEVEL,
|
||||
"propagate": True,
|
||||
},
|
||||
},
|
||||
"root": {
|
||||
"handlers": ["console"],
|
||||
"level": LOG_LEVEL,
|
||||
"filters": ["mask_secrets"],
|
||||
},
|
||||
}
|
||||
|
||||
EXTRA_LOGGER_NAMES: str | None = conf.get("logging", "EXTRA_LOGGER_NAMES", fallback=None)
|
||||
if EXTRA_LOGGER_NAMES:
|
||||
new_loggers = {
|
||||
logger_name.strip(): {
|
||||
"handlers": ["console"],
|
||||
"level": LOG_LEVEL,
|
||||
"propagate": True,
|
||||
}
|
||||
for logger_name in EXTRA_LOGGER_NAMES.split(",")
|
||||
}
|
||||
DEFAULT_LOGGING_CONFIG["loggers"].update(new_loggers)
|
||||
|
||||
DEFAULT_DAG_PARSING_LOGGING_CONFIG: dict[str, dict[str, dict[str, Any]]] = {
|
||||
"handlers": {
|
||||
"processor_manager": {
|
||||
"class": "airflow.utils.log.non_caching_file_handler.NonCachingRotatingFileHandler",
|
||||
"formatter": "airflow",
|
||||
"filename": DAG_PROCESSOR_MANAGER_LOG_LOCATION,
|
||||
"mode": "a",
|
||||
"maxBytes": 104857600, # 100MB
|
||||
"backupCount": 5,
|
||||
}
|
||||
},
|
||||
"loggers": {
|
||||
"airflow.processor_manager": {
|
||||
"handlers": ["processor_manager"],
|
||||
"level": LOG_LEVEL,
|
||||
"propagate": False,
|
||||
}
|
||||
},
|
||||
}
|
||||
|
||||
# Only update the handlers and loggers when CONFIG_PROCESSOR_MANAGER_LOGGER is set.
|
||||
# This is to avoid exceptions when initializing RotatingFileHandler multiple times
|
||||
# in multiple processes.
|
||||
if os.environ.get("CONFIG_PROCESSOR_MANAGER_LOGGER") == "True":
|
||||
DEFAULT_LOGGING_CONFIG["handlers"].update(DEFAULT_DAG_PARSING_LOGGING_CONFIG["handlers"])
|
||||
DEFAULT_LOGGING_CONFIG["loggers"].update(DEFAULT_DAG_PARSING_LOGGING_CONFIG["loggers"])
|
||||
|
||||
# Manually create log directory for processor_manager handler as RotatingFileHandler
|
||||
# will only create file but not the directory.
|
||||
processor_manager_handler_config: dict[str, Any] = DEFAULT_DAG_PARSING_LOGGING_CONFIG["handlers"][
|
||||
"processor_manager"
|
||||
]
|
||||
directory: str = os.path.dirname(processor_manager_handler_config["filename"])
|
||||
Path(directory).mkdir(parents=True, exist_ok=True, mode=0o755)
|
||||
|
||||
##################
|
||||
# Remote logging #
|
||||
##################
|
||||
|
||||
REMOTE_LOGGING: bool = conf.getboolean("logging", "remote_logging")
|
||||
|
||||
if REMOTE_LOGGING:
|
||||
|
||||
ELASTICSEARCH_HOST: str | None = conf.get("elasticsearch", "HOST")
|
||||
|
||||
# Storage bucket URL for remote logging
|
||||
# S3 buckets should start with "s3://"
|
||||
# Cloudwatch log groups should start with "cloudwatch://"
|
||||
# GCS buckets should start with "gs://"
|
||||
# WASB buckets should start with "wasb"
|
||||
# HDFS path should start with "hdfs://"
|
||||
# just to help Airflow select correct handler
|
||||
REMOTE_BASE_LOG_FOLDER: str = conf.get_mandatory_value("logging", "REMOTE_BASE_LOG_FOLDER")
|
||||
REMOTE_TASK_HANDLER_KWARGS = conf.getjson("logging", "REMOTE_TASK_HANDLER_KWARGS", fallback={})
|
||||
|
||||
if REMOTE_BASE_LOG_FOLDER.startswith("s3://"):
|
||||
S3_REMOTE_HANDLERS: dict[str, dict[str, str | None]] = {
|
||||
"task": {
|
||||
"class": "airflow.providers.amazon.aws.log.s3_task_handler.S3TaskHandler",
|
||||
"formatter": "airflow",
|
||||
"base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
|
||||
"s3_log_folder": REMOTE_BASE_LOG_FOLDER,
|
||||
"filename_template": FILENAME_TEMPLATE,
|
||||
},
|
||||
}
|
||||
|
||||
DEFAULT_LOGGING_CONFIG["handlers"].update(S3_REMOTE_HANDLERS)
|
||||
elif REMOTE_BASE_LOG_FOLDER.startswith("cloudwatch://"):
|
||||
url_parts = urlsplit(REMOTE_BASE_LOG_FOLDER)
|
||||
CLOUDWATCH_REMOTE_HANDLERS: dict[str, dict[str, str | None]] = {
|
||||
"task": {
|
||||
"class": "airflow.providers.amazon.aws.log.cloudwatch_task_handler.CloudwatchTaskHandler",
|
||||
"formatter": "airflow",
|
||||
"base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
|
||||
"log_group_arn": url_parts.netloc + url_parts.path,
|
||||
"filename_template": FILENAME_TEMPLATE,
|
||||
},
|
||||
}
|
||||
|
||||
DEFAULT_LOGGING_CONFIG["handlers"].update(CLOUDWATCH_REMOTE_HANDLERS)
|
||||
elif REMOTE_BASE_LOG_FOLDER.startswith("gs://"):
|
||||
key_path = conf.get_mandatory_value("logging", "GOOGLE_KEY_PATH", fallback=None)
|
||||
GCS_REMOTE_HANDLERS: dict[str, dict[str, str | None]] = {
|
||||
"task": {
|
||||
"class": "airflow.providers.google.cloud.log.gcs_task_handler.GCSTaskHandler",
|
||||
"formatter": "airflow",
|
||||
"base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
|
||||
"gcs_log_folder": REMOTE_BASE_LOG_FOLDER,
|
||||
"filename_template": FILENAME_TEMPLATE,
|
||||
"gcp_key_path": key_path,
|
||||
},
|
||||
}
|
||||
|
||||
DEFAULT_LOGGING_CONFIG["handlers"].update(GCS_REMOTE_HANDLERS)
|
||||
elif REMOTE_BASE_LOG_FOLDER.startswith("wasb"):
|
||||
WASB_REMOTE_HANDLERS: dict[str, dict[str, str | bool | None]] = {
|
||||
"task": {
|
||||
"class": "airflow.providers.microsoft.azure.log.wasb_task_handler.WasbTaskHandler",
|
||||
"formatter": "airflow",
|
||||
"base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
|
||||
"wasb_log_folder": REMOTE_BASE_LOG_FOLDER,
|
||||
"wasb_container": "airflow-logs",
|
||||
"filename_template": FILENAME_TEMPLATE,
|
||||
},
|
||||
}
|
||||
|
||||
DEFAULT_LOGGING_CONFIG["handlers"].update(WASB_REMOTE_HANDLERS)
|
||||
elif REMOTE_BASE_LOG_FOLDER.startswith("stackdriver://"):
|
||||
key_path = conf.get_mandatory_value("logging", "GOOGLE_KEY_PATH", fallback=None)
|
||||
# stackdriver:///airflow-tasks => airflow-tasks
|
||||
log_name = urlsplit(REMOTE_BASE_LOG_FOLDER).path[1:]
|
||||
STACKDRIVER_REMOTE_HANDLERS = {
|
||||
"task": {
|
||||
"class": "airflow.providers.google.cloud.log.stackdriver_task_handler.StackdriverTaskHandler",
|
||||
"formatter": "airflow",
|
||||
"name": log_name,
|
||||
"gcp_key_path": key_path,
|
||||
}
|
||||
}
|
||||
|
||||
DEFAULT_LOGGING_CONFIG["handlers"].update(STACKDRIVER_REMOTE_HANDLERS)
|
||||
elif REMOTE_BASE_LOG_FOLDER.startswith("oss://"):
|
||||
OSS_REMOTE_HANDLERS = {
|
||||
"task": {
|
||||
"class": "airflow.providers.alibaba.cloud.log.oss_task_handler.OSSTaskHandler",
|
||||
"formatter": "airflow",
|
||||
"base_log_folder": os.path.expanduser(BASE_LOG_FOLDER),
|
||||
"oss_log_folder": REMOTE_BASE_LOG_FOLDER,
|
||||
"filename_template": FILENAME_TEMPLATE,
|
||||
},
|
||||
}
|
||||
DEFAULT_LOGGING_CONFIG["handlers"].update(OSS_REMOTE_HANDLERS)
|
||||
elif REMOTE_BASE_LOG_FOLDER.startswith("hdfs://"):
|
||||
HDFS_REMOTE_HANDLERS: dict[str, dict[str, str | None]] = {
|
||||
"task": {
|
||||
"class": "airflow.providers.apache.hdfs.log.hdfs_task_handler.HdfsTaskHandler",
|
||||
"formatter": "airflow",
|
||||
"base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
|
||||
"hdfs_log_folder": REMOTE_BASE_LOG_FOLDER,
|
||||
"filename_template": FILENAME_TEMPLATE,
|
||||
},
|
||||
}
|
||||
DEFAULT_LOGGING_CONFIG["handlers"].update(HDFS_REMOTE_HANDLERS)
|
||||
elif ELASTICSEARCH_HOST:
|
||||
ELASTICSEARCH_END_OF_LOG_MARK: str = conf.get_mandatory_value("elasticsearch", "END_OF_LOG_MARK")
|
||||
ELASTICSEARCH_FRONTEND: str = conf.get_mandatory_value("elasticsearch", "frontend")
|
||||
ELASTICSEARCH_WRITE_STDOUT: bool = conf.getboolean("elasticsearch", "WRITE_STDOUT")
|
||||
ELASTICSEARCH_JSON_FORMAT: bool = conf.getboolean("elasticsearch", "JSON_FORMAT")
|
||||
ELASTICSEARCH_JSON_FIELDS: str = conf.get_mandatory_value("elasticsearch", "JSON_FIELDS")
|
||||
ELASTICSEARCH_HOST_FIELD: str = conf.get_mandatory_value("elasticsearch", "HOST_FIELD")
|
||||
ELASTICSEARCH_OFFSET_FIELD: str = conf.get_mandatory_value("elasticsearch", "OFFSET_FIELD")
|
||||
|
||||
ELASTIC_REMOTE_HANDLERS: dict[str, dict[str, str | bool | None]] = {
|
||||
"task": {
|
||||
"class": "airflow.providers.elasticsearch.log.es_task_handler.ElasticsearchTaskHandler",
|
||||
"formatter": "airflow",
|
||||
"base_log_folder": str(os.path.expanduser(BASE_LOG_FOLDER)),
|
||||
"filename_template": FILENAME_TEMPLATE,
|
||||
"end_of_log_mark": ELASTICSEARCH_END_OF_LOG_MARK,
|
||||
"host": ELASTICSEARCH_HOST,
|
||||
"frontend": ELASTICSEARCH_FRONTEND,
|
||||
"write_stdout": ELASTICSEARCH_WRITE_STDOUT,
|
||||
"json_format": ELASTICSEARCH_JSON_FORMAT,
|
||||
"json_fields": ELASTICSEARCH_JSON_FIELDS,
|
||||
"host_field": ELASTICSEARCH_HOST_FIELD,
|
||||
"offset_field": ELASTICSEARCH_OFFSET_FIELD,
|
||||
},
|
||||
}
|
||||
|
||||
DEFAULT_LOGGING_CONFIG["handlers"].update(ELASTIC_REMOTE_HANDLERS)
|
||||
else:
|
||||
raise AirflowException(
|
||||
"Incorrect remote log configuration. Please check the configuration of option 'host' in "
|
||||
"section 'elasticsearch' if you are using Elasticsearch. In the other case, "
|
||||
"'remote_base_log_folder' option in the 'logging' section."
|
||||
)
|
||||
DEFAULT_LOGGING_CONFIG["handlers"]["task"].update(REMOTE_TASK_HANDLER_KWARGS)
|
|
@ -89,12 +89,12 @@ class ActionsIdResource(BaseResource):
|
|||
action['notes'].append(note.view())
|
||||
return action
|
||||
|
||||
def get_dag_run_by_id(self, dag_id, run_id):
|
||||
def get_dag_run_by_id(self, dag_id, execution_date):
|
||||
"""
|
||||
Wrapper for call to the airflow db to get a dag_run
|
||||
:returns: a dag run dictionary
|
||||
"""
|
||||
dag_run_list = self.get_dag_run_db(dag_id, run_id)
|
||||
dag_run_list = self.get_dag_run_db(dag_id, execution_date)
|
||||
# should be only one result, return the first one
|
||||
if dag_run_list:
|
||||
return dag_run_list[0]
|
||||
|
|
|
@ -76,7 +76,7 @@ class WorkflowIdResource(BaseResource):
|
|||
"""
|
||||
Retrieve a workflow by id,
|
||||
:param helper: The WorkflowHelper constructed for this invocation
|
||||
:param workflow_id: a string in {dag_id}__{run_id} format
|
||||
:param workflow_id: a string in {dag_id}__{execution} format
|
||||
identifying a workflow
|
||||
:returns: a workflow detail dictionary including steps
|
||||
"""
|
||||
|
|
|
@ -35,6 +35,17 @@ DESTROY_SERVER = 'destroy_nodes'
|
|||
DEPLOYMENT_STATUS = 'deployment_status'
|
||||
FINAL_DEPLOYMENT_STATUS = 'final_deployment_status'
|
||||
|
||||
|
||||
# TaskGroups
|
||||
ARMADA_BUILD_TASK_GROUP='armada_build'
|
||||
DRYDOCK_BUILD_TASK_GROUP = 'drydock_build'
|
||||
VALIDATE_SITE_DESIGN_TASK_GROUP = 'validate_site_design'
|
||||
ALL_PREFLIGHT_CHECKS_TASK_GROUP = 'preflight'
|
||||
|
||||
# Steps for task groups
|
||||
ARMADA_POST_APPLY = 'armada_post_apply'
|
||||
ARMADA_GET_RELEASES='armada_get_releases'
|
||||
|
||||
# Define a list of critical steps, used to determine successfulness of a
|
||||
# still-running DAG
|
||||
CRITICAL_DAG_STEPS = [
|
||||
|
|
|
@ -15,11 +15,20 @@ from datetime import timedelta
|
|||
|
||||
import airflow
|
||||
from airflow import DAG
|
||||
from airflow.utils.task_group import TaskGroup
|
||||
|
||||
try:
|
||||
from common_step_factory import CommonStepFactory
|
||||
from airflow.operators import ArmadaGetReleasesOperator
|
||||
from airflow.operators import ArmadaPostApplyOperator
|
||||
from config_path import config_path
|
||||
except ImportError:
|
||||
from shipyard_airflow.dags.common_step_factory import CommonStepFactory
|
||||
from shipyard_airflow.plugins.armada_get_releases import \
|
||||
ArmadaGetReleasesOperator
|
||||
from shipyard_airflow.plugins.armada_post_apply import \
|
||||
ArmadaPostApplyOperator
|
||||
from shipyard_airflow.dags.config_path import config_path
|
||||
|
||||
"""deploy_site
|
||||
|
||||
|
@ -56,10 +65,37 @@ deployment_configuration = step_factory.get_deployment_configuration()
|
|||
validate_site_design = step_factory.get_validate_site_design()
|
||||
deployment_status = step_factory.get_deployment_status()
|
||||
drydock_build = step_factory.get_drydock_build()
|
||||
armada_build = step_factory.get_armada_build()
|
||||
# armada_build = step_factory.get_armada_build()
|
||||
create_action_tag = step_factory.get_create_action_tag()
|
||||
finalize_deployment_status = step_factory.get_final_deployment_status()
|
||||
|
||||
|
||||
# [START armada_build]
|
||||
with TaskGroup(group_id="armada_build", tooltip="Tasks for armada_build", dag=dag) as armada_build:
|
||||
"""Generate the armada post_apply step
|
||||
|
||||
Armada post_apply does the deployment of helm charts
|
||||
"""
|
||||
armada_post_apply = ArmadaPostApplyOperator(
|
||||
task_id="armada_post_apply",
|
||||
shipyard_conf=config_path,
|
||||
retries=5,
|
||||
dag=dag)
|
||||
"""Generate the armada get_releases step
|
||||
|
||||
Armada get_releases does the verification of releases of helm charts
|
||||
"""
|
||||
armada_get_releases = ArmadaGetReleasesOperator(
|
||||
task_id="armada_get_releases",
|
||||
shipyard_conf=config_path,
|
||||
dag=dag)
|
||||
|
||||
|
||||
armada_post_apply >> armada_get_releases
|
||||
# [END armada_build]
|
||||
|
||||
|
||||
|
||||
# DAG Wiring
|
||||
preflight.set_upstream(action_xcom)
|
||||
get_rendered_doc.set_upstream(action_xcom)
|
||||
|
|
|
@ -47,7 +47,8 @@ def destroy_server(parent_dag_name, child_dag_name, args):
|
|||
"""
|
||||
dag = DAG(
|
||||
'{}.{}'.format(parent_dag_name, child_dag_name),
|
||||
default_args=args)
|
||||
default_args=args,
|
||||
schedule_interval=None)
|
||||
|
||||
# Drain Node
|
||||
promenade_drain_node = PromenadeDrainNodeOperator(
|
||||
|
|
|
@ -15,13 +15,22 @@ from datetime import timedelta
|
|||
|
||||
import airflow
|
||||
from airflow import DAG
|
||||
from airflow.utils.task_group import TaskGroup
|
||||
|
||||
try:
|
||||
from common_step_factory import CommonStepFactory
|
||||
from validate_site_design import SOFTWARE
|
||||
from airflow.operators import ArmadaGetReleasesOperator
|
||||
from airflow.operators import ArmadaPostApplyOperator
|
||||
from config_path import config_path
|
||||
except ImportError:
|
||||
from shipyard_airflow.dags.common_step_factory import CommonStepFactory
|
||||
from shipyard_airflow.dags.validate_site_design import SOFTWARE
|
||||
from shipyard_airflow.plugins.armada_get_releases import \
|
||||
ArmadaGetReleasesOperator
|
||||
from shipyard_airflow.plugins.armada_post_apply import \
|
||||
ArmadaPostApplyOperator
|
||||
from shipyard_airflow.dags.config_path import config_path
|
||||
|
||||
"""update_software
|
||||
|
||||
|
@ -56,13 +65,39 @@ validate_site_design = step_factory.get_validate_site_design(
|
|||
targets=[SOFTWARE]
|
||||
)
|
||||
deployment_status = step_factory.get_deployment_status()
|
||||
armada_build = step_factory.get_armada_build()
|
||||
# armada_build = step_factory.get_armada_build()
|
||||
decide_airflow_upgrade = step_factory.get_decide_airflow_upgrade()
|
||||
upgrade_airflow = step_factory.get_upgrade_airflow()
|
||||
skip_upgrade_airflow = step_factory.get_skip_upgrade_airflow()
|
||||
create_action_tag = step_factory.get_create_action_tag()
|
||||
finalize_deployment_status = step_factory.get_final_deployment_status()
|
||||
|
||||
# [START armada_build]
|
||||
with TaskGroup(group_id="armada_build", tooltip="Tasks for armada_build", dag=dag) as armada_build:
|
||||
"""Generate the armada post_apply step
|
||||
|
||||
Armada post_apply does the deployment of helm charts
|
||||
"""
|
||||
armada_post_apply = ArmadaPostApplyOperator(
|
||||
task_id="armada_post_apply",
|
||||
shipyard_conf=config_path,
|
||||
retries=5,
|
||||
dag=dag)
|
||||
"""Generate the armada get_releases step
|
||||
|
||||
Armada get_releases does the verification of releases of helm charts
|
||||
"""
|
||||
armada_get_releases = ArmadaGetReleasesOperator(
|
||||
task_id="armada_get_releases",
|
||||
shipyard_conf=config_path,
|
||||
dag=dag)
|
||||
|
||||
|
||||
armada_post_apply >> armada_get_releases
|
||||
# [END armada_build]
|
||||
|
||||
|
||||
|
||||
# DAG Wiring
|
||||
deployment_configuration.set_upstream(action_xcom)
|
||||
validate_site_design.set_upstream([
|
||||
|
|
|
@ -21,6 +21,9 @@
|
|||
- ensure-python
|
||||
- ensure-pip
|
||||
- ensure-docker
|
||||
- role: add-authorized-keys
|
||||
public_keys:
|
||||
- public_key: ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABgQDA7eM8WFJrqQmki8rR0O3QBHyl8xq42jb1RduwuRwjWoGYJI5cX7Fx+7VR4A9ITCoiqxKS8DMfgKbt5jKC6SmvMALULZsnYlthB34KywurgxsW6fgp68DHWQ7J4CCBhoIpl0W3JW7s6b0vHLhab59r0E+AYemBVuWUqbFEy8nDAHcQv1S/2o1udhmljIN7c2ogO4KAJ7Lge0BoIP9ps4u6AVwyQZixp4anU9DHGNA/UQj4M5UyuALj5buEAuATBe9Vqj4sOvZjObPJAGPUrNRrGEWAFk+lSZHRzKXo0eeWtPqoh5UN9UDb5Pocg1krncMIZwjHKovlD1z/O1y91aY5LM1wxm/7aaIiX8eCihyVZaOuDCLF7WDT2SMs7ABcotX2MDtVQTrNNV3MmMAScFNDflzPKszd7cdjLl6PBq8bvPxmCkLmnitPTGOoh9d8i+JlbINvgx1pguYrpeciIyreCO1rjTW3MgB0tyoMEa31V+7HrauBMeNnE68YTqLTIB0= smarkin@mirantis.com
|
||||
|
||||
tasks:
|
||||
|
||||
|
@ -136,6 +139,15 @@
|
|||
|
||||
- name: Wait for deployment completion
|
||||
shell: |
|
||||
for t in $(seq 1 90)
|
||||
do
|
||||
echo "retry $t"
|
||||
sleep 2
|
||||
kubectl exec -itn ucp sts/postgresql -- env PAGER="" \
|
||||
psql \
|
||||
-d airflow \
|
||||
-c "select run_id, dag_id, task_id, job_id, start_date, end_date, duration, state, operator from task_instance order by run_id, dag_id, task_id, start_date;"
|
||||
done
|
||||
./tools/gate/wait-for-shipyard.sh
|
||||
args:
|
||||
chdir: "{{ zuul.projects['opendev.org/airship/treasuremap'].src_dir }}"
|
||||
|
@ -144,6 +156,7 @@
|
|||
- name: Stop artifactory
|
||||
shell: |
|
||||
set -ex
|
||||
while true; do sleep 100; done
|
||||
# terminate artifactory
|
||||
docker rm artifacts --force || true
|
||||
args:
|
||||
|
|
|
@ -17,7 +17,6 @@
|
|||
- name: Overwrite Armada manifest
|
||||
shell: |
|
||||
git checkout v1.9
|
||||
mv tools/gate/manifests/full-site.yaml \
|
||||
type/skiff/manifests/full-site.yaml
|
||||
mv tools/gate/manifests/full-site.yaml type/skiff/manifests/full-site.yaml
|
||||
args:
|
||||
chdir: "{{ zuul.projects['opendev.org/airship/treasuremap'].src_dir }}"
|
||||
|
|
Loading…
Reference in New Issue