Switch over the threadqueue sample from moderator to urlshortener.

This commit is contained in:
Joe Gregorio
2011-03-29 16:29:37 -04:00
parent 5cdaa51eec
commit 05375c013f
2 changed files with 83 additions and 61 deletions

View File

@@ -1,22 +1,60 @@
from apiclient.discovery import build
# Copyright (C) 2010 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Sample for threading and queues.
A simple sample that processes many requests by constructing a threadpool and
passing client requests by a thread queue to be processed.
"""
from apiclient.discovery import build
from apiclient.errors import HttpError
from apiclient.ext.authtools import run
from apiclient.ext.file import Storage
from apiclient.oauth import CredentialsInvalidError
from apiclient.oauth import FlowThreeLegged
from oauth2client.file import Storage
from oauth2client.client import OAuth2WebServerFlow
from oauth2client.tools import run
import Queue
import gflags
import httplib2
import logging
import sys
import threading
import time
# Uncomment to get detailed logging
# httplib2.debuglevel = 4
# How many threads to start.
NUM_THREADS = 3
# A list of URLs to shorten.
BULK = [
"https://code.google.com/apis/buzz/",
"https://code.google.com/apis/moderator/",
"https://code.google.com/apis/latitude/",
"https://code.google.com/apis/urlshortener/",
"https://code.google.com/apis/customsearch/",
"https://code.google.com/apis/shopping/search/",
"https://code.google.com/apis/predict",
"https://code.google.com/more",
]
NUM_THREADS = 4
NUM_ITEMS = 40
FLAGS = gflags.FLAGS
FLOW = OAuth2WebServerFlow(
client_id='433807057907.apps.googleusercontent.com',
client_secret='jigtZpMApkRxncxikFpR+SFg',
scope='https://www.googleapis.com/auth/urlshortener',
user_agent='urlshortener-cmdline-sample/1.0')
gflags.DEFINE_enum('logging_level', 'ERROR',
['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
'Set the level of logging detail.')
queue = Queue.Queue()
@@ -25,6 +63,10 @@ class Backoff:
"""Exponential Backoff
Implements an exponential backoff algorithm.
Instantiate and call loop() each time through
the loop, and each time a request fails call
fail() which will delay an appropriate amount
of time.
"""
def __init__(self, maxretries=8):
@@ -46,27 +88,29 @@ class Backoff:
def start_threads(credentials):
# Start up NUM_THREADS to handle requests
"""Create the thread pool to process the requests."""
def process_requests():
def process_requests(n):
http = httplib2.Http()
http = credentials.authorize(http)
credentials_ok = True
loop = True
while credentials_ok:
while loop:
request = queue.get()
backoff = Backoff()
while backoff.loop():
try:
request.execute(http)
response = request.execute(http)
print "Processed: %s in thread %d" % (response['id'], n)
break
except HttpError, e:
if e.resp.status in [402, 403, 408, 503, 504]:
print "Increasing backoff, got status code: %d" % e.resp.status
backoff.fail()
except CredentialsInvalidError:
print "Credentials no long valid. Exiting."
credentials_ok = False
except Exception, e:
print "Unexpected error. Exiting." + str(e)
loop = False
break
print "Completed request"
@@ -74,64 +118,43 @@ def start_threads(credentials):
for i in range(NUM_THREADS):
t = threading.Thread(target=process_requests)
t = threading.Thread(target=process_requests, args=[i])
t.daemon = True
t.start()
def main():
storage = Storage('moderator.dat')
def main(argv):
try:
argv = FLAGS(argv)
except gflags.FlagsError, e:
print '%s\\nUsage: %s ARGS\\n%s' % (e, argv[0], FLAGS)
sys.exit(1)
logging.getLogger().setLevel(getattr(logging, FLAGS.logging_level))
storage = Storage('threadqueue.dat')
credentials = storage.get()
if credentials is None or credentials.invalid == True:
moderator_discovery = build("moderator", "v1").auth_discovery()
flow = FlowThreeLegged(moderator_discovery,
consumer_key='anonymous',
consumer_secret='anonymous',
user_agent='python-threading-sample/1.0',
domain='anonymous',
scope='https://www.googleapis.com/auth/moderator',
xoauth_displayname='Google API Client Example App')
credentials = run(flow, storage)
credentials = run(FLOW, storage)
start_threads(credentials)
http = httplib2.Http()
http = credentials.authorize(http)
service = build("moderator", "v1", http=http)
series_body = {
"data": {
"description": "An example of bulk creating topics",
"name": "Using threading and queues",
"videoSubmissionAllowed": False
}
}
try:
series = service.series().insert(body=series_body).execute()
print "Created a new series"
for i in range(NUM_ITEMS):
topic_body = {
"data": {
"description": "Sample Topic # %d" % i,
"name": "Sample",
"presenter": "me"
}
}
topic_request = service.topics().insert(
seriesId=series['id']['seriesId'], body=topic_body)
print "Adding request to queue"
queue.put(topic_request)
except CredentialsInvalidError:
print 'Your credentials are no longer valid.'
print 'Please re-run this application to re-authorize.'
service = build("urlshortener", "v1", http=http,
developerKey="AIzaSyDRRpR3GS1F1_jKNNM9HCNd2wJQyPG3oN0")
shortener = service.url()
for url in BULK:
body = {"longUrl": url }
shorten_request = shortener.insert(body=body)
print "Adding request to queue"
queue.put(shorten_request)
# Wait for all the requests to finish
queue.join()
if __name__ == "__main__":
main()
main(sys.argv)

View File

@@ -19,7 +19,6 @@ import sys
from apiclient.discovery import build
from oauth2client.file import Storage
from oauth2client.client import OAuth2WebServerFlow
from oauth2client.client import AccessTokenCredentials
from oauth2client.tools import run
FLAGS = gflags.FLAGS