Add XSRF protection to oauth2decorator callback.

Also update all samples to use XSRF callback protection.

Reviewed in https://codereview.appspot.com/6473053/.
This commit is contained in:
Joe Gregorio
2012-08-24 11:57:58 -04:00
parent a633c7974d
commit 6ceea2de24
15 changed files with 470 additions and 214 deletions

View File

@@ -22,18 +22,20 @@ __author__ = 'jcgregorio@google.com (Joe Gregorio)'
import base64
import httplib2
import logging
import os
import pickle
import time
import clientsecrets
from google.appengine.api import app_identity
from google.appengine.api import memcache
from google.appengine.api import users
from google.appengine.ext import db
from google.appengine.ext import webapp
from google.appengine.ext.webapp.util import login_required
from google.appengine.ext.webapp.util import run_wsgi_app
from oauth2client import clientsecrets
from oauth2client import util
from oauth2client import xsrfutil
from oauth2client.anyjson import simplejson
from oauth2client.client import AccessTokenRefreshError
from oauth2client.client import AssertionCredentials
@@ -46,19 +48,60 @@ logger = logging.getLogger(__name__)
OAUTH2CLIENT_NAMESPACE = 'oauth2client#ns'
XSRF_MEMCACHE_ID = 'xsrf_secret_key'
class InvalidClientSecretsError(Exception):
"""The client_secrets.json file is malformed or missing required fields."""
pass
class InvalidXsrfTokenError(Exception):
"""The XSRF token is invalid or expired."""
class SiteXsrfSecretKey(db.Model):
"""Storage for the sites XSRF secret key.
There will only be one instance stored of this model, the one used for the
site. """
secret = db.StringProperty()
def _generate_new_xsrf_secret_key():
"""Returns a random XSRF secret key.
"""
return os.urandom(16).encode("hex")
def xsrf_secret_key():
"""Return the secret key for use for XSRF protection.
If the Site entity does not have a secret key, this method will also create
one and persist it.
Returns:
The secret key.
"""
secret = memcache.get(XSRF_MEMCACHE_ID, namespace=OAUTH2CLIENT_NAMESPACE)
if not secret:
# Load the one and only instance of SiteXsrfSecretKey.
model = SiteXsrfSecretKey.get_or_insert(key_name='site')
if not model.secret:
model.secret = _generate_new_xsrf_secret_key()
model.put()
secret = model.secret
memcache.add(XSRF_MEMCACHE_ID, secret, namespace=OAUTH2CLIENT_NAMESPACE)
return str(secret)
class AppAssertionCredentials(AssertionCredentials):
"""Credentials object for App Engine Assertion Grants
This object will allow an App Engine application to identify itself to Google
and other OAuth 2.0 servers that can verify assertions. It can be used for
the purpose of accessing data stored under an account assigned to the App
Engine application itself.
and other OAuth 2.0 servers that can verify assertions. It can be used for the
purpose of accessing data stored under an account assigned to the App Engine
application itself.
This credential does not require a flow to instantiate because it represents
a two legged flow, and therefore has all of the required information to
@@ -263,6 +306,48 @@ class CredentialsModel(db.Model):
credentials = CredentialsProperty()
def _build_state_value(request_handler, user):
"""Composes the value for the 'state' parameter.
Packs the current request URI and an XSRF token into an opaque string that
can be passed to the authentication server via the 'state' parameter.
Args:
request_handler: webapp.RequestHandler, The request.
user: google.appengine.api.users.User, The current user.
Returns:
The state value as a string.
"""
uri = request_handler.request.url
token = xsrfutil.generate_token(xsrf_secret_key(), user.user_id(),
action_id=str(uri))
return uri + ':' + token
def _parse_state_value(state, user):
"""Parse the value of the 'state' parameter.
Parses the value and validates the XSRF token in the state parameter.
Args:
state: string, The value of the state parameter.
user: google.appengine.api.users.User, The current user.
Raises:
InvalidXsrfTokenError: if the XSRF token is invalid.
Returns:
The redirect URI.
"""
uri, token = state.rsplit(':', 1)
if not xsrfutil.validate_token(xsrf_secret_key(), token, user.user_id(),
action_id=uri):
raise InvalidXsrfTokenError()
return uri
class OAuth2Decorator(object):
"""Utility for making OAuth 2.0 easier.
@@ -361,14 +446,14 @@ class OAuth2Decorator(object):
self._create_flow(request_handler)
# Store the request URI in 'state' so we can use it later
self.flow.params['state'] = request_handler.request.url
self.flow.params['state'] = _build_state_value(request_handler, user)
self.credentials = StorageByKeyName(
CredentialsModel, user.user_id(), 'credentials').get()
if not self.has_credentials():
return request_handler.redirect(self.authorize_url())
try:
method(request_handler, *args, **kwargs)
return method(request_handler, *args, **kwargs)
except AccessTokenRefreshError:
return request_handler.redirect(self.authorize_url())
@@ -422,10 +507,10 @@ class OAuth2Decorator(object):
self._create_flow(request_handler)
self.flow.params['state'] = request_handler.request.url
self.flow.params['state'] = _build_state_value(request_handler, user)
self.credentials = StorageByKeyName(
CredentialsModel, user.user_id(), 'credentials').get()
method(request_handler, *args, **kwargs)
return method(request_handler, *args, **kwargs)
return setup_oauth
def has_credentials(self):
@@ -500,7 +585,9 @@ class OAuth2Decorator(object):
credentials = decorator.flow.step2_exchange(self.request.params)
StorageByKeyName(
CredentialsModel, user.user_id(), 'credentials').put(credentials)
self.redirect(str(self.request.get('state')))
redirect_uri = _parse_state_value(str(self.request.get('state')),
user)
self.redirect(redirect_uri)
return OAuth2Handler
@@ -550,26 +637,24 @@ class OAuth2DecoratorFromClientSecrets(OAuth2Decorator):
scope: string or list of strings, scope(s) of the credentials being
requested.
message: string, A friendly string to display to the user if the
clientsecrets file is missing or invalid. The message may contain HTML and
will be presented on the web interface for any method that uses the
clientsecrets file is missing or invalid. The message may contain HTML
and will be presented on the web interface for any method that uses the
decorator.
cache: An optional cache service client that implements get() and set()
methods. See clientsecrets.loadfile() for details.
"""
try:
client_type, client_info = clientsecrets.loadfile(filename, cache=cache)
if client_type not in [clientsecrets.TYPE_WEB, clientsecrets.TYPE_INSTALLED]:
raise InvalidClientSecretsError('OAuth2Decorator doesn\'t support this OAuth 2.0 flow.')
super(OAuth2DecoratorFromClientSecrets,
self).__init__(
client_info['client_id'],
client_info['client_secret'],
scope,
auth_uri=client_info['auth_uri'],
token_uri=client_info['token_uri'],
message=message)
except clientsecrets.InvalidClientSecretsError:
self._in_error = True
client_type, client_info = clientsecrets.loadfile(filename, cache=cache)
if client_type not in [
clientsecrets.TYPE_WEB, clientsecrets.TYPE_INSTALLED]:
raise InvalidClientSecretsError(
'OAuth2Decorator doesn\'t support this OAuth 2.0 flow.')
super(OAuth2DecoratorFromClientSecrets, self).__init__(
client_info['client_id'],
client_info['client_secret'],
scope,
auth_uri=client_info['auth_uri'],
token_uri=client_info['token_uri'],
message=message)
if message is not None:
self._message = message
else:

View File

@@ -1141,7 +1141,7 @@ class OAuth2WebServerFlow(Flow):
if 'id_token' in d:
d['id_token'] = _extract_id_token(d['id_token'])
logger.info('Successfully retrieved access token: %s' % content)
logger.info('Successfully retrieved access token')
return OAuth2Credentials(access_token, self.client_id,
self.client_secret, refresh_token, token_expiry,
self.token_uri, self.user_agent,

106
oauth2client/xsrfutil.py Normal file
View File

@@ -0,0 +1,106 @@
#!/usr/bin/python2.5
#
# Copyright 2010 the Melange authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Helper methods for creating & verifying XSRF tokens."""
__authors__ = [
'"Doug Coker" <dcoker@google.com>',
'"Joe Gregorio" <jcgregorio@google.com>',
]
import base64
import hmac
import os # for urandom
import time
from oauth2client import util
# Delimiter character
DELIMITER = ':'
# 1 hour in seconds
DEFAULT_TIMEOUT_SECS = 1*60*60
@util.positional(2)
def generate_token(key, user_id, action_id="", when=None):
"""Generates a URL-safe token for the given user, action, time tuple.
Args:
key: secret key to use.
user_id: the user ID of the authenticated user.
action_id: a string identifier of the action they requested
authorization for.
when: the time in seconds since the epoch at which the user was
authorized for this action. If not set the current time is used.
Returns:
A string XSRF protection token.
"""
when = when or int(time.time())
digester = hmac.new(key)
digester.update(str(user_id))
digester.update(DELIMITER)
digester.update(action_id)
digester.update(DELIMITER)
digester.update(str(when))
digest = digester.digest()
token = base64.urlsafe_b64encode('%s%s%d' % (digest,
DELIMITER,
when))
return token
@util.positional(3)
def validate_token(key, token, user_id, action_id="", current_time=None):
"""Validates that the given token authorizes the user for the action.
Tokens are invalid if the time of issue is too old or if the token
does not match what generateToken outputs (i.e. the token was forged).
Args:
key: secret key to use.
token: a string of the token generated by generateToken.
user_id: the user ID of the authenticated user.
action_id: a string identifier of the action they requested
authorization for.
Returns:
A boolean - True if the user is authorized for the action, False
otherwise.
"""
if not token:
return False
try:
decoded = base64.urlsafe_b64decode(str(token))
token_time = long(decoded.split(DELIMITER)[-1])
except (TypeError, ValueError):
return False
if current_time is None:
current_time = time.time()
# If the token is too old it's not valid.
if current_time - token_time > DEFAULT_TIMEOUT_SECS:
return False
# The given token should match the generated one with the same time.
expected_token = generate_token(key, user_id, action_id=action_id,
when=token_time)
if token != expected_token:
return False
return True

View File

@@ -21,3 +21,4 @@ $1 runtests.py $FLAGS tests/test_schema.py
$1 runtests.py $FLAGS tests/test_oauth2client_appengine.py
$1 runtests.py $FLAGS tests/test_oauth2client_keyring.py
$1 runtests.py $FLAGS tests/test_oauth2client_gce.py
$1 runtests.py $FLAGS tests/test_oauth2client_xsrfutil.py

View File

@@ -8,7 +8,7 @@
application</a>.</p>
{% else %}
<p><a href="{{ url }}">Grant</a> this application permission to read your
Buzz information and it will let you know how many followers you have.</p>
Google+ information and it will let you know how many followers you have.</p>
{% endif %}
<p>You can always <a
href="https://www.google.com/accounts/b/0/IssuedAuthSubTokens">revoke</a>

View File

@@ -1,3 +0,0 @@
Demonstrates using oauth2client against the DailyMotion API.
keywords: oauth2 appengine

View File

@@ -1,9 +0,0 @@
application: dailymotoauth2test
version: 1
runtime: python
api_version: 1
handlers:
- url: .*
script: main.py

View File

@@ -1,11 +0,0 @@
indexes:
# AUTOGENERATED
# This index.yaml is automatically updated whenever the dev_appserver
# detects that a new type of query is run. If you want to manage the
# index.yaml file manually, remove the above marker line (the line
# saying "# AUTOGENERATED"). If you want to manage some indexes
# manually, move them above the marker line. The index.yaml file is
# automatically uploaded to the admin console when you next deploy
# your application using appcfg.py.

View File

@@ -1,100 +0,0 @@
#!/usr/bin/env python
#
# Copyright 2007 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
__author__ = 'jcgregorio@google.com (Joe Gregorio)'
import httplib2
import logging
import os
import pickle
from oauth2client.appengine import CredentialsProperty
from oauth2client.appengine import StorageByKeyName
from oauth2client.client import OAuth2WebServerFlow
from google.appengine.api import users
from google.appengine.ext import db
from google.appengine.ext import webapp
from google.appengine.ext.webapp import template
from google.appengine.ext.webapp import util
from google.appengine.ext.webapp.util import login_required
FLOW = OAuth2WebServerFlow(
client_id='2ad565600216d25d9cde',
client_secret='03b56df2949a520be6049ff98b89813f17b467dc',
scope='read',
redirect_uri='https://dailymotoauth2test.appspot.com/auth_return',
user_agent='oauth2client-sample/1.0',
auth_uri='https://api.dailymotion.com/oauth/authorize',
token_uri='https://api.dailymotion.com/oauth/token'
)
class Credentials(db.Model):
credentials = CredentialsProperty()
class MainHandler(webapp.RequestHandler):
@login_required
def get(self):
user = users.get_current_user()
credentials = StorageByKeyName(
Credentials, user.user_id(), 'credentials').get()
if credentials is None or credentials.invalid == True:
authorize_url = FLOW.step1_get_authorize_url()
self.redirect(authorize_url)
else:
http = httplib2.Http()
http = credentials.authorize(http)
resp, content = http.request('https://api.dailymotion.com/me')
path = os.path.join(os.path.dirname(__file__), 'welcome.html')
logout = users.create_logout_url('/')
variables = {
'content': content,
'logout': logout
}
self.response.out.write(template.render(path, variables))
class OAuthHandler(webapp.RequestHandler):
@login_required
def get(self):
user = users.get_current_user()
credentials = FLOW.step2_exchange(self.request.params)
StorageByKeyName(
Credentials, user.user_id(), 'credentials').put(credentials)
self.redirect("/")
def main():
application = webapp.WSGIApplication(
[
('/', MainHandler),
('/auth_return', OAuthHandler)
],
debug=True)
util.run_wsgi_app(application)
if __name__ == '__main__':
main()

View File

@@ -1,14 +0,0 @@
<html>
<head>
<title>Daily Motion Sample</title>
<style type=text/css>
td { vertical-align: top; padding: 0.5em }
img { border:0 }
</style>
</head>
<body>
<p><a href="{{ logout }}">Logout</a></p>
<h2>Response body:</h2>
<pre>{{ content }} </pre>
</body>
</html>

View File

@@ -0,0 +1,9 @@
{
"web": {
"client_id": "[[INSERT CLIENT ID HERE]]",
"client_secret": "[[INSERT CLIENT SECRET HERE]]",
"redirect_uris": [],
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://accounts.google.com/o/oauth2/token"
}
}

View File

@@ -18,8 +18,4 @@ class CredentialsAdmin(admin.ModelAdmin):
pass
class FlowAdmin(admin.ModelAdmin):
pass
admin.site.register(CredentialsModel, CredentialsAdmin)

View File

@@ -2,27 +2,29 @@ import os
import logging
import httplib2
from django.http import HttpResponse
from django.core.urlresolvers import reverse
from django.contrib.auth.decorators import login_required
from oauth2client.django_orm import Storage
from oauth2client.client import OAuth2WebServerFlow
from django_sample.plus.models import CredentialsModel
from apiclient.discovery import build
from django.contrib.auth.decorators import login_required
from django.core.urlresolvers import reverse
from django.http import HttpResponse
from django.http import HttpResponseBadRequest
from django.http import HttpResponseRedirect
from django.shortcuts import render_to_response
from django_sample.plus.models import CredentialsModel
from django_sample import settings
from oauth2client import xsrfutil
from oauth2client.client import flow_from_clientsecrets
from oauth2client.django_orm import Storage
STEP2_URI = 'http://localhost:8000/oauth2callback'
# CLIENT_SECRETS, name of a file containing the OAuth 2.0 information for this
# application, including client_id and client_secret, which are found
# on the API Access tab on the Google APIs
# Console <http://code.google.com/apis/console>
CLIENT_SECRETS = os.path.join(os.path.dirname(__file__), '..', 'client_secrets.json')
FLOW = OAuth2WebServerFlow(
client_id='[[Insert Client ID here.]]',
client_secret='[[Insert Client Secret here.]]',
FLOW = flow_from_clientsecrets(
CLIENT_SECRETS,
scope='https://www.googleapis.com/auth/plus.me',
redirect_uri=STEP2_URI,
user_agent='plus-django-sample/1.0',
)
redirect_uri='http://localhost:8000/oauth2callback')
@login_required
@@ -30,6 +32,8 @@ def index(request):
storage = Storage(CredentialsModel, 'id', request.user, 'credential')
credential = storage.get()
if credential is None or credential.invalid == True:
FLOW.params['state'] = xsrfutil.generate_token(settings.SECRET_KEY,
request.user)
authorize_url = FLOW.step1_get_authorize_url()
return HttpResponseRedirect(authorize_url)
else:
@@ -48,6 +52,9 @@ def index(request):
@login_required
def auth_return(request):
if not xsrfutil.validate_token(settings.SECRET_KEY, request.REQUEST['state'],
request.user):
return HttpResponseBadRequest()
credential = FLOW.step2_exchange(request.REQUEST)
storage = Storage(CredentialsModel, 'id', request.user, 'credential')
storage.put(credential)

View File

@@ -25,6 +25,7 @@ __author__ = 'jcgregorio@google.com (Joe Gregorio)'
import base64
import datetime
import httplib2
import mox
import os
import time
import unittest
@@ -49,8 +50,10 @@ from google.appengine.api.memcache import memcache_stub
from google.appengine.ext import db
from google.appengine.ext import testbed
from google.appengine.runtime import apiproxy_errors
from oauth2client import appengine
from oauth2client.anyjson import simplejson
from oauth2client.clientsecrets import _loadfile
from oauth2client.clientsecrets import InvalidClientSecretsError
from oauth2client.appengine import AppAssertionCredentials
from oauth2client.appengine import CredentialsModel
from oauth2client.appengine import FlowProperty
@@ -276,15 +279,18 @@ class StorageByKeyNameTest(unittest.TestCase):
self.assertEqual(None, credentials)
self.assertEqual(None, memcache.get('foo'))
class MockRequest(object):
url = 'https://example.org'
def relative_url(self, rel):
return self.url + rel
class MockRequestHandler(object):
request = MockRequest()
class DecoratorTests(unittest.TestCase):
def setUp(self):
@@ -343,18 +349,28 @@ class DecoratorTests(unittest.TestCase):
self.assertEqual('http://localhost/oauth2callback', q['redirect_uri'][0])
self.assertEqual('foo_client_id', q['client_id'][0])
self.assertEqual('foo_scope bar_scope', q['scope'][0])
self.assertEqual('http://localhost/foo_path', q['state'][0])
self.assertEqual('http://localhost/foo_path',
q['state'][0].rsplit(':', 1)[0])
self.assertEqual('code', q['response_type'][0])
self.assertEqual(False, self.decorator.has_credentials())
m = mox.Mox()
m.StubOutWithMock(appengine, "_parse_state_value")
appengine._parse_state_value('foo_path:xsrfkey123',
mox.IgnoreArg()).AndReturn('foo_path')
m.ReplayAll()
# Now simulate the callback to /oauth2callback.
response = self.app.get('/oauth2callback', {
'code': 'foo_access_code',
'state': 'foo_path',
'state': 'foo_path:xsrfkey123',
})
self.assertEqual('http://localhost/foo_path', response.headers['Location'])
self.assertEqual(None, self.decorator.credentials)
m.UnsetStubs()
m.VerifyAll()
# Now requesting the decorated path should work.
response = self.app.get('/foo_path')
self.assertEqual('200 OK', response.status)
@@ -380,10 +396,16 @@ class DecoratorTests(unittest.TestCase):
response = self.app.get('/foo_path')
self.assertTrue(response.status.startswith('302'))
m = mox.Mox()
m.StubOutWithMock(appengine, "_parse_state_value")
appengine._parse_state_value('foo_path:xsrfkey123',
mox.IgnoreArg()).AndReturn('foo_path')
m.ReplayAll()
# Now simulate the callback to /oauth2callback.
response = self.app.get('/oauth2callback', {
'code': 'foo_access_code',
'state': 'foo_path',
'state': 'foo_path:xsrfkey123',
})
self.assertEqual('http://localhost/foo_path', response.headers['Location'])
self.assertEqual(None, self.decorator.credentials)
@@ -398,6 +420,9 @@ class DecoratorTests(unittest.TestCase):
response = self.app.get('/foo_path')
self.assertTrue(response.status.startswith('302'))
m.UnsetStubs()
m.VerifyAll()
def test_aware(self):
# An initial request to an oauth_aware decorated path should not redirect.
response = self.app.get('/bar_path/2012/01')
@@ -409,18 +434,28 @@ class DecoratorTests(unittest.TestCase):
self.assertEqual('http://localhost/oauth2callback', q['redirect_uri'][0])
self.assertEqual('foo_client_id', q['client_id'][0])
self.assertEqual('foo_scope bar_scope', q['scope'][0])
self.assertEqual('http://localhost/bar_path/2012/01', q['state'][0])
self.assertEqual('http://localhost/bar_path/2012/01',
q['state'][0].rsplit(':', 1)[0])
self.assertEqual('code', q['response_type'][0])
m = mox.Mox()
m.StubOutWithMock(appengine, "_parse_state_value")
appengine._parse_state_value('bar_path:xsrfkey456',
mox.IgnoreArg()).AndReturn('bar_path')
m.ReplayAll()
# Now simulate the callback to /oauth2callback.
url = self.decorator.authorize_url()
response = self.app.get('/oauth2callback', {
'code': 'foo_access_code',
'state': 'bar_path',
'state': 'bar_path:xsrfkey456',
})
self.assertEqual('http://localhost/bar_path', response.headers['Location'])
self.assertEqual(False, self.decorator.has_credentials())
m.UnsetStubs()
m.VerifyAll()
# Now requesting the decorated path will have credentials.
response = self.app.get('/bar_path/2012/01')
self.assertEqual('200 OK', response.status)
@@ -431,7 +466,6 @@ class DecoratorTests(unittest.TestCase):
self.assertEqual('foo_access_token',
self.decorator.credentials.access_token)
def test_error_in_step2(self):
# An initial request to an oauth_aware decorated path should not redirect.
response = self.app.get('/bar_path/2012/01')
@@ -509,33 +543,77 @@ class DecoratorTests(unittest.TestCase):
def test_decorator_from_unfilled_client_secrets_required(self):
MESSAGE = 'File is missing'
decorator = oauth2decorator_from_clientsecrets(
datafile('unfilled_client_secrets.json'),
scope=['foo_scope', 'bar_scope'], message=MESSAGE)
self._finish_setup(decorator, user_mock=UserNotLoggedInMock)
self.assertTrue(decorator._in_error)
self.assertEqual(MESSAGE, decorator._message)
# An initial request to an oauth_required decorated path should be an
# error message.
response = self.app.get('/foo_path')
self.assertTrue(response.status.startswith('200'))
self.assertTrue(MESSAGE in str(response))
try:
decorator = oauth2decorator_from_clientsecrets(
datafile('unfilled_client_secrets.json'),
scope=['foo_scope', 'bar_scope'], message=MESSAGE)
except InvalidClientSecretsError:
pass
def test_decorator_from_unfilled_client_secrets_aware(self):
MESSAGE = 'File is missing'
decorator = oauth2decorator_from_clientsecrets(
datafile('unfilled_client_secrets.json'),
scope=['foo_scope', 'bar_scope'], message=MESSAGE)
self._finish_setup(decorator, user_mock=UserNotLoggedInMock)
self.assertTrue(decorator._in_error)
self.assertEqual(MESSAGE, decorator._message)
try:
decorator = oauth2decorator_from_clientsecrets(
datafile('unfilled_client_secrets.json'),
scope=['foo_scope', 'bar_scope'], message=MESSAGE)
except InvalidClientSecretsError:
pass
# An initial request to an oauth_aware decorated path should be an
# error message.
response = self.app.get('/bar_path/2012/03')
self.assertTrue(response.status.startswith('200'))
self.assertTrue(MESSAGE in str(response))
class DecoratorXsrfSecretTests(unittest.TestCase):
"""Test xsrf_secret_key."""
def setUp(self):
self.testbed = testbed.Testbed()
self.testbed.activate()
self.testbed.init_datastore_v3_stub()
self.testbed.init_memcache_stub()
def tearDown(self):
self.testbed.deactivate()
def test_build_and_parse_state(self):
secret = appengine.xsrf_secret_key()
# Secret shouldn't change from call to call.
secret2 = appengine.xsrf_secret_key()
self.assertEqual(secret, secret2)
# Secret shouldn't change if memcache goes away.
memcache.delete(appengine.XSRF_MEMCACHE_ID,
namespace=appengine.OAUTH2CLIENT_NAMESPACE)
secret3 = appengine.xsrf_secret_key()
self.assertEqual(secret2, secret3)
# Secret should change if both memcache and the model goes away.
memcache.delete(appengine.XSRF_MEMCACHE_ID,
namespace=appengine.OAUTH2CLIENT_NAMESPACE)
model = appengine.SiteXsrfSecretKey.get_or_insert('site')
model.delete()
secret4 = appengine.xsrf_secret_key()
self.assertNotEqual(secret3, secret4)
class DecoratorXsrfProtectionTests(unittest.TestCase):
"""Test _build_state_value and _parse_state_value."""
def setUp(self):
self.testbed = testbed.Testbed()
self.testbed.activate()
self.testbed.init_datastore_v3_stub()
self.testbed.init_memcache_stub()
def tearDown(self):
self.testbed.deactivate()
def test_build_and_parse_state(self):
state = appengine._build_state_value(MockRequestHandler(), UserMock())
self.assertEqual(
'https://example.org',
appengine._parse_state_value(state, UserMock()))
self.assertRaises(appengine.InvalidXsrfTokenError,
appengine._parse_state_value, state[1:], UserMock())
if __name__ == '__main__':

View File

@@ -0,0 +1,111 @@
# Copyright 2012 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Tests for oauth2client.xsrfutil.
Unit tests for oauth2client.xsrfutil.
"""
__author__ = 'jcgregorio@google.com (Joe Gregorio)'
import unittest
from oauth2client import xsrfutil
# Jan 17 2008, 5:40PM
TEST_KEY = 'test key'
TEST_TIME = 1200609642081230
TEST_USER_ID_1 = 123832983
TEST_USER_ID_2 = 938297432
TEST_ACTION_ID_1 = 'some_action'
TEST_ACTION_ID_2 = 'some_other_action'
TEST_EXTRA_INFO_1 = 'extra_info_1'
TEST_EXTRA_INFO_2 = 'more_extra_info'
class XsrfUtilTests(unittest.TestCase):
"""Test xsrfutil functions."""
def testGenerateAndValidateToken(self):
"""Test generating and validating a token."""
token = xsrfutil.generate_token(TEST_KEY,
TEST_USER_ID_1,
action_id=TEST_ACTION_ID_1,
when=TEST_TIME)
# Check that the token is considered valid when it should be.
self.assertTrue(xsrfutil.validate_token(TEST_KEY,
token,
TEST_USER_ID_1,
action_id=TEST_ACTION_ID_1,
current_time=TEST_TIME))
# Should still be valid 15 minutes later.
later15mins = TEST_TIME + 15*60
self.assertTrue(xsrfutil.validate_token(TEST_KEY,
token,
TEST_USER_ID_1,
action_id=TEST_ACTION_ID_1,
current_time=later15mins))
# But not if beyond the timeout.
later2hours = TEST_TIME + 2*60*60
self.assertFalse(xsrfutil.validate_token(TEST_KEY,
token,
TEST_USER_ID_1,
action_id=TEST_ACTION_ID_1,
current_time=later2hours))
# Or if the key is different.
self.assertFalse(xsrfutil.validate_token('another key',
token,
TEST_USER_ID_1,
action_id=TEST_ACTION_ID_1,
current_time=later15mins))
# Or the user ID....
self.assertFalse(xsrfutil.validate_token(TEST_KEY,
token,
TEST_USER_ID_2,
action_id=TEST_ACTION_ID_1,
current_time=later15mins))
# Or the action ID...
self.assertFalse(xsrfutil.validate_token(TEST_KEY,
token,
TEST_USER_ID_1,
action_id=TEST_ACTION_ID_2,
current_time=later15mins))
# Invalid when truncated
self.assertFalse(xsrfutil.validate_token(TEST_KEY,
token[:-1],
TEST_USER_ID_1,
action_id=TEST_ACTION_ID_1,
current_time=later15mins))
# Invalid with extra garbage
self.assertFalse(xsrfutil.validate_token(TEST_KEY,
token + 'x',
TEST_USER_ID_1,
action_id=TEST_ACTION_ID_1,
current_time=later15mins))
# Invalid with token of None
self.assertFalse(xsrfutil.validate_token(TEST_KEY,
None,
TEST_USER_ID_1,
action_id=TEST_ACTION_ID_1))
if __name__ == '__main__':
unittest.main()