Refactor Armada Base Operator
Make use of static method to initialize 'armada_client' and 'deckhand_design_ref' Change-Id: Iab79efd70fda625fd6220eb44cfc98d4cbb5f97a
This commit is contained in:
parent
18e27e6a30
commit
0a7b2c2783
@ -42,11 +42,8 @@ class ArmadaBaseOperator(BaseOperator):
|
||||
|
||||
@apply_defaults
|
||||
def __init__(self,
|
||||
deckhand_design_ref=None,
|
||||
deckhand_svc_type='deckhand',
|
||||
armada_client=None,
|
||||
armada_svc_endpoint=None,
|
||||
armada_svc_type='armada',
|
||||
deckhand_svc_type='deckhand',
|
||||
main_dag_name=None,
|
||||
query={},
|
||||
shipyard_conf=None,
|
||||
@ -57,11 +54,8 @@ class ArmadaBaseOperator(BaseOperator):
|
||||
*args, **kwargs):
|
||||
"""Initialization of ArmadaBaseOperator object.
|
||||
|
||||
:param deckhand_design_ref: A URI reference to the design documents
|
||||
:param deckhand_svc_type: Deckhand Service Type
|
||||
:param armadaclient: An instance of armada client
|
||||
:param armada_svc_endpoint: Armada Service Endpoint
|
||||
:param armada_svc_type: Armada Service Type
|
||||
:param deckhand_svc_type: Deckhand Service Type
|
||||
:param main_dag_name: Parent Dag
|
||||
:param query: A dictionary containing explicit query string parameters
|
||||
:param shipyard_conf: Location of shipyard.conf
|
||||
@ -76,11 +70,8 @@ class ArmadaBaseOperator(BaseOperator):
|
||||
"""
|
||||
|
||||
super(ArmadaBaseOperator, self).__init__(*args, **kwargs)
|
||||
self.deckhand_design_ref = deckhand_design_ref
|
||||
self.deckhand_svc_type = deckhand_svc_type
|
||||
self.armada_client = armada_client
|
||||
self.armada_svc_endpoint = armada_svc_endpoint
|
||||
self.armada_svc_type = armada_svc_type
|
||||
self.deckhand_svc_type = deckhand_svc_type
|
||||
self.main_dag_name = main_dag_name
|
||||
self.query = query
|
||||
self.shipyard_conf = shipyard_conf
|
||||
@ -114,22 +105,33 @@ class ArmadaBaseOperator(BaseOperator):
|
||||
# Logs uuid of action performed by the Operator
|
||||
logging.info("Armada Operator for action %s", self.action_info['id'])
|
||||
|
||||
# Set up armada client
|
||||
self.get_armada_client()
|
||||
|
||||
# Get deckhand design reference url
|
||||
self.get_deckhand_design_ref()
|
||||
|
||||
def get_armada_client(self):
|
||||
|
||||
# Retrieve Endpoint Information
|
||||
self.armada_svc_endpoint = ucp_service_endpoint(
|
||||
armada_svc_endpoint = ucp_service_endpoint(
|
||||
self, svc_type=self.armada_svc_type)
|
||||
|
||||
logging.info("Armada endpoint is %s", self.armada_svc_endpoint)
|
||||
# Set up armada client
|
||||
self.armada_client = self._init_armada_client(armada_svc_endpoint,
|
||||
self.svc_token)
|
||||
|
||||
# Retrieve DeckHand Endpoint Information
|
||||
deckhand_svc_endpoint = ucp_service_endpoint(
|
||||
self, svc_type=self.deckhand_svc_type)
|
||||
|
||||
# Retrieve last committed revision id
|
||||
committed_revision_id = self.xcom_puller.get_design_version()
|
||||
|
||||
# Get deckhand design reference url
|
||||
self.deckhand_design_ref = self._init_deckhand_design_ref(
|
||||
deckhand_svc_endpoint,
|
||||
committed_revision_id)
|
||||
|
||||
@staticmethod
|
||||
def _init_armada_client(armada_svc_endpoint, svc_token):
|
||||
|
||||
logging.info("Armada endpoint is %s", armada_svc_endpoint)
|
||||
|
||||
# Parse Armada Service Endpoint
|
||||
armada_url = urlparse(self.armada_svc_endpoint)
|
||||
armada_url = urlparse(armada_svc_endpoint)
|
||||
|
||||
# Build a ArmadaSession with credentials and target host
|
||||
# information.
|
||||
@ -137,7 +139,7 @@ class ArmadaBaseOperator(BaseOperator):
|
||||
a_session = session.ArmadaSession(host=armada_url.hostname,
|
||||
port=armada_url.port,
|
||||
scheme='http',
|
||||
token=self.svc_token,
|
||||
token=svc_token,
|
||||
marker=None)
|
||||
|
||||
# Raise Exception if we are not able to set up the session
|
||||
@ -149,36 +151,35 @@ class ArmadaBaseOperator(BaseOperator):
|
||||
# Use the ArmadaSession to build a ArmadaClient that can
|
||||
# be used to make one or more API calls
|
||||
logging.info("Create Armada Client")
|
||||
self.armada_client = client.ArmadaClient(a_session)
|
||||
_armada_client = client.ArmadaClient(a_session)
|
||||
|
||||
# Raise Exception if we are not able to build armada client
|
||||
if self.armada_client:
|
||||
if _armada_client:
|
||||
logging.info("Successfully Set Up Armada client")
|
||||
|
||||
return _armada_client
|
||||
else:
|
||||
raise AirflowException("Failed to set up Armada client!")
|
||||
|
||||
def get_deckhand_design_ref(self):
|
||||
|
||||
# Retrieve DeckHand Endpoint Information
|
||||
deckhand_svc_endpoint = ucp_service_endpoint(
|
||||
self, svc_type=self.deckhand_svc_type)
|
||||
@staticmethod
|
||||
def _init_deckhand_design_ref(deckhand_svc_endpoint,
|
||||
committed_revision_id):
|
||||
|
||||
logging.info("Deckhand endpoint is %s", deckhand_svc_endpoint)
|
||||
|
||||
# Retrieve last committed revision id
|
||||
committed_revision_id = self.xcom_puller.get_design_version()
|
||||
|
||||
# Form DeckHand Design Reference Path
|
||||
# This URL will be used to retrieve the Site Design YAMLs
|
||||
deckhand_path = "deckhand+" + deckhand_svc_endpoint
|
||||
self.deckhand_design_ref = os.path.join(deckhand_path,
|
||||
"revisions",
|
||||
str(committed_revision_id),
|
||||
"rendered-documents")
|
||||
_deckhand_design_ref = os.path.join(deckhand_path,
|
||||
"revisions",
|
||||
str(committed_revision_id),
|
||||
"rendered-documents")
|
||||
|
||||
if self.deckhand_design_ref:
|
||||
if _deckhand_design_ref:
|
||||
logging.info("Design YAMLs will be retrieved from %s",
|
||||
self.deckhand_design_ref)
|
||||
_deckhand_design_ref)
|
||||
|
||||
return _deckhand_design_ref
|
||||
else:
|
||||
raise AirflowException("Unable to Retrieve Design Reference!")
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user