Files
deb-python-falcon/tests/test_example.py
2013-02-18 13:55:02 -05:00

104 lines
3.3 KiB
Python

import json
import logging
import falcon
class StorageEngine:
pass
class StorageError(Exception):
pass
def token_is_valid(token, user_id):
return True # Suuuuuure it's valid...
def auth(req, resp, params):
# Alternatively, do this in middleware
token = req.get_header('X-Auth-Token')
if token is None:
raise falcon.HTTPUnauthorized('Auth token required',
'Please provide an auth token '
'as part of the request',
'http://docs.example.com/auth')
if not token_is_valid(token, params['user_id']):
raise falcon.HTTPUnauthorized('Authentication required',
'The provided auth token is '
'not valid. Please request a '
'new token and try again.',
'http://docs.example.com/auth')
def check_media_type(req, resp, params):
if not req.client_accepts_json():
raise falcon.HTTPUnsupportedMediaType(
'Media Type not Supported',
'This API only supports the JSON media type.',
'http://docs.examples.com/api/json')
class ThingsResource:
def __init__(self, db):
self.db = db
self.logger = logging.getLogger('thingsapi.' + __name__)
def on_get(self, req, resp, user_id):
marker = req.get_param('marker', default='')
limit = req.get_param('limit', default=50)
try:
result = self.db.get_things(marker, limit)
except Exception as ex:
self.logger.error(ex)
description = ('Aliens have attacked our base! We will '
'be back as soon as we fight them off. '
'We appreciate your patience.')
raise falcon.HTTPServiceUnavailable('Service Outage', description)
resp.set_header('X-Powered-By', 'Donuts')
resp.status = falcon.HTTP_200
resp.body = json.dumps(result)
def on_post(self, req, resp, user_id):
try:
raw_json = req.stream.read()
except Exception:
raise falcon.HTTPError(falcon.HTTP_748,
'Read Error',
'Could not read the request body. Must be '
'them ponies again.')
try:
thing = json.loads(raw_json, 'utf-8')
except ValueError:
raise falcon.HTTPError(falcon.HTTP_753,
'Malformed JSON',
'Could not decode the request body. The '
'JSON was incorrect.')
try:
proper_thing = self.db.add_thing(thing)
except StorageError:
raise falcon.HTTPError(falcon.HTTP_725,
'Database Error',
"Sorry, couldn't write your thing to the "
'database. It worked on my machine.')
resp.status = falcon.HTTP_201
resp.location = '/%s/things/%s' % (user_id, proper_thing.id)
wsgi_app = api = falcon.API(before=[auth, check_media_type])
db = StorageEngine()
things = ThingsResource(db)
api.add_route('/{user_id}/things', things)