diff --git a/src/bin/shipyard_airflow/shipyard_airflow/dags/common_step_factory.py b/src/bin/shipyard_airflow/shipyard_airflow/dags/common_step_factory.py index beba0cc1..b11aa1da 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/dags/common_step_factory.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/dags/common_step_factory.py @@ -19,6 +19,7 @@ from airflow.operators.subdag_operator import SubDagOperator try: # Operators are loaded from being registered to airflow.operators # in a deployed fashion + from airflow.operators import ArmadaTestReleasesOperator from airflow.operators import ConcurrencyCheckOperator from airflow.operators import DeckhandRetrieveRenderedDocOperator from airflow.operators import DeploymentConfigurationOperator @@ -27,6 +28,8 @@ try: from airflow.operators import DrydockRelabelNodesOperator except ImportError: # for local testing, they are loaded from their source directory + from shipyard_airflow.plugins.armada_test_releases import \ + ArmadaTestReleasesOperator from shipyard_airflow.plugins.concurrency_check_operator import \ ConcurrencyCheckOperator from shipyard_airflow.plugins.deckhand_retrieve_rendered_doc import \ @@ -219,6 +222,19 @@ class CommonStepFactory(object): on_failure_callback=step_failure_handler, dag=self.dag) + def get_armada_test_releases(self, task_id=dn.ARMADA_TEST_RELEASES): + """Generate the armada_test_releases step + + Armada invokes Helm tests for all deployed releases or a targeted + release specified by the "release" parameter. + """ + return ArmadaTestReleasesOperator( + shipyard_conf=config_path, + main_dag_name=self.parent_dag_name, + task_id=task_id, + on_failure_callback=step_failure_handler, + dag=self.dag) + def get_unguarded_destroy_servers(self, task_id=dn.DESTROY_SERVER): """Generates an unguarded destroy server step. diff --git a/src/bin/shipyard_airflow/shipyard_airflow/dags/dag_names.py b/src/bin/shipyard_airflow/shipyard_airflow/dags/dag_names.py index 05f83022..3fba537f 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/dags/dag_names.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/dags/dag_names.py @@ -22,6 +22,7 @@ RELABEL_NODES_DAG_NAME = 'relabel_nodes' # Steps ACTION_XCOM = 'action_xcom' +ARMADA_TEST_RELEASES = 'armada_test_releases' CONCURRENCY_CHECK = 'dag_concurrency_check' CREATE_ACTION_TAG = 'create_action_tag' DECIDE_AIRFLOW_UPGRADE = 'decide_airflow_upgrade' diff --git a/src/bin/shipyard_airflow/shipyard_airflow/dags/test_site.py b/src/bin/shipyard_airflow/shipyard_airflow/dags/test_site.py new file mode 100644 index 00000000..a0578321 --- /dev/null +++ b/src/bin/shipyard_airflow/shipyard_airflow/dags/test_site.py @@ -0,0 +1,58 @@ +# Copyright 2018 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from datetime import timedelta + +import airflow +from airflow import DAG + +try: + from common_step_factory import CommonStepFactory +except ImportError: + from shipyard_airflow.dags.common_step_factory import CommonStepFactory + +"""test site""" + +PARENT_DAG_NAME = 'test_site' + +default_args = { + 'owner': 'airflow', + 'depends_on_past': False, + 'start_date': airflow.utils.dates.days_ago(1), + 'email': [''], + 'email_on_failure': False, + 'email_on_retry': False, + 'provide_context': True, + 'retries': 0, + 'retry_delay': timedelta(seconds=30), +} + +dag = DAG(PARENT_DAG_NAME, default_args=default_args, schedule_interval=None) + +step_factory = CommonStepFactory(parent_dag_name=PARENT_DAG_NAME, + dag=dag, + default_args=default_args, + action_type='site') + +action_xcom = step_factory.get_action_xcom() +preflight = step_factory.get_preflight() +deployment_configuration = step_factory.get_deployment_configuration() +test_releases = step_factory.get_armada_test_releases() + +# DAG Wiring +preflight.set_upstream(action_xcom) +deployment_configuration.set_upstream(action_xcom) +test_releases.set_upstream([ + deployment_configuration, + preflight +]) diff --git a/src/bin/shipyard_airflow/shipyard_airflow/plugins/armada_test_releases.py b/src/bin/shipyard_airflow/shipyard_airflow/plugins/armada_test_releases.py new file mode 100644 index 00000000..3cd7f2d5 --- /dev/null +++ b/src/bin/shipyard_airflow/shipyard_airflow/plugins/armada_test_releases.py @@ -0,0 +1,81 @@ +# Copyright 2018 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging + +from airflow.exceptions import AirflowException +from airflow.plugins_manager import AirflowPlugin + +try: + from armada_base_operator import ArmadaBaseOperator +except ImportError: + from shipyard_airflow.plugins.armada_base_operator import \ + ArmadaBaseOperator +from armada.exceptions import api_exceptions as errors + +LOG = logging.getLogger(__name__) + + +class ArmadaTestReleasesOperator(ArmadaBaseOperator): + """Armada Test Releases Operator + + Invoke the Helm test of every deployed release or a targeted release + specified by the "release" parameter. + """ + def do_execute(self): + # Retrieve cleanup flag from action params + cleanup = self.action_params.get('cleanup') + if cleanup: + self.query['cleanup'] = cleanup + + release = self.action_params.get('release') + if release: + # Invoke Helm tests for specified release + self._test_release(release) + else: + # Invoke Helm tests for all deployed releases + # TODO(@drewwalters96): Support execution of tests in parallel. + for release_list in self.get_releases().values(): + for release in release_list: + self._test_release(release) + + def _test_release(self, release): + """Invoke Helm tests on a specified release + + Invokes Helm tests on a specified release using the Armada client + and logs all test results. + """ + LOG.info("Invoking Helm tests for release '{}'".format(release)) + try: + armada_test_release = self.armada_client.get_test_release( + release=release, + query=self.query, + timeout=None) + except errors.ClientError as client_error: + raise AirflowException(client_error) + + if armada_test_release: + LOG.info("Successfully executed Helm tests for release " + "'{}'".format(release)) + LOG.info(armada_test_release) + else: + # Dump logs from Armada API pods + self.get_k8s_logs() + raise AirflowException("Failed to execute Helms test for " + "release '{}'!".format(release)) + + +class ArmadaTestReleasesOperatorPlugin(AirflowPlugin): + """Creates ArmadaTestReleasesOperator in Airflow.""" + name = 'armada_test_releases_operator' + operators = [ArmadaTestReleasesOperator] diff --git a/src/bin/shipyard_airflow/tests/unit/plugins/test_armada_test_releases_operator.py b/src/bin/shipyard_airflow/tests/unit/plugins/test_armada_test_releases_operator.py new file mode 100644 index 00000000..e68e5814 --- /dev/null +++ b/src/bin/shipyard_airflow/tests/unit/plugins/test_armada_test_releases_operator.py @@ -0,0 +1,108 @@ +# Copyright 2018 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Tests ArmadaTestReleasesOperator functionality""" +import os +from unittest import mock + +from airflow.exceptions import AirflowException +import pytest + +from shipyard_airflow.plugins.armada_base_operator import \ + ArmadaBaseOperator +from shipyard_airflow.plugins.armada_test_releases import \ + ArmadaTestReleasesOperator +from shipyard_airflow.plugins.ucp_base_operator import \ + UcpBaseOperator + + + +CONF_FILE = os.path.join(os.path.dirname(__file__), 'test.conf') + +ACTION_PARAMS = { + 'cleanup': True, + 'release': 'glance' +} + +RELEASES = { + 'ucp': ['armada', 'deckhand', 'shipyard'], + 'openstack': ['glance', 'heat', 'horizon', 'keystone'] +} + + +class TestArmadaTestReleasesOperator: + @mock.patch('shipyard_airflow.plugins.armada_test_releases.LOG.info') + @mock.patch.object(ArmadaBaseOperator, 'armada_client', create=True) + @mock.patch.object(ArmadaBaseOperator, 'get_releases', + return_value=RELEASES) + @mock.patch.object(ArmadaBaseOperator, 'get_tiller_info') + def test_do_execute(self, mock_tiller_info, mock_releases, mock_client, + mock_logs): + op = ArmadaTestReleasesOperator(main_dag_name='main', + shipyard_conf=CONF_FILE, + task_id='t1') + op.action_params = dict() + op.do_execute() + + # Verify Armada client called to test every release + calls = list() + for release_list in RELEASES.values(): + for release in release_list: + calls.append(mock.call( + release=release, + query=dict(), + timeout=None)) + mock_client.get_test_release.assert_has_calls(calls, any_order=True) + + # Verify test results logged + mock_logs.assert_called_with(mock_client.get_test_release.return_value) + + @mock.patch('shipyard_airflow.plugins.armada_test_releases.LOG.info') + @mock.patch.object(ArmadaBaseOperator, 'armada_client', create=True) + @mock.patch.object(ArmadaBaseOperator, 'get_tiller_info') + def test_do_execute_with_params(self, mock_tiller, mock_client, mock_logs): + op = ArmadaTestReleasesOperator(main_dag_name='main', + shipyard_conf=CONF_FILE, + task_id='t1') + op.action_params = ACTION_PARAMS + op.do_execute() + + # Verify Armada client called for single release with action params + cleanup = ACTION_PARAMS['cleanup'] + release = ACTION_PARAMS['release'] + mock_client.get_test_release.assert_called_once_with( + release=release, + query=dict(cleanup=cleanup), + timeout=None) + + # Verify test results logged + mock_logs.assert_called_with(mock_client.get_test_release.return_value) + + @mock.patch.object(ArmadaBaseOperator, 'armada_client', create=True) + @mock.patch.object(ArmadaBaseOperator, 'get_releases', + return_value=RELEASES) + @mock.patch.object(ArmadaBaseOperator, 'get_tiller_info') + @mock.patch.object(UcpBaseOperator, 'get_k8s_logs') + def test_do_execute_fail(self, mock_k8s_logs, mock_tiller_info, + mock_releases, mock_client): + mock_client.get_test_release.return_value = None + + op = ArmadaTestReleasesOperator(main_dag_name='main', + shipyard_conf=CONF_FILE, + task_id='t1') + op.action_params = dict() + + # Verify errors logged to pods + with pytest.raises(AirflowException): + op.do_execute() + mock_k8s_logs.assert_called_once()