diff --git a/charts/shipyard/templates/statefulset-airflow-worker.yaml b/charts/shipyard/templates/statefulset-airflow-worker.yaml index 9fcc5a31..84776f5c 100644 --- a/charts/shipyard/templates/statefulset-airflow-worker.yaml +++ b/charts/shipyard/templates/statefulset-airflow-worker.yaml @@ -37,6 +37,13 @@ rules: - get - list - watch + - apiGroups: + - "" + resources: + - configmaps + verbs: + - create + - patch --- apiVersion: rbac.authorization.k8s.io/v1beta1 kind: ClusterRoleBinding diff --git a/charts/shipyard/values.yaml b/charts/shipyard/values.yaml index 6dbccfc4..be8c0b5e 100644 --- a/charts/shipyard/values.yaml +++ b/charts/shipyard/values.yaml @@ -410,6 +410,9 @@ conf: worker_port: 8793 k8s_logs: ucp_namespace: 'ucp' + deployment_status_configmap: + name: 'deployment-status' + namespace: 'ucp' oslo_policy: policy_file: /etc/shipyard/policy.yaml # If non-existent rule is used, the request should be denied. The diff --git a/src/bin/shipyard_airflow/shipyard_airflow/conf/config.py b/src/bin/shipyard_airflow/shipyard_airflow/conf/config.py index 341cb539..8699ff2a 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/conf/config.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/conf/config.py @@ -286,6 +286,22 @@ SECTIONS = [ ), ] ), + ConfigSection( + name='deployment_status_configmap', + title='Parameters for Deployment Status ConfigMap', + options=[ + cfg.StrOpt( + 'name', + default='deployment-status', + help='Name of the Deployment Status ConfigMap' + ), + cfg.StrOpt( + 'namespace', + default='ucp', + help='Namespace of the Deployment Status ConfigMap' + ), + ] + ), ConfigSection( name='document_info', title=('Information about some of the documents Shipyard needs to ' diff --git a/src/bin/shipyard_airflow/shipyard_airflow/dags/common_step_factory.py b/src/bin/shipyard_airflow/shipyard_airflow/dags/common_step_factory.py index b11aa1da..3e964394 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/dags/common_step_factory.py +++ 
b/src/bin/shipyard_airflow/shipyard_airflow/dags/common_step_factory.py @@ -16,6 +16,7 @@ from airflow.operators.python_operator import BranchPythonOperator from airflow.operators.python_operator import PythonOperator from airflow.operators.subdag_operator import SubDagOperator + try: # Operators are loaded from being registered to airflow.operators # in a deployed fashion @@ -26,6 +27,7 @@ try: from airflow.operators import DeckhandCreateSiteActionTagOperator from airflow.operators import DrydockDestroyNodeOperator from airflow.operators import DrydockRelabelNodesOperator + from airflow.operators import DeploymentStatusOperator except ImportError: # for local testing, they are loaded from their source directory from shipyard_airflow.plugins.armada_test_releases import \ @@ -42,6 +44,8 @@ except ImportError: DrydockDestroyNodeOperator from shipyard_airflow.plugins.drydock_relabel_nodes import \ DrydockRelabelNodesOperator + from shipyard_airflow.plugins.deployment_status_operator import \ + DeploymentStatusOperator try: # modules reside in a flat directory when deployed with dags @@ -348,3 +352,34 @@ class CommonStepFactory(object): trigger_rule="all_done", main_dag_name=self.parent_dag_name, dag=self.dag) + + def get_deployment_status(self, task_id=dn.DEPLOYMENT_STATUS): + """Create/update the deployment status ConfigMap + + This will create or update a ConfigMap with the current state of the + deployment + """ + return DeploymentStatusOperator( + shipyard_conf=config_path, + main_dag_name=self.parent_dag_name, + task_id=task_id, + on_failure_callback=step_failure_handler, + dag=self.dag) + + def get_final_deployment_status(self, task_id=dn.FINAL_DEPLOYMENT_STATUS): + """Finalize the deployment status ConfigMap + + This will finalize the ConfigMap with the current state of the + deployment. 
Because it is the final step we need to set force_completed + to True to mark it as completed, as well as change the trigger_rule to + "all_done" so the ConfigMap is always updated even if other steps fail + """ + + return DeploymentStatusOperator( + shipyard_conf=config_path, + main_dag_name=self.parent_dag_name, + force_completed=True, + task_id=task_id, + trigger_rule="all_done", + on_failure_callback=step_failure_handler, + dag=self.dag) diff --git a/src/bin/shipyard_airflow/shipyard_airflow/dags/dag_names.py b/src/bin/shipyard_airflow/shipyard_airflow/dags/dag_names.py index 0c37ce16..854f3491 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/dags/dag_names.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/dags/dag_names.py @@ -32,6 +32,8 @@ GET_RENDERED_DOC = 'get_rendered_doc' SKIP_UPGRADE_AIRFLOW = 'skip_upgrade_airflow' UPGRADE_AIRFLOW = 'upgrade_airflow' DESTROY_SERVER = 'destroy_nodes' +DEPLOYMENT_STATUS = 'deployment_status' +FINAL_DEPLOYMENT_STATUS = 'final_deployment_status' # Define a list of critical steps, used to determine successfulness of a # still-running DAG diff --git a/src/bin/shipyard_airflow/shipyard_airflow/dags/deploy_site.py b/src/bin/shipyard_airflow/shipyard_airflow/dags/deploy_site.py index 2415a7f2..b70abcbd 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/dags/deploy_site.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/dags/deploy_site.py @@ -54,9 +54,11 @@ preflight = step_factory.get_preflight() get_rendered_doc = step_factory.get_get_rendered_doc() deployment_configuration = step_factory.get_deployment_configuration() validate_site_design = step_factory.get_validate_site_design() +deployment_status = step_factory.get_deployment_status() drydock_build = step_factory.get_drydock_build() armada_build = step_factory.get_armada_build() create_action_tag = step_factory.get_create_action_tag() +finalize_deployment_status = step_factory.get_final_deployment_status() # DAG Wiring preflight.set_upstream(action_xcom) @@ 
-68,6 +70,13 @@ validate_site_design.set_upstream([ concurrency_check, deployment_configuration ]) +deployment_status.set_upstream([ + get_rendered_doc, + concurrency_check +]) drydock_build.set_upstream(validate_site_design) armada_build.set_upstream(drydock_build) create_action_tag.set_upstream(armada_build) + +# finalize_deployment_status needs to be downstream of everything +finalize_deployment_status.set_upstream(create_action_tag) diff --git a/src/bin/shipyard_airflow/shipyard_airflow/dags/update_site.py b/src/bin/shipyard_airflow/shipyard_airflow/dags/update_site.py index 892fbb20..fc5505c9 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/dags/update_site.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/dags/update_site.py @@ -58,12 +58,14 @@ preflight = step_factory.get_preflight() get_rendered_doc = step_factory.get_get_rendered_doc() deployment_configuration = step_factory.get_deployment_configuration() validate_site_design = step_factory.get_validate_site_design() +deployment_status = step_factory.get_deployment_status() drydock_build = step_factory.get_drydock_build(verify_nodes_exist=True) armada_build = step_factory.get_armada_build() decide_airflow_upgrade = step_factory.get_decide_airflow_upgrade() upgrade_airflow = step_factory.get_upgrade_airflow() skip_upgrade_airflow = step_factory.get_skip_upgrade_airflow() create_action_tag = step_factory.get_create_action_tag() +finalize_deployment_status = step_factory.get_final_deployment_status() # DAG Wiring preflight.set_upstream(action_xcom) @@ -75,6 +77,10 @@ validate_site_design.set_upstream([ concurrency_check, deployment_configuration ]) +deployment_status.set_upstream([ + get_rendered_doc, + concurrency_check +]) drydock_build.set_upstream(validate_site_design) armada_build.set_upstream(drydock_build) decide_airflow_upgrade.set_upstream(armada_build) @@ -86,3 +92,6 @@ create_action_tag.set_upstream([ upgrade_airflow, skip_upgrade_airflow ]) + +# finalize_deployment_status needs to be 
downstream of everything +finalize_deployment_status.set_upstream(create_action_tag) diff --git a/src/bin/shipyard_airflow/shipyard_airflow/dags/update_software.py b/src/bin/shipyard_airflow/shipyard_airflow/dags/update_software.py index f55e2631..09ade1d9 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/dags/update_software.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/dags/update_software.py @@ -55,11 +55,13 @@ deployment_configuration = step_factory.get_deployment_configuration() validate_site_design = step_factory.get_validate_site_design( targets=[SOFTWARE] ) +deployment_status = step_factory.get_deployment_status() armada_build = step_factory.get_armada_build() decide_airflow_upgrade = step_factory.get_decide_airflow_upgrade() upgrade_airflow = step_factory.get_upgrade_airflow() skip_upgrade_airflow = step_factory.get_skip_upgrade_airflow() create_action_tag = step_factory.get_create_action_tag() +finalize_deployment_status = step_factory.get_final_deployment_status() # DAG Wiring deployment_configuration.set_upstream(action_xcom) @@ -67,6 +69,7 @@ validate_site_design.set_upstream([ concurrency_check, deployment_configuration ]) +deployment_status.set_upstream(concurrency_check) armada_build.set_upstream(validate_site_design) decide_airflow_upgrade.set_upstream(armada_build) decide_airflow_upgrade.set_downstream([ @@ -77,3 +80,6 @@ create_action_tag.set_upstream([ upgrade_airflow, skip_upgrade_airflow ]) + +# finalize_deployment_status needs to be downstream of everything +finalize_deployment_status.set_upstream(create_action_tag) diff --git a/src/bin/shipyard_airflow/shipyard_airflow/plugins/deployment_status_operator.py b/src/bin/shipyard_airflow/shipyard_airflow/plugins/deployment_status_operator.py new file mode 100644 index 00000000..2340ff27 --- /dev/null +++ b/src/bin/shipyard_airflow/shipyard_airflow/plugins/deployment_status_operator.py @@ -0,0 +1,217 @@ +# Copyright 2019 AT&T Intellectual Property. All other rights reserved. 
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import configparser
import logging

import yaml
from airflow import AirflowException
from airflow.plugins_manager import AirflowPlugin
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
import kubernetes
from kubernetes.client.rest import ApiException
from kubernetes.client.models.v1_config_map import V1ConfigMap
from kubernetes.client.models.v1_object_meta import V1ObjectMeta

from shipyard_airflow.conf import config

from shipyard_airflow.control.helpers.action_helper import \
    get_deployment_status

from shipyard_airflow.plugins.xcom_puller import XcomPuller

from shipyard_airflow.common.document_validators.document_validation_utils \
    import DocumentValidationUtils
from shipyard_airflow.plugins.deckhand_client_factory import \
    DeckhandClientFactory

from shipyard_airflow.common.document_validators.errors import \
    DocumentNotFoundError

LOG = logging.getLogger(__name__)

# Variable to hold details about how the Kubernetes ConfigMap is stored
CONFIG_MAP_DETAILS = {
    'api_version': 'v1',
    'kind': 'ConfigMap',
    'pretty': 'true'
}

# Fallbacks used when shipyard.conf has no [deployment_status_configmap]
# entry. They mirror the defaults registered for that section in
# shipyard_airflow.conf.config.
DEFAULT_CONFIG_MAP_NAME = 'deployment-status'
DEFAULT_CONFIG_MAP_NAMESPACE = 'ucp'


class DeploymentStatusOperator(BaseOperator):
    """Deployment status operator

    Update Kubernetes with the deployment status of this dag's action
    """

    @apply_defaults
    def __init__(self, shipyard_conf, main_dag_name, force_completed=False,
                 *args, **kwargs):
        """Set up the operator

        :param shipyard_conf: path of the shipyard.conf file to read
        :param main_dag_name: name of the main dag, handed to the XcomPuller
        :param force_completed: passed through to the get_deployment_status
            helper as its force_completed argument
        """
        super(DeploymentStatusOperator, self).__init__(*args, **kwargs)
        self.shipyard_conf = shipyard_conf
        self.main_dag_name = main_dag_name
        self.force_completed = force_completed
        # Created in execute(), where the task instance is available
        self.xcom_puller = None

    def execute(self, context):
        """Execute the main code for this operator.

        Create a ConfigMap with the deployment status of this dag's action

        :param context: Airflow task context; context['ti'] is used to pull
            xcom values
        :raises AirflowException: if the concurrency status pulled from xcom
            is falsy
        """
        LOG.info("Running deployment status operator")

        self.xcom_puller = XcomPuller(self.main_dag_name, context['ti'])

        # Required for the get_deployment_status helper to function properly
        config.parse_args(args=[], default_config_files=[self.shipyard_conf])

        # First we need to check if the concurrency check was successful as
        # this operator is expected to run even if upstream steps fail
        if not self.xcom_puller.get_concurrency_status():
            msg = "Concurrency check did not pass, so the deployment status " \
                  "will not be updated"
            LOG.error(msg)
            raise AirflowException(msg)

        deployment_status_doc, revision_id = self._get_status_and_revision()
        deployment_version_doc = self._get_version_doc(revision_id)

        # NOTE(review): keys of the deployment-version document are merged
        # last, so a top-level 'deployment' key in that document would
        # replace the status - confirm that this cannot occur.
        full_data = {
            'deployment': deployment_status_doc,
            **deployment_version_doc
        }
        config_map_data = {'release': yaml.safe_dump(full_data)}

        self._store_as_config_map(config_map_data)

    def _read_shipyard_conf(self):
        """Read and parse shipyard.conf

        ConfigParser.read() silently skips files it cannot open, so a
        missing file surfaces later as a missing section/option error.

        :returns: parser loaded from self.shipyard_conf
        :rtype: configparser.ConfigParser
        """
        conf_parser = configparser.ConfigParser()
        conf_parser.read(self.shipyard_conf)
        return conf_parser

    def _get_status_and_revision(self):
        """Retrieve the deployment status information from the appropriate
        helper function

        :returns: tuple of (deployment status as returned by the
            get_deployment_status helper, 'committed_rev_id' of the action)
        """
        action_info = self.xcom_puller.get_action_info()
        deployment_status = get_deployment_status(
            action_info,
            force_completed=self.force_completed)

        revision_id = action_info['committed_rev_id']

        return deployment_status, revision_id

    def _get_version_doc(self, revision_id):
        """Retrieve the deployment-version document from Deckhand

        :param revision_id: the revision_id of the docs to grab the
            deployment-version document from
        :returns: deployment-version document returned from Deckhand, or an
            empty dict if no such document is found
        """
        # Named conf_parser so the imported shipyard_airflow.conf.config
        # module is not shadowed inside this method
        conf_parser = self._read_shipyard_conf()

        doc_name = conf_parser.get('document_info', 'deployment_version_name')
        doc_schema = conf_parser.get('document_info',
                                     'deployment_version_schema')

        dh_client = DeckhandClientFactory(self.shipyard_conf).get_client()
        dh_tool = DocumentValidationUtils(dh_client)

        try:
            return dh_tool.get_unique_doc(
                revision_id=revision_id,
                schema=doc_schema,
                name=doc_name)
        except DocumentNotFoundError:
            # A missing version document is tolerated; the ConfigMap then
            # carries only the deployment status
            LOG.info("There is no deployment-version document in Deckhand "
                     "under the revision '%s' with the name '%s' and schema "
                     "'%s'", revision_id, doc_name, doc_schema)
            return {}

    def _store_as_config_map(self, data):
        """Store given data in a Kubernetes ConfigMap

        The ConfigMap is patched first; if Kubernetes answers with a 404 it
        is created instead.

        :param dict data: The data to store in the ConfigMap
        :raises ApiException: for any Kubernetes API failure other than the
            404 returned when patching a ConfigMap that does not exist
        """
        LOG.info("Storing deployment status as Kubernetes ConfigMap")
        conf_parser = self._read_shipyard_conf()

        name = conf_parser.get(
            'deployment_status_configmap', 'name',
            fallback=DEFAULT_CONFIG_MAP_NAME)
        namespace = conf_parser.get(
            'deployment_status_configmap', 'namespace',
            fallback=DEFAULT_CONFIG_MAP_NAMESPACE)

        k8s_client = self._get_k8s_client()

        cfg_map_obj = self._create_config_map_object(name, namespace, data)
        cfg_map_naming = "(name: {}, namespace: {})".format(name, namespace)

        LOG.info("Updating deployment status config map %s", cfg_map_naming)
        try:
            k8s_client.patch_namespaced_config_map(
                name,
                namespace,
                cfg_map_obj,
                pretty=CONFIG_MAP_DETAILS['pretty'])
        except ApiException as err:
            if err.status != 404:
                raise
        else:
            return

        # ConfigMap still needs to be created. This runs outside of the
        # except block so a failure here is not chained to the expected 404.
        LOG.info("Deployment status config map does not exist yet")
        LOG.info("Creating deployment status config map %s", cfg_map_naming)
        k8s_client.create_namespaced_config_map(
            namespace,
            cfg_map_obj,
            pretty=CONFIG_MAP_DETAILS['pretty'])

    @staticmethod
    def _get_k8s_client():
        """Create and return a Kubernetes client

        :returns: A Kubernetes core API client object
        :rtype: kubernetes.client.CoreV1Api
        """
        # Note that we are using 'in_cluster_config'
        LOG.debug("Loading Kubernetes config")
        kubernetes.config.load_incluster_config()
        LOG.debug("Creating Kubernetes client")
        return kubernetes.client.CoreV1Api()

    @staticmethod
    def _create_config_map_object(name, namespace, data):
        """Create/return a Kubernetes ConfigMap object out of the given data

        :param name: The name to set in the ConfigMap metadata
        :param namespace: The namespace to set in the ConfigMap metadata
        :param dict data: The data to put into the config map
        :returns: A config map object made from the given data
        :rtype: V1ConfigMap
        """
        LOG.debug("Creating Kubernetes config map object")
        metadata = V1ObjectMeta(
            name=name,
            namespace=namespace
        )
        return V1ConfigMap(
            api_version=CONFIG_MAP_DETAILS['api_version'],
            kind=CONFIG_MAP_DETAILS['kind'],
            data=data,
            metadata=metadata
        )


class DeploymentStatusOperatorPlugin(AirflowPlugin):
    """Creates DeploymentStatusOperatorPlugin in Airflow."""
    name = "deployment_status_operator"
    operators = [DeploymentStatusOperator]