From e7239e2a8614f52853827a4ced566928faf4cfc0 Mon Sep 17 00:00:00 2001 From: Anthony Lin Date: Tue, 10 Oct 2017 03:02:41 +0000 Subject: [PATCH] Shipyard Armada Operator - Uses armada_client - Uses keystone token - Included error handling for workflow - Covers the following flow: 1. Set up Armada Client/Session 2. Armada Status 3. Armada Validate 4. Armada Apply 5. Armada Get Releases Note that Armada and DeckHand integration is ongoing at the moment. We will be reading the armada.yaml directly from Airflow and sending it to Armada for this Patch Set. We will update the Armada Operator once the integration between Armada and DeckHand is completed. Change-Id: I53b9257f1d5c4b443989cd0cc8154dd51f7d4168 --- shipyard_airflow/dags/armada_deploy_site.py | 107 ++++++++ shipyard_airflow/dags/deploy_site.py | 8 +- shipyard_airflow/plugins/armada_operator.py | 272 ++++++++++++++++++++ 3 files changed, 385 insertions(+), 2 deletions(-) create mode 100644 shipyard_airflow/dags/armada_deploy_site.py create mode 100644 shipyard_airflow/plugins/armada_operator.py diff --git a/shipyard_airflow/dags/armada_deploy_site.py b/shipyard_airflow/dags/armada_deploy_site.py new file mode 100644 index 00000000..ac1a1944 --- /dev/null +++ b/shipyard_airflow/dags/armada_deploy_site.py @@ -0,0 +1,107 @@ +# Copyright 2017 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from airflow.models import DAG +from airflow.operators import ArmadaOperator +from airflow.operators.subdag_operator import SubDagOperator + +# Location of shipyard.conf +config_path = '/usr/local/airflow/plugins/shipyard.conf' + +# Names used for sub-subdags in the armada site deployment subdag +CREATE_ARMADA_CLIENT_DAG_NAME = 'create_armada_client' +GET_ARMADA_STATUS_DAG_NAME = 'armada_status' +ARMADA_VALIDATE_DAG_NAME = 'armada_validate' +ARMADA_APPLY_DAG_NAME = 'armada_apply' +ARMADA_GET_RELEASES_DAG_NAME = 'armada_get_releases' + + +def get_armada_subdag_step(parent_dag_name, child_dag_name, args): + ''' + Execute Armada Subdag + ''' + dag = DAG( + '{}.{}'.format(parent_dag_name, child_dag_name), + default_args=args) + + # Note that in the event where the 'deploy_site' Action is + # triggered from Shipyard, the 'parent_dag_name' variable + # gets assigned with 'deploy_site.create_armada_client'. + # This is the name that we want to assign to the subdag so + # that we can reference it for xcom. The name of the main + # dag will be the front part of that value, i.e. 'deploy_site'. + # Hence we will extract the front part and assign it to main_dag. + # We will reuse this pattern for other Actions, e.g. update_site, + # redeploy_site as well. 
+ operator = ArmadaOperator( + task_id=child_dag_name, + shipyard_conf=config_path, + action=child_dag_name, + main_dag_name=parent_dag_name[0:parent_dag_name.find('.')], + sub_dag_name=parent_dag_name, + dag=dag) + + return dag + + +def deploy_site_armada(parent_dag_name, child_dag_name, args): + ''' + Puts into atomic unit + ''' + dag = DAG( + '{}.{}'.format(parent_dag_name, child_dag_name), + default_args=args) + + armada_client = SubDagOperator( + subdag=get_armada_subdag_step(dag.dag_id, + CREATE_ARMADA_CLIENT_DAG_NAME, + args), + task_id=CREATE_ARMADA_CLIENT_DAG_NAME, + dag=dag) + + armada_status = SubDagOperator( + subdag=get_armada_subdag_step(dag.dag_id, + GET_ARMADA_STATUS_DAG_NAME, + args), + task_id=GET_ARMADA_STATUS_DAG_NAME, + dag=dag) + + armada_validate = SubDagOperator( + subdag=get_armada_subdag_step(dag.dag_id, + ARMADA_VALIDATE_DAG_NAME, + args), + task_id=ARMADA_VALIDATE_DAG_NAME, + dag=dag) + + armada_apply = SubDagOperator( + subdag=get_armada_subdag_step(dag.dag_id, + ARMADA_APPLY_DAG_NAME, + args), + task_id=ARMADA_APPLY_DAG_NAME, + dag=dag) + + armada_get_releases = SubDagOperator( + subdag=get_armada_subdag_step(dag.dag_id, + ARMADA_GET_RELEASES_DAG_NAME, + args), + task_id=ARMADA_GET_RELEASES_DAG_NAME, + dag=dag) + + # DAG Wiring + armada_status.set_upstream(armada_client) + armada_validate.set_upstream(armada_status) + armada_apply.set_upstream(armada_validate) + armada_get_releases.set_upstream(armada_apply) + + return dag diff --git a/shipyard_airflow/dags/deploy_site.py b/shipyard_airflow/dags/deploy_site.py index 3bb2c376..526a944c 100644 --- a/shipyard_airflow/dags/deploy_site.py +++ b/shipyard_airflow/dags/deploy_site.py @@ -23,6 +23,7 @@ from airflow.operators.subdag_operator import SubDagOperator from deckhand_get_design import get_design_deckhand from drydock_deploy_site import deploy_site_drydock +from armada_deploy_site import deploy_site_armada from preflight_checks import all_preflight_checks from validate_site_design 
import validate_site_design """ @@ -36,6 +37,7 @@ ALL_PREFLIGHT_CHECKS_DAG_NAME = 'preflight' DECKHAND_GET_DESIGN_VERSION = 'deckhand_get_design_version' VALIDATE_SITE_DESIGN_DAG_NAME = 'validate_site_design' DRYDOCK_BUILD_DAG_NAME = 'drydock_build' +ARMADA_BUILD_DAG_NAME = 'armada_build' default_args = { 'owner': 'airflow', @@ -104,8 +106,10 @@ query_node_status = PlaceholderOperator( on_failure_callback=failure_handlers.step_failure_handler, dag=dag) -armada_build = PlaceholderOperator( - task_id='armada_build', +armada_build = SubDagOperator( + subdag=deploy_site_armada( + PARENT_DAG_NAME, ARMADA_BUILD_DAG_NAME, args=default_args), + task_id=ARMADA_BUILD_DAG_NAME, on_failure_callback=failure_handlers.step_failure_handler, dag=dag) diff --git a/shipyard_airflow/plugins/armada_operator.py b/shipyard_airflow/plugins/armada_operator.py new file mode 100644 index 00000000..fb8de1ac --- /dev/null +++ b/shipyard_airflow/plugins/armada_operator.py @@ -0,0 +1,272 @@ +# Copyright 2017 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import logging +from urllib.parse import urlparse + +from airflow.models import BaseOperator +from airflow.utils.decorators import apply_defaults +from airflow.plugins_manager import AirflowPlugin +from airflow.exceptions import AirflowException + +import armada.common.client as client +import armada.common.session as session +from get_k8s_pod_port_ip import get_pod_port_ip +from service_endpoint import ucp_service_endpoint +from service_token import shipyard_service_token + + +class ArmadaOperator(BaseOperator): + """ + Supports interaction with Armada + :param action: Task to perform + :param main_dag_name: Parent Dag + :param shipyard_conf: Location of shipyard.conf + :param sub_dag_name: Child Dag + """ + + @apply_defaults + def __init__(self, + action=None, + main_dag_name=None, + shipyard_conf=None, + sub_dag_name=None, + workflow_info={}, + xcom_push=True, + *args, **kwargs): + + super(ArmadaOperator, self).__init__(*args, **kwargs) + self.action = action + self.main_dag_name = main_dag_name + self.shipyard_conf = shipyard_conf + self.sub_dag_name = sub_dag_name + self.workflow_info = workflow_info + self.xcom_push_flag = xcom_push + + def execute(self, context): + # Initialize Variables + context['svc_type'] = 'armada' + armada_client = None + + # Define task_instance + task_instance = context['task_instance'] + + # Extract information related to current workflow + # The workflow_info variable will be a dictionary + # that contains information about the workflow such + # as action_id, name and other related parameters + workflow_info = task_instance.xcom_pull( + task_ids='action_xcom', key='action', + dag_id=self.main_dag_name) + + # Logs uuid of action performed by the Operator + logging.info("Armada Operator for action %s", workflow_info['id']) + + # Create Armada Client + if self.action == 'create_armada_client': + # Retrieve Endpoint Information + context['svc_endpoint'] = ucp_service_endpoint(self, context) + logging.info("Armada endpoint is %s", 
context['svc_endpoint']) + + # Set up Armada Client + session_client = self.armada_session_client(context) + + return session_client + + # Retrieve armada_client via XCOM so as to perform other tasks + armada_client = task_instance.xcom_pull( + task_ids='create_armada_client', + dag_id=self.sub_dag_name + '.create_armada_client') + + # Armada API Call + # Armada Status + if self.action == 'armada_status': + # Retrieve Tiller Information and assign to context 'query' + context['query'] = self.get_tiller_info(context) + + self.get_armada_status(context, armada_client) + + # Armada Validate + elif self.action == 'armada_validate': + self.armada_validate(context, armada_client) + + # Armada Apply + elif self.action == 'armada_apply': + # Retrieve Tiller Information and assign to context 'query' + context['query'] = self.get_tiller_info(context) + + self.armada_apply(context, armada_client) + + # Armada Get Releases + elif self.action == 'armada_get_releases': + # Retrieve Tiller Information and assign to context 'query' + context['query'] = self.get_tiller_info(context) + + self.armada_get_releases(context, armada_client) + + else: + logging.info('No Action to Perform') + + @shipyard_service_token + def armada_session_client(self, context): + # Initialize Variables + armada_url = None + a_session = None + a_client = None + + # Parse Armada Service Endpoint + armada_url = urlparse(context['svc_endpoint']) + + # Build a ArmadaSession with credentials and target host + # information. + logging.info("Build Armada Session") + a_session = session.ArmadaSession(armada_url.hostname, + port=armada_url.port, + token=context['svc_token']) + + # Raise Exception if we are not able to get armada session + if a_session: + logging.info("Successfully Set Up Armada Session") + else: + raise AirflowException("Failed to set up Armada Session!") + + # Use session to build a ArmadaClient to make one or more + # API calls. 
The ArmadaSession will care for TCP connection + # pooling and header management + logging.info("Create Armada Client") + a_client = client.ArmadaClient(a_session) + + # Raise Exception if we are not able to build armada client + if a_client: + logging.info("Successfully Set Up Armada client") + else: + raise AirflowException("Failed to set up Armada client!") + + # Return Armada client for XCOM Usage + return a_client + + @get_pod_port_ip('tiller') + def get_tiller_info(self, context, *args): + # Initialize Variable + query = {} + + # Get IP and port information of Pods from context + k8s_pods_ip_port = context['pods_ip_port'] + + # Assign value to the 'query' dictionary so that we can pass + # it via the Armada Client + query['tiller_host'] = k8s_pods_ip_port['tiller'].get('ip') + query['tiller_port'] = k8s_pods_ip_port['tiller'].get('port') + + return query + + def get_armada_status(self, context, armada_client): + # Check State of Tiller + armada_status = armada_client.get_status(context['query']) + + # Tiller State will return boolean value, i.e. True/False + # Raise Exception if Tiller is in a bad state + if armada_status['tiller']['state']: + logging.info("Tiller is in running state") + logging.info("Tiller version is %s", + armada_status['tiller']['version']) + else: + raise AirflowException("Please check Tiller!") + + def armada_validate(self, context, armada_client): + # Initialize Variables + armada_manifest = None + valid_armada_yaml = {} + + # At this point in time, testing of the operator is being done by + # reading the armada.yaml file on airflow and feeding it to Armada as + # a string. We will assume that the file name is fixed and will always + # be 'armada_site.yaml'. This will change in the near future when + # Armada is integrated with DeckHand. + yaml_path = '/usr/local/airflow/plugins/armada_site.yaml' + + # TODO: We will implement the new approach when Armada and DeckHand + # integration is completed. 
+ with open(yaml_path, 'r') as armada_yaml: + armada_manifest = armada_yaml.read() + + # Validate armada yaml file + logging.info("Armada Validate") + valid_armada_yaml = armada_client.post_validate(armada_manifest) + + # The response will be a dictionary indicating whether the yaml + # file is valid or invalid. We will check the Boolean value in + # this case. + if valid_armada_yaml['valid']: + logging.info("Armada Yaml File is Valid") + else: + raise AirflowException("Invalid Armada Yaml File!") + + def armada_apply(self, context, armada_client): + # Initialize Variables + armada_manifest = None + armada_post_apply = {} + override_values = [] + chart_set = [] + + # At this point in time, testing of the operator is being done by + # reading the armada.yaml file on airflow and feeding it to Armada as + # a string. We will assume that the file name is fixed and will always + # be 'armada_site.yaml'. This will change in the near future when + # Armada is integrated with DeckHand. + yaml_path = '/usr/local/airflow/plugins/armada_site.yaml' + + # TODO: We will implement the new approach when Armada and DeckHand + # integration is completed. Override and chart_set will be considered + # at that time. + with open(yaml_path, 'r') as armada_yaml: + armada_manifest = armada_yaml.read() + + # Execute Armada Apply to install the helm charts in sequence + logging.info("Armada Apply") + armada_post_apply = armada_client.post_apply(armada_manifest, + override_values, + chart_set, + context['query']) + + # We will expect Armada to return the releases that it is + # deploying. An empty value for 'install' means that armada + # deploy has failed. Note that if we try and deploy the same + # release twice, we will end up with empty response on our + # second attempt and that will be treated as a failure scenario. 
+ if armada_post_apply['message']['install']: + logging.info("Armada Apply Successfully Executed") + logging.info(armada_post_apply) + else: + logging.info(armada_post_apply) + raise AirflowException("Armada Apply Failed!") + + def armada_get_releases(self, context, armada_client): + # Initialize Variables + armada_releases = {} + + # Retrieve Armada Releases after deployment + logging.info("Retrieving Armada Releases after deployment..") + armada_releases = armada_client.get_releases(context['query']) + + if armada_releases: + logging.info("Retrieved current Armada Releases") + logging.info(armada_releases) + else: + raise AirflowException("Failed to retrieve Armada Releases") + + +class ArmadaOperatorPlugin(AirflowPlugin): + name = 'armada_operator_plugin' + operators = [ArmadaOperator]