From a88a5cf15a81de32707fee49d7a626908a8c8f38 Mon Sep 17 00:00:00 2001 From: Bryan Strassner Date: Mon, 19 Feb 2018 11:51:16 -0600 Subject: [PATCH] Shipyard deployment configuration Puts into place the DeploymentConfiguration yaml that provides the options that should be configured by the site design to the deployment (and update) workflows. This change additionally refactors reused parts to common modules as related to info passing (xcom) Change-Id: Ib6470899b204dbc18d2a9a2e4f95540b3b0032b0 --- etc/shipyard/shipyard.conf.sample | 59 ++---- shipyard_airflow/conf/config.py | 67 ++----- .../control/action/actions_api.py | 10 +- shipyard_airflow/dags/armada_deploy_site.py | 5 +- shipyard_airflow/dags/common_step_factory.py | 175 +++++++++++++++++ shipyard_airflow/dags/config_path.py | 18 ++ .../dags/dag_deployment_configuration.py | 36 ++++ shipyard_airflow/dags/dag_names.py | 26 +++ shipyard_airflow/dags/deckhand_get_design.py | 6 +- shipyard_airflow/dags/deploy_site.py | 95 +++------- shipyard_airflow/dags/destroy_node.py | 6 +- shipyard_airflow/dags/drydock_deploy_site.py | 6 +- shipyard_airflow/dags/preflight_checks.py | 6 +- shipyard_airflow/dags/redeploy_server.py | 96 +++------- shipyard_airflow/dags/update_site.py | 87 +++------ shipyard_airflow/dags/validate_site_design.py | 5 +- shipyard_airflow/plugins/armada_operator.py | 38 ++-- .../plugins/concurrency_check_operator.py | 18 +- .../plugins/deckhand_base_operator.py | 20 +- .../plugins/deckhand_client_factory.py | 66 +++++++ .../deployment_configuration_operator.py | 178 ++++++++++++++++++ shipyard_airflow/plugins/drydock_operators.py | 128 +++++-------- .../plugins/promenade_base_operator.py | 27 +-- .../plugins/promenade_check_etcd.py | 4 + .../plugins/promenade_clear_labels.py | 4 + .../plugins/promenade_drain_node.py | 5 + shipyard_airflow/plugins/xcom_puller.py | 84 +++++++++ .../schemas/deploymentConfiguration.yaml | 76 ++++++++ test-requirements.txt | 4 + tests/unit/control/test.conf | 2 + tests/unit/plugins/test.conf | 15 ++ .../plugins/test_deckhand_client_factory.py | 29 +++ .../test_deployment_configuration_operator.py | 158 ++++++++++++++++ tests/unit/schemas/__init__.py | 0 .../schemas/test_deployment_configuration.py | 78 ++++++++ .../deploymentConfiguration_bad_manifest.yaml | 13 ++ .../deploymentConfiguration_full_valid.yaml | 31 +++ ...deploymentConfiguration_minimal_valid.yaml | 12 ++ tox.ini | 21 ++- 39 files changed, 1221 insertions(+), 493 deletions(-) create mode 100644 shipyard_airflow/dags/common_step_factory.py create mode 100644 shipyard_airflow/dags/config_path.py create mode 100644 shipyard_airflow/dags/dag_deployment_configuration.py create mode 100644 shipyard_airflow/dags/dag_names.py create mode 100644 shipyard_airflow/plugins/deckhand_client_factory.py create mode 100644 shipyard_airflow/plugins/deployment_configuration_operator.py create mode 100644 shipyard_airflow/plugins/xcom_puller.py create mode 100644 shipyard_airflow/schemas/deploymentConfiguration.yaml create mode 100644 tests/unit/plugins/test.conf create mode 100644 tests/unit/plugins/test_deckhand_client_factory.py create mode 100644 tests/unit/plugins/test_deployment_configuration_operator.py create mode 100644 tests/unit/schemas/__init__.py create mode 100644 tests/unit/schemas/test_deployment_configuration.py create mode 100644 tests/unit/yaml_samples/deploymentConfiguration_bad_manifest.yaml create mode 100644 tests/unit/yaml_samples/deploymentConfiguration_full_valid.yaml create mode 100644 tests/unit/yaml_samples/deploymentConfiguration_minimal_valid.yaml diff --git a/etc/shipyard/shipyard.conf.sample b/etc/shipyard/shipyard.conf.sample index 1c18d2cc..67fbb874 100644 --- a/etc/shipyard/shipyard.conf.sample +++ b/etc/shipyard/shipyard.conf.sample @@ -20,7 +20,13 @@ # # The web server for Airflow (string value) -#web_server = http://localhost:32080 +#web_server = http://localhost:32080/ + +# Seconds to wait to connect to the airflow api (integer value) +#airflow_api_connect_timeout = 5 + +# Seconds to wait for a response from the airflow api (integer value) +#airflow_api_read_timeout = 60 # The database for shipyard (string value) #postgresql_db = postgresql+psycopg2://shipyard:changeme@postgresql.ucp:5432/shipyard @@ -31,9 +37,6 @@ # The direcotry containing the alembic.ini file (string value) #alembic_ini_path = /home/shipyard/shipyard -# Upgrade the database on startup (boolean value) -#upgrade_db = true - [deckhand] @@ -58,39 +61,6 @@ # (string value) #service_type = physicalprovisioner -# Query interval (in seconds) for verify_site task (integer value) -#verify_site_query_interval = 10 - -# Time out (in seconds) for verify_site task (integer value) -#verify_site_task_timeout = 60 - -# Query interval (in seconds) for prepare_site task (integer value) -#prepare_site_query_interval = 10 - -# Time out (in seconds) for prepare_site task (integer value) -#prepare_site_task_timeout = 300 - -# Query interval (in seconds) for prepare_node task (integer value) -#prepare_node_query_interval = 30 - -# Time out (in seconds) for prepare_node task (integer value) -#prepare_node_task_timeout = 1800 - -# Query interval (in seconds) for deploy_node task (integer value) -#deploy_node_query_interval = 30 - -# Time out (in seconds) for deploy_node task (integer value) -#deploy_node_task_timeout = 3600 - -# Query interval (in seconds) for destroy_node task (integer value) -#destroy_node_query_interval = 30 - -# Time out (in seconds) for destroy_node task (integer value) -#destroy_node_task_timeout = 900 - -# Backoff time (in seconds) before checking cluster join (integer value) -#cluster_join_check_backoff_time = 120 - [keystone_authtoken] @@ -278,17 +248,22 @@ [requests_config] -# Deckhand client connect timeout (in seconds) + +# +# From shipyard_airflow +# + +# Deckhand client connect timeout (in seconds) (integer value) #deckhand_client_connect_timeout = 5 -# Deckhand client timeout (in seconds) for GET, -# PUT, POST and DELETE request +# Deckhand client timeout (in seconds) for GET, PUT, POST and DELETE request +# (integer value) #deckhand_client_read_timeout = 300 -# UCP component validation connect timeout (in seconds) +# UCP component validation connect timeout (in seconds) (integer value) #validation_connect_timeout = 5 -# UCP component validation timeout (in seconds) +# UCP component validation timeout (in seconds) (integer value) #validation_read_timeout = 300 diff --git a/shipyard_airflow/conf/config.py b/shipyard_airflow/conf/config.py index 76f8b43a..9a5fed93 100644 --- a/shipyard_airflow/conf/config.py +++ b/shipyard_airflow/conf/config.py @@ -32,6 +32,16 @@ SECTIONS = [ default='http://localhost:32080/', help='The web server for Airflow' ), + cfg.IntOpt( + 'airflow_api_connect_timeout', + default=5, + help='Seconds to wait to connect to the airflow api' + ), + cfg.IntOpt( + 'airflow_api_read_timeout', + default=60, + help='Seconds to wait for a response from the airflow api' + ), cfg.StrOpt( 'postgresql_db', default=( @@ -52,7 +62,7 @@ SECTIONS = [ 'alembic_ini_path', default='/home/shipyard/shipyard', help='The direcotry containing the alembic.ini file' - ) + ), ] ), ConfigSection( @@ -125,61 +135,6 @@ SECTIONS = [ 'the service lookup in the Keystone service catalog.' ) ), - cfg.IntOpt( - 'verify_site_query_interval', - default=10, - help='Query interval (in seconds) for verify_site task' - ), - cfg.IntOpt( - 'verify_site_task_timeout', - default=60, - help='Time out (in seconds) for verify_site task' - ), - cfg.IntOpt( - 'prepare_site_query_interval', - default=10, - help='Query interval (in seconds) for prepare_site task' - ), - cfg.IntOpt( - 'prepare_site_task_timeout', - default=300, - help='Time out (in seconds) for prepare_site task' - ), - cfg.IntOpt( - 'prepare_node_query_interval', - default=30, - help='Query interval (in seconds) for prepare_node task' - ), - cfg.IntOpt( - 'prepare_node_task_timeout', - default=1800, - help='Time out (in seconds) for prepare_node task' - ), - cfg.IntOpt( - 'deploy_node_query_interval', - default=30, - help='Query interval (in seconds) for deploy_node task' - ), - cfg.IntOpt( - 'deploy_node_task_timeout', - default=3600, - help='Time out (in seconds) for deploy_node task' - ), - cfg.IntOpt( - 'destroy_node_query_interval', - default=30, - help='Query interval (in seconds) for destroy_node task' - ), - cfg.IntOpt( - 'destroy_node_task_timeout', - default=900, - help='Time out (in seconds) for destroy_node task' - ), - cfg.IntOpt( - 'cluster_join_check_backoff_time', - default=120, - help='Backoff time (in seconds) before checking cluster join' - ), ] ), ConfigSection( diff --git a/shipyard_airflow/control/action/actions_api.py b/shipyard_airflow/control/action/actions_api.py index 760195d5..9f44971a 100644 --- a/shipyard_airflow/control/action/actions_api.py +++ b/shipyard_airflow/control/action/actions_api.py @@ -221,8 +221,13 @@ class ActionsResource(BaseResource): :param dag_id: the name of the dag to invoke :param action: the action structure to invoke the dag with """ + # TODO(bryan-strassner) refactor the mechanics of this method to an + # airflow api client module + # Retrieve URL web_server_url = CONF.base.web_server + c_timeout = CONF.base.airflow_api_connect_timeout + r_timeout = CONF.base.airflow_api_read_timeout if 'Error' in web_server_url: raise ApiError( @@ -232,7 +237,6 @@ class ActionsResource(BaseResource): 'value'), status=falcon.HTTP_503, retry=True, ) - else: conf_value = {'action': action} # "conf" - JSON string that gets pickled into the DagRun's @@ -242,7 +246,7 @@ class ActionsResource(BaseResource): dag_id, self.to_json(conf_value))) try: - resp = requests.get(req_url, timeout=(5, 15)) + resp = requests.get(req_url, timeout=(c_timeout, r_timeout)) LOG.info('Response code from Airflow trigger_dag: %s', resp.status_code) # any 4xx/5xx will be HTTPError, which are RequestException @@ -268,6 +272,8 @@ class ActionsResource(BaseResource): return dag_execution_date def _exhume_date(self, dag_id, log_string): + # TODO(bryan-strassner) refactor this to an airflow api client module + # we are unable to use the response time because that # does not match the time when the dag was recorded. # We have to parse the stdout returned to find the diff --git a/shipyard_airflow/dags/armada_deploy_site.py b/shipyard_airflow/dags/armada_deploy_site.py index 33aea9c2..7839f740 100644 --- a/shipyard_airflow/dags/armada_deploy_site.py +++ b/shipyard_airflow/dags/armada_deploy_site.py @@ -15,10 +15,7 @@ from airflow.models import DAG from airflow.operators import ArmadaOperator -# Location of shiyard.conf -# Note that the shipyard.conf file needs to be placed on a volume -# that can be accessed by the containers -config_path = '/usr/local/airflow/plugins/shipyard.conf' +from config_path import config_path def deploy_site_armada(parent_dag_name, child_dag_name, args): diff --git a/shipyard_airflow/dags/common_step_factory.py b/shipyard_airflow/dags/common_step_factory.py new file mode 100644 index 00000000..5fab959f --- /dev/null +++ b/shipyard_airflow/dags/common_step_factory.py @@ -0,0 +1,175 @@ +# Copyright 2018 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from airflow.operators import ConcurrencyCheckOperator +from airflow.operators.python_operator import PythonOperator +from airflow.operators.subdag_operator import SubDagOperator + +from armada_deploy_site import deploy_site_armada +import dag_names as dn +from deckhand_get_design import get_design_deckhand +from destroy_node import destroy_server +from drydock_deploy_site import deploy_site_drydock +from failure_handlers import step_failure_handler +from dag_deployment_configuration import get_deployment_configuration +from preflight_checks import all_preflight_checks +from validate_site_design import validate_site_design + + +class CommonStepFactory(object): + """Common step factory + + A factory to generate steps that are reused among multiple dags + """ + def __init__(self, parent_dag_name, dag, default_args): + """Creates a factory + + Uses the specified parent_dag_name + """ + self.parent_dag_name = parent_dag_name + self.dag = dag + self.default_args = default_args + + def get_action_xcom(self, task_id=dn.ACTION_XCOM): + """Generate the action_xcom step + + Step responsible for getting the action information passed + by the invocation of the dag, which includes any options. + """ + def xcom_push(**kwargs): + """xcom_push function + + Defines a push function to store the content of 'action' that is + defined via 'dag_run' in XCOM so that it can be used by the + Operators + """ + + kwargs['ti'].xcom_push(key='action', + value=kwargs['dag_run'].conf['action']) + + return PythonOperator(task_id=task_id, + dag=self.dag, + python_callable=xcom_push) + + def get_concurrency_check(self, task_id=dn.DAG_CONCURRENCY_CHECK_DAG_NAME): + """Generate the concurrency check step + + Concurrency check prevents simultaneous execution of dags that should + not execute together. + """ + return ConcurrencyCheckOperator( + task_id=task_id, + on_failure_callback=step_failure_handler, + dag=self.dag) + + def get_preflight(self, task_id=dn.ALL_PREFLIGHT_CHECKS_DAG_NAME): + """Generate the preflight step + + Preflight checks preconditions for running a DAG + """ + return SubDagOperator( + subdag=all_preflight_checks( + self.parent_dag_name, + task_id, + args=self.default_args), + task_id=task_id, + on_failure_callback=step_failure_handler, + dag=self.dag) + + def get_get_design_version(self, task_id=dn.DECKHAND_GET_DESIGN_VERSION): + """Generate the get design version step + + Retrieves the version of the design to use from deckhand + """ + return SubDagOperator( + subdag=get_design_deckhand( + self.parent_dag_name, + task_id, + args=self.default_args), + task_id=task_id, + on_failure_callback=step_failure_handler, + dag=self.dag) + + def get_validate_site_design(self, + task_id=dn.VALIDATE_SITE_DESIGN_DAG_NAME): + """Generate the validate site design step + + Validation of the site design checks that the design to be used + for a deployment passes checks before using it. + """ + return SubDagOperator( + subdag=validate_site_design( + self.parent_dag_name, + task_id, + args=self.default_args), + task_id=task_id, + on_failure_callback=step_failure_handler, + dag=self.dag) + + def get_deployment_configuration(self, + task_id=dn.GET_DEPLOY_CONF_DAG_NAME): + """Generate the step to retrieve the deployment configuration + + This step provides the timings and strategies that will be used in + subsequent steps + """ + return SubDagOperator( + subdag=get_deployment_configuration( + self.parent_dag_name, + task_id, + args=self.default_args), + task_id=task_id, + on_failure_callback=step_failure_handler, + dag=self.dag) + + def get_drydock_build(self, task_id=dn.DRYDOCK_BUILD_DAG_NAME): + """Generate the drydock build step + + Drydock build does the hardware provisioning. + """ + return SubDagOperator( + subdag=deploy_site_drydock( + self.parent_dag_name, + task_id, + args=self.default_args), + task_id=task_id, + on_failure_callback=step_failure_handler, + dag=self.dag) + + def get_armada_build(self, task_id=dn.ARMADA_BUILD_DAG_NAME): + """Generate the armada build step + + Armada build does the deployment of helm charts + """ + return SubDagOperator( + subdag=deploy_site_armada( + self.parent_dag_name, + task_id, + args=self.default_args), + task_id=task_id, + on_failure_callback=step_failure_handler, + dag=self.dag) + + def get_destroy_server(self, task_id=dn.DESTROY_SERVER_DAG_NAME): + """Generate a destroy server step + + Destroy server tears down kubernetes and hardware + """ + return SubDagOperator( + subdag=destroy_server( + self.parent_dag_name, + task_id, + args=self.default_args), + task_id=task_id, + on_failure_callback=step_failure_handler, + dag=self.dag) diff --git a/shipyard_airflow/dags/config_path.py b/shipyard_airflow/dags/config_path.py new file mode 100644 index 00000000..514e419d --- /dev/null +++ b/shipyard_airflow/dags/config_path.py @@ -0,0 +1,18 @@ +# Copyright 2018 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Location of shiyard.conf +# Note that the shipyard.conf file needs to be placed on a volume +# that can be accessed by the containers +config_path = '/usr/local/airflow/plugins/shipyard.conf' diff --git a/shipyard_airflow/dags/dag_deployment_configuration.py b/shipyard_airflow/dags/dag_deployment_configuration.py new file mode 100644 index 00000000..e595186a --- /dev/null +++ b/shipyard_airflow/dags/dag_deployment_configuration.py @@ -0,0 +1,36 @@ +# Copyright 2018 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from airflow.models import DAG +from airflow.operators import DeploymentConfigurationOperator + +from config_path import config_path + + +GET_DEPLOYMENT_CONFIGURATION_NAME = 'get_deployment_configuration' + + +def get_deployment_configuration(parent_dag_name, child_dag_name, args): + """DAG to retrieve deployment configuration""" + dag = DAG( + '{}.{}'.format(parent_dag_name, child_dag_name), + default_args=args) + + deployment_configuration = DeploymentConfigurationOperator( + task_id=GET_DEPLOYMENT_CONFIGURATION_NAME, + shipyard_conf=config_path, + main_dag_name=parent_dag_name, + dag=dag) + + return dag diff --git a/shipyard_airflow/dags/dag_names.py b/shipyard_airflow/dags/dag_names.py new file mode 100644 index 00000000..b1bcad88 --- /dev/null +++ b/shipyard_airflow/dags/dag_names.py @@ -0,0 +1,26 @@ +# Copyright 2018 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Subdags +ALL_PREFLIGHT_CHECKS_DAG_NAME = 'preflight' +ARMADA_BUILD_DAG_NAME = 'armada_build' +DAG_CONCURRENCY_CHECK_DAG_NAME = 'dag_concurrency_check' +DECKHAND_GET_DESIGN_VERSION = 'deckhand_get_design_version' +GET_DEPLOY_CONF_DAG_NAME = 'dag_deployment_configuration' +DRYDOCK_BUILD_DAG_NAME = 'drydock_build' +VALIDATE_SITE_DESIGN_DAG_NAME = 'validate_site_design' +DESTROY_SERVER_DAG_NAME = 'destroy_server' + +# Steps +ACTION_XCOM = 'action_xcom' diff --git a/shipyard_airflow/dags/deckhand_get_design.py b/shipyard_airflow/dags/deckhand_get_design.py index e8d71a8e..138e23e5 100644 --- a/shipyard_airflow/dags/deckhand_get_design.py +++ b/shipyard_airflow/dags/deckhand_get_design.py @@ -16,11 +16,7 @@ from airflow.models import DAG from airflow.operators import DeckhandGetDesignOperator from airflow.operators import DeckhandRetrieveRenderedDocOperator - -# Location of shiyard.conf -# Note that the shipyard.conf file needs to be placed on a volume -# that can be accessed by the containers -config_path = '/usr/local/airflow/plugins/shipyard.conf' +from config_path import config_path def get_design_deckhand(parent_dag_name, child_dag_name, args): diff --git a/shipyard_airflow/dags/deploy_site.py b/shipyard_airflow/dags/deploy_site.py index 2dbc4e31..fa40767f 100644 --- a/shipyard_airflow/dags/deploy_site.py +++ b/shipyard_airflow/dags/deploy_site.py @@ -1,4 +1,4 @@ -# Copyright 2017 AT&T Intellectual Property. All other rights reserved. +# Copyright 2018 AT&T Intellectual Property. All other rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,29 +14,16 @@ from datetime import timedelta import airflow -import failure_handlers from airflow import DAG -from airflow.operators import ConcurrencyCheckOperator -from airflow.operators.python_operator import PythonOperator -from airflow.operators.subdag_operator import SubDagOperator -from armada_deploy_site import deploy_site_armada -from deckhand_get_design import get_design_deckhand -from drydock_deploy_site import deploy_site_drydock -from preflight_checks import all_preflight_checks -from validate_site_design import validate_site_design -""" -deploy_site is the top-level orchestration DAG for deploying a site using the -Undercloud platform. +from common_step_factory import CommonStepFactory +"""deploy_site + +the top-level orchestration DAG for deploying a site using the Undercloud +platform. """ -ALL_PREFLIGHT_CHECKS_DAG_NAME = 'preflight' -ARMADA_BUILD_DAG_NAME = 'armada_build' -DAG_CONCURRENCY_CHECK_DAG_NAME = 'dag_concurrency_check' -DECKHAND_GET_DESIGN_VERSION = 'deckhand_get_design_version' -DRYDOCK_BUILD_DAG_NAME = 'drydock_build' PARENT_DAG_NAME = 'deploy_site' -VALIDATE_SITE_DESIGN_DAG_NAME = 'validate_site_design' default_args = { 'owner': 'airflow', @@ -51,66 +38,28 @@ default_args = { } dag = DAG(PARENT_DAG_NAME, default_args=default_args, schedule_interval=None) -""" -Define push function to store the content of 'action' that is -defined via 'dag_run' in XCOM so that it can be used by the -Operators -""" +step_factory = CommonStepFactory(parent_dag_name=PARENT_DAG_NAME, + dag=dag, + default_args=default_args) -def xcom_push(**kwargs): - # Pushes action XCom - kwargs['ti'].xcom_push(key='action', - value=kwargs['dag_run'].conf['action']) - - -action_xcom = PythonOperator( - task_id='action_xcom', dag=dag, python_callable=xcom_push) - -concurrency_check = ConcurrencyCheckOperator( - task_id=DAG_CONCURRENCY_CHECK_DAG_NAME, - on_failure_callback=failure_handlers.step_failure_handler, - dag=dag) - -preflight = SubDagOperator( - subdag=all_preflight_checks( - PARENT_DAG_NAME, ALL_PREFLIGHT_CHECKS_DAG_NAME, args=default_args), - task_id=ALL_PREFLIGHT_CHECKS_DAG_NAME, - on_failure_callback=failure_handlers.step_failure_handler, - dag=dag) - -get_design_version = SubDagOperator( - subdag=get_design_deckhand( - PARENT_DAG_NAME, DECKHAND_GET_DESIGN_VERSION, args=default_args), - task_id=DECKHAND_GET_DESIGN_VERSION, - on_failure_callback=failure_handlers.step_failure_handler, - dag=dag) - -validate_site_design = SubDagOperator( - subdag=validate_site_design( - PARENT_DAG_NAME, VALIDATE_SITE_DESIGN_DAG_NAME, args=default_args), - task_id=VALIDATE_SITE_DESIGN_DAG_NAME, - on_failure_callback=failure_handlers.step_failure_handler, - dag=dag) - -drydock_build = SubDagOperator( - subdag=deploy_site_drydock( - PARENT_DAG_NAME, DRYDOCK_BUILD_DAG_NAME, args=default_args), - task_id=DRYDOCK_BUILD_DAG_NAME, - on_failure_callback=failure_handlers.step_failure_handler, - dag=dag) - -armada_build = SubDagOperator( - subdag=deploy_site_armada( - PARENT_DAG_NAME, ARMADA_BUILD_DAG_NAME, args=default_args), - task_id=ARMADA_BUILD_DAG_NAME, - on_failure_callback=failure_handlers.step_failure_handler, - dag=dag) +action_xcom = step_factory.get_action_xcom() +concurrency_check = step_factory.get_concurrency_check() +preflight = step_factory.get_preflight() +get_design_version = step_factory.get_get_design_version() +validate_site_design = step_factory.get_validate_site_design() +deployment_configuration = step_factory.get_deployment_configuration() +drydock_build = step_factory.get_drydock_build() +armada_build = step_factory.get_armada_build() # DAG Wiring concurrency_check.set_upstream(action_xcom) preflight.set_upstream(concurrency_check) get_design_version.set_upstream(preflight) validate_site_design.set_upstream(get_design_version) -drydock_build.set_upstream(validate_site_design) +deployment_configuration.set_upstream(get_design_version) +drydock_build.set_upstream([ + validate_site_design, + deployment_configuration +]) armada_build.set_upstream(drydock_build) diff --git a/shipyard_airflow/dags/destroy_node.py b/shipyard_airflow/dags/destroy_node.py index 4c8ade20..ea7f5a0e 100644 --- a/shipyard_airflow/dags/destroy_node.py +++ b/shipyard_airflow/dags/destroy_node.py @@ -20,11 +20,7 @@ from airflow.operators import PromenadeDecommissionNodeOperator from airflow.operators import PromenadeDrainNodeOperator from airflow.operators import PromenadeShutdownKubeletOperator - -# Location of shiyard.conf -# Note that the shipyard.conf file needs to be placed on a volume -# that can be accessed by the containers -config_path = '/usr/local/airflow/plugins/shipyard.conf' +from config_path import config_path def destroy_server(parent_dag_name, child_dag_name, args): diff --git a/shipyard_airflow/dags/drydock_deploy_site.py b/shipyard_airflow/dags/drydock_deploy_site.py index 0dd9bae8..651c320e 100644 --- a/shipyard_airflow/dags/drydock_deploy_site.py +++ b/shipyard_airflow/dags/drydock_deploy_site.py @@ -15,11 +15,7 @@ from airflow.models import DAG from airflow.operators import DryDockOperator - -# Location of shiyard.conf -# Note that the shipyard.conf file needs to be placed on a volume -# that can be accessed by the containers -config_path = '/usr/local/airflow/plugins/shipyard.conf' +from config_path import config_path def deploy_site_drydock(parent_dag_name, child_dag_name, args): diff --git a/shipyard_airflow/dags/preflight_checks.py b/shipyard_airflow/dags/preflight_checks.py index 83bff791..121c0a18 100644 --- a/shipyard_airflow/dags/preflight_checks.py +++ b/shipyard_airflow/dags/preflight_checks.py @@ -16,11 +16,7 @@ from airflow.models import DAG from airflow.operators import K8sHealthCheckOperator from airflow.operators import UcpHealthCheckOperator - -# Location of shiyard.conf -# Note that the shipyard.conf file needs to be placed on a volume -# that can be accessed by the containers -config_path = '/usr/local/airflow/plugins/shipyard.conf' +from config_path import config_path def all_preflight_checks(parent_dag_name, child_dag_name, args): diff --git a/shipyard_airflow/dags/redeploy_server.py b/shipyard_airflow/dags/redeploy_server.py index 5d0d25aa..32da66c0 100644 --- a/shipyard_airflow/dags/redeploy_server.py +++ b/shipyard_airflow/dags/redeploy_server.py @@ -1,4 +1,4 @@ -# Copyright 2017 AT&T Intellectual Property. All other rights reserved. +# Copyright 2018 AT&T Intellectual Property. All other rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,29 +14,16 @@ from datetime import timedelta import airflow -import failure_handlers from airflow import DAG -from airflow.operators import ConcurrencyCheckOperator -from airflow.operators.python_operator import PythonOperator -from airflow.operators.subdag_operator import SubDagOperator -from deckhand_get_design import get_design_deckhand -from destroy_node import destroy_server -from drydock_deploy_site import deploy_site_drydock -from preflight_checks import all_preflight_checks -from validate_site_design import validate_site_design -""" -redeploy_server is the top-level orchestration DAG for redeploying a -server using the Undercloud platform. +from common_step_factory import CommonStepFactory +"""redeploy_server + +The top-level orchestration DAG for redeploying a server using the Undercloud +platform. """ -ALL_PREFLIGHT_CHECKS_DAG_NAME = 'preflight' -DAG_CONCURRENCY_CHECK_DAG_NAME = 'dag_concurrency_check' -DECKHAND_GET_DESIGN_VERSION = 'deckhand_get_design_version' -DESTROY_SERVER_DAG_NAME = 'destroy_server' -DRYDOCK_BUILD_DAG_NAME = 'drydock_build' PARENT_DAG_NAME = 'redeploy_server' -VALIDATE_SITE_DESIGN_DAG_NAME = 'validate_site_design' default_args = { 'owner': 'airflow', @@ -51,66 +38,29 @@ default_args = { } dag = DAG(PARENT_DAG_NAME, default_args=default_args, schedule_interval=None) -""" -Define push function to store the content of 'action' that is -defined via 'dag_run' in XCOM so that it can be used by the -Operators -""" + +step_factory = CommonStepFactory(parent_dag_name=PARENT_DAG_NAME, + dag=dag, + default_args=default_args) -def xcom_push(**kwargs): - # Pushes action XCom - kwargs['ti'].xcom_push(key='action', - value=kwargs['dag_run'].conf['action']) - - -action_xcom = PythonOperator( - task_id='action_xcom', dag=dag, python_callable=xcom_push) - -concurrency_check = ConcurrencyCheckOperator( - task_id=DAG_CONCURRENCY_CHECK_DAG_NAME, - on_failure_callback=failure_handlers.step_failure_handler, - dag=dag) - -preflight = SubDagOperator( - subdag=all_preflight_checks( - PARENT_DAG_NAME, ALL_PREFLIGHT_CHECKS_DAG_NAME, args=default_args), - task_id=ALL_PREFLIGHT_CHECKS_DAG_NAME, - on_failure_callback=failure_handlers.step_failure_handler, - dag=dag) - -get_design_version = SubDagOperator( - subdag=get_design_deckhand( - PARENT_DAG_NAME, DECKHAND_GET_DESIGN_VERSION, args=default_args), - task_id=DECKHAND_GET_DESIGN_VERSION, - on_failure_callback=failure_handlers.step_failure_handler, - dag=dag) - -validate_site_design = SubDagOperator( - subdag=validate_site_design( - PARENT_DAG_NAME, VALIDATE_SITE_DESIGN_DAG_NAME, args=default_args), - task_id=VALIDATE_SITE_DESIGN_DAG_NAME, - on_failure_callback=failure_handlers.step_failure_handler, - dag=dag) - -destroy_server = SubDagOperator( - subdag=destroy_server( - PARENT_DAG_NAME, DESTROY_SERVER_DAG_NAME, args=default_args), - task_id=DESTROY_SERVER_DAG_NAME, - on_failure_callback=failure_handlers.step_failure_handler, - dag=dag) - -drydock_build = SubDagOperator( - subdag=deploy_site_drydock( - PARENT_DAG_NAME, DRYDOCK_BUILD_DAG_NAME, args=default_args), - task_id=DRYDOCK_BUILD_DAG_NAME, - on_failure_callback=failure_handlers.step_failure_handler, - dag=dag) +action_xcom = step_factory.get_action_xcom() +concurrency_check = step_factory.get_concurrency_check() +preflight = step_factory.get_preflight() +get_design_version = step_factory.get_get_design_version() +validate_site_design = step_factory.get_validate_site_design() +deployment_configuration = step_factory.get_deployment_configuration() +destroy_server = step_factory.get_destroy_server() +drydock_build = step_factory.get_drydock_build() # DAG Wiring concurrency_check.set_upstream(action_xcom) preflight.set_upstream(concurrency_check) get_design_version.set_upstream(preflight) validate_site_design.set_upstream(get_design_version) -destroy_server.set_upstream(validate_site_design) +deployment_configuration.set_upstream(get_design_version) +destroy_server.set_upstream([ + validate_site_design, + deployment_configuration +]) drydock_build.set_upstream(destroy_server) diff --git a/shipyard_airflow/dags/update_site.py b/shipyard_airflow/dags/update_site.py index 2df71004..13dad988 100644 --- a/shipyard_airflow/dags/update_site.py +++ b/shipyard_airflow/dags/update_site.py @@ -1,4 +1,4 @@ -# Copyright 2017 AT&T Intellectual Property. All other rights reserved. +# Copyright 2018 AT&T Intellectual Property. All other rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,32 +14,21 @@ from datetime import timedelta import airflow -import failure_handlers from airflow import DAG -from airflow.operators import ConcurrencyCheckOperator -from airflow.operators.python_operator import PythonOperator -from airflow.operators.subdag_operator import SubDagOperator -from armada_deploy_site import deploy_site_armada -from deckhand_get_design import get_design_deckhand -from drydock_deploy_site import deploy_site_drydock -from validate_site_design import validate_site_design -""" -update_site is the top-level orchestration DAG for updating a site using the -Undercloud platform. +from common_step_factory import CommonStepFactory + +"""update_site + +The top-level orchestration DAG for updating a site using the Undercloud +platform. TODO: We will disable pre-flight checks for now and will revisit it at a later date. The pre-flight checks will be more targeted in the case of 'update_site' and will include specific checks on things like coredns, calico and ceph. """ - -ARMADA_BUILD_DAG_NAME = 'armada_build' -DAG_CONCURRENCY_CHECK_DAG_NAME = 'dag_concurrency_check' -DECKHAND_GET_DESIGN_VERSION = 'deckhand_get_design_version' -DRYDOCK_BUILD_DAG_NAME = 'drydock_build' PARENT_DAG_NAME = 'update_site' -VALIDATE_SITE_DESIGN_DAG_NAME = 'validate_site_design' default_args = { 'owner': 'airflow', @@ -54,58 +43,26 @@ default_args = { } dag = DAG(PARENT_DAG_NAME, default_args=default_args, schedule_interval=None) -""" -Define push function to store the content of 'action' that is -defined via 'dag_run' in XCOM so that it can be used by the -Operators -""" +step_factory = CommonStepFactory(parent_dag_name=PARENT_DAG_NAME, + dag=dag, + default_args=default_args) -def xcom_push(**kwargs): - # Pushes action XCom - kwargs['ti'].xcom_push(key='action', - value=kwargs['dag_run'].conf['action']) - - -action_xcom = PythonOperator( - task_id='action_xcom', dag=dag, python_callable=xcom_push) - -concurrency_check = ConcurrencyCheckOperator( - task_id=DAG_CONCURRENCY_CHECK_DAG_NAME, - on_failure_callback=failure_handlers.step_failure_handler, - dag=dag) - -get_design_version = SubDagOperator( - subdag=get_design_deckhand( - PARENT_DAG_NAME, DECKHAND_GET_DESIGN_VERSION, args=default_args), - task_id=DECKHAND_GET_DESIGN_VERSION, - on_failure_callback=failure_handlers.step_failure_handler, - dag=dag) - -validate_site_design = SubDagOperator( - subdag=validate_site_design( - PARENT_DAG_NAME, VALIDATE_SITE_DESIGN_DAG_NAME, args=default_args), - task_id=VALIDATE_SITE_DESIGN_DAG_NAME, - on_failure_callback=failure_handlers.step_failure_handler, - dag=dag) - -drydock_build = SubDagOperator( - subdag=deploy_site_drydock( - PARENT_DAG_NAME, DRYDOCK_BUILD_DAG_NAME, args=default_args), - task_id=DRYDOCK_BUILD_DAG_NAME, - on_failure_callback=failure_handlers.step_failure_handler, - dag=dag) - -armada_build = SubDagOperator( - subdag=deploy_site_armada( - PARENT_DAG_NAME, ARMADA_BUILD_DAG_NAME, args=default_args), - task_id=ARMADA_BUILD_DAG_NAME, - on_failure_callback=failure_handlers.step_failure_handler, - dag=dag) +action_xcom = step_factory.get_action_xcom() +concurrency_check = step_factory.get_concurrency_check() +get_design_version = step_factory.get_get_design_version() +validate_site_design = step_factory.get_validate_site_design() +deployment_configuration = step_factory.get_deployment_configuration() +drydock_build = step_factory.get_drydock_build() +armada_build = step_factory.get_armada_build() # DAG Wiring concurrency_check.set_upstream(action_xcom) get_design_version.set_upstream(concurrency_check) validate_site_design.set_upstream(get_design_version) -drydock_build.set_upstream(validate_site_design) +deployment_configuration.set_upstream(get_design_version) +drydock_build.set_upstream([ + validate_site_design, + deployment_configuration +]) armada_build.set_upstream(drydock_build) diff --git a/shipyard_airflow/dags/validate_site_design.py b/shipyard_airflow/dags/validate_site_design.py index 37d8d22f..9778a1c6 100644 --- a/shipyard_airflow/dags/validate_site_design.py +++ b/shipyard_airflow/dags/validate_site_design.py @@ -17,10 +17,7 @@ from airflow.operators import ArmadaOperator from airflow.operators import DeckhandValidateSiteDesignOperator from airflow.operators import DryDockOperator -# Location of shiyard.conf -# Note that the shipyard.conf file needs to be placed on a volume -# that can be accessed by the containers -config_path = '/usr/local/airflow/plugins/shipyard.conf' +from config_path import config_path def validate_site_design(parent_dag_name, child_dag_name, args): diff --git a/shipyard_airflow/plugins/armada_operator.py b/shipyard_airflow/plugins/armada_operator.py index 683e7119..fc0b94c8 100644 --- a/shipyard_airflow/plugins/armada_operator.py +++ b/shipyard_airflow/plugins/armada_operator.py @@ -28,6 +28,7 @@ import armada.common.session as session from get_k8s_pod_port_ip import get_pod_port_ip from service_endpoint import ucp_service_endpoint from service_token import shipyard_service_token +from xcom_puller import XcomPuller class ArmadaOperator(BaseOperator): @@ -37,6 +38,9 @@ class ArmadaOperator(BaseOperator): :param main_dag_name: Parent Dag :param shipyard_conf: Location of shipyard.conf :param sub_dag_name: Child Dag + + The Drydock operator assumes that prior steps have set xcoms for + the action and the deployment configuration """ @apply_defaults @@ -46,7 +50,6 @@ class ArmadaOperator(BaseOperator): shipyard_conf=None, svc_token=None, sub_dag_name=None, - workflow_info={}, xcom_push=True, *args, **kwargs): @@ -56,7 +59,6 @@ class ArmadaOperator(BaseOperator): self.shipyard_conf = shipyard_conf self.svc_token = svc_token self.sub_dag_name = sub_dag_name - self.workflow_info = workflow_info self.xcom_push_flag = xcom_push def execute(self, context): @@ -67,16 +69,12 @@ class ArmadaOperator(BaseOperator): # Define task_instance task_instance = context['task_instance'] - # Extract information related to current workflow - # The workflow_info variable will be a dictionary - # that contains information about the workflow such - # as action_id, name and other related parameters - workflow_info = task_instance.xcom_pull( - task_ids='action_xcom', key='action', - dag_id=self.main_dag_name) + # Set up and retrieve values from xcom + self.xcom_puller = XcomPuller(self.main_dag_name, task_instance) + self.action_info = self.xcom_puller.get_action_info() # Logs uuid of action performed by the Operator - logging.info("Armada Operator for action %s", workflow_info['id']) + logging.info("Armada Operator for action %s", self.action_info['id']) # Retrieve Deckhand Design Reference design_ref = self.get_deckhand_design_ref(context) @@ -108,6 +106,10 @@ class ArmadaOperator(BaseOperator): return site_design_validity + # Set up target manifest (only if not doing validate) + self.dc = self.xcom_puller.get_deployment_configuration() + self.target_manifest = self.dc['armada.manifest'] + # Create Armada Client # Retrieve Endpoint Information svc_type = 'armada' @@ -128,13 +130,8 @@ class ArmadaOperator(BaseOperator): # Armada Apply elif self.action == 'armada_apply': - # TODO (bryan-strassner) externalize the name of the manifest to - # use this needs to come from a site configuration document for - # consumption by shipyard/airflow. For now. "full-site" is the - # only value that will work. - target_manifest = 'full-site' self.armada_apply(context, armada_client, design_ref, - target_manifest) + self.target_manifest) # Armada Get Releases elif self.action == 'armada_get_releases': @@ -268,14 +265,7 @@ class ArmadaOperator(BaseOperator): logging.info("Deckhand endpoint is %s", deckhand_svc_endpoint) # Retrieve revision_id from xcom - # Note that in the case of 'deploy_site', the dag_id will - # be 'deploy_site.deckhand_get_design_version' for the - # 'deckhand_get_design_version' task. We need to extract - # the xcom value from it in order to get the value of the - # last committed revision ID - committed_revision_id = context['task_instance'].xcom_pull( - task_ids='deckhand_get_design_version', - dag_id=self.main_dag_name + '.deckhand_get_design_version') + committed_revision_id = self.xcom_puller.get_design_version() # Form Design Reference Path that we will use to retrieve # the Design YAMLs diff --git a/shipyard_airflow/plugins/concurrency_check_operator.py b/shipyard_airflow/plugins/concurrency_check_operator.py index 50ced2e6..9e99eb59 100644 --- a/shipyard_airflow/plugins/concurrency_check_operator.py +++ b/shipyard_airflow/plugins/concurrency_check_operator.py @@ -56,9 +56,14 @@ class ConcurrencyCheckOperator(BaseOperator): @apply_defaults def __init__(self, conflicting_dag_set=None, *args, **kwargs): super(ConcurrencyCheckOperator, self).__init__(*args, **kwargs) - if conflicting_dag_set is not None: - self.conflicting_dag_set = conflicting_dag_set - else: + self.conflicting_dag_set = conflicting_dag_set + + def execute(self, context): + """ + Run the check to see if this DAG has an concurrency issues with other + DAGs. Stop the workflow if there is. + """ + if self.conflicting_dag_set is None: self.check_dag_id = self.dag.dag_id logging.debug('dag_id is %s', self.check_dag_id) if '.' in self.dag.dag_id: @@ -70,11 +75,6 @@ class ConcurrencyCheckOperator(BaseOperator): self.conflicting_dag_set = find_conflicting_dag_set( self.check_dag_id) - def execute(self, context): - """ - Run the check to see if this DAG has an concurrency issues with other - DAGs. Stop the workflow if there is. - """ logging.info('Checking for running of dags: %s', ', '.join(self.conflicting_dag_set)) @@ -123,7 +123,7 @@ class ConcurrencyCheckOperator(BaseOperator): """ conflict_string = '{} conflicts with running {}. Aborting run'.format( dag_name, conflict) - logging.warning(conflict_string) + logging.error(conflict_string) raise AirflowException(conflict_string) diff --git a/shipyard_airflow/plugins/deckhand_base_operator.py b/shipyard_airflow/plugins/deckhand_base_operator.py index 60d4cca3..53b703db 100644 --- a/shipyard_airflow/plugins/deckhand_base_operator.py +++ b/shipyard_airflow/plugins/deckhand_base_operator.py @@ -23,6 +23,7 @@ from airflow.exceptions import AirflowException from deckhand.client import client as deckhand_client from service_endpoint import ucp_service_endpoint from service_token import shipyard_service_token +from xcom_puller import XcomPuller class DeckhandBaseOperator(BaseOperator): @@ -49,7 +50,6 @@ class DeckhandBaseOperator(BaseOperator): svc_session=None, svc_token=None, validation_read_timeout=None, - workflow_info={}, xcom_push=True, *args, **kwargs): """Initialization of DeckhandBaseOperator object. @@ -66,7 +66,6 @@ class DeckhandBaseOperator(BaseOperator): :param svc_session: Keystone Session :param svc_token: Keystone Token :param validation_read_timeout: Deckhand validation timeout - :param workflow_info: Information related to current workflow :param xcom_push: xcom usage """ @@ -84,7 +83,6 @@ class DeckhandBaseOperator(BaseOperator): self.svc_session = svc_session self.svc_token = svc_token self.validation_read_timeout = validation_read_timeout - self.workflow_info = workflow_info self.xcom_push_flag = xcom_push def execute(self, context): @@ -117,17 +115,13 @@ class DeckhandBaseOperator(BaseOperator): # Define task_instance task_instance = context['task_instance'] - # Extract information related to current workflow - # The workflow_info variable will be a dictionary - # that contains information about the workflow such - # as action_id, name and other related parameters - self.workflow_info = task_instance.xcom_pull( - task_ids='action_xcom', key='action', - dag_id=self.main_dag_name) + # Set up and retrieve values from xcom + self.xcom_puller = XcomPuller(self.main_dag_name, task_instance) + self.action_info = self.xcom_puller.get_action_info() # Logs uuid of Shipyard action logging.info("Executing Shipyard Action %s", - self.workflow_info['id']) + self.action_info['id']) # Retrieve Endpoint Information self.deckhand_svc_endpoint = ucp_service_endpoint( @@ -158,9 +152,7 @@ class DeckhandBaseOperator(BaseOperator): if self.task_id != 'deckhand_get_design_version': # Retrieve 'revision_id' from xcom - self.revision_id = task_instance.xcom_pull( - task_ids='deckhand_get_design_version', - dag_id=self.main_dag_name + '.deckhand_get_design_version') + self.revision_id = self.xcom_puller.get_design_version() if self.revision_id: logging.info("Revision ID is %d", self.revision_id) diff --git a/shipyard_airflow/plugins/deckhand_client_factory.py b/shipyard_airflow/plugins/deckhand_client_factory.py new file mode 100644 index 00000000..3efa8c8c --- /dev/null +++ b/shipyard_airflow/plugins/deckhand_client_factory.py @@ -0,0 +1,66 @@ +# Copyright 2018 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import configparser +import logging + +from keystoneauth1.identity import v3 as keystone_v3 +from keystoneauth1 import session as keystone_session + +from deckhand.client import client as deckhand_client + +LOG = logging.getLogger(__name__) + + +class DeckhandClientFactory(object): + """Factory for DeckhandClient to encapsulate commonly reused setup""" + + def __init__(self, + shipyard_conf, + *args, **kwargs): + """Deckhand Client Factory + + Creates a client factory to retrieve clients + :param shipyard_conf: Location of shipyard.conf + """ + self.config = configparser.ConfigParser() + self.config.read(shipyard_conf) + + def get_client(self): + """Retrieve a deckhand client""" + + """ + Notes: + TODO(bryan-strassner): If/when the airflow plugin modules move to using + oslo config, consider using the example here: + https://github.com/att-comdev/deckhand/blob/cef3b52a104e620e88a24caf70ed2bb1297c268f/deckhand/barbican/client_wrapper.py#L53 + which will load the attributes from the config more flexibly. + Keystoneauth1 also provides for a simpler solution with: + https://docs.openstack.org/keystoneauth/latest/api/keystoneauth1.loading.html + if oslo config is used. + """ + keystone_auth = {} + # Construct Session Argument + for attr in ('auth_url', 'password', 'project_domain_name', + 'project_name', 'username', 'user_domain_name'): + keystone_auth[attr] = self.config.get('keystone_authtoken', attr) + + # Set up keystone session + auth = keystone_v3.Password(**keystone_auth) + sess = keystone_session.Session(auth=auth) + + LOG.info("Setting up Deckhand client with parameters") + for attr in keystone_auth: + if attr != 'password': + LOG.debug('%s = %s', attr, keystone_auth[attr]) + return deckhand_client.Client(session=sess, endpoint_type='internal') diff --git a/shipyard_airflow/plugins/deployment_configuration_operator.py b/shipyard_airflow/plugins/deployment_configuration_operator.py new file mode 100644 index 00000000..5e15d924 --- /dev/null +++ b/shipyard_airflow/plugins/deployment_configuration_operator.py @@ -0,0 +1,178 @@ +# Copyright 2017 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Deployment Configuration + +Retrieves the deployment configuration from Deckhand and places the values +retrieved into a dictionary +""" +import logging + +from airflow.exceptions import AirflowException +from airflow.models import BaseOperator +from airflow.plugins_manager import AirflowPlugin +from airflow.utils.decorators import apply_defaults + +try: + from deckhand_client_factory import DeckhandClientFactory +except ImportError: + from shipyard_airflow.plugins.deckhand_client_factory import ( + DeckhandClientFactory + ) + +LOG = logging.getLogger(__name__) + + +class DeploymentConfigurationOperator(BaseOperator): + """Deployment Configuration Operator + + Retrieve the deployment configuration from Deckhand for use throughout + the workflow. Put the configuration into a dictionary. + + Failures are raised: + - when Deckhand cannot be contacted + - when the DeploymentConfiguration (deployment-configuration) document + cannot be retrieved + """ + config_keys_defaults = { + "physical_provisioner.deployment_strategy": "all-at-once", + "physical_provisioner.deploy_interval": 30, + "physical_provisioner.deploy_timeout": 3600, + "physical_provisioner.destroy_interval": 30, + "physical_provisioner.destroy_timeout": 900, + "physical_provisioner.join_wait": 120, + "physical_provisioner.prepare_node_interval": 30, + "physical_provisioner.prepare_node_timeout": 1000, + "physical_provisioner.prepare_site_interval": 10, + "physical_provisioner.prepare_site_timeout": 300, + "physical_provisioner.verify_interval": 10, + "physical_provisioner.verify_timeout": 60, + "kubernetes.node_status_interval": 30, + "kubernetes.node_status_timeout": 1800, + "kubernetes_provisioner.drain_timeout": 3600, + "kubernetes_provisioner.drain_grace_period": 1800, + "kubernetes_provisioner.clear_labels_timeout": 1800, + "kubernetes_provisioner.remove_etcd_timeout": 1800, + "kubernetes_provisioner.etcd_ready_timeout": 600, + "armada.manifest": "full-site" + } + + @apply_defaults + def __init__(self, + main_dag_name=None, + shipyard_conf=None, + *args, **kwargs): + """Deployment Configuration Operator + + Generate a DeploymentConfigurationOperator to read the deployment's + configuration for use by other operators + + :param main_dag_name: Parent Dag + :param shipyard_conf: Location of shipyard.conf + """ + + super(DeploymentConfigurationOperator, self).__init__(*args, **kwargs) + self.main_dag_name = main_dag_name + self.shipyard_conf = shipyard_conf + + def execute(self, context): + """Perform Deployment Configuration extraction""" + + revision_id = self.get_revision_id(context.get('task_instance')) + doc = self.get_doc(revision_id) + converted = self.map_config_keys(doc) + # return the mapped configuration so that it can be placed on xcom + return converted + + def get_revision_id(self, task_instance): + """Get the revision id from xcom""" + if task_instance: + LOG.debug("task_instance found, extracting design version") + # Set the revision_id to the revision on the xcom + revision_id = task_instance.xcom_pull( + task_ids='deckhand_get_design_version', + dag_id=self.main_dag_name + '.deckhand_get_design_version') + if revision_id: + LOG.info("Revision is set to: %s for deployment configuration", + revision_id) + return revision_id + # either revision id was not on xcom, or the task_instance is messed + raise AirflowException( + "Design_revision is not set. Cannot proceed with retrieval of" + " the design configuration" + ) + + def get_doc(self, revision_id): + """Get the DeploymentConfiguration document dictionary from Deckhand""" + LOG.info( + "Attempting to retrieve shipyard/DeploymentConfiguration/v1, " + "deployment-configuration from Deckhand" + ) + filters = { + "schema": "shipyard/DeploymentConfiguration/v1", + "metadata.name": "deployment-configuration" + } + try: + dhclient = DeckhandClientFactory(self.shipyard_conf).get_client() + LOG.info("Deckhand Client acquired") + doc = dhclient.revisions.documents(revision_id, + rendered=True, + **filters) + except Exception as ex: + try: + failed_url = ex.url + except AttributeError: + failed_url = "No URL generated" + LOG.exception(ex) + raise AirflowException("Failed to retrieve deployment " + "configuration yaml using url: " + "{}".format(failed_url)) + + if len(doc) == 1 and doc[0].data: + doc_dict = doc[0].data + else: + raise AirflowException("A valid deployment-configuration is " + "required") + + LOG.info("DeploymentConfiguration retrieved") + return doc_dict + + def map_config_keys(self, cfg_data): + """Maps the deployment-configuration + + Converts to a more simple map of key-value pairs + """ + LOG.info("Mapping keys from deployment configuration") + return { + cfg_key: self.get_cfg_value(cfg_data, cfg_key, cfg_default) + for cfg_key, cfg_default in + DeploymentConfigurationOperator.config_keys_defaults.items() + } + + def get_cfg_value(self, cfg_data, cfg_key, cfg_default): + """Uses the dot notation key to get the value from the design config""" + data = cfg_data + for node in cfg_key.split('.'): + data = data.get(node, {}) + if data: + LOG.info("Deployment Config value set- %s: %s", cfg_key, data) + return data + else: + LOG.info("Deployment Config using default- %s: %s", + cfg_key, cfg_default) + return cfg_default + + +class DeploymentConfigurationOperatorPlugin(AirflowPlugin): + name = 'deployment_configuration_operator_plugin' + operators = [DeploymentConfigurationOperator] diff --git a/shipyard_airflow/plugins/drydock_operators.py b/shipyard_airflow/plugins/drydock_operators.py index c2188f40..c87c531b 100644 --- a/shipyard_airflow/plugins/drydock_operators.py +++ b/shipyard_airflow/plugins/drydock_operators.py @@ -12,7 +12,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import configparser import json import logging import os @@ -31,20 +30,11 @@ from check_k8s_node_status import check_node_status from drydock_provisioner import error as errors from service_endpoint import ucp_service_endpoint from service_token import shipyard_service_token +from xcom_puller import XcomPuller class DryDockOperator(BaseOperator): - """ - DryDock Client - :param action: Task to perform - :param design_ref: A URI reference to the design documents - :param main_dag_name: Parent Dag - :param node_filter: A filter for narrowing the scope of the task. Valid - fields are 'node_names', 'rack_names', 'node_tags' - :param shipyard_conf: Location of shipyard.conf - :param sub_dag_name: Child Dag - :param workflow_info: Information related to the workflow - """ + """DryDock Client""" @apply_defaults def __init__(self, action=None, @@ -54,9 +44,20 @@ class DryDockOperator(BaseOperator): shipyard_conf=None, svc_token=None, sub_dag_name=None, - workflow_info={}, xcom_push=True, *args, **kwargs): + """ + :param action: Task to perform + :param design_ref: A URI reference to the design documents + :param main_dag_name: Parent Dag + :param node_filter: A filter for narrowing the scope of the task. Valid + fields are 'node_names', 'rack_names', 'node_tags' + :param shipyard_conf: Location of shipyard.conf + :param sub_dag_name: Child Dag + + The Drydock operator assumes that prior steps have set xcoms for + the action and the deployment configuration + """ super(DryDockOperator, self).__init__(*args, **kwargs) self.action = action @@ -66,7 +67,6 @@ class DryDockOperator(BaseOperator): self.shipyard_conf = shipyard_conf self.svc_token = svc_token self.sub_dag_name = sub_dag_name - self.workflow_info = workflow_info self.xcom_push_flag = xcom_push def execute(self, context): @@ -81,22 +81,19 @@ class DryDockOperator(BaseOperator): # Define task_instance task_instance = context['task_instance'] - # Extract information related to current workflow - # The workflow_info variable will be a dictionary - # that contains information about the workflow such - # as action_id, name and other related parameters - workflow_info = task_instance.xcom_pull( - task_ids='action_xcom', key='action', - dag_id=self.main_dag_name) + # Set up and retrieve values from xcom + self.xcom_puller = XcomPuller(self.main_dag_name, task_instance) + self.action_info = self.xcom_puller.get_action_info() + self.dc = self.xcom_puller.get_deployment_configuration() # Logs uuid of action performed by the Operator - logging.info("DryDock Operator for action %s", workflow_info['id']) + logging.info("DryDock Operator for action %s", self.action_info['id']) # Retrieve information of the server that we want to redeploy if user # executes the 'redeploy_server' dag # Set node filter to be the server that we want to redeploy - if workflow_info['dag_id'] == 'redeploy_server': - redeploy_server = workflow_info['parameters'].get('server-name') + if self.action_info['dag_id'] == 'redeploy_server': + redeploy_server = self.action_info['parameters'].get('server-name') if redeploy_server: logging.info("Server to be redeployed is %s", redeploy_server) @@ -139,83 +136,56 @@ class DryDockOperator(BaseOperator): # Set up DryDock Client drydock_client = self.drydock_session_client(drydock_svc_endpoint) - # Read shipyard.conf - config = configparser.ConfigParser() - config.read(self.shipyard_conf) - - if not config.read(self.shipyard_conf): - raise AirflowException("Unable to read content of shipyard.conf") - # Create Task for verify_site if self.action == 'verify_site': - - # Default settings for 'verify_site' execution is to query - # the task every 10 seconds and to time out after 60 seconds - query_interval = config.get('drydock', - 'verify_site_query_interval') - task_timeout = config.get('drydock', 'verify_site_task_timeout') - + q_interval = self.dc['physical_provisioner.verify_interval'] + task_timeout = self.dc['physical_provisioner.verify_timeout'] self.drydock_action(drydock_client, context, self.action, - query_interval, task_timeout) + q_interval, task_timeout) # Create Task for prepare_site elif self.action == 'prepare_site': - # Default settings for 'prepare_site' execution is to query - # the task every 10 seconds and to time out after 300 seconds - query_interval = config.get('drydock', - 'prepare_site_query_interval') - task_timeout = config.get('drydock', 'prepare_site_task_timeout') - + q_interval = self.dc['physical_provisioner.prepare_site_interval'] + task_timeout = self.dc['physical_provisioner.prepare_site_timeout'] self.drydock_action(drydock_client, context, self.action, - query_interval, task_timeout) + q_interval, task_timeout) # Create Task for prepare_node elif self.action == 'prepare_nodes': - # Default settings for 'prepare_node' execution is to query - # the task every 30 seconds and to time out after 1800 seconds - query_interval = config.get('drydock', - 'prepare_node_query_interval') - task_timeout = config.get('drydock', 'prepare_node_task_timeout') - + q_interval = self.dc['physical_provisioner.prepare_node_interval'] + task_timeout = self.dc['physical_provisioner.prepare_node_timeout'] self.drydock_action(drydock_client, context, self.action, - query_interval, task_timeout) + q_interval, task_timeout) # Create Task for deploy_node elif self.action == 'deploy_nodes': - # Default settings for 'deploy_node' execution is to query - # the task every 30 seconds and to time out after 3600 seconds - query_interval = config.get('drydock', - 'deploy_node_query_interval') - task_timeout = config.get('drydock', 'deploy_node_task_timeout') - + q_interval = self.dc['physical_provisioner.deploy_interval'] + task_timeout = self.dc['physical_provisioner.deploy_timeout'] self.drydock_action(drydock_client, context, self.action, - query_interval, task_timeout) + q_interval, task_timeout) # Wait for 120 seconds (default value) before checking the cluster # join process as it takes time for process to be triggered across # all nodes - cluster_join_check_backoff_time = config.get( - 'drydock', 'cluster_join_check_backoff_time') + join_wait = self.dc['physical_provisioner.join_wait'] logging.info("All nodes deployed in MAAS") logging.info("Wait for %d seconds before checking node state...", - int(cluster_join_check_backoff_time)) - time.sleep(int(cluster_join_check_backoff_time)) - + join_wait) + time.sleep(join_wait) # Check that cluster join process is completed before declaring - # deploy_node as 'completed'. Set time out to 30 minutes and set - # polling interval to 30 seconds. - check_node_status(1800, 30) + # deploy_node as 'completed'. + node_st_timeout = self.dc['kubernetes.node_status_timeout'] + node_st_interval = self.dc['kubernetes.node_status_interval'] + check_node_status(node_st_timeout, node_st_interval) # Create Task for destroy_node # NOTE: This is a PlaceHolder function. The 'destroy_node' # functionalities in DryDock is being worked on and is not # ready at the moment. elif self.action == 'destroy_node': - # Default settings for 'destroy_node' execution is to query - # the task every 30 seconds and to time out after 900 seconds - query_interval = config.get('drydock', - 'destroy_node_query_interval') - task_timeout = config.get('drydock', 'destroy_node_task_timeout') + # see deployment_configuration_operator.py for defaults + q_interval = self.dc['physical_provisioner.destroy_interval'] + task_timeout = self.dc['physical_provisioner.destroy_timeout'] logging.info("Destroying node %s from cluster...", redeploy_server) time.sleep(15) @@ -224,7 +194,7 @@ class DryDockOperator(BaseOperator): # TODO: Uncomment when the function to destroy/delete node is # ready for consumption in Drydock # self.drydock_action(drydock_client, context, self.action, - # query_interval, task_timeout) + # q_interval, task_timeout) # Do not perform any action else: @@ -403,15 +373,7 @@ class DryDockOperator(BaseOperator): svc_type=svc_type) logging.info("Deckhand endpoint is %s", deckhand_svc_endpoint) - # Retrieve revision_id from xcom - # Note that in the case of 'deploy_site', the dag_id will - # be 'deploy_site.deckhand_get_design_version' for the - # 'deckhand_get_design_version' task. We need to extract - # the xcom value from it in order to get the value of the - # last committed revision ID - committed_revision_id = context['task_instance'].xcom_pull( - task_ids='deckhand_get_design_version', - dag_id=self.main_dag_name + '.deckhand_get_design_version') + committed_revision_id = self.xcom_puller.get_design_version() # Form DeckHand Design Reference Path that we will use to retrieve # the DryDock YAMLs diff --git a/shipyard_airflow/plugins/promenade_base_operator.py b/shipyard_airflow/plugins/promenade_base_operator.py index 39637a0e..1389f9e1 100644 --- a/shipyard_airflow/plugins/promenade_base_operator.py +++ b/shipyard_airflow/plugins/promenade_base_operator.py @@ -21,6 +21,7 @@ from airflow.exceptions import AirflowException from service_endpoint import ucp_service_endpoint from service_token import shipyard_service_token +from xcom_puller import XcomPuller class PromenadeBaseOperator(BaseOperator): @@ -30,7 +31,6 @@ class PromenadeBaseOperator(BaseOperator): All promenade related workflow operators will use the promenade base operator as the parent and inherit attributes and methods from this class - """ @apply_defaults @@ -42,7 +42,6 @@ class PromenadeBaseOperator(BaseOperator): shipyard_conf=None, sub_dag_name=None, svc_token=None, - workflow_info={}, xcom_push=True, *args, **kwargs): """Initialization of PromenadeBaseOperator object. @@ -54,9 +53,9 @@ class PromenadeBaseOperator(BaseOperator): :param shipyard_conf: Path of shipyard.conf :param sub_dag_name: Child Dag :param svc_token: Keystone Token - :param workflow_info: Information related to current workflow :param xcom_push: xcom usage - + The Drydock operator assumes that prior steps have set xcoms for + the action and the deployment configuration """ super(PromenadeBaseOperator, self).__init__(*args, @@ -68,11 +67,9 @@ class PromenadeBaseOperator(BaseOperator): self.shipyard_conf = shipyard_conf self.sub_dag_name = sub_dag_name self.svc_token = svc_token - self.workflow_info = workflow_info self.xcom_push_flag = xcom_push def execute(self, context): - # Execute promenade base function self.promenade_base(context) @@ -84,22 +81,18 @@ class PromenadeBaseOperator(BaseOperator): # Define task_instance task_instance = context['task_instance'] - # Extract information related to current workflow - # The workflow_info variable will be a dictionary - # that contains information about the workflow such - # as action_id, name and other related parameters - self.workflow_info = task_instance.xcom_pull( - task_ids='action_xcom', key='action', - dag_id=self.main_dag_name) + # Set up and retrieve values from xcom + self.xcom_puller = XcomPuller(self.main_dag_name, task_instance) + self.action_info = self.xcom_puller.get_action_info() + self.dc = self.xcom_puller.get_deployment_configuration() # Logs uuid of Shipyard action - logging.info("Executing Shipyard Action %s", - self.workflow_info['id']) + logging.info("Executing Shipyard Action %s", self.action_info['id']) # Retrieve information of the server that we want to redeploy # if user executes the 'redeploy_server' dag - if self.workflow_info['dag_id'] == 'redeploy_server': - self.redeploy_server = self.workflow_info['parameters'].get( + if self.action_info['dag_id'] == 'redeploy_server': + self.redeploy_server = self.action_info['parameters'].get( 'server-name') if self.redeploy_server: diff --git a/shipyard_airflow/plugins/promenade_check_etcd.py b/shipyard_airflow/plugins/promenade_check_etcd.py index d4f5e200..f0a406ac 100644 --- a/shipyard_airflow/plugins/promenade_check_etcd.py +++ b/shipyard_airflow/plugins/promenade_check_etcd.py @@ -33,6 +33,10 @@ class PromenadeCheckEtcdOperator(PromenadeBaseOperator): def do_execute(self): # Placeholder function. Updates will be made when the Promenade # API is ready for consumption. + + # TODO(bryan-strassner) use: + # self.dc['kubernetes_provisioner.etcd_ready_timeout'] + # self.dc['kubernetes_provisioner.remove_etcd_timeout'] logging.info("Performing health check on etcd...") time.sleep(5) diff --git a/shipyard_airflow/plugins/promenade_clear_labels.py b/shipyard_airflow/plugins/promenade_clear_labels.py index 526f8b78..a78a9f8d 100644 --- a/shipyard_airflow/plugins/promenade_clear_labels.py +++ b/shipyard_airflow/plugins/promenade_clear_labels.py @@ -33,6 +33,10 @@ class PromenadeClearLabelsOperator(PromenadeBaseOperator): def do_execute(self): # Placeholder function. Updates will be made when the Promenade # API is ready for consumption. + + # TODO(bryan-strassner) use: + # self.dc['kubernetes_provisioner.clear_labels_timeout'] + logging.info("Removing labels on node...") time.sleep(5) diff --git a/shipyard_airflow/plugins/promenade_drain_node.py b/shipyard_airflow/plugins/promenade_drain_node.py index 7fea0c28..1822bdf7 100644 --- a/shipyard_airflow/plugins/promenade_drain_node.py +++ b/shipyard_airflow/plugins/promenade_drain_node.py @@ -35,6 +35,11 @@ class PromenadeDrainNodeOperator(PromenadeBaseOperator): def do_execute(self): # Placeholder function. Updates will be made when the Promenade # API is ready for consumption. + + # TODO(bryan-strassner) use: + # self.dc['kubernetes_provisioner.drain_timeout'] + # self.dc['kubernetes_provisioner.drain_grace_period'] + logging.info("Draining node...") time.sleep(5) diff --git a/shipyard_airflow/plugins/xcom_puller.py b/shipyard_airflow/plugins/xcom_puller.py new file mode 100644 index 00000000..5b3ddef9 --- /dev/null +++ b/shipyard_airflow/plugins/xcom_puller.py @@ -0,0 +1,84 @@ +# Copyright 2018 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging + +LOG = logging.getLogger(__name__) + + +class XcomPuller(object): + """XcomPuller provides a common source to get reused xcom values + + One XcomPuller should be created per task. + Note: xcom values are found by using the current task instance + and finding the . that the xcom was added + to the workflow. + The point of this class is to keep all this very configurable + naming in one place as much as possible so that changes to + the dag names and step names have less places to update. + """ + + def __init__(self, main_dag_name, task_instance): + self.mdn = main_dag_name + self.ti = task_instance + + def _get_xcom(self, source_task, dag_id=None, key=None, log_result=True): + """Find a particular xcom value""" + if dag_id is None: + source_dag = self.mdn + else: + source_dag = "{}.{}".format(self.mdn, dag_id) + LOG.info("Retrieving xcom from %s.%s with key %s", + source_dag, + source_task, + key) + xcom_val = self.ti.xcom_pull(task_ids=source_task, + dag_id=source_dag, + key=key) + if log_result: + # log the xcom value - don't put large values in xcom! + LOG.info(xcom_val) + + return xcom_val + + def get_deployment_configuration(self): + """Retrieve the deployment configuration dictionary""" + source_task = 'get_deployment_configuration' + source_dag = 'dag_deployment_configuration' + key = None + return self._get_xcom(source_task=source_task, + dag_id=source_dag, + key=key) + + def get_action_info(self): + """Retrive the action and action parameter info dictionary + + Extract information related to current workflow. This is a dictionary + that contains information about the workflow such as action_id, name + and other related parameters + """ + source_task = 'action_xcom' + source_dag = None + key = 'action' + return self._get_xcom(source_task=source_task, + dag_id=source_dag, + key=key) + + def get_design_version(self): + """Retrieve the design version being used for this workflow""" + source_task = 'deckhand_get_design_version' + source_dag = 'deckhand_get_design_version' + key = None + return self._get_xcom(source_task=source_task, + dag_id=source_dag, + key=key) diff --git a/shipyard_airflow/schemas/deploymentConfiguration.yaml b/shipyard_airflow/schemas/deploymentConfiguration.yaml new file mode 100644 index 00000000..ce6ae1aa --- /dev/null +++ b/shipyard_airflow/schemas/deploymentConfiguration.yaml @@ -0,0 +1,76 @@ + +--- +schema: 'deckhand/DataSchema/v1' +metadata: + schema: metadata/Control/v1 + name: shipyard/DeploymentConfiguration/v1 + labels: + application: shipyard +data: + $schema: 'http://json-schema.org/schema#' + id: 'https://github.com/att-comdev/shipyard/deploymentConfiguration.yaml' + type: 'object' + properties: + physical_provisioner: + type: 'object' + properties: + deployment_strategy: + type: 'string' + enum: + - 'all-at-once' + deploy_interval: + type: 'integer' + deploy_timeout: + type: 'integer' + destroy_interval: + type: 'integer' + destroy_timeout: + type: 'integer' + join_wait: + type: 'integer' + prepare_node_interval: + type: 'integer' + prepare_node_timeout: + type: 'integer' + prepare_site_interval: + type: 'integer' + prepare_site_timeout: + type: 'integer' + verify_interval: + type: 'integer' + verify_timeout: + type: 'integer' + additionalProperties: false + kubernetes: + type: 'object' + properties: + node_status_interval: + type: 'integer' + node_status_timeout: + type: 'integer' + additionalProperties: false + kubernetes_provisioner: + type: 'object' + properties: + drain_timeout: + type: 'integer' + drain_grace_period: + type: 'integer' + clear_labels_timeout: + type: 'integer' + remove_etcd_timeout: + type: 'integer' + etcd_ready_timeout: + type: 'integer' + additionalProperties: false + armada: + type: 'object' + properties: + manifest: + type: 'string' + additionalProperties: false + required: + - manifest + additionalProperties: false + required: + - armada diff --git a/test-requirements.txt b/test-requirements.txt index 04a07f7f..8c2f45df 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -5,6 +5,10 @@ mock==2.0.0 responses==0.8.1 testfixtures==5.1.1 apache-airflow[crypto,celery,postgres,hive,hdfs,jdbc]==1.9.0 +git+https://github.com/att-comdev/deckhand.git@master#egg=deckhand +git+https://github.com/att-comdev/promenade.git@master#egg=promenade +git+https://github.com/att-comdev/drydock.git@master#egg=drydock +git+https://github.com/att-comdev/armada.git@master#egg=armada # Linting flake8==3.3.0 diff --git a/tests/unit/control/test.conf b/tests/unit/control/test.conf index d9324273..93d1327c 100644 --- a/tests/unit/control/test.conf +++ b/tests/unit/control/test.conf @@ -5,6 +5,8 @@ upgrade_db = false postgresql_airflow_db = postgresql+psycopg2://airflow:password@postgresql.ucp.svc.cluster.local:5432/airflow postgresql_db = postgresql+psycopg2://shipyard:password@postgresql.ucp.svc.cluster.local:5432/shipyard web_server = http://airflow-web-int.ucp.svc.cluster.local:8080/ +airflow_api_connect_timeout = 5 +airflow_api_read_timeout = 60 [deckhand] service_type = deckhand [drydock] diff --git a/tests/unit/plugins/test.conf b/tests/unit/plugins/test.conf new file mode 100644 index 00000000..2f223440 --- /dev/null +++ b/tests/unit/plugins/test.conf @@ -0,0 +1,15 @@ +[keystone_authtoken] +auth_section = keystone_authtoken +auth_type = password +auth_uri = http://keystone-api.ucp.svc.cluster.local:80/v3 +auth_url = http://keystone-api.ucp.svc.cluster.local:80/v3 +auth_version = v3 +delay_auth_decision = true +memcache_secret_key = zwe6wa59AykCCMk4ucOwEbAkmLSXLOYRharO39FYHY0WYlQnxMwTIJna6NBzJskm +memcache_security_strategy = None +memcached_servers = memcached.ucp.svc.cluster.local:11211 +password = password +project_domain_name = default +project_name = service +user_domain_name = default +username = shipyard diff --git a/tests/unit/plugins/test_deckhand_client_factory.py b/tests/unit/plugins/test_deckhand_client_factory.py new file mode 100644 index 00000000..044f4cc7 --- /dev/null +++ b/tests/unit/plugins/test_deckhand_client_factory.py @@ -0,0 +1,29 @@ +# Copyright 2017 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import os + +from deckhand.client import client as deckhand_client + +from shipyard_airflow.plugins.deckhand_client_factory import ( + DeckhandClientFactory +) + + +def test_get_client(): + """Test the get_client functionality""" + cur_dir = os.path.dirname(__file__) + filename = os.path.join(cur_dir, 'test.conf') + client_factory = DeckhandClientFactory(filename) + client = client_factory.get_client() + assert isinstance(client, deckhand_client.Client) diff --git a/tests/unit/plugins/test_deployment_configuration_operator.py b/tests/unit/plugins/test_deployment_configuration_operator.py new file mode 100644 index 00000000..24c55aa6 --- /dev/null +++ b/tests/unit/plugins/test_deployment_configuration_operator.py @@ -0,0 +1,158 @@ +# Copyright 2017 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import mock +import pytest +import yaml + +import airflow +from airflow.exceptions import AirflowException + +try: + from deployment_configuration_operator import ( + DeploymentConfigurationOperator + ) +except ImportError: + from shipyard_airflow.plugins.deployment_configuration_operator import ( + DeploymentConfigurationOperator + ) + +try: + from deckhand_client_factory import DeckhandClientFactory +except ImportError: + from shipyard_airflow.plugins.deckhand_client_factory import ( + DeckhandClientFactory + ) + + +def test_execute_exception(): + """Test that execute results in a failure with bad context""" + + dco = DeploymentConfigurationOperator(main_dag_name="main", + shipyard_conf="shipyard.conf", + task_id="t1") + with pytest.raises(AirflowException) as expected_exc: + # Design revision is not set on xcom pull + dco.execute(context={}) + assert ("Design_revision is not set. Cannot proceed with retrieval" + " of the design configuration") in str(expected_exc) + + +@mock.patch.object(DeploymentConfigurationOperator, 'get_revision_id', + return_value=99) +def test_execute_no_client(p1): + # no keystone authtoken present in configuration + dco = DeploymentConfigurationOperator(main_dag_name="main", + shipyard_conf="shipyard.conf", + task_id="t1") + with pytest.raises(AirflowException) as expected_exc: + dco.execute(context={}) + assert ("Failed to retrieve deployment configuration yaml") in str( + expected_exc) + + +@mock.patch.object(airflow.models.TaskInstance, 'xcom_pull', + return_value=99) +def test_get_revision_id(ti): + """Test that get revision id follows desired exits""" + dco = DeploymentConfigurationOperator(main_dag_name="main", + shipyard_conf="shipyard.conf", + task_id="t1") + ti = airflow.models.TaskInstance(task=mock.MagicMock(), + execution_date="no") + rid = dco.get_revision_id(ti) + assert rid == 99 + + +@mock.patch.object(airflow.models.TaskInstance, 'xcom_pull', + return_value=None) +def test_get_revision_id_none(ti): + """Test that get revision id follows desired exits""" + dco = DeploymentConfigurationOperator(main_dag_name="main", + shipyard_conf="shipyard.conf", + task_id="t1") + ti = airflow.models.TaskInstance(task=mock.MagicMock(), execution_date="o") + with pytest.raises(AirflowException) as expected_exc: + rid = dco.get_revision_id(ti) + assert "Design_revision is not set." in str(expected_exc) + + +def test_get_doc_no_deckhand(): + """Get doc should fail to contact deckhand return a document""" + dco = DeploymentConfigurationOperator(main_dag_name="main", + shipyard_conf="shipyard.conf", + task_id="t1") + with pytest.raises(AirflowException) as expected_exc: + dco.get_doc(99) + assert "Failed to retrieve deployment" in str(expected_exc) + + +def get_m_client(data): + doc_obj = mock.MagicMock() + doc_obj.data = data + doc_obj_l = [doc_obj] + mock_client = mock.MagicMock() + mock_client.revisions.documents = lambda r, rendered, **filters: doc_obj_l + return mock_client + + +@mock.patch.object(DeckhandClientFactory, 'get_client', + return_value=get_m_client('abcdefg')) +def test_get_doc_mock_deckhand(p1): + """Get doc should return a document""" + dco = DeploymentConfigurationOperator(main_dag_name="main", + shipyard_conf="shipyard.conf", + task_id="t1") + + doc = dco.get_doc(99) + assert doc == 'abcdefg' + + +@mock.patch.object(DeckhandClientFactory, 'get_client', + return_value=get_m_client(None)) +def test_get_doc_mock_deckhand_invalid(p1): + """Get doc should return a document""" + dco = DeploymentConfigurationOperator(main_dag_name="main", + shipyard_conf="shipyard.conf", + task_id="t1") + + with pytest.raises(AirflowException) as airflow_ex: + dco.get_doc(99) + assert 'valid deployment-configuration' in str(airflow_ex) + + +sample_deployment_config = """ + physical_provisioner: + deployment_strategy: all-at-once + deploy_interval: 900 + kubernetes_provisioner: + drain_timeout: 3600 + drain_grace_period: 1800 + clear_labels_timeout: 1800 + remove_etcd_timeout: 1800 + etcd_ready_timeout: 600 + armada: + manifest: 'full-site'""" + + +def test_map_config_keys(): + """Should reutrn the new dict from the yaml dict""" + yaml_dict = yaml.safe_load(sample_deployment_config) + dco = DeploymentConfigurationOperator(main_dag_name="main", + shipyard_conf="shipyard.conf", + task_id="t1") + mapped = dco.map_config_keys(yaml_dict) + for key in DeploymentConfigurationOperator.config_keys_defaults: + assert key in mapped + assert mapped.get("physical_provisioner.deploy_interval") == 900 + assert mapped.get("physical_provisioner.verify_timeout") == 60 diff --git a/tests/unit/schemas/__init__.py b/tests/unit/schemas/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/unit/schemas/test_deployment_configuration.py b/tests/unit/schemas/test_deployment_configuration.py new file mode 100644 index 00000000..2700ad5f --- /dev/null +++ b/tests/unit/schemas/test_deployment_configuration.py @@ -0,0 +1,78 @@ +# Copyright 2017 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging +import os +import yaml + +import jsonschema +import pkg_resources +import pytest +import shutil + +from jsonschema.exceptions import ValidationError + +LOG = logging.getLogger(__name__) + + +class BaseSchemaValidationTest(object): + def _test_validate(self, schema, expect_failure, input_files, input): + """validates input yaml against schema. + :param schema: schema yaml file + :param expect_failure: should the validation pass or fail. + :param input_files: pytest fixture used to access the test input files + :param input: test input yaml doc filename""" + schema_dir = pkg_resources.resource_filename('shipyard_airflow', + 'schemas') + schema_filename = os.path.join(schema_dir, schema) + schema_file = open(schema_filename, 'r') + schema = yaml.safe_load(schema_file) + + input_file = input_files.join(input) + instance_file = open(str(input_file), 'r') + instance = yaml.safe_load(instance_file) + + LOG.info('Input: %s, Schema: %s', input_file, schema_filename) + + if expect_failure: + with pytest.raises(ValidationError): + jsonschema.validate(instance['data'], schema['data']) + else: + jsonschema.validate(instance['data'], schema['data']) + + +class TestValidation(BaseSchemaValidationTest): + def test_validate_deploy_config_full_valid(self, input_files): + self._test_validate('deploymentConfiguration.yaml', False, input_files, + 'deploymentConfiguration_full_valid.yaml') + + def test_validate_deploy_config_bad_manifest(self, input_files): + self._test_validate('deploymentConfiguration.yaml', True, input_files, + 'deploymentConfiguration_bad_manifest.yaml') + + def test_validate_deploy_config_minimal_valid(self, input_files): + self._test_validate('deploymentConfiguration.yaml', False, input_files, + 'deploymentConfiguration_minimal_valid.yaml') + + @pytest.fixture(scope='module') + def input_files(self, tmpdir_factory, request): + tmpdir = tmpdir_factory.mktemp('data') + samples_dir = os.path.dirname(str( + request.fspath)) + "/" + "../yaml_samples" + samples = os.listdir(samples_dir) + + for f in samples: + src_file = samples_dir + "/" + f + dst_file = str(tmpdir) + "/" + f + shutil.copyfile(src_file, dst_file) + return tmpdir diff --git a/tests/unit/yaml_samples/deploymentConfiguration_bad_manifest.yaml b/tests/unit/yaml_samples/deploymentConfiguration_bad_manifest.yaml new file mode 100644 index 00000000..7e6d5c32 --- /dev/null +++ b/tests/unit/yaml_samples/deploymentConfiguration_bad_manifest.yaml @@ -0,0 +1,13 @@ +--- +schema: shipyard/DeploymentConfiguration/v1 +metadata: + schema: metadata/Document/v1 + name: deployment-configuration + layeringDefinition: + abstract: false + layer: global + storagePolicy: cleartext +data: + armada: + manifest: + bad_name_field: 'full-site' diff --git a/tests/unit/yaml_samples/deploymentConfiguration_full_valid.yaml b/tests/unit/yaml_samples/deploymentConfiguration_full_valid.yaml new file mode 100644 index 00000000..7501b172 --- /dev/null +++ b/tests/unit/yaml_samples/deploymentConfiguration_full_valid.yaml @@ -0,0 +1,31 @@ +--- +schema: shipyard/DeploymentConfiguration/v1 +metadata: + schema: metadata/Document/v1 + name: deployment-configuration + layeringDefinition: + abstract: false + layer: global + storagePolicy: cleartext +data: + physical_provisioner: + deployment_strategy: all-at-once + deploy_interval: 30 + deploy_timeout: 3600 + destroy_interval: 30 + destroy_timeout: 900 + join_wait: 120 + prepare_node_interval: 30 + prepare_node_timeout: 1000 + prepare_site_interval: 10 + prepare_site_timeout: 300 + verify_interval: 10 + verify_timeout: 60 + kubernetes_provisioner: + drain_timeout: 3600 + drain_grace_period: 1800 + clear_labels_timeout: 1800 + remove_etcd_timeout: 1800 + etcd_ready_timeout: 600 + armada: + manifest: 'full-site' diff --git a/tests/unit/yaml_samples/deploymentConfiguration_minimal_valid.yaml b/tests/unit/yaml_samples/deploymentConfiguration_minimal_valid.yaml new file mode 100644 index 00000000..438152e2 --- /dev/null +++ b/tests/unit/yaml_samples/deploymentConfiguration_minimal_valid.yaml @@ -0,0 +1,12 @@ +--- +schema: shipyard/DeploymentConfiguration/v1 +metadata: + schema: metadata/Document/v1 + name: deployment-configuration + layeringDefinition: + abstract: false + layer: global + storagePolicy: cleartext +data: + armada: + manifest: 'full-site' diff --git a/tox.ini b/tox.ini index f0b9cbcf..f57ef260 100644 --- a/tox.ini +++ b/tox.ini @@ -1,13 +1,15 @@ [tox] -envlist = py35, pep8, coverage, bandit, docs +envlist = unit, pep8, coverage, bandit, docs [testenv] -deps = -r{toxinidir}/requirements.txt - -r{toxinidir}/test-requirements.txt setenv= PYTHONWARNING=all basepython=python3.5 -commands= +deps = -r{toxinidir}/requirements.txt + -r{toxinidir}/test-requirements.txt + +[testenv:unit] +commands = pytest \ {posargs} @@ -33,7 +35,11 @@ filename = *.py # NOTE(Bryan Strassner) ignoring F841 because of the airflow example pattern # of naming variables even if they aren't used for DAGs and Operators. # Doing so adds readability and context in this case. -ignore = F841 +# TODO(Bryan Strassner) The hacking rules defined as ignored below in many +# cases need to be un-ignored and fixed up. These are ignored because of +# the method in which test requirements bring in the hacking rules from +# other projects. +ignore = F841, H101, H201, H210, H238, H301, H304, H306, H401, H403, H404, H405 # NOTE(Bryan Strassner) excluding 3rd party and generated code that is brought into the # codebase. exclude = .venv,.git,.tox,build,dist,*plugins/rest_api_plugin.py,*lib/python*,*egg,alembic/env.py,docs @@ -47,9 +53,10 @@ commands = [testenv:coverage] commands = pytest \ + {posargs} \ --cov-branch \ - --cov-report term-missing:skip-covered \ - --cov-config .coveragerc \ + --cov-report=term-missing:skip-covered \ + --cov-config=.coveragerc \ --cov=shipyard_airflow \ --cov=shipyard_client \ --cov-report=html