397 lines
12 KiB
Plaintext
397 lines
12 KiB
Plaintext
"""\
|
|
@file httpc_test.py
|
|
@author Bryan O'Sullivan
|
|
|
|
Copyright (c) 2007, Linden Research, Inc.
|
|
Permission is hereby granted, free of charge, to any person obtaining a copy
|
|
of this software and associated documentation files (the "Software"), to deal
|
|
in the Software without restriction, including without limitation the rights
|
|
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
|
copies of the Software, and to permit persons to whom the Software is
|
|
furnished to do so, subject to the following conditions:
|
|
|
|
The above copyright notice and this permission notice shall be included in
|
|
all copies or substantial portions of the Software.
|
|
|
|
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
|
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
|
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
|
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
|
|
THE SOFTWARE.
|
|
"""
|
|
|
|
from eventlet import api
|
|
from eventlet import httpc
|
|
from eventlet import httpd
|
|
from eventlet import processes
|
|
from eventlet import util
|
|
import time
|
|
try:
|
|
from cStringIO import StringIO
|
|
except ImportError:
|
|
from StringIO import StringIO
|
|
|
|
|
|
util.wrap_socket_with_coroutine_socket()
|
|
|
|
|
|
from eventlet import tests
|
|
|
|
|
|
class Site(object):
|
|
def __init__(self):
|
|
self.stuff = {'hello': 'hello world'}
|
|
|
|
def adapt(self, obj, req):
|
|
req.write(str(obj))
|
|
|
|
def handle_request(self, req):
|
|
return getattr(self, 'handle_%s' % req.method().lower())(req)
|
|
|
|
|
|
class BasicSite(Site):
|
|
def handle_get(self, req):
|
|
req.set_header('x-get', 'hello')
|
|
resp = StringIO()
|
|
path = req.path().lstrip('/')
|
|
try:
|
|
resp.write(self.stuff[path])
|
|
except KeyError:
|
|
req.response(404, body='Not found')
|
|
return
|
|
for k,v in req.get_query_pairs():
|
|
resp.write(k + '=' + v + '\n')
|
|
req.write(resp.getvalue())
|
|
|
|
def handle_head(self, req):
|
|
req.set_header('x-head', 'hello')
|
|
path = req.path().lstrip('/')
|
|
try:
|
|
req.write('')
|
|
except KeyError:
|
|
req.response(404, body='Not found')
|
|
|
|
def handle_put(self, req):
|
|
req.set_header('x-put', 'hello')
|
|
path = req.path().lstrip('/')
|
|
if not path:
|
|
req.response(400, body='')
|
|
return
|
|
if path in self.stuff:
|
|
req.response(204)
|
|
else:
|
|
req.response(201)
|
|
self.stuff[path] = req.read_body()
|
|
req.write('')
|
|
|
|
def handle_delete(self, req):
|
|
req.set_header('x-delete', 'hello')
|
|
path = req.path().lstrip('/')
|
|
if not path:
|
|
req.response(400, body='')
|
|
return
|
|
try:
|
|
del self.stuff[path]
|
|
req.response(204)
|
|
except KeyError:
|
|
req.response(404)
|
|
req.write('')
|
|
|
|
def handle_post(self, req):
|
|
req.set_header('x-post', 'hello')
|
|
req.write(req.read_body())
|
|
|
|
|
|
class TestBase(object):
|
|
site_class = BasicSite
|
|
|
|
def base_url(self):
|
|
return 'http://localhost:31337/'
|
|
|
|
def setUp(self):
|
|
self.logfile = StringIO()
|
|
self.victim = api.spawn(httpd.server,
|
|
api.tcp_listener(('0.0.0.0', 31337)),
|
|
self.site_class(),
|
|
log=self.logfile,
|
|
max_size=128)
|
|
|
|
def tearDown(self):
|
|
api.kill(self.victim)
|
|
|
|
|
|
class TestHttpc(TestBase, tests.TestCase):
|
|
def test_get_bad_uri(self):
|
|
self.assertRaises(httpc.NotFound,
|
|
lambda: httpc.get(self.base_url() + 'b0gu5'))
|
|
|
|
def test_get(self):
|
|
response = httpc.get(self.base_url() + 'hello')
|
|
self.assertEquals(response, 'hello world')
|
|
|
|
def test_get_(self):
|
|
status, msg, body = httpc.get_(self.base_url() + 'hello')
|
|
self.assertEquals(status, 200)
|
|
self.assertEquals(msg.dict['x-get'], 'hello')
|
|
self.assertEquals(body, 'hello world')
|
|
|
|
def test_get_query(self):
|
|
response = httpc.get(self.base_url() + 'hello?foo=bar&foo=quux')
|
|
self.assertEquals(response, 'hello worldfoo=bar\nfoo=quux\n')
|
|
|
|
def test_head_(self):
|
|
status, msg, body = httpc.head_(self.base_url() + 'hello')
|
|
self.assertEquals(status, 200)
|
|
self.assertEquals(msg.dict['x-head'], 'hello')
|
|
self.assertEquals(body, '')
|
|
|
|
def test_head(self):
|
|
self.assertEquals(httpc.head(self.base_url() + 'hello'), '')
|
|
|
|
def test_post_(self):
|
|
data = 'qunge'
|
|
status, msg, body = httpc.post_(self.base_url() + '', data=data)
|
|
self.assertEquals(status, 200)
|
|
self.assertEquals(msg.dict['x-post'], 'hello')
|
|
self.assertEquals(body, data)
|
|
|
|
def test_post(self):
|
|
data = 'qunge'
|
|
self.assertEquals(httpc.post(self.base_url() + '', data=data),
|
|
data)
|
|
|
|
def test_put_bad_uri(self):
|
|
self.assertRaises(
|
|
httpc.BadRequest,
|
|
lambda: httpc.put(self.base_url() + '', data=''))
|
|
|
|
def test_put_empty(self):
|
|
httpc.put(self.base_url() + 'empty', data='')
|
|
self.assertEquals(httpc.get(self.base_url() + 'empty'), '')
|
|
|
|
def test_put_nonempty(self):
|
|
data = 'nonempty'
|
|
httpc.put(self.base_url() + 'nonempty', data=data)
|
|
self.assertEquals(httpc.get(self.base_url() + 'nonempty'), data)
|
|
|
|
def test_put_01_create(self):
|
|
data = 'goodbye world'
|
|
status, msg, body = httpc.put_(self.base_url() + 'goodbye',
|
|
data=data)
|
|
self.assertEquals(status, 201)
|
|
self.assertEquals(msg.dict['x-put'], 'hello')
|
|
self.assertEquals(body, '')
|
|
self.assertEquals(httpc.get(self.base_url() + 'goodbye'), data)
|
|
|
|
def test_put_02_modify(self):
|
|
self.test_put_01_create()
|
|
data = 'i really mean goodbye'
|
|
status = httpc.put_(self.base_url() + 'goodbye', data=data)[0]
|
|
self.assertEquals(status, 204)
|
|
self.assertEquals(httpc.get(self.base_url() + 'goodbye'), data)
|
|
|
|
def test_delete_(self):
|
|
httpc.put(self.base_url() + 'killme', data='killme')
|
|
status, msg, body = httpc.delete_(self.base_url() + 'killme')
|
|
self.assertEquals(status, 204)
|
|
self.assertRaises(
|
|
httpc.NotFound,
|
|
lambda: httpc.get(self.base_url() + 'killme'))
|
|
|
|
def test_delete(self):
|
|
httpc.put(self.base_url() + 'killme', data='killme')
|
|
self.assertEquals(httpc.delete(self.base_url() + 'killme'), '')
|
|
self.assertRaises(
|
|
httpc.NotFound,
|
|
lambda: httpc.get(self.base_url() + 'killme'))
|
|
|
|
def test_delete_bad_uri(self):
|
|
self.assertRaises(
|
|
httpc.NotFound,
|
|
lambda: httpc.delete(self.base_url() + 'b0gu5'))
|
|
|
|
|
|
class RedirectSite(BasicSite):
|
|
response_code = 301
|
|
|
|
def handle_request(self, req):
|
|
if req.path().startswith('/redirect/'):
|
|
url = ('http://' + req.get_header('host') +
|
|
req.uri().replace('/redirect/', '/'))
|
|
req.response(self.response_code, headers={'location': url},
|
|
body='')
|
|
return
|
|
return Site.handle_request(self, req)
|
|
|
|
class Site301(RedirectSite):
|
|
pass
|
|
|
|
|
|
class Site302(BasicSite):
|
|
def handle_request(self, req):
|
|
if req.path().startswith('/expired/'):
|
|
url = ('http://' + req.get_header('host') +
|
|
req.uri().replace('/expired/', '/'))
|
|
headers = {'location': url, 'expires': '0'}
|
|
req.response(302, headers=headers, body='')
|
|
return
|
|
if req.path().startswith('/expires/'):
|
|
url = ('http://' + req.get_header('host') +
|
|
req.uri().replace('/expires/', '/'))
|
|
expires = time.time() + (100 * 24 * 60 * 60)
|
|
headers = {'location': url, 'expires': httpc.to_http_time(expires)}
|
|
req.response(302, headers=headers, body='')
|
|
return
|
|
return Site.handle_request(self, req)
|
|
|
|
|
|
class Site303(RedirectSite):
|
|
response_code = 303
|
|
|
|
|
|
class Site307(RedirectSite):
|
|
response_code = 307
|
|
|
|
|
|
class TestHttpc301(TestBase, tests.TestCase):
|
|
site_class = Site301
|
|
|
|
def base_url(self):
|
|
return 'http://localhost:31337/redirect/'
|
|
|
|
def test_get(self):
|
|
try:
|
|
httpc.get(self.base_url() + 'hello')
|
|
self.assert_(False)
|
|
except httpc.MovedPermanently, err:
|
|
response = err.retry()
|
|
self.assertEquals(response, 'hello world')
|
|
|
|
def test_post(self):
|
|
data = 'qunge'
|
|
try:
|
|
response = httpc.post(self.base_url() + '', data=data)
|
|
self.assert_(False)
|
|
except httpc.MovedPermanently, err:
|
|
response = err.retry()
|
|
self.assertEquals(response, data)
|
|
|
|
|
|
class TestHttpc302(TestBase, tests.TestCase):
|
|
site_class = Site302
|
|
|
|
def test_get_expired(self):
|
|
try:
|
|
httpc.get(self.base_url() + 'expired/hello')
|
|
self.assert_(False)
|
|
except httpc.Found, err:
|
|
response = err.retry()
|
|
self.assertEquals(response, 'hello world')
|
|
|
|
def test_get_expires(self):
|
|
try:
|
|
httpc.get(self.base_url() + 'expires/hello')
|
|
self.assert_(False)
|
|
except httpc.Found, err:
|
|
response = err.retry()
|
|
self.assertEquals(response, 'hello world')
|
|
|
|
|
|
class TestHttpc303(TestBase, tests.TestCase):
|
|
site_class = Site303
|
|
|
|
def base_url(self):
|
|
return 'http://localhost:31337/redirect/'
|
|
|
|
def test_post(self):
|
|
data = 'hello world'
|
|
try:
|
|
response = httpc.post(self.base_url() + 'hello', data=data)
|
|
self.assert_(False)
|
|
except httpc.SeeOther, err:
|
|
response = err.retry()
|
|
self.assertEquals(response, data)
|
|
|
|
|
|
class TestHttpc307(TestBase, tests.TestCase):
|
|
site_class = Site307
|
|
|
|
def base_url(self):
|
|
return 'http://localhost:31337/redirect/'
|
|
|
|
def test_post(self):
|
|
data = 'hello world'
|
|
try:
|
|
response = httpc.post(self.base_url() + 'hello', data=data)
|
|
self.assert_(False)
|
|
except httpc.TemporaryRedirect, err:
|
|
response = err.retry()
|
|
self.assertEquals(response, data)
|
|
|
|
|
|
class Site500(BasicSite):
|
|
def handle_request(self, req):
|
|
req.response(500, body="screw you world")
|
|
return
|
|
|
|
|
|
class Site500(BasicSite):
|
|
def handle_request(self, req):
|
|
req.response(500, body="screw you world")
|
|
return
|
|
|
|
|
|
class TestHttpc500(TestBase, tests.TestCase):
|
|
site_class = Site500
|
|
|
|
def base_url(self):
|
|
return 'http://localhost:31337/'
|
|
|
|
def test_get(self):
|
|
data = 'screw you world'
|
|
try:
|
|
response = httpc.get(self.base_url())
|
|
self.fail()
|
|
except httpc.InternalServerError, e:
|
|
self.assertEquals(e.params.response_body, data)
|
|
self.assert_(str(e).count(data))
|
|
self.assert_(repr(e).count(data))
|
|
|
|
|
|
class Site504(BasicSite):
|
|
def handle_request(self, req):
|
|
req.response(504, body="screw you world")
|
|
|
|
|
|
class TestHttpc504(TestBase, tests.TestCase):
|
|
site_class = Site504
|
|
|
|
def base_url(self):
|
|
return 'http://localhost:31337/'
|
|
|
|
def test_post(self):
|
|
# Simply ensure that a 504 status code results in a
|
|
# GatewayTimeout. Don't bother retrying.
|
|
data = 'hello world'
|
|
self.assertRaises(httpc.GatewayTimeout,
|
|
lambda: httpc.post(self.base_url(), data=data))
|
|
|
|
|
|
class TestHttpTime(tests.TestCase):
|
|
rfc1123_time = 'Sun, 06 Nov 1994 08:49:37 GMT'
|
|
rfc850_time = 'Sunday, 06-Nov-94 08:49:37 GMT'
|
|
asctime_time = 'Sun Nov 6 08:49:37 1994'
|
|
secs_since_epoch = 784111777
|
|
def test_to_http_time(self):
|
|
self.assertEqual(self.rfc1123_time, httpc.to_http_time(self.secs_since_epoch))
|
|
|
|
def test_from_http_time(self):
|
|
for formatted in (self.rfc1123_time, self.rfc850_time, self.asctime_time):
|
|
ticks = httpc.from_http_time(formatted, 0)
|
|
self.assertEqual(ticks, self.secs_since_epoch)
|
|
|
|
if __name__ == '__main__':
|
|
tests.main()
|