From 3d88cf9e33775c2de2cb169b505478672d717b33 Mon Sep 17 00:00:00 2001 From: Anthony Lin Date: Mon, 22 Jan 2018 05:44:32 +0000 Subject: [PATCH] Redeploy Server - Dags & Operators This patch set updates the required dags and operators for the redeploy server workflow. It also introduces the Promenade Operator. Note that many of the required functionalities in DryDock and Promenade are being worked on and are not ready at the moment. As such, this patch set is mainly providing the skeleton framework for the redeploy server workflow. The dags and relevant Operators will be updated at a later date when the features and functionalities are ready for usage. Change-Id: I4baae76ea9d8cde9c2b0bab3feac896d01400868 --- charts/shipyard/values.yaml | 2 + etc/shipyard/shipyard.conf.sample | 6 + shipyard_airflow/conf/config.py | 10 + shipyard_airflow/dags/destroy_node.py | 95 +++++++++ shipyard_airflow/dags/redeploy_server.py | 56 +++-- shipyard_airflow/plugins/deckhand_operator.py | 12 ++ shipyard_airflow/plugins/drydock_operators.py | 37 +++- .../plugins/promenade_operator.py | 201 ++++++++++++++++++ tests/unit/control/test.conf | 2 + tools/resources/shipyard.conf | 2 + 10 files changed, 393 insertions(+), 30 deletions(-) create mode 100644 shipyard_airflow/dags/destroy_node.py create mode 100644 shipyard_airflow/plugins/promenade_operator.py diff --git a/charts/shipyard/values.yaml b/charts/shipyard/values.yaml index 15a7698f..84bfb998 100644 --- a/charts/shipyard/values.yaml +++ b/charts/shipyard/values.yaml @@ -328,6 +328,8 @@ conf: prepare_node_task_timeout: 1800 deploy_node_query_interval: 30 deploy_node_task_timeout: 3600 + destroy_node_query_interval: 30 + destroy_node_task_timeout: 900 cluster_join_check_backoff_time: 120 keystone_authtoken: delay_auth_decision: true diff --git a/etc/shipyard/shipyard.conf.sample b/etc/shipyard/shipyard.conf.sample index dc17fe6f..2d3097f5 100644 --- a/etc/shipyard/shipyard.conf.sample +++ b/etc/shipyard/shipyard.conf.sample @@ -82,6 +82,12 @@ # Time out (in seconds) for deploy_node task (integer value) #deploy_node_task_timeout = 3600 +# Query interval (in seconds) for destroy_node task (integer value) +#destroy_node_query_interval = 30 + +# Time out (in seconds) for destroy_node task (integer value) +#destroy_node_task_timeout = 900 + # Backoff time (in seconds) before checking cluster join (integer value) #cluster_join_check_backoff_time = 120 diff --git a/shipyard_airflow/conf/config.py b/shipyard_airflow/conf/config.py index afc42375..9b2cbbd1 100644 --- a/shipyard_airflow/conf/config.py +++ b/shipyard_airflow/conf/config.py @@ -170,6 +170,16 @@ SECTIONS = [ default=3600, help='Time out (in seconds) for deploy_node task' ), + cfg.IntOpt( + 'destroy_node_query_interval', + default=30, + help='Query interval (in seconds) for destroy_node task' + ), + cfg.IntOpt( + 'destroy_node_task_timeout', + default=900, + help='Time out (in seconds) for destroy_node task' + ), cfg.IntOpt( 'cluster_join_check_backoff_time', default=120, diff --git a/shipyard_airflow/dags/destroy_node.py b/shipyard_airflow/dags/destroy_node.py new file mode 100644 index 00000000..0fefb4cf --- /dev/null +++ b/shipyard_airflow/dags/destroy_node.py @@ -0,0 +1,95 @@ +# Copyright 2018 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from airflow.models import DAG +from airflow.operators import DryDockOperator +from airflow.operators import PromenadeOperator + + +# Location of shiyard.conf +# Note that the shipyard.conf file needs to be placed on a volume +# that can be accessed by the containers +config_path = '/usr/local/airflow/plugins/shipyard.conf' + + +def destroy_server(parent_dag_name, child_dag_name, args): + ''' + Tear Down Node + ''' + dag = DAG( + '{}.{}'.format(parent_dag_name, child_dag_name), + default_args=args) + + # Drain Node + promenade_drain_node = PromenadeOperator( + task_id='promenade_drain_node', + shipyard_conf=config_path, + action='promenade_drain_node', + main_dag_name=parent_dag_name, + sub_dag_name=child_dag_name, + dag=dag) + + # Remove Labels + promenade_remove_labels = PromenadeOperator( + task_id='promenade_remove_labels', + shipyard_conf=config_path, + action='promenade_remove_labels', + main_dag_name=parent_dag_name, + sub_dag_name=child_dag_name, + dag=dag) + + # Stop Kubelet + promenade_stop_kubelet = PromenadeOperator( + task_id='promenade_stop_kubelet', + shipyard_conf=config_path, + action='promenade_stop_kubelet', + main_dag_name=parent_dag_name, + sub_dag_name=child_dag_name, + dag=dag) + + # ETCD Sanity Check + promenade_check_etcd = PromenadeOperator( + task_id='promenade_check_etcd', + shipyard_conf=config_path, + action='promenade_check_etcd', + main_dag_name=parent_dag_name, + sub_dag_name=child_dag_name, + dag=dag) + + # Power down and destroy node using DryDock + drydock_destroy_node = DryDockOperator( + task_id='destroy_node', + shipyard_conf=config_path, + action='destroy_node', + main_dag_name=parent_dag_name, + sub_dag_name=child_dag_name, + dag=dag) + + # Delete node from cluster using Promenade + promenade_delete_node = PromenadeOperator( + task_id='promenade_delete_node', + shipyard_conf=config_path, + action='promenade_delete_node', + main_dag_name=parent_dag_name, + sub_dag_name=child_dag_name, + dag=dag) + + # Define dependencies + promenade_remove_labels.set_upstream(promenade_drain_node) + promenade_stop_kubelet.set_upstream(promenade_remove_labels) + promenade_check_etcd.set_upstream(promenade_stop_kubelet) + drydock_destroy_node.set_upstream(promenade_check_etcd) + promenade_delete_node.set_upstream(drydock_destroy_node) + + return dag diff --git a/shipyard_airflow/dags/redeploy_server.py b/shipyard_airflow/dags/redeploy_server.py index a59ff191..5d0d25aa 100644 --- a/shipyard_airflow/dags/redeploy_server.py +++ b/shipyard_airflow/dags/redeploy_server.py @@ -14,24 +14,28 @@ from datetime import timedelta import airflow -from airflow import DAG import failure_handlers +from airflow import DAG +from airflow.operators import ConcurrencyCheckOperator +from airflow.operators.python_operator import PythonOperator +from airflow.operators.subdag_operator import SubDagOperator + +from deckhand_get_design import get_design_deckhand +from destroy_node import destroy_server +from drydock_deploy_site import deploy_site_drydock from preflight_checks import all_preflight_checks from validate_site_design import validate_site_design -from airflow.operators.subdag_operator import SubDagOperator -from airflow.operators import ConcurrencyCheckOperator -from airflow.operators import DeckhandOperator -from airflow.operators import PlaceholderOperator -from airflow.operators.python_operator import PythonOperator """ redeploy_server is the top-level orchestration DAG for redeploying a server using the Undercloud platform. """ -PARENT_DAG_NAME = 'redeploy_server' -DAG_CONCURRENCY_CHECK_DAG_NAME = 'dag_concurrency_check' ALL_PREFLIGHT_CHECKS_DAG_NAME = 'preflight' +DAG_CONCURRENCY_CHECK_DAG_NAME = 'dag_concurrency_check' DECKHAND_GET_DESIGN_VERSION = 'deckhand_get_design_version' +DESTROY_SERVER_DAG_NAME = 'destroy_server' +DRYDOCK_BUILD_DAG_NAME = 'drydock_build' +PARENT_DAG_NAME = 'redeploy_server' VALIDATE_SITE_DESIGN_DAG_NAME = 'validate_site_design' default_args = { @@ -43,7 +47,7 @@ default_args = { 'email_on_retry': False, 'provide_context': True, 'retries': 0, - 'retry_delay': timedelta(minutes=1), + 'retry_delay': timedelta(seconds=30), } dag = DAG(PARENT_DAG_NAME, default_args=default_args, schedule_interval=None) @@ -73,9 +77,11 @@ preflight = SubDagOperator( PARENT_DAG_NAME, ALL_PREFLIGHT_CHECKS_DAG_NAME, args=default_args), task_id=ALL_PREFLIGHT_CHECKS_DAG_NAME, on_failure_callback=failure_handlers.step_failure_handler, - dag=dag, ) + dag=dag) -get_design_version = DeckhandOperator( +get_design_version = SubDagOperator( + subdag=get_design_deckhand( + PARENT_DAG_NAME, DECKHAND_GET_DESIGN_VERSION, args=default_args), task_id=DECKHAND_GET_DESIGN_VERSION, on_failure_callback=failure_handlers.step_failure_handler, dag=dag) @@ -87,23 +93,17 @@ validate_site_design = SubDagOperator( on_failure_callback=failure_handlers.step_failure_handler, dag=dag) -site_evacuation = PlaceholderOperator( - task_id='site_evacuation', +destroy_server = SubDagOperator( + subdag=destroy_server( + PARENT_DAG_NAME, DESTROY_SERVER_DAG_NAME, args=default_args), + task_id=DESTROY_SERVER_DAG_NAME, on_failure_callback=failure_handlers.step_failure_handler, dag=dag) -drydock_rebuild = PlaceholderOperator( - task_id='drydock_rebuild', - on_failure_callback=failure_handlers.step_failure_handler, - dag=dag) - -query_node_status = PlaceholderOperator( - task_id='redeployed_node_status', - on_failure_callback=failure_handlers.step_failure_handler, - dag=dag) - -armada_rebuild = PlaceholderOperator( - task_id='armada_rebuild', +drydock_build = SubDagOperator( + subdag=deploy_site_drydock( + PARENT_DAG_NAME, DRYDOCK_BUILD_DAG_NAME, args=default_args), + task_id=DRYDOCK_BUILD_DAG_NAME, on_failure_callback=failure_handlers.step_failure_handler, dag=dag) @@ -112,7 +112,5 @@ concurrency_check.set_upstream(action_xcom) preflight.set_upstream(concurrency_check) get_design_version.set_upstream(preflight) validate_site_design.set_upstream(get_design_version) -site_evacuation.set_upstream(validate_site_design) -drydock_rebuild.set_upstream(site_evacuation) -query_node_status.set_upstream(drydock_rebuild) -armada_rebuild.set_upstream(query_node_status) +destroy_server.set_upstream(validate_site_design) +drydock_build.set_upstream(destroy_server) diff --git a/shipyard_airflow/plugins/deckhand_operator.py b/shipyard_airflow/plugins/deckhand_operator.py index 377ea1c4..e262c8ba 100644 --- a/shipyard_airflow/plugins/deckhand_operator.py +++ b/shipyard_airflow/plugins/deckhand_operator.py @@ -56,6 +56,7 @@ class DeckhandOperator(BaseOperator): def execute(self, context): # Initialize Variables deckhand_design_version = None + redeploy_server = None # Define task_instance task_instance = context['task_instance'] @@ -71,6 +72,17 @@ class DeckhandOperator(BaseOperator): # Logs uuid of action performed by the Operator logging.info("DeckHand Operator for action %s", workflow_info['id']) + # Retrieve information of the server that we want to redeploy if user + # executes the 'redeploy_server' dag + if workflow_info['dag_id'] == 'redeploy_server': + redeploy_server = workflow_info['parameters'].get('server-name') + + if redeploy_server: + logging.info("Server to be redeployed is %s", redeploy_server) + else: + raise AirflowException('Unable to retrieve information of ' + 'node to be redeployed!') + # Retrieve Endpoint Information svc_type = 'deckhand' context['svc_endpoint'] = ucp_service_endpoint(self, diff --git a/shipyard_airflow/plugins/drydock_operators.py b/shipyard_airflow/plugins/drydock_operators.py index d71983ca..ca2ed582 100644 --- a/shipyard_airflow/plugins/drydock_operators.py +++ b/shipyard_airflow/plugins/drydock_operators.py @@ -68,6 +68,8 @@ class DryDockOperator(BaseOperator): self.xcom_push_flag = xcom_push def execute(self, context): + # Initialize Variable + redeploy_server = None # Placeholder definition # TODO: Need to decide how to pass the required value from Shipyard to @@ -88,6 +90,19 @@ class DryDockOperator(BaseOperator): # Logs uuid of action performed by the Operator logging.info("DryDock Operator for action %s", workflow_info['id']) + # Retrieve information of the server that we want to redeploy if user + # executes the 'redeploy_server' dag + # Set node filter to be the server that we want to redeploy + if workflow_info['dag_id'] == 'redeploy_server': + redeploy_server = workflow_info['parameters'].get('server-name') + + if redeploy_server: + logging.info("Server to be redeployed is %s", redeploy_server) + self.node_filter = redeploy_server + else: + raise AirflowException('Unable to retrieve information of ' + 'node to be redeployed!') + # Retrieve Deckhand Design Reference self.design_ref = self.get_deckhand_design_ref(context) @@ -188,6 +203,26 @@ class DryDockOperator(BaseOperator): # polling interval to 30 seconds. check_node_status(1800, 30) + # Create Task for destroy_node + # NOTE: This is a PlaceHolder function. The 'destroy_node' + # functionalities in DryDock is being worked on and is not + # ready at the moment. + elif self.action == 'destroy_node': + # Default settings for 'destroy_node' execution is to query + # the task every 30 seconds and to time out after 900 seconds + query_interval = config.get('drydock', + 'destroy_node_query_interval') + task_timeout = config.get('drydock', 'destroy_node_task_timeout') + + logging.info("Destroying node %s from cluster...", redeploy_server) + time.sleep(30) + logging.info("Successfully deleted node %s", redeploy_server) + + # TODO: Uncomment when the function to destroy/delete node is + # ready for consumption in Drydock + # self.drydock_action(drydock_client, context, self.action, + # query_interval, task_timeout) + # Do not perform any action else: logging.info('No Action to Perform') @@ -235,7 +270,7 @@ class DryDockOperator(BaseOperator): # Trigger DryDock to execute task and retrieve task ID task_id = self.drydock_perform_task(drydock_client, context, - action, None) + action, self.node_filter) logging.info('Task ID is %s', task_id) diff --git a/shipyard_airflow/plugins/promenade_operator.py b/shipyard_airflow/plugins/promenade_operator.py new file mode 100644 index 00000000..f9c46a2b --- /dev/null +++ b/shipyard_airflow/plugins/promenade_operator.py @@ -0,0 +1,201 @@ +# Copyright 2018 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import time + +from airflow.models import BaseOperator +from airflow.utils.decorators import apply_defaults +from airflow.plugins_manager import AirflowPlugin +from airflow.exceptions import AirflowException + +from service_endpoint import ucp_service_endpoint +from service_token import shipyard_service_token + + +class PromenadeOperator(BaseOperator): + """ + Supports interaction with Promenade + :param action: Task to perform + :param main_dag_name: Parent Dag + :param shipyard_conf: Location of shipyard.conf + :param sub_dag_name: Child Dag + """ + + @apply_defaults + def __init__(self, + action=None, + main_dag_name=None, + shipyard_conf=None, + sub_dag_name=None, + workflow_info={}, + xcom_push=True, + *args, **kwargs): + + super(PromenadeOperator, self).__init__(*args, **kwargs) + self.action = action + self.main_dag_name = main_dag_name + self.shipyard_conf = shipyard_conf + self.sub_dag_name = sub_dag_name + self.workflow_info = workflow_info + self.xcom_push_flag = xcom_push + + def execute(self, context): + # Initialize Variables + check_etcd = False + delete_node = False + labels_removed = False + node_drained = False + redeploy_server = None + stop_kubelet = False + + # Define task_instance + task_instance = context['task_instance'] + + # Extract information related to current workflow + # The workflow_info variable will be a dictionary + # that contains information about the workflow such + # as action_id, name and other related parameters + workflow_info = task_instance.xcom_pull( + task_ids='action_xcom', key='action', + dag_id=self.main_dag_name) + + # Logs uuid of action performed by the Operator + logging.info("Promenade Operator for action %s", workflow_info['id']) + + # Retrieve information of the server that we want to redeploy if user + # executes the 'redeploy_server' dag + if workflow_info['dag_id'] == 'redeploy_server': + redeploy_server = workflow_info['parameters'].get('server-name') + + if redeploy_server: + logging.info("Server to be redeployed is %s", redeploy_server) + else: + raise AirflowException('Unable to retrieve information of ' + 'node to be redeployed!') + + # Retrieve Endpoint Information + svc_type = 'kubernetesprovisioner' + context['svc_endpoint'] = ucp_service_endpoint(self, + svc_type=svc_type) + logging.info("Promenade endpoint is %s", context['svc_endpoint']) + + # Promenade API Call + # Drain node using Promenade + if self.action == 'promenade_drain_node': + node_drained = self.promenade_drain_node(context, + redeploy_server) + + if node_drained: + logging.info("Node %s has been successfully drained", + redeploy_server) + else: + raise AirflowException('Failed to drain %s!', + redeploy_server) + + # Remove labels using Promenade + elif self.action == 'promenade_remove_labels': + labels_removed = self.promenade_drain_node(context, + redeploy_server) + + if labels_removed: + logging.info("Successfully removed labels on %s", + redeploy_server) + else: + raise AirflowException('Failed to remove labels on %s!', + redeploy_server) + + # Stops kubelet on node using Promenade + elif self.action == 'promenade_stop_kubelet': + stop_kubelet = self.promenade_stop_kubelet(context, + redeploy_server) + + if stop_kubelet: + logging.info("Successfully stopped kubelet on %s", + redeploy_server) + else: + raise AirflowException('Failed to stopped kubelet on %s!', + redeploy_server) + + # Performs etcd sanity check using Promenade + elif self.action == 'promenade_check_etcd': + check_etcd = self.promenade_check_etcd(context) + + if check_etcd: + logging.info("The etcd cluster is healthy and ready") + else: + raise AirflowException('Please check the state of etcd!') + + # Delete node from cluster using Promenade + elif self.action == 'promenade_delete_node': + delete_node = self.promenade_delete_node(context, + redeploy_server) + + if delete_node: + logging.info("Succesfully deleted node %s from cluster", + redeploy_server) + else: + raise AirflowException('Failed to node %s from cluster!', + redeploy_server) + + # No action to perform + else: + logging.info('No Action to Perform') + + @shipyard_service_token + def promenade_drain_node(self, context, redeploy_server): + # Placeholder function. Updates will be made when the Promenade + # API is ready for consumption. + logging.info("Draining node...") + + return True + + @shipyard_service_token + def promenade_remove_labels(self, context, redeploy_server): + # Placeholder function. Updates will be made when the Promenade + # API is ready for consumption. + logging.info("Removing labels on node...") + + return True + + @shipyard_service_token + def promenade_stop_kubelet(self, context, redeploy_server): + # Placeholder function. Updates will be made when the Promenade + # API is ready for consumption. + logging.info("Stopping kubelet on node...") + + return True + + @shipyard_service_token + def promenade_check_etcd(self, context): + # Placeholder function. Updates will be made when the Promenade + # API is ready for consumption. + logging.info("Performing health check on etcd...") + + return True + + @shipyard_service_token + def promenade_delete_node(self, context, redeploy_server): + # Placeholder function. Updates will be made when the Promenade + # API is ready for consumption. + logging.info("Deleting node from cluster...") + time.sleep(30) + logging.info("Successfully deleted node %s", redeploy_server) + + return True + + +class PromenadeOperatorPlugin(AirflowPlugin): + name = 'promenade_operator_plugin' + operators = [PromenadeOperator] diff --git a/tests/unit/control/test.conf b/tests/unit/control/test.conf index fd1eb072..f689a6f2 100644 --- a/tests/unit/control/test.conf +++ b/tests/unit/control/test.conf @@ -11,6 +11,8 @@ service_type = deckhand cluster_join_check_backoff_time = 120 deploy_node_query_interval = 30 deploy_node_task_timeout = 3600 +destroy_node_query_interval = 30 +destroy_node_task_timeout = 900 prepare_node_query_interval = 30 prepare_node_task_timeout = 1800 prepare_site_query_interval = 10 diff --git a/tools/resources/shipyard.conf b/tools/resources/shipyard.conf index 29121f9d..779a2444 100644 --- a/tools/resources/shipyard.conf +++ b/tools/resources/shipyard.conf @@ -13,6 +13,8 @@ service_type = deckhand cluster_join_check_backoff_time = 120 deploy_node_query_interval = 30 deploy_node_task_timeout = 3600 +destroy_node_query_interval = 30 +destroy_node_task_timeout = 900 prepare_node_query_interval = 30 prepare_node_task_timeout = 1800 prepare_site_query_interval = 10