shipyard/shipyard_airflow/errors.py

53 lines
1.2 KiB
Python

# -*- coding: utf-8 -*-
import json
import falcon
try:
from collections import OrderedDict
except ImportError:
OrderedDict = dict
ERR_UNKNOWN = {
'status': falcon.HTTP_500,
'title': 'Internal Server Error'
}
ERR_AIRFLOW_RESPONSE = {
'status': falcon.HTTP_400,
'title': 'Error response from Airflow'
}
class AppError(Exception):
def __init__(self, error=ERR_UNKNOWN, description=None):
self.error = error
self.error['description'] = description
@property
def title(self):
return self.error['title']
@property
def status(self):
return self.error['status']
@property
def description(self):
return self.error['description']
@staticmethod
def handle(exception, req, res, error=None):
res.status = exception.status
meta = OrderedDict()
meta['message'] = exception.title
if exception.description:
meta['description'] = exception.description
res.body = json.dumps(meta)
class AirflowError(AppError):
def __init__(self, description=None):
super().__init__(ERR_AIRFLOW_RESPONSE)
self.error['description'] = description