diff --git a/.gitignore b/.gitignore index 48e45def..a0cebe82 100644 --- a/.gitignore +++ b/.gitignore @@ -2,7 +2,7 @@ __pycache__/ *.py[cod] *$py.class -**/.pytest_cache/ +.pytest_cache/ # C extensions diff --git a/Makefile b/Makefile index 13070504..7367b627 100644 --- a/Makefile +++ b/Makefile @@ -79,9 +79,9 @@ run: .PHONY: build_airflow build_airflow: ifeq ($(USE_PROXY), true) - docker build --network host -t $(IMAGE) --label $(LABEL) -f $(IMAGE_DIR)/Dockerfile $(IMAGE_DIR) --build-arg http_proxy=$(PROXY) --build-arg https_proxy=$(PROXY) + docker build --network host -t $(IMAGE) --label $(LABEL) -f $(IMAGE_DIR)/Dockerfile --build-arg http_proxy=$(PROXY) --build-arg https_proxy=$(PROXY) --build-arg ctx_base=$(BUILD_CTX) . else - docker build --network host -t $(IMAGE) --label $(LABEL) -f $(IMAGE_DIR)/Dockerfile $(IMAGE_DIR) + docker build --network host -t $(IMAGE) --label $(LABEL) -f $(IMAGE_DIR)/Dockerfile --build-arg ctx_base=$(BUILD_CTX) . endif ifeq ($(PUSH_IMAGE), true) docker push $(IMAGE) diff --git a/images/airflow/Dockerfile b/images/airflow/Dockerfile index fde2d383..625f1a87 100644 --- a/images/airflow/Dockerfile +++ b/images/airflow/Dockerfile @@ -32,6 +32,7 @@ EXPOSE $WORKER_PORT # Set ARG for usage during build ARG AIRFLOW_HOME=/usr/local/airflow ARG DEBIAN_FRONTEND=noninteractive +ARG ctx_base=src/bin # Kubectl version ARG KUBECTL_VERSION=1.8.6 @@ -76,17 +77,30 @@ RUN useradd -ms /bin/bash -d ${AIRFLOW_HOME} airflow \ # Dependency requirements # Note - removing snakebite (python 2 vs. 3). See: # https://github.com/puckel/docker-airflow/issues/77 -COPY ./requirements.txt /tmp/ +COPY images/airflow/requirements.txt /tmp/ RUN pip3 install -r /tmp/requirements.txt \ && pip3 uninstall -y snakebite || true # Copy scripts used in the container: # entrypoint.sh, airflow_start_service.sh and airflow_logrotate.sh -COPY script/*.sh ${AIRFLOW_HOME}/ +COPY images/airflow/script/*.sh ${AIRFLOW_HOME}/ # Change permissions RUN chown -R airflow: ${AIRFLOW_HOME} +# Shipyard +# +# Shipyard provides core functionality used by the airflow plugins/operators +# Since Shipyard and Airflow are built together as images, this should prevent +# stale or out-of-date code between these parts. 
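+# +# A build invocation consistent with the Makefile targets above would run from +# the repository root (illustrative; BUILD_CTX is assumed to default to src/bin): +#   docker build -f images/airflow/Dockerfile --build-arg ctx_base=src/bin . +# so the COPY instructions below can resolve both images/airflow/ and +# ${ctx_base}/ paths in the build context.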
+# Shipyard requirements, source and installation +COPY ${ctx_base}/shipyard_airflow/requirements.txt /tmp/api_requirements.txt +RUN pip3 install -r /tmp/api_requirements.txt + +COPY ${ctx_base}/shipyard_airflow /tmp/shipyard/ +RUN cd /tmp/shipyard \ + && python3 setup.py install + # Set work directory USER airflow WORKDIR ${AIRFLOW_HOME} diff --git a/src/bin/shipyard_airflow/shipyard_airflow/common/deployment_group/deployment_group_manager.py b/src/bin/shipyard_airflow/shipyard_airflow/common/deployment_group/deployment_group_manager.py index 2d9da11f..072d5608 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/common/deployment_group/deployment_group_manager.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/common/deployment_group/deployment_group_manager.py @@ -82,6 +82,71 @@ class DeploymentGroupManager: return self._all_groups[group] return None + def group_list(self): + """Return a list of DeploymentGroup objects in group order""" + summary = [] + for group_nm in self._group_order: + group = self._all_groups[group_nm] + summary.append(group) + return summary + + def critical_groups_failed(self): + """Return True if any critical groups have failed""" + for group in self._all_groups.values(): + if group.stage == Stage.FAILED and group.critical: + return True + return False + + def evaluate_group_succ_criteria(self, group_name, stage): + """Checks a group against its success criteria for a stage + + :param group_name: the name of the group to check + :param stage: Stage.PREPARED or Stage.DEPLOYED + Returns a boolean: True = success, False = failure. + """ + failed_criteria = self.get_group_failures_for_stage(group_name, stage) + if failed_criteria: + # Logging of criteria has already occurred during checking. + self.mark_group_failed(group_name) + LOG.info("Group %s has failed to meet its success criteria while " + "trying to move to stage: %s", + group_name, stage) + return False + elif stage == Stage.DEPLOYED: + self.mark_group_deployed(group_name) + LOG.info("Group %s has met its success criteria and is " + "successfully deployed (%s)", group_name, stage) + return True + elif stage == Stage.PREPARED: + self.mark_group_prepared(group_name) + LOG.info("Group %s has met its success criteria and is " + "now set to stage %s", group_name, stage) + return True + + def report_group_summary(self): + """Reports the status of all groups handled by this deployment""" + LOG.info("===== Group Summary =====") + for group in self.group_list(): + LOG.info(" Group %s%s ended with stage: %s", + group.name, + " [Critical]" if group.critical else "", + group.stage) + LOG.info("===== End Group Summary =====") + + def report_node_summary(self): + """Reports the status of all nodes handled by this deployment""" + # Ordered stages + stages = [Stage.NOT_STARTED, + Stage.PREPARED, + Stage.DEPLOYED, + Stage.FAILED] + + LOG.info("===== Node Summary =====") + for stage in stages: + nodes = self.get_nodes(stage=stage) + LOG.info(" Nodes %s: %s", stage, ", ".join(nodes)) + LOG.info("===== End Node Summary =====") + # # Methods that support setup of the nodes in groups # @@ -163,6 +228,22 @@ class DeploymentGroupManager: # Methods for handling nodes # + def fail_unsuccessful_nodes(self, group, successes): + """Fail nodes that were not successful in a group's actionable list + + :param group: the group to check + :param successes: the list of successful nodes from processing + + This makes an assumption that all actionable nodes should be in a list + of successes if they are to be considered successful. 
If the success + list is empty, all the actionable nodes in the group would be + considered failed. + """ + # Mark non-successes as failed + failed_nodes = set(group.actionable_nodes).difference(set(successes)) + for node_name in failed_nodes: + self.mark_node_failed(node_name) + def mark_node_deployed(self, node_name): """Mark a node as deployed""" self._set_node_stage(node_name, Stage.DEPLOYED) @@ -203,7 +284,7 @@ def _update_group_actionable_nodes(group, known_nodes): ", ".join(known_nodes)) group_nodes = set(group.full_nodes) - group.actionable_nodes = group_nodes.difference(known_nodes) + group.actionable_nodes = list(group_nodes.difference(known_nodes)) LOG.debug("Group %s set actionable_nodes to %s. " "Full node list for this group is %s", group.name, diff --git a/src/bin/shipyard_airflow/shipyard_airflow/dags/drydock_deploy_site.py b/src/bin/shipyard_airflow/shipyard_airflow/dags/drydock_deploy_site.py index 428a5fd8..98218a23 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/dags/drydock_deploy_site.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/dags/drydock_deploy_site.py @@ -13,8 +13,7 @@ # limitations under the License. from airflow.models import DAG -from airflow.operators import DrydockDeployNodesOperator -from airflow.operators import DrydockPrepareNodesOperator +from airflow.operators import DrydockNodesOperator from airflow.operators import DrydockPrepareSiteOperator from airflow.operators import DrydockVerifySiteOperator @@ -43,15 +42,8 @@ def deploy_site_drydock(parent_dag_name, child_dag_name, args): sub_dag_name=child_dag_name, dag=dag) - drydock_prepare_nodes = DrydockPrepareNodesOperator( - task_id='prepare_nodes', - shipyard_conf=config_path, - main_dag_name=parent_dag_name, - sub_dag_name=child_dag_name, - dag=dag) - - drydock_deploy_nodes = DrydockDeployNodesOperator( - task_id='deploy_nodes', + drydock_nodes = DrydockNodesOperator( + task_id='prepare_and_deploy_nodes', shipyard_conf=config_path, main_dag_name=parent_dag_name, sub_dag_name=child_dag_name, @@ -59,7 +51,6 @@ def deploy_site_drydock(parent_dag_name, child_dag_name, args): # Define dependencies drydock_prepare_site.set_upstream(drydock_verify_site) - drydock_prepare_nodes.set_upstream(drydock_prepare_site) - drydock_deploy_nodes.set_upstream(drydock_prepare_nodes) + drydock_nodes.set_upstream(drydock_prepare_site) return dag diff --git a/src/bin/shipyard_airflow/shipyard_airflow/plugins/armada_base_operator.py b/src/bin/shipyard_airflow/shipyard_airflow/plugins/armada_base_operator.py index 8f409762..1be7d3ba 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/plugins/armada_base_operator.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/plugins/armada_base_operator.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. 
import logging -import os from urllib.parse import urlparse from airflow.exceptions import AirflowException @@ -22,9 +21,9 @@ from airflow.utils.decorators import apply_defaults import armada.common.client as client import armada.common.session as session from get_k8s_pod_port_ip import get_pod_port_ip -from service_endpoint import ucp_service_endpoint from service_token import shipyard_service_token from ucp_base_operator import UcpBaseOperator +import service_endpoint from xcom_pusher import XcomPusher LOG = logging.getLogger(__name__) @@ -42,16 +41,12 @@ class ArmadaBaseOperator(UcpBaseOperator): @apply_defaults def __init__(self, - armada_svc_type='armada', - deckhand_svc_type='deckhand', query={}, svc_session=None, svc_token=None, *args, **kwargs): """Initialization of ArmadaBaseOperator object. - :param armada_svc_type: Armada Service Type - :param deckhand_svc_type: Deckhand Service Type :param query: A dictionary containing explicit query string parameters :param svc_session: Keystone Session :param svc_token: Keystone Token @@ -66,8 +61,6 @@ class ArmadaBaseOperator(UcpBaseOperator): pod_selector_pattern=[{'pod_pattern': 'armada-api', 'container': 'armada-api'}], *args, **kwargs) - self.armada_svc_type = armada_svc_type - self.deckhand_svc_type = deckhand_svc_type self.query = query self.svc_session = svc_session self.svc_token = svc_token @@ -81,21 +74,11 @@ class ArmadaBaseOperator(UcpBaseOperator): # Logs uuid of action performed by the Operator LOG.info("Armada Operator for action %s", self.action_info['id']) - # Retrieve Endpoint Information - armada_svc_endpoint = ucp_service_endpoint( - self, svc_type=self.armada_svc_type) - # Set up armada client - self.armada_client = self._init_armada_client(armada_svc_endpoint, - self.svc_token) - - # Retrieve DeckHand Endpoint Information - deckhand_svc_endpoint = ucp_service_endpoint( - self, svc_type=self.deckhand_svc_type) - - # Get deckhand design reference url - self.deckhand_design_ref = self._init_deckhand_design_ref( - deckhand_svc_endpoint) + self.armada_client = self._init_armada_client( + self.endpoints.endpoint_by_name(service_endpoint.ARMADA), + self.svc_token + ) @staticmethod def _init_armada_client(armada_svc_endpoint, svc_token): @@ -133,26 +116,6 @@ class ArmadaBaseOperator(UcpBaseOperator): else: raise AirflowException("Failed to set up Armada client!") - def _init_deckhand_design_ref(self, deckhand_svc_endpoint): - - LOG.info("Deckhand endpoint is %s", deckhand_svc_endpoint) - - # Form DeckHand Design Reference Path - # This URL will be used to retrieve the Site Design YAMLs - deckhand_path = "deckhand+" + deckhand_svc_endpoint - _deckhand_design_ref = os.path.join(deckhand_path, - "revisions", - str(self.revision_id), - "rendered-documents") - - if _deckhand_design_ref: - LOG.info("Design YAMLs will be retrieved from %s", - _deckhand_design_ref) - - return _deckhand_design_ref - else: - raise AirflowException("Unable to Retrieve Design Reference!") - @get_pod_port_ip('tiller', namespace='kube-system') def get_tiller_info(self, pods_ip_port={}): diff --git a/src/bin/shipyard_airflow/shipyard_airflow/plugins/armada_post_apply.py b/src/bin/shipyard_airflow/shipyard_airflow/plugins/armada_post_apply.py index ad931561..7afd6108 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/plugins/armada_post_apply.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/plugins/armada_post_apply.py @@ -58,7 +58,7 @@ class ArmadaPostApplyOperator(ArmadaBaseOperator): try: armada_post_apply = self.armada_client.post_apply( 
manifest=armada_manifest, - manifest_ref=self.deckhand_design_ref, + manifest_ref=self.design_ref, values=override_values, set=chart_set, query=self.query, diff --git a/src/bin/shipyard_airflow/shipyard_airflow/plugins/armada_validate_design.py b/src/bin/shipyard_airflow/shipyard_airflow/plugins/armada_validate_design.py index c7672f46..5dec64e9 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/plugins/armada_validate_design.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/plugins/armada_validate_design.py @@ -42,8 +42,7 @@ class ArmadaValidateDesignOperator(ArmadaBaseOperator): # Validate Site Design try: post_validate = self.armada_client.post_validate( - manifest=self.deckhand_design_ref, - timeout=timeout) + manifest=self.design_ref, timeout=timeout) except errors.ClientError as client_error: # Dump logs from Armada API pods diff --git a/src/bin/shipyard_airflow/shipyard_airflow/plugins/check_k8s_node_status.py b/src/bin/shipyard_airflow/shipyard_airflow/plugins/check_k8s_node_status.py index 6268c164..9220f769 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/plugins/check_k8s_node_status.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/plugins/check_k8s_node_status.py @@ -15,7 +15,6 @@ import logging import time -from airflow.exceptions import AirflowException from kubernetes import client, config @@ -54,7 +53,7 @@ def check_node_status(time_out, interval): # Logs initial state of all nodes in the cluster ret_init = v1.list_node(watch=False) - logging.info("Current state of nodes in Cluster is") + logging.info("Current state of nodes in the cluster is") for i in ret_init.items: logging.info("%s\t%s\t%s", i.metadata.name, @@ -86,7 +85,7 @@ def check_node_status(time_out, interval): cluster_ready = False # Print current state of node - logging.info("Node %s is not Ready", j.metadata.name) + logging.info("Node %s is not ready", j.metadata.name) logging.debug("Current status of %s is %s", j.metadata.name, j.status.conditions[-1].message) @@ -96,16 +95,18 @@ def check_node_status(time_out, interval): logging.info("Node %s is in Ready state", j.metadata.name) - # Raise Time Out Exception + # If any nodes are not ready and the timeout is reached, stop waiting if not cluster_ready and i == end_range: - raise AirflowException("Timed Out! One or more Nodes fail to " - "get into Ready State!") - - # Exit loop if Cluster is in Ready state - if cluster_ready: - logging.info("All nodes are in Ready state") + logging.info("Timed Out! One or more Nodes failed to reach ready " + "state") + break + elif cluster_ready: + # Exit loop if Cluster is in Ready state + logging.info("All nodes are in ready state") break else: # Back off and check again in next iteration logging.info("Wait for %d seconds...", int(interval)) time.sleep(int(interval)) + # Return the nodes that are not ready. 
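+    # Illustrative use by a caller (interval/timeout values and node names +    # are hypothetical): +    #     not_ready = check_node_status(1800, 30) +    #     # e.g. not_ready == ["node4", "node5"]: treat these as nodes that +    #     # never reached Ready within time_out and mark them failed.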
+ return not_ready_node_list diff --git a/src/bin/shipyard_airflow/shipyard_airflow/plugins/deckhand_base_operator.py b/src/bin/shipyard_airflow/shipyard_airflow/plugins/deckhand_base_operator.py index 72002f8b..909af305 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/plugins/deckhand_base_operator.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/plugins/deckhand_base_operator.py @@ -19,7 +19,7 @@ from airflow.plugins_manager import AirflowPlugin from airflow.exceptions import AirflowException from deckhand.client import client as deckhand_client -from service_endpoint import ucp_service_endpoint +import service_endpoint from service_token import shipyard_service_token from ucp_base_operator import UcpBaseOperator @@ -41,8 +41,6 @@ class DeckhandBaseOperator(UcpBaseOperator): committed_ver=None, deckhandclient=None, deckhand_client_read_timeout=None, - deckhand_svc_endpoint=None, - deckhand_svc_type='deckhand', revision_id=None, svc_session=None, svc_token=None, @@ -53,8 +51,6 @@ class DeckhandBaseOperator(UcpBaseOperator): :param committed_ver: Last committed version :param deckhandclient: An instance of deckhand client :param deckhand_client_read_timeout: Deckhand client connect timeout - :param deckhand_svc_endpoint: Deckhand Service Endpoint - :param deckhand_svc_type: Deckhand Service Type :param revision_id: Target revision for workflow :param svc_session: Keystone Session :param svc_token: Keystone Token @@ -70,8 +66,6 @@ class DeckhandBaseOperator(UcpBaseOperator): self.committed_ver = committed_ver self.deckhandclient = deckhandclient self.deckhand_client_read_timeout = deckhand_client_read_timeout - self.deckhand_svc_endpoint = deckhand_svc_endpoint - self.deckhand_svc_type = deckhand_svc_type self.revision_id = revision_id self.svc_session = svc_session self.svc_token = svc_token @@ -96,8 +90,9 @@ class DeckhandBaseOperator(UcpBaseOperator): self.action_info['id']) # Retrieve Endpoint Information - self.deckhand_svc_endpoint = ucp_service_endpoint( - self, svc_type=self.deckhand_svc_type) + self.deckhand_svc_endpoint = self.endpoints.endpoint_by_name( + service_endpoint.DECKHAND + ) LOG.info("Deckhand endpoint is %s", self.deckhand_svc_endpoint) diff --git a/src/bin/shipyard_airflow/shipyard_airflow/plugins/deployment_configuration_operator.py b/src/bin/shipyard_airflow/shipyard_airflow/plugins/deployment_configuration_operator.py index b307745e..14ad9493 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/plugins/deployment_configuration_operator.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/plugins/deployment_configuration_operator.py @@ -45,7 +45,7 @@ class DeploymentConfigurationOperator(BaseOperator): cannot be retrieved """ config_keys_defaults = { - "physical_provisioner.deployment_strategy": "all-at-once", + "physical_provisioner.deployment_strategy": None, "physical_provisioner.deploy_interval": 30, "physical_provisioner.deploy_timeout": 3600, "physical_provisioner.destroy_interval": 30, diff --git a/src/bin/shipyard_airflow/shipyard_airflow/plugins/drydock_base_operator.py b/src/bin/shipyard_airflow/shipyard_airflow/plugins/drydock_base_operator.py index d539f6cd..83556247 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/plugins/drydock_base_operator.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/plugins/drydock_base_operator.py @@ -14,7 +14,6 @@ import copy import pprint import logging -import os import time from urllib.parse import urlparse @@ -25,9 +24,35 @@ from airflow.utils.decorators import apply_defaults import 
drydock_provisioner.drydock_client.client as client import drydock_provisioner.drydock_client.session as session from drydock_provisioner import error as errors -from service_endpoint import ucp_service_endpoint -from service_token import shipyard_service_token -from ucp_base_operator import UcpBaseOperator + +try: + import service_endpoint +except ImportError: + from shipyard_airflow.plugins import service_endpoint +try: + from service_token import shipyard_service_token +except ImportError: + from shipyard_airflow.plugins.service_token import shipyard_service_token + +try: + from ucp_base_operator import UcpBaseOperator +except ImportError: + from shipyard_airflow.plugins.ucp_base_operator import UcpBaseOperator + +try: + from drydock_errors import ( + DrydockClientUseFailureException, + DrydockTaskFailedException, + DrydockTaskNotCreatedException, + DrydockTaskTimeoutException + ) +except ImportError: + from shipyard_airflow.plugins.drydock_errors import ( + DrydockClientUseFailureException, + DrydockTaskFailedException, + DrydockTaskNotCreatedException, + DrydockTaskTimeoutException + ) LOG = logging.getLogger(__name__) @@ -44,11 +69,7 @@ class DrydockBaseOperator(UcpBaseOperator): @apply_defaults def __init__(self, - deckhand_design_ref=None, - deckhand_svc_type='deckhand', drydock_client=None, - drydock_svc_endpoint=None, - drydock_svc_type='physicalprovisioner', drydock_task_id=None, node_filter=None, redeploy_server=None, @@ -57,11 +78,7 @@ class DrydockBaseOperator(UcpBaseOperator): *args, **kwargs): """Initialization of DrydockBaseOperator object. - :param deckhand_design_ref: A URI reference to the design documents - :param deckhand_svc_type: Deckhand Service Type :param drydockclient: An instance of drydock client - :param drydock_svc_endpoint: Drydock Service Endpoint - :param drydock_svc_type: Drydock Service Type :param drydock_task_id: Drydock Task ID :param node_filter: A filter for narrowing the scope of the task. Valid fields are 'node_names', 'rack_names', @@ -81,11 +98,7 @@ class DrydockBaseOperator(UcpBaseOperator): pod_selector_pattern=[{'pod_pattern': 'drydock-api', 'container': 'drydock-api'}], *args, **kwargs) - self.deckhand_design_ref = deckhand_design_ref - self.deckhand_svc_type = deckhand_svc_type self.drydock_client = drydock_client - self.drydock_svc_endpoint = drydock_svc_endpoint - self.drydock_svc_type = drydock_svc_type self.drydock_task_id = drydock_task_id self.node_filter = node_filter self.redeploy_server = redeploy_server @@ -126,8 +139,9 @@ class DrydockBaseOperator(UcpBaseOperator): % self.__class__.__name__) # Retrieve Endpoint Information - self.drydock_svc_endpoint = ucp_service_endpoint( - self, svc_type=self.drydock_svc_type) + self.drydock_svc_endpoint = self.endpoints.endpoint_by_name( + service_endpoint.DRYDOCK + ) LOG.info("Drydock endpoint is %s", self.drydock_svc_endpoint) @@ -147,7 +161,9 @@ class DrydockBaseOperator(UcpBaseOperator): if dd_session: LOG.info("Successfully Set Up DryDock Session") else: - raise AirflowException("Failed to set up Drydock Session!") + raise DrydockClientUseFailureException( + "Failed to set up Drydock Session!" 
+ ) # Use the DrydockSession to build a DrydockClient that can # be used to make one or more API calls @@ -158,26 +174,9 @@ class DrydockBaseOperator(UcpBaseOperator): if self.drydock_client: LOG.info("Successfully Set Up DryDock client") else: - raise AirflowException("Failed to set up Drydock Client!") - - # Retrieve DeckHand Endpoint Information - deckhand_svc_endpoint = ucp_service_endpoint( - self, svc_type=self.deckhand_svc_type) - - LOG.info("Deckhand endpoint is %s", deckhand_svc_endpoint) - - # Form DeckHand Design Reference Path - # This URL will be used to retrieve the Site Design YAMLs - deckhand_path = "deckhand+" + deckhand_svc_endpoint - self.deckhand_design_ref = os.path.join(deckhand_path, - "revisions", - str(self.revision_id), - "rendered-documents") - if self.deckhand_design_ref: - LOG.info("Design YAMLs will be retrieved from %s", - self.deckhand_design_ref) - else: - raise AirflowException("Unable to Retrieve Design Reference!") + raise DrydockClientUseFailureException( + "Failed to set up Drydock Client!" + ) @shipyard_service_token def _auth_gen(self): @@ -196,7 +195,7 @@ class DrydockBaseOperator(UcpBaseOperator): try: # Create Task create_task_response = self.drydock_client.create_task( - design_ref=self.deckhand_design_ref, + design_ref=self.design_ref, task_action=task_action, node_filter=self.node_filter) @@ -204,7 +203,7 @@ class DrydockBaseOperator(UcpBaseOperator): # Dump logs from Drydock pods self.get_k8s_logs() - raise AirflowException(client_error) + raise DrydockClientUseFailureException(client_error) # Retrieve Task ID self.drydock_task_id = create_task_response['task_id'] @@ -216,7 +215,7 @@ class DrydockBaseOperator(UcpBaseOperator): if self.drydock_task_id: return self.drydock_task_id else: - raise AirflowException("Unable to create task!") + raise DrydockTaskNotCreatedException("Unable to create task!") def query_task(self, interval, time_out): @@ -235,21 +234,16 @@ class DrydockBaseOperator(UcpBaseOperator): try: # Retrieve current task state - task_state = self.drydock_client.get_task( - task_id=self.drydock_task_id) + task_state = self.get_task_dict(task_id=self.drydock_task_id) task_status = task_state['status'] task_result = task_state['result']['status'] LOG.info("Current status of task id %s is %s", self.drydock_task_id, task_status) - - except errors.ClientError as client_error: - # Dump logs from Drydock pods + except DrydockClientUseFailureException: self.get_k8s_logs() - - raise AirflowException(client_error) - + raise except: # There can be situations where there are intermittent network # issues that prevents us from retrieving the task state. 
We @@ -275,6 +269,21 @@ class DrydockBaseOperator(UcpBaseOperator): else: self.task_failure(True) + def get_task_dict(self, task_id): + """Retrieve task output in its raw dictionary format + + :param task_id: The id of the task to retrieve + Raises DrydockClientUseFailureException if the client raises an + exception + See: + http://att-comdev-drydock.readthedocs.io/en/latest/task.html#task-status-schema + """ + try: + return self.drydock_client.get_task(task_id=task_id) + except errors.ClientError as client_error: + # Dump logs from Drydock pods + raise DrydockClientUseFailureException(client_error) + def task_failure(self, _task_failure): # Dump logs from Drydock pods self.get_k8s_logs() @@ -289,7 +298,7 @@ class DrydockBaseOperator(UcpBaseOperator): self.all_task_ids = {t['task_id']: t for t in all_tasks} except errors.ClientError as client_error: - raise AirflowException(client_error) + raise DrydockClientUseFailureException(client_error) # Retrieve the failed parent task and assign it to list failed_parent_task = ( @@ -299,7 +308,7 @@ class DrydockBaseOperator(UcpBaseOperator): # Since there is only 1 failed parent task, we will print index 0 # of the list if failed_parent_task: - LOG.error('%s task has either failed or timed out', + LOG.error("%s task has either failed or timed out", failed_parent_task[0]['action']) LOG.error(pprint.pprint(failed_parent_task[0])) @@ -312,9 +321,13 @@ class DrydockBaseOperator(UcpBaseOperator): # Raise Exception to terminate workflow if _task_failure: - raise AirflowException("Failed to Execute/Complete Task!") + raise DrydockTaskFailedException( + "Failed to Execute/Complete Task!" + ) else: - raise AirflowException("Task Execution Timed Out!") + raise DrydockTaskTimeoutException( + "Task Execution Timed Out!" + ) def check_subtask_failure(self, subtask_id_list): @@ -367,7 +380,9 @@ class DrydockBaseOperator(UcpBaseOperator): subtask_id) else: - raise AirflowException("Unable to retrieve subtask info!") + raise DrydockClientUseFailureException( + "Unable to retrieve subtask info!" + ) class DrydockBaseOperatorPlugin(AirflowPlugin): diff --git a/src/bin/shipyard_airflow/shipyard_airflow/plugins/drydock_deploy_nodes.py b/src/bin/shipyard_airflow/shipyard_airflow/plugins/drydock_deploy_nodes.py deleted file mode 100644 index 4c8b1f14..00000000 --- a/src/bin/shipyard_airflow/shipyard_airflow/plugins/drydock_deploy_nodes.py +++ /dev/null @@ -1,70 +0,0 @@ -# Copyright 2018 AT&T Intellectual Property. All other rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-import logging -import time - -from airflow.plugins_manager import AirflowPlugin - -from check_k8s_node_status import check_node_status -from drydock_base_operator import DrydockBaseOperator - -LOG = logging.getLogger(__name__) - - -class DrydockDeployNodesOperator(DrydockBaseOperator): - - """Drydock Deploy Nodes Operator - - This operator will trigger drydock to deploy the bare metal - nodes - - """ - - def do_execute(self): - - # Trigger DryDock to execute task - self.create_task('deploy_nodes') - - # Retrieve query interval and timeout - q_interval = self.dc['physical_provisioner.deploy_interval'] - task_timeout = self.dc['physical_provisioner.deploy_timeout'] - - # Query Task - self.query_task(q_interval, task_timeout) - - # It takes time for the cluster join process to be triggered across - # all the nodes in the cluster. Hence there is a need to back off - # and wait before checking the state of the cluster join process. - join_wait = self.dc['physical_provisioner.join_wait'] - - LOG.info("All nodes deployed in MAAS") - LOG.info("Wait for %d seconds before checking node state...", - join_wait) - - time.sleep(join_wait) - - # Check that cluster join process is completed before declaring - # deploy_node as 'completed'. - node_st_timeout = self.dc['kubernetes.node_status_timeout'] - node_st_interval = self.dc['kubernetes.node_status_interval'] - - check_node_status(node_st_timeout, node_st_interval) - - -class DrydockDeployNodesOperatorPlugin(AirflowPlugin): - - """Creates DrydockDeployNodesOperator in Airflow.""" - - name = 'drydock_deploy_nodes_operator' - operators = [DrydockDeployNodesOperator] diff --git a/src/bin/shipyard_airflow/shipyard_airflow/plugins/drydock_errors.py b/src/bin/shipyard_airflow/shipyard_airflow/plugins/drydock_errors.py new file mode 100644 index 00000000..359b0cd2 --- /dev/null +++ b/src/bin/shipyard_airflow/shipyard_airflow/plugins/drydock_errors.py @@ -0,0 +1,34 @@ +# Copyright 2018 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Drydock specific exceptions generated during operator execution. + +Generally marker exceptions extending AirflowException +""" +from airflow.exceptions import AirflowException + + +class DrydockClientUseFailureException(AirflowException): + pass + + +class DrydockTaskFailedException(AirflowException): + pass + + +class DrydockTaskNotCreatedException(AirflowException): + pass + + +class DrydockTaskTimeoutException(AirflowException): + pass diff --git a/src/bin/shipyard_airflow/shipyard_airflow/plugins/drydock_nodes.py b/src/bin/shipyard_airflow/shipyard_airflow/plugins/drydock_nodes.py new file mode 100644 index 00000000..4dab6610 --- /dev/null +++ b/src/bin/shipyard_airflow/shipyard_airflow/plugins/drydock_nodes.py @@ -0,0 +1,486 @@ +# Copyright 2018 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Prepare and deploy nodes using Drydock + +Uses the deployment strategy named in the deployment-configuration to +progress through preparation and deployment of nodes in a group-based fashion. + +If no deployment strategy is specified, an "all-at-once" approach is +taken, by which all nodes are deployed together. + +Historical Note: This operator replaces the drydock_prepare_nodes +and drydock_deploy_nodes operators that existed previously. +""" +import logging +import time + +from airflow.exceptions import AirflowException +from airflow.plugins_manager import AirflowPlugin + +from shipyard_airflow.common.deployment_group.deployment_group import Stage +from shipyard_airflow.common.deployment_group.deployment_group_manager import \ + DeploymentGroupManager +from shipyard_airflow.common.deployment_group.node_lookup import NodeLookup + +try: + import check_k8s_node_status +except ImportError: + from shipyard_airflow.plugins import check_k8s_node_status + +try: + from drydock_base_operator import DrydockBaseOperator +except ImportError: + from shipyard_airflow.plugins.drydock_base_operator import \ + DrydockBaseOperator + +try: + from drydock_errors import ( + DrydockTaskFailedException, + DrydockTaskTimeoutException + ) +except ImportError: + from shipyard_airflow.plugins.drydock_errors import ( + DrydockTaskFailedException, + DrydockTaskTimeoutException + ) + +LOG = logging.getLogger(__name__) + + +class DrydockNodesOperator(DrydockBaseOperator): + """Drydock Nodes Operator + + Using a deployment strategy to calculate the deployment sequence, + deploy a series of baremetal nodes using Drydock. + """ + + def do_execute(self): + self._setup_configured_values() + # setup self.strat_name and self.strategy + self.strategy = {} + self._setup_deployment_strategy() + dgm = _get_deployment_group_manager( + self.strategy['groups'], + _get_node_lookup(self.drydock_client, self.design_ref) + ) + + _process_deployment_groups(dgm, + self._execute_prepare, + self._execute_deployment) + + # All groups "complete" (as they're going to be). Report summary + dgm.report_group_summary() + dgm.report_node_summary() + if dgm.critical_groups_failed(): + raise AirflowException( + "One or more deployment groups marked as critical have failed" + ) + else: + LOG.info("All critical groups have met their success criteria") + # TODO (bryan-strassner) it is very possible that many nodes failed + # deployment, but all critical groups had enough success to + # continue processing. This will be non-obvious to the casual + # observer of the workflow. A likely enhancement is to allow + # notes to be added to the shipyard action associated with this + # workflow that would be reported back to the end user doing a + # describe of the action. This will require new database structures + # to hold the notes, and a means to insert the notes. A shared + # functionality in the base ucp operator or a common module would + # be a reasonable way to support this. + + def _setup_configured_values(self): + """Sets
values from the deployment configuration""" + # Retrieve query intervals and timeouts + # Intervals - How often will something be queried for status. + self.dep_interval = self.dc['physical_provisioner.deploy_interval'] + self.node_st_interval = self.dc['kubernetes.node_status_interval'] + self.prep_interval = self.dc[ + 'physical_provisioner.prepare_node_interval' + ] + # Timeouts - Time Shipyard waits for completion of a task. + self.dep_timeout = self.dc['physical_provisioner.deploy_timeout'] + self.node_st_timeout = self.dc['kubernetes.node_status_timeout'] + self.prep_timeout = self.dc[ + 'physical_provisioner.prepare_node_timeout' + ] + # The time to wait before querying k8s nodes after Drydock deploys nodes + self.join_wait = self.dc['physical_provisioner.join_wait'] + + def _execute_prepare(self, group): + """Executes the prepare nodes step for the group. + + :param group: the DeploymentGroup to prepare + Returns a QueryTaskResult object + """ + LOG.info("Group %s is preparing nodes", group.name) + + self.node_filter = _gen_node_name_filter(group.actionable_nodes) + return self._execute_task('prepare_nodes', + self.prep_interval, + self.prep_timeout) + + def _execute_deployment(self, group): + """Execute the deployment of nodes for the group. + + :param group: The DeploymentGroup to deploy + Returns a QueryTaskResult object + """ + LOG.info("Group %s is deploying nodes", group.name) + + self.node_filter = _gen_node_name_filter(group.actionable_nodes) + task_result = self._execute_task('deploy_nodes', + self.dep_interval, + self.dep_timeout) + + if not task_result.successes: + # if there are no successes from Drydock, there is no need to + # wait and check on the results from node status. + LOG.info("There are no nodes indicated as successful from Drydock." + " Skipping waiting for Kubernetes node join and " + "proceeding to validation") + return task_result + + # It takes time for the cluster join process to be triggered across + # all the nodes in the cluster. Hence there is a need to back off + # and wait before checking the state of the cluster join process. + LOG.info("Nodes <%s> reported as deployed in MAAS", + ", ".join(task_result.successes)) + LOG.info("Waiting for %d seconds before checking node state...", + self.join_wait) + time.sleep(self.join_wait) + + # Check that cluster join process is completed before declaring + # deploy_node as 'completed'. + # This should only include nodes that drydock has indicated as + # successful and has passed the join script to. + # Anything not ready in the timeout needs to be considered a failure + not_ready_list = check_k8s_node_status.check_node_status( + self.node_st_timeout, + self.node_st_interval + ) + for node in not_ready_list: + # Remove nodes that are not ready from the list of successes, since + # they did not complete deployment successfully. + try: + LOG.info("Node %s failed to join the Kubernetes cluster or did" + " not do so in time", node) + task_result.successes.remove(node) + except ValueError: + # This node is not joined, but was not one that we were + # looking for either.
+ LOG.info("%s failed to join Kubernetes, but was not in the " + "Drydock results: %s", + node, + ", ".join(task_result.successes)) + return task_result + + def _execute_task(self, task_name, interval, timeout): + """Execute the Drydock task requested + + :param task_name: 'prepare_nodes', 'deploy_nodes' + ;param interval: The time between checking status on the task + :param timeout: The total time allowed for the task + + Wraps the query_task method in the base class, capturing + AirflowExceptions and summarizing results into a response + QueryTaskResult object + + Note: It does not matter if the task ultimately succeeds or fails in + Drydock - the base class will handle all the logging and etc for + the purposes of troubleshooting. What matters is the node successes. + Following any result of query_task, this code will re-query the task + results from Drydock to gather the node successes placing them into + the successes list in the response object. In the case of a failure to + get the task results, this workflow must assume that the result is a + total loss, and pass back no successes + """ + self.create_task(task_name) + result = QueryTaskResult(self.drydock_task_id, task_name) + + try: + self.query_task(interval, timeout) + except DrydockTaskFailedException: + # Task failure may be successful enough based on success criteria. + # This should not halt the overall flow of this workflow step. + LOG.warn( + "Task %s has failed. Logs contain details of the failure. " + "Some nodes may be succesful, processing continues", task_name + ) + except DrydockTaskTimeoutException: + # Task timeout may be successful enough based on success criteria. + # This should not halt the overall flow of this workflow step. + LOG.warn( + "Task %s has timed out after %s seconds. Logs contain details " + "of the failure. Some nodes may be succesful, processing " + "continues", task_name, timeout + ) + # Other AirflowExceptions will fail the whole task - let them do this. + + # find successes + result.successes = self._get_successes_for_task(self.drydock_task_id) + return result + + def _get_successes_for_task(self, task_id, extend_success=True): + """Discover the successful nodes based on the current task id. + + :param task_id: The id of the task + :param extend_successes: determines if this result extends successes + or simply reports on the task. + Gets the set of successful nodes by examining the self.drydock_task_id. + The children are traversed recursively to display each sub-task's + information. + + Only a reported success at the parent task indicates success of the + task. Drydock is assumed to roll up overall success to the top level. + """ + success_nodes = [] + task_dict = self.get_task_dict(task_id) + task_status = task_dict.get('status', "Not Specified") + task_result = task_dict.get('result') + if task_result is None: + LOG.warn("Task result is missing for task %s, with status %s." + " Neither successes nor further details can be extracted" + " from this result", + task_id, task_status) + else: + if extend_success: + try: + # successes and failures on the task result drive the + # interpretation of success or failure for this workflow. + # - Any node that is _only_ success for a task is a + # success to us. + # - Any node that is listed as a failure is a failure. + # This implies that a node listed as a success and a + # failure is a failure. E.g. 
some subtasks succeeded and + # some failed + t_successes = task_result.get('successes', []) + t_failures = task_result.get('failures', []) + actual_successes = set(t_successes) - set(t_failures) + # acquire the successes from success nodes + success_nodes.extend(actual_successes) + LOG.info("Nodes <%s> added as successes for task %s", + ", ".join(success_nodes), task_id) + except KeyError: + # missing key on the path to getting nodes - don't add any + LOG.warn("Missing successes field on result of task %s, " + "but a success field was expected. No successes" + " can be extracted from this result", + task_id) + pass + _report_task_info(task_id, task_result, task_status) + + # for each child, report only the step info, do not add to overall + # success list. + for ch_task_id in task_dict.get('subtask_id_list', []): + success_nodes.extend( + self._get_successes_for_task(ch_task_id, extend_success=False) + ) + # deduplicate and return + return set(success_nodes) + + def _setup_deployment_strategy(self): + """Determine the deployment strategy + + Uses the specified strategy from the deployment configuration + or returns a default configuration of 'all-at-once' + """ + self.strat_name = self.dc['physical_provisioner.deployment_strategy'] + if self.strat_name: + # if there is a deployment strategy specified, get it and use it + self.strategy = self.get_unique_doc( + name=self.strat_name, + schema="shipyard/DeploymentStrategy/v1" + ) + else: + # The default behavior is to deploy all nodes, and fail if + # any nodes fail to deploy. + self.strat_name = 'all-at-once (defaulted)' + self.strategy = _default_deployment_strategy() + LOG.info("Strategy Name: %s has %s groups", + self.strat_name, + len(self.strategy.get('groups', []))) + + +# +# Functions supporting the nodes operator class +# + +def _get_node_lookup(drydock_client, design_ref): + """Return a NodeLookup suitable for the DeploymentGroupManager + + :param drydock_client: the drydock_client object + :param design_ref: the design_ref for the NodeLookup + """ + return NodeLookup(drydock_client, design_ref).lookup + + +def _get_deployment_group_manager(groups_dict_list, node_lookup): + """Return a DeploymentGroupManager suitable for managing this deployment + + :param groups_dict_list: the list of group dictionaries to use + :param node_lookup: a NodeLookup object that will be used by this + DeploymentGroupManager + """ + return DeploymentGroupManager(groups_dict_list, node_lookup) + + +def _process_deployment_groups(dgm, prepare_func, deploy_func): + """Executes the deployment group deployments + + :param dgm: the DeploymentGroupManager object that manages the + dependency chain of groups + :param prepare_func: a function that accepts a DeploymentGroup and returns + a QueryTaskResult with the purpose of preparing nodes + :param deploy_func: a function that accepts a DeploymentGroup and returns + a QueryTaskResult with the purpose of deploying nodes + """ + complete = False + while not complete: + # Find the next group to be prepared. Prepare and deploy it. + group = dgm.get_next_group(Stage.PREPARED) + if group is None: + LOG.info("There are no more groups eligible to process") + # whether or not really complete, the processing loop is done. + complete = True + continue + + LOG.info("*** Deployment Group: %s is being processed ***", group.name) + if not group.actionable_nodes: + LOG.info("There were no actionable nodes for group %s. It is " + "possible that all nodes: [%s] have previously been " + "deployed. 
Group will be immediately checked " + "against its success criteria", group.name, + ", ".join(group.full_nodes)) + + # In the case of a group having no actionable nodes, since groups + # prepare -> deploy in direct sequence, we can check against + # deployment, since all nodes would need to be deployed or have + # been attempted. Need to follow the state-transition, so + # PREPARED -> DEPLOYED + dgm.evaluate_group_succ_criteria(group.name, Stage.PREPARED) + dgm.evaluate_group_succ_criteria(group.name, Stage.DEPLOYED) + # success or failure, move on to next group + continue + + LOG.info("%s has actionable nodes: [%s]", group.name, + ", ".join(group.actionable_nodes)) + if len(group.actionable_nodes) < len(group.full_nodes): + LOG.info("Some nodes are not actionable because they were " + "included in a prior group, but will be considered in " + "the success criteria calculation for this group") + + # Group has actionable nodes. + # Prepare Nodes for group, store QueryTaskResults + prep_qtr = prepare_func(group) + # Mark successes as prepared + for node_name in prep_qtr.successes: + dgm.mark_node_prepared(node_name) + + dgm.fail_unsuccessful_nodes(group, prep_qtr.successes) + should_deploy = dgm.evaluate_group_succ_criteria(group.name, + Stage.PREPARED) + if not should_deploy: + # group has failed, move on to next group. Current group has + # been marked as failed. + continue + + # Continue with deployment + dep_qtr = deploy_func(group) + # Mark successes as deployed + for node_name in dep_qtr.successes: + dgm.mark_node_deployed(node_name) + dgm.fail_unsuccessful_nodes(group, dep_qtr.successes) + dgm.evaluate_group_succ_criteria(group.name, Stage.DEPLOYED) + + +def _report_task_info(task_id, task_result, task_status): + """Logs information regarding a task.
+ + :param task_id: id of the task + :param task_result: The result dictionary of the task + :param task_status: The status for the task + """ + # setup fields, or defaults if missing values + task_failures = task_result.get('failures', []) + task_successes = task_result.get('successes', []) + result_details = task_result.get('details', {'messageList': []}) + result_status = task_result.get('status', "No status supplied") + LOG.info("Task %s with status %s/%s reports successes: [%s] and" + " failures: [%s]", task_id, task_status, result_status, + ", ".join(task_successes), ", ".join(task_failures)) + for message_item in result_details['messageList']: + context_type = message_item.get('context_type', 'N/A') + context_id = message_item.get('context', 'N/A') + message = message_item.get('message', "No message text supplied") + error = message_item.get('error', False) + timestamp = message_item.get('ts', 'No timestamp supplied') + LOG.info(" - Task %s for item %s:%s has message: %s [err=%s, at %s]", + task_id, context_type, context_id, message, error, timestamp) + + +def _default_deployment_strategy(): + """The default deployment strategy for 'all-at-once'""" + return { + 'groups': [ + { + 'name': 'default', + 'critical': True, + 'depends_on': [], + 'selectors': [ + { + 'node_names': [], + 'node_labels': [], + 'node_tags': [], + 'rack_names': [], + }, + ], + 'success_criteria': { + 'percent_successful_nodes': 100 + }, + } + ] + } + + +def _gen_node_name_filter(node_names): + """Generates a drydock compatible node filter using only node names + + :param node_names: the nodes with which to create a filter + """ + return { + 'filter_set_type': 'union', + 'filter_set': [ + { + 'filter_type': 'union', + 'node_names': node_names + } + ] + } + + +class QueryTaskResult: + """Represents a summarized query result from a task""" + def __init__(self, task_id, task_name): + self.task_id = task_id + self.task_name = task_name + # The succeeded node names + self.successes = [] + + +class DrydockNodesOperatorPlugin(AirflowPlugin): + + """Creates DrydockNodesOperator in Airflow.""" + + name = 'drydock_nodes_operator' + operators = [DrydockNodesOperator] diff --git a/src/bin/shipyard_airflow/shipyard_airflow/plugins/drydock_prepare_nodes.py b/src/bin/shipyard_airflow/shipyard_airflow/plugins/drydock_prepare_nodes.py deleted file mode 100644 index ab2fb008..00000000 --- a/src/bin/shipyard_airflow/shipyard_airflow/plugins/drydock_prepare_nodes.py +++ /dev/null @@ -1,46 +0,0 @@ -# Copyright 2018 AT&T Intellectual Property. All other rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License.
-from airflow.plugins_manager import AirflowPlugin - -from drydock_base_operator import DrydockBaseOperator - - -class DrydockPrepareNodesOperator(DrydockBaseOperator): - - """Drydock Prepare Nodes Operator - - This operator will trigger drydock to prepare nodes for - site deployment - - """ - - def do_execute(self): - - # Trigger DryDock to execute task - self.create_task('prepare_nodes') - - # Retrieve query interval and timeout - q_interval = self.dc['physical_provisioner.prepare_node_interval'] - task_timeout = self.dc['physical_provisioner.prepare_node_timeout'] - - # Query Task - self.query_task(q_interval, task_timeout) - - -class DrydockPrepareNodesOperatorPlugin(AirflowPlugin): - - """Creates DrydockPrepareNodesOperator in Airflow.""" - - name = 'drydock_prepare_nodes_operator' - operators = [DrydockPrepareNodesOperator] diff --git a/src/bin/shipyard_airflow/shipyard_airflow/plugins/drydock_validate_design.py b/src/bin/shipyard_airflow/shipyard_airflow/plugins/drydock_validate_design.py index 72186216..30c374e8 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/plugins/drydock_validate_design.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/plugins/drydock_validate_design.py @@ -49,7 +49,7 @@ class DrydockValidateDesignOperator(DrydockBaseOperator): payload = { 'rel': "design", - 'href': self.deckhand_design_ref, + 'href': self.design_ref, 'type': "application/x-yaml" } diff --git a/src/bin/shipyard_airflow/shipyard_airflow/plugins/promenade_base_operator.py b/src/bin/shipyard_airflow/shipyard_airflow/plugins/promenade_base_operator.py index 00e7d1f6..e40d39af 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/plugins/promenade_base_operator.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/plugins/promenade_base_operator.py @@ -12,16 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -import os from airflow.utils.decorators import apply_defaults from airflow.plugins_manager import AirflowPlugin from airflow.exceptions import AirflowException try: - from service_endpoint import ucp_service_endpoint + import service_endpoint except ImportError: - from shipyard_airflow.plugins.service_endpoint import ucp_service_endpoint + from shipyard_airflow.plugins import service_endpoint try: from service_token import shipyard_service_token @@ -47,19 +46,11 @@ class PromenadeBaseOperator(UcpBaseOperator): @apply_defaults def __init__(self, - deckhand_design_ref=None, - deckhand_svc_type='deckhand', - promenade_svc_endpoint=None, - promenade_svc_type='kubernetesprovisioner', redeploy_server=None, svc_token=None, *args, **kwargs): """Initialization of PromenadeBaseOperator object. 
- :param deckhand_design_ref: A URI reference to the design documents - :param deckhand_svc_type: Deckhand Service Type - :param promenade_svc_endpoint: Promenade Service Endpoint - :param promenade_svc_type: Promenade Service Type :param redeploy_server: Server to be redeployed :param svc_token: Keystone Token The Drydock operator assumes that prior steps have set xcoms for @@ -71,10 +62,6 @@ class PromenadeBaseOperator(UcpBaseOperator): pod_selector_pattern=[{'pod_pattern': 'promenade-api', 'container': 'promenade-api'}], *args, **kwargs) - self.deckhand_design_ref = deckhand_design_ref - self.deckhand_svc_type = deckhand_svc_type - self.promenade_svc_endpoint = promenade_svc_endpoint - self.promenade_svc_type = promenade_svc_type self.redeploy_server = redeploy_server self.svc_token = svc_token @@ -98,31 +85,12 @@ class PromenadeBaseOperator(UcpBaseOperator): % self.__class__.__name__) # Retrieve promenade endpoint - self.promenade_svc_endpoint = ucp_service_endpoint( - self, svc_type=self.promenade_svc_type) + self.promenade_svc_endpoint = self.endpoints.endpoint_by_name( + service_endpoint.PROMENADE + ) LOG.info("Promenade endpoint is %s", self.promenade_svc_endpoint) - # Retrieve Deckhand Endpoint Information - deckhand_svc_endpoint = ucp_service_endpoint( - self, svc_type=self.deckhand_svc_type) - - LOG.info("Deckhand endpoint is %s", deckhand_svc_endpoint) - - # Form Deckhand Design Reference Path - # This URL will be used to retrieve the Site Design YAMLs - deckhand_path = "deckhand+" + deckhand_svc_endpoint - self.deckhand_design_ref = os.path.join(deckhand_path, - "revisions", - str(self.revision_id), - "rendered-documents") - if self.deckhand_design_ref: - LOG.info("Design YAMLs will be retrieved from %s", - self.deckhand_design_ref) - else: - raise AirflowException("Unable to Retrieve Deckhand Revision " - "%d!" % self.revision_id) - class PromenadeBaseOperatorPlugin(AirflowPlugin): diff --git a/src/bin/shipyard_airflow/shipyard_airflow/plugins/promenade_validate_site_design.py b/src/bin/shipyard_airflow/shipyard_airflow/plugins/promenade_validate_site_design.py index dab852af..b4d5ff01 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/plugins/promenade_validate_site_design.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/plugins/promenade_validate_site_design.py @@ -54,7 +54,7 @@ class PromenadeValidateSiteDesignOperator(PromenadeBaseOperator): payload = { 'rel': "design", - 'href': self.deckhand_design_ref, + 'href': self.design_ref, 'type': "application/x-yaml" } diff --git a/src/bin/shipyard_airflow/shipyard_airflow/plugins/service_endpoint.py b/src/bin/shipyard_airflow/shipyard_airflow/plugins/service_endpoint.py index e16fa146..47d6c159 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/plugins/service_endpoint.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/plugins/service_endpoint.py @@ -11,7 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
- +import configparser import logging import time @@ -22,15 +22,24 @@ try: from service_session import ucp_keystone_session except ImportError: from shipyard_airflow.plugins.service_session import ucp_keystone_session +# Lookup values for configuration to find the real service type for components +SHIPYARD = 'shipyard' +DRYDOCK = 'drydock' +ARMADA = 'armada' +DECKHAND = 'deckhand' +PROMENADE = 'promenade' -def ucp_service_endpoint(self, svc_type): +LOG = logging.getLogger(__name__) + + +def _ucp_service_endpoint(shipyard_conf, svc_type): # Initialize variables retry = 0 int_endpoint = None # Retrieve Keystone Session - sess = ucp_keystone_session(self) + sess = ucp_keystone_session(shipyard_conf) # We will allow 1 retry in getting the Keystone Endpoint with a # backoff interval of 10 seconds in case there is a temporary @@ -58,3 +67,26 @@ raise AirflowException("Unable to get Keystone Endpoint!") else: return int_endpoint + + +class ServiceEndpoints(): + """Class that serves service endpoints""" + def __init__(self, shipyard_conf): + self.shipyard_conf = shipyard_conf + + # Read and parse shipyard.conf + self.config = configparser.ConfigParser() + self.config.read(self.shipyard_conf) + + def endpoint_by_name(self, svc_name): + """Return the service endpoint for the named service. + + :param svc_name: name of the service for which the service type will + be discovered from the shipyard configuration. Constants in this + module provide names that can be used with an expectation that they + work with a standard/complete configuration file. + E.g.: service_endpoint.DRYDOCK + """ + LOG.info("Looking up service endpoint for: %s", svc_name) + svc_type = self.config.get(svc_name, 'service_type') + return _ucp_service_endpoint(self.shipyard_conf, svc_type) diff --git a/src/bin/shipyard_airflow/shipyard_airflow/plugins/service_session.py b/src/bin/shipyard_airflow/shipyard_airflow/plugins/service_session.py index 17f46432..e45f494b 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/plugins/service_session.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/plugins/service_session.py @@ -22,11 +22,11 @@ from keystoneauth1.identity import v3 as keystone_v3 from keystoneauth1 import session as keystone_session -def ucp_keystone_session(self): +def ucp_keystone_session(shipyard_conf): # Read and parse shiyard.conf config = configparser.ConfigParser() - config.read(self.shipyard_conf) + config.read(shipyard_conf) # Initialize variables retry = 0 diff --git a/src/bin/shipyard_airflow/shipyard_airflow/plugins/service_token.py b/src/bin/shipyard_airflow/shipyard_airflow/plugins/service_token.py index 7bdd61bf..16905f20 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/plugins/service_token.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/plugins/service_token.py @@ -47,7 +47,7 @@ def shipyard_service_token(func): retry = 0 # Retrieve Keystone Session - self.svc_session = ucp_keystone_session(self) + self.svc_session = ucp_keystone_session(self.shipyard_conf) # We will allow 1 retry in getting the Keystone Token with a # backoff interval of 10 seconds in case there is a temporary diff --git a/src/bin/shipyard_airflow/shipyard_airflow/plugins/ucp_base_operator.py b/src/bin/shipyard_airflow/shipyard_airflow/plugins/ucp_base_operator.py index bd2176b4..c4685845 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/plugins/ucp_base_operator.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/plugins/ucp_base_operator.py @@ -14,12 +14,19 @@ import configparser import logging import math +import os from
datetime import datetime +from airflow.exceptions import AirflowException from airflow.models import BaseOperator from airflow.plugins_manager import AirflowPlugin from airflow.utils.decorators import apply_defaults +try: + import service_endpoint +except ImportError: + from shipyard_airflow.plugins import service_endpoint + try: from get_k8s_logs import get_pod_logs except ImportError: @@ -35,6 +42,16 @@ try: except ImportError: from shipyard_airflow.plugins.xcom_puller import XcomPuller +from shipyard_airflow.common.document_validators.document_validation_utils \ + import DocumentValidationUtils + +try: + from deckhand_client_factory import DeckhandClientFactory +except ImportError: + from shipyard_airflow.plugins.deckhand_client_factory import ( + DeckhandClientFactory + ) + LOG = logging.getLogger(__name__) @@ -88,6 +105,8 @@ class UcpBaseOperator(BaseOperator): self.start_time = datetime.now() self.sub_dag_name = sub_dag_name self.xcom_push_flag = xcom_push + self.doc_utils = _get_document_util(self.shipyard_conf) + self.endpoints = service_endpoint.ServiceEndpoints(self.shipyard_conf) def execute(self, context): @@ -120,6 +139,7 @@ class UcpBaseOperator(BaseOperator): self.action_info = self.xcom_puller.get_action_info() self.dc = self.xcom_puller.get_deployment_configuration() self.revision_id = self.action_info['committed_rev_id'] + self.design_ref = self._deckhand_design_ref() def get_k8s_logs(self): """Retrieve Kubernetes pod/container logs specified by an operator @@ -150,10 +170,64 @@ class UcpBaseOperator(BaseOperator): else: LOG.debug("There are no pod logs specified to retrieve") + def _deckhand_design_ref(self): + """Assemble a deckhand design_ref""" + # Retrieve DeckHand Endpoint Information + LOG.info("Assembling a design ref using revision: %s", + self.revision_id) + deckhand_svc_endpoint = self.endpoints.endpoint_by_name( + service_endpoint.DECKHAND + ) + # This URL will be used to retrieve the Site Design YAMLs + deckhand_path = "deckhand+{}".format(deckhand_svc_endpoint) + design_ref = os.path.join(deckhand_path, + "revisions", + str(self.revision_id), + "rendered-documents") + LOG.info("Design Reference is %s", design_ref) + return design_ref + + def get_unique_doc(self, schema, name, revision_id=None): + """Retrieve a specific document from Deckhand + + :param schema: the schema of the document + :param name: the metadata.name of the document + :param revision_id: the deckhand revision; defaults to + self.revision_id + Wraps the document_validation_utils call of the same name. + Returns the specified document or raises an Airflow exception. + """ + if revision_id is None: + revision_id = self.revision_id + + LOG.info( + "Retrieving document %s with schema %s from Deckhand", + name, schema + ) + try: + return self.doc_utils.get_unique_doc(revision_id=revision_id, + name=name, + schema=schema) + except Exception as ex: + LOG.error("A document was expected to be available: Name: %s, " + "Schema: %s, Deckhand revision: %s, but there was an " + "error attempting to retrieve it. Since this document's " + "contents may be critical to the proper operation of " + "the workflow, this is fatal.", name, schema, + revision_id) + LOG.exception(ex) + # if the document is not found for ANY reason, the workflow is + # broken. Raise an Airflow Exception.
+ raise AirflowException(ex) + + +def _get_document_util(shipyard_conf): + """Retrieve an instance of the DocumentValidationUtils""" + dh_client = DeckhandClientFactory(shipyard_conf).get_client() + return DocumentValidationUtils(dh_client) + class UcpBaseOperatorPlugin(AirflowPlugin): - """Creates UcpBaseOperator in Airflow.""" - name = 'ucp_base_operator_plugin' operators = [UcpBaseOperator] diff --git a/src/bin/shipyard_airflow/shipyard_airflow/plugins/ucp_preflight_check_operator.py b/src/bin/shipyard_airflow/shipyard_airflow/plugins/ucp_preflight_check_operator.py index 96db1fde..256d091c 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/plugins/ucp_preflight_check_operator.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/plugins/ucp_preflight_check_operator.py @@ -21,9 +21,9 @@ from airflow.plugins_manager import AirflowPlugin from airflow.utils.decorators import apply_defaults try: - from service_endpoint import ucp_service_endpoint + import service_endpoint except ImportError: - from shipyard_airflow.plugins.service_endpoint import ucp_service_endpoint + from shipyard_airflow.plugins import service_endpoint try: from xcom_puller import XcomPuller @@ -55,16 +55,18 @@ class UcpHealthCheckOperator(BaseOperator): self.shipyard_conf = shipyard_conf self.main_dag_name = main_dag_name self.xcom_push_flag = xcom_push + self.endpoints = service_endpoint.ServiceEndpoints(self.shipyard_conf) def execute(self, context): # Initialize variable ucp_components = [ - 'armada', - 'deckhand', - 'kubernetesprovisioner', - 'physicalprovisioner', - 'shipyard'] + service_endpoint.ARMADA, + service_endpoint.DECKHAND, + service_endpoint.DRYDOCK, + service_endpoint.PROMENADE, + service_endpoint.SHIPYARD + ] # Define task_instance self.task_instance = context['task_instance'] @@ -80,19 +82,16 @@ class UcpHealthCheckOperator(BaseOperator): for component in ucp_components: # Retrieve Endpoint Information - service_endpoint = ucp_service_endpoint(self, - svc_type=component) - LOG.info("%s endpoint is %s", component, service_endpoint) + endpoint = self.endpoints.endpoint_by_name(component) + LOG.info("%s endpoint is %s", component, endpoint) # Construct Health Check Endpoint - healthcheck_endpoint = os.path.join(service_endpoint, + healthcheck_endpoint = os.path.join(endpoint, 'health') - LOG.info("%s healthcheck endpoint is %s", component, - healthcheck_endpoint) - try: - LOG.info("Performing Health Check on %s", component) + LOG.info("Performing Health Check on %s at %s", component, + healthcheck_endpoint) # Set health check timeout to 30 seconds req = requests.get(healthcheck_endpoint, timeout=30) @@ -109,7 +108,7 @@ class UcpHealthCheckOperator(BaseOperator): """ # If Drydock health check fails and continue-on-fail, continue # and create xcom key 'drydock_continue_on_fail' - if (component == 'physicalprovisioner' and + if (component == service_endpoint.DRYDOCK and self.action_info['parameters'].get( 'continue-on-fail', 'false').lower() == 'true' and self.action_info['dag_id'] in ['update_site', 'deploy_site']): diff --git a/src/bin/shipyard_airflow/tests/unit/common/deployment_group/test_deployment_group_manager.py b/src/bin/shipyard_airflow/tests/unit/common/deployment_group/test_deployment_group_manager.py index 3d681549..4c459b68 100644 --- a/src/bin/shipyard_airflow/tests/unit/common/deployment_group/test_deployment_group_manager.py +++ b/src/bin/shipyard_airflow/tests/unit/common/deployment_group/test_deployment_group_manager.py @@ -31,7 +31,7 @@ from 
shipyard_airflow.common.deployment_group.errors import ( from .node_lookup_stubs import node_lookup -_GROUPS_YAML = """ +GROUPS_YAML = """ - name: control-nodes critical: true depends_on: @@ -121,7 +121,7 @@ _GROUPS_YAML = """ minimum_successful_nodes: 1 """ -_CYCLE_GROUPS_YAML = """ +CYCLE_GROUPS_YAML = """ - name: group-a critical: true depends_on: @@ -148,7 +148,7 @@ _CYCLE_GROUPS_YAML = """ class TestDeploymentGroupManager: def test_basic_class(self): - dgm = DeploymentGroupManager(yaml.safe_load(_GROUPS_YAML), node_lookup) + dgm = DeploymentGroupManager(yaml.safe_load(GROUPS_YAML), node_lookup) assert dgm is not None # topological sort doesn't guarantee a specific order. assert dgm.get_next_group(Stage.PREPARED).name in ['ntp-node', @@ -160,7 +160,7 @@ class TestDeploymentGroupManager: def test_cycle_error(self): with pytest.raises(DeploymentGroupCycleError) as ce: - DeploymentGroupManager(yaml.safe_load(_CYCLE_GROUPS_YAML), + DeploymentGroupManager(yaml.safe_load(CYCLE_GROUPS_YAML), node_lookup) assert 'The following are involved' in str(ce) for g in ['group-a', 'group-c', 'group-d']: @@ -168,11 +168,71 @@ class TestDeploymentGroupManager: assert 'group-b' not in str(ce) def test_no_next_group(self): - dgm = DeploymentGroupManager(yaml.safe_load(_GROUPS_YAML), node_lookup) + dgm = DeploymentGroupManager(yaml.safe_load(GROUPS_YAML), node_lookup) assert dgm.get_next_group(Stage.DEPLOYED) is None + def test_group_list(self): + dgm = DeploymentGroupManager(yaml.safe_load(GROUPS_YAML), node_lookup) + assert len(dgm.group_list()) == 7 + group_names = [] + for group in dgm.group_list(): + group_names.append(group.name) + assert group_names == dgm._group_order + + def test_fail_unsuccessful_nodes(self): + dgm = DeploymentGroupManager(yaml.safe_load(GROUPS_YAML), node_lookup) + group = dgm._all_groups.get('control-nodes') + dgm.fail_unsuccessful_nodes(group, []) + assert not dgm.evaluate_group_succ_criteria('control-nodes', + Stage.DEPLOYED) + assert group.stage == Stage.FAILED + + def test_reports(self, caplog): + dgm = DeploymentGroupManager(yaml.safe_load(GROUPS_YAML), node_lookup) + dgm.mark_node_deployed('node1') + dgm.mark_node_prepared('node2') + dgm.mark_node_failed('node3') + dgm.mark_group_prepared('control-nodes') + dgm.mark_group_deployed('control-nodes') + dgm.mark_group_prepared('compute-nodes-1') + dgm.mark_group_failed('compute-nodes-2') + dgm.report_group_summary() + assert "===== Group Summary =====" in caplog.text + assert ("Group ntp-node [Critical] ended with stage: " + "Stage.NOT_STARTED") in caplog.text + caplog.clear() + dgm.report_node_summary() + assert "Nodes Stage.PREPARED: node2" in caplog.text + assert "Nodes Stage.FAILED: node3" in caplog.text + assert "===== End Node Summary =====" in caplog.text + assert "It was the best of times" not in caplog.text + + def test_evaluate_group_succ_criteria(self): + dgm = DeploymentGroupManager(yaml.safe_load(GROUPS_YAML), node_lookup) + group = dgm._all_groups.get('control-nodes') + + nodes = ["node{}".format(i) for i in range(1, 12)] + for node in nodes: + dgm.mark_node_prepared(node) + dgm.fail_unsuccessful_nodes(group, nodes) + assert dgm.evaluate_group_succ_criteria('control-nodes', + Stage.PREPARED) + assert group.stage == Stage.PREPARED + + for node in nodes: + dgm.mark_node_deployed(node) + assert dgm.evaluate_group_succ_criteria('control-nodes', + Stage.DEPLOYED) + assert group.stage == Stage.DEPLOYED + + def test_critical_groups_failed(self): + dgm = DeploymentGroupManager(yaml.safe_load(GROUPS_YAML), node_lookup) + 
assert not dgm.critical_groups_failed() + dgm.mark_group_failed('control-nodes') + assert dgm.critical_groups_failed() + def test_ordering_stages_flow_failure(self): - dgm = DeploymentGroupManager(yaml.safe_load(_GROUPS_YAML), node_lookup) + dgm = DeploymentGroupManager(yaml.safe_load(GROUPS_YAML), node_lookup) group = dgm.get_next_group(Stage.PREPARED) if group.name == 'monitoring-nodes': @@ -198,24 +258,24 @@ class TestDeploymentGroupManager: def test_deduplication(self): """all-compute-nodes is a duplicate of things it's dependent on, it should have no actionable nodes""" - dgm = DeploymentGroupManager(yaml.safe_load(_GROUPS_YAML), node_lookup) + dgm = DeploymentGroupManager(yaml.safe_load(GROUPS_YAML), node_lookup) acn = dgm._all_groups['all-compute-nodes'] assert len(acn.actionable_nodes) == 0 assert len(acn.full_nodes) == 6 def test_bad_group_name_lookup(self): - dgm = DeploymentGroupManager(yaml.safe_load(_GROUPS_YAML), node_lookup) + dgm = DeploymentGroupManager(yaml.safe_load(GROUPS_YAML), node_lookup) with pytest.raises(UnknownDeploymentGroupError) as udge: dgm.mark_group_prepared('Limburger Cheese') assert "Group name Limburger Cheese does not refer" in str(udge) def test_get_group_failures_for_stage_bad_input(self): - dgm = DeploymentGroupManager(yaml.safe_load(_GROUPS_YAML), node_lookup) + dgm = DeploymentGroupManager(yaml.safe_load(GROUPS_YAML), node_lookup) with pytest.raises(DeploymentGroupStageError): dgm.get_group_failures_for_stage('group1', Stage.FAILED) def test_get_group_failures_for_stage(self): - dgm = DeploymentGroupManager(yaml.safe_load(_GROUPS_YAML), node_lookup) + dgm = DeploymentGroupManager(yaml.safe_load(GROUPS_YAML), node_lookup) dgm._all_nodes = {'node%d' % x: Stage.DEPLOYED for x in range(1, 13)} for group_name in dgm._all_groups: @@ -269,27 +329,27 @@ class TestDeploymentGroupManager: 'actual': 0} def test_mark_node_deployed(self): - dgm = DeploymentGroupManager(yaml.safe_load(_GROUPS_YAML), node_lookup) + dgm = DeploymentGroupManager(yaml.safe_load(GROUPS_YAML), node_lookup) dgm.mark_node_deployed('node1') assert dgm.get_nodes(Stage.DEPLOYED) == ['node1'] def test_mark_node_prepared(self): - dgm = DeploymentGroupManager(yaml.safe_load(_GROUPS_YAML), node_lookup) + dgm = DeploymentGroupManager(yaml.safe_load(GROUPS_YAML), node_lookup) dgm.mark_node_prepared('node1') assert dgm.get_nodes(Stage.PREPARED) == ['node1'] def test_mark_node_failed(self): - dgm = DeploymentGroupManager(yaml.safe_load(_GROUPS_YAML), node_lookup) + dgm = DeploymentGroupManager(yaml.safe_load(GROUPS_YAML), node_lookup) dgm.mark_node_failed('node1') assert dgm.get_nodes(Stage.FAILED) == ['node1'] def test_mark_node_failed_unknown(self): - dgm = DeploymentGroupManager(yaml.safe_load(_GROUPS_YAML), node_lookup) + dgm = DeploymentGroupManager(yaml.safe_load(GROUPS_YAML), node_lookup) with pytest.raises(UnknownNodeError): dgm.mark_node_failed('not_node') def test_get_nodes_all(self): - dgm = DeploymentGroupManager(yaml.safe_load(_GROUPS_YAML), node_lookup) + dgm = DeploymentGroupManager(yaml.safe_load(GROUPS_YAML), node_lookup) assert set(dgm.get_nodes()) == set( ['node1', 'node2', 'node3', 'node4', 'node5', 'node6', 'node7', 'node8', 'node9', 'node10', 'node11', 'node12'] diff --git a/src/bin/shipyard_airflow/tests/unit/control/test_action_validators.py b/src/bin/shipyard_airflow/tests/unit/control/test_action_validators.py index 7b96a899..3196185d 100644 --- a/src/bin/shipyard_airflow/tests/unit/control/test_action_validators.py +++ 
b/src/bin/shipyard_airflow/tests/unit/control/test_action_validators.py @@ -29,9 +29,9 @@ import tests.unit.common.deployment_group.test_deployment_group_manager as tdgm def get_doc_returner(style, ds_name): strategy = MagicMock() if style == 'cycle': - strategy.data = {"groups": yaml.safe_load(tdgm._CYCLE_GROUPS_YAML)} + strategy.data = {"groups": yaml.safe_load(tdgm.CYCLE_GROUPS_YAML)} elif style == 'clean': - strategy.data = {"groups": yaml.safe_load(tdgm._GROUPS_YAML)} + strategy.data = {"groups": yaml.safe_load(tdgm.GROUPS_YAML)} def doc_returner(revision_id, rendered, **filters): if not revision_id == 99: diff --git a/src/bin/shipyard_airflow/tests/unit/plugins/test_drydock_nodes_operator.py b/src/bin/shipyard_airflow/tests/unit/plugins/test_drydock_nodes_operator.py new file mode 100644 index 00000000..bd9af71c --- /dev/null +++ b/src/bin/shipyard_airflow/tests/unit/plugins/test_drydock_nodes_operator.py @@ -0,0 +1,446 @@ +# Copyright 2018 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests for drydock_nodes operator functions""" +import copy +import mock +import os +import yaml + +import pytest + +from airflow.exceptions import AirflowException + +from shipyard_airflow.common.deployment_group.deployment_group import ( + DeploymentGroup, + Stage +) + +from shipyard_airflow.common.deployment_group.deployment_group_manager import ( + DeploymentGroupManager +) + +from shipyard_airflow.plugins.drydock_nodes import ( + _default_deployment_strategy, + _gen_node_name_filter, + DrydockNodesOperator, + _process_deployment_groups, + QueryTaskResult +) + +from shipyard_airflow.plugins.deployment_configuration_operator import ( + DeploymentConfigurationOperator +) + +import tests.unit.common.deployment_group.test_deployment_group_manager as tdgm +from tests.unit.common.deployment_group.node_lookup_stubs import node_lookup + +CONF_FILE = os.path.join(os.path.dirname(__file__), 'test.conf') + + +def _fake_deployment_group_manager(cgf_bool): + + def dgm_func(group_dict_list, node_lookup): + dgm_mock = mock.MagicMock() + dgm_mock.critical_groups_failed = mock.Mock(return_value=cgf_bool) + return dgm_mock + return dgm_func(None, None) + + +GROUP_DICT = { + 'name': 'control-nodes', + 'critical': True, + 'depends_on': ['ntp-node'], + 'selectors': [ + { + 'node_names': ['node1', 'node2', 'node3', 'node4', 'node5'], + 'node_labels': [], + 'node_tags': [], + 'rack_names': [], + }, + ], + 'success_criteria': { + 'percent_successful_nodes': 90, + 'minimum_successful_nodes': 3, + 'maximum_failed_nodes': 1, + }, +} + +TASK_RESULT = QueryTaskResult('t1', 'tn') +TASK_RESULT.successes = ['node1', 'node2', 'node3'] + +# The top level result should have all successes specified +TASK_DICT = { + '0': { + 'result': { + 'successes': ['node1', 'node2', 'node3'], + 'status': 'success', + }, + 'subtask_id_list': ['1'], + 'status': 'complete' + }, + '1': { + 'result': { + 'successes': ['node3'], + 'status': 'success', + }, + 'subtask_id_list': ['2', '3'], + 
'status': 'complete' + }, + '2': { + 'result': { + 'successes': ['node2'], + 'status': 'success', + 'details': {'messageList': [ + { + 'context': 'node2', + 'context_type': 'node', + 'error': False, + 'extra': '{}', + 'message': 'Warning node2 is slow', + 'ts': '2018-06-14 22:41:08.195036' + }, + { + 'context': 'node2', + 'context_type': 'node', + }, + ]}, + }, + 'subtask_id_list': [], + 'status': 'complete', + }, + '3': { + 'result': { + 'status': 'success', + 'details': {'messageList': [ + { + 'context': 'task 3', + 'context_type': 'task', + 'error': False, + 'extra': '{}', + 'message': 'Started subtask 3 for action apply_node_stuff', + 'ts': '2018-06-14 22:41:08.195036' + }, + { + 'context': 'task 3', + 'context_type': 'task', + 'error': False, + 'extra': '{}', + 'message': 'Ended subtask 3 for action apply_node_stuff', + 'ts': '2018-06-14 22:41:08.195036' + }, + ]}, + }, + 'subtask_id_list': [], + 'status': 'complete', + }, + '99': { + 'result': { + 'status': 'failure', + 'successes': ['node98', 'node97'], + 'failures': ['node99'], + 'details': {'messageList': [ + { + 'context': 'task 99', + 'context_type': 'task', + 'error': False, + 'extra': '{}', + 'message': 'Started subtask 99 for action do_things', + 'ts': '2018-06-14 22:41:08.195036' + }, + { + 'context': 'task 99', + 'context_type': 'task', + 'error': True, + 'extra': '{}', + 'message': 'Task 99 broke things', + 'ts': '2018-06-14 22:41:08.195036' + }, + ]}, + }, + 'subtask_id_list': ['2'], + }, +} + +DEP_STRAT = {'groups': yaml.safe_load(tdgm.GROUPS_YAML)} + + +def _fake_setup_ds(self): + self.strategy = DEP_STRAT + + +def _fake_get_task_dict(task_id): + return TASK_DICT[task_id] + + +def _gen_pe_func(mode, stand_alone=False): + """Gen a function to play the role of prepare or deploy function + + :param mode: 'all-success', 'all-fail' + :param stand_alone: indicate to make this a "self" or non-self + function. During mocking for direct calls with this function, + stand_alone needs to be True. When patching the DrydockNodesOperator + object, it needs to be false, so that the right amount of "self" + matches the invocation. 
+ """ + def _func(group): + qtr = QueryTaskResult('ti', 'tn') + if mode == 'all-success': + qtr.successes.extend(group.actionable_nodes) + if mode == 'all-fail': + # no new sucesses + pass + return qtr + + def _func_self(self, group): + return _func(group) + + if stand_alone: + return _func + else: + return _func_self + + +class TestDrydockNodesOperator: + def test_default_deployment_strategy(self): + """Assert that the default deployment strategy is named default, is + critical, has no selector values, and an all-or-nothing success + criteria + """ + s = _default_deployment_strategy() + assert s['groups'][0]['name'] == 'default' + assert s['groups'][0]['critical'] + assert s['groups'][0]['selectors'][0]['node_names'] == [] + assert s['groups'][0]['selectors'][0]['node_labels'] == [] + assert s['groups'][0]['selectors'][0]['node_tags'] == [] + assert s['groups'][0]['selectors'][0]['rack_names'] == [] + assert s['groups'][0]['success_criteria'] == { + 'percent_successful_nodes': 100 + } + + def test_gen_node_name_filter(self): + """Test that a node name filter with only node_names is created""" + nodes = ['node1', 'node2'] + f = _gen_node_name_filter(nodes) + assert f['filter_set'][0]['node_names'] == nodes + assert len(f['filter_set']) == 1 + + def test_init_DrydockNodesOperator(self): + op = DrydockNodesOperator(main_dag_name="main", + shipyard_conf=CONF_FILE, + task_id="t1") + assert op is not None + + @mock.patch.object(DrydockNodesOperator, "get_unique_doc") + def test_setup_deployment_strategy(self, udoc): + """Assert that the base class method get_unique_doc would be invoked + """ + op = DrydockNodesOperator(main_dag_name="main", + shipyard_conf=CONF_FILE, + task_id="t1") + op.dc = copy.deepcopy( + DeploymentConfigurationOperator.config_keys_defaults + ) + op.dc['physical_provisioner.deployment_strategy'] = 'taco-salad' + op._setup_deployment_strategy() + udoc.assert_called_once_with( + name='taco-salad', + schema="shipyard/DeploymentStrategy/v1" + ) + + @mock.patch("shipyard_airflow.plugins.drydock_nodes." + "_get_deployment_group_manager", + return_value=_fake_deployment_group_manager(cgf_bool=False)) + @mock.patch("shipyard_airflow.plugins.drydock_nodes." + "_process_deployment_groups", return_value=True) + @mock.patch("shipyard_airflow.plugins.drydock_nodes._get_node_lookup", + return_value=mock.MagicMock()) + def test_do_execute(self, nl, pdg, get_dgm, caplog): + op = DrydockNodesOperator(main_dag_name="main", + shipyard_conf=CONF_FILE, + task_id="t1") + op.dc = copy.deepcopy( + DeploymentConfigurationOperator.config_keys_defaults + ) + op.design_ref = {} + op.do_execute() + get_dgm.assert_called_once() + nl.assert_called_once() + pdg.assert_called_once() + assert "critical groups have met their success criteria" in caplog.text + + @mock.patch("shipyard_airflow.plugins.drydock_nodes." + "_get_deployment_group_manager", + return_value=_fake_deployment_group_manager(cgf_bool=True)) + @mock.patch("shipyard_airflow.plugins.drydock_nodes." 
+ "_process_deployment_groups", return_value=True) + @mock.patch("shipyard_airflow.plugins.drydock_nodes._get_node_lookup", + return_value=mock.MagicMock()) + def test_do_execute_exception(self, nl, pdg, get_dgm): + op = DrydockNodesOperator(main_dag_name="main", + shipyard_conf=CONF_FILE, + task_id="t1") + with pytest.raises(AirflowException): + op.dc = copy.deepcopy( + DeploymentConfigurationOperator.config_keys_defaults + ) + op.design_ref = {} + op.do_execute() + + get_dgm.assert_called_once() + nl.assert_called_once() + pdg.assert_called_once() + + def test_execute_prepare(self): + op = DrydockNodesOperator(main_dag_name="main", + shipyard_conf=CONF_FILE, + task_id="t1") + op.dc = copy.deepcopy( + DeploymentConfigurationOperator.config_keys_defaults + ) + op._setup_configured_values() + op._execute_task = mock.MagicMock(return_value=TASK_RESULT) + group = DeploymentGroup(GROUP_DICT, mock.MagicMock()) + group.actionable_nodes = ['node1', 'node2', 'node3'] + op._execute_prepare(group) + op._execute_task.assert_called_once() + + @mock.patch("shipyard_airflow.plugins.check_k8s_node_status." + "check_node_status", return_value=[]) + def test_execute_deployment(self, cns): + op = DrydockNodesOperator(main_dag_name="main", + shipyard_conf=CONF_FILE, + task_id="t1") + op.dc = copy.deepcopy( + DeploymentConfigurationOperator.config_keys_defaults + ) + op._setup_configured_values() + op._execute_task = mock.MagicMock(return_value=TASK_RESULT) + op.join_wait = 0 + group = DeploymentGroup(GROUP_DICT, mock.MagicMock()) + group.actionable_nodes = ['node1', 'node2', 'node3'] + op._execute_deployment(group) + op._execute_task.assert_called_once() + cns.assert_called_once() + + @mock.patch("shipyard_airflow.plugins.check_k8s_node_status." + "check_node_status", return_value=['node2', 'node4']) + def test_execute_deployment_k8s_fail(self, cns, caplog): + op = DrydockNodesOperator(main_dag_name="main", + shipyard_conf=CONF_FILE, + task_id="t1") + op.dc = copy.deepcopy( + DeploymentConfigurationOperator.config_keys_defaults + ) + op._setup_configured_values() + op._execute_task = mock.MagicMock(return_value=TASK_RESULT) + op.join_wait = 0 + group = DeploymentGroup(GROUP_DICT, mock.MagicMock()) + group.actionable_nodes = ['node1', 'node2', 'node3'] + task_res = op._execute_deployment(group) + op._execute_task.assert_called_once() + cns.assert_called_once() + assert 'node4 failed to join Kubernetes' in caplog.text + assert len(task_res.successes) == 2 + + def test_get_successess_for_task(self): + op = DrydockNodesOperator(main_dag_name="main", + shipyard_conf=CONF_FILE, + task_id="t1") + op.get_task_dict = _fake_get_task_dict + s = op._get_successes_for_task('0') + for i in range(1, 3): + assert "node{}".format(i) in s + + def test_get_successess_for_task_more_logging(self): + op = DrydockNodesOperator(main_dag_name="main", + shipyard_conf=CONF_FILE, + task_id="t1") + op.get_task_dict = _fake_get_task_dict + s = op._get_successes_for_task('99') + for i in range(97, 98): + assert "node{}".format(i) in s + assert "node2" not in s + + def test_process_deployment_groups(self): + """Test the core processing loop of the drydock_nodes module""" + dgm = DeploymentGroupManager( + yaml.safe_load(tdgm.GROUPS_YAML), + node_lookup + ) + _process_deployment_groups( + dgm, + _gen_pe_func('all-success', stand_alone=True), + _gen_pe_func('all-success', stand_alone=True)) + assert not dgm.critical_groups_failed() + for group in dgm.group_list(): + assert dgm.evaluate_group_succ_criteria(group.name, Stage.DEPLOYED) + + def 
test_process_deployment_groups_dep_fail(self): + """Test the core processing loop of the drydock_nodes module""" + dgm = DeploymentGroupManager( + yaml.safe_load(tdgm.GROUPS_YAML), + node_lookup + ) + _process_deployment_groups( + dgm, + _gen_pe_func('all-success', stand_alone=True), + _gen_pe_func('all-fail', stand_alone=True)) + assert dgm.critical_groups_failed() + for group in dgm.group_list(): + assert group.stage == Stage.FAILED + dgm.report_group_summary() + dgm.report_node_summary() + + def test_process_deployment_groups_prep_fail(self): + """Test the core processing loop of the drydock_nodes module""" + dgm = DeploymentGroupManager( + yaml.safe_load(tdgm.GROUPS_YAML), + node_lookup + ) + _process_deployment_groups( + dgm, + _gen_pe_func('all-fail', stand_alone=True), + _gen_pe_func('all-success', stand_alone=True)) + assert dgm.critical_groups_failed() + for group in dgm.group_list(): + assert group.stage == Stage.FAILED + dgm.report_group_summary() + dgm.report_node_summary() + + @mock.patch("shipyard_airflow.plugins.drydock_nodes._get_node_lookup", + return_value=node_lookup) + @mock.patch.object( + DrydockNodesOperator, + '_execute_prepare', + new=_gen_pe_func('all-success') + ) + @mock.patch.object( + DrydockNodesOperator, + '_execute_deployment', + new=_gen_pe_func('all-success') + ) + @mock.patch.object(DrydockNodesOperator, '_setup_deployment_strategy', + new=_fake_setup_ds) + def test_do_execute_with_dgm(self, nl, caplog): + op = DrydockNodesOperator(main_dag_name="main", + shipyard_conf=CONF_FILE, + task_id="t1") + op.dc = copy.deepcopy( + DeploymentConfigurationOperator.config_keys_defaults + ) + op.design_ref = {"a": "b"} + op.do_execute() + assert "critical groups have met their success criteria" in caplog.text + + # TODO (bryan-strassner) test for _execute_task diff --git a/src/bin/shipyard_airflow/tests/unit/plugins/test_ucp_preflight_check_operator.py b/src/bin/shipyard_airflow/tests/unit/plugins/test_ucp_preflight_check_operator.py index 39daaccc..ffef9f69 100644 --- a/src/bin/shipyard_airflow/tests/unit/plugins/test_ucp_preflight_check_operator.py +++ b/src/bin/shipyard_airflow/tests/unit/plugins/test_ucp_preflight_check_operator.py @@ -11,7 +11,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
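The three _process_deployment_groups tests above walk the same loop with different prepare/deploy outcomes. A condensed sketch of the happy path they assert, reusing the test module's own stubs (illustrative only; it mirrors test_process_deployment_groups rather than documenting new behavior):

    # Drive the group-processing loop with prepare/deploy callables that
    # report every actionable node as successful, then check the rollup.
    import yaml

    from shipyard_airflow.common.deployment_group.deployment_group import Stage
    from shipyard_airflow.common.deployment_group.deployment_group_manager import (
        DeploymentGroupManager
    )
    from shipyard_airflow.plugins.drydock_nodes import (
        _process_deployment_groups,
        QueryTaskResult
    )
    from tests.unit.common.deployment_group.node_lookup_stubs import node_lookup
    import tests.unit.common.deployment_group.test_deployment_group_manager as tdgm

    def succeed_all(group):
        # Stand-in prepare/deploy function: every actionable node succeeds.
        qtr = QueryTaskResult('task-id', 'task-name')
        qtr.successes.extend(group.actionable_nodes)
        return qtr

    dgm = DeploymentGroupManager(yaml.safe_load(tdgm.GROUPS_YAML), node_lookup)
    _process_deployment_groups(dgm, succeed_all, succeed_all)

    assert not dgm.critical_groups_failed()
    for group in dgm.group_list():
        assert dgm.evaluate_group_succ_criteria(group.name, Stage.DEPLOYED)
    dgm.report_group_summary()
    dgm.report_node_summary()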
-import mock +import os +import unittest.mock as mock + import pytest from requests.models import Response @@ -22,10 +24,12 @@ from shipyard_airflow.plugins.ucp_preflight_check_operator import ( ucp_components = [ 'armada', 'deckhand', - 'kubernetesprovisioner', - 'physicalprovisioner', + 'promenade', + 'drydock', 'shipyard'] +CONF_FILE = os.path.join(os.path.dirname(__file__), 'test.conf') + def test_drydock_health_skip_update_site(caplog): """ @@ -44,18 +48,18 @@ def test_drydock_health_skip_update_site(caplog): "parameters": {"continue-on-fail": "true"} } - op = UcpHealthCheckOperator(task_id='test') + op = UcpHealthCheckOperator(task_id='test', shipyard_conf=CONF_FILE) op.action_info = action_info op.xcom_pusher = mock.MagicMock() - op.log_health_exception('physicalprovisioner', req) + op.log_health_exception('drydock', req) assert expected_log in caplog.text action_info = { "dag_id": "deploy_site", "parameters": {"continue-on-fail": "true"} } - op.log_health_exception('physicalprovisioner', req) + op.log_health_exception('drydock', req) assert expected_log in caplog.text @@ -70,7 +74,7 @@ def test_failure_log_health(): req = Response() req.status_code = None - op = UcpHealthCheckOperator(task_id='test') + op = UcpHealthCheckOperator(task_id='test', shipyard_conf=CONF_FILE) op.action_info = action_info op.xcom_pusher = mock.MagicMock() diff --git a/tools/run_shipyard.sh b/tools/run_shipyard.sh index f5e3d02b..e2edb918 100755 --- a/tools/run_shipyard.sh +++ b/tools/run_shipyard.sh @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -set -ex +set -e # User can run the script like how they would execute the Shipyard CLI. # For instance, to run the 'shipyard get actions' command, user can execute @@ -33,7 +33,7 @@ set -ex # Source Base Docker Command DIR="$(realpath $(dirname "${BASH_SOURCE}"))" source "${DIR}/shipyard_docker_base_command.sh" - +SHIPYARD_HOSTPATH=${SHIPYARD_HOSTPATH:-"/home/shipyard/host"} # Execute Shipyard CLI # # NOTE: We will mount the current directory so that any directories @@ -46,4 +46,4 @@ source "${DIR}/shipyard_docker_base_command.sh" # the actual validation and execution. Exceptions will also be # handled by the Shipyard CLI as this is meant to be a thin wrapper # script -${base_docker_command} -v $(pwd):/home/shipyard/host ${SHIPYARD_IMAGE} $@ +${base_docker_command} -v $(pwd):${SHIPYARD_HOSTPATH} ${SHIPYARD_IMAGE} $@
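A closing usage note on the run_shipyard.sh change: the in-container mount point for the caller's working directory is now configurable through SHIPYARD_HOSTPATH rather than hard-coded. A hypothetical invocation; /target is an arbitrary example, and 'get actions' is the sample subcommand from the script's own comments:

    # Mount the current directory at a custom path inside the container,
    # then pass CLI arguments through the thin wrapper as before.
    export SHIPYARD_HOSTPATH=/target
    ./tools/run_shipyard.sh get actions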