diff --git a/shipyard_airflow/dags/armada_deploy_site.py b/shipyard_airflow/dags/armada_deploy_site.py new file mode 100644 index 00000000..ac1a1944 --- /dev/null +++ b/shipyard_airflow/dags/armada_deploy_site.py @@ -0,0 +1,107 @@ +# Copyright 2017 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from airflow.models import DAG +from airflow.operators import ArmadaOperator +from airflow.operators.subdag_operator import SubDagOperator + +# Location of shiyard.conf +config_path = '/usr/local/airflow/plugins/shipyard.conf' + +# Names used for sub-subdags in the armada site deployment subdag +CREATE_ARMADA_CLIENT_DAG_NAME = 'create_armada_client' +GET_ARMADA_STATUS_DAG_NAME = 'armada_status' +ARMADA_VALIDATE_DAG_NAME = 'armada_validate' +ARMADA_APPLY_DAG_NAME = 'armada_apply' +ARMADA_GET_RELEASES_DAG_NAME = 'armada_get_releases' + + +def get_armada_subdag_step(parent_dag_name, child_dag_name, args): + ''' + Execute Armada Subdag + ''' + dag = DAG( + '{}.{}'.format(parent_dag_name, child_dag_name), + default_args=args) + + # Note that in the event where the 'deploy_site' Action is + # triggered from Shipyard, the 'parent_dag_name' variable + # gets assigned with 'deploy_site.create_armada_client'. + # This is the name that we want to assign to the subdag so + # that we can reference it for xcom. The name of the main + # dag will be the front part of that value, i.e. 'deploy_site'. + # Hence we will extract the front part and assign it to main_dag. + # We will reuse this pattern for other Actions, e.g. update_site, + # redeploy_site as well. + operator = ArmadaOperator( + task_id=child_dag_name, + shipyard_conf=config_path, + action=child_dag_name, + main_dag_name=parent_dag_name[0:parent_dag_name.find('.')], + sub_dag_name=parent_dag_name, + dag=dag) + + return dag + + +def deploy_site_armada(parent_dag_name, child_dag_name, args): + ''' + Puts into atomic unit + ''' + dag = DAG( + '{}.{}'.format(parent_dag_name, child_dag_name), + default_args=args) + + armada_client = SubDagOperator( + subdag=get_armada_subdag_step(dag.dag_id, + CREATE_ARMADA_CLIENT_DAG_NAME, + args), + task_id=CREATE_ARMADA_CLIENT_DAG_NAME, + dag=dag) + + armada_status = SubDagOperator( + subdag=get_armada_subdag_step(dag.dag_id, + GET_ARMADA_STATUS_DAG_NAME, + args), + task_id=GET_ARMADA_STATUS_DAG_NAME, + dag=dag) + + armada_validate = SubDagOperator( + subdag=get_armada_subdag_step(dag.dag_id, + ARMADA_VALIDATE_DAG_NAME, + args), + task_id=ARMADA_VALIDATE_DAG_NAME, + dag=dag) + + armada_apply = SubDagOperator( + subdag=get_armada_subdag_step(dag.dag_id, + ARMADA_APPLY_DAG_NAME, + args), + task_id=ARMADA_APPLY_DAG_NAME, + dag=dag) + + armada_get_releases = SubDagOperator( + subdag=get_armada_subdag_step(dag.dag_id, + ARMADA_GET_RELEASES_DAG_NAME, + args), + task_id=ARMADA_GET_RELEASES_DAG_NAME, + dag=dag) + + # DAG Wiring + armada_status.set_upstream(armada_client) + armada_validate.set_upstream(armada_status) + armada_apply.set_upstream(armada_validate) + armada_get_releases.set_upstream(armada_apply) + + return dag diff --git a/shipyard_airflow/dags/deploy_site.py b/shipyard_airflow/dags/deploy_site.py index 3bb2c376..526a944c 100644 --- a/shipyard_airflow/dags/deploy_site.py +++ b/shipyard_airflow/dags/deploy_site.py @@ -23,6 +23,7 @@ from airflow.operators.subdag_operator import SubDagOperator from deckhand_get_design import get_design_deckhand from drydock_deploy_site import deploy_site_drydock +from armada_deploy_site import deploy_site_armada from preflight_checks import all_preflight_checks from validate_site_design import validate_site_design """ @@ -36,6 +37,7 @@ ALL_PREFLIGHT_CHECKS_DAG_NAME = 'preflight' DECKHAND_GET_DESIGN_VERSION = 'deckhand_get_design_version' VALIDATE_SITE_DESIGN_DAG_NAME = 'validate_site_design' DRYDOCK_BUILD_DAG_NAME = 'drydock_build' +ARMADA_BUILD_DAG_NAME = 'armada_build' default_args = { 'owner': 'airflow', @@ -104,8 +106,10 @@ query_node_status = PlaceholderOperator( on_failure_callback=failure_handlers.step_failure_handler, dag=dag) -armada_build = PlaceholderOperator( - task_id='armada_build', +armada_build = SubDagOperator( + subdag=deploy_site_armada( + PARENT_DAG_NAME, ARMADA_BUILD_DAG_NAME, args=default_args), + task_id=ARMADA_BUILD_DAG_NAME, on_failure_callback=failure_handlers.step_failure_handler, dag=dag) diff --git a/shipyard_airflow/plugins/armada_operator.py b/shipyard_airflow/plugins/armada_operator.py new file mode 100644 index 00000000..fb8de1ac --- /dev/null +++ b/shipyard_airflow/plugins/armada_operator.py @@ -0,0 +1,272 @@ +# Copyright 2017 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from urllib.parse import urlparse + +from airflow.models import BaseOperator +from airflow.utils.decorators import apply_defaults +from airflow.plugins_manager import AirflowPlugin +from airflow.exceptions import AirflowException + +import armada.common.client as client +import armada.common.session as session +from get_k8s_pod_port_ip import get_pod_port_ip +from service_endpoint import ucp_service_endpoint +from service_token import shipyard_service_token + + +class ArmadaOperator(BaseOperator): + """ + Supports interaction with Armada + :param action: Task to perform + :param main_dag_name: Parent Dag + :param shipyard_conf: Location of shipyard.conf + :param sub_dag_name: Child Dag + """ + + @apply_defaults + def __init__(self, + action=None, + main_dag_name=None, + shipyard_conf=None, + sub_dag_name=None, + workflow_info={}, + xcom_push=True, + *args, **kwargs): + + super(ArmadaOperator, self).__init__(*args, **kwargs) + self.action = action + self.main_dag_name = main_dag_name + self.shipyard_conf = shipyard_conf + self.sub_dag_name = sub_dag_name + self.workflow_info = workflow_info + self.xcom_push_flag = xcom_push + + def execute(self, context): + # Initialize Variables + context['svc_type'] = 'armada' + armada_client = None + + # Define task_instance + task_instance = context['task_instance'] + + # Extract information related to current workflow + # The workflow_info variable will be a dictionary + # that contains information about the workflow such + # as action_id, name and other related parameters + workflow_info = task_instance.xcom_pull( + task_ids='action_xcom', key='action', + dag_id=self.main_dag_name) + + # Logs uuid of action performed by the Operator + logging.info("Armada Operator for action %s", workflow_info['id']) + + # Create Armada Client + if self.action == 'create_armada_client': + # Retrieve Endpoint Information + context['svc_endpoint'] = ucp_service_endpoint(self, context) + logging.info("Armada endpoint is %s", context['svc_endpoint']) + + # Set up Armada Client + session_client = self.armada_session_client(context) + + return session_client + + # Retrieve armada_client via XCOM so as to perform other tasks + armada_client = task_instance.xcom_pull( + task_ids='create_armada_client', + dag_id=self.sub_dag_name + '.create_armada_client') + + # Armada API Call + # Armada Status + if self.action == 'armada_status': + # Retrieve Tiller Information and assign to context 'query' + context['query'] = self.get_tiller_info(context) + + self.get_armada_status(context, armada_client) + + # Armada Validate + elif self.action == 'armada_validate': + self.armada_validate(context, armada_client) + + # Armada Apply + elif self.action == 'armada_apply': + # Retrieve Tiller Information and assign to context 'query' + context['query'] = self.get_tiller_info(context) + + self.armada_apply(context, armada_client) + + # Armada Get Releases + elif self.action == 'armada_get_releases': + # Retrieve Tiller Information and assign to context 'query' + context['query'] = self.get_tiller_info(context) + + self.armada_get_releases(context, armada_client) + + else: + logging.info('No Action to Perform') + + @shipyard_service_token + def armada_session_client(self, context): + # Initialize Variables + armada_url = None + a_session = None + a_client = None + + # Parse Armada Service Endpoint + armada_url = urlparse(context['svc_endpoint']) + + # Build a ArmadaSession with credentials and target host + # information. + logging.info("Build Armada Session") + a_session = session.ArmadaSession(armada_url.hostname, + port=armada_url.port, + token=context['svc_token']) + + # Raise Exception if we are not able to get armada session + if a_session: + logging.info("Successfully Set Up Armada Session") + else: + raise AirflowException("Failed to set up Armada Session!") + + # Use session to build a ArmadaClient to make one or more + # API calls. The ArmadaSession will care for TCP connection + # pooling and header management + logging.info("Create Armada Client") + a_client = client.ArmadaClient(a_session) + + # Raise Exception if we are not able to build armada client + if a_client: + logging.info("Successfully Set Up Armada client") + else: + raise AirflowException("Failed to set up Armada client!") + + # Return Armada client for XCOM Usage + return a_client + + @get_pod_port_ip('tiller') + def get_tiller_info(self, context, *args): + # Initialize Variable + query = {} + + # Get IP and port information of Pods from context + k8s_pods_ip_port = context['pods_ip_port'] + + # Assign value to the 'query' dictionary so that we can pass + # it via the Armada Client + query['tiller_host'] = k8s_pods_ip_port['tiller'].get('ip') + query['tiller_port'] = k8s_pods_ip_port['tiller'].get('port') + + return query + + def get_armada_status(self, context, armada_client): + # Check State of Tiller + armada_status = armada_client.get_status(context['query']) + + # Tiller State will return boolean value, i.e. True/False + # Raise Exception if Tiller is in a bad state + if armada_status['tiller']['state']: + logging.info("Tiller is in running state") + logging.info("Tiller version is %s", + armada_status['tiller']['version']) + else: + raise AirflowException("Please check Tiller!") + + def armada_validate(self, context, armada_client): + # Initialize Variables + armada_manifest = None + valid_armada_yaml = {} + + # At this point in time, testing of the operator is being done by + # reading the armada.yaml file on airflow and feeding it to Armada as + # a string. We will assume that the file name is fixed and will always + # be 'armada_site.yaml'. This will change in the near future when + # Armada is integrated with DeckHand. + yaml_path = '/usr/local/airflow/plugins/armada_site.yaml' + + # TODO: We will implement the new approach when Armada and DeckHand + # integration is completed. + with open(yaml_path, 'r') as armada_yaml: + armada_manifest = armada_yaml.read() + + # Validate armada yaml file + logging.info("Armada Validate") + valid_armada_yaml = armada_client.post_validate(armada_manifest) + + # The response will be a dictionary indicating whether the yaml + # file is valid or invalid. We will check the Boolean value in + # this case. + if valid_armada_yaml['valid']: + logging.info("Armada Yaml File is Valid") + else: + raise AirflowException("Invalid Armada Yaml File!") + + def armada_apply(self, context, armada_client): + # Initialize Variables + armada_manifest = None + armada_post_apply = {} + override_values = [] + chart_set = [] + + # At this point in time, testing of the operator is being done by + # reading the armada.yaml file on airflow and feeding it to Armada as + # a string. We will assume that the file name is fixed and will always + # be 'armada_site.yaml'. This will change in the near future when + # Armada is integrated with DeckHand. + yaml_path = '/usr/local/airflow/plugins/armada_site.yaml' + + # TODO: We will implement the new approach when Armada and DeckHand + # integration is completed. Override and chart_set will be considered + # at that time. + with open(yaml_path, 'r') as armada_yaml: + armada_manifest = armada_yaml.read() + + # Execute Armada Apply to install the helm charts in sequence + logging.info("Armada Apply") + armada_post_apply = armada_client.post_apply(armada_manifest, + override_values, + chart_set, + context['query']) + + # We will expect Armada to return the releases that it is + # deploying. An empty value for 'install' means that armada + # delploy has failed. Note that if we try and deploy the same + # release twice, we will end up with empty response on our + # second attempt and that will be treated as a failure scenario. + if armada_post_apply['message']['install']: + logging.info("Armada Apply Successfully Executed") + logging.info(armada_post_apply) + else: + logging.info(armada_post_apply) + raise AirflowException("Armada Apply Failed!") + + def armada_get_releases(self, context, armada_client): + # Initialize Variables + armada_releases = {} + + # Retrieve Armada Releases after deployment + logging.info("Retrieving Armada Releases after deployment..") + armada_releases = armada_client.get_releases(context['query']) + + if armada_releases: + logging.info("Retrieved current Armada Releases") + logging.info(armada_releases) + else: + raise AirflowException("Failed to retrieve Armada Releases") + + +class ArmadaOperatorPlugin(AirflowPlugin): + name = 'armada_operator_plugin' + operators = [ArmadaOperator]