Also introduces util.positional declarations. Reviewed in http://codereview.appspot.com/6441056/. Fixes issue #136.
160 lines
4.2 KiB
Python
160 lines
4.2 KiB
Python
# Copyright (C) 2010 Google Inc.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
"""Sample for threading and queues.
|
|
|
|
A simple sample that processes many requests by constructing a threadpool and
|
|
passing client requests by a thread queue to be processed.
|
|
"""
|
|
from apiclient.discovery import build
|
|
from apiclient.errors import HttpError
|
|
from oauth2client.file import Storage
|
|
from oauth2client.client import OAuth2WebServerFlow
|
|
from oauth2client.tools import run
|
|
|
|
import Queue
|
|
import gflags
|
|
import httplib2
|
|
import logging
|
|
import sys
|
|
import threading
|
|
import time
|
|
|
|
# How many threads to start.
|
|
NUM_THREADS = 3
|
|
|
|
# A list of URLs to shorten.
|
|
BULK = [
|
|
"https://code.google.com/apis/moderator/",
|
|
"https://code.google.com/apis/latitude/",
|
|
"https://code.google.com/apis/urlshortener/",
|
|
"https://code.google.com/apis/customsearch/",
|
|
"https://code.google.com/apis/shopping/search/",
|
|
"https://code.google.com/apis/predict",
|
|
"https://code.google.com/more",
|
|
]
|
|
|
|
FLAGS = gflags.FLAGS
|
|
FLOW = OAuth2WebServerFlow(
|
|
client_id='433807057907.apps.googleusercontent.com',
|
|
client_secret='jigtZpMApkRxncxikFpR+SFg',
|
|
scope='https://www.googleapis.com/auth/urlshortener',
|
|
user_agent='urlshortener-cmdline-sample/1.0')
|
|
|
|
gflags.DEFINE_enum('logging_level', 'ERROR',
|
|
['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
|
|
'Set the level of logging detail.')
|
|
|
|
queue = Queue.Queue()
|
|
|
|
|
|
class Backoff:
|
|
"""Exponential Backoff
|
|
|
|
Implements an exponential backoff algorithm.
|
|
Instantiate and call loop() each time through
|
|
the loop, and each time a request fails call
|
|
fail() which will delay an appropriate amount
|
|
of time.
|
|
"""
|
|
|
|
def __init__(self, maxretries=8):
|
|
self.retry = 0
|
|
self.maxretries = maxretries
|
|
self.first = True
|
|
|
|
def loop(self):
|
|
if self.first:
|
|
self.first = False
|
|
return True
|
|
else:
|
|
return self.retry < self.maxretries
|
|
|
|
def fail(self):
|
|
self.retry += 1
|
|
delay = 2 ** self.retry
|
|
time.sleep(delay)
|
|
|
|
|
|
def start_threads(credentials):
|
|
"""Create the thread pool to process the requests."""
|
|
|
|
def process_requests(n):
|
|
http = httplib2.Http()
|
|
http = credentials.authorize(http)
|
|
loop = True
|
|
|
|
|
|
while loop:
|
|
request = queue.get()
|
|
backoff = Backoff()
|
|
while backoff.loop():
|
|
try:
|
|
response = request.execute(http=http)
|
|
print "Processed: %s in thread %d" % (response['id'], n)
|
|
break
|
|
except HttpError, e:
|
|
if e.resp.status in [402, 403, 408, 503, 504]:
|
|
print "Increasing backoff, got status code: %d" % e.resp.status
|
|
backoff.fail()
|
|
except Exception, e:
|
|
print "Unexpected error. Exiting." + str(e)
|
|
loop = False
|
|
break
|
|
|
|
print "Completed request"
|
|
queue.task_done()
|
|
|
|
|
|
for i in range(NUM_THREADS):
|
|
t = threading.Thread(target=process_requests, args=[i])
|
|
t.daemon = True
|
|
t.start()
|
|
|
|
|
|
def main(argv):
|
|
try:
|
|
argv = FLAGS(argv)
|
|
except gflags.FlagsError, e:
|
|
print '%s\\nUsage: %s ARGS\\n%s' % (e, argv[0], FLAGS)
|
|
sys.exit(1)
|
|
|
|
logging.getLogger().setLevel(getattr(logging, FLAGS.logging_level))
|
|
|
|
storage = Storage('threadqueue.dat')
|
|
credentials = storage.get()
|
|
if credentials is None or credentials.invalid == True:
|
|
credentials = run(FLOW, storage)
|
|
|
|
start_threads(credentials)
|
|
|
|
http = httplib2.Http()
|
|
http = credentials.authorize(http)
|
|
|
|
service = build("urlshortener", "v1", http=http,
|
|
developerKey="AIzaSyDRRpR3GS1F1_jKNNM9HCNd2wJQyPG3oN0")
|
|
shortener = service.url()
|
|
|
|
for url in BULK:
|
|
body = {"longUrl": url }
|
|
shorten_request = shortener.insert(body=body)
|
|
print "Adding request to queue"
|
|
queue.put(shorten_request)
|
|
|
|
# Wait for all the requests to finish
|
|
queue.join()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main(sys.argv)
|