Merge "Update DeckhandOperator to do filtering/sorting server-side"
This commit is contained in:
commit
aff9036c19
|
@ -119,8 +119,10 @@ class DeckhandOperator(BaseOperator):
|
|||
logging.info("Retrieving revisions information...")
|
||||
|
||||
try:
|
||||
query_params = {'tag': 'committed', 'sort': 'id', 'order': 'desc'}
|
||||
revisions = yaml.safe_load(requests.get(
|
||||
revision_endpoint, headers=x_auth_token).text)
|
||||
revision_endpoint, headers=x_auth_token,
|
||||
params=query_params).text)
|
||||
except requests.exceptions.RequestException as e:
|
||||
raise AirflowException(e)
|
||||
|
||||
|
@ -133,14 +135,10 @@ class DeckhandOperator(BaseOperator):
|
|||
# Initialize Committed Version
|
||||
committed_ver = None
|
||||
|
||||
# Construct revision_list
|
||||
revision_list = revisions.get('results', [])
|
||||
|
||||
# Search for the last committed version and save it as xcom
|
||||
for revision in reversed(revision_list):
|
||||
if 'committed' in revision.get('tags'):
|
||||
committed_ver = revision.get('id')
|
||||
break
|
||||
revision_list = revisions.get('results', [])
|
||||
if revision_list:
|
||||
committed_ver = revision_list[-1].get('id')
|
||||
|
||||
if committed_ver:
|
||||
logging.info("Last committed revision is %d", committed_ver)
|
||||
|
|
Loading…
Reference in New Issue