Workflow to support deployment groups

Updates the Shipyard/Airflow workflow for deploy_site and
update_site to use the deployment group/deployment strategy
information from the design.

This allows baremetal nodes to be deployed in a design-specified
order, with criticality and success criteria driving the overall
success or failure of the deployment.

Includes a refactoring of service endpoints to reduce the amount
of data passed between operators.

Change-Id: Ib5e9fca535ca74d1819fe46959695acfed5b65c2
Bryan Strassner 2018-05-10 17:15:59 -05:00
parent ea47f2c77b
commit 04906cce68
29 changed files with 1391 additions and 345 deletions
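
For context, a deployment strategy document defines groups of nodes, their
dependency ordering, criticality, and success criteria. A minimal sketch of
that structure as a Python dict (group names, tags, and criteria values are
illustrative; the shape mirrors _default_deployment_strategy and the test
YAML later in this change):

# Illustrative only: compute nodes deploy after control nodes succeed.
deployment_strategy = {
    'groups': [
        {
            'name': 'control-nodes',        # hypothetical group name
            'critical': True,               # failure fails the whole action
            'depends_on': [],
            'selectors': [
                {'node_names': [],
                 'node_labels': [],
                 'node_tags': ['control'],  # assumed tag
                 'rack_names': []},
            ],
            'success_criteria': {'percent_successful_nodes': 100},
        },
        {
            'name': 'compute-nodes',        # hypothetical group name
            'critical': False,
            'depends_on': ['control-nodes'],  # ordering constraint
            'selectors': [
                {'node_names': [],
                 'node_labels': [],
                 'node_tags': ['compute'],  # assumed tag
                 'rack_names': []},
            ],
            'success_criteria': {'minimum_successful_nodes': 1},
        },
    ]
}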

.gitignore vendored
View File

@ -2,7 +2,7 @@
__pycache__/
*.py[cod]
*$py.class
**/.pytest_cache/
.pytest_cache/
# C extensions

View File

@ -79,9 +79,9 @@ run:
.PHONY: build_airflow
build_airflow:
ifeq ($(USE_PROXY), true)
docker build --network host -t $(IMAGE) --label $(LABEL) -f $(IMAGE_DIR)/Dockerfile $(IMAGE_DIR) --build-arg http_proxy=$(PROXY) --build-arg https_proxy=$(PROXY)
docker build --network host -t $(IMAGE) --label $(LABEL) -f $(IMAGE_DIR)/Dockerfile --build-arg http_proxy=$(PROXY) --build-arg https_proxy=$(PROXY) --build-arg ctx_base=$(BUILD_CTX) .
else
docker build --network host -t $(IMAGE) --label $(LABEL) -f $(IMAGE_DIR)/Dockerfile $(IMAGE_DIR)
docker build --network host -t $(IMAGE) --label $(LABEL) -f $(IMAGE_DIR)/Dockerfile --build-arg ctx_base=$(BUILD_CTX) .
endif
ifeq ($(PUSH_IMAGE), true)
docker push $(IMAGE)

View File

@ -32,6 +32,7 @@ EXPOSE $WORKER_PORT
# Set ARG for usage during build
ARG AIRFLOW_HOME=/usr/local/airflow
ARG DEBIAN_FRONTEND=noninteractive
ARG ctx_base=src/bin
# Kubectl version
ARG KUBECTL_VERSION=1.8.6
@ -76,17 +77,30 @@ RUN useradd -ms /bin/bash -d ${AIRFLOW_HOME} airflow \
# Dependency requirements
# Note - removing snakebite (python 2 vs. 3). See:
# https://github.com/puckel/docker-airflow/issues/77
COPY ./requirements.txt /tmp/
COPY images/airflow/requirements.txt /tmp/
RUN pip3 install -r /tmp/requirements.txt \
&& pip3 uninstall -y snakebite || true
# Copy scripts used in the container:
# entrypoint.sh, airflow_start_service.sh and airflow_logrotate.sh
COPY script/*.sh ${AIRFLOW_HOME}/
COPY images/airflow/script/*.sh ${AIRFLOW_HOME}/
# Change permissions
RUN chown -R airflow: ${AIRFLOW_HOME}
# Shipyard
#
# Shipyard provides core functionality used by the airflow plugins/operators
# Since Shipyard and Airflow are built together as images, this should prevent
# stale or out-of-date code between these parts.
# Shipyard requirements, source and installation
COPY ${ctx_base}/shipyard_airflow/requirements.txt /tmp/api_requirements.txt
RUN pip3 install -r /tmp/api_requirements.txt
COPY ${ctx_base}/shipyard_airflow /tmp/shipyard/
RUN cd /tmp/shipyard \
&& python3 setup.py install
# Set work directory
USER airflow
WORKDIR ${AIRFLOW_HOME}

View File

@ -82,6 +82,71 @@ class DeploymentGroupManager:
return self._all_groups[group]
return None
def group_list(self):
"""Return a list of DeploymentGroup objects in group order"""
summary = []
for group_nm in self._group_order:
group = self._all_groups[group_nm]
summary.append(group)
return summary
def critical_groups_failed(self):
"""Return True if any critical groups have failed"""
for group in self._all_groups.values():
if group.stage == Stage.FAILED and group.critical:
return True
return False
def evaluate_group_succ_criteria(self, group_name, stage):
"""Checks a group against its success criteria for a stage
:param group_name: the name of the group to check
:param stage: Stage.PREPARED or Stage.DEPLOYED
Returns a boolean: True = success, False = failure.
"""
failed_criteria = self.get_group_failures_for_stage(group_name, stage)
if failed_criteria:
# Logging of criteria has already occurred during checking.
self.mark_group_failed(group_name)
LOG.info("Group %s has failed to meet its success criteria while "
"trying to move to stage: %s",
group_name, stage)
return False
elif stage == Stage.DEPLOYED:
self.mark_group_deployed(group_name)
LOG.info("Group %s has met its success criteria and is "
"successfully deployed (%s)", group_name, stage)
return True
elif stage == Stage.PREPARED:
self.mark_group_prepared(group_name)
LOG.info("Group %s has met its success criteria and is "
"now set to stage %s", group_name, stage)
return True
def report_group_summary(self):
"""Reports the status of all groups handled by this deployment"""
LOG.info("===== Group Summary =====")
for group in self.group_list():
LOG.info(" Group %s%s ended with stage: %s",
group.name,
" [Critical]" if group.critical else "",
group.stage)
LOG.info("===== End Group Summary =====")
def report_node_summary(self):
"""Reports the status of all nodes handled by this deployment"""
# Ordered stages
stages = [Stage.NOT_STARTED,
Stage.PREPARED,
Stage.DEPLOYED,
Stage.FAILED]
LOG.info("===== Node Summary =====")
for stage in stages:
nodes = self.get_nodes(stage=stage)
LOG.info(" Nodes %s: %s", stage, ", ".join(nodes))
LOG.info("===== End Node Summary =====")
#
# Methods that support setup of the nodes in groups
#
@ -163,6 +228,22 @@ class DeploymentGroupManager:
# Methods for handling nodes
#
def fail_unsuccessful_nodes(self, group, successes):
"""Fail nodes that were not successful in a group's actionable list
:param group: the group to check
:param successes: the list of successful nodes from processing
This assumes that an actionable node must appear in the successes
list to be considered successful. If the successes list is empty,
all of the group's actionable nodes are considered failed.
"""
# Mark non-successes as failed
failed_nodes = set(group.actionable_nodes).difference(set(successes))
for node_name in failed_nodes:
self.mark_node_failed(node_name)
def mark_node_deployed(self, node_name):
"""Mark a node as deployed"""
self._set_node_stage(node_name, Stage.DEPLOYED)
@ -203,7 +284,7 @@ def _update_group_actionable_nodes(group, known_nodes):
", ".join(known_nodes))
group_nodes = set(group.full_nodes)
group.actionable_nodes = group_nodes.difference(known_nodes)
group.actionable_nodes = list(group_nodes.difference(known_nodes))
LOG.debug("Group %s set actionable_nodes to %s. "
"Full node list for this group is %s",
group.name,

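A small worked sketch of the new fail_unsuccessful_nodes contract (node names
invented; dgm and group as in the methods above):

# Suppose group.actionable_nodes == ['node1', 'node2', 'node3'] and the
# processing step reported successes == ['node1'].
dgm.fail_unsuccessful_nodes(group, ['node1'])
# node2 and node3 are marked Stage.FAILED; node1 keeps its current stage.
# With successes == [], every actionable node in the group is failed.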
View File

@ -13,8 +13,7 @@
# limitations under the License.
from airflow.models import DAG
from airflow.operators import DrydockDeployNodesOperator
from airflow.operators import DrydockPrepareNodesOperator
from airflow.operators import DrydockNodesOperator
from airflow.operators import DrydockPrepareSiteOperator
from airflow.operators import DrydockVerifySiteOperator
@ -43,15 +42,8 @@ def deploy_site_drydock(parent_dag_name, child_dag_name, args):
sub_dag_name=child_dag_name,
dag=dag)
drydock_prepare_nodes = DrydockPrepareNodesOperator(
task_id='prepare_nodes',
shipyard_conf=config_path,
main_dag_name=parent_dag_name,
sub_dag_name=child_dag_name,
dag=dag)
drydock_deploy_nodes = DrydockDeployNodesOperator(
task_id='deploy_nodes',
drydock_nodes = DrydockNodesOperator(
task_id='prepare_and_deploy_nodes',
shipyard_conf=config_path,
main_dag_name=parent_dag_name,
sub_dag_name=child_dag_name,
@ -59,7 +51,6 @@ def deploy_site_drydock(parent_dag_name, child_dag_name, args):
# Define dependencies
drydock_prepare_site.set_upstream(drydock_verify_site)
drydock_prepare_nodes.set_upstream(drydock_prepare_site)
drydock_deploy_nodes.set_upstream(drydock_prepare_nodes)
drydock_nodes.set_upstream(drydock_prepare_site)
return dag

View File

@ -12,7 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
from urllib.parse import urlparse
from airflow.exceptions import AirflowException
@ -22,9 +21,9 @@ from airflow.utils.decorators import apply_defaults
import armada.common.client as client
import armada.common.session as session
from get_k8s_pod_port_ip import get_pod_port_ip
from service_endpoint import ucp_service_endpoint
from service_token import shipyard_service_token
from ucp_base_operator import UcpBaseOperator
import service_endpoint
from xcom_pusher import XcomPusher
LOG = logging.getLogger(__name__)
@ -42,16 +41,12 @@ class ArmadaBaseOperator(UcpBaseOperator):
@apply_defaults
def __init__(self,
armada_svc_type='armada',
deckhand_svc_type='deckhand',
query={},
svc_session=None,
svc_token=None,
*args, **kwargs):
"""Initialization of ArmadaBaseOperator object.
:param armada_svc_type: Armada Service Type
:param deckhand_svc_type: Deckhand Service Type
:param query: A dictionary containing explicit query string parameters
:param svc_session: Keystone Session
:param svc_token: Keystone Token
@ -66,8 +61,6 @@ class ArmadaBaseOperator(UcpBaseOperator):
pod_selector_pattern=[{'pod_pattern': 'armada-api',
'container': 'armada-api'}],
*args, **kwargs)
self.armada_svc_type = armada_svc_type
self.deckhand_svc_type = deckhand_svc_type
self.query = query
self.svc_session = svc_session
self.svc_token = svc_token
@ -81,21 +74,11 @@ class ArmadaBaseOperator(UcpBaseOperator):
# Logs uuid of action performed by the Operator
LOG.info("Armada Operator for action %s", self.action_info['id'])
# Retrieve Endpoint Information
armada_svc_endpoint = ucp_service_endpoint(
self, svc_type=self.armada_svc_type)
# Set up armada client
self.armada_client = self._init_armada_client(armada_svc_endpoint,
self.svc_token)
# Retrieve DeckHand Endpoint Information
deckhand_svc_endpoint = ucp_service_endpoint(
self, svc_type=self.deckhand_svc_type)
# Get deckhand design reference url
self.deckhand_design_ref = self._init_deckhand_design_ref(
deckhand_svc_endpoint)
self.armada_client = self._init_armada_client(
self.endpoints.endpoint_by_name(service_endpoint.ARMADA),
self.svc_token
)
@staticmethod
def _init_armada_client(armada_svc_endpoint, svc_token):
@ -133,26 +116,6 @@ class ArmadaBaseOperator(UcpBaseOperator):
else:
raise AirflowException("Failed to set up Armada client!")
def _init_deckhand_design_ref(self, deckhand_svc_endpoint):
LOG.info("Deckhand endpoint is %s", deckhand_svc_endpoint)
# Form DeckHand Design Reference Path
# This URL will be used to retrieve the Site Design YAMLs
deckhand_path = "deckhand+" + deckhand_svc_endpoint
_deckhand_design_ref = os.path.join(deckhand_path,
"revisions",
str(self.revision_id),
"rendered-documents")
if _deckhand_design_ref:
LOG.info("Design YAMLs will be retrieved from %s",
_deckhand_design_ref)
return _deckhand_design_ref
else:
raise AirflowException("Unable to Retrieve Design Reference!")
@get_pod_port_ip('tiller', namespace='kube-system')
def get_tiller_info(self, pods_ip_port={}):

View File

@ -58,7 +58,7 @@ class ArmadaPostApplyOperator(ArmadaBaseOperator):
try:
armada_post_apply = self.armada_client.post_apply(
manifest=armada_manifest,
manifest_ref=self.deckhand_design_ref,
manifest_ref=self.design_ref,
values=override_values,
set=chart_set,
query=self.query,

View File

@ -42,8 +42,7 @@ class ArmadaValidateDesignOperator(ArmadaBaseOperator):
# Validate Site Design
try:
post_validate = self.armada_client.post_validate(
manifest=self.deckhand_design_ref,
timeout=timeout)
manifest=self.design_ref, timeout=timeout)
except errors.ClientError as client_error:
# Dump logs from Armada API pods

View File

@ -15,7 +15,6 @@
import logging
import time
from airflow.exceptions import AirflowException
from kubernetes import client, config
@ -54,7 +53,7 @@ def check_node_status(time_out, interval):
# Logs initial state of all nodes in the cluster
ret_init = v1.list_node(watch=False)
logging.info("Current state of nodes in Cluster is")
logging.info("Current state of nodes in the cluster is")
for i in ret_init.items:
logging.info("%s\t%s\t%s", i.metadata.name,
@ -86,7 +85,7 @@ def check_node_status(time_out, interval):
cluster_ready = False
# Print current state of node
logging.info("Node %s is not Ready", j.metadata.name)
logging.info("Node %s is not ready", j.metadata.name)
logging.debug("Current status of %s is %s",
j.metadata.name,
j.status.conditions[-1].message)
@ -96,16 +95,18 @@ def check_node_status(time_out, interval):
logging.info("Node %s is in Ready state", j.metadata.name)
# Raise Time Out Exception
# If any nodes are not ready and the timeout is reached, stop waiting
if not cluster_ready and i == end_range:
raise AirflowException("Timed Out! One or more Nodes fail to "
"get into Ready State!")
logging.info("Timed Out! One or more Nodes failed to reach ready "
"state")
break
elif cluster_ready:
# Exit loop if Cluster is in Ready state
if cluster_ready:
logging.info("All nodes are in Ready state")
logging.info("All nodes are in ready state")
break
else:
# Back off and check again in next iteration
logging.info("Wait for %d seconds...", int(interval))
time.sleep(int(interval))
# Return the nodes that are not ready.
return not_ready_node_list

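With this change, check_node_status no longer raises on timeout; it returns
the list of nodes that never reached Ready so the caller can decide what to
do. A caller-side sketch (timeout and interval values assumed):

from check_k8s_node_status import check_node_status

# poll every 30 seconds for up to 30 minutes (assumed values)
not_ready = check_node_status(1800, 30)
for node in not_ready:
    # DrydockNodesOperator (below) drops these from its success list
    print("Node did not reach Ready in time:", node)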
View File

@ -19,7 +19,7 @@ from airflow.plugins_manager import AirflowPlugin
from airflow.exceptions import AirflowException
from deckhand.client import client as deckhand_client
from service_endpoint import ucp_service_endpoint
import service_endpoint
from service_token import shipyard_service_token
from ucp_base_operator import UcpBaseOperator
@ -41,8 +41,6 @@ class DeckhandBaseOperator(UcpBaseOperator):
committed_ver=None,
deckhandclient=None,
deckhand_client_read_timeout=None,
deckhand_svc_endpoint=None,
deckhand_svc_type='deckhand',
revision_id=None,
svc_session=None,
svc_token=None,
@ -53,8 +51,6 @@ class DeckhandBaseOperator(UcpBaseOperator):
:param committed_ver: Last committed version
:param deckhandclient: An instance of deckhand client
:param deckhand_client_read_timeout: Deckhand client connect timeout
:param deckhand_svc_endpoint: Deckhand Service Endpoint
:param deckhand_svc_type: Deckhand Service Type
:param revision_id: Target revision for workflow
:param svc_session: Keystone Session
:param svc_token: Keystone Token
@ -70,8 +66,6 @@ class DeckhandBaseOperator(UcpBaseOperator):
self.committed_ver = committed_ver
self.deckhandclient = deckhandclient
self.deckhand_client_read_timeout = deckhand_client_read_timeout
self.deckhand_svc_endpoint = deckhand_svc_endpoint
self.deckhand_svc_type = deckhand_svc_type
self.revision_id = revision_id
self.svc_session = svc_session
self.svc_token = svc_token
@ -96,8 +90,9 @@ class DeckhandBaseOperator(UcpBaseOperator):
self.action_info['id'])
# Retrieve Endpoint Information
self.deckhand_svc_endpoint = ucp_service_endpoint(
self, svc_type=self.deckhand_svc_type)
self.deckhand_svc_endpoint = self.endpoints.endpoint_by_name(
service_endpoint.DECKHAND
)
LOG.info("Deckhand endpoint is %s",
self.deckhand_svc_endpoint)

View File

@ -45,7 +45,7 @@ class DeploymentConfigurationOperator(BaseOperator):
cannot be retrieved
"""
config_keys_defaults = {
"physical_provisioner.deployment_strategy": "all-at-once",
"physical_provisioner.deployment_strategy": None,
"physical_provisioner.deploy_interval": 30,
"physical_provisioner.deploy_timeout": 3600,
"physical_provisioner.destroy_interval": 30,

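The deployment_strategy default changes from "all-at-once" to None so that
downstream code can distinguish "no strategy specified" from a named
document. A condensed sketch of how the None default is consumed (mirrors
DrydockNodesOperator._setup_deployment_strategy later in this change; op and
dc stand in for the operator and its deployment configuration):

strat_name = dc['physical_provisioner.deployment_strategy']
if strat_name:
    # a named strategy document is retrieved from Deckhand
    strategy = op.get_unique_doc(name=strat_name,
                                 schema='shipyard/DeploymentStrategy/v1')
else:
    # None falls back to a single, critical, all-at-once group
    strategy = _default_deployment_strategy()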
View File

@ -14,7 +14,6 @@
import copy
import pprint
import logging
import os
import time
from urllib.parse import urlparse
@ -25,9 +24,35 @@ from airflow.utils.decorators import apply_defaults
import drydock_provisioner.drydock_client.client as client
import drydock_provisioner.drydock_client.session as session
from drydock_provisioner import error as errors
from service_endpoint import ucp_service_endpoint
try:
import service_endpoint
except ImportError:
from shipyard_airflow.plugins import service_endpoint
try:
from service_token import shipyard_service_token
except ImportError:
from shipyard_airflow.plugins.service_token import shipyard_service_token
try:
from ucp_base_operator import UcpBaseOperator
except ImportError:
from shipyard_airflow.plugins.ucp_base_operator import UcpBaseOperator
try:
from drydock_errors import (
DrydockClientUseFailureException,
DrydockTaskFailedException,
DrydockTaskNotCreatedException,
DrydockTaskTimeoutException
)
except ImportError:
from shipyard_airflow.plugins.drydock_errors import (
DrydockClientUseFailureException,
DrydockTaskFailedException,
DrydockTaskNotCreatedException,
DrydockTaskTimeoutException
)
LOG = logging.getLogger(__name__)
@ -44,11 +69,7 @@ class DrydockBaseOperator(UcpBaseOperator):
@apply_defaults
def __init__(self,
deckhand_design_ref=None,
deckhand_svc_type='deckhand',
drydock_client=None,
drydock_svc_endpoint=None,
drydock_svc_type='physicalprovisioner',
drydock_task_id=None,
node_filter=None,
redeploy_server=None,
@ -57,11 +78,7 @@ class DrydockBaseOperator(UcpBaseOperator):
*args, **kwargs):
"""Initialization of DrydockBaseOperator object.
:param deckhand_design_ref: A URI reference to the design documents
:param deckhand_svc_type: Deckhand Service Type
:param drydock_client: An instance of drydock client
:param drydock_svc_endpoint: Drydock Service Endpoint
:param drydock_svc_type: Drydock Service Type
:param drydock_task_id: Drydock Task ID
:param node_filter: A filter for narrowing the scope of the task.
Valid fields are 'node_names', 'rack_names',
@ -81,11 +98,7 @@ class DrydockBaseOperator(UcpBaseOperator):
pod_selector_pattern=[{'pod_pattern': 'drydock-api',
'container': 'drydock-api'}],
*args, **kwargs)
self.deckhand_design_ref = deckhand_design_ref
self.deckhand_svc_type = deckhand_svc_type
self.drydock_client = drydock_client
self.drydock_svc_endpoint = drydock_svc_endpoint
self.drydock_svc_type = drydock_svc_type
self.drydock_task_id = drydock_task_id
self.node_filter = node_filter
self.redeploy_server = redeploy_server
@ -126,8 +139,9 @@ class DrydockBaseOperator(UcpBaseOperator):
% self.__class__.__name__)
# Retrieve Endpoint Information
self.drydock_svc_endpoint = ucp_service_endpoint(
self, svc_type=self.drydock_svc_type)
self.drydock_svc_endpoint = self.endpoints.endpoint_by_name(
service_endpoint.DRYDOCK
)
LOG.info("Drydock endpoint is %s", self.drydock_svc_endpoint)
@ -147,7 +161,9 @@ class DrydockBaseOperator(UcpBaseOperator):
if dd_session:
LOG.info("Successfully Set Up DryDock Session")
else:
raise AirflowException("Failed to set up Drydock Session!")
raise DrydockClientUseFailureException(
"Failed to set up Drydock Session!"
)
# Use the DrydockSession to build a DrydockClient that can
# be used to make one or more API calls
@ -158,26 +174,9 @@ class DrydockBaseOperator(UcpBaseOperator):
if self.drydock_client:
LOG.info("Successfully Set Up DryDock client")
else:
raise AirflowException("Failed to set up Drydock Client!")
# Retrieve DeckHand Endpoint Information
deckhand_svc_endpoint = ucp_service_endpoint(
self, svc_type=self.deckhand_svc_type)
LOG.info("Deckhand endpoint is %s", deckhand_svc_endpoint)
# Form DeckHand Design Reference Path
# This URL will be used to retrieve the Site Design YAMLs
deckhand_path = "deckhand+" + deckhand_svc_endpoint
self.deckhand_design_ref = os.path.join(deckhand_path,
"revisions",
str(self.revision_id),
"rendered-documents")
if self.deckhand_design_ref:
LOG.info("Design YAMLs will be retrieved from %s",
self.deckhand_design_ref)
else:
raise AirflowException("Unable to Retrieve Design Reference!")
raise DrydockClientUseFailureException(
"Failed to set up Drydock Client!"
)
@shipyard_service_token
def _auth_gen(self):
@ -196,7 +195,7 @@ class DrydockBaseOperator(UcpBaseOperator):
try:
# Create Task
create_task_response = self.drydock_client.create_task(
design_ref=self.deckhand_design_ref,
design_ref=self.design_ref,
task_action=task_action,
node_filter=self.node_filter)
@ -204,7 +203,7 @@ class DrydockBaseOperator(UcpBaseOperator):
# Dump logs from Drydock pods
self.get_k8s_logs()
raise AirflowException(client_error)
raise DrydockClientUseFailureException(client_error)
# Retrieve Task ID
self.drydock_task_id = create_task_response['task_id']
@ -216,7 +215,7 @@ class DrydockBaseOperator(UcpBaseOperator):
if self.drydock_task_id:
return self.drydock_task_id
else:
raise AirflowException("Unable to create task!")
raise DrydockTaskNotCreatedException("Unable to create task!")
def query_task(self, interval, time_out):
@ -235,21 +234,16 @@ class DrydockBaseOperator(UcpBaseOperator):
try:
# Retrieve current task state
task_state = self.drydock_client.get_task(
task_id=self.drydock_task_id)
task_state = self.get_task_dict(task_id=self.drydock_task_id)
task_status = task_state['status']
task_result = task_state['result']['status']
LOG.info("Current status of task id %s is %s",
self.drydock_task_id, task_status)
except errors.ClientError as client_error:
# Dump logs from Drydock pods
except DrydockClientUseFailureException:
self.get_k8s_logs()
raise AirflowException(client_error)
raise
except:
# There can be situations where there are intermittent network
# issues that prevents us from retrieving the task state. We
@ -275,6 +269,21 @@ class DrydockBaseOperator(UcpBaseOperator):
else:
self.task_failure(True)
def get_task_dict(self, task_id):
"""Retrieve task output in its raw dictionary format
:param task_id: The id of the task to retrieve
Raises DrydockClientUseFailureException if the client raises an
exception
See:
http://att-comdev-drydock.readthedocs.io/en/latest/task.html#task-status-schema
"""
try:
return self.drydock_client.get_task(task_id=task_id)
except errors.ClientError as client_error:
# Dump logs from Drydock pods
raise DrydockClientUseFailureException(client_error)
def task_failure(self, _task_failure):
# Dump logs from Drydock pods
self.get_k8s_logs()
@ -289,7 +298,7 @@ class DrydockBaseOperator(UcpBaseOperator):
self.all_task_ids = {t['task_id']: t for t in all_tasks}
except errors.ClientError as client_error:
raise AirflowException(client_error)
raise DrydockClientUseFailureException(client_error)
# Retrieve the failed parent task and assign it to list
failed_parent_task = (
@ -299,7 +308,7 @@ class DrydockBaseOperator(UcpBaseOperator):
# Since there is only 1 failed parent task, we will print index 0
# of the list
if failed_parent_task:
LOG.error('%s task has either failed or timed out',
LOG.error("%s task has either failed or timed out",
failed_parent_task[0]['action'])
LOG.error(pprint.pformat(failed_parent_task[0]))
@ -312,9 +321,13 @@ class DrydockBaseOperator(UcpBaseOperator):
# Raise Exception to terminate workflow
if _task_failure:
raise AirflowException("Failed to Execute/Complete Task!")
raise DrydockTaskFailedException(
"Failed to Execute/Complete Task!"
)
else:
raise AirflowException("Task Execution Timed Out!")
raise DrydockTaskTimeoutException(
"Task Execution Timed Out!"
)
def check_subtask_failure(self, subtask_id_list):
@ -367,7 +380,9 @@ class DrydockBaseOperator(UcpBaseOperator):
subtask_id)
else:
raise AirflowException("Unable to retrieve subtask info!")
raise DrydockClientUseFailureException(
"Unable to retrieve subtask info!"
)
class DrydockBaseOperatorPlugin(AirflowPlugin):

View File

@ -1,70 +0,0 @@
# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import time
from airflow.plugins_manager import AirflowPlugin
from check_k8s_node_status import check_node_status
from drydock_base_operator import DrydockBaseOperator
LOG = logging.getLogger(__name__)
class DrydockDeployNodesOperator(DrydockBaseOperator):
"""Drydock Deploy Nodes Operator
This operator will trigger drydock to deploy the bare metal
nodes
"""
def do_execute(self):
# Trigger DryDock to execute task
self.create_task('deploy_nodes')
# Retrieve query interval and timeout
q_interval = self.dc['physical_provisioner.deploy_interval']
task_timeout = self.dc['physical_provisioner.deploy_timeout']
# Query Task
self.query_task(q_interval, task_timeout)
# It takes time for the cluster join process to be triggered across
# all the nodes in the cluster. Hence there is a need to back off
# and wait before checking the state of the cluster join process.
join_wait = self.dc['physical_provisioner.join_wait']
LOG.info("All nodes deployed in MAAS")
LOG.info("Wait for %d seconds before checking node state...",
join_wait)
time.sleep(join_wait)
# Check that cluster join process is completed before declaring
# deploy_node as 'completed'.
node_st_timeout = self.dc['kubernetes.node_status_timeout']
node_st_interval = self.dc['kubernetes.node_status_interval']
check_node_status(node_st_timeout, node_st_interval)
class DrydockDeployNodesOperatorPlugin(AirflowPlugin):
"""Creates DrydockDeployNodesOperator in Airflow."""
name = 'drydock_deploy_nodes_operator'
operators = [DrydockDeployNodesOperator]

View File

@ -0,0 +1,34 @@
# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Drydock specific exceptions generated during operator execution.
Generally marker exceptions extending AirflowException
"""
from airflow.exceptions import AirflowException
class DrydockClientUseFailureException(AirflowException):
pass
class DrydockTaskFailedException(AirflowException):
pass
class DrydockTaskNotCreatedException(AirflowException):
pass
class DrydockTaskTimeoutException(AirflowException):
pass

View File

@ -0,0 +1,486 @@
# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Prepare and deploy nodes using Drydock
Uses the deployment strategy named in the deployment-configuration to
progress through preparation and deployment of nodes in a group-based fashion.
In the case of no specified deployment strategy, an "all-at-once" approach is
taken, by which all nodes are deployed together.
Historical Note: This operator replaces the function of drydock_prepare_nodes
and drydock_deploy_nodes operators that existed previously.
"""
import logging
import time
from airflow.exceptions import AirflowException
from airflow.plugins_manager import AirflowPlugin
from shipyard_airflow.common.deployment_group.deployment_group import Stage
from shipyard_airflow.common.deployment_group.deployment_group_manager import \
DeploymentGroupManager
from shipyard_airflow.common.deployment_group.node_lookup import NodeLookup
try:
import check_k8s_node_status
except ImportError:
from shipyard_airflow.plugins import check_k8s_node_status
try:
from drydock_base_operator import DrydockBaseOperator
except ImportError:
from shipyard_airflow.plugins.drydock_base_operator import \
DrydockBaseOperator
try:
from drydock_errors import (
DrydockTaskFailedException,
DrydockTaskTimeoutException
)
except ImportError:
from shipyard_airflow.plugins.drydock_errors import (
DrydockTaskFailedException,
DrydockTaskTimeoutException
)
LOG = logging.getLogger(__name__)
class DrydockNodesOperator(DrydockBaseOperator):
"""Drydock Nodes Operator
Using a deployment strategy to calculate the deployment sequence,
deploy a series of baremetal nodes using Drydock.
"""
def do_execute(self):
self._setup_configured_values()
# setup self.strat_name and self.strategy
self.strategy = {}
self._setup_deployment_strategy()
dgm = _get_deployment_group_manager(
self.strategy['groups'],
_get_node_lookup(self.drydock_client, self.design_ref)
)
_process_deployment_groups(dgm,
self._execute_prepare,
self._execute_deployment)
# All groups "complete" (as they're going to be). Report summary
dgm.report_group_summary()
dgm.report_node_summary()
if dgm.critical_groups_failed():
raise AirflowException(
"One or more deployment groups marked as critical have failed"
)
else:
LOG.info("All critical groups have met their success criteria")
# TODO (bryan-strassner) it is very possible that many nodes failed
# deployment, but all critical groups had enough success to
# continue processing. This will be non-obvious to the casual
# observer of the workflow. A likely enhancement is to allow
notes to be added to the shipyard action associated with this
workflow, which would be reported back to the end user doing a
# describe of the action. This will require new database structures
# to hold the notes, and a means to insert the notes. A shared
# functionality in the base ucp operator or a common module would
# be a reasonable way to support this.
def _setup_configured_values(self):
"""Sets self.<name> values from the deployment configuration"""
# Retrieve query intervals and timeouts
# Intervals - How often will something be queried for status.
self.dep_interval = self.dc['physical_provisioner.deploy_interval']
self.node_st_interval = self.dc['kubernetes.node_status_interval']
self.prep_interval = self.dc[
'physical_provisioner.prepare_node_interval'
]
# Timeouts - Time Shipyard waits for completion of a task.
self.dep_timeout = self.dc['physical_provisioner.deploy_timeout']
self.node_st_timeout = self.dc['kubernetes.node_status_timeout']
self.prep_timeout = self.dc[
'physical_provisioner.prepare_node_timeout'
]
# The time to wait before querying k8s node status after Drydock deploys nodes
self.join_wait = self.dc['physical_provisioner.join_wait']
def _execute_prepare(self, group):
"""Executes the prepare nodes step for the group.
:param group: the DeploymentGroup to prepare
Returns a QueryTaskResult object
"""
LOG.info("Group %s is preparing nodes", group.name)
self.node_filter = _gen_node_name_filter(group.actionable_nodes)
return self._execute_task('prepare_nodes',
self.prep_interval,
self.prep_timeout)
def _execute_deployment(self, group):
"""Execute the deployment of nodes for the group.
:param group: The DeploymentGroup to deploy
Returns a QueryTaskResult object
"""
LOG.info("Group %s is deploying nodes", group.name)
self.node_filter = _gen_node_name_filter(group.actionable_nodes)
task_result = self._execute_task('deploy_nodes',
self.dep_interval,
self.dep_timeout)
if not task_result.successes:
# if there are no successes from Drydock, there is no need to
# wait and check on the results from node status.
LOG.info("There are no nodes indicated as successful from Drydock."
" Skipping waiting for Kubernetes node join and "
"proceeding to validation")
return task_result
# It takes time for the cluster join process to be triggered across
# all the nodes in the cluster. Hence there is a need to back off
# and wait before checking the state of the cluster join process.
LOG.info("Nodes <%s> reported as deployed in MAAS",
", ".join(task_result.successes))
LOG.info("Waiting for %d seconds before checking node state...",
self.join_wait)
time.sleep(self.join_wait)
# Check that cluster join process is completed before declaring
# deploy_node as 'completed'.
# This should only include nodes that drydock has indicated as
# successful and has passed the join script to.
# Anything not ready within the timeout must be considered a failure
not_ready_list = check_k8s_node_status.check_node_status(
self.node_st_timeout,
self.node_st_interval
)
for node in not_ready_list:
# Remove nodes that are not ready from the list of successes, since
# they did not complete deployment successfully.
try:
LOG.info("Node %s failed to join the Kubernetes cluster or was"
" not timely enough", node)
task_result.successes.remove(node)
except ValueError:
# This node is not joined, but was not one that we were
# looking for either.
LOG.info("%s failed to join Kubernetes, but was not in the "
"Drydock results: %s",
node,
", ".join(task_result.successes))
return task_result
def _execute_task(self, task_name, interval, timeout):
"""Execute the Drydock task requested
:param task_name: 'prepare_nodes', 'deploy_nodes'
:param interval: The time between checking status on the task
:param timeout: The total time allowed for the task
Wraps the query_task method in the base class, capturing
AirflowExceptions and summarizing results into a response
QueryTaskResult object
Note: It does not matter if the task ultimately succeeds or fails in
Drydock - the base class handles all of the logging needed for
troubleshooting. What matters here is the node successes.
Following any result of query_task, this code will re-query the task
results from Drydock to gather the node successes placing them into
the successes list in the response object. In the case of a failure to
get the task results, this workflow must assume that the result is a
total loss, and pass back no successes
"""
self.create_task(task_name)
result = QueryTaskResult(self.drydock_task_id, task_name)
try:
self.query_task(interval, timeout)
except DrydockTaskFailedException:
# Task failure may be successful enough based on success criteria.
# This should not halt the overall flow of this workflow step.
LOG.warn(
"Task %s has failed. Logs contain details of the failure. "
"Some nodes may be succesful, processing continues", task_name
)
except DrydockTaskTimeoutException:
# Task timeout may be successful enough based on success criteria.
# This should not halt the overall flow of this workflow step.
LOG.warn(
"Task %s has timed out after %s seconds. Logs contain details "
"of the failure. Some nodes may be succesful, processing "
"continues", task_name, timeout
)
# Other AirflowExceptions will fail the whole task - let them do this.
# find successes
result.successes = self._get_successes_for_task(self.drydock_task_id)
return result
def _get_successes_for_task(self, task_id, extend_success=True):
"""Discover the successful nodes based on the current task id.
:param task_id: The id of the task
:param extend_successes: determines if this result extends successes
or simply reports on the task.
Gets the set of successful nodes by examining the self.drydock_task_id.
The children are traversed recursively to display each sub-task's
information.
Only a reported success at the parent task indicates success of the
task. Drydock is assumed to roll up overall success to the top level.
"""
success_nodes = []
task_dict = self.get_task_dict(task_id)
task_status = task_dict.get('status', "Not Specified")
task_result = task_dict.get('result')
if task_result is None:
LOG.warn("Task result is missing for task %s, with status %s."
" Neither successes nor further details can be extracted"
" from this result",
task_id, task_status)
else:
if extend_success:
try:
# successes and failures on the task result drive the
# interpretation of success or failure for this workflow.
# - Any node that is _only_ success for a task is a
# success to us.
# - Any node that is listed as a failure is a failure.
# This implies that a node listed as a success and a
# failure is a failure. E.g. some subtasks succeeded and
# some failed
t_successes = task_result.get('successes', [])
t_failures = task_result.get('failures', [])
actual_successes = set(t_successes) - set(t_failures)
# accumulate the successes from this task
success_nodes.extend(actual_successes)
LOG.info("Nodes <%s> added as successes for task %s",
", ".join(success_nodes), task_id)
except KeyError:
# missing key on the path to getting nodes - don't add any
LOG.warn("Missing successes field on result of task %s, "
"but a success field was expected. No successes"
" can be extracted from this result",
task_id)
pass
_report_task_info(task_id, task_result, task_status)
# for each child, report only the step info, do not add to overall
# success list.
for ch_task_id in task_dict.get('subtask_id_list', []):
success_nodes.extend(
self._get_successes_for_task(ch_task_id, extend_success=False)
)
# deduplicate and return
return set(success_nodes)
def _setup_deployment_strategy(self):
"""Determine the deployment strategy
Uses the specified strategy from the deployment configuration
or returns a default configuration of 'all-at-once'
"""
self.strat_name = self.dc['physical_provisioner.deployment_strategy']
if self.strat_name:
# if there is a deployment strategy specified, get it and use it
self.strategy = self.get_unique_doc(
name=self.strat_name,
schema="shipyard/DeploymentStrategy/v1"
)
else:
# The default behavior is to deploy all nodes, and fail if
# any nodes fail to deploy.
self.strat_name = 'all-at-once (defaulted)'
self.strategy = _default_deployment_strategy()
LOG.info("Strategy Name: %s has %s groups",
self.strat_name,
len(self.strategy.get('groups', [])))
#
# Functions supporting the nodes operator class
#
def _get_node_lookup(drydock_client, design_ref):
"""Return a NodeLookup suitable for the DeploymentGroupManager
:param drydock_client: the drydock_client object
:param design_ref: the design_ref for the NodeLookup
"""
return NodeLookup(drydock_client, design_ref).lookup
def _get_deployment_group_manager(groups_dict_list, node_lookup):
"""Return a DeploymentGroupManager suitable for managing this deployment
:param groups_dict_list: the list of group dictionaries to use
:param node_lookup: a NodeLookup object that will be used by this
DeploymentGroupManager
"""
return DeploymentGroupManager(groups_dict_list, node_lookup)
def _process_deployment_groups(dgm, prepare_func, deploy_func):
"""Executes the deployment group deployments
:param dgm: the DeploymentGroupManager object that manages the
dependency chain of groups
:param prepare_func: a function that accepts a DeploymentGroup and returns
a QueryTaskResult with the purpose of preparing nodes
:param deploy_func: a function that accepts a DeploymentGroup and returns
a QueryTaskResult with the purpose of deploying nodes
"""
complete = False
while not complete:
# Find the next group to be prepared. Prepare and deploy it.
group = dgm.get_next_group(Stage.PREPARED)
if group is None:
LOG.info("There are no more groups eligible to process")
# whether or not really complete, the processing loop is done.
complete = True
continue
LOG.info("*** Deployment Group: %s is being processed ***", group.name)
if not group.actionable_nodes:
LOG.info("There were no actionable nodes for group %s. It is "
"possible that all nodes: [%s] have previously been "
"deployed. Group will be immediately checked "
"against its success criteria", group.name,
", ".join(group.full_nodes))
# In the case of a group having no actionable nodes, since groups
# prepare -> deploy in direct sequence, we can check against
# deployment, since all nodes would need to be deployed or have
# been attempted. Need to follow the state-transition, so
# PREPARED -> DEPLOYED
dgm.evaluate_group_succ_criteria(group.name, Stage.PREPARED)
dgm.evaluate_group_succ_criteria(group.name, Stage.DEPLOYED)
# success or failure, move on to next group
continue
LOG.info("%s has actionable nodes: [%s]", group.name,
", ".join(group.actionable_nodes))
if len(group.actionable_nodes) < len(group.full_nodes):
LOG.info("Some nodes are not actionable because they were "
"included in a prior group, but will be considered in "
"the success critera calculation for this group")
# Group has actionable nodes.
# Prepare Nodes for group, store QueryTaskResults
prep_qtr = prepare_func(group)
# Mark successes as prepared
for node_name in prep_qtr.successes:
dgm.mark_node_prepared(node_name)
dgm.fail_unsuccessful_nodes(group, prep_qtr.successes)
should_deploy = dgm.evaluate_group_succ_criteria(group.name,
Stage.PREPARED)
if not should_deploy:
# group has failed, move on to next group. Current group has
# been marked as failed.
continue
# Continue with deployment
dep_qtr = deploy_func(group)
# Mark successes as deployed
for node_name in dep_qtr.successes:
dgm.mark_node_deployed(node_name)
dgm.fail_unsuccessful_nodes(group, dep_qtr.successes)
dgm.evaluate_group_succ_criteria(group.name, Stage.DEPLOYED)
def _report_task_info(task_id, task_result, task_status):
"""Logs information regarding a task.
:param task_id: id of the task
:param task_result: The result dictionary of the task
:param task_status: The status for the task
"""
# setup fields, or defaults if missing values
task_failures = task_result.get('failures', [])
task_successes = task_result.get('successes', [])
result_details = task_result.get('details', {'messageList': []})
result_status = task_result.get('status', "No status supplied")
LOG.info("Task %s with status %s/%s reports successes: [%s] and"
" failures: [%s]", task_id, task_status, result_status,
", ".join(task_successes), ", ".join(task_failures))
for message_item in result_details['messageList']:
context_type = message_item.get('context_type', 'N/A')
context_id = message_item.get('context', 'N/A')
message = message_item.get('message', "No message text supplied")
error = message_item.get('error', False)
timestamp = message_item.get('ts', 'No timestamp supplied')
LOG.info(" - Task %s for item %s:%s has message: %s [err=%s, at %s]",
task_id, context_type, context_id, message, error, timestamp)
def _default_deployment_strategy():
"""The default deployment strategy for 'all-at-once'"""
return {
'groups': [
{
'name': 'default',
'critical': True,
'depends_on': [],
'selectors': [
{
'node_names': [],
'node_labels': [],
'node_tags': [],
'rack_names': [],
},
],
'success_criteria': {
'percent_successful_nodes': 100
},
}
]
}
def _gen_node_name_filter(node_names):
"""Generates a drydock compatible node filter using only node names
:param node_names: the nodes with which to create a filter
"""
return {
'filter_set_type': 'union',
'filter_set': [
{
'filter_type': 'union',
'node_names': node_names
}
]
}
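# For illustration, _gen_node_name_filter(['node1', 'node2']) produces:
#   {'filter_set_type': 'union',
#    'filter_set': [{'filter_type': 'union',
#                    'node_names': ['node1', 'node2']}]}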
class QueryTaskResult:
"""Represents a summarized query result from a task"""
def __init__(self, task_id, task_name):
self.task_id = task_id
self.task_name = task_name
# The succeeded node names
self.successes = []
class DrydockNodesOperatorPlugin(AirflowPlugin):
"""Creates DrydockPrepareNodesOperator in Airflow."""
name = 'drydock_nodes_operator'
operators = [DrydockNodesOperator]

View File

@ -1,46 +0,0 @@
# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from airflow.plugins_manager import AirflowPlugin
from drydock_base_operator import DrydockBaseOperator
class DrydockPrepareNodesOperator(DrydockBaseOperator):
"""Drydock Prepare Nodes Operator
This operator will trigger drydock to prepare nodes for
site deployment
"""
def do_execute(self):
# Trigger DryDock to execute task
self.create_task('prepare_nodes')
# Retrieve query interval and timeout
q_interval = self.dc['physical_provisioner.prepare_node_interval']
task_timeout = self.dc['physical_provisioner.prepare_node_timeout']
# Query Task
self.query_task(q_interval, task_timeout)
class DrydockPrepareNodesOperatorPlugin(AirflowPlugin):
"""Creates DrydockPrepareNodesOperator in Airflow."""
name = 'drydock_prepare_nodes_operator'
operators = [DrydockPrepareNodesOperator]

View File

@ -49,7 +49,7 @@ class DrydockValidateDesignOperator(DrydockBaseOperator):
payload = {
'rel': "design",
'href': self.deckhand_design_ref,
'href': self.design_ref,
'type': "application/x-yaml"
}

View File

@ -12,16 +12,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import os
from airflow.utils.decorators import apply_defaults
from airflow.plugins_manager import AirflowPlugin
from airflow.exceptions import AirflowException
try:
from service_endpoint import ucp_service_endpoint
import service_endpoint
except ImportError:
from shipyard_airflow.plugins.service_endpoint import ucp_service_endpoint
from shipyard_airflow.plugins import service_endpoint
try:
from service_token import shipyard_service_token
@ -47,19 +46,11 @@ class PromenadeBaseOperator(UcpBaseOperator):
@apply_defaults
def __init__(self,
deckhand_design_ref=None,
deckhand_svc_type='deckhand',
promenade_svc_endpoint=None,
promenade_svc_type='kubernetesprovisioner',
redeploy_server=None,
svc_token=None,
*args, **kwargs):
"""Initialization of PromenadeBaseOperator object.
:param deckhand_design_ref: A URI reference to the design documents
:param deckhand_svc_type: Deckhand Service Type
:param promenade_svc_endpoint: Promenade Service Endpoint
:param promenade_svc_type: Promenade Service Type
:param redeploy_server: Server to be redeployed
:param svc_token: Keystone Token
The Promenade operator assumes that prior steps have set xcoms for
@ -71,10 +62,6 @@ class PromenadeBaseOperator(UcpBaseOperator):
pod_selector_pattern=[{'pod_pattern': 'promenade-api',
'container': 'promenade-api'}],
*args, **kwargs)
self.deckhand_design_ref = deckhand_design_ref
self.deckhand_svc_type = deckhand_svc_type
self.promenade_svc_endpoint = promenade_svc_endpoint
self.promenade_svc_type = promenade_svc_type
self.redeploy_server = redeploy_server
self.svc_token = svc_token
@ -98,31 +85,12 @@ class PromenadeBaseOperator(UcpBaseOperator):
% self.__class__.__name__)
# Retrieve promenade endpoint
self.promenade_svc_endpoint = ucp_service_endpoint(
self, svc_type=self.promenade_svc_type)
self.promenade_svc_endpoint = self.endpoints.endpoint_by_name(
service_endpoint.PROMENADE
)
LOG.info("Promenade endpoint is %s", self.promenade_svc_endpoint)
# Retrieve Deckhand Endpoint Information
deckhand_svc_endpoint = ucp_service_endpoint(
self, svc_type=self.deckhand_svc_type)
LOG.info("Deckhand endpoint is %s", deckhand_svc_endpoint)
# Form Deckhand Design Reference Path
# This URL will be used to retrieve the Site Design YAMLs
deckhand_path = "deckhand+" + deckhand_svc_endpoint
self.deckhand_design_ref = os.path.join(deckhand_path,
"revisions",
str(self.revision_id),
"rendered-documents")
if self.deckhand_design_ref:
LOG.info("Design YAMLs will be retrieved from %s",
self.deckhand_design_ref)
else:
raise AirflowException("Unable to Retrieve Deckhand Revision "
"%d!" % self.revision_id)
class PromenadeBaseOperatorPlugin(AirflowPlugin):

View File

@ -54,7 +54,7 @@ class PromenadeValidateSiteDesignOperator(PromenadeBaseOperator):
payload = {
'rel': "design",
'href': self.deckhand_design_ref,
'href': self.design_ref,
'type': "application/x-yaml"
}

View File

@ -11,7 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import configparser
import logging
import time
@ -22,15 +22,24 @@ try:
except ImportError:
from shipyard_airflow.plugins.service_session import ucp_keystone_session
# Lookup names used to find the real service type for each component in
# configuration
SHIPYARD = 'shipyard'
DRYDOCK = 'drydock'
ARMADA = 'armada'
DECKHAND = 'deckhand'
PROMENADE = 'promenade'
def ucp_service_endpoint(self, svc_type):
LOG = logging.getLogger(__name__)
def _ucp_service_endpoint(shipyard_conf, svc_type):
# Initialize variables
retry = 0
int_endpoint = None
# Retrieve Keystone Session
sess = ucp_keystone_session(self)
sess = ucp_keystone_session(shipyard_conf)
# We will allow 1 retry in getting the Keystone Endpoint with a
# backoff interval of 10 seconds in case there is a temporary
@ -58,3 +67,26 @@ def ucp_service_endpoint(self, svc_type):
raise AirflowException("Unable to get Keystone Endpoint!")
else:
return int_endpoint
class ServiceEndpoints():
"""Class that serves service endpoints"""
def __init__(self, shipyard_conf):
self.shipyard_conf = shipyard_conf
# Read and parse shipyard.conf
self.config = configparser.ConfigParser()
self.config.read(self.shipyard_conf)
def endpoint_by_name(self, svc_name):
"""Return the service endpoint for the named service.
:param svc_name: name of the service whose service type will be
looked up in the shipyard configuration. Constants in this
module provide names that are expected to work with a
standard/complete configuration file.
E.g.: service_endpoint.DRYDOCK
"""
LOG.info("Looking up service endpoint for: %s", svc_name)
svc_type = self.config.get(svc_name, 'service_type')
return _ucp_service_endpoint(self.shipyard_conf, svc_type)

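A minimal usage sketch of the new ServiceEndpoints class (the configuration
path here is an assumed value):

import service_endpoint

endpoints = service_endpoint.ServiceEndpoints(
    '/usr/local/airflow/plugins/shipyard.conf')  # assumed path
drydock_url = endpoints.endpoint_by_name(service_endpoint.DRYDOCK)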
View File

@ -22,11 +22,11 @@ from keystoneauth1.identity import v3 as keystone_v3
from keystoneauth1 import session as keystone_session
def ucp_keystone_session(self):
def ucp_keystone_session(shipyard_conf):
# Read and parse shipyard.conf
config = configparser.ConfigParser()
config.read(self.shipyard_conf)
config.read(shipyard_conf)
# Initialize variables
retry = 0

View File

@ -47,7 +47,7 @@ def shipyard_service_token(func):
retry = 0
# Retrieve Keystone Session
self.svc_session = ucp_keystone_session(self)
self.svc_session = ucp_keystone_session(self.shipyard_conf)
# We will allow 1 retry in getting the Keystone Token with a
# backoff interval of 10 seconds in case there is a temporary

View File

@ -14,12 +14,19 @@
import configparser
import logging
import math
import os
from datetime import datetime
from airflow.exceptions import AirflowException
from airflow.models import BaseOperator
from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults
try:
import service_endpoint
except ImportError:
from shipyard_airflow.plugins import service_endpoint
try:
from get_k8s_logs import get_pod_logs
except ImportError:
@ -35,6 +42,16 @@ try:
except ImportError:
from shipyard_airflow.plugins.xcom_puller import XcomPuller
from shipyard_airflow.common.document_validators.document_validation_utils \
import DocumentValidationUtils
try:
from deckhand_client_factory import DeckhandClientFactory
except ImportError:
from shipyard_airflow.plugins.deckhand_client_factory import (
DeckhandClientFactory
)
LOG = logging.getLogger(__name__)
@ -88,6 +105,8 @@ class UcpBaseOperator(BaseOperator):
self.start_time = datetime.now()
self.sub_dag_name = sub_dag_name
self.xcom_push_flag = xcom_push
self.doc_utils = _get_document_util(self.shipyard_conf)
self.endpoints = service_endpoint.ServiceEndpoints(self.shipyard_conf)
def execute(self, context):
@ -120,6 +139,7 @@ class UcpBaseOperator(BaseOperator):
self.action_info = self.xcom_puller.get_action_info()
self.dc = self.xcom_puller.get_deployment_configuration()
self.revision_id = self.action_info['committed_rev_id']
self.design_ref = self._deckhand_design_ref()
def get_k8s_logs(self):
"""Retrieve Kubernetes pod/container logs specified by an opererator
@ -150,10 +170,64 @@ class UcpBaseOperator(BaseOperator):
else:
LOG.debug("There are no pod logs specified to retrieve")
def _deckhand_design_ref(self):
"""Assemble a deckhand design_ref"""
# Retrieve DeckHand Endpoint Information
LOG.info("Assembling a design ref using revision: %s",
self.revision_id)
deckhand_svc_endpoint = self.endpoints.endpoint_by_name(
service_endpoint.DECKHAND
)
# This URL will be used to retrieve the Site Design YAMLs
deckhand_path = "deckhand+{}".format(deckhand_svc_endpoint)
design_ref = os.path.join(deckhand_path,
"revisions",
str(self.revision_id),
"rendered-documents")
LOG.info("Design Reference is %s", design_ref)
return design_ref
def get_unique_doc(self, schema, name, revision_id=None):
"""Retrieve a specific document from Deckhand
:param schema: the schema of the document
:param name: the metadata.name of the document
:param revision_id: the deckhand revision, or defaults to
self.revision_id
Wraps the DocumentValidationUtils call of the same name.
Returns the specified document or raises an AirflowException.
"""
if revision_id is None:
revision_id = self.revision_id
LOG.info(
"Retrieving document. Name: %s, Schema: %s, Revision: %s",
name, schema, revision_id
)
try:
return self.doc_utils.get_unique_doc(revision_id=revision_id,
name=name,
schema=schema)
except Exception as ex:
LOG.error("A document was expected to be available: Name: %s, "
"Schema: %s, Deckhand revision: %s, but there was an "
"error attempting to retrieve it. Since this document's "
"contents may be critical to the proper operation of "
"the workflow, this is fatal.", schema, name,
revision_id)
LOG.exception(ex)
# if the document is not found for ANY reason, the workflow is
# broken. Raise an Airflow Exception.
raise AirflowException(ex)
def _get_document_util(shipyard_conf):
"""Retrieve an instance of the DocumentValidationUtils"""
dh_client = DeckhandClientFactory(shipyard_conf).get_client()
return DocumentValidationUtils(dh_client)
class UcpBaseOperatorPlugin(AirflowPlugin):
"""Creates UcpBaseOperator in Airflow."""
name = 'ucp_base_operator_plugin'
operators = [UcpBaseOperator]

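For reference, the design reference assembled by _deckhand_design_ref is a
deckhand+ URL over the revision's rendered documents; a sketch with assumed
values:

# endpoint host and revision are assumed values for illustration
deckhand_svc_endpoint = "http://deckhand-int:9000/api/v1.0"
revision_id = 3
design_ref = "deckhand+{}/revisions/{}/rendered-documents".format(
    deckhand_svc_endpoint, revision_id)
# -> "deckhand+http://deckhand-int:9000/api/v1.0/revisions/3/rendered-documents"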
View File

@ -21,9 +21,9 @@ from airflow.plugins_manager import AirflowPlugin
from airflow.utils.decorators import apply_defaults
try:
from service_endpoint import ucp_service_endpoint
import service_endpoint
except ImportError:
from shipyard_airflow.plugins.service_endpoint import ucp_service_endpoint
from shipyard_airflow.plugins import service_endpoint
try:
from xcom_puller import XcomPuller
@ -55,16 +55,18 @@ class UcpHealthCheckOperator(BaseOperator):
self.shipyard_conf = shipyard_conf
self.main_dag_name = main_dag_name
self.xcom_push_flag = xcom_push
self.endpoints = service_endpoint.ServiceEndpoints(self.shipyard_conf)
def execute(self, context):
# Initialize variable
ucp_components = [
'armada',
'deckhand',
'kubernetesprovisioner',
'physicalprovisioner',
'shipyard']
service_endpoint.ARMADA,
service_endpoint.DECKHAND,
service_endpoint.DRYDOCK,
service_endpoint.PROMENADE,
service_endpoint.SHIPYARD
]
# Define task_instance
self.task_instance = context['task_instance']
@ -80,19 +82,16 @@ class UcpHealthCheckOperator(BaseOperator):
for component in ucp_components:
# Retrieve Endpoint Information
service_endpoint = ucp_service_endpoint(self,
svc_type=component)
LOG.info("%s endpoint is %s", component, service_endpoint)
endpoint = self.endpoints.endpoint_by_name(component)
LOG.info("%s endpoint is %s", component, endpoint)
# Construct Health Check Endpoint
healthcheck_endpoint = os.path.join(service_endpoint,
healthcheck_endpoint = os.path.join(endpoint,
'health')
LOG.info("%s healthcheck endpoint is %s", component,
healthcheck_endpoint)
try:
LOG.info("Performing Health Check on %s", component)
LOG.info("Performing Health Check on %s at %s", component,
healthcheck_endpoint)
# Set health check timeout to 30 seconds
req = requests.get(healthcheck_endpoint, timeout=30)
@ -109,7 +108,7 @@ class UcpHealthCheckOperator(BaseOperator):
"""
# If Drydock health check fails and continue-on-fail, continue
# and create xcom key 'drydock_continue_on_fail'
if (component == 'physicalprovisioner' and
if (component == service_endpoint.DRYDOCK and
self.action_info['parameters'].get(
'continue-on-fail', 'false').lower() == 'true' and
self.action_info['dag_id'] in ['update_site', 'deploy_site']):

View File

@ -31,7 +31,7 @@ from shipyard_airflow.common.deployment_group.errors import (
from .node_lookup_stubs import node_lookup
_GROUPS_YAML = """
GROUPS_YAML = """
- name: control-nodes
critical: true
depends_on:
@ -121,7 +121,7 @@ _GROUPS_YAML = """
minimum_successful_nodes: 1
"""
_CYCLE_GROUPS_YAML = """
CYCLE_GROUPS_YAML = """
- name: group-a
critical: true
depends_on:
@ -148,7 +148,7 @@ _CYCLE_GROUPS_YAML = """
class TestDeploymentGroupManager:
def test_basic_class(self):
dgm = DeploymentGroupManager(yaml.safe_load(_GROUPS_YAML), node_lookup)
dgm = DeploymentGroupManager(yaml.safe_load(GROUPS_YAML), node_lookup)
assert dgm is not None
# topological sort doesn't guarantee a specific order.
assert dgm.get_next_group(Stage.PREPARED).name in ['ntp-node',
@ -160,7 +160,7 @@ class TestDeploymentGroupManager:
def test_cycle_error(self):
with pytest.raises(DeploymentGroupCycleError) as ce:
DeploymentGroupManager(yaml.safe_load(_CYCLE_GROUPS_YAML),
DeploymentGroupManager(yaml.safe_load(CYCLE_GROUPS_YAML),
node_lookup)
assert 'The following are involved' in str(ce)
for g in ['group-a', 'group-c', 'group-d']:
@ -168,11 +168,71 @@ class TestDeploymentGroupManager:
assert 'group-b' not in str(ce)
def test_no_next_group(self):
dgm = DeploymentGroupManager(yaml.safe_load(_GROUPS_YAML), node_lookup)
dgm = DeploymentGroupManager(yaml.safe_load(GROUPS_YAML), node_lookup)
assert dgm.get_next_group(Stage.DEPLOYED) is None
def test_group_list(self):
dgm = DeploymentGroupManager(yaml.safe_load(GROUPS_YAML), node_lookup)
assert len(dgm.group_list()) == 7
group_names = []
for group in dgm.group_list():
group_names.append(group.name)
assert group_names == dgm._group_order
def test_fail_unsuccessful_nodes(self):
dgm = DeploymentGroupManager(yaml.safe_load(GROUPS_YAML), node_lookup)
group = dgm._all_groups.get('control-nodes')
dgm.fail_unsuccessful_nodes(group, [])
assert not dgm.evaluate_group_succ_criteria('control-nodes',
Stage.DEPLOYED)
assert group.stage == Stage.FAILED
def test_reports(self, caplog):
dgm = DeploymentGroupManager(yaml.safe_load(GROUPS_YAML), node_lookup)
dgm.mark_node_deployed('node1')
dgm.mark_node_prepared('node2')
dgm.mark_node_failed('node3')
dgm.mark_group_prepared('control-nodes')
dgm.mark_group_deployed('control-nodes')
dgm.mark_group_prepared('compute-nodes-1')
dgm.mark_group_failed('compute-nodes-2')
dgm.report_group_summary()
assert "===== Group Summary =====" in caplog.text
assert ("Group ntp-node [Critical] ended with stage: "
"Stage.NOT_STARTED") in caplog.text
caplog.clear()
dgm.report_node_summary()
assert "Nodes Stage.PREPARED: node2" in caplog.text
assert "Nodes Stage.FAILED: node3" in caplog.text
assert "===== End Node Summary =====" in caplog.text
assert "It was the best of times" not in caplog.text
def test_evaluate_group_succ_criteria(self):
dgm = DeploymentGroupManager(yaml.safe_load(GROUPS_YAML), node_lookup)
group = dgm._all_groups.get('control-nodes')
nodes = ["node{}".format(i) for i in range(1, 12)]
for node in nodes:
dgm.mark_node_prepared(node)
dgm.fail_unsuccessful_nodes(group, nodes)
assert dgm.evaluate_group_succ_criteria('control-nodes',
Stage.PREPARED)
assert group.stage == Stage.PREPARED
for node in nodes:
dgm.mark_node_deployed(node)
assert dgm.evaluate_group_succ_criteria('control-nodes',
Stage.DEPLOYED)
assert group.stage == Stage.DEPLOYED
def test_critical_groups_failed(self):
dgm = DeploymentGroupManager(yaml.safe_load(GROUPS_YAML), node_lookup)
assert not dgm.critical_groups_failed()
dgm.mark_group_failed('control-nodes')
assert dgm.critical_groups_failed()
def test_ordering_stages_flow_failure(self):
dgm = DeploymentGroupManager(yaml.safe_load(_GROUPS_YAML), node_lookup)
dgm = DeploymentGroupManager(yaml.safe_load(GROUPS_YAML), node_lookup)
group = dgm.get_next_group(Stage.PREPARED)
if group.name == 'monitoring-nodes':
@ -198,24 +258,24 @@ class TestDeploymentGroupManager:
def test_deduplication(self):
"""all-compute-nodes is a duplicate of things it's dependent on, it
should have no actionable nodes"""
dgm = DeploymentGroupManager(yaml.safe_load(_GROUPS_YAML), node_lookup)
dgm = DeploymentGroupManager(yaml.safe_load(GROUPS_YAML), node_lookup)
acn = dgm._all_groups['all-compute-nodes']
assert len(acn.actionable_nodes) == 0
assert len(acn.full_nodes) == 6
def test_bad_group_name_lookup(self):
dgm = DeploymentGroupManager(yaml.safe_load(_GROUPS_YAML), node_lookup)
dgm = DeploymentGroupManager(yaml.safe_load(GROUPS_YAML), node_lookup)
with pytest.raises(UnknownDeploymentGroupError) as udge:
dgm.mark_group_prepared('Limburger Cheese')
assert "Group name Limburger Cheese does not refer" in str(udge)
def test_get_group_failures_for_stage_bad_input(self):
dgm = DeploymentGroupManager(yaml.safe_load(_GROUPS_YAML), node_lookup)
dgm = DeploymentGroupManager(yaml.safe_load(GROUPS_YAML), node_lookup)
with pytest.raises(DeploymentGroupStageError):
dgm.get_group_failures_for_stage('group1', Stage.FAILED)
def test_get_group_failures_for_stage(self):
dgm = DeploymentGroupManager(yaml.safe_load(_GROUPS_YAML), node_lookup)
dgm = DeploymentGroupManager(yaml.safe_load(GROUPS_YAML), node_lookup)
dgm._all_nodes = {'node%d' % x: Stage.DEPLOYED for x in range(1, 13)}
for group_name in dgm._all_groups:
@ -269,27 +329,27 @@ class TestDeploymentGroupManager:
'actual': 0}
def test_mark_node_deployed(self):
dgm = DeploymentGroupManager(yaml.safe_load(_GROUPS_YAML), node_lookup)
dgm = DeploymentGroupManager(yaml.safe_load(GROUPS_YAML), node_lookup)
dgm.mark_node_deployed('node1')
assert dgm.get_nodes(Stage.DEPLOYED) == ['node1']
def test_mark_node_prepared(self):
dgm = DeploymentGroupManager(yaml.safe_load(_GROUPS_YAML), node_lookup)
dgm = DeploymentGroupManager(yaml.safe_load(GROUPS_YAML), node_lookup)
dgm.mark_node_prepared('node1')
assert dgm.get_nodes(Stage.PREPARED) == ['node1']
def test_mark_node_failed(self):
dgm = DeploymentGroupManager(yaml.safe_load(_GROUPS_YAML), node_lookup)
dgm = DeploymentGroupManager(yaml.safe_load(GROUPS_YAML), node_lookup)
dgm.mark_node_failed('node1')
assert dgm.get_nodes(Stage.FAILED) == ['node1']
def test_mark_node_failed_unknown(self):
dgm = DeploymentGroupManager(yaml.safe_load(_GROUPS_YAML), node_lookup)
dgm = DeploymentGroupManager(yaml.safe_load(GROUPS_YAML), node_lookup)
with pytest.raises(UnknownNodeError):
dgm.mark_node_failed('not_node')
def test_get_nodes_all(self):
dgm = DeploymentGroupManager(yaml.safe_load(_GROUPS_YAML), node_lookup)
dgm = DeploymentGroupManager(yaml.safe_load(GROUPS_YAML), node_lookup)
assert set(dgm.get_nodes()) == set(
['node1', 'node2', 'node3', 'node4', 'node5', 'node6', 'node7',
'node8', 'node9', 'node10', 'node11', 'node12']

View File

@ -29,9 +29,9 @@ import tests.unit.common.deployment_group.test_deployment_group_manager as tdgm
def get_doc_returner(style, ds_name):
strategy = MagicMock()
if style == 'cycle':
strategy.data = {"groups": yaml.safe_load(tdgm._CYCLE_GROUPS_YAML)}
strategy.data = {"groups": yaml.safe_load(tdgm.CYCLE_GROUPS_YAML)}
elif style == 'clean':
strategy.data = {"groups": yaml.safe_load(tdgm._GROUPS_YAML)}
strategy.data = {"groups": yaml.safe_load(tdgm.GROUPS_YAML)}
def doc_returner(revision_id, rendered, **filters):
if revision_id != 99:

View File

@ -0,0 +1,446 @@
# Copyright 2018 AT&T Intellectual Property. All other rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for drydock_nodes operator functions"""
import copy
import mock
import os
import yaml
import pytest
from airflow.exceptions import AirflowException
from shipyard_airflow.common.deployment_group.deployment_group import (
DeploymentGroup,
Stage
)
from shipyard_airflow.common.deployment_group.deployment_group_manager import (
DeploymentGroupManager
)
from shipyard_airflow.plugins.drydock_nodes import (
_default_deployment_strategy,
_gen_node_name_filter,
DrydockNodesOperator,
_process_deployment_groups,
QueryTaskResult
)
from shipyard_airflow.plugins.deployment_configuration_operator import (
DeploymentConfigurationOperator
)
import tests.unit.common.deployment_group.test_deployment_group_manager as tdgm
from tests.unit.common.deployment_group.node_lookup_stubs import node_lookup
CONF_FILE = os.path.join(os.path.dirname(__file__), 'test.conf')
def _fake_deployment_group_manager(cgf_bool):
def dgm_func(group_dict_list, node_lookup):
dgm_mock = mock.MagicMock()
dgm_mock.critical_groups_failed = mock.Mock(return_value=cgf_bool)
return dgm_mock
return dgm_func(None, None)
GROUP_DICT = {
'name': 'control-nodes',
'critical': True,
'depends_on': ['ntp-node'],
'selectors': [
{
'node_names': ['node1', 'node2', 'node3', 'node4', 'node5'],
'node_labels': [],
'node_tags': [],
'rack_names': [],
},
],
'success_criteria': {
'percent_successful_nodes': 90,
'minimum_successful_nodes': 3,
'maximum_failed_nodes': 1,
},
}
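GROUP_DICT configures all three success criteria over a five-node selector; a short worked check, assuming every configured criterion must hold independently (the helper name is illustrative, not from the module):

    def meets_success_criteria(criteria, total, succeeded):
        # Illustrative evaluation: each configured criterion must pass.
        failed = total - succeeded
        if succeeded * 100 / total < criteria.get('percent_successful_nodes', 0):
            return False
        if succeeded < criteria.get('minimum_successful_nodes', 0):
            return False
        if failed > criteria.get('maximum_failed_nodes', total):
            return False
        return True

    # 4 of 5 nodes succeeding passes min-successful (3) and max-failed (1),
    # but fails the 90% criterion at 80%; 5 of 5 passes everything.
    assert not meets_success_criteria(GROUP_DICT['success_criteria'], 5, 4)
    assert meets_success_criteria(GROUP_DICT['success_criteria'], 5, 5)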
TASK_RESULT = QueryTaskResult('t1', 'tn')
TASK_RESULT.successes = ['node1', 'node2', 'node3']
# The top level result should have all successes specified
TASK_DICT = {
'0': {
'result': {
'successes': ['node1', 'node2', 'node3'],
'status': 'success',
},
'subtask_id_list': ['1'],
'status': 'complete'
},
'1': {
'result': {
'successes': ['node3'],
'status': 'success',
},
'subtask_id_list': ['2', '3'],
'status': 'complete'
},
'2': {
'result': {
'successes': ['node2'],
'status': 'success',
'details': {'messageList': [
{
'context': 'node2',
'context_type': 'node',
'error': False,
'extra': '{}',
'message': 'Warning node2 is slow',
'ts': '2018-06-14 22:41:08.195036'
},
{
'context': 'node2',
'context_type': 'node',
},
]},
},
'subtask_id_list': [],
'status': 'complete',
},
'3': {
'result': {
'status': 'success',
'details': {'messageList': [
{
'context': 'task 3',
'context_type': 'task',
'error': False,
'extra': '{}',
'message': 'Started subtask 3 for action apply_node_stuff',
'ts': '2018-06-14 22:41:08.195036'
},
{
'context': 'task 3',
'context_type': 'task',
'error': False,
'extra': '{}',
'message': 'Ended subtask 3 for action apply_node_stuff',
'ts': '2018-06-14 22:41:08.195036'
},
]},
},
'subtask_id_list': [],
'status': 'complete',
},
'99': {
'result': {
'status': 'failure',
'successes': ['node98', 'node97'],
'failures': ['node99'],
'details': {'messageList': [
{
'context': 'task 99',
'context_type': 'task',
'error': False,
'extra': '{}',
'message': 'Started subtask 99 for action do_things',
'ts': '2018-06-14 22:41:08.195036'
},
{
'context': 'task 99',
'context_type': 'task',
'error': True,
'extra': '{}',
'message': 'Task 99 broke things',
'ts': '2018-06-14 22:41:08.195036'
},
]},
},
'subtask_id_list': ['2'],
},
}
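TASK_DICT models a Drydock task tree; per the comment above, the top-level result already aggregates all successes, so the successes lookup implied by the tests below reads only the named task's result, while the subtask_id_list walk exists to log messageList details. A minimal sketch under that reading:

    # Illustrative only: the real operator method also walks
    # subtask_id_list to log messageList entries, omitted here.
    def successes_for_task(task_dict, task_id):
        result = task_dict[task_id].get('result', {})
        return result.get('successes', [])

    assert successes_for_task(TASK_DICT, '0') == ['node1', 'node2', 'node3']
    assert 'node2' not in successes_for_task(TASK_DICT, '99')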
DEP_STRAT = {'groups': yaml.safe_load(tdgm.GROUPS_YAML)}
def _fake_setup_ds(self):
self.strategy = DEP_STRAT
def _fake_get_task_dict(task_id):
return TASK_DICT[task_id]
def _gen_pe_func(mode, stand_alone=False):
"""Gen a function to play the role of prepare or deploy function
:param mode: 'all-success', 'all-fail'
:param stand_alone: indicate to make this a "self" or non-self
function. During mocking for direct calls with this function,
stand_alone needs to be True. When patching the DrydockNodesOperator
object, it needs to be false, so that the right amount of "self"
matches the invocation.
"""
def _func(group):
qtr = QueryTaskResult('ti', 'tn')
if mode == 'all-success':
qtr.successes.extend(group.actionable_nodes)
if mode == 'all-fail':
# no new successes
pass
return qtr
def _func_self(self, group):
return _func(group)
if stand_alone:
return _func
else:
return _func_self
class TestDrydockNodesOperator:
def test_default_deployment_strategy(self):
"""Assert that the default deployment strategy is named default, is
critical, has no selector values, and an all-or-nothing success
criteria
"""
s = _default_deployment_strategy()
assert s['groups'][0]['name'] == 'default'
assert s['groups'][0]['critical']
assert s['groups'][0]['selectors'][0]['node_names'] == []
assert s['groups'][0]['selectors'][0]['node_labels'] == []
assert s['groups'][0]['selectors'][0]['node_tags'] == []
assert s['groups'][0]['selectors'][0]['rack_names'] == []
assert s['groups'][0]['success_criteria'] == {
'percent_successful_nodes': 100
}
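Read together, these assertions imply the default strategy is a single all-or-nothing group; a reconstruction follows (depends_on and any extra keys are guesses, not quoted from the module):

    # Reconstructed from the assertions above, not copied from the code.
    {
        'groups': [{
            'name': 'default',
            'critical': True,
            'depends_on': [],  # assumed empty; not pinned by the test
            'selectors': [{
                'node_names': [],
                'node_labels': [],
                'node_tags': [],
                'rack_names': [],
            }],
            'success_criteria': {'percent_successful_nodes': 100},
        }]
    }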
def test_gen_node_name_filter(self):
"""Test that a node name filter with only node_names is created"""
nodes = ['node1', 'node2']
f = _gen_node_name_filter(nodes)
assert f['filter_set'][0]['node_names'] == nodes
assert len(f['filter_set']) == 1
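The two assertions pin only the outer shape; a plausible sketch of the filter builder (any filter-type metadata Drydock expects is not pinned by this test and is omitted):

    # Minimal shape consistent with the test; the real function may add
    # filter-type keys for Drydock.
    def gen_node_name_filter(node_names):
        return {'filter_set': [{'node_names': node_names}]}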
def test_init_DrydockNodesOperator(self):
op = DrydockNodesOperator(main_dag_name="main",
shipyard_conf=CONF_FILE,
task_id="t1")
assert op is not None
@mock.patch.object(DrydockNodesOperator, "get_unique_doc")
def test_setup_deployment_strategy(self, udoc):
"""Assert that the base class method get_unique_doc would be invoked
"""
op = DrydockNodesOperator(main_dag_name="main",
shipyard_conf=CONF_FILE,
task_id="t1")
op.dc = copy.deepcopy(
DeploymentConfigurationOperator.config_keys_defaults
)
op.dc['physical_provisioner.deployment_strategy'] = 'taco-salad'
op._setup_deployment_strategy()
udoc.assert_called_once_with(
name='taco-salad',
schema="shipyard/DeploymentStrategy/v1"
)
@mock.patch("shipyard_airflow.plugins.drydock_nodes."
"_get_deployment_group_manager",
return_value=_fake_deployment_group_manager(cgf_bool=False))
@mock.patch("shipyard_airflow.plugins.drydock_nodes."
"_process_deployment_groups", return_value=True)
@mock.patch("shipyard_airflow.plugins.drydock_nodes._get_node_lookup",
return_value=mock.MagicMock())
def test_do_execute(self, nl, pdg, get_dgm, caplog):
op = DrydockNodesOperator(main_dag_name="main",
shipyard_conf=CONF_FILE,
task_id="t1")
op.dc = copy.deepcopy(
DeploymentConfigurationOperator.config_keys_defaults
)
op.design_ref = {}
op.do_execute()
get_dgm.assert_called_once()
nl.assert_called_once()
pdg.assert_called_once()
assert "critical groups have met their success criteria" in caplog.text
@mock.patch("shipyard_airflow.plugins.drydock_nodes."
"_get_deployment_group_manager",
return_value=_fake_deployment_group_manager(cgf_bool=True))
@mock.patch("shipyard_airflow.plugins.drydock_nodes."
"_process_deployment_groups", return_value=True)
@mock.patch("shipyard_airflow.plugins.drydock_nodes._get_node_lookup",
return_value=mock.MagicMock())
def test_do_execute_exception(self, nl, pdg, get_dgm):
op = DrydockNodesOperator(main_dag_name="main",
shipyard_conf=CONF_FILE,
task_id="t1")
with pytest.raises(AirflowException):
op.dc = copy.deepcopy(
DeploymentConfigurationOperator.config_keys_defaults
)
op.design_ref = {}
op.do_execute()
get_dgm.assert_called_once()
nl.assert_called_once()
pdg.assert_called_once()
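Between them, these two do_execute tests pin the wiring: build the node lookup and group manager, run the processing loop, and raise only when critical groups failed. A sketch under those assumptions (argument lists are illustrative; only the patched names are from the module):

    # Assumed wiring; _get_node_lookup, _get_deployment_group_manager and
    # _process_deployment_groups are the patch targets in the tests above.
    def do_execute(self):
        self._setup_deployment_strategy()
        node_lookup = _get_node_lookup(self.shipyard_conf, self.design_ref)
        dgm = _get_deployment_group_manager(self.strategy['groups'],
                                            node_lookup)
        _process_deployment_groups(dgm,
                                   self._execute_prepare,
                                   self._execute_deployment)
        if dgm.critical_groups_failed():
            raise AirflowException('One or more critical groups failed')
        LOG.info("All critical groups have met their success criteria")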
def test_execute_prepare(self):
op = DrydockNodesOperator(main_dag_name="main",
shipyard_conf=CONF_FILE,
task_id="t1")
op.dc = copy.deepcopy(
DeploymentConfigurationOperator.config_keys_defaults
)
op._setup_configured_values()
op._execute_task = mock.MagicMock(return_value=TASK_RESULT)
group = DeploymentGroup(GROUP_DICT, mock.MagicMock())
group.actionable_nodes = ['node1', 'node2', 'node3']
op._execute_prepare(group)
op._execute_task.assert_called_once()
@mock.patch("shipyard_airflow.plugins.check_k8s_node_status."
"check_node_status", return_value=[])
def test_execute_deployment(self, cns):
op = DrydockNodesOperator(main_dag_name="main",
shipyard_conf=CONF_FILE,
task_id="t1")
op.dc = copy.deepcopy(
DeploymentConfigurationOperator.config_keys_defaults
)
op._setup_configured_values()
op._execute_task = mock.MagicMock(return_value=TASK_RESULT)
op.join_wait = 0
group = DeploymentGroup(GROUP_DICT, mock.MagicMock())
group.actionable_nodes = ['node1', 'node2', 'node3']
op._execute_deployment(group)
op._execute_task.assert_called_once()
cns.assert_called_once()
@mock.patch("shipyard_airflow.plugins.check_k8s_node_status."
"check_node_status", return_value=['node2', 'node4'])
def test_execute_deployment_k8s_fail(self, cns, caplog):
op = DrydockNodesOperator(main_dag_name="main",
shipyard_conf=CONF_FILE,
task_id="t1")
op.dc = copy.deepcopy(
DeploymentConfigurationOperator.config_keys_defaults
)
op._setup_configured_values()
op._execute_task = mock.MagicMock(return_value=TASK_RESULT)
op.join_wait = 0
group = DeploymentGroup(GROUP_DICT, mock.MagicMock())
group.actionable_nodes = ['node1', 'node2', 'node3']
task_res = op._execute_deployment(group)
op._execute_task.assert_called_once()
cns.assert_called_once()
assert 'node4 failed to join Kubernetes' in caplog.text
assert len(task_res.successes) == 2
def test_get_successes_for_task(self):
op = DrydockNodesOperator(main_dag_name="main",
shipyard_conf=CONF_FILE,
task_id="t1")
op.get_task_dict = _fake_get_task_dict
s = op._get_successes_for_task('0')
for i in range(1, 4):
assert "node{}".format(i) in s
def test_get_successes_for_task_more_logging(self):
op = DrydockNodesOperator(main_dag_name="main",
shipyard_conf=CONF_FILE,
task_id="t1")
op.get_task_dict = _fake_get_task_dict
s = op._get_successes_for_task('99')
for i in range(97, 99):
assert "node{}".format(i) in s
assert "node2" not in s
def test_process_deployment_groups(self):
"""Test the core processing loop of the drydock_nodes module"""
dgm = DeploymentGroupManager(
yaml.safe_load(tdgm.GROUPS_YAML),
node_lookup
)
_process_deployment_groups(
dgm,
_gen_pe_func('all-success', stand_alone=True),
_gen_pe_func('all-success', stand_alone=True))
assert not dgm.critical_groups_failed()
for group in dgm.group_list():
assert dgm.evaluate_group_succ_criteria(group.name, Stage.DEPLOYED)
def test_process_deployment_groups_dep_fail(self):
"""Test the core processing loop of the drydock_nodes module"""
dgm = DeploymentGroupManager(
yaml.safe_load(tdgm.GROUPS_YAML),
node_lookup
)
_process_deployment_groups(
dgm,
_gen_pe_func('all-success', stand_alone=True),
_gen_pe_func('all-fail', stand_alone=True))
assert dgm.critical_groups_failed()
for group in dgm.group_list():
assert group.stage == Stage.FAILED
dgm.report_group_summary()
dgm.report_node_summary()
def test_process_deployment_groups_prep_fail(self):
"""Test the core processing loop of the drydock_nodes module"""
dgm = DeploymentGroupManager(
yaml.safe_load(tdgm.GROUPS_YAML),
node_lookup
)
_process_deployment_groups(
dgm,
_gen_pe_func('all-fail', stand_alone=True),
_gen_pe_func('all-success', stand_alone=True))
assert dgm.critical_groups_failed()
for group in dgm.group_list():
assert group.stage == Stage.FAILED
dgm.report_group_summary()
dgm.report_node_summary()
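Taken together, the three _process_deployment_groups tests fix its contract: take each group in dependency order, prepare it, gate deployment on the prepared-stage criteria, and let failures cascade so remaining groups end up failed. A condensed sketch consistent with that contract (failure propagation between dependent groups happens inside the manager and is only hinted at here):

    # Sketch of the loop the tests exercise; the names on dgm all appear
    # earlier in this change.
    def process_deployment_groups(dgm, prepare_func, deploy_func):
        group = dgm.get_next_group(Stage.PREPARED)
        while group is not None:
            result = prepare_func(group)
            dgm.fail_unsuccessful_nodes(group, result.successes)
            if dgm.evaluate_group_succ_criteria(group.name, Stage.PREPARED):
                result = deploy_func(group)
                dgm.fail_unsuccessful_nodes(group, result.successes)
                dgm.evaluate_group_succ_criteria(group.name, Stage.DEPLOYED)
            group = dgm.get_next_group(Stage.PREPARED)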
@mock.patch("shipyard_airflow.plugins.drydock_nodes._get_node_lookup",
return_value=node_lookup)
@mock.patch.object(
DrydockNodesOperator,
'_execute_prepare',
new=_gen_pe_func('all-success')
)
@mock.patch.object(
DrydockNodesOperator,
'_execute_deployment',
new=_gen_pe_func('all-success')
)
@mock.patch.object(DrydockNodesOperator, '_setup_deployment_strategy',
new=_fake_setup_ds)
def test_do_execute_with_dgm(self, nl, caplog):
op = DrydockNodesOperator(main_dag_name="main",
shipyard_conf=CONF_FILE,
task_id="t1")
op.dc = copy.deepcopy(
DeploymentConfigurationOperator.config_keys_defaults
)
op.design_ref = {"a": "b"}
op.do_execute()
assert "critical groups have met their success criteria" in caplog.text
# TODO (bryan-strassner) test for _execute_task

View File

@ -11,7 +11,9 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
import os
import unittest.mock as mock
import pytest
from requests.models import Response
@ -22,10 +24,12 @@ from shipyard_airflow.plugins.ucp_preflight_check_operator import (
ucp_components = [
'armada',
'deckhand',
'kubernetesprovisioner',
'physicalprovisioner',
'promenade',
'drydock',
'shipyard']
CONF_FILE = os.path.join(os.path.dirname(__file__), 'test.conf')
def test_drydock_health_skip_update_site(caplog):
"""
@ -44,18 +48,18 @@ def test_drydock_health_skip_update_site(caplog):
"parameters": {"continue-on-fail": "true"}
}
op = UcpHealthCheckOperator(task_id='test')
op = UcpHealthCheckOperator(task_id='test', shipyard_conf=CONF_FILE)
op.action_info = action_info
op.xcom_pusher = mock.MagicMock()
op.log_health_exception('physicalprovisioner', req)
op.log_health_exception('drydock', req)
assert expected_log in caplog.text
action_info = {
"dag_id": "deploy_site",
"parameters": {"continue-on-fail": "true"}
}
op.log_health_exception('physicalprovisioner', req)
op.log_health_exception('drydock', req)
assert expected_log in caplog.text
@ -70,7 +74,7 @@ def test_failure_log_health():
req = Response()
req.status_code = None
op = UcpHealthCheckOperator(task_id='test')
op = UcpHealthCheckOperator(task_id='test', shipyard_conf=CONF_FILE)
op.action_info = action_info
op.xcom_pusher = mock.MagicMock()

View File

@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
set -ex
set -e
# Users can run this script the same way they would execute the Shipyard CLI.
# For instance, to run the 'shipyard get actions' command, the user can execute
@ -33,7 +33,7 @@ set -ex
# Source Base Docker Command
DIR="$(realpath $(dirname "${BASH_SOURCE}"))"
source "${DIR}/shipyard_docker_base_command.sh"
SHIPYARD_HOSTPATH=${SHIPYARD_HOSTPATH:-"/home/shipyard/host"}
# Execute Shipyard CLI
#
# NOTE: We will mount the current directory so that any directories
@ -46,4 +46,4 @@ source "${DIR}/shipyard_docker_base_command.sh"
# the actual validation and execution. Exceptions will also be
# handled by the Shipyard CLI as this is meant to be a thin wrapper
# script
${base_docker_command} -v $(pwd):/home/shipyard/host ${SHIPYARD_IMAGE} $@
${base_docker_command} -v $(pwd):${SHIPYARD_HOSTPATH} ${SHIPYARD_IMAGE} "$@"