105 lines
		
	
	
		
			3.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			105 lines
		
	
	
		
			3.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
# Copyright (C) 2010 Google Inc.
 | 
						|
#
 | 
						|
# Licensed under the Apache License, Version 2.0 (the "License");
 | 
						|
# you may not use this file except in compliance with the License.
 | 
						|
# You may obtain a copy of the License at
 | 
						|
#
 | 
						|
#      http://www.apache.org/licenses/LICENSE-2.0
 | 
						|
#
 | 
						|
# Unless required by applicable law or agreed to in writing, software
 | 
						|
# distributed under the License is distributed on an "AS IS" BASIS,
 | 
						|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 | 
						|
# See the License for the specific language governing permissions and
 | 
						|
# limitations under the License.
 | 
						|
 | 
						|
__author__ = 'ade@google.com'
 | 
						|
 | 
						|
import apiclient.discovery
 | 
						|
import logging
 | 
						|
import oauth_wrap
 | 
						|
import oauth2 as oauth
 | 
						|
import urllib
 | 
						|
import urlparse
 | 
						|
 | 
						|
try:
 | 
						|
  from urlparse import parse_qs, parse_qsl, urlparse
 | 
						|
except ImportError:
 | 
						|
  from cgi import parse_qs, parse_qsl
 | 
						|
 | 
						|
# TODO(ade) Replace user-agent with something specific
# Headers sent with every OAuth POST request; the body is form-encoded.
HEADERS = {
  'user-agent': 'gdata-python-v3-sample-client/0.1',
  'content-type': 'application/x-www-form-urlencoded',
}

# OAuth 1.0 endpoints. The request-token and authorisation URLs carry the
# consumer domain and the Buzz scope in their query strings.
REQUEST_TOKEN_URL = (
  'https://www.google.com/accounts/OAuthGetRequestToken'
  '?domain=anonymous'
  '&scope=https://www.googleapis.com/auth/buzz')
AUTHORIZE_URL = (
  'https://www.google.com/buzz/api/auth/OAuthAuthorizeToken'
  '?domain=anonymous'
  '&scope=https://www.googleapis.com/auth/buzz')
ACCESS_TOKEN_URL = 'https://www.google.com/accounts/OAuthGetAccessToken'
 | 
						|
 | 
						|
class BuzzGaeClient(object):
  """Helper for the OAuth 1.0 token exchange used to reach the Buzz API.

  The methods cover fetching a request token, building the URL the user is
  sent to for authorisation, exchanging the request token for an access
  token, and building an API client object.
  """

  def __init__(self, consumer_key='anonymous', consumer_secret='anonymous'):
    """Stores the OAuth consumer credentials.

    Args:
      consumer_key: OAuth consumer key. Defaults to 'anonymous'.
      consumer_secret: OAuth consumer secret. Defaults to 'anonymous'.
    """
    self.consumer = oauth.Consumer(consumer_key, consumer_secret)
    self.consumer_key = consumer_key
    self.consumer_secret = consumer_secret

  def _make_post_request(self, client, url, parameters):
    """POSTs form-encoded parameters to url and requires a '200' status.

    Args:
      client: Object whose request(url, method, headers=..., body=...) call
        returns a (response, content) pair, e.g. an oauth.Client.
      url: The URL to post to.
      parameters: Dict of form fields. Sequence values are expanded into
        repeated fields (urlencode is called with doseq=True).

    Returns:
      The (response, content) pair returned by the client.

    Raises:
      Exception: If the response status is anything other than '200'.
    """
    resp, content = client.request(url, 'POST', headers=HEADERS,
                                   body=urllib.urlencode(parameters, True))

    if resp['status'] != '200':
      # Arguments are passed separately so the message is only formatted when
      # the record is actually emitted.
      logging.warning('Request: %s failed with status: %s. Content was: %s',
                      url, resp['status'], content)
      raise Exception('Invalid response %s.' % resp['status'])
    return resp, content

  def get_request_token(self, callback_url, display_name=None):
    """Fetches an unauthorised request token from REQUEST_TOKEN_URL.

    Args:
      callback_url: Sent as the 'oauth_callback' parameter.
      display_name: Optional value sent as 'xoauth_displayname'. Omitted from
        the request when None.

    Returns:
      Dict built from the query-string-encoded response body.

    Raises:
      Exception: If the token endpoint does not answer with status '200'.
    """
    parameters = {
      'oauth_callback': callback_url
      }

    if display_name is not None:
      parameters['xoauth_displayname'] = display_name

    client = oauth.Client(self.consumer)
    resp, content = self._make_post_request(client, REQUEST_TOKEN_URL,
                                            parameters)

    return dict(parse_qsl(content))

  def generate_authorisation_url(self, request_token):
    """Returns the URL the user should be redirected to in order to gain access to their account.

    Args:
      request_token: Dict containing an 'oauth_token' entry, such as the one
        returned by get_request_token().

    Returns:
      AUTHORIZE_URL with 'oauth_token' added to its existing query string.
    """
    # The module-level name `urlparse` cannot be trusted here: the
    # `from urlparse import parse_qs, parse_qsl, urlparse` at the top of the
    # file rebinds it from the module to the urlparse() function whenever that
    # import succeeds, so `urlparse.urlparse(...)` raised AttributeError.
    # Import the module under a private local alias instead.
    import urlparse as urlparse_module

    base_url = urlparse_module.urlparse(AUTHORIZE_URL)
    query = parse_qs(base_url.query)
    query['oauth_token'] = request_token['oauth_token']

    # Encode once and reuse the result for both the log line and the URL.
    encoded_query = urllib.urlencode(query, True)
    logging.info(encoded_query)

    url = (base_url.scheme, base_url.netloc, base_url.path, base_url.params,
           encoded_query, base_url.fragment)
    return urlparse_module.urlunparse(url)

  def upgrade_to_access_token(self, request_token, oauth_verifier):
    """Exchanges a request token for an access token at ACCESS_TOKEN_URL.

    Args:
      request_token: Dict with 'oauth_token' and 'oauth_token_secret' entries.
      oauth_verifier: Verifier set on the token before the exchange.

    Returns:
      Dict holding 'consumer_key' and 'consumer_secret' plus every field
      parsed from the access-token response. Response fields overwrite the
      consumer entries if the keys collide.

    Raises:
      Exception: If the token endpoint does not answer with status '200'.
    """
    token = oauth.Token(request_token['oauth_token'],
                        request_token['oauth_token_secret'])
    token.set_verifier(oauth_verifier)
    client = oauth.Client(self.consumer, token)

    resp, content = self._make_post_request(client, ACCESS_TOKEN_URL, {})
    access_token = dict(parse_qsl(content))

    d = {
      'consumer_key': self.consumer_key,
      'consumer_secret': self.consumer_secret
      }
    d.update(access_token)
    return d

  def build_api_client(self, oauth_params=None):
    """Builds a client for version 'v1' of the 'buzz' API.

    Args:
      oauth_params: Optional value handed to oauth_wrap.get_authorised_http()
        to obtain the http object for the client. When None, the client is
        built without an explicit http object.

    Returns:
      The object returned by apiclient.discovery.build().
    """
    if oauth_params is None:
      return apiclient.discovery.build('buzz', 'v1')

    http = oauth_wrap.get_authorised_http(oauth_params)
    return apiclient.discovery.build('buzz', 'v1', http=http)
 |