diff --git a/charts/shipyard/values.yaml b/charts/shipyard/values.yaml index 5ff9477d..6386f6f0 100644 --- a/charts/shipyard/values.yaml +++ b/charts/shipyard/values.yaml @@ -389,6 +389,11 @@ conf: shipyard: base: web_server: + pool_size: 15 + pool_pre_ping: true + pool_timeout: 30 + pool_overflow: 10 + connection_recycle: -1 profiler: false shipyard: service_type: shipyard @@ -413,6 +418,8 @@ conf: deckhand_client_read_timeout: 300 validation_connect_timeout: 5 validation_read_timeout: 300 + notes_connect_timeout: 5 + notes_read_timeout: 10 airflow: worker_endpoint_scheme: 'http' worker_port: 8793 diff --git a/src/bin/shipyard_airflow/shipyard_airflow/plugins/drydock_base_operator.py b/src/bin/shipyard_airflow/shipyard_airflow/plugins/drydock_base_operator.py index 3e3eb93c..26a848f1 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/plugins/drydock_base_operator.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/plugins/drydock_base_operator.py @@ -419,6 +419,7 @@ class DrydockBaseOperator(UcpBaseOperator): ) pass _report_task_info(task_id, task_result, task_status) + self._create_drydock_results_notes(task_id, task_result) # for each child, report only the step info, do not add to overall # success list. @@ -436,6 +437,97 @@ class DrydockBaseOperator(UcpBaseOperator): # deduplicate and return return set(success_nodes) + def _create_drydock_results_notes(self, dd_task_id, task_result): + """Generate a note in the database with a url to the builddata + + :param dd_task_id: the id of the Drydock task. Note that `self.task_id` + is the workflow task_id, not the same drydock task_id. + :param task_result: the task result object containing the info needed + to produce a note. + + Example task result: + { + 'status': 'success', + 'kind': 'Status', + 'failures': [], + 'apiVersion': 'v1.0', + 'metadata': {}, + 'details': { + 'errorCount': 0, + 'messageList': [{ + 'error': False, + 'context': 'n2', + 'context_type': 'node', + 'extra': '{}', + 'ts': '2018-10-12 16:09:53.778696', + 'message': 'Acquiring node n2 for deployment' + }] + }, + 'successes': ['n2'], + 'links': [{ + 'rel': 'detail_logs', + 'href': 'http://drydock-api.ucp.svc.cluster.local:9000/api/...' + }], + 'reason': None, + 'message': None + } + """ + for msg in task_result.get('details', {}).get('messageList', []): + try: + if msg.get('message'): + error = msg.get('error', False) + msg_text = "{}:{}:{}{}".format( + msg.get('context_type', 'N/A'), + msg.get('context', 'N/A'), + msg.get('message'), + " (error)" if error else "") + self.notes_helper.make_step_note( + action_id=self.action_id, + step_id=self.task_id, + note_val=msg_text, + subject=dd_task_id, + sub_type="Task Message", + note_timestamp=msg.get('ts'), + verbosity=3) + except Exception as ex: + LOG.warn("Error while creating a task result note, " + "processing continues. Source info %s", msg) + LOG.exception(ex) + + links = task_result.get('links', []) + for link in links: + try: + rel = link.get('rel') + href = link.get('href') + extra = _get_context_info_from_url(href) + if rel and href: + self.notes_helper.make_step_note( + action_id=self.action_id, + step_id=self.task_id, + note_val="{}{}".format(rel, extra), + subject=dd_task_id, + sub_type="Linked Task Info", + link_url=href, + is_auth_link=True, + verbosity=5) + except Exception as ex: + LOG.warn("Error while creating a link-based note, " + "processing continues. Source info: %s", link) + LOG.exception(ex) + + +def _get_context_info_from_url(url_string): + """Examine a url for helpful info for use in a note + + :param url_string: The url to examine + :returns: String of helpful information + Strings returned should include a leading space. + """ + if url_string.endswith("/builddata"): + return " - builddata" + # Other "helpful" patterns would show up here. + return "" + def gen_node_name_filter(node_names): """Generates a drydock compatible node filter using only node names diff --git a/src/bin/shipyard_airflow/shipyard_airflow/plugins/drydock_nodes.py b/src/bin/shipyard_airflow/shipyard_airflow/plugins/drydock_nodes.py index 89c09378..d5f917d4 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/plugins/drydock_nodes.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/plugins/drydock_nodes.py @@ -78,22 +78,60 @@ class DrydockNodesOperator(DrydockBaseOperator): # All groups "complete" (as they're going to be). Report summary dgm.report_group_summary() dgm.report_node_summary() + self._gen_summary_notes(dgm) + if dgm.critical_groups_failed(): raise AirflowException( "One or more deployment groups marked as critical have failed" ) else: LOG.info("All critical groups have met their success criteria") - # TODO (bryan-strassner) it is very possible that many nodes failed - # deployment, but all critical groups had enough success to - # continue processing. This will be non-obvious to the casual - # observer of the workflow. A likely enhancement is to allow - # notes be added to the shipyard action associated with this - # workflow that would be reported back to the end user doing a - # describe of the action. This will require new database structures - # to hold the notes, and a means to insert the notes. A shared - # functionality in the base ucp operator or a common module would - # be a reasonable way to support this. + + def _gen_summary_notes(self, dgm): + """Generate notes for the step summarizing the deployment results + + :param dgm: The deployment group manager containing results + """ + # Assemble the nodes into a note + stages = [Stage.NOT_STARTED, Stage.DEPLOYED, Stage.FAILED] + nodes_by_stage = [] + for stage in stages: + nodes = dgm.get_nodes(stage=stage) + if nodes: + nodes_by_stage.append("{}: {}".format( + stage, ", ".join(nodes))) + if nodes_by_stage: + self.notes_helper.make_step_note( + action_id=self.action_id, + step_id=self.task_id, + note_val="; ".join(nodes_by_stage), + subject=self.main_dag_name, + sub_type="Node Deployment", + verbosity=1) + + # assemble the group info into a note + # rotate list into a dict by stage + groups_stages = {} + for group in dgm.group_list(): + if group.stage not in groups_stages: + groups_stages[group.stage] = [] + groups_stages[group.stage].append("{}{}".format( + group.name, "(critical)" if group.critical else "")) + + # iterate stage keyed dictionary for text summary + groups_by_stage = [ + "{}: {}".format(stage, ", ".join(group_list)) + for stage, group_list in groups_stages.items() + ] + + if groups_by_stage: + self.notes_helper.make_step_note( + action_id=self.action_id, + step_id=self.task_id, + note_val="; ".join(groups_by_stage), + subject=self.main_dag_name, + sub_type="Deployment Groups", + verbosity=1) def _setup_configured_values(self): """Sets self. values from the deployment configuration""" diff --git a/src/bin/shipyard_airflow/shipyard_airflow/plugins/ucp_base_operator.py b/src/bin/shipyard_airflow/shipyard_airflow/plugins/ucp_base_operator.py index 58b51d8a..1a396301 100644 --- a/src/bin/shipyard_airflow/shipyard_airflow/plugins/ucp_base_operator.py +++ b/src/bin/shipyard_airflow/shipyard_airflow/plugins/ucp_base_operator.py @@ -21,12 +21,14 @@ from airflow.exceptions import AirflowException from airflow.models import BaseOperator from airflow.plugins_manager import AirflowPlugin from airflow.utils.decorators import apply_defaults +import sqlalchemy try: from deckhand_client_factory import DeckhandClientFactory import service_endpoint from get_k8s_logs import get_pod_logs from get_k8s_logs import K8sLoggingException + from service_token import shipyard_service_token from xcom_puller import XcomPuller except ImportError: from shipyard_airflow.plugins.deckhand_client_factory import \ @@ -34,10 +36,20 @@ except ImportError: from shipyard_airflow.plugins import service_endpoint from shipyard_airflow.plugins.get_k8s_logs import get_pod_logs from shipyard_airflow.plugins.get_k8s_logs import K8sLoggingException + from shipyard_airflow.plugins.service_token import shipyard_service_token from shipyard_airflow.plugins.xcom_puller import XcomPuller from shipyard_airflow.common.document_validators.document_validation_utils \ import DocumentValidationUtils +from shipyard_airflow.common.notes.notes import NotesManager +from shipyard_airflow.common.notes.notes_helper import NotesHelper +from shipyard_airflow.common.notes.storage_impl_db import \ + ShipyardSQLNotesStorage + +# Configuration sections +BASE = 'base' +K8S_LOGS = 'k8s_logs' +REQUESTS_CONFIG = 'requests_config' LOG = logging.getLogger(__name__) @@ -89,16 +101,22 @@ class UcpBaseOperator(BaseOperator): self.shipyard_conf = shipyard_conf self.start_time = datetime.now() self.xcom_push_flag = xcom_push + # lazy init field to hold a shipyard_db_engine + self._shipyard_db_engine = None def execute(self, context): # Setup values that depend on the shipyard configuration self.doc_utils = _get_document_util(self.shipyard_conf) self.endpoints = service_endpoint.ServiceEndpoints(self.shipyard_conf) + # Read and parse shiyard.conf + self.config = configparser.ConfigParser() + self.config.read(self.shipyard_conf) + # Execute Airship base function self.ucp_base(context) - # Execute base function + # Execute base function for child operator self.run_base(context) if self.continue_processing: @@ -119,12 +137,13 @@ class UcpBaseOperator(BaseOperator): LOG.info("Running Airship Base Operator...") - # Read and parse shiyard.conf - config = configparser.ConfigParser() - config.read(self.shipyard_conf) + # Configure the notes helper for this run of an operator + # establishes self.notes_helper + self._setup_notes_helper() - # Initialize variable - self.ucp_namespace = config.get('k8s_logs', 'ucp_namespace') + # Initialize variable that indicates the kubernetes namespace for the + # Airship components + self.ucp_namespace = self.config.get(K8S_LOGS, 'ucp_namespace') # Define task_instance self.task_instance = context['task_instance'] @@ -137,6 +156,8 @@ class UcpBaseOperator(BaseOperator): # Set up other common-use values self.action_id = self.action_info['id'] + # extract the `task` or `step` name for easy access + self.task_id = self.task_instance.task_id self.revision_id = self.action_info['committed_rev_id'] self.action_params = self.action_info.get('parameters', {}) self.design_ref = self._deckhand_design_ref() @@ -250,6 +271,58 @@ class UcpBaseOperator(BaseOperator): # broken. Raise an Airflow Exception. raise AirflowException(ex) + def _get_shipyard_db_engine(self): + """Lazy initialize an engine for the Shipyard database. + + :returns: a SQLAlchemy engine for the Shipyard database. + + Developer's Note: Initially the idea was to use the PostgresHook and + retrieve an engine from there as is done with the concurrency check, + but since we have easy access to a configuration file, this does + direct SQLAlchemy to get the engine. By using the config, the database + connection is not exposed as environment variables -- which is one way + that Airflow registers database connections for use by the dbApiHook + """ + if self._shipyard_db_engine is None: + connection_string = self.config.get(BASE, 'postgresql_db') + pool_size = self.config.getint(BASE, 'pool_size') + max_overflow = self.config.getint(BASE, 'pool_overflow') + pool_pre_ping = self.config.getboolean(BASE, 'pool_pre_ping') + pool_recycle = self.config.getint(BASE, 'connection_recycle') + pool_timeout = self.config.getint(BASE, 'pool_timeout') + self._shipyard_db_engine = sqlalchemy.create_engine( + connection_string, pool_size=pool_size, + max_overflow=max_overflow, + pool_pre_ping=pool_pre_ping, + pool_recycle=pool_recycle, + pool_timeout=pool_timeout + ) + LOG.info("Initialized Shipyard database connection with pool " + "size: %d, max overflow: %d, pool pre ping: %s, pool " + "recycle: %d, and pool timeout: %d", + pool_size, max_overflow, + pool_pre_ping, pool_recycle, + pool_timeout) + + return self._shipyard_db_engine + + @shipyard_service_token + def _token_getter(self): + # Generator method to get a shipyard service token + return self.svc_token + + def _setup_notes_helper(self): + """Setup a notes helper for use by all descendent operators""" + connect_timeout = self.config.get(REQUESTS_CONFIG, + 'notes_connect_timeout') + read_timeout = self.config.get(REQUESTS_CONFIG, 'notes_read_timeout') + self.notes_helper = NotesHelper( + NotesManager( + storage=ShipyardSQLNotesStorage(self._get_shipyard_db_engine), + get_token=self._token_getter, + connect_timeout=connect_timeout, + read_timeout=read_timeout)) + def _get_document_util(shipyard_conf): """Retrieve an instance of the DocumentValidationUtils""" diff --git a/src/bin/shipyard_airflow/tests/unit/plugins/test.conf b/src/bin/shipyard_airflow/tests/unit/plugins/test.conf index 24dca74b..5da374ad 100644 --- a/src/bin/shipyard_airflow/tests/unit/plugins/test.conf +++ b/src/bin/shipyard_airflow/tests/unit/plugins/test.conf @@ -1,3 +1,16 @@ +[base] +postgresl_db = postgresql+psycopg2://shipyard:changeme@postgresql.ucp:5432/shipyard +pool_size = 15 +pool_pre_ping = true +pool_timeout = 30 +pool_overflow = 10 +connection_recycle = -1 +profiler = false + +[requests_config] +notes_connect_timeout = 5 +notes_read_timeout = 10 + [keystone_authtoken] auth_section = keystone_authtoken auth_type = password diff --git a/src/bin/shipyard_airflow/tests/unit/plugins/test_drydock_nodes_operator.py b/src/bin/shipyard_airflow/tests/unit/plugins/test_drydock_nodes_operator.py index 83964781..3987ca5a 100644 --- a/src/bin/shipyard_airflow/tests/unit/plugins/test_drydock_nodes_operator.py +++ b/src/bin/shipyard_airflow/tests/unit/plugins/test_drydock_nodes_operator.py @@ -29,6 +29,9 @@ from shipyard_airflow.common.deployment_group.deployment_group import ( from shipyard_airflow.common.deployment_group.deployment_group_manager import ( DeploymentGroupManager ) +from shipyard_airflow.common.notes.notes import NotesManager +from shipyard_airflow.common.notes.notes_helper import NotesHelper +from shipyard_airflow.common.notes.storage_impl_mem import MemoryNotesStorage from shipyard_airflow.plugins.drydock_base_operator import ( gen_node_name_filter, @@ -213,6 +216,13 @@ def _gen_pe_func(mode, stand_alone=False): else: return _func_self +def get_notes_helper(): + """Setup a notes helper using the in-memory storage module""" + return NotesHelper(NotesManager( + storage=MemoryNotesStorage(), + get_token=lambda: "fake_token") + ) + class TestDrydockNodesOperator: def test_default_deployment_strategy(self): @@ -290,6 +300,10 @@ class TestDrydockNodesOperator: DeploymentConfigurationOperator.config_keys_defaults ) op.design_ref = {} + op.notes_helper = get_notes_helper() + op.action_id = "ABCDEFGHIJKLMNOPQRSTUVWXYZ" + op.task_id = "prepare_and_deploy_nodes" + op.do_execute() assert get_dgm.call_count == 1 assert nl.call_count == 1 @@ -312,6 +326,10 @@ class TestDrydockNodesOperator: DeploymentConfigurationOperator.config_keys_defaults ) op.design_ref = {} + op.notes_helper = get_notes_helper() + op.action_id = "ABCDEFGHIJKLMNOPQRSTUVWXYZ" + op.task_id = "prepare_and_deploy_nodes" + op.do_execute() assert get_dgm.call_count == 1 @@ -459,6 +477,10 @@ class TestDrydockNodesOperator: DeploymentConfigurationOperator.config_keys_defaults ) op.design_ref = {"a": "b"} + op.notes_helper = get_notes_helper() + op.action_id = "ABCDEFGHIJKLMNOPQRSTUVWXYZ" + op.task_id = "prepare_and_deploy_nodes" + op.do_execute() assert "critical groups have met their success criteria" in caplog.text