Update Operators

There have been significant code changes in Airflow from
version 1.8.2 to version 1.9.0, as seen in [0].

In particular, it seems that we are no longer able to store the
drydock session in xcom due to serialization errors [1].
It appears that io.TextIOWrapper (which is a wrapper that
converts binary file-like objects to text file-like objects)
can't be serialized because it may hold external runtime state,
e.g. a file descriptor with a specific position in a file, which
may no longer be available when the object is deserialized at a
later point in time.

Hence we are changing the logic so that each task creates its own
new session, instead of storing the session in xcom and re-using
it across tasks.

[0] https://github.com/apache/incubator-airflow/blob/master/CHANGELOG.txt

[1] Exceptions Seen in Airflow 1.9.0

[2018-01-18 16:37:23,394] {base_task_runner.py:98} INFO - Subtask: Traceback (most recent call last):
[2018-01-18 16:37:23,394] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/bin/airflow", line 27, in <module>
[2018-01-18 16:37:23,394] {base_task_runner.py:98} INFO - Subtask:     args.func(args)
[2018-01-18 16:37:23,395] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python3.5/dist-packages/airflow/bin/cli.py", line 392, in run
[2018-01-18 16:37:23,395] {base_task_runner.py:98} INFO - Subtask:     pool=args.pool,
[2018-01-18 16:37:23,395] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python3.5/dist-packages/airflow/utils/db.py", line 50, in wrapper
[2018-01-18 16:37:23,395] {base_task_runner.py:98} INFO - Subtask:     result = func(*args, **kwargs)
[2018-01-18 16:37:23,396] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python3.5/dist-packages/airflow/models.py", line 1497, in _run_raw_task
[2018-01-18 16:37:23,396] {base_task_runner.py:98} INFO - Subtask:     self.xcom_push(key=XCOM_RETURN_KEY, value=result)
[2018-01-18 16:37:23,396] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python3.5/dist-packages/airflow/models.py", line 1817, in xcom_push
[2018-01-18 16:37:23,396] {base_task_runner.py:98} INFO - Subtask:     execution_date=execution_date or self.execution_date)
[2018-01-18 16:37:23,397] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python3.5/dist-packages/airflow/utils/db.py", line 50, in wrapper
[2018-01-18 16:37:23,397] {base_task_runner.py:98} INFO - Subtask:     result = func(*args, **kwargs)
[2018-01-18 16:37:23,397] {base_task_runner.py:98} INFO - Subtask:   File "/usr/local/lib/python3.5/dist-packages/airflow/models.py", line 4103, in set
[2018-01-18 16:37:23,397] {base_task_runner.py:98} INFO - Subtask:     value = pickle.dumps(value)
[2018-01-18 16:37:23,398] {base_task_runner.py:98} INFO - Subtask: TypeError: cannot serialize '_io.TextIOWrapper' object

Change-Id: I0fd686a91a86a36768a2caeed4b16a1dbbb040a3
This commit is contained in:
Anthony Lin 2018-01-16 17:20:06 +00:00
parent 7b2d831388
commit 8e076287f3
4 changed files with 18 additions and 53 deletions

View File

@ -29,15 +29,6 @@ def deploy_site_armada(parent_dag_name, child_dag_name, args):
'{}.{}'.format(parent_dag_name, child_dag_name),
default_args=args)
# Create Armada Client
armada_client = ArmadaOperator(
task_id='create_armada_client',
shipyard_conf=config_path,
action='create_armada_client',
main_dag_name=parent_dag_name,
sub_dag_name=child_dag_name,
dag=dag)
# Get Tiller Status
armada_status = ArmadaOperator(
task_id='armada_status',
@ -67,7 +58,6 @@ def deploy_site_armada(parent_dag_name, child_dag_name, args):
dag=dag)
# Define dependencies
armada_status.set_upstream(armada_client)
armada_apply.set_upstream(armada_status)
armada_get_releases.set_upstream(armada_apply)

View File

@ -30,14 +30,6 @@ def deploy_site_drydock(parent_dag_name, child_dag_name, args):
'{}.{}'.format(parent_dag_name, child_dag_name),
default_args=args)
drydock_client = DryDockOperator(
task_id='create_drydock_client',
shipyard_conf=config_path,
action='create_drydock_client',
main_dag_name=parent_dag_name,
sub_dag_name=child_dag_name,
dag=dag)
drydock_verify_site = DryDockOperator(
task_id='verify_site',
shipyard_conf=config_path,
@ -71,7 +63,6 @@ def deploy_site_drydock(parent_dag_name, child_dag_name, args):
dag=dag)
# Define dependencies
drydock_verify_site.set_upstream(drydock_client)
drydock_prepare_site.set_upstream(drydock_verify_site)
drydock_prepare_nodes.set_upstream(drydock_prepare_site)
drydock_deploy_nodes.set_upstream(drydock_prepare_nodes)

View File

@ -76,19 +76,6 @@ class ArmadaOperator(BaseOperator):
# Logs uuid of action performed by the Operator
logging.info("Armada Operator for action %s", workflow_info['id'])
# Create Armada Client
if self.action == 'create_armada_client':
# Retrieve Endpoint Information
svc_type = 'armada'
context['svc_endpoint'] = ucp_service_endpoint(self,
svc_type=svc_type)
logging.info("Armada endpoint is %s", context['svc_endpoint'])
# Set up Armada Client
session_client = self.armada_session_client(context)
return session_client
# Retrieve Deckhand Design Reference
design_ref = self.get_deckhand_design_ref(context)
@ -118,10 +105,15 @@ class ArmadaOperator(BaseOperator):
return site_design_validity
# Retrieve armada_client via XCOM so as to perform other tasks
armada_client = task_instance.xcom_pull(
task_ids='create_armada_client',
dag_id=self.main_dag_name + '.' + self.sub_dag_name)
# Create Armada Client
# Retrieve Endpoint Information
svc_type = 'armada'
context['svc_endpoint'] = ucp_service_endpoint(self,
svc_type=svc_type)
logging.info("Armada endpoint is %s", context['svc_endpoint'])
# Set up Armada Client
armada_client = self.armada_session_client(context)
# Retrieve Tiller Information and assign to context 'query'
context['query'] = self.get_tiller_info(context)

View File

@ -88,19 +88,6 @@ class DryDockOperator(BaseOperator):
# Logs uuid of action performed by the Operator
logging.info("DryDock Operator for action %s", workflow_info['id'])
# DrydockClient
if self.action == 'create_drydock_client':
# Retrieve Endpoint Information
svc_type = 'physicalprovisioner'
context['svc_endpoint'] = ucp_service_endpoint(self,
svc_type=svc_type)
logging.info("DryDock endpoint is %s", context['svc_endpoint'])
# Set up DryDock Client
drydock_client = self.drydock_session_client(context)
return drydock_client
# Retrieve Deckhand Design Reference
self.design_ref = self.get_deckhand_design_ref(context)
@ -124,10 +111,15 @@ class DryDockOperator(BaseOperator):
return site_design_validity
# Retrieve drydock_client via XCOM so as to perform other tasks
drydock_client = task_instance.xcom_pull(
task_ids='create_drydock_client',
dag_id=self.main_dag_name + '.' + self.sub_dag_name)
# DrydockClient
# Retrieve Endpoint Information
svc_type = 'physicalprovisioner'
context['svc_endpoint'] = ucp_service_endpoint(self,
svc_type=svc_type)
logging.info("DryDock endpoint is %s", context['svc_endpoint'])
# Set up DryDock Client
drydock_client = self.drydock_session_client(context)
# Read shipyard.conf
config = configparser.ConfigParser()