@ -13,9 +13,10 @@
# limitations under the License.
""" Deployment Configuration
Retrieves the deployment configuration from Deckhand and places the values
Retrieves the deployment - configuration from Deckhand and places the values
retrieved into a dictionary
"""
import configparser
import logging
from airflow . exceptions import AirflowException
@ -34,12 +35,13 @@ except ImportError:
from shipyard_airflow . shipyard_const import CustomHeaders
LOG = logging . getLogger ( __name__ )
DOCUMENT_INFO = ' document_info '
class DeploymentConfigurationOperator ( BaseOperator ) :
""" Deployment Configuration Operator
Retrieve the deployment configuration from Deckhand for use throughout
Retrieve the deployment - configuration from Deckhand for use throughout
the workflow . Put the configuration into a dictionary .
Failures are raised :
@ -89,18 +91,23 @@ class DeploymentConfigurationOperator(BaseOperator):
: param main_dag_name : Parent Dag
: param shipyard_conf : Location of shipyard . conf
"""
super ( DeploymentConfigurationOperator , self ) . __init__ ( * args , * * kwargs )
self . main_dag_name = main_dag_name
self . shipyard_conf = shipyard_conf
self . action_info = { }
def _read_config ( self ) :
""" Read in and parse the shipyard config """
self . config = configparser . ConfigParser ( )
self . config . read ( self . shipyard_conf )
def execute ( self , context ) :
""" Perform Deployment Configuration extraction """
self . _read_config ( )
revision_id = self . get_revision_id ( context . get ( ' task_instance ' ) )
doc = self . get_doc ( revision_id )
converted = self . map_config_keys ( doc )
# return the mapped configuration so that it can be placed on xcom
return converted
@ -116,7 +123,7 @@ class DeploymentConfigurationOperator(BaseOperator):
revision_id = self . action_info [ ' committed_rev_id ' ]
if revision_id :
LOG . info ( " Revision is set to: %s for deployment configuration " ,
LOG . info ( " Revision is set to: %s for deployment- configuration " ,
revision_id )
return revision_id
# either revision id was not on xcom, or the task_instance is messed
@ -127,14 +134,16 @@ class DeploymentConfigurationOperator(BaseOperator):
def get_doc ( self , revision_id ) :
""" Get the DeploymentConfiguration document dictionary from Deckhand """
LOG . info (
" Attempting to retrieve shipyard/DeploymentConfiguration/v1, "
" deployment-configuration from Deckhand "
)
filters = {
" schema " : " shipyard/DeploymentConfiguration/v1 " ,
" metadata.name " : " deployment-configuration "
}
schema_fallback = ' shipyard/DeploymentConfiguration/v1 '
schema = self . config . get ( DOCUMENT_INFO ,
' deployment_configuration_schema ' ,
fallback = schema_fallback )
name = self . config . get ( DOCUMENT_INFO ,
' deployment_configuration_name ' ,
fallback = ' deployment-configuration ' )
LOG . info ( " Attempting to retrieve {} , {} from Deckhand " . format ( schema ,
name ) )
filters = { " schema " : schema , " metadata.name " : name }
# Create additional headers dict to pass context marker
# and end user
@ -160,7 +169,7 @@ class DeploymentConfigurationOperator(BaseOperator):
except AttributeError :
failed_url = " No URL generated "
LOG . exception ( ex )
raise AirflowException ( " Failed to retrieve deployment "
raise AirflowException ( " Failed to retrieve deployment- "
" configuration yaml using url: "
" {} " . format ( failed_url ) )
@ -178,7 +187,7 @@ class DeploymentConfigurationOperator(BaseOperator):
Converts to a more simple map of key - value pairs
"""
LOG . info ( " Mapping keys from deployment configuration " )
LOG . info ( " Mapping keys from deployment- configuration " )
return {
cfg_key : self . get_cfg_value ( cfg_data , cfg_key , cfg_default )
for cfg_key , cfg_default in