204 lines
6.3 KiB
Python
204 lines
6.3 KiB
Python
from datetime import datetime, date
|
|
from decimal import Decimal
|
|
try:
|
|
from simplejson import loads
|
|
except:
|
|
from json import loads
|
|
try:
|
|
from sqlalchemy import orm, schema, types
|
|
from sqlalchemy.engine import create_engine
|
|
except ImportError:
|
|
create_engine = None
|
|
from unittest import TestCase
|
|
|
|
from pecan.jsonify import jsonify, encode, ResultProxy, RowProxy
|
|
from pecan import Pecan, expose, request
|
|
from webtest import TestApp
|
|
from webob.multidict import MultiDict, UnicodeMultiDict
|
|
|
|
def make_person():
|
|
class Person(object):
|
|
def __init__(self, first_name, last_name):
|
|
self.first_name = first_name
|
|
self.last_name = last_name
|
|
|
|
@property
|
|
def name(self):
|
|
return '%s %s' % (self.first_name, self.last_name)
|
|
return Person
|
|
|
|
|
|
def test_simple_rule():
|
|
Person = make_person()
|
|
|
|
# create a Person instance
|
|
p = Person('Jonathan', 'LaCour')
|
|
|
|
# register a generic JSON rule
|
|
@jsonify.when_type(Person)
|
|
def jsonify_person(obj):
|
|
return dict(
|
|
name=obj.name
|
|
)
|
|
|
|
# encode the object using our new rule
|
|
result = loads(encode(p))
|
|
assert result['name'] == 'Jonathan LaCour'
|
|
assert len(result) == 1
|
|
|
|
|
|
class TestJsonify(object):
|
|
|
|
def test_simple_jsonify(self):
|
|
Person = make_person()
|
|
|
|
# register a generic JSON rule
|
|
@jsonify.when_type(Person)
|
|
def jsonify_person(obj):
|
|
return dict(
|
|
name=obj.name
|
|
)
|
|
|
|
class RootController(object):
|
|
@expose('json')
|
|
def index(self):
|
|
# create a Person instance
|
|
p = Person('Jonathan', 'LaCour')
|
|
return p
|
|
|
|
app = TestApp(Pecan(RootController()))
|
|
|
|
r = app.get('/')
|
|
assert r.status_int == 200
|
|
assert loads(r.body) == {'name':'Jonathan LaCour'}
|
|
|
|
class TestJsonifyGenericEncoder(TestCase):
|
|
def test_json_callable(self):
|
|
class JsonCallable(object):
|
|
def __init__(self, arg):
|
|
self.arg = arg
|
|
def __json__(self):
|
|
return {"arg":self.arg}
|
|
|
|
result = encode(JsonCallable('foo'))
|
|
assert loads(result) == {'arg':'foo'}
|
|
|
|
def test_datetime(self):
|
|
today = date.today()
|
|
now = datetime.now()
|
|
|
|
result = encode(today)
|
|
assert loads(result) == str(today)
|
|
|
|
result = encode(now)
|
|
assert loads(result) == str(now)
|
|
|
|
def test_decimal(self):
|
|
# XXX Testing for float match which is inexact
|
|
|
|
d = Decimal('1.1')
|
|
result = encode(d)
|
|
assert loads(result) == float(d)
|
|
|
|
def test_multidict(self):
|
|
md = MultiDict()
|
|
md.add('arg', 'foo')
|
|
md.add('arg', 'bar')
|
|
result = encode(md)
|
|
assert loads(result) == {'arg': ['foo', 'bar']}
|
|
|
|
def test_fallback_to_builtin_encoder(self):
|
|
class Foo(object): pass
|
|
|
|
self.assertRaises(TypeError, encode, Foo())
|
|
|
|
class TestJsonifySQLAlchemyGenericEncoder(TestCase):
|
|
|
|
def setUp(self):
|
|
if not create_engine:
|
|
self.create_fake_proxies()
|
|
else:
|
|
self.create_sa_proxies()
|
|
|
|
def create_fake_proxies(self):
|
|
|
|
# create a fake SA object
|
|
class FakeSAObject(object):
|
|
def __init__(self):
|
|
self._sa_class_manager = object()
|
|
self._sa_instance_state = 'awesome'
|
|
self.id = 1
|
|
self.first_name = 'Jonathan'
|
|
self.last_name = 'LaCour'
|
|
|
|
# create a fake result proxy
|
|
class FakeResultProxy(ResultProxy):
|
|
def __init__(self):
|
|
self.rowcount = -1
|
|
self.rows = []
|
|
def __iter__(self):
|
|
return iter(self.rows)
|
|
def append(self, row):
|
|
self.rows.append(row)
|
|
|
|
# create a fake row proxy
|
|
class FakeRowProxy(RowProxy):
|
|
def __init__(self, arg=None):
|
|
self.row = dict(arg)
|
|
def __getitem__(self, key):
|
|
return self.row.__getitem__(key)
|
|
def keys(self):
|
|
return self.row.keys()
|
|
|
|
# get the SA objects
|
|
self.sa_object = FakeSAObject()
|
|
self.result_proxy = FakeResultProxy()
|
|
self.result_proxy.append(FakeRowProxy([('id', 1), ('first_name', 'Jonathan'), ('last_name', 'LaCour')]))
|
|
self.result_proxy.append(FakeRowProxy([('id', 2), ('first_name', 'Yoann'), ('last_name', 'Roman')]))
|
|
self.row_proxy = FakeRowProxy([('id', 1), ('first_name', 'Jonathan'), ('last_name', 'LaCour')])
|
|
|
|
def create_sa_proxies(self):
|
|
|
|
# create the table and mapper
|
|
metadata = schema.MetaData()
|
|
user_table = schema.Table('user', metadata,
|
|
schema.Column('id', types.Integer, primary_key=True),
|
|
schema.Column('first_name', types.Unicode(25)),
|
|
schema.Column('last_name', types.Unicode(25)))
|
|
class User(object):
|
|
pass
|
|
orm.mapper(User, user_table)
|
|
|
|
# create the session
|
|
engine = create_engine('sqlite:///:memory:')
|
|
metadata.bind = engine
|
|
metadata.create_all()
|
|
session = orm.sessionmaker(bind=engine)()
|
|
|
|
# add some dummy data
|
|
user_table.insert().execute([
|
|
{'first_name': u'Jonathan', 'last_name': u'LaCour'},
|
|
{'first_name': u'Yoann', 'last_name': u'Roman'}
|
|
])
|
|
|
|
# get the SA objects
|
|
self.sa_object = session.query(User).first()
|
|
select = user_table.select()
|
|
self.result_proxy = select.execute()
|
|
self.row_proxy = select.execute().fetchone()
|
|
|
|
def test_sa_object(self):
|
|
result = encode(self.sa_object)
|
|
assert loads(result) == {'id': 1, 'first_name': 'Jonathan', 'last_name': 'LaCour'}
|
|
|
|
def test_result_proxy(self):
|
|
result = encode(self.result_proxy)
|
|
assert loads(result) == {'count': 2, 'rows': [
|
|
{'id': 1, 'first_name': 'Jonathan', 'last_name': 'LaCour'},
|
|
{'id': 2, 'first_name': 'Yoann', 'last_name': 'Roman'}
|
|
]}
|
|
|
|
def test_row_proxy(self):
|
|
result = encode(self.row_proxy)
|
|
assert loads(result) == {'id': 1, 'first_name': 'Jonathan', 'last_name': 'LaCour'}
|