Update Media Upload to include io.Base and also fix some bugs.

Fixes issue #139.
Fixes issue #123.

Reviewed in http://codereview.appspot.com/6307067/.
This commit is contained in:
Joe Gregorio
2012-06-12 09:36:30 -04:00
parent 01c86b1ecc
commit 910b9b1270
3 changed files with 391 additions and 27 deletions

View File

@@ -46,6 +46,9 @@ from model import JsonModel
from oauth2client.anyjson import simplejson from oauth2client.anyjson import simplejson
DEFAULT_CHUNK_SIZE = 512*1024
class MediaUploadProgress(object): class MediaUploadProgress(object):
"""Status of a resumable upload.""" """Status of a resumable upload."""
@@ -54,14 +57,23 @@ class MediaUploadProgress(object):
Args: Args:
resumable_progress: int, bytes sent so far. resumable_progress: int, bytes sent so far.
total_size: int, total bytes in complete upload. total_size: int, total bytes in complete upload, or None if the total
upload size isn't known ahead of time.
""" """
self.resumable_progress = resumable_progress self.resumable_progress = resumable_progress
self.total_size = total_size self.total_size = total_size
def progress(self): def progress(self):
"""Percent of upload completed, as a float.""" """Percent of upload completed, as a float.
return float(self.resumable_progress) / float(self.total_size)
Returns:
the percentage complete as a float, returning 0.0 if the total size of
the upload is unknown.
"""
if self.total_size is not None:
return float(self.resumable_progress) / float(self.total_size)
else:
return 0.0
class MediaUpload(object): class MediaUpload(object):
@@ -77,21 +89,51 @@ class MediaUpload(object):
such as under certain classes of requests under Google App Engine. such as under certain classes of requests under Google App Engine.
""" """
def getbytes(self, begin, end):
raise NotImplementedError()
def size(self):
raise NotImplementedError()
def chunksize(self): def chunksize(self):
"""Chunk size for resumable uploads.
Returns:
Chunk size in bytes.
"""
raise NotImplementedError() raise NotImplementedError()
def mimetype(self): def mimetype(self):
"""Mime type of the body.
Returns:
Mime type.
"""
return 'application/octet-stream' return 'application/octet-stream'
def size(self):
"""Size of upload.
Returns:
Size of the body, or None if the size is unknown.
"""
return None
def resumable(self): def resumable(self):
"""Whether this upload is resumable.
Returns:
True if resumable upload or False.
"""
return False return False
def getbytes(self, begin, end):
"""Get bytes from the media.
Args:
begin: int, offset from beginning of file.
length: int, number of bytes to read, starting at begin.
Returns:
A string of bytes read. May be shorter than length if EOF was reached
first.
"""
raise NotImplementedError()
def _to_json(self, strip=None): def _to_json(self, strip=None):
"""Utility function for creating a JSON representation of a MediaUpload. """Utility function for creating a JSON representation of a MediaUpload.
@@ -148,15 +190,15 @@ class MediaFileUpload(MediaUpload):
method. For example, if we had a service that allowed uploading images: method. For example, if we had a service that allowed uploading images:
media = MediaFileUpload('smiley.png', mimetype='image/png', chunksize=1000, media = MediaFileUpload('smiley.png', mimetype='image/png',
resumable=True) chunksize=1024*1024, resumable=True)
service.objects().insert( service.objects().insert(
bucket=buckets['items'][0]['id'], bucket=buckets['items'][0]['id'],
name='smiley.png', name='smiley.png',
media_body=media).execute() media_body=media).execute()
""" """
def __init__(self, filename, mimetype=None, chunksize=256*1024, resumable=False): def __init__(self, filename, mimetype=None, chunksize=DEFAULT_CHUNK_SIZE, resumable=False):
"""Constructor. """Constructor.
Args: Args:
@@ -177,16 +219,36 @@ class MediaFileUpload(MediaUpload):
self._chunksize = chunksize self._chunksize = chunksize
self._resumable = resumable self._resumable = resumable
def chunksize(self):
"""Chunk size for resumable uploads.
Returns:
Chunk size in bytes.
"""
return self._chunksize
def mimetype(self): def mimetype(self):
"""Mime type of the body.
Returns:
Mime type.
"""
return self._mimetype return self._mimetype
def size(self): def size(self):
"""Size of upload.
Returns:
Size of the body, or None if the size is unknown.
"""
return self._size return self._size
def chunksize(self):
return self._chunksize
def resumable(self): def resumable(self):
"""Whether this upload is resumable.
Returns:
True if resumable upload or False.
"""
return self._resumable return self._resumable
def getbytes(self, begin, length): def getbytes(self, begin, length):
@@ -221,6 +283,98 @@ class MediaFileUpload(MediaUpload):
d['_filename'], d['_mimetype'], d['_chunksize'], d['_resumable']) d['_filename'], d['_mimetype'], d['_chunksize'], d['_resumable'])
class MediaIoBaseUpload(MediaUpload):
  """A MediaUpload backed by an io.Base (or plain file) object.

  Note that the Python file object is compatible with io.Base and can be used
  with this class also.

    fh = io.BytesIO('...Some data to upload...')
    media = MediaIoBaseUpload(fh, mimetype='image/png',
      chunksize=1024*1024, resumable=True)
    service.objects().insert(
        bucket='a_bucket_id',
        name='smiley.png',
        media_body=media).execute()
  """

  def __init__(self, fh, mimetype, chunksize=DEFAULT_CHUNK_SIZE,
      resumable=False):
    """Constructor.

    Args:
      fh: io.Base or file object, The source of the bytes to upload. It must
        support seek() and read(), which getbytes() calls.
      mimetype: string, Mime-type of the data. It is stored exactly as given;
        no guessing is performed, so the caller must supply it.
      chunksize: int, File will be uploaded in chunks of this many bytes. Only
        used if resumable=True.
      resumable: bool, True if this is a resumable upload. False means upload
        in a single request.
    """
    self._fh = fh
    self._mimetype = mimetype
    self._chunksize = chunksize
    self._resumable = resumable
    self._size = None
    # Best-effort size detection: only objects exposing a real file descriptor
    # can be measured with fstat. Anything else keeps a size of None, which
    # size() reports as "unknown".
    try:
      if hasattr(self._fh, 'fileno'):
        fileno = self._fh.fileno()
        self._size = os.fstat(fileno).st_size
    except (IOError, OSError):
      # fileno() on in-memory io objects raises an IOError subclass, while
      # os.fstat() reports failures as OSError. On Python 2 those are
      # unrelated classes, so both must be listed to keep this best-effort.
      pass

  def chunksize(self):
    """Chunk size for resumable uploads.

    Returns:
      Chunk size in bytes.
    """
    return self._chunksize

  def mimetype(self):
    """Mime type of the body.

    Returns:
      Mime type.
    """
    return self._mimetype

  def size(self):
    """Size of upload.

    Returns:
      Size of the body, or None if the size is unknown.
    """
    return self._size

  def resumable(self):
    """Whether this upload is resumable.

    Returns:
      True if resumable upload or False.
    """
    return self._resumable

  def getbytes(self, begin, length):
    """Get bytes from the media.

    Repositions the underlying object with seek() before reading.

    Args:
      begin: int, offset from beginning of file.
      length: int, number of bytes to read, starting at begin.

    Returns:
      A string of bytes read. May be shorter than length if EOF was reached
      first.
    """
    self._fh.seek(begin)
    return self._fh.read(length)

  def to_json(self):
    """This upload type is not serializable.

    Raises:
      NotImplementedError: always.
    """
    raise NotImplementedError('MediaIoBaseUpload is not serializable.')
class MediaInMemoryUpload(MediaUpload): class MediaInMemoryUpload(MediaUpload):
"""MediaUpload for a chunk of bytes. """MediaUpload for a chunk of bytes.
@@ -229,7 +383,7 @@ class MediaInMemoryUpload(MediaUpload):
""" """
def __init__(self, body, mimetype='application/octet-stream', def __init__(self, body, mimetype='application/octet-stream',
chunksize=256*1024, resumable=False): chunksize=DEFAULT_CHUNK_SIZE, resumable=False):
"""Create a new MediaBytesUpload. """Create a new MediaBytesUpload.
Args: Args:
@@ -266,7 +420,7 @@ class MediaInMemoryUpload(MediaUpload):
"""Size of upload. """Size of upload.
Returns: Returns:
Size of the body. Size of the body, or None of the size is unknown.
""" """
return len(self._body) return len(self._body)
@@ -345,6 +499,7 @@ class HttpRequest(object):
self.http = http self.http = http
self.postproc = postproc self.postproc = postproc
self.resumable = resumable self.resumable = resumable
self._in_error_state = False
# Pull the multipart boundary out of the content-type header. # Pull the multipart boundary out of the content-type header.
major, minor, params = mimeparse.parse_mime_type( major, minor, params = mimeparse.parse_mime_type(
@@ -417,14 +572,24 @@ class HttpRequest(object):
Returns: Returns:
(status, body): (ResumableMediaStatus, object) (status, body): (ResumableMediaStatus, object)
The body will be None until the resumable media is fully uploaded. The body will be None until the resumable media is fully uploaded.
Raises:
apiclient.errors.HttpError if the response was not a 2xx.
httplib2.Error if a transport error has occurred.
""" """
if http is None: if http is None:
http = self.http http = self.http
if self.resumable.size() is None:
size = '*'
else:
size = str(self.resumable.size())
if self.resumable_uri is None: if self.resumable_uri is None:
start_headers = copy.copy(self.headers) start_headers = copy.copy(self.headers)
start_headers['X-Upload-Content-Type'] = self.resumable.mimetype() start_headers['X-Upload-Content-Type'] = self.resumable.mimetype()
start_headers['X-Upload-Content-Length'] = str(self.resumable.size()) if size != '*':
start_headers['X-Upload-Content-Length'] = size
start_headers['content-length'] = str(self.body_size) start_headers['content-length'] = str(self.body_size)
resp, content = http.request(self.uri, self.method, resp, content = http.request(self.uri, self.method,
@@ -434,26 +599,63 @@ class HttpRequest(object):
self.resumable_uri = resp['location'] self.resumable_uri = resp['location']
else: else:
raise ResumableUploadError("Failed to retrieve starting URI.") raise ResumableUploadError("Failed to retrieve starting URI.")
elif self._in_error_state:
# If we are in an error state then query the server for current state of
# the upload by sending an empty PUT and reading the 'range' header in
# the response.
headers = {
'Content-Range': 'bytes */%s' % size,
'content-length': '0'
}
resp, content = http.request(self.resumable_uri, 'PUT',
headers=headers)
status, body = self._process_response(resp, content)
if body:
# The upload was complete.
return (status, body)
data = self.resumable.getbytes(self.resumable_progress, data = self.resumable.getbytes(
self.resumable.chunksize()) self.resumable_progress, self.resumable.chunksize())
headers = { headers = {
'Content-Range': 'bytes %d-%d/%d' % ( 'Content-Range': 'bytes %d-%d/%s' % (
self.resumable_progress, self.resumable_progress + len(data) - 1, self.resumable_progress, self.resumable_progress + len(data) - 1,
self.resumable.size()), size)
} }
resp, content = http.request(self.resumable_uri, 'PUT', try:
body=data, resp, content = http.request(self.resumable_uri, 'PUT',
headers=headers) body=data,
headers=headers)
except:
self._in_error_state = True
raise
return self._process_response(resp, content)
def _process_response(self, resp, content):
"""Process the response from a single chunk upload.
Args:
resp: httplib2.Response, the response object.
content: string, the content of the response.
Returns:
(status, body): (ResumableMediaStatus, object)
The body will be None until the resumable media is fully uploaded.
Raises:
apiclient.errors.HttpError if the response was not a 2xx or a 308.
"""
if resp.status in [200, 201]: if resp.status in [200, 201]:
self._in_error_state = False
return None, self.postproc(resp, content) return None, self.postproc(resp, content)
elif resp.status == 308: elif resp.status == 308:
self._in_error_state = False
# A "308 Resume Incomplete" indicates we are not done. # A "308 Resume Incomplete" indicates we are not done.
self.resumable_progress = int(resp['range'].split('-')[1]) + 1 self.resumable_progress = int(resp['range'].split('-')[1]) + 1
if 'location' in resp: if 'location' in resp:
self.resumable_uri = resp['location'] self.resumable_uri = resp['location']
else: else:
self._in_error_state = True
raise HttpError(resp, content, self.uri) raise HttpError(resp, content, self.uri)
return (MediaUploadProgress(self.resumable_progress, self.resumable.size()), return (MediaUploadProgress(self.resumable_progress, self.resumable.size()),
@@ -466,6 +668,7 @@ class HttpRequest(object):
d['resumable'] = self.resumable.to_json() d['resumable'] = self.resumable.to_json()
del d['http'] del d['http']
del d['postproc'] del d['postproc']
return simplejson.dumps(d) return simplejson.dumps(d)
@staticmethod @staticmethod

View File

@@ -27,6 +27,8 @@ import httplib2
import os import os
import unittest import unittest
import urlparse import urlparse
import StringIO
try: try:
from urlparse import parse_qs from urlparse import parse_qs
@@ -42,9 +44,10 @@ from apiclient.errors import UnacceptableMimeTypeError
from apiclient.http import HttpMock from apiclient.http import HttpMock
from apiclient.http import HttpMockSequence from apiclient.http import HttpMockSequence
from apiclient.http import MediaFileUpload from apiclient.http import MediaFileUpload
from apiclient.http import MediaIoBaseUpload
from apiclient.http import MediaUploadProgress from apiclient.http import MediaUploadProgress
from apiclient.http import tunnel_patch from apiclient.http import tunnel_patch
from oauth2client.anyjson import simplejson
DATA_DIR = os.path.join(os.path.dirname(__file__), 'data') DATA_DIR = os.path.join(os.path.dirname(__file__), 'data')
@@ -505,7 +508,90 @@ class Discovery(unittest.TestCase):
]) ])
self.assertRaises(HttpError, request.execute, http) self.assertRaises(HttpError, request.execute, http)
self.assertTrue(request._in_error_state)
http = HttpMockSequence([
({'status': '308',
'range': '0-5'}, ''),
({'status': '308',
'range': '0-6'}, ''),
])
status, body = request.next_chunk(http)
self.assertEquals(status.resumable_progress, 7,
'Should have first checked length and then tried to PUT more.')
self.assertFalse(request._in_error_state)
# Put it back in an error state.
http = HttpMockSequence([
({'status': '400'}, ''),
])
self.assertRaises(HttpError, request.execute, http)
self.assertTrue(request._in_error_state)
# Pretend the last request that 400'd actually succeeded.
http = HttpMockSequence([
({'status': '200'}, '{"foo": "bar"}'),
])
status, body = request.next_chunk(http)
self.assertEqual(body, {'foo': 'bar'})
def test_resumable_media_handle_uploads_of_unknown_size(self):
  """An upload whose size() is None sends '*' as the Content-Range total."""
  self.http = HttpMock(datafile('zoo.json'), {'status': '200'})
  zoo = build('zoo', 'v1', self.http)

  # Create an upload that doesn't know the full size of the media: a StringIO
  # has no fileno(), so MediaIoBaseUpload leaves its size as None.
  stream = StringIO.StringIO('data goes here')
  media = MediaIoBaseUpload(
      fh=stream, mimetype='image/png', chunksize=500, resumable=True)
  request = zoo.animals().insert(media_body=media, body=None)

  # First canned response hands back the upload location; the second uses the
  # 'echo_request_headers_as_json' body (mock behaviour defined outside this
  # view -- presumably it returns the request headers as the response body).
  canned_responses = [
    ({'status': '200',
      'location': 'http://upload.example.com'}, ''),
    ({'status': '200'}, 'echo_request_headers_as_json'),
    ]
  upload_http = HttpMockSequence(canned_responses)

  unused_status, response_body = request.next_chunk(upload_http)
  self.assertEqual(response_body, {'Content-Range': 'bytes 0-13/*'},
    'Should be 14 out of * bytes.')
def test_resumable_media_handle_resume_of_upload_of_unknown_size(self):
  """After a failed chunk, the next call first queries the upload status.

  For an upload of unknown size that status query must carry
  'Content-Range: bytes */*' and an empty body.
  """
  import sys

  http = HttpMockSequence([
    ({'status': '200',
      'location': 'http://upload.example.com'}, ''),
    ({'status': '400'}, ''),
    ])

  self.http = HttpMock(datafile('zoo.json'), {'status': '200'})
  zoo = build('zoo', 'v1', self.http)

  # Create an upload that doesn't know the full size of the media.
  fh = StringIO.StringIO('data goes here')
  upload = MediaIoBaseUpload(
      fh=fh, mimetype='image/png', chunksize=500, resumable=True)

  request = zoo.animals().insert(media_body=upload, body=None)

  # Put it in an error state.
  self.assertRaises(HttpError, request.next_chunk, http)

  http = HttpMockSequence([
    ({'status': '400',
      'range': '0-5'}, 'echo_request_headers_as_json'),
    ])

  # Should resume the upload by first querying the status of the upload. The
  # mocked 400 makes that query raise; the echoed request headers are then
  # available on the exception's content.
  error = None
  try:
    request.next_chunk(http)
  except HttpError:
    # sys.exc_info() fetches the exception without committing to either the
    # "except X, e" or the "except X as e" spelling.
    error = sys.exc_info()[1]

  # Without this check the test would pass vacuously whenever no exception is
  # raised, because the header assertion below would never be reached.
  self.assertTrue(error is not None,
    'The upload status query should have raised HttpError.')

  expected = {
    'Content-Range': 'bytes */*',
    'content-length': '0'
    }
  self.assertEqual(expected, simplejson.loads(error.content),
    'Should send an empty body when requesting the current upload status.')
class Next(unittest.TestCase): class Next(unittest.TestCase):

View File

@@ -25,6 +25,7 @@ __author__ = 'jcgregorio@google.com (Joe Gregorio)'
import httplib2 import httplib2
import os import os
import unittest import unittest
import StringIO
from apiclient.errors import BatchError from apiclient.errors import BatchError
from apiclient.http import BatchHttpRequest from apiclient.http import BatchHttpRequest
@@ -33,6 +34,7 @@ from apiclient.http import HttpRequest
from apiclient.http import MediaFileUpload from apiclient.http import MediaFileUpload
from apiclient.http import MediaUpload from apiclient.http import MediaUpload
from apiclient.http import MediaInMemoryUpload from apiclient.http import MediaInMemoryUpload
from apiclient.http import MediaIoBaseUpload
from apiclient.http import set_user_agent from apiclient.http import set_user_agent
from apiclient.model import JsonModel from apiclient.model import JsonModel
from oauth2client.client import Credentials from oauth2client.client import Credentials
@@ -110,6 +112,9 @@ class TestUserAgent(unittest.TestCase):
resp, content = http.request("http://example.com") resp, content = http.request("http://example.com")
self.assertEqual('my_app/5.5 my_library/0.1', content['user-agent']) self.assertEqual('my_app/5.5 my_library/0.1', content['user-agent'])
class TestMediaUpload(unittest.TestCase):
def test_media_file_upload_to_from_json(self): def test_media_file_upload_to_from_json(self):
upload = MediaFileUpload( upload = MediaFileUpload(
datafile('small.png'), chunksize=500, resumable=True) datafile('small.png'), chunksize=500, resumable=True)
@@ -176,6 +181,76 @@ class TestUserAgent(unittest.TestCase):
self.assertEqual(http, new_req.http) self.assertEqual(http, new_req.http)
self.assertEqual(media_upload.to_json(), new_req.resumable.to_json()) self.assertEqual(media_upload.to_json(), new_req.resumable.to_json())
class TestMediaIoBaseUpload(unittest.TestCase):
  """Tests MediaIoBaseUpload against the kinds of stream it accepts."""

  def _read_small_png(self):
    """Return the raw bytes of the small.png fixture."""
    # Binary mode matters: a PNG is not text, and text mode may translate or
    # truncate its bytes on platforms that distinguish the two.
    f = open(datafile('small.png'), 'rb')
    try:
      return f.read()
    finally:
      f.close()

  def test_media_io_base_upload_from_file_io(self):
    # Only the import is guarded, so an ImportError raised by the code under
    # test is no longer silently swallowed.
    try:
      import io
    except ImportError:
      # No io module on this interpreter; nothing to exercise.
      return

    fh = io.FileIO(datafile('small.png'), 'r')
    try:
      upload = MediaIoBaseUpload(
          fh=fh, mimetype='image/png', chunksize=500, resumable=True)
      self.assertEqual('image/png', upload.mimetype())
      # A real file descriptor is available, so the size is known.
      self.assertEqual(190, upload.size())
      self.assertEqual(True, upload.resumable())
      self.assertEqual(500, upload.chunksize())
      self.assertEqual('PNG', upload.getbytes(1, 3))
    finally:
      fh.close()

  def test_media_io_base_upload_from_file_object(self):
    f = open(datafile('small.png'), 'rb')
    try:
      upload = MediaIoBaseUpload(
          fh=f, mimetype='image/png', chunksize=500, resumable=True)
      self.assertEqual('image/png', upload.mimetype())
      self.assertEqual(190, upload.size())
      self.assertEqual(True, upload.resumable())
      self.assertEqual(500, upload.chunksize())
      self.assertEqual('PNG', upload.getbytes(1, 3))
    finally:
      f.close()

  def test_media_io_base_upload_serializable(self):
    f = open(datafile('small.png'), 'rb')
    try:
      upload = MediaIoBaseUpload(fh=f, mimetype='image/png')
      # MediaIoBaseUpload should not be serializable.
      self.assertRaises(NotImplementedError, upload.to_json)
    finally:
      f.close()

  def test_media_io_base_upload_from_string_io(self):
    fh = StringIO.StringIO(self._read_small_png())
    upload = MediaIoBaseUpload(
        fh=fh, mimetype='image/png', chunksize=500, resumable=True)
    self.assertEqual('image/png', upload.mimetype())
    # StringIO has no file descriptor, so the size is unknown.
    self.assertEqual(None, upload.size())
    self.assertEqual(True, upload.resumable())
    self.assertEqual(500, upload.chunksize())
    self.assertEqual('PNG', upload.getbytes(1, 3))

  def test_media_io_base_upload_from_bytes(self):
    try:
      import io
    except ImportError:
      # No io module on this interpreter; nothing to exercise.
      return

    fh = io.BytesIO(self._read_small_png())
    upload = MediaIoBaseUpload(
        fh=fh, mimetype='image/png', chunksize=500, resumable=True)
    self.assertEqual('image/png', upload.mimetype())
    # BytesIO cannot provide a usable file descriptor, so the size is unknown.
    self.assertEqual(None, upload.size())
    self.assertEqual(True, upload.resumable())
    self.assertEqual(500, upload.chunksize())
    self.assertEqual('PNG', upload.getbytes(1, 3))
EXPECTED = """POST /someapi/v1/collection/?foo=bar HTTP/1.1 EXPECTED = """POST /someapi/v1/collection/?foo=bar HTTP/1.1
Content-Type: application/json Content-Type: application/json
MIME-Version: 1.0 MIME-Version: 1.0