Add Django Util With Decorator

Adds a submodule that provides views, decorator, and signals to help
a Django web application complete OAuth2 authorization.
This commit is contained in:
Bill Prin
2015-10-13 01:46:19 -07:00
parent b2a405593f
commit 5be95050da
18 changed files with 1170 additions and 0 deletions

3
.gitignore vendored
View File

@@ -11,6 +11,9 @@ docs/_build
# Test files
.tox/
# Django test database
db.sqlite3
# Coverage files
.coverage
coverage.xml

View File

@@ -0,0 +1,7 @@
oauth2client.contrib.django_util.apps module
============================================
.. automodule:: oauth2client.contrib.django_util.apps
:members:
:undoc-members:
:show-inheritance:

View File

@@ -0,0 +1,7 @@
oauth2client.contrib.django_util.decorators module
==================================================
.. automodule:: oauth2client.contrib.django_util.decorators
:members:
:undoc-members:
:show-inheritance:

View File

@@ -0,0 +1,22 @@
oauth2client.contrib.django_util package
========================================
Submodules
----------
.. toctree::
oauth2client.contrib.django_util.apps
oauth2client.contrib.django_util.decorators
oauth2client.contrib.django_util.signals
oauth2client.contrib.django_util.site
oauth2client.contrib.django_util.storage
oauth2client.contrib.django_util.views
Module contents
---------------
.. automodule:: oauth2client.contrib.django_util
:members:
:undoc-members:
:show-inheritance:

View File

@@ -0,0 +1,7 @@
oauth2client.contrib.django_util.signals module
===============================================
.. automodule:: oauth2client.contrib.django_util.signals
:members:
:undoc-members:
:show-inheritance:

View File

@@ -0,0 +1,7 @@
oauth2client.contrib.django_util.site module
============================================
.. automodule:: oauth2client.contrib.django_util.site
:members:
:undoc-members:
:show-inheritance:

View File

@@ -0,0 +1,7 @@
oauth2client.contrib.django_util.storage module
===============================================
.. automodule:: oauth2client.contrib.django_util.storage
:members:
:undoc-members:
:show-inheritance:

View File

@@ -0,0 +1,7 @@
oauth2client.contrib.django_util.views module
=============================================
.. automodule:: oauth2client.contrib.django_util.views
:members:
:undoc-members:
:show-inheritance:

View File

@@ -1,6 +1,13 @@
oauth2client.contrib package
============================
Subpackages
-----------
.. toctree::
oauth2client.contrib.django_util
Module contents
---------------

View File

@@ -0,0 +1,309 @@
# Copyright 2015 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utilities for the Django web framework
Provides Django views and helpers the make using the OAuth2 web server
flow easier. It includes an ``oauth_required`` decorator to automatically ensure
that user credentials are available, and an ``oauth_enabled`` decorator to check
if the user has authorized, and helper shortcuts to create the authorization
URL otherwise.
Configuration
=============
To configure, you'll need a set of OAuth2 web application credentials from
`Google Developer's Console <https://console.developers.google.com/project/_/apiui/credential>`.
Add the helper to your INSTALLED_APPS:
.. code-block:: python
:caption: settings.py
:name: installed_apps
INSTALLED_APPS = (
# other apps
"oauth2client.contrib.django_util"
)
Add the client secrets created earlier to the settings. You can either
specify the path to the credentials file in JSON format
.. code-block:: python
:caption: settings.py
:name: secrets_file
GOOGLE_OAUTH2_CLIENT_SECRETS_JSON=/path/to/client-secret.json
Or, directly configure the client Id and client secret.
.. code-block:: python
:caption: settings.py
:name: secrets_config
GOOGLE_OAUTH2_CLIENT_ID=client-id-field
GOOGLE_OAUTH2_CLIENT_SECRET=client-secret-field
By default, the default scopes for the required decorator only contains the
``email`` scopes. You can change that default in the settings.
.. code-block:: python
:caption: settings.py
:name: scopes
GOOGLE_OAUTH2_SCOPES = ('email', 'https://www.googleapis.com/auth/calendar',)
By default, the decorators will add an `oauth` object to the Django request
object, and include all of its state and helpers inside that object. If the
`oauth` name conflicts with another usage, it can be changed
.. code-block:: python
:caption: settings.py
:name: request_prefix
# changes request.oauth to request.google_oauth
GOOGLE_OAUTH2_REQUEST_ATTRIBUTE = 'google_oauth'
Add the oauth2 routes to your application's urls.py urlpatterns.
.. code-block:: python
:caption: urls.py
:name: urls
from oauth2client.contrib.django_util.site import urls as oauth2_urls
urlpatterns += [url(r'^oauth2/', include(oauth2_urls))]
To require OAuth2 credentials for a view, use the `oauth2_required` decorator.
This creates a credentials object with an id_token, and allows you to create an
`http` object to build service clients with. These are all attached to the
request.oauth
.. code-block:: python
:caption: views.py
:name: views_required
from oauth2client.contrib.django_util.decorators import oauth_required
@oauth_required
def requires_default_scopes(request):
email = request.credentials.id_token['email']
service = build(serviceName='calendar', version='v3',
http=request.oauth.http,
developerKey=API_KEY)
events = service.events().list(calendarId='primary').execute()['items']
return HttpResponse("email: %s , calendar: %s" % (email, str(events)))
To make OAuth2 optional and provide an authorization link in your own views.
.. code-block:: python
:caption: views.py
:name: views_enabled2
from oauth2client.contrib.django_util.decorators import oauth_enabled
@oauth_enabled
def optional_oauth2(request):
if request.oauth.has_credentials():
# this could be passed into a view
# request.oauth.http is also initialized
return HttpResponse("User email: %s"
% request.oauth.credentials.id_token['email'])
else:
return HttpResponse('Here is an OAuth Authorize link:
<a href="%s">Authorize</a>' % request.oauth.get_authorize_redirect())
If a view needs a scope not included in the default scopes specified in
the settings, you can use [incremental auth](https://developers.google.com/identity/sign-in/web/incremental-auth)
and specify additional scopes in the decorator arguments.
.. code-block:: python
:caption: views.py
:name: views_required_additional_scopes
@oauth_enabled(scopes=['https://www.googleapis.com/auth/drive'])
def drive_required(request):
if request.oauth.has_credentials():
service = build(serviceName='drive', version='v2',
http=request.oauth.http,
developerKey=API_KEY)
events = service.files().list().execute()['items']
return HttpResponse(str(events))
else:
return HttpResponse('Here is an OAuth Authorize link:
<a href="%s">Authorize</a>' % request.oauth.get_authorize_redirect())
To provide a callback on authorization being completed, use the
oauth2_authorized signal:
.. code-block:: python
:caption: views.py
:name: signals
from oauth2client.contrib.django_util.signals import oauth2_authorized
def test_callback(sender, request, credentials, **kwargs):
print "Authorization Signal Received %s" % credentials.id_token['email']
oauth2_authorized.connect(test_callback)
"""
import sys
import django.conf
from django.core import exceptions
from django.core import urlresolvers
import httplib2
from oauth2client import clientsecrets
from oauth2client.contrib.django_util import storage
from six.moves.urllib import parse
GOOGLE_OAUTH2_DEFAULT_SCOPES = ('email',)
GOOGLE_OAUTH2_REQUEST_ATTRIBUTE = 'oauth'
def _load_client_secrets(filename):
"""Loads client secrets from the given filename."""
client_type, client_info = clientsecrets.loadfile(filename)
if client_type != clientsecrets.TYPE_WEB:
raise ValueError(
'The flow specified in {} is not supported, only the WEB flow '
'type is supported.'.format(client_type))
return client_info['client_id'], client_info['client_secret']
def _get_oauth2_client_id_and_secret(settings_instance):
"""Initializes client id and client secret based on the settings"""
secret_json = getattr(django.conf.settings,
'GOOGLE_OAUTH2_CLIENT_SECRETS_JSON', None)
if secret_json is not None:
return _load_client_secrets(secret_json)
else:
client_id = getattr(settings_instance, "GOOGLE_OAUTH2_CLIENT_ID",
None)
client_secret = getattr(settings_instance,
"GOOGLE_OAUTH2_CLIENT_SECRET", None)
if client_id is not None and client_secret is not None:
return client_id, client_secret
else:
raise exceptions.ImproperlyConfigured(
"Must specify either GOOGLE_OAUTH2_CLIENT_SECRETS_JSON, or "
" both GOOGLE_OAUTH2_CLIENT_ID and GOOGLE_OAUTH2_CLIENT_SECRET "
"in settings.py")
class OAuth2Settings(object):
"""Initializes Django OAuth2 Helper Settings
This class loads the OAuth2 Settings from the Django settings, and then
provides those settings as attributes to the rest of the views and
decorators in the module.
Attributes:
scopes: A list of OAuth2 scopes that the decorators and views will use
as defaults
request_prefix: The name of the attribute that the decorators use to
attach the UserOAuth2 object to the Django request object.
client_id: The OAuth2 Client ID
client_secret: The OAuth2 Client Secret
"""
def __init__(self, settings_instance):
self.scopes = getattr(settings_instance, 'GOOGLE_OAUTH2_SCOPES',
GOOGLE_OAUTH2_DEFAULT_SCOPES)
self.request_prefix = getattr(settings_instance,
'GOOGLE_OAUTH2_REQUEST_ATTRIBUTE',
GOOGLE_OAUTH2_REQUEST_ATTRIBUTE)
self.client_id, self.client_secret = \
_get_oauth2_client_id_and_secret(settings_instance)
if ('django.contrib.sessions.middleware.SessionMiddleware'
not in settings_instance.MIDDLEWARE_CLASSES):
raise exceptions.ImproperlyConfigured(
"The Google OAuth2 Helper requires session middleware to "
"be installed. Edit your MIDDLEWARE_CLASSES setting"
" to include 'django.contrib.sessions.middleware."
"SessionMiddleware'.")
oauth2_settings = OAuth2Settings(django.conf.settings)
def _redirect_with_params(url_name, *args, **kwargs):
"""Helper method to create a redirect response that uses GET URL
parameters."""
url = urlresolvers.reverse(url_name, args=args)
params = parse.urlencode(kwargs, True)
return "{0}?{1}".format(url, params)
class UserOAuth2(object):
"""Class to create oauth2 objects on Django request objects containing
credentials and helper methods.
"""
def __init__(self, request, scopes=None, return_url=None):
"""Initialize the Oauth2 Object
:param request: Django request object
:param scopes: Scopes desired for this OAuth2 flow
:param return_url: URL to return to after authorization is complete
:return:
"""
self.request = request
self.return_url = return_url or request.get_full_path()
self.scopes = set(oauth2_settings.scopes)
if scopes:
self.scopes |= set(scopes)
# make sure previously requested custom scopes are maintained
# in future authorizations
credentials = storage.get_storage(self.request).get()
if credentials:
self.scopes |= credentials.scopes
def get_authorize_redirect(self):
"""Creates a URl to start the OAuth2 authorization flow"""
get_params = {
'return_url': self.return_url,
'scopes': self.scopes
}
return _redirect_with_params('google_oauth:authorize',
**get_params)
def has_credentials(self):
"""Returns True if there are valid credentials for the current user
and required scopes."""
return self.credentials and not self.credentials.invalid \
and self.credentials.has_scopes(self.scopes)
@property
def credentials(self):
"""Gets the authorized credentials for this flow, if they exist"""
return storage.get_storage(self.request).get()
@property
def http(self):
"""Helper method to create an HTTP client authorized with OAuth2
credentials"""
if self.has_credentials():
return self.credentials.authorize(httplib2.Http())
return None

View File

@@ -0,0 +1,31 @@
# Copyright 2015 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Application Config For Django OAuth2 Helper
Django 1.7+ provides an
[applications](https://docs.djangoproject.com/en/1.8/ref/applications/)
API so that Django projects can introspect on installed applications using a
stable API. This module exists to follow that convention.
"""
import sys
# Django 1.7+ only supports Python 2.7+
if sys.hexversion >= 0x02070000: # pragma: NO COVER
from django.apps import AppConfig
class GoogleOAuth2HelperConfig(AppConfig):
""" App Config for Django Helper"""
name = 'oauth2client.django_util'
verbose_name = "Google OAuth2 Django Helper"

View File

@@ -0,0 +1,117 @@
# Copyright 2015 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from django import shortcuts
from oauth2client.contrib import django_util
from six import wraps
def oauth_required(decorated_function=None, scopes=None, **decorator_kwargs):
""" Decorator to require OAuth2 credentials for a view
.. code-block:: python
:caption: views.py
:name: views_required_2
from oauth2client.django_util.decorators import oauth_required
@oauth_required
def requires_default_scopes(request):
email = request.credentials.id_token['email']
service = build(serviceName='calendar', version='v3',
http=request.oauth.http,
developerKey=API_KEY)
events = service.events().list(
calendarId='primary').execute()['items']
return HttpResponse("email: %s , calendar: %s" % (email, str(events)))
:param decorated_function: View function to decorate, must have the Django
request object as the first argument
:param scopes: Scopes to require, will default
:param decorator_kwargs: Can include ``return_url`` to specify the URL to
return to after OAuth2 authorization is complete
:return: An OAuth2 Authorize view if credentials are not found or if the
credentials are missing the required scopes. Otherwise,
the decorated view.
"""
def curry_wrapper(wrapped_function):
@wraps(wrapped_function)
def required_wrapper(request, *args, **kwargs):
return_url = decorator_kwargs.pop('return_url',
request.get_full_path())
user_oauth = django_util.UserOAuth2(request, scopes, return_url)
if not user_oauth.has_credentials():
return shortcuts.redirect(user_oauth.get_authorize_redirect())
setattr(request, django_util.oauth2_settings.request_prefix,
user_oauth)
return wrapped_function(request, *args, **kwargs)
return required_wrapper
if decorated_function:
return curry_wrapper(decorated_function)
else:
return curry_wrapper
def oauth_enabled(decorated_function=None, scopes=None, **decorator_kwargs):
""" Decorator to enable OAuth Credentials if authorized, and setup
the oauth object on the request object to provide helper functions
to start the flow otherwise.
.. code-block:: python
:caption: views.py
:name: views_enabled3
from oauth2client.django_util.decorators import oauth_enabled
@oauth_enabled
def optional_oauth2(request):
if request.oauth.has_credentials():
# this could be passed into a view
# request.oauth.http is also initialized
return HttpResponse("User email: %s" %
request.oauth.credentials.id_token['email'])
else:
return HttpResponse('Here is an OAuth Authorize link:
<a href="%s">Authorize</a>' %
request.oauth.get_authorize_redirect())
:param decorated_function: View function to decorate
:param scopes: Scopes to require, will default
:param decorator_kwargs: Can include ``return_url`` to specify the URL to
return to after OAuth2 authorization is complete
:return: The decorated view function
"""
def curry_wrapper(wrapped_function):
@wraps(wrapped_function)
def enabled_wrapper(request, *args, **kwargs):
return_url = decorator_kwargs.pop('return_url',
request.get_full_path())
user_oauth = django_util.UserOAuth2(request, scopes, return_url)
setattr(request, django_util.oauth2_settings.request_prefix,
user_oauth)
return wrapped_function(request, *args, **kwargs)
return enabled_wrapper
if decorated_function:
return curry_wrapper(decorated_function)
else:
return curry_wrapper

View File

@@ -0,0 +1,28 @@
# Copyright 2015 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" Signals for Google OAuth2 Helper
This module contains signals for Google OAuth2 Helper. Currently it only
contains one, which fires when an OAuth2 authorization flow has completed.
"""
import django.dispatch
"""Signal that fires when OAuth2 Flow has completed.
It passes the Django request object and the OAuth2 credentials object to the
receiver.
"""
oauth2_authorized = django.dispatch.Signal(
providing_args=["request", "credentials"])

View File

@@ -0,0 +1,23 @@
# Copyright 2015 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from django.conf import urls
from oauth2client.contrib.django_util import views
urlpatterns = [
urls.url(r'oauth2callback/', views.oauth2_callback, name="callback"),
urls.url(r'oauth2authorize/', views.oauth2_authorize, name="authorize")
]
urls = (urlpatterns, "google_oauth", "google_oauth")

View File

@@ -0,0 +1,52 @@
# Copyright 2015 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oauth2client import client
def get_storage(request):
# TODO(issue 319): Make this pluggable with different storage providers
# https://github.com/google/oauth2client/issues/319
""" Gets a Credentials storage object for the Django OAuth2 Helper object
:param request: Reference to the current request object
:return: A OAuth2Client Storage implementation based on sessions
"""
return DjangoSessionStorage(request.session)
_CREDENTIALS_KEY = 'google_oauth2_credentials'
class DjangoSessionStorage(client.Storage):
"""Storage implementation that uses Django sessions."""
def __init__(self, session):
self.session = session
def locked_get(self):
serialized = self.session.get(_CREDENTIALS_KEY)
if serialized is None:
return None
credentials = client.OAuth2Credentials.from_json(serialized)
credentials.set_store(self)
return credentials
def locked_put(self, credentials):
self.session[_CREDENTIALS_KEY] = credentials.to_json()
def locked_delete(self):
if _CREDENTIALS_KEY in self.session:
del self.session[_CREDENTIALS_KEY]

View File

@@ -0,0 +1,139 @@
# Copyright 2015 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import hashlib
import json
import os
import pickle
from django import http
from django.core import urlresolvers
from django import shortcuts
from oauth2client import client
from oauth2client.contrib import django_util
from oauth2client.contrib.django_util import signals
from oauth2client.contrib.django_util import storage
_CSRF_KEY = 'google_oauth2_csrf_token'
_FLOW_KEY = 'google_oauth2_flow_{0}'
def _make_flow(request, scopes, return_url=None):
"""Creates a Web Server Flow"""
# Generate a CSRF token to prevent malicious requests.
csrf_token = hashlib.sha256(os.urandom(1024)).hexdigest()
request.session[_CSRF_KEY] = csrf_token
state = json.dumps({
'csrf_token': csrf_token,
'return_url': return_url,
})
flow = client.OAuth2WebServerFlow(
client_id=django_util.oauth2_settings.client_id,
client_secret=django_util.oauth2_settings.client_secret,
scope=scopes,
state=state,
redirect_uri=request.build_absolute_uri(
urlresolvers.reverse("google_oauth:callback")))
flow_key = _FLOW_KEY.format(csrf_token)
request.session[flow_key] = pickle.dumps(flow)
return flow
def _get_flow_for_token(csrf_token, request):
""" Looks up the flow in session to recover information about requested
scopes."""
flow_pickle = request.session.get(_FLOW_KEY.format(csrf_token), None)
return None if flow_pickle is None else pickle.loads(flow_pickle)
def oauth2_callback(request):
""" View that handles the user's return from OAuth2 provider.
This view verifies the CSRF state and OAuth authorization code, and on
success stores the credentials obtained in the storage provider,
and redirects to the return_url specified in the authorize view and
stored in the session.
:param request: Django request
:return: A redirect response back to the return_url
"""
if 'error' in request.GET:
reason = request.GET.get(
'error_description', request.GET.get('error', ''))
return http.HttpResponseBadRequest(
'Authorization failed %s' % reason)
try:
encoded_state = request.GET['state']
code = request.GET['code']
except KeyError:
return http.HttpResponseBadRequest(
"Request missing state or authorization code")
try:
server_csrf = request.session[_CSRF_KEY]
except KeyError:
return http.HttpResponseBadRequest("No existing session for this flow.")
try:
state = json.loads(encoded_state)
client_csrf = state['csrf_token']
return_url = state['return_url']
except (ValueError, KeyError):
return http.HttpResponseBadRequest('Invalid state parameter.')
if client_csrf != server_csrf:
return http.HttpResponseBadRequest('Invalid CSRF token.')
flow = _get_flow_for_token(client_csrf, request)
if not flow:
return http.HttpResponseBadRequest("Missing Oauth2 flow.")
try:
credentials = flow.step2_exchange(code)
except client.FlowExchangeError as exchange_error:
return http.HttpResponseBadRequest(
"An error has occurred: {0}".format(exchange_error))
storage.get_storage(request).put(credentials)
signals.oauth2_authorized.send(sender=signals.oauth2_authorized,
request=request, credentials=credentials)
return shortcuts.redirect(return_url)
def oauth2_authorize(request):
""" View to start the OAuth2 Authorization flow
This view starts the OAuth2 authorization flow. If scopes is passed in
as a GET URL parameter, it will authorize those scopes, otherwise the
default scopes specified in settings. The return_url can also be
specified as a GET parameter, otherwise the referer header will be
checked, and if that isn't found it will return to the root path.
:param request: The Django request object
:return: A redirect to Google OAuth2 Authorization
"""
scopes = request.GET.getlist('scopes', django_util.oauth2_settings.scopes)
return_url = request.GET.get('return_url', None)
if not return_url:
return_url = request.META.get('HTTP_REFERER', '/')
flow = _make_flow(request=request, scopes=scopes, return_url=return_url)
auth_url = flow.step1_get_authorize_url()
return shortcuts.redirect(auth_url)

390
tests/test_django_util.py Normal file
View File

@@ -0,0 +1,390 @@
# Copyright 2015 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import unittest
from django.conf.urls import include, url
from django.core import exceptions
from django import http
from django import test
import mock
from oauth2client.client import FlowExchangeError, OAuth2WebServerFlow
import django.conf
from oauth2client.contrib import django_util
from oauth2client.contrib.django_util import decorators
from oauth2client.contrib.django_util import site
from oauth2client.contrib.django_util import storage
from oauth2client.contrib.django_util import views
from six.moves.urllib import parse
urlpatterns = [
url(r'^oauth2/', include(site.urls))
]
urlpatterns += [url(r'^oauth2/', include(site.urls))]
class OAuth2SetupTest(unittest.TestCase):
@mock.patch("oauth2client.contrib.django_util.clientsecrets")
def test_settings_initialize(self, clientsecrets):
django.conf.settings.GOOGLE_OAUTH2_CLIENT_SECRETS_JSON = 'file.json'
clientsecrets.loadfile.return_value = (
clientsecrets.TYPE_WEB,
{
'client_id': 'myid',
'client_secret': 'hunter2'
}
)
oauth2_settings = django_util.OAuth2Settings(django.conf.settings)
self.assertTrue(clientsecrets.loadfile.called)
self.assertEqual(oauth2_settings.client_id, 'myid')
self.assertEqual(oauth2_settings.client_secret, 'hunter2')
@mock.patch("oauth2client.contrib.django_util.clientsecrets")
def test_settings_initialize_invalid_type(self, clientsecrets):
django.conf.settings.GOOGLE_OAUTH2_CLIENT_SECRETS_JSON = 'file.json'
clientsecrets.loadfile.return_value = (
"wrong_type",
{
'client_id': 'myid',
'client_secret': 'hunter2'
}
)
self.assertRaises(ValueError, django_util.OAuth2Settings.__init__,
object.__new__(django_util.OAuth2Settings), django.conf.settings)
@mock.patch("oauth2client.contrib.django_util.clientsecrets")
def test_no_settings(self, clientsecrets):
django.conf.settings.GOOGLE_OAUTH2_CLIENT_SECRETS_JSON = None
django.conf.settings.GOOGLE_OAUTH2_CLIENT_SECRET = None
django.conf.settings.GOOGLE_OAUTH2_CLIENT_ID = None
self.assertRaises(exceptions.ImproperlyConfigured, django_util.OAuth2Settings.__init__,
object.__new__(django_util.OAuth2Settings), django.conf.settings)
@mock.patch("oauth2client.contrib.django_util.clientsecrets")
def test_no_session_middleware(self, clientsecrets):
old_classes = django.conf.settings.MIDDLEWARE_CLASSES
django.conf.settings.MIDDLEWARE_CLASSES = ()
self.assertRaises(exceptions.ImproperlyConfigured,
django_util.OAuth2Settings.__init__, object.__new__(
django_util.OAuth2Settings),
django.conf.settings)
django.conf.settings.MIDDLEWARE_CLASSES = old_classes
class TestWithSession(test.TestCase):
def setUp(self):
self.factory = test.RequestFactory()
from django.contrib.sessions.backends.file import SessionStore
store = SessionStore()
store.save()
self.session = store
class OAuth2EnabledDecoratorTest(TestWithSession):
def test_no_credentials_without_credentials(self):
request = self.factory.get('/test')
request.session = self.session
@decorators.oauth_enabled
def test_view(request):
return http.HttpResponse("test") # pragma: NO COVER
response = test_view(request)
self.assertEquals(response.status_code, 200)
self.assertIsNotNone(request.oauth)
self.assertFalse(request.oauth.has_credentials())
self.assertIsNone(request.oauth.http)
@mock.patch("oauth2client.client.OAuth2Credentials")
def test_has_credentials_in_storage(self, OAuth2Credentials):
request = self.factory.get('/test')
request.session = mock.MagicMock()
credentials_mock = mock.Mock(
scopes=set(django.conf.settings.GOOGLE_OAUTH2_SCOPES))
credentials_mock.has_scopes.return_value = True
credentials_mock.invalid = False
OAuth2Credentials.from_json.return_value = credentials_mock
@decorators.oauth_enabled
def test_view(request):
return http.HttpResponse("test")
response = test_view(request)
self.assertEquals(response.status_code, 200)
self.assertEquals(response.content, b"test")
self.assertTrue(request.oauth.has_credentials())
self.assertIsNotNone(request.oauth.http)
@mock.patch("oauth2client.client.OAuth2Credentials")
def test_specified_scopes(self, OAuth2Credentials):
request = self.factory.get('/test')
request.session = mock.MagicMock()
credentials_mock = mock.Mock(
scopes=set(django.conf.settings.GOOGLE_OAUTH2_SCOPES))
credentials_mock.has_scopes = True
credentials_mock.is_valid = True
OAuth2Credentials.from_json.return_value = credentials_mock
@decorators.oauth_enabled(scopes=['additional-scope'])
def test_view(request):
return http.HttpResponse("hello world") # pragma: NO COVER
response = test_view(request)
self.assertEquals(response.status_code, 200)
self.assertIsNotNone(request.oauth)
self.assertFalse(request.oauth.has_credentials())
class OAuth2RequiredDecoratorTest(TestWithSession):
def test_redirects_without_credentials(self):
request = self.factory.get('/test')
request.session = self.session
@decorators.oauth_required
def test_view(request):
return http.HttpResponse("test") # pragma: NO COVER
response = test_view(request)
self.assertTrue(isinstance(response, http.HttpResponseRedirect))
self.assertEquals(parse.urlparse(response['Location']).path,
"/oauth2/oauth2authorize/")
self.assertTrue("return_url=%2Ftest" in parse.urlparse(response['Location']).query)
self.assertEquals(response.status_code, 302)
@mock.patch("oauth2client.contrib.django_util.UserOAuth2", autospec=True)
def test_has_credentials_in_storage(self, UserOAuth2):
request = self.factory.get('/test')
request.session = mock.MagicMock()
@decorators.oauth_required
def test_view(request):
return http.HttpResponse("test")
my_user_oauth = mock.MagicMock()
UserOAuth2.return_value = my_user_oauth
my_user_oauth.has_credentials.return_value = True
response = test_view(request)
self.assertEquals(response.status_code, 200)
self.assertEquals(response.content, b"test")
@mock.patch("oauth2client.client.OAuth2Credentials")
def test_has_credentials_in_storage_no_scopes(self, OAuth2Credentials):
request = self.factory.get('/test')
request.session = mock.MagicMock()
credentials_mock = mock.Mock(scopes=set(django.conf.settings.GOOGLE_OAUTH2_SCOPES))
credentials_mock.has_scopes.return_value = False
OAuth2Credentials.from_json.return_value = credentials_mock
@decorators.oauth_required
def test_view(request):
return http.HttpResponse("test") # pragma: NO COVER
response = test_view(request)
self.assertEquals(response.status_code, 302)
@mock.patch("oauth2client.client.OAuth2Credentials")
def test_specified_scopes(self, OAuth2Credentials):
request = self.factory.get('/test')
request.session = mock.MagicMock()
credentials_mock = mock.Mock(scopes=set(django.conf.settings.GOOGLE_OAUTH2_SCOPES))
credentials_mock.has_scopes = False
OAuth2Credentials.from_json.return_value = credentials_mock
@decorators.oauth_required(scopes=['additional-scope'])
def test_view(request):
return http.HttpResponse("hello world") # pragma: NO COVER
response = test_view(request)
self.assertEquals(response.status_code, 302)
class Oauth2AuthorizeTest(TestWithSession):
def test_authorize_works(self):
request = self.factory.get('oauth2/oauth2authorize')
request.session = self.session
response = views.oauth2_authorize(request)
self.assertTrue(isinstance(response, http.HttpResponseRedirect))
def test_authorize_works_explicit_return_url(self):
request = self.factory.get('oauth2/oauth2authorize', data={
'return_url': '/return_endpoint'
})
request.session = self.session
response = views.oauth2_authorize(request)
self.assertTrue(isinstance(response, http.HttpResponseRedirect))
class Oauth2CallbackTest(TestWithSession):
def setUp(self):
global mycallback
mycallback = mock.Mock()
super(Oauth2CallbackTest, self).setUp()
self.CSRF_TOKEN = "token"
self.RETURN_URL = "http://return-url.com"
self.fake_state = {
'csrf_token': self.CSRF_TOKEN,
'return_url': self.RETURN_URL,
'scopes': django.conf.settings.GOOGLE_OAUTH2_SCOPES
}
@mock.patch("oauth2client.contrib.django_util.views.pickle")
def test_callback_works(self, pickle):
request = self.factory.get('oauth2/oauth2callback', data={
"state": json.dumps(self.fake_state),
"code": 123
})
self.session['google_oauth2_csrf_token'] = self.CSRF_TOKEN
flow = OAuth2WebServerFlow(
client_id='clientid',
client_secret='clientsecret',
scope=['email'],
state=json.dumps(self.fake_state),
redirect_uri=request.build_absolute_uri("oauth2/oauth2callback"))
self.session['google_oauth2_flow_{0}'.format(self.CSRF_TOKEN)] \
= pickle.dumps(flow)
flow.step2_exchange = mock.Mock()
pickle.loads.return_value = flow
request.session = self.session
response = views.oauth2_callback(request)
self.assertTrue(isinstance(response, http.HttpResponseRedirect))
self.assertEquals(response.status_code, 302)
self.assertEquals(response['Location'], self.RETURN_URL)
@mock.patch("oauth2client.contrib.django_util.views.pickle")
def test_callback_handles_bad_flow_exchange(self, pickle):
request = self.factory.get('oauth2/oauth2callback', data={
"state": json.dumps(self.fake_state),
"code": 123
})
self.session['google_oauth2_csrf_token'] = self.CSRF_TOKEN
flow = OAuth2WebServerFlow(
client_id='clientid',
client_secret='clientsecret',
scope=['email'],
state=json.dumps(self.fake_state),
redirect_uri=request.build_absolute_uri("oauth2/oauth2callback"))
self.session['google_oauth2_flow_{0}'.format(self.CSRF_TOKEN)]\
= pickle.dumps(flow)
def local_throws(code):
raise FlowExchangeError("test")
flow.step2_exchange = local_throws
pickle.loads.return_value = flow
request.session = self.session
response = views.oauth2_callback(request)
self.assertTrue(isinstance(response, http.HttpResponseBadRequest))
def test_error_returns_bad_request(self):
request = self.factory.get('oauth2/oauth2callback', data={
"error": "There was an error in your authorization.",
})
response = views.oauth2_callback(request)
self.assertTrue(isinstance(response, http.HttpResponseBadRequest))
self.assertTrue(b"Authorization failed" in response.content)
def test_no_session(self):
request = self.factory.get('oauth2/oauth2callback', data={
"code": 123,
"state": json.dumps(self.fake_state)
})
request.session = self.session
response = views.oauth2_callback(request)
self.assertTrue(isinstance(response, http.HttpResponseBadRequest))
self.assertEquals(response.content, b'No existing session for this flow.')
def test_missing_state_returns_bad_request(self):
request = self.factory.get('oauth2/oauth2callback', data={
"code": 123
})
self.session['google_oauth2_csrf_token'] = "token"
request.session = self.session
response = views.oauth2_callback(request)
self.assertTrue(isinstance(response, http.HttpResponseBadRequest))
def test_bad_state(self):
request = self.factory.get('oauth2/oauth2callback', data={
"code": 123,
"state": json.dumps({"wrong": "state"})
})
self.session['google_oauth2_csrf_token'] = "token"
request.session = self.session
response = views.oauth2_callback(request)
self.assertTrue(isinstance(response, http.HttpResponseBadRequest))
self.assertEquals(response.content, b'Invalid state parameter.')
def test_bad_csrf(self):
request = self.factory.get('oauth2/oauth2callback', data={
"state": json.dumps(self.fake_state),
"code": 123
})
self.session['google_oauth2_csrf_token'] = "WRONG TOKEN"
request.session = self.session
response = views.oauth2_callback(request)
self.assertTrue(isinstance(response, http.HttpResponseBadRequest))
self.assertEquals(response.content, b'Invalid CSRF token.')
def test_no_saved_flow(self):
request = self.factory.get('oauth2/oauth2callback', data={
"state": json.dumps(self.fake_state),
"code": 123
})
self.session['google_oauth2_csrf_token'] = self.CSRF_TOKEN
self.session['google_oauth2_flow_{0}'.format(self.CSRF_TOKEN)] = None
request.session = self.session
response = views.oauth2_callback(request)
self.assertTrue(isinstance(response, http.HttpResponseBadRequest))
self.assertEquals(response.content, b'Missing Oauth2 flow.')
class StorageTest(TestWithSession):
def test_session_delete(self):
self.session[storage._CREDENTIALS_KEY] = "test_val"
django_storage = storage.DjangoSessionStorage(self.session)
django_storage.delete()
self.assertIsNone(self.session.get(storage._CREDENTIALS_KEY))
def test_session_delete_nothing(self):
django_storage = storage.DjangoSessionStorage(self.session)
django_storage.delete()

View File

@@ -14,6 +14,7 @@ deps = {[testenv]basedeps}
django
setenv =
pypy: with_gmp=no
DJANGO_SETTINGS_MODULE=tests.test_django_settings
commands = nosetests --ignore-files=test_appengine\.py {posargs}
[coverbase]
@@ -50,8 +51,11 @@ commands =
--ignore-files=test_appengine\.py \
--ignore-files=test_django_orm\.py \
--ignore-files=test_django_settings\.py \
--ignore-files=test_django_util\.py \
--exclude-dir=oauth2client/contrib/django_util \
{posargs}
deps = {[testenv]basedeps}
nose-exclude
[testenv:py33]
basepython =
@@ -61,8 +65,11 @@ commands =
--ignore-files=test_appengine\.py \
--ignore-files=test_django_orm\.py \
--ignore-files=test_django_settings\.py \
--ignore-files=test_django_util\.py \
--exclude-dir=oauth2client/contrib/django_util \
{posargs}
deps = {[testenv]basedeps}
nose-exclude
[testenv:cover]
basepython = {[coverbase]basepython}