From 67d77777ba107984365bcafc9d6c7effd458d554 Mon Sep 17 00:00:00 2001 From: Joe Gregorio Date: Wed, 1 Sep 2010 16:45:45 -0400 Subject: [PATCH] Added first pass at utility functions for OAuth. Updated command-line sample to use the utility functions. --- apiclient/oauth.py | 130 ++++++++++++++++++++++++++ samples/cmdline/three_legged_dance.py | 89 ++---------------- 2 files changed, 140 insertions(+), 79 deletions(-) create mode 100644 apiclient/oauth.py diff --git a/apiclient/oauth.py b/apiclient/oauth.py new file mode 100644 index 0000000..c1908d1 --- /dev/null +++ b/apiclient/oauth.py @@ -0,0 +1,130 @@ +#!/usr/bin/python2.4 +# +# Copyright 2010 Google Inc. All Rights Reserved. + +"""Utilities for OAuth. + +Utilities for making it easier to work with OAuth. +""" + +__author__ = 'jcgregorio@google.com (Joe Gregorio)' + +import copy +import urllib +import oauth2 as oauth + +try: + from urlparse import parse_qs, parse_qsl +except ImportError: + from cgi import parse_qs, parse_qsl +class MissingParameter(Exception): + pass + +def abstract(): + raise NotImplementedError("You need to override this function") + + +class TokenStore(object): + def get(user, service): + """Returns an oauth.Token based on the (user, service) returning + None if there is no Token for that (user, service). + """ + abstract() + + def set(user, service, token): + abstract() + +buzz_discovery = { + 'required': ['domain', 'scope'], + 'request': { + 'url': 'https://www.google.com/accounts/OAuthGetRequestToken', + 'params': ['xoauth_displayname'] + }, + 'authorize': { + 'url': 'https://www.google.com/buzz/api/auth/OAuthAuthorizeToken', + 'params': ['iconUrl', 'oauth_token'] + }, + 'access': { + 'url': 'https://www.google.com/accounts/OAuthGetAccessToken', + 'params': [] + }, + } + +def _oauth_uri(name, discovery, params): + if name not in ['request', 'access', 'authorize']: + raise KeyError(name) + keys = [] + keys.extend(discovery['required']) + keys.extend(discovery[name]['params']) + query = {} + for key in keys: + if key in params: + query[key] = params[key] + return discovery[name]['url'] + '?' + urllib.urlencode(query) + +class Flow3LO(object): + def __init__(self, discovery, consumer_key, consumer_secret, user_agent, + **kwargs): + self.discovery = discovery + self.consumer_key = consumer_key + self.consumer_secret = consumer_secret + self.user_agent = user_agent + self.params = kwargs + self.request_token = {} + for key in discovery['required']: + if key not in self.params: + raise MissingParameter('Required parameter %s not supplied' % key) + + def step1(self, oauth_callback='oob'): + """Returns a URI to redirect to the provider. + + If oauth_callback is 'oob' then the next call + should be to step2_pin, otherwise oauth_callback + is a URI and the next call should be to + step2_callback() with the query parameters + received at that callback. + """ + consumer = oauth.Consumer(self.consumer_key, self.consumer_secret) + client = oauth.Client(consumer) + + headers = { + 'user-agent': self.user_agent, + 'content-type': 'application/x-www-form-urlencoded' + } + body = urllib.urlencode({'oauth_callback': oauth_callback}) + uri = _oauth_uri('request', self.discovery, self.params) + resp, content = client.request(uri, 'POST', headers=headers, + body=body) + if resp['status'] != '200': + print content + raise Exception('Invalid response %s.' % resp['status']) + + self.request_token = dict(parse_qsl(content)) + + auth_params = copy.copy(self.params) + auth_params['oauth_token'] = self.request_token['oauth_token'] + + uri = _oauth_uri('authorize', self.discovery, auth_params) + return uri + + def step2_pin(self, pin): + """Returns an oauth_token and oauth_token_secret in a dictionary""" + + token = oauth.Token(self.request_token['oauth_token'], + self.request_token['oauth_token_secret']) + token.set_verifier(pin) + consumer = oauth.Consumer(self.consumer_key, self.consumer_secret) + client = oauth.Client(consumer, token) + + headers = { + 'user-agent': self.user_agent, + 'content-type': 'application/x-www-form-urlencoded' + } + + uri = _oauth_uri('access', self.discovery, self.params) + resp, content = client.request(uri, 'POST', headers=headers) + return dict(parse_qsl(content)) + + def step2_callback(self, query_params): + """Returns an access token via oauth.Token""" + pass diff --git a/samples/cmdline/three_legged_dance.py b/samples/cmdline/three_legged_dance.py index a6b1377..31a22df 100644 --- a/samples/cmdline/three_legged_dance.py +++ b/samples/cmdline/three_legged_dance.py @@ -1,97 +1,28 @@ -import urlparse -import oauth2 as oauth -import httplib2 -import urllib +from apiclient.oauth import buzz_discovery, Flow3LO + import simplejson -try: - from urlparse import parse_qs, parse_qsl -except ImportError: - from cgi import parse_qs, parse_qsl - -httplib2.debuglevel = 4 -headers = {'user-agent': 'google-api-client-python-buzz-cmdline/1.0', - 'content-type': 'application/x-www-form-urlencoded' - } - +user_agent = 'google-api-client-python-buzz-cmdline/1.0', consumer_key = 'anonymous' consumer_secret = 'anonymous' -request_token_url = ('https://www.google.com/accounts/OAuthGetRequestToken' - '?domain=anonymous&scope=https://www.googleapis.com/auth/buzz') +flow = Flow3LO(buzz_discovery, consumer_key, consumer_secret, user_agent, + domain='anonymous', + scope='https://www.googleapis.com/auth/buzz', + xoauth_displayname='Google API Client for Python Example App') -access_token_url = ('https://www.google.com/accounts/OAuthGetAccessToken' - '?domain=anonymous&scope=https://www.googleapis.com/auth/buzz') - -authorize_url = ('https://www.google.com/buzz/api/auth/OAuthAuthorizeToken' - '?domain=anonymous&scope=https://www.googleapis.com/auth/buzz') - -consumer = oauth.Consumer(consumer_key, consumer_secret) -client = oauth.Client(consumer) - -# Step 1: Get a request token. This is a temporary token that is used for -# having the user authorize an access token and to sign the request to obtain -# said access token. - -resp, content = client.request(request_token_url, 'POST', headers=headers, - body='oauth_callback=oob') -if resp['status'] != '200': - print content - raise Exception('Invalid response %s.' % resp['status']) - -request_token = dict(parse_qsl(content)) - -print 'Request Token:' -print ' - oauth_token = %s' % request_token['oauth_token'] -print ' - oauth_token_secret = %s' % request_token['oauth_token_secret'] -print - -# Step 2: Redirect to the provider. Since this is a CLI script we do not -# redirect. In a web application you would redirect the user to the URL -# below. - -base_url = urlparse.urlparse(authorize_url) -query = parse_qs(base_url.query) -query['oauth_token'] = request_token['oauth_token'] - -print urllib.urlencode(query, True) - -url = (base_url.scheme, base_url.netloc, base_url.path, base_url.params, - urllib.urlencode(query, True), base_url.fragment) -authorize_url = urlparse.urlunparse(url) +authorize_url = flow.step1() print 'Go to the following link in your browser:' print authorize_url print -# After the user has granted access to you, the consumer, the provider will -# redirect you to whatever URL you have told them to redirect to. You can -# usually define this in the oauth_callback argument as well. accepted = 'n' while accepted.lower() == 'n': accepted = raw_input('Have you authorized me? (y/n) ') -oauth_verifier = raw_input('What is the PIN? ').strip() +pin = raw_input('What is the PIN? ').strip() - -# Step 3: Once the consumer has redirected the user back to the oauth_callback -# URL you can request the access token the user has approved. You use the -# request token to sign this request. After this is done you throw away the -# request token and use the access token returned. You should store this -# access token somewhere safe, like a database, for future use. -token = oauth.Token(request_token['oauth_token'], - request_token['oauth_token_secret']) -token.set_verifier(oauth_verifier) -client = oauth.Client(consumer, token) - -resp, content = client.request(access_token_url, 'POST', headers=headers) -access_token = dict(parse_qsl(content)) - -print 'Access Token:' -print ' - oauth_token = %s' % access_token['oauth_token'] -print ' - oauth_token_secret = %s' % access_token['oauth_token_secret'] -print -print 'You may now access protected resources using the access tokens above.' -print +access_token = flow.step2_pin(pin) d = dict( consumer_key='anonymous',