This commit is contained in:
Ryan Williams
2010-05-22 13:30:03 -07:00
5 changed files with 39 additions and 8 deletions

View File

@@ -1,4 +1,4 @@
version_info = (0, 9, 7, "dev1") version_info = (0, 9, 8, "dev1")
__version__ = ".".join(map(str, version_info)) __version__ = ".".join(map(str, version_info))
try: try:

View File

@@ -38,15 +38,20 @@ def make_psycopg_green():
extensions.set_wait_callback(eventlet_wait_callback) extensions.set_wait_callback(eventlet_wait_callback)
def eventlet_wait_callback(conn, timeout=-1): def eventlet_wait_callback(conn, timeout=-1,
# access these objects with LOAD_FAST instead of LOAD_GLOBAL lookup
POLL_OK=extensions.POLL_OK,
POLL_READ=extensions.POLL_READ,
POLL_WRITE=extensions.POLL_WRITE,
trampoline=trampoline):
"""A wait callback useful to allow eventlet to work with Psycopg.""" """A wait callback useful to allow eventlet to work with Psycopg."""
while 1: while 1:
state = conn.poll() state = conn.poll()
if state == extensions.POLL_OK: if state == POLL_OK:
break break
elif state == extensions.POLL_READ: elif state == POLL_READ:
trampoline(conn.fileno(), read=True) trampoline(conn.fileno(), read=True)
elif state == extensions.POLL_WRITE: elif state == POLL_WRITE:
trampoline(conn.fileno(), write=True) trampoline(conn.fileno(), write=True)
else: else:
raise psycopg2.OperationalError( raise psycopg2.OperationalError(

View File

@@ -231,7 +231,7 @@ def setup():
_rpipe, _wpipe = os.pipe() _rpipe, _wpipe = os.pipe()
_wfile = greenio.GreenPipe(_wpipe, 'wb', 0) _wfile = greenio.GreenPipe(_wpipe, 'wb', 0)
_rfile = greenio.GreenPipe(_rpipe, 'rb', 0) _rfile = greenio.GreenPipe(_rpipe, 'rb', 0)
except ImportError: except (ImportError, NotImplementedError):
# This is Windows compatibility -- use a socket instead of a pipe because # This is Windows compatibility -- use a socket instead of a pipe because
# pipes don't really exist on Windows. # pipes don't really exist on Windows.
import socket import socket

View File

@@ -111,8 +111,11 @@ class Input(object):
if self.chunk_length > self.position: if self.chunk_length > self.position:
response.append(rfile.read( response.append(rfile.read(
min(self.chunk_length - self.position, length))) min(self.chunk_length - self.position, length)))
length -= len(response[-1]) last_read = len(response[-1])
self.position += len(response[-1]) if last_read == 0:
break
length -= last_read
self.position += last_read
if self.chunk_length == self.position: if self.chunk_length == self.position:
rfile.readline() rfile.readline()
else: else:

View File

@@ -11,6 +11,7 @@ from unittest import main
from eventlet import api from eventlet import api
from eventlet import util from eventlet import util
from eventlet import greenio from eventlet import greenio
from eventlet import event
from eventlet.green import socket as greensocket from eventlet.green import socket as greensocket
from eventlet import wsgi from eventlet import wsgi
from eventlet.support import get_errno from eventlet.support import get_errno
@@ -833,6 +834,28 @@ class TestHttpd(_TestBase):
# (one terminates the chunk, one terminates the body) # (one terminates the chunk, one terminates the body)
self.assertEqual(response, ['0', '', '']) self.assertEqual(response, ['0', '', ''])
def test_aborted_chunked_post(self):
read_content = event.Event()
def chunk_reader(env, start_response):
content = env['wsgi.input'].read(1024)
read_content.send(content)
start_response('200 OK', [('Content-Type', 'text/plain')])
return [content]
self.site.application = chunk_reader
expected_body = 'a bunch of stuff'
data = "\r\n".join(['PUT /somefile HTTP/1.0',
'Transfer-Encoding: chunked',
'',
'def',
expected_body])
# start PUT-ing some chunked data but close prematurely
sock = eventlet.connect(('127.0.0.1', self.port))
sock.sendall(data)
sock.close()
# the test passes if we successfully get here, and read all the data
# in spite of the early close
self.assertEqual(read_content.wait(), expected_body)
def read_headers(sock): def read_headers(sock):
fd = sock.makefile() fd = sock.makefile()
try: try: