From b9b0e27de01d449b55d0d4718997dfbe1891a27b Mon Sep 17 00:00:00 2001 From: Anthony Lin Date: Mon, 16 Apr 2018 10:43:42 +0000 Subject: [PATCH] Add UCP Base Operator 1) Refactor Drydock Base Operator to make use of the UCP Base Operator instead 2) Dump logs from Drydock Pods when there are Exceptions Change-Id: I3fbe03d13b5fc89a503cfb2c3c25751076718554 --- charts/shipyard/values.yaml | 2 + etc/shipyard/shipyard.conf.sample | 10 ++ shipyard_airflow/conf/config.py | 11 ++ .../plugins/drydock_base_operator.py | 139 ++++++++--------- .../plugins/drydock_deploy_nodes.py | 9 +- .../plugins/drydock_destroy_nodes.py | 13 +- .../plugins/drydock_prepare_nodes.py | 1 - .../plugins/drydock_prepare_site.py | 1 - .../plugins/drydock_validate_design.py | 16 +- .../plugins/drydock_verify_site.py | 1 - shipyard_airflow/plugins/ucp_base_operator.py | 143 ++++++++++++++++++ tools/resources/shipyard.conf | 2 + 12 files changed, 248 insertions(+), 100 deletions(-) create mode 100644 shipyard_airflow/plugins/ucp_base_operator.py diff --git a/charts/shipyard/values.yaml b/charts/shipyard/values.yaml index 7f47a94b..fa7bf4cd 100644 --- a/charts/shipyard/values.yaml +++ b/charts/shipyard/values.yaml @@ -389,6 +389,8 @@ conf: airflow: worker_endpoint_scheme: 'http' worker_port: 8793 + k8s_logs: + ucp_namespace: 'ucp' airflow_config_file: path: /usr/local/airflow/airflow.cfg airflow: diff --git a/etc/shipyard/shipyard.conf.sample b/etc/shipyard/shipyard.conf.sample index a21f2ba6..319808c0 100644 --- a/etc/shipyard/shipyard.conf.sample +++ b/etc/shipyard/shipyard.conf.sample @@ -249,6 +249,16 @@ #auth_section = +[k8s_logs] + +# +# From shipyard_airflow +# + +# The namespace of the UCP Pods (string value) +#ucp_namespace = ucp + + [logging] # diff --git a/shipyard_airflow/conf/config.py b/shipyard_airflow/conf/config.py index 278122c8..3575ca18 100644 --- a/shipyard_airflow/conf/config.py +++ b/shipyard_airflow/conf/config.py @@ -202,6 +202,17 @@ SECTIONS = [ ), ] ), + ConfigSection( + name='k8s_logs', + title='Parameters for K8s Pods Logs', + options=[ + cfg.StrOpt( + 'ucp_namespace', + default='ucp', + help='Namespace of UCP Pods' + ), + ] + ), ] diff --git a/shipyard_airflow/plugins/drydock_base_operator.py b/shipyard_airflow/plugins/drydock_base_operator.py index 9c915ce7..9d61bd6c 100644 --- a/shipyard_airflow/plugins/drydock_base_operator.py +++ b/shipyard_airflow/plugins/drydock_base_operator.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import json import logging import os @@ -19,7 +18,6 @@ import time from urllib.parse import urlparse from airflow.exceptions import AirflowException -from airflow.models import BaseOperator from airflow.plugins_manager import AirflowPlugin from airflow.utils.decorators import apply_defaults @@ -28,10 +26,12 @@ import drydock_provisioner.drydock_client.session as session from drydock_provisioner import error as errors from service_endpoint import ucp_service_endpoint from service_token import shipyard_service_token -from xcom_puller import XcomPuller +from ucp_base_operator import UcpBaseOperator + +LOG = logging.getLogger(__name__) -class DrydockBaseOperator(BaseOperator): +class DrydockBaseOperator(UcpBaseOperator): """Drydock Base Operator @@ -49,14 +49,10 @@ class DrydockBaseOperator(BaseOperator): drydock_svc_endpoint=None, drydock_svc_type='physicalprovisioner', drydock_task_id=None, - main_dag_name=None, node_filter=None, redeploy_server=None, - shipyard_conf=None, - sub_dag_name=None, svc_session=None, svc_token=None, - xcom_push=True, *args, **kwargs): """Initialization of DrydockBaseOperator object. @@ -66,62 +62,39 @@ class DrydockBaseOperator(BaseOperator): :param drydock_svc_endpoint: Drydock Service Endpoint :param drydock_svc_type: Drydock Service Type :param drydock_task_id: Drydock Task ID - :param main_dag_name: Parent Dag :param node_filter: A filter for narrowing the scope of the task. Valid fields are 'node_names', 'rack_names', 'node_tags'. Note that node filter is turned off by default, i.e. all nodes will be deployed. :param redeploy_server: Server to be redeployed - :param shipyard_conf: Location of shipyard.conf - :param sub_dag_name: Child Dag :param svc_session: Keystone Session :param svc_token: Keystone Token - :param xcom_push: xcom usage The Drydock operator assumes that prior steps have set xcoms for the action and the deployment configuration """ - super(DrydockBaseOperator, self).__init__(*args, **kwargs) + super(DrydockBaseOperator, + self).__init__( + pod_selector_pattern=[{'pod_pattern': 'drydock-api', + 'container': 'drydock-api'}], + *args, **kwargs) self.deckhand_design_ref = deckhand_design_ref self.deckhand_svc_type = deckhand_svc_type self.drydock_client = drydock_client self.drydock_svc_endpoint = drydock_svc_endpoint self.drydock_svc_type = drydock_svc_type self.drydock_task_id = drydock_task_id - self.main_dag_name = main_dag_name self.node_filter = node_filter self.redeploy_server = redeploy_server - self.shipyard_conf = shipyard_conf - self.sub_dag_name = sub_dag_name self.svc_session = svc_session self.svc_token = svc_token - self.xcom_push_flag = xcom_push - def execute(self, context): - - # Execute drydock base function - self.drydock_base(context) - - # Exeute child function - self.do_execute() - - def drydock_base(self, context): - # Initialize Variables - drydock_url = None - dd_session = None - - # Define task_instance - task_instance = context['task_instance'] - - # Set up and retrieve values from xcom - self.xcom_puller = XcomPuller(self.main_dag_name, task_instance) - self.action_info = self.xcom_puller.get_action_info() - self.dc = self.xcom_puller.get_deployment_configuration() + def run_base(self, context): # Logs uuid of action performed by the Operator - logging.info("DryDock Operator for action %s", self.action_info['id']) + LOG.info("DryDock Operator for action %s", self.action_info['id']) # Retrieve information of the server that we want to redeploy if user # executes the 'redeploy_server' dag @@ -131,18 +104,19 @@ class DrydockBaseOperator(BaseOperator): self.action_info['parameters']['server-name']) if self.redeploy_server: - logging.info("Server to be redeployed is %s", - self.redeploy_server) + LOG.info("Server to be redeployed is %s", + self.redeploy_server) self.node_filter = self.redeploy_server else: - raise AirflowException('Unable to retrieve information of ' - 'node to be redeployed!') + raise AirflowException('%s was unable to retrieve the ' + 'server to be redeployed.' + % self.__class__.__name__) # Retrieve Endpoint Information self.drydock_svc_endpoint = ucp_service_endpoint( self, svc_type=self.drydock_svc_type) - logging.info("Drydock endpoint is %s", self.drydock_svc_endpoint) + LOG.info("Drydock endpoint is %s", self.drydock_svc_endpoint) # Parse DryDock Service Endpoint drydock_url = urlparse(self.drydock_svc_endpoint) @@ -151,25 +125,25 @@ class DrydockBaseOperator(BaseOperator): # information. # The DrydockSession will care for TCP connection pooling # and header management - logging.info("Build DryDock Session") + LOG.info("Build DryDock Session") dd_session = session.DrydockSession(drydock_url.hostname, port=drydock_url.port, auth_gen=self._auth_gen) # Raise Exception if we are not able to set up the session if dd_session: - logging.info("Successfully Set Up DryDock Session") + LOG.info("Successfully Set Up DryDock Session") else: raise AirflowException("Failed to set up Drydock Session!") # Use the DrydockSession to build a DrydockClient that can # be used to make one or more API calls - logging.info("Create DryDock Client") + LOG.info("Create DryDock Client") self.drydock_client = client.DrydockClient(dd_session) # Raise Exception if we are not able to build the client if self.drydock_client: - logging.info("Successfully Set Up DryDock client") + LOG.info("Successfully Set Up DryDock client") else: raise AirflowException("Failed to set up Drydock Client!") @@ -177,7 +151,7 @@ class DrydockBaseOperator(BaseOperator): deckhand_svc_endpoint = ucp_service_endpoint( self, svc_type=self.deckhand_svc_type) - logging.info("Deckhand endpoint is %s", deckhand_svc_endpoint) + LOG.info("Deckhand endpoint is %s", deckhand_svc_endpoint) # Retrieve last committed revision id committed_revision_id = self.xcom_puller.get_design_version() @@ -190,8 +164,8 @@ class DrydockBaseOperator(BaseOperator): str(committed_revision_id), "rendered-documents") if self.deckhand_design_ref: - logging.info("Design YAMLs will be retrieved from %s", - self.deckhand_design_ref) + LOG.info("Design YAMLs will be retrieved from %s", + self.deckhand_design_ref) else: raise AirflowException("Unable to Retrieve Design Reference!") @@ -207,7 +181,7 @@ class DrydockBaseOperator(BaseOperator): create_task_response = {} # Node Filter - logging.info("Nodes Filter List: %s", self.node_filter) + LOG.info("Nodes Filter List: %s", self.node_filter) try: # Create Task @@ -217,12 +191,15 @@ class DrydockBaseOperator(BaseOperator): node_filter=self.node_filter) except errors.ClientError as client_error: + # Dump logs from Drydock pods + self.get_k8s_logs() + raise AirflowException(client_error) # Retrieve Task ID self.drydock_task_id = create_task_response['task_id'] - logging.info('Drydock %s task ID is %s', - task_action, self.drydock_task_id) + LOG.info('Drydock %s task ID is %s', + task_action, self.drydock_task_id) # Raise Exception if we are not able to get the task_id from # Drydock @@ -239,7 +216,7 @@ class DrydockBaseOperator(BaseOperator): # We will round off to nearest whole number end_range = round(int(time_out) / int(interval)) - logging.info('Task ID is %s', self.drydock_task_id) + LOG.info('Task ID is %s', self.drydock_task_id) # Query task status for i in range(0, end_range + 1): @@ -251,17 +228,20 @@ class DrydockBaseOperator(BaseOperator): task_status = task_state['status'] task_result = task_state['result']['status'] - logging.info("Current status of task id %s is %s", - self.drydock_task_id, task_status) + LOG.info("Current status of task id %s is %s", + self.drydock_task_id, task_status) except errors.ClientError as client_error: + # Dump logs from Drydock pods + self.get_k8s_logs() + raise AirflowException(client_error) except: # There can be situations where there are intermittent network # issues that prevents us from retrieving the task state. We # will want to retry in such situations. - logging.warning("Unable to retrieve task state. Retrying...") + LOG.warning("Unable to retrieve task state. Retrying...") # Raise Time Out Exception if task_status == 'running' and i == end_range: @@ -270,21 +250,23 @@ class DrydockBaseOperator(BaseOperator): # Exit 'for' loop if the task is in 'complete' or 'terminated' # state if task_status in ['complete', 'terminated']: - logging.info('Task result is %s', task_result) + LOG.info('Task result is %s', task_result) break else: time.sleep(int(interval)) # Get final task result if task_result == 'success': - logging.info('Task id %s has been successfully completed', - self.drydock_task_id) + LOG.info('Task id %s has been successfully completed', + self.drydock_task_id) else: self.task_failure(True) def task_failure(self, _task_failure): + # Dump logs from Drydock pods + self.get_k8s_logs() - logging.info('Retrieving all tasks records from Drydock...') + LOG.info('Retrieving all tasks records from Drydock...') try: # Get all tasks records @@ -304,39 +286,38 @@ class DrydockBaseOperator(BaseOperator): # Since there is only 1 failed parent task, we will print index 0 # of the list if failed_task: - logging.error('%s task has either failed or timed out', - failed_task[0]['action']) + LOG.error('%s task has either failed or timed out', + failed_task[0]['action']) - logging.error(json.dumps(failed_task[0], - indent=4, - sort_keys=True)) + LOG.error(json.dumps(failed_task[0], + indent=4, + sort_keys=True)) # Get the list of subtasks belonging to the failed parent task subtask_id_list = failed_task[0]['subtask_id_list'] - logging.info("Printing information of failed sub-tasks...") + LOG.info("Printing information of failed sub-tasks...") # Print detailed information of failed step(s) under each subtask # This will help to provide additional information for troubleshooting # purpose. for subtask_id in subtask_id_list: - logging.info("Retrieving details of subtask %s...", - subtask_id) + LOG.info("Retrieving details of subtask %s...", subtask_id) # Retrieve task information task = all_task_ids.get(subtask_id) if task: # Print subtask action and state - logging.info("%s subtask is in %s state", - task['action'], - task['result']['status']) + LOG.info("%s subtask is in %s state", + task['action'], + task['result']['status']) # Print list containing steps in failure state if task['result']['failures']: - logging.error("The following steps have failed:") - logging.error(task['result']['failures']) + LOG.error("The following steps have failed:") + LOG.error(task['result']['failures']) message_list = ( task['result']['details']['messageList'] or []) @@ -346,12 +327,12 @@ class DrydockBaseOperator(BaseOperator): is_error = message['error'] is True if is_error: - logging.error(json.dumps(message, - indent=4, - sort_keys=True)) + LOG.error(json.dumps(message, + indent=4, + sort_keys=True)) else: - logging.info("No failed step detected for subtask %s", - subtask_id) + LOG.info("No failed step detected for subtask %s", + subtask_id) else: raise AirflowException("Unable to retrieve subtask info!") diff --git a/shipyard_airflow/plugins/drydock_deploy_nodes.py b/shipyard_airflow/plugins/drydock_deploy_nodes.py index 63afeaa7..4c8b1f14 100644 --- a/shipyard_airflow/plugins/drydock_deploy_nodes.py +++ b/shipyard_airflow/plugins/drydock_deploy_nodes.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import logging import time @@ -20,6 +19,8 @@ from airflow.plugins_manager import AirflowPlugin from check_k8s_node_status import check_node_status from drydock_base_operator import DrydockBaseOperator +LOG = logging.getLogger(__name__) + class DrydockDeployNodesOperator(DrydockBaseOperator): @@ -47,9 +48,9 @@ class DrydockDeployNodesOperator(DrydockBaseOperator): # and wait before checking the state of the cluster join process. join_wait = self.dc['physical_provisioner.join_wait'] - logging.info("All nodes deployed in MAAS") - logging.info("Wait for %d seconds before checking node state...", - join_wait) + LOG.info("All nodes deployed in MAAS") + LOG.info("Wait for %d seconds before checking node state...", + join_wait) time.sleep(join_wait) diff --git a/shipyard_airflow/plugins/drydock_destroy_nodes.py b/shipyard_airflow/plugins/drydock_destroy_nodes.py index ef5918a1..33693abb 100644 --- a/shipyard_airflow/plugins/drydock_destroy_nodes.py +++ b/shipyard_airflow/plugins/drydock_destroy_nodes.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import logging import time @@ -19,6 +18,8 @@ from airflow.plugins_manager import AirflowPlugin from drydock_base_operator import DrydockBaseOperator +LOG = logging.getLogger(__name__) + class DrydockDestroyNodeOperator(DrydockBaseOperator): @@ -31,17 +32,13 @@ class DrydockDestroyNodeOperator(DrydockBaseOperator): def do_execute(self): - # Retrieve query interval and timeout - q_interval = self.dc['physical_provisioner.destroy_interval'] - task_timeout = self.dc['physical_provisioner.destroy_timeout'] - # NOTE: This is a PlaceHolder function. The 'destroy_node' # functionalities in DryDock is being worked on and is not # ready at the moment. - logging.info("Destroying node %s from cluster...", - self.redeploy_server) + LOG.info("Destroying node %s from cluster...", + self.redeploy_server) time.sleep(15) - logging.info("Successfully deleted node %s", self.redeploy_server) + LOG.info("Successfully deleted node %s", self.redeploy_server) class DrydockDestroyNodeOperatorPlugin(AirflowPlugin): diff --git a/shipyard_airflow/plugins/drydock_prepare_nodes.py b/shipyard_airflow/plugins/drydock_prepare_nodes.py index 56d70954..ab2fb008 100644 --- a/shipyard_airflow/plugins/drydock_prepare_nodes.py +++ b/shipyard_airflow/plugins/drydock_prepare_nodes.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - from airflow.plugins_manager import AirflowPlugin from drydock_base_operator import DrydockBaseOperator diff --git a/shipyard_airflow/plugins/drydock_prepare_site.py b/shipyard_airflow/plugins/drydock_prepare_site.py index a9edaa77..04264a31 100644 --- a/shipyard_airflow/plugins/drydock_prepare_site.py +++ b/shipyard_airflow/plugins/drydock_prepare_site.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - from airflow.plugins_manager import AirflowPlugin from drydock_base_operator import DrydockBaseOperator diff --git a/shipyard_airflow/plugins/drydock_validate_design.py b/shipyard_airflow/plugins/drydock_validate_design.py index af853055..72186216 100644 --- a/shipyard_airflow/plugins/drydock_validate_design.py +++ b/shipyard_airflow/plugins/drydock_validate_design.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import json import logging import os @@ -22,6 +21,8 @@ from airflow.exceptions import AirflowException from drydock_base_operator import DrydockBaseOperator +LOG = logging.getLogger(__name__) + class DrydockValidateDesignOperator(DrydockBaseOperator): @@ -38,7 +39,7 @@ class DrydockValidateDesignOperator(DrydockBaseOperator): validation_endpoint = os.path.join(self.drydock_svc_endpoint, 'validatedesign') - logging.info("Validation Endpoint is %s", validation_endpoint) + LOG.info("Validation Endpoint is %s", validation_endpoint) # Define Headers and Payload headers = { @@ -53,7 +54,7 @@ class DrydockValidateDesignOperator(DrydockBaseOperator): } # Requests DryDock to validate site design - logging.info("Waiting for DryDock to validate site design...") + LOG.info("Waiting for DryDock to validate site design...") try: design_validate_response = requests.post(validation_endpoint, @@ -67,15 +68,18 @@ class DrydockValidateDesignOperator(DrydockBaseOperator): validate_site_design = design_validate_response.text # Print response - logging.info("Retrieving DryDock validate site design response...") - logging.info(json.loads(validate_site_design)) + LOG.info("Retrieving DryDock validate site design response...") + LOG.info(json.loads(validate_site_design)) # Check if site design is valid status = str(json.loads(validate_site_design).get('status', 'unspecified')) if status.lower() == 'success': - logging.info("DryDock Site Design has been successfully validated") + LOG.info("DryDock Site Design has been successfully validated") else: + # Dump logs from Drydock pods + self.get_k8s_logs() + raise AirflowException("DryDock Site Design Validation Failed " "with status: {}!".format(status)) diff --git a/shipyard_airflow/plugins/drydock_verify_site.py b/shipyard_airflow/plugins/drydock_verify_site.py index 3954f72d..adbfc0e8 100644 --- a/shipyard_airflow/plugins/drydock_verify_site.py +++ b/shipyard_airflow/plugins/drydock_verify_site.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - from airflow.plugins_manager import AirflowPlugin from drydock_base_operator import DrydockBaseOperator diff --git a/shipyard_airflow/plugins/ucp_base_operator.py b/shipyard_airflow/plugins/ucp_base_operator.py new file mode 100644 index 00000000..2a1a35a4 --- /dev/null +++ b/shipyard_airflow/plugins/ucp_base_operator.py @@ -0,0 +1,143 @@ +# Copyright 2018 AT&T Intellectual Property. All other rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import configparser +import logging +import math +from datetime import datetime + +from airflow.models import BaseOperator +from airflow.plugins_manager import AirflowPlugin +from airflow.utils.decorators import apply_defaults + +from get_k8s_logs import get_pod_logs +from get_k8s_logs import K8sLoggingException +from xcom_puller import XcomPuller + +LOG = logging.getLogger(__name__) + + +class UcpBaseOperator(BaseOperator): + + """UCP Base Operator + + All UCP related workflow operators will use the UCP base + operator as the parent and inherit attributes and methods + from this class + + """ + + @apply_defaults + def __init__(self, + main_dag_name=None, + pod_selector_pattern=None, + shipyard_conf=None, + start_time=None, + sub_dag_name=None, + xcom_push=True, + *args, **kwargs): + """Initialization of UcpBaseOperator object. + + :param main_dag_name: Parent Dag + :param pod_selector_pattern: A list containing the information on + the patterns of the Pod name and name + of the associated container for log + queries. This will allow us to query + multiple components, e.g. MAAS and + Drydock at the same time. It also allows + us to query the logs of specific container + in Pods with multiple containers. For + instance the Airflow worker pod contains + both the airflow-worker container and the + log-rotate container. + :param shipyard_conf: Location of shipyard.conf + :param start_time: Time when Operator gets executed + :param sub_dag_name: Child Dag + :param xcom_push: xcom usage + + """ + + super(UcpBaseOperator, self).__init__(*args, **kwargs) + self.main_dag_name = main_dag_name + self.pod_selector_pattern = pod_selector_pattern or [] + self.shipyard_conf = shipyard_conf + self.start_time = datetime.now() + self.sub_dag_name = sub_dag_name + self.xcom_push_flag = xcom_push + + def execute(self, context): + + # Execute UCP base function + self.ucp_base(context) + + # Execute base function + self.run_base(context) + + # Exeute child function + self.do_execute() + + def ucp_base(self, context): + + LOG.info("Running UCP Base Operator...") + + # Read and parse shiyard.conf + config = configparser.ConfigParser() + config.read(self.shipyard_conf) + + # Initialize variable + self.ucp_namespace = config.get('k8s_logs', 'ucp_namespace') + + # Define task_instance + task_instance = context['task_instance'] + + # Set up and retrieve values from xcom + self.xcom_puller = XcomPuller(self.main_dag_name, task_instance) + self.action_info = self.xcom_puller.get_action_info() + self.dc = self.xcom_puller.get_deployment_configuration() + + def get_k8s_logs(self): + """Retrieve Kubernetes pod/container logs specified by an opererator + + This method is "best effort" and should not prevent the progress of + the workflow processing + """ + if self.pod_selector_pattern: + for selector in self.pod_selector_pattern: + # Get difference in current time and time when the + # operator was first executed (in seconds) + t_diff = (datetime.now() - self.start_time).total_seconds() + + # Note that we will end up with a floating number for + # 't_diff' and will need to round it up to the nearest + # integer + t_diff_int = int(math.ceil(t_diff)) + + try: + get_pod_logs(selector['pod_pattern'], + self.ucp_namespace, + selector['container'], + t_diff_int) + + except K8sLoggingException as e: + LOG.error(e) + + else: + LOG.debug("There are no pod logs specified to retrieve") + + +class UcpBaseOperatorPlugin(AirflowPlugin): + + """Creates UcpBaseOperator in Airflow.""" + + name = 'ucp_base_operator_plugin' + operators = [UcpBaseOperator] diff --git a/tools/resources/shipyard.conf b/tools/resources/shipyard.conf index 02d4ea7b..e35d4e3c 100644 --- a/tools/resources/shipyard.conf +++ b/tools/resources/shipyard.conf @@ -40,6 +40,8 @@ project_domain_name = default project_name = service user_domain_name = default username = shipyard +[k8s_logs] +ucp_namespace = ucp [requests_config] airflow_log_connect_timeout = 5 airflow_log_read_timeout = 300