Browse Source

Store status of deployment in a ConfigMap

This change adds a new Shipyard Operator that creates/updates a
ConfigMap with information on the version and status of the current
running deployment. This ConfigMap will be created at the start of the
deployments, and will be updated at the end even if the previous steps
fail.

This operator has been added to the deploy_site, update_site, and
update_software DAGs.

Change-Id: Iab9ea84d5e1edd6a8635cc4e4fa93647ee485194
changes/97/660197/11
Michael Beaver 2 years ago
parent
commit
53e863954b
  1. 7
      charts/shipyard/templates/statefulset-airflow-worker.yaml
  2. 3
      charts/shipyard/values.yaml
  3. 16
      src/bin/shipyard_airflow/shipyard_airflow/conf/config.py
  4. 35
      src/bin/shipyard_airflow/shipyard_airflow/dags/common_step_factory.py
  5. 2
      src/bin/shipyard_airflow/shipyard_airflow/dags/dag_names.py
  6. 9
      src/bin/shipyard_airflow/shipyard_airflow/dags/deploy_site.py
  7. 9
      src/bin/shipyard_airflow/shipyard_airflow/dags/update_site.py
  8. 6
      src/bin/shipyard_airflow/shipyard_airflow/dags/update_software.py
  9. 217
      src/bin/shipyard_airflow/shipyard_airflow/plugins/deployment_status_operator.py

7
charts/shipyard/templates/statefulset-airflow-worker.yaml

@ -37,6 +37,13 @@ rules:
- get
- list
- watch
- apiGroups:
- ""
resources:
- configmaps
verbs:
- create
- patch
---
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRoleBinding

3
charts/shipyard/values.yaml

@ -410,6 +410,9 @@ conf:
worker_port: 8793
k8s_logs:
ucp_namespace: 'ucp'
deployment_status_configmap:
name: 'deployment-status'
namespace: 'ucp'
oslo_policy:
policy_file: /etc/shipyard/policy.yaml
# If non-existent rule is used, the request should be denied. The

16
src/bin/shipyard_airflow/shipyard_airflow/conf/config.py

@ -286,6 +286,22 @@ SECTIONS = [
),
]
),
ConfigSection(
name='deployment_status_configmap',
title='Parameters for Deployment Status ConfigMap',
options=[
cfg.StrOpt(
'name',
default='deployment-status',
help='Name of the Deployment Status ConfigMap'
),
cfg.StrOpt(
'namespace',
default='ucp',
help='Namespace of the Deployment Status ConfigMap'
),
]
),
ConfigSection(
name='document_info',
title=('Information about some of the documents Shipyard needs to '

35
src/bin/shipyard_airflow/shipyard_airflow/dags/common_step_factory.py

@ -16,6 +16,7 @@ from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.subdag_operator import SubDagOperator
try:
# Operators are loaded from being registered to airflow.operators
# in a deployed fashion
@ -26,6 +27,7 @@ try:
from airflow.operators import DeckhandCreateSiteActionTagOperator
from airflow.operators import DrydockDestroyNodeOperator
from airflow.operators import DrydockRelabelNodesOperator
from airflow.operators import DeploymentStatusOperator
except ImportError:
# for local testing, they are loaded from their source directory
from shipyard_airflow.plugins.armada_test_releases import \
@ -42,6 +44,8 @@ except ImportError:
DrydockDestroyNodeOperator
from shipyard_airflow.plugins.drydock_relabel_nodes import \
DrydockRelabelNodesOperator
from shipyard_airflow.plugins.deployment_status_operator import \
DeploymentStatusOperator
try:
# modules reside in a flat directory when deployed with dags
@ -348,3 +352,34 @@ class CommonStepFactory(object):
trigger_rule="all_done",
main_dag_name=self.parent_dag_name,
dag=self.dag)
def get_deployment_status(self, task_id=dn.DEPLOYMENT_STATUS):
"""Create/update the deployment status ConfigMap
This will create or update a ConfigMap with the current state of the
deployment
"""
return DeploymentStatusOperator(
shipyard_conf=config_path,
main_dag_name=self.parent_dag_name,
task_id=task_id,
on_failure_callback=step_failure_handler,
dag=self.dag)
def get_final_deployment_status(self, task_id=dn.FINAL_DEPLOYMENT_STATUS):
"""Finalize the deployment status ConfigMap
This will finalize the ConfigMap with the current state of the
deployment. Because it is the final step we need to set force_completed
to True to mark it as completed, as well as change the trigger_rule to
"all_done" so the ConfigMap is always updated even if other steps fail
"""
return DeploymentStatusOperator(
shipyard_conf=config_path,
main_dag_name=self.parent_dag_name,
force_completed=True,
task_id=task_id,
trigger_rule="all_done",
on_failure_callback=step_failure_handler,
dag=self.dag)

2
src/bin/shipyard_airflow/shipyard_airflow/dags/dag_names.py

@ -32,6 +32,8 @@ GET_RENDERED_DOC = 'get_rendered_doc'
SKIP_UPGRADE_AIRFLOW = 'skip_upgrade_airflow'
UPGRADE_AIRFLOW = 'upgrade_airflow'
DESTROY_SERVER = 'destroy_nodes'
DEPLOYMENT_STATUS = 'deployment_status'
FINAL_DEPLOYMENT_STATUS = 'final_deployment_status'
# Define a list of critical steps, used to determine successfulness of a
# still-running DAG

9
src/bin/shipyard_airflow/shipyard_airflow/dags/deploy_site.py

@ -54,9 +54,11 @@ preflight = step_factory.get_preflight()
get_rendered_doc = step_factory.get_get_rendered_doc()
deployment_configuration = step_factory.get_deployment_configuration()
validate_site_design = step_factory.get_validate_site_design()
deployment_status = step_factory.get_deployment_status()
drydock_build = step_factory.get_drydock_build()
armada_build = step_factory.get_armada_build()
create_action_tag = step_factory.get_create_action_tag()
finalize_deployment_status = step_factory.get_final_deployment_status()
# DAG Wiring
preflight.set_upstream(action_xcom)
@ -68,6 +70,13 @@ validate_site_design.set_upstream([
concurrency_check,
deployment_configuration
])
deployment_status.set_upstream([
get_rendered_doc,
concurrency_check
])
drydock_build.set_upstream(validate_site_design)
armada_build.set_upstream(drydock_build)
create_action_tag.set_upstream(armada_build)
# finalize_deployment_status needs to be downstream of everything
finalize_deployment_status.set_upstream(create_action_tag)

9
src/bin/shipyard_airflow/shipyard_airflow/dags/update_site.py

@ -58,12 +58,14 @@ preflight = step_factory.get_preflight()
get_rendered_doc = step_factory.get_get_rendered_doc()
deployment_configuration = step_factory.get_deployment_configuration()
validate_site_design = step_factory.get_validate_site_design()
deployment_status = step_factory.get_deployment_status()
drydock_build = step_factory.get_drydock_build(verify_nodes_exist=True)
armada_build = step_factory.get_armada_build()
decide_airflow_upgrade = step_factory.get_decide_airflow_upgrade()
upgrade_airflow = step_factory.get_upgrade_airflow()
skip_upgrade_airflow = step_factory.get_skip_upgrade_airflow()
create_action_tag = step_factory.get_create_action_tag()
finalize_deployment_status = step_factory.get_final_deployment_status()
# DAG Wiring
preflight.set_upstream(action_xcom)
@ -75,6 +77,10 @@ validate_site_design.set_upstream([
concurrency_check,
deployment_configuration
])
deployment_status.set_upstream([
get_rendered_doc,
concurrency_check
])
drydock_build.set_upstream(validate_site_design)
armada_build.set_upstream(drydock_build)
decide_airflow_upgrade.set_upstream(armada_build)
@ -86,3 +92,6 @@ create_action_tag.set_upstream([
upgrade_airflow,
skip_upgrade_airflow
])
# finalize_deployment_status needs to be downstream of everything
finalize_deployment_status.set_upstream(create_action_tag)

6
src/bin/shipyard_airflow/shipyard_airflow/dags/update_software.py

@ -55,11 +55,13 @@ deployment_configuration = step_factory.get_deployment_configuration()
validate_site_design = step_factory.get_validate_site_design(
targets=[SOFTWARE]
)
deployment_status = step_factory.get_deployment_status()
armada_build = step_factory.get_armada_build()
decide_airflow_upgrade = step_factory.get_decide_airflow_upgrade()
upgrade_airflow = step_factory.get_upgrade_airflow()
skip_upgrade_airflow = step_factory.get_skip_upgrade_airflow()
create_action_tag = step_factory.get_create_action_tag()
finalize_deployment_status = step_factory.get_final_deployment_status()
# DAG Wiring
deployment_configuration.set_upstream(action_xcom)
@ -67,6 +69,7 @@ validate_site_design.set_upstream([
concurrency_check,
deployment_configuration
])
deployment_status.set_upstream(concurrency_check)
armada_build.set_upstream(validate_site_design)
decide_airflow_upgrade.set_upstream(armada_build)
decide_airflow_upgrade.set_downstream([
@ -77,3 +80,6 @@ create_action_tag.set_upstream([
upgrade_airflow,
skip_upgrade_airflow
])
# finalize_deployment_status needs to be downstream of everything
finalize_deployment_status.set_upstream(create_action_tag)

217
src/bin/shipyard_airflow/shipyard_airflow/plugins/deployment_status_operator.py

@ -0,0 +1,217 @@
# Copyright 2019 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import configparser
import logging
import yaml
from airflow import AirflowException
from airflow.plugins_manager import AirflowPlugin
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
import kubernetes
from kubernetes.client.rest import ApiException
from kubernetes.client.models.v1_config_map import V1ConfigMap
from kubernetes.client.models.v1_object_meta import V1ObjectMeta
from shipyard_airflow.conf import config
from shipyard_airflow.control.helpers.action_helper import \
get_deployment_status
from shipyard_airflow.plugins.xcom_puller import XcomPuller
from shipyard_airflow.common.document_validators.document_validation_utils \
import DocumentValidationUtils
from shipyard_airflow.plugins.deckhand_client_factory import \
DeckhandClientFactory
from shipyard_airflow.common.document_validators.errors import \
DocumentNotFoundError
LOG = logging.getLogger(__name__)
# Variable to hold details about how the Kubernetes ConfigMap is stored
CONFIG_MAP_DETAILS = {
'api_version': 'v1',
'kind': 'ConfigMap',
'pretty': 'true'
}
class DeploymentStatusOperator(BaseOperator):
"""Deployment status operator
Update Kubernetes with the deployment status of this dag's action
"""
@apply_defaults
def __init__(self, shipyard_conf, main_dag_name, force_completed=False,
*args, **kwargs):
super(DeploymentStatusOperator, self).__init__(*args, **kwargs)
self.shipyard_conf = shipyard_conf
self.main_dag_name = main_dag_name
self.force_completed = force_completed
self.xcom_puller = None
def execute(self, context):
"""Execute the main code for this operator.
Create a ConfigMap with the deployment status of this dag's action
"""
LOG.info("Running deployment status operator")
self.xcom_puller = XcomPuller(self.main_dag_name, context['ti'])
# Required for the get_deployment_status helper to function properly
config.parse_args(args=[], default_config_files=[self.shipyard_conf])
# First we need to check if the concurrency check was successful as
# this operator is expected to run even if upstream steps fail
if not self.xcom_puller.get_concurrency_status():
msg = "Concurrency check did not pass, so the deployment status " \
"will not be updated"
LOG.error(msg)
raise AirflowException(msg)
deployment_status_doc, revision_id = self._get_status_and_revision()
deployment_version_doc = self._get_version_doc(revision_id)
full_data = {
'deployment': deployment_status_doc,
**deployment_version_doc
}
config_map_data = {'release': yaml.safe_dump(full_data)}
self._store_as_config_map(config_map_data)
def _get_status_and_revision(self):
"""Retrieve the deployment status information from the appropriate
helper function
:return: dict with the status of the deployment
:return: revision_id of the action
"""
action_info = self.xcom_puller.get_action_info()
deployment_status = get_deployment_status(
action_info,
force_completed=self.force_completed)
revision_id = action_info['committed_rev_id']
return deployment_status, revision_id
def _get_version_doc(self, revision_id):
"""Retrieve the deployment-version document from Deckhand
:param revision_id: the revision_id of the docs to grab the
deployment-version document from
:return: deployment-version document returned from Deckhand
"""
# Read and parse shipyard.conf
config = configparser.ConfigParser()
config.read(self.shipyard_conf)
doc_name = config.get('document_info', 'deployment_version_name')
doc_schema = config.get('document_info', 'deployment_version_schema')
dh_client = DeckhandClientFactory(self.shipyard_conf).get_client()
dh_tool = DocumentValidationUtils(dh_client)
try:
deployment_version_doc = dh_tool.get_unique_doc(
revision_id=revision_id,
schema=doc_schema,
name=doc_name)
return deployment_version_doc
except DocumentNotFoundError:
LOG.info("There is no deployment-version document in Deckhand "
"under the revision '{}' with the name '{}' and schema "
"'{}'".format(revision_id, doc_name, doc_schema))
return {}
def _store_as_config_map(self, data):
"""Store given data in a Kubernetes ConfigMap
:param dict data: The data to store in the ConfigMap
"""
LOG.info("Storing deployment status as Kubernetes ConfigMap")
# Read and parse shipyard.conf
config = configparser.ConfigParser()
config.read(self.shipyard_conf)
name = config.get('deployment_status_configmap', 'name')
namespace = config.get('deployment_status_configmap', 'namespace')
k8s_client = self._get_k8s_client()
cfg_map_obj = self._create_config_map_object(name, namespace, data)
cfg_map_naming = "(name: {}, namespace: {})".format(name, namespace)
try:
LOG.info("Updating deployment status config map {}, "
.format(cfg_map_naming))
k8s_client.patch_namespaced_config_map(
name,
namespace,
cfg_map_obj,
pretty=CONFIG_MAP_DETAILS['pretty'])
except ApiException as err:
if err.status != 404:
raise
# ConfigMap still needs to be created
LOG.info("Deployment status config map does not exist yet")
LOG.info("Creating deployment status config map {}".format(
cfg_map_naming))
k8s_client.create_namespaced_config_map(
namespace,
cfg_map_obj,
pretty=CONFIG_MAP_DETAILS['pretty'])
@staticmethod
def _get_k8s_client():
"""Create and return a Kubernetes client
:returns: A Kubernetes client object
:rtype: kubernetes.client
"""
# Note that we are using 'in_cluster_config'
LOG.debug("Loading Kubernetes config")
kubernetes.config.load_incluster_config()
LOG.debug("Creating Kubernetes client")
return kubernetes.client.CoreV1Api()
@staticmethod
def _create_config_map_object(name, namespace, data):
"""Create/return a Kubernetes ConfigMap object out of the given data
:param dict data: The data to put into the config map
:returns: A config map object made from the given data
:rtype: V1ConfigMap
"""
LOG.debug("Creating Kubernetes config map object")
metadata = V1ObjectMeta(
name=name,
namespace=namespace
)
return V1ConfigMap(
api_version=CONFIG_MAP_DETAILS['api_version'],
kind=CONFIG_MAP_DETAILS['kind'],
data=data,
metadata=metadata
)
class DeploymentStatusOperatorPlugin(AirflowPlugin):
"""Creates DeploymentStatusOperatorPlugin in Airflow."""
name = "deployment_status_operator"
operators = [DeploymentStatusOperator]
Loading…
Cancel
Save