Added sample that shows how to use threads and queues to batch up requests.

This commit is contained in:
Joe Gregorio
2010-10-14 08:27:59 -04:00
parent 7918e2345c
commit 0c73a67bef
3 changed files with 171 additions and 4 deletions

View File

@@ -42,7 +42,11 @@ class Error(Exception):
class HttpError(Error):
"""HTTP data was invalid or unexpected."""
pass
def __init__(self, resp, detail):
self.resp = resp
self.detail = detail
def __str__(self):
return self.detail
class UnknownLinkType(Error):
@@ -110,10 +114,10 @@ class JsonModel(object):
return simplejson.loads(content)['data']
else:
logging.debug('Content from bad request was: %s' % content)
if resp.get('content-type', '') != 'application/json':
raise HttpError('%d %s' % (resp.status, resp.reason))
if resp.get('content-type', '').startswith('application/json'):
raise HttpError(resp, simplejson.loads(content)['error'])
else:
raise HttpError(simplejson.loads(content)['error'])
raise HttpError(resp, '%d %s' % (resp.status, resp.reason))
def build(serviceName, version, http=None,

107
samples/threadqueue/main.py Normal file
View File

@@ -0,0 +1,107 @@
from apiclient.discovery import build
from apiclient.discovery import HttpError
import Queue
import httplib2
import pickle
import threading
import time
# Uncomment to get detailed logging
# httplib2.debuglevel = 4
NUM_THREADS = 4
NUM_ITEMS = 40
queue = Queue.Queue()
class Backoff:
"""Exponential Backoff
Implements an exponential backoff algorithm.
"""
def __init__(self, maxretries=8):
self.retry = 0
self.maxretries = maxretries
self.first = True
def loop(self):
if self.first:
self.first = False
return True
else:
return self.retry < self.maxretries
def fail(self):
self.retry += 1
delay = 2**self.retry
time.sleep(delay)
def start_threads(credentials):
# Start up NUM_THREADS to handle requests
def process_requests():
http = httplib2.Http()
http = credentials.authorize(http)
while True:
request = queue.get()
backoff = Backoff()
while backoff.loop():
try:
request.execute(http)
break
except HttpError, e:
if e.resp.status in [402, 403, 408, 503, 504]:
print "Increasing backoff, got status code: %d" % e.resp.status
backoff.fail()
print "Completed request"
queue.task_done()
for i in range(NUM_THREADS):
t = threading.Thread(target=process_requests)
t.daemon = True
t.start()
def main():
f = open("moderator.dat", "r")
credentials = pickle.loads(f.read())
f.close()
start_threads(credentials)
http = httplib2.Http()
http = credentials.authorize(http)
p = build("moderator", "v1", http=http)
series_body = {
"description": "An example of bulk creating topics",
"name": "Using threading and queues",
"videoSubmissionAllowed": False
}
series = p.series().insert(body=series_body).execute()
print "Created a new series"
for i in range(NUM_ITEMS):
topic_body = {
"data": {
"description": "Sample Topic # %d" % i,
"name": "Sample",
"presenter": "me"
}
}
topic_request = p.topics().insert(seriesId=series['id']['seriesId'], body=topic_body)
print "Adding request to queue"
queue.put(topic_request)
queue.join()
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,56 @@
# Copyright (C) 2010 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Do the OAuth 1.0a three legged dance.
Do the OAuth 1.0a three legged dance for
a Buzz command line application. Store the generated
credentials in a common file that is used by
other example apps in the same directory.
"""
__author__ = 'jcgregorio@google.com (Joe Gregorio)'
from apiclient.discovery import build
from apiclient.oauth import FlowThreeLegged
import pickle
moderator_discovery = build("moderator", "v1").auth_discovery()
flow = FlowThreeLegged(moderator_discovery,
consumer_key='anonymous',
consumer_secret='anonymous',
user_agent='google-api-client-python-threadqueue-sample/1.0',
domain='anonymous',
scope='https://www.googleapis.com/auth/moderator',
#scope='tag:google.com,2010:auth/moderator',
xoauth_displayname='Google API Client Example App')
authorize_url = flow.step1_get_authorize_url()
print 'Go to the following link in your browser:'
print authorize_url
print
accepted = 'n'
while accepted.lower() == 'n':
accepted = raw_input('Have you authorized me? (y/n) ')
verification = raw_input('What is the verification code? ').strip()
credentials = flow.step2_exchange(verification)
f = open('moderator.dat', 'w')
f.write(pickle.dumps(credentials))
f.close()