Added first pass at utility functions for OAuth. Updated command-line sample to use the utility functions.

This commit is contained in:
Joe Gregorio
2010-09-01 16:45:45 -04:00
parent d69e5e4b5a
commit 67d77777ba
2 changed files with 140 additions and 79 deletions

130
apiclient/oauth.py Normal file
View File

@@ -0,0 +1,130 @@
#!/usr/bin/python2.4
#
# Copyright 2010 Google Inc. All Rights Reserved.
"""Utilities for OAuth.
Utilities for making it easier to work with OAuth.
"""
__author__ = 'jcgregorio@google.com (Joe Gregorio)'
import copy
import urllib
import oauth2 as oauth
try:
from urlparse import parse_qs, parse_qsl
except ImportError:
from cgi import parse_qs, parse_qsl
class MissingParameter(Exception):
pass
def abstract():
raise NotImplementedError("You need to override this function")
class TokenStore(object):
def get(user, service):
"""Returns an oauth.Token based on the (user, service) returning
None if there is no Token for that (user, service).
"""
abstract()
def set(user, service, token):
abstract()
buzz_discovery = {
'required': ['domain', 'scope'],
'request': {
'url': 'https://www.google.com/accounts/OAuthGetRequestToken',
'params': ['xoauth_displayname']
},
'authorize': {
'url': 'https://www.google.com/buzz/api/auth/OAuthAuthorizeToken',
'params': ['iconUrl', 'oauth_token']
},
'access': {
'url': 'https://www.google.com/accounts/OAuthGetAccessToken',
'params': []
},
}
def _oauth_uri(name, discovery, params):
if name not in ['request', 'access', 'authorize']:
raise KeyError(name)
keys = []
keys.extend(discovery['required'])
keys.extend(discovery[name]['params'])
query = {}
for key in keys:
if key in params:
query[key] = params[key]
return discovery[name]['url'] + '?' + urllib.urlencode(query)
class Flow3LO(object):
def __init__(self, discovery, consumer_key, consumer_secret, user_agent,
**kwargs):
self.discovery = discovery
self.consumer_key = consumer_key
self.consumer_secret = consumer_secret
self.user_agent = user_agent
self.params = kwargs
self.request_token = {}
for key in discovery['required']:
if key not in self.params:
raise MissingParameter('Required parameter %s not supplied' % key)
def step1(self, oauth_callback='oob'):
"""Returns a URI to redirect to the provider.
If oauth_callback is 'oob' then the next call
should be to step2_pin, otherwise oauth_callback
is a URI and the next call should be to
step2_callback() with the query parameters
received at that callback.
"""
consumer = oauth.Consumer(self.consumer_key, self.consumer_secret)
client = oauth.Client(consumer)
headers = {
'user-agent': self.user_agent,
'content-type': 'application/x-www-form-urlencoded'
}
body = urllib.urlencode({'oauth_callback': oauth_callback})
uri = _oauth_uri('request', self.discovery, self.params)
resp, content = client.request(uri, 'POST', headers=headers,
body=body)
if resp['status'] != '200':
print content
raise Exception('Invalid response %s.' % resp['status'])
self.request_token = dict(parse_qsl(content))
auth_params = copy.copy(self.params)
auth_params['oauth_token'] = self.request_token['oauth_token']
uri = _oauth_uri('authorize', self.discovery, auth_params)
return uri
def step2_pin(self, pin):
"""Returns an oauth_token and oauth_token_secret in a dictionary"""
token = oauth.Token(self.request_token['oauth_token'],
self.request_token['oauth_token_secret'])
token.set_verifier(pin)
consumer = oauth.Consumer(self.consumer_key, self.consumer_secret)
client = oauth.Client(consumer, token)
headers = {
'user-agent': self.user_agent,
'content-type': 'application/x-www-form-urlencoded'
}
uri = _oauth_uri('access', self.discovery, self.params)
resp, content = client.request(uri, 'POST', headers=headers)
return dict(parse_qsl(content))
def step2_callback(self, query_params):
"""Returns an access token via oauth.Token"""
pass

View File

@@ -1,97 +1,28 @@
import urlparse
import oauth2 as oauth
import httplib2
import urllib
from apiclient.oauth import buzz_discovery, Flow3LO
import simplejson
try:
from urlparse import parse_qs, parse_qsl
except ImportError:
from cgi import parse_qs, parse_qsl
httplib2.debuglevel = 4
headers = {'user-agent': 'google-api-client-python-buzz-cmdline/1.0',
'content-type': 'application/x-www-form-urlencoded'
}
user_agent = 'google-api-client-python-buzz-cmdline/1.0',
consumer_key = 'anonymous'
consumer_secret = 'anonymous'
request_token_url = ('https://www.google.com/accounts/OAuthGetRequestToken'
'?domain=anonymous&scope=https://www.googleapis.com/auth/buzz')
flow = Flow3LO(buzz_discovery, consumer_key, consumer_secret, user_agent,
domain='anonymous',
scope='https://www.googleapis.com/auth/buzz',
xoauth_displayname='Google API Client for Python Example App')
access_token_url = ('https://www.google.com/accounts/OAuthGetAccessToken'
'?domain=anonymous&scope=https://www.googleapis.com/auth/buzz')
authorize_url = ('https://www.google.com/buzz/api/auth/OAuthAuthorizeToken'
'?domain=anonymous&scope=https://www.googleapis.com/auth/buzz')
consumer = oauth.Consumer(consumer_key, consumer_secret)
client = oauth.Client(consumer)
# Step 1: Get a request token. This is a temporary token that is used for
# having the user authorize an access token and to sign the request to obtain
# said access token.
resp, content = client.request(request_token_url, 'POST', headers=headers,
body='oauth_callback=oob')
if resp['status'] != '200':
print content
raise Exception('Invalid response %s.' % resp['status'])
request_token = dict(parse_qsl(content))
print 'Request Token:'
print ' - oauth_token = %s' % request_token['oauth_token']
print ' - oauth_token_secret = %s' % request_token['oauth_token_secret']
print
# Step 2: Redirect to the provider. Since this is a CLI script we do not
# redirect. In a web application you would redirect the user to the URL
# below.
base_url = urlparse.urlparse(authorize_url)
query = parse_qs(base_url.query)
query['oauth_token'] = request_token['oauth_token']
print urllib.urlencode(query, True)
url = (base_url.scheme, base_url.netloc, base_url.path, base_url.params,
urllib.urlencode(query, True), base_url.fragment)
authorize_url = urlparse.urlunparse(url)
authorize_url = flow.step1()
print 'Go to the following link in your browser:'
print authorize_url
print
# After the user has granted access to you, the consumer, the provider will
# redirect you to whatever URL you have told them to redirect to. You can
# usually define this in the oauth_callback argument as well.
accepted = 'n'
while accepted.lower() == 'n':
accepted = raw_input('Have you authorized me? (y/n) ')
oauth_verifier = raw_input('What is the PIN? ').strip()
pin = raw_input('What is the PIN? ').strip()
# Step 3: Once the consumer has redirected the user back to the oauth_callback
# URL you can request the access token the user has approved. You use the
# request token to sign this request. After this is done you throw away the
# request token and use the access token returned. You should store this
# access token somewhere safe, like a database, for future use.
token = oauth.Token(request_token['oauth_token'],
request_token['oauth_token_secret'])
token.set_verifier(oauth_verifier)
client = oauth.Client(consumer, token)
resp, content = client.request(access_token_url, 'POST', headers=headers)
access_token = dict(parse_qsl(content))
print 'Access Token:'
print ' - oauth_token = %s' % access_token['oauth_token']
print ' - oauth_token_secret = %s' % access_token['oauth_token_secret']
print
print 'You may now access protected resources using the access tokens above.'
print
access_token = flow.step2_pin(pin)
d = dict(
consumer_key='anonymous',