Add support for resumable upload.

Reviewed in http://codereview.appspot.com/5417051/.
This commit is contained in:
Joe Gregorio
2011-11-22 09:49:47 -05:00
parent dae2f55bc6
commit d0bd388679
5 changed files with 617 additions and 33 deletions

View File

@@ -26,6 +26,7 @@ import copy
import httplib2
import logging
import os
import random
import re
import uritemplate
import urllib
@@ -48,6 +49,8 @@ from errors import UnacceptableMimeTypeError
from errors import UnknownApiNameOrVersion
from errors import UnknownLinkType
from http import HttpRequest
from http import MediaUpload
from http import MediaFileUpload
from model import JsonModel
URITEMPLATE = re.compile('{[^}]*}')
@@ -325,6 +328,7 @@ def createResource(http, baseUrl, model, requestBuilder,
if 'mediaUpload' in methodDesc:
mediaUpload = methodDesc['mediaUpload']
mediaPathUrl = mediaUpload['protocols']['simple']['path']
mediaResumablePathUrl = mediaUpload['protocols']['resumable']['path']
accept = mediaUpload['accept']
maxSize = _media_size_to_long(mediaUpload.get('maxSize', ''))
@@ -440,28 +444,46 @@ def createResource(http, baseUrl, model, requestBuilder,
expanded_url = uritemplate.expand(pathUrl, params)
url = urlparse.urljoin(self._baseUrl, expanded_url + query)
resumable = None
multipart_boundary = ''
if media_filename:
(media_mime_type, encoding) = mimetypes.guess_type(media_filename)
if media_mime_type is None:
raise UnknownFileType(media_filename)
if not mimeparse.best_match([media_mime_type], ','.join(accept)):
raise UnacceptableMimeTypeError(media_mime_type)
# Convert a simple filename into a MediaUpload object.
if isinstance(media_filename, basestring):
(media_mime_type, encoding) = mimetypes.guess_type(media_filename)
if media_mime_type is None:
raise UnknownFileType(media_filename)
if not mimeparse.best_match([media_mime_type], ','.join(accept)):
raise UnacceptableMimeTypeError(media_mime_type)
media_upload = MediaFileUpload(media_filename, media_mime_type)
elif isinstance(media_filename, MediaUpload):
media_upload = media_filename
else:
raise TypeError(
'media_filename must be str or MediaUpload. Got %s' % type(media_upload))
if media_upload.resumable():
resumable = media_upload
# Check the maxSize
if maxSize > 0 and os.path.getsize(media_filename) > maxSize:
raise MediaUploadSizeError(media_filename)
if maxSize > 0 and media_upload.size() > maxSize:
raise MediaUploadSizeError("Media larger than: %s" % maxSize)
# Use the media path uri for media uploads
expanded_url = uritemplate.expand(mediaPathUrl, params)
if media_upload.resumable():
expanded_url = uritemplate.expand(mediaResumablePathUrl, params)
else:
expanded_url = uritemplate.expand(mediaPathUrl, params)
url = urlparse.urljoin(self._baseUrl, expanded_url + query)
if body is None:
headers['content-type'] = media_mime_type
# make the body the contents of the file
f = file(media_filename, 'rb')
body = f.read()
f.close()
# This is a simple media upload
headers['content-type'] = media_upload.mimetype()
expanded_url = uritemplate.expand(mediaResumablePathUrl, params)
if not media_upload.resumable():
body = media_upload.getbytes(0, media_upload.size())
else:
# This is a multipart/related upload.
msgRoot = MIMEMultipart('related')
# msgRoot should not write out it's own headers
setattr(msgRoot, '_write_headers', lambda self: None)
@@ -472,19 +494,51 @@ def createResource(http, baseUrl, model, requestBuilder,
msgRoot.attach(msg)
# attach the media as the second part
msg = MIMENonMultipart(*media_mime_type.split('/'))
msg = MIMENonMultipart(*media_upload.mimetype().split('/'))
msg['Content-Transfer-Encoding'] = 'binary'
f = file(media_filename, 'rb')
msg.set_payload(f.read())
f.close()
msgRoot.attach(msg)
if media_upload.resumable():
# This is a multipart resumable upload, where a multipart payload
# looks like this:
#
# --===============1678050750164843052==
# Content-Type: application/json
# MIME-Version: 1.0
#
# {'foo': 'bar'}
# --===============1678050750164843052==
# Content-Type: image/png
# MIME-Version: 1.0
# Content-Transfer-Encoding: binary
#
# <BINARY STUFF>
# --===============1678050750164843052==--
#
# In the case of resumable multipart media uploads, the <BINARY
# STUFF> is large and will be spread across multiple PUTs. What we
# do here is compose the multipart message with a random payload in
# place of <BINARY STUFF> and then split the resulting content into
# two pieces, text before <BINARY STUFF> and text after <BINARY
# STUFF>. The text after <BINARY STUFF> is the multipart boundary.
# In apiclient.http the HttpRequest will send the text before
# <BINARY STUFF>, then send the actual binary media in chunks, and
# then will send the multipart delimeter.
body = msgRoot.as_string()
payload = hex(random.getrandbits(300))
msg.set_payload(payload)
msgRoot.attach(msg)
body = msgRoot.as_string()
body, _ = body.split(payload)
resumable = media_upload
else:
payload = media_upload.getbytes(0, media_upload.size())
msg.set_payload(payload)
msgRoot.attach(msg)
body = msgRoot.as_string()
# must appear after the call to as_string() to get the right boundary
multipart_boundary = msgRoot.get_boundary()
headers['content-type'] = ('multipart/related; '
'boundary="%s"') % msgRoot.get_boundary()
'boundary="%s"') % multipart_boundary
logging.info('URL being requested: %s' % url)
return self._requestBuilder(self._http,
@@ -493,7 +547,8 @@ def createResource(http, baseUrl, model, requestBuilder,
method=httpMethod,
body=body,
headers=headers,
methodId=methodId)
methodId=methodId,
resumable=resumable)
docs = [methodDesc.get('description', DEFAULT_METHOD_DOC), '\n\n']
if len(argmap) > 0:

View File

@@ -85,6 +85,11 @@ class MediaUploadSizeError(Error):
pass
class ResumableUploadError(Error):
"""Error occured during resumable upload."""
pass
class UnexpectedMethodError(Error):
"""Exception raised by RequestMockBuilder on unexpected calls."""

View File

@@ -25,16 +25,187 @@ __all__ = [
'set_user_agent', 'tunnel_patch'
]
import copy
import httplib2
import os
import mimeparse
import mimetypes
from model import JsonModel
from errors import HttpError
from errors import ResumableUploadError
from errors import UnexpectedBodyError
from errors import UnexpectedMethodError
from anyjson import simplejson
class MediaUploadProgress(object):
"""Status of a resumable upload."""
def __init__(self, resumable_progress, total_size):
"""Constructor.
Args:
resumable_progress: int, bytes sent so far.
total_size: int, total bytes in complete upload.
"""
self.resumable_progress = resumable_progress
self.total_size = total_size
def progress(self):
"""Percent of upload completed, as a float."""
return float(self.resumable_progress)/float(self.total_size)
class MediaUpload(object):
"""Describes a media object to upload.
Base class that defines the interface of MediaUpload subclasses.
"""
def getbytes(self, begin, end):
raise NotImplementedError()
def size(self):
raise NotImplementedError()
def chunksize(self):
raise NotImplementedError()
def mimetype(self):
return 'application/octet-stream'
def resumable(self):
return False
def _to_json(self, strip=None):
"""Utility function for creating a JSON representation of a MediaUpload.
Args:
strip: array, An array of names of members to not include in the JSON.
Returns:
string, a JSON representation of this instance, suitable to pass to
from_json().
"""
t = type(self)
d = copy.copy(self.__dict__)
if strip is not None:
for member in strip:
del d[member]
d['_class'] = t.__name__
d['_module'] = t.__module__
return simplejson.dumps(d)
def to_json(self):
"""Create a JSON representation of an instance of MediaUpload.
Returns:
string, a JSON representation of this instance, suitable to pass to
from_json().
"""
return self._to_json()
@classmethod
def new_from_json(cls, s):
"""Utility class method to instantiate a MediaUpload subclass from a JSON
representation produced by to_json().
Args:
s: string, JSON from to_json().
Returns:
An instance of the subclass of MediaUpload that was serialized with
to_json().
"""
data = simplejson.loads(s)
# Find and call the right classmethod from_json() to restore the object.
module = data['_module']
m = __import__(module, fromlist=module.split('.')[:-1])
kls = getattr(m, data['_class'])
from_json = getattr(kls, 'from_json')
return from_json(s)
class MediaFileUpload(MediaUpload):
"""A MediaUpload for a file.
Construct a MediaFileUpload and pass as the media_body parameter of the
method. For example, if we had a service that allowed uploading images:
media = MediaFileUpload('smiley.png', mimetype='image/png', chunksize=1000,
resumable=True)
service.objects().insert(
bucket=buckets['items'][0]['id'],
name='smiley.png',
media_body=media).execute()
"""
def __init__(self, filename, mimetype=None, chunksize=10000, resumable=False):
"""Constructor.
Args:
filename: string, Name of the file.
mimetype: string, Mime-type of the file. If None then a mime-type will be
guessed from the file extension.
chunksize: int, File will be uploaded in chunks of this many bytes. Only
used if resumable=True.
resumable: bool, True if this is a resumable upload. False means upload in
a single request.
"""
self._filename = filename
self._size = os.path.getsize(filename)
self._fd = None
if mimetype is None:
(mimetype, encoding) = mimetypes.guess_type(filename)
self._mimetype = mimetype
self._chunksize = chunksize
self._resumable = resumable
def mimetype(self):
return self._mimetype
def size(self):
return self._size
def chunksize(self):
return self._chunksize
def resumable(self):
return self._resumable
def getbytes(self, begin, length):
"""Get bytes from the media.
Args:
begin: int, offset from beginning of file.
length: int, number of bytes to read, starting at begin.
Returns:
A string of bytes read. May be shorted than length if EOF was reached
first.
"""
if self._fd is None:
self._fd = open(self._filename, 'rb')
self._fd.seek(begin)
return self._fd.read(length)
def to_json(self):
"""Creating a JSON representation of an instance of Credentials.
Returns:
string, a JSON representation of this instance, suitable to pass to
from_json().
"""
return self._to_json(['_fd'])
@staticmethod
def from_json(s):
d = simplejson.loads(s)
return MediaFileUpload(
d['_filename'], d['_mimetype'], d['_chunksize'], d['_resumable'])
class HttpRequest(object):
"""Encapsulates a single HTTP request.
"""
@@ -43,7 +214,8 @@ class HttpRequest(object):
method='GET',
body=None,
headers=None,
methodId=None):
methodId=None,
resumable=None):
"""Constructor for an HttpRequest.
Args:
@@ -53,16 +225,39 @@ class HttpRequest(object):
on an error.
uri: string, the absolute URI to send the request to
method: string, the HTTP method to use
body: string, the request body of the HTTP request
body: string, the request body of the HTTP request,
headers: dict, the HTTP request headers
methodId: string, a unique identifier for the API method being called.
resumable: MediaUpload, None if this is not a resumbale request.
"""
self.uri = uri
self.method = method
self.body = body
self.headers = headers or {}
self.methodId = methodId
self.http = http
self.postproc = postproc
self.resumable = resumable
major, minor, params = mimeparse.parse_mime_type(
headers.get('content-type', 'application/json'))
self.multipart_boundary = params.get('boundary', '').strip('"')
# If this was a multipart resumable, the size of the non-media part.
self.multipart_size = 0
# The resumable URI to send chunks to.
self.resumable_uri = None
# The bytes that have been uploaded.
self.resumable_progress = 0
if resumable is not None:
if self.body is not None:
self.multipart_size = len(self.body)
else:
self.multipart_size = 0
self.total_size = self.resumable.size() + self.multipart_size + len(self.multipart_boundary)
def execute(self, http=None):
"""Execute the request.
@@ -81,14 +276,118 @@ class HttpRequest(object):
"""
if http is None:
http = self.http
resp, content = http.request(self.uri, self.method,
body=self.body,
headers=self.headers)
if self.resumable:
body = None
while body is None:
_, body = self.next_chunk(http)
return body
else:
resp, content = http.request(self.uri, self.method,
body=self.body,
headers=self.headers)
if resp.status >= 300:
raise HttpError(resp, content, self.uri)
if resp.status >= 300:
raise HttpError(resp, content, self.uri)
return self.postproc(resp, content)
def next_chunk(self, http=None):
"""Execute the next step of a resumable upload.
Can only be used if the method being executed supports media uploads and the
MediaUpload object passed in was flagged as using resumable upload.
Example:
media = MediaFileUpload('smiley.png', mimetype='image/png', chunksize=1000,
resumable=True)
request = service.objects().insert(
bucket=buckets['items'][0]['id'],
name='smiley.png',
media_body=media)
response = None
while response is None:
status, response = request.next_chunk()
if status:
print "Upload %d%% complete." % int(status.progress() * 100)
Returns:
(status, body): (ResumableMediaStatus, object)
The body will be None until the resumable media is fully uploaded.
"""
if http is None:
http = self.http
if self.resumable_uri is None:
start_headers = copy.copy(self.headers)
start_headers['X-Upload-Content-Type'] = self.resumable.mimetype()
start_headers['X-Upload-Content-Length'] = str(self.resumable.size())
start_headers['Content-Length'] = '0'
resp, content = http.request(self.uri, self.method,
body="",
headers=start_headers)
if resp.status == 200 and 'location' in resp:
self.resumable_uri = resp['location']
else:
raise ResumableUploadError("Failed to retrieve starting URI.")
if self.body:
begin = 0
data = self.body
else:
begin = self.resumable_progress - self.multipart_size
data = self.resumable.getbytes(begin, self.resumable.chunksize())
# Tack on the multipart/related boundary if we are at the end of the file.
if begin + self.resumable.chunksize() >= self.resumable.size():
data += self.multipart_boundary
headers = {
'Content-Range': 'bytes %d-%d/%d' % (
self.resumable_progress, self.resumable_progress + len(data) - 1,
self.total_size),
}
resp, content = http.request(self.resumable_uri, 'PUT',
body=data,
headers=headers)
if resp.status in [200, 201]:
return None, self.postproc(resp, content)
# A "308 Resume Incomplete" indicates we are not done.
elif resp.status == 308:
self.resumable_progress = int(resp['range'].split('-')[1]) + 1
if self.resumable_progress >= self.multipart_size:
self.body = None
if 'location' in resp:
self.resumable_uri = resp['location']
else:
raise HttpError(resp, content, self.uri)
return MediaUploadProgress(self.resumable_progress, self.total_size), None
def to_json(self):
"""Returns a JSON representation of the HttpRequest."""
d = copy.copy(self.__dict__)
if d['resumable'] is not None:
d['resumable'] = self.resumable.to_json()
del d['http']
del d['postproc']
return simplejson.dumps(d)
@staticmethod
def from_json(s, http, postproc):
"""Returns an HttpRequest populated with info from a JSON object."""
d = simplejson.loads(s)
if d['resumable'] is not None:
d['resumable'] = MediaUpload.new_from_json(d['resumable'])
return HttpRequest(
http,
postproc,
uri = d['uri'],
method= d['method'],
body=d['body'],
headers=d['headers'],
methodId=d['methodId'],
resumable=d['resumable'])
class HttpRequestMock(object):
"""Mock of HttpRequest.
@@ -166,7 +465,7 @@ class RequestMockBuilder(object):
self.check_unexpected = check_unexpected
def __call__(self, http, postproc, uri, method='GET', body=None,
headers=None, methodId=None):
headers=None, methodId=None, resumable=None):
"""Implements the callable interface that discovery.build() expects
of requestBuilder, which is to build an object compatible with
HttpRequest.execute(). See that method for the description of the

View File

@@ -34,13 +34,16 @@ except ImportError:
from cgi import parse_qs
from apiclient.discovery import build, build_from_document, key2param
from apiclient.http import HttpMock
from apiclient.http import tunnel_patch
from apiclient.http import HttpMockSequence
from apiclient.errors import HttpError
from apiclient.errors import InvalidJsonError
from apiclient.errors import MediaUploadSizeError
from apiclient.errors import ResumableUploadError
from apiclient.errors import UnacceptableMimeTypeError
from apiclient.http import HttpMock
from apiclient.http import HttpMockSequence
from apiclient.http import MediaFileUpload
from apiclient.http import MediaUploadProgress
from apiclient.http import tunnel_patch
DATA_DIR = os.path.join(os.path.dirname(__file__), 'data')
@@ -317,6 +320,169 @@ class Discovery(unittest.TestCase):
request = zoo.animals().insert(body={})
self.assertTrue(request.headers['content-type'], 'application/json')
def test_resumable_multipart_media_good_upload(self):
self.http = HttpMock(datafile('zoo.json'), {'status': '200'})
zoo = build('zoo', 'v1', self.http)
media_upload = MediaFileUpload(datafile('small.png'), resumable=True)
request = zoo.animals().insert(media_body=media_upload, body={})
self.assertTrue(request.headers['content-type'].startswith(
'multipart/related'))
self.assertEquals('--==', request.body[0:4])
self.assertEquals(media_upload, request.resumable)
self.assertEquals('image/png', request.resumable.mimetype())
self.assertTrue(len(request.multipart_boundary) > 0)
self.assertNotEquals(request.body, None)
self.assertEquals(request.resumable_uri, None)
http = HttpMockSequence([
({'status': '200',
'location': 'http://upload.example.com'}, ''),
({'status': '308',
'location': 'http://upload.example.com/2',
'range': '0-12'}, ''),
({'status': '308',
'location': 'http://upload.example.com/3',
'range': '0-%d' % (request.total_size - 2)}, ''),
({'status': '200'}, '{"foo": "bar"}'),
])
status, body = request.next_chunk(http)
self.assertEquals(None, body)
self.assertTrue(isinstance(status, MediaUploadProgress))
self.assertEquals(13, status.resumable_progress)
# request.body is not None because the server only acknowledged 12 bytes,
# which is less than the size of the body, so we need to send it again.
self.assertNotEquals(request.body, None)
# Two requests should have been made and the resumable_uri should have been
# updated for each one.
self.assertEquals(request.resumable_uri, 'http://upload.example.com/2')
self.assertEquals(media_upload, request.resumable)
self.assertEquals(13, request.resumable_progress)
status, body = request.next_chunk(http)
self.assertEquals(request.resumable_uri, 'http://upload.example.com/3')
self.assertEquals(request.total_size-1, request.resumable_progress)
self.assertEquals(request.body, None)
# Final call to next_chunk should complete the upload.
status, body = request.next_chunk(http)
self.assertEquals(body, {"foo": "bar"})
self.assertEquals(status, None)
def test_resumable_media_good_upload(self):
"""Not a multipart upload."""
self.http = HttpMock(datafile('zoo.json'), {'status': '200'})
zoo = build('zoo', 'v1', self.http)
media_upload = MediaFileUpload(datafile('small.png'), resumable=True)
request = zoo.animals().insert(media_body=media_upload, body=None)
self.assertTrue(request.headers['content-type'].startswith(
'image/png'))
self.assertEquals(media_upload, request.resumable)
self.assertEquals('image/png', request.resumable.mimetype())
self.assertEquals(request.multipart_boundary, '')
self.assertEquals(request.body, None)
self.assertEquals(request.resumable_uri, None)
http = HttpMockSequence([
({'status': '200',
'location': 'http://upload.example.com'}, ''),
({'status': '308',
'location': 'http://upload.example.com/2',
'range': '0-12'}, ''),
({'status': '308',
'location': 'http://upload.example.com/3',
'range': '0-%d' % (request.total_size - 2)}, ''),
({'status': '200'}, '{"foo": "bar"}'),
])
status, body = request.next_chunk(http)
self.assertEquals(None, body)
self.assertTrue(isinstance(status, MediaUploadProgress))
self.assertEquals(13, status.resumable_progress)
# Two requests should have been made and the resumable_uri should have been
# updated for each one.
self.assertEquals(request.resumable_uri, 'http://upload.example.com/2')
self.assertEquals(media_upload, request.resumable)
self.assertEquals(13, request.resumable_progress)
status, body = request.next_chunk(http)
self.assertEquals(request.resumable_uri, 'http://upload.example.com/3')
self.assertEquals(request.total_size-1, request.resumable_progress)
self.assertEquals(request.body, None)
# Final call to next_chunk should complete the upload.
status, body = request.next_chunk(http)
self.assertEquals(body, {"foo": "bar"})
self.assertEquals(status, None)
def test_resumable_media_good_upload_from_execute(self):
"""Not a multipart upload."""
self.http = HttpMock(datafile('zoo.json'), {'status': '200'})
zoo = build('zoo', 'v1', self.http)
media_upload = MediaFileUpload(datafile('small.png'), resumable=True)
request = zoo.animals().insert(media_body=media_upload, body=None)
http = HttpMockSequence([
({'status': '200',
'location': 'http://upload.example.com'}, ''),
({'status': '308',
'location': 'http://upload.example.com/2',
'range': '0-12'}, ''),
({'status': '308',
'location': 'http://upload.example.com/3',
'range': '0-%d' % (request.total_size - 2)}, ''),
({'status': '200'}, '{"foo": "bar"}'),
])
body = request.execute(http)
self.assertEquals(body, {"foo": "bar"})
def test_resumable_media_fail_unknown_response_code_first_request(self):
"""Not a multipart upload."""
self.http = HttpMock(datafile('zoo.json'), {'status': '200'})
zoo = build('zoo', 'v1', self.http)
media_upload = MediaFileUpload(datafile('small.png'), resumable=True)
request = zoo.animals().insert(media_body=media_upload, body=None)
http = HttpMockSequence([
({'status': '400',
'location': 'http://upload.example.com'}, ''),
])
self.assertRaises(ResumableUploadError, request.execute, http)
def test_resumable_media_fail_unknown_response_code_subsequent_request(self):
"""Not a multipart upload."""
self.http = HttpMock(datafile('zoo.json'), {'status': '200'})
zoo = build('zoo', 'v1', self.http)
media_upload = MediaFileUpload(datafile('small.png'), resumable=True)
request = zoo.animals().insert(media_body=media_upload, body=None)
http = HttpMockSequence([
({'status': '200',
'location': 'http://upload.example.com'}, ''),
({'status': '400'}, ''),
])
self.assertRaises(HttpError, request.execute, http)
class Next(unittest.TestCase):
def test_next_successful_none_on_no_next_page_token(self):

View File

@@ -23,12 +23,22 @@ __author__ = 'jcgregorio@google.com (Joe Gregorio)'
# Do not remove the httplib2 import
import httplib2
import os
import unittest
from apiclient.http import set_user_agent
from apiclient.http import HttpMockSequence
from apiclient.http import HttpRequest
from apiclient.http import MediaUpload
from apiclient.http import MediaFileUpload
DATA_DIR = os.path.join(os.path.dirname(__file__), 'data')
def datafile(filename):
return os.path.join(DATA_DIR, filename)
class TestUserAgent(unittest.TestCase):
def test_set_user_agent(self):
@@ -50,5 +60,54 @@ class TestUserAgent(unittest.TestCase):
resp, content = http.request("http://example.com")
self.assertEqual(content['user-agent'], 'my_app/5.5 my_library/0.1')
def test_media_file_upload_to_from_json(self):
upload = MediaFileUpload(
datafile('small.png'), chunksize=500, resumable=True)
self.assertEquals('image/png', upload.mimetype())
self.assertEquals(190, upload.size())
self.assertEquals(True, upload.resumable())
self.assertEquals(500, upload.chunksize())
self.assertEquals('PNG', upload.getbytes(1, 3))
json = upload.to_json()
new_upload = MediaUpload.new_from_json(json)
self.assertEquals('image/png', new_upload.mimetype())
self.assertEquals(190, new_upload.size())
self.assertEquals(True, new_upload.resumable())
self.assertEquals(500, new_upload.chunksize())
self.assertEquals('PNG', new_upload.getbytes(1, 3))
def test_http_request_to_from_json(self):
def _postproc(*kwargs):
pass
http = httplib2.Http()
media_upload = MediaFileUpload(
datafile('small.png'), chunksize=500, resumable=True)
req = HttpRequest(
http,
_postproc,
'http://example.com',
method='POST',
body='{}',
headers={'content-type': 'multipart/related; boundary="---flubber"'},
methodId='foo',
resumable=media_upload)
json = req.to_json()
new_req = HttpRequest.from_json(json, http, _postproc)
self.assertEquals(new_req.headers,
{'content-type':
'multipart/related; boundary="---flubber"'})
self.assertEquals(new_req.uri, 'http://example.com')
self.assertEquals(new_req.body, '{}')
self.assertEquals(new_req.http, http)
self.assertEquals(new_req.resumable.to_json(), media_upload.to_json())
self.assertEquals(new_req.multipart_boundary, '---flubber')
if __name__ == '__main__':
unittest.main()